ubik-clone-support-20010212
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <sys/types.h>
12 #ifdef AFS_NT40_ENV
13 #include <winsock2.h>
14 #include <time.h>
15 #else
16 #include <sys/file.h>
17 #include <netinet/in.h>
18 #include <sys/time.h>
19 #endif
20 #include <assert.h>
21 #include <lock.h>
22 #include <rx/xdr.h>
23 #include <rx/rx.h>
24 #include <errno.h>
25
26 #define UBIK_INTERNALS
27 #include "ubik.h"
28 #include "ubik_int.h"
29
30 /* This module is responsible for determining when the system has
31  * recovered to the point that it can handle new transactions.  It
32  * replays logs, polls to determine the current dbase after a crash,
33  * and distributes the new database to the others.
34  */
35
36 /* The sync site associates a version number with each database.  It
37  * broadcasts the version associated with its current dbase in every
38  * one of its beacon messages.  When the sync site send a dbase to a
39  * server, it also sends the db's version.  A non-sync site server can
40  * tell if it has the right dbase version by simply comparing the
41  * version from the beacon message (uvote_dbVersion) with the version
42  * associated with the database (ubik_dbase->version).  The sync site
43  * itself simply has one counter to keep track of all of this (again
44  * ubik_dbase->version).
45  */
46
47 /* sync site: routine called when the sync site loses its quorum; this
48  * procedure is called "up" from the beacon package.  It resyncs the
49  * dbase and nudges the recovery daemon to try to propagate out the
50  * changes.  It also resets the recovery daemon's state, since
51  * recovery must potentially find a new dbase to propagate out.  This
52  * routine should not do anything with variables used by non-sync site
53  * servers.
54  */ 
55
56 /* if this flag is set, then ubik will use only the primary address 
57 ** ( the address specified in the CellServDB) to contact other 
58 ** ubik servers. Ubik recovery will not try opening connections 
59 ** to the alternate interface addresses. 
60 */
61 int ubikPrimaryAddrOnly;
62
63 urecovery_ResetState() {
64     urecovery_state = 0;
65     LWP_NoYieldSignal(&urecovery_state);
66     return 0;
67 }
68
69 /* sync site: routine called when a non-sync site server goes down; restarts recovery
70  * process to send missing server the new db when it comes back up.
71  * This routine should not do anything with variables used by non-sync site servers. 
72  */
73 urecovery_LostServer() {
74     LWP_NoYieldSignal(&urecovery_state);
75     return 0;
76 }
77
78 /* return true iff we have a current database (called by both sync
79  * sites and non-sync sites) How do we determine this?  If we're the
80  * sync site, we wait until recovery has finished fetching and
81  * re-labelling its dbase (it may still be trying to propagate it out
82  * to everyone else; that's THEIR problem).  If we're not the sync
83  * site, then we must have a dbase labelled with the right version,
84  * and we must have a currently-good sync site.
85  */
86 urecovery_AllBetter(adbase, areadAny)
87     int areadAny;
88     register struct ubik_dbase *adbase; {
89     register afs_int32 rcode;
90
91     ubik_dprint("allbetter checking\n");
92     rcode = 0;
93     
94     
95     if (areadAny) {
96       if (ubik_dbase->version.epoch > 1)
97           rcode = 1;                      /* Happy with any good version of database */
98     }
99
100     /* Check if we're sync site and we've got the right data */
101     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
102         rcode = 1;
103     }
104
105     /* next, check if we're aux site, and we've ever been sent the
106      * right data (note that if a dbase update fails, we won't think
107      * that the sync site is still the sync site, 'cause it won't talk
108      * to us until a timeout period has gone by.  When we recover, we
109      * leave this clear until we get a new dbase */
110     else if ( (uvote_GetSyncSite() &&
111               (vcmp(ubik_dbVersion, ubik_dbase->version) == 0)) ) { /* && order is important */
112         rcode = 1;
113     }
114
115     ubik_dprint("allbetter: returning %d\n", rcode);
116     return rcode;
117 }
118
119 /* abort all transactions on this database */
120 urecovery_AbortAll(adbase)
121 struct ubik_dbase *adbase; {
122     register struct ubik_trans *tt;
123     for(tt = adbase->activeTrans; tt; tt=tt->next) {
124         udisk_abort(tt);
125     }
126     return 0;
127 }
128
129 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
130 urecovery_CheckTid(atid)
131     register struct ubik_tid *atid; {
132     if (ubik_currentTrans) {
133         /* there is remote write trans, see if we match, see if this
134          * is a new transaction */
135         if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
136             /* don't match, abort it */
137             /* If the thread is not waiting for lock - ok to end it */
138             if (ubik_currentTrans->locktype != LOCKWAIT) {
139                udisk_end(ubik_currentTrans);
140             }
141             ubik_currentTrans = (struct ubik_trans *) 0;
142         }
143     }
144 }
145
146 /* log format is defined here, and implicitly in disk.c
147  *
148  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
149  * are in logged in network standard byte order, in case we want to move logs
150  * from machine-to-machine someday.
151  *
152  * Begin transaction: opcode
153  * Commit transaction: opcode, version (8 bytes)
154  * Truncate file: opcode, file number, length
155  * Abort transaction: opcode
156  * Write data: opcode, file, position, length, <length> data bytes
157  *
158  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
159  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
160  * the commit record, then we allow data to be written through to the dbase.  In our particular
161  * implementation, once a transaction is done, we write out the pages to the database, so that
162  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
163  * any changed data while there is an uncommitted write transaction can be zapped during an
164  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
165  * the log.
166  */
167
168 /* replay logs */
169 static ReplayLog(adbase)
170     register struct ubik_dbase *adbase; {
171     afs_int32 opcode;
172     register afs_int32 code, tpos;
173     int logIsGood;
174     afs_int32 len, thisSize, tfile, filePos;
175     afs_int32 buffer[4];
176     afs_int32 syncFile = -1;
177     afs_int32 data[1024];
178
179     /* read the lock twice, once to see whether we have a transaction to deal
180         with that committed, (theoretically, we should support more than one
181         trans in the log at once, but not yet), and once replaying the
182         transactions.  */
183     tpos = 0;
184     logIsGood = 0;
185     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
186     while (1) {
187         code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
188         if (code != sizeof(afs_int32)) break;
189         if (opcode == LOGNEW) {
190             /* handle begin trans */
191             tpos += sizeof(afs_int32);
192         }
193         else if (opcode == LOGABORT) break;
194         else if (opcode == LOGEND) {
195             logIsGood = 1;
196             break;
197         }
198         else if (opcode == LOGTRUNCATE) {
199             tpos += 4;
200             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
201             if (code != 2*sizeof(afs_int32))    break;  /* premature eof or io error */
202             tpos += 2*sizeof(afs_int32);
203         }
204         else if (opcode == LOGDATA) {
205             tpos += 4;
206             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
207             if (code != 3*sizeof(afs_int32)) break;
208             /* otherwise, skip over the data bytes, too */
209             tpos += buffer[2] + 3*sizeof(afs_int32);
210         }
211         else {
212             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
213             break;      /* corrupt log! */
214         }
215     }
216     if (logIsGood) {
217         /* actually do the replay; log should go all the way through the commit record, since
218             we just read it above. */
219         tpos = 0;
220         logIsGood = 0;
221         syncFile = -1;
222         while (1) {
223             code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
224             if (code != sizeof(afs_int32)) break;
225             if (opcode == LOGNEW) {
226                 /* handle begin trans */
227                 tpos += sizeof(afs_int32);
228             }
229             else if (opcode == LOGABORT) panic("log abort\n");
230             else if (opcode == LOGEND) {
231                 tpos += 4;
232                 code = (*adbase->read) (adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
233                 if (code != 2*sizeof(afs_int32)) return UBADLOG;
234                 code = (*adbase->setlabel) (adbase, 0, buffer);
235                 if (code) return code;
236                 logIsGood = 1;
237                 break;      /* all done now */
238             }
239             else if (opcode == LOGTRUNCATE) {
240                 tpos += 4;
241                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
242                 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
243                 tpos += 2*sizeof(afs_int32);
244                 code = (*adbase->truncate) (adbase, ntohl(buffer[0]), ntohl(buffer[1]));
245                 if (code) return code;
246             }
247             else if (opcode == LOGDATA) {
248                 tpos += 4;
249                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
250                 if (code != 3*sizeof(afs_int32)) break;
251                 tpos += 3*sizeof(afs_int32);
252                 /* otherwise, skip over the data bytes, too */
253                 len = ntohl(buffer[2]);     /* total number of bytes to copy */
254                 filePos = ntohl(buffer[1]);
255                 tfile = ntohl(buffer[0]);
256                 /* try to minimize file syncs */
257                 if (syncFile != tfile) {
258                     if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
259                     else code = 0;
260                     syncFile = tfile;
261                     if (code) return code;
262                 }
263                 while (len > 0) {
264                     thisSize = (len > sizeof(data)? sizeof(data) : len);
265                     /* copy sizeof(data) buffer bytes at a time */
266                     code = (*adbase->read)(adbase, LOGFILE, data, tpos, thisSize);
267                     if (code != thisSize) return UBADLOG;
268                     code = (*adbase->write)(adbase, tfile, data, filePos, thisSize);
269                     if (code != thisSize) return UBADLOG;
270                     filePos += thisSize;
271                     tpos += thisSize;
272                     len -= thisSize;
273                 }
274             }
275             else {
276                 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
277                 break;  /* corrupt log! */
278             }
279         }
280         if (logIsGood) {
281             if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
282             if (code) return code;
283         }
284         else {
285             ubik_dprint("Log read error on pass 2\n");
286             return UBADLOG;
287         }
288     }
289
290     /* now truncate the log, we're done with it */
291     code = (*adbase->truncate)(adbase, LOGFILE, 0);
292     return code;
293 }
294
295 /* Called at initialization to figure out version of the dbase we really have.
296  * This routine is called after replaying the log; it reads the restored labels.
297  */
298 static InitializeDB(adbase)
299     register struct ubik_dbase *adbase; {
300     register afs_int32 code;
301     
302     code = (*adbase->getlabel)(adbase, 0, &adbase->version);
303     if (code) {
304         /* try setting the label to a new value */
305         adbase->version.epoch = 1;      /* value for newly-initialized db */
306         adbase->version.counter = 1;
307         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
308         if (code) {
309             /* failed, try to set it back */
310             adbase->version.epoch = 0;
311             adbase->version.counter = 0;
312             (*adbase->setlabel) (adbase, 0, &adbase->version);
313         }
314         LWP_NoYieldSignal(&adbase->version);
315     }
316     return 0;
317 }
318
319 /* initialize the local dbase
320  * We replay the logs and then read the resulting file to figure out what version we've really got.
321  */
322 urecovery_Initialize(adbase)
323     register struct ubik_dbase *adbase; {
324     register afs_int32 code;
325
326     code = ReplayLog(adbase);
327     if (code) return code;
328     code = InitializeDB(adbase);
329     return code;
330 }
331
332 /* Main interaction loop for the recovery manager
333  * The recovery light-weight process only runs when you're the
334  * synchronization site.  It performs the following tasks, if and only
335  * if the prerequisite tasks have been performed successfully (it
336  * keeps track of which ones have been performed in its bit map,
337  * urecovery_state).
338  *
339  * First, it is responsible for probing that all servers are up.  This
340  * is the only operation that must be performed even if this is not
341  * yet the sync site, since otherwise this site may not notice that
342  * enough other machines are running to even elect this guy to be the
343  * sync site.
344  *
345  * After that, the recovery process does nothing until the beacon and
346  * voting modules manage to get this site elected sync site.
347  *
348  * After becoming sync site, recovery first attempts to find the best
349  * database available in the network (it must do this in order to
350  * ensure finding the latest committed data).  After finding the right
351  * database, it must fetch this dbase to the sync site.
352  *
353  * After fetching the dbase, it relabels it with a new version number,
354  * to ensure that everyone recognizes this dbase as the most recent
355  * dbase.
356  *
357  * One the dbase has been relabelled, this machine can start handling
358  * requests.  However, the recovery module still has one more task:
359  * propagating the dbase out to everyone who is up in the network.
360  */
361 urecovery_Interact() {
362     afs_int32 code, tcode;
363     struct ubik_server *bestServer;
364     struct ubik_server *ts;
365     int dbok, doingRPC, now;
366     afs_int32 lastProbeTime, lastDBVCheck;
367     /* if we're the sync site, the best db version we've found yet */
368     static struct ubik_version bestDBVersion;
369     struct ubik_version tversion;
370     struct timeval tv;
371     int length, tlen, offset, file, nbytes;
372     struct rx_call *rxcall;
373     char tbuffer[256];
374     struct ubik_stat ubikstat;
375     struct in_addr inAddr;
376
377     /* otherwise, begin interaction */
378     urecovery_state = 0;
379     lastProbeTime = 0;
380     lastDBVCheck = 0;
381     while (1) {
382         /* Run through this loop every 4 seconds */
383         tv.tv_sec = 4;
384         tv.tv_usec = 0;
385         IOMGR_Select(0, 0, 0, 0, &tv);
386
387         ubik_dprint("recovery running in state %x\n", urecovery_state);
388
389         /* Every 30 seconds, check all the down servers and mark them
390          * as up if they respond. When a server comes up or found to
391          * not be current, then re-find the the best database and 
392          * propogate it.
393          */
394         if ( (now = FT_ApproxTime()) > 30 + lastProbeTime) {
395             for (ts=ubik_servers,doingRPC=0; ts; ts=ts->next) {
396                 if (!ts->up) {
397                     doingRPC = 1;
398                     code = DoProbe(ts); 
399                     if (code == 0) {
400                        ts->up = 1;
401                        urecovery_state &= ~UBIK_RECFOUNDDB;
402                     }
403                 } else if (!ts->currentDB) {
404                     urecovery_state &= ~UBIK_RECFOUNDDB;
405                 }
406             }
407             if (doingRPC) 
408                 now = FT_ApproxTime();
409             lastProbeTime = now;
410         }
411
412         /* Mark whether we are the sync site */
413         if (!ubeacon_AmSyncSite()) {
414             urecovery_state &= ~UBIK_RECSYNCSITE;
415             continue;           /* nothing to do */
416         }
417         urecovery_state |= UBIK_RECSYNCSITE;
418         
419         /* If a server has just come up or if we have not found the 
420          * most current database, then go find the most current db.
421          */
422         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
423             bestServer = (struct ubik_server *) 0;
424             bestDBVersion.epoch = 0;
425             bestDBVersion.counter = 0;
426             for(ts=ubik_servers; ts; ts=ts->next) {
427                 if (!ts->up) continue;  /* don't bother with these guys */
428                 if (ts->isClone) continue;
429                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
430                 if (code == 0) {
431                     /* perhaps this is the best version */
432                     if (vcmp(ts->version, bestDBVersion) > 0) {
433                         /* new best version */
434                         bestDBVersion = ts->version;
435                         bestServer = ts;
436                     }
437                 }
438             }
439             /* take into consideration our version. Remember if we,
440              * the sync site, have the best version. Also note that
441              * we may need to send the best version out.
442              */
443             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
444                 bestDBVersion = ubik_dbase->version;
445                 bestServer = (struct ubik_server *) 0;
446                urecovery_state |= UBIK_RECHAVEDB;
447             } else {
448                /* Clear the flag only when we know we have to retrieve
449                 * the db. Because urecovery_AllBetter() looks at it.
450                 */
451                urecovery_state &= ~UBIK_RECHAVEDB;
452             }
453             lastDBVCheck = FT_ApproxTime();
454             urecovery_state |=  UBIK_RECFOUNDDB;
455             urecovery_state &= ~UBIK_RECSENTDB;
456         }
457         if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
458
459         /* If we, the sync site, do not have the best db version, then
460          * go and get it from the server that does.
461          */
462         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
463            urecovery_state |= UBIK_RECHAVEDB;
464         } else {
465            /* we don't have the best version; we should fetch it. */
466            ObtainWriteLock(&ubik_dbase->versionLock);
467            urecovery_AbortAll(ubik_dbase);
468
469            /* Rx code to do the Bulk fetch */
470            file = 0;
471            offset = 0;
472            rxcall = rx_NewCall(bestServer->disk_rxcid);
473
474            ubik_print("Ubik: Synchronize database with server %s\n", afs_inet_ntoa(bestServer->addr[0]));
475
476            code = StartDISK_GetFile(rxcall, file);
477            if (code) { 
478               ubik_dprint("StartDiskGetFile failed=%d\n", code); 
479               goto FetchEndCall;
480            }
481            nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
482            length = ntohl(length);
483            if (nbytes != sizeof(afs_int32)) { 
484               ubik_dprint("Rx-read length error=%d\n", code=BULK_ERROR); 
485               code = EIO;
486               goto FetchEndCall;
487            }
488
489            /* Truncate the file firest */
490            code = (*ubik_dbase->truncate)(ubik_dbase, file, 0);
491            if (code) {
492               ubik_dprint("truncate io error=%d\n", code); 
493               goto FetchEndCall;
494            }
495
496            /* give invalid label during file transit */
497            tversion.epoch = 0;
498            tversion.counter = 0;
499            code = (*ubik_dbase->setlabel)(ubik_dbase, file, &tversion);
500            if (code) {
501               ubik_dprint("setlabel io error=%d\n", code); 
502               goto FetchEndCall;
503            }
504
505            while (length > 0)   {
506               tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
507               nbytes = rx_Read(rxcall, tbuffer, tlen);
508               if (nbytes != tlen) { 
509                  ubik_dprint("Rx-read bulk error=%d\n", code=BULK_ERROR); 
510                  code = EIO;
511                  goto FetchEndCall;
512               }
513               nbytes = (*ubik_dbase->write)(ubik_dbase, file, tbuffer, offset, tlen);
514               if (nbytes != tlen) {
515                  code = UIOERROR;
516                  goto FetchEndCall;
517               }
518               offset += tlen;
519               length -= tlen;
520            }
521            code = EndDISK_GetFile(rxcall, &tversion);
522 FetchEndCall:
523            tcode = rx_EndCall(rxcall, code);
524            if (!code) code = tcode;
525            if (!code) {
526               /* we got a new file, set up its header */
527               urecovery_state |= UBIK_RECHAVEDB;
528               bcopy(&tversion, &ubik_dbase->version, sizeof(struct ubik_version));
529               (*ubik_dbase->sync)(ubik_dbase, 0);       /* get data out first */
530               /* after data is good, sync disk with correct label */
531               code = (*ubik_dbase->setlabel)(ubik_dbase, 0, &ubik_dbase->version);
532            }
533            if (code) {
534               ubik_dbase->version.epoch = 0;
535               ubik_dbase->version.counter = 0;
536               ubik_print("Ubik: Synchronize database failed (error = %d)\n", code);
537            } else {
538               ubik_print("Ubik: Synchronize database completed\n");
539            }
540            udisk_Invalidate(ubik_dbase, 0);     /* data has changed */
541            LWP_NoYieldSignal(&ubik_dbase->version);
542            ReleaseWriteLock(&ubik_dbase->versionLock);
543         }
544         if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
545         
546         /* If the database was newly initialized, then when we establish quorum, write
547          * a new label. This allows urecovery_AllBetter() to allow access for reads.
548          * Setting it to 2 also allows another site to come along with a newer
549          * database and overwrite this one.
550          */
551         if (ubik_dbase->version.epoch == 1) {
552            ObtainWriteLock(&ubik_dbase->versionLock);
553            urecovery_AbortAll(ubik_dbase);
554            ubik_epochTime = 2;
555            ubik_dbase->version.epoch   = ubik_epochTime;
556            ubik_dbase->version.counter = 1;
557            code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
558            udisk_Invalidate(ubik_dbase, 0);           /* data may have changed */
559            LWP_NoYieldSignal(&ubik_dbase->version);
560            ReleaseWriteLock(&ubik_dbase->versionLock);
561         }
562
563         /* Check the other sites and send the database to them if they
564          * do not have the current db.
565          */
566         if (!(urecovery_state & UBIK_RECSENTDB)) {
567             /* now propagate out new version to everyone else */
568             dbok = 1;       /* start off assuming they all worked */
569
570             ObtainWriteLock(&ubik_dbase->versionLock);
571             /*
572              * Check if a write transaction is in progress. We can't send the
573              * db when a write is in progress here because the db would be
574              * obsolete as soon as it goes there. Also, ops after the begin
575              * trans would reach the recepient and wouldn't find a transaction
576              * pending there.  Frankly, I don't think it's possible to get past
577              * the write-lock above if there is a write transaction in progress,
578              * but then, it won't hurt to check, will it?
579              */
580             if (ubik_dbase->flags & DBWRITING) {
581               struct timeval tv;
582               int safety = 0;
583               tv.tv_sec =  0;
584               tv.tv_usec = 50000;
585               while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
586                 ReleaseWriteLock(&ubik_dbase->versionLock);     
587                 /* sleep for a little while */
588                 IOMGR_Select(0, 0, 0, 0, &tv);
589                 tv.tv_usec += 10000; safety++;
590                 ObtainWriteLock(&ubik_dbase->versionLock);                   
591               }
592             }
593
594             for(ts=ubik_servers; ts; ts=ts->next) {
595                 inAddr.s_addr = ts->addr[0];
596                 if (!ts->up) {
597                     ubik_dprint("recovery cannot send version to %s\n",
598                                         afs_inet_ntoa(inAddr.s_addr));
599                     dbok = 0;
600                     continue;
601                 }
602                 ubik_dprint("recovery sending version to %s\n",
603                             afs_inet_ntoa(inAddr.s_addr));
604                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
605                     ubik_dprint("recovery stating local database\n");
606
607                     /* Rx code to do the Bulk Store */
608                     code = (*ubik_dbase->stat)(ubik_dbase, 0, &ubikstat);
609                     if (!code) {
610                         length = ubikstat.size;
611                         file = offset = 0;
612                         rxcall = rx_NewCall(ts->disk_rxcid);
613                         code = StartDISK_SendFile(rxcall, file, length, &ubik_dbase->version);
614                         if (code) { 
615                             ubik_dprint("StartDiskSendFile failed=%d\n", code); 
616                             goto StoreEndCall;
617                         }
618                         while (length > 0)      {
619                             tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
620                             nbytes = (*ubik_dbase->read)(ubik_dbase, file, tbuffer, offset, tlen);
621                             if (nbytes != tlen) {
622                                 ubik_dprint("Local disk read error=%d\n", code=UIOERROR);
623                                 goto StoreEndCall;
624                             }
625                             nbytes = rx_Write(rxcall, tbuffer, tlen);
626                             if (nbytes != tlen) { 
627                                 ubik_dprint("Rx-write bulk error=%d\n", code=BULK_ERROR); 
628                                 goto StoreEndCall;
629                             }
630                             offset += tlen;
631                             length -= tlen;
632                         }
633                         code = EndDISK_SendFile(rxcall);
634 StoreEndCall:
635                         code = rx_EndCall(rxcall, code);
636                     }
637                     if (code == 0) {
638                         /* we set a new file, process its header */
639                         ts->version = ubik_dbase->version;
640                         ts->currentDB = 1;
641                     }
642                     else dbok = 0;
643                 }
644                 else {
645                     /* mark file up to date */
646                     ts->currentDB = 1;
647                 }
648             }
649             ReleaseWriteLock(&ubik_dbase->versionLock);
650             if (dbok) urecovery_state |= UBIK_RECSENTDB;
651         }
652     }
653 }
654
655 /*
656 ** send a Probe to all the network address of this server 
657 ** Return 0  if success, else return 1
658 */
659 DoProbe(server)
660 struct ubik_server *server;
661 {
662     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
663     struct rx_connection *connSuccess = 0;
664     int                 i, j;
665     afs_uint32          addr;
666     char                buffer[32]; 
667     extern afs_int32    ubikSecIndex;
668     extern struct rx_securityClass      *ubikSecClass;
669
670     for (i=0; (addr=server->addr[i]) && (i<UBIK_MAX_INTERFACE_ADDR);i++)
671     {
672         conns[i] = rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID, 
673                                         ubikSecClass, ubikSecIndex);
674
675                 /* user requirement to use only the primary interface */
676         if ( ubikPrimaryAddrOnly )
677         {
678             i = 1;
679             break;
680         }
681     }
682     assert(i);  /* at least one interface address for this server */
683
684     multi_Rx(conns,i)
685     {
686         multi_DISK_Probe();
687         if ( !multi_error )                    /* first success */
688         {
689             addr = server->addr[multi_i];     /* successful interface addr */
690
691             if ( server->disk_rxcid)          /* destroy existing conn */
692                 rx_DestroyConnection(server->disk_rxcid); 
693             if ( server->vote_rxcid)
694                 rx_DestroyConnection(server->vote_rxcid);
695
696                                               /* make new connections */
697             server->disk_rxcid = conns[multi_i];
698             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
699                 VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);/* for vote reqs*/
700
701             connSuccess        = conns[multi_i];
702             strcpy(buffer, (char*)afs_inet_ntoa(server->addr[0]));
703             ubik_print("ubik:server %s is back up: will be contacted through %s\n",
704                         buffer, afs_inet_ntoa(addr));
705
706             multi_Abort;
707         }
708     } multi_End_Ignore; 
709
710     /* Destroy all connections except the one on which we succeeded */
711     for ( j=0; j < i; j++)
712         if ( conns[j] != connSuccess )
713             rx_DestroyConnection(conns[j] );
714
715     if (!connSuccess)
716         ubik_dprint("ubik:server %s still down\n",afs_inet_ntoa(server->addr[0]));
717
718     if ( connSuccess ) return 0;    /* success */
719         else return 1;              /* failure */
720 }