2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <sys/types.h>
21 #include <netinet/in.h>
22 #include <sys/param.h>
29 #include <afs/cellconfig.h>
31 #define UBIK_INTERNALS
35 #include <lwp.h> /* temporary hack by klm */
// ERROR_EXIT(code): store code in the caller's local `error` variable and
// jump to the caller's `error_exit` label. ubik_Flush, ubik_Write,
// ubik_Truncate and ubik_SetLock declare `error` and use this macro; their
// `error_exit` labels are not visible in this listing.
// NOTE(review): the expansion is a bare brace block, so writing
// `if (c) ERROR_EXIT(x); else ...` would fail to compile; wrapping the body
// in do { ... } while (0) would make it behave like a single statement.
37 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
39 /* This system is organized in a hierarchical set of related modules. Modules
40 at one level can only call modules at the same level or below.
42 At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
43 operating system primitives.
45 At the next level (1) we have
47 VOTER--The module responsible for casting votes when asked. It is also
48 responsible for determining whether this server should try to become
49 a synchronization site.
51 BEACONER--The module responsible for sending keep-alives out when a
52 server is actually the sync site, or trying to become a sync site.
54 DISK--The module responsible for representing atomic transactions
55 on the local disk. It maintains a new-value only log.
57 LOCK--The module responsible for locking byte ranges in the database file.
59 At the next level (2) we have
61 RECOVERY--The module responsible for ensuring that all members of a quorum
62 have the same up-to-date database after a new synchronization site is
63 elected. This module runs only on the synchronization site.
65 At the next level (3) we have
67 REMOTE--The module responsible for interpreting requests from the sync
68 site and applying them to the database, after obtaining the appropriate
71 At the next level (4) we have
73 UBIK--The module users call to perform operations on the database.
// Module-wide ubik state, defined here and read or written by the functions
// below (and presumably by the other ubik modules as well).
// ubik_quorum: ContactQuorum succeeds once okcalls + 1 >= ubik_quorum.
78 afs_int32 ubik_quorum = 0;
// ubik_dbase: the single database handle, set by ubik_ServerInitCommon.
79 struct ubik_dbase *ubik_dbase = 0;
// ubik_stats: counters; ubik_EndTrans increments ubik_stats.escapes.
80 struct ubik_stats ubik_stats;
81 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
// ubik_epochTime: copied into tid.epoch of each new transaction by BeginTrans.
82 afs_int32 ubik_epochTime = 0;
83 afs_int32 urecovery_state = 0;
// Optional hook: when set, ubik_ServerInitCommon calls it (passing
// ubik_SRXSecurityRock as the first argument) to obtain an additional RX
// security class, which it stores into ubik_sc[secIndex].
84 int (*ubik_SRXSecurityProc) ();
85 char *ubik_SRXSecurityRock;
// ubik_servers: linked list (via ->next) of peer servers, walked by
// ContactQuorum, ubik_EndTrans and ubikGetPrimaryInterfaceAddr. Presumably
// it excludes this host, since ContactQuorum counts itself separately (+1).
86 struct ubik_server *ubik_servers;
// ubik_callPortal: the RX port given to ubik_ServerInitCommon (myPort).
87 short ubik_callPortal;
89 static int BeginTrans();
// Security classes offered by the VOTE and DISK RX services (3 slots).
// Slot 0 is the rxnull class; another slot may be filled by
// ubik_SRXSecurityProc during ubik_ServerInitCommon.
91 struct rx_securityClass *ubik_sc[3];
93 /* perform an operation at a quorum, handling error conditions. return 0 if
94 all worked, otherwise mark failing server as down and return UERROR
96 Note that if any server misses an update, we must wait BIGTIME seconds before
97 letting the committed transaction release its locks, to ensure that the missing and possibly still
98 functioning server times out and stops handing out old data. This is done in the commit
99 code (ubik_EndTrans), where we wait until BIGTIME seconds have passed since the last beacon was sent to each down server
100 before we release the transaction's locks and return. A server that fails but comes back up won't give
101 out old data because it is sent the sync count along with the beacon message that
102 marks it as *really* up (beaconSinceDown).
// aflags bit for ContactQuorum: after a successful call to a peer, record the
// database version in that peer's ts->version.
104 #define CStampVersion 1 /* meaning set ts->version */
// ContactQuorum: call the DISK RPC `aproc` on every peer in ubik_servers that
// is up and holds a current database. Each call gets the peer's disk_rxcid,
// the transaction id &atrans->tid and the six long parameters aparm0..aparm5.
// A peer whose call fails is marked down (ts->up = 0, beaconSinceDown = 0),
// and urecovery_LostServer() is called. Returns 0 when okcalls + 1 (the +1
// counts this host, whose work was done locally) reaches ubik_quorum.
// Otherwise it returns an error code (presumably rcode; that return is not
// visible here).
106 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4,
110 register struct ubik_trans *atrans;
// NOTE(review): callers pass pointers (e.g. the iovec structures from
// ubik_Flush) through these `long` parameters. That assumes a pointer fits in
// a long, which does not hold on LLP64 platforms.
111 long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5;
113 register struct ubik_server *ts;
114 register afs_int32 code;
115 afs_int32 rcode, okcalls;
119 for (ts = ubik_servers; ts; ts = ts->next) {
120 /* for each server */
// A skipped peer has currentDB cleared, so later calls keep skipping it until
// something outside this function marks it current again.
121 if (!ts->up || !ts->currentDB) {
122 ts->currentDB = 0; /* db is no longer current; we just missed an update */
123 continue; /* not up-to-date, don't bother */
126 (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2,
127 aparm3, aparm4, aparm5);
// Codes -499..-450 from a DISK_WriteV call are treated as "peer does not
// support WriteV"; the batch is then replayed entry by entry with DISK_Write.
128 if ((aproc == DISK_WriteV) && (code <= -450) && (code > -500)) {
129 /* An RPC interface mismatch (as defined in comerr/error_msg.c).
130 * Un-bulk the entries and do individual DISK_Write calls
131 * instead of DISK_WriteV.
// aparm0 and aparm1 are the iovec_wrt descriptor list and the iovec_buf data
// buffer that ubik_Flush passes in.
133 iovec_wrt *iovec_infoP = (iovec_wrt *) aparm0;
134 iovec_buf *iovec_dataP = (iovec_buf *) aparm1;
135 struct ubik_iovec *iovec =
136 (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
137 char *iobuf = (char *)iovec_dataP->iovec_buf_val;
// ubik_Write packs the entries' bytes back to back in iobuf, so offset is the
// running sum of the lengths of the entries already replayed.
141 for (i = 0, offset = 0; i < iovec_infoP->iovec_wrt_len; i++) {
142 /* Sanity check for going off end of buffer */
143 if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
147 tcbs.bulkdata_len = iovec[i].length;
148 tcbs.bulkdata_val = &iobuf[offset];
150 DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
151 iovec[i].position, &tcbs);
155 offset += iovec[i].length;
158 if (code) { /* failure */
160 ts->up = 0; /* mark as down now; beacons will no longer be sent */
162 ts->beaconSinceDown = 0;
163 urecovery_LostServer(); /* tell recovery to try to resend dbase later */
164 } else { /* success */
166 okcalls++; /* count up how many worked */
167 if (aflags & CStampVersion) {
168 ts->version = atrans->dbase->version;
172 /* return 0 if we successfully contacted a quorum, otherwise return error code. We don't have to contact ourselves (that was done locally) */
173 if (okcalls + 1 >= ubik_quorum)
179 /* This routine initializes the ubik system for a set of servers. It returns 0 for success, or an error code on failure. The set of servers is specified by serverList; nServers gives the number of entries in this array. Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below. The variable pathName provides an initial prefix used for naming storage files used by this system. It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
181 Note that the host named by myHost should not also be listed in serverList.
// ubik_ServerInitCommon: shared body of ubik_ServerInit and
// ubik_ServerInitByInfo. It performs these steps:
//   - allocates and fills in the single ubik_dbase, using the uphys_*
//     routines for physical I/O and pathName as its file-name prefix;
//   - calls rx_Init(myPort);
//   - registers the VOTE and DISK RX services with the security classes in
//     ubik_sc;
//   - starts an rx_ServerProc thread/process;
//   - initialises recovery and the beacon server list, either from
//     info/clones (ubeacon_InitServerListByInfo) or from serverList
//     (ubeacon_InitServerList); the selecting condition is not visible here;
//   - starts the ubeacon_Interact and urecovery_Interact threads/processes.
// Returns 0 on success. Most error-return lines are not visible in this
// listing.
185 ubik_ServerInitCommon(afs_int32 myHost, short myPort,
186 struct afsconf_cell *info, char clones[],
187 afs_int32 serverList[], char *pathName,
188 struct ubik_dbase **dbase)
190 register struct ubik_dbase *tdb;
191 register afs_int32 code;
192 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
193 pthread_t rxServerThread; /* pthread variables */
194 pthread_t ubeacon_InteractThread;
195 pthread_t urecovery_InteractThread;
196 pthread_attr_t rxServer_tattr;
197 pthread_attr_t ubeacon_Interact_tattr;
198 pthread_attr_t urecovery_Interact_tattr;
204 struct rx_securityClass *secClass;
206 struct rx_service *tservice;
207 extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
208 extern int rx_stackSize;
210 initialize_U_error_table();
// NOTE(review): neither malloc() result is checked. On allocation failure the
// stores that follow dereference NULL.
212 tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
213 tdb->pathName = (char *)malloc(strlen(pathName) + 1);
214 strcpy(tdb->pathName, pathName);
215 tdb->activeTrans = (struct ubik_trans *)0;
216 memset(&tdb->version, 0, sizeof(struct ubik_version));
217 memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
218 Lock_Init(&tdb->versionLock);
// Physical storage operations for this database: all are the uphys_* routines.
220 tdb->read = uphys_read;
221 tdb->write = uphys_write;
222 tdb->truncate = uphys_truncate;
223 tdb->open = uphys_invalidate; /* this function isn't used any more */
224 tdb->sync = uphys_sync;
225 tdb->stat = uphys_stat;
226 tdb->getlabel = uphys_getlabel;
227 tdb->setlabel = uphys_setlabel;
228 tdb->getnfiles = uphys_getnfiles;
// Transaction counters start at 0; BeginTrans advances them in steps of 2.
230 tdb->tidCounter = tdb->writeTidCounter = 0;
232 ubik_dbase = tdb; /* for now, only one db per server; can fix later when we have names for the other dbases */
236 /* the following call is idempotent so when/if it got called earlier,
237 * by whatever called us, it doesn't really matter -- klm */
238 code = rx_Init(myPort);
242 ubik_callPortal = myPort;
243 /* try to get an additional security object */
244 ubik_sc[0] = rxnull_NewServerSecurityObject();
247 if (ubik_SRXSecurityProc) {
249 (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
252 ubik_sc[secIndex] = secClass;
255 /* for backwards compat this should keep working as it does now
258 /* This really needs to be up above, where I have put it. It works
259 * here when we're non-pthreaded, but the code above, when using
260 * pthreads may (and almost certainly does) end up calling on a
261 * pthread resource which gets initialized by rx_Init. The end
262 * result is that an assert fails and the program dies. -- klm
// This second rx_Init call presumably sits on a different preprocessor branch
// than the one above (the conditionals are not visible here).
264 code = rx_Init(myPort);
// VOTE and DISK services: both offer the 3 classes in ubik_sc and run with
// between 2 and 3 server procs.
270 rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
271 VOTE_ExecuteRequest);
272 if (tservice == (struct rx_service *)0) {
273 ubik_dprint("Could not create VOTE rx service!\n");
276 rx_SetMinProcs(tservice, 2);
277 rx_SetMaxProcs(tservice, 3);
280 rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
281 DISK_ExecuteRequest);
282 if (tservice == (struct rx_service *)0) {
283 ubik_dprint("Could not create DISK rx service!\n");
286 rx_SetMinProcs(tservice, 2);
287 rx_SetMaxProcs(tservice, 3);
289 /* start an rx_ServerProc to handle incoming RPC's in particular the
290 * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
291 * the "steplock" problem in ubik initialization. Defect 11037.
// NOTE(review): here and below, pthread_attr_init, pthread_attr_setdetachstate
// and pthread_create are called only inside assert(). A build with NDEBUG
// defined would compile the calls away and never start these threads.
293 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
294 /* do assert stuff */
295 assert(pthread_attr_init(&rxServer_tattr) == 0);
296 assert(pthread_attr_setdetachstate(&rxServer_tattr, PTHREAD_CREATE_DETACHED) == 0);
297 /* assert(pthread_attr_setstacksize(&rxServer_tattr, rx_stackSize) == 0); */
299 assert(pthread_create(&rxServerThread, &rxServer_tattr, (void *)rx_ServerProc, NULL) == 0);
301 LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
302 (void *)0, "rx_ServerProc", &junk);
305 /* do basic initialization */
309 code = urecovery_Initialize(tdb);
// Server list comes either from the cell info (with clones) or from the
// explicit serverList, depending on which entry point called us.
313 code = ubeacon_InitServerListByInfo(myHost, info, clones);
315 code = ubeacon_InitServerList(myHost, serverList);
319 /* now start up async processes */
320 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
321 /* do assert stuff */
322 assert(pthread_attr_init(&ubeacon_Interact_tattr) == 0);
323 assert(pthread_attr_setdetachstate(&ubeacon_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
324 /* assert(pthread_attr_setstacksize(&ubeacon_Interact_tattr, 16384) == 0); */
325 /* need another attr set here for priority??? - klm */
327 assert(pthread_create(&ubeacon_InteractThread, &ubeacon_Interact_tattr,
328 (void *)ubeacon_Interact, NULL) == 0);
330 code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
331 LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
337 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
338 /* do assert stuff */
339 assert(pthread_attr_init(&urecovery_Interact_tattr) == 0);
340 assert(pthread_attr_setdetachstate(&urecovery_Interact_tattr, PTHREAD_CREATE_DETACHED) == 0);
341 /* assert(pthread_attr_setstacksize(&urecovery_Interact_tattr, 16384) == 0); */
342 /* need another attr set here for priority??? - klm */
344 assert(pthread_create(&urecovery_InteractThread, &urecovery_Interact_tattr,
345 (void *)urecovery_Interact, NULL) == 0);
// The pthread build returns success here, once both interaction threads have
// been started. The LWP code below (presumably the other preprocessor branch)
// creates the recovery process instead.
347 return 0; /* is this correct? - klm */
349 code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
350 LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
// Initialise ubik for the servers described by a cell entry (info, plus the
// clones array, which is passed through to the beacon module) rather than an
// explicit address list. Delegates to ubik_ServerInitCommon with
// serverList = 0.
358 ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
359 struct afsconf_cell *info, char clones[],
360 char *pathName, struct ubik_dbase **dbase)
365 ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
// Initialise ubik from an explicit list of peer addresses (serverList, which
// should not include myHost). Delegates to ubik_ServerInitCommon with no cell
// info and no clones.
371 ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
372 char *pathName, struct ubik_dbase **dbase)
377 ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
378 serverList, pathName, dbase);
382 /* This routine begins a read or write transaction on the transaction
383 identified by transPtr, in the dbase named by dbase. An open mode of
384 ubik_READTRANS identifies this as a read transaction, while a mode of
385 ubik_WRITETRANS identifies this as a write transaction. transPtr
386 is set to the returned transaction control block. The readAny flag is
387 set to 0 or 1 by the wrapper functions ubik_BeginTrans() or
388 ubik_BeginTransReadAny() below.
390 We can only begin transaction when we have an up-to-date database.
// BeginTrans: common body of ubik_BeginTrans (readAny = 0) and
// ubik_BeginTransReadAny (readAny = 1). It creates a local transaction with
// tid.epoch = ubik_epochTime and tid.counter = dbase->tidCounter after it is
// bumped by 2. Write transactions are serialised on DBWRITING and are also
// started on a quorum of peers via ContactQuorum(DISK_Begin). The new
// transaction is presumably returned through *transPtr (that store is not
// visible here).
394 BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
395 struct ubik_trans **transPtr, int readAny)
397 struct ubik_trans *jt;
398 register struct ubik_trans *tt;
399 register afs_int32 code;
400 #if defined(UBIK_PAUSE)
402 #endif /* UBIK_PAUSE */
// readAny is only accepted together with UBIK_READTRANS (the rejecting return
// is not visible here).
404 if ((transMode != UBIK_READTRANS) && readAny)
407 #if defined(UBIK_PAUSE)
408 /* if we're polling the slave sites, wait until the returns
409 * are all in. Otherwise, the urecovery_CheckTid call may
// A write transaction waits while DBVOTING is set. The loop counts down from
// 75 and reports failure as UNOQUORUM (the exact exit test is not visible
// here).
412 if (transMode == UBIK_WRITETRANS)
413 for (count = 75; dbase->flags & DBVOTING; --count) {
415 #ifdef GRAND_PAUSE_DEBUGGING
418 "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
419 time(0), ntohs(ubik_callPortal));
425 "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
426 time(0), ntohs(ubik_callPortal));
428 return UNOQUORUM; /* a white lie */
430 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
437 #endif /* UBIK_PAUSE */
// Refuse to start unless recovery reports the database usable for this mode
// (readAny relaxes the requirement inside urecovery_AllBetter).
438 if (urecovery_AllBetter(dbase, readAny) == 0) {
442 /* otherwise we have a quorum, use it */
444 /* make sure that at most one write transaction occurs at any one time. This
445 * has nothing to do with transaction locking; that's enforced by the lock package. However,
446 * we can't even handle two non-conflicting writes, since our log and recovery modules
447 * don't know how to restore one without possibly picking up some data from the other. */
448 if (transMode == UBIK_WRITETRANS) {
449 /* if we're writing already, wait */
// NOTE(review): in the pthread build DBWRITING is tested without holding
// flags_mutex, and the mutex is held only around the wait. A wakeup signalled
// between the test and pthread_cond_wait() is lost, and the writer sleeps
// until the next signal. Testing the flag under flags_mutex would close that
// window. The waits are also inside assert(), so they vanish under NDEBUG.
450 while (dbase->flags & DBWRITING) {
452 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
453 assert(pthread_mutex_lock(&dbase->flags_mutex) == 0);
454 assert(pthread_cond_wait(&dbase->flags_cond, &dbase->flags_mutex) == 0);
455 assert(pthread_mutex_unlock(&dbase->flags_mutex) == 0);
457 LWP_WaitProcess(&dbase->flags);
// Only the sync site may proceed with a write transaction (the handling of
// the non-sync-site case is not visible here).
461 if (!ubeacon_AmSyncSite()) {
467 /* create the transaction */
468 code = udisk_begin(dbase, transMode, &jt); /* can't take address of register var */
469 tt = jt; /* move to a register */
470 if (code || tt == (struct ubik_trans *)NULL) {
// Presumably guarded by readAny (the condition line is not visible here).
475 tt->flags |= TRREADANY;
476 /* label trans and dbase with new tid */
477 tt->tid.epoch = ubik_epochTime;
478 /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
479 tt->tid.counter = (dbase->tidCounter += 2);
481 if (transMode == UBIK_WRITETRANS) {
482 /* for a write trans, we have to keep track of the write tid counter too */
483 #if defined(UBIK_PAUSE)
484 dbase->writeTidCounter = tt->tid.counter;
486 dbase->writeTidCounter += 2;
487 #endif /* UBIK_PAUSE */
489 /* next try to start transaction on appropriate number of machines */
490 code = ContactQuorum(DISK_Begin, tt, 0);
492 /* we must abort the operation */
494 ContactQuorum(DISK_Abort, tt, 0); /* force aborts to the others */
// Public entry point: begin a read or write transaction that requires an
// up-to-date database (BeginTrans with readAny = 0).
507 ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
508 struct ubik_trans **transPtr)
510 return BeginTrans(dbase, transMode, transPtr, 0);
// Public entry point: begin a transaction with readAny = 1, which BeginTrans
// accepts only for UBIK_READTRANS and which marks the transaction TRREADANY.
514 ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
515 struct ubik_trans **transPtr)
517 return BeginTrans(dbase, transMode, transPtr, 1);
520 /* this routine ends a read or write transaction by aborting it */
// ubik_AbortTrans: abort the transaction locally (udisk_abort). A write
// transaction on the sync site is also aborted on the peers via
// ContactQuorum(DISK_Abort). dbase->cachedVersion is zeroed first, so
// ubik_CacheUpdate will report a version mismatch afterwards. On the full
// write path it returns the remote abort's error if there is one, otherwise
// the local abort's result.
522 ubik_AbortTrans(register struct ubik_trans *transPtr)
524 register afs_int32 code;
526 register struct ubik_dbase *dbase;
528 dbase = transPtr->dbase;
530 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
531 /* see if we're still up-to-date */
532 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
533 udisk_abort(transPtr);
539 if (transPtr->type == UBIK_READTRANS) {
540 code = udisk_abort(transPtr);
546 /* below here, we know we're doing a write transaction */
547 if (!ubeacon_AmSyncSite()) {
548 udisk_abort(transPtr);
554 /* now it is safe to try remote abort */
555 code = ContactQuorum(DISK_Abort, transPtr, 0);
556 code2 = udisk_abort(transPtr);
// Prefer reporting the remote failure; fall back to the local result.
559 return (code ? code : code2);
562 /* This routine ends a read or write transaction on the open transaction identified by transPtr. It returns an error code. */
// ubik_EndTrans: commit the transaction and return an error code.
// - Read transactions just commit locally.
// - Write transactions first flush their buffered writes (ubik_Flush) and
//   are aborted if that presumably fails. They then commit locally and on a
//   quorum via ContactQuorum(DISK_Commit, ..., CStampVersion). If the quorum
//   commit fails, the peers' locks are released and (presumably) an error is
//   returned.
// - After a successful write commit, the routine waits for down peers that
//   may still serve old data to time out, then releases the locks on the
//   peers.
// On success, dbase->version is copied into dbase->cachedVersion.
564 ubik_EndTrans(register struct ubik_trans *transPtr)
566 register afs_int32 code;
569 register struct ubik_server *ts;
571 register struct ubik_dbase *dbase;
573 if (transPtr->type == UBIK_WRITETRANS) {
574 code = ubik_Flush(transPtr);
576 ubik_AbortTrans(transPtr);
581 dbase = transPtr->dbase;
// Invalidate the cached version up front. It is only restored on the success
// path at the end, so the failure paths leave ubik_CacheUpdate reporting a
// mismatch.
583 memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
585 /* give up if no longer current */
586 if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
587 udisk_abort(transPtr);
593 if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
594 code = udisk_commit(transPtr);
596 goto success; /* update cachedVersion correctly */
602 if (!ubeacon_AmSyncSite()) { /* no longer sync site */
603 udisk_abort(transPtr);
609 /* now it is safe to do commit */
610 code = udisk_commit(transPtr);
// Commit on the peers; CStampVersion records the new version in each peer
// that accepts the commit.
612 code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
614 /* failed to commit, so must return failure. Try to clear locks first, just for fun
615 * Note that we don't know if this transaction will eventually commit at this point.
616 * If it made it to a site that will be present in the next quorum, we win, otherwise
617 * we lose. If we contact a majority of sites, then we won't be here: contacting
618 * a majority guarantees commit, since it guarantees that one dude will be a
619 * member of the next quorum. */
620 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
625 /* before we can start sending unlock messages, we must wait until all servers
626 * that are possibly still functioning on the other side of a network partition
627 * have timed out. Check the server structures, compute how long to wait, then
628 * start the unlocks */
// Wait loop (its loop header is not visible here). Each pass re-reads the
// clock, then:
//   - after more than 10 * BIGTIME seconds it counts an escape and stops
//     waiting;
//   - otherwise it sleeps one second while any peer with beaconSinceDown
//     still 0 was sent a beacon within the last BIGTIME seconds;
//   - it stops once no such peer remains.
629 realStart = FT_ApproxTime();
631 /* wait for all servers to time out */
633 now = FT_ApproxTime();
634 /* check if we're still sync site, the guy should either come up
635 * to us, or timeout. Put safety check in anyway */
636 if (now - realStart > 10 * BIGTIME) {
637 ubik_stats.escapes++;
638 ubik_print("ubik escaping from commit wait\n");
641 for (ts = ubik_servers; ts; ts = ts->next) {
642 if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
643 /* this guy could have some damaged data, wait for him */
645 tv.tv_sec = 1; /* try again after a while (ha ha) */
// The pthread build sleeps with select(); the LWP build yields via IOMGR_Select.
647 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
648 select(0, 0, 0, 0, &tv);
650 IOMGR_Select(0, 0, 0, 0, &tv); /* poll, should we wait on something? */
656 break; /* no down ones still pseudo-active */
659 /* finally, unlock all the dudes. We can return success independent of the number of servers
660 * that really unlock the dbase; the others will do it if/when they elect a new sync site.
661 * The transaction is committed anyway, since we succeeded in contacting a quorum
662 * at the start (when invoking the DiskCommit function).
664 ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
668 /* update version on successful EndTrans */
669 memcpy(&dbase->cachedVersion, &dbase->version,
670 sizeof(struct ubik_version));
676 /* This routine reads length bytes into buffer from the current position in the database and advances the file pointer (seekPos) by length. Note that length is passed by value, so the routine does not report back how many bytes were actually transferred. A short read returns zero for an error code. */
// ubik_Read: read length bytes at the transaction's seek position
// (seekFile/seekPos) from the local copy of the database into buffer, then
// advance seekPos by length. The read runs under DBHOLD/DBRELE. It is refused
// unless urecovery_AllBetter() accepts the database, honouring TRREADANY (the
// error code returned in that case is not visible here).
679 ubik_Read(register struct ubik_trans *transPtr, char *buffer,
682 register afs_int32 code;
684 /* reads are easy to do: handle locally */
685 DBHOLD(transPtr->dbase);
686 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
687 DBRELE(transPtr->dbase);
692 udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
695 transPtr->seekPos += length;
697 DBRELE(transPtr->dbase);
701 /* This routine flushes the io data buffered in the iovec structures. The
702 * local disk was already written by ubik_Write; this uses ContactQuorum to write it to
// ubik_Flush: send the writes buffered by ubik_Write to the peers with a
// single ContactQuorum(DISK_WriteV) call. The buffered writes are the
// iovec_info entries plus the packed bytes in iovec_data; the local disk was
// already updated by ubik_Write. Non-write transactions and empty buffers
// return early (the returned values are not visible here). The flush requires
// quorum and sync-site status (ERROR_EXIT otherwise). If the WriteV quorum
// fails, the transaction is aborted locally and on the peers. After the
// WriteV call, successful or not, the buffers are emptied.
706 ubik_Flush(struct ubik_trans *transPtr)
708 afs_int32 code, error = 0;
710 if (transPtr->type != UBIK_WRITETRANS)
712 if (!transPtr->iovec_info.iovec_wrt_len
713 || !transPtr->iovec_info.iovec_wrt_val)
716 DBHOLD(transPtr->dbase);
717 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
718 ERROR_EXIT(UNOQUORUM);
719 if (!ubeacon_AmSyncSite()) /* only sync site can write */
720 ERROR_EXIT(UNOTSYNC);
722 /* Update the rest of the servers in the quorum */
724 ContactQuorum(DISK_WriteV, transPtr, 0, &transPtr->iovec_info,
725 &transPtr->iovec_data);
// Quorum failure path: abort everywhere and discard the buffered writes.
727 udisk_abort(transPtr);
728 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
729 transPtr->iovec_info.iovec_wrt_len = 0;
730 transPtr->iovec_data.iovec_buf_len = 0;
734 /* Wrote the buffers out, so start at scratch again */
735 transPtr->iovec_info.iovec_wrt_len = 0;
736 transPtr->iovec_data.iovec_buf_len = 0;
739 DBRELE(transPtr->dbase);
// ubik_Write: write length bytes from buffer at the transaction's seek
// position. Writes larger than IOVEC_MAXBUF are split into IOVEC_MAXBUF-sized
// pieces by recursion. Each piece is written to the local disk immediately
// (udisk_write). It is also appended to the transaction's iovec buffers, which
// ubik_Flush later sends to the peers in bulk. seekPos advances by length.
744 ubik_Write(register struct ubik_trans *transPtr, char *buffer,
747 struct ubik_iovec *iovec;
748 afs_int32 code, error = 0;
749 afs_int32 pos, len, size;
751 if (transPtr->type != UBIK_WRITETRANS)
// Split oversized writes so that each recursive call fits in one buffer.
756 if (length > IOVEC_MAXBUF) {
757 for (pos = 0, len = length; len > 0; len -= size, pos += size) {
758 size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
759 code = ubik_Write(transPtr, &buffer[pos], size);
// Lazily allocate the descriptor array (IOVEC_MAXWRT entries) and the data
// buffer (IOVEC_MAXBUF bytes) on the first write. If either allocation fails,
// both are released and the pointers reset.
766 if (!transPtr->iovec_info.iovec_wrt_val) {
767 transPtr->iovec_info.iovec_wrt_len = 0;
768 transPtr->iovec_info.iovec_wrt_val =
769 (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
770 sizeof(struct ubik_iovec));
771 transPtr->iovec_data.iovec_buf_len = 0;
772 transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
773 if (!transPtr->iovec_info.iovec_wrt_val
774 || !transPtr->iovec_data.iovec_buf_val) {
775 if (transPtr->iovec_info.iovec_wrt_val)
776 free(transPtr->iovec_info.iovec_wrt_val);
777 transPtr->iovec_info.iovec_wrt_val = 0;
778 if (transPtr->iovec_data.iovec_buf_val)
779 free(transPtr->iovec_data.iovec_buf_val);
780 transPtr->iovec_data.iovec_buf_val = 0;
// The split above keeps length <= IOVEC_MAXBUF. This check keeps the append
// below within IOVEC_MAXWRT entries and IOVEC_MAXBUF bytes, assuming a failed
// flush returns early (that return is not visible here).
785 /* If this write won't fit in the structure, then flush it out and start anew */
786 if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
787 || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
788 code = ubik_Flush(transPtr);
793 DBHOLD(transPtr->dbase);
794 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
795 ERROR_EXIT(UNOQUORUM);
796 if (!ubeacon_AmSyncSite()) /* only sync site can write */
797 ERROR_EXIT(UNOTSYNC);
799 /* Write to the local disk */
801 udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
// Local write failure path: abort locally and discard buffered writes.
804 udisk_abort(transPtr);
805 transPtr->iovec_info.iovec_wrt_len = 0;
806 transPtr->iovec_data.iovec_buf_len = 0;
807 DBRELE(transPtr->dbase);
811 /* Collect writes for the other ubik servers (to be done in bulk) */
812 iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
813 iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
814 iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
815 iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
// Bytes are packed back to back: this entry's data starts at the current end
// of the data buffer.
817 memcpy(&transPtr->iovec_data.
818 iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
820 transPtr->iovec_info.iovec_wrt_len++;
821 transPtr->iovec_data.iovec_buf_len += length;
822 transPtr->seekPos += length;
825 DBRELE(transPtr->dbase);
829 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position. Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
// ubik_Seek: set the transaction's file pointer to (fileid, position) under
// DBHOLD/DBRELE. The update is refused when urecovery_AllBetter() rejects the
// database (the error set in that branch is not visible here).
832 ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
835 register afs_int32 code;
837 DBHOLD(transPtr->dbase);
838 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
841 transPtr->seekFile = fileid;
842 transPtr->seekPos = position;
845 DBRELE(transPtr->dbase);
849 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
// ubik_Tell: copy the transaction's current seek file and byte position into
// *fileid and *position, under DBHOLD/DBRELE.
852 ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
853 afs_int32 * position)
855 DBHOLD(transPtr->dbase);
856 *fileid = transPtr->seekFile;
857 *position = transPtr->seekPos;
858 DBRELE(transPtr->dbase);
862 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
// ubik_Truncate: truncate the file selected by ubik_Seek (seekFile) to length
// bytes, locally (udisk_truncate) and on the peers (DISK_Truncate). Buffered
// writes are flushed first. The operation requires quorum and sync-site
// status. If the peers fail, the transaction is aborted locally and on the
// peers.
865 ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
867 afs_int32 code, error = 0;
869 /* Will also catch if not UBIK_WRITETRANS */
870 code = ubik_Flush(transPtr);
874 DBHOLD(transPtr->dbase);
875 /* first, check that quorum is still good, and that dbase is up-to-date */
876 if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
877 ERROR_EXIT(UNOQUORUM);
878 if (!ubeacon_AmSyncSite())
879 ERROR_EXIT(UNOTSYNC);
881 /* now do the operation locally, and propagate it out */
882 code = udisk_truncate(transPtr, transPtr->seekFile, length);
885 ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile,
889 /* we must abort the operation */
890 udisk_abort(transPtr);
891 ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
896 DBRELE(transPtr->dbase);
900 /* set a lock; all locks are released on transaction end (commit/abort) */
// ubik_SetLock: take a lock of type atype (LOCKREAD or LOCKWRITE) for the
// transaction.
// - Read locks are taken locally only (ulock_getLock).
// - Write locks are refused for read transactions (the error returned is not
//   visible here). Otherwise buffered writes are flushed first, and the lock
//   requires quorum and sync-site status. It is taken locally and then on
//   the peers via ContactQuorum(DISK_Lock). If the peers fail, the
//   transaction is aborted on the peers.
// NOTE(review): the visible code never passes apos or alen to ulock_getLock
// or DISK_Lock, so the lock does not appear to be limited to that range.
902 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
905 afs_int32 code = 0, error = 0;
907 if (atype == LOCKWRITE) {
908 if (atrans->type == UBIK_READTRANS)
910 code = ubik_Flush(atrans);
915 DBHOLD(atrans->dbase);
916 if (atype == LOCKREAD) {
917 code = ulock_getLock(atrans, atype, 1);
921 /* first, check that quorum is still good, and that dbase is up-to-date */
922 if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
923 ERROR_EXIT(UNOQUORUM);
924 if (!ubeacon_AmSyncSite())
925 ERROR_EXIT(UNOTSYNC);
927 /* now do the operation locally, and propagate it out */
928 code = ulock_getLock(atrans, atype, 1);
930 code = ContactQuorum(DISK_Lock, atrans, 0, 0, 1 /*unused */ ,
931 1 /*unused */ , LOCKWRITE);
934 /* we must abort the operation */
936 ContactQuorum(DISK_Abort, atrans, 0); /* force aborts to the others */
942 DBRELE(atrans->dbase);
946 /* utility to wait for a version # to change */
// ubik_WaitVersion: block until adatabase->version differs from *aversion.
// The check-then-wait presumably repeats in a loop whose header is not
// visible here.
// NOTE(review): in the pthread build the version is compared before
// version_mutex is taken. A version change (and its signal) that lands between
// the comparison and pthread_cond_wait() is missed, and the caller sleeps
// until the next change. Comparing while holding version_mutex would close
// that window. The waits are also inside assert(), so under NDEBUG they vanish
// and the wait becomes a busy loop.
948 ubik_WaitVersion(register struct ubik_dbase *adatabase,
949 register struct ubik_version *aversion)
952 /* wait until version # changes, and then return */
953 if (vcmp(*aversion, adatabase->version) != 0)
955 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
956 assert(pthread_mutex_lock(&adatabase->version_mutex) == 0);
957 assert(pthread_cond_wait(&adatabase->version_cond,&adatabase->version_mutex) == 0);
958 assert(pthread_mutex_unlock(&adatabase->version_mutex) == 0);
960 LWP_WaitProcess(&adatabase->version); /* same vers, just wait */
965 /* utility to get the version of the dbase a transaction is dealing with */
// ubik_GetVersion: copy the version of the transaction's database into
// *avers. No locking is done in the visible code.
967 ubik_GetVersion(register struct ubik_trans *atrans,
968 register struct ubik_version *avers)
970 *avers = atrans->dbase->version;
974 /* Facility to simplify database caching. Returns zero if last trans was done
975 on the local server and was successful. If return value is non-zero and the
976 caller is a server caching part of the Ubik database, it should invalidate
977 that cache. A return value of -1 means bad (NULL) argument. */
// ubik_CacheUpdate: returns -1 for a NULL transaction or database (that return
// line is not visible here). Otherwise it returns 1 if dbase->cachedVersion
// differs from dbase->version and 0 if they match.
980 ubik_CacheUpdate(register struct ubik_trans *atrans)
982 if (!(atrans && atrans->dbase))
984 return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
// panic: print "Ubik PANIC: " followed by the caller's message, then
// terminate. The abort call itself is not visible here; exit(1) is a fallback
// in case control ever returns.
// NOTE(review): a is passed as the format string to ubik_print (presumably
// printf-style) with b, c, d as its arguments, so callers must supply a
// trusted format string.
988 panic(char *a, char *b, char *c, char *d)
990 ubik_print("Ubik PANIC: ");
991 ubik_print(a, b, c, d);
993 ubik_print("BACK FROM ABORT\n"); /* shouldn't come back */
994 exit(1); /* never know, though */
998 ** This function takes an IP address as its parameter. It returns the
999 ** primary IP address of the server that owns the address passed in.
// Map any one of a peer's interface addresses (addr) to that peer's primary
// address ts->addr[0] by scanning every address slot of every server in
// ubik_servers. Returns 0 when addr belongs to no known server. Addresses are
// compared as stored, in network byte order per the inline comment.
1002 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
1004 struct ubik_server *ts;
1007 for (ts = ubik_servers; ts; ts = ts->next)
1008 for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
1009 if (ts->addr[j] == addr)
1010 return ts->addr[0]; /* net byte order */
1011 return 0; /* if not in server database, return error */