66c1ae7322ff8f9fecef6fd133bf7f0c722c0d43
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <sys/types.h>
16 #include <string.h>
17 #include <stdarg.h>
18
19 #ifdef AFS_NT40_ENV
20 #include <winsock2.h>
21 #include <fcntl.h>
22 #else
23 #include <sys/file.h>
24 #include <netinet/in.h>
25 #endif
26
27 #include <lock.h>
28 #include <rx/xdr.h>
29 #include <rx/rx.h>
30 #include <errno.h>
31 #include <afs/afsutil.h>
32
33 #define UBIK_INTERNALS
34 #include "ubik.h"
35 #include "ubik_int.h"
36
37 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
38 void *ubik_CheckRXSecurityRock;
39
40 static void printServerInfo(void);
41
42 /*! \file
43  * routines for handling requests remotely-submitted by the sync site.  These are
44  * only write transactions (we don't propagate read trans), and there is at most one
45  * write transaction extant at any one time.
46  */
47
48 struct ubik_trans *ubik_currentTrans = 0;
49
50 int
51 ubik_CheckAuth(struct rx_call *acall)
52 {
53     afs_int32 code;
54     if (ubik_CheckRXSecurityProc) {
55         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
56         return code;
57     } else
58         return 0;
59 }
60
61 /* the rest of these guys handle remote execution of write
62  * transactions: this is the code executed on the other servers when a
63  * sync site is executing a write transaction.
64  */
65 afs_int32
66 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
67 {
68     afs_int32 code;
69
70     if ((code = ubik_CheckAuth(rxcall))) {
71         return code;
72     }
73     DBHOLD(ubik_dbase);
74     urecovery_CheckTid(atid, 1);
75     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
76     if (!code && ubik_currentTrans) {
77         /* label this trans with the right trans id */
78         ubik_currentTrans->tid.epoch = atid->epoch;
79         ubik_currentTrans->tid.counter = atid->counter;
80     }
81     DBRELE(ubik_dbase);
82     return code;
83 }
84
85
86 afs_int32
87 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
88 {
89     afs_int32 code;
90     struct ubik_dbase *dbase;
91
92     if ((code = ubik_CheckAuth(rxcall))) {
93         return code;
94     }
95
96     if (!ubik_currentTrans) {
97         return USYNC;
98     }
99     /*
100      * sanity check to make sure only write trans appear here
101      */
102     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
103         return UBADTYPE;
104     }
105
106     dbase = ubik_currentTrans->dbase;
107
108     ObtainWriteLock(&dbase->cache_lock);
109
110     DBHOLD(dbase);
111
112     urecovery_CheckTid(atid, 0);
113     if (!ubik_currentTrans) {
114         DBRELE(dbase);
115         ReleaseWriteLock(&dbase->cache_lock);
116         return USYNC;
117     }
118
119     code = udisk_commit(ubik_currentTrans);
120     if (code == 0) {
121         /* sync site should now match */
122         ubik_dbVersion = ubik_dbase->version;
123     }
124     DBRELE(dbase);
125     ReleaseWriteLock(&dbase->cache_lock);
126     return code;
127 }
128
129 afs_int32
130 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
131 {
132     struct ubik_dbase *dbase;
133     afs_int32 code;
134
135     if ((code = ubik_CheckAuth(rxcall))) {
136         return code;
137     }
138
139     if (!ubik_currentTrans) {
140         return USYNC;
141     }
142     /* sanity check to make sure only write trans appear here */
143     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
144         return UBADTYPE;
145     }
146
147     dbase = ubik_currentTrans->dbase;
148     DBHOLD(dbase);
149     urecovery_CheckTid(atid, 0);
150     if (!ubik_currentTrans) {
151         DBRELE(dbase);
152         return USYNC;
153     }
154
155     /* If the thread is not waiting for lock - ok to end it */
156     if (ubik_currentTrans->locktype != LOCKWAIT) {
157         udisk_end(ubik_currentTrans);
158     }
159     ubik_currentTrans = (struct ubik_trans *)0;
160     DBRELE(dbase);
161     return 0;
162 }
163
164 afs_int32
165 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
166 {
167     afs_int32 code;
168     struct ubik_dbase *dbase;
169
170     if ((code = ubik_CheckAuth(rxcall))) {
171         return code;
172     }
173
174     if (!ubik_currentTrans) {
175         return USYNC;
176     }
177     /* sanity check to make sure only write trans appear here  */
178     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
179         return UBADTYPE;
180     }
181
182     dbase = ubik_currentTrans->dbase;
183     DBHOLD(dbase);
184     urecovery_CheckTid(atid, 0);
185     if (!ubik_currentTrans) {
186         DBRELE(dbase);
187         return USYNC;
188     }
189
190     code = udisk_abort(ubik_currentTrans);
191     /* If the thread is not waiting for lock - ok to end it */
192     if (ubik_currentTrans->locktype != LOCKWAIT) {
193         udisk_end(ubik_currentTrans);
194     }
195     ubik_currentTrans = (struct ubik_trans *)0;
196     DBRELE(dbase);
197     return code;
198 }
199
200 /* apos and alen are not used */
201 afs_int32
202 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
203            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
204 {
205     afs_int32 code;
206     struct ubik_dbase *dbase;
207     struct ubik_trans *ubik_thisTrans;
208
209     if ((code = ubik_CheckAuth(rxcall))) {
210         return code;
211     }
212     if (!ubik_currentTrans) {
213         return USYNC;
214     }
215     /* sanity check to make sure only write trans appear here */
216     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
217         return UBADTYPE;
218     }
219     if (alen != 1) {
220         return UBADLOCK;
221     }
222     dbase = ubik_currentTrans->dbase;
223     DBHOLD(dbase);
224     urecovery_CheckTid(atid, 0);
225     if (!ubik_currentTrans) {
226         DBRELE(dbase);
227         return USYNC;
228     }
229
230     ubik_thisTrans = ubik_currentTrans;
231     code = ulock_getLock(ubik_currentTrans, atype, 1);
232
233     /* While waiting, the transaction may have been ended/
234      * aborted from under us (urecovery_CheckTid). In that
235      * case, end the transaction here.
236      */
237     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
238         udisk_end(ubik_thisTrans);
239         code = USYNC;
240     }
241
242     DBRELE(dbase);
243     return code;
244 }
245
246 /*!
247  * \brief Write a vector of data
248  */
249 afs_int32
250 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
251              iovec_wrt *io_vector, iovec_buf *io_buffer)
252 {
253     afs_int32 code, i, offset;
254     struct ubik_dbase *dbase;
255     struct ubik_iovec *iovec;
256     char *iobuf;
257
258     if ((code = ubik_CheckAuth(rxcall))) {
259         return code;
260     }
261     if (!ubik_currentTrans) {
262         return USYNC;
263     }
264     /* sanity check to make sure only write trans appear here */
265     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
266         return UBADTYPE;
267     }
268
269     dbase = ubik_currentTrans->dbase;
270     DBHOLD(dbase);
271     urecovery_CheckTid(atid, 0);
272     if (!ubik_currentTrans) {
273         DBRELE(dbase);
274         return USYNC;
275     }
276
277     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
278     iobuf = (char *)io_buffer->iovec_buf_val;
279     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
280         /* Sanity check for going off end of buffer */
281         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
282             code = UINTERNAL;
283         } else {
284             code =
285                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
286                             iovec[i].position, iovec[i].length);
287         }
288         if (code)
289             break;
290
291         offset += iovec[i].length;
292     }
293
294     DBRELE(dbase);
295     return code;
296 }
297
298 afs_int32
299 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
300             afs_int32 afile, afs_int32 apos, bulkdata *adata)
301 {
302     afs_int32 code;
303     struct ubik_dbase *dbase;
304
305     if ((code = ubik_CheckAuth(rxcall))) {
306         return code;
307     }
308     if (!ubik_currentTrans) {
309         return USYNC;
310     }
311     /* sanity check to make sure only write trans appear here */
312     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
313         return UBADTYPE;
314     }
315
316     dbase = ubik_currentTrans->dbase;
317     DBHOLD(dbase);
318     urecovery_CheckTid(atid, 0);
319     if (!ubik_currentTrans) {
320         DBRELE(dbase);
321         return USYNC;
322     }
323     code =
324         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
325                     adata->bulkdata_len);
326     DBRELE(dbase);
327     return code;
328 }
329
330 afs_int32
331 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
332                afs_int32 afile, afs_int32 alen)
333 {
334     afs_int32 code;
335     struct ubik_dbase *dbase;
336
337     if ((code = ubik_CheckAuth(rxcall))) {
338         return code;
339     }
340     if (!ubik_currentTrans) {
341         return USYNC;
342     }
343     /* sanity check to make sure only write trans appear here */
344     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
345         return UBADTYPE;
346     }
347
348     dbase = ubik_currentTrans->dbase;
349     DBHOLD(dbase);
350     urecovery_CheckTid(atid, 0);
351     if (!ubik_currentTrans) {
352         DBRELE(dbase);
353         return USYNC;
354     }
355     code = udisk_truncate(ubik_currentTrans, afile, alen);
356     DBRELE(dbase);
357     return code;
358 }
359
360 afs_int32
361 SDISK_GetVersion(struct rx_call *rxcall,
362                  struct ubik_version *aversion)
363 {
364     afs_int32 code;
365
366     if ((code = ubik_CheckAuth(rxcall))) {
367         return code;
368     }
369
370     /*
371      * If we are the sync site, recovery shouldn't be running on any
372      * other site. We shouldn't be getting this RPC as long as we are
373      * the sync site.  To prevent any unforseen activity, we should
374      * reject this RPC until we have recognized that we are not the
375      * sync site anymore, and/or if we have any pending WRITE
376      * transactions that have to complete. This way we can be assured
377      * that this RPC would not block any pending transactions that
378      * should either fail or pass. If we have recognized the fact that
379      * we are not the sync site any more, all write transactions would
380      * fail with UNOQUORUM anyway.
381      */
382     if (ubeacon_AmSyncSite()) {
383         return UDEADLOCK;
384     }
385
386     DBHOLD(ubik_dbase);
387     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
388     DBRELE(ubik_dbase);
389     if (code) {
390         /* tell other side there's no dbase */
391         aversion->epoch = 0;
392         aversion->counter = 0;
393     }
394     return 0;
395 }
396
397 afs_int32
398 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
399               struct ubik_version *version)
400 {
401     afs_int32 code;
402     struct ubik_dbase *dbase;
403     afs_int32 offset;
404     struct ubik_stat ubikstat;
405     char tbuffer[256];
406     afs_int32 tlen;
407     afs_int32 length;
408
409     if ((code = ubik_CheckAuth(rxcall))) {
410         return code;
411     }
412 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
413  * a sanity check, anyway.
414     if (ubeacon_AmSyncSite()) {
415       return UDEADLOCK;
416     }
417 */
418     dbase = ubik_dbase;
419     DBHOLD(dbase);
420     code = (*dbase->stat) (dbase, file, &ubikstat);
421     if (code < 0) {
422         DBRELE(dbase);
423         return code;
424     }
425     length = ubikstat.size;
426     tlen = htonl(length);
427     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
428     if (code != sizeof(afs_int32)) {
429         DBRELE(dbase);
430         ubik_dprint("Rx-write length error=%d\n", code);
431         return BULK_ERROR;
432     }
433     offset = 0;
434     while (length > 0) {
435         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
436         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
437         if (code != tlen) {
438             DBRELE(dbase);
439             ubik_dprint("read failed error=%d\n", code);
440             return UIOERROR;
441         }
442         code = rx_Write(rxcall, tbuffer, tlen);
443         if (code != tlen) {
444             DBRELE(dbase);
445             ubik_dprint("Rx-write length error=%d\n", code);
446             return BULK_ERROR;
447         }
448         length -= tlen;
449         offset += tlen;
450     }
451     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
452     DBRELE(dbase);
453     return code;
454 }
455
456 afs_int32
457 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
458                afs_int32 length, struct ubik_version *avers)
459 {
460     afs_int32 code;
461     struct ubik_dbase *dbase = NULL;
462     char tbuffer[1024];
463     afs_int32 offset;
464     struct ubik_version tversion;
465     int tlen;
466     struct rx_peer *tpeer;
467     struct rx_connection *tconn;
468     afs_uint32 otherHost = 0;
469     char hoststr[16];
470     char pbuffer[1028];
471     int fd = -1;
472     afs_int32 epoch = 0;
473     afs_int32 pass;
474
475     /* send the file back to the requester */
476
477     if ((code = ubik_CheckAuth(rxcall))) {
478         goto failed;
479     }
480
481     /* next, we do a sanity check to see if the guy sending us the database is
482      * the guy we think is the sync site.  It turns out that we might not have
483      * decided yet that someone's the sync site, but they could have enough
484      * votes from others to be sync site anyway, and could send us the database
485      * in advance of getting our votes.  This is fine, what we're really trying
486      * to check is that some authenticated bogon isn't sending a random database
487      * into another configuration.  This could happen on a bad configuration
488      * screwup.  Thus, we only object if we're sure we know who the sync site
489      * is, and it ain't the guy talking to us.
490      */
491     offset = uvote_GetSyncSite();
492     tconn = rx_ConnectionOf(rxcall);
493     tpeer = rx_PeerOf(tconn);
494     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
495     if (offset && offset != otherHost) {
496         /* we *know* this is the wrong guy */
497         code = USYNC;
498         goto failed;
499     }
500
501     dbase = ubik_dbase;
502     DBHOLD(dbase);
503
504     /* abort any active trans that may scribble over the database */
505     urecovery_AbortAll(dbase);
506
507     ubik_print("Ubik: Synchronize database with server %s\n",
508                afs_inet_ntoa_r(otherHost, hoststr));
509
510     offset = 0;
511     epoch = tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
512     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
513     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
514     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
515     if (fd < 0) {
516         code = errno;
517         goto failed;
518     }
519     code = lseek(fd, HDRSIZE, 0);
520     if (code != HDRSIZE) {
521         close(fd);
522         goto failed;
523     }
524     pass = 0;
525     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
526     while (length > 0) {
527         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
528 #if !defined(AFS_PTHREAD_ENV)
529         if (pass % 4 == 0)
530             IOMGR_Poll();
531 #endif
532         code = rx_Read(rxcall, tbuffer, tlen);
533         if (code != tlen) {
534             DBRELE(dbase);
535             ubik_dprint("Rx-read length error=%d\n", code);
536             code = BULK_ERROR;
537             close(fd);
538             goto failed;
539         }
540         code = write(fd, tbuffer, tlen);
541         pass++;
542         if (code != tlen) {
543             DBRELE(dbase);
544             ubik_dprint("write failed error=%d\n", code);
545             code = UIOERROR;
546             close(fd);
547             goto failed;
548         }
549         offset += tlen;
550         length -= tlen;
551     }
552     code = close(fd);
553     if (code)
554         goto failed;
555
556     /* sync data first, then write label and resync (resync done by setlabel call).
557      * This way, good label is only on good database. */
558     afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
559 #ifdef AFS_NT40_ENV
560     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
561     code = unlink(pbuffer);
562     if (!code)
563         code = rename(tbuffer, pbuffer);
564     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
565 #endif
566     if (!code)
567         code = rename(pbuffer, tbuffer);
568     if (!code) {
569         (*ubik_dbase->open) (ubik_dbase, file);
570         code = (*ubik_dbase->setlabel) (dbase, file, avers);
571     }
572 #ifdef AFS_NT40_ENV
573     afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
574     unlink(pbuffer);
575 #endif
576     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
577     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
578 #ifdef AFS_PTHREAD_ENV
579     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
580 #else
581     LWP_NoYieldSignal(&dbase->version);
582 #endif
583     DBRELE(dbase);
584   failed:
585     if (code) {
586         unlink(pbuffer);
587         /* Failed to sync. Allow reads again for now. */
588         if (dbase != NULL) {
589             tversion.epoch = epoch;
590             (*dbase->setlabel) (dbase, file, &tversion);
591         }
592         ubik_print
593             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
594              afs_inet_ntoa_r(otherHost, hoststr), code);
595     } else {
596         ubik_print("Ubik: Synchronize database completed\n");
597     }
598     return code;
599 }
600
601
602 afs_int32
603 SDISK_Probe(struct rx_call *rxcall)
604 {
605     return 0;
606 }
607
608 /*!
609  * \brief Update remote machines addresses in my server list
610  *
611  * Send back my addresses to caller of this RPC
612  * \return zero on success, else 1.
613  */
614 afs_int32
615 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
616                           UbikInterfaceAddr *inAddr,
617                           UbikInterfaceAddr *outAddr)
618 {
619     struct ubik_server *ts, *tmp;
620     afs_uint32 remoteAddr;      /* in net byte order */
621     int i, j, found = 0, probableMatch = 0;
622     char hoststr[16];
623
624     /* copy the output parameters */
625     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
626         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
627
628     remoteAddr = htonl(inAddr->hostAddr[0]);
629     for (ts = ubik_servers; ts; ts = ts->next)
630         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
631             probableMatch = 1;
632             break;
633         }
634
635     if (probableMatch) {
636         /* verify that all addresses in the incoming RPC are
637          ** not part of other server entries in my CellServDB
638          */
639         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
640              && inAddr->hostAddr[i]; i++) {
641             remoteAddr = htonl(inAddr->hostAddr[i]);
642             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
643                 if (ts == tmp)  /* this is my server */
644                     continue;
645                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
646                      j++)
647                     if (remoteAddr == tmp->addr[j]) {
648                         found = 1;
649                         break;
650                     }
651             }
652         }
653     }
654
655     /* if (probableMatch) */
656     /* inconsistent addresses in CellServDB */
657     if (!probableMatch || found) {
658         ubik_print("Inconsistent Cell Info from server: ");
659         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
660             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
661         ubik_print("\n");
662         fflush(stdout);
663         fflush(stderr);
664         printServerInfo();
665         return UBADHOST;
666     }
667
668     /* update our data structures */
669     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
670         ts->addr[i] = htonl(inAddr->hostAddr[i]);
671
672     ubik_print("ubik: A Remote Server has addresses: ");
673     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
674         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
675     ubik_print("\n");
676
677     return 0;
678 }
679
680 static void
681 printServerInfo(void)
682 {
683     struct ubik_server *ts;
684     int i, j = 1;
685     char hoststr[16];
686
687     ubik_print("Local CellServDB:");
688     for (ts = ubik_servers; ts; ts = ts->next, j++) {
689         ubik_print("Server %d: ", j);
690         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
691             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
692     }
693     ubik_print("\n");
694 }
695
696 afs_int32
697 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
698                  struct ubik_version *oldversionp,
699                  struct ubik_version *newversionp)
700 {
701     afs_int32 code = 0;
702     struct ubik_dbase *dbase;
703
704     if ((code = ubik_CheckAuth(rxcall))) {
705         return (code);
706     }
707
708     if (!ubik_currentTrans) {
709         return USYNC;
710     }
711     /* sanity check to make sure only write trans appear here */
712     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
713         return UBADTYPE;
714     }
715
716     /* Should not get this for the sync site */
717     if (ubeacon_AmSyncSite()) {
718         return UDEADLOCK;
719     }
720
721     dbase = ubik_currentTrans->dbase;
722     DBHOLD(dbase);
723     urecovery_CheckTid(atid, 0);
724     if (!ubik_currentTrans) {
725         DBRELE(dbase);
726         return USYNC;
727     }
728
729     /* Set the label if its version matches the sync-site's */
730     if ((oldversionp->epoch == ubik_dbVersion.epoch)
731         && (oldversionp->counter == ubik_dbVersion.counter)) {
732         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
733         if (!code) {
734             ubik_dbase->version = *newversionp;
735             ubik_dbVersion = *newversionp;
736         }
737     } else {
738         code = USYNC;
739     }
740
741     DBRELE(dbase);
742     return code;
743 }