From: Jeffrey Altman Date: Sun, 10 Jul 2011 02:43:45 +0000 (+0100) Subject: rx: prevent connection channel assignment race X-Git-Tag: openafs-devel-1_7_1~317 X-Git-Url: https://git.openafs.org/?p=openafs.git;a=commitdiff_plain;h=99b43273c0203881ea3d2d50f0abf000cdc0b03e rx: prevent connection channel assignment race When rx was converted to use pthreads, the code that allocates a call to a connection channel in rxi_ReceivePacket() was not made thread safe. The code prior to this patchset permitted a race in the server connection case. The rx_connection channel assignment in rxi_ReceivePacket() and the call destruction in rxi_FreeCall() and rxi_DestroyConnectionNoLock() did not consistently protect the rx_connection channel array using the conn_call_lock. This race could result in rxi_ReceivePacket() operating on a rx_call which was disconnected from the previously assigned rx_connection. In addition, the code in rxi_ReceivePacket() that was intended to protect the allocation of a call using rxi_NewCall() to the connection channel array was racy with itself. This patchset consistently applies the conn_call_lock to protect the allocation / deallocation of calls to the connection channel array and in the process simplifies the logic in rxi_ReceivePacket() as it is no longer necessary to protect against a null call pointer since the race can no longer be lost. Change-Id: Id61b55b4d1d57a2b9b35ea942545ef4bdc8d33f3 Reviewed-on: http://gerrit.openafs.org/4963 Tested-by: BuildBot Reviewed-by: Derrick Brashear --- diff --git a/src/rx/rx.c b/src/rx/rx.c index 3ce5178..1bbf78d 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -1208,6 +1208,7 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn) MUTEX_EXIT(&conn->conn_data_lock); /* Check for extant references to this connection */ + MUTEX_ENTER(&conn->conn_call_lock); for (i = 0; i < RX_MAXCALLS; i++) { struct rx_call *call = conn->call[i]; if (call) { @@ -1231,6 +1232,8 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn) } } } + MUTEX_EXIT(&conn->conn_call_lock); + #ifdef RX_ENABLE_LOCKS if (!havecalls) { if (MUTEX_TRYENTER(&conn->conn_data_lock)) { @@ -2665,7 +2668,11 @@ rxi_FreeCall(struct rx_call *call, int haveCTLock) call->state = RX_STATE_RESET; MUTEX_EXIT(&rx_refcnt_mutex); rxi_ResetCall(call, 0); - call->conn->call[channel] = (struct rx_call *)0; + + MUTEX_ENTER(&conn->conn_call_lock); + if (call->conn->call[channel] == call) + call->conn->call[channel] = 0; + MUTEX_EXIT(&conn->conn_call_lock); MUTEX_ENTER(&rx_freeCallQueue_lock); SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock); @@ -3207,94 +3214,77 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket, channel = np->header.cid & RX_CHANNELMASK; call = conn->call[channel]; -#ifdef RX_ENABLE_LOCKS - if (call) - MUTEX_ENTER(&call->lock); - /* Test to see if call struct is still attached to conn. */ - if (call != conn->call[channel]) { - if (call) - MUTEX_EXIT(&call->lock); - if (type == RX_SERVER_CONNECTION) { - call = conn->call[channel]; - /* If we started with no call attached and there is one now, - * another thread is also running this routine and has gotten - * the connection channel. We should drop this packet in the tests - * below. If there was a call on this connection and it's now - * gone, then we'll be making a new call below. - * If there was previously a call and it's now different then - * the old call was freed and another thread running this routine - * has created a call on this channel. One of these two threads - * has a packet for the old call and the code below handles those - * cases. - */ - if (call) - MUTEX_ENTER(&call->lock); - } else { - /* This packet can't be for this call. If the new call address is - * 0 then no call is running on this channel. If there is a call - * then, since this is a client connection we're getting data for - * it must be for the previous call. - */ - if (rx_stats_active) - rx_atomic_inc(&rx_stats.spuriousPacketsRead); - MUTEX_ENTER(&rx_refcnt_mutex); - conn->refCount--; - MUTEX_EXIT(&rx_refcnt_mutex); - return np; - } - } -#endif - currentCallNumber = conn->callNumber[channel]; - if (type == RX_SERVER_CONNECTION) { /* We're the server */ - if (np->header.callNumber < currentCallNumber) { - if (rx_stats_active) - rx_atomic_inc(&rx_stats.spuriousPacketsRead); -#ifdef RX_ENABLE_LOCKS - if (call) - MUTEX_EXIT(&call->lock); -#endif - MUTEX_ENTER(&rx_refcnt_mutex); - conn->refCount--; - MUTEX_EXIT(&rx_refcnt_mutex); - return np; - } - if (!call) { - MUTEX_ENTER(&conn->conn_call_lock); - call = rxi_NewCall(conn, channel); - MUTEX_EXIT(&conn->conn_call_lock); - *call->callNumber = np->header.callNumber; + if (call) { + MUTEX_ENTER(&call->lock); + currentCallNumber = conn->callNumber[channel]; + } else if (type == RX_SERVER_CONNECTION) { /* No call allocated */ + MUTEX_ENTER(&conn->conn_call_lock); + call = conn->call[channel]; + if (call) { + MUTEX_ENTER(&call->lock); + MUTEX_EXIT(&conn->conn_call_lock); + currentCallNumber = conn->callNumber[channel]; + } else { + call = rxi_NewCall(conn, channel); /* returns locked call */ + MUTEX_EXIT(&conn->conn_call_lock); + *call->callNumber = currentCallNumber = np->header.callNumber; #ifdef RXDEBUG - if (np->header.callNumber == 0) - dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n", - np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), - np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, - np->header.flags, np, np->length)); + if (np->header.callNumber == 0) + dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n", + np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), + np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, + np->header.flags, np, np->length)); #endif - call->state = RX_STATE_PRECALL; - clock_GetTime(&call->queueTime); - hzero(call->bytesSent); - hzero(call->bytesRcvd); - /* - * If the number of queued calls exceeds the overload - * threshold then abort this call. - */ - if ((rx_BusyThreshold > 0) && - (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) { - struct rx_packet *tp; - - rxi_CallError(call, rx_BusyError); - tp = rxi_SendCallAbort(call, np, 1, 0); - MUTEX_EXIT(&call->lock); + call->state = RX_STATE_PRECALL; + clock_GetTime(&call->queueTime); + hzero(call->bytesSent); + hzero(call->bytesRcvd); + /* + * If the number of queued calls exceeds the overload + * threshold then abort this call. + */ + if ((rx_BusyThreshold > 0) && + (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) { + struct rx_packet *tp; + + rxi_CallError(call, rx_BusyError); + tp = rxi_SendCallAbort(call, np, 1, 0); + MUTEX_EXIT(&call->lock); MUTEX_ENTER(&rx_refcnt_mutex); - conn->refCount--; + conn->refCount--; MUTEX_EXIT(&rx_refcnt_mutex); if (rx_stats_active) rx_atomic_inc(&rx_stats.nBusies); - return tp; - } - rxi_KeepAliveOn(call); - } else if (np->header.callNumber != currentCallNumber) { + return tp; + } + rxi_KeepAliveOn(call); + } + } else { /* RX_CLIENT_CONNECTION and No call allocated */ + /* This packet can't be for this call. If the new call address is + * 0 then no call is running on this channel. If there is a call + * then, since this is a client connection we're getting data for + * it must be for the previous call. + */ + if (rx_stats_active) + rx_atomic_inc(&rx_stats.spuriousPacketsRead); + MUTEX_ENTER(&rx_refcnt_mutex); + conn->refCount--; + MUTEX_EXIT(&rx_refcnt_mutex); + return np; + } + + /* There is a non-NULL locked call at this point */ + if (type == RX_SERVER_CONNECTION) { /* We're the server */ + if (np->header.callNumber < currentCallNumber) { + MUTEX_EXIT(&call->lock); + if (rx_stats_active) + rx_atomic_inc(&rx_stats.spuriousPacketsRead); + MUTEX_ENTER(&rx_refcnt_mutex); + conn->refCount--; + MUTEX_EXIT(&rx_refcnt_mutex); + return np; + } else if (np->header.callNumber != currentCallNumber) { /* Wait until the transmit queue is idle before deciding * whether to reset the current call. Chances are that the * call will be in ether DALLY or HOLD state once the TQ_BUSY @@ -3370,15 +3360,11 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket, } } else { /* we're the client */ /* Ignore all incoming acknowledgements for calls in DALLY state */ - if (call && (call->state == RX_STATE_DALLY) + if ((call->state == RX_STATE_DALLY) && (np->header.type == RX_PACKET_TYPE_ACK)) { if (rx_stats_active) rx_atomic_inc(&rx_stats.ignorePacketDally); -#ifdef RX_ENABLE_LOCKS - if (call) { - MUTEX_EXIT(&call->lock); - } -#endif + MUTEX_EXIT(&call->lock); MUTEX_ENTER(&rx_refcnt_mutex); conn->refCount--; MUTEX_EXIT(&rx_refcnt_mutex); @@ -3387,14 +3373,10 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket, /* Ignore anything that's not relevant to the current call. If there * isn't a current call, then no packet is relevant. */ - if (!call || (np->header.callNumber != currentCallNumber)) { + if (np->header.callNumber != currentCallNumber) { if (rx_stats_active) rx_atomic_inc(&rx_stats.spuriousPacketsRead); -#ifdef RX_ENABLE_LOCKS - if (call) { - MUTEX_EXIT(&call->lock); - } -#endif + MUTEX_EXIT(&call->lock); MUTEX_ENTER(&rx_refcnt_mutex); conn->refCount--; MUTEX_EXIT(&rx_refcnt_mutex); @@ -3403,9 +3385,7 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket, /* If the service security object index stamped in the packet does not * match the connection's security index, ignore the packet */ if (np->header.securityIndex != conn->securityIndex) { -#ifdef RX_ENABLE_LOCKS MUTEX_EXIT(&call->lock); -#endif MUTEX_ENTER(&rx_refcnt_mutex); conn->refCount--; MUTEX_EXIT(&rx_refcnt_mutex);