2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <sys/types.h>
22 #include <netinet/in.h>
31 #include <afs/afsutil.h>
33 #define UBIK_INTERNALS
37 /* This module is responsible for determining when the system has
38 * recovered to the point that it can handle new transactions. It
39 * replays logs, polls to determine the current dbase after a crash,
40 * and distributes the new database to the others.
43 /* The sync site associates a version number with each database. It
44 * broadcasts the version associated with its current dbase in every
45 * one of its beacon messages. When the sync site sends a dbase to a
46 * server, it also sends the db's version. A non-sync site server can
47 * tell if it has the right dbase version by simply comparing the
48 * version from the beacon message (uvote_dbVersion) with the version
49 * associated with the database (ubik_dbase->version). The sync site
50 * itself simply has one counter to keep track of all of this (again
51 * ubik_dbase->version).
54 /* sync site: routine called when the sync site loses its quorum; this
55 * procedure is called "up" from the beacon package. It resyncs the
56 * dbase and nudges the recovery daemon to try to propagate out the
57 * changes. It also resets the recovery daemon's state, since
58 * recovery must potentially find a new dbase to propagate out. This
59 * routine should not do anything with variables used by non-sync site
63 /* if this flag is set, then ubik will use only the primary address
64 ** ( the address specified in the CellServDB) to contact other
65 ** ubik servers. Ubik recovery will not try opening connections
66 ** to the alternate interface addresses.
68 int ubikPrimaryAddrOnly; /* non-zero selects primary-address-only behaviour; tested in DoProbe() */
/*
 * urecovery_ResetState -- takes no arguments. The one statement visible
 * here signals on &urecovery_state with LWP_NoYieldSignal().
 * NOTE(review): this listing has dropped lines (the return type, braces
 * and possibly further statements are not shown), so the actual state
 * reset cannot be confirmed from the visible text.
 */
71 urecovery_ResetState(void)
74 LWP_NoYieldSignal(&urecovery_state);
78 /* sync site: routine called when a non-sync site server goes down; restarts recovery
79 * process to send missing server the new db when it comes back up.
80 * This routine should not do anything with variables used by non-sync site servers.
/*
 * urecovery_LostServer -- takes no arguments. The one statement visible
 * here signals on &urecovery_state with LWP_NoYieldSignal().
 * NOTE(review): this listing has dropped lines (the return type, braces
 * and possibly further statements are not shown); any change made to
 * urecovery_state before the signal is not visible here.
 */
83 urecovery_LostServer(void)
85 LWP_NoYieldSignal(&urecovery_state);
89 /* return true iff we have a current database (called by both sync
90 * sites and non-sync sites) How do we determine this? If we're the
91 * sync site, we wait until recovery has finished fetching and
92 * re-labelling its dbase (it may still be trying to propagate it out
93 * to everyone else; that's THEIR problem). If we're not the sync
94 * site, then we must have a dbase labelled with the right version,
95 * and we must have a currently-good sync site.
/*
 * urecovery_AllBetter -- decide whether the local database is usable and
 * record the decision in rcode, which is also logged through ubik_dprint.
 *
 * adbase   - database handle; the visible lines consult the global
 *            ubik_dbase rather than this argument.
 * areadAny - not referenced in the visible lines; presumably it guards the
 *            first test below -- TODO confirm against the full file.
 *
 * Acceptance cases visible here, in order:
 *   - ubik_dbase->version.epoch > 1: rcode is set to 1;
 *   - this server is the sync site and UBIK_RECHAVEDB is set;
 *   - a sync site is known (uvote_GetSyncSite()) and ubik_dbVersion
 *     compares equal to ubik_dbase->version.
 * NOTE(review): lines are missing from this listing, including the
 * assignments made in the last two branches and the return statement.
 */
98 urecovery_AllBetter(register struct ubik_dbase *adbase, int areadAny)
100 register afs_int32 rcode;
102 ubik_dprint("allbetter checking\n");
107 if (ubik_dbase->version.epoch > 1)
108 rcode = 1; /* Happy with any good version of database */
111 /* Check if we're sync site and we've got the right data */
112 else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
116 /* next, check if we're aux site, and we've ever been sent the
117 * right data (note that if a dbase update fails, we won't think
118 * that the sync site is still the sync site, 'cause it won't talk
119 * to us until a timeout period has gone by. When we recover, we
120 * leave this clear until we get a new dbase */
121 else if ((uvote_GetSyncSite() && (vcmp(ubik_dbVersion, ubik_dbase->version) == 0))) { /* && order is important */
125 ubik_dprint("allbetter: returning %d\n", rcode);
129 /* abort all transactions on this database */
/*
 * urecovery_AbortAll -- walk the list of active transactions on adbase.
 *
 * adbase - database whose activeTrans list is traversed through the
 *          next links of each struct ubik_trans.
 *
 * NOTE(review): the statement applied to each transaction is not present
 * in this listing; only the traversal itself is visible.
 */
131 urecovery_AbortAll(struct ubik_dbase *adbase)
133 register struct ubik_trans *tt;
134 for (tt = adbase->activeTrans; tt; tt = tt->next) {
140 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
/*
 * urecovery_CheckTid -- drop the current remote write transaction when the
 * supplied transaction id shows that it is stale.
 *
 * atid - transaction id to compare against ubik_currentTrans->tid.
 *
 * Nothing is compared unless ubik_currentTrans is set. The transaction is
 * treated as not matching when the epochs differ or when atid->counter is
 * larger than the current counter. In that case udisk_end() is called on
 * it; when UBIK_PAUSE is not defined that call is additionally guarded by
 * locktype != LOCKWAIT. ubik_currentTrans is then set to a null pointer.
 * NOTE(review): closing braces and the return statement are missing from
 * this listing, so the exact nesting of the final assignment should be
 * confirmed against the full file.
 */
142 urecovery_CheckTid(register struct ubik_tid *atid)
144 if (ubik_currentTrans) {
145 /* there is remote write trans, see if we match, see if this
146 * is a new transaction */
147 if (atid->epoch != ubik_currentTrans->tid.epoch
148 || atid->counter > ubik_currentTrans->tid.counter) {
149 /* don't match, abort it */
150 /* If the thread is not waiting for lock - ok to end it */
151 #if !defined(UBIK_PAUSE)
152 if (ubik_currentTrans->locktype != LOCKWAIT) {
153 #endif /* UBIK_PAUSE */
154 udisk_end(ubik_currentTrans);
155 #if !defined(UBIK_PAUSE)
157 #endif /* UBIK_PAUSE */
158 ubik_currentTrans = (struct ubik_trans *)0;
164 /* log format is defined here, and implicitly in disk.c
166 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
167 * are logged in network standard byte order, in case we want to move logs
168 * from machine-to-machine someday.
170 * Begin transaction: opcode
171 * Commit transaction: opcode, version (8 bytes)
172 * Truncate file: opcode, file number, length
173 * Abort transaction: opcode
174 * Write data: opcode, file, position, length, <length> data bytes
176 * A very simple routine, it just replays the log. Note that this is a new-value only log, which
177 * implies that no uncommitted data is written to the dbase: one writes data to the log, including
178 * the commit record, then we allow data to be written through to the dbase. In our particular
179 * implementation, once a transaction is done, we write out the pages to the database, so that
180 * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
181 * any changed data while there is an uncommitted write transaction can be zapped during an
182 * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
/*
 * ReplayLog -- replay the transaction log (LOGFILE) into the database.
 *
 * adbase - database whose read/write/truncate/sync/setlabel methods are
 *          used for all I/O.
 *
 * Pass 1 walks the log records (LOGNEW, LOGABORT, LOGEND, LOGTRUNCATE,
 * LOGDATA) looking for a commit. Pass 2 walks them again and applies
 * them: LOGEND relabels file 0 with the version stored in the log,
 * LOGTRUNCATE truncates the named file, and LOGDATA copies the logged
 * bytes into the target file in chunks of at most sizeof(data), syncing
 * when the target file changes. Finally the log itself is truncated to
 * length 0.
 *
 * All integers in the log are stored in network byte order, so every
 * field read from it must go through ntohl() before it is used.
 *
 * NOTE(review): this listing has dropped lines (return type, braces, some
 * statements and call arguments); only the visible text is described.
 */
188 ReplayLog(register struct ubik_dbase *adbase)
191 register afs_int32 code, tpos;
193 afs_int32 len, thisSize, tfile, filePos;
195 afs_int32 syncFile = -1;
196 afs_int32 data[1024];
198 /* read the log twice, once to see whether we have a transaction to deal
199 * with that committed, (theoretically, we should support more than one
200 * trans in the log at once, but not yet), and once replaying the
204 /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
207 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
209 if (code != sizeof(afs_int32))
211 if (opcode == LOGNEW) {
212 /* handle begin trans */
213 tpos += sizeof(afs_int32);
214 } else if (opcode == LOGABORT)
216 else if (opcode == LOGEND) {
219 } else if (opcode == LOGTRUNCATE) {
222 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
223 2 * sizeof(afs_int32));
224 if (code != 2 * sizeof(afs_int32))
225 break; /* premature eof or io error */
226 tpos += 2 * sizeof(afs_int32);
227 } else if (opcode == LOGDATA) {
230 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
231 3 * sizeof(afs_int32));
232 if (code != 3 * sizeof(afs_int32))
234 /* otherwise, skip over the data bytes, too */
/* The length field is stored in network byte order, exactly as pass 2
 * reads it; using it raw would skip the wrong number of bytes on
 * little-endian hosts and make the commit scan misparse the log. */
235 tpos += ntohl(buffer[2]) + 3 * sizeof(afs_int32);
237 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode,
239 break; /* corrupt log! */
243 /* actually do the replay; log should go all the way through the commit record, since
244 * we just read it above. */
250 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
252 if (code != sizeof(afs_int32))
254 if (opcode == LOGNEW) {
255 /* handle begin trans */
256 tpos += sizeof(afs_int32);
257 } else if (opcode == LOGABORT)
258 panic("log abort\n");
259 else if (opcode == LOGEND) {
262 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
263 2 * sizeof(afs_int32));
264 if (code != 2 * sizeof(afs_int32))
266 code = (*adbase->setlabel) (adbase, 0, (ubik_version *)buffer);
270 break; /* all done now */
271 } else if (opcode == LOGTRUNCATE) {
274 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
275 2 * sizeof(afs_int32));
276 if (code != 2 * sizeof(afs_int32))
277 break; /* premature eof or io error */
278 tpos += 2 * sizeof(afs_int32);
280 (*adbase->truncate) (adbase, ntohl(buffer[0]),
284 } else if (opcode == LOGDATA) {
287 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
288 3 * sizeof(afs_int32));
289 if (code != 3 * sizeof(afs_int32))
291 tpos += 3 * sizeof(afs_int32);
292 /* otherwise, skip over the data bytes, too */
293 len = ntohl(buffer[2]); /* total number of bytes to copy */
294 filePos = ntohl(buffer[1]);
295 tfile = ntohl(buffer[0]);
296 /* try to minimize file syncs */
297 if (syncFile != tfile) {
299 code = (*adbase->sync) (adbase, syncFile);
307 thisSize = (len > sizeof(data) ? sizeof(data) : len);
308 /* copy sizeof(data) buffer bytes at a time */
310 (*adbase->read) (adbase, LOGFILE, (char *)data, tpos,
312 if (code != thisSize)
315 (*adbase->write) (adbase, tfile, (char *)data, filePos,
317 if (code != thisSize)
324 ubik_dprint("corrupt log opcode (%d) at position %d\n",
326 break; /* corrupt log! */
331 code = (*adbase->sync) (adbase, syncFile);
335 ubik_dprint("Log read error on pass 2\n");
340 /* now truncate the log, we're done with it */
341 code = (*adbase->truncate) (adbase, LOGFILE, 0);
345 /* Called at initialization to figure out version of the dbase we really have.
346 * This routine is called after replaying the log; it reads the restored labels.
/*
 * InitializeDB -- load the version label of the database into memory.
 *
 * adbase - database whose getlabel/setlabel methods are used; its
 *          version field is filled in.
 *
 * The label of file 0 is read into adbase->version. The visible fallback
 * labels the database as newly initialized (epoch 1, counter 1) and, if
 * writing that label fails, resets the version to 0/0 and tries to write
 * that instead. A signal is then raised on &adbase->version.
 * NOTE(review): the conditions guarding each step (presumably tests of
 * code) and the return statement are missing from this listing.
 */
349 InitializeDB(register struct ubik_dbase *adbase)
351 register afs_int32 code;
353 code = (*adbase->getlabel) (adbase, 0, &adbase->version);
355 /* try setting the label to a new value */
356 adbase->version.epoch = 1; /* value for newly-initialized db */
357 adbase->version.counter = 1;
358 code = (*adbase->setlabel) (adbase, 0, &adbase->version);
360 /* failed, try to set it back */
361 adbase->version.epoch = 0;
362 adbase->version.counter = 0;
363 (*adbase->setlabel) (adbase, 0, &adbase->version);
365 LWP_NoYieldSignal(&adbase->version);
370 /* initialize the local dbase
371 * We replay the logs and then read the resulting file to figure out what version we've really got.
/*
 * urecovery_Initialize -- bring the local database up at start-up.
 *
 * adbase - database to initialize.
 *
 * Calls ReplayLog() and then InitializeDB() on adbase, storing each
 * result in code.
 * NOTE(review): any error check between the two calls and the return
 * statement are missing from this listing.
 */
374 urecovery_Initialize(register struct ubik_dbase *adbase)
376 register afs_int32 code;
378 code = ReplayLog(adbase);
381 code = InitializeDB(adbase);
385 /* Main interaction loop for the recovery manager
386 * The recovery light-weight process only runs when you're the
387 * synchronization site. It performs the following tasks, if and only
388 * if the prerequisite tasks have been performed successfully (it
389 * keeps track of which ones have been performed in its bit map,
392 * First, it is responsible for probing that all servers are up. This
393 * is the only operation that must be performed even if this is not
394 * yet the sync site, since otherwise this site may not notice that
395 * enough other machines are running to even elect this guy to be the
398 * After that, the recovery process does nothing until the beacon and
399 * voting modules manage to get this site elected sync site.
401 * After becoming sync site, recovery first attempts to find the best
402 * database available in the network (it must do this in order to
403 * ensure finding the latest committed data). After finding the right
404 * database, it must fetch this dbase to the sync site.
406 * After fetching the dbase, it relabels it with a new version number,
407 * to ensure that everyone recognizes this dbase as the most recent
410 * Once the dbase has been relabelled, this machine can start handling
411 * requests. However, the recovery module still has one more task:
412 * propagating the dbase out to everyone who is up in the network.
/*
 * urecovery_Interact -- main loop of the recovery process.
 *
 * Takes no arguments. Each pass sleeps in IOMGR_Select() and then works
 * through the stages below, recording progress in the urecovery_state
 * bits:
 *
 *   1. Probing: once more than 30 seconds have passed since
 *      lastProbeTime, every entry of ubik_servers is visited and
 *      UBIK_RECFOUNDDB is cleared in the cases shown.
 *   2. Sync-site check: when ubeacon_AmSyncSite() is false,
 *      UBIK_RECSYNCSITE is cleared and the pass ends; otherwise the bit
 *      is set.
 *   3. Finding the best database (only while UBIK_RECFOUNDDB is clear):
 *      DISK_GetVersion() is asked of the servers, the highest version is
 *      kept in bestDBVersion, and UBIK_RECHAVEDB is set or cleared
 *      according to how the local version compares. UBIK_RECFOUNDDB is
 *      then set and UBIK_RECSENTDB cleared.
 *   4. Fetching: when another server holds the best version, active
 *      transactions are aborted and the file is pulled over Rx
 *      (StartDISK_GetFile / rx_Read / EndDISK_GetFile) into the local
 *      database, which carries an invalid label while in transit.
 *   5. Relabelling: a database whose epoch is still 1 is given the label
 *      (ubik_epochTime, 1).
 *   6. Propagation (only while UBIK_RECSENTDB is clear): after waiting
 *      for DBWRITING to clear, the database is pushed over Rx
 *      (StartDISK_SendFile / rx_Write / EndDISK_SendFile) to each server
 *      whose version differs from the local one.
 *
 * With UBIK_PAUSE defined, stages 4 to 6 take ubik_dbase->versionLock for
 * writing around their work.
 *
 * NOTE(review): this listing has dropped many lines (loop headers, error
 * checks, braces, some comment delimiters and the conditions guarding
 * several assignments), so only the visible text is described here.
 */
415 urecovery_Interact(void)
417 afs_int32 code, tcode;
418 struct ubik_server *bestServer = NULL;
419 struct ubik_server *ts;
420 int dbok, doingRPC, now;
421 afs_int32 lastProbeTime, lastDBVCheck;
422 /* if we're the sync site, the best db version we've found yet */
423 static struct ubik_version bestDBVersion;
424 struct ubik_version tversion;
426 int length, tlen, offset, file, nbytes;
427 struct rx_call *rxcall;
429 struct ubik_stat ubikstat;
430 struct in_addr inAddr;
432 /* otherwise, begin interaction */
437 /* Run through this loop every 4 seconds */
440 IOMGR_Select(0, 0, 0, 0, &tv);
442 ubik_dprint("recovery running in state %x\n", urecovery_state);
444 /* Every 30 seconds, check all the down servers and mark them
445 * as up if they respond. When a server comes up or found to
446 * not be current, then re-find the best database and
449 if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
450 for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
456 urecovery_state &= ~UBIK_RECFOUNDDB;
458 } else if (!ts->currentDB) {
459 urecovery_state &= ~UBIK_RECFOUNDDB;
463 now = FT_ApproxTime();
467 /* Mark whether we are the sync site */
468 if (!ubeacon_AmSyncSite()) {
469 urecovery_state &= ~UBIK_RECSYNCSITE;
470 continue; /* nothing to do */
472 urecovery_state |= UBIK_RECSYNCSITE;
474 /* If a server has just come up or if we have not found the
475 * most current database, then go find the most current db.
477 if (!(urecovery_state & UBIK_RECFOUNDDB)) {
478 bestServer = (struct ubik_server *)0;
479 bestDBVersion.epoch = 0;
480 bestDBVersion.counter = 0;
481 for (ts = ubik_servers; ts; ts = ts->next) {
483 continue; /* don't bother with these guys */
486 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
488 /* perhaps this is the best version */
489 if (vcmp(ts->version, bestDBVersion) > 0) {
490 /* new best version */
491 bestDBVersion = ts->version;
496 /* take into consideration our version. Remember if we,
497 * the sync site, have the best version. Also note that
498 * we may need to send the best version out.
500 if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
501 bestDBVersion = ubik_dbase->version;
502 bestServer = (struct ubik_server *)0;
503 urecovery_state |= UBIK_RECHAVEDB;
505 /* Clear the flag only when we know we have to retrieve
506 * the db. Because urecovery_AllBetter() looks at it.
508 urecovery_state &= ~UBIK_RECHAVEDB;
510 lastDBVCheck = FT_ApproxTime();
511 urecovery_state |= UBIK_RECFOUNDDB;
512 urecovery_state &= ~UBIK_RECSENTDB;
514 #if defined(UBIK_PAUSE)
515 /* it's not possible for UBIK_RECFOUNDDB not to be set here.
516 * However, we might have lost UBIK_RECSYNCSITE, and that
519 if (!(urecovery_state & UBIK_RECSYNCSITE))
520 continue; /* lost sync */
522 if (!(urecovery_state & UBIK_RECFOUNDDB))
523 continue; /* not ready */
524 #endif /* UBIK_PAUSE */
526 /* If we, the sync site, do not have the best db version, then
527 * go and get it from the server that does.
529 if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
530 urecovery_state |= UBIK_RECHAVEDB;
532 /* we don't have the best version; we should fetch it. */
533 #if defined(UBIK_PAUSE)
536 ObtainWriteLock(&ubik_dbase->versionLock);
537 #endif /* UBIK_PAUSE */
538 urecovery_AbortAll(ubik_dbase);
540 /* Rx code to do the Bulk fetch */
543 rxcall = rx_NewCall(bestServer->disk_rxcid);
545 ubik_print("Ubik: Synchronize database with server %s\n",
546 afs_inet_ntoa(bestServer->addr[0]));
548 code = StartDISK_GetFile(rxcall, file);
550 ubik_dprint("StartDiskGetFile failed=%d\n", code);
/* The first item read from the call is the file length, a 32-bit
 * integer that is converted with ntohl() before use. */
553 nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
554 length = ntohl(length);
555 if (nbytes != sizeof(afs_int32)) {
556 ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR);
561 /* Truncate the file first */
562 code = (*ubik_dbase->truncate) (ubik_dbase, file, 0);
564 ubik_dprint("truncate io error=%d\n", code);
568 /* give invalid label during file transit */
570 tversion.counter = 0;
571 code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
573 ubik_dprint("setlabel io error=%d\n", code);
/* Data is moved in pieces of at most sizeof(tbuffer) bytes. */
578 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
579 nbytes = rx_Read(rxcall, tbuffer, tlen);
580 if (nbytes != tlen) {
581 ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
586 (*ubik_dbase->write) (ubik_dbase, file, tbuffer, offset,
588 if (nbytes != tlen) {
595 code = EndDISK_GetFile(rxcall, &tversion);
597 tcode = rx_EndCall(rxcall, code);
601 /* we got a new file, set up its header */
602 urecovery_state |= UBIK_RECHAVEDB;
603 memcpy(&ubik_dbase->version, &tversion,
604 sizeof(struct ubik_version));
605 (*ubik_dbase->sync) (ubik_dbase, 0); /* get data out first */
606 /* after data is good, sync disk with correct label */
608 (*ubik_dbase->setlabel) (ubik_dbase, 0,
609 &ubik_dbase->version);
612 ubik_dbase->version.epoch = 0;
613 ubik_dbase->version.counter = 0;
614 ubik_print("Ubik: Synchronize database failed (error = %d)\n",
617 ubik_print("Ubik: Synchronize database completed\n");
619 udisk_Invalidate(ubik_dbase, 0); /* data has changed */
620 LWP_NoYieldSignal(&ubik_dbase->version);
621 #if defined(UBIK_PAUSE)
624 ReleaseWriteLock(&ubik_dbase->versionLock);
625 #endif /* UBIK_PAUSE */
627 #if defined(UBIK_PAUSE)
628 if (!(urecovery_state & UBIK_RECSYNCSITE))
629 continue; /* lost sync */
630 #endif /* UBIK_PAUSE */
631 if (!(urecovery_state & UBIK_RECHAVEDB))
632 continue; /* not ready */
634 /* If the database was newly initialized, then when we establish quorum, write
635 * a new label. This allows urecovery_AllBetter() to allow access for reads.
636 * Setting it to 2 also allows another site to come along with a newer
637 * database and overwrite this one.
639 if (ubik_dbase->version.epoch == 1) {
640 #if defined(UBIK_PAUSE)
643 ObtainWriteLock(&ubik_dbase->versionLock);
644 #endif /* UBIK_PAUSE */
645 urecovery_AbortAll(ubik_dbase);
647 ubik_dbase->version.epoch = ubik_epochTime;
648 ubik_dbase->version.counter = 1;
650 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
651 udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
652 LWP_NoYieldSignal(&ubik_dbase->version);
653 #if defined(UBIK_PAUSE)
656 ReleaseWriteLock(&ubik_dbase->versionLock);
657 #endif /* UBIK_PAUSE */
660 /* Check the other sites and send the database to them if they
661 * do not have the current db.
663 if (!(urecovery_state & UBIK_RECSENTDB)) {
664 /* now propagate out new version to everyone else */
665 dbok = 1; /* start off assuming they all worked */
667 #if defined(UBIK_PAUSE)
670 ObtainWriteLock(&ubik_dbase->versionLock);
671 #endif /* UBIK_PAUSE */
673 * Check if a write transaction is in progress. We can't send the
674 * db when a write is in progress here because the db would be
675 * obsolete as soon as it goes there. Also, ops after the begin
676 * trans would reach the recepient and wouldn't find a transaction
677 * pending there. Frankly, I don't think it's possible to get past
678 * the write-lock above if there is a write transaction in progress,
679 * but then, it won't hurt to check, will it?
681 if (ubik_dbase->flags & DBWRITING) {
686 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
687 #if defined(UBIK_PAUSE)
690 ReleaseWriteLock(&ubik_dbase->versionLock);
691 #endif /* UBIK_PAUSE */
692 /* sleep for a little while */
693 IOMGR_Select(0, 0, 0, 0, &tv);
696 #if defined(UBIK_PAUSE)
699 ObtainWriteLock(&ubik_dbase->versionLock);
700 #endif /* UBIK_PAUSE */
704 for (ts = ubik_servers; ts; ts = ts->next) {
705 inAddr.s_addr = ts->addr[0];
707 ubik_dprint("recovery cannot send version to %s\n",
708 afs_inet_ntoa(inAddr.s_addr));
712 ubik_dprint("recovery sending version to %s\n",
713 afs_inet_ntoa(inAddr.s_addr));
714 if (vcmp(ts->version, ubik_dbase->version) != 0) {
715 ubik_dprint("recovery stating local database\n");
717 /* Rx code to do the Bulk Store */
718 code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
720 length = ubikstat.size;
722 rxcall = rx_NewCall(ts->disk_rxcid);
724 StartDISK_SendFile(rxcall, file, length,
725 &ubik_dbase->version);
727 ubik_dprint("StartDiskSendFile failed=%d\n",
734 sizeof(tbuffer) ? sizeof(tbuffer) : length);
736 (*ubik_dbase->read) (ubik_dbase, file,
737 tbuffer, offset, tlen);
738 if (nbytes != tlen) {
739 ubik_dprint("Local disk read error=%d\n",
743 nbytes = rx_Write(rxcall, tbuffer, tlen);
744 if (nbytes != tlen) {
745 ubik_dprint("Rx-write bulk error=%d\n", code =
752 code = EndDISK_SendFile(rxcall);
754 code = rx_EndCall(rxcall, code);
757 /* we set a new file, process its header */
758 ts->version = ubik_dbase->version;
763 /* mark file up to date */
767 #if defined(UBIK_PAUSE)
770 ReleaseWriteLock(&ubik_dbase->versionLock);
771 #endif /* UBIK_PAUSE */
773 urecovery_state |= UBIK_RECSENTDB;
779 ** send a Probe to all the network addresses of this server
780 ** Return 0 if success, else return 1
783 DoProbe(struct ubik_server *server)
785 struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
786 struct rx_connection *connSuccess = 0;
790 extern afs_int32 ubikSecIndex;
791 extern struct rx_securityClass *ubikSecClass;
793 for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR);
796 rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
797 ubikSecClass, ubikSecIndex);
799 /* user requirement to use only the primary interface */
800 if (ubikPrimaryAddrOnly) {
805 assert(i); /* at least one interface address for this server */
809 if (!multi_error) { /* first success */
810 addr = server->addr[multi_i]; /* successful interface addr */
812 if (server->disk_rxcid) /* destroy existing conn */
813 rx_DestroyConnection(server->disk_rxcid);
814 if (server->vote_rxcid)
815 rx_DestroyConnection(server->vote_rxcid);
817 /* make new connections */
818 server->disk_rxcid = conns[multi_i];
819 server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal, VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex); /* for vote reqs */
821 connSuccess = conns[multi_i];
822 strcpy(buffer, (char *)afs_inet_ntoa(server->addr[0]));
824 ("ubik:server %s is back up: will be contacted through %s\n",
825 buffer, afs_inet_ntoa(addr));
831 /* Destroy all connections except the one on which we succeeded */
832 for (j = 0; j < i; j++)
833 if (conns[j] != connSuccess)
834 rx_DestroyConnection(conns[j]);
837 ubik_dprint("ubik:server %s still down\n",
838 afs_inet_ntoa(server->addr[0]));
841 return 0; /* success */
843 return 1; /* failure */