1d1257c9166a4467f30ac067daa8849e73d1cd92
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   *
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #ifdef KERNEL
14 # ifndef UKERNEL
15 #  ifdef RX_KERNEL_TRACE
16 #   include "rx_kcommon.h"
17 #  endif
18 #  if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
19 #   include "afs/sysincludes.h"
20 #  else
21 #   include "h/types.h"
22 #   include "h/time.h"
23 #   include "h/stat.h"
24 #   if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
25 #    include "h/systm.h"
26 #   endif
27 #   ifdef       AFS_OSF_ENV
28 #    include <net/net_globals.h>
29 #   endif /* AFS_OSF_ENV */
30 #   ifdef AFS_LINUX20_ENV
31 #    include "h/socket.h"
32 #   endif
33 #   include "netinet/in.h"
34 #   if defined(AFS_SGI_ENV)
35 #    include "afs/sysincludes.h"
36 #   endif
37 #  endif
38 #  include "afs/afs_args.h"
39 #  if   (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
40 #   include "h/systm.h"
41 #  endif
42 # else /* !UKERNEL */
43 #  include "afs/sysincludes.h"
44 # endif /* !UKERNEL */
45
46 # ifdef RXDEBUG
47 #  undef RXDEBUG                        /* turn off debugging */
48 # endif /* RXDEBUG */
49
50 # include "afs/afs_osi.h"
51 # include "rx_kmutex.h"
52 # include "rx/rx_kernel.h"
53 # include "afs/lock.h"
54 #else /* KERNEL */
55 # include <roken.h>
56 # include <afs/opr.h>
57 #endif /* KERNEL */
58
59 #include "rx.h"
60 #include "rx_clock.h"
61 #include "rx_globals.h"
62 #include "rx_atomic.h"
63 #include "rx_internal.h"
64 #include "rx_conn.h"
65 #include "rx_call.h"
66 #include "rx_packet.h"
67
68 #ifdef RX_LOCKS_DB
69 /* rxdb_fileID is used to identify the lock location, along with line#. */
70 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
71 #endif /* RX_LOCKS_DB */
72
73 /* Get the next packet in the receive queue
74  *
75  * Dispose of the call's currentPacket, and move the next packet in the
76  * receive queue into the currentPacket field. If the next packet isn't
77  * available, then currentPacket is left NULL.
78  *
79  * @param call
80  *      The RX call to manipulate
81  * @returns
82  *      0 on success, an error code on failure
83  *
84  * @notes
85  *      Must be called with the call locked. Unlocks the call if returning
86  *      with an error.
87  */
88
89 static int
90 rxi_GetNextPacket(struct rx_call *call) {
91     struct rx_packet *rp;
92     int error;
93
94     if (call->app.currentPacket != NULL) {
95 #ifdef RX_TRACK_PACKETS
96         call->app.currentPacket->flags |= RX_PKTFLAG_CP;
97 #endif
98         rxi_FreePacket(call->app.currentPacket);
99         call->app.currentPacket = NULL;
100     }
101
102     if (opr_queue_IsEmpty(&call->rq))
103         return 0;
104
105     /* Check that next packet available is next in sequence */
106     rp = opr_queue_First(&call->rq, struct rx_packet, entry);
107     if (rp->header.seq != call->rnext)
108         return 0;
109
110     opr_queue_Remove(&rp->entry);
111 #ifdef RX_TRACK_PACKETS
112     rp->flags &= ~RX_PKTFLAG_RQ;
113 #endif
114 #ifdef RXDEBUG_PACKET
115     call->rqc--;
116 #endif /* RXDEBUG_PACKET */
117
118     /* RXS_CheckPacket called to undo RXS_PreparePacket's work.  It may
119      * reduce the length of the packet by up to conn->maxTrailerSize,
120      * to reflect the length of the data + the header. */
121     if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
122         /* Used to merely shut down the call, but now we shut down the whole
123          * connection since this may indicate an attempt to hijack it */
124
125         MUTEX_EXIT(&call->lock);
126         rxi_ConnectionError(call->conn, error);
127         MUTEX_ENTER(&call->conn->conn_data_lock);
128         rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
129         MUTEX_EXIT(&call->conn->conn_data_lock);
130         rxi_FreePacket(rp);
131
132         return error;
133      }
134
135     call->rnext++;
136     call->app.currentPacket = rp;
137 #ifdef RX_TRACK_PACKETS
138     call->app.currentPacket->flags |= RX_PKTFLAG_CP;
139 #endif
140     call->app.curvec = 1;       /* 0th vec is always header */
141
142     /* begin at the beginning [ more or less ], continue on until the end,
143      * then stop. */
144     call->app.curpos = (char *)call->app.currentPacket->wirevec[1].iov_base +
145                    call->conn->securityHeaderSize;
146     call->app.curlen = call->app.currentPacket->wirevec[1].iov_len -
147                    call->conn->securityHeaderSize;
148
149     call->app.nLeft = call->app.currentPacket->length;
150     call->app.bytesRcvd += call->app.currentPacket->length;
151
152     call->nHardAcks++;
153
154     return 0;
155 }
156
157 /* rxi_ReadProc -- internal version.
158  *
159  * LOCKS USED -- called at netpri
160  */
161 int
162 rxi_ReadProc(struct rx_call *call, char *buf,
163              int nbytes)
164 {
165     int requestCount;
166     int code;
167     unsigned int t;
168
169 /* XXXX took out clock_NewTime from here.  Was it needed? */
170     requestCount = nbytes;
171
172     /* Free any packets from the last call to ReadvProc/WritevProc */
173     if (!opr_queue_IsEmpty(&call->app.iovq)) {
174 #ifdef RXDEBUG_PACKET
175         call->iovqc -=
176 #endif /* RXDEBUG_PACKET */
177             rxi_FreePackets(0, &call->app.iovq);
178     }
179
180     do {
181         if (call->app.nLeft == 0) {
182             /* Get next packet */
183             MUTEX_ENTER(&call->lock);
184             for (;;) {
185                 if (call->error || (call->app.mode != RX_MODE_RECEIVING)) {
186                     if (call->error) {
187                         call->app.mode = RX_MODE_ERROR;
188                         MUTEX_EXIT(&call->lock);
189                         return 0;
190                     }
191                     if (call->app.mode == RX_MODE_SENDING) {
192                         rxi_FlushWriteLocked(call);
193                         continue;
194                     }
195                 }
196
197                 code = rxi_GetNextPacket(call);
198                 if (code)
199                      return 0;
200
201                 if (call->app.currentPacket) {
202                     if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
203                         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
204                             rxi_CancelDelayedAckEvent(call);
205                             rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
206                         } else {
207                             /* Delay to consolidate ack packets */
208                             rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
209                         }
210                     }
211                     break;
212                 }
213
214                 /*
215                  * If we reach this point either we have no packets in the
216                  * receive queue or the next packet in the queue is not the
217                  * one we are looking for.  There is nothing else for us to
218                  * do but wait for another packet to arrive.
219                  */
220
221                 /* Are there ever going to be any more packets? */
222                 if (call->flags & RX_CALL_RECEIVE_DONE) {
223                     MUTEX_EXIT(&call->lock);
224                     return requestCount - nbytes;
225                 }
226                 /* Wait for in-sequence packet */
227                 call->flags |= RX_CALL_READER_WAIT;
228                 clock_NewTime();
229                 call->startWait = clock_Sec();
230                 while (call->flags & RX_CALL_READER_WAIT) {
231 #ifdef  RX_ENABLE_LOCKS
232                     CV_WAIT(&call->cv_rq, &call->lock);
233 #else
234                     osi_rxSleep(&call->rq);
235 #endif
236                 }
237
238                 call->startWait = 0;
239 #ifdef RX_ENABLE_LOCKS
240                 if (call->error) {
241                     MUTEX_EXIT(&call->lock);
242                     return 0;
243                 }
244 #endif /* RX_ENABLE_LOCKS */
245             }
246             MUTEX_EXIT(&call->lock);
247         } else
248             /* osi_Assert(cp); */
249             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
250             /* yes, the following block is allowed to be the ELSE clause (or not) */
251             /* It's possible for call->app.nLeft to be smaller than any particular
252              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
253              * reflects the size of the buffer.  We have to keep track of the
254              * number of bytes read in the length field of the packet struct.  On
255              * the final portion of a received packet, it's almost certain that
256              * call->app.nLeft will be smaller than the final buffer. */
257             while (nbytes && call->app.currentPacket) {
258                 t = MIN((int)call->app.curlen, nbytes);
259                 t = MIN(t, (int)call->app.nLeft);
260                 memcpy(buf, call->app.curpos, t);
261                 buf += t;
262                 nbytes -= t;
263                 call->app.curpos += t;
264                 call->app.curlen -= t;
265                 call->app.nLeft -= t;
266
267                 if (!call->app.nLeft) {
268                     /* out of packet.  Get another one. */
269 #ifdef RX_TRACK_PACKETS
270                     call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
271 #endif
272                     rxi_FreePacket(call->app.currentPacket);
273                     call->app.currentPacket = NULL;
274                 } else if (!call->app.curlen) {
275                     /* need to get another struct iov */
276                     if (++call->app.curvec >= call->app.currentPacket->niovecs) {
277                         /* current packet is exhausted, get ready for another */
278                         /* don't worry about curvec and stuff, they get set somewhere else */
279 #ifdef RX_TRACK_PACKETS
280                         call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
281 #endif
282                         rxi_FreePacket(call->app.currentPacket);
283                         call->app.currentPacket = NULL;
284                         call->app.nLeft = 0;
285                     } else {
286                         call->app.curpos =
287                             call->app.currentPacket->wirevec[call->app.curvec].iov_base;
288                         call->app.curlen =
289                             call->app.currentPacket->wirevec[call->app.curvec].iov_len;
290                     }
291                 }
292             }
293         if (!nbytes) {
294             /* user buffer is full, return */
295             return requestCount;
296         }
297
298     } while (nbytes);
299
300     return requestCount;
301 }
302
303 int
304 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
305 {
306     int bytes;
307     SPLVAR;
308
309     /* Free any packets from the last call to ReadvProc/WritevProc */
310     if (!opr_queue_IsEmpty(&call->app.iovq)) {
311 #ifdef RXDEBUG_PACKET
312         call->iovqc -=
313 #endif /* RXDEBUG_PACKET */
314             rxi_FreePackets(0, &call->app.iovq);
315     }
316
317     /*
318      * Most common case, all of the data is in the current iovec.
319      * We are relying on nLeft being zero unless the call is in receive mode.
320      */
321     if (!call->error && call->app.curlen > nbytes && call->app.nLeft > nbytes) {
322         memcpy(buf, call->app.curpos, nbytes);
323
324         call->app.curpos += nbytes;
325         call->app.curlen -= nbytes;
326         call->app.nLeft  -= nbytes;
327
328         if (!call->app.nLeft && call->app.currentPacket != NULL) {
329             /* out of packet.  Get another one. */
330             rxi_FreePacket(call->app.currentPacket);
331             call->app.currentPacket = NULL;
332         }
333         return nbytes;
334     }
335
336     NETPRI;
337     bytes = rxi_ReadProc(call, buf, nbytes);
338     USERPRI;
339     return bytes;
340 }
341
342 /* Optimization for unmarshalling 32 bit integers */
343 int
344 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
345 {
346     int bytes;
347     SPLVAR;
348
349     /* Free any packets from the last call to ReadvProc/WritevProc */
350     if (!opr_queue_IsEmpty(&call->app.iovq)) {
351 #ifdef RXDEBUG_PACKET
352         call->iovqc -=
353 #endif /* RXDEBUG_PACKET */
354             rxi_FreePackets(0, &call->app.iovq);
355     }
356
357     /*
358      * Most common case, all of the data is in the current iovec.
359      * We are relying on nLeft being zero unless the call is in receive mode.
360      */
361     if (!call->error && call->app.curlen >= sizeof(afs_int32)
362         && call->app.nLeft >= sizeof(afs_int32)) {
363
364         memcpy((char *)value, call->app.curpos, sizeof(afs_int32));
365
366         call->app.curpos += sizeof(afs_int32);
367         call->app.curlen -= sizeof(afs_int32);
368         call->app.nLeft  -= sizeof(afs_int32);
369
370         if (!call->app.nLeft && call->app.currentPacket != NULL) {
371             /* out of packet.  Get another one. */
372             rxi_FreePacket(call->app.currentPacket);
373             call->app.currentPacket = NULL;
374         }
375         return sizeof(afs_int32);
376     }
377
378     NETPRI;
379     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
380     USERPRI;
381
382     return bytes;
383 }
384
385 /* rxi_FillReadVec
386  *
387  * Uses packets in the receive queue to fill in as much of the
388  * current iovec as possible. Does not block if it runs out
389  * of packets to complete the iovec. Return true if an ack packet
390  * was sent, otherwise return false */
391 int
392 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
393 {
394     int didConsume = 0;
395     int didHardAck = 0;
396     int code;
397     unsigned int t;
398     struct iovec *call_iov;
399     struct iovec *cur_iov = NULL;
400
401     if (call->app.currentPacket) {
402         cur_iov = &call->app.currentPacket->wirevec[call->app.curvec];
403     }
404     call_iov = &call->iov[call->iovNext];
405
406     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
407         if (call->app.nLeft == 0) {
408             /* Get next packet */
409             code = rxi_GetNextPacket(call);
410             if (code) {
411                 MUTEX_ENTER(&call->lock);
412                 return 1;
413             }
414
415             if (call->app.currentPacket) {
416                 cur_iov = &call->app.currentPacket->wirevec[1];
417                 didConsume = 1;
418                 continue;
419             } else {
420                 break;
421             }
422         }
423
424         /* It's possible for call->app.nLeft to be smaller than any particular
425          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
426          * reflects the size of the buffer.  We have to keep track of the
427          * number of bytes read in the length field of the packet struct.  On
428          * the final portion of a received packet, it's almost certain that
429          * call->app.nLeft will be smaller than the final buffer. */
430         while (call->iovNBytes
431                && call->iovNext < call->iovMax
432                && call->app.currentPacket) {
433
434             t = MIN((int)call->app.curlen, call->iovNBytes);
435             t = MIN(t, (int)call->app.nLeft);
436             call_iov->iov_base = call->app.curpos;
437             call_iov->iov_len = t;
438             call_iov++;
439             call->iovNext++;
440             call->iovNBytes -= t;
441             call->app.curpos += t;
442             call->app.curlen -= t;
443             call->app.nLeft -= t;
444
445             if (!call->app.nLeft) {
446                 /* out of packet.  Get another one. */
447 #ifdef RX_TRACK_PACKETS
448                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
449                 call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
450 #endif
451                 opr_queue_Append(&call->app.iovq,
452                                  &call->app.currentPacket->entry);
453 #ifdef RXDEBUG_PACKET
454                 call->iovqc++;
455 #endif /* RXDEBUG_PACKET */
456                 call->app.currentPacket = NULL;
457             } else if (!call->app.curlen) {
458                 /* need to get another struct iov */
459                 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
460                     /* current packet is exhausted, get ready for another */
461                     /* don't worry about curvec and stuff, they get set somewhere else */
462 #ifdef RX_TRACK_PACKETS
463                     call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
464                     call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
465 #endif
466                     opr_queue_Append(&call->app.iovq,
467                                      &call->app.currentPacket->entry);
468 #ifdef RXDEBUG_PACKET
469                     call->iovqc++;
470 #endif /* RXDEBUG_PACKET */
471                     call->app.currentPacket = NULL;
472                     call->app.nLeft = 0;
473                 } else {
474                     cur_iov++;
475                     call->app.curpos = (char *)cur_iov->iov_base;
476                     call->app.curlen = cur_iov->iov_len;
477                 }
478             }
479         }
480     }
481
482     /* If we consumed any packets then check whether we need to
483      * send a hard ack. */
484     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
485         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
486             rxi_CancelDelayedAckEvent(call);
487             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
488             didHardAck = 1;
489         } else {
490             /* Delay to consolidate ack packets */
491             rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
492         }
493     }
494     return didHardAck;
495 }
496
497
498 /* rxi_ReadvProc -- internal version.
499  *
500  * Fills in an iovec with pointers to the packet buffers. All packets
501  * except the last packet (new current packet) are moved to the iovq
502  * while the application is processing the data.
503  *
504  * LOCKS USED -- called at netpri.
505  */
506 int
507 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
508               int nbytes)
509 {
510     int bytes;
511
512     /* Free any packets from the last call to ReadvProc/WritevProc */
513     if (!opr_queue_IsEmpty(&call->app.iovq)) {
514 #ifdef RXDEBUG_PACKET
515         call->iovqc -=
516 #endif /* RXDEBUG_PACKET */
517             rxi_FreePackets(0, &call->app.iovq);
518     }
519
520     if (call->app.mode == RX_MODE_SENDING) {
521         rxi_FlushWrite(call);
522     }
523
524     MUTEX_ENTER(&call->lock);
525     if (call->error)
526         goto error;
527
528     /* Get whatever data is currently available in the receive queue.
529      * If rxi_FillReadVec sends an ack packet then it is possible
530      * that we will receive more data while we drop the call lock
531      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
532      * here to avoid a race with the receive thread if we send
533      * hard acks in rxi_FillReadVec. */
534     call->flags |= RX_CALL_IOVEC_WAIT;
535     call->iovNBytes = nbytes;
536     call->iovMax = maxio;
537     call->iovNext = 0;
538     call->iov = iov;
539     rxi_FillReadVec(call, 0);
540
541     /* if we need more data then sleep until the receive thread has
542      * filled in the rest. */
543     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
544         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
545         call->flags |= RX_CALL_READER_WAIT;
546         clock_NewTime();
547         call->startWait = clock_Sec();
548         while (call->flags & RX_CALL_READER_WAIT) {
549 #ifdef  RX_ENABLE_LOCKS
550             CV_WAIT(&call->cv_rq, &call->lock);
551 #else
552             osi_rxSleep(&call->rq);
553 #endif
554         }
555         call->startWait = 0;
556     }
557     call->flags &= ~RX_CALL_IOVEC_WAIT;
558
559     if (call->error)
560         goto error;
561
562     call->iov = NULL;
563     *nio = call->iovNext;
564     bytes = nbytes - call->iovNBytes;
565     MUTEX_EXIT(&call->lock);
566     return bytes;
567
568   error:
569     MUTEX_EXIT(&call->lock);
570     call->app.mode = RX_MODE_ERROR;
571     return 0;
572 }
573
574 int
575 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
576              int nbytes)
577 {
578     int bytes;
579     SPLVAR;
580
581     NETPRI;
582     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
583     USERPRI;
584     return bytes;
585 }
586
587 /* rxi_WriteProc -- internal version.
588  *
589  * LOCKS USED -- called at netpri
590  */
591
592 int
593 rxi_WriteProc(struct rx_call *call, char *buf,
594               int nbytes)
595 {
596     struct rx_connection *conn = call->conn;
597     unsigned int t;
598     int requestCount = nbytes;
599
600     /* Free any packets from the last call to ReadvProc/WritevProc */
601     if (!opr_queue_IsEmpty(&call->app.iovq)) {
602 #ifdef RXDEBUG_PACKET
603         call->iovqc -=
604 #endif /* RXDEBUG_PACKET */
605             rxi_FreePackets(0, &call->app.iovq);
606     }
607
608     if (call->app.mode != RX_MODE_SENDING) {
609         if ((conn->type == RX_SERVER_CONNECTION)
610             && (call->app.mode == RX_MODE_RECEIVING)) {
611             call->app.mode = RX_MODE_SENDING;
612             if (call->app.currentPacket) {
613 #ifdef RX_TRACK_PACKETS
614                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
615 #endif
616                 rxi_FreePacket(call->app.currentPacket);
617                 call->app.currentPacket = NULL;
618                 call->app.nLeft = 0;
619                 call->app.nFree = 0;
620             }
621         } else {
622             return 0;
623         }
624     }
625
626     /* Loop condition is checked at end, so that a write of 0 bytes
627      * will force a packet to be created--specially for the case where
628      * there are 0 bytes on the stream, but we must send a packet
629      * anyway. */
630     do {
631         if (call->app.nFree == 0) {
632             MUTEX_ENTER(&call->lock);
633             if (call->error)
634                 call->app.mode = RX_MODE_ERROR;
635             if (!call->error && call->app.currentPacket) {
636                 clock_NewTime();        /* Bogus:  need new time package */
637                 /* The 0, below, specifies that it is not the last packet:
638                  * there will be others. PrepareSendPacket may
639                  * alter the packet length by up to
640                  * conn->securityMaxTrailerSize */
641                 call->app.bytesSent += call->app.currentPacket->length;
642                 rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
643                 /* PrepareSendPacket drops the call lock */
644                 rxi_WaitforTQBusy(call);
645 #ifdef RX_TRACK_PACKETS
646                 call->app.currentPacket->flags |= RX_PKTFLAG_TQ;
647 #endif
648                 opr_queue_Append(&call->tq,
649                                  &call->app.currentPacket->entry);
650 #ifdef RXDEBUG_PACKET
651                 call->tqc++;
652 #endif /* RXDEBUG_PACKET */
653 #ifdef RX_TRACK_PACKETS
654                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
655 #endif
656                 call->app.currentPacket = NULL;
657
658                 /* If the call is in recovery, let it exhaust its current
659                  * retransmit queue before forcing it to send new packets
660                  */
661                 if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
662                     rxi_Start(call, 0);
663                 }
664             } else if (call->app.currentPacket) {
665 #ifdef RX_TRACK_PACKETS
666                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
667 #endif
668                 rxi_FreePacket(call->app.currentPacket);
669                 call->app.currentPacket = NULL;
670             }
671             /* Wait for transmit window to open up */
672             while (!call->error
673                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
674                 clock_NewTime();
675                 call->startWait = clock_Sec();
676
677 #ifdef  RX_ENABLE_LOCKS
678                 CV_WAIT(&call->cv_twind, &call->lock);
679 #else
680                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
681                 osi_rxSleep(&call->twind);
682 #endif
683
684                 call->startWait = 0;
685 #ifdef RX_ENABLE_LOCKS
686                 if (call->error) {
687                     call->app.mode = RX_MODE_ERROR;
688                     MUTEX_EXIT(&call->lock);
689                     return 0;
690                 }
691 #endif /* RX_ENABLE_LOCKS */
692             }
693             if ((call->app.currentPacket = rxi_AllocSendPacket(call, nbytes))) {
694 #ifdef RX_TRACK_PACKETS
695                 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
696 #endif
697                 call->app.nFree = call->app.currentPacket->length;
698                 call->app.curvec = 1;   /* 0th vec is always header */
699                 /* begin at the beginning [ more or less ], continue
700                  * on until the end, then stop. */
701                 call->app.curpos =
702                     (char *) call->app.currentPacket->wirevec[1].iov_base +
703                     call->conn->securityHeaderSize;
704                 call->app.curlen =
705                     call->app.currentPacket->wirevec[1].iov_len -
706                     call->conn->securityHeaderSize;
707             }
708             if (call->error) {
709                 call->app.mode = RX_MODE_ERROR;
710                 if (call->app.currentPacket) {
711 #ifdef RX_TRACK_PACKETS
712                     call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
713 #endif
714                     rxi_FreePacket(call->app.currentPacket);
715                     call->app.currentPacket = NULL;
716                 }
717                 MUTEX_EXIT(&call->lock);
718                 return 0;
719             }
720             MUTEX_EXIT(&call->lock);
721         }
722
723         if (call->app.currentPacket && (int)call->app.nFree < nbytes) {
724             /* Try to extend the current buffer */
725             int len, mud;
726             len = call->app.currentPacket->length;
727             mud = rx_MaxUserDataSize(call);
728             if (mud > len) {
729                 int want;
730                 want = MIN(nbytes - (int)call->app.nFree, mud - len);
731                 rxi_AllocDataBuf(call->app.currentPacket, want,
732                                  RX_PACKET_CLASS_SEND_CBUF);
733                 if (call->app.currentPacket->length > (unsigned)mud)
734                     call->app.currentPacket->length = mud;
735                 call->app.nFree += (call->app.currentPacket->length - len);
736             }
737         }
738
739         /* If the remaining bytes fit in the buffer, then store them
740          * and return.  Don't ship a buffer that's full immediately to
741          * the peer--we don't know if it's the last buffer yet */
742
743         if (!call->app.currentPacket) {
744             call->app.nFree = 0;
745         }
746
747         while (nbytes && call->app.nFree) {
748
749             t = MIN((int)call->app.curlen, nbytes);
750             t = MIN((int)call->app.nFree, t);
751             memcpy(call->app.curpos, buf, t);
752             buf += t;
753             nbytes -= t;
754             call->app.curpos += t;
755             call->app.curlen -= (u_short)t;
756             call->app.nFree -= (u_short)t;
757
758             if (!call->app.curlen) {
759                 /* need to get another struct iov */
760                 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
761                     /* current packet is full, extend or send it */
762                     call->app.nFree = 0;
763                 } else {
764                     call->app.curpos =
765                         call->app.currentPacket->wirevec[call->app.curvec].iov_base;
766                     call->app.curlen =
767                         call->app.currentPacket->wirevec[call->app.curvec].iov_len;
768                 }
769             }
770         }                       /* while bytes to send and room to send them */
771
772         /* might be out of space now */
773         if (!nbytes) {
774             return requestCount;
775         } else {
776             /* more data to send, so get another packet and keep going */
777         }
778     } while (nbytes);
779
780     return requestCount - nbytes;
781 }
782
783 int
784 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
785 {
786     int bytes;
787     int tcurlen;
788     int tnFree;
789     char *tcurpos;
790     SPLVAR;
791
792     /* Free any packets from the last call to ReadvProc/WritevProc */
793     if (!opr_queue_IsEmpty(&call->app.iovq)) {
794 #ifdef RXDEBUG_PACKET
795         call->iovqc -=
796 #endif /* RXDEBUG_PACKET */
797             rxi_FreePackets(0, &call->app.iovq);
798     }
799
800     /*
801      * Most common case: all of the data fits in the current iovec.
802      * We are relying on nFree being zero unless the call is in send mode.
803      */
804     tcurlen = (int)call->app.curlen;
805     tnFree = (int)call->app.nFree;
806     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
807         tcurpos = call->app.curpos;
808
809         memcpy(tcurpos, buf, nbytes);
810         call->app.curpos = tcurpos + nbytes;
811         call->app.curlen = (u_short)(tcurlen - nbytes);
812         call->app.nFree = (u_short)(tnFree - nbytes);
813         return nbytes;
814     }
815
816     NETPRI;
817     bytes = rxi_WriteProc(call, buf, nbytes);
818     USERPRI;
819     return bytes;
820 }
821
822 /* Optimization for marshalling 32 bit arguments */
823 int
824 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
825 {
826     int bytes;
827     int tcurlen;
828     int tnFree;
829     char *tcurpos;
830     SPLVAR;
831
832     if (!opr_queue_IsEmpty(&call->app.iovq)) {
833 #ifdef RXDEBUG_PACKET
834         call->iovqc -=
835 #endif /* RXDEBUG_PACKET */
836             rxi_FreePackets(0, &call->app.iovq);
837     }
838
839     /*
840      * Most common case: all of the data fits in the current iovec.
841      * We are relying on nFree being zero unless the call is in send mode.
842      */
843     tcurlen = call->app.curlen;
844     tnFree = call->app.nFree;
845     if (!call->error && tcurlen >= sizeof(afs_int32)
846         && tnFree >= sizeof(afs_int32)) {
847         tcurpos = call->app.curpos;
848
849         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
850             *((afs_int32 *) (tcurpos)) = *value;
851         } else {
852             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
853         }
854         call->app.curpos = tcurpos + sizeof(afs_int32);
855         call->app.curlen = (u_short)(tcurlen - sizeof(afs_int32));
856         call->app.nFree = (u_short)(tnFree - sizeof(afs_int32));
857         return sizeof(afs_int32);
858     }
859
860     NETPRI;
861     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
862     USERPRI;
863     return bytes;
864 }
865
866 /* rxi_WritevAlloc -- internal version.
867  *
868  * Fill in an iovec to point to data in packet buffers. The application
869  * calls rxi_WritevProc when the buffers are full.
870  *
871  * LOCKS USED -- called at netpri.
872  */
873
874 static int
875 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
876                 int nbytes)
877 {
878     struct rx_connection *conn = call->conn;
879     struct rx_packet *cp;
880     int requestCount;
881     int nextio;
882     /* Temporary values, real work is done in rxi_WritevProc */
883     int tnFree;
884     unsigned int tcurvec;
885     char *tcurpos;
886     int tcurlen;
887
888     requestCount = nbytes;
889     nextio = 0;
890
891     /* Free any packets from the last call to ReadvProc/WritevProc */
892     if (!opr_queue_IsEmpty(&call->app.iovq)) {
893 #ifdef RXDEBUG_PACKET
894         call->iovqc -=
895 #endif /* RXDEBUG_PACKET */
896             rxi_FreePackets(0, &call->app.iovq);
897     }
898
899     if (call->app.mode != RX_MODE_SENDING) {
900         if ((conn->type == RX_SERVER_CONNECTION)
901             && (call->app.mode == RX_MODE_RECEIVING)) {
902             call->app.mode = RX_MODE_SENDING;
903             if (call->app.currentPacket) {
904 #ifdef RX_TRACK_PACKETS
905                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
906 #endif
907                 rxi_FreePacket(call->app.currentPacket);
908                 call->app.currentPacket = NULL;
909                 call->app.nLeft = 0;
910                 call->app.nFree = 0;
911             }
912         } else {
913             return 0;
914         }
915     }
916
917     /* Set up the iovec to point to data in packet buffers. */
918     tnFree = call->app.nFree;
919     tcurvec = call->app.curvec;
920     tcurpos = call->app.curpos;
921     tcurlen = call->app.curlen;
922     cp = call->app.currentPacket;
923     do {
924         int t;
925
926         if (tnFree == 0) {
927             /* current packet is full, allocate a new one */
928             MUTEX_ENTER(&call->lock);
929             cp = rxi_AllocSendPacket(call, nbytes);
930             MUTEX_EXIT(&call->lock);
931             if (cp == NULL) {
932                 /* out of space, return what we have */
933                 *nio = nextio;
934                 return requestCount - nbytes;
935             }
936 #ifdef RX_TRACK_PACKETS
937             cp->flags |= RX_PKTFLAG_IOVQ;
938 #endif
939             opr_queue_Append(&call->app.iovq, &cp->entry);
940 #ifdef RXDEBUG_PACKET
941             call->iovqc++;
942 #endif /* RXDEBUG_PACKET */
943             tnFree = cp->length;
944             tcurvec = 1;
945             tcurpos =
946                 (char *)cp->wirevec[1].iov_base +
947                 call->conn->securityHeaderSize;
948             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
949         }
950
951         if (tnFree < nbytes) {
952             /* try to extend the current packet */
953             int len, mud;
954             len = cp->length;
955             mud = rx_MaxUserDataSize(call);
956             if (mud > len) {
957                 int want;
958                 want = MIN(nbytes - tnFree, mud - len);
959                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
960                 if (cp->length > (unsigned)mud)
961                     cp->length = mud;
962                 tnFree += (cp->length - len);
963                 if (cp == call->app.currentPacket) {
964                     call->app.nFree += (cp->length - len);
965                 }
966             }
967         }
968
969         /* fill in the next entry in the iovec */
970         t = MIN(tcurlen, nbytes);
971         t = MIN(tnFree, t);
972         iov[nextio].iov_base = tcurpos;
973         iov[nextio].iov_len = t;
974         nbytes -= t;
975         tcurpos += t;
976         tcurlen -= t;
977         tnFree -= t;
978         nextio++;
979
980         if (!tcurlen) {
981             /* need to get another struct iov */
982             if (++tcurvec >= cp->niovecs) {
983                 /* current packet is full, extend it or move on to next packet */
984                 tnFree = 0;
985             } else {
986                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
987                 tcurlen = cp->wirevec[tcurvec].iov_len;
988             }
989         }
990     } while (nbytes && nextio < maxio);
991     *nio = nextio;
992     return requestCount - nbytes;
993 }
994
995 int
996 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
997                int nbytes)
998 {
999     int bytes;
1000     SPLVAR;
1001
1002     NETPRI;
1003     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1004     USERPRI;
1005     return bytes;
1006 }
1007
1008 /* rxi_WritevProc -- internal version.
1009  *
1010  * Send buffers allocated in rxi_WritevAlloc.
1011  *
1012  * LOCKS USED -- called at netpri.
1013  */
1014 int
1015 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1016 {
1017 #ifdef RX_TRACK_PACKETS
1018     struct opr_queue *cursor;
1019 #endif
1020     int nextio;
1021     int requestCount;
1022     struct opr_queue tmpq;
1023 #ifdef RXDEBUG_PACKET
1024     u_short tmpqc;
1025 #endif
1026
1027     requestCount = nbytes;
1028     nextio = 0;
1029
1030     MUTEX_ENTER(&call->lock);
1031     if (call->error) {
1032         call->app.mode = RX_MODE_ERROR;
1033     } else if (call->app.mode != RX_MODE_SENDING) {
1034         call->error = RX_PROTOCOL_ERROR;
1035     }
1036     rxi_WaitforTQBusy(call);
1037
1038     if (call->error) {
1039         call->app.mode = RX_MODE_ERROR;
1040         MUTEX_EXIT(&call->lock);
1041         if (call->app.currentPacket) {
1042 #ifdef RX_TRACK_PACKETS
1043             call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1044             call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
1045 #endif
1046             opr_queue_Prepend(&call->app.iovq,
1047                               &call->app.currentPacket->entry);
1048 #ifdef RXDEBUG_PACKET
1049             call->iovqc++;
1050 #endif /* RXDEBUG_PACKET */
1051             call->app.currentPacket = NULL;
1052         }
1053 #ifdef RXDEBUG_PACKET
1054         call->iovqc -=
1055 #endif /* RXDEBUG_PACKET */
1056             rxi_FreePackets(0, &call->app.iovq);
1057         return 0;
1058     }
1059
1060     /* Loop through the I/O vector adjusting packet pointers.
1061      * Place full packets back onto the iovq once they are ready
1062      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1063      * the iovec. We put the loop condition at the end to ensure that
1064      * a zero length write will push a short packet. */
1065     nextio = 0;
1066     opr_queue_Init(&tmpq);
1067 #ifdef RXDEBUG_PACKET
1068     tmpqc = 0;
1069 #endif /* RXDEBUG_PACKET */
1070     do {
1071         if (call->app.nFree == 0 && call->app.currentPacket) {
1072             clock_NewTime();    /* Bogus:  need new time package */
1073             /* The 0, below, specifies that it is not the last packet:
1074              * there will be others. PrepareSendPacket may
1075              * alter the packet length by up to
1076              * conn->securityMaxTrailerSize */
1077             call->app.bytesSent += call->app.currentPacket->length;
1078             rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
1079             /* PrepareSendPacket drops the call lock */
1080             rxi_WaitforTQBusy(call);
1081             opr_queue_Append(&tmpq, &call->app.currentPacket->entry);
1082 #ifdef RXDEBUG_PACKET
1083             tmpqc++;
1084 #endif /* RXDEBUG_PACKET */
1085             call->app.currentPacket = NULL;
1086
1087             /* The head of the iovq is now the current packet */
1088             if (nbytes) {
1089                 if (opr_queue_IsEmpty(&call->app.iovq)) {
1090                     MUTEX_EXIT(&call->lock);
1091                     call->error = RX_PROTOCOL_ERROR;
1092 #ifdef RXDEBUG_PACKET
1093                     tmpqc -=
1094 #endif /* RXDEBUG_PACKET */
1095                         rxi_FreePackets(0, &tmpq);
1096                     return 0;
1097                 }
1098                 call->app.currentPacket
1099                         = opr_queue_First(&call->app.iovq, struct rx_packet,
1100                                           entry);
1101                 opr_queue_Remove(&call->app.currentPacket->entry);
1102 #ifdef RX_TRACK_PACKETS
1103                 call->app.currentPacket->flags &= ~RX_PKTFLAG_IOVQ;
1104                 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
1105 #endif
1106 #ifdef RXDEBUG_PACKET
1107                 call->iovqc--;
1108 #endif /* RXDEBUG_PACKET */
1109                 call->app.nFree = call->app.currentPacket->length;
1110                 call->app.curvec = 1;
1111                 call->app.curpos =
1112                     (char *) call->app.currentPacket->wirevec[1].iov_base +
1113                     call->conn->securityHeaderSize;
1114                 call->app.curlen =
1115                     call->app.currentPacket->wirevec[1].iov_len -
1116                     call->conn->securityHeaderSize;
1117             }
1118         }
1119
1120         if (nbytes) {
1121             /* The next iovec should point to the current position */
1122             if (iov[nextio].iov_base != call->app.curpos
1123                 || iov[nextio].iov_len > (int)call->app.curlen) {
1124                 call->error = RX_PROTOCOL_ERROR;
1125                 MUTEX_EXIT(&call->lock);
1126                 if (call->app.currentPacket) {
1127 #ifdef RX_TRACK_PACKETS
1128                     call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1129 #endif
1130                     opr_queue_Prepend(&tmpq,
1131                                       &call->app.currentPacket->entry);
1132 #ifdef RXDEBUG_PACKET
1133                     tmpqc++;
1134 #endif /* RXDEBUG_PACKET */
1135                     call->app.currentPacket = NULL;
1136                 }
1137 #ifdef RXDEBUG_PACKET
1138                 tmpqc -=
1139 #endif /* RXDEBUG_PACKET */
1140                     rxi_FreePackets(0, &tmpq);
1141                 return 0;
1142             }
1143             nbytes -= iov[nextio].iov_len;
1144             call->app.curpos += iov[nextio].iov_len;
1145             call->app.curlen -= iov[nextio].iov_len;
1146             call->app.nFree -= iov[nextio].iov_len;
1147             nextio++;
1148             if (call->app.curlen == 0) {
1149                 if (++call->app.curvec > call->app.currentPacket->niovecs) {
1150                     call->app.nFree = 0;
1151                 } else {
1152                     call->app.curpos =
1153                         call->app.currentPacket->wirevec[call->app.curvec].iov_base;
1154                     call->app.curlen =
1155                         call->app.currentPacket->wirevec[call->app.curvec].iov_len;
1156                 }
1157             }
1158         }
1159     } while (nbytes && nextio < nio);
1160
1161     /* Move the packets from the temporary queue onto the transmit queue.
1162      * We may end up with more than call->twind packets on the queue. */
1163
1164 #ifdef RX_TRACK_PACKETS
1165     for (opr_queue_Scan(&tmpq, cursor))
1166     {
1167         struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
1168         p->flags |= RX_PKTFLAG_TQ;
1169     }
1170 #endif
1171     if (call->error)
1172         call->app.mode = RX_MODE_ERROR;
1173
1174     opr_queue_SpliceAppend(&call->tq, &tmpq);
1175
1176     /* If the call is in recovery, let it exhaust its current retransmit
1177      * queue before forcing it to send new packets
1178      */
1179     if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1180         rxi_Start(call, 0);
1181     }
1182
1183     /* Wait for the length of the transmit queue to fall below call->twind */
1184     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1185         clock_NewTime();
1186         call->startWait = clock_Sec();
1187 #ifdef  RX_ENABLE_LOCKS
1188         CV_WAIT(&call->cv_twind, &call->lock);
1189 #else
1190         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1191         osi_rxSleep(&call->twind);
1192 #endif
1193         call->startWait = 0;
1194     }
1195
1196     if (call->error) {
1197         call->app.mode = RX_MODE_ERROR;
1198         call->app.currentPacket = NULL;
1199         MUTEX_EXIT(&call->lock);
1200         if (call->app.currentPacket) {
1201 #ifdef RX_TRACK_PACKETS
1202             call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1203 #endif
1204             rxi_FreePacket(call->app.currentPacket);
1205         }
1206         return 0;
1207     }
1208     MUTEX_EXIT(&call->lock);
1209
1210     return requestCount - nbytes;
1211 }
1212
1213 int
1214 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1215 {
1216     int bytes;
1217     SPLVAR;
1218
1219     NETPRI;
1220     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1221     USERPRI;
1222     return bytes;
1223 }
1224
1225 /* Flush any buffered data to the stream, switch to read mode
1226  * (clients) or to EOF mode (servers). If 'locked' is nonzero, call->lock must
1227  * be already held.
1228  *
1229  * LOCKS HELD: called at netpri.
1230  */
1231 static void
1232 FlushWrite(struct rx_call *call, int locked)
1233 {
1234     struct rx_packet *cp = NULL;
1235
1236     /* Free any packets from the last call to ReadvProc/WritevProc */
1237     if (!opr_queue_IsEmpty(&call->app.iovq)) {
1238 #ifdef RXDEBUG_PACKET
1239         call->iovqc -=
1240 #endif /* RXDEBUG_PACKET */
1241             rxi_FreePackets(0, &call->app.iovq);
1242     }
1243
1244     if (call->app.mode == RX_MODE_SENDING) {
1245
1246         call->app.mode =
1247             (call->conn->type ==
1248              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1249
1250 #ifdef RX_KERNEL_TRACE
1251         {
1252             int glockOwner = ISAFS_GLOCK();
1253             if (!glockOwner)
1254                 AFS_GLOCK();
1255             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1256                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1257                        call);
1258             if (!glockOwner)
1259                 AFS_GUNLOCK();
1260         }
1261 #endif
1262
1263         if (!locked) {
1264             MUTEX_ENTER(&call->lock);
1265         }
1266
1267         if (call->error)
1268             call->app.mode = RX_MODE_ERROR;
1269
1270         call->flags |= RX_CALL_FLUSH;
1271
1272         cp = call->app.currentPacket;
1273
1274         if (cp) {
1275             /* cp->length is only supposed to be the user's data */
1276             /* cp->length was already set to (then-current)
1277              * MaxUserDataSize or less. */
1278 #ifdef RX_TRACK_PACKETS
1279             cp->flags &= ~RX_PKTFLAG_CP;
1280 #endif
1281             cp->length -= call->app.nFree;
1282             call->app.currentPacket = NULL;
1283             call->app.nFree = 0;
1284         } else {
1285             cp = rxi_AllocSendPacket(call, 0);
1286             if (!cp) {
1287                 /* Mode can no longer be MODE_SENDING */
1288                 return;
1289             }
1290             cp->length = 0;
1291             cp->niovecs = 2;    /* header + space for rxkad stuff */
1292             call->app.nFree = 0;
1293         }
1294
1295         /* The 1 specifies that this is the last packet */
1296         call->app.bytesSent += cp->length;
1297         rxi_PrepareSendPacket(call, cp, 1);
1298         /* PrepareSendPacket drops the call lock */
1299         rxi_WaitforTQBusy(call);
1300 #ifdef RX_TRACK_PACKETS
1301         cp->flags |= RX_PKTFLAG_TQ;
1302 #endif
1303         opr_queue_Append(&call->tq, &cp->entry);
1304 #ifdef RXDEBUG_PACKET
1305         call->tqc++;
1306 #endif /* RXDEBUG_PACKET */
1307
1308         /* If the call is in recovery, let it exhaust its current retransmit
1309          * queue before forcing it to send new packets
1310          */
1311         if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1312             rxi_Start(call, 0);
1313         }
1314         if (!locked) {
1315             MUTEX_EXIT(&call->lock);
1316         }
1317     }
1318 }
1319
/*
 * rxi_FlushWrite -- flush the call's buffered output; for callers that
 * do NOT hold call->lock (FlushWrite takes it as needed).
 */
void
rxi_FlushWrite(struct rx_call *call)
{
    const int callLockHeld = 0;

    FlushWrite(call, callLockHeld);
}
1325
/*
 * rxi_FlushWriteLocked -- flush the call's buffered output; for callers
 * that already hold call->lock.
 */
void
rxi_FlushWriteLocked(struct rx_call *call)
{
    const int callLockHeld = 1;

    FlushWrite(call, callLockHeld);
}
1331
1332 /* Flush any buffered data to the stream, switch to read mode
1333  * (clients) or to EOF mode (servers) */
1334 void
1335 rx_FlushWrite(struct rx_call *call)
1336 {
1337     SPLVAR;
1338     NETPRI;
1339     FlushWrite(call, 0);
1340     USERPRI;
1341 }