01f279ecdd7070f9567b417b3e414590e2023a44
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13
14 #include <sys/types.h>
15 #include <string.h>
16 #include <stdarg.h>
17 #include <errno.h>
18 #include <assert.h>
19
20 #ifdef AFS_NT40_ENV
21 #include <winsock2.h>
22 #include <time.h>
23 #include <fcntl.h>
24 #else
25 #include <sys/file.h>
26 #include <netinet/in.h>
27 #include <sys/time.h>
28 #endif
29
30 #include <lock.h>
31 #include <rx/xdr.h>
32 #include <rx/rx.h>
33 #include <afs/afsutil.h>
34
35 #define UBIK_INTERNALS
36 #include "ubik.h"
37 #include "ubik_int.h"
38
39 /*! \file
40  * This module is responsible for determining when the system has
41  * recovered to the point that it can handle new transactions.  It
42  * replays logs, polls to determine the current dbase after a crash,
43  * and distributes the new database to the others.
44  *
45  * The sync site associates a version number with each database.  It
46  * broadcasts the version associated with its current dbase in every
47  * one of its beacon messages.  When the sync site send a dbase to a
48  * server, it also sends the db's version.  A non-sync site server can
49  * tell if it has the right dbase version by simply comparing the
50  * version from the beacon message \p uvote_dbVersion with the version
51  * associated with the database \p ubik_dbase->version.  The sync site
52  * itself simply has one counter to keep track of all of this (again
53  * \p ubik_dbase->version).
54  *
55  * sync site: routine called when the sync site loses its quorum; this
56  * procedure is called "up" from the beacon package.  It resyncs the
57  * dbase and nudges the recovery daemon to try to propagate out the
58  * changes.  It also resets the recovery daemon's state, since
59  * recovery must potentially find a new dbase to propagate out.  This
60  * routine should not do anything with variables used by non-sync site
61  * servers.
62  */
63
64 /*!
65  * if this flag is set, then ubik will use only the primary address 
66  * (the address specified in the CellServDB) to contact other 
67  * ubik servers. Ubik recovery will not try opening connections 
68  * to the alternate interface addresses. 
69  */
70 int ubikPrimaryAddrOnly;
71
72 int
73 urecovery_ResetState(void)
74 {
75     urecovery_state = 0;
76 #if !defined(AFS_PTHREAD_ENV)
77     /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
78     LWP_NoYieldSignal(&urecovery_state);
79 #endif
80     return 0;
81 }
82
83 /*!
84  * \brief sync site
85  *
86  * routine called when a non-sync site server goes down; restarts recovery
87  * process to send missing server the new db when it comes back up.
88  *
89  * \note This routine should not do anything with variables used by non-sync site servers. 
90  */
91 int
92 urecovery_LostServer(void)
93 {
94 #if !defined(AFS_PTHREAD_ENV)
95     /*  No corresponding LWP_WaitProcess found anywhere for this -- klm */
96     LWP_NoYieldSignal(&urecovery_state);
97 #endif
98     return 0;
99 }
100
101 /*!
102  * return true iff we have a current database (called by both sync
103  * sites and non-sync sites) How do we determine this?  If we're the
104  * sync site, we wait until recovery has finished fetching and
105  * re-labelling its dbase (it may still be trying to propagate it out
106  * to everyone else; that's THEIR problem).  If we're not the sync
107  * site, then we must have a dbase labelled with the right version,
108  * and we must have a currently-good sync site.
109  */
110 int
111 urecovery_AllBetter(register struct ubik_dbase *adbase, int areadAny)
112 {
113     register afs_int32 rcode;
114
115     ubik_dprint_25("allbetter checking\n");
116     rcode = 0;
117
118
119     if (areadAny) {
120         if (ubik_dbase->version.epoch > 1)
121             rcode = 1;          /* Happy with any good version of database */
122     }
123
124     /* Check if we're sync site and we've got the right data */
125     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
126         rcode = 1;
127     }
128
129     /* next, check if we're aux site, and we've ever been sent the
130      * right data (note that if a dbase update fails, we won't think
131      * that the sync site is still the sync site, 'cause it won't talk
132      * to us until a timeout period has gone by.  When we recover, we
133      * leave this clear until we get a new dbase */
134     else if ((uvote_GetSyncSite() && (vcmp(ubik_dbVersion, ubik_dbase->version) == 0))) {       /* && order is important */
135         rcode = 1;
136     }
137
138     ubik_dprint_25("allbetter: returning %d\n", rcode);
139     return rcode;
140 }
141
142 /*!
143  * \brief abort all transactions on this database
144  */
145 int
146 urecovery_AbortAll(struct ubik_dbase *adbase)
147 {
148     register struct ubik_trans *tt;
149     for (tt = adbase->activeTrans; tt; tt = tt->next) {
150         udisk_abort(tt);
151     }
152     return 0;
153 }
154
155 /*!
156  * \brief this routine aborts the current remote transaction, if any, if the tid is wrong
157  */
158 int
159 urecovery_CheckTid(register struct ubik_tid *atid)
160 {
161     if (ubik_currentTrans) {
162         /* there is remote write trans, see if we match, see if this
163          * is a new transaction */
164         if (atid->epoch != ubik_currentTrans->tid.epoch
165             || atid->counter > ubik_currentTrans->tid.counter) {
166             /* don't match, abort it */
167             /* If the thread is not waiting for lock - ok to end it */
168 #if !defined(UBIK_PAUSE)
169             if (ubik_currentTrans->locktype != LOCKWAIT) {
170 #endif /* UBIK_PAUSE */
171                 udisk_end(ubik_currentTrans);
172 #if !defined(UBIK_PAUSE)
173             }
174 #endif /* UBIK_PAUSE */
175             ubik_currentTrans = (struct ubik_trans *)0;
176         }
177     }
178     return 0;
179 }
180
181 /*!
182  * \brief replay logs
183  *
184  * log format is defined here, and implicitly in disk.c
185  *
186  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
187  * are in logged in network standard byte order, in case we want to move logs
188  * from machine-to-machine someday.
189  *
190  * Begin transaction: opcode \n
191  * Commit transaction: opcode, version (8 bytes) \n
192  * Truncate file: opcode, file number, length \n
193  * Abort transaction: opcode \n
194  * Write data: opcode, file, position, length, <length> data bytes \n
195  *
196  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
197  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
198  * the commit record, then we allow data to be written through to the dbase.  In our particular
199  * implementation, once a transaction is done, we write out the pages to the database, so that
200  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
201  * any changed data while there is an uncommitted write transaction can be zapped during an
202  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
203  * the log.
204  */
205 static int
206 ReplayLog(register struct ubik_dbase *adbase)
207 {
208     afs_int32 opcode;
209     register afs_int32 code, tpos;
210     int logIsGood;
211     afs_int32 len, thisSize, tfile, filePos;
212     afs_int32 buffer[4];
213     afs_int32 syncFile = -1;
214     afs_int32 data[1024];
215
216     /* read the lock twice, once to see whether we have a transaction to deal
217      * with that committed, (theoretically, we should support more than one
218      * trans in the log at once, but not yet), and once replaying the
219      * transactions.  */
220     tpos = 0;
221     logIsGood = 0;
222     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
223     while (1) {
224         code =
225             (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
226                              sizeof(afs_int32));
227         if (code != sizeof(afs_int32))
228             break;
229         if (opcode == LOGNEW) {
230             /* handle begin trans */
231             tpos += sizeof(afs_int32);
232         } else if (opcode == LOGABORT)
233             break;
234         else if (opcode == LOGEND) {
235             logIsGood = 1;
236             break;
237         } else if (opcode == LOGTRUNCATE) {
238             tpos += 4;
239             code =
240                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
241                                  2 * sizeof(afs_int32));
242             if (code != 2 * sizeof(afs_int32))
243                 break;          /* premature eof or io error */
244             tpos += 2 * sizeof(afs_int32);
245         } else if (opcode == LOGDATA) {
246             tpos += 4;
247             code =
248                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
249                                  3 * sizeof(afs_int32));
250             if (code != 3 * sizeof(afs_int32))
251                 break;
252             /* otherwise, skip over the data bytes, too */
253             tpos += buffer[2] + 3 * sizeof(afs_int32);
254         } else {
255             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode,
256                         tpos);
257             break;              /* corrupt log! */
258         }
259     }
260     if (logIsGood) {
261         /* actually do the replay; log should go all the way through the commit record, since
262          * we just read it above. */
263         tpos = 0;
264         logIsGood = 0;
265         syncFile = -1;
266         while (1) {
267             code =
268                 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
269                                  sizeof(afs_int32));
270             if (code != sizeof(afs_int32))
271                 break;
272             if (opcode == LOGNEW) {
273                 /* handle begin trans */
274                 tpos += sizeof(afs_int32);
275             } else if (opcode == LOGABORT)
276                 panic("log abort\n");
277             else if (opcode == LOGEND) {
278                 tpos += 4;
279                 code =
280                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
281                                      2 * sizeof(afs_int32));
282                 if (code != 2 * sizeof(afs_int32))
283                     return UBADLOG;
284                 code = (*adbase->setlabel) (adbase, 0, (ubik_version *)buffer);
285                 if (code)
286                     return code;
287                 logIsGood = 1;
288                 break;          /* all done now */
289             } else if (opcode == LOGTRUNCATE) {
290                 tpos += 4;
291                 code =
292                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
293                                      2 * sizeof(afs_int32));
294                 if (code != 2 * sizeof(afs_int32))
295                     break;      /* premature eof or io error */
296                 tpos += 2 * sizeof(afs_int32);
297                 code =
298                     (*adbase->truncate) (adbase, ntohl(buffer[0]),
299                                          ntohl(buffer[1]));
300                 if (code)
301                     return code;
302             } else if (opcode == LOGDATA) {
303                 tpos += 4;
304                 code =
305                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
306                                      3 * sizeof(afs_int32));
307                 if (code != 3 * sizeof(afs_int32))
308                     break;
309                 tpos += 3 * sizeof(afs_int32);
310                 /* otherwise, skip over the data bytes, too */
311                 len = ntohl(buffer[2]); /* total number of bytes to copy */
312                 filePos = ntohl(buffer[1]);
313                 tfile = ntohl(buffer[0]);
314                 /* try to minimize file syncs */
315                 if (syncFile != tfile) {
316                     if (syncFile >= 0)
317                         code = (*adbase->sync) (adbase, syncFile);
318                     else
319                         code = 0;
320                     syncFile = tfile;
321                     if (code)
322                         return code;
323                 }
324                 while (len > 0) {
325                     thisSize = (len > sizeof(data) ? sizeof(data) : len);
326                     /* copy sizeof(data) buffer bytes at a time */
327                     code =
328                         (*adbase->read) (adbase, LOGFILE, (char *)data, tpos,
329                                          thisSize);
330                     if (code != thisSize)
331                         return UBADLOG;
332                     code =
333                         (*adbase->write) (adbase, tfile, (char *)data, filePos,
334                                           thisSize);
335                     if (code != thisSize)
336                         return UBADLOG;
337                     filePos += thisSize;
338                     tpos += thisSize;
339                     len -= thisSize;
340                 }
341             } else {
342                 ubik_dprint("corrupt log opcode (%d) at position %d\n",
343                             opcode, tpos);
344                 break;          /* corrupt log! */
345             }
346         }
347         if (logIsGood) {
348             if (syncFile >= 0)
349                 code = (*adbase->sync) (adbase, syncFile);
350             if (code)
351                 return code;
352         } else {
353             ubik_dprint("Log read error on pass 2\n");
354             return UBADLOG;
355         }
356     }
357
358     /* now truncate the log, we're done with it */
359     code = (*adbase->truncate) (adbase, LOGFILE, 0);
360     return code;
361 }
362
363 /*! \brief
364  * Called at initialization to figure out version of the dbase we really have.
365  *
366  * This routine is called after replaying the log; it reads the restored labels.
367  */
368 static int
369 InitializeDB(register struct ubik_dbase *adbase)
370 {
371     register afs_int32 code;
372
373     code = (*adbase->getlabel) (adbase, 0, &adbase->version);
374     if (code) {
375         /* try setting the label to a new value */
376         adbase->version.epoch = 1;      /* value for newly-initialized db */
377         adbase->version.counter = 1;
378         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
379         if (code) {
380             /* failed, try to set it back */
381             adbase->version.epoch = 0;
382             adbase->version.counter = 0;
383             (*adbase->setlabel) (adbase, 0, &adbase->version);
384         }
385 #ifdef AFS_PTHREAD_ENV
386         assert(pthread_cond_broadcast(&adbase->version_cond) == 0);
387 #else
388         LWP_NoYieldSignal(&adbase->version);
389 #endif
390     }
391     return 0;
392 }
393
394 /*!
395  * \brief initialize the local ubik_dbase
396  *
397  * We replay the logs and then read the resulting file to figure out what version we've really got.
398  */
399 int
400 urecovery_Initialize(register struct ubik_dbase *adbase)
401 {
402     register afs_int32 code;
403
404     code = ReplayLog(adbase);
405     if (code)
406         return code;
407     code = InitializeDB(adbase);
408     return code;
409 }
410
411 /*!
412  * \brief Main interaction loop for the recovery manager
413  *
414  * The recovery light-weight process only runs when you're the
415  * synchronization site.  It performs the following tasks, if and only
416  * if the prerequisite tasks have been performed successfully (it
417  * keeps track of which ones have been performed in its bit map,
418  * \p urecovery_state).
419  *
420  * First, it is responsible for probing that all servers are up.  This
421  * is the only operation that must be performed even if this is not
422  * yet the sync site, since otherwise this site may not notice that
423  * enough other machines are running to even elect this guy to be the
424  * sync site.
425  *
426  * After that, the recovery process does nothing until the beacon and
427  * voting modules manage to get this site elected sync site.
428  *
429  * After becoming sync site, recovery first attempts to find the best
430  * database available in the network (it must do this in order to
431  * ensure finding the latest committed data).  After finding the right
432  * database, it must fetch this dbase to the sync site.
433  *
434  * After fetching the dbase, it relabels it with a new version number,
435  * to ensure that everyone recognizes this dbase as the most recent
436  * dbase.
437  *
438  * One the dbase has been relabelled, this machine can start handling
439  * requests.  However, the recovery module still has one more task:
440  * propagating the dbase out to everyone who is up in the network.
441  */
442 void *
443 urecovery_Interact(void *dummy)
444 {
445     afs_int32 code, tcode;
446     struct ubik_server *bestServer = NULL;
447     struct ubik_server *ts;
448     int dbok, doingRPC, now;
449     afs_int32 lastProbeTime, lastDBVCheck;
450     /* if we're the sync site, the best db version we've found yet */
451     static struct ubik_version bestDBVersion;
452     struct ubik_version tversion;
453     struct timeval tv;
454     int length, tlen, offset, file, nbytes;
455     struct rx_call *rxcall;
456     char tbuffer[1024];
457     struct ubik_stat ubikstat;
458     struct in_addr inAddr;
459 #ifndef OLD_URECOVERY
460     char pbuffer[1028];
461     int flen, fd = -1;
462     afs_int32 pass;
463 #endif
464
465     /* otherwise, begin interaction */
466     urecovery_state = 0;
467     lastProbeTime = 0;
468     lastDBVCheck = 0;
469     while (1) {
470         /* Run through this loop every 4 seconds */
471         tv.tv_sec = 4;
472         tv.tv_usec = 0;
473 #ifdef AFS_PTHREAD_ENV
474         select(0, 0, 0, 0, &tv);
475 #else
476         IOMGR_Select(0, 0, 0, 0, &tv);
477 #endif
478
479         ubik_dprint("recovery running in state %x\n", urecovery_state);
480
481         /* Every 30 seconds, check all the down servers and mark them
482          * as up if they respond. When a server comes up or found to
483          * not be current, then re-find the the best database and 
484          * propogate it.
485          */
486         if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
487             for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
488                 if (!ts->up) {
489                     doingRPC = 1;
490                     code = DoProbe(ts);
491                     if (code == 0) {
492                         ts->up = 1;
493                         urecovery_state &= ~UBIK_RECFOUNDDB;
494                     }
495                 } else if (!ts->currentDB) {
496                     urecovery_state &= ~UBIK_RECFOUNDDB;
497                 }
498             }
499             if (doingRPC)
500                 now = FT_ApproxTime();
501             lastProbeTime = now;
502         }
503
504         /* Mark whether we are the sync site */
505         if (!ubeacon_AmSyncSite()) {
506             urecovery_state &= ~UBIK_RECSYNCSITE;
507             continue;           /* nothing to do */
508         }
509         urecovery_state |= UBIK_RECSYNCSITE;
510
511         /* If a server has just come up or if we have not found the 
512          * most current database, then go find the most current db.
513          */
514         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
515             bestServer = (struct ubik_server *)0;
516             bestDBVersion.epoch = 0;
517             bestDBVersion.counter = 0;
518             for (ts = ubik_servers; ts; ts = ts->next) {
519                 if (!ts->up)
520                     continue;   /* don't bother with these guys */
521                 if (ts->isClone)
522                     continue;
523                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
524                 if (code == 0) {
525                     /* perhaps this is the best version */
526                     if (vcmp(ts->version, bestDBVersion) > 0) {
527                         /* new best version */
528                         bestDBVersion = ts->version;
529                         bestServer = ts;
530                     }
531                 }
532             }
533             /* take into consideration our version. Remember if we,
534              * the sync site, have the best version. Also note that
535              * we may need to send the best version out.
536              */
537             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
538                 bestDBVersion = ubik_dbase->version;
539                 bestServer = (struct ubik_server *)0;
540                 urecovery_state |= UBIK_RECHAVEDB;
541             } else {
542                 /* Clear the flag only when we know we have to retrieve
543                  * the db. Because urecovery_AllBetter() looks at it.
544                  */
545                 urecovery_state &= ~UBIK_RECHAVEDB;
546             }
547             lastDBVCheck = FT_ApproxTime();
548             urecovery_state |= UBIK_RECFOUNDDB;
549             urecovery_state &= ~UBIK_RECSENTDB;
550         }
551 #if defined(UBIK_PAUSE)
552         /* it's not possible for UBIK_RECFOUNDDB not to be set here.
553          * However, we might have lost UBIK_RECSYNCSITE, and that
554          * IS important.
555          */
556         if (!(urecovery_state & UBIK_RECSYNCSITE))
557             continue;           /* lost sync */
558 #else
559         if (!(urecovery_state & UBIK_RECFOUNDDB))
560             continue;           /* not ready */
561 #endif /* UBIK_PAUSE */
562
563         /* If we, the sync site, do not have the best db version, then
564          * go and get it from the server that does.
565          */
566         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
567             urecovery_state |= UBIK_RECHAVEDB;
568         } else {
569             /* we don't have the best version; we should fetch it. */
570             DBHOLD(ubik_dbase);
571             urecovery_AbortAll(ubik_dbase);
572
573             /* Rx code to do the Bulk fetch */
574             file = 0;
575             offset = 0;
576             rxcall = rx_NewCall(bestServer->disk_rxcid);
577
578             ubik_print("Ubik: Synchronize database with server %s\n",
579                        afs_inet_ntoa(bestServer->addr[0]));
580
581             code = StartDISK_GetFile(rxcall, file);
582             if (code) {
583                 ubik_dprint("StartDiskGetFile failed=%d\n", code);
584                 goto FetchEndCall;
585             }
586             nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
587             length = ntohl(length);
588             if (nbytes != sizeof(afs_int32)) {
589                 ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR);
590                 code = EIO;
591                 goto FetchEndCall;
592             }
593
594 #ifdef OLD_URECOVERY
595             /* Truncate the file first */
596             code = (*ubik_dbase->truncate) (ubik_dbase, file, 0);
597             if (code) {
598                 ubik_dprint("truncate io error=%d\n", code);
599                 goto FetchEndCall;
600             }
601             tversion.counter = 0;
602 #endif
603             /* give invalid label during file transit */
604             tversion.epoch = 0;
605             code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
606             if (code) {
607                 ubik_dprint("setlabel io error=%d\n", code);
608                 goto FetchEndCall;
609             }
610 #ifndef OLD_URECOVERY
611             flen = length;
612             afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
613             fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
614             if (fd < 0) {
615                 code = errno;
616                 goto FetchEndCall;
617             }
618             code = lseek(fd, HDRSIZE, 0);
619             if (code != HDRSIZE) {
620                 close(fd);
621                 goto FetchEndCall;
622             }
623 #endif
624
625             pass = 0;
626             while (length > 0) {
627                 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
628 #ifndef AFS_PTHREAD_ENV
629                 if (pass % 4 == 0)
630                     IOMGR_Poll();
631 #endif
632                 nbytes = rx_Read(rxcall, tbuffer, tlen);
633                 if (nbytes != tlen) {
634                     ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
635                     code = EIO;
636                     close(fd);
637                     goto FetchEndCall;
638                 }
639 #ifdef OLD_URECOVERY
640                 nbytes =
641                     (*ubik_dbase->write) (ubik_dbase, file, tbuffer, offset,
642                                           tlen);
643 #else
644                 nbytes = write(fd, tbuffer, tlen);
645                 pass++;
646 #endif
647                 if (nbytes != tlen) {
648                     code = UIOERROR;
649                     close(fd);
650                     goto FetchEndCall;
651                 }
652                 offset += tlen;
653                 length -= tlen;
654             }
655 #ifndef OLD_URECOVERY
656             code = close(fd);
657             if (code)
658                 goto FetchEndCall;
659 #endif      
660             code = EndDISK_GetFile(rxcall, &tversion);
661           FetchEndCall:
662             tcode = rx_EndCall(rxcall, code);
663             if (!code)
664                 code = tcode;
665             if (!code) {
666                 /* we got a new file, set up its header */
667                 urecovery_state |= UBIK_RECHAVEDB;
668                 memcpy(&ubik_dbase->version, &tversion,
669                        sizeof(struct ubik_version));
670 #ifdef OLD_URECOVERY
671                 (*ubik_dbase->sync) (ubik_dbase, 0);    /* get data out first */
672 #else
673                 afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
674 #ifdef AFS_NT40_ENV
675                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
676                 code = unlink(pbuffer);
677                 if (!code)
678                     code = rename(tbuffer, pbuffer);
679                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
680 #endif
681                 if (!code) 
682                     code = rename(pbuffer, tbuffer);
683                 if (!code) {
684                     (*ubik_dbase->open) (ubik_dbase, 0);
685 #endif
686                     /* after data is good, sync disk with correct label */
687                     code =
688                         (*ubik_dbase->setlabel) (ubik_dbase, 0,
689                                                  &ubik_dbase->version);
690 #ifndef OLD_URECOVERY
691                 }
692 #ifdef AFS_NT40_ENV
693                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
694                 unlink(pbuffer);
695 #endif
696 #endif
697             }
698             if (code) {
699 #ifndef OLD_URECOVERY
700                 unlink(pbuffer);
701                 /* 
702                  * We will effectively invalidate the old data forever now.
703                  * Unclear if we *should* but we do.
704                  */
705 #endif
706                 ubik_dbase->version.epoch = 0;
707                 ubik_dbase->version.counter = 0;
708                 ubik_print("Ubik: Synchronize database failed (error = %d)\n",
709                            code);
710             } else {
711                 ubik_print("Ubik: Synchronize database completed\n");
712                 urecovery_state |= UBIK_RECHAVEDB;
713             }
714             udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
715 #ifdef AFS_PTHREAD_ENV
716             assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
717 #else
718             LWP_NoYieldSignal(&ubik_dbase->version);
719 #endif
720             DBRELE(ubik_dbase);
721         }
722 #if defined(UBIK_PAUSE)
723         if (!(urecovery_state & UBIK_RECSYNCSITE))
724             continue;           /* lost sync */
725 #endif /* UBIK_PAUSE */
726         if (!(urecovery_state & UBIK_RECHAVEDB))
727             continue;           /* not ready */
728
729         /* If the database was newly initialized, then when we establish quorum, write
730          * a new label. This allows urecovery_AllBetter() to allow access for reads.
731          * Setting it to 2 also allows another site to come along with a newer
732          * database and overwrite this one.
733          */
734         if (ubik_dbase->version.epoch == 1) {
735             DBHOLD(ubik_dbase);
736             urecovery_AbortAll(ubik_dbase);
737             ubik_epochTime = 2;
738             ubik_dbase->version.epoch = ubik_epochTime;
739             ubik_dbase->version.counter = 1;
740             code =
741                 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
742             udisk_Invalidate(ubik_dbase, 0);    /* data may have changed */
743 #ifdef AFS_PTHREAD_ENV
744             assert(pthread_cond_broadcast(&ubik_dbase->version_cond) == 0);
745 #else
746             LWP_NoYieldSignal(&ubik_dbase->version);
747 #endif
748             DBRELE(ubik_dbase);
749         }
750
751         /* Check the other sites and send the database to them if they
752          * do not have the current db.
753          */
754         if (!(urecovery_state & UBIK_RECSENTDB)) {
755             /* now propagate out new version to everyone else */
756             dbok = 1;           /* start off assuming they all worked */
757
758             DBHOLD(ubik_dbase);
759             /*
760              * Check if a write transaction is in progress. We can't send the
761              * db when a write is in progress here because the db would be
762              * obsolete as soon as it goes there. Also, ops after the begin
763              * trans would reach the recepient and wouldn't find a transaction
764              * pending there.  Frankly, I don't think it's possible to get past
765              * the write-lock above if there is a write transaction in progress,
766              * but then, it won't hurt to check, will it?
767              */
768             if (ubik_dbase->flags & DBWRITING) {
769                 struct timeval tv;
770                 int safety = 0;
771                 tv.tv_sec = 0;
772                 tv.tv_usec = 50000;
773                 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
774                     DBRELE(ubik_dbase);
775                     /* sleep for a little while */
776 #ifdef AFS_PTHREAD_ENV
777                     select(0, 0, 0, 0, &tv);
778 #else
779                     IOMGR_Select(0, 0, 0, 0, &tv);
780 #endif
781                     tv.tv_usec += 10000;
782                     safety++;
783                     DBHOLD(ubik_dbase);
784                 }
785             }
786
787             for (ts = ubik_servers; ts; ts = ts->next) {
788                 inAddr.s_addr = ts->addr[0];
789                 if (!ts->up) {
790                     ubik_dprint("recovery cannot send version to %s\n",
791                                 afs_inet_ntoa(inAddr.s_addr));
792                     dbok = 0;
793                     continue;
794                 }
795                 ubik_dprint("recovery sending version to %s\n",
796                             afs_inet_ntoa(inAddr.s_addr));
797                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
798                     ubik_dprint("recovery stating local database\n");
799
800                     /* Rx code to do the Bulk Store */
801                     code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
802                     if (!code) {
803                         length = ubikstat.size;
804                         file = offset = 0;
805                         rxcall = rx_NewCall(ts->disk_rxcid);
806                         code =
807                             StartDISK_SendFile(rxcall, file, length,
808                                                &ubik_dbase->version);
809                         if (code) {
810                             ubik_dprint("StartDiskSendFile failed=%d\n",
811                                         code);
812                             goto StoreEndCall;
813                         }
814                         while (length > 0) {
815                             tlen =
816                                 (length >
817                                  sizeof(tbuffer) ? sizeof(tbuffer) : length);
818                             nbytes =
819                                 (*ubik_dbase->read) (ubik_dbase, file,
820                                                      tbuffer, offset, tlen);
821                             if (nbytes != tlen) {
822                                 ubik_dprint("Local disk read error=%d\n",
823                                             code = UIOERROR);
824                                 goto StoreEndCall;
825                             }
826                             nbytes = rx_Write(rxcall, tbuffer, tlen);
827                             if (nbytes != tlen) {
828                                 ubik_dprint("Rx-write bulk error=%d\n", code =
829                                             BULK_ERROR);
830                                 goto StoreEndCall;
831                             }
832                             offset += tlen;
833                             length -= tlen;
834                         }
835                         code = EndDISK_SendFile(rxcall);
836                       StoreEndCall:
837                         code = rx_EndCall(rxcall, code);
838                     }
839                     if (code == 0) {
840                         /* we set a new file, process its header */
841                         ts->version = ubik_dbase->version;
842                         ts->currentDB = 1;
843                     } else
844                         dbok = 0;
845                 } else {
846                     /* mark file up to date */
847                     ts->currentDB = 1;
848                 }
849             }
850             DBRELE(ubik_dbase);
851             if (dbok)
852                 urecovery_state |= UBIK_RECSENTDB;
853         }
854     }
855     return NULL;
856 }
857
858 /*!
859  * \brief send a Probe to all the network address of this server 
860  * 
861  * \return 0 if success, else return 1
862  */
863 int
864 DoProbe(struct ubik_server *server)
865 {
866     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
867     struct rx_connection *connSuccess = 0;
868     int i, j;
869     afs_uint32 addr;
870     char buffer[32];
871     extern afs_int32 ubikSecIndex;
872     extern struct rx_securityClass *ubikSecClass;
873
874     for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR);
875          i++) {
876         conns[i] =
877             rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
878                              ubikSecClass, ubikSecIndex);
879
880         /* user requirement to use only the primary interface */
881         if (ubikPrimaryAddrOnly) {
882             i = 1;
883             break;
884         }
885     }
886     assert(i);                  /* at least one interface address for this server */
887
888     multi_Rx(conns, i) {
889         multi_DISK_Probe();
890         if (!multi_error) {     /* first success */
891             addr = server->addr[multi_i];       /* successful interface addr */
892
893             if (server->disk_rxcid)     /* destroy existing conn */
894                 rx_DestroyConnection(server->disk_rxcid);
895             if (server->vote_rxcid)
896                 rx_DestroyConnection(server->vote_rxcid);
897
898             /* make new connections */
899             server->disk_rxcid = conns[multi_i];
900             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal, VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);  /* for vote reqs */
901
902             connSuccess = conns[multi_i];
903             strcpy(buffer, (char *)afs_inet_ntoa(server->addr[0]));
904             ubik_print
905                 ("ubik:server %s is back up: will be contacted through %s\n",
906                  buffer, afs_inet_ntoa(addr));
907
908             multi_Abort;
909         }
910     } multi_End_Ignore;
911
912     /* Destroy all connections except the one on which we succeeded */
913     for (j = 0; j < i; j++)
914         if (conns[j] != connSuccess)
915             rx_DestroyConnection(conns[j]);
916
917     if (!connSuccess)
918         ubik_dprint("ubik:server %s still down\n",
919                     afs_inet_ntoa(server->addr[0]));
920
921     if (connSuccess)
922         return 0;               /* success */
923     else
924         return 1;               /* failure */
925 }