RX: Tidy reader data locking
authorSimon Wilkinson <sxw@your-file-system.com>
Sun, 26 Sep 2010 14:48:54 +0000 (15:48 +0100)
committerDerrick Brashear <shadow@dementia.org>
Wed, 29 Sep 2010 03:01:49 +0000 (20:01 -0700)
Data which is accessed only by the reader thread doesn't need to be
protected by call->lock

Remove the call->lock protection where it isn't required, which makes
certain read/write calls lock free.

Stop rx_ResetCall from manipulating reader thread data. This data will
be zero'd and cleared when the reader thread calls rx_EndCall, and
doesn't need to be reset by the Listener thread.

The change which made rx_ResetCall reset reader thread information
was originally part of 559ea99b. It caused race conditions that were
fixed by adding additional lock protection in d0cc6e, 4dadd2 and
423ab97e. This commit reverts portions of all of those changes. It
is safe to not clear the iovc in ResetCall because any NewCall must
be balanced by a corresponding EndCall in the reader thread, and
EndCall does the appropriate freeing of reader elements.

Change-Id: I450469a4591fbe4af34482a2b219708795c57e8e
Reviewed-on: http://gerrit.openafs.org/2856
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Reviewed-by: Jeffrey Altman <jaltman@openafs.org>
Tested-by: BuildBot <buildbot@rampaginggeek.com>

src/rx/rx.c
src/rx/rx_rdwr.c

index 0f015c8..ce7d836 100644 (file)
@@ -4899,23 +4899,6 @@ rxi_ResetCall(struct rx_call *call, int newcall)
     rxi_ClearReceiveQueue(call);
     /* why init the queue if you just emptied it? queue_Init(&call->rq); */
 
-    if (call->currentPacket) {
-#ifdef RX_TRACK_PACKETS
-        call->currentPacket->flags &= ~RX_PKTFLAG_CP;
-        call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
-#endif
-        queue_Prepend(&call->iovq, call->currentPacket);
-#ifdef RXDEBUG_PACKET
-        call->iovqc++;
-#endif /* RXDEBUG_PACKET */
-        call->currentPacket = (struct rx_packet *)0;
-    }
-    call->curlen = call->nLeft = call->nFree = 0;
-
-#ifdef RXDEBUG_PACKET
-    call->iovqc -=
-#endif
-        rxi_FreePackets(0, &call->iovq);
 
     call->error = 0;
     call->twind = call->conn->twind[call->channel];
index 05097e5..27acf1b 100644 (file)
@@ -246,7 +246,6 @@ rxi_ReadProc(struct rx_call *call, char *buf,
                    osi_rxSleep(&call->rq);
 #endif
                }
-                /* cp is no longer valid since we may have given up the lock */
                 cp = call->currentPacket;
 
                call->startWait = 0;
@@ -318,12 +317,9 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
     int tcurlen;
     int tnLeft;
     char *tcurpos;
-    int locked = 1;
     SPLVAR;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    NETPRI;
-    MUTEX_ENTER(&call->lock);
     if (!queue_IsEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -339,10 +335,6 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
     tnLeft = call->nLeft;
     if (!call->error && tcurlen > nbytes && tnLeft > nbytes) {
        tcurpos = call->curpos;
-        MUTEX_EXIT(&call->lock);
-        USERPRI;
-        locked = 0;
-
         memcpy(buf, tcurpos, nbytes);
 
        call->curpos = tcurpos + nbytes;
@@ -351,20 +343,17 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
 
         if (!call->nLeft && call->currentPacket != NULL) {
             /* out of packet.  Get another one. */
-            NETPRI;
-            MUTEX_ENTER(&call->lock);
-            locked = 1;
             rxi_FreePacket(call->currentPacket);
             call->currentPacket = (struct rx_packet *)0;
         }
-       bytes = nbytes;
-    } else
-        bytes = rxi_ReadProc(call, buf, nbytes);
-
-    if (locked) {
-        MUTEX_EXIT(&call->lock);
-        USERPRI;
+       return nbytes;
     }
+
+    NETPRI;
+    MUTEX_ENTER(&call->lock);
+    bytes = rxi_ReadProc(call, buf, nbytes);
+    MUTEX_EXIT(&call->lock);
+    USERPRI;
     return bytes;
 }
 
@@ -376,12 +365,9 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
     int tcurlen;
     int tnLeft;
     char *tcurpos;
-    int locked = 1;
     SPLVAR;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    NETPRI;
-    MUTEX_ENTER(&call->lock);
     if (!queue_IsEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -398,9 +384,6 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
     if (!call->error && tcurlen >= sizeof(afs_int32)
        && tnLeft >= sizeof(afs_int32)) {
        tcurpos = call->curpos;
-        MUTEX_EXIT(&call->lock);
-        USERPRI;
-        locked = 0;
 
         memcpy((char *)value, tcurpos, sizeof(afs_int32));
 
@@ -409,20 +392,18 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
        call->nLeft = (u_short)(tnLeft - sizeof(afs_int32));
         if (!call->nLeft && call->currentPacket != NULL) {
             /* out of packet.  Get another one. */
-            NETPRI;
-            MUTEX_ENTER(&call->lock);
-            locked = 1;
             rxi_FreePacket(call->currentPacket);
             call->currentPacket = (struct rx_packet *)0;
         }
-       bytes = sizeof(afs_int32);
-    } else
-        bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
-
-    if (locked) {
-        MUTEX_EXIT(&call->lock);
-        USERPRI;
+       return sizeof(afs_int32);
     }
+
+    NETPRI;
+    MUTEX_ENTER(&call->lock);
+    bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
+    MUTEX_EXIT(&call->lock);
+    USERPRI;
+
     return bytes;
 }
 
@@ -899,12 +880,9 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
     int tcurlen;
     int tnFree;
     char *tcurpos;
-    int locked = 1;
     SPLVAR;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    NETPRI;
-    MUTEX_ENTER(&call->lock);
     if (queue_IsNotEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -921,22 +899,18 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
     if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
        tcurpos = call->curpos;
 
-        MUTEX_EXIT(&call->lock);
-        USERPRI;
-        locked = 0;
-
        memcpy(tcurpos, buf, nbytes);
        call->curpos = tcurpos + nbytes;
        call->curlen = (u_short)(tcurlen - nbytes);
        call->nFree = (u_short)(tnFree - nbytes);
-       bytes = nbytes;
-    } else
-        bytes = rxi_WriteProc(call, buf, nbytes);
-
-    if (locked) {
-        MUTEX_EXIT(&call->lock);
-        USERPRI;
+       return nbytes;
     }
+
+    NETPRI;
+    MUTEX_ENTER(&call->lock);
+    bytes = rxi_WriteProc(call, buf, nbytes);
+    MUTEX_EXIT(&call->lock);
+    USERPRI;
     return bytes;
 }
 
@@ -948,12 +922,8 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value)
     int tcurlen;
     int tnFree;
     char *tcurpos;
-    int locked = 1;
     SPLVAR;
 
-    /* Free any packets from the last call to ReadvProc/WritevProc */
-    NETPRI;
-    MUTEX_ENTER(&call->lock);
     if (queue_IsNotEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -971,10 +941,6 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value)
        && tnFree >= sizeof(afs_int32)) {
        tcurpos = call->curpos;
 
-        MUTEX_EXIT(&call->lock);
-        USERPRI;
-        locked = 0;
-
        if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
            *((afs_int32 *) (tcurpos)) = *value;
        } else {
@@ -983,14 +949,14 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value)
        call->curpos = tcurpos + sizeof(afs_int32);
        call->curlen = (u_short)(tcurlen - sizeof(afs_int32));
        call->nFree = (u_short)(tnFree - sizeof(afs_int32));
-       bytes = sizeof(afs_int32);
-    } else
-        bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
-
-    if (locked) {
-        MUTEX_EXIT(&call->lock);
-        USERPRI;
+       return sizeof(afs_int32);
     }
+
+    NETPRI;
+    MUTEX_ENTER(&call->lock);
+    bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
+    MUTEX_EXIT(&call->lock);
+    USERPRI;
     return bytes;
 }