ubik: Drop dbase versionLock during I/O and sleeps
[openafs.git] / src / ubik / ubik.c
index b8088a5..4b608c9 100644 (file)
@@ -94,6 +94,30 @@ struct rx_securityClass *ubik_sc[3];
 
 #define        CStampVersion       1   /* meaning set ts->version */
 
+static_inline struct rx_connection *
+Quorum_StartIO(struct ubik_trans *atrans, struct ubik_server *as)
+{
+    struct rx_connection *conn;
+
+    conn = as->disk_rxcid;
+
+#ifdef AFS_PTHREAD_ENV
+    rx_GetConnection(conn);
+    DBRELE(atrans->dbase);
+#endif /* AFS_PTHREAD_ENV */
+
+    return conn;
+}
+
+static_inline void
+Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
+{
+#ifdef AFS_PTHREAD_ENV
+    DBHOLD(atrans->dbase);
+    rx_PutConnection(aconn);
+#endif /* AFS_PTHREAD_ENV */
+}
+
 /*!
  * \brief Perform an operation at a quorum, handling error conditions.
  * \return 0 if all worked and a quorum was contacted successfully
@@ -115,6 +139,7 @@ ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -124,7 +149,14 @@ ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = (*proc)(ts->disk_rxcid, &atrans->tid);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = (*proc)(conn, &atrans->tid);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -153,6 +185,7 @@ ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -162,8 +195,14 @@ ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = DISK_Lock(ts->disk_rxcid, &atrans->tid, file, position, length,
-                          type);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -192,6 +231,7 @@ ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -201,7 +241,14 @@ ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = DISK_Write(ts->disk_rxcid, &atrans->tid, file, position, data);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_Write(conn, &atrans->tid, file, position, data);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -230,6 +277,7 @@ ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -239,7 +287,14 @@ ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = DISK_Truncate(ts->disk_rxcid, &atrans->tid, file, length);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_Truncate(conn, &atrans->tid, file, length);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -268,6 +323,7 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -278,7 +334,12 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
            continue;           /* not up-to-date, don't bother */
        }
 
-       code = DISK_WriteV(ts->disk_rxcid, &atrans->tid, io_vector, io_buffer);
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
 
        if ((code <= -450) && (code > -500)) {
            /* An RPC interface mismatch (as defined in comerr/error_msg.c).
@@ -291,6 +352,8 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
            bulkdata tcbs;
            afs_int32 i, offset;
 
+           conn = Quorum_StartIO(atrans, ts);
+
            for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
                /* Sanity check for going off end of buffer */
                if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
@@ -299,14 +362,18 @@ ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
                }
                tcbs.bulkdata_len = iovec[i].length;
                tcbs.bulkdata_val = &iobuf[offset];
+
                code =
-                   DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
+                   DISK_Write(conn, &atrans->tid, iovec[i].file,
                               iovec[i].position, &tcbs);
                if (code)
                    break;
 
                offset += iovec[i].length;
            }
+
+           Quorum_EndIO(atrans, conn);
+           conn = NULL;
        }
 
        if (code) {             /* failure */
@@ -338,6 +405,7 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
     struct ubik_server *ts;
     afs_int32 code;
     afs_int32 rcode, okcalls;
+    struct rx_connection *conn;
 
     rcode = 0;
     okcalls = 0;
@@ -347,8 +415,14 @@ ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
            continue;           /* not up-to-date, don't bother */
        }
-       code = DISK_SetVersion(ts->disk_rxcid, &atrans->tid, OldVersion,
-                              NewVersion);
+
+       conn = Quorum_StartIO(atrans, ts);
+
+       code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
+
+       Quorum_EndIO(atrans, conn);
+       conn = NULL;
+
        if (code) {             /* failure */
            rcode = code;
            ts->up = 0;         /* mark as down now; beacons will no longer be sent */
@@ -947,15 +1021,23 @@ ubik_EndTrans(struct ubik_trans *transPtr)
        }
        for (ts = ubik_servers; ts; ts = ts->next) {
            if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
+
                /* this guy could have some damaged data, wait for him */
                code = 1;
                tv.tv_sec = 1;  /* try again after a while (ha ha) */
                tv.tv_usec = 0;
+
 #ifdef AFS_PTHREAD_ENV
+               /* we could release the dbase outside of the loop, but we do
+                * it here, in the loop, to avoid an unnecessary RELE/HOLD
+                * if all sites are up */
+               DBRELE(dbase);
                select(0, 0, 0, 0, &tv);
+               DBHOLD(dbase);
 #else
                IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
 #endif
+
                break;
            }
        }