#include "h/socket.h"
#endif
#include "netinet/in.h"
+#ifdef AFS_SUN57_ENV
+#include "inet/common.h"
+#include "inet/ip.h"
+#include "inet/ip_ire.h"
+#endif
#include "afs/afs_args.h"
#include "afs/afs_osi.h"
#ifdef RX_KERNEL_TRACE
#include "sys/debug.h"
#endif
#include "afsint.h"
-#ifdef AFS_ALPHA_ENV
+#ifdef AFS_OSF_ENV
#undef kmem_alloc
#undef kmem_free
#undef mem_alloc
#undef mem_free
#undef register
-#endif /* AFS_ALPHA_ENV */
+#endif /* AFS_OSF_ENV */
#else /* !UKERNEL */
#include "afs/sysincludes.h"
#include "afsincludes.h"
# include "rxgen_consts.h"
#else /* KERNEL */
# include <sys/types.h>
+# include <string.h>
+# include <stdarg.h>
# include <errno.h>
#ifdef AFS_NT40_ENV
# include <stdlib.h>
# include <fcntl.h>
# include <afs/afsutil.h>
+# include <WINNT\afsreg.h>
#else
# include <sys/socket.h>
# include <sys/file.h>
# include <netinet/in.h>
# include <sys/time.h>
#endif
-#ifdef HAVE_STRING_H
-#include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
# include "rx.h"
# include "rx_user.h"
# include "rx_clock.h"
# include <afs/rxgen_consts.h>
#endif /* KERNEL */
-int (*registerProgram) () = 0;
-int (*swapNameProgram) () = 0;
+#ifndef KERNEL
+#ifdef AFS_PTHREAD_ENV
+#ifndef AFS_NT40_ENV
+int (*registerProgram) (pid_t, char *) = 0;
+int (*swapNameProgram) (pid_t, const char *, char *) = 0;
+#endif
+#else
+int (*registerProgram) (PROCESS, char *) = 0;
+int (*swapNameProgram) (PROCESS, const char *, char *) = 0;
+#endif
+#endif
/* Local static routines */
static void rxi_DestroyConnectionNoLock(register struct rx_connection *conn);
*/
extern pthread_mutex_t rx_stats_mutex;
-extern pthread_mutex_t rxkad_stats_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
static pthread_mutex_t epoch_mutex;
static pthread_mutex_t rx_init_mutex;
static pthread_mutex_t rx_debug_mutex;
+static pthread_mutex_t rx_rpc_stats;
static void
rxi_InitPthread(void)
(&rxkad_client_uid_mutex, (const pthread_mutexattr_t *)0) == 0);
assert(pthread_mutex_init
(&rxkad_random_mutex, (const pthread_mutexattr_t *)0) == 0);
- assert(pthread_mutex_init
- (&rxkad_stats_mutex, (const pthread_mutexattr_t *)0) == 0);
assert(pthread_mutex_init(&rx_debug_mutex, (const pthread_mutexattr_t *)0)
== 0);
assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0)
== 0);
assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
+ assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
+
+ rxkad_global_stats_init();
+
+ MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
+#ifdef RX_ENABLE_LOCKS
+#ifdef RX_LOCKS_DB
+ rxdb_init();
+#endif /* RX_LOCKS_DB */
+ MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_freeCallQueue_lock, "rx_freeCallQueue_lock", MUTEX_DEFAULT,
+ 0);
+ CV_INIT(&rx_waitingForPackets_cv, "rx_waitingForPackets_cv", CV_DEFAULT,
+ 0);
+ MUTEX_INIT(&rx_peerHashTable_lock, "rx_peerHashTable_lock", MUTEX_DEFAULT,
+ 0);
+ MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT,
+ 0);
+ MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
+#endif /* RX_ENABLE_LOCKS */
}
pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
#define INIT_PTHREAD_LOCKS \
-assert(pthread_once(&rx_once_init, rxi_InitPthread)==0);
+assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
/*
* The rx_stats_mutex mutex protects the following global variables:
* rxi_dataQuota
* to manipulate the queue.
*/
-#ifdef RX_ENABLE_LOCKS
+#if defined(RX_ENABLE_LOCKS) && defined(KERNEL)
static afs_kmutex_t rx_rpc_stats;
-void rxi_StartUnlocked();
+void rxi_StartUnlocked(struct rxevent *event, void *call,
+ void *arg1, int istack);
#endif
/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
* rx_epoch
*/
-#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0);
-#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0);
+#define LOCK_EPOCH assert(pthread_mutex_lock(&epoch_mutex)==0)
+#define UNLOCK_EPOCH assert(pthread_mutex_unlock(&epoch_mutex)==0)
#else
#define LOCK_EPOCH
#define UNLOCK_EPOCH
void
rx_SetEpoch(afs_uint32 epoch)
{
- LOCK_EPOCH rx_epoch = epoch;
-UNLOCK_EPOCH}
+ LOCK_EPOCH;
+ rx_epoch = epoch;
+ UNLOCK_EPOCH;
+}
/* Initialize rx. A port number may be mentioned, in which case this
* becomes the default port number for any service installed later.
* by the kernel. Whether this will ever overlap anything in
* /etc/services is anybody's guess... Returns 0 on success, -1 on
* error. */
-static int rxinit_status = 1;
+#ifndef AFS_NT40_ENV
+static
+#endif
+int rxinit_status = 1;
#ifdef AFS_PTHREAD_ENV
/*
* This mutex protects the following global variables:
* rxinit_status
*/
-#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0);
-#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0);
+#define LOCK_RX_INIT assert(pthread_mutex_lock(&rx_init_mutex)==0)
+#define UNLOCK_RX_INIT assert(pthread_mutex_unlock(&rx_init_mutex)==0)
#else
#define LOCK_RX_INIT
#define UNLOCK_RX_INIT
#endif
-int
+int
rx_InitHost(u_int host, u_int port)
{
#ifdef KERNEL
#endif /* KERNEL */
char *htable, *ptable;
int tmp_status;
-
-#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
- __djgpp_set_quiet_socket(1);
-#endif
-
+
SPLVAR;
-
- INIT_PTHREAD_LOCKS LOCK_RX_INIT if (rxinit_status == 0) {
+
+ INIT_PTHREAD_LOCKS;
+ LOCK_RX_INIT;
+ if (rxinit_status == 0) {
tmp_status = rxinit_status;
- UNLOCK_RX_INIT return tmp_status; /* Already started; return previous error code. */
+ UNLOCK_RX_INIT;
+ return tmp_status; /* Already started; return previous error code. */
}
+#ifdef RXDEBUG
+ rxi_DebugInit();
+#endif
#ifdef AFS_NT40_ENV
if (afs_winsockInit() < 0)
return -1;
#endif
-
+
#ifndef KERNEL
/*
* Initialize anything necessary to provide a non-premptive threading
*/
rxi_InitializeThreadSupport();
#endif
-
+
/* Allocate and initialize a socket for client and perhaps server
* connections. */
-
+
rx_socket = rxi_GetHostUDPSocket(host, (u_short) port);
if (rx_socket == OSI_NULLSOCKET) {
- UNLOCK_RX_INIT return RX_ADDRINUSE;
+ UNLOCK_RX_INIT;
+ return RX_ADDRINUSE;
}
-#ifdef RX_ENABLE_LOCKS
+#if defined(RX_ENABLE_LOCKS) && defined(KERNEL)
#ifdef RX_LOCKS_DB
rxdb_init();
#endif /* RX_LOCKS_DB */
MUTEX_INIT(&rx_connHashTable_lock, "rx_connHashTable_lock", MUTEX_DEFAULT,
0);
MUTEX_INIT(&rx_serverPool_lock, "rx_serverPool_lock", MUTEX_DEFAULT, 0);
-#ifndef KERNEL
- MUTEX_INIT(&rxi_keyCreate_lock, "rxi_keyCreate_lock", MUTEX_DEFAULT, 0);
-#endif /* !KERNEL */
-#if defined(KERNEL) && defined(AFS_HPUX110_ENV)
+#if defined(AFS_HPUX110_ENV)
if (!uniprocessor)
rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER - 10, "rx_sleepLock");
-#endif /* KERNEL && AFS_HPUX110_ENV */
-#else /* RX_ENABLE_LOCKS */
-#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV) && !defined(AFS_OBSD_ENV)
- mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
-#endif /* AFS_GLOBAL_SUNLOCK */
-#endif /* RX_ENABLE_LOCKS */
+#endif /* AFS_HPUX110_ENV */
+#endif /* RX_ENABLE_LOCKS && KERNEL */
rxi_nCalls = 0;
rx_connDeadTime = 12;
rx_tranquil = 0; /* reset flag */
- memset((char *)&rx_stats, 0, sizeof(struct rx_stats));
+ memset((char *)&rx_stats, 0, sizeof(struct rx_statistics));
htable = (char *)
osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *)); /* XXXXX */
/* Malloc up a bunch of packets & buffers */
rx_nFreePackets = 0;
- rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
queue_Init(&rx_freePacketQueue);
rxi_NeedMorePackets = FALSE;
+#ifdef RX_ENABLE_TSFPQ
+ rx_nPackets = 0; /* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */
+ rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0);
+#else /* RX_ENABLE_TSFPQ */
+ rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
rxi_MorePackets(rx_nPackets);
+#endif /* RX_ENABLE_TSFPQ */
rx_CheckPackets();
NETPRI;
- AFS_RXGLOCK();
clock_Init();
rx_port = 0;
#else
struct sockaddr_in addr;
- int addrlen = sizeof(addr);
+#ifdef AFS_NT40_ENV
+ int addrlen = sizeof(addr);
+#else
+ socklen_t addrlen = sizeof(addr);
+#endif
if (getsockname((int)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
rx_Finalize();
return -1;
* implementation environment--kernel or user space) */
rxi_StartListener();
- AFS_RXGUNLOCK();
USERPRI;
tmp_status = rxinit_status = 0;
- UNLOCK_RX_INIT return tmp_status;
+ UNLOCK_RX_INIT;
+ return tmp_status;
}
-int rx_Init(u_int port)
+int
+rx_Init(u_int port)
{
return rx_InitHost(htonl(INADDR_ANY), port);
}
}
#endif /* KERNEL */
+#ifdef AFS_NT40_ENV
+/* This routine is only required on Windows */
+/* On pthread builds it merely queries pthread_self(); the returned id is
+ * discarded -- presumably this forces per-thread pthread initialization
+ * on Windows (TODO confirm).  On non-pthread builds it is a no-op. */
+void
+rx_StartClientThread(void)
+{
+#ifdef AFS_PTHREAD_ENV
+ pthread_t pid;
+ pid = pthread_self();
+#endif /* AFS_PTHREAD_ENV */
+}
+#endif /* AFS_NT40_ENV */
+
/* This routine must be called if any services are exported. If the
* donateMe flag is set, the calling process is donated to the server
* process pool */
rx_StartServer(int donateMe)
{
register struct rx_service *service;
- register int i, nProcs = 0;
+ register int i;
SPLVAR;
clock_NewTime();
NETPRI;
- AFS_RXGLOCK();
/* Start server processes, if necessary (exact function is dependent
* on the implementation environment--kernel or user space). DonateMe
* will be 1 if there is 1 pre-existing proc, i.e. this one. In this
}
/* Turn on reaping of idle server connections */
- rxi_ReapConnections();
+ rxi_ReapConnections(NULL, NULL, NULL);
- AFS_RXGUNLOCK();
USERPRI;
if (donateMe) {
#ifndef AFS_NT40_ENV
#ifndef KERNEL
char name[32];
+ static int nProcs;
#ifdef AFS_PTHREAD_ENV
pid_t pid;
pid = (pid_t) pthread_self();
(*registerProgram) (pid, name);
#endif /* KERNEL */
#endif /* AFS_NT40_ENV */
- rx_ServerProc(); /* Never returns */
+ rx_ServerProc(NULL); /* Never returns */
}
+#ifdef RX_ENABLE_TSFPQ
+ /* no use leaving packets around in this thread's local queue if
+ * it isn't getting donated to the server thread pool.
+ */
+ rxi_FlushLocalPacketsTSFPQ();
+#endif /* RX_ENABLE_TSFPQ */
return;
}
register struct rx_securityClass *securityObject,
int serviceSecurityIndex)
{
- int hashindex;
- afs_int32 cid;
- register struct rx_connection *conn;
+ int hashindex, i;
+ afs_int32 cid, cix, nclones;
+ register struct rx_connection *conn, *tconn, *ptconn;
SPLVAR;
clock_NewTime();
- dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", shost, sport, sservice, securityObject, serviceSecurityIndex));
+ dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", ntohl(shost), ntohs(sport), sservice, securityObject, serviceSecurityIndex));
+
+ conn = tconn = 0;
+ nclones = rx_max_clones_per_connection;
/* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
* the case of kmem_alloc? */
- conn = rxi_AllocConnection();
-#ifdef RX_ENABLE_LOCKS
- MUTEX_INIT(&conn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&conn->conn_data_lock, "conn call lock", MUTEX_DEFAULT, 0);
- CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
-#endif
+
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&rx_connHashTable_lock);
- cid = (rx_nextCid += RX_MAXCALLS);
- conn->type = RX_CLIENT_CONNECTION;
- conn->cid = cid;
- conn->epoch = rx_epoch;
- conn->peer = rxi_FindPeer(shost, sport, 0, 1);
- conn->serviceId = sservice;
- conn->securityObject = securityObject;
- /* This doesn't work in all compilers with void (they're buggy), so fake it
- * with VOID */
- conn->securityData = (VOID *) 0;
- conn->securityIndex = serviceSecurityIndex;
- rx_SetConnDeadTime(conn, rx_connDeadTime);
- conn->ackRate = RX_FAST_ACK_RATE;
- conn->nSpecific = 0;
- conn->specific = NULL;
- conn->challengeEvent = NULL;
- conn->delayedAbortEvent = NULL;
- conn->abortCount = 0;
- conn->error = 0;
-
- RXS_NewConnection(securityObject, conn);
- hashindex =
- CONN_HASH(shost, sport, conn->cid, conn->epoch, RX_CLIENT_CONNECTION);
-
- conn->refCount++; /* no lock required since only this thread knows... */
- conn->next = rx_connHashTable[hashindex];
- rx_connHashTable[hashindex] = conn;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nClientConns++;
- MUTEX_EXIT(&rx_stats_mutex);
+ /* send in the clones */
+ for(cix = 0; cix <= nclones; ++cix) {
+
+ ptconn = tconn;
+ tconn = rxi_AllocConnection();
+ tconn->type = RX_CLIENT_CONNECTION;
+ tconn->epoch = rx_epoch;
+ tconn->peer = rxi_FindPeer(shost, sport, 0, 1);
+ tconn->serviceId = sservice;
+ tconn->securityObject = securityObject;
+ tconn->securityData = (void *) 0;
+ tconn->securityIndex = serviceSecurityIndex;
+ tconn->ackRate = RX_FAST_ACK_RATE;
+ tconn->nSpecific = 0;
+ tconn->specific = NULL;
+ tconn->challengeEvent = NULL;
+ tconn->delayedAbortEvent = NULL;
+ tconn->abortCount = 0;
+ tconn->error = 0;
+ for (i = 0; i < RX_MAXCALLS; i++) {
+ tconn->twind[i] = rx_initSendWindow;
+ tconn->rwind[i] = rx_initReceiveWindow;
+ }
+ tconn->parent = 0;
+ tconn->next_clone = 0;
+ tconn->nclones = nclones;
+ rx_SetConnDeadTime(tconn, rx_connDeadTime);
+
+ if(cix == 0) {
+ conn = tconn;
+ } else {
+ tconn->flags |= RX_CLONED_CONNECTION;
+ tconn->parent = conn;
+ ptconn->next_clone = tconn;
+ }
+
+ /* generic connection setup */
+#ifdef RX_ENABLE_LOCKS
+ MUTEX_INIT(&tconn->conn_call_lock, "conn call lock", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&tconn->conn_data_lock, "conn data lock", MUTEX_DEFAULT, 0);
+ CV_INIT(&tconn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
+#endif
+ cid = (rx_nextCid += RX_MAXCALLS);
+ tconn->cid = cid;
+ RXS_NewConnection(securityObject, tconn);
+ hashindex =
+ CONN_HASH(shost, sport, tconn->cid, tconn->epoch,
+ RX_CLIENT_CONNECTION);
+ tconn->refCount++; /* no lock required since only this thread knows */
+ tconn->next = rx_connHashTable[hashindex];
+ rx_connHashTable[hashindex] = tconn;
+ rx_MutexIncrement(rx_stats.nClientConns, rx_stats_mutex);
+ }
+
MUTEX_EXIT(&rx_connHashTable_lock);
- AFS_RXGUNLOCK();
USERPRI;
return conn;
}
void
rx_SetConnDeadTime(register struct rx_connection *conn, register int seconds)
{
- /* The idea is to set the dead time to a value that allows several
- * keepalives to be dropped without timing out the connection. */
- conn->secondsUntilDead = MAX(seconds, 6);
- conn->secondsUntilPing = conn->secondsUntilDead / 6;
+ /* The idea is to set the dead time to a value that allows several
+ * keepalives to be dropped without timing out the connection. */
+ struct rx_connection *tconn;
+ tconn = conn;
+ do {
+ tconn->secondsUntilDead = MAX(seconds, 6);
+ tconn->secondsUntilPing = tconn->secondsUntilDead / 6;
+ } while(tconn->next_clone && (tconn = tconn->next_clone));
}
int rxi_lowPeerRefCount = 0;
* idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
*/
MUTEX_ENTER(&rx_peerHashTable_lock);
- if (--conn->peer->refCount <= 0) {
+ if (conn->peer->refCount < 2) {
conn->peer->idleWhen = clock_Sec();
- if (conn->peer->refCount < 0) {
- conn->peer->refCount = 0;
+ if (conn->peer->refCount < 1) {
+ conn->peer->refCount = 1;
MUTEX_ENTER(&rx_stats_mutex);
rxi_lowPeerRefCount++;
MUTEX_EXIT(&rx_stats_mutex);
}
}
+ conn->peer->refCount--;
MUTEX_EXIT(&rx_peerHashTable_lock);
- MUTEX_ENTER(&rx_stats_mutex);
if (conn->type == RX_SERVER_CONNECTION)
- rx_stats.nServerConns--;
+ rx_MutexDecrement(rx_stats.nServerConns, rx_stats_mutex);
else
- rx_stats.nClientConns--;
- MUTEX_EXIT(&rx_stats_mutex);
-
+ rx_MutexDecrement(rx_stats.nClientConns, rx_stats_mutex);
#ifndef KERNEL
if (conn->specific) {
int i;
void
rxi_DestroyConnection(register struct rx_connection *conn)
{
- MUTEX_ENTER(&rx_connHashTable_lock);
- rxi_DestroyConnectionNoLock(conn);
- /* conn should be at the head of the cleanup list */
- if (conn == rx_connCleanup_list) {
+ register struct rx_connection *tconn, *dtconn;
+
+ MUTEX_ENTER(&rx_connHashTable_lock);
+
+ if(!(conn->flags & RX_CLONED_CONNECTION)) {
+ tconn = conn->next_clone;
+ conn->next_clone = 0; /* once */
+ do {
+ if(tconn) {
+ dtconn = tconn;
+ tconn = tconn->next_clone;
+ rxi_DestroyConnectionNoLock(dtconn);
+ /* destroyed? */
+ if (dtconn == rx_connCleanup_list) {
+ rx_connCleanup_list = rx_connCleanup_list->next;
+ MUTEX_EXIT(&rx_connHashTable_lock);
+ /* rxi_CleanupConnection will free tconn */
+ rxi_CleanupConnection(dtconn);
+ MUTEX_ENTER(&rx_connHashTable_lock);
+ (conn->nclones)--;
+ }
+ }
+ } while(tconn);
+ }
+
+ rxi_DestroyConnectionNoLock(conn);
+ /* conn should be at the head of the cleanup list */
+ if (conn == rx_connCleanup_list) {
rx_connCleanup_list = rx_connCleanup_list->next;
MUTEX_EXIT(&rx_connHashTable_lock);
rxi_CleanupConnection(conn);
- }
+ }
#ifdef RX_ENABLE_LOCKS
- else {
+ else {
MUTEX_EXIT(&rx_connHashTable_lock);
- }
+ }
#endif /* RX_ENABLE_LOCKS */
}
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
rxi_DestroyConnection(conn);
- AFS_RXGUNLOCK();
USERPRI;
}
+/* Take an additional reference on a connection so it cannot be
+ * reclaimed while the caller is using it.  The refCount bump is done
+ * under conn_data_lock, bracketed by NETPRI/USERPRI as elsewhere in
+ * this file.  Callers release the reference via the usual destroy
+ * path. */
+void
+rx_GetConnection(register struct rx_connection *conn)
+{
+ SPLVAR;
+
+ NETPRI;
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->refCount++;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ USERPRI;
+}
+
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+/* Wait for the transmit queue to no longer be busy.
+ * requires the call->lock to be held */
+/* Each waiter sets RX_CALL_TQ_WAIT and counts itself in tqWaiters;
+ * it sleeps on cv_tq (dropping call->lock) or on the tq address,
+ * rechecking RX_CALL_TQ_BUSY on wakeup. */
+static void rxi_WaitforTQBusy(struct rx_call *call) {
+ while (call->flags & RX_CALL_TQ_BUSY) {
+ call->flags |= RX_CALL_TQ_WAIT;
+ call->tqWaiters++;
+#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
+ CV_WAIT(&call->cv_tq, &call->lock);
+#else /* RX_ENABLE_LOCKS */
+ osi_rxSleep(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ /* last waiter out clears the wait flag */
+ if (call->tqWaiters == 0) {
+ call->flags &= ~RX_CALL_TQ_WAIT;
+ }
+ }
+}
+#endif
+
/* Start a new rx remote procedure call, on the specified connection.
* If wait is set to 1, wait for a free call channel; otherwise return
* 0. Maxtime gives the maximum number of seconds this call may take,
- * after rx_MakeCall returns. After this time interval, a call to any
+ * after rx_NewCall returns. After this time interval, a call to any
* of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
* For fine grain locking, we hold the conn_call_lock in order to
* to ensure that we don't get signalle after we found a call in an active
{
register int i;
register struct rx_call *call;
+ register struct rx_connection *tconn;
struct clock queueTime;
SPLVAR;
clock_NewTime();
- dpf(("rx_MakeCall(conn %x)\n", conn));
+ dpf(("rx_NewCall(conn %x)\n", conn));
NETPRI;
clock_GetTime(&queueTime);
- AFS_RXGLOCK();
MUTEX_ENTER(&conn->conn_call_lock);
/*
* If so, let them go first to avoid starving them.
* This is a fairly simple scheme, and might not be
* a complete solution for large numbers of waiters.
+ *
+ * makeCallWaiters keeps track of the number of
+ * threads waiting to make calls and the
+ * RX_CONN_MAKECALL_WAITING flag bit is used to
+ * indicate that there are indeed calls waiting.
+ * The flag is set when the waiter is incremented.
+ * It is only cleared in rx_EndCall when
+ * makeCallWaiters is 0. This prevents us from
+ * accidentally destroying the connection while it
+ * is potentially about to be used.
*/
+ MUTEX_ENTER(&conn->conn_data_lock);
if (conn->makeCallWaiters) {
+ conn->flags |= RX_CONN_MAKECALL_WAITING;
+ conn->makeCallWaiters++;
+ MUTEX_EXIT(&conn->conn_data_lock);
+
#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
+ CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
- osi_rxSleep(conn);
+ osi_rxSleep(conn);
#endif
- }
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->makeCallWaiters--;
+ }
+ MUTEX_EXIT(&conn->conn_data_lock);
+ /* search for next free call on this connection or
+ * its clones, if any */
for (;;) {
- for (i = 0; i < RX_MAXCALLS; i++) {
- call = conn->call[i];
- if (call) {
- MUTEX_ENTER(&call->lock);
- if (call->state == RX_STATE_DALLY) {
- rxi_ResetCall(call, 0);
- (*call->callNumber)++;
- break;
+ tconn = conn;
+ do {
+ for (i = 0; i < RX_MAXCALLS; i++) {
+ call = tconn->call[i];
+ if (call) {
+ MUTEX_ENTER(&call->lock);
+ if (call->state == RX_STATE_DALLY) {
+ rxi_ResetCall(call, 0);
+ (*call->callNumber)++;
+ goto f_call;
+ }
+ MUTEX_EXIT(&call->lock);
+ } else {
+ call = rxi_NewCall(tconn, i);
+ goto f_call;
+ }
+ } /* for i < RX_MAXCALLS */
+ } while (tconn->next_clone && (tconn = tconn->next_clone));
+
+ f_call:
+
+ if (i < RX_MAXCALLS) {
+ break;
}
- MUTEX_EXIT(&call->lock);
- } else {
- call = rxi_NewCall(conn, i);
- break;
- }
- }
- if (i < RX_MAXCALLS) {
- break;
- }
- MUTEX_ENTER(&conn->conn_data_lock);
- conn->flags |= RX_CONN_MAKECALL_WAITING;
- MUTEX_EXIT(&conn->conn_data_lock);
- conn->makeCallWaiters++;
+ /* to be here, all available calls for this connection (and all
+ * its clones) must be in use */
+
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->flags |= RX_CONN_MAKECALL_WAITING;
+ conn->makeCallWaiters++;
+ MUTEX_EXIT(&conn->conn_data_lock);
+
#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
+ CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
#else
- osi_rxSleep(conn);
+ osi_rxSleep(conn);
#endif
- conn->makeCallWaiters--;
- }
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->makeCallWaiters--;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ } /* for ;; */
/*
* Wake up anyone else who might be giving us a chance to
* run (see code above that avoids resource starvation).
/* Client is initially in send mode */
call->state = RX_STATE_ACTIVE;
- call->mode = RX_MODE_SENDING;
-
+ call->error = conn->error;
+ if (call->error)
+ call->mode = RX_MODE_ERROR;
+ else
+ call->mode = RX_MODE_SENDING;
+
/* remember start time for call in case we have hard dead time limit */
call->queueTime = queueTime;
clock_GetTime(&call->startTime);
MUTEX_EXIT(&call->lock);
MUTEX_EXIT(&conn->conn_call_lock);
- AFS_RXGUNLOCK();
USERPRI;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* Now, if TQ wasn't cleared earlier, do it now. */
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WaitforTQBusy(call);
if (call->flags & RX_CALL_TQ_CLEARME) {
- rxi_ClearTransmitQueue(call, 0);
+ rxi_ClearTransmitQueue(call, 1);
queue_Init(&call->tq);
}
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ dpf(("rx_NewCall(call %x)\n", call));
return call;
}
service name might be used for probing for
statistics) */
struct rx_service *
-rx_NewService(u_short port, u_short serviceId, char *serviceName,
- struct rx_securityClass **securityObjects, int nSecurityObjects,
- afs_int32(*serviceProc) (struct rx_call * acall))
+rx_NewServiceHost(afs_uint32 host, u_short port, u_short serviceId,
+ char *serviceName, struct rx_securityClass **securityObjects,
+ int nSecurityObjects,
+ afs_int32(*serviceProc) (struct rx_call * acall))
{
osi_socket socket = OSI_NULLSOCKET;
register struct rx_service *tservice;
tservice = rxi_AllocService();
NETPRI;
- AFS_RXGLOCK();
for (i = 0; i < RX_MAX_SERVICES; i++) {
register struct rx_service *service = rx_services[i];
if (service) {
- if (port == service->servicePort) {
+ if (port == service->servicePort && host == service->serviceHost) {
if (service->serviceId == serviceId) {
/* The identical service has already been
* installed; if the caller was intending to
(osi_Msg
"rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n",
serviceName, serviceId, service->serviceName);
- AFS_RXGUNLOCK();
USERPRI;
rxi_FreeService(tservice);
return service;
* service on same port) get a new one */
socket = rxi_GetHostUDPSocket(htonl(INADDR_ANY), port);
if (socket == OSI_NULLSOCKET) {
- AFS_RXGUNLOCK();
USERPRI;
rxi_FreeService(tservice);
return 0;
}
service = tservice;
service->socket = socket;
+ service->serviceHost = host;
service->servicePort = port;
service->serviceId = serviceId;
service->serviceName = serviceName;
service->minProcs = 0;
service->maxProcs = 1;
service->idleDeadTime = 60;
+ service->idleDeadErr = 0;
service->connDeadTime = rx_connDeadTime;
service->executeRequestProc = serviceProc;
service->checkReach = 0;
rx_services[i] = service; /* not visible until now */
- AFS_RXGUNLOCK();
USERPRI;
return service;
}
}
- AFS_RXGUNLOCK();
USERPRI;
rxi_FreeService(tservice);
(osi_Msg "rx_NewService: cannot support > %d services\n",
return 0;
}
+/* Set configuration options for all of a service's security objects */
+/* Applies (type, value) to every non-NULL entry in securityObjects via
+ * RXS_SetConfiguration.  Per-object results are ignored; the function
+ * always returns 0. */
+
+afs_int32
+rx_SetSecurityConfiguration(struct rx_service *service,
+ rx_securityConfigVariables type,
+ void *value)
+{
+ int i;
+ for (i = 0; i<service->nSecurityObjects; i++) {
+ if (service->securityObjects[i]) {
+ RXS_SetConfiguration(service->securityObjects[i], NULL, type,
+ value, NULL);
+ }
+ }
+ return 0;
+}
+
+/* Backward-compatible wrapper: install a service bound to all local
+ * interfaces (INADDR_ANY) by delegating to rx_NewServiceHost. */
+struct rx_service *
+rx_NewService(u_short port, u_short serviceId, char *serviceName,
+ struct rx_securityClass **securityObjects, int nSecurityObjects,
+ afs_int32(*serviceProc) (struct rx_call * acall))
+{
+ return rx_NewServiceHost(htonl(INADDR_ANY), port, serviceId, serviceName, securityObjects, nSecurityObjects, serviceProc);
+}
+
/* Generic request processing loop. This routine should be called
* by the implementation dependent rx_ServerProc. If socketp is
* non-null, it will be set to the file descriptor that this thread
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
rxi_CallError(call, RX_RESTARTING);
rxi_SendCallAbort(call, (struct rx_packet *)0, 0, 0);
MUTEX_EXIT(&call->lock);
- AFS_RXGUNLOCK();
USERPRI;
}
#ifdef KERNEL
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&rx_serverPool_lock);
#ifdef RX_ENABLE_LOCKS
#endif /* RX_ENABLE_LOCKS */
}
MUTEX_EXIT(&rx_serverPool_lock);
- AFS_RXGUNLOCK();
USERPRI;
}
SPLVAR;
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&freeSQEList_lock);
if ((sq = rx_FreeSQEList)) {
osi_rxSleep(sq);
#ifdef KERNEL
if (afs_termState == AFSOP_STOP_RXCALLBACK) {
- AFS_RXGUNLOCK();
USERPRI;
rxi_Free(sq, sizeof(struct rx_serverQueueEntry));
return (struct rx_call *)0;
dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
}
- AFS_RXGUNLOCK();
USERPRI;
return call;
*/
void
rx_SetArrivalProc(register struct rx_call *call,
- register VOID(*proc) (register struct rx_call * call,
- register struct multi_handle * mh,
+ register void (*proc) (register struct rx_call * call,
+ register void * mh,
register int index),
- register VOID * handle, register VOID * arg)
+ register void * handle, register int arg)
{
call->arrivalProc = proc;
call->arrivalProcHandle = handle;
{
register struct rx_connection *conn = call->conn;
register struct rx_service *service;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
afs_int32 error;
SPLVAR;
- dpf(("rx_EndCall(call %x)\n", call));
+
+
+ dpf(("rx_EndCall(call %x rc %d error %d abortCode %d)\n", call, rc, call->error, call->abortCode));
NETPRI;
- AFS_RXGLOCK();
MUTEX_ENTER(&call->lock);
if (rc == 0 && call->error == 0) {
call->abortCount = 0;
}
- call->arrivalProc = (VOID(*)())0;
+ call->arrivalProc = (void (*)())0;
if (rc && call->error == 0) {
rxi_CallError(call, rc);
/* Send an abort message to the peer if this error code has
* rx_NewCall is in a stable state. Otherwise, rx_NewCall may
* have checked this call, found it active and by the time it
* goes to sleep, will have missed the signal.
+ *
+ * Do not clear the RX_CONN_MAKECALL_WAITING flag as long as
+ * there are threads waiting to use the conn object.
*/
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_call_lock);
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags |= RX_CONN_BUSY;
if (conn->flags & RX_CONN_MAKECALL_WAITING) {
- conn->flags &= (~RX_CONN_MAKECALL_WAITING);
+ if (conn->makeCallWaiters == 0)
+ conn->flags &= (~RX_CONN_MAKECALL_WAITING);
MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&conn->conn_call_cv);
* kernel version, and may interrupt the macros rx_Read or
* rx_Write, which run at normal priority for efficiency. */
if (call->currentPacket) {
+ call->currentPacket->flags &= ~RX_PKTFLAG_CP;
rxi_FreePacket(call->currentPacket);
call->currentPacket = (struct rx_packet *)0;
- call->nLeft = call->nFree = call->curlen = 0;
- } else
- call->nLeft = call->nFree = call->curlen = 0;
+ }
+
+ call->nLeft = call->nFree = call->curlen = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
- for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
- queue_Remove(tp);
- rxi_FreePacket(tp);
- }
+ rxi_FreePackets(0, &call->iovq);
CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
MUTEX_EXIT(&call->lock);
MUTEX_EXIT(&conn->conn_call_lock);
conn->flags &= ~RX_CONN_BUSY;
}
- AFS_RXGUNLOCK();
USERPRI;
/*
* Map errors to the local host's errno.h format.
{
register struct rx_connection **conn_ptr, **conn_end;
- INIT_PTHREAD_LOCKS LOCK_RX_INIT if (rxinit_status == 1) {
- UNLOCK_RX_INIT return; /* Already shutdown. */
+ INIT_PTHREAD_LOCKS;
+ LOCK_RX_INIT;
+ if (rxinit_status == 1) {
+ UNLOCK_RX_INIT;
+ return; /* Already shutdown. */
}
rxi_DeleteCachedConnections();
if (rx_connHashTable) {
}
rxi_flushtrace();
+#ifdef AFS_NT40_ENV
+ afs_winsockCleanup();
+#endif
+
rxinit_status = 1;
-UNLOCK_RX_INIT}
+ UNLOCK_RX_INIT;
+}
#endif
/* if we wakeup packet waiter too often, can get in loop with two
register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ dpf(("rxi_NewCall(conn %x, channel %d)\n", conn, channel));
+
/* Grab an existing call structure, or allocate a new one.
* Existing call structures are assumed to have been left reset by
* rxi_FreeCall */
call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
queue_Remove(call);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nFreeCallStructs--;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexDecrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
MUTEX_EXIT(&rx_freeCallQueue_lock);
MUTEX_ENTER(&call->lock);
CLEAR_CALL_QUEUE_LOCK(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* Now, if TQ wasn't cleared earlier, do it now. */
if (call->flags & RX_CALL_TQ_CLEARME) {
- rxi_ClearTransmitQueue(call, 0);
+ rxi_ClearTransmitQueue(call, 1);
queue_Init(&call->tq);
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->conn = conn;
rxi_ResetCall(call, 1);
} else {
+
call = (struct rx_call *)rxi_Alloc(sizeof(struct rx_call));
MUTEX_EXIT(&rx_freeCallQueue_lock);
CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nCallStructs++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
/* Initialize once-only items */
queue_Init(&call->tq);
queue_Init(&call->rq);
}
call->channel = channel;
call->callNumber = &conn->callNumber[channel];
+ call->rwind = conn->rwind[channel];
+ call->twind = conn->twind[channel];
/* Note that the next expected call number is retained (in
* conn->callNumber[i]), even if we reallocate the call structure
*/
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
queue_Append(&rx_freeCallQueue, call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nFreeCallStructs++;
- MUTEX_EXIT(&rx_stats_mutex);
-
+ rx_MutexIncrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
MUTEX_EXIT(&rx_freeCallQueue_lock);
/* Destroy the connection if it was previously slated for
* If someone else destroys a connection, they either have no
* call lock held or are going through this section of code.
*/
- if (conn->flags & RX_CONN_DESTROY_ME) {
+ if (conn->flags & RX_CONN_DESTROY_ME && !(conn->flags & RX_CONN_MAKECALL_WAITING)) {
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount++;
MUTEX_EXIT(&conn->conn_data_lock);
{
register char *p;
-#if defined(AFS_AIX41_ENV) && defined(KERNEL)
- /* Grab the AFS filesystem lock. See afs/osi.h for the lock
- * implementation.
- */
- int glockOwner = ISAFS_GLOCK();
- if (!glockOwner)
- AFS_GLOCK();
-#endif
- MUTEX_ENTER(&rx_stats_mutex);
- rxi_Alloccnt++;
- rxi_Allocsize += size;
- MUTEX_EXIT(&rx_stats_mutex);
-#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
- if (size > AFS_SMALLOCSIZ) {
- p = (char *)osi_AllocMediumSpace(size);
- } else
- p = (char *)osi_AllocSmall(size, 1);
-#if defined(AFS_AIX41_ENV) && defined(KERNEL)
- if (!glockOwner)
- AFS_GUNLOCK();
-#endif
+ rx_MutexAdd1Increment2(rxi_Allocsize, (afs_int32)size, rxi_Alloccnt, rx_stats_mutex);
+
+p = (char *)
+#if defined(KERNEL) && !defined(UKERNEL) && defined(AFS_FBSD80_ENV)
+ afs_osi_Alloc_NoSleep(size);
#else
- p = (char *)osi_Alloc(size);
+ osi_Alloc(size);
#endif
if (!p)
osi_Panic("rxi_Alloc error");
void
rxi_Free(void *addr, register size_t size)
{
-#if defined(AFS_AIX41_ENV) && defined(KERNEL)
- /* Grab the AFS filesystem lock. See afs/osi.h for the lock
- * implementation.
- */
- int glockOwner = ISAFS_GLOCK();
- if (!glockOwner)
- AFS_GLOCK();
-#endif
- MUTEX_ENTER(&rx_stats_mutex);
- rxi_Alloccnt--;
- rxi_Allocsize -= size;
- MUTEX_EXIT(&rx_stats_mutex);
-#if (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
- if (size > AFS_SMALLOCSIZ)
- osi_FreeMediumSpace(addr);
- else
- osi_FreeSmall(addr);
-#if defined(AFS_AIX41_ENV) && defined(KERNEL)
- if (!glockOwner)
- AFS_GUNLOCK();
-#endif
-#else
+ /* Same bookkeeping the removed open-coded block performed: subtract
+ * the freed size from rxi_Allocsize and decrement rxi_Alloccnt, both
+ * under rx_stats_mutex via the rx_MutexAdd1Decrement2 helper.  The
+ * AIX/HPUX small/medium allocator split is dropped here; osi_Free is
+ * now used unconditionally — NOTE(review): confirm osi_Free handles
+ * those platforms in this tree. */
+ rx_MutexAdd1Decrement2(rxi_Allocsize, -(afs_int32)size, rxi_Alloccnt, rx_stats_mutex);
osi_Free(addr, size);
-#endif
+}
+
+/*
+ * rxi_SetPeerMtu: clamp the interface MTU recorded for Rx peers at a
+ * given host.  With port == 0, every bucket of the peer hash table is
+ * walked and all peers whose host matches are updated; otherwise only
+ * the exact (host, port) entry's hash chain is searched.  In both
+ * cases ifMTU is lowered to MIN(mtu, ifMTU) and natMTU is re-derived
+ * with rxi_AdjustIfMTU, under the per-peer lock.  The function takes
+ * rx_peerHashTable_lock internally, so the caller must not already
+ * hold it.
+ */
+void
+rxi_SetPeerMtu(register afs_uint32 host, register afs_uint32 port, int mtu)
+{
+ struct rx_peer **peer_ptr, **peer_end;
+ int hashIndex;
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ if (port == 0) {
+ /* Wildcard port: scan the entire table for peers at this host. */
+ for (peer_ptr = &rx_peerHashTable[0], peer_end =
+ &rx_peerHashTable[rx_hashTableSize]; peer_ptr < peer_end;
+ peer_ptr++) {
+ struct rx_peer *peer, *next;
+ for (peer = *peer_ptr; peer; peer = next) {
+ next = peer->next;
+ if (host == peer->host) {
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->ifMTU=MIN(mtu, peer->ifMTU);
+ peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
+ MUTEX_EXIT(&peer->peer_lock);
+ }
+ }
+ }
+ } else {
+ /* Exact (host, port): only one hash chain needs searching. */
+ struct rx_peer *peer;
+ hashIndex = PEER_HASH(host, port);
+ for (peer = rx_peerHashTable[hashIndex]; peer; peer = peer->next) {
+ if ((peer->host == host) && (peer->port == port)) {
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->ifMTU=MIN(mtu, peer->ifMTU);
+ peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
+ MUTEX_EXIT(&peer->peer_lock);
+ }
+ }
+ }
+ MUTEX_EXIT(&rx_peerHashTable_lock);
}
/* Find the peer process represented by the supplied (host,port)
* structure hanging off a connection structure */
struct rx_peer *
rxi_FindPeer(register afs_uint32 host, register u_short port,
- struct rx_peer *origPeer, int create)
+ struct rx_peer *origPeer, int create)
{
register struct rx_peer *pp;
int hashIndex;
MUTEX_ENTER(&rx_peerHashTable_lock);
for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
if ((pp->host == host) && (pp->port == port))
- break;
+ break;
}
if (!pp) {
- if (create) {
- pp = rxi_AllocPeer(); /* This bzero's *pp */
- pp->host = host; /* set here or in InitPeerParams is zero */
+ if (create) {
+ pp = rxi_AllocPeer(); /* This bzero's *pp */
+ pp->host = host; /* set here or in InitPeerParams is zero */
pp->port = port;
MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
queue_Init(&pp->congestionQueue);
pp->next = rx_peerHashTable[hashIndex];
rx_peerHashTable[hashIndex] = pp;
rxi_InitPeerParams(pp);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nPeerStructs++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.nPeerStructs, rx_stats_mutex);
}
}
if (pp && create) {
register u_short port, u_short serviceId, afs_uint32 cid,
afs_uint32 epoch, int type, u_int securityIndex)
{
- int hashindex, flag;
+ int hashindex, flag, i;
register struct rx_connection *conn;
hashindex = CONN_HASH(host, port, cid, epoch, type);
MUTEX_ENTER(&rx_connHashTable_lock);
if (type == RX_CLIENT_CONNECTION && pp->port == port)
break;
/* So what happens when it's a callback connection? */
- if (/*type == RX_CLIENT_CONNECTION &&*/ (conn->epoch & 0x80000000))
+ if ( /*type == RX_CLIENT_CONNECTION && */
+ (conn->epoch & 0x80000000))
break;
}
if (!flag) {
conn->specific = NULL;
rx_SetConnDeadTime(conn, service->connDeadTime);
rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
+ rx_SetServerConnIdleDeadErr(conn, service->idleDeadErr);
+ for (i = 0; i < RX_MAXCALLS; i++) {
+ conn->twind[i] = rx_initSendWindow;
+ conn->rwind[i] = rx_initReceiveWindow;
+ }
/* Notify security object of the new connection */
RXS_NewConnection(conn->securityObject, conn);
/* XXXX Connection timeout? */
if (service->newConnProc)
(*service->newConnProc) (conn);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nServerConns++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.nServerConns, rx_stats_mutex);
}
MUTEX_ENTER(&conn->conn_data_lock);
* containing the network address. Both can be modified. The return value, if
* non-zero, indicates that the packet should be dropped. */
-int (*rx_justReceived) () = 0;
-int (*rx_almostSent) () = 0;
+int (*rx_justReceived) (struct rx_packet *, struct sockaddr_in *) = 0;
+int (*rx_almostSent) (struct rx_packet *, struct sockaddr_in *) = 0;
/* A packet has been received off the interface. Np is the packet, socket is
* the socket number it was received from (useful in determining which service
packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
- np->header.serial, packetType, host, port, np->header.serviceId,
+ np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
np->header.epoch, np->header.cid, np->header.callNumber,
np->header.seq, np->header.flags, np));
#endif
/* Check for connection-only requests (i.e. not call specific). */
if (np->header.callNumber == 0) {
switch (np->header.type) {
- case RX_PACKET_TYPE_ABORT:
+ case RX_PACKET_TYPE_ABORT: {
/* What if the supplied error is zero? */
- rxi_ConnectionError(conn, ntohl(rx_GetInt32(np, 0)));
+ afs_int32 errcode = ntohl(rx_GetInt32(np, 0));
+ dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d", errcode));
+ rxi_ConnectionError(conn, errcode);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
MUTEX_EXIT(&conn->conn_data_lock);
return np;
+ }
case RX_PACKET_TYPE_CHALLENGE:
tnp = rxi_ReceiveChallengePacket(conn, np, 1);
MUTEX_ENTER(&conn->conn_data_lock);
* then, since this is a client connection we're getting data for
* it must be for the previous call.
*/
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.spuriousPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
MUTEX_EXIT(&conn->conn_data_lock);
if (type == RX_SERVER_CONNECTION) { /* We're the server */
if (np->header.callNumber < currentCallNumber) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.spuriousPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
if (call)
MUTEX_EXIT(&call->lock);
call = rxi_NewCall(conn, channel);
MUTEX_EXIT(&conn->conn_call_lock);
*call->callNumber = np->header.callNumber;
+ if (np->header.callNumber == 0)
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
hzero(call->bytesSent);
hzero(call->bytesRcvd);
+ /*
+ * If the number of queued calls exceeds the overload
+ * threshold then abort this call.
+ */
+ if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
+ struct rx_packet *tp;
+
+ rxi_CallError(call, rx_BusyError);
+ tp = rxi_SendCallAbort(call, np, 1, 0);
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->refCount--;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ rx_MutexIncrement(rx_stats.nBusies, rx_stats_mutex);
+ return tp;
+ }
rxi_KeepAliveOn(call);
} else if (np->header.callNumber != currentCallNumber) {
/* Wait until the transmit queue is idle before deciding
while ((call->state == RX_STATE_ACTIVE)
&& (call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_WAIT;
+ call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start lock3");
CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ if (call->tqWaiters == 0)
+ call->flags &= ~RX_CALL_TQ_WAIT;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* If the new call cannot be taken right now send a busy and set
}
rxi_ResetCall(call, 0);
*call->callNumber = np->header.callNumber;
+ if (np->header.callNumber == 0)
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
hzero(call->bytesSent);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
MUTEX_EXIT(&conn->conn_data_lock);
+ rx_MutexIncrement(rx_stats.nBusies, rx_stats_mutex);
return tp;
}
rxi_KeepAliveOn(call);
/* Ignore all incoming acknowledgements for calls in DALLY state */
if (call && (call->state == RX_STATE_DALLY)
&& (np->header.type == RX_PACKET_TYPE_ACK)) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.ignorePacketDally++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.ignorePacketDally, rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
if (call) {
MUTEX_EXIT(&call->lock);
/* Ignore anything that's not relevant to the current call. If there
* isn't a current call, then no packet is relevant. */
if (!call || (np->header.callNumber != currentCallNumber)) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.spuriousPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
if (call) {
MUTEX_EXIT(&call->lock);
* XXX interact badly with the server-restart detection
* XXX code in receiveackpacket. */
if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.spuriousPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
}
np = rxi_ReceiveAckPacket(call, np, 1);
break;
- case RX_PACKET_TYPE_ABORT:
- /* An abort packet: reset the connection, passing the error up to
- * the user */
+ case RX_PACKET_TYPE_ABORT: {
+ /* An abort packet: reset the call, passing the error up to the user. */
/* What if error is zero? */
- rxi_CallError(call, ntohl(*(afs_int32 *) rx_DataOf(np)));
- break;
+ /* What if the error is -1? the application will treat it as a timeout. */
+ afs_int32 errdata = ntohl(*(afs_int32 *) rx_DataOf(np));
+ dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d", errdata));
+ rxi_CallError(call, errdata);
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->refCount--;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ return np; /* xmitting; drop packet */
+ }
case RX_PACKET_TYPE_BUSY:
/* XXXX */
break;
rxi_SetAcksInTransmitQueue(call);
break;
#else /* RX_ENABLE_LOCKS */
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
+ MUTEX_EXIT(&conn->conn_data_lock);
return np; /* xmitting; drop packet */
#endif /* RX_ENABLE_LOCKS */
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
rxi_ClearTransmitQueue(call, 0);
+ rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
break;
default:
/* Should not reach here, unless the peer is broken: send an abort
#endif /* KERNEL */
static void
-rxi_CheckReachEvent(struct rxevent *event, struct rx_connection *conn,
- struct rx_call *acall)
+rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2)
{
+ struct rx_connection *conn = arg1;
+ struct rx_call *acall = arg2;
struct rx_call *call = acall;
- struct clock when;
+ struct clock when, now;
int i, waiting;
MUTEX_ENTER(&conn->conn_data_lock);
if (call != acall)
MUTEX_EXIT(&call->lock);
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
when.sec += RX_CHECKREACH_TIMEOUT;
MUTEX_ENTER(&conn->conn_data_lock);
if (!conn->checkReachEvent) {
conn->refCount++;
conn->checkReachEvent =
- rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
+ rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn,
+ NULL);
}
MUTEX_EXIT(&conn->conn_data_lock);
}
afs_uint32 seq, serial, flags;
int isFirst;
struct rx_packet *tnp;
- struct clock when;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dataPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ struct clock when, now;
+ rx_MutexIncrement(rx_stats.dataPacketsRead, rx_stats_mutex);
#ifdef KERNEL
/* If there are no packet buffers, drop this new packet, unless we can find
MUTEX_ENTER(&rx_freePktQ_lock);
rxi_NeedMorePackets = TRUE;
MUTEX_EXIT(&rx_freePktQ_lock);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.noPacketBuffersOnRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.noPacketBuffersOnRead, rx_stats_mutex);
call->rprev = np->header.serial;
rxi_calltrace(RX_TRACE_DROP, call);
dpf(("packet %x dropped on receipt - quota problems", np));
if (rxi_doreclaim)
rxi_ClearReceiveQueue(call);
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
clock_Add(&when, &rx_softAckDelay);
if (!call->delayedAckEvent
|| clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent =
- rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
}
/* we've damaged this call already, might as well do it in. */
return np;
/* Check to make sure it is not a duplicate of one already queued */
if (queue_IsNotEmpty(&call->rq)
&& queue_First(&call->rq, rx_packet)->header.seq == seq) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dupPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
dpf(("packet %x dropped on receipt - duplicate", np));
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
/* It's the next packet. Stick it on the receive queue
* for this call. Set newPackets to make sure we wake
* the reader once all packets have been processed */
+ np->flags |= RX_PKTFLAG_RQ;
queue_Prepend(&call->rq, np);
call->nSoftAcks++;
np = NULL; /* We can't use this anymore */
* (e.g. multi rx) */
if (call->arrivalProc) {
(*call->arrivalProc) (call, call->arrivalProcHandle,
- (int)call->arrivalProcArg);
- call->arrivalProc = (VOID(*)())0;
+ call->arrivalProcArg);
+ call->arrivalProc = (void (*)())0;
}
/* Update last packet received */
/* If the new packet's sequence number has been sent to the
* application already, then this is a duplicate */
if (seq < call->rnext) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dupPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
0, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
/*Check for duplicate packet */
if (seq == tp->header.seq) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dupPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE,
/* We need to send an ack of the packet is out of sequence,
* or if an ack was requested by the peer. */
- if (seq != prev + 1 || missing || (flags & RX_REQUEST_ACK)) {
+ if (seq != prev + 1 || missing) {
ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
- }
+ } else if (flags & RX_REQUEST_ACK) {
+ ackNeeded = RX_ACK_REQUESTED;
+ }
/* Acknowledge the last packet for each call */
if (flags & RX_LAST_PACKET) {
rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
} else if (call->nSoftAcks) {
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
clock_Add(&when, &rx_lastAckDelay);
} else {
RX_CALL_REFCOUNT_DELAY);
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent =
- rxevent_Post(&when, rxi_SendDelayedAck, call, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
}
} else if (call->flags & RX_CALL_RECEIVE_DONE) {
rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
MUTEX_EXIT(&conn->conn_data_lock);
}
+/* Translate an RX_ACK_* reason code into a short human-readable label
+ * for the RXDEBUG packet-trace output (see the OutputDebugString
+ * RACK/SACK formatting below).  Unrecognized codes yield "unknown!!". */
+static const char *
+rx_ack_reason(int reason)
+{
+ switch (reason) {
+ case RX_ACK_REQUESTED:
+ return "requested";
+ case RX_ACK_DUPLICATE:
+ return "duplicate";
+ case RX_ACK_OUT_OF_SEQUENCE:
+ return "sequence";
+ case RX_ACK_EXCEEDS_WINDOW:
+ return "window";
+ case RX_ACK_NOSPACE:
+ return "nospace";
+ case RX_ACK_PING:
+ return "ping";
+ case RX_ACK_PING_RESPONSE:
+ return "response";
+ case RX_ACK_DELAY:
+ return "delay";
+ case RX_ACK_IDLE:
+ return "idle";
+ default:
+ return "unknown!!";
+ }
+}
+
+
/* rxi_ComputePeerNetStats
*
* Called exclusively by rxi_ReceiveAckPacket to compute network link
u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.ackPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.ackPacketsRead, rx_stats_mutex);
ap = (struct rx_ackPacket *)rx_DataOf(np);
- nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *) ap);
+ nbytes = rx_Contiguous(np) - (int)((ap->acks) - (u_char *) ap);
if (nbytes < 0)
return np; /* truncated ack packet */
rxi_UpdatePeerReach(conn, call);
#ifdef RXDEBUG
+#ifdef AFS_NT40_ENV
+ if (rxdebug_active) {
+ char msg[512];
+ size_t len;
+
+ len = _snprintf(msg, sizeof(msg),
+ "tid[%d] RACK: reason %s serial %u previous %u seq %u skew %d first %u acks %u space %u ",
+ GetCurrentThreadId(), rx_ack_reason(ap->reason),
+ ntohl(ap->serial), ntohl(ap->previousPacket),
+ (unsigned int)np->header.seq, (unsigned int)skew,
+ ntohl(ap->firstPacket), ap->nAcks, ntohs(ap->bufferSpace) );
+ if (nAcks) {
+ int offset;
+
+ for (offset = 0; offset < nAcks && len < sizeof(msg); offset++)
+ msg[len++] = (ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*');
+ }
+ msg[len++]='\n';
+ msg[len] = '\0';
+ OutputDebugString(msg);
+ }
+#else /* AFS_NT40_ENV */
if (rx_Log) {
fprintf(rx_Log,
"RACK: reason %x previous %u seq %u serial %u skew %d first %u",
}
putc('\n', rx_Log);
}
+#endif /* AFS_NT40_ENV */
#endif
/* Update the outgoing packet skew value to the latest value of
if (serial
&& (tp->header.serial == serial || tp->firstSerial == serial))
rxi_ComputePeerNetStats(call, tp, ap, np);
+ if (!(tp->flags & RX_PKTFLAG_ACKED)) {
+ newAckCount++;
+ }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* XXX Hack. Because we have to release the global rx lock when sending
* packets (osi_NetSend) we drop all acks while we're traversing the tq
* set the ack bits in the packets and have rxi_Start remove the packets
* when it's done transmitting.
*/
- if (!(tp->flags & RX_PKTFLAG_ACKED)) {
- newAckCount++;
- }
if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
tp->flags |= RX_PKTFLAG_ACKED;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
{
queue_Remove(tp);
+ tp->flags &= ~RX_PKTFLAG_TQ;
rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
}
}
} else {
call->nSoftAcked++;
}
- } else {
+ } else /* RX_ACK_TYPE_NACK */ {
tp->flags &= ~RX_PKTFLAG_ACKED;
missing = 1;
}
/* If the ack packet has a "recommended" size that is less than
* what I am using now, reduce my size to match */
rx_packetread(np, rx_AckDataSize(ap->nAcks) + sizeof(afs_int32),
- sizeof(afs_int32), &tSize);
+ (int)sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize);
peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
/* Get the maximum packet size to send to this peer */
- rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
+ rx_packetread(np, rx_AckDataSize(ap->nAcks), (int)sizeof(afs_int32),
&tSize);
tSize = (afs_uint32) ntohl(tSize);
tSize = (afs_uint32) MIN(tSize, rx_MyMaxSendSize);
* be unable to accept packets of the size that prior AFS versions would
* send without asking. */
if (peer->maxMTU != tSize) {
+ if (peer->maxMTU > tSize) /* possible cong., maxMTU decreased */
+ peer->congestSeq++;
peer->maxMTU = tSize;
peer->MTU = MIN(tSize, peer->MTU);
call->MTU = MIN(call->MTU, tSize);
- peer->congestSeq++;
}
if (np->length == rx_AckDataSize(ap->nAcks) + 3 * sizeof(afs_int32)) {
/* AFS 3.4a */
rx_packetread(np,
rx_AckDataSize(ap->nAcks) + 2 * sizeof(afs_int32),
- sizeof(afs_int32), &tSize);
+ (int)sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
if (tSize < call->twind) { /* smaller than our send */
call->twind = tSize; /* window, we must send less... */
call->ssthresh = MIN(call->twind, call->ssthresh);
+ call->conn->twind[call->channel] = call->twind;
}
/* Only send jumbograms to 3.4a fileservers. 3.3a RX gets the
*/
if (tSize < call->twind) {
call->twind = tSize;
+ call->conn->twind[call->channel] = call->twind;
call->ssthresh = MIN(call->twind, call->ssthresh);
} else if (tSize > call->twind) {
call->twind = tSize;
+ call->conn->twind[call->channel] = call->twind;
}
/*
sizeof(afs_int32), &tSize);
maxDgramPackets = (afs_uint32) ntohl(tSize);
maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
- maxDgramPackets =
- MIN(maxDgramPackets, (int)(peer->ifDgramPackets));
- maxDgramPackets = MIN(maxDgramPackets, tSize);
+ maxDgramPackets = MIN(maxDgramPackets, peer->ifDgramPackets);
+ if (peer->natMTU < peer->ifMTU)
+ maxDgramPackets = MIN(maxDgramPackets, rxi_AdjustDgramPackets(1, peer->natMTU));
if (maxDgramPackets > 1) {
peer->maxDgramPackets = maxDgramPackets;
call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
call->nNacks = nNacked;
}
} else {
- if (newAckCount) {
- call->nAcks++;
- }
+ call->nAcks += newAckCount;
call->nNacks = 0;
}
return np;
}
call->flags |= RX_CALL_FAST_RECOVER_WAIT;
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WaitforTQBusy(call);
MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
&& call->tfirst + call->nSoftAcked >= call->tnext) {
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
+ rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
} else if (!queue_IsEmpty(&call->tq)) {
- rxi_Start(0, call, istack);
+ rxi_Start(0, call, 0, istack);
}
return np;
}
call->flags |= RX_CALL_WAIT_PROC;
MUTEX_ENTER(&rx_stats_mutex);
rx_nWaiting++;
+ rx_nWaited++;
MUTEX_EXIT(&rx_stats_mutex);
rxi_calltrace(RX_CALL_ARRIVAL, call);
SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
}
void
-rxi_SendDelayedAck(struct rxevent *event, register struct rx_call *call,
- char *dummy)
+rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused)
{
+ struct rx_call *call = arg1;
#ifdef RX_ENABLE_LOCKS
if (event) {
MUTEX_ENTER(&call->lock);
int someAcked = 0;
for (queue_Scan(&call->tq, p, tp, rx_packet)) {
- if (!p)
- break;
p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
}
rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
- rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
call->tfirst = call->tnext;
call->nSoftAcked = 0;
void
rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
{
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
register struct rx_packet *p, *tp;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
int someAcked = 0;
for (queue_Scan(&call->tq, p, tp, rx_packet)) {
- if (!p)
- break;
p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
}
} else {
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- for (queue_Scan(&call->tq, p, tp, rx_packet)) {
- if (!p)
- break;
- queue_Remove(p);
- rxi_FreePacket(p);
- }
+ rxi_FreePackets(0, &call->tq);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
call->flags &= ~RX_CALL_TQ_CLEARME;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
- rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
call->nSoftAcked = 0;
void
rxi_ClearReceiveQueue(register struct rx_call *call)
{
- register struct rx_packet *p, *tp;
if (queue_IsNotEmpty(&call->rq)) {
- for (queue_Scan(&call->rq, p, tp, rx_packet)) {
- if (!p)
- break;
- queue_Remove(p);
- rxi_FreePacket(p);
- rx_packetReclaims++;
- }
+ rx_packetReclaims += rxi_FreePackets(0, &call->rq);
call->flags &= ~(RX_CALL_RECEIVE_DONE | RX_CALL_HAVE_LAST);
}
if (call->state == RX_STATE_PRECALL) {
int istack, int force)
{
afs_int32 error;
- struct clock when;
+ struct clock when, now;
if (!call->error)
return packet;
rxi_SendSpecial(call, call->conn, packet, RX_PACKET_TYPE_ABORT,
(char *)&error, sizeof(error), istack);
} else if (!call->delayedAbortEvent) {
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
clock_Addmsec(&when, rxi_callAbortDelay);
CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
call->delayedAbortEvent =
- rxevent_Post(&when, rxi_SendDelayedCallAbort, call, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedCallAbort, call, 0);
}
return packet;
}
struct rx_packet *packet, int istack, int force)
{
afs_int32 error;
- struct clock when;
+ struct clock when, now;
if (!conn->error)
return packet;
sizeof(error), istack);
MUTEX_ENTER(&conn->conn_data_lock);
} else if (!conn->delayedAbortEvent) {
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
clock_Addmsec(&when, rxi_connAbortDelay);
conn->delayedAbortEvent =
- rxevent_Post(&when, rxi_SendDelayedConnAbort, conn, 0);
+ rxevent_PostNow(&when, &now, rxi_SendDelayedConnAbort, conn, 0);
}
return packet;
}
{
if (error) {
register int i;
+
+ dpf(("rxi_ConnectionError conn %x error %d", conn, error));
+
MUTEX_ENTER(&conn->conn_data_lock);
if (conn->challengeEvent)
rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
}
}
conn->error = error;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.fatalErrors++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.fatalErrors, rx_stats_mutex);
}
}
void
rxi_CallError(register struct rx_call *call, afs_int32 error)
{
+ dpf(("rxi_CallError call %x error %d call->error %d", call, error, call->error));
if (call->error)
error = call->error;
-#ifdef RX_GLOBAL_RXLOCK_KERNEL
- if (!(call->flags & RX_CALL_TQ_BUSY)) {
+
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+ if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) {
rxi_ResetCall(call, 0);
}
#else
register struct rx_peer *peer;
struct rx_packet *packet;
+ dpf(("rxi_ResetCall(call %x, newcall %d)\n", call, newcall));
+
/* Notify anyone who is waiting for asynchronous packet arrival */
if (call->arrivalProc) {
(*call->arrivalProc) (call, call->arrivalProcHandle,
- (int)call->arrivalProcArg);
- call->arrivalProc = (VOID(*)())0;
+ call->arrivalProcArg);
+ call->arrivalProc = (void (*)())0;
}
if (call->delayedAbortEvent) {
MUTEX_EXIT(&peer->peer_lock);
flags = call->flags;
- rxi_ClearReceiveQueue(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (call->flags & RX_CALL_TQ_BUSY) {
+ if (flags & RX_CALL_TQ_BUSY) {
call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
call->flags |= (flags & RX_CALL_TQ_WAIT);
} else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
{
- rxi_ClearTransmitQueue(call, 0);
- queue_Init(&call->tq);
+ rxi_ClearTransmitQueue(call, 1);
+ /* why init the queue if you just emptied it? queue_Init(&call->tq); */
+ if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
+ dpf(("rcall %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+ }
call->flags = 0;
+ while (call->tqWaiters) {
+#ifdef RX_ENABLE_LOCKS
+ CV_BROADCAST(&call->cv_tq);
+#else /* RX_ENABLE_LOCKS */
+ osi_rxWakeup(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ }
+ }
+
+ rxi_ClearReceiveQueue(call);
+ /* why init the queue if you just emptied it? queue_Init(&call->rq); */
+
+ if (call->currentPacket) {
+ call->currentPacket->flags &= ~RX_PKTFLAG_CP;
+ rxi_FreePacket(call->currentPacket);
+ call->currentPacket = (struct rx_packet *)0;
}
- queue_Init(&call->rq);
+ call->curlen = call->nLeft = call->nFree = 0;
+
+ rxi_FreePackets(0, &call->iovq);
+
call->error = 0;
- call->rwind = rx_initReceiveWindow;
- call->twind = rx_initSendWindow;
+ call->twind = call->conn->twind[call->channel];
+ call->rwind = call->conn->rwind[call->channel];
call->nSoftAcked = 0;
call->nextCwind = 0;
call->nAcks = 0;
register struct rx_packet *p;
u_char offset;
afs_int32 templ;
+#ifdef RX_ENABLE_TSFPQ
+ struct rx_ts_info_t * rx_ts_info;
+#endif
/*
* Open the receive window once a thread starts reading packets
*/
if (call->rnext > 1) {
- call->rwind = rx_maxReceiveWindow;
+ call->conn->rwind[call->channel] = call->rwind = rx_maxReceiveWindow;
}
call->nHardAcks = 0;
if (p) {
rx_computelen(p, p->length); /* reset length, you never know */
} /* where that's been... */
+#ifdef RX_ENABLE_TSFPQ
+ else {
+ RX_TS_INFO_GET(rx_ts_info);
+ if ((p = rx_ts_info->local_special_packet)) {
+ rx_computelen(p, p->length);
+ } else if ((p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
+ rx_ts_info->local_special_packet = p;
+ } else { /* We won't send the ack, but don't panic. */
+ return optionalPacket;
+ }
+ }
+#else
else if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
/* We won't send the ack, but don't panic. */
return optionalPacket;
}
+#endif
templ =
rx_AckDataSize(call->rwind) + 4 * sizeof(afs_int32) -
rx_GetDataSize(p);
if (templ > 0) {
- if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
+ if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL) > 0) {
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
return optionalPacket;
}
templ = rx_AckDataSize(call->rwind) + 2 * sizeof(afs_int32);
if (rx_Contiguous(p) < templ) {
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
return optionalPacket;
}
}
for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
if (!rqp || !call->rq.next
|| (rqp->header.seq > (call->rnext + call->rwind))) {
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
rxi_CallError(call, RX_CALL_DEAD);
return optionalPacket;
}
ap->acks[offset++] = RX_ACK_TYPE_ACK;
if ((offset > (u_char) rx_maxReceiveWindow) || (offset > call->rwind)) {
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
rxi_CallError(call, RX_CALL_DEAD);
return optionalPacket;
}
p->header.flags |= RX_CLIENT_INITIATED;
#ifdef RXDEBUG
+#ifdef AFS_NT40_ENV
+ if (rxdebug_active) {
+ char msg[512];
+ size_t len;
+
+ len = _snprintf(msg, sizeof(msg),
+ "tid[%d] SACK: reason %s serial %u previous %u seq %u first %u acks %u space %u ",
+ GetCurrentThreadId(), rx_ack_reason(ap->reason),
+ ntohl(ap->serial), ntohl(ap->previousPacket),
+ (unsigned int)p->header.seq, ntohl(ap->firstPacket),
+ ap->nAcks, ntohs(ap->bufferSpace) );
+ if (ap->nAcks) {
+ int offset;
+
+ for (offset = 0; offset < ap->nAcks && len < sizeof(msg); offset++)
+ msg[len++] = (ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*');
+ }
+ msg[len++]='\n';
+ msg[len] = '\0';
+ OutputDebugString(msg);
+ }
+#else /* AFS_NT40_ENV */
if (rx_Log) {
- fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
+ fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u ",
ap->reason, ntohl(ap->previousPacket),
(unsigned int)p->header.seq, ntohl(ap->firstPacket));
if (ap->nAcks) {
}
putc('\n', rx_Log);
}
+#endif /* AFS_NT40_ENV */
#endif
-
{
register int i, nbytes = p->length;
nbytes -= p->wirevec[i].iov_len;
}
}
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.ackPacketsSent++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.ackPacketsSent, rx_stats_mutex);
+#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
+#endif
return optionalPacket; /* Return packet for re-use by caller */
}
peer->nSent += len;
if (resending)
peer->reSends += len;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dataPacketsSent += len;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dataPacketsSent, rx_stats_mutex);
MUTEX_EXIT(&peer->peer_lock);
if (list[len - 1]->header.flags & RX_LAST_PACKET) {
* packet until the congestion window reaches the ack rate. */
if (list[i]->header.serial) {
requestAck = 1;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dataPacketsReSent++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dataPacketsReSent, rx_stats_mutex);
} else {
/* improved RTO calculation- not Karn */
list[i]->firstSent = *now;
peer->nSent++;
if (resending)
peer->reSends++;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dataPacketsSent++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dataPacketsSent, rx_stats_mutex);
MUTEX_EXIT(&peer->peer_lock);
/* Tag this packet as not being the last in this group,
/* Update last send time for this call (for keep-alive
* processing), and for the connection (so that we can discover
* idle connections) */
- conn->lastSendTime = call->lastSendTime = clock_Sec();
+ call->lastSendData = conn->lastSendTime = call->lastSendTime = clock_Sec();
}
/* When sending packets we need to follow these rules:
#ifdef RX_ENABLE_LOCKS
/* Call rxi_Start, below, but with the call lock held. */
void
-rxi_StartUnlocked(struct rxevent *event, register struct rx_call *call,
- int istack)
+rxi_StartUnlocked(struct rxevent *event,
+ void *arg0, void *arg1, int istack)
{
+ struct rx_call *call = arg0;
+
MUTEX_ENTER(&call->lock);
- rxi_Start(event, call, istack);
+ rxi_Start(event, call, arg1, istack);
MUTEX_EXIT(&call->lock);
}
#endif /* RX_ENABLE_LOCKS */
* better optimized for new packets, the usual case, now that we've
* got rid of queues of send packets. XXXXXXXXXXX */
void
-rxi_Start(struct rxevent *event, register struct rx_call *call, int istack)
+rxi_Start(struct rxevent *event,
+ void *arg0, void *arg1, int istack)
{
+ struct rx_call *call = arg0;
+
struct rx_packet *p;
register struct rx_packet *nxp; /* Next pointer for queue_Scan */
struct rx_peer *peer = call->conn->peer;
- struct clock now, retryTime;
+ struct clock now, usenow, retryTime;
int haveEvent;
int nXmitPackets;
int maxXmitPackets;
return;
}
call->flags |= RX_CALL_FAST_RECOVER_WAIT;
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
call->flags |= RX_CALL_FAST_RECOVER;
}
if (call->error) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- MUTEX_ENTER(&rx_stats_mutex);
- rx_tq_debug.rxi_start_in_error++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_tq_debug.rxi_start_in_error, rx_stats_mutex);
#endif
return;
}
* in this burst. Note, if we back off, it's reasonable to
* back off all of the packets in the same manner, even if
* some of them have been retransmitted more times than more
- * recent additions */
- clock_GetTime(&now);
- retryTime = now; /* initialize before use */
+ * recent additions.
+ * Do a dance to avoid blocking after setting now. */
+ clock_Zero(&retryTime);
MUTEX_ENTER(&peer->peer_lock);
clock_Add(&retryTime, &peer->timeout);
MUTEX_EXIT(&peer->peer_lock);
-
+ clock_GetTime(&now);
+ clock_Add(&retryTime, &now);
+ usenow = now;
/* Send (or resend) any packets that need it, subject to
* window restrictions and congestion burst control
* restrictions. Ask for an ack on the last packet sent in
if (!(call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_BUSY;
do {
+#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ restart:
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
call->flags &= ~RX_CALL_NEED_START;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
nXmitPackets = 0;
maxXmitPackets = MIN(call->twind, call->cwind);
xmitList = (struct rx_packet **)
- osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
+#if defined(KERNEL) && !defined(UKERNEL) && defined(AFS_FBSD80_ENV)
+ /* XXXX else we must drop any mtx we hold */
+ afs_osi_Alloc_NoSleep(maxXmitPackets * sizeof(struct rx_packet *));
+#else
+ osi_Alloc(maxXmitPackets * sizeof(struct rx_packet *));
+#endif
if (xmitList == NULL)
osi_Panic("rxi_Start, failed to allocate xmit list");
for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
osi_Panic("rxi_Start: xmit queue clobbered");
}
if (p->flags & RX_PKTFLAG_ACKED) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.ignoreAckedPacket++;
- MUTEX_EXIT(&rx_stats_mutex);
+ /* Since we may block, don't trust this */
+ usenow.sec = usenow.usec = 0;
+ rx_MutexIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex);
continue; /* Ignore this packet if it has been acknowledged */
}
/* Transmit the packet if it needs to be sent. */
if (!clock_Lt(&now, &p->retryTime)) {
if (nXmitPackets == maxXmitPackets) {
- osi_Panic("rxi_Start: xmit list overflowed");
+ rxi_SendXmitList(call, xmitList, nXmitPackets,
+ istack, &now, &retryTime,
+ resending);
+ osi_Free(xmitList, maxXmitPackets *
+ sizeof(struct rx_packet *));
+ goto restart;
}
xmitList[nXmitPackets++] = p;
}
*/
if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->flags & RX_CALL_TQ_WAIT) {
- call->flags &= ~RX_CALL_TQ_WAIT;
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+	    dpf(("call %p has %d waiters and flags %d\n", (void *)call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start start");
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
* the time to reset the call. This will also inform the using
* process that the call is in an error state.
*/
- MUTEX_ENTER(&rx_stats_mutex);
- rx_tq_debug.rxi_start_aborted++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_tq_debug.rxi_start_aborted, rx_stats_mutex);
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->flags & RX_CALL_TQ_WAIT) {
- call->flags &= ~RX_CALL_TQ_WAIT;
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+	    dpf(("call %p has %d waiters and flags %d\n", (void *)call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start middle");
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
if (p->header.seq < call->tfirst
&& (p->flags & RX_PKTFLAG_ACKED)) {
queue_Remove(p);
+ p->flags &= ~RX_PKTFLAG_TQ;
rxi_FreePacket(p);
} else
missing = 1;
#ifdef RX_ENABLE_LOCKS
CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
call->resendEvent =
- rxevent_Post(&retryTime, rxi_StartUnlocked,
- (void *)call, (void *)istack);
+ rxevent_PostNow2(&retryTime, &usenow,
+ rxi_StartUnlocked,
+ (void *)call, 0, istack);
#else /* RX_ENABLE_LOCKS */
call->resendEvent =
- rxevent_Post(&retryTime, rxi_Start, (void *)call,
- (void *)istack);
+ rxevent_PostNow2(&retryTime, &usenow, rxi_Start,
+ (void *)call, 0, istack);
#endif /* RX_ENABLE_LOCKS */
}
}
* protected by the global lock.
*/
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->flags & RX_CALL_TQ_WAIT) {
- call->flags &= ~RX_CALL_TQ_WAIT;
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+	    dpf(("call %p has %d waiters and flags %d\n", (void *)call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start end");
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
* processing), and for the connection (so that we can discover
* idle connections) */
conn->lastSendTime = call->lastSendTime = clock_Sec();
+ /* Don't count keepalives here, so idleness can be tracked. */
+ if (p->header.type != RX_PACKET_TYPE_ACK)
+ call->lastSendData = call->lastSendTime;
}
afs_uint32 now;
afs_uint32 deadTime;
-#ifdef RX_GLOBAL_RXLOCK_KERNEL
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
if (call->flags & RX_CALL_TQ_BUSY) {
/* Call is active and will be reset by rxi_Start if it's
* in an error state.
* number of seconds. */
if (now > (call->lastReceiveTime + deadTime)) {
if (call->state == RX_STATE_ACTIVE) {
+#ifdef ADAPT_PMTU
+#if defined(KERNEL) && defined(AFS_SUN57_ENV)
+ ire_t *ire;
+#if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
+ netstack_t *ns = netstack_find_by_stackid(GLOBAL_NETSTACKID);
+ ip_stack_t *ipst = ns->netstack_ip;
+#endif
+ ire = ire_cache_lookup(call->conn->peer->host
+#if defined(AFS_SUN510_ENV) && defined(ALL_ZONES)
+ , ALL_ZONES
+#if defined(AFS_SUN510_ENV) && (defined(ICL_3_ARG) || defined(GLOBAL_NETSTACKID))
+ , NULL
+#if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
+ , ipst
+#endif
+#endif
+#endif
+ );
+
+ if (ire && ire->ire_max_frag > 0)
+ rxi_SetPeerMtu(call->conn->peer->host, 0, ire->ire_max_frag);
+#if defined(GLOBAL_NETSTACKID)
+ netstack_rele(ns);
+#endif
+#endif
+#endif /* ADAPT_PMTU */
rxi_CallError(call, RX_CALL_DEAD);
return -1;
} else {
return -1;
}
}
+ if (call->lastSendData && conn->idleDeadTime && (conn->idleDeadErr != 0)
+ && ((call->lastSendData + conn->idleDeadTime) < now)) {
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_CallError(call, conn->idleDeadErr);
+ return -1;
+ }
+ }
/* see if we have a hard timeout */
if (conn->hardDeadTime
&& (now > (conn->hardDeadTime + call->startTime.sec))) {
* keep-alive packet (if we're actually trying to keep the call alive)
*/
void
-rxi_KeepAliveEvent(struct rxevent *event, register struct rx_call *call,
- char *dummy)
+rxi_KeepAliveEvent(struct rxevent *event, void *arg1, void *dummy)
{
+ struct rx_call *call = arg1;
struct rx_connection *conn;
afs_uint32 now;
rxi_ScheduleKeepAliveEvent(register struct rx_call *call)
{
if (!call->keepAliveEvent) {
- struct clock when;
- clock_GetTime(&when);
+ struct clock when, now;
+ clock_GetTime(&now);
+ when = now;
when.sec += call->conn->secondsUntilPing;
CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
call->keepAliveEvent =
- rxevent_Post(&when, rxi_KeepAliveEvent, call, 0);
+ rxevent_PostNow(&when, &now, rxi_KeepAliveEvent, call, 0);
}
}
* that have been delayed to throttle looping clients. */
void
rxi_SendDelayedConnAbort(struct rxevent *event,
- register struct rx_connection *conn, char *dummy)
+ void *arg1, void *unused)
{
+ struct rx_connection *conn = arg1;
+
afs_int32 error;
struct rx_packet *packet;
/* This routine is called to send call abort messages
* that have been delayed to throttle looping clients. */
void
-rxi_SendDelayedCallAbort(struct rxevent *event, register struct rx_call *call,
- char *dummy)
+rxi_SendDelayedCallAbort(struct rxevent *event,
+ void *arg1, void *dummy)
{
+ struct rx_call *call = arg1;
+
afs_int32 error;
struct rx_packet *packet;
(char *)&error, sizeof(error), 0);
rxi_FreePacket(packet);
}
+ CALL_RELE(call, RX_CALL_REFCOUNT_ABORT);
MUTEX_EXIT(&call->lock);
}
* issues a challenge to the client, which is obtained from the
* security object associated with the connection */
void
-rxi_ChallengeEvent(struct rxevent *event, register struct rx_connection *conn,
- void *atries)
+rxi_ChallengeEvent(struct rxevent *event,
+ void *arg0, void *arg1, int tries)
{
- int tries = (int)atries;
+ struct rx_connection *conn = arg0;
+
conn->challengeEvent = NULL;
if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
register struct rx_packet *packet;
- struct clock when;
+ struct clock when, now;
if (tries <= 0) {
/* We've failed to authenticate for too long.
RX_PACKET_TYPE_CHALLENGE, NULL, -1, 0);
rxi_FreePacket(packet);
}
- clock_GetTime(&when);
+ clock_GetTime(&now);
+ when = now;
when.sec += RX_CHALLENGE_TIMEOUT;
conn->challengeEvent =
- rxevent_Post(&when, rxi_ChallengeEvent, conn,
- (void *)(tries - 1));
+ rxevent_PostNow2(&when, &now, rxi_ChallengeEvent, conn, 0,
+ (tries - 1));
}
}
{
if (!conn->challengeEvent) {
RXS_CreateChallenge(conn->securityObject, conn);
- rxi_ChallengeEvent(NULL, conn, (void *)RX_CHALLENGE_MAXTRIES);
+ rxi_ChallengeEvent(NULL, conn, 0, RX_CHALLENGE_MAXTRIES);
};
}
{
struct clock thisRtt, *rttp = &thisRtt;
-#if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
- /* making year 2038 bugs to get this running now - stroucki */
- struct timeval temptime;
-#endif
register int rtt_timeout;
-#if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
- /* yet again. This was the worst Heisenbug of the port - stroucki */
- clock_GetTime(&temptime);
- rttp->sec = (afs_int32) temptime.tv_sec;
- rttp->usec = (afs_int32) temptime.tv_usec;
-#else
clock_GetTime(rttp);
-#endif
+
if (clock_Lt(rttp, sentp)) {
clock_Zero(rttp);
return; /* somebody set the clock back, don't count this time. */
/* Find all server connections that have not been active for a long time, and
* toss them */
void
-rxi_ReapConnections(void)
+rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2)
{
- struct clock now;
+ struct clock now, when;
clock_GetTime(&now);
/* Find server connection structures that haven't been used for
rxi_rpc_peer_stat_cnt -= num_funcs;
}
rxi_FreePeer(peer);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nPeerStructs--;
- MUTEX_EXIT(&rx_stats_mutex);
- if (prev == *peer_ptr) {
+ rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
+ if (peer == *peer_ptr) {
*peer_ptr = next;
prev = next;
} else
}
MUTEX_EXIT(&rx_freePktQ_lock);
- now.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
- rxevent_Post(&now, rxi_ReapConnections, 0, 0);
+ when = now;
+ when.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
+ rxevent_Post(&when, rxi_ReapConnections, 0, 0);
}
#endif /* ADAPT_WINDOW */
+#ifdef RXDEBUG
+void
+rxi_DebugInit(void)
+{
+#ifdef AFS_NT40_ENV
+#define TRACE_OPTION_DEBUGLOG 4
+ HKEY parmKey;
+ DWORD dummyLen;
+ DWORD TraceOption;
+ long code;
+ rxdebug_active = 0;
+ code = RegOpenKeyEx(HKEY_LOCAL_MACHINE, AFSREG_CLT_SVC_PARAM_SUBKEY,
+ 0, KEY_QUERY_VALUE, &parmKey);
+ if (code != ERROR_SUCCESS)
+ return;
+
+ dummyLen = sizeof(TraceOption);
+ code = RegQueryValueEx(parmKey, "TraceOption", NULL, NULL,
+ (BYTE *) &TraceOption, &dummyLen);
+ if (code == ERROR_SUCCESS) {
+ rxdebug_active = (TraceOption & TRACE_OPTION_DEBUGLOG) ? 1 : 0;
+ }
+ RegCloseKey (parmKey);
+#endif /* AFS_NT40_ENV */
+}
+
+#ifdef AFS_NT40_ENV
+void
+rx_DebugOnOff(int on)
+{
+ rxdebug_active = on;
+}
+#endif /* AFS_NT40_ENV */
-#ifdef RXDEBUG
/* Don't call this debugging routine directly; use dpf */
void
-rxi_DebugPrint(char *format, int a1, int a2, int a3, int a4, int a5, int a6,
- int a7, int a8, int a9, int a10, int a11, int a12, int a13,
- int a14, int a15)
+rxi_DebugPrint(char *format, ...)
{
+#ifdef AFS_NT40_ENV
+    char msg[512];
+    char tformat[256];
+    int len;	/* int, not size_t: _snprintf/_vsnprintf return -1 on overflow */
+    va_list ap;
+
+    va_start(ap, format);
+
+    len = _snprintf(tformat, sizeof(tformat), "tid[%d] %s", GetCurrentThreadId(), format);
+
+ if (len > 0) {
+ len = _vsnprintf(msg, sizeof(msg)-2, tformat, ap);
+ if (len > 0) {
+ if (msg[len-1] != '\n') {
+ msg[len] = '\n';
+ msg[len+1] = '\0';
+ }
+ OutputDebugString(msg);
+ }
+ }
+ va_end(ap);
+#else
struct clock now;
+ va_list ap;
+
+ va_start(ap, format);
+
clock_GetTime(&now);
fprintf(rx_Log, " %u.%.3u:", (unsigned int)now.sec,
(unsigned int)now.usec / 1000);
- fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12,
- a13, a14, a15);
+ vfprintf(rx_Log, format, ap);
putc('\n', rx_Log);
-}
+ va_end(ap);
#endif
+}
-#ifdef RXDEBUG
/*
* This function is used to process the rx_stats structure that is local
* to a process as well as an rx_stats structure received from a remote
* checking.
*/
void
-rx_PrintTheseStats(FILE * file, struct rx_stats *s, int size,
+rx_PrintTheseStats(FILE * file, struct rx_statistics *s, int size,
afs_int32 freePackets, char version)
{
int i;
- if (size != sizeof(struct rx_stats)) {
+ if (size != sizeof(struct rx_statistics)) {
fprintf(file,
- "Unexpected size of stats structure: was %d, expected %d\n",
- size, sizeof(struct rx_stats));
+		"Unexpected size of stats structure: was %d, expected %lu\n",
+		size, (unsigned long)sizeof(struct rx_statistics));
}
fprintf(file, "rx stats: free packets %d, allocs %d, ", (int)freePackets,
* counter
*/
-#define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0);
-#define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0);
+#define LOCK_RX_DEBUG assert(pthread_mutex_lock(&rx_debug_mutex)==0)
+#define UNLOCK_RX_DEBUG assert(pthread_mutex_unlock(&rx_debug_mutex)==0)
#else
#define LOCK_RX_DEBUG
#define UNLOCK_RX_DEBUG
void *outputData, size_t outputLength)
{
static afs_int32 counter = 100;
- afs_int32 endTime;
+ time_t waitTime, waitCount, startTime;
struct rx_header theader;
char tbuffer[1500];
register afs_int32 code;
- struct timeval tv;
+ struct timeval tv_now, tv_wake, tv_delta;
struct sockaddr_in taddr, faddr;
+#ifdef AFS_NT40_ENV
int faddrLen;
+#else
+ socklen_t faddrLen;
+#endif
fd_set imask;
register char *tp;
- endTime = time(0) + 20; /* try for 20 seconds */
- LOCK_RX_DEBUG counter++;
- UNLOCK_RX_DEBUG tp = &tbuffer[sizeof(struct rx_header)];
+ startTime = time(0);
+ waitTime = 1;
+ waitCount = 5;
+ LOCK_RX_DEBUG;
+ counter++;
+ UNLOCK_RX_DEBUG;
+ tp = &tbuffer[sizeof(struct rx_header)];
taddr.sin_family = AF_INET;
taddr.sin_port = remotePort;
taddr.sin_addr.s_addr = remoteAddr;
(struct sockaddr *)&taddr, sizeof(struct sockaddr_in));
/* see if there's a packet available */
- FD_ZERO(&imask);
- FD_SET(socket, &imask);
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- code = select(socket + 1, &imask, 0, 0, &tv);
- if (code == 1 && FD_ISSET(socket,&imask)) {
- /* now receive a packet */
- faddrLen = sizeof(struct sockaddr_in);
- code =
- recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
- (struct sockaddr *)&faddr, &faddrLen);
-
- if (code > 0) {
- memcpy(&theader, tbuffer, sizeof(struct rx_header));
- if (counter == ntohl(theader.callNumber))
- break;
- }
+ gettimeofday(&tv_wake,0);
+ tv_wake.tv_sec += waitTime;
+ for (;;) {
+ FD_ZERO(&imask);
+ FD_SET(socket, &imask);
+ tv_delta.tv_sec = tv_wake.tv_sec;
+ tv_delta.tv_usec = tv_wake.tv_usec;
+ gettimeofday(&tv_now, 0);
+
+ if (tv_delta.tv_usec < tv_now.tv_usec) {
+ /* borrow */
+ tv_delta.tv_usec += 1000000;
+ tv_delta.tv_sec--;
+ }
+ tv_delta.tv_usec -= tv_now.tv_usec;
+
+ if (tv_delta.tv_sec < tv_now.tv_sec) {
+ /* time expired */
+ break;
+ }
+ tv_delta.tv_sec -= tv_now.tv_sec;
+
+ code = select(socket + 1, &imask, 0, 0, &tv_delta);
+ if (code == 1 && FD_ISSET(socket, &imask)) {
+ /* now receive a packet */
+ faddrLen = sizeof(struct sockaddr_in);
+ code =
+ recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
+ (struct sockaddr *)&faddr, &faddrLen);
+
+ if (code > 0) {
+ memcpy(&theader, tbuffer, sizeof(struct rx_header));
+ if (counter == ntohl(theader.callNumber))
+ goto success;
+ continue;
+ }
+ }
+ break;
}
/* see if we've timed out */
- if (endTime < time(0))
- return -1;
+ if (!--waitCount) {
+ return -1;
+ }
+ waitTime <<= 1;
}
+
+ success:
code -= sizeof(struct rx_header);
if (code > outputLength)
code = outputLength;
if (stat->version >= RX_DEBUGI_VERSION_W_GETPEER) {
*supportedValues |= RX_SERVER_DEBUG_ALL_PEER;
}
+ if (stat->version >= RX_DEBUGI_VERSION_W_WAITED) {
+ *supportedValues |= RX_SERVER_DEBUG_WAITED_CNT;
+ }
stat->nFreePackets = ntohl(stat->nFreePackets);
stat->packetReclaims = ntohl(stat->packetReclaims);
afs_int32
rx_GetServerStats(osi_socket socket, afs_uint32 remoteAddr,
- afs_uint16 remotePort, struct rx_stats * stat,
+ afs_uint16 remotePort, struct rx_statistics * stat,
afs_uint32 * supportedValues)
{
struct rx_debugIn in;
register struct rx_serverQueueEntry *sq;
#endif /* KERNEL */
- LOCK_RX_INIT if (rxinit_status == 1) {
- UNLOCK_RX_INIT return; /* Already shutdown. */
+ LOCK_RX_INIT;
+ if (rxinit_status == 1) {
+ UNLOCK_RX_INIT;
+ return; /* Already shutdown. */
}
#ifndef KERNEL
rx_port = 0;
}
next = peer->next;
rxi_FreePeer(peer);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nPeerStructs--;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
}
}
}
MUTEX_EXIT(&rx_stats_mutex);
rxinit_status = 1;
-UNLOCK_RX_INIT}
+ UNLOCK_RX_INIT;
+}
#ifdef RX_ENABLE_LOCKS
void
int isServer)
{
+ if (!(rxi_monitor_peerStats || rxi_monitor_processStats))
+ return;
+
MUTEX_ENTER(&rx_rpc_stats);
MUTEX_ENTER(&peer->peer_lock);
return 0;
return rxi_rxstat_userok(call);
}
+
+#ifdef AFS_NT40_ENV
+/*
+ * DllMain() -- Entry-point function called by the DllMainCRTStartup()
+ * function in the MSVC runtime DLL (msvcrt.dll).
+ *
+ * Note: the system serializes calls to this function.
+ */
+BOOL WINAPI
+DllMain(HINSTANCE dllInstHandle, /* instance handle for this DLL module */
+ DWORD reason, /* reason function is being called */
+ LPVOID reserved) /* reserved for future use */
+{
+ switch (reason) {
+ case DLL_PROCESS_ATTACH:
+ /* library is being attached to a process */
+ INIT_PTHREAD_LOCKS;
+ return TRUE;
+
+ case DLL_PROCESS_DETACH:
+ return TRUE;
+
+ default:
+ return FALSE;
+ }
+}
+#endif
+