static void rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs);
static void rxi_KeepAliveOn(struct rx_call *call);
static void rxi_GrowMTUOn(struct rx_call *call);
-static void rxi_ChallengeOn(struct rx_connection *conn);
+static int rxi_ChallengeOn(struct rx_connection *conn);
static int rxi_CheckCall(struct rx_call *call, int haveCTLock);
static void rxi_AckAllInTransmitQueue(struct rx_call *call);
static void rxi_CancelKeepAliveEvent(struct rx_call *call);
static void rxi_CancelGrowMTUEvent(struct rx_call *call);
static void update_nextCid(void);
+#ifndef KERNEL
+static void rxi_Finalize_locked(void);
+#elif defined(UKERNEL)
+# define rxi_Finalize_locked() do { } while (0)
+#endif
+
#ifdef RX_ENABLE_LOCKS
struct rx_tq_debug {
rx_atomic_t rxi_start_aborted; /* rxi_start awoke after rxi_Send in error.*/
* calls to process */
struct opr_queue rx_idleServerQueue;
+/* List of free rx_serverQueueEntry structs */
+struct opr_queue rx_freeServerQueue;
+
#if !defined(offsetof)
#include <stddef.h> /* for definition of offsetof() */
#endif
#ifdef RX_ENABLE_LOCKS
afs_kmutex_t rx_atomic_mutex;
+static afs_kmutex_t freeSQEList_lock;
#endif
/* Forward prototypes */
#endif /* RX_ENABLE_LOCKS */
struct rx_serverQueueEntry *rx_waitForPacket = 0;
+/*
+ * This mutex serializes calls to our initialization and shutdown routines
+ * (rx_InitHost, rx_Finalize and shutdown_rx). Only one thread can be running
+ * these at any time; all other threads must wait for it to finish running, and
+ * then examine the value of rxi_running afterwards.
+ */
+#ifdef AFS_PTHREAD_ENV
+# define LOCK_RX_INIT MUTEX_ENTER(&rx_init_mutex)
+# define UNLOCK_RX_INIT MUTEX_EXIT(&rx_init_mutex)
+#else
+# define LOCK_RX_INIT
+# define UNLOCK_RX_INIT
+#endif
+
/* ------------Exported Interfaces------------- */
+static rx_atomic_t rxi_running = RX_ATOMIC_INIT(0);
+int
+rxi_IsRunning(void)
+{
+ return rx_atomic_read(&rxi_running);
+}
+
/* Initialize rx. A port number may be mentioned, in which case this
* becomes the default port number for any service installed later.
* If 0 is provided for the port number, a random port will be chosen
* by the kernel. Whether this will ever overlap anything in
* /etc/services is anybody's guess... Returns 0 on success, -1 on
* error. */
-#if !(defined(AFS_NT40_ENV) || defined(RXK_UPCALL_ENV))
-static
-#endif
-rx_atomic_t rxinit_status = RX_ATOMIC_INIT(1);
-
int
rx_InitHost(u_int host, u_int port)
{
SPLVAR;
INIT_PTHREAD_LOCKS;
- if (!rx_atomic_test_and_clear_bit(&rxinit_status, 0))
+ LOCK_RX_INIT;
+ if (rxi_IsRunning()) {
+ UNLOCK_RX_INIT;
return 0; /* already started */
-
+ }
#ifdef RXDEBUG
rxi_DebugInit();
#endif
#ifdef AFS_NT40_ENV
if (afs_winsockInit() < 0)
- return -1;
+ goto error;
#endif
#ifndef KERNEL
rx_socket = rxi_GetHostUDPSocket(host, (u_short) port);
if (rx_socket == OSI_NULLSOCKET) {
- return RX_ADDRINUSE;
+ goto addrinuse;
}
#if defined(RX_ENABLE_LOCKS) && defined(KERNEL)
#ifdef RX_LOCKS_DB
socklen_t addrlen = sizeof(addr);
#endif
if (getsockname((intptr_t)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
- rx_Finalize();
+ rxi_Finalize_locked();
osi_Free(htable, rx_hashTableSize * sizeof(struct rx_connection *));
- return -1;
+ goto error;
}
rx_port = addr.sin_port;
#endif
}
rx_stats.minRtt.sec = 9999999;
if (RAND_bytes(&rx_epoch, sizeof(rx_epoch)) != 1)
- return -1;
+ goto error;
rx_epoch = (rx_epoch & ~0x40000000) | 0x80000000;
if (RAND_bytes(&rx_nextCid, sizeof(rx_nextCid)) != 1)
- return -1;
+ goto error;
rx_nextCid &= RX_CIDMASK;
MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
/* Initialize various global queues */
opr_queue_Init(&rx_idleServerQueue);
+ opr_queue_Init(&rx_freeServerQueue);
opr_queue_Init(&rx_incomingCallQueue);
opr_queue_Init(&rx_freeCallQueue);
rxi_StartListener();
USERPRI;
- rx_atomic_clear_bit(&rxinit_status, 0);
+
+ rx_atomic_set(&rxi_running, 1);
+ UNLOCK_RX_INIT;
+
return 0;
+
+ addrinuse:
+ UNLOCK_RX_INIT;
+ return RX_ADDRINUSE;
+
+ error:
+ UNLOCK_RX_INIT;
+ return -1;
}
int
{
int hashindex, i;
struct rx_connection *conn;
+ int code;
SPLVAR;
conn->lastBusy[i] = 0;
}
- RXS_NewConnection(securityObject, conn);
+ code = RXS_NewConnection(securityObject, conn);
hashindex =
CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
rx_atomic_inc(&rx_stats.nClientConns);
MUTEX_EXIT(&rx_connHashTable_lock);
USERPRI;
+ if (code) {
+ rxi_ConnectionError(conn, code);
+ }
return conn;
}
void *value)
{
int i;
+ int code;
for (i = 0; i<service->nSecurityObjects; i++) {
if (service->securityObjects[i]) {
- RXS_SetConfiguration(service->securityObjects[i], NULL, type,
- value, NULL);
+ code = RXS_SetConfiguration(service->securityObjects[i], NULL, type,
+ value, NULL);
+ if (code) {
+ return code;
+ }
}
}
return 0;
void
rx_WakeupServerProcs(void)
{
- struct rx_serverQueueEntry *np, *tqp;
+ struct rx_serverQueueEntry *np;
struct opr_queue *cursor;
SPLVAR;
osi_rxWakeup(rx_waitForPacket);
#endif /* RX_ENABLE_LOCKS */
MUTEX_ENTER(&freeSQEList_lock);
- for (np = rx_FreeSQEList; np; np = tqp) {
- tqp = *(struct rx_serverQueueEntry **)np;
+ for (opr_queue_Scan(&rx_freeServerQueue, cursor)) {
+ np = opr_queue_Entry(cursor, struct rx_serverQueueEntry, entry);
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
MUTEX_ENTER(&freeSQEList_lock);
- if ((sq = rx_FreeSQEList)) {
- rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
+ if (!opr_queue_IsEmpty(&rx_freeServerQueue)) {
+ sq = opr_queue_First(&rx_freeServerQueue, struct rx_serverQueueEntry,
+ entry);
+ opr_queue_Remove(&sq->entry);
MUTEX_EXIT(&freeSQEList_lock);
} else { /* otherwise allocate a new one and return that */
MUTEX_EXIT(&freeSQEList_lock);
CV_WAIT(&sq->cv, &rx_serverPool_lock);
#ifdef KERNEL
if (afs_termState == AFSOP_STOP_RXCALLBACK) {
- MUTEX_EXIT(&rx_serverPool_lock);
- return (struct rx_call *)0;
+ break;
}
#endif
} while (!(call = sq->newcall)
&& !(socketp && *socketp != OSI_NULLSOCKET));
+ if (opr_queue_IsOnQueue(&sq->entry)) {
+ opr_queue_Remove(&sq->entry);
+ }
MUTEX_EXIT(&rx_serverPool_lock);
if (call) {
MUTEX_ENTER(&call->lock);
}
MUTEX_ENTER(&freeSQEList_lock);
- *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
- rx_FreeSQEList = sq;
+ opr_queue_Prepend(&rx_freeServerQueue, &sq->entry);
MUTEX_EXIT(&freeSQEList_lock);
if (call) {
NETPRI;
MUTEX_ENTER(&freeSQEList_lock);
- if ((sq = rx_FreeSQEList)) {
- rx_FreeSQEList = *(struct rx_serverQueueEntry **)sq;
+ if (!opr_queue_IsEmpty(&rx_freeServerQueue)) {
+ sq = opr_queue_First(&rx_freeServerQueue, struct rx_serverQueueEntry,
+ entry);
+ opr_queue_Remove(&sq->entry);
MUTEX_EXIT(&freeSQEList_lock);
} else { /* otherwise allocate a new one and return that */
MUTEX_EXIT(&freeSQEList_lock);
MUTEX_EXIT(&sq->lock);
MUTEX_ENTER(&freeSQEList_lock);
- *(struct rx_serverQueueEntry **)sq = rx_FreeSQEList;
- rx_FreeSQEList = sq;
+ opr_queue_Prepend(&rx_freeServerQueue, &sq->entry);
MUTEX_EXIT(&freeSQEList_lock);
if (call) {
call->abortCount = 0;
}
- call->arrivalProc = (void (*)())0;
+ call->arrivalProc = NULL;
if (rc && call->error == 0) {
rxi_CallError(call, rc);
call->app.mode = RX_MODE_ERROR;
void
rx_Finalize(void)
{
- struct rx_connection **conn_ptr, **conn_end;
-
INIT_PTHREAD_LOCKS;
- if (rx_atomic_test_and_set_bit(&rxinit_status, 0))
+ LOCK_RX_INIT;
+ if (!rxi_IsRunning()) {
+ UNLOCK_RX_INIT;
return; /* Already shutdown. */
+ }
+ rxi_Finalize_locked();
+ UNLOCK_RX_INIT;
+}
+static void
+rxi_Finalize_locked(void)
+{
+ struct rx_connection **conn_ptr, **conn_end;
+ rx_atomic_set(&rxi_running, 0);
rxi_DeleteCachedConnections();
if (rx_connHashTable) {
MUTEX_ENTER(&rx_connHashTable_lock);
#ifdef AFS_NT40_ENV
afs_winsockCleanup();
#endif
-
}
#endif
void
rxi_Free(void *addr, size_t size)
{
+ if (!addr) {
+ return;
+ }
if (rx_stats_active) {
rx_atomic_sub(&rxi_Allocsize, (int) size);
rx_atomic_dec(&rxi_Alloccnt);
int *unknownService)
{
int hashindex, flag, i;
+ int code = 0;
struct rx_connection *conn;
*unknownService = 0;
hashindex = CONN_HASH(host, port, cid, epoch, type);
conn->rwind[i] = rx_initReceiveWindow;
}
/* Notify security object of the new connection */
- RXS_NewConnection(conn->securityObject, conn);
+ code = RXS_NewConnection(conn->securityObject, conn);
/* XXXX Connection timeout? */
if (service->newConnProc)
(*service->newConnProc) (conn);
rxLastConn = conn; /* store this connection as the last conn used */
MUTEX_EXIT(&rx_connHashTable_lock);
+ if (code) {
+ rxi_ConnectionError(conn, code);
+ }
return conn;
}
memset(&addr.sin_zero, 0, sizeof(addr.sin_zero));
#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
addr.sin_len = sizeof(addr);
-#endif /* AFS_OSF_ENV */
+#endif
drop = (*rx_justReceived) (np, &addr);
/* drop packet if return value is non-zero */
if (drop)
don't abort an abort. */
if (!conn) {
if (unknownService && (np->header.type != RX_PACKET_TYPE_ABORT))
- rxi_SendRawAbort(socket, host, port, 1, RX_INVALID_OPERATION,
+ rxi_SendRawAbort(socket, host, port, 0, RX_INVALID_OPERATION,
np, 0);
return np;
}
static void
TryAttach(struct rx_call *acall, osi_socket socket,
int *tnop, struct rx_call **newcallp,
- int reachOverride)
+ int reachOverride, int istack)
{
struct rx_connection *conn = acall->conn;
* may not any proc available
*/
} else {
- rxi_ChallengeOn(acall->conn);
+ int code;
+ code = rxi_ChallengeOn(acall->conn);
+ if (code) {
+ /*
+ * Ideally we would rxi_ConnectionError here, but doing that is
+ * difficult, because some callers may have locked 'call',
+ * _and_ another call on the same conn. So we cannot
+ * rxi_ConnectionError, since that needs to lock every call on
+ * the conn. But we can at least abort the call we have.
+ */
+ rxi_CallError(acall, code);
+ rxi_SendCallAbort(acall, NULL, istack, 0);
+ }
}
}
}
if (call->arrivalProc) {
(*call->arrivalProc) (call, call->arrivalProcHandle,
call->arrivalProcArg);
- call->arrivalProc = (void (*)())0;
+ call->arrivalProc = NULL;
}
/* Update last packet received */
* server thread is available, this thread becomes a server
* thread and the server thread becomes a listener thread. */
if (isFirst) {
- TryAttach(call, socket, tnop, newcallp, 0);
+ TryAttach(call, socket, tnop, newcallp, 0, istack);
}
}
/* This is not the expected next packet. */
}
static void
-rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
+rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall,
+ int istack)
{
struct rx_peer *peer = conn->peer;
if (call != acall)
MUTEX_ENTER(&call->lock);
/* tnop can be null if newcallp is null */
- TryAttach(call, (osi_socket) - 1, NULL, NULL, 1);
+ TryAttach(call, (osi_socket) - 1, NULL, NULL, 1, istack);
if (call != acall)
MUTEX_EXIT(&call->lock);
}
}
if (ap->reason == RX_ACK_PING_RESPONSE)
- rxi_UpdatePeerReach(conn, call);
+ rxi_UpdatePeerReach(conn, call, istack);
if (conn->lastPacketSizeSeq) {
MUTEX_ENTER(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
/* XXX Hack. Because we have to release the global call lock when sending
- * packets (osi_NetSend) we drop all acks while we're traversing the tq
+ * packets (rxi_NetSend) we drop all acks while we're traversing the tq
* in rxi_Start sending packets out because packets may move to the
* freePacketQueue as result of being here! So we drop these packets until
* we're safely out of the traversing. Really ugly!
* some calls went into attach-wait while we were waiting
* for authentication..
*/
- rxi_UpdatePeerReach(conn, NULL);
+ rxi_UpdatePeerReach(conn, NULL, istack);
}
return np;
}
if (call->arrivalProc) {
(*call->arrivalProc) (call, call->arrivalProcHandle,
call->arrivalProcArg);
- call->arrivalProc = (void (*)())0;
+ call->arrivalProc = NULL;
}
rxi_Send(struct rx_call *call, struct rx_packet *p,
int istack)
{
+ int code;
struct rx_connection *conn = call->conn;
/* Stamp each packet with the user supplied status */
/* Allow the security object controlling this call's security to
* make any last-minute changes to the packet */
- RXS_SendPacket(conn->securityObject, call, p);
+ code = RXS_SendPacket(conn->securityObject, call, p);
+ if (code) {
+ MUTEX_EXIT(&call->lock);
+ CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
+ rxi_ConnectionError(conn, code);
+ CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
+ MUTEX_ENTER(&call->lock);
+ return;
+ }
/* Since we're about to send SOME sort of packet to the peer, it's
* safe to nuke any scheduled end-of-packets ack */
tmpiov[0].iov_base = tbuffer;
tmpiov[0].iov_len = 1 + sizeof(struct rx_header);
- osi_NetSend(socket, &taddr, tmpiov, 1, 1 + sizeof(struct rx_header), 1);
+ rxi_NetSend(socket, &taddr, tmpiov, 1, 1 + sizeof(struct rx_header), 1);
MUTEX_ENTER(&conn->conn_data_lock);
/* We ran, so the handle is no longer needed to try to cancel ourselves. */
*
* This routine is both an event handler and a function called directly;
* when called directly the passed |event| is NULL and the
- * conn->conn->data>lock must must not be held.
+ * conn->conn->data>lock must must not be held. Also, when called as an
+ * an event handler, we must putConnection before we exit; but when called
+ * directly (the first challenge), we must NOT putConnection.
*/
static void
rxi_ChallengeEvent(struct rxevent *event,
void *arg0, void *arg1, int tries)
{
struct rx_connection *conn = arg0;
+ int event_raised = 0; /* assume we were called directly */
MUTEX_ENTER(&conn->conn_data_lock);
- if (event != NULL && event == conn->challengeEvent)
+ if (event != NULL && event == conn->challengeEvent) {
+ event_raised = 1; /* called as an event */
rxevent_Put(&conn->challengeEvent);
+ }
MUTEX_EXIT(&conn->conn_data_lock);
/* If there are no active calls it is not worth re-issuing the
* challenge. If the client issues another call on this connection
* the challenge can be requested at that time.
*/
- if (!rxi_HasActiveCalls(conn)) {
- putConnection(conn);
- return;
- }
+ if (!rxi_HasActiveCalls(conn))
+ goto done;
if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
struct rx_packet *packet;
}
}
MUTEX_EXIT(&conn->conn_call_lock);
- putConnection(conn);
- return;
+ goto done;
}
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
if (packet) {
- /* If there's no packet available, do this later. */
- RXS_GetChallenge(conn->securityObject, conn, packet);
- rxi_SendSpecial((struct rx_call *)0, conn, packet,
- RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0);
+ int code;
+ code = RXS_GetChallenge(conn->securityObject, conn, packet);
+ if (code && event_raised) {
+ /*
+ * We can only rxi_ConnectionError the connection if we are
+ * running as an event. Otherwise, the caller may have our call
+ * locked, and so we cannot call rxi_ConnectionError (since it
+ * tries to lock each call in the conn).
+ */
+ rxi_FreePacket(packet);
+ rxi_ConnectionError(conn, code);
+ goto done;
+ }
+ if (code == 0) {
+ /* Only send a challenge packet if we were able to allocate a
+ * packet, and the security layer successfully populated the
+ * challenge. */
+ rxi_SendSpecial((struct rx_call *)0, conn, packet,
+ RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0);
+ conn->securityChallengeSent = 1;
+ }
rxi_FreePacket(packet);
- conn->securityChallengeSent = 1;
}
clock_GetTime(&now);
when = now;
}
MUTEX_EXIT(&conn->conn_data_lock);
}
- putConnection(conn);
+ done:
+ if (event_raised)
+ putConnection(conn);
}
/* Call this routine to start requesting the client to authenticate
* the call times out, or an invalid response is returned. The
* security object associated with the connection is asked to create
* the challenge at this time. */
-static void
+static int
rxi_ChallengeOn(struct rx_connection *conn)
{
int start = 0;
start = 1;
MUTEX_EXIT(&conn->conn_data_lock);
if (start) {
- RXS_CreateChallenge(conn->securityObject, conn);
+ int code;
+ code = RXS_CreateChallenge(conn->securityObject, conn);
+ if (code) {
+ return code;
+ }
rxi_ChallengeEvent(NULL, conn, 0, RX_CHALLENGE_MAXTRIES);
- };
+ }
+ return 0;
}
struct rx_serverQueueEntry *sq;
#endif /* KERNEL */
- if (rx_atomic_test_and_set_bit(&rxinit_status, 0))
+ LOCK_RX_INIT;
+ if (!rxi_IsRunning()) {
+ UNLOCK_RX_INIT;
return; /* Already shutdown. */
-
+ }
+ rx_atomic_set(&rxi_running, 0);
#ifndef KERNEL
rx_port = 0;
#ifndef AFS_PTHREAD_ENV
MUTEX_ENTER(&freeSQEList_lock);
- while ((np = rx_FreeSQEList)) {
- rx_FreeSQEList = *(struct rx_serverQueueEntry **)np;
+ while (!opr_queue_IsEmpty(&rx_freeServerQueue)) {
+ np = opr_queue_First(&rx_freeServerQueue, struct rx_serverQueueEntry,
+ entry);
+ opr_queue_Remove(&np->entry);
MUTEX_DESTROY(&np->lock);
rxi_Free(np, sizeof(*np));
}
rxi_dataQuota = RX_MAX_QUOTA;
rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
MUTEX_EXIT(&rx_quota_mutex);
+ UNLOCK_RX_INIT;
}
#ifndef KERNEL
return 0;
}
#endif
+
+int
+rxi_NetSend(osi_socket socket, void *addr, struct iovec *dvec,
+ int nvecs, int length, int istack)
+{
+ return osi_NetSend(socket, addr, dvec, nvecs, length, istack);
+}