ubik: Rearrange some initialization code
[openafs.git] / src / ubik / ubik.c
index bd25f48..c91ce7d 100644 (file)
 
 #include <lwp.h>   /* temporary hack by klm */
 
-#define ERROR_EXIT(code) {error=(code); goto error_exit;}
+#define ERROR_EXIT(code) do { \
+    error = (code); \
+    goto error_exit; \
+} while (0)
 
 /*!
  * \file
@@ -119,6 +122,68 @@ Quorum_EndIO(struct ubik_trans *atrans, struct rx_connection *aconn)
 #endif /* AFS_PTHREAD_ENV */
 }
 
+
+/*
+ * Iterate over all servers.  Callers pass in *ts which is used to track
+ * the current server.
+ * - Returns 1 if there are no more servers
+ * - Returns 0 with conn set to the connection for the current server if
+ *   it's up and current
+ */
+static int
+ContactQuorum_iterate(struct ubik_trans *atrans, int aflags, struct ubik_server **ts,
+                        struct rx_connection **conn, afs_int32 *rcode,
+                        afs_int32 *okcalls, afs_int32 code)
+{
+    if (!*ts) {
+       /* Initial call - start iterating over servers */
+       *ts = ubik_servers;
+       *conn = NULL;
+       *rcode = 0;
+       *okcalls = 0;
+    } else {
+       if (*conn) {
+           Quorum_EndIO(atrans, *conn);
+           *conn = NULL;
+           if (code) {         /* failure */
+               *rcode = code;
+               (*ts)->up = 0;          /* mark as down now; beacons will no longer be sent */
+               (*ts)->beaconSinceDown = 0;
+               (*ts)->currentDB = 0;
+               urecovery_LostServer(*ts);      /* tell recovery to try to resend dbase later */
+           } else {            /* success */
+               if (!(*ts)->isClone)
+                   (*okcalls)++;       /* count up how many worked */
+               if (aflags & CStampVersion) {
+                   (*ts)->version = atrans->dbase->version;
+               }
+           }
+       }
+       *ts = (*ts)->next;
+    }
+    if (!(*ts))
+       return 1;
+    if (!(*ts)->up || !(*ts)->currentDB) {
+       (*ts)->currentDB = 0;   /* db is no longer current; we just missed an update */
+       return 0;               /* not up-to-date, don't bother.  NULL conn will tell caller not to use */
+    }
+    *conn = Quorum_StartIO(atrans, *ts);
+    return 0;
+}
+
+static int
+ContactQuorum_rcode(int okcalls, afs_int32 rcode)
+{
+    /*
+     * return 0 if we successfully contacted a quorum, otherwise return error code.
+     * We don't have to contact ourselves (that was done locally)
+     */
+    if (okcalls + 1 >= ubik_quorum)
+       return 0;
+    else
+       return rcode;
+}
+
 /*!
  * \brief Perform an operation at a quorum, handling error conditions.
  * \return 0 if all worked and a quorum was contacted successfully
@@ -137,314 +202,144 @@ afs_int32
 ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
                          struct ubik_trans *atrans, int aflags)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
+    int done;
 
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = (*proc)(conn, &atrans->tid);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = (*proc)(conn, &atrans->tid);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
 ContactQuorum_DISK_Lock(struct ubik_trans *atrans, int aflags,afs_int32 file,
                        afs_int32 position, afs_int32 length, afs_int32 type)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
+    int done;
 
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_Lock(conn, &atrans->tid, file, position, length, type);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
 ContactQuorum_DISK_Write(struct ubik_trans *atrans, int aflags,
                         afs_int32 file, afs_int32 position, bulkdata *data)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
+    int done;
 
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_Write(conn, &atrans->tid, file, position, data);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_Write(conn, &atrans->tid, file, position, data);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
 ContactQuorum_DISK_Truncate(struct ubik_trans *atrans, int aflags,
                            afs_int32 file, afs_int32 length)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
+    int done;
 
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_Truncate(conn, &atrans->tid, file, length);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_Truncate(conn, &atrans->tid, file, length);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
 ContactQuorum_DISK_WriteV(struct ubik_trans *atrans, int aflags,
                          iovec_wrt * io_vector, iovec_buf *io_buffer)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
-
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if ((code <= -450) && (code > -500)) {
-           /* An RPC interface mismatch (as defined in comerr/error_msg.c).
-            * Un-bulk the entries and do individual DISK_Write calls
-            * instead of DISK_WriteV.
-            */
-           struct ubik_iovec *iovec =
-               (struct ubik_iovec *)io_vector->iovec_wrt_val;
-           char *iobuf = (char *)io_buffer->iovec_buf_val;
-           bulkdata tcbs;
-           afs_int32 i, offset;
-
-           conn = Quorum_StartIO(atrans, ts);
-
-           for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
-               /* Sanity check for going off end of buffer */
-               if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
-                   code = UINTERNAL;
-                   break;
+    int done;
+
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn) {
+           code = DISK_WriteV(conn, &atrans->tid, io_vector, io_buffer);
+           if ((code <= -450) && (code > -500)) {
+               /* An RPC interface mismatch (as defined in comerr/error_msg.c).
+                * Un-bulk the entries and do individual DISK_Write calls
+                * instead of DISK_WriteV.
+                */
+               struct ubik_iovec *iovec =
+                       (struct ubik_iovec *)io_vector->iovec_wrt_val;
+               char *iobuf = (char *)io_buffer->iovec_buf_val;
+               bulkdata tcbs;
+               afs_int32 i, offset;
+
+               for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
+                   /* Sanity check for going off end of buffer */
+                   if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
+                       code = UINTERNAL;
+                       break;
+                   }
+                   tcbs.bulkdata_len = iovec[i].length;
+                   tcbs.bulkdata_val = &iobuf[offset];
+                   code = DISK_Write(conn, &atrans->tid, iovec[i].file,
+                          iovec[i].position, &tcbs);
+                   if (code)
+                       break;
+                   offset += iovec[i].length;
                }
-               tcbs.bulkdata_len = iovec[i].length;
-               tcbs.bulkdata_val = &iobuf[offset];
-
-               code =
-                   DISK_Write(conn, &atrans->tid, iovec[i].file,
-                              iovec[i].position, &tcbs);
-               if (code)
-                   break;
-
-               offset += iovec[i].length;
-           }
-
-           Quorum_EndIO(atrans, conn);
-           conn = NULL;
-       }
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
            }
        }
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 afs_int32
 ContactQuorum_DISK_SetVersion(struct ubik_trans *atrans, int aflags,
                              ubik_version *OldVersion,
                              ubik_version *NewVersion)
 {
-    struct ubik_server *ts;
-    afs_int32 code;
-    afs_int32 rcode, okcalls;
+    struct ubik_server *ts = NULL;
+    afs_int32 code = 0, rcode, okcalls;
     struct rx_connection *conn;
+    int done;
 
-    rcode = 0;
-    okcalls = 0;
-    for (ts = ubik_servers; ts; ts = ts->next) {
-       /* for each server */
-       if (!ts->up || !ts->currentDB) {
-           ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;           /* not up-to-date, don't bother */
-       }
-
-       conn = Quorum_StartIO(atrans, ts);
-
-       code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
-
-       Quorum_EndIO(atrans, conn);
-       conn = NULL;
-
-       if (code) {             /* failure */
-           rcode = code;
-           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
-           ts->currentDB = 0;
-           ts->beaconSinceDown = 0;
-           urecovery_LostServer(ts);   /* tell recovery to try to resend dbase later */
-       } else {                /* success */
-           if (!ts->isClone)
-               okcalls++;      /* count up how many worked */
-           if (aflags & CStampVersion) {
-               ts->version = atrans->dbase->version;
-           }
-       }
+    done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
+    while (!done) {
+       if (conn)
+           code = DISK_SetVersion(conn, &atrans->tid, OldVersion, NewVersion);
+       done = ContactQuorum_iterate(atrans, aflags, &ts, &conn, &rcode, &okcalls, code);
     }
-    /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls + 1 >= ubik_quorum)
-       return 0;
-    else
-       return rcode;
+    return ContactQuorum_rcode(okcalls, rcode);
 }
 
+
 /*!
  * \brief This routine initializes the ubik system for a set of servers.
  * \return 0 for success, or an error code on failure.
@@ -525,6 +420,22 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
     if (code < 0)
        return code;
 
+    udisk_Init(ubik_nBuffers);
+    ulock_Init();
+
+    code = uvote_Init();
+    if (code)
+       return code;
+    code = urecovery_Initialize(tdb);
+    if (code)
+       return code;
+    if (info)
+       code = ubeacon_InitServerListByInfo(myHost, info, clones);
+    else
+       code = ubeacon_InitServerList(myHost, serverList);
+    if (code)
+       return code;
+
     ubik_callPortal = myPort;
     /* try to get an additional security object */
     ubik_sc[0] = rxnull_NewServerSecurityObject();
@@ -588,20 +499,6 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
               NULL, "rx_ServerProc", &junk);
 #endif
 
-    /* do basic initialization */
-    code = uvote_Init();
-    if (code)
-       return code;
-    code = urecovery_Initialize(tdb);
-    if (code)
-       return code;
-    if (info)
-       code = ubeacon_InitServerListByInfo(myHost, info, clones);
-    else
-       code = ubeacon_InitServerList(myHost, serverList);
-    if (code)
-       return code;
-
     /* now start up async processes */
 #ifdef AFS_PTHREAD_ENV
 /* do assert stuff */
@@ -691,9 +588,6 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
     struct ubik_trans *jt;
     struct ubik_trans *tt;
     afs_int32 code;
-#if defined(UBIK_PAUSE)
-    int count;
-#endif /* UBIK_PAUSE */
 
     if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
        /* it's not safe to use ubik_BeginTransReadAnyWrite without a
@@ -708,37 +602,6 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
     if ((transMode != UBIK_READTRANS) && readAny)
        return UBADTYPE;
     DBHOLD(dbase);
-#if defined(UBIK_PAUSE)
-    /* if we're polling the slave sites, wait until the returns
-     *  are all in.  Otherwise, the urecovery_CheckTid call may
-     *  glitch us.
-     */
-    if (transMode == UBIK_WRITETRANS)
-       for (count = 75; dbase->flags & DBVOTING; --count) {
-           DBRELE(dbase);
-#ifdef GRAND_PAUSE_DEBUGGING
-           if (count == 75)
-               fprintf(stderr,
-                       "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
-                       time(0), ntohs(ubik_callPortal));
-           else
-#endif
-           if (count <= 0) {
-#if 1
-               fprintf(stderr,
-                       "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
-                       time(0), ntohs(ubik_callPortal));
-#endif
-               return UNOQUORUM;       /* a white lie */
-           }
-#ifdef AFS_PTHREAD_ENV
-           sleep(2);
-#else
-           IOMGR_Sleep(2);
-#endif
-           DBHOLD(dbase);
-       }
-#endif /* UBIK_PAUSE */
     if (urecovery_AllBetter(dbase, readAny) == 0) {
        DBRELE(dbase);
        return UNOQUORUM;