ubik: locking in recovery.c
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <lock.h>
16 #include <rx/xdr.h>
17 #include <rx/rx.h>
18 #include <afs/afsutil.h>
19 #include <afs/cellconfig.h>
20
21 #define UBIK_INTERNALS
22 #include "ubik.h"
23 #include "ubik_int.h"
24
25 /*! \file
26  * This module is responsible for determining when the system has
27  * recovered to the point that it can handle new transactions.  It
28  * replays logs, polls to determine the current dbase after a crash,
29  * and distributes the new database to the others.
30  *
31  * The sync site associates a version number with each database.  It
32  * broadcasts the version associated with its current dbase in every
33  * one of its beacon messages.  When the sync site send a dbase to a
34  * server, it also sends the db's version.  A non-sync site server can
35  * tell if it has the right dbase version by simply comparing the
36  * version from the beacon message \p uvote_dbVersion with the version
37  * associated with the database \p ubik_dbase->version.  The sync site
38  * itself simply has one counter to keep track of all of this (again
39  * \p ubik_dbase->version).
40  *
41  * sync site: routine called when the sync site loses its quorum; this
42  * procedure is called "up" from the beacon package.  It resyncs the
43  * dbase and nudges the recovery daemon to try to propagate out the
44  * changes.  It also resets the recovery daemon's state, since
45  * recovery must potentially find a new dbase to propagate out.  This
46  * routine should not do anything with variables used by non-sync site
47  * servers.
48  */
49
50 /*!
51  * if this flag is set, then ubik will use only the primary address
52  * (the address specified in the CellServDB) to contact other
53  * ubik servers. Ubik recovery will not try opening connections
54  * to the alternate interface addresses.
55  */
56 int ubikPrimaryAddrOnly;
57
58 int
59 urecovery_ResetState(void)
60 {
61     urecovery_state = 0;
62 #if !defined(AFS_PTHREAD_ENV)
63     /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
64     LWP_NoYieldSignal(&urecovery_state);
65 #endif
66     return 0;
67 }
68
69 /*!
70  * \brief sync site
71  *
72  * routine called when a non-sync site server goes down; restarts recovery
73  * process to send missing server the new db when it comes back up for
74  * non-sync site servers.
75  *
76  * \note This routine should not do anything with variables used by non-sync site servers.
77  */
78 int
79 urecovery_LostServer(struct ubik_server *ts)
80 {
81     ubeacon_ReinitServer(ts);
82 #if !defined(AFS_PTHREAD_ENV)
83     /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
84     LWP_NoYieldSignal(&urecovery_state);
85 #endif
86     return 0;
87 }
88
89 /*!
90  * return true iff we have a current database (called by both sync
91  * sites and non-sync sites) How do we determine this?  If we're the
92  * sync site, we wait until recovery has finished fetching and
93  * re-labelling its dbase (it may still be trying to propagate it out
94  * to everyone else; that's THEIR problem).  If we're not the sync
95  * site, then we must have a dbase labelled with the right version,
96  * and we must have a currently-good sync site.
97  */
98 int
99 urecovery_AllBetter(struct ubik_dbase *adbase, int areadAny)
100 {
101     afs_int32 rcode;
102
103     ubik_dprint_25("allbetter checking\n");
104     rcode = 0;
105
106
107     if (areadAny) {
108         if (ubik_dbase->version.epoch > 1)
109             rcode = 1;          /* Happy with any good version of database */
110     }
111
112     /* Check if we're sync site and we've got the right data */
113     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
114         rcode = 1;
115     }
116
117     /* next, check if we're aux site, and we've ever been sent the
118      * right data (note that if a dbase update fails, we won't think
119      * that the sync site is still the sync site, 'cause it won't talk
120      * to us until a timeout period has gone by.  When we recover, we
121      * leave this clear until we get a new dbase */
122     else if ((uvote_GetSyncSite() && uvote_eq_dbVersion(ubik_dbase->version))) {        /* && order is important */
123         rcode = 1;
124     }
125
126     ubik_dprint_25("allbetter: returning %d\n", rcode);
127     return rcode;
128 }
129
130 /*!
131  * \brief abort all transactions on this database
132  */
133 int
134 urecovery_AbortAll(struct ubik_dbase *adbase)
135 {
136     struct ubik_trans *tt;
137     for (tt = adbase->activeTrans; tt; tt = tt->next) {
138         udisk_abort(tt);
139     }
140     return 0;
141 }
142
143 /*!
144  * \brief this routine aborts the current remote transaction, if any, if the tid is wrong
145  */
146 int
147 urecovery_CheckTid(struct ubik_tid *atid, int abortalways)
148 {
149     if (ubik_currentTrans) {
150         /* there is remote write trans, see if we match, see if this
151          * is a new transaction */
152         if (atid->epoch != ubik_currentTrans->tid.epoch
153             || atid->counter > ubik_currentTrans->tid.counter || abortalways) {
154             /* don't match, abort it */
155             /* If the thread is not waiting for lock - ok to end it */
156             if (ubik_currentTrans->locktype != LOCKWAIT) {
157                 udisk_end(ubik_currentTrans);
158             }
159             ubik_currentTrans = (struct ubik_trans *)0;
160         }
161     }
162     return 0;
163 }
164
165 /*!
166  * \brief replay logs
167  *
168  * log format is defined here, and implicitly in disk.c
169  *
170  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
171  * are in logged in network standard byte order, in case we want to move logs
172  * from machine-to-machine someday.
173  *
174  * Begin transaction: opcode \n
175  * Commit transaction: opcode, version (8 bytes) \n
176  * Truncate file: opcode, file number, length \n
177  * Abort transaction: opcode \n
178  * Write data: opcode, file, position, length, <length> data bytes \n
179  *
180  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
181  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
182  * the commit record, then we allow data to be written through to the dbase.  In our particular
183  * implementation, once a transaction is done, we write out the pages to the database, so that
184  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
185  * any changed data while there is an uncommitted write transaction can be zapped during an
186  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
187  * the log.
188  */
189 static int
190 ReplayLog(struct ubik_dbase *adbase)
191 {
192     afs_int32 opcode;
193     afs_int32 code, tpos;
194     int logIsGood;
195     afs_int32 len, thisSize, tfile, filePos;
196     afs_int32 buffer[4];
197     afs_int32 syncFile = -1;
198     afs_int32 data[1024];
199
200     /* read the lock twice, once to see whether we have a transaction to deal
201      * with that committed, (theoretically, we should support more than one
202      * trans in the log at once, but not yet), and once replaying the
203      * transactions.  */
204     tpos = 0;
205     logIsGood = 0;
206     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
207     while (1) {
208         code =
209             (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
210                              sizeof(afs_int32));
211         if (code != sizeof(afs_int32))
212             break;
213         opcode = ntohl(opcode);
214         if (opcode == LOGNEW) {
215             /* handle begin trans */
216             tpos += sizeof(afs_int32);
217         } else if (opcode == LOGABORT)
218             break;
219         else if (opcode == LOGEND) {
220             logIsGood = 1;
221             break;
222         } else if (opcode == LOGTRUNCATE) {
223             tpos += 4;
224             code =
225                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
226                                  2 * sizeof(afs_int32));
227             if (code != 2 * sizeof(afs_int32))
228                 break;          /* premature eof or io error */
229             tpos += 2 * sizeof(afs_int32);
230         } else if (opcode == LOGDATA) {
231             tpos += 4;
232             code =
233                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
234                                  3 * sizeof(afs_int32));
235             if (code != 3 * sizeof(afs_int32))
236                 break;
237             /* otherwise, skip over the data bytes, too */
238             tpos += ntohl(buffer[2]) + 3 * sizeof(afs_int32);
239         } else {
240             ubik_print("corrupt log opcode (%d) at position %d\n", opcode,
241                        tpos);
242             break;              /* corrupt log! */
243         }
244     }
245     if (logIsGood) {
246         /* actually do the replay; log should go all the way through the commit record, since
247          * we just read it above. */
248         tpos = 0;
249         logIsGood = 0;
250         syncFile = -1;
251         while (1) {
252             code =
253                 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
254                                  sizeof(afs_int32));
255             if (code != sizeof(afs_int32))
256                 break;
257             opcode = ntohl(opcode);
258             if (opcode == LOGNEW) {
259                 /* handle begin trans */
260                 tpos += sizeof(afs_int32);
261             } else if (opcode == LOGABORT)
262                 panic("log abort\n");
263             else if (opcode == LOGEND) {
264                 struct ubik_version version;
265                 tpos += 4;
266                 code =
267                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
268                                      2 * sizeof(afs_int32));
269                 if (code != 2 * sizeof(afs_int32))
270                     return UBADLOG;
271                 version.epoch = ntohl(buffer[0]);
272                 version.counter = ntohl(buffer[1]);
273                 code = (*adbase->setlabel) (adbase, 0, &version);
274                 if (code)
275                     return code;
276                 ubik_print("Successfully replayed log for interrupted "
277                            "transaction; db version is now %ld.%ld\n",
278                            (long) version.epoch, (long) version.counter);
279                 logIsGood = 1;
280                 break;          /* all done now */
281             } else if (opcode == LOGTRUNCATE) {
282                 tpos += 4;
283                 code =
284                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
285                                      2 * sizeof(afs_int32));
286                 if (code != 2 * sizeof(afs_int32))
287                     break;      /* premature eof or io error */
288                 tpos += 2 * sizeof(afs_int32);
289                 code =
290                     (*adbase->truncate) (adbase, ntohl(buffer[0]),
291                                          ntohl(buffer[1]));
292                 if (code)
293                     return code;
294             } else if (opcode == LOGDATA) {
295                 tpos += 4;
296                 code =
297                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
298                                      3 * sizeof(afs_int32));
299                 if (code != 3 * sizeof(afs_int32))
300                     break;
301                 tpos += 3 * sizeof(afs_int32);
302                 /* otherwise, skip over the data bytes, too */
303                 len = ntohl(buffer[2]); /* total number of bytes to copy */
304                 filePos = ntohl(buffer[1]);
305                 tfile = ntohl(buffer[0]);
306                 /* try to minimize file syncs */
307                 if (syncFile != tfile) {
308                     if (syncFile >= 0)
309                         code = (*adbase->sync) (adbase, syncFile);
310                     else
311                         code = 0;
312                     syncFile = tfile;
313                     if (code)
314                         return code;
315                 }
316                 while (len > 0) {
317                     thisSize = (len > sizeof(data) ? sizeof(data) : len);
318                     /* copy sizeof(data) buffer bytes at a time */
319                     code =
320                         (*adbase->read) (adbase, LOGFILE, (char *)data, tpos,
321                                          thisSize);
322                     if (code != thisSize)
323                         return UBADLOG;
324                     code =
325                         (*adbase->write) (adbase, tfile, (char *)data, filePos,
326                                           thisSize);
327                     if (code != thisSize)
328                         return UBADLOG;
329                     filePos += thisSize;
330                     tpos += thisSize;
331                     len -= thisSize;
332                 }
333             } else {
334                 ubik_print("corrupt log opcode (%d) at position %d\n",
335                            opcode, tpos);
336                 break;          /* corrupt log! */
337             }
338         }
339         if (logIsGood) {
340             if (syncFile >= 0)
341                 code = (*adbase->sync) (adbase, syncFile);
342             if (code)
343                 return code;
344         } else {
345             ubik_print("Log read error on pass 2\n");
346             return UBADLOG;
347         }
348     }
349
350     /* now truncate the log, we're done with it */
351     code = (*adbase->truncate) (adbase, LOGFILE, 0);
352     return code;
353 }
354
355 /*! \brief
356  * Called at initialization to figure out version of the dbase we really have.
357  *
358  * This routine is called after replaying the log; it reads the restored labels.
359  */
360 static int
361 InitializeDB(struct ubik_dbase *adbase)
362 {
363     afs_int32 code;
364
365     code = (*adbase->getlabel) (adbase, 0, &adbase->version);
366     if (code) {
367         /* try setting the label to a new value */
368         UBIK_VERSION_LOCK;
369         adbase->version.epoch = 1;      /* value for newly-initialized db */
370         adbase->version.counter = 1;
371         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
372         if (code) {
373             /* failed, try to set it back */
374             adbase->version.epoch = 0;
375             adbase->version.counter = 0;
376             (*adbase->setlabel) (adbase, 0, &adbase->version);
377         }
378 #ifdef AFS_PTHREAD_ENV
379         CV_BROADCAST(&adbase->version_cond);
380 #else
381         LWP_NoYieldSignal(&adbase->version);
382 #endif
383         UBIK_VERSION_UNLOCK;
384     }
385     return 0;
386 }
387
388 /*!
389  * \brief initialize the local ubik_dbase
390  *
391  * We replay the logs and then read the resulting file to figure out what version we've really got.
392  */
393 int
394 urecovery_Initialize(struct ubik_dbase *adbase)
395 {
396     afs_int32 code;
397
398     DBHOLD(adbase);
399     code = ReplayLog(adbase);
400     if (code)
401         goto done;
402     code = InitializeDB(adbase);
403 done:
404     DBRELE(adbase);
405     return code;
406 }
407
408 /*!
409  * \brief Main interaction loop for the recovery manager
410  *
411  * The recovery light-weight process only runs when you're the
412  * synchronization site.  It performs the following tasks, if and only
413  * if the prerequisite tasks have been performed successfully (it
414  * keeps track of which ones have been performed in its bit map,
415  * \p urecovery_state).
416  *
417  * First, it is responsible for probing that all servers are up.  This
418  * is the only operation that must be performed even if this is not
419  * yet the sync site, since otherwise this site may not notice that
420  * enough other machines are running to even elect this guy to be the
421  * sync site.
422  *
423  * After that, the recovery process does nothing until the beacon and
424  * voting modules manage to get this site elected sync site.
425  *
426  * After becoming sync site, recovery first attempts to find the best
427  * database available in the network (it must do this in order to
428  * ensure finding the latest committed data).  After finding the right
429  * database, it must fetch this dbase to the sync site.
430  *
431  * After fetching the dbase, it relabels it with a new version number,
432  * to ensure that everyone recognizes this dbase as the most recent
433  * dbase.
434  *
435  * One the dbase has been relabelled, this machine can start handling
436  * requests.  However, the recovery module still has one more task:
437  * propagating the dbase out to everyone who is up in the network.
438  */
439 void *
440 urecovery_Interact(void *dummy)
441 {
442     afs_int32 code, tcode;
443     struct ubik_server *bestServer = NULL;
444     struct ubik_server *ts;
445     int dbok, doingRPC, now;
446     afs_int32 lastProbeTime;
447     /* if we're the sync site, the best db version we've found yet */
448     static struct ubik_version bestDBVersion;
449     struct ubik_version tversion;
450     struct timeval tv;
451     int length, tlen, offset, file, nbytes;
452     struct rx_call *rxcall;
453     char tbuffer[1024];
454     struct ubik_stat ubikstat;
455     struct in_addr inAddr;
456     char hoststr[16];
457     char pbuffer[1028];
458     int fd = -1;
459     afs_int32 pass;
460
461     /* otherwise, begin interaction */
462     urecovery_state = 0;
463     lastProbeTime = 0;
464     while (1) {
465         /* Run through this loop every 4 seconds */
466         tv.tv_sec = 4;
467         tv.tv_usec = 0;
468 #ifdef AFS_PTHREAD_ENV
469         select(0, 0, 0, 0, &tv);
470 #else
471         IOMGR_Select(0, 0, 0, 0, &tv);
472 #endif
473
474         ubik_dprint("recovery running in state %x\n", urecovery_state);
475
476         /* Every 30 seconds, check all the down servers and mark them
477          * as up if they respond. When a server comes up or found to
478          * not be current, then re-find the the best database and
479          * propogate it.
480          */
481         if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
482
483             for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
484                 UBIK_BEACON_LOCK;
485                 if (!ts->up) {
486                     UBIK_BEACON_UNLOCK;
487                     doingRPC = 1;
488                     code = DoProbe(ts);
489                     if (code == 0) {
490                         UBIK_BEACON_LOCK;
491                         ts->up = 1;
492                         UBIK_BEACON_UNLOCK;
493                         DBHOLD(ubik_dbase);
494                         urecovery_state &= ~UBIK_RECFOUNDDB;
495                         DBRELE(ubik_dbase);
496                     }
497                 } else {
498                     UBIK_BEACON_UNLOCK;
499                     DBHOLD(ubik_dbase);
500                     if (!ts->currentDB)
501                         urecovery_state &= ~UBIK_RECFOUNDDB;
502                     DBRELE(ubik_dbase);
503                 }
504             }
505
506             if (doingRPC)
507                 now = FT_ApproxTime();
508             lastProbeTime = now;
509         }
510
511         /* Mark whether we are the sync site */
512         DBHOLD(ubik_dbase);
513         if (!ubeacon_AmSyncSite()) {
514             urecovery_state &= ~UBIK_RECSYNCSITE;
515             DBRELE(ubik_dbase);
516             continue;           /* nothing to do */
517         }
518         urecovery_state |= UBIK_RECSYNCSITE;
519
520         /* If a server has just come up or if we have not found the
521          * most current database, then go find the most current db.
522          */
523         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
524             DBRELE(ubik_dbase);
525             bestServer = (struct ubik_server *)0;
526             bestDBVersion.epoch = 0;
527             bestDBVersion.counter = 0;
528             for (ts = ubik_servers; ts; ts = ts->next) {
529                 UBIK_BEACON_LOCK;
530                 if (!ts->up) {
531                     UBIK_BEACON_UNLOCK;
532                     continue;   /* don't bother with these guys */
533                 }
534                 UBIK_BEACON_UNLOCK;
535                 if (ts->isClone)
536                     continue;
537                 UBIK_ADDR_LOCK;
538                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
539                 UBIK_ADDR_UNLOCK;
540                 if (code == 0) {
541                     /* perhaps this is the best version */
542                     if (vcmp(ts->version, bestDBVersion) > 0) {
543                         /* new best version */
544                         bestDBVersion = ts->version;
545                         bestServer = ts;
546                     }
547                 }
548             }
549             /* take into consideration our version. Remember if we,
550              * the sync site, have the best version. Also note that
551              * we may need to send the best version out.
552              */
553             DBHOLD(ubik_dbase);
554             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
555                 bestDBVersion = ubik_dbase->version;
556                 bestServer = (struct ubik_server *)0;
557                 urecovery_state |= UBIK_RECHAVEDB;
558             } else {
559                 /* Clear the flag only when we know we have to retrieve
560                  * the db. Because urecovery_AllBetter() looks at it.
561                  */
562                 urecovery_state &= ~UBIK_RECHAVEDB;
563             }
564             urecovery_state |= UBIK_RECFOUNDDB;
565             urecovery_state &= ~UBIK_RECSENTDB;
566         }
567         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
568             DBRELE(ubik_dbase);
569             continue;           /* not ready */
570         }
571
572         /* If we, the sync site, do not have the best db version, then
573          * go and get it from the server that does.
574          */
575         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
576             urecovery_state |= UBIK_RECHAVEDB;
577         } else {
578             /* we don't have the best version; we should fetch it. */
579             urecovery_AbortAll(ubik_dbase);
580
581             /* Rx code to do the Bulk fetch */
582             file = 0;
583             offset = 0;
584             UBIK_ADDR_LOCK;
585             rxcall = rx_NewCall(bestServer->disk_rxcid);
586
587             ubik_print("Ubik: Synchronize database with server %s\n",
588                        afs_inet_ntoa_r(bestServer->addr[0], hoststr));
589             UBIK_ADDR_UNLOCK;
590
591             code = StartDISK_GetFile(rxcall, file);
592             if (code) {
593                 ubik_dprint("StartDiskGetFile failed=%d\n", code);
594                 goto FetchEndCall;
595             }
596             nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
597             length = ntohl(length);
598             if (nbytes != sizeof(afs_int32)) {
599                 ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR);
600                 code = EIO;
601                 goto FetchEndCall;
602             }
603
604             /* give invalid label during file transit */
605             UBIK_VERSION_LOCK;
606             tversion.epoch = 0;
607             code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
608             UBIK_VERSION_UNLOCK;
609             if (code) {
610                 ubik_dprint("setlabel io error=%d\n", code);
611                 goto FetchEndCall;
612             }
613             snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
614                      ubik_dbase->pathName, (file<0)?"SYS":"",
615                      (file<0)?-file:file);
616             fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
617             if (fd < 0) {
618                 code = errno;
619                 goto FetchEndCall;
620             }
621             code = lseek(fd, HDRSIZE, 0);
622             if (code != HDRSIZE) {
623                 close(fd);
624                 goto FetchEndCall;
625             }
626
627             pass = 0;
628             while (length > 0) {
629                 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
630 #ifndef AFS_PTHREAD_ENV
631                 if (pass % 4 == 0)
632                     IOMGR_Poll();
633 #endif
634                 nbytes = rx_Read(rxcall, tbuffer, tlen);
635                 if (nbytes != tlen) {
636                     ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
637                     code = EIO;
638                     close(fd);
639                     goto FetchEndCall;
640                 }
641                 nbytes = write(fd, tbuffer, tlen);
642                 pass++;
643                 if (nbytes != tlen) {
644                     code = UIOERROR;
645                     close(fd);
646                     goto FetchEndCall;
647                 }
648                 offset += tlen;
649                 length -= tlen;
650             }
651             code = close(fd);
652             if (code)
653                 goto FetchEndCall;
654             code = EndDISK_GetFile(rxcall, &tversion);
655           FetchEndCall:
656             tcode = rx_EndCall(rxcall, code);
657             if (!code)
658                 code = tcode;
659             if (!code) {
660                 /* we got a new file, set up its header */
661                 urecovery_state |= UBIK_RECHAVEDB;
662                 UBIK_VERSION_LOCK;
663                 memcpy(&ubik_dbase->version, &tversion,
664                        sizeof(struct ubik_version));
665                 snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
666                          ubik_dbase->pathName, (file<0)?"SYS":"",
667                          (file<0)?-file:file);
668 #ifdef AFS_NT40_ENV
669                 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
670                          ubik_dbase->pathName, (file<0)?"SYS":"",
671                          (file<0)?-file:file);
672                 code = unlink(pbuffer);
673                 if (!code)
674                     code = rename(tbuffer, pbuffer);
675                 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
676                          ubik_dbase->pathName, (file<0)?"SYS":"",
677                          (file<0)?-file:file);
678 #endif
679                 if (!code)
680                     code = rename(pbuffer, tbuffer);
681                 if (!code) {
682                     (*ubik_dbase->open) (ubik_dbase, file);
683                     /* after data is good, sync disk with correct label */
684                     code =
685                         (*ubik_dbase->setlabel) (ubik_dbase, 0,
686                                                  &ubik_dbase->version);
687                 }
688                 UBIK_VERSION_UNLOCK;
689 #ifdef AFS_NT40_ENV
690                 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
691                          ubik_dbase->pathName, (file<0)?"SYS":"",
692                          (file<0)?-file:file);
693                 unlink(pbuffer);
694 #endif
695             }
696             if (code) {
697                 unlink(pbuffer);
698                 /*
699                  * We will effectively invalidate the old data forever now.
700                  * Unclear if we *should* but we do.
701                  */
702                 UBIK_VERSION_LOCK;
703                 ubik_dbase->version.epoch = 0;
704                 ubik_dbase->version.counter = 0;
705                 UBIK_VERSION_UNLOCK;
706                 ubik_print("Ubik: Synchronize database failed (error = %d)\n",
707                            code);
708             } else {
709                 ubik_print("Ubik: Synchronize database completed\n");
710                 urecovery_state |= UBIK_RECHAVEDB;
711             }
712             udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
713 #ifdef AFS_PTHREAD_ENV
714             CV_BROADCAST(&ubik_dbase->version_cond);
715 #else
716             LWP_NoYieldSignal(&ubik_dbase->version);
717 #endif
718         }
719         if (!(urecovery_state & UBIK_RECHAVEDB)) {
720             DBRELE(ubik_dbase);
721             continue;           /* not ready */
722         }
723
724         /* If the database was newly initialized, then when we establish quorum, write
725          * a new label. This allows urecovery_AllBetter() to allow access for reads.
726          * Setting it to 2 also allows another site to come along with a newer
727          * database and overwrite this one.
728          */
729         if (ubik_dbase->version.epoch == 1) {
730             urecovery_AbortAll(ubik_dbase);
731             UBIK_VERSION_LOCK;
732             version_globals.ubik_epochTime = 2;
733             ubik_dbase->version.epoch = version_globals.ubik_epochTime;
734             ubik_dbase->version.counter = 1;
735             code =
736                 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
737             UBIK_VERSION_UNLOCK;
738             udisk_Invalidate(ubik_dbase, 0);    /* data may have changed */
739 #ifdef AFS_PTHREAD_ENV
740             CV_BROADCAST(&ubik_dbase->version_cond);
741 #else
742             LWP_NoYieldSignal(&ubik_dbase->version);
743 #endif
744         }
745
746         /* Check the other sites and send the database to them if they
747          * do not have the current db.
748          */
749         if (!(urecovery_state & UBIK_RECSENTDB)) {
750             /* now propagate out new version to everyone else */
751             dbok = 1;           /* start off assuming they all worked */
752
753             /*
754              * Check if a write transaction is in progress. We can't send the
755              * db when a write is in progress here because the db would be
756              * obsolete as soon as it goes there. Also, ops after the begin
757              * trans would reach the recepient and wouldn't find a transaction
758              * pending there.  Frankly, I don't think it's possible to get past
759              * the write-lock above if there is a write transaction in progress,
760              * but then, it won't hurt to check, will it?
761              */
762             if (ubik_dbase->flags & DBWRITING) {
763                 struct timeval tv;
764                 int safety = 0;
765                 long cur_usec = 50000;
766                 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
767                     DBRELE(ubik_dbase);
768                     /* sleep for a little while */
769                     tv.tv_sec = 0;
770                     tv.tv_usec = cur_usec;
771 #ifdef AFS_PTHREAD_ENV
772                     select(0, 0, 0, 0, &tv);
773 #else
774                     IOMGR_Select(0, 0, 0, 0, &tv);
775 #endif
776                     cur_usec += 10000;
777                     safety++;
778                     DBHOLD(ubik_dbase);
779                 }
780             }
781
782             for (ts = ubik_servers; ts; ts = ts->next) {
783                 UBIK_ADDR_LOCK;
784                 inAddr.s_addr = ts->addr[0];
785                 UBIK_ADDR_UNLOCK;
786                 UBIK_BEACON_LOCK;
787                 if (!ts->up) {
788                     UBIK_BEACON_UNLOCK;
789                     ubik_dprint("recovery cannot send version to %s\n",
790                                 afs_inet_ntoa_r(inAddr.s_addr, hoststr));
791                     dbok = 0;
792                     continue;
793                 }
794                 UBIK_BEACON_UNLOCK;
795                 ubik_dprint("recovery sending version to %s\n",
796                             afs_inet_ntoa_r(inAddr.s_addr, hoststr));
797                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
798                     ubik_dprint("recovery stating local database\n");
799
800                     /* Rx code to do the Bulk Store */
801                     code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
802                     if (!code) {
803                         length = ubikstat.size;
804                         file = offset = 0;
805                         UBIK_ADDR_LOCK;
806                         rxcall = rx_NewCall(ts->disk_rxcid);
807                         UBIK_ADDR_UNLOCK;
808                         code =
809                             StartDISK_SendFile(rxcall, file, length,
810                                                &ubik_dbase->version);
811                         if (code) {
812                             ubik_dprint("StartDiskSendFile failed=%d\n",
813                                         code);
814                             goto StoreEndCall;
815                         }
816                         while (length > 0) {
817                             tlen =
818                                 (length >
819                                  sizeof(tbuffer) ? sizeof(tbuffer) : length);
820                             nbytes =
821                                 (*ubik_dbase->read) (ubik_dbase, file,
822                                                      tbuffer, offset, tlen);
823                             if (nbytes != tlen) {
824                                 ubik_dprint("Local disk read error=%d\n",
825                                             code = UIOERROR);
826                                 goto StoreEndCall;
827                             }
828                             nbytes = rx_Write(rxcall, tbuffer, tlen);
829                             if (nbytes != tlen) {
830                                 ubik_dprint("Rx-write bulk error=%d\n", code =
831                                             BULK_ERROR);
832                                 goto StoreEndCall;
833                             }
834                             offset += tlen;
835                             length -= tlen;
836                         }
837                         code = EndDISK_SendFile(rxcall);
838                       StoreEndCall:
839                         code = rx_EndCall(rxcall, code);
840                     }
841                     if (code == 0) {
842                         /* we set a new file, process its header */
843                         ts->version = ubik_dbase->version;
844                         ts->currentDB = 1;
845                     } else
846                         dbok = 0;
847                 } else {
848                     /* mark file up to date */
849                     ts->currentDB = 1;
850                 }
851             }
852             if (dbok)
853                 urecovery_state |= UBIK_RECSENTDB;
854         }
855         DBRELE(ubik_dbase);
856     }
857     return NULL;
858 }
859
860 /*!
861  * \brief send a Probe to all the network address of this server
862  *
863  * \return 0 if success, else return 1
864  */
865 int
866 DoProbe(struct ubik_server *server)
867 {
868     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
869     struct rx_connection *connSuccess = 0;
870     int i, j, success_i = -1;
871     afs_uint32 addr;
872     char buffer[32];
873     char hoststr[16];
874
875     UBIK_ADDR_LOCK;
876     for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR);
877          i++) {
878         conns[i] =
879             rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
880                              addr_globals.ubikSecClass, addr_globals.ubikSecIndex);
881
882         /* user requirement to use only the primary interface */
883         if (ubikPrimaryAddrOnly) {
884             i = 1;
885             break;
886         }
887     }
888     UBIK_ADDR_UNLOCK;
889     osi_Assert(i);                      /* at least one interface address for this server */
890
891     multi_Rx(conns, i) {
892         multi_DISK_Probe();
893         if (!multi_error) {     /* first success */
894             success_i = multi_i;
895
896             multi_Abort;
897         }
898     } multi_End_Ignore;
899
900     if (success_i >= 0) {
901         UBIK_ADDR_LOCK;
902         addr = server->addr[success_i]; /* successful interface addr */
903
904         if (server->disk_rxcid) /* destroy existing conn */
905             rx_DestroyConnection(server->disk_rxcid);
906         if (server->vote_rxcid)
907             rx_DestroyConnection(server->vote_rxcid);
908
909         /* make new connections */
910         server->disk_rxcid = conns[success_i];
911         server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
912                                               VOTE_SERVICE_ID, addr_globals.ubikSecClass,
913                                               addr_globals.ubikSecIndex);
914
915         connSuccess = conns[success_i];
916         strcpy(buffer, afs_inet_ntoa_r(server->addr[0], hoststr));
917
918         ubik_print("ubik:server %s is back up: will be contacted through %s\n",
919              buffer, afs_inet_ntoa_r(addr, hoststr));
920         UBIK_ADDR_UNLOCK;
921     }
922
923     /* Destroy all connections except the one on which we succeeded */
924     for (j = 0; j < i; j++)
925         if (conns[j] != connSuccess)
926             rx_DestroyConnection(conns[j]);
927
928     if (!connSuccess)
929         ubik_dprint("ubik:server %s still down\n",
930                     afs_inet_ntoa_r(server->addr[0], hoststr));
931
932     if (connSuccess)
933         return 0;               /* success */
934     else
935         return 1;               /* failure */
936 }