unroll-experimental-rx-20090603
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   * 
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #ifdef KERNEL
21 #ifndef UKERNEL
22 #ifdef RX_KERNEL_TRACE
23 #include "rx_kcommon.h"
24 #endif
25 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
26 #include "afs/sysincludes.h"
27 #else
28 #include "h/types.h"
29 #include "h/time.h"
30 #include "h/stat.h"
31 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
32 #include "h/systm.h"
33 #endif
34 #ifdef  AFS_OSF_ENV
35 #include <net/net_globals.h>
36 #endif /* AFS_OSF_ENV */
37 #ifdef AFS_LINUX20_ENV
38 #include "h/socket.h"
39 #endif
40 #include "netinet/in.h"
41 #if defined(AFS_SGI_ENV)
42 #include "afs/sysincludes.h"
43 #endif
44 #endif
45 #include "afs/afs_args.h"
46 #include "afs/afs_osi.h"
47 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
48 #include "h/systm.h"
49 #endif
50 #else /* !UKERNEL */
51 #include "afs/sysincludes.h"
52 #endif /* !UKERNEL */
53 #ifdef RXDEBUG
54 #undef RXDEBUG                  /* turn off debugging */
55 #endif /* RXDEBUG */
56
57 #include "rx_kmutex.h"
58 #include "rx/rx_kernel.h"
59 #include "rx/rx_clock.h"
60 #include "rx/rx_queue.h"
61 #include "rx/rx.h"
62 #include "rx/rx_globals.h"
63 #include "afs/lock.h"
64 #include "afsint.h"
65 #ifdef  AFS_OSF_ENV
66 #undef kmem_alloc
67 #undef kmem_free
68 #undef mem_alloc
69 #undef mem_free
70 #undef register
71 #endif /* AFS_OSF_ENV */
72 #else /* KERNEL */
73 # include <sys/types.h>
74 #ifdef AFS_NT40_ENV
75 # include <winsock2.h>
76 #else /* !AFS_NT40_ENV */
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <netinet/in.h>
81 # include <sys/stat.h>
82 # include <sys/time.h>
83 #endif /* !AFS_NT40_ENV */
84 #include <string.h>
85 #ifdef HAVE_UNISTD_H
86 #include <unistd.h>
87 #endif
88 # include "rx_user.h"
89 # include "rx_clock.h"
90 # include "rx_queue.h"
91 # include "rx.h"
92 # include "rx_globals.h"
93 #endif /* KERNEL */
94
95 #ifdef RX_LOCKS_DB
96 /* rxdb_fileID is used to identify the lock location, along with line#. */
97 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
98 #endif /* RX_LOCKS_DB */
99 /* rxi_ReadProc -- internal version.
100  *
101  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
102  */
103 int
104 rxi_ReadProc(struct rx_call *call, char *buf,
105              int nbytes)
106 {
107     struct rx_packet *cp = call->currentPacket;
108     struct rx_packet *rp;
109     int requestCount;
110     unsigned int t;
111
112 /* XXXX took out clock_NewTime from here.  Was it needed? */
113     requestCount = nbytes;
114
115     /* Free any packets from the last call to ReadvProc/WritevProc */
116     if (queue_IsNotEmpty(&call->iovq)) {
117 #ifdef RXDEBUG_PACKET
118         call->iovqc -=
119 #endif /* RXDEBUG_PACKET */
120             rxi_FreePackets(0, &call->iovq);
121     }
122
123     do {
124         if (call->nLeft == 0) {
125             /* Get next packet */
126             for (;;) {
127                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
128                     if (call->error) {
129                         return 0;
130                     }
131                     if (call->mode == RX_MODE_SENDING) {
132                         rxi_FlushWrite(call);
133                         continue;
134                     }
135                 }
136                 if (queue_IsNotEmpty(&call->rq)) {
137                     /* Check that next packet available is next in sequence */
138                     rp = queue_First(&call->rq, rx_packet);
139                     if (rp->header.seq == call->rnext) {
140                         afs_int32 error;
141                         struct rx_connection *conn = call->conn;
142                         queue_Remove(rp);
143                         rp->flags &= ~RX_PKTFLAG_RQ;
144 #ifdef RXDEBUG_PACKET
145                         call->rqc--;
146 #endif /* RXDEBUG_PACKET */
147
148                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
149                          * work.  It may reduce the length of the packet by up
150                          * to conn->maxTrailerSize, to reflect the length of the
151                          * data + the header. */
152                         if ((error =
153                              RXS_CheckPacket(conn->securityObject, call,
154                                              rp))) {
155                             /* Used to merely shut down the call, but now we 
156                              * shut down the whole connection since this may 
157                              * indicate an attempt to hijack it */
158
159                             MUTEX_EXIT(&call->lock);
160                             rxi_ConnectionError(conn, error);
161                             MUTEX_ENTER(&conn->conn_data_lock);
162                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
163                             MUTEX_EXIT(&conn->conn_data_lock);
164                             rxi_FreePacket(rp);
165                             MUTEX_ENTER(&call->lock);
166
167                             return 0;
168                         }
169                         call->rnext++;
170                         cp = call->currentPacket = rp;
171                         call->currentPacket->flags |= RX_PKTFLAG_CP;
172                         call->curvec = 1;       /* 0th vec is always header */
173                         /* begin at the beginning [ more or less ], continue 
174                          * on until the end, then stop. */
175                         call->curpos =
176                             (char *)cp->wirevec[1].iov_base +
177                             call->conn->securityHeaderSize;
178                         call->curlen =
179                             cp->wirevec[1].iov_len -
180                             call->conn->securityHeaderSize;
181
182                         /* Notice that this code works correctly if the data
183                          * size is 0 (which it may be--no reply arguments from
184                          * server, for example).  This relies heavily on the
185                          * fact that the code below immediately frees the packet
186                          * (no yields, etc.).  If it didn't, this would be a
187                          * problem because a value of zero for call->nLeft
188                          * normally means that there is no read packet */
189                         call->nLeft = cp->length;
190                         hadd32(call->bytesRcvd, cp->length);
191
192                         /* Send a hard ack for every rxi_HardAckRate+1 packets
193                          * consumed. Otherwise schedule an event to send
194                          * the hard ack later on.
195                          */
196                         call->nHardAcks++;
197                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
198                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
199                                 rxevent_Cancel(call->delayedAckEvent, call,
200                                                RX_CALL_REFCOUNT_DELAY);
201                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
202                             } else {
203                                 struct clock when, now;
204                                 clock_GetTime(&now);
205                                 when = now;
206                                 /* Delay to consolidate ack packets */
207                                 clock_Add(&when, &rx_hardAckDelay);
208                                 if (!call->delayedAckEvent
209                                     || clock_Gt(&call->delayedAckEvent->
210                                                 eventTime, &when)) {
211                                     rxevent_Cancel(call->delayedAckEvent,
212                                                    call,
213                                                    RX_CALL_REFCOUNT_DELAY);
214                                     CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
215                                     call->delayedAckEvent =
216                                       rxevent_PostNow(&when, &now,
217                                                      rxi_SendDelayedAck, call,
218                                                      0);
219                                 }
220                             }
221                         }
222                         break;
223                     }
224                 }
225
226                 /* 
227                  * If we reach this point either we have no packets in the
228                  * receive queue or the next packet in the queue is not the
229                  * one we are looking for.  There is nothing else for us to
230                  * do but wait for another packet to arrive.
231                  */
232
233                 /* Are there ever going to be any more packets? */
234                 if (call->flags & RX_CALL_RECEIVE_DONE) {
235                     return requestCount - nbytes;
236                 }
237                 /* Wait for in-sequence packet */
238                 call->flags |= RX_CALL_READER_WAIT;
239                 clock_NewTime();
240                 call->startWait = clock_Sec();
241                 while (call->flags & RX_CALL_READER_WAIT) {
242 #ifdef  RX_ENABLE_LOCKS
243                     CV_WAIT(&call->cv_rq, &call->lock);
244 #else
245                     osi_rxSleep(&call->rq);
246 #endif
247                 }
248                 /* cp is no longer valid since we may have given up the lock */
249                 cp = call->currentPacket;
250
251                 call->startWait = 0;
252 #ifdef RX_ENABLE_LOCKS
253                 if (call->error) {
254                     return 0;
255                 }
256 #endif /* RX_ENABLE_LOCKS */
257             }
258         } else
259             /* assert(cp); */
260             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
261             /* yes, the following block is allowed to be the ELSE clause (or not) */
262             /* It's possible for call->nLeft to be smaller than any particular
263              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
264              * reflects the size of the buffer.  We have to keep track of the
265              * number of bytes read in the length field of the packet struct.  On
266              * the final portion of a received packet, it's almost certain that
267              * call->nLeft will be smaller than the final buffer. */
268             while (nbytes && cp) {
269                 t = MIN((int)call->curlen, nbytes);
270                 t = MIN(t, (int)call->nLeft);
271                 memcpy(buf, call->curpos, t);
272                 buf += t;
273                 nbytes -= t;
274                 call->curpos += t;
275                 call->curlen -= t;
276                 call->nLeft -= t;
277
278                 if (!call->nLeft) {
279                     /* out of packet.  Get another one. */
280                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
281                     rxi_FreePacket(cp);
282                     cp = call->currentPacket = (struct rx_packet *)0;
283                 } else if (!call->curlen) {
284                     /* need to get another struct iov */
285                     if (++call->curvec >= cp->niovecs) {
286                         /* current packet is exhausted, get ready for another */
287                         /* don't worry about curvec and stuff, they get set somewhere else */
288                         call->currentPacket->flags &= ~RX_PKTFLAG_CP;
289                         rxi_FreePacket(cp);
290                         cp = call->currentPacket = (struct rx_packet *)0;
291                         call->nLeft = 0;
292                     } else {
293                         call->curpos =
294                             (char *)cp->wirevec[call->curvec].iov_base;
295                         call->curlen = cp->wirevec[call->curvec].iov_len;
296                     }
297                 }
298             }
299         if (!nbytes) {
300             /* user buffer is full, return */
301             return requestCount;
302         }
303
304     } while (nbytes);
305
306     return requestCount;
307 }
308
309 int
310 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
311 {
312     int bytes;
313     int tcurlen;
314     int tnLeft;
315     char *tcurpos;
316     SPLVAR;
317
318     /*
319      * Free any packets from the last call to ReadvProc/WritevProc.
320      * We do not need the lock because the receiver threads only
321      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
322      * RX_CALL_IOVEC_WAIT is always cleared before returning from
323      * ReadvProc/WritevProc.
324      */
325     if (!queue_IsEmpty(&call->iovq)) {
326 #ifdef RXDEBUG_PACKET
327         call->iovqc -=
328 #endif /* RXDEBUG_PACKET */
329             rxi_FreePackets(0, &call->iovq);
330     }
331
332     /*
333      * Most common case, all of the data is in the current iovec.
334      * We do not need the lock because this is the only thread that
335      * updates the curlen, curpos, nLeft fields.
336      *
337      * We are relying on nLeft being zero unless the call is in receive mode.
338      */
339     tcurlen = call->curlen;
340     tnLeft = call->nLeft;
341     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
342         tcurpos = call->curpos;
343         memcpy(buf, tcurpos, nbytes);
344         call->curpos = tcurpos + nbytes;
345         call->curlen = tcurlen - nbytes;
346         call->nLeft = tnLeft - nbytes;
347
348         if (!call->nLeft) {
349             /* out of packet.  Get another one. */
350             NETPRI;
351             MUTEX_ENTER(&call->lock);
352             rxi_FreePacket(call->currentPacket);
353             call->currentPacket = (struct rx_packet *)0;
354             MUTEX_EXIT(&call->lock);
355             USERPRI;
356         }
357         return nbytes;
358     }
359
360     NETPRI;
361     MUTEX_ENTER(&call->lock);
362     bytes = rxi_ReadProc(call, buf, nbytes);
363     MUTEX_EXIT(&call->lock);
364     USERPRI;
365     return bytes;
366 }
367
368 /* Optimization for unmarshalling 32 bit integers */
369 int
370 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
371 {
372     int bytes;
373     int tcurlen;
374     int tnLeft;
375     char *tcurpos;
376     SPLVAR;
377
378     /*
379      * Free any packets from the last call to ReadvProc/WritevProc.
380      * We do not need the lock because the receiver threads only
381      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
382      * RX_CALL_IOVEC_WAIT is always cleared before returning from
383      * ReadvProc/WritevProc.
384      */
385     if (!queue_IsEmpty(&call->iovq)) {
386 #ifdef RXDEBUG_PACKET
387         call->iovqc -=
388 #endif /* RXDEBUG_PACKET */
389             rxi_FreePackets(0, &call->iovq);
390     }
391
392     /*
393      * Most common case, all of the data is in the current iovec.
394      * We do not need the lock because this is the only thread that
395      * updates the curlen, curpos, nLeft fields.
396      *
397      * We are relying on nLeft being zero unless the call is in receive mode.
398      */
399     tcurlen = call->curlen;
400     tnLeft = call->nLeft;
401     if (!call->error && tcurlen >= sizeof(afs_int32)
402         && tnLeft >= sizeof(afs_int32)) {
403         tcurpos = call->curpos;
404         memcpy((char *)value, tcurpos, sizeof(afs_int32));
405         call->curpos = tcurpos + sizeof(afs_int32);
406         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
407         call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
408         if (!call->nLeft && call->currentPacket != NULL) {
409             /* out of packet.  Get another one. */
410             NETPRI;
411             MUTEX_ENTER(&call->lock);
412             rxi_FreePacket(call->currentPacket);
413             call->currentPacket = (struct rx_packet *)0;
414             MUTEX_EXIT(&call->lock);
415             USERPRI;
416         }
417         return sizeof(afs_int32);
418     }
419
420     NETPRI;
421     MUTEX_ENTER(&call->lock);
422     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
423     MUTEX_EXIT(&call->lock);
424     USERPRI;
425     return bytes;
426 }
427
428 /* rxi_FillReadVec
429  *
430  * Uses packets in the receive queue to fill in as much of the
431  * current iovec as possible. Does not block if it runs out
432  * of packets to complete the iovec. Return true if an ack packet
433  * was sent, otherwise return false */
434 int
435 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
436 {
437     int didConsume = 0;
438     int didHardAck = 0;
439     unsigned int t;
440     struct rx_packet *rp;
441     struct rx_packet *curp;
442     struct iovec *call_iov;
443     struct iovec *cur_iov = NULL;
444
445     curp = call->currentPacket;
446     if (curp) {
447         cur_iov = &curp->wirevec[call->curvec];
448     }
449     call_iov = &call->iov[call->iovNext];
450
451     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
452         if (call->nLeft == 0) {
453             /* Get next packet */
454             if (queue_IsNotEmpty(&call->rq)) {
455                 /* Check that next packet available is next in sequence */
456                 rp = queue_First(&call->rq, rx_packet);
457                 if (rp->header.seq == call->rnext) {
458                     afs_int32 error;
459                     struct rx_connection *conn = call->conn;
460                     queue_Remove(rp);
461                     rp->flags &= ~RX_PKTFLAG_RQ;
462 #ifdef RXDEBUG_PACKET
463                     call->rqc--;
464 #endif /* RXDEBUG_PACKET */
465
466                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
467                      * work.  It may reduce the length of the packet by up
468                      * to conn->maxTrailerSize, to reflect the length of the
469                      * data + the header. */
470                     if ((error =
471                          RXS_CheckPacket(conn->securityObject, call, rp))) {
472                         /* Used to merely shut down the call, but now we 
473                          * shut down the whole connection since this may 
474                          * indicate an attempt to hijack it */
475
476                         MUTEX_EXIT(&call->lock);
477                         rxi_ConnectionError(conn, error);
478                         MUTEX_ENTER(&conn->conn_data_lock);
479                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
480                         MUTEX_EXIT(&conn->conn_data_lock);
481                         rxi_FreePacket(rp);
482                         MUTEX_ENTER(&call->lock);
483
484                         return 1;
485                     }
486                     call->rnext++;
487                     curp = call->currentPacket = rp;
488                     call->currentPacket->flags |= RX_PKTFLAG_CP;
489                     call->curvec = 1;   /* 0th vec is always header */
490                     cur_iov = &curp->wirevec[1];
491                     /* begin at the beginning [ more or less ], continue 
492                      * on until the end, then stop. */
493                     call->curpos =
494                         (char *)curp->wirevec[1].iov_base +
495                         call->conn->securityHeaderSize;
496                     call->curlen =
497                         curp->wirevec[1].iov_len -
498                         call->conn->securityHeaderSize;
499
500                     /* Notice that this code works correctly if the data
501                      * size is 0 (which it may be--no reply arguments from
502                      * server, for example).  This relies heavily on the
503                      * fact that the code below immediately frees the packet
504                      * (no yields, etc.).  If it didn't, this would be a
505                      * problem because a value of zero for call->nLeft
506                      * normally means that there is no read packet */
507                     call->nLeft = curp->length;
508                     hadd32(call->bytesRcvd, curp->length);
509
510                     /* Send a hard ack for every rxi_HardAckRate+1 packets
511                      * consumed. Otherwise schedule an event to send
512                      * the hard ack later on.
513                      */
514                     call->nHardAcks++;
515                     didConsume = 1;
516                     continue;
517                 }
518             }
519             break;
520         }
521
522         /* It's possible for call->nLeft to be smaller than any particular
523          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
524          * reflects the size of the buffer.  We have to keep track of the
525          * number of bytes read in the length field of the packet struct.  On
526          * the final portion of a received packet, it's almost certain that
527          * call->nLeft will be smaller than the final buffer. */
528         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
529
530             t = MIN((int)call->curlen, call->iovNBytes);
531             t = MIN(t, (int)call->nLeft);
532             call_iov->iov_base = call->curpos;
533             call_iov->iov_len = t;
534             call_iov++;
535             call->iovNext++;
536             call->iovNBytes -= t;
537             call->curpos += t;
538             call->curlen -= t;
539             call->nLeft -= t;
540
541             if (!call->nLeft) {
542                 /* out of packet.  Get another one. */
543                 curp->flags &= ~RX_PKTFLAG_CP;
544                 curp->flags |= RX_PKTFLAG_IOVQ;
545                 queue_Append(&call->iovq, curp);
546 #ifdef RXDEBUG_PACKET
547                 call->iovqc++;
548 #endif /* RXDEBUG_PACKET */
549                 curp = call->currentPacket = (struct rx_packet *)0;
550             } else if (!call->curlen) {
551                 /* need to get another struct iov */
552                 if (++call->curvec >= curp->niovecs) {
553                     /* current packet is exhausted, get ready for another */
554                     /* don't worry about curvec and stuff, they get set somewhere else */
555                     curp->flags &= ~RX_PKTFLAG_CP;
556                     curp->flags |= RX_PKTFLAG_IOVQ;
557                     queue_Append(&call->iovq, curp);
558 #ifdef RXDEBUG_PACKET
559                     call->iovqc++;
560 #endif /* RXDEBUG_PACKET */
561                     curp = call->currentPacket = (struct rx_packet *)0;
562                     call->nLeft = 0;
563                 } else {
564                     cur_iov++;
565                     call->curpos = (char *)cur_iov->iov_base;
566                     call->curlen = cur_iov->iov_len;
567                 }
568             }
569         }
570     }
571
572     /* If we consumed any packets then check whether we need to
573      * send a hard ack. */
574     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
575         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
576             rxevent_Cancel(call->delayedAckEvent, call,
577                            RX_CALL_REFCOUNT_DELAY);
578             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
579             didHardAck = 1;
580         } else {
581             struct clock when, now;
582             clock_GetTime(&now);
583             when = now;
584             /* Delay to consolidate ack packets */
585             clock_Add(&when, &rx_hardAckDelay);
586             if (!call->delayedAckEvent
587                 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
588                 rxevent_Cancel(call->delayedAckEvent, call,
589                                RX_CALL_REFCOUNT_DELAY);
590                 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
591                 call->delayedAckEvent =
592                     rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
593             }
594         }
595     }
596     return didHardAck;
597 }
598
599
600 /* rxi_ReadvProc -- internal version.
601  *
602  * Fills in an iovec with pointers to the packet buffers. All packets
603  * except the last packet (new current packet) are moved to the iovq
604  * while the application is processing the data.
605  *
606  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
607  */
608 int
609 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
610               int nbytes)
611 {
612     int requestCount;
613     int nextio;
614
615     requestCount = nbytes;
616     nextio = 0;
617
618     /* Free any packets from the last call to ReadvProc/WritevProc */
619     if (queue_IsNotEmpty(&call->iovq)) {
620 #ifdef RXDEBUG_PACKET
621         call->iovqc -=
622 #endif /* RXDEBUG_PACKET */
623             rxi_FreePackets(0, &call->iovq);
624     }
625
626     if (call->mode == RX_MODE_SENDING) {
627         rxi_FlushWrite(call);
628     }
629
630     if (call->error) {
631         return 0;
632     }
633
634     /* Get whatever data is currently available in the receive queue.
635      * If rxi_FillReadVec sends an ack packet then it is possible
636      * that we will receive more data while we drop the call lock
637      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
638      * here to avoid a race with the receive thread if we send
639      * hard acks in rxi_FillReadVec. */
640     call->flags |= RX_CALL_IOVEC_WAIT;
641     call->iovNBytes = nbytes;
642     call->iovMax = maxio;
643     call->iovNext = 0;
644     call->iov = iov;
645     rxi_FillReadVec(call, 0);
646
647     /* if we need more data then sleep until the receive thread has
648      * filled in the rest. */
649     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
650         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
651         call->flags |= RX_CALL_READER_WAIT;
652         clock_NewTime();
653         call->startWait = clock_Sec();
654         while (call->flags & RX_CALL_READER_WAIT) {
655 #ifdef  RX_ENABLE_LOCKS
656             CV_WAIT(&call->cv_rq, &call->lock);
657 #else
658             osi_rxSleep(&call->rq);
659 #endif
660         }
661         call->startWait = 0;
662     }
663     call->flags &= ~RX_CALL_IOVEC_WAIT;
664 #ifdef RX_ENABLE_LOCKS
665     if (call->error) {
666         return 0;
667     }
668 #endif /* RX_ENABLE_LOCKS */
669
670     call->iov = NULL;
671     *nio = call->iovNext;
672     return nbytes - call->iovNBytes;
673 }
674
675 int
676 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
677              int nbytes)
678 {
679     int bytes;
680     SPLVAR;
681
682     NETPRI;
683     MUTEX_ENTER(&call->lock);
684     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
685     MUTEX_EXIT(&call->lock);
686     USERPRI;
687     return bytes;
688 }
689
690 /* rxi_WriteProc -- internal version.
691  *
692  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
693
694 int
695 rxi_WriteProc(struct rx_call *call, char *buf,
696               int nbytes)
697 {
698     struct rx_connection *conn = call->conn;
699     struct rx_packet *cp = call->currentPacket;
700     unsigned int t;
701     int requestCount = nbytes;
702
703     /* Free any packets from the last call to ReadvProc/WritevProc */
704     if (queue_IsNotEmpty(&call->iovq)) {
705 #ifdef RXDEBUG_PACKET
706         call->iovqc -=
707 #endif /* RXDEBUG_PACKET */
708             rxi_FreePackets(0, &call->iovq);
709     }
710
711     if (call->mode != RX_MODE_SENDING) {
712         if ((conn->type == RX_SERVER_CONNECTION)
713             && (call->mode == RX_MODE_RECEIVING)) {
714             call->mode = RX_MODE_SENDING;
715             if (cp) {
716                 cp->flags &= ~RX_PKTFLAG_CP;
717                 rxi_FreePacket(cp);
718                 cp = call->currentPacket = (struct rx_packet *)0;
719                 call->nLeft = 0;
720                 call->nFree = 0;
721             }
722         } else {
723             return 0;
724         }
725     }
726
727     /* Loop condition is checked at end, so that a write of 0 bytes
728      * will force a packet to be created--specially for the case where
729      * there are 0 bytes on the stream, but we must send a packet
730      * anyway. */
731     do {
732         if (call->nFree == 0) {
733             if (!call->error && cp) {
734                 /* Clear the current packet now so that if
735                  * we are forced to wait and drop the lock 
736                  * the packet we are planning on using 
737                  * cannot be freed.
738                  */
739                 cp->flags &= ~RX_PKTFLAG_CP;
740                 call->currentPacket = (struct rx_packet *)0;
741 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
742                 /* Wait until TQ_BUSY is reset before adding any
743                  * packets to the transmit queue
744                  */
745                 while (call->flags & RX_CALL_TQ_BUSY) {
746                     call->flags |= RX_CALL_TQ_WAIT;
747 #ifdef RX_ENABLE_LOCKS
748                     CV_WAIT(&call->cv_tq, &call->lock);
749 #else /* RX_ENABLE_LOCKS */
750                     osi_rxSleep(&call->tq);
751 #endif /* RX_ENABLE_LOCKS */
752                 }
753 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
754                 clock_NewTime();        /* Bogus:  need new time package */
755                 /* The 0, below, specifies that it is not the last packet: 
756                  * there will be others. PrepareSendPacket may
757                  * alter the packet length by up to
758                  * conn->securityMaxTrailerSize */
759                 hadd32(call->bytesSent, cp->length);
760                 rxi_PrepareSendPacket(call, cp, 0);
761                 cp->flags |= RX_PKTFLAG_TQ;
762                 queue_Append(&call->tq, cp);
763 #ifdef RXDEBUG_PACKET
764                 call->tqc++;
765 #endif /* RXDEBUG_PACKET */
766                 cp = (struct rx_packet *)0;
767                 if (!
768                     (call->
769                      flags & (RX_CALL_FAST_RECOVER |
770                               RX_CALL_FAST_RECOVER_WAIT))) {
771                     rxi_Start(0, call, 0, 0);
772                 }
773             } else if (cp) {
774                 cp->flags &= ~RX_PKTFLAG_CP;
775                 rxi_FreePacket(cp);
776                 cp = call->currentPacket = (struct rx_packet *)0;
777             }
778             /* Wait for transmit window to open up */
779             while (!call->error
780                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
781                 clock_NewTime();
782                 call->startWait = clock_Sec();
783
784 #ifdef  RX_ENABLE_LOCKS
785                 CV_WAIT(&call->cv_twind, &call->lock);
786 #else
787                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
788                 osi_rxSleep(&call->twind);
789 #endif
790
791                 call->startWait = 0;
792 #ifdef RX_ENABLE_LOCKS
793                 if (call->error) {
794                     return 0;
795                 }
796 #endif /* RX_ENABLE_LOCKS */
797             }
798             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
799                 cp->flags |= RX_PKTFLAG_CP;
800                 call->currentPacket = cp;
801                 call->nFree = cp->length;
802                 call->curvec = 1;       /* 0th vec is always header */
803                 /* begin at the beginning [ more or less ], continue 
804                  * on until the end, then stop. */
805                 call->curpos =
806                     (char *)cp->wirevec[1].iov_base +
807                     call->conn->securityHeaderSize;
808                 call->curlen =
809                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
810             }
811             if (call->error) {
812                 if (cp) {
813                     cp->flags &= ~RX_PKTFLAG_CP;
814                     rxi_FreePacket(cp);
815                     call->currentPacket = NULL;
816                 }
817                 return 0;
818             }
819         }
820
821         if (cp && (int)call->nFree < nbytes) {
822             /* Try to extend the current buffer */
823             int len, mud;
824             len = cp->length;
825             mud = rx_MaxUserDataSize(call);
826             if (mud > len) {
827                 int want;
828                 want = MIN(nbytes - (int)call->nFree, mud - len);
829                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
830                 if (cp->length > (unsigned)mud)
831                     cp->length = mud;
832                 call->nFree += (cp->length - len);
833             }
834         }
835
836         /* If the remaining bytes fit in the buffer, then store them
837          * and return.  Don't ship a buffer that's full immediately to
838          * the peer--we don't know if it's the last buffer yet */
839
840         if (!cp) {
841             call->nFree = 0;
842         }
843
844         while (nbytes && call->nFree) {
845
846             t = MIN((int)call->curlen, nbytes);
847             t = MIN((int)call->nFree, t);
848             memcpy(call->curpos, buf, t);
849             buf += t;
850             nbytes -= t;
851             call->curpos += t;
852             call->curlen -= (u_short)t;
853             call->nFree -= (u_short)t;
854
855             if (!call->curlen) {
856                 /* need to get another struct iov */
857                 if (++call->curvec >= cp->niovecs) {
858                     /* current packet is full, extend or send it */
859                     call->nFree = 0;
860                 } else {
861                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
862                     call->curlen = cp->wirevec[call->curvec].iov_len;
863                 }
864             }
865         }                       /* while bytes to send and room to send them */
866
867         /* might be out of space now */
868         if (!nbytes) {
869             return requestCount;
870         } else;                 /* more data to send, so get another packet and keep going */
871     } while (nbytes);
872
873     return requestCount - nbytes;
874 }
875
876 int
877 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
878 {
879     int bytes;
880     int tcurlen;
881     int tnFree;
882     char *tcurpos;
883     SPLVAR;
884
885     /*
886      * Free any packets from the last call to ReadvProc/WritevProc.
887      * We do not need the lock because the receiver threads only
888      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
889      * RX_CALL_IOVEC_WAIT is always cleared before returning from
890      * ReadvProc/WritevProc.
891      */
892     if (queue_IsNotEmpty(&call->iovq)) {
893 #ifdef RXDEBUG_PACKET
894         call->iovqc -=
895 #endif /* RXDEBUG_PACKET */
896             rxi_FreePackets(0, &call->iovq);
897     }
898
899     /*
900      * Most common case: all of the data fits in the current iovec.
901      * We do not need the lock because this is the only thread that
902      * updates the curlen, curpos, nFree fields.
903      *
904      * We are relying on nFree being zero unless the call is in send mode.
905      */
906     tcurlen = (int)call->curlen;
907     tnFree = (int)call->nFree;
908     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
909         tcurpos = call->curpos;
910         memcpy(tcurpos, buf, nbytes);
911         call->curpos = tcurpos + nbytes;
912         call->curlen = (u_short)(tcurlen - nbytes);
913         call->nFree = (u_short)(tnFree - nbytes);
914         return nbytes;
915     }
916
917     NETPRI;
918     MUTEX_ENTER(&call->lock);
919     bytes = rxi_WriteProc(call, buf, nbytes);
920     MUTEX_EXIT(&call->lock);
921     USERPRI;
922     return bytes;
923 }
924
925 /* Optimization for marshalling 32 bit arguments */
926 int
927 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
928 {
929     int bytes;
930     int tcurlen;
931     int tnFree;
932     char *tcurpos;
933     SPLVAR;
934
935     /*
936      * Free any packets from the last call to ReadvProc/WritevProc.
937      * We do not need the lock because the receiver threads only
938      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
939      * RX_CALL_IOVEC_WAIT is always cleared before returning from
940      * ReadvProc/WritevProc.
941      */
942     if (queue_IsNotEmpty(&call->iovq)) {
943 #ifdef RXDEBUG_PACKET
944         call->iovqc -=
945 #endif /* RXDEBUG_PACKET */
946             rxi_FreePackets(0, &call->iovq);
947     }
948
949     /*
950      * Most common case: all of the data fits in the current iovec.
951      * We do not need the lock because this is the only thread that
952      * updates the curlen, curpos, nFree fields.
953      *
954      * We are relying on nFree being zero unless the call is in send mode.
955      */
956     tcurlen = call->curlen;
957     tnFree = call->nFree;
958     if (!call->error && tcurlen >= sizeof(afs_int32)
959         && tnFree >= sizeof(afs_int32)) {
960         tcurpos = call->curpos;
961         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
962             *((afs_int32 *) (tcurpos)) = *value;
963         } else {
964             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
965         }
966         call->curpos = tcurpos + sizeof(afs_int32);
967         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
968         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
969         return sizeof(afs_int32);
970     }
971
972     NETPRI;
973     MUTEX_ENTER(&call->lock);
974     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
975     MUTEX_EXIT(&call->lock);
976     USERPRI;
977     return bytes;
978 }
979
980 /* rxi_WritevAlloc -- internal version.
981  *
982  * Fill in an iovec to point to data in packet buffers. The application
983  * calls rxi_WritevProc when the buffers are full.
984  *
985  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
986
987 int
988 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
989                 int nbytes)
990 {
991     struct rx_connection *conn = call->conn;
992     struct rx_packet *cp = call->currentPacket;
993     int requestCount;
994     int nextio;
995     /* Temporary values, real work is done in rxi_WritevProc */
996     int tnFree;
997     int tcurvec;
998     char *tcurpos;
999     int tcurlen;
1000
1001     requestCount = nbytes;
1002     nextio = 0;
1003
1004     /* Free any packets from the last call to ReadvProc/WritevProc */
1005     if (queue_IsNotEmpty(&call->iovq)) {
1006 #ifdef RXDEBUG_PACKET
1007         call->iovqc -=
1008 #endif /* RXDEBUG_PACKET */
1009             rxi_FreePackets(0, &call->iovq);
1010     }
1011
1012     if (call->mode != RX_MODE_SENDING) {
1013         if ((conn->type == RX_SERVER_CONNECTION)
1014             && (call->mode == RX_MODE_RECEIVING)) {
1015             call->mode = RX_MODE_SENDING;
1016             if (cp) {
1017                 cp->flags &= ~RX_PKTFLAG_CP;
1018                 rxi_FreePacket(cp);
1019                 cp = call->currentPacket = (struct rx_packet *)0;
1020                 call->nLeft = 0;
1021                 call->nFree = 0;
1022             }
1023         } else {
1024             return 0;
1025         }
1026     }
1027
1028     /* Set up the iovec to point to data in packet buffers. */
1029     tnFree = call->nFree;
1030     tcurvec = call->curvec;
1031     tcurpos = call->curpos;
1032     tcurlen = call->curlen;
1033     do {
1034         unsigned int t;
1035
1036         if (tnFree == 0) {
1037             /* current packet is full, allocate a new one */
1038             cp = rxi_AllocSendPacket(call, nbytes);
1039             if (cp == NULL) {
1040                 /* out of space, return what we have */
1041                 *nio = nextio;
1042                 return requestCount - nbytes;
1043             }
1044             cp->flags |= RX_PKTFLAG_IOVQ;
1045             queue_Append(&call->iovq, cp);
1046 #ifdef RXDEBUG_PACKET
1047             call->iovqc++;
1048 #endif /* RXDEBUG_PACKET */
1049             tnFree = cp->length;
1050             tcurvec = 1;
1051             tcurpos =
1052                 (char *)cp->wirevec[1].iov_base +
1053                 call->conn->securityHeaderSize;
1054             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1055         }
1056
1057         if (tnFree < nbytes) {
1058             /* try to extend the current packet */
1059             int len, mud;
1060             len = cp->length;
1061             mud = rx_MaxUserDataSize(call);
1062             if (mud > len) {
1063                 int want;
1064                 want = MIN(nbytes - tnFree, mud - len);
1065                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1066                 if (cp->length > (unsigned)mud)
1067                     cp->length = mud;
1068                 tnFree += (cp->length - len);
1069                 if (cp == call->currentPacket) {
1070                     call->nFree += (cp->length - len);
1071                 }
1072             }
1073         }
1074
1075         /* fill in the next entry in the iovec */
1076         t = MIN(tcurlen, nbytes);
1077         t = MIN(tnFree, t);
1078         iov[nextio].iov_base = tcurpos;
1079         iov[nextio].iov_len = t;
1080         nbytes -= t;
1081         tcurpos += t;
1082         tcurlen -= t;
1083         tnFree -= t;
1084         nextio++;
1085
1086         if (!tcurlen) {
1087             /* need to get another struct iov */
1088             if (++tcurvec >= cp->niovecs) {
1089                 /* current packet is full, extend it or move on to next packet */
1090                 tnFree = 0;
1091             } else {
1092                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1093                 tcurlen = cp->wirevec[tcurvec].iov_len;
1094             }
1095         }
1096     } while (nbytes && nextio < maxio);
1097     *nio = nextio;
1098     return requestCount - nbytes;
1099 }
1100
1101 int
1102 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1103                int nbytes)
1104 {
1105     int bytes;
1106     SPLVAR;
1107
1108     NETPRI;
1109     MUTEX_ENTER(&call->lock);
1110     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1111     MUTEX_EXIT(&call->lock);
1112     USERPRI;
1113     return bytes;
1114 }
1115
1116 /* rxi_WritevProc -- internal version.
1117  *
1118  * Send buffers allocated in rxi_WritevAlloc.
1119  *
1120  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1121
1122 int
1123 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1124 {
1125     struct rx_packet *cp = NULL;
1126     struct rx_packet *p, *np;
1127     int nextio;
1128     int requestCount;
1129     struct rx_queue tmpq;
1130 #ifdef RXDEBUG_PACKET
1131     u_short tmpqc;
1132 #endif
1133
1134     requestCount = nbytes;
1135     nextio = 0;
1136
1137     if (call->mode != RX_MODE_SENDING) {
1138         call->error = RX_PROTOCOL_ERROR;
1139     }
1140 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1141     /* Wait until TQ_BUSY is reset before trying to move any
1142      * packets to the transmit queue.  */
1143     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1144         call->flags |= RX_CALL_TQ_WAIT;
1145 #ifdef RX_ENABLE_LOCKS
1146         CV_WAIT(&call->cv_tq, &call->lock);
1147 #else /* RX_ENABLE_LOCKS */
1148         osi_rxSleep(&call->tq);
1149 #endif /* RX_ENABLE_LOCKS */
1150     }
1151 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1152     /* cp is no longer valid since we may have given up the lock */
1153     cp = call->currentPacket;
1154
1155     if (call->error) {
1156         if (cp) {
1157             cp->flags &= ~RX_PKTFLAG_CP;
1158             cp->flags |= RX_PKTFLAG_IOVQ;
1159             queue_Prepend(&call->iovq, cp);
1160 #ifdef RXDEBUG_PACKET
1161             call->iovqc++;
1162 #endif /* RXDEBUG_PACKET */
1163             cp = call->currentPacket = (struct rx_packet *)0;
1164         }
1165 #ifdef RXDEBUG_PACKET
1166         call->iovqc -=
1167 #endif /* RXDEBUG_PACKET */
1168             rxi_FreePackets(0, &call->iovq);
1169         return 0;
1170     }
1171
1172     /* Loop through the I/O vector adjusting packet pointers.
1173      * Place full packets back onto the iovq once they are ready
1174      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1175      * the iovec. We put the loop condition at the end to ensure that
1176      * a zero length write will push a short packet. */
1177     nextio = 0;
1178     queue_Init(&tmpq);
1179 #ifdef RXDEBUG_PACKET
1180     tmpqc = 0;
1181 #endif /* RXDEBUG_PACKET */
1182     do {
1183         if (call->nFree == 0 && cp) {
1184             clock_NewTime();    /* Bogus:  need new time package */
1185             /* The 0, below, specifies that it is not the last packet: 
1186              * there will be others. PrepareSendPacket may
1187              * alter the packet length by up to
1188              * conn->securityMaxTrailerSize */
1189             hadd32(call->bytesSent, cp->length);
1190             rxi_PrepareSendPacket(call, cp, 0);
1191             queue_Append(&tmpq, cp);
1192 #ifdef RXDEBUG_PACKET
1193             tmpqc++;
1194 #endif /* RXDEBUG_PACKET */
1195             cp = call->currentPacket = (struct rx_packet *)0;
1196
1197             /* The head of the iovq is now the current packet */
1198             if (nbytes) {
1199                 if (queue_IsEmpty(&call->iovq)) {
1200                     call->error = RX_PROTOCOL_ERROR;
1201 #ifdef RXDEBUG_PACKET
1202                     tmpqc -=
1203 #endif /* RXDEBUG_PACKET */
1204                         rxi_FreePackets(0, &tmpq);
1205                     return 0;
1206                 }
1207                 cp = queue_First(&call->iovq, rx_packet);
1208                 queue_Remove(cp);
1209                 cp->flags &= ~RX_PKTFLAG_IOVQ;
1210 #ifdef RXDEBUG_PACKET
1211                 call->iovqc--;
1212 #endif /* RXDEBUG_PACKET */
1213                 cp->flags |= RX_PKTFLAG_CP;
1214                 call->currentPacket = cp;
1215                 call->nFree = cp->length;
1216                 call->curvec = 1;
1217                 call->curpos =
1218                     (char *)cp->wirevec[1].iov_base +
1219                     call->conn->securityHeaderSize;
1220                 call->curlen =
1221                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1222             }
1223         }
1224
1225         if (nbytes) {
1226             /* The next iovec should point to the current position */
1227             if (iov[nextio].iov_base != call->curpos
1228                 || iov[nextio].iov_len > (int)call->curlen) {
1229                 call->error = RX_PROTOCOL_ERROR;
1230                 if (cp) {
1231                     cp->flags &= ~RX_PKTFLAG_CP;
1232                     queue_Prepend(&tmpq, cp);
1233 #ifdef RXDEBUG_PACKET
1234                     tmpqc++;
1235 #endif /* RXDEBUG_PACKET */
1236                     cp = call->currentPacket = (struct rx_packet *)0;
1237                 }
1238 #ifdef RXDEBUG_PACKET
1239                 tmpqc -=
1240 #endif /* RXDEBUG_PACKET */
1241                     rxi_FreePackets(0, &tmpq);
1242                 return 0;
1243             }
1244             nbytes -= iov[nextio].iov_len;
1245             call->curpos += iov[nextio].iov_len;
1246             call->curlen -= iov[nextio].iov_len;
1247             call->nFree -= iov[nextio].iov_len;
1248             nextio++;
1249             if (call->curlen == 0) {
1250                 if (++call->curvec > cp->niovecs) {
1251                     call->nFree = 0;
1252                 } else {
1253                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1254                     call->curlen = cp->wirevec[call->curvec].iov_len;
1255                 }
1256             }
1257         }
1258     } while (nbytes && nextio < nio);
1259
1260     /* Move the packets from the temporary queue onto the transmit queue.
1261      * We may end up with more than call->twind packets on the queue. */
1262     
1263     for (queue_Scan(&tmpq, p, np, rx_packet))
1264     {
1265         p->flags |= RX_PKTFLAG_TQ;
1266     }
1267     queue_SpliceAppend(&call->tq, &tmpq);
1268
1269     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1270         rxi_Start(0, call, 0, 0);
1271     }
1272
1273     /* Wait for the length of the transmit queue to fall below call->twind */
1274     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1275         clock_NewTime();
1276         call->startWait = clock_Sec();
1277 #ifdef  RX_ENABLE_LOCKS
1278         CV_WAIT(&call->cv_twind, &call->lock);
1279 #else
1280         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1281         osi_rxSleep(&call->twind);
1282 #endif
1283         call->startWait = 0;
1284     }
1285     /* cp is no longer valid since we may have given up the lock */
1286     cp = call->currentPacket;
1287
1288     if (call->error) {
1289         if (cp) {
1290             cp->flags &= ~RX_PKTFLAG_CP;
1291             rxi_FreePacket(cp);
1292             cp = call->currentPacket = (struct rx_packet *)0;
1293         }
1294         return 0;
1295     }
1296
1297     return requestCount - nbytes;
1298 }
1299
1300 int
1301 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1302 {
1303     int bytes;
1304     SPLVAR;
1305
1306     NETPRI;
1307     MUTEX_ENTER(&call->lock);
1308     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1309     MUTEX_EXIT(&call->lock);
1310     USERPRI;
1311     return bytes;
1312 }
1313
1314 /* Flush any buffered data to the stream, switch to read mode
1315  * (clients) or to EOF mode (servers) */
1316 void
1317 rxi_FlushWrite(struct rx_call *call)
1318 {
1319     struct rx_packet *cp = NULL;
1320
1321     /* Free any packets from the last call to ReadvProc/WritevProc */
1322     if (queue_IsNotEmpty(&call->iovq)) {
1323 #ifdef RXDEBUG_PACKET
1324         call->iovqc -=
1325 #endif /* RXDEBUG_PACKET */
1326             rxi_FreePackets(0, &call->iovq);
1327     }
1328
1329     if (call->mode == RX_MODE_SENDING) {
1330
1331         call->mode =
1332             (call->conn->type ==
1333              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1334
1335 #ifdef RX_KERNEL_TRACE
1336         {
1337             int glockOwner = ISAFS_GLOCK();
1338             if (!glockOwner)
1339                 AFS_GLOCK();
1340             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1341                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1342                        call);
1343             if (!glockOwner)
1344                 AFS_GUNLOCK();
1345         }
1346 #endif
1347
1348 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1349         /* Wait until TQ_BUSY is reset before adding any
1350          * packets to the transmit queue
1351          */
1352         while (call->flags & RX_CALL_TQ_BUSY) {
1353             call->flags |= RX_CALL_TQ_WAIT;
1354 #ifdef RX_ENABLE_LOCKS
1355             CV_WAIT(&call->cv_tq, &call->lock);
1356 #else /* RX_ENABLE_LOCKS */
1357             osi_rxSleep(&call->tq);
1358 #endif /* RX_ENABLE_LOCKS */
1359         }
1360 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1361
1362         /* cp is no longer valid since we may have given up the lock */
1363         cp = call->currentPacket;
1364
1365         if (cp) {
1366             /* cp->length is only supposed to be the user's data */
1367             /* cp->length was already set to (then-current) 
1368              * MaxUserDataSize or less. */
1369             cp->flags &= ~RX_PKTFLAG_CP;
1370             cp->length -= call->nFree;
1371             call->currentPacket = (struct rx_packet *)0;
1372             call->nFree = 0;
1373         } else {
1374             cp = rxi_AllocSendPacket(call, 0);
1375             if (!cp) {
1376                 /* Mode can no longer be MODE_SENDING */
1377                 return;
1378             }
1379             cp->length = 0;
1380             cp->niovecs = 2;    /* header + space for rxkad stuff */
1381             call->nFree = 0;
1382         }
1383
1384         /* The 1 specifies that this is the last packet */
1385         hadd32(call->bytesSent, cp->length);
1386         rxi_PrepareSendPacket(call, cp, 1);
1387         cp->flags |= RX_PKTFLAG_TQ;
1388         queue_Append(&call->tq, cp);
1389 #ifdef RXDEBUG_PACKET
1390         call->tqc++;
1391 #endif /* RXDEBUG_PACKET */
1392         if (!
1393             (call->
1394              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1395             rxi_Start(0, call, 0, 0);
1396         }
1397     }
1398 }
1399
1400 /* Flush any buffered data to the stream, switch to read mode
1401  * (clients) or to EOF mode (servers) */
1402 void
1403 rx_FlushWrite(struct rx_call *call)
1404 {
1405     SPLVAR;
1406     NETPRI;
1407     MUTEX_ENTER(&call->lock);
1408     rxi_FlushWrite(call);
1409     MUTEX_EXIT(&call->lock);
1410     USERPRI;
1411 }