ubik: Remove api for reading during write locks
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13
14 #include <sys/types.h>
15 #include <string.h>
16 #include <stdarg.h>
17
18 #ifdef AFS_NT40_ENV
19 #include <winsock2.h>
20 #include <fcntl.h>
21 #else
22 #include <sys/file.h>
23 #include <netinet/in.h>
24 #endif
25
26 #include <lock.h>
27 #include <rx/xdr.h>
28 #include <rx/rx.h>
29 #include <errno.h>
30 #include <afs/afsutil.h>
31
32 #define UBIK_INTERNALS
33 #include "ubik.h"
34 #include "ubik_int.h"
35
36 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
37 void *ubik_CheckRXSecurityRock;
38
39 static void printServerInfo(void);
40
41 /*! \file
42  * routines for handling requests remotely-submitted by the sync site.  These are
43  * only write transactions (we don't propagate read trans), and there is at most one
44  * write transaction extant at any one time.
45  */
46
47 struct ubik_trans *ubik_currentTrans = 0;
48
49 int
50 ubik_CheckAuth(register struct rx_call *acall)
51 {
52     register afs_int32 code;
53     if (ubik_CheckRXSecurityProc) {
54         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
55         return code;
56     } else
57         return 0;
58 }
59
60 /* the rest of these guys handle remote execution of write
61  * transactions: this is the code executed on the other servers when a
62  * sync site is executing a write transaction.
63  */
64 afs_int32
65 SDISK_Begin(register struct rx_call *rxcall, struct ubik_tid *atid)
66 {
67     register afs_int32 code;
68
69     if ((code = ubik_CheckAuth(rxcall))) {
70         return code;
71     }
72     DBHOLD(ubik_dbase);
73     urecovery_CheckTid(atid);
74     if (ubik_currentTrans) {
75         /* If the thread is not waiting for lock - ok to end it */
76 #if !defined(UBIK_PAUSE)
77         if (ubik_currentTrans->locktype != LOCKWAIT) {
78 #endif /* UBIK_PAUSE */
79             udisk_end(ubik_currentTrans);
80 #if !defined(UBIK_PAUSE)
81         }
82 #endif /* UBIK_PAUSE */
83         ubik_currentTrans = (struct ubik_trans *)0;
84     }
85     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
86     if (!code && ubik_currentTrans) {
87         /* label this trans with the right trans id */
88         ubik_currentTrans->tid.epoch = atid->epoch;
89         ubik_currentTrans->tid.counter = atid->counter;
90     }
91     DBRELE(ubik_dbase);
92     return code;
93 }
94
95
96 afs_int32
97 SDISK_Commit(register struct rx_call *rxcall, struct ubik_tid *atid)
98 {
99     register afs_int32 code;
100     register struct ubik_dbase *dbase;
101
102     if ((code = ubik_CheckAuth(rxcall))) {
103         return code;
104     }
105
106     if (!ubik_currentTrans) {
107         return USYNC;
108     }
109     /*
110      * sanity check to make sure only write trans appear here
111      */
112     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
113         return UBADTYPE;
114     }
115
116     dbase = ubik_currentTrans->dbase;
117     DBHOLD(dbase);
118     urecovery_CheckTid(atid);
119     if (!ubik_currentTrans) {
120         DBRELE(dbase);
121         return USYNC;
122     }
123
124     code = udisk_commit(ubik_currentTrans);
125     if (code == 0) {
126         /* sync site should now match */
127         ubik_dbVersion = ubik_dbase->version;
128     }
129     DBRELE(dbase);
130     return code;
131 }
132
133 afs_int32
134 SDISK_ReleaseLocks(register struct rx_call *rxcall, struct ubik_tid *atid)
135 {
136     register struct ubik_dbase *dbase;
137     register afs_int32 code;
138
139     if ((code = ubik_CheckAuth(rxcall))) {
140         return code;
141     }
142
143     if (!ubik_currentTrans) {
144         return USYNC;
145     }
146     /* sanity check to make sure only write trans appear here */
147     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
148         return UBADTYPE;
149     }
150
151     dbase = ubik_currentTrans->dbase;
152     DBHOLD(dbase);
153     urecovery_CheckTid(atid);
154     if (!ubik_currentTrans) {
155         DBRELE(dbase);
156         return USYNC;
157     }
158
159     /* If the thread is not waiting for lock - ok to end it */
160 #if !defined(UBIK_PAUSE)
161     if (ubik_currentTrans->locktype != LOCKWAIT) {
162 #endif /* UBIK_PAUSE */
163         udisk_end(ubik_currentTrans);
164 #if !defined(UBIK_PAUSE)
165     }
166 #endif /* UBIK_PAUSE */
167     ubik_currentTrans = (struct ubik_trans *)0;
168     DBRELE(dbase);
169     return 0;
170 }
171
172 afs_int32
173 SDISK_Abort(register struct rx_call *rxcall, struct ubik_tid *atid)
174 {
175     register afs_int32 code;
176     register struct ubik_dbase *dbase;
177
178     if ((code = ubik_CheckAuth(rxcall))) {
179         return code;
180     }
181
182     if (!ubik_currentTrans) {
183         return USYNC;
184     }
185     /* sanity check to make sure only write trans appear here  */
186     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
187         return UBADTYPE;
188     }
189
190     dbase = ubik_currentTrans->dbase;
191     DBHOLD(dbase);
192     urecovery_CheckTid(atid);
193     if (!ubik_currentTrans) {
194         DBRELE(dbase);
195         return USYNC;
196     }
197
198     code = udisk_abort(ubik_currentTrans);
199     /* If the thread is not waiting for lock - ok to end it */
200 #if !defined(UBIK_PAUSE)
201     if (ubik_currentTrans->locktype != LOCKWAIT) {
202 #endif /* UBIK_PAUSE */
203         udisk_end(ubik_currentTrans);
204 #if !defined(UBIK_PAUSE)
205     }
206 #endif /* UBIK_PAUSE */
207     ubik_currentTrans = (struct ubik_trans *)0;
208     DBRELE(dbase);
209     return code;
210 }
211
212 /* apos and alen are not used */
213 afs_int32
214 SDISK_Lock(register struct rx_call *rxcall, struct ubik_tid *atid, 
215            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
216 {       
217     register afs_int32 code;
218     register struct ubik_dbase *dbase;
219     struct ubik_trans *ubik_thisTrans;
220
221     if ((code = ubik_CheckAuth(rxcall))) {
222         return code;
223     }
224     if (!ubik_currentTrans) {
225         return USYNC;
226     }
227     /* sanity check to make sure only write trans appear here */
228     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
229         return UBADTYPE;
230     }
231     if (alen != 1) {
232         return UBADLOCK;
233     }
234     dbase = ubik_currentTrans->dbase;
235     DBHOLD(dbase);
236     urecovery_CheckTid(atid);
237     if (!ubik_currentTrans) {
238         DBRELE(dbase);
239         return USYNC;
240     }
241
242     ubik_thisTrans = ubik_currentTrans;
243     code = ulock_getLock(ubik_currentTrans, atype, 1);
244
245     /* While waiting, the transaction may have been ended/
246      * aborted from under us (urecovery_CheckTid). In that
247      * case, end the transaction here.
248      */
249     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
250         udisk_end(ubik_thisTrans);
251         code = USYNC;
252     }
253
254     DBRELE(dbase);
255     return code;
256 }
257
258 /*!
259  * \brief Write a vector of data
260  */
261 afs_int32
262 SDISK_WriteV(register struct rx_call *rxcall, struct ubik_tid *atid, 
263              iovec_wrt *io_vector, iovec_buf *io_buffer)
264 {
265     afs_int32 code, i, offset;
266     struct ubik_dbase *dbase;
267     struct ubik_iovec *iovec;
268     char *iobuf;
269
270     if ((code = ubik_CheckAuth(rxcall))) {
271         return code;
272     }
273     if (!ubik_currentTrans) {
274         return USYNC;
275     }
276     /* sanity check to make sure only write trans appear here */
277     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
278         return UBADTYPE;
279     }
280
281     dbase = ubik_currentTrans->dbase;
282     DBHOLD(dbase);
283     urecovery_CheckTid(atid);
284     if (!ubik_currentTrans) {
285         DBRELE(dbase);
286         return USYNC;
287     }
288
289     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
290     iobuf = (char *)io_buffer->iovec_buf_val;
291     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
292         /* Sanity check for going off end of buffer */
293         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
294             code = UINTERNAL;
295         } else {
296             code =
297                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
298                             iovec[i].position, iovec[i].length);
299         }
300         if (code)
301             break;
302
303         offset += iovec[i].length;
304     }
305
306     DBRELE(dbase);
307     return code;
308 }
309
310 afs_int32
311 SDISK_Write(register struct rx_call *rxcall, struct ubik_tid *atid, 
312             afs_int32 afile, afs_int32 apos, register bulkdata *adata)
313 {
314     register afs_int32 code;
315     register struct ubik_dbase *dbase;
316
317     if ((code = ubik_CheckAuth(rxcall))) {
318         return code;
319     }
320     if (!ubik_currentTrans) {
321         return USYNC;
322     }
323     /* sanity check to make sure only write trans appear here */
324     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
325         return UBADTYPE;
326     }
327
328     dbase = ubik_currentTrans->dbase;
329     DBHOLD(dbase);
330     urecovery_CheckTid(atid);
331     if (!ubik_currentTrans) {
332         DBRELE(dbase);
333         return USYNC;
334     }
335     code =
336         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
337                     adata->bulkdata_len);
338     DBRELE(dbase);
339     return code;
340 }
341
342 afs_int32
343 SDISK_Truncate(register struct rx_call *rxcall, struct ubik_tid *atid, 
344                afs_int32 afile, afs_int32 alen)
345 {
346     register afs_int32 code;
347     register struct ubik_dbase *dbase;
348
349     if ((code = ubik_CheckAuth(rxcall))) {
350         return code;
351     }
352     if (!ubik_currentTrans) {
353         return USYNC;
354     }
355     /* sanity check to make sure only write trans appear here */
356     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
357         return UBADTYPE;
358     }
359
360     dbase = ubik_currentTrans->dbase;
361     DBHOLD(dbase);
362     urecovery_CheckTid(atid);
363     if (!ubik_currentTrans) {
364         DBRELE(dbase);
365         return USYNC;
366     }
367     code = udisk_truncate(ubik_currentTrans, afile, alen);
368     DBRELE(dbase);
369     return code;
370 }
371
372 afs_int32
373 SDISK_GetVersion(register struct rx_call *rxcall, 
374                  register struct ubik_version *aversion)
375 {
376     register afs_int32 code;
377
378     if ((code = ubik_CheckAuth(rxcall))) {
379         return code;
380     }
381
382     /*
383      * If we are the sync site, recovery shouldn't be running on any
384      * other site. We shouldn't be getting this RPC as long as we are
385      * the sync site.  To prevent any unforseen activity, we should
386      * reject this RPC until we have recognized that we are not the
387      * sync site anymore, and/or if we have any pending WRITE
388      * transactions that have to complete. This way we can be assured
389      * that this RPC would not block any pending transactions that
390      * should either fail or pass. If we have recognized the fact that
391      * we are not the sync site any more, all write transactions would
392      * fail with UNOQUORUM anyway.
393      */
394     if (ubeacon_AmSyncSite()) {
395         return UDEADLOCK;
396     }
397
398     DBHOLD(ubik_dbase);
399     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
400     DBRELE(ubik_dbase);
401     if (code) {
402         /* tell other side there's no dbase */
403         aversion->epoch = 0;
404         aversion->counter = 0;
405     }
406     return 0;
407 }
408
409 afs_int32
410 SDISK_GetFile(register struct rx_call *rxcall, register afs_int32 file, 
411               struct ubik_version *version)
412 {
413     register afs_int32 code;
414     register struct ubik_dbase *dbase;
415     register afs_int32 offset;
416     struct ubik_stat ubikstat;
417     char tbuffer[256];
418     afs_int32 tlen;
419     afs_int32 length;
420
421     if ((code = ubik_CheckAuth(rxcall))) {
422         return code;
423     }
424 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
425  * a sanity check, anyway. 
426     if (ubeacon_AmSyncSite()) {
427       return UDEADLOCK;
428     }
429 */
430     dbase = ubik_dbase;
431     DBHOLD(dbase);
432     code = (*dbase->stat) (dbase, file, &ubikstat);
433     if (code < 0) {
434         DBRELE(dbase);
435         return code;
436     }
437     length = ubikstat.size;
438     tlen = htonl(length);
439     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
440     if (code != sizeof(afs_int32)) {
441         DBRELE(dbase);
442         ubik_dprint("Rx-write length error=%d\n", code);
443         return BULK_ERROR;
444     }
445     offset = 0;
446     while (length > 0) {
447         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
448         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
449         if (code != tlen) {
450             DBRELE(dbase);
451             ubik_dprint("read failed error=%d\n", code);
452             return UIOERROR;
453         }
454         code = rx_Write(rxcall, tbuffer, tlen);
455         if (code != tlen) {
456             DBRELE(dbase);
457             ubik_dprint("Rx-write length error=%d\n", code);
458             return BULK_ERROR;
459         }
460         length -= tlen;
461         offset += tlen;
462     }
463     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
464     DBRELE(dbase);
465     return code;
466 }
467
468 afs_int32
469 SDISK_SendFile(register struct rx_call *rxcall, afs_int32 file, 
470                afs_int32 length, struct ubik_version *avers)
471 {
472     register afs_int32 code;
473     struct ubik_dbase *dbase = NULL;
474     char tbuffer[1024];
475     afs_int32 offset;
476     struct ubik_version tversion;
477     register int tlen;
478     struct rx_peer *tpeer;
479     struct rx_connection *tconn;
480     afs_uint32 otherHost = 0;
481     char hoststr[16];
482 #ifndef OLD_URECOVERY
483     char pbuffer[1028];
484     int flen, fd = -1;
485     afs_int32 epoch = 0;
486     afs_int32 pass;
487 #endif
488
489     /* send the file back to the requester */
490
491     if ((code = ubik_CheckAuth(rxcall))) {
492         goto failed;
493     }
494
495     /* next, we do a sanity check to see if the guy sending us the database is
496      * the guy we think is the sync site.  It turns out that we might not have
497      * decided yet that someone's the sync site, but they could have enough
498      * votes from others to be sync site anyway, and could send us the database
499      * in advance of getting our votes.  This is fine, what we're really trying
500      * to check is that some authenticated bogon isn't sending a random database
501      * into another configuration.  This could happen on a bad configuration
502      * screwup.  Thus, we only object if we're sure we know who the sync site
503      * is, and it ain't the guy talking to us.
504      */
505     offset = uvote_GetSyncSite();
506     tconn = rx_ConnectionOf(rxcall);
507     tpeer = rx_PeerOf(tconn);
508     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
509     if (offset && offset != otherHost) {
510         /* we *know* this is the wrong guy */
511         code = USYNC;
512         goto failed;
513     }
514
515     dbase = ubik_dbase;
516     DBHOLD(dbase);
517
518     /* abort any active trans that may scribble over the database */
519     urecovery_AbortAll(dbase);
520
521     ubik_print("Ubik: Synchronize database with server %s\n",
522                afs_inet_ntoa_r(otherHost, hoststr));
523
524     offset = 0;
525 #ifdef OLD_URECOVERY
526     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
527     tversion.counter = 0;
528 #else
529     epoch =
530 #endif
531     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
532     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
533 #ifndef OLD_URECOVERY
534     flen = length;
535     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
536     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
537     if (fd < 0) {
538         code = errno;
539         goto failed;
540     }
541     code = lseek(fd, HDRSIZE, 0);
542     if (code != HDRSIZE) {
543         close(fd);
544         goto failed;
545     }
546     pass = 0;
547 #endif
548     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
549     while (length > 0) {
550         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
551 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
552         if (pass % 4 == 0)
553             IOMGR_Poll();
554 #endif
555         code = rx_Read(rxcall, tbuffer, tlen);
556         if (code != tlen) {
557             DBRELE(dbase);
558             ubik_dprint("Rx-read length error=%d\n", code);
559             code = BULK_ERROR;
560             close(fd);
561             goto failed;
562         }
563 #ifdef OLD_URECOVERY
564         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
565 #else
566         code = write(fd, tbuffer, tlen);
567         pass++;
568 #endif
569         if (code != tlen) {
570             DBRELE(dbase);
571             ubik_dprint("write failed error=%d\n", code);
572             code = UIOERROR;
573             close(fd);
574             goto failed;
575         }
576         offset += tlen;
577         length -= tlen;
578     }
579 #ifndef OLD_URECOVERY
580     code = close(fd);
581     if (code)
582         goto failed;
583 #endif     
584
585     /* sync data first, then write label and resync (resync done by setlabel call).
586      * This way, good label is only on good database. */
587 #ifdef OLD_URECOVERY
588     (*ubik_dbase->sync) (dbase, file);
589 #else
590     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
591 #ifdef AFS_NT40_ENV
592     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
593     code = unlink(pbuffer);
594     if (!code)
595         code = rename(tbuffer, pbuffer);
596     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
597 #endif
598     if (!code) 
599         code = rename(pbuffer, tbuffer);
600     if (!code) {
601         (*ubik_dbase->open) (ubik_dbase, 0);
602 #endif
603         code = (*ubik_dbase->setlabel) (dbase, file, avers);
604 #ifndef OLD_URECOVERY
605     }
606 #ifdef AFS_NT40_ENV
607     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
608     unlink(pbuffer);
609 #endif
610 #endif
611     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
612     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
613 #ifdef AFS_PTHREAD_ENV
614     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
615 #else
616     LWP_NoYieldSignal(&dbase->version);
617 #endif
618     DBRELE(dbase);
619   failed:
620     if (code) {
621 #ifndef OLD_URECOVERY
622         unlink(pbuffer);
623         /* Failed to sync. Allow reads again for now. */
624         if (dbase != NULL) {
625             tversion.epoch = epoch;
626             (*dbase->setlabel) (dbase, file, &tversion);
627         }
628 #endif
629         ubik_print
630             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
631              afs_inet_ntoa_r(otherHost, hoststr), code);
632     } else {
633         ubik_print("Ubik: Synchronize database completed\n");
634     }
635     return code;
636 }
637
638
639 afs_int32
640 SDISK_Probe(register struct rx_call *rxcall)
641 {
642     return 0;
643 }
644
645 /*!
646  * \brief Update remote machines addresses in my server list
647  *
648  * Send back my addresses to caller of this RPC
649  * \return zero on success, else 1.
650  */
651 afs_int32
652 SDISK_UpdateInterfaceAddr(register struct rx_call *rxcall, 
653                           UbikInterfaceAddr *inAddr, 
654                           UbikInterfaceAddr *outAddr)
655 {
656     struct ubik_server *ts, *tmp;
657     afs_uint32 remoteAddr;      /* in net byte order */
658     int i, j, found = 0, probableMatch = 0;
659     char hoststr[16];
660
661     /* copy the output parameters */
662     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
663         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
664
665     remoteAddr = htonl(inAddr->hostAddr[0]);
666     for (ts = ubik_servers; ts; ts = ts->next)
667         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
668             probableMatch = 1;
669             break;
670         }
671
672     if (probableMatch) {
673         /* verify that all addresses in the incoming RPC are
674          ** not part of other server entries in my CellServDB
675          */
676         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
677              && inAddr->hostAddr[i]; i++) {
678             remoteAddr = htonl(inAddr->hostAddr[i]);
679             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
680                 if (ts == tmp)  /* this is my server */
681                     continue;
682                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
683                      j++)
684                     if (remoteAddr == tmp->addr[j]) {
685                         found = 1;
686                         break;
687                     }
688             }
689         }
690     }
691
692     /* if (probableMatch) */
693     /* inconsistent addresses in CellServDB */
694     if (!probableMatch || found) {
695         ubik_print("Inconsistent Cell Info from server: ");
696         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
697             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
698         ubik_print("\n");
699         fflush(stdout);
700         fflush(stderr);
701         printServerInfo();
702         return UBADHOST;
703     }
704
705     /* update our data structures */
706     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
707         ts->addr[i] = htonl(inAddr->hostAddr[i]);
708
709     ubik_print("ubik: A Remote Server has addresses: ");
710     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
711         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
712     ubik_print("\n");
713
714     return 0;
715 }
716
717 static void
718 printServerInfo(void)
719 {
720     struct ubik_server *ts;
721     int i, j = 1;
722     char hoststr[16];
723
724     ubik_print("Local CellServDB:");
725     for (ts = ubik_servers; ts; ts = ts->next, j++) {
726         ubik_print("Server %d: ", j);
727         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
728             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
729     }
730     ubik_print("\n");
731 }
732
733 afs_int32
734 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid, 
735                  struct ubik_version *oldversionp, 
736                  struct ubik_version *newversionp)
737 {
738     afs_int32 code = 0;
739     struct ubik_dbase *dbase;
740
741     if ((code = ubik_CheckAuth(rxcall))) {
742         return (code);
743     }
744
745     if (!ubik_currentTrans) {
746         return USYNC;
747     }
748     /* sanity check to make sure only write trans appear here */
749     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
750         return UBADTYPE;
751     }
752
753     /* Should not get this for the sync site */
754     if (ubeacon_AmSyncSite()) {
755         return UDEADLOCK;
756     }
757
758     dbase = ubik_currentTrans->dbase;
759     DBHOLD(dbase);
760     urecovery_CheckTid(atid);
761     if (!ubik_currentTrans) {
762         DBRELE(dbase);
763         return USYNC;
764     }
765
766     /* Set the label if its version matches the sync-site's */
767     if ((oldversionp->epoch == ubik_dbVersion.epoch)
768         && (oldversionp->counter == ubik_dbVersion.counter)) {
769         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
770         if (!code) {
771             ubik_dbase->version = *newversionp;
772             ubik_dbVersion = *newversionp;
773         }
774     } else {
775         code = USYNC;
776     }
777
778     DBRELE(dbase);
779     return code;
780 }