rx: Remove multi_End_Ignore
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   *
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #ifdef KERNEL
14 # ifndef UKERNEL
15 #  ifdef RX_KERNEL_TRACE
16 #   include "rx_kcommon.h"
17 #  endif
18 #  if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
19 #   include "afs/sysincludes.h"
20 #  else
21 #   include "h/types.h"
22 #   include "h/time.h"
23 #   include "h/stat.h"
24 #   if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
25 #    include "h/systm.h"
26 #   endif
27 #   ifdef AFS_LINUX20_ENV
28 #    include "h/socket.h"
29 #   endif
30 #   include "netinet/in.h"
31 #   if defined(AFS_SGI_ENV)
32 #    include "afs/sysincludes.h"
33 #   endif
34 #  endif
35 #  include "afs/afs_args.h"
36 #  if   (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
37 #   include "h/systm.h"
38 #  endif
39 # else /* !UKERNEL */
40 #  include "afs/sysincludes.h"
41 # endif /* !UKERNEL */
42
43 # ifdef RXDEBUG
44 #  undef RXDEBUG                        /* turn off debugging */
45 # endif /* RXDEBUG */
46
47 # include "afs/afs_osi.h"
48 # include "rx_kmutex.h"
49 # include "rx/rx_kernel.h"
50 # include "afs/lock.h"
51 #else /* KERNEL */
52 # include <roken.h>
53 # include <afs/opr.h>
54 #endif /* KERNEL */
55
56 #include "rx.h"
57 #include "rx_clock.h"
58 #include "rx_globals.h"
59 #include "rx_atomic.h"
60 #include "rx_internal.h"
61 #include "rx_conn.h"
62 #include "rx_call.h"
63 #include "rx_packet.h"
64
65 #ifdef RX_LOCKS_DB
66 /* rxdb_fileID is used to identify the lock location, along with line#. */
67 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
68 #endif /* RX_LOCKS_DB */
69
70 /* Get the next packet in the receive queue
71  *
72  * Dispose of the call's currentPacket, and move the next packet in the
73  * receive queue into the currentPacket field. If the next packet isn't
74  * available, then currentPacket is left NULL.
75  *
76  * @param call
77  *      The RX call to manipulate
78  * @returns
79  *      0 on success, an error code on failure
80  *
81  * @notes
82  *      Must be called with the call locked. Unlocks the call if returning
83  *      with an error.
84  */
85
86 static int
87 rxi_GetNextPacket(struct rx_call *call) {
88     struct rx_packet *rp;
89     int error;
90
91     if (call->app.currentPacket != NULL) {
92 #ifdef RX_TRACK_PACKETS
93         call->app.currentPacket->flags |= RX_PKTFLAG_CP;
94 #endif
95         rxi_FreePacket(call->app.currentPacket);
96         call->app.currentPacket = NULL;
97     }
98
99     if (opr_queue_IsEmpty(&call->rq))
100         return 0;
101
102     /* Check that next packet available is next in sequence */
103     rp = opr_queue_First(&call->rq, struct rx_packet, entry);
104     if (rp->header.seq != call->rnext)
105         return 0;
106
107     opr_queue_Remove(&rp->entry);
108 #ifdef RX_TRACK_PACKETS
109     rp->flags &= ~RX_PKTFLAG_RQ;
110 #endif
111 #ifdef RXDEBUG_PACKET
112     call->rqc--;
113 #endif /* RXDEBUG_PACKET */
114
115     /* RXS_CheckPacket called to undo RXS_PreparePacket's work.  It may
116      * reduce the length of the packet by up to conn->maxTrailerSize,
117      * to reflect the length of the data + the header. */
118     if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
119         /* Used to merely shut down the call, but now we shut down the whole
120          * connection since this may indicate an attempt to hijack it */
121
122         MUTEX_EXIT(&call->lock);
123         rxi_ConnectionError(call->conn, error);
124         MUTEX_ENTER(&call->conn->conn_data_lock);
125         rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
126         MUTEX_EXIT(&call->conn->conn_data_lock);
127         rxi_FreePacket(rp);
128
129         return error;
130      }
131
132     call->rnext++;
133     call->app.currentPacket = rp;
134 #ifdef RX_TRACK_PACKETS
135     call->app.currentPacket->flags |= RX_PKTFLAG_CP;
136 #endif
137     call->app.curvec = 1;       /* 0th vec is always header */
138
139     /* begin at the beginning [ more or less ], continue on until the end,
140      * then stop. */
141     call->app.curpos = (char *)call->app.currentPacket->wirevec[1].iov_base +
142                    call->conn->securityHeaderSize;
143     call->app.curlen = call->app.currentPacket->wirevec[1].iov_len -
144                    call->conn->securityHeaderSize;
145
146     call->app.nLeft = call->app.currentPacket->length;
147     call->app.bytesRcvd += call->app.currentPacket->length;
148
149     call->nHardAcks++;
150
151     return 0;
152 }
153
154 /* rxi_ReadProc -- internal version.
155  *
     * Copies up to nbytes of received call data into buf, taking packets
     * from the call's receive queue as each one is used up and sleeping
     * until the next in-sequence packet is available.
     *
     * Returns:
     *   - nbytes once the caller's buffer has been filled;
     *   - the number of bytes copied so far when RX_CALL_RECEIVE_DONE is
     *     set and no further packet is available (a short read);
     *   - 0 when the call has an error or rxi_GetNextPacket fails, even if
     *     some bytes had already been copied into buf.
     *
     * call->lock is taken only while a new packet is being fetched; the
     * copy out of the current packet runs without it.
     *
156  * LOCKS USED -- called at netpri
157  */
158 int
159 rxi_ReadProc(struct rx_call *call, char *buf,
160              int nbytes)
161 {
162     int requestCount;
163     int code;
164     unsigned int t;
165
166 /* XXXX took out clock_NewTime from here.  Was it needed? */
167     requestCount = nbytes;
168
169     /* Free any packets from the last call to ReadvProc/WritevProc */
170     if (!opr_queue_IsEmpty(&call->app.iovq)) {
171 #ifdef RXDEBUG_PACKET
172         call->iovqc -=
173 #endif /* RXDEBUG_PACKET */
174             rxi_FreePackets(0, &call->app.iovq);
175     }
176
        /* Each pass either fetches a packet (nLeft == 0) or copies out of
         * the current one; a pass that fetches copies nothing and relies on
         * the loop coming around again. */
177     do {
178         if (call->app.nLeft == 0) {
179             /* Get next packet */
180             MUTEX_ENTER(&call->lock);
181             for (;;) {
182                 if (call->error || (call->app.mode != RX_MODE_RECEIVING)) {
183                     if (call->error) {
184                         call->app.mode = RX_MODE_ERROR;
185                         MUTEX_EXIT(&call->lock);
186                         return 0;
187                     }
                        /* Still in send mode: flush the pending write,
                         * then re-evaluate the call state from the top. */
188                     if (call->app.mode == RX_MODE_SENDING) {
189                         rxi_FlushWriteLocked(call);
190                         continue;
191                     }
192                 }
193
                    /* On failure rxi_GetNextPacket has already released
                     * call->lock, so return without unlocking. */
194                 code = rxi_GetNextPacket(call);
195                 if (code)
196                      return 0;
197
198                 if (call->app.currentPacket) {
                        /* Got a packet: acknowledge now if enough hard
                         * acks have accumulated, otherwise schedule a
                         * delayed ack. */
199                     if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
200                         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
201                             rxi_CancelDelayedAckEvent(call);
202                             rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
203                         } else {
204                             /* Delay to consolidate ack packets */
205                             rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
206                         }
207                     }
208                     break;
209                 }
210
211                 /*
212                  * If we reach this point either we have no packets in the
213                  * receive queue or the next packet in the queue is not the
214                  * one we are looking for.  There is nothing else for us to
215                  * do but wait for another packet to arrive.
216                  */
217
218                 /* Are there ever going to be any more packets? */
219                 if (call->flags & RX_CALL_RECEIVE_DONE) {
220                     MUTEX_EXIT(&call->lock);
                        /* Short read: report what was copied so far. */
221                     return requestCount - nbytes;
222                 }
223                 /* Wait for in-sequence packet */
                    /* The flag is only set here; it must be cleared by
                     * whoever wakes this thread (presumably the receive
                     * path -- not visible in this file section). */
224                 call->flags |= RX_CALL_READER_WAIT;
225                 clock_NewTime();
226                 call->startWait = clock_Sec();
227                 while (call->flags & RX_CALL_READER_WAIT) {
228 #ifdef  RX_ENABLE_LOCKS
229                     CV_WAIT(&call->cv_rq, &call->lock);
230 #else
231                     osi_rxSleep(&call->rq);
232 #endif
233                 }
234
235                 call->startWait = 0;
236 #ifdef RX_ENABLE_LOCKS
                    /* Unlike the check at the top of the for loop, this
                     * exit does not set app.mode to RX_MODE_ERROR. */
237                 if (call->error) {
238                     MUTEX_EXIT(&call->lock);
239                     return 0;
240                 }
241 #endif /* RX_ENABLE_LOCKS */
242             }
243             MUTEX_EXIT(&call->lock);
244         } else
                /* NOTE: the else body is the single while statement that
                 * follows the comments below. */
245             /* osi_Assert(cp); */
246             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
247             /* yes, the following block is allowed to be the ELSE clause (or not) */
248             /* It's possible for call->app.nLeft to be smaller than any particular
249              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
250              * reflects the size of the buffer.  We have to keep track of the
251              * number of bytes read in the length field of the packet struct.  On
252              * the final portion of a received packet, it's almost certain that
253              * call->app.nLeft will be smaller than the final buffer. */
254             while (nbytes && call->app.currentPacket) {
                    /* Copy the smallest of: room left in the current iovec,
                     * bytes still wanted, bytes left in the packet. */
255                 t = MIN((int)call->app.curlen, nbytes);
256                 t = MIN(t, (int)call->app.nLeft);
257                 memcpy(buf, call->app.curpos, t);
258                 buf += t;
259                 nbytes -= t;
260                 call->app.curpos += t;
261                 call->app.curlen -= t;
262                 call->app.nLeft -= t;
263
264                 if (!call->app.nLeft) {
265                     /* out of packet.  Get another one. */
266 #ifdef RX_TRACK_PACKETS
267                     call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
268 #endif
269                     rxi_FreePacket(call->app.currentPacket);
270                     call->app.currentPacket = NULL;
271                 } else if (!call->app.curlen) {
272                     /* need to get another struct iov */
273                     if (++call->app.curvec >= call->app.currentPacket->niovecs) {
274                         /* current packet is exhausted, get ready for another */
275                         /* don't worry about curvec and stuff, they get set somewhere else */
276 #ifdef RX_TRACK_PACKETS
277                         call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
278 #endif
279                         rxi_FreePacket(call->app.currentPacket);
280                         call->app.currentPacket = NULL;
281                         call->app.nLeft = 0;
282                     } else {
283                         call->app.curpos =
284                             call->app.currentPacket->wirevec[call->app.curvec].iov_base;
285                         call->app.curlen =
286                             call->app.currentPacket->wirevec[call->app.curvec].iov_len;
287                     }
288                 }
289             }
290         if (!nbytes) {
291             /* user buffer is full, return */
292             return requestCount;
293         }
294
295     } while (nbytes);
296
        /* Not reached in practice: the loop body returns as soon as nbytes
         * drops to zero, so the loop condition is never false here. */
297     return requestCount;
298 }
299
300 int
301 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
302 {
303     int bytes;
304     SPLVAR;
305
306     /* Free any packets from the last call to ReadvProc/WritevProc */
307     if (!opr_queue_IsEmpty(&call->app.iovq)) {
308 #ifdef RXDEBUG_PACKET
309         call->iovqc -=
310 #endif /* RXDEBUG_PACKET */
311             rxi_FreePackets(0, &call->app.iovq);
312     }
313
314     /*
315      * Most common case, all of the data is in the current iovec.
316      * We are relying on nLeft being zero unless the call is in receive mode.
317      */
318     if (!call->error && call->app.curlen > nbytes && call->app.nLeft > nbytes) {
319         memcpy(buf, call->app.curpos, nbytes);
320
321         call->app.curpos += nbytes;
322         call->app.curlen -= nbytes;
323         call->app.nLeft  -= nbytes;
324
325         if (!call->app.nLeft && call->app.currentPacket != NULL) {
326             /* out of packet.  Get another one. */
327             rxi_FreePacket(call->app.currentPacket);
328             call->app.currentPacket = NULL;
329         }
330         return nbytes;
331     }
332
333     NETPRI;
334     bytes = rxi_ReadProc(call, buf, nbytes);
335     USERPRI;
336     return bytes;
337 }
338
339 /* Optimization for unmarshalling 32 bit integers */
340 int
341 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
342 {
343     int bytes;
344     SPLVAR;
345
346     /* Free any packets from the last call to ReadvProc/WritevProc */
347     if (!opr_queue_IsEmpty(&call->app.iovq)) {
348 #ifdef RXDEBUG_PACKET
349         call->iovqc -=
350 #endif /* RXDEBUG_PACKET */
351             rxi_FreePackets(0, &call->app.iovq);
352     }
353
354     /*
355      * Most common case, all of the data is in the current iovec.
356      * We are relying on nLeft being zero unless the call is in receive mode.
357      */
358     if (!call->error && call->app.curlen >= sizeof(afs_int32)
359         && call->app.nLeft >= sizeof(afs_int32)) {
360
361         memcpy((char *)value, call->app.curpos, sizeof(afs_int32));
362
363         call->app.curpos += sizeof(afs_int32);
364         call->app.curlen -= sizeof(afs_int32);
365         call->app.nLeft  -= sizeof(afs_int32);
366
367         if (!call->app.nLeft && call->app.currentPacket != NULL) {
368             /* out of packet.  Get another one. */
369             rxi_FreePacket(call->app.currentPacket);
370             call->app.currentPacket = NULL;
371         }
372         return sizeof(afs_int32);
373     }
374
375     NETPRI;
376     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
377     USERPRI;
378
379     return bytes;
380 }
381
382 /* rxi_FillReadVec
383  *
384  * Uses packets in the receive queue to fill in as much of the
385  * current iovec as possible. Does not block if it runs out
386  * of packets to complete the iovec. Return true if an ack packet
387  * was sent, otherwise return false */
    /* Additional notes:
     *  - Works on the request described by call->iov, call->iovNext,
     *    call->iovMax and call->iovNBytes (set up by rxi_ReadvProc, which
     *    calls this with call->lock held).
     *  - Caller iovec entries are pointed directly at packet buffers; no
     *    data is copied. Packets that are used up are therefore parked on
     *    call->app.iovq instead of being freed.
     *  - Also returns 1 when rxi_GetNextPacket fails; in that case no ack
     *    was sent by this function.
     *  - serial is passed through to rxi_SendAck.
     */
388 int
389 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
390 {
391     int didConsume = 0;
392     int didHardAck = 0;
393     int code;
394     unsigned int t;
395     struct iovec *call_iov;
396     struct iovec *cur_iov = NULL;
397
        /* cur_iov: the packet wirevec entry currently being read from.
         * call_iov: the next caller iovec slot to fill in. */
398     if (call->app.currentPacket) {
399         cur_iov = &call->app.currentPacket->wirevec[call->app.curvec];
400     }
401     call_iov = &call->iov[call->iovNext];
402
403     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
404         if (call->app.nLeft == 0) {
405             /* Get next packet */
406             code = rxi_GetNextPacket(call);
407             if (code) {
                    /* rxi_GetNextPacket released call->lock on failure;
                     * take it again before returning to the caller. */
408                 MUTEX_ENTER(&call->lock);
409                 return 1;
410             }
411
412             if (call->app.currentPacket) {
                    /* New packet: reading starts in wirevec[1]. */
413                 cur_iov = &call->app.currentPacket->wirevec[1];
414                 didConsume = 1;
415                 continue;
416             } else {
                    /* Nothing in sequence is available; do not block. */
417                 break;
418             }
419         }
420
421         /* It's possible for call->app.nLeft to be smaller than any particular
422          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
423          * reflects the size of the buffer.  We have to keep track of the
424          * number of bytes read in the length field of the packet struct.  On
425          * the final portion of a received packet, it's almost certain that
426          * call->app.nLeft will be smaller than the final buffer. */
427         while (call->iovNBytes
428                && call->iovNext < call->iovMax
429                && call->app.currentPacket) {
430
                /* Each pass hands out one caller iovec entry covering the
                 * smallest of: room left in the packet iovec, bytes still
                 * wanted, bytes left in the packet. */
431             t = MIN((int)call->app.curlen, call->iovNBytes);
432             t = MIN(t, (int)call->app.nLeft);
433             call_iov->iov_base = call->app.curpos;
434             call_iov->iov_len = t;
435             call_iov++;
436             call->iovNext++;
437             call->iovNBytes -= t;
438             call->app.curpos += t;
439             call->app.curlen -= t;
440             call->app.nLeft -= t;
441
442             if (!call->app.nLeft) {
443                 /* out of packet.  Get another one. */
444 #ifdef RX_TRACK_PACKETS
445                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
446                 call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
447 #endif
                    /* Park the packet on iovq: the caller's iovec entries
                     * still point into its buffers. */
448                 opr_queue_Append(&call->app.iovq,
449                                  &call->app.currentPacket->entry);
450 #ifdef RXDEBUG_PACKET
451                 call->iovqc++;
452 #endif /* RXDEBUG_PACKET */
453                 call->app.currentPacket = NULL;
454             } else if (!call->app.curlen) {
455                 /* need to get another struct iov */
456                 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
457                     /* current packet is exhausted, get ready for another */
458                     /* don't worry about curvec and stuff, they get set somewhere else */
459 #ifdef RX_TRACK_PACKETS
460                     call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
461                     call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
462 #endif
463                     opr_queue_Append(&call->app.iovq,
464                                      &call->app.currentPacket->entry);
465 #ifdef RXDEBUG_PACKET
466                     call->iovqc++;
467 #endif /* RXDEBUG_PACKET */
468                     call->app.currentPacket = NULL;
469                     call->app.nLeft = 0;
470                 } else {
471                     cur_iov++;
472                     call->app.curpos = (char *)cur_iov->iov_base;
473                     call->app.curlen = cur_iov->iov_len;
474                 }
475             }
476         }
477     }
478
479     /* If we consumed any packets then check whether we need to
480      * send a hard ack. */
481     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
482         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
483             rxi_CancelDelayedAckEvent(call);
484             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
485             didHardAck = 1;
486         } else {
487             /* Delay to consolidate ack packets */
488             rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
489         }
490     }
491     return didHardAck;
492 }
493
494
495 /* rxi_ReadvProc -- internal version.
496  *
497  * Fills in an iovec with pointers to the packet buffers. All packets
498  * except the last packet (new current packet) are moved to the iovq
499  * while the application is processing the data.
500  *
501  * LOCKS USED -- called at netpri.
502  */
503 int
504 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
505               int nbytes)
506 {
507     int bytes;
508
509     /* Free any packets from the last call to ReadvProc/WritevProc */
510     if (!opr_queue_IsEmpty(&call->app.iovq)) {
511 #ifdef RXDEBUG_PACKET
512         call->iovqc -=
513 #endif /* RXDEBUG_PACKET */
514             rxi_FreePackets(0, &call->app.iovq);
515     }
516
517     if (call->app.mode == RX_MODE_SENDING) {
518         rxi_FlushWrite(call);
519     }
520
521     MUTEX_ENTER(&call->lock);
522     if (call->error)
523         goto error;
524
525     /* Get whatever data is currently available in the receive queue.
526      * If rxi_FillReadVec sends an ack packet then it is possible
527      * that we will receive more data while we drop the call lock
528      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
529      * here to avoid a race with the receive thread if we send
530      * hard acks in rxi_FillReadVec. */
531     call->flags |= RX_CALL_IOVEC_WAIT;
532     call->iovNBytes = nbytes;
533     call->iovMax = maxio;
534     call->iovNext = 0;
535     call->iov = iov;
536     rxi_FillReadVec(call, 0);
537
538     /* if we need more data then sleep until the receive thread has
539      * filled in the rest. */
540     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
541         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
542         call->flags |= RX_CALL_READER_WAIT;
543         clock_NewTime();
544         call->startWait = clock_Sec();
545         while (call->flags & RX_CALL_READER_WAIT) {
546 #ifdef  RX_ENABLE_LOCKS
547             CV_WAIT(&call->cv_rq, &call->lock);
548 #else
549             osi_rxSleep(&call->rq);
550 #endif
551         }
552         call->startWait = 0;
553     }
554     call->flags &= ~RX_CALL_IOVEC_WAIT;
555
556     if (call->error)
557         goto error;
558
559     call->iov = NULL;
560     *nio = call->iovNext;
561     bytes = nbytes - call->iovNBytes;
562     MUTEX_EXIT(&call->lock);
563     return bytes;
564
565   error:
566     MUTEX_EXIT(&call->lock);
567     call->app.mode = RX_MODE_ERROR;
568     return 0;
569 }
570
571 int
572 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
573              int nbytes)
574 {
575     int bytes;
576     SPLVAR;
577
578     NETPRI;
579     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
580     USERPRI;
581     return bytes;
582 }
583
584 /* rxi_WriteProc -- internal version.
585  *
     * Appends nbytes from buf to the call's outgoing data stream.
     *
     * If the call is not yet in send mode, only a server-side call that is
     * currently receiving is switched over (its current receive packet is
     * discarded); any other state makes the function return 0.
     *
     * Data is accumulated in call->app.currentPacket. Whenever that packet
     * has no free space left, it is prepared and appended to the transmit
     * queue, transmission is kicked off (unless the call is in fast
     * recovery), the function waits for the transmit window to open, and a
     * fresh packet is allocated. A packet that ends up exactly full is NOT
     * sent right away, since it may turn out to be the last one.
     *
     * Returns the requested byte count once everything has been stored, or
     * 0 if the call is in error (even if part of buf was already stored).
     *
586  * LOCKS USED -- called at netpri
587  */
588
589 int
590 rxi_WriteProc(struct rx_call *call, char *buf,
591               int nbytes)
592 {
593     struct rx_connection *conn = call->conn;
594     unsigned int t;
595     int requestCount = nbytes;
596
597     /* Free any packets from the last call to ReadvProc/WritevProc */
598     if (!opr_queue_IsEmpty(&call->app.iovq)) {
599 #ifdef RXDEBUG_PACKET
600         call->iovqc -=
601 #endif /* RXDEBUG_PACKET */
602             rxi_FreePackets(0, &call->app.iovq);
603     }
604
605     if (call->app.mode != RX_MODE_SENDING) {
606         if ((conn->type == RX_SERVER_CONNECTION)
607             && (call->app.mode == RX_MODE_RECEIVING)) {
                /* Server turning the call around from request to reply:
                 * drop whatever is left of the packet being read. */
608             call->app.mode = RX_MODE_SENDING;
609             if (call->app.currentPacket) {
610 #ifdef RX_TRACK_PACKETS
611                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
612 #endif
613                 rxi_FreePacket(call->app.currentPacket);
614                 call->app.currentPacket = NULL;
615                 call->app.nLeft = 0;
616                 call->app.nFree = 0;
617             }
618         } else {
619             return 0;
620         }
621     }
622
623     /* Loop condition is checked at end, so that a write of 0 bytes
624      * will force a packet to be created--specially for the case where
625      * there are 0 bytes on the stream, but we must send a packet
626      * anyway. */
627     do {
628         if (call->app.nFree == 0) {
                /* No room in the current packet (or no packet at all):
                 * queue the full one and obtain a new one, all under the
                 * call lock. */
629             MUTEX_ENTER(&call->lock);
630             if (call->error)
631                 call->app.mode = RX_MODE_ERROR;
632             if (!call->error && call->app.currentPacket) {
633                 clock_NewTime();        /* Bogus:  need new time package */
634                 /* The 0, below, specifies that it is not the last packet:
635                  * there will be others. PrepareSendPacket may
636                  * alter the packet length by up to
637                  * conn->securityMaxTrailerSize */
638                 call->app.bytesSent += call->app.currentPacket->length;
639                 rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
640                 /* PrepareSendPacket drops the call lock */
641                 rxi_WaitforTQBusy(call);
642 #ifdef RX_TRACK_PACKETS
643                 call->app.currentPacket->flags |= RX_PKTFLAG_TQ;
644 #endif
645                 opr_queue_Append(&call->tq,
646                                  &call->app.currentPacket->entry);
647 #ifdef RXDEBUG_PACKET
648                 call->tqc++;
649 #endif /* RXDEBUG_PACKET */
650 #ifdef RX_TRACK_PACKETS
651                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
652 #endif
653                 call->app.currentPacket = NULL;
654
655                 /* If the call is in recovery, let it exhaust its current
656                  * retransmit queue before forcing it to send new packets
657                  */
658                 if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
659                     rxi_Start(call, 0);
660                 }
661             } else if (call->app.currentPacket) {
                    /* Call is in error: the filled packet will never be
                     * sent, so just release it. */
662 #ifdef RX_TRACK_PACKETS
663                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
664 #endif
665                 rxi_FreePacket(call->app.currentPacket);
666                 call->app.currentPacket = NULL;
667             }
668             /* Wait for transmit window to open up */
669             while (!call->error
670                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
671                 clock_NewTime();
672                 call->startWait = clock_Sec();
673
674 #ifdef  RX_ENABLE_LOCKS
675                 CV_WAIT(&call->cv_twind, &call->lock);
676 #else
677                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
678                 osi_rxSleep(&call->twind);
679 #endif
680
681                 call->startWait = 0;
682 #ifdef RX_ENABLE_LOCKS
683                 if (call->error) {
684                     call->app.mode = RX_MODE_ERROR;
685                     MUTEX_EXIT(&call->lock);
686                     return 0;
687                 }
688 #endif /* RX_ENABLE_LOCKS */
689             }
                /* On allocation failure currentPacket stays NULL and nFree
                 * stays 0, so the outer loop comes back here and retries. */
690             if ((call->app.currentPacket = rxi_AllocSendPacket(call, nbytes))) {
691 #ifdef RX_TRACK_PACKETS
692                 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
693 #endif
694                 call->app.nFree = call->app.currentPacket->length;
695                 call->app.curvec = 1;   /* 0th vec is always header */
696                 /* begin at the beginning [ more or less ], continue
697                  * on until the end, then stop. */
698                 call->app.curpos =
699                     (char *) call->app.currentPacket->wirevec[1].iov_base +
700                     call->conn->securityHeaderSize;
701                 call->app.curlen =
702                     call->app.currentPacket->wirevec[1].iov_len -
703                     call->conn->securityHeaderSize;
704             }
                /* Re-check for an error before the lock is released; a
                 * packet allocated just above is freed again in that case. */
705             if (call->error) {
706                 call->app.mode = RX_MODE_ERROR;
707                 if (call->app.currentPacket) {
708 #ifdef RX_TRACK_PACKETS
709                     call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
710 #endif
711                     rxi_FreePacket(call->app.currentPacket);
712                     call->app.currentPacket = NULL;
713                 }
714                 MUTEX_EXIT(&call->lock);
715                 return 0;
716             }
717             MUTEX_EXIT(&call->lock);
718         }
719
720         if (call->app.currentPacket && (int)call->app.nFree < nbytes) {
721             /* Try to extend the current buffer */
                /* Growth is limited to rx_MaxUserDataSize(call); nFree is
                 * credited with however much the packet length grew. */
722             int len, mud;
723             len = call->app.currentPacket->length;
724             mud = rx_MaxUserDataSize(call);
725             if (mud > len) {
726                 int want;
727                 want = MIN(nbytes - (int)call->app.nFree, mud - len);
728                 rxi_AllocDataBuf(call->app.currentPacket, want,
729                                  RX_PACKET_CLASS_SEND_CBUF);
730                 if (call->app.currentPacket->length > (unsigned)mud)
731                     call->app.currentPacket->length = mud;
732                 call->app.nFree += (call->app.currentPacket->length - len);
733             }
734         }
735
736         /* If the remaining bytes fit in the buffer, then store them
737          * and return.  Don't ship a buffer that's full immediately to
738          * the peer--we don't know if it's the last buffer yet */
739
740         if (!call->app.currentPacket) {
741             call->app.nFree = 0;
742         }
743
744         while (nbytes && call->app.nFree) {
745
                /* Store the smallest of: room in the current iovec, bytes
                 * still to write, free space in the packet. */
746             t = MIN((int)call->app.curlen, nbytes);
747             t = MIN((int)call->app.nFree, t);
748             memcpy(call->app.curpos, buf, t);
749             buf += t;
750             nbytes -= t;
751             call->app.curpos += t;
752             call->app.curlen -= (u_short)t;
753             call->app.nFree -= (u_short)t;
754
755             if (!call->app.curlen) {
756                 /* need to get another struct iov */
757                 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
758                     /* current packet is full, extend or send it */
759                     call->app.nFree = 0;
760                 } else {
761                     call->app.curpos =
762                         call->app.currentPacket->wirevec[call->app.curvec].iov_base;
763                     call->app.curlen =
764                         call->app.currentPacket->wirevec[call->app.curvec].iov_len;
765                 }
766             }
767         }                       /* while bytes to send and room to send them */
768
769         /* might be out of space now */
770         if (!nbytes) {
771             return requestCount;
772         } else {
773             /* more data to send, so get another packet and keep going */
774         }
775     } while (nbytes);
776
        /* Not reached in practice: the loop body returns as soon as nbytes
         * drops to zero, so the loop condition is never false here. */
777     return requestCount - nbytes;
778 }
779
780 int
781 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
782 {
783     int bytes;
784     int tcurlen;
785     int tnFree;
786     char *tcurpos;
787     SPLVAR;
788
789     /* Free any packets from the last call to ReadvProc/WritevProc */
790     if (!opr_queue_IsEmpty(&call->app.iovq)) {
791 #ifdef RXDEBUG_PACKET
792         call->iovqc -=
793 #endif /* RXDEBUG_PACKET */
794             rxi_FreePackets(0, &call->app.iovq);
795     }
796
797     /*
798      * Most common case: all of the data fits in the current iovec.
799      * We are relying on nFree being zero unless the call is in send mode.
800      */
801     tcurlen = (int)call->app.curlen;
802     tnFree = (int)call->app.nFree;
803     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
804         tcurpos = call->app.curpos;
805
806         memcpy(tcurpos, buf, nbytes);
807         call->app.curpos = tcurpos + nbytes;
808         call->app.curlen = (u_short)(tcurlen - nbytes);
809         call->app.nFree = (u_short)(tnFree - nbytes);
810         return nbytes;
811     }
812
813     NETPRI;
814     bytes = rxi_WriteProc(call, buf, nbytes);
815     USERPRI;
816     return bytes;
817 }
818
819 /* Optimization for marshalling 32 bit arguments */
820 int
821 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
822 {
823     int bytes;
824     int tcurlen;
825     int tnFree;
826     char *tcurpos;
827     SPLVAR;
828
829     if (!opr_queue_IsEmpty(&call->app.iovq)) {
830 #ifdef RXDEBUG_PACKET
831         call->iovqc -=
832 #endif /* RXDEBUG_PACKET */
833             rxi_FreePackets(0, &call->app.iovq);
834     }
835
836     /*
837      * Most common case: all of the data fits in the current iovec.
838      * We are relying on nFree being zero unless the call is in send mode.
839      */
840     tcurlen = call->app.curlen;
841     tnFree = call->app.nFree;
842     if (!call->error && tcurlen >= sizeof(afs_int32)
843         && tnFree >= sizeof(afs_int32)) {
844         tcurpos = call->app.curpos;
845
846         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
847             *((afs_int32 *) (tcurpos)) = *value;
848         } else {
849             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
850         }
851         call->app.curpos = tcurpos + sizeof(afs_int32);
852         call->app.curlen = (u_short)(tcurlen - sizeof(afs_int32));
853         call->app.nFree = (u_short)(tnFree - sizeof(afs_int32));
854         return sizeof(afs_int32);
855     }
856
857     NETPRI;
858     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
859     USERPRI;
860     return bytes;
861 }
862
863 /* rxi_WritevAlloc -- internal version.
864  *
865  * Fill in an iovec to point to data in packet buffers. The application
866  * calls rxi_WritevProc when the buffers are full.
867  *
868  * LOCKS USED -- called at netpri.
869  */
870
871 static int
872 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
873                 int nbytes)
874 {
875     struct rx_connection *conn = call->conn;
876     struct rx_packet *cp;
877     int requestCount;
878     int nextio;
879     /* Temporary values, real work is done in rxi_WritevProc */
880     int tnFree;
881     unsigned int tcurvec;
882     char *tcurpos;
883     int tcurlen;
884
885     requestCount = nbytes;
886     nextio = 0;
887
888     /* Free any packets from the last call to ReadvProc/WritevProc */
889     if (!opr_queue_IsEmpty(&call->app.iovq)) {
890 #ifdef RXDEBUG_PACKET
891         call->iovqc -=
892 #endif /* RXDEBUG_PACKET */
893             rxi_FreePackets(0, &call->app.iovq);
894     }
895
896     if (call->app.mode != RX_MODE_SENDING) {
897         if ((conn->type == RX_SERVER_CONNECTION)
898             && (call->app.mode == RX_MODE_RECEIVING)) {
899             call->app.mode = RX_MODE_SENDING;
900             if (call->app.currentPacket) {
901 #ifdef RX_TRACK_PACKETS
902                 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
903 #endif
904                 rxi_FreePacket(call->app.currentPacket);
905                 call->app.currentPacket = NULL;
906                 call->app.nLeft = 0;
907                 call->app.nFree = 0;
908             }
909         } else {
910             return 0;
911         }
912     }
913
914     /* Set up the iovec to point to data in packet buffers. */
915     tnFree = call->app.nFree;
916     tcurvec = call->app.curvec;
917     tcurpos = call->app.curpos;
918     tcurlen = call->app.curlen;
919     cp = call->app.currentPacket;
920     do {
921         int t;
922
923         if (tnFree == 0) {
924             /* current packet is full, allocate a new one */
925             MUTEX_ENTER(&call->lock);
926             cp = rxi_AllocSendPacket(call, nbytes);
927             MUTEX_EXIT(&call->lock);
928             if (cp == NULL) {
929                 /* out of space, return what we have */
930                 *nio = nextio;
931                 return requestCount - nbytes;
932             }
933 #ifdef RX_TRACK_PACKETS
934             cp->flags |= RX_PKTFLAG_IOVQ;
935 #endif
936             opr_queue_Append(&call->app.iovq, &cp->entry);
937 #ifdef RXDEBUG_PACKET
938             call->iovqc++;
939 #endif /* RXDEBUG_PACKET */
940             tnFree = cp->length;
941             tcurvec = 1;
942             tcurpos =
943                 (char *)cp->wirevec[1].iov_base +
944                 call->conn->securityHeaderSize;
945             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
946         }
947
948         if (tnFree < nbytes) {
949             /* try to extend the current packet */
950             int len, mud;
951             len = cp->length;
952             mud = rx_MaxUserDataSize(call);
953             if (mud > len) {
954                 int want;
955                 want = MIN(nbytes - tnFree, mud - len);
956                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
957                 if (cp->length > (unsigned)mud)
958                     cp->length = mud;
959                 tnFree += (cp->length - len);
960                 if (cp == call->app.currentPacket) {
961                     call->app.nFree += (cp->length - len);
962                 }
963             }
964         }
965
966         /* fill in the next entry in the iovec */
967         t = MIN(tcurlen, nbytes);
968         t = MIN(tnFree, t);
969         iov[nextio].iov_base = tcurpos;
970         iov[nextio].iov_len = t;
971         nbytes -= t;
972         tcurpos += t;
973         tcurlen -= t;
974         tnFree -= t;
975         nextio++;
976
977         if (!tcurlen) {
978             /* need to get another struct iov */
979             if (++tcurvec >= cp->niovecs) {
980                 /* current packet is full, extend it or move on to next packet */
981                 tnFree = 0;
982             } else {
983                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
984                 tcurlen = cp->wirevec[tcurvec].iov_len;
985             }
986         }
987     } while (nbytes && nextio < maxio);
988     *nio = nextio;
989     return requestCount - nbytes;
990 }
991
992 int
993 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
994                int nbytes)
995 {
996     int bytes;
997     SPLVAR;
998
999     NETPRI;
1000     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1001     USERPRI;
1002     return bytes;
1003 }
1004
1005 /* rxi_WritevProc -- internal version.
1006  *
1007  * Send buffers allocated in rxi_WritevAlloc.
1008  *
1009  * LOCKS USED -- called at netpri.
1010  */
1011 int
1012 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1013 {
1014 #ifdef RX_TRACK_PACKETS
1015     struct opr_queue *cursor;
1016 #endif
1017     int nextio = 0;
1018     int requestCount;
1019     struct opr_queue tmpq;
1020 #ifdef RXDEBUG_PACKET
1021     u_short tmpqc;
1022 #endif
1023
1024     requestCount = nbytes;
1025
1026     MUTEX_ENTER(&call->lock);
1027     if (call->error) {
1028         call->app.mode = RX_MODE_ERROR;
1029     } else if (call->app.mode != RX_MODE_SENDING) {
1030         call->error = RX_PROTOCOL_ERROR;
1031     }
1032     rxi_WaitforTQBusy(call);
1033
1034     if (call->error) {
1035         call->app.mode = RX_MODE_ERROR;
1036         MUTEX_EXIT(&call->lock);
1037         if (call->app.currentPacket) {
1038 #ifdef RX_TRACK_PACKETS
1039             call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1040             call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
1041 #endif
1042             opr_queue_Prepend(&call->app.iovq,
1043                               &call->app.currentPacket->entry);
1044 #ifdef RXDEBUG_PACKET
1045             call->iovqc++;
1046 #endif /* RXDEBUG_PACKET */
1047             call->app.currentPacket = NULL;
1048         }
1049 #ifdef RXDEBUG_PACKET
1050         call->iovqc -=
1051 #endif /* RXDEBUG_PACKET */
1052             rxi_FreePackets(0, &call->app.iovq);
1053         return 0;
1054     }
1055
1056     /* Loop through the I/O vector adjusting packet pointers.
1057      * Place full packets back onto the iovq once they are ready
1058      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1059      * the iovec. We put the loop condition at the end to ensure that
1060      * a zero length write will push a short packet. */
1061     opr_queue_Init(&tmpq);
1062 #ifdef RXDEBUG_PACKET
1063     tmpqc = 0;
1064 #endif /* RXDEBUG_PACKET */
1065     do {
1066         if (call->app.nFree == 0 && call->app.currentPacket) {
1067             clock_NewTime();    /* Bogus:  need new time package */
1068             /* The 0, below, specifies that it is not the last packet:
1069              * there will be others. PrepareSendPacket may
1070              * alter the packet length by up to
1071              * conn->securityMaxTrailerSize */
1072             call->app.bytesSent += call->app.currentPacket->length;
1073             rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
1074             /* PrepareSendPacket drops the call lock */
1075             rxi_WaitforTQBusy(call);
1076             opr_queue_Append(&tmpq, &call->app.currentPacket->entry);
1077 #ifdef RXDEBUG_PACKET
1078             tmpqc++;
1079 #endif /* RXDEBUG_PACKET */
1080             call->app.currentPacket = NULL;
1081
1082             /* The head of the iovq is now the current packet */
1083             if (nbytes) {
1084                 if (opr_queue_IsEmpty(&call->app.iovq)) {
1085                     MUTEX_EXIT(&call->lock);
1086                     call->error = RX_PROTOCOL_ERROR;
1087 #ifdef RXDEBUG_PACKET
1088                     tmpqc -=
1089 #endif /* RXDEBUG_PACKET */
1090                         rxi_FreePackets(0, &tmpq);
1091                     return 0;
1092                 }
1093                 call->app.currentPacket
1094                         = opr_queue_First(&call->app.iovq, struct rx_packet,
1095                                           entry);
1096                 opr_queue_Remove(&call->app.currentPacket->entry);
1097 #ifdef RX_TRACK_PACKETS
1098                 call->app.currentPacket->flags &= ~RX_PKTFLAG_IOVQ;
1099                 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
1100 #endif
1101 #ifdef RXDEBUG_PACKET
1102                 call->iovqc--;
1103 #endif /* RXDEBUG_PACKET */
1104                 call->app.nFree = call->app.currentPacket->length;
1105                 call->app.curvec = 1;
1106                 call->app.curpos =
1107                     (char *) call->app.currentPacket->wirevec[1].iov_base +
1108                     call->conn->securityHeaderSize;
1109                 call->app.curlen =
1110                     call->app.currentPacket->wirevec[1].iov_len -
1111                     call->conn->securityHeaderSize;
1112             }
1113         }
1114
1115         if (nbytes) {
1116             /* The next iovec should point to the current position */
1117             if (iov[nextio].iov_base != call->app.curpos
1118                 || iov[nextio].iov_len > (int)call->app.curlen) {
1119                 call->error = RX_PROTOCOL_ERROR;
1120                 MUTEX_EXIT(&call->lock);
1121                 if (call->app.currentPacket) {
1122 #ifdef RX_TRACK_PACKETS
1123                     call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1124 #endif
1125                     opr_queue_Prepend(&tmpq,
1126                                       &call->app.currentPacket->entry);
1127 #ifdef RXDEBUG_PACKET
1128                     tmpqc++;
1129 #endif /* RXDEBUG_PACKET */
1130                     call->app.currentPacket = NULL;
1131                 }
1132 #ifdef RXDEBUG_PACKET
1133                 tmpqc -=
1134 #endif /* RXDEBUG_PACKET */
1135                     rxi_FreePackets(0, &tmpq);
1136                 return 0;
1137             }
1138             nbytes -= iov[nextio].iov_len;
1139             call->app.curpos += iov[nextio].iov_len;
1140             call->app.curlen -= iov[nextio].iov_len;
1141             call->app.nFree -= iov[nextio].iov_len;
1142             nextio++;
1143             if (call->app.curlen == 0) {
1144                 if (++call->app.curvec > call->app.currentPacket->niovecs) {
1145                     call->app.nFree = 0;
1146                 } else {
1147                     call->app.curpos =
1148                         call->app.currentPacket->wirevec[call->app.curvec].iov_base;
1149                     call->app.curlen =
1150                         call->app.currentPacket->wirevec[call->app.curvec].iov_len;
1151                 }
1152             }
1153         }
1154     } while (nbytes && nextio < nio);
1155
1156     /* Move the packets from the temporary queue onto the transmit queue.
1157      * We may end up with more than call->twind packets on the queue. */
1158
1159 #ifdef RX_TRACK_PACKETS
1160     for (opr_queue_Scan(&tmpq, cursor))
1161     {
1162         struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
1163         p->flags |= RX_PKTFLAG_TQ;
1164     }
1165 #endif
1166     if (call->error)
1167         call->app.mode = RX_MODE_ERROR;
1168
1169     opr_queue_SpliceAppend(&call->tq, &tmpq);
1170
1171     /* If the call is in recovery, let it exhaust its current retransmit
1172      * queue before forcing it to send new packets
1173      */
1174     if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1175         rxi_Start(call, 0);
1176     }
1177
1178     /* Wait for the length of the transmit queue to fall below call->twind */
1179     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1180         clock_NewTime();
1181         call->startWait = clock_Sec();
1182 #ifdef  RX_ENABLE_LOCKS
1183         CV_WAIT(&call->cv_twind, &call->lock);
1184 #else
1185         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1186         osi_rxSleep(&call->twind);
1187 #endif
1188         call->startWait = 0;
1189     }
1190
1191     if (call->error) {
1192         call->app.mode = RX_MODE_ERROR;
1193         call->app.currentPacket = NULL;
1194         MUTEX_EXIT(&call->lock);
1195         if (call->app.currentPacket) {
1196 #ifdef RX_TRACK_PACKETS
1197             call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1198 #endif
1199             rxi_FreePacket(call->app.currentPacket);
1200         }
1201         return 0;
1202     }
1203     MUTEX_EXIT(&call->lock);
1204
1205     return requestCount - nbytes;
1206 }
1207
1208 int
1209 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1210 {
1211     int bytes;
1212     SPLVAR;
1213
1214     NETPRI;
1215     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1216     USERPRI;
1217     return bytes;
1218 }
1219
1220 /* Flush any buffered data to the stream, switch to read mode
1221  * (clients) or to EOF mode (servers). If 'locked' is nonzero, call->lock must
1222  * be already held.
1223  *
1224  * LOCKS HELD: called at netpri.
1225  */
1226 static void
1227 FlushWrite(struct rx_call *call, int locked)
1228 {
1229     struct rx_packet *cp = NULL;
1230
1231     /* Free any packets from the last call to ReadvProc/WritevProc */
1232     if (!opr_queue_IsEmpty(&call->app.iovq)) {
1233 #ifdef RXDEBUG_PACKET
1234         call->iovqc -=
1235 #endif /* RXDEBUG_PACKET */
1236             rxi_FreePackets(0, &call->app.iovq);
1237     }
1238
1239     if (call->app.mode == RX_MODE_SENDING) {
1240
1241         call->app.mode =
1242             (call->conn->type ==
1243              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1244
1245 #ifdef RX_KERNEL_TRACE
1246         {
1247             int glockOwner = ISAFS_GLOCK();
1248             if (!glockOwner)
1249                 AFS_GLOCK();
1250             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1251                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1252                        call);
1253             if (!glockOwner)
1254                 AFS_GUNLOCK();
1255         }
1256 #endif
1257
1258         if (!locked) {
1259             MUTEX_ENTER(&call->lock);
1260         }
1261
1262         if (call->error)
1263             call->app.mode = RX_MODE_ERROR;
1264
1265         call->flags |= RX_CALL_FLUSH;
1266
1267         cp = call->app.currentPacket;
1268
1269         if (cp) {
1270             /* cp->length is only supposed to be the user's data */
1271             /* cp->length was already set to (then-current)
1272              * MaxUserDataSize or less. */
1273 #ifdef RX_TRACK_PACKETS
1274             cp->flags &= ~RX_PKTFLAG_CP;
1275 #endif
1276             cp->length -= call->app.nFree;
1277             call->app.currentPacket = NULL;
1278             call->app.nFree = 0;
1279         } else {
1280             cp = rxi_AllocSendPacket(call, 0);
1281             if (!cp) {
1282                 /* Mode can no longer be MODE_SENDING */
1283                 return;
1284             }
1285             cp->length = 0;
1286             cp->niovecs = 2;    /* header + space for rxkad stuff */
1287             call->app.nFree = 0;
1288         }
1289
1290         /* The 1 specifies that this is the last packet */
1291         call->app.bytesSent += cp->length;
1292         rxi_PrepareSendPacket(call, cp, 1);
1293         /* PrepareSendPacket drops the call lock */
1294         rxi_WaitforTQBusy(call);
1295 #ifdef RX_TRACK_PACKETS
1296         cp->flags |= RX_PKTFLAG_TQ;
1297 #endif
1298         opr_queue_Append(&call->tq, &cp->entry);
1299 #ifdef RXDEBUG_PACKET
1300         call->tqc++;
1301 #endif /* RXDEBUG_PACKET */
1302
1303         /* If the call is in recovery, let it exhaust its current retransmit
1304          * queue before forcing it to send new packets
1305          */
1306         if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1307             rxi_Start(call, 0);
1308         }
1309         if (!locked) {
1310             MUTEX_EXIT(&call->lock);
1311         }
1312     }
1313 }
1314
/*
 * rxi_FlushWrite -- flush the call's buffered write data via FlushWrite,
 * telling it that call->lock is not held so it takes the lock itself.
 */
void
rxi_FlushWrite(struct rx_call *call)
{
    const int lockHeld = 0;

    FlushWrite(call, lockHeld);
}
1320
/*
 * rxi_FlushWriteLocked -- flush the call's buffered write data via
 * FlushWrite, for callers that already hold call->lock.
 */
void
rxi_FlushWriteLocked(struct rx_call *call)
{
    const int lockHeld = 1;

    FlushWrite(call, lockHeld);
}
1326
1327 /* Flush any buffered data to the stream, switch to read mode
1328  * (clients) or to EOF mode (servers) */
1329 void
1330 rx_FlushWrite(struct rx_call *call)
1331 {
1332     SPLVAR;
1333     NETPRI;
1334     FlushWrite(call, 0);
1335     USERPRI;
1336 }