rx-no-more-register-20090128
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   * 
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #ifdef KERNEL
21 #ifndef UKERNEL
22 #ifdef RX_KERNEL_TRACE
23 #include "rx_kcommon.h"
24 #endif
25 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
26 #include "afs/sysincludes.h"
27 #else
28 #include "h/types.h"
29 #include "h/time.h"
30 #include "h/stat.h"
31 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
32 #include "h/systm.h"
33 #endif
34 #ifdef  AFS_OSF_ENV
35 #include <net/net_globals.h>
36 #endif /* AFS_OSF_ENV */
37 #ifdef AFS_LINUX20_ENV
38 #include "h/socket.h"
39 #endif
40 #include "netinet/in.h"
41 #if defined(AFS_SGI_ENV)
42 #include "afs/sysincludes.h"
43 #endif
44 #endif
45 #include "afs/afs_args.h"
46 #include "afs/afs_osi.h"
47 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
48 #include "h/systm.h"
49 #endif
50 #else /* !UKERNEL */
51 #include "afs/sysincludes.h"
52 #endif /* !UKERNEL */
53 #ifdef RXDEBUG
54 #undef RXDEBUG                  /* turn off debugging */
55 #endif /* RXDEBUG */
56
57 #include "rx_kmutex.h"
58 #include "rx/rx_kernel.h"
59 #include "rx/rx_clock.h"
60 #include "rx/rx_queue.h"
61 #include "rx/rx_internal.h"
62 #include "rx/rx.h"
63 #include "rx/rx_globals.h"
64 #include "afs/lock.h"
65 #include "afsint.h"
66 #ifdef  AFS_OSF_ENV
67 #undef kmem_alloc
68 #undef kmem_free
69 #undef mem_alloc
70 #undef mem_free
71 #undef register
72 #endif /* AFS_OSF_ENV */
73 #else /* KERNEL */
74 # include <sys/types.h>
75 #ifdef AFS_NT40_ENV
76 # include <winsock2.h>
77 #else /* !AFS_NT40_ENV */
78 # include <sys/socket.h>
79 # include <sys/file.h>
80 # include <netdb.h>
81 # include <netinet/in.h>
82 # include <sys/stat.h>
83 # include <sys/time.h>
84 #endif /* !AFS_NT40_ENV */
85 #include <string.h>
86 #ifdef HAVE_UNISTD_H
87 #include <unistd.h>
88 #endif
89 # include "rx_user.h"
90 # include "rx_clock.h"
91 # include "rx_queue.h"
92 # include "rx_internal.h"
93 # include "rx.h"
94 # include "rx_globals.h"
95 #endif /* KERNEL */
96
97 #ifdef RX_LOCKS_DB
98 /* rxdb_fileID is used to identify the lock location, along with line#. */
99 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
100 #endif /* RX_LOCKS_DB */
101 /* rxi_ReadProc -- internal version.
102  *
103  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
104  */
105 int
106 rxi_ReadProc(struct rx_call *call, char *buf,
107              int nbytes)
108 {
109     struct rx_packet *cp = call->currentPacket;
110     struct rx_packet *rp;
111     int requestCount;
112     unsigned int t;
113
114 /* XXXX took out clock_NewTime from here.  Was it needed? */
115     requestCount = nbytes;
116
117     /* Free any packets from the last call to ReadvProc/WritevProc */
118     if (queue_IsNotEmpty(&call->iovq)) {
119 #ifdef RXDEBUG_PACKET
120         call->iovqc -=
121 #endif /* RXDEBUG_PACKET */
122             rxi_FreePackets(0, &call->iovq);
123     }
124
125     do {
126         if (call->nLeft == 0) {
127             /* Get next packet */
128             for (;;) {
129                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
130                     if (call->error) {
131                         return 0;
132                     }
133                     if (call->mode == RX_MODE_SENDING) {
134                         rxi_FlushWrite(call);
135                         continue;
136                     }
137                 }
138                 if (queue_IsNotEmpty(&call->rq)) {
139                     /* Check that next packet available is next in sequence */
140                     rp = queue_First(&call->rq, rx_packet);
141                     if (rp->header.seq == call->rnext) {
142                         afs_int32 error;
143                         struct rx_connection *conn = call->conn;
144                         queue_Remove(rp);
145                         rp->flags &= ~RX_PKTFLAG_RQ;
146 #ifdef RXDEBUG_PACKET
147                         call->rqc--;
148 #endif /* RXDEBUG_PACKET */
149
150                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
151                          * work.  It may reduce the length of the packet by up
152                          * to conn->maxTrailerSize, to reflect the length of the
153                          * data + the header. */
154                         if ((error =
155                              RXS_CheckPacket(conn->securityObject, call,
156                                              rp))) {
157                             /* Used to merely shut down the call, but now we 
158                              * shut down the whole connection since this may 
159                              * indicate an attempt to hijack it */
160
161                             MUTEX_EXIT(&call->lock);
162                             rxi_ConnectionError(conn, error);
163                             MUTEX_ENTER(&conn->conn_data_lock);
164                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
165                             MUTEX_EXIT(&conn->conn_data_lock);
166                             rxi_FreePacket(rp);
167                             MUTEX_ENTER(&call->lock);
168
169                             return 0;
170                         }
171                         call->rnext++;
172                         cp = call->currentPacket = rp;
173                         call->currentPacket->flags |= RX_PKTFLAG_CP;
174                         call->curvec = 1;       /* 0th vec is always header */
175                         /* begin at the beginning [ more or less ], continue 
176                          * on until the end, then stop. */
177                         call->curpos =
178                             (char *)cp->wirevec[1].iov_base +
179                             call->conn->securityHeaderSize;
180                         call->curlen =
181                             cp->wirevec[1].iov_len -
182                             call->conn->securityHeaderSize;
183
184                         /* Notice that this code works correctly if the data
185                          * size is 0 (which it may be--no reply arguments from
186                          * server, for example).  This relies heavily on the
187                          * fact that the code below immediately frees the packet
188                          * (no yields, etc.).  If it didn't, this would be a
189                          * problem because a value of zero for call->nLeft
190                          * normally means that there is no read packet */
191                         call->nLeft = cp->length;
192                         hadd32(call->bytesRcvd, cp->length);
193
194                         /* Send a hard ack for every rxi_HardAckRate+1 packets
195                          * consumed. Otherwise schedule an event to send
196                          * the hard ack later on.
197                          */
198                         call->nHardAcks++;
199                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
200                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
201                                 rxevent_Cancel(call->delayedAckEvent, call,
202                                                RX_CALL_REFCOUNT_DELAY);
203                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
204                             } else {
205                                 struct clock when, now;
206                                 clock_GetTime(&now);
207                                 when = now;
208                                 /* Delay to consolidate ack packets */
209                                 clock_Add(&when, &rx_hardAckDelay);
210                                 if (!call->delayedAckEvent
211                                     || clock_Gt(&call->delayedAckEvent->
212                                                 eventTime, &when)) {
213                                     rxevent_Cancel(call->delayedAckEvent,
214                                                    call,
215                                                    RX_CALL_REFCOUNT_DELAY);
216                                     CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
217                                     call->delayedAckEvent =
218                                       rxevent_PostNow(&when, &now,
219                                                      rxi_SendDelayedAck, call,
220                                                      0);
221                                 }
222                             }
223                         }
224                         break;
225                     }
226                 }
227
228                 /* 
229                  * If we reach this point either we have no packets in the
230                  * receive queue or the next packet in the queue is not the
231                  * one we are looking for.  There is nothing else for us to
232                  * do but wait for another packet to arrive.
233                  */
234
235                 /* Are there ever going to be any more packets? */
236                 if (call->flags & RX_CALL_RECEIVE_DONE) {
237                     return requestCount - nbytes;
238                 }
239                 /* Wait for in-sequence packet */
240                 call->flags |= RX_CALL_READER_WAIT;
241                 clock_NewTime();
242                 call->startWait = clock_Sec();
243                 while (call->flags & RX_CALL_READER_WAIT) {
244 #ifdef  RX_ENABLE_LOCKS
245                     CV_WAIT(&call->cv_rq, &call->lock);
246 #else
247                     osi_rxSleep(&call->rq);
248 #endif
249                 }
250                 /* cp is no longer valid since we may have given up the lock */
251                 cp = call->currentPacket;
252
253                 call->startWait = 0;
254 #ifdef RX_ENABLE_LOCKS
255                 if (call->error) {
256                     return 0;
257                 }
258 #endif /* RX_ENABLE_LOCKS */
259             }
260         } else
261             /* assert(cp); */
262             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
263             /* yes, the following block is allowed to be the ELSE clause (or not) */
264             /* It's possible for call->nLeft to be smaller than any particular
265              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
266              * reflects the size of the buffer.  We have to keep track of the
267              * number of bytes read in the length field of the packet struct.  On
268              * the final portion of a received packet, it's almost certain that
269              * call->nLeft will be smaller than the final buffer. */
270             while (nbytes && cp) {
271                 t = MIN((int)call->curlen, nbytes);
272                 t = MIN(t, (int)call->nLeft);
273                 memcpy(buf, call->curpos, t);
274                 buf += t;
275                 nbytes -= t;
276                 call->curpos += t;
277                 call->curlen -= t;
278                 call->nLeft -= t;
279
280                 if (!call->nLeft) {
281                     /* out of packet.  Get another one. */
282                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
283                     rxi_FreePacket(cp);
284                     cp = call->currentPacket = (struct rx_packet *)0;
285                 } else if (!call->curlen) {
286                     /* need to get another struct iov */
287                     if (++call->curvec >= cp->niovecs) {
288                         /* current packet is exhausted, get ready for another */
289                         /* don't worry about curvec and stuff, they get set somewhere else */
290                         call->currentPacket->flags &= ~RX_PKTFLAG_CP;
291                         rxi_FreePacket(cp);
292                         cp = call->currentPacket = (struct rx_packet *)0;
293                         call->nLeft = 0;
294                     } else {
295                         call->curpos =
296                             (char *)cp->wirevec[call->curvec].iov_base;
297                         call->curlen = cp->wirevec[call->curvec].iov_len;
298                     }
299                 }
300             }
301         if (!nbytes) {
302             /* user buffer is full, return */
303             return requestCount;
304         }
305
306     } while (nbytes);
307
308     return requestCount;
309 }
310
311 int
312 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
313 {
314     int bytes;
315     int tcurlen;
316     int tnLeft;
317     char *tcurpos;
318     SPLVAR;
319
320     /*
321      * Free any packets from the last call to ReadvProc/WritevProc.
322      * We do not need the lock because the receiver threads only
323      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
324      * RX_CALL_IOVEC_WAIT is always cleared before returning from
325      * ReadvProc/WritevProc.
326      */
327     if (!queue_IsEmpty(&call->iovq)) {
328 #ifdef RXDEBUG_PACKET
329         call->iovqc -=
330 #endif /* RXDEBUG_PACKET */
331             rxi_FreePackets(0, &call->iovq);
332     }
333
334     /*
335      * Most common case, all of the data is in the current iovec.
336      * We do not need the lock because this is the only thread that
337      * updates the curlen, curpos, nLeft fields.
338      *
339      * We are relying on nLeft being zero unless the call is in receive mode.
340      */
341     tcurlen = call->curlen;
342     tnLeft = call->nLeft;
343     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
344         tcurpos = call->curpos;
345         memcpy(buf, tcurpos, nbytes);
346         call->curpos = tcurpos + nbytes;
347         call->curlen = tcurlen - nbytes;
348         call->nLeft = tnLeft - nbytes;
349
350         if (!call->nLeft) {
351             /* out of packet.  Get another one. */
352             NETPRI;
353             MUTEX_ENTER(&call->lock);
354             rxi_FreePacket(call->currentPacket);
355             call->currentPacket = (struct rx_packet *)0;
356             MUTEX_EXIT(&call->lock);
357             USERPRI;
358         }
359         return nbytes;
360     }
361
362     NETPRI;
363     MUTEX_ENTER(&call->lock);
364     bytes = rxi_ReadProc(call, buf, nbytes);
365     MUTEX_EXIT(&call->lock);
366     USERPRI;
367     return bytes;
368 }
369
370 /* Optimization for unmarshalling 32 bit integers */
371 int
372 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
373 {
374     int bytes;
375     int tcurlen;
376     int tnLeft;
377     char *tcurpos;
378     SPLVAR;
379
380     /*
381      * Free any packets from the last call to ReadvProc/WritevProc.
382      * We do not need the lock because the receiver threads only
383      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
384      * RX_CALL_IOVEC_WAIT is always cleared before returning from
385      * ReadvProc/WritevProc.
386      */
387     if (!queue_IsEmpty(&call->iovq)) {
388 #ifdef RXDEBUG_PACKET
389         call->iovqc -=
390 #endif /* RXDEBUG_PACKET */
391             rxi_FreePackets(0, &call->iovq);
392     }
393
394     /*
395      * Most common case, all of the data is in the current iovec.
396      * We do not need the lock because this is the only thread that
397      * updates the curlen, curpos, nLeft fields.
398      *
399      * We are relying on nLeft being zero unless the call is in receive mode.
400      */
401     tcurlen = call->curlen;
402     tnLeft = call->nLeft;
403     if (!call->error && tcurlen >= sizeof(afs_int32)
404         && tnLeft >= sizeof(afs_int32)) {
405         tcurpos = call->curpos;
406         memcpy((char *)value, tcurpos, sizeof(afs_int32));
407         call->curpos = tcurpos + sizeof(afs_int32);
408         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
409         call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
410         if (!call->nLeft && call->currentPacket != NULL) {
411             /* out of packet.  Get another one. */
412             NETPRI;
413             MUTEX_ENTER(&call->lock);
414             rxi_FreePacket(call->currentPacket);
415             call->currentPacket = (struct rx_packet *)0;
416             MUTEX_EXIT(&call->lock);
417             USERPRI;
418         }
419         return sizeof(afs_int32);
420     }
421
422     NETPRI;
423     MUTEX_ENTER(&call->lock);
424     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
425     MUTEX_EXIT(&call->lock);
426     USERPRI;
427     return bytes;
428 }
429
430 /* rxi_FillReadVec
431  *
432  * Uses packets in the receive queue to fill in as much of the
433  * current iovec as possible. Does not block if it runs out
434  * of packets to complete the iovec. Return true if an ack packet
435  * was sent, otherwise return false */
436 int
437 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
438 {
439     int didConsume = 0;
440     int didHardAck = 0;
441     unsigned int t;
442     struct rx_packet *rp;
443     struct rx_packet *curp;
444     struct iovec *call_iov;
445     struct iovec *cur_iov = NULL;
446
447     curp = call->currentPacket;
448     if (curp) {
449         cur_iov = &curp->wirevec[call->curvec];
450     }
451     call_iov = &call->iov[call->iovNext];
452
453     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
454         if (call->nLeft == 0) {
455             /* Get next packet */
456             if (queue_IsNotEmpty(&call->rq)) {
457                 /* Check that next packet available is next in sequence */
458                 rp = queue_First(&call->rq, rx_packet);
459                 if (rp->header.seq == call->rnext) {
460                     afs_int32 error;
461                     struct rx_connection *conn = call->conn;
462                     queue_Remove(rp);
463                     rp->flags &= ~RX_PKTFLAG_RQ;
464 #ifdef RXDEBUG_PACKET
465                     call->rqc--;
466 #endif /* RXDEBUG_PACKET */
467
468                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
469                      * work.  It may reduce the length of the packet by up
470                      * to conn->maxTrailerSize, to reflect the length of the
471                      * data + the header. */
472                     if ((error =
473                          RXS_CheckPacket(conn->securityObject, call, rp))) {
474                         /* Used to merely shut down the call, but now we 
475                          * shut down the whole connection since this may 
476                          * indicate an attempt to hijack it */
477
478                         MUTEX_EXIT(&call->lock);
479                         rxi_ConnectionError(conn, error);
480                         MUTEX_ENTER(&conn->conn_data_lock);
481                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
482                         MUTEX_EXIT(&conn->conn_data_lock);
483                         rxi_FreePacket(rp);
484                         MUTEX_ENTER(&call->lock);
485
486                         return 1;
487                     }
488                     call->rnext++;
489                     curp = call->currentPacket = rp;
490                     call->currentPacket->flags |= RX_PKTFLAG_CP;
491                     call->curvec = 1;   /* 0th vec is always header */
492                     cur_iov = &curp->wirevec[1];
493                     /* begin at the beginning [ more or less ], continue 
494                      * on until the end, then stop. */
495                     call->curpos =
496                         (char *)curp->wirevec[1].iov_base +
497                         call->conn->securityHeaderSize;
498                     call->curlen =
499                         curp->wirevec[1].iov_len -
500                         call->conn->securityHeaderSize;
501
502                     /* Notice that this code works correctly if the data
503                      * size is 0 (which it may be--no reply arguments from
504                      * server, for example).  This relies heavily on the
505                      * fact that the code below immediately frees the packet
506                      * (no yields, etc.).  If it didn't, this would be a
507                      * problem because a value of zero for call->nLeft
508                      * normally means that there is no read packet */
509                     call->nLeft = curp->length;
510                     hadd32(call->bytesRcvd, curp->length);
511
512                     /* Send a hard ack for every rxi_HardAckRate+1 packets
513                      * consumed. Otherwise schedule an event to send
514                      * the hard ack later on.
515                      */
516                     call->nHardAcks++;
517                     didConsume = 1;
518                     continue;
519                 }
520             }
521             break;
522         }
523
524         /* It's possible for call->nLeft to be smaller than any particular
525          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
526          * reflects the size of the buffer.  We have to keep track of the
527          * number of bytes read in the length field of the packet struct.  On
528          * the final portion of a received packet, it's almost certain that
529          * call->nLeft will be smaller than the final buffer. */
530         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
531
532             t = MIN((int)call->curlen, call->iovNBytes);
533             t = MIN(t, (int)call->nLeft);
534             call_iov->iov_base = call->curpos;
535             call_iov->iov_len = t;
536             call_iov++;
537             call->iovNext++;
538             call->iovNBytes -= t;
539             call->curpos += t;
540             call->curlen -= t;
541             call->nLeft -= t;
542
543             if (!call->nLeft) {
544                 /* out of packet.  Get another one. */
545                 curp->flags &= ~RX_PKTFLAG_CP;
546                 curp->flags |= RX_PKTFLAG_IOVQ;
547                 queue_Append(&call->iovq, curp);
548 #ifdef RXDEBUG_PACKET
549                 call->iovqc++;
550 #endif /* RXDEBUG_PACKET */
551                 curp = call->currentPacket = (struct rx_packet *)0;
552             } else if (!call->curlen) {
553                 /* need to get another struct iov */
554                 if (++call->curvec >= curp->niovecs) {
555                     /* current packet is exhausted, get ready for another */
556                     /* don't worry about curvec and stuff, they get set somewhere else */
557                     curp->flags &= ~RX_PKTFLAG_CP;
558                     curp->flags |= RX_PKTFLAG_IOVQ;
559                     queue_Append(&call->iovq, curp);
560 #ifdef RXDEBUG_PACKET
561                     call->iovqc++;
562 #endif /* RXDEBUG_PACKET */
563                     curp = call->currentPacket = (struct rx_packet *)0;
564                     call->nLeft = 0;
565                 } else {
566                     cur_iov++;
567                     call->curpos = (char *)cur_iov->iov_base;
568                     call->curlen = cur_iov->iov_len;
569                 }
570             }
571         }
572     }
573
574     /* If we consumed any packets then check whether we need to
575      * send a hard ack. */
576     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
577         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
578             rxevent_Cancel(call->delayedAckEvent, call,
579                            RX_CALL_REFCOUNT_DELAY);
580             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
581             didHardAck = 1;
582         } else {
583             struct clock when, now;
584             clock_GetTime(&now);
585             when = now;
586             /* Delay to consolidate ack packets */
587             clock_Add(&when, &rx_hardAckDelay);
588             if (!call->delayedAckEvent
589                 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
590                 rxevent_Cancel(call->delayedAckEvent, call,
591                                RX_CALL_REFCOUNT_DELAY);
592                 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
593                 call->delayedAckEvent =
594                     rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
595             }
596         }
597     }
598     return didHardAck;
599 }
600
601
602 /* rxi_ReadvProc -- internal version.
603  *
604  * Fills in an iovec with pointers to the packet buffers. All packets
605  * except the last packet (new current packet) are moved to the iovq
606  * while the application is processing the data.
607  *
608  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
609  */
610 int
611 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
612               int nbytes)
613 {
614     int requestCount;
615     int nextio;
616
617     requestCount = nbytes;
618     nextio = 0;
619
620     /* Free any packets from the last call to ReadvProc/WritevProc */
621     if (queue_IsNotEmpty(&call->iovq)) {
622 #ifdef RXDEBUG_PACKET
623         call->iovqc -=
624 #endif /* RXDEBUG_PACKET */
625             rxi_FreePackets(0, &call->iovq);
626     }
627
628     if (call->mode == RX_MODE_SENDING) {
629         rxi_FlushWrite(call);
630     }
631
632     if (call->error) {
633         return 0;
634     }
635
636     /* Get whatever data is currently available in the receive queue.
637      * If rxi_FillReadVec sends an ack packet then it is possible
638      * that we will receive more data while we drop the call lock
639      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
640      * here to avoid a race with the receive thread if we send
641      * hard acks in rxi_FillReadVec. */
642     call->flags |= RX_CALL_IOVEC_WAIT;
643     call->iovNBytes = nbytes;
644     call->iovMax = maxio;
645     call->iovNext = 0;
646     call->iov = iov;
647     rxi_FillReadVec(call, 0);
648
649     /* if we need more data then sleep until the receive thread has
650      * filled in the rest. */
651     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
652         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
653         call->flags |= RX_CALL_READER_WAIT;
654         clock_NewTime();
655         call->startWait = clock_Sec();
656         while (call->flags & RX_CALL_READER_WAIT) {
657 #ifdef  RX_ENABLE_LOCKS
658             CV_WAIT(&call->cv_rq, &call->lock);
659 #else
660             osi_rxSleep(&call->rq);
661 #endif
662         }
663         call->startWait = 0;
664     }
665     call->flags &= ~RX_CALL_IOVEC_WAIT;
666 #ifdef RX_ENABLE_LOCKS
667     if (call->error) {
668         return 0;
669     }
670 #endif /* RX_ENABLE_LOCKS */
671
672     call->iov = NULL;
673     *nio = call->iovNext;
674     return nbytes - call->iovNBytes;
675 }
676
677 int
678 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
679              int nbytes)
680 {
681     int bytes;
682     SPLVAR;
683
684     NETPRI;
685     MUTEX_ENTER(&call->lock);
686     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
687     MUTEX_EXIT(&call->lock);
688     USERPRI;
689     return bytes;
690 }
691
692 /* rxi_WriteProc -- internal version.
693  *
694  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
695
696 int
697 rxi_WriteProc(struct rx_call *call, char *buf,
698               int nbytes)
699 {
700     struct rx_connection *conn = call->conn;
701     struct rx_packet *cp = call->currentPacket;
702     unsigned int t;
703     int requestCount = nbytes;
704
705     /* Free any packets from the last call to ReadvProc/WritevProc */
706     if (queue_IsNotEmpty(&call->iovq)) {
707 #ifdef RXDEBUG_PACKET
708         call->iovqc -=
709 #endif /* RXDEBUG_PACKET */
710             rxi_FreePackets(0, &call->iovq);
711     }
712
713     if (call->mode != RX_MODE_SENDING) {
714         if ((conn->type == RX_SERVER_CONNECTION)
715             && (call->mode == RX_MODE_RECEIVING)) {
716             call->mode = RX_MODE_SENDING;
717             if (cp) {
718                 cp->flags &= ~RX_PKTFLAG_CP;
719                 rxi_FreePacket(cp);
720                 cp = call->currentPacket = (struct rx_packet *)0;
721                 call->nLeft = 0;
722                 call->nFree = 0;
723             }
724         } else {
725             return 0;
726         }
727     }
728
729     /* Loop condition is checked at end, so that a write of 0 bytes
730      * will force a packet to be created--specially for the case where
731      * there are 0 bytes on the stream, but we must send a packet
732      * anyway. */
733     do {
734         if (call->nFree == 0) {
735             if (!call->error && cp) {
736                 /* Clear the current packet now so that if
737                  * we are forced to wait and drop the lock 
738                  * the packet we are planning on using 
739                  * cannot be freed.
740                  */
741                 cp->flags &= ~RX_PKTFLAG_CP;
742                 call->currentPacket = (struct rx_packet *)0;
743 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
744                 /* Wait until TQ_BUSY is reset before adding any
745                  * packets to the transmit queue
746                  */
747                 while (call->flags & RX_CALL_TQ_BUSY) {
748                     call->flags |= RX_CALL_TQ_WAIT;
749 #ifdef RX_ENABLE_LOCKS
750                     CV_WAIT(&call->cv_tq, &call->lock);
751 #else /* RX_ENABLE_LOCKS */
752                     osi_rxSleep(&call->tq);
753 #endif /* RX_ENABLE_LOCKS */
754                 }
755 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
756                 clock_NewTime();        /* Bogus:  need new time package */
757                 /* The 0, below, specifies that it is not the last packet: 
758                  * there will be others. PrepareSendPacket may
759                  * alter the packet length by up to
760                  * conn->securityMaxTrailerSize */
761                 hadd32(call->bytesSent, cp->length);
762                 rxi_PrepareSendPacket(call, cp, 0);
763                 cp->flags |= RX_PKTFLAG_TQ;
764                 queue_Append(&call->tq, cp);
765 #ifdef RXDEBUG_PACKET
766                 call->tqc++;
767 #endif /* RXDEBUG_PACKET */
768                 cp = (struct rx_packet *)0;
769                 if (!
770                     (call->
771                      flags & (RX_CALL_FAST_RECOVER |
772                               RX_CALL_FAST_RECOVER_WAIT))) {
773                     rxi_Start(0, call, 0, 0);
774                 }
775             } else if (cp) {
776                 cp->flags &= ~RX_PKTFLAG_CP;
777                 rxi_FreePacket(cp);
778                 cp = call->currentPacket = (struct rx_packet *)0;
779             }
780             /* Wait for transmit window to open up */
781             while (!call->error
782                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
783                 clock_NewTime();
784                 call->startWait = clock_Sec();
785
786 #ifdef  RX_ENABLE_LOCKS
787                 CV_WAIT(&call->cv_twind, &call->lock);
788 #else
789                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
790                 osi_rxSleep(&call->twind);
791 #endif
792
793                 call->startWait = 0;
794 #ifdef RX_ENABLE_LOCKS
795                 if (call->error) {
796                     return 0;
797                 }
798 #endif /* RX_ENABLE_LOCKS */
799             }
800             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
801                 cp->flags |= RX_PKTFLAG_CP;
802                 call->currentPacket = cp;
803                 call->nFree = cp->length;
804                 call->curvec = 1;       /* 0th vec is always header */
805                 /* begin at the beginning [ more or less ], continue 
806                  * on until the end, then stop. */
807                 call->curpos =
808                     (char *)cp->wirevec[1].iov_base +
809                     call->conn->securityHeaderSize;
810                 call->curlen =
811                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
812             }
813             if (call->error) {
814                 if (cp) {
815                     cp->flags &= ~RX_PKTFLAG_CP;
816                     rxi_FreePacket(cp);
817                     call->currentPacket = NULL;
818                 }
819                 return 0;
820             }
821         }
822
823         if (cp && (int)call->nFree < nbytes) {
824             /* Try to extend the current buffer */
825             int len, mud;
826             len = cp->length;
827             mud = rx_MaxUserDataSize(call);
828             if (mud > len) {
829                 int want;
830                 want = MIN(nbytes - (int)call->nFree, mud - len);
831                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
832                 if (cp->length > (unsigned)mud)
833                     cp->length = mud;
834                 call->nFree += (cp->length - len);
835             }
836         }
837
838         /* If the remaining bytes fit in the buffer, then store them
839          * and return.  Don't ship a buffer that's full immediately to
840          * the peer--we don't know if it's the last buffer yet */
841
842         if (!cp) {
843             call->nFree = 0;
844         }
845
846         while (nbytes && call->nFree) {
847
848             t = MIN((int)call->curlen, nbytes);
849             t = MIN((int)call->nFree, t);
850             memcpy(call->curpos, buf, t);
851             buf += t;
852             nbytes -= t;
853             call->curpos += t;
854             call->curlen -= (u_short)t;
855             call->nFree -= (u_short)t;
856
857             if (!call->curlen) {
858                 /* need to get another struct iov */
859                 if (++call->curvec >= cp->niovecs) {
860                     /* current packet is full, extend or send it */
861                     call->nFree = 0;
862                 } else {
863                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
864                     call->curlen = cp->wirevec[call->curvec].iov_len;
865                 }
866             }
867         }                       /* while bytes to send and room to send them */
868
869         /* might be out of space now */
870         if (!nbytes) {
871             return requestCount;
872         } else;                 /* more data to send, so get another packet and keep going */
873     } while (nbytes);
874
875     return requestCount - nbytes;
876 }
877
878 int
879 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
880 {
881     int bytes;
882     int tcurlen;
883     int tnFree;
884     char *tcurpos;
885     SPLVAR;
886
887     /*
888      * Free any packets from the last call to ReadvProc/WritevProc.
889      * We do not need the lock because the receiver threads only
890      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
891      * RX_CALL_IOVEC_WAIT is always cleared before returning from
892      * ReadvProc/WritevProc.
893      */
894     if (queue_IsNotEmpty(&call->iovq)) {
895 #ifdef RXDEBUG_PACKET
896         call->iovqc -=
897 #endif /* RXDEBUG_PACKET */
898             rxi_FreePackets(0, &call->iovq);
899     }
900
901     /*
902      * Most common case: all of the data fits in the current iovec.
903      * We do not need the lock because this is the only thread that
904      * updates the curlen, curpos, nFree fields.
905      *
906      * We are relying on nFree being zero unless the call is in send mode.
907      */
908     tcurlen = (int)call->curlen;
909     tnFree = (int)call->nFree;
910     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
911         tcurpos = call->curpos;
912         memcpy(tcurpos, buf, nbytes);
913         call->curpos = tcurpos + nbytes;
914         call->curlen = (u_short)(tcurlen - nbytes);
915         call->nFree = (u_short)(tnFree - nbytes);
916         return nbytes;
917     }
918
919     NETPRI;
920     MUTEX_ENTER(&call->lock);
921     bytes = rxi_WriteProc(call, buf, nbytes);
922     MUTEX_EXIT(&call->lock);
923     USERPRI;
924     return bytes;
925 }
926
927 /* Optimization for marshalling 32 bit arguments */
928 int
929 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
930 {
931     int bytes;
932     int tcurlen;
933     int tnFree;
934     char *tcurpos;
935     SPLVAR;
936
937     /*
938      * Free any packets from the last call to ReadvProc/WritevProc.
939      * We do not need the lock because the receiver threads only
940      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
941      * RX_CALL_IOVEC_WAIT is always cleared before returning from
942      * ReadvProc/WritevProc.
943      */
944     if (queue_IsNotEmpty(&call->iovq)) {
945 #ifdef RXDEBUG_PACKET
946         call->iovqc -=
947 #endif /* RXDEBUG_PACKET */
948             rxi_FreePackets(0, &call->iovq);
949     }
950
951     /*
952      * Most common case: all of the data fits in the current iovec.
953      * We do not need the lock because this is the only thread that
954      * updates the curlen, curpos, nFree fields.
955      *
956      * We are relying on nFree being zero unless the call is in send mode.
957      */
958     tcurlen = call->curlen;
959     tnFree = call->nFree;
960     if (!call->error && tcurlen >= sizeof(afs_int32)
961         && tnFree >= sizeof(afs_int32)) {
962         tcurpos = call->curpos;
963         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
964             *((afs_int32 *) (tcurpos)) = *value;
965         } else {
966             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
967         }
968         call->curpos = tcurpos + sizeof(afs_int32);
969         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
970         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
971         return sizeof(afs_int32);
972     }
973
974     NETPRI;
975     MUTEX_ENTER(&call->lock);
976     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
977     MUTEX_EXIT(&call->lock);
978     USERPRI;
979     return bytes;
980 }
981
982 /* rxi_WritevAlloc -- internal version.
983  *
984  * Fill in an iovec to point to data in packet buffers. The application
985  * calls rxi_WritevProc when the buffers are full.
986  *
987  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
988
989 int
990 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
991                 int nbytes)
992 {
993     struct rx_connection *conn = call->conn;
994     struct rx_packet *cp = call->currentPacket;
995     int requestCount;
996     int nextio;
997     /* Temporary values, real work is done in rxi_WritevProc */
998     int tnFree;
999     int tcurvec;
1000     char *tcurpos;
1001     int tcurlen;
1002
1003     requestCount = nbytes;
1004     nextio = 0;
1005
1006     /* Free any packets from the last call to ReadvProc/WritevProc */
1007     if (queue_IsNotEmpty(&call->iovq)) {
1008 #ifdef RXDEBUG_PACKET
1009         call->iovqc -=
1010 #endif /* RXDEBUG_PACKET */
1011             rxi_FreePackets(0, &call->iovq);
1012     }
1013
1014     if (call->mode != RX_MODE_SENDING) {
1015         if ((conn->type == RX_SERVER_CONNECTION)
1016             && (call->mode == RX_MODE_RECEIVING)) {
1017             call->mode = RX_MODE_SENDING;
1018             if (cp) {
1019                 cp->flags &= ~RX_PKTFLAG_CP;
1020                 rxi_FreePacket(cp);
1021                 cp = call->currentPacket = (struct rx_packet *)0;
1022                 call->nLeft = 0;
1023                 call->nFree = 0;
1024             }
1025         } else {
1026             return 0;
1027         }
1028     }
1029
1030     /* Set up the iovec to point to data in packet buffers. */
1031     tnFree = call->nFree;
1032     tcurvec = call->curvec;
1033     tcurpos = call->curpos;
1034     tcurlen = call->curlen;
1035     do {
1036         unsigned int t;
1037
1038         if (tnFree == 0) {
1039             /* current packet is full, allocate a new one */
1040             cp = rxi_AllocSendPacket(call, nbytes);
1041             if (cp == NULL) {
1042                 /* out of space, return what we have */
1043                 *nio = nextio;
1044                 return requestCount - nbytes;
1045             }
1046             cp->flags |= RX_PKTFLAG_IOVQ;
1047             queue_Append(&call->iovq, cp);
1048 #ifdef RXDEBUG_PACKET
1049             call->iovqc++;
1050 #endif /* RXDEBUG_PACKET */
1051             tnFree = cp->length;
1052             tcurvec = 1;
1053             tcurpos =
1054                 (char *)cp->wirevec[1].iov_base +
1055                 call->conn->securityHeaderSize;
1056             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1057         }
1058
1059         if (tnFree < nbytes) {
1060             /* try to extend the current packet */
1061             int len, mud;
1062             len = cp->length;
1063             mud = rx_MaxUserDataSize(call);
1064             if (mud > len) {
1065                 int want;
1066                 want = MIN(nbytes - tnFree, mud - len);
1067                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1068                 if (cp->length > (unsigned)mud)
1069                     cp->length = mud;
1070                 tnFree += (cp->length - len);
1071                 if (cp == call->currentPacket) {
1072                     call->nFree += (cp->length - len);
1073                 }
1074             }
1075         }
1076
1077         /* fill in the next entry in the iovec */
1078         t = MIN(tcurlen, nbytes);
1079         t = MIN(tnFree, t);
1080         iov[nextio].iov_base = tcurpos;
1081         iov[nextio].iov_len = t;
1082         nbytes -= t;
1083         tcurpos += t;
1084         tcurlen -= t;
1085         tnFree -= t;
1086         nextio++;
1087
1088         if (!tcurlen) {
1089             /* need to get another struct iov */
1090             if (++tcurvec >= cp->niovecs) {
1091                 /* current packet is full, extend it or move on to next packet */
1092                 tnFree = 0;
1093             } else {
1094                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1095                 tcurlen = cp->wirevec[tcurvec].iov_len;
1096             }
1097         }
1098     } while (nbytes && nextio < maxio);
1099     *nio = nextio;
1100     return requestCount - nbytes;
1101 }
1102
1103 int
1104 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1105                int nbytes)
1106 {
1107     int bytes;
1108     SPLVAR;
1109
1110     NETPRI;
1111     MUTEX_ENTER(&call->lock);
1112     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1113     MUTEX_EXIT(&call->lock);
1114     USERPRI;
1115     return bytes;
1116 }
1117
1118 /* rxi_WritevProc -- internal version.
1119  *
1120  * Send buffers allocated in rxi_WritevAlloc.
1121  *
1122  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1123
1124 int
1125 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1126 {
1127     struct rx_packet *cp = NULL;
1128     struct rx_call *p, *np;
1129     int nextio;
1130     int requestCount;
1131     struct rx_queue tmpq;
1132 #ifdef RXDEBUG_PACKET
1133     u_short tmpqc;
1134 #endif
1135
1136     requestCount = nbytes;
1137     nextio = 0;
1138
1139     if (call->mode != RX_MODE_SENDING) {
1140         call->error = RX_PROTOCOL_ERROR;
1141     }
1142 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1143     /* Wait until TQ_BUSY is reset before trying to move any
1144      * packets to the transmit queue.  */
1145     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1146         call->flags |= RX_CALL_TQ_WAIT;
1147 #ifdef RX_ENABLE_LOCKS
1148         CV_WAIT(&call->cv_tq, &call->lock);
1149 #else /* RX_ENABLE_LOCKS */
1150         osi_rxSleep(&call->tq);
1151 #endif /* RX_ENABLE_LOCKS */
1152     }
1153 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1154     /* cp is no longer valid since we may have given up the lock */
1155     cp = call->currentPacket;
1156
1157     if (call->error) {
1158         if (cp) {
1159             cp->flags &= ~RX_PKTFLAG_CP;
1160             cp->flags |= RX_PKTFLAG_IOVQ;
1161             queue_Prepend(&call->iovq, cp);
1162 #ifdef RXDEBUG_PACKET
1163             call->iovqc++;
1164 #endif /* RXDEBUG_PACKET */
1165             cp = call->currentPacket = (struct rx_packet *)0;
1166         }
1167 #ifdef RXDEBUG_PACKET
1168         call->iovqc -=
1169 #endif /* RXDEBUG_PACKET */
1170             rxi_FreePackets(0, &call->iovq);
1171         return 0;
1172     }
1173
1174     /* Loop through the I/O vector adjusting packet pointers.
1175      * Place full packets back onto the iovq once they are ready
1176      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1177      * the iovec. We put the loop condition at the end to ensure that
1178      * a zero length write will push a short packet. */
1179     nextio = 0;
1180     queue_Init(&tmpq);
1181 #ifdef RXDEBUG_PACKET
1182     tmpqc = 0;
1183 #endif /* RXDEBUG_PACKET */
1184     do {
1185         if (call->nFree == 0 && cp) {
1186             clock_NewTime();    /* Bogus:  need new time package */
1187             /* The 0, below, specifies that it is not the last packet: 
1188              * there will be others. PrepareSendPacket may
1189              * alter the packet length by up to
1190              * conn->securityMaxTrailerSize */
1191             hadd32(call->bytesSent, cp->length);
1192             rxi_PrepareSendPacket(call, cp, 0);
1193             queue_Append(&tmpq, cp);
1194 #ifdef RXDEBUG_PACKET
1195             tmpqc++;
1196 #endif /* RXDEBUG_PACKET */
1197             cp = call->currentPacket = (struct rx_packet *)0;
1198
1199             /* The head of the iovq is now the current packet */
1200             if (nbytes) {
1201                 if (queue_IsEmpty(&call->iovq)) {
1202                     call->error = RX_PROTOCOL_ERROR;
1203 #ifdef RXDEBUG_PACKET
1204                     tmpqc -=
1205 #endif /* RXDEBUG_PACKET */
1206                         rxi_FreePackets(0, &tmpq);
1207                     return 0;
1208                 }
1209                 cp = queue_First(&call->iovq, rx_packet);
1210                 queue_Remove(cp);
1211                 cp->flags &= ~RX_PKTFLAG_IOVQ;
1212 #ifdef RXDEBUG_PACKET
1213                 call->iovqc--;
1214 #endif /* RXDEBUG_PACKET */
1215                 cp->flags |= RX_PKTFLAG_CP;
1216                 call->currentPacket = cp;
1217                 call->nFree = cp->length;
1218                 call->curvec = 1;
1219                 call->curpos =
1220                     (char *)cp->wirevec[1].iov_base +
1221                     call->conn->securityHeaderSize;
1222                 call->curlen =
1223                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1224             }
1225         }
1226
1227         if (nbytes) {
1228             /* The next iovec should point to the current position */
1229             if (iov[nextio].iov_base != call->curpos
1230                 || iov[nextio].iov_len > (int)call->curlen) {
1231                 call->error = RX_PROTOCOL_ERROR;
1232                 if (cp) {
1233                     cp->flags &= ~RX_PKTFLAG_CP;
1234                     queue_Prepend(&tmpq, cp);
1235 #ifdef RXDEBUG_PACKET
1236                     tmpqc++;
1237 #endif /* RXDEBUG_PACKET */
1238                     cp = call->currentPacket = (struct rx_packet *)0;
1239                 }
1240 #ifdef RXDEBUG_PACKET
1241                 tmpqc -=
1242 #endif /* RXDEBUG_PACKET */
1243                     rxi_FreePackets(0, &tmpq);
1244                 return 0;
1245             }
1246             nbytes -= iov[nextio].iov_len;
1247             call->curpos += iov[nextio].iov_len;
1248             call->curlen -= iov[nextio].iov_len;
1249             call->nFree -= iov[nextio].iov_len;
1250             nextio++;
1251             if (call->curlen == 0) {
1252                 if (++call->curvec > cp->niovecs) {
1253                     call->nFree = 0;
1254                 } else {
1255                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1256                     call->curlen = cp->wirevec[call->curvec].iov_len;
1257                 }
1258             }
1259         }
1260     } while (nbytes && nextio < nio);
1261
1262     /* Move the packets from the temporary queue onto the transmit queue.
1263      * We may end up with more than call->twind packets on the queue. */
1264     
1265 #ifdef KDUMP_RX_LOCK
1266     for (queue_Scan(&tmpq, p, np, rx_call_rx_lock))
1267 #else
1268     for (queue_Scan(&tmpq, p, np, rx_call))
1269 #endif
1270     {
1271         p->flags |= RX_PKTFLAG_TQ;
1272     }
1273     queue_SpliceAppend(&call->tq, &tmpq);
1274
1275     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1276         rxi_Start(0, call, 0, 0);
1277     }
1278
1279     /* Wait for the length of the transmit queue to fall below call->twind */
1280     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1281         clock_NewTime();
1282         call->startWait = clock_Sec();
1283 #ifdef  RX_ENABLE_LOCKS
1284         CV_WAIT(&call->cv_twind, &call->lock);
1285 #else
1286         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1287         osi_rxSleep(&call->twind);
1288 #endif
1289         call->startWait = 0;
1290     }
1291     /* cp is no longer valid since we may have given up the lock */
1292     cp = call->currentPacket;
1293
1294     if (call->error) {
1295         if (cp) {
1296             cp->flags &= ~RX_PKTFLAG_CP;
1297             rxi_FreePacket(cp);
1298             cp = call->currentPacket = (struct rx_packet *)0;
1299         }
1300         return 0;
1301     }
1302
1303     return requestCount - nbytes;
1304 }
1305
1306 int
1307 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1308 {
1309     int bytes;
1310     SPLVAR;
1311
1312     NETPRI;
1313     MUTEX_ENTER(&call->lock);
1314     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1315     MUTEX_EXIT(&call->lock);
1316     USERPRI;
1317     return bytes;
1318 }
1319
1320 /* Flush any buffered data to the stream, switch to read mode
1321  * (clients) or to EOF mode (servers) */
1322 void
1323 rxi_FlushWrite(struct rx_call *call)
1324 {
1325     struct rx_packet *cp = NULL;
1326
1327     /* Free any packets from the last call to ReadvProc/WritevProc */
1328     if (queue_IsNotEmpty(&call->iovq)) {
1329 #ifdef RXDEBUG_PACKET
1330         call->iovqc -=
1331 #endif /* RXDEBUG_PACKET */
1332             rxi_FreePackets(0, &call->iovq);
1333     }
1334
1335     if (call->mode == RX_MODE_SENDING) {
1336
1337         call->mode =
1338             (call->conn->type ==
1339              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1340
1341 #ifdef RX_KERNEL_TRACE
1342         {
1343             int glockOwner = ISAFS_GLOCK();
1344             if (!glockOwner)
1345                 AFS_GLOCK();
1346             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1347                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1348                        call);
1349             if (!glockOwner)
1350                 AFS_GUNLOCK();
1351         }
1352 #endif
1353
1354 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1355         /* Wait until TQ_BUSY is reset before adding any
1356          * packets to the transmit queue
1357          */
1358         while (call->flags & RX_CALL_TQ_BUSY) {
1359             call->flags |= RX_CALL_TQ_WAIT;
1360 #ifdef RX_ENABLE_LOCKS
1361             CV_WAIT(&call->cv_tq, &call->lock);
1362 #else /* RX_ENABLE_LOCKS */
1363             osi_rxSleep(&call->tq);
1364 #endif /* RX_ENABLE_LOCKS */
1365         }
1366 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1367
1368         /* cp is no longer valid since we may have given up the lock */
1369         cp = call->currentPacket;
1370
1371         if (cp) {
1372             /* cp->length is only supposed to be the user's data */
1373             /* cp->length was already set to (then-current) 
1374              * MaxUserDataSize or less. */
1375             cp->flags &= ~RX_PKTFLAG_CP;
1376             cp->length -= call->nFree;
1377             call->currentPacket = (struct rx_packet *)0;
1378             call->nFree = 0;
1379         } else {
1380             cp = rxi_AllocSendPacket(call, 0);
1381             if (!cp) {
1382                 /* Mode can no longer be MODE_SENDING */
1383                 return;
1384             }
1385             cp->length = 0;
1386             cp->niovecs = 2;    /* header + space for rxkad stuff */
1387             call->nFree = 0;
1388         }
1389
1390         /* The 1 specifies that this is the last packet */
1391         hadd32(call->bytesSent, cp->length);
1392         rxi_PrepareSendPacket(call, cp, 1);
1393         cp->flags |= RX_PKTFLAG_TQ;
1394         queue_Append(&call->tq, cp);
1395 #ifdef RXDEBUG_PACKET
1396         call->tqc++;
1397 #endif /* RXDEBUG_PACKET */
1398         if (!
1399             (call->
1400              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1401             rxi_Start(0, call, 0, 0);
1402         }
1403     }
1404 }
1405
1406 /* Flush any buffered data to the stream, switch to read mode
1407  * (clients) or to EOF mode (servers) */
1408 void
1409 rx_FlushWrite(struct rx_call *call)
1410 {
1411     SPLVAR;
1412     NETPRI;
1413     MUTEX_ENTER(&call->lock);
1414     rxi_FlushWrite(call);
1415     MUTEX_EXIT(&call->lock);
1416     USERPRI;
1417 }