kill-ubik-pthread-env-20080718
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #include <fcntl.h>
20 #else
21 #include <sys/file.h>
22 #include <netinet/in.h>
23 #endif
24 #include <string.h>
25 #include <lock.h>
26 #include <rx/xdr.h>
27 #include <rx/rx.h>
28 #include <errno.h>
29 #include <afs/afsutil.h>
30
31 #define UBIK_INTERNALS
32 #include "ubik.h"
33 #include "ubik_int.h"
34 int (*ubik_CheckRXSecurityProc) ();
35 char *ubik_CheckRXSecurityRock;
36 void printServerInfo();
37
38 /* routines for handling requests remotely-submitted by the sync site.  These are
39     only write transactions (we don't propagate read trans), and there is at most one
40     write transaction extant at any one time.
41 */
42
43 struct ubik_trans *ubik_currentTrans = 0;
44
45
46 ubik_CheckAuth(acall)
47      register struct rx_call *acall;
48 {
49     register afs_int32 code;
50     if (ubik_CheckRXSecurityProc) {
51         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
52         return code;
53     } else
54         return 0;
55 }
56
57 /* the rest of these guys handle remote execution of write
58  * transactions: this is the code executed on the other servers when a
59  * sync site is executing a write transaction.
60  */
61 afs_int32
62 SDISK_Begin(rxcall, atid)
63      register struct rx_call *rxcall;
64      struct ubik_tid *atid;
65 {
66     register afs_int32 code;
67
68     if ((code = ubik_CheckAuth(rxcall))) {
69         return code;
70     }
71     DBHOLD(ubik_dbase);
72     urecovery_CheckTid(atid);
73     if (ubik_currentTrans) {
74         /* If the thread is not waiting for lock - ok to end it */
75 #if !defined(UBIK_PAUSE)
76         if (ubik_currentTrans->locktype != LOCKWAIT) {
77 #endif /* UBIK_PAUSE */
78             udisk_end(ubik_currentTrans);
79 #if !defined(UBIK_PAUSE)
80         }
81 #endif /* UBIK_PAUSE */
82         ubik_currentTrans = (struct ubik_trans *)0;
83     }
84     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
85     if (!code && ubik_currentTrans) {
86         /* label this trans with the right trans id */
87         ubik_currentTrans->tid.epoch = atid->epoch;
88         ubik_currentTrans->tid.counter = atid->counter;
89     }
90     DBRELE(ubik_dbase);
91     return code;
92 }
93
94
95 afs_int32
96 SDISK_Commit(rxcall, atid)
97      register struct rx_call *rxcall;
98      struct ubik_tid *atid;
99 {
100     register afs_int32 code;
101     register struct ubik_dbase *dbase;
102
103     if ((code = ubik_CheckAuth(rxcall))) {
104         return code;
105     }
106
107     if (!ubik_currentTrans) {
108         return USYNC;
109     }
110     /*
111      * sanity check to make sure only write trans appear here
112      */
113     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
114         return UBADTYPE;
115     }
116
117     dbase = ubik_currentTrans->dbase;
118     DBHOLD(dbase);
119     urecovery_CheckTid(atid);
120     if (!ubik_currentTrans) {
121         DBRELE(dbase);
122         return USYNC;
123     }
124
125     code = udisk_commit(ubik_currentTrans);
126     if (code == 0) {
127         /* sync site should now match */
128         ubik_dbVersion = ubik_dbase->version;
129     }
130     DBRELE(dbase);
131     return code;
132 }
133
134 afs_int32
135 SDISK_ReleaseLocks(rxcall, atid)
136      register struct rx_call *rxcall;
137      struct ubik_tid *atid;
138 {
139     register struct ubik_dbase *dbase;
140     register afs_int32 code;
141
142     if ((code = ubik_CheckAuth(rxcall))) {
143         return code;
144     }
145
146     if (!ubik_currentTrans) {
147         return USYNC;
148     }
149     /* sanity check to make sure only write trans appear here */
150     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
151         return UBADTYPE;
152     }
153
154     dbase = ubik_currentTrans->dbase;
155     DBHOLD(dbase);
156     urecovery_CheckTid(atid);
157     if (!ubik_currentTrans) {
158         DBRELE(dbase);
159         return USYNC;
160     }
161
162     /* If the thread is not waiting for lock - ok to end it */
163 #if !defined(UBIK_PAUSE)
164     if (ubik_currentTrans->locktype != LOCKWAIT) {
165 #endif /* UBIK_PAUSE */
166         udisk_end(ubik_currentTrans);
167 #if !defined(UBIK_PAUSE)
168     }
169 #endif /* UBIK_PAUSE */
170     ubik_currentTrans = (struct ubik_trans *)0;
171     DBRELE(dbase);
172     return 0;
173 }
174
175 afs_int32
176 SDISK_Abort(rxcall, atid)
177      register struct rx_call *rxcall;
178      struct ubik_tid *atid;
179 {
180     register afs_int32 code;
181     register struct ubik_dbase *dbase;
182
183     if ((code = ubik_CheckAuth(rxcall))) {
184         return code;
185     }
186
187     if (!ubik_currentTrans) {
188         return USYNC;
189     }
190     /* sanity check to make sure only write trans appear here  */
191     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
192         return UBADTYPE;
193     }
194
195     dbase = ubik_currentTrans->dbase;
196     DBHOLD(dbase);
197     urecovery_CheckTid(atid);
198     if (!ubik_currentTrans) {
199         DBRELE(dbase);
200         return USYNC;
201     }
202
203     code = udisk_abort(ubik_currentTrans);
204     /* If the thread is not waiting for lock - ok to end it */
205 #if !defined(UBIK_PAUSE)
206     if (ubik_currentTrans->locktype != LOCKWAIT) {
207 #endif /* UBIK_PAUSE */
208         udisk_end(ubik_currentTrans);
209 #if !defined(UBIK_PAUSE)
210     }
211 #endif /* UBIK_PAUSE */
212     ubik_currentTrans = (struct ubik_trans *)0;
213     DBRELE(dbase);
214     return code;
215 }
216
217 afs_int32
218 SDISK_Lock(rxcall, atid, afile, apos, alen, atype)
219      register struct rx_call *rxcall;
220      struct ubik_tid *atid;
221      afs_int32 afile, apos, alen, atype;        /* apos and alen are not used */
222 {
223     register afs_int32 code;
224     register struct ubik_dbase *dbase;
225     struct ubik_trans *ubik_thisTrans;
226
227     if ((code = ubik_CheckAuth(rxcall))) {
228         return code;
229     }
230     if (!ubik_currentTrans) {
231         return USYNC;
232     }
233     /* sanity check to make sure only write trans appear here */
234     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
235         return UBADTYPE;
236     }
237     if (alen != 1) {
238         return UBADLOCK;
239     }
240     dbase = ubik_currentTrans->dbase;
241     DBHOLD(dbase);
242     urecovery_CheckTid(atid);
243     if (!ubik_currentTrans) {
244         DBRELE(dbase);
245         return USYNC;
246     }
247
248     ubik_thisTrans = ubik_currentTrans;
249     code = ulock_getLock(ubik_currentTrans, atype, 1);
250
251     /* While waiting, the transaction may have been ended/
252      * aborted from under us (urecovery_CheckTid). In that
253      * case, end the transaction here.
254      */
255     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
256         udisk_end(ubik_thisTrans);
257         code = USYNC;
258     }
259
260     DBRELE(dbase);
261     return code;
262 }
263
264 /* Write a vector of data */
265 afs_int32
266 SDISK_WriteV(rxcall, atid, io_vector, io_buffer)
267      register struct rx_call *rxcall;
268      struct ubik_tid *atid;
269      iovec_wrt *io_vector;
270      iovec_buf *io_buffer;
271 {
272     afs_int32 code, i, offset;
273     struct ubik_dbase *dbase;
274     struct ubik_iovec *iovec;
275     char *iobuf;
276
277     if ((code = ubik_CheckAuth(rxcall))) {
278         return code;
279     }
280     if (!ubik_currentTrans) {
281         return USYNC;
282     }
283     /* sanity check to make sure only write trans appear here */
284     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
285         return UBADTYPE;
286     }
287
288     dbase = ubik_currentTrans->dbase;
289     DBHOLD(dbase);
290     urecovery_CheckTid(atid);
291     if (!ubik_currentTrans) {
292         DBRELE(dbase);
293         return USYNC;
294     }
295
296     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
297     iobuf = (char *)io_buffer->iovec_buf_val;
298     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
299         /* Sanity check for going off end of buffer */
300         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
301             code = UINTERNAL;
302         } else {
303             code =
304                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
305                             iovec[i].position, iovec[i].length);
306         }
307         if (code)
308             break;
309
310         offset += iovec[i].length;
311     }
312
313     DBRELE(dbase);
314     return code;
315 }
316
317 afs_int32
318 SDISK_Write(rxcall, atid, afile, apos, adata)
319      register struct rx_call *rxcall;
320      struct ubik_tid *atid;
321      afs_int32 afile, apos;
322      register bulkdata *adata;
323 {
324     register afs_int32 code;
325     register struct ubik_dbase *dbase;
326
327     if ((code = ubik_CheckAuth(rxcall))) {
328         return code;
329     }
330     if (!ubik_currentTrans) {
331         return USYNC;
332     }
333     /* sanity check to make sure only write trans appear here */
334     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
335         return UBADTYPE;
336     }
337
338     dbase = ubik_currentTrans->dbase;
339     DBHOLD(dbase);
340     urecovery_CheckTid(atid);
341     if (!ubik_currentTrans) {
342         DBRELE(dbase);
343         return USYNC;
344     }
345     code =
346         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
347                     adata->bulkdata_len);
348     DBRELE(dbase);
349     return code;
350 }
351
352 afs_int32
353 SDISK_Truncate(rxcall, atid, afile, alen)
354      register struct rx_call *rxcall;
355      struct ubik_tid *atid;
356      afs_int32 afile;
357      afs_int32 alen;
358 {
359     register afs_int32 code;
360     register struct ubik_dbase *dbase;
361
362     if ((code = ubik_CheckAuth(rxcall))) {
363         return code;
364     }
365     if (!ubik_currentTrans) {
366         return USYNC;
367     }
368     /* sanity check to make sure only write trans appear here */
369     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
370         return UBADTYPE;
371     }
372
373     dbase = ubik_currentTrans->dbase;
374     DBHOLD(dbase);
375     urecovery_CheckTid(atid);
376     if (!ubik_currentTrans) {
377         DBRELE(dbase);
378         return USYNC;
379     }
380     code = udisk_truncate(ubik_currentTrans, afile, alen);
381     DBRELE(dbase);
382     return code;
383 }
384
385 afs_int32
386 SDISK_GetVersion(rxcall, aversion)
387      register struct rx_call *rxcall;
388      register struct ubik_version *aversion;
389 {
390     register afs_int32 code;
391
392     if ((code = ubik_CheckAuth(rxcall))) {
393         return code;
394     }
395
396     /*
397      * If we are the sync site, recovery shouldn't be running on any
398      * other site. We shouldn't be getting this RPC as long as we are
399      * the sync site.  To prevent any unforseen activity, we should
400      * reject this RPC until we have recognized that we are not the
401      * sync site anymore, and/or if we have any pending WRITE
402      * transactions that have to complete. This way we can be assured
403      * that this RPC would not block any pending transactions that
404      * should either fail or pass. If we have recognized the fact that
405      * we are not the sync site any more, all write transactions would
406      * fail with UNOQUORUM anyway.
407      */
408     if (ubeacon_AmSyncSite()) {
409         return UDEADLOCK;
410     }
411
412     DBHOLD(ubik_dbase);
413     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
414     DBRELE(ubik_dbase);
415     if (code) {
416         /* tell other side there's no dbase */
417         aversion->epoch = 0;
418         aversion->counter = 0;
419     }
420     return 0;
421 }
422
423 afs_int32
424 SDISK_GetFile(rxcall, file, version)
425      register struct rx_call *rxcall;
426      register afs_int32 file;
427      struct ubik_version *version;
428 {
429     register afs_int32 code;
430     register struct ubik_dbase *dbase;
431     register afs_int32 offset;
432     struct ubik_stat ubikstat;
433     char tbuffer[256];
434     afs_int32 tlen;
435     afs_int32 length;
436
437     if ((code = ubik_CheckAuth(rxcall))) {
438         return code;
439     }
440 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
441  * a sanity check, anyway. 
442     if (ubeacon_AmSyncSite()) {
443       return UDEADLOCK;
444     }
445 */
446     dbase = ubik_dbase;
447     DBHOLD(dbase);
448     code = (*dbase->stat) (dbase, file, &ubikstat);
449     if (code < 0) {
450         DBRELE(dbase);
451         return code;
452     }
453     length = ubikstat.size;
454     tlen = htonl(length);
455     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
456     if (code != sizeof(afs_int32)) {
457         DBRELE(dbase);
458         ubik_dprint("Rx-write length error=%d\n", code);
459         return BULK_ERROR;
460     }
461     offset = 0;
462     while (length > 0) {
463         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
464         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
465         if (code != tlen) {
466             DBRELE(dbase);
467             ubik_dprint("read failed error=%d\n", code);
468             return UIOERROR;
469         }
470         code = rx_Write(rxcall, tbuffer, tlen);
471         if (code != tlen) {
472             DBRELE(dbase);
473             ubik_dprint("Rx-write length error=%d\n", code);
474             return BULK_ERROR;
475         }
476         length -= tlen;
477         offset += tlen;
478     }
479     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
480     DBRELE(dbase);
481     return code;
482 }
483
484 afs_int32
485 SDISK_SendFile(rxcall, file, length, avers)
486      register struct rx_call *rxcall;
487      afs_int32 file;
488      afs_int32 length;
489      struct ubik_version *avers;
490 {
491     register afs_int32 code;
492     register struct ubik_dbase *dbase;
493     char tbuffer[1024];
494     afs_int32 offset;
495     struct ubik_version tversion;
496     register int tlen;
497     struct rx_peer *tpeer;
498     struct rx_connection *tconn;
499     afs_uint32 otherHost;
500 #ifndef OLD_URECOVERY
501     char pbuffer[1028];
502     int flen, fd = -1;
503     afs_int32 epoch, pass;
504 #endif
505
506     /* send the file back to the requester */
507
508     if ((code = ubik_CheckAuth(rxcall))) {
509         goto failed;
510     }
511
512     /* next, we do a sanity check to see if the guy sending us the database is
513      * the guy we think is the sync site.  It turns out that we might not have
514      * decided yet that someone's the sync site, but they could have enough
515      * votes from others to be sync site anyway, and could send us the database
516      * in advance of getting our votes.  This is fine, what we're really trying
517      * to check is that some authenticated bogon isn't sending a random database
518      * into another configuration.  This could happen on a bad configuration
519      * screwup.  Thus, we only object if we're sure we know who the sync site
520      * is, and it ain't the guy talking to us.
521      */
522     offset = uvote_GetSyncSite();
523     tconn = rx_ConnectionOf(rxcall);
524     tpeer = rx_PeerOf(tconn);
525     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
526     if (offset && offset != otherHost) {
527         /* we *know* this is the wrong guy */
528         code = USYNC;
529         goto failed;
530     }
531
532     dbase = ubik_dbase;
533     DBHOLD(dbase);
534
535     /* abort any active trans that may scribble over the database */
536     urecovery_AbortAll(dbase);
537
538     ubik_print("Ubik: Synchronize database with server %s\n",
539                afs_inet_ntoa(otherHost));
540
541     offset = 0;
542 #ifdef OLD_URECOVERY
543     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
544     tversion.counter = 0;
545 #else
546     epoch =
547 #endif
548     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
549     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
550 #ifndef OLD_URECOVERY
551     flen = length;
552     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
553     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
554     if (fd < 0) {
555         code = errno;
556         goto failed;
557     }
558     code = lseek(fd, HDRSIZE, 0);
559     if (code != HDRSIZE) {
560         close(fd);
561         goto failed;
562     }
563 #else
564     pass = 0;
565 #endif
566     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
567     while (length > 0) {
568         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
569 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
570         if (pass % 4 == 0)
571             IOMGR_Poll();
572 #endif
573         code = rx_Read(rxcall, tbuffer, tlen);
574         if (code != tlen) {
575             DBRELE(dbase);
576             ubik_dprint("Rx-read length error=%d\n", code);
577             code = BULK_ERROR;
578             close(fd);
579             goto failed;
580         }
581 #ifdef OLD_URECOVERY
582         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
583 #else
584         code = write(fd, tbuffer, tlen);
585         pass++;
586 #endif
587         if (code != tlen) {
588             DBRELE(dbase);
589             ubik_dprint("write failed error=%d\n", code);
590             code = UIOERROR;
591             close(fd);
592             goto failed;
593         }
594         offset += tlen;
595         length -= tlen;
596     }
597 #ifndef OLD_URECOVERY
598     code = close(fd);
599     if (code)
600         goto failed;
601 #endif     
602
603     /* sync data first, then write label and resync (resync done by setlabel call).
604      * This way, good label is only on good database. */
605 #ifdef OLD_URECOVERY
606     (*ubik_dbase->sync) (dbase, file);
607 #else
608     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
609 #ifdef AFS_NT40_ENV
610     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
611     code = unlink(pbuffer);
612     if (!code)
613         code = rename(tbuffer, pbuffer);
614     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
615 #endif
616     if (!code) 
617         code = rename(pbuffer, tbuffer);
618     if (!code) 
619         code = (*ubik_dbase->open) (ubik_dbase, 0);
620     if (!code)
621 #endif
622     code = (*ubik_dbase->setlabel) (dbase, file, avers);
623 #ifndef OLD_URECOVERY
624 #ifdef AFS_NT40_ENV
625     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
626     unlink(pbuffer);
627 #endif
628 #endif
629     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
630     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
631 #ifdef AFS_PTHREAD_ENV
632     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
633 #else
634     LWP_NoYieldSignal(&dbase->version);
635 #endif
636     DBRELE(dbase);
637   failed:
638     if (code) {
639 #ifndef OLD_URECOVERY
640         unlink(pbuffer);
641         /* Failed to sync. Allow reads again for now. */
642         tversion.epoch = epoch;
643         (*dbase->setlabel) (dbase, file, &tversion);
644 #endif
645         ubik_print
646             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
647              afs_inet_ntoa(otherHost), code);
648     } else {
649         ubik_print("Ubik: Synchronize database completed\n");
650     }
651     return code;
652 }
653
654
655 afs_int32
656 SDISK_Probe(rxcall)
657      register struct rx_call *rxcall;
658 {
659     return 0;
660 }
661
662 /*
663 * Update remote machines addresses in my server list
664 * Send back my addresses to caller of this RPC
665 * Returns zero on success, else 1.
666 */
667 afs_int32
668 SDISK_UpdateInterfaceAddr(rxcall, inAddr, outAddr)
669      register struct rx_call *rxcall;
670      UbikInterfaceAddr *inAddr, *outAddr;
671 {
672     struct ubik_server *ts, *tmp;
673     afs_uint32 remoteAddr;      /* in net byte order */
674     int i, j, found = 0, probableMatch = 0;
675
676     /* copy the output parameters */
677     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
678         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
679
680     remoteAddr = htonl(inAddr->hostAddr[0]);
681     for (ts = ubik_servers; ts; ts = ts->next)
682         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
683             probableMatch = 1;
684             break;
685         }
686
687     if (probableMatch) {
688         /* verify that all addresses in the incoming RPC are
689          ** not part of other server entries in my CellServDB
690          */
691         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
692              && inAddr->hostAddr[i]; i++) {
693             remoteAddr = htonl(inAddr->hostAddr[i]);
694             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
695                 if (ts == tmp)  /* this is my server */
696                     continue;
697                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
698                      j++)
699                     if (remoteAddr == tmp->addr[j]) {
700                         found = 1;
701                         break;
702                     }
703             }
704         }
705     }
706
707     /* if (probableMatch) */
708     /* inconsistent addresses in CellServDB */
709     if (!probableMatch || found) {
710         ubik_print("Inconsistent Cell Info from server: ");
711         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
712             ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
713         ubik_print("\n");
714         fflush(stdout);
715         fflush(stderr);
716         printServerInfo();
717         return UBADHOST;
718     }
719
720     /* update our data structures */
721     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
722         ts->addr[i] = htonl(inAddr->hostAddr[i]);
723
724     ubik_print("ubik: A Remote Server has addresses: ");
725     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
726         ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
727     ubik_print("\n");
728
729     return 0;
730 }
731
732 void
733 printServerInfo()
734 {
735     struct ubik_server *ts;
736     int i, j = 1;
737
738     ubik_print("Local CellServDB:");
739     for (ts = ubik_servers; ts; ts = ts->next, j++) {
740         ubik_print("Server %d: ", j);
741         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
742             ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
743     }
744     ubik_print("\n");
745 }
746
747 afs_int32
748 SDISK_SetVersion(rxcall, atid, oldversionp, newversionp)
749      struct rx_call *rxcall;
750      struct ubik_tid *atid;
751      struct ubik_version *oldversionp;
752      struct ubik_version *newversionp;
753 {
754     afs_int32 code = 0;
755     struct ubik_dbase *dbase;
756
757     if ((code = ubik_CheckAuth(rxcall))) {
758         return (code);
759     }
760
761     if (!ubik_currentTrans) {
762         return USYNC;
763     }
764     /* sanity check to make sure only write trans appear here */
765     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
766         return UBADTYPE;
767     }
768
769     /* Should not get this for the sync site */
770     if (ubeacon_AmSyncSite()) {
771         return UDEADLOCK;
772     }
773
774     dbase = ubik_currentTrans->dbase;
775     DBHOLD(dbase);
776     urecovery_CheckTid(atid);
777     if (!ubik_currentTrans) {
778         DBRELE(dbase);
779         return USYNC;
780     }
781
782     /* Set the label if its version matches the sync-site's */
783     if ((oldversionp->epoch == ubik_dbVersion.epoch)
784         && (oldversionp->counter == ubik_dbVersion.counter)) {
785         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
786         if (!code) {
787             ubik_dbase->version = *newversionp;
788             ubik_dbVersion = *newversionp;
789         }
790     } else {
791         code = USYNC;
792     }
793
794     DBRELE(dbase);
795     return code;
796 }