0685433c2cd36a6cedc848c90d8f4bc524bd98df
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #ifdef  KERNEL
11 #include "../afs/param.h"
12 #ifndef UKERNEL
13 #ifdef AFS_DARWIN_ENV
14 #include "../afs/sysincludes.h"
15 #else
16 #include "../h/types.h"
17 #include "../h/time.h"
18 #include "../h/stat.h"
19 #ifdef  AFS_OSF_ENV
20 #include <net/net_globals.h>
21 #endif  /* AFS_OSF_ENV */
22 #ifdef AFS_LINUX20_ENV
23 #include "../h/socket.h"
24 #endif
25 #include "../netinet/in.h"
26 #if defined(AFS_SGI_ENV)
27 #include "../afs/sysincludes.h"
28 #endif
29 #endif
30 #include "../afs/afs_args.h"
31 #include "../afs/afs_osi.h"
32 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
33 #include "../h/systm.h"
34 #endif
35 #else /* !UKERNEL */
36 #include "../afs/sysincludes.h"
37 #endif /* !UKERNEL */
38 #ifdef RXDEBUG
39 #undef RXDEBUG      /* turn off debugging */
40 #endif /* RXDEBUG */
41
42 #include "../rx/rx_kmutex.h"
43 #include "../rx/rx_kernel.h"
44 #include "../rx/rx_clock.h"
45 #include "../rx/rx_queue.h"
46 #include "../rx/rx.h"
47 #include "../rx/rx_globals.h"
48 #include "../afs/lock.h"
49 #include "../afsint/afsint.h"
50 #ifdef  AFS_ALPHA_ENV
51 #undef kmem_alloc
52 #undef kmem_free
53 #undef mem_alloc
54 #undef mem_free
55 #undef register
56 #endif  /* AFS_ALPHA_ENV */
57 #else /* KERNEL */
58 # include <afs/param.h>
59 # include <sys/types.h>
60 #ifndef AFS_NT40_ENV
61 # include <sys/socket.h>
62 # include <sys/file.h>
63 # include <netdb.h>
64 # include <netinet/in.h>
65 # include <sys/stat.h>
66 # include <sys/time.h>
67 #endif
68 # include "rx_user.h"
69 # include "rx_clock.h"
70 # include "rx_queue.h"
71 # include "rx.h"
72 # include "rx_globals.h"
73 # include "rx_internal.h"
74 #endif /* KERNEL */
75
76
77 #ifdef RX_LOCKS_DB
78 /* rxdb_fileID is used to identify the lock location, along with line#. */
79 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
80 #endif /* RX_LOCKS_DB */
81 /* rxi_ReadProc -- internal version.
82  *
83  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
84  */
85 int rxi_ReadProc(call, buf, nbytes)
86     register struct rx_call *call;
87     register char *buf;
88     register int nbytes;
89 {
90     register struct rx_packet *cp = call->currentPacket;
91     register struct rx_packet *rp;
92     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
93     register int requestCount;
94     register unsigned int t;
95 /* XXXX took out clock_NewTime from here.  Was it needed? */
96     requestCount = nbytes;
97
98     /* Free any packets from the last call to ReadvProc/WritevProc */
99     if (!queue_IsEmpty(&call->iovq)) {
100         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
101             queue_Remove(rp);
102             rxi_FreePacket(rp);
103         }
104     }
105
106     do {
107       if (call->nLeft == 0) {
108         /* Get next packet */
109         for (;;) {
110           if (call->error || (call->mode != RX_MODE_RECEIVING)) {
111             if (call->error) {
112               return 0;
113             }
114             if (call->mode == RX_MODE_SENDING) {
115               rxi_FlushWrite(call);
116               continue;
117             }
118           }
119           if (queue_IsNotEmpty(&call->rq)) {
120             /* Check that next packet available is next in sequence */
121             rp = queue_First(&call->rq, rx_packet);
122             if (rp->header.seq == call->rnext) {
123               afs_int32 error;
124               register struct rx_connection *conn = call->conn;
125               queue_Remove(rp);
126               
127               /* RXS_CheckPacket called to undo RXS_PreparePacket's
128                * work.  It may reduce the length of the packet by up
129                * to conn->maxTrailerSize, to reflect the length of the
130                * data + the header. */
131               if (error = RXS_CheckPacket(conn->securityObject, call, rp)) {
132                 /* Used to merely shut down the call, but now we 
133                  * shut down the whole connection since this may 
134                  * indicate an attempt to hijack it */
135
136                 MUTEX_EXIT(&call->lock);
137                 rxi_ConnectionError(conn, error);
138                 MUTEX_ENTER(&conn->conn_data_lock);
139                 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
140                 MUTEX_EXIT(&conn->conn_data_lock);
141                 rxi_FreePacket(rp);
142                 MUTEX_ENTER(&call->lock);
143
144                 return 0;
145               }
146               call->rnext++;
147               cp = call->currentPacket = rp;
148               call->curvec = 1;   /* 0th vec is always header */
149               /* begin at the beginning [ more or less ], continue 
150                * on until the end, then stop. */
151               call->curpos = (char *)cp->wirevec[1].iov_base
152                              + call->conn->securityHeaderSize; 
153               call->curlen = cp->wirevec[1].iov_len
154                              - call->conn->securityHeaderSize; 
155               
156               /* Notice that this code works correctly if the data
157                * size is 0 (which it may be--no reply arguments from
158                * server, for example).  This relies heavily on the
159                * fact that the code below immediately frees the packet
160                * (no yields, etc.).  If it didn't, this would be a
161                * problem because a value of zero for call->nLeft
162                * normally means that there is no read packet */
163               call->nLeft = cp->length;
164               hadd32(call->bytesRcvd, cp->length);
165               
166               /* Send a hard ack for every rxi_HardAckRate+1 packets
167                * consumed. Otherwise schedule an event to send
168                * the hard ack later on.
169                */
170               call->nHardAcks++;
171               if (!(call->flags &RX_CALL_RECEIVE_DONE)) {
172                 if (call->nHardAcks > (u_short)rxi_HardAckRate) {
173                   rxevent_Cancel(call->delayedAckEvent, call,
174                                  RX_CALL_REFCOUNT_DELAY);
175                   rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
176                 }
177                 else {
178                   struct clock when;
179                   clock_GetTime(&when);
180                   /* Delay to consolidate ack packets */
181                   clock_Add(&when, &rx_hardAckDelay);
182                   if (!call->delayedAckEvent ||
183                       clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
184                       rxevent_Cancel(call->delayedAckEvent, call,
185                                      RX_CALL_REFCOUNT_DELAY);
186                       CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
187                       call->delayedAckEvent = rxevent_Post(&when,
188                                                            rxi_SendDelayedAck,
189                                                            call, 0);
190                   }
191                 }
192               }
193               break;
194             }
195           }
196
197 /*
198 MTUXXX  doesn't there need to be an "else" here ??? 
199 */
200           /* Are there ever going to be any more packets? */
201           if (call->flags & RX_CALL_RECEIVE_DONE) {
202             return requestCount - nbytes;
203           }
204           /* Wait for in-sequence packet */
205           call->flags |= RX_CALL_READER_WAIT;
206           clock_NewTime();
207           call->startWait = clock_Sec();
208           while (call->flags & RX_CALL_READER_WAIT) {
209 #ifdef  RX_ENABLE_LOCKS
210               CV_WAIT(&call->cv_rq, &call->lock);
211 #else
212               osi_rxSleep(&call->rq);
213 #endif
214           }
215
216           call->startWait = 0;
217 #ifdef RX_ENABLE_LOCKS
218           if (call->error) {
219               return 0;
220           }
221 #endif /* RX_ENABLE_LOCKS */
222         }
223       }
224       else /* assert(cp); */  /* MTUXXX  this should be replaced by some error-recovery code before shipping */
225 /* yes, the following block is allowed to be the ELSE clause (or not) */
226
227         /* It's possible for call->nLeft to be smaller than any particular
228          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
229          * reflects the size of the buffer.  We have to keep track of the
230          * number of bytes read in the length field of the packet struct.  On
231          * the final portion of a received packet, it's almost certain that
232          * call->nLeft will be smaller than the final buffer. */
233
234         while (nbytes && cp) {
235           t = MIN((int)call->curlen, nbytes);
236           t = MIN(t, (int)call->nLeft);
237           bcopy (call->curpos, buf, t);
238           buf += t;
239           nbytes -= t;
240           call->curpos += t;
241           call->curlen -= t;
242           call->nLeft -= t;
243
244           if (!call->nLeft) {
245             /* out of packet.  Get another one. */
246             rxi_FreePacket(cp);
247             cp = call->currentPacket = (struct rx_packet *)0;
248           }
249           else if (!call->curlen) {
250             /* need to get another struct iov */
251             if (++call->curvec >= cp->niovecs) {
252               /* current packet is exhausted, get ready for another */
253               /* don't worry about curvec and stuff, they get set somewhere else */
254               rxi_FreePacket(cp);
255               cp = call->currentPacket = (struct rx_packet *)0;
256               call->nLeft = 0;
257             }
258             else {
259               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
260               call->curlen = cp->wirevec[call->curvec].iov_len;
261             }
262           }  
263         }
264         if (!nbytes) {
265           /* user buffer is full, return */
266           return requestCount;
267         }
268
269     } while (nbytes);
270
271     return requestCount;
272 }
273
274 int rx_ReadProc(call, buf, nbytes)
275   struct rx_call *call;
276   char *buf;
277   int nbytes;
278 {
279     int bytes;
280     int tcurlen;
281     int tnLeft;
282     char *tcurpos;
283     SPLVAR;
284
285     /*
286      * Free any packets from the last call to ReadvProc/WritevProc.
287      * We do not need the lock because the receiver threads only
288      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
289      * RX_CALL_IOVEC_WAIT is always cleared before returning from
290      * ReadvProc/WritevProc.
291      */
292     if (!queue_IsEmpty(&call->iovq)) {
293         register struct rx_packet *rp; 
294         register struct rx_packet *nxp;
295         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
296             queue_Remove(rp);
297             rxi_FreePacket(rp);
298         }
299     }
300
301     /*
302      * Most common case, all of the data is in the current iovec.
303      * We do not need the lock because this is the only thread that
304      * updates the curlen, curpos, nLeft fields.
305      *
306      * We are relying on nLeft being zero unless the call is in receive mode.
307      */
308     tcurlen = call->curlen;
309     tnLeft = call->nLeft;
310     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
311         tcurpos = call->curpos;
312         bcopy(tcurpos, buf, nbytes);
313         call->curpos = tcurpos + nbytes;
314         call->curlen = tcurlen - nbytes;
315         call->nLeft = tnLeft - nbytes;
316         return nbytes;
317     }
318
319     NETPRI;
320     AFS_RXGLOCK();
321     MUTEX_ENTER(&call->lock);
322     bytes = rxi_ReadProc(call, buf, nbytes);
323     MUTEX_EXIT(&call->lock);
324     AFS_RXGUNLOCK();
325     USERPRI;
326     return bytes;
327 }
328
329 /* Optimization for unmarshalling 32 bit integers */
330 int rx_ReadProc32(call, value)
331   struct rx_call *call;
332   afs_int32 *value;
333 {
334     int bytes;
335     int tcurlen;
336     int tnLeft;
337     char *tcurpos;
338     SPLVAR;
339
340     /*
341      * Free any packets from the last call to ReadvProc/WritevProc.
342      * We do not need the lock because the receiver threads only
343      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
344      * RX_CALL_IOVEC_WAIT is always cleared before returning from
345      * ReadvProc/WritevProc.
346      */
347     if (!queue_IsEmpty(&call->iovq)) {
348         register struct rx_packet *rp; 
349         register struct rx_packet *nxp;
350         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
351             queue_Remove(rp);
352             rxi_FreePacket(rp);
353         }
354     }
355
356     /*
357      * Most common case, all of the data is in the current iovec.
358      * We do not need the lock because this is the only thread that
359      * updates the curlen, curpos, nLeft fields.
360      *
361      * We are relying on nLeft being zero unless the call is in receive mode.
362      */
363     tcurlen = call->curlen;
364     tnLeft = call->nLeft;
365     if (!call->error && tcurlen > sizeof(afs_int32) && tnLeft > sizeof(afs_int32)) {
366         tcurpos = call->curpos;
367         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
368             *value = *((afs_int32 *)(tcurpos));
369         } else {
370             bcopy(tcurpos, (char *)value, sizeof(afs_int32));
371         }
372         call->curpos = tcurpos + sizeof(afs_int32);
373         call->curlen = tcurlen - sizeof(afs_int32);
374         call->nLeft = tnLeft - sizeof(afs_int32);
375         return sizeof(afs_int32);
376     }
377
378     NETPRI;
379     AFS_RXGLOCK();
380     MUTEX_ENTER(&call->lock);
381     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
382     MUTEX_EXIT(&call->lock);
383     AFS_RXGUNLOCK();
384     USERPRI;
385     return bytes;
386 }
387
388 /* rxi_FillReadVec
389  *
390  * Uses packets in the receive queue to fill in as much of the
391  * current iovec as possible. Does not block if it runs out
392  * of packets to complete the iovec. Return true if an ack packet
393  * was sent, otherwise return false */
394 int rxi_FillReadVec(call, seq, serial, flags)
395   struct rx_call *call;
396   afs_uint32 seq, serial, flags;
397 {
398     int didConsume = 0;
399     int didHardAck = 0;
400     register unsigned int t;
401     struct rx_packet *rp;
402     struct rx_packet *curp;
403     struct iovec *call_iov;
404     struct iovec *cur_iov;
405
406     curp = call->currentPacket;
407     if (curp) {
408         cur_iov = &curp->wirevec[call->curvec];
409     }
410     call_iov = &call->iov[call->iovNext];
411
412     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
413       if (call->nLeft == 0) {
414         /* Get next packet */
415         if (queue_IsNotEmpty(&call->rq)) {
416           /* Check that next packet available is next in sequence */
417           rp = queue_First(&call->rq, rx_packet);
418           if (rp->header.seq == call->rnext) {
419             afs_int32 error;
420             register struct rx_connection *conn = call->conn;
421             queue_Remove(rp);
422               
423             /* RXS_CheckPacket called to undo RXS_PreparePacket's
424              * work.  It may reduce the length of the packet by up
425              * to conn->maxTrailerSize, to reflect the length of the
426              * data + the header. */
427             if (error = RXS_CheckPacket(conn->securityObject, call, rp)) {
428               /* Used to merely shut down the call, but now we 
429                * shut down the whole connection since this may 
430                * indicate an attempt to hijack it */
431
432               MUTEX_EXIT(&call->lock);
433               rxi_ConnectionError(conn, error);
434               MUTEX_ENTER(&conn->conn_data_lock);
435               rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
436               MUTEX_EXIT(&conn->conn_data_lock);
437               rxi_FreePacket(rp);
438               MUTEX_ENTER(&call->lock);
439
440               return 1;
441             }
442             call->rnext++;
443             curp = call->currentPacket = rp;
444             call->curvec = 1;   /* 0th vec is always header */
445             cur_iov = &curp->wirevec[1];
446             /* begin at the beginning [ more or less ], continue 
447              * on until the end, then stop. */
448             call->curpos = (char *)curp->wirevec[1].iov_base
449                            + call->conn->securityHeaderSize; 
450             call->curlen = curp->wirevec[1].iov_len
451                            - call->conn->securityHeaderSize; 
452               
453             /* Notice that this code works correctly if the data
454              * size is 0 (which it may be--no reply arguments from
455              * server, for example).  This relies heavily on the
456              * fact that the code below immediately frees the packet
457              * (no yields, etc.).  If it didn't, this would be a
458              * problem because a value of zero for call->nLeft
459              * normally means that there is no read packet */
460             call->nLeft = curp->length;
461             hadd32(call->bytesRcvd, curp->length);
462               
463             /* Send a hard ack for every rxi_HardAckRate+1 packets
464              * consumed. Otherwise schedule an event to send
465              * the hard ack later on.
466              */
467             call->nHardAcks++;
468             didConsume = 1;
469             continue;
470           }
471         }
472         break;
473       }
474
475       /* It's possible for call->nLeft to be smaller than any particular
476        * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
477        * reflects the size of the buffer.  We have to keep track of the
478        * number of bytes read in the length field of the packet struct.  On
479        * the final portion of a received packet, it's almost certain that
480        * call->nLeft will be smaller than the final buffer. */
481       while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
482         
483         t = MIN((int)call->curlen, call->iovNBytes);
484         t = MIN(t, (int)call->nLeft);
485         call_iov->iov_base = call->curpos;
486         call_iov->iov_len = t;
487         call_iov++;
488         call->iovNext++;
489         call->iovNBytes -= t;
490         call->curpos += t;
491         call->curlen -= t;
492         call->nLeft -= t;
493
494         if (!call->nLeft) {
495           /* out of packet.  Get another one. */
496           queue_Append(&call->iovq, curp);
497           curp = call->currentPacket = (struct rx_packet *)0;
498         }
499         else if (!call->curlen) {
500           /* need to get another struct iov */
501           if (++call->curvec >= curp->niovecs) {
502             /* current packet is exhausted, get ready for another */
503             /* don't worry about curvec and stuff, they get set somewhere else */
504             queue_Append(&call->iovq, curp);
505             curp = call->currentPacket = (struct rx_packet *)0;
506             call->nLeft = 0;
507           }
508           else {
509             cur_iov++;
510             call->curpos = (char *)cur_iov->iov_base;
511             call->curlen = cur_iov->iov_len;
512           }
513         }  
514       }  
515     }
516
517     /* If we consumed any packets then check whether we need to
518      * send a hard ack. */
519     if (didConsume && (!(call->flags &RX_CALL_RECEIVE_DONE))) {
520       if (call->nHardAcks > (u_short)rxi_HardAckRate) {
521         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
522         rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_DELAY, 0);
523         didHardAck = 1;
524       }
525       else {
526         struct clock when;
527         clock_GetTime(&when);
528         /* Delay to consolidate ack packets */
529         clock_Add(&when, &rx_hardAckDelay);
530         if (!call->delayedAckEvent ||
531           clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
532           rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
533           CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
534           call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
535                                                call, 0);
536         }
537       }
538     }
539     return didHardAck;
540 }
541
542
543 /* rxi_ReadvProc -- internal version.
544  *
545  * Fills in an iovec with pointers to the packet buffers. All packets
546  * except the last packet (new current packet) are moved to the iovq
547  * while the application is processing the data.
548  *
549  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
550  */
551 int rxi_ReadvProc(call, iov, nio, maxio, nbytes)
552     struct rx_call *call;
553     struct iovec *iov;
554     int *nio;
555     int maxio;
556     int nbytes;
557 {
558     struct rx_packet *rp; 
559     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
560     int requestCount;
561     int nextio;
562
563     requestCount = nbytes;
564     nextio = 0;
565
566     /* Free any packets from the last call to ReadvProc/WritevProc */
567     for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
568         queue_Remove(rp);
569         rxi_FreePacket(rp);
570     }
571
572     if (call->mode == RX_MODE_SENDING) {
573         rxi_FlushWrite(call);
574     }
575
576     if (call->error) {
577         return 0;
578     }
579
580     /* Get whatever data is currently available in the receive queue.
581      * If rxi_FillReadVec sends an ack packet then it is possible
582      * that we will receive more data while we drop the call lock
583      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
584      * here to avoid a race with the receive thread if we send
585      * hard acks in rxi_FillReadVec. */
586     call->flags |= RX_CALL_IOVEC_WAIT;
587     call->iovNBytes = nbytes;
588     call->iovMax = maxio;
589     call->iovNext = 0;
590     call->iov = iov;
591     rxi_FillReadVec(call, 0, 0, 0);
592
593     /* if we need more data then sleep until the receive thread has
594      * filled in the rest. */
595     if (!call->error && call->iovNBytes &&
596         call->iovNext < call->iovMax &&
597         !(call->flags & RX_CALL_RECEIVE_DONE)) {
598       call->flags |= RX_CALL_READER_WAIT;
599       clock_NewTime();
600       call->startWait = clock_Sec();
601       while (call->flags & RX_CALL_READER_WAIT) {
602 #ifdef  RX_ENABLE_LOCKS
603         CV_WAIT(&call->cv_rq, &call->lock);
604 #else
605         osi_rxSleep(&call->rq);
606 #endif
607       }
608       call->startWait = 0;
609     }
610     call->flags &= ~RX_CALL_IOVEC_WAIT;
611 #ifdef RX_ENABLE_LOCKS
612     if (call->error) {
613       return 0;
614     }
615 #endif /* RX_ENABLE_LOCKS */
616
617     call->iov = NULL;
618     *nio = call->iovNext;
619     return nbytes - call->iovNBytes;
620 }
621
622 int rx_ReadvProc(call, iov, nio, maxio, nbytes)
623     struct rx_call *call;
624     struct iovec *iov;
625     int *nio;
626     int maxio;
627     int nbytes;
628 {
629     int bytes;
630     SPLVAR;
631
632     NETPRI;
633     AFS_RXGLOCK();
634     MUTEX_ENTER(&call->lock);
635     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
636     MUTEX_EXIT(&call->lock);
637     AFS_RXGUNLOCK();
638     USERPRI;
639     return bytes;
640 }
641
642 /* rxi_WriteProc -- internal version.
643  *
644  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
645
646 int rxi_WriteProc(call, buf, nbytes)
647     register struct rx_call *call;
648     register char *buf;
649     register int nbytes;
650 {
651     struct rx_connection *conn = call->conn;
652     register struct rx_packet *cp = call->currentPacket;
653     register struct rx_packet *tp; /* Temporary packet pointer */
654     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
655     register unsigned int t;
656     int requestCount = nbytes;
657
658     /* Free any packets from the last call to ReadvProc/WritevProc */
659     if (!queue_IsEmpty(&call->iovq)) {
660         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
661             queue_Remove(tp);
662             rxi_FreePacket(tp);
663         }
664     }
665
666     if (call->mode != RX_MODE_SENDING) {
667         if ((conn->type == RX_SERVER_CONNECTION)
668             && (call->mode == RX_MODE_RECEIVING)) {
669             call->mode = RX_MODE_SENDING;
670             if (cp) {
671                 rxi_FreePacket(cp);
672                 cp = call->currentPacket = (struct rx_packet *) 0;
673                 call->nLeft = 0;
674                 call->nFree = 0;
675             }
676         }
677         else {
678             return 0;
679         }
680     }
681
682     /* Loop condition is checked at end, so that a write of 0 bytes
683      * will force a packet to be created--specially for the case where
684      * there are 0 bytes on the stream, but we must send a packet
685      * anyway. */
686     do {
687         if (call->nFree == 0) {
688             if (!call->error && cp) {
689 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
690                 /* Wait until TQ_BUSY is reset before adding any
691                  * packets to the transmit queue
692                  */
693                 while (call->flags & RX_CALL_TQ_BUSY) {
694                     call->flags |= RX_CALL_TQ_WAIT;
695 #ifdef RX_ENABLE_LOCKS
696                     CV_WAIT(&call->cv_tq, &call->lock);
697 #else /* RX_ENABLE_LOCKS */
698                     osi_rxSleep(&call->tq);
699 #endif /* RX_ENABLE_LOCKS */
700                 }
701 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
702                 clock_NewTime(); /* Bogus:  need new time package */
703                 /* The 0, below, specifies that it is not the last packet: 
704                  * there will be others. PrepareSendPacket may
705                  * alter the packet length by up to
706                  * conn->securityMaxTrailerSize */
707                 hadd32(call->bytesSent, cp->length);
708                 rxi_PrepareSendPacket(call, cp, 0);
709                 queue_Append(&call->tq, cp);
710                 cp = call->currentPacket = NULL;
711                 if (!(call->flags & (RX_CALL_FAST_RECOVER|
712                                      RX_CALL_FAST_RECOVER_WAIT))) {
713                     rxi_Start(0, call, 0);
714                 }
715             }
716             /* Wait for transmit window to open up */
717             while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
718                 clock_NewTime();
719                 call->startWait = clock_Sec();
720
721 #ifdef  RX_ENABLE_LOCKS
722                 CV_WAIT(&call->cv_twind, &call->lock);
723 #else
724                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
725                 osi_rxSleep(&call->twind);
726 #endif
727
728                 call->startWait = 0;
729 #ifdef RX_ENABLE_LOCKS
730                 if (call->error) {
731                     return 0;
732                 }
733 #endif /* RX_ENABLE_LOCKS */
734             }
735             if (cp = rxi_AllocSendPacket(call, nbytes)) {
736                 call->currentPacket = cp;
737                 call->nFree = cp->length;
738                 call->curvec = 1;   /* 0th vec is always header */
739                 /* begin at the beginning [ more or less ], continue 
740                  * on until the end, then stop. */
741                 call->curpos = (char *)cp->wirevec[1].iov_base
742                                + call->conn->securityHeaderSize; 
743                 call->curlen = cp->wirevec[1].iov_len
744                                - call->conn->securityHeaderSize; 
745             }
746             if (call->error) {
747                 if (cp) {
748                   rxi_FreePacket(cp);
749                   call->currentPacket = NULL;
750                 }
751                 return 0;
752             }
753         }
754
755         if (cp && (int)call->nFree < nbytes) {
756               /* Try to extend the current buffer */
757               register int len, mud;
758               len = cp->length;
759               mud = rx_MaxUserDataSize(call);
760               if (mud > len) {
761                 int want;
762                 want = MIN(nbytes - (int)call->nFree, mud - len);
763                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
764                 if (cp->length > (unsigned)mud)
765                     cp->length = mud;
766                 call->nFree += (cp->length - len);
767               }
768         }
769
770         /* If the remaining bytes fit in the buffer, then store them
771          * and return.  Don't ship a buffer that's full immediately to
772          * the peer--we don't know if it's the last buffer yet */
773
774         if (!cp)  {
775           call->nFree = 0;
776         }
777
778         while (nbytes && call->nFree) {
779         
780           t = MIN((int)call->curlen, nbytes);
781           t = MIN((int)call->nFree, t);
782           bcopy (buf, call->curpos, t);
783           buf += t;
784           nbytes -= t;
785           call->curpos += t;
786           call->curlen -= t;
787           call->nFree -= t;
788
789           if (!call->curlen) {
790             /* need to get another struct iov */
791             if (++call->curvec >= cp->niovecs) {
792               /* current packet is full, extend or send it */
793               call->nFree = 0;
794             } else {
795               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
796               call->curlen = cp->wirevec[call->curvec].iov_len;
797             }
798           }  
799         } /* while bytes to send and room to send them */
800
801         /* might be out of space now */
802         if (!nbytes) {
803             return requestCount;
804         }
805         else ; /* more data to send, so get another packet and keep going */
806     } while(nbytes);
807
808     return requestCount - nbytes;
809 }
810
811 int rx_WriteProc(call, buf, nbytes)
812   struct rx_call *call;
813   char *buf;
814   int nbytes;
815 {
816     int bytes;
817     int tcurlen;
818     int tnFree;
819     char *tcurpos;
820     SPLVAR;
821
822     /*
823      * Free any packets from the last call to ReadvProc/WritevProc.
824      * We do not need the lock because the receiver threads only
825      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
826      * RX_CALL_IOVEC_WAIT is always cleared before returning from
827      * ReadvProc/WritevProc.
828      */
829     if (!queue_IsEmpty(&call->iovq)) {
830         register struct rx_packet *rp; 
831         register struct rx_packet *nxp;
832         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
833             queue_Remove(rp);
834             rxi_FreePacket(rp);
835         }
836     }
837
838     /*
839      * Most common case: all of the data fits in the current iovec.
840      * We do not need the lock because this is the only thread that
841      * updates the curlen, curpos, nFree fields.
842      *
843      * We are relying on nFree being zero unless the call is in send mode.
844      */
845     tcurlen = (int)call->curlen;
846     tnFree = (int)call->nFree;
847     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
848         tcurpos = call->curpos;
849         bcopy(buf, tcurpos, nbytes);
850         call->curpos = tcurpos + nbytes;
851         call->curlen = tcurlen - nbytes;
852         call->nFree = tnFree - nbytes;
853         return nbytes;
854     }
855
856     NETPRI;
857     AFS_RXGLOCK();
858     MUTEX_ENTER(&call->lock);
859     bytes = rxi_WriteProc(call, buf, nbytes);
860     MUTEX_EXIT(&call->lock);
861     AFS_RXGUNLOCK();
862     USERPRI;
863     return bytes;
864 }
865
866 /* Optimization for marshalling 32 bit arguments */
867 int rx_WriteProc32(call, value)
868   register struct rx_call *call;
869   register afs_int32 *value;
870 {
871     int bytes;
872     int tcurlen;
873     int tnFree;
874     char *tcurpos;
875     SPLVAR;
876
877     /*
878      * Free any packets from the last call to ReadvProc/WritevProc.
879      * We do not need the lock because the receiver threads only
880      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
881      * RX_CALL_IOVEC_WAIT is always cleared before returning from
882      * ReadvProc/WritevProc.
883      */
884     if (!queue_IsEmpty(&call->iovq)) {
885         register struct rx_packet *rp; 
886         register struct rx_packet *nxp;
887         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
888             queue_Remove(rp);
889             rxi_FreePacket(rp);
890         }
891     }
892
893     /*
894      * Most common case: all of the data fits in the current iovec.
895      * We do not need the lock because this is the only thread that
896      * updates the curlen, curpos, nFree fields.
897      *
898      * We are relying on nFree being zero unless the call is in send mode.
899      */
900     tcurlen = (int)call->curlen;
901     tnFree = (int)call->nFree;
902     if (!call->error && tcurlen >= sizeof(afs_int32) && tnFree >= sizeof(afs_int32)) {
903         tcurpos = call->curpos;
904         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
905             *((afs_int32 *)(tcurpos)) = *value;
906         } else {
907             bcopy((char *)value, tcurpos, sizeof(afs_int32));
908         }
909         call->curpos = tcurpos + sizeof(afs_int32);
910         call->curlen = tcurlen - sizeof(afs_int32);
911         call->nFree = tnFree - sizeof(afs_int32);
912         return sizeof(afs_int32);
913     }
914
915     NETPRI;
916     AFS_RXGLOCK();
917     MUTEX_ENTER(&call->lock);
918     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
919     MUTEX_EXIT(&call->lock);
920     AFS_RXGUNLOCK();
921     USERPRI;
922     return bytes;
923 }
924
925 /* rxi_WritevAlloc -- internal version.
926  *
927  * Fill in an iovec to point to data in packet buffers. The application
928  * calls rxi_WritevProc when the buffers are full.
929  *
930  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
931
932 int rxi_WritevAlloc(call, iov, nio, maxio, nbytes)
933     struct rx_call *call;
934     struct iovec *iov;
935     int *nio;
936     int maxio;
937     int nbytes;
938 {
939     struct rx_connection *conn = call->conn;
940     struct rx_packet *cp = call->currentPacket;
941     struct rx_packet *tp; /* temporary packet pointer */
942     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
943     int requestCount;
944     int nextio;
945     /* Temporary values, real work is done in rxi_WritevProc */
946     int tnFree; 
947     int tcurvec;
948     char * tcurpos;
949     int tcurlen;
950
951     requestCount = nbytes;
952     nextio = 0;
953     
954     /* Free any packets from the last call to ReadvProc/WritevProc */
955     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
956         queue_Remove(tp);
957         rxi_FreePacket(tp);
958     }
959
960     if (call->mode != RX_MODE_SENDING) {
961         if ((conn->type == RX_SERVER_CONNECTION)
962             && (call->mode == RX_MODE_RECEIVING)) {
963             call->mode = RX_MODE_SENDING;
964             if (cp) {
965                 rxi_FreePacket(cp);
966                 cp = call->currentPacket = (struct rx_packet *) 0;
967                 call->nLeft = 0;
968                 call->nFree = 0;
969             }
970         }
971         else {
972             return 0;
973         }
974     }
975
976     /* Set up the iovec to point to data in packet buffers. */
977     tnFree = call->nFree;
978     tcurvec = call->curvec;
979     tcurpos = call->curpos;
980     tcurlen = call->curlen;
981     do {
982       register unsigned int t;
983
984       if (tnFree == 0) {
985         /* current packet is full, allocate a new one */
986         cp = rxi_AllocSendPacket(call, nbytes);
987         if (cp == NULL) {
988           /* out of space, return what we have */
989           *nio = nextio;
990           return requestCount - nbytes;
991         }
992         queue_Append(&call->iovq, cp);
993         tnFree = cp->length;
994         tcurvec = 1;
995         tcurpos = (char *)cp->wirevec[1].iov_base
996                   + call->conn->securityHeaderSize;
997         tcurlen = cp->wirevec[1].iov_len
998                   - call->conn->securityHeaderSize;
999       }
1000
1001       if (tnFree < nbytes) {
1002         /* try to extend the current packet */
1003         register int len, mud;
1004         len = cp->length;
1005         mud = rx_MaxUserDataSize(call);
1006         if (mud > len) {
1007           int want;
1008           want = MIN(nbytes - tnFree, mud - len);
1009           rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1010           if (cp->length > (unsigned)mud)
1011             cp->length = mud;
1012           tnFree += (cp->length - len);
1013           if (cp == call->currentPacket) {
1014             call->nFree += (cp->length - len);
1015           }
1016         }
1017       }
1018
1019       /* fill in the next entry in the iovec */
1020       t = MIN(tcurlen, nbytes);
1021       t = MIN(tnFree, t);
1022       iov[nextio].iov_base = tcurpos;
1023       iov[nextio].iov_len = t;
1024       nbytes -= t;
1025       tcurpos += t;
1026       tcurlen -= t;
1027       tnFree -= t;
1028       nextio++;
1029
1030       if (!tcurlen) {
1031           /* need to get another struct iov */
1032           if (++tcurvec >= cp->niovecs) {
1033             /* current packet is full, extend it or move on to next packet */
1034             tnFree = 0;
1035           } else { 
1036             tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1037             tcurlen = cp->wirevec[tcurvec].iov_len;
1038           }
1039       }
1040     } while (nbytes && nextio < maxio);
1041     *nio = nextio;
1042     return requestCount - nbytes;
1043 }
1044
1045 int rx_WritevAlloc(call, iov, nio, maxio, nbytes)
1046     struct rx_call *call;
1047     struct iovec *iov;
1048     int *nio;
1049     int maxio;
1050     int nbytes;
1051 {
1052     int bytes;
1053     SPLVAR;
1054
1055     NETPRI;
1056     AFS_RXGLOCK();
1057     MUTEX_ENTER(&call->lock);
1058     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1059     MUTEX_EXIT(&call->lock);
1060     AFS_RXGUNLOCK();
1061     USERPRI;
1062     return bytes;
1063 }
1064
1065 int rx_WritevInit(call)
1066     struct rx_call *call;
1067 {
1068     int bytes;
1069     SPLVAR;
1070
1071     /*
1072      * Free any packets from the last call to ReadvProc/WritevProc.
1073      * We do not need the lock because the receiver threads only
1074      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
1075      * RX_CALL_IOVEC_WAIT is always cleared before returning from
1076      * ReadvProc/WritevProc.
1077      */
1078     if (!queue_IsEmpty(&call->iovq)) {
1079         register struct rx_packet *rp;
1080         register struct rx_packet *nxp;
1081         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
1082             queue_Remove(rp);
1083             rxi_FreePacket(rp);
1084         }
1085     }
1086
1087     NETPRI;
1088     AFS_RXGLOCK();
1089     MUTEX_ENTER(&call->lock);
1090     bytes = rxi_WriteProc(call, &bytes, 0);
1091     MUTEX_EXIT(&call->lock);
1092     AFS_RXGUNLOCK();
1093     USERPRI;
1094     return bytes;
1095 }
1096
1097 /* rxi_WritevProc -- internal version.
1098  *
1099  * Send buffers allocated in rxi_WritevAlloc.
1100  *
1101  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1102
1103 int rxi_WritevProc(call, iov, nio, nbytes)
1104     struct rx_call *call;
1105     struct iovec *iov;
1106     int nio;
1107     int nbytes;
1108 {
1109     struct rx_connection *conn = call->conn;
1110     struct rx_packet *cp = call->currentPacket;
1111     register struct rx_packet *tp; /* Temporary packet pointer */
1112     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1113     int nextio;
1114     int requestCount;
1115     struct rx_queue tmpq;
1116
1117     requestCount = nbytes;
1118     nextio = 0;
1119
1120     if (call->mode != RX_MODE_SENDING) {
1121         call->error = RX_PROTOCOL_ERROR;
1122     }
1123
1124 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1125     /* Wait until TQ_BUSY is reset before trying to move any
1126      * packets to the transmit queue.  */
1127     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1128         call->flags |= RX_CALL_TQ_WAIT;
1129 #ifdef RX_ENABLE_LOCKS
1130         CV_WAIT(&call->cv_tq, &call->lock);
1131 #else /* RX_ENABLE_LOCKS */
1132         osi_rxSleep(&call->tq);
1133 #endif /* RX_ENABLE_LOCKS */
1134     }
1135 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1136
1137     if (call->error) {
1138         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1139             queue_Remove(tp);
1140             rxi_FreePacket(tp);
1141         }
1142         if (cp) {
1143             rxi_FreePacket(cp);
1144             cp = call->currentPacket = NULL;
1145         }
1146         return 0;
1147     }
1148
1149     /* Loop through the I/O vector adjusting packet pointers.
1150      * Place full packets back onto the iovq once they are ready
1151      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1152      * the iovec. We put the loop condition at the end to ensure that
1153      * a zero length write will push a short packet. */
1154     nextio = 0;
1155     queue_Init(&tmpq);
1156     do {
1157         unsigned int t;
1158
1159         if (call->nFree == 0 && cp) {
1160             clock_NewTime(); /* Bogus:  need new time package */
1161             /* The 0, below, specifies that it is not the last packet: 
1162              * there will be others. PrepareSendPacket may
1163              * alter the packet length by up to
1164              * conn->securityMaxTrailerSize */
1165             hadd32(call->bytesSent, cp->length);
1166             rxi_PrepareSendPacket(call, cp, 0);
1167             queue_Append(&tmpq, cp);
1168
1169             /* The head of the iovq is now the current packet */
1170             if (nbytes) {
1171                 if (queue_IsEmpty(&call->iovq)) {
1172                     call->error = RX_PROTOCOL_ERROR;
1173                     cp = call->currentPacket = NULL;
1174                     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1175                         queue_Remove(tp);
1176                         rxi_FreePacket(tp);
1177                     }
1178                     return 0;
1179                 }
1180                 cp = queue_First(&call->iovq, rx_packet);
1181                 queue_Remove(cp);
1182                 call->currentPacket = cp;
1183                 call->nFree = cp->length;
1184                 call->curvec = 1;
1185                 call->curpos = (char *)cp->wirevec[1].iov_base
1186                                + call->conn->securityHeaderSize;
1187                 call->curlen = cp->wirevec[1].iov_len
1188                                - call->conn->securityHeaderSize;
1189             }
1190         }
1191
1192         if (nbytes) {
1193           /* The next iovec should point to the current position */
1194           if (iov[nextio].iov_base != call->curpos
1195               || iov[nextio].iov_len > (int)call->curlen) {
1196               call->error = RX_PROTOCOL_ERROR;
1197               for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1198                   queue_Remove(tp);
1199                   rxi_FreePacket(tp);
1200               }
1201               if (cp) {
1202                 rxi_FreePacket(cp);
1203                 call->currentPacket = NULL;
1204               }
1205               return 0;
1206           }
1207           nbytes -= iov[nextio].iov_len;
1208           call->curpos += iov[nextio].iov_len;
1209           call->curlen -= iov[nextio].iov_len;
1210           call->nFree -= iov[nextio].iov_len;
1211           nextio++;
1212           if (call->curlen == 0) {
1213             if (++call->curvec > cp->niovecs) {
1214                 call->nFree = 0;
1215             } else {
1216                 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1217                 call->curlen = cp->wirevec[call->curvec].iov_len;
1218             }
1219           }
1220         }
1221     } while (nbytes && nextio < nio);
1222
1223     /* Move the packets from the temporary queue onto the transmit queue.
1224      * We may end up with more than call->twind packets on the queue. */
1225     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1226         queue_Remove(tp);
1227         queue_Append(&call->tq, tp);
1228     }
1229
1230     if (!(call->flags & (RX_CALL_FAST_RECOVER|RX_CALL_FAST_RECOVER_WAIT))) {
1231         rxi_Start(0, call, 0);
1232     }
1233
1234     /* Wait for the length of the transmit queue to fall below call->twind */
1235     while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
1236         clock_NewTime();
1237         call->startWait = clock_Sec();
1238 #ifdef  RX_ENABLE_LOCKS
1239         CV_WAIT(&call->cv_twind, &call->lock);
1240 #else
1241         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1242         osi_rxSleep(&call->twind);
1243 #endif
1244         call->startWait = 0;
1245     }
1246
1247     if (call->error) {
1248         if (cp) {
1249             rxi_FreePacket(cp);
1250             cp = call->currentPacket = NULL;
1251         }
1252         return 0;
1253     }
1254
1255     return requestCount - nbytes;
1256 }
1257
1258 int rx_WritevProc(call, iov, nio, nbytes)
1259     struct rx_call *call;
1260     struct iovec *iov;
1261     int nio;
1262     int nbytes;
1263 {
1264     int bytes;
1265     SPLVAR;
1266
1267     NETPRI;
1268     AFS_RXGLOCK();
1269     MUTEX_ENTER(&call->lock);
1270     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1271     MUTEX_EXIT(&call->lock);
1272     AFS_RXGUNLOCK();
1273     USERPRI;
1274     return bytes;
1275 }
1276
1277 /* Flush any buffered data to the stream, switch to read mode
1278  * (clients) or to EOF mode (servers) */
1279 void rxi_FlushWrite(call)
1280     register struct rx_call *call;
1281 {
1282     register struct rx_packet *cp = call->currentPacket;
1283     register struct rx_packet *tp; /* Temporary packet pointer */
1284     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1285
1286     /* Free any packets from the last call to ReadvProc/WritevProc */
1287     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1288         queue_Remove(tp);
1289         rxi_FreePacket(tp);
1290     }
1291
1292     if (call->mode == RX_MODE_SENDING) {
1293
1294         call->mode = (call->conn->type == RX_CLIENT_CONNECTION ?
1295                                           RX_MODE_RECEIVING: RX_MODE_EOF);
1296
1297 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1298         /* Wait until TQ_BUSY is reset before adding any
1299          * packets to the transmit queue
1300          */
1301         while (call->flags & RX_CALL_TQ_BUSY) {
1302             call->flags |= RX_CALL_TQ_WAIT;
1303 #ifdef RX_ENABLE_LOCKS
1304             CV_WAIT(&call->cv_tq, &call->lock);
1305 #else /* RX_ENABLE_LOCKS */
1306             osi_rxSleep(&call->tq);
1307 #endif /* RX_ENABLE_LOCKS */
1308         }
1309 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1310
1311         if (cp) {
1312             /* cp->length is only supposed to be the user's data */
1313             /* cp->length was already set to (then-current) 
1314              * MaxUserDataSize or less. */
1315             cp->length -= call->nFree;
1316             call->currentPacket = (struct rx_packet *) 0;
1317             call->nFree = 0;
1318         }
1319         else {
1320             cp = rxi_AllocSendPacket(call,0);
1321             if (!cp) {
1322                 /* Mode can no longer be MODE_SENDING */
1323                 return;
1324             }
1325             cp->length = 0;
1326             cp->niovecs = 1;  /* just the header */
1327             call->nFree = 0;
1328         }
1329
1330         /* The 1 specifies that this is the last packet */
1331         hadd32(call->bytesSent, cp->length);
1332         rxi_PrepareSendPacket(call, cp, 1);
1333         queue_Append(&call->tq, cp);
1334         if (!(call->flags & (RX_CALL_FAST_RECOVER|
1335                              RX_CALL_FAST_RECOVER_WAIT))) {
1336             rxi_Start(0, call, 0);
1337         }
1338     }
1339 }
1340
1341 /* Flush any buffered data to the stream, switch to read mode
1342  * (clients) or to EOF mode (servers) */
1343 void rx_FlushWrite(call)
1344   struct rx_call *call;
1345 {
1346     SPLVAR;
1347     NETPRI;
1348     AFS_RXGLOCK();
1349     MUTEX_ENTER(&call->lock);
1350     rxi_FlushWrite(call);
1351     MUTEX_EXIT(&call->lock);
1352     AFS_RXGUNLOCK();
1353     USERPRI;
1354 }