rx: Add a helper function for delayed acks
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   *
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #ifdef KERNEL
14 # ifndef UKERNEL
15 #  ifdef RX_KERNEL_TRACE
16 #   include "rx_kcommon.h"
17 #  endif
18 #  if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
19 #   include "afs/sysincludes.h"
20 #  else
21 #   include "h/types.h"
22 #   include "h/time.h"
23 #   include "h/stat.h"
24 #   if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
25 #    include "h/systm.h"
26 #   endif
27 #   ifdef       AFS_OSF_ENV
28 #    include <net/net_globals.h>
29 #   endif /* AFS_OSF_ENV */
30 #   ifdef AFS_LINUX20_ENV
31 #    include "h/socket.h"
32 #   endif
33 #   include "netinet/in.h"
34 #   if defined(AFS_SGI_ENV)
35 #    include "afs/sysincludes.h"
36 #   endif
37 #  endif
38 #  include "afs/afs_args.h"
39 #  if   (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
40 #   include "h/systm.h"
41 #  endif
42 # else /* !UKERNEL */
43 #  include "afs/sysincludes.h"
44 # endif /* !UKERNEL */
45
46 # ifdef RXDEBUG
47 #  undef RXDEBUG                        /* turn off debugging */
48 # endif /* RXDEBUG */
49
50 # include "afs/afs_osi.h"
51 # include "rx_kmutex.h"
52 # include "rx/rx_kernel.h"
53 # include "afs/lock.h"
54 #else /* KERNEL */
55 # include <roken.h>
56 #endif /* KERNEL */
57
58 #include "rx.h"
59 #include "rx_clock.h"
60 #include "rx_queue.h"
61 #include "rx_globals.h"
62
63 #ifdef RX_LOCKS_DB
64 /* rxdb_fileID is used to identify the lock location, along with line#. */
65 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
66 #endif /* RX_LOCKS_DB */
67 /* rxi_ReadProc -- internal version.
68  *
69  * LOCKS USED -- called at netpri
70  */
71 int
72 rxi_ReadProc(struct rx_call *call, char *buf,
73              int nbytes)
74 {
75     struct rx_packet *cp = call->currentPacket;
76     struct rx_packet *rp;
77     int requestCount;
78     unsigned int t;
79
80 /* XXXX took out clock_NewTime from here.  Was it needed? */
81     requestCount = nbytes;
82
83     /* Free any packets from the last call to ReadvProc/WritevProc */
84     if (queue_IsNotEmpty(&call->iovq)) {
85 #ifdef RXDEBUG_PACKET
86         call->iovqc -=
87 #endif /* RXDEBUG_PACKET */
88             rxi_FreePackets(0, &call->iovq);
89     }
90
91     do {
92         if (call->nLeft == 0) {
93             /* Get next packet */
94             MUTEX_ENTER(&call->lock);
95             for (;;) {
96                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
97                     if (call->error) {
98                         call->mode = RX_MODE_ERROR;
99                         MUTEX_EXIT(&call->lock);
100                         return 0;
101                     }
102                     if (call->mode == RX_MODE_SENDING) {
103                         MUTEX_EXIT(&call->lock);
104                         rxi_FlushWrite(call);
105                         MUTEX_ENTER(&call->lock);
106                         continue;
107                     }
108                 }
109                 if (queue_IsNotEmpty(&call->rq)) {
110                     /* Check that next packet available is next in sequence */
111                     rp = queue_First(&call->rq, rx_packet);
112                     if (rp->header.seq == call->rnext) {
113                         afs_int32 error;
114                         struct rx_connection *conn = call->conn;
115                         queue_Remove(rp);
116 #ifdef RX_TRACK_PACKETS
117                         rp->flags &= ~RX_PKTFLAG_RQ;
118 #endif
119 #ifdef RXDEBUG_PACKET
120                         call->rqc--;
121 #endif /* RXDEBUG_PACKET */
122
123                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
124                          * work.  It may reduce the length of the packet by up
125                          * to conn->maxTrailerSize, to reflect the length of the
126                          * data + the header. */
127                         if ((error =
128                              RXS_CheckPacket(conn->securityObject, call,
129                                              rp))) {
130                             /* Used to merely shut down the call, but now we
131                              * shut down the whole connection since this may
132                              * indicate an attempt to hijack it */
133
134                             MUTEX_EXIT(&call->lock);
135                             rxi_ConnectionError(conn, error);
136                             MUTEX_ENTER(&conn->conn_data_lock);
137                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
138                             MUTEX_EXIT(&conn->conn_data_lock);
139                             rxi_FreePacket(rp);
140
141                             return 0;
142                         }
143                         call->rnext++;
144                         cp = call->currentPacket = rp;
145 #ifdef RX_TRACK_PACKETS
146                         call->currentPacket->flags |= RX_PKTFLAG_CP;
147 #endif
148                         call->curvec = 1;       /* 0th vec is always header */
149                         /* begin at the beginning [ more or less ], continue
150                          * on until the end, then stop. */
151                         call->curpos =
152                             (char *)cp->wirevec[1].iov_base +
153                             call->conn->securityHeaderSize;
154                         call->curlen =
155                             cp->wirevec[1].iov_len -
156                             call->conn->securityHeaderSize;
157
158                         /* Notice that this code works correctly if the data
159                          * size is 0 (which it may be--no reply arguments from
160                          * server, for example).  This relies heavily on the
161                          * fact that the code below immediately frees the packet
162                          * (no yields, etc.).  If it didn't, this would be a
163                          * problem because a value of zero for call->nLeft
164                          * normally means that there is no read packet */
165                         call->nLeft = cp->length;
166                         hadd32(call->bytesRcvd, cp->length);
167
168                         /* Send a hard ack for every rxi_HardAckRate+1 packets
169                          * consumed. Otherwise schedule an event to send
170                          * the hard ack later on.
171                          */
172                         call->nHardAcks++;
173                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
174                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
175                                 rxevent_Cancel(call->delayedAckEvent, call,
176                                                RX_CALL_REFCOUNT_DELAY);
177                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
178                             } else {
179                                 /* Delay to consolidate ack packets */
180                                 rxi_PostDelayedAckEvent(call,
181                                                         &rx_hardAckDelay);
182                             }
183                         }
184                         break;
185                     }
186                 }
187
188                 /*
189                  * If we reach this point either we have no packets in the
190                  * receive queue or the next packet in the queue is not the
191                  * one we are looking for.  There is nothing else for us to
192                  * do but wait for another packet to arrive.
193                  */
194
195                 /* Are there ever going to be any more packets? */
196                 if (call->flags & RX_CALL_RECEIVE_DONE) {
197                     MUTEX_EXIT(&call->lock);
198                     return requestCount - nbytes;
199                 }
200                 /* Wait for in-sequence packet */
201                 call->flags |= RX_CALL_READER_WAIT;
202                 clock_NewTime();
203                 call->startWait = clock_Sec();
204                 while (call->flags & RX_CALL_READER_WAIT) {
205 #ifdef  RX_ENABLE_LOCKS
206                     CV_WAIT(&call->cv_rq, &call->lock);
207 #else
208                     osi_rxSleep(&call->rq);
209 #endif
210                 }
211                 cp = call->currentPacket;
212
213                 call->startWait = 0;
214 #ifdef RX_ENABLE_LOCKS
215                 if (call->error) {
216                     MUTEX_EXIT(&call->lock);
217                     return 0;
218                 }
219 #endif /* RX_ENABLE_LOCKS */
220             }
221             MUTEX_EXIT(&call->lock);
222         } else
223             /* osi_Assert(cp); */
224             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
225             /* yes, the following block is allowed to be the ELSE clause (or not) */
226             /* It's possible for call->nLeft to be smaller than any particular
227              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
228              * reflects the size of the buffer.  We have to keep track of the
229              * number of bytes read in the length field of the packet struct.  On
230              * the final portion of a received packet, it's almost certain that
231              * call->nLeft will be smaller than the final buffer. */
232             while (nbytes && cp) {
233                 t = MIN((int)call->curlen, nbytes);
234                 t = MIN(t, (int)call->nLeft);
235                 memcpy(buf, call->curpos, t);
236                 buf += t;
237                 nbytes -= t;
238                 call->curpos += t;
239                 call->curlen -= t;
240                 call->nLeft -= t;
241
242                 if (!call->nLeft) {
243                     /* out of packet.  Get another one. */
244 #ifdef RX_TRACK_PACKETS
245                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
246 #endif
247                     rxi_FreePacket(cp);
248                     cp = call->currentPacket = (struct rx_packet *)0;
249                 } else if (!call->curlen) {
250                     /* need to get another struct iov */
251                     if (++call->curvec >= cp->niovecs) {
252                         /* current packet is exhausted, get ready for another */
253                         /* don't worry about curvec and stuff, they get set somewhere else */
254 #ifdef RX_TRACK_PACKETS
255                         call->currentPacket->flags &= ~RX_PKTFLAG_CP;
256 #endif
257                         rxi_FreePacket(cp);
258                         cp = call->currentPacket = (struct rx_packet *)0;
259                         call->nLeft = 0;
260                     } else {
261                         call->curpos =
262                             (char *)cp->wirevec[call->curvec].iov_base;
263                         call->curlen = cp->wirevec[call->curvec].iov_len;
264                     }
265                 }
266             }
267         if (!nbytes) {
268             /* user buffer is full, return */
269             return requestCount;
270         }
271
272     } while (nbytes);
273
274     return requestCount;
275 }
276
277 int
278 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
279 {
280     int bytes;
281     int tcurlen;
282     int tnLeft;
283     char *tcurpos;
284     SPLVAR;
285
286     /* Free any packets from the last call to ReadvProc/WritevProc */
287     if (!queue_IsEmpty(&call->iovq)) {
288 #ifdef RXDEBUG_PACKET
289         call->iovqc -=
290 #endif /* RXDEBUG_PACKET */
291             rxi_FreePackets(0, &call->iovq);
292     }
293
294     /*
295      * Most common case, all of the data is in the current iovec.
296      * We are relying on nLeft being zero unless the call is in receive mode.
297      */
298     tcurlen = call->curlen;
299     tnLeft = call->nLeft;
300     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
301         tcurpos = call->curpos;
302         memcpy(buf, tcurpos, nbytes);
303
304         call->curpos = tcurpos + nbytes;
305         call->curlen = tcurlen - nbytes;
306         call->nLeft = tnLeft - nbytes;
307
308         if (!call->nLeft && call->currentPacket != NULL) {
309             /* out of packet.  Get another one. */
310             rxi_FreePacket(call->currentPacket);
311             call->currentPacket = (struct rx_packet *)0;
312         }
313         return nbytes;
314     }
315
316     NETPRI;
317     bytes = rxi_ReadProc(call, buf, nbytes);
318     USERPRI;
319     return bytes;
320 }
321
322 /* Optimization for unmarshalling 32 bit integers */
323 int
324 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
325 {
326     int bytes;
327     int tcurlen;
328     int tnLeft;
329     char *tcurpos;
330     SPLVAR;
331
332     /* Free any packets from the last call to ReadvProc/WritevProc */
333     if (!queue_IsEmpty(&call->iovq)) {
334 #ifdef RXDEBUG_PACKET
335         call->iovqc -=
336 #endif /* RXDEBUG_PACKET */
337             rxi_FreePackets(0, &call->iovq);
338     }
339
340     /*
341      * Most common case, all of the data is in the current iovec.
342      * We are relying on nLeft being zero unless the call is in receive mode.
343      */
344     tcurlen = call->curlen;
345     tnLeft = call->nLeft;
346     if (!call->error && tcurlen >= sizeof(afs_int32)
347         && tnLeft >= sizeof(afs_int32)) {
348         tcurpos = call->curpos;
349
350         memcpy((char *)value, tcurpos, sizeof(afs_int32));
351
352         call->curpos = tcurpos + sizeof(afs_int32);
353         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
354         call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
355         if (!call->nLeft && call->currentPacket != NULL) {
356             /* out of packet.  Get another one. */
357             rxi_FreePacket(call->currentPacket);
358             call->currentPacket = (struct rx_packet *)0;
359         }
360         return sizeof(afs_int32);
361     }
362
363     NETPRI;
364     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
365     USERPRI;
366
367     return bytes;
368 }
369
370 /* rxi_FillReadVec
371  *
372  * Uses packets in the receive queue to fill in as much of the
373  * current iovec as possible. Does not block if it runs out
374  * of packets to complete the iovec. Return true if an ack packet
375  * was sent, otherwise return false */
376 int
377 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
378 {
379     int didConsume = 0;
380     int didHardAck = 0;
381     unsigned int t;
382     struct rx_packet *rp;
383     struct rx_packet *curp;
384     struct iovec *call_iov;
385     struct iovec *cur_iov = NULL;
386
387     curp = call->currentPacket;
388     if (curp) {
389         cur_iov = &curp->wirevec[call->curvec];
390     }
391     call_iov = &call->iov[call->iovNext];
392
393     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
394         if (call->nLeft == 0) {
395             /* Get next packet */
396             if (queue_IsNotEmpty(&call->rq)) {
397                 /* Check that next packet available is next in sequence */
398                 rp = queue_First(&call->rq, rx_packet);
399                 if (rp->header.seq == call->rnext) {
400                     afs_int32 error;
401                     struct rx_connection *conn = call->conn;
402                     queue_Remove(rp);
403 #ifdef RX_TRACK_PACKETS
404                     rp->flags &= ~RX_PKTFLAG_RQ;
405 #endif
406 #ifdef RXDEBUG_PACKET
407                     call->rqc--;
408 #endif /* RXDEBUG_PACKET */
409
410                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
411                      * work.  It may reduce the length of the packet by up
412                      * to conn->maxTrailerSize, to reflect the length of the
413                      * data + the header. */
414                     if ((error =
415                          RXS_CheckPacket(conn->securityObject, call, rp))) {
416                         /* Used to merely shut down the call, but now we
417                          * shut down the whole connection since this may
418                          * indicate an attempt to hijack it */
419
420                         MUTEX_EXIT(&call->lock);
421                         rxi_ConnectionError(conn, error);
422                         MUTEX_ENTER(&conn->conn_data_lock);
423                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
424                         MUTEX_EXIT(&conn->conn_data_lock);
425                         rxi_FreePacket(rp);
426                         MUTEX_ENTER(&call->lock);
427
428                         return 1;
429                     }
430                     call->rnext++;
431                     curp = call->currentPacket = rp;
432 #ifdef RX_TRACK_PACKETS
433                     call->currentPacket->flags |= RX_PKTFLAG_CP;
434 #endif
435                     call->curvec = 1;   /* 0th vec is always header */
436                     cur_iov = &curp->wirevec[1];
437                     /* begin at the beginning [ more or less ], continue
438                      * on until the end, then stop. */
439                     call->curpos =
440                         (char *)curp->wirevec[1].iov_base +
441                         call->conn->securityHeaderSize;
442                     call->curlen =
443                         curp->wirevec[1].iov_len -
444                         call->conn->securityHeaderSize;
445
446                     /* Notice that this code works correctly if the data
447                      * size is 0 (which it may be--no reply arguments from
448                      * server, for example).  This relies heavily on the
449                      * fact that the code below immediately frees the packet
450                      * (no yields, etc.).  If it didn't, this would be a
451                      * problem because a value of zero for call->nLeft
452                      * normally means that there is no read packet */
453                     call->nLeft = curp->length;
454                     hadd32(call->bytesRcvd, curp->length);
455
456                     /* Send a hard ack for every rxi_HardAckRate+1 packets
457                      * consumed. Otherwise schedule an event to send
458                      * the hard ack later on.
459                      */
460                     call->nHardAcks++;
461                     didConsume = 1;
462                     continue;
463                 }
464             }
465             break;
466         }
467
468         /* It's possible for call->nLeft to be smaller than any particular
469          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
470          * reflects the size of the buffer.  We have to keep track of the
471          * number of bytes read in the length field of the packet struct.  On
472          * the final portion of a received packet, it's almost certain that
473          * call->nLeft will be smaller than the final buffer. */
474         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
475
476             t = MIN((int)call->curlen, call->iovNBytes);
477             t = MIN(t, (int)call->nLeft);
478             call_iov->iov_base = call->curpos;
479             call_iov->iov_len = t;
480             call_iov++;
481             call->iovNext++;
482             call->iovNBytes -= t;
483             call->curpos += t;
484             call->curlen -= t;
485             call->nLeft -= t;
486
487             if (!call->nLeft) {
488                 /* out of packet.  Get another one. */
489 #ifdef RX_TRACK_PACKETS
490                 curp->flags &= ~RX_PKTFLAG_CP;
491                 curp->flags |= RX_PKTFLAG_IOVQ;
492 #endif
493                 queue_Append(&call->iovq, curp);
494 #ifdef RXDEBUG_PACKET
495                 call->iovqc++;
496 #endif /* RXDEBUG_PACKET */
497                 curp = call->currentPacket = (struct rx_packet *)0;
498             } else if (!call->curlen) {
499                 /* need to get another struct iov */
500                 if (++call->curvec >= curp->niovecs) {
501                     /* current packet is exhausted, get ready for another */
502                     /* don't worry about curvec and stuff, they get set somewhere else */
503 #ifdef RX_TRACK_PACKETS
504                     curp->flags &= ~RX_PKTFLAG_CP;
505                     curp->flags |= RX_PKTFLAG_IOVQ;
506 #endif
507                     queue_Append(&call->iovq, curp);
508 #ifdef RXDEBUG_PACKET
509                     call->iovqc++;
510 #endif /* RXDEBUG_PACKET */
511                     curp = call->currentPacket = (struct rx_packet *)0;
512                     call->nLeft = 0;
513                 } else {
514                     cur_iov++;
515                     call->curpos = (char *)cur_iov->iov_base;
516                     call->curlen = cur_iov->iov_len;
517                 }
518             }
519         }
520     }
521
522     /* If we consumed any packets then check whether we need to
523      * send a hard ack. */
524     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
525         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
526             rxevent_Cancel(call->delayedAckEvent, call,
527                            RX_CALL_REFCOUNT_DELAY);
528             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
529             didHardAck = 1;
530         } else {
531             /* Delay to consolidate ack packets */
532             rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
533         }
534     }
535     return didHardAck;
536 }
537
538
539 /* rxi_ReadvProc -- internal version.
540  *
541  * Fills in an iovec with pointers to the packet buffers. All packets
542  * except the last packet (new current packet) are moved to the iovq
543  * while the application is processing the data.
544  *
545  * LOCKS USED -- called at netpri.
546  */
547 int
548 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
549               int nbytes)
550 {
551     int bytes;
552
553     /* Free any packets from the last call to ReadvProc/WritevProc */
554     if (queue_IsNotEmpty(&call->iovq)) {
555 #ifdef RXDEBUG_PACKET
556         call->iovqc -=
557 #endif /* RXDEBUG_PACKET */
558             rxi_FreePackets(0, &call->iovq);
559     }
560
561     if (call->mode == RX_MODE_SENDING) {
562         rxi_FlushWrite(call);
563     }
564
565     MUTEX_ENTER(&call->lock);
566     if (call->error)
567         goto error;
568
569     /* Get whatever data is currently available in the receive queue.
570      * If rxi_FillReadVec sends an ack packet then it is possible
571      * that we will receive more data while we drop the call lock
572      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
573      * here to avoid a race with the receive thread if we send
574      * hard acks in rxi_FillReadVec. */
575     call->flags |= RX_CALL_IOVEC_WAIT;
576     call->iovNBytes = nbytes;
577     call->iovMax = maxio;
578     call->iovNext = 0;
579     call->iov = iov;
580     rxi_FillReadVec(call, 0);
581
582     /* if we need more data then sleep until the receive thread has
583      * filled in the rest. */
584     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
585         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
586         call->flags |= RX_CALL_READER_WAIT;
587         clock_NewTime();
588         call->startWait = clock_Sec();
589         while (call->flags & RX_CALL_READER_WAIT) {
590 #ifdef  RX_ENABLE_LOCKS
591             CV_WAIT(&call->cv_rq, &call->lock);
592 #else
593             osi_rxSleep(&call->rq);
594 #endif
595         }
596         call->startWait = 0;
597     }
598     call->flags &= ~RX_CALL_IOVEC_WAIT;
599
600     if (call->error)
601         goto error;
602
603     call->iov = NULL;
604     *nio = call->iovNext;
605     bytes = nbytes - call->iovNBytes;
606     MUTEX_EXIT(&call->lock);
607     return bytes;
608
609   error:
610     MUTEX_EXIT(&call->lock);
611     call->mode = RX_MODE_ERROR;
612     return 0;
613 }
614
615 int
616 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
617              int nbytes)
618 {
619     int bytes;
620     SPLVAR;
621
622     NETPRI;
623     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
624     USERPRI;
625     return bytes;
626 }
627
628 /* rxi_WriteProc -- internal version.
629  *
630  * LOCKS USED -- called at netpri
631  */
632
633 int
634 rxi_WriteProc(struct rx_call *call, char *buf,
635               int nbytes)
636 {
637     struct rx_connection *conn = call->conn;
638     struct rx_packet *cp = call->currentPacket;
639     unsigned int t;
640     int requestCount = nbytes;
641
642     /* Free any packets from the last call to ReadvProc/WritevProc */
643     if (queue_IsNotEmpty(&call->iovq)) {
644 #ifdef RXDEBUG_PACKET
645         call->iovqc -=
646 #endif /* RXDEBUG_PACKET */
647             rxi_FreePackets(0, &call->iovq);
648     }
649
650     if (call->mode != RX_MODE_SENDING) {
651         if ((conn->type == RX_SERVER_CONNECTION)
652             && (call->mode == RX_MODE_RECEIVING)) {
653             call->mode = RX_MODE_SENDING;
654             if (cp) {
655 #ifdef RX_TRACK_PACKETS
656                 cp->flags &= ~RX_PKTFLAG_CP;
657 #endif
658                 rxi_FreePacket(cp);
659                 cp = call->currentPacket = (struct rx_packet *)0;
660                 call->nLeft = 0;
661                 call->nFree = 0;
662             }
663         } else {
664             return 0;
665         }
666     }
667
668     /* Loop condition is checked at end, so that a write of 0 bytes
669      * will force a packet to be created--specially for the case where
670      * there are 0 bytes on the stream, but we must send a packet
671      * anyway. */
672     do {
673         if (call->nFree == 0) {
674             MUTEX_ENTER(&call->lock);
675             cp = call->currentPacket;
676             if (call->error)
677                 call->mode = RX_MODE_ERROR;
678             if (!call->error && cp) {
679                 /* Clear the current packet now so that if
680                  * we are forced to wait and drop the lock
681                  * the packet we are planning on using
682                  * cannot be freed.
683                  */
684 #ifdef RX_TRACK_PACKETS
685                 cp->flags &= ~RX_PKTFLAG_CP;
686 #endif
687                 call->currentPacket = (struct rx_packet *)0;
688                 clock_NewTime();        /* Bogus:  need new time package */
689                 /* The 0, below, specifies that it is not the last packet:
690                  * there will be others. PrepareSendPacket may
691                  * alter the packet length by up to
692                  * conn->securityMaxTrailerSize */
693                 hadd32(call->bytesSent, cp->length);
694                 rxi_PrepareSendPacket(call, cp, 0);
695 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
696                 /* PrepareSendPacket drops the call lock */
697                 rxi_WaitforTQBusy(call);
698 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
699 #ifdef RX_TRACK_PACKETS
700                 cp->flags |= RX_PKTFLAG_TQ;
701 #endif
702                 queue_Append(&call->tq, cp);
703 #ifdef RXDEBUG_PACKET
704                 call->tqc++;
705 #endif /* RXDEBUG_PACKET */
706                 cp = (struct rx_packet *)0;
707                 /* If the call is in recovery, let it exhaust its current
708                  * retransmit queue before forcing it to send new packets
709                  */
710                 if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
711                     rxi_Start(call, 0);
712                 }
713             } else if (cp) {
714 #ifdef RX_TRACK_PACKETS
715                 cp->flags &= ~RX_PKTFLAG_CP;
716 #endif
717                 rxi_FreePacket(cp);
718                 cp = call->currentPacket = (struct rx_packet *)0;
719             }
720             /* Wait for transmit window to open up */
721             while (!call->error
722                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
723                 clock_NewTime();
724                 call->startWait = clock_Sec();
725
726 #ifdef  RX_ENABLE_LOCKS
727                 CV_WAIT(&call->cv_twind, &call->lock);
728 #else
729                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
730                 osi_rxSleep(&call->twind);
731 #endif
732
733                 call->startWait = 0;
734 #ifdef RX_ENABLE_LOCKS
735                 if (call->error) {
736                     call->mode = RX_MODE_ERROR;
737                     MUTEX_EXIT(&call->lock);
738                     return 0;
739                 }
740 #endif /* RX_ENABLE_LOCKS */
741             }
742             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
743 #ifdef RX_TRACK_PACKETS
744                 cp->flags |= RX_PKTFLAG_CP;
745 #endif
746                 call->currentPacket = cp;
747                 call->nFree = cp->length;
748                 call->curvec = 1;       /* 0th vec is always header */
749                 /* begin at the beginning [ more or less ], continue
750                  * on until the end, then stop. */
751                 call->curpos =
752                     (char *)cp->wirevec[1].iov_base +
753                     call->conn->securityHeaderSize;
754                 call->curlen =
755                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
756             }
757             if (call->error) {
758                 call->mode = RX_MODE_ERROR;
759                 if (cp) {
760 #ifdef RX_TRACK_PACKETS
761                     cp->flags &= ~RX_PKTFLAG_CP;
762 #endif
763                     rxi_FreePacket(cp);
764                     call->currentPacket = NULL;
765                 }
766                 MUTEX_EXIT(&call->lock);
767                 return 0;
768             }
769             MUTEX_EXIT(&call->lock);
770         }
771
772         if (cp && (int)call->nFree < nbytes) {
773             /* Try to extend the current buffer */
774             int len, mud;
775             len = cp->length;
776             mud = rx_MaxUserDataSize(call);
777             if (mud > len) {
778                 int want;
779                 want = MIN(nbytes - (int)call->nFree, mud - len);
780                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
781                 if (cp->length > (unsigned)mud)
782                     cp->length = mud;
783                 call->nFree += (cp->length - len);
784             }
785         }
786
787         /* If the remaining bytes fit in the buffer, then store them
788          * and return.  Don't ship a buffer that's full immediately to
789          * the peer--we don't know if it's the last buffer yet */
790
791         if (!cp) {
792             call->nFree = 0;
793         }
794
795         while (nbytes && call->nFree) {
796
797             t = MIN((int)call->curlen, nbytes);
798             t = MIN((int)call->nFree, t);
799             memcpy(call->curpos, buf, t);
800             buf += t;
801             nbytes -= t;
802             call->curpos += t;
803             call->curlen -= (u_short)t;
804             call->nFree -= (u_short)t;
805
806             if (!call->curlen) {
807                 /* need to get another struct iov */
808                 if (++call->curvec >= cp->niovecs) {
809                     /* current packet is full, extend or send it */
810                     call->nFree = 0;
811                 } else {
812                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
813                     call->curlen = cp->wirevec[call->curvec].iov_len;
814                 }
815             }
816         }                       /* while bytes to send and room to send them */
817
818         /* might be out of space now */
819         if (!nbytes) {
820             return requestCount;
821         } else;                 /* more data to send, so get another packet and keep going */
822     } while (nbytes);
823
824     return requestCount - nbytes;
825 }
826
827 int
828 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
829 {
830     int bytes;
831     int tcurlen;
832     int tnFree;
833     char *tcurpos;
834     SPLVAR;
835
836     /* Free any packets from the last call to ReadvProc/WritevProc */
837     if (queue_IsNotEmpty(&call->iovq)) {
838 #ifdef RXDEBUG_PACKET
839         call->iovqc -=
840 #endif /* RXDEBUG_PACKET */
841             rxi_FreePackets(0, &call->iovq);
842     }
843
844     /*
845      * Most common case: all of the data fits in the current iovec.
846      * We are relying on nFree being zero unless the call is in send mode.
847      */
848     tcurlen = (int)call->curlen;
849     tnFree = (int)call->nFree;
850     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
851         tcurpos = call->curpos;
852
853         memcpy(tcurpos, buf, nbytes);
854         call->curpos = tcurpos + nbytes;
855         call->curlen = (u_short)(tcurlen - nbytes);
856         call->nFree = (u_short)(tnFree - nbytes);
857         return nbytes;
858     }
859
860     NETPRI;
861     bytes = rxi_WriteProc(call, buf, nbytes);
862     USERPRI;
863     return bytes;
864 }
865
866 /* Optimization for marshalling 32 bit arguments */
867 int
868 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
869 {
870     int bytes;
871     int tcurlen;
872     int tnFree;
873     char *tcurpos;
874     SPLVAR;
875
876     if (queue_IsNotEmpty(&call->iovq)) {
877 #ifdef RXDEBUG_PACKET
878         call->iovqc -=
879 #endif /* RXDEBUG_PACKET */
880             rxi_FreePackets(0, &call->iovq);
881     }
882
883     /*
884      * Most common case: all of the data fits in the current iovec.
885      * We are relying on nFree being zero unless the call is in send mode.
886      */
887     tcurlen = call->curlen;
888     tnFree = call->nFree;
889     if (!call->error && tcurlen >= sizeof(afs_int32)
890         && tnFree >= sizeof(afs_int32)) {
891         tcurpos = call->curpos;
892
893         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
894             *((afs_int32 *) (tcurpos)) = *value;
895         } else {
896             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
897         }
898         call->curpos = tcurpos + sizeof(afs_int32);
899         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
900         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
901         return sizeof(afs_int32);
902     }
903
904     NETPRI;
905     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
906     USERPRI;
907     return bytes;
908 }
909
910 /* rxi_WritevAlloc -- internal version.
911  *
912  * Fill in an iovec to point to data in packet buffers. The application
913  * calls rxi_WritevProc when the buffers are full.
914  *
915  * LOCKS USED -- called at netpri.
916  */
917
918 static int
919 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
920                 int nbytes)
921 {
922     struct rx_connection *conn = call->conn;
923     struct rx_packet *cp = call->currentPacket;
924     int requestCount;
925     int nextio;
926     /* Temporary values, real work is done in rxi_WritevProc */
927     int tnFree;
928     unsigned int tcurvec;
929     char *tcurpos;
930     int tcurlen;
931
932     requestCount = nbytes;
933     nextio = 0;
934
935     /* Free any packets from the last call to ReadvProc/WritevProc */
936     if (queue_IsNotEmpty(&call->iovq)) {
937 #ifdef RXDEBUG_PACKET
938         call->iovqc -=
939 #endif /* RXDEBUG_PACKET */
940             rxi_FreePackets(0, &call->iovq);
941     }
942
943     if (call->mode != RX_MODE_SENDING) {
944         if ((conn->type == RX_SERVER_CONNECTION)
945             && (call->mode == RX_MODE_RECEIVING)) {
946             call->mode = RX_MODE_SENDING;
947             if (cp) {
948 #ifdef RX_TRACK_PACKETS
949                 cp->flags &= ~RX_PKTFLAG_CP;
950 #endif
951                 rxi_FreePacket(cp);
952                 cp = call->currentPacket = (struct rx_packet *)0;
953                 call->nLeft = 0;
954                 call->nFree = 0;
955             }
956         } else {
957             return 0;
958         }
959     }
960
961     /* Set up the iovec to point to data in packet buffers. */
962     tnFree = call->nFree;
963     tcurvec = call->curvec;
964     tcurpos = call->curpos;
965     tcurlen = call->curlen;
966     do {
967         int t;
968
969         if (tnFree == 0) {
970             /* current packet is full, allocate a new one */
971             MUTEX_ENTER(&call->lock);
972             cp = rxi_AllocSendPacket(call, nbytes);
973             MUTEX_EXIT(&call->lock);
974             if (cp == NULL) {
975                 /* out of space, return what we have */
976                 *nio = nextio;
977                 return requestCount - nbytes;
978             }
979 #ifdef RX_TRACK_PACKETS
980             cp->flags |= RX_PKTFLAG_IOVQ;
981 #endif
982             queue_Append(&call->iovq, cp);
983 #ifdef RXDEBUG_PACKET
984             call->iovqc++;
985 #endif /* RXDEBUG_PACKET */
986             tnFree = cp->length;
987             tcurvec = 1;
988             tcurpos =
989                 (char *)cp->wirevec[1].iov_base +
990                 call->conn->securityHeaderSize;
991             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
992         }
993
994         if (tnFree < nbytes) {
995             /* try to extend the current packet */
996             int len, mud;
997             len = cp->length;
998             mud = rx_MaxUserDataSize(call);
999             if (mud > len) {
1000                 int want;
1001                 want = MIN(nbytes - tnFree, mud - len);
1002                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1003                 if (cp->length > (unsigned)mud)
1004                     cp->length = mud;
1005                 tnFree += (cp->length - len);
1006                 if (cp == call->currentPacket) {
1007                     call->nFree += (cp->length - len);
1008                 }
1009             }
1010         }
1011
1012         /* fill in the next entry in the iovec */
1013         t = MIN(tcurlen, nbytes);
1014         t = MIN(tnFree, t);
1015         iov[nextio].iov_base = tcurpos;
1016         iov[nextio].iov_len = t;
1017         nbytes -= t;
1018         tcurpos += t;
1019         tcurlen -= t;
1020         tnFree -= t;
1021         nextio++;
1022
1023         if (!tcurlen) {
1024             /* need to get another struct iov */
1025             if (++tcurvec >= cp->niovecs) {
1026                 /* current packet is full, extend it or move on to next packet */
1027                 tnFree = 0;
1028             } else {
1029                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1030                 tcurlen = cp->wirevec[tcurvec].iov_len;
1031             }
1032         }
1033     } while (nbytes && nextio < maxio);
1034     *nio = nextio;
1035     return requestCount - nbytes;
1036 }
1037
1038 int
1039 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1040                int nbytes)
1041 {
1042     int bytes;
1043     SPLVAR;
1044
1045     NETPRI;
1046     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1047     USERPRI;
1048     return bytes;
1049 }
1050
1051 /* rxi_WritevProc -- internal version.
1052  *
1053  * Send buffers allocated in rxi_WritevAlloc.
1054  *
1055  * LOCKS USED -- called at netpri.
1056  */
1057 int
1058 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1059 {
1060     struct rx_packet *cp = NULL;
1061 #ifdef RX_TRACK_PACKETS
1062     struct rx_packet *p, *np;
1063 #endif
1064     int nextio;
1065     int requestCount;
1066     struct rx_queue tmpq;
1067 #ifdef RXDEBUG_PACKET
1068     u_short tmpqc;
1069 #endif
1070
1071     requestCount = nbytes;
1072     nextio = 0;
1073
1074     MUTEX_ENTER(&call->lock);
1075     if (call->error) {
1076         call->mode = RX_MODE_ERROR;
1077     } else if (call->mode != RX_MODE_SENDING) {
1078         call->error = RX_PROTOCOL_ERROR;
1079     }
1080 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1081     rxi_WaitforTQBusy(call);
1082 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1083     cp = call->currentPacket;
1084
1085     if (call->error) {
1086         call->mode = RX_MODE_ERROR;
1087         MUTEX_EXIT(&call->lock);
1088         if (cp) {
1089 #ifdef RX_TRACK_PACKETS
1090             cp->flags &= ~RX_PKTFLAG_CP;
1091             cp->flags |= RX_PKTFLAG_IOVQ;
1092 #endif
1093             queue_Prepend(&call->iovq, cp);
1094 #ifdef RXDEBUG_PACKET
1095             call->iovqc++;
1096 #endif /* RXDEBUG_PACKET */
1097             call->currentPacket = (struct rx_packet *)0;
1098         }
1099 #ifdef RXDEBUG_PACKET
1100         call->iovqc -=
1101 #endif /* RXDEBUG_PACKET */
1102             rxi_FreePackets(0, &call->iovq);
1103         return 0;
1104     }
1105
1106     /* Loop through the I/O vector adjusting packet pointers.
1107      * Place full packets back onto the iovq once they are ready
1108      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1109      * the iovec. We put the loop condition at the end to ensure that
1110      * a zero length write will push a short packet. */
1111     nextio = 0;
1112     queue_Init(&tmpq);
1113 #ifdef RXDEBUG_PACKET
1114     tmpqc = 0;
1115 #endif /* RXDEBUG_PACKET */
1116     do {
1117         if (call->nFree == 0 && cp) {
1118             clock_NewTime();    /* Bogus:  need new time package */
1119             /* The 0, below, specifies that it is not the last packet:
1120              * there will be others. PrepareSendPacket may
1121              * alter the packet length by up to
1122              * conn->securityMaxTrailerSize */
1123             hadd32(call->bytesSent, cp->length);
1124             rxi_PrepareSendPacket(call, cp, 0);
1125 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1126             /* PrepareSendPacket drops the call lock */
1127             rxi_WaitforTQBusy(call);
1128 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1129             queue_Append(&tmpq, cp);
1130 #ifdef RXDEBUG_PACKET
1131             tmpqc++;
1132 #endif /* RXDEBUG_PACKET */
1133             cp = call->currentPacket = (struct rx_packet *)0;
1134
1135             /* The head of the iovq is now the current packet */
1136             if (nbytes) {
1137                 if (queue_IsEmpty(&call->iovq)) {
1138                     MUTEX_EXIT(&call->lock);
1139                     call->error = RX_PROTOCOL_ERROR;
1140 #ifdef RXDEBUG_PACKET
1141                     tmpqc -=
1142 #endif /* RXDEBUG_PACKET */
1143                         rxi_FreePackets(0, &tmpq);
1144                     return 0;
1145                 }
1146                 cp = queue_First(&call->iovq, rx_packet);
1147                 queue_Remove(cp);
1148 #ifdef RX_TRACK_PACKETS
1149                 cp->flags &= ~RX_PKTFLAG_IOVQ;
1150 #endif
1151 #ifdef RXDEBUG_PACKET
1152                 call->iovqc--;
1153 #endif /* RXDEBUG_PACKET */
1154 #ifdef RX_TRACK_PACKETS
1155                 cp->flags |= RX_PKTFLAG_CP;
1156 #endif
1157                 call->currentPacket = cp;
1158                 call->nFree = cp->length;
1159                 call->curvec = 1;
1160                 call->curpos =
1161                     (char *)cp->wirevec[1].iov_base +
1162                     call->conn->securityHeaderSize;
1163                 call->curlen =
1164                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1165             }
1166         }
1167
1168         if (nbytes) {
1169             /* The next iovec should point to the current position */
1170             if (iov[nextio].iov_base != call->curpos
1171                 || iov[nextio].iov_len > (int)call->curlen) {
1172                 call->error = RX_PROTOCOL_ERROR;
1173                 MUTEX_EXIT(&call->lock);
1174                 if (cp) {
1175 #ifdef RX_TRACK_PACKETS
1176                     cp->flags &= ~RX_PKTFLAG_CP;
1177 #endif
1178                     queue_Prepend(&tmpq, cp);
1179 #ifdef RXDEBUG_PACKET
1180                     tmpqc++;
1181 #endif /* RXDEBUG_PACKET */
1182                     cp = call->currentPacket = (struct rx_packet *)0;
1183                 }
1184 #ifdef RXDEBUG_PACKET
1185                 tmpqc -=
1186 #endif /* RXDEBUG_PACKET */
1187                     rxi_FreePackets(0, &tmpq);
1188                 return 0;
1189             }
1190             nbytes -= iov[nextio].iov_len;
1191             call->curpos += iov[nextio].iov_len;
1192             call->curlen -= iov[nextio].iov_len;
1193             call->nFree -= iov[nextio].iov_len;
1194             nextio++;
1195             if (call->curlen == 0) {
1196                 if (++call->curvec > cp->niovecs) {
1197                     call->nFree = 0;
1198                 } else {
1199                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1200                     call->curlen = cp->wirevec[call->curvec].iov_len;
1201                 }
1202             }
1203         }
1204     } while (nbytes && nextio < nio);
1205
1206     /* Move the packets from the temporary queue onto the transmit queue.
1207      * We may end up with more than call->twind packets on the queue. */
1208
1209 #ifdef RX_TRACK_PACKETS
1210     for (queue_Scan(&tmpq, p, np, rx_packet))
1211     {
1212         p->flags |= RX_PKTFLAG_TQ;
1213     }
1214 #endif
1215
1216     if (call->error)
1217         call->mode = RX_MODE_ERROR;
1218
1219     queue_SpliceAppend(&call->tq, &tmpq);
1220
1221     /* If the call is in recovery, let it exhaust its current retransmit
1222      * queue before forcing it to send new packets
1223      */
1224     if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1225         rxi_Start(call, 0);
1226     }
1227
1228     /* Wait for the length of the transmit queue to fall below call->twind */
1229     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1230         clock_NewTime();
1231         call->startWait = clock_Sec();
1232 #ifdef  RX_ENABLE_LOCKS
1233         CV_WAIT(&call->cv_twind, &call->lock);
1234 #else
1235         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1236         osi_rxSleep(&call->twind);
1237 #endif
1238         call->startWait = 0;
1239     }
1240
1241     /* cp is no longer valid since we may have given up the lock */
1242     cp = call->currentPacket;
1243
1244     if (call->error) {
1245         call->mode = RX_MODE_ERROR;
1246         call->currentPacket = NULL;
1247         MUTEX_EXIT(&call->lock);
1248         if (cp) {
1249 #ifdef RX_TRACK_PACKETS
1250             cp->flags &= ~RX_PKTFLAG_CP;
1251 #endif
1252             rxi_FreePacket(cp);
1253         }
1254         return 0;
1255     }
1256     MUTEX_EXIT(&call->lock);
1257
1258     return requestCount - nbytes;
1259 }
1260
1261 int
1262 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1263 {
1264     int bytes;
1265     SPLVAR;
1266
1267     NETPRI;
1268     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1269     USERPRI;
1270     return bytes;
1271 }
1272
1273 /* Flush any buffered data to the stream, switch to read mode
1274  * (clients) or to EOF mode (servers)
1275  *
1276  * LOCKS HELD: called at netpri.
1277  */
1278 void
1279 rxi_FlushWrite(struct rx_call *call)
1280 {
1281     struct rx_packet *cp = NULL;
1282
1283     /* Free any packets from the last call to ReadvProc/WritevProc */
1284     if (queue_IsNotEmpty(&call->iovq)) {
1285 #ifdef RXDEBUG_PACKET
1286         call->iovqc -=
1287 #endif /* RXDEBUG_PACKET */
1288             rxi_FreePackets(0, &call->iovq);
1289     }
1290
1291     if (call->mode == RX_MODE_SENDING) {
1292
1293         call->mode =
1294             (call->conn->type ==
1295              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1296
1297 #ifdef RX_KERNEL_TRACE
1298         {
1299             int glockOwner = ISAFS_GLOCK();
1300             if (!glockOwner)
1301                 AFS_GLOCK();
1302             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1303                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1304                        call);
1305             if (!glockOwner)
1306                 AFS_GUNLOCK();
1307         }
1308 #endif
1309
1310         MUTEX_ENTER(&call->lock);
1311         if (call->error)
1312             call->mode = RX_MODE_ERROR;
1313
1314         cp = call->currentPacket;
1315
1316         if (cp) {
1317             /* cp->length is only supposed to be the user's data */
1318             /* cp->length was already set to (then-current)
1319              * MaxUserDataSize or less. */
1320 #ifdef RX_TRACK_PACKETS
1321             cp->flags &= ~RX_PKTFLAG_CP;
1322 #endif
1323             cp->length -= call->nFree;
1324             call->currentPacket = (struct rx_packet *)0;
1325             call->nFree = 0;
1326         } else {
1327             cp = rxi_AllocSendPacket(call, 0);
1328             if (!cp) {
1329                 /* Mode can no longer be MODE_SENDING */
1330                 return;
1331             }
1332             cp->length = 0;
1333             cp->niovecs = 2;    /* header + space for rxkad stuff */
1334             call->nFree = 0;
1335         }
1336
1337         /* The 1 specifies that this is the last packet */
1338         hadd32(call->bytesSent, cp->length);
1339         rxi_PrepareSendPacket(call, cp, 1);
1340 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1341         /* PrepareSendPacket drops the call lock */
1342         rxi_WaitforTQBusy(call);
1343 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1344 #ifdef RX_TRACK_PACKETS
1345         cp->flags |= RX_PKTFLAG_TQ;
1346 #endif
1347         queue_Append(&call->tq, cp);
1348 #ifdef RXDEBUG_PACKET
1349         call->tqc++;
1350 #endif /* RXDEBUG_PACKET */
1351
1352         /* If the call is in recovery, let it exhaust its current retransmit
1353          * queue before forcing it to send new packets
1354          */
1355         if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1356             rxi_Start(call, 0);
1357         }
1358         MUTEX_EXIT(&call->lock);
1359     }
1360 }
1361
1362 /* Flush any buffered data to the stream, switch to read mode
1363  * (clients) or to EOF mode (servers) */
1364 void
1365 rx_FlushWrite(struct rx_call *call)
1366 {
1367     SPLVAR;
1368     NETPRI;
1369     rxi_FlushWrite(call);
1370     USERPRI;
1371 }