# include <WINNT\afsreg.h>
# endif
+# include <afs/opr.h>
+
# include "rx_user.h"
#endif /* KERNEL */
+#include <opr/queue.h>
+#include <hcrypto/rand.h>
+
#include "rx.h"
#include "rx_clock.h"
-#include "rx_queue.h"
#include "rx_atomic.h"
#include "rx_globals.h"
#include "rx_trace.h"
#include "rx_internal.h"
#include "rx_stats.h"
+#include "rx_event.h"
+
+#include "rx_peer.h"
+#include "rx_conn.h"
+#include "rx_call.h"
+#include "rx_packet.h"
+#include "rx_server.h"
#include <afs/rxgen_consts.h>
static void rxi_Resend(struct rxevent *event, void *arg0, void *arg1,
int istack);
static void rxi_SendDelayedAck(struct rxevent *event, void *call,
- void *dummy);
+ void *dummy, int dummy2);
+static void rxi_SendDelayedCallAbort(struct rxevent *event, void *arg1,
+ void *dummy, int dummy2);
+static void rxi_SendDelayedConnAbort(struct rxevent *event, void *arg1,
+ void *unused, int unused2);
+static void rxi_ReapConnections(struct rxevent *unused, void *unused1,
+ void *unused2, int unused3);
+static struct rx_packet *rxi_SendCallAbort(struct rx_call *call,
+ struct rx_packet *packet,
+ int istack, int force);
+static void rxi_AckAll(struct rx_call *call);
+static struct rx_connection
+ *rxi_FindConnection(osi_socket socket, afs_uint32 host, u_short port,
+ u_short serviceId, afs_uint32 cid,
+ afs_uint32 epoch, int type, u_int securityIndex,
+ int *unknownService);
+static struct rx_packet
+ *rxi_ReceiveDataPacket(struct rx_call *call, struct rx_packet *np,
+ int istack, osi_socket socket,
+ afs_uint32 host, u_short port, int *tnop,
+ struct rx_call **newcallp);
+static struct rx_packet
+ *rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
+ int istack);
+static struct rx_packet
+ *rxi_ReceiveResponsePacket(struct rx_connection *conn,
+ struct rx_packet *np, int istack);
+static struct rx_packet
+ *rxi_ReceiveChallengePacket(struct rx_connection *conn,
+ struct rx_packet *np, int istack);
+static void rxi_AttachServerProc(struct rx_call *call, osi_socket socket,
+ int *tnop, struct rx_call **newcallp);
+static void rxi_ClearTransmitQueue(struct rx_call *call, int force);
+static void rxi_ClearReceiveQueue(struct rx_call *call);
+static void rxi_ResetCall(struct rx_call *call, int newcall);
+static void rxi_ScheduleKeepAliveEvent(struct rx_call *call);
+static void rxi_ScheduleNatKeepAliveEvent(struct rx_connection *conn);
+static void rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs);
+static void rxi_KeepAliveOn(struct rx_call *call);
+static void rxi_GrowMTUOn(struct rx_call *call);
+static void rxi_ChallengeOn(struct rx_connection *conn);
+static int rxi_CheckCall(struct rx_call *call, int haveCTLock);
+static void rxi_AckAllInTransmitQueue(struct rx_call *call);
+static void rxi_CancelKeepAliveEvent(struct rx_call *call);
+static void rxi_CancelDelayedAbortEvent(struct rx_call *call);
+static void rxi_CancelGrowMTUEvent(struct rx_call *call);
+static void update_nextCid(void);
#ifdef RX_ENABLE_LOCKS
-static void rxi_SetAcksInTransmitQueue(struct rx_call *call);
-#endif
-
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
struct rx_tq_debug {
rx_atomic_t rxi_start_aborted; /* rxi_start awoke after rxi_Send in error.*/
rx_atomic_t rxi_start_in_error;
} rx_tq_debug;
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
/* Constant delay time before sending an acknowledge of the last packet
* received. This is to avoid sending an extra acknowledge when the
static unsigned int rxi_rpc_process_stat_cnt;
/*
- * rxi_busyChannelError is the error to return to the application when a call
- * channel appears busy (inferred from the receipt of RX_PACKET_TYPE_BUSY
- * packets on the channel), and there are other call channels in the
- * connection that are not busy. If 0, we do not return errors upon receiving
- * busy packets; we just keep trying on the same call channel until we hit a
- * timeout.
+ * rxi_busyChannelError is a boolean. It indicates whether or not RX_CALL_BUSY
+ * errors should be reported to the application when a call channel appears busy
+ * (inferred from the receipt of RX_PACKET_TYPE_BUSY packets on the channel),
+ * and there are other call channels in the connection that are not busy.
+ * If 0, we do not return errors upon receiving busy packets; we just keep
+ * trying on the same call channel until we hit a timeout.
*/
static afs_int32 rxi_busyChannelError = 0;
rx_atomic_t rx_nWaiting = RX_ATOMIC_INIT(0);
rx_atomic_t rx_nWaited = RX_ATOMIC_INIT(0);
+/* Incoming calls wait on this queue when there are no available
+ * server processes */
+struct opr_queue rx_incomingCallQueue;
+
+/* Server processes wait on this queue when there are no appropriate
+ * calls to process */
+struct opr_queue rx_idleServerQueue;
+
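+/* opr_queue links are embedded in their owning structures. Traversal
+ * follows the pattern used throughout this file, for example:
+ *
+ *     struct opr_queue *cursor;
+ *     for (opr_queue_Scan(&rx_idleServerQueue, cursor)) {
+ *         struct rx_serverQueueEntry *sq
+ *             = opr_queue_Entry(cursor, struct rx_serverQueueEntry, entry);
+ *         ...
+ *     }
+ */
+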
#if !defined(offsetof)
#include <stddef.h> /* for definition of offsetof() */
#endif
/* Forward prototypes */
static struct rx_call * rxi_NewCall(struct rx_connection *, int);
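+/* Drop a connection reference (for example, one taken by
+ * rxi_FindConnection); the reference count is protected by
+ * rx_refcnt_mutex. */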
+static_inline void
+putConnection (struct rx_connection *conn) {
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ conn->refCount--;
+ MUTEX_EXIT(&rx_refcnt_mutex);
+}
+
#ifdef AFS_PTHREAD_ENV
/*
extern afs_kmutex_t rx_refcnt_mutex;
extern afs_kmutex_t des_init_mutex;
extern afs_kmutex_t des_random_mutex;
+#ifndef KERNEL
extern afs_kmutex_t rx_clock_mutex;
extern afs_kmutex_t rxi_connCacheMutex;
-extern afs_kmutex_t rx_event_mutex;
extern afs_kmutex_t event_handler_mutex;
extern afs_kmutex_t listener_mutex;
extern afs_kmutex_t rx_if_init_mutex;
extern afs_kcondvar_t rx_event_handler_cond;
extern afs_kcondvar_t rx_listener_cond;
+#endif /* !KERNEL */
static afs_kmutex_t epoch_mutex;
static afs_kmutex_t rx_init_mutex;
static void
rxi_InitPthread(void)
{
- MUTEX_INIT(&rx_clock_mutex, "clock", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&rx_stats_mutex, "stats", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&rx_atomic_mutex, "atomic", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_quota_mutex, "quota", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_pthread_mutex, "pthread", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_packets_mutex, "packets", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_refcnt_mutex, "refcnts", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&epoch_mutex, "epoch", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&rx_init_mutex, "init", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&rx_event_mutex, "event", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&event_handler_mutex, "event handler", MUTEX_DEFAULT, 0);
+#ifndef KERNEL
+ MUTEX_INIT(&rx_clock_mutex, "clock", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rxi_connCacheMutex, "conn cache", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&event_handler_mutex, "event handler", MUTEX_DEFAULT, 0);
MUTEX_INIT(&listener_mutex, "listener", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_if_init_mutex, "if init", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_if_mutex, "if", MUTEX_DEFAULT, 0);
+#endif
+ MUTEX_INIT(&rx_stats_mutex, "stats", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_atomic_mutex, "atomic", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&epoch_mutex, "epoch", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_init_mutex, "init", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_debug_mutex, "debug", MUTEX_DEFAULT, 0);
+#ifndef KERNEL
CV_INIT(&rx_event_handler_cond, "evhand", CV_DEFAULT, 0);
CV_INIT(&rx_listener_cond, "rxlisten", CV_DEFAULT, 0);
+#endif
osi_Assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
osi_Assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT,
0);
MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
+#ifndef KERNEL
MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
+#endif
#endif /* RX_ENABLE_LOCKS */
}
* tiers:
*
* rx_connHashTable_lock - synchronizes conn creation, rx_connHashTable access
+ * also protects updates to rx_nextCid
 * conn_call_lock - used to synchronize rx_EndCall and rx_NewCall
* call->lock - locks call data fields.
* These are independent of each other:
#define CLEAR_CALL_QUEUE_LOCK(C)
#endif /* RX_ENABLE_LOCKS */
struct rx_serverQueueEntry *rx_waitForPacket = 0;
-struct rx_serverQueueEntry *rx_waitingForPacket = 0;
/* ------------Exported Interfaces------------- */
-/* This function allows rxkad to set the epoch to a suitably random number
- * which rx_NewConnection will use in the future. The principle purpose is to
- * get rxnull connections to use the same epoch as the rxkad connections do, at
- * least once the first rxkad connection is established. This is important now
- * that the host/port addresses aren't used in FindConnection: the uniqueness
- * of epoch/cid matters and the start time won't do. */
-
-#ifdef AFS_PTHREAD_ENV
-/*
- * This mutex protects the following global variables:
- * rx_epoch
- */
-
-#define LOCK_EPOCH MUTEX_ENTER(&epoch_mutex)
-#define UNLOCK_EPOCH MUTEX_EXIT(&epoch_mutex)
-#else
-#define LOCK_EPOCH
-#define UNLOCK_EPOCH
-#endif /* AFS_PTHREAD_ENV */
-
-void
-rx_SetEpoch(afs_uint32 epoch)
-{
- LOCK_EPOCH;
- rx_epoch = epoch;
- UNLOCK_EPOCH;
-}
-
/* Initialize rx. A port number may be mentioned, in which case this
* becomes the default port number for any service installed later.
* If 0 is provided for the port number, a random port will be chosen
#ifndef AFS_NT40_ENV
static
#endif
-int rxinit_status = 1;
-#ifdef AFS_PTHREAD_ENV
-/*
- * This mutex protects the following global variables:
- * rxinit_status
- */
-
-#define LOCK_RX_INIT MUTEX_ENTER(&rx_init_mutex)
-#define UNLOCK_RX_INIT MUTEX_EXIT(&rx_init_mutex)
-#else
-#define LOCK_RX_INIT
-#define UNLOCK_RX_INIT
-#endif
+rx_atomic_t rxinit_status = RX_ATOMIC_INIT(1);
int
rx_InitHost(u_int host, u_int port)
struct timeval tv;
#endif /* KERNEL */
char *htable, *ptable;
- int tmp_status;
SPLVAR;
INIT_PTHREAD_LOCKS;
- LOCK_RX_INIT;
- if (rxinit_status == 0) {
- tmp_status = rxinit_status;
- UNLOCK_RX_INIT;
- return tmp_status; /* Already started; return previous error code. */
- }
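+    /* Bit 0 of rxinit_status is set while rx is uninitialised; the caller
+     * that clears it performs the initialisation. */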
+ if (!rx_atomic_test_and_clear_bit(&rxinit_status, 0))
+ return 0; /* already started */
+
#ifdef RXDEBUG
rxi_DebugInit();
#endif
rx_socket = rxi_GetHostUDPSocket(host, (u_short) port);
if (rx_socket == OSI_NULLSOCKET) {
- UNLOCK_RX_INIT;
return RX_ADDRINUSE;
}
#if defined(RX_ENABLE_LOCKS) && defined(KERNEL)
#endif /* RX_LOCKS_DB */
MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_quota_mutex, "rx_quota_mutex", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_atomic_mutex, "rx_atomic_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_pthread_mutex, "rx_pthread_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_packets_mutex, "rx_packets_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_refcnt_mutex, "rx_refcnt_mutex", MUTEX_DEFAULT, 0);
rx_connDeadTime = 12;
rx_tranquil = 0; /* reset flag */
rxi_ResetStatistics();
- htable = (char *)
- osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
+ htable = osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *)); /* XXXXX */
memset(htable, 0, rx_hashTableSize * sizeof(struct rx_connection *));
- ptable = (char *)osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
+ ptable = osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *)); /* XXXXX */
memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));
/* Malloc up a bunch of packets & buffers */
rx_nFreePackets = 0;
- queue_Init(&rx_freePacketQueue);
+ opr_queue_Init(&rx_freePacketQueue);
rxi_NeedMorePackets = FALSE;
rx_nPackets = 0; /* rx_nPackets is managed by rxi_MorePackets* */
#endif
if (getsockname((intptr_t)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
rx_Finalize();
+ osi_Free(htable, rx_hashTableSize * sizeof(struct rx_connection *));
return -1;
}
rx_port = addr.sin_port;
#endif
}
rx_stats.minRtt.sec = 9999999;
-#ifdef KERNEL
- rx_SetEpoch(tv.tv_sec | 0x80000000);
-#else
- rx_SetEpoch(tv.tv_sec); /* Start time of this package, rxkad
- * will provide a randomer value. */
-#endif
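+    /* Randomise the epoch and initial cid instead of deriving them from
+     * the start time (the old KERNEL path used tv_sec | 0x80000000); the
+     * top bit is kept set and bit 0x40000000 kept clear. */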
+ if (RAND_bytes(&rx_epoch, sizeof(rx_epoch)) != 1)
+ return -1;
+ rx_epoch = (rx_epoch & ~0x40000000) | 0x80000000;
+ if (RAND_bytes(&rx_nextCid, sizeof(rx_nextCid)) != 1)
+ return -1;
+ rx_nextCid &= RX_CIDMASK;
MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
MUTEX_EXIT(&rx_quota_mutex);
rxevent_Init(20, rxi_ReScheduleEvents);
/* Initialize various global queues */
- queue_Init(&rx_idleServerQueue);
- queue_Init(&rx_incomingCallQueue);
- queue_Init(&rx_freeCallQueue);
+ opr_queue_Init(&rx_idleServerQueue);
+ opr_queue_Init(&rx_incomingCallQueue);
+ opr_queue_Init(&rx_freeCallQueue);
#if defined(AFS_NT40_ENV) && !defined(KERNEL)
/* Initialize our list of usable IP addresses. */
rx_GetIFInfo();
#endif
-#if defined(RXK_LISTENER_ENV) || !defined(KERNEL)
/* Start listener process (exact function is dependent on the
* implementation environment--kernel or user space) */
rxi_StartListener();
-#endif
USERPRI;
- tmp_status = rxinit_status = 0;
- UNLOCK_RX_INIT;
- return tmp_status;
+ rx_atomic_clear_bit(&rxinit_status, 0);
+ return 0;
}
int
if (lastPacket && call->conn->type == RX_CLIENT_CONNECTION)
clock_Addmsec(&retryTime, 400);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
- call->resendEvent = rxevent_PostNow2(&retryTime, &now, rxi_Resend,
- call, 0, istack);
+ call->resendEvent = rxevent_Post(&retryTime, &now, rxi_Resend,
+ call, NULL, istack);
}
/*!
static_inline void
rxi_rto_cancel(struct rx_call *call)
{
- if (!call->resendEvent)
- return;
-
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
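+    /* Posting a resend event takes a CALL_HOLD with RX_CALL_REFCOUNT_RESEND;
+     * that reference must be dropped when the event is cancelled. */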
+ if (call->resendEvent != NULL) {
+ rxevent_Cancel(&call->resendEvent);
+ CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
+ }
}
/*!
static_inline void
rxi_rto_packet_acked(struct rx_call *call, int istack)
{
- struct rx_packet *p, *nxp;
+ struct opr_queue *cursor;
rxi_rto_cancel(call);
- if (queue_IsEmpty(&call->tq))
+ if (opr_queue_IsEmpty(&call->tq))
return;
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
if (p->header.seq > call->tfirst + call->twind)
return;
}
/**
- * Sets the error generated when a busy call channel is detected.
+ * Enables or disables the busy call channel error (RX_CALL_BUSY).
*
- * @param[in] error The error to return for a call on a busy channel.
+ * @param[in] onoff Non-zero to enable busy call channel errors.
*
* @pre Neither rx_Init nor rx_InitHost have been called yet
*/
void
-rx_SetBusyChannelError(afs_int32 error)
+rx_SetBusyChannelError(afs_int32 onoff)
{
- osi_Assert(rxinit_status != 0);
- rxi_busyChannelError = error;
+ osi_Assert(rx_atomic_test_bit(&rxinit_status, 0));
+ rxi_busyChannelError = onoff ? 1 : 0;
}
/**
when = now;
clock_Add(&when, offset);
-    if (!call->delayedAckEvent
-	|| clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
-	rxevent_Cancel(call->delayedAckEvent, call,
-		       RX_CALL_REFCOUNT_DELAY);
-	MUTEX_ENTER(&rx_refcnt_mutex);
+    if (call->delayedAckEvent && clock_Gt(&call->delayedAckTime, &when)) {
+	/* The event we're cancelling already has a reference, so we don't
+	 * need a new one */
+	rxevent_Cancel(&call->delayedAckEvent);
+	call->delayedAckEvent = rxevent_Post(&when, &now, rxi_SendDelayedAck,
+					     call, NULL, 0);
+	call->delayedAckTime = when;
+    } else if (!call->delayedAckEvent) {
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
- MUTEX_EXIT(&rx_refcnt_mutex);
-
- call->delayedAckEvent = rxevent_PostNow(&when, &now,
- rxi_SendDelayedAck,
- call, 0);
+ call->delayedAckEvent = rxevent_Post(&when, &now,
+ rxi_SendDelayedAck,
+ call, NULL, 0);
+ call->delayedAckTime = when;
}
}
-
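+/* Cancel a pending delayed ack event, dropping the call reference that
+ * was held for it. */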
+void
+rxi_CancelDelayedAckEvent(struct rx_call *call)
+{
+ if (call->delayedAckEvent) {
+ rxevent_Cancel(&call->delayedAckEvent);
+ CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
+ }
+}
/* called with unincremented nRequestsRunning to see if it is OK to start
* a new thread in this service. Could be "no" for two reasons: over the
}
/* Turn on reaping of idle server connections */
- rxi_ReapConnections(NULL, NULL, NULL);
+ rxi_ReapConnections(NULL, NULL, NULL, 0);
USERPRI;
int serviceSecurityIndex)
{
int hashindex, i;
- afs_int32 cid;
struct rx_connection *conn;
SPLVAR;
#endif
NETPRI;
MUTEX_ENTER(&rx_connHashTable_lock);
- cid = (rx_nextCid += RX_MAXCALLS);
conn->type = RX_CLIENT_CONNECTION;
- conn->cid = cid;
conn->epoch = rx_epoch;
- conn->peer = rxi_FindPeer(shost, sport, 0, 1);
+ conn->cid = rx_nextCid;
+ update_nextCid();
+ conn->peer = rxi_FindPeer(shost, sport, 1);
conn->serviceId = sservice;
conn->securityObject = securityObject;
conn->securityData = (void *) 0;
rx_SetConnIdleDeadTime(struct rx_connection *conn, int seconds)
{
conn->idleDeadTime = seconds;
+ conn->idleDeadDetection = (seconds ? 1 : 0);
rxi_CheckConnTimeouts(conn);
}
/* Push the final acknowledgment out now--there
* won't be a subsequent call to acknowledge the
* last reply packets */
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
if (call->state == RX_STATE_PRECALL
|| call->state == RX_STATE_ACTIVE) {
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
} else {
- rxi_AckAll(NULL, call, 0);
+ rxi_AckAll(call);
}
}
MUTEX_EXIT(&call->lock);
}
if (conn->delayedAbortEvent) {
- rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
+ rxevent_Cancel(&conn->delayedAbortEvent);
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
if (packet) {
MUTEX_ENTER(&conn->conn_data_lock);
/* Make sure the connection is completely reset before deleting it. */
/* get rid of pending events that could zap us later */
- if (conn->challengeEvent)
- rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
- if (conn->checkReachEvent)
- rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
- if (conn->natKeepAliveEvent)
- rxevent_Cancel(conn->natKeepAliveEvent, (struct rx_call *)0, 0);
+ rxevent_Cancel(&conn->challengeEvent);
+ rxevent_Cancel(&conn->checkReachEvent);
+ rxevent_Cancel(&conn->natKeepAliveEvent);
/* Add the connection to the list of destroyed connections that
* need to be cleaned up. This is necessary to avoid deadlocks
USERPRI;
}
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
/* Wait for the transmit queue to no longer be busy.
* requires the call->lock to be held */
void
while (!call->error && (call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_WAIT;
call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
+ MUTEX_ASSERT(&call->lock);
CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
call->tqWaiters--;
if (call->tqWaiters == 0) {
call->flags &= ~RX_CALL_TQ_WAIT;
dpf(("call %"AFS_PTR_FMT" has %d waiters and flags %d\n",
call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_Start start");
+ MUTEX_ASSERT(&call->lock);
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
* effect on overall system performance.
*/
call->state = RX_STATE_RESET;
+ (*call->callNumber)++;
MUTEX_EXIT(&conn->conn_call_lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
rxi_ResetCall(call, 0);
- (*call->callNumber)++;
if (MUTEX_TRYENTER(&conn->conn_call_lock))
break;
* Instead, cycle through one more time to see if
* we can find a call that can call our own.
*/
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
wait = 0;
}
MUTEX_EXIT(&call->lock);
/* rxi_NewCall returns with mutex locked */
call = rxi_NewCall(conn, i);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
break;
}
}
if (i < RX_MAXCALLS) {
conn->lastBusy[i] = 0;
+ call->flags &= ~RX_CALL_PEER_BUSY;
break;
}
if (!wait)
call->state = RX_STATE_ACTIVE;
call->error = conn->error;
if (call->error)
- call->mode = RX_MODE_ERROR;
+ call->app.mode = RX_MODE_ERROR;
else
- call->mode = RX_MODE_SENDING;
+ call->app.mode = RX_MODE_SENDING;
+
+#ifdef AFS_RXERRQ_ENV
+ /* remember how many network errors the peer has when we started, so if
+     * more errors are encountered after the call starts, we know the other
+     * endpoint won't be responding to us */
+ call->neterr_gen = rx_atomic_read(&conn->peer->neterrs);
+#endif
/* remember start time for call in case we have hard dead time limit */
call->queueTime = queueTime;
clock_GetTime(&call->startTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
+ call->app.bytesSent = 0;
+ call->app.bytesRcvd = 0;
/* Turn on busy protocol. */
rxi_KeepAliveOn(call);
* run (see code above that avoids resource starvation).
*/
#ifdef RX_ENABLE_LOCKS
+ if (call->flags & (RX_CALL_TQ_BUSY | RX_CALL_TQ_CLEARME)) {
+ osi_Panic("rx_NewCall call about to be used without an empty tq");
+ }
+
CV_BROADCAST(&conn->conn_call_cv);
#else
osi_rxWakeup(conn);
#endif
MUTEX_EXIT(&conn->conn_call_lock);
-
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (call->flags & (RX_CALL_TQ_BUSY | RX_CALL_TQ_CLEARME)) {
- osi_Panic("rx_NewCall call about to be used without an empty tq");
- }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-
MUTEX_EXIT(&call->lock);
USERPRI;
SPLVAR;
NETPRI;
+ MUTEX_ENTER(&aconn->conn_call_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
aint32s[i] = aconn->callNumber[i] + 1;
else
aint32s[i] = aconn->callNumber[i];
}
+ MUTEX_EXIT(&aconn->conn_call_lock);
USERPRI;
return 0;
}
SPLVAR;
NETPRI;
+ MUTEX_ENTER(&aconn->conn_call_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
aconn->callNumber[i] = aint32s[i] - 1;
else
aconn->callNumber[i] = aint32s[i];
}
+ MUTEX_EXIT(&aconn->conn_call_lock);
USERPRI;
return 0;
}
tservice = rxi_AllocService();
NETPRI;
-#ifdef RX_ENABLE_LOCKS
MUTEX_INIT(&tservice->svc_data_lock, "svc data lock", MUTEX_DEFAULT, 0);
-#endif
for (i = 0; i < RX_MAX_SERVICES; i++) {
struct rx_service *service = rx_services[i];
rx_WakeupServerProcs(void)
{
struct rx_serverQueueEntry *np, *tqp;
+ struct opr_queue *cursor;
SPLVAR;
NETPRI;
#endif /* RX_ENABLE_LOCKS */
}
MUTEX_EXIT(&freeSQEList_lock);
- for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
+ for (opr_queue_Scan(&rx_idleServerQueue, cursor)) {
+ np = opr_queue_Entry(cursor, struct rx_serverQueueEntry, entry);
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
ReturnToServerPool(cur_service);
}
while (1) {
- if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
- struct rx_call *tcall, *ncall, *choice2 = NULL;
+ if (!opr_queue_IsEmpty(&rx_incomingCallQueue)) {
+ struct rx_call *tcall, *choice2 = NULL;
+ struct opr_queue *cursor;
/* Scan for eligible incoming calls. A call is not eligible
* if the maximum number of calls for its service type are
* while the other threads may run ahead looking for calls which
* have all their input data available immediately. This helps
* keep threads from blocking, waiting for data from the client. */
- for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
+ for (opr_queue_Scan(&rx_incomingCallQueue, cursor)) {
+ tcall = opr_queue_Entry(cursor, struct rx_call, entry);
+
service = tcall->conn->service;
if (!QuotaOK(service)) {
continue;
}
MUTEX_ENTER(&rx_pthread_mutex);
if (tno == rxi_fcfs_thread_num
- || !tcall->queue_item_header.next) {
+ || opr_queue_IsEnd(&rx_incomingCallQueue, cursor)) {
MUTEX_EXIT(&rx_pthread_mutex);
/* If we're the fcfs thread , then we'll just use
* this call. If we haven't been able to find an optimal
service = call->conn->service;
} else {
MUTEX_EXIT(&rx_pthread_mutex);
- if (!queue_IsEmpty(&tcall->rq)) {
+ if (!opr_queue_IsEmpty(&tcall->rq)) {
struct rx_packet *rp;
- rp = queue_First(&tcall->rq, rx_packet);
+ rp = opr_queue_First(&tcall->rq, struct rx_packet,
+ entry);
if (rp->header.seq == 1) {
if (!meltdown_1pkt
|| (rp->header.flags & RX_LAST_PACKET)) {
}
if (call) {
- queue_Remove(call);
+ opr_queue_Remove(&call->entry);
MUTEX_EXIT(&rx_serverPool_lock);
MUTEX_ENTER(&call->lock);
continue;
}
- if (queue_IsEmpty(&call->rq)
- || queue_First(&call->rq, rx_packet)->header.seq != 1)
+ if (opr_queue_IsEmpty(&call->rq)
+ || opr_queue_First(&call->rq, struct rx_packet, entry)->header.seq != 1)
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
CLEAR_CALL_QUEUE_LOCK(call);
*socketp = OSI_NULLSOCKET;
}
sq->socketp = socketp;
- queue_Append(&rx_idleServerQueue, sq);
+ opr_queue_Append(&rx_idleServerQueue, &sq->entry);
#ifndef AFS_AIX41_ENV
rx_waitForPacket = sq;
-#else
- rx_waitingForPacket = sq;
#endif /* AFS_AIX41_ENV */
do {
CV_WAIT(&sq->cv, &rx_serverPool_lock);
if (call) {
clock_GetTime(&call->startTime);
call->state = RX_STATE_ACTIVE;
- call->mode = RX_MODE_RECEIVING;
+ call->app.mode = RX_MODE_RECEIVING;
#ifdef RX_KERNEL_TRACE
if (ICL_SETACTIVE(afs_iclSetp)) {
int glockOwner = ISAFS_GLOCK();
call));
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
} else {
dpf(("rx_GetCall(socketp=%p, *socketp=0x%x)\n", socketp, *socketp));
}
rxi_availProcs++;
MUTEX_EXIT(&rx_quota_mutex);
}
- if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
- struct rx_call *tcall, *ncall;
+ if (!opr_queue_IsEmpty(&rx_incomingCallQueue)) {
+ struct rx_call *tcall;
+ struct opr_queue *cursor;
/* Scan for eligible incoming calls. A call is not eligible
* if the maximum number of calls for its service type are
* already executing */
* have all their input data available immediately. This helps
* keep threads from blocking, waiting for data from the client. */
choice2 = (struct rx_call *)0;
- for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
+ for (opr_queue_Scan(&rx_incomingCallQueue, cursor)) {
+ tcall = opr_queue_Entry(cursor, struct rx_call, entry);
service = tcall->conn->service;
if (QuotaOK(service)) {
MUTEX_ENTER(&rx_pthread_mutex);
- if (tno == rxi_fcfs_thread_num
- || !tcall->queue_item_header.next) {
+ /* XXX - If tcall->entry.next is NULL, then we're no longer
+ * on a queue at all. This shouldn't happen. */
+ if (tno == rxi_fcfs_thread_num || !tcall->entry.next) {
MUTEX_EXIT(&rx_pthread_mutex);
/* If we're the fcfs thread, then we'll just use
* this call. If we haven't been able to find an optimal
service = call->conn->service;
} else {
MUTEX_EXIT(&rx_pthread_mutex);
- if (!queue_IsEmpty(&tcall->rq)) {
+ if (!opr_queue_IsEmpty(&tcall->rq)) {
struct rx_packet *rp;
- rp = queue_First(&tcall->rq, rx_packet);
+ rp = opr_queue_First(&tcall->rq, struct rx_packet,
+ entry);
if (rp->header.seq == 1
&& (!meltdown_1pkt
|| (rp->header.flags & RX_LAST_PACKET))) {
}
if (call) {
- queue_Remove(call);
+ opr_queue_Remove(&call->entry);
/* we can't schedule a call if there's no data!!! */
/* send an ack if there's no data, if we're missing the
* first packet, or we're missing something between first
* and last -- there's a "hole" in the incoming data. */
- if (queue_IsEmpty(&call->rq)
- || queue_First(&call->rq, rx_packet)->header.seq != 1
- || call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
+ if (opr_queue_IsEmpty(&call->rq)
+ || opr_queue_First(&call->rq, struct rx_packet, entry)->header.seq != 1
+ || call->rprev != opr_queue_Last(&call->rq, struct rx_packet, entry)->header.seq)
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
call->flags &= (~RX_CALL_WAIT_PROC);
*socketp = OSI_NULLSOCKET;
}
sq->socketp = socketp;
- queue_Append(&rx_idleServerQueue, sq);
+ opr_queue_Append(&rx_idleServerQueue, &sq->entry);
do {
osi_rxSleep(sq);
#ifdef KERNEL
if (call) {
clock_GetTime(&call->startTime);
call->state = RX_STATE_ACTIVE;
- call->mode = RX_MODE_RECEIVING;
+ call->app.mode = RX_MODE_RECEIVING;
#ifdef RX_KERNEL_TRACE
if (ICL_SETACTIVE(afs_iclSetp)) {
int glockOwner = ISAFS_GLOCK();
call->arrivalProc = (void (*)())0;
if (rc && call->error == 0) {
rxi_CallError(call, rc);
- call->mode = RX_MODE_ERROR;
+ call->app.mode = RX_MODE_ERROR;
/* Send an abort message to the peer if this error code has
* only just been set. If it was set previously, assume the
* peer has already been sent the error code or will request it
}
if (conn->type == RX_SERVER_CONNECTION) {
/* Make sure reply or at least dummy reply is sent */
- if (call->mode == RX_MODE_RECEIVING) {
+ if (call->app.mode == RX_MODE_RECEIVING) {
MUTEX_EXIT(&call->lock);
rxi_WriteProc(call, 0, 0);
MUTEX_ENTER(&call->lock);
}
- if (call->mode == RX_MODE_SENDING) {
+ if (call->app.mode == RX_MODE_SENDING) {
MUTEX_EXIT(&call->lock);
rxi_FlushWrite(call);
MUTEX_ENTER(&call->lock);
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
rxi_rto_cancel(call);
- rxevent_Cancel(call->keepAliveEvent, call,
- RX_CALL_REFCOUNT_ALIVE);
+ rxi_CancelKeepAliveEvent(call);
}
} else { /* Client connection */
char dummy;
/* Make sure server receives input packets, in the case where
* no reply arguments are expected */
- if ((call->mode == RX_MODE_SENDING)
- || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
+
+ if ((call->app.mode == RX_MODE_SENDING)
+ || (call->app.mode == RX_MODE_RECEIVING && call->rnext == 1)) {
MUTEX_EXIT(&call->lock);
(void)rxi_ReadProc(call, &dummy, 1);
MUTEX_ENTER(&call->lock);
* and force-send it now.
*/
if (call->delayedAckEvent) {
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
- call->delayedAckEvent = NULL;
- rxi_SendDelayedAck(NULL, call, NULL);
+ rxi_CancelDelayedAckEvent(call);
+ rxi_SendDelayedAck(NULL, call, NULL, 0);
}
/* We need to release the call lock since it's lower than the
* ResetCall cannot: ResetCall may be called at splnet(), in the
* kernel version, and may interrupt the macros rx_Read or
* rx_Write, which run at normal priority for efficiency. */
- if (call->currentPacket) {
+ if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
- call->currentPacket->flags &= ~RX_PKTFLAG_CP;
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
- rxi_FreePacket(call->currentPacket);
- call->currentPacket = (struct rx_packet *)0;
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = (struct rx_packet *)0;
}
- call->nLeft = call->nFree = call->curlen = 0;
+ call->app.nLeft = call->app.nFree = call->app.curlen = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
#ifdef RXDEBUG_PACKET
call->iovqc -=
#endif /* RXDEBUG_PACKET */
- rxi_FreePackets(0, &call->iovq);
+ rxi_FreePackets(0, &call->app.iovq);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
if (conn->type == RX_CLIENT_CONNECTION) {
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags &= ~RX_CONN_BUSY;
* Map errors to the local host's errno.h format.
*/
error = ntoh_syserr_conv(error);
+
+ /* If the caller said the call failed with some error, we had better
+ * return an error code. */
+ osi_Assert(!rc || error);
return error;
}
struct rx_connection **conn_ptr, **conn_end;
INIT_PTHREAD_LOCKS;
- LOCK_RX_INIT;
- if (rxinit_status == 1) {
- UNLOCK_RX_INIT;
+ if (rx_atomic_test_and_set_bit(&rxinit_status, 0))
return; /* Already shutdown. */
- }
+
rxi_DeleteCachedConnections();
if (rx_connHashTable) {
MUTEX_ENTER(&rx_connHashTable_lock);
afs_winsockCleanup();
#endif
- rxinit_status = 1;
- UNLOCK_RX_INIT;
}
#endif
rxi_NewCall(struct rx_connection *conn, int channel)
{
struct rx_call *call;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
struct rx_call *cp; /* Call pointer temp */
- struct rx_call *nxp; /* Next call pointer, for queue_Scan */
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ struct opr_queue *cursor;
+#endif
dpf(("rxi_NewCall(conn %"AFS_PTR_FMT", channel %d)\n", conn, channel));
* rxi_FreeCall */
MUTEX_ENTER(&rx_freeCallQueue_lock);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
/*
* EXCEPT that the TQ might not yet be cleared out.
* Skip over those with in-use TQs.
*/
call = NULL;
- for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
+ for (opr_queue_Scan(&rx_freeCallQueue, cursor)) {
+ cp = opr_queue_Entry(cursor, struct rx_call, entry);
if (!(cp->flags & RX_CALL_TQ_BUSY)) {
call = cp;
break;
}
}
if (call) {
-#else /* AFS_GLOBAL_RXLOCK_KERNEL */
- if (queue_IsNotEmpty(&rx_freeCallQueue)) {
- call = queue_First(&rx_freeCallQueue, rx_call);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- queue_Remove(call);
+#else /* RX_ENABLE_LOCKS */
+ if (!opr_queue_IsEmpty(&rx_freeCallQueue)) {
+ call = opr_queue_First(&rx_freeCallQueue, struct rx_call, entry);
+#endif /* RX_ENABLE_LOCKS */
+ opr_queue_Remove(&call->entry);
if (rx_stats_active)
rx_atomic_dec(&rx_stats.nFreeCallStructs);
MUTEX_EXIT(&rx_freeCallQueue_lock);
MUTEX_ENTER(&call->lock);
CLEAR_CALL_QUEUE_LOCK(call);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
/* Now, if TQ wasn't cleared earlier, do it now. */
rxi_WaitforTQBusy(call);
if (call->flags & RX_CALL_TQ_CLEARME) {
rxi_ClearTransmitQueue(call, 1);
/*queue_Init(&call->tq);*/
}
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
/* Bind the call to its connection structure */
call->conn = conn;
rxi_ResetCall(call, 1);
CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
/* Initialize once-only items */
- queue_Init(&call->tq);
- queue_Init(&call->rq);
- queue_Init(&call->iovq);
+ opr_queue_Init(&call->tq);
+ opr_queue_Init(&call->rq);
+ opr_queue_Init(&call->app.iovq);
#ifdef RXDEBUG_PACKET
call->rqc = call->tqc = call->iovqc = 0;
#endif /* RXDEBUG_PACKET */
*
 * call->lock and rx_refcnt_mutex are held upon entry.
* haveCTLock is set when called from rxi_ReapConnections.
+ *
+ * return 1 if the call is freed, 0 if not.
*/
-static void
+static int
rxi_FreeCall(struct rx_call *call, int haveCTLock)
{
int channel = call->channel;
struct rx_connection *conn = call->conn;
-
-    if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
-	(*call->callNumber)++;
+    u_char state = call->state;
/*
* We are setting the state to RX_STATE_RESET to
* ensure that no one else will attempt to use this
MUTEX_EXIT(&rx_refcnt_mutex);
rxi_ResetCall(call, 0);
- MUTEX_ENTER(&conn->conn_call_lock);
- if (call->conn->call[channel] == call)
- call->conn->call[channel] = 0;
- MUTEX_EXIT(&conn->conn_call_lock);
+ if (MUTEX_TRYENTER(&conn->conn_call_lock))
+ {
+ if (state == RX_STATE_DALLY || state == RX_STATE_HOLD)
+ (*call->callNumber)++;
+
+ if (call->conn->call[channel] == call)
+ call->conn->call[channel] = 0;
+ MUTEX_EXIT(&conn->conn_call_lock);
+ } else {
+ /*
+ * We couldn't obtain the conn_call_lock so we can't
+ * disconnect the call from the connection. Set the
+ * call state to dally so that the call can be reused.
+ */
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ call->state = RX_STATE_DALLY;
+ return 0;
+ }
MUTEX_ENTER(&rx_freeCallQueue_lock);
SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
/* A call may be free even though its transmit queue is still in use.
* Since we search the call list from head to tail, put busy calls at
* the head of the list, and idle calls at the tail.
*/
if (call->flags & RX_CALL_TQ_BUSY)
- queue_Prepend(&rx_freeCallQueue, call);
+ opr_queue_Prepend(&rx_freeCallQueue, &call->entry);
else
- queue_Append(&rx_freeCallQueue, call);
-#else /* AFS_GLOBAL_RXLOCK_KERNEL */
- queue_Append(&rx_freeCallQueue, call);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ opr_queue_Append(&rx_freeCallQueue, &call->entry);
+#else /* RX_ENABLE_LOCKS */
+ opr_queue_Append(&rx_freeCallQueue, &call->entry);
+#endif /* RX_ENABLE_LOCKS */
if (rx_stats_active)
rx_atomic_inc(&rx_stats.nFreeCallStructs);
MUTEX_EXIT(&rx_freeCallQueue_lock);
MUTEX_EXIT(&conn->conn_data_lock);
}
MUTEX_ENTER(&rx_refcnt_mutex);
+ return 1;
}
rx_atomic_t rxi_Allocsize = RX_ATOMIC_INIT(0);
MUTEX_EXIT(&rx_peerHashTable_lock);
}
+#ifdef AFS_RXERRQ_ENV
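+/* Record an ICMP-style network error against the peer matching (host,
+ * port), if one exists: bump peer->neterrs and stash the error's type and
+ * code for later retrieval via rx_GetNetworkError(). */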
+static void
+rxi_SetPeerDead(struct sock_extended_err *err, afs_uint32 host, afs_uint16 port)
+{
+ int hashIndex = PEER_HASH(host, port);
+ struct rx_peer *peer;
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+
+ for (peer = rx_peerHashTable[hashIndex]; peer; peer = peer->next) {
+ if (peer->host == host && peer->port == port) {
+ peer->refCount++;
+ break;
+ }
+ }
+
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
+ if (peer) {
+ rx_atomic_inc(&peer->neterrs);
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->last_err_origin = RX_NETWORK_ERROR_ORIGIN_ICMP;
+ peer->last_err_type = err->ee_type;
+ peer->last_err_code = err->ee_code;
+ MUTEX_EXIT(&peer->peer_lock);
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ peer->refCount--;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+ }
+}
+
+void
+rxi_ProcessNetError(struct sock_extended_err *err, afs_uint32 addr, afs_uint16 port)
+{
+# ifdef AFS_ADAPT_PMTU
+ if (err->ee_errno == EMSGSIZE && err->ee_info >= 68) {
+ rxi_SetPeerMtu(NULL, addr, port, err->ee_info - RX_IPUDP_SIZE);
+ return;
+ }
+# endif
+ if (err->ee_origin == SO_EE_ORIGIN_ICMP && err->ee_type == ICMP_DEST_UNREACH) {
+ switch (err->ee_code) {
+ case ICMP_NET_UNREACH:
+ case ICMP_HOST_UNREACH:
+ case ICMP_PORT_UNREACH:
+ case ICMP_NET_ANO:
+ case ICMP_HOST_ANO:
+ rxi_SetPeerDead(err, addr, port);
+ break;
+ }
+ }
+}
+
+static const char *
+rxi_TranslateICMP(int type, int code)
+{
+ switch (type) {
+ case ICMP_DEST_UNREACH:
+ switch (code) {
+ case ICMP_NET_UNREACH:
+ return "Destination Net Unreachable";
+ case ICMP_HOST_UNREACH:
+ return "Destination Host Unreachable";
+ case ICMP_PROT_UNREACH:
+ return "Destination Protocol Unreachable";
+ case ICMP_PORT_UNREACH:
+ return "Destination Port Unreachable";
+ case ICMP_NET_ANO:
+ return "Destination Net Prohibited";
+ case ICMP_HOST_ANO:
+ return "Destination Host Prohibited";
+ }
+ break;
+ }
+ return NULL;
+}
+#endif /* AFS_RXERRQ_ENV */
+
+/**
+ * Get the last network error for a connection
+ *
+ * A "network error" here means an error retrieved from ICMP, or some other
+ * mechanism outside of Rx that informs us of errors in network reachability.
+ *
+ * If a peer associated with the given Rx connection has received a network
+ * error recently, this function allows the caller to know what error
+ * specifically occurred. This can be useful to know, since e.g. ICMP errors
+ * can cause calls to that peer to be quickly aborted. So, this function can
+ * help see why a call was aborted due to network errors.
+ *
+ * If we have received traffic from a peer since the last network error, we
+ * treat that peer as if we had not received a network error for it.
+ *
+ * @param[in] conn The Rx connection to examine
+ * @param[out] err_origin The origin of the last network error (e.g. ICMP);
+ * one of the RX_NETWORK_ERROR_ORIGIN_* constants
+ * @param[out] err_type The type of the last error
+ * @param[out] err_code The code of the last error
+ * @param[out] msg Human-readable error message, if applicable; NULL otherwise
+ *
+ * @return If we have an error
+ * @retval -1 No error to get; 'out' params are undefined
+ * @retval 0 We have an error; 'out' params contain the last error
+ */
+int
+rx_GetNetworkError(struct rx_connection *conn, int *err_origin, int *err_type,
+ int *err_code, const char **msg)
+{
+#ifdef AFS_RXERRQ_ENV
+ struct rx_peer *peer = conn->peer;
+ if (rx_atomic_read(&peer->neterrs)) {
+ MUTEX_ENTER(&peer->peer_lock);
+ *err_origin = peer->last_err_origin;
+ *err_type = peer->last_err_type;
+ *err_code = peer->last_err_code;
+ MUTEX_EXIT(&peer->peer_lock);
+
+ *msg = NULL;
+ if (*err_origin == RX_NETWORK_ERROR_ORIGIN_ICMP) {
+ *msg = rxi_TranslateICMP(*err_type, *err_code);
+ }
+
+ return 0;
+ }
+#endif
+ return -1;
+}
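+
+/* A sketch of a hypothetical caller (the names below are illustrative
+ * only):
+ *
+ *     int origin, type, code;
+ *     const char *msg;
+ *     if (rx_GetNetworkError(conn, &origin, &type, &code, &msg) == 0)
+ *         printf("last net error: %s\n", msg ? msg : "(no message)");
+ */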
+
/* Find the peer process represented by the supplied (host,port)
* combination. If there is no appropriate active peer structure, a
* new one will be allocated and initialized
- * The origPeer, if set, is a pointer to a peer structure on which the
- * refcount will be be decremented. This is used to replace the peer
- * structure hanging off a connection structure */
+ */
struct rx_peer *
-rxi_FindPeer(afs_uint32 host, u_short port,
- struct rx_peer *origPeer, int create)
+rxi_FindPeer(afs_uint32 host, u_short port, int create)
{
struct rx_peer *pp;
int hashIndex;
pp = rxi_AllocPeer(); /* This bzero's *pp */
pp->host = host; /* set here or in InitPeerParams is zero */
pp->port = port;
+#ifdef AFS_RXERRQ_ENV
+ rx_atomic_set(&pp->neterrs, 0);
+#endif
MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
- queue_Init(&pp->congestionQueue);
- queue_Init(&pp->rpcStats);
+ opr_queue_Init(&pp->rpcStats);
pp->next = rx_peerHashTable[hashIndex];
rx_peerHashTable[hashIndex] = pp;
rxi_InitPeerParams(pp);
if (pp && create) {
pp->refCount++;
}
- if (origPeer)
- origPeer->refCount--;
MUTEX_EXIT(&rx_peerHashTable_lock);
return pp;
}
* parameter must match the existing index for the connection. If a
* server connection is created, it will be created using the supplied
* index, if the index is valid for this service */
-struct rx_connection *
+static struct rx_connection *
rxi_FindConnection(osi_socket socket, afs_uint32 host,
u_short port, u_short serviceId, afs_uint32 cid,
- afs_uint32 epoch, int type, u_int securityIndex)
+ afs_uint32 epoch, int type, u_int securityIndex,
+ int *unknownService)
{
int hashindex, flag, i;
struct rx_connection *conn;
+ *unknownService = 0;
hashindex = CONN_HASH(host, port, cid, epoch, type);
MUTEX_ENTER(&rx_connHashTable_lock);
rxLastConn ? (conn = rxLastConn, flag = 0) : (conn =
if (!service || (securityIndex >= service->nSecurityObjects)
|| (service->securityObjects[securityIndex] == 0)) {
MUTEX_EXIT(&rx_connHashTable_lock);
+ *unknownService = 1;
return (struct rx_connection *)0;
}
conn = rxi_AllocConnection(); /* This bzero's the connection */
CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
conn->next = rx_connHashTable[hashindex];
rx_connHashTable[hashindex] = conn;
- conn->peer = rxi_FindPeer(host, port, 0, 1);
+ conn->peer = rxi_FindPeer(host, port, 1);
conn->type = RX_SERVER_CONNECTION;
conn->lastSendTime = clock_Sec(); /* don't GC immediately */
conn->epoch = epoch;
conn->cid = cid & RX_CIDMASK;
- /* conn->serial = conn->lastSerial = 0; */
- /* conn->timeout = 0; */
conn->ackRate = RX_FAST_ACK_RATE;
conn->service = service;
conn->serviceId = serviceId;
conn->nSpecific = 0;
conn->specific = NULL;
rx_SetConnDeadTime(conn, service->connDeadTime);
- rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
- rx_SetServerConnIdleDeadErr(conn, service->idleDeadErr);
+ conn->idleDeadTime = service->idleDeadTime;
+ conn->idleDeadDetection = service->idleDeadErr ? 1 : 0;
for (i = 0; i < RX_MAXCALLS; i++) {
conn->twind[i] = rx_initSendWindow;
conn->rwind[i] = rx_initReceiveWindow;
int channel = call->channel;
int freechannel = 0;
int i;
- afs_uint32 callNumber = *call->callNumber;
MUTEX_EXIT(&call->lock);
}
}
- MUTEX_EXIT(&conn->conn_call_lock);
-
MUTEX_ENTER(&call->lock);
- /* Since the call->lock and conn->conn_call_lock have been released it is
- * possible that (1) the call may no longer be busy and/or (2) the call may
- * have been reused by another waiting thread. Therefore, we must confirm
+ /* Since the call->lock has been released it is possible that the call may
+ * no longer be busy (the call channel cannot have been reallocated as we
+     * haven't dropped the conn_call_lock). Therefore, we must confirm
* that the call state has not changed when deciding whether or not to
* force this application thread to retry by forcing a Timeout error. */
- if (freechannel && *call->callNumber == callNumber &&
- (call->flags & RX_CALL_PEER_BUSY)) {
+ if (freechannel && (call->flags & RX_CALL_PEER_BUSY)) {
/* Since 'freechannel' is set, there exists another channel in this
* rx_conn that the application thread might be able to use. We know
* that we have the correct call since callNumber is unchanged, and we
* rxi_busyChannelError so the application can retry the request,
* presumably on a less-busy call channel. */
- rxi_CallError(call, rxi_busyChannelError);
+ rxi_CallError(call, RX_CALL_BUSY);
}
+ MUTEX_EXIT(&conn->conn_call_lock);
}
-/* There are two packet tracing routines available for testing and monitoring
- * Rx. One is called just after every packet is received and the other is
- * called just before every packet is sent. Received packets, have had their
- * headers decoded, and packets to be sent have not yet had their headers
- * encoded. Both take two parameters: a pointer to the packet and a sockaddr
- * containing the network address. Both can be modified. The return value, if
- * non-zero, indicates that the packet should be dropped. */
-
-int (*rx_justReceived) (struct rx_packet *, struct sockaddr_in *) = 0;
-int (*rx_almostSent) (struct rx_packet *, struct sockaddr_in *) = 0;
+/*!
+ * Abort the call if the server is over the busy threshold. This
+ * can be used without requiring a call structure to be initialised
+ * or connected to a particular channel.
+ */
+static_inline int
+rxi_AbortIfServerBusy(osi_socket socket, struct rx_connection *conn,
+ struct rx_packet *np)
+{
+ if ((rx_BusyThreshold > 0) &&
+ (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
+ rxi_SendRawAbort(socket, conn->peer->host, conn->peer->port,
+ rx_BusyError, np, 0);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.nBusies);
+ return 1;
+ }
+
+    return 0;
+}
-/* A packet has been received off the interface.  Np is the packet, socket is
- * the socket number it was received from (useful in determining which service
- * this packet corresponds to), and (host, port) reflect the host,port of the
- * sender.  This call returns the packet to the caller if it is finished with
- * it, rather than de-allocating it, just as a small performance hack */
-struct rx_packet *
-rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
- afs_uint32 host, u_short port, int *tnop,
- struct rx_call **newcallp)
+static_inline struct rx_call *
+rxi_ReceiveClientCall(struct rx_packet *np, struct rx_connection *conn)
{
- struct rx_call *call;
- struct rx_connection *conn;
int channel;
- afs_uint32 currentCallNumber;
- int type;
- int skew;
-#ifdef RXDEBUG
- char *packetType;
-#endif
- struct rx_packet *tnp;
-
-#ifdef RXDEBUG
-/* We don't print out the packet until now because (1) the time may not be
- * accurate enough until now in the lwp implementation (rx_Listener only gets
- * the time after the packet is read) and (2) from a protocol point of view,
- * this is the first time the packet has been seen */
- packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
- ? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
- dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %"AFS_PTR_FMT"\n",
- np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
- np->header.epoch, np->header.cid, np->header.callNumber,
- np->header.seq, np->header.flags, np));
-#endif
-    if (np->header.type == RX_PACKET_TYPE_VERSION) {
-	return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
+    struct rx_call *call;
+
+    channel = np->header.cid & RX_CHANNELMASK;
+ MUTEX_ENTER(&conn->conn_call_lock);
+ call = conn->call[channel];
+ if (!call || conn->callNumber[channel] != np->header.callNumber) {
+ MUTEX_EXIT(&conn->conn_call_lock);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ return NULL;
}
- if (np->header.type == RX_PACKET_TYPE_DEBUG) {
- return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ if ((call->state == RX_STATE_DALLY)
+ && np->header.type == RX_PACKET_TYPE_ACK) {
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.ignorePacketDally);
+ MUTEX_EXIT(&call->lock);
+ return NULL;
}
-#ifdef RXDEBUG
- /* If an input tracer function is defined, call it with the packet and
- * network address. Note this function may modify its arguments. */
- if (rx_justReceived) {
- struct sockaddr_in addr;
- int drop;
- addr.sin_family = AF_INET;
- addr.sin_port = port;
- addr.sin_addr.s_addr = host;
-#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
- addr.sin_len = sizeof(addr);
+
+ return call;
+}
+
+static_inline struct rx_call *
+rxi_ReceiveServerCall(osi_socket socket, struct rx_packet *np,
+ struct rx_connection *conn)
+{
+ int channel;
+ struct rx_call *call;
+
+ channel = np->header.cid & RX_CHANNELMASK;
+ MUTEX_ENTER(&conn->conn_call_lock);
+ call = conn->call[channel];
+
+ if (!call) {
+ if (rxi_AbortIfServerBusy(socket, conn, np)) {
+ MUTEX_EXIT(&conn->conn_call_lock);
+ return NULL;
+ }
+
+ call = rxi_NewCall(conn, channel); /* returns locked call */
+ *call->callNumber = np->header.callNumber;
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ call->state = RX_STATE_PRECALL;
+ clock_GetTime(&call->queueTime);
+ call->app.bytesSent = 0;
+ call->app.bytesRcvd = 0;
+ rxi_KeepAliveOn(call);
+
+ return call;
+ }
+
+ if (np->header.callNumber == conn->callNumber[channel]) {
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+ return call;
+ }
+
+ if (np->header.callNumber < conn->callNumber[channel]) {
+ MUTEX_EXIT(&conn->conn_call_lock);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ return NULL;
+ }
+
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ /* Wait until the transmit queue is idle before deciding
+ * whether to reset the current call. Chances are that the
+     * call will be in either DALLY or HOLD state once the TQ_BUSY
+ * flag is cleared.
+ */
+#ifdef RX_ENABLE_LOCKS
+ if (call->state == RX_STATE_ACTIVE && !call->error) {
+ rxi_WaitforTQBusy(call);
+	/* If we entered an error state while waiting, we must call
+	 * rxi_CallError so that rxi_ResetCall can proceed once the
+	 * tqWaiter count hits zero.
+	 */
+ if (call->error) {
+ rxi_CallError(call, call->error);
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+ }
+#endif /* RX_ENABLE_LOCKS */
+ /* If the new call cannot be taken right now send a busy and set
+ * the error condition in this call, so that it terminates as
+ * quickly as possible */
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_CallError(call, RX_CALL_DEAD);
+ rxi_SendSpecial(call, conn, NULL, RX_PACKET_TYPE_BUSY,
+ NULL, 0, 1);
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+
+ if (rxi_AbortIfServerBusy(socket, conn, np)) {
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+
+ rxi_ResetCall(call, 0);
+ /* The conn_call_lock is not held but no one else should be
+ * using this call channel while we are processing this incoming
+ * packet. This assignment should be safe.
+ */
+ *call->callNumber = np->header.callNumber;
+ call->state = RX_STATE_PRECALL;
+ clock_GetTime(&call->queueTime);
+ call->app.bytesSent = 0;
+ call->app.bytesRcvd = 0;
+ rxi_KeepAliveOn(call);
+
+ return call;
+}
+
+
+/* There are two packet tracing routines available for testing and monitoring
+ * Rx. One is called just after every packet is received and the other is
+ * called just before every packet is sent. Received packets, have had their
+ * headers decoded, and packets to be sent have not yet had their headers
+ * encoded. Both take two parameters: a pointer to the packet and a sockaddr
+ * containing the network address. Both can be modified. The return value, if
+ * non-zero, indicates that the packet should be dropped. */
+
+int (*rx_justReceived) (struct rx_packet *, struct sockaddr_in *) = 0;
+int (*rx_almostSent) (struct rx_packet *, struct sockaddr_in *) = 0;
+
+/* A packet has been received off the interface. Np is the packet, socket is
+ * the socket number it was received from (useful in determining which service
+ * this packet corresponds to), and (host, port) reflect the host,port of the
+ * sender. This call returns the packet to the caller if it is finished with
+ * it, rather than de-allocating it, just as a small performance hack */
+
+struct rx_packet *
+rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
+ afs_uint32 host, u_short port, int *tnop,
+ struct rx_call **newcallp)
+{
+ struct rx_call *call;
+ struct rx_connection *conn;
+ int type;
+ int unknownService = 0;
+#ifdef RXDEBUG
+ char *packetType;
+#endif
+ struct rx_packet *tnp;
+
+#ifdef RXDEBUG
+/* We don't print out the packet until now because (1) the time may not be
+ * accurate enough until now in the lwp implementation (rx_Listener only gets
+ * the time after the packet is read) and (2) from a protocol point of view,
+ * this is the first time the packet has been seen */
+ packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
+ ? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
+ dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %"AFS_PTR_FMT"\n",
+ np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
+ np->header.epoch, np->header.cid, np->header.callNumber,
+ np->header.seq, np->header.flags, np));
+#endif
+
+ /* Account for connectionless packets */
+ if (rx_stats_active &&
+ ((np->header.type == RX_PACKET_TYPE_VERSION) ||
+ (np->header.type == RX_PACKET_TYPE_DEBUG))) {
+ struct rx_peer *peer;
+
+ /* Try to look up the peer structure, but don't create one */
+ peer = rxi_FindPeer(host, port, 0);
+
+ /* Since this may not be associated with a connection, it may have
+ * no refCount, meaning we could race with ReapConnections
+ */
+
+ if (peer && (peer->refCount > 0)) {
+#ifdef AFS_RXERRQ_ENV
+ if (rx_atomic_read(&peer->neterrs)) {
+ rx_atomic_set(&peer->neterrs, 0);
+ }
+#endif
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->bytesReceived += np->length;
+ MUTEX_EXIT(&peer->peer_lock);
+ }
+ }
+
+ if (np->header.type == RX_PACKET_TYPE_VERSION) {
+ return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
+ }
+
+ if (np->header.type == RX_PACKET_TYPE_DEBUG) {
+ return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
+ }
+#ifdef RXDEBUG
+ /* If an input tracer function is defined, call it with the packet and
+ * network address. Note this function may modify its arguments. */
+ if (rx_justReceived) {
+ struct sockaddr_in addr;
+ int drop;
+ addr.sin_family = AF_INET;
+ addr.sin_port = port;
+ addr.sin_addr.s_addr = host;
+ memset(&addr.sin_zero, 0, sizeof(addr.sin_zero));
+#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
+ addr.sin_len = sizeof(addr);
#endif /* AFS_OSF_ENV */
drop = (*rx_justReceived) (np, &addr);
/* drop packet if return value is non-zero */
conn =
rxi_FindConnection(socket, host, port, np->header.serviceId,
np->header.cid, np->header.epoch, type,
- np->header.securityIndex);
+ np->header.securityIndex, &unknownService);
+ /* To avoid having 2 connections just abort at each other,
+ don't abort an abort. */
if (!conn) {
- /* If no connection found or fabricated, just ignore the packet.
- * (An argument could be made for sending an abort packet for
- * the conn) */
- return np;
+ if (unknownService && (np->header.type != RX_PACKET_TYPE_ABORT))
+ rxi_SendRawAbort(socket, host, port, RX_INVALID_OPERATION,
+ np, 0);
+ return np;
+ }
+
+#ifdef AFS_RXERRQ_ENV
+ if (rx_atomic_read(&conn->peer->neterrs)) {
+ rx_atomic_set(&conn->peer->neterrs, 0);
+ }
+#endif
+
+ /* If we're doing statistics, then account for the incoming packet */
+ if (rx_stats_active) {
+ MUTEX_ENTER(&conn->peer->peer_lock);
+ conn->peer->bytesReceived += np->length;
+ MUTEX_EXIT(&conn->peer->peer_lock);
}
/* If the connection is in an error state, send an abort packet and ignore
MUTEX_ENTER(&conn->conn_data_lock);
if (np->header.type != RX_PACKET_TYPE_ABORT)
np = rxi_SendConnectionAbort(conn, np, 1, 0);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
MUTEX_EXIT(&conn->conn_data_lock);
return np;
}
afs_int32 errcode = ntohl(rx_GetInt32(np, 0));
dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d\n", errcode));
rxi_ConnectionError(conn, errcode);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
}
case RX_PACKET_TYPE_CHALLENGE:
tnp = rxi_ReceiveChallengePacket(conn, np, 1);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return tnp;
case RX_PACKET_TYPE_RESPONSE:
tnp = rxi_ReceiveResponsePacket(conn, np, 1);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return tnp;
case RX_PACKET_TYPE_PARAMS:
case RX_PACKET_TYPE_PARAMS + 1:
case RX_PACKET_TYPE_PARAMS + 2:
/* ignore these packet types for now */
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
-
default:
/* Should not reach here, unless the peer is broken: send an
* abort packet */
rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
MUTEX_ENTER(&conn->conn_data_lock);
tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
MUTEX_EXIT(&conn->conn_data_lock);
return tnp;
}
}
- channel = np->header.cid & RX_CHANNELMASK;
- call = conn->call[channel];
-
- if (call) {
- MUTEX_ENTER(&call->lock);
- currentCallNumber = conn->callNumber[channel];
- } else if (type == RX_SERVER_CONNECTION) { /* No call allocated */
- MUTEX_ENTER(&conn->conn_call_lock);
- call = conn->call[channel];
- if (call) {
- MUTEX_ENTER(&call->lock);
- MUTEX_EXIT(&conn->conn_call_lock);
- currentCallNumber = conn->callNumber[channel];
- } else {
- call = rxi_NewCall(conn, channel); /* returns locked call */
- MUTEX_EXIT(&conn->conn_call_lock);
- *call->callNumber = currentCallNumber = np->header.callNumber;
-#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
- np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
- np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->length));
-#endif
- call->state = RX_STATE_PRECALL;
- clock_GetTime(&call->queueTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
- /*
- * If the number of queued calls exceeds the overload
- * threshold then abort this call.
- */
- if ((rx_BusyThreshold > 0) &&
- (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
- struct rx_packet *tp;
-
- rxi_CallError(call, rx_BusyError);
- tp = rxi_SendCallAbort(call, np, 1, 0);
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.nBusies);
- return tp;
- }
- rxi_KeepAliveOn(call);
- }
- } else { /* RX_CLIENT_CONNECTION and No call allocated */
- /* This packet can't be for this call. If the new call address is
- * 0 then no call is running on this channel. If there is a call
- * then, since this is a client connection we're getting data for
- * it must be for the previous call.
- */
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
-
- /* There is a non-NULL locked call at this point */
- if (type == RX_SERVER_CONNECTION) { /* We're the server */
- if (np->header.callNumber < currentCallNumber) {
- MUTEX_EXIT(&call->lock);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- } else if (np->header.callNumber != currentCallNumber) {
- /* Wait until the transmit queue is idle before deciding
- * whether to reset the current call. Chances are that the
- * call will be in ether DALLY or HOLD state once the TQ_BUSY
- * flag is cleared.
- */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (call->state == RX_STATE_ACTIVE) {
- rxi_WaitforTQBusy(call);
- /*
- * If we entered error state while waiting,
- * must call rxi_CallError to permit rxi_ResetCall
- * to processed when the tqWaiter count hits zero.
- */
- if (call->error) {
- rxi_CallError(call, call->error);
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
- }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- /* If the new call cannot be taken right now send a busy and set
- * the error condition in this call, so that it terminates as
- * quickly as possible */
- if (call->state == RX_STATE_ACTIVE) {
- struct rx_packet *tp;
-
- rxi_CallError(call, RX_CALL_DEAD);
- tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
- NULL, 0, 1);
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return tp;
- }
- rxi_ResetCall(call, 0);
- *call->callNumber = np->header.callNumber;
-#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
- np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
- np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->length));
-#endif
- call->state = RX_STATE_PRECALL;
- clock_GetTime(&call->queueTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
- /*
- * If the number of queued calls exceeds the overload
- * threshold then abort this call.
- */
- if ((rx_BusyThreshold > 0) &&
- (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
- struct rx_packet *tp;
-
- rxi_CallError(call, rx_BusyError);
- tp = rxi_SendCallAbort(call, np, 1, 0);
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.nBusies);
- return tp;
- }
- rxi_KeepAliveOn(call);
- } else {
- /* Continuing call; do nothing here. */
- }
- } else { /* we're the client */
- /* Ignore all incoming acknowledgements for calls in DALLY state */
- if ((call->state == RX_STATE_DALLY)
- && (np->header.type == RX_PACKET_TYPE_ACK)) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.ignorePacketDally);
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
-
- /* Ignore anything that's not relevant to the current call. If there
- * isn't a current call, then no packet is relevant. */
- if (np->header.callNumber != currentCallNumber) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
- /* If the service security object index stamped in the packet does not
- * match the connection's security index, ignore the packet */
- if (np->header.securityIndex != conn->securityIndex) {
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
+ if (type == RX_SERVER_CONNECTION)
+ call = rxi_ReceiveServerCall(socket, np, conn);
+ else
+ call = rxi_ReceiveClientCall(np, conn);
- /* If we're receiving the response, then all transmit packets are
- * implicitly acknowledged. Get rid of them. */
- if (np->header.type == RX_PACKET_TYPE_DATA) {
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* XXX Hack. Because we must release the global rx lock when
- * sending packets (osi_NetSend) we drop all acks while we're
- * traversing the tq in rxi_Start sending packets out because
- * packets may move to the freePacketQueue as result of being here!
- * So we drop these packets until we're safely out of the
- * traversing. Really ugly!
- * For fine grain RX locking, we set the acked field in the
- * packets and let rxi_Start remove them from the transmit queue.
- */
- if (call->flags & RX_CALL_TQ_BUSY) {
-#ifdef RX_ENABLE_LOCKS
- rxi_SetAcksInTransmitQueue(call);
-#else
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np; /* xmitting; drop packet */
-#endif
- } else {
- rxi_ClearTransmitQueue(call, 0);
- }
-#else /* AFS_GLOBAL_RXLOCK_KERNEL */
- rxi_ClearTransmitQueue(call, 0);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- } else {
- if (np->header.type == RX_PACKET_TYPE_ACK) {
- /* now check to see if this is an ack packet acknowledging that the
- * server actually *lost* some hard-acked data. If this happens we
- * ignore this packet, as it may indicate that the server restarted in
- * the middle of a call. It is also possible that this is an old ack
- * packet. We don't abort the connection in this case, because this
- * *might* just be an old ack packet. The right way to detect a server
- * restart in the midst of a call is to notice that the server epoch
- * changed, btw. */
- /* XXX I'm not sure this is exactly right, since tfirst **IS**
- * XXX unacknowledged. I think that this is off-by-one, but
- * XXX I don't dare change it just yet, since it will
- * XXX interact badly with the server-restart detection
- * XXX code in receiveackpacket. */
- if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
- }
- } /* else not a data packet */
+ if (call == NULL) {
+ putConnection(conn);
+ return np;
}
- osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
+ MUTEX_ASSERT(&call->lock);
/* Set remote user defined status from packet */
call->remoteStatus = np->header.userStatus;
- /* Note the gap between the expected next packet and the actual
- * packet that arrived, when the new packet has a smaller serial number
- * than expected. Rioses frequently reorder packets all by themselves,
- * so this will be quite important with very large window sizes.
- * Skew is checked against 0 here to avoid any dependence on the type of
- * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
- * true!
- * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
- * see CalculateRoundTripTime for an example of how to keep smoothed values.
- * I think using a beta of 1/8 is probably appropriate. 93.04.21
- */
- MUTEX_ENTER(&conn->conn_data_lock);
- skew = conn->lastSerial - np->header.serial;
- conn->lastSerial = np->header.serial;
- MUTEX_EXIT(&conn->conn_data_lock);
- if (skew > 0) {
- struct rx_peer *peer;
- peer = conn->peer;
- if (skew > peer->inPacketSkew) {
- dpf(("*** In skew changed from %d to %d\n",
- peer->inPacketSkew, skew));
- peer->inPacketSkew = skew;
- }
- }
-
/* Now do packet type-specific processing */
switch (np->header.type) {
case RX_PACKET_TYPE_DATA:
+	/* If we're a client, and receiving a response, then all the packets
+	 * we transmitted are implicitly acknowledged. */
+ if (type == RX_CLIENT_CONNECTION && !opr_queue_IsEmpty(&call->tq))
+ rxi_AckAllInTransmitQueue(call);
+
np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port, tnop,
newcallp);
break;
dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d\n", errdata));
rxi_CallError(call, errdata);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np; /* xmitting; drop packet */
}
case RX_PACKET_TYPE_BUSY: {
MUTEX_EXIT(&call->lock);
MUTEX_EXIT(&conn->conn_call_lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
}
case RX_PACKET_TYPE_ACKALL:
/* All packets acknowledged, so we can drop all packets previously
* readied for sending */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* XXX Hack. We because we can't release the global rx lock when
- * sending packets (osi_NetSend) we drop all ack pkts while we're
- * traversing the tq in rxi_Start sending packets out because
- * packets may move to the freePacketQueue as result of being
- * here! So we drop these packets until we're safely out of the
- * traversing. Really ugly!
- * For fine grain RX locking, we set the acked field in the packets
- * and let rxi_Start remove the packets from the transmit queue.
- */
- if (call->flags & RX_CALL_TQ_BUSY) {
-#ifdef RX_ENABLE_LOCKS
- rxi_SetAcksInTransmitQueue(call);
- break;
-#else /* RX_ENABLE_LOCKS */
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np; /* xmitting; drop packet */
-#endif /* RX_ENABLE_LOCKS */
- }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- rxi_ClearTransmitQueue(call, 0);
+ rxi_AckAllInTransmitQueue(call);
break;
default:
/* Should not reach here, unless the peer is broken: send an abort
/* we've received a legit packet, so the channel is not busy */
call->flags &= ~RX_CALL_PEER_BUSY;
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
}
if ((tcall->state == RX_STATE_PRECALL)
|| (tcall->state == RX_STATE_ACTIVE))
return 1;
- if ((tcall->mode == RX_MODE_SENDING)
- || (tcall->mode == RX_MODE_RECEIVING))
+ if ((tcall->app.mode == RX_MODE_SENDING)
+ || (tcall->app.mode == RX_MODE_RECEIVING))
return 1;
}
}
}
static void
-rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2)
+rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2, int dummy)
{
struct rx_connection *conn = arg1;
struct rx_call *acall = arg2;
int i, waiting;
MUTEX_ENTER(&conn->conn_data_lock);
- conn->checkReachEvent = NULL;
+
+ if (event)
+ rxevent_Put(&conn->checkReachEvent);
+
waiting = conn->flags & RX_CONN_ATTACHWAIT;
if (event) {
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
}
MUTEX_EXIT(&conn->conn_data_lock);
MUTEX_ENTER(&rx_refcnt_mutex);
conn->refCount++;
MUTEX_EXIT(&rx_refcnt_mutex);
- conn->checkReachEvent =
- rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn,
- NULL);
+ conn->checkReachEvent = rxevent_Post(&when, &now,
+ rxi_CheckReachEvent, conn,
+ NULL, 0);
}
MUTEX_EXIT(&conn->conn_data_lock);
}
conn->flags |= RX_CONN_ATTACHWAIT;
MUTEX_EXIT(&conn->conn_data_lock);
if (!conn->checkReachEvent)
- rxi_CheckReachEvent(NULL, conn, call);
+ rxi_CheckReachEvent(NULL, conn, call, 0);
return 1;
}
* appropriate to the call (the call is in the right state, etc.). This
* routine can return a packet to the caller, for re-use */
-struct rx_packet *
+static struct rx_packet *
rxi_ReceiveDataPacket(struct rx_call *call,
struct rx_packet *np, int istack,
osi_socket socket, afs_uint32 host, u_short port,
MUTEX_EXIT(&rx_freePktQ_lock);
if (rx_stats_active)
rx_atomic_inc(&rx_stats.noPacketBuffersOnRead);
- call->rprev = np->header.serial;
rxi_calltrace(RX_TRACE_DROP, call);
dpf(("packet %"AFS_PTR_FMT" dropped on receipt - quota problems\n", np));
/* We used to clear the receive queue here, in an attempt to free
* packets. However this is unsafe if the queue has received a
* soft ACK for the final packet */
rxi_PostDelayedAckEvent(call, &rx_softAckDelay);
-
- /* we've damaged this call already, might as well do it in. */
return np;
}
#endif /* KERNEL */
if (seq == call->rnext) {
/* Check to make sure it is not a duplicate of one already queued */
- if (queue_IsNotEmpty(&call->rq)
- && queue_First(&call->rq, rx_packet)->header.seq == seq) {
+ if (!opr_queue_IsEmpty(&call->rq)
+ && opr_queue_First(&call->rq, struct rx_packet, entry)->header.seq == seq) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate\n", np));
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
ackNeeded = 0;
call->rprev = seq;
#ifdef RX_TRACK_PACKETS
np->flags |= RX_PKTFLAG_RQ;
#endif
- queue_Prepend(&call->rq, np);
+ opr_queue_Prepend(&call->rq, &np->entry);
#ifdef RXDEBUG_PACKET
call->rqc++;
#endif /* RXDEBUG_PACKET */
/* Check whether we have all of the packets for this call */
if (call->flags & RX_CALL_HAVE_LAST) {
afs_uint32 tseq; /* temporary sequence number */
- struct rx_packet *tp; /* Temporary packet pointer */
- struct rx_packet *nxp; /* Next pointer, for queue_Scan */
+ struct opr_queue *cursor;
- for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
+ for (tseq = seq, opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *tp;
+
+ tp = opr_queue_Entry(cursor, struct rx_packet, entry);
if (tseq != tp->header.seq)
break;
if (tp->header.flags & RX_LAST_PACKET) {
* any of this packets predecessors are missing. */
afs_uint32 prev; /* "Previous packet" sequence number */
- struct rx_packet *tp; /* Temporary packet pointer */
- struct rx_packet *nxp; /* Next pointer, for queue_Scan */
+ struct opr_queue *cursor;
int missing; /* Are any predecessors missing? */
/* If the new packet's sequence number has been sent to the
if (seq < call->rnext) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
ackNeeded = 0;
call->rprev = seq;
* accommodated by the current window, then send a negative
* acknowledge and drop the packet */
if ((call->rnext + call->rwind) <= seq) {
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_EXCEEDS_WINDOW,
istack);
ackNeeded = 0;
}
/* Look for the packet in the queue of old received packets */
- for (prev = call->rnext - 1, missing =
- 0, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
+ prev = call->rnext - 1;
+ missing = 0;
+ for (opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *tp
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
/* Check for duplicate packet */
if (seq == tp->header.seq) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE,
istack);
ackNeeded = 0;
#ifdef RXDEBUG_PACKET
call->rqc++;
#endif /* RXDEBUG_PACKET */
- queue_InsertBefore(tp, np);
+ opr_queue_InsertBefore(cursor, &np->entry);
call->nSoftAcks++;
np = NULL;
&& !(call->flags & RX_CALL_RECEIVE_DONE)) {
afs_uint32 tseq; /* temporary sequence number */
- for (tseq =
- call->rnext, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
+ tseq = call->rnext;
+ for (opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *tp
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
if (tseq != tp->header.seq)
break;
if (tp->header.flags & RX_LAST_PACKET) {
* received. Always send a soft ack for the last packet in
* the server's reply. */
if (ackNeeded) {
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, ackNeeded, istack);
} else if (call->nSoftAcks > (u_short) rxi_SoftAckRate) {
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
} else if (call->nSoftAcks) {
if (haveLast && !(flags & RX_CLIENT_INITIATED))
else
rxi_PostDelayedAckEvent(call, &rx_softAckDelay);
} else if (call->flags & RX_CALL_RECEIVE_DONE) {
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
}
return np;
/* The real smarts of the whole thing. */
-struct rx_packet *
+static struct rx_packet *
rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
int istack)
{
struct rx_ackPacket *ap;
int nAcks;
struct rx_packet *tp;
- struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
struct rx_connection *conn = call->conn;
struct rx_peer *peer = conn->peer;
+ struct opr_queue *cursor;
struct clock now; /* Current time, for RTT calculations */
afs_uint32 first;
afs_uint32 prev;
afs_uint32 serial;
- /* because there are CM's that are bogus, sending weird values for this. */
- afs_uint32 skew = 0;
int nbytes;
int missing;
int acked;
first = ntohl(ap->firstPacket);
prev = ntohl(ap->previousPacket);
serial = ntohl(ap->serial);
- /* temporarily disabled -- needs to degrade over time
- * skew = ntohs(ap->maxSkew); */
- /* Ignore ack packets received out of order */
+ /*
+ * Ignore ack packets received out of order while protecting
+ * against peers that set the previousPacket field to a packet
+ * serial number instead of a sequence number.
+ */
if (first < call->tfirst ||
- (first == call->tfirst && prev < call->tprev)) {
+ (first == call->tfirst && prev < call->tprev && prev < call->tfirst
+ + call->twind)) {
return np;
}
size_t len;
len = _snprintf(msg, sizeof(msg),
- "tid[%d] RACK: reason %s serial %u previous %u seq %u skew %d first %u acks %u space %u ",
+ "tid[%d] RACK: reason %s serial %u previous %u seq %u first %u acks %u space %u ",
GetCurrentThreadId(), rx_ack_reason(ap->reason),
ntohl(ap->serial), ntohl(ap->previousPacket),
- (unsigned int)np->header.seq, (unsigned int)skew,
- ntohl(ap->firstPacket), ap->nAcks, ntohs(ap->bufferSpace) );
+ (unsigned int)np->header.seq, ntohl(ap->firstPacket),
+ ap->nAcks, ntohs(ap->bufferSpace) );
if (nAcks) {
int offset;
#else /* AFS_NT40_ENV */
if (rx_Log) {
fprintf(rx_Log,
- "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
+ "RACK: reason %x previous %u seq %u serial %u first %u",
ap->reason, ntohl(ap->previousPacket),
(unsigned int)np->header.seq, (unsigned int)serial,
- (unsigned int)skew, ntohl(ap->firstPacket));
+ ntohl(ap->firstPacket));
if (nAcks) {
int offset;
for (offset = 0; offset < nAcks; offset++)
}
}
- /* Update the outgoing packet skew value to the latest value of
- * the peer's incoming packet skew value. The ack packet, of
- * course, could arrive out of order, but that won't affect things
- * much */
- peer->outPacketSkew = skew;
-
-
clock_GetTime(&now);
/* The transmit queue splits into 4 sections.
* disposed of
*/
- tp = queue_First(&call->tq, rx_packet);
- while(!queue_IsEnd(&call->tq, tp) && tp->header.seq < first) {
+ tp = opr_queue_First(&call->tq, struct rx_packet, entry);
+ while(!opr_queue_IsEnd(&call->tq, &tp->entry) && tp->header.seq < first) {
struct rx_packet *next;
- next = queue_Next(tp, rx_packet);
+ next = opr_queue_Next(&tp->entry, struct rx_packet, entry);
call->tfirst = tp->header.seq + 1;
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
rxi_ComputeRoundTripTime(tp, ap, call, peer, &now);
}
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* XXX Hack. Because we have to release the global rx lock when sending
+#ifdef RX_ENABLE_LOCKS
+ /* XXX Hack. Because we have to release the global call lock when sending
* packets (osi_NetSend) we drop all acks while we're traversing the tq
* in rxi_Start sending packets out because packets may move to the
* freePacketQueue as result of being here! So we drop these packets until
* when it's done transmitting.
*/
if (call->flags & RX_CALL_TQ_BUSY) {
-#ifdef RX_ENABLE_LOCKS
tp->flags |= RX_PKTFLAG_ACKED;
call->flags |= RX_CALL_TQ_SOME_ACKED;
-#else /* RX_ENABLE_LOCKS */
- break;
-#endif /* RX_ENABLE_LOCKS */
} else
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
{
- queue_Remove(tp);
+ opr_queue_Remove(&tp->entry);
#ifdef RX_TRACK_PACKETS
tp->flags &= ~RX_PKTFLAG_TQ;
#endif
call->nSoftAcked = 0;
missing = 0;
- while (!queue_IsEnd(&call->tq, tp) && tp->header.seq < first + nAcks) {
+ while (!opr_queue_IsEnd(&call->tq, &tp->entry)
+ && tp->header.seq < first + nAcks) {
/* Set the acknowledge flag per packet based on the
* information in the ack packet. An acknowledged packet can
* be downgraded when the server has discarded a packet it
missing = 1;
}
- tp = queue_Next(tp, rx_packet);
+ tp = opr_queue_Next(&tp->entry, struct rx_packet, entry);
}
/* We don't need to take any action with the 3rd or 4th section in the
* so we will retransmit as soon as the window permits
*/
- for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
+ acked = 0;
+ for (opr_queue_ScanBackwards(&call->tq, cursor)) {
+ struct rx_packet *tp =
+ opr_queue_Entry(cursor, struct rx_packet, entry);
if (acked) {
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
tp->flags &= ~RX_PKTFLAG_SENT;
&& call->tfirst + call->nSoftAcked >= call->tnext) {
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
- rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
- } else if (!queue_IsEmpty(&call->tq)) {
+ rxi_CancelKeepAliveEvent(call);
+ } else if (!opr_queue_IsEmpty(&call->tq)) {
rxi_Start(call, istack);
}
return np;
}
+/**
+ * Schedule a connection abort to be sent after some delay.
+ *
+ * @param[in] conn The connection to send the abort on.
+ * @param[in] msec The number of milliseconds to wait before sending.
+ *
+ * @pre conn_data_lock must be held
+ */
+static void
+rxi_SendConnectionAbortLater(struct rx_connection *conn, int msec)
+{
+ struct clock when, now;
+ if (!conn->error) {
+ return;
+ }
+ if (!conn->delayedAbortEvent) {
+ clock_GetTime(&now);
+ when = now;
+ clock_Addmsec(&when, msec);
+ conn->delayedAbortEvent =
+ rxevent_Post(&when, &now, rxi_SendDelayedConnAbort, conn, NULL, 0);
+ }
+}
+
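+/*
+ * Illustrative usage (an editorial sketch, not a call site added by this
+ * change): a caller that has already latched an error on the connection
+ * can throttle the abort like so, holding conn_data_lock as the
+ * precondition above requires:
+ *
+ *     rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
+ *     MUTEX_ENTER(&conn->conn_data_lock);
+ *     rxi_SendConnectionAbortLater(conn, 1000);  / * abort in ~1 second * /
+ *     MUTEX_EXIT(&conn->conn_data_lock);
+ *
+ * If conn->error is unset the function is a no-op, and if an abort event
+ * is already pending no duplicate event is posted.
+ */
+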
/* Received a response to a challenge packet */
-struct rx_packet *
+static struct rx_packet *
rxi_ReceiveResponsePacket(struct rx_connection *conn,
struct rx_packet *np, int istack)
{
error = RXS_CheckResponse(conn->securityObject, conn, np);
if (error) {
/* If the response is invalid, reset the connection, sending
- * an abort to the peer */
-#ifndef KERNEL
- rxi_Delay(1);
-#endif
+ * an abort to the peer. Send the abort with a 1 second delay,
+ * to avoid a peer hammering us by constantly recreating a
+ * connection with bad credentials. */
rxi_ConnectionError(conn, error);
MUTEX_ENTER(&conn->conn_data_lock);
- np = rxi_SendConnectionAbort(conn, np, istack, 0);
+ rxi_SendConnectionAbortLater(conn, 1000);
MUTEX_EXIT(&conn->conn_data_lock);
return np;
} else {
* back to the server. The server is responsible for retrying the
* challenge if it fails to get a response. */
-struct rx_packet *
+static struct rx_packet *
rxi_ReceiveChallengePacket(struct rx_connection *conn,
struct rx_packet *np, int istack)
{
/* Find an available server process to service the current request in
* the given call structure. If one isn't available, queue up this
* call so it eventually gets one */
-void
+static void
rxi_AttachServerProc(struct rx_call *call,
osi_socket socket, int *tnop,
struct rx_call **newcallp)
MUTEX_ENTER(&rx_serverPool_lock);
haveQuota = QuotaOK(service);
- if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
+ if ((!haveQuota) || opr_queue_IsEmpty(&rx_idleServerQueue)) {
/* If there are no processes available to service this call,
* put the call on the incoming call queue (unless it's
* already on the queue).
rx_atomic_inc(&rx_nWaited);
rxi_calltrace(RX_CALL_ARRIVAL, call);
SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
- queue_Append(&rx_incomingCallQueue, call);
+ opr_queue_Append(&rx_incomingCallQueue, &call->entry);
}
} else {
- sq = queue_Last(&rx_idleServerQueue, rx_serverQueueEntry);
+ sq = opr_queue_Last(&rx_idleServerQueue,
+ struct rx_serverQueueEntry, entry);
/* If hot threads are enabled, and both newcallp and sq->socketp
* are non-null, then this thread will process the call, and the
* idle server thread will start listening on this threads socket.
*/
- queue_Remove(sq);
+ opr_queue_Remove(&sq->entry);
+
if (rx_enable_hot_thread && newcallp && sq->socketp) {
*newcallp = call;
*tnop = sq->tno;
*sq->socketp = socket;
clock_GetTime(&call->startTime);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
} else {
sq->newcall = call;
}
if (call->flags & RX_CALL_WAIT_PROC) {
/* Conservative: I don't think this should happen */
call->flags &= ~RX_CALL_WAIT_PROC;
- if (queue_IsOnQueue(call)) {
- queue_Remove(call);
-
- rx_atomic_dec(&rx_nWaiting);
+ rx_atomic_dec(&rx_nWaiting);
+ if (opr_queue_IsOnQueue(&call->entry)) {
+ opr_queue_Remove(&call->entry);
}
}
call->state = RX_STATE_ACTIVE;
- call->mode = RX_MODE_RECEIVING;
+ call->app.mode = RX_MODE_RECEIVING;
#ifdef RX_KERNEL_TRACE
{
int glockOwner = ISAFS_GLOCK();
* a new call is being prepared (in the case of a client) or a reply
* is being prepared (in the case of a server). Rather than sending
* an ack packet, an ACKALL packet is sent. */
-void
-rxi_AckAll(struct rxevent *event, struct rx_call *call, char *dummy)
+static void
+rxi_AckAll(struct rx_call *call)
{
-#ifdef RX_ENABLE_LOCKS
- if (event) {
- MUTEX_ENTER(&call->lock);
- call->delayedAckEvent = NULL;
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
- MUTEX_EXIT(&rx_refcnt_mutex);
- }
- rxi_SendSpecial(call, call->conn, (struct rx_packet *)0,
- RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
- call->flags |= RX_CALL_ACKALL_SENT;
- if (event)
- MUTEX_EXIT(&call->lock);
-#else /* RX_ENABLE_LOCKS */
- if (event)
- call->delayedAckEvent = NULL;
- rxi_SendSpecial(call, call->conn, (struct rx_packet *)0,
- RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
+ rxi_SendSpecial(call, call->conn, NULL, RX_PACKET_TYPE_ACKALL,
+ NULL, 0, 0);
call->flags |= RX_CALL_ACKALL_SENT;
-#endif /* RX_ENABLE_LOCKS */
}
-void
-rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused)
+static void
+rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused1,
+ int unused2)
{
struct rx_call *call = arg1;
#ifdef RX_ENABLE_LOCKS
if (event) {
MUTEX_ENTER(&call->lock);
if (event == call->delayedAckEvent)
- call->delayedAckEvent = NULL;
- MUTEX_ENTER(&rx_refcnt_mutex);
+ rxevent_Put(&call->delayedAckEvent);
CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
- MUTEX_EXIT(&rx_refcnt_mutex);
}
(void)rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
if (event)
MUTEX_EXIT(&call->lock);
#else /* RX_ENABLE_LOCKS */
if (event)
- call->delayedAckEvent = NULL;
+ rxevent_Put(&call->delayedAckEvent);
(void)rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
#endif /* RX_ENABLE_LOCKS */
}
-
#ifdef RX_ENABLE_LOCKS
/* Set ack in all packets in transmit queue. rxi_Start will deal with
* clearing them out.
static void
rxi_SetAcksInTransmitQueue(struct rx_call *call)
{
- struct rx_packet *p, *tp;
+ struct opr_queue *cursor;
int someAcked = 0;
- for (queue_Scan(&call->tq, p, tp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
+
if (someAcked) {
call->flags |= RX_CALL_TQ_CLEARME;
call->flags |= RX_CALL_TQ_SOME_ACKED;
}
#endif /* RX_ENABLE_LOCKS */
+/*!
+ * Acknowledge the whole transmit queue.
+ *
+ * If we're running without locks, or the transmit queue isn't busy, then
+ * we can just clear the queue now. Otherwise, we have to mark all of the
+ * packets as acknowledged, and let rxi_Start clear the queue later on.
+ */
+static void
+rxi_AckAllInTransmitQueue(struct rx_call *call)
+{
+#ifdef RX_ENABLE_LOCKS
+ if (call->flags & RX_CALL_TQ_BUSY) {
+ rxi_SetAcksInTransmitQueue(call);
+ return;
+ }
+#endif
+ rxi_ClearTransmitQueue(call, 0);
+}
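+
+/*
+ * Editorial note on the two paths above: with fine-grained locking and a
+ * busy transmit queue, rxi_SetAcksInTransmitQueue only flags each packet
+ * (p->flags |= RX_PKTFLAG_ACKED) and sets RX_CALL_TQ_CLEARME, leaving it
+ * to rxi_Start to remove the packets once it has finished walking the
+ * queue; otherwise the packets can be freed immediately via
+ * rxi_ClearTransmitQueue(call, 0).
+ */
+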
/* Clear out the transmit queue for the current call (all packets have
* been received by peer) */
-void
+static void
rxi_ClearTransmitQueue(struct rx_call *call, int force)
{
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- struct rx_packet *p, *tp;
-
+#ifdef RX_ENABLE_LOCKS
+ struct opr_queue *cursor;
if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
int someAcked = 0;
- for (queue_Scan(&call->tq, p, tp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
call->flags |= RX_CALL_TQ_SOME_ACKED;
}
} else {
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
#ifdef RXDEBUG_PACKET
call->tqc -=
#endif /* RXDEBUG_PACKET */
rxi_FreePackets(0, &call->tq);
rxi_WakeUpTransmitQueue(call);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
call->flags &= ~RX_CALL_TQ_CLEARME;
}
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif
rxi_rto_cancel(call);
call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
#endif
}
-void
+static void
rxi_ClearReceiveQueue(struct rx_call *call)
{
- if (queue_IsNotEmpty(&call->rq)) {
+ if (!opr_queue_IsEmpty(&call->rq)) {
u_short count;
count = rxi_FreePackets(0, &call->rq);
}
/* Send an abort packet for the specified call */
-struct rx_packet *
+static struct rx_packet *
rxi_SendCallAbort(struct rx_call *call, struct rx_packet *packet,
int istack, int force)
{
- afs_int32 error;
+ afs_int32 error, cerror;
struct clock when, now;
if (!call->error)
return packet;
+ switch (call->error) {
+ case RX_CALL_IDLE:
+ case RX_CALL_BUSY:
+ cerror = RX_CALL_TIMEOUT;
+ break;
+ default:
+ cerror = call->error;
+ }
+
/* Clients should never delay abort messages */
if (rx_IsClientConn(call->conn))
force = 1;
- if (call->abortCode != call->error) {
- call->abortCode = call->error;
+ if (call->abortCode != cerror) {
+ call->abortCode = cerror;
call->abortCount = 0;
}
if (force || rxi_callAbortThreshhold == 0
|| call->abortCount < rxi_callAbortThreshhold) {
- if (call->delayedAbortEvent) {
- rxevent_Cancel(call->delayedAbortEvent, call,
- RX_CALL_REFCOUNT_ABORT);
- }
- error = htonl(call->error);
+ rxi_CancelDelayedAbortEvent(call);
+ error = htonl(cerror);
call->abortCount++;
packet =
rxi_SendSpecial(call, call->conn, packet, RX_PACKET_TYPE_ABORT,
clock_GetTime(&now);
when = now;
clock_Addmsec(&when, rxi_callAbortDelay);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
- MUTEX_EXIT(&rx_refcnt_mutex);
call->delayedAbortEvent =
- rxevent_PostNow(&when, &now, rxi_SendDelayedCallAbort, call, 0);
+ rxevent_Post(&when, &now, rxi_SendDelayedCallAbort, call, 0, 0);
}
return packet;
}
+static void
+rxi_CancelDelayedAbortEvent(struct rx_call *call)
+{
+ if (call->delayedAbortEvent) {
+ rxevent_Cancel(&call->delayedAbortEvent);
+ CALL_RELE(call, RX_CALL_REFCOUNT_ABORT);
+ }
+}
+
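+/*
+ * Editorial sketch of the reference pairing: scheduling a delayed call
+ * abort takes one call reference, which exactly one of two paths
+ * releases, either cancellation here or the event firing in
+ * rxi_SendDelayedCallAbort:
+ *
+ *     CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
+ *     call->delayedAbortEvent =
+ *         rxevent_Post(&when, &now, rxi_SendDelayedCallAbort, call, 0, 0);
+ */
+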
/* Send an abort packet for the specified connection. Packet is an
* optional pointer to a packet that can be used to send the abort.
* Once the number of abort messages reaches the threshhold, an
struct rx_packet *packet, int istack, int force)
{
afs_int32 error;
- struct clock when, now;
if (!conn->error)
return packet;
if (force || rxi_connAbortThreshhold == 0
|| conn->abortCount < rxi_connAbortThreshhold) {
- if (conn->delayedAbortEvent) {
- rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
- }
+
+ rxevent_Cancel(&conn->delayedAbortEvent);
error = htonl(conn->error);
conn->abortCount++;
MUTEX_EXIT(&conn->conn_data_lock);
RX_PACKET_TYPE_ABORT, (char *)&error,
sizeof(error), istack);
MUTEX_ENTER(&conn->conn_data_lock);
- } else if (!conn->delayedAbortEvent) {
- clock_GetTime(&now);
- when = now;
- clock_Addmsec(&when, rxi_connAbortDelay);
- conn->delayedAbortEvent =
- rxevent_PostNow(&when, &now, rxi_SendDelayedConnAbort, conn, 0);
+ } else {
+ rxi_SendConnectionAbortLater(conn, rxi_connAbortDelay);
}
return packet;
}
dpf(("rxi_ConnectionError conn %"AFS_PTR_FMT" error %d\n", conn, error));
MUTEX_ENTER(&conn->conn_data_lock);
- if (conn->challengeEvent)
- rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
- if (conn->natKeepAliveEvent)
- rxevent_Cancel(conn->natKeepAliveEvent, (struct rx_call *)0, 0);
+ rxevent_Cancel(&conn->challengeEvent);
+ rxevent_Cancel(&conn->natKeepAliveEvent);
if (conn->checkReachEvent) {
- rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
- conn->checkReachEvent = 0;
+ rxevent_Cancel(&conn->checkReachEvent);
conn->flags &= ~(RX_CONN_ATTACHWAIT|RX_CONN_NAT_PING);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
}
MUTEX_EXIT(&conn->conn_data_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
void
rxi_CallError(struct rx_call *call, afs_int32 error)
{
-#ifdef DEBUG
- osirx_AssertMine(&call->lock, "rxi_CallError");
-#endif
+ MUTEX_ASSERT(&call->lock);
dpf(("rxi_CallError call %"AFS_PTR_FMT" error %d call->error %d\n", call, error, call->error));
if (call->error)
error = call->error;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) {
rxi_ResetCall(call, 0);
}
* unprotected macros, and may only be reset by non-interrupting code.
*/
-void
+static void
rxi_ResetCall(struct rx_call *call, int newcall)
{
int flags;
struct rx_peer *peer;
struct rx_packet *packet;
-#ifdef DEBUG
- osirx_AssertMine(&call->lock, "rxi_ResetCall");
-#endif
+
+ MUTEX_ASSERT(&call->lock);
dpf(("rxi_ResetCall(call %"AFS_PTR_FMT", newcall %d)\n", call, newcall));
/* Notify anyone who is waiting for asynchronous packet arrival */
call->arrivalProc = (void (*)())0;
}
- if (call->growMTUEvent)
- rxevent_Cancel(call->growMTUEvent, call,
- RX_CALL_REFCOUNT_ALIVE);
+
+ rxi_CancelGrowMTUEvent(call);
if (call->delayedAbortEvent) {
- rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
+ rxi_CancelDelayedAbortEvent(call);
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
if (packet) {
rxi_SendCallAbort(call, packet, 0, 1);
MUTEX_EXIT(&peer->peer_lock);
flags = call->flags;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
rxi_WaitforTQBusy(call);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
rxi_ClearTransmitQueue(call, 1);
if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
}
call->flags = 0;
- if ((flags & RX_CALL_PEER_BUSY)) {
+ if (!newcall && (flags & RX_CALL_PEER_BUSY)) {
/* The call channel is still busy; resetting the call doesn't change
- * that */
+ * that. However, if 'newcall' is set, we are processing a call
+ * structure that has either been recycled from the free list, or has
+ * been newly allocated. So, RX_CALL_PEER_BUSY is not relevant if
+ * 'newcall' is set, since it describes a completely different call
+ * channel which we do not care about. */
call->flags |= RX_CALL_PEER_BUSY;
}
call->rprev = 0;
call->lastAcked = 0;
call->localStatus = call->remoteStatus = 0;
+ call->lastSendData = 0;
if (flags & RX_CALL_READER_WAIT) {
#ifdef RX_ENABLE_LOCKS
osi_rxWakeup(&call->twind);
#endif
+ if (flags & RX_CALL_WAIT_PROC) {
+ rx_atomic_dec(&rx_nWaiting);
+ }
#ifdef RX_ENABLE_LOCKS
/* The following ensures that we don't mess with any queue while some
* other thread might also be doing so. The call_queue_lock field is
*/
if (call->call_queue_lock) {
MUTEX_ENTER(call->call_queue_lock);
- if (queue_IsOnQueue(call)) {
- queue_Remove(call);
- if (flags & RX_CALL_WAIT_PROC) {
- rx_atomic_dec(&rx_nWaiting);
- }
+ if (opr_queue_IsOnQueue(&call->entry)) {
+ opr_queue_Remove(&call->entry);
}
MUTEX_EXIT(call->call_queue_lock);
CLEAR_CALL_QUEUE_LOCK(call);
}
#else /* RX_ENABLE_LOCKS */
- if (queue_IsOnQueue(call)) {
- queue_Remove(call);
- if (flags & RX_CALL_WAIT_PROC)
- rx_atomic_dec(&rx_nWaiting);
+ if (opr_queue_IsOnQueue(&call->entry)) {
+ opr_queue_Remove(&call->entry);
}
#endif /* RX_ENABLE_LOCKS */
- rxi_KeepAliveOff(call);
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelKeepAliveEvent(call);
+ rxi_CancelDelayedAckEvent(call);
}
/* Send an acknowledge for the indicated packet (seq,serial) of the
int istack)
{
struct rx_ackPacket *ap;
- struct rx_packet *rqp;
- struct rx_packet *nxp; /* For queue_Scan */
struct rx_packet *p;
+ struct opr_queue *cursor;
u_char offset = 0;
afs_int32 templ;
afs_uint32 padbytes = 0;
* are packets in the receive queue awaiting processing.
*/
if ((call->flags & RX_CALL_ACKALL_SENT) &&
- !queue_IsEmpty(&call->rq)) {
- ap->firstPacket = htonl(queue_Last(&call->rq, rx_packet)->header.seq + 1);
+ !opr_queue_IsEmpty(&call->rq)) {
+ ap->firstPacket = htonl(opr_queue_Last(&call->rq, struct rx_packet, entry)->header.seq + 1);
} else {
ap->firstPacket = htonl(call->rnext);
ap->previousPacket = htonl(call->rprev); /* Previous packet received */
- /* No fear of running out of ack packet here because there can only be at most
- * one window full of unacknowledged packets. The window size must be constrained
- * to be less than the maximum ack size, of course. Also, an ack should always
- * fit into a single packet -- it should not ever be fragmented. */
- for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
+ /* No fear of running out of ack packet here because there can only
+ * be at most one window full of unacknowledged packets. The window
+ * size must be constrained to be less than the maximum ack size,
+ * of course. Also, an ack should always fit into a single packet
+ * -- it should not ever be fragmented. */
+ offset = 0;
+ for (opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *rqp
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (!rqp || !call->rq.next
|| (rqp->header.seq > (call->rnext + call->rwind))) {
#ifndef RX_ENABLE_TSFPQ
/* Since we're about to send a data packet to the peer, it's
* safe to nuke any scheduled end-of-packets ack */
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
if (xmit->len > 1) {
rxi_SendPacketList(call, conn, xmit->list, xmit->len, istack);
} else {
rxi_SendPacket(call, conn, xmit->list[0], istack);
}
MUTEX_ENTER(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
/* Tell the RTO calculation engine that we have sent a packet, and
* if it was the last one */
/* Send the whole list when the call is in receive mode, when
* the call is in eof mode, when we are in fast recovery mode,
* and when we have the last packet */
+ /* XXX - The accesses to app.mode aren't safe, as this may be called by
+     * the listener or event threads.
+ */
if ((list[len - 1]->header.flags & RX_LAST_PACKET)
- || call->mode == RX_MODE_RECEIVING || call->mode == RX_MODE_EOF
+ || (call->flags & RX_CALL_FLUSH)
|| (call->flags & RX_CALL_FAST_RECOVER)) {
/* Check for the case where the current list contains
* an acked packet. Since we always send retransmissions
}
}
+/**
+ * Check if the peer for the given call is known to be dead
+ *
+ * If the call's peer appears dead (it has encountered fatal network errors
+ * since the call started) the call is killed with RX_CALL_DEAD if the call
+ * is active. Otherwise, we do nothing.
+ *
+ * @param[in] call The call to check
+ *
+ * @return status
+ * @retval 0 The call is fine, and we haven't done anything to the call
+ * @retval nonzero The call's peer appears dead, and the call has been
+ * terminated if it was active
+ *
+ * @pre call->lock must be locked
+ */
+static int
+rxi_CheckPeerDead(struct rx_call *call)
+{
+#ifdef AFS_RXERRQ_ENV
+ int peererrs;
+
+ if (call->state == RX_STATE_DALLY) {
+ return 0;
+ }
+
+ peererrs = rx_atomic_read(&call->conn->peer->neterrs);
+ if (call->neterr_gen < peererrs) {
+ /* we have received network errors since this call started; kill
+ * the call */
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_CallError(call, RX_CALL_DEAD);
+ }
+ return -1;
+ }
+ if (call->neterr_gen > peererrs) {
+ /* someone has reset the number of peer errors; set the call error gen
+ * so we can detect if more errors are encountered */
+ call->neterr_gen = peererrs;
+ }
+#endif
+ return 0;
+}
+
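+/*
+ * Illustrative call pattern (editorial; the call sites in this change are
+ * rxi_Resend and rxi_CheckCall): the caller holds call->lock, runs the
+ * check, and gives up early if the peer has been reported dead:
+ *
+ *     MUTEX_ENTER(&call->lock);
+ *     if (rxi_CheckPeerDead(call)) {
+ *         MUTEX_EXIT(&call->lock);
+ *         return;  / * an active call was errored with RX_CALL_DEAD * /
+ *     }
+ */
+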
static void
rxi_Resend(struct rxevent *event, void *arg0, void *arg1, int istack)
{
struct rx_call *call = arg0;
struct rx_peer *peer;
- struct rx_packet *p, *nxp;
+ struct opr_queue *cursor;
struct clock maxTimeout = { 60, 0 };
MUTEX_ENTER(&call->lock);
* structure, since there is no longer a per-call retransmission
* event pending. */
if (event == call->resendEvent) {
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
- call->resendEvent = NULL;
+ rxevent_Put(&call->resendEvent);
}
+ rxi_CheckPeerDead(call);
+
if (rxi_busyChannelError && (call->flags & RX_CALL_PEER_BUSY)) {
rxi_CheckBusy(call);
}
- if (queue_IsEmpty(&call->tq)) {
+ if (opr_queue_IsEmpty(&call->tq)) {
/* Nothing to do. This means that we've been raced, and that an
* ACK has come in between when we were triggered, and when we
* actually got to run. */
call->flags |= RX_CALL_FAST_RECOVER;
/* Mark all of the pending packets in the queue as being lost */
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
if (!(p->flags & RX_PKTFLAG_ACKED))
p->flags &= ~RX_PKTFLAG_SENT;
}
void
rxi_Start(struct rx_call *call, int istack)
{
-
- struct rx_packet *p;
- struct rx_packet *nxp; /* Next pointer for queue_Scan */
+ struct opr_queue *cursor;
+#ifdef RX_ENABLE_LOCKS
+ struct opr_queue *store;
+#endif
int nXmitPackets;
int maxXmitPackets;
if (call->error) {
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (rx_stats_active)
rx_atomic_inc(&rx_tq_debug.rxi_start_in_error);
#endif
return;
}
- if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
-
+ if (!opr_queue_IsEmpty(&call->tq)) { /* If we have anything to send */
/* Send (or resend) any packets that need it, subject to
* window restrictions and congestion burst control
* restrictions. Ask for an ack on the last packet sent in
* But check whether we're here recursively, and let the other guy
* do the work.
*/
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (!(call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_BUSY;
do {
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
restart:
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
call->flags &= ~RX_CALL_NEED_START;
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
nXmitPackets = 0;
maxXmitPackets = MIN(call->twind, call->cwind);
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
-#ifdef RX_TRACK_PACKETS
- if ((p->flags & RX_PKTFLAG_FREE)
- || (!queue_IsEnd(&call->tq, nxp)
- && (nxp->flags & RX_PKTFLAG_FREE))
- || (p == (struct rx_packet *)&rx_freePacketQueue)
- || (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
- osi_Panic("rxi_Start: xmit queue clobbered");
- }
-#endif
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (p->flags & RX_PKTFLAG_ACKED) {
/* Since we may block, don't trust this */
if (rx_stats_active)
*(call->callNumber), p));
call->xmitList[nXmitPackets++] = p;
}
- }
+	    } /* end of the opr_queue_Scan */
/* xmitList now hold pointers to all of the packets that are
* ready to send. Now we loop to send the packets */
istack);
}
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (call->error) {
/* We went into the error state while sending packets. Now is
* the time to reset the call. This will also inform the using
rxi_CallError(call, call->error);
return;
}
-#ifdef RX_ENABLE_LOCKS
+
if (call->flags & RX_CALL_TQ_SOME_ACKED) {
int missing;
call->flags &= ~RX_CALL_TQ_SOME_ACKED;
/* Some packets have received acks. If they all have, we can clear
* the transmit queue.
*/
- for (missing =
- 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ missing = 0;
+ for (opr_queue_ScanSafe(&call->tq, cursor, store)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (p->header.seq < call->tfirst
&& (p->flags & RX_PKTFLAG_ACKED)) {
- queue_Remove(p);
+ opr_queue_Remove(&p->entry);
#ifdef RX_TRACK_PACKETS
p->flags &= ~RX_PKTFLAG_TQ;
#endif
if (!missing)
call->flags |= RX_CALL_TQ_CLEARME;
}
-#endif /* RX_ENABLE_LOCKS */
if (call->flags & RX_CALL_TQ_CLEARME)
rxi_ClearTransmitQueue(call, 1);
} while (call->flags & RX_CALL_NEED_START);
/*
* TQ references no longer protected by this flag; they must remain
- * protected by the global lock.
+ * protected by the call lock.
*/
call->flags &= ~RX_CALL_TQ_BUSY;
rxi_WakeUpTransmitQueue(call);
} else {
call->flags |= RX_CALL_NEED_START;
}
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
} else {
rxi_rto_cancel(call);
}
/* Since we're about to send SOME sort of packet to the peer, it's
* safe to nuke any scheduled end-of-packets ack */
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
/* Actually send the packet, filling in more connection-specific fields */
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
rxi_SendPacket(call, conn, p, istack);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
MUTEX_ENTER(&call->lock);
/* Update last send time for this call (for keep-alive
* may be freed!
* haveCTLock Set if calling from rxi_ReapConnections
*/
-#ifdef RX_ENABLE_LOCKS
-int
+static int
rxi_CheckCall(struct rx_call *call, int haveCTLock)
-#else /* RX_ENABLE_LOCKS */
-int
-rxi_CheckCall(struct rx_call *call)
-#endif /* RX_ENABLE_LOCKS */
{
struct rx_connection *conn = call->conn;
afs_uint32 now;
afs_uint32 fudgeFactor;
int cerror = 0;
int newmtu = 0;
+ int idle_timeout = 0;
+ afs_int32 clock_diff = 0;
+
+ if (rxi_CheckPeerDead(call)) {
+ return -1;
+ }
+
+ now = clock_Sec();
+
+ /* Large swings in the clock can have a significant impact on
+ * the performance of RX call processing. Forward clock shifts
+ * will result in premature event triggering or timeouts.
+ * Backward shifts can result in calls not completing until
+ * the clock catches up with the original start clock value.
+ *
+ * If a backward clock shift of more than five minutes is noticed,
+ * just fail the call.
+ */
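+    /*
+     * Worked example (editorial): if call->lastSendTime was recorded at
+     * t = 1000 but the clock now reads t = 600, clock_diff is 400; that
+     * exceeds the 5 * 60 = 300 second tolerance, so an active call is
+     * failed with RX_CALL_TIMEOUT rather than left to linger until the
+     * clock catches back up.
+     */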
+ if (now < call->lastSendTime)
+ clock_diff = call->lastSendTime - now;
+ if (now < call->startWait)
+ clock_diff = MAX(clock_diff, call->startWait - now);
+ if (now < call->lastReceiveTime)
+ clock_diff = MAX(clock_diff, call->lastReceiveTime - now);
+    if (clock_diff > 5 * 60) {
+ if (call->state == RX_STATE_ACTIVE)
+ rxi_CallError(call, RX_CALL_TIMEOUT);
+ return -1;
+ }
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (call->flags & RX_CALL_TQ_BUSY) {
/* Call is active and will be reset by rxi_Start if it's
* in an error state.
((afs_uint32) call->rtt_dev << 1) + 1023) >> 10;
deadTime = conn->secondsUntilDead + fudgeFactor;
- now = clock_Sec();
/* These are computed to the second (+- 1 second). But that's
* good enough for these values, which should be a significant
* number of seconds. */
if (now > (call->lastReceiveTime + deadTime)) {
if (call->state == RX_STATE_ACTIVE) {
-#ifdef ADAPT_PMTU
-#if defined(KERNEL) && defined(AFS_SUN5_ENV)
+#ifdef AFS_ADAPT_PMTU
+# if defined(KERNEL) && defined(AFS_SUN5_ENV)
ire_t *ire;
-#if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
- netstack_t *ns = netstack_find_by_stackid(GLOBAL_NETSTACKID);
+# if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
+ netstack_t *ns = netstack_find_by_stackid(GLOBAL_NETSTACKID);
ip_stack_t *ipst = ns->netstack_ip;
-#endif
+# endif
ire = ire_cache_lookup(conn->peer->host
-#if defined(AFS_SUN510_ENV) && defined(ALL_ZONES)
+# if defined(AFS_SUN510_ENV) && defined(ALL_ZONES)
, ALL_ZONES
-#if defined(AFS_SUN510_ENV) && (defined(ICL_3_ARG) || defined(GLOBAL_NETSTACKID))
+# if defined(ICL_3_ARG) || defined(GLOBAL_NETSTACKID)
, NULL
-#if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
+# if defined(GLOBAL_NETSTACKID)
, ipst
-#endif
-#endif
-#endif
+# endif
+# endif
+# endif
);
if (ire && ire->ire_max_frag > 0)
rxi_SetPeerMtu(NULL, conn->peer->host, 0,
ire->ire_max_frag);
-#if defined(GLOBAL_NETSTACKID)
+# if defined(GLOBAL_NETSTACKID)
netstack_rele(ns);
-#endif
-#endif
-#endif /* ADAPT_PMTU */
+# endif
+# endif
+#endif /* AFS_ADAPT_PMTU */
cerror = RX_CALL_DEAD;
goto mtuout;
} else {
#ifdef RX_ENABLE_LOCKS
/* Cancel pending events */
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
+ rxi_CancelDelayedAckEvent(call);
rxi_rto_cancel(call);
- rxevent_Cancel(call->keepAliveEvent, call,
- RX_CALL_REFCOUNT_ALIVE);
- if (call->growMTUEvent)
- rxevent_Cancel(call->growMTUEvent, call,
- RX_CALL_REFCOUNT_ALIVE);
+ rxi_CancelKeepAliveEvent(call);
+ rxi_CancelGrowMTUEvent(call);
MUTEX_ENTER(&rx_refcnt_mutex);
- if (call->refCount == 0) {
- rxi_FreeCall(call, haveCTLock);
+ /* if rxi_FreeCall returns 1 it has freed the call */
+ if (call->refCount == 0 &&
+ rxi_FreeCall(call, haveCTLock))
+ {
MUTEX_EXIT(&rx_refcnt_mutex);
- return -2;
+ return -2;
}
MUTEX_EXIT(&rx_refcnt_mutex);
return -1;
* attached process can die reasonably gracefully. */
}
- if (conn->idleDeadTime) {
- idleDeadTime = conn->idleDeadTime + fudgeFactor;
- }
+ if (conn->idleDeadDetection) {
+ if (conn->idleDeadTime) {
+ idleDeadTime = conn->idleDeadTime + fudgeFactor;
+ }
- /* see if we have a non-activity timeout */
- if (call->startWait && idleDeadTime
- && ((call->startWait + idleDeadTime) < now) &&
- (call->flags & RX_CALL_READER_WAIT)) {
- if (call->state == RX_STATE_ACTIVE) {
- cerror = RX_CALL_TIMEOUT;
- goto mtuout;
- }
- }
- if (call->lastSendData && idleDeadTime && (conn->idleDeadErr != 0)
- && ((call->lastSendData + idleDeadTime) < now)) {
- if (call->state == RX_STATE_ACTIVE) {
- cerror = conn->idleDeadErr;
- goto mtuout;
- }
+ if (idleDeadTime) {
+ /* see if we have a non-activity timeout */
+ if (call->startWait && ((call->startWait + idleDeadTime) < now) &&
+ (call->flags & RX_CALL_READER_WAIT)) {
+ if (call->state == RX_STATE_ACTIVE) {
+ cerror = RX_CALL_TIMEOUT;
+ goto mtuout;
+ }
+ }
+
+ if (call->lastSendData && ((call->lastSendData + idleDeadTime) < now)) {
+ if (call->state == RX_STATE_ACTIVE) {
+ cerror = conn->service ? conn->service->idleDeadErr : RX_CALL_IDLE;
+ idle_timeout = 1;
+ goto mtuout;
+ }
+ }
+ }
}
if (conn->hardDeadTime) {
}
return 0;
mtuout:
- if (conn->msgsizeRetryErr && cerror != RX_CALL_TIMEOUT
- && call->lastReceiveTime) {
+ if (conn->msgsizeRetryErr && cerror != RX_CALL_TIMEOUT && !idle_timeout &&
+ call->lastReceiveTime) {
int oldMTU = conn->peer->ifMTU;
/* if we thought we could send more, perhaps things got worse */
}
void
-rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1, void *dummy)
+rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1,
+ void *dummy, int dummy2)
{
struct rx_connection *conn = arg1;
struct rx_header theader;
taddr.sin_family = AF_INET;
taddr.sin_port = rx_PortOf(rx_PeerOf(conn));
taddr.sin_addr.s_addr = rx_HostOf(rx_PeerOf(conn));
+ memset(&taddr.sin_zero, 0, sizeof(taddr.sin_zero));
#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
taddr.sin_len = sizeof(struct sockaddr_in);
#endif
MUTEX_ENTER(&rx_refcnt_mutex);
/* Only reschedule ourselves if the connection would not be destroyed */
if (conn->refCount <= 1) {
- conn->natKeepAliveEvent = NULL;
+ rxevent_Put(&conn->natKeepAliveEvent);
MUTEX_EXIT(&rx_refcnt_mutex);
MUTEX_EXIT(&conn->conn_data_lock);
rx_DestroyConnection(conn); /* drop the reference for this */
} else {
conn->refCount--; /* drop the reference for this */
MUTEX_EXIT(&rx_refcnt_mutex);
- conn->natKeepAliveEvent = NULL;
+ rxevent_Put(&conn->natKeepAliveEvent);
rxi_ScheduleNatKeepAliveEvent(conn);
MUTEX_EXIT(&conn->conn_data_lock);
}
}
-void
+static void
rxi_ScheduleNatKeepAliveEvent(struct rx_connection *conn)
{
if (!conn->natKeepAliveEvent && conn->secondsUntilNatPing) {
conn->refCount++; /* hold a reference for this */
MUTEX_EXIT(&rx_refcnt_mutex);
conn->natKeepAliveEvent =
- rxevent_PostNow(&when, &now, rxi_NatKeepAliveEvent, conn, 0);
+ rxevent_Post(&when, &now, rxi_NatKeepAliveEvent, conn, NULL, 0);
}
}
MUTEX_EXIT(&conn->conn_data_lock);
}
-void
-rxi_NatKeepAliveOn(struct rx_connection *conn)
-{
- MUTEX_ENTER(&conn->conn_data_lock);
- /* if it's already attached */
- if (!(conn->flags & RX_CONN_ATTACHWAIT))
- rxi_ScheduleNatKeepAliveEvent(conn);
- else
- conn->flags |= RX_CONN_NAT_PING;
- MUTEX_EXIT(&conn->conn_data_lock);
-}
-
/* When a call is in progress, this routine is called occasionally to
* make sure that some traffic has arrived (or been sent to) the peer.
* If nothing has arrived in a reasonable amount of time, the call is
* keep-alive packet (if we're actually trying to keep the call alive)
*/
void
-rxi_KeepAliveEvent(struct rxevent *event, void *arg1, void *dummy)
+rxi_KeepAliveEvent(struct rxevent *event, void *arg1, void *dummy,
+ int dummy2)
{
struct rx_call *call = arg1;
struct rx_connection *conn;
afs_uint32 now;
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
- MUTEX_EXIT(&rx_refcnt_mutex);
MUTEX_ENTER(&call->lock);
+
if (event == call->keepAliveEvent)
- call->keepAliveEvent = NULL;
+ rxevent_Put(&call->keepAliveEvent);
+
now = clock_Sec();
-#ifdef RX_ENABLE_LOCKS
if (rxi_CheckCall(call, 0)) {
MUTEX_EXIT(&call->lock);
return;
}
-#else /* RX_ENABLE_LOCKS */
- if (rxi_CheckCall(call))
- return;
-#endif /* RX_ENABLE_LOCKS */
/* Don't try to keep alive dallying calls */
if (call->state == RX_STATE_DALLY) {
/* Does what's on the nameplate. */
void
-rxi_GrowMTUEvent(struct rxevent *event, void *arg1, void *dummy)
+rxi_GrowMTUEvent(struct rxevent *event, void *arg1, void *dummy, int dummy2)
{
struct rx_call *call = arg1;
struct rx_connection *conn;
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
- MUTEX_EXIT(&rx_refcnt_mutex);
+ CALL_RELE(call, RX_CALL_REFCOUNT_MTU);
MUTEX_ENTER(&call->lock);
if (event == call->growMTUEvent)
- call->growMTUEvent = NULL;
+ rxevent_Put(&call->growMTUEvent);
-#ifdef RX_ENABLE_LOCKS
if (rxi_CheckCall(call, 0)) {
MUTEX_EXIT(&call->lock);
return;
}
-#else /* RX_ENABLE_LOCKS */
- if (rxi_CheckCall(call))
- return;
-#endif /* RX_ENABLE_LOCKS */
/* Don't bother with dallying calls */
if (call->state == RX_STATE_DALLY) {
*/
if ((conn->peer->maxPacketSize != 0) &&
(conn->peer->natMTU < RX_MAX_PACKET_SIZE) &&
- (conn->idleDeadErr))
+ conn->idleDeadDetection)
(void)rxi_SendAck(call, NULL, 0, RX_ACK_MTU, 0);
rxi_ScheduleGrowMTUEvent(call, 0);
MUTEX_EXIT(&call->lock);
}
-void
+static void
rxi_ScheduleKeepAliveEvent(struct rx_call *call)
{
if (!call->keepAliveEvent) {
clock_GetTime(&now);
when = now;
when.sec += call->conn->secondsUntilPing;
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
- MUTEX_EXIT(&rx_refcnt_mutex);
call->keepAliveEvent =
- rxevent_PostNow(&when, &now, rxi_KeepAliveEvent, call, 0);
+ rxevent_Post(&when, &now, rxi_KeepAliveEvent, call, NULL, 0);
}
}
-void
+static void
+rxi_CancelKeepAliveEvent(struct rx_call *call)
+{
+ if (call->keepAliveEvent) {
+ rxevent_Cancel(&call->keepAliveEvent);
+ CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
+ }
+}
+
+static void
rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs)
{
if (!call->growMTUEvent) {
}
when.sec += secs;
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
- MUTEX_EXIT(&rx_refcnt_mutex);
+ CALL_HOLD(call, RX_CALL_REFCOUNT_MTU);
call->growMTUEvent =
- rxevent_PostNow(&when, &now, rxi_GrowMTUEvent, call, 0);
+ rxevent_Post(&when, &now, rxi_GrowMTUEvent, call, NULL, 0);
}
}
-/* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
-void
+static void
+rxi_CancelGrowMTUEvent(struct rx_call *call)
+{
+ if (call->growMTUEvent) {
+ rxevent_Cancel(&call->growMTUEvent);
+ CALL_RELE(call, RX_CALL_REFCOUNT_MTU);
+ }
+}
+
+/*
+ * Increment the counter for the next connection ID, handling overflow.
+ */
+static void
+update_nextCid(void)
+{
+ /* Overflow is technically undefined behavior; avoid it. */
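+    /* rx_nextCid advances in steps of 1 << RX_CIDSHIFT so that the low
+     * bits of a connection ID remain available for the call channel. */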
+ if (rx_nextCid > MAX_AFS_INT32 - (1 << RX_CIDSHIFT))
+ rx_nextCid = -1 * ((MAX_AFS_INT32 / RX_CIDSHIFT) * RX_CIDSHIFT);
+ else
+ rx_nextCid += 1 << RX_CIDSHIFT;
+}
+
+static void
rxi_KeepAliveOn(struct rx_call *call)
{
/* Pretend last packet received was received now--i.e. if another
}
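+/* Public wrappers that take the call lock before toggling keepalives. */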
void
+rx_KeepAliveOff(struct rx_call *call)
+{
+ MUTEX_ENTER(&call->lock);
+ rxi_CancelKeepAliveEvent(call);
+ MUTEX_EXIT(&call->lock);
+}
+
+void
+rx_KeepAliveOn(struct rx_call *call)
+{
+ MUTEX_ENTER(&call->lock);
+ rxi_KeepAliveOn(call);
+ MUTEX_EXIT(&call->lock);
+}
+
+static void
rxi_GrowMTUOn(struct rx_call *call)
{
struct rx_connection *conn = call->conn;
/* This routine is called to send connection abort messages
* that have been delayed to throttle looping clients. */
-void
-rxi_SendDelayedConnAbort(struct rxevent *event,
- void *arg1, void *unused)
+static void
+rxi_SendDelayedConnAbort(struct rxevent *event, void *arg1, void *unused,
+ int unused2)
{
struct rx_connection *conn = arg1;
struct rx_packet *packet;
MUTEX_ENTER(&conn->conn_data_lock);
- conn->delayedAbortEvent = NULL;
+ rxevent_Put(&conn->delayedAbortEvent);
error = htonl(conn->error);
conn->abortCount++;
MUTEX_EXIT(&conn->conn_data_lock);
/* This routine is called to send call abort messages
* that have been delayed to throttle looping clients. */
-void
-rxi_SendDelayedCallAbort(struct rxevent *event,
- void *arg1, void *dummy)
+static void
+rxi_SendDelayedCallAbort(struct rxevent *event, void *arg1, void *dummy,
+ int dummy2)
{
struct rx_call *call = arg1;
struct rx_packet *packet;
MUTEX_ENTER(&call->lock);
- call->delayedAbortEvent = NULL;
+ rxevent_Put(&call->delayedAbortEvent);
error = htonl(call->error);
call->abortCount++;
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
rxi_FreePacket(packet);
}
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_ABORT);
- MUTEX_EXIT(&rx_refcnt_mutex);
}
/* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
* seconds) to ask the client to authenticate itself. The routine
* issues a challenge to the client, which is obtained from the
* security object associated with the connection */
-void
+static void
rxi_ChallengeEvent(struct rxevent *event,
void *arg0, void *arg1, int tries)
{
struct rx_connection *conn = arg0;
- conn->challengeEvent = NULL;
+ if (event)
+ rxevent_Put(&conn->challengeEvent);
+
+    /* If there are no active calls, it is not worth re-issuing the
+ * challenge. If the client issues another call on this connection
+ * the challenge can be requested at that time.
+ */
+ if (!rxi_HasActiveCalls(conn))
+ return;
+
if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
struct rx_packet *packet;
struct clock when, now;
when = now;
when.sec += RX_CHALLENGE_TIMEOUT;
conn->challengeEvent =
- rxevent_PostNow2(&when, &now, rxi_ChallengeEvent, conn, 0,
+	    rxevent_Post(&when, &now, rxi_ChallengeEvent, conn, NULL,
(tries - 1));
}
}
* security object associated with the connection is asked to create
* the challenge at this time. N.B. rxi_ChallengeOff is a macro,
* defined earlier. */
-void
+static void
rxi_ChallengeOn(struct rx_connection *conn)
{
if (!conn->challengeEvent) {
/* Find all server connections that have not been active for a long time, and
* toss them */
-void
-rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2)
+static void
+rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2,
+ int unused3)
{
struct clock now, when;
+ struct rxevent *event;
clock_GetTime(&now);
/* Find server connection structures that haven't been used for
code = MUTEX_TRYENTER(&call->lock);
if (!code)
continue;
-#ifdef RX_ENABLE_LOCKS
result = rxi_CheckCall(call, 1);
-#else /* RX_ENABLE_LOCKS */
- result = rxi_CheckCall(call);
-#endif /* RX_ENABLE_LOCKS */
MUTEX_EXIT(&call->lock);
if (result == -2) {
/* If CheckCall freed the call, it might
code = MUTEX_TRYENTER(&peer->peer_lock);
if ((code) && (peer->refCount == 0)
&& ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor, *store;
size_t space;
/*
MUTEX_EXIT(&peer->peer_lock);
MUTEX_DESTROY(&peer->peer_lock);
- for (queue_Scan
- (&peer->rpcStats, rpc_stat, nrpc_stat,
- rx_interface_stat)) {
+
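+	    /* ScanSafe caches the next entry, so the current entry can
+	     * be removed from the queue while we iterate. */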
+ for (opr_queue_ScanSafe(&peer->rpcStats, cursor, store)) {
unsigned int num_funcs;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entry);
if (!rpc_stat)
break;
- queue_Remove(&rpc_stat->queue_header);
- queue_Remove(&rpc_stat->all_peers);
+
+ opr_queue_Remove(&rpc_stat->entry);
+ opr_queue_Remove(&rpc_stat->entryPeers);
+
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
when = now;
when.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
- rxevent_Post(&when, rxi_ReapConnections, 0, 0);
+ event = rxevent_Post(&when, &now, rxi_ReapConnections, 0, NULL, 0);
+ rxevent_Put(&event);
}
void
rx_PrintPeerStats(FILE * file, struct rx_peer *peer)
{
- fprintf(file, "Peer %x.%d. " "Burst size %d, " "burst wait %d.%06d.\n",
- ntohl(peer->host), (int)ntohs(peer->port), (int)peer->burstSize,
- (int)peer->burstWait.sec, (int)peer->burstWait.usec);
+ fprintf(file, "Peer %x.%d.\n",
+ ntohl(peer->host), (int)ntohs(peer->port));
fprintf(file,
" Rtt %d, " "total sent %d, " "resent %d\n",
peer->rtt, peer->nSent, peer->reSends);
- fprintf(file,
- " Packet size %d, " "max in packet skew %d, "
- "max out packet skew %d\n", peer->ifMTU, (int)peer->inPacketSkew,
- (int)peer->outPacketSkew);
+ fprintf(file, " Packet size %d\n", peer->ifMTU);
}
#endif
taddr.sin_family = AF_INET;
taddr.sin_port = remotePort;
taddr.sin_addr.s_addr = remoteAddr;
+ memset(&taddr.sin_zero, 0, sizeof(taddr.sin_zero));
#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
taddr.sin_len = sizeof(struct sockaddr_in);
#endif
peer->ifMTU = ntohs(peer->ifMTU);
peer->idleWhen = ntohl(peer->idleWhen);
peer->refCount = ntohs(peer->refCount);
- peer->burstWait.sec = ntohl(peer->burstWait.sec);
- peer->burstWait.usec = ntohl(peer->burstWait.usec);
peer->rtt = ntohl(peer->rtt);
peer->rtt_dev = ntohl(peer->rtt_dev);
peer->timeout.sec = 0;
peer->timeout.usec = 0;
peer->nSent = ntohl(peer->nSent);
peer->reSends = ntohl(peer->reSends);
- peer->inPacketSkew = ntohl(peer->inPacketSkew);
- peer->outPacketSkew = ntohl(peer->outPacketSkew);
peer->natMTU = ntohs(peer->natMTU);
peer->maxMTU = ntohs(peer->maxMTU);
peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
peerStats->ifMTU = tp->ifMTU;
peerStats->idleWhen = tp->idleWhen;
peerStats->refCount = tp->refCount;
- peerStats->burstSize = tp->burstSize;
- peerStats->burst = tp->burst;
- peerStats->burstWait.sec = tp->burstWait.sec;
- peerStats->burstWait.usec = tp->burstWait.usec;
+ peerStats->burstSize = 0;
+ peerStats->burst = 0;
+ peerStats->burstWait.sec = 0;
+ peerStats->burstWait.usec = 0;
peerStats->rtt = tp->rtt;
peerStats->rtt_dev = tp->rtt_dev;
peerStats->timeout.sec = 0;
peerStats->timeout.usec = 0;
peerStats->nSent = tp->nSent;
peerStats->reSends = tp->reSends;
- peerStats->inPacketSkew = tp->inPacketSkew;
- peerStats->outPacketSkew = tp->outPacketSkew;
peerStats->natMTU = tp->natMTU;
peerStats->maxMTU = tp->maxMTU;
peerStats->maxDgramPackets = tp->maxDgramPackets;
peerStats->cwind = tp->cwind;
peerStats->nDgramPackets = tp->nDgramPackets;
peerStats->congestSeq = tp->congestSeq;
- peerStats->bytesSent.high = tp->bytesSent.high;
- peerStats->bytesSent.low = tp->bytesSent.low;
- peerStats->bytesReceived.high = tp->bytesReceived.high;
- peerStats->bytesReceived.low = tp->bytesReceived.low;
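+	    /* The peer now keeps native 64-bit byte counters; split them
+	     * into the high/low words the stats structure expects. */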
+ peerStats->bytesSent.high = tp->bytesSent >> 32;
+ peerStats->bytesSent.low = tp->bytesSent & MAX_AFS_UINT32;
+ peerStats->bytesReceived.high = tp->bytesReceived >> 32;
+ peerStats->bytesReceived.low
+ = tp->bytesReceived & MAX_AFS_UINT32;
MUTEX_EXIT(&tp->peer_lock);
MUTEX_ENTER(&rx_peerHashTable_lock);
struct rx_serverQueueEntry *sq;
#endif /* KERNEL */
- LOCK_RX_INIT;
- if (rxinit_status == 1) {
- UNLOCK_RX_INIT;
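+    /* Atomically mark shutdown in progress; if the bit was already set,
+     * a shutdown has happened (or is underway) and there is nothing
+     * left to do. */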
+ if (rx_atomic_test_and_set_bit(&rxinit_status, 0))
return; /* Already shutdown. */
- }
+
#ifndef KERNEL
rx_port = 0;
#ifndef AFS_PTHREAD_ENV
rxi_StopListener();
#endif /* AFS_PTHREAD_ENV */
shutdown_rxevent();
- rx_SetEpoch(0);
+ rx_epoch = 0;
#ifndef AFS_PTHREAD_ENV
#ifndef AFS_USE_GETTIMEOFDAY
clock_UnInit();
#endif /* AFS_USE_GETTIMEOFDAY */
#endif /* AFS_PTHREAD_ENV */
- while (!queue_IsEmpty(&rx_freeCallQueue)) {
- call = queue_First(&rx_freeCallQueue, rx_call);
- queue_Remove(call);
+ while (!opr_queue_IsEmpty(&rx_freeCallQueue)) {
+ call = opr_queue_First(&rx_freeCallQueue, struct rx_call, entry);
+ opr_queue_Remove(&call->entry);
rxi_Free(call, sizeof(struct rx_call));
}
- while (!queue_IsEmpty(&rx_idleServerQueue)) {
- sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
- queue_Remove(sq);
+ while (!opr_queue_IsEmpty(&rx_idleServerQueue)) {
+ sq = opr_queue_First(&rx_idleServerQueue, struct rx_serverQueueEntry,
+ entry);
+ opr_queue_Remove(&sq->entry);
}
#endif /* KERNEL */
MUTEX_ENTER(&rx_peerHashTable_lock);
for (peer = *peer_ptr; peer; peer = next) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor, *store;
size_t space;
MUTEX_ENTER(&rx_rpc_stats);
MUTEX_ENTER(&peer->peer_lock);
- for (queue_Scan
- (&peer->rpcStats, rpc_stat, nrpc_stat,
- rx_interface_stat)) {
+ for (opr_queue_ScanSafe(&peer->rpcStats, cursor, store)) {
unsigned int num_funcs;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entry);
if (!rpc_stat)
break;
- queue_Remove(&rpc_stat->queue_header);
- queue_Remove(&rpc_stat->all_peers);
+ opr_queue_Remove(&rpc_stat->entry);
+ opr_queue_Remove(&rpc_stat->entryPeers);
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
rxi_dataQuota = RX_MAX_QUOTA;
rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
MUTEX_EXIT(&rx_quota_mutex);
- rxinit_status = 1;
- UNLOCK_RX_INIT;
-}
-
-#ifdef RX_ENABLE_LOCKS
-void
-osirx_AssertMine(afs_kmutex_t * lockaddr, char *msg)
-{
- if (!MUTEX_ISMINE(lockaddr))
- osi_Panic("Lock not held: %s", msg);
}
-#endif /* RX_ENABLE_LOCKS */
#ifndef KERNEL
int i;
MUTEX_ENTER(&conn->conn_data_lock);
if (!conn->specific) {
- conn->specific = (void **)malloc((key + 1) * sizeof(void *));
+ conn->specific = malloc((key + 1) * sizeof(void *));
for (i = 0; i < key; i++)
conn->specific[i] = NULL;
conn->nSpecific = key + 1;
int i;
MUTEX_ENTER(&svc->svc_data_lock);
if (!svc->specific) {
- svc->specific = (void **)malloc((key + 1) * sizeof(void *));
+ svc->specific = malloc((key + 1) * sizeof(void *));
for (i = 0; i < key; i++)
svc->specific[i] = NULL;
svc->nSpecific = key + 1;
* which can come and go based upon the peer lifetime.
*/
-static struct rx_queue processStats = { &processStats, &processStats };
+static struct opr_queue processStats = { &processStats, &processStats };
/*
* peerStats is a queue used to store the statistics for all peer structs.
* Its contents are the union of all the peer rpcStats queues.
*/
-static struct rx_queue peerStats = { &peerStats, &peerStats };
+static struct opr_queue peerStats = { &peerStats, &peerStats };
/*
* rxi_monitor_processStats is used to turn process wide stat collection
static int rxi_monitor_peerStats = 0;
-/*
- * rxi_AddRpcStat - given all of the information for a particular rpc
- * call, create (if needed) and update the stat totals for the rpc.
- *
- * PARAMETERS
- *
- * IN stats - the queue of stats that will be updated with the new value
- *
- * IN rxInterface - a unique number that identifies the rpc interface
- *
- * IN currentFunc - the index of the function being invoked
- *
- * IN totalFunc - the total number of functions in this interface
- *
- * IN queueTime - the amount of time this function waited for a thread
+
+void
+rxi_ClearRPCOpStat(rx_function_entry_v1_p rpc_stat)
+{
+ rpc_stat->invocations = 0;
+ rpc_stat->bytes_sent = 0;
+ rpc_stat->bytes_rcvd = 0;
+ rpc_stat->queue_time_sum.sec = 0;
+ rpc_stat->queue_time_sum.usec = 0;
+ rpc_stat->queue_time_sum_sqr.sec = 0;
+ rpc_stat->queue_time_sum_sqr.usec = 0;
+ rpc_stat->queue_time_min.sec = 9999999;
+ rpc_stat->queue_time_min.usec = 9999999;
+ rpc_stat->queue_time_max.sec = 0;
+ rpc_stat->queue_time_max.usec = 0;
+ rpc_stat->execution_time_sum.sec = 0;
+ rpc_stat->execution_time_sum.usec = 0;
+ rpc_stat->execution_time_sum_sqr.sec = 0;
+ rpc_stat->execution_time_sum_sqr.usec = 0;
+ rpc_stat->execution_time_min.sec = 9999999;
+ rpc_stat->execution_time_min.usec = 9999999;
+ rpc_stat->execution_time_max.sec = 0;
+ rpc_stat->execution_time_max.usec = 0;
+}
+
+/*!
+ * Given all of the information for a particular rpc
+ * call, find or create (if requested) the stat structure for the rpc.
*
- * IN execTime - the amount of time this function invocation took to execute
+ * @param stats
+ * the queue of stats that will be updated with the new value
*
- * IN bytesSent - the number bytes sent by this invocation
+ * @param rxInterface
+ * a unique number that identifies the rpc interface
*
- * IN bytesRcvd - the number bytes received by this invocation
+ * @param totalFunc
+ *	the total number of functions in this interface. This is only
+ * required if create is true
*
- * IN isServer - if true, this invocation was made to a server
+ * @param isServer
+ * if true, this invocation was made to a server
*
- * IN remoteHost - the ip address of the remote host
+ * @param remoteHost
+ *	the ip address of the remote host. This is only required if create
+ * and addToPeerList are true
*
- * IN remotePort - the port of the remote host
+ * @param remotePort
+ *	the port of the remote host. This is only required if create
+ * and addToPeerList are true
*
- * IN addToPeerList - if != 0, add newly created stat to the global peer list
+ * @param addToPeerList
+ * if != 0, add newly created stat to the global peer list
*
- * INOUT counter - if a new stats structure is allocated, the counter will
- * be updated with the new number of allocated stat structures
+ * @param counter
+ * if a new stats structure is allocated, the counter will
+ * be updated with the new number of allocated stat structures.
+ *	Only required if create is true
*
- * RETURN CODES
+ * @param create
+ * if no stats structure exists, allocate one
*
- * Returns void.
*/
-static int
-rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
- afs_uint32 currentFunc, afs_uint32 totalFunc,
- struct clock *queueTime, struct clock *execTime,
- afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd, int isServer,
- afs_uint32 remoteHost, afs_uint32 remotePort,
- int addToPeerList, unsigned int *counter)
+static rx_interface_stat_p
+rxi_FindRpcStat(struct opr_queue *stats, afs_uint32 rxInterface,
+ afs_uint32 totalFunc, int isServer, afs_uint32 remoteHost,
+ afs_uint32 remotePort, int addToPeerList,
+ unsigned int *counter, int create)
{
- int rc = 0;
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ rx_interface_stat_p rpc_stat = NULL;
+ struct opr_queue *cursor;
/*
* See if there's already a structure for this interface
*/
- for (queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
+ for (opr_queue_Scan(stats, cursor)) {
+ rpc_stat = opr_queue_Entry(cursor, struct rx_interface_stat, entry);
+
if ((rpc_stat->stats[0].interfaceId == rxInterface)
&& (rpc_stat->stats[0].remote_is_server == isServer))
break;
}
+ /* if they didn't ask us to create, we're done */
+ if (!create) {
+ if (opr_queue_IsEnd(stats, cursor))
+ return NULL;
+ else
+ return rpc_stat;
+ }
+
+ /* can't proceed without these */
+ if (!totalFunc || !counter)
+ return NULL;
+
/*
* Didn't find a match so allocate a new structure and add it to the
* queue.
*/
- if (queue_IsEnd(stats, rpc_stat) || (rpc_stat == NULL)
+ if (opr_queue_IsEnd(stats, cursor) || (rpc_stat == NULL)
|| (rpc_stat->stats[0].interfaceId != rxInterface)
|| (rpc_stat->stats[0].remote_is_server != isServer)) {
int i;
totalFunc * sizeof(rx_function_entry_v1_t);
rpc_stat = rxi_Alloc(space);
- if (rpc_stat == NULL) {
- rc = 1;
- goto fail;
- }
+ if (rpc_stat == NULL)
+ return NULL;
+
*counter += totalFunc;
for (i = 0; i < totalFunc; i++) {
+ rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
rpc_stat->stats[i].remote_peer = remoteHost;
rpc_stat->stats[i].remote_port = remotePort;
rpc_stat->stats[i].remote_is_server = isServer;
rpc_stat->stats[i].interfaceId = rxInterface;
rpc_stat->stats[i].func_total = totalFunc;
rpc_stat->stats[i].func_index = i;
- hzero(rpc_stat->stats[i].invocations);
- hzero(rpc_stat->stats[i].bytes_sent);
- hzero(rpc_stat->stats[i].bytes_rcvd);
- rpc_stat->stats[i].queue_time_sum.sec = 0;
- rpc_stat->stats[i].queue_time_sum.usec = 0;
- rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
- rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
- rpc_stat->stats[i].queue_time_min.sec = 9999999;
- rpc_stat->stats[i].queue_time_min.usec = 9999999;
- rpc_stat->stats[i].queue_time_max.sec = 0;
- rpc_stat->stats[i].queue_time_max.usec = 0;
- rpc_stat->stats[i].execution_time_sum.sec = 0;
- rpc_stat->stats[i].execution_time_sum.usec = 0;
- rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
- rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
- rpc_stat->stats[i].execution_time_min.sec = 9999999;
- rpc_stat->stats[i].execution_time_min.usec = 9999999;
- rpc_stat->stats[i].execution_time_max.sec = 0;
- rpc_stat->stats[i].execution_time_max.usec = 0;
- }
- queue_Prepend(stats, rpc_stat);
+ }
+ opr_queue_Prepend(stats, &rpc_stat->entry);
if (addToPeerList) {
- queue_Prepend(&peerStats, &rpc_stat->all_peers);
+ opr_queue_Prepend(&peerStats, &rpc_stat->entryPeers);
}
}
+ return rpc_stat;
+}
+
+void
+rx_ClearProcessRPCStats(afs_int32 rxInterface)
+{
+ rx_interface_stat_p rpc_stat;
+ int totalFunc, i;
+
+ if (rxInterface == -1)
+ return;
+
+ MUTEX_ENTER(&rx_rpc_stats);
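+    /* create == 0: look up the existing stat entry only */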
+ rpc_stat = rxi_FindRpcStat(&processStats, rxInterface, 0, 0,
+ 0, 0, 0, 0, 0);
+ if (rpc_stat) {
+ totalFunc = rpc_stat->stats[0].func_total;
+ for (i = 0; i < totalFunc; i++)
+ rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
+ }
+ MUTEX_EXIT(&rx_rpc_stats);
+ return;
+}
+
+void
+rx_ClearPeerRPCStats(afs_int32 rxInterface, afs_uint32 peerHost, afs_uint16 peerPort)
+{
+ rx_interface_stat_p rpc_stat;
+ int totalFunc, i;
+ struct rx_peer * peer;
+
+ if (rxInterface == -1)
+ return;
+
+ peer = rxi_FindPeer(peerHost, peerPort, 0);
+ if (!peer)
+ return;
+
+ MUTEX_ENTER(&rx_rpc_stats);
+ rpc_stat = rxi_FindRpcStat(&peer->rpcStats, rxInterface, 0, 1,
+ 0, 0, 0, 0, 0);
+ if (rpc_stat) {
+ totalFunc = rpc_stat->stats[0].func_total;
+ for (i = 0; i < totalFunc; i++)
+ rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
+ }
+ MUTEX_EXIT(&rx_rpc_stats);
+ return;
+}
+
+void *
+rx_CopyProcessRPCStats(afs_uint64 op)
+{
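+    /* 'op' packs the interface id in its high 32 bits and the function
+     * index in its low 32 bits. */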
+    rx_interface_stat_p rpc_stat;
+    rx_function_entry_v1_p rpcop_stat;
+    int currentFunc = (op & MAX_AFS_UINT32);
+    afs_int32 rxInterface = (op >> 32);
+
+    if (!rxi_monitor_processStats)
+	return NULL;
+
+    if (rxInterface == -1)
+	return NULL;
+
+    /* Defer the allocation until the cheap checks have passed, so the
+     * early returns cannot leak rpcop_stat. */
+    rpcop_stat = rxi_Alloc(sizeof(rx_function_entry_v1_t));
+    if (rpcop_stat == NULL)
+	return NULL;
+
+ MUTEX_ENTER(&rx_rpc_stats);
+ rpc_stat = rxi_FindRpcStat(&processStats, rxInterface, 0, 0,
+ 0, 0, 0, 0, 0);
+ if (rpc_stat)
+ memcpy(rpcop_stat, &(rpc_stat->stats[currentFunc]),
+ sizeof(rx_function_entry_v1_t));
+ MUTEX_EXIT(&rx_rpc_stats);
+ if (!rpc_stat) {
+ rxi_Free(rpcop_stat, sizeof(rx_function_entry_v1_t));
+ return NULL;
+ }
+ return rpcop_stat;
+}
+
+void *
+rx_CopyPeerRPCStats(afs_uint64 op, afs_uint32 peerHost, afs_uint16 peerPort)
+{
+    rx_interface_stat_p rpc_stat;
+    rx_function_entry_v1_p rpcop_stat;
+    int currentFunc = (op & MAX_AFS_UINT32);
+    afs_int32 rxInterface = (op >> 32);
+    struct rx_peer *peer;
+
+    if (!rxi_monitor_peerStats)
+	return NULL;
+
+    if (rxInterface == -1)
+	return NULL;
+
+    /* Defer the allocation until the cheap checks have passed, so the
+     * early returns cannot leak rpcop_stat. */
+    rpcop_stat = rxi_Alloc(sizeof(rx_function_entry_v1_t));
+    if (rpcop_stat == NULL)
+	return NULL;
+
+    peer = rxi_FindPeer(peerHost, peerPort, 0);
+    if (!peer) {
+	rxi_Free(rpcop_stat, sizeof(rx_function_entry_v1_t));
+	return NULL;
+    }
+
+ MUTEX_ENTER(&rx_rpc_stats);
+ rpc_stat = rxi_FindRpcStat(&peer->rpcStats, rxInterface, 0, 1,
+ 0, 0, 0, 0, 0);
+ if (rpc_stat)
+ memcpy(rpcop_stat, &(rpc_stat->stats[currentFunc]),
+ sizeof(rx_function_entry_v1_t));
+ MUTEX_EXIT(&rx_rpc_stats);
+ if (!rpc_stat) {
+ rxi_Free(rpcop_stat, sizeof(rx_function_entry_v1_t));
+ return NULL;
+ }
+ return rpcop_stat;
+}
+
+void
+rx_ReleaseRPCStats(void *stats)
+{
+ if (stats)
+ rxi_Free(stats, sizeof(rx_function_entry_v1_t));
+}
+
+/*!
+ * Given all of the information for a particular rpc
+ * call, create (if needed) and update the stat totals for the rpc.
+ *
+ * @param stats
+ * the queue of stats that will be updated with the new value
+ *
+ * @param rxInterface
+ * a unique number that identifies the rpc interface
+ *
+ * @param currentFunc
+ * the index of the function being invoked
+ *
+ * @param totalFunc
+ * the total number of functions in this interface
+ *
+ * @param queueTime
+ * the amount of time this function waited for a thread
+ *
+ * @param execTime
+ * the amount of time this function invocation took to execute
+ *
+ * @param bytesSent
+ *	the number of bytes sent by this invocation
+ *
+ * @param bytesRcvd
+ *	the number of bytes received by this invocation
+ *
+ * @param isServer
+ * if true, this invocation was made to a server
+ *
+ * @param remoteHost
+ * the ip address of the remote host
+ *
+ * @param remotePort
+ * the port of the remote host
+ *
+ * @param addToPeerList
+ * if != 0, add newly created stat to the global peer list
+ *
+ * @param counter
+ * if a new stats structure is allocated, the counter will
+ * be updated with the new number of allocated stat structures
+ *
+ */
+
+static int
+rxi_AddRpcStat(struct opr_queue *stats, afs_uint32 rxInterface,
+ afs_uint32 currentFunc, afs_uint32 totalFunc,
+ struct clock *queueTime, struct clock *execTime,
+ afs_uint64 bytesSent, afs_uint64 bytesRcvd, int isServer,
+ afs_uint32 remoteHost, afs_uint32 remotePort,
+ int addToPeerList, unsigned int *counter)
+{
+ int rc = 0;
+ rx_interface_stat_p rpc_stat;
+
+ rpc_stat = rxi_FindRpcStat(stats, rxInterface, totalFunc, isServer,
+ remoteHost, remotePort, addToPeerList, counter,
+ 1);
+ if (!rpc_stat) {
+ rc = -1;
+ goto fail;
+ }
/*
* Increment the stats for this function
*/
- hadd32(rpc_stat->stats[currentFunc].invocations, 1);
- hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
- hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
+ rpc_stat->stats[currentFunc].invocations++;
+ rpc_stat->stats[currentFunc].bytes_sent += bytesSent;
+ rpc_stat->stats[currentFunc].bytes_rcvd += bytesRcvd;
clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum, queueTime);
clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr, queueTime);
if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
return rc;
}
-/*
- * rx_IncrementTimeAndCount - increment the times and count for a particular
- * rpc function.
- *
- * PARAMETERS
- *
- * IN peer - the peer who invoked the rpc
- *
- * IN rxInterface - a unique number that identifies the rpc interface
- *
- * IN currentFunc - the index of the function being invoked
- *
- * IN totalFunc - the total number of functions in this interface
- *
- * IN queueTime - the amount of time this function waited for a thread
- *
- * IN execTime - the amount of time this function invocation took to execute
- *
- * IN bytesSent - the number bytes sent by this invocation
- *
- * IN bytesRcvd - the number bytes received by this invocation
- *
- * IN isServer - if true, this invocation was made to a server
- *
- * RETURN CODES
- *
- * Returns void.
- */
-
void
-rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
- afs_uint32 currentFunc, afs_uint32 totalFunc,
- struct clock *queueTime, struct clock *execTime,
- afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd,
- int isServer)
+rxi_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
+ afs_uint32 currentFunc, afs_uint32 totalFunc,
+ struct clock *queueTime, struct clock *execTime,
+ afs_uint64 bytesSent, afs_uint64 bytesRcvd,
+ int isServer)
{
if (!(rxi_monitor_peerStats || rxi_monitor_processStats))
}
MUTEX_EXIT(&rx_rpc_stats);
+}
+
+/*!
+ * Increment the times and count for a particular rpc function.
+ *
+ * Traditionally this call was invoked from rxgen stubs. Modern stubs
+ * call rx_RecordCallStatistics instead, so the public version of this
+ * function is left purely for legacy callers.
+ *
+ * @param peer
+ * The peer who invoked the rpc
+ *
+ * @param rxInterface
+ * A unique number that identifies the rpc interface
+ *
+ * @param currentFunc
+ * The index of the function being invoked
+ *
+ * @param totalFunc
+ * The total number of functions in this interface
+ *
+ * @param queueTime
+ * The amount of time this function waited for a thread
+ *
+ * @param execTime
+ * The amount of time this function invocation took to execute
+ *
+ * @param bytesSent
+ *	The number of bytes sent by this invocation
+ *
+ * @param bytesRcvd
+ *	The number of bytes received by this invocation
+ *
+ * @param isServer
+ * If true, this invocation was made to a server
+ *
+ */
+void
+rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
+ afs_uint32 currentFunc, afs_uint32 totalFunc,
+ struct clock *queueTime, struct clock *execTime,
+ afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd,
+ int isServer)
+{
+ afs_uint64 sent64;
+ afs_uint64 rcvd64;
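+    /* Fold each legacy afs_hyper_t into a native 64-bit quantity. */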
+ sent64 = ((afs_uint64)bytesSent->high << 32) + bytesSent->low;
+ rcvd64 = ((afs_uint64)bytesRcvd->high << 32) + bytesRcvd->low;
+
+ rxi_IncrementTimeAndCount(peer, rxInterface, currentFunc, totalFunc,
+ queueTime, execTime, sent64, rcvd64,
+ isServer);
}
+
/*
* rx_MarshallProcessRPCStats - marshall an array of rpc statistics
*
*(ptr++) = stats->interfaceId;
*(ptr++) = stats->func_total;
*(ptr++) = stats->func_index;
- *(ptr++) = hgethi(stats->invocations);
- *(ptr++) = hgetlo(stats->invocations);
- *(ptr++) = hgethi(stats->bytes_sent);
- *(ptr++) = hgetlo(stats->bytes_sent);
- *(ptr++) = hgethi(stats->bytes_rcvd);
- *(ptr++) = hgetlo(stats->bytes_rcvd);
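+	/* 64-bit counters are marshalled as separate high and low
+	 * 32-bit words to preserve the wire format. */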
+ *(ptr++) = stats->invocations >> 32;
+ *(ptr++) = stats->invocations & MAX_AFS_UINT32;
+ *(ptr++) = stats->bytes_sent >> 32;
+ *(ptr++) = stats->bytes_sent & MAX_AFS_UINT32;
+ *(ptr++) = stats->bytes_rcvd >> 32;
+ *(ptr++) = stats->bytes_rcvd & MAX_AFS_UINT32;
*(ptr++) = stats->queue_time_sum.sec;
*(ptr++) = stats->queue_time_sum.usec;
*(ptr++) = stats->queue_time_sum_sqr.sec;
ptr = *stats = rxi_Alloc(space);
if (ptr != NULL) {
-	rx_interface_stat_p rpc_stat, nrpc_stat;
-
-	for (queue_Scan
-	    (&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
+	struct opr_queue *cursor;
+
+	for (opr_queue_Scan(&processStats, cursor)) {
+ struct rx_interface_stat *rpc_stat =
+ opr_queue_Entry(cursor, struct rx_interface_stat, entry);
/*
* Copy the data based upon the caller version
*/
ptr = *stats = rxi_Alloc(space);
if (ptr != NULL) {
-	rx_interface_stat_p rpc_stat, nrpc_stat;
-	char *fix_offset;
-
-	for (queue_Scan
-	    (&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
-	    /*
-	     * We have to fix the offset of rpc_stat since we are
-	     * keeping this structure on two rx_queues. The rx_queue
-	     * package assumes that the rx_queue member is the first
-	     * member of the structure. That is, rx_queue assumes that
-	     * any one item is only on one queue at a time. We are
-	     * breaking that assumption and so we have to do a little
-	     * math to fix our pointers.
-	     */
-	    fix_offset = (char *)rpc_stat;
-	    fix_offset -= offsetof(rx_interface_stat_t, all_peers);
-	    rpc_stat = (rx_interface_stat_p) fix_offset;
+	struct opr_queue *cursor;
+
+	for (opr_queue_Scan(&peerStats, cursor)) {
+	    struct rx_interface_stat *rpc_stat
+		= opr_queue_Entry(cursor, struct rx_interface_stat,
+				  entryPeers);
/*
* Copy the data based upon the caller version
void
rx_disableProcessRPCStats(void)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor, *store;
size_t space;
MUTEX_ENTER(&rx_rpc_stats);
rx_enable_stats = 0;
}
- for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
- unsigned int num_funcs = 0;
- if (!rpc_stat)
- break;
- queue_Remove(rpc_stat);
+ for (opr_queue_ScanSafe(&processStats, cursor, store)) {
+ unsigned int num_funcs = 0;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat, entry);
+
+ opr_queue_Remove(&rpc_stat->entry);
+
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
next = peer->next;
code = MUTEX_TRYENTER(&peer->peer_lock);
if (code) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
size_t space;
+ struct opr_queue *cursor, *store;
if (prev == *peer_ptr) {
*peer_ptr = next;
peer->refCount++;
MUTEX_EXIT(&rx_peerHashTable_lock);
- for (queue_Scan
- (&peer->rpcStats, rpc_stat, nrpc_stat,
- rx_interface_stat)) {
+ for (opr_queue_ScanSafe(&peer->rpcStats, cursor, store)) {
unsigned int num_funcs = 0;
- if (!rpc_stat)
- break;
- queue_Remove(&rpc_stat->queue_header);
- queue_Remove(&rpc_stat->all_peers);
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entry);
+
+ opr_queue_Remove(&rpc_stat->entry);
+ opr_queue_Remove(&rpc_stat->entryPeers);
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
void
rx_clearProcessRPCStats(afs_uint32 clearFlag)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor;
MUTEX_ENTER(&rx_rpc_stats);
- for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
+ for (opr_queue_Scan(&processStats, cursor)) {
unsigned int num_funcs = 0, i;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat, entry);
+
num_funcs = rpc_stat->stats[0].func_total;
for (i = 0; i < num_funcs; i++) {
if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
- hzero(rpc_stat->stats[i].invocations);
+ rpc_stat->stats[i].invocations = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
- hzero(rpc_stat->stats[i].bytes_sent);
+ rpc_stat->stats[i].bytes_sent = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
- hzero(rpc_stat->stats[i].bytes_rcvd);
+ rpc_stat->stats[i].bytes_rcvd = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
rpc_stat->stats[i].queue_time_sum.sec = 0;
void
rx_clearPeerRPCStats(afs_uint32 clearFlag)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor;
MUTEX_ENTER(&rx_rpc_stats);
- for (queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
- unsigned int num_funcs = 0, i;
- char *fix_offset;
- /*
- * We have to fix the offset of rpc_stat since we are
- * keeping this structure on two rx_queues. The rx_queue
- * package assumes that the rx_queue member is the first
- * member of the structure. That is, rx_queue assumes that
- * any one item is only on one queue at a time. We are
- * breaking that assumption and so we have to do a little
- * math to fix our pointers.
- */
-
- fix_offset = (char *)rpc_stat;
- fix_offset -= offsetof(rx_interface_stat_t, all_peers);
- rpc_stat = (rx_interface_stat_p) fix_offset;
+ for (opr_queue_Scan(&peerStats, cursor)) {
+ unsigned int num_funcs, i;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat, entryPeers);
num_funcs = rpc_stat->stats[0].func_total;
for (i = 0; i < num_funcs; i++) {
if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
- hzero(rpc_stat->stats[i].invocations);
+ rpc_stat->stats[i].invocations = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
- hzero(rpc_stat->stats[i].bytes_sent);
+ rpc_stat->stats[i].bytes_sent = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
- hzero(rpc_stat->stats[i].bytes_rcvd);
+ rpc_stat->stats[i].bytes_rcvd = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
rpc_stat->stats[i].queue_time_sum.sec = 0;
for (c = rx_allCallsp; c; c = c->allNextp) {
u_short rqc, tqc, iovqc;
- struct rx_packet *p, *np;
MUTEX_ENTER(&c->lock);
- queue_Count(&c->rq, p, np, rx_packet, rqc);
- queue_Count(&c->tq, p, np, rx_packet, tqc);
- queue_Count(&c->iovq, p, np, rx_packet, iovqc);
+ rqc = opr_queue_Count(&c->rq);
+ tqc = opr_queue_Count(&c->tq);
+ iovqc = opr_queue_Count(&c->app.iovq);
RXDPRINTF(RXDPRINTOUT, "%s - call=0x%p, id=%u, state=%u, mode=%u, conn=%p, epoch=%u, cid=%u, callNum=%u, connFlags=0x%x, flags=0x%x, "
"rqc=%u,%u, tqc=%u,%u, iovqc=%u,%u, "
"lstatus=%u, rstatus=%u, error=%d, timeout=%u, "
- "resendEvent=%d, timeoutEvt=%d, keepAliveEvt=%d, delayedAckEvt=%d, delayedAbortEvt=%d, abortCode=%d, abortCount=%d, "
+ "resendEvent=%d, keepAliveEvt=%d, delayedAckEvt=%d, delayedAbortEvt=%d, abortCode=%d, abortCount=%d, "
"lastSendTime=%u, lastRecvTime=%u, lastSendData=%u"
#ifdef RX_ENABLE_LOCKS
", refCount=%u"
"refCountAlive=%u, refCountPacket=%u, refCountSend=%u, refCountAckAll=%u, refCountAbort=%u"
#endif
"\r\n",
- cookie, c, c->call_id, (afs_uint32)c->state, (afs_uint32)c->mode, c->conn, c->conn?c->conn->epoch:0, c->conn?c->conn->cid:0,
+ cookie, c, c->call_id, (afs_uint32)c->state, (afs_uint32)c->app.mode, c->conn, c->conn?c->conn->epoch:0, c->conn?c->conn->cid:0,
c->callNumber?*c->callNumber:0, c->conn?c->conn->flags:0, c->flags,
(afs_uint32)c->rqc, (afs_uint32)rqc, (afs_uint32)c->tqc, (afs_uint32)tqc, (afs_uint32)c->iovqc, (afs_uint32)iovqc,
(afs_uint32)c->localStatus, (afs_uint32)c->remoteStatus, c->error, c->timeout,
- c->resendEvent?1:0, c->timeoutEvent?1:0, c->keepAliveEvent?1:0, c->delayedAckEvent?1:0, c->delayedAbortEvent?1:0,
+ c->resendEvent?1:0, c->keepAliveEvent?1:0, c->delayedAckEvent?1:0, c->delayedAbortEvent?1:0,
c->abortCode, c->abortCount, c->lastSendTime, c->lastReceiveTime, c->lastSendData
#ifdef RX_ENABLE_LOCKS
, (afs_uint32)c->refCount