/* RX: Extended Remote Procedure Call */
+#include <afsconfig.h>
#ifdef KERNEL
#include "../afs/param.h"
#else
#include <afs/param.h>
#endif
-#include <afsconfig.h>
RCSID("$Header$");
rxi_nCalls = 0;
rx_connDeadTime = 12;
rx_tranquil = 0; /* reset flag */
- bzero((char *)&rx_stats, sizeof(struct rx_stats));
+ memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
htable = (char *)
osi_Alloc(rx_hashTableSize*sizeof(struct rx_connection *));
PIN(htable, rx_hashTableSize*sizeof(struct rx_connection *)); /* XXXXX */
- bzero(htable, rx_hashTableSize*sizeof(struct rx_connection *));
+ memset(htable, 0, rx_hashTableSize*sizeof(struct rx_connection *));
ptable = (char *) osi_Alloc(rx_hashTableSize*sizeof(struct rx_peer *));
PIN(ptable, rx_hashTableSize*sizeof(struct rx_peer *)); /* XXXXX */
- bzero(ptable, rx_hashTableSize*sizeof(struct rx_peer *));
+ memset(ptable, 0, rx_hashTableSize*sizeof(struct rx_peer *));
/* Malloc up a bunch of packets & buffers */
rx_nFreePackets = 0;
* last reply packets */
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
- rxi_AckAll((struct rxevent *)0, call, 0);
+ if (call->state == RX_STATE_PRECALL ||
+ call->state == RX_STATE_ACTIVE) {
+ rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY, 0);
+ } else {
+ rxi_AckAll((struct rxevent *)0, call, 0);
+ }
}
MUTEX_EXIT(&call->lock);
}
/* Make sure the connection is completely reset before deleting it. */
/* get rid of pending events that could zap us later */
- if (conn->challengeEvent) {
+ if (conn->challengeEvent)
rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
- }
+ if (conn->checkReachEvent)
+ rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
/* Add the connection to the list of destroyed connections that
* need to be cleaned up. This is necessary to avoid deadlocks
clock_GetTime(&queueTime);
AFS_RXGLOCK();
MUTEX_ENTER(&conn->conn_call_lock);
+
+ /*
+ * Check if there are others waiting for a new call.
+ * If so, let them go first to avoid starving them.
+ * This is a fairly simple scheme, and might not be
+ * a complete solution for large numbers of waiters.
+ */
+ if (conn->makeCallWaiters) {
+#ifdef RX_ENABLE_LOCKS
+ CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
+#else
+ osi_rxSleep(conn);
+#endif
+ }
+
for (;;) {
for (i=0; i<RX_MAXCALLS; i++) {
call = conn->call[i];
}
else {
call = rxi_NewCall(conn, i);
- MUTEX_ENTER(&call->lock);
break;
}
}
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags |= RX_CONN_MAKECALL_WAITING;
MUTEX_EXIT(&conn->conn_data_lock);
+
+ conn->makeCallWaiters++;
#ifdef RX_ENABLE_LOCKS
CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
osi_rxSleep(conn);
#endif
+ conn->makeCallWaiters--;
}
+ /*
+ * Wake up anyone else who might be giving us a chance to
+ * run (see code above that avoids resource starvation).
+ */
+#ifdef RX_ENABLE_LOCKS
+ CV_BROADCAST(&conn->conn_call_cv);
+#else
+ osi_rxWakeup(conn);
+#endif
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
service->idleDeadTime = 60;
service->connDeadTime = rx_connDeadTime;
service->executeRequestProc = serviceProc;
+ service->checkReach = 0;
rx_services[i] = service; /* not visible until now */
AFS_RXGUNLOCK();
USERPRI;
/* Allocate a call structure, for the indicated channel of the
* supplied connection. The mode and state of the call must be set by
- * the caller. */
+ * the caller. Returns the call with mutex locked. */
struct rx_call *rxi_NewCall(conn, channel)
register struct rx_connection *conn;
register int channel;
the call number is valid from the last time this channel was used */
if (*call->callNumber == 0) *call->callNumber = 1;
- MUTEX_EXIT(&call->lock);
return call;
}
p = (char *) osi_Alloc(size);
#endif
if (!p) osi_Panic("rxi_Alloc error");
- bzero(p, size);
+ memset(p, 0, size);
return p;
}
addr.sin_family = AF_INET;
addr.sin_port = port;
addr.sin_addr.s_addr = host;
-#if defined(AFS_OSF_ENV) && defined(_KERNEL)
+#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
addr.sin_len = sizeof(addr);
#endif /* AFS_OSF_ENV */
drop = (*rx_justReceived) (np, &addr);
MUTEX_ENTER(&conn->conn_data_lock);
if (conn->maxSerial < np->header.serial)
- conn->maxSerial = np->header.serial;
+ conn->maxSerial = np->header.serial;
MUTEX_EXIT(&conn->conn_data_lock);
/* If the connection is in an error state, send an abort packet and ignore
return np;
}
if (!call) {
+ MUTEX_ENTER(&conn->conn_call_lock);
call = rxi_NewCall(conn, channel);
- MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
*call->callNumber = np->header.callNumber;
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
}
#endif /* KERNEL */
+/*
+ * Probe the peer of a server connection for reachability.
+ *
+ * Clears the pending checkReachEvent pointer and, while the connection
+ * is still flagged RX_CONN_ATTACHWAIT, sends an RX_ACK_PING on a call
+ * in RX_STATE_PRECALL and re-posts itself to fire again after
+ * RX_CHECKREACH_TIMEOUT seconds.
+ *
+ * event - non-NULL when invoked from the event queue; in that case the
+ *         connection reference taken when the event was posted is
+ *         dropped here.  NULL when called directly (see
+ *         rxi_CheckConnReach).
+ * conn  - the server connection being probed.
+ * acall - a call whose lock the caller already holds, or NULL; used to
+ *         avoid re-acquiring call->lock below.
+ */
+static void rxi_CheckReachEvent(event, conn, acall)
+    struct rxevent *event;
+    struct rx_connection *conn;
+    struct rx_call *acall;
+{
+    struct rx_call *call = acall;
+    struct clock when;
+    int i, waiting;
+
+    MUTEX_ENTER(&conn->conn_call_lock);
+    MUTEX_ENTER(&conn->conn_data_lock);
+    conn->checkReachEvent = (struct rxevent *) 0;
+    waiting = conn->flags & RX_CONN_ATTACHWAIT;
+    /* drop the connection reference taken when the event was posted */
+    if (event) conn->refCount--;
+    MUTEX_EXIT(&conn->conn_data_lock);
+
+    if (waiting) {
+        /* No specific call supplied: ping on behalf of any call still
+         * waiting to be attached (RX_STATE_PRECALL). */
+        if (!call)
+            for (i=0; i<RX_MAXCALLS; i++) {
+                struct rx_call *tc = conn->call[i];
+                if (tc && tc->state == RX_STATE_PRECALL) {
+                    call = tc;
+                    break;
+                }
+            }
+
+        if (call) {
+            /* Only take call->lock if our caller does not already hold
+             * it (call != acall). */
+            if (call != acall) MUTEX_ENTER(&call->lock);
+            rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
+            if (call != acall) MUTEX_EXIT(&call->lock);
+
+            /* Take a reference for the event posted below; it is
+             * released when the event fires or is cancelled. */
+            MUTEX_ENTER(&conn->conn_data_lock);
+            conn->refCount++;
+            MUTEX_EXIT(&conn->conn_data_lock);
+            clock_GetTime(&when);
+            when.sec += RX_CHECKREACH_TIMEOUT;
+            conn->checkReachEvent =
+                rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
+        }
+    }
+    MUTEX_EXIT(&conn->conn_call_lock);
+}
+
+/*
+ * Decide whether a new server call must wait for a reachability check
+ * before a server thread is attached.
+ *
+ * Returns 0 if the call may be attached immediately: either the
+ * service does not request reach checking (service->checkReach == 0)
+ * or the peer responded within the last RX_CHECKREACH_TTL seconds.
+ *
+ * Returns 1 if the call must wait: the connection is placed in (or is
+ * already in) RX_CONN_ATTACHWAIT, and a ping is kicked off via
+ * rxi_CheckReachEvent unless a check event is already pending.
+ */
+static int rxi_CheckConnReach(conn, call)
+    struct rx_connection *conn;
+    struct rx_call *call;
+{
+    struct rx_service *service = conn->service;
+    struct rx_peer *peer = conn->peer;
+    afs_uint32 now, lastReach;
+
+    if (service->checkReach == 0)
+        return 0;
+
+    /* Recently-proven reachability is cached per peer; see
+     * rxi_UpdatePeerReach for where lastReachTime is set. */
+    now = clock_Sec();
+    MUTEX_ENTER(&peer->peer_lock);
+    lastReach = peer->lastReachTime;
+    MUTEX_EXIT(&peer->peer_lock);
+    if (now - lastReach < RX_CHECKREACH_TTL)
+        return 0;
+
+    MUTEX_ENTER(&conn->conn_data_lock);
+    if (conn->flags & RX_CONN_ATTACHWAIT) {
+        /* A check is already in progress; just keep waiting. */
+        MUTEX_EXIT(&conn->conn_data_lock);
+        return 1;
+    }
+    conn->flags |= RX_CONN_ATTACHWAIT;
+    MUTEX_EXIT(&conn->conn_data_lock);
+    /* Start the first probe synchronously (event == NULL, so no
+     * connection reference is consumed). */
+    if (!conn->checkReachEvent)
+        rxi_CheckReachEvent((struct rxevent *)0, conn, call);
+
+    return 1;
+}
+
/* try to attach call, if authentication is complete */
-static void TryAttach(acall, socket, tnop, newcallp)
-register struct rx_call *acall;
-register osi_socket socket;
-register int *tnop;
-register struct rx_call **newcallp; {
- register struct rx_connection *conn;
- conn = acall->conn;
- if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
+static void TryAttach(acall, socket, tnop, newcallp, reachOverride)
+ register struct rx_call *acall;
+ register osi_socket socket;
+ register int *tnop;
+ register struct rx_call **newcallp;
+ int reachOverride;
+{
+ struct rx_connection *conn = acall->conn;
+
+ if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
/* Don't attach until we have any req'd. authentication. */
if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
- rxi_AttachServerProc(acall, socket, tnop, newcallp);
- /* Note: this does not necessarily succeed; there
- may not any proc available */
+ if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
+ rxi_AttachServerProc(acall, socket, tnop, newcallp);
+ /* Note: this does not necessarily succeed; there
+             * may not be any proc available
+ */
}
else {
rxi_ChallengeOn(acall->conn);
* server thread is available, this thread becomes a server
* thread and the server thread becomes a listener thread. */
if (isFirst) {
- TryAttach(call, socket, tnop, newcallp);
+ TryAttach(call, socket, tnop, newcallp, 0);
}
}
/* This is not the expected next packet. */
static void rxi_ComputeRate();
#endif
+/*
+ * Record that the peer on this connection has proved reachable (caller
+ * saw evidence such as an RX_ACK_PING_RESPONSE), then retry attaching
+ * any calls that were held back in RX_CONN_ATTACHWAIT.
+ *
+ * conn  - the connection whose peer was heard from.
+ * acall - a call whose lock the caller already holds, or NULL; used to
+ *         avoid re-acquiring call->lock in the loop below.
+ */
+static void rxi_UpdatePeerReach(conn, acall)
+    struct rx_connection *conn;
+    struct rx_call *acall;
+{
+    struct rx_peer *peer = conn->peer;
+
+    /* Cache the time of proof; rxi_CheckConnReach consults this to
+     * skip redundant probes for RX_CHECKREACH_TTL seconds. */
+    MUTEX_ENTER(&peer->peer_lock);
+    peer->lastReachTime = clock_Sec();
+    MUTEX_EXIT(&peer->peer_lock);
+
+    MUTEX_ENTER(&conn->conn_call_lock);
+    MUTEX_ENTER(&conn->conn_data_lock);
+    if (conn->flags & RX_CONN_ATTACHWAIT) {
+        int i;
+
+        conn->flags &= ~RX_CONN_ATTACHWAIT;
+        MUTEX_EXIT(&conn->conn_data_lock);
+
+        /* Retry every call on the connection; reachOverride == 1 tells
+         * TryAttach to skip the reach check we just satisfied. */
+        for (i=0; i<RX_MAXCALLS; i++) {
+            struct rx_call *call = conn->call[i];
+            if (call) {
+                if (call != acall) MUTEX_ENTER(&call->lock);
+                TryAttach(call, -1, NULL, NULL, 1);
+                if (call != acall) MUTEX_EXIT(&call->lock);
+            }
+        }
+    } else
+        MUTEX_EXIT(&conn->conn_data_lock);
+    MUTEX_EXIT(&conn->conn_call_lock);
+}
+
/* The real smarts of the whole thing. */
struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
register struct rx_call *call;
if (np->header.flags & RX_SLOW_START_OK) {
call->flags |= RX_CALL_SLOW_START_OK;
}
+
+ if (ap->reason == RX_ACK_PING_RESPONSE)
+ rxi_UpdatePeerReach(conn, call);
#ifdef RXDEBUG
if (rx_Log) {
* set the ack bits in the packets and have rxi_Start remove the packets
* when it's done transmitting.
*/
- if (!tp->acked) {
+ if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
}
if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
- tp->acked = 1;
+ tp->flags |= RX_PKTFLAG_ACKED;
call->flags |= RX_CALL_TQ_SOME_ACKED;
#else /* RX_ENABLE_LOCKS */
break;
* out of sequence. */
if (tp->header.seq < first) {
/* Implicit ack information */
- if (!tp->acked) {
+ if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
}
- tp->acked = 1;
+ tp->flags |= RX_PKTFLAG_ACKED;
}
else if (tp->header.seq < first + nAcks) {
/* Explicit ack information: set it in the packet appropriately */
if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
- if (!tp->acked) {
+ if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
- tp->acked = 1;
+ tp->flags |= RX_PKTFLAG_ACKED;
}
if (missing) {
nNacked++;
call->nSoftAcked++;
}
} else {
- tp->acked = 0;
+ tp->flags &= ~RX_PKTFLAG_ACKED;
missing = 1;
}
}
else {
- tp->acked = 0;
+ tp->flags &= ~RX_PKTFLAG_ACKED;
missing = 1;
}
* ie, this should readjust the retransmit timer for all outstanding
* packets... So we don't just retransmit when we should know better*/
- if (!tp->acked && !clock_IsZero(&tp->retryTime)) {
+ if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
tp->retryTime = tp->timeSent;
clock_Add(&tp->retryTime, &peer->timeout);
/* shift by eight because one quarter-sec ~ 256 milliseconds */
/* if the ack packet has a receivelen field hanging off it,
* update our state */
- if ( np->length >= rx_AckDataSize(ap->nAcks) +sizeof(afs_int32)) {
+ if ( np->length >= rx_AckDataSize(ap->nAcks) + 2*sizeof(afs_int32)) {
afs_uint32 tSize;
/* If the ack packet has a "recommended" size that is less than
* so we will retransmit as soon as the window permits*/
for(acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
if (acked) {
- if (!tp->acked) {
+ if (!(tp->flags & RX_PKTFLAG_ACKED)) {
clock_Zero(&tp->retryTime);
}
- } else if (tp->acked) {
+ } else if (tp->flags & RX_PKTFLAG_ACKED) {
acked = 1;
}
}
}
else {
/* If the response is valid, any calls waiting to attach
- * servers can now do so */
+ * servers can now do so */
int i;
+
for (i=0; i<RX_MAXCALLS; i++) {
struct rx_call *call = conn->call[i];
if (call) {
MUTEX_EXIT(&call->lock);
}
}
+
+ /* Update the peer reachability information, just in case
+ * some calls went into attach-wait while we were waiting
+     * for authentication.
+ */
+ rxi_UpdatePeerReach(conn, NULL);
}
return np;
}
* call so it eventually gets one */
void
rxi_AttachServerProc(call, socket, tnop, newcallp)
-register struct rx_call *call;
-register osi_socket socket;
-register int *tnop;
-register struct rx_call **newcallp;
+ register struct rx_call *call;
+ register osi_socket socket;
+ register int *tnop;
+ register struct rx_call **newcallp;
{
register struct rx_serverQueueEntry *sq;
register struct rx_service *service = call->conn->service;
for (queue_Scan(&call->tq, p, tp, rx_packet)) {
if (!p)
break;
- p->acked = 1;
+ p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
if (someAcked) {
for (queue_Scan(&call->tq, p, tp, rx_packet)) {
if (!p)
break;
- p->acked = 1;
+ p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
if (someAcked) {
{
if (error) {
register int i;
+ MUTEX_ENTER(&conn->conn_data_lock);
if (conn->challengeEvent)
rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
+ if (conn->checkReachEvent) {
+ rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
+ conn->checkReachEvent = 0;
+ conn->refCount--;
+ }
+ MUTEX_EXIT(&conn->conn_data_lock);
for (i=0; i<RX_MAXCALLS; i++) {
struct rx_call *call = conn->call[i];
if (call) {
/* Does the current packet force us to flush the current list? */
if (cnt > 0
&& (list[i]->header.serial
- || list[i]->acked
+ || (list[i]->flags & RX_PKTFLAG_ACKED)
|| list[i]->length > RX_JUMBOBUFFERSIZE)) {
if (lastCnt > 0) {
rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime, resending);
}
/* Add the current packet to the list if it hasn't been acked.
* Otherwise adjust the list pointer to skip the current packet. */
- if (!list[i]->acked) {
+ if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
cnt++;
/* Do we need to flush the list? */
if (cnt >= (int)peer->maxDgramPackets
* an acked packet. Since we always send retransmissions
* in a separate packet, we only need to check the first
* packet in the list */
- if (cnt > 0 && !listP[0]->acked) {
+ if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
morePackets = 1;
}
if (lastCnt > 0) {
* than recovery rates.
*/
for(queue_Scan(&call->tq, p, nxp, rx_packet)) {
- if (!p->acked) {
+ if (!(p->flags & RX_PKTFLAG_ACKED)) {
clock_Zero(&p->retryTime);
}
}
/* Only send one packet during fast recovery */
break;
}
- if ((p->header.flags == RX_FREE_PACKET) ||
+ if ((p->flags & RX_PKTFLAG_FREE) ||
(!queue_IsEnd(&call->tq, nxp)
- && (nxp->header.flags == RX_FREE_PACKET)) ||
+ && (nxp->flags & RX_PKTFLAG_FREE)) ||
(p == (struct rx_packet *)&rx_freePacketQueue) ||
(nxp == (struct rx_packet *)&rx_freePacketQueue)) {
osi_Panic("rxi_Start: xmit queue clobbered");
}
- if (p->acked) {
+ if (p->flags & RX_PKTFLAG_ACKED) {
MUTEX_ENTER(&rx_stats_mutex);
rx_stats.ignoreAckedPacket++;
MUTEX_EXIT(&rx_stats_mutex);
* the transmit queue.
*/
for (missing = 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
- if (p->header.seq < call->tfirst && p->acked) {
+ if (p->header.seq < call->tfirst && (p->flags & RX_PKTFLAG_ACKED)) {
queue_Remove(p);
rxi_FreePacket(p);
}
break;
}
- if (!p->acked && !clock_IsZero(&p->retryTime)) {
+ if (!(p->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&p->retryTime)) {
haveEvent = 1;
retryTime = p->retryTime;
break;
* seconds) to ask the client to authenticate itself. The routine
* issues a challenge to the client, which is obtained from the
* security object associated with the connection */
-void rxi_ChallengeEvent(event, conn, dummy)
+void rxi_ChallengeEvent(event, conn, atries)
struct rxevent *event;
register struct rx_connection *conn;
- char *dummy;
+ void *atries;
{
+ int tries = (int) atries;
conn->challengeEvent = (struct rxevent *) 0;
if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
register struct rx_packet *packet;
struct clock when;
+
+ if (tries <= 0) {
+ /* We've failed to authenticate for too long.
+ * Reset any calls waiting for authentication;
+ * they are all in RX_STATE_PRECALL.
+ */
+ int i;
+
+ MUTEX_ENTER(&conn->conn_call_lock);
+ for (i=0; i<RX_MAXCALLS; i++) {
+ struct rx_call *call = conn->call[i];
+ if (call) {
+ MUTEX_ENTER(&call->lock);
+ if (call->state == RX_STATE_PRECALL) {
+ rxi_CallError(call, RX_CALL_DEAD);
+ rxi_SendCallAbort(call, NULL, 0, 0);
+ }
+ MUTEX_EXIT(&call->lock);
+ }
+ }
+ MUTEX_EXIT(&conn->conn_call_lock);
+ return;
+ }
+
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
if (packet) {
/* If there's no packet available, do this later. */
}
clock_GetTime(&when);
when.sec += RX_CHALLENGE_TIMEOUT;
- conn->challengeEvent = rxevent_Post(&when, rxi_ChallengeEvent, conn, 0);
+ conn->challengeEvent =
+ rxevent_Post(&when, rxi_ChallengeEvent, conn, (void *) (tries-1));
}
}
{
if (!conn->challengeEvent) {
RXS_CreateChallenge(conn->securityObject, conn);
- rxi_ChallengeEvent((struct rxevent *)0, conn, NULL);
+ rxi_ChallengeEvent(NULL, conn, (void *) RX_CHALLENGE_MAXTRIES);
};
}
taddr.sin_family = AF_INET;
taddr.sin_port = remotePort;
taddr.sin_addr.s_addr = remoteAddr;
+#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
+ taddr.sin_len = sizeof(struct sockaddr_in);
+#endif
while(1) {
memset(&theader, 0, sizeof(theader));
theader.epoch = htonl(999);
theader.flags = RX_CLIENT_INITIATED | RX_LAST_PACKET;
theader.serviceId = 0;
- bcopy(&theader, tbuffer, sizeof(theader));
- bcopy(inputData, tp, inputLength);
+ memcpy(tbuffer, &theader, sizeof(theader));
+ memcpy(tp, inputData, inputLength);
code = sendto(socket, tbuffer, inputLength+sizeof(struct rx_header), 0,
(struct sockaddr *) &taddr, sizeof(struct sockaddr_in));
code = recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
(struct sockaddr *) &faddr, &faddrLen);
- bcopy(tbuffer, &theader, sizeof(struct rx_header));
+ memcpy(&theader, tbuffer, sizeof(struct rx_header));
if (counter == ntohl(theader.callNumber)) break;
}
}
code -= sizeof(struct rx_header);
if (code > outputLength) code = outputLength;
- bcopy(tp, outputData, code);
+ memcpy(outputData, tp, code);
return code;
}