35fcce6d4477428cc96d7269670028513731a488
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   * 
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #ifdef KERNEL
21 #ifndef UKERNEL
22 #ifdef RX_KERNEL_TRACE
23 #include "rx_kcommon.h"
24 #endif
25 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
26 #include "afs/sysincludes.h"
27 #else
28 #include "h/types.h"
29 #include "h/time.h"
30 #include "h/stat.h"
31 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
32 #include "h/systm.h"
33 #endif
34 #ifdef  AFS_OSF_ENV
35 #include <net/net_globals.h>
36 #endif /* AFS_OSF_ENV */
37 #ifdef AFS_LINUX20_ENV
38 #include "h/socket.h"
39 #endif
40 #include "netinet/in.h"
41 #if defined(AFS_SGI_ENV)
42 #include "afs/sysincludes.h"
43 #endif
44 #endif
45 #include "afs/afs_args.h"
46 #include "afs/afs_osi.h"
47 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
48 #include "h/systm.h"
49 #endif
50 #else /* !UKERNEL */
51 #include "afs/sysincludes.h"
52 #endif /* !UKERNEL */
53 #ifdef RXDEBUG
54 #undef RXDEBUG                  /* turn off debugging */
55 #endif /* RXDEBUG */
56
57 #include "rx_kmutex.h"
58 #include "rx/rx_kernel.h"
59 #include "rx/rx_clock.h"
60 #include "rx/rx_queue.h"
61 #include "rx/rx_internal.h"
62 #include "rx/rx.h"
63 #include "rx/rx_globals.h"
64 #include "afs/lock.h"
65 #include "afsint.h"
66 #ifdef  AFS_OSF_ENV
67 #undef kmem_alloc
68 #undef kmem_free
69 #undef mem_alloc
70 #undef mem_free
71 #undef register
72 #endif /* AFS_OSF_ENV */
73 #else /* KERNEL */
74 # include <sys/types.h>
75 #ifdef AFS_NT40_ENV
76 # include <winsock2.h>
77 #else /* !AFS_NT40_ENV */
78 # include <sys/socket.h>
79 # include <sys/file.h>
80 # include <netdb.h>
81 # include <netinet/in.h>
82 # include <sys/stat.h>
83 # include <sys/time.h>
84 #endif /* !AFS_NT40_ENV */
85 #include <string.h>
86 #ifdef HAVE_UNISTD_H
87 #include <unistd.h>
88 #endif
89 # include "rx_user.h"
90 # include "rx_clock.h"
91 # include "rx_queue.h"
92 # include "rx_internal.h"
93 # include "rx.h"
94 # include "rx_globals.h"
95 #endif /* KERNEL */
96
97 #ifdef RX_LOCKS_DB
98 /* rxdb_fileID is used to identify the lock location, along with line#. */
99 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
100 #endif /* RX_LOCKS_DB */
101 /* rxi_ReadProc -- internal version.
102  *
103  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
104  */
105 int
106 rxi_ReadProc(register struct rx_call *call, register char *buf,
107              register int nbytes)
108 {
109     register struct rx_packet *cp = call->currentPacket;
110     register struct rx_packet *rp;
111     register int requestCount;
112     register unsigned int t;
113
114 /* XXXX took out clock_NewTime from here.  Was it needed? */
115     requestCount = nbytes;
116
117     /* Free any packets from the last call to ReadvProc/WritevProc */
118     if (queue_IsNotEmpty(&call->iovq)) {
119         rxi_FreePackets(0, &call->iovq);
120     }
121
122     do {
123         if (call->nLeft == 0) {
124             /* Get next packet */
125             for (;;) {
126                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
127                     if (call->error) {
128                         return 0;
129                     }
130                     if (call->mode == RX_MODE_SENDING) {
131                         rxi_FlushWrite(call);
132                         continue;
133                     }
134                 }
135                 if (queue_IsNotEmpty(&call->rq)) {
136                     /* Check that next packet available is next in sequence */
137                     rp = queue_First(&call->rq, rx_packet);
138                     if (rp->header.seq == call->rnext) {
139                         afs_int32 error;
140                         register struct rx_connection *conn = call->conn;
141                         queue_Remove(rp);
142                         rp->flags &= ~RX_PKTFLAG_RQ;
143
144                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
145                          * work.  It may reduce the length of the packet by up
146                          * to conn->maxTrailerSize, to reflect the length of the
147                          * data + the header. */
148                         if ((error =
149                              RXS_CheckPacket(conn->securityObject, call,
150                                              rp))) {
151                             /* Used to merely shut down the call, but now we 
152                              * shut down the whole connection since this may 
153                              * indicate an attempt to hijack it */
154
155                             MUTEX_EXIT(&call->lock);
156                             rxi_ConnectionError(conn, error);
157                             MUTEX_ENTER(&conn->conn_data_lock);
158                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
159                             MUTEX_EXIT(&conn->conn_data_lock);
160                             rxi_FreePacket(rp);
161                             MUTEX_ENTER(&call->lock);
162
163                             return 0;
164                         }
165                         call->rnext++;
166                         cp = call->currentPacket = rp;
167                         call->currentPacket->flags |= RX_PKTFLAG_CP;
168                         call->curvec = 1;       /* 0th vec is always header */
169                         /* begin at the beginning [ more or less ], continue 
170                          * on until the end, then stop. */
171                         call->curpos =
172                             (char *)cp->wirevec[1].iov_base +
173                             call->conn->securityHeaderSize;
174                         call->curlen =
175                             cp->wirevec[1].iov_len -
176                             call->conn->securityHeaderSize;
177
178                         /* Notice that this code works correctly if the data
179                          * size is 0 (which it may be--no reply arguments from
180                          * server, for example).  This relies heavily on the
181                          * fact that the code below immediately frees the packet
182                          * (no yields, etc.).  If it didn't, this would be a
183                          * problem because a value of zero for call->nLeft
184                          * normally means that there is no read packet */
185                         call->nLeft = cp->length;
186                         hadd32(call->bytesRcvd, cp->length);
187
188                         /* Send a hard ack for every rxi_HardAckRate+1 packets
189                          * consumed. Otherwise schedule an event to send
190                          * the hard ack later on.
191                          */
192                         call->nHardAcks++;
193                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
194                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
195                                 rxevent_Cancel(call->delayedAckEvent, call,
196                                                RX_CALL_REFCOUNT_DELAY);
197                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
198                             } else {
199                                 struct clock when, now;
200                                 clock_GetTime(&now);
201                                 when = now;
202                                 /* Delay to consolidate ack packets */
203                                 clock_Add(&when, &rx_hardAckDelay);
204                                 if (!call->delayedAckEvent
205                                     || clock_Gt(&call->delayedAckEvent->
206                                                 eventTime, &when)) {
207                                     rxevent_Cancel(call->delayedAckEvent,
208                                                    call,
209                                                    RX_CALL_REFCOUNT_DELAY);
210                                     CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
211                                     call->delayedAckEvent =
212                                       rxevent_PostNow(&when, &now,
213                                                      rxi_SendDelayedAck, call,
214                                                      0);
215                                 }
216                             }
217                         }
218                         break;
219                     }
220                 }
221
222 /*
223 MTUXXX  doesn't there need to be an "else" here ??? 
224 */
225                 /* Are there ever going to be any more packets? */
226                 if (call->flags & RX_CALL_RECEIVE_DONE) {
227                     return requestCount - nbytes;
228                 }
229                 /* Wait for in-sequence packet */
230                 call->flags |= RX_CALL_READER_WAIT;
231                 clock_NewTime();
232                 call->startWait = clock_Sec();
233                 while (call->flags & RX_CALL_READER_WAIT) {
234 #ifdef  RX_ENABLE_LOCKS
235                     CV_WAIT(&call->cv_rq, &call->lock);
236 #else
237                     osi_rxSleep(&call->rq);
238 #endif
239                 }
240
241                 call->startWait = 0;
242 #ifdef RX_ENABLE_LOCKS
243                 if (call->error) {
244                     return 0;
245                 }
246 #endif /* RX_ENABLE_LOCKS */
247             }
248         } else
249             /* assert(cp); */
250             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
251             /* yes, the following block is allowed to be the ELSE clause (or not) */
252             /* It's possible for call->nLeft to be smaller than any particular
253              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
254              * reflects the size of the buffer.  We have to keep track of the
255              * number of bytes read in the length field of the packet struct.  On
256              * the final portion of a received packet, it's almost certain that
257              * call->nLeft will be smaller than the final buffer. */
258             while (nbytes && cp) {
259                 t = MIN((int)call->curlen, nbytes);
260                 t = MIN(t, (int)call->nLeft);
261                 memcpy(buf, call->curpos, t);
262                 buf += t;
263                 nbytes -= t;
264                 call->curpos += t;
265                 call->curlen -= t;
266                 call->nLeft -= t;
267
268                 if (!call->nLeft) {
269                     /* out of packet.  Get another one. */
270                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
271                     rxi_FreePacket(cp);
272                     cp = call->currentPacket = (struct rx_packet *)0;
273                 } else if (!call->curlen) {
274                     /* need to get another struct iov */
275                     if (++call->curvec >= cp->niovecs) {
276                         /* current packet is exhausted, get ready for another */
277                         /* don't worry about curvec and stuff, they get set somewhere else */
278                         call->currentPacket->flags &= ~RX_PKTFLAG_CP;
279                         rxi_FreePacket(cp);
280                         cp = call->currentPacket = (struct rx_packet *)0;
281                         call->nLeft = 0;
282                     } else {
283                         call->curpos =
284                             (char *)cp->wirevec[call->curvec].iov_base;
285                         call->curlen = cp->wirevec[call->curvec].iov_len;
286                     }
287                 }
288             }
289         if (!nbytes) {
290             /* user buffer is full, return */
291             return requestCount;
292         }
293
294     } while (nbytes);
295
296     return requestCount;
297 }
298
299 int
300 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
301 {
302     int bytes;
303     int tcurlen;
304     int tnLeft;
305     char *tcurpos;
306     SPLVAR;
307
308     /*
309      * Free any packets from the last call to ReadvProc/WritevProc.
310      * We do not need the lock because the receiver threads only
311      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
312      * RX_CALL_IOVEC_WAIT is always cleared before returning from
313      * ReadvProc/WritevProc.
314      */
315     if (!queue_IsEmpty(&call->iovq)) {
316         rxi_FreePackets(0, &call->iovq);
317     }
318
319     /*
320      * Most common case, all of the data is in the current iovec.
321      * We do not need the lock because this is the only thread that
322      * updates the curlen, curpos, nLeft fields.
323      *
324      * We are relying on nLeft being zero unless the call is in receive mode.
325      */
326     tcurlen = call->curlen;
327     tnLeft = call->nLeft;
328     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
329         tcurpos = call->curpos;
330         memcpy(buf, tcurpos, nbytes);
331         call->curpos = tcurpos + nbytes;
332         call->curlen = tcurlen - nbytes;
333         call->nLeft = tnLeft - nbytes;
334
335         if (!call->nLeft) {
336             /* out of packet.  Get another one. */
337             NETPRI;
338             MUTEX_ENTER(&call->lock);
339             rxi_FreePacket(call->currentPacket);
340             call->currentPacket = (struct rx_packet *)0;
341             MUTEX_EXIT(&call->lock);
342             USERPRI;
343         }
344         return nbytes;
345     }
346
347     NETPRI;
348     MUTEX_ENTER(&call->lock);
349     bytes = rxi_ReadProc(call, buf, nbytes);
350     MUTEX_EXIT(&call->lock);
351     USERPRI;
352     return bytes;
353 }
354
355 /* Optimization for unmarshalling 32 bit integers */
356 int
357 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
358 {
359     int bytes;
360     int tcurlen;
361     int tnLeft;
362     char *tcurpos;
363     SPLVAR;
364
365     /*
366      * Free any packets from the last call to ReadvProc/WritevProc.
367      * We do not need the lock because the receiver threads only
368      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
369      * RX_CALL_IOVEC_WAIT is always cleared before returning from
370      * ReadvProc/WritevProc.
371      */
372     if (!queue_IsEmpty(&call->iovq)) {
373         rxi_FreePackets(0, &call->iovq);
374     }
375
376     /*
377      * Most common case, all of the data is in the current iovec.
378      * We do not need the lock because this is the only thread that
379      * updates the curlen, curpos, nLeft fields.
380      *
381      * We are relying on nLeft being zero unless the call is in receive mode.
382      */
383     tcurlen = call->curlen;
384     tnLeft = call->nLeft;
385     if (!call->error && tcurlen >= sizeof(afs_int32)
386         && tnLeft >= sizeof(afs_int32)) {
387         tcurpos = call->curpos;
388         memcpy((char *)value, tcurpos, sizeof(afs_int32));
389         call->curpos = tcurpos + sizeof(afs_int32);
390         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
391         call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
392         if (!call->nLeft && call->currentPacket != NULL) {
393             /* out of packet.  Get another one. */
394             NETPRI;
395             MUTEX_ENTER(&call->lock);
396             rxi_FreePacket(call->currentPacket);
397             call->currentPacket = (struct rx_packet *)0;
398             MUTEX_EXIT(&call->lock);
399             USERPRI;
400         }
401         return sizeof(afs_int32);
402     }
403
404     NETPRI;
405     MUTEX_ENTER(&call->lock);
406     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
407     MUTEX_EXIT(&call->lock);
408     USERPRI;
409     return bytes;
410 }
411
412 /* rxi_FillReadVec
413  *
414  * Uses packets in the receive queue to fill in as much of the
415  * current iovec as possible. Does not block if it runs out
416  * of packets to complete the iovec. Return true if an ack packet
417  * was sent, otherwise return false */
418 int
419 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
420 {
421     int didConsume = 0;
422     int didHardAck = 0;
423     register unsigned int t;
424     struct rx_packet *rp;
425     struct rx_packet *curp;
426     struct iovec *call_iov;
427     struct iovec *cur_iov = NULL;
428
429     curp = call->currentPacket;
430     if (curp) {
431         cur_iov = &curp->wirevec[call->curvec];
432     }
433     call_iov = &call->iov[call->iovNext];
434
435     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
436         if (call->nLeft == 0) {
437             /* Get next packet */
438             if (queue_IsNotEmpty(&call->rq)) {
439                 /* Check that next packet available is next in sequence */
440                 rp = queue_First(&call->rq, rx_packet);
441                 if (rp->header.seq == call->rnext) {
442                     afs_int32 error;
443                     register struct rx_connection *conn = call->conn;
444                     queue_Remove(rp);
445                     rp->flags &= ~RX_PKTFLAG_RQ;
446
447                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
448                      * work.  It may reduce the length of the packet by up
449                      * to conn->maxTrailerSize, to reflect the length of the
450                      * data + the header. */
451                     if ((error =
452                          RXS_CheckPacket(conn->securityObject, call, rp))) {
453                         /* Used to merely shut down the call, but now we 
454                          * shut down the whole connection since this may 
455                          * indicate an attempt to hijack it */
456
457                         MUTEX_EXIT(&call->lock);
458                         rxi_ConnectionError(conn, error);
459                         MUTEX_ENTER(&conn->conn_data_lock);
460                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
461                         MUTEX_EXIT(&conn->conn_data_lock);
462                         rxi_FreePacket(rp);
463                         MUTEX_ENTER(&call->lock);
464
465                         return 1;
466                     }
467                     call->rnext++;
468                     curp = call->currentPacket = rp;
469                     call->currentPacket->flags |= RX_PKTFLAG_CP;
470                     call->curvec = 1;   /* 0th vec is always header */
471                     cur_iov = &curp->wirevec[1];
472                     /* begin at the beginning [ more or less ], continue 
473                      * on until the end, then stop. */
474                     call->curpos =
475                         (char *)curp->wirevec[1].iov_base +
476                         call->conn->securityHeaderSize;
477                     call->curlen =
478                         curp->wirevec[1].iov_len -
479                         call->conn->securityHeaderSize;
480
481                     /* Notice that this code works correctly if the data
482                      * size is 0 (which it may be--no reply arguments from
483                      * server, for example).  This relies heavily on the
484                      * fact that the code below immediately frees the packet
485                      * (no yields, etc.).  If it didn't, this would be a
486                      * problem because a value of zero for call->nLeft
487                      * normally means that there is no read packet */
488                     call->nLeft = curp->length;
489                     hadd32(call->bytesRcvd, curp->length);
490
491                     /* Send a hard ack for every rxi_HardAckRate+1 packets
492                      * consumed. Otherwise schedule an event to send
493                      * the hard ack later on.
494                      */
495                     call->nHardAcks++;
496                     didConsume = 1;
497                     continue;
498                 }
499             }
500             break;
501         }
502
503         /* It's possible for call->nLeft to be smaller than any particular
504          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
505          * reflects the size of the buffer.  We have to keep track of the
506          * number of bytes read in the length field of the packet struct.  On
507          * the final portion of a received packet, it's almost certain that
508          * call->nLeft will be smaller than the final buffer. */
509         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
510
511             t = MIN((int)call->curlen, call->iovNBytes);
512             t = MIN(t, (int)call->nLeft);
513             call_iov->iov_base = call->curpos;
514             call_iov->iov_len = t;
515             call_iov++;
516             call->iovNext++;
517             call->iovNBytes -= t;
518             call->curpos += t;
519             call->curlen -= t;
520             call->nLeft -= t;
521
522             if (!call->nLeft) {
523                 /* out of packet.  Get another one. */
524                 curp->flags &= ~RX_PKTFLAG_CP;
525                 curp->flags |= RX_PKTFLAG_IOVQ;
526                 queue_Append(&call->iovq, curp);
527                 curp = call->currentPacket = (struct rx_packet *)0;
528             } else if (!call->curlen) {
529                 /* need to get another struct iov */
530                 if (++call->curvec >= curp->niovecs) {
531                     /* current packet is exhausted, get ready for another */
532                     /* don't worry about curvec and stuff, they get set somewhere else */
533                     curp->flags &= ~RX_PKTFLAG_CP;
534                     curp->flags |= RX_PKTFLAG_IOVQ;
535                     queue_Append(&call->iovq, curp);
536                     curp = call->currentPacket = (struct rx_packet *)0;
537                     call->nLeft = 0;
538                 } else {
539                     cur_iov++;
540                     call->curpos = (char *)cur_iov->iov_base;
541                     call->curlen = cur_iov->iov_len;
542                 }
543             }
544         }
545     }
546
547     /* If we consumed any packets then check whether we need to
548      * send a hard ack. */
549     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
550         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
551             rxevent_Cancel(call->delayedAckEvent, call,
552                            RX_CALL_REFCOUNT_DELAY);
553             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
554             didHardAck = 1;
555         } else {
556             struct clock when, now;
557             clock_GetTime(&now);
558             when = now;
559             /* Delay to consolidate ack packets */
560             clock_Add(&when, &rx_hardAckDelay);
561             if (!call->delayedAckEvent
562                 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
563                 rxevent_Cancel(call->delayedAckEvent, call,
564                                RX_CALL_REFCOUNT_DELAY);
565                 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
566                 call->delayedAckEvent =
567                     rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
568             }
569         }
570     }
571     return didHardAck;
572 }
573
574
575 /* rxi_ReadvProc -- internal version.
576  *
577  * Fills in an iovec with pointers to the packet buffers. All packets
578  * except the last packet (new current packet) are moved to the iovq
579  * while the application is processing the data.
580  *
581  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
582  */
583 int
584 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
585               int nbytes)
586 {
587     int requestCount;
588     int nextio;
589
590     requestCount = nbytes;
591     nextio = 0;
592
593     /* Free any packets from the last call to ReadvProc/WritevProc */
594     if (queue_IsNotEmpty(&call->iovq)) {
595         rxi_FreePackets(0, &call->iovq);
596     }
597
598     if (call->mode == RX_MODE_SENDING) {
599         rxi_FlushWrite(call);
600     }
601
602     if (call->error) {
603         return 0;
604     }
605
606     /* Get whatever data is currently available in the receive queue.
607      * If rxi_FillReadVec sends an ack packet then it is possible
608      * that we will receive more data while we drop the call lock
609      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
610      * here to avoid a race with the receive thread if we send
611      * hard acks in rxi_FillReadVec. */
612     call->flags |= RX_CALL_IOVEC_WAIT;
613     call->iovNBytes = nbytes;
614     call->iovMax = maxio;
615     call->iovNext = 0;
616     call->iov = iov;
617     rxi_FillReadVec(call, 0);
618
619     /* if we need more data then sleep until the receive thread has
620      * filled in the rest. */
621     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
622         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
623         call->flags |= RX_CALL_READER_WAIT;
624         clock_NewTime();
625         call->startWait = clock_Sec();
626         while (call->flags & RX_CALL_READER_WAIT) {
627 #ifdef  RX_ENABLE_LOCKS
628             CV_WAIT(&call->cv_rq, &call->lock);
629 #else
630             osi_rxSleep(&call->rq);
631 #endif
632         }
633         call->startWait = 0;
634     }
635     call->flags &= ~RX_CALL_IOVEC_WAIT;
636 #ifdef RX_ENABLE_LOCKS
637     if (call->error) {
638         return 0;
639     }
640 #endif /* RX_ENABLE_LOCKS */
641
642     call->iov = NULL;
643     *nio = call->iovNext;
644     return nbytes - call->iovNBytes;
645 }
646
647 int
648 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
649              int nbytes)
650 {
651     int bytes;
652     SPLVAR;
653
654     NETPRI;
655     MUTEX_ENTER(&call->lock);
656     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
657     MUTEX_EXIT(&call->lock);
658     USERPRI;
659     return bytes;
660 }
661
662 /* rxi_WriteProc -- internal version.
663  *
664  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
665
666 int
667 rxi_WriteProc(register struct rx_call *call, register char *buf,
668               register int nbytes)
669 {
670     struct rx_connection *conn = call->conn;
671     register struct rx_packet *cp = call->currentPacket;
672     register unsigned int t;
673     int requestCount = nbytes;
674
675     /* Free any packets from the last call to ReadvProc/WritevProc */
676     if (queue_IsNotEmpty(&call->iovq)) {
677         rxi_FreePackets(0, &call->iovq);
678     }
679
680     if (call->mode != RX_MODE_SENDING) {
681         if ((conn->type == RX_SERVER_CONNECTION)
682             && (call->mode == RX_MODE_RECEIVING)) {
683             call->mode = RX_MODE_SENDING;
684             if (cp) {
685                 cp->flags &= ~RX_PKTFLAG_CP;
686                 rxi_FreePacket(cp);
687                 cp = call->currentPacket = (struct rx_packet *)0;
688                 call->nLeft = 0;
689                 call->nFree = 0;
690             }
691         } else {
692             return 0;
693         }
694     }
695
696     /* Loop condition is checked at end, so that a write of 0 bytes
697      * will force a packet to be created--specially for the case where
698      * there are 0 bytes on the stream, but we must send a packet
699      * anyway. */
700     do {
701         if (call->nFree == 0) {
702             if (!call->error && cp) {
703 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
704                 /* Wait until TQ_BUSY is reset before adding any
705                  * packets to the transmit queue
706                  */
707                 while (call->flags & RX_CALL_TQ_BUSY) {
708                     call->flags |= RX_CALL_TQ_WAIT;
709 #ifdef RX_ENABLE_LOCKS
710                     CV_WAIT(&call->cv_tq, &call->lock);
711 #else /* RX_ENABLE_LOCKS */
712                     osi_rxSleep(&call->tq);
713 #endif /* RX_ENABLE_LOCKS */
714                 }
715 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
716                 clock_NewTime();        /* Bogus:  need new time package */
717                 /* The 0, below, specifies that it is not the last packet: 
718                  * there will be others. PrepareSendPacket may
719                  * alter the packet length by up to
720                  * conn->securityMaxTrailerSize */
721                 hadd32(call->bytesSent, cp->length);
722                 rxi_PrepareSendPacket(call, cp, 0);
723                 cp->flags &= ~RX_PKTFLAG_CP;
724                 cp->flags |= RX_PKTFLAG_TQ;
725                 queue_Append(&call->tq, cp);
726                 cp = call->currentPacket = (struct rx_packet *)0;
727                 if (!
728                     (call->
729                      flags & (RX_CALL_FAST_RECOVER |
730                               RX_CALL_FAST_RECOVER_WAIT))) {
731                     rxi_Start(0, call, 0, 0);
732                 }
733             } else if (cp) {
734                 cp->flags &= ~RX_PKTFLAG_CP;
735                 rxi_FreePacket(cp);
736                 cp = call->currentPacket = (struct rx_packet *)0;
737             }
738             /* Wait for transmit window to open up */
739             while (!call->error
740                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
741                 clock_NewTime();
742                 call->startWait = clock_Sec();
743
744 #ifdef  RX_ENABLE_LOCKS
745                 CV_WAIT(&call->cv_twind, &call->lock);
746 #else
747                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
748                 osi_rxSleep(&call->twind);
749 #endif
750
751                 call->startWait = 0;
752 #ifdef RX_ENABLE_LOCKS
753                 if (call->error) {
754                     return 0;
755                 }
756 #endif /* RX_ENABLE_LOCKS */
757             }
758             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
759                 cp->flags |= RX_PKTFLAG_CP;
760                 call->currentPacket = cp;
761                 call->nFree = cp->length;
762                 call->curvec = 1;       /* 0th vec is always header */
763                 /* begin at the beginning [ more or less ], continue 
764                  * on until the end, then stop. */
765                 call->curpos =
766                     (char *)cp->wirevec[1].iov_base +
767                     call->conn->securityHeaderSize;
768                 call->curlen =
769                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
770             }
771             if (call->error) {
772                 if (cp) {
773                     cp->flags &= ~RX_PKTFLAG_CP;
774                     rxi_FreePacket(cp);
775                     call->currentPacket = NULL;
776                 }
777                 return 0;
778             }
779         }
780
781         if (cp && (int)call->nFree < nbytes) {
782             /* Try to extend the current buffer */
783             register int len, mud;
784             len = cp->length;
785             mud = rx_MaxUserDataSize(call);
786             if (mud > len) {
787                 int want;
788                 want = MIN(nbytes - (int)call->nFree, mud - len);
789                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
790                 if (cp->length > (unsigned)mud)
791                     cp->length = mud;
792                 call->nFree += (cp->length - len);
793             }
794         }
795
796         /* If the remaining bytes fit in the buffer, then store them
797          * and return.  Don't ship a buffer that's full immediately to
798          * the peer--we don't know if it's the last buffer yet */
799
800         if (!cp) {
801             call->nFree = 0;
802         }
803
804         while (nbytes && call->nFree) {
805
806             t = MIN((int)call->curlen, nbytes);
807             t = MIN((int)call->nFree, t);
808             memcpy(call->curpos, buf, t);
809             buf += t;
810             nbytes -= t;
811             call->curpos += t;
812             call->curlen -= (u_short)t;
813             call->nFree -= (u_short)t;
814
815             if (!call->curlen) {
816                 /* need to get another struct iov */
817                 if (++call->curvec >= cp->niovecs) {
818                     /* current packet is full, extend or send it */
819                     call->nFree = 0;
820                 } else {
821                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
822                     call->curlen = cp->wirevec[call->curvec].iov_len;
823                 }
824             }
825         }                       /* while bytes to send and room to send them */
826
827         /* might be out of space now */
828         if (!nbytes) {
829             return requestCount;
830         } else;                 /* more data to send, so get another packet and keep going */
831     } while (nbytes);
832
833     return requestCount - nbytes;
834 }
835
836 int
837 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
838 {
839     int bytes;
840     int tcurlen;
841     int tnFree;
842     char *tcurpos;
843     SPLVAR;
844
845     /*
846      * Free any packets from the last call to ReadvProc/WritevProc.
847      * We do not need the lock because the receiver threads only
848      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
849      * RX_CALL_IOVEC_WAIT is always cleared before returning from
850      * ReadvProc/WritevProc.
851      */
852     if (queue_IsNotEmpty(&call->iovq)) {
853         rxi_FreePackets(0, &call->iovq);
854     }
855
856     /*
857      * Most common case: all of the data fits in the current iovec.
858      * We do not need the lock because this is the only thread that
859      * updates the curlen, curpos, nFree fields.
860      *
861      * We are relying on nFree being zero unless the call is in send mode.
862      */
863     tcurlen = (int)call->curlen;
864     tnFree = (int)call->nFree;
865     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
866         tcurpos = call->curpos;
867         memcpy(tcurpos, buf, nbytes);
868         call->curpos = tcurpos + nbytes;
869         call->curlen = (u_short)(tcurlen - nbytes);
870         call->nFree = (u_short)(tnFree - nbytes);
871         return nbytes;
872     }
873
874     NETPRI;
875     MUTEX_ENTER(&call->lock);
876     bytes = rxi_WriteProc(call, buf, nbytes);
877     MUTEX_EXIT(&call->lock);
878     USERPRI;
879     return bytes;
880 }
881
882 /* Optimization for marshalling 32 bit arguments */
883 int
884 rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
885 {
886     int bytes;
887     int tcurlen;
888     int tnFree;
889     char *tcurpos;
890     SPLVAR;
891
892     /*
893      * Free any packets from the last call to ReadvProc/WritevProc.
894      * We do not need the lock because the receiver threads only
895      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
896      * RX_CALL_IOVEC_WAIT is always cleared before returning from
897      * ReadvProc/WritevProc.
898      */
899     if (queue_IsNotEmpty(&call->iovq)) {
900         rxi_FreePackets(0, &call->iovq);
901     }
902
903     /*
904      * Most common case: all of the data fits in the current iovec.
905      * We do not need the lock because this is the only thread that
906      * updates the curlen, curpos, nFree fields.
907      *
908      * We are relying on nFree being zero unless the call is in send mode.
909      */
910     tcurlen = call->curlen;
911     tnFree = call->nFree;
912     if (!call->error && tcurlen >= sizeof(afs_int32)
913         && tnFree >= sizeof(afs_int32)) {
914         tcurpos = call->curpos;
915         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
916             *((afs_int32 *) (tcurpos)) = *value;
917         } else {
918             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
919         }
920         call->curpos = tcurpos + sizeof(afs_int32);
921         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
922         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
923         return sizeof(afs_int32);
924     }
925
926     NETPRI;
927     MUTEX_ENTER(&call->lock);
928     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
929     MUTEX_EXIT(&call->lock);
930     USERPRI;
931     return bytes;
932 }
933
934 /* rxi_WritevAlloc -- internal version.
935  *
936  * Fill in an iovec to point to data in packet buffers. The application
937  * calls rxi_WritevProc when the buffers are full.
938  *
939  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
940
941 int
942 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
943                 int nbytes)
944 {
945     struct rx_connection *conn = call->conn;
946     struct rx_packet *cp = call->currentPacket;
947     int requestCount;
948     int nextio;
949     /* Temporary values, real work is done in rxi_WritevProc */
950     int tnFree;
951     int tcurvec;
952     char *tcurpos;
953     int tcurlen;
954
955     requestCount = nbytes;
956     nextio = 0;
957
958     /* Free any packets from the last call to ReadvProc/WritevProc */
959     if (queue_IsNotEmpty(&call->iovq)) {
960         rxi_FreePackets(0, &call->iovq);
961     }
962
963     if (call->mode != RX_MODE_SENDING) {
964         if ((conn->type == RX_SERVER_CONNECTION)
965             && (call->mode == RX_MODE_RECEIVING)) {
966             call->mode = RX_MODE_SENDING;
967             if (cp) {
968                 cp->flags &= ~RX_PKTFLAG_CP;
969                 rxi_FreePacket(cp);
970                 cp = call->currentPacket = (struct rx_packet *)0;
971                 call->nLeft = 0;
972                 call->nFree = 0;
973             }
974         } else {
975             return 0;
976         }
977     }
978
979     /* Set up the iovec to point to data in packet buffers. */
980     tnFree = call->nFree;
981     tcurvec = call->curvec;
982     tcurpos = call->curpos;
983     tcurlen = call->curlen;
984     do {
985         register unsigned int t;
986
987         if (tnFree == 0) {
988             /* current packet is full, allocate a new one */
989             cp = rxi_AllocSendPacket(call, nbytes);
990             if (cp == NULL) {
991                 /* out of space, return what we have */
992                 *nio = nextio;
993                 return requestCount - nbytes;
994             }
995             cp->flags |= RX_PKTFLAG_IOVQ;
996             queue_Append(&call->iovq, cp);
997             tnFree = cp->length;
998             tcurvec = 1;
999             tcurpos =
1000                 (char *)cp->wirevec[1].iov_base +
1001                 call->conn->securityHeaderSize;
1002             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1003         }
1004
1005         if (tnFree < nbytes) {
1006             /* try to extend the current packet */
1007             register int len, mud;
1008             len = cp->length;
1009             mud = rx_MaxUserDataSize(call);
1010             if (mud > len) {
1011                 int want;
1012                 want = MIN(nbytes - tnFree, mud - len);
1013                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1014                 if (cp->length > (unsigned)mud)
1015                     cp->length = mud;
1016                 tnFree += (cp->length - len);
1017                 if (cp == call->currentPacket) {
1018                     call->nFree += (cp->length - len);
1019                 }
1020             }
1021         }
1022
1023         /* fill in the next entry in the iovec */
1024         t = MIN(tcurlen, nbytes);
1025         t = MIN(tnFree, t);
1026         iov[nextio].iov_base = tcurpos;
1027         iov[nextio].iov_len = t;
1028         nbytes -= t;
1029         tcurpos += t;
1030         tcurlen -= t;
1031         tnFree -= t;
1032         nextio++;
1033
1034         if (!tcurlen) {
1035             /* need to get another struct iov */
1036             if (++tcurvec >= cp->niovecs) {
1037                 /* current packet is full, extend it or move on to next packet */
1038                 tnFree = 0;
1039             } else {
1040                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1041                 tcurlen = cp->wirevec[tcurvec].iov_len;
1042             }
1043         }
1044     } while (nbytes && nextio < maxio);
1045     *nio = nextio;
1046     return requestCount - nbytes;
1047 }
1048
1049 int
1050 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1051                int nbytes)
1052 {
1053     int bytes;
1054     SPLVAR;
1055
1056     NETPRI;
1057     MUTEX_ENTER(&call->lock);
1058     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1059     MUTEX_EXIT(&call->lock);
1060     USERPRI;
1061     return bytes;
1062 }
1063
1064 /* rxi_WritevProc -- internal version.
1065  *
1066  * Send buffers allocated in rxi_WritevAlloc.
1067  *
1068  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1069
1070 int
1071 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1072 {
1073     struct rx_packet *cp = call->currentPacket;
1074     int nextio;
1075     int requestCount;
1076     struct rx_queue tmpq;
1077
1078     requestCount = nbytes;
1079     nextio = 0;
1080
1081     if (call->mode != RX_MODE_SENDING) {
1082         call->error = RX_PROTOCOL_ERROR;
1083     }
1084 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1085     /* Wait until TQ_BUSY is reset before trying to move any
1086      * packets to the transmit queue.  */
1087     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1088         call->flags |= RX_CALL_TQ_WAIT;
1089 #ifdef RX_ENABLE_LOCKS
1090         CV_WAIT(&call->cv_tq, &call->lock);
1091 #else /* RX_ENABLE_LOCKS */
1092         osi_rxSleep(&call->tq);
1093 #endif /* RX_ENABLE_LOCKS */
1094     }
1095 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1096
1097     if (call->error) {
1098         if (cp) {
1099             cp->flags &= ~RX_PKTFLAG_CP;
1100             cp->flags |= RX_PKTFLAG_IOVQ;
1101             queue_Prepend(&call->iovq, cp);
1102             cp = call->currentPacket = (struct rx_packet *)0;
1103         }
1104         rxi_FreePackets(0, &call->iovq);
1105         return 0;
1106     }
1107
1108     /* Loop through the I/O vector adjusting packet pointers.
1109      * Place full packets back onto the iovq once they are ready
1110      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1111      * the iovec. We put the loop condition at the end to ensure that
1112      * a zero length write will push a short packet. */
1113     nextio = 0;
1114     queue_Init(&tmpq);
1115     do {
1116         if (call->nFree == 0 && cp) {
1117             clock_NewTime();    /* Bogus:  need new time package */
1118             /* The 0, below, specifies that it is not the last packet: 
1119              * there will be others. PrepareSendPacket may
1120              * alter the packet length by up to
1121              * conn->securityMaxTrailerSize */
1122             hadd32(call->bytesSent, cp->length);
1123             rxi_PrepareSendPacket(call, cp, 0);
1124             cp->flags |= RX_PKTFLAG_TQ;
1125             queue_Append(&tmpq, cp);
1126             cp = call->currentPacket = (struct rx_packet *)0;
1127
1128             /* The head of the iovq is now the current packet */
1129             if (nbytes) {
1130                 if (queue_IsEmpty(&call->iovq)) {
1131                     call->error = RX_PROTOCOL_ERROR;
1132                     rxi_FreePackets(0, &tmpq);
1133                     return 0;
1134                 }
1135                 cp = queue_First(&call->iovq, rx_packet);
1136                 queue_Remove(cp);
1137                 cp->flags &= ~RX_PKTFLAG_IOVQ;
1138                 cp->flags |= RX_PKTFLAG_CP;
1139                 call->currentPacket = cp;
1140                 call->nFree = cp->length;
1141                 call->curvec = 1;
1142                 call->curpos =
1143                     (char *)cp->wirevec[1].iov_base +
1144                     call->conn->securityHeaderSize;
1145                 call->curlen =
1146                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1147             }
1148         }
1149
1150         if (nbytes) {
1151             /* The next iovec should point to the current position */
1152             if (iov[nextio].iov_base != call->curpos
1153                 || iov[nextio].iov_len > (int)call->curlen) {
1154                 call->error = RX_PROTOCOL_ERROR;
1155                 if (cp) {
1156                     cp->flags &= ~RX_PKTFLAG_CP;
1157                     queue_Prepend(&tmpq, cp);
1158                     cp = call->currentPacket = (struct rx_packet *)0;
1159                 }
1160                 rxi_FreePackets(0, &tmpq);
1161                 return 0;
1162             }
1163             nbytes -= iov[nextio].iov_len;
1164             call->curpos += iov[nextio].iov_len;
1165             call->curlen -= iov[nextio].iov_len;
1166             call->nFree -= iov[nextio].iov_len;
1167             nextio++;
1168             if (call->curlen == 0) {
1169                 if (++call->curvec > cp->niovecs) {
1170                     call->nFree = 0;
1171                 } else {
1172                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1173                     call->curlen = cp->wirevec[call->curvec].iov_len;
1174                 }
1175             }
1176         }
1177     } while (nbytes && nextio < nio);
1178
1179     /* Move the packets from the temporary queue onto the transmit queue.
1180      * We may end up with more than call->twind packets on the queue. */
1181     queue_SpliceAppend(&call->tq, &tmpq);
1182
1183     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1184         rxi_Start(0, call, 0, 0);
1185     }
1186
1187     /* Wait for the length of the transmit queue to fall below call->twind */
1188     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1189         clock_NewTime();
1190         call->startWait = clock_Sec();
1191 #ifdef  RX_ENABLE_LOCKS
1192         CV_WAIT(&call->cv_twind, &call->lock);
1193 #else
1194         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1195         osi_rxSleep(&call->twind);
1196 #endif
1197         call->startWait = 0;
1198     }
1199
1200     if (call->error) {
1201         if (cp) {
1202             cp->flags &= ~RX_PKTFLAG_CP;
1203             rxi_FreePacket(cp);
1204             cp = call->currentPacket = (struct rx_packet *)0;
1205         }
1206         return 0;
1207     }
1208
1209     return requestCount - nbytes;
1210 }
1211
1212 int
1213 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1214 {
1215     int bytes;
1216     SPLVAR;
1217
1218     NETPRI;
1219     MUTEX_ENTER(&call->lock);
1220     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1221     MUTEX_EXIT(&call->lock);
1222     USERPRI;
1223     return bytes;
1224 }
1225
1226 /* Flush any buffered data to the stream, switch to read mode
1227  * (clients) or to EOF mode (servers) */
1228 void
1229 rxi_FlushWrite(register struct rx_call *call)
1230 {
1231     register struct rx_packet *cp = call->currentPacket;
1232
1233     /* Free any packets from the last call to ReadvProc/WritevProc */
1234     if (queue_IsNotEmpty(&call->iovq)) {
1235         rxi_FreePackets(0, &call->iovq);
1236     }
1237
1238     if (call->mode == RX_MODE_SENDING) {
1239
1240         call->mode =
1241             (call->conn->type ==
1242              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1243
1244 #ifdef RX_KERNEL_TRACE
1245         {
1246             int glockOwner = ISAFS_GLOCK();
1247             if (!glockOwner)
1248                 AFS_GLOCK();
1249             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1250                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1251                        call);
1252             if (!glockOwner)
1253                 AFS_GUNLOCK();
1254         }
1255 #endif
1256
1257 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1258         /* Wait until TQ_BUSY is reset before adding any
1259          * packets to the transmit queue
1260          */
1261         while (call->flags & RX_CALL_TQ_BUSY) {
1262             call->flags |= RX_CALL_TQ_WAIT;
1263 #ifdef RX_ENABLE_LOCKS
1264             CV_WAIT(&call->cv_tq, &call->lock);
1265 #else /* RX_ENABLE_LOCKS */
1266             osi_rxSleep(&call->tq);
1267 #endif /* RX_ENABLE_LOCKS */
1268         }
1269 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1270
1271         if (cp) {
1272             /* cp->length is only supposed to be the user's data */
1273             /* cp->length was already set to (then-current) 
1274              * MaxUserDataSize or less. */
1275             cp->flags &= ~RX_PKTFLAG_CP;
1276             cp->length -= call->nFree;
1277             call->currentPacket = (struct rx_packet *)0;
1278             call->nFree = 0;
1279         } else {
1280             cp = rxi_AllocSendPacket(call, 0);
1281             if (!cp) {
1282                 /* Mode can no longer be MODE_SENDING */
1283                 return;
1284             }
1285             cp->length = 0;
1286             cp->niovecs = 2;    /* header + space for rxkad stuff */
1287             call->nFree = 0;
1288         }
1289
1290         /* The 1 specifies that this is the last packet */
1291         hadd32(call->bytesSent, cp->length);
1292         rxi_PrepareSendPacket(call, cp, 1);
1293         cp->flags |= RX_PKTFLAG_TQ;
1294         queue_Append(&call->tq, cp);
1295         if (!
1296             (call->
1297              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1298             rxi_Start(0, call, 0, 0);
1299         }
1300     }
1301 }
1302
1303 /* Flush any buffered data to the stream, switch to read mode
1304  * (clients) or to EOF mode (servers) */
1305 void
1306 rx_FlushWrite(struct rx_call *call)
1307 {
1308     SPLVAR;
1309     NETPRI;
1310     MUTEX_ENTER(&call->lock);
1311     rxi_FlushWrite(call);
1312     MUTEX_EXIT(&call->lock);
1313     USERPRI;
1314 }