5e42b54055811e6249346c94e8b8e8c654febeca
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <afs/opr.h>
16
17 #ifdef AFS_PTHREAD_ENV
18 # include <opr/lock.h>
19 #else
20 # include <opr/lockstub.h>
21 #endif
22
23 #include <rx/rx.h>
24 #include <afs/afsutil.h>
25 #include <afs/cellconfig.h>
26
27
28 #define UBIK_INTERNALS
29 #include "ubik.h"
30 #include "ubik_int.h"
31
32 /*! \file
33  * This module is responsible for determining when the system has
34  * recovered to the point that it can handle new transactions.  It
35  * replays logs, polls to determine the current dbase after a crash,
36  * and distributes the new database to the others.
37  *
38  * The sync site associates a version number with each database.  It
39  * broadcasts the version associated with its current dbase in every
40  * one of its beacon messages.  When the sync site send a dbase to a
41  * server, it also sends the db's version.  A non-sync site server can
42  * tell if it has the right dbase version by simply comparing the
43  * version from the beacon message \p uvote_dbVersion with the version
44  * associated with the database \p ubik_dbase->version.  The sync site
45  * itself simply has one counter to keep track of all of this (again
46  * \p ubik_dbase->version).
47  *
48  * sync site: routine called when the sync site loses its quorum; this
49  * procedure is called "up" from the beacon package.  It resyncs the
50  * dbase and nudges the recovery daemon to try to propagate out the
51  * changes.  It also resets the recovery daemon's state, since
52  * recovery must potentially find a new dbase to propagate out.  This
53  * routine should not do anything with variables used by non-sync site
54  * servers.
55  */
56
57 /*!
58  * if this flag is set, then ubik will use only the primary address
59  * (the address specified in the CellServDB) to contact other
60  * ubik servers. Ubik recovery will not try opening connections
61  * to the alternate interface addresses.
62  */
63 int ubikPrimaryAddrOnly;
64
65 int
66 urecovery_ResetState(void)
67 {
68     urecovery_state = 0;
69     return 0;
70 }
71
72 /*!
73  * \brief sync site
74  *
75  * routine called when a non-sync site server goes down; restarts recovery
76  * process to send missing server the new db when it comes back up for
77  * non-sync site servers.
78  *
79  * \note This routine should not do anything with variables used by non-sync site servers.
80  */
81 int
82 urecovery_LostServer(struct ubik_server *ts)
83 {
84     ubeacon_ReinitServer(ts);
85     return 0;
86 }
87
88 /*!
89  * return true iff we have a current database (called by both sync
90  * sites and non-sync sites) How do we determine this?  If we're the
91  * sync site, we wait until recovery has finished fetching and
92  * re-labelling its dbase (it may still be trying to propagate it out
93  * to everyone else; that's THEIR problem).  If we're not the sync
94  * site, then we must have a dbase labelled with the right version,
95  * and we must have a currently-good sync site.
96  */
97 int
98 urecovery_AllBetter(struct ubik_dbase *adbase, int areadAny)
99 {
100     afs_int32 rcode;
101
102     ViceLog(25, ("allbetter checking\n"));
103     rcode = 0;
104
105
106     if (areadAny) {
107         if (ubik_dbase->version.epoch > 1)
108             rcode = 1;          /* Happy with any good version of database */
109     }
110
111     /* Check if we're sync site and we've got the right data */
112     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
113         rcode = 1;
114     }
115
116     /* next, check if we're aux site, and we've ever been sent the
117      * right data (note that if a dbase update fails, we won't think
118      * that the sync site is still the sync site, 'cause it won't talk
119      * to us until a timeout period has gone by.  When we recover, we
120      * leave this clear until we get a new dbase */
121     else if (uvote_HaveSyncAndVersion(ubik_dbase->version)) {
122         rcode = 1;
123     }
124
125     ViceLog(25, ("allbetter: returning %d\n", rcode));
126     return rcode;
127 }
128
129 /*!
130  * \brief abort all transactions on this database
131  */
132 int
133 urecovery_AbortAll(struct ubik_dbase *adbase)
134 {
135     struct ubik_trans *tt;
136     int reads = 0, writes = 0;
137
138     for (tt = adbase->activeTrans; tt; tt = tt->next) {
139         if (tt->type == UBIK_WRITETRANS)
140             writes++;
141         else
142             reads++;
143         udisk_abort(tt);
144     }
145     ViceLog(0, ("urecovery_AbortAll: just aborted %d read and %d write transactions\n",
146                     reads, writes));
147     return 0;
148 }
149
150 /*!
151  * \brief this routine aborts the current remote transaction, if any, if the tid is wrong
152  */
153 int
154 urecovery_CheckTid(struct ubik_tid *atid, int abortalways)
155 {
156     if (ubik_currentTrans) {
157         /* there is remote write trans, see if we match, see if this
158          * is a new transaction */
159         if (atid->epoch != ubik_currentTrans->tid.epoch
160             || atid->counter > ubik_currentTrans->tid.counter || abortalways) {
161             /* don't match, abort it */
162             /* If the thread is not waiting for lock - ok to end it */
163             if (ubik_currentTrans->locktype != LOCKWAIT) {
164                 udisk_end(ubik_currentTrans);
165             }
166             ubik_currentTrans = (struct ubik_trans *)0;
167         }
168     }
169     return 0;
170 }
171
172 /*!
173  * \brief replay logs
174  *
175  * log format is defined here, and implicitly in disk.c
176  *
177  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
178  * are in logged in network standard byte order, in case we want to move logs
179  * from machine-to-machine someday.
180  *
181  * Begin transaction: opcode \n
182  * Commit transaction: opcode, version (8 bytes) \n
183  * Truncate file: opcode, file number, length \n
184  * Abort transaction: opcode \n
185  * Write data: opcode, file, position, length, <length> data bytes \n
186  *
187  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
188  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
189  * the commit record, then we allow data to be written through to the dbase.  In our particular
190  * implementation, once a transaction is done, we write out the pages to the database, so that
191  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
192  * any changed data while there is an uncommitted write transaction can be zapped during an
193  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
194  * the log.
195  */
196 static int
197 ReplayLog(struct ubik_dbase *adbase)
198 {
199     afs_int32 opcode;
200     afs_int32 code, tpos;
201     int logIsGood;
202     afs_int32 len, thisSize, tfile, filePos;
203     afs_int32 buffer[4];
204     afs_int32 syncFile = -1;
205     afs_int32 data[1024];
206
207     /* read the lock twice, once to see whether we have a transaction to deal
208      * with that committed, (theoretically, we should support more than one
209      * trans in the log at once, but not yet), and once replaying the
210      * transactions.  */
211     tpos = 0;
212     logIsGood = 0;
213     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
214     while (1) {
215         code =
216             (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
217                              sizeof(afs_int32));
218         if (code != sizeof(afs_int32))
219             break;
220         opcode = ntohl(opcode);
221         if (opcode == LOGNEW) {
222             /* handle begin trans */
223             tpos += sizeof(afs_int32);
224         } else if (opcode == LOGABORT)
225             break;
226         else if (opcode == LOGEND) {
227             logIsGood = 1;
228             break;
229         } else if (opcode == LOGTRUNCATE) {
230             tpos += 4;
231             code =
232                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
233                                  2 * sizeof(afs_int32));
234             if (code != 2 * sizeof(afs_int32))
235                 break;          /* premature eof or io error */
236             tpos += 2 * sizeof(afs_int32);
237         } else if (opcode == LOGDATA) {
238             tpos += 4;
239             code =
240                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
241                                  3 * sizeof(afs_int32));
242             if (code != 3 * sizeof(afs_int32))
243                 break;
244             /* otherwise, skip over the data bytes, too */
245             tpos += ntohl(buffer[2]) + 3 * sizeof(afs_int32);
246         } else {
247             ViceLog(0, ("corrupt log opcode (%d) at position %d\n", opcode,
248                        tpos));
249             break;              /* corrupt log! */
250         }
251     }
252     if (logIsGood) {
253         /* actually do the replay; log should go all the way through the commit record, since
254          * we just read it above. */
255         tpos = 0;
256         logIsGood = 0;
257         syncFile = -1;
258         while (1) {
259             code =
260                 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
261                                  sizeof(afs_int32));
262             if (code != sizeof(afs_int32))
263                 break;
264             opcode = ntohl(opcode);
265             if (opcode == LOGNEW) {
266                 /* handle begin trans */
267                 tpos += sizeof(afs_int32);
268             } else if (opcode == LOGABORT)
269                 panic("log abort\n");
270             else if (opcode == LOGEND) {
271                 struct ubik_version version;
272                 tpos += 4;
273                 code =
274                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
275                                      2 * sizeof(afs_int32));
276                 if (code != 2 * sizeof(afs_int32))
277                     return UBADLOG;
278                 version.epoch = ntohl(buffer[0]);
279                 version.counter = ntohl(buffer[1]);
280                 code = (*adbase->setlabel) (adbase, 0, &version);
281                 if (code)
282                     return code;
283                 ViceLog(0, ("Successfully replayed log for interrupted "
284                            "transaction; db version is now %ld.%ld\n",
285                            (long) version.epoch, (long) version.counter));
286                 logIsGood = 1;
287                 break;          /* all done now */
288             } else if (opcode == LOGTRUNCATE) {
289                 tpos += 4;
290                 code =
291                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
292                                      2 * sizeof(afs_int32));
293                 if (code != 2 * sizeof(afs_int32))
294                     break;      /* premature eof or io error */
295                 tpos += 2 * sizeof(afs_int32);
296                 code =
297                     (*adbase->truncate) (adbase, ntohl(buffer[0]),
298                                          ntohl(buffer[1]));
299                 if (code)
300                     return code;
301             } else if (opcode == LOGDATA) {
302                 tpos += 4;
303                 code =
304                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
305                                      3 * sizeof(afs_int32));
306                 if (code != 3 * sizeof(afs_int32))
307                     break;
308                 tpos += 3 * sizeof(afs_int32);
309                 /* otherwise, skip over the data bytes, too */
310                 len = ntohl(buffer[2]); /* total number of bytes to copy */
311                 filePos = ntohl(buffer[1]);
312                 tfile = ntohl(buffer[0]);
313                 /* try to minimize file syncs */
314                 if (syncFile != tfile) {
315                     if (syncFile >= 0)
316                         code = (*adbase->sync) (adbase, syncFile);
317                     else
318                         code = 0;
319                     syncFile = tfile;
320                     if (code)
321                         return code;
322                 }
323                 while (len > 0) {
324                     thisSize = (len > sizeof(data) ? sizeof(data) : len);
325                     /* copy sizeof(data) buffer bytes at a time */
326                     code =
327                         (*adbase->read) (adbase, LOGFILE, (char *)data, tpos,
328                                          thisSize);
329                     if (code != thisSize)
330                         return UBADLOG;
331                     code =
332                         (*adbase->write) (adbase, tfile, (char *)data, filePos,
333                                           thisSize);
334                     if (code != thisSize)
335                         return UBADLOG;
336                     filePos += thisSize;
337                     tpos += thisSize;
338                     len -= thisSize;
339                 }
340             } else {
341                 ViceLog(0, ("corrupt log opcode (%d) at position %d\n",
342                            opcode, tpos));
343                 break;          /* corrupt log! */
344             }
345         }
346         if (logIsGood) {
347             if (syncFile >= 0)
348                 code = (*adbase->sync) (adbase, syncFile);
349             if (code)
350                 return code;
351         } else {
352             ViceLog(0, ("Log read error on pass 2\n"));
353             return UBADLOG;
354         }
355     }
356
357     /* now truncate the log, we're done with it */
358     code = (*adbase->truncate) (adbase, LOGFILE, 0);
359     return code;
360 }
361
362 /*! \brief
363  * Called at initialization to figure out version of the dbase we really have.
364  *
365  * This routine is called after replaying the log; it reads the restored labels.
366  */
367 static int
368 InitializeDB(struct ubik_dbase *adbase)
369 {
370     afs_int32 code;
371
372     code = (*adbase->getlabel) (adbase, 0, &adbase->version);
373     if (code) {
374         /* try setting the label to a new value */
375         UBIK_VERSION_LOCK;
376         adbase->version.epoch = 1;      /* value for newly-initialized db */
377         adbase->version.counter = 1;
378         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
379         if (code) {
380             /* failed, try to set it back */
381             adbase->version.epoch = 0;
382             adbase->version.counter = 0;
383             (*adbase->setlabel) (adbase, 0, &adbase->version);
384         }
385         UBIK_VERSION_UNLOCK;
386     }
387     return 0;
388 }
389
390 /*!
391  * \brief initialize the local ubik_dbase
392  *
393  * We replay the logs and then read the resulting file to figure out what version we've really got.
394  */
395 int
396 urecovery_Initialize(struct ubik_dbase *adbase)
397 {
398     afs_int32 code;
399
400     DBHOLD(adbase);
401     code = ReplayLog(adbase);
402     if (code)
403         goto done;
404     code = InitializeDB(adbase);
405 done:
406     DBRELE(adbase);
407     return code;
408 }
409
410 /*!
411  * \brief Main interaction loop for the recovery manager
412  *
413  * The recovery light-weight process only runs when you're the
414  * synchronization site.  It performs the following tasks, if and only
415  * if the prerequisite tasks have been performed successfully (it
416  * keeps track of which ones have been performed in its bit map,
417  * \p urecovery_state).
418  *
419  * First, it is responsible for probing that all servers are up.  This
420  * is the only operation that must be performed even if this is not
421  * yet the sync site, since otherwise this site may not notice that
422  * enough other machines are running to even elect this guy to be the
423  * sync site.
424  *
425  * After that, the recovery process does nothing until the beacon and
426  * voting modules manage to get this site elected sync site.
427  *
428  * After becoming sync site, recovery first attempts to find the best
429  * database available in the network (it must do this in order to
430  * ensure finding the latest committed data).  After finding the right
431  * database, it must fetch this dbase to the sync site.
432  *
433  * After fetching the dbase, it relabels it with a new version number,
434  * to ensure that everyone recognizes this dbase as the most recent
435  * dbase.
436  *
437  * One the dbase has been relabelled, this machine can start handling
438  * requests.  However, the recovery module still has one more task:
439  * propagating the dbase out to everyone who is up in the network.
440  */
441 void *
442 urecovery_Interact(void *dummy)
443 {
444     afs_int32 code;
445     struct ubik_server *bestServer = NULL;
446     struct ubik_server *ts;
447     int dbok, doingRPC, now;
448     afs_int32 lastProbeTime;
449     /* if we're the sync site, the best db version we've found yet */
450     static struct ubik_version bestDBVersion;
451     struct ubik_version tversion;
452     struct timeval tv;
453     int length, tlen, offset, file, nbytes;
454     struct rx_call *rxcall;
455     char tbuffer[1024];
456     struct ubik_stat ubikstat;
457     struct in_addr inAddr;
458     char hoststr[16];
459     char pbuffer[1028];
460     int fd = -1;
461     afs_int32 pass;
462
463     opr_threadname_set("recovery");
464
465     /* otherwise, begin interaction */
466     urecovery_state = 0;
467     lastProbeTime = 0;
468     while (1) {
469         /* Run through this loop every 4 seconds */
470         tv.tv_sec = 4;
471         tv.tv_usec = 0;
472 #ifdef AFS_PTHREAD_ENV
473         select(0, 0, 0, 0, &tv);
474 #else
475         IOMGR_Select(0, 0, 0, 0, &tv);
476 #endif
477
478         ViceLog(5, ("recovery running in state %x\n", urecovery_state));
479
480         /* Every 30 seconds, check all the down servers and mark them
481          * as up if they respond. When a server comes up or found to
482          * not be current, then re-find the the best database and
483          * propogate it.
484          */
485         if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
486
487             for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
488                 UBIK_BEACON_LOCK;
489                 if (!ts->up) {
490                     UBIK_BEACON_UNLOCK;
491                     doingRPC = 1;
492                     code = DoProbe(ts);
493                     if (code == 0) {
494                         UBIK_BEACON_LOCK;
495                         ts->up = 1;
496                         UBIK_BEACON_UNLOCK;
497                         DBHOLD(ubik_dbase);
498                         urecovery_state &= ~UBIK_RECFOUNDDB;
499                         DBRELE(ubik_dbase);
500                     }
501                 } else {
502                     UBIK_BEACON_UNLOCK;
503                     DBHOLD(ubik_dbase);
504                     if (!ts->currentDB)
505                         urecovery_state &= ~UBIK_RECFOUNDDB;
506                     DBRELE(ubik_dbase);
507                 }
508             }
509
510             if (doingRPC)
511                 now = FT_ApproxTime();
512             lastProbeTime = now;
513         }
514
515         /* Mark whether we are the sync site */
516         DBHOLD(ubik_dbase);
517         if (!ubeacon_AmSyncSite()) {
518             urecovery_state &= ~UBIK_RECSYNCSITE;
519             DBRELE(ubik_dbase);
520             continue;           /* nothing to do */
521         }
522         urecovery_state |= UBIK_RECSYNCSITE;
523
524         /* If a server has just come up or if we have not found the
525          * most current database, then go find the most current db.
526          */
527         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
528             int okcalls = 0;
529             DBRELE(ubik_dbase);
530             bestServer = (struct ubik_server *)0;
531             bestDBVersion.epoch = 0;
532             bestDBVersion.counter = 0;
533             for (ts = ubik_servers; ts; ts = ts->next) {
534                 UBIK_BEACON_LOCK;
535                 if (!ts->up) {
536                     UBIK_BEACON_UNLOCK;
537                     continue;   /* don't bother with these guys */
538                 }
539                 UBIK_BEACON_UNLOCK;
540                 if (ts->isClone)
541                     continue;
542                 UBIK_ADDR_LOCK;
543                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
544                 UBIK_ADDR_UNLOCK;
545                 if (code == 0) {
546                     okcalls++;
547                     /* perhaps this is the best version */
548                     if (vcmp(ts->version, bestDBVersion) > 0) {
549                         /* new best version */
550                         bestDBVersion = ts->version;
551                         bestServer = ts;
552                     }
553                 }
554             }
555
556             DBHOLD(ubik_dbase);
557
558             if (okcalls + 1 >= ubik_quorum) {
559                 /* If we've asked a majority of sites about their db version,
560                  * then we can say with confidence that we've found the best db
561                  * version. If we haven't contacted most sites (because
562                  * GetVersion failed or because we already know the server is
563                  * down), then we don't really know if we know about the best
564                  * db version. So we can only proceed in here if 'okcalls'
565                  * indicates we managed to contact a majority of sites. */
566
567                 /* take into consideration our version. Remember if we,
568                  * the sync site, have the best version. Also note that
569                  * we may need to send the best version out.
570                  */
571                 if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
572                     bestDBVersion = ubik_dbase->version;
573                     bestServer = (struct ubik_server *)0;
574                     urecovery_state |= UBIK_RECHAVEDB;
575                 } else {
576                     /* Clear the flag only when we know we have to retrieve
577                      * the db. Because urecovery_AllBetter() looks at it.
578                      */
579                     urecovery_state &= ~UBIK_RECHAVEDB;
580                 }
581                 urecovery_state |= UBIK_RECFOUNDDB;
582                 urecovery_state &= ~UBIK_RECSENTDB;
583             }
584         }
585         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
586             DBRELE(ubik_dbase);
587             continue;           /* not ready */
588         }
589
590         /* If we, the sync site, do not have the best db version, then
591          * go and get it from the server that does.
592          */
593         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
594             urecovery_state |= UBIK_RECHAVEDB;
595         } else {
596             /* we don't have the best version; we should fetch it. */
597             urecovery_AbortAll(ubik_dbase);
598
599             /* Rx code to do the Bulk fetch */
600             file = 0;
601             offset = 0;
602             UBIK_ADDR_LOCK;
603             rxcall = rx_NewCall(bestServer->disk_rxcid);
604
605             ViceLog(0, ("Ubik: Synchronize database: receive (via GetFile) "
606                         "from server %s begin\n",
607                        afs_inet_ntoa_r(bestServer->addr[0], hoststr)));
608             UBIK_ADDR_UNLOCK;
609
610             code = StartDISK_GetFile(rxcall, file);
611             if (code) {
612                 ViceLog(0, ("StartDiskGetFile failed=%d\n", code));
613                 goto FetchEndCall;
614             }
615             nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
616             length = ntohl(length);
617             if (nbytes != sizeof(afs_int32)) {
618                 ViceLog(0, ("Rx-read length error=%d\n", BULK_ERROR));
619                 code = EIO;
620                 goto FetchEndCall;
621             }
622
623             /* give invalid label during file transit */
624             UBIK_VERSION_LOCK;
625             tversion.epoch = 0;
626             code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
627             UBIK_VERSION_UNLOCK;
628             if (code) {
629                 ViceLog(0, ("setlabel io error=%d\n", code));
630                 goto FetchEndCall;
631             }
632             snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
633                      ubik_dbase->pathName, (file<0)?"SYS":"",
634                      (file<0)?-file:file);
635             fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
636             if (fd < 0) {
637                 code = errno;
638                 goto FetchEndCall;
639             }
640             code = lseek(fd, HDRSIZE, 0);
641             if (code != HDRSIZE) {
642                 close(fd);
643                 goto FetchEndCall;
644             }
645
646             pass = 0;
647             while (length > 0) {
648                 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
649 #ifndef AFS_PTHREAD_ENV
650                 if (pass % 4 == 0)
651                     IOMGR_Poll();
652 #endif
653                 nbytes = rx_Read(rxcall, tbuffer, tlen);
654                 if (nbytes != tlen) {
655                     ViceLog(0, ("Rx-read bulk error=%d\n", BULK_ERROR));
656                     code = EIO;
657                     close(fd);
658                     goto FetchEndCall;
659                 }
660                 nbytes = write(fd, tbuffer, tlen);
661                 pass++;
662                 if (nbytes != tlen) {
663                     code = UIOERROR;
664                     close(fd);
665                     goto FetchEndCall;
666                 }
667                 offset += tlen;
668                 length -= tlen;
669             }
670             code = close(fd);
671             if (code)
672                 goto FetchEndCall;
673             code = EndDISK_GetFile(rxcall, &tversion);
674           FetchEndCall:
675             code = rx_EndCall(rxcall, code);
676             if (!code) {
677                 /* we got a new file, set up its header */
678                 urecovery_state |= UBIK_RECHAVEDB;
679                 UBIK_VERSION_LOCK;
680                 memcpy(&ubik_dbase->version, &tversion,
681                        sizeof(struct ubik_version));
682                 snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
683                          ubik_dbase->pathName, (file<0)?"SYS":"",
684                          (file<0)?-file:file);
685 #ifdef AFS_NT40_ENV
686                 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
687                          ubik_dbase->pathName, (file<0)?"SYS":"",
688                          (file<0)?-file:file);
689                 code = unlink(pbuffer);
690                 if (!code)
691                     code = rename(tbuffer, pbuffer);
692                 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
693                          ubik_dbase->pathName, (file<0)?"SYS":"",
694                          (file<0)?-file:file);
695 #endif
696                 if (!code)
697                     code = rename(pbuffer, tbuffer);
698                 if (!code) {
699                     (*ubik_dbase->open) (ubik_dbase, file);
700                     /* after data is good, sync disk with correct label */
701                     code =
702                         (*ubik_dbase->setlabel) (ubik_dbase, 0,
703                                                  &ubik_dbase->version);
704                 }
705                 UBIK_VERSION_UNLOCK;
706 #ifdef AFS_NT40_ENV
707                 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
708                          ubik_dbase->pathName, (file<0)?"SYS":"",
709                          (file<0)?-file:file);
710                 unlink(pbuffer);
711 #endif
712             }
713             if (code) {
714                 unlink(pbuffer);
715                 /*
716                  * We will effectively invalidate the old data forever now.
717                  * Unclear if we *should* but we do.
718                  */
719                 UBIK_VERSION_LOCK;
720                 ubik_dbase->version.epoch = 0;
721                 ubik_dbase->version.counter = 0;
722                 UBIK_VERSION_UNLOCK;
723                 ViceLog(0,
724                     ("Ubik: Synchronize database: receive (via GetFile) "
725                     "from server %s failed (error = %d)\n",
726                     afs_inet_ntoa_r(bestServer->addr[0], hoststr), code));
727             } else {
728                 ViceLog(0,
729                     ("Ubik: Synchronize database: receive (via GetFile) "
730                     "from server %s complete, version: %d.%d\n",
731                     afs_inet_ntoa_r(bestServer->addr[0], hoststr),
732                     ubik_dbase->version.epoch, ubik_dbase->version.counter));
733
734                 urecovery_state |= UBIK_RECHAVEDB;
735             }
736             udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
737         }
738         if (!(urecovery_state & UBIK_RECHAVEDB)) {
739             DBRELE(ubik_dbase);
740             continue;           /* not ready */
741         }
742
743         /* If the database was newly initialized, then when we establish quorum, write
744          * a new label. This allows urecovery_AllBetter() to allow access for reads.
745          * Setting it to 2 also allows another site to come along with a newer
746          * database and overwrite this one.
747          */
748         if (ubik_dbase->version.epoch == 1) {
749             urecovery_AbortAll(ubik_dbase);
750             UBIK_VERSION_LOCK;
751             ubik_dbase->version.epoch = 2;
752             ubik_dbase->version.counter = 1;
753             code =
754                 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
755             UBIK_VERSION_UNLOCK;
756             udisk_Invalidate(ubik_dbase, 0);    /* data may have changed */
757         }
758
759         /* Check the other sites and send the database to them if they
760          * do not have the current db.
761          */
762         if (!(urecovery_state & UBIK_RECSENTDB)) {
763             /* now propagate out new version to everyone else */
764             dbok = 1;           /* start off assuming they all worked */
765
766             /*
767              * Check if a write transaction is in progress. We can't send the
768              * db when a write is in progress here because the db would be
769              * obsolete as soon as it goes there. Also, ops after the begin
770              * trans would reach the recepient and wouldn't find a transaction
771              * pending there.  Frankly, I don't think it's possible to get past
772              * the write-lock above if there is a write transaction in progress,
773              * but then, it won't hurt to check, will it?
774              */
775             if (ubik_dbase->flags & DBWRITING) {
776                 struct timeval tv;
777                 int safety = 0;
778                 long cur_usec = 50000;
779                 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
780                     DBRELE(ubik_dbase);
781                     /* sleep for a little while */
782                     tv.tv_sec = 0;
783                     tv.tv_usec = cur_usec;
784 #ifdef AFS_PTHREAD_ENV
785                     select(0, 0, 0, 0, &tv);
786 #else
787                     IOMGR_Select(0, 0, 0, 0, &tv);
788 #endif
789                     cur_usec += 10000;
790                     safety++;
791                     DBHOLD(ubik_dbase);
792                 }
793             }
794
795             for (ts = ubik_servers; ts; ts = ts->next) {
796                 UBIK_ADDR_LOCK;
797                 inAddr.s_addr = ts->addr[0];
798                 UBIK_ADDR_UNLOCK;
799                 UBIK_BEACON_LOCK;
800                 if (!ts->up) {
801                     UBIK_BEACON_UNLOCK;
802                     /* It would be nice to have this message at loglevel
803                      * 0 as well, but it will log once every 4s for each
804                      * down server while in this recovery state.  This
805                      * should only be changed to loglevel 0 if it is
806                      * also rate-limited.
807                      */
808                     ViceLog(5, ("recovery cannot send version to %s\n",
809                                 afs_inet_ntoa_r(inAddr.s_addr, hoststr)));
810                     dbok = 0;
811                     continue;
812                 }
813                 UBIK_BEACON_UNLOCK;
814
815                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
816                     ViceLog(0, ("Synchronize database: send (via SendFile) "
817                                 "to server %s begin\n",
818                             afs_inet_ntoa_r(inAddr.s_addr, hoststr)));
819
820                     /* Rx code to do the Bulk Store */
821                     code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
822                     if (!code) {
823                         length = ubikstat.size;
824                         file = offset = 0;
825                         UBIK_ADDR_LOCK;
826                         rxcall = rx_NewCall(ts->disk_rxcid);
827                         UBIK_ADDR_UNLOCK;
828                         code =
829                             StartDISK_SendFile(rxcall, file, length,
830                                                &ubik_dbase->version);
831                         if (code) {
832                             ViceLog(0, ("StartDiskSendFile failed=%d\n",
833                                         code));
834                             goto StoreEndCall;
835                         }
836                         while (length > 0) {
837                             tlen =
838                                 (length >
839                                  sizeof(tbuffer) ? sizeof(tbuffer) : length);
840                             nbytes =
841                                 (*ubik_dbase->read) (ubik_dbase, file,
842                                                      tbuffer, offset, tlen);
843                             if (nbytes != tlen) {
844                                 code = UIOERROR;
845                                 ViceLog(0, ("Local disk read error=%d\n", code));
846                                 goto StoreEndCall;
847                             }
848                             nbytes = rx_Write(rxcall, tbuffer, tlen);
849                             if (nbytes != tlen) {
850                                 code = BULK_ERROR;
851                                 ViceLog(0, ("Rx-write bulk error=%d\n", code));
852                                 goto StoreEndCall;
853                             }
854                             offset += tlen;
855                             length -= tlen;
856                         }
857                         code = EndDISK_SendFile(rxcall);
858                       StoreEndCall:
859                         code = rx_EndCall(rxcall, code);
860                     }
861
862                     if (code == 0) {
863                         /* we set a new file, process its header */
864                         ts->version = ubik_dbase->version;
865                         ts->currentDB = 1;
866                         ViceLog(0,
867                             ("Ubik: Synchronize database: send (via SendFile) "
868                             "to server %s complete, version: %d.%d\n",
869                             afs_inet_ntoa_r(inAddr.s_addr, hoststr),
870                             ts->version.epoch, ts->version.counter));
871
872                     } else {
873                         dbok = 0;
874                         ViceLog(0,
875                             ("Ubik: Synchronize database: send (via SendFile) "
876                              "to server %s failed (error = %d)\n",
877                             afs_inet_ntoa_r(inAddr.s_addr, hoststr), code));
878                     }
879                 } else {
880                     /* mark file up to date */
881                     ts->currentDB = 1;
882                 }
883             }
884             if (dbok)
885                 urecovery_state |= UBIK_RECSENTDB;
886         }
887         DBRELE(ubik_dbase);
888     }
889     AFS_UNREACHED(return(NULL));
890 }
891
892 /*!
893  * \brief send a Probe to all the network address of this server
894  *
895  * \return 0 if success, else return 1
896  */
897 int
898 DoProbe(struct ubik_server *server)
899 {
900     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
901     struct rx_connection *connSuccess = 0;
902     int i, nconns, success_i = -1;
903     afs_uint32 addr;
904     char buffer[32];
905     char hoststr[16];
906
907     UBIK_ADDR_LOCK;
908     for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && (addr = server->addr[i]);
909          i++) {
910         conns[i] =
911             rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
912                              addr_globals.ubikSecClass, addr_globals.ubikSecIndex);
913
914         /* user requirement to use only the primary interface */
915         if (ubikPrimaryAddrOnly) {
916             i = 1;
917             break;
918         }
919     }
920     UBIK_ADDR_UNLOCK;
921     nconns = i;
922     opr_Assert(nconns);                 /* at least one interface address for this server */
923
924     multi_Rx(conns, nconns) {
925         multi_DISK_Probe();
926         if (!multi_error) {     /* first success */
927             success_i = multi_i;
928
929             multi_Abort;
930         }
931     } multi_End_Ignore;
932
933     if (success_i >= 0) {
934         UBIK_ADDR_LOCK;
935         addr = server->addr[success_i]; /* successful interface addr */
936
937         if (server->disk_rxcid) /* destroy existing conn */
938             rx_DestroyConnection(server->disk_rxcid);
939         if (server->vote_rxcid)
940             rx_DestroyConnection(server->vote_rxcid);
941
942         /* make new connections */
943         server->disk_rxcid = conns[success_i];
944         server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
945                                               VOTE_SERVICE_ID, addr_globals.ubikSecClass,
946                                               addr_globals.ubikSecIndex);
947
948         connSuccess = conns[success_i];
949         strcpy(buffer, afs_inet_ntoa_r(server->addr[0], hoststr));
950
951         ViceLog(0, ("ubik:server %s is back up: will be contacted through %s\n",
952              buffer, afs_inet_ntoa_r(addr, hoststr)));
953         UBIK_ADDR_UNLOCK;
954     }
955
956     /* Destroy all connections except the one on which we succeeded */
957     for (i = 0; i < nconns; i++)
958         if (conns[i] != connSuccess)
959             rx_DestroyConnection(conns[i]);
960
961     if (!connSuccess)
962         ViceLog(5, ("ubik:server %s still down\n",
963                     afs_inet_ntoa_r(server->addr[0], hoststr)));
964
965     if (connSuccess)
966         return 0;               /* success */
967     else
968         return 1;               /* failure */
969 }