DEVEL15-kill-ubik-pthread-env-20080718
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #include <sys/param.h>
23 #endif
24 #include <time.h>
25 #include <lock.h>
26 #include <string.h>
27 #include <rx/xdr.h>
28 #include <rx/rx.h>
29 #include <afs/cellconfig.h>
30
31 #define UBIK_INTERNALS
32 #include "ubik.h"
33 #include "ubik_int.h"
34
35 #include <lwp.h>   /* temporary hack by klm */
36
37 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
38
39 /*  This system is organized in a hierarchical set of related modules.  Modules
40     at one level can only call modules at the same level or below.
41     
42     At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
43     operating system primitives.
44     
45     At the next level (1) we have
46
47         VOTER--The module responsible for casting votes when asked.  It is also
48         responsible for determining whether this server should try to become
49         a synchronization site.
50         
51         BEACONER--The module responsible for sending keep-alives out when a
52         server is actually the sync site, or trying to become a sync site.
53         
54         DISK--The module responsible for representing atomic transactions
55         on the local disk.  It maintains a new-value only log.
56         
57         LOCK--The module responsible for locking byte ranges in the database file.
58         
59     At the next level (2) we have
60           
61         RECOVERY--The module responsible for ensuring that all members of a quorum
62         have the same up-to-date database after a new synchronization site is
63         elected.  This module runs only on the synchronization site.
64         
65     At the next level (3) we have
66     
67         REMOTE--The module responsible for interpreting requests from the sync
68         site and applying them to the database, after obtaining the appropriate
69         locks.
70         
71     At the next level (4) we have
72     
73         UBIK--The module users call to perform operations on the database.
74 */
75
76
77 /* some globals */
78 afs_int32 ubik_quorum = 0;
79 struct ubik_dbase *ubik_dbase = 0;
80 struct ubik_stats ubik_stats;
81 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
82 afs_int32 ubik_epochTime = 0;
83 afs_int32 urecovery_state = 0;
84 int (*ubik_SRXSecurityProc) ();
85 char *ubik_SRXSecurityRock;
86 struct ubik_server *ubik_servers;
87 short ubik_callPortal;
88
89 static int BeginTrans();
90
91 struct rx_securityClass *ubik_sc[3];
92
93 /* perform an operation at a quorum, handling error conditions.  return 0 if
94     all worked, otherwise mark failing server as down and return UERROR
95
96     Note that if any server misses an update, we must wait BIGTIME seconds before
97     allowing the transaction to commit, to ensure that the missing and possibly still
98     functioning server times out and stop handing out old data.  This is done in the commit
99     code, where we wait for a server marked down to have stayed down for BIGTIME seconds
100     before we allow a transaction to commit.  A server that fails but comes back up won't give
101     out old data because it is sent the sync count along with the beacon message that
102     marks it as *really* up (beaconSinceDown).
103 */
104 #define CStampVersion       1   /* meaning set ts->version */
105 afs_int32
106 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4,
107               aparm5)
108      int (*aproc) ();
109      int aflags;
110      register struct ubik_trans *atrans;
111      long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5;
112 {
113     register struct ubik_server *ts;
114     register afs_int32 code;
115     afs_int32 rcode, okcalls;
116
117     rcode = 0;
118     okcalls = 0;
119     for (ts = ubik_servers; ts; ts = ts->next) {
120         /* for each server */
121         if (!ts->up || !ts->currentDB) {
122             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
123             continue;           /* not up-to-date, don't bother */
124         }
125         code =
126             (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2,
127                       aparm3, aparm4, aparm5);
128         if ((aproc == DISK_WriteV) && (code <= -450) && (code > -500)) {
129             /* An RPC interface mismatch (as defined in comerr/error_msg.c).
130              * Un-bulk the entries and do individual DISK_Write calls
131              * instead of DISK_WriteV.
132              */
133             iovec_wrt *iovec_infoP = (iovec_wrt *) aparm0;
134             iovec_buf *iovec_dataP = (iovec_buf *) aparm1;
135             struct ubik_iovec *iovec =
136                 (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
137             char *iobuf = (char *)iovec_dataP->iovec_buf_val;
138             bulkdata tcbs;
139             afs_int32 i, offset;
140
141             for (i = 0, offset = 0; i < iovec_infoP->iovec_wrt_len; i++) {
142                 /* Sanity check for going off end of buffer */
143                 if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
144                     code = UINTERNAL;
145                     break;
146                 }
147                 tcbs.bulkdata_len = iovec[i].length;
148                 tcbs.bulkdata_val = &iobuf[offset];
149                 code =
150                     DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
151                                iovec[i].position, &tcbs);
152                 if (code)
153                     break;
154
155                 offset += iovec[i].length;
156             }
157         }
158         if (code) {             /* failure */
159             rcode = code;
160             ts->up = 0;         /* mark as down now; beacons will no longer be sent */
161             ts->currentDB = 0;
162             ts->beaconSinceDown = 0;
163             urecovery_LostServer();     /* tell recovery to try to resend dbase later */
164         } else {                /* success */
165             if (!ts->isClone)
166                 okcalls++;      /* count up how many worked */
167             if (aflags & CStampVersion) {
168                 ts->version = atrans->dbase->version;
169             }
170         }
171     }
172     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
173     if (okcalls + 1 >= ubik_quorum)
174         return 0;
175     else
176         return rcode;
177 }
178
179 /* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
180
181     Note that the host named by myHost should not also be listed in serverList.
182 */
183
184 int
185 ubik_ServerInitCommon(afs_int32 myHost, short myPort,
186                       struct afsconf_cell *info, char clones[],
187                       afs_int32 serverList[], char *pathName,
188                       struct ubik_dbase **dbase)
189 {
190     register struct ubik_dbase *tdb;
191     register afs_int32 code;
192 #ifdef AFS_PTHREAD_ENV
193     pthread_t rxServerThread;        /* pthread variables */
194     pthread_t ubeacon_InteractThread;
195     pthread_t urecovery_InteractThread;
196     pthread_attr_t rxServer_tattr;
197     pthread_attr_t ubeacon_Interact_tattr;
198     pthread_attr_t urecovery_Interact_tattr;
199 #else
200     PROCESS junk;
201 #endif
202
203     afs_int32 secIndex;
204     struct rx_securityClass *secClass;
205
206     struct rx_service *tservice;
207     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
208     extern int rx_stackSize;
209
210     initialize_U_error_table();
211
212     tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
213     tdb->pathName = (char *)malloc(strlen(pathName) + 1);
214     strcpy(tdb->pathName, pathName);
215     tdb->activeTrans = (struct ubik_trans *)0;
216     memset(&tdb->version, 0, sizeof(struct ubik_version));
217     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
218     Lock_Init(&tdb->versionLock);
219     tdb->flags = 0;
220     tdb->read = uphys_read;
221     tdb->write = uphys_write;
222     tdb->truncate = uphys_truncate;
223     tdb->open = uphys_invalidate;       /* this function isn't used any more */
224     tdb->sync = uphys_sync;
225     tdb->stat = uphys_stat;
226     tdb->getlabel = uphys_getlabel;
227     tdb->setlabel = uphys_setlabel;
228     tdb->getnfiles = uphys_getnfiles;
229     tdb->readers = 0;
230     tdb->tidCounter = tdb->writeTidCounter = 0;
231     *dbase = tdb;
232     ubik_dbase = tdb;           /* for now, only one db per server; can fix later when we have names for the other dbases */
233
234     /* initialize RX */
235
236     /* the following call is idempotent so when/if it got called earlier,
237      * by whatever called us, it doesn't really matter -- klm */
238     code = rx_Init(myPort);
239     if (code < 0)
240         return code;
241
242     ubik_callPortal = myPort;
243     /* try to get an additional security object */
244     ubik_sc[0] = rxnull_NewServerSecurityObject();
245     ubik_sc[1] = 0;
246     ubik_sc[2] = 0;
247     if (ubik_SRXSecurityProc) {
248         code =
249             (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
250                                      &secIndex);
251         if (code == 0) {
252             ubik_sc[secIndex] = secClass;
253         }
254     }
255     /* for backwards compat this should keep working as it does now 
256        and not host bind */
257 #if 0
258     /* This really needs to be up above, where I have put it.  It works
259      * here when we're non-pthreaded, but the code above, when using
260      * pthreads may (and almost certainly does) end up calling on a
261      * pthread resource which gets initialized by rx_Init.  The end
262      * result is that an assert fails and the program dies. -- klm
263      */
264     code = rx_Init(myPort);
265     if (code < 0)
266         return code;
267 #endif
268
269     tservice =
270         rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
271                       VOTE_ExecuteRequest);
272     if (tservice == (struct rx_service *)0) {
273         ubik_dprint("Could not create VOTE rx service!\n");
274         return -1;
275     }
276     rx_SetMinProcs(tservice, 2);
277     rx_SetMaxProcs(tservice, 3);
278
279     tservice =
280         rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
281                       DISK_ExecuteRequest);
282     if (tservice == (struct rx_service *)0) {
283         ubik_dprint("Could not create DISK rx service!\n");
284         return -1;
285     }
286     rx_SetMinProcs(tservice, 2);
287     rx_SetMaxProcs(tservice, 3);
288
289     /* start an rx_ServerProc to handle incoming RPC's in particular the 
290      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
291      * the "steplock" problem in ubik initialization. Defect 11037.
292      */
293 #ifdef AFS_PTHREAD_ENV
294 /* do assert stuff */
295     assert(pthread_attr_init(&rxServer_tattr) == 0);
296     assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
297 /*    assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
298
299     assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
300 #else
301     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
302               NULL, "rx_ServerProc", &junk);
303 #endif
304
305     /* do basic initialization */
306     code = uvote_Init();
307     if (code)
308         return code;
309     code = urecovery_Initialize(tdb);
310     if (code)
311         return code;
312     if (info)
313         code = ubeacon_InitServerListByInfo(myHost, info, clones);
314     else
315         code = ubeacon_InitServerList(myHost, serverList);
316     if (code)
317         return code;
318
319     /* now start up async processes */
320 #ifdef AFS_PTHREAD_ENV
321 /* do assert stuff */
322     assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
323     assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
324 /*    assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
325     /*  need another attr set here for priority???  - klm */
326
327     assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
328            (void *)ubeacon_Interact, NULL) == 0);
329 #else
330     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
331                              LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
332                              &junk);
333     if (code)
334         return code;
335 #endif
336
337 #ifdef AFS_PTHREAD_ENV
338 /* do assert stuff */
339     assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
340     assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
341 /*    assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
342     /*  need another attr set here for priority???  - klm */
343
344     assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
345            (void *)urecovery_Interact, NULL) == 0);
346
347     return 0;  /* is this correct?  - klm */
348 #else  
349     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
350                              LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
351                              &junk);
352     return code;
353 #endif
354
355 }
356
357 int
358 ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
359                       struct afsconf_cell *info, char clones[],
360                       char *pathName, struct ubik_dbase **dbase)
361 {
362     afs_int32 code;
363
364     code =
365         ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
366                               dbase);
367     return code;
368 }
369
370 int
371 ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
372                 char *pathName, struct ubik_dbase **dbase)
373 {
374     afs_int32 code;
375
376     code =
377         ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
378                               serverList, pathName, dbase);
379     return code;
380 }
381
382 /*  This routine begins a read or write transaction on the transaction
383     identified by transPtr, in the dbase named by dbase.  An open mode of
384     ubik_READTRANS identifies this as a read transaction, while a mode of
385     ubik_WRITETRANS identifies this as a write transaction.  transPtr 
386     is set to the returned transaction control block. The readAny flag is
387     set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
388     ubik_BeginTransReadAny() below.
389
390     We can only begin transaction when we have an up-to-date database.
391 */
392
393 static int
394 BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
395            struct ubik_trans **transPtr, int readAny)
396 {
397     struct ubik_trans *jt;
398     register struct ubik_trans *tt;
399     register afs_int32 code;
400 #if defined(UBIK_PAUSE)
401     int count;
402 #endif /* UBIK_PAUSE */
403
404     if ((transMode != UBIK_READTRANS) && readAny)
405         return UBADTYPE;
406     DBHOLD(dbase);
407 #if defined(UBIK_PAUSE)
408     /* if we're polling the slave sites, wait until the returns
409      *  are all in.  Otherwise, the urecovery_CheckTid call may
410      *  glitch us. 
411      */
412     if (transMode == UBIK_WRITETRANS)
413         for (count = 75; dbase->flags & DBVOTING; --count) {
414             DBRELE(dbase);
415 #ifdef GRAND_PAUSE_DEBUGGING
416             if (count == 75)
417                 fprintf(stderr,
418                         "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
419                         time(0), ntohs(ubik_callPortal));
420             else
421 #endif
422             if (count <= 0) {
423 #if 1
424                 fprintf(stderr,
425                         "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
426                         time(0), ntohs(ubik_callPortal));
427 #endif
428                 return UNOQUORUM;       /* a white lie */
429             }
430 #ifdef AFS_PTHREAD_ENV
431             sleep(2);
432 #else
433             IOMGR_Sleep(2);
434 #endif
435             DBHOLD(dbase);
436         }
437 #endif /* UBIK_PAUSE */
438     if (urecovery_AllBetter(dbase, readAny) == 0) {
439         DBRELE(dbase);
440         return UNOQUORUM;
441     }
442     /* otherwise we have a quorum, use it */
443
444     /* make sure that at most one write transaction occurs at any one time.  This
445      * has nothing to do with transaction locking; that's enforced by the lock package.  However,
446      * we can't even handle two non-conflicting writes, since our log and recovery modules
447      * don't know how to restore one without possibly picking up some data from the other. */
448     if (transMode == UBIK_WRITETRANS) {
449         /* if we're writing already, wait */
450         while (dbase->flags & DBWRITING) {
451             DBRELE(dbase);
452 #ifdef AFS_PTHREAD_ENV
453             assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
454             assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
455             assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
456 #else
457             LWP_WaitProcess(&dbase->flags);
458 #endif
459             DBHOLD(dbase);
460         }
461         if (!ubeacon_AmSyncSite()) {
462             DBRELE(dbase);
463             return UNOTSYNC;
464         }
465     }
466
467     /* create the transaction */
468     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
469     tt = jt;                    /* move to a register */
470     if (code || tt == (struct ubik_trans *)NULL) {
471         DBRELE(dbase);
472         return code;
473     }
474     if (readAny)
475         tt->flags |= TRREADANY;
476     /* label trans and dbase with new tid */
477     tt->tid.epoch = ubik_epochTime;
478     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
479     tt->tid.counter = (dbase->tidCounter += 2);
480
481     if (transMode == UBIK_WRITETRANS) {
482         /* for a write trans, we have to keep track of the write tid counter too */
483 #if defined(UBIK_PAUSE)
484         dbase->writeTidCounter = tt->tid.counter;
485 #else
486         dbase->writeTidCounter += 2;
487 #endif /* UBIK_PAUSE */
488
489         /* next try to start transaction on appropriate number of machines */
490         code = ContactQuorum(DISK_Begin, tt, 0);
491         if (code) {
492             /* we must abort the operation */
493             udisk_abort(tt);
494             ContactQuorum(DISK_Abort, tt, 0);   /* force aborts to the others */
495             udisk_end(tt);
496             DBRELE(dbase);
497             return code;
498         }
499     }
500
501     *transPtr = tt;
502     DBRELE(dbase);
503     return 0;
504 }
505
506 int
507 ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
508                 struct ubik_trans **transPtr)
509 {
510     return BeginTrans(dbase, transMode, transPtr, 0);
511 }
512
513 int
514 ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
515                        struct ubik_trans **transPtr)
516 {
517     return BeginTrans(dbase, transMode, transPtr, 1);
518 }
519
520 /* this routine ends a read or write transaction by aborting it */
521 int
522 ubik_AbortTrans(register struct ubik_trans *transPtr)
523 {
524     register afs_int32 code;
525     afs_int32 code2;
526     register struct ubik_dbase *dbase;
527
528     dbase = transPtr->dbase;
529     DBHOLD(dbase);
530     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
531     /* see if we're still up-to-date */
532     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
533         udisk_abort(transPtr);
534         udisk_end(transPtr);
535         DBRELE(dbase);
536         return UNOQUORUM;
537     }
538
539     if (transPtr->type == UBIK_READTRANS) {
540         code = udisk_abort(transPtr);
541         udisk_end(transPtr);
542         DBRELE(dbase);
543         return code;
544     }
545
546     /* below here, we know we're doing a write transaction */
547     if (!ubeacon_AmSyncSite()) {
548         udisk_abort(transPtr);
549         udisk_end(transPtr);
550         DBRELE(dbase);
551         return UNOTSYNC;
552     }
553
554     /* now it is safe to try remote abort */
555     code = ContactQuorum(DISK_Abort, transPtr, 0);
556     code2 = udisk_abort(transPtr);
557     udisk_end(transPtr);
558     DBRELE(dbase);
559     return (code ? code : code2);
560 }
561
562 /* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
563 int
564 ubik_EndTrans(register struct ubik_trans *transPtr)
565 {
566     register afs_int32 code;
567     struct timeval tv;
568     afs_int32 realStart;
569     register struct ubik_server *ts;
570     afs_int32 now;
571     register struct ubik_dbase *dbase;
572
573     if (transPtr->type == UBIK_WRITETRANS) {
574         code = ubik_Flush(transPtr);
575         if (code) {
576             ubik_AbortTrans(transPtr);
577             return (code);
578         }
579     }
580
581     dbase = transPtr->dbase;
582     DBHOLD(dbase);
583     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
584
585     /* give up if no longer current */
586     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
587         udisk_abort(transPtr);
588         udisk_end(transPtr);
589         DBRELE(dbase);
590         return UNOQUORUM;
591     }
592
593     if (transPtr->type == UBIK_READTRANS) {     /* reads are easy */
594         code = udisk_commit(transPtr);
595         if (code == 0)
596             goto success;       /* update cachedVersion correctly */
597         udisk_end(transPtr);
598         DBRELE(dbase);
599         return code;
600     }
601
602     if (!ubeacon_AmSyncSite()) {        /* no longer sync site */
603         udisk_abort(transPtr);
604         udisk_end(transPtr);
605         DBRELE(dbase);
606         return UNOTSYNC;
607     }
608
609     /* now it is safe to do commit */
610     code = udisk_commit(transPtr);
611     if (code == 0)
612         code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
613     if (code) {
614         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
615          * Note that we don't know if this transaction will eventually commit at this point.
616          * If it made it to a site that will be present in the next quorum, we win, otherwise
617          * we lose.  If we contact a majority of sites, then we won't be here: contacting
618          * a majority guarantees commit, since it guarantees that one dude will be a
619          * member of the next quorum. */
620         ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
621         udisk_end(transPtr);
622         DBRELE(dbase);
623         return code;
624     }
625     /* before we can start sending unlock messages, we must wait until all servers
626      * that are possibly still functioning on the other side of a network partition
627      * have timed out.  Check the server structures, compute how long to wait, then
628      * start the unlocks */
629     realStart = FT_ApproxTime();
630     while (1) {
631         /* wait for all servers to time out */
632         code = 0;
633         now = FT_ApproxTime();
634         /* check if we're still sync site, the guy should either come up
635          * to us, or timeout.  Put safety check in anyway */
636         if (now - realStart > 10 * BIGTIME) {
637             ubik_stats.escapes++;
638             ubik_print("ubik escaping from commit wait\n");
639             break;
640         }
641         for (ts = ubik_servers; ts; ts = ts->next) {
642             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
643                 /* this guy could have some damaged data, wait for him */
644                 code = 1;
645                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
646                 tv.tv_usec = 0;
647 #ifdef AFS_PTHREAD_ENV
648                 select(0, 0, 0, 0, &tv);
649 #else
650                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
651 #endif
652                 break;
653             }
654         }
655         if (code == 0)
656             break;              /* no down ones still pseudo-active */
657     }
658
659     /* finally, unlock all the dudes.  We can return success independent of the number of servers
660      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
661      * The transaction is committed anyway, since we succeeded in contacting a quorum
662      * at the start (when invoking the DiskCommit function).
663      */
664     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
665
666   success:
667     udisk_end(transPtr);
668     /* update version on successful EndTrans */
669     memcpy(&dbase->cachedVersion, &dbase->version,
670            sizeof(struct ubik_version));
671
672     DBRELE(dbase);
673     return 0;
674 }
675
676 /* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
677
678 int
679 ubik_Read(register struct ubik_trans *transPtr, char *buffer,
680           afs_int32 length)
681 {
682     register afs_int32 code;
683
684     /* reads are easy to do: handle locally */
685     DBHOLD(transPtr->dbase);
686     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
687         DBRELE(transPtr->dbase);
688         return UNOQUORUM;
689     }
690
691     code =
692         udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
693                    length);
694     if (code == 0) {
695         transPtr->seekPos += length;
696     }
697     DBRELE(transPtr->dbase);
698     return code;
699 }
700
701 /* This routine will flush the io data in the iovec structures. It first
702  * flushes to the local disk and then uses ContactQuorum to write it to 
703  * the other servers.
704  */
705 int
706 ubik_Flush(struct ubik_trans *transPtr)
707 {
708     afs_int32 code, error = 0;
709
710     if (transPtr->type != UBIK_WRITETRANS)
711         return UBADTYPE;
712     if (!transPtr->iovec_info.iovec_wrt_len
713         || !transPtr->iovec_info.iovec_wrt_val)
714         return 0;
715
716     DBHOLD(transPtr->dbase);
717     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
718         ERROR_EXIT(UNOQUORUM);
719     if (!ubeacon_AmSyncSite())  /* only sync site can write */
720         ERROR_EXIT(UNOTSYNC);
721
722     /* Update the rest of the servers in the quorum */
723     code =
724         ContactQuorum(DISK_WriteV, transPtr, 0, &transPtr->iovec_info,
725                       &transPtr->iovec_data);
726     if (code) {
727         udisk_abort(transPtr);
728         ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
729         transPtr->iovec_info.iovec_wrt_len = 0;
730         transPtr->iovec_data.iovec_buf_len = 0;
731         ERROR_EXIT(code);
732     }
733
734     /* Wrote the buffers out, so start at scratch again */
735     transPtr->iovec_info.iovec_wrt_len = 0;
736     transPtr->iovec_data.iovec_buf_len = 0;
737
738   error_exit:
739     DBRELE(transPtr->dbase);
740     return error;
741 }
742
743 int
744 ubik_Write(register struct ubik_trans *transPtr, char *buffer,
745            afs_int32 length)
746 {
747     struct ubik_iovec *iovec;
748     afs_int32 code, error = 0;
749     afs_int32 pos, len, size;
750
751     if (transPtr->type != UBIK_WRITETRANS)
752         return UBADTYPE;
753     if (!length)
754         return 0;
755
756     if (length > IOVEC_MAXBUF) {
757         for (pos = 0, len = length; len > 0; len -= size, pos += size) {
758             size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
759             code = ubik_Write(transPtr, &buffer[pos], size);
760             if (code)
761                 return (code);
762         }
763         return 0;
764     }
765
766     if (!transPtr->iovec_info.iovec_wrt_val) {
767         transPtr->iovec_info.iovec_wrt_len = 0;
768         transPtr->iovec_info.iovec_wrt_val =
769             (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
770                                         sizeof(struct ubik_iovec));
771         transPtr->iovec_data.iovec_buf_len = 0;
772         transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
773         if (!transPtr->iovec_info.iovec_wrt_val
774             || !transPtr->iovec_data.iovec_buf_val) {
775             if (transPtr->iovec_info.iovec_wrt_val)
776                 free(transPtr->iovec_info.iovec_wrt_val);
777             transPtr->iovec_info.iovec_wrt_val = 0;
778             if (transPtr->iovec_data.iovec_buf_val)
779                 free(transPtr->iovec_data.iovec_buf_val);
780             transPtr->iovec_data.iovec_buf_val = 0;
781             return UNOMEM;
782         }
783     }
784
785     /* If this write won't fit in the structure, then flush it out and start anew */
786     if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
787         || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
788         code = ubik_Flush(transPtr);
789         if (code)
790             return (code);
791     }
792
793     DBHOLD(transPtr->dbase);
794     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
795         ERROR_EXIT(UNOQUORUM);
796     if (!ubeacon_AmSyncSite())  /* only sync site can write */
797         ERROR_EXIT(UNOTSYNC);
798
799     /* Write to the local disk */
800     code =
801         udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
802                     length);
803     if (code) {
804         udisk_abort(transPtr);
805         transPtr->iovec_info.iovec_wrt_len = 0;
806         transPtr->iovec_data.iovec_buf_len = 0;
807         DBRELE(transPtr->dbase);
808         return (code);
809     }
810
811     /* Collect writes for the other ubik servers (to be done in bulk) */
812     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
813     iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
814     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
815     iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
816
817     memcpy(&transPtr->iovec_data.
818            iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
819
820     transPtr->iovec_info.iovec_wrt_len++;
821     transPtr->iovec_data.iovec_buf_len += length;
822     transPtr->seekPos += length;
823
824   error_exit:
825     DBRELE(transPtr->dbase);
826     return error;
827 }
828
829 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
830
831 int
832 ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
833           afs_int32 position)
834 {
835     register afs_int32 code;
836
837     DBHOLD(transPtr->dbase);
838     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
839         code = UNOQUORUM;
840     } else {
841         transPtr->seekFile = fileid;
842         transPtr->seekPos = position;
843         code = 0;
844     }
845     DBRELE(transPtr->dbase);
846     return code;
847 }
848
849 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
850
851 int
852 ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
853           afs_int32 * position)
854 {
855     DBHOLD(transPtr->dbase);
856     *fileid = transPtr->seekFile;
857     *position = transPtr->seekPos;
858     DBRELE(transPtr->dbase);
859     return 0;
860 }
861
862 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
863
864 int
865 ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
866 {
867     afs_int32 code, error = 0;
868
869     /* Will also catch if not UBIK_WRITETRANS */
870     code = ubik_Flush(transPtr);
871     if (code)
872         return (code);
873
874     DBHOLD(transPtr->dbase);
875     /* first, check that quorum is still good, and that dbase is up-to-date */
876     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
877         ERROR_EXIT(UNOQUORUM);
878     if (!ubeacon_AmSyncSite())
879         ERROR_EXIT(UNOTSYNC);
880
881     /* now do the operation locally, and propagate it out */
882     code = udisk_truncate(transPtr, transPtr->seekFile, length);
883     if (!code) {
884         code =
885             ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile,
886                           length);
887     }
888     if (code) {
889         /* we must abort the operation */
890         udisk_abort(transPtr);
891         ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
892         ERROR_EXIT(code);
893     }
894
895   error_exit:
896     DBRELE(transPtr->dbase);
897     return error;
898 }
899
900 /* set a lock; all locks are released on transaction end (commit/abort) */
901 int
902 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
903              int atype)
904 {
905     afs_int32 code = 0, error = 0;
906
907     if (atype == LOCKWRITE) {
908         if (atrans->type == UBIK_READTRANS)
909             return UBADTYPE;
910         code = ubik_Flush(atrans);
911         if (code)
912             return (code);
913     }
914
915     DBHOLD(atrans->dbase);
916     if (atype == LOCKREAD) {
917         code = ulock_getLock(atrans, atype, 1);
918         if (code)
919             ERROR_EXIT(code);
920     } else {
921         /* first, check that quorum is still good, and that dbase is up-to-date */
922         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
923             ERROR_EXIT(UNOQUORUM);
924         if (!ubeacon_AmSyncSite())
925             ERROR_EXIT(UNOTSYNC);
926
927         /* now do the operation locally, and propagate it out */
928         code = ulock_getLock(atrans, atype, 1);
929         if (code == 0) {
930             code = ContactQuorum(DISK_Lock, atrans, 0, 0, 1 /*unused */ ,
931                                  1 /*unused */ , LOCKWRITE);
932         }
933         if (code) {
934             /* we must abort the operation */
935             udisk_abort(atrans);
936             ContactQuorum(DISK_Abort, atrans, 0);       /* force aborts to the others */
937             ERROR_EXIT(code);
938         }
939     }
940
941   error_exit:
942     DBRELE(atrans->dbase);
943     return error;
944 }
945
946 /* utility to wait for a version # to change */
947 int
948 ubik_WaitVersion(register struct ubik_dbase *adatabase,
949                  register struct ubik_version *aversion)
950 {
951     while (1) {
952         /* wait until version # changes, and then return */
953         if (vcmp(*aversion, adatabase->version) != 0)
954             return 0;
955 #ifdef AFS_PTHREAD_ENV
956         assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
957         assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
958         assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
959 #else
960         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
961 #endif
962     }
963 }
964
965 /* utility to get the version of the dbase a transaction is dealing with */
966 int
967 ubik_GetVersion(register struct ubik_trans *atrans,
968                 register struct ubik_version *avers)
969 {
970     *avers = atrans->dbase->version;
971     return 0;
972 }
973
974 /* Facility to simplify database caching.  Returns zero if last trans was done
975    on the local server and was successful.  If return value is non-zero and the
976    caller is a server caching part of the Ubik database, it should invalidate
977    that cache.  A return value of -1 means bad (NULL) argument. */
978
979 int
980 ubik_CacheUpdate(register struct ubik_trans *atrans)
981 {
982     if (!(atrans && atrans->dbase))
983         return -1;
984     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
985 }
986
987 int
988 panic(char *a, char *b, char *c, char *d)
989 {
990     ubik_print("Ubik PANIC: ");
991     ubik_print(a, b, c, d);
992     abort();
993     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
994     exit(1);                    /* never know, though  */
995 }
996
997 /*
998 ** This functions takes an IP addresses as its parameter. It returns the
999 ** the primary IP address that is on the host passed in.
1000 */
1001 afs_uint32
1002 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
1003 {
1004     struct ubik_server *ts;
1005     int j;
1006
1007     for (ts = ubik_servers; ts; ts = ts->next)
1008         for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
1009             if (ts->addr[j] == addr)
1010                 return ts->addr[0];     /* net byte order */
1011     return 0;                   /* if not in server database, return error */
1012 }