2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 #include <sys/types.h>
21 #include <netinet/in.h>
29 #include <afs/afsutil.h>
31 #define UBIK_INTERNALS
35 /* This module is responsible for determining when the system has
36 * recovered to the point that it can handle new transactions. It
37 * replays logs, polls to determine the current dbase after a crash,
38 * and distributes the new database to the others.
41 /* The sync site associates a version number with each database. It
42 * broadcasts the version associated with its current dbase in every
43 * one of its beacon messages. When the sync site sends a dbase to a
44 * server, it also sends the db's version. A non-sync site server can
45 * tell if it has the right dbase version by simply comparing the
46 * version from the beacon message (uvote_dbVersion) with the version
47 * associated with the database (ubik_dbase->version). The sync site
48 * itself simply has one counter to keep track of all of this (again
49 * ubik_dbase->version).
52 /* sync site: routine called when the sync site loses its quorum; this
53 * procedure is called "up" from the beacon package. It resyncs the
54 * dbase and nudges the recovery daemon to try to propagate out the
55 * changes. It also resets the recovery daemon's state, since
56 * recovery must potentially find a new dbase to propagate out. This
57 * routine should not do anything with variables used by non-sync site
61 /* if this flag is set, then ubik will use only the primary address
62 ** ( the address specified in the CellServDB) to contact other
63 ** ubik servers. Ubik recovery will not try opening connections
64 ** to the alternate interface addresses.
66 int ubikPrimaryAddrOnly;
/* Takes no arguments.  Signals on &urecovery_state with LWP_NoYieldSignal();
 * presumably this wakes the recovery light-weight process so that it
 * re-examines its state -- confirm against the LWP package. */
68 urecovery_ResetState() {
70 LWP_NoYieldSignal(&urecovery_state);
74 /* sync site: routine called when a non-sync site server goes down; restarts recovery
75 * process to send missing server the new db when it comes back up.
76 * This routine should not do anything with variables used by non-sync site servers.
/* Takes no arguments.  Signals on &urecovery_state with LWP_NoYieldSignal(),
 * the same address urecovery_ResetState() signals on; presumably this wakes
 * the recovery light-weight process -- confirm against the LWP package. */
78 urecovery_LostServer() {
79 LWP_NoYieldSignal(&urecovery_state);
83 /* return true iff we have a current database (called by both sync
84 * sites and non-sync sites) How do we determine this? If we're the
85 * sync site, we wait until recovery has finished fetching and
86 * re-labelling its dbase (it may still be trying to propagate it out
87 * to everyone else; that's THEIR problem). If we're not the sync
88 * site, then we must have a dbase labelled with the right version,
89 * and we must have a currently-good sync site.
/* Decides whether the local database may be used, logging both the start of
 * the check and the verdict through ubik_dprint().
 *
 * adbase:   database handle (K&R-style declaration).  Note that the visible
 *           tests read the global ubik_dbase rather than this argument.
 * areadAny: presumably selects the relaxed "any good version" test below --
 *           TODO confirm how it gates that branch.
 *
 * rcode holds the verdict that is printed at the end; it is presumably also
 * the value returned -- confirm. */
91 urecovery_AllBetter(adbase, areadAny)
93 register struct ubik_dbase *adbase; {
94 register afs_int32 rcode;
96 ubik_dprint("allbetter checking\n");
/* epoch 1 is the label InitializeDB() stamps on a newly-initialized db, so
 * an epoch above 1 means the db carries a real version label */
101 if (ubik_dbase->version.epoch > 1)
102 rcode = 1; /* Happy with any good version of database */
105 /* Check if we're sync site and we've got the right data */
106 else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
110 /* next, check if we're aux site, and we've ever been sent the
111 * right data (note that if a dbase update fails, we won't think
112 * that the sync site is still the sync site, 'cause it won't talk
113 * to us until a timeout period has gone by. When we recover, we
114 * leave this clear until we get a new dbase */
115 else if ( (uvote_GetSyncSite() &&
116 (vcmp(ubik_dbVersion, ubik_dbase->version) == 0)) ) { /* && order is important */
120 ubik_dprint("allbetter: returning %d\n", rcode);
124 /* abort all transactions on this database */
/* Walks the list of transactions hanging off adbase->activeTrans, following
 * each entry's ->next link until the list ends.
 *
 * adbase: database whose active transactions are visited.
 *
 * The per-transaction action is presumably an abort, as the routine's name
 * says -- confirm against the loop body. */
125 urecovery_AbortAll(adbase)
126 struct ubik_dbase *adbase; {
127 register struct ubik_trans *tt;
128 for(tt = adbase->activeTrans; tt; tt=tt->next) {
134 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
/* Compares atid with the tid of ubik_currentTrans; does nothing when there is
 * no current transaction.
 *
 * atid: transaction id to compare against the current one.
 *
 * A mismatch is a different epoch, or an atid counter greater than the current
 * transaction's counter (a smaller or equal counter in the same epoch is not
 * treated as a mismatch).  On a mismatch the current transaction is ended with
 * udisk_end() unless its locktype is LOCKWAIT, and ubik_currentTrans is reset
 * to a null pointer as part of this handling. */
135 urecovery_CheckTid(atid)
136 register struct ubik_tid *atid; {
137 if (ubik_currentTrans) {
138 /* there is remote write trans, see if we match, see if this
139 * is a new transaction */
140 if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
141 /* don't match, abort it */
142 /* If the thread is not waiting for lock - ok to end it */
143 if (ubik_currentTrans->locktype != LOCKWAIT) {
144 udisk_end(ubik_currentTrans);
146 ubik_currentTrans = (struct ubik_trans *) 0;
151 /* log format is defined here, and implicitly in disk.c
153 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
154 * are logged in network standard byte order, in case we want to move logs
155 * from machine-to-machine someday.
157 * Begin transaction: opcode
158 * Commit transaction: opcode, version (8 bytes)
159 * Truncate file: opcode, file number, length
160 * Abort transaction: opcode
161 * Write data: opcode, file, position, length, <length> data bytes
163 * A very simple routine, it just replays the log. Note that this is a new-value only log, which
164 * implies that no uncommitted data is written to the dbase: one writes data to the log, including
165 * the commit record, then we allow data to be written through to the dbase. In our particular
166 * implementation, once a transaction is done, we write out the pages to the database, so that
167 * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
168 * any changed data while there is an uncommitted write transaction can be zapped during an
169 * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
/* Replays the transaction log (LOGFILE) into the database files, using the
 * read, write, truncate, sync and setlabel operations supplied by adbase.
 *
 * adbase: database whose log is replayed.
 *
 * Pass 1 walks the log one opcode at a time, stepping tpos over each record's
 * parameters, and stops at an abort record, a short read, or an unknown
 * opcode; its purpose is to find out whether the log contains a commit
 * (LOGEND) record.
 * Pass 2 walks the log again and applies it: LOGTRUNCATE records call the
 * truncate operation, LOGDATA records are copied from the log into the target
 * file in chunks of at most sizeof(data) bytes, and the LOGEND record's 8-byte
 * version is written with setlabel.  Files are synced when the target file
 * changes and once more at the end.  Finally the log is truncated to zero
 * length.
 *
 * Visible error returns: UBADLOG on a short read or short write while
 * applying records, otherwise the non-zero code from setlabel, truncate or
 * sync. */
174 static ReplayLog(adbase)
175 register struct ubik_dbase *adbase; {
177 register afs_int32 code, tpos;
179 afs_int32 len, thisSize, tfile, filePos;
181 afs_int32 syncFile = -1;
182 afs_int32 data[1024];
184 /* read the log twice, once to see whether we have a transaction to deal
185 with that committed, (theoretically, we should support more than one
186 trans in the log at once, but not yet), and once replaying the
190 /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
192 code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
193 if (code != sizeof(afs_int32)) break;
/* NOTE(review): opcode is compared exactly as read from the log.  The format
 * description says all integers are logged in network byte order; if that
 * includes opcodes this needs ntohl() -- confirm against the log writer. */
194 if (opcode == LOGNEW) {
195 /* handle begin trans */
196 tpos += sizeof(afs_int32);
198 else if (opcode == LOGABORT) break;
199 else if (opcode == LOGEND) {
203 else if (opcode == LOGTRUNCATE) {
205 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
206 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
207 tpos += 2*sizeof(afs_int32);
209 else if (opcode == LOGDATA) {
211 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
212 if (code != 3*sizeof(afs_int32)) break;
213 /* otherwise, skip over the data bytes, too */
/* NOTE(review): buffer[2] is used here without ntohl(), while pass 2 decodes
 * the same length field with ntohl(buffer[2]).  On a little-endian host the
 * two passes would disagree about the record length -- likely a bug, confirm. */
214 tpos += buffer[2] + 3*sizeof(afs_int32);
217 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
218 break; /* corrupt log! */
222 /* actually do the replay; log should go all the way through the commit record, since
223 we just read it above. */
228 code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
229 if (code != sizeof(afs_int32)) break;
230 if (opcode == LOGNEW) {
231 /* handle begin trans */
232 tpos += sizeof(afs_int32);
234 else if (opcode == LOGABORT) panic("log abort\n");
235 else if (opcode == LOGEND) {
237 code = (*adbase->read) (adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
238 if (code != 2*sizeof(afs_int32)) return UBADLOG;
239 code = (*adbase->setlabel) (adbase, 0, buffer);
240 if (code) return code;
242 break; /* all done now */
244 else if (opcode == LOGTRUNCATE) {
246 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
247 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
248 tpos += 2*sizeof(afs_int32);
249 code = (*adbase->truncate) (adbase, ntohl(buffer[0]), ntohl(buffer[1]));
250 if (code) return code;
252 else if (opcode == LOGDATA) {
254 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
255 if (code != 3*sizeof(afs_int32)) break;
256 tpos += 3*sizeof(afs_int32);
257 /* otherwise, skip over the data bytes, too */
258 len = ntohl(buffer[2]); /* total number of bytes to copy */
259 filePos = ntohl(buffer[1]);
260 tfile = ntohl(buffer[0]);
261 /* try to minimize file syncs */
262 if (syncFile != tfile) {
263 if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
266 if (code) return code;
269 thisSize = (len > sizeof(data)? sizeof(data) : len);
270 /* copy sizeof(data) buffer bytes at a time */
271 code = (*adbase->read)(adbase, LOGFILE, data, tpos, thisSize);
272 if (code != thisSize) return UBADLOG;
273 code = (*adbase->write)(adbase, tfile, data, filePos, thisSize);
274 if (code != thisSize) return UBADLOG;
281 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
282 break; /* corrupt log! */
286 if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
287 if (code) return code;
290 ubik_dprint("Log read error on pass 2\n");
295 /* now truncate the log, we're done with it */
296 code = (*adbase->truncate)(adbase, LOGFILE, 0);
300 /* Called at initialization to figure out version of the dbase we really have.
301 * This routine is called after replaying the log; it reads the restored labels.
/* Loads the version label of file 0 into adbase->version through the getlabel
 * operation, then signals on &adbase->version with LWP_NoYieldSignal().
 *
 * adbase: database whose version field is filled in.
 *
 * Fallback path (presumably taken when getlabel fails -- confirm): the version
 * is set to epoch 1 / counter 1 and written with setlabel.  If that write in
 * turn fails, the version is reset to 0/0 and written again; the result of
 * this last setlabel call is ignored. */
303 static InitializeDB(adbase)
304 register struct ubik_dbase *adbase; {
305 register afs_int32 code;
307 code = (*adbase->getlabel)(adbase, 0, &adbase->version);
309 /* try setting the label to a new value */
310 adbase->version.epoch = 1; /* value for newly-initialized db */
311 adbase->version.counter = 1;
312 code = (*adbase->setlabel) (adbase, 0, &adbase->version);
314 /* failed, try to set it back */
315 adbase->version.epoch = 0;
316 adbase->version.counter = 0;
317 (*adbase->setlabel) (adbase, 0, &adbase->version);
319 LWP_NoYieldSignal(&adbase->version);
324 /* initialize the local dbase
325 * We replay the logs and then read the resulting file to figure out what version we've really got.
/* Brings the local database up at start-up: first ReplayLog(), then
 * InitializeDB().
 *
 * adbase: database to initialize; passed unchanged to both helpers.
 *
 * A non-zero code from ReplayLog() is returned immediately and InitializeDB()
 * is not called in that case.  The value returned after InitializeDB() is
 * presumably its code -- confirm. */
327 urecovery_Initialize(adbase)
328 register struct ubik_dbase *adbase; {
329 register afs_int32 code;
331 code = ReplayLog(adbase);
332 if (code) return code;
333 code = InitializeDB(adbase);
337 /* Main interaction loop for the recovery manager
338 * The recovery light-weight process only runs when you're the
339 * synchronization site. It performs the following tasks, if and only
340 * if the prerequisite tasks have been performed successfully (it
341 * keeps track of which ones have been performed in its bit map,
344 * First, it is responsible for probing that all servers are up. This
345 * is the only operation that must be performed even if this is not
346 * yet the sync site, since otherwise this site may not notice that
347 * enough other machines are running to even elect this guy to be the
350 * After that, the recovery process does nothing until the beacon and
351 * voting modules manage to get this site elected sync site.
353 * After becoming sync site, recovery first attempts to find the best
354 * database available in the network (it must do this in order to
355 * ensure finding the latest committed data). After finding the right
356 * database, it must fetch this dbase to the sync site.
358 * After fetching the dbase, it relabels it with a new version number,
359 * to ensure that everyone recognizes this dbase as the most recent
362 * Once the dbase has been relabelled, this machine can start handling
363 * requests. However, the recovery module still has one more task:
364 * propagating the dbase out to everyone who is up in the network.
/* Body of the recovery process.  Takes no arguments.  Progress is tracked in
 * the bits of the global urecovery_state; each pass through the loop performs
 * the steps below in order and starts over as soon as a prerequisite bit is
 * missing.
 *
 *  1. Sleep with IOMGR_Select(), then, when more than 30 seconds have passed
 *     since lastProbeTime, walk ubik_servers; UBIK_RECFOUNDDB is cleared when
 *     a server is found whose currentDB flag is not set (and in one other
 *     per-server case).
 *  2. If ubeacon_AmSyncSite() is false, clear UBIK_RECSYNCSITE and go round
 *     again; otherwise set it.
 *  3. Without UBIK_RECFOUNDDB: ask every up, non-clone server for its version
 *     with DISK_GetVersion(), remember the highest, and compare it with
 *     ubik_dbase->version.  UBIK_RECHAVEDB is set when the local version is at
 *     least as new and cleared otherwise; UBIK_RECFOUNDDB is set and
 *     UBIK_RECSENTDB cleared.
 *  4. Without UBIK_RECHAVEDB and with a bestServer: under the write lock on
 *     ubik_dbase->versionLock, abort all local transactions and fetch the
 *     file from bestServer with the DISK_GetFile bulk call, writing it locally
 *     chunk by chunk, then label it with the version the call returned.
 *  5. If the local epoch is 1, relabel the database with epoch ubik_epochTime
 *     and counter 1, again under the version write lock.
 *  6. Without UBIK_RECSENTDB: under the version write lock, wait while the
 *     DBWRITING flag is set (bounded by the safety counter), then send the
 *     file with the DISK_SendFile bulk call to every server whose recorded
 *     version differs from ours.  UBIK_RECSENTDB is set only if dbok is still
 *     non-zero afterwards.
 *
 * bestDBVersion is static, so its value persists across loop iterations and
 * calls. */
366 urecovery_Interact() {
367 afs_int32 code, tcode;
368 struct ubik_server *bestServer;
369 struct ubik_server *ts;
370 int dbok, doingRPC, now;
371 afs_int32 lastProbeTime, lastDBVCheck;
372 /* if we're the sync site, the best db version we've found yet */
373 static struct ubik_version bestDBVersion;
374 struct ubik_version tversion;
376 int length, tlen, offset, file, nbytes;
377 struct rx_call *rxcall;
379 struct ubik_stat ubikstat;
380 struct in_addr inAddr;
382 /* otherwise, begin interaction */
387 /* Run through this loop every 4 seconds */
390 IOMGR_Select(0, 0, 0, 0, &tv);
392 ubik_dprint("recovery running in state %x\n", urecovery_state);
394 /* Every 30 seconds, check all the down servers and mark them
395 * as up if they respond. When a server comes up or found to
396 * not be current, then re-find the best database and
399 if ( (now = FT_ApproxTime()) > 30 + lastProbeTime) {
400 for (ts=ubik_servers,doingRPC=0; ts; ts=ts->next) {
406 urecovery_state &= ~UBIK_RECFOUNDDB;
408 } else if (!ts->currentDB) {
409 urecovery_state &= ~UBIK_RECFOUNDDB;
413 now = FT_ApproxTime();
417 /* Mark whether we are the sync site */
418 if (!ubeacon_AmSyncSite()) {
419 urecovery_state &= ~UBIK_RECSYNCSITE;
420 continue; /* nothing to do */
422 urecovery_state |= UBIK_RECSYNCSITE;
424 /* If a server has just come up or if we have not found the
425 * most current database, then go find the most current db.
427 if (!(urecovery_state & UBIK_RECFOUNDDB)) {
428 bestServer = (struct ubik_server *) 0;
429 bestDBVersion.epoch = 0;
430 bestDBVersion.counter = 0;
431 for(ts=ubik_servers; ts; ts=ts->next) {
432 if (!ts->up) continue; /* don't bother with these guys */
433 if (ts->isClone) continue;
434 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
436 /* perhaps this is the best version */
437 if (vcmp(ts->version, bestDBVersion) > 0) {
438 /* new best version */
439 bestDBVersion = ts->version;
444 /* take into consideration our version. Remember if we,
445 * the sync site, have the best version. Also note that
446 * we may need to send the best version out.
448 if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
449 bestDBVersion = ubik_dbase->version;
450 bestServer = (struct ubik_server *) 0;
451 urecovery_state |= UBIK_RECHAVEDB;
453 /* Clear the flag only when we know we have to retrieve
454 * the db. Because urecovery_AllBetter() looks at it.
456 urecovery_state &= ~UBIK_RECHAVEDB;
458 lastDBVCheck = FT_ApproxTime();
459 urecovery_state |= UBIK_RECFOUNDDB;
460 urecovery_state &= ~UBIK_RECSENTDB;
462 if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
464 /* If we, the sync site, do not have the best db version, then
465 * go and get it from the server that does.
467 if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
468 urecovery_state |= UBIK_RECHAVEDB;
470 /* we don't have the best version; we should fetch it. */
471 ObtainWriteLock(&ubik_dbase->versionLock);
472 urecovery_AbortAll(ubik_dbase);
474 /* Rx code to do the Bulk fetch */
477 rxcall = rx_NewCall(bestServer->disk_rxcid);
479 ubik_print("Ubik: Synchronize database with server %s\n", afs_inet_ntoa(bestServer->addr[0]));
481 code = StartDISK_GetFile(rxcall, file);
483 ubik_dprint("StartDiskGetFile failed=%d\n", code);
486 nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
487 length = ntohl(length);
488 if (nbytes != sizeof(afs_int32)) {
489 ubik_dprint("Rx-read length error=%d\n", code=BULK_ERROR);
494 /* Truncate the file first */
495 code = (*ubik_dbase->truncate)(ubik_dbase, file, 0);
497 ubik_dprint("truncate io error=%d\n", code);
501 /* give invalid label during file transit */
503 tversion.counter = 0;
504 code = (*ubik_dbase->setlabel)(ubik_dbase, file, &tversion);
506 ubik_dprint("setlabel io error=%d\n", code);
511 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
512 nbytes = rx_Read(rxcall, tbuffer, tlen);
513 if (nbytes != tlen) {
514 ubik_dprint("Rx-read bulk error=%d\n", code=BULK_ERROR);
518 nbytes = (*ubik_dbase->write)(ubik_dbase, file, tbuffer, offset, tlen);
519 if (nbytes != tlen) {
526 code = EndDISK_GetFile(rxcall, &tversion);
528 tcode = rx_EndCall(rxcall, code);
529 if (!code) code = tcode;
531 /* we got a new file, set up its header */
532 urecovery_state |= UBIK_RECHAVEDB;
533 memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
534 (*ubik_dbase->sync)(ubik_dbase, 0); /* get data out first */
535 /* after data is good, sync disk with correct label */
536 code = (*ubik_dbase->setlabel)(ubik_dbase, 0, &ubik_dbase->version);
539 ubik_dbase->version.epoch = 0;
540 ubik_dbase->version.counter = 0;
541 ubik_print("Ubik: Synchronize database failed (error = %d)\n", code);
543 ubik_print("Ubik: Synchronize database completed\n");
545 udisk_Invalidate(ubik_dbase, 0); /* data has changed */
546 LWP_NoYieldSignal(&ubik_dbase->version);
547 ReleaseWriteLock(&ubik_dbase->versionLock);
549 if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
551 /* If the database was newly initialized, then when we establish quorum, write
552 * a new label. This allows urecovery_AllBetter() to allow access for reads.
553 * Setting it to 2 also allows another site to come along with a newer
554 * database and overwrite this one.
 * NOTE(review): the code below stores ubik_epochTime in the epoch, not 2 --
 * confirm which of the two is intended and update the text accordingly.
556 if (ubik_dbase->version.epoch == 1) {
557 ObtainWriteLock(&ubik_dbase->versionLock);
558 urecovery_AbortAll(ubik_dbase);
560 ubik_dbase->version.epoch = ubik_epochTime;
561 ubik_dbase->version.counter = 1;
562 code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
563 udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
564 LWP_NoYieldSignal(&ubik_dbase->version);
565 ReleaseWriteLock(&ubik_dbase->versionLock);
568 /* Check the other sites and send the database to them if they
569 * do not have the current db.
571 if (!(urecovery_state & UBIK_RECSENTDB)) {
572 /* now propagate out new version to everyone else */
573 dbok = 1; /* start off assuming they all worked */
575 ObtainWriteLock(&ubik_dbase->versionLock);
577 * Check if a write transaction is in progress. We can't send the
578 * db when a write is in progress here because the db would be
579 * obsolete as soon as it goes there. Also, ops after the begin
580 * trans would reach the recepient and wouldn't find a transaction
581 * pending there. Frankly, I don't think it's possible to get past
582 * the write-lock above if there is a write transaction in progress,
583 * but then, it won't hurt to check, will it?
585 if (ubik_dbase->flags & DBWRITING) {
590 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
591 ReleaseWriteLock(&ubik_dbase->versionLock);
592 /* sleep for a little while */
593 IOMGR_Select(0, 0, 0, 0, &tv);
594 tv.tv_usec += 10000; safety++;
595 ObtainWriteLock(&ubik_dbase->versionLock);
599 for(ts=ubik_servers; ts; ts=ts->next) {
600 inAddr.s_addr = ts->addr[0];
602 ubik_dprint("recovery cannot send version to %s\n",
603 afs_inet_ntoa(inAddr.s_addr));
607 ubik_dprint("recovery sending version to %s\n",
608 afs_inet_ntoa(inAddr.s_addr));
609 if (vcmp(ts->version, ubik_dbase->version) != 0) {
610 ubik_dprint("recovery stating local database\n");
612 /* Rx code to do the Bulk Store */
613 code = (*ubik_dbase->stat)(ubik_dbase, 0, &ubikstat);
615 length = ubikstat.size;
617 rxcall = rx_NewCall(ts->disk_rxcid);
618 code = StartDISK_SendFile(rxcall, file, length, &ubik_dbase->version);
620 ubik_dprint("StartDiskSendFile failed=%d\n", code);
624 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
625 nbytes = (*ubik_dbase->read)(ubik_dbase, file, tbuffer, offset, tlen);
626 if (nbytes != tlen) {
627 ubik_dprint("Local disk read error=%d\n", code=UIOERROR);
630 nbytes = rx_Write(rxcall, tbuffer, tlen);
631 if (nbytes != tlen) {
632 ubik_dprint("Rx-write bulk error=%d\n", code=BULK_ERROR);
638 code = EndDISK_SendFile(rxcall);
640 code = rx_EndCall(rxcall, code);
643 /* we set a new file, process its header */
644 ts->version = ubik_dbase->version;
650 /* mark file up to date */
654 ReleaseWriteLock(&ubik_dbase->versionLock);
655 if (dbok) urecovery_state |= UBIK_RECSENTDB;
661 ** send a Probe to all the network addresses of this server
662 ** Return 0 if success, else return 1
665 struct ubik_server *server;
667 struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
668 struct rx_connection *connSuccess = 0;
672 extern afs_int32 ubikSecIndex;
673 extern struct rx_securityClass *ubikSecClass;
675 for (i=0; (addr=server->addr[i]) && (i<UBIK_MAX_INTERFACE_ADDR);i++)
677 conns[i] = rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
678 ubikSecClass, ubikSecIndex);
680 /* user requirement to use only the primary interface */
681 if ( ubikPrimaryAddrOnly )
687 assert(i); /* at least one interface address for this server */
692 if ( !multi_error ) /* first success */
694 addr = server->addr[multi_i]; /* successful interface addr */
696 if ( server->disk_rxcid) /* destroy existing conn */
697 rx_DestroyConnection(server->disk_rxcid);
698 if ( server->vote_rxcid)
699 rx_DestroyConnection(server->vote_rxcid);
701 /* make new connections */
702 server->disk_rxcid = conns[multi_i];
703 server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
704 VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);/* for vote reqs*/
706 connSuccess = conns[multi_i];
707 strcpy(buffer, (char*)afs_inet_ntoa(server->addr[0]));
708 ubik_print("ubik:server %s is back up: will be contacted through %s\n",
709 buffer, afs_inet_ntoa(addr));
715 /* Destroy all connections except the one on which we succeeded */
716 for ( j=0; j < i; j++)
717 if ( conns[j] != connSuccess )
718 rx_DestroyConnection(conns[j] );
721 ubik_dprint("ubik:server %s still down\n",afs_inet_ntoa(server->addr[0]));
723 if ( connSuccess ) return 0; /* success */
724 else return 1; /* failure */