2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 #include "../afs/param.h"
12 #include <afsconfig.h>
14 #if defined(AFS_DARWIN_ENV) || defined(AFS_FBSD_ENV)
15 #include "../afs/sysincludes.h"
17 #include "../h/types.h"
18 #include "../h/time.h"
19 #include "../h/stat.h"
21 #include <net/net_globals.h>
22 #endif /* AFS_OSF_ENV */
23 #ifdef AFS_LINUX20_ENV
24 #include "../h/socket.h"
26 #include "../netinet/in.h"
27 #if defined(AFS_SGI_ENV)
28 #include "../afs/sysincludes.h"
31 #include "../afs/afs_args.h"
32 #include "../afs/afs_osi.h"
33 #if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
34 #include "../h/systm.h"
37 #include "../afs/sysincludes.h"
40 #undef RXDEBUG /* turn off debugging */
43 #include "../rx/rx_kmutex.h"
44 #include "../rx/rx_kernel.h"
45 #include "../rx/rx_clock.h"
46 #include "../rx/rx_queue.h"
48 #include "../rx/rx_globals.h"
49 #include "../afs/lock.h"
50 #include "../afsint/afsint.h"
57 #endif /* AFS_ALPHA_ENV */
59 # include <afs/param.h>
60 # include <afsconfig.h>
61 # include <sys/types.h>
63 # include <sys/socket.h>
64 # include <sys/file.h>
66 # include <netinet/in.h>
67 # include <sys/stat.h>
68 # include <sys/time.h>
71 # include "rx_clock.h"
72 # include "rx_queue.h"
74 # include "rx_globals.h"
75 # include "rx_internal.h"
/* NOTE(review): this file-scope variable is guarded by an RX_LOCKS_DB
 * conditional whose opening #ifdef is not visible in this extract (only
 * the #endif below survives).  It tags lock-debugging records with the
 * identity of this source file. */
89 /* rxdb_fileID is used to identify the lock location, along with line#. */
90 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
91 #endif /* RX_LOCKS_DB */
/*
 * rxi_ReadProc -- internal read path: copies up to nbytes of received call
 * data into buf, pulling in-sequence packets off call->rq as needed and
 * scheduling/sending hard acks for consumed packets.
 *
 * NOTE(review): this listing is a partial extract.  Each line below begins
 * with its original source line number and the gaps in that numbering show
 * that many statements, braces and #else/#endif lines are missing, so the
 * control structure as shown does not pair up.  Do not edit logic from this
 * copy; recover the full file first.
 */
92 /* rxi_ReadProc -- internal version.
94 * LOCKS USED -- called at netpri with rx global lock and call->lock held.
96 int rxi_ReadProc(call, buf, nbytes)
97 register struct rx_call *call;
101 register struct rx_packet *cp = call->currentPacket;
102 register struct rx_packet *rp;
103 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
104 register int requestCount;
105 register unsigned int t;
106 /* XXXX took out clock_NewTime from here. Was it needed? */
107 requestCount = nbytes;
109 /* Free any packets from the last call to ReadvProc/WritevProc */
110 if (!queue_IsEmpty(&call->iovq)) {
111 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
118 if (call->nLeft == 0) {
119 /* Get next packet */
121 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
125 if (call->mode == RX_MODE_SENDING) {
126 rxi_FlushWrite(call);
130 if (queue_IsNotEmpty(&call->rq)) {
131 /* Check that next packet available is next in sequence */
132 rp = queue_First(&call->rq, rx_packet);
133 if (rp->header.seq == call->rnext) {
135 register struct rx_connection *conn = call->conn;
138 /* RXS_CheckPacket called to undo RXS_PreparePacket's
139 * work. It may reduce the length of the packet by up
140 * to conn->maxTrailerSize, to reflect the length of the
141 * data + the header. */
142 if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
143 /* Used to merely shut down the call, but now we
144 * shut down the whole connection since this may
145 * indicate an attempt to hijack it */
147 MUTEX_EXIT(&call->lock);
148 rxi_ConnectionError(conn, error);
149 MUTEX_ENTER(&conn->conn_data_lock);
150 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
151 MUTEX_EXIT(&conn->conn_data_lock);
153 MUTEX_ENTER(&call->lock);
158 cp = call->currentPacket = rp;
159 call->curvec = 1; /* 0th vec is always header */
160 /* begin at the beginning [ more or less ], continue
161 * on until the end, then stop. */
162 call->curpos = (char *)cp->wirevec[1].iov_base
163 + call->conn->securityHeaderSize;
164 call->curlen = cp->wirevec[1].iov_len
165 - call->conn->securityHeaderSize;
167 /* Notice that this code works correctly if the data
168 * size is 0 (which it may be--no reply arguments from
169 * server, for example). This relies heavily on the
170 * fact that the code below immediately frees the packet
171 * (no yields, etc.). If it didn't, this would be a
172 * problem because a value of zero for call->nLeft
173 * normally means that there is no read packet */
174 call->nLeft = cp->length;
175 hadd32(call->bytesRcvd, cp->length);
177 /* Send a hard ack for every rxi_HardAckRate+1 packets
178 * consumed. Otherwise schedule an event to send
179 * the hard ack later on.
182 if (!(call->flags &RX_CALL_RECEIVE_DONE)) {
183 if (call->nHardAcks > (u_short)rxi_HardAckRate) {
184 rxevent_Cancel(call->delayedAckEvent, call,
185 RX_CALL_REFCOUNT_DELAY);
186 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
190 clock_GetTime(&when);
191 /* Delay to consolidate ack packets */
192 clock_Add(&when, &rx_hardAckDelay);
193 if (!call->delayedAckEvent ||
194 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
195 rxevent_Cancel(call->delayedAckEvent, call,
196 RX_CALL_REFCOUNT_DELAY);
197 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
198 call->delayedAckEvent = rxevent_Post(&when,
209 MTUXXX doesn't there need to be an "else" here ???
211 /* Are there ever going to be any more packets? */
212 if (call->flags & RX_CALL_RECEIVE_DONE) {
213 return requestCount - nbytes;
215 /* Wait for in-sequence packet */
216 call->flags |= RX_CALL_READER_WAIT;
218 call->startWait = clock_Sec();
219 while (call->flags & RX_CALL_READER_WAIT) {
220 #ifdef RX_ENABLE_LOCKS
221 CV_WAIT(&call->cv_rq, &call->lock);
223 osi_rxSleep(&call->rq);
228 #ifdef RX_ENABLE_LOCKS
232 #endif /* RX_ENABLE_LOCKS */
235 else /* assert(cp); */ /* MTUXXX this should be replaced by some error-recovery code before shipping */
236 /* yes, the following block is allowed to be the ELSE clause (or not) */
238 /* It's possible for call->nLeft to be smaller than any particular
239 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
240 * reflects the size of the buffer. We have to keep track of the
241 * number of bytes read in the length field of the packet struct. On
242 * the final portion of a received packet, it's almost certain that
243 * call->nLeft will be smaller than the final buffer. */
245 while (nbytes && cp) {
246 t = MIN((int)call->curlen, nbytes);
247 t = MIN(t, (int)call->nLeft);
248 bcopy (call->curpos, buf, t);
256 /* out of packet. Get another one. */
258 cp = call->currentPacket = (struct rx_packet *)0;
260 else if (!call->curlen) {
261 /* need to get another struct iov */
262 if (++call->curvec >= cp->niovecs) {
263 /* current packet is exhausted, get ready for another */
264 /* don't worry about curvec and stuff, they get set somewhere else */
266 cp = call->currentPacket = (struct rx_packet *)0;
270 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
271 call->curlen = cp->wirevec[call->curvec].iov_len;
276 /* user buffer is full, return */
/*
 * rx_ReadProc -- public read entry point.  Fast path: when the request fits
 * entirely inside the current iovec (tcurlen/tnLeft both exceed nbytes) it
 * copies without taking call->lock; otherwise it falls through to the locked
 * rxi_ReadProc slow path.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line
 * and several lines (declarations, braces, return statements) are missing.
 */
285 int rx_ReadProc(call, buf, nbytes)
286 struct rx_call *call;
297 * Free any packets from the last call to ReadvProc/WritevProc.
298 * We do not need the lock because the receiver threads only
299 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
300 * RX_CALL_IOVEC_WAIT is always cleared before returning from
301 * ReadvProc/WritevProc.
303 if (!queue_IsEmpty(&call->iovq)) {
304 register struct rx_packet *rp;
305 register struct rx_packet *nxp;
306 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
313 * Most common case, all of the data is in the current iovec.
314 * We do not need the lock because this is the only thread that
315 * updates the curlen, curpos, nLeft fields.
317 * We are relying on nLeft being zero unless the call is in receive mode.
319 tcurlen = call->curlen;
320 tnLeft = call->nLeft;
321 if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
322 tcurpos = call->curpos;
323 bcopy(tcurpos, buf, nbytes);
324 call->curpos = tcurpos + nbytes;
325 call->curlen = tcurlen - nbytes;
326 call->nLeft = tnLeft - nbytes;
332 MUTEX_ENTER(&call->lock);
333 bytes = rxi_ReadProc(call, buf, nbytes);
334 MUTEX_EXIT(&call->lock);
/*
 * rx_ReadProc32 -- lock-free fast path for reading a single 32-bit value.
 * Uses an aligned direct load when tcurpos is afs_int32-aligned, bcopy
 * otherwise; falls back to the locked rxi_ReadProc when the value does not
 * fit in the current iovec.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line
 * and intervening lines are missing.
 */
340 /* Optimization for unmarshalling 32 bit integers */
341 int rx_ReadProc32(call, value)
342 struct rx_call *call;
352 * Free any packets from the last call to ReadvProc/WritevProc.
353 * We do not need the lock because the receiver threads only
354 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
355 * RX_CALL_IOVEC_WAIT is always cleared before returning from
356 * ReadvProc/WritevProc.
358 if (!queue_IsEmpty(&call->iovq)) {
359 register struct rx_packet *rp;
360 register struct rx_packet *nxp;
361 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
368 * Most common case, all of the data is in the current iovec.
369 * We do not need the lock because this is the only thread that
370 * updates the curlen, curpos, nLeft fields.
372 * We are relying on nLeft being zero unless the call is in receive mode.
374 tcurlen = call->curlen;
375 tnLeft = call->nLeft;
376 if (!call->error && tcurlen > sizeof(afs_int32) && tnLeft > sizeof(afs_int32)) {
377 tcurpos = call->curpos;
378 if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
379 *value = *((afs_int32 *)(tcurpos));
381 bcopy(tcurpos, (char *)value, sizeof(afs_int32));
383 call->curpos = tcurpos + sizeof(afs_int32);
384 call->curlen = tcurlen - sizeof(afs_int32);
385 call->nLeft = tnLeft - sizeof(afs_int32);
386 return sizeof(afs_int32);
391 MUTEX_ENTER(&call->lock);
392 bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
393 MUTEX_EXIT(&call->lock);
/*
 * rxi_FillReadVec -- fill call->iov with pointers directly into receive
 * packet buffers (zero-copy read).  Consumes in-sequence packets from
 * call->rq without blocking; consumed packets are appended to call->iovq.
 * Returns true if it sent an ack packet, else false.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line
 * and many lines are missing (e.g. the didConsume declaration/assignment
 * and several closing braces), so the structure shown is incomplete.
 */
401 * Uses packets in the receive queue to fill in as much of the
402 * current iovec as possible. Does not block if it runs out
403 * of packets to complete the iovec. Return true if an ack packet
404 * was sent, otherwise return false */
405 int rxi_FillReadVec(call, seq, serial, flags)
406 struct rx_call *call;
407 afs_uint32 seq, serial, flags;
411 register unsigned int t;
412 struct rx_packet *rp;
413 struct rx_packet *curp;
414 struct iovec *call_iov;
415 struct iovec *cur_iov = NULL;
417 curp = call->currentPacket;
419 cur_iov = &curp->wirevec[call->curvec];
421 call_iov = &call->iov[call->iovNext];
423 while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
424 if (call->nLeft == 0) {
425 /* Get next packet */
426 if (queue_IsNotEmpty(&call->rq)) {
427 /* Check that next packet available is next in sequence */
428 rp = queue_First(&call->rq, rx_packet);
429 if (rp->header.seq == call->rnext) {
431 register struct rx_connection *conn = call->conn;
434 /* RXS_CheckPacket called to undo RXS_PreparePacket's
435 * work. It may reduce the length of the packet by up
436 * to conn->maxTrailerSize, to reflect the length of the
437 * data + the header. */
438 if ((error = RXS_CheckPacket(conn->securityObject, call, rp))) {
439 /* Used to merely shut down the call, but now we
440 * shut down the whole connection since this may
441 * indicate an attempt to hijack it */
443 MUTEX_EXIT(&call->lock);
444 rxi_ConnectionError(conn, error);
445 MUTEX_ENTER(&conn->conn_data_lock);
446 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
447 MUTEX_EXIT(&conn->conn_data_lock);
449 MUTEX_ENTER(&call->lock);
454 curp = call->currentPacket = rp;
455 call->curvec = 1; /* 0th vec is always header */
456 cur_iov = &curp->wirevec[1];
457 /* begin at the beginning [ more or less ], continue
458 * on until the end, then stop. */
459 call->curpos = (char *)curp->wirevec[1].iov_base
460 + call->conn->securityHeaderSize;
461 call->curlen = curp->wirevec[1].iov_len
462 - call->conn->securityHeaderSize;
464 /* Notice that this code works correctly if the data
465 * size is 0 (which it may be--no reply arguments from
466 * server, for example). This relies heavily on the
467 * fact that the code below immediately frees the packet
468 * (no yields, etc.). If it didn't, this would be a
469 * problem because a value of zero for call->nLeft
470 * normally means that there is no read packet */
471 call->nLeft = curp->length;
472 hadd32(call->bytesRcvd, curp->length);
474 /* Send a hard ack for every rxi_HardAckRate+1 packets
475 * consumed. Otherwise schedule an event to send
476 * the hard ack later on.
486 /* It's possible for call->nLeft to be smaller than any particular
487 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
488 * reflects the size of the buffer. We have to keep track of the
489 * number of bytes read in the length field of the packet struct. On
490 * the final portion of a received packet, it's almost certain that
491 * call->nLeft will be smaller than the final buffer. */
492 while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
494 t = MIN((int)call->curlen, call->iovNBytes);
495 t = MIN(t, (int)call->nLeft);
496 call_iov->iov_base = call->curpos;
497 call_iov->iov_len = t;
500 call->iovNBytes -= t;
506 /* out of packet. Get another one. */
507 queue_Append(&call->iovq, curp);
508 curp = call->currentPacket = (struct rx_packet *)0;
510 else if (!call->curlen) {
511 /* need to get another struct iov */
512 if (++call->curvec >= curp->niovecs) {
513 /* current packet is exhausted, get ready for another */
514 /* don't worry about curvec and stuff, they get set somewhere else */
515 queue_Append(&call->iovq, curp);
516 curp = call->currentPacket = (struct rx_packet *)0;
521 call->curpos = (char *)cur_iov->iov_base;
522 call->curlen = cur_iov->iov_len;
528 /* If we consumed any packets then check whether we need to
529 * send a hard ack. */
530 if (didConsume && (!(call->flags &RX_CALL_RECEIVE_DONE))) {
531 if (call->nHardAcks > (u_short)rxi_HardAckRate) {
532 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
533 rxi_SendAck(call, 0, seq, serial, flags, RX_ACK_DELAY, 0);
538 clock_GetTime(&when);
539 /* Delay to consolidate ack packets */
540 clock_Add(&when, &rx_hardAckDelay);
541 if (!call->delayedAckEvent ||
542 clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
543 rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
544 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
545 call->delayedAckEvent = rxevent_Post(&when, rxi_SendDelayedAck,
/*
 * rxi_ReadvProc -- internal scatter-read.  Sets RX_CALL_IOVEC_WAIT, calls
 * rxi_FillReadVec to populate the caller's iovec from received packets, and
 * sleeps (READER_WAIT) until the receive thread supplies the remainder or
 * the call hits error/RECEIVE_DONE.  Returns bytes delivered.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line;
 * declarations, braces and #else lines are missing from this copy.
 */
554 /* rxi_ReadvProc -- internal version.
556 * Fills in an iovec with pointers to the packet buffers. All packets
557 * except the last packet (new current packet) are moved to the iovq
558 * while the application is processing the data.
560 * LOCKS USED -- called at netpri with rx global lock and call->lock held.
562 int rxi_ReadvProc(call, iov, nio, maxio, nbytes)
563 struct rx_call *call;
569 struct rx_packet *rp;
570 struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
574 requestCount = nbytes;
577 /* Free any packets from the last call to ReadvProc/WritevProc */
578 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
583 if (call->mode == RX_MODE_SENDING) {
584 rxi_FlushWrite(call);
591 /* Get whatever data is currently available in the receive queue.
592 * If rxi_FillReadVec sends an ack packet then it is possible
593 * that we will receive more data while we drop the call lock
594 * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
595 * here to avoid a race with the receive thread if we send
596 * hard acks in rxi_FillReadVec. */
597 call->flags |= RX_CALL_IOVEC_WAIT;
598 call->iovNBytes = nbytes;
599 call->iovMax = maxio;
602 rxi_FillReadVec(call, 0, 0, 0);
604 /* if we need more data then sleep until the receive thread has
605 * filled in the rest. */
606 if (!call->error && call->iovNBytes &&
607 call->iovNext < call->iovMax &&
608 !(call->flags & RX_CALL_RECEIVE_DONE)) {
609 call->flags |= RX_CALL_READER_WAIT;
611 call->startWait = clock_Sec();
612 while (call->flags & RX_CALL_READER_WAIT) {
613 #ifdef RX_ENABLE_LOCKS
614 CV_WAIT(&call->cv_rq, &call->lock);
616 osi_rxSleep(&call->rq);
621 call->flags &= ~RX_CALL_IOVEC_WAIT;
622 #ifdef RX_ENABLE_LOCKS
626 #endif /* RX_ENABLE_LOCKS */
629 *nio = call->iovNext;
630 return nbytes - call->iovNBytes;
/* rx_ReadvProc -- public scatter-read wrapper: takes call->lock and
 * delegates to rxi_ReadvProc.  (Partial extract: the declaration of
 * `bytes`, netpri handling and the return are not visible here.) */
633 int rx_ReadvProc(call, iov, nio, maxio, nbytes)
634 struct rx_call *call;
645 MUTEX_ENTER(&call->lock);
646 bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
647 MUTEX_EXIT(&call->lock);
/*
 * rxi_WriteProc -- internal write path: copies nbytes from buf into the
 * call's current send packet, queueing full packets onto call->tq (via
 * rxi_PrepareSendPacket + rxi_Start) and blocking while the transmit
 * window is full.  Returns the number of bytes accepted.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line
 * and many lines (loop braces, #else branches, window-wait bookkeeping)
 * are missing from this copy.
 */
653 /* rxi_WriteProc -- internal version.
655 * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
657 int rxi_WriteProc(call, buf, nbytes)
658 register struct rx_call *call;
662 struct rx_connection *conn = call->conn;
663 register struct rx_packet *cp = call->currentPacket;
664 register struct rx_packet *tp; /* Temporary packet pointer */
665 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
666 register unsigned int t;
667 int requestCount = nbytes;
669 /* Free any packets from the last call to ReadvProc/WritevProc */
670 if (!queue_IsEmpty(&call->iovq)) {
671 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
677 if (call->mode != RX_MODE_SENDING) {
678 if ((conn->type == RX_SERVER_CONNECTION)
679 && (call->mode == RX_MODE_RECEIVING)) {
680 call->mode = RX_MODE_SENDING;
683 cp = call->currentPacket = (struct rx_packet *) 0;
693 /* Loop condition is checked at end, so that a write of 0 bytes
694 * will force a packet to be created--specially for the case where
695 * there are 0 bytes on the stream, but we must send a packet
698 if (call->nFree == 0) {
699 if (!call->error && cp) {
700 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
701 /* Wait until TQ_BUSY is reset before adding any
702 * packets to the transmit queue
704 while (call->flags & RX_CALL_TQ_BUSY) {
705 call->flags |= RX_CALL_TQ_WAIT;
706 #ifdef RX_ENABLE_LOCKS
707 CV_WAIT(&call->cv_tq, &call->lock);
708 #else /* RX_ENABLE_LOCKS */
709 osi_rxSleep(&call->tq);
710 #endif /* RX_ENABLE_LOCKS */
712 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
713 clock_NewTime(); /* Bogus: need new time package */
714 /* The 0, below, specifies that it is not the last packet:
715 * there will be others. PrepareSendPacket may
716 * alter the packet length by up to
717 * conn->securityMaxTrailerSize */
718 hadd32(call->bytesSent, cp->length);
719 rxi_PrepareSendPacket(call, cp, 0);
720 queue_Append(&call->tq, cp);
721 cp = call->currentPacket = NULL;
722 if (!(call->flags & (RX_CALL_FAST_RECOVER|
723 RX_CALL_FAST_RECOVER_WAIT))) {
724 rxi_Start(0, call, 0);
727 /* Wait for transmit window to open up */
728 while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
730 call->startWait = clock_Sec();
732 #ifdef RX_ENABLE_LOCKS
733 CV_WAIT(&call->cv_twind, &call->lock);
735 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
736 osi_rxSleep(&call->twind);
740 #ifdef RX_ENABLE_LOCKS
744 #endif /* RX_ENABLE_LOCKS */
746 if ((cp = rxi_AllocSendPacket(call, nbytes))) {
747 call->currentPacket = cp;
748 call->nFree = cp->length;
749 call->curvec = 1; /* 0th vec is always header */
750 /* begin at the beginning [ more or less ], continue
751 * on until the end, then stop. */
752 call->curpos = (char *)cp->wirevec[1].iov_base
753 + call->conn->securityHeaderSize;
754 call->curlen = cp->wirevec[1].iov_len
755 - call->conn->securityHeaderSize;
760 call->currentPacket = NULL;
766 if (cp && (int)call->nFree < nbytes) {
767 /* Try to extend the current buffer */
768 register int len, mud;
770 mud = rx_MaxUserDataSize(call);
773 want = MIN(nbytes - (int)call->nFree, mud - len);
774 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
775 if (cp->length > (unsigned)mud)
777 call->nFree += (cp->length - len);
781 /* If the remaining bytes fit in the buffer, then store them
782 * and return. Don't ship a buffer that's full immediately to
783 * the peer--we don't know if it's the last buffer yet */
789 while (nbytes && call->nFree) {
791 t = MIN((int)call->curlen, nbytes);
792 t = MIN((int)call->nFree, t);
793 bcopy (buf, call->curpos, t);
801 /* need to get another struct iov */
802 if (++call->curvec >= cp->niovecs) {
803 /* current packet is full, extend or send it */
806 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
807 call->curlen = cp->wirevec[call->curvec].iov_len;
810 } /* while bytes to send and room to send them */
812 /* might be out of space now */
816 else ; /* more data to send, so get another packet and keep going */
819 return requestCount - nbytes;
/*
 * rx_WriteProc -- public write entry point.  Fast path: when nbytes fits in
 * the current send iovec (tcurlen/tnFree both >= nbytes) it copies without
 * taking call->lock; otherwise falls through to the locked rxi_WriteProc.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line
 * and intervening lines (declarations, fast-path return, final return)
 * are missing.
 */
822 int rx_WriteProc(call, buf, nbytes)
823 struct rx_call *call;
834 * Free any packets from the last call to ReadvProc/WritevProc.
835 * We do not need the lock because the receiver threads only
836 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
837 * RX_CALL_IOVEC_WAIT is always cleared before returning from
838 * ReadvProc/WritevProc.
840 if (!queue_IsEmpty(&call->iovq)) {
841 register struct rx_packet *rp;
842 register struct rx_packet *nxp;
843 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
850 * Most common case: all of the data fits in the current iovec.
851 * We do not need the lock because this is the only thread that
852 * updates the curlen, curpos, nFree fields.
854 * We are relying on nFree being zero unless the call is in send mode.
856 tcurlen = (int)call->curlen;
857 tnFree = (int)call->nFree;
858 if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
859 tcurpos = call->curpos;
860 bcopy(buf, tcurpos, nbytes);
861 call->curpos = tcurpos + nbytes;
862 call->curlen = tcurlen - nbytes;
863 call->nFree = tnFree - nbytes;
869 MUTEX_ENTER(&call->lock);
870 bytes = rxi_WriteProc(call, buf, nbytes);
871 MUTEX_EXIT(&call->lock);
/*
 * rx_WriteProc32 -- lock-free fast path for writing a single 32-bit value:
 * aligned direct store when tcurpos is afs_int32-aligned, bcopy otherwise;
 * falls back to the locked rxi_WriteProc when the value does not fit.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line
 * and intervening lines are missing.
 */
877 /* Optimization for marshalling 32 bit arguments */
878 int rx_WriteProc32(call, value)
879 register struct rx_call *call;
880 register afs_int32 *value;
889 * Free any packets from the last call to ReadvProc/WritevProc.
890 * We do not need the lock because the receiver threads only
891 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
892 * RX_CALL_IOVEC_WAIT is always cleared before returning from
893 * ReadvProc/WritevProc.
895 if (!queue_IsEmpty(&call->iovq)) {
896 register struct rx_packet *rp;
897 register struct rx_packet *nxp;
898 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
905 * Most common case: all of the data fits in the current iovec.
906 * We do not need the lock because this is the only thread that
907 * updates the curlen, curpos, nFree fields.
909 * We are relying on nFree being zero unless the call is in send mode.
911 tcurlen = (int)call->curlen;
912 tnFree = (int)call->nFree;
913 if (!call->error && tcurlen >= sizeof(afs_int32) && tnFree >= sizeof(afs_int32)) {
914 tcurpos = call->curpos;
915 if (!((long)tcurpos & (sizeof(afs_int32)-1))) {
916 *((afs_int32 *)(tcurpos)) = *value;
918 bcopy((char *)value, tcurpos, sizeof(afs_int32));
920 call->curpos = tcurpos + sizeof(afs_int32);
921 call->curlen = tcurlen - sizeof(afs_int32);
922 call->nFree = tnFree - sizeof(afs_int32);
923 return sizeof(afs_int32);
928 MUTEX_ENTER(&call->lock);
929 bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
930 MUTEX_EXIT(&call->lock);
/*
 * rxi_WritevAlloc -- internal gather-write setup.  Allocates send packets
 * (queued on call->iovq) and fills the caller's iovec with pointers into
 * their buffers; the application later commits them via rxi_WritevProc.
 * Uses shadow variables (tnFree/tcurvec/tcurpos/tcurlen) so call state is
 * not mutated for packets not yet committed.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line
 * and many lines (do{ opener, nextio bookkeeping, *nio store) are missing.
 */
936 /* rxi_WritevAlloc -- internal version.
938 * Fill in an iovec to point to data in packet buffers. The application
939 * calls rxi_WritevProc when the buffers are full.
941 * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
943 int rxi_WritevAlloc(call, iov, nio, maxio, nbytes)
944 struct rx_call *call;
950 struct rx_connection *conn = call->conn;
951 struct rx_packet *cp = call->currentPacket;
952 struct rx_packet *tp; /* temporary packet pointer */
953 struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
956 /* Temporary values, real work is done in rxi_WritevProc */
962 requestCount = nbytes;
965 /* Free any packets from the last call to ReadvProc/WritevProc */
966 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
971 if (call->mode != RX_MODE_SENDING) {
972 if ((conn->type == RX_SERVER_CONNECTION)
973 && (call->mode == RX_MODE_RECEIVING)) {
974 call->mode = RX_MODE_SENDING;
977 cp = call->currentPacket = (struct rx_packet *) 0;
987 /* Set up the iovec to point to data in packet buffers. */
988 tnFree = call->nFree;
989 tcurvec = call->curvec;
990 tcurpos = call->curpos;
991 tcurlen = call->curlen;
993 register unsigned int t;
996 /* current packet is full, allocate a new one */
997 cp = rxi_AllocSendPacket(call, nbytes);
999 /* out of space, return what we have */
1001 return requestCount - nbytes;
1003 queue_Append(&call->iovq, cp);
1004 tnFree = cp->length;
1006 tcurpos = (char *)cp->wirevec[1].iov_base
1007 + call->conn->securityHeaderSize;
1008 tcurlen = cp->wirevec[1].iov_len
1009 - call->conn->securityHeaderSize;
1012 if (tnFree < nbytes) {
1013 /* try to extend the current packet */
1014 register int len, mud;
1016 mud = rx_MaxUserDataSize(call);
1019 want = MIN(nbytes - tnFree, mud - len);
1020 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1021 if (cp->length > (unsigned)mud)
1023 tnFree += (cp->length - len);
1024 if (cp == call->currentPacket) {
1025 call->nFree += (cp->length - len);
1030 /* fill in the next entry in the iovec */
1031 t = MIN(tcurlen, nbytes);
1033 iov[nextio].iov_base = tcurpos;
1034 iov[nextio].iov_len = t;
1042 /* need to get another struct iov */
1043 if (++tcurvec >= cp->niovecs) {
1044 /* current packet is full, extend it or move on to next packet */
1047 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1048 tcurlen = cp->wirevec[tcurvec].iov_len;
1051 } while (nbytes && nextio < maxio);
1053 return requestCount - nbytes;
/* rx_WritevAlloc -- public wrapper: takes call->lock and delegates to
 * rxi_WritevAlloc.  (Partial extract: `bytes` declaration, netpri handling
 * and the return are not visible here.) */
1056 int rx_WritevAlloc(call, iov, nio, maxio, nbytes)
1057 struct rx_call *call;
1068 MUTEX_ENTER(&call->lock);
1069 bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1070 MUTEX_EXIT(&call->lock);
/*
 * rx_WritevInit -- prepare a call for gather-writes: frees leftover iovq
 * packets, then issues a zero-length rxi_WriteProc under call->lock
 * (a 0-byte write forces send-mode setup / packet creation per the
 * comment in rxi_WriteProc).
 *
 * NOTE(review): partial extract -- original line numbers prefix each line
 * and intervening lines are missing.
 */
1076 int rx_WritevInit(call)
1077 struct rx_call *call;
1083 * Free any packets from the last call to ReadvProc/WritevProc.
1084 * We do not need the lock because the receiver threads only
1085 * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
1086 * RX_CALL_IOVEC_WAIT is always cleared before returning from
1087 * ReadvProc/WritevProc.
1089 if (!queue_IsEmpty(&call->iovq)) {
1090 register struct rx_packet *rp;
1091 register struct rx_packet *nxp;
1092 for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
1100 MUTEX_ENTER(&call->lock);
1101 bytes = rxi_WriteProc(call, &bytes, 0);
1102 MUTEX_EXIT(&call->lock);
/*
 * rxi_WritevProc -- internal gather-write commit.  Walks the iovec produced
 * by rxi_WritevAlloc, validating that each entry matches the call's current
 * position; full packets are moved to a temporary queue, prepared, then
 * appended to the transmit queue.  Sets RX_PROTOCOL_ERROR on any mismatch
 * and blocks while the transmit window is full.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line;
 * queue_Init(&tmpq), the do{ opener, error cleanup bodies and several
 * braces are missing from this copy.
 */
1108 /* rxi_WritevProc -- internal version.
1110 * Send buffers allocated in rxi_WritevAlloc.
1112 * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
1114 int rxi_WritevProc(call, iov, nio, nbytes)
1115 struct rx_call *call;
1120 struct rx_packet *cp = call->currentPacket;
1121 register struct rx_packet *tp; /* Temporary packet pointer */
1122 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1125 struct rx_queue tmpq;
1127 requestCount = nbytes;
1130 if (call->mode != RX_MODE_SENDING) {
1131 call->error = RX_PROTOCOL_ERROR;
1134 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1135 /* Wait until TQ_BUSY is reset before trying to move any
1136 * packets to the transmit queue. */
1137 while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1138 call->flags |= RX_CALL_TQ_WAIT;
1139 #ifdef RX_ENABLE_LOCKS
1140 CV_WAIT(&call->cv_tq, &call->lock);
1141 #else /* RX_ENABLE_LOCKS */
1142 osi_rxSleep(&call->tq);
1143 #endif /* RX_ENABLE_LOCKS */
1145 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1148 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1154 cp = call->currentPacket = NULL;
1159 /* Loop through the I/O vector adjusting packet pointers.
1160 * Place full packets back onto the iovq once they are ready
1161 * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1162 * the iovec. We put the loop condition at the end to ensure that
1163 * a zero length write will push a short packet. */
1167 if (call->nFree == 0 && cp) {
1168 clock_NewTime(); /* Bogus: need new time package */
1169 /* The 0, below, specifies that it is not the last packet:
1170 * there will be others. PrepareSendPacket may
1171 * alter the packet length by up to
1172 * conn->securityMaxTrailerSize */
1173 hadd32(call->bytesSent, cp->length);
1174 rxi_PrepareSendPacket(call, cp, 0);
1175 queue_Append(&tmpq, cp);
1177 /* The head of the iovq is now the current packet */
1179 if (queue_IsEmpty(&call->iovq)) {
1180 call->error = RX_PROTOCOL_ERROR;
1181 cp = call->currentPacket = NULL;
1182 for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1188 cp = queue_First(&call->iovq, rx_packet);
1190 call->currentPacket = cp;
1191 call->nFree = cp->length;
1193 call->curpos = (char *)cp->wirevec[1].iov_base
1194 + call->conn->securityHeaderSize;
1195 call->curlen = cp->wirevec[1].iov_len
1196 - call->conn->securityHeaderSize;
1201 /* The next iovec should point to the current position */
1202 if (iov[nextio].iov_base != call->curpos
1203 || iov[nextio].iov_len > (int)call->curlen) {
1204 call->error = RX_PROTOCOL_ERROR;
1205 for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1211 call->currentPacket = NULL;
1215 nbytes -= iov[nextio].iov_len;
1216 call->curpos += iov[nextio].iov_len;
1217 call->curlen -= iov[nextio].iov_len;
1218 call->nFree -= iov[nextio].iov_len;
1220 if (call->curlen == 0) {
1221 if (++call->curvec > cp->niovecs) {
1224 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1225 call->curlen = cp->wirevec[call->curvec].iov_len;
1229 } while (nbytes && nextio < nio);
1231 /* Move the packets from the temporary queue onto the transmit queue.
1232 * We may end up with more than call->twind packets on the queue. */
1233 for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
1235 queue_Append(&call->tq, tp);
1238 if (!(call->flags & (RX_CALL_FAST_RECOVER|RX_CALL_FAST_RECOVER_WAIT))) {
1239 rxi_Start(0, call, 0);
1242 /* Wait for the length of the transmit queue to fall below call->twind */
1243 while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
1245 call->startWait = clock_Sec();
1246 #ifdef RX_ENABLE_LOCKS
1247 CV_WAIT(&call->cv_twind, &call->lock);
1249 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1250 osi_rxSleep(&call->twind);
1252 call->startWait = 0;
1258 cp = call->currentPacket = NULL;
1263 return requestCount - nbytes;
/* rx_WritevProc -- public gather-write wrapper: takes call->lock and
 * delegates to rxi_WritevProc.  (Partial extract: `bytes` declaration,
 * netpri handling and the return are not visible here.) */
1266 int rx_WritevProc(call, iov, nio, nbytes)
1267 struct rx_call *call;
1277 MUTEX_ENTER(&call->lock);
1278 bytes = rxi_WritevProc(call, iov, nio, nbytes);
1279 MUTEX_EXIT(&call->lock);
/*
 * rxi_FlushWrite -- push any buffered outgoing data as the call's last
 * packet and leave send mode: clients switch to RX_MODE_RECEIVING, servers
 * to RX_MODE_EOF.  If no partial packet exists, a fresh (possibly
 * header-only) packet is allocated so the "last packet" marker is sent.
 *
 * NOTE(review): partial extract -- original line numbers prefix each line;
 * the branch structure around the cp test and several braces are missing.
 */
1285 /* Flush any buffered data to the stream, switch to read mode
1286 * (clients) or to EOF mode (servers) */
1287 void rxi_FlushWrite(call)
1288 register struct rx_call *call;
1290 register struct rx_packet *cp = call->currentPacket;
1291 register struct rx_packet *tp; /* Temporary packet pointer */
1292 register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
1294 /* Free any packets from the last call to ReadvProc/WritevProc */
1295 for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
1300 if (call->mode == RX_MODE_SENDING) {
1302 call->mode = (call->conn->type == RX_CLIENT_CONNECTION ?
1303 RX_MODE_RECEIVING: RX_MODE_EOF);
1305 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1306 /* Wait until TQ_BUSY is reset before adding any
1307 * packets to the transmit queue
1309 while (call->flags & RX_CALL_TQ_BUSY) {
1310 call->flags |= RX_CALL_TQ_WAIT;
1311 #ifdef RX_ENABLE_LOCKS
1312 CV_WAIT(&call->cv_tq, &call->lock);
1313 #else /* RX_ENABLE_LOCKS */
1314 osi_rxSleep(&call->tq);
1315 #endif /* RX_ENABLE_LOCKS */
1317 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1320 /* cp->length is only supposed to be the user's data */
1321 /* cp->length was already set to (then-current)
1322 * MaxUserDataSize or less. */
1323 cp->length -= call->nFree;
1324 call->currentPacket = (struct rx_packet *) 0;
1328 cp = rxi_AllocSendPacket(call,0);
1330 /* Mode can no longer be MODE_SENDING */
1334 cp->niovecs = 1; /* just the header */
1338 /* The 1 specifies that this is the last packet */
1339 hadd32(call->bytesSent, cp->length);
1340 rxi_PrepareSendPacket(call, cp, 1);
1341 queue_Append(&call->tq, cp);
1342 if (!(call->flags & (RX_CALL_FAST_RECOVER|
1343 RX_CALL_FAST_RECOVER_WAIT))) {
1344 rxi_Start(0, call, 0);
/* rx_FlushWrite -- public wrapper: takes call->lock and delegates to
 * rxi_FlushWrite.  (Partial extract: netpri handling around the lock is
 * not visible here.) */
1349 /* Flush any buffered data to the stream, switch to read mode
1350 * (clients) or to EOF mode (servers) */
1351 void rx_FlushWrite(call)
1352 struct rx_call *call;
1357 MUTEX_ENTER(&call->lock);
1358 rxi_FlushWrite(call);
1359 MUTEX_EXIT(&call->lock);