afs_snprintf is dead, long live rk_snprintf
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <sys/types.h>
16 #include <string.h>
17 #include <stdarg.h>
18
19 #ifdef AFS_NT40_ENV
20 #include <winsock2.h>
21 #include <fcntl.h>
22 #else
23 #include <sys/file.h>
24 #include <netinet/in.h>
25 #endif
26
27 #include <lock.h>
28 #include <rx/xdr.h>
29 #include <rx/rx.h>
30 #include <errno.h>
31 #include <afs/afsutil.h>
32
33 #define UBIK_INTERNALS
34 #include "ubik.h"
35 #include "ubik_int.h"
36
37 static void printServerInfo(void);
38
39 /*! \file
40  * routines for handling requests remotely-submitted by the sync site.  These are
41  * only write transactions (we don't propagate read trans), and there is at most one
42  * write transaction extant at any one time.
43  */
44
45 struct ubik_trans *ubik_currentTrans = 0;
46
47
48
49 /* the rest of these guys handle remote execution of write
50  * transactions: this is the code executed on the other servers when a
51  * sync site is executing a write transaction.
52  */
53 afs_int32
54 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
55 {
56     afs_int32 code;
57
58     if ((code = ubik_CheckAuth(rxcall))) {
59         return code;
60     }
61     DBHOLD(ubik_dbase);
62     urecovery_CheckTid(atid, 1);
63     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
64     if (!code && ubik_currentTrans) {
65         /* label this trans with the right trans id */
66         ubik_currentTrans->tid.epoch = atid->epoch;
67         ubik_currentTrans->tid.counter = atid->counter;
68     }
69     DBRELE(ubik_dbase);
70     return code;
71 }
72
73
74 afs_int32
75 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
76 {
77     afs_int32 code;
78     struct ubik_dbase *dbase;
79
80     if ((code = ubik_CheckAuth(rxcall))) {
81         return code;
82     }
83
84     if (!ubik_currentTrans) {
85         return USYNC;
86     }
87     /*
88      * sanity check to make sure only write trans appear here
89      */
90     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
91         return UBADTYPE;
92     }
93
94     dbase = ubik_currentTrans->dbase;
95
96     ObtainWriteLock(&dbase->cache_lock);
97
98     DBHOLD(dbase);
99
100     urecovery_CheckTid(atid, 0);
101     if (!ubik_currentTrans) {
102         DBRELE(dbase);
103         ReleaseWriteLock(&dbase->cache_lock);
104         return USYNC;
105     }
106
107     code = udisk_commit(ubik_currentTrans);
108     if (code == 0) {
109         /* sync site should now match */
110         ubik_dbVersion = ubik_dbase->version;
111     }
112     DBRELE(dbase);
113     ReleaseWriteLock(&dbase->cache_lock);
114     return code;
115 }
116
117 afs_int32
118 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
119 {
120     struct ubik_dbase *dbase;
121     afs_int32 code;
122
123     if ((code = ubik_CheckAuth(rxcall))) {
124         return code;
125     }
126
127     if (!ubik_currentTrans) {
128         return USYNC;
129     }
130     /* sanity check to make sure only write trans appear here */
131     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
132         return UBADTYPE;
133     }
134
135     dbase = ubik_currentTrans->dbase;
136     DBHOLD(dbase);
137     urecovery_CheckTid(atid, 0);
138     if (!ubik_currentTrans) {
139         DBRELE(dbase);
140         return USYNC;
141     }
142
143     /* If the thread is not waiting for lock - ok to end it */
144     if (ubik_currentTrans->locktype != LOCKWAIT) {
145         udisk_end(ubik_currentTrans);
146     }
147     ubik_currentTrans = (struct ubik_trans *)0;
148     DBRELE(dbase);
149     return 0;
150 }
151
152 afs_int32
153 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
154 {
155     afs_int32 code;
156     struct ubik_dbase *dbase;
157
158     if ((code = ubik_CheckAuth(rxcall))) {
159         return code;
160     }
161
162     if (!ubik_currentTrans) {
163         return USYNC;
164     }
165     /* sanity check to make sure only write trans appear here  */
166     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
167         return UBADTYPE;
168     }
169
170     dbase = ubik_currentTrans->dbase;
171     DBHOLD(dbase);
172     urecovery_CheckTid(atid, 0);
173     if (!ubik_currentTrans) {
174         DBRELE(dbase);
175         return USYNC;
176     }
177
178     code = udisk_abort(ubik_currentTrans);
179     /* If the thread is not waiting for lock - ok to end it */
180     if (ubik_currentTrans->locktype != LOCKWAIT) {
181         udisk_end(ubik_currentTrans);
182     }
183     ubik_currentTrans = (struct ubik_trans *)0;
184     DBRELE(dbase);
185     return code;
186 }
187
188 /* apos and alen are not used */
189 afs_int32
190 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
191            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
192 {
193     afs_int32 code;
194     struct ubik_dbase *dbase;
195     struct ubik_trans *ubik_thisTrans;
196
197     if ((code = ubik_CheckAuth(rxcall))) {
198         return code;
199     }
200     if (!ubik_currentTrans) {
201         return USYNC;
202     }
203     /* sanity check to make sure only write trans appear here */
204     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
205         return UBADTYPE;
206     }
207     if (alen != 1) {
208         return UBADLOCK;
209     }
210     dbase = ubik_currentTrans->dbase;
211     DBHOLD(dbase);
212     urecovery_CheckTid(atid, 0);
213     if (!ubik_currentTrans) {
214         DBRELE(dbase);
215         return USYNC;
216     }
217
218     ubik_thisTrans = ubik_currentTrans;
219     code = ulock_getLock(ubik_currentTrans, atype, 1);
220
221     /* While waiting, the transaction may have been ended/
222      * aborted from under us (urecovery_CheckTid). In that
223      * case, end the transaction here.
224      */
225     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
226         udisk_end(ubik_thisTrans);
227         code = USYNC;
228     }
229
230     DBRELE(dbase);
231     return code;
232 }
233
234 /*!
235  * \brief Write a vector of data
236  */
237 afs_int32
238 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
239              iovec_wrt *io_vector, iovec_buf *io_buffer)
240 {
241     afs_int32 code, i, offset;
242     struct ubik_dbase *dbase;
243     struct ubik_iovec *iovec;
244     char *iobuf;
245
246     if ((code = ubik_CheckAuth(rxcall))) {
247         return code;
248     }
249     if (!ubik_currentTrans) {
250         return USYNC;
251     }
252     /* sanity check to make sure only write trans appear here */
253     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
254         return UBADTYPE;
255     }
256
257     dbase = ubik_currentTrans->dbase;
258     DBHOLD(dbase);
259     urecovery_CheckTid(atid, 0);
260     if (!ubik_currentTrans) {
261         DBRELE(dbase);
262         return USYNC;
263     }
264
265     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
266     iobuf = (char *)io_buffer->iovec_buf_val;
267     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
268         /* Sanity check for going off end of buffer */
269         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
270             code = UINTERNAL;
271         } else {
272             code =
273                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
274                             iovec[i].position, iovec[i].length);
275         }
276         if (code)
277             break;
278
279         offset += iovec[i].length;
280     }
281
282     DBRELE(dbase);
283     return code;
284 }
285
286 afs_int32
287 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
288             afs_int32 afile, afs_int32 apos, bulkdata *adata)
289 {
290     afs_int32 code;
291     struct ubik_dbase *dbase;
292
293     if ((code = ubik_CheckAuth(rxcall))) {
294         return code;
295     }
296     if (!ubik_currentTrans) {
297         return USYNC;
298     }
299     /* sanity check to make sure only write trans appear here */
300     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
301         return UBADTYPE;
302     }
303
304     dbase = ubik_currentTrans->dbase;
305     DBHOLD(dbase);
306     urecovery_CheckTid(atid, 0);
307     if (!ubik_currentTrans) {
308         DBRELE(dbase);
309         return USYNC;
310     }
311     code =
312         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
313                     adata->bulkdata_len);
314     DBRELE(dbase);
315     return code;
316 }
317
318 afs_int32
319 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
320                afs_int32 afile, afs_int32 alen)
321 {
322     afs_int32 code;
323     struct ubik_dbase *dbase;
324
325     if ((code = ubik_CheckAuth(rxcall))) {
326         return code;
327     }
328     if (!ubik_currentTrans) {
329         return USYNC;
330     }
331     /* sanity check to make sure only write trans appear here */
332     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
333         return UBADTYPE;
334     }
335
336     dbase = ubik_currentTrans->dbase;
337     DBHOLD(dbase);
338     urecovery_CheckTid(atid, 0);
339     if (!ubik_currentTrans) {
340         DBRELE(dbase);
341         return USYNC;
342     }
343     code = udisk_truncate(ubik_currentTrans, afile, alen);
344     DBRELE(dbase);
345     return code;
346 }
347
348 afs_int32
349 SDISK_GetVersion(struct rx_call *rxcall,
350                  struct ubik_version *aversion)
351 {
352     afs_int32 code;
353
354     if ((code = ubik_CheckAuth(rxcall))) {
355         return code;
356     }
357
358     /*
359      * If we are the sync site, recovery shouldn't be running on any
360      * other site. We shouldn't be getting this RPC as long as we are
361      * the sync site.  To prevent any unforseen activity, we should
362      * reject this RPC until we have recognized that we are not the
363      * sync site anymore, and/or if we have any pending WRITE
364      * transactions that have to complete. This way we can be assured
365      * that this RPC would not block any pending transactions that
366      * should either fail or pass. If we have recognized the fact that
367      * we are not the sync site any more, all write transactions would
368      * fail with UNOQUORUM anyway.
369      */
370     if (ubeacon_AmSyncSite()) {
371         return UDEADLOCK;
372     }
373
374     DBHOLD(ubik_dbase);
375     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
376     DBRELE(ubik_dbase);
377     if (code) {
378         /* tell other side there's no dbase */
379         aversion->epoch = 0;
380         aversion->counter = 0;
381     }
382     return 0;
383 }
384
385 afs_int32
386 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
387               struct ubik_version *version)
388 {
389     afs_int32 code;
390     struct ubik_dbase *dbase;
391     afs_int32 offset;
392     struct ubik_stat ubikstat;
393     char tbuffer[256];
394     afs_int32 tlen;
395     afs_int32 length;
396
397     if ((code = ubik_CheckAuth(rxcall))) {
398         return code;
399     }
400 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
401  * a sanity check, anyway.
402     if (ubeacon_AmSyncSite()) {
403       return UDEADLOCK;
404     }
405 */
406     dbase = ubik_dbase;
407     DBHOLD(dbase);
408     code = (*dbase->stat) (dbase, file, &ubikstat);
409     if (code < 0) {
410         DBRELE(dbase);
411         return code;
412     }
413     length = ubikstat.size;
414     tlen = htonl(length);
415     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
416     if (code != sizeof(afs_int32)) {
417         DBRELE(dbase);
418         ubik_dprint("Rx-write length error=%d\n", code);
419         return BULK_ERROR;
420     }
421     offset = 0;
422     while (length > 0) {
423         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
424         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
425         if (code != tlen) {
426             DBRELE(dbase);
427             ubik_dprint("read failed error=%d\n", code);
428             return UIOERROR;
429         }
430         code = rx_Write(rxcall, tbuffer, tlen);
431         if (code != tlen) {
432             DBRELE(dbase);
433             ubik_dprint("Rx-write length error=%d\n", code);
434             return BULK_ERROR;
435         }
436         length -= tlen;
437         offset += tlen;
438     }
439     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
440     DBRELE(dbase);
441     return code;
442 }
443
444 afs_int32
445 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
446                afs_int32 length, struct ubik_version *avers)
447 {
448     afs_int32 code;
449     struct ubik_dbase *dbase = NULL;
450     char tbuffer[1024];
451     afs_int32 offset;
452     struct ubik_version tversion;
453     int tlen;
454     struct rx_peer *tpeer;
455     struct rx_connection *tconn;
456     afs_uint32 otherHost = 0;
457     char hoststr[16];
458     char pbuffer[1028];
459     int fd = -1;
460     afs_int32 epoch = 0;
461     afs_int32 pass;
462
463     /* send the file back to the requester */
464
465     dbase = ubik_dbase;
466
467     if ((code = ubik_CheckAuth(rxcall))) {
468         DBHOLD(dbase);
469         goto failed;
470     }
471
472     /* next, we do a sanity check to see if the guy sending us the database is
473      * the guy we think is the sync site.  It turns out that we might not have
474      * decided yet that someone's the sync site, but they could have enough
475      * votes from others to be sync site anyway, and could send us the database
476      * in advance of getting our votes.  This is fine, what we're really trying
477      * to check is that some authenticated bogon isn't sending a random database
478      * into another configuration.  This could happen on a bad configuration
479      * screwup.  Thus, we only object if we're sure we know who the sync site
480      * is, and it ain't the guy talking to us.
481      */
482     offset = uvote_GetSyncSite();
483     tconn = rx_ConnectionOf(rxcall);
484     tpeer = rx_PeerOf(tconn);
485     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
486     if (offset && offset != otherHost) {
487         /* we *know* this is the wrong guy */
488         code = USYNC;
489         DBHOLD(dbase);
490         goto failed;
491     }
492
493     DBHOLD(dbase);
494
495     /* abort any active trans that may scribble over the database */
496     urecovery_AbortAll(dbase);
497
498     ubik_print("Ubik: Synchronize database with server %s\n",
499                afs_inet_ntoa_r(otherHost, hoststr));
500
501     offset = 0;
502     epoch = tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
503     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
504     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
505              ubik_dbase->pathName, (file<0)?"SYS":"",
506              (file<0)?-file:file);
507     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
508     if (fd < 0) {
509         code = errno;
510         goto failed;
511     }
512     code = lseek(fd, HDRSIZE, 0);
513     if (code != HDRSIZE) {
514         close(fd);
515         goto failed;
516     }
517     pass = 0;
518     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
519     while (length > 0) {
520         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
521 #if !defined(AFS_PTHREAD_ENV)
522         if (pass % 4 == 0)
523             IOMGR_Poll();
524 #endif
525         code = rx_Read(rxcall, tbuffer, tlen);
526         if (code != tlen) {
527             ubik_dprint("Rx-read length error=%d\n", code);
528             code = BULK_ERROR;
529             close(fd);
530             goto failed;
531         }
532         code = write(fd, tbuffer, tlen);
533         pass++;
534         if (code != tlen) {
535             ubik_dprint("write failed error=%d\n", code);
536             code = UIOERROR;
537             close(fd);
538             goto failed;
539         }
540         offset += tlen;
541         length -= tlen;
542     }
543     code = close(fd);
544     if (code)
545         goto failed;
546
547     /* sync data first, then write label and resync (resync done by setlabel call).
548      * This way, good label is only on good database. */
549     snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
550              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
551 #ifdef AFS_NT40_ENV
552     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
553              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
554     code = unlink(pbuffer);
555     if (!code)
556         code = rename(tbuffer, pbuffer);
557     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
558              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
559 #endif
560     if (!code)
561         code = rename(pbuffer, tbuffer);
562     if (!code) {
563         (*ubik_dbase->open) (ubik_dbase, file);
564         code = (*ubik_dbase->setlabel) (dbase, file, avers);
565     }
566 #ifdef AFS_NT40_ENV
567     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
568              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
569     unlink(pbuffer);
570 #endif
571     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
572     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
573 #ifdef AFS_PTHREAD_ENV
574     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
575 #else
576     LWP_NoYieldSignal(&dbase->version);
577 #endif
578
579 failed:
580     if (code) {
581         unlink(pbuffer);
582         /* Failed to sync. Allow reads again for now. */
583         if (dbase != NULL) {
584             tversion.epoch = epoch;
585             (*dbase->setlabel) (dbase, file, &tversion);
586         }
587         ubik_print
588             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
589              afs_inet_ntoa_r(otherHost, hoststr), code);
590     } else {
591         ubik_print("Ubik: Synchronize database completed\n");
592     }
593     DBRELE(dbase);
594     return code;
595 }
596
597
598 afs_int32
599 SDISK_Probe(struct rx_call *rxcall)
600 {
601     return 0;
602 }
603
604 /*!
605  * \brief Update remote machines addresses in my server list
606  *
607  * Send back my addresses to caller of this RPC
608  * \return zero on success, else 1.
609  */
610 afs_int32
611 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
612                           UbikInterfaceAddr *inAddr,
613                           UbikInterfaceAddr *outAddr)
614 {
615     struct ubik_server *ts, *tmp;
616     afs_uint32 remoteAddr;      /* in net byte order */
617     int i, j, found = 0, probableMatch = 0;
618     char hoststr[16];
619
620     /* copy the output parameters */
621     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
622         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
623
624     remoteAddr = htonl(inAddr->hostAddr[0]);
625     for (ts = ubik_servers; ts; ts = ts->next)
626         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
627             probableMatch = 1;
628             break;
629         }
630
631     if (probableMatch) {
632         /* verify that all addresses in the incoming RPC are
633          ** not part of other server entries in my CellServDB
634          */
635         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
636              && inAddr->hostAddr[i]; i++) {
637             remoteAddr = htonl(inAddr->hostAddr[i]);
638             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
639                 if (ts == tmp)  /* this is my server */
640                     continue;
641                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
642                      j++)
643                     if (remoteAddr == tmp->addr[j]) {
644                         found = 1;
645                         break;
646                     }
647             }
648         }
649     }
650
651     /* if (probableMatch) */
652     /* inconsistent addresses in CellServDB */
653     if (!probableMatch || found) {
654         ubik_print("Inconsistent Cell Info from server: ");
655         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
656             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
657         ubik_print("\n");
658         fflush(stdout);
659         fflush(stderr);
660         printServerInfo();
661         return UBADHOST;
662     }
663
664     /* update our data structures */
665     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
666         ts->addr[i] = htonl(inAddr->hostAddr[i]);
667
668     ubik_print("ubik: A Remote Server has addresses: ");
669     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
670         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
671     ubik_print("\n");
672
673     return 0;
674 }
675
676 static void
677 printServerInfo(void)
678 {
679     struct ubik_server *ts;
680     int i, j = 1;
681     char hoststr[16];
682
683     ubik_print("Local CellServDB:");
684     for (ts = ubik_servers; ts; ts = ts->next, j++) {
685         ubik_print("Server %d: ", j);
686         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
687             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
688     }
689     ubik_print("\n");
690 }
691
692 afs_int32
693 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
694                  struct ubik_version *oldversionp,
695                  struct ubik_version *newversionp)
696 {
697     afs_int32 code = 0;
698     struct ubik_dbase *dbase;
699
700     if ((code = ubik_CheckAuth(rxcall))) {
701         return (code);
702     }
703
704     if (!ubik_currentTrans) {
705         return USYNC;
706     }
707     /* sanity check to make sure only write trans appear here */
708     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
709         return UBADTYPE;
710     }
711
712     /* Should not get this for the sync site */
713     if (ubeacon_AmSyncSite()) {
714         return UDEADLOCK;
715     }
716
717     dbase = ubik_currentTrans->dbase;
718     DBHOLD(dbase);
719     urecovery_CheckTid(atid, 0);
720     if (!ubik_currentTrans) {
721         DBRELE(dbase);
722         return USYNC;
723     }
724
725     /* Set the label if its version matches the sync-site's */
726     if ((oldversionp->epoch == ubik_dbVersion.epoch)
727         && (oldversionp->counter == ubik_dbVersion.counter)) {
728         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
729         if (!code) {
730             ubik_dbase->version = *newversionp;
731             ubik_dbVersion = *newversionp;
732         }
733     } else {
734         code = USYNC;
735     }
736
737     DBRELE(dbase);
738     return code;
739 }