DEVEL15-kill-ubik-pthread-env-20080718
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #include <time.h>
20 #include <fcntl.h>
21 #else
22 #include <sys/file.h>
23 #include <netinet/in.h>
24 #include <sys/time.h>
25 #endif
26 #include <assert.h>
27 #include <lock.h>
28 #include <string.h>
29 #include <rx/xdr.h>
30 #include <rx/rx.h>
31 #include <errno.h>
32 #include <afs/afsutil.h>
33
34 #define UBIK_INTERNALS
35 #include "ubik.h"
36 #include "ubik_int.h"
37
38 /* This module is responsible for determining when the system has
39  * recovered to the point that it can handle new transactions.  It
40  * replays logs, polls to determine the current dbase after a crash,
41  * and distributes the new database to the others.
42  */
43
44 /* The sync site associates a version number with each database.  It
45  * broadcasts the version associated with its current dbase in every
46  * one of its beacon messages.  When the sync site send a dbase to a
47  * server, it also sends the db's version.  A non-sync site server can
48  * tell if it has the right dbase version by simply comparing the
49  * version from the beacon message (uvote_dbVersion) with the version
50  * associated with the database (ubik_dbase->version).  The sync site
51  * itself simply has one counter to keep track of all of this (again
52  * ubik_dbase->version).
53  */
54
55 /* sync site: routine called when the sync site loses its quorum; this
56  * procedure is called "up" from the beacon package.  It resyncs the
57  * dbase and nudges the recovery daemon to try to propagate out the
58  * changes.  It also resets the recovery daemon's state, since
59  * recovery must potentially find a new dbase to propagate out.  This
60  * routine should not do anything with variables used by non-sync site
61  * servers.
62  */
63
64 /* if this flag is set, then ubik will use only the primary address 
65 ** ( the address specified in the CellServDB) to contact other 
66 ** ubik servers. Ubik recovery will not try opening connections 
67 ** to the alternate interface addresses. 
68 */
69 int ubikPrimaryAddrOnly;
70
71 int
72 urecovery_ResetState(void)
73 {
74     urecovery_state = 0;
75 #if !defined(AFS_PTHREAD_ENV)
76     /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
77     LWP_NoYieldSignal(&urecovery_state);
78 #endif
79     return 0;
80 }
81
82 /* sync site: routine called when a non-sync site server goes down; restarts recovery
83  * process to send missing server the new db when it comes back up.
84  * This routine should not do anything with variables used by non-sync site servers. 
85  */
86 int
87 urecovery_LostServer(void)
88 {
89 #if !defined(AFS_PTHREAD_ENV)
90     /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
91     LWP_NoYieldSignal(&urecovery_state);
92     return 0;
93 #endif
94 }
95
96 /* return true iff we have a current database (called by both sync
97  * sites and non-sync sites) How do we determine this?  If we're the
98  * sync site, we wait until recovery has finished fetching and
99  * re-labelling its dbase (it may still be trying to propagate it out
100  * to everyone else; that's THEIR problem).  If we're not the sync
101  * site, then we must have a dbase labelled with the right version,
102  * and we must have a currently-good sync site.
103  */
104 int
105 urecovery_AllBetter(register struct ubik_dbase *adbase, int areadAny)
106 {
107     register afs_int32 rcode;
108
109     ubik_dprint("allbetter checking\n");
110     rcode = 0;
111
112
113     if (areadAny) {
114         if (ubik_dbase->version.epoch > 1)
115             rcode = 1;          /* Happy with any good version of database */
116     }
117
118     /* Check if we're sync site and we've got the right data */
119     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
120         rcode = 1;
121     }
122
123     /* next, check if we're aux site, and we've ever been sent the
124      * right data (note that if a dbase update fails, we won't think
125      * that the sync site is still the sync site, 'cause it won't talk
126      * to us until a timeout period has gone by.  When we recover, we
127      * leave this clear until we get a new dbase */
128     else if ((uvote_GetSyncSite() && (vcmp(ubik_dbVersion, ubik_dbase->version) == 0))) {       /* && order is important */
129         rcode = 1;
130     }
131
132     ubik_dprint("allbetter: returning %d\n", rcode);
133     return rcode;
134 }
135
136 /* abort all transactions on this database */
137 int
138 urecovery_AbortAll(struct ubik_dbase *adbase)
139 {
140     register struct ubik_trans *tt;
141     for (tt = adbase->activeTrans; tt; tt = tt->next) {
142         udisk_abort(tt);
143     }
144     return 0;
145 }
146
147 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
148 int
149 urecovery_CheckTid(register struct ubik_tid *atid)
150 {
151     if (ubik_currentTrans) {
152         /* there is remote write trans, see if we match, see if this
153          * is a new transaction */
154         if (atid->epoch != ubik_currentTrans->tid.epoch
155             || atid->counter > ubik_currentTrans->tid.counter) {
156             /* don't match, abort it */
157             /* If the thread is not waiting for lock - ok to end it */
158 #if !defined(UBIK_PAUSE)
159             if (ubik_currentTrans->locktype != LOCKWAIT) {
160 #endif /* UBIK_PAUSE */
161                 udisk_end(ubik_currentTrans);
162 #if !defined(UBIK_PAUSE)
163             }
164 #endif /* UBIK_PAUSE */
165             ubik_currentTrans = (struct ubik_trans *)0;
166         }
167     }
168     return 0;
169 }
170
171 /* log format is defined here, and implicitly in disk.c
172  *
173  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
174  * are in logged in network standard byte order, in case we want to move logs
175  * from machine-to-machine someday.
176  *
177  * Begin transaction: opcode
178  * Commit transaction: opcode, version (8 bytes)
179  * Truncate file: opcode, file number, length
180  * Abort transaction: opcode
181  * Write data: opcode, file, position, length, <length> data bytes
182  *
183  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
184  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
185  * the commit record, then we allow data to be written through to the dbase.  In our particular
186  * implementation, once a transaction is done, we write out the pages to the database, so that
187  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
188  * any changed data while there is an uncommitted write transaction can be zapped during an
189  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
190  * the log.
191  */
192
193 /* replay logs */
194 static int
195 ReplayLog(register struct ubik_dbase *adbase)
196 {
197     afs_int32 opcode;
198     register afs_int32 code, tpos;
199     int logIsGood;
200     afs_int32 len, thisSize, tfile, filePos;
201     afs_int32 buffer[4];
202     afs_int32 syncFile = -1;
203     afs_int32 data[1024];
204
205     /* read the lock twice, once to see whether we have a transaction to deal
206      * with that committed, (theoretically, we should support more than one
207      * trans in the log at once, but not yet), and once replaying the
208      * transactions.  */
209     tpos = 0;
210     logIsGood = 0;
211     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
212     while (1) {
213         code =
214             (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
215                              sizeof(afs_int32));
216         if (code != sizeof(afs_int32))
217             break;
218         if (opcode == LOGNEW) {
219             /* handle begin trans */
220             tpos += sizeof(afs_int32);
221         } else if (opcode == LOGABORT)
222             break;
223         else if (opcode == LOGEND) {
224             logIsGood = 1;
225             break;
226         } else if (opcode == LOGTRUNCATE) {
227             tpos += 4;
228             code =
229                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
230                                  2 * sizeof(afs_int32));
231             if (code != 2 * sizeof(afs_int32))
232                 break;          /* premature eof or io error */
233             tpos += 2 * sizeof(afs_int32);
234         } else if (opcode == LOGDATA) {
235             tpos += 4;
236             code =
237                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
238                                  3 * sizeof(afs_int32));
239             if (code != 3 * sizeof(afs_int32))
240                 break;
241             /* otherwise, skip over the data bytes, too */
242             tpos += buffer[2] + 3 * sizeof(afs_int32);
243         } else {
244             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode,
245                         tpos);
246             break;              /* corrupt log! */
247         }
248     }
249     if (logIsGood) {
250         /* actually do the replay; log should go all the way through the commit record, since
251          * we just read it above. */
252         tpos = 0;
253         logIsGood = 0;
254         syncFile = -1;
255         while (1) {
256             code =
257                 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
258                                  sizeof(afs_int32));
259             if (code != sizeof(afs_int32))
260                 break;
261             if (opcode == LOGNEW) {
262                 /* handle begin trans */
263                 tpos += sizeof(afs_int32);
264             } else if (opcode == LOGABORT)
265                 panic("log abort\n");
266             else if (opcode == LOGEND) {
267                 tpos += 4;
268                 code =
269                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
270                                      2 * sizeof(afs_int32));
271                 if (code != 2 * sizeof(afs_int32))
272                     return UBADLOG;
273                 code = (*adbase->setlabel) (adbase, 0, (ubik_version *)buffer);
274                 if (code)
275                     return code;
276                 logIsGood = 1;
277                 break;          /* all done now */
278             } else if (opcode == LOGTRUNCATE) {
279                 tpos += 4;
280                 code =
281                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
282                                      2 * sizeof(afs_int32));
283                 if (code != 2 * sizeof(afs_int32))
284                     break;      /* premature eof or io error */
285                 tpos += 2 * sizeof(afs_int32);
286                 code =
287                     (*adbase->truncate) (adbase, ntohl(buffer[0]),
288                                          ntohl(buffer[1]));
289                 if (code)
290                     return code;
291             } else if (opcode == LOGDATA) {
292                 tpos += 4;
293                 code =
294                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
295                                      3 * sizeof(afs_int32));
296                 if (code != 3 * sizeof(afs_int32))
297                     break;
298                 tpos += 3 * sizeof(afs_int32);
299                 /* otherwise, skip over the data bytes, too */
300                 len = ntohl(buffer[2]); /* total number of bytes to copy */
301                 filePos = ntohl(buffer[1]);
302                 tfile = ntohl(buffer[0]);
303                 /* try to minimize file syncs */
304                 if (syncFile != tfile) {
305                     if (syncFile >= 0)
306                         code = (*adbase->sync) (adbase, syncFile);
307                     else
308                         code = 0;
309                     syncFile = tfile;
310                     if (code)
311                         return code;
312                 }
313                 while (len > 0) {
314                     thisSize = (len > sizeof(data) ? sizeof(data) : len);
315                     /* copy sizeof(data) buffer bytes at a time */
316                     code =
317                         (*adbase->read) (adbase, LOGFILE, (char *)data, tpos,
318                                          thisSize);
319                     if (code != thisSize)
320                         return UBADLOG;
321                     code =
322                         (*adbase->write) (adbase, tfile, (char *)data, filePos,
323                                           thisSize);
324                     if (code != thisSize)
325                         return UBADLOG;
326                     filePos += thisSize;
327                     tpos += thisSize;
328                     len -= thisSize;
329                 }
330             } else {
331                 ubik_dprint("corrupt log opcode (%d) at position %d\n",
332                             opcode, tpos);
333                 break;          /* corrupt log! */
334             }
335         }
336         if (logIsGood) {
337             if (syncFile >= 0)
338                 code = (*adbase->sync) (adbase, syncFile);
339             if (code)
340                 return code;
341         } else {
342             ubik_dprint("Log read error on pass 2\n");
343             return UBADLOG;
344         }
345     }
346
347     /* now truncate the log, we're done with it */
348     code = (*adbase->truncate) (adbase, LOGFILE, 0);
349     return code;
350 }
351
352 /* Called at initialization to figure out version of the dbase we really have.
353  * This routine is called after replaying the log; it reads the restored labels.
354  */
355 static int
356 InitializeDB(register struct ubik_dbase *adbase)
357 {
358     register afs_int32 code;
359
360     code = (*adbase->getlabel) (adbase, 0, &adbase->version);
361     if (code) {
362         /* try setting the label to a new value */
363         adbase->version.epoch = 1;      /* value for newly-initialized db */
364         adbase->version.counter = 1;
365         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
366         if (code) {
367             /* failed, try to set it back */
368             adbase->version.epoch = 0;
369             adbase->version.counter = 0;
370             (*adbase->setlabel) (adbase, 0, &adbase->version);
371         }
372 #ifdef AFS_PTHREAD_ENV
373         assert(pthread_cond_broadcast(&adbase->version_cond) == 0);
374 #else
375         LWP_NoYieldSignal(&adbase->version);
376 #endif
377     }
378     return 0;
379 }
380
381 /* initialize the local dbase
382  * We replay the logs and then read the resulting file to figure out what version we've really got.
383  */
384 int
385 urecovery_Initialize(register struct ubik_dbase *adbase)
386 {
387     register afs_int32 code;
388
389     code = ReplayLog(adbase);
390     if (code)
391         return code;
392     code = InitializeDB(adbase);
393     return code;
394 }
395
396 /* Main interaction loop for the recovery manager
397  * The recovery light-weight process only runs when you're the
398  * synchronization site.  It performs the following tasks, if and only
399  * if the prerequisite tasks have been performed successfully (it
400  * keeps track of which ones have been performed in its bit map,
401  * urecovery_state).
402  *
403  * First, it is responsible for probing that all servers are up.  This
404  * is the only operation that must be performed even if this is not
405  * yet the sync site, since otherwise this site may not notice that
406  * enough other machines are running to even elect this guy to be the
407  * sync site.
408  *
409  * After that, the recovery process does nothing until the beacon and
410  * voting modules manage to get this site elected sync site.
411  *
412  * After becoming sync site, recovery first attempts to find the best
413  * database available in the network (it must do this in order to
414  * ensure finding the latest committed data).  After finding the right
415  * database, it must fetch this dbase to the sync site.
416  *
417  * After fetching the dbase, it relabels it with a new version number,
418  * to ensure that everyone recognizes this dbase as the most recent
419  * dbase.
420  *
421  * One the dbase has been relabelled, this machine can start handling
422  * requests.  However, the recovery module still has one more task:
423  * propagating the dbase out to everyone who is up in the network.
424  */
425 void *
426 urecovery_Interact(void *dummy)
427 {
428     afs_int32 code, tcode;
429     struct ubik_server *bestServer = NULL;
430     struct ubik_server *ts;
431     int dbok, doingRPC, now;
432     afs_int32 lastProbeTime, lastDBVCheck;
433     /* if we're the sync site, the best db version we've found yet */
434     static struct ubik_version bestDBVersion;
435     struct ubik_version tversion;
436     struct timeval tv;
437     int length, tlen, offset, file, nbytes;
438     struct rx_call *rxcall;
439     char tbuffer[1024];
440     struct ubik_stat ubikstat;
441     struct in_addr inAddr;
442 #ifndef OLD_URECOVERY
443     char pbuffer[1028];
444     int flen, fd = -1;
445     afs_int32 epoch, pass;
446 #endif
447
448     /* otherwise, begin interaction */
449     urecovery_state = 0;
450     lastProbeTime = 0;
451     lastDBVCheck = 0;
452     while (1) {
453         /* Run through this loop every 4 seconds */
454         tv.tv_sec = 4;
455         tv.tv_usec = 0;
456 #ifdef AFS_PTHREAD_ENV
457         select(0, 0, 0, 0, &tv);
458 #else
459         IOMGR_Select(0, 0, 0, 0, &tv);
460 #endif
461
462         ubik_dprint("recovery running in state %x\n", urecovery_state);
463
464         /* Every 30 seconds, check all the down servers and mark them
465          * as up if they respond. When a server comes up or found to
466          * not be current, then re-find the the best database and 
467          * propogate it.
468          */
469         if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
470             for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
471                 if (!ts->up) {
472                     doingRPC = 1;
473                     code = DoProbe(ts);
474                     if (code == 0) {
475                         ts->up = 1;
476                         urecovery_state &= ~UBIK_RECFOUNDDB;
477                     }
478                 } else if (!ts->currentDB) {
479                     urecovery_state &= ~UBIK_RECFOUNDDB;
480                 }
481             }
482             if (doingRPC)
483                 now = FT_ApproxTime();
484             lastProbeTime = now;
485         }
486
487         /* Mark whether we are the sync site */
488         if (!ubeacon_AmSyncSite()) {
489             urecovery_state &= ~UBIK_RECSYNCSITE;
490             continue;           /* nothing to do */
491         }
492         urecovery_state |= UBIK_RECSYNCSITE;
493
494         /* If a server has just come up or if we have not found the 
495          * most current database, then go find the most current db.
496          */
497         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
498             bestServer = (struct ubik_server *)0;
499             bestDBVersion.epoch = 0;
500             bestDBVersion.counter = 0;
501             for (ts = ubik_servers; ts; ts = ts->next) {
502                 if (!ts->up)
503                     continue;   /* don't bother with these guys */
504                 if (ts->isClone)
505                     continue;
506                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
507                 if (code == 0) {
508                     /* perhaps this is the best version */
509                     if (vcmp(ts->version, bestDBVersion) > 0) {
510                         /* new best version */
511                         bestDBVersion = ts->version;
512                         bestServer = ts;
513                     }
514                 }
515             }
516             /* take into consideration our version. Remember if we,
517              * the sync site, have the best version. Also note that
518              * we may need to send the best version out.
519              */
520             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
521                 bestDBVersion = ubik_dbase->version;
522                 bestServer = (struct ubik_server *)0;
523                 urecovery_state |= UBIK_RECHAVEDB;
524             } else {
525                 /* Clear the flag only when we know we have to retrieve
526                  * the db. Because urecovery_AllBetter() looks at it.
527                  */
528                 urecovery_state &= ~UBIK_RECHAVEDB;
529             }
530             lastDBVCheck = FT_ApproxTime();
531             urecovery_state |= UBIK_RECFOUNDDB;
532             urecovery_state &= ~UBIK_RECSENTDB;
533         }
534 #if defined(UBIK_PAUSE)
535         /* it's not possible for UBIK_RECFOUNDDB not to be set here.
536          * However, we might have lost UBIK_RECSYNCSITE, and that
537          * IS important.
538          */
539         if (!(urecovery_state & UBIK_RECSYNCSITE))
540             continue;           /* lost sync */
541 #else
542         if (!(urecovery_state & UBIK_RECFOUNDDB))
543             continue;           /* not ready */
544 #endif /* UBIK_PAUSE */
545
546         /* If we, the sync site, do not have the best db version, then
547          * go and get it from the server that does.
548          */
549         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
550             urecovery_state |= UBIK_RECHAVEDB;
551         } else {
552             /* we don't have the best version; we should fetch it. */
553             DBHOLD(ubik_dbase);
554             urecovery_AbortAll(ubik_dbase);
555
556             /* Rx code to do the Bulk fetch */
557             file = 0;
558             offset = 0;
559             rxcall = rx_NewCall(bestServer->disk_rxcid);
560
561             ubik_print("Ubik: Synchronize database with server %s\n",
562                        afs_inet_ntoa(bestServer->addr[0]));
563
564             code = StartDISK_GetFile(rxcall, file);
565             if (code) {
566                 ubik_dprint("StartDiskGetFile failed=%d\n", code);
567                 goto FetchEndCall;
568             }
569             nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
570             length = ntohl(length);
571             if (nbytes != sizeof(afs_int32)) {
572                 ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR);
573                 code = EIO;
574                 goto FetchEndCall;
575             }
576
577 #ifdef OLD_URECOVERY
578             /* Truncate the file first */
579             code = (*ubik_dbase->truncate) (ubik_dbase, file, 0);
580             if (code) {
581                 ubik_dprint("truncate io error=%d\n", code);
582                 goto FetchEndCall;
583             }
584             tversion.counter = 0;
585 #endif
586             /* give invalid label during file transit */
587             tversion.epoch = 0;
588             code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
589             if (code) {
590                 ubik_dprint("setlabel io error=%d\n", code);
591                 goto FetchEndCall;
592             }
593 #ifndef OLD_URECOVERY
594             flen = length;
595             afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
596             fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
597             if (fd < 0) {
598                 code = errno;
599                 goto FetchEndCall;
600             }
601             code = lseek(fd, HDRSIZE, 0);
602             if (code != HDRSIZE) {
603                 close(fd);
604                 goto FetchEndCall;
605             }
606 #endif
607
608             while (length > 0) {
609                 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
610 #ifndef AFS_PTHREAD_ENV
611                 if (pass % 4 == 0)
612                     IOMGR_Poll();
613 #endif
614                 nbytes = rx_Read(rxcall, tbuffer, tlen);
615                 if (nbytes != tlen) {
616                     ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
617                     code = EIO;
618                     close(fd);
619                     goto FetchEndCall;
620                 }
621 #ifdef OLD_URECOVERY
622                 nbytes =
623                     (*ubik_dbase->write) (ubik_dbase, file, tbuffer, offset,
624                                           tlen);
625 #else
626                 nbytes = write(fd, tbuffer, tlen);
627                 pass++;
628 #endif
629                 if (nbytes != tlen) {
630                     code = UIOERROR;
631                     close(fd);
632                     goto FetchEndCall;
633                 }
634                 offset += tlen;
635                 length -= tlen;
636             }
637 #ifndef OLD_URECOVERY
638             code = close(fd);
639             if (code)
640                 goto FetchEndCall;
641 #endif      
642             code = EndDISK_GetFile(rxcall, &tversion);
643           FetchEndCall:
644             tcode = rx_EndCall(rxcall, code);
645             if (!code)
646                 code = tcode;
647             if (!code) {
648                 /* we got a new file, set up its header */
649                 urecovery_state |= UBIK_RECHAVEDB;
650                 memcpy(&ubik_dbase->version, &tversion,
651                        sizeof(struct ubik_version));
652 #ifdef OLD_URECOVERY
653                 (*ubik_dbase->sync) (ubik_dbase, 0);    /* get data out first */
654 #else
655                 afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
656 #ifdef AFS_NT40_ENV
657                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
658                 code = unlink(pbuffer);
659                 if (!code)
660                     code = rename(tbuffer, pbuffer);
661                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
662 #endif
663                 if (!code) 
664                     code = rename(pbuffer, tbuffer);
665                 if (!code) 
666                     code = (*ubik_dbase->open) (ubik_dbase, 0);
667                 if (!code)
668 #endif
669                 /* after data is good, sync disk with correct label */
670                 code =
671                     (*ubik_dbase->setlabel) (ubik_dbase, 0,
672                                              &ubik_dbase->version);
673 #ifndef OLD_URECOVERY
674 #ifdef AFS_NT40_ENV
675                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
676                 unlink(pbuffer);
677 #endif
678 #endif
679             }
680             if (code) {
681 #ifndef OLD_URECOVERY
682                 unlink(pbuffer);
683                 /* 
684                  * We will effectively invalidate the old data forever now.
685                  * Unclear if we *should* but we do.
686                  */
687 #endif
688                 ubik_dbase->version.epoch = 0;
689                 ubik_dbase->version.counter = 0;
690                 ubik_print("Ubik: Synchronize database failed (error = %d)\n",
691                            code);
692             } else {
693                 ubik_print("Ubik: Synchronize database completed\n");
694                 urecovery_state |= UBIK_RECHAVEDB;
695             }
696             udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
697 #ifdef AFS_PTHREAD_ENV
698             assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
699 #else
700             LWP_NoYieldSignal(&ubik_dbase->version);
701 #endif
702             DBRELE(ubik_dbase);
703         }
704 #if defined(UBIK_PAUSE)
705         if (!(urecovery_state & UBIK_RECSYNCSITE))
706             continue;           /* lost sync */
707 #endif /* UBIK_PAUSE */
708         if (!(urecovery_state & UBIK_RECHAVEDB))
709             continue;           /* not ready */
710
711         /* If the database was newly initialized, then when we establish quorum, write
712          * a new label. This allows urecovery_AllBetter() to allow access for reads.
713          * Setting it to 2 also allows another site to come along with a newer
714          * database and overwrite this one.
715          */
716         if (ubik_dbase->version.epoch == 1) {
717             DBHOLD(ubik_dbase);
718             urecovery_AbortAll(ubik_dbase);
719             ubik_epochTime = 2;
720             ubik_dbase->version.epoch = ubik_epochTime;
721             ubik_dbase->version.counter = 1;
722             code =
723                 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
724             udisk_Invalidate(ubik_dbase, 0);    /* data may have changed */
725 #ifdef AFS_PTHREAD_ENV
726             assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
727 #else
728             LWP_NoYieldSignal(&ubik_dbase->version);
729 #endif
730             DBRELE(ubik_dbase);
731         }
732
733         /* Check the other sites and send the database to them if they
734          * do not have the current db.
735          */
736         if (!(urecovery_state & UBIK_RECSENTDB)) {
737             /* now propagate out new version to everyone else */
738             dbok = 1;           /* start off assuming they all worked */
739
740             DBHOLD(ubik_dbase);
741             /*
742              * Check if a write transaction is in progress. We can't send the
743              * db when a write is in progress here because the db would be
744              * obsolete as soon as it goes there. Also, ops after the begin
745              * trans would reach the recepient and wouldn't find a transaction
746              * pending there.  Frankly, I don't think it's possible to get past
747              * the write-lock above if there is a write transaction in progress,
748              * but then, it won't hurt to check, will it?
749              */
750             if (ubik_dbase->flags & DBWRITING) {
751                 struct timeval tv;
752                 int safety = 0;
753                 tv.tv_sec = 0;
754                 tv.tv_usec = 50000;
755                 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
756                     DBRELE(ubik_dbase);
757                     /* sleep for a little while */
758 #ifdef AFS_PTHREAD_ENV
759                     select(0, 0, 0, 0, &tv);
760 #else
761                     IOMGR_Select(0, 0, 0, 0, &tv);
762 #endif
763                     tv.tv_usec += 10000;
764                     safety++;
765                     DBHOLD(ubik_dbase);
766                 }
767             }
768
769             for (ts = ubik_servers; ts; ts = ts->next) {
770                 inAddr.s_addr = ts->addr[0];
771                 if (!ts->up) {
772                     ubik_dprint("recovery cannot send version to %s\n",
773                                 afs_inet_ntoa(inAddr.s_addr));
774                     dbok = 0;
775                     continue;
776                 }
777                 ubik_dprint("recovery sending version to %s\n",
778                             afs_inet_ntoa(inAddr.s_addr));
779                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
780                     ubik_dprint("recovery stating local database\n");
781
782                     /* Rx code to do the Bulk Store */
783                     code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
784                     if (!code) {
785                         length = ubikstat.size;
786                         file = offset = 0;
787                         rxcall = rx_NewCall(ts->disk_rxcid);
788                         code =
789                             StartDISK_SendFile(rxcall, file, length,
790                                                &ubik_dbase->version);
791                         if (code) {
792                             ubik_dprint("StartDiskSendFile failed=%d\n",
793                                         code);
794                             goto StoreEndCall;
795                         }
796                         while (length > 0) {
797                             tlen =
798                                 (length >
799                                  sizeof(tbuffer) ? sizeof(tbuffer) : length);
800                             nbytes =
801                                 (*ubik_dbase->read) (ubik_dbase, file,
802                                                      tbuffer, offset, tlen);
803                             if (nbytes != tlen) {
804                                 ubik_dprint("Local disk read error=%d\n",
805                                             code = UIOERROR);
806                                 goto StoreEndCall;
807                             }
808                             nbytes = rx_Write(rxcall, tbuffer, tlen);
809                             if (nbytes != tlen) {
810                                 ubik_dprint("Rx-write bulk error=%d\n", code =
811                                             BULK_ERROR);
812                                 goto StoreEndCall;
813                             }
814                             offset += tlen;
815                             length -= tlen;
816                         }
817                         code = EndDISK_SendFile(rxcall);
818                       StoreEndCall:
819                         code = rx_EndCall(rxcall, code);
820                     }
821                     if (code == 0) {
822                         /* we set a new file, process its header */
823                         ts->version = ubik_dbase->version;
824                         ts->currentDB = 1;
825                     } else
826                         dbok = 0;
827                 } else {
828                     /* mark file up to date */
829                     ts->currentDB = 1;
830                 }
831             }
832             DBRELE(ubik_dbase);
833             if (dbok)
834                 urecovery_state |= UBIK_RECSENTDB;
835         }
836     }
837     return NULL;
838 }
839
840 /*
841 ** send a Probe to all the network address of this server 
842 ** Return 0  if success, else return 1
843 */
844 int
845 DoProbe(struct ubik_server *server)
846 {
847     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
848     struct rx_connection *connSuccess = 0;
849     int i, j;
850     afs_uint32 addr;
851     char buffer[32];
852     extern afs_int32 ubikSecIndex;
853     extern struct rx_securityClass *ubikSecClass;
854
855     for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR);
856          i++) {
857         conns[i] =
858             rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
859                              ubikSecClass, ubikSecIndex);
860
861         /* user requirement to use only the primary interface */
862         if (ubikPrimaryAddrOnly) {
863             i = 1;
864             break;
865         }
866     }
867     assert(i);                  /* at least one interface address for this server */
868
869     multi_Rx(conns, i) {
870         multi_DISK_Probe();
871         if (!multi_error) {     /* first success */
872             addr = server->addr[multi_i];       /* successful interface addr */
873
874             if (server->disk_rxcid)     /* destroy existing conn */
875                 rx_DestroyConnection(server->disk_rxcid);
876             if (server->vote_rxcid)
877                 rx_DestroyConnection(server->vote_rxcid);
878
879             /* make new connections */
880             server->disk_rxcid = conns[multi_i];
881             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal, VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);  /* for vote reqs */
882
883             connSuccess = conns[multi_i];
884             strcpy(buffer, (char *)afs_inet_ntoa(server->addr[0]));
885             ubik_print
886                 ("ubik:server %s is back up: will be contacted through %s\n",
887                  buffer, afs_inet_ntoa(addr));
888
889             multi_Abort;
890         }
891     } multi_End_Ignore;
892
893     /* Destroy all connections except the one on which we succeeded */
894     for (j = 0; j < i; j++)
895         if (conns[j] != connSuccess)
896             rx_DestroyConnection(conns[j]);
897
898     if (!connSuccess)
899         ubik_dprint("ubik:server %s still down\n",
900                     afs_inet_ntoa(server->addr[0]));
901
902     if (connSuccess)
903         return 0;               /* success */
904     else
905         return 1;               /* failure */
906 }