2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 # ifdef RX_KERNEL_TRACE
16 # include "rx_kcommon.h"
18 # if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
19 # include "afs/sysincludes.h"
24 # if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
27 # ifdef AFS_LINUX20_ENV
28 # include "h/socket.h"
30 # include "netinet/in.h"
31 # if defined(AFS_SGI_ENV)
32 # include "afs/sysincludes.h"
35 # include "afs/afs_args.h"
36 # if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
40 # include "afs/sysincludes.h"
41 # endif /* !UKERNEL */
44 # undef RXDEBUG /* turn off debugging */
47 # include "afs/afs_osi.h"
48 # include "rx_kmutex.h"
49 # include "rx/rx_kernel.h"
50 # include "afs/lock.h"
58 #include "rx_globals.h"
59 #include "rx_atomic.h"
60 #include "rx_internal.h"
63 #include "rx_packet.h"
66 /* rxdb_fileID is used to identify the lock location, along with line#. */
/* NOTE(review): the matching "#ifdef RX_LOCKS_DB" that opens this guard is
 * not visible in this extract; confirm it precedes this definition in the
 * full source before editing. */
67 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
68 #endif /* RX_LOCKS_DB */
70 /* Get the next packet in the receive queue
72 * Dispose of the call's currentPacket, and move the next packet in the
73 * receive queue into the currentPacket field. If the next packet isn't
74 * available, then currentPacket is left NULL.
77 * The RX call to manipulate
79 * 0 on success, an error code on failure
82 * Must be called with the call locked. Unlocks the call if returning
/*
 * Free the call's currentPacket (if any) and promote the next in-sequence
 * packet from the receive queue (call->rq) into call->app.currentPacket,
 * then point curvec/curpos/curlen/nLeft at its first data iovec.
 * NOTE(review): this extract elides lines (declarations, early returns,
 * some #endif pairs) — compare against the complete rx_rdwr.c.
 */
87 rxi_GetNextPacket(struct rx_call *call) {
91 if (call->app.currentPacket != NULL) {
92 #ifdef RX_TRACK_PACKETS
93 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
95 rxi_FreePacket(call->app.currentPacket);
96 call->app.currentPacket = NULL;
/* Receive queue empty: currentPacket stays NULL; caller must wait. */
99 if (opr_queue_IsEmpty(&call->rq))
102 /* Check that next packet available is next in sequence */
103 rp = opr_queue_First(&call->rq, struct rx_packet, entry);
/* Out-of-order head (seq != rnext): leave it queued for now. */
104 if (rp->header.seq != call->rnext)
107 opr_queue_Remove(&rp->entry);
108 #ifdef RX_TRACK_PACKETS
109 rp->flags &= ~RX_PKTFLAG_RQ;
111 #ifdef RXDEBUG_PACKET
113 #endif /* RXDEBUG_PACKET */
115 /* RXS_CheckPacket called to undo RXS_PreparePacket's work. It may
116 * reduce the length of the packet by up to conn->maxTrailerSize,
117 * to reflect the length of the data + the header. */
118 if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
119 /* Used to merely shut down the call, but now we shut down the whole
120 * connection since this may indicate an attempt to hijack it */
/* Security-check failure path: the call lock is dropped here, so the
 * caller must not assume call->lock is still held on this return path. */
122 MUTEX_EXIT(&call->lock);
123 rxi_ConnectionError(call->conn, error);
124 MUTEX_ENTER(&call->conn->conn_data_lock);
125 rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
126 MUTEX_EXIT(&call->conn->conn_data_lock);
133 call->app.currentPacket = rp;
134 #ifdef RX_TRACK_PACKETS
135 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
137 call->app.curvec = 1; /* 0th vec is always header */
139 /* begin at the beginning [ more or less ], continue on until the end,
/* Skip the security header at the front of the first data iovec. */
141 call->app.curpos = (char *)call->app.currentPacket->wirevec[1].iov_base +
142 call->conn->securityHeaderSize;
143 call->app.curlen = call->app.currentPacket->wirevec[1].iov_len -
144 call->conn->securityHeaderSize;
/* nLeft counts the user data remaining in this packet; bytesRcvd is a
 * running per-call statistic. */
146 call->app.nLeft = call->app.currentPacket->length;
147 call->app.bytesRcvd += call->app.currentPacket->length;
154 /* rxi_ReadProc -- internal version.
156 * LOCKS USED -- called at netpri
/*
 * Internal read: copy up to nbytes of stream data from queued receive
 * packets into buf, blocking until in-sequence data arrives or the call
 * errors/finishes. Returns requestCount - nbytes (bytes actually copied).
 * NOTE(review): lines are elided in this extract; cross-check the full
 * source before modifying control flow.
 */
159 rxi_ReadProc(struct rx_call *call, char *buf,
166 /* XXXX took out clock_NewTime from here. Was it needed? */
167 requestCount = nbytes;
169 /* Free any packets from the last call to ReadvProc/WritevProc */
170 if (!opr_queue_IsEmpty(&call->app.iovq)) {
171 #ifdef RXDEBUG_PACKET
173 #endif /* RXDEBUG_PACKET */
174 rxi_FreePackets(0, &call->app.iovq);
177 if (call->app.nLeft == 0) {
179 /* Get next packet */
180 MUTEX_ENTER(&call->lock);
/* A call in error, or not in receive mode, cannot deliver data. */
182 if (call->error || (call->app.mode != RX_MODE_RECEIVING)) {
184 call->app.mode = RX_MODE_ERROR;
185 MUTEX_EXIT(&call->lock);
/* Server side still sending: flush buffered writes first (locked variant
 * because call->lock is held here). */
188 if (call->app.mode == RX_MODE_SENDING) {
189 rxi_FlushWriteLocked(call);
194 code = rxi_GetNextPacket(call);
198 if (call->app.currentPacket) {
199 if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
/* Past the hard-ack threshold: ack immediately instead of delaying. */
200 if (call->nHardAcks > (u_short) rxi_HardAckRate) {
201 rxi_CancelDelayedAckEvent(call);
202 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
204 /* Delay to consolidate ack packets */
205 rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
212 * If we reach this point either we have no packets in the
213 * receive queue or the next packet in the queue is not the
214 * one we are looking for. There is nothing else for us to
215 * do but wait for another packet to arrive.
218 /* Are there ever going to be any more packets? */
219 if (call->flags & RX_CALL_RECEIVE_DONE) {
220 MUTEX_EXIT(&call->lock);
221 return requestCount - nbytes;
223 /* Wait for in-sequence packet */
224 call->flags |= RX_CALL_READER_WAIT;
226 call->startWait = clock_Sec();
/* The receive thread clears RX_CALL_READER_WAIT when data arrives. */
227 while (call->flags & RX_CALL_READER_WAIT) {
228 #ifdef RX_ENABLE_LOCKS
229 CV_WAIT(&call->cv_rq, &call->lock);
231 osi_rxSleep(&call->rq);
236 #ifdef RX_ENABLE_LOCKS
238 MUTEX_EXIT(&call->lock);
241 #endif /* RX_ENABLE_LOCKS */
243 MUTEX_EXIT(&call->lock);
245 /* osi_Assert(cp); */
246 /* MTUXXX this should be replaced by some error-recovery code before shipping */
247 /* yes, the following block is allowed to be the ELSE clause (or not) */
248 /* It's possible for call->app.nLeft to be smaller than any particular
249 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
250 * reflects the size of the buffer. We have to keep track of the
251 * number of bytes read in the length field of the packet struct. On
252 * the final portion of a received packet, it's almost certain that
253 * call->app.nLeft will be smaller than the final buffer. */
/* Copy loop: t is bounded by the current iovec, the caller's request,
 * and the data remaining in this packet. */
254 while (nbytes && call->app.currentPacket) {
255 t = MIN((int)call->app.curlen, nbytes);
256 t = MIN(t, (int)call->app.nLeft);
257 memcpy(buf, call->app.curpos, t);
260 call->app.curpos += t;
261 call->app.curlen -= t;
262 call->app.nLeft -= t;
264 if (!call->app.nLeft) {
265 /* out of packet. Get another one. */
266 #ifdef RX_TRACK_PACKETS
267 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
269 rxi_FreePacket(call->app.currentPacket);
270 call->app.currentPacket = NULL;
271 } else if (!call->app.curlen) {
272 /* need to get another struct iov */
273 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
274 /* current packet is exhausted, get ready for another */
275 /* don't worry about curvec and stuff, they get set somewhere else */
276 #ifdef RX_TRACK_PACKETS
277 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
279 rxi_FreePacket(call->app.currentPacket);
280 call->app.currentPacket = NULL;
/* Advance curpos/curlen into the next wirevec entry of this packet. */
284 call->app.currentPacket->wirevec[call->app.curvec].iov_base;
286 call->app.currentPacket->wirevec[call->app.curvec].iov_len;
291 /* user buffer is full, return */
/*
 * Public read entry point. Fast path: if the whole request fits inside
 * the current iovec of the current packet, copy it without taking the
 * call lock; otherwise fall through to rxi_ReadProc.
 */
301 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
306 /* Free any packets from the last call to ReadvProc/WritevProc */
307 if (!opr_queue_IsEmpty(&call->app.iovq)) {
308 #ifdef RXDEBUG_PACKET
310 #endif /* RXDEBUG_PACKET */
311 rxi_FreePackets(0, &call->app.iovq);
315 * Most common case, all of the data is in the current iovec.
316 * We are relying on nLeft being zero unless the call is in receive mode.
/* Strict '>' (not '>='): leaving at least one byte behind keeps the
 * packet non-exhausted so no free/advance is needed on this path. */
318 if (!call->error && call->app.curlen > nbytes && call->app.nLeft > nbytes) {
319 memcpy(buf, call->app.curpos, nbytes);
321 call->app.curpos += nbytes;
322 call->app.curlen -= nbytes;
323 call->app.nLeft -= nbytes;
325 if (!call->app.nLeft && call->app.currentPacket != NULL) {
326 /* out of packet. Get another one. */
327 rxi_FreePacket(call->app.currentPacket);
328 call->app.currentPacket = NULL;
/* Slow path: go through the locked internal reader. */
334 bytes = rxi_ReadProc(call, buf, nbytes);
339 /* Optimization for unmarshalling 32 bit integers */
/*
 * Fast-path read of exactly one 32-bit value (XDR unmarshalling helper).
 * Mirrors rx_ReadProc but fixed at sizeof(afs_int32) bytes.
 */
341 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
346 /* Free any packets from the last call to ReadvProc/WritevProc */
347 if (!opr_queue_IsEmpty(&call->app.iovq)) {
348 #ifdef RXDEBUG_PACKET
350 #endif /* RXDEBUG_PACKET */
351 rxi_FreePackets(0, &call->app.iovq);
355 * Most common case, all of the data is in the current iovec.
356 * We are relying on nLeft being zero unless the call is in receive mode.
358 if (!call->error && call->app.curlen >= sizeof(afs_int32)
359 && call->app.nLeft >= sizeof(afs_int32)) {
/* memcpy (not a direct load) because curpos may be unaligned. */
361 memcpy((char *)value, call->app.curpos, sizeof(afs_int32));
363 call->app.curpos += sizeof(afs_int32);
364 call->app.curlen -= sizeof(afs_int32);
365 call->app.nLeft -= sizeof(afs_int32);
367 if (!call->app.nLeft && call->app.currentPacket != NULL) {
368 /* out of packet. Get another one. */
369 rxi_FreePacket(call->app.currentPacket);
370 call->app.currentPacket = NULL;
372 return sizeof(afs_int32);
/* Slow path: delegate to the general internal reader. */
376 bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
384 * Uses packets in the receive queue to fill in as much of the
385 * current iovec as possible. Does not block if it runs out
386 * of packets to complete the iovec. Return true if an ack packet
387 * was sent, otherwise return false */
/*
 * Fill call->iov with pointers directly into receive-packet buffers,
 * without copying. Consumed packets are parked on call->app.iovq until
 * the application is done with the data. Does not block. Per the header
 * comment above, returns whether an ack was sent (didConsume bookkeeping
 * lines are elided in this extract — TODO confirm against full source).
 */
389 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
395 struct iovec *call_iov;
396 struct iovec *cur_iov = NULL;
398 if (call->app.currentPacket) {
399 cur_iov = &call->app.currentPacket->wirevec[call->app.curvec];
401 call_iov = &call->iov[call->iovNext];
403 while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
404 if (call->app.nLeft == 0) {
405 /* Get next packet */
406 code = rxi_GetNextPacket(call);
/* rxi_GetNextPacket may drop call->lock on its error path; reacquire. */
408 MUTEX_ENTER(&call->lock);
412 if (call->app.currentPacket) {
413 cur_iov = &call->app.currentPacket->wirevec[1];
421 /* It's possible for call->app.nLeft to be smaller than any particular
422 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
423 * reflects the size of the buffer. We have to keep track of the
424 * number of bytes read in the length field of the packet struct. On
425 * the final portion of a received packet, it's almost certain that
426 * call->app.nLeft will be smaller than the final buffer. */
427 while (call->iovNBytes
428 && call->iovNext < call->iovMax
429 && call->app.currentPacket) {
/* Hand the caller a pointer into the packet buffer — zero-copy. */
431 t = MIN((int)call->app.curlen, call->iovNBytes);
432 t = MIN(t, (int)call->app.nLeft);
433 call_iov->iov_base = call->app.curpos;
434 call_iov->iov_len = t;
437 call->iovNBytes -= t;
438 call->app.curpos += t;
439 call->app.curlen -= t;
440 call->app.nLeft -= t;
442 if (!call->app.nLeft) {
443 /* out of packet. Get another one. */
444 #ifdef RX_TRACK_PACKETS
445 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
446 call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
/* Packet is parked (not freed): the app still holds pointers into it. */
448 opr_queue_Append(&call->app.iovq,
449 &call->app.currentPacket->entry);
450 #ifdef RXDEBUG_PACKET
452 #endif /* RXDEBUG_PACKET */
453 call->app.currentPacket = NULL;
454 } else if (!call->app.curlen) {
455 /* need to get another struct iov */
456 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
457 /* current packet is exhausted, get ready for another */
458 /* don't worry about curvec and stuff, they get set somewhere else */
459 #ifdef RX_TRACK_PACKETS
460 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
461 call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
463 opr_queue_Append(&call->app.iovq,
464 &call->app.currentPacket->entry);
465 #ifdef RXDEBUG_PACKET
467 #endif /* RXDEBUG_PACKET */
468 call->app.currentPacket = NULL;
472 call->app.curpos = (char *)cur_iov->iov_base;
473 call->app.curlen = cur_iov->iov_len;
479 /* If we consumed any packets then check whether we need to
480 * send a hard ack. */
481 if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
482 if (call->nHardAcks > (u_short) rxi_HardAckRate) {
483 rxi_CancelDelayedAckEvent(call);
/* Unlike rxi_ReadProc's ack, this one carries the caller's serial. */
484 rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
487 /* Delay to consolidate ack packets */
488 rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
495 /* rxi_ReadvProc -- internal version.
497 * Fills in an iovec with pointers to the packet buffers. All packets
498 * except the last packet (new current packet) are moved to the iovq
499 * while the application is processing the data.
501 * LOCKS USED -- called at netpri.
/*
 * Internal readv: populate iov[0..maxio) with pointers into packet
 * buffers (via rxi_FillReadVec), sleeping until the receive thread has
 * supplied enough data or the call completes/errors. On success, *nio is
 * the number of iovec entries filled and the return is bytes delivered.
 */
504 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
509 /* Free any packets from the last call to ReadvProc/WritevProc */
510 if (!opr_queue_IsEmpty(&call->app.iovq)) {
511 #ifdef RXDEBUG_PACKET
513 #endif /* RXDEBUG_PACKET */
514 rxi_FreePackets(0, &call->app.iovq);
/* Unlocked flush here (call->lock is not yet held at this point). */
517 if (call->app.mode == RX_MODE_SENDING) {
518 rxi_FlushWrite(call);
521 MUTEX_ENTER(&call->lock);
525 /* Get whatever data is currently available in the receive queue.
526 * If rxi_FillReadVec sends an ack packet then it is possible
527 * that we will receive more data while we drop the call lock
528 * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
529 * here to avoid a race with the receive thread if we send
530 * hard acks in rxi_FillReadVec. */
531 call->flags |= RX_CALL_IOVEC_WAIT;
532 call->iovNBytes = nbytes;
533 call->iovMax = maxio;
536 rxi_FillReadVec(call, 0);
538 /* if we need more data then sleep until the receive thread has
539 * filled in the rest. */
540 if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
541 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
542 call->flags |= RX_CALL_READER_WAIT;
544 call->startWait = clock_Sec();
545 while (call->flags & RX_CALL_READER_WAIT) {
546 #ifdef RX_ENABLE_LOCKS
547 CV_WAIT(&call->cv_rq, &call->lock);
549 osi_rxSleep(&call->rq);
554 call->flags &= ~RX_CALL_IOVEC_WAIT;
/* Success path: report filled entries and byte count to the caller. */
560 *nio = call->iovNext;
561 bytes = nbytes - call->iovNBytes;
562 MUTEX_EXIT(&call->lock);
/* Error path (elided branch in this extract). NOTE(review): setting
 * app.mode after MUTEX_EXIT appears racy; verify ordering in full source. */
566 MUTEX_EXIT(&call->lock);
567 call->app.mode = RX_MODE_ERROR;
/* Public readv wrapper: delegates to rxi_ReadvProc (netpri/splice handling
 * elided in this extract). */
572 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
579 bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
584 /* rxi_WriteProc -- internal version.
586 * LOCKS USED -- called at netpri
/*
 * Internal write: copy nbytes from buf into send packets. When the
 * current packet fills, prepare it and append it to the transmit queue
 * (call->tq), waiting for the transmit window when necessary. Returns
 * requestCount - nbytes (bytes accepted).
 * NOTE(review): many lines are elided in this extract (loop braces,
 * declarations, #endif pairs); consult the full source before editing.
 */
590 rxi_WriteProc(struct rx_call *call, char *buf,
593 struct rx_connection *conn = call->conn;
595 int requestCount = nbytes;
597 /* Free any packets from the last call to ReadvProc/WritevProc */
598 if (!opr_queue_IsEmpty(&call->app.iovq)) {
599 #ifdef RXDEBUG_PACKET
601 #endif /* RXDEBUG_PACKET */
602 rxi_FreePackets(0, &call->app.iovq);
/* Server calls transition RECEIVING -> SENDING on first write; any
 * leftover receive packet is discarded. */
605 if (call->app.mode != RX_MODE_SENDING) {
606 if ((conn->type == RX_SERVER_CONNECTION)
607 && (call->app.mode == RX_MODE_RECEIVING)) {
608 call->app.mode = RX_MODE_SENDING;
609 if (call->app.currentPacket) {
610 #ifdef RX_TRACK_PACKETS
611 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
613 rxi_FreePacket(call->app.currentPacket);
614 call->app.currentPacket = NULL;
623 /* Loop condition is checked at end, so that a write of 0 bytes
624 * will force a packet to be created--specially for the case where
625 * there are 0 bytes on the stream, but we must send a packet
/* No free space left: ship the current packet to the transmit queue. */
628 if (call->app.nFree == 0) {
629 MUTEX_ENTER(&call->lock);
631 call->app.mode = RX_MODE_ERROR;
632 if (!call->error && call->app.currentPacket) {
633 clock_NewTime(); /* Bogus: need new time package */
634 /* The 0, below, specifies that it is not the last packet:
635 * there will be others. PrepareSendPacket may
636 * alter the packet length by up to
637 * conn->securityMaxTrailerSize */
638 call->app.bytesSent += call->app.currentPacket->length;
639 rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
640 /* PrepareSendPacket drops the call lock */
641 rxi_WaitforTQBusy(call);
642 #ifdef RX_TRACK_PACKETS
643 call->app.currentPacket->flags |= RX_PKTFLAG_TQ;
645 opr_queue_Append(&call->tq,
646 &call->app.currentPacket->entry);
647 #ifdef RXDEBUG_PACKET
649 #endif /* RXDEBUG_PACKET */
650 #ifdef RX_TRACK_PACKETS
651 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
653 call->app.currentPacket = NULL;
655 /* If the call is in recovery, let it exhaust its current
656 * retransmit queue before forcing it to send new packets
658 if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
661 } else if (call->app.currentPacket) {
662 #ifdef RX_TRACK_PACKETS
663 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
665 rxi_FreePacket(call->app.currentPacket);
666 call->app.currentPacket = NULL;
668 /* Wait for transmit window to open up */
/* Block while the send window (2 * twind beyond tfirst) is full. */
670 && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
672 call->startWait = clock_Sec();
674 #ifdef RX_ENABLE_LOCKS
675 CV_WAIT(&call->cv_twind, &call->lock);
677 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
678 osi_rxSleep(&call->twind);
682 #ifdef RX_ENABLE_LOCKS
684 call->app.mode = RX_MODE_ERROR;
685 MUTEX_EXIT(&call->lock);
688 #endif /* RX_ENABLE_LOCKS */
/* Allocate a fresh send packet and point the cursor past the
 * security header of its first data iovec. */
690 if ((call->app.currentPacket = rxi_AllocSendPacket(call, nbytes))) {
691 #ifdef RX_TRACK_PACKETS
692 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
694 call->app.nFree = call->app.currentPacket->length;
695 call->app.curvec = 1; /* 0th vec is always header */
696 /* begin at the beginning [ more or less ], continue
697 * on until the end, then stop. */
699 (char *) call->app.currentPacket->wirevec[1].iov_base +
700 call->conn->securityHeaderSize;
702 call->app.currentPacket->wirevec[1].iov_len -
703 call->conn->securityHeaderSize;
706 call->app.mode = RX_MODE_ERROR;
707 if (call->app.currentPacket) {
708 #ifdef RX_TRACK_PACKETS
709 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
711 rxi_FreePacket(call->app.currentPacket);
712 call->app.currentPacket = NULL;
714 MUTEX_EXIT(&call->lock);
717 MUTEX_EXIT(&call->lock);
/* Grow the current packet (up to MaxUserDataSize) rather than splitting
 * the remaining bytes across an extra packet. */
720 if (call->app.currentPacket && (int)call->app.nFree < nbytes) {
721 /* Try to extend the current buffer */
723 len = call->app.currentPacket->length;
724 mud = rx_MaxUserDataSize(call);
727 want = MIN(nbytes - (int)call->app.nFree, mud - len);
728 rxi_AllocDataBuf(call->app.currentPacket, want,
729 RX_PACKET_CLASS_SEND_CBUF);
730 if (call->app.currentPacket->length > (unsigned)mud)
731 call->app.currentPacket->length = mud;
732 call->app.nFree += (call->app.currentPacket->length - len);
736 /* If the remaining bytes fit in the buffer, then store them
737 * and return. Don't ship a buffer that's full immediately to
738 * the peer--we don't know if it's the last buffer yet */
740 if (!call->app.currentPacket) {
/* Copy loop: bounded by current iovec space and packet free space. */
744 while (nbytes && call->app.nFree) {
746 t = MIN((int)call->app.curlen, nbytes);
747 t = MIN((int)call->app.nFree, t);
748 memcpy(call->app.curpos, buf, t);
751 call->app.curpos += t;
752 call->app.curlen -= (u_short)t;
753 call->app.nFree -= (u_short)t;
755 if (!call->app.curlen) {
756 /* need to get another struct iov */
757 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
758 /* current packet is full, extend or send it */
762 call->app.currentPacket->wirevec[call->app.curvec].iov_base;
764 call->app.currentPacket->wirevec[call->app.curvec].iov_len;
767 } /* while bytes to send and room to send them */
769 /* might be out of space now */
773 /* more data to send, so get another packet and keep going */
777 return requestCount - nbytes;
/*
 * Public write entry point. Fast path: if nbytes fits in both the current
 * iovec (tcurlen) and the packet's free space (tnFree), copy without
 * taking the call lock; otherwise fall through to rxi_WriteProc.
 */
781 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
789 /* Free any packets from the last call to ReadvProc/WritevProc */
790 if (!opr_queue_IsEmpty(&call->app.iovq)) {
791 #ifdef RXDEBUG_PACKET
793 #endif /* RXDEBUG_PACKET */
794 rxi_FreePackets(0, &call->app.iovq);
798 * Most common case: all of the data fits in the current iovec.
799 * We are relying on nFree being zero unless the call is in send mode.
/* Snapshot cursor state into locals before the unlocked test-and-copy. */
801 tcurlen = (int)call->app.curlen;
802 tnFree = (int)call->app.nFree;
803 if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
804 tcurpos = call->app.curpos;
806 memcpy(tcurpos, buf, nbytes);
807 call->app.curpos = tcurpos + nbytes;
808 call->app.curlen = (u_short)(tcurlen - nbytes);
809 call->app.nFree = (u_short)(tnFree - nbytes);
/* Slow path: locked internal writer handles packet boundaries. */
814 bytes = rxi_WriteProc(call, buf, nbytes);
819 /* Optimization for marshalling 32 bit arguments */
/*
 * Fast-path write of exactly one 32-bit value (XDR marshalling helper).
 * Uses a direct aligned store when curpos is 4-byte aligned, memcpy
 * otherwise.
 */
821 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
829 if (!opr_queue_IsEmpty(&call->app.iovq)) {
830 #ifdef RXDEBUG_PACKET
832 #endif /* RXDEBUG_PACKET */
833 rxi_FreePackets(0, &call->app.iovq);
837 * Most common case: all of the data fits in the current iovec.
838 * We are relying on nFree being zero unless the call is in send mode.
840 tcurlen = call->app.curlen;
841 tnFree = call->app.nFree;
842 if (!call->error && tcurlen >= sizeof(afs_int32)
843 && tnFree >= sizeof(afs_int32)) {
844 tcurpos = call->app.curpos;
/* Aligned destination: a direct store avoids the memcpy call. */
846 if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
847 *((afs_int32 *) (tcurpos)) = *value;
849 memcpy(tcurpos, (char *)value, sizeof(afs_int32));
851 call->app.curpos = tcurpos + sizeof(afs_int32);
852 call->app.curlen = (u_short)(tcurlen - sizeof(afs_int32));
853 call->app.nFree = (u_short)(tnFree - sizeof(afs_int32));
854 return sizeof(afs_int32);
/* Slow path: delegate to the general internal writer. */
858 bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
863 /* rxi_WritevAlloc -- internal version.
865 * Fill in an iovec to point to data in packet buffers. The application
866 * calls rxi_WritevProc when the buffers are full.
868 * LOCKS USED -- called at netpri.
/*
 * Internal writev-alloc: fill iov[0..maxio) with pointers into send-packet
 * buffers (allocating packets onto call->app.iovq as needed) so the caller
 * can write data in place, then commit with rxi_WritevProc. Only t*
 * shadow variables are advanced here; the call's real cursor moves in
 * rxi_WritevProc. Returns bytes of buffer space provided.
 */
872 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
875 struct rx_connection *conn = call->conn;
876 struct rx_packet *cp;
879 /* Temporary values, real work is done in rxi_WritevProc */
881 unsigned int tcurvec;
885 requestCount = nbytes;
888 /* Free any packets from the last call to ReadvProc/WritevProc */
889 if (!opr_queue_IsEmpty(&call->app.iovq)) {
890 #ifdef RXDEBUG_PACKET
892 #endif /* RXDEBUG_PACKET */
893 rxi_FreePackets(0, &call->app.iovq);
/* Server calls transition RECEIVING -> SENDING on first write. */
896 if (call->app.mode != RX_MODE_SENDING) {
897 if ((conn->type == RX_SERVER_CONNECTION)
898 && (call->app.mode == RX_MODE_RECEIVING)) {
899 call->app.mode = RX_MODE_SENDING;
900 if (call->app.currentPacket) {
901 #ifdef RX_TRACK_PACKETS
902 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
904 rxi_FreePacket(call->app.currentPacket);
905 call->app.currentPacket = NULL;
914 /* Set up the iovec to point to data in packet buffers. */
915 tnFree = call->app.nFree;
916 tcurvec = call->app.curvec;
917 tcurpos = call->app.curpos;
918 tcurlen = call->app.curlen;
919 cp = call->app.currentPacket;
924 /* current packet is full, allocate a new one */
925 MUTEX_ENTER(&call->lock);
926 cp = rxi_AllocSendPacket(call, nbytes);
927 MUTEX_EXIT(&call->lock);
929 /* out of space, return what we have */
931 return requestCount - nbytes;
933 #ifdef RX_TRACK_PACKETS
934 cp->flags |= RX_PKTFLAG_IOVQ;
/* Newly allocated packets are parked on iovq until WritevProc commits. */
936 opr_queue_Append(&call->app.iovq, &cp->entry);
937 #ifdef RXDEBUG_PACKET
939 #endif /* RXDEBUG_PACKET */
943 (char *)cp->wirevec[1].iov_base +
944 call->conn->securityHeaderSize;
945 tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
/* Grow the packet toward MaxUserDataSize instead of splitting early. */
948 if (tnFree < nbytes) {
949 /* try to extend the current packet */
952 mud = rx_MaxUserDataSize(call);
955 want = MIN(nbytes - tnFree, mud - len);
956 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
957 if (cp->length > (unsigned)mud)
959 tnFree += (cp->length - len);
/* Keep the call's live accounting in sync when cp is currentPacket. */
960 if (cp == call->app.currentPacket) {
961 call->app.nFree += (cp->length - len);
966 /* fill in the next entry in the iovec */
967 t = MIN(tcurlen, nbytes);
969 iov[nextio].iov_base = tcurpos;
970 iov[nextio].iov_len = t;
978 /* need to get another struct iov */
979 if (++tcurvec >= cp->niovecs) {
980 /* current packet is full, extend it or move on to next packet */
983 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
984 tcurlen = cp->wirevec[tcurvec].iov_len;
987 } while (nbytes && nextio < maxio);
989 return requestCount - nbytes;
/* Public writev-alloc wrapper: delegates to rxi_WritevAlloc (netpri/splice
 * handling elided in this extract). */
993 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1000 bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1005 /* rxi_WritevProc -- internal version.
1007 * Send buffers allocated in rxi_WritevAlloc.
1009 * LOCKS USED -- called at netpri.
/*
 * Internal writev commit: validate that each iov entry matches the buffer
 * positions handed out by rxi_WritevAlloc, advance the call's cursor,
 * move completed packets via tmpq onto the transmit queue (call->tq), and
 * wait for the transmit window. Any mismatch between the iovec and the
 * expected cursor raises RX_PROTOCOL_ERROR and frees the staged packets.
 * NOTE(review): this extract elides lines; in particular the error path
 * near the end (lines 1192-1199) appears scrambled by the elision —
 * verify against the full source before changing it.
 */
1012 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1014 #ifdef RX_TRACK_PACKETS
1015 struct opr_queue *cursor;
1019 struct opr_queue tmpq;
1020 #ifdef RXDEBUG_PACKET
1024 requestCount = nbytes;
1026 MUTEX_ENTER(&call->lock);
1028 call->app.mode = RX_MODE_ERROR;
/* Writing while not in send mode is a protocol violation. */
1029 } else if (call->app.mode != RX_MODE_SENDING) {
1030 call->error = RX_PROTOCOL_ERROR;
1032 rxi_WaitforTQBusy(call);
1035 call->app.mode = RX_MODE_ERROR;
1036 MUTEX_EXIT(&call->lock);
/* Error bail-out: return currentPacket to the iovq head and free all. */
1037 if (call->app.currentPacket) {
1038 #ifdef RX_TRACK_PACKETS
1039 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1040 call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
1042 opr_queue_Prepend(&call->app.iovq,
1043 &call->app.currentPacket->entry);
1044 #ifdef RXDEBUG_PACKET
1046 #endif /* RXDEBUG_PACKET */
1047 call->app.currentPacket = NULL;
1049 #ifdef RXDEBUG_PACKET
1051 #endif /* RXDEBUG_PACKET */
1052 rxi_FreePackets(0, &call->app.iovq);
1056 /* Loop through the I/O vector adjusting packet pointers.
1057 * Place full packets back onto the iovq once they are ready
1058 * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1059 * the iovec. We put the loop condition at the end to ensure that
1060 * a zero length write will push a short packet. */
1061 opr_queue_Init(&tmpq);
1062 #ifdef RXDEBUG_PACKET
1064 #endif /* RXDEBUG_PACKET */
/* Current packet full: stage it on tmpq, ready for the transmit queue. */
1066 if (call->app.nFree == 0 && call->app.currentPacket) {
1067 clock_NewTime(); /* Bogus: need new time package */
1068 /* The 0, below, specifies that it is not the last packet:
1069 * there will be others. PrepareSendPacket may
1070 * alter the packet length by up to
1071 * conn->securityMaxTrailerSize */
1072 call->app.bytesSent += call->app.currentPacket->length;
1073 rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
1074 /* PrepareSendPacket drops the call lock */
1075 rxi_WaitforTQBusy(call);
1076 opr_queue_Append(&tmpq, &call->app.currentPacket->entry);
1077 #ifdef RXDEBUG_PACKET
1079 #endif /* RXDEBUG_PACKET */
1080 call->app.currentPacket = NULL;
1082 /* The head of the iovq is now the current packet */
/* iovq drained before nbytes consumed: the caller handed us more data
 * than rxi_WritevAlloc provisioned — protocol error. */
1084 if (opr_queue_IsEmpty(&call->app.iovq)) {
1085 MUTEX_EXIT(&call->lock);
1086 call->error = RX_PROTOCOL_ERROR;
1087 #ifdef RXDEBUG_PACKET
1089 #endif /* RXDEBUG_PACKET */
1090 rxi_FreePackets(0, &tmpq);
1093 call->app.currentPacket
1094 = opr_queue_First(&call->app.iovq, struct rx_packet,
1096 opr_queue_Remove(&call->app.currentPacket->entry);
1097 #ifdef RX_TRACK_PACKETS
1098 call->app.currentPacket->flags &= ~RX_PKTFLAG_IOVQ;
1099 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
1101 #ifdef RXDEBUG_PACKET
1103 #endif /* RXDEBUG_PACKET */
1104 call->app.nFree = call->app.currentPacket->length;
1105 call->app.curvec = 1;
1107 (char *) call->app.currentPacket->wirevec[1].iov_base +
1108 call->conn->securityHeaderSize;
1110 call->app.currentPacket->wirevec[1].iov_len -
1111 call->conn->securityHeaderSize;
1116 /* The next iovec should point to the current position */
/* Validate: each entry must match exactly where WritevAlloc pointed. */
1117 if (iov[nextio].iov_base != call->app.curpos
1118 || iov[nextio].iov_len > (int)call->app.curlen) {
1119 call->error = RX_PROTOCOL_ERROR;
1120 MUTEX_EXIT(&call->lock);
1121 if (call->app.currentPacket) {
1122 #ifdef RX_TRACK_PACKETS
1123 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1125 opr_queue_Prepend(&tmpq,
1126 &call->app.currentPacket->entry);
1127 #ifdef RXDEBUG_PACKET
1129 #endif /* RXDEBUG_PACKET */
1130 call->app.currentPacket = NULL;
1132 #ifdef RXDEBUG_PACKET
1134 #endif /* RXDEBUG_PACKET */
1135 rxi_FreePackets(0, &tmpq);
1138 nbytes -= iov[nextio].iov_len;
1139 call->app.curpos += iov[nextio].iov_len;
1140 call->app.curlen -= iov[nextio].iov_len;
1141 call->app.nFree -= iov[nextio].iov_len;
1143 if (call->app.curlen == 0) {
1144 if (++call->app.curvec > call->app.currentPacket->niovecs) {
1145 call->app.nFree = 0;
1148 call->app.currentPacket->wirevec[call->app.curvec].iov_base;
1150 call->app.currentPacket->wirevec[call->app.curvec].iov_len;
1154 } while (nbytes && nextio < nio);
1156 /* Move the packets from the temporary queue onto the transmit queue.
1157 * We may end up with more than call->twind packets on the queue. */
1159 #ifdef RX_TRACK_PACKETS
1160 for (opr_queue_Scan(&tmpq, cursor))
1162 struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
1163 p->flags |= RX_PKTFLAG_TQ;
1167 call->app.mode = RX_MODE_ERROR;
1169 opr_queue_SpliceAppend(&call->tq, &tmpq);
1171 /* If the call is in recovery, let it exhaust its current retransmit
1172 * queue before forcing it to send new packets
1174 if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1178 /* Wait for the length of the transmit queue to fall below call->twind */
1179 while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1181 call->startWait = clock_Sec();
1182 #ifdef RX_ENABLE_LOCKS
1183 CV_WAIT(&call->cv_twind, &call->lock);
1185 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1186 osi_rxSleep(&call->twind);
1188 call->startWait = 0;
/* NOTE(review): the following four lines look elision-scrambled — the
 * NULL assignment precedes the non-NULL test; check the full source. */
1192 call->app.mode = RX_MODE_ERROR;
1193 call->app.currentPacket = NULL;
1194 MUTEX_EXIT(&call->lock);
1195 if (call->app.currentPacket) {
1196 #ifdef RX_TRACK_PACKETS
1197 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1199 rxi_FreePacket(call->app.currentPacket);
1203 MUTEX_EXIT(&call->lock);
1205 return requestCount - nbytes;
/* Public writev wrapper: delegates to rxi_WritevProc (netpri/splice
 * handling elided in this extract). */
1209 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1215 bytes = rxi_WritevProc(call, iov, nio, nbytes);
1220 /* Flush any buffered data to the stream, switch to read mode
1221 * (clients) or to EOF mode (servers). If 'locked' is nonzero, call->lock must
1224 * LOCKS HELD: called at netpri.
/*
 * Flush any buffered write data as the call's final packet (the '1' to
 * rxi_PrepareSendPacket marks last-packet), then switch the call to
 * RX_MODE_RECEIVING (client side) or RX_MODE_EOF (server side). If no
 * data is buffered, an empty packet is allocated so the last-packet
 * marker is still sent. 'locked' indicates whether call->lock is already
 * held by the caller (rxi_FlushWriteLocked vs rxi_FlushWrite).
 */
1227 FlushWrite(struct rx_call *call, int locked)
1229 struct rx_packet *cp = NULL;
1231 /* Free any packets from the last call to ReadvProc/WritevProc */
1232 if (!opr_queue_IsEmpty(&call->app.iovq)) {
1233 #ifdef RXDEBUG_PACKET
1235 #endif /* RXDEBUG_PACKET */
1236 rxi_FreePackets(0, &call->app.iovq);
1239 if (call->app.mode == RX_MODE_SENDING) {
/* Client calls go back to receiving the reply; server calls hit EOF. */
1242 (call->conn->type ==
1243 RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1245 #ifdef RX_KERNEL_TRACE
1247 int glockOwner = ISAFS_GLOCK();
1250 afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1251 __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1259 MUTEX_ENTER(&call->lock);
1263 call->app.mode = RX_MODE_ERROR;
1265 call->flags |= RX_CALL_FLUSH;
1267 cp = call->app.currentPacket;
1270 /* cp->length is only supposed to be the user's data */
1271 /* cp->length was already set to (then-current)
1272 * MaxUserDataSize or less. */
1273 #ifdef RX_TRACK_PACKETS
1274 cp->flags &= ~RX_PKTFLAG_CP;
/* Trim the unused tail: nFree bytes were never written by the app. */
1276 cp->length -= call->app.nFree;
1277 call->app.currentPacket = NULL;
1278 call->app.nFree = 0;
/* No buffered packet: send an empty one to carry the last-packet flag. */
1280 cp = rxi_AllocSendPacket(call, 0);
1282 /* Mode can no longer be MODE_SENDING */
1286 cp->niovecs = 2; /* header + space for rxkad stuff */
1287 call->app.nFree = 0;
1290 /* The 1 specifies that this is the last packet */
1291 call->app.bytesSent += cp->length;
1292 rxi_PrepareSendPacket(call, cp, 1);
1293 /* PrepareSendPacket drops the call lock */
1294 rxi_WaitforTQBusy(call);
1295 #ifdef RX_TRACK_PACKETS
1296 cp->flags |= RX_PKTFLAG_TQ;
1298 opr_queue_Append(&call->tq, &cp->entry);
1299 #ifdef RXDEBUG_PACKET
1301 #endif /* RXDEBUG_PACKET */
1303 /* If the call is in recovery, let it exhaust its current retransmit
1304 * queue before forcing it to send new packets
1306 if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1310 MUTEX_EXIT(&call->lock);
/* Flush buffered writes; caller does NOT hold call->lock (locked = 0). */
1316 rxi_FlushWrite(struct rx_call *call)
1318 FlushWrite(call, 0);
/* Flush buffered writes; caller already holds call->lock (locked = 1). */
1322 rxi_FlushWriteLocked(struct rx_call *call)
1324 FlushWrite(call, 1);
1327 /* Flush any buffered data to the stream, switch to read mode
1328 * (clients) or to EOF mode (servers) */
1330 rx_FlushWrite(struct rx_call *call)
1334 FlushWrite(call, 0);