* rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
* currently allocated within rx. This number is used to allocate the
* memory required to return the statistics when queried.
+ * Protected by the rx_rpc_stats mutex.
*/
static unsigned int rxi_rpc_peer_stat_cnt;
* rxi_rpc_process_stat_cnt counts the total number of local process stat
* structures currently allocated within rx. The number is used to allocate
* the memory required to return the statistics when queried.
+ * Protected by the rx_rpc_stats mutex.
*/
static unsigned int rxi_rpc_process_stat_cnt;
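/*
 * Illustrative sketch (not part of this change): a reader that sizes a
 * result buffer from a counter such as the two above must hold the same
 * rx_rpc_stats-style mutex while reading the count and copying the
 * entries, or the allocation can race with concurrent registrations.
 * All names below are placeholders, not the real rx query path.
 */
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned int stat_cnt;      /* stands in for rxi_rpc_*_stat_cnt */
static int *stat_entries;          /* stands in for the stat queues    */

static int *
snapshot_stats(unsigned int *n_out)
{
    int *copy = NULL;

    pthread_mutex_lock(&stats_lock);
    if (stat_cnt > 0) {
        copy = malloc(stat_cnt * sizeof(*copy));
        if (copy)
            memcpy(copy, stat_entries, stat_cnt * sizeof(*copy));
    }
    *n_out = copy ? stat_cnt : 0;
    pthread_mutex_unlock(&stats_lock);
    return copy;
}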
* freeSQEList_lock
*
* serverQueueEntry->lock
- * rx_rpc_stats
* rx_peerHashTable_lock - locked under rx_connHashTable_lock
+ * rx_rpc_stats
* peer->lock - locks peer data fields.
 * conn_data_lock - ensures that no more than one thread is updating a conn
 *	data field at the same time.
* conn->peer was previously a constant for all intents and so has no
* lock protecting this field. The multihomed client delta introduced
 * an RX code change: change the peer field in the connection structure
- * to that remote inetrface from which the last packet for this
+ * to that remote interface from which the last packet for this
* connection was sent out. This may become an issue if further changes
* are made.
*/
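/*
 * Illustrative sketch (not part of this change): whichever subset of the
 * locks above a code path needs, it must take them in the documented
 * order.  The pthread mutexes below are placeholders for the rx globals.
 */
#include <pthread.h>

static pthread_mutex_t conn_hash = PTHREAD_MUTEX_INITIALIZER; /* ~ rx_connHashTable_lock */
static pthread_mutex_t peer_hash = PTHREAD_MUTEX_INITIALIZER; /* ~ rx_peerHashTable_lock */
static pthread_mutex_t rpc_stats = PTHREAD_MUTEX_INITIALIZER; /* ~ rx_rpc_stats          */

static void
touch_conn_peer_and_stats(void)
{
    /* honour the hierarchy: conn hash, then peer hash, then rpc stats */
    pthread_mutex_lock(&conn_hash);
    pthread_mutex_lock(&peer_hash);
    pthread_mutex_lock(&rpc_stats);

    /* ... update connection, peer and statistics state here ... */

    pthread_mutex_unlock(&rpc_stats);
    pthread_mutex_unlock(&peer_hash);
    pthread_mutex_unlock(&conn_hash);
}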
rx_nFreePackets = 0;
queue_Init(&rx_freePacketQueue);
rxi_NeedMorePackets = FALSE;
+ rx_nPackets = 0; /* rx_nPackets is managed by rxi_MorePackets* */
+
+ /* enforce a minimum number of allocated packets */
+ if (rx_extraPackets < rxi_nSendFrags * rx_maxSendWindow)
+ rx_extraPackets = rxi_nSendFrags * rx_maxSendWindow;
+
+ /* allocate the initial free packet pool */
#ifdef RX_ENABLE_TSFPQ
- rx_nPackets = 0; /* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */
rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0);
#else /* RX_ENABLE_TSFPQ */
- rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2; /* fudge */
- rxi_MorePackets(rx_nPackets);
+ rxi_MorePackets(rx_extraPackets + RX_MAX_QUOTA + 2); /* fudge */
#endif /* RX_ENABLE_TSFPQ */
rx_CheckPackets();
/* otherwise, can use only if there are enough to allow everyone
* to go to their min quota after this guy starts.
*/
+ MUTEX_ENTER(&rx_quota_mutex);
if (rxi_availProcs > rxi_minDeficit)
rc = 1;
+ MUTEX_EXIT(&rx_quota_mutex);
return rc;
}
#endif /* RX_ENABLE_LOCKS */
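/*
 * Illustrative sketch (not part of this change): the admission test above
 * compares two counters that are only meaningful when read together under
 * rx_quota_mutex.  With 5 idle threads (rxi_availProcs) and a shortfall of
 * 3 threads still needed to honour every service's minProcs
 * (rxi_minDeficit), one more call may start; with 3 and 3 it may not.
 */
static int
may_start_call(unsigned int avail_procs, unsigned int min_deficit)
{
    /* admit only if every service can still reach its minProcs afterwards */
    return avail_procs > min_deficit;
}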
* waiting, treat this as a running call, and wait to destroy the
* connection later when the call completes. */
if ((conn->type == RX_CLIENT_CONNECTION)
- && (conn->flags & RX_CONN_MAKECALL_WAITING)) {
+ && (conn->flags & (RX_CONN_MAKECALL_WAITING|RX_CONN_MAKECALL_ACTIVE))) {
conn->flags |= RX_CONN_DESTROY_ME;
MUTEX_EXIT(&conn->conn_data_lock);
USERPRI;
struct rx_call *
rx_NewCall(struct rx_connection *conn)
{
- int i;
+ int i, wait;
struct rx_call *call;
struct clock queueTime;
SPLVAR;
NETPRI;
clock_GetTime(&queueTime);
- MUTEX_ENTER(&conn->conn_call_lock);
-
/*
* Check if there are others waiting for a new call.
* If so, let them go first to avoid starving them.
* RX_CONN_MAKECALL_WAITING flag bit is used to
* indicate that there are indeed calls waiting.
* The flag is set when the waiter is incremented.
- * It is only cleared in rx_EndCall when
- * makeCallWaiters is 0. This prevents us from
- * accidently destroying the connection while it
- * is potentially about to be used.
+ * It is only cleared when makeCallWaiters is 0.
+     * This prevents us from accidentally destroying the
+ * connection while it is potentially about to be used.
*/
+ MUTEX_ENTER(&conn->conn_call_lock);
MUTEX_ENTER(&conn->conn_data_lock);
- if (conn->makeCallWaiters) {
- conn->flags |= RX_CONN_MAKECALL_WAITING;
+ while (conn->flags & RX_CONN_MAKECALL_ACTIVE) {
+ conn->flags |= RX_CONN_MAKECALL_WAITING;
conn->makeCallWaiters++;
MUTEX_EXIT(&conn->conn_data_lock);
#endif
MUTEX_ENTER(&conn->conn_data_lock);
conn->makeCallWaiters--;
+ if (conn->makeCallWaiters == 0)
+ conn->flags &= ~RX_CONN_MAKECALL_WAITING;
}
+
+ /* We are now the active thread in rx_NewCall */
+ conn->flags |= RX_CONN_MAKECALL_ACTIVE;
MUTEX_EXIT(&conn->conn_data_lock);
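/*
 * Illustrative sketch (not part of this change): MAKECALL_ACTIVE and
 * MAKECALL_WAITING, as used above, implement a "single active thread,
 * queued waiters" gate.  A stand-alone pthread analogue of the same shape:
 */
#include <pthread.h>

struct gate {
    pthread_mutex_t lock;
    pthread_cond_t cv;
    int active;                        /* ~ RX_CONN_MAKECALL_ACTIVE */
    int waiters;                       /* ~ conn->makeCallWaiters   */
};

static void
gate_enter(struct gate *g)
{
    pthread_mutex_lock(&g->lock);
    while (g->active) {
        g->waiters++;                  /* ~ setting RX_CONN_MAKECALL_WAITING */
        pthread_cond_wait(&g->cv, &g->lock);
        g->waiters--;
    }
    g->active = 1;                     /* we are now the active thread */
    pthread_mutex_unlock(&g->lock);
}

static void
gate_leave(struct gate *g)
{
    pthread_mutex_lock(&g->lock);
    g->active = 0;
    pthread_cond_broadcast(&g->cv);    /* wake anyone who yielded to us */
    pthread_mutex_unlock(&g->lock);
}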
for (;;) {
+ wait = 1;
+
for (i = 0; i < RX_MAXCALLS; i++) {
call = conn->call[i];
if (call) {
- MUTEX_ENTER(&call->lock);
if (call->state == RX_STATE_DALLY) {
- rxi_ResetCall(call, 0);
- (*call->callNumber)++;
- break;
- }
- MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&call->lock);
+ if (call->state == RX_STATE_DALLY) {
+ /*
+ * We are setting the state to RX_STATE_RESET to
+ * ensure that no one else will attempt to use this
+ * call once we drop the conn->conn_call_lock and
+ * call->lock. We must drop the conn->conn_call_lock
+ * before calling rxi_ResetCall because the process
+ * of clearing the transmit queue can block for an
+ * extended period of time. If we block while holding
+ * the conn->conn_call_lock, then all rx_EndCall
+ * processing will block as well. This has a detrimental
+ * effect on overall system performance.
+ */
+ call->state = RX_STATE_RESET;
+ CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
+ MUTEX_EXIT(&conn->conn_call_lock);
+ rxi_ResetCall(call, 0);
+ (*call->callNumber)++;
+ if (MUTEX_TRYENTER(&conn->conn_call_lock))
+ break;
+
+ /*
+ * If we failed to be able to safely obtain the
+ * conn->conn_call_lock we will have to drop the
+ * call->lock to avoid a deadlock. When the call->lock
+ * is released the state of the call can change. If it
+ * is no longer RX_STATE_RESET then some other thread is
+ * using the call.
+ */
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_call_lock);
+ MUTEX_ENTER(&call->lock);
+
+ if (call->state == RX_STATE_RESET)
+ break;
+
+ /*
+ * If we get here it means that after dropping
+                     * the conn->conn_call_lock and call->lock,
+ * the call is no longer ours. If we can't find
+ * a free call in the remaining slots we should
+ * not go immediately to RX_CONN_MAKECALL_WAITING
+ * because by dropping the conn->conn_call_lock
+ * we have given up synchronization with rx_EndCall.
+ * Instead, cycle through one more time to see if
+                     * we can find a call that we can call our own.
+ */
+ CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
+ wait = 0;
+ }
+ MUTEX_EXIT(&call->lock);
+ }
} else {
+ /* rxi_NewCall returns with mutex locked */
call = rxi_NewCall(conn, i);
+ CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
break;
}
}
if (i < RX_MAXCALLS) {
break;
}
+ if (!wait)
+ continue;
+
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags |= RX_CONN_MAKECALL_WAITING;
conn->makeCallWaiters++;
#endif
MUTEX_ENTER(&conn->conn_data_lock);
conn->makeCallWaiters--;
+ if (conn->makeCallWaiters == 0)
+ conn->flags &= ~RX_CONN_MAKECALL_WAITING;
MUTEX_EXIT(&conn->conn_data_lock);
}
- /*
- * Wake up anyone else who might be giving us a chance to
- * run (see code above that avoids resource starvation).
- */
-#ifdef RX_ENABLE_LOCKS
- CV_BROADCAST(&conn->conn_call_cv);
-#else
- osi_rxWakeup(conn);
-#endif
-
- CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
-
/* Client is initially in send mode */
call->state = RX_STATE_ACTIVE;
call->error = conn->error;
/* Turn on busy protocol. */
rxi_KeepAliveOn(call);
- MUTEX_EXIT(&call->lock);
+ /*
+ * We are no longer the active thread in rx_NewCall
+ */
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->flags &= ~RX_CONN_MAKECALL_ACTIVE;
+ MUTEX_EXIT(&conn->conn_data_lock);
+
+ /*
+ * Wake up anyone else who might be giving us a chance to
+ * run (see code above that avoids resource starvation).
+ */
+#ifdef RX_ENABLE_LOCKS
+ CV_BROADCAST(&conn->conn_call_cv);
+#else
+ osi_rxWakeup(conn);
+#endif
MUTEX_EXIT(&conn->conn_call_lock);
- USERPRI;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* Now, if TQ wasn't cleared earlier, do it now. */
- MUTEX_ENTER(&call->lock);
- rxi_WaitforTQBusy(call);
- if (call->flags & RX_CALL_TQ_CLEARME) {
- rxi_ClearTransmitQueue(call, 1);
- /*queue_Init(&call->tq);*/
+ if (call->flags & (RX_CALL_TQ_BUSY | RX_CALL_TQ_CLEARME)) {
+ osi_Panic("rx_NewCall call about to be used without an empty tq");
}
- MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ MUTEX_EXIT(&call->lock);
+ USERPRI;
+
dpf(("rx_NewCall(call %"AFS_PTR_FMT")\n", call));
return call;
}
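/*
 * Illustrative sketch (not part of this change): the MUTEX_TRYENTER dance
 * in the loop above is the standard way to re-acquire an outer lock while
 * already holding an inner one without deadlocking.  The pthread locks and
 * the integer "state" are placeholders for conn->conn_call_lock,
 * call->lock and call->state.
 */
#include <pthread.h>

static pthread_mutex_t outer = PTHREAD_MUTEX_INITIALIZER; /* ~ conn_call_lock */
static pthread_mutex_t inner = PTHREAD_MUTEX_INITIALIZER; /* ~ call->lock     */
static int state;                                         /* ~ call->state    */

/* Call with the inner lock held; returns 1 if the object is still ours
 * once both locks are held. */
static int
relock_outer_first(void)
{
    if (pthread_mutex_trylock(&outer) == 0)
        return 1;               /* got it without dropping the inner lock */

    /*
     * Trylock failed: drop the inner lock, take both locks in the
     * documented order, then re-check the state, because another thread
     * may have claimed the object while we held neither lock.
     */
    pthread_mutex_unlock(&inner);
    pthread_mutex_lock(&outer);
    pthread_mutex_lock(&inner);
    return state == 0;          /* 0 stands in for RX_STATE_RESET */
}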
tservice = rxi_AllocService();
NETPRI;
+
+#ifdef RX_ENABLE_LOCKS
+ MUTEX_INIT(&tservice->svc_data_lock, "svc data lock", MUTEX_DEFAULT, 0);
+#endif
+
for (i = 0; i < RX_MAX_SERVICES; i++) {
struct rx_service *service = rx_services[i];
if (service) {
service->connDeadTime = rx_connDeadTime;
service->executeRequestProc = serviceProc;
service->checkReach = 0;
+ service->nSpecific = 0;
+ service->specific = NULL;
rx_services[i] = service; /* not visible until now */
USERPRI;
return service;
if (cur_service != NULL) {
cur_service->nRequestsRunning--;
+ MUTEX_ENTER(&rx_quota_mutex);
if (cur_service->nRequestsRunning < cur_service->minProcs)
rxi_minDeficit++;
rxi_availProcs++;
+ MUTEX_EXIT(&rx_quota_mutex);
}
if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
struct rx_call *tcall, *ncall;
service->nRequestsRunning++;
/* just started call in minProcs pool, need fewer to maintain
* guarantee */
+ MUTEX_ENTER(&rx_quota_mutex);
if (service->nRequestsRunning <= service->minProcs)
rxi_minDeficit--;
rxi_availProcs--;
+ MUTEX_EXIT(&rx_quota_mutex);
rx_nWaiting--;
/* MUTEX_EXIT(&call->lock); */
} else {
afs_int32 error;
SPLVAR;
-
-
dpf(("rx_EndCall(call %"AFS_PTR_FMT" rc %d error %d abortCode %d)\n",
call, rc, call->error, call->abortCode));
* rx_NewCall is in a stable state. Otherwise, rx_NewCall may
* have checked this call, found it active and by the time it
* goes to sleep, will have missed the signal.
- *
- * Do not clear the RX_CONN_MAKECALL_WAITING flag as long as
- * there are threads waiting to use the conn object.
*/
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&conn->conn_call_lock);
- MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_call_lock);
+ MUTEX_ENTER(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags |= RX_CONN_BUSY;
if (conn->flags & RX_CONN_MAKECALL_WAITING) {
- if (conn->makeCallWaiters == 0)
- conn->flags &= (~RX_CONN_MAKECALL_WAITING);
MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&conn->conn_call_cv);
CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
MUTEX_EXIT(&call->lock);
if (conn->type == RX_CLIENT_CONNECTION) {
- MUTEX_EXIT(&conn->conn_call_lock);
+ MUTEX_ENTER(&conn->conn_data_lock);
conn->flags &= ~RX_CONN_BUSY;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
}
USERPRI;
/*
* If someone else destroys a connection, they either have no
* call lock held or are going through this section of code.
*/
+ MUTEX_ENTER(&conn->conn_data_lock);
if (conn->flags & RX_CONN_DESTROY_ME && !(conn->flags & RX_CONN_MAKECALL_WAITING)) {
- MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount++;
MUTEX_EXIT(&conn->conn_data_lock);
#ifdef RX_ENABLE_LOCKS
#else /* RX_ENABLE_LOCKS */
rxi_DestroyConnection(conn);
#endif /* RX_ENABLE_LOCKS */
+ } else {
+ MUTEX_EXIT(&conn->conn_data_lock);
}
}
}
void
-rxi_SetPeerMtu(afs_uint32 host, afs_uint32 port, int mtu)
+rxi_SetPeerMtu(struct rx_peer *peer, afs_uint32 host, afs_uint32 port, int mtu)
{
- struct rx_peer **peer_ptr, **peer_end;
+ struct rx_peer **peer_ptr = NULL, **peer_end = NULL;
+ struct rx_peer *next = NULL;
int hashIndex;
- MUTEX_ENTER(&rx_peerHashTable_lock);
- if (port == 0) {
- for (peer_ptr = &rx_peerHashTable[0], peer_end =
- &rx_peerHashTable[rx_hashTableSize]; peer_ptr < peer_end;
- peer_ptr++) {
- struct rx_peer *peer, *next;
- for (peer = *peer_ptr; peer; peer = next) {
- next = peer->next;
- if (host == peer->host) {
- MUTEX_ENTER(&peer->peer_lock);
- peer->ifMTU=MIN(mtu, peer->ifMTU);
- peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
- MUTEX_EXIT(&peer->peer_lock);
- }
- }
- }
+ if (!peer) {
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ if (port == 0) {
+ peer_ptr = &rx_peerHashTable[0];
+ peer_end = &rx_peerHashTable[rx_hashTableSize];
+ next = NULL;
+ resume:
+ for ( ; peer_ptr < peer_end; peer_ptr++) {
+ if (!peer)
+ peer = *peer_ptr;
+ for ( ; peer; peer = next) {
+ next = peer->next;
+ if (host == peer->host)
+ break;
+ }
+ }
+ } else {
+ hashIndex = PEER_HASH(host, port);
+ for (peer = rx_peerHashTable[hashIndex]; peer; peer = peer->next) {
+ if ((peer->host == host) && (peer->port == port))
+ break;
+ }
+ }
} else {
- struct rx_peer *peer;
- hashIndex = PEER_HASH(host, port);
- for (peer = rx_peerHashTable[hashIndex]; peer; peer = peer->next) {
- if ((peer->host == host) && (peer->port == port)) {
- MUTEX_ENTER(&peer->peer_lock);
- peer->ifMTU=MIN(mtu, peer->ifMTU);
- peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
- MUTEX_EXIT(&peer->peer_lock);
- }
- }
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ }
+
+ if (peer) {
+ peer->refCount++;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
+ MUTEX_ENTER(&peer->peer_lock);
+ /* We don't handle dropping below min, so don't */
+ mtu = MAX(mtu, RX_MIN_PACKET_SIZE);
+ peer->ifMTU=MIN(mtu, peer->ifMTU);
+ peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
+ /* if we tweaked this down, need to tune our peer MTU too */
+ peer->MTU = MIN(peer->MTU, peer->natMTU);
+ /* if we discovered a sub-1500 mtu, degrade */
+ if (peer->ifMTU < OLD_MAX_PACKET_SIZE)
+ peer->maxDgramPackets = 1;
+ /* We no longer have valid peer packet information */
+ if (peer->maxPacketSize-RX_IPUDP_SIZE > peer->ifMTU)
+ peer->maxPacketSize = 0;
+ MUTEX_EXIT(&peer->peer_lock);
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ peer->refCount--;
+ if (host && !port) {
+ peer = next;
+ /* pick up where we left off */
+ goto resume;
+ }
}
MUTEX_EXIT(&rx_peerHashTable_lock);
}
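/*
 * Illustrative sketch (not part of this change): the clamping above only
 * ever lowers the negotiated sizes.  Stand-alone arithmetic with
 * placeholder parameters for RX_MIN_PACKET_SIZE and the interface and
 * per-call MTUs (rxi_AdjustIfMTU is omitted):
 */
static int
clamp_call_mtu(int reported_mtu, int min_packet, int if_mtu, int call_mtu)
{
    if (reported_mtu < min_packet)  /* never drop below the protocol minimum */
        reported_mtu = min_packet;
    if (if_mtu > reported_mtu)      /* the interface MTU can only shrink */
        if_mtu = reported_mtu;
    if (call_mtu > if_mtu)          /* and the per-call MTU follows it down */
        call_mtu = if_mtu;
    return call_mtu;
}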
if (aconn->flags & (RX_CONN_MAKECALL_WAITING | RX_CONN_DESTROY_ME))
return 1;
+
for (i = 0; i < RX_MAXCALLS; i++) {
tcall = aconn->call[i];
if (tcall) {
int newAckCount = 0;
u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
+ int pktsize = 0; /* Set if we need to update the peer mtu */
if (rx_stats_active)
rx_MutexIncrement(rx_stats.ackPacketsRead, rx_stats_mutex);
if (ap->reason == RX_ACK_PING_RESPONSE)
rxi_UpdatePeerReach(conn, call);
+ if (conn->lastPacketSizeSeq) {
+ MUTEX_ENTER(&conn->conn_data_lock);
+ if (first >= conn->lastPacketSizeSeq) {
+ pktsize = conn->lastPacketSize;
+ conn->lastPacketSize = conn->lastPacketSizeSeq = 0;
+ }
+ MUTEX_EXIT(&conn->conn_data_lock);
+ MUTEX_ENTER(&peer->peer_lock);
+ /* start somewhere */
+ if (!peer->maxPacketSize)
+ peer->maxPacketSize = np->length+RX_IPUDP_SIZE;
+
+ if (pktsize > peer->maxPacketSize) {
+ peer->maxPacketSize = pktsize;
+ if ((pktsize-RX_IPUDP_SIZE > peer->ifMTU)) {
+ peer->ifMTU=pktsize-RX_IPUDP_SIZE;
+ peer->natMTU = rxi_AdjustIfMTU(peer->ifMTU);
+ }
+ }
+ MUTEX_EXIT(&peer->peer_lock);
+ }
+
#ifdef RXDEBUG
#ifdef AFS_NT40_ENV
if (rxdebug_active) {
}
call->MTU = RX_HEADER_SIZE + RX_JUMBOBUFFERSIZE;
} else if (call->MTU < peer->maxMTU) {
- call->MTU += peer->natMTU;
- call->MTU = MIN(call->MTU, peer->maxMTU);
+ /* don't upgrade if we can't handle it */
+ if ((call->nDgramPackets == 1) && (call->MTU >= peer->ifMTU))
+ call->MTU = peer->ifMTU;
+ else {
+ call->MTU += peer->natMTU;
+ call->MTU = MIN(call->MTU, peer->maxMTU);
+ }
}
call->nAcks = 0;
}
CV_SIGNAL(&sq->cv);
#else
service->nRequestsRunning++;
+ MUTEX_ENTER(&rx_quota_mutex);
if (service->nRequestsRunning <= service->minProcs)
rxi_minDeficit--;
rxi_availProcs--;
+ MUTEX_EXIT(&rx_quota_mutex);
osi_rxWakeup(sq);
#endif
}
call->tqc -=
#endif /* RXDEBUG_PACKET */
rxi_FreePackets(0, &call->tq);
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+#ifdef RX_ENABLE_LOCKS
+ CV_BROADCAST(&call->cv_tq);
+#else /* RX_ENABLE_LOCKS */
+ osi_rxWakeup(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+ }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
call->flags &= ~RX_CALL_TQ_CLEARME;
}
flags = call->flags;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (flags & RX_CALL_TQ_BUSY) {
- call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
- call->flags |= (flags & RX_CALL_TQ_WAIT);
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- } else
+ rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- {
- rxi_ClearTransmitQueue(call, 1);
- /* why init the queue if you just emptied it? queue_Init(&call->tq); */
- if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
- dpf(("rcall %"AFS_PTR_FMT" has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
- }
- call->flags = 0;
- while (call->tqWaiters) {
-#ifdef RX_ENABLE_LOCKS
- CV_BROADCAST(&call->cv_tq);
-#else /* RX_ENABLE_LOCKS */
- osi_rxWakeup(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- call->tqWaiters--;
- }
+
+ rxi_ClearTransmitQueue(call, 1);
+ if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
+    dpf(("call %"AFS_PTR_FMT" has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
}
+ call->flags = 0;
rxi_ClearReceiveQueue(call);
/* why init the queue if you just emptied it? queue_Init(&call->rq); */
/* Update last send time for this call (for keep-alive
* processing), and for the connection (so that we can discover
* idle connections) */
- call->lastSendData = conn->lastSendTime = call->lastSendTime = clock_Sec();
+ conn->lastSendTime = call->lastSendTime = clock_Sec();
+ /* Let a set of retransmits trigger an idle timeout */
+ if (!resending)
+ call->lastSendData = call->lastSendTime;
}
/* When sending packets we need to follow these rules:
* processing), and for the connection (so that we can discover
* idle connections) */
conn->lastSendTime = call->lastSendTime = clock_Sec();
- /* Don't count keepalives here, so idleness can be tracked. */
- if ((p->header.type != RX_PACKET_TYPE_ACK) || (((struct rx_ackPacket *)rx_DataOf(p))->reason != RX_ACK_PING))
+ /* Don't count keepalive ping/acks here, so idleness can be tracked. */
+ if ((p->header.type != RX_PACKET_TYPE_ACK) ||
+ ((((struct rx_ackPacket *)rx_DataOf(p))->reason != RX_ACK_PING) &&
+ (((struct rx_ackPacket *)rx_DataOf(p))->reason !=
+ RX_ACK_PING_RESPONSE)))
call->lastSendData = call->lastSendTime;
}
struct rx_connection *conn = call->conn;
afs_uint32 now;
afs_uint32 deadTime;
+ int cerror = 0;
+ int newmtu = 0;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
if (call->flags & RX_CALL_TQ_BUSY) {
);
if (ire && ire->ire_max_frag > 0)
- rxi_SetPeerMtu(call->conn->peer->host, 0, ire->ire_max_frag);
+ rxi_SetPeerMtu(NULL, call->conn->peer->host, 0,
+ ire->ire_max_frag);
#if defined(GLOBAL_NETSTACKID)
netstack_rele(ns);
#endif
#endif
#endif /* ADAPT_PMTU */
- rxi_CallError(call, RX_CALL_DEAD);
- return -1;
+ cerror = RX_CALL_DEAD;
+ goto mtuout;
} else {
#ifdef RX_ENABLE_LOCKS
/* Cancel pending events */
return -1;
}
return 0;
+mtuout:
+ if (call->conn->msgsizeRetryErr && cerror != RX_CALL_TIMEOUT) {
+ /* if we never succeeded, let the error pass out as-is */
+ if (call->conn->peer->maxPacketSize)
+ cerror = call->conn->msgsizeRetryErr;
+
+ /* if we thought we could send more, perhaps things got worse */
+ if (call->conn->peer->maxPacketSize > conn->lastPacketSize)
+ /* maxpacketsize will be cleared in rxi_SetPeerMtu */
+ newmtu = MAX(call->conn->peer->maxPacketSize-RX_IPUDP_SIZE,
+ conn->lastPacketSize-(128+RX_IPUDP_SIZE));
+ else
+ newmtu = conn->lastPacketSize-(128+RX_IPUDP_SIZE);
+
+ /* minimum capped in SetPeerMtu */
+ rxi_SetPeerMtu(call->conn->peer, 0, 0, newmtu);
+
+ /* clean up */
+ conn->lastPacketSize = 0;
+
+ /* needed so ResetCall doesn't clobber us. */
+ call->MTU = call->conn->peer->ifMTU;
+ }
+ rxi_CallError(call, cerror);
+ return -1;
}
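/*
 * Illustrative sketch (not part of this change): the size chosen at
 * "mtuout" above backs off by 128 bytes from the last attempted packet,
 * unless a larger size is already known to have been acknowledged.  The
 * parameter names mirror, but are not, the fields used above.
 */
static int
retry_packet_size(int max_acked, int last_attempt, int ip_udp_overhead)
{
    int shrunk = last_attempt - (128 + ip_udp_overhead);

    if (max_acked > last_attempt) {
        /* prefer a size that the peer has already acknowledged */
        int proven = max_acked - ip_udp_overhead;
        return proven > shrunk ? proven : shrunk;
    }
    return shrunk;
}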
void
RX_CLIENT_CONNECTION ? rx_socket : conn->service->socket);
- MUTEX_ENTER(&conn->conn_data_lock);
- conn->natKeepAliveEvent = NULL;
- MUTEX_EXIT(&conn->conn_data_lock);
-
tp = &tbuffer[sizeof(struct rx_header)];
taddr.sin_family = AF_INET;
taddr.sin_port = rx_PortOf(rx_PeerOf(conn));
tmpiov[0].iov_len = 1 + sizeof(struct rx_header);
osi_NetSend(socket, &taddr, tmpiov, 1, 1 + sizeof(struct rx_header), 1);
- rxi_ScheduleNatKeepAliveEvent(conn);
+
+ MUTEX_ENTER(&conn->conn_data_lock);
+ /* Only reschedule ourselves if the connection would not be destroyed */
+ if (conn->refCount <= 1) {
+ conn->natKeepAliveEvent = NULL;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ rx_DestroyConnection(conn); /* drop the reference for this */
+ } else {
+ conn->natKeepAliveEvent = NULL;
+ conn->refCount--; /* drop the reference for this */
+ rxi_ScheduleNatKeepAliveEvent(conn);
+ MUTEX_EXIT(&conn->conn_data_lock);
+ }
}
void
rxi_ScheduleNatKeepAliveEvent(struct rx_connection *conn)
{
- MUTEX_ENTER(&conn->conn_data_lock);
if (!conn->natKeepAliveEvent && conn->secondsUntilNatPing) {
struct clock when, now;
clock_GetTime(&now);
when = now;
when.sec += conn->secondsUntilNatPing;
+ conn->refCount++; /* hold a reference for this */
conn->natKeepAliveEvent =
rxevent_PostNow(&when, &now, rxi_NatKeepAliveEvent, conn, 0);
}
- MUTEX_EXIT(&conn->conn_data_lock);
}
void
rx_SetConnSecondsUntilNatPing(struct rx_connection *conn, afs_int32 seconds)
{
+ MUTEX_ENTER(&conn->conn_data_lock);
conn->secondsUntilNatPing = seconds;
if (seconds != 0)
rxi_ScheduleNatKeepAliveEvent(conn);
+ MUTEX_EXIT(&conn->conn_data_lock);
}
void
rxi_NatKeepAliveOn(struct rx_connection *conn)
{
+ MUTEX_ENTER(&conn->conn_data_lock);
rxi_ScheduleNatKeepAliveEvent(conn);
+ MUTEX_EXIT(&conn->conn_data_lock);
}
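/*
 * Illustrative sketch (not part of this change): the NAT keepalive event
 * now owns a reference on its connection.  With a placeholder object and
 * trivial stand-ins for rxevent_PostNow/rx_DestroyConnection, the pattern
 * is: take a reference when the event is posted; when it fires, either
 * reschedule with a fresh reference or, if the event holds the last
 * reference, tear the object down instead.
 */
#include <pthread.h>

struct obj {
    pthread_mutex_t lock;
    int refcount;
    void *pending;                      /* ~ conn->natKeepAliveEvent */
};

static void *schedule_event(struct obj *o) { return (void *)o; }
static void destroy_obj(struct obj *o) { (void)o; }

static void
post_keepalive(struct obj *o)           /* caller holds o->lock */
{
    o->refcount++;                      /* reference owned by the pending event */
    o->pending = schedule_event(o);
}

static void
keepalive_fired(struct obj *o)
{
    pthread_mutex_lock(&o->lock);
    o->pending = NULL;
    if (o->refcount <= 1) {             /* the event held the last reference */
        pthread_mutex_unlock(&o->lock);
        destroy_obj(o);
    } else {
        o->refcount--;                  /* drop the fired event's reference */
        post_keepalive(o);              /* and take a new one */
        pthread_mutex_unlock(&o->lock);
    }
}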
/* When a call is in progress, this routine is called occasionally to
{
struct rx_peer **peer_ptr, **peer_end;
int code;
- MUTEX_ENTER(&rx_rpc_stats);
- MUTEX_ENTER(&rx_peerHashTable_lock);
+
+ /*
+ * Why do we need to hold the rx_peerHashTable_lock across
+ * the incrementing of peer_ptr since the rx_peerHashTable
+ * array is not changing? We don't.
+ *
+ * By dropping the lock periodically we can permit other
+ * activities to be performed while a rxi_ReapConnections
+ * call is in progress. The goal of reap connections
+ * is to clean up quickly without causing large amounts
+ * of contention. Therefore, it is important that global
+ * mutexes not be held for extended periods of time.
+ */
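/*
 * Illustrative sketch (not part of this change): the walk below takes the
 * table lock per bucket instead of across the whole scan so other threads
 * can interleave; the real loop additionally pins prev/next with reference
 * counts before dropping the lock mid-bucket.  A stand-alone analogue of
 * the per-bucket locking shape:
 */
#include <pthread.h>

#define NBUCKETS 257

struct node { struct node *next; };

static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;
static struct node *table[NBUCKETS];

static void
scan_table(void (*visit)(struct node *))
{
    int i;
    struct node *n;

    for (i = 0; i < NBUCKETS; i++) {
        /* lock held only while one bucket's chain is examined */
        pthread_mutex_lock(&table_lock);
        for (n = table[i]; n; n = n->next)
            visit(n);
        pthread_mutex_unlock(&table_lock);
    }
}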
for (peer_ptr = &rx_peerHashTable[0], peer_end =
&rx_peerHashTable[rx_hashTableSize]; peer_ptr < peer_end;
peer_ptr++) {
struct rx_peer *peer, *next, *prev;
- for (prev = peer = *peer_ptr; peer; peer = next) {
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ for (prev = peer = *peer_ptr; peer; peer = next) {
next = peer->next;
code = MUTEX_TRYENTER(&peer->peer_lock);
if ((code) && (peer->refCount == 0)
&& ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
rx_interface_stat_p rpc_stat, nrpc_stat;
size_t space;
+
+ /*
+                 * We now know that this peer object is to be
+                 * removed from the hash table.  Once it is removed
+                 * it can't be referenced by other threads.
+                 * Let's remove it first and decrement the
+                 * nPeerStructs count.
+ */
+ if (peer == *peer_ptr) {
+ *peer_ptr = next;
+ prev = next;
+ } else
+ prev->next = next;
+
+ if (rx_stats_active)
+ rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
+
+ /*
+ * Now if we hold references on 'prev' and 'next'
+ * we can safely drop the rx_peerHashTable_lock
+ * while we destroy this 'peer' object.
+ */
+ if (next)
+ next->refCount++;
+ if (prev)
+ prev->refCount++;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
MUTEX_EXIT(&peer->peer_lock);
MUTEX_DESTROY(&peer->peer_lock);
for (queue_Scan
sizeof(rx_function_entry_v1_t);
rxi_Free(rpc_stat, space);
+
+ MUTEX_ENTER(&rx_rpc_stats);
rxi_rpc_peer_stat_cnt -= num_funcs;
+ MUTEX_EXIT(&rx_rpc_stats);
}
rxi_FreePeer(peer);
- if (rx_stats_active)
- rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
- if (peer == *peer_ptr) {
- *peer_ptr = next;
- prev = next;
- } else
- prev->next = next;
+
+ /*
+ * Regain the rx_peerHashTable_lock and
+ * decrement the reference count on 'prev'
+ * and 'next'.
+ */
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ if (next)
+ next->refCount--;
+ if (prev)
+ prev->refCount--;
} else {
if (code) {
MUTEX_EXIT(&peer->peer_lock);
prev = peer;
}
}
+ MUTEX_EXIT(&rx_peerHashTable_lock);
}
- MUTEX_EXIT(&rx_peerHashTable_lock);
- MUTEX_EXIT(&rx_rpc_stats);
}
/* THIS HACK IS A TEMPORARY HACK. The idea is that the race condition in
}
if (tp) {
+ tp->refCount++;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
error = 0;
+ MUTEX_ENTER(&tp->peer_lock);
peerStats->host = tp->host;
peerStats->port = tp->port;
peerStats->ifMTU = tp->ifMTU;
peerStats->bytesSent.low = tp->bytesSent.low;
peerStats->bytesReceived.high = tp->bytesReceived.high;
peerStats->bytesReceived.low = tp->bytesReceived.low;
+ MUTEX_EXIT(&tp->peer_lock);
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ tp->refCount--;
}
MUTEX_EXIT(&rx_peerHashTable_lock);
&rx_peerHashTable[rx_hashTableSize]; peer_ptr < peer_end;
peer_ptr++) {
struct rx_peer *peer, *next;
- for (peer = *peer_ptr; peer; peer = next) {
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ for (peer = *peer_ptr; peer; peer = next) {
rx_interface_stat_p rpc_stat, nrpc_stat;
size_t space;
+
+ MUTEX_ENTER(&rx_rpc_stats);
+ MUTEX_ENTER(&peer->peer_lock);
for (queue_Scan
(&peer->rpcStats, rpc_stat, nrpc_stat,
rx_interface_stat)) {
sizeof(rx_function_entry_v1_t);
rxi_Free(rpc_stat, space);
- MUTEX_ENTER(&rx_rpc_stats);
+
+ /* rx_rpc_stats must be held */
rxi_rpc_peer_stat_cnt -= num_funcs;
- MUTEX_EXIT(&rx_rpc_stats);
}
+ MUTEX_EXIT(&peer->peer_lock);
+ MUTEX_EXIT(&rx_rpc_stats);
+
next = peer->next;
rxi_FreePeer(peer);
if (rx_stats_active)
rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
}
+ MUTEX_EXIT(&rx_peerHashTable_lock);
}
}
for (i = 0; i < RX_MAX_SERVICES; i++) {
MUTEX_EXIT(&conn->conn_data_lock);
}
+void
+rx_SetServiceSpecific(struct rx_service *svc, int key, void *ptr)
+{
+ int i;
+ MUTEX_ENTER(&svc->svc_data_lock);
+ if (!svc->specific) {
+ svc->specific = (void **)malloc((key + 1) * sizeof(void *));
+ for (i = 0; i < key; i++)
+ svc->specific[i] = NULL;
+ svc->nSpecific = key + 1;
+ svc->specific[key] = ptr;
+ } else if (key >= svc->nSpecific) {
+ svc->specific = (void **)
+ realloc(svc->specific, (key + 1) * sizeof(void *));
+ for (i = svc->nSpecific; i < key; i++)
+ svc->specific[i] = NULL;
+ svc->nSpecific = key + 1;
+ svc->specific[key] = ptr;
+ } else {
+ if (svc->specific[key] && rxi_keyCreate_destructor[key])
+ (*rxi_keyCreate_destructor[key]) (svc->specific[key]);
+ svc->specific[key] = ptr;
+ }
+ MUTEX_EXIT(&svc->svc_data_lock);
+}
+
void *
rx_GetSpecific(struct rx_connection *conn, int key)
{
return ptr;
}
+void *
+rx_GetServiceSpecific(struct rx_service *svc, int key)
+{
+ void *ptr;
+ MUTEX_ENTER(&svc->svc_data_lock);
+ if (key >= svc->nSpecific)
+ ptr = NULL;
+ else
+ ptr = svc->specific[key];
+ MUTEX_EXIT(&svc->svc_data_lock);
+ return ptr;
+}
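/*
 * Illustrative usage sketch (not part of this change): service-specific
 * data mirrors the existing rx_SetSpecific/rx_GetSpecific API for
 * connections.  It is assumed here that the key comes from rx_KeyCreate,
 * as for connection-specific data; "struct my_state" and the helpers are
 * placeholders.
 */
struct my_state { int configured; };

static int my_key;

static void
my_state_destructor(void *p)
{
    free(p);
}

static void
attach_service_state(struct rx_service *svc)
{
    struct my_state *s = calloc(1, sizeof(*s));

    my_key = rx_KeyCreate(my_state_destructor);
    rx_SetServiceSpecific(svc, my_key, s);
}

static void
mark_configured(struct rx_service *svc)
{
    struct my_state *s = rx_GetServiceSpecific(svc, my_key);

    if (s)
        s->configured = 1;
}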
+
+
#endif /* !KERNEL */
/*
return;
MUTEX_ENTER(&rx_rpc_stats);
- MUTEX_ENTER(&peer->peer_lock);
if (rxi_monitor_peerStats) {
+ MUTEX_ENTER(&peer->peer_lock);
rxi_AddRpcStat(&peer->rpcStats, rxInterface, currentFunc, totalFunc,
queueTime, execTime, bytesSent, bytesRcvd, isServer,
peer->host, peer->port, 1, &rxi_rpc_peer_stat_cnt);
+ MUTEX_EXIT(&peer->peer_lock);
}
if (rxi_monitor_processStats) {
0xffffffff, 0xffffffff, 0, &rxi_rpc_process_stat_cnt);
}
- MUTEX_EXIT(&peer->peer_lock);
MUTEX_EXIT(&rx_rpc_stats);
}
struct rx_peer **peer_ptr, **peer_end;
int code;
- MUTEX_ENTER(&rx_rpc_stats);
-
/*
* Turn off peer statistics and if process stats is also off, turn
* off everything
rx_enable_stats = 0;
}
- MUTEX_ENTER(&rx_peerHashTable_lock);
for (peer_ptr = &rx_peerHashTable[0], peer_end =
&rx_peerHashTable[rx_hashTableSize]; peer_ptr < peer_end;
peer_ptr++) {
struct rx_peer *peer, *next, *prev;
- for (prev = peer = *peer_ptr; peer; peer = next) {
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ MUTEX_ENTER(&rx_rpc_stats);
+ for (prev = peer = *peer_ptr; peer; peer = next) {
next = peer->next;
code = MUTEX_TRYENTER(&peer->peer_lock);
if (code) {
rx_interface_stat_p rpc_stat, nrpc_stat;
size_t space;
- for (queue_Scan
+
+ if (prev == *peer_ptr) {
+ *peer_ptr = next;
+ prev = next;
+ } else
+ prev->next = next;
+
+ if (next)
+ next->refCount++;
+ if (prev)
+ prev->refCount++;
+ peer->refCount++;
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+
+ for (queue_Scan
(&peer->rpcStats, rpc_stat, nrpc_stat,
rx_interface_stat)) {
unsigned int num_funcs = 0;
rxi_rpc_peer_stat_cnt -= num_funcs;
}
MUTEX_EXIT(&peer->peer_lock);
- if (prev == *peer_ptr) {
- *peer_ptr = next;
- prev = next;
- } else
- prev->next = next;
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ if (next)
+ next->refCount--;
+ if (prev)
+ prev->refCount--;
+ peer->refCount--;
} else {
prev = peer;
}
}
+ MUTEX_EXIT(&rx_rpc_stats);
+ MUTEX_EXIT(&rx_peerHashTable_lock);
}
- MUTEX_EXIT(&rx_peerHashTable_lock);
- MUTEX_EXIT(&rx_rpc_stats);
}
/*