ubik-warning-cleanup-20011005
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #include <time.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #include <sys/time.h>
23 #endif
24 #include <assert.h>
25 #include <lock.h>
26 #ifdef HAVE_STRING_H
27 #include <string.h>
28 #else
29 #ifdef HAVE_STRINGS_H
30 #include <strings.h>
31 #endif
32 #endif
33 #include <rx/xdr.h>
34 #include <rx/rx.h>
35 #include <errno.h>
36 #include <afs/afsutil.h>
37
38 #define UBIK_INTERNALS
39 #include "ubik.h"
40 #include "ubik_int.h"
41
42 /* This module is responsible for determining when the system has
43  * recovered to the point that it can handle new transactions.  It
44  * replays logs, polls to determine the current dbase after a crash,
45  * and distributes the new database to the others.
46  */
47
48 /* The sync site associates a version number with each database.  It
49  * broadcasts the version associated with its current dbase in every
50  * one of its beacon messages.  When the sync site send a dbase to a
51  * server, it also sends the db's version.  A non-sync site server can
52  * tell if it has the right dbase version by simply comparing the
53  * version from the beacon message (uvote_dbVersion) with the version
54  * associated with the database (ubik_dbase->version).  The sync site
55  * itself simply has one counter to keep track of all of this (again
56  * ubik_dbase->version).
57  */
58
59 /* sync site: routine called when the sync site loses its quorum; this
60  * procedure is called "up" from the beacon package.  It resyncs the
61  * dbase and nudges the recovery daemon to try to propagate out the
62  * changes.  It also resets the recovery daemon's state, since
63  * recovery must potentially find a new dbase to propagate out.  This
64  * routine should not do anything with variables used by non-sync site
65  * servers.
66  */ 
67
68 /* if this flag is set, then ubik will use only the primary address 
69 ** ( the address specified in the CellServDB) to contact other 
70 ** ubik servers. Ubik recovery will not try opening connections 
71 ** to the alternate interface addresses. 
72 */
73 int ubikPrimaryAddrOnly;
74
75 urecovery_ResetState() {
76     urecovery_state = 0;
77     LWP_NoYieldSignal(&urecovery_state);
78     return 0;
79 }
80
81 /* sync site: routine called when a non-sync site server goes down; restarts recovery
82  * process to send missing server the new db when it comes back up.
83  * This routine should not do anything with variables used by non-sync site servers. 
84  */
85 urecovery_LostServer() {
86     LWP_NoYieldSignal(&urecovery_state);
87     return 0;
88 }
89
90 /* return true iff we have a current database (called by both sync
91  * sites and non-sync sites) How do we determine this?  If we're the
92  * sync site, we wait until recovery has finished fetching and
93  * re-labelling its dbase (it may still be trying to propagate it out
94  * to everyone else; that's THEIR problem).  If we're not the sync
95  * site, then we must have a dbase labelled with the right version,
96  * and we must have a currently-good sync site.
97  */
98 urecovery_AllBetter(adbase, areadAny)
99     int areadAny;
100     register struct ubik_dbase *adbase; {
101     register afs_int32 rcode;
102
103     ubik_dprint("allbetter checking\n");
104     rcode = 0;
105     
106     
107     if (areadAny) {
108       if (ubik_dbase->version.epoch > 1)
109           rcode = 1;                      /* Happy with any good version of database */
110     }
111
112     /* Check if we're sync site and we've got the right data */
113     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
114         rcode = 1;
115     }
116
117     /* next, check if we're aux site, and we've ever been sent the
118      * right data (note that if a dbase update fails, we won't think
119      * that the sync site is still the sync site, 'cause it won't talk
120      * to us until a timeout period has gone by.  When we recover, we
121      * leave this clear until we get a new dbase */
122     else if ( (uvote_GetSyncSite() &&
123               (vcmp(ubik_dbVersion, ubik_dbase->version) == 0)) ) { /* && order is important */
124         rcode = 1;
125     }
126
127     ubik_dprint("allbetter: returning %d\n", rcode);
128     return rcode;
129 }
130
131 /* abort all transactions on this database */
132 urecovery_AbortAll(adbase)
133 struct ubik_dbase *adbase; {
134     register struct ubik_trans *tt;
135     for(tt = adbase->activeTrans; tt; tt=tt->next) {
136         udisk_abort(tt);
137     }
138     return 0;
139 }
140
141 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
142 urecovery_CheckTid(atid)
143     register struct ubik_tid *atid; {
144     if (ubik_currentTrans) {
145         /* there is remote write trans, see if we match, see if this
146          * is a new transaction */
147         if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
148             /* don't match, abort it */
149             /* If the thread is not waiting for lock - ok to end it */
150             if (ubik_currentTrans->locktype != LOCKWAIT) {
151                udisk_end(ubik_currentTrans);
152             }
153             ubik_currentTrans = (struct ubik_trans *) 0;
154         }
155     }
156 }
157
158 /* log format is defined here, and implicitly in disk.c
159  *
160  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
161  * are in logged in network standard byte order, in case we want to move logs
162  * from machine-to-machine someday.
163  *
164  * Begin transaction: opcode
165  * Commit transaction: opcode, version (8 bytes)
166  * Truncate file: opcode, file number, length
167  * Abort transaction: opcode
168  * Write data: opcode, file, position, length, <length> data bytes
169  *
170  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
171  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
172  * the commit record, then we allow data to be written through to the dbase.  In our particular
173  * implementation, once a transaction is done, we write out the pages to the database, so that
174  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
175  * any changed data while there is an uncommitted write transaction can be zapped during an
176  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
177  * the log.
178  */
179
180 /* replay logs */
181 static ReplayLog(adbase)
182     register struct ubik_dbase *adbase; {
183     afs_int32 opcode;
184     register afs_int32 code, tpos;
185     int logIsGood;
186     afs_int32 len, thisSize, tfile, filePos;
187     afs_int32 buffer[4];
188     afs_int32 syncFile = -1;
189     afs_int32 data[1024];
190
191     /* read the lock twice, once to see whether we have a transaction to deal
192         with that committed, (theoretically, we should support more than one
193         trans in the log at once, but not yet), and once replaying the
194         transactions.  */
195     tpos = 0;
196     logIsGood = 0;
197     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
198     while (1) {
199         code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
200         if (code != sizeof(afs_int32)) break;
201         if (opcode == LOGNEW) {
202             /* handle begin trans */
203             tpos += sizeof(afs_int32);
204         }
205         else if (opcode == LOGABORT) break;
206         else if (opcode == LOGEND) {
207             logIsGood = 1;
208             break;
209         }
210         else if (opcode == LOGTRUNCATE) {
211             tpos += 4;
212             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
213             if (code != 2*sizeof(afs_int32))    break;  /* premature eof or io error */
214             tpos += 2*sizeof(afs_int32);
215         }
216         else if (opcode == LOGDATA) {
217             tpos += 4;
218             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
219             if (code != 3*sizeof(afs_int32)) break;
220             /* otherwise, skip over the data bytes, too */
221             tpos += buffer[2] + 3*sizeof(afs_int32);
222         }
223         else {
224             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
225             break;      /* corrupt log! */
226         }
227     }
228     if (logIsGood) {
229         /* actually do the replay; log should go all the way through the commit record, since
230             we just read it above. */
231         tpos = 0;
232         logIsGood = 0;
233         syncFile = -1;
234         while (1) {
235             code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
236             if (code != sizeof(afs_int32)) break;
237             if (opcode == LOGNEW) {
238                 /* handle begin trans */
239                 tpos += sizeof(afs_int32);
240             }
241             else if (opcode == LOGABORT) panic("log abort\n");
242             else if (opcode == LOGEND) {
243                 tpos += 4;
244                 code = (*adbase->read) (adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
245                 if (code != 2*sizeof(afs_int32)) return UBADLOG;
246                 code = (*adbase->setlabel) (adbase, 0, buffer);
247                 if (code) return code;
248                 logIsGood = 1;
249                 break;      /* all done now */
250             }
251             else if (opcode == LOGTRUNCATE) {
252                 tpos += 4;
253                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
254                 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
255                 tpos += 2*sizeof(afs_int32);
256                 code = (*adbase->truncate) (adbase, ntohl(buffer[0]), ntohl(buffer[1]));
257                 if (code) return code;
258             }
259             else if (opcode == LOGDATA) {
260                 tpos += 4;
261                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
262                 if (code != 3*sizeof(afs_int32)) break;
263                 tpos += 3*sizeof(afs_int32);
264                 /* otherwise, skip over the data bytes, too */
265                 len = ntohl(buffer[2]);     /* total number of bytes to copy */
266                 filePos = ntohl(buffer[1]);
267                 tfile = ntohl(buffer[0]);
268                 /* try to minimize file syncs */
269                 if (syncFile != tfile) {
270                     if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
271                     else code = 0;
272                     syncFile = tfile;
273                     if (code) return code;
274                 }
275                 while (len > 0) {
276                     thisSize = (len > sizeof(data)? sizeof(data) : len);
277                     /* copy sizeof(data) buffer bytes at a time */
278                     code = (*adbase->read)(adbase, LOGFILE, data, tpos, thisSize);
279                     if (code != thisSize) return UBADLOG;
280                     code = (*adbase->write)(adbase, tfile, data, filePos, thisSize);
281                     if (code != thisSize) return UBADLOG;
282                     filePos += thisSize;
283                     tpos += thisSize;
284                     len -= thisSize;
285                 }
286             }
287             else {
288                 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
289                 break;  /* corrupt log! */
290             }
291         }
292         if (logIsGood) {
293             if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
294             if (code) return code;
295         }
296         else {
297             ubik_dprint("Log read error on pass 2\n");
298             return UBADLOG;
299         }
300     }
301
302     /* now truncate the log, we're done with it */
303     code = (*adbase->truncate)(adbase, LOGFILE, 0);
304     return code;
305 }
306
307 /* Called at initialization to figure out version of the dbase we really have.
308  * This routine is called after replaying the log; it reads the restored labels.
309  */
310 static InitializeDB(adbase)
311     register struct ubik_dbase *adbase; {
312     register afs_int32 code;
313     
314     code = (*adbase->getlabel)(adbase, 0, &adbase->version);
315     if (code) {
316         /* try setting the label to a new value */
317         adbase->version.epoch = 1;      /* value for newly-initialized db */
318         adbase->version.counter = 1;
319         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
320         if (code) {
321             /* failed, try to set it back */
322             adbase->version.epoch = 0;
323             adbase->version.counter = 0;
324             (*adbase->setlabel) (adbase, 0, &adbase->version);
325         }
326         LWP_NoYieldSignal(&adbase->version);
327     }
328     return 0;
329 }
330
331 /* initialize the local dbase
332  * We replay the logs and then read the resulting file to figure out what version we've really got.
333  */
334 urecovery_Initialize(adbase)
335     register struct ubik_dbase *adbase; {
336     register afs_int32 code;
337
338     code = ReplayLog(adbase);
339     if (code) return code;
340     code = InitializeDB(adbase);
341     return code;
342 }
343
344 /* Main interaction loop for the recovery manager
345  * The recovery light-weight process only runs when you're the
346  * synchronization site.  It performs the following tasks, if and only
347  * if the prerequisite tasks have been performed successfully (it
348  * keeps track of which ones have been performed in its bit map,
349  * urecovery_state).
350  *
351  * First, it is responsible for probing that all servers are up.  This
352  * is the only operation that must be performed even if this is not
353  * yet the sync site, since otherwise this site may not notice that
354  * enough other machines are running to even elect this guy to be the
355  * sync site.
356  *
357  * After that, the recovery process does nothing until the beacon and
358  * voting modules manage to get this site elected sync site.
359  *
360  * After becoming sync site, recovery first attempts to find the best
361  * database available in the network (it must do this in order to
362  * ensure finding the latest committed data).  After finding the right
363  * database, it must fetch this dbase to the sync site.
364  *
365  * After fetching the dbase, it relabels it with a new version number,
366  * to ensure that everyone recognizes this dbase as the most recent
367  * dbase.
368  *
369  * One the dbase has been relabelled, this machine can start handling
370  * requests.  However, the recovery module still has one more task:
371  * propagating the dbase out to everyone who is up in the network.
372  */
373 urecovery_Interact() {
374     afs_int32 code, tcode;
375     struct ubik_server *bestServer = NULL;
376     struct ubik_server *ts;
377     int dbok, doingRPC, now;
378     afs_int32 lastProbeTime, lastDBVCheck;
379     /* if we're the sync site, the best db version we've found yet */
380     static struct ubik_version bestDBVersion;
381     struct ubik_version tversion;
382     struct timeval tv;
383     int length, tlen, offset, file, nbytes;
384     struct rx_call *rxcall;
385     char tbuffer[256];
386     struct ubik_stat ubikstat;
387     struct in_addr inAddr;
388
389     /* otherwise, begin interaction */
390     urecovery_state = 0;
391     lastProbeTime = 0;
392     lastDBVCheck = 0;
393     while (1) {
394         /* Run through this loop every 4 seconds */
395         tv.tv_sec = 4;
396         tv.tv_usec = 0;
397         IOMGR_Select(0, 0, 0, 0, &tv);
398
399         ubik_dprint("recovery running in state %x\n", urecovery_state);
400
401         /* Every 30 seconds, check all the down servers and mark them
402          * as up if they respond. When a server comes up or found to
403          * not be current, then re-find the the best database and 
404          * propogate it.
405          */
406         if ( (now = FT_ApproxTime()) > 30 + lastProbeTime) {
407             for (ts=ubik_servers,doingRPC=0; ts; ts=ts->next) {
408                 if (!ts->up) {
409                     doingRPC = 1;
410                     code = DoProbe(ts); 
411                     if (code == 0) {
412                        ts->up = 1;
413                        urecovery_state &= ~UBIK_RECFOUNDDB;
414                     }
415                 } else if (!ts->currentDB) {
416                     urecovery_state &= ~UBIK_RECFOUNDDB;
417                 }
418             }
419             if (doingRPC) 
420                 now = FT_ApproxTime();
421             lastProbeTime = now;
422         }
423
424         /* Mark whether we are the sync site */
425         if (!ubeacon_AmSyncSite()) {
426             urecovery_state &= ~UBIK_RECSYNCSITE;
427             continue;           /* nothing to do */
428         }
429         urecovery_state |= UBIK_RECSYNCSITE;
430         
431         /* If a server has just come up or if we have not found the 
432          * most current database, then go find the most current db.
433          */
434         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
435             bestServer = (struct ubik_server *) 0;
436             bestDBVersion.epoch = 0;
437             bestDBVersion.counter = 0;
438             for(ts=ubik_servers; ts; ts=ts->next) {
439                 if (!ts->up) continue;  /* don't bother with these guys */
440                 if (ts->isClone) continue;
441                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
442                 if (code == 0) {
443                     /* perhaps this is the best version */
444                     if (vcmp(ts->version, bestDBVersion) > 0) {
445                         /* new best version */
446                         bestDBVersion = ts->version;
447                         bestServer = ts;
448                     }
449                 }
450             }
451             /* take into consideration our version. Remember if we,
452              * the sync site, have the best version. Also note that
453              * we may need to send the best version out.
454              */
455             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
456                 bestDBVersion = ubik_dbase->version;
457                 bestServer = (struct ubik_server *) 0;
458                urecovery_state |= UBIK_RECHAVEDB;
459             } else {
460                /* Clear the flag only when we know we have to retrieve
461                 * the db. Because urecovery_AllBetter() looks at it.
462                 */
463                urecovery_state &= ~UBIK_RECHAVEDB;
464             }
465             lastDBVCheck = FT_ApproxTime();
466             urecovery_state |=  UBIK_RECFOUNDDB;
467             urecovery_state &= ~UBIK_RECSENTDB;
468         }
469         if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
470
471         /* If we, the sync site, do not have the best db version, then
472          * go and get it from the server that does.
473          */
474         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
475            urecovery_state |= UBIK_RECHAVEDB;
476         } else {
477            /* we don't have the best version; we should fetch it. */
478            ObtainWriteLock(&ubik_dbase->versionLock);
479            urecovery_AbortAll(ubik_dbase);
480
481            /* Rx code to do the Bulk fetch */
482            file = 0;
483            offset = 0;
484            rxcall = rx_NewCall(bestServer->disk_rxcid);
485
486            ubik_print("Ubik: Synchronize database with server %s\n", afs_inet_ntoa(bestServer->addr[0]));
487
488            code = StartDISK_GetFile(rxcall, file);
489            if (code) { 
490               ubik_dprint("StartDiskGetFile failed=%d\n", code); 
491               goto FetchEndCall;
492            }
493            nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
494            length = ntohl(length);
495            if (nbytes != sizeof(afs_int32)) { 
496               ubik_dprint("Rx-read length error=%d\n", code=BULK_ERROR); 
497               code = EIO;
498               goto FetchEndCall;
499            }
500
501            /* Truncate the file firest */
502            code = (*ubik_dbase->truncate)(ubik_dbase, file, 0);
503            if (code) {
504               ubik_dprint("truncate io error=%d\n", code); 
505               goto FetchEndCall;
506            }
507
508            /* give invalid label during file transit */
509            tversion.epoch = 0;
510            tversion.counter = 0;
511            code = (*ubik_dbase->setlabel)(ubik_dbase, file, &tversion);
512            if (code) {
513               ubik_dprint("setlabel io error=%d\n", code); 
514               goto FetchEndCall;
515            }
516
517            while (length > 0)   {
518               tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
519               nbytes = rx_Read(rxcall, tbuffer, tlen);
520               if (nbytes != tlen) { 
521                  ubik_dprint("Rx-read bulk error=%d\n", code=BULK_ERROR); 
522                  code = EIO;
523                  goto FetchEndCall;
524               }
525               nbytes = (*ubik_dbase->write)(ubik_dbase, file, tbuffer, offset, tlen);
526               if (nbytes != tlen) {
527                  code = UIOERROR;
528                  goto FetchEndCall;
529               }
530               offset += tlen;
531               length -= tlen;
532            }
533            code = EndDISK_GetFile(rxcall, &tversion);
534 FetchEndCall:
535            tcode = rx_EndCall(rxcall, code);
536            if (!code) code = tcode;
537            if (!code) {
538               /* we got a new file, set up its header */
539               urecovery_state |= UBIK_RECHAVEDB;
540               memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
541               (*ubik_dbase->sync)(ubik_dbase, 0);       /* get data out first */
542               /* after data is good, sync disk with correct label */
543               code = (*ubik_dbase->setlabel)(ubik_dbase, 0, &ubik_dbase->version);
544            }
545            if (code) {
546               ubik_dbase->version.epoch = 0;
547               ubik_dbase->version.counter = 0;
548               ubik_print("Ubik: Synchronize database failed (error = %d)\n", code);
549            } else {
550               ubik_print("Ubik: Synchronize database completed\n");
551            }
552            udisk_Invalidate(ubik_dbase, 0);     /* data has changed */
553            LWP_NoYieldSignal(&ubik_dbase->version);
554            ReleaseWriteLock(&ubik_dbase->versionLock);
555         }
556         if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
557         
558         /* If the database was newly initialized, then when we establish quorum, write
559          * a new label. This allows urecovery_AllBetter() to allow access for reads.
560          * Setting it to 2 also allows another site to come along with a newer
561          * database and overwrite this one.
562          */
563         if (ubik_dbase->version.epoch == 1) {
564            ObtainWriteLock(&ubik_dbase->versionLock);
565            urecovery_AbortAll(ubik_dbase);
566            ubik_epochTime = 2;
567            ubik_dbase->version.epoch   = ubik_epochTime;
568            ubik_dbase->version.counter = 1;
569            code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
570            udisk_Invalidate(ubik_dbase, 0);           /* data may have changed */
571            LWP_NoYieldSignal(&ubik_dbase->version);
572            ReleaseWriteLock(&ubik_dbase->versionLock);
573         }
574
575         /* Check the other sites and send the database to them if they
576          * do not have the current db.
577          */
578         if (!(urecovery_state & UBIK_RECSENTDB)) {
579             /* now propagate out new version to everyone else */
580             dbok = 1;       /* start off assuming they all worked */
581
582             ObtainWriteLock(&ubik_dbase->versionLock);
583             /*
584              * Check if a write transaction is in progress. We can't send the
585              * db when a write is in progress here because the db would be
586              * obsolete as soon as it goes there. Also, ops after the begin
587              * trans would reach the recepient and wouldn't find a transaction
588              * pending there.  Frankly, I don't think it's possible to get past
589              * the write-lock above if there is a write transaction in progress,
590              * but then, it won't hurt to check, will it?
591              */
592             if (ubik_dbase->flags & DBWRITING) {
593               struct timeval tv;
594               int safety = 0;
595               tv.tv_sec =  0;
596               tv.tv_usec = 50000;
597               while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
598                 ReleaseWriteLock(&ubik_dbase->versionLock);     
599                 /* sleep for a little while */
600                 IOMGR_Select(0, 0, 0, 0, &tv);
601                 tv.tv_usec += 10000; safety++;
602                 ObtainWriteLock(&ubik_dbase->versionLock);                   
603               }
604             }
605
606             for(ts=ubik_servers; ts; ts=ts->next) {
607                 inAddr.s_addr = ts->addr[0];
608                 if (!ts->up) {
609                     ubik_dprint("recovery cannot send version to %s\n",
610                                         afs_inet_ntoa(inAddr.s_addr));
611                     dbok = 0;
612                     continue;
613                 }
614                 ubik_dprint("recovery sending version to %s\n",
615                             afs_inet_ntoa(inAddr.s_addr));
616                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
617                     ubik_dprint("recovery stating local database\n");
618
619                     /* Rx code to do the Bulk Store */
620                     code = (*ubik_dbase->stat)(ubik_dbase, 0, &ubikstat);
621                     if (!code) {
622                         length = ubikstat.size;
623                         file = offset = 0;
624                         rxcall = rx_NewCall(ts->disk_rxcid);
625                         code = StartDISK_SendFile(rxcall, file, length, &ubik_dbase->version);
626                         if (code) { 
627                             ubik_dprint("StartDiskSendFile failed=%d\n", code); 
628                             goto StoreEndCall;
629                         }
630                         while (length > 0)      {
631                             tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
632                             nbytes = (*ubik_dbase->read)(ubik_dbase, file, tbuffer, offset, tlen);
633                             if (nbytes != tlen) {
634                                 ubik_dprint("Local disk read error=%d\n", code=UIOERROR);
635                                 goto StoreEndCall;
636                             }
637                             nbytes = rx_Write(rxcall, tbuffer, tlen);
638                             if (nbytes != tlen) { 
639                                 ubik_dprint("Rx-write bulk error=%d\n", code=BULK_ERROR); 
640                                 goto StoreEndCall;
641                             }
642                             offset += tlen;
643                             length -= tlen;
644                         }
645                         code = EndDISK_SendFile(rxcall);
646 StoreEndCall:
647                         code = rx_EndCall(rxcall, code);
648                     }
649                     if (code == 0) {
650                         /* we set a new file, process its header */
651                         ts->version = ubik_dbase->version;
652                         ts->currentDB = 1;
653                     }
654                     else dbok = 0;
655                 }
656                 else {
657                     /* mark file up to date */
658                     ts->currentDB = 1;
659                 }
660             }
661             ReleaseWriteLock(&ubik_dbase->versionLock);
662             if (dbok) urecovery_state |= UBIK_RECSENTDB;
663         }
664     }
665 }
666
667 /*
668 ** send a Probe to all the network address of this server 
669 ** Return 0  if success, else return 1
670 */
671 DoProbe(server)
672 struct ubik_server *server;
673 {
674     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
675     struct rx_connection *connSuccess = 0;
676     int                 i, j;
677     afs_uint32          addr;
678     char                buffer[32]; 
679     extern afs_int32    ubikSecIndex;
680     extern struct rx_securityClass      *ubikSecClass;
681
682     for (i=0; (addr=server->addr[i]) && (i<UBIK_MAX_INTERFACE_ADDR);i++)
683     {
684         conns[i] = rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID, 
685                                         ubikSecClass, ubikSecIndex);
686
687                 /* user requirement to use only the primary interface */
688         if ( ubikPrimaryAddrOnly )
689         {
690             i = 1;
691             break;
692         }
693     }
694     assert(i);  /* at least one interface address for this server */
695
696     multi_Rx(conns,i)
697     {
698         multi_DISK_Probe();
699         if ( !multi_error )                    /* first success */
700         {
701             addr = server->addr[multi_i];     /* successful interface addr */
702
703             if ( server->disk_rxcid)          /* destroy existing conn */
704                 rx_DestroyConnection(server->disk_rxcid); 
705             if ( server->vote_rxcid)
706                 rx_DestroyConnection(server->vote_rxcid);
707
708                                               /* make new connections */
709             server->disk_rxcid = conns[multi_i];
710             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
711                 VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);/* for vote reqs*/
712
713             connSuccess        = conns[multi_i];
714             strcpy(buffer, (char*)afs_inet_ntoa(server->addr[0]));
715             ubik_print("ubik:server %s is back up: will be contacted through %s\n",
716                         buffer, afs_inet_ntoa(addr));
717
718             multi_Abort;
719         }
720     } multi_End_Ignore; 
721
722     /* Destroy all connections except the one on which we succeeded */
723     for ( j=0; j < i; j++)
724         if ( conns[j] != connSuccess )
725             rx_DestroyConnection(conns[j] );
726
727     if (!connSuccess)
728         ubik_dprint("ubik:server %s still down\n",afs_inet_ntoa(server->addr[0]));
729
730     if ( connSuccess ) return 0;    /* success */
731         else return 1;              /* failure */
732 }