DEVEL15-kill-ubik-pthread-env-20080718
[openafs.git] / src / ubik / ubik.c
index a413ba3..7d017b1 100644 (file)
@@ -10,7 +10,8 @@
 #include <afsconfig.h>
 #include <afs/param.h>
 
-RCSID("$Header$");
+RCSID
+    ("$Header$");
 
 #include <sys/types.h>
 #ifdef AFS_NT40_ENV
@@ -22,6 +23,7 @@ RCSID("$Header$");
 #endif
 #include <time.h>
 #include <lock.h>
+#include <string.h>
 #include <rx/xdr.h>
 #include <rx/rx.h>
 #include <afs/cellconfig.h>
@@ -30,6 +32,8 @@ RCSID("$Header$");
 #include "ubik.h"
 #include "ubik_int.h"
 
+#include <lwp.h>   /* temporary hack by klm */
+
 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
 
 /*  This system is organized in a hierarchical set of related modules.  Modules
@@ -71,13 +75,13 @@ RCSID("$Header$");
 
 
 /* some globals */
-afs_int32 ubik_quorum=0;
-struct ubik_dbase *ubik_dbase=0;
+afs_int32 ubik_quorum = 0;
+struct ubik_dbase *ubik_dbase = 0;
 struct ubik_stats ubik_stats;
 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
 afs_int32 ubik_epochTime = 0;
 afs_int32 urecovery_state = 0;
-int (*ubik_SRXSecurityProc)();
+int (*ubik_SRXSecurityProc) ();
 char *ubik_SRXSecurityRock;
 struct ubik_server *ubik_servers;
 short ubik_callPortal;
@@ -97,69 +101,79 @@ struct rx_securityClass *ubik_sc[3];
     out old data because it is sent the sync count along with the beacon message that
     marks it as *really* up (beaconSinceDown).
 */
-#define        CStampVersion       1       /* meaning set ts->version */
-afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5)
-    int (*aproc)();
-    int aflags;
-    register struct ubik_trans *atrans;
-    long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5; {
+#define        CStampVersion       1   /* meaning set ts->version */
+afs_int32
+ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4,
+             aparm5)
+     int (*aproc) ();
+     int aflags;
+     register struct ubik_trans *atrans;
+     long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5;
+{
     register struct ubik_server *ts;
     register afs_int32 code;
     afs_int32 rcode, okcalls;
 
     rcode = 0;
     okcalls = 0;
-    for(ts = ubik_servers; ts; ts=ts->next) {
+    for (ts = ubik_servers; ts; ts = ts->next) {
        /* for each server */
        if (!ts->up || !ts->currentDB) {
            ts->currentDB = 0;  /* db is no longer current; we just missed an update */
-           continue;    /* not up-to-date, don't bother */
+           continue;           /* not up-to-date, don't bother */
        }
-       code = (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5);
-       if ( (aproc == DISK_WriteV) && (code <= -450) && (code > -500) ) {
-          /* An RPC interface mismatch (as defined in comerr/error_msg.c).
-           * Un-bulk the entries and do individual DISK_Write calls
-           * instead of DISK_WriteV.
-           */
-          iovec_wrt         *iovec_infoP = (iovec_wrt *)aparm0;
-          iovec_buf         *iovec_dataP = (iovec_buf *)aparm1;
-          struct ubik_iovec *iovec       = (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
-          char              *iobuf       = (char *)iovec_dataP->iovec_buf_val;
-          bulkdata          tcbs;
-          afs_int32             i, offset;
-
-          for (i=0, offset=0; i<iovec_infoP->iovec_wrt_len; i++) {
-             /* Sanity check for going off end of buffer */
-             if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
-                code = UINTERNAL;
-                break;
-             }
-             tcbs.bulkdata_len = iovec[i].length;
-             tcbs.bulkdata_val = &iobuf[offset];
-             code = DISK_Write(ts->disk_rxcid, &atrans->tid,
-                               iovec[i].file, iovec[i].position, &tcbs);
-             if (code) break;
-
-             offset += iovec[i].length;
-          }
+       code =
+           (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2,
+                     aparm3, aparm4, aparm5);
+       if ((aproc == DISK_WriteV) && (code <= -450) && (code > -500)) {
+           /* An RPC interface mismatch (as defined in comerr/error_msg.c).
+            * Un-bulk the entries and do individual DISK_Write calls
+            * instead of DISK_WriteV.
+            */
+           iovec_wrt *iovec_infoP = (iovec_wrt *) aparm0;
+           iovec_buf *iovec_dataP = (iovec_buf *) aparm1;
+           struct ubik_iovec *iovec =
+               (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
+           char *iobuf = (char *)iovec_dataP->iovec_buf_val;
+           bulkdata tcbs;
+           afs_int32 i, offset;
+
+           for (i = 0, offset = 0; i < iovec_infoP->iovec_wrt_len; i++) {
+               /* Sanity check for going off end of buffer */
+               if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
+                   code = UINTERNAL;
+                   break;
+               }
+               tcbs.bulkdata_len = iovec[i].length;
+               tcbs.bulkdata_val = &iobuf[offset];
+               code =
+                   DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
+                              iovec[i].position, &tcbs);
+               if (code)
+                   break;
+
+               offset += iovec[i].length;
+           }
        }
-       if (code) {         /* failure */
+       if (code) {             /* failure */
            rcode = code;
-           ts->up = 0;     /* mark as down now; beacons will no longer be sent */
+           ts->up = 0;         /* mark as down now; beacons will no longer be sent */
            ts->currentDB = 0;
            ts->beaconSinceDown = 0;
-           urecovery_LostServer(); /* tell recovery to try to resend dbase later */
-       } else {            /* success */
-            if (!ts->isClone)
-               okcalls++;          /* count up how many worked */
+           urecovery_LostServer();     /* tell recovery to try to resend dbase later */
+       } else {                /* success */
+           if (!ts->isClone)
+               okcalls++;      /* count up how many worked */
            if (aflags & CStampVersion) {
                ts->version = atrans->dbase->version;
            }
        }
     }
     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
-    if (okcalls+1 >= ubik_quorum) return 0;
-    else return rcode;
+    if (okcalls + 1 >= ubik_quorum)
+       return 0;
+    else
+       return rcode;
 }
 
 /* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
@@ -167,61 +181,38 @@ afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, a
     Note that the host named by myHost should not also be listed in serverList.
 */
 
-int ubik_ServerInitByInfo(myHost, myPort, info, clones, pathName, dbase)
-    struct afsconf_cell *info;  /* in */
-    char clones[];
-    afs_int32 myHost;
-    short myPort;
-    char *pathName;    /* in */
-    struct ubik_dbase **dbase; /* out */ 
-{
-     afs_int32 code;
-    
-     code = ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName, dbase);
-     return code;
-}
-
-int ubik_ServerInit(myHost, myPort, serverList, pathName, dbase)
-    afs_int32 serverList[];    /* in */
-    afs_int32 myHost;
-    short myPort;
-    char *pathName;    /* in */
-    struct ubik_dbase **dbase; /* out */ 
-{
-     afs_int32 code;
-    
-     code = ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
-                       serverList, pathName, dbase);
-     return code;
-}
-
-int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, dbase)
-    afs_int32 myHost;
-    short myPort;
-    struct afsconf_cell *info;  /* in */
-    char clones[];
-    afs_int32 serverList[];    /* in */
-    char *pathName;    /* in */
-    struct ubik_dbase **dbase; /* out */ 
+int
+ubik_ServerInitCommon(afs_int32 myHost, short myPort,
+                     struct afsconf_cell *info, char clones[],
+                     afs_int32 serverList[], char *pathName,
+                     struct ubik_dbase **dbase)
 {
     register struct ubik_dbase *tdb;
     register afs_int32 code;
+#ifdef AFS_PTHREAD_ENV
+    pthread_t rxServerThread;        /* pthread variables */
+    pthread_t ubeacon_InteractThread;
+    pthread_t urecovery_InteractThread;
+    pthread_attr_t rxServer_tattr;
+    pthread_attr_t ubeacon_Interact_tattr;
+    pthread_attr_t urecovery_Interact_tattr;
+#else
     PROCESS junk;
+#endif
+
     afs_int32 secIndex;
     struct rx_securityClass *secClass;
 
     struct rx_service *tservice;
-    extern struct rx_securityClass *rxnull_NewServerSecurityObject();
     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
-    extern void rx_ServerProc();
     extern int rx_stackSize;
 
     initialize_U_error_table();
 
-    tdb = (struct ubik_dbase *) malloc(sizeof(struct ubik_dbase));
-    tdb->pathName = (char *) malloc(strlen(pathName)+1);
+    tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
+    tdb->pathName = (char *)malloc(strlen(pathName) + 1);
     strcpy(tdb->pathName, pathName);
-    tdb->activeTrans = (struct ubik_trans *) 0;
+    tdb->activeTrans = (struct ubik_trans *)0;
     memset(&tdb->version, 0, sizeof(struct ubik_version));
     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
     Lock_Init(&tdb->versionLock);
@@ -229,32 +220,55 @@ int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, db
     tdb->read = uphys_read;
     tdb->write = uphys_write;
     tdb->truncate = uphys_truncate;
-    tdb->open =        0;  /* this function isn't used any more */
+    tdb->open = uphys_invalidate;      /* this function isn't used any more */
     tdb->sync = uphys_sync;
     tdb->stat = uphys_stat;
     tdb->getlabel = uphys_getlabel;
     tdb->setlabel = uphys_setlabel;
     tdb->getnfiles = uphys_getnfiles;
-    tdb->readers=0;
-    tdb->tidCounter=tdb->writeTidCounter=0;
+    tdb->readers = 0;
+    tdb->tidCounter = tdb->writeTidCounter = 0;
     *dbase = tdb;
-    ubik_dbase = tdb;  /* for now, only one db per server; can fix later when we have names for the other dbases */
+    ubik_dbase = tdb;          /* for now, only one db per server; can fix later when we have names for the other dbases */
 
     /* initialize RX */
+
+    /* the following call is idempotent so when/if it got called earlier,
+     * by whatever called us, it doesn't really matter -- klm */
+    code = rx_Init(myPort);
+    if (code < 0)
+       return code;
+
     ubik_callPortal = myPort;
     /* try to get an additional security object */
     ubik_sc[0] = rxnull_NewServerSecurityObject();
     ubik_sc[1] = 0;
     ubik_sc[2] = 0;
     if (ubik_SRXSecurityProc) {
-       code = (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex);
+       code =
+           (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
+                                    &secIndex);
        if (code == 0) {
            ubik_sc[secIndex] = secClass;
        }
     }
+    /* for backwards compat this should keep working as it does now 
+       and not host bind */
+#if 0
+    /* This really needs to be up above, where I have put it.  It works
+     * here when we're non-pthreaded, but the code above, when using
+     * pthreads may (and almost certainly does) end up calling on a
+     * pthread resource which gets initialized by rx_Init.  The end
+     * result is that an assert fails and the program dies. -- klm
+     */
     code = rx_Init(myPort);
-    if (code < 0) return code;
-    tservice = rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3, VOTE_ExecuteRequest);
+    if (code < 0)
+       return code;
+#endif
+
+    tservice =
+       rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
+                     VOTE_ExecuteRequest);
     if (tservice == (struct rx_service *)0) {
        ubik_dprint("Could not create VOTE rx service!\n");
        return -1;
@@ -262,7 +276,9 @@ int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, db
     rx_SetMinProcs(tservice, 2);
     rx_SetMaxProcs(tservice, 3);
 
-    tservice = rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3, DISK_ExecuteRequest);
+    tservice =
+       rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
+                     DISK_ExecuteRequest);
     if (tservice == (struct rx_service *)0) {
        ubik_dprint("Could not create DISK rx service!\n");
        return -1;
@@ -274,26 +290,92 @@ int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, db
      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
      * the "steplock" problem in ubik initialization. Defect 11037.
      */
+#ifdef AFS_PTHREAD_ENV
+/* do assert stuff */
+    assert(pthread_attr_init(&rxServer_tattr) == 0);
+    assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
+/*    assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
+
+    assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
+#else
     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
-                     0, "rx_ServerProc", &junk);
+              NULL, "rx_ServerProc", &junk);
+#endif
 
     /* do basic initialization */
     code = uvote_Init();
-    if (code) return code;
+    if (code)
+       return code;
     code = urecovery_Initialize(tdb);
-    if (code) return code;
+    if (code)
+       return code;
     if (info)
-        code = ubeacon_InitServerListByInfo(myHost, info, clones);
-    else 
-        code = ubeacon_InitServerList(myHost, serverList);
-    if (code) return code;
+       code = ubeacon_InitServerListByInfo(myHost, info, clones);
+    else
+       code = ubeacon_InitServerList(myHost, serverList);
+    if (code)
+       return code;
 
     /* now start up async processes */
-    code = LWP_CreateProcess(ubeacon_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
-                            0, "beacon", &junk);
-    if (code) return code;
-    code = LWP_CreateProcess(urecovery_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
-                            0, "recovery", &junk);
+#ifdef AFS_PTHREAD_ENV
+/* do assert stuff */
+    assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
+    assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
+/*    assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
+    /*  need another attr set here for priority???  - klm */
+
+    assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
+           (void *)ubeacon_Interact, NULL) == 0);
+#else
+    code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
+                            LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
+                            &junk);
+    if (code)
+       return code;
+#endif
+
+#ifdef AFS_PTHREAD_ENV
+/* do assert stuff */
+    assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
+    assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
+/*    assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
+    /*  need another attr set here for priority???  - klm */
+
+    assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
+           (void *)urecovery_Interact, NULL) == 0);
+
+    return 0;  /* is this correct?  - klm */
+#else  
+    code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
+                            LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
+                            &junk);
+    return code;
+#endif
+
+}
+
+int
+ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
+                     struct afsconf_cell *info, char clones[],
+                     char *pathName, struct ubik_dbase **dbase)
+{
+    afs_int32 code;
+
+    code =
+       ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
+                             dbase);
+    return code;
+}
+
+int
+ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
+               char *pathName, struct ubik_dbase **dbase)
+{
+    afs_int32 code;
+
+    code =
+       ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
+                             serverList, pathName, dbase);
     return code;
 }
 
@@ -308,32 +390,72 @@ int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, db
     We can only begin transaction when we have an up-to-date database.
 */
 
-static int BeginTrans(dbase, transMode, transPtr, readAny)
-    register struct ubik_dbase *dbase; /* in */
-    int readAny;
-    afs_int32 transMode; /* in */
-    struct ubik_trans **transPtr;      /* out */ {
+static int
+BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+          struct ubik_trans **transPtr, int readAny)
+{
     struct ubik_trans *jt;
     register struct ubik_trans *tt;
     register afs_int32 code;
+#if defined(UBIK_PAUSE)
+    int count;
+#endif /* UBIK_PAUSE */
 
-    if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
+    if ((transMode != UBIK_READTRANS) && readAny)
+       return UBADTYPE;
     DBHOLD(dbase);
-    if (urecovery_AllBetter(dbase, readAny)==0) {
+#if defined(UBIK_PAUSE)
+    /* if we're polling the slave sites, wait until the returns
+     *  are all in.  Otherwise, the urecovery_CheckTid call may
+     *  glitch us. 
+     */
+    if (transMode == UBIK_WRITETRANS)
+       for (count = 75; dbase->flags & DBVOTING; --count) {
+           DBRELE(dbase);
+#ifdef GRAND_PAUSE_DEBUGGING
+           if (count == 75)
+               fprintf(stderr,
+                       "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
+                       time(0), ntohs(ubik_callPortal));
+           else
+#endif
+           if (count <= 0) {
+#if 1
+               fprintf(stderr,
+                       "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
+                       time(0), ntohs(ubik_callPortal));
+#endif
+               return UNOQUORUM;       /* a white lie */
+           }
+#ifdef AFS_PTHREAD_ENV
+           sleep(2);
+#else
+           IOMGR_Sleep(2);
+#endif
+           DBHOLD(dbase);
+       }
+#endif /* UBIK_PAUSE */
+    if (urecovery_AllBetter(dbase, readAny) == 0) {
        DBRELE(dbase);
        return UNOQUORUM;
     }
     /* otherwise we have a quorum, use it */
 
     /* make sure that at most one write transaction occurs at any one time.  This
-       has nothing to do with transaction locking; that's enforced by the lock package.  However,
-       we can't even handle two non-conflicting writes, since our log and recovery modules
-       don't know how to restore one without possibly picking up some data from the other. */
+     * has nothing to do with transaction locking; that's enforced by the lock package.  However,
+     * we can't even handle two non-conflicting writes, since our log and recovery modules
+     * don't know how to restore one without possibly picking up some data from the other. */
     if (transMode == UBIK_WRITETRANS) {
        /* if we're writing already, wait */
-       while(dbase->flags & DBWRITING) {
+       while (dbase->flags & DBWRITING) {
            DBRELE(dbase);
+#ifdef AFS_PTHREAD_ENV
+           assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
+           assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
+           assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
+#else
            LWP_WaitProcess(&dbase->flags);
+#endif
            DBHOLD(dbase);
        }
        if (!ubeacon_AmSyncSite()) {
@@ -344,27 +466,32 @@ static int BeginTrans(dbase, transMode, transPtr, readAny)
 
     /* create the transaction */
     code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
-    tt = jt;       /* move to a register */
+    tt = jt;                   /* move to a register */
     if (code || tt == (struct ubik_trans *)NULL) {
        DBRELE(dbase);
        return code;
     }
-    if (readAny) tt->flags |= TRREADANY;
+    if (readAny)
+       tt->flags |= TRREADANY;
     /* label trans and dbase with new tid */
     tt->tid.epoch = ubik_epochTime;
     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
     tt->tid.counter = (dbase->tidCounter += 2);
 
     if (transMode == UBIK_WRITETRANS) {
-      /* for a write trans, we have to keep track of the write tid counter too */
-      dbase->writeTidCounter += 2;
+       /* for a write trans, we have to keep track of the write tid counter too */
+#if defined(UBIK_PAUSE)
+       dbase->writeTidCounter = tt->tid.counter;
+#else
+       dbase->writeTidCounter += 2;
+#endif /* UBIK_PAUSE */
 
        /* next try to start transaction on appropriate number of machines */
        code = ContactQuorum(DISK_Begin, tt, 0);
        if (code) {
            /* we must abort the operation */
            udisk_abort(tt);
-           ContactQuorum(DISK_Abort, tt, 0);    /* force aborts to the others */
+           ContactQuorum(DISK_Abort, tt, 0);   /* force aborts to the others */
            udisk_end(tt);
            DBRELE(dbase);
            return code;
@@ -376,27 +503,28 @@ static int BeginTrans(dbase, transMode, transPtr, readAny)
     return 0;
 }
 
-int ubik_BeginTrans(dbase, transMode, transPtr)
-    register struct ubik_dbase *dbase; /* in */
-    afs_int32 transMode; /* in */
-    struct ubik_trans **transPtr;      /* out */ {
-       return BeginTrans(dbase, transMode, transPtr, 0);
+int
+ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
+               struct ubik_trans **transPtr)
+{
+    return BeginTrans(dbase, transMode, transPtr, 0);
+}
+
+int
+ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
+                      struct ubik_trans **transPtr)
+{
+    return BeginTrans(dbase, transMode, transPtr, 1);
 }
 
-int ubik_BeginTransReadAny(dbase, transMode, transPtr)
-    register struct ubik_dbase *dbase; /* in */
-    afs_int32 transMode; /* in */
-    struct ubik_trans **transPtr;      /* out */ {
-       return BeginTrans(dbase, transMode, transPtr, 1);
-} 
 /* this routine ends a read or write transaction by aborting it */
-int ubik_AbortTrans(transPtr)
-    register struct ubik_trans *transPtr; /* in */ {
+int
+ubik_AbortTrans(register struct ubik_trans *transPtr)
+{
     register afs_int32 code;
     afs_int32 code2;
     register struct ubik_dbase *dbase;
-    
+
     dbase = transPtr->dbase;
     DBHOLD(dbase);
     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
@@ -422,31 +550,32 @@ int ubik_AbortTrans(transPtr)
        DBRELE(dbase);
        return UNOTSYNC;
     }
-    
+
     /* now it is safe to try remote abort */
     code = ContactQuorum(DISK_Abort, transPtr, 0);
     code2 = udisk_abort(transPtr);
     udisk_end(transPtr);
     DBRELE(dbase);
-    return (code? code : code2);
+    return (code ? code : code2);
 }
 
 /* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
-int ubik_EndTrans(transPtr)
-    register struct ubik_trans *transPtr; /* in */ {
+int
+ubik_EndTrans(register struct ubik_trans *transPtr)
+{
     register afs_int32 code;
     struct timeval tv;
     afs_int32 realStart;
     register struct ubik_server *ts;
     afs_int32 now;
     register struct ubik_dbase *dbase;
-    
+
     if (transPtr->type == UBIK_WRITETRANS) {
-       code = ubik_Flush(transPtr);
-       if (code) {
-         ubik_AbortTrans(transPtr);
-         return(code);
-       }
+       code = ubik_Flush(transPtr);
+       if (code) {
+           ubik_AbortTrans(transPtr);
+           return (code);
+       }
     }
 
     dbase = transPtr->dbase;
@@ -461,87 +590,95 @@ int ubik_EndTrans(transPtr)
        return UNOQUORUM;
     }
 
-    if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
+    if (transPtr->type == UBIK_READTRANS) {    /* reads are easy */
        code = udisk_commit(transPtr);
-       if (code == 0) goto success;    /* update cachedVersion correctly */
+       if (code == 0)
+           goto success;       /* update cachedVersion correctly */
        udisk_end(transPtr);
        DBRELE(dbase);
        return code;
     }
 
-    if (!ubeacon_AmSyncSite()) {    /* no longer sync site */
+    if (!ubeacon_AmSyncSite()) {       /* no longer sync site */
        udisk_abort(transPtr);
        udisk_end(transPtr);
        DBRELE(dbase);
        return UNOTSYNC;
     }
-    
+
     /* now it is safe to do commit */
     code = udisk_commit(transPtr);
-    if (code == 0) code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
+    if (code == 0)
+       code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
     if (code) {
        /* failed to commit, so must return failure.  Try to clear locks first, just for fun
-           Note that we don't know if this transaction will eventually commit at this point.
-           If it made it to a site that will be present in the next quorum, we win, otherwise
-           we lose.  If we contact a majority of sites, then we won't be here: contacting
-           a majority guarantees commit, since it guarantees that one dude will be a
-           member of the next quorum. */
+        * Note that we don't know if this transaction will eventually commit at this point.
+        * If it made it to a site that will be present in the next quorum, we win, otherwise
+        * we lose.  If we contact a majority of sites, then we won't be here: contacting
+        * a majority guarantees commit, since it guarantees that one dude will be a
+        * member of the next quorum. */
        ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
        udisk_end(transPtr);
        DBRELE(dbase);
        return code;
     }
     /* before we can start sending unlock messages, we must wait until all servers
-       that are possibly still functioning on the other side of a network partition
-       have timed out.  Check the server structures, compute how long to wait, then
-       start the unlocks */
+     * that are possibly still functioning on the other side of a network partition
+     * have timed out.  Check the server structures, compute how long to wait, then
+     * start the unlocks */
     realStart = FT_ApproxTime();
     while (1) {
        /* wait for all servers to time out */
        code = 0;
        now = FT_ApproxTime();
        /* check if we're still sync site, the guy should either come up
-           to us, or timeout.  Put safety check in anyway */
+        * to us, or timeout.  Put safety check in anyway */
        if (now - realStart > 10 * BIGTIME) {
            ubik_stats.escapes++;
            ubik_print("ubik escaping from commit wait\n");
            break;
        }
-       for(ts = ubik_servers; ts; ts=ts->next) {
+       for (ts = ubik_servers; ts; ts = ts->next) {
            if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
                /* this guy could have some damaged data, wait for him */
                code = 1;
                tv.tv_sec = 1;  /* try again after a while (ha ha) */
                tv.tv_usec = 0;
-               IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
+#ifdef AFS_PTHREAD_ENV
+               select(0, 0, 0, 0, &tv);
+#else
+               IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
+#endif
                break;
            }
        }
-       if (code == 0) break;       /* no down ones still pseudo-active */
+       if (code == 0)
+           break;              /* no down ones still pseudo-active */
     }
 
     /* finally, unlock all the dudes.  We can return success independent of the number of servers
-       that really unlock the dbase; the others will do it if/when they elect a new sync site.
-       The transaction is committed anyway, since we succeeded in contacting a quorum
-       at the start (when invoking the DiskCommit function).
-    */
+     * that really unlock the dbase; the others will do it if/when they elect a new sync site.
+     * The transaction is committed anyway, since we succeeded in contacting a quorum
+     * at the start (when invoking the DiskCommit function).
+     */
     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
 
   success:
     udisk_end(transPtr);
     /* update version on successful EndTrans */
-    memcpy(&dbase->cachedVersion, &dbase->version, sizeof(struct ubik_version));
-    
+    memcpy(&dbase->cachedVersion, &dbase->version,
+          sizeof(struct ubik_version));
+
     DBRELE(dbase);
     return 0;
 }
 
 /* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
 
-int ubik_Read(transPtr, buffer, length)
-    register struct ubik_trans *transPtr;      /* in */
-    char *buffer;   /* in */
-    afs_int32 length;   /* in */ {
+int
+ubik_Read(register struct ubik_trans *transPtr, char *buffer,
+         afs_int32 length)
+{
     register afs_int32 code;
 
     /* reads are easy to do: handle locally */
@@ -550,8 +687,10 @@ int ubik_Read(transPtr, buffer, length)
        DBRELE(transPtr->dbase);
        return UNOQUORUM;
     }
-    
-    code = udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos, length);
+
+    code =
+       udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
+                  length);
     if (code == 0) {
        transPtr->seekPos += length;
     }
@@ -563,31 +702,33 @@ int ubik_Read(transPtr, buffer, length)
  * flushes to the local disk and then uses ContactQuorum to write it to 
  * the other servers.
  */
-int ubik_Flush(transPtr)
-    struct ubik_trans *transPtr;
+int
+ubik_Flush(struct ubik_trans *transPtr)
 {
-    afs_int32 code, error=0;
+    afs_int32 code, error = 0;
 
     if (transPtr->type != UBIK_WRITETRANS)
-       return UBADTYPE;
-    if (!transPtr->iovec_info.iovec_wrt_len || !transPtr->iovec_info.iovec_wrt_val)
-       return 0;
+       return UBADTYPE;
+    if (!transPtr->iovec_info.iovec_wrt_len
+       || !transPtr->iovec_info.iovec_wrt_val)
+       return 0;
 
     DBHOLD(transPtr->dbase);
     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
-       ERROR_EXIT(UNOQUORUM);
-    if (!ubeacon_AmSyncSite())      /* only sync site can write */
-       ERROR_EXIT(UNOTSYNC);
+       ERROR_EXIT(UNOQUORUM);
+    if (!ubeacon_AmSyncSite()) /* only sync site can write */
+       ERROR_EXIT(UNOTSYNC);
 
     /* Update the rest of the servers in the quorum */
-    code = ContactQuorum(DISK_WriteV, transPtr, 0, 
-                        &transPtr->iovec_info, &transPtr->iovec_data);
+    code =
+       ContactQuorum(DISK_WriteV, transPtr, 0, &transPtr->iovec_info,
+                     &transPtr->iovec_data);
     if (code) {
-       udisk_abort(transPtr);
-       ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
-       transPtr->iovec_info.iovec_wrt_len = 0;
-       transPtr->iovec_data.iovec_buf_len = 0;
-       ERROR_EXIT(code);
+       udisk_abort(transPtr);
+       ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
+       transPtr->iovec_info.iovec_wrt_len = 0;
+       transPtr->iovec_data.iovec_buf_len = 0;
+       ERROR_EXIT(code);
     }
 
     /* Wrote the buffers out, so start at scratch again */
@@ -599,75 +740,82 @@ int ubik_Flush(transPtr)
     return error;
 }
 
-int ubik_Write(transPtr, buffer, length)
-    register struct ubik_trans *transPtr;      /* in */
-    char *buffer;      /* in */
-    afs_int32 length;  /* in */
+int
+ubik_Write(register struct ubik_trans *transPtr, char *buffer,
+          afs_int32 length)
 {
     struct ubik_iovec *iovec;
-    afs_int32 code, error=0;
+    afs_int32 code, error = 0;
     afs_int32 pos, len, size;
 
     if (transPtr->type != UBIK_WRITETRANS)
-       return UBADTYPE;
+       return UBADTYPE;
     if (!length)
-       return 0;
+       return 0;
 
     if (length > IOVEC_MAXBUF) {
-       for (pos=0, len=length; len>0; len-=size, pos+=size) {
-         size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
-         code = ubik_Write(transPtr, &buffer[pos], size);
-         if (code) return (code);
-       }
-       return 0;
+       for (pos = 0, len = length; len > 0; len -= size, pos += size) {
+           size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
+           code = ubik_Write(transPtr, &buffer[pos], size);
+           if (code)
+               return (code);
+       }
+       return 0;
     }
 
     if (!transPtr->iovec_info.iovec_wrt_val) {
-       transPtr->iovec_info.iovec_wrt_len  = 0;
-       transPtr->iovec_info.iovec_wrt_val  = 
-        (struct ubik_iovec *)malloc(IOVEC_MAXWRT*sizeof(struct ubik_iovec));
-       transPtr->iovec_data.iovec_buf_len = 0;
-       transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
-       if (!transPtr->iovec_info.iovec_wrt_val || !transPtr->iovec_data.iovec_buf_val) {
-         if (transPtr->iovec_info.iovec_wrt_val) free(transPtr->iovec_info.iovec_wrt_val);
-         transPtr->iovec_info.iovec_wrt_val = 0;
-         if (transPtr->iovec_data.iovec_buf_val) free(transPtr->iovec_data.iovec_buf_val);
-         transPtr->iovec_data.iovec_buf_val = 0;
-         return UNOMEM;
-       }
+       transPtr->iovec_info.iovec_wrt_len = 0;
+       transPtr->iovec_info.iovec_wrt_val =
+           (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
+                                       sizeof(struct ubik_iovec));
+       transPtr->iovec_data.iovec_buf_len = 0;
+       transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
+       if (!transPtr->iovec_info.iovec_wrt_val
+           || !transPtr->iovec_data.iovec_buf_val) {
+           if (transPtr->iovec_info.iovec_wrt_val)
+               free(transPtr->iovec_info.iovec_wrt_val);
+           transPtr->iovec_info.iovec_wrt_val = 0;
+           if (transPtr->iovec_data.iovec_buf_val)
+               free(transPtr->iovec_data.iovec_buf_val);
+           transPtr->iovec_data.iovec_buf_val = 0;
+           return UNOMEM;
+       }
     }
 
     /* If this write won't fit in the structure, then flush it out and start anew */
-    if ( (transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT) ||
-        ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF) ) {
-       code = ubik_Flush(transPtr);
-       if (code) return (code);
+    if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
+       || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
+       code = ubik_Flush(transPtr);
+       if (code)
+           return (code);
     }
 
     DBHOLD(transPtr->dbase);
     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
-       ERROR_EXIT(UNOQUORUM);
-    if (!ubeacon_AmSyncSite())      /* only sync site can write */
-       ERROR_EXIT(UNOTSYNC);
+       ERROR_EXIT(UNOQUORUM);
+    if (!ubeacon_AmSyncSite()) /* only sync site can write */
+       ERROR_EXIT(UNOTSYNC);
 
     /* Write to the local disk */
-    code = udisk_write(transPtr, transPtr->seekFile, buffer, 
-                                transPtr->seekPos, length);
+    code =
+       udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
+                   length);
     if (code) {
-       udisk_abort(transPtr);
-       transPtr->iovec_info.iovec_wrt_len = 0;
-       transPtr->iovec_data.iovec_buf_len = 0;
-       DBRELE(transPtr->dbase);
-       return(code);
+       udisk_abort(transPtr);
+       transPtr->iovec_info.iovec_wrt_len = 0;
+       transPtr->iovec_data.iovec_buf_len = 0;
+       DBRELE(transPtr->dbase);
+       return (code);
     }
 
     /* Collect writes for the other ubik servers (to be done in bulk) */
     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
-    iovec[transPtr->iovec_info.iovec_wrt_len].file     = transPtr->seekFile;
+    iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
-    iovec[transPtr->iovec_info.iovec_wrt_len].length   = length;
-    
-    memcpy(&transPtr->iovec_data.iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
+    iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
+
+    memcpy(&transPtr->iovec_data.
+          iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
 
     transPtr->iovec_info.iovec_wrt_len++;
     transPtr->iovec_data.iovec_buf_len += length;
@@ -680,16 +828,16 @@ int ubik_Write(transPtr, buffer, length)
 
 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
 
-int ubik_Seek(transPtr, fileid, position)
-    register struct ubik_trans *transPtr;      /* IN */
-    afs_int32 fileid;    /* IN */
-    afs_int32 position;  /* IN */ {
+int
+ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
+         afs_int32 position)
+{
     register afs_int32 code;
 
     DBHOLD(transPtr->dbase);
     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
        code = UNOQUORUM;
-    } else  {
+    } else {
        transPtr->seekFile = fileid;
        transPtr->seekPos = position;
        code = 0;
@@ -700,10 +848,10 @@ int ubik_Seek(transPtr, fileid, position)
 
 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
 
-int ubik_Tell(transPtr, fileid, position)
-    register struct ubik_trans *transPtr;      /* IN */
-    afs_int32 *fileid;   /* OUT */
-    afs_int32 *position; /* OUT */ {
+int
+ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
+         afs_int32 * position)
+{
     DBHOLD(transPtr->dbase);
     *fileid = transPtr->seekFile;
     *position = transPtr->seekPos;
@@ -713,31 +861,34 @@ int ubik_Tell(transPtr, fileid, position)
 
 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
 
-int ubik_Truncate(transPtr, length)
-    register struct ubik_trans *transPtr;      /* in */
-    afs_int32 length;    /* in */ {
-    afs_int32 code, error=0;
+int
+ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
+{
+    afs_int32 code, error = 0;
 
     /* Will also catch if not UBIK_WRITETRANS */
     code = ubik_Flush(transPtr);
-    if (code) return(code);
+    if (code)
+       return (code);
 
     DBHOLD(transPtr->dbase);
     /* first, check that quorum is still good, and that dbase is up-to-date */
     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
-       ERROR_EXIT(UNOQUORUM);
+       ERROR_EXIT(UNOQUORUM);
     if (!ubeacon_AmSyncSite())
-       ERROR_EXIT(UNOTSYNC);
-    
+       ERROR_EXIT(UNOTSYNC);
+
     /* now do the operation locally, and propagate it out */
     code = udisk_truncate(transPtr, transPtr->seekFile, length);
     if (!code) {
-       code = ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile, length);
+       code =
+           ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile,
+                         length);
     }
     if (code) {
        /* we must abort the operation */
        udisk_abort(transPtr);
-       ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
+       ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
        ERROR_EXIT(code);
     }
 
@@ -747,40 +898,42 @@ int ubik_Truncate(transPtr, length)
 }
 
 /* set a lock; all locks are released on transaction end (commit/abort) */
-ubik_SetLock(atrans, apos, alen, atype)
-    struct ubik_trans *atrans;
-    afs_int32 apos, alen;                  /* apos and alen are not used */
-    int atype; {
-    afs_int32 code=0, error=0;
-    
+int
+ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
+            int atype)
+{
+    afs_int32 code = 0, error = 0;
+
     if (atype == LOCKWRITE) {
-       if (atrans->type == UBIK_READTRANS) return UBADTYPE;
-       code = ubik_Flush(atrans);
-       if (code) return(code);
+       if (atrans->type == UBIK_READTRANS)
+           return UBADTYPE;
+       code = ubik_Flush(atrans);
+       if (code)
+           return (code);
     }
 
     DBHOLD(atrans->dbase);
     if (atype == LOCKREAD) {
        code = ulock_getLock(atrans, atype, 1);
-        if (code) ERROR_EXIT(code);
-    }
-    else {
+       if (code)
+           ERROR_EXIT(code);
+    } else {
        /* first, check that quorum is still good, and that dbase is up-to-date */
        if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
-          ERROR_EXIT(UNOQUORUM);
+           ERROR_EXIT(UNOQUORUM);
        if (!ubeacon_AmSyncSite())
-          ERROR_EXIT(UNOTSYNC);
+           ERROR_EXIT(UNOTSYNC);
 
        /* now do the operation locally, and propagate it out */
        code = ulock_getLock(atrans, atype, 1);
        if (code == 0) {
-           code = ContactQuorum(DISK_Lock, atrans, 0, 0, 
-                                1/*unused*/, 1/*unused*/, LOCKWRITE);
+           code = ContactQuorum(DISK_Lock, atrans, 0, 0, 1 /*unused */ ,
+                                1 /*unused */ , LOCKWRITE);
        }
        if (code) {
            /* we must abort the operation */
            udisk_abort(atrans);
-           ContactQuorum(DISK_Abort, atrans, 0);    /* force aborts to the others */
+           ContactQuorum(DISK_Abort, atrans, 0);       /* force aborts to the others */
            ERROR_EXIT(code);
        }
     }
@@ -791,21 +944,29 @@ ubik_SetLock(atrans, apos, alen, atype)
 }
 
 /* utility to wait for a version # to change */
-int ubik_WaitVersion(adatabase, aversion)
-register struct ubik_version *aversion;
-register struct ubik_dbase *adatabase; {
+int
+ubik_WaitVersion(register struct ubik_dbase *adatabase,
+                register struct ubik_version *aversion)
+{
     while (1) {
        /* wait until version # changes, and then return */
        if (vcmp(*aversion, adatabase->version) != 0)
            return 0;
+#ifdef AFS_PTHREAD_ENV
+       assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
+       assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
+       assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
+#else
        LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
+#endif
     }
 }
 
 /* utility to get the version of the dbase a transaction is dealing with */
-int ubik_GetVersion(atrans, avers)
-register struct ubik_trans *atrans;
-register struct ubik_version *avers; {
+int
+ubik_GetVersion(register struct ubik_trans *atrans,
+               register struct ubik_version *avers)
+{
     *avers = atrans->dbase->version;
     return 0;
 }
@@ -815,21 +976,22 @@ register struct ubik_version *avers; {
    caller is a server caching part of the Ubik database, it should invalidate
    that cache.  A return value of -1 means bad (NULL) argument. */
 
-int ubik_CacheUpdate (atrans)
-  register struct ubik_trans *atrans;
+int
+ubik_CacheUpdate(register struct ubik_trans *atrans)
 {
-    if (!(atrans && atrans->dbase)) return -1;
+    if (!(atrans && atrans->dbase))
+       return -1;
     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
 }
 
-int panic(a, b, c, d)
-    char *a, *b, *c, *d;
+int
+panic(char *a, char *b, char *c, char *d)
 {
     ubik_print("Ubik PANIC: ");
     ubik_print(a, b, c, d);
     abort();
-    ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
-    exit(1);                       /* never know, though  */
+    ubik_print("BACK FROM ABORT\n");   /* shouldn't come back */
+    exit(1);                   /* never know, though  */
 }
 
 /*
@@ -837,16 +999,14 @@ int panic(a, b, c, d)
 ** the primary IP address that is on the host passed in.
 */
 afs_uint32
-ubikGetPrimaryInterfaceAddr(addr)
-afs_uint32  addr;                      /* network byte order */
+ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
 {
-        struct ubik_server *ts;
-        int j;
-
-        for ( ts=ubik_servers; ts; ts=ts->next )
-            for ( j=0; j < UBIK_MAX_INTERFACE_ADDR; j++)
-                if ( ts->addr[j] == addr )
-                    return  ts->addr[0];       /* net byte order */
-        return 0; /* if not in server database, return error */
+    struct ubik_server *ts;
+    int j;
+
+    for (ts = ubik_servers; ts; ts = ts->next)
+       for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
+           if (ts->addr[j] == addr)
+               return ts->addr[0];     /* net byte order */
+    return 0;                  /* if not in server database, return error */
 }
-