876d7be66bc453af7d76d453c6cab786180121f1
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   * 
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17
18 #ifdef KERNEL
19 #ifndef UKERNEL
20 #ifdef RX_KERNEL_TRACE
21 #include "rx_kcommon.h"
22 #endif
23 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
24 #include "afs/sysincludes.h"
25 #else
26 #include "h/types.h"
27 #include "h/time.h"
28 #include "h/stat.h"
29 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
30 #include "h/systm.h"
31 #endif
32 #ifdef  AFS_OSF_ENV
33 #include <net/net_globals.h>
34 #endif /* AFS_OSF_ENV */
35 #ifdef AFS_LINUX20_ENV
36 #include "h/socket.h"
37 #endif
38 #include "netinet/in.h"
39 #if defined(AFS_SGI_ENV)
40 #include "afs/sysincludes.h"
41 #endif
42 #endif
43 #include "afs/afs_args.h"
44 #include "afs/afs_osi.h"
45 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
46 #include "h/systm.h"
47 #endif
48 #else /* !UKERNEL */
49 #include "afs/sysincludes.h"
50 #endif /* !UKERNEL */
51 #ifdef RXDEBUG
52 #undef RXDEBUG                  /* turn off debugging */
53 #endif /* RXDEBUG */
54
55 #include "rx_kmutex.h"
56 #include "rx/rx_kernel.h"
57 #include "rx/rx_clock.h"
58 #include "rx/rx_queue.h"
59 #include "rx/rx.h"
60 #include "rx/rx_globals.h"
61 #include "afs/lock.h"
62 #include "afsint.h"
63 #ifdef  AFS_OSF_ENV
64 #undef kmem_alloc
65 #undef kmem_free
66 #undef mem_alloc
67 #undef mem_free
68 #undef register
69 #endif /* AFS_OSF_ENV */
70 #else /* KERNEL */
71 # include <sys/types.h>
72 #ifdef AFS_NT40_ENV
73 # include <winsock2.h>
74 #else /* !AFS_NT40_ENV */
75 # include <sys/socket.h>
76 # include <sys/file.h>
77 # include <netdb.h>
78 # include <netinet/in.h>
79 # include <sys/stat.h>
80 # include <sys/time.h>
81 #endif /* !AFS_NT40_ENV */
82 #include <string.h>
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86 # include "rx_user.h"
87 # include "rx_clock.h"
88 # include "rx_queue.h"
89 # include "rx.h"
90 # include "rx_globals.h"
91 #endif /* KERNEL */
92
93 #ifdef RX_LOCKS_DB
94 /* rxdb_fileID is used to identify the lock location, along with line#. */
95 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
96 #endif /* RX_LOCKS_DB */
97 /* rxi_ReadProc -- internal version.
98  *
99  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
100  */
101 int
102 rxi_ReadProc(struct rx_call *call, char *buf,
103              int nbytes)
104 {
105     struct rx_packet *cp = call->currentPacket;
106     struct rx_packet *rp;
107     int requestCount;
108     unsigned int t;
109
110 /* XXXX took out clock_NewTime from here.  Was it needed? */
111     requestCount = nbytes;
112
113     /* Free any packets from the last call to ReadvProc/WritevProc */
114     if (queue_IsNotEmpty(&call->iovq)) {
115 #ifdef RXDEBUG_PACKET
116         call->iovqc -=
117 #endif /* RXDEBUG_PACKET */
118             rxi_FreePackets(0, &call->iovq);
119     }
120
121     do {
122         if (call->nLeft == 0) {
123             /* Get next packet */
124             for (;;) {
125                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
126                     if (call->error) {
127                         return 0;
128                     }
129                     if (call->mode == RX_MODE_SENDING) {
130                         rxi_FlushWrite(call);
131                         continue;
132                     }
133                 }
134                 if (queue_IsNotEmpty(&call->rq)) {
135                     /* Check that next packet available is next in sequence */
136                     rp = queue_First(&call->rq, rx_packet);
137                     if (rp->header.seq == call->rnext) {
138                         afs_int32 error;
139                         struct rx_connection *conn = call->conn;
140                         queue_Remove(rp);
141                         rp->flags &= ~RX_PKTFLAG_RQ;
142 #ifdef RXDEBUG_PACKET
143                         call->rqc--;
144 #endif /* RXDEBUG_PACKET */
145
146                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
147                          * work.  It may reduce the length of the packet by up
148                          * to conn->maxTrailerSize, to reflect the length of the
149                          * data + the header. */
150                         if ((error =
151                              RXS_CheckPacket(conn->securityObject, call,
152                                              rp))) {
153                             /* Used to merely shut down the call, but now we 
154                              * shut down the whole connection since this may 
155                              * indicate an attempt to hijack it */
156
157                             MUTEX_EXIT(&call->lock);
158                             rxi_ConnectionError(conn, error);
159                             MUTEX_ENTER(&conn->conn_data_lock);
160                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
161                             MUTEX_EXIT(&conn->conn_data_lock);
162                             rxi_FreePacket(rp);
163                             MUTEX_ENTER(&call->lock);
164
165                             return 0;
166                         }
167                         call->rnext++;
168                         cp = call->currentPacket = rp;
169                         call->currentPacket->flags |= RX_PKTFLAG_CP;
170                         call->curvec = 1;       /* 0th vec is always header */
171                         /* begin at the beginning [ more or less ], continue 
172                          * on until the end, then stop. */
173                         call->curpos =
174                             (char *)cp->wirevec[1].iov_base +
175                             call->conn->securityHeaderSize;
176                         call->curlen =
177                             cp->wirevec[1].iov_len -
178                             call->conn->securityHeaderSize;
179
180                         /* Notice that this code works correctly if the data
181                          * size is 0 (which it may be--no reply arguments from
182                          * server, for example).  This relies heavily on the
183                          * fact that the code below immediately frees the packet
184                          * (no yields, etc.).  If it didn't, this would be a
185                          * problem because a value of zero for call->nLeft
186                          * normally means that there is no read packet */
187                         call->nLeft = cp->length;
188                         hadd32(call->bytesRcvd, cp->length);
189
190                         /* Send a hard ack for every rxi_HardAckRate+1 packets
191                          * consumed. Otherwise schedule an event to send
192                          * the hard ack later on.
193                          */
194                         call->nHardAcks++;
195                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
196                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
197                                 rxevent_Cancel(call->delayedAckEvent, call,
198                                                RX_CALL_REFCOUNT_DELAY);
199                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
200                             } else {
201                                 struct clock when, now;
202                                 clock_GetTime(&now);
203                                 when = now;
204                                 /* Delay to consolidate ack packets */
205                                 clock_Add(&when, &rx_hardAckDelay);
206                                 if (!call->delayedAckEvent
207                                     || clock_Gt(&call->delayedAckEvent->
208                                                 eventTime, &when)) {
209                                     rxevent_Cancel(call->delayedAckEvent,
210                                                    call,
211                                                    RX_CALL_REFCOUNT_DELAY);
212                                     CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
213                                     call->delayedAckEvent =
214                                       rxevent_PostNow(&when, &now,
215                                                      rxi_SendDelayedAck, call,
216                                                      0);
217                                 }
218                             }
219                         }
220                         break;
221                     }
222                 }
223
224                 /* 
225                  * If we reach this point either we have no packets in the
226                  * receive queue or the next packet in the queue is not the
227                  * one we are looking for.  There is nothing else for us to
228                  * do but wait for another packet to arrive.
229                  */
230
231                 /* Are there ever going to be any more packets? */
232                 if (call->flags & RX_CALL_RECEIVE_DONE) {
233                     return requestCount - nbytes;
234                 }
235                 /* Wait for in-sequence packet */
236                 call->flags |= RX_CALL_READER_WAIT;
237                 clock_NewTime();
238                 call->startWait = clock_Sec();
239                 while (call->flags & RX_CALL_READER_WAIT) {
240 #ifdef  RX_ENABLE_LOCKS
241                     CV_WAIT(&call->cv_rq, &call->lock);
242 #else
243                     osi_rxSleep(&call->rq);
244 #endif
245                 }
246                 /* cp is no longer valid since we may have given up the lock */
247                 cp = call->currentPacket;
248
249                 call->startWait = 0;
250 #ifdef RX_ENABLE_LOCKS
251                 if (call->error) {
252                     return 0;
253                 }
254 #endif /* RX_ENABLE_LOCKS */
255             }
256         } else
257             /* assert(cp); */
258             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
259             /* yes, the following block is allowed to be the ELSE clause (or not) */
260             /* It's possible for call->nLeft to be smaller than any particular
261              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
262              * reflects the size of the buffer.  We have to keep track of the
263              * number of bytes read in the length field of the packet struct.  On
264              * the final portion of a received packet, it's almost certain that
265              * call->nLeft will be smaller than the final buffer. */
266             while (nbytes && cp) {
267                 t = MIN((int)call->curlen, nbytes);
268                 t = MIN(t, (int)call->nLeft);
269                 memcpy(buf, call->curpos, t);
270                 buf += t;
271                 nbytes -= t;
272                 call->curpos += t;
273                 call->curlen -= t;
274                 call->nLeft -= t;
275
276                 if (!call->nLeft) {
277                     /* out of packet.  Get another one. */
278                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
279                     rxi_FreePacket(cp);
280                     cp = call->currentPacket = (struct rx_packet *)0;
281                 } else if (!call->curlen) {
282                     /* need to get another struct iov */
283                     if (++call->curvec >= cp->niovecs) {
284                         /* current packet is exhausted, get ready for another */
285                         /* don't worry about curvec and stuff, they get set somewhere else */
286                         call->currentPacket->flags &= ~RX_PKTFLAG_CP;
287                         rxi_FreePacket(cp);
288                         cp = call->currentPacket = (struct rx_packet *)0;
289                         call->nLeft = 0;
290                     } else {
291                         call->curpos =
292                             (char *)cp->wirevec[call->curvec].iov_base;
293                         call->curlen = cp->wirevec[call->curvec].iov_len;
294                     }
295                 }
296             }
297         if (!nbytes) {
298             /* user buffer is full, return */
299             return requestCount;
300         }
301
302     } while (nbytes);
303
304     return requestCount;
305 }
306
307 int
308 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
309 {
310     int bytes;
311     int tcurlen;
312     int tnLeft;
313     char *tcurpos;
314     SPLVAR;
315
316     /* Free any packets from the last call to ReadvProc/WritevProc */
317     NETPRI;
318     MUTEX_ENTER(&call->lock);
319     if (!queue_IsEmpty(&call->iovq)) {
320 #ifdef RXDEBUG_PACKET
321         call->iovqc -=
322 #endif /* RXDEBUG_PACKET */
323             rxi_FreePackets(0, &call->iovq);
324     }
325     MUTEX_EXIT(&call->lock);
326     USERPRI;
327
328     /*
329      * Most common case, all of the data is in the current iovec.
330      * We do not need the lock because this is the only thread that
331      * updates the curlen, curpos, nLeft fields.
332      *
333      * We are relying on nLeft being zero unless the call is in receive mode.
334      */
335     tcurlen = call->curlen;
336     tnLeft = call->nLeft;
337     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
338         tcurpos = call->curpos;
339         memcpy(buf, tcurpos, nbytes);
340         call->curpos = tcurpos + nbytes;
341         call->curlen = tcurlen - nbytes;
342         call->nLeft = tnLeft - nbytes;
343
344         if (!call->nLeft && call->currentPacket != NULL) {
345             /* out of packet.  Get another one. */
346             NETPRI;
347             MUTEX_ENTER(&call->lock);
348             rxi_FreePacket(call->currentPacket);
349             call->currentPacket = (struct rx_packet *)0;
350             MUTEX_EXIT(&call->lock);
351             USERPRI;
352         }
353         return nbytes;
354     }
355
356     NETPRI;
357     MUTEX_ENTER(&call->lock);
358     bytes = rxi_ReadProc(call, buf, nbytes);
359     MUTEX_EXIT(&call->lock);
360     USERPRI;
361     return bytes;
362 }
363
364 /* Optimization for unmarshalling 32 bit integers */
365 int
366 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
367 {
368     int bytes;
369     int tcurlen;
370     int tnLeft;
371     char *tcurpos;
372     SPLVAR;
373
374     /* Free any packets from the last call to ReadvProc/WritevProc */
375     NETPRI;
376     MUTEX_ENTER(&call->lock);
377     if (!queue_IsEmpty(&call->iovq)) {
378 #ifdef RXDEBUG_PACKET
379         call->iovqc -=
380 #endif /* RXDEBUG_PACKET */
381             rxi_FreePackets(0, &call->iovq);
382     }
383     MUTEX_EXIT(&call->lock);
384     USERPRI;
385
386     /*
387      * Most common case, all of the data is in the current iovec.
388      * We do not need the lock because this is the only thread that
389      * updates the curlen, curpos, nLeft fields.
390      *
391      * We are relying on nLeft being zero unless the call is in receive mode.
392      */
393     tcurlen = call->curlen;
394     tnLeft = call->nLeft;
395     if (!call->error && tcurlen >= sizeof(afs_int32)
396         && tnLeft >= sizeof(afs_int32)) {
397         tcurpos = call->curpos;
398         memcpy((char *)value, tcurpos, sizeof(afs_int32));
399         call->curpos = tcurpos + sizeof(afs_int32);
400         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
401         call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
402         if (!call->nLeft && call->currentPacket != NULL) {
403             /* out of packet.  Get another one. */
404             NETPRI;
405             MUTEX_ENTER(&call->lock);
406             rxi_FreePacket(call->currentPacket);
407             call->currentPacket = (struct rx_packet *)0;
408             MUTEX_EXIT(&call->lock);
409             USERPRI;
410         }
411         return sizeof(afs_int32);
412     }
413
414     NETPRI;
415     MUTEX_ENTER(&call->lock);
416     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
417     MUTEX_EXIT(&call->lock);
418     USERPRI;
419     return bytes;
420 }
421
422 /* rxi_FillReadVec
423  *
424  * Uses packets in the receive queue to fill in as much of the
425  * current iovec as possible. Does not block if it runs out
426  * of packets to complete the iovec. Return true if an ack packet
427  * was sent, otherwise return false */
428 int
429 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
430 {
431     int didConsume = 0;
432     int didHardAck = 0;
433     unsigned int t;
434     struct rx_packet *rp;
435     struct rx_packet *curp;
436     struct iovec *call_iov;
437     struct iovec *cur_iov = NULL;
438
439     curp = call->currentPacket;
440     if (curp) {
441         cur_iov = &curp->wirevec[call->curvec];
442     }
443     call_iov = &call->iov[call->iovNext];
444
445     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
446         if (call->nLeft == 0) {
447             /* Get next packet */
448             if (queue_IsNotEmpty(&call->rq)) {
449                 /* Check that next packet available is next in sequence */
450                 rp = queue_First(&call->rq, rx_packet);
451                 if (rp->header.seq == call->rnext) {
452                     afs_int32 error;
453                     struct rx_connection *conn = call->conn;
454                     queue_Remove(rp);
455                     rp->flags &= ~RX_PKTFLAG_RQ;
456 #ifdef RXDEBUG_PACKET
457                     call->rqc--;
458 #endif /* RXDEBUG_PACKET */
459
460                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
461                      * work.  It may reduce the length of the packet by up
462                      * to conn->maxTrailerSize, to reflect the length of the
463                      * data + the header. */
464                     if ((error =
465                          RXS_CheckPacket(conn->securityObject, call, rp))) {
466                         /* Used to merely shut down the call, but now we 
467                          * shut down the whole connection since this may 
468                          * indicate an attempt to hijack it */
469
470                         MUTEX_EXIT(&call->lock);
471                         rxi_ConnectionError(conn, error);
472                         MUTEX_ENTER(&conn->conn_data_lock);
473                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
474                         MUTEX_EXIT(&conn->conn_data_lock);
475                         rxi_FreePacket(rp);
476                         MUTEX_ENTER(&call->lock);
477
478                         return 1;
479                     }
480                     call->rnext++;
481                     curp = call->currentPacket = rp;
482                     call->currentPacket->flags |= RX_PKTFLAG_CP;
483                     call->curvec = 1;   /* 0th vec is always header */
484                     cur_iov = &curp->wirevec[1];
485                     /* begin at the beginning [ more or less ], continue 
486                      * on until the end, then stop. */
487                     call->curpos =
488                         (char *)curp->wirevec[1].iov_base +
489                         call->conn->securityHeaderSize;
490                     call->curlen =
491                         curp->wirevec[1].iov_len -
492                         call->conn->securityHeaderSize;
493
494                     /* Notice that this code works correctly if the data
495                      * size is 0 (which it may be--no reply arguments from
496                      * server, for example).  This relies heavily on the
497                      * fact that the code below immediately frees the packet
498                      * (no yields, etc.).  If it didn't, this would be a
499                      * problem because a value of zero for call->nLeft
500                      * normally means that there is no read packet */
501                     call->nLeft = curp->length;
502                     hadd32(call->bytesRcvd, curp->length);
503
504                     /* Send a hard ack for every rxi_HardAckRate+1 packets
505                      * consumed. Otherwise schedule an event to send
506                      * the hard ack later on.
507                      */
508                     call->nHardAcks++;
509                     didConsume = 1;
510                     continue;
511                 }
512             }
513             break;
514         }
515
516         /* It's possible for call->nLeft to be smaller than any particular
517          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
518          * reflects the size of the buffer.  We have to keep track of the
519          * number of bytes read in the length field of the packet struct.  On
520          * the final portion of a received packet, it's almost certain that
521          * call->nLeft will be smaller than the final buffer. */
522         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
523
524             t = MIN((int)call->curlen, call->iovNBytes);
525             t = MIN(t, (int)call->nLeft);
526             call_iov->iov_base = call->curpos;
527             call_iov->iov_len = t;
528             call_iov++;
529             call->iovNext++;
530             call->iovNBytes -= t;
531             call->curpos += t;
532             call->curlen -= t;
533             call->nLeft -= t;
534
535             if (!call->nLeft) {
536                 /* out of packet.  Get another one. */
537                 curp->flags &= ~RX_PKTFLAG_CP;
538                 curp->flags |= RX_PKTFLAG_IOVQ;
539                 queue_Append(&call->iovq, curp);
540 #ifdef RXDEBUG_PACKET
541                 call->iovqc++;
542 #endif /* RXDEBUG_PACKET */
543                 curp = call->currentPacket = (struct rx_packet *)0;
544             } else if (!call->curlen) {
545                 /* need to get another struct iov */
546                 if (++call->curvec >= curp->niovecs) {
547                     /* current packet is exhausted, get ready for another */
548                     /* don't worry about curvec and stuff, they get set somewhere else */
549                     curp->flags &= ~RX_PKTFLAG_CP;
550                     curp->flags |= RX_PKTFLAG_IOVQ;
551                     queue_Append(&call->iovq, curp);
552 #ifdef RXDEBUG_PACKET
553                     call->iovqc++;
554 #endif /* RXDEBUG_PACKET */
555                     curp = call->currentPacket = (struct rx_packet *)0;
556                     call->nLeft = 0;
557                 } else {
558                     cur_iov++;
559                     call->curpos = (char *)cur_iov->iov_base;
560                     call->curlen = cur_iov->iov_len;
561                 }
562             }
563         }
564     }
565
566     /* If we consumed any packets then check whether we need to
567      * send a hard ack. */
568     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
569         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
570             rxevent_Cancel(call->delayedAckEvent, call,
571                            RX_CALL_REFCOUNT_DELAY);
572             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
573             didHardAck = 1;
574         } else {
575             struct clock when, now;
576             clock_GetTime(&now);
577             when = now;
578             /* Delay to consolidate ack packets */
579             clock_Add(&when, &rx_hardAckDelay);
580             if (!call->delayedAckEvent
581                 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
582                 rxevent_Cancel(call->delayedAckEvent, call,
583                                RX_CALL_REFCOUNT_DELAY);
584                 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
585                 call->delayedAckEvent =
586                     rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
587             }
588         }
589     }
590     return didHardAck;
591 }
592
593
594 /* rxi_ReadvProc -- internal version.
595  *
596  * Fills in an iovec with pointers to the packet buffers. All packets
597  * except the last packet (new current packet) are moved to the iovq
598  * while the application is processing the data.
599  *
600  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
601  */
602 int
603 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
604               int nbytes)
605 {
606     int requestCount;
607     int nextio;
608
609     requestCount = nbytes;
610     nextio = 0;
611
612     /* Free any packets from the last call to ReadvProc/WritevProc */
613     if (queue_IsNotEmpty(&call->iovq)) {
614 #ifdef RXDEBUG_PACKET
615         call->iovqc -=
616 #endif /* RXDEBUG_PACKET */
617             rxi_FreePackets(0, &call->iovq);
618     }
619
620     if (call->mode == RX_MODE_SENDING) {
621         rxi_FlushWrite(call);
622     }
623
624     if (call->error) {
625         return 0;
626     }
627
628     /* Get whatever data is currently available in the receive queue.
629      * If rxi_FillReadVec sends an ack packet then it is possible
630      * that we will receive more data while we drop the call lock
631      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
632      * here to avoid a race with the receive thread if we send
633      * hard acks in rxi_FillReadVec. */
634     call->flags |= RX_CALL_IOVEC_WAIT;
635     call->iovNBytes = nbytes;
636     call->iovMax = maxio;
637     call->iovNext = 0;
638     call->iov = iov;
639     rxi_FillReadVec(call, 0);
640
641     /* if we need more data then sleep until the receive thread has
642      * filled in the rest. */
643     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
644         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
645         call->flags |= RX_CALL_READER_WAIT;
646         clock_NewTime();
647         call->startWait = clock_Sec();
648         while (call->flags & RX_CALL_READER_WAIT) {
649 #ifdef  RX_ENABLE_LOCKS
650             CV_WAIT(&call->cv_rq, &call->lock);
651 #else
652             osi_rxSleep(&call->rq);
653 #endif
654         }
655         call->startWait = 0;
656     }
657     call->flags &= ~RX_CALL_IOVEC_WAIT;
658 #ifdef RX_ENABLE_LOCKS
659     if (call->error) {
660         return 0;
661     }
662 #endif /* RX_ENABLE_LOCKS */
663
664     call->iov = NULL;
665     *nio = call->iovNext;
666     return nbytes - call->iovNBytes;
667 }
668
669 int
670 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
671              int nbytes)
672 {
673     int bytes;
674     SPLVAR;
675
676     NETPRI;
677     MUTEX_ENTER(&call->lock);
678     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
679     MUTEX_EXIT(&call->lock);
680     USERPRI;
681     return bytes;
682 }
683
684 /* rxi_WriteProc -- internal version.
685  *
686  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
687
688 int
689 rxi_WriteProc(struct rx_call *call, char *buf,
690               int nbytes)
691 {
692     struct rx_connection *conn = call->conn;
693     struct rx_packet *cp = call->currentPacket;
694     unsigned int t;
695     int requestCount = nbytes;
696
697     /* Free any packets from the last call to ReadvProc/WritevProc */
698     if (queue_IsNotEmpty(&call->iovq)) {
699 #ifdef RXDEBUG_PACKET
700         call->iovqc -=
701 #endif /* RXDEBUG_PACKET */
702             rxi_FreePackets(0, &call->iovq);
703     }
704
705     if (call->mode != RX_MODE_SENDING) {
706         if ((conn->type == RX_SERVER_CONNECTION)
707             && (call->mode == RX_MODE_RECEIVING)) {
708             call->mode = RX_MODE_SENDING;
709             if (cp) {
710                 cp->flags &= ~RX_PKTFLAG_CP;
711                 rxi_FreePacket(cp);
712                 cp = call->currentPacket = (struct rx_packet *)0;
713                 call->nLeft = 0;
714                 call->nFree = 0;
715             }
716         } else {
717             return 0;
718         }
719     }
720
721     /* Loop condition is checked at end, so that a write of 0 bytes
722      * will force a packet to be created--specially for the case where
723      * there are 0 bytes on the stream, but we must send a packet
724      * anyway. */
725     do {
726         if (call->nFree == 0) {
727             if (!call->error && cp) {
728                 /* Clear the current packet now so that if
729                  * we are forced to wait and drop the lock 
730                  * the packet we are planning on using 
731                  * cannot be freed.
732                  */
733                 cp->flags &= ~RX_PKTFLAG_CP;
734                 call->currentPacket = (struct rx_packet *)0;
735 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
736                 /* Wait until TQ_BUSY is reset before adding any
737                  * packets to the transmit queue
738                  */
739                 while (call->flags & RX_CALL_TQ_BUSY) {
740                     call->flags |= RX_CALL_TQ_WAIT;
741 #ifdef RX_ENABLE_LOCKS
742                     CV_WAIT(&call->cv_tq, &call->lock);
743 #else /* RX_ENABLE_LOCKS */
744                     osi_rxSleep(&call->tq);
745 #endif /* RX_ENABLE_LOCKS */
746                 }
747 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
748                 clock_NewTime();        /* Bogus:  need new time package */
749                 /* The 0, below, specifies that it is not the last packet: 
750                  * there will be others. PrepareSendPacket may
751                  * alter the packet length by up to
752                  * conn->securityMaxTrailerSize */
753                 hadd32(call->bytesSent, cp->length);
754                 rxi_PrepareSendPacket(call, cp, 0);
755                 cp->flags |= RX_PKTFLAG_TQ;
756                 queue_Append(&call->tq, cp);
757 #ifdef RXDEBUG_PACKET
758                 call->tqc++;
759 #endif /* RXDEBUG_PACKET */
760                 cp = (struct rx_packet *)0;
761                 if (!
762                     (call->
763                      flags & (RX_CALL_FAST_RECOVER |
764                               RX_CALL_FAST_RECOVER_WAIT))) {
765                     rxi_Start(0, call, 0, 0);
766                 }
767             } else if (cp) {
768                 cp->flags &= ~RX_PKTFLAG_CP;
769                 rxi_FreePacket(cp);
770                 cp = call->currentPacket = (struct rx_packet *)0;
771             }
772             /* Wait for transmit window to open up */
773             while (!call->error
774                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
775                 clock_NewTime();
776                 call->startWait = clock_Sec();
777
778 #ifdef  RX_ENABLE_LOCKS
779                 CV_WAIT(&call->cv_twind, &call->lock);
780 #else
781                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
782                 osi_rxSleep(&call->twind);
783 #endif
784
785                 call->startWait = 0;
786 #ifdef RX_ENABLE_LOCKS
787                 if (call->error) {
788                     return 0;
789                 }
790 #endif /* RX_ENABLE_LOCKS */
791             }
792             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
793                 cp->flags |= RX_PKTFLAG_CP;
794                 call->currentPacket = cp;
795                 call->nFree = cp->length;
796                 call->curvec = 1;       /* 0th vec is always header */
797                 /* begin at the beginning [ more or less ], continue 
798                  * on until the end, then stop. */
799                 call->curpos =
800                     (char *)cp->wirevec[1].iov_base +
801                     call->conn->securityHeaderSize;
802                 call->curlen =
803                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
804             }
805             if (call->error) {
806                 if (cp) {
807                     cp->flags &= ~RX_PKTFLAG_CP;
808                     rxi_FreePacket(cp);
809                     call->currentPacket = NULL;
810                 }
811                 return 0;
812             }
813         }
814
815         if (cp && (int)call->nFree < nbytes) {
816             /* Try to extend the current buffer */
817             int len, mud;
818             len = cp->length;
819             mud = rx_MaxUserDataSize(call);
820             if (mud > len) {
821                 int want;
822                 want = MIN(nbytes - (int)call->nFree, mud - len);
823                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
824                 if (cp->length > (unsigned)mud)
825                     cp->length = mud;
826                 call->nFree += (cp->length - len);
827             }
828         }
829
830         /* If the remaining bytes fit in the buffer, then store them
831          * and return.  Don't ship a buffer that's full immediately to
832          * the peer--we don't know if it's the last buffer yet */
833
834         if (!cp) {
835             call->nFree = 0;
836         }
837
838         while (nbytes && call->nFree) {
839
840             t = MIN((int)call->curlen, nbytes);
841             t = MIN((int)call->nFree, t);
842             memcpy(call->curpos, buf, t);
843             buf += t;
844             nbytes -= t;
845             call->curpos += t;
846             call->curlen -= (u_short)t;
847             call->nFree -= (u_short)t;
848
849             if (!call->curlen) {
850                 /* need to get another struct iov */
851                 if (++call->curvec >= cp->niovecs) {
852                     /* current packet is full, extend or send it */
853                     call->nFree = 0;
854                 } else {
855                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
856                     call->curlen = cp->wirevec[call->curvec].iov_len;
857                 }
858             }
859         }                       /* while bytes to send and room to send them */
860
861         /* might be out of space now */
862         if (!nbytes) {
863             return requestCount;
864         } else;                 /* more data to send, so get another packet and keep going */
865     } while (nbytes);
866
867     return requestCount - nbytes;
868 }
869
870 int
871 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
872 {
873     int bytes;
874     int tcurlen;
875     int tnFree;
876     char *tcurpos;
877     SPLVAR;
878
879     /* Free any packets from the last call to ReadvProc/WritevProc */
880     NETPRI;
881     MUTEX_ENTER(&call->lock);
882     if (queue_IsNotEmpty(&call->iovq)) {
883 #ifdef RXDEBUG_PACKET
884         call->iovqc -=
885 #endif /* RXDEBUG_PACKET */
886             rxi_FreePackets(0, &call->iovq);
887     }
888     MUTEX_EXIT(&call->lock);
889     USERPRI;
890
891     /*
892      * Most common case: all of the data fits in the current iovec.
893      * We do not need the lock because this is the only thread that
894      * updates the curlen, curpos, nFree fields.
895      *
896      * We are relying on nFree being zero unless the call is in send mode.
897      */
898     tcurlen = (int)call->curlen;
899     tnFree = (int)call->nFree;
900     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
901         tcurpos = call->curpos;
902         memcpy(tcurpos, buf, nbytes);
903         call->curpos = tcurpos + nbytes;
904         call->curlen = (u_short)(tcurlen - nbytes);
905         call->nFree = (u_short)(tnFree - nbytes);
906         return nbytes;
907     }
908
909     NETPRI;
910     MUTEX_ENTER(&call->lock);
911     bytes = rxi_WriteProc(call, buf, nbytes);
912     MUTEX_EXIT(&call->lock);
913     USERPRI;
914     return bytes;
915 }
916
917 /* Optimization for marshalling 32 bit arguments */
918 int
919 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
920 {
921     int bytes;
922     int tcurlen;
923     int tnFree;
924     char *tcurpos;
925     SPLVAR;
926
927     /* Free any packets from the last call to ReadvProc/WritevProc */
928     NETPRI;
929     MUTEX_ENTER(&call->lock);
930     if (queue_IsNotEmpty(&call->iovq)) {
931 #ifdef RXDEBUG_PACKET
932         call->iovqc -=
933 #endif /* RXDEBUG_PACKET */
934             rxi_FreePackets(0, &call->iovq);
935     }
936     MUTEX_EXIT(&call->lock);
937     USERPRI;
938
939     /*
940      * Most common case: all of the data fits in the current iovec.
941      * We do not need the lock because this is the only thread that
942      * updates the curlen, curpos, nFree fields.
943      *
944      * We are relying on nFree being zero unless the call is in send mode.
945      */
946     tcurlen = call->curlen;
947     tnFree = call->nFree;
948     if (!call->error && tcurlen >= sizeof(afs_int32)
949         && tnFree >= sizeof(afs_int32)) {
950         tcurpos = call->curpos;
951         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
952             *((afs_int32 *) (tcurpos)) = *value;
953         } else {
954             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
955         }
956         call->curpos = tcurpos + sizeof(afs_int32);
957         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
958         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
959         return sizeof(afs_int32);
960     }
961
962     NETPRI;
963     MUTEX_ENTER(&call->lock);
964     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
965     MUTEX_EXIT(&call->lock);
966     USERPRI;
967     return bytes;
968 }
969
970 /* rxi_WritevAlloc -- internal version.
971  *
972  * Fill in an iovec to point to data in packet buffers. The application
973  * calls rxi_WritevProc when the buffers are full.
974  *
975  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
976
977 int
978 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
979                 int nbytes)
980 {
981     struct rx_connection *conn = call->conn;
982     struct rx_packet *cp = call->currentPacket;
983     int requestCount;
984     int nextio;
985     /* Temporary values, real work is done in rxi_WritevProc */
986     int tnFree;
987     unsigned int tcurvec;
988     char *tcurpos;
989     int tcurlen;
990
991     requestCount = nbytes;
992     nextio = 0;
993
994     /* Free any packets from the last call to ReadvProc/WritevProc */
995     if (queue_IsNotEmpty(&call->iovq)) {
996 #ifdef RXDEBUG_PACKET
997         call->iovqc -=
998 #endif /* RXDEBUG_PACKET */
999             rxi_FreePackets(0, &call->iovq);
1000     }
1001
1002     if (call->mode != RX_MODE_SENDING) {
1003         if ((conn->type == RX_SERVER_CONNECTION)
1004             && (call->mode == RX_MODE_RECEIVING)) {
1005             call->mode = RX_MODE_SENDING;
1006             if (cp) {
1007                 cp->flags &= ~RX_PKTFLAG_CP;
1008                 rxi_FreePacket(cp);
1009                 cp = call->currentPacket = (struct rx_packet *)0;
1010                 call->nLeft = 0;
1011                 call->nFree = 0;
1012             }
1013         } else {
1014             return 0;
1015         }
1016     }
1017
1018     /* Set up the iovec to point to data in packet buffers. */
1019     tnFree = call->nFree;
1020     tcurvec = call->curvec;
1021     tcurpos = call->curpos;
1022     tcurlen = call->curlen;
1023     do {
1024         int t;
1025
1026         if (tnFree == 0) {
1027             /* current packet is full, allocate a new one */
1028             cp = rxi_AllocSendPacket(call, nbytes);
1029             if (cp == NULL) {
1030                 /* out of space, return what we have */
1031                 *nio = nextio;
1032                 return requestCount - nbytes;
1033             }
1034             cp->flags |= RX_PKTFLAG_IOVQ;
1035             queue_Append(&call->iovq, cp);
1036 #ifdef RXDEBUG_PACKET
1037             call->iovqc++;
1038 #endif /* RXDEBUG_PACKET */
1039             tnFree = cp->length;
1040             tcurvec = 1;
1041             tcurpos =
1042                 (char *)cp->wirevec[1].iov_base +
1043                 call->conn->securityHeaderSize;
1044             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1045         }
1046
1047         if (tnFree < nbytes) {
1048             /* try to extend the current packet */
1049             int len, mud;
1050             len = cp->length;
1051             mud = rx_MaxUserDataSize(call);
1052             if (mud > len) {
1053                 int want;
1054                 want = MIN(nbytes - tnFree, mud - len);
1055                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1056                 if (cp->length > (unsigned)mud)
1057                     cp->length = mud;
1058                 tnFree += (cp->length - len);
1059                 if (cp == call->currentPacket) {
1060                     call->nFree += (cp->length - len);
1061                 }
1062             }
1063         }
1064
1065         /* fill in the next entry in the iovec */
1066         t = MIN(tcurlen, nbytes);
1067         t = MIN(tnFree, t);
1068         iov[nextio].iov_base = tcurpos;
1069         iov[nextio].iov_len = t;
1070         nbytes -= t;
1071         tcurpos += t;
1072         tcurlen -= t;
1073         tnFree -= t;
1074         nextio++;
1075
1076         if (!tcurlen) {
1077             /* need to get another struct iov */
1078             if (++tcurvec >= cp->niovecs) {
1079                 /* current packet is full, extend it or move on to next packet */
1080                 tnFree = 0;
1081             } else {
1082                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1083                 tcurlen = cp->wirevec[tcurvec].iov_len;
1084             }
1085         }
1086     } while (nbytes && nextio < maxio);
1087     *nio = nextio;
1088     return requestCount - nbytes;
1089 }
1090
1091 int
1092 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1093                int nbytes)
1094 {
1095     int bytes;
1096     SPLVAR;
1097
1098     NETPRI;
1099     MUTEX_ENTER(&call->lock);
1100     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1101     MUTEX_EXIT(&call->lock);
1102     USERPRI;
1103     return bytes;
1104 }
1105
1106 /* rxi_WritevProc -- internal version.
1107  *
1108  * Send buffers allocated in rxi_WritevAlloc.
1109  *
1110  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1111
1112 int
1113 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1114 {
1115     struct rx_packet *cp = NULL;
1116     struct rx_packet *p, *np;
1117     int nextio;
1118     int requestCount;
1119     struct rx_queue tmpq;
1120 #ifdef RXDEBUG_PACKET
1121     u_short tmpqc;
1122 #endif
1123
1124     requestCount = nbytes;
1125     nextio = 0;
1126
1127     if (call->mode != RX_MODE_SENDING) {
1128         call->error = RX_PROTOCOL_ERROR;
1129     }
1130 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1131     /* Wait until TQ_BUSY is reset before trying to move any
1132      * packets to the transmit queue.  */
1133     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1134         call->flags |= RX_CALL_TQ_WAIT;
1135 #ifdef RX_ENABLE_LOCKS
1136         CV_WAIT(&call->cv_tq, &call->lock);
1137 #else /* RX_ENABLE_LOCKS */
1138         osi_rxSleep(&call->tq);
1139 #endif /* RX_ENABLE_LOCKS */
1140     }
1141 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1142     /* cp is no longer valid since we may have given up the lock */
1143     cp = call->currentPacket;
1144
1145     if (call->error) {
1146         if (cp) {
1147             cp->flags &= ~RX_PKTFLAG_CP;
1148             cp->flags |= RX_PKTFLAG_IOVQ;
1149             queue_Prepend(&call->iovq, cp);
1150 #ifdef RXDEBUG_PACKET
1151             call->iovqc++;
1152 #endif /* RXDEBUG_PACKET */
1153             cp = call->currentPacket = (struct rx_packet *)0;
1154         }
1155 #ifdef RXDEBUG_PACKET
1156         call->iovqc -=
1157 #endif /* RXDEBUG_PACKET */
1158             rxi_FreePackets(0, &call->iovq);
1159         return 0;
1160     }
1161
1162     /* Loop through the I/O vector adjusting packet pointers.
1163      * Place full packets back onto the iovq once they are ready
1164      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1165      * the iovec. We put the loop condition at the end to ensure that
1166      * a zero length write will push a short packet. */
1167     nextio = 0;
1168     queue_Init(&tmpq);
1169 #ifdef RXDEBUG_PACKET
1170     tmpqc = 0;
1171 #endif /* RXDEBUG_PACKET */
1172     do {
1173         if (call->nFree == 0 && cp) {
1174             clock_NewTime();    /* Bogus:  need new time package */
1175             /* The 0, below, specifies that it is not the last packet: 
1176              * there will be others. PrepareSendPacket may
1177              * alter the packet length by up to
1178              * conn->securityMaxTrailerSize */
1179             hadd32(call->bytesSent, cp->length);
1180             rxi_PrepareSendPacket(call, cp, 0);
1181             queue_Append(&tmpq, cp);
1182 #ifdef RXDEBUG_PACKET
1183             tmpqc++;
1184 #endif /* RXDEBUG_PACKET */
1185             cp = call->currentPacket = (struct rx_packet *)0;
1186
1187             /* The head of the iovq is now the current packet */
1188             if (nbytes) {
1189                 if (queue_IsEmpty(&call->iovq)) {
1190                     call->error = RX_PROTOCOL_ERROR;
1191 #ifdef RXDEBUG_PACKET
1192                     tmpqc -=
1193 #endif /* RXDEBUG_PACKET */
1194                         rxi_FreePackets(0, &tmpq);
1195                     return 0;
1196                 }
1197                 cp = queue_First(&call->iovq, rx_packet);
1198                 queue_Remove(cp);
1199                 cp->flags &= ~RX_PKTFLAG_IOVQ;
1200 #ifdef RXDEBUG_PACKET
1201                 call->iovqc--;
1202 #endif /* RXDEBUG_PACKET */
1203                 cp->flags |= RX_PKTFLAG_CP;
1204                 call->currentPacket = cp;
1205                 call->nFree = cp->length;
1206                 call->curvec = 1;
1207                 call->curpos =
1208                     (char *)cp->wirevec[1].iov_base +
1209                     call->conn->securityHeaderSize;
1210                 call->curlen =
1211                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1212             }
1213         }
1214
1215         if (nbytes) {
1216             /* The next iovec should point to the current position */
1217             if (iov[nextio].iov_base != call->curpos
1218                 || iov[nextio].iov_len > (int)call->curlen) {
1219                 call->error = RX_PROTOCOL_ERROR;
1220                 if (cp) {
1221                     cp->flags &= ~RX_PKTFLAG_CP;
1222                     queue_Prepend(&tmpq, cp);
1223 #ifdef RXDEBUG_PACKET
1224                     tmpqc++;
1225 #endif /* RXDEBUG_PACKET */
1226                     cp = call->currentPacket = (struct rx_packet *)0;
1227                 }
1228 #ifdef RXDEBUG_PACKET
1229                 tmpqc -=
1230 #endif /* RXDEBUG_PACKET */
1231                     rxi_FreePackets(0, &tmpq);
1232                 return 0;
1233             }
1234             nbytes -= iov[nextio].iov_len;
1235             call->curpos += iov[nextio].iov_len;
1236             call->curlen -= iov[nextio].iov_len;
1237             call->nFree -= iov[nextio].iov_len;
1238             nextio++;
1239             if (call->curlen == 0) {
1240                 if (++call->curvec > cp->niovecs) {
1241                     call->nFree = 0;
1242                 } else {
1243                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1244                     call->curlen = cp->wirevec[call->curvec].iov_len;
1245                 }
1246             }
1247         }
1248     } while (nbytes && nextio < nio);
1249
1250     /* Move the packets from the temporary queue onto the transmit queue.
1251      * We may end up with more than call->twind packets on the queue. */
1252     
1253     for (queue_Scan(&tmpq, p, np, rx_packet))
1254     {
1255         p->flags |= RX_PKTFLAG_TQ;
1256     }
1257     queue_SpliceAppend(&call->tq, &tmpq);
1258
1259     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1260         rxi_Start(0, call, 0, 0);
1261     }
1262
1263     /* Wait for the length of the transmit queue to fall below call->twind */
1264     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1265         clock_NewTime();
1266         call->startWait = clock_Sec();
1267 #ifdef  RX_ENABLE_LOCKS
1268         CV_WAIT(&call->cv_twind, &call->lock);
1269 #else
1270         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1271         osi_rxSleep(&call->twind);
1272 #endif
1273         call->startWait = 0;
1274     }
1275     /* cp is no longer valid since we may have given up the lock */
1276     cp = call->currentPacket;
1277
1278     if (call->error) {
1279         if (cp) {
1280             cp->flags &= ~RX_PKTFLAG_CP;
1281             rxi_FreePacket(cp);
1282             cp = call->currentPacket = (struct rx_packet *)0;
1283         }
1284         return 0;
1285     }
1286
1287     return requestCount - nbytes;
1288 }
1289
1290 int
1291 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1292 {
1293     int bytes;
1294     SPLVAR;
1295
1296     NETPRI;
1297     MUTEX_ENTER(&call->lock);
1298     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1299     MUTEX_EXIT(&call->lock);
1300     USERPRI;
1301     return bytes;
1302 }
1303
1304 /* Flush any buffered data to the stream, switch to read mode
1305  * (clients) or to EOF mode (servers) */
1306 void
1307 rxi_FlushWrite(struct rx_call *call)
1308 {
1309     struct rx_packet *cp = NULL;
1310
1311     /* Free any packets from the last call to ReadvProc/WritevProc */
1312     if (queue_IsNotEmpty(&call->iovq)) {
1313 #ifdef RXDEBUG_PACKET
1314         call->iovqc -=
1315 #endif /* RXDEBUG_PACKET */
1316             rxi_FreePackets(0, &call->iovq);
1317     }
1318
1319     if (call->mode == RX_MODE_SENDING) {
1320
1321         call->mode =
1322             (call->conn->type ==
1323              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1324
1325 #ifdef RX_KERNEL_TRACE
1326         {
1327             int glockOwner = ISAFS_GLOCK();
1328             if (!glockOwner)
1329                 AFS_GLOCK();
1330             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1331                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1332                        call);
1333             if (!glockOwner)
1334                 AFS_GUNLOCK();
1335         }
1336 #endif
1337
1338 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1339         /* Wait until TQ_BUSY is reset before adding any
1340          * packets to the transmit queue
1341          */
1342         while (call->flags & RX_CALL_TQ_BUSY) {
1343             call->flags |= RX_CALL_TQ_WAIT;
1344 #ifdef RX_ENABLE_LOCKS
1345             CV_WAIT(&call->cv_tq, &call->lock);
1346 #else /* RX_ENABLE_LOCKS */
1347             osi_rxSleep(&call->tq);
1348 #endif /* RX_ENABLE_LOCKS */
1349         }
1350 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1351
1352         /* cp is no longer valid since we may have given up the lock */
1353         cp = call->currentPacket;
1354
1355         if (cp) {
1356             /* cp->length is only supposed to be the user's data */
1357             /* cp->length was already set to (then-current) 
1358              * MaxUserDataSize or less. */
1359             cp->flags &= ~RX_PKTFLAG_CP;
1360             cp->length -= call->nFree;
1361             call->currentPacket = (struct rx_packet *)0;
1362             call->nFree = 0;
1363         } else {
1364             cp = rxi_AllocSendPacket(call, 0);
1365             if (!cp) {
1366                 /* Mode can no longer be MODE_SENDING */
1367                 return;
1368             }
1369             cp->length = 0;
1370             cp->niovecs = 2;    /* header + space for rxkad stuff */
1371             call->nFree = 0;
1372         }
1373
1374         /* The 1 specifies that this is the last packet */
1375         hadd32(call->bytesSent, cp->length);
1376         rxi_PrepareSendPacket(call, cp, 1);
1377         cp->flags |= RX_PKTFLAG_TQ;
1378         queue_Append(&call->tq, cp);
1379 #ifdef RXDEBUG_PACKET
1380         call->tqc++;
1381 #endif /* RXDEBUG_PACKET */
1382         if (!
1383             (call->
1384              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1385             rxi_Start(0, call, 0, 0);
1386         }
1387     }
1388 }
1389
1390 /* Flush any buffered data to the stream, switch to read mode
1391  * (clients) or to EOF mode (servers) */
1392 void
1393 rx_FlushWrite(struct rx_call *call)
1394 {
1395     SPLVAR;
1396     NETPRI;
1397     MUTEX_ENTER(&call->lock);
1398     rxi_FlushWrite(call);
1399     MUTEX_EXIT(&call->lock);
1400     USERPRI;
1401 }