#include "h/types.h"
#include "h/time.h"
#include "h/stat.h"
+#if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
+#include "h/systm.h"
+#endif
#ifdef AFS_OSF_ENV
#include <net/net_globals.h>
#endif /* AFS_OSF_ENV */
#include "rx/rx_globals.h"
#include "afs/lock.h"
#include "afsint.h"
-#ifdef AFS_ALPHA_ENV
+#ifdef AFS_OSF_ENV
#undef kmem_alloc
#undef kmem_free
#undef mem_alloc
#undef mem_free
#undef register
-#endif /* AFS_ALPHA_ENV */
+#endif /* AFS_OSF_ENV */
#else /* KERNEL */
# include <sys/types.h>
-#ifndef AFS_NT40_ENV
+#ifdef AFS_NT40_ENV
+# include <winsock2.h>
+#else /* !AFS_NT40_ENV */
# include <sys/socket.h>
# include <sys/file.h>
# include <netdb.h>
# include <netinet/in.h>
# include <sys/stat.h>
# include <sys/time.h>
-#endif
-#ifdef HAVE_STRING_H
+#endif /* !AFS_NT40_ENV */
#include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
{
register struct rx_packet *cp = call->currentPacket;
register struct rx_packet *rp;
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
register int requestCount;
register unsigned int t;
+
/* XXXX took out clock_NewTime from here. Was it needed? */
requestCount = nbytes;
/* Free any packets from the last call to ReadvProc/WritevProc */
- if (!queue_IsEmpty(&call->iovq)) {
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
do {
RX_CALL_REFCOUNT_DELAY);
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
} else {
- struct clock when;
- clock_GetTime(&when);
+ struct clock when, now;
+ clock_GetTime(&now);
+ when = now;
/* Delay to consolidate ack packets */
clock_Add(&when, &rx_hardAckDelay);
if (!call->delayedAckEvent
RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent =
- rxevent_Post(&when,
+ rxevent_PostNow(&when, &now,
rxi_SendDelayedAck, call,
0);
}
* ReadvProc/WritevProc.
*/
if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ rxi_FreePackets(0, &call->iovq);
}
/*
call->curpos = tcurpos + nbytes;
call->curlen = tcurlen - nbytes;
call->nLeft = tnLeft - nbytes;
+
+ if (!call->nLeft) {
+ /* out of packet. Get another one. */
+ NETPRI;
+ MUTEX_ENTER(&call->lock);
+ rxi_FreePacket(call->currentPacket);
+ call->currentPacket = (struct rx_packet *)0;
+ MUTEX_EXIT(&call->lock);
+ USERPRI;
+ }
return nbytes;
}
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
bytes = rxi_ReadProc(call, buf, nbytes);
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
* ReadvProc/WritevProc.
*/
if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ rxi_FreePackets(0, &call->iovq);
}
/*
*/
tcurlen = call->curlen;
tnLeft = call->nLeft;
- if (!call->error && tcurlen > sizeof(afs_int32)
- && tnLeft > sizeof(afs_int32)) {
+ if (!call->error && tcurlen >= sizeof(afs_int32)
+ && tnLeft >= sizeof(afs_int32)) {
tcurpos = call->curpos;
- if (!((long)tcurpos & (sizeof(afs_int32) - 1))) {
- *value = *((afs_int32 *) (tcurpos));
- } else {
- memcpy((char *)value, tcurpos, sizeof(afs_int32));
- }
+ memcpy((char *)value, tcurpos, sizeof(afs_int32));
call->curpos = tcurpos + sizeof(afs_int32);
- call->curlen = tcurlen - sizeof(afs_int32);
- call->nLeft = tnLeft - sizeof(afs_int32);
+ call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
+ call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
+ if (!call->nLeft) {
+ /* out of packet. Get another one. */
+ NETPRI;
+ MUTEX_ENTER(&call->lock);
+ rxi_FreePacket(call->currentPacket);
+ call->currentPacket = (struct rx_packet *)0;
+ MUTEX_EXIT(&call->lock);
+ USERPRI;
+ }
return sizeof(afs_int32);
}
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
didHardAck = 1;
} else {
- struct clock when;
- clock_GetTime(&when);
+ struct clock when, now;
+ clock_GetTime(&now);
+ when = now;
/* Delay to consolidate ack packets */
clock_Add(&when, &rx_hardAckDelay);
if (!call->delayedAckEvent
RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent =
- rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
}
}
}
rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
int nbytes)
{
- struct rx_packet *rp;
- struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
int requestCount;
int nextio;
nextio = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
if (call->mode == RX_MODE_SENDING) {
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
{
struct rx_connection *conn = call->conn;
register struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
register unsigned int t;
int requestCount = nbytes;
/* Free any packets from the last call to ReadvProc/WritevProc */
- if (!queue_IsEmpty(&call->iovq)) {
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
if (call->mode != RX_MODE_SENDING) {
}
/* Wait for transmit window to open up */
while (!call->error
- && call->tnext + 1 > call->tfirst + call->twind) {
+ && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
clock_NewTime();
call->startWait = clock_Sec();
buf += t;
nbytes -= t;
call->curpos += t;
- call->curlen -= t;
- call->nFree -= t;
+ call->curlen -= (u_short)t;
+ call->nFree -= (u_short)t;
if (!call->curlen) {
/* need to get another struct iov */
* RX_CALL_IOVEC_WAIT is always cleared before returning from
* ReadvProc/WritevProc.
*/
- if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
/*
tcurpos = call->curpos;
memcpy(tcurpos, buf, nbytes);
call->curpos = tcurpos + nbytes;
- call->curlen = tcurlen - nbytes;
- call->nFree = tnFree - nbytes;
+ call->curlen = (u_short)(tcurlen - nbytes);
+ call->nFree = (u_short)(tnFree - nbytes);
return nbytes;
}
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
bytes = rxi_WriteProc(call, buf, nbytes);
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
* RX_CALL_IOVEC_WAIT is always cleared before returning from
* ReadvProc/WritevProc.
*/
- if (!queue_IsEmpty(&call->iovq)) {
- register struct rx_packet *rp;
- register struct rx_packet *nxp;
- for (queue_Scan(&call->iovq, rp, nxp, rx_packet)) {
- queue_Remove(rp);
- rxi_FreePacket(rp);
- }
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
/*
*
* We are relying on nFree being zero unless the call is in send mode.
*/
- tcurlen = (int)call->curlen;
- tnFree = (int)call->nFree;
+ tcurlen = call->curlen;
+ tnFree = call->nFree;
if (!call->error && tcurlen >= sizeof(afs_int32)
&& tnFree >= sizeof(afs_int32)) {
tcurpos = call->curpos;
- if (!((long)tcurpos & (sizeof(afs_int32) - 1))) {
+ if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
*((afs_int32 *) (tcurpos)) = *value;
} else {
memcpy(tcurpos, (char *)value, sizeof(afs_int32));
}
call->curpos = tcurpos + sizeof(afs_int32);
- call->curlen = tcurlen - sizeof(afs_int32);
- call->nFree = tnFree - sizeof(afs_int32);
+ call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
+ call->nFree = (u_short)(tnFree - sizeof(afs_int32));
return sizeof(afs_int32);
}
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
{
struct rx_connection *conn = call->conn;
struct rx_packet *cp = call->currentPacket;
- struct rx_packet *tp; /* temporary packet pointer */
- struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
int requestCount;
int nextio;
/* Temporary values, real work is done in rxi_WritevProc */
nextio = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
if (call->mode != RX_MODE_SENDING) {
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
{
struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
int nextio;
int requestCount;
struct rx_queue tmpq;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
if (call->error) {
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
if (cp) {
- rxi_FreePacket(cp);
+ queue_Prepend(&call->iovq, cp);
cp = call->currentPacket = NULL;
}
+ rxi_FreePackets(0, &call->iovq);
return 0;
}
if (queue_IsEmpty(&call->iovq)) {
call->error = RX_PROTOCOL_ERROR;
cp = call->currentPacket = NULL;
- for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
+ rxi_FreePackets(0, &tmpq);
return 0;
}
cp = queue_First(&call->iovq, rx_packet);
if (iov[nextio].iov_base != call->curpos
|| iov[nextio].iov_len > (int)call->curlen) {
call->error = RX_PROTOCOL_ERROR;
- for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
if (cp) {
- rxi_FreePacket(cp);
+ queue_Prepend(&tmpq, cp);
call->currentPacket = NULL;
}
+ rxi_FreePackets(0, &tmpq);
return 0;
}
nbytes -= iov[nextio].iov_len;
/* Move the packets from the temporary queue onto the transmit queue.
* We may end up with more than call->twind packets on the queue. */
- for (queue_Scan(&tmpq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- queue_Append(&call->tq, tp);
- }
+ queue_SpliceAppend(&call->tq, &tmpq);
if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
rxi_Start(0, call, 0, 0);
}
/* Wait for the length of the transmit queue to fall below call->twind */
- while (!call->error && call->tnext + 1 > call->tfirst + call->twind) {
+ while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
clock_NewTime();
call->startWait = clock_Sec();
#ifdef RX_ENABLE_LOCKS
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
bytes = rxi_WritevProc(call, iov, nio, nbytes);
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
return bytes;
}
rxi_FlushWrite(register struct rx_call *call)
{
register struct rx_packet *cp = call->currentPacket;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
+ if (queue_IsNotEmpty(&call->iovq)) {
+ rxi_FreePackets(0, &call->iovq);
}
if (call->mode == RX_MODE_SENDING) {
{
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
rxi_FlushWrite(call);
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
}