ubik-doxygen-20081121
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #include <sys/param.h>
23 #endif
24 #include <time.h>
25 #include <lock.h>
26 #include <string.h>
27 #include <rx/xdr.h>
28 #include <rx/rx.h>
29 #include <afs/cellconfig.h>
30
31 #define UBIK_INTERNALS
32 #include "ubik.h"
33 #include "ubik_int.h"
34
35 #include <lwp.h>   /* temporary hack by klm */
36
37 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
38
39 /*!
40  * \file
41  * This system is organized in a hierarchical set of related modules.  Modules
42  * at one level can only call modules at the same level or below.
43  * 
44  * At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
45  * operating system primitives.
46  *
47  * At the next level (1) we have
48  *
49  * \li VOTER--The module responsible for casting votes when asked.  It is also
50  * responsible for determining whether this server should try to become
51  * a synchronization site.
52  * \li BEACONER--The module responsible for sending keep-alives out when a
53  * server is actually the sync site, or trying to become a sync site.
54  * \li DISK--The module responsible for representing atomic transactions
55  * on the local disk.  It maintains a new-value only log.
56  * \li LOCK--The module responsible for locking byte ranges in the database file.
57  *      
58  * At the next level (2) we have
59  *  
60  * \li RECOVERY--The module responsible for ensuring that all members of a quorum
61  * have the same up-to-date database after a new synchronization site is
62  * elected.  This module runs only on the synchronization site.
63  *      
64  * At the next level (3) we have
65  *
66  * \li REMOTE--The module responsible for interpreting requests from the sync
67  * site and applying them to the database, after obtaining the appropriate
68  * locks.
69  *
70  * At the next level (4) we have
71  *
72  * \li UBIK--The module users call to perform operations on the database.
73  */
74
75
76 /* some globals */
77 afs_int32 ubik_quorum = 0;
78 struct ubik_dbase *ubik_dbase = 0;
79 struct ubik_stats ubik_stats;
80 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
81 afs_int32 ubik_epochTime = 0;
82 afs_int32 urecovery_state = 0;
83 int (*ubik_SRXSecurityProc) ();
84 char *ubik_SRXSecurityRock;
85 struct ubik_server *ubik_servers;
86 short ubik_callPortal;
87
88 static int BeginTrans();
89
90 struct rx_securityClass *ubik_sc[3];
91
92 #define CStampVersion       1   /* meaning set ts->version */
93
94 /*! 
95  * \brief Perform an operation at a quorum, handling error conditions.
96  * \return 0 if all worked and a quorum was contacted successfully
97  * \return otherwise mark failing server as down and return #UERROR
98  *
99  * \note If any server misses an update, we must wait #BIGTIME seconds before
100  * allowing the transaction to commit, to ensure that the missing and possibly still
101  * functioning server times out and stops handing out old data.  This is done in the commit
102  * code, where we wait for a server marked down to have stayed down for #BIGTIME seconds
103  * before we allow a transaction to commit.  A server that fails but comes back up won't give
104  * out old data because it is sent the sync count along with the beacon message that
105  * marks it as \b really up (\p beaconSinceDown).
106  */
107 afs_int32
108 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4,
109               aparm5)
110      int (*aproc) ();
111      int aflags;
112      register struct ubik_trans *atrans;
113      long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5;
114 {
115     register struct ubik_server *ts;
116     register afs_int32 code;
117     afs_int32 rcode, okcalls;
118
119     rcode = 0;
120     okcalls = 0;
121     for (ts = ubik_servers; ts; ts = ts->next) {
122         /* for each server */
123         if (!ts->up || !ts->currentDB) {
124             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
125             continue;           /* not up-to-date, don't bother */
126         }
127         code =
128             (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2,
129                       aparm3, aparm4, aparm5);
130         if ((aproc == DISK_WriteV) && (code <= -450) && (code > -500)) {
131             /* An RPC interface mismatch (as defined in comerr/error_msg.c).
132              * Un-bulk the entries and do individual DISK_Write calls
133              * instead of DISK_WriteV.
134              */
135             iovec_wrt *iovec_infoP = (iovec_wrt *) aparm0;
136             iovec_buf *iovec_dataP = (iovec_buf *) aparm1;
137             struct ubik_iovec *iovec =
138                 (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
139             char *iobuf = (char *)iovec_dataP->iovec_buf_val;
140             bulkdata tcbs;
141             afs_int32 i, offset;
142
143             for (i = 0, offset = 0; i < iovec_infoP->iovec_wrt_len; i++) {
144                 /* Sanity check for going off end of buffer */
145                 if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
146                     code = UINTERNAL;
147                     break;
148                 }
149                 tcbs.bulkdata_len = iovec[i].length;
150                 tcbs.bulkdata_val = &iobuf[offset];
151                 code =
152                     DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
153                                iovec[i].position, &tcbs);
154                 if (code)
155                     break;
156
157                 offset += iovec[i].length;
158             }
159         }
160         if (code) {             /* failure */
161             rcode = code;
162             ts->up = 0;         /* mark as down now; beacons will no longer be sent */
163             ts->currentDB = 0;
164             ts->beaconSinceDown = 0;
165             urecovery_LostServer();     /* tell recovery to try to resend dbase later */
166         } else {                /* success */
167             if (!ts->isClone)
168                 okcalls++;      /* count up how many worked */
169             if (aflags & CStampVersion) {
170                 ts->version = atrans->dbase->version;
171             }
172         }
173     }
174     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
175     if (okcalls + 1 >= ubik_quorum)
176         return 0;
177     else
178         return rcode;
179 }
180
181 /*! 
182  * \brief This routine initializes the ubik system for a set of servers.
183  * \return 0 for success, or an error code on failure.
184  * \param serverList set of servers specified; nServers gives the number of entries in this array.
185  * \param pathName provides an initial prefix used for naming storage files used by this system.  
186  * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.  
187  *
188  * \todo This routine should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
189  *
190  * \warning The host named by myHost should not also be listed in serverList.
191  *
192  * \see ubik_ServerInit(), ubik_ServerInitByInfo()
193  */
194 int
195 ubik_ServerInitCommon(afs_int32 myHost, short myPort,
196                       struct afsconf_cell *info, char clones[],
197                       afs_int32 serverList[], char *pathName,
198                       struct ubik_dbase **dbase)
199 {
200     register struct ubik_dbase *tdb;
201     register afs_int32 code;
202 #ifdef AFS_PTHREAD_ENV
203     pthread_t rxServerThread;        /* pthread variables */
204     pthread_t ubeacon_InteractThread;
205     pthread_t urecovery_InteractThread;
206     pthread_attr_t rxServer_tattr;
207     pthread_attr_t ubeacon_Interact_tattr;
208     pthread_attr_t urecovery_Interact_tattr;
209 #else
210     PROCESS junk;
211 #endif
212
213     afs_int32 secIndex;
214     struct rx_securityClass *secClass;
215
216     struct rx_service *tservice;
217     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
218     extern int rx_stackSize;
219
220     initialize_U_error_table();
221
222     tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
223     tdb->pathName = (char *)malloc(strlen(pathName) + 1);
224     strcpy(tdb->pathName, pathName);
225     tdb->activeTrans = (struct ubik_trans *)0;
226     memset(&tdb->version, 0, sizeof(struct ubik_version));
227     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
228     Lock_Init(&tdb->versionLock);
229     tdb->flags = 0;
230     tdb->read = uphys_read;
231     tdb->write = uphys_write;
232     tdb->truncate = uphys_truncate;
233     tdb->open = uphys_invalidate;       /* this function isn't used any more */
234     tdb->sync = uphys_sync;
235     tdb->stat = uphys_stat;
236     tdb->getlabel = uphys_getlabel;
237     tdb->setlabel = uphys_setlabel;
238     tdb->getnfiles = uphys_getnfiles;
239     tdb->readers = 0;
240     tdb->tidCounter = tdb->writeTidCounter = 0;
241     *dbase = tdb;
242     ubik_dbase = tdb;           /* for now, only one db per server; can fix later when we have names for the other dbases */
243
244     /* initialize RX */
245
246     /* the following call is idempotent so when/if it got called earlier,
247      * by whatever called us, it doesn't really matter -- klm */
248     code = rx_Init(myPort);
249     if (code < 0)
250         return code;
251
252     ubik_callPortal = myPort;
253     /* try to get an additional security object */
254     ubik_sc[0] = rxnull_NewServerSecurityObject();
255     ubik_sc[1] = 0;
256     ubik_sc[2] = 0;
257     if (ubik_SRXSecurityProc) {
258         code =
259             (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
260                                      &secIndex);
261         if (code == 0) {
262             ubik_sc[secIndex] = secClass;
263         }
264     }
265     /* for backwards compat this should keep working as it does now 
266        and not host bind */
267 #if 0
268     /* This really needs to be up above, where I have put it.  It works
269      * here when we're non-pthreaded, but the code above, when using
270      * pthreads may (and almost certainly does) end up calling on a
271      * pthread resource which gets initialized by rx_Init.  The end
272      * result is that an assert fails and the program dies. -- klm
273      */
274     code = rx_Init(myPort);
275     if (code < 0)
276         return code;
277 #endif
278
279     tservice =
280         rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
281                       VOTE_ExecuteRequest);
282     if (tservice == (struct rx_service *)0) {
283         ubik_dprint("Could not create VOTE rx service!\n");
284         return -1;
285     }
286     rx_SetMinProcs(tservice, 2);
287     rx_SetMaxProcs(tservice, 3);
288
289     tservice =
290         rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
291                       DISK_ExecuteRequest);
292     if (tservice == (struct rx_service *)0) {
293         ubik_dprint("Could not create DISK rx service!\n");
294         return -1;
295     }
296     rx_SetMinProcs(tservice, 2);
297     rx_SetMaxProcs(tservice, 3);
298
299     /* start an rx_ServerProc to handle incoming RPC's in particular the 
300      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
301      * the "steplock" problem in ubik initialization. Defect 11037.
302      */
303 #ifdef AFS_PTHREAD_ENV
304 /* do assert stuff */
305     assert(pthread_attr_init(&rxServer_tattr) == 0);
306     assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
307 /*    assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
308
309     assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
310 #else
311     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
312               NULL, "rx_ServerProc", &junk);
313 #endif
314
315     /* do basic initialization */
316     code = uvote_Init();
317     if (code)
318         return code;
319     code = urecovery_Initialize(tdb);
320     if (code)
321         return code;
322     if (info)
323         code = ubeacon_InitServerListByInfo(myHost, info, clones);
324     else
325         code = ubeacon_InitServerList(myHost, serverList);
326     if (code)
327         return code;
328
329     /* now start up async processes */
330 #ifdef AFS_PTHREAD_ENV
331 /* do assert stuff */
332     assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
333     assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
334 /*    assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
335     /*  need another attr set here for priority???  - klm */
336
337     assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
338            (void *)ubeacon_Interact, NULL) == 0);
339 #else
340     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
341                              LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
342                              &junk);
343     if (code)
344         return code;
345 #endif
346
347 #ifdef AFS_PTHREAD_ENV
348 /* do assert stuff */
349     assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
350     assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
351 /*    assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
352     /*  need another attr set here for priority???  - klm */
353
354     assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
355            (void *)urecovery_Interact, NULL) == 0);
356
357     return 0;  /* is this correct?  - klm */
358 #else  
359     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
360                              LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
361                              &junk);
362     return code;
363 #endif
364
365 }
366
367 /*!
368  * \see ubik_ServerInitCommon()
369  */
370 int
371 ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
372                       struct afsconf_cell *info, char clones[],
373                       char *pathName, struct ubik_dbase **dbase)
374 {
375     afs_int32 code;
376
377     code =
378         ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
379                               dbase);
380     return code;
381 }
382
383 /*!
384  * \see ubik_ServerInitCommon()
385  */
386 int
387 ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
388                 char *pathName, struct ubik_dbase **dbase)
389 {
390     afs_int32 code;
391
392     code =
393         ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
394                               serverList, pathName, dbase);
395     return code;
396 }
397
398 /*!
399  * \brief This routine begins a read or write transaction on the transaction
400  * identified by transPtr, in the dbase named by dbase.
401  *
402  * An open mode of ubik_READTRANS identifies this as a read transaction, 
403  * while a mode of ubik_WRITETRANS identifies this as a write transaction.
404  * transPtr is set to the returned transaction control block. 
405  * The readAny flag is set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
406  * ubik_BeginTransReadAny() below.
407  *
408  * \note We can only begin transaction when we have an up-to-date database.
409  */
410 static int
411 BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
412            struct ubik_trans **transPtr, int readAny)
413 {
414     struct ubik_trans *jt;
415     register struct ubik_trans *tt;
416     register afs_int32 code;
417 #if defined(UBIK_PAUSE)
418     int count;
419 #endif /* UBIK_PAUSE */
420
421     if ((transMode != UBIK_READTRANS) && readAny)
422         return UBADTYPE;
423     DBHOLD(dbase);
424 #if defined(UBIK_PAUSE)
425     /* if we're polling the slave sites, wait until the returns
426      *  are all in.  Otherwise, the urecovery_CheckTid call may
427      *  glitch us. 
428      */
429     if (transMode == UBIK_WRITETRANS)
430         for (count = 75; dbase->flags & DBVOTING; --count) {
431             DBRELE(dbase);
432 #ifdef GRAND_PAUSE_DEBUGGING
433             if (count == 75)
434                 fprintf(stderr,
435                         "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
436                         time(0), ntohs(ubik_callPortal));
437             else
438 #endif
439             if (count <= 0) {
440 #if 1
441                 fprintf(stderr,
442                         "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
443                         time(0), ntohs(ubik_callPortal));
444 #endif
445                 return UNOQUORUM;       /* a white lie */
446             }
447 #ifdef AFS_PTHREAD_ENV
448             sleep(2);
449 #else
450             IOMGR_Sleep(2);
451 #endif
452             DBHOLD(dbase);
453         }
454 #endif /* UBIK_PAUSE */
455     if (urecovery_AllBetter(dbase, readAny) == 0) {
456         DBRELE(dbase);
457         return UNOQUORUM;
458     }
459     /* otherwise we have a quorum, use it */
460
461     /* make sure that at most one write transaction occurs at any one time.  This
462      * has nothing to do with transaction locking; that's enforced by the lock package.  However,
463      * we can't even handle two non-conflicting writes, since our log and recovery modules
464      * don't know how to restore one without possibly picking up some data from the other. */
465     if (transMode == UBIK_WRITETRANS) {
466         /* if we're writing already, wait */
467         while (dbase->flags & DBWRITING) {
468             DBRELE(dbase);
469 #ifdef AFS_PTHREAD_ENV
470             assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
471             assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
472             assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
473 #else
474             LWP_WaitProcess(&dbase->flags);
475 #endif
476             DBHOLD(dbase);
477         }
478         if (!ubeacon_AmSyncSite()) {
479             DBRELE(dbase);
480             return UNOTSYNC;
481         }
482     }
483
484     /* create the transaction */
485     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
486     tt = jt;                    /* move to a register */
487     if (code || tt == (struct ubik_trans *)NULL) {
488         DBRELE(dbase);
489         return code;
490     }
491     if (readAny)
492         tt->flags |= TRREADANY;
493     /* label trans and dbase with new tid */
494     tt->tid.epoch = ubik_epochTime;
495     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
496     tt->tid.counter = (dbase->tidCounter += 2);
497
498     if (transMode == UBIK_WRITETRANS) {
499         /* for a write trans, we have to keep track of the write tid counter too */
500 #if defined(UBIK_PAUSE)
501         dbase->writeTidCounter = tt->tid.counter;
502 #else
503         dbase->writeTidCounter += 2;
504 #endif /* UBIK_PAUSE */
505
506         /* next try to start transaction on appropriate number of machines */
507         code = ContactQuorum(DISK_Begin, tt, 0);
508         if (code) {
509             /* we must abort the operation */
510             udisk_abort(tt);
511             ContactQuorum(DISK_Abort, tt, 0);   /* force aborts to the others */
512             udisk_end(tt);
513             DBRELE(dbase);
514             return code;
515         }
516     }
517
518     *transPtr = tt;
519     DBRELE(dbase);
520     return 0;
521 }
522
523 /*!
524  * \see BeginTrans()
525  */
526 int
527 ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
528                 struct ubik_trans **transPtr)
529 {
530     return BeginTrans(dbase, transMode, transPtr, 0);
531 }
532
533 /*!
534  * \see BeginTrans()
535  */
536 int
537 ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
538                        struct ubik_trans **transPtr)
539 {
540     return BeginTrans(dbase, transMode, transPtr, 1);
541 }
542
543 /*!
544  * \brief This routine ends a read or write transaction by aborting it.
545  */
546 int
547 ubik_AbortTrans(register struct ubik_trans *transPtr)
548 {
549     register afs_int32 code;
550     afs_int32 code2;
551     register struct ubik_dbase *dbase;
552
553     dbase = transPtr->dbase;
554     DBHOLD(dbase);
555     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
556     /* see if we're still up-to-date */
557     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
558         udisk_abort(transPtr);
559         udisk_end(transPtr);
560         DBRELE(dbase);
561         return UNOQUORUM;
562     }
563
564     if (transPtr->type == UBIK_READTRANS) {
565         code = udisk_abort(transPtr);
566         udisk_end(transPtr);
567         DBRELE(dbase);
568         return code;
569     }
570
571     /* below here, we know we're doing a write transaction */
572     if (!ubeacon_AmSyncSite()) {
573         udisk_abort(transPtr);
574         udisk_end(transPtr);
575         DBRELE(dbase);
576         return UNOTSYNC;
577     }
578
579     /* now it is safe to try remote abort */
580     code = ContactQuorum(DISK_Abort, transPtr, 0);
581     code2 = udisk_abort(transPtr);
582     udisk_end(transPtr);
583     DBRELE(dbase);
584     return (code ? code : code2);
585 }
586
587 /*!
588  * \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
589  * \return an error code.
590  */
591 int
592 ubik_EndTrans(register struct ubik_trans *transPtr)
593 {
594     register afs_int32 code;
595     struct timeval tv;
596     afs_int32 realStart;
597     register struct ubik_server *ts;
598     afs_int32 now;
599     register struct ubik_dbase *dbase;
600
601     if (transPtr->type == UBIK_WRITETRANS) {
602         code = ubik_Flush(transPtr);
603         if (code) {
604             ubik_AbortTrans(transPtr);
605             return (code);
606         }
607     }
608
609     dbase = transPtr->dbase;
610     DBHOLD(dbase);
611     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
612
613     /* give up if no longer current */
614     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
615         udisk_abort(transPtr);
616         udisk_end(transPtr);
617         DBRELE(dbase);
618         return UNOQUORUM;
619     }
620
621     if (transPtr->type == UBIK_READTRANS) {     /* reads are easy */
622         code = udisk_commit(transPtr);
623         if (code == 0)
624             goto success;       /* update cachedVersion correctly */
625         udisk_end(transPtr);
626         DBRELE(dbase);
627         return code;
628     }
629
630     if (!ubeacon_AmSyncSite()) {        /* no longer sync site */
631         udisk_abort(transPtr);
632         udisk_end(transPtr);
633         DBRELE(dbase);
634         return UNOTSYNC;
635     }
636
637     /* now it is safe to do commit */
638     code = udisk_commit(transPtr);
639     if (code == 0)
640         code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
641     if (code) {
642         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
643          * Note that we don't know if this transaction will eventually commit at this point.
644          * If it made it to a site that will be present in the next quorum, we win, otherwise
645          * we lose.  If we contact a majority of sites, then we won't be here: contacting
646          * a majority guarantees commit, since it guarantees that one dude will be a
647          * member of the next quorum. */
648         ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
649         udisk_end(transPtr);
650         DBRELE(dbase);
651         return code;
652     }
653     /* before we can start sending unlock messages, we must wait until all servers
654      * that are possibly still functioning on the other side of a network partition
655      * have timed out.  Check the server structures, compute how long to wait, then
656      * start the unlocks */
657     realStart = FT_ApproxTime();
658     while (1) {
659         /* wait for all servers to time out */
660         code = 0;
661         now = FT_ApproxTime();
662         /* check if we're still sync site, the guy should either come up
663          * to us, or timeout.  Put safety check in anyway */
664         if (now - realStart > 10 * BIGTIME) {
665             ubik_stats.escapes++;
666             ubik_print("ubik escaping from commit wait\n");
667             break;
668         }
669         for (ts = ubik_servers; ts; ts = ts->next) {
670             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
671                 /* this guy could have some damaged data, wait for him */
672                 code = 1;
673                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
674                 tv.tv_usec = 0;
675 #ifdef AFS_PTHREAD_ENV
676                 select(0, 0, 0, 0, &tv);
677 #else
678                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
679 #endif
680                 break;
681             }
682         }
683         if (code == 0)
684             break;              /* no down ones still pseudo-active */
685     }
686
687     /* finally, unlock all the dudes.  We can return success independent of the number of servers
688      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
689      * The transaction is committed anyway, since we succeeded in contacting a quorum
690      * at the start (when invoking the DiskCommit function).
691      */
692     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
693
694   success:
695     udisk_end(transPtr);
696     /* update version on successful EndTrans */
697     memcpy(&dbase->cachedVersion, &dbase->version,
698            sizeof(struct ubik_version));
699
700     DBRELE(dbase);
701     return 0;
702 }
703
704 /*!
705  * \brief This routine reads length bytes into buffer from the current position in the database.
706  * 
707  * The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  A short read returns zero for an error code.
708  *
709  * \note *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.
710  */
711 int
712 ubik_Read(register struct ubik_trans *transPtr, char *buffer,
713           afs_int32 length)
714 {
715     register afs_int32 code;
716
717     /* reads are easy to do: handle locally */
718     DBHOLD(transPtr->dbase);
719     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
720         DBRELE(transPtr->dbase);
721         return UNOQUORUM;
722     }
723
724     code =
725         udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
726                    length);
727     if (code == 0) {
728         transPtr->seekPos += length;
729     }
730     DBRELE(transPtr->dbase);
731     return code;
732 }
733
734 /*!
735  * \brief This routine will flush the io data in the iovec structures. 
736  *
737  * It first flushes to the local disk and then uses ContactQuorum to write it
738  * to the other servers.
739  */
740 int
741 ubik_Flush(struct ubik_trans *transPtr)
742 {
743     afs_int32 code, error = 0;
744
745     if (transPtr->type != UBIK_WRITETRANS)
746         return UBADTYPE;
747     if (!transPtr->iovec_info.iovec_wrt_len
748         || !transPtr->iovec_info.iovec_wrt_val)
749         return 0;
750
751     DBHOLD(transPtr->dbase);
752     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
753         ERROR_EXIT(UNOQUORUM);
754     if (!ubeacon_AmSyncSite())  /* only sync site can write */
755         ERROR_EXIT(UNOTSYNC);
756
757     /* Update the rest of the servers in the quorum */
758     code =
759         ContactQuorum(DISK_WriteV, transPtr, 0, &transPtr->iovec_info,
760                       &transPtr->iovec_data);
761     if (code) {
762         udisk_abort(transPtr);
763         ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
764         transPtr->iovec_info.iovec_wrt_len = 0;
765         transPtr->iovec_data.iovec_buf_len = 0;
766         ERROR_EXIT(code);
767     }
768
769     /* Wrote the buffers out, so start at scratch again */
770     transPtr->iovec_info.iovec_wrt_len = 0;
771     transPtr->iovec_data.iovec_buf_len = 0;
772
773   error_exit:
774     DBRELE(transPtr->dbase);
775     return error;
776 }
777
778 int
779 ubik_Write(register struct ubik_trans *transPtr, char *buffer,
780            afs_int32 length)
781 {
782     struct ubik_iovec *iovec;
783     afs_int32 code, error = 0;
784     afs_int32 pos, len, size;
785
786     if (transPtr->type != UBIK_WRITETRANS)
787         return UBADTYPE;
788     if (!length)
789         return 0;
790
791     if (length > IOVEC_MAXBUF) {
792         for (pos = 0, len = length; len > 0; len -= size, pos += size) {
793             size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
794             code = ubik_Write(transPtr, &buffer[pos], size);
795             if (code)
796                 return (code);
797         }
798         return 0;
799     }
800
801     if (!transPtr->iovec_info.iovec_wrt_val) {
802         transPtr->iovec_info.iovec_wrt_len = 0;
803         transPtr->iovec_info.iovec_wrt_val =
804             (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
805                                         sizeof(struct ubik_iovec));
806         transPtr->iovec_data.iovec_buf_len = 0;
807         transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
808         if (!transPtr->iovec_info.iovec_wrt_val
809             || !transPtr->iovec_data.iovec_buf_val) {
810             if (transPtr->iovec_info.iovec_wrt_val)
811                 free(transPtr->iovec_info.iovec_wrt_val);
812             transPtr->iovec_info.iovec_wrt_val = 0;
813             if (transPtr->iovec_data.iovec_buf_val)
814                 free(transPtr->iovec_data.iovec_buf_val);
815             transPtr->iovec_data.iovec_buf_val = 0;
816             return UNOMEM;
817         }
818     }
819
820     /* If this write won't fit in the structure, then flush it out and start anew */
821     if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
822         || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
823         code = ubik_Flush(transPtr);
824         if (code)
825             return (code);
826     }
827
828     DBHOLD(transPtr->dbase);
829     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
830         ERROR_EXIT(UNOQUORUM);
831     if (!ubeacon_AmSyncSite())  /* only sync site can write */
832         ERROR_EXIT(UNOTSYNC);
833
834     /* Write to the local disk */
835     code =
836         udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
837                     length);
838     if (code) {
839         udisk_abort(transPtr);
840         transPtr->iovec_info.iovec_wrt_len = 0;
841         transPtr->iovec_data.iovec_buf_len = 0;
842         DBRELE(transPtr->dbase);
843         return (code);
844     }
845
846     /* Collect writes for the other ubik servers (to be done in bulk) */
847     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
848     iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
849     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
850     iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
851
852     memcpy(&transPtr->iovec_data.
853            iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
854
855     transPtr->iovec_info.iovec_wrt_len++;
856     transPtr->iovec_data.iovec_buf_len += length;
857     transPtr->seekPos += length;
858
859   error_exit:
860     DBRELE(transPtr->dbase);
861     return error;
862 }
863
864 /*!
865  * \brief This sets the file pointer associated with the current transaction
866  * to the appropriate file and byte position.
867  *
868  * Unlike Unix files, a transaction is labelled by both a file number \p fileid
869  * and a byte position relative to the specified file \p position.
870  */
871 int
872 ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
873           afs_int32 position)
874 {
875     register afs_int32 code;
876
877     DBHOLD(transPtr->dbase);
878     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
879         code = UNOQUORUM;
880     } else {
881         transPtr->seekFile = fileid;
882         transPtr->seekPos = position;
883         code = 0;
884     }
885     DBRELE(transPtr->dbase);
886     return code;
887 }
888
889 /*!
890  * \brief This call returns the file pointer associated with the specified
891  * transaction in \p fileid and \p position.
892  */
893 int
894 ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
895           afs_int32 * position)
896 {
897     DBHOLD(transPtr->dbase);
898     *fileid = transPtr->seekFile;
899     *position = transPtr->seekPos;
900     DBRELE(transPtr->dbase);
901     return 0;
902 }
903
904 /*!
905  * \brief This sets the file size for the currently-selected file to \p length
906  * bytes, if length is less than the file's current size.
907  */
908 int
909 ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
910 {
911     afs_int32 code, error = 0;
912
913     /* Will also catch if not UBIK_WRITETRANS */
914     code = ubik_Flush(transPtr);
915     if (code)
916         return (code);
917
918     DBHOLD(transPtr->dbase);
919     /* first, check that quorum is still good, and that dbase is up-to-date */
920     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
921         ERROR_EXIT(UNOQUORUM);
922     if (!ubeacon_AmSyncSite())
923         ERROR_EXIT(UNOTSYNC);
924
925     /* now do the operation locally, and propagate it out */
926     code = udisk_truncate(transPtr, transPtr->seekFile, length);
927     if (!code) {
928         code =
929             ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile,
930                           length);
931     }
932     if (code) {
933         /* we must abort the operation */
934         udisk_abort(transPtr);
935         ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
936         ERROR_EXIT(code);
937     }
938
939   error_exit:
940     DBRELE(transPtr->dbase);
941     return error;
942 }
943
944 /*!
945  * \brief set a lock; all locks are released on transaction end (commit/abort)
946  */
947 int
948 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
949              int atype)
950 {
951     afs_int32 code = 0, error = 0;
952
953     if (atype == LOCKWRITE) {
954         if (atrans->type == UBIK_READTRANS)
955             return UBADTYPE;
956         code = ubik_Flush(atrans);
957         if (code)
958             return (code);
959     }
960
961     DBHOLD(atrans->dbase);
962     if (atype == LOCKREAD) {
963         code = ulock_getLock(atrans, atype, 1);
964         if (code)
965             ERROR_EXIT(code);
966     } else {
967         /* first, check that quorum is still good, and that dbase is up-to-date */
968         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
969             ERROR_EXIT(UNOQUORUM);
970         if (!ubeacon_AmSyncSite())
971             ERROR_EXIT(UNOTSYNC);
972
973         /* now do the operation locally, and propagate it out */
974         code = ulock_getLock(atrans, atype, 1);
975         if (code == 0) {
976             code = ContactQuorum(DISK_Lock, atrans, 0, 0, 1 /*unused */ ,
977                                  1 /*unused */ , LOCKWRITE);
978         }
979         if (code) {
980             /* we must abort the operation */
981             udisk_abort(atrans);
982             ContactQuorum(DISK_Abort, atrans, 0);       /* force aborts to the others */
983             ERROR_EXIT(code);
984         }
985     }
986
987   error_exit:
988     DBRELE(atrans->dbase);
989     return error;
990 }
991
992 /*!
993  * \brief utility to wait for a version # to change
994  */
995 int
996 ubik_WaitVersion(register struct ubik_dbase *adatabase,
997                  register struct ubik_version *aversion)
998 {
999     while (1) {
1000         /* wait until version # changes, and then return */
1001         if (vcmp(*aversion, adatabase->version) != 0)
1002             return 0;
1003 #ifdef AFS_PTHREAD_ENV
1004         assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
1005         assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
1006         assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
1007 #else
1008         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
1009 #endif
1010     }
1011 }
1012
1013 /*!
1014  * \brief utility to get the version of the dbase a transaction is dealing with
1015  */
1016 int
1017 ubik_GetVersion(register struct ubik_trans *atrans,
1018                 register struct ubik_version *avers)
1019 {
1020     *avers = atrans->dbase->version;
1021     return 0;
1022 }
1023
1024 /*!
1025  * \brief Facility to simplify database caching.  
1026  * \return zero if last trans was done on the local server and was successful.
1027  * \return -1 means bad (NULL) argument.
1028  * 
1029  * If return value is non-zero and the caller is a server caching part of the 
1030  * Ubik database, it should invalidate that cache.
1031  */
1032 int
1033 ubik_CacheUpdate(register struct ubik_trans *atrans)
1034 {
1035     if (!(atrans && atrans->dbase))
1036         return -1;
1037     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
1038 }
1039
1040 /*!
1041  * "Who said anything about panicking?" snapped Arthur. 
1042  * "This is still just the culture shock. You wait till I've settled down
1043  * into the situation and found my bearings. \em Then I'll start panicking!"
1044  * --Authur Dent
1045  *
1046  * \returns There is no return from panic.
1047  */
1048 int
1049 panic(char *a, char *b, char *c, char *d)
1050 {
1051     ubik_print("Ubik PANIC: ");
1052     ubik_print(a, b, c, d);
1053     abort();
1054     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
1055     exit(1);                    /* never know, though  */
1056 }
1057
1058 /*!
1059  * This function takes an IP addresses as its parameter. It returns the
1060  * the primary IP address that is on the host passed in, or 0 if not found.
1061  */
1062 afs_uint32
1063 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
1064 {
1065     struct ubik_server *ts;
1066     int j;
1067
1068     for (ts = ubik_servers; ts; ts = ts->next)
1069         for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
1070             if (ts->addr[j] == addr)
1071                 return ts->addr[0];     /* net byte order */
1072     return 0;                   /* if not in server database, return error */
1073 }