2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 # ifdef RX_KERNEL_TRACE
16 # include "rx_kcommon.h"
18 # if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
19 # include "afs/sysincludes.h"
24 # if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
28 # include <net/net_globals.h>
29 # endif /* AFS_OSF_ENV */
30 # ifdef AFS_LINUX20_ENV
31 # include "h/socket.h"
33 # include "netinet/in.h"
34 # if defined(AFS_SGI_ENV)
35 # include "afs/sysincludes.h"
38 # include "afs/afs_args.h"
39 # if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
43 # include "afs/sysincludes.h"
44 # endif /* !UKERNEL */
47 # undef RXDEBUG /* turn off debugging */
50 # include "afs/afs_osi.h"
51 # include "rx_kmutex.h"
52 # include "rx/rx_kernel.h"
53 # include "afs/lock.h"
61 #include "rx_globals.h"
62 #include "rx_atomic.h"
63 #include "rx_internal.h"
66 #include "rx_packet.h"
69 /* rxdb_fileID is used to identify the lock location, along with line#. */
70 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
71 #endif /* RX_LOCKS_DB */
73 /* Get the next packet in the receive queue
75 * Dispose of the call's currentPacket, and move the next packet in the
76 * receive queue into the currentPacket field. If the next packet isn't
77 * available, then currentPacket is left NULL.
80 * The RX call to manipulate
82 * 0 on success, an error code on failure
85 * Must be called with the call locked. Unlocks the call if returning
/* Replace call->app.currentPacket with the next in-sequence packet from the
 * receive queue (call->rq), verifying it with the connection's security
 * object before exposing it to the application.  Leaves currentPacket NULL
 * when no in-sequence packet is available.  Caller holds call->lock; the
 * security-failure path below drops it.
 * NOTE(review): this excerpt elides several lines; code left byte-identical. */
90 rxi_GetNextPacket(struct rx_call *call) {
/* Drop the previously consumed application packet, if any. */
94     if (call->app.currentPacket != NULL) {
95 #ifdef RX_TRACK_PACKETS
96 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
98 rxi_FreePacket(call->app.currentPacket);
99 call->app.currentPacket = NULL;
102 if (opr_queue_IsEmpty(&call->rq))
105 /* Check that next packet available is next in sequence */
106 rp = opr_queue_First(&call->rq, struct rx_packet, entry);
/* rnext is the next sequence number the reader expects; a gap means wait. */
107 if (rp->header.seq != call->rnext)
110 opr_queue_Remove(&rp->entry);
111 #ifdef RX_TRACK_PACKETS
112 rp->flags &= ~RX_PKTFLAG_RQ;
114 #ifdef RXDEBUG_PACKET
116 #endif /* RXDEBUG_PACKET */
118 /* RXS_CheckPacket called to undo RXS_PreparePacket's work. It may
119 * reduce the length of the packet by up to conn->maxTrailerSize,
120 * to reflect the length of the data + the header. */
121 if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
122 /* Used to merely shut down the call, but now we shut down the whole
123 * connection since this may indicate an attempt to hijack it */
/* Call lock is released here; conn_data_lock guards the abort send. */
125 MUTEX_EXIT(&call->lock);
126 rxi_ConnectionError(call->conn, error);
127 MUTEX_ENTER(&call->conn->conn_data_lock);
128 rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
129 MUTEX_EXIT(&call->conn->conn_data_lock);
/* Packet verified: hand it to the application layer and reset cursors. */
136 call->app.currentPacket = rp;
137 #ifdef RX_TRACK_PACKETS
138 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
140 call->app.curvec = 1; /* 0th vec is always header */
142 /* begin at the beginning [ more or less ], continue on until the end,
/* Skip past the security header inside the first data iovec. */
144 call->app.curpos = (char *)call->app.currentPacket->wirevec[1].iov_base +
145 call->conn->securityHeaderSize;
146 call->app.curlen = call->app.currentPacket->wirevec[1].iov_len -
147 call->conn->securityHeaderSize;
149 call->app.nLeft = call->app.currentPacket->length;
150 call->app.bytesRcvd += call->app.currentPacket->length;
157 /* rxi_ReadProc -- internal version.
159 * LOCKS USED -- called at netpri
/* Internal read: copy up to nbytes from the call's receive stream into buf,
 * blocking (under call->lock) for in-sequence packets as needed.  Returns the
 * number of bytes actually delivered (requestCount - nbytes).  Called at
 * netpri.  NOTE(review): excerpt elides lines; code left byte-identical. */
162 rxi_ReadProc(struct rx_call *call, char *buf,
169 /* XXXX took out clock_NewTime from here. Was it needed? */
170 requestCount = nbytes;
172 /* Free any packets from the last call to ReadvProc/WritevProc */
173 if (!opr_queue_IsEmpty(&call->app.iovq)) {
174 #ifdef RXDEBUG_PACKET
176 #endif /* RXDEBUG_PACKET */
177 rxi_FreePackets(0, &call->app.iovq);
/* nLeft == 0 means the current packet is exhausted; fetch the next one. */
181 if (call->app.nLeft == 0) {
182 /* Get next packet */
183 MUTEX_ENTER(&call->lock);
185 if (call->error || (call->app.mode != RX_MODE_RECEIVING)) {
187 call->app.mode = RX_MODE_ERROR;
188 MUTEX_EXIT(&call->lock);
/* First read after writing: flush buffered output, then switch to read. */
191 if (call->app.mode == RX_MODE_SENDING) {
192 MUTEX_EXIT(&call->lock);
193 rxi_FlushWrite(call);
194 MUTEX_ENTER(&call->lock);
199 code = rxi_GetNextPacket(call);
203 if (call->app.currentPacket) {
204 if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
/* Past the hard-ack threshold: ack immediately; otherwise batch acks. */
205 if (call->nHardAcks > (u_short) rxi_HardAckRate) {
206 rxevent_Cancel(&call->delayedAckEvent, call,
207 RX_CALL_REFCOUNT_DELAY);
208 rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
210 /* Delay to consolidate ack packets */
211 rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
218 * If we reach this point either we have no packets in the
219 * receive queue or the next packet in the queue is not the
220 * one we are looking for. There is nothing else for us to
221 * do but wait for another packet to arrive.
224 /* Are there ever going to be any more packets? */
225 if (call->flags & RX_CALL_RECEIVE_DONE) {
226 MUTEX_EXIT(&call->lock);
227 return requestCount - nbytes;
229 /* Wait for in-sequence packet */
230 call->flags |= RX_CALL_READER_WAIT;
232 call->startWait = clock_Sec();
/* Sleep until the receive thread clears READER_WAIT (spurious-wakeup safe). */
233 while (call->flags & RX_CALL_READER_WAIT) {
234 #ifdef RX_ENABLE_LOCKS
235 CV_WAIT(&call->cv_rq, &call->lock);
237 osi_rxSleep(&call->rq);
242 #ifdef RX_ENABLE_LOCKS
244 MUTEX_EXIT(&call->lock);
247 #endif /* RX_ENABLE_LOCKS */
249 MUTEX_EXIT(&call->lock);
251 /* osi_Assert(cp); */
252 /* MTUXXX this should be replaced by some error-recovery code before shipping */
253 /* yes, the following block is allowed to be the ELSE clause (or not) */
254 /* It's possible for call->app.nLeft to be smaller than any particular
255 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
256 * reflects the size of the buffer. We have to keep track of the
257 * number of bytes read in the length field of the packet struct. On
258 * the final portion of a received packet, it's almost certain that
259 * call->app.nLeft will be smaller than the final buffer. */
/* Copy loop: t = bytes available in this iovec, capped by request and packet. */
260 while (nbytes && call->app.currentPacket) {
261 t = MIN((int)call->app.curlen, nbytes);
262 t = MIN(t, (int)call->app.nLeft);
263 memcpy(buf, call->app.curpos, t);
266 call->app.curpos += t;
267 call->app.curlen -= t;
268 call->app.nLeft -= t;
270 if (!call->app.nLeft) {
271 /* out of packet. Get another one. */
272 #ifdef RX_TRACK_PACKETS
273 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
275 rxi_FreePacket(call->app.currentPacket);
276 call->app.currentPacket = NULL;
277 } else if (!call->app.curlen) {
278 /* need to get another struct iov */
279 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
280 /* current packet is exhausted, get ready for another */
281 /* don't worry about curvec and stuff, they get set somewhere else */
282 #ifdef RX_TRACK_PACKETS
283 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
285 rxi_FreePacket(call->app.currentPacket);
286 call->app.currentPacket = NULL;
/* Advance cursors into the next iovec of the same packet. */
290 call->app.currentPacket->wirevec[call->app.curvec].iov_base;
292 call->app.currentPacket->wirevec[call->app.curvec].iov_len;
297 /* user buffer is full, return */
/* Public read entry point.  Fast path: when the whole request fits strictly
 * inside the current iovec and packet (note the strict '>' comparisons —
 * presumably so the fast path never has to free/advance the packet mid-read;
 * TODO confirm against full source), copy without taking call->lock.
 * Otherwise fall back to the locked rxi_ReadProc slow path. */
307 rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
312 /* Free any packets from the last call to ReadvProc/WritevProc */
313 if (!opr_queue_IsEmpty(&call->app.iovq)) {
314 #ifdef RXDEBUG_PACKET
316 #endif /* RXDEBUG_PACKET */
317 rxi_FreePackets(0, &call->app.iovq);
321 * Most common case, all of the data is in the current iovec.
322 * We are relying on nLeft being zero unless the call is in receive mode.
324 if (!call->error && call->app.curlen > nbytes && call->app.nLeft > nbytes) {
325 memcpy(buf, call->app.curpos, nbytes);
327 call->app.curpos += nbytes;
328 call->app.curlen -= nbytes;
329 call->app.nLeft -= nbytes;
331 if (!call->app.nLeft && call->app.currentPacket != NULL) {
332 /* out of packet. Get another one. */
333 rxi_FreePacket(call->app.currentPacket);
334 call->app.currentPacket = NULL;
/* Slow path: locked, may block for more packets. */
340 bytes = rxi_ReadProc(call, buf, nbytes);
345 /* Optimization for unmarshalling 32 bit integers */
/* Unmarshal one 32-bit integer from the stream.  Fast path copies directly
 * from the current iovec when 4 bytes are available; otherwise defers to
 * rxi_ReadProc.  Returns sizeof(afs_int32) on full success. */
347 rx_ReadProc32(struct rx_call *call, afs_int32 * value)
352 /* Free any packets from the last call to ReadvProc/WritevProc */
353 if (!opr_queue_IsEmpty(&call->app.iovq)) {
354 #ifdef RXDEBUG_PACKET
356 #endif /* RXDEBUG_PACKET */
357 rxi_FreePackets(0, &call->app.iovq);
361 * Most common case, all of the data is in the current iovec.
362 * We are relying on nLeft being zero unless the call is in receive mode.
364 if (!call->error && call->app.curlen >= sizeof(afs_int32)
365 && call->app.nLeft >= sizeof(afs_int32)) {
/* memcpy rather than a direct load: curpos may be unaligned. */
367 memcpy((char *)value, call->app.curpos, sizeof(afs_int32));
369 call->app.curpos += sizeof(afs_int32);
370 call->app.curlen -= sizeof(afs_int32);
371 call->app.nLeft -= sizeof(afs_int32);
373 if (!call->app.nLeft && call->app.currentPacket != NULL) {
374 /* out of packet. Get another one. */
375 rxi_FreePacket(call->app.currentPacket);
376 call->app.currentPacket = NULL;
378 return sizeof(afs_int32);
/* Slow path via the general read routine. */
382 bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
390 * Uses packets in the receive queue to fill in as much of the
391 * current iovec as possible. Does not block if it runs out
392 * of packets to complete the iovec. Return true if an ack packet
393 * was sent, otherwise return false */
/* Fill call->iov entries with pointers into received packet buffers, without
 * blocking, consuming packets from the receive queue.  Consumed packets are
 * parked on call->app.iovq until the application is done with the data.
 * Returns whether an ack was sent (per the comment at orig line 392; the
 * visible excerpt elides the actual return statements — confirm in full
 * source).  Caller holds call->lock. */
395 rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
401 struct iovec *call_iov;
402 struct iovec *cur_iov = NULL;
404 if (call->app.currentPacket) {
405 cur_iov = &call->app.currentPacket->wirevec[call->app.curvec];
407 call_iov = &call->iov[call->iovNext];
/* Outer loop: keep pulling packets while the caller's iovec has room. */
409 while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
410 if (call->app.nLeft == 0) {
411 /* Get next packet */
412 code = rxi_GetNextPacket(call);
414 MUTEX_ENTER(&call->lock);
418 if (call->app.currentPacket) {
419 cur_iov = &call->app.currentPacket->wirevec[1];
427 /* It's possible for call->app.nLeft to be smaller than any particular
428 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
429 * reflects the size of the buffer. We have to keep track of the
430 * number of bytes read in the length field of the packet struct. On
431 * the final portion of a received packet, it's almost certain that
432 * call->app.nLeft will be smaller than the final buffer. */
/* Inner loop: carve the current packet into iovec entries (zero copy). */
433 while (call->iovNBytes
434 && call->iovNext < call->iovMax
435 && call->app.currentPacket) {
437 t = MIN((int)call->app.curlen, call->iovNBytes);
438 t = MIN(t, (int)call->app.nLeft);
439 call_iov->iov_base = call->app.curpos;
440 call_iov->iov_len = t;
443 call->iovNBytes -= t;
444 call->app.curpos += t;
445 call->app.curlen -= t;
446 call->app.nLeft -= t;
448 if (!call->app.nLeft) {
449 /* out of packet. Get another one. */
450 #ifdef RX_TRACK_PACKETS
451 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
452 call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
/* Packet stays alive on iovq — the app still holds pointers into it. */
454 opr_queue_Append(&call->app.iovq,
455 &call->app.currentPacket->entry);
456 #ifdef RXDEBUG_PACKET
458 #endif /* RXDEBUG_PACKET */
459 call->app.currentPacket = NULL;
460 } else if (!call->app.curlen) {
461 /* need to get another struct iov */
462 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
463 /* current packet is exhausted, get ready for another */
464 /* don't worry about curvec and stuff, they get set somewhere else */
465 #ifdef RX_TRACK_PACKETS
466 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
467 call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
469 opr_queue_Append(&call->app.iovq,
470 &call->app.currentPacket->entry);
471 #ifdef RXDEBUG_PACKET
473 #endif /* RXDEBUG_PACKET */
474 call->app.currentPacket = NULL;
478 call->app.curpos = (char *)cur_iov->iov_base;
479 call->app.curlen = cur_iov->iov_len;
485 /* If we consumed any packets then check whether we need to
486 * send a hard ack. */
487 if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
488 if (call->nHardAcks > (u_short) rxi_HardAckRate) {
489 rxevent_Cancel(&call->delayedAckEvent, call,
490 RX_CALL_REFCOUNT_DELAY);
/* serial is echoed in the ack so the peer can match it to its packet. */
491 rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
494 /* Delay to consolidate ack packets */
495 rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
502 /* rxi_ReadvProc -- internal version.
504 * Fills in an iovec with pointers to the packet buffers. All packets
505 * except the last packet (new current packet) are moved to the iovq
506 * while the application is processing the data.
508 * LOCKS USED -- called at netpri.
/* Internal readv: fill iov with pointers into packet buffers, blocking until
 * nbytes are available, the iovec is full, or the stream ends.  *nio receives
 * the number of entries filled; returns bytes delivered.  Called at netpri. */
511 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
516 /* Free any packets from the last call to ReadvProc/WritevProc */
517 if (!opr_queue_IsEmpty(&call->app.iovq)) {
518 #ifdef RXDEBUG_PACKET
520 #endif /* RXDEBUG_PACKET */
521 rxi_FreePackets(0, &call->app.iovq);
/* Switching from write to read implicitly flushes buffered output. */
524 if (call->app.mode == RX_MODE_SENDING) {
525 rxi_FlushWrite(call);
528 MUTEX_ENTER(&call->lock);
532 /* Get whatever data is currently available in the receive queue.
533 * If rxi_FillReadVec sends an ack packet then it is possible
534 * that we will receive more data while we drop the call lock
535 * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
536 * here to avoid a race with the receive thread if we send
537 * hard acks in rxi_FillReadVec. */
538 call->flags |= RX_CALL_IOVEC_WAIT;
539 call->iovNBytes = nbytes;
540 call->iovMax = maxio;
543 rxi_FillReadVec(call, 0);
545 /* if we need more data then sleep until the receive thread has
546 * filled in the rest. */
547 if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
548 && !(call->flags & RX_CALL_RECEIVE_DONE)) {
549 call->flags |= RX_CALL_READER_WAIT;
551 call->startWait = clock_Sec();
552 while (call->flags & RX_CALL_READER_WAIT) {
553 #ifdef RX_ENABLE_LOCKS
554 CV_WAIT(&call->cv_rq, &call->lock);
556 osi_rxSleep(&call->rq);
561 call->flags &= ~RX_CALL_IOVEC_WAIT;
567 *nio = call->iovNext;
568 bytes = nbytes - call->iovNBytes;
569 MUTEX_EXIT(&call->lock);
/* NOTE(review): on this error path app.mode is written AFTER the lock is
 * released (orig lines 573/574 are consecutive) — looks racy against the
 * receive thread; confirm ordering against the full source. */
573 MUTEX_EXIT(&call->lock);
574 call->app.mode = RX_MODE_ERROR;
/* Public readv wrapper: delegates to rxi_ReadvProc (presumably bracketed by
 * NETPRI/USERPRI in the elided lines — confirm against full source). */
579 rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
586 bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
591 /* rxi_WriteProc -- internal version.
593 * LOCKS USED -- called at netpri
/* Internal write: buffer nbytes from buf into the call's current send packet,
 * queueing full packets on call->tq and blocking when the transmit window is
 * closed.  Returns bytes accepted (requestCount - nbytes).  Called at netpri.
 * NOTE(review): excerpt elides lines; code left byte-identical. */
597 rxi_WriteProc(struct rx_call *call, char *buf,
600 struct rx_connection *conn = call->conn;
602 int requestCount = nbytes;
604 /* Free any packets from the last call to ReadvProc/WritevProc */
605 if (!opr_queue_IsEmpty(&call->app.iovq)) {
606 #ifdef RXDEBUG_PACKET
608 #endif /* RXDEBUG_PACKET */
609 rxi_FreePackets(0, &call->app.iovq);
/* Server side: a first write while still receiving flips the call into
 * send mode and discards any partially-read inbound packet. */
612 if (call->app.mode != RX_MODE_SENDING) {
613 if ((conn->type == RX_SERVER_CONNECTION)
614 && (call->app.mode == RX_MODE_RECEIVING)) {
615 call->app.mode = RX_MODE_SENDING;
616 if (call->app.currentPacket) {
617 #ifdef RX_TRACK_PACKETS
618 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
620 rxi_FreePacket(call->app.currentPacket);
621 call->app.currentPacket = NULL;
630 /* Loop condition is checked at end, so that a write of 0 bytes
631 * will force a packet to be created--specially for the case where
632 * there are 0 bytes on the stream, but we must send a packet
/* nFree == 0: current packet is full — push it to the transmit queue and
 * allocate a fresh one. */
635 if (call->app.nFree == 0) {
636 MUTEX_ENTER(&call->lock);
638 call->app.mode = RX_MODE_ERROR;
639 if (!call->error && call->app.currentPacket) {
640 clock_NewTime(); /* Bogus: need new time package */
641 /* The 0, below, specifies that it is not the last packet:
642 * there will be others. PrepareSendPacket may
643 * alter the packet length by up to
644 * conn->securityMaxTrailerSize */
645 call->app.bytesSent += call->app.currentPacket->length;
646 rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
647 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
648 /* PrepareSendPacket drops the call lock */
649 rxi_WaitforTQBusy(call);
650 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
651 #ifdef RX_TRACK_PACKETS
652 call->app.currentPacket->flags |= RX_PKTFLAG_TQ;
654 opr_queue_Append(&call->tq,
655 &call->app.currentPacket->entry);
656 #ifdef RXDEBUG_PACKET
658 #endif /* RXDEBUG_PACKET */
659 #ifdef RX_TRACK_PACKETS
660 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
662 call->app.currentPacket = NULL;
664 /* If the call is in recovery, let it exhaust its current
665 * retransmit queue before forcing it to send new packets
667 if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
670 } else if (call->app.currentPacket) {
671 #ifdef RX_TRACK_PACKETS
672 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
674 rxi_FreePacket(call->app.currentPacket);
675 call->app.currentPacket = NULL;
677 /* Wait for transmit window to open up */
/* Window check: tnext may not run more than 2*twind past tfirst. */
679 && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
681 call->startWait = clock_Sec();
683 #ifdef RX_ENABLE_LOCKS
684 CV_WAIT(&call->cv_twind, &call->lock);
686 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
687 osi_rxSleep(&call->twind);
691 #ifdef RX_ENABLE_LOCKS
693 call->app.mode = RX_MODE_ERROR;
694 MUTEX_EXIT(&call->lock);
697 #endif /* RX_ENABLE_LOCKS */
/* Allocate the next send packet and point the write cursors past the
 * security header of its first data iovec. */
699 if ((call->app.currentPacket = rxi_AllocSendPacket(call, nbytes))) {
700 #ifdef RX_TRACK_PACKETS
701 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
703 call->app.nFree = call->app.currentPacket->length;
704 call->app.curvec = 1; /* 0th vec is always header */
705 /* begin at the beginning [ more or less ], continue
706 * on until the end, then stop. */
708 (char *) call->app.currentPacket->wirevec[1].iov_base +
709 call->conn->securityHeaderSize;
711 call->app.currentPacket->wirevec[1].iov_len -
712 call->conn->securityHeaderSize;
715 call->app.mode = RX_MODE_ERROR;
716 if (call->app.currentPacket) {
717 #ifdef RX_TRACK_PACKETS
718 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
720 rxi_FreePacket(call->app.currentPacket);
721 call->app.currentPacket = NULL;
723 MUTEX_EXIT(&call->lock);
726 MUTEX_EXIT(&call->lock);
/* Request larger than remaining room: try to grow the packet up to the
 * connection's max-user-data size before falling back to a new packet. */
729 if (call->app.currentPacket && (int)call->app.nFree < nbytes) {
730 /* Try to extend the current buffer */
732 len = call->app.currentPacket->length;
733 mud = rx_MaxUserDataSize(call);
736 want = MIN(nbytes - (int)call->app.nFree, mud - len);
737 rxi_AllocDataBuf(call->app.currentPacket, want,
738 RX_PACKET_CLASS_SEND_CBUF);
739 if (call->app.currentPacket->length > (unsigned)mud)
740 call->app.currentPacket->length = mud;
741 call->app.nFree += (call->app.currentPacket->length - len);
745 /* If the remaining bytes fit in the buffer, then store them
746 * and return. Don't ship a buffer that's full immediately to
747 * the peer--we don't know if it's the last buffer yet */
749 if (!call->app.currentPacket) {
/* Copy loop: t bytes per iteration, bounded by iovec room and free space. */
753 while (nbytes && call->app.nFree) {
755 t = MIN((int)call->app.curlen, nbytes);
756 t = MIN((int)call->app.nFree, t);
757 memcpy(call->app.curpos, buf, t);
760 call->app.curpos += t;
761 call->app.curlen -= (u_short)t;
762 call->app.nFree -= (u_short)t;
764 if (!call->app.curlen) {
765 /* need to get another struct iov */
766 if (++call->app.curvec >= call->app.currentPacket->niovecs) {
767 /* current packet is full, extend or send it */
771 call->app.currentPacket->wirevec[call->app.curvec].iov_base;
773 call->app.currentPacket->wirevec[call->app.curvec].iov_len;
776 } /* while bytes to send and room to send them */
778 /* might be out of space now */
781 } else; /* more data to send, so get another packet and keep going */
784 return requestCount - nbytes;
/* Public write entry point.  Fast path: when the request fits in the current
 * iovec and packet free space, copy without taking call->lock; otherwise
 * defer to the locked rxi_WriteProc slow path. */
788 rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
796 /* Free any packets from the last call to ReadvProc/WritevProc */
797 if (!opr_queue_IsEmpty(&call->app.iovq)) {
798 #ifdef RXDEBUG_PACKET
800 #endif /* RXDEBUG_PACKET */
801 rxi_FreePackets(0, &call->app.iovq);
805 * Most common case: all of the data fits in the current iovec.
806 * We are relying on nFree being zero unless the call is in send mode.
/* Snapshot cursors into locals so the fast-path test and update are cheap. */
808 tcurlen = (int)call->app.curlen;
809 tnFree = (int)call->app.nFree;
810 if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
811 tcurpos = call->app.curpos;
813 memcpy(tcurpos, buf, nbytes);
814 call->app.curpos = tcurpos + nbytes;
815 call->app.curlen = (u_short)(tcurlen - nbytes);
816 call->app.nFree = (u_short)(tnFree - nbytes);
/* Slow path: locked, may queue packets and block on the window. */
821 bytes = rxi_WriteProc(call, buf, nbytes);
826 /* Optimization for marshalling 32 bit arguments */
/* Marshal one 32-bit integer onto the stream.  Fast path writes into the
 * current iovec when 4 bytes of room exist; returns sizeof(afs_int32) on
 * full success, otherwise falls back to rxi_WriteProc. */
828 rx_WriteProc32(struct rx_call *call, afs_int32 * value)
836 if (!opr_queue_IsEmpty(&call->app.iovq)) {
837 #ifdef RXDEBUG_PACKET
839 #endif /* RXDEBUG_PACKET */
840 rxi_FreePackets(0, &call->app.iovq);
844 * Most common case: all of the data fits in the current iovec.
845 * We are relying on nFree being zero unless the call is in send mode.
847 tcurlen = call->app.curlen;
848 tnFree = call->app.nFree;
849 if (!call->error && tcurlen >= sizeof(afs_int32)
850 && tnFree >= sizeof(afs_int32)) {
851 tcurpos = call->app.curpos;
/* Aligned destination: store directly; otherwise memcpy to avoid a
 * misaligned access. */
853 if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
854 *((afs_int32 *) (tcurpos)) = *value;
856 memcpy(tcurpos, (char *)value, sizeof(afs_int32));
858 call->app.curpos = tcurpos + sizeof(afs_int32);
859 call->app.curlen = (u_short)(tcurlen - sizeof(afs_int32));
860 call->app.nFree = (u_short)(tnFree - sizeof(afs_int32));
861 return sizeof(afs_int32);
865 bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
870 /* rxi_WritevAlloc -- internal version.
872 * Fill in an iovec to point to data in packet buffers. The application
873 * calls rxi_WritevProc when the buffers are full.
875 * LOCKS USED -- called at netpri.
/* Internal writev-alloc: populate iov with pointers into send packet buffers
 * (allocating packets onto call->app.iovq as needed) without committing any
 * data.  The caller fills the buffers and then calls rxi_WritevProc.  All
 * cursor updates here use shadow locals (t*); call state is only adjusted
 * for the packet that is currently call->app.currentPacket.  Called at
 * netpri.  NOTE(review): excerpt elides lines; code left byte-identical. */
879 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
882 struct rx_connection *conn = call->conn;
883 struct rx_packet *cp;
886 /* Temporary values, real work is done in rxi_WritevProc */
888 unsigned int tcurvec;
892 requestCount = nbytes;
895 /* Free any packets from the last call to ReadvProc/WritevProc */
896 if (!opr_queue_IsEmpty(&call->app.iovq)) {
897 #ifdef RXDEBUG_PACKET
899 #endif /* RXDEBUG_PACKET */
900 rxi_FreePackets(0, &call->app.iovq);
/* Server side: first write while receiving flips the call into send mode. */
903 if (call->app.mode != RX_MODE_SENDING) {
904 if ((conn->type == RX_SERVER_CONNECTION)
905 && (call->app.mode == RX_MODE_RECEIVING)) {
906 call->app.mode = RX_MODE_SENDING;
907 if (call->app.currentPacket) {
908 #ifdef RX_TRACK_PACKETS
909 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
911 rxi_FreePacket(call->app.currentPacket);
912 call->app.currentPacket = NULL;
921 /* Set up the iovec to point to data in packet buffers. */
922 tnFree = call->app.nFree;
923 tcurvec = call->app.curvec;
924 tcurpos = call->app.curpos;
925 tcurlen = call->app.curlen;
926 cp = call->app.currentPacket;
931 /* current packet is full, allocate a new one */
932 MUTEX_ENTER(&call->lock);
933 cp = rxi_AllocSendPacket(call, nbytes);
934 MUTEX_EXIT(&call->lock);
936 /* out of space, return what we have */
938 return requestCount - nbytes;
940 #ifdef RX_TRACK_PACKETS
941 cp->flags |= RX_PKTFLAG_IOVQ;
/* Fresh packets are parked on iovq; WritevProc later moves them to tq. */
943 opr_queue_Append(&call->app.iovq, &cp->entry);
944 #ifdef RXDEBUG_PACKET
946 #endif /* RXDEBUG_PACKET */
950 (char *)cp->wirevec[1].iov_base +
951 call->conn->securityHeaderSize;
952 tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
/* Try to grow the packet toward max-user-data before spilling to a new one. */
955 if (tnFree < nbytes) {
956 /* try to extend the current packet */
959 mud = rx_MaxUserDataSize(call);
962 want = MIN(nbytes - tnFree, mud - len);
963 rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
964 if (cp->length > (unsigned)mud)
966 tnFree += (cp->length - len);
/* Only mirror the growth into call state for the live current packet. */
967 if (cp == call->app.currentPacket) {
968 call->app.nFree += (cp->length - len);
973 /* fill in the next entry in the iovec */
974 t = MIN(tcurlen, nbytes);
976 iov[nextio].iov_base = tcurpos;
977 iov[nextio].iov_len = t;
985 /* need to get another struct iov */
986 if (++tcurvec >= cp->niovecs) {
987 /* current packet is full, extend it or move on to next packet */
990 tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
991 tcurlen = cp->wirevec[tcurvec].iov_len;
994 } while (nbytes && nextio < maxio);
996 return requestCount - nbytes;
/* Public writev-alloc wrapper: delegates to rxi_WritevAlloc (presumably
 * bracketed by NETPRI/USERPRI in the elided lines — confirm in full source). */
1000 rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
1007 bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
1012 /* rxi_WritevProc -- internal version.
1014 * Send buffers allocated in rxi_WritevAlloc.
1016 * LOCKS USED -- called at netpri.
/* Internal writev: commit data previously placed (via rxi_WritevAlloc) in
 * packets on call->app.iovq.  Validates that each caller iovec entry matches
 * the expected packet cursor, moves completed packets through a temporary
 * queue onto call->tq, and blocks while the transmit queue exceeds the
 * window.  Returns bytes committed.  Called at netpri.
 * NOTE(review): excerpt elides lines; code left byte-identical. */
1019 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1021 #ifdef RX_TRACK_PACKETS
1022 struct opr_queue *cursor;
1026 struct opr_queue tmpq;
1027 #ifdef RXDEBUG_PACKET
1031 requestCount = nbytes;
1034 MUTEX_ENTER(&call->lock);
1036 call->app.mode = RX_MODE_ERROR;
/* Writev outside send mode is a protocol violation by the application. */
1037 } else if (call->app.mode != RX_MODE_SENDING) {
1038 call->error = RX_PROTOCOL_ERROR;
1040 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1041 rxi_WaitforTQBusy(call);
1042 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1045 call->app.mode = RX_MODE_ERROR;
1046 MUTEX_EXIT(&call->lock);
/* Error bail-out: return currentPacket to iovq so the bulk free below
 * reclaims it along with the pre-allocated packets. */
1047 if (call->app.currentPacket) {
1048 #ifdef RX_TRACK_PACKETS
1049 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1050 call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
1052 opr_queue_Prepend(&call->app.iovq,
1053 &call->app.currentPacket->entry);
1054 #ifdef RXDEBUG_PACKET
1056 #endif /* RXDEBUG_PACKET */
1057 call->app.currentPacket = NULL;
1059 #ifdef RXDEBUG_PACKET
1061 #endif /* RXDEBUG_PACKET */
1062 rxi_FreePackets(0, &call->app.iovq);
1066 /* Loop through the I/O vector adjusting packet pointers.
1067 * Place full packets back onto the iovq once they are ready
1068 * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1069 * the iovec. We put the loop condition at the end to ensure that
1070 * a zero length write will push a short packet. */
1072 opr_queue_Init(&tmpq);
1073 #ifdef RXDEBUG_PACKET
1075 #endif /* RXDEBUG_PACKET */
/* Current packet full: prepare it for the wire and stage it on tmpq. */
1077 if (call->app.nFree == 0 && call->app.currentPacket) {
1078 clock_NewTime(); /* Bogus: need new time package */
1079 /* The 0, below, specifies that it is not the last packet:
1080 * there will be others. PrepareSendPacket may
1081 * alter the packet length by up to
1082 * conn->securityMaxTrailerSize */
1083 call->app.bytesSent += call->app.currentPacket->length;
1084 rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
1085 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1086 /* PrepareSendPacket drops the call lock */
1087 rxi_WaitforTQBusy(call);
1088 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1089 opr_queue_Append(&tmpq, &call->app.currentPacket->entry);
1090 #ifdef RXDEBUG_PACKET
1092 #endif /* RXDEBUG_PACKET */
1093 call->app.currentPacket = NULL;
1095 /* The head of the iovq is now the current packet */
/* The iovec claims more data than packets were allocated for. */
1097 if (opr_queue_IsEmpty(&call->app.iovq)) {
1098 MUTEX_EXIT(&call->lock);
1099 call->error = RX_PROTOCOL_ERROR;
1100 #ifdef RXDEBUG_PACKET
1102 #endif /* RXDEBUG_PACKET */
1103 rxi_FreePackets(0, &tmpq);
1106 call->app.currentPacket
1107 = opr_queue_First(&call->app.iovq, struct rx_packet,
1109 opr_queue_Remove(&call->app.currentPacket->entry);
1110 #ifdef RX_TRACK_PACKETS
1111 call->app.currentPacket->flags &= ~RX_PKTFLAG_IOVQ;
1112 call->app.currentPacket->flags |= RX_PKTFLAG_CP;
1114 #ifdef RXDEBUG_PACKET
1116 #endif /* RXDEBUG_PACKET */
1117 call->app.nFree = call->app.currentPacket->length;
1118 call->app.curvec = 1;
1120 (char *) call->app.currentPacket->wirevec[1].iov_base +
1121 call->conn->securityHeaderSize;
1123 call->app.currentPacket->wirevec[1].iov_len -
1124 call->conn->securityHeaderSize;
1129 /* The next iovec should point to the current position */
/* Defend against a mismatched iovec from the caller. */
1130 if (iov[nextio].iov_base != call->app.curpos
1131 || iov[nextio].iov_len > (int)call->app.curlen) {
1132 call->error = RX_PROTOCOL_ERROR;
1133 MUTEX_EXIT(&call->lock);
1134 if (call->app.currentPacket) {
1135 #ifdef RX_TRACK_PACKETS
1136 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1138 opr_queue_Prepend(&tmpq,
1139 &call->app.currentPacket->entry);
1140 #ifdef RXDEBUG_PACKET
1142 #endif /* RXDEBUG_PACKET */
1143 call->app.currentPacket = NULL;
1145 #ifdef RXDEBUG_PACKET
1147 #endif /* RXDEBUG_PACKET */
1148 rxi_FreePackets(0, &tmpq);
1151 nbytes -= iov[nextio].iov_len;
1152 call->app.curpos += iov[nextio].iov_len;
1153 call->app.curlen -= iov[nextio].iov_len;
1154 call->app.nFree -= iov[nextio].iov_len;
1156 if (call->app.curlen == 0) {
1157 if (++call->app.curvec > call->app.currentPacket->niovecs) {
1158 call->app.nFree = 0;
1161 call->app.currentPacket->wirevec[call->app.curvec].iov_base;
1163 call->app.currentPacket->wirevec[call->app.curvec].iov_len;
1167 } while (nbytes && nextio < nio);
1169 /* Move the packets from the temporary queue onto the transmit queue.
1170 * We may end up with more than call->twind packets on the queue. */
1172 #ifdef RX_TRACK_PACKETS
1173 for (opr_queue_Scan(&tmpq, cursor))
1175 struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
1176 p->flags |= RX_PKTFLAG_TQ;
1180 call->app.mode = RX_MODE_ERROR;
1182 opr_queue_SpliceAppend(&call->tq, &tmpq);
1184 /* If the call is in recovery, let it exhaust its current retransmit
1185 * queue before forcing it to send new packets
1187 if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1191 /* Wait for the length of the transmit queue to fall below call->twind */
1192 while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
1194 call->startWait = clock_Sec();
1195 #ifdef RX_ENABLE_LOCKS
1196 CV_WAIT(&call->cv_twind, &call->lock);
1198 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
1199 osi_rxSleep(&call->twind);
1201 call->startWait = 0;
1205 call->app.mode = RX_MODE_ERROR;
/* BUG(review): currentPacket is NULLed here (orig 1206) BEFORE the non-NULL
 * check at orig 1208 — the lines are consecutive, so the rxi_FreePacket
 * below is unreachable and the packet leaks on this error path.  Fix is to
 * save the pointer (or free first) before NULLing; left as-is because the
 * surrounding elided lines prevent a safe in-place rewrite here. */
1206 call->app.currentPacket = NULL;
1207 MUTEX_EXIT(&call->lock);
1208 if (call->app.currentPacket) {
1209 #ifdef RX_TRACK_PACKETS
1210 call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
1212 rxi_FreePacket(call->app.currentPacket);
1216 MUTEX_EXIT(&call->lock);
1218 return requestCount - nbytes;
/* Public writev wrapper: delegates to rxi_WritevProc (presumably bracketed
 * by NETPRI/USERPRI in the elided lines — confirm against full source). */
1222 rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
1228 bytes = rxi_WritevProc(call, iov, nio, nbytes);
1233 /* Flush any buffered data to the stream, switch to read mode
1234 * (clients) or to EOF mode (servers)
1236 * LOCKS HELD: called at netpri.
/* Flush any buffered send data as the final ("last") packet of the stream,
 * then switch the call out of send mode: clients go to RX_MODE_RECEIVING,
 * servers to RX_MODE_EOF.  If nothing is buffered, an empty last packet is
 * allocated so the peer still sees end-of-stream.  Called at netpri.
 * NOTE(review): excerpt elides lines; code left byte-identical. */
1239 rxi_FlushWrite(struct rx_call *call)
1241 struct rx_packet *cp = NULL;
1243 /* Free any packets from the last call to ReadvProc/WritevProc */
1244 if (!opr_queue_IsEmpty(&call->app.iovq)) {
1245 #ifdef RXDEBUG_PACKET
1247 #endif /* RXDEBUG_PACKET */
1248 rxi_FreePackets(0, &call->app.iovq);
1251 if (call->app.mode == RX_MODE_SENDING) {
/* Client reads the reply next; server has nothing more to receive. */
1254 (call->conn->type ==
1255 RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
1257 #ifdef RX_KERNEL_TRACE
1259 int glockOwner = ISAFS_GLOCK();
1262 afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
1263 __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
1270 MUTEX_ENTER(&call->lock);
1272 call->app.mode = RX_MODE_ERROR;
1274 call->flags |= RX_CALL_FLUSH;
1276 cp = call->app.currentPacket;
1279 /* cp->length is only supposed to be the user's data */
1280 /* cp->length was already set to (then-current)
1281 * MaxUserDataSize or less. */
1282 #ifdef RX_TRACK_PACKETS
1283 cp->flags &= ~RX_PKTFLAG_CP;
/* Trim unused free space so only the user's bytes go on the wire. */
1285 cp->length -= call->app.nFree;
1286 call->app.currentPacket = NULL;
1287 call->app.nFree = 0;
/* No buffered data: send a zero-length last packet as the EOF marker. */
1289 cp = rxi_AllocSendPacket(call, 0);
1291 /* Mode can no longer be MODE_SENDING */
1295 cp->niovecs = 2; /* header + space for rxkad stuff */
1296 call->app.nFree = 0;
1299 /* The 1 specifies that this is the last packet */
1300 call->app.bytesSent += cp->length;
1301 rxi_PrepareSendPacket(call, cp, 1);
1302 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
1303 /* PrepareSendPacket drops the call lock */
1304 rxi_WaitforTQBusy(call);
1305 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
1306 #ifdef RX_TRACK_PACKETS
1307 cp->flags |= RX_PKTFLAG_TQ;
1309 opr_queue_Append(&call->tq, &cp->entry);
1310 #ifdef RXDEBUG_PACKET
1312 #endif /* RXDEBUG_PACKET */
1314 /* If the call is in recovery, let it exhaust its current retransmit
1315 * queue before forcing it to send new packets
1317 if (!(call->flags & RX_CALL_FAST_RECOVER)) {
1320 MUTEX_EXIT(&call->lock);
1324 /* Flush any buffered data to the stream, switch to read mode
1325 * (clients) or to EOF mode (servers) */
1327 rx_FlushWrite(struct rx_call *call)
1331 rxi_FlushWrite(call);