a6ca567a32c85d1edd77e58e20d93c80e0205841
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <sys/types.h>
16 #include <string.h>
17 #include <stdarg.h>
18
19 #ifdef AFS_NT40_ENV
20 #include <winsock2.h>
21 #include <fcntl.h>
22 #else
23 #include <sys/file.h>
24 #include <netinet/in.h>
25 #endif
26
27 #include <lock.h>
28 #include <rx/xdr.h>
29 #include <rx/rx.h>
30 #include <errno.h>
31 #include <afs/afsutil.h>
32
33 #define UBIK_INTERNALS
34 #include "ubik.h"
35 #include "ubik_int.h"
36
37 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
38 void *ubik_CheckRXSecurityRock;
39
40 static void printServerInfo(void);
41
42 /*! \file
43  * routines for handling requests remotely-submitted by the sync site.  These are
44  * only write transactions (we don't propagate read trans), and there is at most one
45  * write transaction extant at any one time.
46  */
47
48 struct ubik_trans *ubik_currentTrans = 0;
49
50 int
51 ubik_CheckAuth(struct rx_call *acall)
52 {
53     afs_int32 code;
54     if (ubik_CheckRXSecurityProc) {
55         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
56         return code;
57     } else
58         return 0;
59 }
60
61 /* the rest of these guys handle remote execution of write
62  * transactions: this is the code executed on the other servers when a
63  * sync site is executing a write transaction.
64  */
65 afs_int32
66 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
67 {
68     afs_int32 code;
69
70     if ((code = ubik_CheckAuth(rxcall))) {
71         return code;
72     }
73     DBHOLD(ubik_dbase);
74     urecovery_CheckTid(atid, 1);
75     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
76     if (!code && ubik_currentTrans) {
77         /* label this trans with the right trans id */
78         ubik_currentTrans->tid.epoch = atid->epoch;
79         ubik_currentTrans->tid.counter = atid->counter;
80     }
81     DBRELE(ubik_dbase);
82     return code;
83 }
84
85
86 afs_int32
87 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
88 {
89     afs_int32 code;
90     struct ubik_dbase *dbase;
91
92     if ((code = ubik_CheckAuth(rxcall))) {
93         return code;
94     }
95
96     if (!ubik_currentTrans) {
97         return USYNC;
98     }
99     /*
100      * sanity check to make sure only write trans appear here
101      */
102     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
103         return UBADTYPE;
104     }
105
106     dbase = ubik_currentTrans->dbase;
107
108     ObtainWriteLock(&dbase->cache_lock);
109
110     DBHOLD(dbase);
111
112     urecovery_CheckTid(atid, 0);
113     if (!ubik_currentTrans) {
114         DBRELE(dbase);
115         ReleaseWriteLock(&dbase->cache_lock);
116         return USYNC;
117     }
118
119     code = udisk_commit(ubik_currentTrans);
120     if (code == 0) {
121         /* sync site should now match */
122         ubik_dbVersion = ubik_dbase->version;
123     }
124     DBRELE(dbase);
125     ReleaseWriteLock(&dbase->cache_lock);
126     return code;
127 }
128
129 afs_int32
130 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
131 {
132     struct ubik_dbase *dbase;
133     afs_int32 code;
134
135     if ((code = ubik_CheckAuth(rxcall))) {
136         return code;
137     }
138
139     if (!ubik_currentTrans) {
140         return USYNC;
141     }
142     /* sanity check to make sure only write trans appear here */
143     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
144         return UBADTYPE;
145     }
146
147     dbase = ubik_currentTrans->dbase;
148     DBHOLD(dbase);
149     urecovery_CheckTid(atid, 0);
150     if (!ubik_currentTrans) {
151         DBRELE(dbase);
152         return USYNC;
153     }
154
155     /* If the thread is not waiting for lock - ok to end it */
156 #if !defined(UBIK_PAUSE)
157     if (ubik_currentTrans->locktype != LOCKWAIT) {
158 #endif /* UBIK_PAUSE */
159         udisk_end(ubik_currentTrans);
160 #if !defined(UBIK_PAUSE)
161     }
162 #endif /* UBIK_PAUSE */
163     ubik_currentTrans = (struct ubik_trans *)0;
164     DBRELE(dbase);
165     return 0;
166 }
167
168 afs_int32
169 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
170 {
171     afs_int32 code;
172     struct ubik_dbase *dbase;
173
174     if ((code = ubik_CheckAuth(rxcall))) {
175         return code;
176     }
177
178     if (!ubik_currentTrans) {
179         return USYNC;
180     }
181     /* sanity check to make sure only write trans appear here  */
182     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
183         return UBADTYPE;
184     }
185
186     dbase = ubik_currentTrans->dbase;
187     DBHOLD(dbase);
188     urecovery_CheckTid(atid, 0);
189     if (!ubik_currentTrans) {
190         DBRELE(dbase);
191         return USYNC;
192     }
193
194     code = udisk_abort(ubik_currentTrans);
195     /* If the thread is not waiting for lock - ok to end it */
196 #if !defined(UBIK_PAUSE)
197     if (ubik_currentTrans->locktype != LOCKWAIT) {
198 #endif /* UBIK_PAUSE */
199         udisk_end(ubik_currentTrans);
200 #if !defined(UBIK_PAUSE)
201     }
202 #endif /* UBIK_PAUSE */
203     ubik_currentTrans = (struct ubik_trans *)0;
204     DBRELE(dbase);
205     return code;
206 }
207
208 /* apos and alen are not used */
209 afs_int32
210 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
211            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
212 {
213     afs_int32 code;
214     struct ubik_dbase *dbase;
215     struct ubik_trans *ubik_thisTrans;
216
217     if ((code = ubik_CheckAuth(rxcall))) {
218         return code;
219     }
220     if (!ubik_currentTrans) {
221         return USYNC;
222     }
223     /* sanity check to make sure only write trans appear here */
224     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
225         return UBADTYPE;
226     }
227     if (alen != 1) {
228         return UBADLOCK;
229     }
230     dbase = ubik_currentTrans->dbase;
231     DBHOLD(dbase);
232     urecovery_CheckTid(atid, 0);
233     if (!ubik_currentTrans) {
234         DBRELE(dbase);
235         return USYNC;
236     }
237
238     ubik_thisTrans = ubik_currentTrans;
239     code = ulock_getLock(ubik_currentTrans, atype, 1);
240
241     /* While waiting, the transaction may have been ended/
242      * aborted from under us (urecovery_CheckTid). In that
243      * case, end the transaction here.
244      */
245     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
246         udisk_end(ubik_thisTrans);
247         code = USYNC;
248     }
249
250     DBRELE(dbase);
251     return code;
252 }
253
254 /*!
255  * \brief Write a vector of data
256  */
257 afs_int32
258 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
259              iovec_wrt *io_vector, iovec_buf *io_buffer)
260 {
261     afs_int32 code, i, offset;
262     struct ubik_dbase *dbase;
263     struct ubik_iovec *iovec;
264     char *iobuf;
265
266     if ((code = ubik_CheckAuth(rxcall))) {
267         return code;
268     }
269     if (!ubik_currentTrans) {
270         return USYNC;
271     }
272     /* sanity check to make sure only write trans appear here */
273     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
274         return UBADTYPE;
275     }
276
277     dbase = ubik_currentTrans->dbase;
278     DBHOLD(dbase);
279     urecovery_CheckTid(atid, 0);
280     if (!ubik_currentTrans) {
281         DBRELE(dbase);
282         return USYNC;
283     }
284
285     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
286     iobuf = (char *)io_buffer->iovec_buf_val;
287     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
288         /* Sanity check for going off end of buffer */
289         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
290             code = UINTERNAL;
291         } else {
292             code =
293                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
294                             iovec[i].position, iovec[i].length);
295         }
296         if (code)
297             break;
298
299         offset += iovec[i].length;
300     }
301
302     DBRELE(dbase);
303     return code;
304 }
305
306 afs_int32
307 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
308             afs_int32 afile, afs_int32 apos, bulkdata *adata)
309 {
310     afs_int32 code;
311     struct ubik_dbase *dbase;
312
313     if ((code = ubik_CheckAuth(rxcall))) {
314         return code;
315     }
316     if (!ubik_currentTrans) {
317         return USYNC;
318     }
319     /* sanity check to make sure only write trans appear here */
320     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
321         return UBADTYPE;
322     }
323
324     dbase = ubik_currentTrans->dbase;
325     DBHOLD(dbase);
326     urecovery_CheckTid(atid, 0);
327     if (!ubik_currentTrans) {
328         DBRELE(dbase);
329         return USYNC;
330     }
331     code =
332         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
333                     adata->bulkdata_len);
334     DBRELE(dbase);
335     return code;
336 }
337
338 afs_int32
339 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
340                afs_int32 afile, afs_int32 alen)
341 {
342     afs_int32 code;
343     struct ubik_dbase *dbase;
344
345     if ((code = ubik_CheckAuth(rxcall))) {
346         return code;
347     }
348     if (!ubik_currentTrans) {
349         return USYNC;
350     }
351     /* sanity check to make sure only write trans appear here */
352     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
353         return UBADTYPE;
354     }
355
356     dbase = ubik_currentTrans->dbase;
357     DBHOLD(dbase);
358     urecovery_CheckTid(atid, 0);
359     if (!ubik_currentTrans) {
360         DBRELE(dbase);
361         return USYNC;
362     }
363     code = udisk_truncate(ubik_currentTrans, afile, alen);
364     DBRELE(dbase);
365     return code;
366 }
367
368 afs_int32
369 SDISK_GetVersion(struct rx_call *rxcall,
370                  struct ubik_version *aversion)
371 {
372     afs_int32 code;
373
374     if ((code = ubik_CheckAuth(rxcall))) {
375         return code;
376     }
377
378     /*
379      * If we are the sync site, recovery shouldn't be running on any
380      * other site. We shouldn't be getting this RPC as long as we are
381      * the sync site.  To prevent any unforseen activity, we should
382      * reject this RPC until we have recognized that we are not the
383      * sync site anymore, and/or if we have any pending WRITE
384      * transactions that have to complete. This way we can be assured
385      * that this RPC would not block any pending transactions that
386      * should either fail or pass. If we have recognized the fact that
387      * we are not the sync site any more, all write transactions would
388      * fail with UNOQUORUM anyway.
389      */
390     if (ubeacon_AmSyncSite()) {
391         return UDEADLOCK;
392     }
393
394     DBHOLD(ubik_dbase);
395     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
396     DBRELE(ubik_dbase);
397     if (code) {
398         /* tell other side there's no dbase */
399         aversion->epoch = 0;
400         aversion->counter = 0;
401     }
402     return 0;
403 }
404
405 afs_int32
406 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
407               struct ubik_version *version)
408 {
409     afs_int32 code;
410     struct ubik_dbase *dbase;
411     afs_int32 offset;
412     struct ubik_stat ubikstat;
413     char tbuffer[256];
414     afs_int32 tlen;
415     afs_int32 length;
416
417     if ((code = ubik_CheckAuth(rxcall))) {
418         return code;
419     }
420 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
421  * a sanity check, anyway.
422     if (ubeacon_AmSyncSite()) {
423       return UDEADLOCK;
424     }
425 */
426     dbase = ubik_dbase;
427     DBHOLD(dbase);
428     code = (*dbase->stat) (dbase, file, &ubikstat);
429     if (code < 0) {
430         DBRELE(dbase);
431         return code;
432     }
433     length = ubikstat.size;
434     tlen = htonl(length);
435     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
436     if (code != sizeof(afs_int32)) {
437         DBRELE(dbase);
438         ubik_dprint("Rx-write length error=%d\n", code);
439         return BULK_ERROR;
440     }
441     offset = 0;
442     while (length > 0) {
443         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
444         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
445         if (code != tlen) {
446             DBRELE(dbase);
447             ubik_dprint("read failed error=%d\n", code);
448             return UIOERROR;
449         }
450         code = rx_Write(rxcall, tbuffer, tlen);
451         if (code != tlen) {
452             DBRELE(dbase);
453             ubik_dprint("Rx-write length error=%d\n", code);
454             return BULK_ERROR;
455         }
456         length -= tlen;
457         offset += tlen;
458     }
459     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
460     DBRELE(dbase);
461     return code;
462 }
463
464 afs_int32
465 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
466                afs_int32 length, struct ubik_version *avers)
467 {
468     afs_int32 code;
469     struct ubik_dbase *dbase = NULL;
470     char tbuffer[1024];
471     afs_int32 offset;
472     struct ubik_version tversion;
473     int tlen;
474     struct rx_peer *tpeer;
475     struct rx_connection *tconn;
476     afs_uint32 otherHost = 0;
477     char hoststr[16];
478 #ifndef OLD_URECOVERY
479     char pbuffer[1028];
480     int fd = -1;
481     afs_int32 epoch = 0;
482     afs_int32 pass;
483 #endif
484
485     /* send the file back to the requester */
486
487     if ((code = ubik_CheckAuth(rxcall))) {
488         goto failed;
489     }
490
491     /* next, we do a sanity check to see if the guy sending us the database is
492      * the guy we think is the sync site.  It turns out that we might not have
493      * decided yet that someone's the sync site, but they could have enough
494      * votes from others to be sync site anyway, and could send us the database
495      * in advance of getting our votes.  This is fine, what we're really trying
496      * to check is that some authenticated bogon isn't sending a random database
497      * into another configuration.  This could happen on a bad configuration
498      * screwup.  Thus, we only object if we're sure we know who the sync site
499      * is, and it ain't the guy talking to us.
500      */
501     offset = uvote_GetSyncSite();
502     tconn = rx_ConnectionOf(rxcall);
503     tpeer = rx_PeerOf(tconn);
504     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
505     if (offset && offset != otherHost) {
506         /* we *know* this is the wrong guy */
507         code = USYNC;
508         goto failed;
509     }
510
511     dbase = ubik_dbase;
512     DBHOLD(dbase);
513
514     /* abort any active trans that may scribble over the database */
515     urecovery_AbortAll(dbase);
516
517     ubik_print("Ubik: Synchronize database with server %s\n",
518                afs_inet_ntoa_r(otherHost, hoststr));
519
520     offset = 0;
521 #ifdef OLD_URECOVERY
522     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
523     tversion.counter = 0;
524 #else
525     epoch =
526 #endif
527     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
528     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
529 #ifndef OLD_URECOVERY
530     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
531     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
532     if (fd < 0) {
533         code = errno;
534         goto failed;
535     }
536     code = lseek(fd, HDRSIZE, 0);
537     if (code != HDRSIZE) {
538         close(fd);
539         goto failed;
540     }
541     pass = 0;
542 #endif
543     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
544     while (length > 0) {
545         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
546 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
547         if (pass % 4 == 0)
548             IOMGR_Poll();
549 #endif
550         code = rx_Read(rxcall, tbuffer, tlen);
551         if (code != tlen) {
552             DBRELE(dbase);
553             ubik_dprint("Rx-read length error=%d\n", code);
554             code = BULK_ERROR;
555             close(fd);
556             goto failed;
557         }
558 #ifdef OLD_URECOVERY
559         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
560 #else
561         code = write(fd, tbuffer, tlen);
562         pass++;
563 #endif
564         if (code != tlen) {
565             DBRELE(dbase);
566             ubik_dprint("write failed error=%d\n", code);
567             code = UIOERROR;
568             close(fd);
569             goto failed;
570         }
571         offset += tlen;
572         length -= tlen;
573     }
574 #ifndef OLD_URECOVERY
575     code = close(fd);
576     if (code)
577         goto failed;
578 #endif
579
580     /* sync data first, then write label and resync (resync done by setlabel call).
581      * This way, good label is only on good database. */
582 #ifdef OLD_URECOVERY
583     (*ubik_dbase->sync) (dbase, file);
584 #else
585     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
586 #ifdef AFS_NT40_ENV
587     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
588     code = unlink(pbuffer);
589     if (!code)
590         code = rename(tbuffer, pbuffer);
591     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
592 #endif
593     if (!code)
594         code = rename(pbuffer, tbuffer);
595     if (!code) {
596         (*ubik_dbase->open) (ubik_dbase, file);
597 #endif
598         code = (*ubik_dbase->setlabel) (dbase, file, avers);
599 #ifndef OLD_URECOVERY
600     }
601 #ifdef AFS_NT40_ENV
602     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
603     unlink(pbuffer);
604 #endif
605 #endif
606     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
607     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
608 #ifdef AFS_PTHREAD_ENV
609     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
610 #else
611     LWP_NoYieldSignal(&dbase->version);
612 #endif
613     DBRELE(dbase);
614   failed:
615     if (code) {
616 #ifndef OLD_URECOVERY
617         unlink(pbuffer);
618         /* Failed to sync. Allow reads again for now. */
619         if (dbase != NULL) {
620             tversion.epoch = epoch;
621             (*dbase->setlabel) (dbase, file, &tversion);
622         }
623 #endif
624         ubik_print
625             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
626              afs_inet_ntoa_r(otherHost, hoststr), code);
627     } else {
628         ubik_print("Ubik: Synchronize database completed\n");
629     }
630     return code;
631 }
632
633
634 afs_int32
635 SDISK_Probe(struct rx_call *rxcall)
636 {
637     return 0;
638 }
639
640 /*!
641  * \brief Update remote machines addresses in my server list
642  *
643  * Send back my addresses to caller of this RPC
644  * \return zero on success, else 1.
645  */
646 afs_int32
647 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
648                           UbikInterfaceAddr *inAddr,
649                           UbikInterfaceAddr *outAddr)
650 {
651     struct ubik_server *ts, *tmp;
652     afs_uint32 remoteAddr;      /* in net byte order */
653     int i, j, found = 0, probableMatch = 0;
654     char hoststr[16];
655
656     /* copy the output parameters */
657     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
658         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
659
660     remoteAddr = htonl(inAddr->hostAddr[0]);
661     for (ts = ubik_servers; ts; ts = ts->next)
662         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
663             probableMatch = 1;
664             break;
665         }
666
667     if (probableMatch) {
668         /* verify that all addresses in the incoming RPC are
669          ** not part of other server entries in my CellServDB
670          */
671         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
672              && inAddr->hostAddr[i]; i++) {
673             remoteAddr = htonl(inAddr->hostAddr[i]);
674             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
675                 if (ts == tmp)  /* this is my server */
676                     continue;
677                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
678                      j++)
679                     if (remoteAddr == tmp->addr[j]) {
680                         found = 1;
681                         break;
682                     }
683             }
684         }
685     }
686
687     /* if (probableMatch) */
688     /* inconsistent addresses in CellServDB */
689     if (!probableMatch || found) {
690         ubik_print("Inconsistent Cell Info from server: ");
691         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
692             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
693         ubik_print("\n");
694         fflush(stdout);
695         fflush(stderr);
696         printServerInfo();
697         return UBADHOST;
698     }
699
700     /* update our data structures */
701     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
702         ts->addr[i] = htonl(inAddr->hostAddr[i]);
703
704     ubik_print("ubik: A Remote Server has addresses: ");
705     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
706         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
707     ubik_print("\n");
708
709     return 0;
710 }
711
712 static void
713 printServerInfo(void)
714 {
715     struct ubik_server *ts;
716     int i, j = 1;
717     char hoststr[16];
718
719     ubik_print("Local CellServDB:");
720     for (ts = ubik_servers; ts; ts = ts->next, j++) {
721         ubik_print("Server %d: ", j);
722         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
723             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
724     }
725     ubik_print("\n");
726 }
727
728 afs_int32
729 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
730                  struct ubik_version *oldversionp,
731                  struct ubik_version *newversionp)
732 {
733     afs_int32 code = 0;
734     struct ubik_dbase *dbase;
735
736     if ((code = ubik_CheckAuth(rxcall))) {
737         return (code);
738     }
739
740     if (!ubik_currentTrans) {
741         return USYNC;
742     }
743     /* sanity check to make sure only write trans appear here */
744     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
745         return UBADTYPE;
746     }
747
748     /* Should not get this for the sync site */
749     if (ubeacon_AmSyncSite()) {
750         return UDEADLOCK;
751     }
752
753     dbase = ubik_currentTrans->dbase;
754     DBHOLD(dbase);
755     urecovery_CheckTid(atid, 0);
756     if (!ubik_currentTrans) {
757         DBRELE(dbase);
758         return USYNC;
759     }
760
761     /* Set the label if its version matches the sync-site's */
762     if ((oldversionp->epoch == ubik_dbVersion.epoch)
763         && (oldversionp->counter == ubik_dbVersion.counter)) {
764         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
765         if (!code) {
766             ubik_dbase->version = *newversionp;
767             ubik_dbVersion = *newversionp;
768         }
769     } else {
770         code = USYNC;
771     }
772
773     DBRELE(dbase);
774     return code;
775 }