openbsd-20021105
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID("$Header$");
18
19 #ifdef KERNEL
20 #ifndef UKERNEL
21 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
22 #include "afs/sysincludes.h"
23 #else
24 #include "h/types.h"
25 #include "h/time.h"
26 #include "h/stat.h"
27 #ifdef  AFS_OSF_ENV
28 #include <net/net_globals.h>
29 #endif  /* AFS_OSF_ENV */
30 #ifdef AFS_LINUX20_ENV
31 #include "h/socket.h"
32 #endif
33 #include "netinet/in.h"
34 #if defined(AFS_SGI_ENV)
35 #include "afs/sysincludes.h"
36 #endif
37 #endif
38 #include "afs/afs_args.h"
39 #include "afs/afs_osi.h"
40 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
41 #include "h/systm.h"
42 #endif
43 #else /* !UKERNEL */
44 #include "afs/sysincludes.h"
45 #endif /* !UKERNEL */
46 #ifdef RXDEBUG
47 #undef RXDEBUG      /* turn off debugging */
48 #endif /* RXDEBUG */
49
50 #include "rx_kmutex.h"
51 #include "rx/rx_kernel.h"
52 #include "rx/rx_clock.h"
53 #include "rx/rx_queue.h"
54 #include "rx/rx.h"
55 #include "rx/rx_globals.h"
56 #include "afs/lock.h"
57 #include "afsint.h"
58 #ifdef  AFS_ALPHA_ENV
59 #undef kmem_alloc
60 #undef kmem_free
61 #undef mem_alloc
62 #undef mem_free
63 #undef register
64 #endif  /* AFS_ALPHA_ENV */
65 #else /* KERNEL */
66 # include <sys/types.h>
67 #ifndef AFS_NT40_ENV
68 # include <sys/socket.h>
69 # include <sys/file.h>
70 # include <netdb.h>
71 # include <netinet/in.h>
72 # include <sys/stat.h>
73 # include <sys/time.h>
74 #endif
75 #ifdef HAVE_STRING_H
76 #include <string.h>
77 #else
78 #ifdef HAVE_STRINGS_H
79 #include <strings.h>
80 #endif
81 #endif
82 #ifdef HAVE_UNISTD_H
83 #include <unistd.h>
84 #endif
85 # include "rx_user.h"
86 # include "rx_clock.h"
87 # include "rx_queue.h"
88 # include "rx.h"
89 # include "rx_globals.h"
90 #endif /* KERNEL */
91
92 #ifdef RX_LOCKS_DB
93 /* rxdb_fileID is used to identify the lock location, along with line#. */
94 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
95 #endif /* RX_LOCKS_DB */
96 /* rxi_ReadProc -- internal version.
97  *
98  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
99  */
100 int rxi_ReadProc(register struct rx_call *call, register char *buf, 
101         register int nbytes)
102 {
103     register struct rx_packet *cp = call->currentPacket;
104     register struct rx_packet *rp;
105     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
106     register int requestCount;
107     register unsigned int t;
108 /* XXXX took out clock_NewTime from here.  Was it needed? */
109     requestCount = nbytes;
110
111     /* Free any packets from the last call to ReadvProc/WritevProc */
112     if (!queue_IsEmpty(&call->iovq)) {
113         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
114             queue_Remove(rp);
115             rxi_FreePacket(rp);
116         }
117     }
118
119     do {
120       if (call->nLeft == 0) {
121         /* Get next packet */
122         for (;;) {
123           if (call->error || (call->mode != RX_MODE_RECEIVING)) {
124             if (call->error) {
125               return 0;
126             }
127             if (call->mode == RX_MODE_SENDING) {
128               rxi_FlushWrite(call);
129               continue;
130             }
131           }
132           if (queue_IsNotEmpty(&call->rq)) {
133             /* Check that next packet available is next in sequence */
134             rp = queue_First(&call->rq, rx_packet);
135             if (rp->header.seq == call->rnext) {
136               afs_int32 error;
137               register struct rx_connection *conn = call->conn;
138               queue_Remove(rp);
139               
140               /* RXS_CheckPacket called to undo RXS_PreparePacket's
141                * work.  It may reduce the length of the packet by up
142                * to conn->maxTrailerSize, to reflect the length of the
143                * data + the header. */
144               if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
145                 /* Used to merely shut down the call, but now we 
146                  * shut down the whole connection since this may 
147                  * indicate an attempt to hijack it */
148
149                 MUTEX_EXIT(&call->lock);
150                 rxi_ConnectionError(conn, error);
151                 MUTEX_ENTER(&conn->conn_data_lock);
152                 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
153                 MUTEX_EXIT(&conn->conn_data_lock);
154                 rxi_FreePacket(rp);
155                 MUTEX_ENTER(&call->lock);
156
157                 return 0;
158               }
159               call->rnext++;
160               cp = call->currentPacket = rp;
161               call->curvec = 1;   /* 0th vec is always header */
162               /* begin at the beginning [ more or less ], continue 
163                * on until the end, then stop. */
164               call->curpos = (char *)cp->wirevec[1].iov_base
165                              + call->conn->securityHeaderSize; 
166               call->curlen = cp->wirevec[1].iov_len
167                              - call->conn->securityHeaderSize; 
168               
169               /* Notice that this code works correctly if the data
170                * size is 0 (which it may be--no reply arguments from
171                * server, for example).  This relies heavily on the
172                * fact that the code below immediately frees the packet
173                * (no yields, etc.).  If it didn't, this would be a
174                * problem because a value of zero for call->nLeft
175                * normally means that there is no read packet */
176               call->nLeft = cp->length;
177               hadd32(call->bytesRcvd, cp->length);
178               
179               /* Send a hard ack for every rxi_HardAckRate+1 packets
180                * consumed. Otherwise schedule an event to send
181                * the hard ack later on.
182                */
183               call->nHardAcks++;
184               if (!(call->flags &RX_CALL_RECEIVE_DONE)) {
185                 if (call->nHardAcks > (u_short)rxi_HardAckRate) {
186                   rxevent_Cancel(call->delayedAckEvent, call,
187                                  RX_CALL_REFCOUNT_DELAY);
188                   rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
189                 }
190                 else {
191                   struct clock when;
192                   clock_GetTime(&when);
193                   /* Delay to consolidate ack packets */
194                   clock_Add(&when, &rx_hardAckDelay);
195                   if (!call->delayedAckEvent ||
196                       clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
197                       rxevent_Cancel(call->delayedAckEvent, call,
198                                      RX_CALL_REFCOUNT_DELAY);
199                       CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
200                       call->delayedAckEvent = rxevent_Post(&when,
201                                                            rxi_SendDelayedAck,
202                                                            call, 0);
203                   }
204                 }
205               }
206               break;
207             }
208           }
209
210 /*
211 MTUXXX  doesn't there need to be an "else" here ??? 
212 */
213           /* Are there ever going to be any more packets? */
214           if (call->flags & RX_CALL_RECEIVE_DONE) {
215             return requestCount - nbytes;
216           }
217           /* Wait for in-sequence packet */
218           call->flags |= RX_CALL_READER_WAIT;
219           clock_NewTime();
220           call->startWait = clock_Sec();
221           while (call->flags & RX_CALL_READER_WAIT) {
222 #ifdef  RX_ENABLE_LOCKS
223               CV_WAIT(&call->cv_rq, &call->lock);
224 #else
225               osi_rxSleep(&call->rq);
226 #endif
227           }
228
229           call->startWait = 0;
230 #ifdef RX_ENABLE_LOCKS
231           if (call->error) {
232               return 0;
233           }
234 #endif /* RX_ENABLE_LOCKS */
235         }
236       }
237       else /* assert(cp); */  /* MTUXXX  this should be replaced by some error-recovery code before shipping */
238 /* yes, the following block is allowed to be the ELSE clause (or not) */
239
240         /* It's possible for call->nLeft to be smaller than any particular
241          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
242          * reflects the size of the buffer.  We have to keep track of the
243          * number of bytes read in the length field of the packet struct.  On
244          * the final portion of a received packet, it's almost certain that
245          * call->nLeft will be smaller than the final buffer. */
246
247         while (nbytes && cp) {
248           t = MIN((int)call->curlen, nbytes);
249           t = MIN(t, (int)call->nLeft);
250           memcpy(buf, call->curpos, t);
251           buf += t;
252           nbytes -= t;
253           call->curpos += t;
254           call->curlen -= t;
255           call->nLeft -= t;
256
257           if (!call->nLeft) {
258             /* out of packet.  Get another one. */
259             rxi_FreePacket(cp);
260             cp = call->currentPacket = (struct rx_packet *)0;
261           }
262           else if (!call->curlen) {
263             /* need to get another struct iov */
264             if (++call->curvec >= cp->niovecs) {
265               /* current packet is exhausted, get ready for another */
266               /* don't worry about curvec and stuff, they get set somewhere else */
267               rxi_FreePacket(cp);
268               cp = call->currentPacket = (struct rx_packet *)0;
269               call->nLeft = 0;
270             }
271             else {
272               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
273               call->curlen = cp->wirevec[call->curvec].iov_len;
274             }
275           }  
276         }
277         if (!nbytes) {
278           /* user buffer is full, return */
279           return requestCount;
280         }
281
282     } while (nbytes);
283
284     return requestCount;
285 }
286
287 int rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
288 {
289     int bytes;
290     int tcurlen;
291     int tnLeft;
292     char *tcurpos;
293     SPLVAR;
294
295     /*
296      * Free any packets from the last call to ReadvProc/WritevProc.
297      * We do not need the lock because the receiver threads only
298      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
299      * RX_CALL_IOVEC_WAIT is always cleared before returning from
300      * ReadvProc/WritevProc.
301      */
302     if (!queue_IsEmpty(&call->iovq)) {
303         register struct rx_packet *rp; 
304         register struct rx_packet *nxp;
305         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
306             queue_Remove(rp);
307             rxi_FreePacket(rp);
308         }
309     }
310
311     /*
312      * Most common case, all of the data is in the current iovec.
313      * We do not need the lock because this is the only thread that
314      * updates the curlen, curpos, nLeft fields.
315      *
316      * We are relying on nLeft being zero unless the call is in receive mode.
317      */
318     tcurlen = call->curlen;
319     tnLeft = call->nLeft;
320     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
321         tcurpos = call->curpos;
322         memcpy(buf, tcurpos, nbytes);
323         call->curpos = tcurpos + nbytes;
324         call->curlen = tcurlen - nbytes;
325         call->nLeft = tnLeft - nbytes;
326         return nbytes;
327     }
328
329     NETPRI;
330     AFS_RXGLOCK();
331     MUTEX_ENTER(&call->lock);
332     bytes = rxi_ReadProc(call, buf, nbytes);
333     MUTEX_EXIT(&call->lock);
334     AFS_RXGUNLOCK();
335     USERPRI;
336     return bytes;
337 }
338
339 /* Optimization for unmarshalling 32 bit integers */
340 int rx_ReadProc32(struct rx_call *call, afs_int32 *value)
341 {
342     int bytes;
343     int tcurlen;
344     int tnLeft;
345     char *tcurpos;
346     SPLVAR;
347
348     /*
349      * Free any packets from the last call to ReadvProc/WritevProc.
350      * We do not need the lock because the receiver threads only
351      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
352      * RX_CALL_IOVEC_WAIT is always cleared before returning from
353      * ReadvProc/WritevProc.
354      */
355     if (!queue_IsEmpty(&call->iovq)) {
356         register struct rx_packet *rp; 
357         register struct rx_packet *nxp;
358         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
359             queue_Remove(rp);
360             rxi_FreePacket(rp);
361         }
362     }
363
364     /*
365      * Most common case, all of the data is in the current iovec.
366      * We do not need the lock because this is the only thread that
367      * updates the curlen, curpos, nLeft fields.
368      *
369      * We are relying on nLeft being zero unless the call is in receive mode.
370      */
371     tcurlen = call->curlen;
372     tnLeft = call->nLeft;
373     if (!call->error && tcurlen > sizeof(afs_int32) && tnLeft > sizeof(afs_int32)) {
374         tcurpos = call->curpos;
375         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
376             *value = *((afs_int32 *)(tcurpos));
377         } else {
378             memcpy((char *)value, tcurpos, sizeof(afs_int32));
379         }
380         call->curpos = tcurpos + sizeof(afs_int32);
381         call->curlen = tcurlen - sizeof(afs_int32);
382         call->nLeft = tnLeft - sizeof(afs_int32);
383         return sizeof(afs_int32);
384     }
385
386     NETPRI;
387     AFS_RXGLOCK();
388     MUTEX_ENTER(&call->lock);
389     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
390     MUTEX_EXIT(&call->lock);
391     AFS_RXGUNLOCK();
392     USERPRI;
393     return bytes;
394 }
395
396 /* rxi_FillReadVec
397  *
398  * Uses packets in the receive queue to fill in as much of the
399  * current iovec as possible. Does not block if it runs out
400  * of packets to complete the iovec. Return true if an ack packet
401  * was sent, otherwise return false */
402 int rxi_FillReadVec(struct rx_call *call, afs_uint32 seq, 
403         afs_uint32 serial, afs_uint32 flags)
404 {
405     int didConsume = 0;
406     int didHardAck = 0;
407     register unsigned int t;
408     struct rx_packet *rp;
409     struct rx_packet *curp;
410     struct iovec *call_iov;
411     struct iovec *cur_iov = NULL;
412
413     curp = call->currentPacket;
414     if (curp) {
415         cur_iov = &curp->wirevec[call->curvec];
416     }
417     call_iov = &call->iov[call->iovNext];
418
419     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
420       if (call->nLeft == 0) {
421         /* Get next packet */
422         if (queue_IsNotEmpty(&call->rq)) {
423           /* Check that next packet available is next in sequence */
424           rp = queue_First(&call->rq, rx_packet);
425           if (rp->header.seq == call->rnext) {
426             afs_int32 error;
427             register struct rx_connection *conn = call->conn;
428             queue_Remove(rp);
429               
430             /* RXS_CheckPacket called to undo RXS_PreparePacket's
431              * work.  It may reduce the length of the packet by up
432              * to conn->maxTrailerSize, to reflect the length of the
433              * data + the header. */
434             if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
435               /* Used to merely shut down the call, but now we 
436                * shut down the whole connection since this may 
437                * indicate an attempt to hijack it */
438
439               MUTEX_EXIT(&call->lock);
440               rxi_ConnectionError(conn, error);
441               MUTEX_ENTER(&conn->conn_data_lock);
442               rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
443               MUTEX_EXIT(&conn->conn_data_lock);
444               rxi_FreePacket(rp);
445               MUTEX_ENTER(&call->lock);
446
447               return 1;
448             }
449             call->rnext++;
450             curp = call->currentPacket = rp;
451             call->curvec = 1;   /* 0th vec is always header */
452             cur_iov = &curp->wirevec[1];
453             /* begin at the beginning [ more or less ], continue 
454              * on until the end, then stop. */
455             call->curpos = (char *)curp->wirevec[1].iov_base
456                            + call->conn->securityHeaderSize; 
457             call->curlen = curp->wirevec[1].iov_len
458                            - call->conn->securityHeaderSize; 
459               
460             /* Notice that this code works correctly if the data
461              * size is 0 (which it may be--no reply arguments from
462              * server, for example).  This relies heavily on the
463              * fact that the code below immediately frees the packet
464              * (no yields, etc.).  If it didn't, this would be a
465              * problem because a value of zero for call->nLeft
466              * normally means that there is no read packet */
467             call->nLeft = curp->length;
468             hadd32(call->bytesRcvd, curp->length);
469               
470             /* Send a hard ack for every rxi_HardAckRate+1 packets
471              * consumed. Otherwise schedule an event to send
472              * the hard ack later on.
473              */
474             call->nHardAcks++;
475             didConsume = 1;
476             continue;
477           }
478         }
479         break;
480       }
481
482       /* It's possible for call->nLeft to be smaller than any particular
483        * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
484        * reflects the size of the buffer.  We have to keep track of the
485        * number of bytes read in the length field of the packet struct.  On
486        * the final portion of a received packet, it's almost certain that
487        * call->nLeft will be smaller than the final buffer. */
488       while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
489         
490         t = MIN((int)call->curlen, call->iovNBytes);
491         t = MIN(t, (int)call->nLeft);
492         call_iov->iov_base = call->curpos;
493         call_iov->iov_len = t;
494         call_iov++;
495         call->iovNext++;
496         call->iovNBytes -= t;
497         call->curpos += t;
498         call->curlen -= t;
499         call->nLeft -= t;
500
501         if (!call->nLeft) {
502           /* out of packet.  Get another one. */
503           queue_Append(&call->iovq, curp);
504           curp = call->currentPacket = (struct rx_packet *)0;
505         }
506         else if (!call->curlen) {
507           /* need to get another struct iov */
508           if (++call->curvec >= curp->niovecs) {
509             /* current packet is exhausted, get ready for another */
510             /* don't worry about curvec and stuff, they get set somewhere else */
511             queue_Append(&call->iovq, curp);
512             curp = call->currentPacket = (struct rx_packet *)0;
513             call->nLeft = 0;
514           }
515           else {
516             cur_iov++;
517             call->curpos = (char *)cur_iov->iov_base;
518             call->curlen = cur_iov->iov_len;
519           }
520         }  
521       }  
522     }
523
524     /* If we consumed any packets then check whether we need to
525      * send a hard ack. */
526     if (didConsume && (!(call->flags &RX_CALL_RECEIVE_DONE))) {
527       if (call->nHardAcks > (u_short)rxi_HardAckRate) {
528         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
529         rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_DELAY, 0);
530         didHardAck = 1;
531       }
532       else {
533         struct clock when;
534         clock_GetTime(&when);
535         /* Delay to consolidate ack packets */
536         clock_Add(&when, &rx_hardAckDelay);
537         if (!call->delayedAckEvent ||
538           clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
539           rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
540           CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
541           call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
542                                                call, 0);
543         }
544       }
545     }
546     return didHardAck;
547 }
548
549
550 /* rxi_ReadvProc -- internal version.
551  *
552  * Fills in an iovec with pointers to the packet buffers. All packets
553  * except the last packet (new current packet) are moved to the iovq
554  * while the application is processing the data.
555  *
556  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
557  */
558 int rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, 
559         int maxio, int nbytes)
560 {
561     struct rx_packet *rp; 
562     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
563     int requestCount;
564     int nextio;
565
566     requestCount = nbytes;
567     nextio = 0;
568
569     /* Free any packets from the last call to ReadvProc/WritevProc */
570     for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
571         queue_Remove(rp);
572         rxi_FreePacket(rp);
573     }
574
575     if (call->mode == RX_MODE_SENDING) {
576         rxi_FlushWrite(call);
577     }
578
579     if (call->error) {
580         return 0;
581     }
582
583     /* Get whatever data is currently available in the receive queue.
584      * If rxi_FillReadVec sends an ack packet then it is possible
585      * that we will receive more data while we drop the call lock
586      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
587      * here to avoid a race with the receive thread if we send
588      * hard acks in rxi_FillReadVec. */
589     call->flags |= RX_CALL_IOVEC_WAIT;
590     call->iovNBytes = nbytes;
591     call->iovMax = maxio;
592     call->iovNext = 0;
593     call->iov = iov;
594     rxi_FillReadVec(call, 0, 0, 0);
595
596     /* if we need more data then sleep until the receive thread has
597      * filled in the rest. */
598     if (!call->error && call->iovNBytes &&
599         call->iovNext < call->iovMax &&
600         !(call->flags & RX_CALL_RECEIVE_DONE)) {
601       call->flags |= RX_CALL_READER_WAIT;
602       clock_NewTime();
603       call->startWait = clock_Sec();
604       while (call->flags & RX_CALL_READER_WAIT) {
605 #ifdef  RX_ENABLE_LOCKS
606         CV_WAIT(&call->cv_rq, &call->lock);
607 #else
608         osi_rxSleep(&call->rq);
609 #endif
610       }
611       call->startWait = 0;
612     }
613     call->flags &= ~RX_CALL_IOVEC_WAIT;
614 #ifdef RX_ENABLE_LOCKS
615     if (call->error) {
616       return 0;
617     }
618 #endif /* RX_ENABLE_LOCKS */
619
620     call->iov = NULL;
621     *nio = call->iovNext;
622     return nbytes - call->iovNBytes;
623 }
624
625 int rx_ReadvProc(struct rx_call *call, struct iovec *iov, 
626         int *nio, int maxio, int nbytes)
627 {
628     int bytes;
629     SPLVAR;
630
631     NETPRI;
632     AFS_RXGLOCK();
633     MUTEX_ENTER(&call->lock);
634     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
635     MUTEX_EXIT(&call->lock);
636     AFS_RXGUNLOCK();
637     USERPRI;
638     return bytes;
639 }
640
641 /* rxi_WriteProc -- internal version.
642  *
643  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
644
645 int rxi_WriteProc(register struct rx_call *call, register char *buf,    
646         register int nbytes)
647 {
648     struct rx_connection *conn = call->conn;
649     register struct rx_packet *cp = call->currentPacket;
650     register struct rx_packet *tp; /* Temporary packet pointer */
651     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
652     register unsigned int t;
653     int requestCount = nbytes;
654
655     /* Free any packets from the last call to ReadvProc/WritevProc */
656     if (!queue_IsEmpty(&call->iovq)) {
657         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
658             queue_Remove(tp);
659             rxi_FreePacket(tp);
660         }
661     }
662
663     if (call->mode != RX_MODE_SENDING) {
664         if ((conn->type == RX_SERVER_CONNECTION)
665             && (call->mode == RX_MODE_RECEIVING)) {
666             call->mode = RX_MODE_SENDING;
667             if (cp) {
668                 rxi_FreePacket(cp);
669                 cp = call->currentPacket = (struct rx_packet *) 0;
670                 call->nLeft = 0;
671                 call->nFree = 0;
672             }
673         }
674         else {
675             return 0;
676         }
677     }
678
679     /* Loop condition is checked at end, so that a write of 0 bytes
680      * will force a packet to be created--specially for the case where
681      * there are 0 bytes on the stream, but we must send a packet
682      * anyway. */
683     do {
684         if (call->nFree == 0) {
685             if (!call->error && cp) {
686 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
687                 /* Wait until TQ_BUSY is reset before adding any
688                  * packets to the transmit queue
689                  */
690                 while (call->flags & RX_CALL_TQ_BUSY) {
691                     call->flags |= RX_CALL_TQ_WAIT;
692 #ifdef RX_ENABLE_LOCKS
693                     CV_WAIT(&call->cv_tq, &call->lock);
694 #else /* RX_ENABLE_LOCKS */
695                     osi_rxSleep(&call->tq);
696 #endif /* RX_ENABLE_LOCKS */
697                 }
698 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
699                 clock_NewTime(); /* Bogus:  need new time package */
700                 /* The 0, below, specifies that it is not the last packet: 
701                  * there will be others. PrepareSendPacket may
702                  * alter the packet length by up to
703                  * conn->securityMaxTrailerSize */
704                 hadd32(call->bytesSent, cp->length);
705                 rxi_PrepareSendPacket(call, cp, 0);
706                 queue_Append(&call->tq, cp);
707                 cp = call->currentPacket = NULL;
708                 if (!(call->flags & (RX_CALL_FAST_RECOVER|
709                                      RX_CALL_FAST_RECOVER_WAIT))) {
710                     rxi_Start(0, call, 0);
711                 }
712             }
713             /* Wait for transmit window to open up */
714             while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
715                 clock_NewTime();
716                 call->startWait = clock_Sec();
717
718 #ifdef  RX_ENABLE_LOCKS
719                 CV_WAIT(&call->cv_twind, &call->lock);
720 #else
721                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
722                 osi_rxSleep(&call->twind);
723 #endif
724
725                 call->startWait = 0;
726 #ifdef RX_ENABLE_LOCKS
727                 if (call->error) {
728                     return 0;
729                 }
730 #endif /* RX_ENABLE_LOCKS */
731             }
732             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
733                 call->currentPacket = cp;
734                 call->nFree = cp->length;
735                 call->curvec = 1;   /* 0th vec is always header */
736                 /* begin at the beginning [ more or less ], continue 
737                  * on until the end, then stop. */
738                 call->curpos = (char *)cp->wirevec[1].iov_base
739                                + call->conn->securityHeaderSize; 
740                 call->curlen = cp->wirevec[1].iov_len
741                                - call->conn->securityHeaderSize; 
742             }
743             if (call->error) {
744                 if (cp) {
745                   rxi_FreePacket(cp);
746                   call->currentPacket = NULL;
747                 }
748                 return 0;
749             }
750         }
751
752         if (cp && (int)call->nFree < nbytes) {
753               /* Try to extend the current buffer */
754               register int len, mud;
755               len = cp->length;
756               mud = rx_MaxUserDataSize(call);
757               if (mud > len) {
758                 int want;
759                 want = MIN(nbytes - (int)call->nFree, mud - len);
760                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
761                 if (cp->length > (unsigned)mud)
762                     cp->length = mud;
763                 call->nFree += (cp->length - len);
764               }
765         }
766
767         /* If the remaining bytes fit in the buffer, then store them
768          * and return.  Don't ship a buffer that's full immediately to
769          * the peer--we don't know if it's the last buffer yet */
770
771         if (!cp)  {
772           call->nFree = 0;
773         }
774
775         while (nbytes && call->nFree) {
776         
777           t = MIN((int)call->curlen, nbytes);
778           t = MIN((int)call->nFree, t);
779           memcpy(call->curpos, buf, t);
780           buf += t;
781           nbytes -= t;
782           call->curpos += t;
783           call->curlen -= t;
784           call->nFree -= t;
785
786           if (!call->curlen) {
787             /* need to get another struct iov */
788             if (++call->curvec >= cp->niovecs) {
789               /* current packet is full, extend or send it */
790               call->nFree = 0;
791             } else {
792               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
793               call->curlen = cp->wirevec[call->curvec].iov_len;
794             }
795           }  
796         } /* while bytes to send and room to send them */
797
798         /* might be out of space now */
799         if (!nbytes) {
800             return requestCount;
801         }
802         else ; /* more data to send, so get another packet and keep going */
803     } while(nbytes);
804
805     return requestCount - nbytes;
806 }
807
808 int rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
809 {
810     int bytes;
811     int tcurlen;
812     int tnFree;
813     char *tcurpos;
814     SPLVAR;
815
816     /*
817      * Free any packets from the last call to ReadvProc/WritevProc.
818      * We do not need the lock because the receiver threads only
819      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
820      * RX_CALL_IOVEC_WAIT is always cleared before returning from
821      * ReadvProc/WritevProc.
822      */
823     if (!queue_IsEmpty(&call->iovq)) {
824         register struct rx_packet *rp; 
825         register struct rx_packet *nxp;
826         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
827             queue_Remove(rp);
828             rxi_FreePacket(rp);
829         }
830     }
831
832     /*
833      * Most common case: all of the data fits in the current iovec.
834      * We do not need the lock because this is the only thread that
835      * updates the curlen, curpos, nFree fields.
836      *
837      * We are relying on nFree being zero unless the call is in send mode.
838      */
839     tcurlen = (int)call->curlen;
840     tnFree = (int)call->nFree;
841     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
842         tcurpos = call->curpos;
843         memcpy(tcurpos, buf, nbytes);
844         call->curpos = tcurpos + nbytes;
845         call->curlen = tcurlen - nbytes;
846         call->nFree = tnFree - nbytes;
847         return nbytes;
848     }
849
850     NETPRI;
851     AFS_RXGLOCK();
852     MUTEX_ENTER(&call->lock);
853     bytes = rxi_WriteProc(call, buf, nbytes);
854     MUTEX_EXIT(&call->lock);
855     AFS_RXGUNLOCK();
856     USERPRI;
857     return bytes;
858 }
859
860 /* Optimization for marshalling 32 bit arguments */
861 int rx_WriteProc32(register struct rx_call *call, register afs_int32 *value)
862 {
863     int bytes;
864     int tcurlen;
865     int tnFree;
866     char *tcurpos;
867     SPLVAR;
868
869     /*
870      * Free any packets from the last call to ReadvProc/WritevProc.
871      * We do not need the lock because the receiver threads only
872      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
873      * RX_CALL_IOVEC_WAIT is always cleared before returning from
874      * ReadvProc/WritevProc.
875      */
876     if (!queue_IsEmpty(&call->iovq)) {
877         register struct rx_packet *rp; 
878         register struct rx_packet *nxp;
879         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
880             queue_Remove(rp);
881             rxi_FreePacket(rp);
882         }
883     }
884
885     /*
886      * Most common case: all of the data fits in the current iovec.
887      * We do not need the lock because this is the only thread that
888      * updates the curlen, curpos, nFree fields.
889      *
890      * We are relying on nFree being zero unless the call is in send mode.
891      */
892     tcurlen = (int)call->curlen;
893     tnFree = (int)call->nFree;
894     if (!call->error && tcurlen >= sizeof(afs_int32) && tnFree >= sizeof(afs_int32)) {
895         tcurpos = call->curpos;
896         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
897             *((afs_int32 *)(tcurpos)) = *value;
898         } else {
899             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
900         }
901         call->curpos = tcurpos + sizeof(afs_int32);
902         call->curlen = tcurlen - sizeof(afs_int32);
903         call->nFree = tnFree - sizeof(afs_int32);
904         return sizeof(afs_int32);
905     }
906
907     NETPRI;
908     AFS_RXGLOCK();
909     MUTEX_ENTER(&call->lock);
910     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
911     MUTEX_EXIT(&call->lock);
912     AFS_RXGUNLOCK();
913     USERPRI;
914     return bytes;
915 }
916
917 /* rxi_WritevAlloc -- internal version.
918  *
919  * Fill in an iovec to point to data in packet buffers. The application
920  * calls rxi_WritevProc when the buffers are full.
921  *
922  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
923
924 int rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, 
925         int *nio, int maxio, int nbytes)
926 {
927     struct rx_connection *conn = call->conn;
928     struct rx_packet *cp = call->currentPacket;
929     struct rx_packet *tp; /* temporary packet pointer */
930     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
931     int requestCount;
932     int nextio;
933     /* Temporary values, real work is done in rxi_WritevProc */
934     int tnFree; 
935     int tcurvec;
936     char * tcurpos;
937     int tcurlen;
938
939     requestCount = nbytes;
940     nextio = 0;
941     
942     /* Free any packets from the last call to ReadvProc/WritevProc */
943     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
944         queue_Remove(tp);
945         rxi_FreePacket(tp);
946     }
947
948     if (call->mode != RX_MODE_SENDING) {
949         if ((conn->type == RX_SERVER_CONNECTION)
950             && (call->mode == RX_MODE_RECEIVING)) {
951             call->mode = RX_MODE_SENDING;
952             if (cp) {
953                 rxi_FreePacket(cp);
954                 cp = call->currentPacket = (struct rx_packet *) 0;
955                 call->nLeft = 0;
956                 call->nFree = 0;
957             }
958         }
959         else {
960             return 0;
961         }
962     }
963
964     /* Set up the iovec to point to data in packet buffers. */
965     tnFree = call->nFree;
966     tcurvec = call->curvec;
967     tcurpos = call->curpos;
968     tcurlen = call->curlen;
969     do {
970       register unsigned int t;
971
972       if (tnFree == 0) {
973         /* current packet is full, allocate a new one */
974         cp = rxi_AllocSendPacket(call, nbytes);
975         if (cp == NULL) {
976           /* out of space, return what we have */
977           *nio = nextio;
978           return requestCount - nbytes;
979         }
980         queue_Append(&call->iovq, cp);
981         tnFree = cp->length;
982         tcurvec = 1;
983         tcurpos = (char *)cp->wirevec[1].iov_base
984                   + call->conn->securityHeaderSize;
985         tcurlen = cp->wirevec[1].iov_len
986                   - call->conn->securityHeaderSize;
987       }
988
989       if (tnFree < nbytes) {
990         /* try to extend the current packet */
991         register int len, mud;
992         len = cp->length;
993         mud = rx_MaxUserDataSize(call);
994         if (mud > len) {
995           int want;
996           want = MIN(nbytes - tnFree, mud - len);
997           rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
998           if (cp->length > (unsigned)mud)
999             cp->length = mud;
1000           tnFree += (cp->length - len);
1001           if (cp == call->currentPacket) {
1002             call->nFree += (cp->length - len);
1003           }
1004         }
1005       }
1006
1007       /* fill in the next entry in the iovec */
1008       t = MIN(tcurlen, nbytes);
1009       t = MIN(tnFree, t);
1010       iov[nextio].iov_base = tcurpos;
1011       iov[nextio].iov_len = t;
1012       nbytes -= t;
1013       tcurpos += t;
1014       tcurlen -= t;
1015       tnFree -= t;
1016       nextio++;
1017
1018       if (!tcurlen) {
1019           /* need to get another struct iov */
1020           if (++tcurvec >= cp->niovecs) {
1021             /* current packet is full, extend it or move on to next packet */
1022             tnFree = 0;
1023           } else { 
1024             tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1025             tcurlen = cp->wirevec[tcurvec].iov_len;
1026           }
1027       }
1028     } while (nbytes && nextio < maxio);
1029     *nio = nextio;
1030     return requestCount - nbytes;
1031 }
1032
1033 int rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, 
1034         int maxio, int nbytes)
1035 {
1036     int bytes;
1037     SPLVAR;
1038
1039     NETPRI;
1040     AFS_RXGLOCK();
1041     MUTEX_ENTER(&call->lock);
1042     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1043     MUTEX_EXIT(&call->lock);
1044     AFS_RXGUNLOCK();
1045     USERPRI;
1046     return bytes;
1047 }
1048
1049 /* rxi_WritevProc -- internal version.
1050  *
1051  * Send buffers allocated in rxi_WritevAlloc.
1052  *
1053  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1054
1055 int rxi_WritevProc(struct rx_call *call, struct iovec *iov, 
1056         int nio, int nbytes)
1057 {
1058     struct rx_packet *cp = call->currentPacket;
1059     register struct rx_packet *tp; /* Temporary packet pointer */
1060     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1061     int nextio;
1062     int requestCount;
1063     struct rx_queue tmpq;
1064
1065     requestCount = nbytes;
1066     nextio = 0;
1067
1068     if (call->mode != RX_MODE_SENDING) {
1069         call->error = RX_PROTOCOL_ERROR;
1070     }
1071
1072 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1073     /* Wait until TQ_BUSY is reset before trying to move any
1074      * packets to the transmit queue.  */
1075     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1076         call->flags |= RX_CALL_TQ_WAIT;
1077 #ifdef RX_ENABLE_LOCKS
1078         CV_WAIT(&call->cv_tq, &call->lock);
1079 #else /* RX_ENABLE_LOCKS */
1080         osi_rxSleep(&call->tq);
1081 #endif /* RX_ENABLE_LOCKS */
1082     }
1083 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1084
1085     if (call->error) {
1086         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1087             queue_Remove(tp);
1088             rxi_FreePacket(tp);
1089         }
1090         if (cp) {
1091             rxi_FreePacket(cp);
1092             cp = call->currentPacket = NULL;
1093         }
1094         return 0;
1095     }
1096
1097     /* Loop through the I/O vector adjusting packet pointers.
1098      * Place full packets back onto the iovq once they are ready
1099      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1100      * the iovec. We put the loop condition at the end to ensure that
1101      * a zero length write will push a short packet. */
1102     nextio = 0;
1103     queue_Init(&tmpq);
1104     do {
1105         if (call->nFree == 0 && cp) {
1106             clock_NewTime(); /* Bogus:  need new time package */
1107             /* The 0, below, specifies that it is not the last packet: 
1108              * there will be others. PrepareSendPacket may
1109              * alter the packet length by up to
1110              * conn->securityMaxTrailerSize */
1111             hadd32(call->bytesSent, cp->length);
1112             rxi_PrepareSendPacket(call, cp, 0);
1113             queue_Append(&tmpq, cp);
1114
1115             /* The head of the iovq is now the current packet */
1116             if (nbytes) {
1117                 if (queue_IsEmpty(&call->iovq)) {
1118                     call->error = RX_PROTOCOL_ERROR;
1119                     cp = call->currentPacket = NULL;
1120                     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1121                         queue_Remove(tp);
1122                         rxi_FreePacket(tp);
1123                     }
1124                     return 0;
1125                 }
1126                 cp = queue_First(&call->iovq, rx_packet);
1127                 queue_Remove(cp);
1128                 call->currentPacket = cp;
1129                 call->nFree = cp->length;
1130                 call->curvec = 1;
1131                 call->curpos = (char *)cp->wirevec[1].iov_base
1132                                + call->conn->securityHeaderSize;
1133                 call->curlen = cp->wirevec[1].iov_len
1134                                - call->conn->securityHeaderSize;
1135             }
1136         }
1137
1138         if (nbytes) {
1139           /* The next iovec should point to the current position */
1140           if (iov[nextio].iov_base != call->curpos
1141               || iov[nextio].iov_len > (int)call->curlen) {
1142               call->error = RX_PROTOCOL_ERROR;
1143               for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1144                   queue_Remove(tp);
1145                   rxi_FreePacket(tp);
1146               }
1147               if (cp) {
1148                 rxi_FreePacket(cp);
1149                 call->currentPacket = NULL;
1150               }
1151               return 0;
1152           }
1153           nbytes -= iov[nextio].iov_len;
1154           call->curpos += iov[nextio].iov_len;
1155           call->curlen -= iov[nextio].iov_len;
1156           call->nFree -= iov[nextio].iov_len;
1157           nextio++;
1158           if (call->curlen == 0) {
1159             if (++call->curvec > cp->niovecs) {
1160                 call->nFree = 0;
1161             } else {
1162                 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1163                 call->curlen = cp->wirevec[call->curvec].iov_len;
1164             }
1165           }
1166         }
1167     } while (nbytes && nextio < nio);
1168
1169     /* Move the packets from the temporary queue onto the transmit queue.
1170      * We may end up with more than call->twind packets on the queue. */
1171     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1172         queue_Remove(tp);
1173         queue_Append(&call->tq, tp);
1174     }
1175
1176     if (!(call->flags & (RX_CALL_FAST_RECOVER|RX_CALL_FAST_RECOVER_WAIT))) {
1177         rxi_Start(0, call, 0);
1178     }
1179
1180     /* Wait for the length of the transmit queue to fall below call->twind */
1181     while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
1182         clock_NewTime();
1183         call->startWait = clock_Sec();
1184 #ifdef  RX_ENABLE_LOCKS
1185         CV_WAIT(&call->cv_twind, &call->lock);
1186 #else
1187         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1188         osi_rxSleep(&call->twind);
1189 #endif
1190         call->startWait = 0;
1191     }
1192
1193     if (call->error) {
1194         if (cp) {
1195             rxi_FreePacket(cp);
1196             cp = call->currentPacket = NULL;
1197         }
1198         return 0;
1199     }
1200
1201     return requestCount - nbytes;
1202 }
1203
1204 int rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1205 {
1206     int bytes;
1207     SPLVAR;
1208
1209     NETPRI;
1210     AFS_RXGLOCK();
1211     MUTEX_ENTER(&call->lock);
1212     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1213     MUTEX_EXIT(&call->lock);
1214     AFS_RXGUNLOCK();
1215     USERPRI;
1216     return bytes;
1217 }
1218
1219 /* Flush any buffered data to the stream, switch to read mode
1220  * (clients) or to EOF mode (servers) */
1221 void rxi_FlushWrite(register struct rx_call *call)
1222 {
1223     register struct rx_packet *cp = call->currentPacket;
1224     register struct rx_packet *tp; /* Temporary packet pointer */
1225     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1226
1227     /* Free any packets from the last call to ReadvProc/WritevProc */
1228     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1229         queue_Remove(tp);
1230         rxi_FreePacket(tp);
1231     }
1232
1233     if (call->mode == RX_MODE_SENDING) {
1234
1235         call->mode = (call->conn->type == RX_CLIENT_CONNECTION ?
1236                                           RX_MODE_RECEIVING: RX_MODE_EOF);
1237
1238 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1239         /* Wait until TQ_BUSY is reset before adding any
1240          * packets to the transmit queue
1241          */
1242         while (call->flags & RX_CALL_TQ_BUSY) {
1243             call->flags |= RX_CALL_TQ_WAIT;
1244 #ifdef RX_ENABLE_LOCKS
1245             CV_WAIT(&call->cv_tq, &call->lock);
1246 #else /* RX_ENABLE_LOCKS */
1247             osi_rxSleep(&call->tq);
1248 #endif /* RX_ENABLE_LOCKS */
1249         }
1250 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1251
1252         if (cp) {
1253             /* cp->length is only supposed to be the user's data */
1254             /* cp->length was already set to (then-current) 
1255              * MaxUserDataSize or less. */
1256             cp->length -= call->nFree;
1257             call->currentPacket = (struct rx_packet *) 0;
1258             call->nFree = 0;
1259         }
1260         else {
1261             cp = rxi_AllocSendPacket(call,0);
1262             if (!cp) {
1263                 /* Mode can no longer be MODE_SENDING */
1264                 return;
1265             }
1266             cp->length = 0;
1267             cp->niovecs = 1;  /* just the header */
1268             call->nFree = 0;
1269         }
1270
1271         /* The 1 specifies that this is the last packet */
1272         hadd32(call->bytesSent, cp->length);
1273         rxi_PrepareSendPacket(call, cp, 1);
1274         queue_Append(&call->tq, cp);
1275         if (!(call->flags & (RX_CALL_FAST_RECOVER|
1276                              RX_CALL_FAST_RECOVER_WAIT))) {
1277             rxi_Start(0, call, 0);
1278         }
1279     }
1280 }
1281
1282 /* Flush any buffered data to the stream, switch to read mode
1283  * (clients) or to EOF mode (servers) */
1284 void rx_FlushWrite(struct rx_call *call)
1285 {
1286     SPLVAR;
1287     NETPRI;
1288     AFS_RXGLOCK();
1289     MUTEX_ENTER(&call->lock);
1290     rxi_FlushWrite(call);
1291     MUTEX_EXIT(&call->lock);
1292     AFS_RXGUNLOCK();
1293     USERPRI;
1294 }