fc8a2bf98b69b52314a4df4c7036546a521cf034
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #include <time.h>
20 #include <fcntl.h>
21 #else
22 #include <sys/file.h>
23 #include <netinet/in.h>
24 #include <sys/time.h>
25 #endif
26 #include <assert.h>
27 #include <lock.h>
28 #include <string.h>
29 #include <rx/xdr.h>
30 #include <rx/rx.h>
31 #include <errno.h>
32 #include <afs/afsutil.h>
33
34 #define UBIK_INTERNALS
35 #include "ubik.h"
36 #include "ubik_int.h"
37
38 /* This module is responsible for determining when the system has
39  * recovered to the point that it can handle new transactions.  It
40  * replays logs, polls to determine the current dbase after a crash,
41  * and distributes the new database to the others.
42  */
43
44 /* The sync site associates a version number with each database.  It
45  * broadcasts the version associated with its current dbase in every
46  * one of its beacon messages.  When the sync site send a dbase to a
47  * server, it also sends the db's version.  A non-sync site server can
48  * tell if it has the right dbase version by simply comparing the
49  * version from the beacon message (uvote_dbVersion) with the version
50  * associated with the database (ubik_dbase->version).  The sync site
51  * itself simply has one counter to keep track of all of this (again
52  * ubik_dbase->version).
53  */
54
55 /* sync site: routine called when the sync site loses its quorum; this
56  * procedure is called "up" from the beacon package.  It resyncs the
57  * dbase and nudges the recovery daemon to try to propagate out the
58  * changes.  It also resets the recovery daemon's state, since
59  * recovery must potentially find a new dbase to propagate out.  This
60  * routine should not do anything with variables used by non-sync site
61  * servers.
62  */
63
64 /* if this flag is set, then ubik will use only the primary address 
65 ** ( the address specified in the CellServDB) to contact other 
66 ** ubik servers. Ubik recovery will not try opening connections 
67 ** to the alternate interface addresses. 
68 */
69 int ubikPrimaryAddrOnly;
70
71 int
72 urecovery_ResetState(void)
73 {
74     urecovery_state = 0;
75     LWP_NoYieldSignal(&urecovery_state);
76     return 0;
77 }
78
79 /* sync site: routine called when a non-sync site server goes down; restarts recovery
80  * process to send missing server the new db when it comes back up.
81  * This routine should not do anything with variables used by non-sync site servers. 
82  */
83 int
84 urecovery_LostServer(void)
85 {
86     LWP_NoYieldSignal(&urecovery_state);
87     return 0;
88 }
89
90 /* return true iff we have a current database (called by both sync
91  * sites and non-sync sites) How do we determine this?  If we're the
92  * sync site, we wait until recovery has finished fetching and
93  * re-labelling its dbase (it may still be trying to propagate it out
94  * to everyone else; that's THEIR problem).  If we're not the sync
95  * site, then we must have a dbase labelled with the right version,
96  * and we must have a currently-good sync site.
97  */
98 int
99 urecovery_AllBetter(register struct ubik_dbase *adbase, int areadAny)
100 {
101     register afs_int32 rcode;
102
103     ubik_dprint("allbetter checking\n");
104     rcode = 0;
105
106
107     if (areadAny) {
108         if (ubik_dbase->version.epoch > 1)
109             rcode = 1;          /* Happy with any good version of database */
110     }
111
112     /* Check if we're sync site and we've got the right data */
113     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
114         rcode = 1;
115     }
116
117     /* next, check if we're aux site, and we've ever been sent the
118      * right data (note that if a dbase update fails, we won't think
119      * that the sync site is still the sync site, 'cause it won't talk
120      * to us until a timeout period has gone by.  When we recover, we
121      * leave this clear until we get a new dbase */
122     else if ((uvote_GetSyncSite() && (vcmp(ubik_dbVersion, ubik_dbase->version) == 0))) {       /* && order is important */
123         rcode = 1;
124     }
125
126     ubik_dprint("allbetter: returning %d\n", rcode);
127     return rcode;
128 }
129
130 /* abort all transactions on this database */
131 int
132 urecovery_AbortAll(struct ubik_dbase *adbase)
133 {
134     register struct ubik_trans *tt;
135     for (tt = adbase->activeTrans; tt; tt = tt->next) {
136         udisk_abort(tt);
137     }
138     return 0;
139 }
140
141 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
142 int
143 urecovery_CheckTid(register struct ubik_tid *atid)
144 {
145     if (ubik_currentTrans) {
146         /* there is remote write trans, see if we match, see if this
147          * is a new transaction */
148         if (atid->epoch != ubik_currentTrans->tid.epoch
149             || atid->counter > ubik_currentTrans->tid.counter) {
150             /* don't match, abort it */
151             /* If the thread is not waiting for lock - ok to end it */
152 #if !defined(UBIK_PAUSE)
153             if (ubik_currentTrans->locktype != LOCKWAIT) {
154 #endif /* UBIK_PAUSE */
155                 udisk_end(ubik_currentTrans);
156 #if !defined(UBIK_PAUSE)
157             }
158 #endif /* UBIK_PAUSE */
159             ubik_currentTrans = (struct ubik_trans *)0;
160         }
161     }
162     return 0;
163 }
164
165 /* log format is defined here, and implicitly in disk.c
166  *
167  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
168  * are in logged in network standard byte order, in case we want to move logs
169  * from machine-to-machine someday.
170  *
171  * Begin transaction: opcode
172  * Commit transaction: opcode, version (8 bytes)
173  * Truncate file: opcode, file number, length
174  * Abort transaction: opcode
175  * Write data: opcode, file, position, length, <length> data bytes
176  *
177  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
178  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
179  * the commit record, then we allow data to be written through to the dbase.  In our particular
180  * implementation, once a transaction is done, we write out the pages to the database, so that
181  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
182  * any changed data while there is an uncommitted write transaction can be zapped during an
183  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
184  * the log.
185  */
186
187 /* replay logs */
188 static int
189 ReplayLog(register struct ubik_dbase *adbase)
190 {
191     afs_int32 opcode;
192     register afs_int32 code, tpos;
193     int logIsGood;
194     afs_int32 len, thisSize, tfile, filePos;
195     afs_int32 buffer[4];
196     afs_int32 syncFile = -1;
197     afs_int32 data[1024];
198
199     /* read the lock twice, once to see whether we have a transaction to deal
200      * with that committed, (theoretically, we should support more than one
201      * trans in the log at once, but not yet), and once replaying the
202      * transactions.  */
203     tpos = 0;
204     logIsGood = 0;
205     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
206     while (1) {
207         code =
208             (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
209                              sizeof(afs_int32));
210         if (code != sizeof(afs_int32))
211             break;
212         if (opcode == LOGNEW) {
213             /* handle begin trans */
214             tpos += sizeof(afs_int32);
215         } else if (opcode == LOGABORT)
216             break;
217         else if (opcode == LOGEND) {
218             logIsGood = 1;
219             break;
220         } else if (opcode == LOGTRUNCATE) {
221             tpos += 4;
222             code =
223                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
224                                  2 * sizeof(afs_int32));
225             if (code != 2 * sizeof(afs_int32))
226                 break;          /* premature eof or io error */
227             tpos += 2 * sizeof(afs_int32);
228         } else if (opcode == LOGDATA) {
229             tpos += 4;
230             code =
231                 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
232                                  3 * sizeof(afs_int32));
233             if (code != 3 * sizeof(afs_int32))
234                 break;
235             /* otherwise, skip over the data bytes, too */
236             tpos += buffer[2] + 3 * sizeof(afs_int32);
237         } else {
238             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode,
239                         tpos);
240             break;              /* corrupt log! */
241         }
242     }
243     if (logIsGood) {
244         /* actually do the replay; log should go all the way through the commit record, since
245          * we just read it above. */
246         tpos = 0;
247         logIsGood = 0;
248         syncFile = -1;
249         while (1) {
250             code =
251                 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
252                                  sizeof(afs_int32));
253             if (code != sizeof(afs_int32))
254                 break;
255             if (opcode == LOGNEW) {
256                 /* handle begin trans */
257                 tpos += sizeof(afs_int32);
258             } else if (opcode == LOGABORT)
259                 panic("log abort\n");
260             else if (opcode == LOGEND) {
261                 tpos += 4;
262                 code =
263                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
264                                      2 * sizeof(afs_int32));
265                 if (code != 2 * sizeof(afs_int32))
266                     return UBADLOG;
267                 code = (*adbase->setlabel) (adbase, 0, (ubik_version *)buffer);
268                 if (code)
269                     return code;
270                 logIsGood = 1;
271                 break;          /* all done now */
272             } else if (opcode == LOGTRUNCATE) {
273                 tpos += 4;
274                 code =
275                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
276                                      2 * sizeof(afs_int32));
277                 if (code != 2 * sizeof(afs_int32))
278                     break;      /* premature eof or io error */
279                 tpos += 2 * sizeof(afs_int32);
280                 code =
281                     (*adbase->truncate) (adbase, ntohl(buffer[0]),
282                                          ntohl(buffer[1]));
283                 if (code)
284                     return code;
285             } else if (opcode == LOGDATA) {
286                 tpos += 4;
287                 code =
288                     (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
289                                      3 * sizeof(afs_int32));
290                 if (code != 3 * sizeof(afs_int32))
291                     break;
292                 tpos += 3 * sizeof(afs_int32);
293                 /* otherwise, skip over the data bytes, too */
294                 len = ntohl(buffer[2]); /* total number of bytes to copy */
295                 filePos = ntohl(buffer[1]);
296                 tfile = ntohl(buffer[0]);
297                 /* try to minimize file syncs */
298                 if (syncFile != tfile) {
299                     if (syncFile >= 0)
300                         code = (*adbase->sync) (adbase, syncFile);
301                     else
302                         code = 0;
303                     syncFile = tfile;
304                     if (code)
305                         return code;
306                 }
307                 while (len > 0) {
308                     thisSize = (len > sizeof(data) ? sizeof(data) : len);
309                     /* copy sizeof(data) buffer bytes at a time */
310                     code =
311                         (*adbase->read) (adbase, LOGFILE, (char *)data, tpos,
312                                          thisSize);
313                     if (code != thisSize)
314                         return UBADLOG;
315                     code =
316                         (*adbase->write) (adbase, tfile, (char *)data, filePos,
317                                           thisSize);
318                     if (code != thisSize)
319                         return UBADLOG;
320                     filePos += thisSize;
321                     tpos += thisSize;
322                     len -= thisSize;
323                 }
324             } else {
325                 ubik_dprint("corrupt log opcode (%d) at position %d\n",
326                             opcode, tpos);
327                 break;          /* corrupt log! */
328             }
329         }
330         if (logIsGood) {
331             if (syncFile >= 0)
332                 code = (*adbase->sync) (adbase, syncFile);
333             if (code)
334                 return code;
335         } else {
336             ubik_dprint("Log read error on pass 2\n");
337             return UBADLOG;
338         }
339     }
340
341     /* now truncate the log, we're done with it */
342     code = (*adbase->truncate) (adbase, LOGFILE, 0);
343     return code;
344 }
345
346 /* Called at initialization to figure out version of the dbase we really have.
347  * This routine is called after replaying the log; it reads the restored labels.
348  */
349 static int
350 InitializeDB(register struct ubik_dbase *adbase)
351 {
352     register afs_int32 code;
353
354     code = (*adbase->getlabel) (adbase, 0, &adbase->version);
355     if (code) {
356         /* try setting the label to a new value */
357         adbase->version.epoch = 1;      /* value for newly-initialized db */
358         adbase->version.counter = 1;
359         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
360         if (code) {
361             /* failed, try to set it back */
362             adbase->version.epoch = 0;
363             adbase->version.counter = 0;
364             (*adbase->setlabel) (adbase, 0, &adbase->version);
365         }
366         LWP_NoYieldSignal(&adbase->version);
367     }
368     return 0;
369 }
370
371 /* initialize the local dbase
372  * We replay the logs and then read the resulting file to figure out what version we've really got.
373  */
374 int
375 urecovery_Initialize(register struct ubik_dbase *adbase)
376 {
377     register afs_int32 code;
378
379     code = ReplayLog(adbase);
380     if (code)
381         return code;
382     code = InitializeDB(adbase);
383     return code;
384 }
385
386 /* Main interaction loop for the recovery manager
387  * The recovery light-weight process only runs when you're the
388  * synchronization site.  It performs the following tasks, if and only
389  * if the prerequisite tasks have been performed successfully (it
390  * keeps track of which ones have been performed in its bit map,
391  * urecovery_state).
392  *
393  * First, it is responsible for probing that all servers are up.  This
394  * is the only operation that must be performed even if this is not
395  * yet the sync site, since otherwise this site may not notice that
396  * enough other machines are running to even elect this guy to be the
397  * sync site.
398  *
399  * After that, the recovery process does nothing until the beacon and
400  * voting modules manage to get this site elected sync site.
401  *
402  * After becoming sync site, recovery first attempts to find the best
403  * database available in the network (it must do this in order to
404  * ensure finding the latest committed data).  After finding the right
405  * database, it must fetch this dbase to the sync site.
406  *
407  * After fetching the dbase, it relabels it with a new version number,
408  * to ensure that everyone recognizes this dbase as the most recent
409  * dbase.
410  *
411  * One the dbase has been relabelled, this machine can start handling
412  * requests.  However, the recovery module still has one more task:
413  * propagating the dbase out to everyone who is up in the network.
414  */
415 void *
416 urecovery_Interact(void *dummy)
417 {
418     afs_int32 code, tcode;
419     struct ubik_server *bestServer = NULL;
420     struct ubik_server *ts;
421     int dbok, doingRPC, now;
422     afs_int32 lastProbeTime, lastDBVCheck;
423     /* if we're the sync site, the best db version we've found yet */
424     static struct ubik_version bestDBVersion;
425     struct ubik_version tversion;
426     struct timeval tv;
427     int length, tlen, offset, file, nbytes;
428     struct rx_call *rxcall;
429     char tbuffer[1024];
430     struct ubik_stat ubikstat;
431     struct in_addr inAddr;
432 #ifndef OLD_URECOVERY
433     char pbuffer[1028];
434     int flen, fd = -1;
435     afs_int32 epoch, pass;
436 #endif
437
438     /* otherwise, begin interaction */
439     urecovery_state = 0;
440     lastProbeTime = 0;
441     lastDBVCheck = 0;
442     while (1) {
443         /* Run through this loop every 4 seconds */
444         tv.tv_sec = 4;
445         tv.tv_usec = 0;
446         IOMGR_Select(0, 0, 0, 0, &tv);
447
448         ubik_dprint("recovery running in state %x\n", urecovery_state);
449
450         /* Every 30 seconds, check all the down servers and mark them
451          * as up if they respond. When a server comes up or found to
452          * not be current, then re-find the the best database and 
453          * propogate it.
454          */
455         if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
456             for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
457                 if (!ts->up) {
458                     doingRPC = 1;
459                     code = DoProbe(ts);
460                     if (code == 0) {
461                         ts->up = 1;
462                         urecovery_state &= ~UBIK_RECFOUNDDB;
463                     }
464                 } else if (!ts->currentDB) {
465                     urecovery_state &= ~UBIK_RECFOUNDDB;
466                 }
467             }
468             if (doingRPC)
469                 now = FT_ApproxTime();
470             lastProbeTime = now;
471         }
472
473         /* Mark whether we are the sync site */
474         if (!ubeacon_AmSyncSite()) {
475             urecovery_state &= ~UBIK_RECSYNCSITE;
476             continue;           /* nothing to do */
477         }
478         urecovery_state |= UBIK_RECSYNCSITE;
479
480         /* If a server has just come up or if we have not found the 
481          * most current database, then go find the most current db.
482          */
483         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
484             bestServer = (struct ubik_server *)0;
485             bestDBVersion.epoch = 0;
486             bestDBVersion.counter = 0;
487             for (ts = ubik_servers; ts; ts = ts->next) {
488                 if (!ts->up)
489                     continue;   /* don't bother with these guys */
490                 if (ts->isClone)
491                     continue;
492                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
493                 if (code == 0) {
494                     /* perhaps this is the best version */
495                     if (vcmp(ts->version, bestDBVersion) > 0) {
496                         /* new best version */
497                         bestDBVersion = ts->version;
498                         bestServer = ts;
499                     }
500                 }
501             }
502             /* take into consideration our version. Remember if we,
503              * the sync site, have the best version. Also note that
504              * we may need to send the best version out.
505              */
506             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
507                 bestDBVersion = ubik_dbase->version;
508                 bestServer = (struct ubik_server *)0;
509                 urecovery_state |= UBIK_RECHAVEDB;
510             } else {
511                 /* Clear the flag only when we know we have to retrieve
512                  * the db. Because urecovery_AllBetter() looks at it.
513                  */
514                 urecovery_state &= ~UBIK_RECHAVEDB;
515             }
516             lastDBVCheck = FT_ApproxTime();
517             urecovery_state |= UBIK_RECFOUNDDB;
518             urecovery_state &= ~UBIK_RECSENTDB;
519         }
520 #if defined(UBIK_PAUSE)
521         /* it's not possible for UBIK_RECFOUNDDB not to be set here.
522          * However, we might have lost UBIK_RECSYNCSITE, and that
523          * IS important.
524          */
525         if (!(urecovery_state & UBIK_RECSYNCSITE))
526             continue;           /* lost sync */
527 #else
528         if (!(urecovery_state & UBIK_RECFOUNDDB))
529             continue;           /* not ready */
530 #endif /* UBIK_PAUSE */
531
532         /* If we, the sync site, do not have the best db version, then
533          * go and get it from the server that does.
534          */
535         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
536             urecovery_state |= UBIK_RECHAVEDB;
537         } else {
538             /* we don't have the best version; we should fetch it. */
539             DBHOLD(ubik_dbase);
540             urecovery_AbortAll(ubik_dbase);
541
542             /* Rx code to do the Bulk fetch */
543             file = 0;
544             offset = 0;
545             rxcall = rx_NewCall(bestServer->disk_rxcid);
546
547             ubik_print("Ubik: Synchronize database with server %s\n",
548                        afs_inet_ntoa(bestServer->addr[0]));
549
550             code = StartDISK_GetFile(rxcall, file);
551             if (code) {
552                 ubik_dprint("StartDiskGetFile failed=%d\n", code);
553                 goto FetchEndCall;
554             }
555             nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
556             length = ntohl(length);
557             if (nbytes != sizeof(afs_int32)) {
558                 ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR);
559                 code = EIO;
560                 goto FetchEndCall;
561             }
562
563 #ifdef OLD_URECOVERY
564             /* Truncate the file first */
565             code = (*ubik_dbase->truncate) (ubik_dbase, file, 0);
566             if (code) {
567                 ubik_dprint("truncate io error=%d\n", code);
568                 goto FetchEndCall;
569             }
570             tversion.counter = 0;
571 #endif
572             /* give invalid label during file transit */
573             tversion.epoch = 0;
574             code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
575             if (code) {
576                 ubik_dprint("setlabel io error=%d\n", code);
577                 goto FetchEndCall;
578             }
579 #ifndef OLD_URECOVERY
580             flen = length;
581             afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
582             fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
583             if (fd < 0) {
584                 code = errno;
585                 goto FetchEndCall;
586             }
587             code = lseek(fd, HDRSIZE, 0);
588             if (code != HDRSIZE) {
589                 close(fd);
590                 goto FetchEndCall;
591             }
592 #endif
593
594             while (length > 0) {
595                 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
596 #ifndef AFS_PTHREAD_ENV
597                 if (pass % 4 == 0)
598                     IOMGR_Poll();
599 #endif
600                 nbytes = rx_Read(rxcall, tbuffer, tlen);
601                 if (nbytes != tlen) {
602                     ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
603                     code = EIO;
604                     close(fd);
605                     goto FetchEndCall;
606                 }
607 #ifdef OLD_URECOVERY
608                 nbytes =
609                     (*ubik_dbase->write) (ubik_dbase, file, tbuffer, offset,
610                                           tlen);
611 #else
612                 nbytes = write(fd, tbuffer, tlen);
613                 pass++;
614 #endif
615                 if (nbytes != tlen) {
616                     code = UIOERROR;
617                     close(fd);
618                     goto FetchEndCall;
619                 }
620                 offset += tlen;
621                 length -= tlen;
622             }
623 #ifndef OLD_URECOVERY
624             code = close(fd);
625             if (code)
626                 goto FetchEndCall;
627 #endif      
628             code = EndDISK_GetFile(rxcall, &tversion);
629           FetchEndCall:
630             tcode = rx_EndCall(rxcall, code);
631             if (!code)
632                 code = tcode;
633             if (!code) {
634                 /* we got a new file, set up its header */
635                 urecovery_state |= UBIK_RECHAVEDB;
636                 memcpy(&ubik_dbase->version, &tversion,
637                        sizeof(struct ubik_version));
638 #ifdef OLD_URECOVERY
639                 (*ubik_dbase->sync) (ubik_dbase, 0);    /* get data out first */
640 #else
641                 afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
642 #ifdef AFS_NT40_ENV
643                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
644                 code = unlink(pbuffer);
645                 if (!code)
646                     code = rename(tbuffer, pbuffer);
647                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
648 #endif
649                 if (!code) 
650                     code = rename(pbuffer, tbuffer);
651                 if (!code)
652 #endif
653                 /* after data is good, sync disk with correct label */
654                 code =
655                     (*ubik_dbase->setlabel) (ubik_dbase, 0,
656                                              &ubik_dbase->version);
657 #ifndef OLD_URECOVERY
658 #ifdef AFS_NT40_ENV
659                 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
660                 unlink(pbuffer);
661 #endif
662 #endif
663             }
664             if (code) {
665 #ifndef OLD_URECOVERY
666                 unlink(pbuffer);
667                 /* 
668                  * We will effectively invalidate the old data forever now.
669                  * Unclear if we *should* but we do.
670                  */
671 #endif
672                 ubik_dbase->version.epoch = 0;
673                 ubik_dbase->version.counter = 0;
674                 ubik_print("Ubik: Synchronize database failed (error = %d)\n",
675                            code);
676             } else {
677                 ubik_print("Ubik: Synchronize database completed\n");
678                 urecovery_state |= UBIK_RECHAVEDB;
679             }
680             udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
681             LWP_NoYieldSignal(&ubik_dbase->version);
682             DBRELE(ubik_dbase);
683         }
684 #if defined(UBIK_PAUSE)
685         if (!(urecovery_state & UBIK_RECSYNCSITE))
686             continue;           /* lost sync */
687 #endif /* UBIK_PAUSE */
688         if (!(urecovery_state & UBIK_RECHAVEDB))
689             continue;           /* not ready */
690
691         /* If the database was newly initialized, then when we establish quorum, write
692          * a new label. This allows urecovery_AllBetter() to allow access for reads.
693          * Setting it to 2 also allows another site to come along with a newer
694          * database and overwrite this one.
695          */
696         if (ubik_dbase->version.epoch == 1) {
697             DBHOLD(ubik_dbase);
698             urecovery_AbortAll(ubik_dbase);
699             ubik_epochTime = 2;
700             ubik_dbase->version.epoch = ubik_epochTime;
701             ubik_dbase->version.counter = 1;
702             code =
703                 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
704             udisk_Invalidate(ubik_dbase, 0);    /* data may have changed */
705             LWP_NoYieldSignal(&ubik_dbase->version);
706             DBRELE(ubik_dbase);
707         }
708
709         /* Check the other sites and send the database to them if they
710          * do not have the current db.
711          */
712         if (!(urecovery_state & UBIK_RECSENTDB)) {
713             /* now propagate out new version to everyone else */
714             dbok = 1;           /* start off assuming they all worked */
715
716             DBHOLD(ubik_dbase);
717             /*
718              * Check if a write transaction is in progress. We can't send the
719              * db when a write is in progress here because the db would be
720              * obsolete as soon as it goes there. Also, ops after the begin
721              * trans would reach the recepient and wouldn't find a transaction
722              * pending there.  Frankly, I don't think it's possible to get past
723              * the write-lock above if there is a write transaction in progress,
724              * but then, it won't hurt to check, will it?
725              */
726             if (ubik_dbase->flags & DBWRITING) {
727                 struct timeval tv;
728                 int safety = 0;
729                 tv.tv_sec = 0;
730                 tv.tv_usec = 50000;
731                 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
732                     DBRELE(ubik_dbase);
733                     /* sleep for a little while */
734                     IOMGR_Select(0, 0, 0, 0, &tv);
735                     tv.tv_usec += 10000;
736                     safety++;
737                     DBHOLD(ubik_dbase);
738                 }
739             }
740
741             for (ts = ubik_servers; ts; ts = ts->next) {
742                 inAddr.s_addr = ts->addr[0];
743                 if (!ts->up) {
744                     ubik_dprint("recovery cannot send version to %s\n",
745                                 afs_inet_ntoa(inAddr.s_addr));
746                     dbok = 0;
747                     continue;
748                 }
749                 ubik_dprint("recovery sending version to %s\n",
750                             afs_inet_ntoa(inAddr.s_addr));
751                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
752                     ubik_dprint("recovery stating local database\n");
753
754                     /* Rx code to do the Bulk Store */
755                     code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
756                     if (!code) {
757                         length = ubikstat.size;
758                         file = offset = 0;
759                         rxcall = rx_NewCall(ts->disk_rxcid);
760                         code =
761                             StartDISK_SendFile(rxcall, file, length,
762                                                &ubik_dbase->version);
763                         if (code) {
764                             ubik_dprint("StartDiskSendFile failed=%d\n",
765                                         code);
766                             goto StoreEndCall;
767                         }
768                         while (length > 0) {
769                             tlen =
770                                 (length >
771                                  sizeof(tbuffer) ? sizeof(tbuffer) : length);
772                             nbytes =
773                                 (*ubik_dbase->read) (ubik_dbase, file,
774                                                      tbuffer, offset, tlen);
775                             if (nbytes != tlen) {
776                                 ubik_dprint("Local disk read error=%d\n",
777                                             code = UIOERROR);
778                                 goto StoreEndCall;
779                             }
780                             nbytes = rx_Write(rxcall, tbuffer, tlen);
781                             if (nbytes != tlen) {
782                                 ubik_dprint("Rx-write bulk error=%d\n", code =
783                                             BULK_ERROR);
784                                 goto StoreEndCall;
785                             }
786                             offset += tlen;
787                             length -= tlen;
788                         }
789                         code = EndDISK_SendFile(rxcall);
790                       StoreEndCall:
791                         code = rx_EndCall(rxcall, code);
792                     }
793                     if (code == 0) {
794                         /* we set a new file, process its header */
795                         ts->version = ubik_dbase->version;
796                         ts->currentDB = 1;
797                     } else
798                         dbok = 0;
799                 } else {
800                     /* mark file up to date */
801                     ts->currentDB = 1;
802                 }
803             }
804             DBRELE(ubik_dbase);
805             if (dbok)
806                 urecovery_state |= UBIK_RECSENTDB;
807         }
808     }
809 }
810
811 /*
812 ** send a Probe to all the network address of this server 
813 ** Return 0  if success, else return 1
814 */
815 int
816 DoProbe(struct ubik_server *server)
817 {
818     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
819     struct rx_connection *connSuccess = 0;
820     int i, j;
821     afs_uint32 addr;
822     char buffer[32];
823     extern afs_int32 ubikSecIndex;
824     extern struct rx_securityClass *ubikSecClass;
825
826     for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR);
827          i++) {
828         conns[i] =
829             rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
830                              ubikSecClass, ubikSecIndex);
831
832         /* user requirement to use only the primary interface */
833         if (ubikPrimaryAddrOnly) {
834             i = 1;
835             break;
836         }
837     }
838     assert(i);                  /* at least one interface address for this server */
839
840     multi_Rx(conns, i) {
841         multi_DISK_Probe();
842         if (!multi_error) {     /* first success */
843             addr = server->addr[multi_i];       /* successful interface addr */
844
845             if (server->disk_rxcid)     /* destroy existing conn */
846                 rx_DestroyConnection(server->disk_rxcid);
847             if (server->vote_rxcid)
848                 rx_DestroyConnection(server->vote_rxcid);
849
850             /* make new connections */
851             server->disk_rxcid = conns[multi_i];
852             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal, VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);  /* for vote reqs */
853
854             connSuccess = conns[multi_i];
855             strcpy(buffer, (char *)afs_inet_ntoa(server->addr[0]));
856             ubik_print
857                 ("ubik:server %s is back up: will be contacted through %s\n",
858                  buffer, afs_inet_ntoa(addr));
859
860             multi_Abort;
861         }
862     } multi_End_Ignore;
863
864     /* Destroy all connections except the one on which we succeeded */
865     for (j = 0; j < i; j++)
866         if (conns[j] != connSuccess)
867             rx_DestroyConnection(conns[j]);
868
869     if (!connSuccess)
870         ubik_dprint("ubik:server %s still down\n",
871                     afs_inet_ntoa(server->addr[0]));
872
873     if (connSuccess)
874         return 0;               /* success */
875     else
876         return 1;               /* failure */
877 }