Further rationalise our usage of assert()
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <assert.h>
16
17 #include <afs/opr.h>
18 #include <lock.h>
19 #include <rx/xdr.h>
20 #include <rx/rx.h>
21 #include <afs/afsutil.h>
22
23 #define UBIK_INTERNALS
24 #include "ubik.h"
25 #include "ubik_int.h"
26
27 static void printServerInfo(void);
28
29 /*! \file
30  * routines for handling requests remotely-submitted by the sync site.  These are
31  * only write transactions (we don't propagate read trans), and there is at most one
32  * write transaction extant at any one time.
33  */
34
35 struct ubik_trans *ubik_currentTrans = 0;
36
37
38
39 /* the rest of these guys handle remote execution of write
40  * transactions: this is the code executed on the other servers when a
41  * sync site is executing a write transaction.
42  */
43 afs_int32
44 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
45 {
46     afs_int32 code;
47
48     if ((code = ubik_CheckAuth(rxcall))) {
49         return code;
50     }
51     DBHOLD(ubik_dbase);
52     urecovery_CheckTid(atid, 1);
53     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
54     if (!code && ubik_currentTrans) {
55         /* label this trans with the right trans id */
56         ubik_currentTrans->tid.epoch = atid->epoch;
57         ubik_currentTrans->tid.counter = atid->counter;
58     }
59     DBRELE(ubik_dbase);
60     return code;
61 }
62
63
64 afs_int32
65 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
66 {
67     afs_int32 code;
68
69     if ((code = ubik_CheckAuth(rxcall))) {
70         return code;
71     }
72     ObtainWriteLock(&ubik_dbase->cache_lock);
73     DBHOLD(ubik_dbase);
74     if (!ubik_currentTrans) {
75         code = USYNC;
76         goto done;
77     }
78     /*
79      * sanity check to make sure only write trans appear here
80      */
81     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
82         code = UBADTYPE;
83         goto done;
84     }
85
86     urecovery_CheckTid(atid, 0);
87     if (!ubik_currentTrans) {
88         code = USYNC;
89         goto done;
90     }
91
92     code = udisk_commit(ubik_currentTrans);
93     if (code == 0) {
94         /* sync site should now match */
95         uvote_set_dbVersion(ubik_dbase->version);
96     }
97 done:
98     DBRELE(ubik_dbase);
99     ReleaseWriteLock(&ubik_dbase->cache_lock);
100     return code;
101 }
102
103 afs_int32
104 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
105 {
106     afs_int32 code;
107
108     if ((code = ubik_CheckAuth(rxcall))) {
109         return code;
110     }
111
112     DBHOLD(ubik_dbase);
113
114     if (!ubik_currentTrans) {
115         code = USYNC;
116         goto done;
117     }
118     /* sanity check to make sure only write trans appear here */
119     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
120         code = UBADTYPE;
121         goto done;
122     }
123
124     urecovery_CheckTid(atid, 0);
125     if (!ubik_currentTrans) {
126         code = USYNC;
127         goto done;
128     }
129
130     /* If the thread is not waiting for lock - ok to end it */
131     if (ubik_currentTrans->locktype != LOCKWAIT) {
132         udisk_end(ubik_currentTrans);
133     }
134     ubik_currentTrans = (struct ubik_trans *)0;
135 done:
136     DBRELE(ubik_dbase);
137     return code;
138 }
139
140 afs_int32
141 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
142 {
143     afs_int32 code;
144
145     if ((code = ubik_CheckAuth(rxcall))) {
146         return code;
147     }
148     DBHOLD(ubik_dbase);
149     if (!ubik_currentTrans) {
150         code = USYNC;
151         goto done;
152     }
153     /* sanity check to make sure only write trans appear here  */
154     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
155         code = UBADTYPE;
156         goto done;
157     }
158
159     urecovery_CheckTid(atid, 0);
160     if (!ubik_currentTrans) {
161         code = USYNC;
162         goto done;
163     }
164
165     code = udisk_abort(ubik_currentTrans);
166     /* If the thread is not waiting for lock - ok to end it */
167     if (ubik_currentTrans->locktype != LOCKWAIT) {
168         udisk_end(ubik_currentTrans);
169     }
170     ubik_currentTrans = (struct ubik_trans *)0;
171 done:
172     DBRELE(ubik_dbase);
173     return code;
174 }
175
176 /* apos and alen are not used */
177 afs_int32
178 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
179            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
180 {
181     afs_int32 code;
182     struct ubik_trans *ubik_thisTrans;
183
184     if ((code = ubik_CheckAuth(rxcall))) {
185         return code;
186     }
187     DBHOLD(ubik_dbase);
188     if (!ubik_currentTrans) {
189         code = USYNC;
190         goto done;
191     }
192     /* sanity check to make sure only write trans appear here */
193     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
194         code = UBADTYPE;
195         goto done;
196     }
197     if (alen != 1) {
198         code = UBADLOCK;
199         goto done;
200     }
201     urecovery_CheckTid(atid, 0);
202     if (!ubik_currentTrans) {
203         code = USYNC;
204         goto done;
205     }
206
207     ubik_thisTrans = ubik_currentTrans;
208     code = ulock_getLock(ubik_currentTrans, atype, 1);
209
210     /* While waiting, the transaction may have been ended/
211      * aborted from under us (urecovery_CheckTid). In that
212      * case, end the transaction here.
213      */
214     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
215         udisk_end(ubik_thisTrans);
216         code = USYNC;
217     }
218 done:
219     DBRELE(ubik_dbase);
220     return code;
221 }
222
223 /*!
224  * \brief Write a vector of data
225  */
226 afs_int32
227 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
228              iovec_wrt *io_vector, iovec_buf *io_buffer)
229 {
230     afs_int32 code, i, offset;
231     struct ubik_iovec *iovec;
232     char *iobuf;
233
234     if ((code = ubik_CheckAuth(rxcall))) {
235         return code;
236     }
237     DBHOLD(ubik_dbase);
238     if (!ubik_currentTrans) {
239         code = USYNC;
240         goto done;
241     }
242     /* sanity check to make sure only write trans appear here */
243     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
244         code = UBADTYPE;
245         goto done;
246     }
247
248     urecovery_CheckTid(atid, 0);
249     if (!ubik_currentTrans) {
250         code = USYNC;
251         goto done;
252     }
253
254     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
255     iobuf = (char *)io_buffer->iovec_buf_val;
256     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
257         /* Sanity check for going off end of buffer */
258         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
259             code = UINTERNAL;
260         } else {
261             code =
262                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
263                             iovec[i].position, iovec[i].length);
264         }
265         if (code)
266             break;
267
268         offset += iovec[i].length;
269     }
270 done:
271     DBRELE(ubik_dbase);
272     return code;
273 }
274
275 afs_int32
276 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
277             afs_int32 afile, afs_int32 apos, bulkdata *adata)
278 {
279     afs_int32 code;
280
281     if ((code = ubik_CheckAuth(rxcall))) {
282         return code;
283     }
284     DBHOLD(ubik_dbase);
285     if (!ubik_currentTrans) {
286         code = USYNC;
287         goto done;
288     }
289     /* sanity check to make sure only write trans appear here */
290     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
291         code = UBADTYPE;
292         goto done;
293     }
294
295     urecovery_CheckTid(atid, 0);
296     if (!ubik_currentTrans) {
297         code = USYNC;
298         goto done;
299     }
300     code =
301         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
302                     adata->bulkdata_len);
303 done:
304     DBRELE(ubik_dbase);
305     return code;
306 }
307
308 afs_int32
309 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
310                afs_int32 afile, afs_int32 alen)
311 {
312     afs_int32 code;
313
314     if ((code = ubik_CheckAuth(rxcall))) {
315         return code;
316     }
317     DBHOLD(ubik_dbase);
318     if (!ubik_currentTrans) {
319         code = USYNC;
320         goto done;
321     }
322     /* sanity check to make sure only write trans appear here */
323     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
324         code = UBADTYPE;
325         goto done;
326     }
327
328     urecovery_CheckTid(atid, 0);
329     if (!ubik_currentTrans) {
330         code = USYNC;
331         goto done;
332     }
333     code = udisk_truncate(ubik_currentTrans, afile, alen);
334 done:
335     DBRELE(ubik_dbase);
336     return code;
337 }
338
339 afs_int32
340 SDISK_GetVersion(struct rx_call *rxcall,
341                  struct ubik_version *aversion)
342 {
343     afs_int32 code;
344
345     if ((code = ubik_CheckAuth(rxcall))) {
346         return code;
347     }
348
349     /*
350      * If we are the sync site, recovery shouldn't be running on any
351      * other site. We shouldn't be getting this RPC as long as we are
352      * the sync site.  To prevent any unforseen activity, we should
353      * reject this RPC until we have recognized that we are not the
354      * sync site anymore, and/or if we have any pending WRITE
355      * transactions that have to complete. This way we can be assured
356      * that this RPC would not block any pending transactions that
357      * should either fail or pass. If we have recognized the fact that
358      * we are not the sync site any more, all write transactions would
359      * fail with UNOQUORUM anyway.
360      */
361     DBHOLD(ubik_dbase);
362     if (ubeacon_AmSyncSite()) {
363         DBRELE(ubik_dbase);
364         return UDEADLOCK;
365     }
366
367     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
368     DBRELE(ubik_dbase);
369     if (code) {
370         /* tell other side there's no dbase */
371         aversion->epoch = 0;
372         aversion->counter = 0;
373     }
374     return 0;
375 }
376
377 afs_int32
378 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
379               struct ubik_version *version)
380 {
381     afs_int32 code;
382     struct ubik_dbase *dbase;
383     afs_int32 offset;
384     struct ubik_stat ubikstat;
385     char tbuffer[256];
386     afs_int32 tlen;
387     afs_int32 length;
388
389     if ((code = ubik_CheckAuth(rxcall))) {
390         return code;
391     }
392     dbase = ubik_dbase;
393     DBHOLD(dbase);
394     code = (*dbase->stat) (dbase, file, &ubikstat);
395     if (code < 0) {
396         DBRELE(dbase);
397         return code;
398     }
399     length = ubikstat.size;
400     tlen = htonl(length);
401     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
402     if (code != sizeof(afs_int32)) {
403         DBRELE(dbase);
404         ubik_dprint("Rx-write length error=%d\n", code);
405         return BULK_ERROR;
406     }
407     offset = 0;
408     while (length > 0) {
409         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
410         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
411         if (code != tlen) {
412             DBRELE(dbase);
413             ubik_dprint("read failed error=%d\n", code);
414             return UIOERROR;
415         }
416         code = rx_Write(rxcall, tbuffer, tlen);
417         if (code != tlen) {
418             DBRELE(dbase);
419             ubik_dprint("Rx-write length error=%d\n", code);
420             return BULK_ERROR;
421         }
422         length -= tlen;
423         offset += tlen;
424     }
425     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
426     DBRELE(dbase);
427     return code;
428 }
429
430 afs_int32
431 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
432                afs_int32 length, struct ubik_version *avers)
433 {
434     afs_int32 code;
435     struct ubik_dbase *dbase = NULL;
436     char tbuffer[1024];
437     afs_int32 offset;
438     struct ubik_version tversion;
439     int tlen;
440     struct rx_peer *tpeer;
441     struct rx_connection *tconn;
442     afs_uint32 otherHost = 0;
443     char hoststr[16];
444     char pbuffer[1028];
445     int fd = -1;
446     afs_int32 epoch = 0;
447     afs_int32 pass;
448
449     /* send the file back to the requester */
450
451     dbase = ubik_dbase;
452
453     if ((code = ubik_CheckAuth(rxcall))) {
454         DBHOLD(dbase);
455         goto failed;
456     }
457
458     /* next, we do a sanity check to see if the guy sending us the database is
459      * the guy we think is the sync site.  It turns out that we might not have
460      * decided yet that someone's the sync site, but they could have enough
461      * votes from others to be sync site anyway, and could send us the database
462      * in advance of getting our votes.  This is fine, what we're really trying
463      * to check is that some authenticated bogon isn't sending a random database
464      * into another configuration.  This could happen on a bad configuration
465      * screwup.  Thus, we only object if we're sure we know who the sync site
466      * is, and it ain't the guy talking to us.
467      */
468     offset = uvote_GetSyncSite();
469     tconn = rx_ConnectionOf(rxcall);
470     tpeer = rx_PeerOf(tconn);
471     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
472     if (offset && offset != otherHost) {
473         /* we *know* this is the wrong guy */
474         code = USYNC;
475         DBHOLD(dbase);
476         goto failed;
477     }
478
479     DBHOLD(dbase);
480
481     /* abort any active trans that may scribble over the database */
482     urecovery_AbortAll(dbase);
483
484     ubik_print("Ubik: Synchronize database with server %s\n",
485                afs_inet_ntoa_r(otherHost, hoststr));
486
487     offset = 0;
488     UBIK_VERSION_LOCK;
489     epoch = tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
490     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
491     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
492              ubik_dbase->pathName, (file<0)?"SYS":"",
493              (file<0)?-file:file);
494     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
495     if (fd < 0) {
496         code = errno;
497         goto failed_locked;
498     }
499     code = lseek(fd, HDRSIZE, 0);
500     if (code != HDRSIZE) {
501         close(fd);
502         goto failed_locked;
503     }
504     pass = 0;
505     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
506     UBIK_VERSION_UNLOCK;
507     while (length > 0) {
508         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
509 #if !defined(AFS_PTHREAD_ENV)
510         if (pass % 4 == 0)
511             IOMGR_Poll();
512 #endif
513         code = rx_Read(rxcall, tbuffer, tlen);
514         if (code != tlen) {
515             ubik_dprint("Rx-read length error=%d\n", code);
516             code = BULK_ERROR;
517             close(fd);
518             goto failed;
519         }
520         code = write(fd, tbuffer, tlen);
521         pass++;
522         if (code != tlen) {
523             ubik_dprint("write failed error=%d\n", code);
524             code = UIOERROR;
525             close(fd);
526             goto failed;
527         }
528         offset += tlen;
529         length -= tlen;
530     }
531     code = close(fd);
532     if (code)
533         goto failed;
534
535     /* sync data first, then write label and resync (resync done by setlabel call).
536      * This way, good label is only on good database. */
537     snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
538              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
539 #ifdef AFS_NT40_ENV
540     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
541              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
542     code = unlink(pbuffer);
543     if (!code)
544         code = rename(tbuffer, pbuffer);
545     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
546              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
547 #endif
548     if (!code)
549         code = rename(pbuffer, tbuffer);
550     UBIK_VERSION_LOCK;
551     if (!code) {
552         (*ubik_dbase->open) (ubik_dbase, file);
553         code = (*ubik_dbase->setlabel) (dbase, file, avers);
554     }
555 #ifdef AFS_NT40_ENV
556     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
557              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
558     unlink(pbuffer);
559 #endif
560     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
561     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
562 #ifdef AFS_PTHREAD_ENV
563     opr_Assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
564 #else
565     LWP_NoYieldSignal(&dbase->version);
566 #endif
567
568 failed_locked:
569     UBIK_VERSION_UNLOCK;
570
571 failed:
572     if (code) {
573         unlink(pbuffer);
574         /* Failed to sync. Allow reads again for now. */
575         if (dbase != NULL) {
576             UBIK_VERSION_LOCK;
577             tversion.epoch = epoch;
578             (*dbase->setlabel) (dbase, file, &tversion);
579             UBIK_VERSION_UNLOCK;
580         }
581         ubik_print
582             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
583              afs_inet_ntoa_r(otherHost, hoststr), code);
584     } else {
585         ubik_print("Ubik: Synchronize database completed\n");
586     }
587     DBRELE(dbase);
588     return code;
589 }
590
591
592 afs_int32
593 SDISK_Probe(struct rx_call *rxcall)
594 {
595     return 0;
596 }
597
598 /*!
599  * \brief Update remote machines addresses in my server list
600  *
601  * Send back my addresses to caller of this RPC
602  * \return zero on success, else 1.
603  */
604 afs_int32
605 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
606                           UbikInterfaceAddr *inAddr,
607                           UbikInterfaceAddr *outAddr)
608 {
609     struct ubik_server *ts, *tmp;
610     afs_uint32 remoteAddr;      /* in net byte order */
611     int i, j, found = 0, probableMatch = 0;
612     char hoststr[16];
613
614     UBIK_ADDR_LOCK;
615     /* copy the output parameters */
616     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
617         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
618
619     remoteAddr = htonl(inAddr->hostAddr[0]);
620     for (ts = ubik_servers; ts; ts = ts->next)
621         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
622             probableMatch = 1;
623             break;
624         }
625
626     if (probableMatch) {
627         /* verify that all addresses in the incoming RPC are
628          ** not part of other server entries in my CellServDB
629          */
630         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
631              && inAddr->hostAddr[i]; i++) {
632             remoteAddr = htonl(inAddr->hostAddr[i]);
633             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
634                 if (ts == tmp)  /* this is my server */
635                     continue;
636                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
637                      j++)
638                     if (remoteAddr == tmp->addr[j]) {
639                         found = 1;
640                         break;
641                     }
642             }
643         }
644     }
645
646     /* if (probableMatch) */
647     /* inconsistent addresses in CellServDB */
648     if (!probableMatch || found) {
649         ubik_print("Inconsistent Cell Info from server: ");
650         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
651             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
652         ubik_print("\n");
653         fflush(stdout);
654         fflush(stderr);
655         printServerInfo();
656         UBIK_ADDR_UNLOCK;
657         return UBADHOST;
658     }
659
660     /* update our data structures */
661     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
662         ts->addr[i] = htonl(inAddr->hostAddr[i]);
663
664     ubik_print("ubik: A Remote Server has addresses: ");
665     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
666         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
667     ubik_print("\n");
668
669     UBIK_ADDR_UNLOCK;
670     return 0;
671 }
672
673 static void
674 printServerInfo(void)
675 {
676     struct ubik_server *ts;
677     int i, j = 1;
678     char hoststr[16];
679
680     ubik_print("Local CellServDB:");
681     for (ts = ubik_servers; ts; ts = ts->next, j++) {
682         ubik_print("Server %d: ", j);
683         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
684             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
685     }
686     ubik_print("\n");
687 }
688
689 afs_int32
690 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
691                  struct ubik_version *oldversionp,
692                  struct ubik_version *newversionp)
693 {
694     afs_int32 code = 0;
695
696     if ((code = ubik_CheckAuth(rxcall))) {
697         return (code);
698     }
699     DBHOLD(ubik_dbase);
700     if (!ubik_currentTrans) {
701         code = USYNC;
702         goto done;
703     }
704     /* sanity check to make sure only write trans appear here */
705     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
706         code = UBADTYPE;
707         goto done;
708     }
709
710     /* Should not get this for the sync site */
711     if (ubeacon_AmSyncSite()) {
712         code = UDEADLOCK;
713         goto done;
714     }
715
716     urecovery_CheckTid(atid, 0);
717     if (!ubik_currentTrans) {
718         code = USYNC;
719         goto done;
720     }
721
722     /* Set the label if its version matches the sync-site's */
723     if (uvote_eq_dbVersion(*oldversionp)) {
724         UBIK_VERSION_LOCK;
725         code = (*ubik_dbase->setlabel) (ubik_dbase, 0, newversionp);
726         if (!code) {
727             ubik_dbase->version = *newversionp;
728             uvote_set_dbVersion(*newversionp);
729         }
730         UBIK_VERSION_UNLOCK;
731     } else {
732         code = USYNC;
733     }
734 done:
735     DBRELE(ubik_dbase);
736     return code;
737 }