rx: Reduce dependence on call->lock
authorSimon Wilkinson <sxw@your-file-system.com>
Sat, 2 Oct 2010 03:17:56 +0000 (23:17 -0400)
committerDerrick Brashear <shadow@dementia.org>
Tue, 5 Oct 2010 13:49:21 +0000 (06:49 -0700)
This patch reduces our dependence on call->lock, by allowing more
of the reader thread to run lock free.  Doing so requires that
call->mode only be set by the reader thread.  As a result, call->mode
can only be set to RX_CALL_ERROR by rxi_CallError().  The mode is
set to RX_CALL_ERROR by the reader thread immediately after regaining
the call->lock when it has been dropped.

Change-Id: Ie9541d8beac2d428526f8b2b4cc0004219e820be
Reviewed-on: http://gerrit.openafs.org/2880
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Jeffrey Altman <jaltman@openafs.org>
Tested-by: Jeffrey Altman <jaltman@openafs.org>
Reviewed-by: Simon Wilkinson <sxw@inf.ed.ac.uk>
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Derrick Brashear <shadow@dementia.org>

src/rx/rx.c
src/rx/rx_packet.c
src/rx/rx_rdwr.c

index eaa180e..5144b7b 100644 (file)
@@ -2066,6 +2066,7 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
     call->arrivalProc = (void (*)())0;
     if (rc && call->error == 0) {
        rxi_CallError(call, rc);
+        call->mode = RX_MODE_ERROR;
        /* Send an abort message to the peer if this error code has
         * only just been set.  If it was set previously, assume the
         * peer has already been sent the error code or will request it
@@ -2075,10 +2076,14 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
     if (conn->type == RX_SERVER_CONNECTION) {
        /* Make sure reply or at least dummy reply is sent */
        if (call->mode == RX_MODE_RECEIVING) {
+           MUTEX_EXIT(&call->lock);
            rxi_WriteProc(call, 0, 0);
+           MUTEX_ENTER(&call->lock);
        }
        if (call->mode == RX_MODE_SENDING) {
+            MUTEX_EXIT(&call->lock);
            rxi_FlushWrite(call);
+            MUTEX_ENTER(&call->lock);
        }
        rxi_calltrace(RX_CALL_END, call);
        /* Call goes to hold state until reply packets are acknowledged */
@@ -2097,7 +2102,9 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
         * no reply arguments are expected */
        if ((call->mode == RX_MODE_SENDING)
            || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
+           MUTEX_EXIT(&call->lock);
            (void)rxi_ReadProc(call, &dummy, 1);
+           MUTEX_ENTER(&call->lock);
        }
 
        /* If we had an outstanding delayed ack, be nice to the server
@@ -4881,7 +4888,6 @@ rxi_CallError(struct rx_call *call, afs_int32 error)
     rxi_ResetCall(call, 0);
 #endif
     call->error = error;
-    call->mode = RX_MODE_ERROR;
 }
 
 /* Reset various fields in a call structure, and wakeup waiting
index a8364df..e337faa 100644 (file)
@@ -2700,25 +2700,36 @@ rxi_DecodePacketHeader(struct rx_packet *p)
     /* Note: top 16 bits of this last word are the security checksum */
 }
 
+/*
+ * LOCKS HELD: called with call->lock held.
+ *
+ * PrepareSendPacket is the only place in the code that
+ * can increment call->tnext.  This could become an atomic
+ * in the future.  Beyond that there is nothing in this
+ * function that requires the call being locked.  This
+ * function can only be called by the application thread.
+ */
 void
 rxi_PrepareSendPacket(struct rx_call *call,
                      struct rx_packet *p, int last)
 {
     struct rx_connection *conn = call->conn;
+    afs_uint32 seq = call->tnext++;
     unsigned int i;
     afs_int32 len;             /* len must be a signed type; it can go negative */
 
+    /* No data packets on call 0. Where do these come from? */
+    if (*call->callNumber == 0)
+       *call->callNumber = 1;
+
+    MUTEX_EXIT(&call->lock);
     p->flags &= ~RX_PKTFLAG_ACKED;
     p->header.cid = (conn->cid | call->channel);
     p->header.serviceId = conn->serviceId;
     p->header.securityIndex = conn->securityIndex;
 
-    /* No data packets on call 0. Where do these come from? */
-    if (*call->callNumber == 0)
-       *call->callNumber = 1;
-
     p->header.callNumber = *call->callNumber;
-    p->header.seq = call->tnext++;
+    p->header.seq = seq;
     p->header.epoch = conn->epoch;
     p->header.type = RX_PACKET_TYPE_DATA;
     p->header.flags = 0;
@@ -2758,6 +2769,7 @@ rxi_PrepareSendPacket(struct rx_call *call,
     if (len)
         p->wirevec[i - 1].iov_len += len;
     RXS_PreparePacket(conn->securityObject, call, p);
+    MUTEX_ENTER(&call->lock);
 }
 
 /* Given an interface MTU size, calculate an adjusted MTU size that
index f086894..eb0ba9d 100644 (file)
@@ -95,7 +95,7 @@ static int rxdb_fileID = RXDB_FILE_RX_RDWR;
 #endif /* RX_LOCKS_DB */
 /* rxi_ReadProc -- internal version.
  *
- * LOCKS USED -- called at netpri with rx global lock and call->lock held.
+ * LOCKS USED -- called at netpri
  */
 int
 rxi_ReadProc(struct rx_call *call, char *buf,
@@ -120,13 +120,18 @@ rxi_ReadProc(struct rx_call *call, char *buf,
     do {
        if (call->nLeft == 0) {
            /* Get next packet */
+           MUTEX_ENTER(&call->lock);
            for (;;) {
                if (call->error || (call->mode != RX_MODE_RECEIVING)) {
                    if (call->error) {
+                        call->mode = RX_MODE_ERROR;
+                       MUTEX_EXIT(&call->lock);
                        return 0;
                    }
                    if (call->mode == RX_MODE_SENDING) {
+                        MUTEX_EXIT(&call->lock);
                        rxi_FlushWrite(call);
+                        MUTEX_ENTER(&call->lock);
                        continue;
                    }
                }
@@ -161,7 +166,6 @@ rxi_ReadProc(struct rx_call *call, char *buf,
                            rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
                            MUTEX_EXIT(&conn->conn_data_lock);
                            rxi_FreePacket(rp);
-                           MUTEX_ENTER(&call->lock);
 
                            return 0;
                        }
@@ -235,6 +239,7 @@ rxi_ReadProc(struct rx_call *call, char *buf,
 
                /* Are there ever going to be any more packets? */
                if (call->flags & RX_CALL_RECEIVE_DONE) {
+                   MUTEX_EXIT(&call->lock);
                    return requestCount - nbytes;
                }
                /* Wait for in-sequence packet */
@@ -253,10 +258,12 @@ rxi_ReadProc(struct rx_call *call, char *buf,
                call->startWait = 0;
 #ifdef RX_ENABLE_LOCKS
                if (call->error) {
+                   MUTEX_EXIT(&call->lock);
                    return 0;
                }
 #endif /* RX_ENABLE_LOCKS */
            }
+           MUTEX_EXIT(&call->lock);
        } else
            /* assert(cp); */
            /* MTUXXX  this should be replaced by some error-recovery code before shipping */
@@ -352,9 +359,7 @@ rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
     }
 
     NETPRI;
-    MUTEX_ENTER(&call->lock);
     bytes = rxi_ReadProc(call, buf, nbytes);
-    MUTEX_EXIT(&call->lock);
     USERPRI;
     return bytes;
 }
@@ -401,9 +406,7 @@ rx_ReadProc32(struct rx_call *call, afs_int32 * value)
     }
 
     NETPRI;
-    MUTEX_ENTER(&call->lock);
     bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
-    MUTEX_EXIT(&call->lock);
     USERPRI;
 
     return bytes;
@@ -597,12 +600,14 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
  * except the last packet (new current packet) are moved to the iovq
  * while the application is processing the data.
  *
- * LOCKS USED -- called at netpri with rx global lock and call->lock held.
+ * LOCKS USED -- called at netpri.
  */
 int
 rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
              int nbytes)
 {
+    int bytes;
+
     /* Free any packets from the last call to ReadvProc/WritevProc */
     if (queue_IsNotEmpty(&call->iovq)) {
 #ifdef RXDEBUG_PACKET
@@ -615,9 +620,9 @@ rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
        rxi_FlushWrite(call);
     }
 
-    if (call->error) {
-       return 0;
-    }
+    MUTEX_ENTER(&call->lock);
+    if (call->error)
+        goto error;
 
     /* Get whatever data is currently available in the receive queue.
      * If rxi_FillReadVec sends an ack packet then it is possible
@@ -649,15 +654,20 @@ rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
        call->startWait = 0;
     }
     call->flags &= ~RX_CALL_IOVEC_WAIT;
-#ifdef RX_ENABLE_LOCKS
-    if (call->error) {
-       return 0;
-    }
-#endif /* RX_ENABLE_LOCKS */
+
+    if (call->error)
+        goto error;
 
     call->iov = NULL;
     *nio = call->iovNext;
-    return nbytes - call->iovNBytes;
+    bytes = nbytes - call->iovNBytes;
+    MUTEX_EXIT(&call->lock);
+    return bytes;
+
+  error:
+    MUTEX_EXIT(&call->lock);
+    call->mode = RX_MODE_ERROR;
+    return 0;
 }
 
 int
@@ -668,16 +678,15 @@ rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     SPLVAR;
 
     NETPRI;
-    MUTEX_ENTER(&call->lock);
     bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
-    MUTEX_EXIT(&call->lock);
     USERPRI;
     return bytes;
 }
 
 /* rxi_WriteProc -- internal version.
  *
- * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
+ * LOCKS USED -- called at netpri
+ */
 
 int
 rxi_WriteProc(struct rx_call *call, char *buf,
@@ -720,6 +729,9 @@ rxi_WriteProc(struct rx_call *call, char *buf,
      * anyway. */
     do {
        if (call->nFree == 0) {
+           MUTEX_ENTER(&call->lock);
+            if (call->error)
+                call->mode = RX_MODE_ERROR;
            if (!call->error && cp) {
                 /* Clear the current packet now so that if
                  * we are forced to wait and drop the lock
@@ -791,6 +803,8 @@ rxi_WriteProc(struct rx_call *call, char *buf,
                call->startWait = 0;
 #ifdef RX_ENABLE_LOCKS
                if (call->error) {
+                    call->mode = RX_MODE_ERROR;
+                   MUTEX_EXIT(&call->lock);
                    return 0;
                }
 #endif /* RX_ENABLE_LOCKS */
@@ -811,6 +825,7 @@ rxi_WriteProc(struct rx_call *call, char *buf,
                    cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
            }
            if (call->error) {
+                call->mode = RX_MODE_ERROR;
                if (cp) {
 #ifdef RX_TRACK_PACKETS
                    cp->flags &= ~RX_PKTFLAG_CP;
@@ -818,8 +833,10 @@ rxi_WriteProc(struct rx_call *call, char *buf,
                    rxi_FreePacket(cp);
                    call->currentPacket = NULL;
                }
+               MUTEX_EXIT(&call->lock);
                return 0;
            }
+           MUTEX_EXIT(&call->lock);
        }
 
        if (cp && (int)call->nFree < nbytes) {
@@ -911,9 +928,7 @@ rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
     }
 
     NETPRI;
-    MUTEX_ENTER(&call->lock);
     bytes = rxi_WriteProc(call, buf, nbytes);
-    MUTEX_EXIT(&call->lock);
     USERPRI;
     return bytes;
 }
@@ -957,9 +972,7 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value)
     }
 
     NETPRI;
-    MUTEX_ENTER(&call->lock);
     bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
-    MUTEX_EXIT(&call->lock);
     USERPRI;
     return bytes;
 }
@@ -969,7 +982,8 @@ rx_WriteProc32(struct rx_call *call, afs_int32 * value)
  * Fill in an iovec to point to data in packet buffers. The application
  * calls rxi_WritevProc when the buffers are full.
  *
- * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
+ * LOCKS USED -- called at netpri.
+ */
 
 int
 rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
@@ -1024,7 +1038,9 @@ rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
 
        if (tnFree == 0) {
            /* current packet is full, allocate a new one */
+           MUTEX_ENTER(&call->lock);
            cp = rxi_AllocSendPacket(call, nbytes);
+           MUTEX_EXIT(&call->lock);
            if (cp == NULL) {
                /* out of space, return what we have */
                *nio = nextio;
@@ -1097,9 +1113,7 @@ rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
     SPLVAR;
 
     NETPRI;
-    MUTEX_ENTER(&call->lock);
     bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
-    MUTEX_EXIT(&call->lock);
     USERPRI;
     return bytes;
 }
@@ -1108,8 +1122,8 @@ rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
  *
  * Send buffers allocated in rxi_WritevAlloc.
  *
- * LOCKS USED -- called at netpri with rx global lock and call->lock held. */
-
+ * LOCKS USED -- called at netpri.
+ */
 int
 rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 {
@@ -1127,7 +1141,10 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
     requestCount = nbytes;
     nextio = 0;
 
-    if (call->mode != RX_MODE_SENDING) {
+    MUTEX_ENTER(&call->lock);
+    if (call->error) {
+        call->mode = RX_MODE_ERROR;
+    } else if (call->mode != RX_MODE_SENDING) {
        call->error = RX_PROTOCOL_ERROR;
     }
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
@@ -1146,10 +1163,11 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
             call->flags &= ~RX_CALL_TQ_WAIT;
     }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-    /* cp is no longer valid since we may have given up the lock */
     cp = call->currentPacket;
 
     if (call->error) {
+        call->mode = RX_MODE_ERROR;
+       MUTEX_EXIT(&call->lock);
        if (cp) {
 #ifdef RX_TRACK_PACKETS
             cp->flags &= ~RX_PKTFLAG_CP;
@@ -1159,7 +1177,6 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 #ifdef RXDEBUG_PACKET
             call->iovqc++;
 #endif /* RXDEBUG_PACKET */
-           cp = call->currentPacket = (struct rx_packet *)0;
        }
 #ifdef RXDEBUG_PACKET
         call->iovqc -=
@@ -1196,6 +1213,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
            /* The head of the iovq is now the current packet */
            if (nbytes) {
                if (queue_IsEmpty(&call->iovq)) {
+                    MUTEX_EXIT(&call->lock);
                    call->error = RX_PROTOCOL_ERROR;
 #ifdef RXDEBUG_PACKET
                     tmpqc -=
@@ -1230,6 +1248,7 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
            if (iov[nextio].iov_base != call->curpos
                || iov[nextio].iov_len > (int)call->curlen) {
                call->error = RX_PROTOCOL_ERROR;
+                MUTEX_EXIT(&call->lock);
                if (cp) {
 #ifdef RX_TRACK_PACKETS
                    cp->flags &= ~RX_PKTFLAG_CP;
@@ -1271,6 +1290,10 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
         p->flags |= RX_PKTFLAG_TQ;
     }
 #endif
+
+    if (call->error)
+        call->mode = RX_MODE_ERROR;
+
     queue_SpliceAppend(&call->tq, &tmpq);
 
     if (!(call->flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
@@ -1289,19 +1312,23 @@ rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
 #endif
        call->startWait = 0;
     }
+
     /* cp is no longer valid since we may have given up the lock */
     cp = call->currentPacket;
 
     if (call->error) {
+        call->mode = RX_MODE_ERROR;
+        call->currentPacket = NULL;
+        MUTEX_EXIT(&call->lock);
        if (cp) {
 #ifdef RX_TRACK_PACKETS
            cp->flags &= ~RX_PKTFLAG_CP;
 #endif
            rxi_FreePacket(cp);
-            cp = call->currentPacket = (struct rx_packet *)0;
        }
        return 0;
     }
+    MUTEX_EXIT(&call->lock);
 
     return requestCount - nbytes;
 }
@@ -1313,15 +1340,16 @@ rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
     SPLVAR;
 
     NETPRI;
-    MUTEX_ENTER(&call->lock);
     bytes = rxi_WritevProc(call, iov, nio, nbytes);
-    MUTEX_EXIT(&call->lock);
     USERPRI;
     return bytes;
 }
 
 /* Flush any buffered data to the stream, switch to read mode
- * (clients) or to EOF mode (servers) */
+ * (clients) or to EOF mode (servers)
+ *
+ * LOCKS HELD: called at netpri.
+ */
 void
 rxi_FlushWrite(struct rx_call *call)
 {
@@ -1354,6 +1382,7 @@ rxi_FlushWrite(struct rx_call *call)
        }
 #endif
 
+        MUTEX_ENTER(&call->lock);
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* Wait until TQ_BUSY is reset before adding any
         * packets to the transmit queue
@@ -1372,7 +1401,9 @@ rxi_FlushWrite(struct rx_call *call)
        }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
-        /* cp is no longer valid since we may have given up the lock */
+        if (call->error)
+            call->mode = RX_MODE_ERROR;
+
         cp = call->currentPacket;
 
        if (cp) {
@@ -1411,6 +1442,7 @@ rxi_FlushWrite(struct rx_call *call)
             flags & (RX_CALL_FAST_RECOVER | RX_CALL_FAST_RECOVER_WAIT))) {
            rxi_Start(0, call, 0, 0);
        }
+        MUTEX_EXIT(&call->lock);
     }
 }
 
@@ -1421,8 +1453,6 @@ rx_FlushWrite(struct rx_call *call)
 {
     SPLVAR;
     NETPRI;
-    MUTEX_ENTER(&call->lock);
     rxi_FlushWrite(call);
-    MUTEX_EXIT(&call->lock);
     USERPRI;
 }