2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <sys/types.h>
21 #include <netinet/in.h>
22 #include <sys/param.h>
35 #include <afs/cellconfig.h>
37 #define UBIK_INTERNALS
/*
 * ERROR_EXIT(code): store `code` in the enclosing function's local variable
 * `error` and jump to that function's `error_exit` label.
 *
 * The body is wrapped in do { ... } while (0) so that the macro plus its
 * trailing semicolon forms exactly one statement.  A brace-only body followed
 * by ';' expands to two statements, which breaks an `if (x) ERROR_EXIT(e);
 * else ...` construct.  Call sites must keep writing the trailing semicolon.
 */
#define ERROR_EXIT(code) do { error = (code); goto error_exit; } while (0)
43 /* This system is organized in a hierarchical set of related modules. Modules
44 at one level can only call modules at the same level or below.
46 At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
47 operating system primitives.
49 At the next level (1) we have
51 VOTER--The module responsible for casting votes when asked. It is also
52 responsible for determining whether this server should try to become
53 a synchronization site.
55 BEACONER--The module responsible for sending keep-alives out when a
56 server is actually the sync site, or trying to become a sync site.
58 DISK--The module responsible for representing atomic transactions
59 on the local disk. It maintains a new-value only log.
61 LOCK--The module responsible for locking byte ranges in the database file.
63 At the next level (2) we have
65 RECOVERY--The module responsible for ensuring that all members of a quorum
66 have the same up-to-date database after a new synchronization site is
67 elected. This module runs only on the synchronization site.
69 At the next level (3) we have
71 REMOTE--The module responsible for interpreting requests from the sync
72 site and applying them to the database, after obtaining the appropriate
75 At the next level (4) we have
77 UBIK--The module users call to perform operations on the database.
/* Module-wide state.  Roles are described only as far as code in this file shows them. */
/* threshold that ContactQuorum compares against (successful remote calls + 1) */
82 afs_int32 ubik_quorum = 0;
/* set by ubik_ServerInitCommon to the database structure it allocates */
83 struct ubik_dbase *ubik_dbase = 0;
/* statistics; ubik_EndTrans increments the escapes member */
84 struct ubik_stats ubik_stats;
/* NOTE(review): not referenced by the code visible in this file -- presumably this host's interface addresses; confirm */
85 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
/* copied into tid.epoch of every transaction created by BeginTrans */
86 afs_int32 ubik_epochTime = 0;
/* NOTE(review): not referenced by the code visible in this file */
87 afs_int32 urecovery_state = 0;
/* optional hook: when non-null, ubik_ServerInitCommon calls it, passing ubik_SRXSecurityRock */
88 int (*ubik_SRXSecurityProc) ();
89 char *ubik_SRXSecurityRock;
/* head of the server list (linked through ->next) walked by ContactQuorum, ubik_EndTrans and ubikGetPrimaryInterfaceAddr */
90 struct ubik_server *ubik_servers;
/* set from the myPort argument of ubik_ServerInitCommon */
91 short ubik_callPortal;
/* forward declaration (old-style, no parameter list); the definition appears later in this file */
93 static int BeginTrans();
/* security-class table passed (with a count of 3) to rx_NewService for the VOTE and DISK services */
95 struct rx_securityClass *ubik_sc[3];
97 /* perform an operation at a quorum, handling error conditions. return 0 if
98 all worked, otherwise mark failing server as down and return UERROR
100 Note that if any server misses an update, we must wait BIGTIME seconds before
101 allowing the transaction to commit, to ensure that the missing and possibly still
102 functioning server times out and stops handing out old data. This is done in the commit
103 code, where we wait for a server marked down to have stayed down for BIGTIME seconds
104 before we allow a transaction to commit. A server that fails but comes back up won't give
105 out old data because it is sent the sync count along with the beacon message that
106 marks it as *really* up (beaconSinceDown).
/* Bit for ContactQuorum's aflags argument. */
108 #define CStampVersion 1 /* meaning set ts->version */
/*
 * ContactQuorum: call aproc once for each usable server on the ubik_servers
 * list.
 *
 * aproc   - RPC stub; invoked as (*aproc)(ts->disk_rxcid, &atrans->tid,
 *           aparm0 .. aparm5).
 * atrans  - transaction whose tid is sent and whose dbase->version is used
 *           for stamping.
 * aflags  - if CStampVersion is set, a server that answers successfully gets
 *           ts->version set to atrans->dbase->version.
 * aparm0..aparm5 - passed through untouched; when aproc is DISK_WriteV,
 *           aparm0 and aparm1 are reinterpreted as iovec_wrt * and
 *           iovec_buf * for the per-entry fallback.
 *
 * A server that is not up or whose currentDB is clear is skipped (and its
 * currentDB is cleared).  A non-zero result marks the server down
 * (up = 0, beaconSinceDown = 0) and calls urecovery_LostServer().
 *
 * This is an old-style (K&R) definition, so callers may, and in this file do,
 * pass fewer than nine arguments.
 *
 * NOTE(review): several short lines are not visible in this listing: the
 * return type, part of the parameter declarations, the initialisation of
 * rcode/okcalls, the assignment targets of the two RPC calls, and the final
 * return statements.  Confirm those against the complete file.
 */
110 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4,
114 register struct ubik_trans *atrans;
115 long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5;
117 register struct ubik_server *ts;
118 register afs_int32 code;
119 afs_int32 rcode, okcalls;
123 for (ts = ubik_servers; ts; ts = ts->next) {
124 /* for each server */
125 if (!ts->up || !ts->currentDB) {
126 ts->currentDB = 0; /* db is no longer current; we just missed an update */
127 continue; /* not up-to-date, don't bother */
130 (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2,
131 aparm3, aparm4, aparm5);
/* only a DISK_WriteV call whose code satisfies -500 < code <= -450 takes the fallback path */
132 if ((aproc == DISK_WriteV) && (code <= -450) && (code > -500)) {
133 /* An RPC interface mismatch (as defined in comerr/error_msg.c).
134 * Un-bulk the entries and do individual DISK_Write calls
135 * instead of DISK_WriteV.
137 iovec_wrt *iovec_infoP = (iovec_wrt *) aparm0;
138 iovec_buf *iovec_dataP = (iovec_buf *) aparm1;
139 struct ubik_iovec *iovec =
140 (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
141 char *iobuf = (char *)iovec_dataP->iovec_buf_val;
145 for (i = 0, offset = 0; i < iovec_infoP->iovec_wrt_len; i++) {
146 /* Sanity check for going off end of buffer */
147 if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
151 tcbs.bulkdata_len = iovec[i].length;
152 tcbs.bulkdata_val = &iobuf[offset];
154 DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
155 iovec[i].position, &tcbs);
159 offset += iovec[i].length;
162 if (code) { /* failure */
164 ts->up = 0; /* mark as down now; beacons will no longer be sent */
166 ts->beaconSinceDown = 0;
167 urecovery_LostServer(); /* tell recovery to try to resend dbase later */
168 } else { /* success */
170 okcalls++; /* count up how many worked */
171 if (aflags & CStampVersion) {
172 ts->version = atrans->dbase->version;
176 /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
177 if (okcalls + 1 >= ubik_quorum)
183 /* This routine initializes the ubik system for a set of servers. It returns 0 for success, or an error code on failure. The set of servers is specified by serverList; nServers gives the number of entries in this array. Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below. The variable pathName provides an initial prefix used for naming storage files used by this system. It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
185 Note that the host named by myHost should not also be listed in serverList.
/*
 * ubik_ServerInitCommon: shared initialisation behind ubik_ServerInit and
 * ubik_ServerInitByInfo.
 *
 * Visible steps, in order:
 *   - initialize_U_error_table();
 *   - allocate a struct ubik_dbase and a private copy of pathName, zero its
 *     version and cachedVersion, initialise versionLock, install the uphys_*
 *     routines as its I/O operations and zero both tid counters;
 *   - publish it through the global ubik_dbase and record myPort in
 *     ubik_callPortal;
 *   - fill ubik_sc[0] with the rxnull security object and, when
 *     ubik_SRXSecurityProc is set, call it and store the class it yields at
 *     ubik_sc[secIndex];
 *   - rx_Init(myPort), then create the VOTE and DISK rx services (min 2,
 *     max 3 procs each);
 *   - start an rx_ServerProc LWP, call urecovery_Initialize(tdb), build the
 *     server list via ubeacon_InitServerListByInfo or ubeacon_InitServerList,
 *     and start the "beacon" and "recovery" LWPs.
 *
 * myHost/info/clones/serverList are handed to the ubeacon_InitServerList*
 * calls; pathName is copied into the new dbase.
 *
 * NOTE(review): lines not visible in this listing include the return type,
 * several local declarations (secIndex, junk), every error check after a
 * `code = ...` assignment, the condition choosing between the two
 * ubeacon_InitServerList* calls, the store through *dbase and the return.
 * Confirm those against the complete file.
 */
189 ubik_ServerInitCommon(afs_int32 myHost, short myPort,
190 struct afsconf_cell *info, char clones[],
191 afs_int32 serverList[], char *pathName,
192 struct ubik_dbase **dbase)
194 register struct ubik_dbase *tdb;
195 register afs_int32 code;
198 struct rx_securityClass *secClass;
200 struct rx_service *tservice;
201 extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
202 extern void rx_ServerProc();
203 extern int rx_stackSize;
205 initialize_U_error_table();
/* NOTE(review): no visible line checks either malloc result before it is dereferenced / passed to strcpy */
207 tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
208 tdb->pathName = (char *)malloc(strlen(pathName) + 1);
209 strcpy(tdb->pathName, pathName);
210 tdb->activeTrans = (struct ubik_trans *)0;
211 memset(&tdb->version, 0, sizeof(struct ubik_version));
212 memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
213 Lock_Init(&tdb->versionLock);
/* install the uphys_* routines as this database's I/O operations */
215 tdb->read = uphys_read;
216 tdb->write = uphys_write;
217 tdb->truncate = uphys_truncate;
218 tdb->open = 0; /* this function isn't used any more */
219 tdb->sync = uphys_sync;
220 tdb->stat = uphys_stat;
221 tdb->getlabel = uphys_getlabel;
222 tdb->setlabel = uphys_setlabel;
223 tdb->getnfiles = uphys_getnfiles;
225 tdb->tidCounter = tdb->writeTidCounter = 0;
227 ubik_dbase = tdb; /* for now, only one db per server; can fix later when we have names for the other dbases */
230 ubik_callPortal = myPort;
231 /* try to get an additional security object */
232 ubik_sc[0] = rxnull_NewServerSecurityObject();
235 if (ubik_SRXSecurityProc) {
237 (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
240 ubik_sc[secIndex] = secClass;
243 /* for backwards compat this should keep working as it does now
245 code = rx_Init(myPort);
249 rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
250 VOTE_ExecuteRequest);
251 if (tservice == (struct rx_service *)0) {
252 ubik_dprint("Could not create VOTE rx service!\n");
255 rx_SetMinProcs(tservice, 2);
256 rx_SetMaxProcs(tservice, 3);
259 rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
260 DISK_ExecuteRequest);
261 if (tservice == (struct rx_service *)0) {
262 ubik_dprint("Could not create DISK rx service!\n");
265 rx_SetMinProcs(tservice, 2);
266 rx_SetMaxProcs(tservice, 3);
268 /* start an rx_ServerProc to handle incoming RPC's in particular the
269 * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
270 * the "steplock" problem in ubik initialization. Defect 11037.
272 LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
273 (void *)0, "rx_ServerProc", &junk);
275 /* do basic initialization */
279 code = urecovery_Initialize(tdb);
283 code = ubeacon_InitServerListByInfo(myHost, info, clones);
285 code = ubeacon_InitServerList(myHost, serverList);
289 /* now start up async processes */
290 code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
291 LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
295 code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
296 LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
/*
 * ubik_ServerInitByInfo: initialise ubik from a cell-configuration record.
 * Forwards to ubik_ServerInitCommon with info and clones supplied and 0 in
 * place of the serverList argument.
 *
 * NOTE(review): the return type, the tail of the call (its last argument)
 * and the handling of its result are on lines not visible in this listing.
 */
302 ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
303 struct afsconf_cell *info, char clones[],
304 char *pathName, struct ubik_dbase **dbase)
309 ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
/*
 * ubik_ServerInit: initialise ubik from an explicit server address list.
 * Forwards to ubik_ServerInitCommon with a null cell-info pointer, 0 for
 * clones, and the caller's serverList, pathName and dbase.
 *
 * NOTE(review): the return type and the handling of the call's result are on
 * lines not visible in this listing.
 */
315 ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
316 char *pathName, struct ubik_dbase **dbase)
321 ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
322 serverList, pathName, dbase);
326 /* This routine begins a read or write transaction on the transaction
327 identified by transPtr, in the dbase named by dbase. An open mode of
328 ubik_READTRANS identifies this as a read transaction, while a mode of
329 ubik_WRITETRANS identifies this as a write transaction. transPtr
330 is set to the returned transaction control block. The readAny flag is
331 set to 0 or 1 by the wrapper functions ubik_BeginTrans() or
332 ubik_BeginTransReadAny() below.
334 We can only begin a transaction when we have an up-to-date database.
/*
 * BeginTrans: common body of ubik_BeginTrans and ubik_BeginTransReadAny.
 *
 * dbase     - database to open the transaction on.
 * transMode - UBIK_READTRANS or UBIK_WRITETRANS.
 * transPtr  - out parameter for the new transaction.
 * readAny   - passed to urecovery_AllBetter; combined with a non-read
 *             transMode it is tested first thing (the action taken is on a
 *             line not visible here).
 *
 * Visible flow:
 *   - with UBIK_PAUSE defined, a write transaction loops (counter starting
 *     at 75) while dbase->flags has DBVOTING set, and can return UNOQUORUM;
 *   - urecovery_AllBetter(dbase, readAny) must not return 0;
 *   - a write transaction waits on &dbase->flags while DBWRITING is set and
 *     then checks ubeacon_AmSyncSite();
 *   - udisk_begin() creates the transaction, which is labelled with
 *     ubik_epochTime and a counter obtained by adding 2 to dbase->tidCounter;
 *   - a write transaction updates dbase->writeTidCounter and is announced to
 *     the other servers with ContactQuorum(DISK_Begin, ...); a
 *     ContactQuorum(DISK_Abort, ...) follows on the failure path.
 *
 * NOTE(review): the return type, the bodies of most error branches, the
 * condition guarding `tt->flags |= TRREADANY`, the #else between the two
 * writeTidCounter updates, the store through *transPtr and the final return
 * are on lines not visible in this listing.
 */
338 BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
339 struct ubik_trans **transPtr, int readAny)
341 struct ubik_trans *jt;
342 register struct ubik_trans *tt;
343 register afs_int32 code;
344 #if defined(UBIK_PAUSE)
346 #endif /* UBIK_PAUSE */
348 if ((transMode != UBIK_READTRANS) && readAny)
351 #if defined(UBIK_PAUSE)
352 /* if we're polling the slave sites, wait until the returns
353 * are all in. Otherwise, the urecovery_CheckTid call may
356 if (transMode == UBIK_WRITETRANS)
357 for (count = 75; dbase->flags & DBVOTING; --count) {
359 #ifdef GRAND_PAUSE_DEBUGGING
362 "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
363 time(0), ntohs(ubik_callPortal));
369 "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
370 time(0), ntohs(ubik_callPortal));
372 return UNOQUORUM; /* a white lie */
377 #endif /* UBIK_PAUSE */
378 if (urecovery_AllBetter(dbase, readAny) == 0) {
382 /* otherwise we have a quorum, use it */
384 /* make sure that at most one write transaction occurs at any one time. This
385 * has nothing to do with transaction locking; that's enforced by the lock package. However,
386 * we can't even handle two non-conflicting writes, since our log and recovery modules
387 * don't know how to restore one without possibly picking up some data from the other. */
388 if (transMode == UBIK_WRITETRANS) {
389 /* if we're writing already, wait */
390 while (dbase->flags & DBWRITING) {
392 LWP_WaitProcess(&dbase->flags);
395 if (!ubeacon_AmSyncSite()) {
401 /* create the transaction */
402 code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
403 tt = jt; /* move to a register */
404 if (code || tt == (struct ubik_trans *)NULL) {
409 tt->flags |= TRREADANY;
410 /* label trans and dbase with new tid */
411 tt->tid.epoch = ubik_epochTime;
412 /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
413 tt->tid.counter = (dbase->tidCounter += 2);
415 if (transMode == UBIK_WRITETRANS) {
416 /* for a write trans, we have to keep track of the write tid counter too */
417 #if defined(UBIK_PAUSE)
418 dbase->writeTidCounter = tt->tid.counter;
420 dbase->writeTidCounter += 2;
421 #endif /* UBIK_PAUSE */
423 /* next try to start transaction on appropriate number of machines */
/* ContactQuorum is an old-style definition; the unused aparm arguments are simply omitted here */
424 code = ContactQuorum(DISK_Begin, tt, 0);
426 /* we must abort the operation */
428 ContactQuorum(DISK_Abort, tt, 0); /* force aborts to the others */
/*
 * ubik_BeginTrans: begin a transaction with the readAny flag off.
 * Returns whatever BeginTrans(dbase, transMode, transPtr, 0) returns.
 */
441 ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
442 struct ubik_trans **transPtr)
444 return BeginTrans(dbase, transMode, transPtr, 0);
/*
 * ubik_BeginTransReadAny: begin a transaction with the readAny flag on.
 * Returns whatever BeginTrans(dbase, transMode, transPtr, 1) returns.
 */
448 ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
449 struct ubik_trans **transPtr)
451 return BeginTrans(dbase, transMode, transPtr, 1);
454 /* this routine ends a read or write transaction by aborting it */
/*
 * ubik_AbortTrans: abort the transaction transPtr.
 *
 * Always zeroes dbase->cachedVersion first.  The local udisk_abort() is then
 * reached on one of four paths:
 *   - urecovery_AllBetter() reports the database is no longer usable;
 *   - the transaction is a read transaction;
 *   - this server is no longer the sync site (ubeacon_AmSyncSite() false);
 *   - otherwise (a write transaction on the sync site) the abort is first
 *     sent to the other servers with ContactQuorum(DISK_Abort, ...), and the
 *     function returns the remote code if non-zero, else the local one.
 *
 * NOTE(review): the return type, the declaration of code2, and the return
 * statements of the three early paths are on lines not visible in this
 * listing.
 */
456 ubik_AbortTrans(register struct ubik_trans *transPtr)
458 register afs_int32 code;
460 register struct ubik_dbase *dbase;
462 dbase = transPtr->dbase;
464 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
465 /* see if we're still up-to-date */
466 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
467 udisk_abort(transPtr);
473 if (transPtr->type == UBIK_READTRANS) {
474 code = udisk_abort(transPtr);
480 /* below here, we know we're doing a write transaction */
481 if (!ubeacon_AmSyncSite()) {
482 udisk_abort(transPtr);
488 /* now it is safe to try remote abort */
489 code = ContactQuorum(DISK_Abort, transPtr, 0);
490 code2 = udisk_abort(transPtr);
/* a remote failure takes precedence over the local abort's result */
493 return (code ? code : code2);
496 /* This routine ends a read or write transaction on the open transaction identified by transPtr. It returns an error code. */
/*
 * ubik_EndTrans: finish (commit) the transaction transPtr.
 *
 * Visible flow:
 *   - a write transaction is first flushed with ubik_Flush(); the failure
 *     path calls ubik_AbortTrans();
 *   - dbase->cachedVersion is zeroed;
 *   - if urecovery_AllBetter() fails, or (past the read-transaction case)
 *     this server is no longer sync site, the transaction is aborted locally
 *     with udisk_abort();
 *   - a read transaction is committed locally and jumps to `success`;
 *   - a write transaction is committed locally, then remotely through
 *     ContactQuorum(DISK_Commit, ..., CStampVersion); if that fails the
 *     remote locks are released and failure is reported;
 *   - after a successful commit the function polls once per second until no
 *     server on ubik_servers has both beaconSinceDown clear and a
 *     lastBeaconSent within BIGTIME of now, giving up after 10 * BIGTIME
 *     (counted in ubik_stats.escapes), and then releases the remote locks;
 *   - finally dbase->version is copied into dbase->cachedVersion.
 *
 * NOTE(review): the return type, some local declarations (now, realStart,
 * tv), the loop header around the wait, the `success` label and every return
 * statement are on lines not visible in this listing.
 */
498 ubik_EndTrans(register struct ubik_trans *transPtr)
500 register afs_int32 code;
503 register struct ubik_server *ts;
505 register struct ubik_dbase *dbase;
507 if (transPtr->type == UBIK_WRITETRANS) {
508 code = ubik_Flush(transPtr);
510 ubik_AbortTrans(transPtr);
515 dbase = transPtr->dbase;
517 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
519 /* give up if no longer current */
520 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
521 udisk_abort(transPtr);
527 if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
528 code = udisk_commit(transPtr);
530 goto success; /* update cachedVersion correctly */
536 if (!ubeacon_AmSyncSite()) { /* no longer sync site */
537 udisk_abort(transPtr);
543 /* now it is safe to do commit */
544 code = udisk_commit(transPtr);
546 code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
548 /* failed to commit, so must return failure. Try to clear locks first, just for fun
549 * Note that we don't know if this transaction will eventually commit at this point.
550 * If it made it to a site that will be present in the next quorum, we win, otherwise
551 * we lose. If we contact a majority of sites, then we won't be here: contacting
552 * a majority guarantees commit, since it guarantees that one dude will be a
553 * member of the next quorum. */
554 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
559 /* before we can start sending unlock messages, we must wait until all servers
560 * that are possibly still functioning on the other side of a network partition
561 * have timed out. Check the server structures, compute how long to wait, then
562 * start the unlocks */
/* realStart anchors the 10 * BIGTIME safety limit checked inside the wait */
563 realStart = FT_ApproxTime();
565 /* wait for all servers to time out */
567 now = FT_ApproxTime();
568 /* check if we're still sync site, the guy should either come up
569 * to us, or timeout. Put safety check in anyway */
570 if (now - realStart > 10 * BIGTIME) {
571 ubik_stats.escapes++;
572 ubik_print("ubik escaping from commit wait\n");
575 for (ts = ubik_servers; ts; ts = ts->next) {
576 if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
577 /* this guy could have some damaged data, wait for him */
579 tv.tv_sec = 1; /* try again after a while (ha ha) */
581 IOMGR_Select(0, 0, 0, 0, &tv); /* poll, should we wait on something? */
586 break; /* no down ones still pseudo-active */
589 /* finally, unlock all the dudes. We can return success independent of the number of servers
590 * that really unlock the dbase; the others will do it if/when they elect a new sync site.
591 * The transaction is committed anyway, since we succeeded in contacting a quorum
592 * at the start (when invoking the DiskCommit function).
594 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
598 /* update version on successful EndTrans */
599 memcpy(&dbase->cachedVersion, &dbase->version,
600 sizeof(struct ubik_version));
606 /* This routine reads length bytes into buffer from the current position in the database. The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length. Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred. A short read returns zero for an error code. */
/*
 * ubik_Read: read from the transaction's current file position.
 *
 * Holds the database (DBHOLD) for the duration.  If urecovery_AllBetter()
 * fails the database is released again at once; otherwise the data is read
 * locally with udisk_read() from transPtr->seekFile at transPtr->seekPos,
 * and seekPos is advanced by length.
 *
 * NOTE(review): the return type, the length parameter's declaration, the
 * assignment and check of udisk_read()'s result, any condition guarding the
 * seekPos update and the return statements are on lines not visible in this
 * listing.
 */
609 ubik_Read(register struct ubik_trans *transPtr, char *buffer,
612 register afs_int32 code;
614 /* reads are easy to do: handle locally */
615 DBHOLD(transPtr->dbase);
616 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
617 DBRELE(transPtr->dbase);
622 udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
625 transPtr->seekPos += length;
627 DBRELE(transPtr->dbase);
631 /* This routine will flush the io data in the iovec structures. It first
632 * flushes to the local disk and then uses ContactQuorum to write it to
/*
 * ubik_Flush: push the writes buffered in transPtr->iovec_info /
 * transPtr->iovec_data to the other servers.
 *
 * Two early tests come first: the transaction must be a write transaction,
 * and there must be buffered entries (non-zero iovec_wrt_len and a non-null
 * iovec_wrt_val).  With the database held, UNOQUORUM is raised if
 * urecovery_AllBetter() fails and UNOTSYNC if this server is not the sync
 * site.  The buffered entries go out in one ContactQuorum(DISK_WriteV, ...)
 * call; the failure path aborts locally and remotely.  Both outcomes reset
 * the two buffer lengths to 0, and the database is released before leaving.
 *
 * NOTE(review): the return type, the results of the two early tests, the
 * assignment and check of ContactQuorum's result, the error_exit label and
 * the return statement are on lines not visible in this listing.
 */
636 ubik_Flush(struct ubik_trans *transPtr)
638 afs_int32 code, error = 0;
640 if (transPtr->type != UBIK_WRITETRANS)
642 if (!transPtr->iovec_info.iovec_wrt_len
643 || !transPtr->iovec_info.iovec_wrt_val)
646 DBHOLD(transPtr->dbase);
647 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
648 ERROR_EXIT(UNOQUORUM);
649 if (!ubeacon_AmSyncSite()) /* only sync site can write */
650 ERROR_EXIT(UNOTSYNC);
652 /* Update the rest of the servers in the quorum */
654 ContactQuorum(DISK_WriteV, transPtr, 0, &transPtr->iovec_info,
655 &transPtr->iovec_data);
657 udisk_abort(transPtr);
658 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
659 transPtr->iovec_info.iovec_wrt_len = 0;
660 transPtr->iovec_data.iovec_buf_len = 0;
664 /* Wrote the buffers out, so start at scratch again */
665 transPtr->iovec_info.iovec_wrt_len = 0;
666 transPtr->iovec_data.iovec_buf_len = 0;
669 DBRELE(transPtr->dbase);
/*
 * ubik_Write: write length bytes from buffer at the transaction's current
 * file position.
 *
 * Visible flow:
 *   - only write transactions are accepted (first test);
 *   - a request longer than IOVEC_MAXBUF is split into pieces of at most
 *     IOVEC_MAXBUF bytes, each written by a recursive ubik_Write() call;
 *   - on first use the per-transaction buffers are allocated (IOVEC_MAXWRT
 *     ubik_iovec entries and IOVEC_MAXBUF data bytes); if either allocation
 *     fails, whichever one succeeded is freed and both pointers are reset;
 *   - if the entry table is full or the data would not fit, the pending
 *     writes are flushed with ubik_Flush() first;
 *   - with the database held, UNOQUORUM / UNOTSYNC are raised as in
 *     ubik_Flush();
 *   - the data is written locally with udisk_write(); the failure path
 *     aborts the transaction, resets the buffer lengths and releases the
 *     database;
 *   - the write is then recorded for the other servers (file, position,
 *     length plus a copy of the bytes) and seekPos advances by length.
 *
 * NOTE(review): the return type, the length parameter's declaration, the
 * result checks after each `code = ...`, the error_exit label and all return
 * statements are on lines not visible in this listing.
 */
674 ubik_Write(register struct ubik_trans *transPtr, char *buffer,
677 struct ubik_iovec *iovec;
678 afs_int32 code, error = 0;
679 afs_int32 pos, len, size;
681 if (transPtr->type != UBIK_WRITETRANS)
686 if (length > IOVEC_MAXBUF) {
687 for (pos = 0, len = length; len > 0; len -= size, pos += size) {
688 size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
689 code = ubik_Write(transPtr, &buffer[pos], size);
/* lazily allocate the bulk-write buffers the first time this transaction writes */
696 if (!transPtr->iovec_info.iovec_wrt_val) {
697 transPtr->iovec_info.iovec_wrt_len = 0;
698 transPtr->iovec_info.iovec_wrt_val =
699 (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
700 sizeof(struct ubik_iovec));
701 transPtr->iovec_data.iovec_buf_len = 0;
702 transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
703 if (!transPtr->iovec_info.iovec_wrt_val
704 || !transPtr->iovec_data.iovec_buf_val) {
705 if (transPtr->iovec_info.iovec_wrt_val)
706 free(transPtr->iovec_info.iovec_wrt_val);
707 transPtr->iovec_info.iovec_wrt_val = 0;
708 if (transPtr->iovec_data.iovec_buf_val)
709 free(transPtr->iovec_data.iovec_buf_val);
710 transPtr->iovec_data.iovec_buf_val = 0;
715 /* If this write won't fit in the structure, then flush it out and start anew */
716 if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
717 || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
718 code = ubik_Flush(transPtr);
723 DBHOLD(transPtr->dbase);
724 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
725 ERROR_EXIT(UNOQUORUM);
726 if (!ubeacon_AmSyncSite()) /* only sync site can write */
727 ERROR_EXIT(UNOTSYNC);
729 /* Write to the local disk */
731 udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
734 udisk_abort(transPtr);
735 transPtr->iovec_info.iovec_wrt_len = 0;
736 transPtr->iovec_data.iovec_buf_len = 0;
737 DBRELE(transPtr->dbase);
741 /* Collect writes for the other ubik servers (to be done in bulk) */
742 iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
743 iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
744 iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
745 iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
/* append the payload right after the bytes already buffered */
747 memcpy(&transPtr->iovec_data.
748 iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
750 transPtr->iovec_info.iovec_wrt_len++;
751 transPtr->iovec_data.iovec_buf_len += length;
752 transPtr->seekPos += length;
755 DBRELE(transPtr->dbase);
759 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position. Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
/*
 * ubik_Seek: set the transaction's file pointer to (fileid, position).
 *
 * With the database held, urecovery_AllBetter() is consulted, seekFile and
 * seekPos are stored, and the database is released.
 *
 * NOTE(review): the return type, the position parameter's declaration, the
 * body of the failure branch (and whether the two stores sit in its else
 * branch) and the return statement are on lines not visible in this listing.
 */
762 ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
765 register afs_int32 code;
767 DBHOLD(transPtr->dbase);
768 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
771 transPtr->seekFile = fileid;
772 transPtr->seekPos = position;
775 DBRELE(transPtr->dbase);
779 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
/*
 * ubik_Tell: report the transaction's current file pointer.
 *
 * Stores transPtr->seekFile in *fileid and transPtr->seekPos in *position
 * while holding the database.  No quorum or sync-site check is made here.
 *
 * NOTE(review): the return type and return statement are on lines not
 * visible in this listing.
 */
782 ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
783 afs_int32 * position)
785 DBHOLD(transPtr->dbase);
786 *fileid = transPtr->seekFile;
787 *position = transPtr->seekPos;
788 DBRELE(transPtr->dbase);
792 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
/*
 * ubik_Truncate: truncate the transaction's currently selected file
 * (transPtr->seekFile) to length bytes.
 *
 * Pending buffered writes are flushed first with ubik_Flush().  With the
 * database held, UNOQUORUM is raised if urecovery_AllBetter() fails and
 * UNOTSYNC if this server is not the sync site.  The truncate is applied
 * locally with udisk_truncate() and sent to the other servers with
 * ContactQuorum(DISK_Truncate, ...); the failure path aborts locally and
 * remotely.  The database is released before leaving.
 *
 * NOTE(review): the return type, the result checks after each call, the
 * last argument of the DISK_Truncate call, the error_exit label and the
 * return statement are on lines not visible in this listing.
 */
795 ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
797 afs_int32 code, error = 0;
799 /* Will also catch if not UBIK_WRITETRANS */
800 code = ubik_Flush(transPtr);
804 DBHOLD(transPtr->dbase);
805 /* first, check that quorum is still good, and that dbase is up-to-date */
806 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
807 ERROR_EXIT(UNOQUORUM);
808 if (!ubeacon_AmSyncSite())
809 ERROR_EXIT(UNOTSYNC);
811 /* now do the operation locally, and propagate it out */
812 code = udisk_truncate(transPtr, transPtr->seekFile, length);
815 ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile,
819 /* we must abort the operation */
820 udisk_abort(transPtr);
821 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
826 DBRELE(transPtr->dbase);
830 /* set a lock; all locks are released on transaction end (commit/abort) */
/*
 * ubik_SetLock: take a lock of type atype for transaction atrans.
 *
 * For LOCKWRITE the transaction type is tested against UBIK_READTRANS and
 * pending writes are flushed with ubik_Flush() first.  With the database
 * held:
 *   - LOCKREAD is taken locally only, via ulock_getLock(atrans, atype, 1);
 *   - otherwise UNOQUORUM / UNOTSYNC are raised if the database is not
 *     current or this server is not the sync site, the lock is taken locally
 *     and then requested from the other servers with
 *     ContactQuorum(DISK_Lock, ...); the failure path sends DISK_Abort.
 *
 * apos and alen are not used by any line visible here; the DISK_Lock call
 * passes constants marked "unused" in their place.
 *
 * NOTE(review): the return type, the atype parameter's declaration, the
 * result of the read-transaction test, the result checks after each call,
 * the error_exit label and the return statement are on lines not visible in
 * this listing.
 */
832 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
835 afs_int32 code = 0, error = 0;
837 if (atype == LOCKWRITE) {
838 if (atrans->type == UBIK_READTRANS)
840 code = ubik_Flush(atrans);
845 DBHOLD(atrans->dbase);
846 if (atype == LOCKREAD) {
847 code = ulock_getLock(atrans, atype, 1);
851 /* first, check that quorum is still good, and that dbase is up-to-date */
852 if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
853 ERROR_EXIT(UNOQUORUM);
854 if (!ubeacon_AmSyncSite())
855 ERROR_EXIT(UNOTSYNC);
857 /* now do the operation locally, and propagate it out */
858 code = ulock_getLock(atrans, atype, 1);
860 code = ContactQuorum(DISK_Lock, atrans, 0, 0, 1 /*unused */ ,
861 1 /*unused */ , LOCKWRITE);
864 /* we must abort the operation */
866 ContactQuorum(DISK_Abort, atrans, 0); /* force aborts to the others */
872 DBRELE(atrans->dbase);
876 /* utility to wait for a version # to change */
/*
 * ubik_WaitVersion: wait until adatabase->version differs from *aversion.
 *
 * The versions are compared with vcmp(); while they are equal the caller
 * sleeps in LWP_WaitProcess() on the address of adatabase->version.
 *
 * NOTE(review): the return type, the enclosing loop and the return taken
 * when the versions differ are on lines not visible in this listing.
 */
878 ubik_WaitVersion(register struct ubik_dbase *adatabase,
879 register struct ubik_version *aversion)
882 /* wait until version # changes, and then return */
883 if (vcmp(*aversion, adatabase->version) != 0)
885 LWP_WaitProcess(&adatabase->version); /* same vers, just wait */
889 /* utility to get the version of the dbase a transaction is dealing with */
/*
 * ubik_GetVersion: copy the version of the database that atrans belongs to
 * (atrans->dbase->version) into *avers.
 *
 * NOTE(review): the return type and return statement are on lines not
 * visible in this listing.
 */
891 ubik_GetVersion(register struct ubik_trans *atrans,
892 register struct ubik_version *avers)
894 *avers = atrans->dbase->version;
898 /* Facility to simplify database caching. Returns zero if last trans was done
899 on the local server and was successful. If return value is non-zero and the
900 caller is a server caching part of the Ubik database, it should invalidate
901 that cache. A return value of -1 means bad (NULL) argument. */
/*
 * ubik_CacheUpdate: tell a caching caller whether its cache is stale.
 *
 * Returns 0 when the database's cachedVersion equals its current version
 * (per vcmp) and 1 when they differ.  A null atrans or null atrans->dbase is
 * detected first.
 *
 * NOTE(review): the return type and the value returned for the null case are
 * on lines not visible in this listing.
 */
904 ubik_CacheUpdate(register struct ubik_trans *atrans)
906 if (!(atrans && atrans->dbase))
908 return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
/*
 * panic: report a fatal error and terminate.
 *
 * Prints "Ubik PANIC: " followed by a message built by ubik_print(a, b, c, d),
 * then a "BACK FROM ABORT" notice, and finally calls exit(1).
 *
 * NOTE(review): a is passed as ubik_print's first argument with b, c, d
 * following it -- presumably a printf-style format, so callers must never
 * pass untrusted text as a; confirm against ubik_print.
 * NOTE(review): the return type and the statement between the two prints
 * (the "BACK FROM ABORT" text suggests an abort call) are on lines not
 * visible in this listing.
 */
912 panic(char *a, char *b, char *c, char *d)
914 ubik_print("Ubik PANIC: ");
915 ubik_print(a, b, c, d);
917 ubik_print("BACK FROM ABORT\n"); /* shouldn't come back */
918 exit(1); /* never know, though */
922 ** This function takes an IP address as its parameter. It returns the
923 ** primary IP address of the host that the address belongs to.
/*
 * ubikGetPrimaryInterfaceAddr: map any known interface address of a server
 * to that server's first listed address.
 *
 * Walks the ubik_servers list and, for each server, its
 * UBIK_MAX_INTERFACE_ADDR address slots.  On the first slot equal to addr it
 * returns that server's addr[0]; if no slot matches it returns 0.
 *
 * NOTE(review): the return type and the declaration of j are on lines not
 * visible in this listing.
 */
926 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
928 struct ubik_server *ts;
931 for (ts = ubik_servers; ts; ts = ts->next)
932 for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
933 if (ts->addr[j] == addr)
934 return ts->addr[0]; /* net byte order */
935 return 0; /* if not in server database, return error */