/*
* Copyright 2000, International Business Machines Corporation and others.
* All Rights Reserved.
- *
+ *
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
*/
#include <afsconfig.h>
-#ifdef KERNEL
-#include "afs/param.h"
-#else
#include <afs/param.h>
-#endif
-
-RCSID
- ("$Header$");
#ifdef KERNEL
-#ifndef UKERNEL
-#ifdef RX_KERNEL_TRACE
-#include "rx_kcommon.h"
-#endif
-#if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
-#include "afs/sysincludes.h"
-#else
-#include "h/types.h"
-#include "h/time.h"
-#include "h/stat.h"
-#ifdef AFS_OSF_ENV
-#include <net/net_globals.h>
-#endif /* AFS_OSF_ENV */
-#ifdef AFS_LINUX20_ENV
-#include "h/socket.h"
-#endif
-#include "netinet/in.h"
-#if defined(AFS_SGI_ENV)
-#include "afs/sysincludes.h"
-#endif
-#endif
-#include "afs/afs_args.h"
-#include "afs/afs_osi.h"
-#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
-#include "h/systm.h"
-#endif
-#else /* !UKERNEL */
-#include "afs/sysincludes.h"
-#endif /* !UKERNEL */
-#ifdef RXDEBUG
-#undef RXDEBUG /* turn off debugging */
-#endif /* RXDEBUG */
-
-#include "rx_kmutex.h"
-#include "rx/rx_kernel.h"
-#include "rx/rx_clock.h"
-#include "rx/rx_queue.h"
-#include "rx/rx.h"
-#include "rx/rx_globals.h"
-#include "afs/lock.h"
-#include "afsint.h"
-#ifdef AFS_ALPHA_ENV
-#undef kmem_alloc
-#undef kmem_free
-#undef mem_alloc
-#undef mem_free
-#undef register
-#endif /* AFS_ALPHA_ENV */
+# ifndef UKERNEL
+# ifdef RX_KERNEL_TRACE
+# include "rx_kcommon.h"
+# endif
+# if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
+# include "afs/sysincludes.h"
+# else
+# include "h/types.h"
+# include "h/time.h"
+# include "h/stat.h"
+# if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
+# include "h/systm.h"
+# endif
+# ifdef AFS_OSF_ENV
+# include <net/net_globals.h>
+# endif /* AFS_OSF_ENV */
+# ifdef AFS_LINUX20_ENV
+# include "h/socket.h"
+# endif
+# include "netinet/in.h"
+# if defined(AFS_SGI_ENV)
+# include "afs/sysincludes.h"
+# endif
+# endif
+# include "afs/afs_args.h"
+# if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
+# include "h/systm.h"
+# endif
+# else /* !UKERNEL */
+# include "afs/sysincludes.h"
+# endif /* !UKERNEL */
+
+# ifdef RXDEBUG
+# undef RXDEBUG /* turn off debugging */
+# endif /* RXDEBUG */
+
+# include "afs/afs_osi.h"
+# include "rx_kmutex.h"
+# include "rx/rx_kernel.h"
+# include "afs/lock.h"
#else /* KERNEL */
-# include <sys/types.h>
-#ifndef AFS_NT40_ENV
-# include <sys/socket.h>
-# include <sys/file.h>
-# include <netdb.h>
-# include <netinet/in.h>
-# include <sys/stat.h>
-# include <sys/time.h>
-#endif
-#ifdef HAVE_STRING_H
-#include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-# include "rx_user.h"
-# include "rx_clock.h"
-# include "rx_queue.h"
-# include "rx.h"
-# include "rx_globals.h"
+# include <roken.h>
+# include <afs/opr.h>
#endif /* KERNEL */
+#include "rx.h"
+#include "rx_clock.h"
+#include "rx_globals.h"
+#include "rx_atomic.h"
+#include "rx_internal.h"
+#include "rx_conn.h"
+#include "rx_call.h"
+#include "rx_packet.h"
+
#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX_RDWR;
#endif /* RX_LOCKS_DB */
+
+/* Get the next packet in the receive queue
+ *
+ * Dispose of the call's currentPacket, and move the next packet in the
+ * receive queue into the currentPacket field. If the next packet isn't
+ * available, then currentPacket is left NULL.
+ *
+ * @param call
+ * The RX call to manipulate
+ * @returns
+ * 0 on success, an error code on failure
+ *
+ * @note
+ * Must be called with the call locked. Unlocks the call if returning
+ * with an error.
+ */
+
+static int
+rxi_GetNextPacket(struct rx_call *call) {
+ struct rx_packet *rp;
+ int error;
+
+ if (call->app.currentPacket != NULL) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = NULL;
+ }
+
+ if (opr_queue_IsEmpty(&call->rq))
+ return 0;
+
+ /* Check that next packet available is next in sequence */
+ rp = opr_queue_First(&call->rq, struct rx_packet, entry);
+ if (rp->header.seq != call->rnext)
+ return 0;
+
+ opr_queue_Remove(&rp->entry);
+#ifdef RX_TRACK_PACKETS
+ rp->flags &= ~RX_PKTFLAG_RQ;
+#endif
+#ifdef RXDEBUG_PACKET
+ call->rqc--;
+#endif /* RXDEBUG_PACKET */
+
+    /* RXS_CheckPacket is called to undo RXS_PreparePacket's work.  It may
+     * reduce the length of the packet by up to conn->maxTrailerSize,
+     * so that it reflects the length of the data + the header. */
+ if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
+ /* Used to merely shut down the call, but now we shut down the whole
+ * connection since this may indicate an attempt to hijack it */
+
+ MUTEX_EXIT(&call->lock);
+ rxi_ConnectionError(call->conn, error);
+ MUTEX_ENTER(&call->conn->conn_data_lock);
+ rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
+ MUTEX_EXIT(&call->conn->conn_data_lock);
+ rxi_FreePacket(rp);
+
+ return error;
+ }
+
+ call->rnext++;
+ call->app.currentPacket = rp;
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
+ call->app.curvec = 1; /* 0th vec is always header */
+
+ /* begin at the beginning [ more or less ], continue on until the end,
+ * then stop. */
+ call->app.curpos = (char *)call->app.currentPacket->wirevec[1].iov_base +
+ call->conn->securityHeaderSize;
+ call->app.curlen = call->app.currentPacket->wirevec[1].iov_len -
+ call->conn->securityHeaderSize;
+
+ call->app.nLeft = call->app.currentPacket->length;
+ call->app.bytesRcvd += call->app.currentPacket->length;
+
+ call->nHardAcks++;
+
+ return 0;
+}
+
/* rxi_ReadProc -- internal version.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held.
+ * LOCKS USED -- called at netpri
*/
int
-rxi_ReadProc(register struct rx_call *call, register char *buf,
- register int nbytes)
+rxi_ReadProc(struct rx_call *call, char *buf,
+ int nbytes)
{
- register struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *rp;
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
- register int requestCount;
- register unsigned int t;
+ int requestCount;
+ int code;
+ unsigned int t;
+
/* XXXX took out clock_NewTime from here. Was it needed? */
requestCount = nbytes;
/* Free any packets from the last call to ReadvProc/WritevProc */
- if (!queue_IsEmpty(&call->iovq)) {
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ if (!opr_queue_IsEmpty(&call->app.iovq)) {
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
}
do {
- if (call->nLeft == 0) {
+ if (call->app.nLeft == 0) {
/* Get next packet */
+ MUTEX_ENTER(&call->lock);
for (;;) {
- if (call->error || (call->mode != RX_MODE_RECEIVING)) {
+ if (call->error || (call->app.mode != RX_MODE_RECEIVING)) {
if (call->error) {
+ call->app.mode = RX_MODE_ERROR;
+ MUTEX_EXIT(&call->lock);
return 0;
}
- if (call->mode == RX_MODE_SENDING) {
+ if (call->app.mode == RX_MODE_SENDING) {
+ MUTEX_EXIT(&call->lock);
rxi_FlushWrite(call);
+ MUTEX_ENTER(&call->lock);
continue;
}
}
- if (queue_IsNotEmpty(&call->rq)) {
- /* Check that next packet available is next in sequence */
- rp = queue_First(&call->rq, rx_packet);
- if (rp->header.seq == call->rnext) {
- afs_int32 error;
- register struct rx_connection *conn = call->conn;
- queue_Remove(rp);
-
- /* RXS_CheckPacket called to undo RXS_PreparePacket's
- * work. It may reduce the length of the packet by up
- * to conn->maxTrailerSize, to reflect the length of the
- * data + the header. */
- if ((error =
- RXS_CheckPacket(conn->securityObject, call,
- rp))) {
- /* Used to merely shut down the call, but now we
- * shut down the whole connection since this may
- * indicate an attempt to hijack it */
-
- MUTEX_EXIT(&call->lock);
- rxi_ConnectionError(conn, error);
- MUTEX_ENTER(&conn->conn_data_lock);
- rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
- MUTEX_EXIT(&conn->conn_data_lock);
- rxi_FreePacket(rp);
- MUTEX_ENTER(&call->lock);
-
- return 0;
- }
- call->rnext++;
- cp = call->currentPacket = rp;
- call->curvec = 1; /* 0th vec is always header */
- /* begin at the beginning [ more or less ], continue
- * on until the end, then stop. */
- call->curpos =
- (char *)cp->wirevec[1].iov_base +
- call->conn->securityHeaderSize;
- call->curlen =
- cp->wirevec[1].iov_len -
- call->conn->securityHeaderSize;
-
- /* Notice that this code works correctly if the data
- * size is 0 (which it may be--no reply arguments from
- * server, for example). This relies heavily on the
- * fact that the code below immediately frees the packet
- * (no yields, etc.). If it didn't, this would be a
- * problem because a value of zero for call->nLeft
- * normally means that there is no read packet */
- call->nLeft = cp->length;
- hadd32(call->bytesRcvd, cp->length);
-
- /* Send a hard ack for every rxi_HardAckRate+1 packets
- * consumed. Otherwise schedule an event to send
- * the hard ack later on.
- */
- call->nHardAcks++;
- if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
- if (call->nHardAcks > (u_short) rxi_HardAckRate) {
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
- rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
- } else {
- struct clock when;
- clock_GetTime(&when);
- /* Delay to consolidate ack packets */
- clock_Add(&when, &rx_hardAckDelay);
- if (!call->delayedAckEvent
- || clock_Gt(&call->delayedAckEvent->
- eventTime, &when)) {
- rxevent_Cancel(call->delayedAckEvent,
- call,
- RX_CALL_REFCOUNT_DELAY);
- CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
- call->delayedAckEvent =
- rxevent_Post(&when,
- rxi_SendDelayedAck, call,
- 0);
- }
- }
+
+ code = rxi_GetNextPacket(call);
+ if (code)
+ return 0;
+
+ if (call->app.currentPacket) {
+ if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
+ if (call->nHardAcks > (u_short) rxi_HardAckRate) {
+ rxevent_Cancel(&call->delayedAckEvent, call,
+ RX_CALL_REFCOUNT_DELAY);
+ rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
+ } else {
+ /* Delay to consolidate ack packets */
+ rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
}
- break;
}
+ break;
}
-/*
-MTUXXX doesn't there need to be an "else" here ???
-*/
+	    /*
+	     * If we reach this point, either we have no packets in the
+	     * receive queue, or the next packet in the queue is not the
+	     * one we are looking for. There is nothing else for us to
+	     * do but wait for another packet to arrive.
+	     */
+
/* Are there ever going to be any more packets? */
if (call->flags & RX_CALL_RECEIVE_DONE) {
+ MUTEX_EXIT(&call->lock);
return requestCount - nbytes;
}
/* Wait for in-sequence packet */
call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
if (call->error) {
+ MUTEX_EXIT(&call->lock);
return 0;
}
#endif /* RX_ENABLE_LOCKS */
}
+ MUTEX_EXIT(&call->lock);
} else
- /* assert(cp); */
+ /* osi_Assert(cp); */
/* MTUXXX this should be replaced by some error-recovery code before shipping */
/* yes, the following block is allowed to be the ELSE clause (or not) */
- /* It's possible for call->nLeft to be smaller than any particular
+ /* It's possible for call->app.nLeft to be smaller than any particular
* iov_len. Usually, recvmsg doesn't change the iov_len, since it
* reflects the size of the buffer. We have to keep track of the
* number of bytes read in the length field of the packet struct. On
* the final portion of a received packet, it's almost certain that
- * call->nLeft will be smaller than the final buffer. */
- while (nbytes && cp) {
- t = MIN((int)call->curlen, nbytes);
- t = MIN(t, (int)call->nLeft);
- memcpy(buf, call->curpos, t);
+ * call->app.nLeft will be smaller than the final buffer. */
+ while (nbytes && call->app.currentPacket) {
+ t = MIN((int)call->app.curlen, nbytes);
+ t = MIN(t, (int)call->app.nLeft);
+ memcpy(buf, call->app.curpos, t);
buf += t;
nbytes -= t;
- call->curpos += t;
- call->curlen -= t;
- call->nLeft -= t;
+ call->app.curpos += t;
+ call->app.curlen -= t;
+ call->app.nLeft -= t;
- if (!call->nLeft) {
+ if (!call->app.nLeft) {
/* out of packet. Get another one. */
- rxi_FreePacket(cp);
- cp = call->currentPacket = (struct rx_packet *)0;
- } else if (!call->curlen) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = NULL;
+ } else if (!call->app.curlen) {
/* need to get another struct iov */
- if (++call->curvec >= cp->niovecs) {
+ if (++call->app.curvec >= call->app.currentPacket->niovecs) {
/* current packet is exhausted, get ready for another */
/* don't worry about curvec and stuff, they get set somewhere else */
- rxi_FreePacket(cp);
- cp = call->currentPacket = (struct rx_packet *)0;
- call->nLeft = 0;
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = NULL;
+ call->app.nLeft = 0;
} else {
- call->curpos =
- (char *)cp->wirevec[call->curvec].iov_base;
- call->curlen = cp->wirevec[call->curvec].iov_len;
+ call->app.curpos =
+ call->app.currentPacket->wirevec[call->app.curvec].iov_base;
+ call->app.curlen =
+ call->app.currentPacket->wirevec[call->app.curvec].iov_len;
}
}
}
rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
{
int bytes;
- int tcurlen;
- int tnLeft;
- char *tcurpos;
SPLVAR;
- /*
- * Free any packets from the last call to ReadvProc/WritevProc.
- * We do not need the lock because the receiver threads only
- * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
- * RX_CALL_IOVEC_WAIT is always cleared before returning from
- * ReadvProc/WritevProc.
- */
- if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ /* Free any packets from the last call to ReadvProc/WritevProc */
+ if (!opr_queue_IsEmpty(&call->app.iovq)) {
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
}
/*
* Most common case, all of the data is in the current iovec.
- * We do not need the lock because this is the only thread that
- * updates the curlen, curpos, nLeft fields.
- *
* We are relying on nLeft being zero unless the call is in receive mode.
*/
- tcurlen = call->curlen;
- tnLeft = call->nLeft;
- if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
- tcurpos = call->curpos;
- memcpy(buf, tcurpos, nbytes);
- call->curpos = tcurpos + nbytes;
- call->curlen = tcurlen - nbytes;
- call->nLeft = tnLeft - nbytes;
+ if (!call->error && call->app.curlen > nbytes && call->app.nLeft > nbytes) {
+ memcpy(buf, call->app.curpos, nbytes);
+
+ call->app.curpos += nbytes;
+ call->app.curlen -= nbytes;
+ call->app.nLeft -= nbytes;
+
+ if (!call->app.nLeft && call->app.currentPacket != NULL) {
+ /* out of packet. Get another one. */
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = NULL;
+ }
return nbytes;
}
NETPRI;
- AFS_RXGLOCK();
- MUTEX_ENTER(&call->lock);
bytes = rxi_ReadProc(call, buf, nbytes);
- MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
rx_ReadProc32(struct rx_call *call, afs_int32 * value)
{
int bytes;
- int tcurlen;
- int tnLeft;
- char *tcurpos;
SPLVAR;
- /*
- * Free any packets from the last call to ReadvProc/WritevProc.
- * We do not need the lock because the receiver threads only
- * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
- * RX_CALL_IOVEC_WAIT is always cleared before returning from
- * ReadvProc/WritevProc.
- */
- if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ /* Free any packets from the last call to ReadvProc/WritevProc */
+ if (!opr_queue_IsEmpty(&call->app.iovq)) {
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
}
/*
* Most common case, all of the data is in the current iovec.
- * We do not need the lock because this is the only thread that
- * updates the curlen, curpos, nLeft fields.
- *
* We are relying on nLeft being zero unless the call is in receive mode.
*/
- tcurlen = call->curlen;
- tnLeft = call->nLeft;
- if (!call->error && tcurlen > sizeof(afs_int32)
- && tnLeft > sizeof(afs_int32)) {
- tcurpos = call->curpos;
- if (!((long)tcurpos & (sizeof(afs_int32) - 1))) {
- *value = *((afs_int32 *) (tcurpos));
- } else {
- memcpy((char *)value, tcurpos, sizeof(afs_int32));
- }
- call->curpos = tcurpos + sizeof(afs_int32);
- call->curlen = tcurlen - sizeof(afs_int32);
- call->nLeft = tnLeft - sizeof(afs_int32);
+ if (!call->error && call->app.curlen >= sizeof(afs_int32)
+ && call->app.nLeft >= sizeof(afs_int32)) {
+
+ memcpy((char *)value, call->app.curpos, sizeof(afs_int32));
+
+ call->app.curpos += sizeof(afs_int32);
+ call->app.curlen -= sizeof(afs_int32);
+ call->app.nLeft -= sizeof(afs_int32);
+
+ if (!call->app.nLeft && call->app.currentPacket != NULL) {
+ /* out of packet. Get another one. */
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = NULL;
+ }
return sizeof(afs_int32);
}
NETPRI;
- AFS_RXGLOCK();
- MUTEX_ENTER(&call->lock);
bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
- MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
+
return bytes;
}
{
int didConsume = 0;
int didHardAck = 0;
- register unsigned int t;
- struct rx_packet *rp;
- struct rx_packet *curp;
+ int code;
+ unsigned int t;
struct iovec *call_iov;
struct iovec *cur_iov = NULL;
- curp = call->currentPacket;
- if (curp) {
- cur_iov = &curp->wirevec[call->curvec];
+ if (call->app.currentPacket) {
+ cur_iov = &call->app.currentPacket->wirevec[call->app.curvec];
}
call_iov = &call->iov[call->iovNext];
while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
- if (call->nLeft == 0) {
+ if (call->app.nLeft == 0) {
/* Get next packet */
- if (queue_IsNotEmpty(&call->rq)) {
- /* Check that next packet available is next in sequence */
- rp = queue_First(&call->rq, rx_packet);
- if (rp->header.seq == call->rnext) {
- afs_int32 error;
- register struct rx_connection *conn = call->conn;
- queue_Remove(rp);
-
- /* RXS_CheckPacket called to undo RXS_PreparePacket's
- * work. It may reduce the length of the packet by up
- * to conn->maxTrailerSize, to reflect the length of the
- * data + the header. */
- if ((error =
- RXS_CheckPacket(conn->securityObject, call, rp))) {
- /* Used to merely shut down the call, but now we
- * shut down the whole connection since this may
- * indicate an attempt to hijack it */
+ code = rxi_GetNextPacket(call);
+ if (code) {
+ MUTEX_ENTER(&call->lock);
+ return 1;
+ }
- MUTEX_EXIT(&call->lock);
- rxi_ConnectionError(conn, error);
- MUTEX_ENTER(&conn->conn_data_lock);
- rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
- MUTEX_EXIT(&conn->conn_data_lock);
- rxi_FreePacket(rp);
- MUTEX_ENTER(&call->lock);
-
- return 1;
- }
- call->rnext++;
- curp = call->currentPacket = rp;
- call->curvec = 1; /* 0th vec is always header */
- cur_iov = &curp->wirevec[1];
- /* begin at the beginning [ more or less ], continue
- * on until the end, then stop. */
- call->curpos =
- (char *)curp->wirevec[1].iov_base +
- call->conn->securityHeaderSize;
- call->curlen =
- curp->wirevec[1].iov_len -
- call->conn->securityHeaderSize;
-
- /* Notice that this code works correctly if the data
- * size is 0 (which it may be--no reply arguments from
- * server, for example). This relies heavily on the
- * fact that the code below immediately frees the packet
- * (no yields, etc.). If it didn't, this would be a
- * problem because a value of zero for call->nLeft
- * normally means that there is no read packet */
- call->nLeft = curp->length;
- hadd32(call->bytesRcvd, curp->length);
-
- /* Send a hard ack for every rxi_HardAckRate+1 packets
- * consumed. Otherwise schedule an event to send
- * the hard ack later on.
- */
- call->nHardAcks++;
- didConsume = 1;
- continue;
- }
+ if (call->app.currentPacket) {
+ cur_iov = &call->app.currentPacket->wirevec[1];
+ didConsume = 1;
+ continue;
+ } else {
+ break;
}
- break;
}
- /* It's possible for call->nLeft to be smaller than any particular
+ /* It's possible for call->app.nLeft to be smaller than any particular
* iov_len. Usually, recvmsg doesn't change the iov_len, since it
* reflects the size of the buffer. We have to keep track of the
* number of bytes read in the length field of the packet struct. On
* the final portion of a received packet, it's almost certain that
- * call->nLeft will be smaller than the final buffer. */
- while (call->iovNBytes && call->iovNext < call->iovMax && curp) {
-
- t = MIN((int)call->curlen, call->iovNBytes);
- t = MIN(t, (int)call->nLeft);
- call_iov->iov_base = call->curpos;
+ * call->app.nLeft will be smaller than the final buffer. */
+ while (call->iovNBytes
+ && call->iovNext < call->iovMax
+ && call->app.currentPacket) {
+
+ t = MIN((int)call->app.curlen, call->iovNBytes);
+ t = MIN(t, (int)call->app.nLeft);
+ call_iov->iov_base = call->app.curpos;
call_iov->iov_len = t;
call_iov++;
call->iovNext++;
call->iovNBytes -= t;
- call->curpos += t;
- call->curlen -= t;
- call->nLeft -= t;
+ call->app.curpos += t;
+ call->app.curlen -= t;
+ call->app.nLeft -= t;
- if (!call->nLeft) {
+ if (!call->app.nLeft) {
/* out of packet. Get another one. */
- queue_Append(&call->iovq, curp);
- curp = call->currentPacket = (struct rx_packet *)0;
- } else if (!call->curlen) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+ call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
+#endif
+ opr_queue_Append(&call->app.iovq,
+ &call->app.currentPacket->entry);
+#ifdef RXDEBUG_PACKET
+ call->iovqc++;
+#endif /* RXDEBUG_PACKET */
+ call->app.currentPacket = NULL;
+ } else if (!call->app.curlen) {
/* need to get another struct iov */
- if (++call->curvec >= curp->niovecs) {
+ if (++call->app.curvec >= call->app.currentPacket->niovecs) {
/* current packet is exhausted, get ready for another */
/* don't worry about curvec and stuff, they get set somewhere else */
- queue_Append(&call->iovq, curp);
- curp = call->currentPacket = (struct rx_packet *)0;
- call->nLeft = 0;
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+ call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
+#endif
+ opr_queue_Append(&call->app.iovq,
+ &call->app.currentPacket->entry);
+#ifdef RXDEBUG_PACKET
+ call->iovqc++;
+#endif /* RXDEBUG_PACKET */
+ call->app.currentPacket = NULL;
+ call->app.nLeft = 0;
} else {
cur_iov++;
- call->curpos = (char *)cur_iov->iov_base;
- call->curlen = cur_iov->iov_len;
+ call->app.curpos = (char *)cur_iov->iov_base;
+ call->app.curlen = cur_iov->iov_len;
}
}
}
* send a hard ack. */
if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
if (call->nHardAcks > (u_short) rxi_HardAckRate) {
- rxevent_Cancel(call->delayedAckEvent, call,
+ rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
didHardAck = 1;
} else {
- struct clock when;
- clock_GetTime(&when);
/* Delay to consolidate ack packets */
- clock_Add(&when, &rx_hardAckDelay);
- if (!call->delayedAckEvent
- || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
- CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
- call->delayedAckEvent =
- rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
- }
+ rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
}
}
return didHardAck;
* except the last packet (new current packet) are moved to the iovq
* while the application is processing the data.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held.
+ * LOCKS USED -- called at netpri.
*/
int
rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
int nbytes)
{
- struct rx_packet *rp;
- struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
- int requestCount;
- int nextio;
-
- requestCount = nbytes;
- nextio = 0;
+ int bytes;
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
+ if (!opr_queue_IsEmpty(&call->app.iovq)) {
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
}
- if (call->mode == RX_MODE_SENDING) {
+ if (call->app.mode == RX_MODE_SENDING) {
rxi_FlushWrite(call);
}
- if (call->error) {
- return 0;
- }
+ MUTEX_ENTER(&call->lock);
+ if (call->error)
+ goto error;
/* Get whatever data is currently available in the receive queue.
* If rxi_FillReadVec sends an ack packet then it is possible
call->startWait = 0;
}
call->flags &= ~RX_CALL_IOVEC_WAIT;
-#ifdef RX_ENABLE_LOCKS
- if (call->error) {
- return 0;
- }
-#endif /* RX_ENABLE_LOCKS */
+
+ if (call->error)
+ goto error;
call->iov = NULL;
*nio = call->iovNext;
- return nbytes - call->iovNBytes;
+ bytes = nbytes - call->iovNBytes;
+ MUTEX_EXIT(&call->lock);
+ return bytes;
+
+ error:
+ MUTEX_EXIT(&call->lock);
+ call->app.mode = RX_MODE_ERROR;
+ return 0;
}
int
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
- MUTEX_ENTER(&call->lock);
bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
- MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
/* rxi_WriteProc -- internal version.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
+ * LOCKS USED -- called at netpri
+ */
int
-rxi_WriteProc(register struct rx_call *call, register char *buf,
- register int nbytes)
+rxi_WriteProc(struct rx_call *call, char *buf,
+ int nbytes)
{
struct rx_connection *conn = call->conn;
- register struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
- register unsigned int t;
+ unsigned int t;
int requestCount = nbytes;
/* Free any packets from the last call to ReadvProc/WritevProc */
- if (!queue_IsEmpty(&call->iovq)) {
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
+ if (!opr_queue_IsEmpty(&call->app.iovq)) {
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
}
- if (call->mode != RX_MODE_SENDING) {
+ if (call->app.mode != RX_MODE_SENDING) {
if ((conn->type == RX_SERVER_CONNECTION)
- && (call->mode == RX_MODE_RECEIVING)) {
- call->mode = RX_MODE_SENDING;
- if (cp) {
- rxi_FreePacket(cp);
- cp = call->currentPacket = (struct rx_packet *)0;
- call->nLeft = 0;
- call->nFree = 0;
+ && (call->app.mode == RX_MODE_RECEIVING)) {
+ call->app.mode = RX_MODE_SENDING;
+ if (call->app.currentPacket) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = NULL;
+ call->app.nLeft = 0;
+ call->app.nFree = 0;
}
} else {
return 0;
* there are 0 bytes on the stream, but we must send a packet
* anyway. */
do {
- if (call->nFree == 0) {
- if (!call->error && cp) {
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* Wait until TQ_BUSY is reset before adding any
- * packets to the transmit queue
- */
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ if (call->app.nFree == 0) {
+ MUTEX_ENTER(&call->lock);
+ if (call->error)
+ call->app.mode = RX_MODE_ERROR;
+ if (!call->error && call->app.currentPacket) {
clock_NewTime(); /* Bogus: need new time package */
- /* The 0, below, specifies that it is not the last packet:
+ /* The 0, below, specifies that it is not the last packet:
* there will be others. PrepareSendPacket may
* alter the packet length by up to
* conn->securityMaxTrailerSize */
- hadd32(call->bytesSent, cp->length);
- rxi_PrepareSendPacket(call, cp, 0);
- queue_Append(&call->tq, cp);
- cp = call->currentPacket = NULL;
- if (!
- (call->
- flags & (RX_CALL_FAST_RECOVER |
- RX_CALL_FAST_RECOVER_WAIT))) {
- rxi_Start(0, call, 0);
+ call->app.bytesSent += call->app.currentPacket->length;
+ rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+ /* PrepareSendPacket drops the call lock */
+ rxi_WaitforTQBusy(call);
+#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags |= RX_PKTFLAG_TQ;
+#endif
+ opr_queue_Append(&call->tq,
+ &call->app.currentPacket->entry);
+#ifdef RXDEBUG_PACKET
+ call->tqc++;
+#endif /* RXDEBUG_PACKET */
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
+ call->app.currentPacket = NULL;
+
+ /* If the call is in recovery, let it exhaust its current
+ * retransmit queue before forcing it to send new packets
+ */
+ if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
+ rxi_Start(call, 0);
}
+ } else if (call->app.currentPacket) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = NULL;
}
/* Wait for transmit window to open up */
while (!call->error
- && call->tnext + 1 > call->tfirst + call->twind) {
+ && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
clock_NewTime();
call->startWait = clock_Sec();
call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
if (call->error) {
+ call->app.mode = RX_MODE_ERROR;
+ MUTEX_EXIT(&call->lock);
return 0;
}
#endif /* RX_ENABLE_LOCKS */
}
- if ((cp = rxi_AllocSendPacket(call, nbytes))) {
- call->currentPacket = cp;
- call->nFree = cp->length;
- call->curvec = 1; /* 0th vec is always header */
- /* begin at the beginning [ more or less ], continue
+ if ((call->app.currentPacket = rxi_AllocSendPacket(call, nbytes))) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
+ call->app.nFree = call->app.currentPacket->length;
+ call->app.curvec = 1; /* 0th vec is always header */
+ /* begin at the beginning [ more or less ], continue
* on until the end, then stop. */
- call->curpos =
- (char *)cp->wirevec[1].iov_base +
+ call->app.curpos =
+ (char *) call->app.currentPacket->wirevec[1].iov_base +
+ call->conn->securityHeaderSize;
+ call->app.curlen =
+ call->app.currentPacket->wirevec[1].iov_len -
call->conn->securityHeaderSize;
- call->curlen =
- cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
}
if (call->error) {
- if (cp) {
- rxi_FreePacket(cp);
- call->currentPacket = NULL;
+ call->app.mode = RX_MODE_ERROR;
+ if (call->app.currentPacket) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = NULL;
}
+ MUTEX_EXIT(&call->lock);
return 0;
}
+ MUTEX_EXIT(&call->lock);
}
- if (cp && (int)call->nFree < nbytes) {
+ if (call->app.currentPacket && (int)call->app.nFree < nbytes) {
/* Try to extend the current buffer */
- register int len, mud;
- len = cp->length;
+ int len, mud;
+ len = call->app.currentPacket->length;
mud = rx_MaxUserDataSize(call);
if (mud > len) {
int want;
- want = MIN(nbytes - (int)call->nFree, mud - len);
- rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
- if (cp->length > (unsigned)mud)
- cp->length = mud;
- call->nFree += (cp->length - len);
+ want = MIN(nbytes - (int)call->app.nFree, mud - len);
+ rxi_AllocDataBuf(call->app.currentPacket, want,
+ RX_PACKET_CLASS_SEND_CBUF);
+ if (call->app.currentPacket->length > (unsigned)mud)
+ call->app.currentPacket->length = mud;
+ call->app.nFree += (call->app.currentPacket->length - len);
}
}
* and return. Don't ship a buffer that's full immediately to
* the peer--we don't know if it's the last buffer yet */
- if (!cp) {
- call->nFree = 0;
+ if (!call->app.currentPacket) {
+ call->app.nFree = 0;
}
- while (nbytes && call->nFree) {
+ while (nbytes && call->app.nFree) {
- t = MIN((int)call->curlen, nbytes);
- t = MIN((int)call->nFree, t);
- memcpy(call->curpos, buf, t);
+ t = MIN((int)call->app.curlen, nbytes);
+ t = MIN((int)call->app.nFree, t);
+ memcpy(call->app.curpos, buf, t);
buf += t;
nbytes -= t;
- call->curpos += t;
- call->curlen -= t;
- call->nFree -= t;
+ call->app.curpos += t;
+ call->app.curlen -= (u_short)t;
+ call->app.nFree -= (u_short)t;
- if (!call->curlen) {
+ if (!call->app.curlen) {
/* need to get another struct iov */
- if (++call->curvec >= cp->niovecs) {
+ if (++call->app.curvec >= call->app.currentPacket->niovecs) {
/* current packet is full, extend or send it */
- call->nFree = 0;
+ call->app.nFree = 0;
} else {
- call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
- call->curlen = cp->wirevec[call->curvec].iov_len;
+ call->app.curpos =
+ call->app.currentPacket->wirevec[call->app.curvec].iov_base;
+ call->app.curlen =
+ call->app.currentPacket->wirevec[call->app.curvec].iov_len;
}
}
} /* while bytes to send and room to send them */
char *tcurpos;
SPLVAR;
- /*
- * Free any packets from the last call to ReadvProc/WritevProc.
- * We do not need the lock because the receiver threads only
- * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
- * RX_CALL_IOVEC_WAIT is always cleared before returning from
- * ReadvProc/WritevProc.
- */
- if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ /* Free any packets from the last call to ReadvProc/WritevProc */
+ if (!opr_queue_IsEmpty(&call->app.iovq)) {
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
}
/*
* Most common case: all of the data fits in the current iovec.
- * We do not need the lock because this is the only thread that
- * updates the curlen, curpos, nFree fields.
- *
* We are relying on nFree being zero unless the call is in send mode.
*/
- tcurlen = (int)call->curlen;
- tnFree = (int)call->nFree;
+ tcurlen = (int)call->app.curlen;
+ tnFree = (int)call->app.nFree;
if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
- tcurpos = call->curpos;
+ tcurpos = call->app.curpos;
+
memcpy(tcurpos, buf, nbytes);
- call->curpos = tcurpos + nbytes;
- call->curlen = tcurlen - nbytes;
- call->nFree = tnFree - nbytes;
+ call->app.curpos = tcurpos + nbytes;
+ call->app.curlen = (u_short)(tcurlen - nbytes);
+ call->app.nFree = (u_short)(tnFree - nbytes);
return nbytes;
}
NETPRI;
- AFS_RXGLOCK();
- MUTEX_ENTER(&call->lock);
bytes = rxi_WriteProc(call, buf, nbytes);
- MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
/* Optimization for marshalling 32 bit arguments */
int
-rx_WriteProc32(register struct rx_call *call, register afs_int32 * value)
+rx_WriteProc32(struct rx_call *call, afs_int32 * value)
{
int bytes;
int tcurlen;
char *tcurpos;
SPLVAR;
- /*
- * Free any packets from the last call to ReadvProc/WritevProc.
- * We do not need the lock because the receiver threads only
- * touch the iovq when the RX_CALL_IOVEC_WAIT flag is set, and the
- * RX_CALL_IOVEC_WAIT is always cleared before returning from
- * ReadvProc/WritevProc.
- */
- if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ if (!opr_queue_IsEmpty(&call->app.iovq)) {
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
}
/*
* Most common case: all of the data fits in the current iovec.
- * We do not need the lock because this is the only thread that
- * updates the curlen, curpos, nFree fields.
- *
* We are relying on nFree being zero unless the call is in send mode.
*/
- tcurlen = (int)call->curlen;
- tnFree = (int)call->nFree;
+ tcurlen = call->app.curlen;
+ tnFree = call->app.nFree;
if (!call->error && tcurlen >= sizeof(afs_int32)
&& tnFree >= sizeof(afs_int32)) {
- tcurpos = call->curpos;
- if (!((long)tcurpos & (sizeof(afs_int32) - 1))) {
+ tcurpos = call->app.curpos;
+
+ if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
*((afs_int32 *) (tcurpos)) = *value;
} else {
memcpy(tcurpos, (char *)value, sizeof(afs_int32));
}
- call->curpos = tcurpos + sizeof(afs_int32);
- call->curlen = tcurlen - sizeof(afs_int32);
- call->nFree = tnFree - sizeof(afs_int32);
+ call->app.curpos = tcurpos + sizeof(afs_int32);
+ call->app.curlen = (u_short)(tcurlen - sizeof(afs_int32));
+ call->app.nFree = (u_short)(tnFree - sizeof(afs_int32));
return sizeof(afs_int32);
}
NETPRI;
- AFS_RXGLOCK();
- MUTEX_ENTER(&call->lock);
bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
- MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
* Fill in an iovec to point to data in packet buffers. The application
* calls rxi_WritevProc when the buffers are full.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
+ * LOCKS USED -- called at netpri.
+ */
-int
+static int
rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
int nbytes)
{
struct rx_connection *conn = call->conn;
- struct rx_packet *cp = call->currentPacket;
- struct rx_packet *tp; /* temporary packet pointer */
- struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
+ struct rx_packet *cp;
int requestCount;
int nextio;
/* Temporary values, real work is done in rxi_WritevProc */
int tnFree;
- int tcurvec;
+ unsigned int tcurvec;
char *tcurpos;
int tcurlen;
nextio = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
+ if (!opr_queue_IsEmpty(&call->app.iovq)) {
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
}
- if (call->mode != RX_MODE_SENDING) {
+ if (call->app.mode != RX_MODE_SENDING) {
if ((conn->type == RX_SERVER_CONNECTION)
- && (call->mode == RX_MODE_RECEIVING)) {
- call->mode = RX_MODE_SENDING;
- if (cp) {
- rxi_FreePacket(cp);
- cp = call->currentPacket = (struct rx_packet *)0;
- call->nLeft = 0;
- call->nFree = 0;
+ && (call->app.mode == RX_MODE_RECEIVING)) {
+ call->app.mode = RX_MODE_SENDING;
+ if (call->app.currentPacket) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = NULL;
+ call->app.nLeft = 0;
+ call->app.nFree = 0;
}
} else {
return 0;
}
/* Set up the iovec to point to data in packet buffers. */
- tnFree = call->nFree;
- tcurvec = call->curvec;
- tcurpos = call->curpos;
- tcurlen = call->curlen;
+ tnFree = call->app.nFree;
+ tcurvec = call->app.curvec;
+ tcurpos = call->app.curpos;
+ tcurlen = call->app.curlen;
+ cp = call->app.currentPacket;
do {
- register unsigned int t;
+ int t;
if (tnFree == 0) {
/* current packet is full, allocate a new one */
+ MUTEX_ENTER(&call->lock);
cp = rxi_AllocSendPacket(call, nbytes);
+ MUTEX_EXIT(&call->lock);
if (cp == NULL) {
/* out of space, return what we have */
*nio = nextio;
return requestCount - nbytes;
}
- queue_Append(&call->iovq, cp);
+#ifdef RX_TRACK_PACKETS
+ cp->flags |= RX_PKTFLAG_IOVQ;
+#endif
+ opr_queue_Append(&call->app.iovq, &cp->entry);
+#ifdef RXDEBUG_PACKET
+ call->iovqc++;
+#endif /* RXDEBUG_PACKET */
tnFree = cp->length;
tcurvec = 1;
tcurpos =
if (tnFree < nbytes) {
/* try to extend the current packet */
- register int len, mud;
+ int len, mud;
len = cp->length;
mud = rx_MaxUserDataSize(call);
if (mud > len) {
if (cp->length > (unsigned)mud)
cp->length = mud;
tnFree += (cp->length - len);
- if (cp == call->currentPacket) {
- call->nFree += (cp->length - len);
+ if (cp == call->app.currentPacket) {
+ call->app.nFree += (cp->length - len);
}
}
}
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
- MUTEX_ENTER(&call->lock);
bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
- MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
*
* Send buffers allocated in rxi_WritevAlloc.
*
- * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
-
+ * LOCKS USED -- called at netpri.
+ */
int
rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
{
- struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
+#ifdef RX_TRACK_PACKETS
+ struct opr_queue *cursor;
+#endif
int nextio;
int requestCount;
- struct rx_queue tmpq;
+ struct opr_queue tmpq;
+#ifdef RXDEBUG_PACKET
+ u_short tmpqc;
+#endif
requestCount = nbytes;
nextio = 0;
- if (call->mode != RX_MODE_SENDING) {
+ MUTEX_ENTER(&call->lock);
+ if (call->error) {
+ call->app.mode = RX_MODE_ERROR;
+ } else if (call->app.mode != RX_MODE_SENDING) {
call->error = RX_PROTOCOL_ERROR;
}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* Wait until TQ_BUSY is reset before trying to move any
- * packets to the transmit queue. */
- while (!call->error && call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
if (call->error) {
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
- if (cp) {
- rxi_FreePacket(cp);
- cp = call->currentPacket = NULL;
+ call->app.mode = RX_MODE_ERROR;
+ MUTEX_EXIT(&call->lock);
+ if (call->app.currentPacket) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+ call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
+#endif
+ opr_queue_Prepend(&call->app.iovq,
+ &call->app.currentPacket->entry);
+#ifdef RXDEBUG_PACKET
+ call->iovqc++;
+#endif /* RXDEBUG_PACKET */
+ call->app.currentPacket = NULL;
}
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
return 0;
}
* the iovec. We put the loop condition at the end to ensure that
* a zero length write will push a short packet. */
nextio = 0;
- queue_Init(&tmpq);
+ opr_queue_Init(&tmpq);
+#ifdef RXDEBUG_PACKET
+ tmpqc = 0;
+#endif /* RXDEBUG_PACKET */
do {
- if (call->nFree == 0 && cp) {
+ if (call->app.nFree == 0 && call->app.currentPacket) {
clock_NewTime(); /* Bogus: need new time package */
- /* The 0, below, specifies that it is not the last packet:
+ /* The 0, below, specifies that it is not the last packet:
* there will be others. PrepareSendPacket may
* alter the packet length by up to
* conn->securityMaxTrailerSize */
- hadd32(call->bytesSent, cp->length);
- rxi_PrepareSendPacket(call, cp, 0);
- queue_Append(&tmpq, cp);
+ call->app.bytesSent += call->app.currentPacket->length;
+ rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+ /* PrepareSendPacket drops the call lock */
+ rxi_WaitforTQBusy(call);
+#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ opr_queue_Append(&tmpq, &call->app.currentPacket->entry);
+#ifdef RXDEBUG_PACKET
+ tmpqc++;
+#endif /* RXDEBUG_PACKET */
+ call->app.currentPacket = NULL;
/* The head of the iovq is now the current packet */
if (nbytes) {
- if (queue_IsEmpty(&call->iovq)) {
+ if (opr_queue_IsEmpty(&call->app.iovq)) {
+ MUTEX_EXIT(&call->lock);
call->error = RX_PROTOCOL_ERROR;
- cp = call->currentPacket = NULL;
- for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
+#ifdef RXDEBUG_PACKET
+ tmpqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &tmpq);
return 0;
}
- cp = queue_First(&call->iovq, rx_packet);
- queue_Remove(cp);
- call->currentPacket = cp;
- call->nFree = cp->length;
- call->curvec = 1;
- call->curpos =
- (char *)cp->wirevec[1].iov_base +
+ call->app.currentPacket
+ = opr_queue_First(&call->app.iovq, struct rx_packet,
+ entry);
+ opr_queue_Remove(&call->app.currentPacket->entry);
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_IOVQ;
+ call->app.currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
+#ifdef RXDEBUG_PACKET
+ call->iovqc--;
+#endif /* RXDEBUG_PACKET */
+ call->app.nFree = call->app.currentPacket->length;
+ call->app.curvec = 1;
+ call->app.curpos =
+ (char *) call->app.currentPacket->wirevec[1].iov_base +
+ call->conn->securityHeaderSize;
+ call->app.curlen =
+ call->app.currentPacket->wirevec[1].iov_len -
call->conn->securityHeaderSize;
- call->curlen =
- cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
}
}
if (nbytes) {
/* The next iovec should point to the current position */
- if (iov[nextio].iov_base != call->curpos
- || iov[nextio].iov_len > (int)call->curlen) {
+ if (iov[nextio].iov_base != call->app.curpos
+ || iov[nextio].iov_len > (int)call->app.curlen) {
call->error = RX_PROTOCOL_ERROR;
- for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
- if (cp) {
- rxi_FreePacket(cp);
- call->currentPacket = NULL;
+ MUTEX_EXIT(&call->lock);
+ if (call->app.currentPacket) {
+#ifdef RX_TRACK_PACKETS
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
+ opr_queue_Prepend(&tmpq,
+ &call->app.currentPacket->entry);
+#ifdef RXDEBUG_PACKET
+ tmpqc++;
+#endif /* RXDEBUG_PACKET */
+ call->app.currentPacket = NULL;
}
+#ifdef RXDEBUG_PACKET
+ tmpqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &tmpq);
return 0;
}
nbytes -= iov[nextio].iov_len;
- call->curpos += iov[nextio].iov_len;
- call->curlen -= iov[nextio].iov_len;
- call->nFree -= iov[nextio].iov_len;
+ call->app.curpos += iov[nextio].iov_len;
+ call->app.curlen -= iov[nextio].iov_len;
+ call->app.nFree -= iov[nextio].iov_len;
nextio++;
- if (call->curlen == 0) {
- if (++call->curvec > cp->niovecs) {
- call->nFree = 0;
+ if (call->app.curlen == 0) {
+ if (++call->app.curvec > call->app.currentPacket->niovecs) {
+ call->app.nFree = 0;
} else {
- call->curpos = (char *)cp->wirevec[call->curvec].iov_base;
- call->curlen = cp->wirevec[call->curvec].iov_len;
+ call->app.curpos =
+ call->app.currentPacket->wirevec[call->app.curvec].iov_base;
+ call->app.curlen =
+ call->app.currentPacket->wirevec[call->app.curvec].iov_len;
}
}
}
/* Move the packets from the temporary queue onto the transmit queue.
* We may end up with more than call->twind packets on the queue. */
- for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- queue_Append(&call->tq, tp);
+
+#ifdef RX_TRACK_PACKETS
+ for (opr_queue_Scan(&tmpq, cursor))
+ {
+ struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
+ p->flags |= RX_PKTFLAG_TQ;
}
+#endif
+ if (call->error)
+ call->app.mode = RX_MODE_ERROR;
+
+ opr_queue_SpliceAppend(&call->tq, &tmpq);
- if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
- rxi_Start(0, call, 0);
+ /* If the call is in recovery, let it exhaust its current retransmit
+ * queue before forcing it to send new packets
+ */
+ if (!(call->flags & RX_CALL_FAST_RECOVER)) {
+ rxi_Start(call, 0);
}
/* Wait for the length of the transmit queue to fall below call->twind */
- while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
+ while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
clock_NewTime();
call->startWait = clock_Sec();
#ifdef RX_ENABLE_LOCKS
}
if (call->error) {
- if (cp) {
- rxi_FreePacket(cp);
- cp = call->currentPacket = NULL;
+	call->app.mode = RX_MODE_ERROR;
+	MUTEX_EXIT(&call->lock);
+	if (call->app.currentPacket) {
+#ifdef RX_TRACK_PACKETS
+	    call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
+#endif
+	    rxi_FreePacket(call->app.currentPacket);
+	    call->app.currentPacket = NULL;
}
return 0;
}
+ MUTEX_EXIT(&call->lock);
return requestCount - nbytes;
}
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
- MUTEX_ENTER(&call->lock);
bytes = rxi_WritevProc(call, iov, nio, nbytes);
- MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
/* Flush any buffered data to the stream, switch to read mode
- * (clients) or to EOF mode (servers) */
+ * (clients) or to EOF mode (servers)
+ *
+ * LOCKS HELD: called at netpri.
+ */
void
-rxi_FlushWrite(register struct rx_call *call)
+rxi_FlushWrite(struct rx_call *call)
{
- register struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
+ struct rx_packet *cp = NULL;
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
+ if (!opr_queue_IsEmpty(&call->app.iovq)) {
+#ifdef RXDEBUG_PACKET
+ call->iovqc -=
+#endif /* RXDEBUG_PACKET */
+ rxi_FreePackets(0, &call->app.iovq);
}
- if (call->mode == RX_MODE_SENDING) {
+ if (call->app.mode == RX_MODE_SENDING) {
- call->mode =
+ call->app.mode =
(call->conn->type ==
RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);
}
#endif
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* Wait until TQ_BUSY is reset before adding any
- * packets to the transmit queue
- */
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ MUTEX_ENTER(&call->lock);
+ if (call->error)
+ call->app.mode = RX_MODE_ERROR;
+
+ call->flags |= RX_CALL_FLUSH;
+
+ cp = call->app.currentPacket;
if (cp) {
/* cp->length is only supposed to be the user's data */
- /* cp->length was already set to (then-current)
+ /* cp->length was already set to (then-current)
* MaxUserDataSize or less. */
- cp->length -= call->nFree;
- call->currentPacket = (struct rx_packet *)0;
- call->nFree = 0;
+#ifdef RX_TRACK_PACKETS
+ cp->flags &= ~RX_PKTFLAG_CP;
+#endif
+ cp->length -= call->app.nFree;
+ call->app.currentPacket = NULL;
+ call->app.nFree = 0;
} else {
cp = rxi_AllocSendPacket(call, 0);
if (!cp) {
}
cp->length = 0;
cp->niovecs = 2; /* header + space for rxkad stuff */
- call->nFree = 0;
+ call->app.nFree = 0;
}
/* The 1 specifies that this is the last packet */
- hadd32(call->bytesSent, cp->length);
+ call->app.bytesSent += cp->length;
rxi_PrepareSendPacket(call, cp, 1);
- queue_Append(&call->tq, cp);
- if (!
- (call->
- flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
- rxi_Start(0, call, 0);
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+ /* PrepareSendPacket drops the call lock */
+ rxi_WaitforTQBusy(call);
+#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#ifdef RX_TRACK_PACKETS
+ cp->flags |= RX_PKTFLAG_TQ;
+#endif
+ opr_queue_Append(&call->tq, &cp->entry);
+#ifdef RXDEBUG_PACKET
+ call->tqc++;
+#endif /* RXDEBUG_PACKET */
+
+ /* If the call is in recovery, let it exhaust its current retransmit
+ * queue before forcing it to send new packets
+ */
+ if (!(call->flags & RX_CALL_FAST_RECOVER)) {
+ rxi_Start(call, 0);
}
+ MUTEX_EXIT(&call->lock);
}
}
{
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
- MUTEX_ENTER(&call->lock);
rxi_FlushWrite(call);
- MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
}