c3aa09cf61ef9a9564d5f3aed85e586bd1237d16
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <sys/types.h>
12 #ifdef AFS_NT40_ENV
13 #include <winsock2.h>
14 #include <time.h>
15 #else
16 #include <sys/file.h>
17 #include <netinet/in.h>
18 #include <sys/time.h>
19 #endif
20 #include <assert.h>
21 #include <lock.h>
22 #include <rx/xdr.h>
23 #include <rx/rx.h>
24 #include <errno.h>
25
26 #define UBIK_INTERNALS
27 #include "ubik.h"
28 #include "ubik_int.h"
29
30 /* This module is responsible for determining when the system has
31  * recovered to the point that it can handle new transactions.  It
32  * replays logs, polls to determine the current dbase after a crash,
33  * and distributes the new database to the others.
34  */
35
36 /* The sync site associates a version number with each database.  It
37  * broadcasts the version associated with its current dbase in every
38  * one of its beacon messages.  When the sync site send a dbase to a
39  * server, it also sends the db's version.  A non-sync site server can
40  * tell if it has the right dbase version by simply comparing the
41  * version from the beacon message (uvote_dbVersion) with the version
42  * associated with the database (ubik_dbase->version).  The sync site
43  * itself simply has one counter to keep track of all of this (again
44  * ubik_dbase->version).
45  */
46
47 /* sync site: routine called when the sync site loses its quorum; this
48  * procedure is called "up" from the beacon package.  It resyncs the
49  * dbase and nudges the recovery daemon to try to propagate out the
50  * changes.  It also resets the recovery daemon's state, since
51  * recovery must potentially find a new dbase to propagate out.  This
52  * routine should not do anything with variables used by non-sync site
53  * servers.
54  */ 
55
56 /* if this flag is set, then ubik will use only the primary address 
57 ** ( the address specified in the CellServDB) to contact other 
58 ** ubik servers. Ubik recovery will not try opening connections 
59 ** to the alternate interface addresses. 
60 */
61 int ubikPrimaryAddrOnly;
62
63 urecovery_ResetState() {
64     urecovery_state = 0;
65     LWP_NoYieldSignal(&urecovery_state);
66     return 0;
67 }
68
69 /* sync site: routine called when a non-sync site server goes down; restarts recovery
70  * process to send missing server the new db when it comes back up.
71  * This routine should not do anything with variables used by non-sync site servers. 
72  */
73 urecovery_LostServer() {
74     LWP_NoYieldSignal(&urecovery_state);
75     return 0;
76 }
77
78 /* return true iff we have a current database (called by both sync
79  * sites and non-sync sites) How do we determine this?  If we're the
80  * sync site, we wait until recovery has finished fetching and
81  * re-labelling its dbase (it may still be trying to propagate it out
82  * to everyone else; that's THEIR problem).  If we're not the sync
83  * site, then we must have a dbase labelled with the right version,
84  * and we must have a currently-good sync site.
85  */
86 urecovery_AllBetter(adbase, areadAny)
87     int areadAny;
88     register struct ubik_dbase *adbase; {
89     register afs_int32 rcode;
90
91     ubik_dprint("allbetter checking\n");
92     rcode = 0;
93     
94     
95     if (areadAny) {
96       if (ubik_dbase->version.epoch > 1)
97           rcode = 1;                      /* Happy with any good version of database */
98     }
99
100     /* Check if we're sync site and we've got the right data */
101     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
102         rcode = 1;
103     }
104
105     /* next, check if we're aux site, and we've ever been sent the
106      * right data (note that if a dbase update fails, we won't think
107      * that the sync site is still the sync site, 'cause it won't talk
108      * to us until a timeout period has gone by.  When we recover, we
109      * leave this clear until we get a new dbase */
110     else if ( (uvote_GetSyncSite() &&
111               (vcmp(ubik_dbVersion, ubik_dbase->version) == 0)) ) { /* && order is important */
112         rcode = 1;
113     }
114
115     ubik_dprint("allbetter: returning %d\n", rcode);
116     return rcode;
117 }
118
119 /* abort all transactions on this database */
120 urecovery_AbortAll(adbase)
121 struct ubik_dbase *adbase; {
122     register struct ubik_trans *tt;
123     for(tt = adbase->activeTrans; tt; tt=tt->next) {
124         udisk_abort(tt);
125     }
126     return 0;
127 }
128
129 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
130 urecovery_CheckTid(atid)
131     register struct ubik_tid *atid; {
132     if (ubik_currentTrans) {
133         /* there is remote write trans, see if we match, see if this
134          * is a new transaction */
135         if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
136             /* don't match, abort it */
137             /* If the thread is not waiting for lock - ok to end it */
138             if (ubik_currentTrans->locktype != LOCKWAIT) {
139                udisk_end(ubik_currentTrans);
140             }
141             ubik_currentTrans = (struct ubik_trans *) 0;
142         }
143     }
144 }
145
146 /* log format is defined here, and implicitly in disk.c
147  *
148  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
149  * are in logged in network standard byte order, in case we want to move logs
150  * from machine-to-machine someday.
151  *
152  * Begin transaction: opcode
153  * Commit transaction: opcode, version (8 bytes)
154  * Truncate file: opcode, file number, length
155  * Abort transaction: opcode
156  * Write data: opcode, file, position, length, <length> data bytes
157  *
158  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
159  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
160  * the commit record, then we allow data to be written through to the dbase.  In our particular
161  * implementation, once a transaction is done, we write out the pages to the database, so that
162  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
163  * any changed data while there is an uncommitted write transaction can be zapped during an
164  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
165  * the log.
166  */
167
168 /* replay logs */
169 static ReplayLog(adbase)
170     register struct ubik_dbase *adbase; {
171     afs_int32 opcode;
172     register afs_int32 code, tpos;
173     int logIsGood;
174     afs_int32 len, thisSize, tfile, filePos;
175     afs_int32 buffer[4];
176     afs_int32 syncFile = -1;
177     afs_int32 data[1024];
178
179     /* read the lock twice, once to see whether we have a transaction to deal
180         with that committed, (theoretically, we should support more than one
181         trans in the log at once, but not yet), and once replaying the
182         transactions.  */
183     tpos = 0;
184     logIsGood = 0;
185     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
186     while (1) {
187         code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
188         if (code != sizeof(afs_int32)) break;
189         if (opcode == LOGNEW) {
190             /* handle begin trans */
191             tpos += sizeof(afs_int32);
192         }
193         else if (opcode == LOGABORT) break;
194         else if (opcode == LOGEND) {
195             logIsGood = 1;
196             break;
197         }
198         else if (opcode == LOGTRUNCATE) {
199             tpos += 4;
200             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
201             if (code != 2*sizeof(afs_int32))    break;  /* premature eof or io error */
202             tpos += 2*sizeof(afs_int32);
203         }
204         else if (opcode == LOGDATA) {
205             tpos += 4;
206             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
207             if (code != 3*sizeof(afs_int32)) break;
208             /* otherwise, skip over the data bytes, too */
209             tpos += buffer[2] + 3*sizeof(afs_int32);
210         }
211         else {
212             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
213             break;      /* corrupt log! */
214         }
215     }
216     if (logIsGood) {
217         /* actually do the replay; log should go all the way through the commit record, since
218             we just read it above. */
219         tpos = 0;
220         logIsGood = 0;
221         syncFile = -1;
222         while (1) {
223             code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
224             if (code != sizeof(afs_int32)) break;
225             if (opcode == LOGNEW) {
226                 /* handle begin trans */
227                 tpos += sizeof(afs_int32);
228             }
229             else if (opcode == LOGABORT) panic("log abort\n");
230             else if (opcode == LOGEND) {
231                 tpos += 4;
232                 code = (*adbase->read) (adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
233                 if (code != 2*sizeof(afs_int32)) return UBADLOG;
234                 code = (*adbase->setlabel) (adbase, 0, buffer);
235                 if (code) return code;
236                 logIsGood = 1;
237                 break;      /* all done now */
238             }
239             else if (opcode == LOGTRUNCATE) {
240                 tpos += 4;
241                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
242                 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
243                 tpos += 2*sizeof(afs_int32);
244                 code = (*adbase->truncate) (adbase, ntohl(buffer[0]), ntohl(buffer[1]));
245                 if (code) return code;
246             }
247             else if (opcode == LOGDATA) {
248                 tpos += 4;
249                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
250                 if (code != 3*sizeof(afs_int32)) break;
251                 tpos += 3*sizeof(afs_int32);
252                 /* otherwise, skip over the data bytes, too */
253                 len = ntohl(buffer[2]);     /* total number of bytes to copy */
254                 filePos = ntohl(buffer[1]);
255                 tfile = ntohl(buffer[0]);
256                 /* try to minimize file syncs */
257                 if (syncFile != tfile) {
258                     if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
259                     else code = 0;
260                     syncFile = tfile;
261                     if (code) return code;
262                 }
263                 while (len > 0) {
264                     thisSize = (len > sizeof(data)? sizeof(data) : len);
265                     /* copy sizeof(data) buffer bytes at a time */
266                     code = (*adbase->read)(adbase, LOGFILE, data, tpos, thisSize);
267                     if (code != thisSize) return UBADLOG;
268                     code = (*adbase->write)(adbase, tfile, data, filePos, thisSize);
269                     if (code != thisSize) return UBADLOG;
270                     filePos += thisSize;
271                     tpos += thisSize;
272                     len -= thisSize;
273                 }
274             }
275             else {
276                 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
277                 break;  /* corrupt log! */
278             }
279         }
280         if (logIsGood) {
281             if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
282             if (code) return code;
283         }
284         else {
285             ubik_dprint("Log read error on pass 2\n");
286             return UBADLOG;
287         }
288     }
289
290     /* now truncate the log, we're done with it */
291     code = (*adbase->truncate)(adbase, LOGFILE, 0);
292     return code;
293 }
294
295 /* Called at initialization to figure out version of the dbase we really have.
296  * This routine is called after replaying the log; it reads the restored labels.
297  */
298 static InitializeDB(adbase)
299     register struct ubik_dbase *adbase; {
300     register afs_int32 code;
301     
302     code = (*adbase->getlabel)(adbase, 0, &adbase->version);
303     if (code) {
304         /* try setting the label to a new value */
305         adbase->version.epoch = 1;      /* value for newly-initialized db */
306         adbase->version.counter = 1;
307         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
308         if (code) {
309             /* failed, try to set it back */
310             adbase->version.epoch = 0;
311             adbase->version.counter = 0;
312             (*adbase->setlabel) (adbase, 0, &adbase->version);
313         }
314         LWP_NoYieldSignal(&adbase->version);
315     }
316     return 0;
317 }
318
319 /* initialize the local dbase
320  * We replay the logs and then read the resulting file to figure out what version we've really got.
321  */
322 urecovery_Initialize(adbase)
323     register struct ubik_dbase *adbase; {
324     register afs_int32 code;
325
326     code = ReplayLog(adbase);
327     if (code) return code;
328     code = InitializeDB(adbase);
329     return code;
330 }
331
332 /* Main interaction loop for the recovery manager
333  * The recovery light-weight process only runs when you're the
334  * synchronization site.  It performs the following tasks, if and only
335  * if the prerequisite tasks have been performed successfully (it
336  * keeps track of which ones have been performed in its bit map,
337  * urecovery_state).
338  *
339  * First, it is responsible for probing that all servers are up.  This
340  * is the only operation that must be performed even if this is not
341  * yet the sync site, since otherwise this site may not notice that
342  * enough other machines are running to even elect this guy to be the
343  * sync site.
344  *
345  * After that, the recovery process does nothing until the beacon and
346  * voting modules manage to get this site elected sync site.
347  *
348  * After becoming sync site, recovery first attempts to find the best
349  * database available in the network (it must do this in order to
350  * ensure finding the latest committed data).  After finding the right
351  * database, it must fetch this dbase to the sync site.
352  *
353  * After fetching the dbase, it relabels it with a new version number,
354  * to ensure that everyone recognizes this dbase as the most recent
355  * dbase.
356  *
357  * One the dbase has been relabelled, this machine can start handling
358  * requests.  However, the recovery module still has one more task:
359  * propagating the dbase out to everyone who is up in the network.
360  */
361 urecovery_Interact() {
362     afs_int32 code, tcode;
363     struct ubik_server *bestServer;
364     struct ubik_server *ts;
365     int dbok, doingRPC, now;
366     afs_int32 lastProbeTime, lastDBVCheck;
367     /* if we're the sync site, the best db version we've found yet */
368     static struct ubik_version bestDBVersion;
369     struct ubik_version tversion;
370     struct timeval tv;
371     int length, tlen, offset, file, nbytes;
372     struct rx_call *rxcall;
373     char tbuffer[256];
374     struct ubik_stat ubikstat;
375     struct in_addr inAddr;
376
377     /* otherwise, begin interaction */
378     urecovery_state = 0;
379     lastProbeTime = 0;
380     lastDBVCheck = 0;
381     while (1) {
382         /* Run through this loop every 4 seconds */
383         tv.tv_sec = 4;
384         tv.tv_usec = 0;
385         IOMGR_Select(0, 0, 0, 0, &tv);
386
387         ubik_dprint("recovery running in state %x\n", urecovery_state);
388
389         /* Every 30 seconds, check all the down servers and mark them
390          * as up if they respond. When a server comes up or found to
391          * not be current, then re-find the the best database and 
392          * propogate it.
393          */
394         if ( (now = FT_ApproxTime()) > 30 + lastProbeTime) {
395             for (ts=ubik_servers,doingRPC=0; ts; ts=ts->next) {
396                 if (!ts->up) {
397                     doingRPC = 1;
398                     code = DoProbe(ts); 
399                     if (code == 0) {
400                        ts->up = 1;
401                        urecovery_state &= ~UBIK_RECFOUNDDB;
402                     }
403                 } else if (!ts->currentDB) {
404                     urecovery_state &= ~UBIK_RECFOUNDDB;
405                 }
406             }
407             if (doingRPC) 
408                 now = FT_ApproxTime();
409             lastProbeTime = now;
410         }
411
412         /* Mark whether we are the sync site */
413         if (!ubeacon_AmSyncSite()) {
414             urecovery_state &= ~UBIK_RECSYNCSITE;
415             continue;           /* nothing to do */
416         }
417         urecovery_state |= UBIK_RECSYNCSITE;
418         
419         /* If a server has just come up or if we have not found the 
420          * most current database, then go find the most current db.
421          */
422         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
423             bestServer = (struct ubik_server *) 0;
424             bestDBVersion.epoch = 0;
425             bestDBVersion.counter = 0;
426             for(ts=ubik_servers; ts; ts=ts->next) {
427                 if (!ts->up) continue;  /* don't bother with these guys */
428                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
429                 if (code == 0) {
430                     /* perhaps this is the best version */
431                     if (vcmp(ts->version, bestDBVersion) > 0) {
432                         /* new best version */
433                         bestDBVersion = ts->version;
434                         bestServer = ts;
435                     }
436                 }
437             }
438             /* take into consideration our version. Remember if we,
439              * the sync site, have the best version. Also note that
440              * we may need to send the best version out.
441              */
442             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
443                 bestDBVersion = ubik_dbase->version;
444                 bestServer = (struct ubik_server *) 0;
445                urecovery_state |= UBIK_RECHAVEDB;
446             } else {
447                /* Clear the flag only when we know we have to retrieve
448                 * the db. Because urecovery_AllBetter() looks at it.
449                 */
450                urecovery_state &= ~UBIK_RECHAVEDB;
451             }
452             lastDBVCheck = FT_ApproxTime();
453             urecovery_state |=  UBIK_RECFOUNDDB;
454             urecovery_state &= ~UBIK_RECSENTDB;
455         }
456         if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
457
458         /* If we, the sync site, do not have the best db version, then
459          * go and get it from the server that does.
460          */
461         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
462            urecovery_state |= UBIK_RECHAVEDB;
463         } else {
464            /* we don't have the best version; we should fetch it. */
465            ObtainWriteLock(&ubik_dbase->versionLock);
466            urecovery_AbortAll(ubik_dbase);
467
468            /* Rx code to do the Bulk fetch */
469            file = 0;
470            offset = 0;
471            rxcall = rx_NewCall(bestServer->disk_rxcid);
472
473            ubik_print("Ubik: Synchronize database with server %s\n", afs_inet_ntoa(bestServer->addr[0]));
474
475            code = StartDISK_GetFile(rxcall, file);
476            if (code) { 
477               ubik_dprint("StartDiskGetFile failed=%d\n", code); 
478               goto FetchEndCall;
479            }
480            nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
481            length = ntohl(length);
482            if (nbytes != sizeof(afs_int32)) { 
483               ubik_dprint("Rx-read length error=%d\n", code=BULK_ERROR); 
484               code = EIO;
485               goto FetchEndCall;
486            }
487
488            /* Truncate the file firest */
489            code = (*ubik_dbase->truncate)(ubik_dbase, file, 0);
490            if (code) {
491               ubik_dprint("truncate io error=%d\n", code); 
492               goto FetchEndCall;
493            }
494
495            /* give invalid label during file transit */
496            tversion.epoch = 0;
497            tversion.counter = 0;
498            code = (*ubik_dbase->setlabel)(ubik_dbase, file, &tversion);
499            if (code) {
500               ubik_dprint("setlabel io error=%d\n", code); 
501               goto FetchEndCall;
502            }
503
504            while (length > 0)   {
505               tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
506               nbytes = rx_Read(rxcall, tbuffer, tlen);
507               if (nbytes != tlen) { 
508                  ubik_dprint("Rx-read bulk error=%d\n", code=BULK_ERROR); 
509                  code = EIO;
510                  goto FetchEndCall;
511               }
512               nbytes = (*ubik_dbase->write)(ubik_dbase, file, tbuffer, offset, tlen);
513               if (nbytes != tlen) {
514                  code = UIOERROR;
515                  goto FetchEndCall;
516               }
517               offset += tlen;
518               length -= tlen;
519            }
520            code = EndDISK_GetFile(rxcall, &tversion);
521 FetchEndCall:
522            tcode = rx_EndCall(rxcall, code);
523            if (!code) code = tcode;
524            if (!code) {
525               /* we got a new file, set up its header */
526               urecovery_state |= UBIK_RECHAVEDB;
527               bcopy(&tversion, &ubik_dbase->version, sizeof(struct ubik_version));
528               (*ubik_dbase->sync)(ubik_dbase, 0);       /* get data out first */
529               /* after data is good, sync disk with correct label */
530               code = (*ubik_dbase->setlabel)(ubik_dbase, 0, &ubik_dbase->version);
531            }
532            if (code) {
533               ubik_dbase->version.epoch = 0;
534               ubik_dbase->version.counter = 0;
535               ubik_print("Ubik: Synchronize database failed (error = %d)\n", code);
536            } else {
537               ubik_print("Ubik: Synchronize database completed\n");
538            }
539            udisk_Invalidate(ubik_dbase, 0);     /* data has changed */
540            LWP_NoYieldSignal(&ubik_dbase->version);
541            ReleaseWriteLock(&ubik_dbase->versionLock);
542         }
543         if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
544         
545         /* If the database was newly initialized, then when we establish quorum, write
546          * a new label. This allows urecovery_AllBetter() to allow access for reads.
547          * Setting it to 2 also allows another site to come along with a newer
548          * database and overwrite this one.
549          */
550         if (ubik_dbase->version.epoch == 1) {
551            ObtainWriteLock(&ubik_dbase->versionLock);
552            urecovery_AbortAll(ubik_dbase);
553            ubik_epochTime = 2;
554            ubik_dbase->version.epoch   = ubik_epochTime;
555            ubik_dbase->version.counter = 1;
556            code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
557            udisk_Invalidate(ubik_dbase, 0);           /* data may have changed */
558            LWP_NoYieldSignal(&ubik_dbase->version);
559            ReleaseWriteLock(&ubik_dbase->versionLock);
560         }
561
562         /* Check the other sites and send the database to them if they
563          * do not have the current db.
564          */
565         if (!(urecovery_state & UBIK_RECSENTDB)) {
566             /* now propagate out new version to everyone else */
567             dbok = 1;       /* start off assuming they all worked */
568
569             ObtainWriteLock(&ubik_dbase->versionLock);
570             /*
571              * Check if a write transaction is in progress. We can't send the
572              * db when a write is in progress here because the db would be
573              * obsolete as soon as it goes there. Also, ops after the begin
574              * trans would reach the recepient and wouldn't find a transaction
575              * pending there.  Frankly, I don't think it's possible to get past
576              * the write-lock above if there is a write transaction in progress,
577              * but then, it won't hurt to check, will it?
578              */
579             if (ubik_dbase->flags & DBWRITING) {
580               struct timeval tv;
581               int safety = 0;
582               tv.tv_sec =  0;
583               tv.tv_usec = 50000;
584               while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
585                 ReleaseWriteLock(&ubik_dbase->versionLock);     
586                 /* sleep for a little while */
587                 IOMGR_Select(0, 0, 0, 0, &tv);
588                 tv.tv_usec += 10000; safety++;
589                 ObtainWriteLock(&ubik_dbase->versionLock);                   
590               }
591             }
592
593             for(ts=ubik_servers; ts; ts=ts->next) {
594                 inAddr.s_addr = ts->addr[0];
595                 if (!ts->up) {
596                     ubik_dprint("recovery cannot send version to %s\n",
597                                         afs_inet_ntoa(inAddr.s_addr));
598                     dbok = 0;
599                     continue;
600                 }
601                 ubik_dprint("recovery sending version to %s\n",
602                             afs_inet_ntoa(inAddr.s_addr));
603                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
604                     ubik_dprint("recovery stating local database\n");
605
606                     /* Rx code to do the Bulk Store */
607                     code = (*ubik_dbase->stat)(ubik_dbase, 0, &ubikstat);
608                     if (!code) {
609                         length = ubikstat.size;
610                         file = offset = 0;
611                         rxcall = rx_NewCall(ts->disk_rxcid);
612                         code = StartDISK_SendFile(rxcall, file, length, &ubik_dbase->version);
613                         if (code) { 
614                             ubik_dprint("StartDiskSendFile failed=%d\n", code); 
615                             goto StoreEndCall;
616                         }
617                         while (length > 0)      {
618                             tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
619                             nbytes = (*ubik_dbase->read)(ubik_dbase, file, tbuffer, offset, tlen);
620                             if (nbytes != tlen) {
621                                 ubik_dprint("Local disk read error=%d\n", code=UIOERROR);
622                                 goto StoreEndCall;
623                             }
624                             nbytes = rx_Write(rxcall, tbuffer, tlen);
625                             if (nbytes != tlen) { 
626                                 ubik_dprint("Rx-write bulk error=%d\n", code=BULK_ERROR); 
627                                 goto StoreEndCall;
628                             }
629                             offset += tlen;
630                             length -= tlen;
631                         }
632                         code = EndDISK_SendFile(rxcall);
633 StoreEndCall:
634                         code = rx_EndCall(rxcall, code);
635                     }
636                     if (code == 0) {
637                         /* we set a new file, process its header */
638                         ts->version = ubik_dbase->version;
639                         ts->currentDB = 1;
640                     }
641                     else dbok = 0;
642                 }
643                 else {
644                     /* mark file up to date */
645                     ts->currentDB = 1;
646                 }
647             }
648             ReleaseWriteLock(&ubik_dbase->versionLock);
649             if (dbok) urecovery_state |= UBIK_RECSENTDB;
650         }
651     }
652 }
653
654 /*
655 ** send a Probe to all the network address of this server 
656 ** Return 0  if success, else return 1
657 */
658 DoProbe(server)
659 struct ubik_server *server;
660 {
661     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
662     struct rx_connection *connSuccess = 0;
663     int                 i, j;
664     afs_uint32          addr;
665     char                buffer[32]; 
666     extern afs_int32    ubikSecIndex;
667     extern struct rx_securityClass      *ubikSecClass;
668
669     for (i=0; (addr=server->addr[i]) && (i<UBIK_MAX_INTERFACE_ADDR);i++)
670     {
671         conns[i] = rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID, 
672                                         ubikSecClass, ubikSecIndex);
673
674                 /* user requirement to use only the primary interface */
675         if ( ubikPrimaryAddrOnly )
676         {
677             i = 1;
678             break;
679         }
680     }
681     assert(i);  /* at least one interface address for this server */
682
683     multi_Rx(conns,i)
684     {
685         multi_DISK_Probe();
686         if ( !multi_error )                    /* first success */
687         {
688             addr = server->addr[multi_i];     /* successful interface addr */
689
690             if ( server->disk_rxcid)          /* destroy existing conn */
691                 rx_DestroyConnection(server->disk_rxcid); 
692             if ( server->vote_rxcid)
693                 rx_DestroyConnection(server->vote_rxcid);
694
695                                               /* make new connections */
696             server->disk_rxcid = conns[multi_i];
697             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
698                 VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);/* for vote reqs*/
699
700             connSuccess        = conns[multi_i];
701             strcpy(buffer, (char*)afs_inet_ntoa(server->addr[0]));
702             ubik_print("ubik:server %s is back up: will be contacted through %s\n",
703                         buffer, afs_inet_ntoa(addr));
704
705             multi_Abort;
706         }
707     } multi_End_Ignore; 
708
709     /* Destroy all connections except the one on which we succeeded */
710     for ( j=0; j < i; j++)
711         if ( conns[j] != connSuccess )
712             rx_DestroyConnection(conns[j] );
713
714     if (!connSuccess)
715         ubik_dprint("ubik:server %s still down\n",afs_inet_ntoa(server->addr[0]));
716
717     if ( connSuccess ) return 0;    /* success */
718         else return 1;              /* failure */
719 }