*/
extern pthread_mutex_t rx_stats_mutex;
+extern pthread_mutex_t rx_waiting_mutex;
+extern pthread_mutex_t rx_quota_mutex;
+extern pthread_mutex_t rx_pthread_mutex;
+extern pthread_mutex_t rx_packets_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
== 0);
assert(pthread_mutex_init(&rx_stats_mutex, (const pthread_mutexattr_t *)0)
== 0);
+ assert(pthread_mutex_init(&rx_waiting_mutex, (const pthread_mutexattr_t *)0)
+ == 0);
+ assert(pthread_mutex_init(&rx_quota_mutex, (const pthread_mutexattr_t *)0)
+ == 0);
+ assert(pthread_mutex_init(&rx_pthread_mutex, (const pthread_mutexattr_t *)0)
+ == 0);
+ assert(pthread_mutex_init(&rx_packets_mutex, (const pthread_mutexattr_t *)0)
+ == 0);
assert(pthread_mutex_init
(&rxi_connCacheMutex, (const pthread_mutexattr_t *)0) == 0);
assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t *)0)
== 0);
assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
/*
* The rx_stats_mutex mutex protects the following global variables:
- * rxi_dataQuota
- * rxi_minDeficit
- * rxi_availProcs
- * rxi_totalMin
* rxi_lowConnRefCount
* rxi_lowPeerRefCount
* rxi_nCalls
* rxi_Alloccnt
* rxi_Allocsize
- * rx_nFreePackets
* rx_tq_debug
* rx_stats
*/
+
+/*
+ * The rx_quota_mutex mutex protects the following global variables:
+ * rxi_dataQuota
+ * rxi_minDeficit
+ * rxi_availProcs
+ * rxi_totalMin
+ */
+
+/*
+ * The rx_freePktQ_lock protects the following global variables:
+ * rx_nFreePackets
+ */
+
+/*
+ * The rx_packets_mutex mutex protects the following global variables:
+ * rx_nPackets
+ * rx_TSFPQLocalMax
+ * rx_TSFPQGlobSize
+ * rx_TSFPQMaxProcs
+ */
+
+/*
+ * The rx_pthread_mutex mutex protects the following global variables:
+ * rxi_pthread_hinum
+ */
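+
+/*
+ * Illustrative sketch of the convention above (mirroring the call sites
+ * changed later in this patch, not a new API): any read-modify-write of a
+ * variable listed for a given mutex is bracketed by that mutex, e.g.
+ *
+ *     MUTEX_ENTER(&rx_quota_mutex);
+ *     rxi_availProcs--;
+ *     MUTEX_EXIT(&rx_quota_mutex);
+ *
+ * or is done through the rx_MutexAdd()/rx_MutexDecrement() wrappers with
+ * the matching mutex passed as the last argument.
+ */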
#else
#define INIT_PTHREAD_LOCKS
#endif
rxdb_init();
#endif /* RX_LOCKS_DB */
MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_waiting_mutex, "rx_waiting_mutex", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_quota_mutex, "rx_quota_mutex", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_pthread_mutex, "rx_pthread_mutex", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_packets_mutex, "rx_packets_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
rx_SetEpoch(tv.tv_sec); /* Start time of this package, rxkad
* will provide a randomer value. */
#endif
- rx_MutexAdd(rxi_dataQuota, rx_extraQuota, rx_stats_mutex); /* + extra pkts caller asked to rsrv */
+ rx_MutexAdd(rxi_dataQuota, rx_extraQuota, rx_quota_mutex); /* + extra pkts caller asked to rsrv */
/* *Slightly* random start time for the cid. This is just to help
* out with the hashing function at the peer */
rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
/* otherwise, can use only if there are enough to allow everyone
* to go to their min quota after this guy starts.
*/
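+ /*
+  * Worked example with hypothetical numbers: if rxi_minDeficit is 3
+  * (three more threads are still owed to services that have not yet
+  * reached their minProcs) and rxi_availProcs is 5, a request beyond
+  * this service's minProcs is admitted because 5 > 3 still leaves
+  * enough idle threads to cover every minimum; with rxi_availProcs
+  * of 3 it would be refused.
+  */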
- MUTEX_ENTER(&rx_stats_mutex);
+
+ MUTEX_ENTER(&rx_quota_mutex);
if ((aservice->nRequestsRunning < aservice->minProcs)
|| (rxi_availProcs > rxi_minDeficit)) {
aservice->nRequestsRunning++;
if (aservice->nRequestsRunning <= aservice->minProcs)
rxi_minDeficit--;
rxi_availProcs--;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_quota_mutex);
return 1;
}
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_quota_mutex);
return 0;
}
ReturnToServerPool(register struct rx_service *aservice)
{
aservice->nRequestsRunning--;
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_quota_mutex);
if (aservice->nRequestsRunning < aservice->minProcs)
rxi_minDeficit++;
rxi_availProcs++;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_quota_mutex);
}
#else /* RX_ENABLE_LOCKS */
service = rx_services[i];
if (service == (struct rx_service *)0)
break;
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_quota_mutex);
rxi_totalMin += service->minProcs;
/* below works even if a thread is running, since minDeficit would
* still have been decremented and later re-incremented.
*/
rxi_minDeficit += service->minProcs;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_quota_mutex);
}
/* Turn on reaping of idle server connections */
if (call->flags & RX_CALL_WAIT_PROC) {
call->flags &= ~RX_CALL_WAIT_PROC;
- rx_MutexDecrement(rx_nWaiting, rx_stats_mutex);
+ rx_MutexDecrement(rx_nWaiting, rx_waiting_mutex);
}
if (call->state != RX_STATE_PRECALL || call->error) {
TooLow(struct rx_packet *ap, struct rx_call *acall)
{
int rc = 0;
- MUTEX_ENTER(&rx_stats_mutex);
+
+ MUTEX_ENTER(&rx_quota_mutex);
if (((ap->header.seq != 1) && (acall->flags & RX_CALL_CLEARED)
&& (acall->state == RX_STATE_PRECALL))
|| ((rx_nFreePackets < rxi_dataQuota + 2)
&& !((ap->header.flags & RX_LAST_PACKET)
&& (acall->flags & RX_CALL_READER_WAIT)))) {
rc = 1;
}
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_quota_mutex);
return rc;
}
#endif /* KERNEL */
if (!(call->flags & RX_CALL_WAIT_PROC)) {
call->flags |= RX_CALL_WAIT_PROC;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_nWaiting++;
- rx_nWaited++;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_waiting_mutex);
+ rx_nWaiting++;
+ rx_nWaited++;
+ MUTEX_EXIT(&rx_waiting_mutex);
rxi_calltrace(RX_CALL_ARRIVAL, call);
SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
queue_Append(&rx_incomingCallQueue, call);
call->flags &= ~RX_CALL_WAIT_PROC;
if (queue_IsOnQueue(call)) {
queue_Remove(call);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_nWaiting--;
- MUTEX_EXIT(&rx_stats_mutex);
+
+ MUTEX_ENTER(&rx_waiting_mutex);
+ rx_nWaiting--;
+ MUTEX_EXIT(&rx_waiting_mutex);
}
}
call->state = RX_STATE_ACTIVE;
if (queue_IsOnQueue(call)) {
queue_Remove(call);
if (flags & RX_CALL_WAIT_PROC) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_nWaiting--;
- MUTEX_EXIT(&rx_stats_mutex);
+
+ MUTEX_ENTER(&rx_waiting_mutex);
+ rx_nWaiting--;
+ MUTEX_EXIT(&rx_waiting_mutex);
}
}
MUTEX_EXIT(call->call_queue_lock);
rxi_FreeAllPackets();
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota = RX_MAX_QUOTA;
rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
- MUTEX_EXIT(&rx_stats_mutex);
-
+ MUTEX_EXIT(&rx_quota_mutex);
rxinit_status = 1;
UNLOCK_RX_INIT;
}
* by each call to AllocPacketBufs() will increase indefinitely without a cap on the transfer
* glob size. A cap of 64 is selected because that will produce an allocation of greater than
* three times that amount which is greater than half of ncalls * maxReceiveWindow.
+ * Must be called under rx_packets_mutex.
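+ * Worked through with the numbers above (an illustrative restatement of the
+ * rationale, not a new tuning rule): a glob cap of 64 packets corresponds to
+ * an allocation of more than 3 * 64 = 192 packets, which the argument above
+ * takes to exceed (ncalls * maxReceiveWindow) / 2.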
*/
#define RX_TS_FPQ_COMPUTE_LIMITS \
do { \
(rx_ts_info_p)->_FPQ.ltog_ops++; \
(rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \
if ((rx_ts_info_p)->_FPQ.delta) { \
- MUTEX_ENTER(&rx_stats_mutex); \
+ MUTEX_ENTER(&rx_packets_mutex); \
RX_TS_FPQ_COMPUTE_LIMITS; \
- MUTEX_EXIT(&rx_stats_mutex); \
+ MUTEX_EXIT(&rx_packets_mutex); \
(rx_ts_info_p)->_FPQ.delta = 0; \
} \
} while(0)
(rx_ts_info_p)->_FPQ.ltog_ops++; \
(rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \
if ((rx_ts_info_p)->_FPQ.delta) { \
- MUTEX_ENTER(&rx_stats_mutex); \
+ MUTEX_ENTER(&rx_packets_mutex); \
RX_TS_FPQ_COMPUTE_LIMITS; \
- MUTEX_EXIT(&rx_stats_mutex); \
+ MUTEX_EXIT(&rx_packets_mutex); \
(rx_ts_info_p)->_FPQ.delta = 0; \
} \
} while(0)
EXT int rxi_fcfs_thread_num GLOBALSINIT(0);
EXT pthread_key_t rx_thread_id_key;
-/* keep track of pthread numbers - protected by rx_stats_mutex,
-  except in rx_Init() before mutex exists! */
+/* keep track of pthread numbers - protected by rx_pthread_mutex,
+ * except in rx_Init() before mutex exists! */
EXT int rxi_pthread_hinum GLOBALSINIT(0);
#else
#define rxi_fcfs_thread_num (0)
#endif
#if defined(RX_ENABLE_LOCKS)
-EXT afs_kmutex_t rx_stats_mutex; /* used to activate stats gathering */
+EXT afs_kmutex_t rx_stats_mutex; /* used to protect stats gathering */
+EXT afs_kmutex_t rx_waiting_mutex; /* used to protect waiting counters */
+EXT afs_kmutex_t rx_quota_mutex; /* used to protect quota counters */
+EXT afs_kmutex_t rx_pthread_mutex; /* used to protect pthread counters */
+EXT afs_kmutex_t rx_packets_mutex; /* used to protect packet counters */
#endif
EXT2 int rx_enable_stats GLOBALSINIT(0);
struct rx_call *newcall = NULL;
rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
/* threadID is used for making decisions in GetCall. Get it by bumping
* number of threads handling incoming calls */
* So either introduce yet another counter or flag the FCFS
* thread... chose the latter.
*/
+ MUTEX_ENTER(&rx_pthread_mutex);
threadID = ++rxi_pthread_hinum;
+ MUTEX_EXIT(&rx_pthread_mutex);
if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
rxi_fcfs_thread_num = threadID;
++rxi_availProcs;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_quota_mutex);
while (1) {
sock = OSI_NULLSOCKET;
dpf(("Unable to create Rx event handling thread\n"));
exit(1);
}
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_pthread_mutex);
++rxi_pthread_hinum;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_pthread_mutex);
AFS_SIGSET_RESTORE();
assert(pthread_mutex_lock(&listener_mutex) == 0);
dpf(("Unable to create socket listener thread\n"));
exit(1);
}
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_pthread_mutex);
++rxi_pthread_hinum;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_pthread_mutex);
AFS_SIGSET_RESTORE();
return 0;
}
#ifdef RX_ENABLE_TSFPQ
queue_Init(&rx_ts_info->_FPQ);
- MUTEX_ENTER(&rx_stats_mutex);
+ MUTEX_ENTER(&rx_packets_mutex);
rx_TSFPQMaxProcs++;
RX_TS_FPQ_COMPUTE_LIMITS;
- MUTEX_EXIT(&rx_stats_mutex);
+ MUTEX_EXIT(&rx_packets_mutex);
#endif /* RX_ENABLE_TSFPQ */
return rx_ts_info;
}