a256a8a490b5f44a18ab58e5a576e6b1d94df664
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <sys/types.h>
16 #include <string.h>
17 #include <stdarg.h>
18
19 #ifdef AFS_NT40_ENV
20 #include <winsock2.h>
21 #include <fcntl.h>
22 #else
23 #include <sys/file.h>
24 #include <netinet/in.h>
25 #endif
26
27 #include <lock.h>
28 #include <rx/xdr.h>
29 #include <rx/rx.h>
30 #include <errno.h>
31 #include <afs/afsutil.h>
32
33 #define UBIK_INTERNALS
34 #include "ubik.h"
35 #include "ubik_int.h"
36
37 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
38 void *ubik_CheckRXSecurityRock;
39
40 static void printServerInfo(void);
41
42 /*! \file
43  * routines for handling requests remotely-submitted by the sync site.  These are
44  * only write transactions (we don't propagate read trans), and there is at most one
45  * write transaction extant at any one time.
46  */
47
48 struct ubik_trans *ubik_currentTrans = 0;
49
50 int
51 ubik_CheckAuth(struct rx_call *acall)
52 {
53     afs_int32 code;
54     if (ubik_CheckRXSecurityProc) {
55         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
56         return code;
57     } else
58         return 0;
59 }
60
61 /* the rest of these guys handle remote execution of write
62  * transactions: this is the code executed on the other servers when a
63  * sync site is executing a write transaction.
64  */
65 afs_int32
66 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
67 {
68     afs_int32 code;
69
70     if ((code = ubik_CheckAuth(rxcall))) {
71         return code;
72     }
73     DBHOLD(ubik_dbase);
74     urecovery_CheckTid(atid, 1);
75     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
76     if (!code && ubik_currentTrans) {
77         /* label this trans with the right trans id */
78         ubik_currentTrans->tid.epoch = atid->epoch;
79         ubik_currentTrans->tid.counter = atid->counter;
80     }
81     DBRELE(ubik_dbase);
82     return code;
83 }
84
85
86 afs_int32
87 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
88 {
89     afs_int32 code;
90     struct ubik_dbase *dbase;
91
92     if ((code = ubik_CheckAuth(rxcall))) {
93         return code;
94     }
95
96     if (!ubik_currentTrans) {
97         return USYNC;
98     }
99     /*
100      * sanity check to make sure only write trans appear here
101      */
102     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
103         return UBADTYPE;
104     }
105
106     dbase = ubik_currentTrans->dbase;
107
108     ObtainWriteLock(&dbase->cache_lock);
109
110     DBHOLD(dbase);
111
112     urecovery_CheckTid(atid, 0);
113     if (!ubik_currentTrans) {
114         DBRELE(dbase);
115         ReleaseWriteLock(&dbase->cache_lock);
116         return USYNC;
117     }
118
119     code = udisk_commit(ubik_currentTrans);
120     if (code == 0) {
121         /* sync site should now match */
122         ubik_dbVersion = ubik_dbase->version;
123     }
124     DBRELE(dbase);
125     ReleaseWriteLock(&dbase->cache_lock);
126     return code;
127 }
128
129 afs_int32
130 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
131 {
132     struct ubik_dbase *dbase;
133     afs_int32 code;
134
135     if ((code = ubik_CheckAuth(rxcall))) {
136         return code;
137     }
138
139     if (!ubik_currentTrans) {
140         return USYNC;
141     }
142     /* sanity check to make sure only write trans appear here */
143     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
144         return UBADTYPE;
145     }
146
147     dbase = ubik_currentTrans->dbase;
148     DBHOLD(dbase);
149     urecovery_CheckTid(atid, 0);
150     if (!ubik_currentTrans) {
151         DBRELE(dbase);
152         return USYNC;
153     }
154
155     /* If the thread is not waiting for lock - ok to end it */
156 #if !defined(UBIK_PAUSE)
157     if (ubik_currentTrans->locktype != LOCKWAIT) {
158 #endif /* UBIK_PAUSE */
159         udisk_end(ubik_currentTrans);
160 #if !defined(UBIK_PAUSE)
161     }
162 #endif /* UBIK_PAUSE */
163     ubik_currentTrans = (struct ubik_trans *)0;
164     DBRELE(dbase);
165     return 0;
166 }
167
168 afs_int32
169 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
170 {
171     afs_int32 code;
172     struct ubik_dbase *dbase;
173
174     if ((code = ubik_CheckAuth(rxcall))) {
175         return code;
176     }
177
178     if (!ubik_currentTrans) {
179         return USYNC;
180     }
181     /* sanity check to make sure only write trans appear here  */
182     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
183         return UBADTYPE;
184     }
185
186     dbase = ubik_currentTrans->dbase;
187     DBHOLD(dbase);
188     urecovery_CheckTid(atid, 0);
189     if (!ubik_currentTrans) {
190         DBRELE(dbase);
191         return USYNC;
192     }
193
194     code = udisk_abort(ubik_currentTrans);
195     /* If the thread is not waiting for lock - ok to end it */
196 #if !defined(UBIK_PAUSE)
197     if (ubik_currentTrans->locktype != LOCKWAIT) {
198 #endif /* UBIK_PAUSE */
199         udisk_end(ubik_currentTrans);
200 #if !defined(UBIK_PAUSE)
201     }
202 #endif /* UBIK_PAUSE */
203     ubik_currentTrans = (struct ubik_trans *)0;
204     DBRELE(dbase);
205     return code;
206 }
207
208 /* apos and alen are not used */
209 afs_int32
210 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
211            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
212 {
213     afs_int32 code;
214     struct ubik_dbase *dbase;
215     struct ubik_trans *ubik_thisTrans;
216
217     if ((code = ubik_CheckAuth(rxcall))) {
218         return code;
219     }
220     if (!ubik_currentTrans) {
221         return USYNC;
222     }
223     /* sanity check to make sure only write trans appear here */
224     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
225         return UBADTYPE;
226     }
227     if (alen != 1) {
228         return UBADLOCK;
229     }
230     dbase = ubik_currentTrans->dbase;
231     DBHOLD(dbase);
232     urecovery_CheckTid(atid, 0);
233     if (!ubik_currentTrans) {
234         DBRELE(dbase);
235         return USYNC;
236     }
237
238     ubik_thisTrans = ubik_currentTrans;
239     code = ulock_getLock(ubik_currentTrans, atype, 1);
240
241     /* While waiting, the transaction may have been ended/
242      * aborted from under us (urecovery_CheckTid). In that
243      * case, end the transaction here.
244      */
245     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
246         udisk_end(ubik_thisTrans);
247         code = USYNC;
248     }
249
250     DBRELE(dbase);
251     return code;
252 }
253
254 /*!
255  * \brief Write a vector of data
256  */
257 afs_int32
258 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
259              iovec_wrt *io_vector, iovec_buf *io_buffer)
260 {
261     afs_int32 code, i, offset;
262     struct ubik_dbase *dbase;
263     struct ubik_iovec *iovec;
264     char *iobuf;
265
266     if ((code = ubik_CheckAuth(rxcall))) {
267         return code;
268     }
269     if (!ubik_currentTrans) {
270         return USYNC;
271     }
272     /* sanity check to make sure only write trans appear here */
273     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
274         return UBADTYPE;
275     }
276
277     dbase = ubik_currentTrans->dbase;
278     DBHOLD(dbase);
279     urecovery_CheckTid(atid, 0);
280     if (!ubik_currentTrans) {
281         DBRELE(dbase);
282         return USYNC;
283     }
284
285     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
286     iobuf = (char *)io_buffer->iovec_buf_val;
287     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
288         /* Sanity check for going off end of buffer */
289         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
290             code = UINTERNAL;
291         } else {
292             code =
293                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
294                             iovec[i].position, iovec[i].length);
295         }
296         if (code)
297             break;
298
299         offset += iovec[i].length;
300     }
301
302     DBRELE(dbase);
303     return code;
304 }
305
306 afs_int32
307 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
308             afs_int32 afile, afs_int32 apos, bulkdata *adata)
309 {
310     afs_int32 code;
311     struct ubik_dbase *dbase;
312
313     if ((code = ubik_CheckAuth(rxcall))) {
314         return code;
315     }
316     if (!ubik_currentTrans) {
317         return USYNC;
318     }
319     /* sanity check to make sure only write trans appear here */
320     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
321         return UBADTYPE;
322     }
323
324     dbase = ubik_currentTrans->dbase;
325     DBHOLD(dbase);
326     urecovery_CheckTid(atid, 0);
327     if (!ubik_currentTrans) {
328         DBRELE(dbase);
329         return USYNC;
330     }
331     code =
332         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
333                     adata->bulkdata_len);
334     DBRELE(dbase);
335     return code;
336 }
337
338 afs_int32
339 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
340                afs_int32 afile, afs_int32 alen)
341 {
342     afs_int32 code;
343     struct ubik_dbase *dbase;
344
345     if ((code = ubik_CheckAuth(rxcall))) {
346         return code;
347     }
348     if (!ubik_currentTrans) {
349         return USYNC;
350     }
351     /* sanity check to make sure only write trans appear here */
352     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
353         return UBADTYPE;
354     }
355
356     dbase = ubik_currentTrans->dbase;
357     DBHOLD(dbase);
358     urecovery_CheckTid(atid, 0);
359     if (!ubik_currentTrans) {
360         DBRELE(dbase);
361         return USYNC;
362     }
363     code = udisk_truncate(ubik_currentTrans, afile, alen);
364     DBRELE(dbase);
365     return code;
366 }
367
368 afs_int32
369 SDISK_GetVersion(struct rx_call *rxcall,
370                  struct ubik_version *aversion)
371 {
372     afs_int32 code;
373
374     if ((code = ubik_CheckAuth(rxcall))) {
375         return code;
376     }
377
378     /*
379      * If we are the sync site, recovery shouldn't be running on any
380      * other site. We shouldn't be getting this RPC as long as we are
381      * the sync site.  To prevent any unforseen activity, we should
382      * reject this RPC until we have recognized that we are not the
383      * sync site anymore, and/or if we have any pending WRITE
384      * transactions that have to complete. This way we can be assured
385      * that this RPC would not block any pending transactions that
386      * should either fail or pass. If we have recognized the fact that
387      * we are not the sync site any more, all write transactions would
388      * fail with UNOQUORUM anyway.
389      */
390     if (ubeacon_AmSyncSite()) {
391         return UDEADLOCK;
392     }
393
394     DBHOLD(ubik_dbase);
395     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
396     DBRELE(ubik_dbase);
397     if (code) {
398         /* tell other side there's no dbase */
399         aversion->epoch = 0;
400         aversion->counter = 0;
401     }
402     return 0;
403 }
404
405 afs_int32
406 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
407               struct ubik_version *version)
408 {
409     afs_int32 code;
410     struct ubik_dbase *dbase;
411     afs_int32 offset;
412     struct ubik_stat ubikstat;
413     char tbuffer[256];
414     afs_int32 tlen;
415     afs_int32 length;
416
417     if ((code = ubik_CheckAuth(rxcall))) {
418         return code;
419     }
420 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
421  * a sanity check, anyway.
422     if (ubeacon_AmSyncSite()) {
423       return UDEADLOCK;
424     }
425 */
426     dbase = ubik_dbase;
427     DBHOLD(dbase);
428     code = (*dbase->stat) (dbase, file, &ubikstat);
429     if (code < 0) {
430         DBRELE(dbase);
431         return code;
432     }
433     length = ubikstat.size;
434     tlen = htonl(length);
435     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
436     if (code != sizeof(afs_int32)) {
437         DBRELE(dbase);
438         ubik_dprint("Rx-write length error=%d\n", code);
439         return BULK_ERROR;
440     }
441     offset = 0;
442     while (length > 0) {
443         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
444         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
445         if (code != tlen) {
446             DBRELE(dbase);
447             ubik_dprint("read failed error=%d\n", code);
448             return UIOERROR;
449         }
450         code = rx_Write(rxcall, tbuffer, tlen);
451         if (code != tlen) {
452             DBRELE(dbase);
453             ubik_dprint("Rx-write length error=%d\n", code);
454             return BULK_ERROR;
455         }
456         length -= tlen;
457         offset += tlen;
458     }
459     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
460     DBRELE(dbase);
461     return code;
462 }
463
464 afs_int32
465 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
466                afs_int32 length, struct ubik_version *avers)
467 {
468     afs_int32 code;
469     struct ubik_dbase *dbase = NULL;
470     char tbuffer[1024];
471     afs_int32 offset;
472     struct ubik_version tversion;
473     int tlen;
474     struct rx_peer *tpeer;
475     struct rx_connection *tconn;
476     afs_uint32 otherHost = 0;
477     char hoststr[16];
478     char pbuffer[1028];
479     int fd = -1;
480     afs_int32 epoch = 0;
481     afs_int32 pass;
482
483     /* send the file back to the requester */
484
485     if ((code = ubik_CheckAuth(rxcall))) {
486         goto failed;
487     }
488
489     /* next, we do a sanity check to see if the guy sending us the database is
490      * the guy we think is the sync site.  It turns out that we might not have
491      * decided yet that someone's the sync site, but they could have enough
492      * votes from others to be sync site anyway, and could send us the database
493      * in advance of getting our votes.  This is fine, what we're really trying
494      * to check is that some authenticated bogon isn't sending a random database
495      * into another configuration.  This could happen on a bad configuration
496      * screwup.  Thus, we only object if we're sure we know who the sync site
497      * is, and it ain't the guy talking to us.
498      */
499     offset = uvote_GetSyncSite();
500     tconn = rx_ConnectionOf(rxcall);
501     tpeer = rx_PeerOf(tconn);
502     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
503     if (offset && offset != otherHost) {
504         /* we *know* this is the wrong guy */
505         code = USYNC;
506         goto failed;
507     }
508
509     dbase = ubik_dbase;
510     DBHOLD(dbase);
511
512     /* abort any active trans that may scribble over the database */
513     urecovery_AbortAll(dbase);
514
515     ubik_print("Ubik: Synchronize database with server %s\n",
516                afs_inet_ntoa_r(otherHost, hoststr));
517
518     offset = 0;
519     epoch = tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
520     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
521     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
522     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
523     if (fd < 0) {
524         code = errno;
525         goto failed;
526     }
527     code = lseek(fd, HDRSIZE, 0);
528     if (code != HDRSIZE) {
529         close(fd);
530         goto failed;
531     }
532     pass = 0;
533     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
534     while (length > 0) {
535         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
536 #if !defined(AFS_PTHREAD_ENV)
537         if (pass % 4 == 0)
538             IOMGR_Poll();
539 #endif
540         code = rx_Read(rxcall, tbuffer, tlen);
541         if (code != tlen) {
542             DBRELE(dbase);
543             ubik_dprint("Rx-read length error=%d\n", code);
544             code = BULK_ERROR;
545             close(fd);
546             goto failed;
547         }
548         code = write(fd, tbuffer, tlen);
549         pass++;
550         if (code != tlen) {
551             DBRELE(dbase);
552             ubik_dprint("write failed error=%d\n", code);
553             code = UIOERROR;
554             close(fd);
555             goto failed;
556         }
557         offset += tlen;
558         length -= tlen;
559     }
560     code = close(fd);
561     if (code)
562         goto failed;
563
564     /* sync data first, then write label and resync (resync done by setlabel call).
565      * This way, good label is only on good database. */
566     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
567 #ifdef AFS_NT40_ENV
568     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
569     code = unlink(pbuffer);
570     if (!code)
571         code = rename(tbuffer, pbuffer);
572     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
573 #endif
574     if (!code)
575         code = rename(pbuffer, tbuffer);
576     if (!code) {
577         (*ubik_dbase->open) (ubik_dbase, file);
578         code = (*ubik_dbase->setlabel) (dbase, file, avers);
579     }
580 #ifdef AFS_NT40_ENV
581     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
582     unlink(pbuffer);
583 #endif
584     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
585     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
586 #ifdef AFS_PTHREAD_ENV
587     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
588 #else
589     LWP_NoYieldSignal(&dbase->version);
590 #endif
591     DBRELE(dbase);
592   failed:
593     if (code) {
594         unlink(pbuffer);
595         /* Failed to sync. Allow reads again for now. */
596         if (dbase != NULL) {
597             tversion.epoch = epoch;
598             (*dbase->setlabel) (dbase, file, &tversion);
599         }
600         ubik_print
601             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
602              afs_inet_ntoa_r(otherHost, hoststr), code);
603     } else {
604         ubik_print("Ubik: Synchronize database completed\n");
605     }
606     return code;
607 }
608
609
610 afs_int32
611 SDISK_Probe(struct rx_call *rxcall)
612 {
613     return 0;
614 }
615
616 /*!
617  * \brief Update remote machines addresses in my server list
618  *
619  * Send back my addresses to caller of this RPC
620  * \return zero on success, else 1.
621  */
622 afs_int32
623 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
624                           UbikInterfaceAddr *inAddr,
625                           UbikInterfaceAddr *outAddr)
626 {
627     struct ubik_server *ts, *tmp;
628     afs_uint32 remoteAddr;      /* in net byte order */
629     int i, j, found = 0, probableMatch = 0;
630     char hoststr[16];
631
632     /* copy the output parameters */
633     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
634         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
635
636     remoteAddr = htonl(inAddr->hostAddr[0]);
637     for (ts = ubik_servers; ts; ts = ts->next)
638         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
639             probableMatch = 1;
640             break;
641         }
642
643     if (probableMatch) {
644         /* verify that all addresses in the incoming RPC are
645          ** not part of other server entries in my CellServDB
646          */
647         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
648              && inAddr->hostAddr[i]; i++) {
649             remoteAddr = htonl(inAddr->hostAddr[i]);
650             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
651                 if (ts == tmp)  /* this is my server */
652                     continue;
653                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
654                      j++)
655                     if (remoteAddr == tmp->addr[j]) {
656                         found = 1;
657                         break;
658                     }
659             }
660         }
661     }
662
663     /* if (probableMatch) */
664     /* inconsistent addresses in CellServDB */
665     if (!probableMatch || found) {
666         ubik_print("Inconsistent Cell Info from server: ");
667         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
668             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
669         ubik_print("\n");
670         fflush(stdout);
671         fflush(stderr);
672         printServerInfo();
673         return UBADHOST;
674     }
675
676     /* update our data structures */
677     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
678         ts->addr[i] = htonl(inAddr->hostAddr[i]);
679
680     ubik_print("ubik: A Remote Server has addresses: ");
681     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
682         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
683     ubik_print("\n");
684
685     return 0;
686 }
687
688 static void
689 printServerInfo(void)
690 {
691     struct ubik_server *ts;
692     int i, j = 1;
693     char hoststr[16];
694
695     ubik_print("Local CellServDB:");
696     for (ts = ubik_servers; ts; ts = ts->next, j++) {
697         ubik_print("Server %d: ", j);
698         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
699             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
700     }
701     ubik_print("\n");
702 }
703
704 afs_int32
705 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
706                  struct ubik_version *oldversionp,
707                  struct ubik_version *newversionp)
708 {
709     afs_int32 code = 0;
710     struct ubik_dbase *dbase;
711
712     if ((code = ubik_CheckAuth(rxcall))) {
713         return (code);
714     }
715
716     if (!ubik_currentTrans) {
717         return USYNC;
718     }
719     /* sanity check to make sure only write trans appear here */
720     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
721         return UBADTYPE;
722     }
723
724     /* Should not get this for the sync site */
725     if (ubeacon_AmSyncSite()) {
726         return UDEADLOCK;
727     }
728
729     dbase = ubik_currentTrans->dbase;
730     DBHOLD(dbase);
731     urecovery_CheckTid(atid, 0);
732     if (!ubik_currentTrans) {
733         DBRELE(dbase);
734         return USYNC;
735     }
736
737     /* Set the label if its version matches the sync-site's */
738     if ((oldversionp->epoch == ubik_dbVersion.epoch)
739         && (oldversionp->counter == ubik_dbVersion.counter)) {
740         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
741         if (!code) {
742             ubik_dbase->version = *newversionp;
743             ubik_dbVersion = *newversionp;
744         }
745     } else {
746         code = USYNC;
747     }
748
749     DBRELE(dbase);
750     return code;
751 }