205adf92532069791298073c8daf2a2ff8c91cda
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   * 
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #ifdef KERNEL
21 #ifndef UKERNEL
22 #ifdef RX_KERNEL_TRACE
23 #include "rx_kcommon.h"
24 #endif
25 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
26 #include "afs/sysincludes.h"
27 #else
28 #include "h/types.h"
29 #include "h/time.h"
30 #include "h/stat.h"
31 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
32 #include "h/systm.h"
33 #endif
34 #ifdef  AFS_OSF_ENV
35 #include <net/net_globals.h>
36 #endif /* AFS_OSF_ENV */
37 #ifdef AFS_LINUX20_ENV
38 #include "h/socket.h"
39 #endif
40 #include "netinet/in.h"
41 #if defined(AFS_SGI_ENV)
42 #include "afs/sysincludes.h"
43 #endif
44 #endif
45 #include "afs/afs_args.h"
46 #include "afs/afs_osi.h"
47 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
48 #include "h/systm.h"
49 #endif
50 #else /* !UKERNEL */
51 #include "afs/sysincludes.h"
52 #endif /* !UKERNEL */
53 #ifdef RXDEBUG
54 #undef RXDEBUG                  /* turn off debugging */
55 #endif /* RXDEBUG */
56
57 #include "rx_kmutex.h"
58 #include "rx/rx_kernel.h"
59 #include "rx/rx_clock.h"
60 #include "rx/rx_queue.h"
61 #include "rx/rx.h"
62 #include "rx/rx_globals.h"
63 #include "afs/lock.h"
64 #include "afsint.h"
65 #ifdef  AFS_OSF_ENV
66 #undef kmem_alloc
67 #undef kmem_free
68 #undef mem_alloc
69 #undef mem_free
70 #undef register
71 #endif /* AFS_OSF_ENV */
72 #else /* KERNEL */
73 # include <sys/types.h>
74 #ifdef AFS_NT40_ENV
75 # include <winsock2.h>
76 #else /* !AFS_NT40_ENV */
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <netinet/in.h>
81 # include <sys/stat.h>
82 # include <sys/time.h>
83 #endif /* !AFS_NT40_ENV */
84 #ifdef HAVE_STRING_H
85 #include <string.h>
86 #else
87 #ifdef HAVE_STRINGS_H
88 #include <strings.h>
89 #endif
90 #endif
91 #ifdef HAVE_UNISTD_H
92 #include <unistd.h>
93 #endif
94 # include "rx_user.h"
95 # include "rx_clock.h"
96 # include "rx_queue.h"
97 # include "rx.h"
98 # include "rx_globals.h"
99 #endif /* KERNEL */
100
101 #ifdef RX_LOCKS_DB
102 /* rxdb_fileID is used to identify the lock location, along with line#. */
103 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
104 #endif /* RX_LOCKS_DB */
105 /* rxi_ReadProc -- internal version.
106  *
107  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
108  */
109 int
110 rxi_ReadProc(register struct rx_call *call, register char *buf,
111              register int nbytes)
112 {
113     register struct rx_packet *cp = call->currentPacket;
114     register struct rx_packet *rp;
115     register int requestCount;
116     register unsigned int t;
117
118 /* XXXX took out clock_NewTime from here.  Was it needed? */
119     requestCount = nbytes;
120
121     /* Free any packets from the last call to ReadvProc/WritevProc */
122     if (queue_IsNotEmpty(&call->iovq)) {
123         rxi_FreePackets(0, &call->iovq);
124     }
125
126     do {
127         if (call->nLeft == 0) {
128             /* Get next packet */
129             for (;;) {
130                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
131                     if (call->error) {
132                         return 0;
133                     }
134                     if (call->mode == RX_MODE_SENDING) {
135                         rxi_FlushWrite(call);
136                         continue;
137                     }
138                 }
139                 if (queue_IsNotEmpty(&call->rq)) {
140                     /* Check that next packet available is next in sequence */
141                     rp = queue_First(&call->rq, rx_packet);
142                     if (rp->header.seq == call->rnext) {
143                         afs_int32 error;
144                         register struct rx_connection *conn = call->conn;
145                         queue_Remove(rp);
146
147                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
148                          * work.  It may reduce the length of the packet by up
149                          * to conn->maxTrailerSize, to reflect the length of the
150                          * data + the header. */
151                         if ((error =
152                              RXS_CheckPacket(conn->securityObject, call,
153                                              rp))) {
154                             /* Used to merely shut down the call, but now we 
155                              * shut down the whole connection since this may 
156                              * indicate an attempt to hijack it */
157
158                             MUTEX_EXIT(&call->lock);
159                             rxi_ConnectionError(conn, error);
160                             MUTEX_ENTER(&conn->conn_data_lock);
161                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
162                             MUTEX_EXIT(&conn->conn_data_lock);
163                             rxi_FreePacket(rp);
164                             MUTEX_ENTER(&call->lock);
165
166                             return 0;
167                         }
168                         call->rnext++;
169                         cp = call->currentPacket = rp;
170                         call->curvec = 1;       /* 0th vec is always header */
171                         /* begin at the beginning [ more or less ], continue 
172                          * on until the end, then stop. */
173                         call->curpos =
174                             (char *)cp->wirevec[1].iov_base +
175                             call->conn->securityHeaderSize;
176                         call->curlen =
177                             cp->wirevec[1].iov_len -
178                             call->conn->securityHeaderSize;
179
180                         /* Notice that this code works correctly if the data
181                          * size is 0 (which it may be--no reply arguments from
182                          * server, for example).  This relies heavily on the
183                          * fact that the code below immediately frees the packet
184                          * (no yields, etc.).  If it didn't, this would be a
185                          * problem because a value of zero for call->nLeft
186                          * normally means that there is no read packet */
187                         call->nLeft = cp->length;
188                         hadd32(call->bytesRcvd, cp->length);
189
190                         /* Send a hard ack for every rxi_HardAckRate+1 packets
191                          * consumed. Otherwise schedule an event to send
192                          * the hard ack later on.
193                          */
194                         call->nHardAcks++;
195                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
196                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
197                                 rxevent_Cancel(call->delayedAckEvent, call,
198                                                RX_CALL_REFCOUNT_DELAY);
199                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
200                             } else {
201                                 struct clock when;
202                                 clock_GetTime(&when);
203                                 /* Delay to consolidate ack packets */
204                                 clock_Add(&when, &rx_hardAckDelay);
205                                 if (!call->delayedAckEvent
206                                     || clock_Gt(&call->delayedAckEvent->
207                                                 eventTime, &when)) {
208                                     rxevent_Cancel(call->delayedAckEvent,
209                                                    call,
210                                                    RX_CALL_REFCOUNT_DELAY);
211                                     CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
212                                     call->delayedAckEvent =
213                                         rxevent_Post(&when,
214                                                      rxi_SendDelayedAck, call,
215                                                      0);
216                                 }
217                             }
218                         }
219                         break;
220                     }
221                 }
222
223 /*
224 MTUXXX  doesn't there need to be an "else" here ??? 
225 */
226                 /* Are there ever going to be any more packets? */
227                 if (call->flags & RX_CALL_RECEIVE_DONE) {
228                     return requestCount - nbytes;
229                 }
230                 /* Wait for in-sequence packet */
231                 call->flags |= RX_CALL_READER_WAIT;
232                 clock_NewTime();
233                 call->startWait = clock_Sec();
234                 while (call->flags & RX_CALL_READER_WAIT) {
235 #ifdef  RX_ENABLE_LOCKS
236                     CV_WAIT(&call->cv_rq, &call->lock);
237 #else
238                     osi_rxSleep(&call->rq);
239 #endif
240                 }
241
242                 call->startWait = 0;
243 #ifdef RX_ENABLE_LOCKS
244                 if (call->error) {
245                     return 0;
246                 }
247 #endif /* RX_ENABLE_LOCKS */
248             }
249         } else
250             /* assert(cp); */
251             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
252             /* yes, the following block is allowed to be the ELSE clause (or not) */
253             /* It's possible for call->nLeft to be smaller than any particular
254              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
255              * reflects the size of the buffer.  We have to keep track of the
256              * number of bytes read in the length field of the packet struct.  On
257              * the final portion of a received packet, it's almost certain that
258              * call->nLeft will be smaller than the final buffer. */
259             while (nbytes && cp) {
260                 t = MIN((int)call->curlen, nbytes);
261                 t = MIN(t, (int)call->nLeft);
262                 memcpy(buf, call->curpos, t);
263                 buf += t;
264                 nbytes -= t;
265                 call->curpos += t;
266                 call->curlen -= t;
267                 call->nLeft -= t;
268
269                 if (!call->nLeft) {
270                     /* out of packet.  Get another one. */
271                     rxi_FreePacket(cp);
272                     cp = call->currentPacket = (struct rx_packet *)0;
273                 } else if (!call->curlen) {
274                     /* need to get another struct iov */
275                     if (++call->curvec >= cp->niovecs) {
276                         /* current packet is exhausted, get ready for another */
277                         /* don't worry about curvec and stuff, they get set somewhere else */
278                         rxi_FreePacket(cp);
279                         cp = call->currentPacket = (struct rx_packet *)0;
280                         call->nLeft = 0;
281                     } else {
282                         call->curpos =
283                             (char *)cp->wirevec[call->curvec].iov_base;
284                         call->curlen = cp->wirevec[call->curvec].iov_len;
285                     }
286                 }
287             }
288         if (!nbytes) {
289             /* user buffer is full, return */
290             return requestCount;
291         }
292
293     } while (nbytes);
294
295     return requestCount;
296 }
297
298 int
299 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
300 {
301     int bytes;
302     int tcurlen;
303     int tnLeft;
304     char *tcurpos;
305     SPLVAR;
306
307     /*
308      * Free any packets from the last call to ReadvProc/WritevProc.
309      * We do not need the lock because the receiver threads only
310      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
311      * RX_CALL_IOVEC_WAIT is always cleared before returning from
312      * ReadvProc/WritevProc.
313      */
314     if (!queue_IsEmpty(&call->iovq)) {
315         rxi_FreePackets(0, &call->iovq);
316     }
317
318     /*
319      * Most common case, all of the data is in the current iovec.
320      * We do not need the lock because this is the only thread that
321      * updates the curlen, curpos, nLeft fields.
322      *
323      * We are relying on nLeft being zero unless the call is in receive mode.
324      */
325     tcurlen = call->curlen;
326     tnLeft = call->nLeft;
327     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
328         tcurpos = call->curpos;
329         memcpy(buf, tcurpos, nbytes);
330         call->curpos = tcurpos + nbytes;
331         call->curlen = tcurlen - nbytes;
332         call->nLeft = tnLeft - nbytes;
333         return nbytes;
334     }
335
336     NETPRI;
337     MUTEX_ENTER(&call->lock);
338     bytes = rxi_ReadProc(call, buf, nbytes);
339     MUTEX_EXIT(&call->lock);
340     USERPRI;
341     return bytes;
342 }
343
344 /* Optimization for unmarshalling 32 bit integers */
345 int
346 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
347 {
348     int bytes;
349     int tcurlen;
350     int tnLeft;
351     char *tcurpos;
352     SPLVAR;
353
354     /*
355      * Free any packets from the last call to ReadvProc/WritevProc.
356      * We do not need the lock because the receiver threads only
357      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
358      * RX_CALL_IOVEC_WAIT is always cleared before returning from
359      * ReadvProc/WritevProc.
360      */
361     if (!queue_IsEmpty(&call->iovq)) {
362         rxi_FreePackets(0, &call->iovq);
363     }
364
365     /*
366      * Most common case, all of the data is in the current iovec.
367      * We do not need the lock because this is the only thread that
368      * updates the curlen, curpos, nLeft fields.
369      *
370      * We are relying on nLeft being zero unless the call is in receive mode.
371      */
372     tcurlen = call->curlen;
373     tnLeft = call->nLeft;
374     if (!call->error && tcurlen > sizeof(afs_int32)
375         && tnLeft > sizeof(afs_int32)) {
376         tcurpos = call->curpos;
377         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
378             *value = *((afs_int32 *) (tcurpos));
379         } else {
380             memcpy((char *)value, tcurpos, sizeof(afs_int32));
381         }
382         call->curpos = tcurpos + sizeof(afs_int32);
383         call->curlen = tcurlen - (u_short)sizeof(afs_int32);
384         call->nLeft = tnLeft - (u_short)sizeof(afs_int32);
385         return sizeof(afs_int32);
386     }
387
388     NETPRI;
389     MUTEX_ENTER(&call->lock);
390     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
391     MUTEX_EXIT(&call->lock);
392     USERPRI;
393     return bytes;
394 }
395
396 /* rxi_FillReadVec
397  *
398  * Uses packets in the receive queue to fill in as much of the
399  * current iovec as possible. Does not block if it runs out
400  * of packets to complete the iovec. Return true if an ack packet
401  * was sent, otherwise return false */
402 int
403 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
404 {
405     int didConsume = 0;
406     int didHardAck = 0;
407     register unsigned int t;
408     struct rx_packet *rp;
409     struct rx_packet *curp;
410     struct iovec *call_iov;
411     struct iovec *cur_iov = NULL;
412
413     curp = call->currentPacket;
414     if (curp) {
415         cur_iov = &curp->wirevec[call->curvec];
416     }
417     call_iov = &call->iov[call->iovNext];
418
419     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
420         if (call->nLeft == 0) {
421             /* Get next packet */
422             if (queue_IsNotEmpty(&call->rq)) {
423                 /* Check that next packet available is next in sequence */
424                 rp = queue_First(&call->rq, rx_packet);
425                 if (rp->header.seq == call->rnext) {
426                     afs_int32 error;
427                     register struct rx_connection *conn = call->conn;
428                     queue_Remove(rp);
429
430                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
431                      * work.  It may reduce the length of the packet by up
432                      * to conn->maxTrailerSize, to reflect the length of the
433                      * data + the header. */
434                     if ((error =
435                          RXS_CheckPacket(conn->securityObject, call, rp))) {
436                         /* Used to merely shut down the call, but now we 
437                          * shut down the whole connection since this may 
438                          * indicate an attempt to hijack it */
439
440                         MUTEX_EXIT(&call->lock);
441                         rxi_ConnectionError(conn, error);
442                         MUTEX_ENTER(&conn->conn_data_lock);
443                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
444                         MUTEX_EXIT(&conn->conn_data_lock);
445                         rxi_FreePacket(rp);
446                         MUTEX_ENTER(&call->lock);
447
448                         return 1;
449                     }
450                     call->rnext++;
451                     curp = call->currentPacket = rp;
452                     call->curvec = 1;   /* 0th vec is always header */
453                     cur_iov = &curp->wirevec[1];
454                     /* begin at the beginning [ more or less ], continue 
455                      * on until the end, then stop. */
456                     call->curpos =
457                         (char *)curp->wirevec[1].iov_base +
458                         call->conn->securityHeaderSize;
459                     call->curlen =
460                         curp->wirevec[1].iov_len -
461                         call->conn->securityHeaderSize;
462
463                     /* Notice that this code works correctly if the data
464                      * size is 0 (which it may be--no reply arguments from
465                      * server, for example).  This relies heavily on the
466                      * fact that the code below immediately frees the packet
467                      * (no yields, etc.).  If it didn't, this would be a
468                      * problem because a value of zero for call->nLeft
469                      * normally means that there is no read packet */
470                     call->nLeft = curp->length;
471                     hadd32(call->bytesRcvd, curp->length);
472
473                     /* Send a hard ack for every rxi_HardAckRate+1 packets
474                      * consumed. Otherwise schedule an event to send
475                      * the hard ack later on.
476                      */
477                     call->nHardAcks++;
478                     didConsume = 1;
479                     continue;
480                 }
481             }
482             break;
483         }
484
485         /* It's possible for call->nLeft to be smaller than any particular
486          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
487          * reflects the size of the buffer.  We have to keep track of the
488          * number of bytes read in the length field of the packet struct.  On
489          * the final portion of a received packet, it's almost certain that
490          * call->nLeft will be smaller than the final buffer. */
491         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
492
493             t = MIN((int)call->curlen, call->iovNBytes);
494             t = MIN(t, (int)call->nLeft);
495             call_iov->iov_base = call->curpos;
496             call_iov->iov_len = t;
497             call_iov++;
498             call->iovNext++;
499             call->iovNBytes -= t;
500             call->curpos += t;
501             call->curlen -= t;
502             call->nLeft -= t;
503
504             if (!call->nLeft) {
505                 /* out of packet.  Get another one. */
506                 queue_Append(&call->iovq, curp);
507                 curp = call->currentPacket = (struct rx_packet *)0;
508             } else if (!call->curlen) {
509                 /* need to get another struct iov */
510                 if (++call->curvec >= curp->niovecs) {
511                     /* current packet is exhausted, get ready for another */
512                     /* don't worry about curvec and stuff, they get set somewhere else */
513                     queue_Append(&call->iovq, curp);
514                     curp = call->currentPacket = (struct rx_packet *)0;
515                     call->nLeft = 0;
516                 } else {
517                     cur_iov++;
518                     call->curpos = (char *)cur_iov->iov_base;
519                     call->curlen = cur_iov->iov_len;
520                 }
521             }
522         }
523     }
524
525     /* If we consumed any packets then check whether we need to
526      * send a hard ack. */
527     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
528         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
529             rxevent_Cancel(call->delayedAckEvent, call,
530                            RX_CALL_REFCOUNT_DELAY);
531             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
532             didHardAck = 1;
533         } else {
534             struct clock when;
535             clock_GetTime(&when);
536             /* Delay to consolidate ack packets */
537             clock_Add(&when, &rx_hardAckDelay);
538             if (!call->delayedAckEvent
539                 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
540                 rxevent_Cancel(call->delayedAckEvent, call,
541                                RX_CALL_REFCOUNT_DELAY);
542                 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
543                 call->delayedAckEvent =
544                     rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
545             }
546         }
547     }
548     return didHardAck;
549 }
550
551
552 /* rxi_ReadvProc -- internal version.
553  *
554  * Fills in an iovec with pointers to the packet buffers. All packets
555  * except the last packet (new current packet) are moved to the iovq
556  * while the application is processing the data.
557  *
558  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
559  */
560 int
561 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
562               int nbytes)
563 {
564     struct rx_packet *rp;
565     int requestCount;
566     int nextio;
567
568     requestCount = nbytes;
569     nextio = 0;
570
571     /* Free any packets from the last call to ReadvProc/WritevProc */
572     if (queue_IsNotEmpty(&call->iovq)) {
573         rxi_FreePackets(0, &call->iovq);
574     }
575
576     if (call->mode == RX_MODE_SENDING) {
577         rxi_FlushWrite(call);
578     }
579
580     if (call->error) {
581         return 0;
582     }
583
584     /* Get whatever data is currently available in the receive queue.
585      * If rxi_FillReadVec sends an ack packet then it is possible
586      * that we will receive more data while we drop the call lock
587      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
588      * here to avoid a race with the receive thread if we send
589      * hard acks in rxi_FillReadVec. */
590     call->flags |= RX_CALL_IOVEC_WAIT;
591     call->iovNBytes = nbytes;
592     call->iovMax = maxio;
593     call->iovNext = 0;
594     call->iov = iov;
595     rxi_FillReadVec(call, 0);
596
597     /* if we need more data then sleep until the receive thread has
598      * filled in the rest. */
599     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
600         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
601         call->flags |= RX_CALL_READER_WAIT;
602         clock_NewTime();
603         call->startWait = clock_Sec();
604         while (call->flags & RX_CALL_READER_WAIT) {
605 #ifdef  RX_ENABLE_LOCKS
606             CV_WAIT(&call->cv_rq, &call->lock);
607 #else
608             osi_rxSleep(&call->rq);
609 #endif
610         }
611         call->startWait = 0;
612     }
613     call->flags &= ~RX_CALL_IOVEC_WAIT;
614 #ifdef RX_ENABLE_LOCKS
615     if (call->error) {
616         return 0;
617     }
618 #endif /* RX_ENABLE_LOCKS */
619
620     call->iov = NULL;
621     *nio = call->iovNext;
622     return nbytes - call->iovNBytes;
623 }
624
625 int
626 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
627              int nbytes)
628 {
629     int bytes;
630     SPLVAR;
631
632     NETPRI;
633     MUTEX_ENTER(&call->lock);
634     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
635     MUTEX_EXIT(&call->lock);
636     USERPRI;
637     return bytes;
638 }
639
640 /* rxi_WriteProc -- internal version.
641  *
642  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
643
644 int
645 rxi_WriteProc(register struct rx_call *call, register char *buf,
646               register int nbytes)
647 {
648     struct rx_connection *conn = call->conn;
649     register struct rx_packet *cp = call->currentPacket;
650     register unsigned int t;
651     int requestCount = nbytes;
652
653     /* Free any packets from the last call to ReadvProc/WritevProc */
654     if (queue_IsNotEmpty(&call->iovq)) {
655         rxi_FreePackets(0, &call->iovq);
656     }
657
658     if (call->mode != RX_MODE_SENDING) {
659         if ((conn->type == RX_SERVER_CONNECTION)
660             && (call->mode == RX_MODE_RECEIVING)) {
661             call->mode = RX_MODE_SENDING;
662             if (cp) {
663                 rxi_FreePacket(cp);
664                 cp = call->currentPacket = (struct rx_packet *)0;
665                 call->nLeft = 0;
666                 call->nFree = 0;
667             }
668         } else {
669             return 0;
670         }
671     }
672
673     /* Loop condition is checked at end, so that a write of 0 bytes
674      * will force a packet to be created--specially for the case where
675      * there are 0 bytes on the stream, but we must send a packet
676      * anyway. */
677     do {
678         if (call->nFree == 0) {
679             if (!call->error && cp) {
680 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
681                 /* Wait until TQ_BUSY is reset before adding any
682                  * packets to the transmit queue
683                  */
684                 while (call->flags & RX_CALL_TQ_BUSY) {
685                     call->flags |= RX_CALL_TQ_WAIT;
686 #ifdef RX_ENABLE_LOCKS
687                     CV_WAIT(&call->cv_tq, &call->lock);
688 #else /* RX_ENABLE_LOCKS */
689                     osi_rxSleep(&call->tq);
690 #endif /* RX_ENABLE_LOCKS */
691                 }
692 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
693                 clock_NewTime();        /* Bogus:  need new time package */
694                 /* The 0, below, specifies that it is not the last packet: 
695                  * there will be others. PrepareSendPacket may
696                  * alter the packet length by up to
697                  * conn->securityMaxTrailerSize */
698                 hadd32(call->bytesSent, cp->length);
699                 rxi_PrepareSendPacket(call, cp, 0);
700                 queue_Append(&call->tq, cp);
701                 cp = call->currentPacket = NULL;
702                 if (!
703                     (call->
704                      flags & (RX_CALL_FAST_RECOVER |
705                               RX_CALL_FAST_RECOVER_WAIT))) {
706                     rxi_Start(0, call, 0, 0);
707                 }
708             }
709             /* Wait for transmit window to open up */
710             while (!call->error
711                    && call->tnext + 1 > call->tfirst + call->twind) {
712                 clock_NewTime();
713                 call->startWait = clock_Sec();
714
715 #ifdef  RX_ENABLE_LOCKS
716                 CV_WAIT(&call->cv_twind, &call->lock);
717 #else
718                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
719                 osi_rxSleep(&call->twind);
720 #endif
721
722                 call->startWait = 0;
723 #ifdef RX_ENABLE_LOCKS
724                 if (call->error) {
725                     return 0;
726                 }
727 #endif /* RX_ENABLE_LOCKS */
728             }
729             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
730                 call->currentPacket = cp;
731                 call->nFree = cp->length;
732                 call->curvec = 1;       /* 0th vec is always header */
733                 /* begin at the beginning [ more or less ], continue 
734                  * on until the end, then stop. */
735                 call->curpos =
736                     (char *)cp->wirevec[1].iov_base +
737                     call->conn->securityHeaderSize;
738                 call->curlen =
739                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
740             }
741             if (call->error) {
742                 if (cp) {
743                     rxi_FreePacket(cp);
744                     call->currentPacket = NULL;
745                 }
746                 return 0;
747             }
748         }
749
750         if (cp && (int)call->nFree < nbytes) {
751             /* Try to extend the current buffer */
752             register int len, mud;
753             len = cp->length;
754             mud = rx_MaxUserDataSize(call);
755             if (mud > len) {
756                 int want;
757                 want = MIN(nbytes - (int)call->nFree, mud - len);
758                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
759                 if (cp->length > (unsigned)mud)
760                     cp->length = mud;
761                 call->nFree += (cp->length - len);
762             }
763         }
764
765         /* If the remaining bytes fit in the buffer, then store them
766          * and return.  Don't ship a buffer that's full immediately to
767          * the peer--we don't know if it's the last buffer yet */
768
769         if (!cp) {
770             call->nFree = 0;
771         }
772
773         while (nbytes && call->nFree) {
774
775             t = MIN((int)call->curlen, nbytes);
776             t = MIN((int)call->nFree, t);
777             memcpy(call->curpos, buf, t);
778             buf += t;
779             nbytes -= t;
780             call->curpos += t;
781             call->curlen -= t;
782             call->nFree -= t;
783
784             if (!call->curlen) {
785                 /* need to get another struct iov */
786                 if (++call->curvec >= cp->niovecs) {
787                     /* current packet is full, extend or send it */
788                     call->nFree = 0;
789                 } else {
790                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
791                     call->curlen = cp->wirevec[call->curvec].iov_len;
792                 }
793             }
794         }                       /* while bytes to send and room to send them */
795
796         /* might be out of space now */
797         if (!nbytes) {
798             return requestCount;
799         } else;                 /* more data to send, so get another packet and keep going */
800     } while (nbytes);
801
802     return requestCount - nbytes;
803 }
804
805 int
806 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
807 {
808     int bytes;
809     int tcurlen;
810     int tnFree;
811     char *tcurpos;
812     SPLVAR;
813
814     /*
815      * Free any packets from the last call to ReadvProc/WritevProc.
816      * We do not need the lock because the receiver threads only
817      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
818      * RX_CALL_IOVEC_WAIT is always cleared before returning from
819      * ReadvProc/WritevProc.
820      */
821     if (queue_IsNotEmpty(&call->iovq)) {
822         rxi_FreePackets(0, &call->iovq);
823     }
824
825     /*
826      * Most common case: all of the data fits in the current iovec.
827      * We do not need the lock because this is the only thread that
828      * updates the curlen, curpos, nFree fields.
829      *
830      * We are relying on nFree being zero unless the call is in send mode.
831      */
832     tcurlen = (int)call->curlen;
833     tnFree = (int)call->nFree;
834     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
835         tcurpos = call->curpos;
836         memcpy(tcurpos, buf, nbytes);
837         call->curpos = tcurpos + nbytes;
838         call->curlen = tcurlen - nbytes;
839         call->nFree = tnFree - nbytes;
840         return nbytes;
841     }
842
843     NETPRI;
844     MUTEX_ENTER(&call->lock);
845     bytes = rxi_WriteProc(call, buf, nbytes);
846     MUTEX_EXIT(&call->lock);
847     USERPRI;
848     return bytes;
849 }
850
851 /* Optimization for marshalling 32 bit arguments */
852 int
853 rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
854 {
855     int bytes;
856     int tcurlen;
857     int tnFree;
858     char *tcurpos;
859     SPLVAR;
860
861     /*
862      * Free any packets from the last call to ReadvProc/WritevProc.
863      * We do not need the lock because the receiver threads only
864      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
865      * RX_CALL_IOVEC_WAIT is always cleared before returning from
866      * ReadvProc/WritevProc.
867      */
868     if (queue_IsNotEmpty(&call->iovq)) {
869         rxi_FreePackets(0, &call->iovq);
870     }
871
872     /*
873      * Most common case: all of the data fits in the current iovec.
874      * We do not need the lock because this is the only thread that
875      * updates the curlen, curpos, nFree fields.
876      *
877      * We are relying on nFree being zero unless the call is in send mode.
878      */
879     tcurlen = (int)call->curlen;
880     tnFree = (int)call->nFree;
881     if (!call->error && tcurlen >= sizeof(afs_int32)
882         && tnFree >= sizeof(afs_int32)) {
883         tcurpos = call->curpos;
884         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
885             *((afs_int32 *) (tcurpos)) = *value;
886         } else {
887             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
888         }
889         call->curpos = tcurpos + sizeof(afs_int32);
890         call->curlen = tcurlen - (u_short)sizeof(afs_int32);
891         call->nFree = tnFree - (u_short)sizeof(afs_int32);
892         return sizeof(afs_int32);
893     }
894
895     NETPRI;
896     MUTEX_ENTER(&call->lock);
897     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
898     MUTEX_EXIT(&call->lock);
899     USERPRI;
900     return bytes;
901 }
902
903 /* rxi_WritevAlloc -- internal version.
904  *
905  * Fill in an iovec to point to data in packet buffers. The application
906  * calls rxi_WritevProc when the buffers are full.
907  *
908  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
909
910 int
911 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
912                 int nbytes)
913 {
914     struct rx_connection *conn = call->conn;
915     struct rx_packet *cp = call->currentPacket;
916     int requestCount;
917     int nextio;
918     /* Temporary values, real work is done in rxi_WritevProc */
919     int tnFree;
920     int tcurvec;
921     char *tcurpos;
922     int tcurlen;
923
924     requestCount = nbytes;
925     nextio = 0;
926
927     /* Free any packets from the last call to ReadvProc/WritevProc */
928     if (queue_IsNotEmpty(&call->iovq)) {
929         rxi_FreePackets(0, &call->iovq);
930     }
931
932     if (call->mode != RX_MODE_SENDING) {
933         if ((conn->type == RX_SERVER_CONNECTION)
934             && (call->mode == RX_MODE_RECEIVING)) {
935             call->mode = RX_MODE_SENDING;
936             if (cp) {
937                 rxi_FreePacket(cp);
938                 cp = call->currentPacket = (struct rx_packet *)0;
939                 call->nLeft = 0;
940                 call->nFree = 0;
941             }
942         } else {
943             return 0;
944         }
945     }
946
947     /* Set up the iovec to point to data in packet buffers. */
948     tnFree = call->nFree;
949     tcurvec = call->curvec;
950     tcurpos = call->curpos;
951     tcurlen = call->curlen;
952     do {
953         register unsigned int t;
954
955         if (tnFree == 0) {
956             /* current packet is full, allocate a new one */
957             cp = rxi_AllocSendPacket(call, nbytes);
958             if (cp == NULL) {
959                 /* out of space, return what we have */
960                 *nio = nextio;
961                 return requestCount - nbytes;
962             }
963             queue_Append(&call->iovq, cp);
964             tnFree = cp->length;
965             tcurvec = 1;
966             tcurpos =
967                 (char *)cp->wirevec[1].iov_base +
968                 call->conn->securityHeaderSize;
969             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
970         }
971
972         if (tnFree < nbytes) {
973             /* try to extend the current packet */
974             register int len, mud;
975             len = cp->length;
976             mud = rx_MaxUserDataSize(call);
977             if (mud > len) {
978                 int want;
979                 want = MIN(nbytes - tnFree, mud - len);
980                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
981                 if (cp->length > (unsigned)mud)
982                     cp->length = mud;
983                 tnFree += (cp->length - len);
984                 if (cp == call->currentPacket) {
985                     call->nFree += (cp->length - len);
986                 }
987             }
988         }
989
990         /* fill in the next entry in the iovec */
991         t = MIN(tcurlen, nbytes);
992         t = MIN(tnFree, t);
993         iov[nextio].iov_base = tcurpos;
994         iov[nextio].iov_len = t;
995         nbytes -= t;
996         tcurpos += t;
997         tcurlen -= t;
998         tnFree -= t;
999         nextio++;
1000
1001         if (!tcurlen) {
1002             /* need to get another struct iov */
1003             if (++tcurvec >= cp->niovecs) {
1004                 /* current packet is full, extend it or move on to next packet */
1005                 tnFree = 0;
1006             } else {
1007                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1008                 tcurlen = cp->wirevec[tcurvec].iov_len;
1009             }
1010         }
1011     } while (nbytes && nextio < maxio);
1012     *nio = nextio;
1013     return requestCount - nbytes;
1014 }
1015
1016 int
1017 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1018                int nbytes)
1019 {
1020     int bytes;
1021     SPLVAR;
1022
1023     NETPRI;
1024     MUTEX_ENTER(&call->lock);
1025     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1026     MUTEX_EXIT(&call->lock);
1027     USERPRI;
1028     return bytes;
1029 }
1030
1031 /* rxi_WritevProc -- internal version.
1032  *
1033  * Send buffers allocated in rxi_WritevAlloc.
1034  *
1035  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1036
1037 int
1038 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1039 {
1040     struct rx_packet *cp = call->currentPacket;
1041     int nextio;
1042     int requestCount;
1043     struct rx_queue tmpq;
1044
1045     requestCount = nbytes;
1046     nextio = 0;
1047
1048     if (call->mode != RX_MODE_SENDING) {
1049         call->error = RX_PROTOCOL_ERROR;
1050     }
1051 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1052     /* Wait until TQ_BUSY is reset before trying to move any
1053      * packets to the transmit queue.  */
1054     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1055         call->flags |= RX_CALL_TQ_WAIT;
1056 #ifdef RX_ENABLE_LOCKS
1057         CV_WAIT(&call->cv_tq, &call->lock);
1058 #else /* RX_ENABLE_LOCKS */
1059         osi_rxSleep(&call->tq);
1060 #endif /* RX_ENABLE_LOCKS */
1061     }
1062 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1063
1064     if (call->error) {
1065         if (cp) {
1066             queue_Prepend(&call->iovq, cp);
1067             cp = call->currentPacket = NULL;
1068         }
1069         rxi_FreePackets(0, &call->iovq);
1070         return 0;
1071     }
1072
1073     /* Loop through the I/O vector adjusting packet pointers.
1074      * Place full packets back onto the iovq once they are ready
1075      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1076      * the iovec. We put the loop condition at the end to ensure that
1077      * a zero length write will push a short packet. */
1078     nextio = 0;
1079     queue_Init(&tmpq);
1080     do {
1081         if (call->nFree == 0 && cp) {
1082             clock_NewTime();    /* Bogus:  need new time package */
1083             /* The 0, below, specifies that it is not the last packet: 
1084              * there will be others. PrepareSendPacket may
1085              * alter the packet length by up to
1086              * conn->securityMaxTrailerSize */
1087             hadd32(call->bytesSent, cp->length);
1088             rxi_PrepareSendPacket(call, cp, 0);
1089             queue_Append(&tmpq, cp);
1090
1091             /* The head of the iovq is now the current packet */
1092             if (nbytes) {
1093                 if (queue_IsEmpty(&call->iovq)) {
1094                     call->error = RX_PROTOCOL_ERROR;
1095                     cp = call->currentPacket = NULL;
1096                     rxi_FreePackets(0, &tmpq);
1097                     return 0;
1098                 }
1099                 cp = queue_First(&call->iovq, rx_packet);
1100                 queue_Remove(cp);
1101                 call->currentPacket = cp;
1102                 call->nFree = cp->length;
1103                 call->curvec = 1;
1104                 call->curpos =
1105                     (char *)cp->wirevec[1].iov_base +
1106                     call->conn->securityHeaderSize;
1107                 call->curlen =
1108                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1109             }
1110         }
1111
1112         if (nbytes) {
1113             /* The next iovec should point to the current position */
1114             if (iov[nextio].iov_base != call->curpos
1115                 || iov[nextio].iov_len > (int)call->curlen) {
1116                 call->error = RX_PROTOCOL_ERROR;
1117                 if (cp) {
1118                     queue_Prepend(&tmpq, cp);
1119                     call->currentPacket = NULL;
1120                 }
1121                 rxi_FreePackets(0, &tmpq);
1122                 return 0;
1123             }
1124             nbytes -= iov[nextio].iov_len;
1125             call->curpos += iov[nextio].iov_len;
1126             call->curlen -= iov[nextio].iov_len;
1127             call->nFree -= iov[nextio].iov_len;
1128             nextio++;
1129             if (call->curlen == 0) {
1130                 if (++call->curvec > cp->niovecs) {
1131                     call->nFree = 0;
1132                 } else {
1133                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1134                     call->curlen = cp->wirevec[call->curvec].iov_len;
1135                 }
1136             }
1137         }
1138     } while (nbytes && nextio < nio);
1139
1140     /* Move the packets from the temporary queue onto the transmit queue.
1141      * We may end up with more than call->twind packets on the queue. */
1142     queue_SpliceAppend(&call->tq, &tmpq);
1143
1144     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1145         rxi_Start(0, call, 0, 0);
1146     }
1147
1148     /* Wait for the length of the transmit queue to fall below call->twind */
1149     while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
1150         clock_NewTime();
1151         call->startWait = clock_Sec();
1152 #ifdef  RX_ENABLE_LOCKS
1153         CV_WAIT(&call->cv_twind, &call->lock);
1154 #else
1155         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1156         osi_rxSleep(&call->twind);
1157 #endif
1158         call->startWait = 0;
1159     }
1160
1161     if (call->error) {
1162         if (cp) {
1163             rxi_FreePacket(cp);
1164             cp = call->currentPacket = NULL;
1165         }
1166         return 0;
1167     }
1168
1169     return requestCount - nbytes;
1170 }
1171
1172 int
1173 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1174 {
1175     int bytes;
1176     SPLVAR;
1177
1178     NETPRI;
1179     MUTEX_ENTER(&call->lock);
1180     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1181     MUTEX_EXIT(&call->lock);
1182     USERPRI;
1183     return bytes;
1184 }
1185
1186 /* Flush any buffered data to the stream, switch to read mode
1187  * (clients) or to EOF mode (servers) */
1188 void
1189 rxi_FlushWrite(register struct rx_call *call)
1190 {
1191     register struct rx_packet *cp = call->currentPacket;
1192
1193     /* Free any packets from the last call to ReadvProc/WritevProc */
1194     if (queue_IsNotEmpty(&call->iovq)) {
1195         rxi_FreePackets(0, &call->iovq);
1196     }
1197
1198     if (call->mode == RX_MODE_SENDING) {
1199
1200         call->mode =
1201             (call->conn->type ==
1202              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1203
1204 #ifdef RX_KERNEL_TRACE
1205         {
1206             int glockOwner = ISAFS_GLOCK();
1207             if (!glockOwner)
1208                 AFS_GLOCK();
1209             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1210                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1211                        call);
1212             if (!glockOwner)
1213                 AFS_GUNLOCK();
1214         }
1215 #endif
1216
1217 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1218         /* Wait until TQ_BUSY is reset before adding any
1219          * packets to the transmit queue
1220          */
1221         while (call->flags & RX_CALL_TQ_BUSY) {
1222             call->flags |= RX_CALL_TQ_WAIT;
1223 #ifdef RX_ENABLE_LOCKS
1224             CV_WAIT(&call->cv_tq, &call->lock);
1225 #else /* RX_ENABLE_LOCKS */
1226             osi_rxSleep(&call->tq);
1227 #endif /* RX_ENABLE_LOCKS */
1228         }
1229 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1230
1231         if (cp) {
1232             /* cp->length is only supposed to be the user's data */
1233             /* cp->length was already set to (then-current) 
1234              * MaxUserDataSize or less. */
1235             cp->length -= call->nFree;
1236             call->currentPacket = (struct rx_packet *)0;
1237             call->nFree = 0;
1238         } else {
1239             cp = rxi_AllocSendPacket(call, 0);
1240             if (!cp) {
1241                 /* Mode can no longer be MODE_SENDING */
1242                 return;
1243             }
1244             cp->length = 0;
1245             cp->niovecs = 2;    /* header + space for rxkad stuff */
1246             call->nFree = 0;
1247         }
1248
1249         /* The 1 specifies that this is the last packet */
1250         hadd32(call->bytesSent, cp->length);
1251         rxi_PrepareSendPacket(call, cp, 1);
1252         queue_Append(&call->tq, cp);
1253         if (!
1254             (call->
1255              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1256             rxi_Start(0, call, 0, 0);
1257         }
1258     }
1259 }
1260
1261 /* Flush any buffered data to the stream, switch to read mode
1262  * (clients) or to EOF mode (servers) */
1263 void
1264 rx_FlushWrite(struct rx_call *call)
1265 {
1266     SPLVAR;
1267     NETPRI;
1268     MUTEX_ENTER(&call->lock);
1269     rxi_FlushWrite(call);
1270     MUTEX_EXIT(&call->lock);
1271     USERPRI;
1272 }