Initial OpenBSD support. Most of user space builds. No kernel module yet.
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "../afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID("$Header$");
18
19 #ifdef KERNEL
20 #ifndef UKERNEL
21 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
22 #include "../afs/sysincludes.h"
23 #else
24 #include "../h/types.h"
25 #include "../h/time.h"
26 #include "../h/stat.h"
27 #ifdef  AFS_OSF_ENV
28 #include <net/net_globals.h>
29 #endif  /* AFS_OSF_ENV */
30 #ifdef AFS_LINUX20_ENV
31 #include "../h/socket.h"
32 #endif
33 #include "../netinet/in.h"
34 #if defined(AFS_SGI_ENV)
35 #include "../afs/sysincludes.h"
36 #endif
37 #endif
38 #include "../afs/afs_args.h"
39 #include "../afs/afs_osi.h"
40 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
41 #include "../h/systm.h"
42 #endif
43 #else /* !UKERNEL */
44 #include "../afs/sysincludes.h"
45 #endif /* !UKERNEL */
46 #ifdef RXDEBUG
47 #undef RXDEBUG      /* turn off debugging */
48 #endif /* RXDEBUG */
49
50 #include "../rx/rx_kmutex.h"
51 #include "../rx/rx_kernel.h"
52 #include "../rx/rx_clock.h"
53 #include "../rx/rx_queue.h"
54 #include "../rx/rx.h"
55 #include "../rx/rx_globals.h"
56 #include "../afs/lock.h"
57 #include "../afsint/afsint.h"
58 #ifdef  AFS_ALPHA_ENV
59 #undef kmem_alloc
60 #undef kmem_free
61 #undef mem_alloc
62 #undef mem_free
63 #undef register
64 #endif  /* AFS_ALPHA_ENV */
65 #else /* KERNEL */
66 # include <sys/types.h>
67 #ifndef AFS_NT40_ENV
68 # include <sys/socket.h>
69 # include <sys/file.h>
70 # include <netdb.h>
71 # include <netinet/in.h>
72 # include <sys/stat.h>
73 # include <sys/time.h>
74 #endif
75 #ifdef HAVE_STRING_H
76 #include <string.h>
77 #else
78 #ifdef HAVE_STRINGS_H
79 #include <strings.h>
80 #endif
81 #endif
82 #ifdef HAVE_UNISTD_H
83 #include <unistd.h>
84 #endif
85 # include "rx_user.h"
86 # include "rx_clock.h"
87 # include "rx_queue.h"
88 # include "rx.h"
89 # include "rx_globals.h"
90 # include "rx_internal.h"
91 #endif /* KERNEL */
92
93 #ifdef RX_LOCKS_DB
94 /* rxdb_fileID is used to identify the lock location, along with line#. */
95 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
96 #endif /* RX_LOCKS_DB */
97 /* rxi_ReadProc -- internal version.
98  *
99  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
100  */
101 int rxi_ReadProc(call, buf, nbytes)
102     register struct rx_call *call;
103     register char *buf;
104     register int nbytes;
105 {
106     register struct rx_packet *cp = call->currentPacket;
107     register struct rx_packet *rp;
108     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
109     register int requestCount;
110     register unsigned int t;
111 /* XXXX took out clock_NewTime from here.  Was it needed? */
112     requestCount = nbytes;
113
114     /* Free any packets from the last call to ReadvProc/WritevProc */
115     if (!queue_IsEmpty(&call->iovq)) {
116         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
117             queue_Remove(rp);
118             rxi_FreePacket(rp);
119         }
120     }
121
122     do {
123       if (call->nLeft == 0) {
124         /* Get next packet */
125         for (;;) {
126           if (call->error || (call->mode != RX_MODE_RECEIVING)) {
127             if (call->error) {
128               return 0;
129             }
130             if (call->mode == RX_MODE_SENDING) {
131               rxi_FlushWrite(call);
132               continue;
133             }
134           }
135           if (queue_IsNotEmpty(&call->rq)) {
136             /* Check that next packet available is next in sequence */
137             rp = queue_First(&call->rq, rx_packet);
138             if (rp->header.seq == call->rnext) {
139               afs_int32 error;
140               register struct rx_connection *conn = call->conn;
141               queue_Remove(rp);
142               
143               /* RXS_CheckPacket called to undo RXS_PreparePacket's
144                * work.  It may reduce the length of the packet by up
145                * to conn->maxTrailerSize, to reflect the length of the
146                * data + the header. */
147               if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
148                 /* Used to merely shut down the call, but now we 
149                  * shut down the whole connection since this may 
150                  * indicate an attempt to hijack it */
151
152                 MUTEX_EXIT(&call->lock);
153                 rxi_ConnectionError(conn, error);
154                 MUTEX_ENTER(&conn->conn_data_lock);
155                 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
156                 MUTEX_EXIT(&conn->conn_data_lock);
157                 rxi_FreePacket(rp);
158                 MUTEX_ENTER(&call->lock);
159
160                 return 0;
161               }
162               call->rnext++;
163               cp = call->currentPacket = rp;
164               call->curvec = 1;   /* 0th vec is always header */
165               /* begin at the beginning [ more or less ], continue 
166                * on until the end, then stop. */
167               call->curpos = (char *)cp->wirevec[1].iov_base
168                              + call->conn->securityHeaderSize; 
169               call->curlen = cp->wirevec[1].iov_len
170                              - call->conn->securityHeaderSize; 
171               
172               /* Notice that this code works correctly if the data
173                * size is 0 (which it may be--no reply arguments from
174                * server, for example).  This relies heavily on the
175                * fact that the code below immediately frees the packet
176                * (no yields, etc.).  If it didn't, this would be a
177                * problem because a value of zero for call->nLeft
178                * normally means that there is no read packet */
179               call->nLeft = cp->length;
180               hadd32(call->bytesRcvd, cp->length);
181               
182               /* Send a hard ack for every rxi_HardAckRate+1 packets
183                * consumed. Otherwise schedule an event to send
184                * the hard ack later on.
185                */
186               call->nHardAcks++;
187               if (!(call->flags &RX_CALL_RECEIVE_DONE)) {
188                 if (call->nHardAcks > (u_short)rxi_HardAckRate) {
189                   rxevent_Cancel(call->delayedAckEvent, call,
190                                  RX_CALL_REFCOUNT_DELAY);
191                   rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
192                 }
193                 else {
194                   struct clock when;
195                   clock_GetTime(&when);
196                   /* Delay to consolidate ack packets */
197                   clock_Add(&when, &rx_hardAckDelay);
198                   if (!call->delayedAckEvent ||
199                       clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
200                       rxevent_Cancel(call->delayedAckEvent, call,
201                                      RX_CALL_REFCOUNT_DELAY);
202                       CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
203                       call->delayedAckEvent = rxevent_Post(&when,
204                                                            rxi_SendDelayedAck,
205                                                            call, 0);
206                   }
207                 }
208               }
209               break;
210             }
211           }
212
213 /*
214 MTUXXX  doesn't there need to be an "else" here ??? 
215 */
216           /* Are there ever going to be any more packets? */
217           if (call->flags & RX_CALL_RECEIVE_DONE) {
218             return requestCount - nbytes;
219           }
220           /* Wait for in-sequence packet */
221           call->flags |= RX_CALL_READER_WAIT;
222           clock_NewTime();
223           call->startWait = clock_Sec();
224           while (call->flags & RX_CALL_READER_WAIT) {
225 #ifdef  RX_ENABLE_LOCKS
226               CV_WAIT(&call->cv_rq, &call->lock);
227 #else
228               osi_rxSleep(&call->rq);
229 #endif
230           }
231
232           call->startWait = 0;
233 #ifdef RX_ENABLE_LOCKS
234           if (call->error) {
235               return 0;
236           }
237 #endif /* RX_ENABLE_LOCKS */
238         }
239       }
240       else /* assert(cp); */  /* MTUXXX  this should be replaced by some error-recovery code before shipping */
241 /* yes, the following block is allowed to be the ELSE clause (or not) */
242
243         /* It's possible for call->nLeft to be smaller than any particular
244          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
245          * reflects the size of the buffer.  We have to keep track of the
246          * number of bytes read in the length field of the packet struct.  On
247          * the final portion of a received packet, it's almost certain that
248          * call->nLeft will be smaller than the final buffer. */
249
250         while (nbytes && cp) {
251           t = MIN((int)call->curlen, nbytes);
252           t = MIN(t, (int)call->nLeft);
253           memcpy(buf, call->curpos, t);
254           buf += t;
255           nbytes -= t;
256           call->curpos += t;
257           call->curlen -= t;
258           call->nLeft -= t;
259
260           if (!call->nLeft) {
261             /* out of packet.  Get another one. */
262             rxi_FreePacket(cp);
263             cp = call->currentPacket = (struct rx_packet *)0;
264           }
265           else if (!call->curlen) {
266             /* need to get another struct iov */
267             if (++call->curvec >= cp->niovecs) {
268               /* current packet is exhausted, get ready for another */
269               /* don't worry about curvec and stuff, they get set somewhere else */
270               rxi_FreePacket(cp);
271               cp = call->currentPacket = (struct rx_packet *)0;
272               call->nLeft = 0;
273             }
274             else {
275               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
276               call->curlen = cp->wirevec[call->curvec].iov_len;
277             }
278           }  
279         }
280         if (!nbytes) {
281           /* user buffer is full, return */
282           return requestCount;
283         }
284
285     } while (nbytes);
286
287     return requestCount;
288 }
289
290 int rx_ReadProc(call, buf, nbytes)
291   struct rx_call *call;
292   char *buf;
293   int nbytes;
294 {
295     int bytes;
296     int tcurlen;
297     int tnLeft;
298     char *tcurpos;
299     SPLVAR;
300
301     /*
302      * Free any packets from the last call to ReadvProc/WritevProc.
303      * We do not need the lock because the receiver threads only
304      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
305      * RX_CALL_IOVEC_WAIT is always cleared before returning from
306      * ReadvProc/WritevProc.
307      */
308     if (!queue_IsEmpty(&call->iovq)) {
309         register struct rx_packet *rp; 
310         register struct rx_packet *nxp;
311         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
312             queue_Remove(rp);
313             rxi_FreePacket(rp);
314         }
315     }
316
317     /*
318      * Most common case, all of the data is in the current iovec.
319      * We do not need the lock because this is the only thread that
320      * updates the curlen, curpos, nLeft fields.
321      *
322      * We are relying on nLeft being zero unless the call is in receive mode.
323      */
324     tcurlen = call->curlen;
325     tnLeft = call->nLeft;
326     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
327         tcurpos = call->curpos;
328         memcpy(buf, tcurpos, nbytes);
329         call->curpos = tcurpos + nbytes;
330         call->curlen = tcurlen - nbytes;
331         call->nLeft = tnLeft - nbytes;
332         return nbytes;
333     }
334
335     NETPRI;
336     AFS_RXGLOCK();
337     MUTEX_ENTER(&call->lock);
338     bytes = rxi_ReadProc(call, buf, nbytes);
339     MUTEX_EXIT(&call->lock);
340     AFS_RXGUNLOCK();
341     USERPRI;
342     return bytes;
343 }
344
345 /* Optimization for unmarshalling 32 bit integers */
346 int rx_ReadProc32(call, value)
347   struct rx_call *call;
348   afs_int32 *value;
349 {
350     int bytes;
351     int tcurlen;
352     int tnLeft;
353     char *tcurpos;
354     SPLVAR;
355
356     /*
357      * Free any packets from the last call to ReadvProc/WritevProc.
358      * We do not need the lock because the receiver threads only
359      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
360      * RX_CALL_IOVEC_WAIT is always cleared before returning from
361      * ReadvProc/WritevProc.
362      */
363     if (!queue_IsEmpty(&call->iovq)) {
364         register struct rx_packet *rp; 
365         register struct rx_packet *nxp;
366         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
367             queue_Remove(rp);
368             rxi_FreePacket(rp);
369         }
370     }
371
372     /*
373      * Most common case, all of the data is in the current iovec.
374      * We do not need the lock because this is the only thread that
375      * updates the curlen, curpos, nLeft fields.
376      *
377      * We are relying on nLeft being zero unless the call is in receive mode.
378      */
379     tcurlen = call->curlen;
380     tnLeft = call->nLeft;
381     if (!call->error && tcurlen > sizeof(afs_int32) && tnLeft > sizeof(afs_int32)) {
382         tcurpos = call->curpos;
383         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
384             *value = *((afs_int32 *)(tcurpos));
385         } else {
386             memcpy((char *)value, tcurpos, sizeof(afs_int32));
387         }
388         call->curpos = tcurpos + sizeof(afs_int32);
389         call->curlen = tcurlen - sizeof(afs_int32);
390         call->nLeft = tnLeft - sizeof(afs_int32);
391         return sizeof(afs_int32);
392     }
393
394     NETPRI;
395     AFS_RXGLOCK();
396     MUTEX_ENTER(&call->lock);
397     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
398     MUTEX_EXIT(&call->lock);
399     AFS_RXGUNLOCK();
400     USERPRI;
401     return bytes;
402 }
403
404 /* rxi_FillReadVec
405  *
406  * Uses packets in the receive queue to fill in as much of the
407  * current iovec as possible. Does not block if it runs out
408  * of packets to complete the iovec. Return true if an ack packet
409  * was sent, otherwise return false */
410 int rxi_FillReadVec(call, seq, serial, flags)
411   struct rx_call *call;
412   afs_uint32 seq, serial, flags;
413 {
414     int didConsume = 0;
415     int didHardAck = 0;
416     register unsigned int t;
417     struct rx_packet *rp;
418     struct rx_packet *curp;
419     struct iovec *call_iov;
420     struct iovec *cur_iov = NULL;
421
422     curp = call->currentPacket;
423     if (curp) {
424         cur_iov = &curp->wirevec[call->curvec];
425     }
426     call_iov = &call->iov[call->iovNext];
427
428     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
429       if (call->nLeft == 0) {
430         /* Get next packet */
431         if (queue_IsNotEmpty(&call->rq)) {
432           /* Check that next packet available is next in sequence */
433           rp = queue_First(&call->rq, rx_packet);
434           if (rp->header.seq == call->rnext) {
435             afs_int32 error;
436             register struct rx_connection *conn = call->conn;
437             queue_Remove(rp);
438               
439             /* RXS_CheckPacket called to undo RXS_PreparePacket's
440              * work.  It may reduce the length of the packet by up
441              * to conn->maxTrailerSize, to reflect the length of the
442              * data + the header. */
443             if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
444               /* Used to merely shut down the call, but now we 
445                * shut down the whole connection since this may 
446                * indicate an attempt to hijack it */
447
448               MUTEX_EXIT(&call->lock);
449               rxi_ConnectionError(conn, error);
450               MUTEX_ENTER(&conn->conn_data_lock);
451               rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
452               MUTEX_EXIT(&conn->conn_data_lock);
453               rxi_FreePacket(rp);
454               MUTEX_ENTER(&call->lock);
455
456               return 1;
457             }
458             call->rnext++;
459             curp = call->currentPacket = rp;
460             call->curvec = 1;   /* 0th vec is always header */
461             cur_iov = &curp->wirevec[1];
462             /* begin at the beginning [ more or less ], continue 
463              * on until the end, then stop. */
464             call->curpos = (char *)curp->wirevec[1].iov_base
465                            + call->conn->securityHeaderSize; 
466             call->curlen = curp->wirevec[1].iov_len
467                            - call->conn->securityHeaderSize; 
468               
469             /* Notice that this code works correctly if the data
470              * size is 0 (which it may be--no reply arguments from
471              * server, for example).  This relies heavily on the
472              * fact that the code below immediately frees the packet
473              * (no yields, etc.).  If it didn't, this would be a
474              * problem because a value of zero for call->nLeft
475              * normally means that there is no read packet */
476             call->nLeft = curp->length;
477             hadd32(call->bytesRcvd, curp->length);
478               
479             /* Send a hard ack for every rxi_HardAckRate+1 packets
480              * consumed. Otherwise schedule an event to send
481              * the hard ack later on.
482              */
483             call->nHardAcks++;
484             didConsume = 1;
485             continue;
486           }
487         }
488         break;
489       }
490
491       /* It's possible for call->nLeft to be smaller than any particular
492        * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
493        * reflects the size of the buffer.  We have to keep track of the
494        * number of bytes read in the length field of the packet struct.  On
495        * the final portion of a received packet, it's almost certain that
496        * call->nLeft will be smaller than the final buffer. */
497       while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
498         
499         t = MIN((int)call->curlen, call->iovNBytes);
500         t = MIN(t, (int)call->nLeft);
501         call_iov->iov_base = call->curpos;
502         call_iov->iov_len = t;
503         call_iov++;
504         call->iovNext++;
505         call->iovNBytes -= t;
506         call->curpos += t;
507         call->curlen -= t;
508         call->nLeft -= t;
509
510         if (!call->nLeft) {
511           /* out of packet.  Get another one. */
512           queue_Append(&call->iovq, curp);
513           curp = call->currentPacket = (struct rx_packet *)0;
514         }
515         else if (!call->curlen) {
516           /* need to get another struct iov */
517           if (++call->curvec >= curp->niovecs) {
518             /* current packet is exhausted, get ready for another */
519             /* don't worry about curvec and stuff, they get set somewhere else */
520             queue_Append(&call->iovq, curp);
521             curp = call->currentPacket = (struct rx_packet *)0;
522             call->nLeft = 0;
523           }
524           else {
525             cur_iov++;
526             call->curpos = (char *)cur_iov->iov_base;
527             call->curlen = cur_iov->iov_len;
528           }
529         }  
530       }  
531     }
532
533     /* If we consumed any packets then check whether we need to
534      * send a hard ack. */
535     if (didConsume && (!(call->flags &RX_CALL_RECEIVE_DONE))) {
536       if (call->nHardAcks > (u_short)rxi_HardAckRate) {
537         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
538         rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_DELAY, 0);
539         didHardAck = 1;
540       }
541       else {
542         struct clock when;
543         clock_GetTime(&when);
544         /* Delay to consolidate ack packets */
545         clock_Add(&when, &rx_hardAckDelay);
546         if (!call->delayedAckEvent ||
547           clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
548           rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
549           CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
550           call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
551                                                call, 0);
552         }
553       }
554     }
555     return didHardAck;
556 }
557
558
559 /* rxi_ReadvProc -- internal version.
560  *
561  * Fills in an iovec with pointers to the packet buffers. All packets
562  * except the last packet (new current packet) are moved to the iovq
563  * while the application is processing the data.
564  *
565  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
566  */
567 int rxi_ReadvProc(call, iov, nio, maxio, nbytes)
568     struct rx_call *call;
569     struct iovec *iov;
570     int *nio;
571     int maxio;
572     int nbytes;
573 {
574     struct rx_packet *rp; 
575     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
576     int requestCount;
577     int nextio;
578
579     requestCount = nbytes;
580     nextio = 0;
581
582     /* Free any packets from the last call to ReadvProc/WritevProc */
583     for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
584         queue_Remove(rp);
585         rxi_FreePacket(rp);
586     }
587
588     if (call->mode == RX_MODE_SENDING) {
589         rxi_FlushWrite(call);
590     }
591
592     if (call->error) {
593         return 0;
594     }
595
596     /* Get whatever data is currently available in the receive queue.
597      * If rxi_FillReadVec sends an ack packet then it is possible
598      * that we will receive more data while we drop the call lock
599      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
600      * here to avoid a race with the receive thread if we send
601      * hard acks in rxi_FillReadVec. */
602     call->flags |= RX_CALL_IOVEC_WAIT;
603     call->iovNBytes = nbytes;
604     call->iovMax = maxio;
605     call->iovNext = 0;
606     call->iov = iov;
607     rxi_FillReadVec(call, 0, 0, 0);
608
609     /* if we need more data then sleep until the receive thread has
610      * filled in the rest. */
611     if (!call->error && call->iovNBytes &&
612         call->iovNext < call->iovMax &&
613         !(call->flags & RX_CALL_RECEIVE_DONE)) {
614       call->flags |= RX_CALL_READER_WAIT;
615       clock_NewTime();
616       call->startWait = clock_Sec();
617       while (call->flags & RX_CALL_READER_WAIT) {
618 #ifdef  RX_ENABLE_LOCKS
619         CV_WAIT(&call->cv_rq, &call->lock);
620 #else
621         osi_rxSleep(&call->rq);
622 #endif
623       }
624       call->startWait = 0;
625     }
626     call->flags &= ~RX_CALL_IOVEC_WAIT;
627 #ifdef RX_ENABLE_LOCKS
628     if (call->error) {
629       return 0;
630     }
631 #endif /* RX_ENABLE_LOCKS */
632
633     call->iov = NULL;
634     *nio = call->iovNext;
635     return nbytes - call->iovNBytes;
636 }
637
638 int rx_ReadvProc(call, iov, nio, maxio, nbytes)
639     struct rx_call *call;
640     struct iovec *iov;
641     int *nio;
642     int maxio;
643     int nbytes;
644 {
645     int bytes;
646     SPLVAR;
647
648     NETPRI;
649     AFS_RXGLOCK();
650     MUTEX_ENTER(&call->lock);
651     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
652     MUTEX_EXIT(&call->lock);
653     AFS_RXGUNLOCK();
654     USERPRI;
655     return bytes;
656 }
657
658 /* rxi_WriteProc -- internal version.
659  *
660  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
661
662 int rxi_WriteProc(call, buf, nbytes)
663     register struct rx_call *call;
664     register char *buf;
665     register int nbytes;
666 {
667     struct rx_connection *conn = call->conn;
668     register struct rx_packet *cp = call->currentPacket;
669     register struct rx_packet *tp; /* Temporary packet pointer */
670     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
671     register unsigned int t;
672     int requestCount = nbytes;
673
674     /* Free any packets from the last call to ReadvProc/WritevProc */
675     if (!queue_IsEmpty(&call->iovq)) {
676         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
677             queue_Remove(tp);
678             rxi_FreePacket(tp);
679         }
680     }
681
682     if (call->mode != RX_MODE_SENDING) {
683         if ((conn->type == RX_SERVER_CONNECTION)
684             && (call->mode == RX_MODE_RECEIVING)) {
685             call->mode = RX_MODE_SENDING;
686             if (cp) {
687                 rxi_FreePacket(cp);
688                 cp = call->currentPacket = (struct rx_packet *) 0;
689                 call->nLeft = 0;
690                 call->nFree = 0;
691             }
692         }
693         else {
694             return 0;
695         }
696     }
697
698     /* Loop condition is checked at end, so that a write of 0 bytes
699      * will force a packet to be created--specially for the case where
700      * there are 0 bytes on the stream, but we must send a packet
701      * anyway. */
702     do {
703         if (call->nFree == 0) {
704             if (!call->error && cp) {
705 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
706                 /* Wait until TQ_BUSY is reset before adding any
707                  * packets to the transmit queue
708                  */
709                 while (call->flags & RX_CALL_TQ_BUSY) {
710                     call->flags |= RX_CALL_TQ_WAIT;
711 #ifdef RX_ENABLE_LOCKS
712                     CV_WAIT(&call->cv_tq, &call->lock);
713 #else /* RX_ENABLE_LOCKS */
714                     osi_rxSleep(&call->tq);
715 #endif /* RX_ENABLE_LOCKS */
716                 }
717 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
718                 clock_NewTime(); /* Bogus:  need new time package */
719                 /* The 0, below, specifies that it is not the last packet: 
720                  * there will be others. PrepareSendPacket may
721                  * alter the packet length by up to
722                  * conn->securityMaxTrailerSize */
723                 hadd32(call->bytesSent, cp->length);
724                 rxi_PrepareSendPacket(call, cp, 0);
725                 queue_Append(&call->tq, cp);
726                 cp = call->currentPacket = NULL;
727                 if (!(call->flags & (RX_CALL_FAST_RECOVER|
728                                      RX_CALL_FAST_RECOVER_WAIT))) {
729                     rxi_Start(0, call, 0);
730                 }
731             }
732             /* Wait for transmit window to open up */
733             while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
734                 clock_NewTime();
735                 call->startWait = clock_Sec();
736
737 #ifdef  RX_ENABLE_LOCKS
738                 CV_WAIT(&call->cv_twind, &call->lock);
739 #else
740                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
741                 osi_rxSleep(&call->twind);
742 #endif
743
744                 call->startWait = 0;
745 #ifdef RX_ENABLE_LOCKS
746                 if (call->error) {
747                     return 0;
748                 }
749 #endif /* RX_ENABLE_LOCKS */
750             }
751             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
752                 call->currentPacket = cp;
753                 call->nFree = cp->length;
754                 call->curvec = 1;   /* 0th vec is always header */
755                 /* begin at the beginning [ more or less ], continue 
756                  * on until the end, then stop. */
757                 call->curpos = (char *)cp->wirevec[1].iov_base
758                                + call->conn->securityHeaderSize; 
759                 call->curlen = cp->wirevec[1].iov_len
760                                - call->conn->securityHeaderSize; 
761             }
762             if (call->error) {
763                 if (cp) {
764                   rxi_FreePacket(cp);
765                   call->currentPacket = NULL;
766                 }
767                 return 0;
768             }
769         }
770
771         if (cp && (int)call->nFree < nbytes) {
772               /* Try to extend the current buffer */
773               register int len, mud;
774               len = cp->length;
775               mud = rx_MaxUserDataSize(call);
776               if (mud > len) {
777                 int want;
778                 want = MIN(nbytes - (int)call->nFree, mud - len);
779                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
780                 if (cp->length > (unsigned)mud)
781                     cp->length = mud;
782                 call->nFree += (cp->length - len);
783               }
784         }
785
786         /* If the remaining bytes fit in the buffer, then store them
787          * and return.  Don't ship a buffer that's full immediately to
788          * the peer--we don't know if it's the last buffer yet */
789
790         if (!cp)  {
791           call->nFree = 0;
792         }
793
794         while (nbytes && call->nFree) {
795         
796           t = MIN((int)call->curlen, nbytes);
797           t = MIN((int)call->nFree, t);
798           memcpy(call->curpos, buf, t);
799           buf += t;
800           nbytes -= t;
801           call->curpos += t;
802           call->curlen -= t;
803           call->nFree -= t;
804
805           if (!call->curlen) {
806             /* need to get another struct iov */
807             if (++call->curvec >= cp->niovecs) {
808               /* current packet is full, extend or send it */
809               call->nFree = 0;
810             } else {
811               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
812               call->curlen = cp->wirevec[call->curvec].iov_len;
813             }
814           }  
815         } /* while bytes to send and room to send them */
816
817         /* might be out of space now */
818         if (!nbytes) {
819             return requestCount;
820         }
821         else ; /* more data to send, so get another packet and keep going */
822     } while(nbytes);
823
824     return requestCount - nbytes;
825 }
826
827 int rx_WriteProc(call, buf, nbytes)
828   struct rx_call *call;
829   char *buf;
830   int nbytes;
831 {
832     int bytes;
833     int tcurlen;
834     int tnFree;
835     char *tcurpos;
836     SPLVAR;
837
838     /*
839      * Free any packets from the last call to ReadvProc/WritevProc.
840      * We do not need the lock because the receiver threads only
841      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
842      * RX_CALL_IOVEC_WAIT is always cleared before returning from
843      * ReadvProc/WritevProc.
844      */
845     if (!queue_IsEmpty(&call->iovq)) {
846         register struct rx_packet *rp; 
847         register struct rx_packet *nxp;
848         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
849             queue_Remove(rp);
850             rxi_FreePacket(rp);
851         }
852     }
853
854     /*
855      * Most common case: all of the data fits in the current iovec.
856      * We do not need the lock because this is the only thread that
857      * updates the curlen, curpos, nFree fields.
858      *
859      * We are relying on nFree being zero unless the call is in send mode.
860      */
861     tcurlen = (int)call->curlen;
862     tnFree = (int)call->nFree;
863     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
864         tcurpos = call->curpos;
865         memcpy(tcurpos, buf, nbytes);
866         call->curpos = tcurpos + nbytes;
867         call->curlen = tcurlen - nbytes;
868         call->nFree = tnFree - nbytes;
869         return nbytes;
870     }
871
872     NETPRI;
873     AFS_RXGLOCK();
874     MUTEX_ENTER(&call->lock);
875     bytes = rxi_WriteProc(call, buf, nbytes);
876     MUTEX_EXIT(&call->lock);
877     AFS_RXGUNLOCK();
878     USERPRI;
879     return bytes;
880 }
881
882 /* Optimization for marshalling 32 bit arguments */
883 int rx_WriteProc32(call, value)
884   register struct rx_call *call;
885   register afs_int32 *value;
886 {
887     int bytes;
888     int tcurlen;
889     int tnFree;
890     char *tcurpos;
891     SPLVAR;
892
893     /*
894      * Free any packets from the last call to ReadvProc/WritevProc.
895      * We do not need the lock because the receiver threads only
896      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
897      * RX_CALL_IOVEC_WAIT is always cleared before returning from
898      * ReadvProc/WritevProc.
899      */
900     if (!queue_IsEmpty(&call->iovq)) {
901         register struct rx_packet *rp; 
902         register struct rx_packet *nxp;
903         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
904             queue_Remove(rp);
905             rxi_FreePacket(rp);
906         }
907     }
908
909     /*
910      * Most common case: all of the data fits in the current iovec.
911      * We do not need the lock because this is the only thread that
912      * updates the curlen, curpos, nFree fields.
913      *
914      * We are relying on nFree being zero unless the call is in send mode.
915      */
916     tcurlen = (int)call->curlen;
917     tnFree = (int)call->nFree;
918     if (!call->error && tcurlen >= sizeof(afs_int32) && tnFree >= sizeof(afs_int32)) {
919         tcurpos = call->curpos;
920         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
921             *((afs_int32 *)(tcurpos)) = *value;
922         } else {
923             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
924         }
925         call->curpos = tcurpos + sizeof(afs_int32);
926         call->curlen = tcurlen - sizeof(afs_int32);
927         call->nFree = tnFree - sizeof(afs_int32);
928         return sizeof(afs_int32);
929     }
930
931     NETPRI;
932     AFS_RXGLOCK();
933     MUTEX_ENTER(&call->lock);
934     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
935     MUTEX_EXIT(&call->lock);
936     AFS_RXGUNLOCK();
937     USERPRI;
938     return bytes;
939 }
940
941 /* rxi_WritevAlloc -- internal version.
942  *
943  * Fill in an iovec to point to data in packet buffers. The application
944  * calls rxi_WritevProc when the buffers are full.
945  *
946  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
947
948 int rxi_WritevAlloc(call, iov, nio, maxio, nbytes)
949     struct rx_call *call;
950     struct iovec *iov;
951     int *nio;
952     int maxio;
953     int nbytes;
954 {
955     struct rx_connection *conn = call->conn;
956     struct rx_packet *cp = call->currentPacket;
957     struct rx_packet *tp; /* temporary packet pointer */
958     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
959     int requestCount;
960     int nextio;
961     /* Temporary values, real work is done in rxi_WritevProc */
962     int tnFree; 
963     int tcurvec;
964     char * tcurpos;
965     int tcurlen;
966
967     requestCount = nbytes;
968     nextio = 0;
969     
970     /* Free any packets from the last call to ReadvProc/WritevProc */
971     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
972         queue_Remove(tp);
973         rxi_FreePacket(tp);
974     }
975
976     if (call->mode != RX_MODE_SENDING) {
977         if ((conn->type == RX_SERVER_CONNECTION)
978             && (call->mode == RX_MODE_RECEIVING)) {
979             call->mode = RX_MODE_SENDING;
980             if (cp) {
981                 rxi_FreePacket(cp);
982                 cp = call->currentPacket = (struct rx_packet *) 0;
983                 call->nLeft = 0;
984                 call->nFree = 0;
985             }
986         }
987         else {
988             return 0;
989         }
990     }
991
992     /* Set up the iovec to point to data in packet buffers. */
993     tnFree = call->nFree;
994     tcurvec = call->curvec;
995     tcurpos = call->curpos;
996     tcurlen = call->curlen;
997     do {
998       register unsigned int t;
999
1000       if (tnFree == 0) {
1001         /* current packet is full, allocate a new one */
1002         cp = rxi_AllocSendPacket(call, nbytes);
1003         if (cp == NULL) {
1004           /* out of space, return what we have */
1005           *nio = nextio;
1006           return requestCount - nbytes;
1007         }
1008         queue_Append(&call->iovq, cp);
1009         tnFree = cp->length;
1010         tcurvec = 1;
1011         tcurpos = (char *)cp->wirevec[1].iov_base
1012                   + call->conn->securityHeaderSize;
1013         tcurlen = cp->wirevec[1].iov_len
1014                   - call->conn->securityHeaderSize;
1015       }
1016
1017       if (tnFree < nbytes) {
1018         /* try to extend the current packet */
1019         register int len, mud;
1020         len = cp->length;
1021         mud = rx_MaxUserDataSize(call);
1022         if (mud > len) {
1023           int want;
1024           want = MIN(nbytes - tnFree, mud - len);
1025           rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1026           if (cp->length > (unsigned)mud)
1027             cp->length = mud;
1028           tnFree += (cp->length - len);
1029           if (cp == call->currentPacket) {
1030             call->nFree += (cp->length - len);
1031           }
1032         }
1033       }
1034
1035       /* fill in the next entry in the iovec */
1036       t = MIN(tcurlen, nbytes);
1037       t = MIN(tnFree, t);
1038       iov[nextio].iov_base = tcurpos;
1039       iov[nextio].iov_len = t;
1040       nbytes -= t;
1041       tcurpos += t;
1042       tcurlen -= t;
1043       tnFree -= t;
1044       nextio++;
1045
1046       if (!tcurlen) {
1047           /* need to get another struct iov */
1048           if (++tcurvec >= cp->niovecs) {
1049             /* current packet is full, extend it or move on to next packet */
1050             tnFree = 0;
1051           } else { 
1052             tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1053             tcurlen = cp->wirevec[tcurvec].iov_len;
1054           }
1055       }
1056     } while (nbytes && nextio < maxio);
1057     *nio = nextio;
1058     return requestCount - nbytes;
1059 }
1060
1061 int rx_WritevAlloc(call, iov, nio, maxio, nbytes)
1062     struct rx_call *call;
1063     struct iovec *iov;
1064     int *nio;
1065     int maxio;
1066     int nbytes;
1067 {
1068     int bytes;
1069     SPLVAR;
1070
1071     NETPRI;
1072     AFS_RXGLOCK();
1073     MUTEX_ENTER(&call->lock);
1074     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1075     MUTEX_EXIT(&call->lock);
1076     AFS_RXGUNLOCK();
1077     USERPRI;
1078     return bytes;
1079 }
1080
1081 int rx_WritevInit(call)
1082     struct rx_call *call;
1083 {
1084     int bytes;
1085     SPLVAR;
1086
1087     /*
1088      * Free any packets from the last call to ReadvProc/WritevProc.
1089      * We do not need the lock because the receiver threads only
1090      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
1091      * RX_CALL_IOVEC_WAIT is always cleared before returning from
1092      * ReadvProc/WritevProc.
1093      */
1094     if (!queue_IsEmpty(&call->iovq)) {
1095         register struct rx_packet *rp;
1096         register struct rx_packet *nxp;
1097         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
1098             queue_Remove(rp);
1099             rxi_FreePacket(rp);
1100         }
1101     }
1102
1103     NETPRI;
1104     AFS_RXGLOCK();
1105     MUTEX_ENTER(&call->lock);
1106     bytes = rxi_WriteProc(call, &bytes, 0);
1107     MUTEX_EXIT(&call->lock);
1108     AFS_RXGUNLOCK();
1109     USERPRI;
1110     return bytes;
1111 }
1112
1113 /* rxi_WritevProc -- internal version.
1114  *
1115  * Send buffers allocated in rxi_WritevAlloc.
1116  *
1117  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1118
1119 int rxi_WritevProc(call, iov, nio, nbytes)
1120     struct rx_call *call;
1121     struct iovec *iov;
1122     int nio;
1123     int nbytes;
1124 {
1125     struct rx_packet *cp = call->currentPacket;
1126     register struct rx_packet *tp; /* Temporary packet pointer */
1127     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1128     int nextio;
1129     int requestCount;
1130     struct rx_queue tmpq;
1131
1132     requestCount = nbytes;
1133     nextio = 0;
1134
1135     if (call->mode != RX_MODE_SENDING) {
1136         call->error = RX_PROTOCOL_ERROR;
1137     }
1138
1139 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1140     /* Wait until TQ_BUSY is reset before trying to move any
1141      * packets to the transmit queue.  */
1142     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1143         call->flags |= RX_CALL_TQ_WAIT;
1144 #ifdef RX_ENABLE_LOCKS
1145         CV_WAIT(&call->cv_tq, &call->lock);
1146 #else /* RX_ENABLE_LOCKS */
1147         osi_rxSleep(&call->tq);
1148 #endif /* RX_ENABLE_LOCKS */
1149     }
1150 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1151
1152     if (call->error) {
1153         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1154             queue_Remove(tp);
1155             rxi_FreePacket(tp);
1156         }
1157         if (cp) {
1158             rxi_FreePacket(cp);
1159             cp = call->currentPacket = NULL;
1160         }
1161         return 0;
1162     }
1163
1164     /* Loop through the I/O vector adjusting packet pointers.
1165      * Place full packets back onto the iovq once they are ready
1166      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1167      * the iovec. We put the loop condition at the end to ensure that
1168      * a zero length write will push a short packet. */
1169     nextio = 0;
1170     queue_Init(&tmpq);
1171     do {
1172         if (call->nFree == 0 && cp) {
1173             clock_NewTime(); /* Bogus:  need new time package */
1174             /* The 0, below, specifies that it is not the last packet: 
1175              * there will be others. PrepareSendPacket may
1176              * alter the packet length by up to
1177              * conn->securityMaxTrailerSize */
1178             hadd32(call->bytesSent, cp->length);
1179             rxi_PrepareSendPacket(call, cp, 0);
1180             queue_Append(&tmpq, cp);
1181
1182             /* The head of the iovq is now the current packet */
1183             if (nbytes) {
1184                 if (queue_IsEmpty(&call->iovq)) {
1185                     call->error = RX_PROTOCOL_ERROR;
1186                     cp = call->currentPacket = NULL;
1187                     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1188                         queue_Remove(tp);
1189                         rxi_FreePacket(tp);
1190                     }
1191                     return 0;
1192                 }
1193                 cp = queue_First(&call->iovq, rx_packet);
1194                 queue_Remove(cp);
1195                 call->currentPacket = cp;
1196                 call->nFree = cp->length;
1197                 call->curvec = 1;
1198                 call->curpos = (char *)cp->wirevec[1].iov_base
1199                                + call->conn->securityHeaderSize;
1200                 call->curlen = cp->wirevec[1].iov_len
1201                                - call->conn->securityHeaderSize;
1202             }
1203         }
1204
1205         if (nbytes) {
1206           /* The next iovec should point to the current position */
1207           if (iov[nextio].iov_base != call->curpos
1208               || iov[nextio].iov_len > (int)call->curlen) {
1209               call->error = RX_PROTOCOL_ERROR;
1210               for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1211                   queue_Remove(tp);
1212                   rxi_FreePacket(tp);
1213               }
1214               if (cp) {
1215                 rxi_FreePacket(cp);
1216                 call->currentPacket = NULL;
1217               }
1218               return 0;
1219           }
1220           nbytes -= iov[nextio].iov_len;
1221           call->curpos += iov[nextio].iov_len;
1222           call->curlen -= iov[nextio].iov_len;
1223           call->nFree -= iov[nextio].iov_len;
1224           nextio++;
1225           if (call->curlen == 0) {
1226             if (++call->curvec > cp->niovecs) {
1227                 call->nFree = 0;
1228             } else {
1229                 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1230                 call->curlen = cp->wirevec[call->curvec].iov_len;
1231             }
1232           }
1233         }
1234     } while (nbytes && nextio < nio);
1235
1236     /* Move the packets from the temporary queue onto the transmit queue.
1237      * We may end up with more than call->twind packets on the queue. */
1238     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1239         queue_Remove(tp);
1240         queue_Append(&call->tq, tp);
1241     }
1242
1243     if (!(call->flags & (RX_CALL_FAST_RECOVER|RX_CALL_FAST_RECOVER_WAIT))) {
1244         rxi_Start(0, call, 0);
1245     }
1246
1247     /* Wait for the length of the transmit queue to fall below call->twind */
1248     while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
1249         clock_NewTime();
1250         call->startWait = clock_Sec();
1251 #ifdef  RX_ENABLE_LOCKS
1252         CV_WAIT(&call->cv_twind, &call->lock);
1253 #else
1254         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1255         osi_rxSleep(&call->twind);
1256 #endif
1257         call->startWait = 0;
1258     }
1259
1260     if (call->error) {
1261         if (cp) {
1262             rxi_FreePacket(cp);
1263             cp = call->currentPacket = NULL;
1264         }
1265         return 0;
1266     }
1267
1268     return requestCount - nbytes;
1269 }
1270
1271 int rx_WritevProc(call, iov, nio, nbytes)
1272     struct rx_call *call;
1273     struct iovec *iov;
1274     int nio;
1275     int nbytes;
1276 {
1277     int bytes;
1278     SPLVAR;
1279
1280     NETPRI;
1281     AFS_RXGLOCK();
1282     MUTEX_ENTER(&call->lock);
1283     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1284     MUTEX_EXIT(&call->lock);
1285     AFS_RXGUNLOCK();
1286     USERPRI;
1287     return bytes;
1288 }
1289
1290 /* Flush any buffered data to the stream, switch to read mode
1291  * (clients) or to EOF mode (servers) */
1292 void rxi_FlushWrite(call)
1293     register struct rx_call *call;
1294 {
1295     register struct rx_packet *cp = call->currentPacket;
1296     register struct rx_packet *tp; /* Temporary packet pointer */
1297     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1298
1299     /* Free any packets from the last call to ReadvProc/WritevProc */
1300     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1301         queue_Remove(tp);
1302         rxi_FreePacket(tp);
1303     }
1304
1305     if (call->mode == RX_MODE_SENDING) {
1306
1307         call->mode = (call->conn->type == RX_CLIENT_CONNECTION ?
1308                                           RX_MODE_RECEIVING: RX_MODE_EOF);
1309
1310 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1311         /* Wait until TQ_BUSY is reset before adding any
1312          * packets to the transmit queue
1313          */
1314         while (call->flags & RX_CALL_TQ_BUSY) {
1315             call->flags |= RX_CALL_TQ_WAIT;
1316 #ifdef RX_ENABLE_LOCKS
1317             CV_WAIT(&call->cv_tq, &call->lock);
1318 #else /* RX_ENABLE_LOCKS */
1319             osi_rxSleep(&call->tq);
1320 #endif /* RX_ENABLE_LOCKS */
1321         }
1322 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1323
1324         if (cp) {
1325             /* cp->length is only supposed to be the user's data */
1326             /* cp->length was already set to (then-current) 
1327              * MaxUserDataSize or less. */
1328             cp->length -= call->nFree;
1329             call->currentPacket = (struct rx_packet *) 0;
1330             call->nFree = 0;
1331         }
1332         else {
1333             cp = rxi_AllocSendPacket(call,0);
1334             if (!cp) {
1335                 /* Mode can no longer be MODE_SENDING */
1336                 return;
1337             }
1338             cp->length = 0;
1339             cp->niovecs = 1;  /* just the header */
1340             call->nFree = 0;
1341         }
1342
1343         /* The 1 specifies that this is the last packet */
1344         hadd32(call->bytesSent, cp->length);
1345         rxi_PrepareSendPacket(call, cp, 1);
1346         queue_Append(&call->tq, cp);
1347         if (!(call->flags & (RX_CALL_FAST_RECOVER|
1348                              RX_CALL_FAST_RECOVER_WAIT))) {
1349             rxi_Start(0, call, 0);
1350         }
1351     }
1352 }
1353
1354 /* Flush any buffered data to the stream, switch to read mode
1355  * (clients) or to EOF mode (servers) */
1356 void rx_FlushWrite(call)
1357   struct rx_call *call;
1358 {
1359     SPLVAR;
1360     NETPRI;
1361     AFS_RXGLOCK();
1362     MUTEX_ENTER(&call->lock);
1363     rxi_FlushWrite(call);
1364     MUTEX_EXIT(&call->lock);
1365     AFS_RXGUNLOCK();
1366     USERPRI;
1367 }