ubik: do not reuse the offset variable for the sync site address
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <assert.h>
16
17 #include <afs/opr.h>
18 #ifdef AFS_PTHREAD_ENV
19 # include <opr/lock.h>
20 #else
21 # include <opr/lockstub.h>
22 #endif
23
24 #include <lock.h>
25 #include <rx/xdr.h>
26 #include <rx/rx.h>
27 #include <afs/afsutil.h>
28
29 #define UBIK_INTERNALS
30 #include "ubik.h"
31 #include "ubik_int.h"
32
33 static void printServerInfo(void);
34
35 /*! \file
36  * routines for handling requests remotely-submitted by the sync site.  These are
37  * only write transactions (we don't propagate read trans), and there is at most one
38  * write transaction extant at any one time.
39  */
40
41 struct ubik_trans *ubik_currentTrans = 0;
42
43
44
45 /* the rest of these guys handle remote execution of write
46  * transactions: this is the code executed on the other servers when a
47  * sync site is executing a write transaction.
48  */
49 afs_int32
50 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
51 {
52     afs_int32 code;
53
54     if ((code = ubik_CheckAuth(rxcall))) {
55         return code;
56     }
57     DBHOLD(ubik_dbase);
58     if (urecovery_AllBetter(ubik_dbase, 0) == 0) {
59         code = UNOQUORUM;
60         goto out;
61     }
62     urecovery_CheckTid(atid, 1);
63     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
64     if (!code && ubik_currentTrans) {
65         /* label this trans with the right trans id */
66         ubik_currentTrans->tid.epoch = atid->epoch;
67         ubik_currentTrans->tid.counter = atid->counter;
68     }
69   out:
70     DBRELE(ubik_dbase);
71     return code;
72 }
73
74
75 afs_int32
76 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
77 {
78     afs_int32 code;
79
80     if ((code = ubik_CheckAuth(rxcall))) {
81         return code;
82     }
83     ObtainWriteLock(&ubik_dbase->cache_lock);
84     DBHOLD(ubik_dbase);
85     if (!ubik_currentTrans) {
86         code = USYNC;
87         goto done;
88     }
89     /*
90      * sanity check to make sure only write trans appear here
91      */
92     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
93         code = UBADTYPE;
94         goto done;
95     }
96
97     urecovery_CheckTid(atid, 0);
98     if (!ubik_currentTrans) {
99         code = USYNC;
100         goto done;
101     }
102
103     code = udisk_commit(ubik_currentTrans);
104     if (code == 0) {
105         /* sync site should now match */
106         uvote_set_dbVersion(ubik_dbase->version);
107     }
108 done:
109     DBRELE(ubik_dbase);
110     ReleaseWriteLock(&ubik_dbase->cache_lock);
111     return code;
112 }
113
114 afs_int32
115 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
116 {
117     afs_int32 code;
118
119     if ((code = ubik_CheckAuth(rxcall))) {
120         return code;
121     }
122
123     DBHOLD(ubik_dbase);
124
125     if (!ubik_currentTrans) {
126         code = USYNC;
127         goto done;
128     }
129     /* sanity check to make sure only write trans appear here */
130     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
131         code = UBADTYPE;
132         goto done;
133     }
134
135     urecovery_CheckTid(atid, 0);
136     if (!ubik_currentTrans) {
137         code = USYNC;
138         goto done;
139     }
140
141     /* If the thread is not waiting for lock - ok to end it */
142     if (ubik_currentTrans->locktype != LOCKWAIT) {
143         udisk_end(ubik_currentTrans);
144     }
145     ubik_currentTrans = (struct ubik_trans *)0;
146 done:
147     DBRELE(ubik_dbase);
148     return code;
149 }
150
151 afs_int32
152 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
153 {
154     afs_int32 code;
155
156     if ((code = ubik_CheckAuth(rxcall))) {
157         return code;
158     }
159     DBHOLD(ubik_dbase);
160     if (!ubik_currentTrans) {
161         code = USYNC;
162         goto done;
163     }
164     /* sanity check to make sure only write trans appear here  */
165     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
166         code = UBADTYPE;
167         goto done;
168     }
169
170     urecovery_CheckTid(atid, 0);
171     if (!ubik_currentTrans) {
172         code = USYNC;
173         goto done;
174     }
175
176     code = udisk_abort(ubik_currentTrans);
177     /* If the thread is not waiting for lock - ok to end it */
178     if (ubik_currentTrans->locktype != LOCKWAIT) {
179         udisk_end(ubik_currentTrans);
180     }
181     ubik_currentTrans = (struct ubik_trans *)0;
182 done:
183     DBRELE(ubik_dbase);
184     return code;
185 }
186
187 /* apos and alen are not used */
188 afs_int32
189 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
190            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
191 {
192     afs_int32 code;
193     struct ubik_trans *ubik_thisTrans;
194
195     if ((code = ubik_CheckAuth(rxcall))) {
196         return code;
197     }
198     DBHOLD(ubik_dbase);
199     if (!ubik_currentTrans) {
200         code = USYNC;
201         goto done;
202     }
203     /* sanity check to make sure only write trans appear here */
204     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
205         code = UBADTYPE;
206         goto done;
207     }
208     if (alen != 1) {
209         code = UBADLOCK;
210         goto done;
211     }
212     urecovery_CheckTid(atid, 0);
213     if (!ubik_currentTrans) {
214         code = USYNC;
215         goto done;
216     }
217
218     ubik_thisTrans = ubik_currentTrans;
219     code = ulock_getLock(ubik_currentTrans, atype, 1);
220
221     /* While waiting, the transaction may have been ended/
222      * aborted from under us (urecovery_CheckTid). In that
223      * case, end the transaction here.
224      */
225     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
226         udisk_end(ubik_thisTrans);
227         code = USYNC;
228     }
229 done:
230     DBRELE(ubik_dbase);
231     return code;
232 }
233
234 /*!
235  * \brief Write a vector of data
236  */
237 afs_int32
238 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
239              iovec_wrt *io_vector, iovec_buf *io_buffer)
240 {
241     afs_int32 code, i, offset;
242     struct ubik_iovec *iovec;
243     char *iobuf;
244
245     if ((code = ubik_CheckAuth(rxcall))) {
246         return code;
247     }
248     DBHOLD(ubik_dbase);
249     if (!ubik_currentTrans) {
250         code = USYNC;
251         goto done;
252     }
253     /* sanity check to make sure only write trans appear here */
254     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
255         code = UBADTYPE;
256         goto done;
257     }
258
259     urecovery_CheckTid(atid, 0);
260     if (!ubik_currentTrans) {
261         code = USYNC;
262         goto done;
263     }
264
265     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
266     iobuf = (char *)io_buffer->iovec_buf_val;
267     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
268         /* Sanity check for going off end of buffer */
269         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
270             code = UINTERNAL;
271         } else {
272             code =
273                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
274                             iovec[i].position, iovec[i].length);
275         }
276         if (code)
277             break;
278
279         offset += iovec[i].length;
280     }
281 done:
282     DBRELE(ubik_dbase);
283     return code;
284 }
285
286 afs_int32
287 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
288             afs_int32 afile, afs_int32 apos, bulkdata *adata)
289 {
290     afs_int32 code;
291
292     if ((code = ubik_CheckAuth(rxcall))) {
293         return code;
294     }
295     DBHOLD(ubik_dbase);
296     if (!ubik_currentTrans) {
297         code = USYNC;
298         goto done;
299     }
300     /* sanity check to make sure only write trans appear here */
301     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
302         code = UBADTYPE;
303         goto done;
304     }
305
306     urecovery_CheckTid(atid, 0);
307     if (!ubik_currentTrans) {
308         code = USYNC;
309         goto done;
310     }
311     code =
312         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
313                     adata->bulkdata_len);
314 done:
315     DBRELE(ubik_dbase);
316     return code;
317 }
318
319 afs_int32
320 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
321                afs_int32 afile, afs_int32 alen)
322 {
323     afs_int32 code;
324
325     if ((code = ubik_CheckAuth(rxcall))) {
326         return code;
327     }
328     DBHOLD(ubik_dbase);
329     if (!ubik_currentTrans) {
330         code = USYNC;
331         goto done;
332     }
333     /* sanity check to make sure only write trans appear here */
334     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
335         code = UBADTYPE;
336         goto done;
337     }
338
339     urecovery_CheckTid(atid, 0);
340     if (!ubik_currentTrans) {
341         code = USYNC;
342         goto done;
343     }
344     code = udisk_truncate(ubik_currentTrans, afile, alen);
345 done:
346     DBRELE(ubik_dbase);
347     return code;
348 }
349
350 afs_int32
351 SDISK_GetVersion(struct rx_call *rxcall,
352                  struct ubik_version *aversion)
353 {
354     afs_int32 code;
355
356     if ((code = ubik_CheckAuth(rxcall))) {
357         return code;
358     }
359
360     /*
361      * If we are the sync site, recovery shouldn't be running on any
362      * other site. We shouldn't be getting this RPC as long as we are
363      * the sync site.  To prevent any unforseen activity, we should
364      * reject this RPC until we have recognized that we are not the
365      * sync site anymore, and/or if we have any pending WRITE
366      * transactions that have to complete. This way we can be assured
367      * that this RPC would not block any pending transactions that
368      * should either fail or pass. If we have recognized the fact that
369      * we are not the sync site any more, all write transactions would
370      * fail with UNOQUORUM anyway.
371      */
372     DBHOLD(ubik_dbase);
373     if (ubeacon_AmSyncSite()) {
374         DBRELE(ubik_dbase);
375         return UDEADLOCK;
376     }
377
378     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
379     DBRELE(ubik_dbase);
380     if (code) {
381         /* tell other side there's no dbase */
382         aversion->epoch = 0;
383         aversion->counter = 0;
384     }
385     return 0;
386 }
387
388 afs_int32
389 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
390               struct ubik_version *version)
391 {
392     afs_int32 code;
393     struct ubik_dbase *dbase;
394     afs_int32 offset;
395     struct ubik_stat ubikstat;
396     char tbuffer[256];
397     afs_int32 tlen;
398     afs_int32 length;
399     struct rx_peer *tpeer;
400     struct rx_connection *tconn;
401     afs_uint32 otherHost = 0;
402     char hoststr[16];
403
404     if ((code = ubik_CheckAuth(rxcall))) {
405         return code;
406     }
407
408     tconn = rx_ConnectionOf(rxcall);
409     tpeer = rx_PeerOf(tconn);
410     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
411     ViceLog(0, ("Ubik: Synchronize database: send (via GetFile) "
412                 "to server %s begin\n",
413                afs_inet_ntoa_r(otherHost, hoststr)));
414
415     dbase = ubik_dbase;
416     DBHOLD(dbase);
417     code = (*dbase->stat) (dbase, file, &ubikstat);
418     if (code < 0) {
419         ViceLog(0, ("database stat() error:%d\n", code));
420         goto failed;
421     }
422     length = ubikstat.size;
423     tlen = htonl(length);
424     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
425     if (code != sizeof(afs_int32)) {
426         ViceLog(0, ("Rx-write length error=%d\n", code));
427         code = BULK_ERROR;
428         goto failed;
429     }
430     offset = 0;
431     while (length > 0) {
432         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
433         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
434         if (code != tlen) {
435             ViceLog(0, ("read failed error=%d\n", code));
436             code = UIOERROR;
437             goto failed;
438         }
439         code = rx_Write(rxcall, tbuffer, tlen);
440         if (code != tlen) {
441             ViceLog(0, ("Rx-write data error=%d\n", code));
442             code = BULK_ERROR;
443             goto failed;
444         }
445         length -= tlen;
446         offset += tlen;
447     }
448     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
449     if (code)
450         ViceLog(0, ("getlabel error=%d\n", code));
451
452  failed:
453     DBRELE(dbase);
454     if (code) {
455         ViceLog(0,
456             ("Ubik: Synchronize database: send (via GetFile) to "
457              "server %s failed (error = %d)\n",
458              afs_inet_ntoa_r(otherHost, hoststr), code));
459     } else {
460         ViceLog(0,
461             ("Ubik: Synchronize database: send (via GetFile) to "
462              "server %s complete, version: %d.%d\n",
463             afs_inet_ntoa_r(otherHost, hoststr), version->epoch, version->counter));
464     }
465     return code;
466 }
467
468 afs_int32
469 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
470                afs_int32 length, struct ubik_version *avers)
471 {
472     afs_int32 code;
473     struct ubik_dbase *dbase = NULL;
474     char tbuffer[1024];
475     afs_int32 offset;
476     struct ubik_version tversion;
477     int tlen;
478     struct rx_peer *tpeer;
479     struct rx_connection *tconn;
480     afs_uint32 syncHost = 0;
481     afs_uint32 otherHost = 0;
482     char hoststr[16];
483     char pbuffer[1028];
484     int fd = -1;
485     afs_int32 epoch = 0;
486     afs_int32 pass;
487
488     /* send the file back to the requester */
489
490     dbase = ubik_dbase;
491     pbuffer[0] = '\0';
492
493     if ((code = ubik_CheckAuth(rxcall))) {
494         return code;
495     }
496
497     /* next, we do a sanity check to see if the guy sending us the database is
498      * the guy we think is the sync site.  It turns out that we might not have
499      * decided yet that someone's the sync site, but they could have enough
500      * votes from others to be sync site anyway, and could send us the database
501      * in advance of getting our votes.  This is fine, what we're really trying
502      * to check is that some authenticated bogon isn't sending a random database
503      * into another configuration.  This could happen on a bad configuration
504      * screwup.  Thus, we only object if we're sure we know who the sync site
505      * is, and it ain't the guy talking to us.
506      */
507     syncHost = uvote_GetSyncSite();
508     tconn = rx_ConnectionOf(rxcall);
509     tpeer = rx_PeerOf(tconn);
510     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
511     if (syncHost && syncHost != otherHost) {
512         /* we *know* this is the wrong guy */
513         char sync_hoststr[16];
514         ViceLog(0,
515             ("Ubik: Refusing synchronization with server %s since it is not the sync-site (%s).\n",
516              afs_inet_ntoa_r(otherHost, hoststr),
517              afs_inet_ntoa_r(syncHost, sync_hoststr)));
518         return USYNC;
519     }
520
521     DBHOLD(dbase);
522
523     /* abort any active trans that may scribble over the database */
524     urecovery_AbortAll(dbase);
525
526     ViceLog(0, ("Ubik: Synchronize database: receive (via SendFile) from server %s begin\n",
527                afs_inet_ntoa_r(otherHost, hoststr)));
528
529     offset = 0;
530     UBIK_VERSION_LOCK;
531     epoch = tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
532     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
533     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
534              ubik_dbase->pathName, (file<0)?"SYS":"",
535              (file<0)?-file:file);
536     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
537     if (fd < 0) {
538         code = errno;
539         ViceLog(0, ("Open error=%d\n", code));
540         goto failed_locked;
541     }
542     code = lseek(fd, HDRSIZE, 0);
543     if (code != HDRSIZE) {
544         ViceLog(0, ("lseek error=%d\n", code));
545         close(fd);
546         goto failed_locked;
547     }
548     pass = 0;
549     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
550     UBIK_VERSION_UNLOCK;
551     while (length > 0) {
552         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
553 #if !defined(AFS_PTHREAD_ENV)
554         if (pass % 4 == 0)
555             IOMGR_Poll();
556 #endif
557         code = rx_Read(rxcall, tbuffer, tlen);
558         if (code != tlen) {
559             ViceLog(0, ("Rx-read length error=%d\n", code));
560             code = BULK_ERROR;
561             close(fd);
562             goto failed;
563         }
564         code = write(fd, tbuffer, tlen);
565         pass++;
566         if (code != tlen) {
567             ViceLog(0, ("write failed tlen=%d, error=%d\n", tlen, code));
568             code = UIOERROR;
569             close(fd);
570             goto failed;
571         }
572         offset += tlen;
573         length -= tlen;
574     }
575     code = close(fd);
576     if (code) {
577         ViceLog(0, ("close failed error=%d\n", code));
578         goto failed;
579     }
580
581     /* sync data first, then write label and resync (resync done by setlabel call).
582      * This way, good label is only on good database. */
583     snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
584              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
585 #ifdef AFS_NT40_ENV
586     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
587              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
588     code = unlink(pbuffer);
589     if (!code)
590         code = rename(tbuffer, pbuffer);
591     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
592              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
593 #endif
594     if (!code)
595         code = rename(pbuffer, tbuffer);
596     UBIK_VERSION_LOCK;
597     if (!code) {
598         (*ubik_dbase->open) (ubik_dbase, file);
599         code = (*ubik_dbase->setlabel) (dbase, file, avers);
600     }
601 #ifdef AFS_NT40_ENV
602     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
603              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
604     unlink(pbuffer);
605 #endif
606     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
607     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
608 #ifdef AFS_PTHREAD_ENV
609     opr_Assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
610 #else
611     LWP_NoYieldSignal(&dbase->version);
612 #endif
613
614 failed_locked:
615     UBIK_VERSION_UNLOCK;
616
617 failed:
618     if (code) {
619         if (pbuffer[0] != '\0')
620             unlink(pbuffer);
621
622         /* Failed to sync. Allow reads again for now. */
623         if (dbase != NULL) {
624             UBIK_VERSION_LOCK;
625             tversion.epoch = epoch;
626             (*dbase->setlabel) (dbase, file, &tversion);
627             UBIK_VERSION_UNLOCK;
628         }
629         ViceLog(0,
630             ("Ubik: Synchronize database: receive (via SendFile) from "
631              "server %s failed (error = %d)\n",
632             afs_inet_ntoa_r(otherHost, hoststr), code));
633     } else {
634         uvote_set_dbVersion(*avers);
635         ViceLog(0,
636             ("Ubik: Synchronize database: receive (via SendFile) from "
637              "server %s complete, version: %d.%d\n",
638             afs_inet_ntoa_r(otherHost, hoststr), avers->epoch, avers->counter));
639     }
640     DBRELE(dbase);
641     return code;
642 }
643
644
645 afs_int32
646 SDISK_Probe(struct rx_call *rxcall)
647 {
648     return 0;
649 }
650
651 /*!
652  * \brief Update remote machines addresses in my server list
653  *
654  * Send back my addresses to caller of this RPC
655  * \return zero on success, else 1.
656  */
657 afs_int32
658 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
659                           UbikInterfaceAddr *inAddr,
660                           UbikInterfaceAddr *outAddr)
661 {
662     struct ubik_server *ts, *tmp;
663     afs_uint32 remoteAddr;      /* in net byte order */
664     int i, j, found = 0, probableMatch = 0;
665     char hoststr[16];
666
667     UBIK_ADDR_LOCK;
668     /* copy the output parameters */
669     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
670         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
671
672     remoteAddr = htonl(inAddr->hostAddr[0]);
673     for (ts = ubik_servers; ts; ts = ts->next)
674         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
675             probableMatch = 1;
676             break;
677         }
678
679     if (probableMatch) {
680         /* verify that all addresses in the incoming RPC are
681          ** not part of other server entries in my CellServDB
682          */
683         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
684              && inAddr->hostAddr[i]; i++) {
685             remoteAddr = htonl(inAddr->hostAddr[i]);
686             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
687                 if (ts == tmp)  /* this is my server */
688                     continue;
689                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
690                      j++)
691                     if (remoteAddr == tmp->addr[j]) {
692                         found = 1;
693                         break;
694                     }
695             }
696         }
697     }
698
699     /* if (probableMatch) */
700     /* inconsistent addresses in CellServDB */
701     if (!probableMatch || found) {
702         ViceLog(0, ("Inconsistent Cell Info from server:\n"));
703         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
704             ViceLog(0, ("... %s\n", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr)));
705         fflush(stdout);
706         fflush(stderr);
707         printServerInfo();
708         UBIK_ADDR_UNLOCK;
709         return UBADHOST;
710     }
711
712     /* update our data structures */
713     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
714         ts->addr[i] = htonl(inAddr->hostAddr[i]);
715
716     ViceLog(0, ("ubik: A Remote Server has addresses:\n"));
717     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
718         ViceLog(0, ("... %s\n", afs_inet_ntoa_r(ts->addr[i], hoststr)));
719
720     UBIK_ADDR_UNLOCK;
721
722     /*
723      * The most likely cause of a DISK_UpdateInterfaceAddr RPC
724      * is because the server was restarted.  Reset its state
725      * so that no DISK_Begin RPCs will be issued until the
726      * known database version is current.
727      */
728     UBIK_BEACON_LOCK;
729     ts->beaconSinceDown = 0;
730     ts->currentDB = 0;
731     urecovery_LostServer(ts);
732     UBIK_BEACON_UNLOCK;
733     return 0;
734 }
735
736 static void
737 printServerInfo(void)
738 {
739     struct ubik_server *ts;
740     int i, j = 1;
741     char hoststr[16];
742
743     ViceLog(0, ("Local CellServDB:\n"));
744     for (ts = ubik_servers; ts; ts = ts->next, j++) {
745         ViceLog(0, ("  Server %d:\n", j));
746         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
747             ViceLog(0, ("  ... %s\n", afs_inet_ntoa_r(ts->addr[i], hoststr)));
748     }
749 }
750
751 afs_int32
752 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
753                  struct ubik_version *oldversionp,
754                  struct ubik_version *newversionp)
755 {
756     afs_int32 code = 0;
757
758     if ((code = ubik_CheckAuth(rxcall))) {
759         return (code);
760     }
761     DBHOLD(ubik_dbase);
762     if (!ubik_currentTrans) {
763         code = USYNC;
764         goto done;
765     }
766     /* sanity check to make sure only write trans appear here */
767     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
768         code = UBADTYPE;
769         goto done;
770     }
771
772     /* Should not get this for the sync site */
773     if (ubeacon_AmSyncSite()) {
774         code = UDEADLOCK;
775         goto done;
776     }
777
778     urecovery_CheckTid(atid, 0);
779     if (!ubik_currentTrans) {
780         code = USYNC;
781         goto done;
782     }
783
784     /* Set the label if our version matches the sync-site's. Also set the label
785      * if our on-disk version matches the old version, and our view of the
786      * sync-site's version matches the new version. This suggests that
787      * ubik_dbVersion was updated while the sync-site was setting the new
788      * version, and it already told us via VOTE_Beacon. */
789     if (uvote_eq_dbVersion(*oldversionp)
790         || (uvote_eq_dbVersion(*newversionp)
791             && vcmp(ubik_dbase->version, *oldversionp) == 0)) {
792         UBIK_VERSION_LOCK;
793         code = (*ubik_dbase->setlabel) (ubik_dbase, 0, newversionp);
794         if (!code) {
795             ubik_dbase->version = *newversionp;
796             uvote_set_dbVersion(*newversionp);
797         }
798         UBIK_VERSION_UNLOCK;
799     } else {
800         code = USYNC;
801     }
802 done:
803     DBRELE(ubik_dbase);
804     return code;
805 }