more-must-returns-20031207
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #include <time.h>
20 #else
21 #include <sys/file.h>
22 #include <netinet/in.h>
23 #include <sys/time.h>
24 #endif
25 #include <assert.h>
26 #include <lock.h>
27 #ifdef HAVE_STRING_H
28 #include <string.h>
29 #else
30 #ifdef HAVE_STRINGS_H
31 #include <strings.h>
32 #endif
33 #endif
34 #include <rx/xdr.h>
35 #include <rx/rx.h>
36 #include <errno.h>
37 #include <afs/afsutil.h>
38
39 #define UBIK_INTERNALS
40 #include "ubik.h"
41 #include "ubik_int.h"
42
43 /* This module is responsible for determining when the system has
44  * recovered to the point that it can handle new transactions.  It
45  * replays logs, polls to determine the current dbase after a crash,
46  * and distributes the new database to the others.
47  */
48
49 /* The sync site associates a version number with each database.  It
50  * broadcasts the version associated with its current dbase in every
51  * one of its beacon messages.  When the sync site send a dbase to a
52  * server, it also sends the db's version.  A non-sync site server can
53  * tell if it has the right dbase version by simply comparing the
54  * version from the beacon message (uvote_dbVersion) with the version
55  * associated with the database (ubik_dbase->version).  The sync site
56  * itself simply has one counter to keep track of all of this (again
57  * ubik_dbase->version).
58  */
59
60 /* sync site: routine called when the sync site loses its quorum; this
61  * procedure is called "up" from the beacon package.  It resyncs the
62  * dbase and nudges the recovery daemon to try to propagate out the
63  * changes.  It also resets the recovery daemon's state, since
64  * recovery must potentially find a new dbase to propagate out.  This
65  * routine should not do anything with variables used by non-sync site
66  * servers.
67  */
68
69 /* if this flag is set, then ubik will use only the primary address 
70 ** ( the address specified in the CellServDB) to contact other 
71 ** ubik servers. Ubik recovery will not try opening connections 
72 ** to the alternate interface addresses. 
73 */
74 int ubikPrimaryAddrOnly;
75
76 int
77 urecovery_ResetState(void)
78 {
79     urecovery_state = 0;
80     LWP_NoYieldSignal(&urecovery_state);
81     return 0;
82 }
83
84 /* sync site: routine called when a non-sync site server goes down; restarts recovery
85  * process to send missing server the new db when it comes back up.
86  * This routine should not do anything with variables used by non-sync site servers. 
87  */
88 int
89 urecovery_LostServer(void)
90 {
91     LWP_NoYieldSignal(&urecovery_state);
92     return 0;
93 }
94
95 /* return true iff we have a current database (called by both sync
96  * sites and non-sync sites) How do we determine this?  If we're the
97  * sync site, we wait until recovery has finished fetching and
98  * re-labelling its dbase (it may still be trying to propagate it out
99  * to everyone else; that's THEIR problem).  If we're not the sync
100  * site, then we must have a dbase labelled with the right version,
101  * and we must have a currently-good sync site.
102  */
103 int
104 urecovery_AllBetter(register struct ubik_dbase *adbase, int areadAny)
105 {
106     register afs_int32 rcode;
107
108     ubik_dprint("allbetter checking\n");
109     rcode = 0;
110
111
112     if (areadAny) {
113         if (ubik_dbase->version.epoch > 1)
114             rcode = 1;          /* Happy with any good version of database */
115     }
116
117     /* Check if we're sync site and we've got the right data */
118     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
119         rcode = 1;
120     }
121
122     /* next, check if we're aux site, and we've ever been sent the
123      * right data (note that if a dbase update fails, we won't think
124      * that the sync site is still the sync site, 'cause it won't talk
125      * to us until a timeout period has gone by.  When we recover, we
126      * leave this clear until we get a new dbase */
127     else if ((uvote_GetSyncSite() && (vcmp(ubik_dbVersion, ubik_dbase->version) == 0))) {       /* && order is important */
128         rcode = 1;
129     }
130
131     ubik_dprint("allbetter: returning %d\n", rcode);
132     return rcode;
133 }
134
135 /* abort all transactions on this database */
136 int
137 urecovery_AbortAll(struct ubik_dbase *adbase)
138 {
139     register struct ubik_trans *tt;
140     for (tt = adbase->activeTrans; tt; tt = tt->next) {
141         udisk_abort(tt);
142     }
143     return 0;
144 }
145
146 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
147 int
148 urecovery_CheckTid(register struct ubik_tid *atid)
149 {
150     if (ubik_currentTrans) {
151         /* there is remote write trans, see if we match, see if this
152          * is a new transaction */
153         if (atid->epoch != ubik_currentTrans->tid.epoch
154             || atid->counter > ubik_currentTrans->tid.counter) {
155             /* don't match, abort it */
156             /* If the thread is not waiting for lock - ok to end it */
157 #if !defined(UBIK_PAUSE)
158             if (ubik_currentTrans->locktype != LOCKWAIT) {
159 #endif /* UBIK_PAUSE */
160                 udisk_end(ubik_currentTrans);
161 #if !defined(UBIK_PAUSE)
162             }
163 #endif /* UBIK_PAUSE */
164             ubik_currentTrans = (struct ubik_trans *)0;
165         }
166     }
167     return 0;
168 }
169
170 /* log format is defined here, and implicitly in disk.c
171  *
172  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
173  * are in logged in network standard byte order, in case we want to move logs
174  * from machine-to-machine someday.
175  *
176  * Begin transaction: opcode
177  * Commit transaction: opcode, version (8 bytes)
178  * Truncate file: opcode, file number, length
179  * Abort transaction: opcode
180  * Write data: opcode, file, position, length, <length> data bytes
181  *
182  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
183  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
184  * the commit record, then we allow data to be written through to the dbase.  In our particular
185  * implementation, once a transaction is done, we write out the pages to the database, so that
186  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
187  * any changed data while there is an uncommitted write transaction can be zapped during an
188  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
189  * the log.
190  */
191
192 /* replay logs */
193 static int
194 ReplayLog(register struct ubik_dbase *adbase)
195 {
196     afs_int32 opcode;
197     register afs_int32 code, tpos;
198     int logIsGood;
199     afs_int32 len, thisSize, tfile, filePos;
200     afs_int32 buffer[4];
201     afs_int32 syncFile = -1;
202     afs_int32 data[1024];
203
204     /* read the lock twice, once to see whether we have a transaction to deal
205      * with that committed, (theoretically, we should support more than one
206      * trans in the log at once, but not yet), and once replaying the
207      * transactions.  */
208     tpos = 0;
209     logIsGood = 0;
210     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
211     while (1) {
212         code =
213             (*adbase->read) (adbase, LOGFILE, &opcode, tpos,
214                              sizeof(afs_int32));
215         if (code != sizeof(afs_int32))
216             break;
217         if (opcode == LOGNEW) {
218             /* handle begin trans */
219             tpos += sizeof(afs_int32);
220         } else if (opcode == LOGABORT)
221             break;
222         else if (opcode == LOGEND) {
223             logIsGood = 1;
224             break;
225         } else if (opcode == LOGTRUNCATE) {
226             tpos += 4;
227             code =
228                 (*adbase->read) (adbase, LOGFILE, buffer, tpos,
229                                  2 * sizeof(afs_int32));
230             if (code != 2 * sizeof(afs_int32))
231                 break;          /* premature eof or io error */
232             tpos += 2 * sizeof(afs_int32);
233         } else if (opcode == LOGDATA) {
234             tpos += 4;
235             code =
236                 (*adbase->read) (adbase, LOGFILE, buffer, tpos,
237                                  3 * sizeof(afs_int32));
238             if (code != 3 * sizeof(afs_int32))
239                 break;
240             /* otherwise, skip over the data bytes, too */
241             tpos += buffer[2] + 3 * sizeof(afs_int32);
242         } else {
243             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode,
244                         tpos);
245             break;              /* corrupt log! */
246         }
247     }
248     if (logIsGood) {
249         /* actually do the replay; log should go all the way through the commit record, since
250          * we just read it above. */
251         tpos = 0;
252         logIsGood = 0;
253         syncFile = -1;
254         while (1) {
255             code =
256                 (*adbase->read) (adbase, LOGFILE, &opcode, tpos,
257                                  sizeof(afs_int32));
258             if (code != sizeof(afs_int32))
259                 break;
260             if (opcode == LOGNEW) {
261                 /* handle begin trans */
262                 tpos += sizeof(afs_int32);
263             } else if (opcode == LOGABORT)
264                 panic("log abort\n");
265             else if (opcode == LOGEND) {
266                 tpos += 4;
267                 code =
268                     (*adbase->read) (adbase, LOGFILE, buffer, tpos,
269                                      2 * sizeof(afs_int32));
270                 if (code != 2 * sizeof(afs_int32))
271                     return UBADLOG;
272                 code = (*adbase->setlabel) (adbase, 0, buffer);
273                 if (code)
274                     return code;
275                 logIsGood = 1;
276                 break;          /* all done now */
277             } else if (opcode == LOGTRUNCATE) {
278                 tpos += 4;
279                 code =
280                     (*adbase->read) (adbase, LOGFILE, buffer, tpos,
281                                      2 * sizeof(afs_int32));
282                 if (code != 2 * sizeof(afs_int32))
283                     break;      /* premature eof or io error */
284                 tpos += 2 * sizeof(afs_int32);
285                 code =
286                     (*adbase->truncate) (adbase, ntohl(buffer[0]),
287                                          ntohl(buffer[1]));
288                 if (code)
289                     return code;
290             } else if (opcode == LOGDATA) {
291                 tpos += 4;
292                 code =
293                     (*adbase->read) (adbase, LOGFILE, buffer, tpos,
294                                      3 * sizeof(afs_int32));
295                 if (code != 3 * sizeof(afs_int32))
296                     break;
297                 tpos += 3 * sizeof(afs_int32);
298                 /* otherwise, skip over the data bytes, too */
299                 len = ntohl(buffer[2]); /* total number of bytes to copy */
300                 filePos = ntohl(buffer[1]);
301                 tfile = ntohl(buffer[0]);
302                 /* try to minimize file syncs */
303                 if (syncFile != tfile) {
304                     if (syncFile >= 0)
305                         code = (*adbase->sync) (adbase, syncFile);
306                     else
307                         code = 0;
308                     syncFile = tfile;
309                     if (code)
310                         return code;
311                 }
312                 while (len > 0) {
313                     thisSize = (len > sizeof(data) ? sizeof(data) : len);
314                     /* copy sizeof(data) buffer bytes at a time */
315                     code =
316                         (*adbase->read) (adbase, LOGFILE, data, tpos,
317                                          thisSize);
318                     if (code != thisSize)
319                         return UBADLOG;
320                     code =
321                         (*adbase->write) (adbase, tfile, data, filePos,
322                                           thisSize);
323                     if (code != thisSize)
324                         return UBADLOG;
325                     filePos += thisSize;
326                     tpos += thisSize;
327                     len -= thisSize;
328                 }
329             } else {
330                 ubik_dprint("corrupt log opcode (%d) at position %d\n",
331                             opcode, tpos);
332                 break;          /* corrupt log! */
333             }
334         }
335         if (logIsGood) {
336             if (syncFile >= 0)
337                 code = (*adbase->sync) (adbase, syncFile);
338             if (code)
339                 return code;
340         } else {
341             ubik_dprint("Log read error on pass 2\n");
342             return UBADLOG;
343         }
344     }
345
346     /* now truncate the log, we're done with it */
347     code = (*adbase->truncate) (adbase, LOGFILE, 0);
348     return code;
349 }
350
351 /* Called at initialization to figure out version of the dbase we really have.
352  * This routine is called after replaying the log; it reads the restored labels.
353  */
354 static int
355 InitializeDB(register struct ubik_dbase *adbase)
356 {
357     register afs_int32 code;
358
359     code = (*adbase->getlabel) (adbase, 0, &adbase->version);
360     if (code) {
361         /* try setting the label to a new value */
362         adbase->version.epoch = 1;      /* value for newly-initialized db */
363         adbase->version.counter = 1;
364         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
365         if (code) {
366             /* failed, try to set it back */
367             adbase->version.epoch = 0;
368             adbase->version.counter = 0;
369             (*adbase->setlabel) (adbase, 0, &adbase->version);
370         }
371         LWP_NoYieldSignal(&adbase->version);
372     }
373     return 0;
374 }
375
376 /* initialize the local dbase
377  * We replay the logs and then read the resulting file to figure out what version we've really got.
378  */
379 int
380 urecovery_Initialize(register struct ubik_dbase *adbase)
381 {
382     register afs_int32 code;
383
384     code = ReplayLog(adbase);
385     if (code)
386         return code;
387     code = InitializeDB(adbase);
388     return code;
389 }
390
391 /* Main interaction loop for the recovery manager
392  * The recovery light-weight process only runs when you're the
393  * synchronization site.  It performs the following tasks, if and only
394  * if the prerequisite tasks have been performed successfully (it
395  * keeps track of which ones have been performed in its bit map,
396  * urecovery_state).
397  *
398  * First, it is responsible for probing that all servers are up.  This
399  * is the only operation that must be performed even if this is not
400  * yet the sync site, since otherwise this site may not notice that
401  * enough other machines are running to even elect this guy to be the
402  * sync site.
403  *
404  * After that, the recovery process does nothing until the beacon and
405  * voting modules manage to get this site elected sync site.
406  *
407  * After becoming sync site, recovery first attempts to find the best
408  * database available in the network (it must do this in order to
409  * ensure finding the latest committed data).  After finding the right
410  * database, it must fetch this dbase to the sync site.
411  *
412  * After fetching the dbase, it relabels it with a new version number,
413  * to ensure that everyone recognizes this dbase as the most recent
414  * dbase.
415  *
416  * One the dbase has been relabelled, this machine can start handling
417  * requests.  However, the recovery module still has one more task:
418  * propagating the dbase out to everyone who is up in the network.
419  */
420 int
421 urecovery_Interact(void)
422 {
423     afs_int32 code, tcode;
424     struct ubik_server *bestServer = NULL;
425     struct ubik_server *ts;
426     int dbok, doingRPC, now;
427     afs_int32 lastProbeTime, lastDBVCheck;
428     /* if we're the sync site, the best db version we've found yet */
429     static struct ubik_version bestDBVersion;
430     struct ubik_version tversion;
431     struct timeval tv;
432     int length, tlen, offset, file, nbytes;
433     struct rx_call *rxcall;
434     char tbuffer[256];
435     struct ubik_stat ubikstat;
436     struct in_addr inAddr;
437
438     /* otherwise, begin interaction */
439     urecovery_state = 0;
440     lastProbeTime = 0;
441     lastDBVCheck = 0;
442     while (1) {
443         /* Run through this loop every 4 seconds */
444         tv.tv_sec = 4;
445         tv.tv_usec = 0;
446         IOMGR_Select(0, 0, 0, 0, &tv);
447
448         ubik_dprint("recovery running in state %x\n", urecovery_state);
449
450         /* Every 30 seconds, check all the down servers and mark them
451          * as up if they respond. When a server comes up or found to
452          * not be current, then re-find the the best database and 
453          * propogate it.
454          */
455         if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
456             for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
457                 if (!ts->up) {
458                     doingRPC = 1;
459                     code = DoProbe(ts);
460                     if (code == 0) {
461                         ts->up = 1;
462                         urecovery_state &= ~UBIK_RECFOUNDDB;
463                     }
464                 } else if (!ts->currentDB) {
465                     urecovery_state &= ~UBIK_RECFOUNDDB;
466                 }
467             }
468             if (doingRPC)
469                 now = FT_ApproxTime();
470             lastProbeTime = now;
471         }
472
473         /* Mark whether we are the sync site */
474         if (!ubeacon_AmSyncSite()) {
475             urecovery_state &= ~UBIK_RECSYNCSITE;
476             continue;           /* nothing to do */
477         }
478         urecovery_state |= UBIK_RECSYNCSITE;
479
480         /* If a server has just come up or if we have not found the 
481          * most current database, then go find the most current db.
482          */
483         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
484             bestServer = (struct ubik_server *)0;
485             bestDBVersion.epoch = 0;
486             bestDBVersion.counter = 0;
487             for (ts = ubik_servers; ts; ts = ts->next) {
488                 if (!ts->up)
489                     continue;   /* don't bother with these guys */
490                 if (ts->isClone)
491                     continue;
492                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
493                 if (code == 0) {
494                     /* perhaps this is the best version */
495                     if (vcmp(ts->version, bestDBVersion) > 0) {
496                         /* new best version */
497                         bestDBVersion = ts->version;
498                         bestServer = ts;
499                     }
500                 }
501             }
502             /* take into consideration our version. Remember if we,
503              * the sync site, have the best version. Also note that
504              * we may need to send the best version out.
505              */
506             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
507                 bestDBVersion = ubik_dbase->version;
508                 bestServer = (struct ubik_server *)0;
509                 urecovery_state |= UBIK_RECHAVEDB;
510             } else {
511                 /* Clear the flag only when we know we have to retrieve
512                  * the db. Because urecovery_AllBetter() looks at it.
513                  */
514                 urecovery_state &= ~UBIK_RECHAVEDB;
515             }
516             lastDBVCheck = FT_ApproxTime();
517             urecovery_state |= UBIK_RECFOUNDDB;
518             urecovery_state &= ~UBIK_RECSENTDB;
519         }
520 #if defined(UBIK_PAUSE)
521         /* it's not possible for UBIK_RECFOUNDDB not to be set here.
522          * However, we might have lost UBIK_RECSYNCSITE, and that
523          * IS important.
524          */
525         if (!(urecovery_state & UBIK_RECSYNCSITE))
526             continue;           /* lost sync */
527 #else
528         if (!(urecovery_state & UBIK_RECFOUNDDB))
529             continue;           /* not ready */
530 #endif /* UBIK_PAUSE */
531
532         /* If we, the sync site, do not have the best db version, then
533          * go and get it from the server that does.
534          */
535         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
536             urecovery_state |= UBIK_RECHAVEDB;
537         } else {
538             /* we don't have the best version; we should fetch it. */
539 #if defined(UBIK_PAUSE)
540             DBHOLD(ubik_dbase);
541 #else
542             ObtainWriteLock(&ubik_dbase->versionLock);
543 #endif /* UBIK_PAUSE */
544             urecovery_AbortAll(ubik_dbase);
545
546             /* Rx code to do the Bulk fetch */
547             file = 0;
548             offset = 0;
549             rxcall = rx_NewCall(bestServer->disk_rxcid);
550
551             ubik_print("Ubik: Synchronize database with server %s\n",
552                        afs_inet_ntoa(bestServer->addr[0]));
553
554             code = StartDISK_GetFile(rxcall, file);
555             if (code) {
556                 ubik_dprint("StartDiskGetFile failed=%d\n", code);
557                 goto FetchEndCall;
558             }
559             nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
560             length = ntohl(length);
561             if (nbytes != sizeof(afs_int32)) {
562                 ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR);
563                 code = EIO;
564                 goto FetchEndCall;
565             }
566
567             /* Truncate the file firest */
568             code = (*ubik_dbase->truncate) (ubik_dbase, file, 0);
569             if (code) {
570                 ubik_dprint("truncate io error=%d\n", code);
571                 goto FetchEndCall;
572             }
573
574             /* give invalid label during file transit */
575             tversion.epoch = 0;
576             tversion.counter = 0;
577             code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
578             if (code) {
579                 ubik_dprint("setlabel io error=%d\n", code);
580                 goto FetchEndCall;
581             }
582
583             while (length > 0) {
584                 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
585                 nbytes = rx_Read(rxcall, tbuffer, tlen);
586                 if (nbytes != tlen) {
587                     ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
588                     code = EIO;
589                     goto FetchEndCall;
590                 }
591                 nbytes =
592                     (*ubik_dbase->write) (ubik_dbase, file, tbuffer, offset,
593                                           tlen);
594                 if (nbytes != tlen) {
595                     code = UIOERROR;
596                     goto FetchEndCall;
597                 }
598                 offset += tlen;
599                 length -= tlen;
600             }
601             code = EndDISK_GetFile(rxcall, &tversion);
602           FetchEndCall:
603             tcode = rx_EndCall(rxcall, code);
604             if (!code)
605                 code = tcode;
606             if (!code) {
607                 /* we got a new file, set up its header */
608                 urecovery_state |= UBIK_RECHAVEDB;
609                 memcpy(&ubik_dbase->version, &tversion,
610                        sizeof(struct ubik_version));
611                 (*ubik_dbase->sync) (ubik_dbase, 0);    /* get data out first */
612                 /* after data is good, sync disk with correct label */
613                 code =
614                     (*ubik_dbase->setlabel) (ubik_dbase, 0,
615                                              &ubik_dbase->version);
616             }
617             if (code) {
618                 ubik_dbase->version.epoch = 0;
619                 ubik_dbase->version.counter = 0;
620                 ubik_print("Ubik: Synchronize database failed (error = %d)\n",
621                            code);
622             } else {
623                 ubik_print("Ubik: Synchronize database completed\n");
624             }
625             udisk_Invalidate(ubik_dbase, 0);    /* data has changed */
626             LWP_NoYieldSignal(&ubik_dbase->version);
627 #if defined(UBIK_PAUSE)
628             DBRELE(ubik_dbase);
629 #else
630             ReleaseWriteLock(&ubik_dbase->versionLock);
631 #endif /* UBIK_PAUSE */
632         }
633 #if defined(UBIK_PAUSE)
634         if (!(urecovery_state & UBIK_RECSYNCSITE))
635             continue;           /* lost sync */
636 #endif /* UBIK_PAUSE */
637         if (!(urecovery_state & UBIK_RECHAVEDB))
638             continue;           /* not ready */
639
640         /* If the database was newly initialized, then when we establish quorum, write
641          * a new label. This allows urecovery_AllBetter() to allow access for reads.
642          * Setting it to 2 also allows another site to come along with a newer
643          * database and overwrite this one.
644          */
645         if (ubik_dbase->version.epoch == 1) {
646 #if defined(UBIK_PAUSE)
647             DBHOLD(ubik_dbase);
648 #else
649             ObtainWriteLock(&ubik_dbase->versionLock);
650 #endif /* UBIK_PAUSE */
651             urecovery_AbortAll(ubik_dbase);
652             ubik_epochTime = 2;
653             ubik_dbase->version.epoch = ubik_epochTime;
654             ubik_dbase->version.counter = 1;
655             code =
656                 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
657             udisk_Invalidate(ubik_dbase, 0);    /* data may have changed */
658             LWP_NoYieldSignal(&ubik_dbase->version);
659 #if defined(UBIK_PAUSE)
660             DBRELE(ubik_dbase);
661 #else
662             ReleaseWriteLock(&ubik_dbase->versionLock);
663 #endif /* UBIK_PAUSE */
664         }
665
666         /* Check the other sites and send the database to them if they
667          * do not have the current db.
668          */
669         if (!(urecovery_state & UBIK_RECSENTDB)) {
670             /* now propagate out new version to everyone else */
671             dbok = 1;           /* start off assuming they all worked */
672
673 #if defined(UBIK_PAUSE)
674             DBHOLD(ubik_dbase);
675 #else
676             ObtainWriteLock(&ubik_dbase->versionLock);
677 #endif /* UBIK_PAUSE */
678             /*
679              * Check if a write transaction is in progress. We can't send the
680              * db when a write is in progress here because the db would be
681              * obsolete as soon as it goes there. Also, ops after the begin
682              * trans would reach the recepient and wouldn't find a transaction
683              * pending there.  Frankly, I don't think it's possible to get past
684              * the write-lock above if there is a write transaction in progress,
685              * but then, it won't hurt to check, will it?
686              */
687             if (ubik_dbase->flags & DBWRITING) {
688                 struct timeval tv;
689                 int safety = 0;
690                 tv.tv_sec = 0;
691                 tv.tv_usec = 50000;
692                 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
693 #if defined(UBIK_PAUSE)
694                     DBRELE(ubik_dbase);
695 #else
696                     ReleaseWriteLock(&ubik_dbase->versionLock);
697 #endif /* UBIK_PAUSE */
698                     /* sleep for a little while */
699                     IOMGR_Select(0, 0, 0, 0, &tv);
700                     tv.tv_usec += 10000;
701                     safety++;
702 #if defined(UBIK_PAUSE)
703                     DBHOLD(ubik_dbase);
704 #else
705                     ObtainWriteLock(&ubik_dbase->versionLock);
706 #endif /* UBIK_PAUSE */
707                 }
708             }
709
710             for (ts = ubik_servers; ts; ts = ts->next) {
711                 inAddr.s_addr = ts->addr[0];
712                 if (!ts->up) {
713                     ubik_dprint("recovery cannot send version to %s\n",
714                                 afs_inet_ntoa(inAddr.s_addr));
715                     dbok = 0;
716                     continue;
717                 }
718                 ubik_dprint("recovery sending version to %s\n",
719                             afs_inet_ntoa(inAddr.s_addr));
720                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
721                     ubik_dprint("recovery stating local database\n");
722
723                     /* Rx code to do the Bulk Store */
724                     code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
725                     if (!code) {
726                         length = ubikstat.size;
727                         file = offset = 0;
728                         rxcall = rx_NewCall(ts->disk_rxcid);
729                         code =
730                             StartDISK_SendFile(rxcall, file, length,
731                                                &ubik_dbase->version);
732                         if (code) {
733                             ubik_dprint("StartDiskSendFile failed=%d\n",
734                                         code);
735                             goto StoreEndCall;
736                         }
737                         while (length > 0) {
738                             tlen =
739                                 (length >
740                                  sizeof(tbuffer) ? sizeof(tbuffer) : length);
741                             nbytes =
742                                 (*ubik_dbase->read) (ubik_dbase, file,
743                                                      tbuffer, offset, tlen);
744                             if (nbytes != tlen) {
745                                 ubik_dprint("Local disk read error=%d\n",
746                                             code = UIOERROR);
747                                 goto StoreEndCall;
748                             }
749                             nbytes = rx_Write(rxcall, tbuffer, tlen);
750                             if (nbytes != tlen) {
751                                 ubik_dprint("Rx-write bulk error=%d\n", code =
752                                             BULK_ERROR);
753                                 goto StoreEndCall;
754                             }
755                             offset += tlen;
756                             length -= tlen;
757                         }
758                         code = EndDISK_SendFile(rxcall);
759                       StoreEndCall:
760                         code = rx_EndCall(rxcall, code);
761                     }
762                     if (code == 0) {
763                         /* we set a new file, process its header */
764                         ts->version = ubik_dbase->version;
765                         ts->currentDB = 1;
766                     } else
767                         dbok = 0;
768                 } else {
769                     /* mark file up to date */
770                     ts->currentDB = 1;
771                 }
772             }
773 #if defined(UBIK_PAUSE)
774             DBRELE(ubik_dbase);
775 #else
776             ReleaseWriteLock(&ubik_dbase->versionLock);
777 #endif /* UBIK_PAUSE */
778             if (dbok)
779                 urecovery_state |= UBIK_RECSENTDB;
780         }
781     }
782 }
783
784 /*
785 ** send a Probe to all the network address of this server 
786 ** Return 0  if success, else return 1
787 */
788 int
789 DoProbe(struct ubik_server *server)
790 {
791     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
792     struct rx_connection *connSuccess = 0;
793     int i, j;
794     afs_uint32 addr;
795     char buffer[32];
796     extern afs_int32 ubikSecIndex;
797     extern struct rx_securityClass *ubikSecClass;
798
799     for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR);
800          i++) {
801         conns[i] =
802             rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
803                              ubikSecClass, ubikSecIndex);
804
805         /* user requirement to use only the primary interface */
806         if (ubikPrimaryAddrOnly) {
807             i = 1;
808             break;
809         }
810     }
811     assert(i);                  /* at least one interface address for this server */
812
813     multi_Rx(conns, i) {
814         multi_DISK_Probe();
815         if (!multi_error) {     /* first success */
816             addr = server->addr[multi_i];       /* successful interface addr */
817
818             if (server->disk_rxcid)     /* destroy existing conn */
819                 rx_DestroyConnection(server->disk_rxcid);
820             if (server->vote_rxcid)
821                 rx_DestroyConnection(server->vote_rxcid);
822
823             /* make new connections */
824             server->disk_rxcid = conns[multi_i];
825             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal, VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);  /* for vote reqs */
826
827             connSuccess = conns[multi_i];
828             strcpy(buffer, (char *)afs_inet_ntoa(server->addr[0]));
829             ubik_print
830                 ("ubik:server %s is back up: will be contacted through %s\n",
831                  buffer, afs_inet_ntoa(addr));
832
833             multi_Abort;
834         }
835     } multi_End_Ignore;
836
837     /* Destroy all connections except the one on which we succeeded */
838     for (j = 0; j < i; j++)
839         if (conns[j] != connSuccess)
840             rx_DestroyConnection(conns[j]);
841
842     if (!connSuccess)
843         ubik_dprint("ubik:server %s still down\n",
844                     afs_inet_ntoa(server->addr[0]));
845
846     if (connSuccess)
847         return 0;               /* success */
848     else
849         return 1;               /* failure */
850 }