2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 #include "../afs/param.h"
13 #include <afs/param.h>
15 #include <afsconfig.h>
21 #if defined(AFS_DARWIN_ENV) || defined(AFS_FBSD_ENV)
22 #include "../afs/sysincludes.h"
24 #include "../h/types.h"
25 #include "../h/time.h"
26 #include "../h/stat.h"
28 #include <net/net_globals.h>
29 #endif /* AFS_OSF_ENV */
30 #ifdef AFS_LINUX20_ENV
31 #include "../h/socket.h"
33 #include "../netinet/in.h"
34 #if defined(AFS_SGI_ENV)
35 #include "../afs/sysincludes.h"
38 #include "../afs/afs_args.h"
39 #include "../afs/afs_osi.h"
40 #if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
41 #include "../h/systm.h"
44 #include "../afs/sysincludes.h"
47 #undef RXDEBUG /* turn off debugging */
50 #include "../rx/rx_kmutex.h"
51 #include "../rx/rx_kernel.h"
52 #include "../rx/rx_clock.h"
53 #include "../rx/rx_queue.h"
55 #include "../rx/rx_globals.h"
56 #include "../afs/lock.h"
57 #include "../afsint/afsint.h"
64 #endif /* AFS_ALPHA_ENV */
66 # include <sys/types.h>
68 # include <sys/socket.h>
69 # include <sys/file.h>
71 # include <netinet/in.h>
72 # include <sys/stat.h>
73 # include <sys/time.h>
86 # include "rx_clock.h"
87 # include "rx_queue.h"
89 # include "rx_globals.h"
90 # include "rx_internal.h"
/* Compiled only under RX_LOCKS_DB (the matching #ifdef is not visible in
 * this extract — only the #endif below). */
94 /* rxdb_fileID is used to identify the lock location, along with line#. */
95 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
96 #endif /* RX_LOCKS_DB */
97 /* rxi_ReadProc -- internal version.
99 * LOCKS USED -- called at netpri with rx global lock and call->lock held.
 *
 * Copies up to nbytes from in-sequence packets on call->rq into buf and
 * returns requestCount - nbytes, i.e. the number of bytes actually
 * delivered on the visible return path.
 * NOTE(review): this extract has lines elided (original line numbers jump);
 * the comments below describe only the code that is visible here.
101 int rxi_ReadProc(call, buf, nbytes)
102 register struct rx_call *call;
106 register struct rx_packet *cp = call->currentPacket;
107 register struct rx_packet *rp;
108 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
109 register int requestCount;
110 register unsigned int t;
111 /* XXXX took out clock_NewTime from here. Was it needed? */
112 requestCount = nbytes;
114 /* Free any packets from the last call to ReadvProc/WritevProc */
115 if (!queue_IsEmpty(&call->iovq)) {
116 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
/* nLeft == 0 means no partially-consumed read packet (see the note below
 * near the nLeft assignment) — fetch the next in-sequence packet. */
123 if (call->nLeft == 0) {
124 /* Get next packet */
126 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
130 if (call->mode == RX_MODE_SENDING) {
131 rxi_FlushWrite(call);
135 if (queue_IsNotEmpty(&call->rq)) {
136 /* Check that next packet available is next in sequence */
137 rp = queue_First(&call->rq, rx_packet);
138 if (rp->header.seq == call->rnext) {
140 register struct rx_connection *conn = call->conn;
143 /* RXS_CheckPacket called to undo RXS_PreparePacket's
144 * work. It may reduce the length of the packet by up
145 * to conn->maxTrailerSize, to reflect the length of the
146 * data + the header. */
147 if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
148 /* Used to merely shut down the call, but now we
149 * shut down the whole connection since this may
150 * indicate an attempt to hijack it */
/* call->lock is dropped around the connection abort to respect the
 * call-lock < conn_data_lock ordering, then re-taken. */
152 MUTEX_EXIT(&call->lock);
153 rxi_ConnectionError(conn, error);
154 MUTEX_ENTER(&conn->conn_data_lock);
155 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
156 MUTEX_EXIT(&conn->conn_data_lock);
158 MUTEX_ENTER(&call->lock);
163 cp = call->currentPacket = rp;
164 call->curvec = 1; /* 0th vec is always header */
165 /* begin at the beginning [ more or less ], continue
166 * on until the end, then stop. */
167 call->curpos = (char *)cp->wirevec[1].iov_base
168 + call->conn->securityHeaderSize;
169 call->curlen = cp->wirevec[1].iov_len
170 - call->conn->securityHeaderSize;
172 /* Notice that this code works correctly if the data
173 * size is 0 (which it may be--no reply arguments from
174 * server, for example). This relies heavily on the
175 * fact that the code below immediately frees the packet
176 * (no yields, etc.). If it didn't, this would be a
177 * problem because a value of zero for call->nLeft
178 * normally means that there is no read packet */
179 call->nLeft = cp->length;
180 hadd32(call->bytesRcvd, cp->length);
182 /* Send a hard ack for every rxi_HardAckRate+1 packets
183 * consumed. Otherwise schedule an event to send
184 * the hard ack later on.
187 if (!(call->flags &RX_CALL_RECEIVE_DONE)) {
188 if (call->nHardAcks > (u_short)rxi_HardAckRate) {
189 rxevent_Cancel(call->delayedAckEvent, call,
190 RX_CALL_REFCOUNT_DELAY);
191 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
/* Ack rate not yet reached: (re)schedule a delayed hard ack, but only
 * if it would fire sooner than any already-pending delayed-ack event. */
195 clock_GetTime(&when);
196 /* Delay to consolidate ack packets */
197 clock_Add(&when, &rx_hardAckDelay);
198 if (!call->delayedAckEvent ||
199 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
200 rxevent_Cancel(call->delayedAckEvent, call,
201 RX_CALL_REFCOUNT_DELAY);
202 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
203 call->delayedAckEvent = rxevent_Post(&when,
214 MTUXXX doesn't there need to be an "else" here ???
216 /* Are there ever going to be any more packets? */
217 if (call->flags & RX_CALL_RECEIVE_DONE) {
218 return requestCount - nbytes;
220 /* Wait for in-sequence packet */
221 call->flags |= RX_CALL_READER_WAIT;
223 call->startWait = clock_Sec();
224 while (call->flags & RX_CALL_READER_WAIT) {
225 #ifdef RX_ENABLE_LOCKS
226 CV_WAIT(&call->cv_rq, &call->lock);
228 osi_rxSleep(&call->rq);
233 #ifdef RX_ENABLE_LOCKS
237 #endif /* RX_ENABLE_LOCKS */
240 else /* assert(cp); */ /* MTUXXX this should be replaced by some error-recovery code before shipping */
241 /* yes, the following block is allowed to be the ELSE clause (or not) */
243 /* It's possible for call->nLeft to be smaller than any particular
244 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
245 * reflects the size of the buffer. We have to keep track of the
246 * number of bytes read in the length field of the packet struct. On
247 * the final portion of a received packet, it's almost certain that
248 * call->nLeft will be smaller than the final buffer. */
/* Copy loop: each iteration moves t = min(curlen, nLeft, nbytes) bytes
 * from the current iovec position into the caller's buffer. */
250 while (nbytes && cp) {
251 t = MIN((int)call->curlen, nbytes);
252 t = MIN(t, (int)call->nLeft);
253 bcopy (call->curpos, buf, t);
261 /* out of packet. Get another one. */
263 cp = call->currentPacket = (struct rx_packet *)0;
265 else if (!call->curlen) {
266 /* need to get another struct iov */
267 if (++call->curvec >= cp->niovecs) {
268 /* current packet is exhausted, get ready for another */
269 /* don't worry about curvec and stuff, they get set somewhere else */
271 cp = call->currentPacket = (struct rx_packet *)0;
275 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
276 call->curlen = cp->wirevec[call->curvec].iov_len;
281 /* user buffer is full, return */
/* rx_ReadProc -- external read entry point.
 * Fast path: when the current iovec already holds strictly more than
 * nbytes, copy without taking call->lock; otherwise take the lock and
 * defer to rxi_ReadProc.
 * NOTE(review): lines are elided in this extract (variable declarations,
 * NETPRI handling and the returns are not visible). */
290 int rx_ReadProc(call, buf, nbytes)
291 struct rx_call *call;
302 * Free any packets from the last call to ReadvProc/WritevProc.
303 * We do not need the lock because the receiver threads only
304 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
305 * RX_CALL_IOVEC_WAIT is always cleared before returning from
306 * ReadvProc/WritevProc.
308 if (!queue_IsEmpty(&call->iovq)) {
309 register struct rx_packet *rp;
310 register struct rx_packet *nxp;
311 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
318 * Most common case, all of the data is in the current iovec.
319 * We do not need the lock because this is the only thread that
320 * updates the curlen, curpos, nLeft fields.
322 * We are relying on nLeft being zero unless the call is in receive mode.
/* Snapshot curlen/nLeft before testing so the comparison and the update
 * below use consistent values. */
324 tcurlen = call->curlen;
325 tnLeft = call->nLeft;
326 if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
327 tcurpos = call->curpos;
328 bcopy(tcurpos, buf, nbytes);
329 call->curpos = tcurpos + nbytes;
330 call->curlen = tcurlen - nbytes;
331 call->nLeft = tnLeft - nbytes;
/* Slow path: fall through to the locked internal reader. */
337 MUTEX_ENTER(&call->lock);
338 bytes = rxi_ReadProc(call, buf, nbytes);
339 MUTEX_EXIT(&call->lock);
345 /* Optimization for unmarshalling 32 bit integers */
/* rx_ReadProc32 -- reads exactly one afs_int32 from the call stream.
 * Fast path mirrors rx_ReadProc: if the current iovec holds more than
 * sizeof(afs_int32), read lock-free — with a direct aligned load when
 * tcurpos is 4-byte aligned, bcopy otherwise. Slow path defers to
 * rxi_ReadProc under call->lock.
 * NOTE(review): lines elided in this extract (declarations and final
 * return are not visible). */
346 int rx_ReadProc32(call, value)
347 struct rx_call *call;
357 * Free any packets from the last call to ReadvProc/WritevProc.
358 * We do not need the lock because the receiver threads only
359 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
360 * RX_CALL_IOVEC_WAIT is always cleared before returning from
361 * ReadvProc/WritevProc.
363 if (!queue_IsEmpty(&call->iovq)) {
364 register struct rx_packet *rp;
365 register struct rx_packet *nxp;
366 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
373 * Most common case, all of the data is in the current iovec.
374 * We do not need the lock because this is the only thread that
375 * updates the curlen, curpos, nLeft fields.
377 * We are relying on nLeft being zero unless the call is in receive mode.
379 tcurlen = call->curlen;
380 tnLeft = call->nLeft;
381 if (!call->error && tcurlen > sizeof(afs_int32) && tnLeft > sizeof(afs_int32)) {
382 tcurpos = call->curpos;
/* Aligned: load the word directly; unaligned: fall back to bcopy. */
383 if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
384 *value = *((afs_int32 *)(tcurpos));
386 bcopy(tcurpos, (char *)value, sizeof(afs_int32));
388 call->curpos = tcurpos + sizeof(afs_int32);
389 call->curlen = tcurlen - sizeof(afs_int32);
390 call->nLeft = tnLeft - sizeof(afs_int32);
391 return sizeof(afs_int32);
396 MUTEX_ENTER(&call->lock);
397 bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
398 MUTEX_EXIT(&call->lock);
406 * Uses packets in the receive queue to fill in as much of the
407 * current iovec as possible. Does not block if it runs out
408 * of packets to complete the iovec. Return true if an ack packet
409 * was sent, otherwise return false */
/* NOTE(review): this extract has lines elided; comments below describe
 * only the visible code. Consumed packets are appended to call->iovq so
 * their buffers stay valid while the application processes the iovec. */
410 int rxi_FillReadVec(call, seq, serial, flags)
411 struct rx_call *call;
412 afs_uint32 seq, serial, flags;
416 register unsigned int t;
417 struct rx_packet *rp;
418 struct rx_packet *curp;
419 struct iovec *call_iov;
420 struct iovec *cur_iov = NULL;
422 curp = call->currentPacket;
424 cur_iov = &curp->wirevec[call->curvec];
426 call_iov = &call->iov[call->iovNext];
/* Main loop: keep filling iovec slots until an error occurs, the byte
 * budget (iovNBytes) is consumed, or the iovec (iovMax) is full. */
428 while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
429 if (call->nLeft == 0) {
430 /* Get next packet */
431 if (queue_IsNotEmpty(&call->rq)) {
432 /* Check that next packet available is next in sequence */
433 rp = queue_First(&call->rq, rx_packet);
434 if (rp->header.seq == call->rnext) {
436 register struct rx_connection *conn = call->conn;
439 /* RXS_CheckPacket called to undo RXS_PreparePacket's
440 * work. It may reduce the length of the packet by up
441 * to conn->maxTrailerSize, to reflect the length of the
442 * data + the header. */
443 if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
444 /* Used to merely shut down the call, but now we
445 * shut down the whole connection since this may
446 * indicate an attempt to hijack it */
448 MUTEX_EXIT(&call->lock);
449 rxi_ConnectionError(conn, error);
450 MUTEX_ENTER(&conn->conn_data_lock);
451 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
452 MUTEX_EXIT(&conn->conn_data_lock);
454 MUTEX_ENTER(&call->lock);
459 curp = call->currentPacket = rp;
460 call->curvec = 1; /* 0th vec is always header */
461 cur_iov = &curp->wirevec[1];
462 /* begin at the beginning [ more or less ], continue
463 * on until the end, then stop. */
464 call->curpos = (char *)curp->wirevec[1].iov_base
465 + call->conn->securityHeaderSize;
466 call->curlen = curp->wirevec[1].iov_len
467 - call->conn->securityHeaderSize;
469 /* Notice that this code works correctly if the data
470 * size is 0 (which it may be--no reply arguments from
471 * server, for example). This relies heavily on the
472 * fact that the code below immediately frees the packet
473 * (no yields, etc.). If it didn't, this would be a
474 * problem because a value of zero for call->nLeft
475 * normally means that there is no read packet */
476 call->nLeft = curp->length;
477 hadd32(call->bytesRcvd, curp->length);
479 /* Send a hard ack for every rxi_HardAckRate+1 packets
480 * consumed. Otherwise schedule an event to send
481 * the hard ack later on.
491 /* It's possible for call->nLeft to be smaller than any particular
492 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
493 * reflects the size of the buffer. We have to keep track of the
494 * number of bytes read in the length field of the packet struct. On
495 * the final portion of a received packet, it's almost certain that
496 * call->nLeft will be smaller than the final buffer. */
/* Unlike rxi_ReadProc, no data is copied here: iovec entries are pointed
 * directly at the packet buffers (zero-copy hand-off to the caller). */
497 while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
499 t = MIN((int)call->curlen, call->iovNBytes);
500 t = MIN(t, (int)call->nLeft);
501 call_iov->iov_base = call->curpos;
502 call_iov->iov_len = t;
505 call->iovNBytes -= t;
511 /* out of packet. Get another one. */
512 queue_Append(&call->iovq, curp);
513 curp = call->currentPacket = (struct rx_packet *)0;
515 else if (!call->curlen) {
516 /* need to get another struct iov */
517 if (++call->curvec >= curp->niovecs) {
518 /* current packet is exhausted, get ready for another */
519 /* don't worry about curvec and stuff, they get set somewhere else */
520 queue_Append(&call->iovq, curp);
521 curp = call->currentPacket = (struct rx_packet *)0;
526 call->curpos = (char *)cur_iov->iov_base;
527 call->curlen = cur_iov->iov_len;
533 /* If we consumed any packets then check whether we need to
534 * send a hard ack. */
535 if (didConsume && (!(call->flags &RX_CALL_RECEIVE_DONE))) {
536 if (call->nHardAcks > (u_short)rxi_HardAckRate) {
537 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
538 rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_DELAY, 0);
/* Otherwise schedule (or reschedule to an earlier time) a delayed ack. */
543 clock_GetTime(&when);
544 /* Delay to consolidate ack packets */
545 clock_Add(&when, &rx_hardAckDelay);
546 if (!call->delayedAckEvent ||
547 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
548 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
549 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
550 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
559 /* rxi_ReadvProc -- internal version.
561 * Fills in an iovec with pointers to the packet buffers. All packets
562 * except the last packet (new current packet) are moved to the iovq
563 * while the application is processing the data.
565 * LOCKS USED -- called at netpri with rx global lock and call->lock held.
 *
 * Returns nbytes - call->iovNBytes, i.e. the number of bytes made
 * available through the iovec on the visible return path.
 * NOTE(review): lines elided in this extract; comments describe only the
 * visible code. */
567 int rxi_ReadvProc(call, iov, nio, maxio, nbytes)
568 struct rx_call *call;
574 struct rx_packet *rp;
575 struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
579 requestCount = nbytes;
582 /* Free any packets from the last call to ReadvProc/WritevProc */
583 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
588 if (call->mode == RX_MODE_SENDING) {
589 rxi_FlushWrite(call);
596 /* Get whatever data is currently available in the receive queue.
597 * If rxi_FillReadVec sends an ack packet then it is possible
598 * that we will receive more data while we drop the call lock
599 * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
600 * here to avoid a race with the receive thread if we send
601 * hard acks in rxi_FillReadVec. */
602 call->flags |= RX_CALL_IOVEC_WAIT;
603 call->iovNBytes = nbytes;
604 call->iovMax = maxio;
607 rxi_FillReadVec(call, 0, 0, 0);
609 /* if we need more data then sleep until the receive thread has
610 * filled in the rest. */
611 if (!call->error && call->iovNBytes &&
612 call->iovNext < call->iovMax &&
613 !(call->flags & RX_CALL_RECEIVE_DONE)) {
614 call->flags |= RX_CALL_READER_WAIT;
616 call->startWait = clock_Sec();
/* Block until the receive thread clears RX_CALL_READER_WAIT. */
617 while (call->flags & RX_CALL_READER_WAIT) {
618 #ifdef RX_ENABLE_LOCKS
619 CV_WAIT(&call->cv_rq, &call->lock);
621 osi_rxSleep(&call->rq);
626 call->flags &= ~RX_CALL_IOVEC_WAIT;
627 #ifdef RX_ENABLE_LOCKS
631 #endif /* RX_ENABLE_LOCKS */
634 *nio = call->iovNext;
635 return nbytes - call->iovNBytes;
/* rx_ReadvProc -- external wrapper for rxi_ReadvProc: takes call->lock
 * around the internal version.
 * NOTE(review): declarations, NETPRI handling and the return are elided
 * in this extract. */
638 int rx_ReadvProc(call, iov, nio, maxio, nbytes)
639 struct rx_call *call;
650 MUTEX_ENTER(&call->lock);
651 bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
652 MUTEX_EXIT(&call->lock);
658 /* rxi_WriteProc -- internal version.
660 * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
/* Buffers up to nbytes from buf into send packets, queueing full packets
 * onto call->tq and kicking rxi_Start; returns requestCount - nbytes
 * (bytes consumed) on the visible return path.
 * NOTE(review): lines elided in this extract; comments describe only the
 * visible code. */
662 int rxi_WriteProc(call, buf, nbytes)
663 register struct rx_call *call;
667 struct rx_connection *conn = call->conn;
668 register struct rx_packet *cp = call->currentPacket;
669 register struct rx_packet *tp; /* Temporary packet pointer */
670 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
671 register unsigned int t;
672 int requestCount = nbytes;
674 /* Free any packets from the last call to ReadvProc/WritevProc */
675 if (!queue_IsEmpty(&call->iovq)) {
676 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
/* Server side: first write after receiving flips the call into send
 * mode and abandons any partially-read receive packet. */
682 if (call->mode != RX_MODE_SENDING) {
683 if ((conn->type == RX_SERVER_CONNECTION)
684 && (call->mode == RX_MODE_RECEIVING)) {
685 call->mode = RX_MODE_SENDING;
688 cp = call->currentPacket = (struct rx_packet *) 0;
698 /* Loop condition is checked at end, so that a write of 0 bytes
699 * will force a packet to be created--specially for the case where
700 * there are 0 bytes on the stream, but we must send a packet
703 if (call->nFree == 0) {
704 if (!call->error && cp) {
705 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
706 /* Wait until TQ_BUSY is reset before adding any
707 * packets to the transmit queue
709 while (call->flags & RX_CALL_TQ_BUSY) {
710 call->flags |= RX_CALL_TQ_WAIT;
711 #ifdef RX_ENABLE_LOCKS
712 CV_WAIT(&call->cv_tq, &call->lock);
713 #else /* RX_ENABLE_LOCKS */
714 osi_rxSleep(&call->tq);
715 #endif /* RX_ENABLE_LOCKS */
717 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
718 clock_NewTime(); /* Bogus: need new time package */
719 /* The 0, below, specifies that it is not the last packet:
720 * there will be others. PrepareSendPacket may
721 * alter the packet length by up to
722 * conn->securityMaxTrailerSize */
723 hadd32(call->bytesSent, cp->length);
724 rxi_PrepareSendPacket(call, cp, 0);
725 queue_Append(&call->tq, cp);
726 cp = call->currentPacket = NULL;
/* Don't restart transmission while fast recovery is in progress. */
727 if (!(call->flags & (RX_CALL_FAST_RECOVER|
728 RX_CALL_FAST_RECOVER_WAIT))) {
729 rxi_Start(0, call, 0);
732 /* Wait for transmit window to open up */
733 while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
735 call->startWait = clock_Sec();
737 #ifdef RX_ENABLE_LOCKS
738 CV_WAIT(&call->cv_twind, &call->lock);
740 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
741 osi_rxSleep(&call->twind);
745 #ifdef RX_ENABLE_LOCKS
749 #endif /* RX_ENABLE_LOCKS */
/* Allocate a fresh send packet and point the cursor at its first data
 * iovec, past the security header. */
751 if ((cp = rxi_AllocSendPacket(call, nbytes))) {
752 call->currentPacket = cp;
753 call->nFree = cp->length;
754 call->curvec = 1; /* 0th vec is always header */
755 /* begin at the beginning [ more or less ], continue
756 * on until the end, then stop. */
757 call->curpos = (char *)cp->wirevec[1].iov_base
758 + call->conn->securityHeaderSize;
759 call->curlen = cp->wirevec[1].iov_len
760 - call->conn->securityHeaderSize;
765 call->currentPacket = NULL;
771 if (cp && (int)call->nFree < nbytes) {
772 /* Try to extend the current buffer */
773 register int len, mud;
775 mud = rx_MaxUserDataSize(call);
778 want = MIN(nbytes - (int)call->nFree, mud - len);
779 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
780 if (cp->length > (unsigned)mud)
782 call->nFree += (cp->length - len);
786 /* If the remaining bytes fit in the buffer, then store them
787 * and return. Don't ship a buffer that's full immediately to
788 * the peer--we don't know if it's the last buffer yet */
794 while (nbytes && call->nFree) {
796 t = MIN((int)call->curlen, nbytes);
797 t = MIN((int)call->nFree, t);
798 bcopy (buf, call->curpos, t);
806 /* need to get another struct iov */
807 if (++call->curvec >= cp->niovecs) {
808 /* current packet is full, extend or send it */
811 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
812 call->curlen = cp->wirevec[call->curvec].iov_len;
815 } /* while bytes to send and room to send them */
817 /* might be out of space now */
821 else ; /* more data to send, so get another packet and keep going */
824 return requestCount - nbytes;
/* rx_WriteProc -- external write entry point.
 * Fast path: when the current send packet has room for all nbytes
 * (tcurlen >= nbytes && tnFree >= nbytes), copy without taking
 * call->lock; otherwise defer to rxi_WriteProc under the lock.
 * NOTE(review): lines elided in this extract (declarations and the
 * returns are not visible). */
827 int rx_WriteProc(call, buf, nbytes)
828 struct rx_call *call;
839 * Free any packets from the last call to ReadvProc/WritevProc.
840 * We do not need the lock because the receiver threads only
841 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
842 * RX_CALL_IOVEC_WAIT is always cleared before returning from
843 * ReadvProc/WritevProc.
845 if (!queue_IsEmpty(&call->iovq)) {
846 register struct rx_packet *rp;
847 register struct rx_packet *nxp;
848 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
855 * Most common case: all of the data fits in the current iovec.
856 * We do not need the lock because this is the only thread that
857 * updates the curlen, curpos, nFree fields.
859 * We are relying on nFree being zero unless the call is in send mode.
861 tcurlen = (int)call->curlen;
862 tnFree = (int)call->nFree;
863 if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
864 tcurpos = call->curpos;
865 bcopy(buf, tcurpos, nbytes);
866 call->curpos = tcurpos + nbytes;
867 call->curlen = tcurlen - nbytes;
868 call->nFree = tnFree - nbytes;
/* Slow path: locked internal writer handles allocation/transmission. */
874 MUTEX_ENTER(&call->lock);
875 bytes = rxi_WriteProc(call, buf, nbytes);
876 MUTEX_EXIT(&call->lock);
882 /* Optimization for marshalling 32 bit arguments */
/* rx_WriteProc32 -- writes exactly one afs_int32 to the call stream.
 * Mirrors rx_WriteProc's lock-free fast path, storing the word directly
 * when the cursor is 4-byte aligned and via bcopy otherwise; slow path
 * defers to rxi_WriteProc under call->lock.
 * NOTE(review): lines elided in this extract (declarations and the final
 * return are not visible). */
883 int rx_WriteProc32(call, value)
884 register struct rx_call *call;
885 register afs_int32 *value;
894 * Free any packets from the last call to ReadvProc/WritevProc.
895 * We do not need the lock because the receiver threads only
896 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
897 * RX_CALL_IOVEC_WAIT is always cleared before returning from
898 * ReadvProc/WritevProc.
900 if (!queue_IsEmpty(&call->iovq)) {
901 register struct rx_packet *rp;
902 register struct rx_packet *nxp;
903 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
910 * Most common case: all of the data fits in the current iovec.
911 * We do not need the lock because this is the only thread that
912 * updates the curlen, curpos, nFree fields.
914 * We are relying on nFree being zero unless the call is in send mode.
916 tcurlen = (int)call->curlen;
917 tnFree = (int)call->nFree;
918 if (!call->error && tcurlen >= sizeof(afs_int32) && tnFree >= sizeof(afs_int32)) {
919 tcurpos = call->curpos;
/* Aligned store fast path; unaligned falls back to bcopy. */
920 if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
921 *((afs_int32 *)(tcurpos)) = *value;
923 bcopy((char *)value, tcurpos, sizeof(afs_int32));
925 call->curpos = tcurpos + sizeof(afs_int32);
926 call->curlen = tcurlen - sizeof(afs_int32);
927 call->nFree = tnFree - sizeof(afs_int32);
928 return sizeof(afs_int32);
933 MUTEX_ENTER(&call->lock);
934 bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
935 MUTEX_EXIT(&call->lock);
941 /* rxi_WritevAlloc -- internal version.
943 * Fill in an iovec to point to data in packet buffers. The application
944 * calls rxi_WritevProc when the buffers are full.
946 * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
/* Returns requestCount - nbytes (bytes of buffer space handed out).
 * Newly allocated packets are parked on call->iovq until rxi_WritevProc
 * moves them to the transmit queue.
 * NOTE(review): lines elided in this extract; comments describe only the
 * visible code. */
948 int rxi_WritevAlloc(call, iov, nio, maxio, nbytes)
949 struct rx_call *call;
955 struct rx_connection *conn = call->conn;
956 struct rx_packet *cp = call->currentPacket;
957 struct rx_packet *tp; /* temporary packet pointer */
958 struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
961 /* Temporary values, real work is done in rxi_WritevProc */
967 requestCount = nbytes;
970 /* Free any packets from the last call to ReadvProc/WritevProc */
971 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
976 if (call->mode != RX_MODE_SENDING) {
977 if ((conn->type == RX_SERVER_CONNECTION)
978 && (call->mode == RX_MODE_RECEIVING)) {
979 call->mode = RX_MODE_SENDING;
982 cp = call->currentPacket = (struct rx_packet *) 0;
992 /* Set up the iovec to point to data in packet buffers. */
/* Work on shadow copies (tnFree/tcurvec/tcurpos/tcurlen) so call state
 * is not committed here; rxi_WritevProc does the real bookkeeping. */
993 tnFree = call->nFree;
994 tcurvec = call->curvec;
995 tcurpos = call->curpos;
996 tcurlen = call->curlen;
998 register unsigned int t;
1001 /* current packet is full, allocate a new one */
1002 cp = rxi_AllocSendPacket(call, nbytes);
1004 /* out of space, return what we have */
1006 return requestCount - nbytes;
1008 queue_Append(&call->iovq, cp);
1009 tnFree = cp->length;
1011 tcurpos = (char *)cp->wirevec[1].iov_base
1012 + call->conn->securityHeaderSize;
1013 tcurlen = cp->wirevec[1].iov_len
1014 - call->conn->securityHeaderSize;
1017 if (tnFree < nbytes) {
1018 /* try to extend the current packet */
1019 register int len, mud;
1021 mud = rx_MaxUserDataSize(call);
1024 want = MIN(nbytes - tnFree, mud - len);
1025 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1026 if (cp->length > (unsigned)mud)
1028 tnFree += (cp->length - len);
/* Keep call->nFree in sync only when extending the live current packet. */
1029 if (cp == call->currentPacket) {
1030 call->nFree += (cp->length - len);
1035 /* fill in the next entry in the iovec */
1036 t = MIN(tcurlen, nbytes);
1038 iov[nextio].iov_base = tcurpos;
1039 iov[nextio].iov_len = t;
1047 /* need to get another struct iov */
1048 if (++tcurvec >= cp->niovecs) {
1049 /* current packet is full, extend it or move on to next packet */
1052 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1053 tcurlen = cp->wirevec[tcurvec].iov_len;
1056 } while (nbytes && nextio < maxio);
1058 return requestCount - nbytes;
/* rx_WritevAlloc -- external wrapper for rxi_WritevAlloc: takes
 * call->lock around the internal version.
 * NOTE(review): declarations, NETPRI handling and the return are elided
 * in this extract. */
1061 int rx_WritevAlloc(call, iov, nio, maxio, nbytes)
1062 struct rx_call *call;
1073 MUTEX_ENTER(&call->lock);
1074 bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1075 MUTEX_EXIT(&call->lock);
/* rx_WritevInit -- prepares a call for iovec-based writes: drains any
 * leftover iovq packets, then performs a locked zero-length
 * rxi_WriteProc (which, per the rxi_WriteProc comments, forces a send
 * packet to exist / puts the call in send mode).
 * NOTE(review): declarations and the return are elided in this extract. */
1081 int rx_WritevInit(call)
1082 struct rx_call *call;
1088 * Free any packets from the last call to ReadvProc/WritevProc.
1089 * We do not need the lock because the receiver threads only
1090 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
1091 * RX_CALL_IOVEC_WAIT is always cleared before returning from
1092 * ReadvProc/WritevProc.
1094 if (!queue_IsEmpty(&call->iovq)) {
1095 register struct rx_packet *rp;
1096 register struct rx_packet *nxp;
1097 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
1105 MUTEX_ENTER(&call->lock);
1106 bytes = rxi_WriteProc(call, &bytes, 0);
1107 MUTEX_EXIT(&call->lock);
1113 /* rxi_WritevProc -- internal version.
1115 * Send buffers allocated in rxi_WritevAlloc.
1117 * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
/* Walks the caller's iovec, which must point exactly at the packet
 * buffers handed out by rxi_WritevAlloc, stages completed packets on a
 * local tmpq, then moves them to call->tq for transmission. Returns
 * requestCount - nbytes on the visible return path.
 * NOTE(review): lines elided in this extract; comments describe only the
 * visible code. */
1119 int rxi_WritevProc(call, iov, nio, nbytes)
1120 struct rx_call *call;
1125 struct rx_packet *cp = call->currentPacket;
1126 register struct rx_packet *tp; /* Temporary packet pointer */
1127 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1130 struct rx_queue tmpq;
1132 requestCount = nbytes;
1135 if (call->mode != RX_MODE_SENDING) {
1136 call->error = RX_PROTOCOL_ERROR;
1139 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1140 /* Wait until TQ_BUSY is reset before trying to move any
1141 * packets to the transmit queue. */
1142 while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1143 call->flags |= RX_CALL_TQ_WAIT;
1144 #ifdef RX_ENABLE_LOCKS
1145 CV_WAIT(&call->cv_tq, &call->lock);
1146 #else /* RX_ENABLE_LOCKS */
1147 osi_rxSleep(&call->tq);
1148 #endif /* RX_ENABLE_LOCKS */
1150 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1153 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1159 cp = call->currentPacket = NULL;
1164 /* Loop through the I/O vector adjusting packet pointers.
1165 * Place full packets back onto the iovq once they are ready
1166 * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1167 * the iovec. We put the loop condition at the end to ensure that
1168 * a zero length write will push a short packet. */
1172 if (call->nFree == 0 && cp) {
1173 clock_NewTime(); /* Bogus: need new time package */
1174 /* The 0, below, specifies that it is not the last packet:
1175 * there will be others. PrepareSendPacket may
1176 * alter the packet length by up to
1177 * conn->securityMaxTrailerSize */
1178 hadd32(call->bytesSent, cp->length);
1179 rxi_PrepareSendPacket(call, cp, 0);
1180 queue_Append(&tmpq, cp);
1182 /* The head of the iovq is now the current packet */
/* An exhausted iovq here means the caller supplied more data than
 * rxi_WritevAlloc allocated space for — protocol error; staged tmpq
 * packets are scanned (presumably freed — elided) before returning. */
1184 if (queue_IsEmpty(&call->iovq)) {
1185 call->error = RX_PROTOCOL_ERROR;
1186 cp = call->currentPacket = NULL;
1187 for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1193 cp = queue_First(&call->iovq, rx_packet);
1195 call->currentPacket = cp;
1196 call->nFree = cp->length;
1198 call->curpos = (char *)cp->wirevec[1].iov_base
1199 + call->conn->securityHeaderSize;
1200 call->curlen = cp->wirevec[1].iov_len
1201 - call->conn->securityHeaderSize;
1206 /* The next iovec should point to the current position */
1207 if (iov[nextio].iov_base != call->curpos
1208 || iov[nextio].iov_len > (int)call->curlen) {
1209 call->error = RX_PROTOCOL_ERROR;
1210 for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1216 call->currentPacket = NULL;
/* Consume the iovec entry: advance the cursor within the packet. */
1220 nbytes -= iov[nextio].iov_len;
1221 call->curpos += iov[nextio].iov_len;
1222 call->curlen -= iov[nextio].iov_len;
1223 call->nFree -= iov[nextio].iov_len;
1225 if (call->curlen == 0) {
1226 if (++call->curvec > cp->niovecs) {
1229 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1230 call->curlen = cp->wirevec[call->curvec].iov_len;
1234 } while (nbytes && nextio < nio);
1236 /* Move the packets from the temporary queue onto the transmit queue.
1237 * We may end up with more than call->twind packets on the queue. */
1238 for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1240 queue_Append(&call->tq, tp);
1243 if (!(call->flags & (RX_CALL_FAST_RECOVER|RX_CALL_FAST_RECOVER_WAIT))) {
1244 rxi_Start(0, call, 0);
1247 /* Wait for the length of the transmit queue to fall below call->twind */
1248 while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
1250 call->startWait = clock_Sec();
1251 #ifdef RX_ENABLE_LOCKS
1252 CV_WAIT(&call->cv_twind, &call->lock);
1254 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1255 osi_rxSleep(&call->twind);
1257 call->startWait = 0;
1263 cp = call->currentPacket = NULL;
1268 return requestCount - nbytes;
/* rx_WritevProc -- external wrapper for rxi_WritevProc: takes
 * call->lock around the internal version.
 * NOTE(review): declarations, NETPRI handling and the return are elided
 * in this extract. */
1271 int rx_WritevProc(call, iov, nio, nbytes)
1272 struct rx_call *call;
1282 MUTEX_ENTER(&call->lock);
1283 bytes = rxi_WritevProc(call, iov, nio, nbytes);
1284 MUTEX_EXIT(&call->lock);
1290 /* Flush any buffered data to the stream, switch to read mode
1291 * (clients) or to EOF mode (servers) */
/* Prepares the current (possibly empty) packet as the LAST packet of
 * the send stream and queues it for transmission.
 * NOTE(review): lines elided in this extract; comments describe only
 * the visible code. */
1292 void rxi_FlushWrite(call)
1293 register struct rx_call *call;
1295 register struct rx_packet *cp = call->currentPacket;
1296 register struct rx_packet *tp; /* Temporary packet pointer */
1297 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1299 /* Free any packets from the last call to ReadvProc/WritevProc */
1300 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
/* Only meaningful while sending; flips the call to RECEIVING (client)
 * or EOF (server) before pushing the final packet. */
1305 if (call->mode == RX_MODE_SENDING) {
1307 call->mode = (call->conn->type == RX_CLIENT_CONNECTION ?
1308 RX_MODE_RECEIVING: RX_MODE_EOF);
1310 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1311 /* Wait until TQ_BUSY is reset before adding any
1312 * packets to the transmit queue
1314 while (call->flags & RX_CALL_TQ_BUSY) {
1315 call->flags |= RX_CALL_TQ_WAIT;
1316 #ifdef RX_ENABLE_LOCKS
1317 CV_WAIT(&call->cv_tq, &call->lock);
1318 #else /* RX_ENABLE_LOCKS */
1319 osi_rxSleep(&call->tq);
1320 #endif /* RX_ENABLE_LOCKS */
1322 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1325 /* cp->length is only supposed to be the user's data */
1326 /* cp->length was already set to (then-current)
1327 * MaxUserDataSize or less. */
/* Trim unused space so the wire length reflects bytes actually written. */
1328 cp->length -= call->nFree;
1329 call->currentPacket = (struct rx_packet *) 0;
/* No current packet: allocate an empty one so the "last packet" marker
 * can still be sent. */
1333 cp = rxi_AllocSendPacket(call,0);
1335 /* Mode can no longer be MODE_SENDING */
1339 cp->niovecs = 1; /* just the header */
1343 /* The 1 specifies that this is the last packet */
1344 hadd32(call->bytesSent, cp->length);
1345 rxi_PrepareSendPacket(call, cp, 1);
1346 queue_Append(&call->tq, cp);
1347 if (!(call->flags & (RX_CALL_FAST_RECOVER|
1348 RX_CALL_FAST_RECOVER_WAIT))) {
1349 rxi_Start(0, call, 0);
1354 /* Flush any buffered data to the stream, switch to read mode
1355 * (clients) or to EOF mode (servers) */
/* rx_FlushWrite -- external wrapper: takes call->lock around
 * rxi_FlushWrite. NOTE(review): NETPRI handling and the closing brace
 * are elided in this extract. */
1356 void rx_FlushWrite(call)
1357 struct rx_call *call;
1362 MUTEX_ENTER(&call->lock);
1363 rxi_FlushWrite(call);
1364 MUTEX_EXIT(&call->lock);