ubik-doxygen-20081121
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #include <fcntl.h>
20 #else
21 #include <sys/file.h>
22 #include <netinet/in.h>
23 #endif
24 #include <string.h>
25 #include <lock.h>
26 #include <rx/xdr.h>
27 #include <rx/rx.h>
28 #include <errno.h>
29 #include <afs/afsutil.h>
30
31 #define UBIK_INTERNALS
32 #include "ubik.h"
33 #include "ubik_int.h"
34 int (*ubik_CheckRXSecurityProc) ();
35 char *ubik_CheckRXSecurityRock;
36 void printServerInfo();
37
38 /*! \file
39  * routines for handling requests remotely-submitted by the sync site.  These are
40  * only write transactions (we don't propagate read trans), and there is at most one
41  * write transaction extant at any one time.
42  */
43
44 struct ubik_trans *ubik_currentTrans = 0;
45
46
47 ubik_CheckAuth(acall)
48      register struct rx_call *acall;
49 {
50     register afs_int32 code;
51     if (ubik_CheckRXSecurityProc) {
52         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
53         return code;
54     } else
55         return 0;
56 }
57
58 /* the rest of these guys handle remote execution of write
59  * transactions: this is the code executed on the other servers when a
60  * sync site is executing a write transaction.
61  */
62 afs_int32
63 SDISK_Begin(rxcall, atid)
64      register struct rx_call *rxcall;
65      struct ubik_tid *atid;
66 {
67     register afs_int32 code;
68
69     if ((code = ubik_CheckAuth(rxcall))) {
70         return code;
71     }
72     DBHOLD(ubik_dbase);
73     urecovery_CheckTid(atid);
74     if (ubik_currentTrans) {
75         /* If the thread is not waiting for lock - ok to end it */
76 #if !defined(UBIK_PAUSE)
77         if (ubik_currentTrans->locktype != LOCKWAIT) {
78 #endif /* UBIK_PAUSE */
79             udisk_end(ubik_currentTrans);
80 #if !defined(UBIK_PAUSE)
81         }
82 #endif /* UBIK_PAUSE */
83         ubik_currentTrans = (struct ubik_trans *)0;
84     }
85     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
86     if (!code && ubik_currentTrans) {
87         /* label this trans with the right trans id */
88         ubik_currentTrans->tid.epoch = atid->epoch;
89         ubik_currentTrans->tid.counter = atid->counter;
90     }
91     DBRELE(ubik_dbase);
92     return code;
93 }
94
95
96 afs_int32
97 SDISK_Commit(rxcall, atid)
98      register struct rx_call *rxcall;
99      struct ubik_tid *atid;
100 {
101     register afs_int32 code;
102     register struct ubik_dbase *dbase;
103
104     if ((code = ubik_CheckAuth(rxcall))) {
105         return code;
106     }
107
108     if (!ubik_currentTrans) {
109         return USYNC;
110     }
111     /*
112      * sanity check to make sure only write trans appear here
113      */
114     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
115         return UBADTYPE;
116     }
117
118     dbase = ubik_currentTrans->dbase;
119     DBHOLD(dbase);
120     urecovery_CheckTid(atid);
121     if (!ubik_currentTrans) {
122         DBRELE(dbase);
123         return USYNC;
124     }
125
126     code = udisk_commit(ubik_currentTrans);
127     if (code == 0) {
128         /* sync site should now match */
129         ubik_dbVersion = ubik_dbase->version;
130     }
131     DBRELE(dbase);
132     return code;
133 }
134
135 afs_int32
136 SDISK_ReleaseLocks(rxcall, atid)
137      register struct rx_call *rxcall;
138      struct ubik_tid *atid;
139 {
140     register struct ubik_dbase *dbase;
141     register afs_int32 code;
142
143     if ((code = ubik_CheckAuth(rxcall))) {
144         return code;
145     }
146
147     if (!ubik_currentTrans) {
148         return USYNC;
149     }
150     /* sanity check to make sure only write trans appear here */
151     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
152         return UBADTYPE;
153     }
154
155     dbase = ubik_currentTrans->dbase;
156     DBHOLD(dbase);
157     urecovery_CheckTid(atid);
158     if (!ubik_currentTrans) {
159         DBRELE(dbase);
160         return USYNC;
161     }
162
163     /* If the thread is not waiting for lock - ok to end it */
164 #if !defined(UBIK_PAUSE)
165     if (ubik_currentTrans->locktype != LOCKWAIT) {
166 #endif /* UBIK_PAUSE */
167         udisk_end(ubik_currentTrans);
168 #if !defined(UBIK_PAUSE)
169     }
170 #endif /* UBIK_PAUSE */
171     ubik_currentTrans = (struct ubik_trans *)0;
172     DBRELE(dbase);
173     return 0;
174 }
175
176 afs_int32
177 SDISK_Abort(rxcall, atid)
178      register struct rx_call *rxcall;
179      struct ubik_tid *atid;
180 {
181     register afs_int32 code;
182     register struct ubik_dbase *dbase;
183
184     if ((code = ubik_CheckAuth(rxcall))) {
185         return code;
186     }
187
188     if (!ubik_currentTrans) {
189         return USYNC;
190     }
191     /* sanity check to make sure only write trans appear here  */
192     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
193         return UBADTYPE;
194     }
195
196     dbase = ubik_currentTrans->dbase;
197     DBHOLD(dbase);
198     urecovery_CheckTid(atid);
199     if (!ubik_currentTrans) {
200         DBRELE(dbase);
201         return USYNC;
202     }
203
204     code = udisk_abort(ubik_currentTrans);
205     /* If the thread is not waiting for lock - ok to end it */
206 #if !defined(UBIK_PAUSE)
207     if (ubik_currentTrans->locktype != LOCKWAIT) {
208 #endif /* UBIK_PAUSE */
209         udisk_end(ubik_currentTrans);
210 #if !defined(UBIK_PAUSE)
211     }
212 #endif /* UBIK_PAUSE */
213     ubik_currentTrans = (struct ubik_trans *)0;
214     DBRELE(dbase);
215     return code;
216 }
217
218 afs_int32
219 SDISK_Lock(rxcall, atid, afile, apos, alen, atype)
220      register struct rx_call *rxcall;
221      struct ubik_tid *atid;
222      afs_int32 afile, apos, alen, atype;        /* apos and alen are not used */
223 {
224     register afs_int32 code;
225     register struct ubik_dbase *dbase;
226     struct ubik_trans *ubik_thisTrans;
227
228     if ((code = ubik_CheckAuth(rxcall))) {
229         return code;
230     }
231     if (!ubik_currentTrans) {
232         return USYNC;
233     }
234     /* sanity check to make sure only write trans appear here */
235     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
236         return UBADTYPE;
237     }
238     if (alen != 1) {
239         return UBADLOCK;
240     }
241     dbase = ubik_currentTrans->dbase;
242     DBHOLD(dbase);
243     urecovery_CheckTid(atid);
244     if (!ubik_currentTrans) {
245         DBRELE(dbase);
246         return USYNC;
247     }
248
249     ubik_thisTrans = ubik_currentTrans;
250     code = ulock_getLock(ubik_currentTrans, atype, 1);
251
252     /* While waiting, the transaction may have been ended/
253      * aborted from under us (urecovery_CheckTid). In that
254      * case, end the transaction here.
255      */
256     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
257         udisk_end(ubik_thisTrans);
258         code = USYNC;
259     }
260
261     DBRELE(dbase);
262     return code;
263 }
264
265 /*!
266  * \brief Write a vector of data
267  */
268 afs_int32
269 SDISK_WriteV(rxcall, atid, io_vector, io_buffer)
270      register struct rx_call *rxcall;
271      struct ubik_tid *atid;
272      iovec_wrt *io_vector;
273      iovec_buf *io_buffer;
274 {
275     afs_int32 code, i, offset;
276     struct ubik_dbase *dbase;
277     struct ubik_iovec *iovec;
278     char *iobuf;
279
280     if ((code = ubik_CheckAuth(rxcall))) {
281         return code;
282     }
283     if (!ubik_currentTrans) {
284         return USYNC;
285     }
286     /* sanity check to make sure only write trans appear here */
287     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
288         return UBADTYPE;
289     }
290
291     dbase = ubik_currentTrans->dbase;
292     DBHOLD(dbase);
293     urecovery_CheckTid(atid);
294     if (!ubik_currentTrans) {
295         DBRELE(dbase);
296         return USYNC;
297     }
298
299     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
300     iobuf = (char *)io_buffer->iovec_buf_val;
301     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
302         /* Sanity check for going off end of buffer */
303         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
304             code = UINTERNAL;
305         } else {
306             code =
307                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
308                             iovec[i].position, iovec[i].length);
309         }
310         if (code)
311             break;
312
313         offset += iovec[i].length;
314     }
315
316     DBRELE(dbase);
317     return code;
318 }
319
320 afs_int32
321 SDISK_Write(rxcall, atid, afile, apos, adata)
322      register struct rx_call *rxcall;
323      struct ubik_tid *atid;
324      afs_int32 afile, apos;
325      register bulkdata *adata;
326 {
327     register afs_int32 code;
328     register struct ubik_dbase *dbase;
329
330     if ((code = ubik_CheckAuth(rxcall))) {
331         return code;
332     }
333     if (!ubik_currentTrans) {
334         return USYNC;
335     }
336     /* sanity check to make sure only write trans appear here */
337     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
338         return UBADTYPE;
339     }
340
341     dbase = ubik_currentTrans->dbase;
342     DBHOLD(dbase);
343     urecovery_CheckTid(atid);
344     if (!ubik_currentTrans) {
345         DBRELE(dbase);
346         return USYNC;
347     }
348     code =
349         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
350                     adata->bulkdata_len);
351     DBRELE(dbase);
352     return code;
353 }
354
355 afs_int32
356 SDISK_Truncate(rxcall, atid, afile, alen)
357      register struct rx_call *rxcall;
358      struct ubik_tid *atid;
359      afs_int32 afile;
360      afs_int32 alen;
361 {
362     register afs_int32 code;
363     register struct ubik_dbase *dbase;
364
365     if ((code = ubik_CheckAuth(rxcall))) {
366         return code;
367     }
368     if (!ubik_currentTrans) {
369         return USYNC;
370     }
371     /* sanity check to make sure only write trans appear here */
372     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
373         return UBADTYPE;
374     }
375
376     dbase = ubik_currentTrans->dbase;
377     DBHOLD(dbase);
378     urecovery_CheckTid(atid);
379     if (!ubik_currentTrans) {
380         DBRELE(dbase);
381         return USYNC;
382     }
383     code = udisk_truncate(ubik_currentTrans, afile, alen);
384     DBRELE(dbase);
385     return code;
386 }
387
388 afs_int32
389 SDISK_GetVersion(rxcall, aversion)
390      register struct rx_call *rxcall;
391      register struct ubik_version *aversion;
392 {
393     register afs_int32 code;
394
395     if ((code = ubik_CheckAuth(rxcall))) {
396         return code;
397     }
398
399     /*
400      * If we are the sync site, recovery shouldn't be running on any
401      * other site. We shouldn't be getting this RPC as long as we are
402      * the sync site.  To prevent any unforseen activity, we should
403      * reject this RPC until we have recognized that we are not the
404      * sync site anymore, and/or if we have any pending WRITE
405      * transactions that have to complete. This way we can be assured
406      * that this RPC would not block any pending transactions that
407      * should either fail or pass. If we have recognized the fact that
408      * we are not the sync site any more, all write transactions would
409      * fail with UNOQUORUM anyway.
410      */
411     if (ubeacon_AmSyncSite()) {
412         return UDEADLOCK;
413     }
414
415     DBHOLD(ubik_dbase);
416     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
417     DBRELE(ubik_dbase);
418     if (code) {
419         /* tell other side there's no dbase */
420         aversion->epoch = 0;
421         aversion->counter = 0;
422     }
423     return 0;
424 }
425
426 afs_int32
427 SDISK_GetFile(rxcall, file, version)
428      register struct rx_call *rxcall;
429      register afs_int32 file;
430      struct ubik_version *version;
431 {
432     register afs_int32 code;
433     register struct ubik_dbase *dbase;
434     register afs_int32 offset;
435     struct ubik_stat ubikstat;
436     char tbuffer[256];
437     afs_int32 tlen;
438     afs_int32 length;
439
440     if ((code = ubik_CheckAuth(rxcall))) {
441         return code;
442     }
443 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
444  * a sanity check, anyway. 
445     if (ubeacon_AmSyncSite()) {
446       return UDEADLOCK;
447     }
448 */
449     dbase = ubik_dbase;
450     DBHOLD(dbase);
451     code = (*dbase->stat) (dbase, file, &ubikstat);
452     if (code < 0) {
453         DBRELE(dbase);
454         return code;
455     }
456     length = ubikstat.size;
457     tlen = htonl(length);
458     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
459     if (code != sizeof(afs_int32)) {
460         DBRELE(dbase);
461         ubik_dprint("Rx-write length error=%d\n", code);
462         return BULK_ERROR;
463     }
464     offset = 0;
465     while (length > 0) {
466         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
467         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
468         if (code != tlen) {
469             DBRELE(dbase);
470             ubik_dprint("read failed error=%d\n", code);
471             return UIOERROR;
472         }
473         code = rx_Write(rxcall, tbuffer, tlen);
474         if (code != tlen) {
475             DBRELE(dbase);
476             ubik_dprint("Rx-write length error=%d\n", code);
477             return BULK_ERROR;
478         }
479         length -= tlen;
480         offset += tlen;
481     }
482     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
483     DBRELE(dbase);
484     return code;
485 }
486
487 afs_int32
488 SDISK_SendFile(rxcall, file, length, avers)
489      register struct rx_call *rxcall;
490      afs_int32 file;
491      afs_int32 length;
492      struct ubik_version *avers;
493 {
494     register afs_int32 code;
495     struct ubik_dbase *dbase = NULL;
496     char tbuffer[1024];
497     afs_int32 offset;
498     struct ubik_version tversion;
499     register int tlen;
500     struct rx_peer *tpeer;
501     struct rx_connection *tconn;
502     afs_uint32 otherHost = 0;
503 #ifndef OLD_URECOVERY
504     char pbuffer[1028];
505     int flen, fd = -1;
506     afs_int32 epoch = 0;
507     afs_int32 pass;
508 #endif
509
510     /* send the file back to the requester */
511
512     if ((code = ubik_CheckAuth(rxcall))) {
513         goto failed;
514     }
515
516     /* next, we do a sanity check to see if the guy sending us the database is
517      * the guy we think is the sync site.  It turns out that we might not have
518      * decided yet that someone's the sync site, but they could have enough
519      * votes from others to be sync site anyway, and could send us the database
520      * in advance of getting our votes.  This is fine, what we're really trying
521      * to check is that some authenticated bogon isn't sending a random database
522      * into another configuration.  This could happen on a bad configuration
523      * screwup.  Thus, we only object if we're sure we know who the sync site
524      * is, and it ain't the guy talking to us.
525      */
526     offset = uvote_GetSyncSite();
527     tconn = rx_ConnectionOf(rxcall);
528     tpeer = rx_PeerOf(tconn);
529     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
530     if (offset && offset != otherHost) {
531         /* we *know* this is the wrong guy */
532         code = USYNC;
533         goto failed;
534     }
535
536     dbase = ubik_dbase;
537     DBHOLD(dbase);
538
539     /* abort any active trans that may scribble over the database */
540     urecovery_AbortAll(dbase);
541
542     ubik_print("Ubik: Synchronize database with server %s\n",
543                afs_inet_ntoa(otherHost));
544
545     offset = 0;
546 #ifdef OLD_URECOVERY
547     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
548     tversion.counter = 0;
549 #else
550     epoch =
551 #endif
552     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
553     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
554 #ifndef OLD_URECOVERY
555     flen = length;
556     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
557     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
558     if (fd < 0) {
559         code = errno;
560         goto failed;
561     }
562     code = lseek(fd, HDRSIZE, 0);
563     if (code != HDRSIZE) {
564         close(fd);
565         goto failed;
566     }
567     pass = 0;
568 #endif
569     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
570     while (length > 0) {
571         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
572 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
573         if (pass % 4 == 0)
574             IOMGR_Poll();
575 #endif
576         code = rx_Read(rxcall, tbuffer, tlen);
577         if (code != tlen) {
578             DBRELE(dbase);
579             ubik_dprint("Rx-read length error=%d\n", code);
580             code = BULK_ERROR;
581             close(fd);
582             goto failed;
583         }
584 #ifdef OLD_URECOVERY
585         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
586 #else
587         code = write(fd, tbuffer, tlen);
588         pass++;
589 #endif
590         if (code != tlen) {
591             DBRELE(dbase);
592             ubik_dprint("write failed error=%d\n", code);
593             code = UIOERROR;
594             close(fd);
595             goto failed;
596         }
597         offset += tlen;
598         length -= tlen;
599     }
600 #ifndef OLD_URECOVERY
601     code = close(fd);
602     if (code)
603         goto failed;
604 #endif     
605
606     /* sync data first, then write label and resync (resync done by setlabel call).
607      * This way, good label is only on good database. */
608 #ifdef OLD_URECOVERY
609     (*ubik_dbase->sync) (dbase, file);
610 #else
611     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
612 #ifdef AFS_NT40_ENV
613     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
614     code = unlink(pbuffer);
615     if (!code)
616         code = rename(tbuffer, pbuffer);
617     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
618 #endif
619     if (!code) 
620         code = rename(pbuffer, tbuffer);
621     if (!code) 
622         code = (*ubik_dbase->open) (ubik_dbase, 0);
623     if (!code)
624 #endif
625     code = (*ubik_dbase->setlabel) (dbase, file, avers);
626 #ifndef OLD_URECOVERY
627 #ifdef AFS_NT40_ENV
628     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
629     unlink(pbuffer);
630 #endif
631 #endif
632     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
633     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
634 #ifdef AFS_PTHREAD_ENV
635     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
636 #else
637     LWP_NoYieldSignal(&dbase->version);
638 #endif
639     DBRELE(dbase);
640   failed:
641     if (code) {
642 #ifndef OLD_URECOVERY
643         unlink(pbuffer);
644         /* Failed to sync. Allow reads again for now. */
645         if (dbase != NULL) {
646             tversion.epoch = epoch;
647             (*dbase->setlabel) (dbase, file, &tversion);
648         }
649 #endif
650         ubik_print
651             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
652              afs_inet_ntoa(otherHost), code);
653     } else {
654         ubik_print("Ubik: Synchronize database completed\n");
655     }
656     return code;
657 }
658
659
660 afs_int32
661 SDISK_Probe(rxcall)
662      register struct rx_call *rxcall;
663 {
664     return 0;
665 }
666
667 /*!
668  * \brief Update remote machines addresses in my server list
669  *
670  * Send back my addresses to caller of this RPC
671  * \return zero on success, else 1.
672  */
673 afs_int32
674 SDISK_UpdateInterfaceAddr(rxcall, inAddr, outAddr)
675      register struct rx_call *rxcall;
676      UbikInterfaceAddr *inAddr, *outAddr;
677 {
678     struct ubik_server *ts, *tmp;
679     afs_uint32 remoteAddr;      /* in net byte order */
680     int i, j, found = 0, probableMatch = 0;
681
682     /* copy the output parameters */
683     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
684         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
685
686     remoteAddr = htonl(inAddr->hostAddr[0]);
687     for (ts = ubik_servers; ts; ts = ts->next)
688         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
689             probableMatch = 1;
690             break;
691         }
692
693     if (probableMatch) {
694         /* verify that all addresses in the incoming RPC are
695          ** not part of other server entries in my CellServDB
696          */
697         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
698              && inAddr->hostAddr[i]; i++) {
699             remoteAddr = htonl(inAddr->hostAddr[i]);
700             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
701                 if (ts == tmp)  /* this is my server */
702                     continue;
703                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
704                      j++)
705                     if (remoteAddr == tmp->addr[j]) {
706                         found = 1;
707                         break;
708                     }
709             }
710         }
711     }
712
713     /* if (probableMatch) */
714     /* inconsistent addresses in CellServDB */
715     if (!probableMatch || found) {
716         ubik_print("Inconsistent Cell Info from server: ");
717         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
718             ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
719         ubik_print("\n");
720         fflush(stdout);
721         fflush(stderr);
722         printServerInfo();
723         return UBADHOST;
724     }
725
726     /* update our data structures */
727     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
728         ts->addr[i] = htonl(inAddr->hostAddr[i]);
729
730     ubik_print("ubik: A Remote Server has addresses: ");
731     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
732         ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
733     ubik_print("\n");
734
735     return 0;
736 }
737
738 void
739 printServerInfo()
740 {
741     struct ubik_server *ts;
742     int i, j = 1;
743
744     ubik_print("Local CellServDB:");
745     for (ts = ubik_servers; ts; ts = ts->next, j++) {
746         ubik_print("Server %d: ", j);
747         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
748             ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
749     }
750     ubik_print("\n");
751 }
752
753 afs_int32
754 SDISK_SetVersion(rxcall, atid, oldversionp, newversionp)
755      struct rx_call *rxcall;
756      struct ubik_tid *atid;
757      struct ubik_version *oldversionp;
758      struct ubik_version *newversionp;
759 {
760     afs_int32 code = 0;
761     struct ubik_dbase *dbase;
762
763     if ((code = ubik_CheckAuth(rxcall))) {
764         return (code);
765     }
766
767     if (!ubik_currentTrans) {
768         return USYNC;
769     }
770     /* sanity check to make sure only write trans appear here */
771     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
772         return UBADTYPE;
773     }
774
775     /* Should not get this for the sync site */
776     if (ubeacon_AmSyncSite()) {
777         return UDEADLOCK;
778     }
779
780     dbase = ubik_currentTrans->dbase;
781     DBHOLD(dbase);
782     urecovery_CheckTid(atid);
783     if (!ubik_currentTrans) {
784         DBRELE(dbase);
785         return USYNC;
786     }
787
788     /* Set the label if its version matches the sync-site's */
789     if ((oldversionp->epoch == ubik_dbVersion.epoch)
790         && (oldversionp->counter == ubik_dbVersion.counter)) {
791         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
792         if (!code) {
793             ubik_dbase->version = *newversionp;
794             ubik_dbVersion = *newversionp;
795         }
796     } else {
797         code = USYNC;
798     }
799
800     DBRELE(dbase);
801     return code;
802 }