ubik: Introduce new vote lock
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <lock.h>
16 #include <rx/xdr.h>
17 #include <rx/rx.h>
18 #include <errno.h>
19 #include <afs/afsutil.h>
20
21 #define UBIK_INTERNALS
22 #include "ubik.h"
23 #include "ubik_int.h"
24
25 static void printServerInfo(void);
26
27 /*! \file
28  * routines for handling requests remotely-submitted by the sync site.  These are
29  * only write transactions (we don't propagate read trans), and there is at most one
30  * write transaction extant at any one time.
31  */
32
33 struct ubik_trans *ubik_currentTrans = 0;
34
35
36
37 /* the rest of these guys handle remote execution of write
38  * transactions: this is the code executed on the other servers when a
39  * sync site is executing a write transaction.
40  */
41 afs_int32
42 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
43 {
44     afs_int32 code;
45
46     if ((code = ubik_CheckAuth(rxcall))) {
47         return code;
48     }
49     DBHOLD(ubik_dbase);
50     urecovery_CheckTid(atid, 1);
51     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
52     if (!code && ubik_currentTrans) {
53         /* label this trans with the right trans id */
54         ubik_currentTrans->tid.epoch = atid->epoch;
55         ubik_currentTrans->tid.counter = atid->counter;
56     }
57     DBRELE(ubik_dbase);
58     return code;
59 }
60
61
62 afs_int32
63 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
64 {
65     afs_int32 code;
66     struct ubik_dbase *dbase;
67
68     if ((code = ubik_CheckAuth(rxcall))) {
69         return code;
70     }
71
72     if (!ubik_currentTrans) {
73         return USYNC;
74     }
75     /*
76      * sanity check to make sure only write trans appear here
77      */
78     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
79         return UBADTYPE;
80     }
81
82     dbase = ubik_currentTrans->dbase;
83
84     ObtainWriteLock(&dbase->cache_lock);
85
86     DBHOLD(dbase);
87
88     urecovery_CheckTid(atid, 0);
89     if (!ubik_currentTrans) {
90         DBRELE(dbase);
91         ReleaseWriteLock(&dbase->cache_lock);
92         return USYNC;
93     }
94
95     code = udisk_commit(ubik_currentTrans);
96     if (code == 0) {
97         /* sync site should now match */
98         uvote_set_dbVersion(ubik_dbase->version);
99     }
100     DBRELE(dbase);
101     ReleaseWriteLock(&dbase->cache_lock);
102     return code;
103 }
104
105 afs_int32
106 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
107 {
108     struct ubik_dbase *dbase;
109     afs_int32 code;
110
111     if ((code = ubik_CheckAuth(rxcall))) {
112         return code;
113     }
114
115     if (!ubik_currentTrans) {
116         return USYNC;
117     }
118     /* sanity check to make sure only write trans appear here */
119     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
120         return UBADTYPE;
121     }
122
123     dbase = ubik_currentTrans->dbase;
124     DBHOLD(dbase);
125     urecovery_CheckTid(atid, 0);
126     if (!ubik_currentTrans) {
127         DBRELE(dbase);
128         return USYNC;
129     }
130
131     /* If the thread is not waiting for lock - ok to end it */
132     if (ubik_currentTrans->locktype != LOCKWAIT) {
133         udisk_end(ubik_currentTrans);
134     }
135     ubik_currentTrans = (struct ubik_trans *)0;
136     DBRELE(dbase);
137     return 0;
138 }
139
140 afs_int32
141 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
142 {
143     afs_int32 code;
144     struct ubik_dbase *dbase;
145
146     if ((code = ubik_CheckAuth(rxcall))) {
147         return code;
148     }
149
150     if (!ubik_currentTrans) {
151         return USYNC;
152     }
153     /* sanity check to make sure only write trans appear here  */
154     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
155         return UBADTYPE;
156     }
157
158     dbase = ubik_currentTrans->dbase;
159     DBHOLD(dbase);
160     urecovery_CheckTid(atid, 0);
161     if (!ubik_currentTrans) {
162         DBRELE(dbase);
163         return USYNC;
164     }
165
166     code = udisk_abort(ubik_currentTrans);
167     /* If the thread is not waiting for lock - ok to end it */
168     if (ubik_currentTrans->locktype != LOCKWAIT) {
169         udisk_end(ubik_currentTrans);
170     }
171     ubik_currentTrans = (struct ubik_trans *)0;
172     DBRELE(dbase);
173     return code;
174 }
175
176 /* apos and alen are not used */
177 afs_int32
178 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
179            afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
180 {
181     afs_int32 code;
182     struct ubik_dbase *dbase;
183     struct ubik_trans *ubik_thisTrans;
184
185     if ((code = ubik_CheckAuth(rxcall))) {
186         return code;
187     }
188     if (!ubik_currentTrans) {
189         return USYNC;
190     }
191     /* sanity check to make sure only write trans appear here */
192     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
193         return UBADTYPE;
194     }
195     if (alen != 1) {
196         return UBADLOCK;
197     }
198     dbase = ubik_currentTrans->dbase;
199     DBHOLD(dbase);
200     urecovery_CheckTid(atid, 0);
201     if (!ubik_currentTrans) {
202         DBRELE(dbase);
203         return USYNC;
204     }
205
206     ubik_thisTrans = ubik_currentTrans;
207     code = ulock_getLock(ubik_currentTrans, atype, 1);
208
209     /* While waiting, the transaction may have been ended/
210      * aborted from under us (urecovery_CheckTid). In that
211      * case, end the transaction here.
212      */
213     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
214         udisk_end(ubik_thisTrans);
215         code = USYNC;
216     }
217
218     DBRELE(dbase);
219     return code;
220 }
221
222 /*!
223  * \brief Write a vector of data
224  */
225 afs_int32
226 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
227              iovec_wrt *io_vector, iovec_buf *io_buffer)
228 {
229     afs_int32 code, i, offset;
230     struct ubik_dbase *dbase;
231     struct ubik_iovec *iovec;
232     char *iobuf;
233
234     if ((code = ubik_CheckAuth(rxcall))) {
235         return code;
236     }
237     if (!ubik_currentTrans) {
238         return USYNC;
239     }
240     /* sanity check to make sure only write trans appear here */
241     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
242         return UBADTYPE;
243     }
244
245     dbase = ubik_currentTrans->dbase;
246     DBHOLD(dbase);
247     urecovery_CheckTid(atid, 0);
248     if (!ubik_currentTrans) {
249         DBRELE(dbase);
250         return USYNC;
251     }
252
253     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
254     iobuf = (char *)io_buffer->iovec_buf_val;
255     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
256         /* Sanity check for going off end of buffer */
257         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
258             code = UINTERNAL;
259         } else {
260             code =
261                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
262                             iovec[i].position, iovec[i].length);
263         }
264         if (code)
265             break;
266
267         offset += iovec[i].length;
268     }
269
270     DBRELE(dbase);
271     return code;
272 }
273
274 afs_int32
275 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
276             afs_int32 afile, afs_int32 apos, bulkdata *adata)
277 {
278     afs_int32 code;
279     struct ubik_dbase *dbase;
280
281     if ((code = ubik_CheckAuth(rxcall))) {
282         return code;
283     }
284     if (!ubik_currentTrans) {
285         return USYNC;
286     }
287     /* sanity check to make sure only write trans appear here */
288     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
289         return UBADTYPE;
290     }
291
292     dbase = ubik_currentTrans->dbase;
293     DBHOLD(dbase);
294     urecovery_CheckTid(atid, 0);
295     if (!ubik_currentTrans) {
296         DBRELE(dbase);
297         return USYNC;
298     }
299     code =
300         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
301                     adata->bulkdata_len);
302     DBRELE(dbase);
303     return code;
304 }
305
306 afs_int32
307 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
308                afs_int32 afile, afs_int32 alen)
309 {
310     afs_int32 code;
311     struct ubik_dbase *dbase;
312
313     if ((code = ubik_CheckAuth(rxcall))) {
314         return code;
315     }
316     if (!ubik_currentTrans) {
317         return USYNC;
318     }
319     /* sanity check to make sure only write trans appear here */
320     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
321         return UBADTYPE;
322     }
323
324     dbase = ubik_currentTrans->dbase;
325     DBHOLD(dbase);
326     urecovery_CheckTid(atid, 0);
327     if (!ubik_currentTrans) {
328         DBRELE(dbase);
329         return USYNC;
330     }
331     code = udisk_truncate(ubik_currentTrans, afile, alen);
332     DBRELE(dbase);
333     return code;
334 }
335
336 afs_int32
337 SDISK_GetVersion(struct rx_call *rxcall,
338                  struct ubik_version *aversion)
339 {
340     afs_int32 code;
341
342     if ((code = ubik_CheckAuth(rxcall))) {
343         return code;
344     }
345
346     /*
347      * If we are the sync site, recovery shouldn't be running on any
348      * other site. We shouldn't be getting this RPC as long as we are
349      * the sync site.  To prevent any unforseen activity, we should
350      * reject this RPC until we have recognized that we are not the
351      * sync site anymore, and/or if we have any pending WRITE
352      * transactions that have to complete. This way we can be assured
353      * that this RPC would not block any pending transactions that
354      * should either fail or pass. If we have recognized the fact that
355      * we are not the sync site any more, all write transactions would
356      * fail with UNOQUORUM anyway.
357      */
358     if (ubeacon_AmSyncSite()) {
359         return UDEADLOCK;
360     }
361
362     DBHOLD(ubik_dbase);
363     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
364     DBRELE(ubik_dbase);
365     if (code) {
366         /* tell other side there's no dbase */
367         aversion->epoch = 0;
368         aversion->counter = 0;
369     }
370     return 0;
371 }
372
373 afs_int32
374 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
375               struct ubik_version *version)
376 {
377     afs_int32 code;
378     struct ubik_dbase *dbase;
379     afs_int32 offset;
380     struct ubik_stat ubikstat;
381     char tbuffer[256];
382     afs_int32 tlen;
383     afs_int32 length;
384
385     if ((code = ubik_CheckAuth(rxcall))) {
386         return code;
387     }
388 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
389  * a sanity check, anyway.
390     if (ubeacon_AmSyncSite()) {
391       return UDEADLOCK;
392     }
393 */
394     dbase = ubik_dbase;
395     DBHOLD(dbase);
396     code = (*dbase->stat) (dbase, file, &ubikstat);
397     if (code < 0) {
398         DBRELE(dbase);
399         return code;
400     }
401     length = ubikstat.size;
402     tlen = htonl(length);
403     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
404     if (code != sizeof(afs_int32)) {
405         DBRELE(dbase);
406         ubik_dprint("Rx-write length error=%d\n", code);
407         return BULK_ERROR;
408     }
409     offset = 0;
410     while (length > 0) {
411         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
412         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
413         if (code != tlen) {
414             DBRELE(dbase);
415             ubik_dprint("read failed error=%d\n", code);
416             return UIOERROR;
417         }
418         code = rx_Write(rxcall, tbuffer, tlen);
419         if (code != tlen) {
420             DBRELE(dbase);
421             ubik_dprint("Rx-write length error=%d\n", code);
422             return BULK_ERROR;
423         }
424         length -= tlen;
425         offset += tlen;
426     }
427     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
428     DBRELE(dbase);
429     return code;
430 }
431
432 afs_int32
433 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
434                afs_int32 length, struct ubik_version *avers)
435 {
436     afs_int32 code;
437     struct ubik_dbase *dbase = NULL;
438     char tbuffer[1024];
439     afs_int32 offset;
440     struct ubik_version tversion;
441     int tlen;
442     struct rx_peer *tpeer;
443     struct rx_connection *tconn;
444     afs_uint32 otherHost = 0;
445     char hoststr[16];
446     char pbuffer[1028];
447     int fd = -1;
448     afs_int32 epoch = 0;
449     afs_int32 pass;
450
451     /* send the file back to the requester */
452
453     dbase = ubik_dbase;
454
455     if ((code = ubik_CheckAuth(rxcall))) {
456         DBHOLD(dbase);
457         goto failed;
458     }
459
460     /* next, we do a sanity check to see if the guy sending us the database is
461      * the guy we think is the sync site.  It turns out that we might not have
462      * decided yet that someone's the sync site, but they could have enough
463      * votes from others to be sync site anyway, and could send us the database
464      * in advance of getting our votes.  This is fine, what we're really trying
465      * to check is that some authenticated bogon isn't sending a random database
466      * into another configuration.  This could happen on a bad configuration
467      * screwup.  Thus, we only object if we're sure we know who the sync site
468      * is, and it ain't the guy talking to us.
469      */
470     offset = uvote_GetSyncSite();
471     tconn = rx_ConnectionOf(rxcall);
472     tpeer = rx_PeerOf(tconn);
473     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
474     if (offset && offset != otherHost) {
475         /* we *know* this is the wrong guy */
476         code = USYNC;
477         DBHOLD(dbase);
478         goto failed;
479     }
480
481     DBHOLD(dbase);
482
483     /* abort any active trans that may scribble over the database */
484     urecovery_AbortAll(dbase);
485
486     ubik_print("Ubik: Synchronize database with server %s\n",
487                afs_inet_ntoa_r(otherHost, hoststr));
488
489     offset = 0;
490     epoch = tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
491     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
492     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
493              ubik_dbase->pathName, (file<0)?"SYS":"",
494              (file<0)?-file:file);
495     fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
496     if (fd < 0) {
497         code = errno;
498         goto failed;
499     }
500     code = lseek(fd, HDRSIZE, 0);
501     if (code != HDRSIZE) {
502         close(fd);
503         goto failed;
504     }
505     pass = 0;
506     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
507     while (length > 0) {
508         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
509 #if !defined(AFS_PTHREAD_ENV)
510         if (pass % 4 == 0)
511             IOMGR_Poll();
512 #endif
513         code = rx_Read(rxcall, tbuffer, tlen);
514         if (code != tlen) {
515             ubik_dprint("Rx-read length error=%d\n", code);
516             code = BULK_ERROR;
517             close(fd);
518             goto failed;
519         }
520         code = write(fd, tbuffer, tlen);
521         pass++;
522         if (code != tlen) {
523             ubik_dprint("write failed error=%d\n", code);
524             code = UIOERROR;
525             close(fd);
526             goto failed;
527         }
528         offset += tlen;
529         length -= tlen;
530     }
531     code = close(fd);
532     if (code)
533         goto failed;
534
535     /* sync data first, then write label and resync (resync done by setlabel call).
536      * This way, good label is only on good database. */
537     snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
538              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
539 #ifdef AFS_NT40_ENV
540     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
541              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
542     code = unlink(pbuffer);
543     if (!code)
544         code = rename(tbuffer, pbuffer);
545     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
546              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
547 #endif
548     if (!code)
549         code = rename(pbuffer, tbuffer);
550     if (!code) {
551         (*ubik_dbase->open) (ubik_dbase, file);
552         code = (*ubik_dbase->setlabel) (dbase, file, avers);
553     }
554 #ifdef AFS_NT40_ENV
555     snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
556              ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
557     unlink(pbuffer);
558 #endif
559     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
560     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
561 #ifdef AFS_PTHREAD_ENV
562     assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
563 #else
564     LWP_NoYieldSignal(&dbase->version);
565 #endif
566
567 failed:
568     if (code) {
569         unlink(pbuffer);
570         /* Failed to sync. Allow reads again for now. */
571         if (dbase != NULL) {
572             tversion.epoch = epoch;
573             (*dbase->setlabel) (dbase, file, &tversion);
574         }
575         ubik_print
576             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
577              afs_inet_ntoa_r(otherHost, hoststr), code);
578     } else {
579         ubik_print("Ubik: Synchronize database completed\n");
580     }
581     DBRELE(dbase);
582     return code;
583 }
584
585
586 afs_int32
587 SDISK_Probe(struct rx_call *rxcall)
588 {
589     return 0;
590 }
591
592 /*!
593  * \brief Update remote machines addresses in my server list
594  *
595  * Send back my addresses to caller of this RPC
596  * \return zero on success, else 1.
597  */
598 afs_int32
599 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
600                           UbikInterfaceAddr *inAddr,
601                           UbikInterfaceAddr *outAddr)
602 {
603     struct ubik_server *ts, *tmp;
604     afs_uint32 remoteAddr;      /* in net byte order */
605     int i, j, found = 0, probableMatch = 0;
606     char hoststr[16];
607
608     /* copy the output parameters */
609     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
610         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
611
612     remoteAddr = htonl(inAddr->hostAddr[0]);
613     for (ts = ubik_servers; ts; ts = ts->next)
614         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
615             probableMatch = 1;
616             break;
617         }
618
619     if (probableMatch) {
620         /* verify that all addresses in the incoming RPC are
621          ** not part of other server entries in my CellServDB
622          */
623         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
624              && inAddr->hostAddr[i]; i++) {
625             remoteAddr = htonl(inAddr->hostAddr[i]);
626             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
627                 if (ts == tmp)  /* this is my server */
628                     continue;
629                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
630                      j++)
631                     if (remoteAddr == tmp->addr[j]) {
632                         found = 1;
633                         break;
634                     }
635             }
636         }
637     }
638
639     /* if (probableMatch) */
640     /* inconsistent addresses in CellServDB */
641     if (!probableMatch || found) {
642         ubik_print("Inconsistent Cell Info from server: ");
643         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
644             ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
645         ubik_print("\n");
646         fflush(stdout);
647         fflush(stderr);
648         printServerInfo();
649         return UBADHOST;
650     }
651
652     /* update our data structures */
653     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
654         ts->addr[i] = htonl(inAddr->hostAddr[i]);
655
656     ubik_print("ubik: A Remote Server has addresses: ");
657     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
658         ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
659     ubik_print("\n");
660
661     return 0;
662 }
663
664 static void
665 printServerInfo(void)
666 {
667     struct ubik_server *ts;
668     int i, j = 1;
669     char hoststr[16];
670
671     ubik_print("Local CellServDB:");
672     for (ts = ubik_servers; ts; ts = ts->next, j++) {
673         ubik_print("Server %d: ", j);
674         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
675             ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
676     }
677     ubik_print("\n");
678 }
679
680 afs_int32
681 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
682                  struct ubik_version *oldversionp,
683                  struct ubik_version *newversionp)
684 {
685     afs_int32 code = 0;
686     struct ubik_dbase *dbase;
687
688     if ((code = ubik_CheckAuth(rxcall))) {
689         return (code);
690     }
691
692     if (!ubik_currentTrans) {
693         return USYNC;
694     }
695     /* sanity check to make sure only write trans appear here */
696     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
697         return UBADTYPE;
698     }
699
700     /* Should not get this for the sync site */
701     if (ubeacon_AmSyncSite()) {
702         return UDEADLOCK;
703     }
704
705     dbase = ubik_currentTrans->dbase;
706     DBHOLD(dbase);
707     urecovery_CheckTid(atid, 0);
708     if (!ubik_currentTrans) {
709         DBRELE(dbase);
710         return USYNC;
711     }
712
713     /* Set the label if its version matches the sync-site's */
714     if (uvote_eq_dbVersion(*oldversionp)) {
715         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
716         if (!code) {
717             ubik_dbase->version = *newversionp;
718             uvote_set_dbVersion(*newversionp);
719         }
720     } else {
721         code = USYNC;
722     }
723
724     DBRELE(dbase);
725     return code;
726 }