ubik: remote: fix DB lock usage
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <lock.h>
16 #include <rx/xdr.h>
17 #include <rx/rx.h>
18 #include <errno.h>
19 #include <afs/afsutil.h>
20
21 #define UBIK_INTERNALS
22 #include "ubik.h"
23 #include "ubik_int.h"
24
25 static void printServerInfo(void);
26
27 /*! \file
28  * routines for handling requests remotely-submitted by the sync site.  These are
29  * only write transactions (we don't propagate read trans), and there is at most one
30  * write transaction extant at any one time.
31  */
32
33 struct ubik_trans *ubik_currentTrans = 0;
34
35
36
37 /* the rest of these guys handle remote execution of write
38  * transactions: this is the code executed on the other servers when a
39  * sync site is executing a write transaction.
40  */
41 afs_int32
42 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
43 {
44     afs_int32 code;
45
46     if ((code = ubik_CheckAuth(rxcall))) {
47         return code;
48     }
49     DBHOLD(ubik_dbase);
50     urecovery_CheckTid(atid, 1);
51     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
52     if (!code && ubik_currentTrans) {
53         /* label this trans with the right trans id */
54         ubik_currentTrans->tid.epoch = atid->epoch;
55         ubik_currentTrans->tid.counter = atid->counter;
56     }
57     DBRELE(ubik_dbase);
58     return code;
59 }
60
61
62 afs_int32
63 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
64 {
65     afs_int32 code;
66
67     if ((code = ubik_CheckAuth(rxcall))) {
68         return code;
69     }
70     ObtainWriteLock(&ubik_dbase->cache_lock);
71     DBHOLD(ubik_dbase);
72     if (!ubik_currentTrans) {
73         code = USYNC;
74         goto done;
75     }
76     /*
77      * sanity check to make sure only write trans appear here
78      */
79     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
80         code = UBADTYPE;
81         goto done;
82     }
83
84     urecovery_CheckTid(atid, 0);
85     if (!ubik_currentTrans) {
86         code = USYNC;
87         goto done;
88     }
89
90     code = udisk_commit(ubik_currentTrans);
91     if (code == 0) {
92         /* sync site should now match */
93         uvote_set_dbVersion(ubik_dbase->version);
94     }
95 done:
96     DBRELE(ubik_dbase);
97     ReleaseWriteLock(&ubik_dbase->cache_lock);
98     return code;
99 }
100
101 afs_int32
102 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
103 {
104     afs_int32 code;
105
106     if ((code = ubik_CheckAuth(rxcall))) {
107         return code;
108     }
109
110     DBHOLD(ubik_dbase);
111
112     if (!ubik_currentTrans) {
113         code = USYNC;
114         goto done;
115     }
116     /* sanity check to make sure only write trans appear here */
117     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
118         code = UBADTYPE;
119         goto done;
120     }
121
122     urecovery_CheckTid(atid, 0);
123     if (!ubik_currentTrans) {
124         code = USYNC;
125         goto done;
126     }
127
128     /* If the thread is not waiting for lock - ok to end it */
129     if (ubik_currentTrans->locktype != LOCKWAIT) {
130         udisk_end(ubik_currentTrans);
131     }
132     ubik_currentTrans = (struct ubik_trans *)0;
133 done:
134     DBRELE(ubik_dbase);
135     return code;
136 }
137
138 afs_int32
139 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
140 {
141     afs_int32 code;
142
143     if ((code = ubik_CheckAuth(rxcall))) {
144         return code;
145     }
146     DBHOLD(ubik_dbase);
147     if (!ubik_currentTrans) {
148         code = USYNC;
149         goto done;
150     }
151     /* sanity check to make sure only write trans appear here  */
152     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
153         code = UBADTYPE;
154         goto done;
155     }
156
157     urecovery_CheckTid(atid, 0);
158     if (!ubik_currentTrans) {
159         code = USYNC;
160         goto done;
161     }
162
163     code = udisk_abort(ubik_currentTrans);
164     /* If the thread is not waiting for lock - ok to end it */
165     if (ubik_currentTrans->locktype != LOCKWAIT) {
166         udisk_end(ubik_currentTrans);
167     }
168     ubik_currentTrans = (struct ubik_trans *)0;
169 done:
170     DBRELE(ubik_dbase);
171     return code;
172 }
173
174 /* apos and alen are not used */
175 afs_int32
176 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
177            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
178 {
179     afs_int32 code;
180     struct ubik_trans *ubik_thisTrans;
181
182     if ((code = ubik_CheckAuth(rxcall))) {
183         return code;
184     }
185     DBHOLD(ubik_dbase);
186     if (!ubik_currentTrans) {
187         code = USYNC;
188         goto done;
189     }
190     /* sanity check to make sure only write trans appear here */
191     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
192         code = UBADTYPE;
193         goto done;
194     }
195     if (alen != 1) {
196         code = UBADLOCK;
197         goto done;
198     }
199     urecovery_CheckTid(atid, 0);
200     if (!ubik_currentTrans) {
201         code = USYNC;
202         goto done;
203     }
204
205     ubik_thisTrans = ubik_currentTrans;
206     code = ulock_getLock(ubik_currentTrans, atype, 1);
207
208     /* While waiting, the transaction may have been ended/
209      * aborted from under us (urecovery_CheckTid). In that
210      * case, end the transaction here.
211      */
212     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
213         udisk_end(ubik_thisTrans);
214         code = USYNC;
215     }
216 done:
217     DBRELE(ubik_dbase);
218     return code;
219 }
220
221 /*!
222  * \brief Write a vector of data
223  */
224 afs_int32
225 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
226              iovec_wrt *io_vector, iovec_buf *io_buffer)
227 {
228     afs_int32 code, i, offset;
229     struct ubik_iovec *iovec;
230     char *iobuf;
231
232     if ((code = ubik_CheckAuth(rxcall))) {
233         return code;
234     }
235     DBHOLD(ubik_dbase);
236     if (!ubik_currentTrans) {
237         code = USYNC;
238         goto done;
239     }
240     /* sanity check to make sure only write trans appear here */
241     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
242         code = UBADTYPE;
243         goto done;
244     }
245
246     urecovery_CheckTid(atid, 0);
247     if (!ubik_currentTrans) {
248         code = USYNC;
249         goto done;
250     }
251
252     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
253     iobuf = (char *)io_buffer->iovec_buf_val;
254     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
255         /* Sanity check for going off end of buffer */
256         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
257             code = UINTERNAL;
258         } else {
259             code =
260                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
261                             iovec[i].position, iovec[i].length);
262         }
263         if (code)
264             break;
265
266         offset += iovec[i].length;
267     }
268 done:
269     DBRELE(ubik_dbase);
270     return code;
271 }
272
273 afs_int32
274 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
275             afs_int32 afile, afs_int32 apos, bulkdata *adata)
276 {
277     afs_int32 code;
278
279     if ((code = ubik_CheckAuth(rxcall))) {
280         return code;
281     }
282     DBHOLD(ubik_dbase);
283     if (!ubik_currentTrans) {
284         code = USYNC;
285         goto done;
286     }
287     /* sanity check to make sure only write trans appear here */
288     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
289         code = UBADTYPE;
290         goto done;
291     }
292
293     urecovery_CheckTid(atid, 0);
294     if (!ubik_currentTrans) {
295         code = USYNC;
296         goto done;
297     }
298     code =
299         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
300                     adata->bulkdata_len);
301 done:
302     DBRELE(ubik_dbase);
303     return code;
304 }
305
306 afs_int32
307 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
308                afs_int32 afile, afs_int32 alen)
309 {
310     afs_int32 code;
311
312     if ((code = ubik_CheckAuth(rxcall))) {
313         return code;
314     }
315     DBHOLD(ubik_dbase);
316     if (!ubik_currentTrans) {
317         code = USYNC;
318         goto done;
319     }
320     /* sanity check to make sure only write trans appear here */
321     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
322         code = UBADTYPE;
323         goto done;
324     }
325
326     urecovery_CheckTid(atid, 0);
327     if (!ubik_currentTrans) {
328         code = USYNC;
329         goto done;
330     }
331     code = udisk_truncate(ubik_currentTrans, afile, alen);
332 done:
333     DBRELE(ubik_dbase);
334     return code;
335 }
336
337 afs_int32
338 SDISK_GetVersion(struct rx_call *rxcall,
339                  struct ubik_version *aversion)
340 {
341     afs_int32 code;
342
343     if ((code = ubik_CheckAuth(rxcall))) {
344         return code;
345     }
346
347     /*
348      * If we are the sync site, recovery shouldn't be running on any
349      * other site. We shouldn't be getting this RPC as long as we are
350      * the sync site.  To prevent any unforseen activity, we should
351      * reject this RPC until we have recognized that we are not the
352      * sync site anymore, and/or if we have any pending WRITE
353      * transactions that have to complete. This way we can be assured
354      * that this RPC would not block any pending transactions that
355      * should either fail or pass. If we have recognized the fact that
356      * we are not the sync site any more, all write transactions would
357      * fail with UNOQUORUM anyway.
358      */
359     DBHOLD(ubik_dbase);
360     if (ubeacon_AmSyncSite()) {
361         DBRELE(ubik_dbase);
362         return UDEADLOCK;
363     }
364
365     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
366     DBRELE(ubik_dbase);
367     if (code) {
368         /* tell other side there's no dbase */
369         aversion->epoch = 0;
370         aversion->counter = 0;
371     }
372     return 0;
373 }
374
375 afs_int32
376 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
377               struct ubik_version *version)
378 {
379     afs_int32 code;
380     struct ubik_dbase *dbase;
381     afs_int32 offset;
382     struct ubik_stat ubikstat;
383     char tbuffer[256];
384     afs_int32 tlen;
385     afs_int32 length;
386
387     if ((code = ubik_CheckAuth(rxcall))) {
388         return code;
389     }
390     dbase = ubik_dbase;
391     DBHOLD(dbase);
392     code = (*dbase->stat) (dbase, file, &ubikstat);
393     if (code < 0) {
394         DBRELE(dbase);
395         return code;
396     }
397     length = ubikstat.size;
398     tlen = htonl(length);
399     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
400     if (code != sizeof(afs_int32)) {
401         DBRELE(dbase);
402         ubik_dprint("Rx-write length error=%d\n", code);
403         return BULK_ERROR;
404     }
405     offset = 0;
406     while (length > 0) {
407         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
408         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
409         if (code != tlen) {
410             DBRELE(dbase);
411             ubik_dprint("read failed error=%d\n", code);
412             return UIOERROR;
413         }
414         code = rx_Write(rxcall, tbuffer, tlen);
415         if (code != tlen) {
416             DBRELE(dbase);
417             ubik_dprint("Rx-write length error=%d\n", code);
418             return BULK_ERROR;
419         }
420         length -= tlen;
421         offset += tlen;
422     }
423     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
424     DBRELE(dbase);
425     return code;
426 }
427
428 afs_int32
429 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
430                afs_int32 length, struct ubik_version *avers)
431 {
432     afs_int32 code;
433     struct ubik_dbase *dbase = NULL;
434     char tbuffer[1024];
435     afs_int32 offset;
436     struct ubik_version tversion;
437     int tlen;
438     struct rx_peer *tpeer;
439     struct rx_connection *tconn;
440     afs_uint32 otherHost = 0;
441     char hoststr[16];
442     char pbuffer[1028];
443     int fd = -1;
444     afs_int32 epoch = 0;
445     afs_int32 pass;
446
447     /* send the file back to the requester */
448
449     dbase = ubik_dbase;
450
451     if ((code = ubik_CheckAuth(rxcall))) {
452         DBHOLD(dbase);
453         goto failed;
454     }
455
456     /* next, we do a sanity check to see if the guy sending us the database is
457      * the guy we think is the sync site.  It turns out that we might not have
458      * decided yet that someone's the sync site, but they could have enough
459      * votes from others to be sync site anyway, and could send us the database
460      * in advance of getting our votes.  This is fine, what we're really trying
461      * to check is that some authenticated bogon isn't sending a random database
462      * into another configuration.  This could happen on a bad configuration
463      * screwup.  Thus, we only object if we're sure we know who the sync site
464      * is, and it ain't the guy talking to us.
465      */
466     offset = uvote_GetSyncSite();
467     tconn = rx_ConnectionOf(rxcall);
468     tpeer = rx_PeerOf(tconn);
469     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
470     if (offset && offset != otherHost) {
471         /* we *know* this is the wrong guy */
472         code = USYNC;
473         DBHOLD(dbase);
474         goto failed;
475     }
476
477     DBHOLD(dbase);
478
479     /* abort any active trans that may scribble over the database */
480     urecovery_AbortAll(dbase);
481
482     ubik_print("Ubik: Synchronize database with server %s\n",
483                afs_inet_ntoa_r(otherHost, hoststr));
484
485     offset = 0;
486     UBIK_VERSION_LOCK;
487     epoch = tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
488     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
489     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
490              ubik_dbase->pathName, (file<0)?"SYS":"",
491              (file<0)?-file:file);
492     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
493     if (fd < 0) {
494         code = errno;
495         goto failed_locked;
496     }
497     code = lseek(fd, HDRSIZE, 0);
498     if (code != HDRSIZE) {
499         close(fd);
500         goto failed_locked;
501     }
502     pass = 0;
503     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
504     UBIK_VERSION_UNLOCK;
505     while (length > 0) {
506         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
507 #if !defined(AFS_PTHREAD_ENV)
508         if (pass % 4 == 0)
509             IOMGR_Poll();
510 #endif
511         code = rx_Read(rxcall, tbuffer, tlen);
512         if (code != tlen) {
513             ubik_dprint("Rx-read length error=%d\n", code);
514             code = BULK_ERROR;
515             close(fd);
516             goto failed;
517         }
518         code = write(fd, tbuffer, tlen);
519         pass++;
520         if (code != tlen) {
521             ubik_dprint("write failed error=%d\n", code);
522             code = UIOERROR;
523             close(fd);
524             goto failed;
525         }
526         offset += tlen;
527         length -= tlen;
528     }
529     code = close(fd);
530     if (code)
531         goto failed;
532
533     /* sync data first, then write label and resync (resync done by setlabel call).
534      * This way, good label is only on good database. */
535     snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
536              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
537 #ifdef AFS_NT40_ENV
538     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
539              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
540     code = unlink(pbuffer);
541     if (!code)
542         code = rename(tbuffer, pbuffer);
543     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
544              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
545 #endif
546     if (!code)
547         code = rename(pbuffer, tbuffer);
548     UBIK_VERSION_LOCK;
549     if (!code) {
550         (*ubik_dbase->open) (ubik_dbase, file);
551         code = (*ubik_dbase->setlabel) (dbase, file, avers);
552     }
553 #ifdef AFS_NT40_ENV
554     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
555              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
556     unlink(pbuffer);
557 #endif
558     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
559     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
560 #ifdef AFS_PTHREAD_ENV
561     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
562 #else
563     LWP_NoYieldSignal(&dbase->version);
564 #endif
565
566 failed_locked:
567     UBIK_VERSION_UNLOCK;
568
569 failed:
570     if (code) {
571         unlink(pbuffer);
572         /* Failed to sync. Allow reads again for now. */
573         if (dbase != NULL) {
574             UBIK_VERSION_LOCK;
575             tversion.epoch = epoch;
576             (*dbase->setlabel) (dbase, file, &tversion);
577             UBIK_VERSION_UNLOCK;
578         }
579         ubik_print
580             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
581              afs_inet_ntoa_r(otherHost, hoststr), code);
582     } else {
583         ubik_print("Ubik: Synchronize database completed\n");
584     }
585     DBRELE(dbase);
586     return code;
587 }
588
589
590 afs_int32
591 SDISK_Probe(struct rx_call *rxcall)
592 {
593     return 0;
594 }
595
596 /*!
597  * \brief Update remote machines addresses in my server list
598  *
599  * Send back my addresses to caller of this RPC
600  * \return zero on success, else 1.
601  */
602 afs_int32
603 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
604                           UbikInterfaceAddr *inAddr,
605                           UbikInterfaceAddr *outAddr)
606 {
607     struct ubik_server *ts, *tmp;
608     afs_uint32 remoteAddr;      /* in net byte order */
609     int i, j, found = 0, probableMatch = 0;
610     char hoststr[16];
611
612     UBIK_ADDR_LOCK;
613     /* copy the output parameters */
614     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
615         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
616
617     remoteAddr = htonl(inAddr->hostAddr[0]);
618     for (ts = ubik_servers; ts; ts = ts->next)
619         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
620             probableMatch = 1;
621             break;
622         }
623
624     if (probableMatch) {
625         /* verify that all addresses in the incoming RPC are
626          ** not part of other server entries in my CellServDB
627          */
628         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
629              && inAddr->hostAddr[i]; i++) {
630             remoteAddr = htonl(inAddr->hostAddr[i]);
631             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
632                 if (ts == tmp)  /* this is my server */
633                     continue;
634                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
635                      j++)
636                     if (remoteAddr == tmp->addr[j]) {
637                         found = 1;
638                         break;
639                     }
640             }
641         }
642     }
643
644     /* if (probableMatch) */
645     /* inconsistent addresses in CellServDB */
646     if (!probableMatch || found) {
647         ubik_print("Inconsistent Cell Info from server: ");
648         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
649             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
650         ubik_print("\n");
651         fflush(stdout);
652         fflush(stderr);
653         printServerInfo();
654         UBIK_ADDR_UNLOCK;
655         return UBADHOST;
656     }
657
658     /* update our data structures */
659     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
660         ts->addr[i] = htonl(inAddr->hostAddr[i]);
661
662     ubik_print("ubik: A Remote Server has addresses: ");
663     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
664         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
665     ubik_print("\n");
666
667     UBIK_ADDR_UNLOCK;
668     return 0;
669 }
670
671 static void
672 printServerInfo(void)
673 {
674     struct ubik_server *ts;
675     int i, j = 1;
676     char hoststr[16];
677
678     ubik_print("Local CellServDB:");
679     for (ts = ubik_servers; ts; ts = ts->next, j++) {
680         ubik_print("Server %d: ", j);
681         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
682             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
683     }
684     ubik_print("\n");
685 }
686
687 afs_int32
688 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
689                  struct ubik_version *oldversionp,
690                  struct ubik_version *newversionp)
691 {
692     afs_int32 code = 0;
693
694     if ((code = ubik_CheckAuth(rxcall))) {
695         return (code);
696     }
697     DBHOLD(ubik_dbase);
698     if (!ubik_currentTrans) {
699         code = USYNC;
700         goto done;
701     }
702     /* sanity check to make sure only write trans appear here */
703     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
704         code = UBADTYPE;
705         goto done;
706     }
707
708     /* Should not get this for the sync site */
709     if (ubeacon_AmSyncSite()) {
710         code = UDEADLOCK;
711         goto done;
712     }
713
714     urecovery_CheckTid(atid, 0);
715     if (!ubik_currentTrans) {
716         code = USYNC;
717         goto done;
718     }
719
720     /* Set the label if its version matches the sync-site's */
721     if (uvote_eq_dbVersion(*oldversionp)) {
722         UBIK_VERSION_LOCK;
723         code = (*ubik_dbase->setlabel) (ubik_dbase, 0, newversionp);
724         if (!code) {
725             ubik_dbase->version = *newversionp;
726             uvote_set_dbVersion(*newversionp);
727         }
728         UBIK_VERSION_UNLOCK;
729     } else {
730         code = USYNC;
731     }
732 done:
733     DBRELE(ubik_dbase);
734     return code;
735 }