ubik: add interface for reading during write locks
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13
14 #include <sys/types.h>
15 #include <string.h>
16 #include <stdarg.h>
17
18 #ifdef AFS_NT40_ENV
19 #include <winsock2.h>
20 #include <fcntl.h>
21 #else
22 #include <sys/file.h>
23 #include <netinet/in.h>
24 #endif
25
26 #include <lock.h>
27 #include <rx/xdr.h>
28 #include <rx/rx.h>
29 #include <errno.h>
30 #include <afs/afsutil.h>
31
32 #define UBIK_INTERNALS
33 #include "ubik.h"
34 #include "ubik_int.h"
35
36 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
37 void *ubik_CheckRXSecurityRock;
38
39 static void printServerInfo(void);
40
41 /*! \file
42  * routines for handling requests remotely-submitted by the sync site.  These are
43  * only write transactions (we don't propagate read trans), and there is at most one
44  * write transaction extant at any one time.
45  */
46
47 struct ubik_trans *ubik_currentTrans = 0;
48
49 int
50 ubik_CheckAuth(register struct rx_call *acall)
51 {
52     register afs_int32 code;
53     if (ubik_CheckRXSecurityProc) {
54         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
55         return code;
56     } else
57         return 0;
58 }
59
60 /* the rest of these guys handle remote execution of write
61  * transactions: this is the code executed on the other servers when a
62  * sync site is executing a write transaction.
63  */
64 afs_int32
65 SDISK_Begin(register struct rx_call *rxcall, struct ubik_tid *atid)
66 {
67     register afs_int32 code;
68
69     if ((code = ubik_CheckAuth(rxcall))) {
70         return code;
71     }
72     DBHOLD(ubik_dbase);
73     urecovery_CheckTid(atid);
74     if (ubik_currentTrans) {
75         /* If the thread is not waiting for lock - ok to end it */
76 #if !defined(UBIK_PAUSE)
77         if (ubik_currentTrans->locktype != LOCKWAIT) {
78 #endif /* UBIK_PAUSE */
79             udisk_end(ubik_currentTrans);
80 #if !defined(UBIK_PAUSE)
81         }
82 #endif /* UBIK_PAUSE */
83         ubik_currentTrans = (struct ubik_trans *)0;
84     }
85     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
86     if (!code && ubik_currentTrans) {
87         /* label this trans with the right trans id */
88         ubik_currentTrans->tid.epoch = atid->epoch;
89         ubik_currentTrans->tid.counter = atid->counter;
90     }
91     DBRELE(ubik_dbase);
92     return code;
93 }
94
95
96 afs_int32
97 SDISK_Commit(register struct rx_call *rxcall, struct ubik_tid *atid)
98 {
99     register afs_int32 code;
100     register struct ubik_dbase *dbase;
101
102     if ((code = ubik_CheckAuth(rxcall))) {
103         return code;
104     }
105
106     if (!ubik_currentTrans) {
107         return USYNC;
108     }
109     /*
110      * sanity check to make sure only write trans appear here
111      */
112     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
113         return UBADTYPE;
114     }
115
116     dbase = ubik_currentTrans->dbase;
117
118     ObtainWriteLock(&dbase->cache_lock);
119
120     DBHOLD(dbase);
121
122     urecovery_CheckTid(atid);
123     if (!ubik_currentTrans) {
124         DBRELE(dbase);
125         ReleaseWriteLock(&dbase->cache_lock);
126         return USYNC;
127     }
128
129     code = udisk_commit(ubik_currentTrans);
130     if (code == 0) {
131         /* sync site should now match */
132         ubik_dbVersion = ubik_dbase->version;
133     }
134     DBRELE(dbase);
135     ReleaseWriteLock(&dbase->cache_lock);
136     return code;
137 }
138
139 afs_int32
140 SDISK_ReleaseLocks(register struct rx_call *rxcall, struct ubik_tid *atid)
141 {
142     register struct ubik_dbase *dbase;
143     register afs_int32 code;
144
145     if ((code = ubik_CheckAuth(rxcall))) {
146         return code;
147     }
148
149     if (!ubik_currentTrans) {
150         return USYNC;
151     }
152     /* sanity check to make sure only write trans appear here */
153     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
154         return UBADTYPE;
155     }
156
157     dbase = ubik_currentTrans->dbase;
158     DBHOLD(dbase);
159     urecovery_CheckTid(atid);
160     if (!ubik_currentTrans) {
161         DBRELE(dbase);
162         return USYNC;
163     }
164
165     /* If the thread is not waiting for lock - ok to end it */
166 #if !defined(UBIK_PAUSE)
167     if (ubik_currentTrans->locktype != LOCKWAIT) {
168 #endif /* UBIK_PAUSE */
169         udisk_end(ubik_currentTrans);
170 #if !defined(UBIK_PAUSE)
171     }
172 #endif /* UBIK_PAUSE */
173     ubik_currentTrans = (struct ubik_trans *)0;
174     DBRELE(dbase);
175     return 0;
176 }
177
178 afs_int32
179 SDISK_Abort(register struct rx_call *rxcall, struct ubik_tid *atid)
180 {
181     register afs_int32 code;
182     register struct ubik_dbase *dbase;
183
184     if ((code = ubik_CheckAuth(rxcall))) {
185         return code;
186     }
187
188     if (!ubik_currentTrans) {
189         return USYNC;
190     }
191     /* sanity check to make sure only write trans appear here  */
192     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
193         return UBADTYPE;
194     }
195
196     dbase = ubik_currentTrans->dbase;
197     DBHOLD(dbase);
198     urecovery_CheckTid(atid);
199     if (!ubik_currentTrans) {
200         DBRELE(dbase);
201         return USYNC;
202     }
203
204     code = udisk_abort(ubik_currentTrans);
205     /* If the thread is not waiting for lock - ok to end it */
206 #if !defined(UBIK_PAUSE)
207     if (ubik_currentTrans->locktype != LOCKWAIT) {
208 #endif /* UBIK_PAUSE */
209         udisk_end(ubik_currentTrans);
210 #if !defined(UBIK_PAUSE)
211     }
212 #endif /* UBIK_PAUSE */
213     ubik_currentTrans = (struct ubik_trans *)0;
214     DBRELE(dbase);
215     return code;
216 }
217
218 /* apos and alen are not used */
219 afs_int32
220 SDISK_Lock(register struct rx_call *rxcall, struct ubik_tid *atid, 
221            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
222 {       
223     register afs_int32 code;
224     register struct ubik_dbase *dbase;
225     struct ubik_trans *ubik_thisTrans;
226
227     if ((code = ubik_CheckAuth(rxcall))) {
228         return code;
229     }
230     if (!ubik_currentTrans) {
231         return USYNC;
232     }
233     /* sanity check to make sure only write trans appear here */
234     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
235         return UBADTYPE;
236     }
237     if (alen != 1) {
238         return UBADLOCK;
239     }
240     dbase = ubik_currentTrans->dbase;
241     DBHOLD(dbase);
242     urecovery_CheckTid(atid);
243     if (!ubik_currentTrans) {
244         DBRELE(dbase);
245         return USYNC;
246     }
247
248     ubik_thisTrans = ubik_currentTrans;
249     code = ulock_getLock(ubik_currentTrans, atype, 1);
250
251     /* While waiting, the transaction may have been ended/
252      * aborted from under us (urecovery_CheckTid). In that
253      * case, end the transaction here.
254      */
255     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
256         udisk_end(ubik_thisTrans);
257         code = USYNC;
258     }
259
260     DBRELE(dbase);
261     return code;
262 }
263
264 /*!
265  * \brief Write a vector of data
266  */
267 afs_int32
268 SDISK_WriteV(register struct rx_call *rxcall, struct ubik_tid *atid, 
269              iovec_wrt *io_vector, iovec_buf *io_buffer)
270 {
271     afs_int32 code, i, offset;
272     struct ubik_dbase *dbase;
273     struct ubik_iovec *iovec;
274     char *iobuf;
275
276     if ((code = ubik_CheckAuth(rxcall))) {
277         return code;
278     }
279     if (!ubik_currentTrans) {
280         return USYNC;
281     }
282     /* sanity check to make sure only write trans appear here */
283     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
284         return UBADTYPE;
285     }
286
287     dbase = ubik_currentTrans->dbase;
288     DBHOLD(dbase);
289     urecovery_CheckTid(atid);
290     if (!ubik_currentTrans) {
291         DBRELE(dbase);
292         return USYNC;
293     }
294
295     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
296     iobuf = (char *)io_buffer->iovec_buf_val;
297     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
298         /* Sanity check for going off end of buffer */
299         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
300             code = UINTERNAL;
301         } else {
302             code =
303                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
304                             iovec[i].position, iovec[i].length);
305         }
306         if (code)
307             break;
308
309         offset += iovec[i].length;
310     }
311
312     DBRELE(dbase);
313     return code;
314 }
315
316 afs_int32
317 SDISK_Write(register struct rx_call *rxcall, struct ubik_tid *atid, 
318             afs_int32 afile, afs_int32 apos, register bulkdata *adata)
319 {
320     register afs_int32 code;
321     register struct ubik_dbase *dbase;
322
323     if ((code = ubik_CheckAuth(rxcall))) {
324         return code;
325     }
326     if (!ubik_currentTrans) {
327         return USYNC;
328     }
329     /* sanity check to make sure only write trans appear here */
330     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
331         return UBADTYPE;
332     }
333
334     dbase = ubik_currentTrans->dbase;
335     DBHOLD(dbase);
336     urecovery_CheckTid(atid);
337     if (!ubik_currentTrans) {
338         DBRELE(dbase);
339         return USYNC;
340     }
341     code =
342         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
343                     adata->bulkdata_len);
344     DBRELE(dbase);
345     return code;
346 }
347
348 afs_int32
349 SDISK_Truncate(register struct rx_call *rxcall, struct ubik_tid *atid, 
350                afs_int32 afile, afs_int32 alen)
351 {
352     register afs_int32 code;
353     register struct ubik_dbase *dbase;
354
355     if ((code = ubik_CheckAuth(rxcall))) {
356         return code;
357     }
358     if (!ubik_currentTrans) {
359         return USYNC;
360     }
361     /* sanity check to make sure only write trans appear here */
362     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
363         return UBADTYPE;
364     }
365
366     dbase = ubik_currentTrans->dbase;
367     DBHOLD(dbase);
368     urecovery_CheckTid(atid);
369     if (!ubik_currentTrans) {
370         DBRELE(dbase);
371         return USYNC;
372     }
373     code = udisk_truncate(ubik_currentTrans, afile, alen);
374     DBRELE(dbase);
375     return code;
376 }
377
378 afs_int32
379 SDISK_GetVersion(register struct rx_call *rxcall, 
380                  register struct ubik_version *aversion)
381 {
382     register afs_int32 code;
383
384     if ((code = ubik_CheckAuth(rxcall))) {
385         return code;
386     }
387
388     /*
389      * If we are the sync site, recovery shouldn't be running on any
390      * other site. We shouldn't be getting this RPC as long as we are
391      * the sync site.  To prevent any unforseen activity, we should
392      * reject this RPC until we have recognized that we are not the
393      * sync site anymore, and/or if we have any pending WRITE
394      * transactions that have to complete. This way we can be assured
395      * that this RPC would not block any pending transactions that
396      * should either fail or pass. If we have recognized the fact that
397      * we are not the sync site any more, all write transactions would
398      * fail with UNOQUORUM anyway.
399      */
400     if (ubeacon_AmSyncSite()) {
401         return UDEADLOCK;
402     }
403
404     DBHOLD(ubik_dbase);
405     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
406     DBRELE(ubik_dbase);
407     if (code) {
408         /* tell other side there's no dbase */
409         aversion->epoch = 0;
410         aversion->counter = 0;
411     }
412     return 0;
413 }
414
415 afs_int32
416 SDISK_GetFile(register struct rx_call *rxcall, register afs_int32 file, 
417               struct ubik_version *version)
418 {
419     register afs_int32 code;
420     register struct ubik_dbase *dbase;
421     register afs_int32 offset;
422     struct ubik_stat ubikstat;
423     char tbuffer[256];
424     afs_int32 tlen;
425     afs_int32 length;
426
427     if ((code = ubik_CheckAuth(rxcall))) {
428         return code;
429     }
430 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
431  * a sanity check, anyway. 
432     if (ubeacon_AmSyncSite()) {
433       return UDEADLOCK;
434     }
435 */
436     dbase = ubik_dbase;
437     DBHOLD(dbase);
438     code = (*dbase->stat) (dbase, file, &ubikstat);
439     if (code < 0) {
440         DBRELE(dbase);
441         return code;
442     }
443     length = ubikstat.size;
444     tlen = htonl(length);
445     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
446     if (code != sizeof(afs_int32)) {
447         DBRELE(dbase);
448         ubik_dprint("Rx-write length error=%d\n", code);
449         return BULK_ERROR;
450     }
451     offset = 0;
452     while (length > 0) {
453         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
454         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
455         if (code != tlen) {
456             DBRELE(dbase);
457             ubik_dprint("read failed error=%d\n", code);
458             return UIOERROR;
459         }
460         code = rx_Write(rxcall, tbuffer, tlen);
461         if (code != tlen) {
462             DBRELE(dbase);
463             ubik_dprint("Rx-write length error=%d\n", code);
464             return BULK_ERROR;
465         }
466         length -= tlen;
467         offset += tlen;
468     }
469     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
470     DBRELE(dbase);
471     return code;
472 }
473
474 afs_int32
475 SDISK_SendFile(register struct rx_call *rxcall, afs_int32 file, 
476                afs_int32 length, struct ubik_version *avers)
477 {
478     register afs_int32 code;
479     struct ubik_dbase *dbase = NULL;
480     char tbuffer[1024];
481     afs_int32 offset;
482     struct ubik_version tversion;
483     register int tlen;
484     struct rx_peer *tpeer;
485     struct rx_connection *tconn;
486     afs_uint32 otherHost = 0;
487     char hoststr[16];
488 #ifndef OLD_URECOVERY
489     char pbuffer[1028];
490     int flen, fd = -1;
491     afs_int32 epoch = 0;
492     afs_int32 pass;
493 #endif
494
495     /* send the file back to the requester */
496
497     if ((code = ubik_CheckAuth(rxcall))) {
498         goto failed;
499     }
500
501     /* next, we do a sanity check to see if the guy sending us the database is
502      * the guy we think is the sync site.  It turns out that we might not have
503      * decided yet that someone's the sync site, but they could have enough
504      * votes from others to be sync site anyway, and could send us the database
505      * in advance of getting our votes.  This is fine, what we're really trying
506      * to check is that some authenticated bogon isn't sending a random database
507      * into another configuration.  This could happen on a bad configuration
508      * screwup.  Thus, we only object if we're sure we know who the sync site
509      * is, and it ain't the guy talking to us.
510      */
511     offset = uvote_GetSyncSite();
512     tconn = rx_ConnectionOf(rxcall);
513     tpeer = rx_PeerOf(tconn);
514     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
515     if (offset && offset != otherHost) {
516         /* we *know* this is the wrong guy */
517         code = USYNC;
518         goto failed;
519     }
520
521     dbase = ubik_dbase;
522     DBHOLD(dbase);
523
524     /* abort any active trans that may scribble over the database */
525     urecovery_AbortAll(dbase);
526
527     ubik_print("Ubik: Synchronize database with server %s\n",
528                afs_inet_ntoa_r(otherHost, hoststr));
529
530     offset = 0;
531 #ifdef OLD_URECOVERY
532     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
533     tversion.counter = 0;
534 #else
535     epoch =
536 #endif
537     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
538     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
539 #ifndef OLD_URECOVERY
540     flen = length;
541     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
542     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
543     if (fd < 0) {
544         code = errno;
545         goto failed;
546     }
547     code = lseek(fd, HDRSIZE, 0);
548     if (code != HDRSIZE) {
549         close(fd);
550         goto failed;
551     }
552     pass = 0;
553 #endif
554     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
555     while (length > 0) {
556         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
557 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
558         if (pass % 4 == 0)
559             IOMGR_Poll();
560 #endif
561         code = rx_Read(rxcall, tbuffer, tlen);
562         if (code != tlen) {
563             DBRELE(dbase);
564             ubik_dprint("Rx-read length error=%d\n", code);
565             code = BULK_ERROR;
566             close(fd);
567             goto failed;
568         }
569 #ifdef OLD_URECOVERY
570         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
571 #else
572         code = write(fd, tbuffer, tlen);
573         pass++;
574 #endif
575         if (code != tlen) {
576             DBRELE(dbase);
577             ubik_dprint("write failed error=%d\n", code);
578             code = UIOERROR;
579             close(fd);
580             goto failed;
581         }
582         offset += tlen;
583         length -= tlen;
584     }
585 #ifndef OLD_URECOVERY
586     code = close(fd);
587     if (code)
588         goto failed;
589 #endif     
590
591     /* sync data first, then write label and resync (resync done by setlabel call).
592      * This way, good label is only on good database. */
593 #ifdef OLD_URECOVERY
594     (*ubik_dbase->sync) (dbase, file);
595 #else
596     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
597 #ifdef AFS_NT40_ENV
598     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
599     code = unlink(pbuffer);
600     if (!code)
601         code = rename(tbuffer, pbuffer);
602     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
603 #endif
604     if (!code) 
605         code = rename(pbuffer, tbuffer);
606     if (!code) {
607         (*ubik_dbase->open) (ubik_dbase, 0);
608 #endif
609         code = (*ubik_dbase->setlabel) (dbase, file, avers);
610 #ifndef OLD_URECOVERY
611     }
612 #ifdef AFS_NT40_ENV
613     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
614     unlink(pbuffer);
615 #endif
616 #endif
617     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
618     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
619 #ifdef AFS_PTHREAD_ENV
620     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
621 #else
622     LWP_NoYieldSignal(&dbase->version);
623 #endif
624     DBRELE(dbase);
625   failed:
626     if (code) {
627 #ifndef OLD_URECOVERY
628         unlink(pbuffer);
629         /* Failed to sync. Allow reads again for now. */
630         if (dbase != NULL) {
631             tversion.epoch = epoch;
632             (*dbase->setlabel) (dbase, file, &tversion);
633         }
634 #endif
635         ubik_print
636             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
637              afs_inet_ntoa_r(otherHost, hoststr), code);
638     } else {
639         ubik_print("Ubik: Synchronize database completed\n");
640     }
641     return code;
642 }
643
644
645 afs_int32
646 SDISK_Probe(register struct rx_call *rxcall)
647 {
648     return 0;
649 }
650
651 /*!
652  * \brief Update remote machines addresses in my server list
653  *
654  * Send back my addresses to caller of this RPC
655  * \return zero on success, else 1.
656  */
657 afs_int32
658 SDISK_UpdateInterfaceAddr(register struct rx_call *rxcall, 
659                           UbikInterfaceAddr *inAddr, 
660                           UbikInterfaceAddr *outAddr)
661 {
662     struct ubik_server *ts, *tmp;
663     afs_uint32 remoteAddr;      /* in net byte order */
664     int i, j, found = 0, probableMatch = 0;
665     char hoststr[16];
666
667     /* copy the output parameters */
668     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
669         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
670
671     remoteAddr = htonl(inAddr->hostAddr[0]);
672     for (ts = ubik_servers; ts; ts = ts->next)
673         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
674             probableMatch = 1;
675             break;
676         }
677
678     if (probableMatch) {
679         /* verify that all addresses in the incoming RPC are
680          ** not part of other server entries in my CellServDB
681          */
682         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
683              && inAddr->hostAddr[i]; i++) {
684             remoteAddr = htonl(inAddr->hostAddr[i]);
685             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
686                 if (ts == tmp)  /* this is my server */
687                     continue;
688                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
689                      j++)
690                     if (remoteAddr == tmp->addr[j]) {
691                         found = 1;
692                         break;
693                     }
694             }
695         }
696     }
697
698     /* if (probableMatch) */
699     /* inconsistent addresses in CellServDB */
700     if (!probableMatch || found) {
701         ubik_print("Inconsistent Cell Info from server: ");
702         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
703             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
704         ubik_print("\n");
705         fflush(stdout);
706         fflush(stderr);
707         printServerInfo();
708         return UBADHOST;
709     }
710
711     /* update our data structures */
712     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
713         ts->addr[i] = htonl(inAddr->hostAddr[i]);
714
715     ubik_print("ubik: A Remote Server has addresses: ");
716     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
717         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
718     ubik_print("\n");
719
720     return 0;
721 }
722
723 static void
724 printServerInfo(void)
725 {
726     struct ubik_server *ts;
727     int i, j = 1;
728     char hoststr[16];
729
730     ubik_print("Local CellServDB:");
731     for (ts = ubik_servers; ts; ts = ts->next, j++) {
732         ubik_print("Server %d: ", j);
733         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
734             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
735     }
736     ubik_print("\n");
737 }
738
739 afs_int32
740 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid, 
741                  struct ubik_version *oldversionp, 
742                  struct ubik_version *newversionp)
743 {
744     afs_int32 code = 0;
745     struct ubik_dbase *dbase;
746
747     if ((code = ubik_CheckAuth(rxcall))) {
748         return (code);
749     }
750
751     if (!ubik_currentTrans) {
752         return USYNC;
753     }
754     /* sanity check to make sure only write trans appear here */
755     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
756         return UBADTYPE;
757     }
758
759     /* Should not get this for the sync site */
760     if (ubeacon_AmSyncSite()) {
761         return UDEADLOCK;
762     }
763
764     dbase = ubik_currentTrans->dbase;
765     DBHOLD(dbase);
766     urecovery_CheckTid(atid);
767     if (!ubik_currentTrans) {
768         DBRELE(dbase);
769         return USYNC;
770     }
771
772     /* Set the label if its version matches the sync-site's */
773     if ((oldversionp->epoch == ubik_dbVersion.epoch)
774         && (oldversionp->counter == ubik_dbVersion.counter)) {
775         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
776         if (!code) {
777             ubik_dbase->version = *newversionp;
778             ubik_dbVersion = *newversionp;
779         }
780     } else {
781         code = USYNC;
782     }
783
784     DBRELE(dbase);
785     return code;
786 }