ubik: Drop dbase versionLock during I/O and sleeps
authorAndrew Deason <adeason@sinenomine.net>
Wed, 9 Jun 2010 17:45:57 +0000 (12:45 -0500)
committerDerrick Brashear <shadow@dementia.org>
Mon, 30 Aug 2010 00:32:31 +0000 (17:32 -0700)
Currently we hold versionLock during all ubik network I/O and while we
are sleeping for whatever reason. For pthreaded ubik, to allow other
things to happen during those times, drop the lock and reacquire when
we hit the net or sleep.

Change-Id: I414b3adfadc0bb506b98e8677ec5dfbb269e0129
Reviewed-on: http://gerrit.openafs.org/2107
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Derrick Brashear <shadow@dementia.org>

src/ubik/ubik.c

index b8088a5..4b608c9 100644 (file)
@@ -94,6 +94,30 @@ struct rx_securityClass *ubik_sc[3];
 
 #define        CStampVersion       1   /* meaning set ts->version */
 
+static_inline struct rx_connection *
+Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
+{
+    struct rx_connection *conn;
+
+    conn = as->disk_rxcid;
+
+#ifdef AFS_PTHREAD_ENV
+    rx_GetConnection(conn);
+    DBRELE(atrans->dbase);
+#endif /* AFS_PTHREAD_ENV */
+
+    return conn;
+}
+
+static_inline void
+Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
+{
+#ifdef AFS_PTHREAD_ENV
+    DBHOLD(atrans->dbase);
+    rx_PutConnection(aconn);
+#endif /* AFS_PTHREAD_ENV */
+}
+
 /*!
  * \brief Perform an operation at a quorum, handling error conditions.
  * \return 0 if all worked and a quorum was contacted successfully
@@ -115,6 +139,7 @@ ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -124,7 +149,14 @@ ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = (*proc)(ts->disk_rxcid, &atrans->tid);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = (*proc)(conn, &atrans->tid);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -153,6 +185,7 @@ ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -162,8 +195,14 @@ ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = DISK_Lock(ts->disk_rxcid, &atrans->tid, file, position, length,
-                          type);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -192,6 +231,7 @@ ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -201,7 +241,14 @@ ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = DISK_Write(ts->disk_rxcid, &atrans->tid, file, position, data);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_Write(conn, &atrans->tid, file, position, data);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -230,6 +277,7 @@ ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -239,7 +287,14 @@ ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = DISK_Truncate(ts->disk_rxcid, &atrans->tid, file, length);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_Truncate(conn, &atrans->tid, file, length);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -268,6 +323,7 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -278,7 +334,12 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
            continue;           /* not up-to-date, don't bother */
        }
 
-       code = DISK_WriteV(ts->disk_rxcid, &atrans->tid, io_vector, io_buffer);
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
 
        if ((code <= -450) && (code > -500)) {
            /* An RPC interface mismatch (as defined in comerr/error_msg.c).
@@ -291,6 +352,8 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
            bulkdata tcbs;
            afs_int32 i, offset;
 
+           conn = Quorum_StartIO(atrans, ts);
+
            for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
                /* Sanity check for going off end of buffer */
                if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
@@ -299,14 +362,18 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
                }
                tcbs.bulkdata_len = iovec[i].length;
                tcbs.bulkdata_val = &iobuf[offset];
+
                code =
-                   DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
+                   DISK_Write(conn, &atrans->tid, iovec[i].file,
                               iovec[i].position, &tcbs);
                if (code)
                    break;
 
                offset += iovec[i].length;
            }
+
+           Quorum_EndIO(atrans, conn);
+           conn = NULL;
        }
 
        if (code) {             /* failure */
@@ -338,6 +405,7 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -347,8 +415,14 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = DISK_SetVersion(ts->disk_rxcid, &atrans->tid, OldVersion,
-                              NewVersion);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -947,15 +1021,23 @@ ubik_EndTrans(struct ubik_trans *transPtr)
        }
        for (ts = ubik_servers; ts; ts = ts->next) {
            if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
+
                /* this guy could have some damaged data, wait for him */
                code = 1;
                tv.tv_sec = 1;  /* try again after a while (ha ha) */
                tv.tv_usec = 0;
+
 #ifdef AFS_PTHREAD_ENV
+               /* we could release the dbase outside of the loop, but we do
+                * it here, in the loop, to avoid an unnecessary RELE/HOLD
+                * if all sites are up */
+               DBRELE(dbase);
                select(0, 0, 0, 0, &tv);
+               DBHOLD(dbase);
 #else
                IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
 #endif
+
                break;
            }
        }