ubik-doxygen-20081121
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #include <time.h>
20 #include <fcntl.h>
21 #else
22 #include <sys/file.h>
23 #include <netinet/in.h>
24 #include <sys/time.h>
25 #endif
26 #include <assert.h>
27 #include <lock.h>
28 #include <string.h>
29 #include <rx/xdr.h>
30 #include <rx/rx.h>
31 #include <errno.h>
32 #include <afs/afsutil.h>
33
34 #define UBIK_INTERNALS
35 #include "ubik.h"
36 #include "ubik_int.h"
37
38 /*! \file
39  * This module is responsible for determining when the system has
40  * recovered to the point that it can handle new transactions.  It
41  * replays logs, polls to determine the current dbase after a crash,
42  * and distributes the new database to the others.
43  *
44  * The sync site associates a version number with each database.  It
45  * broadcasts the version associated with its current dbase in every
46  * one of its beacon messages.  When the sync site send a dbase to a
47  * server, it also sends the db's version.  A non-sync site server can
48  * tell if it has the right dbase version by simply comparing the
49  * version from the beacon message \p uvote_dbVersion with the version
50  * associated with the database \p ubik_dbase->version.  The sync site
51  * itself simply has one counter to keep track of all of this (again
52  * \p ubik_dbase->version).
53  *
54  * sync site: routine called when the sync site loses its quorum; this
55  * procedure is called "up" from the beacon package.  It resyncs the
56  * dbase and nudges the recovery daemon to try to propagate out the
57  * changes.  It also resets the recovery daemon's state, since
58  * recovery must potentially find a new dbase to propagate out.  This
59  * routine should not do anything with variables used by non-sync site
60  * servers.
61  */
62
63 /*!
64  * if this flag is set, then ubik will use only the primary address 
65  * (the address specified in the CellServDB) to contact other 
66  * ubik servers. Ubik recovery will not try opening connections 
67  * to the alternate interface addresses. 
68  */
69 int ubikPrimaryAddrOnly;
70
71 int
72 urecovery_ResetState(void)
73 {
74     urecovery_state = 0;
75 #if !defined(AFS_PTHREAD_ENV)
76     /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
77     LWP_NoYieldSignal(&urecovery_state);
78 #endif
79     return 0;
80 }
81
82 /*!
83  * \brief sync site
84  *
85  * routine called when a non-sync site server goes down; restarts recovery
86  * process to send missing server the new db when it comes back up.
87  *
88  * \note This routine should not do anything with variables used by non-sync site servers. 
89  */
90 int
91 urecovery_LostServer(void)
92 {
93 #if !defined(AFS_PTHREAD_ENV)
94     /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
95     LWP_NoYieldSignal(&urecovery_state);
96     return 0;
97 #endif
98 }
99
100 /*!
101  * return true iff we have a current database (called by both sync
102  * sites and non-sync sites) How do we determine this?  If we're the
103  * sync site, we wait until recovery has finished fetching and
104  * re-labelling its dbase (it may still be trying to propagate it out
105  * to everyone else; that's THEIR problem).  If we're not the sync
106  * site, then we must have a dbase labelled with the right version,
107  * and we must have a currently-good sync site.
108  */
109 int
110 urecovery_AllBetter(register struct ubik_dbase *adbase, int areadAny)
111 {
112     register afs_int32 rcode;
113
114     ubik_dprint("allbetter checking\n");
115     rcode = 0;
116
117
118     if (areadAny) {
119         if (ubik_dbase->version.epoch > 1)
120             rcode = 1;          /* Happy with any good version of database */
121     }
122
123     /* Check if we're sync site and we've got the right data */
124     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
125         rcode = 1;
126     }
127
128     /* next, check if we're aux site, and we've ever been sent the
129      * right data (note that if a dbase update fails, we won't think
130      * that the sync site is still the sync site, 'cause it won't talk
131      * to us until a timeout period has gone by.  When we recover, we
132      * leave this clear until we get a new dbase */
133     else if ((uvote_GetSyncSite() && (vcmp(ubik_dbVersion, ubik_dbase->version) == 0))) {       /* && order is important */
134         rcode = 1;
135     }
136
137     ubik_dprint("allbetter: returning %d\n", rcode);
138     return rcode;
139 }
140
141 /*!
142  * \brief abort all transactions on this database
143  */
144 int
145 urecovery_AbortAll(struct ubik_dbase *adbase)
146 {
147     register struct ubik_trans *tt;
148     for (tt = adbase->activeTrans; tt; tt = tt->next) {
149         udisk_abort(tt);
150     }
151     return 0;
152 }
153
154 /*!
155  * \brief this routine aborts the current remote transaction, if any, if the tid is wrong
156  */
157 int
158 urecovery_CheckTid(register struct ubik_tid *atid)
159 {
160     if (ubik_currentTrans) {
161         /* there is remote write trans, see if we match, see if this
162          * is a new transaction */
163         if (atid->epoch != ubik_currentTrans->tid.epoch
164             || atid->counter > ubik_currentTrans->tid.counter) {
165             /* don't match, abort it */
166             /* If the thread is not waiting for lock - ok to end it */
167 #if !defined(UBIK_PAUSE)
168             if (ubik_currentTrans->locktype != LOCKWAIT) {
169 #endif /* UBIK_PAUSE */
170                 udisk_end(ubik_currentTrans);
171 #if !defined(UBIK_PAUSE)
172             }
173 #endif /* UBIK_PAUSE */
174             ubik_currentTrans = (struct ubik_trans *)0;
175         }
176     }
177     return 0;
178 }
179
180 /*!
181  * \brief replay logs
182  *
183  * log format is defined here, and implicitly in disk.c
184  *
185  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
186  * are in logged in network standard byte order, in case we want to move logs
187  * from machine-to-machine someday.
188  *
189  * Begin transaction: opcode \n
190  * Commit transaction: opcode, version (8 bytes) \n
191  * Truncate file: opcode, file number, length \n
192  * Abort transaction: opcode \n
193  * Write data: opcode, file, position, length, <length> data bytes \n
194  *
195  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
196  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
197  * the commit record, then we allow data to be written through to the dbase.  In our particular
198  * implementation, once a transaction is done, we write out the pages to the database, so that
199  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
200  * any changed data while there is an uncommitted write transaction can be zapped during an
201  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
202  * the log.
203  */
204 static int
205 ReplayLog(register struct ubik_dbase *adbase)
206 {
207     afs_int32 opcode;
208     register afs_int32 code, tpos;
209     int logIsGood;
210     afs_int32 len, thisSize, tfile, filePos;
211     afs_int32 buffer[4];
212     afs_int32 syncFile = -1;
213     afs_int32 data[1024];
214
215     /* read the lock twice, once to see whether we have a transaction to deal
216      * with that committed, (theoretically, we should support more than one
217      * trans in the log at once, but not yet), and once replaying the
218      * transactions.  */
219     tpos = 0;
220     logIsGood = 0;
221     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
222     while (1) {
223         code =
224             (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
225                              sizeof(afs_int32));
226         if (code != sizeof(afs_int32))
227             break;
228         if (opcode == LOGNEW) {
229             /* handle begin trans */
230             tpos += sizeof(afs_int32);
231         } else if (opcode == LOGABORT)
232             break;
233         else if (opcode == LOGEND) {
234             logIsGood = 1;
235             break;
236         } else if (opcode == LOGTRUNCATE) {
237             tpos += 4;
238             code =
239                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
240                                  2 * sizeof(afs_int32));
241             if (code != 2 * sizeof(afs_int32))
242                 break;          /* premature eof or io error */
243             tpos += 2 * sizeof(afs_int32);
244         } else if (opcode == LOGDATA) {
245             tpos += 4;
246             code =
247                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
248                                  3 * sizeof(afs_int32));
249             if (code != 3 * sizeof(afs_int32))
250                 break;
251             /* otherwise, skip over the data bytes, too */
252             tpos += buffer[2] + 3 * sizeof(afs_int32);
253         } else {
254             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode,
255                         tpos);
256             break;              /* corrupt log! */
257         }
258     }
259     if (logIsGood) {
260         /* actually do the replay; log should go all the way through the commit record, since
261          * we just read it above. */
262         tpos = 0;
263         logIsGood = 0;
264         syncFile = -1;
265         while (1) {
266             code =
267                 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
268                                  sizeof(afs_int32));
269             if (code != sizeof(afs_int32))
270                 break;
271             if (opcode == LOGNEW) {
272                 /* handle begin trans */
273                 tpos += sizeof(afs_int32);
274             } else if (opcode == LOGABORT)
275                 panic("log abort\n");
276             else if (opcode == LOGEND) {
277                 tpos += 4;
278                 code =
279                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
280                                      2 * sizeof(afs_int32));
281                 if (code != 2 * sizeof(afs_int32))
282                     return UBADLOG;
283                 code = (*adbase->setlabel) (adbase, 0, (ubik_version *)buffer);
284                 if (code)
285                     return code;
286                 logIsGood = 1;
287                 break;          /* all done now */
288             } else if (opcode == LOGTRUNCATE) {
289                 tpos += 4;
290                 code =
291                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
292                                      2 * sizeof(afs_int32));
293                 if (code != 2 * sizeof(afs_int32))
294                     break;      /* premature eof or io error */
295                 tpos += 2 * sizeof(afs_int32);
296                 code =
297                     (*adbase->truncate) (adbase, ntohl(buffer[0]),
298                                          ntohl(buffer[1]));
299                 if (code)
300                     return code;
301             } else if (opcode == LOGDATA) {
302                 tpos += 4;
303                 code =
304                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
305                                      3 * sizeof(afs_int32));
306                 if (code != 3 * sizeof(afs_int32))
307                     break;
308                 tpos += 3 * sizeof(afs_int32);
309                 /* otherwise, skip over the data bytes, too */
310                 len = ntohl(buffer[2]); /* total number of bytes to copy */
311                 filePos = ntohl(buffer[1]);
312                 tfile = ntohl(buffer[0]);
313                 /* try to minimize file syncs */
314                 if (syncFile != tfile) {
315                     if (syncFile >= 0)
316                         code = (*adbase->sync) (adbase, syncFile);
317                     else
318                         code = 0;
319                     syncFile = tfile;
320                     if (code)
321                         return code;
322                 }
323                 while (len > 0) {
324                     thisSize = (len > sizeof(data) ? sizeof(data) : len);
325                     /* copy sizeof(data) buffer bytes at a time */
326                     code =
327                         (*adbase->read) (adbase, LOGFILE, (char *)data, tpos,
328                                          thisSize);
329                     if (code != thisSize)
330                         return UBADLOG;
331                     code =
332                         (*adbase->write) (adbase, tfile, (char *)data, filePos,
333                                           thisSize);
334                     if (code != thisSize)
335                         return UBADLOG;
336                     filePos += thisSize;
337                     tpos += thisSize;
338                     len -= thisSize;
339                 }
340             } else {
341                 ubik_dprint("corrupt log opcode (%d) at position %d\n",
342                             opcode, tpos);
343                 break;          /* corrupt log! */
344             }
345         }
346         if (logIsGood) {
347             if (syncFile >= 0)
348                 code = (*adbase->sync) (adbase, syncFile);
349             if (code)
350                 return code;
351         } else {
352             ubik_dprint("Log read error on pass 2\n");
353             return UBADLOG;
354         }
355     }
356
357     /* now truncate the log, we're done with it */
358     code = (*adbase->truncate) (adbase, LOGFILE, 0);
359     return code;
360 }
361
362 /*! \brief
363  * Called at initialization to figure out version of the dbase we really have.
364  *
365  * This routine is called after replaying the log; it reads the restored labels.
366  */
367 static int
368 InitializeDB(register struct ubik_dbase *adbase)
369 {
370     register afs_int32 code;
371
372     code = (*adbase->getlabel) (adbase, 0, &adbase->version);
373     if (code) {
374         /* try setting the label to a new value */
375         adbase->version.epoch = 1;      /* value for newly-initialized db */
376         adbase->version.counter = 1;
377         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
378         if (code) {
379             /* failed, try to set it back */
380             adbase->version.epoch = 0;
381             adbase->version.counter = 0;
382             (*adbase->setlabel) (adbase, 0, &adbase->version);
383         }
384 #ifdef AFS_PTHREAD_ENV
385         assert(pthread_cond_broadcast(&adbase->version_cond) == 0);
386 #else
387         LWP_NoYieldSignal(&adbase->version);
388 #endif
389     }
390     return 0;
391 }
392
393 /*!
394  * \brief initialize the local ubik_dbase
395  *
396  * We replay the logs and then read the resulting file to figure out what version we've really got.
397  */
398 int
399 urecovery_Initialize(register struct ubik_dbase *adbase)
400 {
401     register afs_int32 code;
402
403     code = ReplayLog(adbase);
404     if (code)
405         return code;
406     code = InitializeDB(adbase);
407     return code;
408 }
409
410 /*!
411  * \brief Main interaction loop for the recovery manager
412  *
413  * The recovery light-weight process only runs when you're the
414  * synchronization site.  It performs the following tasks, if and only
415  * if the prerequisite tasks have been performed successfully (it
416  * keeps track of which ones have been performed in its bit map,
417  * \p urecovery_state).
418  *
419  * First, it is responsible for probing that all servers are up.  This
420  * is the only operation that must be performed even if this is not
421  * yet the sync site, since otherwise this site may not notice that
422  * enough other machines are running to even elect this guy to be the
423  * sync site.
424  *
425  * After that, the recovery process does nothing until the beacon and
426  * voting modules manage to get this site elected sync site.
427  *
428  * After becoming sync site, recovery first attempts to find the best
429  * database available in the network (it must do this in order to
430  * ensure finding the latest committed data).  After finding the right
431  * database, it must fetch this dbase to the sync site.
432  *
433  * After fetching the dbase, it relabels it with a new version number,
434  * to ensure that everyone recognizes this dbase as the most recent
435  * dbase.
436  *
437  * One the dbase has been relabelled, this machine can start handling
438  * requests.  However, the recovery module still has one more task:
439  * propagating the dbase out to everyone who is up in the network.
440  */
441 void *
442 urecovery_Interact(void *dummy)
443 {
444     afs_int32 code, tcode;
445     struct ubik_server *bestServer = NULL;
446     struct ubik_server *ts;
447     int dbok, doingRPC, now;
448     afs_int32 lastProbeTime, lastDBVCheck;
449     /* if we're the sync site, the best db version we've found yet */
450     static struct ubik_version bestDBVersion;
451     struct ubik_version tversion;
452     struct timeval tv;
453     int length, tlen, offset, file, nbytes;
454     struct rx_call *rxcall;
455     char tbuffer[1024];
456     struct ubik_stat ubikstat;
457     struct in_addr inAddr;
458 #ifndef OLD_URECOVERY
459     char pbuffer[1028];
460     int flen, fd = -1;
461     afs_int32 epoch, pass;
462 #endif
463
464     /* otherwise, begin interaction */
465     urecovery_state = 0;
466     lastProbeTime = 0;
467     lastDBVCheck = 0;
468     while (1) {
469         /* Run through this loop every 4 seconds */
470         tv.tv_sec = 4;
471         tv.tv_usec = 0;
472 #ifdef AFS_PTHREAD_ENV
473         select(0, 0, 0, 0, &tv);
474 #else
475         IOMGR_Select(0, 0, 0, 0, &tv);
476 #endif
477
478         ubik_dprint("recovery running in state %x\n", urecovery_state);
479
480         /* Every 30 seconds, check all the down servers and mark them
481          * as up if they respond. When a server comes up or found to
482          * not be current, then re-find the the best database and 
483          * propogate it.
484          */
485         if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
486             for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
487                 if (!ts->up) {
488                     doingRPC = 1;
489                     code = DoProbe(ts);
490                     if (code == 0) {
491                         ts->up = 1;
492                         urecovery_state &= ~UBIK_RECFOUNDDB;
493                     }
494                 } else if (!ts->currentDB) {
495                     urecovery_state &= ~UBIK_RECFOUNDDB;
496                 }
497             }
498             if (doingRPC)
499                 now = FT_ApproxTime();
500             lastProbeTime = now;
501         }
502
503         /* Mark whether we are the sync site */
504         if (!ubeacon_AmSyncSite()) {
505             urecovery_state &= ~UBIK_RECSYNCSITE;
506             continue;           /* nothing to do */
507         }
508         urecovery_state |= UBIK_RECSYNCSITE;
509
510         /* If a server has just come up or if we have not found the 
511          * most current database, then go find the most current db.
512          */
513         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
514             bestServer = (struct ubik_server *)0;
515             bestDBVersion.epoch = 0;
516             bestDBVersion.counter = 0;
517             for (ts = ubik_servers; ts; ts = ts->next) {
518                 if (!ts->up)
519                     continue;   /* don't bother with these guys */
520                 if (ts->isClone)
521                     continue;
522                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
523                 if (code == 0) {
524                     /* perhaps this is the best version */
525                     if (vcmp(ts->version, bestDBVersion) > 0) {
526                         /* new best version */
527                         bestDBVersion = ts->version;
528                         bestServer = ts;
529                     }
530                 }
531             }
532             /* take into consideration our version. Remember if we,
533              * the sync site, have the best version. Also note that
534              * we may need to send the best version out.
535              */
536             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
537                 bestDBVersion = ubik_dbase->version;
538                 bestServer = (struct ubik_server *)0;
539                 urecovery_state |= UBIK_RECHAVEDB;
540             } else {
541                 /* Clear the flag only when we know we have to retrieve
542                  * the db. Because urecovery_AllBetter() looks at it.
543                  */
544                 urecovery_state &= ~UBIK_RECHAVEDB;
545             }
546             lastDBVCheck = FT_ApproxTime();
547             urecovery_state |= UBIK_RECFOUNDDB;
548             urecovery_state &= ~UBIK_RECSENTDB;
549         }
550 #if defined(UBIK_PAUSE)
551         /* it's not possible for UBIK_RECFOUNDDB not to be set here.
552          * However, we might have lost UBIK_RECSYNCSITE, and that
553          * IS important.
554          */
555         if (!(urecovery_state & UBIK_RECSYNCSITE))
556             continue;           /* lost sync */
557 #else
558         if (!(urecovery_state & UBIK_RECFOUNDDB))
559             continue;           /* not ready */
560 #endif /* UBIK_PAUSE */
561
562         /* If we, the sync site, do not have the best db version, then
563          * go and get it from the server that does.
564          */
565         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
566             urecovery_state |= UBIK_RECHAVEDB;
567         } else {
568             /* we don't have the best version; we should fetch it. */
569             DBHOLD(ubik_dbase);
570             urecovery_AbortAll(ubik_dbase);
571
572             /* Rx code to do the Bulk fetch */
573             file = 0;
574             offset = 0;
575             rxcall = rx_NewCall(bestServer->disk_rxcid);
576
577             ubik_print("Ubik: Synchronize database with server %s\n",
578                        afs_inet_ntoa(bestServer->addr[0]));
579
580             code = StartDISK_GetFile(rxcall, file);
581             if (code) {
582                 ubik_dprint("StartDiskGetFile failed=%d\n", code);
583                 goto FetchEndCall;
584             }
585             nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
586             length = ntohl(length);
587             if (nbytes != sizeof(afs_int32)) {
588                 ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR);
589                 code = EIO;
590                 goto FetchEndCall;
591             }
592
593 #ifdef OLD_URECOVERY
594             /* Truncate the file first */
595             code = (*ubik_dbase->truncate) (ubik_dbase, file, 0);
596             if (code) {
597                 ubik_dprint("truncate io error=%d\n", code);
598                 goto FetchEndCall;
599             }
600             tversion.counter = 0;
601 #endif
602             /* give invalid label during file transit */
603             tversion.epoch = 0;
604             code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
605             if (code) {
606                 ubik_dprint("setlabel io error=%d\n", code);
607                 goto FetchEndCall;
608             }
609 #ifndef OLD_URECOVERY
610             flen = length;
611             afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
612             fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
613             if (fd < 0) {
614                 code = errno;
615                 goto FetchEndCall;
616             }
617             code = lseek(fd, HDRSIZE, 0);
618             if (code != HDRSIZE) {
619                 close(fd);
620                 goto FetchEndCall;
621             }
622 #endif
623
624             pass = 0;
625             while (length > 0) {
626                 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
627 #ifndef AFS_PTHREAD_ENV
628                 if (pass % 4 == 0)
629                     IOMGR_Poll();
630 #endif
631                 nbytes = rx_Read(rxcall, tbuffer, tlen);
632                 if (nbytes != tlen) {
633                     ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
634                     code = EIO;
635                     close(fd);
636                     goto FetchEndCall;
637                 }
638 #ifdef OLD_URECOVERY
639                 nbytes =
640                     (*ubik_dbase->write) (ubik_dbase, file, tbuffer, offset,
641                                           tlen);
642 #else
643                 nbytes = write(fd, tbuffer, tlen);
644                 pass++;
645 #endif
646                 if (nbytes != tlen) {
647                     code = UIOERROR;
648                     close(fd);
649                     goto FetchEndCall;
650                 }
651                 offset += tlen;
652                 length -= tlen;
653             }
654 #ifndef OLD_URECOVERY
655             code = close(fd);
656             if (code)
657                 goto FetchEndCall;
658 #endif      
659             code = EndDISK_GetFile(rxcall, &tversion);
660           FetchEndCall:
661             tcode = rx_EndCall(rxcall, code);
662             if (!code)
663                 code = tcode;
664             if (!code) {
665                 /* we got a new file, set up its header */
666                 urecovery_state |= UBIK_RECHAVEDB;
667                 memcpy(&ubik_dbase->version, &tversion,
668                        sizeof(struct ubik_version));
669 #ifdef OLD_URECOVERY
670                 (*ubik_dbase->sync) (ubik_dbase, 0);    /* get data out first */
671 #else
672                 afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
673 #ifdef AFS_NT40_ENV
674                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
675                 code = unlink(pbuffer);
676                 if (!code)
677                     code = rename(tbuffer, pbuffer);
678                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
679 #endif
680                 if (!code) 
681                     code = rename(pbuffer, tbuffer);
682                 if (!code) 
683                     code = (*ubik_dbase->open) (ubik_dbase, 0);
684                 if (!code)
685 #endif
686                 /* after data is good, sync disk with correct label */
687                 code =
688                     (*ubik_dbase->setlabel) (ubik_dbase, 0,
689                                              &ubik_dbase->version);
690 #ifndef OLD_URECOVERY
691 #ifdef AFS_NT40_ENV
692                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
693                 unlink(pbuffer);
694 #endif
695 #endif
696             }
697             if (code) {
698 #ifndef OLD_URECOVERY
699                 unlink(pbuffer);
700                 /* 
701                  * We will effectively invalidate the old data forever now.
702                  * Unclear if we *should* but we do.
703                  */
704 #endif
705                 ubik_dbase->version.epoch = 0;
706                 ubik_dbase->version.counter = 0;
707                 ubik_print("Ubik: Synchronize database failed (error = %d)\n",
708                            code);
709             } else {
710                 ubik_print("Ubik: Synchronize database completed\n");
711                 urecovery_state |= UBIK_RECHAVEDB;
712             }
713             udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
714 #ifdef AFS_PTHREAD_ENV
715             assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
716 #else
717             LWP_NoYieldSignal(&ubik_dbase->version);
718 #endif
719             DBRELE(ubik_dbase);
720         }
721 #if defined(UBIK_PAUSE)
722         if (!(urecovery_state & UBIK_RECSYNCSITE))
723             continue;           /* lost sync */
724 #endif /* UBIK_PAUSE */
725         if (!(urecovery_state & UBIK_RECHAVEDB))
726             continue;           /* not ready */
727
728         /* If the database was newly initialized, then when we establish quorum, write
729          * a new label. This allows urecovery_AllBetter() to allow access for reads.
730          * Setting it to 2 also allows another site to come along with a newer
731          * database and overwrite this one.
732          */
733         if (ubik_dbase->version.epoch == 1) {
734             DBHOLD(ubik_dbase);
735             urecovery_AbortAll(ubik_dbase);
736             ubik_epochTime = 2;
737             ubik_dbase->version.epoch = ubik_epochTime;
738             ubik_dbase->version.counter = 1;
739             code =
740                 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
741             udisk_Invalidate(ubik_dbase, 0);    /* data may have changed */
742 #ifdef AFS_PTHREAD_ENV
743             assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
744 #else
745             LWP_NoYieldSignal(&ubik_dbase->version);
746 #endif
747             DBRELE(ubik_dbase);
748         }
749
750         /* Check the other sites and send the database to them if they
751          * do not have the current db.
752          */
753         if (!(urecovery_state & UBIK_RECSENTDB)) {
754             /* now propagate out new version to everyone else */
755             dbok = 1;           /* start off assuming they all worked */
756
757             DBHOLD(ubik_dbase);
758             /*
759              * Check if a write transaction is in progress. We can't send the
760              * db when a write is in progress here because the db would be
761              * obsolete as soon as it goes there. Also, ops after the begin
762              * trans would reach the recepient and wouldn't find a transaction
763              * pending there.  Frankly, I don't think it's possible to get past
764              * the write-lock above if there is a write transaction in progress,
765              * but then, it won't hurt to check, will it?
766              */
767             if (ubik_dbase->flags & DBWRITING) {
768                 struct timeval tv;
769                 int safety = 0;
770                 tv.tv_sec = 0;
771                 tv.tv_usec = 50000;
772                 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
773                     DBRELE(ubik_dbase);
774                     /* sleep for a little while */
775 #ifdef AFS_PTHREAD_ENV
776                     select(0, 0, 0, 0, &tv);
777 #else
778                     IOMGR_Select(0, 0, 0, 0, &tv);
779 #endif
780                     tv.tv_usec += 10000;
781                     safety++;
782                     DBHOLD(ubik_dbase);
783                 }
784             }
785
786             for (ts = ubik_servers; ts; ts = ts->next) {
787                 inAddr.s_addr = ts->addr[0];
788                 if (!ts->up) {
789                     ubik_dprint("recovery cannot send version to %s\n",
790                                 afs_inet_ntoa(inAddr.s_addr));
791                     dbok = 0;
792                     continue;
793                 }
794                 ubik_dprint("recovery sending version to %s\n",
795                             afs_inet_ntoa(inAddr.s_addr));
796                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
797                     ubik_dprint("recovery stating local database\n");
798
799                     /* Rx code to do the Bulk Store */
800                     code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
801                     if (!code) {
802                         length = ubikstat.size;
803                         file = offset = 0;
804                         rxcall = rx_NewCall(ts->disk_rxcid);
805                         code =
806                             StartDISK_SendFile(rxcall, file, length,
807                                                &ubik_dbase->version);
808                         if (code) {
809                             ubik_dprint("StartDiskSendFile failed=%d\n",
810                                         code);
811                             goto StoreEndCall;
812                         }
813                         while (length > 0) {
814                             tlen =
815                                 (length >
816                                  sizeof(tbuffer) ? sizeof(tbuffer) : length);
817                             nbytes =
818                                 (*ubik_dbase->read) (ubik_dbase, file,
819                                                      tbuffer, offset, tlen);
820                             if (nbytes != tlen) {
821                                 ubik_dprint("Local disk read error=%d\n",
822                                             code = UIOERROR);
823                                 goto StoreEndCall;
824                             }
825                             nbytes = rx_Write(rxcall, tbuffer, tlen);
826                             if (nbytes != tlen) {
827                                 ubik_dprint("Rx-write bulk error=%d\n", code =
828                                             BULK_ERROR);
829                                 goto StoreEndCall;
830                             }
831                             offset += tlen;
832                             length -= tlen;
833                         }
834                         code = EndDISK_SendFile(rxcall);
835                       StoreEndCall:
836                         code = rx_EndCall(rxcall, code);
837                     }
838                     if (code == 0) {
839                         /* we set a new file, process its header */
840                         ts->version = ubik_dbase->version;
841                         ts->currentDB = 1;
842                     } else
843                         dbok = 0;
844                 } else {
845                     /* mark file up to date */
846                     ts->currentDB = 1;
847                 }
848             }
849             DBRELE(ubik_dbase);
850             if (dbok)
851                 urecovery_state |= UBIK_RECSENTDB;
852         }
853     }
854     return NULL;
855 }
856
857 /*!
858  * \brief send a Probe to all the network address of this server 
859  * 
860  * \return 0 if success, else return 1
861  */
862 int
863 DoProbe(struct ubik_server *server)
864 {
865     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
866     struct rx_connection *connSuccess = 0;
867     int i, j;
868     afs_uint32 addr;
869     char buffer[32];
870     extern afs_int32 ubikSecIndex;
871     extern struct rx_securityClass *ubikSecClass;
872
873     for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR);
874          i++) {
875         conns[i] =
876             rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
877                              ubikSecClass, ubikSecIndex);
878
879         /* user requirement to use only the primary interface */
880         if (ubikPrimaryAddrOnly) {
881             i = 1;
882             break;
883         }
884     }
885     assert(i);                  /* at least one interface address for this server */
886
887     multi_Rx(conns, i) {
888         multi_DISK_Probe();
889         if (!multi_error) {     /* first success */
890             addr = server->addr[multi_i];       /* successful interface addr */
891
892             if (server->disk_rxcid)     /* destroy existing conn */
893                 rx_DestroyConnection(server->disk_rxcid);
894             if (server->vote_rxcid)
895                 rx_DestroyConnection(server->vote_rxcid);
896
897             /* make new connections */
898             server->disk_rxcid = conns[multi_i];
899             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal, VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);  /* for vote reqs */
900
901             connSuccess = conns[multi_i];
902             strcpy(buffer, (char *)afs_inet_ntoa(server->addr[0]));
903             ubik_print
904                 ("ubik:server %s is back up: will be contacted through %s\n",
905                  buffer, afs_inet_ntoa(addr));
906
907             multi_Abort;
908         }
909     } multi_End_Ignore;
910
911     /* Destroy all connections except the one on which we succeeded */
912     for (j = 0; j < i; j++)
913         if (conns[j] != connSuccess)
914             rx_DestroyConnection(conns[j]);
915
916     if (!connSuccess)
917         ubik_dprint("ubik:server %s still down\n",
918                     afs_inet_ntoa(server->addr[0]));
919
920     if (connSuccess)
921         return 0;               /* success */
922     else
923         return 1;               /* failure */
924 }