2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 #include <sys/types.h>
20 #include <netinet/in.h>
21 #include <sys/param.h>
34 #include <afs/cellconfig.h>
36 #define UBIK_INTERNALS
/* ERROR_EXIT(code): store code in the enclosing function's local variable
 * named error, then jump to that function's error_exit label.  Any function
 * that uses this macro must provide both.  The expansion is a bare brace
 * block, so be careful when it forms the body of an if that has an else. */
40 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
42 /* This system is organized in a hierarchical set of related modules. Modules
43 at one level can only call modules at the same level or below.
45 At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
46 operating system primitives.
48 At the next level (1) we have
50 VOTER--The module responsible for casting votes when asked. It is also
51 responsible for determining whether this server should try to become
52 a synchronization site.
54 BEACONER--The module responsible for sending keep-alives out when a
55 server is actually the sync site, or trying to become a sync site.
57 DISK--The module responsible for representing atomic transactions
58 on the local disk. It maintains a new-value only log.
60 LOCK--The module responsible for locking byte ranges in the database file.
62 At the next level (2) we have
64 RECOVERY--The module responsible for ensuring that all members of a quorum
65 have the same up-to-date database after a new synchronization site is
66 elected. This module runs only on the synchronization site.
68 At the next level (3) we have
70 REMOTE--The module responsible for interpreting requests from the sync
71 site and applying them to the database, after obtaining the appropriate
74 At the next level (4) we have
76 UBIK--The module users call to perform operations on the database.
/* Process-wide ubik state shared by the routines in this file. */
/* Threshold that ContactQuorum() compares (successful calls + 1) against. */
81 afs_int32 ubik_quorum=0;
/* The database instance; assigned in ubik_ServerInitCommon(). */
82 struct ubik_dbase *ubik_dbase=0;
/* Statistics; ubik_EndTrans() increments ubik_stats.escapes. */
83 struct ubik_stats ubik_stats;
84 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
/* Copied into tid.epoch of each transaction created by BeginTrans(). */
85 afs_int32 ubik_epochTime = 0;
86 afs_int32 urecovery_state = 0;
/* Optional hook: when non-NULL, ubik_ServerInitCommon() calls it as
 * (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex). */
87 int (*ubik_SRXSecurityProc)();
88 char *ubik_SRXSecurityRock;
/* Head of the server list; walked through the next field. */
89 struct ubik_server *ubik_servers;
/* Set to the myPort argument of ubik_ServerInitCommon(). */
90 short ubik_callPortal;
/* Forward declaration; BeginTrans is defined later in this file. */
92 static int BeginTrans();
/* Security classes passed (with a count of 3) to rx_NewService() for the VOTE
 * and DISK services; slot 0 receives the rxnull server security object. */
94 struct rx_securityClass *ubik_sc[3];
96 /* perform an operation at a quorum, handling error conditions. return 0 if
97 all worked, otherwise mark failing server as down and return UERROR
99 Note that if any server misses an update, we must wait BIGTIME seconds before
100 allowing the transaction to commit, to ensure that the missing and possibly still
101 functioning server times out and stops handing out old data. This is done in the commit
102 code, where we wait for a server marked down to have stayed down for BIGTIME seconds
103 before we allow a transaction to commit. A server that fails but comes back up won't give
104 out old data because it is sent the sync count along with the beacon message that
105 marks it as *really* up (beaconSinceDown).
/* aflags bit for ContactQuorum(): on a successful call, copy the database
 * version of the transaction into the server record. */
107 #define CStampVersion 1 /* meaning set ts->version */
/*
 * ContactQuorum: invoke the RPC stub aproc on every server in ubik_servers
 * that is both up and marked currentDB.
 *
 *   aproc        stub called as (*aproc)(ts->disk_rxcid, &atrans->tid,
 *                aparm0 .. aparm5)
 *   atrans       transaction whose tid is sent and whose dbase version is
 *                used for CStampVersion
 *   aflags       bit mask; CStampVersion is the only bit tested here
 *   aparm0..5    passed through unchanged to aproc.  When aproc is
 *                DISK_WriteV, aparm0 is reinterpreted as an iovec_wrt pointer
 *                and aparm1 as an iovec_buf pointer.
 *
 * Servers that are down or not current are skipped and have currentDB
 * cleared.  If DISK_WriteV fails with a code in the range (-500, -450], the
 * bulk write is replayed as individual DISK_Write calls.  A server whose call
 * fails is marked down (up = 0, beaconSinceDown = 0) and
 * urecovery_LostServer() is called.  Returns 0 when okcalls + 1 (the local
 * site counts as one) reaches ubik_quorum.
 *
 * NOTE(review): the initialisation of okcalls and rcode, and the value
 * returned when the quorum is not reached, are not visible in this listing;
 * confirm against the full source.
 * NOTE(review): the aparm parameters are declared long, yet ubik_Flush()
 * passes addresses through them, which assumes a pointer fits in a long.
 * NOTE(review): several callers pass only three arguments (for example
 * ContactQuorum(DISK_Begin, tt, 0)); that relies on old-style unprototyped
 * calls and leaves the remaining aparm values indeterminate.
 */
108 afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5)
111 register struct ubik_trans *atrans;
112 long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5; {
113 register struct ubik_server *ts;
114 register afs_int32 code;
115 afs_int32 rcode, okcalls;
119 for(ts = ubik_servers; ts; ts=ts->next) {
120 /* for each server */
121 if (!ts->up || !ts->currentDB) {
122 ts->currentDB = 0; /* db is no longer current; we just missed an update */
123 continue; /* not up-to-date, don't bother */
/* forward the request to this server over its DISK connection */
125 code = (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5);
126 if ( (aproc == DISK_WriteV) && (code <= -450) && (code > -500) ) {
127 /* An RPC interface mismatch (as defined in comerr/error_msg.c).
128 * Un-bulk the entries and do individual DISK_Write calls
129 * instead of DISK_WriteV.
131 iovec_wrt *iovec_infoP = (iovec_wrt *)aparm0;
132 iovec_buf *iovec_dataP = (iovec_buf *)aparm1;
133 struct ubik_iovec *iovec = (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
134 char *iobuf = (char *)iovec_dataP->iovec_buf_val;
138 for (i=0, offset=0; i<iovec_infoP->iovec_wrt_len; i++) {
139 /* Sanity check for going off end of buffer */
140 if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
/* send entry i on its own: its bytes start at iobuf[offset] */
144 tcbs.bulkdata_len = iovec[i].length;
145 tcbs.bulkdata_val = &iobuf[offset];
146 code = DISK_Write(ts->disk_rxcid, &atrans->tid,
147 iovec[i].file, iovec[i].position, &tcbs);
150 offset += iovec[i].length;
153 if (code) { /* failure */
155 ts->up = 0; /* mark as down now; beacons will no longer be sent */
157 ts->beaconSinceDown = 0;
158 urecovery_LostServer(); /* tell recovery to try to resend dbase later */
159 } else { /* success */
161 okcalls++; /* count up how many worked */
162 if (aflags & CStampVersion) {
163 ts->version = atrans->dbase->version;
167 /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
168 if (okcalls+1 >= ubik_quorum) return 0;
172 /* This routine initializes the ubik system for a set of servers. It returns 0 for success, or an error code on failure. The set of servers is specified by serverList; nServers gives the number of entries in this array. Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below. The variable pathName provides an initial prefix used for naming storage files used by this system. It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
174 Note that the host named by myHost should not also be listed in serverList.
/*
 * ubik_ServerInitByInfo: initialise ubik from a cell configuration record.
 * Thin wrapper that forwards to ubik_ServerInitCommon() with a null
 * serverList, so the server set is described by info and clones.
 *
 *   myHost, myPort   passed through unchanged
 *   info             cell configuration (in)
 *   clones           passed through unchanged
 *   pathName         passed through unchanged (in)
 *   dbase            passed through unchanged (out)
 */
177 int ubik_ServerInitByInfo(myHost, myPort, info, clones, pathName, dbase)
178 struct afsconf_cell *info; /* in */
182 char *pathName; /* in */
183 struct ubik_dbase **dbase; /* out */
187 code = ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName, dbase);
/*
 * ubik_ServerInit: initialise ubik from an explicit server address list.
 * Thin wrapper that forwards to ubik_ServerInitCommon() with a null
 * afsconf_cell pointer and a zero clones argument.
 *
 *   myHost, myPort   passed through unchanged
 *   serverList       array of server addresses (in)
 *   pathName         passed through unchanged (in)
 *   dbase            passed through unchanged (out)
 */
191 int ubik_ServerInit(myHost, myPort, serverList, pathName, dbase)
192 afs_int32 serverList[]; /* in */
195 char *pathName; /* in */
196 struct ubik_dbase **dbase; /* out */
200 code = ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
201 serverList, pathName, dbase);
/*
 * ubik_ServerInitCommon: shared implementation behind ubik_ServerInit() and
 * ubik_ServerInitByInfo().
 *
 * In order, the visible code:
 *   - registers the ubik error table;
 *   - allocates a struct ubik_dbase, copies pathName into it, zeroes its
 *     version and cachedVersion, and installs the uphys_* functions as the
 *     storage operations (the open slot is unused and set to 0);
 *   - publishes the new structure through the global ubik_dbase and records
 *     myPort in ubik_callPortal;
 *   - fills ubik_sc[0] with the rxnull security object and, if
 *     ubik_SRXSecurityProc is set, stores the class it returns at
 *     ubik_sc[secIndex];
 *   - calls rx_Init(myPort), returning its code when negative;
 *   - creates the VOTE and DISK Rx services, each with 2 to 3 processes;
 *   - starts an rx_ServerProc LWP so incoming RPCs are served during the rest
 *     of initialisation;
 *   - calls urecovery_Initialize() and one of the ubeacon_InitServerList
 *     variants, returning any non-zero code;
 *   - starts the beacon and recovery LWPs with a stack size argument of 16384
 *     at priority LWP_MAX_PRIORITY-1.
 *
 * NOTE(review): which ubeacon_InitServerList variant runs, and where *dbase
 * is assigned, are not visible in this listing; confirm against the full
 * source.
 */
205 int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, dbase)
208 struct afsconf_cell *info; /* in */
210 afs_int32 serverList[]; /* in */
211 char *pathName; /* in */
212 struct ubik_dbase **dbase; /* out */
214 register struct ubik_dbase *tdb;
215 register afs_int32 code;
218 struct rx_securityClass *secClass;
220 struct rx_service *tservice;
221 extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
222 extern void rx_ServerProc();
223 extern int rx_stackSize;
225 initialize_U_error_table();
/* NOTE(review): neither malloc result below is checked before it is used. */
227 tdb = (struct ubik_dbase *) malloc(sizeof(struct ubik_dbase));
228 tdb->pathName = (char *) malloc(strlen(pathName)+1);
229 strcpy(tdb->pathName, pathName);
230 tdb->activeTrans = (struct ubik_trans *) 0;
231 memset(&tdb->version, 0, sizeof(struct ubik_version));
232 memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
233 Lock_Init(&tdb->versionLock);
/* install the physical-file implementation of each storage operation */
235 tdb->read = uphys_read;
236 tdb->write = uphys_write;
237 tdb->truncate = uphys_truncate;
238 tdb->open = 0; /* this function isn't used any more */
239 tdb->sync = uphys_sync;
240 tdb->stat = uphys_stat;
241 tdb->getlabel = uphys_getlabel;
242 tdb->setlabel = uphys_setlabel;
243 tdb->getnfiles = uphys_getnfiles;
245 tdb->tidCounter=tdb->writeTidCounter=0;
247 ubik_dbase = tdb; /* for now, only one db per server; can fix later when we have names for the other dbases */
250 ubik_callPortal = myPort;
251 /* try to get an additional security object */
252 ubik_sc[0] = rxnull_NewServerSecurityObject();
255 if (ubik_SRXSecurityProc) {
256 code = (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex);
/* NOTE(review): secIndex comes from the callback and indexes the 3-entry
 * ubik_sc array; no range check is visible here -- confirm. */
258 ubik_sc[secIndex] = secClass;
261 code = rx_Init(myPort);
262 if (code < 0) return code;
263 tservice = rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3, VOTE_ExecuteRequest);
264 if (tservice == (struct rx_service *)0) {
265 ubik_dprint("Could not create VOTE rx service!\n");
268 rx_SetMinProcs(tservice, 2);
269 rx_SetMaxProcs(tservice, 3);
271 tservice = rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3, DISK_ExecuteRequest);
272 if (tservice == (struct rx_service *)0) {
273 ubik_dprint("Could not create DISK rx service!\n");
276 rx_SetMinProcs(tservice, 2);
277 rx_SetMaxProcs(tservice, 3);
279 /* start an rx_ServerProc to handle incoming RPC's in particular the
280 * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
281 * the "steplock" problem in ubik initialization. Defect 11037.
283 LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
284 (void *) 0, "rx_ServerProc", &junk);
286 /* do basic initialization */
288 if (code) return code;
289 code = urecovery_Initialize(tdb);
290 if (code) return code;
292 code = ubeacon_InitServerListByInfo(myHost, info, clones);
294 code = ubeacon_InitServerList(myHost, serverList);
295 if (code) return code;
297 /* now start up async processes */
298 code = LWP_CreateProcess(ubeacon_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
299 (void *) 0, "beacon", &junk);
300 if (code) return code;
301 code = LWP_CreateProcess(urecovery_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
302 (void *) 0, "recovery", &junk);
306 /* This routine begins a read or write transaction on the transaction
307 identified by transPtr, in the dbase named by dbase. An open mode of
308 ubik_READTRANS identifies this as a read transaction, while a mode of
309 ubik_WRITETRANS identifies this as a write transaction. transPtr
310 is set to the returned transaction control block. The readAny flag is
311 set to 0 or 1 by the wrapper functions ubik_BeginTrans() or
312 ubik_BeginTransReadAny() below.
314 We can only begin a transaction when we have an up-to-date database.
/*
 * BeginTrans: common implementation of ubik_BeginTrans() and
 * ubik_BeginTransReadAny().
 *
 *   dbase       database to open the transaction on (in)
 *   transMode   UBIK_READTRANS or UBIK_WRITETRANS (in)
 *   transPtr    receives the new transaction (out)
 *   readAny     non-zero requests read-any semantics; only valid together
 *               with UBIK_READTRANS, otherwise UBADTYPE is returned
 *
 * Visible behaviour:
 *   - with UBIK_PAUSE defined, a write transaction first waits (counter
 *     starting at 75) while the DBVOTING flag is set and may give up with
 *     UNOQUORUM;
 *   - urecovery_AllBetter() must accept the database;
 *   - a write transaction waits while DBWRITING is set, so at most one write
 *     transaction is active, and must run on the sync site;
 *   - udisk_begin() creates the transaction, TRREADANY is set on it when
 *     readAny is given, and its tid is stamped with ubik_epochTime and with
 *     dbase->tidCounter advanced by two;
 *   - for a write transaction writeTidCounter is updated and DISK_Begin is
 *     sent through ContactQuorum(); if that fails DISK_Abort is sent.
 *
 * NOTE(review): the error returns, lock handling and the final store through
 * transPtr are not visible in this listing; confirm against the full source.
 */
317 static int BeginTrans(dbase, transMode, transPtr, readAny)
318 register struct ubik_dbase *dbase; /* in */
320 afs_int32 transMode; /* in */
321 struct ubik_trans **transPtr; /* out */ {
322 struct ubik_trans *jt;
323 register struct ubik_trans *tt;
324 register afs_int32 code;
325 #if defined(UBIK_PAUSE)
327 #endif /* UBIK_PAUSE */
/* read-any semantics only make sense for read transactions */
329 if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
331 #if defined(UBIK_PAUSE)
332 /* if we're polling the slave sites, wait until the returns
333 * are all in. Otherwise, the urecovery_CheckTid call may
336 if (transMode == UBIK_WRITETRANS)
337 for (count = 75; dbase->flags & DBVOTING; --count) {
339 #ifdef GRAND_PAUSE_DEBUGGING
341 fprintf (stderr,"%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n", time(0), ntohs(ubik_callPortal));
346 fprintf (stderr,"%ld: myport=%d: BeginTrans failed because of voting conflict\n", time(0), ntohs(ubik_callPortal));
348 return UNOQUORUM; /* a white lie */
353 #endif /* UBIK_PAUSE */
354 if (urecovery_AllBetter(dbase, readAny)==0) {
358 /* otherwise we have a quorum, use it */
360 /* make sure that at most one write transaction occurs at any one time. This
361 has nothing to do with transaction locking; that's enforced by the lock package. However,
362 we can't even handle two non-conflicting writes, since our log and recovery modules
363 don't know how to restore one without possibly picking up some data from the other. */
364 if (transMode == UBIK_WRITETRANS) {
365 /* if we're writing already, wait */
366 while(dbase->flags & DBWRITING) {
368 LWP_WaitProcess(&dbase->flags);
371 if (!ubeacon_AmSyncSite()) {
377 /* create the transaction */
378 code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
379 tt = jt; /* move to a register */
380 if (code || tt == (struct ubik_trans *)NULL) {
384 if (readAny) tt->flags |= TRREADANY;
385 /* label trans and dbase with new tid */
386 tt->tid.epoch = ubik_epochTime;
387 /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
388 tt->tid.counter = (dbase->tidCounter += 2);
390 if (transMode == UBIK_WRITETRANS) {
391 /* for a write trans, we have to keep track of the write tid counter too */
392 #if defined(UBIK_PAUSE)
393 dbase->writeTidCounter = tt->tid.counter;
395 dbase->writeTidCounter += 2;
396 #endif /* UBIK_PAUSE */
398 /* next try to start transaction on appropriate number of machines */
399 code = ContactQuorum(DISK_Begin, tt, 0);
401 /* we must abort the operation */
403 ContactQuorum(DISK_Abort, tt, 0); /* force aborts to the others */
/*
 * ubik_BeginTrans: start a transaction without read-any semantics.
 * Forwards to BeginTrans() with readAny = 0 and returns its result.
 *
 *   dbase       database to open the transaction on (in)
 *   transMode   transaction mode, passed through (in)
 *   transPtr    receives the new transaction (out)
 */
415 int ubik_BeginTrans(dbase, transMode, transPtr)
416 register struct ubik_dbase *dbase; /* in */
417 afs_int32 transMode; /* in */
418 struct ubik_trans **transPtr; /* out */ {
419 return BeginTrans(dbase, transMode, transPtr, 0);
/*
 * ubik_BeginTransReadAny: start a transaction with read-any semantics.
 * Forwards to BeginTrans() with readAny = 1 and returns its result;
 * BeginTrans() rejects this with UBADTYPE unless transMode is UBIK_READTRANS.
 *
 *   dbase       database to open the transaction on (in)
 *   transMode   transaction mode, passed through (in)
 *   transPtr    receives the new transaction (out)
 */
422 int ubik_BeginTransReadAny(dbase, transMode, transPtr)
423 register struct ubik_dbase *dbase; /* in */
424 afs_int32 transMode; /* in */
425 struct ubik_trans **transPtr; /* out */ {
426 return BeginTrans(dbase, transMode, transPtr, 1);
429 /* this routine ends a read or write transaction by aborting it */
/*
 * ubik_AbortTrans: abort the transaction transPtr.
 *
 * The cached version of the database is zeroed first.  The transaction is
 * then aborted locally with udisk_abort() in each of these cases: the
 * database is no longer acceptable to urecovery_AllBetter(), the transaction
 * is a read transaction, or this site is no longer the sync site.  Otherwise
 * (a write transaction on the sync site) DISK_Abort is sent through
 * ContactQuorum() before the local abort, and the remote code is returned if
 * non-zero, else the local one.
 *
 * NOTE(review): the return values of the early-exit paths and any lock
 * handling are not visible in this listing.
 */
430 int ubik_AbortTrans(transPtr)
431 register struct ubik_trans *transPtr; /* in */ {
432 register afs_int32 code;
434 register struct ubik_dbase *dbase;
436 dbase = transPtr->dbase;
/* invalidate the cached version before doing anything else */
438 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
439 /* see if we're still up-to-date */
440 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
441 udisk_abort(transPtr);
447 if (transPtr->type == UBIK_READTRANS) {
448 code = udisk_abort(transPtr);
454 /* below here, we know we're doing a write transaction */
455 if (!ubeacon_AmSyncSite()) {
456 udisk_abort(transPtr);
462 /* now it is safe to try remote abort */
463 code = ContactQuorum(DISK_Abort, transPtr, 0);
464 code2 = udisk_abort(transPtr);
/* a remote failure takes precedence over a local one */
467 return (code? code : code2);
470 /* This routine ends a read or write transaction on the open transaction identified by transPtr. It returns an error code. */
/*
 * ubik_EndTrans: commit the transaction transPtr.
 *
 * Visible behaviour:
 *   - a write transaction is flushed with ubik_Flush() first, and
 *     ubik_AbortTrans() is called on the flush-failure path;
 *   - the cached version is zeroed; if urecovery_AllBetter() rejects the
 *     database the transaction is aborted locally;
 *   - a read transaction is simply committed with udisk_commit();
 *   - a write transaction is aborted locally if this site is no longer the
 *     sync site; otherwise it is committed locally and DISK_Commit is sent
 *     through ContactQuorum() with CStampVersion;
 *   - if that fails, DISK_ReleaseLocks is sent and failure is reported;
 *   - on success the routine polls at one-second intervals until no server
 *     has both beaconSinceDown clear and a lastBeaconSent within BIGTIME of
 *     now, giving up after 10 * BIGTIME (counted in ubik_stats.escapes);
 *   - DISK_ReleaseLocks is then sent and dbase->version is copied into
 *     dbase->cachedVersion.
 *
 * NOTE(review): return statements, labels and lock handling are not visible
 * in this listing; confirm against the full source.
 */
471 int ubik_EndTrans(transPtr)
472 register struct ubik_trans *transPtr; /* in */ {
473 register afs_int32 code;
476 register struct ubik_server *ts;
478 register struct ubik_dbase *dbase;
/* push any buffered writes to the quorum before committing */
480 if (transPtr->type == UBIK_WRITETRANS) {
481 code = ubik_Flush(transPtr);
483 ubik_AbortTrans(transPtr);
488 dbase = transPtr->dbase;
490 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
492 /* give up if no longer current */
493 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
494 udisk_abort(transPtr);
500 if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
501 code = udisk_commit(transPtr);
502 if (code == 0) goto success; /* update cachedVersion correctly */
508 if (!ubeacon_AmSyncSite()) { /* no longer sync site */
509 udisk_abort(transPtr);
515 /* now it is safe to do commit */
516 code = udisk_commit(transPtr);
517 if (code == 0) code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
519 /* failed to commit, so must return failure. Try to clear locks first, just for fun
520 Note that we don't know if this transaction will eventually commit at this point.
521 If it made it to a site that will be present in the next quorum, we win, otherwise
522 we lose. If we contact a majority of sites, then we won't be here: contacting
523 a majority guarantees commit, since it guarantees that one dude will be a
524 member of the next quorum. */
525 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
530 /* before we can start sending unlock messages, we must wait until all servers
531 that are possibly still functioning on the other side of a network partition
532 have timed out. Check the server structures, compute how long to wait, then
534 realStart = FT_ApproxTime();
536 /* wait for all servers to time out */
538 now = FT_ApproxTime();
539 /* check if we're still sync site, the guy should either come up
540 to us, or timeout. Put safety check in anyway */
541 if (now - realStart > 10 * BIGTIME) {
542 ubik_stats.escapes++;
543 ubik_print("ubik escaping from commit wait\n");
546 for(ts = ubik_servers; ts; ts=ts->next) {
547 if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
548 /* this guy could have some damaged data, wait for him */
550 tv.tv_sec = 1; /* try again after a while (ha ha) */
552 IOMGR_Select(0, 0, 0, 0, &tv); /* poll, should we wait on something? */
556 if (code == 0) break; /* no down ones still pseudo-active */
559 /* finally, unlock all the dudes. We can return success independent of the number of servers
560 that really unlock the dbase; the others will do it if/when they elect a new sync site.
561 The transaction is committed anyway, since we succeeded in contacting a quorum
562 at the start (when invoking the DiskCommit function).
564 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
568 /* update version on successful EndTrans */
569 memcpy(&dbase->cachedVersion, &dbase->version, sizeof(struct ubik_version));
575 /* This routine reads length bytes into buffer from the current position in the database, and the file pointer is then advanced by length. Note that length is passed by value (it is an IN parameter, not a pointer), so the number of bytes actually transferred is not reported back to the caller. A short read returns zero for an error code. */
/*
 * ubik_Read: read from the current seek position of a transaction.
 *
 *   transPtr   transaction to read through (in)
 *   buffer     destination for the data
 *   length     number of bytes requested, passed by value (in)
 *
 * Holds the database (DBHOLD / DBRELE) for the duration.  If
 * urecovery_AllBetter() rejects the database the hold is released without
 * reading.  Otherwise udisk_read() is called for seekFile at seekPos and
 * seekPos is advanced by length.
 *
 * NOTE(review): the return statements, and whether the seekPos update is
 * conditional on the read succeeding, are not visible in this listing.
 */
577 int ubik_Read(transPtr, buffer, length)
578 register struct ubik_trans *transPtr; /* in */
579 char *buffer; /* in */
580 afs_int32 length; /* in */ {
581 register afs_int32 code;
583 /* reads are easy to do: handle locally */
584 DBHOLD(transPtr->dbase);
585 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
586 DBRELE(transPtr->dbase);
590 code = udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos, length);
592 transPtr->seekPos += length;
594 DBRELE(transPtr->dbase);
598 /* This routine will flush the io data in the iovec structures. It first
599 * flushes to the local disk and then uses ContactQuorum to write it to
/*
 * ubik_Flush: send the writes buffered on transPtr to the other servers.
 *
 *   transPtr   write transaction whose iovec_info / iovec_data buffers hold
 *              the pending writes
 *
 * Exits early when the transaction is not a write transaction, or when
 * nothing is buffered (iovec_wrt_len is zero or iovec_wrt_val is null).
 * Otherwise, with the database held, it requires an acceptable database
 * (else UNOQUORUM) and sync-site status (else UNOTSYNC), then sends the batch
 * with ContactQuorum(DISK_WriteV, ...).  On failure the transaction is
 * aborted locally and DISK_Abort is sent.  In both outcomes the two buffer
 * lengths are reset to zero so the buffers can be reused.
 *
 * NOTE(review): the values returned by the early exits and at the end are
 * not visible in this listing.
 * NOTE(review): the two buffer addresses travel through the long parameters
 * of ContactQuorum(), which assumes a pointer fits in a long.
 */
602 int ubik_Flush(transPtr)
603 struct ubik_trans *transPtr;
605 afs_int32 code, error=0;
607 if (transPtr->type != UBIK_WRITETRANS)
609 if (!transPtr->iovec_info.iovec_wrt_len || !transPtr->iovec_info.iovec_wrt_val)
612 DBHOLD(transPtr->dbase);
613 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
614 ERROR_EXIT(UNOQUORUM);
615 if (!ubeacon_AmSyncSite()) /* only sync site can write */
616 ERROR_EXIT(UNOTSYNC);
618 /* Update the rest of the servers in the quorum */
619 code = ContactQuorum(DISK_WriteV, transPtr, 0,
620 &transPtr->iovec_info, &transPtr->iovec_data);
622 udisk_abort(transPtr);
623 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
624 transPtr->iovec_info.iovec_wrt_len = 0;
625 transPtr->iovec_data.iovec_buf_len = 0;
629 /* Wrote the buffers out, so start at scratch again */
630 transPtr->iovec_info.iovec_wrt_len = 0;
631 transPtr->iovec_data.iovec_buf_len = 0;
634 DBRELE(transPtr->dbase);
/*
 * ubik_Write: write length bytes from buffer at the current seek position.
 *
 *   transPtr   write transaction (in)
 *   buffer     data to write (in)
 *   length     number of bytes to write (in)
 *
 * Visible behaviour:
 *   - a request larger than IOVEC_MAXBUF is split into chunks of at most
 *     IOVEC_MAXBUF bytes, each written by a recursive call; the first
 *     non-zero code is returned;
 *   - on first use the per-transaction batch buffers are allocated
 *     (IOVEC_MAXWRT descriptors and IOVEC_MAXBUF data bytes); if either
 *     allocation fails both are released and the pointers cleared;
 *   - if the batch already holds IOVEC_MAXWRT entries, or the new data would
 *     not fit in IOVEC_MAXBUF, ubik_Flush() is called first;
 *   - with the database held, the database must be acceptable (else
 *     UNOQUORUM) and this must be the sync site (else UNOTSYNC);
 *   - the data is written locally with udisk_write(); on failure the
 *     transaction is aborted and the batch lengths reset;
 *   - the write is then appended to the batch (file, position, length and a
 *     copy of the bytes) and seekPos advances by length.
 *
 * NOTE(review): several return statements are not visible in this listing.
 */
638 int ubik_Write(transPtr, buffer, length)
639 register struct ubik_trans *transPtr; /* in */
640 char *buffer; /* in */
641 afs_int32 length; /* in */
643 struct ubik_iovec *iovec;
644 afs_int32 code, error=0;
645 afs_int32 pos, len, size;
647 if (transPtr->type != UBIK_WRITETRANS)
/* oversized request: feed it through in IOVEC_MAXBUF sized pieces */
652 if (length > IOVEC_MAXBUF) {
653 for (pos=0, len=length; len>0; len-=size, pos+=size) {
654 size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
655 code = ubik_Write(transPtr, &buffer[pos], size);
656 if (code) return (code);
/* first write on this transaction: allocate the batch buffers */
661 if (!transPtr->iovec_info.iovec_wrt_val) {
662 transPtr->iovec_info.iovec_wrt_len = 0;
663 transPtr->iovec_info.iovec_wrt_val =
664 (struct ubik_iovec *)malloc(IOVEC_MAXWRT*sizeof(struct ubik_iovec));
665 transPtr->iovec_data.iovec_buf_len = 0;
666 transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
667 if (!transPtr->iovec_info.iovec_wrt_val || !transPtr->iovec_data.iovec_buf_val) {
668 if (transPtr->iovec_info.iovec_wrt_val) free(transPtr->iovec_info.iovec_wrt_val);
669 transPtr->iovec_info.iovec_wrt_val = 0;
670 if (transPtr->iovec_data.iovec_buf_val) free(transPtr->iovec_data.iovec_buf_val);
671 transPtr->iovec_data.iovec_buf_val = 0;
676 /* If this write won't fit in the structure, then flush it out and start anew */
677 if ( (transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT) ||
678 ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF) ) {
679 code = ubik_Flush(transPtr);
680 if (code) return (code);
683 DBHOLD(transPtr->dbase);
684 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
685 ERROR_EXIT(UNOQUORUM);
686 if (!ubeacon_AmSyncSite()) /* only sync site can write */
687 ERROR_EXIT(UNOTSYNC);
689 /* Write to the local disk */
690 code = udisk_write(transPtr, transPtr->seekFile, buffer,
691 transPtr->seekPos, length);
693 udisk_abort(transPtr);
694 transPtr->iovec_info.iovec_wrt_len = 0;
695 transPtr->iovec_data.iovec_buf_len = 0;
696 DBRELE(transPtr->dbase);
700 /* Collect writes for the other ubik servers (to be done in bulk) */
701 iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
702 iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
703 iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
704 iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
/* append the payload right after the bytes already batched */
706 memcpy(&transPtr->iovec_data.iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
708 transPtr->iovec_info.iovec_wrt_len++;
709 transPtr->iovec_data.iovec_buf_len += length;
710 transPtr->seekPos += length;
713 DBRELE(transPtr->dbase);
717 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position. Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
/*
 * ubik_Seek: set the current file and byte position of a transaction.
 *
 *   transPtr   transaction to reposition (IN)
 *   fileid     file number to select (IN)
 *   position   byte offset within that file (IN)
 *
 * With the database held, checks urecovery_AllBetter() and then stores
 * fileid and position in transPtr->seekFile and transPtr->seekPos.
 *
 * NOTE(review): the handling of a failed check and the return value are not
 * visible in this listing.
 */
719 int ubik_Seek(transPtr, fileid, position)
720 register struct ubik_trans *transPtr; /* IN */
721 afs_int32 fileid; /* IN */
722 afs_int32 position; /* IN */ {
723 register afs_int32 code;
725 DBHOLD(transPtr->dbase);
726 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
729 transPtr->seekFile = fileid;
730 transPtr->seekPos = position;
733 DBRELE(transPtr->dbase);
737 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
/*
 * ubik_Tell: report the current file and byte position of a transaction.
 *
 *   transPtr   transaction to query (IN)
 *   fileid     receives transPtr->seekFile (OUT)
 *   position   receives transPtr->seekPos (OUT)
 *
 * Both values are copied while the database is held (DBHOLD / DBRELE).
 * Unlike ubik_Seek(), no quorum check is made here.
 */
739 int ubik_Tell(transPtr, fileid, position)
740 register struct ubik_trans *transPtr; /* IN */
741 afs_int32 *fileid; /* OUT */
742 afs_int32 *position; /* OUT */ {
743 DBHOLD(transPtr->dbase);
744 *fileid = transPtr->seekFile;
745 *position = transPtr->seekPos;
746 DBRELE(transPtr->dbase);
750 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
/*
 * ubik_Truncate: truncate the currently selected file of a transaction.
 *
 *   transPtr   write transaction; transPtr->seekFile names the file (in)
 *   length     new length in bytes (in)
 *
 * Buffered writes are flushed first with ubik_Flush(), and any error from
 * that is returned.  With the database held, the database must be
 * acceptable (else UNOQUORUM) and this must be the sync site (else
 * UNOTSYNC).  The truncate is applied locally with udisk_truncate() and then
 * sent to the other servers as DISK_Truncate; on failure the transaction is
 * aborted locally and DISK_Abort is sent.
 *
 * NOTE(review): the conditions linking the local and remote steps, and the
 * final return, are not visible in this listing.
 */
752 int ubik_Truncate(transPtr, length)
753 register struct ubik_trans *transPtr; /* in */
754 afs_int32 length; /* in */ {
755 afs_int32 code, error=0;
757 /* Will also catch if not UBIK_WRITETRANS */
758 code = ubik_Flush(transPtr);
759 if (code) return(code);
761 DBHOLD(transPtr->dbase);
762 /* first, check that quorum is still good, and that dbase is up-to-date */
763 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
764 ERROR_EXIT(UNOQUORUM);
765 if (!ubeacon_AmSyncSite())
766 ERROR_EXIT(UNOTSYNC);
768 /* now do the operation locally, and propagate it out */
769 code = udisk_truncate(transPtr, transPtr->seekFile, length);
771 code = ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile, length);
774 /* we must abort the operation */
775 udisk_abort(transPtr);
776 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
781 DBRELE(transPtr->dbase);
785 /* set a lock; all locks are released on transaction end (commit/abort) */
/*
 * ubik_SetLock: take a read or write lock on behalf of a transaction.
 *
 *   atrans   transaction that will own the lock
 *   apos     not used
 *   alen     not used
 *   atype    LOCKREAD or LOCKWRITE
 *
 * No return type is written, so the function is implicitly int.
 *
 * A LOCKWRITE request on a read transaction is refused with UBADTYPE;
 * otherwise buffered writes are flushed first and a flush error is returned.
 * With the database held:
 *   - LOCKREAD is granted locally through ulock_getLock();
 *   - for the write path the database must be acceptable (else UNOQUORUM)
 *     and this must be the sync site (else UNOTSYNC); the lock is taken
 *     locally and DISK_Lock is sent through ContactQuorum(); on failure
 *     DISK_Abort is sent.
 *
 * NOTE(review): the branch structure between the read and write paths and
 * the final return are not visible in this listing.
 */
786 ubik_SetLock(atrans, apos, alen, atype)
787 struct ubik_trans *atrans;
788 afs_int32 apos, alen; /* apos and alen are not used */
790 afs_int32 code=0, error=0;
792 if (atype == LOCKWRITE) {
793 if (atrans->type == UBIK_READTRANS) return UBADTYPE;
794 code = ubik_Flush(atrans);
795 if (code) return(code);
798 DBHOLD(atrans->dbase);
799 if (atype == LOCKREAD) {
800 code = ulock_getLock(atrans, atype, 1);
801 if (code) ERROR_EXIT(code);
804 /* first, check that quorum is still good, and that dbase is up-to-date */
805 if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
806 ERROR_EXIT(UNOQUORUM);
807 if (!ubeacon_AmSyncSite())
808 ERROR_EXIT(UNOTSYNC);
810 /* now do the operation locally, and propagate it out */
811 code = ulock_getLock(atrans, atype, 1);
813 code = ContactQuorum(DISK_Lock, atrans, 0, 0,
814 1/*unused*/, 1/*unused*/, LOCKWRITE);
817 /* we must abort the operation */
819 ContactQuorum(DISK_Abort, atrans, 0); /* force aborts to the others */
825 DBRELE(atrans->dbase);
829 /* utility to wait for a version # to change */
/*
 * ubik_WaitVersion: block until the database version differs from *aversion.
 *
 *   adatabase   database whose version field is watched (first argument)
 *   aversion    version the caller already knows (second argument)
 *
 * The parameter declarations below appear in the opposite order to the
 * parameter list; the call order is (adatabase, aversion).
 *
 * While the two versions compare equal under vcmp(), the caller sleeps in
 * LWP_WaitProcess() on the address of adatabase->version.
 *
 * NOTE(review): the enclosing loop and the return are not visible in this
 * listing.
 */
830 int ubik_WaitVersion(adatabase, aversion)
831 register struct ubik_version *aversion;
832 register struct ubik_dbase *adatabase; {
834 /* wait until version # changes, and then return */
835 if (vcmp(*aversion, adatabase->version) != 0)
837 LWP_WaitProcess(&adatabase->version); /* same vers, just wait */
841 /* utility to get the version of the dbase a transaction is dealing with */
/*
 * ubik_GetVersion: copy the version of the database behind a transaction.
 *
 *   atrans   transaction whose dbase is consulted
 *   avers    receives a copy of atrans->dbase->version
 *
 * Neither pointer is checked for null.
 */
842 int ubik_GetVersion(atrans, avers)
843 register struct ubik_trans *atrans;
844 register struct ubik_version *avers; {
845 *avers = atrans->dbase->version;
849 /* Facility to simplify database caching. Returns zero if last trans was done
850 on the local server and was successful. If return value is non-zero and the
851 caller is a server caching part of the Ubik database, it should invalidate
852 that cache. A return value of -1 means bad (NULL) argument. */
/*
 * ubik_CacheUpdate: tell a caching caller whether its cache is still valid.
 *
 *   atrans   transaction whose database is examined
 *
 * Returns -1 if atrans or atrans->dbase is null.  Otherwise returns 1 when
 * the cachedVersion of the database differs from its version under vcmp(),
 * and 0 when they match.
 */
854 int ubik_CacheUpdate (atrans)
855 register struct ubik_trans *atrans;
857 if (!(atrans && atrans->dbase)) return -1;
858 return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
/*
 * panic: report a fatal ubik error and terminate the process.
 *
 *   a         passed to ubik_print() as its first argument (presumably a
 *             printf-style format string -- confirm against ubik_print)
 *   b, c, d   passed to ubik_print() after a
 *
 * Prints the prefix and the message through ubik_print(), and finally calls
 * exit(1).  The closing message implies an abort call precedes it.
 *
 * NOTE(review): that abort call and the parameter declarations are not
 * visible in this listing.
 */
861 int panic(a, b, c, d)
864 ubik_print("Ubik PANIC: ");
865 ubik_print(a, b, c, d);
867 ubik_print("BACK FROM ABORT\n"); /* shouldn't come back */
868 exit(1); /* never know, though */
872 ** This function takes an IP address as its parameter. It returns the
873 ** primary IP address of the host that the address passed in belongs to.
/*
 * ubikGetPrimaryInterfaceAddr: map any interface address of a known server
 * to the first address of that server.
 *
 *   addr   address to look up, in network byte order
 *
 * Scans every entry of ubik_servers and each of its UBIK_MAX_INTERFACE_ADDR
 * address slots.  On the first match it returns addr[0] of that server (in
 * network byte order); if no server lists the address it returns 0.
 */
876 ubikGetPrimaryInterfaceAddr(addr)
877 afs_uint32 addr; /* network byte order */
879 struct ubik_server *ts;
882 for ( ts=ubik_servers; ts; ts=ts->next )
883 for ( j=0; j < UBIK_MAX_INTERFACE_ADDR; j++)
884 if ( ts->addr[j] == addr )
885 return ts->addr[0]; /* net byte order */
886 return 0; /* if not in server database, return error */