1aee72d92f5eb5d6da5bbb3e6ae2cfa6875d5df5
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   *
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #ifdef KERNEL
14 # ifndef UKERNEL
15 #  ifdef RX_KERNEL_TRACE
16 #   include "rx_kcommon.h"
17 #  endif
18 #  if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
19 #   include "afs/sysincludes.h"
20 #  else
21 #   include "h/types.h"
22 #   include "h/time.h"
23 #   include "h/stat.h"
24 #   if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
25 #    include "h/systm.h"
26 #   endif
27 #   ifdef       AFS_OSF_ENV
28 #    include <net/net_globals.h>
29 #   endif /* AFS_OSF_ENV */
30 #   ifdef AFS_LINUX20_ENV
31 #    include "h/socket.h"
32 #   endif
33 #   include "netinet/in.h"
34 #   if defined(AFS_SGI_ENV)
35 #    include "afs/sysincludes.h"
36 #   endif
37 #  endif
38 #  include "afs/afs_args.h"
39 #  if   (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
40 #   include "h/systm.h"
41 #  endif
42 # else /* !UKERNEL */
43 #  include "afs/sysincludes.h"
44 # endif /* !UKERNEL */
45
46 # ifdef RXDEBUG
47 #  undef RXDEBUG                        /* turn off debugging */
48 # endif /* RXDEBUG */
49
50 # include "afs/afs_osi.h"
51 # include "rx_kmutex.h"
52 # include "rx/rx_kernel.h"
53 # include "afs/lock.h"
54 #else /* KERNEL */
55 # include <roken.h>
56 # include <afs/opr.h>
57 #endif /* KERNEL */
58
59 #include "rx.h"
60 #include "rx_clock.h"
61 #include "rx_globals.h"
62 #include "rx_atomic.h"
63 #include "rx_internal.h"
64 #include "rx_conn.h"
65 #include "rx_call.h"
66 #include "rx_packet.h"
67
68 #ifdef RX_LOCKS_DB
69 /* rxdb_fileID is used to identify the lock location, along with line#. */
70 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
71 #endif /* RX_LOCKS_DB */
72
73 /* Get the next packet in the receive queue
74  *
75  * Dispose of the call's currentPacket, and move the next packet in the
76  * receive queue into the currentPacket field. If the next packet isn't
77  * available, then currentPacket is left NULL.
78  *
79  * @param call
80  *      The RX call to manipulate
81  * @returns
82  *      0 on success, an error code on failure
83  *
84  * @notes
85  *      Must be called with the call locked. Unlocks the call if returning
86  *      with an error.
87  */
88
89 static int
90 rxi_GetNextPacket(struct rx_call *call) {
91     struct rx_packet *rp;
92     int error;
93
94     if (call->currentPacket != NULL) {
95 #ifdef RX_TRACK_PACKETS
96         call->currentPacket->flags |= RX_PKTFLAG_CP;
97 #endif
98         rxi_FreePacket(call->currentPacket);
99         call->currentPacket = NULL;
100     }
101
102     if (opr_queue_IsEmpty(&call->rq))
103         return 0;
104
105     /* Check that next packet available is next in sequence */
106     rp = opr_queue_First(&call->rq, struct rx_packet, entry);
107     if (rp->header.seq != call->rnext)
108         return 0;
109
110     opr_queue_Remove(&rp->entry);
111 #ifdef RX_TRACK_PACKETS
112     rp->flags &= ~RX_PKTFLAG_RQ;
113 #endif
114 #ifdef RXDEBUG_PACKET
115     call->rqc--;
116 #endif /* RXDEBUG_PACKET */
117
118     /* RXS_CheckPacket called to undo RXS_PreparePacket's work.  It may
119      * reduce the length of the packet by up to conn->maxTrailerSize,
120      * to reflect the length of the data + the header. */
121     if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
122         /* Used to merely shut down the call, but now we shut down the whole
123          * connection since this may indicate an attempt to hijack it */
124
125         MUTEX_EXIT(&call->lock);
126         rxi_ConnectionError(call->conn, error);
127         MUTEX_ENTER(&call->conn->conn_data_lock);
128         rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
129         MUTEX_EXIT(&call->conn->conn_data_lock);
130         rxi_FreePacket(rp);
131
132         return error;
133      }
134
135     call->rnext++;
136     call->currentPacket = rp;
137 #ifdef RX_TRACK_PACKETS
138     call->currentPacket->flags |= RX_PKTFLAG_CP;
139 #endif
140     call->curvec = 1;   /* 0th vec is always header */
141
142     /* begin at the beginning [ more or less ], continue on until the end,
143      * then stop. */
144     call->curpos = (char *)call->currentPacket->wirevec[1].iov_base +
145                    call->conn->securityHeaderSize;
146     call->curlen = call->currentPacket->wirevec[1].iov_len -
147                    call->conn->securityHeaderSize;
148
149     call->nLeft = call->currentPacket->length;
150     call->bytesRcvd += call->currentPacket->length;
151
152     call->nHardAcks++;
153
154     return 0;
155 }
156
/* rxi_ReadProc -- internal version.
 *
 * Copy up to nbytes of in-sequence received data into buf, sleeping as
 * needed until enough packets arrive.  Returns the number of bytes
 * actually copied; a short count means the incoming stream ended
 * (RX_CALL_RECEIVE_DONE), and 0 means the call is in error.
 *
 * LOCKS USED -- called at netpri
 */
int
rxi_ReadProc(struct rx_call *call, char *buf,
             int nbytes)
{
    int requestCount;
    int code;
    unsigned int t;

/* XXXX took out clock_NewTime from here.  Was it needed? */
    requestCount = nbytes;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->iovq);
    }

    do {
        if (call->nLeft == 0) {
            /* Get next packet */
            MUTEX_ENTER(&call->lock);
            for (;;) {
                if (call->error || (call->mode != RX_MODE_RECEIVING)) {
                    if (call->error) {
                        call->mode = RX_MODE_ERROR;
                        MUTEX_EXIT(&call->lock);
                        return 0;
                    }
                    if (call->mode == RX_MODE_SENDING) {
                        /* Turning the call around: flush buffered outgoing
                         * data (lock must be dropped for the flush), then
                         * re-evaluate the call state from the top. */
                        MUTEX_EXIT(&call->lock);
                        rxi_FlushWrite(call);
                        MUTEX_ENTER(&call->lock);
                        continue;
                    }
                }

                /* NOTE: rxi_GetNextPacket drops the call lock when it
                 * returns an error, so we return without unlocking here. */
                code = rxi_GetNextPacket(call);
                if (code)
                     return 0;

                if (call->currentPacket) {
                    if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
                        /* Acknowledge the consumed packet: hard-ack
                         * immediately once enough have accumulated,
                         * otherwise schedule a delayed ack. */
                        if (call->nHardAcks > (u_short) rxi_HardAckRate) {
                            rxevent_Cancel(&call->delayedAckEvent, call,
                                           RX_CALL_REFCOUNT_DELAY);
                            rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
                        } else {
                            /* Delay to consolidate ack packets */
                            rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
                        }
                    }
                    break;
                }

                /*
                 * If we reach this point either we have no packets in the
                 * receive queue or the next packet in the queue is not the
                 * one we are looking for.  There is nothing else for us to
                 * do but wait for another packet to arrive.
                 */

                /* Are there ever going to be any more packets? */
                if (call->flags & RX_CALL_RECEIVE_DONE) {
                    MUTEX_EXIT(&call->lock);
                    return requestCount - nbytes;
                }
                /* Wait for in-sequence packet */
                call->flags |= RX_CALL_READER_WAIT;
                clock_NewTime();
                call->startWait = clock_Sec();
                /* The receive thread clears RX_CALL_READER_WAIT when data
                 * is ready; loop to guard against spurious wakeups. */
                while (call->flags & RX_CALL_READER_WAIT) {
#ifdef  RX_ENABLE_LOCKS
                    CV_WAIT(&call->cv_rq, &call->lock);
#else
                    osi_rxSleep(&call->rq);
#endif
                }

                call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
                if (call->error) {
                    MUTEX_EXIT(&call->lock);
                    return 0;
                }
#endif /* RX_ENABLE_LOCKS */
            }
            MUTEX_EXIT(&call->lock);
        } else
            /* osi_Assert(cp); */
            /* MTUXXX  this should be replaced by some error-recovery code before shipping */
            /* yes, the following block is allowed to be the ELSE clause (or not) */
            /* It's possible for call->nLeft to be smaller than any particular
             * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
             * reflects the size of the buffer.  We have to keep track of the
             * number of bytes read in the length field of the packet struct.  On
             * the final portion of a received packet, it's almost certain that
             * call->nLeft will be smaller than the final buffer. */
            while (nbytes && call->currentPacket) {
                /* Copy the largest run bounded by the current iovec, the
                 * caller's remaining request, and the packet's remaining
                 * payload. */
                t = MIN((int)call->curlen, nbytes);
                t = MIN(t, (int)call->nLeft);
                memcpy(buf, call->curpos, t);
                buf += t;
                nbytes -= t;
                call->curpos += t;
                call->curlen -= t;
                call->nLeft -= t;

                if (!call->nLeft) {
                    /* out of packet.  Get another one. */
#ifdef RX_TRACK_PACKETS
                    call->currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                    rxi_FreePacket(call->currentPacket);
                    call->currentPacket = NULL;
                } else if (!call->curlen) {
                    /* need to get another struct iov */
                    if (++call->curvec >= call->currentPacket->niovecs) {
                        /* current packet is exhausted, get ready for another */
                        /* don't worry about curvec and stuff, they get set somewhere else */
#ifdef RX_TRACK_PACKETS
                        call->currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                        rxi_FreePacket(call->currentPacket);
                        call->currentPacket = NULL;
                        call->nLeft = 0;
                    } else {
                        call->curpos =
                            call->currentPacket->wirevec[call->curvec].iov_base;
                        call->curlen =
                            call->currentPacket->wirevec[call->curvec].iov_len;
                    }
                }
            }
        if (!nbytes) {
            /* user buffer is full, return */
            return requestCount;
        }

    } while (nbytes);

    return requestCount;
}
305
306 int
307 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
308 {
309     int bytes;
310     SPLVAR;
311
312     /* Free any packets from the last call to ReadvProc/WritevProc */
313     if (!opr_queue_IsEmpty(&call->iovq)) {
314 #ifdef RXDEBUG_PACKET
315         call->iovqc -=
316 #endif /* RXDEBUG_PACKET */
317             rxi_FreePackets(0, &call->iovq);
318     }
319
320     /*
321      * Most common case, all of the data is in the current iovec.
322      * We are relying on nLeft being zero unless the call is in receive mode.
323      */
324     if (!call->error && call->curlen > nbytes && call->nLeft > nbytes) {
325         memcpy(buf, call->curpos, nbytes);
326
327         call->curpos += nbytes;
328         call->curlen -= nbytes;
329         call->nLeft  -= nbytes;
330
331         if (!call->nLeft && call->currentPacket != NULL) {
332             /* out of packet.  Get another one. */
333             rxi_FreePacket(call->currentPacket);
334             call->currentPacket = (struct rx_packet *)0;
335         }
336         return nbytes;
337     }
338
339     NETPRI;
340     bytes = rxi_ReadProc(call, buf, nbytes);
341     USERPRI;
342     return bytes;
343 }
344
345 /* Optimization for unmarshalling 32 bit integers */
346 int
347 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
348 {
349     int bytes;
350     SPLVAR;
351
352     /* Free any packets from the last call to ReadvProc/WritevProc */
353     if (!opr_queue_IsEmpty(&call->iovq)) {
354 #ifdef RXDEBUG_PACKET
355         call->iovqc -=
356 #endif /* RXDEBUG_PACKET */
357             rxi_FreePackets(0, &call->iovq);
358     }
359
360     /*
361      * Most common case, all of the data is in the current iovec.
362      * We are relying on nLeft being zero unless the call is in receive mode.
363      */
364     if (!call->error && call->curlen >= sizeof(afs_int32)
365         && call->nLeft >= sizeof(afs_int32)) {
366
367         memcpy((char *)value, call->curpos, sizeof(afs_int32));
368
369         call->curpos += sizeof(afs_int32);
370         call->curlen -= sizeof(afs_int32);
371         call->nLeft  -= sizeof(afs_int32);
372
373         if (!call->nLeft && call->currentPacket != NULL) {
374             /* out of packet.  Get another one. */
375             rxi_FreePacket(call->currentPacket);
376             call->currentPacket = (struct rx_packet *)0;
377         }
378         return sizeof(afs_int32);
379     }
380
381     NETPRI;
382     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
383     USERPRI;
384
385     return bytes;
386 }
387
/* rxi_FillReadVec
 *
 * Uses packets in the receive queue to fill in as much of the
 * current iovec as possible. Does not block if it runs out
 * of packets to complete the iovec. Return true if an ack packet
 * was sent, otherwise return false
 *
 * Must be called with the call lock held; the lock is held on return
 * (rxi_GetNextPacket drops it on error, so it is re-acquired below).
 * The serial argument is passed through to rxi_SendAck when a hard
 * ack is sent. */
int
rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
{
    int didConsume = 0;         /* set once we take ownership of a packet */
    int didHardAck = 0;         /* return value: did we send a hard ack? */
    int code;
    unsigned int t;
    struct iovec *call_iov;     /* next slot to fill in the caller's iovec */
    struct iovec *cur_iov = NULL;       /* current source buffer in the packet */

    if (call->currentPacket) {
        cur_iov = &call->currentPacket->wirevec[call->curvec];
    }
    call_iov = &call->iov[call->iovNext];

    while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
        if (call->nLeft == 0) {
            /* Get next packet */
            code = rxi_GetNextPacket(call);
            if (code) {
                /* rxi_GetNextPacket released the call lock on error; our
                 * contract is to return with it held. */
                MUTEX_ENTER(&call->lock);
                return 1;
            }

            if (call->currentPacket) {
                cur_iov = &call->currentPacket->wirevec[1];
                didConsume = 1;
                continue;
            } else {
                /* No in-sequence packet available; stop without blocking. */
                break;
            }
        }

        /* It's possible for call->nLeft to be smaller than any particular
         * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
         * reflects the size of the buffer.  We have to keep track of the
         * number of bytes read in the length field of the packet struct.  On
         * the final portion of a received packet, it's almost certain that
         * call->nLeft will be smaller than the final buffer. */
        while (call->iovNBytes
               && call->iovNext < call->iovMax
               && call->currentPacket) {

            /* Largest run bounded by the current wire vector, the bytes
             * still wanted, and the packet's remaining payload. */
            t = MIN((int)call->curlen, call->iovNBytes);
            t = MIN(t, (int)call->nLeft);
            call_iov->iov_base = call->curpos;
            call_iov->iov_len = t;
            call_iov++;
            call->iovNext++;
            call->iovNBytes -= t;
            call->curpos += t;
            call->curlen -= t;
            call->nLeft -= t;

            if (!call->nLeft) {
                /* out of packet.  Get another one. */
                /* The packet moves to the iovq (not freed) because the
                 * application's iovec still points into its buffers. */
#ifdef RX_TRACK_PACKETS
                call->currentPacket->flags &= ~RX_PKTFLAG_CP;
                call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
#endif
                opr_queue_Append(&call->iovq, &call->currentPacket->entry);
#ifdef RXDEBUG_PACKET
                call->iovqc++;
#endif /* RXDEBUG_PACKET */
                call->currentPacket = NULL;
            } else if (!call->curlen) {
                /* need to get another struct iov */
                if (++call->curvec >= call->currentPacket->niovecs) {
                    /* current packet is exhausted, get ready for another */
                    /* don't worry about curvec and stuff, they get set somewhere else */
#ifdef RX_TRACK_PACKETS
                    call->currentPacket->flags &= ~RX_PKTFLAG_CP;
                    call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
#endif
                    opr_queue_Append(&call->iovq, &call->currentPacket->entry);
#ifdef RXDEBUG_PACKET
                    call->iovqc++;
#endif /* RXDEBUG_PACKET */
                    call->currentPacket = NULL;
                    call->nLeft = 0;
                } else {
                    cur_iov++;
                    call->curpos = (char *)cur_iov->iov_base;
                    call->curlen = cur_iov->iov_len;
                }
            }
        }
    }

    /* If we consumed any packets then check whether we need to
     * send a hard ack. */
    if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
        if (call->nHardAcks > (u_short) rxi_HardAckRate) {
            rxevent_Cancel(&call->delayedAckEvent, call,
                           RX_CALL_REFCOUNT_DELAY);
            rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
            didHardAck = 1;
        } else {
            /* Delay to consolidate ack packets */
            rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
        }
    }
    return didHardAck;
}
498
499
500 /* rxi_ReadvProc -- internal version.
501  *
502  * Fills in an iovec with pointers to the packet buffers. All packets
503  * except the last packet (new current packet) are moved to the iovq
504  * while the application is processing the data.
505  *
506  * LOCKS USED -- called at netpri.
507  */
508 int
509 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
510               int nbytes)
511 {
512     int bytes;
513
514     /* Free any packets from the last call to ReadvProc/WritevProc */
515     if (!opr_queue_IsEmpty(&call->iovq)) {
516 #ifdef RXDEBUG_PACKET
517         call->iovqc -=
518 #endif /* RXDEBUG_PACKET */
519             rxi_FreePackets(0, &call->iovq);
520     }
521
522     if (call->mode == RX_MODE_SENDING) {
523         rxi_FlushWrite(call);
524     }
525
526     MUTEX_ENTER(&call->lock);
527     if (call->error)
528         goto error;
529
530     /* Get whatever data is currently available in the receive queue.
531      * If rxi_FillReadVec sends an ack packet then it is possible
532      * that we will receive more data while we drop the call lock
533      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
534      * here to avoid a race with the receive thread if we send
535      * hard acks in rxi_FillReadVec. */
536     call->flags |= RX_CALL_IOVEC_WAIT;
537     call->iovNBytes = nbytes;
538     call->iovMax = maxio;
539     call->iovNext = 0;
540     call->iov = iov;
541     rxi_FillReadVec(call, 0);
542
543     /* if we need more data then sleep until the receive thread has
544      * filled in the rest. */
545     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
546         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
547         call->flags |= RX_CALL_READER_WAIT;
548         clock_NewTime();
549         call->startWait = clock_Sec();
550         while (call->flags & RX_CALL_READER_WAIT) {
551 #ifdef  RX_ENABLE_LOCKS
552             CV_WAIT(&call->cv_rq, &call->lock);
553 #else
554             osi_rxSleep(&call->rq);
555 #endif
556         }
557         call->startWait = 0;
558     }
559     call->flags &= ~RX_CALL_IOVEC_WAIT;
560
561     if (call->error)
562         goto error;
563
564     call->iov = NULL;
565     *nio = call->iovNext;
566     bytes = nbytes - call->iovNBytes;
567     MUTEX_EXIT(&call->lock);
568     return bytes;
569
570   error:
571     MUTEX_EXIT(&call->lock);
572     call->mode = RX_MODE_ERROR;
573     return 0;
574 }
575
576 int
577 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
578              int nbytes)
579 {
580     int bytes;
581     SPLVAR;
582
583     NETPRI;
584     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
585     USERPRI;
586     return bytes;
587 }
588
/* rxi_WriteProc -- internal version.
 *
 * Buffer up to nbytes of outgoing data for the call, queueing full
 * packets for transmission and sleeping while the transmit window is
 * closed.  Returns the number of bytes accepted; 0 indicates the call
 * is in error or cannot be switched into send mode.
 *
 * LOCKS USED -- called at netpri
 */

int
rxi_WriteProc(struct rx_call *call, char *buf,
              int nbytes)
{
    struct rx_connection *conn = call->conn;
    unsigned int t;
    int requestCount = nbytes;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->iovq);
    }

    if (call->mode != RX_MODE_SENDING) {
        /* Only a server call that has finished receiving may be turned
         * around into send mode; anything else cannot accept data. */
        if ((conn->type == RX_SERVER_CONNECTION)
            && (call->mode == RX_MODE_RECEIVING)) {
            call->mode = RX_MODE_SENDING;
            if (call->currentPacket) {
                /* Discard any partially-read receive packet and reset the
                 * read/write cursors. */
#ifdef RX_TRACK_PACKETS
                call->currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                rxi_FreePacket(call->currentPacket);
                call->currentPacket = NULL;
                call->nLeft = 0;
                call->nFree = 0;
            }
        } else {
            return 0;
        }
    }

    /* Loop condition is checked at end, so that a write of 0 bytes
     * will force a packet to be created--specially for the case where
     * there are 0 bytes on the stream, but we must send a packet
     * anyway. */
    do {
        if (call->nFree == 0) {
            MUTEX_ENTER(&call->lock);
            if (call->error)
                call->mode = RX_MODE_ERROR;
            if (!call->error && call->currentPacket) {
                clock_NewTime();        /* Bogus:  need new time package */
                /* The 0, below, specifies that it is not the last packet:
                 * there will be others. PrepareSendPacket may
                 * alter the packet length by up to
                 * conn->securityMaxTrailerSize */
                call->bytesSent += call->currentPacket->length;
                rxi_PrepareSendPacket(call, call->currentPacket, 0);
#ifdef  AFS_GLOBAL_RXLOCK_KERNEL
                /* PrepareSendPacket drops the call lock */
                rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
#ifdef RX_TRACK_PACKETS
                call->currentPacket->flags |= RX_PKTFLAG_TQ;
#endif
                /* Hand the filled packet to the transmit queue; it is no
                 * longer the current (being-filled) packet. */
                opr_queue_Append(&call->tq, &call->currentPacket->entry);
#ifdef RXDEBUG_PACKET
                call->tqc++;
#endif /* RXDEBUG_PACKET */
#ifdef RX_TRACK_PACKETS
                call->currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                call->currentPacket = NULL;

                /* If the call is in recovery, let it exhaust its current
                 * retransmit queue before forcing it to send new packets
                 */
                if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
                    rxi_Start(call, 0);
                }
            } else if (call->currentPacket) {
                /* The call is in error: discard the buffered packet
                 * rather than queueing it for transmission. */
#ifdef RX_TRACK_PACKETS
                call->currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                rxi_FreePacket(call->currentPacket);
                call->currentPacket = NULL;
            }
            /* Wait for transmit window to open up */
            while (!call->error
                   && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
                clock_NewTime();
                call->startWait = clock_Sec();

#ifdef  RX_ENABLE_LOCKS
                CV_WAIT(&call->cv_twind, &call->lock);
#else
                call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
                osi_rxSleep(&call->twind);
#endif

                call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
                if (call->error) {
                    call->mode = RX_MODE_ERROR;
                    MUTEX_EXIT(&call->lock);
                    return 0;
                }
#endif /* RX_ENABLE_LOCKS */
            }
            if ((call->currentPacket = rxi_AllocSendPacket(call, nbytes))) {
#ifdef RX_TRACK_PACKETS
                call->currentPacket->flags |= RX_PKTFLAG_CP;
#endif
                call->nFree = call->currentPacket->length;
                call->curvec = 1;       /* 0th vec is always header */
                /* begin at the beginning [ more or less ], continue
                 * on until the end, then stop. */
                call->curpos =
                    (char *) call->currentPacket->wirevec[1].iov_base +
                    call->conn->securityHeaderSize;
                call->curlen =
                    call->currentPacket->wirevec[1].iov_len -
                    call->conn->securityHeaderSize;
            }
            if (call->error) {
                call->mode = RX_MODE_ERROR;
                if (call->currentPacket) {
#ifdef RX_TRACK_PACKETS
                    call->currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                    rxi_FreePacket(call->currentPacket);
                    call->currentPacket = NULL;
                }
                MUTEX_EXIT(&call->lock);
                return 0;
            }
            MUTEX_EXIT(&call->lock);
        }

        if (call->currentPacket && (int)call->nFree < nbytes) {
            /* Try to extend the current buffer */
            int len, mud;
            len = call->currentPacket->length;
            mud = rx_MaxUserDataSize(call);
            if (mud > len) {
                int want;
                want = MIN(nbytes - (int)call->nFree, mud - len);
                rxi_AllocDataBuf(call->currentPacket, want,
                                 RX_PACKET_CLASS_SEND_CBUF);
                if (call->currentPacket->length > (unsigned)mud)
                    call->currentPacket->length = mud;
                call->nFree += (call->currentPacket->length - len);
            }
        }

        /* If the remaining bytes fit in the buffer, then store them
         * and return.  Don't ship a buffer that's full immediately to
         * the peer--we don't know if it's the last buffer yet */

        if (!call->currentPacket) {
            call->nFree = 0;
        }

        while (nbytes && call->nFree) {

            /* Copy the largest run bounded by the current wire vector,
             * the caller's remaining data, and the free packet space. */
            t = MIN((int)call->curlen, nbytes);
            t = MIN((int)call->nFree, t);
            memcpy(call->curpos, buf, t);
            buf += t;
            nbytes -= t;
            call->curpos += t;
            call->curlen -= (u_short)t;
            call->nFree -= (u_short)t;

            if (!call->curlen) {
                /* need to get another struct iov */
                if (++call->curvec >= call->currentPacket->niovecs) {
                    /* current packet is full, extend or send it */
                    call->nFree = 0;
                } else {
                    call->curpos =
                        call->currentPacket->wirevec[call->curvec].iov_base;
                    call->curlen =
                        call->currentPacket->wirevec[call->curvec].iov_len;
                }
            }
        }                       /* while bytes to send and room to send them */

        /* might be out of space now */
        if (!nbytes) {
            return requestCount;
        } else;                 /* more data to send, so get another packet and keep going */
    } while (nbytes);

    return requestCount - nbytes;
}
783
784 int
785 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
786 {
787     int bytes;
788     int tcurlen;
789     int tnFree;
790     char *tcurpos;
791     SPLVAR;
792
793     /* Free any packets from the last call to ReadvProc/WritevProc */
794     if (!opr_queue_IsEmpty(&call->iovq)) {
795 #ifdef RXDEBUG_PACKET
796         call->iovqc -=
797 #endif /* RXDEBUG_PACKET */
798             rxi_FreePackets(0, &call->iovq);
799     }
800
801     /*
802      * Most common case: all of the data fits in the current iovec.
803      * We are relying on nFree being zero unless the call is in send mode.
804      */
805     tcurlen = (int)call->curlen;
806     tnFree = (int)call->nFree;
807     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
808         tcurpos = call->curpos;
809
810         memcpy(tcurpos, buf, nbytes);
811         call->curpos = tcurpos + nbytes;
812         call->curlen = (u_short)(tcurlen - nbytes);
813         call->nFree = (u_short)(tnFree - nbytes);
814         return nbytes;
815     }
816
817     NETPRI;
818     bytes = rxi_WriteProc(call, buf, nbytes);
819     USERPRI;
820     return bytes;
821 }
822
823 /* Optimization for marshalling 32 bit arguments */
824 int
825 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
826 {
827     int bytes;
828     int tcurlen;
829     int tnFree;
830     char *tcurpos;
831     SPLVAR;
832
833     if (!opr_queue_IsEmpty(&call->iovq)) {
834 #ifdef RXDEBUG_PACKET
835         call->iovqc -=
836 #endif /* RXDEBUG_PACKET */
837             rxi_FreePackets(0, &call->iovq);
838     }
839
840     /*
841      * Most common case: all of the data fits in the current iovec.
842      * We are relying on nFree being zero unless the call is in send mode.
843      */
844     tcurlen = call->curlen;
845     tnFree = call->nFree;
846     if (!call->error && tcurlen >= sizeof(afs_int32)
847         && tnFree >= sizeof(afs_int32)) {
848         tcurpos = call->curpos;
849
850         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
851             *((afs_int32 *) (tcurpos)) = *value;
852         } else {
853             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
854         }
855         call->curpos = tcurpos + sizeof(afs_int32);
856         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
857         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
858         return sizeof(afs_int32);
859     }
860
861     NETPRI;
862     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
863     USERPRI;
864     return bytes;
865 }
866
/* rxi_WritevAlloc -- internal version.
 *
 * Fill in an iovec to point to data in packet buffers. The application
 * calls rxi_WritevProc when the buffers are full.
 *
 * Reserves up to nbytes of packet buffer space, describing it in at
 * most maxio iovec entries.  Returns the number of bytes reserved and
 * sets *nio to the number of iovec entries used.  Newly allocated
 * packets are parked on call->iovq until rxi_WritevProc commits them.
 *
 * LOCKS USED -- called at netpri.
 */

static int
rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
                int nbytes)
{
    struct rx_connection *conn = call->conn;
    struct rx_packet *cp;
    int requestCount;
    int nextio;
    /* Temporary values, real work is done in rxi_WritevProc */
    int tnFree;
    unsigned int tcurvec;
    char *tcurpos;
    int tcurlen;

    requestCount = nbytes;
    nextio = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->iovq)) {
#ifdef RXDEBUG_PACKET
        call->iovqc -=
#endif /* RXDEBUG_PACKET */
            rxi_FreePackets(0, &call->iovq);
    }

    if (call->mode != RX_MODE_SENDING) {
        /* A server call that has finished receiving switches to send
         * mode here, discarding any leftover receive-side packet.
         * Any other mode is an error: reserve nothing. */
        if ((conn->type == RX_SERVER_CONNECTION)
            && (call->mode == RX_MODE_RECEIVING)) {
            call->mode = RX_MODE_SENDING;
            if (call->currentPacket) {
#ifdef RX_TRACK_PACKETS
                call->currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
                rxi_FreePacket(call->currentPacket);
                call->currentPacket = NULL;
                call->nLeft = 0;
                call->nFree = 0;
            }
        } else {
            return 0;
        }
    }

    /* Set up the iovec to point to data in packet buffers. */
    tnFree = call->nFree;
    tcurvec = call->curvec;
    tcurpos = call->curpos;
    tcurlen = call->curlen;
    cp = call->currentPacket;
    do {
        int t;

        if (tnFree == 0) {
            /* current packet is full, allocate a new one */
            /* rxi_AllocSendPacket expects the call lock; it returns
             * with the lock still held even on failure. */
            MUTEX_ENTER(&call->lock);
            cp = rxi_AllocSendPacket(call, nbytes);
            MUTEX_EXIT(&call->lock);
            if (cp == NULL) {
                /* out of space, return what we have */
                *nio = nextio;
                return requestCount - nbytes;
            }
#ifdef RX_TRACK_PACKETS
            cp->flags |= RX_PKTFLAG_IOVQ;
#endif
            opr_queue_Append(&call->iovq, &cp->entry);
#ifdef RXDEBUG_PACKET
            call->iovqc++;
#endif /* RXDEBUG_PACKET */
            /* User data begins in wirevec[1], past the security header;
             * wirevec[0] holds the rx packet header. */
            tnFree = cp->length;
            tcurvec = 1;
            tcurpos =
                (char *)cp->wirevec[1].iov_base +
                call->conn->securityHeaderSize;
            tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
        }

        if (tnFree < nbytes) {
            /* try to extend the current packet */
            int len, mud;
            len = cp->length;
            mud = rx_MaxUserDataSize(call);
            if (mud > len) {
                int want;
                want = MIN(nbytes - tnFree, mud - len);
                rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
                if (cp->length > (unsigned)mud)
                    cp->length = mud;
                tnFree += (cp->length - len);
                /* keep the call's accounting in step when extending the
                 * packet the call is actively filling */
                if (cp == call->currentPacket) {
                    call->nFree += (cp->length - len);
                }
            }
        }

        /* fill in the next entry in the iovec */
        t = MIN(tcurlen, nbytes);
        t = MIN(tnFree, t);
        iov[nextio].iov_base = tcurpos;
        iov[nextio].iov_len = t;
        nbytes -= t;
        tcurpos += t;
        tcurlen -= t;
        tnFree -= t;
        nextio++;

        if (!tcurlen) {
            /* need to get another struct iov */
            if (++tcurvec >= cp->niovecs) {
                /* current packet is full, extend it or move on to next packet */
                tnFree = 0;
            } else {
                tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
                tcurlen = cp->wirevec[tcurvec].iov_len;
            }
        }
    } while (nbytes && nextio < maxio);
    *nio = nextio;
    return requestCount - nbytes;
}
995
996 int
997 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
998                int nbytes)
999 {
1000     int bytes;
1001     SPLVAR;
1002
1003     NETPRI;
1004     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1005     USERPRI;
1006     return bytes;
1007 }
1008
1009 /* rxi_WritevProc -- internal version.
1010  *
1011  * Send buffers allocated in rxi_WritevAlloc.
1012  *
1013  * LOCKS USED -- called at netpri.
1014  */
1015 int
1016 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1017 {
1018 #ifdef RX_TRACK_PACKETS
1019     struct opr_queue *cursor;
1020 #endif
1021     int nextio;
1022     int requestCount;
1023     struct opr_queue tmpq;
1024 #ifdef RXDEBUG_PACKET
1025     u_short tmpqc;
1026 #endif
1027
1028     requestCount = nbytes;
1029     nextio = 0;
1030
1031     MUTEX_ENTER(&call->lock);
1032     if (call->error) {
1033         call->mode = RX_MODE_ERROR;
1034     } else if (call->mode != RX_MODE_SENDING) {
1035         call->error = RX_PROTOCOL_ERROR;
1036     }
1037 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1038     rxi_WaitforTQBusy(call);
1039 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1040
1041     if (call->error) {
1042         call->mode = RX_MODE_ERROR;
1043         MUTEX_EXIT(&call->lock);
1044         if (call->currentPacket) {
1045 #ifdef RX_TRACK_PACKETS
1046             call->currentPacket->flags &= ~RX_PKTFLAG_CP;
1047             call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
1048 #endif
1049             opr_queue_Prepend(&call->iovq, &call->currentPacket->entry);
1050 #ifdef RXDEBUG_PACKET
1051             call->iovqc++;
1052 #endif /* RXDEBUG_PACKET */
1053             call->currentPacket = NULL;
1054         }
1055 #ifdef RXDEBUG_PACKET
1056         call->iovqc -=
1057 #endif /* RXDEBUG_PACKET */
1058             rxi_FreePackets(0, &call->iovq);
1059         return 0;
1060     }
1061
1062     /* Loop through the I/O vector adjusting packet pointers.
1063      * Place full packets back onto the iovq once they are ready
1064      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1065      * the iovec. We put the loop condition at the end to ensure that
1066      * a zero length write will push a short packet. */
1067     nextio = 0;
1068     opr_queue_Init(&tmpq);
1069 #ifdef RXDEBUG_PACKET
1070     tmpqc = 0;
1071 #endif /* RXDEBUG_PACKET */
1072     do {
1073         if (call->nFree == 0 && call->currentPacket) {
1074             clock_NewTime();    /* Bogus:  need new time package */
1075             /* The 0, below, specifies that it is not the last packet:
1076              * there will be others. PrepareSendPacket may
1077              * alter the packet length by up to
1078              * conn->securityMaxTrailerSize */
1079             call->bytesSent += call->currentPacket->length;
1080             rxi_PrepareSendPacket(call, call->currentPacket, 0);
1081 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1082             /* PrepareSendPacket drops the call lock */
1083             rxi_WaitforTQBusy(call);
1084 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1085             opr_queue_Append(&tmpq, &call->currentPacket->entry);
1086 #ifdef RXDEBUG_PACKET
1087             tmpqc++;
1088 #endif /* RXDEBUG_PACKET */
1089             call->currentPacket = NULL;
1090
1091             /* The head of the iovq is now the current packet */
1092             if (nbytes) {
1093                 if (opr_queue_IsEmpty(&call->iovq)) {
1094                     MUTEX_EXIT(&call->lock);
1095                     call->error = RX_PROTOCOL_ERROR;
1096 #ifdef RXDEBUG_PACKET
1097                     tmpqc -=
1098 #endif /* RXDEBUG_PACKET */
1099                         rxi_FreePackets(0, &tmpq);
1100                     return 0;
1101                 }
1102                 call->currentPacket = opr_queue_First(&call->iovq,
1103                                                       struct rx_packet,
1104                                                       entry);
1105                 opr_queue_Remove(&call->currentPacket->entry);
1106 #ifdef RX_TRACK_PACKETS
1107                 call->currentPacket->flags &= ~RX_PKTFLAG_IOVQ;
1108                 call->currentPacket->flags |= RX_PKTFLAG_CP;
1109 #endif
1110 #ifdef RXDEBUG_PACKET
1111                 call->iovqc--;
1112 #endif /* RXDEBUG_PACKET */
1113                 call->nFree = call->currentPacket->length;
1114                 call->curvec = 1;
1115                 call->curpos =
1116                     (char *) call->currentPacket->wirevec[1].iov_base +
1117                     call->conn->securityHeaderSize;
1118                 call->curlen =
1119                     call->currentPacket->wirevec[1].iov_len -
1120                     call->conn->securityHeaderSize;
1121             }
1122         }
1123
1124         if (nbytes) {
1125             /* The next iovec should point to the current position */
1126             if (iov[nextio].iov_base != call->curpos
1127                 || iov[nextio].iov_len > (int)call->curlen) {
1128                 call->error = RX_PROTOCOL_ERROR;
1129                 MUTEX_EXIT(&call->lock);
1130                 if (call->currentPacket) {
1131 #ifdef RX_TRACK_PACKETS
1132                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
1133 #endif
1134                     opr_queue_Prepend(&tmpq, &call->currentPacket->entry);
1135 #ifdef RXDEBUG_PACKET
1136                     tmpqc++;
1137 #endif /* RXDEBUG_PACKET */
1138                     call->currentPacket = NULL;
1139                 }
1140 #ifdef RXDEBUG_PACKET
1141                 tmpqc -=
1142 #endif /* RXDEBUG_PACKET */
1143                     rxi_FreePackets(0, &tmpq);
1144                 return 0;
1145             }
1146             nbytes -= iov[nextio].iov_len;
1147             call->curpos += iov[nextio].iov_len;
1148             call->curlen -= iov[nextio].iov_len;
1149             call->nFree -= iov[nextio].iov_len;
1150             nextio++;
1151             if (call->curlen == 0) {
1152                 if (++call->curvec > call->currentPacket->niovecs) {
1153                     call->nFree = 0;
1154                 } else {
1155                     call->curpos =
1156                         call->currentPacket->wirevec[call->curvec].iov_base;
1157                     call->curlen =
1158                         call->currentPacket->wirevec[call->curvec].iov_len;
1159                 }
1160             }
1161         }
1162     } while (nbytes && nextio < nio);
1163
1164     /* Move the packets from the temporary queue onto the transmit queue.
1165      * We may end up with more than call->twind packets on the queue. */
1166
1167 #ifdef RX_TRACK_PACKETS
1168     for (opr_queue_Scan(&tmpq, cursor))
1169     {
1170         struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
1171         p->flags |= RX_PKTFLAG_TQ;
1172     }
1173 #endif
1174     if (call->error)
1175         call->mode = RX_MODE_ERROR;
1176
1177     opr_queue_SpliceAppend(&call->tq, &tmpq);
1178
1179     /* If the call is in recovery, let it exhaust its current retransmit
1180      * queue before forcing it to send new packets
1181      */
1182     if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1183         rxi_Start(call, 0);
1184     }
1185
1186     /* Wait for the length of the transmit queue to fall below call->twind */
1187     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1188         clock_NewTime();
1189         call->startWait = clock_Sec();
1190 #ifdef  RX_ENABLE_LOCKS
1191         CV_WAIT(&call->cv_twind, &call->lock);
1192 #else
1193         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1194         osi_rxSleep(&call->twind);
1195 #endif
1196         call->startWait = 0;
1197     }
1198
1199     if (call->error) {
1200         call->mode = RX_MODE_ERROR;
1201         call->currentPacket = NULL;
1202         MUTEX_EXIT(&call->lock);
1203         if (call->currentPacket) {
1204 #ifdef RX_TRACK_PACKETS
1205             call->currentPacket->flags &= ~RX_PKTFLAG_CP;
1206 #endif
1207             rxi_FreePacket(call->currentPacket);
1208         }
1209         return 0;
1210     }
1211     MUTEX_EXIT(&call->lock);
1212
1213     return requestCount - nbytes;
1214 }
1215
1216 int
1217 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1218 {
1219     int bytes;
1220     SPLVAR;
1221
1222     NETPRI;
1223     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1224     USERPRI;
1225     return bytes;
1226 }
1227
1228 /* Flush any buffered data to the stream, switch to read mode
1229  * (clients) or to EOF mode (servers)
1230  *
1231  * LOCKS HELD: called at netpri.
1232  */
1233 void
1234 rxi_FlushWrite(struct rx_call *call)
1235 {
1236     struct rx_packet *cp = NULL;
1237
1238     /* Free any packets from the last call to ReadvProc/WritevProc */
1239     if (!opr_queue_IsEmpty(&call->iovq)) {
1240 #ifdef RXDEBUG_PACKET
1241         call->iovqc -=
1242 #endif /* RXDEBUG_PACKET */
1243             rxi_FreePackets(0, &call->iovq);
1244     }
1245
1246     if (call->mode == RX_MODE_SENDING) {
1247
1248         call->mode =
1249             (call->conn->type ==
1250              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1251
1252 #ifdef RX_KERNEL_TRACE
1253         {
1254             int glockOwner = ISAFS_GLOCK();
1255             if (!glockOwner)
1256                 AFS_GLOCK();
1257             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1258                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1259                        call);
1260             if (!glockOwner)
1261                 AFS_GUNLOCK();
1262         }
1263 #endif
1264
1265         MUTEX_ENTER(&call->lock);
1266         if (call->error)
1267             call->mode = RX_MODE_ERROR;
1268
1269         cp = call->currentPacket;
1270
1271         if (cp) {
1272             /* cp->length is only supposed to be the user's data */
1273             /* cp->length was already set to (then-current)
1274              * MaxUserDataSize or less. */
1275 #ifdef RX_TRACK_PACKETS
1276             cp->flags &= ~RX_PKTFLAG_CP;
1277 #endif
1278             cp->length -= call->nFree;
1279             call->currentPacket = (struct rx_packet *)0;
1280             call->nFree = 0;
1281         } else {
1282             cp = rxi_AllocSendPacket(call, 0);
1283             if (!cp) {
1284                 /* Mode can no longer be MODE_SENDING */
1285                 return;
1286             }
1287             cp->length = 0;
1288             cp->niovecs = 2;    /* header + space for rxkad stuff */
1289             call->nFree = 0;
1290         }
1291
1292         /* The 1 specifies that this is the last packet */
1293         call->bytesSent += cp->length;
1294         rxi_PrepareSendPacket(call, cp, 1);
1295 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1296         /* PrepareSendPacket drops the call lock */
1297         rxi_WaitforTQBusy(call);
1298 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1299 #ifdef RX_TRACK_PACKETS
1300         cp->flags |= RX_PKTFLAG_TQ;
1301 #endif
1302         opr_queue_Append(&call->tq, &cp->entry);
1303 #ifdef RXDEBUG_PACKET
1304         call->tqc++;
1305 #endif /* RXDEBUG_PACKET */
1306
1307         /* If the call is in recovery, let it exhaust its current retransmit
1308          * queue before forcing it to send new packets
1309          */
1310         if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1311             rxi_Start(call, 0);
1312         }
1313         MUTEX_EXIT(&call->lock);
1314     }
1315 }
1316
1317 /* Flush any buffered data to the stream, switch to read mode
1318  * (clients) or to EOF mode (servers) */
1319 void
1320 rx_FlushWrite(struct rx_call *call)
1321 {
1322     SPLVAR;
1323     NETPRI;
1324     rxi_FlushWrite(call);
1325     USERPRI;
1326 }