b63b42fd88f764aed1644b4b886b8ada772d1d36
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   *
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #ifdef KERNEL
14 # ifndef UKERNEL
15 #  ifdef RX_KERNEL_TRACE
16 #   include "rx_kcommon.h"
17 #  endif
18 #  if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
19 #   include "afs/sysincludes.h"
20 #  else
21 #   include "h/types.h"
22 #   include "h/time.h"
23 #   include "h/stat.h"
24 #   if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
25 #    include "h/systm.h"
26 #   endif
27 #   ifdef       AFS_OSF_ENV
28 #    include <net/net_globals.h>
29 #   endif /* AFS_OSF_ENV */
30 #   ifdef AFS_LINUX20_ENV
31 #    include "h/socket.h"
32 #   endif
33 #   include "netinet/in.h"
34 #   if defined(AFS_SGI_ENV)
35 #    include "afs/sysincludes.h"
36 #   endif
37 #  endif
38 #  include "afs/afs_args.h"
39 #  if   (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
40 #   include "h/systm.h"
41 #  endif
42 # else /* !UKERNEL */
43 #  include "afs/sysincludes.h"
44 # endif /* !UKERNEL */
45
46 # ifdef RXDEBUG
47 #  undef RXDEBUG                        /* turn off debugging */
48 # endif /* RXDEBUG */
49
50 # include "afs/afs_osi.h"
51 # include "rx_kmutex.h"
52 # include "rx/rx_kernel.h"
53 # include "afs/lock.h"
54 #else /* KERNEL */
55 # include <roken.h>
56 # include <afs/opr.h>
57 #endif /* KERNEL */
58
59 #include "rx.h"
60 #include "rx_clock.h"
61 #include "rx_queue.h"
62 #include "rx_globals.h"
63 #include "rx_atomic.h"
64 #include "rx_internal.h"
65 #include "rx_conn.h"
66 #include "rx_call.h"
67 #include "rx_packet.h"
68
69 #ifdef RX_LOCKS_DB
70 /* rxdb_fileID is used to identify the lock location, along with line#. */
71 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
72 #endif /* RX_LOCKS_DB */
73
74 /* Get the next packet in the receive queue
75  *
76  * Dispose of the call's currentPacket, and move the next packet in the
77  * receive queue into the currentPacket field. If the next packet isn't
78  * available, then currentPacket is left NULL.
79  *
80  * @param call
81  *      The RX call to manipulate
82  * @returns
83  *      0 on success, an error code on failure
84  *
85  * @notes
86  *      Must be called with the call locked. Unlocks the call if returning
87  *      with an error.
88  */
89
90 static int
91 rxi_GetNextPacket(struct rx_call *call) {
92     struct rx_packet *rp;
93     int error;
94
95     if (call->currentPacket != NULL) {
96 #ifdef RX_TRACK_PACKETS
97         call->currentPacket->flags |= RX_PKTFLAG_CP;
98 #endif
99         rxi_FreePacket(call->currentPacket);
100         call->currentPacket = NULL;
101     }
102
103     if (queue_IsEmpty(&call->rq))
104         return 0;
105
106     /* Check that next packet available is next in sequence */
107     rp = queue_First(&call->rq, rx_packet);
108     if (rp->header.seq != call->rnext)
109         return 0;
110
111     queue_Remove(rp);
112 #ifdef RX_TRACK_PACKETS
113     rp->flags &= ~RX_PKTFLAG_RQ;
114 #endif
115 #ifdef RXDEBUG_PACKET
116     call->rqc--;
117 #endif /* RXDEBUG_PACKET */
118
119     /* RXS_CheckPacket called to undo RXS_PreparePacket's work.  It may
120      * reduce the length of the packet by up to conn->maxTrailerSize,
121      * to reflect the length of the data + the header. */
122     if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
123         /* Used to merely shut down the call, but now we shut down the whole
124          * connection since this may indicate an attempt to hijack it */
125
126         MUTEX_EXIT(&call->lock);
127         rxi_ConnectionError(call->conn, error);
128         MUTEX_ENTER(&call->conn->conn_data_lock);
129         rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
130         MUTEX_EXIT(&call->conn->conn_data_lock);
131         rxi_FreePacket(rp);
132
133         return error;
134      }
135
136     call->rnext++;
137     call->currentPacket = rp;
138 #ifdef RX_TRACK_PACKETS
139     call->currentPacket->flags |= RX_PKTFLAG_CP;
140 #endif
141     call->curvec = 1;   /* 0th vec is always header */
142
143     /* begin at the beginning [ more or less ], continue on until the end,
144      * then stop. */
145     call->curpos = (char *)call->currentPacket->wirevec[1].iov_base +
146                    call->conn->securityHeaderSize;
147     call->curlen = call->currentPacket->wirevec[1].iov_len -
148                    call->conn->securityHeaderSize;
149
150     call->nLeft = call->currentPacket->length;
151     call->bytesRcvd += call->currentPacket->length;
152
153     call->nHardAcks++;
154
155     return 0;
156 }
157
158 /* rxi_ReadProc -- internal version.
159  *
160  * LOCKS USED -- called at netpri
161  */
162 int
163 rxi_ReadProc(struct rx_call *call, char *buf,
164              int nbytes)
165 {
166     int requestCount;
167     int code;
168     unsigned int t;
169
170 /* XXXX took out clock_NewTime from here.  Was it needed? */
171     requestCount = nbytes;
172
173     /* Free any packets from the last call to ReadvProc/WritevProc */
174     if (queue_IsNotEmpty(&call->iovq)) {
175 #ifdef RXDEBUG_PACKET
176         call->iovqc -=
177 #endif /* RXDEBUG_PACKET */
178             rxi_FreePackets(0, &call->iovq);
179     }
180
181     do {
182         if (call->nLeft == 0) {
183             /* Get next packet */
184             MUTEX_ENTER(&call->lock);
185             for (;;) {
186                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
187                     if (call->error) {
188                         call->mode = RX_MODE_ERROR;
189                         MUTEX_EXIT(&call->lock);
190                         return 0;
191                     }
192                     if (call->mode == RX_MODE_SENDING) {
193                         MUTEX_EXIT(&call->lock);
194                         rxi_FlushWrite(call);
195                         MUTEX_ENTER(&call->lock);
196                         continue;
197                     }
198                 }
199
200                 code = rxi_GetNextPacket(call);
201                 if (code)
202                      return 0;
203
204                 if (call->currentPacket) {
205                     if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
206                         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
207                             rxevent_Cancel(&call->delayedAckEvent, call,
208                                            RX_CALL_REFCOUNT_DELAY);
209                             rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
210                         } else {
211                             /* Delay to consolidate ack packets */
212                             rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
213                         }
214                     }
215                     break;
216                 }
217
218                 /*
219                  * If we reach this point either we have no packets in the
220                  * receive queue or the next packet in the queue is not the
221                  * one we are looking for.  There is nothing else for us to
222                  * do but wait for another packet to arrive.
223                  */
224
225                 /* Are there ever going to be any more packets? */
226                 if (call->flags & RX_CALL_RECEIVE_DONE) {
227                     MUTEX_EXIT(&call->lock);
228                     return requestCount - nbytes;
229                 }
230                 /* Wait for in-sequence packet */
231                 call->flags |= RX_CALL_READER_WAIT;
232                 clock_NewTime();
233                 call->startWait = clock_Sec();
234                 while (call->flags & RX_CALL_READER_WAIT) {
235 #ifdef  RX_ENABLE_LOCKS
236                     CV_WAIT(&call->cv_rq, &call->lock);
237 #else
238                     osi_rxSleep(&call->rq);
239 #endif
240                 }
241
242                 call->startWait = 0;
243 #ifdef RX_ENABLE_LOCKS
244                 if (call->error) {
245                     MUTEX_EXIT(&call->lock);
246                     return 0;
247                 }
248 #endif /* RX_ENABLE_LOCKS */
249             }
250             MUTEX_EXIT(&call->lock);
251         } else
252             /* osi_Assert(cp); */
253             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
254             /* yes, the following block is allowed to be the ELSE clause (or not) */
255             /* It's possible for call->nLeft to be smaller than any particular
256              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
257              * reflects the size of the buffer.  We have to keep track of the
258              * number of bytes read in the length field of the packet struct.  On
259              * the final portion of a received packet, it's almost certain that
260              * call->nLeft will be smaller than the final buffer. */
261             while (nbytes && call->currentPacket) {
262                 t = MIN((int)call->curlen, nbytes);
263                 t = MIN(t, (int)call->nLeft);
264                 memcpy(buf, call->curpos, t);
265                 buf += t;
266                 nbytes -= t;
267                 call->curpos += t;
268                 call->curlen -= t;
269                 call->nLeft -= t;
270
271                 if (!call->nLeft) {
272                     /* out of packet.  Get another one. */
273 #ifdef RX_TRACK_PACKETS
274                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
275 #endif
276                     rxi_FreePacket(call->currentPacket);
277                     call->currentPacket = NULL;
278                 } else if (!call->curlen) {
279                     /* need to get another struct iov */
280                     if (++call->curvec >= call->currentPacket->niovecs) {
281                         /* current packet is exhausted, get ready for another */
282                         /* don't worry about curvec and stuff, they get set somewhere else */
283 #ifdef RX_TRACK_PACKETS
284                         call->currentPacket->flags &= ~RX_PKTFLAG_CP;
285 #endif
286                         rxi_FreePacket(call->currentPacket);
287                         call->currentPacket = NULL;
288                         call->nLeft = 0;
289                     } else {
290                         call->curpos =
291                             call->currentPacket->wirevec[call->curvec].iov_base;
292                         call->curlen =
293                             call->currentPacket->wirevec[call->curvec].iov_len;
294                     }
295                 }
296             }
297         if (!nbytes) {
298             /* user buffer is full, return */
299             return requestCount;
300         }
301
302     } while (nbytes);
303
304     return requestCount;
305 }
306
307 int
308 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
309 {
310     int bytes;
311     SPLVAR;
312
313     /* Free any packets from the last call to ReadvProc/WritevProc */
314     if (!queue_IsEmpty(&call->iovq)) {
315 #ifdef RXDEBUG_PACKET
316         call->iovqc -=
317 #endif /* RXDEBUG_PACKET */
318             rxi_FreePackets(0, &call->iovq);
319     }
320
321     /*
322      * Most common case, all of the data is in the current iovec.
323      * We are relying on nLeft being zero unless the call is in receive mode.
324      */
325     if (!call->error && call->curlen > nbytes && call->nLeft > nbytes) {
326         memcpy(buf, call->curpos, nbytes);
327
328         call->curpos += nbytes;
329         call->curlen -= nbytes;
330         call->nLeft  -= nbytes;
331
332         if (!call->nLeft && call->currentPacket != NULL) {
333             /* out of packet.  Get another one. */
334             rxi_FreePacket(call->currentPacket);
335             call->currentPacket = (struct rx_packet *)0;
336         }
337         return nbytes;
338     }
339
340     NETPRI;
341     bytes = rxi_ReadProc(call, buf, nbytes);
342     USERPRI;
343     return bytes;
344 }
345
346 /* Optimization for unmarshalling 32 bit integers */
347 int
348 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
349 {
350     int bytes;
351     SPLVAR;
352
353     /* Free any packets from the last call to ReadvProc/WritevProc */
354     if (!queue_IsEmpty(&call->iovq)) {
355 #ifdef RXDEBUG_PACKET
356         call->iovqc -=
357 #endif /* RXDEBUG_PACKET */
358             rxi_FreePackets(0, &call->iovq);
359     }
360
361     /*
362      * Most common case, all of the data is in the current iovec.
363      * We are relying on nLeft being zero unless the call is in receive mode.
364      */
365     if (!call->error && call->curlen >= sizeof(afs_int32)
366         && call->nLeft >= sizeof(afs_int32)) {
367
368         memcpy((char *)value, call->curpos, sizeof(afs_int32));
369
370         call->curpos += sizeof(afs_int32);
371         call->curlen -= sizeof(afs_int32);
372         call->nLeft  -= sizeof(afs_int32);
373
374         if (!call->nLeft && call->currentPacket != NULL) {
375             /* out of packet.  Get another one. */
376             rxi_FreePacket(call->currentPacket);
377             call->currentPacket = (struct rx_packet *)0;
378         }
379         return sizeof(afs_int32);
380     }
381
382     NETPRI;
383     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
384     USERPRI;
385
386     return bytes;
387 }
388
389 /* rxi_FillReadVec
390  *
391  * Uses packets in the receive queue to fill in as much of the
392  * current iovec as possible. Does not block if it runs out
393  * of packets to complete the iovec. Return true if an ack packet
394  * was sent, otherwise return false */
395 int
396 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
397 {
398     int didConsume = 0;
399     int didHardAck = 0;
400     int code;
401     unsigned int t;
402     struct iovec *call_iov;
403     struct iovec *cur_iov = NULL;
404
405     if (call->currentPacket) {
406         cur_iov = &call->currentPacket->wirevec[call->curvec];
407     }
408     call_iov = &call->iov[call->iovNext];
409
410     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
411         if (call->nLeft == 0) {
412             /* Get next packet */
413             code = rxi_GetNextPacket(call);
414             if (code) {
415                 MUTEX_ENTER(&call->lock);
416                 return 1;
417             }
418
419             if (call->currentPacket) {
420                 cur_iov = &call->currentPacket->wirevec[1];
421                 didConsume = 1;
422                 continue;
423             } else {
424                 break;
425             }
426         }
427
428         /* It's possible for call->nLeft to be smaller than any particular
429          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
430          * reflects the size of the buffer.  We have to keep track of the
431          * number of bytes read in the length field of the packet struct.  On
432          * the final portion of a received packet, it's almost certain that
433          * call->nLeft will be smaller than the final buffer. */
434         while (call->iovNBytes
435                && call->iovNext < call->iovMax
436                && call->currentPacket) {
437
438             t = MIN((int)call->curlen, call->iovNBytes);
439             t = MIN(t, (int)call->nLeft);
440             call_iov->iov_base = call->curpos;
441             call_iov->iov_len = t;
442             call_iov++;
443             call->iovNext++;
444             call->iovNBytes -= t;
445             call->curpos += t;
446             call->curlen -= t;
447             call->nLeft -= t;
448
449             if (!call->nLeft) {
450                 /* out of packet.  Get another one. */
451 #ifdef RX_TRACK_PACKETS
452                 call->currentPacket->flags &= ~RX_PKTFLAG_CP;
453                 call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
454 #endif
455                 queue_Append(&call->iovq, call->currentPacket);
456 #ifdef RXDEBUG_PACKET
457                 call->iovqc++;
458 #endif /* RXDEBUG_PACKET */
459                 call->currentPacket = NULL;
460             } else if (!call->curlen) {
461                 /* need to get another struct iov */
462                 if (++call->curvec >= call->currentPacket->niovecs) {
463                     /* current packet is exhausted, get ready for another */
464                     /* don't worry about curvec and stuff, they get set somewhere else */
465 #ifdef RX_TRACK_PACKETS
466                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
467                     call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
468 #endif
469                     queue_Append(&call->iovq, call->currentPacket);
470 #ifdef RXDEBUG_PACKET
471                     call->iovqc++;
472 #endif /* RXDEBUG_PACKET */
473                     call->currentPacket = NULL;
474                     call->nLeft = 0;
475                 } else {
476                     cur_iov++;
477                     call->curpos = (char *)cur_iov->iov_base;
478                     call->curlen = cur_iov->iov_len;
479                 }
480             }
481         }
482     }
483
484     /* If we consumed any packets then check whether we need to
485      * send a hard ack. */
486     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
487         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
488             rxevent_Cancel(&call->delayedAckEvent, call,
489                            RX_CALL_REFCOUNT_DELAY);
490             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
491             didHardAck = 1;
492         } else {
493             /* Delay to consolidate ack packets */
494             rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
495         }
496     }
497     return didHardAck;
498 }
499
500
501 /* rxi_ReadvProc -- internal version.
502  *
503  * Fills in an iovec with pointers to the packet buffers. All packets
504  * except the last packet (new current packet) are moved to the iovq
505  * while the application is processing the data.
506  *
507  * LOCKS USED -- called at netpri.
508  */
509 int
510 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
511               int nbytes)
512 {
513     int bytes;
514
515     /* Free any packets from the last call to ReadvProc/WritevProc */
516     if (queue_IsNotEmpty(&call->iovq)) {
517 #ifdef RXDEBUG_PACKET
518         call->iovqc -=
519 #endif /* RXDEBUG_PACKET */
520             rxi_FreePackets(0, &call->iovq);
521     }
522
523     if (call->mode == RX_MODE_SENDING) {
524         rxi_FlushWrite(call);
525     }
526
527     MUTEX_ENTER(&call->lock);
528     if (call->error)
529         goto error;
530
531     /* Get whatever data is currently available in the receive queue.
532      * If rxi_FillReadVec sends an ack packet then it is possible
533      * that we will receive more data while we drop the call lock
534      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
535      * here to avoid a race with the receive thread if we send
536      * hard acks in rxi_FillReadVec. */
537     call->flags |= RX_CALL_IOVEC_WAIT;
538     call->iovNBytes = nbytes;
539     call->iovMax = maxio;
540     call->iovNext = 0;
541     call->iov = iov;
542     rxi_FillReadVec(call, 0);
543
544     /* if we need more data then sleep until the receive thread has
545      * filled in the rest. */
546     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
547         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
548         call->flags |= RX_CALL_READER_WAIT;
549         clock_NewTime();
550         call->startWait = clock_Sec();
551         while (call->flags & RX_CALL_READER_WAIT) {
552 #ifdef  RX_ENABLE_LOCKS
553             CV_WAIT(&call->cv_rq, &call->lock);
554 #else
555             osi_rxSleep(&call->rq);
556 #endif
557         }
558         call->startWait = 0;
559     }
560     call->flags &= ~RX_CALL_IOVEC_WAIT;
561
562     if (call->error)
563         goto error;
564
565     call->iov = NULL;
566     *nio = call->iovNext;
567     bytes = nbytes - call->iovNBytes;
568     MUTEX_EXIT(&call->lock);
569     return bytes;
570
571   error:
572     MUTEX_EXIT(&call->lock);
573     call->mode = RX_MODE_ERROR;
574     return 0;
575 }
576
577 int
578 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
579              int nbytes)
580 {
581     int bytes;
582     SPLVAR;
583
584     NETPRI;
585     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
586     USERPRI;
587     return bytes;
588 }
589
590 /* rxi_WriteProc -- internal version.
591  *
592  * LOCKS USED -- called at netpri
593  */
594
595 int
596 rxi_WriteProc(struct rx_call *call, char *buf,
597               int nbytes)
598 {
599     struct rx_connection *conn = call->conn;
600     unsigned int t;
601     int requestCount = nbytes;
602
603     /* Free any packets from the last call to ReadvProc/WritevProc */
604     if (queue_IsNotEmpty(&call->iovq)) {
605 #ifdef RXDEBUG_PACKET
606         call->iovqc -=
607 #endif /* RXDEBUG_PACKET */
608             rxi_FreePackets(0, &call->iovq);
609     }
610
611     if (call->mode != RX_MODE_SENDING) {
612         if ((conn->type == RX_SERVER_CONNECTION)
613             && (call->mode == RX_MODE_RECEIVING)) {
614             call->mode = RX_MODE_SENDING;
615             if (call->currentPacket) {
616 #ifdef RX_TRACK_PACKETS
617                 call->currentPacket->flags &= ~RX_PKTFLAG_CP;
618 #endif
619                 rxi_FreePacket(call->currentPacket);
620                 call->currentPacket = NULL;
621                 call->nLeft = 0;
622                 call->nFree = 0;
623             }
624         } else {
625             return 0;
626         }
627     }
628
629     /* Loop condition is checked at end, so that a write of 0 bytes
630      * will force a packet to be created--specially for the case where
631      * there are 0 bytes on the stream, but we must send a packet
632      * anyway. */
633     do {
634         if (call->nFree == 0) {
635             MUTEX_ENTER(&call->lock);
636             if (call->error)
637                 call->mode = RX_MODE_ERROR;
638             if (!call->error && call->currentPacket) {
639                 clock_NewTime();        /* Bogus:  need new time package */
640                 /* The 0, below, specifies that it is not the last packet:
641                  * there will be others. PrepareSendPacket may
642                  * alter the packet length by up to
643                  * conn->securityMaxTrailerSize */
644                 call->bytesSent += call->currentPacket->length;
645                 rxi_PrepareSendPacket(call, call->currentPacket, 0);
646 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
647                 /* PrepareSendPacket drops the call lock */
648                 rxi_WaitforTQBusy(call);
649 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
650 #ifdef RX_TRACK_PACKETS
651                 call->currentPacket->flags |= RX_PKTFLAG_TQ;
652 #endif
653                 queue_Append(&call->tq, call->currentPacket);
654 #ifdef RXDEBUG_PACKET
655                 call->tqc++;
656 #endif /* RXDEBUG_PACKET */
657 #ifdef RX_TRACK_PACKETS
658                 call->currentPacket->flags &= ~RX_PKTFLAG_CP;
659 #endif
660                 call->currentPacket = NULL;
661
662                 /* If the call is in recovery, let it exhaust its current
663                  * retransmit queue before forcing it to send new packets
664                  */
665                 if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
666                     rxi_Start(call, 0);
667                 }
668             } else if (call->currentPacket) {
669 #ifdef RX_TRACK_PACKETS
670                 call->currentPacket->flags &= ~RX_PKTFLAG_CP;
671 #endif
672                 rxi_FreePacket(call->currentPacket);
673                 call->currentPacket = NULL;
674             }
675             /* Wait for transmit window to open up */
676             while (!call->error
677                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
678                 clock_NewTime();
679                 call->startWait = clock_Sec();
680
681 #ifdef  RX_ENABLE_LOCKS
682                 CV_WAIT(&call->cv_twind, &call->lock);
683 #else
684                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
685                 osi_rxSleep(&call->twind);
686 #endif
687
688                 call->startWait = 0;
689 #ifdef RX_ENABLE_LOCKS
690                 if (call->error) {
691                     call->mode = RX_MODE_ERROR;
692                     MUTEX_EXIT(&call->lock);
693                     return 0;
694                 }
695 #endif /* RX_ENABLE_LOCKS */
696             }
697             if ((call->currentPacket = rxi_AllocSendPacket(call, nbytes))) {
698 #ifdef RX_TRACK_PACKETS
699                 call->currentPacket->flags |= RX_PKTFLAG_CP;
700 #endif
701                 call->nFree = call->currentPacket->length;
702                 call->curvec = 1;       /* 0th vec is always header */
703                 /* begin at the beginning [ more or less ], continue
704                  * on until the end, then stop. */
705                 call->curpos =
706                     (char *) call->currentPacket->wirevec[1].iov_base +
707                     call->conn->securityHeaderSize;
708                 call->curlen =
709                     call->currentPacket->wirevec[1].iov_len -
710                     call->conn->securityHeaderSize;
711             }
712             if (call->error) {
713                 call->mode = RX_MODE_ERROR;
714                 if (call->currentPacket) {
715 #ifdef RX_TRACK_PACKETS
716                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
717 #endif
718                     rxi_FreePacket(call->currentPacket);
719                     call->currentPacket = NULL;
720                 }
721                 MUTEX_EXIT(&call->lock);
722                 return 0;
723             }
724             MUTEX_EXIT(&call->lock);
725         }
726
727         if (call->currentPacket && (int)call->nFree < nbytes) {
728             /* Try to extend the current buffer */
729             int len, mud;
730             len = call->currentPacket->length;
731             mud = rx_MaxUserDataSize(call);
732             if (mud > len) {
733                 int want;
734                 want = MIN(nbytes - (int)call->nFree, mud - len);
735                 rxi_AllocDataBuf(call->currentPacket, want,
736                                  RX_PACKET_CLASS_SEND_CBUF);
737                 if (call->currentPacket->length > (unsigned)mud)
738                     call->currentPacket->length = mud;
739                 call->nFree += (call->currentPacket->length - len);
740             }
741         }
742
743         /* If the remaining bytes fit in the buffer, then store them
744          * and return.  Don't ship a buffer that's full immediately to
745          * the peer--we don't know if it's the last buffer yet */
746
747         if (!call->currentPacket) {
748             call->nFree = 0;
749         }
750
751         while (nbytes && call->nFree) {
752
753             t = MIN((int)call->curlen, nbytes);
754             t = MIN((int)call->nFree, t);
755             memcpy(call->curpos, buf, t);
756             buf += t;
757             nbytes -= t;
758             call->curpos += t;
759             call->curlen -= (u_short)t;
760             call->nFree -= (u_short)t;
761
762             if (!call->curlen) {
763                 /* need to get another struct iov */
764                 if (++call->curvec >= call->currentPacket->niovecs) {
765                     /* current packet is full, extend or send it */
766                     call->nFree = 0;
767                 } else {
768                     call->curpos =
769                         call->currentPacket->wirevec[call->curvec].iov_base;
770                     call->curlen =
771                         call->currentPacket->wirevec[call->curvec].iov_len;
772                 }
773             }
774         }                       /* while bytes to send and room to send them */
775
776         /* might be out of space now */
777         if (!nbytes) {
778             return requestCount;
779         } else;                 /* more data to send, so get another packet and keep going */
780     } while (nbytes);
781
782     return requestCount - nbytes;
783 }
784
785 int
786 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
787 {
788     int bytes;
789     int tcurlen;
790     int tnFree;
791     char *tcurpos;
792     SPLVAR;
793
794     /* Free any packets from the last call to ReadvProc/WritevProc */
795     if (queue_IsNotEmpty(&call->iovq)) {
796 #ifdef RXDEBUG_PACKET
797         call->iovqc -=
798 #endif /* RXDEBUG_PACKET */
799             rxi_FreePackets(0, &call->iovq);
800     }
801
802     /*
803      * Most common case: all of the data fits in the current iovec.
804      * We are relying on nFree being zero unless the call is in send mode.
805      */
806     tcurlen = (int)call->curlen;
807     tnFree = (int)call->nFree;
808     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
809         tcurpos = call->curpos;
810
811         memcpy(tcurpos, buf, nbytes);
812         call->curpos = tcurpos + nbytes;
813         call->curlen = (u_short)(tcurlen - nbytes);
814         call->nFree = (u_short)(tnFree - nbytes);
815         return nbytes;
816     }
817
818     NETPRI;
819     bytes = rxi_WriteProc(call, buf, nbytes);
820     USERPRI;
821     return bytes;
822 }
823
824 /* Optimization for marshalling 32 bit arguments */
825 int
826 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
827 {
828     int bytes;
829     int tcurlen;
830     int tnFree;
831     char *tcurpos;
832     SPLVAR;
833
834     if (queue_IsNotEmpty(&call->iovq)) {
835 #ifdef RXDEBUG_PACKET
836         call->iovqc -=
837 #endif /* RXDEBUG_PACKET */
838             rxi_FreePackets(0, &call->iovq);
839     }
840
841     /*
842      * Most common case: all of the data fits in the current iovec.
843      * We are relying on nFree being zero unless the call is in send mode.
844      */
845     tcurlen = call->curlen;
846     tnFree = call->nFree;
847     if (!call->error && tcurlen >= sizeof(afs_int32)
848         && tnFree >= sizeof(afs_int32)) {
849         tcurpos = call->curpos;
850
851         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
852             *((afs_int32 *) (tcurpos)) = *value;
853         } else {
854             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
855         }
856         call->curpos = tcurpos + sizeof(afs_int32);
857         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
858         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
859         return sizeof(afs_int32);
860     }
861
862     NETPRI;
863     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
864     USERPRI;
865     return bytes;
866 }
867
868 /* rxi_WritevAlloc -- internal version.
869  *
870  * Fill in an iovec to point to data in packet buffers. The application
871  * calls rxi_WritevProc when the buffers are full.
872  *
873  * LOCKS USED -- called at netpri.
874  */
875
876 static int
877 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
878                 int nbytes)
879 {
880     struct rx_connection *conn = call->conn;
881     struct rx_packet *cp;
882     int requestCount;
883     int nextio;
884     /* Temporary values, real work is done in rxi_WritevProc */
885     int tnFree;
886     unsigned int tcurvec;
887     char *tcurpos;
888     int tcurlen;
889
890     requestCount = nbytes;
891     nextio = 0;
892
893     /* Free any packets from the last call to ReadvProc/WritevProc */
894     if (queue_IsNotEmpty(&call->iovq)) {
895 #ifdef RXDEBUG_PACKET
896         call->iovqc -=
897 #endif /* RXDEBUG_PACKET */
898             rxi_FreePackets(0, &call->iovq);
899     }
900
901     if (call->mode != RX_MODE_SENDING) {
902         if ((conn->type == RX_SERVER_CONNECTION)
903             && (call->mode == RX_MODE_RECEIVING)) {
904             call->mode = RX_MODE_SENDING;
905             if (call->currentPacket) {
906 #ifdef RX_TRACK_PACKETS
907                 call->currentPacket->flags &= ~RX_PKTFLAG_CP;
908 #endif
909                 rxi_FreePacket(call->currentPacket);
910                 call->currentPacket = NULL;
911                 call->nLeft = 0;
912                 call->nFree = 0;
913             }
914         } else {
915             return 0;
916         }
917     }
918
919     /* Set up the iovec to point to data in packet buffers. */
920     tnFree = call->nFree;
921     tcurvec = call->curvec;
922     tcurpos = call->curpos;
923     tcurlen = call->curlen;
924     cp = call->currentPacket;
925     do {
926         int t;
927
928         if (tnFree == 0) {
929             /* current packet is full, allocate a new one */
930             MUTEX_ENTER(&call->lock);
931             cp = rxi_AllocSendPacket(call, nbytes);
932             MUTEX_EXIT(&call->lock);
933             if (cp == NULL) {
934                 /* out of space, return what we have */
935                 *nio = nextio;
936                 return requestCount - nbytes;
937             }
938 #ifdef RX_TRACK_PACKETS
939             cp->flags |= RX_PKTFLAG_IOVQ;
940 #endif
941             queue_Append(&call->iovq, cp);
942 #ifdef RXDEBUG_PACKET
943             call->iovqc++;
944 #endif /* RXDEBUG_PACKET */
945             tnFree = cp->length;
946             tcurvec = 1;
947             tcurpos =
948                 (char *)cp->wirevec[1].iov_base +
949                 call->conn->securityHeaderSize;
950             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
951         }
952
953         if (tnFree < nbytes) {
954             /* try to extend the current packet */
955             int len, mud;
956             len = cp->length;
957             mud = rx_MaxUserDataSize(call);
958             if (mud > len) {
959                 int want;
960                 want = MIN(nbytes - tnFree, mud - len);
961                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
962                 if (cp->length > (unsigned)mud)
963                     cp->length = mud;
964                 tnFree += (cp->length - len);
965                 if (cp == call->currentPacket) {
966                     call->nFree += (cp->length - len);
967                 }
968             }
969         }
970
971         /* fill in the next entry in the iovec */
972         t = MIN(tcurlen, nbytes);
973         t = MIN(tnFree, t);
974         iov[nextio].iov_base = tcurpos;
975         iov[nextio].iov_len = t;
976         nbytes -= t;
977         tcurpos += t;
978         tcurlen -= t;
979         tnFree -= t;
980         nextio++;
981
982         if (!tcurlen) {
983             /* need to get another struct iov */
984             if (++tcurvec >= cp->niovecs) {
985                 /* current packet is full, extend it or move on to next packet */
986                 tnFree = 0;
987             } else {
988                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
989                 tcurlen = cp->wirevec[tcurvec].iov_len;
990             }
991         }
992     } while (nbytes && nextio < maxio);
993     *nio = nextio;
994     return requestCount - nbytes;
995 }
996
997 int
998 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
999                int nbytes)
1000 {
1001     int bytes;
1002     SPLVAR;
1003
1004     NETPRI;
1005     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1006     USERPRI;
1007     return bytes;
1008 }
1009
1010 /* rxi_WritevProc -- internal version.
1011  *
1012  * Send buffers allocated in rxi_WritevAlloc.
1013  *
1014  * LOCKS USED -- called at netpri.
1015  */
1016 int
1017 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1018 {
1019 #ifdef RX_TRACK_PACKETS
1020     struct rx_packet *p, *np;
1021 #endif
1022     int nextio;
1023     int requestCount;
1024     struct rx_queue tmpq;
1025 #ifdef RXDEBUG_PACKET
1026     u_short tmpqc;
1027 #endif
1028
1029     requestCount = nbytes;
1030     nextio = 0;
1031
1032     MUTEX_ENTER(&call->lock);
1033     if (call->error) {
1034         call->mode = RX_MODE_ERROR;
1035     } else if (call->mode != RX_MODE_SENDING) {
1036         call->error = RX_PROTOCOL_ERROR;
1037     }
1038 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1039     rxi_WaitforTQBusy(call);
1040 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1041
1042     if (call->error) {
1043         call->mode = RX_MODE_ERROR;
1044         MUTEX_EXIT(&call->lock);
1045         if (call->currentPacket) {
1046 #ifdef RX_TRACK_PACKETS
1047             call->currentPacket->flags &= ~RX_PKTFLAG_CP;
1048             call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
1049 #endif
1050             queue_Prepend(&call->iovq, call->currentPacket);
1051 #ifdef RXDEBUG_PACKET
1052             call->iovqc++;
1053 #endif /* RXDEBUG_PACKET */
1054             call->currentPacket = NULL;
1055         }
1056 #ifdef RXDEBUG_PACKET
1057         call->iovqc -=
1058 #endif /* RXDEBUG_PACKET */
1059             rxi_FreePackets(0, &call->iovq);
1060         return 0;
1061     }
1062
1063     /* Loop through the I/O vector adjusting packet pointers.
1064      * Place full packets back onto the iovq once they are ready
1065      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1066      * the iovec. We put the loop condition at the end to ensure that
1067      * a zero length write will push a short packet. */
1068     nextio = 0;
1069     queue_Init(&tmpq);
1070 #ifdef RXDEBUG_PACKET
1071     tmpqc = 0;
1072 #endif /* RXDEBUG_PACKET */
1073     do {
1074         if (call->nFree == 0 && call->currentPacket) {
1075             clock_NewTime();    /* Bogus:  need new time package */
1076             /* The 0, below, specifies that it is not the last packet:
1077              * there will be others. PrepareSendPacket may
1078              * alter the packet length by up to
1079              * conn->securityMaxTrailerSize */
1080             call->bytesSent += call->currentPacket->length;
1081             rxi_PrepareSendPacket(call, call->currentPacket, 0);
1082 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1083             /* PrepareSendPacket drops the call lock */
1084             rxi_WaitforTQBusy(call);
1085 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1086             queue_Append(&tmpq, call->currentPacket);
1087 #ifdef RXDEBUG_PACKET
1088             tmpqc++;
1089 #endif /* RXDEBUG_PACKET */
1090             call->currentPacket = NULL;
1091
1092             /* The head of the iovq is now the current packet */
1093             if (nbytes) {
1094                 if (queue_IsEmpty(&call->iovq)) {
1095                     MUTEX_EXIT(&call->lock);
1096                     call->error = RX_PROTOCOL_ERROR;
1097 #ifdef RXDEBUG_PACKET
1098                     tmpqc -=
1099 #endif /* RXDEBUG_PACKET */
1100                         rxi_FreePackets(0, &tmpq);
1101                     return 0;
1102                 }
1103                 call->currentPacket = queue_First(&call->iovq, rx_packet);
1104                 queue_Remove(call->currentPacket);
1105 #ifdef RX_TRACK_PACKETS
1106                 call->currentPacket->flags &= ~RX_PKTFLAG_IOVQ;
1107                 call->currentPacket->flags |= RX_PKTFLAG_CP;
1108 #endif
1109 #ifdef RXDEBUG_PACKET
1110                 call->iovqc--;
1111 #endif /* RXDEBUG_PACKET */
1112                 call->nFree = call->currentPacket->length;
1113                 call->curvec = 1;
1114                 call->curpos =
1115                     (char *) call->currentPacket->wirevec[1].iov_base +
1116                     call->conn->securityHeaderSize;
1117                 call->curlen =
1118                     call->currentPacket->wirevec[1].iov_len -
1119                     call->conn->securityHeaderSize;
1120             }
1121         }
1122
1123         if (nbytes) {
1124             /* The next iovec should point to the current position */
1125             if (iov[nextio].iov_base != call->curpos
1126                 || iov[nextio].iov_len > (int)call->curlen) {
1127                 call->error = RX_PROTOCOL_ERROR;
1128                 MUTEX_EXIT(&call->lock);
1129                 if (call->currentPacket) {
1130 #ifdef RX_TRACK_PACKETS
1131                     call->currentPacket->flags &= ~RX_PKTFLAG_CP;
1132 #endif
1133                     queue_Prepend(&tmpq, call->currentPacket);
1134 #ifdef RXDEBUG_PACKET
1135                     tmpqc++;
1136 #endif /* RXDEBUG_PACKET */
1137                     call->currentPacket = NULL;
1138                 }
1139 #ifdef RXDEBUG_PACKET
1140                 tmpqc -=
1141 #endif /* RXDEBUG_PACKET */
1142                     rxi_FreePackets(0, &tmpq);
1143                 return 0;
1144             }
1145             nbytes -= iov[nextio].iov_len;
1146             call->curpos += iov[nextio].iov_len;
1147             call->curlen -= iov[nextio].iov_len;
1148             call->nFree -= iov[nextio].iov_len;
1149             nextio++;
1150             if (call->curlen == 0) {
1151                 if (++call->curvec > call->currentPacket->niovecs) {
1152                     call->nFree = 0;
1153                 } else {
1154                     call->curpos =
1155                         call->currentPacket->wirevec[call->curvec].iov_base;
1156                     call->curlen =
1157                         call->currentPacket->wirevec[call->curvec].iov_len;
1158                 }
1159             }
1160         }
1161     } while (nbytes && nextio < nio);
1162
1163     /* Move the packets from the temporary queue onto the transmit queue.
1164      * We may end up with more than call->twind packets on the queue. */
1165
1166 #ifdef RX_TRACK_PACKETS
1167     for (queue_Scan(&tmpq, p, np, rx_packet))
1168     {
1169         p->flags |= RX_PKTFLAG_TQ;
1170     }
1171 #endif
1172
1173     if (call->error)
1174         call->mode = RX_MODE_ERROR;
1175
1176     queue_SpliceAppend(&call->tq, &tmpq);
1177
1178     /* If the call is in recovery, let it exhaust its current retransmit
1179      * queue before forcing it to send new packets
1180      */
1181     if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1182         rxi_Start(call, 0);
1183     }
1184
1185     /* Wait for the length of the transmit queue to fall below call->twind */
1186     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1187         clock_NewTime();
1188         call->startWait = clock_Sec();
1189 #ifdef  RX_ENABLE_LOCKS
1190         CV_WAIT(&call->cv_twind, &call->lock);
1191 #else
1192         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1193         osi_rxSleep(&call->twind);
1194 #endif
1195         call->startWait = 0;
1196     }
1197
1198     if (call->error) {
1199         call->mode = RX_MODE_ERROR;
1200         call->currentPacket = NULL;
1201         MUTEX_EXIT(&call->lock);
1202         if (call->currentPacket) {
1203 #ifdef RX_TRACK_PACKETS
1204             call->currentPacket->flags &= ~RX_PKTFLAG_CP;
1205 #endif
1206             rxi_FreePacket(call->currentPacket);
1207         }
1208         return 0;
1209     }
1210     MUTEX_EXIT(&call->lock);
1211
1212     return requestCount - nbytes;
1213 }
1214
1215 int
1216 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1217 {
1218     int bytes;
1219     SPLVAR;
1220
1221     NETPRI;
1222     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1223     USERPRI;
1224     return bytes;
1225 }
1226
1227 /* Flush any buffered data to the stream, switch to read mode
1228  * (clients) or to EOF mode (servers)
1229  *
1230  * LOCKS HELD: called at netpri.
1231  */
1232 void
1233 rxi_FlushWrite(struct rx_call *call)
1234 {
1235     struct rx_packet *cp = NULL;
1236
1237     /* Free any packets from the last call to ReadvProc/WritevProc */
1238     if (queue_IsNotEmpty(&call->iovq)) {
1239 #ifdef RXDEBUG_PACKET
1240         call->iovqc -=
1241 #endif /* RXDEBUG_PACKET */
1242             rxi_FreePackets(0, &call->iovq);
1243     }
1244
1245     if (call->mode == RX_MODE_SENDING) {
1246
1247         call->mode =
1248             (call->conn->type ==
1249              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1250
1251 #ifdef RX_KERNEL_TRACE
1252         {
1253             int glockOwner = ISAFS_GLOCK();
1254             if (!glockOwner)
1255                 AFS_GLOCK();
1256             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1257                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1258                        call);
1259             if (!glockOwner)
1260                 AFS_GUNLOCK();
1261         }
1262 #endif
1263
1264         MUTEX_ENTER(&call->lock);
1265         if (call->error)
1266             call->mode = RX_MODE_ERROR;
1267
1268         cp = call->currentPacket;
1269
1270         if (cp) {
1271             /* cp->length is only supposed to be the user's data */
1272             /* cp->length was already set to (then-current)
1273              * MaxUserDataSize or less. */
1274 #ifdef RX_TRACK_PACKETS
1275             cp->flags &= ~RX_PKTFLAG_CP;
1276 #endif
1277             cp->length -= call->nFree;
1278             call->currentPacket = (struct rx_packet *)0;
1279             call->nFree = 0;
1280         } else {
1281             cp = rxi_AllocSendPacket(call, 0);
1282             if (!cp) {
1283                 /* Mode can no longer be MODE_SENDING */
1284                 return;
1285             }
1286             cp->length = 0;
1287             cp->niovecs = 2;    /* header + space for rxkad stuff */
1288             call->nFree = 0;
1289         }
1290
1291         /* The 1 specifies that this is the last packet */
1292         call->bytesSent += cp->length;
1293         rxi_PrepareSendPacket(call, cp, 1);
1294 #ifdef  AFS_GLOBAL_RXLOCK_KERNEL
1295         /* PrepareSendPacket drops the call lock */
1296         rxi_WaitforTQBusy(call);
1297 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1298 #ifdef RX_TRACK_PACKETS
1299         cp->flags |= RX_PKTFLAG_TQ;
1300 #endif
1301         queue_Append(&call->tq, cp);
1302 #ifdef RXDEBUG_PACKET
1303         call->tqc++;
1304 #endif /* RXDEBUG_PACKET */
1305
1306         /* If the call is in recovery, let it exhaust its current retransmit
1307          * queue before forcing it to send new packets
1308          */
1309         if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1310             rxi_Start(call, 0);
1311         }
1312         MUTEX_EXIT(&call->lock);
1313     }
1314 }
1315
1316 /* Flush any buffered data to the stream, switch to read mode
1317  * (clients) or to EOF mode (servers) */
1318 void
1319 rx_FlushWrite(struct rx_call *call)
1320 {
1321     SPLVAR;
1322     NETPRI;
1323     rxi_FlushWrite(call);
1324     USERPRI;
1325 }