ubik-avoid-truncating-live-database-during-recovery-20071210
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #include <fcntl.h>
20 #else
21 #include <sys/file.h>
22 #include <netinet/in.h>
23 #endif
24 #include <string.h>
25 #include <lock.h>
26 #include <rx/xdr.h>
27 #include <rx/rx.h>
28 #include <errno.h>
29 #include <afs/afsutil.h>
30
31 #define UBIK_INTERNALS
32 #include "ubik.h"
33 #include "ubik_int.h"
34 int (*ubik_CheckRXSecurityProc) ();
35 char *ubik_CheckRXSecurityRock;
36 void printServerInfo();
37
38 /* routines for handling requests remotely-submitted by the sync site.  These are
39     only write transactions (we don't propagate read trans), and there is at most one
40     write transaction extant at any one time.
41 */
42
43 struct ubik_trans *ubik_currentTrans = 0;
44
45
46 ubik_CheckAuth(acall)
47      register struct rx_call *acall;
48 {
49     register afs_int32 code;
50     if (ubik_CheckRXSecurityProc) {
51         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
52         return code;
53     } else
54         return 0;
55 }
56
57 /* the rest of these guys handle remote execution of write
58  * transactions: this is the code executed on the other servers when a
59  * sync site is executing a write transaction.
60  */
61 afs_int32
62 SDISK_Begin(rxcall, atid)
63      register struct rx_call *rxcall;
64      struct ubik_tid *atid;
65 {
66     register afs_int32 code;
67
68     if ((code = ubik_CheckAuth(rxcall))) {
69         return code;
70     }
71     DBHOLD(ubik_dbase);
72     urecovery_CheckTid(atid);
73     if (ubik_currentTrans) {
74         /* If the thread is not waiting for lock - ok to end it */
75 #if !defined(UBIK_PAUSE)
76         if (ubik_currentTrans->locktype != LOCKWAIT) {
77 #endif /* UBIK_PAUSE */
78             udisk_end(ubik_currentTrans);
79 #if !defined(UBIK_PAUSE)
80         }
81 #endif /* UBIK_PAUSE */
82         ubik_currentTrans = (struct ubik_trans *)0;
83     }
84     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
85     if (!code && ubik_currentTrans) {
86         /* label this trans with the right trans id */
87         ubik_currentTrans->tid.epoch = atid->epoch;
88         ubik_currentTrans->tid.counter = atid->counter;
89     }
90     DBRELE(ubik_dbase);
91     return code;
92 }
93
94
95 afs_int32
96 SDISK_Commit(rxcall, atid)
97      register struct rx_call *rxcall;
98      struct ubik_tid *atid;
99 {
100     register afs_int32 code;
101     register struct ubik_dbase *dbase;
102
103     if ((code = ubik_CheckAuth(rxcall))) {
104         return code;
105     }
106
107     if (!ubik_currentTrans) {
108         return USYNC;
109     }
110     /*
111      * sanity check to make sure only write trans appear here
112      */
113     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
114         return UBADTYPE;
115     }
116
117     dbase = ubik_currentTrans->dbase;
118     DBHOLD(dbase);
119     urecovery_CheckTid(atid);
120     if (!ubik_currentTrans) {
121         DBRELE(dbase);
122         return USYNC;
123     }
124
125     code = udisk_commit(ubik_currentTrans);
126     if (code == 0) {
127         /* sync site should now match */
128         ubik_dbVersion = ubik_dbase->version;
129     }
130     DBRELE(dbase);
131     return code;
132 }
133
134 afs_int32
135 SDISK_ReleaseLocks(rxcall, atid)
136      register struct rx_call *rxcall;
137      struct ubik_tid *atid;
138 {
139     register struct ubik_dbase *dbase;
140     register afs_int32 code;
141
142     if ((code = ubik_CheckAuth(rxcall))) {
143         return code;
144     }
145
146     if (!ubik_currentTrans) {
147         return USYNC;
148     }
149     /* sanity check to make sure only write trans appear here */
150     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
151         return UBADTYPE;
152     }
153
154     dbase = ubik_currentTrans->dbase;
155     DBHOLD(dbase);
156     urecovery_CheckTid(atid);
157     if (!ubik_currentTrans) {
158         DBRELE(dbase);
159         return USYNC;
160     }
161
162     /* If the thread is not waiting for lock - ok to end it */
163 #if !defined(UBIK_PAUSE)
164     if (ubik_currentTrans->locktype != LOCKWAIT) {
165 #endif /* UBIK_PAUSE */
166         udisk_end(ubik_currentTrans);
167 #if !defined(UBIK_PAUSE)
168     }
169 #endif /* UBIK_PAUSE */
170     ubik_currentTrans = (struct ubik_trans *)0;
171     DBRELE(dbase);
172     return 0;
173 }
174
175 afs_int32
176 SDISK_Abort(rxcall, atid)
177      register struct rx_call *rxcall;
178      struct ubik_tid *atid;
179 {
180     register afs_int32 code;
181     register struct ubik_dbase *dbase;
182
183     if ((code = ubik_CheckAuth(rxcall))) {
184         return code;
185     }
186
187     if (!ubik_currentTrans) {
188         return USYNC;
189     }
190     /* sanity check to make sure only write trans appear here  */
191     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
192         return UBADTYPE;
193     }
194
195     dbase = ubik_currentTrans->dbase;
196     DBHOLD(dbase);
197     urecovery_CheckTid(atid);
198     if (!ubik_currentTrans) {
199         DBRELE(dbase);
200         return USYNC;
201     }
202
203     code = udisk_abort(ubik_currentTrans);
204     /* If the thread is not waiting for lock - ok to end it */
205 #if !defined(UBIK_PAUSE)
206     if (ubik_currentTrans->locktype != LOCKWAIT) {
207 #endif /* UBIK_PAUSE */
208         udisk_end(ubik_currentTrans);
209 #if !defined(UBIK_PAUSE)
210     }
211 #endif /* UBIK_PAUSE */
212     ubik_currentTrans = (struct ubik_trans *)0;
213     DBRELE(dbase);
214     return code;
215 }
216
217 afs_int32
218 SDISK_Lock(rxcall, atid, afile, apos, alen, atype)
219      register struct rx_call *rxcall;
220      struct ubik_tid *atid;
221      afs_int32 afile, apos, alen, atype;        /* apos and alen are not used */
222 {
223     register afs_int32 code;
224     register struct ubik_dbase *dbase;
225     struct ubik_trans *ubik_thisTrans;
226
227     if ((code = ubik_CheckAuth(rxcall))) {
228         return code;
229     }
230     if (!ubik_currentTrans) {
231         return USYNC;
232     }
233     /* sanity check to make sure only write trans appear here */
234     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
235         return UBADTYPE;
236     }
237     if (alen != 1) {
238         return UBADLOCK;
239     }
240     dbase = ubik_currentTrans->dbase;
241     DBHOLD(dbase);
242     urecovery_CheckTid(atid);
243     if (!ubik_currentTrans) {
244         DBRELE(dbase);
245         return USYNC;
246     }
247
248     ubik_thisTrans = ubik_currentTrans;
249     code = ulock_getLock(ubik_currentTrans, atype, 1);
250
251     /* While waiting, the transaction may have been ended/
252      * aborted from under us (urecovery_CheckTid). In that
253      * case, end the transaction here.
254      */
255     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
256         udisk_end(ubik_thisTrans);
257         code = USYNC;
258     }
259
260     DBRELE(dbase);
261     return code;
262 }
263
264 /* Write a vector of data */
265 afs_int32
266 SDISK_WriteV(rxcall, atid, io_vector, io_buffer)
267      register struct rx_call *rxcall;
268      struct ubik_tid *atid;
269      iovec_wrt *io_vector;
270      iovec_buf *io_buffer;
271 {
272     afs_int32 code, i, offset;
273     struct ubik_dbase *dbase;
274     struct ubik_iovec *iovec;
275     char *iobuf;
276
277     if ((code = ubik_CheckAuth(rxcall))) {
278         return code;
279     }
280     if (!ubik_currentTrans) {
281         return USYNC;
282     }
283     /* sanity check to make sure only write trans appear here */
284     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
285         return UBADTYPE;
286     }
287
288     dbase = ubik_currentTrans->dbase;
289     DBHOLD(dbase);
290     urecovery_CheckTid(atid);
291     if (!ubik_currentTrans) {
292         DBRELE(dbase);
293         return USYNC;
294     }
295
296     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
297     iobuf = (char *)io_buffer->iovec_buf_val;
298     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
299         /* Sanity check for going off end of buffer */
300         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
301             code = UINTERNAL;
302         } else {
303             code =
304                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
305                             iovec[i].position, iovec[i].length);
306         }
307         if (code)
308             break;
309
310         offset += iovec[i].length;
311     }
312
313     DBRELE(dbase);
314     return code;
315 }
316
317 afs_int32
318 SDISK_Write(rxcall, atid, afile, apos, adata)
319      register struct rx_call *rxcall;
320      struct ubik_tid *atid;
321      afs_int32 afile, apos;
322      register bulkdata *adata;
323 {
324     register afs_int32 code;
325     register struct ubik_dbase *dbase;
326
327     if ((code = ubik_CheckAuth(rxcall))) {
328         return code;
329     }
330     if (!ubik_currentTrans) {
331         return USYNC;
332     }
333     /* sanity check to make sure only write trans appear here */
334     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
335         return UBADTYPE;
336     }
337
338     dbase = ubik_currentTrans->dbase;
339     DBHOLD(dbase);
340     urecovery_CheckTid(atid);
341     if (!ubik_currentTrans) {
342         DBRELE(dbase);
343         return USYNC;
344     }
345     code =
346         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
347                     adata->bulkdata_len);
348     DBRELE(dbase);
349     return code;
350 }
351
352 afs_int32
353 SDISK_Truncate(rxcall, atid, afile, alen)
354      register struct rx_call *rxcall;
355      struct ubik_tid *atid;
356      afs_int32 afile;
357      afs_int32 alen;
358 {
359     register afs_int32 code;
360     register struct ubik_dbase *dbase;
361
362     if ((code = ubik_CheckAuth(rxcall))) {
363         return code;
364     }
365     if (!ubik_currentTrans) {
366         return USYNC;
367     }
368     /* sanity check to make sure only write trans appear here */
369     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
370         return UBADTYPE;
371     }
372
373     dbase = ubik_currentTrans->dbase;
374     DBHOLD(dbase);
375     urecovery_CheckTid(atid);
376     if (!ubik_currentTrans) {
377         DBRELE(dbase);
378         return USYNC;
379     }
380     code = udisk_truncate(ubik_currentTrans, afile, alen);
381     DBRELE(dbase);
382     return code;
383 }
384
385 afs_int32
386 SDISK_GetVersion(rxcall, aversion)
387      register struct rx_call *rxcall;
388      register struct ubik_version *aversion;
389 {
390     register afs_int32 code;
391
392     if ((code = ubik_CheckAuth(rxcall))) {
393         return code;
394     }
395
396     /*
397      * If we are the sync site, recovery shouldn't be running on any
398      * other site. We shouldn't be getting this RPC as long as we are
399      * the sync site.  To prevent any unforseen activity, we should
400      * reject this RPC until we have recognized that we are not the
401      * sync site anymore, and/or if we have any pending WRITE
402      * transactions that have to complete. This way we can be assured
403      * that this RPC would not block any pending transactions that
404      * should either fail or pass. If we have recognized the fact that
405      * we are not the sync site any more, all write transactions would
406      * fail with UNOQUORUM anyway.
407      */
408     if (ubeacon_AmSyncSite()) {
409         return UDEADLOCK;
410     }
411
412     DBHOLD(ubik_dbase);
413     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
414     DBRELE(ubik_dbase);
415     if (code) {
416         /* tell other side there's no dbase */
417         aversion->epoch = 0;
418         aversion->counter = 0;
419     }
420     return 0;
421 }
422
423 afs_int32
424 SDISK_GetFile(rxcall, file, version)
425      register struct rx_call *rxcall;
426      register afs_int32 file;
427      struct ubik_version *version;
428 {
429     register afs_int32 code;
430     register struct ubik_dbase *dbase;
431     register afs_int32 offset;
432     struct ubik_stat ubikstat;
433     char tbuffer[256];
434     afs_int32 tlen;
435     afs_int32 length;
436
437     if ((code = ubik_CheckAuth(rxcall))) {
438         return code;
439     }
440 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
441  * a sanity check, anyway. 
442     if (ubeacon_AmSyncSite()) {
443       return UDEADLOCK;
444     }
445 */
446     dbase = ubik_dbase;
447     DBHOLD(dbase);
448     code = (*dbase->stat) (dbase, file, &ubikstat);
449     if (code < 0) {
450         DBRELE(dbase);
451         return code;
452     }
453     length = ubikstat.size;
454     tlen = htonl(length);
455     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
456     if (code != sizeof(afs_int32)) {
457         DBRELE(dbase);
458         ubik_dprint("Rx-write length error=%d\n", code);
459         return BULK_ERROR;
460     }
461     offset = 0;
462     while (length > 0) {
463         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
464         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
465         if (code != tlen) {
466             DBRELE(dbase);
467             ubik_dprint("read failed error=%d\n", code);
468             return UIOERROR;
469         }
470         code = rx_Write(rxcall, tbuffer, tlen);
471         if (code != tlen) {
472             DBRELE(dbase);
473             ubik_dprint("Rx-write length error=%d\n", code);
474             return BULK_ERROR;
475         }
476         length -= tlen;
477         offset += tlen;
478     }
479     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
480     DBRELE(dbase);
481     return code;
482 }
483
484 afs_int32
485 SDISK_SendFile(rxcall, file, length, avers)
486      register struct rx_call *rxcall;
487      afs_int32 file;
488      afs_int32 length;
489      struct ubik_version *avers;
490 {
491     register afs_int32 code;
492     register struct ubik_dbase *dbase;
493     char tbuffer[1024];
494     afs_int32 offset;
495     struct ubik_version tversion;
496     register int tlen;
497     struct rx_peer *tpeer;
498     struct rx_connection *tconn;
499     afs_uint32 otherHost;
500 #ifndef OLD_URECOVERY
501     char pbuffer[1028];
502     int flen, fd = -1;
503 #endif
504
505     /* send the file back to the requester */
506
507     if ((code = ubik_CheckAuth(rxcall))) {
508         goto failed;
509     }
510
511     /* next, we do a sanity check to see if the guy sending us the database is
512      * the guy we think is the sync site.  It turns out that we might not have
513      * decided yet that someone's the sync site, but they could have enough
514      * votes from others to be sync site anyway, and could send us the database
515      * in advance of getting our votes.  This is fine, what we're really trying
516      * to check is that some authenticated bogon isn't sending a random database
517      * into another configuration.  This could happen on a bad configuration
518      * screwup.  Thus, we only object if we're sure we know who the sync site
519      * is, and it ain't the guy talking to us.
520      */
521     offset = uvote_GetSyncSite();
522     tconn = rx_ConnectionOf(rxcall);
523     tpeer = rx_PeerOf(tconn);
524     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
525     if (offset && offset != otherHost) {
526         /* we *know* this is the wrong guy */
527         code = USYNC;
528         goto failed;
529     }
530
531     dbase = ubik_dbase;
532     DBHOLD(dbase);
533
534     /* abort any active trans that may scribble over the database */
535     urecovery_AbortAll(dbase);
536
537     ubik_print("Ubik: Synchronize database with server %s\n",
538                afs_inet_ntoa(otherHost));
539
540     offset = 0;
541 #ifdef OLD_URECOVERY
542     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
543     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
544     tversion.counter = 0;
545     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
546 #else
547     flen = length;
548     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
549     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
550     if (fd < 0) {
551         code = errno;
552         goto failed;
553     }
554     code = lseek(fd, HDRSIZE, 0);
555     if (code != HDRSIZE) {
556         close(fd);
557         goto failed;
558     }
559 #endif
560     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
561     while (length > 0) {
562         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
563         code = rx_Read(rxcall, tbuffer, tlen);
564         if (code != tlen) {
565             DBRELE(dbase);
566             ubik_dprint("Rx-read length error=%d\n", code);
567             code = BULK_ERROR;
568             close(fd);
569             goto failed;
570         }
571 #ifdef OLD_URECOVERY
572         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
573 #else
574         code = write(fd, tbuffer, tlen);
575 #endif
576         if (code != tlen) {
577             DBRELE(dbase);
578             ubik_dprint("write failed error=%d\n", code);
579             code = UIOERROR;
580             close(fd);
581             goto failed;
582         }
583         offset += tlen;
584         length -= tlen;
585     }
586 #ifndef OLD_URECOVERY
587     code = close(fd);
588     if (code)
589         goto failed;
590 #endif     
591
592     /* sync data first, then write label and resync (resync done by setlabel call).
593      * This way, good label is only on good database. */
594 #ifdef OLD_URECOVERY
595     (*ubik_dbase->sync) (dbase, file);
596 #else
597     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
598 #ifdef AFS_NT40_ENV
599     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
600     code = unlink(pbuffer);
601     if (!code)
602         code = rename(tbuffer, pbuffer);
603     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
604 #endif
605     if (!code) 
606         code = rename(pbuffer, tbuffer);
607     if (!code)
608 #endif
609     code = (*ubik_dbase->setlabel) (dbase, file, avers);
610 #ifndef OLD_URECOVERY
611 #ifdef AFS_NT40_ENV
612     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
613     unlink(pbuffer);
614 #endif
615 #endif
616     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
617     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
618     LWP_NoYieldSignal(&dbase->version);
619     DBRELE(dbase);
620   failed:
621     if (code) {
622 #ifndef OLD_URECOVERY
623         unlink(pbuffer);
624 #endif
625         ubik_print
626             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
627              afs_inet_ntoa(otherHost), code);
628     } else {
629         ubik_print("Ubik: Synchronize database completed\n");
630     }
631     return code;
632 }
633
634
635 afs_int32
636 SDISK_Probe(rxcall)
637      register struct rx_call *rxcall;
638 {
639     return 0;
640 }
641
642 /*
643 * Update remote machines addresses in my server list
644 * Send back my addresses to caller of this RPC
645 * Returns zero on success, else 1.
646 */
647 afs_int32
648 SDISK_UpdateInterfaceAddr(rxcall, inAddr, outAddr)
649      register struct rx_call *rxcall;
650      UbikInterfaceAddr *inAddr, *outAddr;
651 {
652     struct ubik_server *ts, *tmp;
653     afs_uint32 remoteAddr;      /* in net byte order */
654     int i, j, found = 0, probableMatch = 0;
655
656     /* copy the output parameters */
657     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
658         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
659
660     remoteAddr = htonl(inAddr->hostAddr[0]);
661     for (ts = ubik_servers; ts; ts = ts->next)
662         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
663             probableMatch = 1;
664             break;
665         }
666
667     if (probableMatch) {
668         /* verify that all addresses in the incoming RPC are
669          ** not part of other server entries in my CellServDB
670          */
671         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
672              && inAddr->hostAddr[i]; i++) {
673             remoteAddr = htonl(inAddr->hostAddr[i]);
674             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
675                 if (ts == tmp)  /* this is my server */
676                     continue;
677                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
678                      j++)
679                     if (remoteAddr == tmp->addr[j]) {
680                         found = 1;
681                         break;
682                     }
683             }
684         }
685     }
686
687     /* if (probableMatch) */
688     /* inconsistent addresses in CellServDB */
689     if (!probableMatch || found) {
690         ubik_print("Inconsistent Cell Info from server: ");
691         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
692             ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
693         ubik_print("\n");
694         fflush(stdout);
695         fflush(stderr);
696         printServerInfo();
697         return UBADHOST;
698     }
699
700     /* update our data structures */
701     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
702         ts->addr[i] = htonl(inAddr->hostAddr[i]);
703
704     ubik_print("ubik: A Remote Server has addresses: ");
705     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
706         ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
707     ubik_print("\n");
708
709     return 0;
710 }
711
712 void
713 printServerInfo()
714 {
715     struct ubik_server *ts;
716     int i, j = 1;
717
718     ubik_print("Local CellServDB:");
719     for (ts = ubik_servers; ts; ts = ts->next, j++) {
720         ubik_print("Server %d: ", j);
721         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
722             ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
723     }
724     ubik_print("\n");
725 }
726
727 afs_int32
728 SDISK_SetVersion(rxcall, atid, oldversionp, newversionp)
729      struct rx_call *rxcall;
730      struct ubik_tid *atid;
731      struct ubik_version *oldversionp;
732      struct ubik_version *newversionp;
733 {
734     afs_int32 code = 0;
735     struct ubik_dbase *dbase;
736
737     if ((code = ubik_CheckAuth(rxcall))) {
738         return (code);
739     }
740
741     if (!ubik_currentTrans) {
742         return USYNC;
743     }
744     /* sanity check to make sure only write trans appear here */
745     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
746         return UBADTYPE;
747     }
748
749     /* Should not get this for the sync site */
750     if (ubeacon_AmSyncSite()) {
751         return UDEADLOCK;
752     }
753
754     dbase = ubik_currentTrans->dbase;
755     DBHOLD(dbase);
756     urecovery_CheckTid(atid);
757     if (!ubik_currentTrans) {
758         DBRELE(dbase);
759         return USYNC;
760     }
761
762     /* Set the label if its version matches the sync-site's */
763     if ((oldversionp->epoch == ubik_dbVersion.epoch)
764         && (oldversionp->counter == ubik_dbVersion.counter)) {
765         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
766         if (!code) {
767             ubik_dbase->version = *newversionp;
768             ubik_dbVersion = *newversionp;
769         }
770     } else {
771         code = USYNC;
772     }
773
774     DBRELE(dbase);
775     return code;
776 }