avoid-string-h-conflict-in-kernel-rx-20010612
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #ifdef  KERNEL
11 #include "../afs/param.h"
12 #include <afsconfig.h>
13 #ifndef UKERNEL
14 #if defined(AFS_DARWIN_ENV) || defined(AFS_FBSD_ENV)
15 #include "../afs/sysincludes.h"
16 #else
17 #include "../h/types.h"
18 #include "../h/time.h"
19 #include "../h/stat.h"
20 #ifdef  AFS_OSF_ENV
21 #include <net/net_globals.h>
22 #endif  /* AFS_OSF_ENV */
23 #ifdef AFS_LINUX20_ENV
24 #include "../h/socket.h"
25 #endif
26 #include "../netinet/in.h"
27 #if defined(AFS_SGI_ENV)
28 #include "../afs/sysincludes.h"
29 #endif
30 #endif
31 #include "../afs/afs_args.h"
32 #include "../afs/afs_osi.h"
33 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
34 #include "../h/systm.h"
35 #endif
36 #else /* !UKERNEL */
37 #include "../afs/sysincludes.h"
38 #endif /* !UKERNEL */
39 #ifdef RXDEBUG
40 #undef RXDEBUG      /* turn off debugging */
41 #endif /* RXDEBUG */
42
43 #include "../rx/rx_kmutex.h"
44 #include "../rx/rx_kernel.h"
45 #include "../rx/rx_clock.h"
46 #include "../rx/rx_queue.h"
47 #include "../rx/rx.h"
48 #include "../rx/rx_globals.h"
49 #include "../afs/lock.h"
50 #include "../afsint/afsint.h"
51 #ifdef  AFS_ALPHA_ENV
52 #undef kmem_alloc
53 #undef kmem_free
54 #undef mem_alloc
55 #undef mem_free
56 #undef register
57 #endif  /* AFS_ALPHA_ENV */
58 #else /* KERNEL */
59 # include <afs/param.h>
60 # include <afsconfig.h>
61 # include <sys/types.h>
62 #ifndef AFS_NT40_ENV
63 # include <sys/socket.h>
64 # include <sys/file.h>
65 # include <netdb.h>
66 # include <netinet/in.h>
67 # include <sys/stat.h>
68 # include <sys/time.h>
69 #endif
70 #ifdef HAVE_STRING_H
71 #include <string.h>
72 #endif
73 #ifdef HAVE_STRINGS_H
74 #include <strings.h>
75 #endif
76 #ifdef HAVE_UNISTD_H
77 #include <unistd.h>
78 #endif
79 # include "rx_user.h"
80 # include "rx_clock.h"
81 # include "rx_queue.h"
82 # include "rx.h"
83 # include "rx_globals.h"
84 # include "rx_internal.h"
85 #endif /* KERNEL */
86
87 #ifdef RX_LOCKS_DB
88 /* rxdb_fileID is used to identify the lock location, along with line#. */
89 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
90 #endif /* RX_LOCKS_DB */
91 /* rxi_ReadProc -- internal version.
92  *
93  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
94  */
95 int rxi_ReadProc(call, buf, nbytes)
96     register struct rx_call *call;
97     register char *buf;
98     register int nbytes;
99 {
100     register struct rx_packet *cp = call->currentPacket;
101     register struct rx_packet *rp;
102     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
103     register int requestCount;
104     register unsigned int t;
105 /* XXXX took out clock_NewTime from here.  Was it needed? */
106     requestCount = nbytes;
107
108     /* Free any packets from the last call to ReadvProc/WritevProc */
109     if (!queue_IsEmpty(&call->iovq)) {
110         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
111             queue_Remove(rp);
112             rxi_FreePacket(rp);
113         }
114     }
115
116     do {
117       if (call->nLeft == 0) {
118         /* Get next packet */
119         for (;;) {
120           if (call->error || (call->mode != RX_MODE_RECEIVING)) {
121             if (call->error) {
122               return 0;
123             }
124             if (call->mode == RX_MODE_SENDING) {
125               rxi_FlushWrite(call);
126               continue;
127             }
128           }
129           if (queue_IsNotEmpty(&call->rq)) {
130             /* Check that next packet available is next in sequence */
131             rp = queue_First(&call->rq, rx_packet);
132             if (rp->header.seq == call->rnext) {
133               afs_int32 error;
134               register struct rx_connection *conn = call->conn;
135               queue_Remove(rp);
136               
137               /* RXS_CheckPacket called to undo RXS_PreparePacket's
138                * work.  It may reduce the length of the packet by up
139                * to conn->maxTrailerSize, to reflect the length of the
140                * data + the header. */
141               if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
142                 /* Used to merely shut down the call, but now we 
143                  * shut down the whole connection since this may 
144                  * indicate an attempt to hijack it */
145
146                 MUTEX_EXIT(&call->lock);
147                 rxi_ConnectionError(conn, error);
148                 MUTEX_ENTER(&conn->conn_data_lock);
149                 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
150                 MUTEX_EXIT(&conn->conn_data_lock);
151                 rxi_FreePacket(rp);
152                 MUTEX_ENTER(&call->lock);
153
154                 return 0;
155               }
156               call->rnext++;
157               cp = call->currentPacket = rp;
158               call->curvec = 1;   /* 0th vec is always header */
159               /* begin at the beginning [ more or less ], continue 
160                * on until the end, then stop. */
161               call->curpos = (char *)cp->wirevec[1].iov_base
162                              + call->conn->securityHeaderSize; 
163               call->curlen = cp->wirevec[1].iov_len
164                              - call->conn->securityHeaderSize; 
165               
166               /* Notice that this code works correctly if the data
167                * size is 0 (which it may be--no reply arguments from
168                * server, for example).  This relies heavily on the
169                * fact that the code below immediately frees the packet
170                * (no yields, etc.).  If it didn't, this would be a
171                * problem because a value of zero for call->nLeft
172                * normally means that there is no read packet */
173               call->nLeft = cp->length;
174               hadd32(call->bytesRcvd, cp->length);
175               
176               /* Send a hard ack for every rxi_HardAckRate+1 packets
177                * consumed. Otherwise schedule an event to send
178                * the hard ack later on.
179                */
180               call->nHardAcks++;
181               if (!(call->flags &RX_CALL_RECEIVE_DONE)) {
182                 if (call->nHardAcks > (u_short)rxi_HardAckRate) {
183                   rxevent_Cancel(call->delayedAckEvent, call,
184                                  RX_CALL_REFCOUNT_DELAY);
185                   rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
186                 }
187                 else {
188                   struct clock when;
189                   clock_GetTime(&when);
190                   /* Delay to consolidate ack packets */
191                   clock_Add(&when, &rx_hardAckDelay);
192                   if (!call->delayedAckEvent ||
193                       clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
194                       rxevent_Cancel(call->delayedAckEvent, call,
195                                      RX_CALL_REFCOUNT_DELAY);
196                       CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
197                       call->delayedAckEvent = rxevent_Post(&when,
198                                                            rxi_SendDelayedAck,
199                                                            call, 0);
200                   }
201                 }
202               }
203               break;
204             }
205           }
206
207 /*
208 MTUXXX  doesn't there need to be an "else" here ??? 
209 */
210           /* Are there ever going to be any more packets? */
211           if (call->flags & RX_CALL_RECEIVE_DONE) {
212             return requestCount - nbytes;
213           }
214           /* Wait for in-sequence packet */
215           call->flags |= RX_CALL_READER_WAIT;
216           clock_NewTime();
217           call->startWait = clock_Sec();
218           while (call->flags & RX_CALL_READER_WAIT) {
219 #ifdef  RX_ENABLE_LOCKS
220               CV_WAIT(&call->cv_rq, &call->lock);
221 #else
222               osi_rxSleep(&call->rq);
223 #endif
224           }
225
226           call->startWait = 0;
227 #ifdef RX_ENABLE_LOCKS
228           if (call->error) {
229               return 0;
230           }
231 #endif /* RX_ENABLE_LOCKS */
232         }
233       }
234       else /* assert(cp); */  /* MTUXXX  this should be replaced by some error-recovery code before shipping */
235 /* yes, the following block is allowed to be the ELSE clause (or not) */
236
237         /* It's possible for call->nLeft to be smaller than any particular
238          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
239          * reflects the size of the buffer.  We have to keep track of the
240          * number of bytes read in the length field of the packet struct.  On
241          * the final portion of a received packet, it's almost certain that
242          * call->nLeft will be smaller than the final buffer. */
243
244         while (nbytes && cp) {
245           t = MIN((int)call->curlen, nbytes);
246           t = MIN(t, (int)call->nLeft);
247           bcopy (call->curpos, buf, t);
248           buf += t;
249           nbytes -= t;
250           call->curpos += t;
251           call->curlen -= t;
252           call->nLeft -= t;
253
254           if (!call->nLeft) {
255             /* out of packet.  Get another one. */
256             rxi_FreePacket(cp);
257             cp = call->currentPacket = (struct rx_packet *)0;
258           }
259           else if (!call->curlen) {
260             /* need to get another struct iov */
261             if (++call->curvec >= cp->niovecs) {
262               /* current packet is exhausted, get ready for another */
263               /* don't worry about curvec and stuff, they get set somewhere else */
264               rxi_FreePacket(cp);
265               cp = call->currentPacket = (struct rx_packet *)0;
266               call->nLeft = 0;
267             }
268             else {
269               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
270               call->curlen = cp->wirevec[call->curvec].iov_len;
271             }
272           }  
273         }
274         if (!nbytes) {
275           /* user buffer is full, return */
276           return requestCount;
277         }
278
279     } while (nbytes);
280
281     return requestCount;
282 }
283
284 int rx_ReadProc(call, buf, nbytes)
285   struct rx_call *call;
286   char *buf;
287   int nbytes;
288 {
289     int bytes;
290     int tcurlen;
291     int tnLeft;
292     char *tcurpos;
293     SPLVAR;
294
295     /*
296      * Free any packets from the last call to ReadvProc/WritevProc.
297      * We do not need the lock because the receiver threads only
298      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
299      * RX_CALL_IOVEC_WAIT is always cleared before returning from
300      * ReadvProc/WritevProc.
301      */
302     if (!queue_IsEmpty(&call->iovq)) {
303         register struct rx_packet *rp; 
304         register struct rx_packet *nxp;
305         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
306             queue_Remove(rp);
307             rxi_FreePacket(rp);
308         }
309     }
310
311     /*
312      * Most common case, all of the data is in the current iovec.
313      * We do not need the lock because this is the only thread that
314      * updates the curlen, curpos, nLeft fields.
315      *
316      * We are relying on nLeft being zero unless the call is in receive mode.
317      */
318     tcurlen = call->curlen;
319     tnLeft = call->nLeft;
320     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
321         tcurpos = call->curpos;
322         bcopy(tcurpos, buf, nbytes);
323         call->curpos = tcurpos + nbytes;
324         call->curlen = tcurlen - nbytes;
325         call->nLeft = tnLeft - nbytes;
326         return nbytes;
327     }
328
329     NETPRI;
330     AFS_RXGLOCK();
331     MUTEX_ENTER(&call->lock);
332     bytes = rxi_ReadProc(call, buf, nbytes);
333     MUTEX_EXIT(&call->lock);
334     AFS_RXGUNLOCK();
335     USERPRI;
336     return bytes;
337 }
338
339 /* Optimization for unmarshalling 32 bit integers */
340 int rx_ReadProc32(call, value)
341   struct rx_call *call;
342   afs_int32 *value;
343 {
344     int bytes;
345     int tcurlen;
346     int tnLeft;
347     char *tcurpos;
348     SPLVAR;
349
350     /*
351      * Free any packets from the last call to ReadvProc/WritevProc.
352      * We do not need the lock because the receiver threads only
353      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
354      * RX_CALL_IOVEC_WAIT is always cleared before returning from
355      * ReadvProc/WritevProc.
356      */
357     if (!queue_IsEmpty(&call->iovq)) {
358         register struct rx_packet *rp; 
359         register struct rx_packet *nxp;
360         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
361             queue_Remove(rp);
362             rxi_FreePacket(rp);
363         }
364     }
365
366     /*
367      * Most common case, all of the data is in the current iovec.
368      * We do not need the lock because this is the only thread that
369      * updates the curlen, curpos, nLeft fields.
370      *
371      * We are relying on nLeft being zero unless the call is in receive mode.
372      */
373     tcurlen = call->curlen;
374     tnLeft = call->nLeft;
375     if (!call->error && tcurlen > sizeof(afs_int32) && tnLeft > sizeof(afs_int32)) {
376         tcurpos = call->curpos;
377         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
378             *value = *((afs_int32 *)(tcurpos));
379         } else {
380             bcopy(tcurpos, (char *)value, sizeof(afs_int32));
381         }
382         call->curpos = tcurpos + sizeof(afs_int32);
383         call->curlen = tcurlen - sizeof(afs_int32);
384         call->nLeft = tnLeft - sizeof(afs_int32);
385         return sizeof(afs_int32);
386     }
387
388     NETPRI;
389     AFS_RXGLOCK();
390     MUTEX_ENTER(&call->lock);
391     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
392     MUTEX_EXIT(&call->lock);
393     AFS_RXGUNLOCK();
394     USERPRI;
395     return bytes;
396 }
397
398 /* rxi_FillReadVec
399  *
400  * Uses packets in the receive queue to fill in as much of the
401  * current iovec as possible. Does not block if it runs out
402  * of packets to complete the iovec. Return true if an ack packet
403  * was sent, otherwise return false */
404 int rxi_FillReadVec(call, seq, serial, flags)
405   struct rx_call *call;
406   afs_uint32 seq, serial, flags;
407 {
408     int didConsume = 0;
409     int didHardAck = 0;
410     register unsigned int t;
411     struct rx_packet *rp;
412     struct rx_packet *curp;
413     struct iovec *call_iov;
414     struct iovec *cur_iov = NULL;
415
416     curp = call->currentPacket;
417     if (curp) {
418         cur_iov = &curp->wirevec[call->curvec];
419     }
420     call_iov = &call->iov[call->iovNext];
421
422     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
423       if (call->nLeft == 0) {
424         /* Get next packet */
425         if (queue_IsNotEmpty(&call->rq)) {
426           /* Check that next packet available is next in sequence */
427           rp = queue_First(&call->rq, rx_packet);
428           if (rp->header.seq == call->rnext) {
429             afs_int32 error;
430             register struct rx_connection *conn = call->conn;
431             queue_Remove(rp);
432               
433             /* RXS_CheckPacket called to undo RXS_PreparePacket's
434              * work.  It may reduce the length of the packet by up
435              * to conn->maxTrailerSize, to reflect the length of the
436              * data + the header. */
437             if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
438               /* Used to merely shut down the call, but now we 
439                * shut down the whole connection since this may 
440                * indicate an attempt to hijack it */
441
442               MUTEX_EXIT(&call->lock);
443               rxi_ConnectionError(conn, error);
444               MUTEX_ENTER(&conn->conn_data_lock);
445               rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
446               MUTEX_EXIT(&conn->conn_data_lock);
447               rxi_FreePacket(rp);
448               MUTEX_ENTER(&call->lock);
449
450               return 1;
451             }
452             call->rnext++;
453             curp = call->currentPacket = rp;
454             call->curvec = 1;   /* 0th vec is always header */
455             cur_iov = &curp->wirevec[1];
456             /* begin at the beginning [ more or less ], continue 
457              * on until the end, then stop. */
458             call->curpos = (char *)curp->wirevec[1].iov_base
459                            + call->conn->securityHeaderSize; 
460             call->curlen = curp->wirevec[1].iov_len
461                            - call->conn->securityHeaderSize; 
462               
463             /* Notice that this code works correctly if the data
464              * size is 0 (which it may be--no reply arguments from
465              * server, for example).  This relies heavily on the
466              * fact that the code below immediately frees the packet
467              * (no yields, etc.).  If it didn't, this would be a
468              * problem because a value of zero for call->nLeft
469              * normally means that there is no read packet */
470             call->nLeft = curp->length;
471             hadd32(call->bytesRcvd, curp->length);
472               
473             /* Send a hard ack for every rxi_HardAckRate+1 packets
474              * consumed. Otherwise schedule an event to send
475              * the hard ack later on.
476              */
477             call->nHardAcks++;
478             didConsume = 1;
479             continue;
480           }
481         }
482         break;
483       }
484
485       /* It's possible for call->nLeft to be smaller than any particular
486        * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
487        * reflects the size of the buffer.  We have to keep track of the
488        * number of bytes read in the length field of the packet struct.  On
489        * the final portion of a received packet, it's almost certain that
490        * call->nLeft will be smaller than the final buffer. */
491       while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
492         
493         t = MIN((int)call->curlen, call->iovNBytes);
494         t = MIN(t, (int)call->nLeft);
495         call_iov->iov_base = call->curpos;
496         call_iov->iov_len = t;
497         call_iov++;
498         call->iovNext++;
499         call->iovNBytes -= t;
500         call->curpos += t;
501         call->curlen -= t;
502         call->nLeft -= t;
503
504         if (!call->nLeft) {
505           /* out of packet.  Get another one. */
506           queue_Append(&call->iovq, curp);
507           curp = call->currentPacket = (struct rx_packet *)0;
508         }
509         else if (!call->curlen) {
510           /* need to get another struct iov */
511           if (++call->curvec >= curp->niovecs) {
512             /* current packet is exhausted, get ready for another */
513             /* don't worry about curvec and stuff, they get set somewhere else */
514             queue_Append(&call->iovq, curp);
515             curp = call->currentPacket = (struct rx_packet *)0;
516             call->nLeft = 0;
517           }
518           else {
519             cur_iov++;
520             call->curpos = (char *)cur_iov->iov_base;
521             call->curlen = cur_iov->iov_len;
522           }
523         }  
524       }  
525     }
526
527     /* If we consumed any packets then check whether we need to
528      * send a hard ack. */
529     if (didConsume && (!(call->flags &RX_CALL_RECEIVE_DONE))) {
530       if (call->nHardAcks > (u_short)rxi_HardAckRate) {
531         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
532         rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_DELAY, 0);
533         didHardAck = 1;
534       }
535       else {
536         struct clock when;
537         clock_GetTime(&when);
538         /* Delay to consolidate ack packets */
539         clock_Add(&when, &rx_hardAckDelay);
540         if (!call->delayedAckEvent ||
541           clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
542           rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
543           CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
544           call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
545                                                call, 0);
546         }
547       }
548     }
549     return didHardAck;
550 }
551
552
553 /* rxi_ReadvProc -- internal version.
554  *
555  * Fills in an iovec with pointers to the packet buffers. All packets
556  * except the last packet (new current packet) are moved to the iovq
557  * while the application is processing the data.
558  *
559  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
560  */
561 int rxi_ReadvProc(call, iov, nio, maxio, nbytes)
562     struct rx_call *call;
563     struct iovec *iov;
564     int *nio;
565     int maxio;
566     int nbytes;
567 {
568     struct rx_packet *rp; 
569     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
570     int requestCount;
571     int nextio;
572
573     requestCount = nbytes;
574     nextio = 0;
575
576     /* Free any packets from the last call to ReadvProc/WritevProc */
577     for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
578         queue_Remove(rp);
579         rxi_FreePacket(rp);
580     }
581
582     if (call->mode == RX_MODE_SENDING) {
583         rxi_FlushWrite(call);
584     }
585
586     if (call->error) {
587         return 0;
588     }
589
590     /* Get whatever data is currently available in the receive queue.
591      * If rxi_FillReadVec sends an ack packet then it is possible
592      * that we will receive more data while we drop the call lock
593      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
594      * here to avoid a race with the receive thread if we send
595      * hard acks in rxi_FillReadVec. */
596     call->flags |= RX_CALL_IOVEC_WAIT;
597     call->iovNBytes = nbytes;
598     call->iovMax = maxio;
599     call->iovNext = 0;
600     call->iov = iov;
601     rxi_FillReadVec(call, 0, 0, 0);
602
603     /* if we need more data then sleep until the receive thread has
604      * filled in the rest. */
605     if (!call->error && call->iovNBytes &&
606         call->iovNext < call->iovMax &&
607         !(call->flags & RX_CALL_RECEIVE_DONE)) {
608       call->flags |= RX_CALL_READER_WAIT;
609       clock_NewTime();
610       call->startWait = clock_Sec();
611       while (call->flags & RX_CALL_READER_WAIT) {
612 #ifdef  RX_ENABLE_LOCKS
613         CV_WAIT(&call->cv_rq, &call->lock);
614 #else
615         osi_rxSleep(&call->rq);
616 #endif
617       }
618       call->startWait = 0;
619     }
620     call->flags &= ~RX_CALL_IOVEC_WAIT;
621 #ifdef RX_ENABLE_LOCKS
622     if (call->error) {
623       return 0;
624     }
625 #endif /* RX_ENABLE_LOCKS */
626
627     call->iov = NULL;
628     *nio = call->iovNext;
629     return nbytes - call->iovNBytes;
630 }
631
632 int rx_ReadvProc(call, iov, nio, maxio, nbytes)
633     struct rx_call *call;
634     struct iovec *iov;
635     int *nio;
636     int maxio;
637     int nbytes;
638 {
639     int bytes;
640     SPLVAR;
641
642     NETPRI;
643     AFS_RXGLOCK();
644     MUTEX_ENTER(&call->lock);
645     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
646     MUTEX_EXIT(&call->lock);
647     AFS_RXGUNLOCK();
648     USERPRI;
649     return bytes;
650 }
651
652 /* rxi_WriteProc -- internal version.
653  *
654  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
655
656 int rxi_WriteProc(call, buf, nbytes)
657     register struct rx_call *call;
658     register char *buf;
659     register int nbytes;
660 {
661     struct rx_connection *conn = call->conn;
662     register struct rx_packet *cp = call->currentPacket;
663     register struct rx_packet *tp; /* Temporary packet pointer */
664     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
665     register unsigned int t;
666     int requestCount = nbytes;
667
668     /* Free any packets from the last call to ReadvProc/WritevProc */
669     if (!queue_IsEmpty(&call->iovq)) {
670         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
671             queue_Remove(tp);
672             rxi_FreePacket(tp);
673         }
674     }
675
676     if (call->mode != RX_MODE_SENDING) {
677         if ((conn->type == RX_SERVER_CONNECTION)
678             && (call->mode == RX_MODE_RECEIVING)) {
679             call->mode = RX_MODE_SENDING;
680             if (cp) {
681                 rxi_FreePacket(cp);
682                 cp = call->currentPacket = (struct rx_packet *) 0;
683                 call->nLeft = 0;
684                 call->nFree = 0;
685             }
686         }
687         else {
688             return 0;
689         }
690     }
691
692     /* Loop condition is checked at end, so that a write of 0 bytes
693      * will force a packet to be created--specially for the case where
694      * there are 0 bytes on the stream, but we must send a packet
695      * anyway. */
696     do {
697         if (call->nFree == 0) {
698             if (!call->error && cp) {
699 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
700                 /* Wait until TQ_BUSY is reset before adding any
701                  * packets to the transmit queue
702                  */
703                 while (call->flags & RX_CALL_TQ_BUSY) {
704                     call->flags |= RX_CALL_TQ_WAIT;
705 #ifdef RX_ENABLE_LOCKS
706                     CV_WAIT(&call->cv_tq, &call->lock);
707 #else /* RX_ENABLE_LOCKS */
708                     osi_rxSleep(&call->tq);
709 #endif /* RX_ENABLE_LOCKS */
710                 }
711 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
712                 clock_NewTime(); /* Bogus:  need new time package */
713                 /* The 0, below, specifies that it is not the last packet: 
714                  * there will be others. PrepareSendPacket may
715                  * alter the packet length by up to
716                  * conn->securityMaxTrailerSize */
717                 hadd32(call->bytesSent, cp->length);
718                 rxi_PrepareSendPacket(call, cp, 0);
719                 queue_Append(&call->tq, cp);
720                 cp = call->currentPacket = NULL;
721                 if (!(call->flags & (RX_CALL_FAST_RECOVER|
722                                      RX_CALL_FAST_RECOVER_WAIT))) {
723                     rxi_Start(0, call, 0);
724                 }
725             }
726             /* Wait for transmit window to open up */
727             while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
728                 clock_NewTime();
729                 call->startWait = clock_Sec();
730
731 #ifdef  RX_ENABLE_LOCKS
732                 CV_WAIT(&call->cv_twind, &call->lock);
733 #else
734                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
735                 osi_rxSleep(&call->twind);
736 #endif
737
738                 call->startWait = 0;
739 #ifdef RX_ENABLE_LOCKS
740                 if (call->error) {
741                     return 0;
742                 }
743 #endif /* RX_ENABLE_LOCKS */
744             }
745             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
746                 call->currentPacket = cp;
747                 call->nFree = cp->length;
748                 call->curvec = 1;   /* 0th vec is always header */
749                 /* begin at the beginning [ more or less ], continue 
750                  * on until the end, then stop. */
751                 call->curpos = (char *)cp->wirevec[1].iov_base
752                                + call->conn->securityHeaderSize; 
753                 call->curlen = cp->wirevec[1].iov_len
754                                - call->conn->securityHeaderSize; 
755             }
756             if (call->error) {
757                 if (cp) {
758                   rxi_FreePacket(cp);
759                   call->currentPacket = NULL;
760                 }
761                 return 0;
762             }
763         }
764
765         if (cp && (int)call->nFree < nbytes) {
766               /* Try to extend the current buffer */
767               register int len, mud;
768               len = cp->length;
769               mud = rx_MaxUserDataSize(call);
770               if (mud > len) {
771                 int want;
772                 want = MIN(nbytes - (int)call->nFree, mud - len);
773                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
774                 if (cp->length > (unsigned)mud)
775                     cp->length = mud;
776                 call->nFree += (cp->length - len);
777               }
778         }
779
780         /* If the remaining bytes fit in the buffer, then store them
781          * and return.  Don't ship a buffer that's full immediately to
782          * the peer--we don't know if it's the last buffer yet */
783
784         if (!cp)  {
785           call->nFree = 0;
786         }
787
788         while (nbytes && call->nFree) {
789         
790           t = MIN((int)call->curlen, nbytes);
791           t = MIN((int)call->nFree, t);
792           bcopy (buf, call->curpos, t);
793           buf += t;
794           nbytes -= t;
795           call->curpos += t;
796           call->curlen -= t;
797           call->nFree -= t;
798
799           if (!call->curlen) {
800             /* need to get another struct iov */
801             if (++call->curvec >= cp->niovecs) {
802               /* current packet is full, extend or send it */
803               call->nFree = 0;
804             } else {
805               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
806               call->curlen = cp->wirevec[call->curvec].iov_len;
807             }
808           }  
809         } /* while bytes to send and room to send them */
810
811         /* might be out of space now */
812         if (!nbytes) {
813             return requestCount;
814         }
815         else ; /* more data to send, so get another packet and keep going */
816     } while(nbytes);
817
818     return requestCount - nbytes;
819 }
820
821 int rx_WriteProc(call, buf, nbytes)
822   struct rx_call *call;
823   char *buf;
824   int nbytes;
825 {
826     int bytes;
827     int tcurlen;
828     int tnFree;
829     char *tcurpos;
830     SPLVAR;
831
832     /*
833      * Free any packets from the last call to ReadvProc/WritevProc.
834      * We do not need the lock because the receiver threads only
835      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
836      * RX_CALL_IOVEC_WAIT is always cleared before returning from
837      * ReadvProc/WritevProc.
838      */
839     if (!queue_IsEmpty(&call->iovq)) {
840         register struct rx_packet *rp; 
841         register struct rx_packet *nxp;
842         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
843             queue_Remove(rp);
844             rxi_FreePacket(rp);
845         }
846     }
847
848     /*
849      * Most common case: all of the data fits in the current iovec.
850      * We do not need the lock because this is the only thread that
851      * updates the curlen, curpos, nFree fields.
852      *
853      * We are relying on nFree being zero unless the call is in send mode.
854      */
855     tcurlen = (int)call->curlen;
856     tnFree = (int)call->nFree;
857     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
858         tcurpos = call->curpos;
859         bcopy(buf, tcurpos, nbytes);
860         call->curpos = tcurpos + nbytes;
861         call->curlen = tcurlen - nbytes;
862         call->nFree = tnFree - nbytes;
863         return nbytes;
864     }
865
866     NETPRI;
867     AFS_RXGLOCK();
868     MUTEX_ENTER(&call->lock);
869     bytes = rxi_WriteProc(call, buf, nbytes);
870     MUTEX_EXIT(&call->lock);
871     AFS_RXGUNLOCK();
872     USERPRI;
873     return bytes;
874 }
875
876 /* Optimization for marshalling 32 bit arguments */
877 int rx_WriteProc32(call, value)
878   register struct rx_call *call;
879   register afs_int32 *value;
880 {
881     int bytes;
882     int tcurlen;
883     int tnFree;
884     char *tcurpos;
885     SPLVAR;
886
887     /*
888      * Free any packets from the last call to ReadvProc/WritevProc.
889      * We do not need the lock because the receiver threads only
890      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
891      * RX_CALL_IOVEC_WAIT is always cleared before returning from
892      * ReadvProc/WritevProc.
893      */
894     if (!queue_IsEmpty(&call->iovq)) {
895         register struct rx_packet *rp; 
896         register struct rx_packet *nxp;
897         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
898             queue_Remove(rp);
899             rxi_FreePacket(rp);
900         }
901     }
902
903     /*
904      * Most common case: all of the data fits in the current iovec.
905      * We do not need the lock because this is the only thread that
906      * updates the curlen, curpos, nFree fields.
907      *
908      * We are relying on nFree being zero unless the call is in send mode.
909      */
910     tcurlen = (int)call->curlen;
911     tnFree = (int)call->nFree;
912     if (!call->error && tcurlen >= sizeof(afs_int32) && tnFree >= sizeof(afs_int32)) {
913         tcurpos = call->curpos;
914         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
915             *((afs_int32 *)(tcurpos)) = *value;
916         } else {
917             bcopy((char *)value, tcurpos, sizeof(afs_int32));
918         }
919         call->curpos = tcurpos + sizeof(afs_int32);
920         call->curlen = tcurlen - sizeof(afs_int32);
921         call->nFree = tnFree - sizeof(afs_int32);
922         return sizeof(afs_int32);
923     }
924
925     NETPRI;
926     AFS_RXGLOCK();
927     MUTEX_ENTER(&call->lock);
928     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
929     MUTEX_EXIT(&call->lock);
930     AFS_RXGUNLOCK();
931     USERPRI;
932     return bytes;
933 }
934
935 /* rxi_WritevAlloc -- internal version.
936  *
937  * Fill in an iovec to point to data in packet buffers. The application
938  * calls rxi_WritevProc when the buffers are full.
939  *
940  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
941
942 int rxi_WritevAlloc(call, iov, nio, maxio, nbytes)
943     struct rx_call *call;
944     struct iovec *iov;
945     int *nio;
946     int maxio;
947     int nbytes;
948 {
949     struct rx_connection *conn = call->conn;
950     struct rx_packet *cp = call->currentPacket;
951     struct rx_packet *tp; /* temporary packet pointer */
952     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
953     int requestCount;
954     int nextio;
955     /* Temporary values, real work is done in rxi_WritevProc */
956     int tnFree; 
957     int tcurvec;
958     char * tcurpos;
959     int tcurlen;
960
961     requestCount = nbytes;
962     nextio = 0;
963     
964     /* Free any packets from the last call to ReadvProc/WritevProc */
965     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
966         queue_Remove(tp);
967         rxi_FreePacket(tp);
968     }
969
970     if (call->mode != RX_MODE_SENDING) {
971         if ((conn->type == RX_SERVER_CONNECTION)
972             && (call->mode == RX_MODE_RECEIVING)) {
973             call->mode = RX_MODE_SENDING;
974             if (cp) {
975                 rxi_FreePacket(cp);
976                 cp = call->currentPacket = (struct rx_packet *) 0;
977                 call->nLeft = 0;
978                 call->nFree = 0;
979             }
980         }
981         else {
982             return 0;
983         }
984     }
985
986     /* Set up the iovec to point to data in packet buffers. */
987     tnFree = call->nFree;
988     tcurvec = call->curvec;
989     tcurpos = call->curpos;
990     tcurlen = call->curlen;
991     do {
992       register unsigned int t;
993
994       if (tnFree == 0) {
995         /* current packet is full, allocate a new one */
996         cp = rxi_AllocSendPacket(call, nbytes);
997         if (cp == NULL) {
998           /* out of space, return what we have */
999           *nio = nextio;
1000           return requestCount - nbytes;
1001         }
1002         queue_Append(&call->iovq, cp);
1003         tnFree = cp->length;
1004         tcurvec = 1;
1005         tcurpos = (char *)cp->wirevec[1].iov_base
1006                   + call->conn->securityHeaderSize;
1007         tcurlen = cp->wirevec[1].iov_len
1008                   - call->conn->securityHeaderSize;
1009       }
1010
1011       if (tnFree < nbytes) {
1012         /* try to extend the current packet */
1013         register int len, mud;
1014         len = cp->length;
1015         mud = rx_MaxUserDataSize(call);
1016         if (mud > len) {
1017           int want;
1018           want = MIN(nbytes - tnFree, mud - len);
1019           rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1020           if (cp->length > (unsigned)mud)
1021             cp->length = mud;
1022           tnFree += (cp->length - len);
1023           if (cp == call->currentPacket) {
1024             call->nFree += (cp->length - len);
1025           }
1026         }
1027       }
1028
1029       /* fill in the next entry in the iovec */
1030       t = MIN(tcurlen, nbytes);
1031       t = MIN(tnFree, t);
1032       iov[nextio].iov_base = tcurpos;
1033       iov[nextio].iov_len = t;
1034       nbytes -= t;
1035       tcurpos += t;
1036       tcurlen -= t;
1037       tnFree -= t;
1038       nextio++;
1039
1040       if (!tcurlen) {
1041           /* need to get another struct iov */
1042           if (++tcurvec >= cp->niovecs) {
1043             /* current packet is full, extend it or move on to next packet */
1044             tnFree = 0;
1045           } else { 
1046             tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1047             tcurlen = cp->wirevec[tcurvec].iov_len;
1048           }
1049       }
1050     } while (nbytes && nextio < maxio);
1051     *nio = nextio;
1052     return requestCount - nbytes;
1053 }
1054
1055 int rx_WritevAlloc(call, iov, nio, maxio, nbytes)
1056     struct rx_call *call;
1057     struct iovec *iov;
1058     int *nio;
1059     int maxio;
1060     int nbytes;
1061 {
1062     int bytes;
1063     SPLVAR;
1064
1065     NETPRI;
1066     AFS_RXGLOCK();
1067     MUTEX_ENTER(&call->lock);
1068     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1069     MUTEX_EXIT(&call->lock);
1070     AFS_RXGUNLOCK();
1071     USERPRI;
1072     return bytes;
1073 }
1074
1075 int rx_WritevInit(call)
1076     struct rx_call *call;
1077 {
1078     int bytes;
1079     SPLVAR;
1080
1081     /*
1082      * Free any packets from the last call to ReadvProc/WritevProc.
1083      * We do not need the lock because the receiver threads only
1084      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
1085      * RX_CALL_IOVEC_WAIT is always cleared before returning from
1086      * ReadvProc/WritevProc.
1087      */
1088     if (!queue_IsEmpty(&call->iovq)) {
1089         register struct rx_packet *rp;
1090         register struct rx_packet *nxp;
1091         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
1092             queue_Remove(rp);
1093             rxi_FreePacket(rp);
1094         }
1095     }
1096
1097     NETPRI;
1098     AFS_RXGLOCK();
1099     MUTEX_ENTER(&call->lock);
1100     bytes = rxi_WriteProc(call, &bytes, 0);
1101     MUTEX_EXIT(&call->lock);
1102     AFS_RXGUNLOCK();
1103     USERPRI;
1104     return bytes;
1105 }
1106
1107 /* rxi_WritevProc -- internal version.
1108  *
1109  * Send buffers allocated in rxi_WritevAlloc.
1110  *
1111  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1112
1113 int rxi_WritevProc(call, iov, nio, nbytes)
1114     struct rx_call *call;
1115     struct iovec *iov;
1116     int nio;
1117     int nbytes;
1118 {
1119     struct rx_packet *cp = call->currentPacket;
1120     register struct rx_packet *tp; /* Temporary packet pointer */
1121     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1122     int nextio;
1123     int requestCount;
1124     struct rx_queue tmpq;
1125
1126     requestCount = nbytes;
1127     nextio = 0;
1128
1129     if (call->mode != RX_MODE_SENDING) {
1130         call->error = RX_PROTOCOL_ERROR;
1131     }
1132
1133 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1134     /* Wait until TQ_BUSY is reset before trying to move any
1135      * packets to the transmit queue.  */
1136     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1137         call->flags |= RX_CALL_TQ_WAIT;
1138 #ifdef RX_ENABLE_LOCKS
1139         CV_WAIT(&call->cv_tq, &call->lock);
1140 #else /* RX_ENABLE_LOCKS */
1141         osi_rxSleep(&call->tq);
1142 #endif /* RX_ENABLE_LOCKS */
1143     }
1144 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1145
1146     if (call->error) {
1147         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1148             queue_Remove(tp);
1149             rxi_FreePacket(tp);
1150         }
1151         if (cp) {
1152             rxi_FreePacket(cp);
1153             cp = call->currentPacket = NULL;
1154         }
1155         return 0;
1156     }
1157
1158     /* Loop through the I/O vector adjusting packet pointers.
1159      * Place full packets back onto the iovq once they are ready
1160      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1161      * the iovec. We put the loop condition at the end to ensure that
1162      * a zero length write will push a short packet. */
1163     nextio = 0;
1164     queue_Init(&tmpq);
1165     do {
1166         if (call->nFree == 0 && cp) {
1167             clock_NewTime(); /* Bogus:  need new time package */
1168             /* The 0, below, specifies that it is not the last packet: 
1169              * there will be others. PrepareSendPacket may
1170              * alter the packet length by up to
1171              * conn->securityMaxTrailerSize */
1172             hadd32(call->bytesSent, cp->length);
1173             rxi_PrepareSendPacket(call, cp, 0);
1174             queue_Append(&tmpq, cp);
1175
1176             /* The head of the iovq is now the current packet */
1177             if (nbytes) {
1178                 if (queue_IsEmpty(&call->iovq)) {
1179                     call->error = RX_PROTOCOL_ERROR;
1180                     cp = call->currentPacket = NULL;
1181                     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1182                         queue_Remove(tp);
1183                         rxi_FreePacket(tp);
1184                     }
1185                     return 0;
1186                 }
1187                 cp = queue_First(&call->iovq, rx_packet);
1188                 queue_Remove(cp);
1189                 call->currentPacket = cp;
1190                 call->nFree = cp->length;
1191                 call->curvec = 1;
1192                 call->curpos = (char *)cp->wirevec[1].iov_base
1193                                + call->conn->securityHeaderSize;
1194                 call->curlen = cp->wirevec[1].iov_len
1195                                - call->conn->securityHeaderSize;
1196             }
1197         }
1198
1199         if (nbytes) {
1200           /* The next iovec should point to the current position */
1201           if (iov[nextio].iov_base != call->curpos
1202               || iov[nextio].iov_len > (int)call->curlen) {
1203               call->error = RX_PROTOCOL_ERROR;
1204               for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1205                   queue_Remove(tp);
1206                   rxi_FreePacket(tp);
1207               }
1208               if (cp) {
1209                 rxi_FreePacket(cp);
1210                 call->currentPacket = NULL;
1211               }
1212               return 0;
1213           }
1214           nbytes -= iov[nextio].iov_len;
1215           call->curpos += iov[nextio].iov_len;
1216           call->curlen -= iov[nextio].iov_len;
1217           call->nFree -= iov[nextio].iov_len;
1218           nextio++;
1219           if (call->curlen == 0) {
1220             if (++call->curvec > cp->niovecs) {
1221                 call->nFree = 0;
1222             } else {
1223                 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1224                 call->curlen = cp->wirevec[call->curvec].iov_len;
1225             }
1226           }
1227         }
1228     } while (nbytes && nextio < nio);
1229
1230     /* Move the packets from the temporary queue onto the transmit queue.
1231      * We may end up with more than call->twind packets on the queue. */
1232     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1233         queue_Remove(tp);
1234         queue_Append(&call->tq, tp);
1235     }
1236
1237     if (!(call->flags & (RX_CALL_FAST_RECOVER|RX_CALL_FAST_RECOVER_WAIT))) {
1238         rxi_Start(0, call, 0);
1239     }
1240
1241     /* Wait for the length of the transmit queue to fall below call->twind */
1242     while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
1243         clock_NewTime();
1244         call->startWait = clock_Sec();
1245 #ifdef  RX_ENABLE_LOCKS
1246         CV_WAIT(&call->cv_twind, &call->lock);
1247 #else
1248         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1249         osi_rxSleep(&call->twind);
1250 #endif
1251         call->startWait = 0;
1252     }
1253
1254     if (call->error) {
1255         if (cp) {
1256             rxi_FreePacket(cp);
1257             cp = call->currentPacket = NULL;
1258         }
1259         return 0;
1260     }
1261
1262     return requestCount - nbytes;
1263 }
1264
1265 int rx_WritevProc(call, iov, nio, nbytes)
1266     struct rx_call *call;
1267     struct iovec *iov;
1268     int nio;
1269     int nbytes;
1270 {
1271     int bytes;
1272     SPLVAR;
1273
1274     NETPRI;
1275     AFS_RXGLOCK();
1276     MUTEX_ENTER(&call->lock);
1277     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1278     MUTEX_EXIT(&call->lock);
1279     AFS_RXGUNLOCK();
1280     USERPRI;
1281     return bytes;
1282 }
1283
1284 /* Flush any buffered data to the stream, switch to read mode
1285  * (clients) or to EOF mode (servers) */
1286 void rxi_FlushWrite(call)
1287     register struct rx_call *call;
1288 {
1289     register struct rx_packet *cp = call->currentPacket;
1290     register struct rx_packet *tp; /* Temporary packet pointer */
1291     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1292
1293     /* Free any packets from the last call to ReadvProc/WritevProc */
1294     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1295         queue_Remove(tp);
1296         rxi_FreePacket(tp);
1297     }
1298
1299     if (call->mode == RX_MODE_SENDING) {
1300
1301         call->mode = (call->conn->type == RX_CLIENT_CONNECTION ?
1302                                           RX_MODE_RECEIVING: RX_MODE_EOF);
1303
1304 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1305         /* Wait until TQ_BUSY is reset before adding any
1306          * packets to the transmit queue
1307          */
1308         while (call->flags & RX_CALL_TQ_BUSY) {
1309             call->flags |= RX_CALL_TQ_WAIT;
1310 #ifdef RX_ENABLE_LOCKS
1311             CV_WAIT(&call->cv_tq, &call->lock);
1312 #else /* RX_ENABLE_LOCKS */
1313             osi_rxSleep(&call->tq);
1314 #endif /* RX_ENABLE_LOCKS */
1315         }
1316 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1317
1318         if (cp) {
1319             /* cp->length is only supposed to be the user's data */
1320             /* cp->length was already set to (then-current) 
1321              * MaxUserDataSize or less. */
1322             cp->length -= call->nFree;
1323             call->currentPacket = (struct rx_packet *) 0;
1324             call->nFree = 0;
1325         }
1326         else {
1327             cp = rxi_AllocSendPacket(call,0);
1328             if (!cp) {
1329                 /* Mode can no longer be MODE_SENDING */
1330                 return;
1331             }
1332             cp->length = 0;
1333             cp->niovecs = 1;  /* just the header */
1334             call->nFree = 0;
1335         }
1336
1337         /* The 1 specifies that this is the last packet */
1338         hadd32(call->bytesSent, cp->length);
1339         rxi_PrepareSendPacket(call, cp, 1);
1340         queue_Append(&call->tq, cp);
1341         if (!(call->flags & (RX_CALL_FAST_RECOVER|
1342                              RX_CALL_FAST_RECOVER_WAIT))) {
1343             rxi_Start(0, call, 0);
1344         }
1345     }
1346 }
1347
1348 /* Flush any buffered data to the stream, switch to read mode
1349  * (clients) or to EOF mode (servers) */
1350 void rx_FlushWrite(call)
1351   struct rx_call *call;
1352 {
1353     SPLVAR;
1354     NETPRI;
1355     AFS_RXGLOCK();
1356     MUTEX_ENTER(&call->lock);
1357     rxi_FlushWrite(call);
1358     MUTEX_EXIT(&call->lock);
1359     AFS_RXGUNLOCK();
1360     USERPRI;
1361 }