2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afs/param.h>
11 #include <afsconfig.h>
15 #include <sys/types.h>
21 #include <netinet/in.h>
30 #define UBIK_INTERNALS
34 /* This module is responsible for determining when the system has
35 * recovered to the point that it can handle new transactions. It
36 * replays logs, polls to determine the current dbase after a crash,
37 * and distributes the new database to the others.
40 /* The sync site associates a version number with each database. It
41 * broadcasts the version associated with its current dbase in every
42 * one of its beacon messages. When the sync site send a dbase to a
43 * server, it also sends the db's version. A non-sync site server can
44 * tell if it has the right dbase version by simply comparing the
45 * version from the beacon message (uvote_dbVersion) with the version
46 * associated with the database (ubik_dbase->version). The sync site
47 * itself simply has one counter to keep track of all of this (again
48 * ubik_dbase->version).
51 /* sync site: routine called when the sync site loses its quorum; this
52 * procedure is called "up" from the beacon package. It resyncs the
53 * dbase and nudges the recovery daemon to try to propagate out the
54 * changes. It also resets the recovery daemon's state, since
55 * recovery must potentially find a new dbase to propagate out. This
56 * routine should not do anything with variables used by non-sync site
60 /* if this flag is set, then ubik will use only the primary address
61 ** ( the address specified in the CellServDB) to contact other
62 ** ubik servers. Ubik recovery will not try opening connections
63 ** to the alternate interface addresses.
65 int ubikPrimaryAddrOnly;
67 urecovery_ResetState() {
69 LWP_NoYieldSignal(&urecovery_state);
73 /* sync site: routine called when a non-sync site server goes down; restarts recovery
74 * process to send missing server the new db when it comes back up.
75 * This routine should not do anything with variables used by non-sync site servers.
77 urecovery_LostServer() {
78 LWP_NoYieldSignal(&urecovery_state);
82 /* return true iff we have a current database (called by both sync
83 * sites and non-sync sites) How do we determine this? If we're the
84 * sync site, we wait until recovery has finished fetching and
85 * re-labelling its dbase (it may still be trying to propagate it out
86 * to everyone else; that's THEIR problem). If we're not the sync
87 * site, then we must have a dbase labelled with the right version,
88 * and we must have a currently-good sync site.
90 urecovery_AllBetter(adbase, areadAny)
92 register struct ubik_dbase *adbase; {
93 register afs_int32 rcode;
95 ubik_dprint("allbetter checking\n");
100 if (ubik_dbase->version.epoch > 1)
101 rcode = 1; /* Happy with any good version of database */
104 /* Check if we're sync site and we've got the right data */
105 else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
109 /* next, check if we're aux site, and we've ever been sent the
110 * right data (note that if a dbase update fails, we won't think
111 * that the sync site is still the sync site, 'cause it won't talk
112 * to us until a timeout period has gone by. When we recover, we
113 * leave this clear until we get a new dbase */
114 else if ( (uvote_GetSyncSite() &&
115 (vcmp(ubik_dbVersion, ubik_dbase->version) == 0)) ) { /* && order is important */
119 ubik_dprint("allbetter: returning %d\n", rcode);
123 /* abort all transactions on this database */
124 urecovery_AbortAll(adbase)
125 struct ubik_dbase *adbase; {
126 register struct ubik_trans *tt;
127 for(tt = adbase->activeTrans; tt; tt=tt->next) {
133 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
134 urecovery_CheckTid(atid)
135 register struct ubik_tid *atid; {
136 if (ubik_currentTrans) {
137 /* there is remote write trans, see if we match, see if this
138 * is a new transaction */
139 if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
140 /* don't match, abort it */
141 /* If the thread is not waiting for lock - ok to end it */
142 if (ubik_currentTrans->locktype != LOCKWAIT) {
143 udisk_end(ubik_currentTrans);
145 ubik_currentTrans = (struct ubik_trans *) 0;
150 /* log format is defined here, and implicitly in disk.c
152 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
153 * are in logged in network standard byte order, in case we want to move logs
154 * from machine-to-machine someday.
156 * Begin transaction: opcode
157 * Commit transaction: opcode, version (8 bytes)
158 * Truncate file: opcode, file number, length
159 * Abort transaction: opcode
160 * Write data: opcode, file, position, length, <length> data bytes
162 * A very simple routine, it just replays the log. Note that this is a new-value only log, which
163 * implies that no uncommitted data is written to the dbase: one writes data to the log, including
164 * the commit record, then we allow data to be written through to the dbase. In our particular
165 * implementation, once a transaction is done, we write out the pages to the database, so that
166 * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
167 * any changed data while there is an uncommitted write transaction can be zapped during an
168 * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
173 static ReplayLog(adbase)
174 register struct ubik_dbase *adbase; {
176 register afs_int32 code, tpos;
178 afs_int32 len, thisSize, tfile, filePos;
180 afs_int32 syncFile = -1;
181 afs_int32 data[1024];
183 /* read the lock twice, once to see whether we have a transaction to deal
184 with that committed, (theoretically, we should support more than one
185 trans in the log at once, but not yet), and once replaying the
189 /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
191 code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
192 if (code != sizeof(afs_int32)) break;
193 if (opcode == LOGNEW) {
194 /* handle begin trans */
195 tpos += sizeof(afs_int32);
197 else if (opcode == LOGABORT) break;
198 else if (opcode == LOGEND) {
202 else if (opcode == LOGTRUNCATE) {
204 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
205 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
206 tpos += 2*sizeof(afs_int32);
208 else if (opcode == LOGDATA) {
210 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
211 if (code != 3*sizeof(afs_int32)) break;
212 /* otherwise, skip over the data bytes, too */
213 tpos += buffer[2] + 3*sizeof(afs_int32);
216 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
217 break; /* corrupt log! */
221 /* actually do the replay; log should go all the way through the commit record, since
222 we just read it above. */
227 code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
228 if (code != sizeof(afs_int32)) break;
229 if (opcode == LOGNEW) {
230 /* handle begin trans */
231 tpos += sizeof(afs_int32);
233 else if (opcode == LOGABORT) panic("log abort\n");
234 else if (opcode == LOGEND) {
236 code = (*adbase->read) (adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
237 if (code != 2*sizeof(afs_int32)) return UBADLOG;
238 code = (*adbase->setlabel) (adbase, 0, buffer);
239 if (code) return code;
241 break; /* all done now */
243 else if (opcode == LOGTRUNCATE) {
245 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
246 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
247 tpos += 2*sizeof(afs_int32);
248 code = (*adbase->truncate) (adbase, ntohl(buffer[0]), ntohl(buffer[1]));
249 if (code) return code;
251 else if (opcode == LOGDATA) {
253 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
254 if (code != 3*sizeof(afs_int32)) break;
255 tpos += 3*sizeof(afs_int32);
256 /* otherwise, skip over the data bytes, too */
257 len = ntohl(buffer[2]); /* total number of bytes to copy */
258 filePos = ntohl(buffer[1]);
259 tfile = ntohl(buffer[0]);
260 /* try to minimize file syncs */
261 if (syncFile != tfile) {
262 if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
265 if (code) return code;
268 thisSize = (len > sizeof(data)? sizeof(data) : len);
269 /* copy sizeof(data) buffer bytes at a time */
270 code = (*adbase->read)(adbase, LOGFILE, data, tpos, thisSize);
271 if (code != thisSize) return UBADLOG;
272 code = (*adbase->write)(adbase, tfile, data, filePos, thisSize);
273 if (code != thisSize) return UBADLOG;
280 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
281 break; /* corrupt log! */
285 if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
286 if (code) return code;
289 ubik_dprint("Log read error on pass 2\n");
294 /* now truncate the log, we're done with it */
295 code = (*adbase->truncate)(adbase, LOGFILE, 0);
299 /* Called at initialization to figure out version of the dbase we really have.
300 * This routine is called after replaying the log; it reads the restored labels.
302 static InitializeDB(adbase)
303 register struct ubik_dbase *adbase; {
304 register afs_int32 code;
306 code = (*adbase->getlabel)(adbase, 0, &adbase->version);
308 /* try setting the label to a new value */
309 adbase->version.epoch = 1; /* value for newly-initialized db */
310 adbase->version.counter = 1;
311 code = (*adbase->setlabel) (adbase, 0, &adbase->version);
313 /* failed, try to set it back */
314 adbase->version.epoch = 0;
315 adbase->version.counter = 0;
316 (*adbase->setlabel) (adbase, 0, &adbase->version);
318 LWP_NoYieldSignal(&adbase->version);
323 /* initialize the local dbase
324 * We replay the logs and then read the resulting file to figure out what version we've really got.
326 urecovery_Initialize(adbase)
327 register struct ubik_dbase *adbase; {
328 register afs_int32 code;
330 code = ReplayLog(adbase);
331 if (code) return code;
332 code = InitializeDB(adbase);
336 /* Main interaction loop for the recovery manager
337 * The recovery light-weight process only runs when you're the
338 * synchronization site. It performs the following tasks, if and only
339 * if the prerequisite tasks have been performed successfully (it
340 * keeps track of which ones have been performed in its bit map,
343 * First, it is responsible for probing that all servers are up. This
344 * is the only operation that must be performed even if this is not
345 * yet the sync site, since otherwise this site may not notice that
346 * enough other machines are running to even elect this guy to be the
349 * After that, the recovery process does nothing until the beacon and
350 * voting modules manage to get this site elected sync site.
352 * After becoming sync site, recovery first attempts to find the best
353 * database available in the network (it must do this in order to
354 * ensure finding the latest committed data). After finding the right
355 * database, it must fetch this dbase to the sync site.
357 * After fetching the dbase, it relabels it with a new version number,
358 * to ensure that everyone recognizes this dbase as the most recent
361 * One the dbase has been relabelled, this machine can start handling
362 * requests. However, the recovery module still has one more task:
363 * propagating the dbase out to everyone who is up in the network.
365 urecovery_Interact() {
366 afs_int32 code, tcode;
367 struct ubik_server *bestServer;
368 struct ubik_server *ts;
369 int dbok, doingRPC, now;
370 afs_int32 lastProbeTime, lastDBVCheck;
371 /* if we're the sync site, the best db version we've found yet */
372 static struct ubik_version bestDBVersion;
373 struct ubik_version tversion;
375 int length, tlen, offset, file, nbytes;
376 struct rx_call *rxcall;
378 struct ubik_stat ubikstat;
379 struct in_addr inAddr;
381 /* otherwise, begin interaction */
386 /* Run through this loop every 4 seconds */
389 IOMGR_Select(0, 0, 0, 0, &tv);
391 ubik_dprint("recovery running in state %x\n", urecovery_state);
393 /* Every 30 seconds, check all the down servers and mark them
394 * as up if they respond. When a server comes up or found to
395 * not be current, then re-find the the best database and
398 if ( (now = FT_ApproxTime()) > 30 + lastProbeTime) {
399 for (ts=ubik_servers,doingRPC=0; ts; ts=ts->next) {
405 urecovery_state &= ~UBIK_RECFOUNDDB;
407 } else if (!ts->currentDB) {
408 urecovery_state &= ~UBIK_RECFOUNDDB;
412 now = FT_ApproxTime();
416 /* Mark whether we are the sync site */
417 if (!ubeacon_AmSyncSite()) {
418 urecovery_state &= ~UBIK_RECSYNCSITE;
419 continue; /* nothing to do */
421 urecovery_state |= UBIK_RECSYNCSITE;
423 /* If a server has just come up or if we have not found the
424 * most current database, then go find the most current db.
426 if (!(urecovery_state & UBIK_RECFOUNDDB)) {
427 bestServer = (struct ubik_server *) 0;
428 bestDBVersion.epoch = 0;
429 bestDBVersion.counter = 0;
430 for(ts=ubik_servers; ts; ts=ts->next) {
431 if (!ts->up) continue; /* don't bother with these guys */
432 if (ts->isClone) continue;
433 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
435 /* perhaps this is the best version */
436 if (vcmp(ts->version, bestDBVersion) > 0) {
437 /* new best version */
438 bestDBVersion = ts->version;
443 /* take into consideration our version. Remember if we,
444 * the sync site, have the best version. Also note that
445 * we may need to send the best version out.
447 if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
448 bestDBVersion = ubik_dbase->version;
449 bestServer = (struct ubik_server *) 0;
450 urecovery_state |= UBIK_RECHAVEDB;
452 /* Clear the flag only when we know we have to retrieve
453 * the db. Because urecovery_AllBetter() looks at it.
455 urecovery_state &= ~UBIK_RECHAVEDB;
457 lastDBVCheck = FT_ApproxTime();
458 urecovery_state |= UBIK_RECFOUNDDB;
459 urecovery_state &= ~UBIK_RECSENTDB;
461 if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
463 /* If we, the sync site, do not have the best db version, then
464 * go and get it from the server that does.
466 if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
467 urecovery_state |= UBIK_RECHAVEDB;
469 /* we don't have the best version; we should fetch it. */
470 ObtainWriteLock(&ubik_dbase->versionLock);
471 urecovery_AbortAll(ubik_dbase);
473 /* Rx code to do the Bulk fetch */
476 rxcall = rx_NewCall(bestServer->disk_rxcid);
478 ubik_print("Ubik: Synchronize database with server %s\n", afs_inet_ntoa(bestServer->addr[0]));
480 code = StartDISK_GetFile(rxcall, file);
482 ubik_dprint("StartDiskGetFile failed=%d\n", code);
485 nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
486 length = ntohl(length);
487 if (nbytes != sizeof(afs_int32)) {
488 ubik_dprint("Rx-read length error=%d\n", code=BULK_ERROR);
493 /* Truncate the file firest */
494 code = (*ubik_dbase->truncate)(ubik_dbase, file, 0);
496 ubik_dprint("truncate io error=%d\n", code);
500 /* give invalid label during file transit */
502 tversion.counter = 0;
503 code = (*ubik_dbase->setlabel)(ubik_dbase, file, &tversion);
505 ubik_dprint("setlabel io error=%d\n", code);
510 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
511 nbytes = rx_Read(rxcall, tbuffer, tlen);
512 if (nbytes != tlen) {
513 ubik_dprint("Rx-read bulk error=%d\n", code=BULK_ERROR);
517 nbytes = (*ubik_dbase->write)(ubik_dbase, file, tbuffer, offset, tlen);
518 if (nbytes != tlen) {
525 code = EndDISK_GetFile(rxcall, &tversion);
527 tcode = rx_EndCall(rxcall, code);
528 if (!code) code = tcode;
530 /* we got a new file, set up its header */
531 urecovery_state |= UBIK_RECHAVEDB;
532 bcopy(&tversion, &ubik_dbase->version, sizeof(struct ubik_version));
533 (*ubik_dbase->sync)(ubik_dbase, 0); /* get data out first */
534 /* after data is good, sync disk with correct label */
535 code = (*ubik_dbase->setlabel)(ubik_dbase, 0, &ubik_dbase->version);
538 ubik_dbase->version.epoch = 0;
539 ubik_dbase->version.counter = 0;
540 ubik_print("Ubik: Synchronize database failed (error = %d)\n", code);
542 ubik_print("Ubik: Synchronize database completed\n");
544 udisk_Invalidate(ubik_dbase, 0); /* data has changed */
545 LWP_NoYieldSignal(&ubik_dbase->version);
546 ReleaseWriteLock(&ubik_dbase->versionLock);
548 if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
550 /* If the database was newly initialized, then when we establish quorum, write
551 * a new label. This allows urecovery_AllBetter() to allow access for reads.
552 * Setting it to 2 also allows another site to come along with a newer
553 * database and overwrite this one.
555 if (ubik_dbase->version.epoch == 1) {
556 ObtainWriteLock(&ubik_dbase->versionLock);
557 urecovery_AbortAll(ubik_dbase);
559 ubik_dbase->version.epoch = ubik_epochTime;
560 ubik_dbase->version.counter = 1;
561 code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
562 udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
563 LWP_NoYieldSignal(&ubik_dbase->version);
564 ReleaseWriteLock(&ubik_dbase->versionLock);
567 /* Check the other sites and send the database to them if they
568 * do not have the current db.
570 if (!(urecovery_state & UBIK_RECSENTDB)) {
571 /* now propagate out new version to everyone else */
572 dbok = 1; /* start off assuming they all worked */
574 ObtainWriteLock(&ubik_dbase->versionLock);
576 * Check if a write transaction is in progress. We can't send the
577 * db when a write is in progress here because the db would be
578 * obsolete as soon as it goes there. Also, ops after the begin
579 * trans would reach the recepient and wouldn't find a transaction
580 * pending there. Frankly, I don't think it's possible to get past
581 * the write-lock above if there is a write transaction in progress,
582 * but then, it won't hurt to check, will it?
584 if (ubik_dbase->flags & DBWRITING) {
589 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
590 ReleaseWriteLock(&ubik_dbase->versionLock);
591 /* sleep for a little while */
592 IOMGR_Select(0, 0, 0, 0, &tv);
593 tv.tv_usec += 10000; safety++;
594 ObtainWriteLock(&ubik_dbase->versionLock);
598 for(ts=ubik_servers; ts; ts=ts->next) {
599 inAddr.s_addr = ts->addr[0];
601 ubik_dprint("recovery cannot send version to %s\n",
602 afs_inet_ntoa(inAddr.s_addr));
606 ubik_dprint("recovery sending version to %s\n",
607 afs_inet_ntoa(inAddr.s_addr));
608 if (vcmp(ts->version, ubik_dbase->version) != 0) {
609 ubik_dprint("recovery stating local database\n");
611 /* Rx code to do the Bulk Store */
612 code = (*ubik_dbase->stat)(ubik_dbase, 0, &ubikstat);
614 length = ubikstat.size;
616 rxcall = rx_NewCall(ts->disk_rxcid);
617 code = StartDISK_SendFile(rxcall, file, length, &ubik_dbase->version);
619 ubik_dprint("StartDiskSendFile failed=%d\n", code);
623 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
624 nbytes = (*ubik_dbase->read)(ubik_dbase, file, tbuffer, offset, tlen);
625 if (nbytes != tlen) {
626 ubik_dprint("Local disk read error=%d\n", code=UIOERROR);
629 nbytes = rx_Write(rxcall, tbuffer, tlen);
630 if (nbytes != tlen) {
631 ubik_dprint("Rx-write bulk error=%d\n", code=BULK_ERROR);
637 code = EndDISK_SendFile(rxcall);
639 code = rx_EndCall(rxcall, code);
642 /* we set a new file, process its header */
643 ts->version = ubik_dbase->version;
649 /* mark file up to date */
653 ReleaseWriteLock(&ubik_dbase->versionLock);
654 if (dbok) urecovery_state |= UBIK_RECSENTDB;
660 ** send a Probe to all the network address of this server
661 ** Return 0 if success, else return 1
664 struct ubik_server *server;
666 struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
667 struct rx_connection *connSuccess = 0;
671 extern afs_int32 ubikSecIndex;
672 extern struct rx_securityClass *ubikSecClass;
674 for (i=0; (addr=server->addr[i]) && (i<UBIK_MAX_INTERFACE_ADDR);i++)
676 conns[i] = rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
677 ubikSecClass, ubikSecIndex);
679 /* user requirement to use only the primary interface */
680 if ( ubikPrimaryAddrOnly )
686 assert(i); /* at least one interface address for this server */
691 if ( !multi_error ) /* first success */
693 addr = server->addr[multi_i]; /* successful interface addr */
695 if ( server->disk_rxcid) /* destroy existing conn */
696 rx_DestroyConnection(server->disk_rxcid);
697 if ( server->vote_rxcid)
698 rx_DestroyConnection(server->vote_rxcid);
700 /* make new connections */
701 server->disk_rxcid = conns[multi_i];
702 server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
703 VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);/* for vote reqs*/
705 connSuccess = conns[multi_i];
706 strcpy(buffer, (char*)afs_inet_ntoa(server->addr[0]));
707 ubik_print("ubik:server %s is back up: will be contacted through %s\n",
708 buffer, afs_inet_ntoa(addr));
714 /* Destroy all connections except the one on which we succeeded */
715 for ( j=0; j < i; j++)
716 if ( conns[j] != connSuccess )
717 rx_DestroyConnection(conns[j] );
720 ubik_dprint("ubik:server %s still down\n",afs_inet_ntoa(server->addr[0]));
722 if ( connSuccess ) return 0; /* success */
723 else return 1; /* failure */