allow-rx-shutdown-for-userlevel-20010212
[openafs.git] / src / rx / rx_rdwr.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #ifdef  KERNEL
11 #include "../afs/param.h"
12 #ifndef UKERNEL
13 #include "../h/types.h"
14 #include "../h/time.h"
15 #include "../h/stat.h"
16 #ifdef  AFS_OSF_ENV
17 #include <net/net_globals.h>
18 #endif  /* AFS_OSF_ENV */
19 #ifdef AFS_LINUX20_ENV
20 #include "../h/socket.h"
21 #endif
22 #include "../netinet/in.h"
23 #if defined(AFS_SGI_ENV)
24 #include "../afs/sysincludes.h"
25 #endif
26 #include "../afs/afs_args.h"
27 #include "../afs/afs_osi.h"
28 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
29 #include "../h/systm.h"
30 #endif
31 #else /* !UKERNEL */
32 #include "../afs/sysincludes.h"
33 #endif /* !UKERNEL */
34 #ifdef RXDEBUG
35 #undef RXDEBUG      /* turn off debugging */
36 #endif /* RXDEBUG */
37 #include "../afsint/afsint.h"
38
39 #include "../rx/rx_kmutex.h"
40 #include "../rx/rx_kernel.h"
41 #include "../rx/rx_clock.h"
42 #include "../rx/rx_queue.h"
43 #include "../rx/rx.h"
44 #include "../rx/rx_globals.h"
45 #include "../afs/lock.h"
46 #include "../afsint/afsint.h"
47 #ifdef  AFS_ALPHA_ENV
48 #undef kmem_alloc
49 #undef kmem_free
50 #undef mem_alloc
51 #undef mem_free
52 #undef register
53 #endif  /* AFS_ALPHA_ENV */
54 #else /* KERNEL */
55 # include <afs/param.h>
56 # include <sys/types.h>
57 #ifndef AFS_NT40_ENV
58 # include <sys/socket.h>
59 # include <sys/file.h>
60 # include <netdb.h>
61 # include <netinet/in.h>
62 # include <sys/stat.h>
63 # include <sys/time.h>
64 #endif
65 # include "rx_user.h"
66 # include "rx_clock.h"
67 # include "rx_queue.h"
68 # include "rx.h"
69 # include "rx_globals.h"
70 # include "rx_internal.h"
71 #endif /* KERNEL */
72
73
74 #ifdef RX_LOCKS_DB
75 /* rxdb_fileID is used to identify the lock location, along with line#. */
76 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
77 #endif /* RX_LOCKS_DB */
78 /* rxi_ReadProc -- internal version.
79  *
80  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
81  */
82 int rxi_ReadProc(call, buf, nbytes)
83     register struct rx_call *call;
84     register char *buf;
85     register int nbytes;
86 {
87     register struct rx_packet *cp = call->currentPacket;
88     register struct rx_packet *rp;
89     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
90     register int requestCount;
91     register unsigned int t;
92 /* XXXX took out clock_NewTime from here.  Was it needed? */
93     requestCount = nbytes;
94
95     /* Free any packets from the last call to ReadvProc/WritevProc */
96     if (!queue_IsEmpty(&call->iovq)) {
97         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
98             queue_Remove(rp);
99             rxi_FreePacket(rp);
100         }
101     }
102
103     do {
104       if (call->nLeft == 0) {
105         /* Get next packet */
106         for (;;) {
107           if (call->error || (call->mode != RX_MODE_RECEIVING)) {
108             if (call->error) {
109               return 0;
110             }
111             if (call->mode == RX_MODE_SENDING) {
112               rxi_FlushWrite(call);
113               continue;
114             }
115           }
116           if (queue_IsNotEmpty(&call->rq)) {
117             /* Check that next packet available is next in sequence */
118             rp = queue_First(&call->rq, rx_packet);
119             if (rp->header.seq == call->rnext) {
120               afs_int32 error;
121               register struct rx_connection *conn = call->conn;
122               queue_Remove(rp);
123               
124               /* RXS_CheckPacket called to undo RXS_PreparePacket's
125                * work.  It may reduce the length of the packet by up
126                * to conn->maxTrailerSize, to reflect the length of the
127                * data + the header. */
128               if (error = RXS_CheckPacket(conn->securityObject, call, rp)) {
129                 /* Used to merely shut down the call, but now we 
130                  * shut down the whole connection since this may 
131                  * indicate an attempt to hijack it */
132
133                 MUTEX_EXIT(&call->lock);
134                 rxi_ConnectionError(conn, error);
135                 MUTEX_ENTER(&conn->conn_data_lock);
136                 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
137                 MUTEX_EXIT(&conn->conn_data_lock);
138                 rxi_FreePacket(rp);
139                 MUTEX_ENTER(&call->lock);
140
141                 return 0;
142               }
143               call->rnext++;
144               cp = call->currentPacket = rp;
145               call->curvec = 1;   /* 0th vec is always header */
146               /* begin at the beginning [ more or less ], continue 
147                * on until the end, then stop. */
148               call->curpos = (char *)cp->wirevec[1].iov_base
149                              + call->conn->securityHeaderSize; 
150               call->curlen = cp->wirevec[1].iov_len
151                              - call->conn->securityHeaderSize; 
152               
153               /* Notice that this code works correctly if the data
154                * size is 0 (which it may be--no reply arguments from
155                * server, for example).  This relies heavily on the
156                * fact that the code below immediately frees the packet
157                * (no yields, etc.).  If it didn't, this would be a
158                * problem because a value of zero for call->nLeft
159                * normally means that there is no read packet */
160               call->nLeft = cp->length;
161               hadd32(call->bytesRcvd, cp->length);
162               
163               /* Send a hard ack for every rxi_HardAckRate+1 packets
164                * consumed. Otherwise schedule an event to send
165                * the hard ack later on.
166                */
167               call->nHardAcks++;
168               if (!(call->flags &RX_CALL_RECEIVE_DONE)) {
169                 if (call->nHardAcks > (u_short)rxi_HardAckRate) {
170                   rxevent_Cancel(call->delayedAckEvent, call,
171                                  RX_CALL_REFCOUNT_DELAY);
172                   rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
173                 }
174                 else {
175                   struct clock when;
176                   clock_GetTime(&when);
177                   /* Delay to consolidate ack packets */
178                   clock_Add(&when, &rx_hardAckDelay);
179                   if (!call->delayedAckEvent ||
180                       clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
181                       rxevent_Cancel(call->delayedAckEvent, call,
182                                      RX_CALL_REFCOUNT_DELAY);
183                       CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
184                       call->delayedAckEvent = rxevent_Post(&when,
185                                                            rxi_SendDelayedAck,
186                                                            call, 0);
187                   }
188                 }
189               }
190               break;
191             }
192           }
193
194 /*
195 MTUXXX  doesn't there need to be an "else" here ??? 
196 */
197           /* Are there ever going to be any more packets? */
198           if (call->flags & RX_CALL_RECEIVE_DONE) {
199             return requestCount - nbytes;
200           }
201           /* Wait for in-sequence packet */
202           call->flags |= RX_CALL_READER_WAIT;
203           clock_NewTime();
204           call->startWait = clock_Sec();
205           while (call->flags & RX_CALL_READER_WAIT) {
206 #ifdef  RX_ENABLE_LOCKS
207               CV_WAIT(&call->cv_rq, &call->lock);
208 #else
209               osi_rxSleep(&call->rq);
210 #endif
211           }
212
213           call->startWait = 0;
214 #ifdef RX_ENABLE_LOCKS
215           if (call->error) {
216               return 0;
217           }
218 #endif /* RX_ENABLE_LOCKS */
219         }
220       }
221       else /* assert(cp); */  /* MTUXXX  this should be replaced by some error-recovery code before shipping */
222 /* yes, the following block is allowed to be the ELSE clause (or not) */
223
224         /* It's possible for call->nLeft to be smaller than any particular
225          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
226          * reflects the size of the buffer.  We have to keep track of the
227          * number of bytes read in the length field of the packet struct.  On
228          * the final portion of a received packet, it's almost certain that
229          * call->nLeft will be smaller than the final buffer. */
230
231         while (nbytes && cp) {
232           t = MIN((int)call->curlen, nbytes);
233           t = MIN(t, (int)call->nLeft);
234           bcopy (call->curpos, buf, t);
235           buf += t;
236           nbytes -= t;
237           call->curpos += t;
238           call->curlen -= t;
239           call->nLeft -= t;
240
241           if (!call->nLeft) {
242             /* out of packet.  Get another one. */
243             rxi_FreePacket(cp);
244             cp = call->currentPacket = (struct rx_packet *)0;
245           }
246           else if (!call->curlen) {
247             /* need to get another struct iov */
248             if (++call->curvec >= cp->niovecs) {
249               /* current packet is exhausted, get ready for another */
250               /* don't worry about curvec and stuff, they get set somewhere else */
251               rxi_FreePacket(cp);
252               cp = call->currentPacket = (struct rx_packet *)0;
253               call->nLeft = 0;
254             }
255             else {
256               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
257               call->curlen = cp->wirevec[call->curvec].iov_len;
258             }
259           }  
260         }
261         if (!nbytes) {
262           /* user buffer is full, return */
263           return requestCount;
264         }
265
266     } while (nbytes);
267
268     return requestCount;
269 }
270
271 int rx_ReadProc(call, buf, nbytes)
272   struct rx_call *call;
273   char *buf;
274   int nbytes;
275 {
276     int bytes;
277     int tcurlen;
278     int tnLeft;
279     char *tcurpos;
280     SPLVAR;
281
282     /*
283      * Free any packets from the last call to ReadvProc/WritevProc.
284      * We do not need the lock because the receiver threads only
285      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
286      * RX_CALL_IOVEC_WAIT is always cleared before returning from
287      * ReadvProc/WritevProc.
288      */
289     if (!queue_IsEmpty(&call->iovq)) {
290         register struct rx_packet *rp; 
291         register struct rx_packet *nxp;
292         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
293             queue_Remove(rp);
294             rxi_FreePacket(rp);
295         }
296     }
297
298     /*
299      * Most common case, all of the data is in the current iovec.
300      * We do not need the lock because this is the only thread that
301      * updates the curlen, curpos, nLeft fields.
302      *
303      * We are relying on nLeft being zero unless the call is in receive mode.
304      */
305     tcurlen = call->curlen;
306     tnLeft = call->nLeft;
307     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
308         tcurpos = call->curpos;
309         bcopy(tcurpos, buf, nbytes);
310         call->curpos = tcurpos + nbytes;
311         call->curlen = tcurlen - nbytes;
312         call->nLeft = tnLeft - nbytes;
313         return nbytes;
314     }
315
316     NETPRI;
317     AFS_RXGLOCK();
318     MUTEX_ENTER(&call->lock);
319     bytes = rxi_ReadProc(call, buf, nbytes);
320     MUTEX_EXIT(&call->lock);
321     AFS_RXGUNLOCK();
322     USERPRI;
323     return bytes;
324 }
325
326 /* Optimization for unmarshalling 32 bit integers */
327 int rx_ReadProc32(call, value)
328   struct rx_call *call;
329   afs_int32 *value;
330 {
331     int bytes;
332     int tcurlen;
333     int tnLeft;
334     char *tcurpos;
335     SPLVAR;
336
337     /*
338      * Free any packets from the last call to ReadvProc/WritevProc.
339      * We do not need the lock because the receiver threads only
340      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
341      * RX_CALL_IOVEC_WAIT is always cleared before returning from
342      * ReadvProc/WritevProc.
343      */
344     if (!queue_IsEmpty(&call->iovq)) {
345         register struct rx_packet *rp; 
346         register struct rx_packet *nxp;
347         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
348             queue_Remove(rp);
349             rxi_FreePacket(rp);
350         }
351     }
352
353     /*
354      * Most common case, all of the data is in the current iovec.
355      * We do not need the lock because this is the only thread that
356      * updates the curlen, curpos, nLeft fields.
357      *
358      * We are relying on nLeft being zero unless the call is in receive mode.
359      */
360     tcurlen = call->curlen;
361     tnLeft = call->nLeft;
362     if (!call->error && tcurlen > sizeof(afs_int32) && tnLeft > sizeof(afs_int32)) {
363         tcurpos = call->curpos;
364         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
365             *value = *((afs_int32 *)(tcurpos));
366         } else {
367             bcopy(tcurpos, (char *)value, sizeof(afs_int32));
368         }
369         call->curpos = tcurpos + sizeof(afs_int32);
370         call->curlen = tcurlen - sizeof(afs_int32);
371         call->nLeft = tnLeft - sizeof(afs_int32);
372         return sizeof(afs_int32);
373     }
374
375     NETPRI;
376     AFS_RXGLOCK();
377     MUTEX_ENTER(&call->lock);
378     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
379     MUTEX_EXIT(&call->lock);
380     AFS_RXGUNLOCK();
381     USERPRI;
382     return bytes;
383 }
384
385 /* rxi_FillReadVec
386  *
387  * Uses packets in the receive queue to fill in as much of the
388  * current iovec as possible. Does not block if it runs out
389  * of packets to complete the iovec. Return true if an ack packet
390  * was sent, otherwise return false */
391 int rxi_FillReadVec(call, seq, serial, flags)
392   struct rx_call *call;
393   afs_uint32 seq, serial, flags;
394 {
395     int didConsume = 0;
396     int didHardAck = 0;
397     register unsigned int t;
398     struct rx_packet *rp;
399     struct rx_packet *curp;
400     struct iovec *call_iov;
401     struct iovec *cur_iov;
402
403     curp = call->currentPacket;
404     if (curp) {
405         cur_iov = &curp->wirevec[call->curvec];
406     }
407     call_iov = &call->iov[call->iovNext];
408
409     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
410       if (call->nLeft == 0) {
411         /* Get next packet */
412         if (queue_IsNotEmpty(&call->rq)) {
413           /* Check that next packet available is next in sequence */
414           rp = queue_First(&call->rq, rx_packet);
415           if (rp->header.seq == call->rnext) {
416             afs_int32 error;
417             register struct rx_connection *conn = call->conn;
418             queue_Remove(rp);
419               
420             /* RXS_CheckPacket called to undo RXS_PreparePacket's
421              * work.  It may reduce the length of the packet by up
422              * to conn->maxTrailerSize, to reflect the length of the
423              * data + the header. */
424             if (error = RXS_CheckPacket(conn->securityObject, call, rp)) {
425               /* Used to merely shut down the call, but now we 
426                * shut down the whole connection since this may 
427                * indicate an attempt to hijack it */
428
429               MUTEX_EXIT(&call->lock);
430               rxi_ConnectionError(conn, error);
431               MUTEX_ENTER(&conn->conn_data_lock);
432               rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
433               MUTEX_EXIT(&conn->conn_data_lock);
434               rxi_FreePacket(rp);
435               MUTEX_ENTER(&call->lock);
436
437               return 1;
438             }
439             call->rnext++;
440             curp = call->currentPacket = rp;
441             call->curvec = 1;   /* 0th vec is always header */
442             cur_iov = &curp->wirevec[1];
443             /* begin at the beginning [ more or less ], continue 
444              * on until the end, then stop. */
445             call->curpos = (char *)curp->wirevec[1].iov_base
446                            + call->conn->securityHeaderSize; 
447             call->curlen = curp->wirevec[1].iov_len
448                            - call->conn->securityHeaderSize; 
449               
450             /* Notice that this code works correctly if the data
451              * size is 0 (which it may be--no reply arguments from
452              * server, for example).  This relies heavily on the
453              * fact that the code below immediately frees the packet
454              * (no yields, etc.).  If it didn't, this would be a
455              * problem because a value of zero for call->nLeft
456              * normally means that there is no read packet */
457             call->nLeft = curp->length;
458             hadd32(call->bytesRcvd, curp->length);
459               
460             /* Send a hard ack for every rxi_HardAckRate+1 packets
461              * consumed. Otherwise schedule an event to send
462              * the hard ack later on.
463              */
464             call->nHardAcks++;
465             didConsume = 1;
466             continue;
467           }
468         }
469         break;
470       }
471
472       /* It's possible for call->nLeft to be smaller than any particular
473        * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
474        * reflects the size of the buffer.  We have to keep track of the
475        * number of bytes read in the length field of the packet struct.  On
476        * the final portion of a received packet, it's almost certain that
477        * call->nLeft will be smaller than the final buffer. */
478       while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
479         
480         t = MIN((int)call->curlen, call->iovNBytes);
481         t = MIN(t, (int)call->nLeft);
482         call_iov->iov_base = call->curpos;
483         call_iov->iov_len = t;
484         call_iov++;
485         call->iovNext++;
486         call->iovNBytes -= t;
487         call->curpos += t;
488         call->curlen -= t;
489         call->nLeft -= t;
490
491         if (!call->nLeft) {
492           /* out of packet.  Get another one. */
493           queue_Append(&call->iovq, curp);
494           curp = call->currentPacket = (struct rx_packet *)0;
495         }
496         else if (!call->curlen) {
497           /* need to get another struct iov */
498           if (++call->curvec >= curp->niovecs) {
499             /* current packet is exhausted, get ready for another */
500             /* don't worry about curvec and stuff, they get set somewhere else */
501             queue_Append(&call->iovq, curp);
502             curp = call->currentPacket = (struct rx_packet *)0;
503             call->nLeft = 0;
504           }
505           else {
506             cur_iov++;
507             call->curpos = (char *)cur_iov->iov_base;
508             call->curlen = cur_iov->iov_len;
509           }
510         }  
511       }  
512     }
513
514     /* If we consumed any packets then check whether we need to
515      * send a hard ack. */
516     if (didConsume && (!(call->flags &RX_CALL_RECEIVE_DONE))) {
517       if (call->nHardAcks > (u_short)rxi_HardAckRate) {
518         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
519         rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_DELAY, 0);
520         didHardAck = 1;
521       }
522       else {
523         struct clock when;
524         clock_GetTime(&when);
525         /* Delay to consolidate ack packets */
526         clock_Add(&when, &rx_hardAckDelay);
527         if (!call->delayedAckEvent ||
528           clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
529           rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
530           CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
531           call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
532                                                call, 0);
533         }
534       }
535     }
536     return didHardAck;
537 }
538
539
540 /* rxi_ReadvProc -- internal version.
541  *
542  * Fills in an iovec with pointers to the packet buffers. All packets
543  * except the last packet (new current packet) are moved to the iovq
544  * while the application is processing the data.
545  *
546  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
547  */
548 int rxi_ReadvProc(call, iov, nio, maxio, nbytes)
549     struct rx_call *call;
550     struct iovec *iov;
551     int *nio;
552     int maxio;
553     int nbytes;
554 {
555     struct rx_packet *rp; 
556     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
557     int requestCount;
558     int nextio;
559
560     requestCount = nbytes;
561     nextio = 0;
562
563     /* Free any packets from the last call to ReadvProc/WritevProc */
564     for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
565         queue_Remove(rp);
566         rxi_FreePacket(rp);
567     }
568
569     if (call->mode == RX_MODE_SENDING) {
570         rxi_FlushWrite(call);
571     }
572
573     if (call->error) {
574         return 0;
575     }
576
577     /* Get whatever data is currently available in the receive queue.
578      * If rxi_FillReadVec sends an ack packet then it is possible
579      * that we will receive more data while we drop the call lock
580      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
581      * here to avoid a race with the receive thread if we send
582      * hard acks in rxi_FillReadVec. */
583     call->flags |= RX_CALL_IOVEC_WAIT;
584     call->iovNBytes = nbytes;
585     call->iovMax = maxio;
586     call->iovNext = 0;
587     call->iov = iov;
588     rxi_FillReadVec(call, 0, 0, 0);
589
590     /* if we need more data then sleep until the receive thread has
591      * filled in the rest. */
592     if (!call->error && call->iovNBytes &&
593         call->iovNext < call->iovMax &&
594         !(call->flags & RX_CALL_RECEIVE_DONE)) {
595       call->flags |= RX_CALL_READER_WAIT;
596       clock_NewTime();
597       call->startWait = clock_Sec();
598       while (call->flags & RX_CALL_READER_WAIT) {
599 #ifdef  RX_ENABLE_LOCKS
600         CV_WAIT(&call->cv_rq, &call->lock);
601 #else
602         osi_rxSleep(&call->rq);
603 #endif
604       }
605       call->startWait = 0;
606     }
607     call->flags &= ~RX_CALL_IOVEC_WAIT;
608 #ifdef RX_ENABLE_LOCKS
609     if (call->error) {
610       return 0;
611     }
612 #endif /* RX_ENABLE_LOCKS */
613
614     call->iov = NULL;
615     *nio = call->iovNext;
616     return nbytes - call->iovNBytes;
617 }
618
619 int rx_ReadvProc(call, iov, nio, maxio, nbytes)
620     struct rx_call *call;
621     struct iovec *iov;
622     int *nio;
623     int maxio;
624     int nbytes;
625 {
626     int bytes;
627     SPLVAR;
628
629     NETPRI;
630     AFS_RXGLOCK();
631     MUTEX_ENTER(&call->lock);
632     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
633     MUTEX_EXIT(&call->lock);
634     AFS_RXGUNLOCK();
635     USERPRI;
636     return bytes;
637 }
638
639 /* rxi_WriteProc -- internal version.
640  *
641  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
642
643 int rxi_WriteProc(call, buf, nbytes)
644     register struct rx_call *call;
645     register char *buf;
646     register int nbytes;
647 {
648     struct rx_connection *conn = call->conn;
649     register struct rx_packet *cp = call->currentPacket;
650     register struct rx_packet *tp; /* Temporary packet pointer */
651     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
652     register unsigned int t;
653     int requestCount = nbytes;
654
655     /* Free any packets from the last call to ReadvProc/WritevProc */
656     if (!queue_IsEmpty(&call->iovq)) {
657         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
658             queue_Remove(tp);
659             rxi_FreePacket(tp);
660         }
661     }
662
663     if (call->mode != RX_MODE_SENDING) {
664         if ((conn->type == RX_SERVER_CONNECTION)
665             && (call->mode == RX_MODE_RECEIVING)) {
666             call->mode = RX_MODE_SENDING;
667             if (cp) {
668                 rxi_FreePacket(cp);
669                 cp = call->currentPacket = (struct rx_packet *) 0;
670                 call->nLeft = 0;
671                 call->nFree = 0;
672             }
673         }
674         else {
675             return 0;
676         }
677     }
678
679     /* Loop condition is checked at end, so that a write of 0 bytes
680      * will force a packet to be created--specially for the case where
681      * there are 0 bytes on the stream, but we must send a packet
682      * anyway. */
683     do {
684         if (call->nFree == 0) {
685             if (!call->error && cp) {
686 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
687                 /* Wait until TQ_BUSY is reset before adding any
688                  * packets to the transmit queue
689                  */
690                 while (call->flags & RX_CALL_TQ_BUSY) {
691                     call->flags |= RX_CALL_TQ_WAIT;
692 #ifdef RX_ENABLE_LOCKS
693                     CV_WAIT(&call->cv_tq, &call->lock);
694 #else /* RX_ENABLE_LOCKS */
695                     osi_rxSleep(&call->tq);
696 #endif /* RX_ENABLE_LOCKS */
697                 }
698 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
699                 clock_NewTime(); /* Bogus:  need new time package */
700                 /* The 0, below, specifies that it is not the last packet: 
701                  * there will be others. PrepareSendPacket may
702                  * alter the packet length by up to
703                  * conn->securityMaxTrailerSize */
704                 hadd32(call->bytesSent, cp->length);
705                 rxi_PrepareSendPacket(call, cp, 0);
706                 queue_Append(&call->tq, cp);
707                 cp = call->currentPacket = NULL;
708                 if (!(call->flags & (RX_CALL_FAST_RECOVER|
709                                      RX_CALL_FAST_RECOVER_WAIT))) {
710                     rxi_Start(0, call, 0);
711                 }
712             }
713             /* Wait for transmit window to open up */
714             while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
715                 clock_NewTime();
716                 call->startWait = clock_Sec();
717
718 #ifdef  RX_ENABLE_LOCKS
719                 CV_WAIT(&call->cv_twind, &call->lock);
720 #else
721                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
722                 osi_rxSleep(&call->twind);
723 #endif
724
725                 call->startWait = 0;
726 #ifdef RX_ENABLE_LOCKS
727                 if (call->error) {
728                     return 0;
729                 }
730 #endif /* RX_ENABLE_LOCKS */
731             }
732             if (cp = rxi_AllocSendPacket(call, nbytes)) {
733                 call->currentPacket = cp;
734                 call->nFree = cp->length;
735                 call->curvec = 1;   /* 0th vec is always header */
736                 /* begin at the beginning [ more or less ], continue 
737                  * on until the end, then stop. */
738                 call->curpos = (char *)cp->wirevec[1].iov_base
739                                + call->conn->securityHeaderSize; 
740                 call->curlen = cp->wirevec[1].iov_len
741                                - call->conn->securityHeaderSize; 
742             }
743             if (call->error) {
744                 if (cp) {
745                   rxi_FreePacket(cp);
746                   call->currentPacket = NULL;
747                 }
748                 return 0;
749             }
750         }
751
752         if (cp && (int)call->nFree < nbytes) {
753               /* Try to extend the current buffer */
754               register int len, mud;
755               len = cp->length;
756               mud = rx_MaxUserDataSize(call);
757               if (mud > len) {
758                 int want;
759                 want = MIN(nbytes - (int)call->nFree, mud - len);
760                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
761                 if (cp->length > (unsigned)mud)
762                     cp->length = mud;
763                 call->nFree += (cp->length - len);
764               }
765         }
766
767         /* If the remaining bytes fit in the buffer, then store them
768          * and return.  Don't ship a buffer that's full immediately to
769          * the peer--we don't know if it's the last buffer yet */
770
771         if (!cp)  {
772           call->nFree = 0;
773         }
774
775         while (nbytes && call->nFree) {
776         
777           t = MIN((int)call->curlen, nbytes);
778           t = MIN((int)call->nFree, t);
779           bcopy (buf, call->curpos, t);
780           buf += t;
781           nbytes -= t;
782           call->curpos += t;
783           call->curlen -= t;
784           call->nFree -= t;
785
786           if (!call->curlen) {
787             /* need to get another struct iov */
788             if (++call->curvec >= cp->niovecs) {
789               /* current packet is full, extend or send it */
790               call->nFree = 0;
791             } else {
792               call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
793               call->curlen = cp->wirevec[call->curvec].iov_len;
794             }
795           }  
796         } /* while bytes to send and room to send them */
797
798         /* might be out of space now */
799         if (!nbytes) {
800             return requestCount;
801         }
802         else ; /* more data to send, so get another packet and keep going */
803     } while(nbytes);
804
805     return requestCount - nbytes;
806 }
807
808 int rx_WriteProc(call, buf, nbytes)
809   struct rx_call *call;
810   char *buf;
811   int nbytes;
812 {
813     int bytes;
814     int tcurlen;
815     int tnFree;
816     char *tcurpos;
817     SPLVAR;
818
819     /*
820      * Free any packets from the last call to ReadvProc/WritevProc.
821      * We do not need the lock because the receiver threads only
822      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
823      * RX_CALL_IOVEC_WAIT is always cleared before returning from
824      * ReadvProc/WritevProc.
825      */
826     if (!queue_IsEmpty(&call->iovq)) {
827         register struct rx_packet *rp; 
828         register struct rx_packet *nxp;
829         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
830             queue_Remove(rp);
831             rxi_FreePacket(rp);
832         }
833     }
834
835     /*
836      * Most common case: all of the data fits in the current iovec.
837      * We do not need the lock because this is the only thread that
838      * updates the curlen, curpos, nFree fields.
839      *
840      * We are relying on nFree being zero unless the call is in send mode.
841      */
842     tcurlen = (int)call->curlen;
843     tnFree = (int)call->nFree;
844     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
845         tcurpos = call->curpos;
846         bcopy(buf, tcurpos, nbytes);
847         call->curpos = tcurpos + nbytes;
848         call->curlen = tcurlen - nbytes;
849         call->nFree = tnFree - nbytes;
850         return nbytes;
851     }
852
853     NETPRI;
854     AFS_RXGLOCK();
855     MUTEX_ENTER(&call->lock);
856     bytes = rxi_WriteProc(call, buf, nbytes);
857     MUTEX_EXIT(&call->lock);
858     AFS_RXGUNLOCK();
859     USERPRI;
860     return bytes;
861 }
862
863 /* Optimization for marshalling 32 bit arguments */
864 int rx_WriteProc32(call, value)
865   register struct rx_call *call;
866   register afs_int32 *value;
867 {
868     int bytes;
869     int tcurlen;
870     int tnFree;
871     char *tcurpos;
872     SPLVAR;
873
874     /*
875      * Free any packets from the last call to ReadvProc/WritevProc.
876      * We do not need the lock because the receiver threads only
877      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
878      * RX_CALL_IOVEC_WAIT is always cleared before returning from
879      * ReadvProc/WritevProc.
880      */
881     if (!queue_IsEmpty(&call->iovq)) {
882         register struct rx_packet *rp; 
883         register struct rx_packet *nxp;
884         for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
885             queue_Remove(rp);
886             rxi_FreePacket(rp);
887         }
888     }
889
890     /*
891      * Most common case: all of the data fits in the current iovec.
892      * We do not need the lock because this is the only thread that
893      * updates the curlen, curpos, nFree fields.
894      *
895      * We are relying on nFree being zero unless the call is in send mode.
896      */
897     tcurlen = (int)call->curlen;
898     tnFree = (int)call->nFree;
899     if (!call->error && tcurlen >= sizeof(afs_int32) && tnFree >= sizeof(afs_int32)) {
900         tcurpos = call->curpos;
901         if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
902             *((afs_int32 *)(tcurpos)) = *value;
903         } else {
904             bcopy((char *)value, tcurpos, sizeof(afs_int32));
905         }
906         call->curpos = tcurpos + sizeof(afs_int32);
907         call->curlen = tcurlen - sizeof(afs_int32);
908         call->nFree = tnFree - sizeof(afs_int32);
909         return sizeof(afs_int32);
910     }
911
912     NETPRI;
913     AFS_RXGLOCK();
914     MUTEX_ENTER(&call->lock);
915     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
916     MUTEX_EXIT(&call->lock);
917     AFS_RXGUNLOCK();
918     USERPRI;
919     return bytes;
920 }
921
922 /* rxi_WritevAlloc -- internal version.
923  *
924  * Fill in an iovec to point to data in packet buffers. The application
925  * calls rxi_WritevProc when the buffers are full.
926  *
927  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
928
929 int rxi_WritevAlloc(call, iov, nio, maxio, nbytes)
930     struct rx_call *call;
931     struct iovec *iov;
932     int *nio;
933     int maxio;
934     int nbytes;
935 {
936     struct rx_connection *conn = call->conn;
937     struct rx_packet *cp = call->currentPacket;
938     struct rx_packet *tp; /* temporary packet pointer */
939     struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
940     int requestCount;
941     int nextio;
942     /* Temporary values, real work is done in rxi_WritevProc */
943     int tnFree; 
944     int tcurvec;
945     char * tcurpos;
946     int tcurlen;
947
948     requestCount = nbytes;
949     nextio = 0;
950     
951     /* Free any packets from the last call to ReadvProc/WritevProc */
952     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
953         queue_Remove(tp);
954         rxi_FreePacket(tp);
955     }
956
957     if (call->mode != RX_MODE_SENDING) {
958         if ((conn->type == RX_SERVER_CONNECTION)
959             && (call->mode == RX_MODE_RECEIVING)) {
960             call->mode = RX_MODE_SENDING;
961             if (cp) {
962                 rxi_FreePacket(cp);
963                 cp = call->currentPacket = (struct rx_packet *) 0;
964                 call->nLeft = 0;
965                 call->nFree = 0;
966             }
967         }
968         else {
969             return 0;
970         }
971     }
972
973     /* Set up the iovec to point to data in packet buffers. */
974     tnFree = call->nFree;
975     tcurvec = call->curvec;
976     tcurpos = call->curpos;
977     tcurlen = call->curlen;
978     do {
979       register unsigned int t;
980
981       if (tnFree == 0) {
982         /* current packet is full, allocate a new one */
983         cp = rxi_AllocSendPacket(call, nbytes);
984         if (cp == NULL) {
985           /* out of space, return what we have */
986           *nio = nextio;
987           return requestCount - nbytes;
988         }
989         queue_Append(&call->iovq, cp);
990         tnFree = cp->length;
991         tcurvec = 1;
992         tcurpos = (char *)cp->wirevec[1].iov_base
993                   + call->conn->securityHeaderSize;
994         tcurlen = cp->wirevec[1].iov_len
995                   - call->conn->securityHeaderSize;
996       }
997
998       if (tnFree < nbytes) {
999         /* try to extend the current packet */
1000         register int len, mud;
1001         len = cp->length;
1002         mud = rx_MaxUserDataSize(call);
1003         if (mud > len) {
1004           int want;
1005           want = MIN(nbytes - tnFree, mud - len);
1006           rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1007           if (cp->length > (unsigned)mud)
1008             cp->length = mud;
1009           tnFree += (cp->length - len);
1010           if (cp == call->currentPacket) {
1011             call->nFree += (cp->length - len);
1012           }
1013         }
1014       }
1015
1016       /* fill in the next entry in the iovec */
1017       t = MIN(tcurlen, nbytes);
1018       t = MIN(tnFree, t);
1019       iov[nextio].iov_base = tcurpos;
1020       iov[nextio].iov_len = t;
1021       nbytes -= t;
1022       tcurpos += t;
1023       tcurlen -= t;
1024       tnFree -= t;
1025       nextio++;
1026
1027       if (!tcurlen) {
1028           /* need to get another struct iov */
1029           if (++tcurvec >= cp->niovecs) {
1030             /* current packet is full, extend it or move on to next packet */
1031             tnFree = 0;
1032           } else { 
1033             tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1034             tcurlen = cp->wirevec[tcurvec].iov_len;
1035           }
1036       }
1037     } while (nbytes && nextio < maxio);
1038     *nio = nextio;
1039     return requestCount - nbytes;
1040 }
1041
1042 int rx_WritevAlloc(call, iov, nio, maxio, nbytes)
1043     struct rx_call *call;
1044     struct iovec *iov;
1045     int *nio;
1046     int maxio;
1047     int nbytes;
1048 {
1049     int bytes;
1050     SPLVAR;
1051
1052     NETPRI;
1053     AFS_RXGLOCK();
1054     MUTEX_ENTER(&call->lock);
1055     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1056     MUTEX_EXIT(&call->lock);
1057     AFS_RXGUNLOCK();
1058     USERPRI;
1059     return bytes;
1060 }
1061
1062 /* rxi_WritevProc -- internal version.
1063  *
1064  * Send buffers allocated in rxi_WritevAlloc.
1065  *
1066  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1067
1068 int rxi_WritevProc(call, iov, nio, nbytes)
1069     struct rx_call *call;
1070     struct iovec *iov;
1071     int nio;
1072     int nbytes;
1073 {
1074     struct rx_connection *conn = call->conn;
1075     struct rx_packet *cp = call->currentPacket;
1076     register struct rx_packet *tp; /* Temporary packet pointer */
1077     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1078     int nextio;
1079     int requestCount;
1080     struct rx_queue tmpq;
1081
1082     requestCount = nbytes;
1083     nextio = 0;
1084
1085     if (call->mode != RX_MODE_SENDING) {
1086         call->error = RX_PROTOCOL_ERROR;
1087     }
1088
1089 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1090     /* Wait until TQ_BUSY is reset before trying to move any
1091      * packets to the transmit queue.  */
1092     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1093         call->flags |= RX_CALL_TQ_WAIT;
1094 #ifdef RX_ENABLE_LOCKS
1095         CV_WAIT(&call->cv_tq, &call->lock);
1096 #else /* RX_ENABLE_LOCKS */
1097         osi_rxSleep(&call->tq);
1098 #endif /* RX_ENABLE_LOCKS */
1099     }
1100 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1101
1102     if (call->error) {
1103         for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1104             queue_Remove(tp);
1105             rxi_FreePacket(tp);
1106         }
1107         if (cp) {
1108             rxi_FreePacket(cp);
1109             cp = call->currentPacket = NULL;
1110         }
1111         return 0;
1112     }
1113
1114     /* Loop through the I/O vector adjusting packet pointers.
1115      * Place full packets back onto the iovq once they are ready
1116      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1117      * the iovec. We put the loop condition at the end to ensure that
1118      * a zero length write will push a short packet. */
1119     nextio = 0;
1120     queue_Init(&tmpq);
1121     do {
1122         unsigned int t;
1123
1124         if (call->nFree == 0 && cp) {
1125             clock_NewTime(); /* Bogus:  need new time package */
1126             /* The 0, below, specifies that it is not the last packet: 
1127              * there will be others. PrepareSendPacket may
1128              * alter the packet length by up to
1129              * conn->securityMaxTrailerSize */
1130             hadd32(call->bytesSent, cp->length);
1131             rxi_PrepareSendPacket(call, cp, 0);
1132             queue_Append(&tmpq, cp);
1133
1134             /* The head of the iovq is now the current packet */
1135             if (nbytes) {
1136                 if (queue_IsEmpty(&call->iovq)) {
1137                     call->error = RX_PROTOCOL_ERROR;
1138                     cp = call->currentPacket = NULL;
1139                     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1140                         queue_Remove(tp);
1141                         rxi_FreePacket(tp);
1142                     }
1143                     return 0;
1144                 }
1145                 cp = queue_First(&call->iovq, rx_packet);
1146                 queue_Remove(cp);
1147                 call->currentPacket = cp;
1148                 call->nFree = cp->length;
1149                 call->curvec = 1;
1150                 call->curpos = (char *)cp->wirevec[1].iov_base
1151                                + call->conn->securityHeaderSize;
1152                 call->curlen = cp->wirevec[1].iov_len
1153                                - call->conn->securityHeaderSize;
1154             }
1155         }
1156
1157         if (nbytes) {
1158           /* The next iovec should point to the current position */
1159           if (iov[nextio].iov_base != call->curpos
1160               || iov[nextio].iov_len > (int)call->curlen) {
1161               call->error = RX_PROTOCOL_ERROR;
1162               for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1163                   queue_Remove(tp);
1164                   rxi_FreePacket(tp);
1165               }
1166               if (cp) {
1167                 rxi_FreePacket(cp);
1168                 call->currentPacket = NULL;
1169               }
1170               return 0;
1171           }
1172           nbytes -= iov[nextio].iov_len;
1173           call->curpos += iov[nextio].iov_len;
1174           call->curlen -= iov[nextio].iov_len;
1175           call->nFree -= iov[nextio].iov_len;
1176           nextio++;
1177           if (call->curlen == 0) {
1178             if (++call->curvec > cp->niovecs) {
1179                 call->nFree = 0;
1180             } else {
1181                 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1182                 call->curlen = cp->wirevec[call->curvec].iov_len;
1183             }
1184           }
1185         }
1186     } while (nbytes && nextio < nio);
1187
1188     /* Move the packets from the temporary queue onto the transmit queue.
1189      * We may end up with more than call->twind packets on the queue. */
1190     for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1191         queue_Remove(tp);
1192         queue_Append(&call->tq, tp);
1193     }
1194
1195     if (!(call->flags & (RX_CALL_FAST_RECOVER|RX_CALL_FAST_RECOVER_WAIT))) {
1196         rxi_Start(0, call, 0);
1197     }
1198
1199     /* Wait for the length of the transmit queue to fall below call->twind */
1200     while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
1201         clock_NewTime();
1202         call->startWait = clock_Sec();
1203 #ifdef  RX_ENABLE_LOCKS
1204         CV_WAIT(&call->cv_twind, &call->lock);
1205 #else
1206         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1207         osi_rxSleep(&call->twind);
1208 #endif
1209         call->startWait = 0;
1210     }
1211
1212     if (call->error) {
1213         if (cp) {
1214             rxi_FreePacket(cp);
1215             cp = call->currentPacket = NULL;
1216         }
1217         return 0;
1218     }
1219
1220     return requestCount - nbytes;
1221 }
1222
1223 int rx_WritevProc(call, iov, nio, nbytes)
1224     struct rx_call *call;
1225     struct iovec *iov;
1226     int nio;
1227     int nbytes;
1228 {
1229     int bytes;
1230     SPLVAR;
1231
1232     NETPRI;
1233     AFS_RXGLOCK();
1234     MUTEX_ENTER(&call->lock);
1235     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1236     MUTEX_EXIT(&call->lock);
1237     AFS_RXGUNLOCK();
1238     USERPRI;
1239     return bytes;
1240 }
1241
1242 /* Flush any buffered data to the stream, switch to read mode
1243  * (clients) or to EOF mode (servers) */
1244 void rxi_FlushWrite(call)
1245     register struct rx_call *call;
1246 {
1247     register struct rx_packet *cp = call->currentPacket;
1248     register struct rx_packet *tp; /* Temporary packet pointer */
1249     register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1250
1251     /* Free any packets from the last call to ReadvProc/WritevProc */
1252     for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1253         queue_Remove(tp);
1254         rxi_FreePacket(tp);
1255     }
1256
1257     if (call->mode == RX_MODE_SENDING) {
1258
1259         call->mode = (call->conn->type == RX_CLIENT_CONNECTION ?
1260                                           RX_MODE_RECEIVING: RX_MODE_EOF);
1261
1262 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1263         /* Wait until TQ_BUSY is reset before adding any
1264          * packets to the transmit queue
1265          */
1266         while (call->flags & RX_CALL_TQ_BUSY) {
1267             call->flags |= RX_CALL_TQ_WAIT;
1268 #ifdef RX_ENABLE_LOCKS
1269             CV_WAIT(&call->cv_tq, &call->lock);
1270 #else /* RX_ENABLE_LOCKS */
1271             osi_rxSleep(&call->tq);
1272 #endif /* RX_ENABLE_LOCKS */
1273         }
1274 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1275
1276         if (cp) {
1277             /* cp->length is only supposed to be the user's data */
1278             /* cp->length was already set to (then-current) 
1279              * MaxUserDataSize or less. */
1280             cp->length -= call->nFree;
1281             call->currentPacket = (struct rx_packet *) 0;
1282             call->nFree = 0;
1283         }
1284         else {
1285             cp = rxi_AllocSendPacket(call,0);
1286             if (!cp) {
1287                 /* Mode can no longer be MODE_SENDING */
1288                 return;
1289             }
1290             cp->length = 0;
1291             cp->niovecs = 1;  /* just the header */
1292             call->nFree = 0;
1293         }
1294
1295         /* The 1 specifies that this is the last packet */
1296         hadd32(call->bytesSent, cp->length);
1297         rxi_PrepareSendPacket(call, cp, 1);
1298         queue_Append(&call->tq, cp);
1299         if (!(call->flags & (RX_CALL_FAST_RECOVER|
1300                              RX_CALL_FAST_RECOVER_WAIT))) {
1301             rxi_Start(0, call, 0);
1302         }
1303     }
1304 }
1305
1306 /* Flush any buffered data to the stream, switch to read mode
1307  * (clients) or to EOF mode (servers) */
1308 void rx_FlushWrite(call)
1309   struct rx_call *call;
1310 {
1311     SPLVAR;
1312     NETPRI;
1313     AFS_RXGLOCK();
1314     MUTEX_ENTER(&call->lock);
1315     rxi_FlushWrite(call);
1316     MUTEX_EXIT(&call->lock);
1317     AFS_RXGUNLOCK();
1318     USERPRI;
1319 }