91d14a64bc7e43682b7431ce5c19fb887cac4405
[openafs.git] / src / rx / rx_rdwr.c
1  /*
2   * Copyright 2000, International Business Machines Corporation and others.
3   * All Rights Reserved.
4   * 
5   * This software has been released under the terms of the IBM Public
6   * License.  For details, see the LICENSE file in the top-level source
7   * directory or online at http://www.openafs.org/dl/license10.html
8   */
9
10 #include <afsconfig.h>
11 #ifdef KERNEL
12 #include "afs/param.h"
13 #else
14 #include <afs/param.h>
15 #endif
16
17 RCSID
18     ("$Header$");
19
20 #ifdef KERNEL
21 #ifndef UKERNEL
22 #ifdef RX_KERNEL_TRACE
23 #include "rx_kcommon.h"
24 #endif
25 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
26 #include "afs/sysincludes.h"
27 #else
28 #include "h/types.h"
29 #include "h/time.h"
30 #include "h/stat.h"
31 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV) 
32 #include "h/systm.h"
33 #endif
34 #ifdef  AFS_OSF_ENV
35 #include <net/net_globals.h>
36 #endif /* AFS_OSF_ENV */
37 #ifdef AFS_LINUX20_ENV
38 #include "h/socket.h"
39 #endif
40 #include "netinet/in.h"
41 #if defined(AFS_SGI_ENV)
42 #include "afs/sysincludes.h"
43 #endif
44 #endif
45 #include "afs/afs_args.h"
46 #include "afs/afs_osi.h"
47 #if     (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
48 #include "h/systm.h"
49 #endif
50 #else /* !UKERNEL */
51 #include "afs/sysincludes.h"
52 #endif /* !UKERNEL */
53 #ifdef RXDEBUG
54 #undef RXDEBUG                  /* turn off debugging */
55 #endif /* RXDEBUG */
56
57 #include "rx_kmutex.h"
58 #include "rx/rx_kernel.h"
59 #include "rx/rx_clock.h"
60 #include "rx/rx_queue.h"
61 #include "rx/rx.h"
62 #include "rx/rx_globals.h"
63 #include "afs/lock.h"
64 #include "afsint.h"
65 #ifdef  AFS_OSF_ENV
66 #undef kmem_alloc
67 #undef kmem_free
68 #undef mem_alloc
69 #undef mem_free
70 #undef register
71 #endif /* AFS_OSF_ENV */
72 #else /* KERNEL */
73 # include <sys/types.h>
74 #ifdef AFS_NT40_ENV
75 # include <winsock2.h>
76 #else /* !AFS_NT40_ENV */
77 # include <sys/socket.h>
78 # include <sys/file.h>
79 # include <netdb.h>
80 # include <netinet/in.h>
81 # include <sys/stat.h>
82 # include <sys/time.h>
83 #endif /* !AFS_NT40_ENV */
84 #ifdef HAVE_STRING_H
85 #include <string.h>
86 #else
87 #ifdef HAVE_STRINGS_H
88 #include <strings.h>
89 #endif
90 #endif
91 #ifdef HAVE_UNISTD_H
92 #include <unistd.h>
93 #endif
94 # include "rx_user.h"
95 # include "rx_clock.h"
96 # include "rx_queue.h"
97 # include "rx.h"
98 # include "rx_globals.h"
99 #endif /* KERNEL */
100
101 #ifdef RX_LOCKS_DB
102 /* rxdb_fileID is used to identify the lock location, along with line#. */
103 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
104 #endif /* RX_LOCKS_DB */
105 /* rxi_ReadProc -- internal version.
106  *
107  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
108  */
109 int
110 rxi_ReadProc(register struct rx_call *call, register char *buf,
111              register int nbytes)
112 {
113     register struct rx_packet *cp = call->currentPacket;
114     register struct rx_packet *rp;
115     register int requestCount;
116     register unsigned int t;
117
118 /* XXXX took out clock_NewTime from here.  Was it needed? */
119     requestCount = nbytes;
120
121     /* Free any packets from the last call to ReadvProc/WritevProc */
122     if (queue_IsNotEmpty(&call->iovq)) {
123         rxi_FreePackets(0, &call->iovq);
124     }
125
126     do {
127         if (call->nLeft == 0) {
128             /* Get next packet */
129             for (;;) {
130                 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
131                     if (call->error) {
132                         return 0;
133                     }
134                     if (call->mode == RX_MODE_SENDING) {
135                         rxi_FlushWrite(call);
136                         continue;
137                     }
138                 }
139                 if (queue_IsNotEmpty(&call->rq)) {
140                     /* Check that next packet available is next in sequence */
141                     rp = queue_First(&call->rq, rx_packet);
142                     if (rp->header.seq == call->rnext) {
143                         afs_int32 error;
144                         register struct rx_connection *conn = call->conn;
145                         queue_Remove(rp);
146
147                         /* RXS_CheckPacket called to undo RXS_PreparePacket's
148                          * work.  It may reduce the length of the packet by up
149                          * to conn->maxTrailerSize, to reflect the length of the
150                          * data + the header. */
151                         if ((error =
152                              RXS_CheckPacket(conn->securityObject, call,
153                                              rp))) {
154                             /* Used to merely shut down the call, but now we 
155                              * shut down the whole connection since this may 
156                              * indicate an attempt to hijack it */
157
158                             MUTEX_EXIT(&call->lock);
159                             rxi_ConnectionError(conn, error);
160                             MUTEX_ENTER(&conn->conn_data_lock);
161                             rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
162                             MUTEX_EXIT(&conn->conn_data_lock);
163                             rxi_FreePacket(rp);
164                             MUTEX_ENTER(&call->lock);
165
166                             return 0;
167                         }
168                         call->rnext++;
169                         cp = call->currentPacket = rp;
170                         call->curvec = 1;       /* 0th vec is always header */
171                         /* begin at the beginning [ more or less ], continue 
172                          * on until the end, then stop. */
173                         call->curpos =
174                             (char *)cp->wirevec[1].iov_base +
175                             call->conn->securityHeaderSize;
176                         call->curlen =
177                             cp->wirevec[1].iov_len -
178                             call->conn->securityHeaderSize;
179
180                         /* Notice that this code works correctly if the data
181                          * size is 0 (which it may be--no reply arguments from
182                          * server, for example).  This relies heavily on the
183                          * fact that the code below immediately frees the packet
184                          * (no yields, etc.).  If it didn't, this would be a
185                          * problem because a value of zero for call->nLeft
186                          * normally means that there is no read packet */
187                         call->nLeft = cp->length;
188                         hadd32(call->bytesRcvd, cp->length);
189
190                         /* Send a hard ack for every rxi_HardAckRate+1 packets
191                          * consumed. Otherwise schedule an event to send
192                          * the hard ack later on.
193                          */
194                         call->nHardAcks++;
195                         if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
196                             if (call->nHardAcks > (u_short) rxi_HardAckRate) {
197                                 rxevent_Cancel(call->delayedAckEvent, call,
198                                                RX_CALL_REFCOUNT_DELAY);
199                                 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
200                             } else {
201                                 struct clock when;
202                                 clock_GetTime(&when);
203                                 /* Delay to consolidate ack packets */
204                                 clock_Add(&when, &rx_hardAckDelay);
205                                 if (!call->delayedAckEvent
206                                     || clock_Gt(&call->delayedAckEvent->
207                                                 eventTime, &when)) {
208                                     rxevent_Cancel(call->delayedAckEvent,
209                                                    call,
210                                                    RX_CALL_REFCOUNT_DELAY);
211                                     CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
212                                     call->delayedAckEvent =
213                                         rxevent_Post(&when,
214                                                      rxi_SendDelayedAck, call,
215                                                      0);
216                                 }
217                             }
218                         }
219                         break;
220                     }
221                 }
222
223 /*
224 MTUXXX  doesn't there need to be an "else" here ??? 
225 */
226                 /* Are there ever going to be any more packets? */
227                 if (call->flags & RX_CALL_RECEIVE_DONE) {
228                     return requestCount - nbytes;
229                 }
230                 /* Wait for in-sequence packet */
231                 call->flags |= RX_CALL_READER_WAIT;
232                 clock_NewTime();
233                 call->startWait = clock_Sec();
234                 while (call->flags & RX_CALL_READER_WAIT) {
235 #ifdef  RX_ENABLE_LOCKS
236                     CV_WAIT(&call->cv_rq, &call->lock);
237 #else
238                     osi_rxSleep(&call->rq);
239 #endif
240                 }
241
242                 call->startWait = 0;
243 #ifdef RX_ENABLE_LOCKS
244                 if (call->error) {
245                     return 0;
246                 }
247 #endif /* RX_ENABLE_LOCKS */
248             }
249         } else
250             /* assert(cp); */
251             /* MTUXXX  this should be replaced by some error-recovery code before shipping */
252             /* yes, the following block is allowed to be the ELSE clause (or not) */
253             /* It's possible for call->nLeft to be smaller than any particular
254              * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
255              * reflects the size of the buffer.  We have to keep track of the
256              * number of bytes read in the length field of the packet struct.  On
257              * the final portion of a received packet, it's almost certain that
258              * call->nLeft will be smaller than the final buffer. */
259             while (nbytes && cp) {
260                 t = MIN((int)call->curlen, nbytes);
261                 t = MIN(t, (int)call->nLeft);
262                 memcpy(buf, call->curpos, t);
263                 buf += t;
264                 nbytes -= t;
265                 call->curpos += t;
266                 call->curlen -= t;
267                 call->nLeft -= t;
268
269                 if (!call->nLeft) {
270                     /* out of packet.  Get another one. */
271                     rxi_FreePacket(cp);
272                     cp = call->currentPacket = (struct rx_packet *)0;
273                 } else if (!call->curlen) {
274                     /* need to get another struct iov */
275                     if (++call->curvec >= cp->niovecs) {
276                         /* current packet is exhausted, get ready for another */
277                         /* don't worry about curvec and stuff, they get set somewhere else */
278                         rxi_FreePacket(cp);
279                         cp = call->currentPacket = (struct rx_packet *)0;
280                         call->nLeft = 0;
281                     } else {
282                         call->curpos =
283                             (char *)cp->wirevec[call->curvec].iov_base;
284                         call->curlen = cp->wirevec[call->curvec].iov_len;
285                     }
286                 }
287             }
288         if (!nbytes) {
289             /* user buffer is full, return */
290             return requestCount;
291         }
292
293     } while (nbytes);
294
295     return requestCount;
296 }
297
298 int
299 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
300 {
301     int bytes;
302     int tcurlen;
303     int tnLeft;
304     char *tcurpos;
305     SPLVAR;
306
307     /*
308      * Free any packets from the last call to ReadvProc/WritevProc.
309      * We do not need the lock because the receiver threads only
310      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
311      * RX_CALL_IOVEC_WAIT is always cleared before returning from
312      * ReadvProc/WritevProc.
313      */
314     if (!queue_IsEmpty(&call->iovq)) {
315         rxi_FreePackets(0, &call->iovq);
316     }
317
318     /*
319      * Most common case, all of the data is in the current iovec.
320      * We do not need the lock because this is the only thread that
321      * updates the curlen, curpos, nLeft fields.
322      *
323      * We are relying on nLeft being zero unless the call is in receive mode.
324      */
325     tcurlen = call->curlen;
326     tnLeft = call->nLeft;
327     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
328         tcurpos = call->curpos;
329         memcpy(buf, tcurpos, nbytes);
330         call->curpos = tcurpos + nbytes;
331         call->curlen = tcurlen - nbytes;
332         call->nLeft = tnLeft - nbytes;
333         return nbytes;
334     }
335
336     NETPRI;
337     MUTEX_ENTER(&call->lock);
338     bytes = rxi_ReadProc(call, buf, nbytes);
339     MUTEX_EXIT(&call->lock);
340     USERPRI;
341     return bytes;
342 }
343
344 /* Optimization for unmarshalling 32 bit integers */
345 int
346 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
347 {
348     int bytes;
349     int tcurlen;
350     int tnLeft;
351     char *tcurpos;
352     SPLVAR;
353
354     /*
355      * Free any packets from the last call to ReadvProc/WritevProc.
356      * We do not need the lock because the receiver threads only
357      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
358      * RX_CALL_IOVEC_WAIT is always cleared before returning from
359      * ReadvProc/WritevProc.
360      */
361     if (!queue_IsEmpty(&call->iovq)) {
362         rxi_FreePackets(0, &call->iovq);
363     }
364
365     /*
366      * Most common case, all of the data is in the current iovec.
367      * We do not need the lock because this is the only thread that
368      * updates the curlen, curpos, nLeft fields.
369      *
370      * We are relying on nLeft being zero unless the call is in receive mode.
371      */
372     tcurlen = call->curlen;
373     tnLeft = call->nLeft;
374     if (!call->error && tcurlen >= sizeof(afs_int32)
375         && tnLeft >= sizeof(afs_int32)) {
376         tcurpos = call->curpos;
377         memcpy((char *)value, tcurpos, sizeof(afs_int32));
378         call->curpos = tcurpos + sizeof(afs_int32);
379         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
380         call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
381         return sizeof(afs_int32);
382     }
383
384     NETPRI;
385     MUTEX_ENTER(&call->lock);
386     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
387     MUTEX_EXIT(&call->lock);
388     USERPRI;
389     return bytes;
390 }
391
392 /* rxi_FillReadVec
393  *
394  * Uses packets in the receive queue to fill in as much of the
395  * current iovec as possible. Does not block if it runs out
396  * of packets to complete the iovec. Return true if an ack packet
397  * was sent, otherwise return false */
398 int
399 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
400 {
401     int didConsume = 0;
402     int didHardAck = 0;
403     register unsigned int t;
404     struct rx_packet *rp;
405     struct rx_packet *curp;
406     struct iovec *call_iov;
407     struct iovec *cur_iov = NULL;
408
409     curp = call->currentPacket;
410     if (curp) {
411         cur_iov = &curp->wirevec[call->curvec];
412     }
413     call_iov = &call->iov[call->iovNext];
414
415     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
416         if (call->nLeft == 0) {
417             /* Get next packet */
418             if (queue_IsNotEmpty(&call->rq)) {
419                 /* Check that next packet available is next in sequence */
420                 rp = queue_First(&call->rq, rx_packet);
421                 if (rp->header.seq == call->rnext) {
422                     afs_int32 error;
423                     register struct rx_connection *conn = call->conn;
424                     queue_Remove(rp);
425
426                     /* RXS_CheckPacket called to undo RXS_PreparePacket's
427                      * work.  It may reduce the length of the packet by up
428                      * to conn->maxTrailerSize, to reflect the length of the
429                      * data + the header. */
430                     if ((error =
431                          RXS_CheckPacket(conn->securityObject, call, rp))) {
432                         /* Used to merely shut down the call, but now we 
433                          * shut down the whole connection since this may 
434                          * indicate an attempt to hijack it */
435
436                         MUTEX_EXIT(&call->lock);
437                         rxi_ConnectionError(conn, error);
438                         MUTEX_ENTER(&conn->conn_data_lock);
439                         rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
440                         MUTEX_EXIT(&conn->conn_data_lock);
441                         rxi_FreePacket(rp);
442                         MUTEX_ENTER(&call->lock);
443
444                         return 1;
445                     }
446                     call->rnext++;
447                     curp = call->currentPacket = rp;
448                     call->curvec = 1;   /* 0th vec is always header */
449                     cur_iov = &curp->wirevec[1];
450                     /* begin at the beginning [ more or less ], continue 
451                      * on until the end, then stop. */
452                     call->curpos =
453                         (char *)curp->wirevec[1].iov_base +
454                         call->conn->securityHeaderSize;
455                     call->curlen =
456                         curp->wirevec[1].iov_len -
457                         call->conn->securityHeaderSize;
458
459                     /* Notice that this code works correctly if the data
460                      * size is 0 (which it may be--no reply arguments from
461                      * server, for example).  This relies heavily on the
462                      * fact that the code below immediately frees the packet
463                      * (no yields, etc.).  If it didn't, this would be a
464                      * problem because a value of zero for call->nLeft
465                      * normally means that there is no read packet */
466                     call->nLeft = curp->length;
467                     hadd32(call->bytesRcvd, curp->length);
468
469                     /* Send a hard ack for every rxi_HardAckRate+1 packets
470                      * consumed. Otherwise schedule an event to send
471                      * the hard ack later on.
472                      */
473                     call->nHardAcks++;
474                     didConsume = 1;
475                     continue;
476                 }
477             }
478             break;
479         }
480
481         /* It's possible for call->nLeft to be smaller than any particular
482          * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
483          * reflects the size of the buffer.  We have to keep track of the
484          * number of bytes read in the length field of the packet struct.  On
485          * the final portion of a received packet, it's almost certain that
486          * call->nLeft will be smaller than the final buffer. */
487         while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
488
489             t = MIN((int)call->curlen, call->iovNBytes);
490             t = MIN(t, (int)call->nLeft);
491             call_iov->iov_base = call->curpos;
492             call_iov->iov_len = t;
493             call_iov++;
494             call->iovNext++;
495             call->iovNBytes -= t;
496             call->curpos += t;
497             call->curlen -= t;
498             call->nLeft -= t;
499
500             if (!call->nLeft) {
501                 /* out of packet.  Get another one. */
502                 queue_Append(&call->iovq, curp);
503                 curp = call->currentPacket = (struct rx_packet *)0;
504             } else if (!call->curlen) {
505                 /* need to get another struct iov */
506                 if (++call->curvec >= curp->niovecs) {
507                     /* current packet is exhausted, get ready for another */
508                     /* don't worry about curvec and stuff, they get set somewhere else */
509                     queue_Append(&call->iovq, curp);
510                     curp = call->currentPacket = (struct rx_packet *)0;
511                     call->nLeft = 0;
512                 } else {
513                     cur_iov++;
514                     call->curpos = (char *)cur_iov->iov_base;
515                     call->curlen = cur_iov->iov_len;
516                 }
517             }
518         }
519     }
520
521     /* If we consumed any packets then check whether we need to
522      * send a hard ack. */
523     if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
524         if (call->nHardAcks > (u_short) rxi_HardAckRate) {
525             rxevent_Cancel(call->delayedAckEvent, call,
526                            RX_CALL_REFCOUNT_DELAY);
527             rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
528             didHardAck = 1;
529         } else {
530             struct clock when;
531             clock_GetTime(&when);
532             /* Delay to consolidate ack packets */
533             clock_Add(&when, &rx_hardAckDelay);
534             if (!call->delayedAckEvent
535                 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
536                 rxevent_Cancel(call->delayedAckEvent, call,
537                                RX_CALL_REFCOUNT_DELAY);
538                 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
539                 call->delayedAckEvent =
540                     rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
541             }
542         }
543     }
544     return didHardAck;
545 }
546
547
548 /* rxi_ReadvProc -- internal version.
549  *
550  * Fills in an iovec with pointers to the packet buffers. All packets
551  * except the last packet (new current packet) are moved to the iovq
552  * while the application is processing the data.
553  *
554  * LOCKS USED -- called at netpri with rx global lock and call->lock held.
555  */
556 int
557 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
558               int nbytes)
559 {
560     struct rx_packet *rp;
561     int requestCount;
562     int nextio;
563
564     requestCount = nbytes;
565     nextio = 0;
566
567     /* Free any packets from the last call to ReadvProc/WritevProc */
568     if (queue_IsNotEmpty(&call->iovq)) {
569         rxi_FreePackets(0, &call->iovq);
570     }
571
572     if (call->mode == RX_MODE_SENDING) {
573         rxi_FlushWrite(call);
574     }
575
576     if (call->error) {
577         return 0;
578     }
579
580     /* Get whatever data is currently available in the receive queue.
581      * If rxi_FillReadVec sends an ack packet then it is possible
582      * that we will receive more data while we drop the call lock
583      * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
584      * here to avoid a race with the receive thread if we send
585      * hard acks in rxi_FillReadVec. */
586     call->flags |= RX_CALL_IOVEC_WAIT;
587     call->iovNBytes = nbytes;
588     call->iovMax = maxio;
589     call->iovNext = 0;
590     call->iov = iov;
591     rxi_FillReadVec(call, 0);
592
593     /* if we need more data then sleep until the receive thread has
594      * filled in the rest. */
595     if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
596         && !(call->flags & RX_CALL_RECEIVE_DONE)) {
597         call->flags |= RX_CALL_READER_WAIT;
598         clock_NewTime();
599         call->startWait = clock_Sec();
600         while (call->flags & RX_CALL_READER_WAIT) {
601 #ifdef  RX_ENABLE_LOCKS
602             CV_WAIT(&call->cv_rq, &call->lock);
603 #else
604             osi_rxSleep(&call->rq);
605 #endif
606         }
607         call->startWait = 0;
608     }
609     call->flags &= ~RX_CALL_IOVEC_WAIT;
610 #ifdef RX_ENABLE_LOCKS
611     if (call->error) {
612         return 0;
613     }
614 #endif /* RX_ENABLE_LOCKS */
615
616     call->iov = NULL;
617     *nio = call->iovNext;
618     return nbytes - call->iovNBytes;
619 }
620
621 int
622 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
623              int nbytes)
624 {
625     int bytes;
626     SPLVAR;
627
628     NETPRI;
629     MUTEX_ENTER(&call->lock);
630     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
631     MUTEX_EXIT(&call->lock);
632     USERPRI;
633     return bytes;
634 }
635
636 /* rxi_WriteProc -- internal version.
637  *
638  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
639
640 int
641 rxi_WriteProc(register struct rx_call *call, register char *buf,
642               register int nbytes)
643 {
644     struct rx_connection *conn = call->conn;
645     register struct rx_packet *cp = call->currentPacket;
646     register unsigned int t;
647     int requestCount = nbytes;
648
649     /* Free any packets from the last call to ReadvProc/WritevProc */
650     if (queue_IsNotEmpty(&call->iovq)) {
651         rxi_FreePackets(0, &call->iovq);
652     }
653
654     if (call->mode != RX_MODE_SENDING) {
655         if ((conn->type == RX_SERVER_CONNECTION)
656             && (call->mode == RX_MODE_RECEIVING)) {
657             call->mode = RX_MODE_SENDING;
658             if (cp) {
659                 rxi_FreePacket(cp);
660                 cp = call->currentPacket = (struct rx_packet *)0;
661                 call->nLeft = 0;
662                 call->nFree = 0;
663             }
664         } else {
665             return 0;
666         }
667     }
668
669     /* Loop condition is checked at end, so that a write of 0 bytes
670      * will force a packet to be created--specially for the case where
671      * there are 0 bytes on the stream, but we must send a packet
672      * anyway. */
673     do {
674         if (call->nFree == 0) {
675             if (!call->error && cp) {
676 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
677                 /* Wait until TQ_BUSY is reset before adding any
678                  * packets to the transmit queue
679                  */
680                 while (call->flags & RX_CALL_TQ_BUSY) {
681                     call->flags |= RX_CALL_TQ_WAIT;
682 #ifdef RX_ENABLE_LOCKS
683                     CV_WAIT(&call->cv_tq, &call->lock);
684 #else /* RX_ENABLE_LOCKS */
685                     osi_rxSleep(&call->tq);
686 #endif /* RX_ENABLE_LOCKS */
687                 }
688 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
689                 clock_NewTime();        /* Bogus:  need new time package */
690                 /* The 0, below, specifies that it is not the last packet: 
691                  * there will be others. PrepareSendPacket may
692                  * alter the packet length by up to
693                  * conn->securityMaxTrailerSize */
694                 hadd32(call->bytesSent, cp->length);
695                 rxi_PrepareSendPacket(call, cp, 0);
696                 queue_Append(&call->tq, cp);
697                 cp = call->currentPacket = NULL;
698                 if (!
699                     (call->
700                      flags & (RX_CALL_FAST_RECOVER |
701                               RX_CALL_FAST_RECOVER_WAIT))) {
702                     rxi_Start(0, call, 0, 0);
703                 }
704             }
705             /* Wait for transmit window to open up */
706             while (!call->error
707                    && call->tnext + 1 > call->tfirst + call->twind) {
708                 clock_NewTime();
709                 call->startWait = clock_Sec();
710
711 #ifdef  RX_ENABLE_LOCKS
712                 CV_WAIT(&call->cv_twind, &call->lock);
713 #else
714                 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
715                 osi_rxSleep(&call->twind);
716 #endif
717
718                 call->startWait = 0;
719 #ifdef RX_ENABLE_LOCKS
720                 if (call->error) {
721                     return 0;
722                 }
723 #endif /* RX_ENABLE_LOCKS */
724             }
725             if ((cp = rxi_AllocSendPacket(call, nbytes))) {
726                 call->currentPacket = cp;
727                 call->nFree = cp->length;
728                 call->curvec = 1;       /* 0th vec is always header */
729                 /* begin at the beginning [ more or less ], continue 
730                  * on until the end, then stop. */
731                 call->curpos =
732                     (char *)cp->wirevec[1].iov_base +
733                     call->conn->securityHeaderSize;
734                 call->curlen =
735                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
736             }
737             if (call->error) {
738                 if (cp) {
739                     rxi_FreePacket(cp);
740                     call->currentPacket = NULL;
741                 }
742                 return 0;
743             }
744         }
745
746         if (cp && (int)call->nFree < nbytes) {
747             /* Try to extend the current buffer */
748             register int len, mud;
749             len = cp->length;
750             mud = rx_MaxUserDataSize(call);
751             if (mud > len) {
752                 int want;
753                 want = MIN(nbytes - (int)call->nFree, mud - len);
754                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
755                 if (cp->length > (unsigned)mud)
756                     cp->length = mud;
757                 call->nFree += (cp->length - len);
758             }
759         }
760
761         /* If the remaining bytes fit in the buffer, then store them
762          * and return.  Don't ship a buffer that's full immediately to
763          * the peer--we don't know if it's the last buffer yet */
764
765         if (!cp) {
766             call->nFree = 0;
767         }
768
769         while (nbytes && call->nFree) {
770
771             t = MIN((int)call->curlen, nbytes);
772             t = MIN((int)call->nFree, t);
773             memcpy(call->curpos, buf, t);
774             buf += t;
775             nbytes -= t;
776             call->curpos += t;
777             call->curlen -= (u_short)t;
778             call->nFree -= (u_short)t;
779
780             if (!call->curlen) {
781                 /* need to get another struct iov */
782                 if (++call->curvec >= cp->niovecs) {
783                     /* current packet is full, extend or send it */
784                     call->nFree = 0;
785                 } else {
786                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
787                     call->curlen = cp->wirevec[call->curvec].iov_len;
788                 }
789             }
790         }                       /* while bytes to send and room to send them */
791
792         /* might be out of space now */
793         if (!nbytes) {
794             return requestCount;
795         } else;                 /* more data to send, so get another packet and keep going */
796     } while (nbytes);
797
798     return requestCount - nbytes;
799 }
800
801 int
802 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
803 {
804     int bytes;
805     int tcurlen;
806     int tnFree;
807     char *tcurpos;
808     SPLVAR;
809
810     /*
811      * Free any packets from the last call to ReadvProc/WritevProc.
812      * We do not need the lock because the receiver threads only
813      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
814      * RX_CALL_IOVEC_WAIT is always cleared before returning from
815      * ReadvProc/WritevProc.
816      */
817     if (queue_IsNotEmpty(&call->iovq)) {
818         rxi_FreePackets(0, &call->iovq);
819     }
820
821     /*
822      * Most common case: all of the data fits in the current iovec.
823      * We do not need the lock because this is the only thread that
824      * updates the curlen, curpos, nFree fields.
825      *
826      * We are relying on nFree being zero unless the call is in send mode.
827      */
828     tcurlen = (int)call->curlen;
829     tnFree = (int)call->nFree;
830     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
831         tcurpos = call->curpos;
832         memcpy(tcurpos, buf, nbytes);
833         call->curpos = tcurpos + nbytes;
834         call->curlen = (u_short)(tcurlen - nbytes);
835         call->nFree = (u_short)(tnFree - nbytes);
836         return nbytes;
837     }
838
839     NETPRI;
840     MUTEX_ENTER(&call->lock);
841     bytes = rxi_WriteProc(call, buf, nbytes);
842     MUTEX_EXIT(&call->lock);
843     USERPRI;
844     return bytes;
845 }
846
847 /* Optimization for marshalling 32 bit arguments */
848 int
849 rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
850 {
851     int bytes;
852     int tcurlen;
853     int tnFree;
854     char *tcurpos;
855     SPLVAR;
856
857     /*
858      * Free any packets from the last call to ReadvProc/WritevProc.
859      * We do not need the lock because the receiver threads only
860      * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
861      * RX_CALL_IOVEC_WAIT is always cleared before returning from
862      * ReadvProc/WritevProc.
863      */
864     if (queue_IsNotEmpty(&call->iovq)) {
865         rxi_FreePackets(0, &call->iovq);
866     }
867
868     /*
869      * Most common case: all of the data fits in the current iovec.
870      * We do not need the lock because this is the only thread that
871      * updates the curlen, curpos, nFree fields.
872      *
873      * We are relying on nFree being zero unless the call is in send mode.
874      */
875     tcurlen = call->curlen;
876     tnFree = call->nFree;
877     if (!call->error && tcurlen >= sizeof(afs_int32)
878         && tnFree >= sizeof(afs_int32)) {
879         tcurpos = call->curpos;
880         if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
881             *((afs_int32 *) (tcurpos)) = *value;
882         } else {
883             memcpy(tcurpos, (char *)value, sizeof(afs_int32));
884         }
885         call->curpos = tcurpos + sizeof(afs_int32);
886         call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
887         call->nFree = (u_short)(tnFree - sizeof(afs_int32));
888         return sizeof(afs_int32);
889     }
890
891     NETPRI;
892     MUTEX_ENTER(&call->lock);
893     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
894     MUTEX_EXIT(&call->lock);
895     USERPRI;
896     return bytes;
897 }
898
899 /* rxi_WritevAlloc -- internal version.
900  *
901  * Fill in an iovec to point to data in packet buffers. The application
902  * calls rxi_WritevProc when the buffers are full.
903  *
904  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
905
906 int
907 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
908                 int nbytes)
909 {
910     struct rx_connection *conn = call->conn;
911     struct rx_packet *cp = call->currentPacket;
912     int requestCount;
913     int nextio;
914     /* Temporary values, real work is done in rxi_WritevProc */
915     int tnFree;
916     int tcurvec;
917     char *tcurpos;
918     int tcurlen;
919
920     requestCount = nbytes;
921     nextio = 0;
922
923     /* Free any packets from the last call to ReadvProc/WritevProc */
924     if (queue_IsNotEmpty(&call->iovq)) {
925         rxi_FreePackets(0, &call->iovq);
926     }
927
928     if (call->mode != RX_MODE_SENDING) {
929         if ((conn->type == RX_SERVER_CONNECTION)
930             && (call->mode == RX_MODE_RECEIVING)) {
931             call->mode = RX_MODE_SENDING;
932             if (cp) {
933                 rxi_FreePacket(cp);
934                 cp = call->currentPacket = (struct rx_packet *)0;
935                 call->nLeft = 0;
936                 call->nFree = 0;
937             }
938         } else {
939             return 0;
940         }
941     }
942
943     /* Set up the iovec to point to data in packet buffers. */
944     tnFree = call->nFree;
945     tcurvec = call->curvec;
946     tcurpos = call->curpos;
947     tcurlen = call->curlen;
948     do {
949         register unsigned int t;
950
951         if (tnFree == 0) {
952             /* current packet is full, allocate a new one */
953             cp = rxi_AllocSendPacket(call, nbytes);
954             if (cp == NULL) {
955                 /* out of space, return what we have */
956                 *nio = nextio;
957                 return requestCount - nbytes;
958             }
959             queue_Append(&call->iovq, cp);
960             tnFree = cp->length;
961             tcurvec = 1;
962             tcurpos =
963                 (char *)cp->wirevec[1].iov_base +
964                 call->conn->securityHeaderSize;
965             tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
966         }
967
968         if (tnFree < nbytes) {
969             /* try to extend the current packet */
970             register int len, mud;
971             len = cp->length;
972             mud = rx_MaxUserDataSize(call);
973             if (mud > len) {
974                 int want;
975                 want = MIN(nbytes - tnFree, mud - len);
976                 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
977                 if (cp->length > (unsigned)mud)
978                     cp->length = mud;
979                 tnFree += (cp->length - len);
980                 if (cp == call->currentPacket) {
981                     call->nFree += (cp->length - len);
982                 }
983             }
984         }
985
986         /* fill in the next entry in the iovec */
987         t = MIN(tcurlen, nbytes);
988         t = MIN(tnFree, t);
989         iov[nextio].iov_base = tcurpos;
990         iov[nextio].iov_len = t;
991         nbytes -= t;
992         tcurpos += t;
993         tcurlen -= t;
994         tnFree -= t;
995         nextio++;
996
997         if (!tcurlen) {
998             /* need to get another struct iov */
999             if (++tcurvec >= cp->niovecs) {
1000                 /* current packet is full, extend it or move on to next packet */
1001                 tnFree = 0;
1002             } else {
1003                 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1004                 tcurlen = cp->wirevec[tcurvec].iov_len;
1005             }
1006         }
1007     } while (nbytes && nextio < maxio);
1008     *nio = nextio;
1009     return requestCount - nbytes;
1010 }
1011
1012 int
1013 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1014                int nbytes)
1015 {
1016     int bytes;
1017     SPLVAR;
1018
1019     NETPRI;
1020     MUTEX_ENTER(&call->lock);
1021     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1022     MUTEX_EXIT(&call->lock);
1023     USERPRI;
1024     return bytes;
1025 }
1026
1027 /* rxi_WritevProc -- internal version.
1028  *
1029  * Send buffers allocated in rxi_WritevAlloc.
1030  *
1031  * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1032
1033 int
1034 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1035 {
1036     struct rx_packet *cp = call->currentPacket;
1037     int nextio;
1038     int requestCount;
1039     struct rx_queue tmpq;
1040
1041     requestCount = nbytes;
1042     nextio = 0;
1043
1044     if (call->mode != RX_MODE_SENDING) {
1045         call->error = RX_PROTOCOL_ERROR;
1046     }
1047 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1048     /* Wait until TQ_BUSY is reset before trying to move any
1049      * packets to the transmit queue.  */
1050     while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1051         call->flags |= RX_CALL_TQ_WAIT;
1052 #ifdef RX_ENABLE_LOCKS
1053         CV_WAIT(&call->cv_tq, &call->lock);
1054 #else /* RX_ENABLE_LOCKS */
1055         osi_rxSleep(&call->tq);
1056 #endif /* RX_ENABLE_LOCKS */
1057     }
1058 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1059
1060     if (call->error) {
1061         if (cp) {
1062             queue_Prepend(&call->iovq, cp);
1063             cp = call->currentPacket = NULL;
1064         }
1065         rxi_FreePackets(0, &call->iovq);
1066         return 0;
1067     }
1068
1069     /* Loop through the I/O vector adjusting packet pointers.
1070      * Place full packets back onto the iovq once they are ready
1071      * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1072      * the iovec. We put the loop condition at the end to ensure that
1073      * a zero length write will push a short packet. */
1074     nextio = 0;
1075     queue_Init(&tmpq);
1076     do {
1077         if (call->nFree == 0 && cp) {
1078             clock_NewTime();    /* Bogus:  need new time package */
1079             /* The 0, below, specifies that it is not the last packet: 
1080              * there will be others. PrepareSendPacket may
1081              * alter the packet length by up to
1082              * conn->securityMaxTrailerSize */
1083             hadd32(call->bytesSent, cp->length);
1084             rxi_PrepareSendPacket(call, cp, 0);
1085             queue_Append(&tmpq, cp);
1086
1087             /* The head of the iovq is now the current packet */
1088             if (nbytes) {
1089                 if (queue_IsEmpty(&call->iovq)) {
1090                     call->error = RX_PROTOCOL_ERROR;
1091                     cp = call->currentPacket = NULL;
1092                     rxi_FreePackets(0, &tmpq);
1093                     return 0;
1094                 }
1095                 cp = queue_First(&call->iovq, rx_packet);
1096                 queue_Remove(cp);
1097                 call->currentPacket = cp;
1098                 call->nFree = cp->length;
1099                 call->curvec = 1;
1100                 call->curpos =
1101                     (char *)cp->wirevec[1].iov_base +
1102                     call->conn->securityHeaderSize;
1103                 call->curlen =
1104                     cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1105             }
1106         }
1107
1108         if (nbytes) {
1109             /* The next iovec should point to the current position */
1110             if (iov[nextio].iov_base != call->curpos
1111                 || iov[nextio].iov_len > (int)call->curlen) {
1112                 call->error = RX_PROTOCOL_ERROR;
1113                 if (cp) {
1114                     queue_Prepend(&tmpq, cp);
1115                     call->currentPacket = NULL;
1116                 }
1117                 rxi_FreePackets(0, &tmpq);
1118                 return 0;
1119             }
1120             nbytes -= iov[nextio].iov_len;
1121             call->curpos += iov[nextio].iov_len;
1122             call->curlen -= iov[nextio].iov_len;
1123             call->nFree -= iov[nextio].iov_len;
1124             nextio++;
1125             if (call->curlen == 0) {
1126                 if (++call->curvec > cp->niovecs) {
1127                     call->nFree = 0;
1128                 } else {
1129                     call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1130                     call->curlen = cp->wirevec[call->curvec].iov_len;
1131                 }
1132             }
1133         }
1134     } while (nbytes && nextio < nio);
1135
1136     /* Move the packets from the temporary queue onto the transmit queue.
1137      * We may end up with more than call->twind packets on the queue. */
1138     queue_SpliceAppend(&call->tq, &tmpq);
1139
1140     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1141         rxi_Start(0, call, 0, 0);
1142     }
1143
1144     /* Wait for the length of the transmit queue to fall below call->twind */
1145     while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
1146         clock_NewTime();
1147         call->startWait = clock_Sec();
1148 #ifdef  RX_ENABLE_LOCKS
1149         CV_WAIT(&call->cv_twind, &call->lock);
1150 #else
1151         call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1152         osi_rxSleep(&call->twind);
1153 #endif
1154         call->startWait = 0;
1155     }
1156
1157     if (call->error) {
1158         if (cp) {
1159             rxi_FreePacket(cp);
1160             cp = call->currentPacket = NULL;
1161         }
1162         return 0;
1163     }
1164
1165     return requestCount - nbytes;
1166 }
1167
1168 int
1169 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1170 {
1171     int bytes;
1172     SPLVAR;
1173
1174     NETPRI;
1175     MUTEX_ENTER(&call->lock);
1176     bytes = rxi_WritevProc(call, iov, nio, nbytes);
1177     MUTEX_EXIT(&call->lock);
1178     USERPRI;
1179     return bytes;
1180 }
1181
1182 /* Flush any buffered data to the stream, switch to read mode
1183  * (clients) or to EOF mode (servers) */
1184 void
1185 rxi_FlushWrite(register struct rx_call *call)
1186 {
1187     register struct rx_packet *cp = call->currentPacket;
1188
1189     /* Free any packets from the last call to ReadvProc/WritevProc */
1190     if (queue_IsNotEmpty(&call->iovq)) {
1191         rxi_FreePackets(0, &call->iovq);
1192     }
1193
1194     if (call->mode == RX_MODE_SENDING) {
1195
1196         call->mode =
1197             (call->conn->type ==
1198              RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1199
1200 #ifdef RX_KERNEL_TRACE
1201         {
1202             int glockOwner = ISAFS_GLOCK();
1203             if (!glockOwner)
1204                 AFS_GLOCK();
1205             afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1206                        __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1207                        call);
1208             if (!glockOwner)
1209                 AFS_GUNLOCK();
1210         }
1211 #endif
1212
1213 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1214         /* Wait until TQ_BUSY is reset before adding any
1215          * packets to the transmit queue
1216          */
1217         while (call->flags & RX_CALL_TQ_BUSY) {
1218             call->flags |= RX_CALL_TQ_WAIT;
1219 #ifdef RX_ENABLE_LOCKS
1220             CV_WAIT(&call->cv_tq, &call->lock);
1221 #else /* RX_ENABLE_LOCKS */
1222             osi_rxSleep(&call->tq);
1223 #endif /* RX_ENABLE_LOCKS */
1224         }
1225 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1226
1227         if (cp) {
1228             /* cp->length is only supposed to be the user's data */
1229             /* cp->length was already set to (then-current) 
1230              * MaxUserDataSize or less. */
1231             cp->length -= call->nFree;
1232             call->currentPacket = (struct rx_packet *)0;
1233             call->nFree = 0;
1234         } else {
1235             cp = rxi_AllocSendPacket(call, 0);
1236             if (!cp) {
1237                 /* Mode can no longer be MODE_SENDING */
1238                 return;
1239             }
1240             cp->length = 0;
1241             cp->niovecs = 2;    /* header + space for rxkad stuff */
1242             call->nFree = 0;
1243         }
1244
1245         /* The 1 specifies that this is the last packet */
1246         hadd32(call->bytesSent, cp->length);
1247         rxi_PrepareSendPacket(call, cp, 1);
1248         queue_Append(&call->tq, cp);
1249         if (!
1250             (call->
1251              flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1252             rxi_Start(0, call, 0, 0);
1253         }
1254     }
1255 }
1256
1257 /* Flush any buffered data to the stream, switch to read mode
1258  * (clients) or to EOF mode (servers) */
1259 void
1260 rx_FlushWrite(struct rx_call *call)
1261 {
1262     SPLVAR;
1263     NETPRI;
1264     MUTEX_ENTER(&call->lock);
1265     rxi_FlushWrite(call);
1266     MUTEX_EXIT(&call->lock);
1267     USERPRI;
1268 }