2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <sys/types.h>
21 #include <netinet/in.h>
22 #include <sys/param.h>
29 #include <afs/cellconfig.h>
31 #define UBIK_INTERNALS
/*
 * Record an error code and jump to the calling function's cleanup label.
 * The calling function must declare a local variable named `error` and a
 * label named `error_exit`.  The body is wrapped in do { ... } while (0) so
 * the macro expands to exactly one statement and stays correct when used as
 * the unbraced body of an if that has an else.
 */
#define ERROR_EXIT(code) do { error = (code); goto error_exit; } while (0)
37 /* This system is organized in a hierarchical set of related modules. Modules
38 at one level can only call modules at the same level or below.
40 At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
41 operating system primitives.
43 At the next level (1) we have
45 VOTER--The module responsible for casting votes when asked. It is also
46 responsible for determining whether this server should try to become
47 a synchronization site.
49 BEACONER--The module responsible for sending keep-alives out when a
50 server is actually the sync site, or trying to become a sync site.
52 DISK--The module responsible for representing atomic transactions
53 on the local disk. It maintains a new-value only log.
55 LOCK--The module responsible for locking byte ranges in the database file.
57 At the next level (2) we have
59 RECOVERY--The module responsible for ensuring that all members of a quorum
60 have the same up-to-date database after a new synchronization site is
61 elected. This module runs only on the synchronization site.
63 At the next level (3) we have
65 REMOTE--The module responsible for interpreting requests from the sync
66 site and applying them to the database, after obtaining the appropriate
69 At the next level (4) we have
71 UBIK--The module users call to perform operations on the database.
/* Quorum threshold: ContactQuorum succeeds when okcalls + 1 >= ubik_quorum. */
76 afs_int32 ubik_quorum = 0;
/* The database instance published by ubik_ServerInitCommon (one per server for now). */
77 struct ubik_dbase *ubik_dbase = 0;
/* Counters; ubik_EndTrans increments the escapes member when it gives up waiting. */
78 struct ubik_stats ubik_stats;
79 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
/* Copied into the epoch field of each new transaction id by BeginTrans. */
80 afs_int32 ubik_epochTime = 0;
81 afs_int32 urecovery_state = 0;
/* Security hook and its argument: when the hook is set, ubik_ServerInitCommon calls it to obtain a security class. */
82 int (*ubik_SRXSecurityProc) ();
83 char *ubik_SRXSecurityRock;
/* Head of the server list; the list is walked through each entry's next member. */
84 struct ubik_server *ubik_servers;
/* Set from the myPort argument of ubik_ServerInitCommon. */
85 short ubik_callPortal;
/* Forward declaration; the definition appears later in this file. */
87 static int BeginTrans();
/* Security classes handed to the VOTE and DISK rx services; slot 0 holds the rxnull server object. */
89 struct rx_securityClass *ubik_sc[3];
91 /* perform an operation at a quorum, handling error conditions. return 0 if
92 all worked, otherwise mark failing server as down and return UERROR
94 Note that if any server misses an update, we must wait BIGTIME seconds before
95 allowing the transaction to commit, to ensure that the missing and possibly still
96 functioning server times out and stops handing out old data. This is done in the commit
97 code, where we wait for a server marked down to have stayed down for BIGTIME seconds
98 before we allow a transaction to commit. A server that fails but comes back up won't give
99 out old data because it is sent the sync count along with the beacon message that
100 marks it as *really* up (beaconSinceDown).
102 #define CStampVersion 1 /* meaning set ts->version */
/*
 * ContactQuorum: invoke the RPC stub aproc on every server in ubik_servers
 * that is marked up and holds a current database, passing that server's
 * disk connection (ts->disk_rxcid), the transaction id of atrans, and
 * aparm0..aparm5.
 *
 * aflags: when CStampVersion is set, each server that answers successfully
 * has its recorded version set to atrans->dbase->version.
 *
 * A server that is down or not current is skipped and has currentDB
 * cleared.  A server whose call fails has up and beaconSinceDown cleared
 * and urecovery_LostServer() is called.  Successful calls are counted in
 * okcalls and the final test is okcalls + 1 >= ubik_quorum; the existing
 * comment at that test explains that the local site does not have to be
 * contacted.
 *
 * NOTE(review): several lines of this function are not present in this
 * listing (the rest of the K&R parameter list, the initialisation of
 * rcode and okcalls, the assignments to code, and the return statements).
 * Confirm the details above against the complete source.
 */
104 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4,
108 register struct ubik_trans *atrans;
109 long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5;
111 register struct ubik_server *ts;
112 register afs_int32 code;
113 afs_int32 rcode, okcalls;
117 for (ts = ubik_servers; ts; ts = ts->next) {
118 /* for each server */
119 if (!ts->up || !ts->currentDB) {
120 ts->currentDB = 0; /* db is no longer current; we just missed an update */
121 continue; /* not up-to-date, don't bother */
124 (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2,
125 aparm3, aparm4, aparm5);
126 if ((aproc == DISK_WriteV) && (code <= -450) && (code > -500)) {
127 /* An RPC interface mismatch (as defined in comerr/error_msg.c).
128 * Un-bulk the entries and do individual DISK_Write calls
129 * instead of DISK_WriteV.
131 iovec_wrt *iovec_infoP = (iovec_wrt *) aparm0;
132 iovec_buf *iovec_dataP = (iovec_buf *) aparm1;
133 struct ubik_iovec *iovec =
134 (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
135 char *iobuf = (char *)iovec_dataP->iovec_buf_val;
139 for (i = 0, offset = 0; i < iovec_infoP->iovec_wrt_len; i++) {
140 /* Sanity check for going off end of buffer */
141 if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
/* describe this entry's slice of the bulk buffer for a single DISK_Write */
145 tcbs.bulkdata_len = iovec[i].length;
146 tcbs.bulkdata_val = &iobuf[offset];
148 DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
149 iovec[i].position, &tcbs);
/* advance past the bytes that belonged to this entry */
153 offset += iovec[i].length;
156 if (code) { /* failure */
158 ts->up = 0; /* mark as down now; beacons will no longer be sent */
160 ts->beaconSinceDown = 0;
161 urecovery_LostServer(); /* tell recovery to try to resend dbase later */
162 } else { /* success */
164 okcalls++; /* count up how many worked */
165 if (aflags & CStampVersion) {
166 ts->version = atrans->dbase->version;
170 /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
171 if (okcalls + 1 >= ubik_quorum)
177 /* This routine initializes the ubik system for a set of servers. It returns 0 for success, or an error code on failure. The set of servers is specified by serverList; nServers gives the number of entries in this array. Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below. The variable pathName provides an initial prefix used for naming storage files used by this system. It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
179 Note that the host named by myHost should not also be listed in serverList.

Reviewer notes on the code visible below:
- A struct ubik_dbase is allocated, given its own copy of pathName, wired to
the uphys_ routines for disk access, and published through the global
ubik_dbase pointer.
- myPort is saved in ubik_callPortal and handed to rx_Init.
- ubik_sc[0] is the rxnull server security object; when ubik_SRXSecurityProc
is set it is called to obtain a further security class, which is stored at
ubik_sc[secIndex].
- The VOTE and DISK rx services are created (each set to a minimum of 2 and a
maximum of 3 processes), an rx_ServerProc LWP is started, urecovery_Initialize
is run, the server list is set up through ubeacon_InitServerListByInfo or
ubeacon_InitServerList, and the beacon and recovery LWPs are started.
- NOTE(review): the results of the two malloc calls are used without a visible
NULL check. The lines that choose between the two ubeacon_ calls, store into
the dbase out-parameter, and return are missing from this listing; confirm
against the complete source.
183 ubik_ServerInitCommon(afs_int32 myHost, short myPort,
184 struct afsconf_cell *info, char clones[],
185 afs_int32 serverList[], char *pathName,
186 struct ubik_dbase **dbase)
188 register struct ubik_dbase *tdb;
189 register afs_int32 code;
192 struct rx_securityClass *secClass;
194 struct rx_service *tservice;
195 extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
196 extern void rx_ServerProc();
197 extern int rx_stackSize;
199 initialize_U_error_table();
201 tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
202 tdb->pathName = (char *)malloc(strlen(pathName) + 1);
203 strcpy(tdb->pathName, pathName);
204 tdb->activeTrans = (struct ubik_trans *)0;
205 memset(&tdb->version, 0, sizeof(struct ubik_version));
206 memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
207 Lock_Init(&tdb->versionLock);
209 tdb->read = uphys_read;
210 tdb->write = uphys_write;
211 tdb->truncate = uphys_truncate;
212 tdb->open = 0; /* this function isn't used any more */
213 tdb->sync = uphys_sync;
214 tdb->stat = uphys_stat;
215 tdb->getlabel = uphys_getlabel;
216 tdb->setlabel = uphys_setlabel;
217 tdb->getnfiles = uphys_getnfiles;
/* both transaction-id counters start from zero */
219 tdb->tidCounter = tdb->writeTidCounter = 0;
221 ubik_dbase = tdb; /* for now, only one db per server; can fix later when we have names for the other dbases */
224 ubik_callPortal = myPort;
225 /* try to get an additional security object */
226 ubik_sc[0] = rxnull_NewServerSecurityObject();
/* the security hook is only called when one has been installed */
229 if (ubik_SRXSecurityProc) {
231 (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
234 ubik_sc[secIndex] = secClass;
237 /* for backwards compat this should keep working as it does now
239 code = rx_Init(myPort);
243 rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
244 VOTE_ExecuteRequest);
245 if (tservice == (struct rx_service *)0) {
246 ubik_dprint("Could not create VOTE rx service!\n");
249 rx_SetMinProcs(tservice, 2);
250 rx_SetMaxProcs(tservice, 3);
253 rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
254 DISK_ExecuteRequest);
255 if (tservice == (struct rx_service *)0) {
256 ubik_dprint("Could not create DISK rx service!\n");
259 rx_SetMinProcs(tservice, 2);
260 rx_SetMaxProcs(tservice, 3);
262 /* start an rx_ServerProc to handle incoming RPC's in particular the
263 * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
264 * the "steplock" problem in ubik initialization. Defect 11037.
266 LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
267 (void *)0, "rx_ServerProc", &junk);
269 /* do basic initialization */
273 code = urecovery_Initialize(tdb);
277 code = ubeacon_InitServerListByInfo(myHost, info, clones);
279 code = ubeacon_InitServerList(myHost, serverList);
283 /* now start up async processes */
284 code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
285 LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
289 code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
290 LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
/*
 * Initialise ubik from a cell configuration: forwards myHost, myPort, info,
 * clones and pathName to ubik_ServerInitCommon, passing 0 for the explicit
 * serverList argument.
 * NOTE(review): the end of the call and the return statement are missing
 * from this listing.
 */
296 ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
297 struct afsconf_cell *info, char clones[],
298 char *pathName, struct ubik_dbase **dbase)
303 ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
/*
 * Initialise ubik from an explicit server address list: forwards to
 * ubik_ServerInitCommon with a null cell-info pointer and 0 for clones, so
 * only serverList describes the other servers.
 * NOTE(review): the return statement is missing from this listing.
 */
309 ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
310 char *pathName, struct ubik_dbase **dbase)
315 ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
316 serverList, pathName, dbase);
320 /* This routine begins a read or write transaction on the transaction
321 identified by transPtr, in the dbase named by dbase. An open mode of
322 ubik_READTRANS identifies this as a read transaction, while a mode of
323 ubik_WRITETRANS identifies this as a write transaction. transPtr
324 is set to the returned transaction control block. The readAny flag is
325 set to 0 or 1 by the wrapper functions ubik_BeginTrans() or
326 ubik_BeginTransReadAny() below.
328 We can only begin transaction when we have an up-to-date database.

Reviewer notes on the code visible below:
- readAny together with a transMode other than UBIK_READTRANS is tested
first (the action taken is on a line missing from this listing).
- With UBIK_PAUSE defined, a write transaction polls while DBVOTING is set in
dbase->flags, counting down from 75, and can return UNOQUORUM.
- urecovery_AllBetter(dbase, readAny) is consulted before going further.
- A write transaction waits on dbase->flags while DBWRITING is set and
checks ubeacon_AmSyncSite().
- udisk_begin creates the transaction; its tid takes ubik_epochTime as the
epoch and dbase->tidCounter advanced by two as the counter.
- For a write transaction the write tid counter is updated and DISK_Begin is
sent through ContactQuorum; DISK_Abort is sent to the others on the abort
path.
- NOTE(review): return statements, several branch bodies and the store into
transPtr are missing from this listing; confirm against the complete source.
332 BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
333 struct ubik_trans **transPtr, int readAny)
335 struct ubik_trans *jt;
336 register struct ubik_trans *tt;
337 register afs_int32 code;
338 #if defined(UBIK_PAUSE)
340 #endif /* UBIK_PAUSE */
342 if ((transMode != UBIK_READTRANS) && readAny)
345 #if defined(UBIK_PAUSE)
346 /* if we're polling the slave sites, wait until the returns
347 * are all in. Otherwise, the urecovery_CheckTid call may
350 if (transMode == UBIK_WRITETRANS)
351 for (count = 75; dbase->flags & DBVOTING; --count) {
353 #ifdef GRAND_PAUSE_DEBUGGING
356 "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
357 time(0), ntohs(ubik_callPortal));
363 "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
364 time(0), ntohs(ubik_callPortal));
366 return UNOQUORUM; /* a white lie */
371 #endif /* UBIK_PAUSE */
372 if (urecovery_AllBetter(dbase, readAny) == 0) {
376 /* otherwise we have a quorum, use it */
378 /* make sure that at most one write transaction occurs at any one time. This
379 * has nothing to do with transaction locking; that's enforced by the lock package. However,
380 * we can't even handle two non-conflicting writes, since our log and recovery modules
381 * don't know how to restore one without possibly picking up some data from the other. */
382 if (transMode == UBIK_WRITETRANS) {
383 /* if we're writing already, wait */
384 while (dbase->flags & DBWRITING) {
386 LWP_WaitProcess(&dbase->flags);
389 if (!ubeacon_AmSyncSite()) {
395 /* create the transaction */
396 code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
397 tt = jt; /* move to a register */
398 if (code || tt == (struct ubik_trans *)NULL) {
403 tt->flags |= TRREADANY;
404 /* label trans and dbase with new tid */
405 tt->tid.epoch = ubik_epochTime;
406 /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
407 tt->tid.counter = (dbase->tidCounter += 2);
409 if (transMode == UBIK_WRITETRANS) {
410 /* for a write trans, we have to keep track of the write tid counter too */
411 #if defined(UBIK_PAUSE)
412 dbase->writeTidCounter = tt->tid.counter;
414 dbase->writeTidCounter += 2;
415 #endif /* UBIK_PAUSE */
417 /* next try to start transaction on appropriate number of machines */
418 code = ContactQuorum(DISK_Begin, tt, 0);
420 /* we must abort the operation */
422 ContactQuorum(DISK_Abort, tt, 0); /* force aborts to the others */
/*
 * Begin a transaction of the given mode on dbase; the new transaction is
 * handed back through transPtr.  Thin wrapper that calls BeginTrans with
 * readAny set to 0 and returns its result.
 */
435 ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
436 struct ubik_trans **transPtr)
438 return BeginTrans(dbase, transMode, transPtr, 0);
/*
 * Same as ubik_BeginTrans but calls BeginTrans with readAny set to 1.
 * BeginTrans tests readAny against any transMode other than UBIK_READTRANS,
 * so this entry point is meant for read transactions.
 */
442 ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
443 struct ubik_trans **transPtr)
445 return BeginTrans(dbase, transMode, transPtr, 1);
448 /* this routine ends a read or write transaction by aborting it */
/*
 * Reviewer notes on the code visible below:
 *  - dbase->cachedVersion is zeroed first.
 *  - If urecovery_AllBetter() reports the database is no longer usable, the
 *    transaction is aborted locally with udisk_abort().
 *  - A read transaction is aborted locally only.
 *  - A write transaction on a site that is no longer the sync site is
 *    aborted locally only; otherwise DISK_Abort is sent through
 *    ContactQuorum and then the local udisk_abort() runs.  The result is the
 *    remote code when it is non-zero, else the local code (code2).
 *  - NOTE(review): the early-return statements are missing from this
 *    listing; confirm against the complete source.
 */
450 ubik_AbortTrans(register struct ubik_trans *transPtr)
452 register afs_int32 code;
454 register struct ubik_dbase *dbase;
456 dbase = transPtr->dbase;
458 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
459 /* see if we're still up-to-date */
460 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
461 udisk_abort(transPtr);
467 if (transPtr->type == UBIK_READTRANS) {
468 code = udisk_abort(transPtr);
474 /* below here, we know we're doing a write transaction */
475 if (!ubeacon_AmSyncSite()) {
476 udisk_abort(transPtr);
482 /* now it is safe to try remote abort */
483 code = ContactQuorum(DISK_Abort, transPtr, 0);
484 code2 = udisk_abort(transPtr);
487 return (code ? code : code2);
490 /* This routine ends a read or write transaction on the open transaction identified by transPtr. It returns an error code. */
/*
 * Reviewer notes on the code visible below:
 *  - A write transaction is flushed with ubik_Flush() first; the call to
 *    ubik_AbortTrans() that follows appears to be its failure branch.
 *  - dbase->cachedVersion is zeroed before the commit is attempted and is
 *    copied from dbase->version at the end of the success path.
 *  - If urecovery_AllBetter() reports the database is no longer current, or
 *    (for a write) this site is no longer the sync site, the transaction is
 *    aborted locally with udisk_abort().
 *  - A read transaction only needs udisk_commit() and then jumps to success.
 *  - A write transaction is committed locally, then DISK_Commit is sent
 *    with CStampVersion; DISK_ReleaseLocks is sent on the failure path.
 *  - Before the final DISK_ReleaseLocks the routine loops, sleeping through
 *    IOMGR_Select with tv_sec set to 1, while any server has beaconSinceDown
 *    clear and lastBeaconSent within BIGTIME of now.  It gives up after
 *    10 * BIGTIME and counts that in ubik_stats.escapes.
 *  - NOTE(review): branch conditions, loop headers and return statements
 *    are missing from this listing; confirm against the complete source.
 */
492 ubik_EndTrans(register struct ubik_trans *transPtr)
494 register afs_int32 code;
497 register struct ubik_server *ts;
499 register struct ubik_dbase *dbase;
501 if (transPtr->type == UBIK_WRITETRANS) {
502 code = ubik_Flush(transPtr);
504 ubik_AbortTrans(transPtr);
509 dbase = transPtr->dbase;
511 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
513 /* give up if no longer current */
514 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
515 udisk_abort(transPtr);
521 if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
522 code = udisk_commit(transPtr);
524 goto success; /* update cachedVersion correctly */
530 if (!ubeacon_AmSyncSite()) { /* no longer sync site */
531 udisk_abort(transPtr);
537 /* now it is safe to do commit */
538 code = udisk_commit(transPtr);
540 code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
542 /* failed to commit, so must return failure. Try to clear locks first, just for fun
543 * Note that we don't know if this transaction will eventually commit at this point.
544 * If it made it to a site that will be present in the next quorum, we win, otherwise
545 * we lose. If we contact a majority of sites, then we won't be here: contacting
546 * a majority guarantees commit, since it guarantees that one dude will be a
547 * member of the next quorum. */
548 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
553 /* before we can start sending unlock messages, we must wait until all servers
554 * that are possibly still functioning on the other side of a network partition
555 * have timed out. Check the server structures, compute how long to wait, then
556 * start the unlocks */
557 realStart = FT_ApproxTime();
559 /* wait for all servers to time out */
561 now = FT_ApproxTime();
562 /* check if we're still sync site, the guy should either come up
563 * to us, or timeout. Put safety check in anyway */
564 if (now - realStart > 10 * BIGTIME) {
565 ubik_stats.escapes++;
566 ubik_print("ubik escaping from commit wait\n");
569 for (ts = ubik_servers; ts; ts = ts->next) {
570 if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
571 /* this guy could have some damaged data, wait for him */
573 tv.tv_sec = 1; /* try again after a while (ha ha) */
575 IOMGR_Select(0, 0, 0, 0, &tv); /* poll, should we wait on something? */
580 break; /* no down ones still pseudo-active */
583 /* finally, unlock all the dudes. We can return success independent of the number of servers
584 * that really unlock the dbase; the others will do it if/when they elect a new sync site.
585 * The transaction is committed anyway, since we succeeded in contacting a quorum
586 * at the start (when invoking the DiskCommit function).
588 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
592 /* update version on successful EndTrans */
593 memcpy(&dbase->cachedVersion, &dbase->version,
594 sizeof(struct ubik_version));
600 /* This routine reads length bytes into buffer from the current position in the database. The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length. Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred. A short read returns zero for an error code. */
/*
 * Reviewer notes on the code visible below:
 *  - The read is served locally by udisk_read() from transPtr->seekFile at
 *    transPtr->seekPos, with the database held between DBHOLD and DBRELE.
 *  - If urecovery_AllBetter() reports the database is not usable, the hold
 *    is released on that branch before the read is attempted.
 *  - NOTE(review): the visible code adds length itself to seekPos, which
 *    suggests length is passed by value here rather than as the INOUT
 *    pointer described above.  The rest of the signature is missing from
 *    this listing; confirm, and update the comment above if it is stale.
 */
603 ubik_Read(register struct ubik_trans *transPtr, char *buffer,
606 register afs_int32 code;
608 /* reads are easy to do: handle locally */
609 DBHOLD(transPtr->dbase);
610 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
611 DBRELE(transPtr->dbase);
616 udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
619 transPtr->seekPos += length;
621 DBRELE(transPtr->dbase);
625 /* This routine will flush the io data in the iovec structures. It first
626 * flushes to the local disk and then uses ContactQuorum to write it to
 * (NOTE(review): the end of the sentence above is missing from this listing.)
 *
 * Reviewer notes on the code visible below:
 * - The transaction type is compared with UBIK_WRITETRANS, and the pending
 * iovec list is tested for being empty, before any other work is done.
 * - The database is held with DBHOLD and released with DBRELE.
 * - The error is UNOQUORUM when urecovery_AllBetter() reports the database
 * is not usable, and UNOTSYNC when this site is not the sync site.
 * - The buffered writes are sent with ContactQuorum(DISK_WriteV, ...); on
 * the failure path the transaction is aborted locally (udisk_abort) and
 * DISK_Abort is sent to the others.
 * - On both the failure and the success path iovec_wrt_len and
 * iovec_buf_len are reset to 0.
630 ubik_Flush(struct ubik_trans *transPtr)
632 afs_int32 code, error = 0;
634 if (transPtr->type != UBIK_WRITETRANS)
636 if (!transPtr->iovec_info.iovec_wrt_len
637 || !transPtr->iovec_info.iovec_wrt_val)
640 DBHOLD(transPtr->dbase);
641 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
642 ERROR_EXIT(UNOQUORUM);
643 if (!ubeacon_AmSyncSite()) /* only sync site can write */
644 ERROR_EXIT(UNOTSYNC);
646 /* Update the rest of the servers in the quorum */
648 ContactQuorum(DISK_WriteV, transPtr, 0, &transPtr->iovec_info,
649 &transPtr->iovec_data);
651 udisk_abort(transPtr);
652 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
653 transPtr->iovec_info.iovec_wrt_len = 0;
654 transPtr->iovec_data.iovec_buf_len = 0;
658 /* Wrote the buffers out, so start at scratch again */
659 transPtr->iovec_info.iovec_wrt_len = 0;
660 transPtr->iovec_data.iovec_buf_len = 0;
663 DBRELE(transPtr->dbase);
/*
 * Write length bytes from buffer at the transaction's current seek position.
 *
 * Reviewer notes on the code visible below:
 *  - The transaction type is compared with UBIK_WRITETRANS first.
 *  - A request larger than IOVEC_MAXBUF is split into pieces of at most
 *    IOVEC_MAXBUF bytes, each written by a recursive call to ubik_Write.
 *  - The iovec entry array (IOVEC_MAXWRT entries) and the data buffer
 *    (IOVEC_MAXBUF bytes) are allocated on first use; if either allocation
 *    fails, whichever succeeded is freed and both pointers are set to 0.
 *  - If the entry array is full, or the new data would not fit in the
 *    buffer, the pending writes are flushed with ubik_Flush() first.
 *  - With the database held, the write is applied locally by udisk_write();
 *    on the failure path the transaction is aborted locally and the
 *    pending-write counters are reset.
 *  - On success the write is recorded in the next iovec entry (file,
 *    position, length), the data is copied into the buffer, and seekPos is
 *    advanced by length.  The remote servers receive it later, in bulk.
 *  - NOTE(review): the rest of the signature, the error checks after each
 *    call and the return statements are missing from this listing.
 */
668 ubik_Write(register struct ubik_trans *transPtr, char *buffer,
671 struct ubik_iovec *iovec;
672 afs_int32 code, error = 0;
673 afs_int32 pos, len, size;
675 if (transPtr->type != UBIK_WRITETRANS)
680 if (length > IOVEC_MAXBUF) {
681 for (pos = 0, len = length; len > 0; len -= size, pos += size) {
682 size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
683 code = ubik_Write(transPtr, &buffer[pos], size);
/* first write on this transaction: set up the bulk-write bookkeeping */
690 if (!transPtr->iovec_info.iovec_wrt_val) {
691 transPtr->iovec_info.iovec_wrt_len = 0;
692 transPtr->iovec_info.iovec_wrt_val =
693 (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
694 sizeof(struct ubik_iovec));
695 transPtr->iovec_data.iovec_buf_len = 0;
696 transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
697 if (!transPtr->iovec_info.iovec_wrt_val
698 || !transPtr->iovec_data.iovec_buf_val) {
699 if (transPtr->iovec_info.iovec_wrt_val)
700 free(transPtr->iovec_info.iovec_wrt_val);
701 transPtr->iovec_info.iovec_wrt_val = 0;
702 if (transPtr->iovec_data.iovec_buf_val)
703 free(transPtr->iovec_data.iovec_buf_val);
704 transPtr->iovec_data.iovec_buf_val = 0;
709 /* If this write won't fit in the structure, then flush it out and start anew */
710 if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
711 || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
712 code = ubik_Flush(transPtr);
717 DBHOLD(transPtr->dbase);
718 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
719 ERROR_EXIT(UNOQUORUM);
720 if (!ubeacon_AmSyncSite()) /* only sync site can write */
721 ERROR_EXIT(UNOTSYNC);
723 /* Write to the local disk */
725 udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
728 udisk_abort(transPtr);
729 transPtr->iovec_info.iovec_wrt_len = 0;
730 transPtr->iovec_data.iovec_buf_len = 0;
731 DBRELE(transPtr->dbase);
735 /* Collect writes for the other ubik servers (to be done in bulk) */
736 iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
737 iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
738 iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
739 iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
/* append the data after whatever is already queued in the bulk buffer */
741 memcpy(&transPtr->iovec_data.
742 iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
744 transPtr->iovec_info.iovec_wrt_len++;
745 transPtr->iovec_data.iovec_buf_len += length;
746 transPtr->seekPos += length;
749 DBRELE(transPtr->dbase);
753 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position. Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
/*
 * Reviewer notes on the code visible below:
 *  - fileid and position are stored in transPtr->seekFile and
 *    transPtr->seekPos while the database is held (DBHOLD / DBRELE).
 *  - urecovery_AllBetter() is consulted first.
 *  - NOTE(review): the branch structure around that check, the rest of the
 *    signature and the return statement are missing from this listing;
 *    confirm against the complete source.
 */
756 ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
759 register afs_int32 code;
761 DBHOLD(transPtr->dbase);
762 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
765 transPtr->seekFile = fileid;
766 transPtr->seekPos = position;
769 DBRELE(transPtr->dbase);
773 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
/*
 * Copies transPtr->seekFile into *fileid and transPtr->seekPos into
 * *position while the database is held (DBHOLD / DBRELE).  No currency or
 * sync-site check is made in the visible code.
 */
776 ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
777 afs_int32 * position)
779 DBHOLD(transPtr->dbase);
780 *fileid = transPtr->seekFile;
781 *position = transPtr->seekPos;
782 DBRELE(transPtr->dbase);
786 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
/*
 * Reviewer notes on the code visible below:
 *  - Pending buffered writes are flushed with ubik_Flush() first; per the
 *    comment below, that call also rejects non-write transactions.
 *  - With the database held, the error is UNOQUORUM when
 *    urecovery_AllBetter() reports the database is not usable, and UNOTSYNC
 *    when this site is not the sync site.
 *  - The truncate is applied locally with udisk_truncate() on
 *    transPtr->seekFile and then sent out with
 *    ContactQuorum(DISK_Truncate, ...).
 *  - On the abort path the transaction is aborted locally and DISK_Abort is
 *    sent to the others.
 *  - NOTE(review): the error checks after each call and the return
 *    statement are missing from this listing.
 */
789 ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
791 afs_int32 code, error = 0;
793 /* Will also catch if not UBIK_WRITETRANS */
794 code = ubik_Flush(transPtr);
798 DBHOLD(transPtr->dbase);
799 /* first, check that quorum is still good, and that dbase is up-to-date */
800 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
801 ERROR_EXIT(UNOQUORUM);
802 if (!ubeacon_AmSyncSite())
803 ERROR_EXIT(UNOTSYNC);
805 /* now do the operation locally, and propagate it out */
806 code = udisk_truncate(transPtr, transPtr->seekFile, length);
809 ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile,
813 /* we must abort the operation */
814 udisk_abort(transPtr);
815 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
820 DBRELE(transPtr->dbase);
824 /* set a lock; all locks are released on transaction end (commit/abort) */
/*
 * Reviewer notes on the code visible below:
 *  - For LOCKWRITE the transaction type is compared with UBIK_READTRANS and
 *    pending writes are flushed with ubik_Flush() before the database is
 *    held.
 *  - For LOCKREAD the lock is taken locally only, through
 *    ulock_getLock(atrans, atype, 1).
 *  - Otherwise the error is UNOQUORUM when urecovery_AllBetter() reports
 *    the database is not usable, and UNOTSYNC when this site is not the
 *    sync site.  The lock is then taken locally and DISK_Lock is sent
 *    through ContactQuorum; DISK_Abort is sent to the others on the abort
 *    path.
 *  - In the visible lines apos and alen are not passed on; the DISK_Lock
 *    call uses constants marked "unused" in their place.
 *  - NOTE(review): the rest of the signature, the error checks and the
 *    return statement are missing from this listing.
 */
826 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
829 afs_int32 code = 0, error = 0;
831 if (atype == LOCKWRITE) {
832 if (atrans->type == UBIK_READTRANS)
834 code = ubik_Flush(atrans);
839 DBHOLD(atrans->dbase);
840 if (atype == LOCKREAD) {
841 code = ulock_getLock(atrans, atype, 1);
845 /* first, check that quorum is still good, and that dbase is up-to-date */
846 if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
847 ERROR_EXIT(UNOQUORUM);
848 if (!ubeacon_AmSyncSite())
849 ERROR_EXIT(UNOTSYNC);
851 /* now do the operation locally, and propagate it out */
852 code = ulock_getLock(atrans, atype, 1);
854 code = ContactQuorum(DISK_Lock, atrans, 0, 0, 1 /*unused */ ,
855 1 /*unused */ , LOCKWRITE);
858 /* we must abort the operation */
860 ContactQuorum(DISK_Abort, atrans, 0); /* force aborts to the others */
866 DBRELE(atrans->dbase);
870 /* utility to wait for a version # to change */
/*
 * Compares *aversion with adatabase->version using vcmp(); while they are
 * the same it sleeps in LWP_WaitProcess() on the address of
 * adatabase->version.
 * NOTE(review): the enclosing loop and the return statement are missing
 * from this listing; confirm against the complete source.
 */
872 ubik_WaitVersion(register struct ubik_dbase *adatabase,
873 register struct ubik_version *aversion)
876 /* wait until version # changes, and then return */
877 if (vcmp(*aversion, adatabase->version) != 0)
879 LWP_WaitProcess(&adatabase->version); /* same vers, just wait */
883 /* utility to get the version of the dbase a transaction is dealing with */
/*
 * Copies atrans->dbase->version into *avers by structure assignment.  No
 * hold is taken on the database in the visible code.
 */
885 ubik_GetVersion(register struct ubik_trans *atrans,
886 register struct ubik_version *avers)
888 *avers = atrans->dbase->version;
892 /* Facility to simplify database caching. Returns zero if last trans was done
893 on the local server and was successful. If return value is non-zero and the
894 caller is a server caching part of the Ubik database, it should invalidate
895 that cache. A return value of -1 means bad (NULL) argument. */
/*
 * For a valid argument the result is the truth value of
 * vcmp(cachedVersion, version) != 0: 0 when the cached version matches the
 * current database version, 1 when it does not.
 */
898 ubik_CacheUpdate(register struct ubik_trans *atrans)
/* reject a null transaction or one without a database */
900 if (!(atrans && atrans->dbase))
902 return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
/*
 * Report a fatal error and terminate the process.  a is handed to
 * ubik_print() as its first argument, with b, c and d following it, after
 * the fixed "Ubik PANIC: " prefix; exit(1) is the last resort.
 * NOTE(review): the "BACK FROM ABORT" message implies a call that is not
 * expected to return sits just before it; that line is missing from this
 * listing.
 */
906 panic(char *a, char *b, char *c, char *d)
908 ubik_print("Ubik PANIC: ");
909 ubik_print(a, b, c, d);
911 ubik_print("BACK FROM ABORT\n"); /* shouldn't come back */
912 exit(1); /* never know, though */
916 ** This function takes an IP address as its parameter. It returns
917 ** the primary IP address that is on the host passed in.
/*
 * Searches every entry of ubik_servers, comparing addr with each of the
 * entry's UBIK_MAX_INTERFACE_ADDR address slots.  On a match the entry's
 * first address (addr[0]) is returned; 0 is returned when no server lists
 * addr.
 * NOTE(review): the return type and the declaration of j are missing from
 * this listing.
 */
920 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
922 struct ubik_server *ts;
925 for (ts = ubik_servers; ts; ts = ts->next)
926 for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
927 if (ts->addr[j] == addr)
928 return ts->addr[0]; /* net byte order */
929 return 0; /* if not in server database, return error */