include-afsconfig-before-param-h-20010712
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #include <time.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #include <sys/time.h>
23 #endif
24 #include <assert.h>
25 #include <lock.h>
26 #include <rx/xdr.h>
27 #include <rx/rx.h>
28 #include <errno.h>
29
30 #define UBIK_INTERNALS
31 #include "ubik.h"
32 #include "ubik_int.h"
33
34 /* This module is responsible for determining when the system has
35  * recovered to the point that it can handle new transactions.  It
36  * replays logs, polls to determine the current dbase after a crash,
37  * and distributes the new database to the others.
38  */
39
40 /* The sync site associates a version number with each database.  It
41  * broadcasts the version associated with its current dbase in every
42  * one of its beacon messages.  When the sync site send a dbase to a
43  * server, it also sends the db's version.  A non-sync site server can
44  * tell if it has the right dbase version by simply comparing the
45  * version from the beacon message (uvote_dbVersion) with the version
46  * associated with the database (ubik_dbase->version).  The sync site
47  * itself simply has one counter to keep track of all of this (again
48  * ubik_dbase->version).
49  */
50
51 /* sync site: routine called when the sync site loses its quorum; this
52  * procedure is called "up" from the beacon package.  It resyncs the
53  * dbase and nudges the recovery daemon to try to propagate out the
54  * changes.  It also resets the recovery daemon's state, since
55  * recovery must potentially find a new dbase to propagate out.  This
56  * routine should not do anything with variables used by non-sync site
57  * servers.
58  */ 
59
60 /* if this flag is set, then ubik will use only the primary address 
61 ** ( the address specified in the CellServDB) to contact other 
62 ** ubik servers. Ubik recovery will not try opening connections 
63 ** to the alternate interface addresses. 
64 */
65 int ubikPrimaryAddrOnly;
66
67 urecovery_ResetState() {
68     urecovery_state = 0;
69     LWP_NoYieldSignal(&urecovery_state);
70     return 0;
71 }
72
73 /* sync site: routine called when a non-sync site server goes down; restarts recovery
74  * process to send missing server the new db when it comes back up.
75  * This routine should not do anything with variables used by non-sync site servers. 
76  */
77 urecovery_LostServer() {
78     LWP_NoYieldSignal(&urecovery_state);
79     return 0;
80 }
81
82 /* return true iff we have a current database (called by both sync
83  * sites and non-sync sites) How do we determine this?  If we're the
84  * sync site, we wait until recovery has finished fetching and
85  * re-labelling its dbase (it may still be trying to propagate it out
86  * to everyone else; that's THEIR problem).  If we're not the sync
87  * site, then we must have a dbase labelled with the right version,
88  * and we must have a currently-good sync site.
89  */
90 urecovery_AllBetter(adbase, areadAny)
91     int areadAny;
92     register struct ubik_dbase *adbase; {
93     register afs_int32 rcode;
94
95     ubik_dprint("allbetter checking\n");
96     rcode = 0;
97     
98     
99     if (areadAny) {
100       if (ubik_dbase->version.epoch > 1)
101           rcode = 1;                      /* Happy with any good version of database */
102     }
103
104     /* Check if we're sync site and we've got the right data */
105     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
106         rcode = 1;
107     }
108
109     /* next, check if we're aux site, and we've ever been sent the
110      * right data (note that if a dbase update fails, we won't think
111      * that the sync site is still the sync site, 'cause it won't talk
112      * to us until a timeout period has gone by.  When we recover, we
113      * leave this clear until we get a new dbase */
114     else if ( (uvote_GetSyncSite() &&
115               (vcmp(ubik_dbVersion, ubik_dbase->version) == 0)) ) { /* && order is important */
116         rcode = 1;
117     }
118
119     ubik_dprint("allbetter: returning %d\n", rcode);
120     return rcode;
121 }
122
123 /* abort all transactions on this database */
124 urecovery_AbortAll(adbase)
125 struct ubik_dbase *adbase; {
126     register struct ubik_trans *tt;
127     for(tt = adbase->activeTrans; tt; tt=tt->next) {
128         udisk_abort(tt);
129     }
130     return 0;
131 }
132
133 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
134 urecovery_CheckTid(atid)
135     register struct ubik_tid *atid; {
136     if (ubik_currentTrans) {
137         /* there is remote write trans, see if we match, see if this
138          * is a new transaction */
139         if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
140             /* don't match, abort it */
141             /* If the thread is not waiting for lock - ok to end it */
142             if (ubik_currentTrans->locktype != LOCKWAIT) {
143                udisk_end(ubik_currentTrans);
144             }
145             ubik_currentTrans = (struct ubik_trans *) 0;
146         }
147     }
148 }
149
150 /* log format is defined here, and implicitly in disk.c
151  *
152  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
153  * are in logged in network standard byte order, in case we want to move logs
154  * from machine-to-machine someday.
155  *
156  * Begin transaction: opcode
157  * Commit transaction: opcode, version (8 bytes)
158  * Truncate file: opcode, file number, length
159  * Abort transaction: opcode
160  * Write data: opcode, file, position, length, <length> data bytes
161  *
162  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
163  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
164  * the commit record, then we allow data to be written through to the dbase.  In our particular
165  * implementation, once a transaction is done, we write out the pages to the database, so that
166  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
167  * any changed data while there is an uncommitted write transaction can be zapped during an
168  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
169  * the log.
170  */
171
172 /* replay logs */
173 static ReplayLog(adbase)
174     register struct ubik_dbase *adbase; {
175     afs_int32 opcode;
176     register afs_int32 code, tpos;
177     int logIsGood;
178     afs_int32 len, thisSize, tfile, filePos;
179     afs_int32 buffer[4];
180     afs_int32 syncFile = -1;
181     afs_int32 data[1024];
182
183     /* read the lock twice, once to see whether we have a transaction to deal
184         with that committed, (theoretically, we should support more than one
185         trans in the log at once, but not yet), and once replaying the
186         transactions.  */
187     tpos = 0;
188     logIsGood = 0;
189     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
190     while (1) {
191         code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
192         if (code != sizeof(afs_int32)) break;
193         if (opcode == LOGNEW) {
194             /* handle begin trans */
195             tpos += sizeof(afs_int32);
196         }
197         else if (opcode == LOGABORT) break;
198         else if (opcode == LOGEND) {
199             logIsGood = 1;
200             break;
201         }
202         else if (opcode == LOGTRUNCATE) {
203             tpos += 4;
204             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
205             if (code != 2*sizeof(afs_int32))    break;  /* premature eof or io error */
206             tpos += 2*sizeof(afs_int32);
207         }
208         else if (opcode == LOGDATA) {
209             tpos += 4;
210             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
211             if (code != 3*sizeof(afs_int32)) break;
212             /* otherwise, skip over the data bytes, too */
213             tpos += buffer[2] + 3*sizeof(afs_int32);
214         }
215         else {
216             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
217             break;      /* corrupt log! */
218         }
219     }
220     if (logIsGood) {
221         /* actually do the replay; log should go all the way through the commit record, since
222             we just read it above. */
223         tpos = 0;
224         logIsGood = 0;
225         syncFile = -1;
226         while (1) {
227             code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
228             if (code != sizeof(afs_int32)) break;
229             if (opcode == LOGNEW) {
230                 /* handle begin trans */
231                 tpos += sizeof(afs_int32);
232             }
233             else if (opcode == LOGABORT) panic("log abort\n");
234             else if (opcode == LOGEND) {
235                 tpos += 4;
236                 code = (*adbase->read) (adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
237                 if (code != 2*sizeof(afs_int32)) return UBADLOG;
238                 code = (*adbase->setlabel) (adbase, 0, buffer);
239                 if (code) return code;
240                 logIsGood = 1;
241                 break;      /* all done now */
242             }
243             else if (opcode == LOGTRUNCATE) {
244                 tpos += 4;
245                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
246                 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
247                 tpos += 2*sizeof(afs_int32);
248                 code = (*adbase->truncate) (adbase, ntohl(buffer[0]), ntohl(buffer[1]));
249                 if (code) return code;
250             }
251             else if (opcode == LOGDATA) {
252                 tpos += 4;
253                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
254                 if (code != 3*sizeof(afs_int32)) break;
255                 tpos += 3*sizeof(afs_int32);
256                 /* otherwise, skip over the data bytes, too */
257                 len = ntohl(buffer[2]);     /* total number of bytes to copy */
258                 filePos = ntohl(buffer[1]);
259                 tfile = ntohl(buffer[0]);
260                 /* try to minimize file syncs */
261                 if (syncFile != tfile) {
262                     if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
263                     else code = 0;
264                     syncFile = tfile;
265                     if (code) return code;
266                 }
267                 while (len > 0) {
268                     thisSize = (len > sizeof(data)? sizeof(data) : len);
269                     /* copy sizeof(data) buffer bytes at a time */
270                     code = (*adbase->read)(adbase, LOGFILE, data, tpos, thisSize);
271                     if (code != thisSize) return UBADLOG;
272                     code = (*adbase->write)(adbase, tfile, data, filePos, thisSize);
273                     if (code != thisSize) return UBADLOG;
274                     filePos += thisSize;
275                     tpos += thisSize;
276                     len -= thisSize;
277                 }
278             }
279             else {
280                 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
281                 break;  /* corrupt log! */
282             }
283         }
284         if (logIsGood) {
285             if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
286             if (code) return code;
287         }
288         else {
289             ubik_dprint("Log read error on pass 2\n");
290             return UBADLOG;
291         }
292     }
293
294     /* now truncate the log, we're done with it */
295     code = (*adbase->truncate)(adbase, LOGFILE, 0);
296     return code;
297 }
298
299 /* Called at initialization to figure out version of the dbase we really have.
300  * This routine is called after replaying the log; it reads the restored labels.
301  */
302 static InitializeDB(adbase)
303     register struct ubik_dbase *adbase; {
304     register afs_int32 code;
305     
306     code = (*adbase->getlabel)(adbase, 0, &adbase->version);
307     if (code) {
308         /* try setting the label to a new value */
309         adbase->version.epoch = 1;      /* value for newly-initialized db */
310         adbase->version.counter = 1;
311         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
312         if (code) {
313             /* failed, try to set it back */
314             adbase->version.epoch = 0;
315             adbase->version.counter = 0;
316             (*adbase->setlabel) (adbase, 0, &adbase->version);
317         }
318         LWP_NoYieldSignal(&adbase->version);
319     }
320     return 0;
321 }
322
323 /* initialize the local dbase
324  * We replay the logs and then read the resulting file to figure out what version we've really got.
325  */
326 urecovery_Initialize(adbase)
327     register struct ubik_dbase *adbase; {
328     register afs_int32 code;
329
330     code = ReplayLog(adbase);
331     if (code) return code;
332     code = InitializeDB(adbase);
333     return code;
334 }
335
336 /* Main interaction loop for the recovery manager
337  * The recovery light-weight process only runs when you're the
338  * synchronization site.  It performs the following tasks, if and only
339  * if the prerequisite tasks have been performed successfully (it
340  * keeps track of which ones have been performed in its bit map,
341  * urecovery_state).
342  *
343  * First, it is responsible for probing that all servers are up.  This
344  * is the only operation that must be performed even if this is not
345  * yet the sync site, since otherwise this site may not notice that
346  * enough other machines are running to even elect this guy to be the
347  * sync site.
348  *
349  * After that, the recovery process does nothing until the beacon and
350  * voting modules manage to get this site elected sync site.
351  *
352  * After becoming sync site, recovery first attempts to find the best
353  * database available in the network (it must do this in order to
354  * ensure finding the latest committed data).  After finding the right
355  * database, it must fetch this dbase to the sync site.
356  *
357  * After fetching the dbase, it relabels it with a new version number,
358  * to ensure that everyone recognizes this dbase as the most recent
359  * dbase.
360  *
361  * One the dbase has been relabelled, this machine can start handling
362  * requests.  However, the recovery module still has one more task:
363  * propagating the dbase out to everyone who is up in the network.
364  */
365 urecovery_Interact() {
366     afs_int32 code, tcode;
367     struct ubik_server *bestServer;
368     struct ubik_server *ts;
369     int dbok, doingRPC, now;
370     afs_int32 lastProbeTime, lastDBVCheck;
371     /* if we're the sync site, the best db version we've found yet */
372     static struct ubik_version bestDBVersion;
373     struct ubik_version tversion;
374     struct timeval tv;
375     int length, tlen, offset, file, nbytes;
376     struct rx_call *rxcall;
377     char tbuffer[256];
378     struct ubik_stat ubikstat;
379     struct in_addr inAddr;
380
381     /* otherwise, begin interaction */
382     urecovery_state = 0;
383     lastProbeTime = 0;
384     lastDBVCheck = 0;
385     while (1) {
386         /* Run through this loop every 4 seconds */
387         tv.tv_sec = 4;
388         tv.tv_usec = 0;
389         IOMGR_Select(0, 0, 0, 0, &tv);
390
391         ubik_dprint("recovery running in state %x\n", urecovery_state);
392
393         /* Every 30 seconds, check all the down servers and mark them
394          * as up if they respond. When a server comes up or found to
395          * not be current, then re-find the the best database and 
396          * propogate it.
397          */
398         if ( (now = FT_ApproxTime()) > 30 + lastProbeTime) {
399             for (ts=ubik_servers,doingRPC=0; ts; ts=ts->next) {
400                 if (!ts->up) {
401                     doingRPC = 1;
402                     code = DoProbe(ts); 
403                     if (code == 0) {
404                        ts->up = 1;
405                        urecovery_state &= ~UBIK_RECFOUNDDB;
406                     }
407                 } else if (!ts->currentDB) {
408                     urecovery_state &= ~UBIK_RECFOUNDDB;
409                 }
410             }
411             if (doingRPC) 
412                 now = FT_ApproxTime();
413             lastProbeTime = now;
414         }
415
416         /* Mark whether we are the sync site */
417         if (!ubeacon_AmSyncSite()) {
418             urecovery_state &= ~UBIK_RECSYNCSITE;
419             continue;           /* nothing to do */
420         }
421         urecovery_state |= UBIK_RECSYNCSITE;
422         
423         /* If a server has just come up or if we have not found the 
424          * most current database, then go find the most current db.
425          */
426         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
427             bestServer = (struct ubik_server *) 0;
428             bestDBVersion.epoch = 0;
429             bestDBVersion.counter = 0;
430             for(ts=ubik_servers; ts; ts=ts->next) {
431                 if (!ts->up) continue;  /* don't bother with these guys */
432                 if (ts->isClone) continue;
433                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
434                 if (code == 0) {
435                     /* perhaps this is the best version */
436                     if (vcmp(ts->version, bestDBVersion) > 0) {
437                         /* new best version */
438                         bestDBVersion = ts->version;
439                         bestServer = ts;
440                     }
441                 }
442             }
443             /* take into consideration our version. Remember if we,
444              * the sync site, have the best version. Also note that
445              * we may need to send the best version out.
446              */
447             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
448                 bestDBVersion = ubik_dbase->version;
449                 bestServer = (struct ubik_server *) 0;
450                urecovery_state |= UBIK_RECHAVEDB;
451             } else {
452                /* Clear the flag only when we know we have to retrieve
453                 * the db. Because urecovery_AllBetter() looks at it.
454                 */
455                urecovery_state &= ~UBIK_RECHAVEDB;
456             }
457             lastDBVCheck = FT_ApproxTime();
458             urecovery_state |=  UBIK_RECFOUNDDB;
459             urecovery_state &= ~UBIK_RECSENTDB;
460         }
461         if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
462
463         /* If we, the sync site, do not have the best db version, then
464          * go and get it from the server that does.
465          */
466         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
467            urecovery_state |= UBIK_RECHAVEDB;
468         } else {
469            /* we don't have the best version; we should fetch it. */
470            ObtainWriteLock(&ubik_dbase->versionLock);
471            urecovery_AbortAll(ubik_dbase);
472
473            /* Rx code to do the Bulk fetch */
474            file = 0;
475            offset = 0;
476            rxcall = rx_NewCall(bestServer->disk_rxcid);
477
478            ubik_print("Ubik: Synchronize database with server %s\n", afs_inet_ntoa(bestServer->addr[0]));
479
480            code = StartDISK_GetFile(rxcall, file);
481            if (code) { 
482               ubik_dprint("StartDiskGetFile failed=%d\n", code); 
483               goto FetchEndCall;
484            }
485            nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
486            length = ntohl(length);
487            if (nbytes != sizeof(afs_int32)) { 
488               ubik_dprint("Rx-read length error=%d\n", code=BULK_ERROR); 
489               code = EIO;
490               goto FetchEndCall;
491            }
492
493            /* Truncate the file firest */
494            code = (*ubik_dbase->truncate)(ubik_dbase, file, 0);
495            if (code) {
496               ubik_dprint("truncate io error=%d\n", code); 
497               goto FetchEndCall;
498            }
499
500            /* give invalid label during file transit */
501            tversion.epoch = 0;
502            tversion.counter = 0;
503            code = (*ubik_dbase->setlabel)(ubik_dbase, file, &tversion);
504            if (code) {
505               ubik_dprint("setlabel io error=%d\n", code); 
506               goto FetchEndCall;
507            }
508
509            while (length > 0)   {
510               tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
511               nbytes = rx_Read(rxcall, tbuffer, tlen);
512               if (nbytes != tlen) { 
513                  ubik_dprint("Rx-read bulk error=%d\n", code=BULK_ERROR); 
514                  code = EIO;
515                  goto FetchEndCall;
516               }
517               nbytes = (*ubik_dbase->write)(ubik_dbase, file, tbuffer, offset, tlen);
518               if (nbytes != tlen) {
519                  code = UIOERROR;
520                  goto FetchEndCall;
521               }
522               offset += tlen;
523               length -= tlen;
524            }
525            code = EndDISK_GetFile(rxcall, &tversion);
526 FetchEndCall:
527            tcode = rx_EndCall(rxcall, code);
528            if (!code) code = tcode;
529            if (!code) {
530               /* we got a new file, set up its header */
531               urecovery_state |= UBIK_RECHAVEDB;
532               bcopy(&tversion, &ubik_dbase->version, sizeof(struct ubik_version));
533               (*ubik_dbase->sync)(ubik_dbase, 0);       /* get data out first */
534               /* after data is good, sync disk with correct label */
535               code = (*ubik_dbase->setlabel)(ubik_dbase, 0, &ubik_dbase->version);
536            }
537            if (code) {
538               ubik_dbase->version.epoch = 0;
539               ubik_dbase->version.counter = 0;
540               ubik_print("Ubik: Synchronize database failed (error = %d)\n", code);
541            } else {
542               ubik_print("Ubik: Synchronize database completed\n");
543            }
544            udisk_Invalidate(ubik_dbase, 0);     /* data has changed */
545            LWP_NoYieldSignal(&ubik_dbase->version);
546            ReleaseWriteLock(&ubik_dbase->versionLock);
547         }
548         if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
549         
550         /* If the database was newly initialized, then when we establish quorum, write
551          * a new label. This allows urecovery_AllBetter() to allow access for reads.
552          * Setting it to 2 also allows another site to come along with a newer
553          * database and overwrite this one.
554          */
555         if (ubik_dbase->version.epoch == 1) {
556            ObtainWriteLock(&ubik_dbase->versionLock);
557            urecovery_AbortAll(ubik_dbase);
558            ubik_epochTime = 2;
559            ubik_dbase->version.epoch   = ubik_epochTime;
560            ubik_dbase->version.counter = 1;
561            code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
562            udisk_Invalidate(ubik_dbase, 0);           /* data may have changed */
563            LWP_NoYieldSignal(&ubik_dbase->version);
564            ReleaseWriteLock(&ubik_dbase->versionLock);
565         }
566
567         /* Check the other sites and send the database to them if they
568          * do not have the current db.
569          */
570         if (!(urecovery_state & UBIK_RECSENTDB)) {
571             /* now propagate out new version to everyone else */
572             dbok = 1;       /* start off assuming they all worked */
573
574             ObtainWriteLock(&ubik_dbase->versionLock);
575             /*
576              * Check if a write transaction is in progress. We can't send the
577              * db when a write is in progress here because the db would be
578              * obsolete as soon as it goes there. Also, ops after the begin
579              * trans would reach the recepient and wouldn't find a transaction
580              * pending there.  Frankly, I don't think it's possible to get past
581              * the write-lock above if there is a write transaction in progress,
582              * but then, it won't hurt to check, will it?
583              */
584             if (ubik_dbase->flags & DBWRITING) {
585               struct timeval tv;
586               int safety = 0;
587               tv.tv_sec =  0;
588               tv.tv_usec = 50000;
589               while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
590                 ReleaseWriteLock(&ubik_dbase->versionLock);     
591                 /* sleep for a little while */
592                 IOMGR_Select(0, 0, 0, 0, &tv);
593                 tv.tv_usec += 10000; safety++;
594                 ObtainWriteLock(&ubik_dbase->versionLock);                   
595               }
596             }
597
598             for(ts=ubik_servers; ts; ts=ts->next) {
599                 inAddr.s_addr = ts->addr[0];
600                 if (!ts->up) {
601                     ubik_dprint("recovery cannot send version to %s\n",
602                                         afs_inet_ntoa(inAddr.s_addr));
603                     dbok = 0;
604                     continue;
605                 }
606                 ubik_dprint("recovery sending version to %s\n",
607                             afs_inet_ntoa(inAddr.s_addr));
608                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
609                     ubik_dprint("recovery stating local database\n");
610
611                     /* Rx code to do the Bulk Store */
612                     code = (*ubik_dbase->stat)(ubik_dbase, 0, &ubikstat);
613                     if (!code) {
614                         length = ubikstat.size;
615                         file = offset = 0;
616                         rxcall = rx_NewCall(ts->disk_rxcid);
617                         code = StartDISK_SendFile(rxcall, file, length, &ubik_dbase->version);
618                         if (code) { 
619                             ubik_dprint("StartDiskSendFile failed=%d\n", code); 
620                             goto StoreEndCall;
621                         }
622                         while (length > 0)      {
623                             tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
624                             nbytes = (*ubik_dbase->read)(ubik_dbase, file, tbuffer, offset, tlen);
625                             if (nbytes != tlen) {
626                                 ubik_dprint("Local disk read error=%d\n", code=UIOERROR);
627                                 goto StoreEndCall;
628                             }
629                             nbytes = rx_Write(rxcall, tbuffer, tlen);
630                             if (nbytes != tlen) { 
631                                 ubik_dprint("Rx-write bulk error=%d\n", code=BULK_ERROR); 
632                                 goto StoreEndCall;
633                             }
634                             offset += tlen;
635                             length -= tlen;
636                         }
637                         code = EndDISK_SendFile(rxcall);
638 StoreEndCall:
639                         code = rx_EndCall(rxcall, code);
640                     }
641                     if (code == 0) {
642                         /* we set a new file, process its header */
643                         ts->version = ubik_dbase->version;
644                         ts->currentDB = 1;
645                     }
646                     else dbok = 0;
647                 }
648                 else {
649                     /* mark file up to date */
650                     ts->currentDB = 1;
651                 }
652             }
653             ReleaseWriteLock(&ubik_dbase->versionLock);
654             if (dbok) urecovery_state |= UBIK_RECSENTDB;
655         }
656     }
657 }
658
659 /*
660 ** send a Probe to all the network address of this server 
661 ** Return 0  if success, else return 1
662 */
663 DoProbe(server)
664 struct ubik_server *server;
665 {
666     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
667     struct rx_connection *connSuccess = 0;
668     int                 i, j;
669     afs_uint32          addr;
670     char                buffer[32]; 
671     extern afs_int32    ubikSecIndex;
672     extern struct rx_securityClass      *ubikSecClass;
673
674     for (i=0; (addr=server->addr[i]) && (i<UBIK_MAX_INTERFACE_ADDR);i++)
675     {
676         conns[i] = rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID, 
677                                         ubikSecClass, ubikSecIndex);
678
679                 /* user requirement to use only the primary interface */
680         if ( ubikPrimaryAddrOnly )
681         {
682             i = 1;
683             break;
684         }
685     }
686     assert(i);  /* at least one interface address for this server */
687
688     multi_Rx(conns,i)
689     {
690         multi_DISK_Probe();
691         if ( !multi_error )                    /* first success */
692         {
693             addr = server->addr[multi_i];     /* successful interface addr */
694
695             if ( server->disk_rxcid)          /* destroy existing conn */
696                 rx_DestroyConnection(server->disk_rxcid); 
697             if ( server->vote_rxcid)
698                 rx_DestroyConnection(server->vote_rxcid);
699
700                                               /* make new connections */
701             server->disk_rxcid = conns[multi_i];
702             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
703                 VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);/* for vote reqs*/
704
705             connSuccess        = conns[multi_i];
706             strcpy(buffer, (char*)afs_inet_ntoa(server->addr[0]));
707             ubik_print("ubik:server %s is back up: will be contacted through %s\n",
708                         buffer, afs_inet_ntoa(addr));
709
710             multi_Abort;
711         }
712     } multi_End_Ignore; 
713
714     /* Destroy all connections except the one on which we succeeded */
715     for ( j=0; j < i; j++)
716         if ( conns[j] != connSuccess )
717             rx_DestroyConnection(conns[j] );
718
719     if (!connSuccess)
720         ubik_dprint("ubik:server %s still down\n",afs_inet_ntoa(server->addr[0]));
721
722     if ( connSuccess ) return 0;    /* success */
723         else return 1;              /* failure */
724 }