98c0c857cb5b10f75b9fcf2e37686efbba658ed7
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   *
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #ifdef KERNEL
14 # ifndef UKERNEL
15 #  ifdef RX_KERNEL_TRACE
16 #   include "rx_kcommon.h"
17 #  endif
18 #  if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
19 #   include "afs/sysincludes.h"
20 #  else
21 #   include "h/types.h"
22 #   include "h/time.h"
23 #   include "h/stat.h"
24 #   if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
25 #    include "h/systm.h"
26 #   endif
27 #   ifdef       AFS_OSF_ENV
28 #    include <net/net_globals.h>
29 #   endif /* AFS_OSF_ENV */
30 #   ifdef AFS_LINUX20_ENV
31 #    include "h/socket.h"
32 #   endif
33 #   include "netinet/in.h"
34 #   if defined(AFS_SGI_ENV)
35 #    include "afs/sysincludes.h"
36 #   endif
37 #  endif
38 #  include "afs/afs_args.h"
39 #  if   (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
40 #   include "h/systm.h"
41 #  endif
42 # else /* !UKERNEL */
43 #  include "afs/sysincludes.h"
44 # endif /* !UKERNEL */
45
46 # ifdef RXDEBUG
47 #  undef RXDEBUG                        /* turn off debugging */
48 # endif /* RXDEBUG */
49
50 # include "afs/afs_osi.h"
51 # include "rx_kmutex.h"
52 # include "rx/rx_kernel.h"
53 # include "afs/lock.h"
54 #else /* KERNEL */
55 # include <roken.h>
56 # include <afs/opr.h>
57 #endif /* KERNEL */
58
59 #include "rx.h"
60 #include "rx_clock.h"
61 #include "rx_globals.h"
62 #include "rx_atomic.h"
63 #include "rx_internal.h"
64 #include "rx_conn.h"
65 #include "rx_call.h"
66 #include "rx_packet.h"
67
#ifdef RX_LOCKS_DB
/*
 * Identifier for this source file.  It is used, together with a line
 * number, to identify the location of a lock operation.  Only defined
 * when RX_LOCKS_DB is enabled.
 */
static int rxdb_fileID = RXDB_FILE_RX_RDWR;
#endif /* RX_LOCKS_DB */
72
73 /* Get the next packet in the receive queue
74  *
75  * Dispose of the call's currentPacket, and move the next packet in the
76  * receive queue into the currentPacket field. If the next packet isn't
77  * available, then currentPacket is left NULL.
78  *
79  * @param call
80  *      The RX call to manipulate
81  * @returns
82  *      0 on success, an error code on failure
83  *
84  * @notes
85  *      Must be called with the call locked. Unlocks the call if returning
86  *      with an error.
87  */
88
89 static int
90 rxi_GetNextPacket(struct rx_call *call) {
91     struct rx_packet *rp;
92     int error;
93
94     if (call->app.currentPacket != NULL) {
95 #ifdef RX_TRACK_PACKETS
96         call->app.currentPacket->flags |= RX_PKTFLAG_CP;
97 #endif
98         rxi_FreePacket(call->app.currentPacket);
99         call->app.currentPacket = NULL;
100     }
101
102     if (opr_queue_IsEmpty(&call->rq))
103         return 0;
104
105     /* Check that next packet available is next in sequence */
106     rp = opr_queue_First(&call->rq, struct rx_packet, entry);
107     if (rp->header.seq != call->rnext)
108         return 0;
109
110     opr_queue_Remove(&rp->entry);
111 #ifdef RX_TRACK_PACKETS
112     rp->flags &= ~RX_PKTFLAG_RQ;
113 #endif
114 #ifdef RXDEBUG_PACKET
115     call->rqc--;
116 #endif /* RXDEBUG_PACKET */
117
118     /* RXS_CheckPacket called to undo RXS_PreparePacket's work.  It may
119      * reduce the length of the packet by up to conn->maxTrailerSize,
120      * to reflect the length of the data + the header. */
121     if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
122         /* Used to merely shut down the call, but now we shut down the whole
123          * connection since this may indicate an attempt to hijack it */
124
125         MUTEX_EXIT(&call->lock);
126         rxi_ConnectionError(call->conn, error);
127         MUTEX_ENTER(&call->conn->conn_data_lock);
128         rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
129         MUTEX_EXIT(&call->conn->conn_data_lock);
130         rxi_FreePacket(rp);
131
132         return error;
133      }
134
135     call->rnext++;
136     call->app.currentPacket = rp;
137 #ifdef RX_TRACK_PACKETS
138     call->app.currentPacket->flags |= RX_PKTFLAG_CP;
139 #endif
140     call->app.curvec = 1;       /* 0th vec is always header */
141
142     /* begin at the beginning [ more or less ], continue on until the end,
143      * then stop. */
144     call->app.curpos = (char *)call->app.currentPacket->wirevec[1].iov_base +
145                    call->conn->securityHeaderSize;
146     call->app.curlen = call->app.currentPacket->wirevec[1].iov_len -
147                    call->conn->securityHeaderSize;
148
149     call->app.nLeft = call->app.currentPacket->length;
150     call->bytesRcvd += call->app.currentPacket->length;
151
152     call->nHardAcks++;
153
154     return 0;
155 }
156
/* rxi_ReadProc -- internal version.
 *
 * Copy up to nbytes bytes of received call data into buf.  Data is taken
 * from the call's current packet; when that packet is used up
 * (call->app.nLeft == 0) the next in-sequence packet is fetched with
 * rxi_GetNextPacket, waiting for one to arrive if necessary.
 *
 * Returns:
 *   - nbytes (the full request) once the user buffer has been filled;
 *   - fewer than nbytes if RX_CALL_RECEIVE_DONE is set and no in-sequence
 *     packet is available;
 *   - 0 if the call has an error or rxi_GetNextPacket fails.
 *
 * LOCKS USED -- called at netpri
 */
int
rxi_ReadProc(struct rx_call *call, char *buf,
             int nbytes)
{
    int requestCount;
    int code;
    unsigned int t;

/* XXXX took out clock_NewTime from here.  Was it needed? */
    requestCount = nbytes;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->app.iovq);
    }

    do {
        if (call->app.nLeft == 0) {
            /* Get next packet */
            MUTEX_ENTER(&call->lock);
            for (;;) {
                if (call->error || (call->app.mode != RX_MODE_RECEIVING)) {
                    if (call->error) {
                        call->app.mode = RX_MODE_ERROR;
                        MUTEX_EXIT(&call->lock);
                        return 0;
                    }
                    if (call->app.mode == RX_MODE_SENDING) {
                        /* Flush pending writes with the call lock dropped,
                         * then re-evaluate the call state from the top. */
                        MUTEX_EXIT(&call->lock);
                        rxi_FlushWrite(call);
                        MUTEX_ENTER(&call->lock);
                        continue;
                    }
                }

                code = rxi_GetNextPacket(call);
                /* No MUTEX_EXIT here: rxi_GetNextPacket has already
                 * released call->lock when it returns an error. */
                if (code)
                     return 0;

                if (call->app.currentPacket) {
                    if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
                        /* Send an ack now once more than rxi_HardAckRate
                         * packets have been consumed; otherwise schedule a
                         * delayed ack. */
                        if (call->nHardAcks > (u_short) rxi_HardAckRate) {
                            rxevent_Cancel(&call->delayedAckEvent, call,
                                           RX_CALL_REFCOUNT_DELAY);
                            rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
                        } else {
                            /* Delay to consolidate ack packets */
                            rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
                        }
                    }
                    break;
                }

                /*
                 * If we reach this point either we have no packets in the
                 * receive queue or the next packet in the queue is not the
                 * one we are looking for.  There is nothing else for us to
                 * do but wait for another packet to arrive.
                 */

                /* Are there ever going to be any more packets? */
                if (call->flags & RX_CALL_RECEIVE_DONE) {
                    MUTEX_EXIT(&call->lock);
                    return requestCount - nbytes;
                }
                /* Wait for in-sequence packet */
                call->flags |= RX_CALL_READER_WAIT;
                clock_NewTime();
                call->startWait = clock_Sec();
                /* Sleep until RX_CALL_READER_WAIT is cleared (presumably by
                 * the packet receive path -- not visible in this file). */
                while (call->flags & RX_CALL_READER_WAIT) {
#ifdef  RX_ENABLE_LOCKS
                    CV_WAIT(&call->cv_rq, &call->lock);
#else
                    osi_rxSleep(&call->rq);
#endif
                }

                call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
                /* Unlike the error check at the top of the loop, this exit
                 * does not set call->app.mode to RX_MODE_ERROR. */
                if (call->error) {
                    MUTEX_EXIT(&call->lock);
                    return 0;
                }
#endif /* RX_ENABLE_LOCKS */
            }
            MUTEX_EXIT(&call->lock);
        } else
            /* osi_Assert(cp); */
            /* MTUXXX  this should be replaced by some error-recovery code before shipping */
            /* yes, the following block is allowed to be the ELSE clause (or not) */
            /* It's possible for call->app.nLeft to be smaller than any particular
             * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
             * reflects the size of the buffer.  We have to keep track of the
             * number of bytes read in the length field of the packet struct.  On
             * the final portion of a received packet, it's almost certain that
             * call->app.nLeft will be smaller than the final buffer. */
            while (nbytes && call->app.currentPacket) {
                /* Copy the smallest of: bytes left in the current iovec,
                 * bytes still wanted, bytes left in the packet. */
                t = MIN((int)call->app.curlen, nbytes);
                t = MIN(t, (int)call->app.nLeft);
                memcpy(buf, call->app.curpos, t);
                buf += t;
                nbytes -= t;
                call->app.curpos += t;
                call->app.curlen -= t;
                call->app.nLeft -= t;

                if (!call->app.nLeft) {
                    /* out of packet.  Get another one. */
#ifdef RX_TRACK_PACKETS
                    call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                    rxi_FreePacket(call->app.currentPacket);
                    call->app.currentPacket = NULL;
                } else if (!call->app.curlen) {
                    /* need to get another struct iov */
                    if (++call->app.curvec >= call->app.currentPacket->niovecs) {
                        /* current packet is exhausted, get ready for another */
                        /* don't worry about curvec and stuff, they get set somewhere else */
#ifdef RX_TRACK_PACKETS
                        call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                        rxi_FreePacket(call->app.currentPacket);
                        call->app.currentPacket = NULL;
                        call->app.nLeft = 0;
                    } else {
                        call->app.curpos =
                            call->app.currentPacket->wirevec[call->app.curvec].iov_base;
                        call->app.curlen =
                            call->app.currentPacket->wirevec[call->app.curvec].iov_len;
                    }
                }
            }
        if (!nbytes) {
            /* user buffer is full, return */
            return requestCount;
        }

    } while (nbytes);

    return requestCount;
}
305
306 int
307 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
308 {
309     int bytes;
310     SPLVAR;
311
312     /* Free any packets from the last call to ReadvProc/WritevProc */
313     if (!opr_queue_IsEmpty(&call->app.iovq)) {
314 #ifdef RXDEBUG_PACKET
315         call->iovqc -=
316 #endif /* RXDEBUG_PACKET */
317             rxi_FreePackets(0, &call->app.iovq);
318     }
319
320     /*
321      * Most common case, all of the data is in the current iovec.
322      * We are relying on nLeft being zero unless the call is in receive mode.
323      */
324     if (!call->error && call->app.curlen > nbytes && call->app.nLeft > nbytes) {
325         memcpy(buf, call->app.curpos, nbytes);
326
327         call->app.curpos += nbytes;
328         call->app.curlen -= nbytes;
329         call->app.nLeft  -= nbytes;
330
331         if (!call->app.nLeft && call->app.currentPacket != NULL) {
332             /* out of packet.  Get another one. */
333             rxi_FreePacket(call->app.currentPacket);
334             call->app.currentPacket = NULL;
335         }
336         return nbytes;
337     }
338
339     NETPRI;
340     bytes = rxi_ReadProc(call, buf, nbytes);
341     USERPRI;
342     return bytes;
343 }
344
345 /* Optimization for unmarshalling 32 bit integers */
346 int
347 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
348 {
349     int bytes;
350     SPLVAR;
351
352     /* Free any packets from the last call to ReadvProc/WritevProc */
353     if (!opr_queue_IsEmpty(&call->app.iovq)) {
354 #ifdef RXDEBUG_PACKET
355         call->iovqc -=
356 #endif /* RXDEBUG_PACKET */
357             rxi_FreePackets(0, &call->app.iovq);
358     }
359
360     /*
361      * Most common case, all of the data is in the current iovec.
362      * We are relying on nLeft being zero unless the call is in receive mode.
363      */
364     if (!call->error && call->app.curlen >= sizeof(afs_int32)
365         && call->app.nLeft >= sizeof(afs_int32)) {
366
367         memcpy((char *)value, call->app.curpos, sizeof(afs_int32));
368
369         call->app.curpos += sizeof(afs_int32);
370         call->app.curlen -= sizeof(afs_int32);
371         call->app.nLeft  -= sizeof(afs_int32);
372
373         if (!call->app.nLeft && call->app.currentPacket != NULL) {
374             /* out of packet.  Get another one. */
375             rxi_FreePacket(call->app.currentPacket);
376             call->app.currentPacket = NULL;
377         }
378         return sizeof(afs_int32);
379     }
380
381     NETPRI;
382     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
383     USERPRI;
384
385     return bytes;
386 }
387
/* rxi_FillReadVec
 *
 * Uses packets in the receive queue to fill in as much of the
 * current iovec as possible. Does not block if it runs out
 * of packets to complete the iovec.
 *
 * The iovec being filled is described by call->iov, call->iovNext,
 * call->iovMax and call->iovNBytes.  Entries point directly into packet
 * buffers; packets that have been fully consumed are moved onto
 * call->app.iovq rather than freed.
 *
 * serial is passed through to rxi_SendAck if a hard ack is sent.
 *
 * Return true if an ack packet was sent, otherwise return false.
 * Also returns 1 if rxi_GetNextPacket fails; in that case call->lock,
 * which rxi_GetNextPacket drops on error, is re-acquired before returning.
 *
 * NOTE(review): rxi_ReadvProc calls this with call->lock held; presumably
 * all callers must do so -- confirm against callers outside this file. */
int
rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
{
    int didConsume = 0;
    int didHardAck = 0;
    int code;
    unsigned int t;
    struct iovec *call_iov;
    struct iovec *cur_iov = NULL;

    if (call->app.currentPacket) {
        cur_iov = &call->app.currentPacket->wirevec[call->app.curvec];
    }
    call_iov = &call->iov[call->iovNext];

    while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
        if (call->app.nLeft == 0) {
            /* Get next packet */
            code = rxi_GetNextPacket(call);
            if (code) {
                /* rxi_GetNextPacket unlocked the call on failure; take the
                 * lock again before returning. */
                MUTEX_ENTER(&call->lock);
                return 1;
            }

            if (call->app.currentPacket) {
                cur_iov = &call->app.currentPacket->wirevec[1];
                didConsume = 1;
                continue;
            } else {
                /* No in-sequence packet available: stop without blocking */
                break;
            }
        }

        /* It's possible for call->app.nLeft to be smaller than any particular
         * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
         * reflects the size of the buffer.  We have to keep track of the
         * number of bytes read in the length field of the packet struct.  On
         * the final portion of a received packet, it's almost certain that
         * call->app.nLeft will be smaller than the final buffer. */
        while (call->iovNBytes
               && call->iovNext < call->iovMax
               && call->app.currentPacket) {

            /* Hand out the smallest of: bytes left in the current packet
             * iovec, bytes still requested, bytes left in the packet. */
            t = MIN((int)call->app.curlen, call->iovNBytes);
            t = MIN(t, (int)call->app.nLeft);
            call_iov->iov_base = call->app.curpos;
            call_iov->iov_len = t;
            call_iov++;
            call->iovNext++;
            call->iovNBytes -= t;
            call->app.curpos += t;
            call->app.curlen -= t;
            call->app.nLeft -= t;

            if (!call->app.nLeft) {
                /* out of packet.  Get another one. */
#ifdef RX_TRACK_PACKETS
                call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
                call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
#endif
                /* The caller's iovec still points into this packet, so it
                 * is parked on iovq instead of being freed. */
                opr_queue_Append(&call->app.iovq,
                                 &call->app.currentPacket->entry);
#ifdef RXDEBUG_PACKET
                call->iovqc++;
#endif /* RXDEBUG_PACKET */
                call->app.currentPacket = NULL;
            } else if (!call->app.curlen) {
                /* need to get another struct iov */
                if (++call->app.curvec >= call->app.currentPacket->niovecs) {
                    /* current packet is exhausted, get ready for another */
                    /* don't worry about curvec and stuff, they get set somewhere else */
#ifdef RX_TRACK_PACKETS
                    call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
                    call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
#endif
                    opr_queue_Append(&call->app.iovq,
                                     &call->app.currentPacket->entry);
#ifdef RXDEBUG_PACKET
                    call->iovqc++;
#endif /* RXDEBUG_PACKET */
                    call->app.currentPacket = NULL;
                    call->app.nLeft = 0;
                } else {
                    cur_iov++;
                    call->app.curpos = (char *)cur_iov->iov_base;
                    call->app.curlen = cur_iov->iov_len;
                }
            }
        }
    }

    /* If we consumed any packets then check whether we need to
     * send a hard ack. */
    if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
        if (call->nHardAcks > (u_short) rxi_HardAckRate) {
            rxevent_Cancel(&call->delayedAckEvent, call,
                           RX_CALL_REFCOUNT_DELAY);
            rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
            didHardAck = 1;
        } else {
            /* Delay to consolidate ack packets */
            rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
        }
    }
    return didHardAck;
}
500
501
/* rxi_ReadvProc -- internal version.
 *
 * Fills in an iovec with pointers to the packet buffers. All packets
 * except the last packet (new current packet) are moved to the iovq
 * while the application is processing the data.
 *
 * Parameters:
 *   call   - the call to read from
 *   iov    - caller-supplied iovec array to fill in
 *   nio    - out: number of iovec entries filled (written on success only)
 *   maxio  - maximum number of entries that may be used in iov
 *   nbytes - number of bytes requested
 *
 * Returns the number of bytes described by the filled iovec entries
 * (nbytes minus what could not be supplied), or 0 if the call has an
 * error, in which case call->app.mode is set to RX_MODE_ERROR.
 *
 * LOCKS USED -- called at netpri.
 */
int
rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
              int nbytes)
{
    int bytes;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->app.iovq);
    }

    /* Push out any buffered writes before starting to read */
    if (call->app.mode == RX_MODE_SENDING) {
        rxi_FlushWrite(call);
    }

    MUTEX_ENTER(&call->lock);
    if (call->error)
        goto error;

    /* Get whatever data is currently available in the receive queue.
     * If rxi_FillReadVec sends an ack packet then it is possible
     * that we will receive more data while we drop the call lock
     * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
     * here to avoid a race with the receive thread if we send
     * hard acks in rxi_FillReadVec. */
    call->flags |= RX_CALL_IOVEC_WAIT;
    call->iovNBytes = nbytes;
    call->iovMax = maxio;
    call->iovNext = 0;
    call->iov = iov;
    rxi_FillReadVec(call, 0);

    /* if we need more data then sleep until the receive thread has
     * filled in the rest. */
    if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
        && !(call->flags & RX_CALL_RECEIVE_DONE)) {
        call->flags |= RX_CALL_READER_WAIT;
        clock_NewTime();
        call->startWait = clock_Sec();
        while (call->flags & RX_CALL_READER_WAIT) {
#ifdef  RX_ENABLE_LOCKS
            CV_WAIT(&call->cv_rq, &call->lock);
#else
            osi_rxSleep(&call->rq);
#endif
        }
        call->startWait = 0;
    }
    call->flags &= ~RX_CALL_IOVEC_WAIT;

    if (call->error)
        goto error;

    /* Success: detach the caller's iovec from the call and report how
     * much of it was filled. */
    call->iov = NULL;
    *nio = call->iovNext;
    bytes = nbytes - call->iovNBytes;
    MUTEX_EXIT(&call->lock);
    return bytes;

  error:
    /* NOTE(review): on this path call->iov is not reset to NULL and *nio
     * is not written -- confirm callers do not rely on either. */
    MUTEX_EXIT(&call->lock);
    call->app.mode = RX_MODE_ERROR;
    return 0;
}
577
578 int
579 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
580              int nbytes)
581 {
582     int bytes;
583     SPLVAR;
584
585     NETPRI;
586     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
587     USERPRI;
588     return bytes;
589 }
590
/* rxi_WriteProc -- internal version.
 *
 * Copy nbytes bytes from buf into the call's send packets.  Data is
 * accumulated in call->app.currentPacket; whenever that packet has no free
 * space left (call->app.nFree == 0) it is prepared and appended to the
 * transmit queue (call->tq), and a new packet is allocated once the
 * transmit window permits.
 *
 * A call in RX_MODE_RECEIVING on a server connection is switched to
 * RX_MODE_SENDING first (discarding any current receive packet).  Any
 * other non-sending mode makes the function return 0 immediately.
 *
 * Returns the number of bytes written (the full request) on success, or 0
 * if the call cannot send or has an error (call->app.mode is then set to
 * RX_MODE_ERROR).
 *
 * LOCKS USED -- called at netpri
 */

int
rxi_WriteProc(struct rx_call *call, char *buf,
              int nbytes)
{
    struct rx_connection *conn = call->conn;
    unsigned int t;
    int requestCount = nbytes;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->app.iovq);
    }

    if (call->app.mode != RX_MODE_SENDING) {
        if ((conn->type == RX_SERVER_CONNECTION)
            && (call->app.mode == RX_MODE_RECEIVING)) {
            /* Server side turning the call around from reading the
             * request to writing the reply. */
            call->app.mode = RX_MODE_SENDING;
            if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
                call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                rxi_FreePacket(call->app.currentPacket);
                call->app.currentPacket = NULL;
                call->app.nLeft = 0;
                call->app.nFree = 0;
            }
        } else {
            return 0;
        }
    }

    /* Loop condition is checked at end, so that a write of 0 bytes
     * will force a packet to be created--specially for the case where
     * there are 0 bytes on the stream, but we must send a packet
     * anyway. */
    do {
        if (call->app.nFree == 0) {
            MUTEX_ENTER(&call->lock);
            if (call->error)
                call->app.mode = RX_MODE_ERROR;
            if (!call->error && call->app.currentPacket) {
                clock_NewTime();        /* Bogus:  need new time package */
                /* The 0, below, specifies that it is not the last packet:
                 * there will be others. PrepareSendPacket may
                 * alter the packet length by up to
                 * conn->securityMaxTrailerSize */
                call->bytesSent += call->app.currentPacket->length;
                rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
#ifdef  AFS_GLOBAL_RXLOCK_KERNEL
                /* PrepareSendPacket drops the call lock */
                rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
#ifdef RX_TRACK_PACKETS
                call->app.currentPacket->flags |= RX_PKTFLAG_TQ;
#endif
                /* Hand the full packet over to the transmit queue */
                opr_queue_Append(&call->tq,
                                 &call->app.currentPacket->entry);
#ifdef RXDEBUG_PACKET
                call->tqc++;
#endif /* RXDEBUG_PACKET */
#ifdef RX_TRACK_PACKETS
                call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                call->app.currentPacket = NULL;

                /* If the call is in recovery, let it exhaust its current
                 * retransmit queue before forcing it to send new packets
                 */
                if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
                    rxi_Start(call, 0);
                }
            } else if (call->app.currentPacket) {
                /* The call has an error: discard the unsent packet */
#ifdef RX_TRACK_PACKETS
                call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                rxi_FreePacket(call->app.currentPacket);
                call->app.currentPacket = NULL;
            }
            /* Wait for transmit window to open up */
            while (!call->error
                   && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
                clock_NewTime();
                call->startWait = clock_Sec();

#ifdef  RX_ENABLE_LOCKS
                CV_WAIT(&call->cv_twind, &call->lock);
#else
                call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
                osi_rxSleep(&call->twind);
#endif

                call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
                if (call->error) {
                    call->app.mode = RX_MODE_ERROR;
                    MUTEX_EXIT(&call->lock);
                    return 0;
                }
#endif /* RX_ENABLE_LOCKS */
            }
            if ((call->app.currentPacket = rxi_AllocSendPacket(call, nbytes))) {
#ifdef RX_TRACK_PACKETS
                call->app.currentPacket->flags |= RX_PKTFLAG_CP;
#endif
                call->app.nFree = call->app.currentPacket->length;
                call->app.curvec = 1;   /* 0th vec is always header */
                /* begin at the beginning [ more or less ], continue
                 * on until the end, then stop. */
                call->app.curpos =
                    (char *) call->app.currentPacket->wirevec[1].iov_base +
                    call->conn->securityHeaderSize;
                call->app.curlen =
                    call->app.currentPacket->wirevec[1].iov_len -
                    call->conn->securityHeaderSize;
            }
            /* Re-check the error state after the wait and allocation;
             * a freshly allocated packet is released again on error. */
            if (call->error) {
                call->app.mode = RX_MODE_ERROR;
                if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
                    call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                    rxi_FreePacket(call->app.currentPacket);
                    call->app.currentPacket = NULL;
                }
                MUTEX_EXIT(&call->lock);
                return 0;
            }
            MUTEX_EXIT(&call->lock);
        }

        if (call->app.currentPacket && (int)call->app.nFree < nbytes) {
            /* Try to extend the current buffer */
            int len, mud;
            len = call->app.currentPacket->length;
            mud = rx_MaxUserDataSize(call);
            if (mud > len) {
                int want;
                /* Ask for the shortfall, but never beyond the maximum
                 * user data size for this call. */
                want = MIN(nbytes - (int)call->app.nFree, mud - len);
                rxi_AllocDataBuf(call->app.currentPacket, want,
                                 RX_PACKET_CLASS_SEND_CBUF);
                if (call->app.currentPacket->length > (unsigned)mud)
                    call->app.currentPacket->length = mud;
                call->app.nFree += (call->app.currentPacket->length - len);
            }
        }

        /* If the remaining bytes fit in the buffer, then store them
         * and return.  Don't ship a buffer that's full immediately to
         * the peer--we don't know if it's the last buffer yet */

        if (!call->app.currentPacket) {
            call->app.nFree = 0;
        }

        while (nbytes && call->app.nFree) {

            /* Copy the smallest of: room in the current iovec, bytes
             * still to write, room left in the packet. */
            t = MIN((int)call->app.curlen, nbytes);
            t = MIN((int)call->app.nFree, t);
            memcpy(call->app.curpos, buf, t);
            buf += t;
            nbytes -= t;
            call->app.curpos += t;
            call->app.curlen -= (u_short)t;
            call->app.nFree -= (u_short)t;

            if (!call->app.curlen) {
                /* need to get another struct iov */
                if (++call->app.curvec >= call->app.currentPacket->niovecs) {
                    /* current packet is full, extend or send it */
                    call->app.nFree = 0;
                } else {
                    call->app.curpos =
                        call->app.currentPacket->wirevec[call->app.curvec].iov_base;
                    call->app.curlen =
                        call->app.currentPacket->wirevec[call->app.curvec].iov_len;
                }
            }
        }                       /* while bytes to send and room to send them */

        /* might be out of space now */
        if (!nbytes) {
            return requestCount;
        } else;                 /* more data to send, so get another packet and keep going */
    } while (nbytes);

    /* The loop only ends with nbytes == 0, so this equals requestCount */
    return requestCount - nbytes;
}
786
787 int
788 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
789 {
790     int bytes;
791     int tcurlen;
792     int tnFree;
793     char *tcurpos;
794     SPLVAR;
795
796     /* Free any packets from the last call to ReadvProc/WritevProc */
797     if (!opr_queue_IsEmpty(&call->app.iovq)) {
798 #ifdef RXDEBUG_PACKET
799         call->iovqc -=
800 #endif /* RXDEBUG_PACKET */
801             rxi_FreePackets(0, &call->app.iovq);
802     }
803
804     /*
805      * Most common case: all of the data fits in the current iovec.
806      * We are relying on nFree being zero unless the call is in send mode.
807      */
808     tcurlen = (int)call->app.curlen;
809     tnFree = (int)call->app.nFree;
810     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
811         tcurpos = call->app.curpos;
812
813         memcpy(tcurpos, buf, nbytes);
814         call->app.curpos = tcurpos + nbytes;
815         call->app.curlen = (u_short)(tcurlen - nbytes);
816         call->app.nFree = (u_short)(tnFree - nbytes);
817         return nbytes;
818     }
819
820     NETPRI;
821     bytes = rxi_WriteProc(call, buf, nbytes);
822     USERPRI;
823     return bytes;
824 }
825
826 /* Optimization for marshalling 32 bit arguments */
827 int
828 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
829 {
830     int bytes;
831     int tcurlen;
832     int tnFree;
833     char *tcurpos;
834     SPLVAR;
835
836     if (!opr_queue_IsEmpty(&call->app.iovq)) {
837 #ifdef RXDEBUG_PACKET
838         call->iovqc -=
839 #endif /* RXDEBUG_PACKET */
840             rxi_FreePackets(0, &call->app.iovq);
841     }
842
843     /*
844      * Most common case: all of the data fits in the current iovec.
845      * We are relying on nFree being zero unless the call is in send mode.
846      */
847     tcurlen = call->app.curlen;
848     tnFree = call->app.nFree;
849     if (!call->error && tcurlen >= sizeof(afs_int32)
850         && tnFree >= sizeof(afs_int32)) {
851         tcurpos = call->app.curpos;
852
853         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
854             *((afs_int32 *) (tcurpos)) = *value;
855         } else {
856             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
857         }
858         call->app.curpos = tcurpos + sizeof(afs_int32);
859         call->app.curlen = (u_short)(tcurlen - sizeof(afs_int32));
860         call->app.nFree = (u_short)(tnFree - sizeof(afs_int32));
861         return sizeof(afs_int32);
862     }
863
864     NETPRI;
865     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
866     USERPRI;
867     return bytes;
868 }
869
870 /* rxi_WritevAlloc -- internal version.
871  *
872  * Fill in an iovec to point to data in packet buffers. The application
873  * calls rxi_WritevProc when the buffers are full.
874  *
875  * LOCKS USED -- called at netpri.
876  */
877
878 static int
879 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
880                 int nbytes)
881 {
882     struct rx_connection *conn = call->conn;
883     struct rx_packet *cp;
884     int requestCount;
885     int nextio;
886     /* Temporary values, real work is done in rxi_WritevProc */
887     int tnFree;
888     unsigned int tcurvec;
889     char *tcurpos;
890     int tcurlen;
891
892     requestCount = nbytes;
893     nextio = 0;
894
895     /* Free any packets from the last call to ReadvProc/WritevProc */
896     if (!opr_queue_IsEmpty(&call->app.iovq)) {
897 #ifdef RXDEBUG_PACKET
898         call->iovqc -=
899 #endif /* RXDEBUG_PACKET */
900             rxi_FreePackets(0, &call->app.iovq);
901     }
902
903     if (call->app.mode != RX_MODE_SENDING) {
904         if ((conn->type == RX_SERVER_CONNECTION)
905             && (call->app.mode == RX_MODE_RECEIVING)) {
906             call->app.mode = RX_MODE_SENDING;
907             if (call->app.currentPacket) {
908 #ifdef RX_TRACK_PACKETS
909                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
910 #endif
911                 rxi_FreePacket(call->app.currentPacket);
912                 call->app.currentPacket = NULL;
913                 call->app.nLeft = 0;
914                 call->app.nFree = 0;
915             }
916         } else {
917             return 0;
918         }
919     }
920
921     /* Set up the iovec to point to data in packet buffers. */
922     tnFree = call->app.nFree;
923     tcurvec = call->app.curvec;
924     tcurpos = call->app.curpos;
925     tcurlen = call->app.curlen;
926     cp = call->app.currentPacket;
927     do {
928         int t;
929
930         if (tnFree == 0) {
931             /* current packet is full, allocate a new one */
932             MUTEX_ENTER(&call->lock);
933             cp = rxi_AllocSendPacket(call, nbytes);
934             MUTEX_EXIT(&call->lock);
935             if (cp == NULL) {
936                 /* out of space, return what we have */
937                 *nio = nextio;
938                 return requestCount - nbytes;
939             }
940 #ifdef RX_TRACK_PACKETS
941             cp->flags |= RX_PKTFLAG_IOVQ;
942 #endif
943             opr_queue_Append(&call->app.iovq, &cp->entry);
944 #ifdef RXDEBUG_PACKET
945             call->iovqc++;
946 #endif /* RXDEBUG_PACKET */
947             tnFree = cp->length;
948             tcurvec = 1;
949             tcurpos =
950                 (char *)cp->wirevec[1].iov_base +
951                 call->conn->securityHeaderSize;
952             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
953         }
954
955         if (tnFree < nbytes) {
956             /* try to extend the current packet */
957             int len, mud;
958             len = cp->length;
959             mud = rx_MaxUserDataSize(call);
960             if (mud > len) {
961                 int want;
962                 want = MIN(nbytes - tnFree, mud - len);
963                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
964                 if (cp->length > (unsigned)mud)
965                     cp->length = mud;
966                 tnFree += (cp->length - len);
967                 if (cp == call->app.currentPacket) {
968                     call->app.nFree += (cp->length - len);
969                 }
970             }
971         }
972
973         /* fill in the next entry in the iovec */
974         t = MIN(tcurlen, nbytes);
975         t = MIN(tnFree, t);
976         iov[nextio].iov_base = tcurpos;
977         iov[nextio].iov_len = t;
978         nbytes -= t;
979         tcurpos += t;
980         tcurlen -= t;
981         tnFree -= t;
982         nextio++;
983
984         if (!tcurlen) {
985             /* need to get another struct iov */
986             if (++tcurvec >= cp->niovecs) {
987                 /* current packet is full, extend it or move on to next packet */
988                 tnFree = 0;
989             } else {
990                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
991                 tcurlen = cp->wirevec[tcurvec].iov_len;
992             }
993         }
994     } while (nbytes && nextio < maxio);
995     *nio = nextio;
996     return requestCount - nbytes;
997 }
998
999 int
1000 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1001                int nbytes)
1002 {
1003     int bytes;
1004     SPLVAR;
1005
1006     NETPRI;
1007     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1008     USERPRI;
1009     return bytes;
1010 }
1011
1012 /* rxi_WritevProc -- internal version.
1013  *
1014  * Send buffers allocated in rxi_WritevAlloc.
1015  *
1016  * LOCKS USED -- called at netpri.
1017  */
1018 int
1019 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1020 {
1021 #ifdef RX_TRACK_PACKETS
1022     struct opr_queue *cursor;
1023 #endif
1024     int nextio;
1025     int requestCount;
1026     struct opr_queue tmpq;
1027 #ifdef RXDEBUG_PACKET
1028     u_short tmpqc;
1029 #endif
1030
1031     requestCount = nbytes;
1032     nextio = 0;
1033
1034     MUTEX_ENTER(&call->lock);
1035     if (call->error) {
1036         call->app.mode = RX_MODE_ERROR;
1037     } else if (call->app.mode != RX_MODE_SENDING) {
1038         call->error = RX_PROTOCOL_ERROR;
1039     }
1040 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1041     rxi_WaitforTQBusy(call);
1042 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1043
1044     if (call->error) {
1045         call->app.mode = RX_MODE_ERROR;
1046         MUTEX_EXIT(&call->lock);
1047         if (call->app.currentPacket) {
1048 #ifdef RX_TRACK_PACKETS
1049             call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1050             call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
1051 #endif
1052             opr_queue_Prepend(&call->app.iovq,
1053                               &call->app.currentPacket->entry);
1054 #ifdef RXDEBUG_PACKET
1055             call->iovqc++;
1056 #endif /* RXDEBUG_PACKET */
1057             call->app.currentPacket = NULL;
1058         }
1059 #ifdef RXDEBUG_PACKET
1060         call->iovqc -=
1061 #endif /* RXDEBUG_PACKET */
1062             rxi_FreePackets(0, &call->app.iovq);
1063         return 0;
1064     }
1065
1066     /* Loop through the I/O vector adjusting packet pointers.
1067      * Place full packets back onto the iovq once they are ready
1068      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1069      * the iovec. We put the loop condition at the end to ensure that
1070      * a zero length write will push a short packet. */
1071     nextio = 0;
1072     opr_queue_Init(&tmpq);
1073 #ifdef RXDEBUG_PACKET
1074     tmpqc = 0;
1075 #endif /* RXDEBUG_PACKET */
1076     do {
1077         if (call->app.nFree == 0 && call->app.currentPacket) {
1078             clock_NewTime();    /* Bogus:  need new time package */
1079             /* The 0, below, specifies that it is not the last packet:
1080              * there will be others. PrepareSendPacket may
1081              * alter the packet length by up to
1082              * conn->securityMaxTrailerSize */
1083             call->bytesSent += call->app.currentPacket->length;
1084             rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
1085 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1086             /* PrepareSendPacket drops the call lock */
1087             rxi_WaitforTQBusy(call);
1088 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1089             opr_queue_Append(&tmpq, &call->app.currentPacket->entry);
1090 #ifdef RXDEBUG_PACKET
1091             tmpqc++;
1092 #endif /* RXDEBUG_PACKET */
1093             call->app.currentPacket = NULL;
1094
1095             /* The head of the iovq is now the current packet */
1096             if (nbytes) {
1097                 if (opr_queue_IsEmpty(&call->app.iovq)) {
1098                     MUTEX_EXIT(&call->lock);
1099                     call->error = RX_PROTOCOL_ERROR;
1100 #ifdef RXDEBUG_PACKET
1101                     tmpqc -=
1102 #endif /* RXDEBUG_PACKET */
1103                         rxi_FreePackets(0, &tmpq);
1104                     return 0;
1105                 }
1106                 call->app.currentPacket
1107                         = opr_queue_First(&call->app.iovq, struct rx_packet,
1108                                           entry);
1109                 opr_queue_Remove(&call->app.currentPacket->entry);
1110 #ifdef RX_TRACK_PACKETS
1111                 call->app.currentPacket->flags &= ~RX_PKTFLAG_IOVQ;
1112                 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
1113 #endif
1114 #ifdef RXDEBUG_PACKET
1115                 call->iovqc--;
1116 #endif /* RXDEBUG_PACKET */
1117                 call->app.nFree = call->app.currentPacket->length;
1118                 call->app.curvec = 1;
1119                 call->app.curpos =
1120                     (char *) call->app.currentPacket->wirevec[1].iov_base +
1121                     call->conn->securityHeaderSize;
1122                 call->app.curlen =
1123                     call->app.currentPacket->wirevec[1].iov_len -
1124                     call->conn->securityHeaderSize;
1125             }
1126         }
1127
1128         if (nbytes) {
1129             /* The next iovec should point to the current position */
1130             if (iov[nextio].iov_base != call->app.curpos
1131                 || iov[nextio].iov_len > (int)call->app.curlen) {
1132                 call->error = RX_PROTOCOL_ERROR;
1133                 MUTEX_EXIT(&call->lock);
1134                 if (call->app.currentPacket) {
1135 #ifdef RX_TRACK_PACKETS
1136                     call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1137 #endif
1138                     opr_queue_Prepend(&tmpq,
1139                                       &call->app.currentPacket->entry);
1140 #ifdef RXDEBUG_PACKET
1141                     tmpqc++;
1142 #endif /* RXDEBUG_PACKET */
1143                     call->app.currentPacket = NULL;
1144                 }
1145 #ifdef RXDEBUG_PACKET
1146                 tmpqc -=
1147 #endif /* RXDEBUG_PACKET */
1148                     rxi_FreePackets(0, &tmpq);
1149                 return 0;
1150             }
1151             nbytes -= iov[nextio].iov_len;
1152             call->app.curpos += iov[nextio].iov_len;
1153             call->app.curlen -= iov[nextio].iov_len;
1154             call->app.nFree -= iov[nextio].iov_len;
1155             nextio++;
1156             if (call->app.curlen == 0) {
1157                 if (++call->app.curvec > call->app.currentPacket->niovecs) {
1158                     call->app.nFree = 0;
1159                 } else {
1160                     call->app.curpos =
1161                         call->app.currentPacket->wirevec[call->app.curvec].iov_base;
1162                     call->app.curlen =
1163                         call->app.currentPacket->wirevec[call->app.curvec].iov_len;
1164                 }
1165             }
1166         }
1167     } while (nbytes && nextio < nio);
1168
1169     /* Move the packets from the temporary queue onto the transmit queue.
1170      * We may end up with more than call->twind packets on the queue. */
1171
1172 #ifdef RX_TRACK_PACKETS
1173     for (opr_queue_Scan(&tmpq, cursor))
1174     {
1175         struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
1176         p->flags |= RX_PKTFLAG_TQ;
1177     }
1178 #endif
1179     if (call->error)
1180         call->app.mode = RX_MODE_ERROR;
1181
1182     opr_queue_SpliceAppend(&call->tq, &tmpq);
1183
1184     /* If the call is in recovery, let it exhaust its current retransmit
1185      * queue before forcing it to send new packets
1186      */
1187     if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1188         rxi_Start(call, 0);
1189     }
1190
1191     /* Wait for the length of the transmit queue to fall below call->twind */
1192     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1193         clock_NewTime();
1194         call->startWait = clock_Sec();
1195 #ifdef  RX_ENABLE_LOCKS
1196         CV_WAIT(&call->cv_twind, &call->lock);
1197 #else
1198         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1199         osi_rxSleep(&call->twind);
1200 #endif
1201         call->startWait = 0;
1202     }
1203
1204     if (call->error) {
1205         call->app.mode = RX_MODE_ERROR;
1206         call->app.currentPacket = NULL;
1207         MUTEX_EXIT(&call->lock);
1208         if (call->app.currentPacket) {
1209 #ifdef RX_TRACK_PACKETS
1210             call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1211 #endif
1212             rxi_FreePacket(call->app.currentPacket);
1213         }
1214         return 0;
1215     }
1216     MUTEX_EXIT(&call->lock);
1217
1218     return requestCount - nbytes;
1219 }
1220
1221 int
1222 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1223 {
1224     int bytes;
1225     SPLVAR;
1226
1227     NETPRI;
1228     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1229     USERPRI;
1230     return bytes;
1231 }
1232
1233 /* Flush any buffered data to the stream, switch to read mode
1234  * (clients) or to EOF mode (servers)
1235  *
1236  * LOCKS HELD: called at netpri.
1237  */
1238 void
1239 rxi_FlushWrite(struct rx_call *call)
1240 {
1241     struct rx_packet *cp = NULL;
1242
1243     /* Free any packets from the last call to ReadvProc/WritevProc */
1244     if (!opr_queue_IsEmpty(&call->app.iovq)) {
1245 #ifdef RXDEBUG_PACKET
1246         call->iovqc -=
1247 #endif /* RXDEBUG_PACKET */
1248             rxi_FreePackets(0, &call->app.iovq);
1249     }
1250
1251     if (call->app.mode == RX_MODE_SENDING) {
1252
1253         call->app.mode =
1254             (call->conn->type ==
1255              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1256
1257 #ifdef RX_KERNEL_TRACE
1258         {
1259             int glockOwner = ISAFS_GLOCK();
1260             if (!glockOwner)
1261                 AFS_GLOCK();
1262             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1263                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1264                        call);
1265             if (!glockOwner)
1266                 AFS_GUNLOCK();
1267         }
1268 #endif
1269
1270         MUTEX_ENTER(&call->lock);
1271         if (call->error)
1272             call->app.mode = RX_MODE_ERROR;
1273
1274         call->flags |= RX_CALL_FLUSH;
1275
1276         cp = call->app.currentPacket;
1277
1278         if (cp) {
1279             /* cp->length is only supposed to be the user's data */
1280             /* cp->length was already set to (then-current)
1281              * MaxUserDataSize or less. */
1282 #ifdef RX_TRACK_PACKETS
1283             cp->flags &= ~RX_PKTFLAG_CP;
1284 #endif
1285             cp->length -= call->app.nFree;
1286             call->app.currentPacket = NULL;
1287             call->app.nFree = 0;
1288         } else {
1289             cp = rxi_AllocSendPacket(call, 0);
1290             if (!cp) {
1291                 /* Mode can no longer be MODE_SENDING */
1292                 return;
1293             }
1294             cp->length = 0;
1295             cp->niovecs = 2;    /* header + space for rxkad stuff */
1296             call->app.nFree = 0;
1297         }
1298
1299         /* The 1 specifies that this is the last packet */
1300         call->bytesSent += cp->length;
1301         rxi_PrepareSendPacket(call, cp, 1);
1302 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1303         /* PrepareSendPacket drops the call lock */
1304         rxi_WaitforTQBusy(call);
1305 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1306 #ifdef RX_TRACK_PACKETS
1307         cp->flags |= RX_PKTFLAG_TQ;
1308 #endif
1309         opr_queue_Append(&call->tq, &cp->entry);
1310 #ifdef RXDEBUG_PACKET
1311         call->tqc++;
1312 #endif /* RXDEBUG_PACKET */
1313
1314         /* If the call is in recovery, let it exhaust its current retransmit
1315          * queue before forcing it to send new packets
1316          */
1317         if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1318             rxi_Start(call, 0);
1319         }
1320         MUTEX_EXIT(&call->lock);
1321     }
1322 }
1323
1324 /* Flush any buffered data to the stream, switch to read mode
1325  * (clients) or to EOF mode (servers) */
1326 void
1327 rx_FlushWrite(struct rx_call *call)
1328 {
1329     SPLVAR;
1330     NETPRI;
1331     rxi_FlushWrite(call);
1332     USERPRI;
1333 }