rx_nFreePackets = 0;
queue_Init(&rx_freePacketQueue);
rxi_NeedMorePackets = FALSE;
+ rx_nPackets = 0; /* rx_nPackets is managed by rxi_MorePackets* */
+
+ /* enforce a minimum number of allocated packets */
+ if (rx_extraPackets < rxi_nSendFrags * rx_maxSendWindow)
+ rx_extraPackets = rxi_nSendFrags * rx_maxSendWindow;
+
+ /* allocate the initial free packet pool */
#ifdef RX_ENABLE_TSFPQ
- rx_nPackets = 0; /* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */
rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0);
#else /* RX_ENABLE_TSFPQ */
- rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
- rxi_MorePackets(rx_nPackets);
+ rxi_MorePackets(rx_extraPackets + RX_MAX_QUOTA + 2); /* fudge */
#endif /* RX_ENABLE_TSFPQ */
rx_CheckPackets();
/* otherwise, can use only if there are enough to allow everyone
* to go to their min quota after this guy starts.
*/
+ MUTEX_ENTER(&rx_quota_mutex);
if (rxi_availProcs > rxi_minDeficit)
rc = 1;
+ MUTEX_EXIT(&rx_quota_mutex);
return rc;
}
#endif /* RX_ENABLE_LOCKS */
if (cur_service != NULL) {
cur_service->nRequestsRunning--;
+ MUTEX_ENTER(&rx_quota_mutex);
if (cur_service->nRequestsRunning < cur_service->minProcs)
rxi_minDeficit++;
rxi_availProcs++;
+ MUTEX_EXIT(&rx_quota_mutex);
}
if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
struct rx_call *tcall, *ncall;
service->nRequestsRunning++;
/* just started call in minProcs pool, need fewer to maintain
* guarantee */
+ MUTEX_ENTER(&rx_quota_mutex);
if (service->nRequestsRunning <= service->minProcs)
rxi_minDeficit--;
rxi_availProcs--;
+ MUTEX_EXIT(&rx_quota_mutex);
rx_nWaiting--;
/* MUTEX_EXIT(&call->lock); */
} else {
CV_SIGNAL(&sq->cv);
#else
service->nRequestsRunning++;
+ MUTEX_ENTER(&rx_quota_mutex);
if (service->nRequestsRunning <= service->minProcs)
rxi_minDeficit--;
rxi_availProcs--;
+ MUTEX_EXIT(&rx_quota_mutex);
osi_rxWakeup(sq);
#endif
}
/* extra packets to add to the quota */
EXT int rx_extraQuota GLOBALSINIT(0);
-/* extra packets to alloc (2 windows by deflt) */
-EXT int rx_extraPackets GLOBALSINIT(32);
+/* extra packets to alloc (2 * maxWindowSize by default) */
+EXT int rx_extraPackets GLOBALSINIT(256);
EXT int rx_stackSize GLOBALSINIT(RX_DEFAULT_STACK_SIZE);
EXT int rx_maxSendWindow GLOBALSINIT(128);
EXT int rx_nackThreshold GLOBALSINIT(3); /* Number NACKS to trigger congestion recovery */
EXT int rx_nDgramThreshold GLOBALSINIT(4); /* Number of packets before increasing
- * packets per datagram */
+ * packets per datagram */
#define RX_MAX_FRAGS 4
EXT int rxi_nSendFrags GLOBALSINIT(RX_MAX_FRAGS); /* max fragments in a datagram */
EXT int rxi_nRecvFrags GLOBALSINIT(RX_MAX_FRAGS);
#define ACKHACK(p,r) { if (((p)->header.seq & (rxi_SoftAckRate))==0) (p)->header.flags |= RX_REQUEST_ACK; }
-EXT int rx_nPackets GLOBALSINIT(100); /* obsolete; use rx_extraPackets now */
+EXT int rx_nPackets GLOBALSINIT(0); /* preallocate packets with rx_extraPackets */
/*
* pthreads thread-specific rx info support
{
int threadID;
-/* jaltman - rxi_dataQuota is protected by a mutex everywhere else */
rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
+ MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
/* threadID is used for making decisions in GetCall. Get it by bumping
* number of threads handling incoming calls */
threadID = rxi_availProcs++;
+ MUTEX_EXIT(&rx_quota_mutex);
#ifdef RX_ENABLE_LOCKS
AFS_GUNLOCK();
AFS_GUNLOCK();
#endif /* RX_ENABLE_LOCKS && !AFS_SUN5_ENV */
while (afs_termState != AFSOP_STOP_RXK_LISTENER) {
+ /* Allocate more packets now if a shortage was flagged (rxi_NeedMorePackets) */
+ rx_CheckPackets();
+
if (rxp) {
rxi_RestoreDataBufs(rxp);
} else {
(*swapNameProgram) (pid, "listener", &name[0]);
for (;;) {
+ /* Allocate more packets now if a shortage was flagged (rxi_NeedMorePackets) */
+ rx_CheckPackets();
+
/* Grab a new packet only if necessary (otherwise re-use the old one) */
if (p) {
rxi_RestoreDataBufs(p);
rx_mallocedP = p;
}
+ rx_nPackets += apackets;
rx_nFreePackets += apackets;
rxi_NeedMorePackets = FALSE;
rxi_PacketsUnWait();
}
rx_nFreePackets += apackets;
-#ifdef RX_ENABLE_TSFPQ
- /* TSFPQ patch also needs to keep track of total packets */
MUTEX_ENTER(&rx_packets_mutex);
rx_nPackets += apackets;
+#ifdef RX_ENABLE_TSFPQ
RX_TS_FPQ_COMPUTE_LIMITS;
- MUTEX_EXIT(&rx_packets_mutex);
#endif /* RX_ENABLE_TSFPQ */
+ MUTEX_EXIT(&rx_packets_mutex);
rxi_NeedMorePackets = FALSE;
rxi_PacketsUnWait();
}
rx_CheckPackets(void)
{
if (rxi_NeedMorePackets) {
- rxi_MorePackets(rx_initSendWindow);
+ rxi_MorePackets(rx_maxSendWindow);
}
}
osi_Panic("rxi_AllocPacket error");
#else /* KERNEL */
if (queue_IsEmpty(&rx_freePacketQueue))
- rxi_MorePacketsNoLock(4 * rx_initSendWindow);
+ rxi_MorePacketsNoLock(rx_maxSendWindow);
#endif /* KERNEL */
osi_Panic("rxi_AllocPacket error");
#else /* KERNEL */
if (queue_IsEmpty(&rx_freePacketQueue))
- rxi_MorePacketsNoLock(4 * rx_initSendWindow);
+ rxi_MorePacketsNoLock(rx_maxSendWindow);
#endif /* KERNEL */
rx_nFreePackets--;
MUTEX_ENTER(&rx_freePktQ_lock);
if (queue_IsEmpty(&rx_freePacketQueue))
- rxi_MorePacketsNoLock(4 * rx_initSendWindow);
+ rxi_MorePacketsNoLock(rx_maxSendWindow);
RX_TS_FPQ_GTOL(rx_ts_info);
* grows past 13, rxdebug packets
* will need to be modified */
-/* Packet classes, for rx_AllocPacket */
+/* Packet classes, for rx_AllocPacket and rx_packetQuota */
#define RX_PACKET_CLASS_RECEIVE 0
#define RX_PACKET_CLASS_SEND 1
#define RX_PACKET_CLASS_SPECIAL 2
MUTEX_EXIT(&listener_mutex);
for (;;) {
+ /* Allocate more packets now if a shortage was flagged (rxi_NeedMorePackets) */
+ rx_CheckPackets();
+
/*
* Grab a new packet only if necessary (otherwise re-use the old one)
*/