ubik: Set but not used variables
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13
14 #include <sys/types.h>
15 #include <string.h>
16 #include <stdarg.h>
17
18 #ifdef AFS_NT40_ENV
19 #include <winsock2.h>
20 #include <fcntl.h>
21 #else
22 #include <sys/file.h>
23 #include <netinet/in.h>
24 #endif
25
26 #include <lock.h>
27 #include <rx/xdr.h>
28 #include <rx/rx.h>
29 #include <errno.h>
30 #include <afs/afsutil.h>
31
32 #define UBIK_INTERNALS
33 #include "ubik.h"
34 #include "ubik_int.h"
35
36 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
37 void *ubik_CheckRXSecurityRock;
38
39 static void printServerInfo(void);
40
41 /*! \file
42  * routines for handling requests remotely-submitted by the sync site.  These are
43  * only write transactions (we don't propagate read trans), and there is at most one
44  * write transaction extant at any one time.
45  */
46
47 struct ubik_trans *ubik_currentTrans = 0;
48
49 int
50 ubik_CheckAuth(struct rx_call *acall)
51 {
52     afs_int32 code;
53     if (ubik_CheckRXSecurityProc) {
54         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
55         return code;
56     } else
57         return 0;
58 }
59
60 /* the rest of these guys handle remote execution of write
61  * transactions: this is the code executed on the other servers when a
62  * sync site is executing a write transaction.
63  */
64 afs_int32
65 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
66 {
67     afs_int32 code;
68
69     if ((code = ubik_CheckAuth(rxcall))) {
70         return code;
71     }
72     DBHOLD(ubik_dbase);
73     urecovery_CheckTid(atid, 1);
74     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
75     if (!code && ubik_currentTrans) {
76         /* label this trans with the right trans id */
77         ubik_currentTrans->tid.epoch = atid->epoch;
78         ubik_currentTrans->tid.counter = atid->counter;
79     }
80     DBRELE(ubik_dbase);
81     return code;
82 }
83
84
85 afs_int32
86 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
87 {
88     afs_int32 code;
89     struct ubik_dbase *dbase;
90
91     if ((code = ubik_CheckAuth(rxcall))) {
92         return code;
93     }
94
95     if (!ubik_currentTrans) {
96         return USYNC;
97     }
98     /*
99      * sanity check to make sure only write trans appear here
100      */
101     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
102         return UBADTYPE;
103     }
104
105     dbase = ubik_currentTrans->dbase;
106
107     ObtainWriteLock(&dbase->cache_lock);
108
109     DBHOLD(dbase);
110
111     urecovery_CheckTid(atid, 0);
112     if (!ubik_currentTrans) {
113         DBRELE(dbase);
114         ReleaseWriteLock(&dbase->cache_lock);
115         return USYNC;
116     }
117
118     code = udisk_commit(ubik_currentTrans);
119     if (code == 0) {
120         /* sync site should now match */
121         ubik_dbVersion = ubik_dbase->version;
122     }
123     DBRELE(dbase);
124     ReleaseWriteLock(&dbase->cache_lock);
125     return code;
126 }
127
128 afs_int32
129 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
130 {
131     struct ubik_dbase *dbase;
132     afs_int32 code;
133
134     if ((code = ubik_CheckAuth(rxcall))) {
135         return code;
136     }
137
138     if (!ubik_currentTrans) {
139         return USYNC;
140     }
141     /* sanity check to make sure only write trans appear here */
142     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
143         return UBADTYPE;
144     }
145
146     dbase = ubik_currentTrans->dbase;
147     DBHOLD(dbase);
148     urecovery_CheckTid(atid, 0);
149     if (!ubik_currentTrans) {
150         DBRELE(dbase);
151         return USYNC;
152     }
153
154     /* If the thread is not waiting for lock - ok to end it */
155 #if !defined(UBIK_PAUSE)
156     if (ubik_currentTrans->locktype != LOCKWAIT) {
157 #endif /* UBIK_PAUSE */
158         udisk_end(ubik_currentTrans);
159 #if !defined(UBIK_PAUSE)
160     }
161 #endif /* UBIK_PAUSE */
162     ubik_currentTrans = (struct ubik_trans *)0;
163     DBRELE(dbase);
164     return 0;
165 }
166
167 afs_int32
168 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
169 {
170     afs_int32 code;
171     struct ubik_dbase *dbase;
172
173     if ((code = ubik_CheckAuth(rxcall))) {
174         return code;
175     }
176
177     if (!ubik_currentTrans) {
178         return USYNC;
179     }
180     /* sanity check to make sure only write trans appear here  */
181     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
182         return UBADTYPE;
183     }
184
185     dbase = ubik_currentTrans->dbase;
186     DBHOLD(dbase);
187     urecovery_CheckTid(atid, 0);
188     if (!ubik_currentTrans) {
189         DBRELE(dbase);
190         return USYNC;
191     }
192
193     code = udisk_abort(ubik_currentTrans);
194     /* If the thread is not waiting for lock - ok to end it */
195 #if !defined(UBIK_PAUSE)
196     if (ubik_currentTrans->locktype != LOCKWAIT) {
197 #endif /* UBIK_PAUSE */
198         udisk_end(ubik_currentTrans);
199 #if !defined(UBIK_PAUSE)
200     }
201 #endif /* UBIK_PAUSE */
202     ubik_currentTrans = (struct ubik_trans *)0;
203     DBRELE(dbase);
204     return code;
205 }
206
207 /* apos and alen are not used */
208 afs_int32
209 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
210            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
211 {
212     afs_int32 code;
213     struct ubik_dbase *dbase;
214     struct ubik_trans *ubik_thisTrans;
215
216     if ((code = ubik_CheckAuth(rxcall))) {
217         return code;
218     }
219     if (!ubik_currentTrans) {
220         return USYNC;
221     }
222     /* sanity check to make sure only write trans appear here */
223     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
224         return UBADTYPE;
225     }
226     if (alen != 1) {
227         return UBADLOCK;
228     }
229     dbase = ubik_currentTrans->dbase;
230     DBHOLD(dbase);
231     urecovery_CheckTid(atid, 0);
232     if (!ubik_currentTrans) {
233         DBRELE(dbase);
234         return USYNC;
235     }
236
237     ubik_thisTrans = ubik_currentTrans;
238     code = ulock_getLock(ubik_currentTrans, atype, 1);
239
240     /* While waiting, the transaction may have been ended/
241      * aborted from under us (urecovery_CheckTid). In that
242      * case, end the transaction here.
243      */
244     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
245         udisk_end(ubik_thisTrans);
246         code = USYNC;
247     }
248
249     DBRELE(dbase);
250     return code;
251 }
252
253 /*!
254  * \brief Write a vector of data
255  */
256 afs_int32
257 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
258              iovec_wrt *io_vector, iovec_buf *io_buffer)
259 {
260     afs_int32 code, i, offset;
261     struct ubik_dbase *dbase;
262     struct ubik_iovec *iovec;
263     char *iobuf;
264
265     if ((code = ubik_CheckAuth(rxcall))) {
266         return code;
267     }
268     if (!ubik_currentTrans) {
269         return USYNC;
270     }
271     /* sanity check to make sure only write trans appear here */
272     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
273         return UBADTYPE;
274     }
275
276     dbase = ubik_currentTrans->dbase;
277     DBHOLD(dbase);
278     urecovery_CheckTid(atid, 0);
279     if (!ubik_currentTrans) {
280         DBRELE(dbase);
281         return USYNC;
282     }
283
284     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
285     iobuf = (char *)io_buffer->iovec_buf_val;
286     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
287         /* Sanity check for going off end of buffer */
288         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
289             code = UINTERNAL;
290         } else {
291             code =
292                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
293                             iovec[i].position, iovec[i].length);
294         }
295         if (code)
296             break;
297
298         offset += iovec[i].length;
299     }
300
301     DBRELE(dbase);
302     return code;
303 }
304
305 afs_int32
306 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
307             afs_int32 afile, afs_int32 apos, bulkdata *adata)
308 {
309     afs_int32 code;
310     struct ubik_dbase *dbase;
311
312     if ((code = ubik_CheckAuth(rxcall))) {
313         return code;
314     }
315     if (!ubik_currentTrans) {
316         return USYNC;
317     }
318     /* sanity check to make sure only write trans appear here */
319     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
320         return UBADTYPE;
321     }
322
323     dbase = ubik_currentTrans->dbase;
324     DBHOLD(dbase);
325     urecovery_CheckTid(atid, 0);
326     if (!ubik_currentTrans) {
327         DBRELE(dbase);
328         return USYNC;
329     }
330     code =
331         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
332                     adata->bulkdata_len);
333     DBRELE(dbase);
334     return code;
335 }
336
337 afs_int32
338 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
339                afs_int32 afile, afs_int32 alen)
340 {
341     afs_int32 code;
342     struct ubik_dbase *dbase;
343
344     if ((code = ubik_CheckAuth(rxcall))) {
345         return code;
346     }
347     if (!ubik_currentTrans) {
348         return USYNC;
349     }
350     /* sanity check to make sure only write trans appear here */
351     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
352         return UBADTYPE;
353     }
354
355     dbase = ubik_currentTrans->dbase;
356     DBHOLD(dbase);
357     urecovery_CheckTid(atid, 0);
358     if (!ubik_currentTrans) {
359         DBRELE(dbase);
360         return USYNC;
361     }
362     code = udisk_truncate(ubik_currentTrans, afile, alen);
363     DBRELE(dbase);
364     return code;
365 }
366
367 afs_int32
368 SDISK_GetVersion(struct rx_call *rxcall,
369                  struct ubik_version *aversion)
370 {
371     afs_int32 code;
372
373     if ((code = ubik_CheckAuth(rxcall))) {
374         return code;
375     }
376
377     /*
378      * If we are the sync site, recovery shouldn't be running on any
379      * other site. We shouldn't be getting this RPC as long as we are
380      * the sync site.  To prevent any unforseen activity, we should
381      * reject this RPC until we have recognized that we are not the
382      * sync site anymore, and/or if we have any pending WRITE
383      * transactions that have to complete. This way we can be assured
384      * that this RPC would not block any pending transactions that
385      * should either fail or pass. If we have recognized the fact that
386      * we are not the sync site any more, all write transactions would
387      * fail with UNOQUORUM anyway.
388      */
389     if (ubeacon_AmSyncSite()) {
390         return UDEADLOCK;
391     }
392
393     DBHOLD(ubik_dbase);
394     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
395     DBRELE(ubik_dbase);
396     if (code) {
397         /* tell other side there's no dbase */
398         aversion->epoch = 0;
399         aversion->counter = 0;
400     }
401     return 0;
402 }
403
404 afs_int32
405 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
406               struct ubik_version *version)
407 {
408     afs_int32 code;
409     struct ubik_dbase *dbase;
410     afs_int32 offset;
411     struct ubik_stat ubikstat;
412     char tbuffer[256];
413     afs_int32 tlen;
414     afs_int32 length;
415
416     if ((code = ubik_CheckAuth(rxcall))) {
417         return code;
418     }
419 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
420  * a sanity check, anyway.
421     if (ubeacon_AmSyncSite()) {
422       return UDEADLOCK;
423     }
424 */
425     dbase = ubik_dbase;
426     DBHOLD(dbase);
427     code = (*dbase->stat) (dbase, file, &ubikstat);
428     if (code < 0) {
429         DBRELE(dbase);
430         return code;
431     }
432     length = ubikstat.size;
433     tlen = htonl(length);
434     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
435     if (code != sizeof(afs_int32)) {
436         DBRELE(dbase);
437         ubik_dprint("Rx-write length error=%d\n", code);
438         return BULK_ERROR;
439     }
440     offset = 0;
441     while (length > 0) {
442         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
443         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
444         if (code != tlen) {
445             DBRELE(dbase);
446             ubik_dprint("read failed error=%d\n", code);
447             return UIOERROR;
448         }
449         code = rx_Write(rxcall, tbuffer, tlen);
450         if (code != tlen) {
451             DBRELE(dbase);
452             ubik_dprint("Rx-write length error=%d\n", code);
453             return BULK_ERROR;
454         }
455         length -= tlen;
456         offset += tlen;
457     }
458     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
459     DBRELE(dbase);
460     return code;
461 }
462
463 afs_int32
464 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
465                afs_int32 length, struct ubik_version *avers)
466 {
467     afs_int32 code;
468     struct ubik_dbase *dbase = NULL;
469     char tbuffer[1024];
470     afs_int32 offset;
471     struct ubik_version tversion;
472     int tlen;
473     struct rx_peer *tpeer;
474     struct rx_connection *tconn;
475     afs_uint32 otherHost = 0;
476     char hoststr[16];
477 #ifndef OLD_URECOVERY
478     char pbuffer[1028];
479     int fd = -1;
480     afs_int32 epoch = 0;
481     afs_int32 pass;
482 #endif
483
484     /* send the file back to the requester */
485
486     if ((code = ubik_CheckAuth(rxcall))) {
487         goto failed;
488     }
489
490     /* next, we do a sanity check to see if the guy sending us the database is
491      * the guy we think is the sync site.  It turns out that we might not have
492      * decided yet that someone's the sync site, but they could have enough
493      * votes from others to be sync site anyway, and could send us the database
494      * in advance of getting our votes.  This is fine, what we're really trying
495      * to check is that some authenticated bogon isn't sending a random database
496      * into another configuration.  This could happen on a bad configuration
497      * screwup.  Thus, we only object if we're sure we know who the sync site
498      * is, and it ain't the guy talking to us.
499      */
500     offset = uvote_GetSyncSite();
501     tconn = rx_ConnectionOf(rxcall);
502     tpeer = rx_PeerOf(tconn);
503     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
504     if (offset && offset != otherHost) {
505         /* we *know* this is the wrong guy */
506         code = USYNC;
507         goto failed;
508     }
509
510     dbase = ubik_dbase;
511     DBHOLD(dbase);
512
513     /* abort any active trans that may scribble over the database */
514     urecovery_AbortAll(dbase);
515
516     ubik_print("Ubik: Synchronize database with server %s\n",
517                afs_inet_ntoa_r(otherHost, hoststr));
518
519     offset = 0;
520 #ifdef OLD_URECOVERY
521     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
522     tversion.counter = 0;
523 #else
524     epoch =
525 #endif
526     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
527     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
528 #ifndef OLD_URECOVERY
529     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
530     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
531     if (fd < 0) {
532         code = errno;
533         goto failed;
534     }
535     code = lseek(fd, HDRSIZE, 0);
536     if (code != HDRSIZE) {
537         close(fd);
538         goto failed;
539     }
540     pass = 0;
541 #endif
542     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
543     while (length > 0) {
544         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
545 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
546         if (pass % 4 == 0)
547             IOMGR_Poll();
548 #endif
549         code = rx_Read(rxcall, tbuffer, tlen);
550         if (code != tlen) {
551             DBRELE(dbase);
552             ubik_dprint("Rx-read length error=%d\n", code);
553             code = BULK_ERROR;
554             close(fd);
555             goto failed;
556         }
557 #ifdef OLD_URECOVERY
558         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
559 #else
560         code = write(fd, tbuffer, tlen);
561         pass++;
562 #endif
563         if (code != tlen) {
564             DBRELE(dbase);
565             ubik_dprint("write failed error=%d\n", code);
566             code = UIOERROR;
567             close(fd);
568             goto failed;
569         }
570         offset += tlen;
571         length -= tlen;
572     }
573 #ifndef OLD_URECOVERY
574     code = close(fd);
575     if (code)
576         goto failed;
577 #endif
578
579     /* sync data first, then write label and resync (resync done by setlabel call).
580      * This way, good label is only on good database. */
581 #ifdef OLD_URECOVERY
582     (*ubik_dbase->sync) (dbase, file);
583 #else
584     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
585 #ifdef AFS_NT40_ENV
586     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
587     code = unlink(pbuffer);
588     if (!code)
589         code = rename(tbuffer, pbuffer);
590     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
591 #endif
592     if (!code)
593         code = rename(pbuffer, tbuffer);
594     if (!code) {
595         (*ubik_dbase->open) (ubik_dbase, file);
596 #endif
597         code = (*ubik_dbase->setlabel) (dbase, file, avers);
598 #ifndef OLD_URECOVERY
599     }
600 #ifdef AFS_NT40_ENV
601     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
602     unlink(pbuffer);
603 #endif
604 #endif
605     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
606     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
607 #ifdef AFS_PTHREAD_ENV
608     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
609 #else
610     LWP_NoYieldSignal(&dbase->version);
611 #endif
612     DBRELE(dbase);
613   failed:
614     if (code) {
615 #ifndef OLD_URECOVERY
616         unlink(pbuffer);
617         /* Failed to sync. Allow reads again for now. */
618         if (dbase != NULL) {
619             tversion.epoch = epoch;
620             (*dbase->setlabel) (dbase, file, &tversion);
621         }
622 #endif
623         ubik_print
624             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
625              afs_inet_ntoa_r(otherHost, hoststr), code);
626     } else {
627         ubik_print("Ubik: Synchronize database completed\n");
628     }
629     return code;
630 }
631
632
633 afs_int32
634 SDISK_Probe(struct rx_call *rxcall)
635 {
636     return 0;
637 }
638
639 /*!
640  * \brief Update remote machines addresses in my server list
641  *
642  * Send back my addresses to caller of this RPC
643  * \return zero on success, else 1.
644  */
645 afs_int32
646 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
647                           UbikInterfaceAddr *inAddr,
648                           UbikInterfaceAddr *outAddr)
649 {
650     struct ubik_server *ts, *tmp;
651     afs_uint32 remoteAddr;      /* in net byte order */
652     int i, j, found = 0, probableMatch = 0;
653     char hoststr[16];
654
655     /* copy the output parameters */
656     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
657         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
658
659     remoteAddr = htonl(inAddr->hostAddr[0]);
660     for (ts = ubik_servers; ts; ts = ts->next)
661         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
662             probableMatch = 1;
663             break;
664         }
665
666     if (probableMatch) {
667         /* verify that all addresses in the incoming RPC are
668          ** not part of other server entries in my CellServDB
669          */
670         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
671              && inAddr->hostAddr[i]; i++) {
672             remoteAddr = htonl(inAddr->hostAddr[i]);
673             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
674                 if (ts == tmp)  /* this is my server */
675                     continue;
676                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
677                      j++)
678                     if (remoteAddr == tmp->addr[j]) {
679                         found = 1;
680                         break;
681                     }
682             }
683         }
684     }
685
686     /* if (probableMatch) */
687     /* inconsistent addresses in CellServDB */
688     if (!probableMatch || found) {
689         ubik_print("Inconsistent Cell Info from server: ");
690         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
691             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
692         ubik_print("\n");
693         fflush(stdout);
694         fflush(stderr);
695         printServerInfo();
696         return UBADHOST;
697     }
698
699     /* update our data structures */
700     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
701         ts->addr[i] = htonl(inAddr->hostAddr[i]);
702
703     ubik_print("ubik: A Remote Server has addresses: ");
704     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
705         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
706     ubik_print("\n");
707
708     return 0;
709 }
710
711 static void
712 printServerInfo(void)
713 {
714     struct ubik_server *ts;
715     int i, j = 1;
716     char hoststr[16];
717
718     ubik_print("Local CellServDB:");
719     for (ts = ubik_servers; ts; ts = ts->next, j++) {
720         ubik_print("Server %d: ", j);
721         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
722             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
723     }
724     ubik_print("\n");
725 }
726
727 afs_int32
728 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
729                  struct ubik_version *oldversionp,
730                  struct ubik_version *newversionp)
731 {
732     afs_int32 code = 0;
733     struct ubik_dbase *dbase;
734
735     if ((code = ubik_CheckAuth(rxcall))) {
736         return (code);
737     }
738
739     if (!ubik_currentTrans) {
740         return USYNC;
741     }
742     /* sanity check to make sure only write trans appear here */
743     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
744         return UBADTYPE;
745     }
746
747     /* Should not get this for the sync site */
748     if (ubeacon_AmSyncSite()) {
749         return UDEADLOCK;
750     }
751
752     dbase = ubik_currentTrans->dbase;
753     DBHOLD(dbase);
754     urecovery_CheckTid(atid, 0);
755     if (!ubik_currentTrans) {
756         DBRELE(dbase);
757         return USYNC;
758     }
759
760     /* Set the label if its version matches the sync-site's */
761     if ((oldversionp->epoch == ubik_dbVersion.epoch)
762         && (oldversionp->counter == ubik_dbVersion.counter)) {
763         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
764         if (!code) {
765             ubik_dbase->version = *newversionp;
766             ubik_dbVersion = *newversionp;
767         }
768     } else {
769         code = USYNC;
770     }
771
772     DBRELE(dbase);
773     return code;
774 }