2c3b37b173dde8b9fb29cb5f7baf6d2975751901
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   * 
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #ifdef KERNEL
21 #ifndef UKERNEL
22 #ifdef RX_KERNEL_TRACE
23 #include "rx_kcommon.h"
24 #endif
25 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
26 #include "afs/sysincludes.h"
27 #else
28 #include "h/types.h"
29 #include "h/time.h"
30 #include "h/stat.h"
31 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
32 #include "h/systm.h"
33 #endif
34 #ifdef  AFS_OSF_ENV
35 #include <net/net_globals.h>
36 #endif /* AFS_OSF_ENV */
37 #ifdef AFS_LINUX20_ENV
38 #include "h/socket.h"
39 #endif
40 #include "netinet/in.h"
41 #if defined(AFS_SGI_ENV)
42 #include "afs/sysincludes.h"
43 #endif
44 #endif
45 #include "afs/afs_args.h"
46 #include "afs/afs_osi.h"
47 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
48 #include "h/systm.h"
49 #endif
50 #else /* !UKERNEL */
51 #include "afs/sysincludes.h"
52 #endif /* !UKERNEL */
53 #ifdef RXDEBUG
54 #undef RXDEBUG                  /* turn off debugging */
55 #endif /* RXDEBUG */
56
57 #include "rx_kmutex.h"
58 #include "rx/rx_kernel.h"
59 #include "rx/rx_clock.h"
60 #include "rx/rx_queue.h"
61 #include "rx/rx_internal.h"
62 #include "rx/rx.h"
63 #include "rx/rx_globals.h"
64 #include "afs/lock.h"
65 #include "afsint.h"
66 #ifdef  AFS_OSF_ENV
67 #undef kmem_alloc
68 #undef kmem_free
69 #undef mem_alloc
70 #undef mem_free
71 #undef register
72 #endif /* AFS_OSF_ENV */
73 #else /* KERNEL */
74 # include <sys/types.h>
75 #ifdef AFS_NT40_ENV
76 # include <winsock2.h>
77 #else /* !AFS_NT40_ENV */
78 # include <sys/socket.h>
79 # include <sys/file.h>
80 # include <netdb.h>
81 # include <netinet/in.h>
82 # include <sys/stat.h>
83 # include <sys/time.h>
84 #endif /* !AFS_NT40_ENV */
85 #include <string.h>
86 #ifdef HAVE_UNISTD_H
87 #include <unistd.h>
88 #endif
89 # include "rx_user.h"
90 # include "rx_clock.h"
91 # include "rx_queue.h"
92 # include "rx_internal.h"
93 # include "rx.h"
94 # include "rx_globals.h"
95 #endif /* KERNEL */
96
97 #ifdef RX_LOCKS_DB
98 /* rxdb_fileID is used to identify the lock location, along with line#. */
99 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
100 #endif /* RX_LOCKS_DB */
101 /* rxi_ReadProc -- internal version.
102  *
103  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
104  */
105 int
106 rxi_ReadProc(register struct rx_call *call, register char *buf,
107              register int nbytes)
108 {
109     register struct rx_packet *cp = call->currentPacket;
110     register struct rx_packet *rp;
111     register int requestCount;
112     register unsigned int t;
113
114 /* XXXX took out clock_NewTime from here.  Was it needed? */
115     requestCount = nbytes;
116
117     /* Free any packets from the last call to ReadvProc/WritevProc */
118     if (queue_IsNotEmpty(&call->iovq)) {
119         rxi_FreePackets(0, &call->iovq);
120     }
121
122     do {
123         if (call->nLeft == 0) {
124             /* Get next packet */
125             for (;;) {
126                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
127                     if (call->error) {
128                         return 0;
129                     }
130                     if (call->mode == RX_MODE_SENDING) {
131                         rxi_FlushWrite(call);
132                         continue;
133                     }
134                 }
135                 if (queue_IsNotEmpty(&call->rq)) {
136                     /* Check that next packet available is next in sequence */
137                     rp = queue_First(&call->rq, rx_packet);
138                     if (rp->header.seq == call->rnext) {
139                         afs_int32 error;
140                         register struct rx_connection *conn = call->conn;
141                         queue_Remove(rp);
142                         rp->flags &= ~RX_PKTFLAG_RQ;
143
144                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
145                          * work.  It may reduce the length of the packet by up
146                          * to conn->maxTrailerSize, to reflect the length of the
147                          * data + the header. */
148                         if ((error =
149                              RXS_CheckPacket(conn->securityObject, call,
150                                              rp))) {
151                             /* Used to merely shut down the call, but now we 
152                              * shut down the whole connection since this may 
153                              * indicate an attempt to hijack it */
154
155                             MUTEX_EXIT(&call->lock);
156                             rxi_ConnectionError(conn, error);
157                             MUTEX_ENTER(&conn->conn_data_lock);
158                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
159                             MUTEX_EXIT(&conn->conn_data_lock);
160                             rxi_FreePacket(rp);
161                             MUTEX_ENTER(&call->lock);
162
163                             return 0;
164                         }
165                         call->rnext++;
166                         cp = call->currentPacket = rp;
167                         call->currentPacket->flags |= RX_PKTFLAG_CP;
168                         call->curvec = 1;       /* 0th vec is always header */
169                         /* begin at the beginning [ more or less ], continue 
170                          * on until the end, then stop. */
171                         call->curpos =
172                             (char *)cp->wirevec[1].iov_base +
173                             call->conn->securityHeaderSize;
174                         call->curlen =
175                             cp->wirevec[1].iov_len -
176                             call->conn->securityHeaderSize;
177
178                         /* Notice that this code works correctly if the data
179                          * size is 0 (which it may be--no reply arguments from
180                          * server, for example).  This relies heavily on the
181                          * fact that the code below immediately frees the packet
182                          * (no yields, etc.).  If it didn't, this would be a
183                          * problem because a value of zero for call->nLeft
184                          * normally means that there is no read packet */
185                         call->nLeft = cp->length;
186                         hadd32(call->bytesRcvd, cp->length);
187
188                         /* Send a hard ack for every rxi_HardAckRate+1 packets
189                          * consumed. Otherwise schedule an event to send
190                          * the hard ack later on.
191                          */
192                         call->nHardAcks++;
193                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
194                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
195                                 rxevent_Cancel(call->delayedAckEvent, call,
196                                                RX_CALL_REFCOUNT_DELAY);
197                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
198                             } else {
199                                 struct clock when, now;
200                                 clock_GetTime(&now);
201                                 when = now;
202                                 /* Delay to consolidate ack packets */
203                                 clock_Add(&when, &rx_hardAckDelay);
204                                 if (!call->delayedAckEvent
205                                     || clock_Gt(&call->delayedAckEvent->
206                                                 eventTime, &when)) {
207                                     rxevent_Cancel(call->delayedAckEvent,
208                                                    call,
209                                                    RX_CALL_REFCOUNT_DELAY);
210                                     CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
211                                     call->delayedAckEvent =
212                                       rxevent_PostNow(&when, &now,
213                                                      rxi_SendDelayedAck, call,
214                                                      0);
215                                 }
216                             }
217                         }
218                         break;
219                     }
220                 }
221
222 /*
223 MTUXXX  doesn't there need to be an "else" here ??? 
224 */
225                 /* Are there ever going to be any more packets? */
226                 if (call->flags & RX_CALL_RECEIVE_DONE) {
227                     return requestCount - nbytes;
228                 }
229                 /* Wait for in-sequence packet */
230                 call->flags |= RX_CALL_READER_WAIT;
231                 clock_NewTime();
232                 call->startWait = clock_Sec();
233                 while (call->flags & RX_CALL_READER_WAIT) {
234 #ifdef  RX_ENABLE_LOCKS
235                     CV_WAIT(&call->cv_rq, &call->lock);
236 #else
237                     osi_rxSleep(&call->rq);
238 #endif
239                 }
240
241                 call->startWait = 0;
242 #ifdef RX_ENABLE_LOCKS
243                 if (call->error) {
244                     return 0;
245                 }
246 #endif /* RX_ENABLE_LOCKS */
247             }
248         } else
249             /* assert(cp); */
250             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
251             /* yes, the following block is allowed to be the ELSE clause (or not) */
252             /* It's possible for call->nLeft to be smaller than any particular
253              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
254              * reflects the size of the buffer.  We have to keep track of the
255              * number of bytes read in the length field of the packet struct.  On
256              * the final portion of a received packet, it's almost certain that
257              * call->nLeft will be smaller than the final buffer. */
258             while (nbytes && cp) {
259                 t = MIN((int)call->curlen, nbytes);
260                 t = MIN(t, (int)call->nLeft);
261                 memcpy(buf, call->curpos, t);
262                 buf += t;
263                 nbytes -= t;
264                 call->curpos += t;
265                 call->curlen -= t;
266                 call->nLeft -= t;
267
268                 if (!call->nLeft) {
269                     /* out of packet.  Get another one. */
270                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
271                     rxi_FreePacket(cp);
272                     cp = call->currentPacket = (struct rx_packet *)0;
273                 } else if (!call->curlen) {
274                     /* need to get another struct iov */
275                     if (++call->curvec >= cp->niovecs) {
276                         /* current packet is exhausted, get ready for another */
277                         /* don't worry about curvec and stuff, they get set somewhere else */
278                         call->currentPacket->flags &= ~RX_PKTFLAG_CP;
279                         rxi_FreePacket(cp);
280                         cp = call->currentPacket = (struct rx_packet *)0;
281                         call->nLeft = 0;
282                     } else {
283                         call->curpos =
284                             (char *)cp->wirevec[call->curvec].iov_base;
285                         call->curlen = cp->wirevec[call->curvec].iov_len;
286                     }
287                 }
288             }
289         if (!nbytes) {
290             /* user buffer is full, return */
291             return requestCount;
292         }
293
294     } while (nbytes);
295
296     return requestCount;
297 }
298
299 int
300 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
301 {
302     int bytes;
303     int tcurlen;
304     int tnLeft;
305     char *tcurpos;
306     SPLVAR;
307
308     /*
309      * Free any packets from the last call to ReadvProc/WritevProc.
310      * We do not need the lock because the receiver threads only
311      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
312      * RX_CALL_IOVEC_WAIT is always cleared before returning from
313      * ReadvProc/WritevProc.
314      */
315     if (!queue_IsEmpty(&call->iovq)) {
316         rxi_FreePackets(0, &call->iovq);
317     }
318
319     /*
320      * Most common case, all of the data is in the current iovec.
321      * We do not need the lock because this is the only thread that
322      * updates the curlen, curpos, nLeft fields.
323      *
324      * We are relying on nLeft being zero unless the call is in receive mode.
325      */
326     tcurlen = call->curlen;
327     tnLeft = call->nLeft;
328     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
329         tcurpos = call->curpos;
330         memcpy(buf, tcurpos, nbytes);
331         call->curpos = tcurpos + nbytes;
332         call->curlen = tcurlen - nbytes;
333         call->nLeft = tnLeft - nbytes;
334
335         if (!call->nLeft) {
336             /* out of packet.  Get another one. */
337             NETPRI;
338             MUTEX_ENTER(&call->lock);
339             rxi_FreePacket(call->currentPacket);
340             call->currentPacket = (struct rx_packet *)0;
341             MUTEX_EXIT(&call->lock);
342             USERPRI;
343         }
344         return nbytes;
345     }
346
347     NETPRI;
348     MUTEX_ENTER(&call->lock);
349     bytes = rxi_ReadProc(call, buf, nbytes);
350     MUTEX_EXIT(&call->lock);
351     USERPRI;
352     return bytes;
353 }
354
355 /* Optimization for unmarshalling 32 bit integers */
356 int
357 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
358 {
359     int bytes;
360     int tcurlen;
361     int tnLeft;
362     char *tcurpos;
363     SPLVAR;
364
365     /*
366      * Free any packets from the last call to ReadvProc/WritevProc.
367      * We do not need the lock because the receiver threads only
368      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
369      * RX_CALL_IOVEC_WAIT is always cleared before returning from
370      * ReadvProc/WritevProc.
371      */
372     if (!queue_IsEmpty(&call->iovq)) {
373         rxi_FreePackets(0, &call->iovq);
374     }
375
376     /*
377      * Most common case, all of the data is in the current iovec.
378      * We do not need the lock because this is the only thread that
379      * updates the curlen, curpos, nLeft fields.
380      *
381      * We are relying on nLeft being zero unless the call is in receive mode.
382      */
383     tcurlen = call->curlen;
384     tnLeft = call->nLeft;
385     if (!call->error && tcurlen >= sizeof(afs_int32)
386         && tnLeft >= sizeof(afs_int32)) {
387         tcurpos = call->curpos;
388         memcpy((char *)value, tcurpos, sizeof(afs_int32));
389         call->curpos = tcurpos + sizeof(afs_int32);
390         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
391         call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
392         if (!call->nLeft && call->currentPacket != NULL) {
393             /* out of packet.  Get another one. */
394             NETPRI;
395             MUTEX_ENTER(&call->lock);
396             rxi_FreePacket(call->currentPacket);
397             call->currentPacket = (struct rx_packet *)0;
398             MUTEX_EXIT(&call->lock);
399             USERPRI;
400         }
401         return sizeof(afs_int32);
402     }
403
404     NETPRI;
405     MUTEX_ENTER(&call->lock);
406     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
407     MUTEX_EXIT(&call->lock);
408     USERPRI;
409     return bytes;
410 }
411
412 /* rxi_FillReadVec
413  *
414  * Uses packets in the receive queue to fill in as much of the
415  * current iovec as possible. Does not block if it runs out
416  * of packets to complete the iovec. Return true if an ack packet
417  * was sent, otherwise return false */
418 int
419 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
420 {
421     int didConsume = 0;
422     int didHardAck = 0;
423     register unsigned int t;
424     struct rx_packet *rp;
425     struct rx_packet *curp;
426     struct iovec *call_iov;
427     struct iovec *cur_iov = NULL;
428
429     curp = call->currentPacket;
430     if (curp) {
431         cur_iov = &curp->wirevec[call->curvec];
432     }
433     call_iov = &call->iov[call->iovNext];
434
435     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
436         if (call->nLeft == 0) {
437             /* Get next packet */
438             if (queue_IsNotEmpty(&call->rq)) {
439                 /* Check that next packet available is next in sequence */
440                 rp = queue_First(&call->rq, rx_packet);
441                 if (rp->header.seq == call->rnext) {
442                     afs_int32 error;
443                     register struct rx_connection *conn = call->conn;
444                     queue_Remove(rp);
445                     rp->flags &= ~RX_PKTFLAG_RQ;
446
447                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
448                      * work.  It may reduce the length of the packet by up
449                      * to conn->maxTrailerSize, to reflect the length of the
450                      * data + the header. */
451                     if ((error =
452                          RXS_CheckPacket(conn->securityObject, call, rp))) {
453                         /* Used to merely shut down the call, but now we 
454                          * shut down the whole connection since this may 
455                          * indicate an attempt to hijack it */
456
457                         MUTEX_EXIT(&call->lock);
458                         rxi_ConnectionError(conn, error);
459                         MUTEX_ENTER(&conn->conn_data_lock);
460                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
461                         MUTEX_EXIT(&conn->conn_data_lock);
462                         rxi_FreePacket(rp);
463                         MUTEX_ENTER(&call->lock);
464
465                         return 1;
466                     }
467                     call->rnext++;
468                     curp = call->currentPacket = rp;
469                     call->currentPacket->flags |= RX_PKTFLAG_CP;
470                     call->curvec = 1;   /* 0th vec is always header */
471                     cur_iov = &curp->wirevec[1];
472                     /* begin at the beginning [ more or less ], continue 
473                      * on until the end, then stop. */
474                     call->curpos =
475                         (char *)curp->wirevec[1].iov_base +
476                         call->conn->securityHeaderSize;
477                     call->curlen =
478                         curp->wirevec[1].iov_len -
479                         call->conn->securityHeaderSize;
480
481                     /* Notice that this code works correctly if the data
482                      * size is 0 (which it may be--no reply arguments from
483                      * server, for example).  This relies heavily on the
484                      * fact that the code below immediately frees the packet
485                      * (no yields, etc.).  If it didn't, this would be a
486                      * problem because a value of zero for call->nLeft
487                      * normally means that there is no read packet */
488                     call->nLeft = curp->length;
489                     hadd32(call->bytesRcvd, curp->length);
490
491                     /* Send a hard ack for every rxi_HardAckRate+1 packets
492                      * consumed. Otherwise schedule an event to send
493                      * the hard ack later on.
494                      */
495                     call->nHardAcks++;
496                     didConsume = 1;
497                     continue;
498                 }
499             }
500             break;
501         }
502
503         /* It's possible for call->nLeft to be smaller than any particular
504          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
505          * reflects the size of the buffer.  We have to keep track of the
506          * number of bytes read in the length field of the packet struct.  On
507          * the final portion of a received packet, it's almost certain that
508          * call->nLeft will be smaller than the final buffer. */
509         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
510
511             t = MIN((int)call->curlen, call->iovNBytes);
512             t = MIN(t, (int)call->nLeft);
513             call_iov->iov_base = call->curpos;
514             call_iov->iov_len = t;
515             call_iov++;
516             call->iovNext++;
517             call->iovNBytes -= t;
518             call->curpos += t;
519             call->curlen -= t;
520             call->nLeft -= t;
521
522             if (!call->nLeft) {
523                 /* out of packet.  Get another one. */
524                 curp->flags &= ~RX_PKTFLAG_CP;
525                 curp->flags |= RX_PKTFLAG_IOVQ;
526                 queue_Append(&call->iovq, curp);
527                 curp = call->currentPacket = (struct rx_packet *)0;
528             } else if (!call->curlen) {
529                 /* need to get another struct iov */
530                 if (++call->curvec >= curp->niovecs) {
531                     /* current packet is exhausted, get ready for another */
532                     /* don't worry about curvec and stuff, they get set somewhere else */
533                     curp->flags &= ~RX_PKTFLAG_CP;
534                     curp->flags |= RX_PKTFLAG_IOVQ;
535                     queue_Append(&call->iovq, curp);
536                     curp = call->currentPacket = (struct rx_packet *)0;
537                     call->nLeft = 0;
538                 } else {
539                     cur_iov++;
540                     call->curpos = (char *)cur_iov->iov_base;
541                     call->curlen = cur_iov->iov_len;
542                 }
543             }
544         }
545     }
546
547     /* If we consumed any packets then check whether we need to
548      * send a hard ack. */
549     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
550         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
551             rxevent_Cancel(call->delayedAckEvent, call,
552                            RX_CALL_REFCOUNT_DELAY);
553             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
554             didHardAck = 1;
555         } else {
556             struct clock when, now;
557             clock_GetTime(&now);
558             when = now;
559             /* Delay to consolidate ack packets */
560             clock_Add(&when, &rx_hardAckDelay);
561             if (!call->delayedAckEvent
562                 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
563                 rxevent_Cancel(call->delayedAckEvent, call,
564                                RX_CALL_REFCOUNT_DELAY);
565                 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
566                 call->delayedAckEvent =
567                     rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
568             }
569         }
570     }
571     return didHardAck;
572 }
573
574
575 /* rxi_ReadvProc -- internal version.
576  *
577  * Fills in an iovec with pointers to the packet buffers. All packets
578  * except the last packet (new current packet) are moved to the iovq
579  * while the application is processing the data.
580  *
581  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
582  */
583 int
584 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
585               int nbytes)
586 {
587     int requestCount;
588     int nextio;
589
590     requestCount = nbytes;
591     nextio = 0;
592
593     /* Free any packets from the last call to ReadvProc/WritevProc */
594     if (queue_IsNotEmpty(&call->iovq)) {
595         rxi_FreePackets(0, &call->iovq);
596     }
597
598     if (call->mode == RX_MODE_SENDING) {
599         rxi_FlushWrite(call);
600     }
601
602     if (call->error) {
603         return 0;
604     }
605
606     /* Get whatever data is currently available in the receive queue.
607      * If rxi_FillReadVec sends an ack packet then it is possible
608      * that we will receive more data while we drop the call lock
609      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
610      * here to avoid a race with the receive thread if we send
611      * hard acks in rxi_FillReadVec. */
612     call->flags |= RX_CALL_IOVEC_WAIT;
613     call->iovNBytes = nbytes;
614     call->iovMax = maxio;
615     call->iovNext = 0;
616     call->iov = iov;
617     rxi_FillReadVec(call, 0);
618
619     /* if we need more data then sleep until the receive thread has
620      * filled in the rest. */
621     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
622         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
623         call->flags |= RX_CALL_READER_WAIT;
624         clock_NewTime();
625         call->startWait = clock_Sec();
626         while (call->flags & RX_CALL_READER_WAIT) {
627 #ifdef  RX_ENABLE_LOCKS
628             CV_WAIT(&call->cv_rq, &call->lock);
629 #else
630             osi_rxSleep(&call->rq);
631 #endif
632         }
633         call->startWait = 0;
634     }
635     call->flags &= ~RX_CALL_IOVEC_WAIT;
636 #ifdef RX_ENABLE_LOCKS
637     if (call->error) {
638         return 0;
639     }
640 #endif /* RX_ENABLE_LOCKS */
641
642     call->iov = NULL;
643     *nio = call->iovNext;
644     return nbytes - call->iovNBytes;
645 }
646
647 int
648 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
649              int nbytes)
650 {
651     int bytes;
652     SPLVAR;
653
654     NETPRI;
655     MUTEX_ENTER(&call->lock);
656     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
657     MUTEX_EXIT(&call->lock);
658     USERPRI;
659     return bytes;
660 }
661
662 /* rxi_WriteProc -- internal version.
663  *
664  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
665
666 int
667 rxi_WriteProc(register struct rx_call *call, register char *buf,
668               register int nbytes)
669 {
670     struct rx_connection *conn = call->conn;
671     register struct rx_packet *cp = call->currentPacket;
672     register unsigned int t;
673     int requestCount = nbytes;
674
675     /* Free any packets from the last call to ReadvProc/WritevProc */
676     if (queue_IsNotEmpty(&call->iovq)) {
677         rxi_FreePackets(0, &call->iovq);
678     }
679
680     if (call->mode != RX_MODE_SENDING) {
681         if ((conn->type == RX_SERVER_CONNECTION)
682             && (call->mode == RX_MODE_RECEIVING)) {
683             call->mode = RX_MODE_SENDING;
684             if (cp) {
685                 cp->flags &= ~RX_PKTFLAG_CP;
686                 rxi_FreePacket(cp);
687                 cp = call->currentPacket = (struct rx_packet *)0;
688                 call->nLeft = 0;
689                 call->nFree = 0;
690             }
691         } else {
692             return 0;
693         }
694     }
695
696     /* Loop condition is checked at end, so that a write of 0 bytes
697      * will force a packet to be created--specially for the case where
698      * there are 0 bytes on the stream, but we must send a packet
699      * anyway. */
700     do {
701         if (call->nFree == 0) {
702             if (!call->error && cp) {
703                 /* Clear the current packet now so that if
704                  * we are forced to wait and drop the lock 
705                  * the packet we are planning on using 
706                  * cannot be freed.
707                  */
708                 cp->flags &= ~RX_PKTFLAG_CP;
709                 call->currentPacket = (struct rx_packet *)0;
710 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
711                 /* Wait until TQ_BUSY is reset before adding any
712                  * packets to the transmit queue
713                  */
714                 while (call->flags & RX_CALL_TQ_BUSY) {
715                     call->flags |= RX_CALL_TQ_WAIT;
716 #ifdef RX_ENABLE_LOCKS
717                     CV_WAIT(&call->cv_tq, &call->lock);
718 #else /* RX_ENABLE_LOCKS */
719                     osi_rxSleep(&call->tq);
720 #endif /* RX_ENABLE_LOCKS */
721                 }
722 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
723                 clock_NewTime();        /* Bogus:  need new time package */
724                 /* The 0, below, specifies that it is not the last packet: 
725                  * there will be others. PrepareSendPacket may
726                  * alter the packet length by up to
727                  * conn->securityMaxTrailerSize */
728                 hadd32(call->bytesSent, cp->length);
729                 rxi_PrepareSendPacket(call, cp, 0);
730                 cp->flags |= RX_PKTFLAG_TQ;
731                 queue_Append(&call->tq, cp);
732                 cp = (struct rx_packet *)0;
733                 if (!
734                     (call->
735                      flags & (RX_CALL_FAST_RECOVER |
736                               RX_CALL_FAST_RECOVER_WAIT))) {
737                     rxi_Start(0, call, 0, 0);
738                 }
739             } else if (cp) {
740                 cp->flags &= ~RX_PKTFLAG_CP;
741                 rxi_FreePacket(cp);
742                 cp = call->currentPacket = (struct rx_packet *)0;
743             }
744             /* Wait for transmit window to open up */
745             while (!call->error
746                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
747                 clock_NewTime();
748                 call->startWait = clock_Sec();
749
750 #ifdef  RX_ENABLE_LOCKS
751                 CV_WAIT(&call->cv_twind, &call->lock);
752 #else
753                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
754                 osi_rxSleep(&call->twind);
755 #endif
756
757                 call->startWait = 0;
758 #ifdef RX_ENABLE_LOCKS
759                 if (call->error) {
760                     return 0;
761                 }
762 #endif /* RX_ENABLE_LOCKS */
763             }
764             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
765                 cp->flags |= RX_PKTFLAG_CP;
766                 call->currentPacket = cp;
767                 call->nFree = cp->length;
768                 call->curvec = 1;       /* 0th vec is always header */
769                 /* begin at the beginning [ more or less ], continue 
770                  * on until the end, then stop. */
771                 call->curpos =
772                     (char *)cp->wirevec[1].iov_base +
773                     call->conn->securityHeaderSize;
774                 call->curlen =
775                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
776             }
777             if (call->error) {
778                 if (cp) {
779                     cp->flags &= ~RX_PKTFLAG_CP;
780                     rxi_FreePacket(cp);
781                     call->currentPacket = NULL;
782                 }
783                 return 0;
784             }
785         }
786
787         if (cp && (int)call->nFree < nbytes) {
788             /* Try to extend the current buffer */
789             register int len, mud;
790             len = cp->length;
791             mud = rx_MaxUserDataSize(call);
792             if (mud > len) {
793                 int want;
794                 want = MIN(nbytes - (int)call->nFree, mud - len);
795                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
796                 if (cp->length > (unsigned)mud)
797                     cp->length = mud;
798                 call->nFree += (cp->length - len);
799             }
800         }
801
802         /* If the remaining bytes fit in the buffer, then store them
803          * and return.  Don't ship a buffer that's full immediately to
804          * the peer--we don't know if it's the last buffer yet */
805
806         if (!cp) {
807             call->nFree = 0;
808         }
809
810         while (nbytes && call->nFree) {
811
812             t = MIN((int)call->curlen, nbytes);
813             t = MIN((int)call->nFree, t);
814             memcpy(call->curpos, buf, t);
815             buf += t;
816             nbytes -= t;
817             call->curpos += t;
818             call->curlen -= (u_short)t;
819             call->nFree -= (u_short)t;
820
821             if (!call->curlen) {
822                 /* need to get another struct iov */
823                 if (++call->curvec >= cp->niovecs) {
824                     /* current packet is full, extend or send it */
825                     call->nFree = 0;
826                 } else {
827                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
828                     call->curlen = cp->wirevec[call->curvec].iov_len;
829                 }
830             }
831         }                       /* while bytes to send and room to send them */
832
833         /* might be out of space now */
834         if (!nbytes) {
835             return requestCount;
836         } else;                 /* more data to send, so get another packet and keep going */
837     } while (nbytes);
838
839     return requestCount - nbytes;
840 }
841
842 int
843 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
844 {
845     int bytes;
846     int tcurlen;
847     int tnFree;
848     char *tcurpos;
849     SPLVAR;
850
851     /*
852      * Free any packets from the last call to ReadvProc/WritevProc.
853      * We do not need the lock because the receiver threads only
854      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
855      * RX_CALL_IOVEC_WAIT is always cleared before returning from
856      * ReadvProc/WritevProc.
857      */
858     if (queue_IsNotEmpty(&call->iovq)) {
859         rxi_FreePackets(0, &call->iovq);
860     }
861
862     /*
863      * Most common case: all of the data fits in the current iovec.
864      * We do not need the lock because this is the only thread that
865      * updates the curlen, curpos, nFree fields.
866      *
867      * We are relying on nFree being zero unless the call is in send mode.
868      */
869     tcurlen = (int)call->curlen;
870     tnFree = (int)call->nFree;
871     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
872         tcurpos = call->curpos;
873         memcpy(tcurpos, buf, nbytes);
874         call->curpos = tcurpos + nbytes;
875         call->curlen = (u_short)(tcurlen - nbytes);
876         call->nFree = (u_short)(tnFree - nbytes);
877         return nbytes;
878     }
879
880     NETPRI;
881     MUTEX_ENTER(&call->lock);
882     bytes = rxi_WriteProc(call, buf, nbytes);
883     MUTEX_EXIT(&call->lock);
884     USERPRI;
885     return bytes;
886 }
887
888 /* Optimization for marshalling 32 bit arguments */
889 int
890 rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
891 {
892     int bytes;
893     int tcurlen;
894     int tnFree;
895     char *tcurpos;
896     SPLVAR;
897
898     /*
899      * Free any packets from the last call to ReadvProc/WritevProc.
900      * We do not need the lock because the receiver threads only
901      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
902      * RX_CALL_IOVEC_WAIT is always cleared before returning from
903      * ReadvProc/WritevProc.
904      */
905     if (queue_IsNotEmpty(&call->iovq)) {
906         rxi_FreePackets(0, &call->iovq);
907     }
908
909     /*
910      * Most common case: all of the data fits in the current iovec.
911      * We do not need the lock because this is the only thread that
912      * updates the curlen, curpos, nFree fields.
913      *
914      * We are relying on nFree being zero unless the call is in send mode.
915      */
916     tcurlen = call->curlen;
917     tnFree = call->nFree;
918     if (!call->error && tcurlen >= sizeof(afs_int32)
919         && tnFree >= sizeof(afs_int32)) {
920         tcurpos = call->curpos;
921         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
922             *((afs_int32 *) (tcurpos)) = *value;
923         } else {
924             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
925         }
926         call->curpos = tcurpos + sizeof(afs_int32);
927         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
928         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
929         return sizeof(afs_int32);
930     }
931
932     NETPRI;
933     MUTEX_ENTER(&call->lock);
934     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
935     MUTEX_EXIT(&call->lock);
936     USERPRI;
937     return bytes;
938 }
939
940 /* rxi_WritevAlloc -- internal version.
941  *
942  * Fill in an iovec to point to data in packet buffers. The application
943  * calls rxi_WritevProc when the buffers are full.
944  *
945  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
946
947 int
948 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
949                 int nbytes)
950 {
951     struct rx_connection *conn = call->conn;
952     struct rx_packet *cp = call->currentPacket;
953     int requestCount;
954     int nextio;
955     /* Temporary values, real work is done in rxi_WritevProc */
956     int tnFree;
957     int tcurvec;
958     char *tcurpos;
959     int tcurlen;
960
961     requestCount = nbytes;
962     nextio = 0;
963
964     /* Free any packets from the last call to ReadvProc/WritevProc */
965     if (queue_IsNotEmpty(&call->iovq)) {
966         rxi_FreePackets(0, &call->iovq);
967     }
968
969     if (call->mode != RX_MODE_SENDING) {
970         if ((conn->type == RX_SERVER_CONNECTION)
971             && (call->mode == RX_MODE_RECEIVING)) {
972             call->mode = RX_MODE_SENDING;
973             if (cp) {
974                 cp->flags &= ~RX_PKTFLAG_CP;
975                 rxi_FreePacket(cp);
976                 cp = call->currentPacket = (struct rx_packet *)0;
977                 call->nLeft = 0;
978                 call->nFree = 0;
979             }
980         } else {
981             return 0;
982         }
983     }
984
985     /* Set up the iovec to point to data in packet buffers. */
986     tnFree = call->nFree;
987     tcurvec = call->curvec;
988     tcurpos = call->curpos;
989     tcurlen = call->curlen;
990     do {
991         register unsigned int t;
992
993         if (tnFree == 0) {
994             /* current packet is full, allocate a new one */
995             cp = rxi_AllocSendPacket(call, nbytes);
996             if (cp == NULL) {
997                 /* out of space, return what we have */
998                 *nio = nextio;
999                 return requestCount - nbytes;
1000             }
1001             cp->flags |= RX_PKTFLAG_IOVQ;
1002             queue_Append(&call->iovq, cp);
1003             tnFree = cp->length;
1004             tcurvec = 1;
1005             tcurpos =
1006                 (char *)cp->wirevec[1].iov_base +
1007                 call->conn->securityHeaderSize;
1008             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1009         }
1010
1011         if (tnFree < nbytes) {
1012             /* try to extend the current packet */
1013             register int len, mud;
1014             len = cp->length;
1015             mud = rx_MaxUserDataSize(call);
1016             if (mud > len) {
1017                 int want;
1018                 want = MIN(nbytes - tnFree, mud - len);
1019                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1020                 if (cp->length > (unsigned)mud)
1021                     cp->length = mud;
1022                 tnFree += (cp->length - len);
1023                 if (cp == call->currentPacket) {
1024                     call->nFree += (cp->length - len);
1025                 }
1026             }
1027         }
1028
1029         /* fill in the next entry in the iovec */
1030         t = MIN(tcurlen, nbytes);
1031         t = MIN(tnFree, t);
1032         iov[nextio].iov_base = tcurpos;
1033         iov[nextio].iov_len = t;
1034         nbytes -= t;
1035         tcurpos += t;
1036         tcurlen -= t;
1037         tnFree -= t;
1038         nextio++;
1039
1040         if (!tcurlen) {
1041             /* need to get another struct iov */
1042             if (++tcurvec >= cp->niovecs) {
1043                 /* current packet is full, extend it or move on to next packet */
1044                 tnFree = 0;
1045             } else {
1046                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1047                 tcurlen = cp->wirevec[tcurvec].iov_len;
1048             }
1049         }
1050     } while (nbytes && nextio < maxio);
1051     *nio = nextio;
1052     return requestCount - nbytes;
1053 }
1054
1055 int
1056 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1057                int nbytes)
1058 {
1059     int bytes;
1060     SPLVAR;
1061
1062     NETPRI;
1063     MUTEX_ENTER(&call->lock);
1064     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1065     MUTEX_EXIT(&call->lock);
1066     USERPRI;
1067     return bytes;
1068 }
1069
1070 /* rxi_WritevProc -- internal version.
1071  *
1072  * Send buffers allocated in rxi_WritevAlloc.
1073  *
1074  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1075
1076 int
1077 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1078 {
1079     struct rx_packet *cp = call->currentPacket;
1080     int nextio;
1081     int requestCount;
1082     struct rx_queue tmpq;
1083
1084     requestCount = nbytes;
1085     nextio = 0;
1086
1087     if (call->mode != RX_MODE_SENDING) {
1088         call->error = RX_PROTOCOL_ERROR;
1089     }
1090 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1091     /* Wait until TQ_BUSY is reset before trying to move any
1092      * packets to the transmit queue.  */
1093     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1094         call->flags |= RX_CALL_TQ_WAIT;
1095 #ifdef RX_ENABLE_LOCKS
1096         CV_WAIT(&call->cv_tq, &call->lock);
1097 #else /* RX_ENABLE_LOCKS */
1098         osi_rxSleep(&call->tq);
1099 #endif /* RX_ENABLE_LOCKS */
1100     }
1101 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1102
1103     if (call->error) {
1104         if (cp) {
1105             cp->flags &= ~RX_PKTFLAG_CP;
1106             cp->flags |= RX_PKTFLAG_IOVQ;
1107             queue_Prepend(&call->iovq, cp);
1108             cp = call->currentPacket = (struct rx_packet *)0;
1109         }
1110         rxi_FreePackets(0, &call->iovq);
1111         return 0;
1112     }
1113
1114     /* Loop through the I/O vector adjusting packet pointers.
1115      * Place full packets back onto the iovq once they are ready
1116      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1117      * the iovec. We put the loop condition at the end to ensure that
1118      * a zero length write will push a short packet. */
1119     nextio = 0;
1120     queue_Init(&tmpq);
1121     do {
1122         if (call->nFree == 0 && cp) {
1123             clock_NewTime();    /* Bogus:  need new time package */
1124             /* The 0, below, specifies that it is not the last packet: 
1125              * there will be others. PrepareSendPacket may
1126              * alter the packet length by up to
1127              * conn->securityMaxTrailerSize */
1128             hadd32(call->bytesSent, cp->length);
1129             rxi_PrepareSendPacket(call, cp, 0);
1130             cp->flags |= RX_PKTFLAG_TQ;
1131             queue_Append(&tmpq, cp);
1132             cp = call->currentPacket = (struct rx_packet *)0;
1133
1134             /* The head of the iovq is now the current packet */
1135             if (nbytes) {
1136                 if (queue_IsEmpty(&call->iovq)) {
1137                     call->error = RX_PROTOCOL_ERROR;
1138                     rxi_FreePackets(0, &tmpq);
1139                     return 0;
1140                 }
1141                 cp = queue_First(&call->iovq, rx_packet);
1142                 queue_Remove(cp);
1143                 cp->flags &= ~RX_PKTFLAG_IOVQ;
1144                 cp->flags |= RX_PKTFLAG_CP;
1145                 call->currentPacket = cp;
1146                 call->nFree = cp->length;
1147                 call->curvec = 1;
1148                 call->curpos =
1149                     (char *)cp->wirevec[1].iov_base +
1150                     call->conn->securityHeaderSize;
1151                 call->curlen =
1152                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1153             }
1154         }
1155
1156         if (nbytes) {
1157             /* The next iovec should point to the current position */
1158             if (iov[nextio].iov_base != call->curpos
1159                 || iov[nextio].iov_len > (int)call->curlen) {
1160                 call->error = RX_PROTOCOL_ERROR;
1161                 if (cp) {
1162                     cp->flags &= ~RX_PKTFLAG_CP;
1163                     queue_Prepend(&tmpq, cp);
1164                     cp = call->currentPacket = (struct rx_packet *)0;
1165                 }
1166                 rxi_FreePackets(0, &tmpq);
1167                 return 0;
1168             }
1169             nbytes -= iov[nextio].iov_len;
1170             call->curpos += iov[nextio].iov_len;
1171             call->curlen -= iov[nextio].iov_len;
1172             call->nFree -= iov[nextio].iov_len;
1173             nextio++;
1174             if (call->curlen == 0) {
1175                 if (++call->curvec > cp->niovecs) {
1176                     call->nFree = 0;
1177                 } else {
1178                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1179                     call->curlen = cp->wirevec[call->curvec].iov_len;
1180                 }
1181             }
1182         }
1183     } while (nbytes && nextio < nio);
1184
1185     /* Move the packets from the temporary queue onto the transmit queue.
1186      * We may end up with more than call->twind packets on the queue. */
1187     queue_SpliceAppend(&call->tq, &tmpq);
1188
1189     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1190         rxi_Start(0, call, 0, 0);
1191     }
1192
1193     /* Wait for the length of the transmit queue to fall below call->twind */
1194     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1195         clock_NewTime();
1196         call->startWait = clock_Sec();
1197 #ifdef  RX_ENABLE_LOCKS
1198         CV_WAIT(&call->cv_twind, &call->lock);
1199 #else
1200         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1201         osi_rxSleep(&call->twind);
1202 #endif
1203         call->startWait = 0;
1204     }
1205
1206     if (call->error) {
1207         if (cp) {
1208             cp->flags &= ~RX_PKTFLAG_CP;
1209             rxi_FreePacket(cp);
1210             cp = call->currentPacket = (struct rx_packet *)0;
1211         }
1212         return 0;
1213     }
1214
1215     return requestCount - nbytes;
1216 }
1217
1218 int
1219 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1220 {
1221     int bytes;
1222     SPLVAR;
1223
1224     NETPRI;
1225     MUTEX_ENTER(&call->lock);
1226     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1227     MUTEX_EXIT(&call->lock);
1228     USERPRI;
1229     return bytes;
1230 }
1231
1232 /* Flush any buffered data to the stream, switch to read mode
1233  * (clients) or to EOF mode (servers) */
1234 void
1235 rxi_FlushWrite(register struct rx_call *call)
1236 {
1237     register struct rx_packet *cp = call->currentPacket;
1238
1239     /* Free any packets from the last call to ReadvProc/WritevProc */
1240     if (queue_IsNotEmpty(&call->iovq)) {
1241         rxi_FreePackets(0, &call->iovq);
1242     }
1243
1244     if (call->mode == RX_MODE_SENDING) {
1245
1246         call->mode =
1247             (call->conn->type ==
1248              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1249
1250 #ifdef RX_KERNEL_TRACE
1251         {
1252             int glockOwner = ISAFS_GLOCK();
1253             if (!glockOwner)
1254                 AFS_GLOCK();
1255             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1256                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1257                        call);
1258             if (!glockOwner)
1259                 AFS_GUNLOCK();
1260         }
1261 #endif
1262
1263 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1264         /* Wait until TQ_BUSY is reset before adding any
1265          * packets to the transmit queue
1266          */
1267         while (call->flags & RX_CALL_TQ_BUSY) {
1268             call->flags |= RX_CALL_TQ_WAIT;
1269 #ifdef RX_ENABLE_LOCKS
1270             CV_WAIT(&call->cv_tq, &call->lock);
1271 #else /* RX_ENABLE_LOCKS */
1272             osi_rxSleep(&call->tq);
1273 #endif /* RX_ENABLE_LOCKS */
1274         }
1275 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1276
1277         if (cp) {
1278             /* cp->length is only supposed to be the user's data */
1279             /* cp->length was already set to (then-current) 
1280              * MaxUserDataSize or less. */
1281             cp->flags &= ~RX_PKTFLAG_CP;
1282             cp->length -= call->nFree;
1283             call->currentPacket = (struct rx_packet *)0;
1284             call->nFree = 0;
1285         } else {
1286             cp = rxi_AllocSendPacket(call, 0);
1287             if (!cp) {
1288                 /* Mode can no longer be MODE_SENDING */
1289                 return;
1290             }
1291             cp->length = 0;
1292             cp->niovecs = 2;    /* header + space for rxkad stuff */
1293             call->nFree = 0;
1294         }
1295
1296         /* The 1 specifies that this is the last packet */
1297         hadd32(call->bytesSent, cp->length);
1298         rxi_PrepareSendPacket(call, cp, 1);
1299         cp->flags |= RX_PKTFLAG_TQ;
1300         queue_Append(&call->tq, cp);
1301         if (!
1302             (call->
1303              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1304             rxi_Start(0, call, 0, 0);
1305         }
1306     }
1307 }
1308
1309 /* Flush any buffered data to the stream, switch to read mode
1310  * (clients) or to EOF mode (servers) */
1311 void
1312 rx_FlushWrite(struct rx_call *call)
1313 {
1314     SPLVAR;
1315     NETPRI;
1316     MUTEX_ENTER(&call->lock);
1317     rxi_FlushWrite(call);
1318     MUTEX_EXIT(&call->lock);
1319     USERPRI;
1320 }