more-prototyping-20030513
[openafs.git] / src / ubik / recovery.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #include <time.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #include <sys/time.h>
23 #endif
24 #include <assert.h>
25 #include <lock.h>
26 #ifdef HAVE_STRING_H
27 #include <string.h>
28 #else
29 #ifdef HAVE_STRINGS_H
30 #include <strings.h>
31 #endif
32 #endif
33 #include <rx/xdr.h>
34 #include <rx/rx.h>
35 #include <errno.h>
36 #include <afs/afsutil.h>
37
38 #define UBIK_INTERNALS
39 #include "ubik.h"
40 #include "ubik_int.h"
41
42 /* This module is responsible for determining when the system has
43  * recovered to the point that it can handle new transactions.  It
44  * replays logs, polls to determine the current dbase after a crash,
45  * and distributes the new database to the others.
46  */
47
48 /* The sync site associates a version number with each database.  It
49  * broadcasts the version associated with its current dbase in every
50  * one of its beacon messages.  When the sync site send a dbase to a
51  * server, it also sends the db's version.  A non-sync site server can
52  * tell if it has the right dbase version by simply comparing the
53  * version from the beacon message (uvote_dbVersion) with the version
54  * associated with the database (ubik_dbase->version).  The sync site
55  * itself simply has one counter to keep track of all of this (again
56  * ubik_dbase->version).
57  */
58
59 /* sync site: routine called when the sync site loses its quorum; this
60  * procedure is called "up" from the beacon package.  It resyncs the
61  * dbase and nudges the recovery daemon to try to propagate out the
62  * changes.  It also resets the recovery daemon's state, since
63  * recovery must potentially find a new dbase to propagate out.  This
64  * routine should not do anything with variables used by non-sync site
65  * servers.
66  */ 
67
68 /* if this flag is set, then ubik will use only the primary address 
69 ** ( the address specified in the CellServDB) to contact other 
70 ** ubik servers. Ubik recovery will not try opening connections 
71 ** to the alternate interface addresses. 
72 */
73 int ubikPrimaryAddrOnly;
74
75 int urecovery_ResetState(void)
76 {
77     urecovery_state = 0;
78     LWP_NoYieldSignal(&urecovery_state);
79     return 0;
80 }
81
82 /* sync site: routine called when a non-sync site server goes down; restarts recovery
83  * process to send missing server the new db when it comes back up.
84  * This routine should not do anything with variables used by non-sync site servers. 
85  */
86 int urecovery_LostServer(void)
87 {
88     LWP_NoYieldSignal(&urecovery_state);
89     return 0;
90 }
91
92 /* return true iff we have a current database (called by both sync
93  * sites and non-sync sites) How do we determine this?  If we're the
94  * sync site, we wait until recovery has finished fetching and
95  * re-labelling its dbase (it may still be trying to propagate it out
96  * to everyone else; that's THEIR problem).  If we're not the sync
97  * site, then we must have a dbase labelled with the right version,
98  * and we must have a currently-good sync site.
99  */
100 int urecovery_AllBetter(register struct ubik_dbase *adbase, int areadAny)
101 {
102     register afs_int32 rcode;
103
104     ubik_dprint("allbetter checking\n");
105     rcode = 0;
106     
107     
108     if (areadAny) {
109       if (ubik_dbase->version.epoch > 1)
110           rcode = 1;                      /* Happy with any good version of database */
111     }
112
113     /* Check if we're sync site and we've got the right data */
114     else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
115         rcode = 1;
116     }
117
118     /* next, check if we're aux site, and we've ever been sent the
119      * right data (note that if a dbase update fails, we won't think
120      * that the sync site is still the sync site, 'cause it won't talk
121      * to us until a timeout period has gone by.  When we recover, we
122      * leave this clear until we get a new dbase */
123     else if ( (uvote_GetSyncSite() &&
124               (vcmp(ubik_dbVersion, ubik_dbase->version) == 0)) ) { /* && order is important */
125         rcode = 1;
126     }
127
128     ubik_dprint("allbetter: returning %d\n", rcode);
129     return rcode;
130 }
131
132 /* abort all transactions on this database */
133 int urecovery_AbortAll(struct ubik_dbase *adbase)
134 {
135     register struct ubik_trans *tt;
136     for(tt = adbase->activeTrans; tt; tt=tt->next) {
137         udisk_abort(tt);
138     }
139     return 0;
140 }
141
142 /* this routine aborts the current remote transaction, if any, if the tid is wrong */
143 int urecovery_CheckTid(register struct ubik_tid *atid)
144 {
145     if (ubik_currentTrans) {
146         /* there is remote write trans, see if we match, see if this
147          * is a new transaction */
148         if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
149             /* don't match, abort it */
150             /* If the thread is not waiting for lock - ok to end it */
151 #if !defined(UBIK_PAUSE)
152             if (ubik_currentTrans->locktype != LOCKWAIT) {
153 #endif /* UBIK_PAUSE */
154                udisk_end(ubik_currentTrans);
155 #if !defined(UBIK_PAUSE)
156             }
157 #endif /* UBIK_PAUSE */
158             ubik_currentTrans = (struct ubik_trans *) 0;
159         }
160     }
161 }
162
163 /* log format is defined here, and implicitly in disk.c
164  *
165  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
166  * are in logged in network standard byte order, in case we want to move logs
167  * from machine-to-machine someday.
168  *
169  * Begin transaction: opcode
170  * Commit transaction: opcode, version (8 bytes)
171  * Truncate file: opcode, file number, length
172  * Abort transaction: opcode
173  * Write data: opcode, file, position, length, <length> data bytes
174  *
175  * A very simple routine, it just replays the log.  Note that this is a new-value only log, which
176  * implies that no uncommitted data is written to the dbase: one writes data to the log, including
177  * the commit record, then we allow data to be written through to the dbase.  In our particular
178  * implementation, once a transaction is done, we write out the pages to the database, so that
179  * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
180  * any changed data while there is an uncommitted write transaction can be zapped during an
181  * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
182  * the log.
183  */
184
185 /* replay logs */
186 static int ReplayLog(register struct ubik_dbase *adbase)
187 {
188     afs_int32 opcode;
189     register afs_int32 code, tpos;
190     int logIsGood;
191     afs_int32 len, thisSize, tfile, filePos;
192     afs_int32 buffer[4];
193     afs_int32 syncFile = -1;
194     afs_int32 data[1024];
195
196     /* read the lock twice, once to see whether we have a transaction to deal
197         with that committed, (theoretically, we should support more than one
198         trans in the log at once, but not yet), and once replaying the
199         transactions.  */
200     tpos = 0;
201     logIsGood = 0;
202     /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
203     while (1) {
204         code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
205         if (code != sizeof(afs_int32)) break;
206         if (opcode == LOGNEW) {
207             /* handle begin trans */
208             tpos += sizeof(afs_int32);
209         }
210         else if (opcode == LOGABORT) break;
211         else if (opcode == LOGEND) {
212             logIsGood = 1;
213             break;
214         }
215         else if (opcode == LOGTRUNCATE) {
216             tpos += 4;
217             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
218             if (code != 2*sizeof(afs_int32))    break;  /* premature eof or io error */
219             tpos += 2*sizeof(afs_int32);
220         }
221         else if (opcode == LOGDATA) {
222             tpos += 4;
223             code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
224             if (code != 3*sizeof(afs_int32)) break;
225             /* otherwise, skip over the data bytes, too */
226             tpos += buffer[2] + 3*sizeof(afs_int32);
227         }
228         else {
229             ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
230             break;      /* corrupt log! */
231         }
232     }
233     if (logIsGood) {
234         /* actually do the replay; log should go all the way through the commit record, since
235             we just read it above. */
236         tpos = 0;
237         logIsGood = 0;
238         syncFile = -1;
239         while (1) {
240             code = (*adbase->read)(adbase, LOGFILE, &opcode, tpos, sizeof(afs_int32));
241             if (code != sizeof(afs_int32)) break;
242             if (opcode == LOGNEW) {
243                 /* handle begin trans */
244                 tpos += sizeof(afs_int32);
245             }
246             else if (opcode == LOGABORT) panic("log abort\n");
247             else if (opcode == LOGEND) {
248                 tpos += 4;
249                 code = (*adbase->read) (adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
250                 if (code != 2*sizeof(afs_int32)) return UBADLOG;
251                 code = (*adbase->setlabel) (adbase, 0, buffer);
252                 if (code) return code;
253                 logIsGood = 1;
254                 break;      /* all done now */
255             }
256             else if (opcode == LOGTRUNCATE) {
257                 tpos += 4;
258                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 2*sizeof(afs_int32));
259                 if (code != 2*sizeof(afs_int32)) break; /* premature eof or io error */
260                 tpos += 2*sizeof(afs_int32);
261                 code = (*adbase->truncate) (adbase, ntohl(buffer[0]), ntohl(buffer[1]));
262                 if (code) return code;
263             }
264             else if (opcode == LOGDATA) {
265                 tpos += 4;
266                 code = (*adbase->read)(adbase, LOGFILE, buffer, tpos, 3*sizeof(afs_int32));
267                 if (code != 3*sizeof(afs_int32)) break;
268                 tpos += 3*sizeof(afs_int32);
269                 /* otherwise, skip over the data bytes, too */
270                 len = ntohl(buffer[2]);     /* total number of bytes to copy */
271                 filePos = ntohl(buffer[1]);
272                 tfile = ntohl(buffer[0]);
273                 /* try to minimize file syncs */
274                 if (syncFile != tfile) {
275                     if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
276                     else code = 0;
277                     syncFile = tfile;
278                     if (code) return code;
279                 }
280                 while (len > 0) {
281                     thisSize = (len > sizeof(data)? sizeof(data) : len);
282                     /* copy sizeof(data) buffer bytes at a time */
283                     code = (*adbase->read)(adbase, LOGFILE, data, tpos, thisSize);
284                     if (code != thisSize) return UBADLOG;
285                     code = (*adbase->write)(adbase, tfile, data, filePos, thisSize);
286                     if (code != thisSize) return UBADLOG;
287                     filePos += thisSize;
288                     tpos += thisSize;
289                     len -= thisSize;
290                 }
291             }
292             else {
293                 ubik_dprint("corrupt log opcode (%d) at position %d\n", opcode, tpos);
294                 break;  /* corrupt log! */
295             }
296         }
297         if (logIsGood) {
298             if (syncFile >= 0) code = (*adbase->sync)(adbase, syncFile);
299             if (code) return code;
300         }
301         else {
302             ubik_dprint("Log read error on pass 2\n");
303             return UBADLOG;
304         }
305     }
306
307     /* now truncate the log, we're done with it */
308     code = (*adbase->truncate)(adbase, LOGFILE, 0);
309     return code;
310 }
311
312 /* Called at initialization to figure out version of the dbase we really have.
313  * This routine is called after replaying the log; it reads the restored labels.
314  */
315 static int InitializeDB(register struct ubik_dbase *adbase)
316 {
317     register afs_int32 code;
318     
319     code = (*adbase->getlabel)(adbase, 0, &adbase->version);
320     if (code) {
321         /* try setting the label to a new value */
322         adbase->version.epoch = 1;      /* value for newly-initialized db */
323         adbase->version.counter = 1;
324         code = (*adbase->setlabel) (adbase, 0, &adbase->version);
325         if (code) {
326             /* failed, try to set it back */
327             adbase->version.epoch = 0;
328             adbase->version.counter = 0;
329             (*adbase->setlabel) (adbase, 0, &adbase->version);
330         }
331         LWP_NoYieldSignal(&adbase->version);
332     }
333     return 0;
334 }
335
336 /* initialize the local dbase
337  * We replay the logs and then read the resulting file to figure out what version we've really got.
338  */
339 int urecovery_Initialize(register struct ubik_dbase *adbase)
340 {
341     register afs_int32 code;
342
343     code = ReplayLog(adbase);
344     if (code) return code;
345     code = InitializeDB(adbase);
346     return code;
347 }
348
349 /* Main interaction loop for the recovery manager
350  * The recovery light-weight process only runs when you're the
351  * synchronization site.  It performs the following tasks, if and only
352  * if the prerequisite tasks have been performed successfully (it
353  * keeps track of which ones have been performed in its bit map,
354  * urecovery_state).
355  *
356  * First, it is responsible for probing that all servers are up.  This
357  * is the only operation that must be performed even if this is not
358  * yet the sync site, since otherwise this site may not notice that
359  * enough other machines are running to even elect this guy to be the
360  * sync site.
361  *
362  * After that, the recovery process does nothing until the beacon and
363  * voting modules manage to get this site elected sync site.
364  *
365  * After becoming sync site, recovery first attempts to find the best
366  * database available in the network (it must do this in order to
367  * ensure finding the latest committed data).  After finding the right
368  * database, it must fetch this dbase to the sync site.
369  *
370  * After fetching the dbase, it relabels it with a new version number,
371  * to ensure that everyone recognizes this dbase as the most recent
372  * dbase.
373  *
374  * One the dbase has been relabelled, this machine can start handling
375  * requests.  However, the recovery module still has one more task:
376  * propagating the dbase out to everyone who is up in the network.
377  */
378 int urecovery_Interact(void)
379 {
380     afs_int32 code, tcode;
381     struct ubik_server *bestServer = NULL;
382     struct ubik_server *ts;
383     int dbok, doingRPC, now;
384     afs_int32 lastProbeTime, lastDBVCheck;
385     /* if we're the sync site, the best db version we've found yet */
386     static struct ubik_version bestDBVersion;
387     struct ubik_version tversion;
388     struct timeval tv;
389     int length, tlen, offset, file, nbytes;
390     struct rx_call *rxcall;
391     char tbuffer[256];
392     struct ubik_stat ubikstat;
393     struct in_addr inAddr;
394
395     /* otherwise, begin interaction */
396     urecovery_state = 0;
397     lastProbeTime = 0;
398     lastDBVCheck = 0;
399     while (1) {
400         /* Run through this loop every 4 seconds */
401         tv.tv_sec = 4;
402         tv.tv_usec = 0;
403         IOMGR_Select(0, 0, 0, 0, &tv);
404
405         ubik_dprint("recovery running in state %x\n", urecovery_state);
406
407         /* Every 30 seconds, check all the down servers and mark them
408          * as up if they respond. When a server comes up or found to
409          * not be current, then re-find the the best database and 
410          * propogate it.
411          */
412         if ( (now = FT_ApproxTime()) > 30 + lastProbeTime) {
413             for (ts=ubik_servers,doingRPC=0; ts; ts=ts->next) {
414                 if (!ts->up) {
415                     doingRPC = 1;
416                     code = DoProbe(ts); 
417                     if (code == 0) {
418                        ts->up = 1;
419                        urecovery_state &= ~UBIK_RECFOUNDDB;
420                     }
421                 } else if (!ts->currentDB) {
422                     urecovery_state &= ~UBIK_RECFOUNDDB;
423                 }
424             }
425             if (doingRPC) 
426                 now = FT_ApproxTime();
427             lastProbeTime = now;
428         }
429
430         /* Mark whether we are the sync site */
431         if (!ubeacon_AmSyncSite()) {
432             urecovery_state &= ~UBIK_RECSYNCSITE;
433             continue;           /* nothing to do */
434         }
435         urecovery_state |= UBIK_RECSYNCSITE;
436         
437         /* If a server has just come up or if we have not found the 
438          * most current database, then go find the most current db.
439          */
440         if (!(urecovery_state & UBIK_RECFOUNDDB)) {
441             bestServer = (struct ubik_server *) 0;
442             bestDBVersion.epoch = 0;
443             bestDBVersion.counter = 0;
444             for(ts=ubik_servers; ts; ts=ts->next) {
445                 if (!ts->up) continue;  /* don't bother with these guys */
446                 if (ts->isClone) continue;
447                 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
448                 if (code == 0) {
449                     /* perhaps this is the best version */
450                     if (vcmp(ts->version, bestDBVersion) > 0) {
451                         /* new best version */
452                         bestDBVersion = ts->version;
453                         bestServer = ts;
454                     }
455                 }
456             }
457             /* take into consideration our version. Remember if we,
458              * the sync site, have the best version. Also note that
459              * we may need to send the best version out.
460              */
461             if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
462                 bestDBVersion = ubik_dbase->version;
463                 bestServer = (struct ubik_server *) 0;
464                urecovery_state |= UBIK_RECHAVEDB;
465             } else {
466                /* Clear the flag only when we know we have to retrieve
467                 * the db. Because urecovery_AllBetter() looks at it.
468                 */
469                urecovery_state &= ~UBIK_RECHAVEDB;
470             }
471             lastDBVCheck = FT_ApproxTime();
472             urecovery_state |=  UBIK_RECFOUNDDB;
473             urecovery_state &= ~UBIK_RECSENTDB;
474         }
475 #if defined(UBIK_PAUSE)
476        /* it's not possible for UBIK_RECFOUNDDB not to be set here.
477         * However, we might have lost UBIK_RECSYNCSITE, and that
478         * IS important.
479         */
480         if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
481 #else
482         if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
483 #endif /* UBIK_PAUSE */
484
485         /* If we, the sync site, do not have the best db version, then
486          * go and get it from the server that does.
487          */
488         if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
489            urecovery_state |= UBIK_RECHAVEDB;
490         } else {
491            /* we don't have the best version; we should fetch it. */
492 #if defined(UBIK_PAUSE)
493            DBHOLD(ubik_dbase);
494 #else
495            ObtainWriteLock(&ubik_dbase->versionLock);
496 #endif /* UBIK_PAUSE */
497            urecovery_AbortAll(ubik_dbase);
498
499            /* Rx code to do the Bulk fetch */
500            file = 0;
501            offset = 0;
502            rxcall = rx_NewCall(bestServer->disk_rxcid);
503
504            ubik_print("Ubik: Synchronize database with server %s\n", afs_inet_ntoa(bestServer->addr[0]));
505
506            code = StartDISK_GetFile(rxcall, file);
507            if (code) { 
508               ubik_dprint("StartDiskGetFile failed=%d\n", code); 
509               goto FetchEndCall;
510            }
511            nbytes = rx_Read(rxcall, &length, sizeof(afs_int32));
512            length = ntohl(length);
513            if (nbytes != sizeof(afs_int32)) { 
514               ubik_dprint("Rx-read length error=%d\n", code=BULK_ERROR); 
515               code = EIO;
516               goto FetchEndCall;
517            }
518
519            /* Truncate the file firest */
520            code = (*ubik_dbase->truncate)(ubik_dbase, file, 0);
521            if (code) {
522               ubik_dprint("truncate io error=%d\n", code); 
523               goto FetchEndCall;
524            }
525
526            /* give invalid label during file transit */
527            tversion.epoch = 0;
528            tversion.counter = 0;
529            code = (*ubik_dbase->setlabel)(ubik_dbase, file, &tversion);
530            if (code) {
531               ubik_dprint("setlabel io error=%d\n", code); 
532               goto FetchEndCall;
533            }
534
535            while (length > 0)   {
536               tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
537               nbytes = rx_Read(rxcall, tbuffer, tlen);
538               if (nbytes != tlen) { 
539                  ubik_dprint("Rx-read bulk error=%d\n", code=BULK_ERROR); 
540                  code = EIO;
541                  goto FetchEndCall;
542               }
543               nbytes = (*ubik_dbase->write)(ubik_dbase, file, tbuffer, offset, tlen);
544               if (nbytes != tlen) {
545                  code = UIOERROR;
546                  goto FetchEndCall;
547               }
548               offset += tlen;
549               length -= tlen;
550            }
551            code = EndDISK_GetFile(rxcall, &tversion);
552 FetchEndCall:
553            tcode = rx_EndCall(rxcall, code);
554            if (!code) code = tcode;
555            if (!code) {
556               /* we got a new file, set up its header */
557               urecovery_state |= UBIK_RECHAVEDB;
558               memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
559               (*ubik_dbase->sync)(ubik_dbase, 0);       /* get data out first */
560               /* after data is good, sync disk with correct label */
561               code = (*ubik_dbase->setlabel)(ubik_dbase, 0, &ubik_dbase->version);
562            }
563            if (code) {
564               ubik_dbase->version.epoch = 0;
565               ubik_dbase->version.counter = 0;
566               ubik_print("Ubik: Synchronize database failed (error = %d)\n", code);
567            } else {
568               ubik_print("Ubik: Synchronize database completed\n");
569            }
570            udisk_Invalidate(ubik_dbase, 0);     /* data has changed */
571            LWP_NoYieldSignal(&ubik_dbase->version);
572 #if defined(UBIK_PAUSE)
573            DBRELE(ubik_dbase);
574 #else
575            ReleaseWriteLock(&ubik_dbase->versionLock);
576 #endif /* UBIK_PAUSE */
577         }
578 #if defined(UBIK_PAUSE)
579         if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
580 #endif /* UBIK_PAUSE */
581         if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
582         
583         /* If the database was newly initialized, then when we establish quorum, write
584          * a new label. This allows urecovery_AllBetter() to allow access for reads.
585          * Setting it to 2 also allows another site to come along with a newer
586          * database and overwrite this one.
587          */
588         if (ubik_dbase->version.epoch == 1) {
589 #if defined(UBIK_PAUSE)
590            DBHOLD(ubik_dbase);
591 #else
592            ObtainWriteLock(&ubik_dbase->versionLock);
593 #endif /* UBIK_PAUSE */
594            urecovery_AbortAll(ubik_dbase);
595            ubik_epochTime = 2;
596            ubik_dbase->version.epoch   = ubik_epochTime;
597            ubik_dbase->version.counter = 1;
598            code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
599            udisk_Invalidate(ubik_dbase, 0);           /* data may have changed */
600            LWP_NoYieldSignal(&ubik_dbase->version);
601 #if defined(UBIK_PAUSE)
602            DBRELE(ubik_dbase);
603 #else
604            ReleaseWriteLock(&ubik_dbase->versionLock);
605 #endif /* UBIK_PAUSE */
606         }
607
608         /* Check the other sites and send the database to them if they
609          * do not have the current db.
610          */
611         if (!(urecovery_state & UBIK_RECSENTDB)) {
612             /* now propagate out new version to everyone else */
613             dbok = 1;       /* start off assuming they all worked */
614
615 #if defined(UBIK_PAUSE)
616             DBHOLD(ubik_dbase);
617 #else
618             ObtainWriteLock(&ubik_dbase->versionLock);
619 #endif /* UBIK_PAUSE */
620             /*
621              * Check if a write transaction is in progress. We can't send the
622              * db when a write is in progress here because the db would be
623              * obsolete as soon as it goes there. Also, ops after the begin
624              * trans would reach the recepient and wouldn't find a transaction
625              * pending there.  Frankly, I don't think it's possible to get past
626              * the write-lock above if there is a write transaction in progress,
627              * but then, it won't hurt to check, will it?
628              */
629             if (ubik_dbase->flags & DBWRITING) {
630               struct timeval tv;
631               int safety = 0;
632               tv.tv_sec =  0;
633               tv.tv_usec = 50000;
634               while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
635 #if defined(UBIK_PAUSE)
636                 DBRELE(ubik_dbase);
637 #else
638                 ReleaseWriteLock(&ubik_dbase->versionLock);     
639 #endif /* UBIK_PAUSE */
640                 /* sleep for a little while */
641                 IOMGR_Select(0, 0, 0, 0, &tv);
642                 tv.tv_usec += 10000; safety++;
643 #if defined(UBIK_PAUSE)
644                 DBHOLD(ubik_dbase);
645 #else
646                 ObtainWriteLock(&ubik_dbase->versionLock);                   
647 #endif /* UBIK_PAUSE */
648               }
649             }
650
651             for(ts=ubik_servers; ts; ts=ts->next) {
652                 inAddr.s_addr = ts->addr[0];
653                 if (!ts->up) {
654                     ubik_dprint("recovery cannot send version to %s\n",
655                                         afs_inet_ntoa(inAddr.s_addr));
656                     dbok = 0;
657                     continue;
658                 }
659                 ubik_dprint("recovery sending version to %s\n",
660                             afs_inet_ntoa(inAddr.s_addr));
661                 if (vcmp(ts->version, ubik_dbase->version) != 0) {
662                     ubik_dprint("recovery stating local database\n");
663
664                     /* Rx code to do the Bulk Store */
665                     code = (*ubik_dbase->stat)(ubik_dbase, 0, &ubikstat);
666                     if (!code) {
667                         length = ubikstat.size;
668                         file = offset = 0;
669                         rxcall = rx_NewCall(ts->disk_rxcid);
670                         code = StartDISK_SendFile(rxcall, file, length, &ubik_dbase->version);
671                         if (code) { 
672                             ubik_dprint("StartDiskSendFile failed=%d\n", code); 
673                             goto StoreEndCall;
674                         }
675                         while (length > 0)      {
676                             tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
677                             nbytes = (*ubik_dbase->read)(ubik_dbase, file, tbuffer, offset, tlen);
678                             if (nbytes != tlen) {
679                                 ubik_dprint("Local disk read error=%d\n", code=UIOERROR);
680                                 goto StoreEndCall;
681                             }
682                             nbytes = rx_Write(rxcall, tbuffer, tlen);
683                             if (nbytes != tlen) { 
684                                 ubik_dprint("Rx-write bulk error=%d\n", code=BULK_ERROR); 
685                                 goto StoreEndCall;
686                             }
687                             offset += tlen;
688                             length -= tlen;
689                         }
690                         code = EndDISK_SendFile(rxcall);
691 StoreEndCall:
692                         code = rx_EndCall(rxcall, code);
693                     }
694                     if (code == 0) {
695                         /* we set a new file, process its header */
696                         ts->version = ubik_dbase->version;
697                         ts->currentDB = 1;
698                     }
699                     else dbok = 0;
700                 }
701                 else {
702                     /* mark file up to date */
703                     ts->currentDB = 1;
704                 }
705             }
706 #if defined(UBIK_PAUSE)
707             DBRELE(ubik_dbase);
708 #else
709             ReleaseWriteLock(&ubik_dbase->versionLock);
710 #endif /* UBIK_PAUSE */
711             if (dbok) urecovery_state |= UBIK_RECSENTDB;
712         }
713     }
714 }
715
716 /*
717 ** send a Probe to all the network address of this server 
718 ** Return 0  if success, else return 1
719 */
720 int DoProbe(struct ubik_server *server)
721 {
722     struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
723     struct rx_connection *connSuccess = 0;
724     int                 i, j;
725     afs_uint32          addr;
726     char                buffer[32]; 
727     extern afs_int32    ubikSecIndex;
728     extern struct rx_securityClass      *ubikSecClass;
729
730     for (i=0; (addr=server->addr[i]) && (i<UBIK_MAX_INTERFACE_ADDR);i++)
731     {
732         conns[i] = rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID, 
733                                         ubikSecClass, ubikSecIndex);
734
735                 /* user requirement to use only the primary interface */
736         if ( ubikPrimaryAddrOnly )
737         {
738             i = 1;
739             break;
740         }
741     }
742     assert(i);  /* at least one interface address for this server */
743
744     multi_Rx(conns,i)
745     {
746         multi_DISK_Probe();
747         if ( !multi_error )                    /* first success */
748         {
749             addr = server->addr[multi_i];     /* successful interface addr */
750
751             if ( server->disk_rxcid)          /* destroy existing conn */
752                 rx_DestroyConnection(server->disk_rxcid); 
753             if ( server->vote_rxcid)
754                 rx_DestroyConnection(server->vote_rxcid);
755
756                                               /* make new connections */
757             server->disk_rxcid = conns[multi_i];
758             server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
759                 VOTE_SERVICE_ID, ubikSecClass, ubikSecIndex);/* for vote reqs*/
760
761             connSuccess        = conns[multi_i];
762             strcpy(buffer, (char*)afs_inet_ntoa(server->addr[0]));
763             ubik_print("ubik:server %s is back up: will be contacted through %s\n",
764                         buffer, afs_inet_ntoa(addr));
765
766             multi_Abort;
767         }
768     } multi_End_Ignore; 
769
770     /* Destroy all connections except the one on which we succeeded */
771     for ( j=0; j < i; j++)
772         if ( conns[j] != connSuccess )
773             rx_DestroyConnection(conns[j] );
774
775     if (!connSuccess)
776         ubik_dprint("ubik:server %s still down\n",afs_inet_ntoa(server->addr[0]));
777
778     if ( connSuccess ) return 0;    /* success */
779         else return 1;              /* failure */
780 }