be008e1ba98473d237261e42a1cbb9e91925bcc0
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13
14 #include <sys/types.h>
15 #include <string.h>
16 #include <stdarg.h>
17
18 #ifdef AFS_NT40_ENV
19 #include <winsock2.h>
20 #include <fcntl.h>
21 #else
22 #include <sys/file.h>
23 #include <netinet/in.h>
24 #endif
25
26 #include <lock.h>
27 #include <rx/xdr.h>
28 #include <rx/rx.h>
29 #include <errno.h>
30 #include <afs/afsutil.h>
31
32 #define UBIK_INTERNALS
33 #include "ubik.h"
34 #include "ubik_int.h"
35
36 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
37 void *ubik_CheckRXSecurityRock;
38
39 static void printServerInfo(void);
40
41 /*! \file
42  * routines for handling requests remotely-submitted by the sync site.  These are
43  * only write transactions (we don't propagate read trans), and there is at most one
44  * write transaction extant at any one time.
45  */
46
47 struct ubik_trans *ubik_currentTrans = 0;
48
49 int
50 ubik_CheckAuth(register struct rx_call *acall)
51 {
52     register afs_int32 code;
53     if (ubik_CheckRXSecurityProc) {
54         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
55         return code;
56     } else
57         return 0;
58 }
59
60 /* the rest of these guys handle remote execution of write
61  * transactions: this is the code executed on the other servers when a
62  * sync site is executing a write transaction.
63  */
64 afs_int32
65 SDISK_Begin(register struct rx_call *rxcall, struct ubik_tid *atid)
66 {
67     register afs_int32 code;
68
69     if ((code = ubik_CheckAuth(rxcall))) {
70         return code;
71     }
72     DBHOLD(ubik_dbase);
73     urecovery_CheckTid(atid);
74     if (ubik_currentTrans) {
75         /* If the thread is not waiting for lock - ok to end it */
76 #if !defined(UBIK_PAUSE)
77         if (ubik_currentTrans->locktype != LOCKWAIT) {
78 #endif /* UBIK_PAUSE */
79             udisk_end(ubik_currentTrans);
80 #if !defined(UBIK_PAUSE)
81         }
82 #endif /* UBIK_PAUSE */
83         ubik_currentTrans = (struct ubik_trans *)0;
84     }
85     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
86     if (!code && ubik_currentTrans) {
87         /* label this trans with the right trans id */
88         ubik_currentTrans->tid.epoch = atid->epoch;
89         ubik_currentTrans->tid.counter = atid->counter;
90     }
91     DBRELE(ubik_dbase);
92     return code;
93 }
94
95
96 afs_int32
97 SDISK_Commit(register struct rx_call *rxcall, struct ubik_tid *atid)
98 {
99     register afs_int32 code;
100     register struct ubik_dbase *dbase;
101
102     if ((code = ubik_CheckAuth(rxcall))) {
103         return code;
104     }
105
106     if (!ubik_currentTrans) {
107         return USYNC;
108     }
109     /*
110      * sanity check to make sure only write trans appear here
111      */
112     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
113         return UBADTYPE;
114     }
115
116     dbase = ubik_currentTrans->dbase;
117     DBHOLD(dbase);
118     urecovery_CheckTid(atid);
119     if (!ubik_currentTrans) {
120         DBRELE(dbase);
121         return USYNC;
122     }
123
124     code = udisk_commit(ubik_currentTrans);
125     if (code == 0) {
126         /* sync site should now match */
127         ubik_dbVersion = ubik_dbase->version;
128     }
129     DBRELE(dbase);
130     return code;
131 }
132
133 afs_int32
134 SDISK_ReleaseLocks(register struct rx_call *rxcall, struct ubik_tid *atid)
135 {
136     register struct ubik_dbase *dbase;
137     register afs_int32 code;
138
139     if ((code = ubik_CheckAuth(rxcall))) {
140         return code;
141     }
142
143     if (!ubik_currentTrans) {
144         return USYNC;
145     }
146     /* sanity check to make sure only write trans appear here */
147     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
148         return UBADTYPE;
149     }
150
151     dbase = ubik_currentTrans->dbase;
152     DBHOLD(dbase);
153     urecovery_CheckTid(atid);
154     if (!ubik_currentTrans) {
155         DBRELE(dbase);
156         return USYNC;
157     }
158
159     /* If the thread is not waiting for lock - ok to end it */
160 #if !defined(UBIK_PAUSE)
161     if (ubik_currentTrans->locktype != LOCKWAIT) {
162 #endif /* UBIK_PAUSE */
163         udisk_end(ubik_currentTrans);
164 #if !defined(UBIK_PAUSE)
165     }
166 #endif /* UBIK_PAUSE */
167     ubik_currentTrans = (struct ubik_trans *)0;
168     DBRELE(dbase);
169     return 0;
170 }
171
172 afs_int32
173 SDISK_Abort(register struct rx_call *rxcall, struct ubik_tid *atid)
174 {
175     register afs_int32 code;
176     register struct ubik_dbase *dbase;
177
178     if ((code = ubik_CheckAuth(rxcall))) {
179         return code;
180     }
181
182     if (!ubik_currentTrans) {
183         return USYNC;
184     }
185     /* sanity check to make sure only write trans appear here  */
186     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
187         return UBADTYPE;
188     }
189
190     dbase = ubik_currentTrans->dbase;
191     DBHOLD(dbase);
192     urecovery_CheckTid(atid);
193     if (!ubik_currentTrans) {
194         DBRELE(dbase);
195         return USYNC;
196     }
197
198     code = udisk_abort(ubik_currentTrans);
199     /* If the thread is not waiting for lock - ok to end it */
200 #if !defined(UBIK_PAUSE)
201     if (ubik_currentTrans->locktype != LOCKWAIT) {
202 #endif /* UBIK_PAUSE */
203         udisk_end(ubik_currentTrans);
204 #if !defined(UBIK_PAUSE)
205     }
206 #endif /* UBIK_PAUSE */
207     ubik_currentTrans = (struct ubik_trans *)0;
208     DBRELE(dbase);
209     return code;
210 }
211
212 /* apos and alen are not used */
213 afs_int32
214 SDISK_Lock(register struct rx_call *rxcall, struct ubik_tid *atid, 
215            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
216 {       
217     register afs_int32 code;
218     register struct ubik_dbase *dbase;
219     struct ubik_trans *ubik_thisTrans;
220
221     if ((code = ubik_CheckAuth(rxcall))) {
222         return code;
223     }
224     if (!ubik_currentTrans) {
225         return USYNC;
226     }
227     /* sanity check to make sure only write trans appear here */
228     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
229         return UBADTYPE;
230     }
231     if (alen != 1) {
232         return UBADLOCK;
233     }
234     dbase = ubik_currentTrans->dbase;
235     DBHOLD(dbase);
236     urecovery_CheckTid(atid);
237     if (!ubik_currentTrans) {
238         DBRELE(dbase);
239         return USYNC;
240     }
241
242     ubik_thisTrans = ubik_currentTrans;
243     code = ulock_getLock(ubik_currentTrans, atype, 1);
244
245     /* While waiting, the transaction may have been ended/
246      * aborted from under us (urecovery_CheckTid). In that
247      * case, end the transaction here.
248      */
249     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
250         udisk_end(ubik_thisTrans);
251         code = USYNC;
252     }
253
254     DBRELE(dbase);
255     return code;
256 }
257
258 /*!
259  * \brief Write a vector of data
260  */
261 afs_int32
262 SDISK_WriteV(register struct rx_call *rxcall, struct ubik_tid *atid, 
263              iovec_wrt *io_vector, iovec_buf *io_buffer)
264 {
265     afs_int32 code, i, offset;
266     struct ubik_dbase *dbase;
267     struct ubik_iovec *iovec;
268     char *iobuf;
269
270     if ((code = ubik_CheckAuth(rxcall))) {
271         return code;
272     }
273     if (!ubik_currentTrans) {
274         return USYNC;
275     }
276     /* sanity check to make sure only write trans appear here */
277     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
278         return UBADTYPE;
279     }
280
281     dbase = ubik_currentTrans->dbase;
282     DBHOLD(dbase);
283     urecovery_CheckTid(atid);
284     if (!ubik_currentTrans) {
285         DBRELE(dbase);
286         return USYNC;
287     }
288
289     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
290     iobuf = (char *)io_buffer->iovec_buf_val;
291     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
292         /* Sanity check for going off end of buffer */
293         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
294             code = UINTERNAL;
295         } else {
296             code =
297                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
298                             iovec[i].position, iovec[i].length);
299         }
300         if (code)
301             break;
302
303         offset += iovec[i].length;
304     }
305
306     DBRELE(dbase);
307     return code;
308 }
309
310 afs_int32
311 SDISK_Write(register struct rx_call *rxcall, struct ubik_tid *atid, 
312             afs_int32 afile, afs_int32 apos, register bulkdata *adata)
313 {
314     register afs_int32 code;
315     register struct ubik_dbase *dbase;
316
317     if ((code = ubik_CheckAuth(rxcall))) {
318         return code;
319     }
320     if (!ubik_currentTrans) {
321         return USYNC;
322     }
323     /* sanity check to make sure only write trans appear here */
324     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
325         return UBADTYPE;
326     }
327
328     dbase = ubik_currentTrans->dbase;
329     DBHOLD(dbase);
330     urecovery_CheckTid(atid);
331     if (!ubik_currentTrans) {
332         DBRELE(dbase);
333         return USYNC;
334     }
335     code =
336         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
337                     adata->bulkdata_len);
338     DBRELE(dbase);
339     return code;
340 }
341
342 afs_int32
343 SDISK_Truncate(register struct rx_call *rxcall, struct ubik_tid *atid, 
344                afs_int32 afile, afs_int32 alen)
345 {
346     register afs_int32 code;
347     register struct ubik_dbase *dbase;
348
349     if ((code = ubik_CheckAuth(rxcall))) {
350         return code;
351     }
352     if (!ubik_currentTrans) {
353         return USYNC;
354     }
355     /* sanity check to make sure only write trans appear here */
356     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
357         return UBADTYPE;
358     }
359
360     dbase = ubik_currentTrans->dbase;
361     DBHOLD(dbase);
362     urecovery_CheckTid(atid);
363     if (!ubik_currentTrans) {
364         DBRELE(dbase);
365         return USYNC;
366     }
367     code = udisk_truncate(ubik_currentTrans, afile, alen);
368     DBRELE(dbase);
369     return code;
370 }
371
372 afs_int32
373 SDISK_GetVersion(register struct rx_call *rxcall, 
374                  register struct ubik_version *aversion)
375 {
376     register afs_int32 code;
377
378     if ((code = ubik_CheckAuth(rxcall))) {
379         return code;
380     }
381
382     /*
383      * If we are the sync site, recovery shouldn't be running on any
384      * other site. We shouldn't be getting this RPC as long as we are
385      * the sync site.  To prevent any unforseen activity, we should
386      * reject this RPC until we have recognized that we are not the
387      * sync site anymore, and/or if we have any pending WRITE
388      * transactions that have to complete. This way we can be assured
389      * that this RPC would not block any pending transactions that
390      * should either fail or pass. If we have recognized the fact that
391      * we are not the sync site any more, all write transactions would
392      * fail with UNOQUORUM anyway.
393      */
394     if (ubeacon_AmSyncSite()) {
395         return UDEADLOCK;
396     }
397
398     DBHOLD(ubik_dbase);
399     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
400     DBRELE(ubik_dbase);
401     if (code) {
402         /* tell other side there's no dbase */
403         aversion->epoch = 0;
404         aversion->counter = 0;
405     }
406     return 0;
407 }
408
409 afs_int32
410 SDISK_GetFile(register struct rx_call *rxcall, register afs_int32 file, 
411               struct ubik_version *version)
412 {
413     register afs_int32 code;
414     register struct ubik_dbase *dbase;
415     register afs_int32 offset;
416     struct ubik_stat ubikstat;
417     char tbuffer[256];
418     afs_int32 tlen;
419     afs_int32 length;
420
421     if ((code = ubik_CheckAuth(rxcall))) {
422         return code;
423     }
424 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
425  * a sanity check, anyway. 
426     if (ubeacon_AmSyncSite()) {
427       return UDEADLOCK;
428     }
429 */
430     dbase = ubik_dbase;
431     DBHOLD(dbase);
432     code = (*dbase->stat) (dbase, file, &ubikstat);
433     if (code < 0) {
434         DBRELE(dbase);
435         return code;
436     }
437     length = ubikstat.size;
438     tlen = htonl(length);
439     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
440     if (code != sizeof(afs_int32)) {
441         DBRELE(dbase);
442         ubik_dprint("Rx-write length error=%d\n", code);
443         return BULK_ERROR;
444     }
445     offset = 0;
446     while (length > 0) {
447         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
448         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
449         if (code != tlen) {
450             DBRELE(dbase);
451             ubik_dprint("read failed error=%d\n", code);
452             return UIOERROR;
453         }
454         code = rx_Write(rxcall, tbuffer, tlen);
455         if (code != tlen) {
456             DBRELE(dbase);
457             ubik_dprint("Rx-write length error=%d\n", code);
458             return BULK_ERROR;
459         }
460         length -= tlen;
461         offset += tlen;
462     }
463     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
464     DBRELE(dbase);
465     return code;
466 }
467
468 afs_int32
469 SDISK_SendFile(register struct rx_call *rxcall, afs_int32 file, 
470                afs_int32 length, struct ubik_version *avers)
471 {
472     register afs_int32 code;
473     struct ubik_dbase *dbase = NULL;
474     char tbuffer[1024];
475     afs_int32 offset;
476     struct ubik_version tversion;
477     register int tlen;
478     struct rx_peer *tpeer;
479     struct rx_connection *tconn;
480     afs_uint32 otherHost = 0;
481 #ifndef OLD_URECOVERY
482     char pbuffer[1028];
483     int flen, fd = -1;
484     afs_int32 epoch = 0;
485     afs_int32 pass;
486 #endif
487
488     /* send the file back to the requester */
489
490     if ((code = ubik_CheckAuth(rxcall))) {
491         goto failed;
492     }
493
494     /* next, we do a sanity check to see if the guy sending us the database is
495      * the guy we think is the sync site.  It turns out that we might not have
496      * decided yet that someone's the sync site, but they could have enough
497      * votes from others to be sync site anyway, and could send us the database
498      * in advance of getting our votes.  This is fine, what we're really trying
499      * to check is that some authenticated bogon isn't sending a random database
500      * into another configuration.  This could happen on a bad configuration
501      * screwup.  Thus, we only object if we're sure we know who the sync site
502      * is, and it ain't the guy talking to us.
503      */
504     offset = uvote_GetSyncSite();
505     tconn = rx_ConnectionOf(rxcall);
506     tpeer = rx_PeerOf(tconn);
507     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
508     if (offset && offset != otherHost) {
509         /* we *know* this is the wrong guy */
510         code = USYNC;
511         goto failed;
512     }
513
514     dbase = ubik_dbase;
515     DBHOLD(dbase);
516
517     /* abort any active trans that may scribble over the database */
518     urecovery_AbortAll(dbase);
519
520     ubik_print("Ubik: Synchronize database with server %s\n",
521                afs_inet_ntoa(otherHost));
522
523     offset = 0;
524 #ifdef OLD_URECOVERY
525     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
526     tversion.counter = 0;
527 #else
528     epoch =
529 #endif
530     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
531     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
532 #ifndef OLD_URECOVERY
533     flen = length;
534     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
535     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
536     if (fd < 0) {
537         code = errno;
538         goto failed;
539     }
540     code = lseek(fd, HDRSIZE, 0);
541     if (code != HDRSIZE) {
542         close(fd);
543         goto failed;
544     }
545     pass = 0;
546 #endif
547     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
548     while (length > 0) {
549         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
550 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
551         if (pass % 4 == 0)
552             IOMGR_Poll();
553 #endif
554         code = rx_Read(rxcall, tbuffer, tlen);
555         if (code != tlen) {
556             DBRELE(dbase);
557             ubik_dprint("Rx-read length error=%d\n", code);
558             code = BULK_ERROR;
559             close(fd);
560             goto failed;
561         }
562 #ifdef OLD_URECOVERY
563         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
564 #else
565         code = write(fd, tbuffer, tlen);
566         pass++;
567 #endif
568         if (code != tlen) {
569             DBRELE(dbase);
570             ubik_dprint("write failed error=%d\n", code);
571             code = UIOERROR;
572             close(fd);
573             goto failed;
574         }
575         offset += tlen;
576         length -= tlen;
577     }
578 #ifndef OLD_URECOVERY
579     code = close(fd);
580     if (code)
581         goto failed;
582 #endif     
583
584     /* sync data first, then write label and resync (resync done by setlabel call).
585      * This way, good label is only on good database. */
586 #ifdef OLD_URECOVERY
587     (*ubik_dbase->sync) (dbase, file);
588 #else
589     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
590 #ifdef AFS_NT40_ENV
591     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
592     code = unlink(pbuffer);
593     if (!code)
594         code = rename(tbuffer, pbuffer);
595     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
596 #endif
597     if (!code) 
598         code = rename(pbuffer, tbuffer);
599     if (!code) {
600         (*ubik_dbase->open) (ubik_dbase, 0);
601 #endif
602         code = (*ubik_dbase->setlabel) (dbase, file, avers);
603 #ifndef OLD_URECOVERY
604     }
605 #ifdef AFS_NT40_ENV
606     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
607     unlink(pbuffer);
608 #endif
609 #endif
610     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
611     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
612 #ifdef AFS_PTHREAD_ENV
613     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
614 #else
615     LWP_NoYieldSignal(&dbase->version);
616 #endif
617     DBRELE(dbase);
618   failed:
619     if (code) {
620 #ifndef OLD_URECOVERY
621         unlink(pbuffer);
622         /* Failed to sync. Allow reads again for now. */
623         if (dbase != NULL) {
624             tversion.epoch = epoch;
625             (*dbase->setlabel) (dbase, file, &tversion);
626         }
627 #endif
628         ubik_print
629             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
630              afs_inet_ntoa(otherHost), code);
631     } else {
632         ubik_print("Ubik: Synchronize database completed\n");
633     }
634     return code;
635 }
636
637
638 afs_int32
639 SDISK_Probe(register struct rx_call *rxcall)
640 {
641     return 0;
642 }
643
644 /*!
645  * \brief Update remote machines addresses in my server list
646  *
647  * Send back my addresses to caller of this RPC
648  * \return zero on success, else 1.
649  */
650 afs_int32
651 SDISK_UpdateInterfaceAddr(register struct rx_call *rxcall, 
652                           UbikInterfaceAddr *inAddr, 
653                           UbikInterfaceAddr *outAddr)
654 {
655     struct ubik_server *ts, *tmp;
656     afs_uint32 remoteAddr;      /* in net byte order */
657     int i, j, found = 0, probableMatch = 0;
658
659     /* copy the output parameters */
660     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
661         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
662
663     remoteAddr = htonl(inAddr->hostAddr[0]);
664     for (ts = ubik_servers; ts; ts = ts->next)
665         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
666             probableMatch = 1;
667             break;
668         }
669
670     if (probableMatch) {
671         /* verify that all addresses in the incoming RPC are
672          ** not part of other server entries in my CellServDB
673          */
674         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
675              && inAddr->hostAddr[i]; i++) {
676             remoteAddr = htonl(inAddr->hostAddr[i]);
677             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
678                 if (ts == tmp)  /* this is my server */
679                     continue;
680                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
681                      j++)
682                     if (remoteAddr == tmp->addr[j]) {
683                         found = 1;
684                         break;
685                     }
686             }
687         }
688     }
689
690     /* if (probableMatch) */
691     /* inconsistent addresses in CellServDB */
692     if (!probableMatch || found) {
693         ubik_print("Inconsistent Cell Info from server: ");
694         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
695             ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
696         ubik_print("\n");
697         fflush(stdout);
698         fflush(stderr);
699         printServerInfo();
700         return UBADHOST;
701     }
702
703     /* update our data structures */
704     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
705         ts->addr[i] = htonl(inAddr->hostAddr[i]);
706
707     ubik_print("ubik: A Remote Server has addresses: ");
708     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
709         ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
710     ubik_print("\n");
711
712     return 0;
713 }
714
715 static void
716 printServerInfo(void)
717 {
718     struct ubik_server *ts;
719     int i, j = 1;
720
721     ubik_print("Local CellServDB:");
722     for (ts = ubik_servers; ts; ts = ts->next, j++) {
723         ubik_print("Server %d: ", j);
724         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
725             ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
726     }
727     ubik_print("\n");
728 }
729
730 afs_int32
731 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid, 
732                  struct ubik_version *oldversionp, 
733                  struct ubik_version *newversionp)
734 {
735     afs_int32 code = 0;
736     struct ubik_dbase *dbase;
737
738     if ((code = ubik_CheckAuth(rxcall))) {
739         return (code);
740     }
741
742     if (!ubik_currentTrans) {
743         return USYNC;
744     }
745     /* sanity check to make sure only write trans appear here */
746     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
747         return UBADTYPE;
748     }
749
750     /* Should not get this for the sync site */
751     if (ubeacon_AmSyncSite()) {
752         return UDEADLOCK;
753     }
754
755     dbase = ubik_currentTrans->dbase;
756     DBHOLD(dbase);
757     urecovery_CheckTid(atid);
758     if (!ubik_currentTrans) {
759         DBRELE(dbase);
760         return USYNC;
761     }
762
763     /* Set the label if its version matches the sync-site's */
764     if ((oldversionp->epoch == ubik_dbVersion.epoch)
765         && (oldversionp->counter == ubik_dbVersion.counter)) {
766         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
767         if (!code) {
768             ubik_dbase->version = *newversionp;
769             ubik_dbVersion = *newversionp;
770         }
771     } else {
772         code = USYNC;
773     }
774
775     DBRELE(dbase);
776     return code;
777 }