2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 #include <sys/types.h>
20 #include <netinet/in.h>
21 #include <sys/param.h>
27 #include <afs/cellconfig.h>
29 #define UBIK_INTERNALS
33 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
35 /* This system is organized in a hierarchical set of related modules. Modules
36 at one level can only call modules at the same level or below.
38 At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
39 operating system primitives.
41 At the next level (1) we have
43 VOTER--The module responsible for casting votes when asked. It is also
44 responsible for determining whether this server should try to become
45 a synchronization site.
47 BEACONER--The module responsible for sending keep-alives out when a
48 server is actually the sync site, or trying to become a sync site.
50 DISK--The module responsible for representing atomic transactions
51 on the local disk. It maintains a new-value only log.
53 LOCK--The module responsible for locking byte ranges in the database file.
55 At the next level (2) we have
57 RECOVERY--The module responsible for ensuring that all members of a quorum
58 have the same up-to-date database after a new synchronization site is
59 elected. This module runs only on the synchronization site.
61 At the next level (3) we have
63 REMOTE--The module responsible for interpreting requests from the sync
64 site and applying them to the database, after obtaining the appropriate
67 At the next level (4) we have
69 UBIK--The module users call to perform operations on the database.
/* Module-wide state shared by the routines in this file. */
/* Number of servers needed for a quorum; ContactQuorum compares okcalls+1 against it. */
74 afs_int32 ubik_quorum=0;
/* The single database instance; assigned in ubik_ServerInitCommon. */
75 struct ubik_dbase *ubik_dbase=0;
76 struct ubik_stats ubik_stats;
77 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
/* Copied into tid.epoch of every transaction started by BeginTrans. */
78 afs_int32 ubik_epochTime = 0;
79 afs_int32 urecovery_state = 0;
/* Optional hook and its opaque argument; when the hook is set, ubik_ServerInitCommon
 * calls it to obtain an extra security class and the index to store it at in ubik_sc. */
80 int (*ubik_SRXSecurityProc)();
81 char *ubik_SRXSecurityRock;
/* Head of the server list walked through the next field by ContactQuorum and ubik_EndTrans. */
82 struct ubik_server *ubik_servers;
/* Set to the myPort argument of ubik_ServerInitCommon. */
83 short ubik_callPortal;
85 static int BeginTrans();
/* Security class table handed to rx_NewService with a count of 3. */
87 struct rx_securityClass *ubik_sc[3];
89 /* perform an operation at a quorum, handling error conditions. return 0 if
90 all worked, otherwise mark failing server as down and return UERROR
92 Note that if any server misses an update, we must wait BIGTIME seconds before
93 allowing the transaction to commit, to ensure that the missing and possibly still
94 functioning server times out and stops handing out old data. This is done in the commit
95 code, where we wait for a server marked down to have stayed down for BIGTIME seconds
96 before we allow a transaction to commit. A server that fails but comes back up won't give
97 out old data because it is sent the sync count along with the beacon message that
98 marks it as *really* up (beaconSinceDown).
100 #define CStampVersion 1 /* meaning set ts->version */
/* ContactQuorum: invoke the RPC aproc on every usable remote server.
 *
 * Parameters visible here:
 *   aproc          - RPC stub called as (*aproc)(connection, &atrans->tid, aparm0..aparm5).
 *   atrans         - transaction whose tid is sent and whose dbase version may be stamped.
 *   aflags         - when CStampVersion is set, a successful server gets
 *                    ts->version = atrans->dbase->version.
 *   aparm0..aparm5 - forwarded unchanged to aproc; declared as long.
 *
 * For each entry on ubik_servers: a server that is not up or has no current
 * database has currentDB cleared and is skipped. Otherwise aproc is called.
 * If aproc is DISK_WriteV and the result lies in (-500, -450], the bulk
 * request is replayed as one DISK_Write per iovec entry, with a bounds check
 * of offset + length against iovec_buf_len. A non-zero code marks the server
 * down (up = 0, beaconSinceDown = 0) and calls urecovery_LostServer; a zero
 * code increments okcalls.
 *
 * Returns 0 when okcalls + 1 (counting the local site) reaches ubik_quorum.
 * The failure return is not visible in this listing.
 *
 * NOTE(review): several source lines are absent from this listing (the
 * declarations of aproc and aflags, the initialisation of okcalls and rcode,
 * closing braces), so the description above covers only what is shown.
 * NOTE(review): callers in this file pass as few as three arguments, e.g.
 * ContactQuorum(DISK_Begin, tt, 0), and ubik_Flush passes pointers in the
 * long parameters; both rely on unprototyped K and R calling behaviour.
 */
101 afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5)
104 register struct ubik_trans *atrans;
105 long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5; {
106 register struct ubik_server *ts;
107 register afs_int32 code;
108 afs_int32 rcode, okcalls;
112 for(ts = ubik_servers; ts; ts=ts->next) {
113 /* for each server */
114 if (!ts->up || !ts->currentDB) {
115 ts->currentDB = 0; /* db is no longer current; we just missed an update */
116 continue; /* not up-to-date, don't bother */
118 code = (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5);
119 if ( (aproc == DISK_WriteV) && (code <= -450) && (code > -500) ) {
120 /* An RPC interface mismatch (as defined in comerr/error_msg.c).
121 * Un-bulk the entries and do individual DISK_Write calls
122 * instead of DISK_WriteV.
124 iovec_wrt *iovec_infoP = (iovec_wrt *)aparm0;
125 iovec_buf *iovec_dataP = (iovec_buf *)aparm1;
126 struct ubik_iovec *iovec = (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
127 char *iobuf = (char *)iovec_dataP->iovec_buf_val;
131 for (i=0, offset=0; i<iovec_infoP->iovec_wrt_len; i++) {
132 /* Sanity check for going off end of buffer */
133 if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
137 tcbs.bulkdata_len = iovec[i].length;
138 tcbs.bulkdata_val = &iobuf[offset];
139 code = DISK_Write(ts->disk_rxcid, &atrans->tid,
140 iovec[i].file, iovec[i].position, &tcbs);
143 offset += iovec[i].length;
146 if (code) { /* failure */
148 ts->up = 0; /* mark as down now; beacons will no longer be sent */
150 ts->beaconSinceDown = 0;
151 urecovery_LostServer(); /* tell recovery to try to resend dbase later */
152 } else { /* success */
154 okcalls++; /* count up how many worked */
155 if (aflags & CStampVersion) {
156 ts->version = atrans->dbase->version;
160 /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
161 if (okcalls+1 >= ubik_quorum) return 0;
165 /* This routine initializes the ubik system for a set of servers. It returns 0 for success, or an error code on failure. The set of servers is specified by serverList; nServers gives the number of entries in this array. Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below. The variable pathName provides an initial prefix used for naming storage files used by this system. It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
167 Note that the host named by myHost should not also be listed in serverList.
/* ubik_ServerInitByInfo: initialise ubik from a cell configuration record.
 * Forwards every argument to ubik_ServerInitCommon and passes 0 in the
 * serverList position, so the server set comes from info and clones.
 * The return statement is not visible in this listing; presumably the code
 * from ubik_ServerInitCommon is returned - confirm against the full file.
 */
170 int ubik_ServerInitByInfo(myHost, myPort, info, clones, pathName, dbase)
171 struct afsconf_cell *info; /* in */
175 char *pathName; /* in */
176 struct ubik_dbase **dbase; /* out */
180 code = ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName, dbase);
/* ubik_ServerInit: initialise ubik from an explicit array of server addresses.
 * Forwards to ubik_ServerInitCommon with a null afsconf_cell pointer and 0
 * for clones, so the server set comes from serverList.
 * The return statement is not visible in this listing; presumably the code
 * from ubik_ServerInitCommon is returned - confirm against the full file.
 */
184 int ubik_ServerInit(myHost, myPort, serverList, pathName, dbase)
185 afs_int32 serverList[]; /* in */
188 char *pathName; /* in */
189 struct ubik_dbase **dbase; /* out */
193 code = ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
194 serverList, pathName, dbase);
/* ubik_ServerInitCommon: shared implementation behind ubik_ServerInit and
 * ubik_ServerInitByInfo.
 *
 * Steps visible in this listing, in order:
 *   1. Register the ubik error table.
 *   2. Allocate a ubik_dbase, copy pathName into it, zero its version and
 *      cachedVersion, initialise versionLock, and install the uphys_*
 *      routines as its physical I/O operations.
 *   3. Publish the structure through the global ubik_dbase and record
 *      myPort in ubik_callPortal.
 *   4. Fill ubik_sc[0] with the null security object and, when
 *      ubik_SRXSecurityProc is set, store the class it returns at the index
 *      it returns.
 *   5. Call rx_Init(myPort), returning a negative code immediately, then
 *      create the VOTE and DISK Rx services with 2 to 3 processes each.
 *   6. Start an rx_ServerProc light-weight process so incoming RPCs are
 *      served during the rest of initialisation.
 *   7. Initialise recovery and the beacon server list (by info or by
 *      serverList), then start the ubeacon_Interact and urecovery_Interact
 *      processes.
 *
 * Returns a non-zero code from whichever visible step fails. The final
 * return and the assignment to the dbase out parameter are not visible here.
 */
198 int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, dbase)
201 struct afsconf_cell *info; /* in */
203 afs_int32 serverList[]; /* in */
204 char *pathName; /* in */
205 struct ubik_dbase **dbase; /* out */
207 register struct ubik_dbase *tdb;
208 register afs_int32 code;
211 struct rx_securityClass *secClass;
213 struct rx_service *tservice;
214 extern struct rx_securityClass *rxnull_NewServerSecurityObject();
215 extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
216 extern void rx_ServerProc();
217 extern int rx_stackSize;
219 initialize_U_error_table();
/* NOTE(review): neither malloc result below is checked before it is
 * dereferenced or passed to strcpy; an allocation failure would crash here. */
221 tdb = (struct ubik_dbase *) malloc(sizeof(struct ubik_dbase));
222 tdb->pathName = (char *) malloc(strlen(pathName)+1);
223 strcpy(tdb->pathName, pathName);
224 tdb->activeTrans = (struct ubik_trans *) 0;
225 memset(&tdb->version, 0, sizeof(struct ubik_version));
226 memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
227 Lock_Init(&tdb->versionLock);
229 tdb->read = uphys_read;
230 tdb->write = uphys_write;
231 tdb->truncate = uphys_truncate;
232 tdb->open = 0; /* this function isn't used any more */
233 tdb->sync = uphys_sync;
234 tdb->stat = uphys_stat;
235 tdb->getlabel = uphys_getlabel;
236 tdb->setlabel = uphys_setlabel;
237 tdb->getnfiles = uphys_getnfiles;
239 tdb->tidCounter=tdb->writeTidCounter=0;
241 ubik_dbase = tdb; /* for now, only one db per server; can fix later when we have names for the other dbases */
244 ubik_callPortal = myPort;
245 /* try to get an additional security object */
246 ubik_sc[0] = rxnull_NewServerSecurityObject();
249 if (ubik_SRXSecurityProc) {
250 code = (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex);
/* NOTE(review): ubik_sc has 3 slots; no range check on secIndex is visible
 * in this listing - confirm one exists or that the hook guarantees 0..2. */
252 ubik_sc[secIndex] = secClass;
255 code = rx_Init(myPort);
256 if (code < 0) return code;
257 tservice = rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3, VOTE_ExecuteRequest);
258 if (tservice == (struct rx_service *)0) {
259 ubik_dprint("Could not create VOTE rx service!\n");
262 rx_SetMinProcs(tservice, 2);
263 rx_SetMaxProcs(tservice, 3);
265 tservice = rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3, DISK_ExecuteRequest);
266 if (tservice == (struct rx_service *)0) {
267 ubik_dprint("Could not create DISK rx service!\n");
270 rx_SetMinProcs(tservice, 2);
271 rx_SetMaxProcs(tservice, 3);
273 /* start an rx_ServerProc to handle incoming RPC's in particular the
274 * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
275 * the "steplock" problem in ubik initialization. Defect 11037.
277 LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
278 0, "rx_ServerProc", &junk);
280 /* do basic initialization */
282 if (code) return code;
283 code = urecovery_Initialize(tdb);
284 if (code) return code;
286 code = ubeacon_InitServerListByInfo(myHost, info, clones);
288 code = ubeacon_InitServerList(myHost, serverList);
289 if (code) return code;
291 /* now start up async processes */
292 code = LWP_CreateProcess(ubeacon_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
294 if (code) return code;
295 code = LWP_CreateProcess(urecovery_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
296 0, "recovery", &junk);
300 /* This routine begins a read or write transaction on the transaction
301 identified by transPtr, in the dbase named by dbase. An open mode of
302 ubik_READTRANS identifies this as a read transaction, while a mode of
303 ubik_WRITETRANS identifies this as a write transaction. transPtr
304 is set to the returned transaction control block. The readAny flag is
305 set to 0 or 1 by the wrapper functions ubik_BeginTrans() or
306 ubik_BeginTransReadAny() below.
308 We can only begin transaction when we have an up-to-date database.
/* BeginTrans: common body of ubik_BeginTrans and ubik_BeginTransReadAny.
 *
 *   dbase     - database to open the transaction on.
 *   transMode - UBIK_READTRANS or UBIK_WRITETRANS.
 *   transPtr  - out parameter for the new transaction; the store into it is
 *               not visible in this listing.
 *   readAny   - non-zero marks the transaction TRREADANY and is also passed
 *               to urecovery_AllBetter; only legal with UBIK_READTRANS.
 *
 * Visible flow: reject readAny on a non-read transaction with UBADTYPE;
 * check urecovery_AllBetter; for a write, wait on dbase->flags while
 * DBWRITING is set and check ubeacon_AmSyncSite; create the transaction
 * with udisk_begin; label it with ubik_epochTime and a tidCounter advanced
 * by two; for a write also advance writeTidCounter by two; send DISK_Begin
 * through ContactQuorum, and send DISK_Abort on the abort path.
 *
 * The bodies of the failure branches and every return after the first are
 * not visible here, so the error codes they produce are not documented.
 */
311 static int BeginTrans(dbase, transMode, transPtr, readAny)
312 register struct ubik_dbase *dbase; /* in */
314 afs_int32 transMode; /* in */
315 struct ubik_trans **transPtr; /* out */ {
316 struct ubik_trans *jt;
317 register struct ubik_trans *tt;
318 register afs_int32 code;
320 if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
322 if (urecovery_AllBetter(dbase, readAny)==0) {
326 /* otherwise we have a quorum, use it */
328 /* make sure that at most one write transaction occurs at any one time. This
329 has nothing to do with transaction locking; that's enforced by the lock package. However,
330 we can't even handle two non-conflicting writes, since our log and recovery modules
331 don't know how to restore one without possibly picking up some data from the other. */
332 if (transMode == UBIK_WRITETRANS) {
333 /* if we're writing already, wait */
334 while(dbase->flags & DBWRITING) {
336 LWP_WaitProcess(&dbase->flags);
339 if (!ubeacon_AmSyncSite()) {
345 /* create the transaction */
346 code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
347 tt = jt; /* move to a register */
348 if (code || tt == (struct ubik_trans *)NULL) {
352 if (readAny) tt->flags |= TRREADANY;
353 /* label trans and dbase with new tid */
354 tt->tid.epoch = ubik_epochTime;
355 /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
356 tt->tid.counter = (dbase->tidCounter += 2);
358 if (transMode == UBIK_WRITETRANS) {
359 /* for a write trans, we have to keep track of the write tid counter too */
360 dbase->writeTidCounter += 2;
362 /* next try to start transaction on appropriate number of machines */
363 code = ContactQuorum(DISK_Begin, tt, 0);
365 /* we must abort the operation */
367 ContactQuorum(DISK_Abort, tt, 0); /* force aborts to the others */
/* ubik_BeginTrans: public entry point that starts a transaction of mode
 * transMode on dbase. Calls BeginTrans with readAny = 0 and returns its
 * result unchanged. */
379 int ubik_BeginTrans(dbase, transMode, transPtr)
380 register struct ubik_dbase *dbase; /* in */
381 afs_int32 transMode; /* in */
382 struct ubik_trans **transPtr; /* out */ {
383 return BeginTrans(dbase, transMode, transPtr, 0);
/* ubik_BeginTransReadAny: like ubik_BeginTrans but calls BeginTrans with
 * readAny = 1. BeginTrans returns UBADTYPE for this flag unless transMode
 * is UBIK_READTRANS. */
386 int ubik_BeginTransReadAny(dbase, transMode, transPtr)
387 register struct ubik_dbase *dbase; /* in */
388 afs_int32 transMode; /* in */
389 struct ubik_trans **transPtr; /* out */ {
390 return BeginTrans(dbase, transMode, transPtr, 1);
393 /* this routine ends a read or write transaction by aborting it */
/* ubik_AbortTrans: abort the transaction transPtr.
 *
 * Always zeroes dbase->cachedVersion first. Then, in the order shown:
 *   - if urecovery_AllBetter reports the database is not usable, abort
 *     locally with udisk_abort;
 *   - a read transaction is aborted locally with udisk_abort;
 *   - a write transaction on a site that is no longer the sync site is
 *     aborted locally;
 *   - otherwise DISK_Abort is sent through ContactQuorum and the local
 *     udisk_abort follows; the remote code is returned if non-zero, else
 *     the local one.
 *
 * The returns of the three early branches are not visible in this listing.
 */
394 int ubik_AbortTrans(transPtr)
395 register struct ubik_trans *transPtr; /* in */ {
396 register afs_int32 code;
398 register struct ubik_dbase *dbase;
400 dbase = transPtr->dbase;
402 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
403 /* see if we're still up-to-date */
404 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
405 udisk_abort(transPtr);
411 if (transPtr->type == UBIK_READTRANS) {
412 code = udisk_abort(transPtr);
418 /* below here, we know we're doing a write transaction */
419 if (!ubeacon_AmSyncSite()) {
420 udisk_abort(transPtr);
426 /* now it is safe to try remote abort */
427 code = ContactQuorum(DISK_Abort, transPtr, 0);
428 code2 = udisk_abort(transPtr);
431 return (code? code : code2);
434 /* This routine ends a read or write transaction on the open transaction identified by transPtr. It returns an error code. */
/* ubik_EndTrans: commit the transaction transPtr.
 *
 * Visible flow:
 *   1. For a write transaction, push buffered writes with ubik_Flush; the
 *      ubik_AbortTrans call that follows is presumably the flush-failure
 *      path (its guarding condition is not visible here).
 *   2. Zero dbase->cachedVersion.
 *   3. If urecovery_AllBetter fails, abort locally.
 *   4. A read transaction is committed locally; on success control jumps to
 *      the success label.
 *   5. A write transaction on a site that is no longer sync site is aborted
 *      locally.
 *   6. Otherwise commit locally, then send DISK_Commit through ContactQuorum
 *      with CStampVersion. On the failure path DISK_ReleaseLocks is sent.
 *   7. Poll in one-second IOMGR_Select sleeps while any server has not
 *      beaconed since going down and its last beacon is at most BIGTIME old.
 *      After more than 10 * BIGTIME the wait is abandoned, ubik_stats.escapes
 *      is incremented and a message is printed.
 *   8. Send DISK_ReleaseLocks and copy dbase->version into cachedVersion.
 *
 * Return statements and the success label itself are not visible in this
 * listing, so exact return codes are not documented here.
 */
435 int ubik_EndTrans(transPtr)
436 register struct ubik_trans *transPtr; /* in */ {
437 register afs_int32 code;
440 register struct ubik_server *ts;
442 register struct ubik_dbase *dbase;
444 if (transPtr->type == UBIK_WRITETRANS) {
445 code = ubik_Flush(transPtr);
447 ubik_AbortTrans(transPtr);
452 dbase = transPtr->dbase;
454 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
456 /* give up if no longer current */
457 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
458 udisk_abort(transPtr);
464 if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
465 code = udisk_commit(transPtr);
466 if (code == 0) goto success; /* update cachedVersion correctly */
472 if (!ubeacon_AmSyncSite()) { /* no longer sync site */
473 udisk_abort(transPtr);
479 /* now it is safe to do commit */
480 code = udisk_commit(transPtr);
481 if (code == 0) code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
483 /* failed to commit, so must return failure. Try to clear locks first, just for fun
484 Note that we don't know if this transaction will eventually commit at this point.
485 If it made it to a site that will be present in the next quorum, we win, otherwise
486 we lose. If we contact a majority of sites, then we won't be here: contacting
487 a majority guarantees commit, since it guarantees that one dude will be a
488 member of the next quorum. */
489 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
494 /* before we can start sending unlock messages, we must wait until all servers
495 that are possibly still functioning on the other side of a network partition
496 have timed out. Check the server structures, compute how long to wait, then
498 realStart = FT_ApproxTime();
500 /* wait for all servers to time out */
502 now = FT_ApproxTime();
503 /* check if we're still sync site, the guy should either come up
504 to us, or timeout. Put safety check in anyway */
505 if (now - realStart > 10 * BIGTIME) {
506 ubik_stats.escapes++;
507 ubik_print("ubik escaping from commit wait\n");
510 for(ts = ubik_servers; ts; ts=ts->next) {
511 if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
512 /* this guy could have some damaged data, wait for him */
514 tv.tv_sec = 1; /* try again after a while (ha ha) */
516 IOMGR_Select(0, 0, 0, 0, &tv); /* poll, should we wait on something? */
520 if (code == 0) break; /* no down ones still pseudo-active */
523 /* finally, unlock all the dudes. We can return success independent of the number of servers
524 that really unlock the dbase; the others will do it if/when they elect a new sync site.
525 The transaction is committed anyway, since we succeeded in contacting a quorum
526 at the start (when invoking the DiskCommit function).
528 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
532 /* update version on successful EndTrans */
533 memcpy(&dbase->cachedVersion, &dbase->version, sizeof(struct ubik_version));
539 /* This routine reads length bytes into buffer from the current position in the database. The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length. Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred. A short read returns zero for an error code. */
/* ubik_Read: read length bytes at the transaction seek position into buffer.
 *
 * Holds the database with DBHOLD, checks urecovery_AllBetter (releasing the
 * hold if it fails), reads through udisk_read from seekFile at seekPos,
 * advances seekPos by length, and releases the hold.
 *
 * NOTE(review): the comment preceding this function describes length as an
 * INOUT value updated with the byte count transferred, but here length is
 * declared afs_int32 and passed by value, so the caller never sees a count.
 * NOTE(review): whether seekPos is advanced only on success cannot be told
 * from this listing (the line between the read and the update is absent).
 */
541 int ubik_Read(transPtr, buffer, length)
542 register struct ubik_trans *transPtr; /* in */
543 char *buffer; /* in */
544 afs_int32 length; /* in */ {
545 register afs_int32 code;
547 /* reads are easy to do: handle locally */
548 DBHOLD(transPtr->dbase);
549 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
550 DBRELE(transPtr->dbase);
554 code = udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos, length);
556 transPtr->seekPos += length;
558 DBRELE(transPtr->dbase);
562 /* This routine will flush the io data in the iovec structures. It first
563 * flushes to the local disk and then uses ContactQuorum to write it to
/* ubik_Flush: send the writes buffered in transPtr->iovec_info and
 * transPtr->iovec_data to the other servers.
 *
 * Visible flow: test that the transaction is a write transaction and that
 * there is something buffered (the results of those two tests are not
 * visible here); hold the database; jump to the error exit with UNOQUORUM
 * if urecovery_AllBetter fails or UNOTSYNC if this is not the sync site;
 * send the batch with ContactQuorum(DISK_WriteV). On the failure path the
 * transaction is aborted locally and remotely and both buffer lengths are
 * reset; on success both lengths are reset as well. The hold is released
 * with DBRELE before leaving.
 *
 * NOTE(review): the two buffer addresses are passed in parameters that
 * ContactQuorum declares as long.
 */
566 int ubik_Flush(transPtr)
567 struct ubik_trans *transPtr;
569 afs_int32 code, error=0;
571 if (transPtr->type != UBIK_WRITETRANS)
573 if (!transPtr->iovec_info.iovec_wrt_len || !transPtr->iovec_info.iovec_wrt_val)
576 DBHOLD(transPtr->dbase);
577 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
578 ERROR_EXIT(UNOQUORUM);
579 if (!ubeacon_AmSyncSite()) /* only sync site can write */
580 ERROR_EXIT(UNOTSYNC);
582 /* Update the rest of the servers in the quorum */
583 code = ContactQuorum(DISK_WriteV, transPtr, 0,
584 &transPtr->iovec_info, &transPtr->iovec_data);
586 udisk_abort(transPtr);
587 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
588 transPtr->iovec_info.iovec_wrt_len = 0;
589 transPtr->iovec_data.iovec_buf_len = 0;
593 /* Wrote the buffers out, so start at scratch again */
594 transPtr->iovec_info.iovec_wrt_len = 0;
595 transPtr->iovec_data.iovec_buf_len = 0;
598 DBRELE(transPtr->dbase);
/* ubik_Write: write length bytes from buffer at the transaction seek
 * position, locally at once and remotely in batches.
 *
 *   transPtr - write transaction; the result of the type test at the top
 *              is not visible in this listing.
 *   buffer   - source bytes.
 *   length   - number of bytes to write.
 *
 * The local write goes through udisk_write immediately. The same data is
 * appended to the iovec_info and iovec_data buffers of the transaction, to
 * be sent to the other servers later by ubik_Flush. Those buffers are
 * allocated on first use (IOVEC_MAXWRT entries, IOVEC_MAXBUF bytes) and are
 * flushed first whenever the entry count or byte count would overflow.
 * On success seekPos advances by length.
 */
602 int ubik_Write(transPtr, buffer, length)
603 register struct ubik_trans *transPtr; /* in */
604 char *buffer; /* in */
605 afs_int32 length; /* in */
607 struct ubik_iovec *iovec;
608 afs_int32 code, error=0;
609 afs_int32 pos, len, size;
611 if (transPtr->type != UBIK_WRITETRANS)
/* A request larger than IOVEC_MAXBUF is split into pieces of at most
 * IOVEC_MAXBUF bytes, each handled by a recursive call; the first failing
 * piece ends the loop with its code. */
616 if (length > IOVEC_MAXBUF) {
617 for (pos=0, len=length; len>0; len-=size, pos+=size) {
618 size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
619 code = ubik_Write(transPtr, &buffer[pos], size);
620 if (code) return (code);
625 if (!transPtr->iovec_info.iovec_wrt_val) {
626 transPtr->iovec_info.iovec_wrt_len = 0;
627 transPtr->iovec_info.iovec_wrt_val =
628 (struct ubik_iovec *)malloc(IOVEC_MAXWRT*sizeof(struct ubik_iovec));
629 transPtr->iovec_data.iovec_buf_len = 0;
630 transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
631 if (!transPtr->iovec_info.iovec_wrt_val || !transPtr->iovec_data.iovec_buf_val) {
632 if (transPtr->iovec_info.iovec_wrt_val) free(transPtr->iovec_info.iovec_wrt_val);
633 transPtr->iovec_info.iovec_wrt_val = 0;
634 if (transPtr->iovec_data.iovec_buf_val) free(transPtr->iovec_data.iovec_buf_val);
635 transPtr->iovec_data.iovec_buf_val = 0;
640 /* If this write won't fit in the structure, then flush it out and start anew */
641 if ( (transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT) ||
642 ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF) ) {
643 code = ubik_Flush(transPtr);
644 if (code) return (code);
647 DBHOLD(transPtr->dbase);
648 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
649 ERROR_EXIT(UNOQUORUM);
650 if (!ubeacon_AmSyncSite()) /* only sync site can write */
651 ERROR_EXIT(UNOTSYNC);
653 /* Write to the local disk */
654 code = udisk_write(transPtr, transPtr->seekFile, buffer,
655 transPtr->seekPos, length);
657 udisk_abort(transPtr);
658 transPtr->iovec_info.iovec_wrt_len = 0;
659 transPtr->iovec_data.iovec_buf_len = 0;
660 DBRELE(transPtr->dbase);
664 /* Collect writes for the other ubik servers (to be done in bulk) */
665 iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
666 iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
667 iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
668 iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
670 memcpy(&transPtr->iovec_data.iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
672 transPtr->iovec_info.iovec_wrt_len++;
673 transPtr->iovec_data.iovec_buf_len += length;
674 transPtr->seekPos += length;
677 DBRELE(transPtr->dbase);
681 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position. Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
/* ubik_Seek: set the file number and byte position that later reads and
 * writes on transPtr will use.
 * Holds the database, checks urecovery_AllBetter, stores fileid in seekFile
 * and position in seekPos, then releases the hold. The code produced when
 * the check fails and the final return are not visible in this listing;
 * presumably the two stores happen only when the check passes - confirm.
 */
683 int ubik_Seek(transPtr, fileid, position)
684 register struct ubik_trans *transPtr; /* IN */
685 afs_int32 fileid; /* IN */
686 afs_int32 position; /* IN */ {
687 register afs_int32 code;
689 DBHOLD(transPtr->dbase);
690 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
693 transPtr->seekFile = fileid;
694 transPtr->seekPos = position;
697 DBRELE(transPtr->dbase);
701 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
/* ubik_Tell: report the current seek state of transPtr.
 * Under a DBHOLD / DBRELE pair, copies seekFile into *fileid and seekPos
 * into *position. Neither pointer is checked for null. The return statement
 * is not visible in this listing.
 */
703 int ubik_Tell(transPtr, fileid, position)
704 register struct ubik_trans *transPtr; /* IN */
705 afs_int32 *fileid; /* OUT */
706 afs_int32 *position; /* OUT */ {
707 DBHOLD(transPtr->dbase);
708 *fileid = transPtr->seekFile;
709 *position = transPtr->seekPos;
710 DBRELE(transPtr->dbase);
714 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
/* ubik_Truncate: truncate the currently selected file (seekFile) of
 * transPtr to length bytes, locally and on the other servers.
 *
 * Buffered writes are flushed first with ubik_Flush, whose non-zero code is
 * returned directly. Then, holding the database: jump to the error exit
 * with UNOQUORUM if urecovery_AllBetter fails or UNOTSYNC if this is not
 * the sync site; truncate locally with udisk_truncate; send DISK_Truncate
 * through ContactQuorum. On the abort path the transaction is aborted
 * locally and DISK_Abort is sent. The hold is released before leaving.
 * The conditions linking these steps and the final return are not visible
 * in this listing.
 */
716 int ubik_Truncate(transPtr, length)
717 register struct ubik_trans *transPtr; /* in */
718 afs_int32 length; /* in */ {
719 afs_int32 code, error=0;
721 /* Will also catch if not UBIK_WRITETRANS */
722 code = ubik_Flush(transPtr);
723 if (code) return(code);
725 DBHOLD(transPtr->dbase);
726 /* first, check that quorum is still good, and that dbase is up-to-date */
727 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
728 ERROR_EXIT(UNOQUORUM);
729 if (!ubeacon_AmSyncSite())
730 ERROR_EXIT(UNOTSYNC);
732 /* now do the operation locally, and propagate it out */
733 code = udisk_truncate(transPtr, transPtr->seekFile, length);
735 code = ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile, length);
738 /* we must abort the operation */
739 udisk_abort(transPtr);
740 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
745 DBRELE(transPtr->dbase);
749 /* set a lock; all locks are released on transaction end (commit/abort) */
/* ubik_SetLock: take a lock of type atype for transaction atrans.
 *
 *   atrans     - transaction that will own the lock.
 *   apos, alen - accepted but unused; the lock calls pass the constant 1.
 *   atype      - LOCKREAD or LOCKWRITE (its declaration is not visible here).
 *
 * A write lock is refused with UBADTYPE on a read transaction, and buffered
 * writes are flushed with ubik_Flush before it is taken. A read lock is
 * taken locally through ulock_getLock only. For the remaining case the
 * quorum and sync-site checks run (UNOQUORUM / UNOTSYNC), the lock is taken
 * locally, and DISK_Lock is sent through ContactQuorum with LOCKWRITE; on
 * the abort path DISK_Abort is sent. The database hold is released before
 * leaving.
 *
 * NOTE(review): the definition has no return type and so relies on implicit
 * int, which C99 and later compilers reject.
 */
750 ubik_SetLock(atrans, apos, alen, atype)
751 struct ubik_trans *atrans;
752 afs_int32 apos, alen; /* apos and alen are not used */
754 afs_int32 code=0, error=0;
756 if (atype == LOCKWRITE) {
757 if (atrans->type == UBIK_READTRANS) return UBADTYPE;
758 code = ubik_Flush(atrans);
759 if (code) return(code);
762 DBHOLD(atrans->dbase);
763 if (atype == LOCKREAD) {
764 code = ulock_getLock(atrans, atype, 1);
765 if (code) ERROR_EXIT(code);
768 /* first, check that quorum is still good, and that dbase is up-to-date */
769 if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
770 ERROR_EXIT(UNOQUORUM);
771 if (!ubeacon_AmSyncSite())
772 ERROR_EXIT(UNOTSYNC);
774 /* now do the operation locally, and propagate it out */
775 code = ulock_getLock(atrans, atype, 1);
777 code = ContactQuorum(DISK_Lock, atrans, 0, 0,
778 1/*unused*/, 1/*unused*/, LOCKWRITE);
781 /* we must abort the operation */
783 ContactQuorum(DISK_Abort, atrans, 0); /* force aborts to the others */
789 DBRELE(atrans->dbase);
793 /* utility to wait for a version # to change */
/* ubik_WaitVersion: block until the version of adatabase differs from
 * *aversion.
 * Compares the two versions with vcmp; when they are the same the caller is
 * put to sleep with LWP_WaitProcess on the address of adatabase->version.
 * The statement taken when the versions differ and any enclosing loop are
 * not visible in this listing.
 * The parameter declarations appear in the opposite order to the parameter
 * list, which is legal for an old-style definition.
 */
794 int ubik_WaitVersion(adatabase, aversion)
795 register struct ubik_version *aversion;
796 register struct ubik_dbase *adatabase; {
798 /* wait until version # changes, and then return */
799 if (vcmp(*aversion, adatabase->version) != 0)
801 LWP_WaitProcess(&adatabase->version); /* same vers, just wait */
805 /* utility to get the version of the dbase a transaction is dealing with */
/* ubik_GetVersion: copy the version of the database that atrans belongs to
 * into *avers by structure assignment. Neither pointer is checked for null.
 * The return statement is not visible in this listing.
 */
806 int ubik_GetVersion(atrans, avers)
807 register struct ubik_trans *atrans;
808 register struct ubik_version *avers; {
809 *avers = atrans->dbase->version;
813 /* Facility to simplify database caching. Returns zero if last trans was done
814 on the local server and was successful. If return value is non-zero and the
815 caller is a server caching part of the Ubik database, it should invalidate
816 that cache. A return value of -1 means bad (NULL) argument. */
/* ubik_CacheUpdate: tell a caching caller whether its cache is still valid.
 * Returns -1 when atrans or atrans->dbase is null. Otherwise returns 1 when
 * vcmp finds cachedVersion different from version, and 0 when they match.
 * cachedVersion is zeroed at the start of ubik_AbortTrans and ubik_EndTrans
 * and set equal to version at the end of ubik_EndTrans.
 */
818 int ubik_CacheUpdate (atrans)
819 register struct ubik_trans *atrans;
821 if (!(atrans && atrans->dbase)) return -1;
822 return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
/* panic: report a fatal ubik error and terminate.
 * Prints the prefix "Ubik PANIC: ", then prints a through ubik_print with b,
 * c and d as its arguments. The "BACK FROM ABORT" message suggests the line
 * missing before it calls abort - confirm against the full file. If control
 * gets that far the process ends with exit(1).
 *
 * NOTE(review): a is used as the format string, so callers must never pass
 * text that is not under their control. The declarations of a, b, c and d
 * are not visible in this listing.
 */
825 int panic(a, b, c, d)
828 ubik_print("Ubik PANIC: ");
829 ubik_print(a, b, c, d);
831 ubik_print("BACK FROM ABORT\n"); /* shouldn't come back */
832 exit(1); /* never know, though */
836 ** This function takes an IP address as its parameter. It returns
837 ** the primary IP address that is on the host passed in.
/* ubikGetPrimaryInterfaceAddr: map any known interface address of a server
 * to the first address recorded for that server.
 * Scans every entry on ubik_servers and all UBIK_MAX_INTERFACE_ADDR slots of
 * its addr array; on the first slot equal to addr it returns addr[0] of that
 * server. Returns 0 when no server lists addr. Both the argument and the
 * result are in network byte order.
 * The return type line and the declaration of j are not visible in this
 * listing.
 */
840 ubikGetPrimaryInterfaceAddr(addr)
841 afs_uint32 addr; /* network byte order */
843 struct ubik_server *ts;
846 for ( ts=ubik_servers; ts; ts=ts->next )
847 for ( j=0; j < UBIK_MAX_INTERFACE_ADDR; j++)
848 if ( ts->addr[j] == addr )
849 return ts->addr[0]; /* net byte order */
850 return 0; /* if not in server database, return error */