rx-packet-count-debugging-20090104
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   * 
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #ifdef KERNEL
21 #ifndef UKERNEL
22 #ifdef RX_KERNEL_TRACE
23 #include "rx_kcommon.h"
24 #endif
25 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
26 #include "afs/sysincludes.h"
27 #else
28 #include "h/types.h"
29 #include "h/time.h"
30 #include "h/stat.h"
31 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
32 #include "h/systm.h"
33 #endif
34 #ifdef  AFS_OSF_ENV
35 #include <net/net_globals.h>
36 #endif /* AFS_OSF_ENV */
37 #ifdef AFS_LINUX20_ENV
38 #include "h/socket.h"
39 #endif
40 #include "netinet/in.h"
41 #if defined(AFS_SGI_ENV)
42 #include "afs/sysincludes.h"
43 #endif
44 #endif
45 #include "afs/afs_args.h"
46 #include "afs/afs_osi.h"
47 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
48 #include "h/systm.h"
49 #endif
50 #else /* !UKERNEL */
51 #include "afs/sysincludes.h"
52 #endif /* !UKERNEL */
53 #ifdef RXDEBUG
54 #undef RXDEBUG                  /* turn off debugging */
55 #endif /* RXDEBUG */
56
57 #include "rx_kmutex.h"
58 #include "rx/rx_kernel.h"
59 #include "rx/rx_clock.h"
60 #include "rx/rx_queue.h"
61 #include "rx/rx_internal.h"
62 #include "rx/rx.h"
63 #include "rx/rx_globals.h"
64 #include "afs/lock.h"
65 #include "afsint.h"
66 #ifdef  AFS_OSF_ENV
67 #undef kmem_alloc
68 #undef kmem_free
69 #undef mem_alloc
70 #undef mem_free
71 #undef register
72 #endif /* AFS_OSF_ENV */
73 #else /* KERNEL */
74 # include <sys/types.h>
75 #ifdef AFS_NT40_ENV
76 # include <winsock2.h>
77 #else /* !AFS_NT40_ENV */
78 # include <sys/socket.h>
79 # include <sys/file.h>
80 # include <netdb.h>
81 # include <netinet/in.h>
82 # include <sys/stat.h>
83 # include <sys/time.h>
84 #endif /* !AFS_NT40_ENV */
85 #include <string.h>
86 #ifdef HAVE_UNISTD_H
87 #include <unistd.h>
88 #endif
89 # include "rx_user.h"
90 # include "rx_clock.h"
91 # include "rx_queue.h"
92 # include "rx_internal.h"
93 # include "rx.h"
94 # include "rx_globals.h"
95 #endif /* KERNEL */
96
97 #ifdef RX_LOCKS_DB
98 /* rxdb_fileID is used to identify the lock location, along with line#. */
99 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
100 #endif /* RX_LOCKS_DB */
101 /* rxi_ReadProc -- internal version.
102  *
103  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
104  */
105 int
106 rxi_ReadProc(register struct rx_call *call, register char *buf,
107              register int nbytes)
108 {
109     register struct rx_packet *cp = call->currentPacket;
110     register struct rx_packet *rp;
111     register int requestCount;
112     register unsigned int t;
113
114 /* XXXX took out clock_NewTime from here.  Was it needed? */
115     requestCount = nbytes;
116
117     /* Free any packets from the last call to ReadvProc/WritevProc */
118     if (queue_IsNotEmpty(&call->iovq)) {
119 #ifdef RXDEBUG_PACKET
120         call->iovqc -=
121 #endif /* RXDEBUG_PACKET */
122             rxi_FreePackets(0, &call->iovq);
123     }
124
125     do {
126         if (call->nLeft == 0) {
127             /* Get next packet */
128             for (;;) {
129                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
130                     if (call->error) {
131                         return 0;
132                     }
133                     if (call->mode == RX_MODE_SENDING) {
134                         rxi_FlushWrite(call);
135                         continue;
136                     }
137                 }
138                 if (queue_IsNotEmpty(&call->rq)) {
139                     /* Check that next packet available is next in sequence */
140                     rp = queue_First(&call->rq, rx_packet);
141                     if (rp->header.seq == call->rnext) {
142                         afs_int32 error;
143                         register struct rx_connection *conn = call->conn;
144                         queue_Remove(rp);
145                         rp->flags &= ~RX_PKTFLAG_RQ;
146 #ifdef RXDEBUG_PACKET
147                         call->rqc--;
148 #endif /* RXDEBUG_PACKET */
149
150                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
151                          * work.  It may reduce the length of the packet by up
152                          * to conn->maxTrailerSize, to reflect the length of the
153                          * data + the header. */
154                         if ((error =
155                              RXS_CheckPacket(conn->securityObject, call,
156                                              rp))) {
157                             /* Used to merely shut down the call, but now we 
158                              * shut down the whole connection since this may 
159                              * indicate an attempt to hijack it */
160
161                             MUTEX_EXIT(&call->lock);
162                             rxi_ConnectionError(conn, error);
163                             MUTEX_ENTER(&conn->conn_data_lock);
164                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
165                             MUTEX_EXIT(&conn->conn_data_lock);
166                             rxi_FreePacket(rp);
167                             MUTEX_ENTER(&call->lock);
168
169                             return 0;
170                         }
171                         call->rnext++;
172                         cp = call->currentPacket = rp;
173                         call->currentPacket->flags |= RX_PKTFLAG_CP;
174                         call->curvec = 1;       /* 0th vec is always header */
175                         /* begin at the beginning [ more or less ], continue 
176                          * on until the end, then stop. */
177                         call->curpos =
178                             (char *)cp->wirevec[1].iov_base +
179                             call->conn->securityHeaderSize;
180                         call->curlen =
181                             cp->wirevec[1].iov_len -
182                             call->conn->securityHeaderSize;
183
184                         /* Notice that this code works correctly if the data
185                          * size is 0 (which it may be--no reply arguments from
186                          * server, for example).  This relies heavily on the
187                          * fact that the code below immediately frees the packet
188                          * (no yields, etc.).  If it didn't, this would be a
189                          * problem because a value of zero for call->nLeft
190                          * normally means that there is no read packet */
191                         call->nLeft = cp->length;
192                         hadd32(call->bytesRcvd, cp->length);
193
194                         /* Send a hard ack for every rxi_HardAckRate+1 packets
195                          * consumed. Otherwise schedule an event to send
196                          * the hard ack later on.
197                          */
198                         call->nHardAcks++;
199                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
200                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
201                                 rxevent_Cancel(call->delayedAckEvent, call,
202                                                RX_CALL_REFCOUNT_DELAY);
203                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
204                             } else {
205                                 struct clock when, now;
206                                 clock_GetTime(&now);
207                                 when = now;
208                                 /* Delay to consolidate ack packets */
209                                 clock_Add(&when, &rx_hardAckDelay);
210                                 if (!call->delayedAckEvent
211                                     || clock_Gt(&call->delayedAckEvent->
212                                                 eventTime, &when)) {
213                                     rxevent_Cancel(call->delayedAckEvent,
214                                                    call,
215                                                    RX_CALL_REFCOUNT_DELAY);
216                                     CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
217                                     call->delayedAckEvent =
218                                       rxevent_PostNow(&when, &now,
219                                                      rxi_SendDelayedAck, call,
220                                                      0);
221                                 }
222                             }
223                         }
224                         break;
225                     }
226                 }
227
228 /*
229 MTUXXX  doesn't there need to be an "else" here ??? 
230 */
231                 /* Are there ever going to be any more packets? */
232                 if (call->flags & RX_CALL_RECEIVE_DONE) {
233                     return requestCount - nbytes;
234                 }
235                 /* Wait for in-sequence packet */
236                 call->flags |= RX_CALL_READER_WAIT;
237                 clock_NewTime();
238                 call->startWait = clock_Sec();
239                 while (call->flags & RX_CALL_READER_WAIT) {
240 #ifdef  RX_ENABLE_LOCKS
241                     CV_WAIT(&call->cv_rq, &call->lock);
242 #else
243                     osi_rxSleep(&call->rq);
244 #endif
245                 }
246
247                 call->startWait = 0;
248 #ifdef RX_ENABLE_LOCKS
249                 if (call->error) {
250                     return 0;
251                 }
252 #endif /* RX_ENABLE_LOCKS */
253             }
254         } else
255             /* assert(cp); */
256             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
257             /* yes, the following block is allowed to be the ELSE clause (or not) */
258             /* It's possible for call->nLeft to be smaller than any particular
259              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
260              * reflects the size of the buffer.  We have to keep track of the
261              * number of bytes read in the length field of the packet struct.  On
262              * the final portion of a received packet, it's almost certain that
263              * call->nLeft will be smaller than the final buffer. */
264             while (nbytes && cp) {
265                 t = MIN((int)call->curlen, nbytes);
266                 t = MIN(t, (int)call->nLeft);
267                 memcpy(buf, call->curpos, t);
268                 buf += t;
269                 nbytes -= t;
270                 call->curpos += t;
271                 call->curlen -= t;
272                 call->nLeft -= t;
273
274                 if (!call->nLeft) {
275                     /* out of packet.  Get another one. */
276                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
277                     rxi_FreePacket(cp);
278                     cp = call->currentPacket = (struct rx_packet *)0;
279                 } else if (!call->curlen) {
280                     /* need to get another struct iov */
281                     if (++call->curvec >= cp->niovecs) {
282                         /* current packet is exhausted, get ready for another */
283                         /* don't worry about curvec and stuff, they get set somewhere else */
284                         call->currentPacket->flags &= ~RX_PKTFLAG_CP;
285                         rxi_FreePacket(cp);
286                         cp = call->currentPacket = (struct rx_packet *)0;
287                         call->nLeft = 0;
288                     } else {
289                         call->curpos =
290                             (char *)cp->wirevec[call->curvec].iov_base;
291                         call->curlen = cp->wirevec[call->curvec].iov_len;
292                     }
293                 }
294             }
295         if (!nbytes) {
296             /* user buffer is full, return */
297             return requestCount;
298         }
299
300     } while (nbytes);
301
302     return requestCount;
303 }
304
305 int
306 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
307 {
308     int bytes;
309     int tcurlen;
310     int tnLeft;
311     char *tcurpos;
312     SPLVAR;
313
314     /*
315      * Free any packets from the last call to ReadvProc/WritevProc.
316      * We do not need the lock because the receiver threads only
317      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
318      * RX_CALL_IOVEC_WAIT is always cleared before returning from
319      * ReadvProc/WritevProc.
320      */
321     if (!queue_IsEmpty(&call->iovq)) {
322 #ifdef RXDEBUG_PACKET
323         call->iovqc -=
324 #endif /* RXDEBUG_PACKET */
325             rxi_FreePackets(0, &call->iovq);
326     }
327
328     /*
329      * Most common case, all of the data is in the current iovec.
330      * We do not need the lock because this is the only thread that
331      * updates the curlen, curpos, nLeft fields.
332      *
333      * We are relying on nLeft being zero unless the call is in receive mode.
334      */
335     tcurlen = call->curlen;
336     tnLeft = call->nLeft;
337     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
338         tcurpos = call->curpos;
339         memcpy(buf, tcurpos, nbytes);
340         call->curpos = tcurpos + nbytes;
341         call->curlen = tcurlen - nbytes;
342         call->nLeft = tnLeft - nbytes;
343
344         if (!call->nLeft) {
345             /* out of packet.  Get another one. */
346             NETPRI;
347             MUTEX_ENTER(&call->lock);
348             rxi_FreePacket(call->currentPacket);
349             call->currentPacket = (struct rx_packet *)0;
350             MUTEX_EXIT(&call->lock);
351             USERPRI;
352         }
353         return nbytes;
354     }
355
356     NETPRI;
357     MUTEX_ENTER(&call->lock);
358     bytes = rxi_ReadProc(call, buf, nbytes);
359     MUTEX_EXIT(&call->lock);
360     USERPRI;
361     return bytes;
362 }
363
364 /* Optimization for unmarshalling 32 bit integers */
365 int
366 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
367 {
368     int bytes;
369     int tcurlen;
370     int tnLeft;
371     char *tcurpos;
372     SPLVAR;
373
374     /*
375      * Free any packets from the last call to ReadvProc/WritevProc.
376      * We do not need the lock because the receiver threads only
377      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
378      * RX_CALL_IOVEC_WAIT is always cleared before returning from
379      * ReadvProc/WritevProc.
380      */
381     if (!queue_IsEmpty(&call->iovq)) {
382 #ifdef RXDEBUG_PACKET
383         call->iovqc -=
384 #endif /* RXDEBUG_PACKET */
385             rxi_FreePackets(0, &call->iovq);
386     }
387
388     /*
389      * Most common case, all of the data is in the current iovec.
390      * We do not need the lock because this is the only thread that
391      * updates the curlen, curpos, nLeft fields.
392      *
393      * We are relying on nLeft being zero unless the call is in receive mode.
394      */
395     tcurlen = call->curlen;
396     tnLeft = call->nLeft;
397     if (!call->error && tcurlen >= sizeof(afs_int32)
398         && tnLeft >= sizeof(afs_int32)) {
399         tcurpos = call->curpos;
400         memcpy((char *)value, tcurpos, sizeof(afs_int32));
401         call->curpos = tcurpos + sizeof(afs_int32);
402         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
403         call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
404         if (!call->nLeft && call->currentPacket != NULL) {
405             /* out of packet.  Get another one. */
406             NETPRI;
407             MUTEX_ENTER(&call->lock);
408             rxi_FreePacket(call->currentPacket);
409             call->currentPacket = (struct rx_packet *)0;
410             MUTEX_EXIT(&call->lock);
411             USERPRI;
412         }
413         return sizeof(afs_int32);
414     }
415
416     NETPRI;
417     MUTEX_ENTER(&call->lock);
418     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
419     MUTEX_EXIT(&call->lock);
420     USERPRI;
421     return bytes;
422 }
423
424 /* rxi_FillReadVec
425  *
426  * Uses packets in the receive queue to fill in as much of the
427  * current iovec as possible. Does not block if it runs out
428  * of packets to complete the iovec. Return true if an ack packet
429  * was sent, otherwise return false */
430 int
431 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
432 {
433     int didConsume = 0;
434     int didHardAck = 0;
435     register unsigned int t;
436     struct rx_packet *rp;
437     struct rx_packet *curp;
438     struct iovec *call_iov;
439     struct iovec *cur_iov = NULL;
440
441     curp = call->currentPacket;
442     if (curp) {
443         cur_iov = &curp->wirevec[call->curvec];
444     }
445     call_iov = &call->iov[call->iovNext];
446
447     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
448         if (call->nLeft == 0) {
449             /* Get next packet */
450             if (queue_IsNotEmpty(&call->rq)) {
451                 /* Check that next packet available is next in sequence */
452                 rp = queue_First(&call->rq, rx_packet);
453                 if (rp->header.seq == call->rnext) {
454                     afs_int32 error;
455                     register struct rx_connection *conn = call->conn;
456                     queue_Remove(rp);
457                     rp->flags &= ~RX_PKTFLAG_RQ;
458 #ifdef RXDEBUG_PACKET
459                     call->rqc--;
460 #endif /* RXDEBUG_PACKET */
461
462                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
463                      * work.  It may reduce the length of the packet by up
464                      * to conn->maxTrailerSize, to reflect the length of the
465                      * data + the header. */
466                     if ((error =
467                          RXS_CheckPacket(conn->securityObject, call, rp))) {
468                         /* Used to merely shut down the call, but now we 
469                          * shut down the whole connection since this may 
470                          * indicate an attempt to hijack it */
471
472                         MUTEX_EXIT(&call->lock);
473                         rxi_ConnectionError(conn, error);
474                         MUTEX_ENTER(&conn->conn_data_lock);
475                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
476                         MUTEX_EXIT(&conn->conn_data_lock);
477                         rxi_FreePacket(rp);
478                         MUTEX_ENTER(&call->lock);
479
480                         return 1;
481                     }
482                     call->rnext++;
483                     curp = call->currentPacket = rp;
484                     call->currentPacket->flags |= RX_PKTFLAG_CP;
485                     call->curvec = 1;   /* 0th vec is always header */
486                     cur_iov = &curp->wirevec[1];
487                     /* begin at the beginning [ more or less ], continue 
488                      * on until the end, then stop. */
489                     call->curpos =
490                         (char *)curp->wirevec[1].iov_base +
491                         call->conn->securityHeaderSize;
492                     call->curlen =
493                         curp->wirevec[1].iov_len -
494                         call->conn->securityHeaderSize;
495
496                     /* Notice that this code works correctly if the data
497                      * size is 0 (which it may be--no reply arguments from
498                      * server, for example).  This relies heavily on the
499                      * fact that the code below immediately frees the packet
500                      * (no yields, etc.).  If it didn't, this would be a
501                      * problem because a value of zero for call->nLeft
502                      * normally means that there is no read packet */
503                     call->nLeft = curp->length;
504                     hadd32(call->bytesRcvd, curp->length);
505
506                     /* Send a hard ack for every rxi_HardAckRate+1 packets
507                      * consumed. Otherwise schedule an event to send
508                      * the hard ack later on.
509                      */
510                     call->nHardAcks++;
511                     didConsume = 1;
512                     continue;
513                 }
514             }
515             break;
516         }
517
518         /* It's possible for call->nLeft to be smaller than any particular
519          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
520          * reflects the size of the buffer.  We have to keep track of the
521          * number of bytes read in the length field of the packet struct.  On
522          * the final portion of a received packet, it's almost certain that
523          * call->nLeft will be smaller than the final buffer. */
524         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
525
526             t = MIN((int)call->curlen, call->iovNBytes);
527             t = MIN(t, (int)call->nLeft);
528             call_iov->iov_base = call->curpos;
529             call_iov->iov_len = t;
530             call_iov++;
531             call->iovNext++;
532             call->iovNBytes -= t;
533             call->curpos += t;
534             call->curlen -= t;
535             call->nLeft -= t;
536
537             if (!call->nLeft) {
538                 /* out of packet.  Get another one. */
539                 curp->flags &= ~RX_PKTFLAG_CP;
540                 curp->flags |= RX_PKTFLAG_IOVQ;
541                 queue_Append(&call->iovq, curp);
542 #ifdef RXDEBUG_PACKET
543                 call->iovqc++;
544 #endif /* RXDEBUG_PACKET */
545                 curp = call->currentPacket = (struct rx_packet *)0;
546             } else if (!call->curlen) {
547                 /* need to get another struct iov */
548                 if (++call->curvec >= curp->niovecs) {
549                     /* current packet is exhausted, get ready for another */
550                     /* don't worry about curvec and stuff, they get set somewhere else */
551                     curp->flags &= ~RX_PKTFLAG_CP;
552                     curp->flags |= RX_PKTFLAG_IOVQ;
553                     queue_Append(&call->iovq, curp);
554 #ifdef RXDEBUG_PACKET
555                     call->iovqc++;
556 #endif /* RXDEBUG_PACKET */
557                     curp = call->currentPacket = (struct rx_packet *)0;
558                     call->nLeft = 0;
559                 } else {
560                     cur_iov++;
561                     call->curpos = (char *)cur_iov->iov_base;
562                     call->curlen = cur_iov->iov_len;
563                 }
564             }
565         }
566     }
567
568     /* If we consumed any packets then check whether we need to
569      * send a hard ack. */
570     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
571         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
572             rxevent_Cancel(call->delayedAckEvent, call,
573                            RX_CALL_REFCOUNT_DELAY);
574             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
575             didHardAck = 1;
576         } else {
577             struct clock when, now;
578             clock_GetTime(&now);
579             when = now;
580             /* Delay to consolidate ack packets */
581             clock_Add(&when, &rx_hardAckDelay);
582             if (!call->delayedAckEvent
583                 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
584                 rxevent_Cancel(call->delayedAckEvent, call,
585                                RX_CALL_REFCOUNT_DELAY);
586                 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
587                 call->delayedAckEvent =
588                     rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
589             }
590         }
591     }
592     return didHardAck;
593 }
594
595
596 /* rxi_ReadvProc -- internal version.
597  *
598  * Fills in an iovec with pointers to the packet buffers. All packets
599  * except the last packet (new current packet) are moved to the iovq
600  * while the application is processing the data.
601  *
602  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
603  */
604 int
605 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
606               int nbytes)
607 {
608     int requestCount;
609     int nextio;
610
611     requestCount = nbytes;
612     nextio = 0;
613
614     /* Free any packets from the last call to ReadvProc/WritevProc */
615     if (queue_IsNotEmpty(&call->iovq)) {
616 #ifdef RXDEBUG_PACKET
617         call->iovqc -=
618 #endif /* RXDEBUG_PACKET */
619             rxi_FreePackets(0, &call->iovq);
620     }
621
622     if (call->mode == RX_MODE_SENDING) {
623         rxi_FlushWrite(call);
624     }
625
626     if (call->error) {
627         return 0;
628     }
629
630     /* Get whatever data is currently available in the receive queue.
631      * If rxi_FillReadVec sends an ack packet then it is possible
632      * that we will receive more data while we drop the call lock
633      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
634      * here to avoid a race with the receive thread if we send
635      * hard acks in rxi_FillReadVec. */
636     call->flags |= RX_CALL_IOVEC_WAIT;
637     call->iovNBytes = nbytes;
638     call->iovMax = maxio;
639     call->iovNext = 0;
640     call->iov = iov;
641     rxi_FillReadVec(call, 0);
642
643     /* if we need more data then sleep until the receive thread has
644      * filled in the rest. */
645     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
646         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
647         call->flags |= RX_CALL_READER_WAIT;
648         clock_NewTime();
649         call->startWait = clock_Sec();
650         while (call->flags & RX_CALL_READER_WAIT) {
651 #ifdef  RX_ENABLE_LOCKS
652             CV_WAIT(&call->cv_rq, &call->lock);
653 #else
654             osi_rxSleep(&call->rq);
655 #endif
656         }
657         call->startWait = 0;
658     }
659     call->flags &= ~RX_CALL_IOVEC_WAIT;
660 #ifdef RX_ENABLE_LOCKS
661     if (call->error) {
662         return 0;
663     }
664 #endif /* RX_ENABLE_LOCKS */
665
666     call->iov = NULL;
667     *nio = call->iovNext;
668     return nbytes - call->iovNBytes;
669 }
670
671 int
672 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
673              int nbytes)
674 {
675     int bytes;
676     SPLVAR;
677
678     NETPRI;
679     MUTEX_ENTER(&call->lock);
680     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
681     MUTEX_EXIT(&call->lock);
682     USERPRI;
683     return bytes;
684 }
685
686 /* rxi_WriteProc -- internal version.
687  *
688  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
689
690 int
691 rxi_WriteProc(register struct rx_call *call, register char *buf,
692               register int nbytes)
693 {
694     struct rx_connection *conn = call->conn;
695     register struct rx_packet *cp = call->currentPacket;
696     register unsigned int t;
697     int requestCount = nbytes;
698
699     /* Free any packets from the last call to ReadvProc/WritevProc */
700     if (queue_IsNotEmpty(&call->iovq)) {
701 #ifdef RXDEBUG_PACKET
702         call->iovqc -=
703 #endif /* RXDEBUG_PACKET */
704             rxi_FreePackets(0, &call->iovq);
705     }
706
707     if (call->mode != RX_MODE_SENDING) {
708         if ((conn->type == RX_SERVER_CONNECTION)
709             && (call->mode == RX_MODE_RECEIVING)) {
710             call->mode = RX_MODE_SENDING;
711             if (cp) {
712                 cp->flags &= ~RX_PKTFLAG_CP;
713                 rxi_FreePacket(cp);
714                 cp = call->currentPacket = (struct rx_packet *)0;
715                 call->nLeft = 0;
716                 call->nFree = 0;
717             }
718         } else {
719             return 0;
720         }
721     }
722
723     /* Loop condition is checked at end, so that a write of 0 bytes
724      * will force a packet to be created--specially for the case where
725      * there are 0 bytes on the stream, but we must send a packet
726      * anyway. */
727     do {
728         if (call->nFree == 0) {
729             if (!call->error && cp) {
730                 /* Clear the current packet now so that if
731                  * we are forced to wait and drop the lock 
732                  * the packet we are planning on using 
733                  * cannot be freed.
734                  */
735                 cp->flags &= ~RX_PKTFLAG_CP;
736                 call->currentPacket = (struct rx_packet *)0;
737 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
738                 /* Wait until TQ_BUSY is reset before adding any
739                  * packets to the transmit queue
740                  */
741                 while (call->flags & RX_CALL_TQ_BUSY) {
742                     call->flags |= RX_CALL_TQ_WAIT;
743 #ifdef RX_ENABLE_LOCKS
744                     CV_WAIT(&call->cv_tq, &call->lock);
745 #else /* RX_ENABLE_LOCKS */
746                     osi_rxSleep(&call->tq);
747 #endif /* RX_ENABLE_LOCKS */
748                 }
749 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
750                 clock_NewTime();        /* Bogus:  need new time package */
751                 /* The 0, below, specifies that it is not the last packet: 
752                  * there will be others. PrepareSendPacket may
753                  * alter the packet length by up to
754                  * conn->securityMaxTrailerSize */
755                 hadd32(call->bytesSent, cp->length);
756                 rxi_PrepareSendPacket(call, cp, 0);
757                 cp->flags |= RX_PKTFLAG_TQ;
758                 queue_Append(&call->tq, cp);
759 #ifdef RXDEBUG_PACKET
760                 call->tqc++;
761 #endif /* RXDEBUG_PACKET */
762                 cp = (struct rx_packet *)0;
763                 if (!
764                     (call->
765                      flags & (RX_CALL_FAST_RECOVER |
766                               RX_CALL_FAST_RECOVER_WAIT))) {
767                     rxi_Start(0, call, 0, 0);
768                 }
769             } else if (cp) {
770                 cp->flags &= ~RX_PKTFLAG_CP;
771                 rxi_FreePacket(cp);
772                 cp = call->currentPacket = (struct rx_packet *)0;
773             }
774             /* Wait for transmit window to open up */
775             while (!call->error
776                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
777                 clock_NewTime();
778                 call->startWait = clock_Sec();
779
780 #ifdef  RX_ENABLE_LOCKS
781                 CV_WAIT(&call->cv_twind, &call->lock);
782 #else
783                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
784                 osi_rxSleep(&call->twind);
785 #endif
786
787                 call->startWait = 0;
788 #ifdef RX_ENABLE_LOCKS
789                 if (call->error) {
790                     return 0;
791                 }
792 #endif /* RX_ENABLE_LOCKS */
793             }
794             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
795                 cp->flags |= RX_PKTFLAG_CP;
796                 call->currentPacket = cp;
797                 call->nFree = cp->length;
798                 call->curvec = 1;       /* 0th vec is always header */
799                 /* begin at the beginning [ more or less ], continue 
800                  * on until the end, then stop. */
801                 call->curpos =
802                     (char *)cp->wirevec[1].iov_base +
803                     call->conn->securityHeaderSize;
804                 call->curlen =
805                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
806             }
807             if (call->error) {
808                 if (cp) {
809                     cp->flags &= ~RX_PKTFLAG_CP;
810                     rxi_FreePacket(cp);
811                     call->currentPacket = NULL;
812                 }
813                 return 0;
814             }
815         }
816
817         if (cp && (int)call->nFree < nbytes) {
818             /* Try to extend the current buffer */
819             register int len, mud;
820             len = cp->length;
821             mud = rx_MaxUserDataSize(call);
822             if (mud > len) {
823                 int want;
824                 want = MIN(nbytes - (int)call->nFree, mud - len);
825                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
826                 if (cp->length > (unsigned)mud)
827                     cp->length = mud;
828                 call->nFree += (cp->length - len);
829             }
830         }
831
832         /* If the remaining bytes fit in the buffer, then store them
833          * and return.  Don't ship a buffer that's full immediately to
834          * the peer--we don't know if it's the last buffer yet */
835
836         if (!cp) {
837             call->nFree = 0;
838         }
839
840         while (nbytes && call->nFree) {
841
842             t = MIN((int)call->curlen, nbytes);
843             t = MIN((int)call->nFree, t);
844             memcpy(call->curpos, buf, t);
845             buf += t;
846             nbytes -= t;
847             call->curpos += t;
848             call->curlen -= (u_short)t;
849             call->nFree -= (u_short)t;
850
851             if (!call->curlen) {
852                 /* need to get another struct iov */
853                 if (++call->curvec >= cp->niovecs) {
854                     /* current packet is full, extend or send it */
855                     call->nFree = 0;
856                 } else {
857                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
858                     call->curlen = cp->wirevec[call->curvec].iov_len;
859                 }
860             }
861         }                       /* while bytes to send and room to send them */
862
863         /* might be out of space now */
864         if (!nbytes) {
865             return requestCount;
866         } else;                 /* more data to send, so get another packet and keep going */
867     } while (nbytes);
868
869     return requestCount - nbytes;
870 }
871
872 int
873 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
874 {
875     int bytes;
876     int tcurlen;
877     int tnFree;
878     char *tcurpos;
879     SPLVAR;
880
881     /*
882      * Free any packets from the last call to ReadvProc/WritevProc.
883      * We do not need the lock because the receiver threads only
884      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
885      * RX_CALL_IOVEC_WAIT is always cleared before returning from
886      * ReadvProc/WritevProc.
887      */
888     if (queue_IsNotEmpty(&call->iovq)) {
889 #ifdef RXDEBUG_PACKET
890         call->iovqc -=
891 #endif /* RXDEBUG_PACKET */
892             rxi_FreePackets(0, &call->iovq);
893     }
894
895     /*
896      * Most common case: all of the data fits in the current iovec.
897      * We do not need the lock because this is the only thread that
898      * updates the curlen, curpos, nFree fields.
899      *
900      * We are relying on nFree being zero unless the call is in send mode.
901      */
902     tcurlen = (int)call->curlen;
903     tnFree = (int)call->nFree;
904     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
905         tcurpos = call->curpos;
906         memcpy(tcurpos, buf, nbytes);
907         call->curpos = tcurpos + nbytes;
908         call->curlen = (u_short)(tcurlen - nbytes);
909         call->nFree = (u_short)(tnFree - nbytes);
910         return nbytes;
911     }
912
913     NETPRI;
914     MUTEX_ENTER(&call->lock);
915     bytes = rxi_WriteProc(call, buf, nbytes);
916     MUTEX_EXIT(&call->lock);
917     USERPRI;
918     return bytes;
919 }
920
921 /* Optimization for marshalling 32 bit arguments */
922 int
923 rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
924 {
925     int bytes;
926     int tcurlen;
927     int tnFree;
928     char *tcurpos;
929     SPLVAR;
930
931     /*
932      * Free any packets from the last call to ReadvProc/WritevProc.
933      * We do not need the lock because the receiver threads only
934      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
935      * RX_CALL_IOVEC_WAIT is always cleared before returning from
936      * ReadvProc/WritevProc.
937      */
938     if (queue_IsNotEmpty(&call->iovq)) {
939 #ifdef RXDEBUG_PACKET
940         call->iovqc -=
941 #endif /* RXDEBUG_PACKET */
942             rxi_FreePackets(0, &call->iovq);
943     }
944
945     /*
946      * Most common case: all of the data fits in the current iovec.
947      * We do not need the lock because this is the only thread that
948      * updates the curlen, curpos, nFree fields.
949      *
950      * We are relying on nFree being zero unless the call is in send mode.
951      */
952     tcurlen = call->curlen;
953     tnFree = call->nFree;
954     if (!call->error && tcurlen >= sizeof(afs_int32)
955         && tnFree >= sizeof(afs_int32)) {
956         tcurpos = call->curpos;
957         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
958             *((afs_int32 *) (tcurpos)) = *value;
959         } else {
960             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
961         }
962         call->curpos = tcurpos + sizeof(afs_int32);
963         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
964         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
965         return sizeof(afs_int32);
966     }
967
968     NETPRI;
969     MUTEX_ENTER(&call->lock);
970     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
971     MUTEX_EXIT(&call->lock);
972     USERPRI;
973     return bytes;
974 }
975
976 /* rxi_WritevAlloc -- internal version.
977  *
978  * Fill in an iovec to point to data in packet buffers. The application
979  * calls rxi_WritevProc when the buffers are full.
980  *
981  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
982
983 int
984 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
985                 int nbytes)
986 {
987     struct rx_connection *conn = call->conn;
988     struct rx_packet *cp = call->currentPacket;
989     int requestCount;
990     int nextio;
991     /* Temporary values, real work is done in rxi_WritevProc */
992     int tnFree;
993     int tcurvec;
994     char *tcurpos;
995     int tcurlen;
996
997     requestCount = nbytes;
998     nextio = 0;
999
1000     /* Free any packets from the last call to ReadvProc/WritevProc */
1001     if (queue_IsNotEmpty(&call->iovq)) {
1002 #ifdef RXDEBUG_PACKET
1003         call->iovqc -=
1004 #endif /* RXDEBUG_PACKET */
1005             rxi_FreePackets(0, &call->iovq);
1006     }
1007
1008     if (call->mode != RX_MODE_SENDING) {
1009         if ((conn->type == RX_SERVER_CONNECTION)
1010             && (call->mode == RX_MODE_RECEIVING)) {
1011             call->mode = RX_MODE_SENDING;
1012             if (cp) {
1013                 cp->flags &= ~RX_PKTFLAG_CP;
1014                 rxi_FreePacket(cp);
1015                 cp = call->currentPacket = (struct rx_packet *)0;
1016                 call->nLeft = 0;
1017                 call->nFree = 0;
1018             }
1019         } else {
1020             return 0;
1021         }
1022     }
1023
1024     /* Set up the iovec to point to data in packet buffers. */
1025     tnFree = call->nFree;
1026     tcurvec = call->curvec;
1027     tcurpos = call->curpos;
1028     tcurlen = call->curlen;
1029     do {
1030         register unsigned int t;
1031
1032         if (tnFree == 0) {
1033             /* current packet is full, allocate a new one */
1034             cp = rxi_AllocSendPacket(call, nbytes);
1035             if (cp == NULL) {
1036                 /* out of space, return what we have */
1037                 *nio = nextio;
1038                 return requestCount - nbytes;
1039             }
1040             cp->flags |= RX_PKTFLAG_IOVQ;
1041             queue_Append(&call->iovq, cp);
1042 #ifdef RXDEBUG_PACKET
1043             call->iovqc++;
1044 #endif /* RXDEBUG_PACKET */
1045             tnFree = cp->length;
1046             tcurvec = 1;
1047             tcurpos =
1048                 (char *)cp->wirevec[1].iov_base +
1049                 call->conn->securityHeaderSize;
1050             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1051         }
1052
1053         if (tnFree < nbytes) {
1054             /* try to extend the current packet */
1055             register int len, mud;
1056             len = cp->length;
1057             mud = rx_MaxUserDataSize(call);
1058             if (mud > len) {
1059                 int want;
1060                 want = MIN(nbytes - tnFree, mud - len);
1061                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1062                 if (cp->length > (unsigned)mud)
1063                     cp->length = mud;
1064                 tnFree += (cp->length - len);
1065                 if (cp == call->currentPacket) {
1066                     call->nFree += (cp->length - len);
1067                 }
1068             }
1069         }
1070
1071         /* fill in the next entry in the iovec */
1072         t = MIN(tcurlen, nbytes);
1073         t = MIN(tnFree, t);
1074         iov[nextio].iov_base = tcurpos;
1075         iov[nextio].iov_len = t;
1076         nbytes -= t;
1077         tcurpos += t;
1078         tcurlen -= t;
1079         tnFree -= t;
1080         nextio++;
1081
1082         if (!tcurlen) {
1083             /* need to get another struct iov */
1084             if (++tcurvec >= cp->niovecs) {
1085                 /* current packet is full, extend it or move on to next packet */
1086                 tnFree = 0;
1087             } else {
1088                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1089                 tcurlen = cp->wirevec[tcurvec].iov_len;
1090             }
1091         }
1092     } while (nbytes && nextio < maxio);
1093     *nio = nextio;
1094     return requestCount - nbytes;
1095 }
1096
1097 int
1098 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1099                int nbytes)
1100 {
1101     int bytes;
1102     SPLVAR;
1103
1104     NETPRI;
1105     MUTEX_ENTER(&call->lock);
1106     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1107     MUTEX_EXIT(&call->lock);
1108     USERPRI;
1109     return bytes;
1110 }
1111
1112 /* rxi_WritevProc -- internal version.
1113  *
1114  * Send buffers allocated in rxi_WritevAlloc.
1115  *
1116  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1117
1118 int
1119 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1120 {
1121     struct rx_packet *cp = call->currentPacket;
1122     struct rx_call *p, *np;
1123     int nextio;
1124     int requestCount;
1125     struct rx_queue tmpq;
1126 #ifdef RXDEBUG_PACKET
1127     u_short tmpqc;
1128 #endif
1129
1130     requestCount = nbytes;
1131     nextio = 0;
1132
1133     if (call->mode != RX_MODE_SENDING) {
1134         call->error = RX_PROTOCOL_ERROR;
1135     }
1136 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1137     /* Wait until TQ_BUSY is reset before trying to move any
1138      * packets to the transmit queue.  */
1139     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1140         call->flags |= RX_CALL_TQ_WAIT;
1141 #ifdef RX_ENABLE_LOCKS
1142         CV_WAIT(&call->cv_tq, &call->lock);
1143 #else /* RX_ENABLE_LOCKS */
1144         osi_rxSleep(&call->tq);
1145 #endif /* RX_ENABLE_LOCKS */
1146     }
1147 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1148
1149     if (call->error) {
1150         if (cp) {
1151             cp->flags &= ~RX_PKTFLAG_CP;
1152             cp->flags |= RX_PKTFLAG_IOVQ;
1153             queue_Prepend(&call->iovq, cp);
1154 #ifdef RXDEBUG_PACKET
1155             call->iovqc++;
1156 #endif /* RXDEBUG_PACKET */
1157             cp = call->currentPacket = (struct rx_packet *)0;
1158         }
1159 #ifdef RXDEBUG_PACKET
1160         call->iovqc -=
1161 #endif /* RXDEBUG_PACKET */
1162             rxi_FreePackets(0, &call->iovq);
1163         return 0;
1164     }
1165
1166     /* Loop through the I/O vector adjusting packet pointers.
1167      * Place full packets back onto the iovq once they are ready
1168      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1169      * the iovec. We put the loop condition at the end to ensure that
1170      * a zero length write will push a short packet. */
1171     nextio = 0;
1172     queue_Init(&tmpq);
1173 #ifdef RXDEBUG_PACKET
1174     tmpqc = 0;
1175 #endif /* RXDEBUG_PACKET */
1176     do {
1177         if (call->nFree == 0 && cp) {
1178             clock_NewTime();    /* Bogus:  need new time package */
1179             /* The 0, below, specifies that it is not the last packet: 
1180              * there will be others. PrepareSendPacket may
1181              * alter the packet length by up to
1182              * conn->securityMaxTrailerSize */
1183             hadd32(call->bytesSent, cp->length);
1184             rxi_PrepareSendPacket(call, cp, 0);
1185             queue_Append(&tmpq, cp);
1186 #ifdef RXDEBUG_PACKET
1187             tmpqc++;
1188 #endif /* RXDEBUG_PACKET */
1189             cp = call->currentPacket = (struct rx_packet *)0;
1190
1191             /* The head of the iovq is now the current packet */
1192             if (nbytes) {
1193                 if (queue_IsEmpty(&call->iovq)) {
1194                     call->error = RX_PROTOCOL_ERROR;
1195 #ifdef RXDEBUG_PACKET
1196                     tmpqc -=
1197 #endif /* RXDEBUG_PACKET */
1198                         rxi_FreePackets(0, &tmpq);
1199                     return 0;
1200                 }
1201                 cp = queue_First(&call->iovq, rx_packet);
1202                 queue_Remove(cp);
1203                 cp->flags &= ~RX_PKTFLAG_IOVQ;
1204 #ifdef RXDEBUG_PACKET
1205                 call->iovqc--;
1206 #endif /* RXDEBUG_PACKET */
1207                 cp->flags |= RX_PKTFLAG_CP;
1208                 call->currentPacket = cp;
1209                 call->nFree = cp->length;
1210                 call->curvec = 1;
1211                 call->curpos =
1212                     (char *)cp->wirevec[1].iov_base +
1213                     call->conn->securityHeaderSize;
1214                 call->curlen =
1215                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1216             }
1217         }
1218
1219         if (nbytes) {
1220             /* The next iovec should point to the current position */
1221             if (iov[nextio].iov_base != call->curpos
1222                 || iov[nextio].iov_len > (int)call->curlen) {
1223                 call->error = RX_PROTOCOL_ERROR;
1224                 if (cp) {
1225                     cp->flags &= ~RX_PKTFLAG_CP;
1226                     queue_Prepend(&tmpq, cp);
1227 #ifdef RXDEBUG_PACKET
1228                     tmpqc++;
1229 #endif /* RXDEBUG_PACKET */
1230                     cp = call->currentPacket = (struct rx_packet *)0;
1231                 }
1232 #ifdef RXDEBUG_PACKET
1233                 tmpqc -=
1234 #endif /* RXDEBUG_PACKET */
1235                     rxi_FreePackets(0, &tmpq);
1236                 return 0;
1237             }
1238             nbytes -= iov[nextio].iov_len;
1239             call->curpos += iov[nextio].iov_len;
1240             call->curlen -= iov[nextio].iov_len;
1241             call->nFree -= iov[nextio].iov_len;
1242             nextio++;
1243             if (call->curlen == 0) {
1244                 if (++call->curvec > cp->niovecs) {
1245                     call->nFree = 0;
1246                 } else {
1247                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1248                     call->curlen = cp->wirevec[call->curvec].iov_len;
1249                 }
1250             }
1251         }
1252     } while (nbytes && nextio < nio);
1253
1254     /* Move the packets from the temporary queue onto the transmit queue.
1255      * We may end up with more than call->twind packets on the queue. */
1256     
1257 #ifdef KDUMP_RX_LOCK
1258     for (queue_Scan(&tmpq, p, np, rx_call_rx_lock))
1259 #else
1260     for (queue_Scan(&tmpq, p, np, rx_call))
1261 #endif
1262     {
1263         p->flags |= RX_PKTFLAG_TQ;
1264     }
1265     queue_SpliceAppend(&call->tq, &tmpq);
1266
1267     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1268         rxi_Start(0, call, 0, 0);
1269     }
1270
1271     /* Wait for the length of the transmit queue to fall below call->twind */
1272     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1273         clock_NewTime();
1274         call->startWait = clock_Sec();
1275 #ifdef  RX_ENABLE_LOCKS
1276         CV_WAIT(&call->cv_twind, &call->lock);
1277 #else
1278         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1279         osi_rxSleep(&call->twind);
1280 #endif
1281         call->startWait = 0;
1282     }
1283
1284     if (call->error) {
1285         if (cp) {
1286             cp->flags &= ~RX_PKTFLAG_CP;
1287             rxi_FreePacket(cp);
1288             cp = call->currentPacket = (struct rx_packet *)0;
1289         }
1290         return 0;
1291     }
1292
1293     return requestCount - nbytes;
1294 }
1295
1296 int
1297 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1298 {
1299     int bytes;
1300     SPLVAR;
1301
1302     NETPRI;
1303     MUTEX_ENTER(&call->lock);
1304     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1305     MUTEX_EXIT(&call->lock);
1306     USERPRI;
1307     return bytes;
1308 }
1309
1310 /* Flush any buffered data to the stream, switch to read mode
1311  * (clients) or to EOF mode (servers) */
1312 void
1313 rxi_FlushWrite(register struct rx_call *call)
1314 {
1315     register struct rx_packet *cp = call->currentPacket;
1316
1317     /* Free any packets from the last call to ReadvProc/WritevProc */
1318     if (queue_IsNotEmpty(&call->iovq)) {
1319 #ifdef RXDEBUG_PACKET
1320         call->iovqc -=
1321 #endif /* RXDEBUG_PACKET */
1322             rxi_FreePackets(0, &call->iovq);
1323     }
1324
1325     if (call->mode == RX_MODE_SENDING) {
1326
1327         call->mode =
1328             (call->conn->type ==
1329              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1330
1331 #ifdef RX_KERNEL_TRACE
1332         {
1333             int glockOwner = ISAFS_GLOCK();
1334             if (!glockOwner)
1335                 AFS_GLOCK();
1336             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1337                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1338                        call);
1339             if (!glockOwner)
1340                 AFS_GUNLOCK();
1341         }
1342 #endif
1343
1344 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1345         /* Wait until TQ_BUSY is reset before adding any
1346          * packets to the transmit queue
1347          */
1348         while (call->flags & RX_CALL_TQ_BUSY) {
1349             call->flags |= RX_CALL_TQ_WAIT;
1350 #ifdef RX_ENABLE_LOCKS
1351             CV_WAIT(&call->cv_tq, &call->lock);
1352 #else /* RX_ENABLE_LOCKS */
1353             osi_rxSleep(&call->tq);
1354 #endif /* RX_ENABLE_LOCKS */
1355         }
1356 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1357
1358         if (cp) {
1359             /* cp->length is only supposed to be the user's data */
1360             /* cp->length was already set to (then-current) 
1361              * MaxUserDataSize or less. */
1362             cp->flags &= ~RX_PKTFLAG_CP;
1363             cp->length -= call->nFree;
1364             call->currentPacket = (struct rx_packet *)0;
1365             call->nFree = 0;
1366         } else {
1367             cp = rxi_AllocSendPacket(call, 0);
1368             if (!cp) {
1369                 /* Mode can no longer be MODE_SENDING */
1370                 return;
1371             }
1372             cp->length = 0;
1373             cp->niovecs = 2;    /* header + space for rxkad stuff */
1374             call->nFree = 0;
1375         }
1376
1377         /* The 1 specifies that this is the last packet */
1378         hadd32(call->bytesSent, cp->length);
1379         rxi_PrepareSendPacket(call, cp, 1);
1380         cp->flags |= RX_PKTFLAG_TQ;
1381         queue_Append(&call->tq, cp);
1382 #ifdef RXDEBUG_PACKET
1383         call->tqc++;
1384 #endif /* RXDEBUG_PACKET */
1385         if (!
1386             (call->
1387              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1388             rxi_Start(0, call, 0, 0);
1389         }
1390     }
1391 }
1392
1393 /* Flush any buffered data to the stream, switch to read mode
1394  * (clients) or to EOF mode (servers) */
1395 void
1396 rx_FlushWrite(struct rx_call *call)
1397 {
1398     SPLVAR;
1399     NETPRI;
1400     MUTEX_ENTER(&call->lock);
1401     rxi_FlushWrite(call);
1402     MUTEX_EXIT(&call->lock);
1403     USERPRI;
1404 }