b5d6409bd8e34f0b4cb3db97b307e195b46a1bbb
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #include <time.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #include <sys/time.h>
23 #endif
24 #include <assert.h>
25 #include <lock.h>
26 #include <rx/xdr.h>
27 #include <rx/rx.h>
28 #include <errno.h>
29 #include <afs/afsutil.h>
30
31 #define UBIK_INTERNALS
32 #include "ubik.h"
33 #include "ubik_int.h"
34
35 /* This module is responsible for determining when the system has
36  * recovered to the point that it can handle new transactions.  It
37  * replays logs, polls to determine the current dbase after a crash,
38  * and distributes the new database to the others.
39  */
40
41 /* The sync site associates a version number with each database.  It
42  * broadcasts the version associated with its current dbase in every
43  * one of its beacon messages.  When the sync site send a dbase to a
44  * server, it also sends the db's version.  A non-sync site server can
45  * tell if it has the right dbase version by simply comparing the
46  * version from the beacon message (uvote_dbVersion) with the version
47  * associated with the database (ubik_dbase->version).  The sync site
48  * itself simply has one counter to keep track of all of this (again
49  * ubik_dbase->version).
50  */
51
52 /* sync site: routine called when the sync site loses its quorum; this
53  * procedure is called "up" from the beacon package.  It resyncs the
54  * dbase and nudges the recovery daemon to try to propagate out the
55  * changes.  It also resets the recovery daemon's state, since
56  * recovery must potentially find a new dbase to propagate out.  This
57  * routine should not do anything with variables used by non-sync site
58  * servers.
59  */ 
60
61 /* if this flag is set, then ubik will use only the primary address 
62 ** ( the address specified in the CellServDB) to contact other 
63 ** ubik servers. Ubik recovery will not try opening connections 
64 ** to the alternate interface addresses. 
65 */
66 int ubikPrimaryAddrOnly;
67
68 urecovery_ResetState() {
69     urecovery_state = 0;
70     LWP_NoYieldSignal(&urecovery_state);
71     return 0;
72 }
73
74 /* sync site: routine called when a non-sync site server goes down; restarts recovery
75  * process to send missing server the new db when it comes back up.
76  * This routine should not do anything with variables used by non-sync site servers. 
77  */
78 urecovery_LostServer() {
79     LWP_NoYieldSignal(&urecovery_state);
80     return 0;
81 }
82
83 /* return true iff we have a current database (called by both sync
84  * sites and non-sync sites) How do we determine this?  If we're the
85  * sync site, we wait until recovery has finished fetching and
86  * re-labelling its dbase (it may still be trying to propagate it out
87  * to everyone else; that's THEIR problem).  If we're not the sync
88  * site, then we must have a dbase labelled with the right version,
89  * and we must have a currently-good sync site.
90  */
91 urecovery_AllBetter(adbase, areadAny)
92     int areadAny;
93     register struct ubik_dbase *adbase; {
94     register afs_int32 rcode;
95
96     ubik_dprint("allbetter checking\n");
97     rcode = 0;
98     
99     
100     if (areadAny) {
101       if (ubik_dbase->version.epoch > 1)
102           rcode = 1;                      /* Happy with any good version of database */
103     }
104
105     /* Check if we're sync site and we've got the right data */
106     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
107         rcode = 1;
108     }
109
110     /* next, check if we're aux site, and we've ever been sent the
111      * right data (note that if a dbase update fails, we won't think
112      * that the sync site is still the sync site, 'cause it won't talk
113      * to us until a timeout period has gone by.  When we recover, we
114      * leave this clear until we get a new dbase */
115     else if ( (uvote_GetSyncSite() &&
116               (vcmp(ubik_dbVersion, ubik_dbase->version) == 0)) ) { /* && order is important */
117         rcode = 1;
118     }
119
120     ubik_dprint("allbetter: returning %d\n", rcode);
121     return rcode;
122 }
123
124 /* abort all transactions on this database */
125 urecovery_AbortAll(adbase)
126 struct ubik_dbase *adbase; {
127     register struct ubik_trans *tt;
128     for(tt = adbase->activeTrans; tt; tt=tt->next) {
129         udisk_abort(tt);
130     }
131     return 0;
132 }
133
134 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
135 urecovery_CheckTid(atid)
136     register struct ubik_tid *atid; {
137     if (ubik_currentTrans) {
138         /* there is remote write trans, see if we match, see if this
139          * is a new transaction */
140         if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
141             /* don't match, abort it */
142             /* If the thread is not waiting for lock - ok to end it */
143             if (ubik_currentTrans->locktype != LOCKWAIT) {
144                udisk_end(ubik_currentTrans);
145             }
146             ubik_currentTrans = (struct ubik_trans *) 0;
147         }
148     }
149 }
150
151 /* log format is defined here, and implicitly in disk.c
152  *
153  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
154  * are in logged in network standard byte order, in case we want to move logs
155  * from machine-to-machine someday.
156  *
157  * Begin transaction: opcode
158  * Commit transaction: opcode, version (8 bytes)
159  * Truncate file: opcode, file number, length
160  * Abort transaction: opcode
161  * Write data: opcode, file, position, length, <length> data bytes
162  *
163  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
164  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
165  * the commit record, then we allow data to be written through to the dbase.  In our particular
166  * implementation, once a transaction is done, we write out the pages to the database, so that
167  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
168  * any changed data while there is an uncommitted write transaction can be zapped during an
169  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
170  * the log.
171  */
172
173 /* replay logs */
174 static ReplayLog(adbase)
175     register struct ubik_dbase *adbase; {
176     afs_int32 opcode;
177     register afs_int32 code, tpos;
178     int logIsGood;
179     afs_int32 len, thisSize, tfile, filePos;
180     afs_int32 buffer[4];
181     afs_int32 syncFile = -1;
182     afs_int32 data[1024];
183
184     /* read the lock twice, once to see whether we have a transaction to deal
185         with that committed, (theoretically, we should support more than one
186         trans in the log at once, but not yet), and once replaying the
187         transactions.  */
188     tpos = 0;
189     logIsGood = 0;
190     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
191     while (1) {
192         code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
193         if (code != sizeof(afs_int32)) break;
194         if (opcode == LOGNEW) {
195             /* handle begin trans */
196             tpos += sizeof(afs_int32);
197         }
198         else if (opcode == LOGABORT) break;
199         else if (opcode == LOGEND) {
200             logIsGood = 1;
201             break;
202         }
203         else if (opcode == LOGTRUNCATE) {
204             tpos += 4;
205             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
206             if (code != 2*sizeof(afs_int32))    break;  /* premature eof or io error */
207             tpos += 2*sizeof(afs_int32);
208         }
209         else if (opcode == LOGDATA) {
210             tpos += 4;
211             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
212             if (code != 3*sizeof(afs_int32)) break;
213             /* otherwise, skip over the data bytes, too */
214             tpos += buffer[2] + 3*sizeof(afs_int32);
215         }
216         else {
217             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
218             break;      /* corrupt log! */
219         }
220     }
221     if (logIsGood) {
222         /* actually do the replay; log should go all the way through the commit record, since
223             we just read it above. */
224         tpos = 0;
225         logIsGood = 0;
226         syncFile = -1;
227         while (1) {
228             code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
229             if (code != sizeof(afs_int32)) break;
230             if (opcode == LOGNEW) {
231                 /* handle begin trans */
232                 tpos += sizeof(afs_int32);
233             }
234             else if (opcode == LOGABORT) panic("log abort\n");
235             else if (opcode == LOGEND) {
236                 tpos += 4;
237                 code = (*adbase->read) (adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
238                 if (code != 2*sizeof(afs_int32)) return UBADLOG;
239                 code = (*adbase->setlabel) (adbase, 0, buffer);
240                 if (code) return code;
241                 logIsGood = 1;
242                 break;      /* all done now */
243             }
244             else if (opcode == LOGTRUNCATE) {
245                 tpos += 4;
246                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
247                 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
248                 tpos += 2*sizeof(afs_int32);
249                 code = (*adbase->truncate) (adbase, ntohl(buffer[0]), ntohl(buffer[1]));
250                 if (code) return code;
251             }
252             else if (opcode == LOGDATA) {
253                 tpos += 4;
254                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
255                 if (code != 3*sizeof(afs_int32)) break;
256                 tpos += 3*sizeof(afs_int32);
257                 /* otherwise, skip over the data bytes, too */
258                 len = ntohl(buffer[2]);     /* total number of bytes to copy */
259                 filePos = ntohl(buffer[1]);
260                 tfile = ntohl(buffer[0]);
261                 /* try to minimize file syncs */
262                 if (syncFile != tfile) {
263                     if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
264                     else code = 0;
265                     syncFile = tfile;
266                     if (code) return code;
267                 }
268                 while (len > 0) {
269                     thisSize = (len > sizeof(data)? sizeof(data) : len);
270                     /* copy sizeof(data) buffer bytes at a time */
271                     code = (*adbase->read)(adbase, LOGFILE, data, tpos, thisSize);
272                     if (code != thisSize) return UBADLOG;
273                     code = (*adbase->write)(adbase, tfile, data, filePos, thisSize);
274                     if (code != thisSize) return UBADLOG;
275                     filePos += thisSize;
276                     tpos += thisSize;
277                     len -= thisSize;
278                 }
279             }
280             else {
281                 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
282                 break;  /* corrupt log! */
283             }
284         }
285         if (logIsGood) {
286             if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
287             if (code) return code;
288         }
289         else {
290             ubik_dprint("Log read error on pass 2\n");
291             return UBADLOG;
292         }
293     }
294
295     /* now truncate the log, we're done with it */
296     code = (*adbase->truncate)(adbase, LOGFILE, 0);
297     return code;
298 }
299
300 /* Called at initialization to figure out version of the dbase we really have.
301  * This routine is called after replaying the log; it reads the restored labels.
302  */
303 static InitializeDB(adbase)
304     register struct ubik_dbase *adbase; {
305     register afs_int32 code;
306     
307     code = (*adbase->getlabel)(adbase, 0, &adbase->version);
308     if (code) {
309         /* try setting the label to a new value */
310         adbase->version.epoch = 1;      /* value for newly-initialized db */
311         adbase->version.counter = 1;
312         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
313         if (code) {
314             /* failed, try to set it back */
315             adbase->version.epoch = 0;
316             adbase->version.counter = 0;
317             (*adbase->setlabel) (adbase, 0, &adbase->version);
318         }
319         LWP_NoYieldSignal(&adbase->version);
320     }
321     return 0;
322 }
323
324 /* initialize the local dbase
325  * We replay the logs and then read the resulting file to figure out what version we've really got.
326  */
327 urecovery_Initialize(adbase)
328     register struct ubik_dbase *adbase; {
329     register afs_int32 code;
330
331     code = ReplayLog(adbase);
332     if (code) return code;
333     code = InitializeDB(adbase);
334     return code;
335 }
336
337 /* Main interaction loop for the recovery manager
338  * The recovery light-weight process only runs when you're the
339  * synchronization site.  It performs the following tasks, if and only
340  * if the prerequisite tasks have been performed successfully (it
341  * keeps track of which ones have been performed in its bit map,
342  * urecovery_state).
343  *
344  * First, it is responsible for probing that all servers are up.  This
345  * is the only operation that must be performed even if this is not
346  * yet the sync site, since otherwise this site may not notice that
347  * enough other machines are running to even elect this guy to be the
348  * sync site.
349  *
350  * After that, the recovery process does nothing until the beacon and
351  * voting modules manage to get this site elected sync site.
352  *
353  * After becoming sync site, recovery first attempts to find the best
354  * database available in the network (it must do this in order to
355  * ensure finding the latest committed data).  After finding the right
356  * database, it must fetch this dbase to the sync site.
357  *
358  * After fetching the dbase, it relabels it with a new version number,
359  * to ensure that everyone recognizes this dbase as the most recent
360  * dbase.
361  *
362  * One the dbase has been relabelled, this machine can start handling
363  * requests.  However, the recovery module still has one more task:
364  * propagating the dbase out to everyone who is up in the network.
365  */
366 urecovery_Interact() {
367     afs_int32 code, tcode;
368     struct ubik_server *bestServer;
369     struct ubik_server *ts;
370     int dbok, doingRPC, now;
371     afs_int32 lastProbeTime, lastDBVCheck;
372     /* if we're the sync site, the best db version we've found yet */
373     static struct ubik_version bestDBVersion;
374     struct ubik_version tversion;
375     struct timeval tv;
376     int length, tlen, offset, file, nbytes;
377     struct rx_call *rxcall;
378     char tbuffer[256];
379     struct ubik_stat ubikstat;
380     struct in_addr inAddr;
381
382     /* otherwise, begin interaction */
383     urecovery_state = 0;
384     lastProbeTime = 0;
385     lastDBVCheck = 0;
386     while (1) {
387         /* Run through this loop every 4 seconds */
388         tv.tv_sec = 4;
389         tv.tv_usec = 0;
390         IOMGR_Select(0, 0, 0, 0, &tv);
391
392         ubik_dprint("recovery running in state %x\n", urecovery_state);
393
394         /* Every 30 seconds, check all the down servers and mark them
395          * as up if they respond. When a server comes up or found to
396          * not be current, then re-find the the best database and 
397          * propogate it.
398          */
399         if ( (now = FT_ApproxTime()) > 30 + lastProbeTime) {
400             for (ts=ubik_servers,doingRPC=0; ts; ts=ts->next) {
401                 if (!ts->up) {
402                     doingRPC = 1;
403                     code = DoProbe(ts); 
404                     if (code == 0) {
405                        ts->up = 1;
406                        urecovery_state &= ~UBIK_RECFOUNDDB;
407                     }
408                 } else if (!ts->currentDB) {
409                     urecovery_state &= ~UBIK_RECFOUNDDB;
410                 }
411             }
412             if (doingRPC) 
413                 now = FT_ApproxTime();
414             lastProbeTime = now;
415         }
416
417         /* Mark whether we are the sync site */
418         if (!ubeacon_AmSyncSite()) {
419             urecovery_state &= ~UBIK_RECSYNCSITE;
420             continue;           /* nothing to do */
421         }
422         urecovery_state |= UBIK_RECSYNCSITE;
423         
424         /* If a server has just come up or if we have not found the 
425          * most current database, then go find the most current db.
426          */
427         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
428             bestServer = (struct ubik_server *) 0;
429             bestDBVersion.epoch = 0;
430             bestDBVersion.counter = 0;
431             for(ts=ubik_servers; ts; ts=ts->next) {
432                 if (!ts->up) continue;  /* don't bother with these guys */
433                 if (ts->isClone) continue;
434                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
435                 if (code == 0) {
436                     /* perhaps this is the best version */
437                     if (vcmp(ts->version, bestDBVersion) > 0) {
438                         /* new best version */
439                         bestDBVersion = ts->version;
440                         bestServer = ts;
441                     }
442                 }
443             }
444             /* take into consideration our version. Remember if we,
445              * the sync site, have the best version. Also note that
446              * we may need to send the best version out.
447              */
448             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
449                 bestDBVersion = ubik_dbase->version;
450                 bestServer = (struct ubik_server *) 0;
451                urecovery_state |= UBIK_RECHAVEDB;
452             } else {
453                /* Clear the flag only when we know we have to retrieve
454                 * the db. Because urecovery_AllBetter() looks at it.
455                 */
456                urecovery_state &= ~UBIK_RECHAVEDB;
457             }
458             lastDBVCheck = FT_ApproxTime();
459             urecovery_state |=  UBIK_RECFOUNDDB;
460             urecovery_state &= ~UBIK_RECSENTDB;
461         }
462         if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
463
464         /* If we, the sync site, do not have the best db version, then
465          * go and get it from the server that does.
466          */
467         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
468            urecovery_state |= UBIK_RECHAVEDB;
469         } else {
470            /* we don't have the best version; we should fetch it. */
471            ObtainWriteLock(&ubik_dbase->versionLock);
472            urecovery_AbortAll(ubik_dbase);
473
474            /* Rx code to do the Bulk fetch */
475            file = 0;
476            offset = 0;
477            rxcall = rx_NewCall(bestServer->disk_rxcid);
478
479            ubik_print("Ubik: Synchronize database with server %s\n", afs_inet_ntoa(bestServer->addr[0]));
480
481            code = StartDISK_GetFile(rxcall, file);
482            if (code) { 
483               ubik_dprint("StartDiskGetFile failed=%d\n", code); 
484               goto FetchEndCall;
485            }
486            nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
487            length = ntohl(length);
488            if (nbytes != sizeof(afs_int32)) { 
489               ubik_dprint("Rx-read length error=%d\n", code=BULK_ERROR); 
490               code = EIO;
491               goto FetchEndCall;
492            }
493
494            /* Truncate the file firest */
495            code = (*ubik_dbase->truncate)(ubik_dbase, file, 0);
496            if (code) {
497               ubik_dprint("truncate io error=%d\n", code); 
498               goto FetchEndCall;
499            }
500
501            /* give invalid label during file transit */
502            tversion.epoch = 0;
503            tversion.counter = 0;
504            code = (*ubik_dbase->setlabel)(ubik_dbase, file, &tversion);
505            if (code) {
506               ubik_dprint("setlabel io error=%d\n", code); 
507               goto FetchEndCall;
508            }
509
510            while (length > 0)   {
511               tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
512               nbytes = rx_Read(rxcall, tbuffer, tlen);
513               if (nbytes != tlen) { 
514                  ubik_dprint("Rx-read bulk error=%d\n", code=BULK_ERROR); 
515                  code = EIO;
516                  goto FetchEndCall;
517               }
518               nbytes = (*ubik_dbase->write)(ubik_dbase, file, tbuffer, offset, tlen);
519               if (nbytes != tlen) {
520                  code = UIOERROR;
521                  goto FetchEndCall;
522               }
523               offset += tlen;
524               length -= tlen;
525            }
526            code = EndDISK_GetFile(rxcall, &tversion);
527 FetchEndCall:
528            tcode = rx_EndCall(rxcall, code);
529            if (!code) code = tcode;
530            if (!code) {
531               /* we got a new file, set up its header */
532               urecovery_state |= UBIK_RECHAVEDB;
533               memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
534               (*ubik_dbase->sync)(ubik_dbase, 0);       /* get data out first */
535               /* after data is good, sync disk with correct label */
536               code = (*ubik_dbase->setlabel)(ubik_dbase, 0, &ubik_dbase->version);
537            }
538            if (code) {
539               ubik_dbase->version.epoch = 0;
540               ubik_dbase->version.counter = 0;
541               ubik_print("Ubik: Synchronize database failed (error = %d)\n", code);
542            } else {
543               ubik_print("Ubik: Synchronize database completed\n");
544            }
545            udisk_Invalidate(ubik_dbase, 0);     /* data has changed */
546            LWP_NoYieldSignal(&ubik_dbase->version);
547            ReleaseWriteLock(&ubik_dbase->versionLock);
548         }
549         if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
550         
551         /* If the database was newly initialized, then when we establish quorum, write
552          * a new label. This allows urecovery_AllBetter() to allow access for reads.
553          * Setting it to 2 also allows another site to come along with a newer
554          * database and overwrite this one.
555          */
556         if (ubik_dbase->version.epoch == 1) {
557            ObtainWriteLock(&ubik_dbase->versionLock);
558            urecovery_AbortAll(ubik_dbase);
559            ubik_epochTime = 2;
560            ubik_dbase->version.epoch   = ubik_epochTime;
561            ubik_dbase->version.counter = 1;
562            code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
563            udisk_Invalidate(ubik_dbase, 0);           /* data may have changed */
564            LWP_NoYieldSignal(&ubik_dbase->version);
565            ReleaseWriteLock(&ubik_dbase->versionLock);
566         }
567
568         /* Check the other sites and send the database to them if they
569          * do not have the current db.
570          */
571         if (!(urecovery_state & UBIK_RECSENTDB)) {
572             /* now propagate out new version to everyone else */
573             dbok = 1;       /* start off assuming they all worked */
574
575             ObtainWriteLock(&ubik_dbase->versionLock);
576             /*
577              * Check if a write transaction is in progress. We can't send the
578              * db when a write is in progress here because the db would be
579              * obsolete as soon as it goes there. Also, ops after the begin
580              * trans would reach the recepient and wouldn't find a transaction
581              * pending there.  Frankly, I don't think it's possible to get past
582              * the write-lock above if there is a write transaction in progress,
583              * but then, it won't hurt to check, will it?
584              */
585             if (ubik_dbase->flags & DBWRITING) {
586               struct timeval tv;
587               int safety = 0;
588               tv.tv_sec =  0;
589               tv.tv_usec = 50000;
590               while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
591                 ReleaseWriteLock(&ubik_dbase->versionLock);     
592                 /* sleep for a little while */
593                 IOMGR_Select(0, 0, 0, 0, &tv);
594                 tv.tv_usec += 10000; safety++;
595                 ObtainWriteLock(&ubik_dbase->versionLock);                   
596               }
597             }
598
599             for(ts=ubik_servers; ts; ts=ts->next) {
600                 inAddr.s_addr = ts->addr[0];
601                 if (!ts->up) {
602                     ubik_dprint("recovery cannot send version to %s\n",
603                                         afs_inet_ntoa(inAddr.s_addr));
604                     dbok = 0;
605                     continue;
606                 }
607                 ubik_dprint("recovery sending version to %s\n",
608                             afs_inet_ntoa(inAddr.s_addr));
609                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
610                     ubik_dprint("recovery stating local database\n");
611
612                     /* Rx code to do the Bulk Store */
613                     code = (*ubik_dbase->stat)(ubik_dbase, 0, &ubikstat);
614                     if (!code) {
615                         length = ubikstat.size;
616                         file = offset = 0;
617                         rxcall = rx_NewCall(ts->disk_rxcid);
618                         code = StartDISK_SendFile(rxcall, file, length, &ubik_dbase->version);
619                         if (code) { 
620                             ubik_dprint("StartDiskSendFile failed=%d\n", code); 
621                             goto StoreEndCall;
622                         }
623                         while (length > 0)      {
624                             tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
625                             nbytes = (*ubik_dbase->read)(ubik_dbase, file, tbuffer, offset, tlen);
626                             if (nbytes != tlen) {
627                                 ubik_dprint("Local disk read error=%d\n", code=UIOERROR);
628                                 goto StoreEndCall;
629                             }
630                             nbytes = rx_Write(rxcall, tbuffer, tlen);
631                             if (nbytes != tlen) { 
632                                 ubik_dprint("Rx-write bulk error=%d\n", code=BULK_ERROR); 
633                                 goto StoreEndCall;
634                             }
635                             offset += tlen;
636                             length -= tlen;
637                         }
638                         code = EndDISK_SendFile(rxcall);
639 StoreEndCall:
640                         code = rx_EndCall(rxcall, code);
641                     }
642                     if (code == 0) {
643                         /* we set a new file, process its header */
644                         ts->version = ubik_dbase->version;
645                         ts->currentDB = 1;
646                     }
647                     else dbok = 0;
648                 }
649                 else {
650                     /* mark file up to date */
651                     ts->currentDB = 1;
652                 }
653             }
654             ReleaseWriteLock(&ubik_dbase->versionLock);
655             if (dbok) urecovery_state |= UBIK_RECSENTDB;
656         }
657     }
658 }
659
660 /*
661 ** send a Probe to all the network address of this server 
662 ** Return 0  if success, else return 1
663 */
664 DoProbe(server)
665 struct ubik_server *server;
666 {
667     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
668     struct rx_connection *connSuccess = 0;
669     int                 i, j;
670     afs_uint32          addr;
671     char                buffer[32]; 
672     extern afs_int32    ubikSecIndex;
673     extern struct rx_securityClass      *ubikSecClass;
674
675     for (i=0; (addr=server->addr[i]) && (i<UBIK_MAX_INTERFACE_ADDR);i++)
676     {
677         conns[i] = rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID, 
678                                         ubikSecClass, ubikSecIndex);
679
680                 /* user requirement to use only the primary interface */
681         if ( ubikPrimaryAddrOnly )
682         {
683             i = 1;
684             break;
685         }
686     }
687     assert(i);  /* at least one interface address for this server */
688
689     multi_Rx(conns,i)
690     {
691         multi_DISK_Probe();
692         if ( !multi_error )                    /* first success */
693         {
694             addr = server->addr[multi_i];     /* successful interface addr */
695
696             if ( server->disk_rxcid)          /* destroy existing conn */
697                 rx_DestroyConnection(server->disk_rxcid); 
698             if ( server->vote_rxcid)
699                 rx_DestroyConnection(server->vote_rxcid);
700
701                                               /* make new connections */
702             server->disk_rxcid = conns[multi_i];
703             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
704                 VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);/* for vote reqs*/
705
706             connSuccess        = conns[multi_i];
707             strcpy(buffer, (char*)afs_inet_ntoa(server->addr[0]));
708             ubik_print("ubik:server %s is back up: will be contacted through %s\n",
709                         buffer, afs_inet_ntoa(addr));
710
711             multi_Abort;
712         }
713     } multi_End_Ignore; 
714
715     /* Destroy all connections except the one on which we succeeded */
716     for ( j=0; j < i; j++)
717         if ( conns[j] != connSuccess )
718             rx_DestroyConnection(conns[j] );
719
720     if (!connSuccess)
721         ubik_dprint("ubik:server %s still down\n",afs_inet_ntoa(server->addr[0]));
722
723     if ( connSuccess ) return 0;    /* success */
724         else return 1;              /* failure */
725 }