rx: Reduce dependence on call->lock
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   *
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17
18 #ifdef KERNEL
19 #ifndef UKERNEL
20 #ifdef RX_KERNEL_TRACE
21 #include "rx_kcommon.h"
22 #endif
23 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
24 #include "afs/sysincludes.h"
25 #else
26 #include "h/types.h"
27 #include "h/time.h"
28 #include "h/stat.h"
29 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
30 #include "h/systm.h"
31 #endif
32 #ifdef  AFS_OSF_ENV
33 #include <net/net_globals.h>
34 #endif /* AFS_OSF_ENV */
35 #ifdef AFS_LINUX20_ENV
36 #include "h/socket.h"
37 #endif
38 #include "netinet/in.h"
39 #if defined(AFS_SGI_ENV)
40 #include "afs/sysincludes.h"
41 #endif
42 #endif
43 #include "afs/afs_args.h"
44 #include "afs/afs_osi.h"
45 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
46 #include "h/systm.h"
47 #endif
48 #else /* !UKERNEL */
49 #include "afs/sysincludes.h"
50 #endif /* !UKERNEL */
51 #ifdef RXDEBUG
52 #undef RXDEBUG                  /* turn off debugging */
53 #endif /* RXDEBUG */
54
55 #include "rx_kmutex.h"
56 #include "rx/rx_kernel.h"
57 #include "rx/rx_clock.h"
58 #include "rx/rx_queue.h"
59 #include "rx/rx.h"
60 #include "rx/rx_globals.h"
61 #include "afs/lock.h"
62 #include "afsint.h"
63 #ifdef  AFS_OSF_ENV
64 #undef kmem_alloc
65 #undef kmem_free
66 #undef mem_alloc
67 #undef mem_free
68 #endif /* AFS_OSF_ENV */
69 #else /* KERNEL */
70 # include <sys/types.h>
71 #ifdef AFS_NT40_ENV
72 # include <winsock2.h>
73 #else /* !AFS_NT40_ENV */
74 # include <sys/socket.h>
75 # include <sys/file.h>
76 # include <netdb.h>
77 # include <netinet/in.h>
78 # include <sys/stat.h>
79 # include <sys/time.h>
80 #endif /* !AFS_NT40_ENV */
81 #include <string.h>
82 #ifdef HAVE_UNISTD_H
83 #include <unistd.h>
84 #endif
85 # include "rx_user.h"
86 # include "rx_clock.h"
87 # include "rx_queue.h"
88 # include "rx.h"
89 # include "rx_globals.h"
90 #endif /* KERNEL */
91
92 #ifdef RX_LOCKS_DB
93 /* rxdb_fileID is used to identify the lock location, along with line#. */
94 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
95 #endif /* RX_LOCKS_DB */
/* rxi_ReadProc -- internal version.
 *
 * Copy up to nbytes of received call data into buf, consuming packets
 * from the call's receive queue in sequence order.  Blocks waiting for
 * in-sequence packets when necessary.  Returns the number of bytes
 * actually copied (the full request unless the peer signalled
 * RX_CALL_RECEIVE_DONE early), or 0 on call/connection error.
 *
 * LOCKS USED -- called at netpri.  call->lock is taken only while
 * refilling call->currentPacket; the copy phase below runs without it.
 */
int
rxi_ReadProc(struct rx_call *call, char *buf,
	     int nbytes)
{
    struct rx_packet *cp = call->currentPacket;
    struct rx_packet *rp;
    int requestCount;
    unsigned int t;

/* XXXX took out clock_NewTime from here.  Was it needed? */
    requestCount = nbytes;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (queue_IsNotEmpty(&call->iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->iovq);
    }

    do {
	if (call->nLeft == 0) {
	    /* Current packet exhausted (or none yet): get the next
	     * in-sequence packet under call->lock. */
	    MUTEX_ENTER(&call->lock);
	    for (;;) {
		if (call->error || (call->mode != RX_MODE_RECEIVING)) {
		    if (call->error) {
			call->mode = RX_MODE_ERROR;
			MUTEX_EXIT(&call->lock);
			return 0;
		    }
		    if (call->mode == RX_MODE_SENDING) {
			/* Turn the call around: push out any buffered
			 * write data first, then retry as a reader.
			 * rxi_FlushWrite takes call->lock itself. */
			MUTEX_EXIT(&call->lock);
			rxi_FlushWrite(call);
			MUTEX_ENTER(&call->lock);
			continue;
		    }
		}
		if (queue_IsNotEmpty(&call->rq)) {
		    /* Check that next packet available is next in sequence */
		    rp = queue_First(&call->rq, rx_packet);
		    if (rp->header.seq == call->rnext) {
			afs_int32 error;
			struct rx_connection *conn = call->conn;
			queue_Remove(rp);
#ifdef RX_TRACK_PACKETS
			rp->flags &= ~RX_PKTFLAG_RQ;
#endif
#ifdef RXDEBUG_PACKET
                        call->rqc--;
#endif /* RXDEBUG_PACKET */

			/* RXS_CheckPacket called to undo RXS_PreparePacket's
			 * work.  It may reduce the length of the packet by up
			 * to conn->maxTrailerSize, to reflect the length of the
			 * data + the header. */
			if ((error =
			     RXS_CheckPacket(conn->securityObject, call,
					     rp))) {
			    /* Used to merely shut down the call, but now we
			     * shut down the whole connection since this may
			     * indicate an attempt to hijack it */

			    MUTEX_EXIT(&call->lock);
			    rxi_ConnectionError(conn, error);
			    MUTEX_ENTER(&conn->conn_data_lock);
			    rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
			    MUTEX_EXIT(&conn->conn_data_lock);
			    rxi_FreePacket(rp);

			    return 0;
			}
			call->rnext++;
			cp = call->currentPacket = rp;
#ifdef RX_TRACK_PACKETS
			call->currentPacket->flags |= RX_PKTFLAG_CP;
#endif
			call->curvec = 1;	/* 0th vec is always header */
			/* begin at the beginning [ more or less ], continue
			 * on until the end, then stop. */
			call->curpos =
			    (char *)cp->wirevec[1].iov_base +
			    call->conn->securityHeaderSize;
			call->curlen =
			    cp->wirevec[1].iov_len -
			    call->conn->securityHeaderSize;

			/* Notice that this code works correctly if the data
			 * size is 0 (which it may be--no reply arguments from
			 * server, for example).  This relies heavily on the
			 * fact that the code below immediately frees the packet
			 * (no yields, etc.).  If it didn't, this would be a
			 * problem because a value of zero for call->nLeft
			 * normally means that there is no read packet */
			call->nLeft = cp->length;
			hadd32(call->bytesRcvd, cp->length);

			/* Send a hard ack for every rxi_HardAckRate+1 packets
			 * consumed. Otherwise schedule an event to send
			 * the hard ack later on.
			 */
			call->nHardAcks++;
			if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
			    if (call->nHardAcks > (u_short) rxi_HardAckRate) {
				rxevent_Cancel(call->delayedAckEvent, call,
					       RX_CALL_REFCOUNT_DELAY);
				rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
			    } else {
				struct clock when, now;
				clock_GetTime(&now);
				when = now;
				/* Delay to consolidate ack packets */
				clock_Add(&when, &rx_hardAckDelay);
				/* Only (re)post the delayed-ack event if none
				 * is pending, or the pending one would fire
				 * later than the new deadline. */
				if (!call->delayedAckEvent
				    || clock_Gt(&call->delayedAckEvent->
						eventTime, &when)) {
				    rxevent_Cancel(call->delayedAckEvent,
						   call,
						   RX_CALL_REFCOUNT_DELAY);
				    MUTEX_ENTER(&rx_refcnt_mutex);
				    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
				    MUTEX_EXIT(&rx_refcnt_mutex);
				    call->delayedAckEvent =
				      rxevent_PostNow(&when, &now,
						     rxi_SendDelayedAck, call,
						     0);
				}
			    }
			}
			break;
		    }
		}

		/*
		 * If we reach this point either we have no packets in the
		 * receive queue or the next packet in the queue is not the
		 * one we are looking for.  There is nothing else for us to
		 * do but wait for another packet to arrive.
		 */

		/* Are there ever going to be any more packets? */
		if (call->flags & RX_CALL_RECEIVE_DONE) {
		    MUTEX_EXIT(&call->lock);
		    return requestCount - nbytes;
		}
		/* Wait for in-sequence packet */
		call->flags |= RX_CALL_READER_WAIT;
		clock_NewTime();
		call->startWait = clock_Sec();
		while (call->flags & RX_CALL_READER_WAIT) {
#ifdef	RX_ENABLE_LOCKS
		    CV_WAIT(&call->cv_rq, &call->lock);
#else
		    osi_rxSleep(&call->rq);
#endif
		}
		/* Re-read currentPacket: the receive thread may have
		 * installed one while we slept. */
		cp = call->currentPacket;

		call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
		if (call->error) {
		    MUTEX_EXIT(&call->lock);
		    return 0;
		}
#endif /* RX_ENABLE_LOCKS */
	    }
	    MUTEX_EXIT(&call->lock);
	} else
	    /* assert(cp); */
	    /* MTUXXX  this should be replaced by some error-recovery code before shipping */
	    /* yes, the following block is allowed to be the ELSE clause (or not) */
	    /* It's possible for call->nLeft to be smaller than any particular
	     * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
	     * reflects the size of the buffer.  We have to keep track of the
	     * number of bytes read in the length field of the packet struct.  On
	     * the final portion of a received packet, it's almost certain that
	     * call->nLeft will be smaller than the final buffer. */
	    while (nbytes && cp) {
		/* Copy the smaller of: caller's remaining request, the
		 * current iovec's remaining bytes, the packet's remaining
		 * bytes. */
		t = MIN((int)call->curlen, nbytes);
		t = MIN(t, (int)call->nLeft);
		memcpy(buf, call->curpos, t);
		buf += t;
		nbytes -= t;
		call->curpos += t;
		call->curlen -= t;
		call->nLeft -= t;

		if (!call->nLeft) {
		    /* out of packet.  Get another one. */
#ifdef RX_TRACK_PACKETS
		    call->currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
		    rxi_FreePacket(cp);
		    cp = call->currentPacket = (struct rx_packet *)0;
		} else if (!call->curlen) {
		    /* need to get another struct iov */
		    if (++call->curvec >= cp->niovecs) {
			/* current packet is exhausted, get ready for another */
			/* don't worry about curvec and stuff, they get set somewhere else */
#ifdef RX_TRACK_PACKETS
			call->currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
			rxi_FreePacket(cp);
			cp = call->currentPacket = (struct rx_packet *)0;
			call->nLeft = 0;
		    } else {
			call->curpos =
			    (char *)cp->wirevec[call->curvec].iov_base;
			call->curlen = cp->wirevec[call->curvec].iov_len;
		    }
		}
	    }
	if (!nbytes) {
	    /* user buffer is full, return */
	    return requestCount;
	}

    } while (nbytes);

    return requestCount;
}
321
/* rx_ReadProc -- external entry point for reading call data.
 *
 * Fast path: when the whole request fits strictly inside the current
 * iovec of the current packet, copy it out WITHOUT taking call->lock
 * or raising to network priority.  The snapshots of curlen/nLeft taken
 * before the test make the check self-consistent; the strict '>'
 * comparisons guarantee the packet is not emptied here, so no queue
 * manipulation (which would need the lock) is required on this path.
 * Otherwise fall through to rxi_ReadProc at netpri.
 *
 * Returns the number of bytes copied into buf (0 on error).
 */
int
rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
{
    int bytes;
    int tcurlen;
    int tnLeft;
    char *tcurpos;
    SPLVAR;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!queue_IsEmpty(&call->iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->iovq);
    }

    /*
     * Most common case, all of the data is in the current iovec.
     * We are relying on nLeft being zero unless the call is in receive mode.
     */
    tcurlen = call->curlen;
    tnLeft = call->nLeft;
    if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
	tcurpos = call->curpos;
	memcpy(buf, tcurpos, nbytes);

	call->curpos = tcurpos + nbytes;
	call->curlen = tcurlen - nbytes;
	call->nLeft = tnLeft - nbytes;

        if (!call->nLeft && call->currentPacket != NULL) {
            /* out of packet.  Get another one. */
            rxi_FreePacket(call->currentPacket);
            call->currentPacket = (struct rx_packet *)0;
        }
	return nbytes;
    }

    /* Slow path: need to refill from the receive queue (or report an
     * error); do the real work at network priority. */
    NETPRI;
    bytes = rxi_ReadProc(call, buf, nbytes);
    USERPRI;
    return bytes;
}
366
/* Optimization for unmarshalling 32 bit integers */
/*
 * rx_ReadProc32 -- read exactly one afs_int32 from the call stream.
 *
 * Same lock-free fast path as rx_ReadProc, specialized for the very
 * common XDR case of pulling a single 32-bit value: if at least
 * sizeof(afs_int32) bytes remain in both the current iovec and the
 * current packet, copy them directly; otherwise fall back to
 * rxi_ReadProc at netpri.
 *
 * Returns the number of bytes read (sizeof(afs_int32) on success,
 * less -- including 0 -- on short read or error).
 */
int
rx_ReadProc32(struct rx_call *call, afs_int32 * value)
{
    int bytes;
    int tcurlen;
    int tnLeft;
    char *tcurpos;
    SPLVAR;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!queue_IsEmpty(&call->iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->iovq);
    }

    /*
     * Most common case, all of the data is in the current iovec.
     * We are relying on nLeft being zero unless the call is in receive mode.
     */
    tcurlen = call->curlen;
    tnLeft = call->nLeft;
    if (!call->error && tcurlen >= sizeof(afs_int32)
	&& tnLeft >= sizeof(afs_int32)) {
	tcurpos = call->curpos;

	memcpy((char *)value, tcurpos, sizeof(afs_int32));

	call->curpos = tcurpos + sizeof(afs_int32);
	call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
	call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
	if (!call->nLeft && call->currentPacket != NULL) {
	    /* out of packet.  Get another one. */
	    rxi_FreePacket(call->currentPacket);
	    call->currentPacket = (struct rx_packet *)0;
	}
	return sizeof(afs_int32);
    }

    /* Slow path: value straddles an iovec/packet boundary, or no data. */
    NETPRI;
    bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
    USERPRI;

    return bytes;
}
414
/* rxi_FillReadVec
 *
 * Uses packets in the receive queue to fill in as much of the
 * current iovec as possible. Does not block if it runs out
 * of packets to complete the iovec. Return true if an ack packet
 * was sent, otherwise return false */
/*
 * Called with call->lock held.  Consumes in-sequence packets from
 * call->rq, pointing call->iov entries directly at the packet buffers;
 * fully-consumed packets are appended to call->iovq for the caller to
 * free later.  Also returns true on a connection-abort error path
 * (call->lock is dropped and re-taken around the abort send).
 */
int
rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
{
    int didConsume = 0;
    int didHardAck = 0;
    unsigned int t;
    struct rx_packet *rp;
    struct rx_packet *curp;
    struct iovec *call_iov;
    struct iovec *cur_iov = NULL;

    curp = call->currentPacket;
    if (curp) {
	cur_iov = &curp->wirevec[call->curvec];
    }
    call_iov = &call->iov[call->iovNext];

    while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
	if (call->nLeft == 0) {
	    /* Get next packet */
	    if (queue_IsNotEmpty(&call->rq)) {
		/* Check that next packet available is next in sequence */
		rp = queue_First(&call->rq, rx_packet);
		if (rp->header.seq == call->rnext) {
		    afs_int32 error;
		    struct rx_connection *conn = call->conn;
		    queue_Remove(rp);
#ifdef RX_TRACK_PACKETS
		    rp->flags &= ~RX_PKTFLAG_RQ;
#endif
#ifdef RXDEBUG_PACKET
                    call->rqc--;
#endif /* RXDEBUG_PACKET */

		    /* RXS_CheckPacket called to undo RXS_PreparePacket's
		     * work.  It may reduce the length of the packet by up
		     * to conn->maxTrailerSize, to reflect the length of the
		     * data + the header. */
		    if ((error =
			 RXS_CheckPacket(conn->securityObject, call, rp))) {
			/* Used to merely shut down the call, but now we
			 * shut down the whole connection since this may
			 * indicate an attempt to hijack it */

			/* Drop call->lock while sending the abort; the
			 * caller expects it held on return, so re-take it. */
			MUTEX_EXIT(&call->lock);
			rxi_ConnectionError(conn, error);
			MUTEX_ENTER(&conn->conn_data_lock);
			rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
			MUTEX_EXIT(&conn->conn_data_lock);
			rxi_FreePacket(rp);
			MUTEX_ENTER(&call->lock);

			return 1;
		    }
		    call->rnext++;
		    curp = call->currentPacket = rp;
#ifdef RX_TRACK_PACKETS
		    call->currentPacket->flags |= RX_PKTFLAG_CP;
#endif
		    call->curvec = 1;	/* 0th vec is always header */
		    cur_iov = &curp->wirevec[1];
		    /* begin at the beginning [ more or less ], continue
		     * on until the end, then stop. */
		    call->curpos =
			(char *)curp->wirevec[1].iov_base +
			call->conn->securityHeaderSize;
		    call->curlen =
			curp->wirevec[1].iov_len -
			call->conn->securityHeaderSize;

		    /* Notice that this code works correctly if the data
		     * size is 0 (which it may be--no reply arguments from
		     * server, for example).  This relies heavily on the
		     * fact that the code below immediately frees the packet
		     * (no yields, etc.).  If it didn't, this would be a
		     * problem because a value of zero for call->nLeft
		     * normally means that there is no read packet */
		    call->nLeft = curp->length;
		    hadd32(call->bytesRcvd, curp->length);

		    /* Send a hard ack for every rxi_HardAckRate+1 packets
		     * consumed. Otherwise schedule an event to send
		     * the hard ack later on.
		     */
		    call->nHardAcks++;
		    didConsume = 1;
		    continue;
		}
	    }
	    /* No packet, or next packet is out of sequence: stop without
	     * blocking (caller decides whether to wait). */
	    break;
	}

	/* It's possible for call->nLeft to be smaller than any particular
	 * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
	 * reflects the size of the buffer.  We have to keep track of the
	 * number of bytes read in the length field of the packet struct.  On
	 * the final portion of a received packet, it's almost certain that
	 * call->nLeft will be smaller than the final buffer. */
	while (call->iovNBytes && call->iovNext < call->iovMax && curp) {

	    /* Hand the caller a pointer into the packet buffer rather than
	     * copying: bounded by iovec remainder, request remainder, and
	     * packet remainder. */
	    t = MIN((int)call->curlen, call->iovNBytes);
	    t = MIN(t, (int)call->nLeft);
	    call_iov->iov_base = call->curpos;
	    call_iov->iov_len = t;
	    call_iov++;
	    call->iovNext++;
	    call->iovNBytes -= t;
	    call->curpos += t;
	    call->curlen -= t;
	    call->nLeft -= t;

	    if (!call->nLeft) {
		/* out of packet.  Get another one. */
		/* The packet's buffers are still referenced by the iovec,
		 * so park it on iovq instead of freeing it. */
#ifdef RX_TRACK_PACKETS
		curp->flags &= ~RX_PKTFLAG_CP;
		curp->flags |= RX_PKTFLAG_IOVQ;
#endif
		queue_Append(&call->iovq, curp);
#ifdef RXDEBUG_PACKET
                call->iovqc++;
#endif /* RXDEBUG_PACKET */
		curp = call->currentPacket = (struct rx_packet *)0;
	    } else if (!call->curlen) {
		/* need to get another struct iov */
		if (++call->curvec >= curp->niovecs) {
		    /* current packet is exhausted, get ready for another */
		    /* don't worry about curvec and stuff, they get set somewhere else */
#ifdef RX_TRACK_PACKETS
		    curp->flags &= ~RX_PKTFLAG_CP;
		    curp->flags |= RX_PKTFLAG_IOVQ;
#endif
		    queue_Append(&call->iovq, curp);
#ifdef RXDEBUG_PACKET
                    call->iovqc++;
#endif /* RXDEBUG_PACKET */
		    curp = call->currentPacket = (struct rx_packet *)0;
		    call->nLeft = 0;
		} else {
		    cur_iov++;
		    call->curpos = (char *)cur_iov->iov_base;
		    call->curlen = cur_iov->iov_len;
		}
	    }
	}
    }

    /* If we consumed any packets then check whether we need to
     * send a hard ack. */
    if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
	if (call->nHardAcks > (u_short) rxi_HardAckRate) {
	    rxevent_Cancel(call->delayedAckEvent, call,
			   RX_CALL_REFCOUNT_DELAY);
	    rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
	    didHardAck = 1;
	} else {
	    struct clock when, now;
	    clock_GetTime(&now);
	    when = now;
	    /* Delay to consolidate ack packets */
	    clock_Add(&when, &rx_hardAckDelay);
	    /* Only (re)schedule if no delayed ack is pending, or the
	     * pending one would fire later than the new deadline. */
	    if (!call->delayedAckEvent
		|| clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
		rxevent_Cancel(call->delayedAckEvent, call,
			       RX_CALL_REFCOUNT_DELAY);
		MUTEX_ENTER(&rx_refcnt_mutex);
		CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
		MUTEX_EXIT(&rx_refcnt_mutex);
		call->delayedAckEvent =
		    rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
	    }
	}
    }
    return didHardAck;
}
595
596
597 /* rxi_ReadvProc -- internal version.
598  *
599  * Fills in an iovec with pointers to the packet buffers. All packets
600  * except the last packet (new current packet) are moved to the iovq
601  * while the application is processing the data.
602  *
603  * LOCKS USED -- called at netpri.
604  */
605 int
606 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
607               int nbytes)
608 {
609     int bytes;
610
611     /* Free any packets from the last call to ReadvProc/WritevProc */
612     if (queue_IsNotEmpty(&call->iovq)) {
613 #ifdef RXDEBUG_PACKET
614         call->iovqc -=
615 #endif /* RXDEBUG_PACKET */
616             rxi_FreePackets(0, &call->iovq);
617     }
618
619     if (call->mode == RX_MODE_SENDING) {
620         rxi_FlushWrite(call);
621     }
622
623     MUTEX_ENTER(&call->lock);
624     if (call->error)
625         goto error;
626
627     /* Get whatever data is currently available in the receive queue.
628      * If rxi_FillReadVec sends an ack packet then it is possible
629      * that we will receive more data while we drop the call lock
630      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
631      * here to avoid a race with the receive thread if we send
632      * hard acks in rxi_FillReadVec. */
633     call->flags |= RX_CALL_IOVEC_WAIT;
634     call->iovNBytes = nbytes;
635     call->iovMax = maxio;
636     call->iovNext = 0;
637     call->iov = iov;
638     rxi_FillReadVec(call, 0);
639
640     /* if we need more data then sleep until the receive thread has
641      * filled in the rest. */
642     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
643         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
644         call->flags |= RX_CALL_READER_WAIT;
645         clock_NewTime();
646         call->startWait = clock_Sec();
647         while (call->flags & RX_CALL_READER_WAIT) {
648 #ifdef  RX_ENABLE_LOCKS
649             CV_WAIT(&call->cv_rq, &call->lock);
650 #else
651             osi_rxSleep(&call->rq);
652 #endif
653         }
654         call->startWait = 0;
655     }
656     call->flags &= ~RX_CALL_IOVEC_WAIT;
657
658     if (call->error)
659         goto error;
660
661     call->iov = NULL;
662     *nio = call->iovNext;
663     bytes = nbytes - call->iovNBytes;
664     MUTEX_EXIT(&call->lock);
665     return bytes;
666
667   error:
668     MUTEX_EXIT(&call->lock);
669     call->mode = RX_MODE_ERROR;
670     return 0;
671 }
672
673 int
674 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
675              int nbytes)
676 {
677     int bytes;
678     SPLVAR;
679
680     NETPRI;
681     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
682     USERPRI;
683     return bytes;
684 }
685
686 /* rxi_WriteProc -- internal version.
687  *
688  * LOCKS USED -- called at netpri
689  */
690
691 int
692 rxi_WriteProc(struct rx_call *call, char *buf,
693               int nbytes)
694 {
695     struct rx_connection *conn = call->conn;
696     struct rx_packet *cp = call->currentPacket;
697     unsigned int t;
698     int requestCount = nbytes;
699
700     /* Free any packets from the last call to ReadvProc/WritevProc */
701     if (queue_IsNotEmpty(&call->iovq)) {
702 #ifdef RXDEBUG_PACKET
703         call->iovqc -=
704 #endif /* RXDEBUG_PACKET */
705             rxi_FreePackets(0, &call->iovq);
706     }
707
708     if (call->mode != RX_MODE_SENDING) {
709         if ((conn->type == RX_SERVER_CONNECTION)
710             && (call->mode == RX_MODE_RECEIVING)) {
711             call->mode = RX_MODE_SENDING;
712             if (cp) {
713 #ifdef RX_TRACK_PACKETS
714                 cp->flags &= ~RX_PKTFLAG_CP;
715 #endif
716                 rxi_FreePacket(cp);
717                 cp = call->currentPacket = (struct rx_packet *)0;
718                 call->nLeft = 0;
719                 call->nFree = 0;
720             }
721         } else {
722             return 0;
723         }
724     }
725
726     /* Loop condition is checked at end, so that a write of 0 bytes
727      * will force a packet to be created--specially for the case where
728      * there are 0 bytes on the stream, but we must send a packet
729      * anyway. */
730     do {
731         if (call->nFree == 0) {
732             MUTEX_ENTER(&call->lock);
733             if (call->error)
734                 call->mode = RX_MODE_ERROR;
735             if (!call->error && cp) {
736                 /* Clear the current packet now so that if
737                  * we are forced to wait and drop the lock
738                  * the packet we are planning on using
739                  * cannot be freed.
740                  */
741 #ifdef RX_TRACK_PACKETS
742                 cp->flags &= ~RX_PKTFLAG_CP;
743 #endif
744                 call->currentPacket = (struct rx_packet *)0;
745 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
746                 /* Wait until TQ_BUSY is reset before adding any
747                  * packets to the transmit queue
748                  */
749                 while (call->flags & RX_CALL_TQ_BUSY) {
750                     call->flags |= RX_CALL_TQ_WAIT;
751                     call->tqWaiters++;
752 #ifdef RX_ENABLE_LOCKS
753                     CV_WAIT(&call->cv_tq, &call->lock);
754 #else /* RX_ENABLE_LOCKS */
755                     osi_rxSleep(&call->tq);
756 #endif /* RX_ENABLE_LOCKS */
757                     call->tqWaiters--;
758                     if (call->tqWaiters == 0)
759                         call->flags &= ~RX_CALL_TQ_WAIT;
760                 }
761 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
762                 clock_NewTime();        /* Bogus:  need new time package */
763                 /* The 0, below, specifies that it is not the last packet:
764                  * there will be others. PrepareSendPacket may
765                  * alter the packet length by up to
766                  * conn->securityMaxTrailerSize */
767                 hadd32(call->bytesSent, cp->length);
768                 rxi_PrepareSendPacket(call, cp, 0);
769 #ifdef RX_TRACK_PACKETS
770                 cp->flags |= RX_PKTFLAG_TQ;
771 #endif
772                 queue_Append(&call->tq, cp);
773 #ifdef RXDEBUG_PACKET
774                 call->tqc++;
775 #endif /* RXDEBUG_PACKET */
776                 cp = (struct rx_packet *)0;
777                 if (!
778                     (call->
779                      flags & (RX_CALL_FAST_RECOVER |
780                               RX_CALL_FAST_RECOVER_WAIT))) {
781                     rxi_Start(0, call, 0, 0);
782                 }
783             } else if (cp) {
784 #ifdef RX_TRACK_PACKETS
785                 cp->flags &= ~RX_PKTFLAG_CP;
786 #endif
787                 rxi_FreePacket(cp);
788                 cp = call->currentPacket = (struct rx_packet *)0;
789             }
790             /* Wait for transmit window to open up */
791             while (!call->error
792                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
793                 clock_NewTime();
794                 call->startWait = clock_Sec();
795
796 #ifdef  RX_ENABLE_LOCKS
797                 CV_WAIT(&call->cv_twind, &call->lock);
798 #else
799                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
800                 osi_rxSleep(&call->twind);
801 #endif
802
803                 call->startWait = 0;
804 #ifdef RX_ENABLE_LOCKS
805                 if (call->error) {
806                     call->mode = RX_MODE_ERROR;
807                     MUTEX_EXIT(&call->lock);
808                     return 0;
809                 }
810 #endif /* RX_ENABLE_LOCKS */
811             }
812             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
813 #ifdef RX_TRACK_PACKETS
814                 cp->flags |= RX_PKTFLAG_CP;
815 #endif
816                 call->currentPacket = cp;
817                 call->nFree = cp->length;
818                 call->curvec = 1;       /* 0th vec is always header */
819                 /* begin at the beginning [ more or less ], continue
820                  * on until the end, then stop. */
821                 call->curpos =
822                     (char *)cp->wirevec[1].iov_base +
823                     call->conn->securityHeaderSize;
824                 call->curlen =
825                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
826             }
827             if (call->error) {
828                 call->mode = RX_MODE_ERROR;
829                 if (cp) {
830 #ifdef RX_TRACK_PACKETS
831                     cp->flags &= ~RX_PKTFLAG_CP;
832 #endif
833                     rxi_FreePacket(cp);
834                     call->currentPacket = NULL;
835                 }
836                 MUTEX_EXIT(&call->lock);
837                 return 0;
838             }
839             MUTEX_EXIT(&call->lock);
840         }
841
842         if (cp && (int)call->nFree < nbytes) {
843             /* Try to extend the current buffer */
844             int len, mud;
845             len = cp->length;
846             mud = rx_MaxUserDataSize(call);
847             if (mud > len) {
848                 int want;
849                 want = MIN(nbytes - (int)call->nFree, mud - len);
850                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
851                 if (cp->length > (unsigned)mud)
852                     cp->length = mud;
853                 call->nFree += (cp->length - len);
854             }
855         }
856
857         /* If the remaining bytes fit in the buffer, then store them
858          * and return.  Don't ship a buffer that's full immediately to
859          * the peer--we don't know if it's the last buffer yet */
860
861         if (!cp) {
862             call->nFree = 0;
863         }
864
865         while (nbytes && call->nFree) {
866
867             t = MIN((int)call->curlen, nbytes);
868             t = MIN((int)call->nFree, t);
869             memcpy(call->curpos, buf, t);
870             buf += t;
871             nbytes -= t;
872             call->curpos += t;
873             call->curlen -= (u_short)t;
874             call->nFree -= (u_short)t;
875
876             if (!call->curlen) {
877                 /* need to get another struct iov */
878                 if (++call->curvec >= cp->niovecs) {
879                     /* current packet is full, extend or send it */
880                     call->nFree = 0;
881                 } else {
882                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
883                     call->curlen = cp->wirevec[call->curvec].iov_len;
884                 }
885             }
886         }                       /* while bytes to send and room to send them */
887
888         /* might be out of space now */
889         if (!nbytes) {
890             return requestCount;
891         } else;                 /* more data to send, so get another packet and keep going */
892     } while (nbytes);
893
894     return requestCount - nbytes;
895 }
896
897 int
898 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
899 {
900     int bytes;
901     int tcurlen;
902     int tnFree;
903     char *tcurpos;
904     SPLVAR;
905
906     /* Free any packets from the last call to ReadvProc/WritevProc */
907     if (queue_IsNotEmpty(&call->iovq)) {
908 #ifdef RXDEBUG_PACKET
909         call->iovqc -=
910 #endif /* RXDEBUG_PACKET */
911             rxi_FreePackets(0, &call->iovq);
912     }
913
914     /*
915      * Most common case: all of the data fits in the current iovec.
916      * We are relying on nFree being zero unless the call is in send mode.
917      */
918     tcurlen = (int)call->curlen;
919     tnFree = (int)call->nFree;
920     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
921         tcurpos = call->curpos;
922
923         memcpy(tcurpos, buf, nbytes);
924         call->curpos = tcurpos + nbytes;
925         call->curlen = (u_short)(tcurlen - nbytes);
926         call->nFree = (u_short)(tnFree - nbytes);
927         return nbytes;
928     }
929
930     NETPRI;
931     bytes = rxi_WriteProc(call, buf, nbytes);
932     USERPRI;
933     return bytes;
934 }
935
936 /* Optimization for marshalling 32 bit arguments */
937 int
938 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
939 {
940     int bytes;
941     int tcurlen;
942     int tnFree;
943     char *tcurpos;
944     SPLVAR;
945
946     if (queue_IsNotEmpty(&call->iovq)) {
947 #ifdef RXDEBUG_PACKET
948         call->iovqc -=
949 #endif /* RXDEBUG_PACKET */
950             rxi_FreePackets(0, &call->iovq);
951     }
952
953     /*
954      * Most common case: all of the data fits in the current iovec.
955      * We are relying on nFree being zero unless the call is in send mode.
956      */
957     tcurlen = call->curlen;
958     tnFree = call->nFree;
959     if (!call->error && tcurlen >= sizeof(afs_int32)
960         && tnFree >= sizeof(afs_int32)) {
961         tcurpos = call->curpos;
962
963         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
964             *((afs_int32 *) (tcurpos)) = *value;
965         } else {
966             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
967         }
968         call->curpos = tcurpos + sizeof(afs_int32);
969         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
970         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
971         return sizeof(afs_int32);
972     }
973
974     NETPRI;
975     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
976     USERPRI;
977     return bytes;
978 }
979
/* rxi_WritevAlloc -- internal version.
 *
 * Fill in an iovec to point to data in packet buffers. The application
 * calls rxi_WritevProc when the buffers are full.
 *
 * LOCKS USED -- called at netpri.
 */

int
rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
                int nbytes)
{
    struct rx_connection *conn = call->conn;
    struct rx_packet *cp = call->currentPacket;
    int requestCount;
    int nextio;
    /* Temporary values, real work is done in rxi_WritevProc */
    int tnFree;
    unsigned int tcurvec;
    char *tcurpos;
    int tcurlen;

    requestCount = nbytes;
    nextio = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (queue_IsNotEmpty(&call->iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->iovq);
    }

    if (call->mode != RX_MODE_SENDING) {
        /* Only a server call may transition receiving -> sending here;
         * any other mode is a caller error and we allocate nothing. */
        if ((conn->type == RX_SERVER_CONNECTION)
            && (call->mode == RX_MODE_RECEIVING)) {
            call->mode = RX_MODE_SENDING;
            /* Drop any partially-consumed receive packet and reset the
             * read/write cursors. */
            if (cp) {
#ifdef RX_TRACK_PACKETS
                cp->flags &= ~RX_PKTFLAG_CP;
#endif
                rxi_FreePacket(cp);
                cp = call->currentPacket = (struct rx_packet *)0;
                call->nLeft = 0;
                call->nFree = 0;
            }
        } else {
            return 0;
        }
    }

    /* Set up the iovec to point to data in packet buffers.
     * Work on shadow copies of the call's cursor (nFree/curvec/curpos/
     * curlen); the real call state is advanced later by rxi_WritevProc
     * when the caller actually sends the data. */
    tnFree = call->nFree;
    tcurvec = call->curvec;
    tcurpos = call->curpos;
    tcurlen = call->curlen;
    do {
        int t;

        if (tnFree == 0) {
            /* current packet is full, allocate a new one */
            /* rxi_AllocSendPacket must be called with call->lock held
             * (it may sleep waiting for packets). */
            MUTEX_ENTER(&call->lock);
            cp = rxi_AllocSendPacket(call, nbytes);
            MUTEX_EXIT(&call->lock);
            if (cp == NULL) {
                /* out of space, return what we have */
                *nio = nextio;
                return requestCount - nbytes;
            }
#ifdef RX_TRACK_PACKETS
            cp->flags |= RX_PKTFLAG_IOVQ;
#endif
            queue_Append(&call->iovq, cp);
#ifdef RXDEBUG_PACKET
            call->iovqc++;
#endif /* RXDEBUG_PACKET */
            /* iovec 0 is the rx header; user data starts in iovec 1,
             * beyond the security header. */
            tnFree = cp->length;
            tcurvec = 1;
            tcurpos =
                (char *)cp->wirevec[1].iov_base +
                call->conn->securityHeaderSize;
            tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
        }

        if (tnFree < nbytes) {
            /* try to extend the current packet */
            int len, mud;
            len = cp->length;
            mud = rx_MaxUserDataSize(call);
            if (mud > len) {
                int want;
                want = MIN(nbytes - tnFree, mud - len);
                rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
                if (cp->length > (unsigned)mud)
                    cp->length = mud;
                tnFree += (cp->length - len);
                /* If we grew the packet the call is currently filling,
                 * keep the call's own free-space count in step. */
                if (cp == call->currentPacket) {
                    call->nFree += (cp->length - len);
                }
            }
        }

        /* fill in the next entry in the iovec */
        t = MIN(tcurlen, nbytes);
        t = MIN(tnFree, t);
        iov[nextio].iov_base = tcurpos;
        iov[nextio].iov_len = t;
        nbytes -= t;
        tcurpos += t;
        tcurlen -= t;
        tnFree -= t;
        nextio++;

        if (!tcurlen) {
            /* need to get another struct iov */
            if (++tcurvec >= cp->niovecs) {
                /* current packet is full, extend it or move on to next packet */
                tnFree = 0;
            } else {
                tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
                tcurlen = cp->wirevec[tcurvec].iov_len;
            }
        }
    } while (nbytes && nextio < maxio);
    *nio = nextio;
    return requestCount - nbytes;
}
1107
1108 int
1109 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1110                int nbytes)
1111 {
1112     int bytes;
1113     SPLVAR;
1114
1115     NETPRI;
1116     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1117     USERPRI;
1118     return bytes;
1119 }
1120
1121 /* rxi_WritevProc -- internal version.
1122  *
1123  * Send buffers allocated in rxi_WritevAlloc.
1124  *
1125  * LOCKS USED -- called at netpri.
1126  */
1127 int
1128 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1129 {
1130     struct rx_packet *cp = NULL;
1131 #ifdef RX_TRACK_PACKETS
1132     struct rx_packet *p, *np;
1133 #endif
1134     int nextio;
1135     int requestCount;
1136     struct rx_queue tmpq;
1137 #ifdef RXDEBUG_PACKET
1138     u_short tmpqc;
1139 #endif
1140
1141     requestCount = nbytes;
1142     nextio = 0;
1143
1144     MUTEX_ENTER(&call->lock);
1145     if (call->error) {
1146         call->mode = RX_MODE_ERROR;
1147     } else if (call->mode != RX_MODE_SENDING) {
1148         call->error = RX_PROTOCOL_ERROR;
1149     }
1150 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1151     /* Wait until TQ_BUSY is reset before trying to move any
1152      * packets to the transmit queue.  */
1153     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1154         call->flags |= RX_CALL_TQ_WAIT;
1155         call->tqWaiters++;
1156 #ifdef RX_ENABLE_LOCKS
1157         CV_WAIT(&call->cv_tq, &call->lock);
1158 #else /* RX_ENABLE_LOCKS */
1159         osi_rxSleep(&call->tq);
1160 #endif /* RX_ENABLE_LOCKS */
1161         call->tqWaiters--;
1162         if (call->tqWaiters == 0)
1163             call->flags &= ~RX_CALL_TQ_WAIT;
1164     }
1165 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1166     cp = call->currentPacket;
1167
1168     if (call->error) {
1169         call->mode = RX_MODE_ERROR;
1170         MUTEX_EXIT(&call->lock);
1171         if (cp) {
1172 #ifdef RX_TRACK_PACKETS
1173             cp->flags &= ~RX_PKTFLAG_CP;
1174             cp->flags |= RX_PKTFLAG_IOVQ;
1175 #endif
1176             queue_Prepend(&call->iovq, cp);
1177 #ifdef RXDEBUG_PACKET
1178             call->iovqc++;
1179 #endif /* RXDEBUG_PACKET */
1180         }
1181 #ifdef RXDEBUG_PACKET
1182         call->iovqc -=
1183 #endif /* RXDEBUG_PACKET */
1184             rxi_FreePackets(0, &call->iovq);
1185         return 0;
1186     }
1187
1188     /* Loop through the I/O vector adjusting packet pointers.
1189      * Place full packets back onto the iovq once they are ready
1190      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1191      * the iovec. We put the loop condition at the end to ensure that
1192      * a zero length write will push a short packet. */
1193     nextio = 0;
1194     queue_Init(&tmpq);
1195 #ifdef RXDEBUG_PACKET
1196     tmpqc = 0;
1197 #endif /* RXDEBUG_PACKET */
1198     do {
1199         if (call->nFree == 0 && cp) {
1200             clock_NewTime();    /* Bogus:  need new time package */
1201             /* The 0, below, specifies that it is not the last packet:
1202              * there will be others. PrepareSendPacket may
1203              * alter the packet length by up to
1204              * conn->securityMaxTrailerSize */
1205             hadd32(call->bytesSent, cp->length);
1206             rxi_PrepareSendPacket(call, cp, 0);
1207             queue_Append(&tmpq, cp);
1208 #ifdef RXDEBUG_PACKET
1209             tmpqc++;
1210 #endif /* RXDEBUG_PACKET */
1211             cp = call->currentPacket = (struct rx_packet *)0;
1212
1213             /* The head of the iovq is now the current packet */
1214             if (nbytes) {
1215                 if (queue_IsEmpty(&call->iovq)) {
1216                     MUTEX_EXIT(&call->lock);
1217                     call->error = RX_PROTOCOL_ERROR;
1218 #ifdef RXDEBUG_PACKET
1219                     tmpqc -=
1220 #endif /* RXDEBUG_PACKET */
1221                         rxi_FreePackets(0, &tmpq);
1222                     return 0;
1223                 }
1224                 cp = queue_First(&call->iovq, rx_packet);
1225                 queue_Remove(cp);
1226 #ifdef RX_TRACK_PACKETS
1227                 cp->flags &= ~RX_PKTFLAG_IOVQ;
1228 #endif
1229 #ifdef RXDEBUG_PACKET
1230                 call->iovqc--;
1231 #endif /* RXDEBUG_PACKET */
1232 #ifdef RX_TRACK_PACKETS
1233                 cp->flags |= RX_PKTFLAG_CP;
1234 #endif
1235                 call->currentPacket = cp;
1236                 call->nFree = cp->length;
1237                 call->curvec = 1;
1238                 call->curpos =
1239                     (char *)cp->wirevec[1].iov_base +
1240                     call->conn->securityHeaderSize;
1241                 call->curlen =
1242                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1243             }
1244         }
1245
1246         if (nbytes) {
1247             /* The next iovec should point to the current position */
1248             if (iov[nextio].iov_base != call->curpos
1249                 || iov[nextio].iov_len > (int)call->curlen) {
1250                 call->error = RX_PROTOCOL_ERROR;
1251                 MUTEX_EXIT(&call->lock);
1252                 if (cp) {
1253 #ifdef RX_TRACK_PACKETS
1254                     cp->flags &= ~RX_PKTFLAG_CP;
1255 #endif
1256                     queue_Prepend(&tmpq, cp);
1257 #ifdef RXDEBUG_PACKET
1258                     tmpqc++;
1259 #endif /* RXDEBUG_PACKET */
1260                     cp = call->currentPacket = (struct rx_packet *)0;
1261                 }
1262 #ifdef RXDEBUG_PACKET
1263                 tmpqc -=
1264 #endif /* RXDEBUG_PACKET */
1265                     rxi_FreePackets(0, &tmpq);
1266                 return 0;
1267             }
1268             nbytes -= iov[nextio].iov_len;
1269             call->curpos += iov[nextio].iov_len;
1270             call->curlen -= iov[nextio].iov_len;
1271             call->nFree -= iov[nextio].iov_len;
1272             nextio++;
1273             if (call->curlen == 0) {
1274                 if (++call->curvec > cp->niovecs) {
1275                     call->nFree = 0;
1276                 } else {
1277                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1278                     call->curlen = cp->wirevec[call->curvec].iov_len;
1279                 }
1280             }
1281         }
1282     } while (nbytes && nextio < nio);
1283
1284     /* Move the packets from the temporary queue onto the transmit queue.
1285      * We may end up with more than call->twind packets on the queue. */
1286
1287 #ifdef RX_TRACK_PACKETS
1288     for (queue_Scan(&tmpq, p, np, rx_packet))
1289     {
1290         p->flags |= RX_PKTFLAG_TQ;
1291     }
1292 #endif
1293
1294     if (call->error)
1295         call->mode = RX_MODE_ERROR;
1296
1297     queue_SpliceAppend(&call->tq, &tmpq);
1298
1299     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1300         rxi_Start(0, call, 0, 0);
1301     }
1302
1303     /* Wait for the length of the transmit queue to fall below call->twind */
1304     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1305         clock_NewTime();
1306         call->startWait = clock_Sec();
1307 #ifdef  RX_ENABLE_LOCKS
1308         CV_WAIT(&call->cv_twind, &call->lock);
1309 #else
1310         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1311         osi_rxSleep(&call->twind);
1312 #endif
1313         call->startWait = 0;
1314     }
1315
1316     /* cp is no longer valid since we may have given up the lock */
1317     cp = call->currentPacket;
1318
1319     if (call->error) {
1320         call->mode = RX_MODE_ERROR;
1321         call->currentPacket = NULL;
1322         MUTEX_EXIT(&call->lock);
1323         if (cp) {
1324 #ifdef RX_TRACK_PACKETS
1325             cp->flags &= ~RX_PKTFLAG_CP;
1326 #endif
1327             rxi_FreePacket(cp);
1328         }
1329         return 0;
1330     }
1331     MUTEX_EXIT(&call->lock);
1332
1333     return requestCount - nbytes;
1334 }
1335
1336 int
1337 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1338 {
1339     int bytes;
1340     SPLVAR;
1341
1342     NETPRI;
1343     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1344     USERPRI;
1345     return bytes;
1346 }
1347
1348 /* Flush any buffered data to the stream, switch to read mode
1349  * (clients) or to EOF mode (servers)
1350  *
1351  * LOCKS HELD: called at netpri.
1352  */
1353 void
1354 rxi_FlushWrite(struct rx_call *call)
1355 {
1356     struct rx_packet *cp = NULL;
1357
1358     /* Free any packets from the last call to ReadvProc/WritevProc */
1359     if (queue_IsNotEmpty(&call->iovq)) {
1360 #ifdef RXDEBUG_PACKET
1361         call->iovqc -=
1362 #endif /* RXDEBUG_PACKET */
1363             rxi_FreePackets(0, &call->iovq);
1364     }
1365
1366     if (call->mode == RX_MODE_SENDING) {
1367
1368         call->mode =
1369             (call->conn->type ==
1370              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1371
1372 #ifdef RX_KERNEL_TRACE
1373         {
1374             int glockOwner = ISAFS_GLOCK();
1375             if (!glockOwner)
1376                 AFS_GLOCK();
1377             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1378                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1379                        call);
1380             if (!glockOwner)
1381                 AFS_GUNLOCK();
1382         }
1383 #endif
1384
1385         MUTEX_ENTER(&call->lock);
1386 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1387         /* Wait until TQ_BUSY is reset before adding any
1388          * packets to the transmit queue
1389          */
1390         while (call->flags & RX_CALL_TQ_BUSY) {
1391             call->flags |= RX_CALL_TQ_WAIT;
1392             call->tqWaiters++;
1393 #ifdef RX_ENABLE_LOCKS
1394             CV_WAIT(&call->cv_tq, &call->lock);
1395 #else /* RX_ENABLE_LOCKS */
1396             osi_rxSleep(&call->tq);
1397 #endif /* RX_ENABLE_LOCKS */
1398             call->tqWaiters--;
1399             if (call->tqWaiters == 0)
1400                 call->flags &= ~RX_CALL_TQ_WAIT;
1401         }
1402 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1403
1404         if (call->error)
1405             call->mode = RX_MODE_ERROR;
1406
1407         cp = call->currentPacket;
1408
1409         if (cp) {
1410             /* cp->length is only supposed to be the user's data */
1411             /* cp->length was already set to (then-current)
1412              * MaxUserDataSize or less. */
1413 #ifdef RX_TRACK_PACKETS
1414             cp->flags &= ~RX_PKTFLAG_CP;
1415 #endif
1416             cp->length -= call->nFree;
1417             call->currentPacket = (struct rx_packet *)0;
1418             call->nFree = 0;
1419         } else {
1420             cp = rxi_AllocSendPacket(call, 0);
1421             if (!cp) {
1422                 /* Mode can no longer be MODE_SENDING */
1423                 return;
1424             }
1425             cp->length = 0;
1426             cp->niovecs = 2;    /* header + space for rxkad stuff */
1427             call->nFree = 0;
1428         }
1429
1430         /* The 1 specifies that this is the last packet */
1431         hadd32(call->bytesSent, cp->length);
1432         rxi_PrepareSendPacket(call, cp, 1);
1433 #ifdef RX_TRACK_PACKETS
1434         cp->flags |= RX_PKTFLAG_TQ;
1435 #endif
1436         queue_Append(&call->tq, cp);
1437 #ifdef RXDEBUG_PACKET
1438         call->tqc++;
1439 #endif /* RXDEBUG_PACKET */
1440         if (!
1441             (call->
1442              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1443             rxi_Start(0, call, 0, 0);
1444         }
1445         MUTEX_EXIT(&call->lock);
1446     }
1447 }
1448
/* Flush any buffered data to the stream, switch to read mode
 * (clients) or to EOF mode (servers) */
void
rx_FlushWrite(struct rx_call *call)
{
    SPLVAR;
    /* Enter network priority for the duration of the internal flush. */
    NETPRI;
    rxi_FlushWrite(call);
    USERPRI;
}