4187ac0d7e0a359749c92d03f80c2a8c831b10cf
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13
14 #include <sys/types.h>
15 #include <string.h>
16 #include <stdarg.h>
17 #include <time.h>
18
19 #ifdef AFS_NT40_ENV
20 #include <winsock2.h>
21 #else
22 #include <sys/file.h>
23 #include <netinet/in.h>
24 #include <sys/param.h>
25 #endif
26
27 #include <lock.h>
28 #include <rx/xdr.h>
29 #include <rx/rx.h>
30 #include <afs/cellconfig.h>
31
32 #define UBIK_INTERNALS
33 #include "ubik.h"
34 #include "ubik_int.h"
35
36 #include <lwp.h>   /* temporary hack by klm */
37
38 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
39
40 /*!
41  * \file
42  * This system is organized in a hierarchical set of related modules.  Modules
43  * at one level can only call modules at the same level or below.
44  * 
45  * At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
46  * operating system primitives.
47  *
48  * At the next level (1) we have
49  *
50  * \li VOTER--The module responsible for casting votes when asked.  It is also
51  * responsible for determining whether this server should try to become
52  * a synchronization site.
53  * \li BEACONER--The module responsible for sending keep-alives out when a
54  * server is actually the sync site, or trying to become a sync site.
55  * \li DISK--The module responsible for representing atomic transactions
56  * on the local disk.  It maintains a new-value only log.
57  * \li LOCK--The module responsible for locking byte ranges in the database file.
58  *      
59  * At the next level (2) we have
60  *  
61  * \li RECOVERY--The module responsible for ensuring that all members of a quorum
62  * have the same up-to-date database after a new synchronization site is
63  * elected.  This module runs only on the synchronization site.
64  *      
65  * At the next level (3) we have
66  *
67  * \li REMOTE--The module responsible for interpreting requests from the sync
68  * site and applying them to the database, after obtaining the appropriate
69  * locks.
70  *
71  * At the next level (4) we have
72  *
73  * \li UBIK--The module users call to perform operations on the database.
74  */
75
76
77 /* some globals */
78 afs_int32 ubik_quorum = 0;
79 struct ubik_dbase *ubik_dbase = 0;
80 struct ubik_stats ubik_stats;
81 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
82 afs_int32 ubik_epochTime = 0;
83 afs_int32 urecovery_state = 0;
84 int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
85 void *ubik_SRXSecurityRock;
86 struct ubik_server *ubik_servers;
87 short ubik_callPortal;
88
89 static int BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
90                       struct ubik_trans **transPtr, int readAny);
91
92 struct rx_securityClass *ubik_sc[3];
93
94 #define CStampVersion       1   /* meaning set ts->version */
95
96 /*! 
97  * \brief Perform an operation at a quorum, handling error conditions.
98  * \return 0 if all worked and a quorum was contacted successfully
99  * \return otherwise mark failing server as down and return #UERROR
100  *
101  * \note If any server misses an update, we must wait #BIGTIME seconds before
102  * allowing the transaction to commit, to ensure that the missing and 
103  * possibly still functioning server times out and stops handing out old 
104  * data.  This is done in the commit code, where we wait for a server marked 
105  * down to have stayed down for #BIGTIME seconds before we allow a transaction 
106  * to commit.  A server that fails but comes back up won't give out old data 
107  * because it is sent the sync count along with the beacon message that
108  * marks it as \b really up (\p beaconSinceDown).
109  */
110 afs_int32
111 ContactQuorum_NoArguments(afs_int32 (*proc)(struct rx_connection *, ubik_tid *),
112                           register struct ubik_trans *atrans, int aflags)
113 {
114     register struct ubik_server *ts;
115     register afs_int32 code;
116     afs_int32 rcode, okcalls;
117
118     rcode = 0;
119     okcalls = 0;
120     for (ts = ubik_servers; ts; ts = ts->next) {
121         /* for each server */
122         if (!ts->up || !ts->currentDB) {
123             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
124             continue;           /* not up-to-date, don't bother */
125         }
126         code = (*proc)(ts->disk_rxcid, &atrans->tid);
127         if (code) {             /* failure */
128             rcode = code;
129             ts->up = 0;         /* mark as down now; beacons will no longer be sent */
130             ts->currentDB = 0;
131             ts->beaconSinceDown = 0;
132             urecovery_LostServer();     /* tell recovery to try to resend dbase later */
133         } else {                /* success */
134             if (!ts->isClone)
135                 okcalls++;      /* count up how many worked */
136             if (aflags & CStampVersion) {
137                 ts->version = atrans->dbase->version;
138             }
139         }
140     }
141     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
142     if (okcalls + 1 >= ubik_quorum)
143         return 0;
144     else
145         return rcode;
146 }
147
148 afs_int32
149 ContactQuorum_DISK_Lock(register struct ubik_trans *atrans, int aflags,afs_int32 file,
150                         afs_int32 position, afs_int32 length, afs_int32 type)
151 {
152     register struct ubik_server *ts;
153     register afs_int32 code;
154     afs_int32 rcode, okcalls;
155
156     rcode = 0;
157     okcalls = 0;
158     for (ts = ubik_servers; ts; ts = ts->next) {
159         /* for each server */
160         if (!ts->up || !ts->currentDB) {
161             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
162             continue;           /* not up-to-date, don't bother */
163         }
164         code = DISK_Lock(ts->disk_rxcid, &atrans->tid, file, position, length,
165                            type);
166         if (code) {             /* failure */
167             rcode = code;
168             ts->up = 0;         /* mark as down now; beacons will no longer be sent */
169             ts->currentDB = 0;
170             ts->beaconSinceDown = 0;
171             urecovery_LostServer();     /* tell recovery to try to resend dbase later */
172         } else {                /* success */
173             if (!ts->isClone)
174                 okcalls++;      /* count up how many worked */
175             if (aflags & CStampVersion) {
176                 ts->version = atrans->dbase->version;
177             }
178         }
179     }
180     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
181     if (okcalls + 1 >= ubik_quorum)
182         return 0;
183     else
184         return rcode;
185 }
186
187 afs_int32
188 ContactQuorum_DISK_Write(register struct ubik_trans *atrans, int aflags,
189                          afs_int32 file, afs_int32 position, bulkdata *data)
190 {
191     register struct ubik_server *ts;
192     register afs_int32 code;
193     afs_int32 rcode, okcalls;
194
195     rcode = 0;
196     okcalls = 0;
197     for (ts = ubik_servers; ts; ts = ts->next) {
198         /* for each server */
199         if (!ts->up || !ts->currentDB) {
200             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
201             continue;           /* not up-to-date, don't bother */
202         }
203         code = DISK_Write(ts->disk_rxcid, &atrans->tid, file, position, data);
204         if (code) {             /* failure */
205             rcode = code;
206             ts->up = 0;         /* mark as down now; beacons will no longer be sent */
207             ts->currentDB = 0;
208             ts->beaconSinceDown = 0;
209             urecovery_LostServer();     /* tell recovery to try to resend dbase later */
210         } else {                /* success */
211             if (!ts->isClone)
212                 okcalls++;      /* count up how many worked */
213             if (aflags & CStampVersion) {
214                 ts->version = atrans->dbase->version;
215             }
216         }
217     }
218     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
219     if (okcalls + 1 >= ubik_quorum)
220         return 0;
221     else
222         return rcode;
223 }
224
225 afs_int32
226 ContactQuorum_DISK_Truncate(register struct ubik_trans *atrans, int aflags,
227                             afs_int32 file, afs_int32 length)
228 {
229     register struct ubik_server *ts;
230     register afs_int32 code;
231     afs_int32 rcode, okcalls;
232
233     rcode = 0;
234     okcalls = 0;
235     for (ts = ubik_servers; ts; ts = ts->next) {
236         /* for each server */
237         if (!ts->up || !ts->currentDB) {
238             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
239             continue;           /* not up-to-date, don't bother */
240         }
241         code = DISK_Truncate(ts->disk_rxcid, &atrans->tid, file, length);
242         if (code) {             /* failure */
243             rcode = code;
244             ts->up = 0;         /* mark as down now; beacons will no longer be sent */
245             ts->currentDB = 0;
246             ts->beaconSinceDown = 0;
247             urecovery_LostServer();     /* tell recovery to try to resend dbase later */
248         } else {                /* success */
249             if (!ts->isClone)
250                 okcalls++;      /* count up how many worked */
251             if (aflags & CStampVersion) {
252                 ts->version = atrans->dbase->version;
253             }
254         }
255     }
256     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
257     if (okcalls + 1 >= ubik_quorum)
258         return 0;
259     else
260         return rcode;
261 }
262
263 afs_int32
264 ContactQuorum_DISK_WriteV(register struct ubik_trans *atrans, int aflags,
265                           iovec_wrt * io_vector, iovec_buf *io_buffer)
266 {
267     register struct ubik_server *ts;
268     register afs_int32 code;
269     afs_int32 rcode, okcalls;
270
271     rcode = 0;
272     okcalls = 0;
273     for (ts = ubik_servers; ts; ts = ts->next) {
274         /* for each server */
275         if (!ts->up || !ts->currentDB) {
276             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
277             continue;           /* not up-to-date, don't bother */
278         }
279
280         code = DISK_WriteV(ts->disk_rxcid, &atrans->tid, io_vector, io_buffer);
281
282         if ((code <= -450) && (code > -500)) {
283             /* An RPC interface mismatch (as defined in comerr/error_msg.c).
284              * Un-bulk the entries and do individual DISK_Write calls
285              * instead of DISK_WriteV.
286              */
287             struct ubik_iovec *iovec =
288                 (struct ubik_iovec *)io_vector->iovec_wrt_val;
289             char *iobuf = (char *)io_buffer->iovec_buf_val;
290             bulkdata tcbs;
291             afs_int32 i, offset;
292
293             for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
294                 /* Sanity check for going off end of buffer */
295                 if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
296                     code = UINTERNAL;
297                     break;
298                 }
299                 tcbs.bulkdata_len = iovec[i].length;
300                 tcbs.bulkdata_val = &iobuf[offset];
301                 code =
302                     DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
303                                iovec[i].position, &tcbs);
304                 if (code)
305                     break;
306
307                 offset += iovec[i].length;
308             }
309         }
310
311         if (code) {             /* failure */
312             rcode = code;
313             ts->up = 0;         /* mark as down now; beacons will no longer be sent */
314             ts->currentDB = 0;
315             ts->beaconSinceDown = 0;
316             urecovery_LostServer();     /* tell recovery to try to resend dbase later */
317         } else {                /* success */
318             if (!ts->isClone)
319                 okcalls++;      /* count up how many worked */
320             if (aflags & CStampVersion) {
321                 ts->version = atrans->dbase->version;
322             }
323         }
324     }
325     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
326     if (okcalls + 1 >= ubik_quorum)
327         return 0;
328     else
329         return rcode;
330 }
331
332 afs_int32
333 ContactQuorum_DISK_SetVersion(register struct ubik_trans *atrans, int aflags, 
334                               ubik_version *OldVersion,
335                               ubik_version *NewVersion)
336 {
337     register struct ubik_server *ts;
338     register afs_int32 code;
339     afs_int32 rcode, okcalls;
340
341     rcode = 0;
342     okcalls = 0;
343     for (ts = ubik_servers; ts; ts = ts->next) {
344         /* for each server */
345         if (!ts->up || !ts->currentDB) {
346             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
347             continue;           /* not up-to-date, don't bother */
348         }
349         code = DISK_SetVersion(ts->disk_rxcid, &atrans->tid, OldVersion, 
350                                NewVersion);
351         if (code) {             /* failure */
352             rcode = code;
353             ts->up = 0;         /* mark as down now; beacons will no longer be sent */
354             ts->currentDB = 0;
355             ts->beaconSinceDown = 0;
356             urecovery_LostServer();     /* tell recovery to try to resend dbase later */
357         } else {                /* success */
358             if (!ts->isClone)
359                 okcalls++;      /* count up how many worked */
360             if (aflags & CStampVersion) {
361                 ts->version = atrans->dbase->version;
362             }
363         }
364     }
365     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
366     if (okcalls + 1 >= ubik_quorum)
367         return 0;
368     else
369         return rcode;
370 }
371
372 /*! 
373  * \brief This routine initializes the ubik system for a set of servers.
374  * \return 0 for success, or an error code on failure.
375  * \param serverList set of servers specified; nServers gives the number of entries in this array.
376  * \param pathName provides an initial prefix used for naming storage files used by this system.  
377  * \param dbase the returned structure representing this instance of an ubik; it is passed to various calls below.  
378  *
379  * \todo This routine should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
380  *
381  * \warning The host named by myHost should not also be listed in serverList.
382  *
383  * \see ubik_ServerInit(), ubik_ServerInitByInfo()
384  */
385 int
386 ubik_ServerInitCommon(afs_int32 myHost, short myPort,
387                       struct afsconf_cell *info, char clones[],
388                       afs_int32 serverList[], const char *pathName,
389                       struct ubik_dbase **dbase)
390 {
391     register struct ubik_dbase *tdb;
392     register afs_int32 code;
393 #ifdef AFS_PTHREAD_ENV
394     pthread_t rxServerThread;        /* pthread variables */
395     pthread_t ubeacon_InteractThread;
396     pthread_t urecovery_InteractThread;
397     pthread_attr_t rxServer_tattr;
398     pthread_attr_t ubeacon_Interact_tattr;
399     pthread_attr_t urecovery_Interact_tattr;
400 #else
401     PROCESS junk;
402     extern int rx_stackSize;
403 #endif
404
405     afs_int32 secIndex;
406     struct rx_securityClass *secClass;
407
408     struct rx_service *tservice;
409
410     initialize_U_error_table();
411
412     tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
413     tdb->pathName = (char *)malloc(strlen(pathName) + 1);
414     strcpy(tdb->pathName, pathName);
415     tdb->activeTrans = (struct ubik_trans *)0;
416     memset(&tdb->version, 0, sizeof(struct ubik_version));
417     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
418     Lock_Init(&tdb->versionLock);
419     tdb->flags = 0;
420     tdb->read = uphys_read;
421     tdb->write = uphys_write;
422     tdb->truncate = uphys_truncate;
423     tdb->open = uphys_invalidate;       /* this function isn't used any more */
424     tdb->sync = uphys_sync;
425     tdb->stat = uphys_stat;
426     tdb->getlabel = uphys_getlabel;
427     tdb->setlabel = uphys_setlabel;
428     tdb->getnfiles = uphys_getnfiles;
429     tdb->readers = 0;
430     tdb->tidCounter = tdb->writeTidCounter = 0;
431     *dbase = tdb;
432     ubik_dbase = tdb;           /* for now, only one db per server; can fix later when we have names for the other dbases */
433
434 #ifdef AFS_PTHREAD_ENV
435     assert(pthread_cond_init(&tdb->version_cond, NULL) == 0);
436     assert(pthread_cond_init(&tdb->flags_cond, NULL) == 0);
437     assert(pthread_mutex_init(&tdb->version_mutex, NULL) == 0);
438     assert(pthread_mutex_init(&tdb->flags_mutex, NULL) == 0);
439 #endif /* AFS_PTHREAD_ENV */
440
441     /* initialize RX */
442
443     /* the following call is idempotent so when/if it got called earlier,
444      * by whatever called us, it doesn't really matter -- klm */
445     code = rx_Init(myPort);
446     if (code < 0)
447         return code;
448
449     ubik_callPortal = myPort;
450     /* try to get an additional security object */
451     ubik_sc[0] = rxnull_NewServerSecurityObject();
452     ubik_sc[1] = 0;
453     ubik_sc[2] = 0;
454     if (ubik_SRXSecurityProc) {
455         code =
456             (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
457                                      &secIndex);
458         if (code == 0) {
459             ubik_sc[secIndex] = secClass;
460         }
461     }
462     /* for backwards compat this should keep working as it does now 
463        and not host bind */
464 #if 0
465     /* This really needs to be up above, where I have put it.  It works
466      * here when we're non-pthreaded, but the code above, when using
467      * pthreads may (and almost certainly does) end up calling on a
468      * pthread resource which gets initialized by rx_Init.  The end
469      * result is that an assert fails and the program dies. -- klm
470      */
471     code = rx_Init(myPort);
472     if (code < 0)
473         return code;
474 #endif
475
476     tservice =
477         rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
478                       VOTE_ExecuteRequest);
479     if (tservice == (struct rx_service *)0) {
480         ubik_dprint("Could not create VOTE rx service!\n");
481         return -1;
482     }
483     rx_SetMinProcs(tservice, 2);
484     rx_SetMaxProcs(tservice, 3);
485
486     tservice =
487         rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
488                       DISK_ExecuteRequest);
489     if (tservice == (struct rx_service *)0) {
490         ubik_dprint("Could not create DISK rx service!\n");
491         return -1;
492     }
493     rx_SetMinProcs(tservice, 2);
494     rx_SetMaxProcs(tservice, 3);
495
496     /* start an rx_ServerProc to handle incoming RPC's in particular the 
497      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
498      * the "steplock" problem in ubik initialization. Defect 11037.
499      */
500 #ifdef AFS_PTHREAD_ENV
501 /* do assert stuff */
502     assert(pthread_attr_init(&rxServer_tattr) == 0);
503     assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
504 /*    assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
505
506     assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
507 #else
508     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
509               NULL, "rx_ServerProc", &junk);
510 #endif
511
512     /* do basic initialization */
513     code = uvote_Init();
514     if (code)
515         return code;
516     code = urecovery_Initialize(tdb);
517     if (code)
518         return code;
519     if (info)
520         code = ubeacon_InitServerListByInfo(myHost, info, clones);
521     else
522         code = ubeacon_InitServerList(myHost, serverList);
523     if (code)
524         return code;
525
526     /* now start up async processes */
527 #ifdef AFS_PTHREAD_ENV
528 /* do assert stuff */
529     assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
530     assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
531 /*    assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
532     /*  need another attr set here for priority???  - klm */
533
534     assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
535            (void *)ubeacon_Interact, NULL) == 0);
536 #else
537     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
538                              LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
539                              &junk);
540     if (code)
541         return code;
542 #endif
543
544 #ifdef AFS_PTHREAD_ENV
545 /* do assert stuff */
546     assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
547     assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
548 /*    assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
549     /*  need another attr set here for priority???  - klm */
550
551     assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
552            (void *)urecovery_Interact, NULL) == 0);
553
554     return 0;  /* is this correct?  - klm */
555 #else  
556     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
557                              LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
558                              &junk);
559     return code;
560 #endif
561
562 }
563
564 /*!
565  * \see ubik_ServerInitCommon()
566  */
567 int
568 ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
569                       struct afsconf_cell *info, char clones[],
570                       const char *pathName, struct ubik_dbase **dbase)
571 {
572     afs_int32 code;
573
574     code =
575         ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
576                               dbase);
577     return code;
578 }
579
580 /*!
581  * \see ubik_ServerInitCommon()
582  */
583 int
584 ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
585                 const char *pathName, struct ubik_dbase **dbase)
586 {
587     afs_int32 code;
588
589     code =
590         ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
591                               serverList, pathName, dbase);
592     return code;
593 }
594
595 /*!
596  * \brief This routine begins a read or write transaction on the transaction
597  * identified by transPtr, in the dbase named by dbase.
598  *
599  * An open mode of ubik_READTRANS identifies this as a read transaction, 
600  * while a mode of ubik_WRITETRANS identifies this as a write transaction.
601  * transPtr is set to the returned transaction control block. 
602  * The readAny flag is set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
603  * ubik_BeginTransReadAny() below.
604  *
605  * \note We can only begin transaction when we have an up-to-date database.
606  */
607 static int
608 BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
609            struct ubik_trans **transPtr, int readAny)
610 {
611     struct ubik_trans *jt;
612     register struct ubik_trans *tt;
613     register afs_int32 code;
614 #if defined(UBIK_PAUSE)
615     int count;
616 #endif /* UBIK_PAUSE */
617
618     if ((transMode != UBIK_READTRANS) && readAny)
619         return UBADTYPE;
620     DBHOLD(dbase);
621 #if defined(UBIK_PAUSE)
622     /* if we're polling the slave sites, wait until the returns
623      *  are all in.  Otherwise, the urecovery_CheckTid call may
624      *  glitch us. 
625      */
626     if (transMode == UBIK_WRITETRANS)
627         for (count = 75; dbase->flags & DBVOTING; --count) {
628             DBRELE(dbase);
629 #ifdef GRAND_PAUSE_DEBUGGING
630             if (count == 75)
631                 fprintf(stderr,
632                         "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
633                         time(0), ntohs(ubik_callPortal));
634             else
635 #endif
636             if (count <= 0) {
637 #if 1
638                 fprintf(stderr,
639                         "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
640                         time(0), ntohs(ubik_callPortal));
641 #endif
642                 return UNOQUORUM;       /* a white lie */
643             }
644 #ifdef AFS_PTHREAD_ENV
645             sleep(2);
646 #else
647             IOMGR_Sleep(2);
648 #endif
649             DBHOLD(dbase);
650         }
651 #endif /* UBIK_PAUSE */
652     if (urecovery_AllBetter(dbase, readAny) == 0) {
653         DBRELE(dbase);
654         return UNOQUORUM;
655     }
656     /* otherwise we have a quorum, use it */
657
658     /* make sure that at most one write transaction occurs at any one time.  This
659      * has nothing to do with transaction locking; that's enforced by the lock package.  However,
660      * we can't even handle two non-conflicting writes, since our log and recovery modules
661      * don't know how to restore one without possibly picking up some data from the other. */
662     if (transMode == UBIK_WRITETRANS) {
663         /* if we're writing already, wait */
664         while (dbase->flags & DBWRITING) {
665             DBRELE(dbase);
666 #ifdef AFS_PTHREAD_ENV
667             assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
668             assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
669             assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
670 #else
671             LWP_WaitProcess(&dbase->flags);
672 #endif
673             DBHOLD(dbase);
674         }
675         if (!ubeacon_AmSyncSite()) {
676             DBRELE(dbase);
677             return UNOTSYNC;
678         }
679     }
680
681     /* create the transaction */
682     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
683     tt = jt;                    /* move to a register */
684     if (code || tt == (struct ubik_trans *)NULL) {
685         DBRELE(dbase);
686         return code;
687     }
688     if (readAny)
689         tt->flags |= TRREADANY;
690     /* label trans and dbase with new tid */
691     tt->tid.epoch = ubik_epochTime;
692     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
693     tt->tid.counter = (dbase->tidCounter += 2);
694
695     if (transMode == UBIK_WRITETRANS) {
696         /* for a write trans, we have to keep track of the write tid counter too */
697 #if defined(UBIK_PAUSE)
698         dbase->writeTidCounter = tt->tid.counter;
699 #else
700         dbase->writeTidCounter += 2;
701 #endif /* UBIK_PAUSE */
702
703         /* next try to start transaction on appropriate number of machines */
704         code = ContactQuorum_NoArguments(DISK_Begin, tt, 0);
705         if (code) {
706             /* we must abort the operation */
707             udisk_abort(tt);
708             ContactQuorum_NoArguments(DISK_Abort, tt, 0); /* force aborts to the others */
709             udisk_end(tt);
710             DBRELE(dbase);
711             return code;
712         }
713     }
714
715     *transPtr = tt;
716     DBRELE(dbase);
717     return 0;
718 }
719
720 /*!
721  * \see BeginTrans()
722  */
723 int
724 ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
725                 struct ubik_trans **transPtr)
726 {
727     return BeginTrans(dbase, transMode, transPtr, 0);
728 }
729
730 /*!
731  * \see BeginTrans()
732  */
733 int
734 ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
735                        struct ubik_trans **transPtr)
736 {
737     return BeginTrans(dbase, transMode, transPtr, 1);
738 }
739
740 /*!
741  * \brief This routine ends a read or write transaction by aborting it.
742  */
743 int
744 ubik_AbortTrans(register struct ubik_trans *transPtr)
745 {
746     register afs_int32 code;
747     afs_int32 code2;
748     register struct ubik_dbase *dbase;
749
750     dbase = transPtr->dbase;
751     DBHOLD(dbase);
752     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
753     /* see if we're still up-to-date */
754     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
755         udisk_abort(transPtr);
756         udisk_end(transPtr);
757         DBRELE(dbase);
758         return UNOQUORUM;
759     }
760
761     if (transPtr->type == UBIK_READTRANS) {
762         code = udisk_abort(transPtr);
763         udisk_end(transPtr);
764         DBRELE(dbase);
765         return code;
766     }
767
768     /* below here, we know we're doing a write transaction */
769     if (!ubeacon_AmSyncSite()) {
770         udisk_abort(transPtr);
771         udisk_end(transPtr);
772         DBRELE(dbase);
773         return UNOTSYNC;
774     }
775
776     /* now it is safe to try remote abort */
777     code = ContactQuorum_NoArguments(DISK_Abort, transPtr, 0);
778     code2 = udisk_abort(transPtr);
779     udisk_end(transPtr);
780     DBRELE(dbase);
781     return (code ? code : code2);
782 }
783
784 /*!
785  * \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
786  * \return an error code.
787  */
788 int
789 ubik_EndTrans(register struct ubik_trans *transPtr)
790 {
791     register afs_int32 code;
792     struct timeval tv;
793     afs_int32 realStart;
794     register struct ubik_server *ts;
795     afs_int32 now;
796     register struct ubik_dbase *dbase;
797
798     if (transPtr->type == UBIK_WRITETRANS) {
799         code = ubik_Flush(transPtr);
800         if (code) {
801             ubik_AbortTrans(transPtr);
802             return (code);
803         }
804     }
805
806     dbase = transPtr->dbase;
807     DBHOLD(dbase);
808     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
809
810     /* give up if no longer current */
811     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
812         udisk_abort(transPtr);
813         udisk_end(transPtr);
814         DBRELE(dbase);
815         return UNOQUORUM;
816     }
817
818     if (transPtr->type == UBIK_READTRANS) {     /* reads are easy */
819         code = udisk_commit(transPtr);
820         if (code == 0)
821             goto success;       /* update cachedVersion correctly */
822         udisk_end(transPtr);
823         DBRELE(dbase);
824         return code;
825     }
826
827     if (!ubeacon_AmSyncSite()) {        /* no longer sync site */
828         udisk_abort(transPtr);
829         udisk_end(transPtr);
830         DBRELE(dbase);
831         return UNOTSYNC;
832     }
833
834     /* now it is safe to do commit */
835     code = udisk_commit(transPtr);
836     if (code == 0)
837         code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
838     if (code) {
839         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
840          * Note that we don't know if this transaction will eventually commit at this point.
841          * If it made it to a site that will be present in the next quorum, we win, otherwise
842          * we lose.  If we contact a majority of sites, then we won't be here: contacting
843          * a majority guarantees commit, since it guarantees that one dude will be a
844          * member of the next quorum. */
845         ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
846         udisk_end(transPtr);
847         DBRELE(dbase);
848         return code;
849     }
850     /* before we can start sending unlock messages, we must wait until all servers
851      * that are possibly still functioning on the other side of a network partition
852      * have timed out.  Check the server structures, compute how long to wait, then
853      * start the unlocks */
854     realStart = FT_ApproxTime();
855     while (1) {
856         /* wait for all servers to time out */
857         code = 0;
858         now = FT_ApproxTime();
859         /* check if we're still sync site, the guy should either come up
860          * to us, or timeout.  Put safety check in anyway */
861         if (now - realStart > 10 * BIGTIME) {
862             ubik_stats.escapes++;
863             ubik_print("ubik escaping from commit wait\n");
864             break;
865         }
866         for (ts = ubik_servers; ts; ts = ts->next) {
867             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
868                 /* this guy could have some damaged data, wait for him */
869                 code = 1;
870                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
871                 tv.tv_usec = 0;
872 #ifdef AFS_PTHREAD_ENV
873                 select(0, 0, 0, 0, &tv);
874 #else
875                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
876 #endif
877                 break;
878             }
879         }
880         if (code == 0)
881             break;              /* no down ones still pseudo-active */
882     }
883
884     /* finally, unlock all the dudes.  We can return success independent of the number of servers
885      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
886      * The transaction is committed anyway, since we succeeded in contacting a quorum
887      * at the start (when invoking the DiskCommit function).
888      */
889     ContactQuorum_NoArguments(DISK_ReleaseLocks, transPtr, 0);
890
891   success:
892     udisk_end(transPtr);
893     /* update version on successful EndTrans */
894     memcpy(&dbase->cachedVersion, &dbase->version,
895            sizeof(struct ubik_version));
896
897     DBRELE(dbase);
898     return 0;
899 }
900
901 /*!
902  * \brief This routine reads length bytes into buffer from the current position in the database.
903  * 
904  * The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  A short read returns zero for an error code.
905  *
906  * \note *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.
907  */
908 int
909 ubik_Read(register struct ubik_trans *transPtr, void *buffer,
910           afs_int32 length)
911 {
912     register afs_int32 code;
913
914     /* reads are easy to do: handle locally */
915     DBHOLD(transPtr->dbase);
916     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
917         DBRELE(transPtr->dbase);
918         return UNOQUORUM;
919     }
920
921     code =
922         udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
923                    length);
924     if (code == 0) {
925         transPtr->seekPos += length;
926     }
927     DBRELE(transPtr->dbase);
928     return code;
929 }
930
931 /*!
932  * \brief This routine will flush the io data in the iovec structures. 
933  *
934  * It first flushes to the local disk and then uses ContactQuorum to write it
935  * to the other servers.
936  */
937 int
938 ubik_Flush(struct ubik_trans *transPtr)
939 {
940     afs_int32 code, error = 0;
941
942     if (transPtr->type != UBIK_WRITETRANS)
943         return UBADTYPE;
944     if (!transPtr->iovec_info.iovec_wrt_len
945         || !transPtr->iovec_info.iovec_wrt_val)
946         return 0;
947
948     DBHOLD(transPtr->dbase);
949     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
950         ERROR_EXIT(UNOQUORUM);
951     if (!ubeacon_AmSyncSite())  /* only sync site can write */
952         ERROR_EXIT(UNOTSYNC);
953
954     /* Update the rest of the servers in the quorum */
955     code =
956         ContactQuorum_DISK_WriteV(transPtr, 0, &transPtr->iovec_info,
957                                   &transPtr->iovec_data);
958     if (code) {
959         udisk_abort(transPtr);
960         ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
961         transPtr->iovec_info.iovec_wrt_len = 0;
962         transPtr->iovec_data.iovec_buf_len = 0;
963         ERROR_EXIT(code);
964     }
965
966     /* Wrote the buffers out, so start at scratch again */
967     transPtr->iovec_info.iovec_wrt_len = 0;
968     transPtr->iovec_data.iovec_buf_len = 0;
969
970   error_exit:
971     DBRELE(transPtr->dbase);
972     return error;
973 }
974
975 int
976 ubik_Write(register struct ubik_trans *transPtr, void *vbuffer,
977            afs_int32 length)
978 {
979     struct ubik_iovec *iovec;
980     afs_int32 code, error = 0;
981     afs_int32 pos, len, size;
982     char * buffer = (char *)vbuffer;
983
984     if (transPtr->type != UBIK_WRITETRANS)
985         return UBADTYPE;
986     if (!length)
987         return 0;
988
989     if (length > IOVEC_MAXBUF) {
990         for (pos = 0, len = length; len > 0; len -= size, pos += size) {
991             size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
992             code = ubik_Write(transPtr, buffer+pos, size);
993             if (code)
994                 return (code);
995         }
996         return 0;
997     }
998
999     if (!transPtr->iovec_info.iovec_wrt_val) {
1000         transPtr->iovec_info.iovec_wrt_len = 0;
1001         transPtr->iovec_info.iovec_wrt_val =
1002             (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
1003                                         sizeof(struct ubik_iovec));
1004         transPtr->iovec_data.iovec_buf_len = 0;
1005         transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
1006         if (!transPtr->iovec_info.iovec_wrt_val
1007             || !transPtr->iovec_data.iovec_buf_val) {
1008             if (transPtr->iovec_info.iovec_wrt_val)
1009                 free(transPtr->iovec_info.iovec_wrt_val);
1010             transPtr->iovec_info.iovec_wrt_val = 0;
1011             if (transPtr->iovec_data.iovec_buf_val)
1012                 free(transPtr->iovec_data.iovec_buf_val);
1013             transPtr->iovec_data.iovec_buf_val = 0;
1014             return UNOMEM;
1015         }
1016     }
1017
1018     /* If this write won't fit in the structure, then flush it out and start anew */
1019     if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
1020         || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
1021         code = ubik_Flush(transPtr);
1022         if (code)
1023             return (code);
1024     }
1025
1026     DBHOLD(transPtr->dbase);
1027     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
1028         ERROR_EXIT(UNOQUORUM);
1029     if (!ubeacon_AmSyncSite())  /* only sync site can write */
1030         ERROR_EXIT(UNOTSYNC);
1031
1032     /* Write to the local disk */
1033     code =
1034         udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
1035                     length);
1036     if (code) {
1037         udisk_abort(transPtr);
1038         transPtr->iovec_info.iovec_wrt_len = 0;
1039         transPtr->iovec_data.iovec_buf_len = 0;
1040         DBRELE(transPtr->dbase);
1041         return (code);
1042     }
1043
1044     /* Collect writes for the other ubik servers (to be done in bulk) */
1045     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
1046     iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
1047     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
1048     iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
1049
1050     memcpy(&transPtr->iovec_data.
1051            iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
1052
1053     transPtr->iovec_info.iovec_wrt_len++;
1054     transPtr->iovec_data.iovec_buf_len += length;
1055     transPtr->seekPos += length;
1056
1057   error_exit:
1058     DBRELE(transPtr->dbase);
1059     return error;
1060 }
1061
1062 /*!
1063  * \brief This sets the file pointer associated with the current transaction
1064  * to the appropriate file and byte position.
1065  *
1066  * Unlike Unix files, a transaction is labelled by both a file number \p fileid
1067  * and a byte position relative to the specified file \p position.
1068  */
1069 int
1070 ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
1071           afs_int32 position)
1072 {
1073     register afs_int32 code;
1074
1075     DBHOLD(transPtr->dbase);
1076     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
1077         code = UNOQUORUM;
1078     } else {
1079         transPtr->seekFile = fileid;
1080         transPtr->seekPos = position;
1081         code = 0;
1082     }
1083     DBRELE(transPtr->dbase);
1084     return code;
1085 }
1086
1087 /*!
1088  * \brief This call returns the file pointer associated with the specified
1089  * transaction in \p fileid and \p position.
1090  */
1091 int
1092 ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
1093           afs_int32 * position)
1094 {
1095     DBHOLD(transPtr->dbase);
1096     *fileid = transPtr->seekFile;
1097     *position = transPtr->seekPos;
1098     DBRELE(transPtr->dbase);
1099     return 0;
1100 }
1101
1102 /*!
1103  * \brief This sets the file size for the currently-selected file to \p length
1104  * bytes, if length is less than the file's current size.
1105  */
1106 int
1107 ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
1108 {
1109     afs_int32 code, error = 0;
1110
1111     /* Will also catch if not UBIK_WRITETRANS */
1112     code = ubik_Flush(transPtr);
1113     if (code)
1114         return (code);
1115
1116     DBHOLD(transPtr->dbase);
1117     /* first, check that quorum is still good, and that dbase is up-to-date */
1118     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
1119         ERROR_EXIT(UNOQUORUM);
1120     if (!ubeacon_AmSyncSite())
1121         ERROR_EXIT(UNOTSYNC);
1122
1123     /* now do the operation locally, and propagate it out */
1124     code = udisk_truncate(transPtr, transPtr->seekFile, length);
1125     if (!code) {
1126         code =
1127             ContactQuorum_DISK_Truncate(transPtr, 0, transPtr->seekFile,
1128                                         length);
1129     }
1130     if (code) {
1131         /* we must abort the operation */
1132         udisk_abort(transPtr);
1133         ContactQuorum_NoArguments(DISK_Abort, transPtr, 0); /* force aborts to the others */
1134         ERROR_EXIT(code);
1135     }
1136
1137   error_exit:
1138     DBRELE(transPtr->dbase);
1139     return error;
1140 }
1141
1142 /*!
1143  * \brief set a lock; all locks are released on transaction end (commit/abort)
1144  */
1145 int
1146 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
1147              int atype)
1148 {
1149     afs_int32 code = 0, error = 0;
1150
1151     if (atype == LOCKWRITE) {
1152         if (atrans->type == UBIK_READTRANS)
1153             return UBADTYPE;
1154         code = ubik_Flush(atrans);
1155         if (code)
1156             return (code);
1157     }
1158
1159     DBHOLD(atrans->dbase);
1160     if (atype == LOCKREAD) {
1161         code = ulock_getLock(atrans, atype, 1);
1162         if (code)
1163             ERROR_EXIT(code);
1164     } else {
1165         /* first, check that quorum is still good, and that dbase is up-to-date */
1166         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
1167             ERROR_EXIT(UNOQUORUM);
1168         if (!ubeacon_AmSyncSite())
1169             ERROR_EXIT(UNOTSYNC);
1170
1171         /* now do the operation locally, and propagate it out */
1172         code = ulock_getLock(atrans, atype, 1);
1173         if (code == 0) {
1174             code = ContactQuorum_DISK_Lock(atrans, 0, 0, 1 /*unused */ ,
1175                                            1 /*unused */ , LOCKWRITE);
1176         }
1177         if (code) {
1178             /* we must abort the operation */
1179             udisk_abort(atrans);
1180             ContactQuorum_NoArguments(DISK_Abort, atrans, 0); /* force aborts to the others */
1181             ERROR_EXIT(code);
1182         }
1183     }
1184
1185   error_exit:
1186     DBRELE(atrans->dbase);
1187     return error;
1188 }
1189
1190 /*!
1191  * \brief utility to wait for a version # to change
1192  */
1193 int
1194 ubik_WaitVersion(register struct ubik_dbase *adatabase,
1195                  register struct ubik_version *aversion)
1196 {
1197     while (1) {
1198         /* wait until version # changes, and then return */
1199         if (vcmp(*aversion, adatabase->version) != 0)
1200             return 0;
1201 #ifdef AFS_PTHREAD_ENV
1202         assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
1203         assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
1204         assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
1205 #else
1206         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
1207 #endif
1208     }
1209 }
1210
1211 /*!
1212  * \brief utility to get the version of the dbase a transaction is dealing with
1213  */
1214 int
1215 ubik_GetVersion(register struct ubik_trans *atrans,
1216                 register struct ubik_version *avers)
1217 {
1218     *avers = atrans->dbase->version;
1219     return 0;
1220 }
1221
1222 /*!
1223  * \brief Facility to simplify database caching.  
1224  * \return zero if last trans was done on the local server and was successful.
1225  * \return -1 means bad (NULL) argument.
1226  * 
1227  * If return value is non-zero and the caller is a server caching part of the 
1228  * Ubik database, it should invalidate that cache.
1229  */
1230 int
1231 ubik_CacheUpdate(register struct ubik_trans *atrans)
1232 {
1233     if (!(atrans && atrans->dbase))
1234         return -1;
1235     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
1236 }
1237
1238 /*!
1239  * "Who said anything about panicking?" snapped Arthur. 
1240  * "This is still just the culture shock. You wait till I've settled down
1241  * into the situation and found my bearings. \em Then I'll start panicking!"
1242  * --Authur Dent
1243  *
1244  * \returns There is no return from panic.
1245  */
1246 void
1247 panic(char *format, ...)
1248 {
1249     va_list ap;
1250
1251     va_start(ap, format);
1252     ubik_print("Ubik PANIC: ");
1253     ubik_vprint(format, ap);
1254     va_end(ap);
1255
1256     abort();
1257     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
1258     exit(1);                    /* never know, though  */
1259 }
1260
1261 /*!
1262  * This function takes an IP addresses as its parameter. It returns the
1263  * the primary IP address that is on the host passed in, or 0 if not found.
1264  */
1265 afs_uint32
1266 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
1267 {
1268     struct ubik_server *ts;
1269     int j;
1270
1271     for (ts = ubik_servers; ts; ts = ts->next)
1272         for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
1273             if (ts->addr[j] == addr)
1274                 return ts->addr[0];     /* net byte order */
1275     return 0;                   /* if not in server database, return error */
1276 }