2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 #include <sys/types.h>
21 #include <netinet/in.h>
36 #include <afs/afsutil.h>
38 #define UBIK_INTERNALS
42 /* This module is responsible for determining when the system has
43 * recovered to the point that it can handle new transactions. It
44 * replays logs, polls to determine the current dbase after a crash,
45 * and distributes the new database to the others.
48 /* The sync site associates a version number with each database. It
49 * broadcasts the version associated with its current dbase in every
50 * one of its beacon messages. When the sync site sends a dbase to a
51 * server, it also sends the db's version. A non-sync site server can
52 * tell if it has the right dbase version by simply comparing the
53 * version from the beacon message (uvote_dbVersion) with the version
54 * associated with the database (ubik_dbase->version). The sync site
55 * itself simply has one counter to keep track of all of this (again
56 * ubik_dbase->version).
59 /* sync site: routine called when the sync site loses its quorum; this
60 * procedure is called "up" from the beacon package. It resyncs the
61 * dbase and nudges the recovery daemon to try to propagate out the
62 * changes. It also resets the recovery daemon's state, since
63 * recovery must potentially find a new dbase to propagate out. This
64 * routine should not do anything with variables used by non-sync site
68 /* if this flag is set, then ubik will use only the primary address
69 ** ( the address specified in the CellServDB) to contact other
70 ** ubik servers. Ubik recovery will not try opening connections
71 ** to the alternate interface addresses.
73 int ubikPrimaryAddrOnly; /* tested by DoProbe() when it opens probe connections */
75 int urecovery_ResetState(void)
/* Wake any LWP blocked on &urecovery_state.  LWP_NoYieldSignal posts the
 * event without yielding the processor, so the caller keeps running. */
78 LWP_NoYieldSignal(&urecovery_state);
82 /* sync site: routine called when a non-sync site server goes down; restarts recovery
83 * process to send missing server the new db when it comes back up.
84 * This routine should not do anything with variables used by non-sync site servers.
86 int urecovery_LostServer(void)
/* Signal &urecovery_state without yielding, waking any LWP that waits on
 * it, so recovery can restart and later resend the database to the
 * server that went down. */
88 LWP_NoYieldSignal(&urecovery_state);
92 /* return true iff we have a current database (called by both sync
93 * sites and non-sync sites). How do we determine this? If we're the
94 * sync site, we wait until recovery has finished fetching and
95 * re-labelling its dbase (it may still be trying to propagate it out
96 * to everyone else; that's THEIR problem). If we're not the sync
97 * site, then we must have a dbase labelled with the right version,
98 * and we must have a currently-good sync site.
100 int urecovery_AllBetter(register struct ubik_dbase *adbase, int areadAny)
102 register afs_int32 rcode;
/* Decide whether the local database is current enough to use.  rcode is
 * set to 1 when:
 *  - the database epoch is greater than 1, meaning any good version will
 *    do (presumably this test applies only when areadAny is set; confirm),
 *  - or this server is the sync site and urecovery_state has
 *    UBIK_RECHAVEDB set,
 *  - or a sync site is known and ubik_dbVersion equals the local version.
 * The result is logged by the final ubik_dprint (and presumably returned).
 * NOTE(review): the checks read the global ubik_dbase rather than the
 * adbase parameter; confirm callers always pass ubik_dbase. */
104 ubik_dprint("allbetter checking\n");
109 if (ubik_dbase->version.epoch > 1)
110 rcode = 1; /* Happy with any good version of database */
113 /* Check if we're sync site and we've got the right data */
114 else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
118 /* next, check if we're aux site, and we've ever been sent the
119 * right data (note that if a dbase update fails, we won't think
120 * that the sync site is still the sync site, 'cause it won't talk
121 * to us until a timeout period has gone by. When we recover, we
122 * leave this clear until we get a new dbase */
123 else if ( (uvote_GetSyncSite() &&
124 (vcmp(ubik_dbVersion, ubik_dbase->version) == 0)) ) { /* && order is important */
128 ubik_dprint("allbetter: returning %d\n", rcode);
132 /* abort all transactions on this database */
133 int urecovery_AbortAll(struct ubik_dbase *adbase)
135 register struct ubik_trans *tt;
/* Visit every transaction on adbase->activeTrans, following ->next.
 * NOTE(review): the loop reads tt->next after handling tt; confirm the
 * per-transaction abort neither frees tt nor unlinks it from the list. */
136 for(tt = adbase->activeTrans; tt; tt=tt->next) {
142 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
143 int urecovery_CheckTid(register struct ubik_tid *atid)
/* atid is the transaction id of an incoming remote request (presumably;
 * confirm against callers).  The current remote write transaction is
 * treated as stale if atid has a different epoch, or a counter larger
 * than the current one.  A stale transaction is ended with udisk_end()
 * and ubik_currentTrans is cleared.  Without UBIK_PAUSE, a transaction
 * whose locktype is LOCKWAIT (its thread is still waiting for a lock)
 * skips udisk_end(), but ubik_currentTrans is cleared anyway. */
145 if (ubik_currentTrans) {
146 /* there is remote write trans, see if we match, see if this
147 * is a new transaction */
148 if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
149 /* don't match, abort it */
150 /* If the thread is not waiting for lock - ok to end it */
151 #if !defined(UBIK_PAUSE)
152 if (ubik_currentTrans->locktype != LOCKWAIT) {
153 #endif /* UBIK_PAUSE */
154 udisk_end(ubik_currentTrans);
155 #if !defined(UBIK_PAUSE)
157 #endif /* UBIK_PAUSE */
158 ubik_currentTrans = (struct ubik_trans *) 0;
163 /* log format is defined here, and implicitly in disk.c
165 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
166 * are logged in network standard byte order, in case we want to move logs
167 * from machine-to-machine someday.
169 * Begin transaction: opcode
170 * Commit transaction: opcode, version (8 bytes)
171 * Truncate file: opcode, file number, length
172 * Abort transaction: opcode
173 * Write data: opcode, file, position, length, <length> data bytes
175 * A very simple routine, it just replays the log. Note that this is a new-value only log, which
176 * implies that no uncommitted data is written to the dbase: one writes data to the log, including
177 * the commit record, then we allow data to be written through to the dbase. In our particular
178 * implementation, once a transaction is done, we write out the pages to the database, so that
179 * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
180 * any changed data while there is an uncommitted write transaction can be zapped during an
181 * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
186 static int ReplayLog(register struct ubik_dbase *adbase)
/* Replay the new-value-only log (LOGFILE) into adbase.
 *
 * Pass 1 walks the records to find out whether the logged transaction
 * committed (LOGEND).  Pass 2 then:
 *  - applies the LOGTRUNCATE and LOGDATA records,
 *  - stores the committed version from LOGEND with setlabel,
 *  - syncs each data file it wrote.
 * Finally the log itself is truncated to zero length.
 *
 * Visible error returns are:
 *  - UBADLOG for a short or failed log read or data write,
 *  - or the error code of the adbase setlabel, truncate or sync call
 *    that failed. */
189 register afs_int32 code, tpos;
191 afs_int32 len, thisSize, tfile, filePos;
193 afs_int32 syncFile = -1;
194 afs_int32 data[1024];
196 /* read the log twice, once to see whether we have a transaction to deal
197 with that committed, (theoretically, we should support more than one
198 trans in the log at once, but not yet), and once replaying the
202 /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
204 code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
205 if (code != sizeof(afs_int32)) break;
206 if (opcode == LOGNEW) {
207 /* handle begin trans */
208 tpos += sizeof(afs_int32);
210 else if (opcode == LOGABORT) break;
211 else if (opcode == LOGEND) {
215 else if (opcode == LOGTRUNCATE) {
217 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
218 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
219 tpos += 2*sizeof(afs_int32);
221 else if (opcode == LOGDATA) {
223 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
224 if (code != 3*sizeof(afs_int32)) break;
225 /* otherwise, skip over the data bytes, too */
226 tpos += buffer[2] + 3*sizeof(afs_int32);
/* NOTE(review): buffer[2] is the logged data length.  The log format
 * stores integers in network byte order, and pass 2 below decodes this
 * same field with ntohl(buffer[2]).  Here it is used raw, so on a
 * little-endian host pass 1 skips to the wrong offset.  It should
 * presumably be ntohl(buffer[2]) + 3*sizeof(afs_int32). */
229 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
230 break; /* corrupt log! */
234 /* actually do the replay; log should go all the way through the commit record, since
235 we just read it above. */
/* NOTE(review): pass 2 handles damaged records inconsistently:
 *  - a LOGABORT record panics,
 *  - a short read of the LOGEND version returns UBADLOG,
 *  - a short read of a LOGTRUNCATE or LOGDATA header only breaks out
 *    of the loop. */
240 code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
241 if (code != sizeof(afs_int32)) break;
242 if (opcode == LOGNEW) {
243 /* handle begin trans */
244 tpos += sizeof(afs_int32);
246 else if (opcode == LOGABORT) panic("log abort\n");
247 else if (opcode == LOGEND) {
249 code = (*adbase->read) (adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
250 if (code != 2*sizeof(afs_int32)) return UBADLOG;
251 code = (*adbase->setlabel) (adbase, 0, buffer);
252 if (code) return code;
254 break; /* all done now */
256 else if (opcode == LOGTRUNCATE) {
258 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
259 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
260 tpos += 2*sizeof(afs_int32);
261 code = (*adbase->truncate) (adbase, ntohl(buffer[0]), ntohl(buffer[1]));
262 if (code) return code;
264 else if (opcode == LOGDATA) {
266 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
267 if (code != 3*sizeof(afs_int32)) break;
268 tpos += 3*sizeof(afs_int32);
269 /* otherwise, skip over the data bytes, too */
270 len = ntohl(buffer[2]); /* total number of bytes to copy */
271 filePos = ntohl(buffer[1]);
272 tfile = ntohl(buffer[0]);
273 /* try to minimize file syncs */
274 if (syncFile != tfile) {
275 if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
278 if (code) return code;
281 thisSize = (len > sizeof(data)? sizeof(data) : len);
/* NOTE(review): len is a signed afs_int32 taken from the log, while
 * sizeof(data) is unsigned.  A negative (corrupt) len is therefore
 * converted to a huge unsigned value in this comparison, and thisSize
 * becomes sizeof(data).  Consider rejecting len < 0 with UBADLOG before
 * copying. */
282 /* copy sizeof(data) buffer bytes at a time */
283 code = (*adbase->read)(adbase, LOGFILE, data, tpos, thisSize);
284 if (code != thisSize) return UBADLOG;
285 code = (*adbase->write)(adbase, tfile, data, filePos, thisSize);
286 if (code != thisSize) return UBADLOG;
293 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
294 break; /* corrupt log! */
298 if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
299 if (code) return code;
302 ubik_dprint("Log read error on pass 2\n");
307 /* now truncate the log, we're done with it */
308 code = (*adbase->truncate)(adbase, LOGFILE, 0);
312 /* Called at initialization to figure out version of the dbase we really have.
313 * This routine is called after replaying the log; it reads the restored labels.
315 static int InitializeDB(register struct ubik_dbase *adbase)
/* Load adbase->version from label 0 of the database.
 *
 * If that getlabel fails (as the comments below indicate), label 0 is
 * rewritten with the newly-initialized version (epoch 1, counter 1).
 * If that setlabel also fails, the in-memory version is reset to
 * epoch 0, counter 0 and written back best-effort; the result of that
 * final setlabel is not checked.
 *
 * Finally, LWPs waiting on &adbase->version are woken. */
317 register afs_int32 code;
319 code = (*adbase->getlabel)(adbase, 0, &adbase->version);
321 /* try setting the label to a new value */
322 adbase->version.epoch = 1; /* value for newly-initialized db */
323 adbase->version.counter = 1;
324 code = (*adbase->setlabel) (adbase, 0, &adbase->version);
326 /* failed, try to set it back */
327 adbase->version.epoch = 0;
328 adbase->version.counter = 0;
329 (*adbase->setlabel) (adbase, 0, &adbase->version);
331 LWP_NoYieldSignal(&adbase->version);
336 /* initialize the local dbase
337 * We replay the logs and then read the resulting file to figure out what version we've really got.
339 int urecovery_Initialize(register struct ubik_dbase *adbase)
341 register afs_int32 code;
/* Replay any committed log first, so that InitializeDB reads the restored
 * labels.  An error from ReplayLog is returned without running
 * InitializeDB. */
343 code = ReplayLog(adbase);
344 if (code) return code;
345 code = InitializeDB(adbase);
349 /* Main interaction loop for the recovery manager
350 * The recovery light-weight process only runs when you're the
351 * synchronization site. It performs the following tasks, if and only
352 * if the prerequisite tasks have been performed successfully (it
353 * keeps track of which ones have been performed in its bit map,
356 * First, it is responsible for probing that all servers are up. This
357 * is the only operation that must be performed even if this is not
358 * yet the sync site, since otherwise this site may not notice that
359 * enough other machines are running to even elect this guy to be the
362 * After that, the recovery process does nothing until the beacon and
363 * voting modules manage to get this site elected sync site.
365 * After becoming sync site, recovery first attempts to find the best
366 * database available in the network (it must do this in order to
367 * ensure finding the latest committed data). After finding the right
368 * database, it must fetch this dbase to the sync site.
370 * After fetching the dbase, it relabels it with a new version number,
371 * to ensure that everyone recognizes this dbase as the most recent
374 * Once the dbase has been relabelled, this machine can start handling
375 * requests. However, the recovery module still has one more task:
376 * propagating the dbase out to everyone who is up in the network.
378 int urecovery_Interact(void)
380 afs_int32 code, tcode;
381 struct ubik_server *bestServer = NULL;
382 struct ubik_server *ts;
383 int dbok, doingRPC, now;
384 afs_int32 lastProbeTime, lastDBVCheck;
385 /* if we're the sync site, the best db version we've found yet */
386 static struct ubik_version bestDBVersion;
387 struct ubik_version tversion;
389 int length, tlen, offset, file, nbytes;
390 struct rx_call *rxcall;
392 struct ubik_stat ubikstat;
393 struct in_addr inAddr;
/* Each pass of the loop below first sleeps in IOMGR_Select and probes
 * down servers about every 30 seconds.  The remaining steps run only
 * while this server is the sync site:
 *  - find the best database version among the up servers,
 *  - fetch that database from bestServer when another server has a
 *    newer one,
 *  - relabel a newly initialized (epoch 1) database,
 *  - send the database to each server whose recorded version differs
 *    from the local one. */
395 /* otherwise, begin interaction */
400 /* Run through this loop every 4 seconds */
403 IOMGR_Select(0, 0, 0, 0, &tv);
405 ubik_dprint("recovery running in state %x\n", urecovery_state);
407 /* Every 30 seconds, check all the down servers and mark them
408 * as up if they respond. When a server comes up or is found to
409 * not be current, then re-find the best database and
412 if ( (now = FT_ApproxTime()) > 30 + lastProbeTime) {
413 for (ts=ubik_servers,doingRPC=0; ts; ts=ts->next) {
419 urecovery_state &= ~UBIK_RECFOUNDDB;
421 } else if (!ts->currentDB) {
422 urecovery_state &= ~UBIK_RECFOUNDDB;
426 now = FT_ApproxTime();
430 /* Mark whether we are the sync site */
431 if (!ubeacon_AmSyncSite()) {
432 urecovery_state &= ~UBIK_RECSYNCSITE;
433 continue; /* nothing to do */
435 urecovery_state |= UBIK_RECSYNCSITE;
437 /* If a server has just come up or if we have not found the
438 * most current database, then go find the most current db.
440 if (!(urecovery_state & UBIK_RECFOUNDDB)) {
441 bestServer = (struct ubik_server *) 0;
442 bestDBVersion.epoch = 0;
443 bestDBVersion.counter = 0;
444 for(ts=ubik_servers; ts; ts=ts->next) {
445 if (!ts->up) continue; /* don't bother with these guys */
446 if (ts->isClone) continue;
447 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
449 /* perhaps this is the best version */
450 if (vcmp(ts->version, bestDBVersion) > 0) {
451 /* new best version */
452 bestDBVersion = ts->version;
457 /* take into consideration our version. Remember if we,
458 * the sync site, have the best version. Also note that
459 * we may need to send the best version out.
461 if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
462 bestDBVersion = ubik_dbase->version;
463 bestServer = (struct ubik_server *) 0;
464 urecovery_state |= UBIK_RECHAVEDB;
466 /* Clear the flag only when we know we have to retrieve
467 * the db. Because urecovery_AllBetter() looks at it.
469 urecovery_state &= ~UBIK_RECHAVEDB;
471 lastDBVCheck = FT_ApproxTime();
472 urecovery_state |= UBIK_RECFOUNDDB;
473 urecovery_state &= ~UBIK_RECSENTDB;
475 #if defined(UBIK_PAUSE)
476 /* it's not possible for UBIK_RECFOUNDDB not to be set here.
477 * However, we might have lost UBIK_RECSYNCSITE, and that
480 if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
482 if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
483 #endif /* UBIK_PAUSE */
485 /* If we, the sync site, do not have the best db version, then
486 * go and get it from the server that does.
488 if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
489 urecovery_state |= UBIK_RECHAVEDB;
491 /* we don't have the best version; we should fetch it. */
492 #if defined(UBIK_PAUSE)
495 ObtainWriteLock(&ubik_dbase->versionLock);
496 #endif /* UBIK_PAUSE */
497 urecovery_AbortAll(ubik_dbase);
499 /* Rx code to do the Bulk fetch */
502 rxcall = rx_NewCall(bestServer->disk_rxcid);
504 ubik_print("Ubik: Synchronize database with server %s\n", afs_inet_ntoa(bestServer->addr[0]));
506 code = StartDISK_GetFile(rxcall, file);
508 ubik_dprint("StartDiskGetFile failed=%d\n", code);
511 nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
512 length = ntohl(length);
513 if (nbytes != sizeof(afs_int32)) {
514 ubik_dprint("Rx-read length error=%d\n", code=BULK_ERROR);
519 /* Truncate the file first */
520 code = (*ubik_dbase->truncate)(ubik_dbase, file, 0);
522 ubik_dprint("truncate io error=%d\n", code);
526 /* give invalid label during file transit */
528 tversion.counter = 0;
529 code = (*ubik_dbase->setlabel)(ubik_dbase, file, &tversion);
531 ubik_dprint("setlabel io error=%d\n", code);
536 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
/* NOTE(review): length is signed and was received from the remote
 * server (rx_Read and ntohl above).  A negative value compares as a huge
 * unsigned number against sizeof(tbuffer), so tlen becomes
 * sizeof(tbuffer).  Consider rejecting length < 0 before copying. */
537 nbytes = rx_Read(rxcall, tbuffer, tlen);
538 if (nbytes != tlen) {
539 ubik_dprint("Rx-read bulk error=%d\n", code=BULK_ERROR);
543 nbytes = (*ubik_dbase->write)(ubik_dbase, file, tbuffer, offset, tlen);
544 if (nbytes != tlen) {
551 code = EndDISK_GetFile(rxcall, &tversion);
553 tcode = rx_EndCall(rxcall, code);
554 if (!code) code = tcode;
556 /* we got a new file, set up its header */
557 urecovery_state |= UBIK_RECHAVEDB;
558 memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
559 (*ubik_dbase->sync)(ubik_dbase, 0); /* get data out first */
/* NOTE(review): the return value of this sync is ignored, so the label
 * below is written even if flushing the data failed. */
560 /* after data is good, sync disk with correct label */
561 code = (*ubik_dbase->setlabel)(ubik_dbase, 0, &ubik_dbase->version);
564 ubik_dbase->version.epoch = 0;
565 ubik_dbase->version.counter = 0;
566 ubik_print("Ubik: Synchronize database failed (error = %d)\n", code);
568 ubik_print("Ubik: Synchronize database completed\n");
570 udisk_Invalidate(ubik_dbase, 0); /* data has changed */
571 LWP_NoYieldSignal(&ubik_dbase->version);
572 #if defined(UBIK_PAUSE)
575 ReleaseWriteLock(&ubik_dbase->versionLock);
576 #endif /* UBIK_PAUSE */
578 #if defined(UBIK_PAUSE)
579 if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
580 #endif /* UBIK_PAUSE */
581 if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
583 /* If the database was newly initialized, then when we establish quorum, write
584 * a new label. This allows urecovery_AllBetter() to allow access for reads.
585 * Setting it to 2 also allows another site to come along with a newer
586 * database and overwrite this one.
588 if (ubik_dbase->version.epoch == 1) {
589 #if defined(UBIK_PAUSE)
592 ObtainWriteLock(&ubik_dbase->versionLock);
593 #endif /* UBIK_PAUSE */
594 urecovery_AbortAll(ubik_dbase);
596 ubik_dbase->version.epoch = ubik_epochTime;
597 ubik_dbase->version.counter = 1;
598 code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
599 udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
600 LWP_NoYieldSignal(&ubik_dbase->version);
601 #if defined(UBIK_PAUSE)
604 ReleaseWriteLock(&ubik_dbase->versionLock);
605 #endif /* UBIK_PAUSE */
608 /* Check the other sites and send the database to them if they
609 * do not have the current db.
611 if (!(urecovery_state & UBIK_RECSENTDB)) {
612 /* now propagate out new version to everyone else */
613 dbok = 1; /* start off assuming they all worked */
615 #if defined(UBIK_PAUSE)
618 ObtainWriteLock(&ubik_dbase->versionLock);
619 #endif /* UBIK_PAUSE */
621 * Check if a write transaction is in progress. We can't send the
622 * db when a write is in progress here because the db would be
623 * obsolete as soon as it goes there. Also, ops after the begin
624 * trans would reach the recepient and wouldn't find a transaction
625 * pending there. Frankly, I don't think it's possible to get past
626 * the write-lock above if there is a write transaction in progress,
627 * but then, it won't hurt to check, will it?
629 if (ubik_dbase->flags & DBWRITING) {
634 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
635 #if defined(UBIK_PAUSE)
638 ReleaseWriteLock(&ubik_dbase->versionLock);
639 #endif /* UBIK_PAUSE */
640 /* sleep for a little while */
641 IOMGR_Select(0, 0, 0, 0, &tv);
642 tv.tv_usec += 10000; safety++;
/* NOTE(review): tv_usec grows by 10000 on every pass, and safety allows
 * up to 500 passes.  Starting from a non-negative value, it therefore
 * exceeds 999999 after at most 100 passes.  POSIX select() may reject
 * such an invalid timeout with EINVAL.  Consider carrying the excess
 * into tv_sec, or using a fixed delay. */
643 #if defined(UBIK_PAUSE)
646 ObtainWriteLock(&ubik_dbase->versionLock);
647 #endif /* UBIK_PAUSE */
651 for(ts=ubik_servers; ts; ts=ts->next) {
652 inAddr.s_addr = ts->addr[0];
654 ubik_dprint("recovery cannot send version to %s\n",
655 afs_inet_ntoa(inAddr.s_addr));
659 ubik_dprint("recovery sending version to %s\n",
660 afs_inet_ntoa(inAddr.s_addr));
661 if (vcmp(ts->version, ubik_dbase->version) != 0) {
662 ubik_dprint("recovery stating local database\n");
664 /* Rx code to do the Bulk Store */
665 code = (*ubik_dbase->stat)(ubik_dbase, 0, &ubikstat);
667 length = ubikstat.size;
669 rxcall = rx_NewCall(ts->disk_rxcid);
670 code = StartDISK_SendFile(rxcall, file, length, &ubik_dbase->version);
672 ubik_dprint("StartDiskSendFile failed=%d\n", code);
676 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
677 nbytes = (*ubik_dbase->read)(ubik_dbase, file, tbuffer, offset, tlen);
678 if (nbytes != tlen) {
679 ubik_dprint("Local disk read error=%d\n", code=UIOERROR);
682 nbytes = rx_Write(rxcall, tbuffer, tlen);
683 if (nbytes != tlen) {
684 ubik_dprint("Rx-write bulk error=%d\n", code=BULK_ERROR);
690 code = EndDISK_SendFile(rxcall);
692 code = rx_EndCall(rxcall, code);
695 /* we set a new file, process its header */
696 ts->version = ubik_dbase->version;
702 /* mark file up to date */
706 #if defined(UBIK_PAUSE)
709 ReleaseWriteLock(&ubik_dbase->versionLock);
710 #endif /* UBIK_PAUSE */
711 if (dbok) urecovery_state |= UBIK_RECSENTDB;
717 ** send a Probe to all the network addresses of this server
718 ** Return 0 if success, else return 1
720 int DoProbe(struct ubik_server *server)
722 struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
723 struct rx_connection *connSuccess = 0;
727 extern afs_int32 ubikSecIndex;
728 extern struct rx_securityClass *ubikSecClass;
/* Open one DISK_SERVICE_ID connection per entry of server->addr[] and
 * probe them.  The loop stops at the first zero entry.  The
 * ubikPrimaryAddrOnly test inside the loop presumably limits it to the
 * primary address.
 *
 * On the first success (multi_error == 0):
 *  - the old disk and vote connections are destroyed,
 *  - they are replaced by connections to the address that answered.
 * Every other probe connection is then destroyed.
 *
 * Returns 0 when some address answered, 1 otherwise.
 *
 * NOTE(review): the loop test reads server->addr[i] before checking
 * i < UBIK_MAX_INTERFACE_ADDR.  With every slot filled, it therefore
 * reads one element past the end (assuming addr[] has
 * UBIK_MAX_INTERFACE_ADDR entries, like conns[]).  The bound check
 * should come first:
 *     (i < UBIK_MAX_INTERFACE_ADDR) && (addr = server->addr[i]) */
730 for (i=0; (addr=server->addr[i]) && (i<UBIK_MAX_INTERFACE_ADDR);i++)
732 conns[i] = rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
733 ubikSecClass, ubikSecIndex);
735 /* user requirement to use only the primary interface */
736 if ( ubikPrimaryAddrOnly )
742 assert(i); /* at least one interface address for this server */
747 if ( !multi_error ) /* first success */
749 addr = server->addr[multi_i]; /* successful interface addr */
751 if ( server->disk_rxcid) /* destroy existing conn */
752 rx_DestroyConnection(server->disk_rxcid);
753 if ( server->vote_rxcid)
754 rx_DestroyConnection(server->vote_rxcid);
756 /* make new connections */
757 server->disk_rxcid = conns[multi_i];
758 server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
759 VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);/* for vote reqs*/
761 connSuccess = conns[multi_i];
762 strcpy(buffer, (char*)afs_inet_ntoa(server->addr[0]));
/* afs_inet_ntoa is called again for addr in the ubik_print below and
 * presumably returns a static buffer, hence this copy.
 * NOTE(review): confirm buffer holds at least 16 bytes (a dotted quad
 * plus the terminating NUL). */
763 ubik_print("ubik:server %s is back up: will be contacted through %s\n",
764 buffer, afs_inet_ntoa(addr));
770 /* Destroy all connections except the one on which we succeeded */
771 for ( j=0; j < i; j++)
772 if ( conns[j] != connSuccess )
773 rx_DestroyConnection(conns[j] );
776 ubik_dprint("ubik:server %s still down\n",afs_inet_ntoa(server->addr[0]));
778 if ( connSuccess ) return 0; /* success */
779 else return 1; /* failure */