4069cbd1bba140535d111c3b05294330843d0a53
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13
14 #include <sys/types.h>
15 #include <string.h>
16 #include <stdarg.h>
17
18 #ifdef AFS_NT40_ENV
19 #include <winsock2.h>
20 #include <fcntl.h>
21 #else
22 #include <sys/file.h>
23 #include <netinet/in.h>
24 #endif
25
26 #include <lock.h>
27 #include <rx/xdr.h>
28 #include <rx/rx.h>
29 #include <errno.h>
30 #include <afs/afsutil.h>
31
32 #define UBIK_INTERNALS
33 #include "ubik.h"
34 #include "ubik_int.h"
35
36 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
37 void *ubik_CheckRXSecurityRock;
38
39 static void printServerInfo(void);
40
41 /*! \file
42  * routines for handling requests remotely-submitted by the sync site.  These are
43  * only write transactions (we don't propagate read trans), and there is at most one
44  * write transaction extant at any one time.
45  */
46
47 struct ubik_trans *ubik_currentTrans = 0;
48
49 int
50 ubik_CheckAuth(struct rx_call *acall)
51 {
52     afs_int32 code;
53     if (ubik_CheckRXSecurityProc) {
54         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
55         return code;
56     } else
57         return 0;
58 }
59
60 /* the rest of these guys handle remote execution of write
61  * transactions: this is the code executed on the other servers when a
62  * sync site is executing a write transaction.
63  */
64 afs_int32
65 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
66 {
67     afs_int32 code;
68
69     if ((code = ubik_CheckAuth(rxcall))) {
70         return code;
71     }
72     DBHOLD(ubik_dbase);
73     urecovery_CheckTid(atid, 1);
74     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
75     if (!code && ubik_currentTrans) {
76         /* label this trans with the right trans id */
77         ubik_currentTrans->tid.epoch = atid->epoch;
78         ubik_currentTrans->tid.counter = atid->counter;
79     }
80     DBRELE(ubik_dbase);
81     return code;
82 }
83
84
85 afs_int32
86 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
87 {
88     afs_int32 code;
89     struct ubik_dbase *dbase;
90
91     if ((code = ubik_CheckAuth(rxcall))) {
92         return code;
93     }
94
95     if (!ubik_currentTrans) {
96         return USYNC;
97     }
98     /*
99      * sanity check to make sure only write trans appear here
100      */
101     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
102         return UBADTYPE;
103     }
104
105     dbase = ubik_currentTrans->dbase;
106
107     ObtainWriteLock(&dbase->cache_lock);
108
109     DBHOLD(dbase);
110
111     urecovery_CheckTid(atid, 0);
112     if (!ubik_currentTrans) {
113         DBRELE(dbase);
114         ReleaseWriteLock(&dbase->cache_lock);
115         return USYNC;
116     }
117
118     code = udisk_commit(ubik_currentTrans);
119     if (code == 0) {
120         /* sync site should now match */
121         ubik_dbVersion = ubik_dbase->version;
122     }
123     DBRELE(dbase);
124     ReleaseWriteLock(&dbase->cache_lock);
125     return code;
126 }
127
128 afs_int32
129 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
130 {
131     struct ubik_dbase *dbase;
132     afs_int32 code;
133
134     if ((code = ubik_CheckAuth(rxcall))) {
135         return code;
136     }
137
138     if (!ubik_currentTrans) {
139         return USYNC;
140     }
141     /* sanity check to make sure only write trans appear here */
142     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
143         return UBADTYPE;
144     }
145
146     dbase = ubik_currentTrans->dbase;
147     DBHOLD(dbase);
148     urecovery_CheckTid(atid, 0);
149     if (!ubik_currentTrans) {
150         DBRELE(dbase);
151         return USYNC;
152     }
153
154     /* If the thread is not waiting for lock - ok to end it */
155 #if !defined(UBIK_PAUSE)
156     if (ubik_currentTrans->locktype != LOCKWAIT) {
157 #endif /* UBIK_PAUSE */
158         udisk_end(ubik_currentTrans);
159 #if !defined(UBIK_PAUSE)
160     }
161 #endif /* UBIK_PAUSE */
162     ubik_currentTrans = (struct ubik_trans *)0;
163     DBRELE(dbase);
164     return 0;
165 }
166
167 afs_int32
168 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
169 {
170     afs_int32 code;
171     struct ubik_dbase *dbase;
172
173     if ((code = ubik_CheckAuth(rxcall))) {
174         return code;
175     }
176
177     if (!ubik_currentTrans) {
178         return USYNC;
179     }
180     /* sanity check to make sure only write trans appear here  */
181     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
182         return UBADTYPE;
183     }
184
185     dbase = ubik_currentTrans->dbase;
186     DBHOLD(dbase);
187     urecovery_CheckTid(atid, 0);
188     if (!ubik_currentTrans) {
189         DBRELE(dbase);
190         return USYNC;
191     }
192
193     code = udisk_abort(ubik_currentTrans);
194     /* If the thread is not waiting for lock - ok to end it */
195 #if !defined(UBIK_PAUSE)
196     if (ubik_currentTrans->locktype != LOCKWAIT) {
197 #endif /* UBIK_PAUSE */
198         udisk_end(ubik_currentTrans);
199 #if !defined(UBIK_PAUSE)
200     }
201 #endif /* UBIK_PAUSE */
202     ubik_currentTrans = (struct ubik_trans *)0;
203     DBRELE(dbase);
204     return code;
205 }
206
207 /* apos and alen are not used */
208 afs_int32
209 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
210            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
211 {
212     afs_int32 code;
213     struct ubik_dbase *dbase;
214     struct ubik_trans *ubik_thisTrans;
215
216     if ((code = ubik_CheckAuth(rxcall))) {
217         return code;
218     }
219     if (!ubik_currentTrans) {
220         return USYNC;
221     }
222     /* sanity check to make sure only write trans appear here */
223     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
224         return UBADTYPE;
225     }
226     if (alen != 1) {
227         return UBADLOCK;
228     }
229     dbase = ubik_currentTrans->dbase;
230     DBHOLD(dbase);
231     urecovery_CheckTid(atid, 0);
232     if (!ubik_currentTrans) {
233         DBRELE(dbase);
234         return USYNC;
235     }
236
237     ubik_thisTrans = ubik_currentTrans;
238     code = ulock_getLock(ubik_currentTrans, atype, 1);
239
240     /* While waiting, the transaction may have been ended/
241      * aborted from under us (urecovery_CheckTid). In that
242      * case, end the transaction here.
243      */
244     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
245         udisk_end(ubik_thisTrans);
246         code = USYNC;
247     }
248
249     DBRELE(dbase);
250     return code;
251 }
252
253 /*!
254  * \brief Write a vector of data
255  */
256 afs_int32
257 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
258              iovec_wrt *io_vector, iovec_buf *io_buffer)
259 {
260     afs_int32 code, i, offset;
261     struct ubik_dbase *dbase;
262     struct ubik_iovec *iovec;
263     char *iobuf;
264
265     if ((code = ubik_CheckAuth(rxcall))) {
266         return code;
267     }
268     if (!ubik_currentTrans) {
269         return USYNC;
270     }
271     /* sanity check to make sure only write trans appear here */
272     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
273         return UBADTYPE;
274     }
275
276     dbase = ubik_currentTrans->dbase;
277     DBHOLD(dbase);
278     urecovery_CheckTid(atid, 0);
279     if (!ubik_currentTrans) {
280         DBRELE(dbase);
281         return USYNC;
282     }
283
284     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
285     iobuf = (char *)io_buffer->iovec_buf_val;
286     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
287         /* Sanity check for going off end of buffer */
288         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
289             code = UINTERNAL;
290         } else {
291             code =
292                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
293                             iovec[i].position, iovec[i].length);
294         }
295         if (code)
296             break;
297
298         offset += iovec[i].length;
299     }
300
301     DBRELE(dbase);
302     return code;
303 }
304
305 afs_int32
306 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
307             afs_int32 afile, afs_int32 apos, bulkdata *adata)
308 {
309     afs_int32 code;
310     struct ubik_dbase *dbase;
311
312     if ((code = ubik_CheckAuth(rxcall))) {
313         return code;
314     }
315     if (!ubik_currentTrans) {
316         return USYNC;
317     }
318     /* sanity check to make sure only write trans appear here */
319     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
320         return UBADTYPE;
321     }
322
323     dbase = ubik_currentTrans->dbase;
324     DBHOLD(dbase);
325     urecovery_CheckTid(atid, 0);
326     if (!ubik_currentTrans) {
327         DBRELE(dbase);
328         return USYNC;
329     }
330     code =
331         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
332                     adata->bulkdata_len);
333     DBRELE(dbase);
334     return code;
335 }
336
337 afs_int32
338 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
339                afs_int32 afile, afs_int32 alen)
340 {
341     afs_int32 code;
342     struct ubik_dbase *dbase;
343
344     if ((code = ubik_CheckAuth(rxcall))) {
345         return code;
346     }
347     if (!ubik_currentTrans) {
348         return USYNC;
349     }
350     /* sanity check to make sure only write trans appear here */
351     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
352         return UBADTYPE;
353     }
354
355     dbase = ubik_currentTrans->dbase;
356     DBHOLD(dbase);
357     urecovery_CheckTid(atid, 0);
358     if (!ubik_currentTrans) {
359         DBRELE(dbase);
360         return USYNC;
361     }
362     code = udisk_truncate(ubik_currentTrans, afile, alen);
363     DBRELE(dbase);
364     return code;
365 }
366
367 afs_int32
368 SDISK_GetVersion(struct rx_call *rxcall,
369                  struct ubik_version *aversion)
370 {
371     afs_int32 code;
372
373     if ((code = ubik_CheckAuth(rxcall))) {
374         return code;
375     }
376
377     /*
378      * If we are the sync site, recovery shouldn't be running on any
379      * other site. We shouldn't be getting this RPC as long as we are
380      * the sync site.  To prevent any unforseen activity, we should
381      * reject this RPC until we have recognized that we are not the
382      * sync site anymore, and/or if we have any pending WRITE
383      * transactions that have to complete. This way we can be assured
384      * that this RPC would not block any pending transactions that
385      * should either fail or pass. If we have recognized the fact that
386      * we are not the sync site any more, all write transactions would
387      * fail with UNOQUORUM anyway.
388      */
389     if (ubeacon_AmSyncSite()) {
390         return UDEADLOCK;
391     }
392
393     DBHOLD(ubik_dbase);
394     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
395     DBRELE(ubik_dbase);
396     if (code) {
397         /* tell other side there's no dbase */
398         aversion->epoch = 0;
399         aversion->counter = 0;
400     }
401     return 0;
402 }
403
404 afs_int32
405 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
406               struct ubik_version *version)
407 {
408     afs_int32 code;
409     struct ubik_dbase *dbase;
410     afs_int32 offset;
411     struct ubik_stat ubikstat;
412     char tbuffer[256];
413     afs_int32 tlen;
414     afs_int32 length;
415
416     if ((code = ubik_CheckAuth(rxcall))) {
417         return code;
418     }
419 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
420  * a sanity check, anyway.
421     if (ubeacon_AmSyncSite()) {
422       return UDEADLOCK;
423     }
424 */
425     dbase = ubik_dbase;
426     DBHOLD(dbase);
427     code = (*dbase->stat) (dbase, file, &ubikstat);
428     if (code < 0) {
429         DBRELE(dbase);
430         return code;
431     }
432     length = ubikstat.size;
433     tlen = htonl(length);
434     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
435     if (code != sizeof(afs_int32)) {
436         DBRELE(dbase);
437         ubik_dprint("Rx-write length error=%d\n", code);
438         return BULK_ERROR;
439     }
440     offset = 0;
441     while (length > 0) {
442         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
443         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
444         if (code != tlen) {
445             DBRELE(dbase);
446             ubik_dprint("read failed error=%d\n", code);
447             return UIOERROR;
448         }
449         code = rx_Write(rxcall, tbuffer, tlen);
450         if (code != tlen) {
451             DBRELE(dbase);
452             ubik_dprint("Rx-write length error=%d\n", code);
453             return BULK_ERROR;
454         }
455         length -= tlen;
456         offset += tlen;
457     }
458     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
459     DBRELE(dbase);
460     return code;
461 }
462
463 afs_int32
464 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
465                afs_int32 length, struct ubik_version *avers)
466 {
467     afs_int32 code;
468     struct ubik_dbase *dbase = NULL;
469     char tbuffer[1024];
470     afs_int32 offset;
471     struct ubik_version tversion;
472     int tlen;
473     struct rx_peer *tpeer;
474     struct rx_connection *tconn;
475     afs_uint32 otherHost = 0;
476     char hoststr[16];
477 #ifndef OLD_URECOVERY
478     char pbuffer[1028];
479     int flen, fd = -1;
480     afs_int32 epoch = 0;
481     afs_int32 pass;
482 #endif
483
484     /* send the file back to the requester */
485
486     if ((code = ubik_CheckAuth(rxcall))) {
487         goto failed;
488     }
489
490     /* next, we do a sanity check to see if the guy sending us the database is
491      * the guy we think is the sync site.  It turns out that we might not have
492      * decided yet that someone's the sync site, but they could have enough
493      * votes from others to be sync site anyway, and could send us the database
494      * in advance of getting our votes.  This is fine, what we're really trying
495      * to check is that some authenticated bogon isn't sending a random database
496      * into another configuration.  This could happen on a bad configuration
497      * screwup.  Thus, we only object if we're sure we know who the sync site
498      * is, and it ain't the guy talking to us.
499      */
500     offset = uvote_GetSyncSite();
501     tconn = rx_ConnectionOf(rxcall);
502     tpeer = rx_PeerOf(tconn);
503     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
504     if (offset && offset != otherHost) {
505         /* we *know* this is the wrong guy */
506         code = USYNC;
507         goto failed;
508     }
509
510     dbase = ubik_dbase;
511     DBHOLD(dbase);
512
513     /* abort any active trans that may scribble over the database */
514     urecovery_AbortAll(dbase);
515
516     ubik_print("Ubik: Synchronize database with server %s\n",
517                afs_inet_ntoa_r(otherHost, hoststr));
518
519     offset = 0;
520 #ifdef OLD_URECOVERY
521     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
522     tversion.counter = 0;
523 #else
524     epoch =
525 #endif
526     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
527     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
528 #ifndef OLD_URECOVERY
529     flen = length;
530     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
531     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
532     if (fd < 0) {
533         code = errno;
534         goto failed;
535     }
536     code = lseek(fd, HDRSIZE, 0);
537     if (code != HDRSIZE) {
538         close(fd);
539         goto failed;
540     }
541     pass = 0;
542 #endif
543     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
544     while (length > 0) {
545         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
546 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
547         if (pass % 4 == 0)
548             IOMGR_Poll();
549 #endif
550         code = rx_Read(rxcall, tbuffer, tlen);
551         if (code != tlen) {
552             DBRELE(dbase);
553             ubik_dprint("Rx-read length error=%d\n", code);
554             code = BULK_ERROR;
555             close(fd);
556             goto failed;
557         }
558 #ifdef OLD_URECOVERY
559         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
560 #else
561         code = write(fd, tbuffer, tlen);
562         pass++;
563 #endif
564         if (code != tlen) {
565             DBRELE(dbase);
566             ubik_dprint("write failed error=%d\n", code);
567             code = UIOERROR;
568             close(fd);
569             goto failed;
570         }
571         offset += tlen;
572         length -= tlen;
573     }
574 #ifndef OLD_URECOVERY
575     code = close(fd);
576     if (code)
577         goto failed;
578 #endif
579
580     /* sync data first, then write label and resync (resync done by setlabel call).
581      * This way, good label is only on good database. */
582 #ifdef OLD_URECOVERY
583     (*ubik_dbase->sync) (dbase, file);
584 #else
585     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
586 #ifdef AFS_NT40_ENV
587     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
588     code = unlink(pbuffer);
589     if (!code)
590         code = rename(tbuffer, pbuffer);
591     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
592 #endif
593     if (!code)
594         code = rename(pbuffer, tbuffer);
595     if (!code) {
596         (*ubik_dbase->open) (ubik_dbase, file);
597 #endif
598         code = (*ubik_dbase->setlabel) (dbase, file, avers);
599 #ifndef OLD_URECOVERY
600     }
601 #ifdef AFS_NT40_ENV
602     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
603     unlink(pbuffer);
604 #endif
605 #endif
606     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
607     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
608 #ifdef AFS_PTHREAD_ENV
609     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
610 #else
611     LWP_NoYieldSignal(&dbase->version);
612 #endif
613     DBRELE(dbase);
614   failed:
615     if (code) {
616 #ifndef OLD_URECOVERY
617         unlink(pbuffer);
618         /* Failed to sync. Allow reads again for now. */
619         if (dbase != NULL) {
620             tversion.epoch = epoch;
621             (*dbase->setlabel) (dbase, file, &tversion);
622         }
623 #endif
624         ubik_print
625             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
626              afs_inet_ntoa_r(otherHost, hoststr), code);
627     } else {
628         ubik_print("Ubik: Synchronize database completed\n");
629     }
630     return code;
631 }
632
633
634 afs_int32
635 SDISK_Probe(struct rx_call *rxcall)
636 {
637     return 0;
638 }
639
640 /*!
641  * \brief Update remote machines addresses in my server list
642  *
643  * Send back my addresses to caller of this RPC
644  * \return zero on success, else 1.
645  */
646 afs_int32
647 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
648                           UbikInterfaceAddr *inAddr,
649                           UbikInterfaceAddr *outAddr)
650 {
651     struct ubik_server *ts, *tmp;
652     afs_uint32 remoteAddr;      /* in net byte order */
653     int i, j, found = 0, probableMatch = 0;
654     char hoststr[16];
655
656     /* copy the output parameters */
657     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
658         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
659
660     remoteAddr = htonl(inAddr->hostAddr[0]);
661     for (ts = ubik_servers; ts; ts = ts->next)
662         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
663             probableMatch = 1;
664             break;
665         }
666
667     if (probableMatch) {
668         /* verify that all addresses in the incoming RPC are
669          ** not part of other server entries in my CellServDB
670          */
671         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
672              && inAddr->hostAddr[i]; i++) {
673             remoteAddr = htonl(inAddr->hostAddr[i]);
674             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
675                 if (ts == tmp)  /* this is my server */
676                     continue;
677                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
678                      j++)
679                     if (remoteAddr == tmp->addr[j]) {
680                         found = 1;
681                         break;
682                     }
683             }
684         }
685     }
686
687     /* if (probableMatch) */
688     /* inconsistent addresses in CellServDB */
689     if (!probableMatch || found) {
690         ubik_print("Inconsistent Cell Info from server: ");
691         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
692             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
693         ubik_print("\n");
694         fflush(stdout);
695         fflush(stderr);
696         printServerInfo();
697         return UBADHOST;
698     }
699
700     /* update our data structures */
701     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
702         ts->addr[i] = htonl(inAddr->hostAddr[i]);
703
704     ubik_print("ubik: A Remote Server has addresses: ");
705     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
706         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
707     ubik_print("\n");
708
709     return 0;
710 }
711
712 static void
713 printServerInfo(void)
714 {
715     struct ubik_server *ts;
716     int i, j = 1;
717     char hoststr[16];
718
719     ubik_print("Local CellServDB:");
720     for (ts = ubik_servers; ts; ts = ts->next, j++) {
721         ubik_print("Server %d: ", j);
722         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
723             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
724     }
725     ubik_print("\n");
726 }
727
728 afs_int32
729 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
730                  struct ubik_version *oldversionp,
731                  struct ubik_version *newversionp)
732 {
733     afs_int32 code = 0;
734     struct ubik_dbase *dbase;
735
736     if ((code = ubik_CheckAuth(rxcall))) {
737         return (code);
738     }
739
740     if (!ubik_currentTrans) {
741         return USYNC;
742     }
743     /* sanity check to make sure only write trans appear here */
744     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
745         return UBADTYPE;
746     }
747
748     /* Should not get this for the sync site */
749     if (ubeacon_AmSyncSite()) {
750         return UDEADLOCK;
751     }
752
753     dbase = ubik_currentTrans->dbase;
754     DBHOLD(dbase);
755     urecovery_CheckTid(atid, 0);
756     if (!ubik_currentTrans) {
757         DBRELE(dbase);
758         return USYNC;
759     }
760
761     /* Set the label if its version matches the sync-site's */
762     if ((oldversionp->epoch == ubik_dbVersion.epoch)
763         && (oldversionp->counter == ubik_dbVersion.counter)) {
764         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
765         if (!code) {
766             ubik_dbase->version = *newversionp;
767             ubik_dbVersion = *newversionp;
768         }
769     } else {
770         code = USYNC;
771     }
772
773     DBRELE(dbase);
774     return code;
775 }