2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
12 #include "afs/param.h"
14 #include <afs/param.h>
20 #ifdef RX_KERNEL_TRACE
21 #include "rx_kcommon.h"
23 #if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
24 #include "afs/sysincludes.h"
29 #if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
33 #include <net/net_globals.h>
34 #endif /* AFS_OSF_ENV */
35 #ifdef AFS_LINUX20_ENV
38 #include "netinet/in.h"
39 #if defined(AFS_SGI_ENV)
40 #include "afs/sysincludes.h"
43 #include "afs/afs_args.h"
44 #include "afs/afs_osi.h"
45 #if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
49 #include "afs/sysincludes.h"
52 #undef RXDEBUG /* turn off debugging */
55 #include "rx_kmutex.h"
56 #include "rx/rx_kernel.h"
57 #include "rx/rx_clock.h"
58 #include "rx/rx_queue.h"
60 #include "rx/rx_globals.h"
68 #endif /* AFS_OSF_ENV */
70 # include <sys/types.h>
72 # include <winsock2.h>
73 #else /* !AFS_NT40_ENV */
74 # include <sys/socket.h>
75 # include <sys/file.h>
77 # include <netinet/in.h>
78 # include <sys/stat.h>
79 # include <sys/time.h>
80 #endif /* !AFS_NT40_ENV */
86 # include "rx_clock.h"
87 # include "rx_queue.h"
89 # include "rx_globals.h"
/* File identifier for lock-location debugging (paired with __LINE__).
 * NOTE(review): the matching #ifdef RX_LOCKS_DB for the #endif below is
 * not visible in this extract -- confirm against the full source. */
93 /* rxdb_fileID is used to identify the lock location, along with line#. */
94 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
95 #endif /* RX_LOCKS_DB */
/*
 * rxi_ReadProc -- internal read path for an Rx call.
 *
 * Copies up to nbytes of received call data into buf: pulls in-sequence
 * packets (header.seq == call->rnext) off the receive queue call->rq,
 * runs the security layer's RXS_CheckPacket (aborting the whole
 * connection on failure), tracks position with call->curpos/curlen/
 * curvec/nLeft, and sends or schedules hard acks as packets are
 * consumed.  Returns the number of bytes copied (requestCount - nbytes
 * on the visible return path).  Called at netpri; takes call->lock.
 *
 * NOTE(review): this extract is incomplete -- the embedded original
 * line numbers are non-contiguous, so interior statements (conditions,
 * closing braces, assignments) are missing.  Comments only; no code is
 * altered here.
 */
96 /* rxi_ReadProc -- internal version.
98 * LOCKS USED -- called at netpri
101 rxi_ReadProc(struct rx_call *call, char *buf,
104 struct rx_packet *cp = call->currentPacket;
105 struct rx_packet *rp;
109 /* XXXX took out clock_NewTime from here. Was it needed? */
110 requestCount = nbytes;
/* Drop any leftover packets queued by a previous ReadvProc/WritevProc. */
112 /* Free any packets from the last call to ReadvProc/WritevProc */
113 if (queue_IsNotEmpty(&call->iovq)) {
114 #ifdef RXDEBUG_PACKET
116 #endif /* RXDEBUG_PACKET */
117 rxi_FreePackets(0, &call->iovq);
121 if (call->nLeft == 0) {
122 /* Get next packet */
123 MUTEX_ENTER(&call->lock);
125 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
127 call->mode = RX_MODE_ERROR;
128 MUTEX_EXIT(&call->lock);
/* Still sending: flush buffered writes before switching to reading. */
131 if (call->mode == RX_MODE_SENDING) {
132 MUTEX_EXIT(&call->lock);
133 rxi_FlushWrite(call);
134 MUTEX_ENTER(&call->lock);
138 if (queue_IsNotEmpty(&call->rq)) {
139 /* Check that next packet available is next in sequence */
140 rp = queue_First(&call->rq, rx_packet);
141 if (rp->header.seq == call->rnext) {
143 struct rx_connection *conn = call->conn;
145 #ifdef RX_TRACK_PACKETS
146 rp->flags &= ~RX_PKTFLAG_RQ;
148 #ifdef RXDEBUG_PACKET
150 #endif /* RXDEBUG_PACKET */
152 /* RXS_CheckPacket called to undo RXS_PreparePacket's
153 * work. It may reduce the length of the packet by up
154 * to conn->maxTrailerSize, to reflect the length of the
155 * data + the header. */
157 RXS_CheckPacket(conn->securityObject, call,
159 /* Used to merely shut down the call, but now we
160 * shut down the whole connection since this may
161 * indicate an attempt to hijack it */
163 MUTEX_EXIT(&call->lock);
164 rxi_ConnectionError(conn, error);
165 MUTEX_ENTER(&conn->conn_data_lock);
166 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
167 MUTEX_EXIT(&conn->conn_data_lock);
/* Packet passed the security check: adopt it as the current packet. */
173 cp = call->currentPacket = rp;
174 #ifdef RX_TRACK_PACKETS
175 call->currentPacket->flags |= RX_PKTFLAG_CP;
177 call->curvec = 1; /* 0th vec is always header */
178 /* begin at the beginning [ more or less ], continue
179 * on until the end, then stop. */
/* Skip the security header at the front of the first data iovec. */
181 (char *)cp->wirevec[1].iov_base +
182 call->conn->securityHeaderSize;
184 cp->wirevec[1].iov_len -
185 call->conn->securityHeaderSize;
187 /* Notice that this code works correctly if the data
188 * size is 0 (which it may be--no reply arguments from
189 * server, for example). This relies heavily on the
190 * fact that the code below immediately frees the packet
191 * (no yields, etc.). If it didn't, this would be a
192 * problem because a value of zero for call->nLeft
193 * normally means that there is no read packet */
194 call->nLeft = cp->length;
195 hadd32(call->bytesRcvd, cp->length);
197 /* Send a hard ack for every rxi_HardAckRate+1 packets
198 * consumed. Otherwise schedule an event to send
199 * the hard ack later on.
202 if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
203 if (call->nHardAcks > (u_short) rxi_HardAckRate) {
204 rxevent_Cancel(call->delayedAckEvent, call,
205 RX_CALL_REFCOUNT_DELAY);
206 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
208 struct clock when, now;
211 /* Delay to consolidate ack packets */
212 clock_Add(&when, &rx_hardAckDelay);
213 if (!call->delayedAckEvent
214 || clock_Gt(&call->delayedAckEvent->
216 rxevent_Cancel(call->delayedAckEvent,
218 RX_CALL_REFCOUNT_DELAY);
219 MUTEX_ENTER(&rx_refcnt_mutex);
220 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
221 MUTEX_EXIT(&rx_refcnt_mutex);
222 call->delayedAckEvent =
223 rxevent_PostNow(&when, &now,
224 rxi_SendDelayedAck, call,
234 * If we reach this point either we have no packets in the
235 * receive queue or the next packet in the queue is not the
236 * one we are looking for. There is nothing else for us to
237 * do but wait for another packet to arrive.
240 /* Are there ever going to be any more packets? */
241 if (call->flags & RX_CALL_RECEIVE_DONE) {
242 MUTEX_EXIT(&call->lock);
243 return requestCount - nbytes;
245 /* Wait for in-sequence packet */
246 call->flags |= RX_CALL_READER_WAIT;
248 call->startWait = clock_Sec();
249 while (call->flags & RX_CALL_READER_WAIT) {
250 #ifdef RX_ENABLE_LOCKS
251 CV_WAIT(&call->cv_rq, &call->lock);
253 osi_rxSleep(&call->rq);
256 cp = call->currentPacket;
259 #ifdef RX_ENABLE_LOCKS
261 MUTEX_EXIT(&call->lock);
264 #endif /* RX_ENABLE_LOCKS */
266 MUTEX_EXIT(&call->lock);
269 /* MTUXXX this should be replaced by some error-recovery code before shipping */
270 /* yes, the following block is allowed to be the ELSE clause (or not) */
271 /* It's possible for call->nLeft to be smaller than any particular
272 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
273 * reflects the size of the buffer. We have to keep track of the
274 * number of bytes read in the length field of the packet struct. On
275 * the final portion of a received packet, it's almost certain that
276 * call->nLeft will be smaller than the final buffer. */
/* Copy loop: t = min(bytes left in iovec, caller bytes, bytes in packet). */
277 while (nbytes && cp) {
278 t = MIN((int)call->curlen, nbytes);
279 t = MIN(t, (int)call->nLeft);
280 memcpy(buf, call->curpos, t);
288 /* out of packet. Get another one. */
289 #ifdef RX_TRACK_PACKETS
290 call->currentPacket->flags &= ~RX_PKTFLAG_CP;
293 cp = call->currentPacket = (struct rx_packet *)0;
294 } else if (!call->curlen) {
295 /* need to get another struct iov */
296 if (++call->curvec >= cp->niovecs) {
297 /* current packet is exhausted, get ready for another */
298 /* don't worry about curvec and stuff, they get set somewhere else */
299 #ifdef RX_TRACK_PACKETS
300 call->currentPacket->flags &= ~RX_PKTFLAG_CP;
303 cp = call->currentPacket = (struct rx_packet *)0;
307 (char *)cp->wirevec[call->curvec].iov_base;
308 call->curlen = cp->wirevec[call->curvec].iov_len;
313 /* user buffer is full, return */
/*
 * rx_ReadProc -- public read entry point.
 * Fast path: when there is no call error and the request fits entirely
 * within the current iovec (tcurlen > nbytes && tnLeft > nbytes), copy
 * directly and advance curpos/curlen/nLeft without taking the slow
 * path; frees the current packet once nLeft hits zero.  Otherwise falls
 * through to rxi_ReadProc.
 * NOTE(review): extract is incomplete (embedded line numbers skip);
 * comments only, code unchanged.
 */
323 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
331 /* Free any packets from the last call to ReadvProc/WritevProc */
332 if (!queue_IsEmpty(&call->iovq)) {
333 #ifdef RXDEBUG_PACKET
335 #endif /* RXDEBUG_PACKET */
336 rxi_FreePackets(0, &call->iovq);
340 * Most common case, all of the data is in the current iovec.
341 * We are relying on nLeft being zero unless the call is in receive mode.
343 tcurlen = call->curlen;
344 tnLeft = call->nLeft;
345 if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
346 tcurpos = call->curpos;
347 memcpy(buf, tcurpos, nbytes);
349 call->curpos = tcurpos + nbytes;
350 call->curlen = tcurlen - nbytes;
351 call->nLeft = tnLeft - nbytes;
353 if (!call->nLeft && call->currentPacket != NULL) {
354 /* out of packet. Get another one. */
355 rxi_FreePacket(call->currentPacket);
356 call->currentPacket = (struct rx_packet *)0;
/* Slow path: defer to the internal reader. */
362 bytes = rxi_ReadProc(call, buf, nbytes);
/*
 * rx_ReadProc32 -- specialized read of exactly sizeof(afs_int32) bytes
 * into *value.  Fast path copies from the current iovec when at least
 * 4 bytes remain in both the iovec and the packet; otherwise falls
 * back to rxi_ReadProc.  Returns sizeof(afs_int32) on the visible
 * fast-path return.
 * NOTE(review): extract is incomplete (embedded line numbers skip);
 * comments only, code unchanged.
 */
367 /* Optimization for unmarshalling 32 bit integers */
369 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
377 /* Free any packets from the last call to ReadvProc/WritevProc */
378 if (!queue_IsEmpty(&call->iovq)) {
379 #ifdef RXDEBUG_PACKET
381 #endif /* RXDEBUG_PACKET */
382 rxi_FreePackets(0, &call->iovq);
386 * Most common case, all of the data is in the current iovec.
387 * We are relying on nLeft being zero unless the call is in receive mode.
389 tcurlen = call->curlen;
390 tnLeft = call->nLeft;
391 if (!call->error && tcurlen >= sizeof(afs_int32)
392 && tnLeft >= sizeof(afs_int32)) {
393 tcurpos = call->curpos;
395 memcpy((char *)value, tcurpos, sizeof(afs_int32));
397 call->curpos = tcurpos + sizeof(afs_int32);
398 call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
399 call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
400 if (!call->nLeft && call->currentPacket != NULL) {
401 /* out of packet. Get another one. */
402 rxi_FreePacket(call->currentPacket);
403 call->currentPacket = (struct rx_packet *)0;
405 return sizeof(afs_int32);
/* Slow path: defer to the internal reader. */
409 bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
/*
 * rxi_FillReadVec -- point call->iov entries at received packet data.
 * Consumes in-sequence packets from call->rq (security-checked via
 * RXS_CheckPacket), fills iovec slots up to call->iovMax /
 * call->iovNBytes without blocking, parks exhausted packets on
 * call->iovq, and sends or schedules a hard ack when packets were
 * consumed.  Per the original header comment, returns true iff an ack
 * packet was sent.
 * NOTE(review): extract is incomplete (embedded line numbers skip);
 * comments only, code unchanged.
 */
417 * Uses packets in the receive queue to fill in as much of the
418 * current iovec as possible. Does not block if it runs out
419 * of packets to complete the iovec. Return true if an ack packet
420 * was sent, otherwise return false */
422 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
427 struct rx_packet *rp;
428 struct rx_packet *curp;
429 struct iovec *call_iov;
430 struct iovec *cur_iov = NULL;
432 curp = call->currentPacket;
434 cur_iov = &curp->wirevec[call->curvec];
436 call_iov = &call->iov[call->iovNext];
438 while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
439 if (call->nLeft == 0) {
440 /* Get next packet */
441 if (queue_IsNotEmpty(&call->rq)) {
442 /* Check that next packet available is next in sequence */
443 rp = queue_First(&call->rq, rx_packet);
444 if (rp->header.seq == call->rnext) {
446 struct rx_connection *conn = call->conn;
448 #ifdef RX_TRACK_PACKETS
449 rp->flags &= ~RX_PKTFLAG_RQ;
451 #ifdef RXDEBUG_PACKET
453 #endif /* RXDEBUG_PACKET */
455 /* RXS_CheckPacket called to undo RXS_PreparePacket's
456 * work. It may reduce the length of the packet by up
457 * to conn->maxTrailerSize, to reflect the length of the
458 * data + the header. */
460 RXS_CheckPacket(conn->securityObject, call, rp))) {
461 /* Used to merely shut down the call, but now we
462 * shut down the whole connection since this may
463 * indicate an attempt to hijack it */
465 MUTEX_EXIT(&call->lock);
466 rxi_ConnectionError(conn, error);
467 MUTEX_ENTER(&conn->conn_data_lock);
468 rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
469 MUTEX_EXIT(&conn->conn_data_lock);
471 MUTEX_ENTER(&call->lock);
/* Packet accepted: make it the current packet. */
476 curp = call->currentPacket = rp;
477 #ifdef RX_TRACK_PACKETS
478 call->currentPacket->flags |= RX_PKTFLAG_CP;
480 call->curvec = 1; /* 0th vec is always header */
481 cur_iov = &curp->wirevec[1];
482 /* begin at the beginning [ more or less ], continue
483 * on until the end, then stop. */
/* Skip the security header in the first data iovec. */
485 (char *)curp->wirevec[1].iov_base +
486 call->conn->securityHeaderSize;
488 curp->wirevec[1].iov_len -
489 call->conn->securityHeaderSize;
491 /* Notice that this code works correctly if the data
492 * size is 0 (which it may be--no reply arguments from
493 * server, for example). This relies heavily on the
494 * fact that the code below immediately frees the packet
495 * (no yields, etc.). If it didn't, this would be a
496 * problem because a value of zero for call->nLeft
497 * normally means that there is no read packet */
498 call->nLeft = curp->length;
499 hadd32(call->bytesRcvd, curp->length);
501 /* Send a hard ack for every rxi_HardAckRate+1 packets
502 * consumed. Otherwise schedule an event to send
503 * the hard ack later on.
513 /* It's possible for call->nLeft to be smaller than any particular
514 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
515 * reflects the size of the buffer. We have to keep track of the
516 * number of bytes read in the length field of the packet struct. On
517 * the final portion of a received packet, it's almost certain that
518 * call->nLeft will be smaller than the final buffer. */
/* Fill iovec entries; t = min(iovec remainder, request, packet remainder). */
519 while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
521 t = MIN((int)call->curlen, call->iovNBytes);
522 t = MIN(t, (int)call->nLeft);
523 call_iov->iov_base = call->curpos;
524 call_iov->iov_len = t;
527 call->iovNBytes -= t;
533 /* out of packet. Get another one. */
534 #ifdef RX_TRACK_PACKETS
535 curp->flags &= ~RX_PKTFLAG_CP;
536 curp->flags |= RX_PKTFLAG_IOVQ;
/* Exhausted packet is parked on iovq; the reader frees it later. */
538 queue_Append(&call->iovq, curp);
539 #ifdef RXDEBUG_PACKET
541 #endif /* RXDEBUG_PACKET */
542 curp = call->currentPacket = (struct rx_packet *)0;
543 } else if (!call->curlen) {
544 /* need to get another struct iov */
545 if (++call->curvec >= curp->niovecs) {
546 /* current packet is exhausted, get ready for another */
547 /* don't worry about curvec and stuff, they get set somewhere else */
548 #ifdef RX_TRACK_PACKETS
549 curp->flags &= ~RX_PKTFLAG_CP;
550 curp->flags |= RX_PKTFLAG_IOVQ;
552 queue_Append(&call->iovq, curp);
553 #ifdef RXDEBUG_PACKET
555 #endif /* RXDEBUG_PACKET */
556 curp = call->currentPacket = (struct rx_packet *)0;
560 call->curpos = (char *)cur_iov->iov_base;
561 call->curlen = cur_iov->iov_len;
567 /* If we consumed any packets then check whether we need to
568 * send a hard ack. */
569 if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
570 if (call->nHardAcks > (u_short) rxi_HardAckRate) {
571 rxevent_Cancel(call->delayedAckEvent, call,
572 RX_CALL_REFCOUNT_DELAY);
573 rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
576 struct clock when, now;
579 /* Delay to consolidate ack packets */
580 clock_Add(&when, &rx_hardAckDelay);
581 if (!call->delayedAckEvent
582 || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
583 rxevent_Cancel(call->delayedAckEvent, call,
584 RX_CALL_REFCOUNT_DELAY);
585 MUTEX_ENTER(&rx_refcnt_mutex);
586 CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
587 MUTEX_EXIT(&rx_refcnt_mutex);
588 call->delayedAckEvent =
589 rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
/*
 * rxi_ReadvProc -- internal scatter-read: hands the application
 * pointers into packet buffers via rxi_FillReadVec, sleeping (reader
 * wait) until enough data arrives or the call completes/errors.
 * On the visible success path sets *nio = call->iovNext and computes
 * bytes = nbytes - call->iovNBytes.  Called at netpri; takes
 * call->lock.
 * NOTE(review): extract is incomplete (embedded line numbers skip);
 * comments only, code unchanged.
 */
597 /* rxi_ReadvProc -- internal version.
599 * Fills in an iovec with pointers to the packet buffers. All packets
600 * except the last packet (new current packet) are moved to the iovq
601 * while the application is processing the data.
603 * LOCKS USED -- called at netpri.
606 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
611 /* Free any packets from the last call to ReadvProc/WritevProc */
612 if (queue_IsNotEmpty(&call->iovq)) {
613 #ifdef RXDEBUG_PACKET
615 #endif /* RXDEBUG_PACKET */
616 rxi_FreePackets(0, &call->iovq);
619 if (call->mode == RX_MODE_SENDING) {
620 rxi_FlushWrite(call);
623 MUTEX_ENTER(&call->lock);
627 /* Get whatever data is currently available in the receive queue.
628 * If rxi_FillReadVec sends an ack packet then it is possible
629 * that we will receive more data while we drop the call lock
630 * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
631 * here to avoid a race with the receive thread if we send
632 * hard acks in rxi_FillReadVec. */
633 call->flags |= RX_CALL_IOVEC_WAIT;
634 call->iovNBytes = nbytes;
635 call->iovMax = maxio;
638 rxi_FillReadVec(call, 0);
640 /* if we need more data then sleep until the receive thread has
641 * filled in the rest. */
642 if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
643 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
644 call->flags |= RX_CALL_READER_WAIT;
646 call->startWait = clock_Sec();
647 while (call->flags & RX_CALL_READER_WAIT) {
648 #ifdef RX_ENABLE_LOCKS
649 CV_WAIT(&call->cv_rq, &call->lock);
651 osi_rxSleep(&call->rq);
656 call->flags &= ~RX_CALL_IOVEC_WAIT;
/* Report how many iovec slots were filled and how many bytes delivered. */
662 *nio = call->iovNext;
663 bytes = nbytes - call->iovNBytes;
664 MUTEX_EXIT(&call->lock);
668 MUTEX_EXIT(&call->lock);
669 call->mode = RX_MODE_ERROR;
/* rx_ReadvProc -- public scatter-read wrapper; delegates to
 * rxi_ReadvProc.  (Surrounding netpri/brace lines are missing from
 * this extract.) */
674 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
681 bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
/*
 * rxi_WriteProc -- internal write path for an Rx call.
 *
 * Copies nbytes from buf into send packets: when the current packet is
 * full (call->nFree == 0) it is prepared (rxi_PrepareSendPacket),
 * appended to the transmit queue call->tq, and transmission is kicked
 * via rxi_Start; blocks while the transmit window is closed
 * (tnext + 1 > tfirst + 2*twind) or TQ_BUSY is set (kernel global-lock
 * build).  Allocates a fresh packet with rxi_AllocSendPacket and
 * extends it with rxi_AllocDataBuf up to rx_MaxUserDataSize when the
 * remaining request does not fit.  Returns requestCount - nbytes.
 * Called at netpri; takes call->lock around queue manipulation.
 *
 * NOTE(review): this extract is incomplete -- embedded original line
 * numbers are non-contiguous, so interior statements are missing.
 * Comments only; no code is altered here.
 */
686 /* rxi_WriteProc -- internal version.
688 * LOCKS USED -- called at netpri
692 rxi_WriteProc(struct rx_call *call, char *buf,
695 struct rx_connection *conn = call->conn;
696 struct rx_packet *cp = call->currentPacket;
698 int requestCount = nbytes;
700 /* Free any packets from the last call to ReadvProc/WritevProc */
701 if (queue_IsNotEmpty(&call->iovq)) {
702 #ifdef RXDEBUG_PACKET
704 #endif /* RXDEBUG_PACKET */
705 rxi_FreePackets(0, &call->iovq);
/* Server calls switch from receiving to sending on first write. */
708 if (call->mode != RX_MODE_SENDING) {
709 if ((conn->type == RX_SERVER_CONNECTION)
710 && (call->mode == RX_MODE_RECEIVING)) {
711 call->mode = RX_MODE_SENDING;
713 #ifdef RX_TRACK_PACKETS
714 cp->flags &= ~RX_PKTFLAG_CP;
717 cp = call->currentPacket = (struct rx_packet *)0;
726 /* Loop condition is checked at end, so that a write of 0 bytes
727 * will force a packet to be created--specially for the case where
728 * there are 0 bytes on the stream, but we must send a packet
731 if (call->nFree == 0) {
732 MUTEX_ENTER(&call->lock);
734 call->mode = RX_MODE_ERROR;
735 if (!call->error && cp) {
736 /* Clear the current packet now so that if
737 * we are forced to wait and drop the lock
738 * the packet we are planning on using
741 #ifdef RX_TRACK_PACKETS
742 cp->flags &= ~RX_PKTFLAG_CP;
744 call->currentPacket = (struct rx_packet *)0;
745 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
746 /* Wait until TQ_BUSY is reset before adding any
747 * packets to the transmit queue
749 while (call->flags & RX_CALL_TQ_BUSY) {
750 call->flags |= RX_CALL_TQ_WAIT;
752 #ifdef RX_ENABLE_LOCKS
753 CV_WAIT(&call->cv_tq, &call->lock);
754 #else /* RX_ENABLE_LOCKS */
755 osi_rxSleep(&call->tq);
756 #endif /* RX_ENABLE_LOCKS */
758 if (call->tqWaiters == 0)
759 call->flags &= ~RX_CALL_TQ_WAIT;
761 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
762 clock_NewTime(); /* Bogus: need new time package */
763 /* The 0, below, specifies that it is not the last packet:
764 * there will be others. PrepareSendPacket may
765 * alter the packet length by up to
766 * conn->securityMaxTrailerSize */
767 hadd32(call->bytesSent, cp->length);
768 rxi_PrepareSendPacket(call, cp, 0);
769 #ifdef RX_TRACK_PACKETS
770 cp->flags |= RX_PKTFLAG_TQ;
772 queue_Append(&call->tq, cp);
773 #ifdef RXDEBUG_PACKET
775 #endif /* RXDEBUG_PACKET */
776 cp = (struct rx_packet *)0;
/* Kick the transmitter unless fast recovery is in progress. */
779 flags & (RX_CALL_FAST_RECOVER |
780 RX_CALL_FAST_RECOVER_WAIT))) {
781 rxi_Start(0, call, 0, 0);
784 #ifdef RX_TRACK_PACKETS
785 cp->flags &= ~RX_PKTFLAG_CP;
788 cp = call->currentPacket = (struct rx_packet *)0;
790 /* Wait for transmit window to open up */
792 && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
794 call->startWait = clock_Sec();
796 #ifdef RX_ENABLE_LOCKS
797 CV_WAIT(&call->cv_twind, &call->lock);
799 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
800 osi_rxSleep(&call->twind);
804 #ifdef RX_ENABLE_LOCKS
806 call->mode = RX_MODE_ERROR;
807 MUTEX_EXIT(&call->lock);
810 #endif /* RX_ENABLE_LOCKS */
/* Allocate a fresh send packet sized for the remaining request. */
812 if ((cp = rxi_AllocSendPacket(call, nbytes))) {
813 #ifdef RX_TRACK_PACKETS
814 cp->flags |= RX_PKTFLAG_CP;
816 call->currentPacket = cp;
817 call->nFree = cp->length;
818 call->curvec = 1; /* 0th vec is always header */
819 /* begin at the beginning [ more or less ], continue
820 * on until the end, then stop. */
822 (char *)cp->wirevec[1].iov_base +
823 call->conn->securityHeaderSize;
825 cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
828 call->mode = RX_MODE_ERROR;
830 #ifdef RX_TRACK_PACKETS
831 cp->flags &= ~RX_PKTFLAG_CP;
834 call->currentPacket = NULL;
836 MUTEX_EXIT(&call->lock);
839 MUTEX_EXIT(&call->lock);
842 if (cp && (int)call->nFree < nbytes) {
843 /* Try to extend the current buffer */
846 mud = rx_MaxUserDataSize(call);
849 want = MIN(nbytes - (int)call->nFree, mud - len);
850 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
851 if (cp->length > (unsigned)mud)
853 call->nFree += (cp->length - len);
857 /* If the remaining bytes fit in the buffer, then store them
858 * and return. Don't ship a buffer that's full immediately to
859 * the peer--we don't know if it's the last buffer yet */
/* Copy loop: t = min(iovec remainder, request, free space in packet). */
865 while (nbytes && call->nFree) {
867 t = MIN((int)call->curlen, nbytes);
868 t = MIN((int)call->nFree, t);
869 memcpy(call->curpos, buf, t);
873 call->curlen -= (u_short)t;
874 call->nFree -= (u_short)t;
877 /* need to get another struct iov */
878 if (++call->curvec >= cp->niovecs) {
879 /* current packet is full, extend or send it */
882 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
883 call->curlen = cp->wirevec[call->curvec].iov_len;
886 } /* while bytes to send and room to send them */
888 /* might be out of space now */
891 } else; /* more data to send, so get another packet and keep going */
894 return requestCount - nbytes;
/*
 * rx_WriteProc -- public write entry point.
 * Fast path: when there is no call error and nbytes fits in both the
 * current iovec (tcurlen) and the packet's free space (tnFree), copy
 * into the packet in place and advance curpos/curlen/nFree.  Otherwise
 * falls through to rxi_WriteProc.
 * NOTE(review): extract is incomplete (embedded line numbers skip);
 * comments only, code unchanged.
 */
898 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
906 /* Free any packets from the last call to ReadvProc/WritevProc */
907 if (queue_IsNotEmpty(&call->iovq)) {
908 #ifdef RXDEBUG_PACKET
910 #endif /* RXDEBUG_PACKET */
911 rxi_FreePackets(0, &call->iovq);
915 * Most common case: all of the data fits in the current iovec.
916 * We are relying on nFree being zero unless the call is in send mode.
918 tcurlen = (int)call->curlen;
919 tnFree = (int)call->nFree;
920 if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
921 tcurpos = call->curpos;
923 memcpy(tcurpos, buf, nbytes);
924 call->curpos = tcurpos + nbytes;
925 call->curlen = (u_short)(tcurlen - nbytes);
926 call->nFree = (u_short)(tnFree - nbytes);
/* Slow path: defer to the internal writer. */
931 bytes = rxi_WriteProc(call, buf, nbytes);
/*
 * rx_WriteProc32 -- specialized write of exactly sizeof(afs_int32)
 * bytes from *value.  Fast path stores directly when 4 bytes fit in
 * the current iovec and packet; uses an aligned store when tcurpos is
 * 4-byte aligned, memcpy otherwise.  Falls back to rxi_WriteProc.
 * NOTE(review): extract is incomplete (embedded line numbers skip);
 * comments only, code unchanged.
 */
936 /* Optimization for marshalling 32 bit arguments */
938 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
946 if (queue_IsNotEmpty(&call->iovq)) {
947 #ifdef RXDEBUG_PACKET
949 #endif /* RXDEBUG_PACKET */
950 rxi_FreePackets(0, &call->iovq);
954 * Most common case: all of the data fits in the current iovec.
955 * We are relying on nFree being zero unless the call is in send mode.
957 tcurlen = call->curlen;
958 tnFree = call->nFree;
959 if (!call->error && tcurlen >= sizeof(afs_int32)
960 && tnFree >= sizeof(afs_int32)) {
961 tcurpos = call->curpos;
/* Aligned destination: direct store avoids the memcpy call. */
963 if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
964 *((afs_int32 *) (tcurpos)) = *value;
966 memcpy(tcurpos, (char *)value, sizeof(afs_int32));
968 call->curpos = tcurpos + sizeof(afs_int32);
969 call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
970 call->nFree = (u_short)(tnFree - sizeof(afs_int32));
971 return sizeof(afs_int32);
/* Slow path: defer to the internal writer. */
975 bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
/*
 * rxi_WritevAlloc -- internal gather-write buffer allocation.
 * Fills iov[] with pointers into send-packet buffers (allocating new
 * packets with rxi_AllocSendPacket and parking them on call->iovq,
 * extending with rxi_AllocDataBuf up to rx_MaxUserDataSize as needed)
 * so the application can fill them and later call rxi_WritevProc.
 * Works on temporaries (tnFree/tcurvec/tcurpos/tcurlen) -- the real
 * call-state update happens in rxi_WritevProc.  Returns
 * requestCount - nbytes.
 * NOTE(review): extract is incomplete (embedded line numbers skip);
 * comments only, code unchanged.
 */
980 /* rxi_WritevAlloc -- internal version.
982 * Fill in an iovec to point to data in packet buffers. The application
983 * calls rxi_WritevProc when the buffers are full.
985 * LOCKS USED -- called at netpri.
989 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
992 struct rx_connection *conn = call->conn;
993 struct rx_packet *cp = call->currentPacket;
996 /* Temporary values, real work is done in rxi_WritevProc */
998 unsigned int tcurvec;
1002 requestCount = nbytes;
1005 /* Free any packets from the last call to ReadvProc/WritevProc */
1006 if (queue_IsNotEmpty(&call->iovq)) {
1007 #ifdef RXDEBUG_PACKET
1009 #endif /* RXDEBUG_PACKET */
1010 rxi_FreePackets(0, &call->iovq);
/* Server calls switch from receiving to sending on first write. */
1013 if (call->mode != RX_MODE_SENDING) {
1014 if ((conn->type == RX_SERVER_CONNECTION)
1015 && (call->mode == RX_MODE_RECEIVING)) {
1016 call->mode = RX_MODE_SENDING;
1018 #ifdef RX_TRACK_PACKETS
1019 cp->flags &= ~RX_PKTFLAG_CP;
1022 cp = call->currentPacket = (struct rx_packet *)0;
1031 /* Set up the iovec to point to data in packet buffers. */
1032 tnFree = call->nFree;
1033 tcurvec = call->curvec;
1034 tcurpos = call->curpos;
1035 tcurlen = call->curlen;
1040 /* current packet is full, allocate a new one */
1041 MUTEX_ENTER(&call->lock);
1042 cp = rxi_AllocSendPacket(call, nbytes);
1043 MUTEX_EXIT(&call->lock);
1045 /* out of space, return what we have */
1047 return requestCount - nbytes;
1049 #ifdef RX_TRACK_PACKETS
1050 cp->flags |= RX_PKTFLAG_IOVQ;
/* New packet rides on iovq until rxi_WritevProc moves it to tq. */
1052 queue_Append(&call->iovq, cp);
1053 #ifdef RXDEBUG_PACKET
1055 #endif /* RXDEBUG_PACKET */
1056 tnFree = cp->length;
1059 (char *)cp->wirevec[1].iov_base +
1060 call->conn->securityHeaderSize;
1061 tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1064 if (tnFree < nbytes) {
1065 /* try to extend the current packet */
1068 mud = rx_MaxUserDataSize(call);
1071 want = MIN(nbytes - tnFree, mud - len);
1072 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
1073 if (cp->length > (unsigned)mud)
1075 tnFree += (cp->length - len);
1076 if (cp == call->currentPacket) {
1077 call->nFree += (cp->length - len);
1082 /* fill in the next entry in the iovec */
1083 t = MIN(tcurlen, nbytes);
1085 iov[nextio].iov_base = tcurpos;
1086 iov[nextio].iov_len = t;
1094 /* need to get another struct iov */
1095 if (++tcurvec >= cp->niovecs) {
1096 /* current packet is full, extend it or move on to next packet */
1099 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
1100 tcurlen = cp->wirevec[tcurvec].iov_len;
1103 } while (nbytes && nextio < maxio);
1105 return requestCount - nbytes;
/* rx_WritevAlloc -- public gather-write allocation wrapper; delegates
 * to rxi_WritevAlloc.  (Surrounding netpri/brace lines are missing
 * from this extract.) */
1109 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1116 bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
/*
 * rxi_WritevProc -- internal gather-write commit.
 *
 * Consumes the iovec previously populated by rxi_WritevAlloc: walks
 * iov[] validating that each entry matches the expected call position
 * (iov_base == call->curpos and iov_len <= call->curlen, else
 * RX_PROTOCOL_ERROR), prepares full packets (rxi_PrepareSendPacket)
 * onto a local tmpq, splices tmpq onto the transmit queue call->tq,
 * kicks rxi_Start unless fast recovery is active, and blocks while the
 * transmit window is closed.  Returns requestCount - nbytes.  Called
 * at netpri; holds call->lock.
 *
 * NOTE(review): this extract is incomplete -- embedded original line
 * numbers are non-contiguous, so interior statements are missing.
 * Comments only; no code is altered here.
 */
1121 /* rxi_WritevProc -- internal version.
1123 * Send buffers allocated in rxi_WritevAlloc.
1125 * LOCKS USED -- called at netpri.
1128 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1130 struct rx_packet *cp = NULL;
1131 #ifdef RX_TRACK_PACKETS
1132 struct rx_packet *p, *np;
1136 struct rx_queue tmpq;
1137 #ifdef RXDEBUG_PACKET
1141 requestCount = nbytes;
1144 MUTEX_ENTER(&call->lock);
1146 call->mode = RX_MODE_ERROR;
1147 } else if (call->mode != RX_MODE_SENDING) {
1148 call->error = RX_PROTOCOL_ERROR;
1150 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1151 /* Wait until TQ_BUSY is reset before trying to move any
1152 * packets to the transmit queue. */
1153 while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
1154 call->flags |= RX_CALL_TQ_WAIT;
1156 #ifdef RX_ENABLE_LOCKS
1157 CV_WAIT(&call->cv_tq, &call->lock);
1158 #else /* RX_ENABLE_LOCKS */
1159 osi_rxSleep(&call->tq);
1160 #endif /* RX_ENABLE_LOCKS */
1162 if (call->tqWaiters == 0)
1163 call->flags &= ~RX_CALL_TQ_WAIT;
1165 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1166 cp = call->currentPacket;
/* Error bail-out: return the current packet to iovq and free everything. */
1169 call->mode = RX_MODE_ERROR;
1170 MUTEX_EXIT(&call->lock);
1172 #ifdef RX_TRACK_PACKETS
1173 cp->flags &= ~RX_PKTFLAG_CP;
1174 cp->flags |= RX_PKTFLAG_IOVQ;
1176 queue_Prepend(&call->iovq, cp);
1177 #ifdef RXDEBUG_PACKET
1179 #endif /* RXDEBUG_PACKET */
1181 #ifdef RXDEBUG_PACKET
1183 #endif /* RXDEBUG_PACKET */
1184 rxi_FreePackets(0, &call->iovq);
1188 /* Loop through the I/O vector adjusting packet pointers.
1189 * Place full packets back onto the iovq once they are ready
1190 * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1191 * the iovec. We put the loop condition at the end to ensure that
1192 * a zero length write will push a short packet. */
1195 #ifdef RXDEBUG_PACKET
1197 #endif /* RXDEBUG_PACKET */
1199 if (call->nFree == 0 && cp) {
1200 clock_NewTime(); /* Bogus: need new time package */
1201 /* The 0, below, specifies that it is not the last packet:
1202 * there will be others. PrepareSendPacket may
1203 * alter the packet length by up to
1204 * conn->securityMaxTrailerSize */
1205 hadd32(call->bytesSent, cp->length);
1206 rxi_PrepareSendPacket(call, cp, 0);
1207 queue_Append(&tmpq, cp);
1208 #ifdef RXDEBUG_PACKET
1210 #endif /* RXDEBUG_PACKET */
1211 cp = call->currentPacket = (struct rx_packet *)0;
1213 /* The head of the iovq is now the current packet */
1215 if (queue_IsEmpty(&call->iovq)) {
1216 MUTEX_EXIT(&call->lock);
1217 call->error = RX_PROTOCOL_ERROR;
1218 #ifdef RXDEBUG_PACKET
1220 #endif /* RXDEBUG_PACKET */
1221 rxi_FreePackets(0, &tmpq);
1224 cp = queue_First(&call->iovq, rx_packet);
1226 #ifdef RX_TRACK_PACKETS
1227 cp->flags &= ~RX_PKTFLAG_IOVQ;
1229 #ifdef RXDEBUG_PACKET
1231 #endif /* RXDEBUG_PACKET */
1232 #ifdef RX_TRACK_PACKETS
1233 cp->flags |= RX_PKTFLAG_CP;
1235 call->currentPacket = cp;
1236 call->nFree = cp->length;
1239 (char *)cp->wirevec[1].iov_base +
1240 call->conn->securityHeaderSize;
1242 cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
1247 /* The next iovec should point to the current position */
1248 if (iov[nextio].iov_base != call->curpos
1249 || iov[nextio].iov_len > (int)call->curlen) {
1250 call->error = RX_PROTOCOL_ERROR;
1251 MUTEX_EXIT(&call->lock);
1253 #ifdef RX_TRACK_PACKETS
1254 cp->flags &= ~RX_PKTFLAG_CP;
1256 queue_Prepend(&tmpq, cp);
1257 #ifdef RXDEBUG_PACKET
1259 #endif /* RXDEBUG_PACKET */
1260 cp = call->currentPacket = (struct rx_packet *)0;
1262 #ifdef RXDEBUG_PACKET
1264 #endif /* RXDEBUG_PACKET */
1265 rxi_FreePackets(0, &tmpq);
/* Entry validated: advance the call position by this iovec entry. */
1268 nbytes -= iov[nextio].iov_len;
1269 call->curpos += iov[nextio].iov_len;
1270 call->curlen -= iov[nextio].iov_len;
1271 call->nFree -= iov[nextio].iov_len;
1273 if (call->curlen == 0) {
1274 if (++call->curvec > cp->niovecs) {
1277 call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
1278 call->curlen = cp->wirevec[call->curvec].iov_len;
1282 } while (nbytes && nextio < nio);
1284 /* Move the packets from the temporary queue onto the transmit queue.
1285 * We may end up with more than call->twind packets on the queue. */
1287 #ifdef RX_TRACK_PACKETS
1288 for (queue_Scan(&tmpq, p, np, rx_packet))
1290 p->flags |= RX_PKTFLAG_TQ;
1295 call->mode = RX_MODE_ERROR;
1297 queue_SpliceAppend(&call->tq, &tmpq);
1299 if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1300 rxi_Start(0, call, 0, 0);
1303 /* Wait for the length of the transmit queue to fall below call->twind */
1304 while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1306 call->startWait = clock_Sec();
1307 #ifdef RX_ENABLE_LOCKS
1308 CV_WAIT(&call->cv_twind, &call->lock);
1310 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1311 osi_rxSleep(&call->twind);
1313 call->startWait = 0;
1316 /* cp is no longer valid since we may have given up the lock */
1317 cp = call->currentPacket;
1320 call->mode = RX_MODE_ERROR;
1321 call->currentPacket = NULL;
1322 MUTEX_EXIT(&call->lock);
1324 #ifdef RX_TRACK_PACKETS
1325 cp->flags &= ~RX_PKTFLAG_CP;
1331 MUTEX_EXIT(&call->lock);
1333 return requestCount - nbytes;
/* rx_WritevProc -- public gather-write commit wrapper; delegates to
 * rxi_WritevProc.  (Surrounding netpri/brace lines are missing from
 * this extract.) */
1337 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1343 bytes = rxi_WritevProc(call, iov, nio, nbytes);
/*
 * rxi_FlushWrite -- flush any buffered send data for the call.
 * When the call is in sending mode: trims the current packet to the
 * user data actually written (cp->length -= call->nFree) or allocates
 * an empty 2-iovec packet if there is none, marks it as the LAST
 * packet via rxi_PrepareSendPacket(call, cp, 1), appends it to the
 * transmit queue, and kicks rxi_Start unless fast recovery is active.
 * Per the original header comment, the call then switches to
 * RX_MODE_RECEIVING (client) or RX_MODE_EOF (server).
 * NOTE(review): extract is incomplete (embedded line numbers skip);
 * comments only, code unchanged.
 */
1348 /* Flush any buffered data to the stream, switch to read mode
1349 * (clients) or to EOF mode (servers)
1351 * LOCKS HELD: called at netpri.
1354 rxi_FlushWrite(struct rx_call *call)
1356 struct rx_packet *cp = NULL;
1358 /* Free any packets from the last call to ReadvProc/WritevProc */
1359 if (queue_IsNotEmpty(&call->iovq)) {
1360 #ifdef RXDEBUG_PACKET
1362 #endif /* RXDEBUG_PACKET */
1363 rxi_FreePackets(0, &call->iovq);
1366 if (call->mode == RX_MODE_SENDING) {
1369 (call->conn->type ==
1370 RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1372 #ifdef RX_KERNEL_TRACE
1374 int glockOwner = ISAFS_GLOCK();
1377 afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1378 __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1385 MUTEX_ENTER(&call->lock);
1386 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1387 /* Wait until TQ_BUSY is reset before adding any
1388 * packets to the transmit queue
1390 while (call->flags & RX_CALL_TQ_BUSY) {
1391 call->flags |= RX_CALL_TQ_WAIT;
1393 #ifdef RX_ENABLE_LOCKS
1394 CV_WAIT(&call->cv_tq, &call->lock);
1395 #else /* RX_ENABLE_LOCKS */
1396 osi_rxSleep(&call->tq);
1397 #endif /* RX_ENABLE_LOCKS */
1399 if (call->tqWaiters == 0)
1400 call->flags &= ~RX_CALL_TQ_WAIT;
1402 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1405 call->mode = RX_MODE_ERROR;
1407 cp = call->currentPacket;
1410 /* cp->length is only supposed to be the user's data */
1411 /* cp->length was already set to (then-current)
1412 * MaxUserDataSize or less. */
1413 #ifdef RX_TRACK_PACKETS
1414 cp->flags &= ~RX_PKTFLAG_CP;
1416 cp->length -= call->nFree;
1417 call->currentPacket = (struct rx_packet *)0;
/* No partial packet buffered: send an empty final packet instead. */
1420 cp = rxi_AllocSendPacket(call, 0);
1422 /* Mode can no longer be MODE_SENDING */
1426 cp->niovecs = 2; /* header + space for rxkad stuff */
1430 /* The 1 specifies that this is the last packet */
1431 hadd32(call->bytesSent, cp->length);
1432 rxi_PrepareSendPacket(call, cp, 1);
1433 #ifdef RX_TRACK_PACKETS
1434 cp->flags |= RX_PKTFLAG_TQ;
1436 queue_Append(&call->tq, cp);
1437 #ifdef RXDEBUG_PACKET
1439 #endif /* RXDEBUG_PACKET */
1442 flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
1443 rxi_Start(0, call, 0, 0);
1445 MUTEX_EXIT(&call->lock);
/* rx_FlushWrite -- public flush wrapper; delegates to rxi_FlushWrite.
 * (Surrounding netpri/brace lines are missing from this extract.) */
1449 /* Flush any buffered data to the stream, switch to read mode
1450 * (clients) or to EOF mode (servers) */
1452 rx_FlushWrite(struct rx_call *call)
1456 rxi_FlushWrite(call);