ubik-prototypes-20090315
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #include <string.h>
18 #include <stdarg.h>
19
20 #ifdef AFS_NT40_ENV
21 #include <winsock2.h>
22 #include <fcntl.h>
23 #else
24 #include <sys/file.h>
25 #include <netinet/in.h>
26 #endif
27
28 #include <lock.h>
29 #include <rx/xdr.h>
30 #include <rx/rx.h>
31 #include <errno.h>
32 #include <afs/afsutil.h>
33
34 #define UBIK_INTERNALS
35 #include "ubik.h"
36 #include "ubik_int.h"
37
38 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
39 void *ubik_CheckRXSecurityRock;
40
41 static void printServerInfo(void);
42
43 /*! \file
44  * routines for handling requests remotely-submitted by the sync site.  These are
45  * only write transactions (we don't propagate read trans), and there is at most one
46  * write transaction extant at any one time.
47  */
48
49 struct ubik_trans *ubik_currentTrans = 0;
50
51 int
52 ubik_CheckAuth(register struct rx_call *acall)
53 {
54     register afs_int32 code;
55     if (ubik_CheckRXSecurityProc) {
56         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
57         return code;
58     } else
59         return 0;
60 }
61
62 /* the rest of these guys handle remote execution of write
63  * transactions: this is the code executed on the other servers when a
64  * sync site is executing a write transaction.
65  */
66 afs_int32
67 SDISK_Begin(register struct rx_call *rxcall, struct ubik_tid *atid)
68 {
69     register afs_int32 code;
70
71     if ((code = ubik_CheckAuth(rxcall))) {
72         return code;
73     }
74     DBHOLD(ubik_dbase);
75     urecovery_CheckTid(atid);
76     if (ubik_currentTrans) {
77         /* If the thread is not waiting for lock - ok to end it */
78 #if !defined(UBIK_PAUSE)
79         if (ubik_currentTrans->locktype != LOCKWAIT) {
80 #endif /* UBIK_PAUSE */
81             udisk_end(ubik_currentTrans);
82 #if !defined(UBIK_PAUSE)
83         }
84 #endif /* UBIK_PAUSE */
85         ubik_currentTrans = (struct ubik_trans *)0;
86     }
87     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
88     if (!code && ubik_currentTrans) {
89         /* label this trans with the right trans id */
90         ubik_currentTrans->tid.epoch = atid->epoch;
91         ubik_currentTrans->tid.counter = atid->counter;
92     }
93     DBRELE(ubik_dbase);
94     return code;
95 }
96
97
98 afs_int32
99 SDISK_Commit(register struct rx_call *rxcall, struct ubik_tid *atid)
100 {
101     register afs_int32 code;
102     register struct ubik_dbase *dbase;
103
104     if ((code = ubik_CheckAuth(rxcall))) {
105         return code;
106     }
107
108     if (!ubik_currentTrans) {
109         return USYNC;
110     }
111     /*
112      * sanity check to make sure only write trans appear here
113      */
114     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
115         return UBADTYPE;
116     }
117
118     dbase = ubik_currentTrans->dbase;
119     DBHOLD(dbase);
120     urecovery_CheckTid(atid);
121     if (!ubik_currentTrans) {
122         DBRELE(dbase);
123         return USYNC;
124     }
125
126     code = udisk_commit(ubik_currentTrans);
127     if (code == 0) {
128         /* sync site should now match */
129         ubik_dbVersion = ubik_dbase->version;
130     }
131     DBRELE(dbase);
132     return code;
133 }
134
135 afs_int32
136 SDISK_ReleaseLocks(register struct rx_call *rxcall, struct ubik_tid *atid)
137 {
138     register struct ubik_dbase *dbase;
139     register afs_int32 code;
140
141     if ((code = ubik_CheckAuth(rxcall))) {
142         return code;
143     }
144
145     if (!ubik_currentTrans) {
146         return USYNC;
147     }
148     /* sanity check to make sure only write trans appear here */
149     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
150         return UBADTYPE;
151     }
152
153     dbase = ubik_currentTrans->dbase;
154     DBHOLD(dbase);
155     urecovery_CheckTid(atid);
156     if (!ubik_currentTrans) {
157         DBRELE(dbase);
158         return USYNC;
159     }
160
161     /* If the thread is not waiting for lock - ok to end it */
162 #if !defined(UBIK_PAUSE)
163     if (ubik_currentTrans->locktype != LOCKWAIT) {
164 #endif /* UBIK_PAUSE */
165         udisk_end(ubik_currentTrans);
166 #if !defined(UBIK_PAUSE)
167     }
168 #endif /* UBIK_PAUSE */
169     ubik_currentTrans = (struct ubik_trans *)0;
170     DBRELE(dbase);
171     return 0;
172 }
173
174 afs_int32
175 SDISK_Abort(register struct rx_call *rxcall, struct ubik_tid *atid)
176 {
177     register afs_int32 code;
178     register struct ubik_dbase *dbase;
179
180     if ((code = ubik_CheckAuth(rxcall))) {
181         return code;
182     }
183
184     if (!ubik_currentTrans) {
185         return USYNC;
186     }
187     /* sanity check to make sure only write trans appear here  */
188     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
189         return UBADTYPE;
190     }
191
192     dbase = ubik_currentTrans->dbase;
193     DBHOLD(dbase);
194     urecovery_CheckTid(atid);
195     if (!ubik_currentTrans) {
196         DBRELE(dbase);
197         return USYNC;
198     }
199
200     code = udisk_abort(ubik_currentTrans);
201     /* If the thread is not waiting for lock - ok to end it */
202 #if !defined(UBIK_PAUSE)
203     if (ubik_currentTrans->locktype != LOCKWAIT) {
204 #endif /* UBIK_PAUSE */
205         udisk_end(ubik_currentTrans);
206 #if !defined(UBIK_PAUSE)
207     }
208 #endif /* UBIK_PAUSE */
209     ubik_currentTrans = (struct ubik_trans *)0;
210     DBRELE(dbase);
211     return code;
212 }
213
214 /* apos and alen are not used */
215 afs_int32
216 SDISK_Lock(register struct rx_call *rxcall, struct ubik_tid *atid, 
217            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
218 {       
219     register afs_int32 code;
220     register struct ubik_dbase *dbase;
221     struct ubik_trans *ubik_thisTrans;
222
223     if ((code = ubik_CheckAuth(rxcall))) {
224         return code;
225     }
226     if (!ubik_currentTrans) {
227         return USYNC;
228     }
229     /* sanity check to make sure only write trans appear here */
230     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
231         return UBADTYPE;
232     }
233     if (alen != 1) {
234         return UBADLOCK;
235     }
236     dbase = ubik_currentTrans->dbase;
237     DBHOLD(dbase);
238     urecovery_CheckTid(atid);
239     if (!ubik_currentTrans) {
240         DBRELE(dbase);
241         return USYNC;
242     }
243
244     ubik_thisTrans = ubik_currentTrans;
245     code = ulock_getLock(ubik_currentTrans, atype, 1);
246
247     /* While waiting, the transaction may have been ended/
248      * aborted from under us (urecovery_CheckTid). In that
249      * case, end the transaction here.
250      */
251     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
252         udisk_end(ubik_thisTrans);
253         code = USYNC;
254     }
255
256     DBRELE(dbase);
257     return code;
258 }
259
260 /*!
261  * \brief Write a vector of data
262  */
263 afs_int32
264 SDISK_WriteV(register struct rx_call *rxcall, struct ubik_tid *atid, 
265              iovec_wrt *io_vector, iovec_buf *io_buffer)
266 {
267     afs_int32 code, i, offset;
268     struct ubik_dbase *dbase;
269     struct ubik_iovec *iovec;
270     char *iobuf;
271
272     if ((code = ubik_CheckAuth(rxcall))) {
273         return code;
274     }
275     if (!ubik_currentTrans) {
276         return USYNC;
277     }
278     /* sanity check to make sure only write trans appear here */
279     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
280         return UBADTYPE;
281     }
282
283     dbase = ubik_currentTrans->dbase;
284     DBHOLD(dbase);
285     urecovery_CheckTid(atid);
286     if (!ubik_currentTrans) {
287         DBRELE(dbase);
288         return USYNC;
289     }
290
291     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
292     iobuf = (char *)io_buffer->iovec_buf_val;
293     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
294         /* Sanity check for going off end of buffer */
295         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
296             code = UINTERNAL;
297         } else {
298             code =
299                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
300                             iovec[i].position, iovec[i].length);
301         }
302         if (code)
303             break;
304
305         offset += iovec[i].length;
306     }
307
308     DBRELE(dbase);
309     return code;
310 }
311
312 afs_int32
313 SDISK_Write(register struct rx_call *rxcall, struct ubik_tid *atid, 
314             afs_int32 afile, afs_int32 apos, register bulkdata *adata)
315 {
316     register afs_int32 code;
317     register struct ubik_dbase *dbase;
318
319     if ((code = ubik_CheckAuth(rxcall))) {
320         return code;
321     }
322     if (!ubik_currentTrans) {
323         return USYNC;
324     }
325     /* sanity check to make sure only write trans appear here */
326     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
327         return UBADTYPE;
328     }
329
330     dbase = ubik_currentTrans->dbase;
331     DBHOLD(dbase);
332     urecovery_CheckTid(atid);
333     if (!ubik_currentTrans) {
334         DBRELE(dbase);
335         return USYNC;
336     }
337     code =
338         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
339                     adata->bulkdata_len);
340     DBRELE(dbase);
341     return code;
342 }
343
344 afs_int32
345 SDISK_Truncate(register struct rx_call *rxcall, struct ubik_tid *atid, 
346                afs_int32 afile, afs_int32 alen)
347 {
348     register afs_int32 code;
349     register struct ubik_dbase *dbase;
350
351     if ((code = ubik_CheckAuth(rxcall))) {
352         return code;
353     }
354     if (!ubik_currentTrans) {
355         return USYNC;
356     }
357     /* sanity check to make sure only write trans appear here */
358     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
359         return UBADTYPE;
360     }
361
362     dbase = ubik_currentTrans->dbase;
363     DBHOLD(dbase);
364     urecovery_CheckTid(atid);
365     if (!ubik_currentTrans) {
366         DBRELE(dbase);
367         return USYNC;
368     }
369     code = udisk_truncate(ubik_currentTrans, afile, alen);
370     DBRELE(dbase);
371     return code;
372 }
373
374 afs_int32
375 SDISK_GetVersion(register struct rx_call *rxcall, 
376                  register struct ubik_version *aversion)
377 {
378     register afs_int32 code;
379
380     if ((code = ubik_CheckAuth(rxcall))) {
381         return code;
382     }
383
384     /*
385      * If we are the sync site, recovery shouldn't be running on any
386      * other site. We shouldn't be getting this RPC as long as we are
387      * the sync site.  To prevent any unforseen activity, we should
388      * reject this RPC until we have recognized that we are not the
389      * sync site anymore, and/or if we have any pending WRITE
390      * transactions that have to complete. This way we can be assured
391      * that this RPC would not block any pending transactions that
392      * should either fail or pass. If we have recognized the fact that
393      * we are not the sync site any more, all write transactions would
394      * fail with UNOQUORUM anyway.
395      */
396     if (ubeacon_AmSyncSite()) {
397         return UDEADLOCK;
398     }
399
400     DBHOLD(ubik_dbase);
401     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
402     DBRELE(ubik_dbase);
403     if (code) {
404         /* tell other side there's no dbase */
405         aversion->epoch = 0;
406         aversion->counter = 0;
407     }
408     return 0;
409 }
410
411 afs_int32
412 SDISK_GetFile(register struct rx_call *rxcall, register afs_int32 file, 
413               struct ubik_version *version)
414 {
415     register afs_int32 code;
416     register struct ubik_dbase *dbase;
417     register afs_int32 offset;
418     struct ubik_stat ubikstat;
419     char tbuffer[256];
420     afs_int32 tlen;
421     afs_int32 length;
422
423     if ((code = ubik_CheckAuth(rxcall))) {
424         return code;
425     }
426 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
427  * a sanity check, anyway. 
428     if (ubeacon_AmSyncSite()) {
429       return UDEADLOCK;
430     }
431 */
432     dbase = ubik_dbase;
433     DBHOLD(dbase);
434     code = (*dbase->stat) (dbase, file, &ubikstat);
435     if (code < 0) {
436         DBRELE(dbase);
437         return code;
438     }
439     length = ubikstat.size;
440     tlen = htonl(length);
441     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
442     if (code != sizeof(afs_int32)) {
443         DBRELE(dbase);
444         ubik_dprint("Rx-write length error=%d\n", code);
445         return BULK_ERROR;
446     }
447     offset = 0;
448     while (length > 0) {
449         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
450         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
451         if (code != tlen) {
452             DBRELE(dbase);
453             ubik_dprint("read failed error=%d\n", code);
454             return UIOERROR;
455         }
456         code = rx_Write(rxcall, tbuffer, tlen);
457         if (code != tlen) {
458             DBRELE(dbase);
459             ubik_dprint("Rx-write length error=%d\n", code);
460             return BULK_ERROR;
461         }
462         length -= tlen;
463         offset += tlen;
464     }
465     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
466     DBRELE(dbase);
467     return code;
468 }
469
470 afs_int32
471 SDISK_SendFile(register struct rx_call *rxcall, afs_int32 file, 
472                afs_int32 length, struct ubik_version *avers)
473 {
474     register afs_int32 code;
475     struct ubik_dbase *dbase = NULL;
476     char tbuffer[1024];
477     afs_int32 offset;
478     struct ubik_version tversion;
479     register int tlen;
480     struct rx_peer *tpeer;
481     struct rx_connection *tconn;
482     afs_uint32 otherHost = 0;
483 #ifndef OLD_URECOVERY
484     char pbuffer[1028];
485     int flen, fd = -1;
486     afs_int32 epoch = 0;
487     afs_int32 pass;
488 #endif
489
490     /* send the file back to the requester */
491
492     if ((code = ubik_CheckAuth(rxcall))) {
493         goto failed;
494     }
495
496     /* next, we do a sanity check to see if the guy sending us the database is
497      * the guy we think is the sync site.  It turns out that we might not have
498      * decided yet that someone's the sync site, but they could have enough
499      * votes from others to be sync site anyway, and could send us the database
500      * in advance of getting our votes.  This is fine, what we're really trying
501      * to check is that some authenticated bogon isn't sending a random database
502      * into another configuration.  This could happen on a bad configuration
503      * screwup.  Thus, we only object if we're sure we know who the sync site
504      * is, and it ain't the guy talking to us.
505      */
506     offset = uvote_GetSyncSite();
507     tconn = rx_ConnectionOf(rxcall);
508     tpeer = rx_PeerOf(tconn);
509     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
510     if (offset && offset != otherHost) {
511         /* we *know* this is the wrong guy */
512         code = USYNC;
513         goto failed;
514     }
515
516     dbase = ubik_dbase;
517     DBHOLD(dbase);
518
519     /* abort any active trans that may scribble over the database */
520     urecovery_AbortAll(dbase);
521
522     ubik_print("Ubik: Synchronize database with server %s\n",
523                afs_inet_ntoa(otherHost));
524
525     offset = 0;
526 #ifdef OLD_URECOVERY
527     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
528     tversion.counter = 0;
529 #else
530     epoch =
531 #endif
532     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
533     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
534 #ifndef OLD_URECOVERY
535     flen = length;
536     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
537     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
538     if (fd < 0) {
539         code = errno;
540         goto failed;
541     }
542     code = lseek(fd, HDRSIZE, 0);
543     if (code != HDRSIZE) {
544         close(fd);
545         goto failed;
546     }
547     pass = 0;
548 #endif
549     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
550     while (length > 0) {
551         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
552 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
553         if (pass % 4 == 0)
554             IOMGR_Poll();
555 #endif
556         code = rx_Read(rxcall, tbuffer, tlen);
557         if (code != tlen) {
558             DBRELE(dbase);
559             ubik_dprint("Rx-read length error=%d\n", code);
560             code = BULK_ERROR;
561             close(fd);
562             goto failed;
563         }
564 #ifdef OLD_URECOVERY
565         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
566 #else
567         code = write(fd, tbuffer, tlen);
568         pass++;
569 #endif
570         if (code != tlen) {
571             DBRELE(dbase);
572             ubik_dprint("write failed error=%d\n", code);
573             code = UIOERROR;
574             close(fd);
575             goto failed;
576         }
577         offset += tlen;
578         length -= tlen;
579     }
580 #ifndef OLD_URECOVERY
581     code = close(fd);
582     if (code)
583         goto failed;
584 #endif     
585
586     /* sync data first, then write label and resync (resync done by setlabel call).
587      * This way, good label is only on good database. */
588 #ifdef OLD_URECOVERY
589     (*ubik_dbase->sync) (dbase, file);
590 #else
591     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
592 #ifdef AFS_NT40_ENV
593     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
594     code = unlink(pbuffer);
595     if (!code)
596         code = rename(tbuffer, pbuffer);
597     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
598 #endif
599     if (!code) 
600         code = rename(pbuffer, tbuffer);
601     if (!code) {
602         (*ubik_dbase->open) (ubik_dbase, 0);
603 #endif
604         code = (*ubik_dbase->setlabel) (dbase, file, avers);
605 #ifndef OLD_URECOVERY
606     }
607 #ifdef AFS_NT40_ENV
608     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
609     unlink(pbuffer);
610 #endif
611 #endif
612     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
613     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
614 #ifdef AFS_PTHREAD_ENV
615     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
616 #else
617     LWP_NoYieldSignal(&dbase->version);
618 #endif
619     DBRELE(dbase);
620   failed:
621     if (code) {
622 #ifndef OLD_URECOVERY
623         unlink(pbuffer);
624         /* Failed to sync. Allow reads again for now. */
625         if (dbase != NULL) {
626             tversion.epoch = epoch;
627             (*dbase->setlabel) (dbase, file, &tversion);
628         }
629 #endif
630         ubik_print
631             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
632              afs_inet_ntoa(otherHost), code);
633     } else {
634         ubik_print("Ubik: Synchronize database completed\n");
635     }
636     return code;
637 }
638
639
640 afs_int32
641 SDISK_Probe(register struct rx_call *rxcall)
642 {
643     return 0;
644 }
645
646 /*!
647  * \brief Update remote machines addresses in my server list
648  *
649  * Send back my addresses to caller of this RPC
650  * \return zero on success, else 1.
651  */
652 afs_int32
653 SDISK_UpdateInterfaceAddr(register struct rx_call *rxcall, 
654                           UbikInterfaceAddr *inAddr, 
655                           UbikInterfaceAddr *outAddr)
656 {
657     struct ubik_server *ts, *tmp;
658     afs_uint32 remoteAddr;      /* in net byte order */
659     int i, j, found = 0, probableMatch = 0;
660
661     /* copy the output parameters */
662     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
663         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
664
665     remoteAddr = htonl(inAddr->hostAddr[0]);
666     for (ts = ubik_servers; ts; ts = ts->next)
667         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
668             probableMatch = 1;
669             break;
670         }
671
672     if (probableMatch) {
673         /* verify that all addresses in the incoming RPC are
674          ** not part of other server entries in my CellServDB
675          */
676         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
677              && inAddr->hostAddr[i]; i++) {
678             remoteAddr = htonl(inAddr->hostAddr[i]);
679             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
680                 if (ts == tmp)  /* this is my server */
681                     continue;
682                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
683                      j++)
684                     if (remoteAddr == tmp->addr[j]) {
685                         found = 1;
686                         break;
687                     }
688             }
689         }
690     }
691
692     /* if (probableMatch) */
693     /* inconsistent addresses in CellServDB */
694     if (!probableMatch || found) {
695         ubik_print("Inconsistent Cell Info from server: ");
696         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
697             ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
698         ubik_print("\n");
699         fflush(stdout);
700         fflush(stderr);
701         printServerInfo();
702         return UBADHOST;
703     }
704
705     /* update our data structures */
706     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
707         ts->addr[i] = htonl(inAddr->hostAddr[i]);
708
709     ubik_print("ubik: A Remote Server has addresses: ");
710     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
711         ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
712     ubik_print("\n");
713
714     return 0;
715 }
716
717 static void
718 printServerInfo(void)
719 {
720     struct ubik_server *ts;
721     int i, j = 1;
722
723     ubik_print("Local CellServDB:");
724     for (ts = ubik_servers; ts; ts = ts->next, j++) {
725         ubik_print("Server %d: ", j);
726         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
727             ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
728     }
729     ubik_print("\n");
730 }
731
732 afs_int32
733 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid, 
734                  struct ubik_version *oldversionp, 
735                  struct ubik_version *newversionp)
736 {
737     afs_int32 code = 0;
738     struct ubik_dbase *dbase;
739
740     if ((code = ubik_CheckAuth(rxcall))) {
741         return (code);
742     }
743
744     if (!ubik_currentTrans) {
745         return USYNC;
746     }
747     /* sanity check to make sure only write trans appear here */
748     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
749         return UBADTYPE;
750     }
751
752     /* Should not get this for the sync site */
753     if (ubeacon_AmSyncSite()) {
754         return UDEADLOCK;
755     }
756
757     dbase = ubik_currentTrans->dbase;
758     DBHOLD(dbase);
759     urecovery_CheckTid(atid);
760     if (!ubik_currentTrans) {
761         DBRELE(dbase);
762         return USYNC;
763     }
764
765     /* Set the label if its version matches the sync-site's */
766     if ((oldversionp->epoch == ubik_dbVersion.epoch)
767         && (oldversionp->counter == ubik_dbVersion.counter)) {
768         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
769         if (!code) {
770             ubik_dbase->version = *newversionp;
771             ubik_dbVersion = *newversionp;
772         }
773     } else {
774         code = USYNC;
775     }
776
777     DBRELE(dbase);
778     return code;
779 }