a5700e90ac8fc42bc4403f19f2fc6c5b0016d419
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   * 
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #ifdef KERNEL
21 #ifndef UKERNEL
22 #ifdef RX_KERNEL_TRACE
23 #include "rx_kcommon.h"
24 #endif
25 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
26 #include "afs/sysincludes.h"
27 #else
28 #include "h/types.h"
29 #include "h/time.h"
30 #include "h/stat.h"
31 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
32 #include "h/systm.h"
33 #endif
34 #ifdef  AFS_OSF_ENV
35 #include <net/net_globals.h>
36 #endif /* AFS_OSF_ENV */
37 #ifdef AFS_LINUX20_ENV
38 #include "h/socket.h"
39 #endif
40 #include "netinet/in.h"
41 #if defined(AFS_SGI_ENV)
42 #include "afs/sysincludes.h"
43 #endif
44 #endif
45 #include "afs/afs_args.h"
46 #include "afs/afs_osi.h"
47 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
48 #include "h/systm.h"
49 #endif
50 #else /* !UKERNEL */
51 #include "afs/sysincludes.h"
52 #endif /* !UKERNEL */
53 #ifdef RXDEBUG
54 #undef RXDEBUG                  /* turn off debugging */
55 #endif /* RXDEBUG */
56
57 #include "rx_kmutex.h"
58 #include "rx/rx_kernel.h"
59 #include "rx/rx_clock.h"
60 #include "rx/rx_queue.h"
61 #include "rx/rx.h"
62 #include "rx/rx_globals.h"
63 #include "afs/lock.h"
64 #include "afsint.h"
65 #ifdef  AFS_OSF_ENV
66 #undef kmem_alloc
67 #undef kmem_free
68 #undef mem_alloc
69 #undef mem_free
70 #undef register
71 #endif /* AFS_OSF_ENV */
72 #else /* KERNEL */
73 # include <sys/types.h>
74 #ifdef AFS_NT40_ENV
75 # include <winsock2.h>
76 #else /* !AFS_NT40_ENV */
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <netinet/in.h>
81 # include <sys/stat.h>
82 # include <sys/time.h>
83 #endif /* !AFS_NT40_ENV */
84 #include <string.h>
85 #ifdef HAVE_UNISTD_H
86 #include <unistd.h>
87 #endif
88 # include "rx_user.h"
89 # include "rx_clock.h"
90 # include "rx_queue.h"
91 # include "rx.h"
92 # include "rx_globals.h"
93 #endif /* KERNEL */
94
95 #ifdef RX_LOCKS_DB
96 /* rxdb_fileID is used to identify the lock location, along with line#. */
97 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
98 #endif /* RX_LOCKS_DB */
99 /* rxi_ReadProc -- internal version.
100  *
101  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
102  */
103 int
104 rxi_ReadProc(register struct rx_call *call, register char *buf,
105              register int nbytes)
106 {
107     register struct rx_packet *cp = call->currentPacket;
108     register struct rx_packet *rp;
109     register int requestCount;
110     register unsigned int t;
111
112 /* XXXX took out clock_NewTime from here.  Was it needed? */
113     requestCount = nbytes;
114
115     /* Free any packets from the last call to ReadvProc/WritevProc */
116     if (queue_IsNotEmpty(&call->iovq)) {
117         rxi_FreePackets(0, &call->iovq);
118     }
119
120     do {
121         if (call->nLeft == 0) {
122             /* Get next packet */
123             for (;;) {
124                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
125                     if (call->error) {
126                         return 0;
127                     }
128                     if (call->mode == RX_MODE_SENDING) {
129                         rxi_FlushWrite(call);
130                         continue;
131                     }
132                 }
133                 if (queue_IsNotEmpty(&call->rq)) {
134                     /* Check that next packet available is next in sequence */
135                     rp = queue_First(&call->rq, rx_packet);
136                     if (rp->header.seq == call->rnext) {
137                         afs_int32 error;
138                         register struct rx_connection *conn = call->conn;
139                         queue_Remove(rp);
140
141                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
142                          * work.  It may reduce the length of the packet by up
143                          * to conn->maxTrailerSize, to reflect the length of the
144                          * data + the header. */
145                         if ((error =
146                              RXS_CheckPacket(conn->securityObject, call,
147                                              rp))) {
148                             /* Used to merely shut down the call, but now we 
149                              * shut down the whole connection since this may 
150                              * indicate an attempt to hijack it */
151
152                             MUTEX_EXIT(&call->lock);
153                             rxi_ConnectionError(conn, error);
154                             MUTEX_ENTER(&conn->conn_data_lock);
155                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
156                             MUTEX_EXIT(&conn->conn_data_lock);
157                             rxi_FreePacket(rp);
158                             MUTEX_ENTER(&call->lock);
159
160                             return 0;
161                         }
162                         call->rnext++;
163                         cp = call->currentPacket = rp;
164                         call->curvec = 1;       /* 0th vec is always header */
165                         /* begin at the beginning [ more or less ], continue 
166                          * on until the end, then stop. */
167                         call->curpos =
168                             (char *)cp->wirevec[1].iov_base +
169                             call->conn->securityHeaderSize;
170                         call->curlen =
171                             cp->wirevec[1].iov_len -
172                             call->conn->securityHeaderSize;
173
174                         /* Notice that this code works correctly if the data
175                          * size is 0 (which it may be--no reply arguments from
176                          * server, for example).  This relies heavily on the
177                          * fact that the code below immediately frees the packet
178                          * (no yields, etc.).  If it didn't, this would be a
179                          * problem because a value of zero for call->nLeft
180                          * normally means that there is no read packet */
181                         call->nLeft = cp->length;
182                         hadd32(call->bytesRcvd, cp->length);
183
184                         /* Send a hard ack for every rxi_HardAckRate+1 packets
185                          * consumed. Otherwise schedule an event to send
186                          * the hard ack later on.
187                          */
188                         call->nHardAcks++;
189                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
190                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
191                                 rxevent_Cancel(call->delayedAckEvent, call,
192                                                RX_CALL_REFCOUNT_DELAY);
193                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
194                             } else {
195                                 struct clock when, now;
196                                 clock_GetTime(&now);
197                                 when = now;
198                                 /* Delay to consolidate ack packets */
199                                 clock_Add(&when, &rx_hardAckDelay);
200                                 if (!call->delayedAckEvent
201                                     || clock_Gt(&call->delayedAckEvent->
202                                                 eventTime, &when)) {
203                                     rxevent_Cancel(call->delayedAckEvent,
204                                                    call,
205                                                    RX_CALL_REFCOUNT_DELAY);
206                                     CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
207                                     call->delayedAckEvent =
208                                       rxevent_PostNow(&when, &now,
209                                                      rxi_SendDelayedAck, call,
210                                                      0);
211                                 }
212                             }
213                         }
214                         break;
215                     }
216                 }
217
218 /*
219 MTUXXX  doesn't there need to be an "else" here ??? 
220 */
221                 /* Are there ever going to be any more packets? */
222                 if (call->flags & RX_CALL_RECEIVE_DONE) {
223                     return requestCount - nbytes;
224                 }
225                 /* Wait for in-sequence packet */
226                 call->flags |= RX_CALL_READER_WAIT;
227                 clock_NewTime();
228                 call->startWait = clock_Sec();
229                 while (call->flags & RX_CALL_READER_WAIT) {
230 #ifdef  RX_ENABLE_LOCKS
231                     CV_WAIT(&call->cv_rq, &call->lock);
232 #else
233                     osi_rxSleep(&call->rq);
234 #endif
235                 }
236
237                 call->startWait = 0;
238 #ifdef RX_ENABLE_LOCKS
239                 if (call->error) {
240                     return 0;
241                 }
242 #endif /* RX_ENABLE_LOCKS */
243             }
244         } else
245             /* assert(cp); */
246             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
247             /* yes, the following block is allowed to be the ELSE clause (or not) */
248             /* It's possible for call->nLeft to be smaller than any particular
249              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
250              * reflects the size of the buffer.  We have to keep track of the
251              * number of bytes read in the length field of the packet struct.  On
252              * the final portion of a received packet, it's almost certain that
253              * call->nLeft will be smaller than the final buffer. */
254             while (nbytes && cp) {
255                 t = MIN((int)call->curlen, nbytes);
256                 t = MIN(t, (int)call->nLeft);
257                 memcpy(buf, call->curpos, t);
258                 buf += t;
259                 nbytes -= t;
260                 call->curpos += t;
261                 call->curlen -= t;
262                 call->nLeft -= t;
263
264                 if (!call->nLeft) {
265                     /* out of packet.  Get another one. */
266                     rxi_FreePacket(cp);
267                     cp = call->currentPacket = (struct rx_packet *)0;
268                 } else if (!call->curlen) {
269                     /* need to get another struct iov */
270                     if (++call->curvec >= cp->niovecs) {
271                         /* current packet is exhausted, get ready for another */
272                         /* don't worry about curvec and stuff, they get set somewhere else */
273                         rxi_FreePacket(cp);
274                         cp = call->currentPacket = (struct rx_packet *)0;
275                         call->nLeft = 0;
276                     } else {
277                         call->curpos =
278                             (char *)cp->wirevec[call->curvec].iov_base;
279                         call->curlen = cp->wirevec[call->curvec].iov_len;
280                     }
281                 }
282             }
283         if (!nbytes) {
284             /* user buffer is full, return */
285             return requestCount;
286         }
287
288     } while (nbytes);
289
290     return requestCount;
291 }
292
293 int
294 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
295 {
296     int bytes;
297     int tcurlen;
298     int tnLeft;
299     char *tcurpos;
300     SPLVAR;
301
302     /*
303      * Free any packets from the last call to ReadvProc/WritevProc.
304      * We do not need the lock because the receiver threads only
305      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
306      * RX_CALL_IOVEC_WAIT is always cleared before returning from
307      * ReadvProc/WritevProc.
308      */
309     if (!queue_IsEmpty(&call->iovq)) {
310         rxi_FreePackets(0, &call->iovq);
311     }
312
313     /*
314      * Most common case, all of the data is in the current iovec.
315      * We do not need the lock because this is the only thread that
316      * updates the curlen, curpos, nLeft fields.
317      *
318      * We are relying on nLeft being zero unless the call is in receive mode.
319      */
320     tcurlen = call->curlen;
321     tnLeft = call->nLeft;
322     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
323         tcurpos = call->curpos;
324         memcpy(buf, tcurpos, nbytes);
325         call->curpos = tcurpos + nbytes;
326         call->curlen = tcurlen - nbytes;
327         call->nLeft = tnLeft - nbytes;
328         return nbytes;
329     }
330
331     NETPRI;
332     MUTEX_ENTER(&call->lock);
333     bytes = rxi_ReadProc(call, buf, nbytes);
334     MUTEX_EXIT(&call->lock);
335     USERPRI;
336     return bytes;
337 }
338
339 /* Optimization for unmarshalling 32 bit integers */
340 int
341 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
342 {
343     int bytes;
344     int tcurlen;
345     int tnLeft;
346     char *tcurpos;
347     SPLVAR;
348
349     /*
350      * Free any packets from the last call to ReadvProc/WritevProc.
351      * We do not need the lock because the receiver threads only
352      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
353      * RX_CALL_IOVEC_WAIT is always cleared before returning from
354      * ReadvProc/WritevProc.
355      */
356     if (!queue_IsEmpty(&call->iovq)) {
357         rxi_FreePackets(0, &call->iovq);
358     }
359
360     /*
361      * Most common case, all of the data is in the current iovec.
362      * We do not need the lock because this is the only thread that
363      * updates the curlen, curpos, nLeft fields.
364      *
365      * We are relying on nLeft being zero unless the call is in receive mode.
366      */
367     tcurlen = call->curlen;
368     tnLeft = call->nLeft;
369     if (!call->error && tcurlen >= sizeof(afs_int32)
370         && tnLeft >= sizeof(afs_int32)) {
371         tcurpos = call->curpos;
372         memcpy((char *)value, tcurpos, sizeof(afs_int32));
373         call->curpos = tcurpos + sizeof(afs_int32);
374         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
375         call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
376         return sizeof(afs_int32);
377     }
378
379     NETPRI;
380     MUTEX_ENTER(&call->lock);
381     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
382     MUTEX_EXIT(&call->lock);
383     USERPRI;
384     return bytes;
385 }
386
387 /* rxi_FillReadVec
388  *
389  * Uses packets in the receive queue to fill in as much of the
390  * current iovec as possible. Does not block if it runs out
391  * of packets to complete the iovec. Return true if an ack packet
392  * was sent, otherwise return false */
393 int
394 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
395 {
396     int didConsume = 0;
397     int didHardAck = 0;
398     register unsigned int t;
399     struct rx_packet *rp;
400     struct rx_packet *curp;
401     struct iovec *call_iov;
402     struct iovec *cur_iov = NULL;
403
404     curp = call->currentPacket;
405     if (curp) {
406         cur_iov = &curp->wirevec[call->curvec];
407     }
408     call_iov = &call->iov[call->iovNext];
409
410     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
411         if (call->nLeft == 0) {
412             /* Get next packet */
413             if (queue_IsNotEmpty(&call->rq)) {
414                 /* Check that next packet available is next in sequence */
415                 rp = queue_First(&call->rq, rx_packet);
416                 if (rp->header.seq == call->rnext) {
417                     afs_int32 error;
418                     register struct rx_connection *conn = call->conn;
419                     queue_Remove(rp);
420
421                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
422                      * work.  It may reduce the length of the packet by up
423                      * to conn->maxTrailerSize, to reflect the length of the
424                      * data + the header. */
425                     if ((error =
426                          RXS_CheckPacket(conn->securityObject, call, rp))) {
427                         /* Used to merely shut down the call, but now we 
428                          * shut down the whole connection since this may 
429                          * indicate an attempt to hijack it */
430
431                         MUTEX_EXIT(&call->lock);
432                         rxi_ConnectionError(conn, error);
433                         MUTEX_ENTER(&conn->conn_data_lock);
434                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
435                         MUTEX_EXIT(&conn->conn_data_lock);
436                         rxi_FreePacket(rp);
437                         MUTEX_ENTER(&call->lock);
438
439                         return 1;
440                     }
441                     call->rnext++;
442                     curp = call->currentPacket = rp;
443                     call->curvec = 1;   /* 0th vec is always header */
444                     cur_iov = &curp->wirevec[1];
445                     /* begin at the beginning [ more or less ], continue 
446                      * on until the end, then stop. */
447                     call->curpos =
448                         (char *)curp->wirevec[1].iov_base +
449                         call->conn->securityHeaderSize;
450                     call->curlen =
451                         curp->wirevec[1].iov_len -
452                         call->conn->securityHeaderSize;
453
454                     /* Notice that this code works correctly if the data
455                      * size is 0 (which it may be--no reply arguments from
456                      * server, for example).  This relies heavily on the
457                      * fact that the code below immediately frees the packet
458                      * (no yields, etc.).  If it didn't, this would be a
459                      * problem because a value of zero for call->nLeft
460                      * normally means that there is no read packet */
461                     call->nLeft = curp->length;
462                     hadd32(call->bytesRcvd, curp->length);
463
464                     /* Send a hard ack for every rxi_HardAckRate+1 packets
465                      * consumed. Otherwise schedule an event to send
466                      * the hard ack later on.
467                      */
468                     call->nHardAcks++;
469                     didConsume = 1;
470                     continue;
471                 }
472             }
473             break;
474         }
475
476         /* It's possible for call->nLeft to be smaller than any particular
477          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
478          * reflects the size of the buffer.  We have to keep track of the
479          * number of bytes read in the length field of the packet struct.  On
480          * the final portion of a received packet, it's almost certain that
481          * call->nLeft will be smaller than the final buffer. */
482         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
483
484             t = MIN((int)call->curlen, call->iovNBytes);
485             t = MIN(t, (int)call->nLeft);
486             call_iov->iov_base = call->curpos;
487             call_iov->iov_len = t;
488             call_iov++;
489             call->iovNext++;
490             call->iovNBytes -= t;
491             call->curpos += t;
492             call->curlen -= t;
493             call->nLeft -= t;
494
495             if (!call->nLeft) {
496                 /* out of packet.  Get another one. */
497                 queue_Append(&call->iovq, curp);
498                 curp = call->currentPacket = (struct rx_packet *)0;
499             } else if (!call->curlen) {
500                 /* need to get another struct iov */
501                 if (++call->curvec >= curp->niovecs) {
502                     /* current packet is exhausted, get ready for another */
503                     /* don't worry about curvec and stuff, they get set somewhere else */
504                     queue_Append(&call->iovq, curp);
505                     curp = call->currentPacket = (struct rx_packet *)0;
506                     call->nLeft = 0;
507                 } else {
508                     cur_iov++;
509                     call->curpos = (char *)cur_iov->iov_base;
510                     call->curlen = cur_iov->iov_len;
511                 }
512             }
513         }
514     }
515
516     /* If we consumed any packets then check whether we need to
517      * send a hard ack. */
518     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
519         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
520             rxevent_Cancel(call->delayedAckEvent, call,
521                            RX_CALL_REFCOUNT_DELAY);
522             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
523             didHardAck = 1;
524         } else {
525             struct clock when, now;
526             clock_GetTime(&now);
527             when = now;
528             /* Delay to consolidate ack packets */
529             clock_Add(&when, &rx_hardAckDelay);
530             if (!call->delayedAckEvent
531                 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
532                 rxevent_Cancel(call->delayedAckEvent, call,
533                                RX_CALL_REFCOUNT_DELAY);
534                 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
535                 call->delayedAckEvent =
536                     rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
537             }
538         }
539     }
540     return didHardAck;
541 }
542
543
544 /* rxi_ReadvProc -- internal version.
545  *
546  * Fills in an iovec with pointers to the packet buffers. All packets
547  * except the last packet (new current packet) are moved to the iovq
548  * while the application is processing the data.
549  *
550  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
551  */
552 int
553 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
554               int nbytes)
555 {
556     int requestCount;
557     int nextio;
558
559     requestCount = nbytes;
560     nextio = 0;
561
562     /* Free any packets from the last call to ReadvProc/WritevProc */
563     if (queue_IsNotEmpty(&call->iovq)) {
564         rxi_FreePackets(0, &call->iovq);
565     }
566
567     if (call->mode == RX_MODE_SENDING) {
568         rxi_FlushWrite(call);
569     }
570
571     if (call->error) {
572         return 0;
573     }
574
575     /* Get whatever data is currently available in the receive queue.
576      * If rxi_FillReadVec sends an ack packet then it is possible
577      * that we will receive more data while we drop the call lock
578      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
579      * here to avoid a race with the receive thread if we send
580      * hard acks in rxi_FillReadVec. */
581     call->flags |= RX_CALL_IOVEC_WAIT;
582     call->iovNBytes = nbytes;
583     call->iovMax = maxio;
584     call->iovNext = 0;
585     call->iov = iov;
586     rxi_FillReadVec(call, 0);
587
588     /* if we need more data then sleep until the receive thread has
589      * filled in the rest. */
590     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
591         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
592         call->flags |= RX_CALL_READER_WAIT;
593         clock_NewTime();
594         call->startWait = clock_Sec();
595         while (call->flags & RX_CALL_READER_WAIT) {
596 #ifdef  RX_ENABLE_LOCKS
597             CV_WAIT(&call->cv_rq, &call->lock);
598 #else
599             osi_rxSleep(&call->rq);
600 #endif
601         }
602         call->startWait = 0;
603     }
604     call->flags &= ~RX_CALL_IOVEC_WAIT;
605 #ifdef RX_ENABLE_LOCKS
606     if (call->error) {
607         return 0;
608     }
609 #endif /* RX_ENABLE_LOCKS */
610
611     call->iov = NULL;
612     *nio = call->iovNext;
613     return nbytes - call->iovNBytes;
614 }
615
616 int
617 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
618              int nbytes)
619 {
620     int bytes;
621     SPLVAR;
622
623     NETPRI;
624     MUTEX_ENTER(&call->lock);
625     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
626     MUTEX_EXIT(&call->lock);
627     USERPRI;
628     return bytes;
629 }
630
631 /* rxi_WriteProc -- internal version.
632  *
633  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
634
635 int
636 rxi_WriteProc(register struct rx_call *call, register char *buf,
637               register int nbytes)
638 {
639     struct rx_connection *conn = call->conn;
640     register struct rx_packet *cp = call->currentPacket;
641     register unsigned int t;
642     int requestCount = nbytes;
643
644     /* Free any packets from the last call to ReadvProc/WritevProc */
645     if (queue_IsNotEmpty(&call->iovq)) {
646         rxi_FreePackets(0, &call->iovq);
647     }
648
649     if (call->mode != RX_MODE_SENDING) {
650         if ((conn->type == RX_SERVER_CONNECTION)
651             && (call->mode == RX_MODE_RECEIVING)) {
652             call->mode = RX_MODE_SENDING;
653             if (cp) {
654                 rxi_FreePacket(cp);
655                 cp = call->currentPacket = (struct rx_packet *)0;
656                 call->nLeft = 0;
657                 call->nFree = 0;
658             }
659         } else {
660             return 0;
661         }
662     }
663
664     /* Loop condition is checked at end, so that a write of 0 bytes
665      * will force a packet to be created--specially for the case where
666      * there are 0 bytes on the stream, but we must send a packet
667      * anyway. */
668     do {
669         if (call->nFree == 0) {
670             if (!call->error && cp) {
671 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
672                 /* Wait until TQ_BUSY is reset before adding any
673                  * packets to the transmit queue
674                  */
675                 while (call->flags & RX_CALL_TQ_BUSY) {
676                     call->flags |= RX_CALL_TQ_WAIT;
677 #ifdef RX_ENABLE_LOCKS
678                     CV_WAIT(&call->cv_tq, &call->lock);
679 #else /* RX_ENABLE_LOCKS */
680                     osi_rxSleep(&call->tq);
681 #endif /* RX_ENABLE_LOCKS */
682                 }
683 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
684                 clock_NewTime();        /* Bogus:  need new time package */
685                 /* The 0, below, specifies that it is not the last packet: 
686                  * there will be others. PrepareSendPacket may
687                  * alter the packet length by up to
688                  * conn->securityMaxTrailerSize */
689                 hadd32(call->bytesSent, cp->length);
690                 rxi_PrepareSendPacket(call, cp, 0);
691                 queue_Append(&call->tq, cp);
692                 cp = call->currentPacket = NULL;
693                 if (!
694                     (call->
695                      flags & (RX_CALL_FAST_RECOVER |
696                               RX_CALL_FAST_RECOVER_WAIT))) {
697                     rxi_Start(0, call, 0, 0);
698                 }
699             }
700             /* Wait for transmit window to open up */
701             while (!call->error
702                    && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
703                 clock_NewTime();
704                 call->startWait = clock_Sec();
705
706 #ifdef  RX_ENABLE_LOCKS
707                 CV_WAIT(&call->cv_twind, &call->lock);
708 #else
709                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
710                 osi_rxSleep(&call->twind);
711 #endif
712
713                 call->startWait = 0;
714 #ifdef RX_ENABLE_LOCKS
715                 if (call->error) {
716                     return 0;
717                 }
718 #endif /* RX_ENABLE_LOCKS */
719             }
720             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
721                 call->currentPacket = cp;
722                 call->nFree = cp->length;
723                 call->curvec = 1;       /* 0th vec is always header */
724                 /* begin at the beginning [ more or less ], continue 
725                  * on until the end, then stop. */
726                 call->curpos =
727                     (char *)cp->wirevec[1].iov_base +
728                     call->conn->securityHeaderSize;
729                 call->curlen =
730                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
731             }
732             if (call->error) {
733                 if (cp) {
734                     rxi_FreePacket(cp);
735                     call->currentPacket = NULL;
736                 }
737                 return 0;
738             }
739         }
740
741         if (cp && (int)call->nFree < nbytes) {
742             /* Try to extend the current buffer */
743             register int len, mud;
744             len = cp->length;
745             mud = rx_MaxUserDataSize(call);
746             if (mud > len) {
747                 int want;
748                 want = MIN(nbytes - (int)call->nFree, mud - len);
749                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
750                 if (cp->length > (unsigned)mud)
751                     cp->length = mud;
752                 call->nFree += (cp->length - len);
753             }
754         }
755
756         /* If the remaining bytes fit in the buffer, then store them
757          * and return.  Don't ship a buffer that's full immediately to
758          * the peer--we don't know if it's the last buffer yet */
759
760         if (!cp) {
761             call->nFree = 0;
762         }
763
764         while (nbytes && call->nFree) {
765
766             t = MIN((int)call->curlen, nbytes);
767             t = MIN((int)call->nFree, t);
768             memcpy(call->curpos, buf, t);
769             buf += t;
770             nbytes -= t;
771             call->curpos += t;
772             call->curlen -= (u_short)t;
773             call->nFree -= (u_short)t;
774
775             if (!call->curlen) {
776                 /* need to get another struct iov */
777                 if (++call->curvec >= cp->niovecs) {
778                     /* current packet is full, extend or send it */
779                     call->nFree = 0;
780                 } else {
781                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
782                     call->curlen = cp->wirevec[call->curvec].iov_len;
783                 }
784             }
785         }                       /* while bytes to send and room to send them */
786
787         /* might be out of space now */
788         if (!nbytes) {
789             return requestCount;
790         } else;                 /* more data to send, so get another packet and keep going */
791     } while (nbytes);
792
793     return requestCount - nbytes;
794 }
795
796 int
797 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
798 {
799     int bytes;
800     int tcurlen;
801     int tnFree;
802     char *tcurpos;
803     SPLVAR;
804
805     /*
806      * Free any packets from the last call to ReadvProc/WritevProc.
807      * We do not need the lock because the receiver threads only
808      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
809      * RX_CALL_IOVEC_WAIT is always cleared before returning from
810      * ReadvProc/WritevProc.
811      */
812     if (queue_IsNotEmpty(&call->iovq)) {
813         rxi_FreePackets(0, &call->iovq);
814     }
815
816     /*
817      * Most common case: all of the data fits in the current iovec.
818      * We do not need the lock because this is the only thread that
819      * updates the curlen, curpos, nFree fields.
820      *
821      * We are relying on nFree being zero unless the call is in send mode.
822      */
823     tcurlen = (int)call->curlen;
824     tnFree = (int)call->nFree;
825     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
826         tcurpos = call->curpos;
827         memcpy(tcurpos, buf, nbytes);
828         call->curpos = tcurpos + nbytes;
829         call->curlen = (u_short)(tcurlen - nbytes);
830         call->nFree = (u_short)(tnFree - nbytes);
831         return nbytes;
832     }
833
834     NETPRI;
835     MUTEX_ENTER(&call->lock);
836     bytes = rxi_WriteProc(call, buf, nbytes);
837     MUTEX_EXIT(&call->lock);
838     USERPRI;
839     return bytes;
840 }
841
842 /* Optimization for marshalling 32 bit arguments */
843 int
844 rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
845 {
846     int bytes;
847     int tcurlen;
848     int tnFree;
849     char *tcurpos;
850     SPLVAR;
851
852     /*
853      * Free any packets from the last call to ReadvProc/WritevProc.
854      * We do not need the lock because the receiver threads only
855      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
856      * RX_CALL_IOVEC_WAIT is always cleared before returning from
857      * ReadvProc/WritevProc.
858      */
859     if (queue_IsNotEmpty(&call->iovq)) {
860         rxi_FreePackets(0, &call->iovq);
861     }
862
863     /*
864      * Most common case: all of the data fits in the current iovec.
865      * We do not need the lock because this is the only thread that
866      * updates the curlen, curpos, nFree fields.
867      *
868      * We are relying on nFree being zero unless the call is in send mode.
869      */
870     tcurlen = call->curlen;
871     tnFree = call->nFree;
872     if (!call->error && tcurlen >= sizeof(afs_int32)
873         && tnFree >= sizeof(afs_int32)) {
874         tcurpos = call->curpos;
875         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
876             *((afs_int32 *) (tcurpos)) = *value;
877         } else {
878             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
879         }
880         call->curpos = tcurpos + sizeof(afs_int32);
881         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
882         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
883         return sizeof(afs_int32);
884     }
885
886     NETPRI;
887     MUTEX_ENTER(&call->lock);
888     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
889     MUTEX_EXIT(&call->lock);
890     USERPRI;
891     return bytes;
892 }
893
894 /* rxi_WritevAlloc -- internal version.
895  *
896  * Fill in an iovec to point to data in packet buffers. The application
897  * calls rxi_WritevProc when the buffers are full.
898  *
899  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
900
901 int
902 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
903                 int nbytes)
904 {
905     struct rx_connection *conn = call->conn;
906     struct rx_packet *cp = call->currentPacket;
907     int requestCount;
908     int nextio;
909     /* Temporary values, real work is done in rxi_WritevProc */
910     int tnFree;
911     int tcurvec;
912     char *tcurpos;
913     int tcurlen;
914
915     requestCount = nbytes;
916     nextio = 0;
917
918     /* Free any packets from the last call to ReadvProc/WritevProc */
919     if (queue_IsNotEmpty(&call->iovq)) {
920         rxi_FreePackets(0, &call->iovq);
921     }
922
923     if (call->mode != RX_MODE_SENDING) {
924         if ((conn->type == RX_SERVER_CONNECTION)
925             && (call->mode == RX_MODE_RECEIVING)) {
926             call->mode = RX_MODE_SENDING;
927             if (cp) {
928                 rxi_FreePacket(cp);
929                 cp = call->currentPacket = (struct rx_packet *)0;
930                 call->nLeft = 0;
931                 call->nFree = 0;
932             }
933         } else {
934             return 0;
935         }
936     }
937
938     /* Set up the iovec to point to data in packet buffers. */
939     tnFree = call->nFree;
940     tcurvec = call->curvec;
941     tcurpos = call->curpos;
942     tcurlen = call->curlen;
943     do {
944         register unsigned int t;
945
946         if (tnFree == 0) {
947             /* current packet is full, allocate a new one */
948             cp = rxi_AllocSendPacket(call, nbytes);
949             if (cp == NULL) {
950                 /* out of space, return what we have */
951                 *nio = nextio;
952                 return requestCount - nbytes;
953             }
954             queue_Append(&call->iovq, cp);
955             tnFree = cp->length;
956             tcurvec = 1;
957             tcurpos =
958                 (char *)cp->wirevec[1].iov_base +
959                 call->conn->securityHeaderSize;
960             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
961         }
962
963         if (tnFree < nbytes) {
964             /* try to extend the current packet */
965             register int len, mud;
966             len = cp->length;
967             mud = rx_MaxUserDataSize(call);
968             if (mud > len) {
969                 int want;
970                 want = MIN(nbytes - tnFree, mud - len);
971                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
972                 if (cp->length > (unsigned)mud)
973                     cp->length = mud;
974                 tnFree += (cp->length - len);
975                 if (cp == call->currentPacket) {
976                     call->nFree += (cp->length - len);
977                 }
978             }
979         }
980
981         /* fill in the next entry in the iovec */
982         t = MIN(tcurlen, nbytes);
983         t = MIN(tnFree, t);
984         iov[nextio].iov_base = tcurpos;
985         iov[nextio].iov_len = t;
986         nbytes -= t;
987         tcurpos += t;
988         tcurlen -= t;
989         tnFree -= t;
990         nextio++;
991
992         if (!tcurlen) {
993             /* need to get another struct iov */
994             if (++tcurvec >= cp->niovecs) {
995                 /* current packet is full, extend it or move on to next packet */
996                 tnFree = 0;
997             } else {
998                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
999                 tcurlen = cp->wirevec[tcurvec].iov_len;
1000             }
1001         }
1002     } while (nbytes && nextio < maxio);
1003     *nio = nextio;
1004     return requestCount - nbytes;
1005 }
1006
1007 int
1008 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1009                int nbytes)
1010 {
1011     int bytes;
1012     SPLVAR;
1013
1014     NETPRI;
1015     MUTEX_ENTER(&call->lock);
1016     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1017     MUTEX_EXIT(&call->lock);
1018     USERPRI;
1019     return bytes;
1020 }
1021
1022 /* rxi_WritevProc -- internal version.
1023  *
1024  * Send buffers allocated in rxi_WritevAlloc.
1025  *
1026  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1027
1028 int
1029 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1030 {
1031     struct rx_packet *cp = call->currentPacket;
1032     int nextio;
1033     int requestCount;
1034     struct rx_queue tmpq;
1035
1036     requestCount = nbytes;
1037     nextio = 0;
1038
1039     if (call->mode != RX_MODE_SENDING) {
1040         call->error = RX_PROTOCOL_ERROR;
1041     }
1042 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1043     /* Wait until TQ_BUSY is reset before trying to move any
1044      * packets to the transmit queue.  */
1045     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1046         call->flags |= RX_CALL_TQ_WAIT;
1047 #ifdef RX_ENABLE_LOCKS
1048         CV_WAIT(&call->cv_tq, &call->lock);
1049 #else /* RX_ENABLE_LOCKS */
1050         osi_rxSleep(&call->tq);
1051 #endif /* RX_ENABLE_LOCKS */
1052     }
1053 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1054
1055     if (call->error) {
1056         if (cp) {
1057             queue_Prepend(&call->iovq, cp);
1058             cp = call->currentPacket = NULL;
1059         }
1060         rxi_FreePackets(0, &call->iovq);
1061         return 0;
1062     }
1063
1064     /* Loop through the I/O vector adjusting packet pointers.
1065      * Place full packets back onto the iovq once they are ready
1066      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1067      * the iovec. We put the loop condition at the end to ensure that
1068      * a zero length write will push a short packet. */
1069     nextio = 0;
1070     queue_Init(&tmpq);
1071     do {
1072         if (call->nFree == 0 && cp) {
1073             clock_NewTime();    /* Bogus:  need new time package */
1074             /* The 0, below, specifies that it is not the last packet: 
1075              * there will be others. PrepareSendPacket may
1076              * alter the packet length by up to
1077              * conn->securityMaxTrailerSize */
1078             hadd32(call->bytesSent, cp->length);
1079             rxi_PrepareSendPacket(call, cp, 0);
1080             queue_Append(&tmpq, cp);
1081
1082             /* The head of the iovq is now the current packet */
1083             if (nbytes) {
1084                 if (queue_IsEmpty(&call->iovq)) {
1085                     call->error = RX_PROTOCOL_ERROR;
1086                     cp = call->currentPacket = NULL;
1087                     rxi_FreePackets(0, &tmpq);
1088                     return 0;
1089                 }
1090                 cp = queue_First(&call->iovq, rx_packet);
1091                 queue_Remove(cp);
1092                 call->currentPacket = cp;
1093                 call->nFree = cp->length;
1094                 call->curvec = 1;
1095                 call->curpos =
1096                     (char *)cp->wirevec[1].iov_base +
1097                     call->conn->securityHeaderSize;
1098                 call->curlen =
1099                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1100             }
1101         }
1102
1103         if (nbytes) {
1104             /* The next iovec should point to the current position */
1105             if (iov[nextio].iov_base != call->curpos
1106                 || iov[nextio].iov_len > (int)call->curlen) {
1107                 call->error = RX_PROTOCOL_ERROR;
1108                 if (cp) {
1109                     queue_Prepend(&tmpq, cp);
1110                     call->currentPacket = NULL;
1111                 }
1112                 rxi_FreePackets(0, &tmpq);
1113                 return 0;
1114             }
1115             nbytes -= iov[nextio].iov_len;
1116             call->curpos += iov[nextio].iov_len;
1117             call->curlen -= iov[nextio].iov_len;
1118             call->nFree -= iov[nextio].iov_len;
1119             nextio++;
1120             if (call->curlen == 0) {
1121                 if (++call->curvec > cp->niovecs) {
1122                     call->nFree = 0;
1123                 } else {
1124                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1125                     call->curlen = cp->wirevec[call->curvec].iov_len;
1126                 }
1127             }
1128         }
1129     } while (nbytes && nextio < nio);
1130
1131     /* Move the packets from the temporary queue onto the transmit queue.
1132      * We may end up with more than call->twind packets on the queue. */
1133     queue_SpliceAppend(&call->tq, &tmpq);
1134
1135     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1136         rxi_Start(0, call, 0, 0);
1137     }
1138
1139     /* Wait for the length of the transmit queue to fall below call->twind */
1140     while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1141         clock_NewTime();
1142         call->startWait = clock_Sec();
1143 #ifdef  RX_ENABLE_LOCKS
1144         CV_WAIT(&call->cv_twind, &call->lock);
1145 #else
1146         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1147         osi_rxSleep(&call->twind);
1148 #endif
1149         call->startWait = 0;
1150     }
1151
1152     if (call->error) {
1153         if (cp) {
1154             rxi_FreePacket(cp);
1155             cp = call->currentPacket = NULL;
1156         }
1157         return 0;
1158     }
1159
1160     return requestCount - nbytes;
1161 }
1162
1163 int
1164 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1165 {
1166     int bytes;
1167     SPLVAR;
1168
1169     NETPRI;
1170     MUTEX_ENTER(&call->lock);
1171     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1172     MUTEX_EXIT(&call->lock);
1173     USERPRI;
1174     return bytes;
1175 }
1176
1177 /* Flush any buffered data to the stream, switch to read mode
1178  * (clients) or to EOF mode (servers) */
1179 void
1180 rxi_FlushWrite(register struct rx_call *call)
1181 {
1182     register struct rx_packet *cp = call->currentPacket;
1183
1184     /* Free any packets from the last call to ReadvProc/WritevProc */
1185     if (queue_IsNotEmpty(&call->iovq)) {
1186         rxi_FreePackets(0, &call->iovq);
1187     }
1188
1189     if (call->mode == RX_MODE_SENDING) {
1190
1191         call->mode =
1192             (call->conn->type ==
1193              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1194
1195 #ifdef RX_KERNEL_TRACE
1196         {
1197             int glockOwner = ISAFS_GLOCK();
1198             if (!glockOwner)
1199                 AFS_GLOCK();
1200             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1201                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1202                        call);
1203             if (!glockOwner)
1204                 AFS_GUNLOCK();
1205         }
1206 #endif
1207
1208 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1209         /* Wait until TQ_BUSY is reset before adding any
1210          * packets to the transmit queue
1211          */
1212         while (call->flags & RX_CALL_TQ_BUSY) {
1213             call->flags |= RX_CALL_TQ_WAIT;
1214 #ifdef RX_ENABLE_LOCKS
1215             CV_WAIT(&call->cv_tq, &call->lock);
1216 #else /* RX_ENABLE_LOCKS */
1217             osi_rxSleep(&call->tq);
1218 #endif /* RX_ENABLE_LOCKS */
1219         }
1220 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1221
1222         if (cp) {
1223             /* cp->length is only supposed to be the user's data */
1224             /* cp->length was already set to (then-current) 
1225              * MaxUserDataSize or less. */
1226             cp->length -= call->nFree;
1227             call->currentPacket = (struct rx_packet *)0;
1228             call->nFree = 0;
1229         } else {
1230             cp = rxi_AllocSendPacket(call, 0);
1231             if (!cp) {
1232                 /* Mode can no longer be MODE_SENDING */
1233                 return;
1234             }
1235             cp->length = 0;
1236             cp->niovecs = 2;    /* header + space for rxkad stuff */
1237             call->nFree = 0;
1238         }
1239
1240         /* The 1 specifies that this is the last packet */
1241         hadd32(call->bytesSent, cp->length);
1242         rxi_PrepareSendPacket(call, cp, 1);
1243         queue_Append(&call->tq, cp);
1244         if (!
1245             (call->
1246              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1247             rxi_Start(0, call, 0, 0);
1248         }
1249     }
1250 }
1251
1252 /* Flush any buffered data to the stream, switch to read mode
1253  * (clients) or to EOF mode (servers) */
1254 void
1255 rx_FlushWrite(struct rx_call *call)
1256 {
1257     SPLVAR;
1258     NETPRI;
1259     MUTEX_ENTER(&call->lock);
1260     rxi_FlushWrite(call);
1261     MUTEX_EXIT(&call->lock);
1262     USERPRI;
1263 }