include-afsconfig-before-param-h-20010712
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #else
19 #include <sys/file.h>
20 #include <netinet/in.h>
21 #include <sys/param.h>
22 #endif
23 #include <time.h>
24 #include <lock.h>
25 #include <rx/xdr.h>
26 #include <rx/rx.h>
27 #include <afs/cellconfig.h>
28
29 #define UBIK_INTERNALS
30 #include "ubik.h"
31 #include "ubik_int.h"
32
33 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
34
35 /*  This system is organized in a hierarchical set of related modules.  Modules
36     at one level can only call modules at the same level or below.
37     
38     At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
39     operating system primitives.
40     
41     At the next level (1) we have
42
43         VOTER--The module responsible for casting votes when asked.  It is also
44         responsible for determining whether this server should try to become
45         a synchronization site.
46         
47         BEACONER--The module responsible for sending keep-alives out when a
48         server is actually the sync site, or trying to become a sync site.
49         
50         DISK--The module responsible for representing atomic transactions
51         on the local disk.  It maintains a new-value only log.
52         
53         LOCK--The module responsible for locking byte ranges in the database file.
54         
55     At the next level (2) we have
56           
57         RECOVERY--The module responsible for ensuring that all members of a quorum
58         have the same up-to-date database after a new synchronization site is
59         elected.  This module runs only on the synchronization site.
60         
61     At the next level (3) we have
62     
63         REMOTE--The module responsible for interpreting requests from the sync
64         site and applying them to the database, after obtaining the appropriate
65         locks.
66         
67     At the next level (4) we have
68     
69         UBIK--The module users call to perform operations on the database.
70 */
71
72
73 /* some globals */
74 afs_int32 ubik_quorum=0;
75 struct ubik_dbase *ubik_dbase=0;
76 struct ubik_stats ubik_stats;
77 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
78 afs_int32 ubik_epochTime = 0;
79 afs_int32 urecovery_state = 0;
80 int (*ubik_SRXSecurityProc)();
81 char *ubik_SRXSecurityRock;
82 struct ubik_server *ubik_servers;
83 short ubik_callPortal;
84
85 static int BeginTrans();
86
87 struct rx_securityClass *ubik_sc[3];
88
89 /* perform an operation at a quorum, handling error conditions.  return 0 if
90     all worked, otherwise mark failing server as down and return UERROR
91
92     Note that if any server misses an update, we must wait BIGTIME seconds before
93     allowing the transaction to commit, to ensure that the missing and possibly still
94     functioning server times out and stop handing out old data.  This is done in the commit
95     code, where we wait for a server marked down to have stayed down for BIGTIME seconds
96     before we allow a transaction to commit.  A server that fails but comes back up won't give
97     out old data because it is sent the sync count along with the beacon message that
98     marks it as *really* up (beaconSinceDown).
99 */
100 #define CStampVersion       1       /* meaning set ts->version */
101 afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5)
102     int (*aproc)();
103     int aflags;
104     register struct ubik_trans *atrans;
105     long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5; {
106     register struct ubik_server *ts;
107     register afs_int32 code;
108     afs_int32 rcode, okcalls;
109
110     rcode = 0;
111     okcalls = 0;
112     for(ts = ubik_servers; ts; ts=ts->next) {
113         /* for each server */
114         if (!ts->up || !ts->currentDB) {
115             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
116             continue;    /* not up-to-date, don't bother */
117         }
118         code = (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5);
119         if ( (aproc == DISK_WriteV) && (code <= -450) && (code > -500) ) {
120            /* An RPC interface mismatch (as defined in comerr/error_msg.c).
121             * Un-bulk the entries and do individual DISK_Write calls
122             * instead of DISK_WriteV.
123             */
124            iovec_wrt         *iovec_infoP = (iovec_wrt *)aparm0;
125            iovec_buf         *iovec_dataP = (iovec_buf *)aparm1;
126            struct ubik_iovec *iovec       = (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
127            char              *iobuf       = (char *)iovec_dataP->iovec_buf_val;
128            bulkdata          tcbs;
129            afs_int32             i, offset;
130
131            for (i=0, offset=0; i<iovec_infoP->iovec_wrt_len; i++) {
132               /* Sanity check for going off end of buffer */
133               if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
134                  code = UINTERNAL;
135                  break;
136               }
137               tcbs.bulkdata_len = iovec[i].length;
138               tcbs.bulkdata_val = &iobuf[offset];
139               code = DISK_Write(ts->disk_rxcid, &atrans->tid,
140                                 iovec[i].file, iovec[i].position, &tcbs);
141               if (code) break;
142
143               offset += iovec[i].length;
144            }
145         }
146         if (code) {         /* failure */
147             rcode = code;
148             ts->up = 0;     /* mark as down now; beacons will no longer be sent */
149             ts->currentDB = 0;
150             ts->beaconSinceDown = 0;
151             urecovery_LostServer(); /* tell recovery to try to resend dbase later */
152         } else {            /* success */
153             if (!ts->isClone)
154                 okcalls++;          /* count up how many worked */
155             if (aflags & CStampVersion) {
156                 ts->version = atrans->dbase->version;
157             }
158         }
159     }
160     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
161     if (okcalls+1 >= ubik_quorum) return 0;
162     else return rcode;
163 }
164
165 /* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
166
167     Note that the host named by myHost should not also be listed in serverList.
168 */
169
170 int ubik_ServerInitByInfo(myHost, myPort, info, clones, pathName, dbase)
171     struct afsconf_cell *info;  /* in */
172     char clones[];
173     afs_int32 myHost;
174     short myPort;
175     char *pathName;     /* in */
176     struct ubik_dbase **dbase; /* out */ 
177 {
178      afs_int32 code;
179     
180      code = ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName, dbase);
181      return code;
182 }
183
184 int ubik_ServerInit(myHost, myPort, serverList, pathName, dbase)
185     afs_int32 serverList[];    /* in */
186     afs_int32 myHost;
187     short myPort;
188     char *pathName;     /* in */
189     struct ubik_dbase **dbase; /* out */ 
190 {
191      afs_int32 code;
192     
193      code = ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
194                         serverList, pathName, dbase);
195      return code;
196 }
197
198 int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, dbase)
199     afs_int32 myHost;
200     short myPort;
201     struct afsconf_cell *info;  /* in */
202     char clones[];
203     afs_int32 serverList[];    /* in */
204     char *pathName;     /* in */
205     struct ubik_dbase **dbase; /* out */ 
206 {
207     register struct ubik_dbase *tdb;
208     register afs_int32 code;
209     PROCESS junk;
210     afs_int32 secIndex;
211     struct rx_securityClass *secClass;
212
213     struct rx_service *tservice;
214     extern struct rx_securityClass *rxnull_NewServerSecurityObject();
215     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
216     extern void rx_ServerProc();
217     extern int rx_stackSize;
218
219     initialize_u_error_table();
220
221     tdb = (struct ubik_dbase *) malloc(sizeof(struct ubik_dbase));
222     tdb->pathName = (char *) malloc(strlen(pathName)+1);
223     strcpy(tdb->pathName, pathName);
224     tdb->activeTrans = (struct ubik_trans *) 0;
225     bzero(&tdb->version, sizeof(struct ubik_version));
226     bzero(&tdb->cachedVersion, sizeof(struct ubik_version));
227     Lock_Init(&tdb->versionLock);
228     tdb->flags = 0;
229     tdb->read = uphys_read;
230     tdb->write = uphys_write;
231     tdb->truncate = uphys_truncate;
232     tdb->open = 0;  /* this function isn't used any more */
233     tdb->sync = uphys_sync;
234     tdb->stat = uphys_stat;
235     tdb->getlabel = uphys_getlabel;
236     tdb->setlabel = uphys_setlabel;
237     tdb->getnfiles = uphys_getnfiles;
238     tdb->readers=0;
239     tdb->tidCounter=tdb->writeTidCounter=0;
240     *dbase = tdb;
241     ubik_dbase = tdb;   /* for now, only one db per server; can fix later when we have names for the other dbases */
242
243     /* initialize RX */
244     ubik_callPortal = myPort;
245     /* try to get an additional security object */
246     ubik_sc[0] = rxnull_NewServerSecurityObject();
247     ubik_sc[1] = 0;
248     ubik_sc[2] = 0;
249     if (ubik_SRXSecurityProc) {
250         code = (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex);
251         if (code == 0) {
252             ubik_sc[secIndex] = secClass;
253         }
254     }
255     code = rx_Init(myPort);
256     if (code < 0) return code;
257     tservice = rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3, VOTE_ExecuteRequest);
258     if (tservice == (struct rx_service *)0) {
259         ubik_dprint("Could not create VOTE rx service!\n");
260         return -1;
261     }
262     rx_SetMinProcs(tservice, 2);
263     rx_SetMaxProcs(tservice, 3);
264
265     tservice = rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3, DISK_ExecuteRequest);
266     if (tservice == (struct rx_service *)0) {
267         ubik_dprint("Could not create DISK rx service!\n");
268         return -1;
269     }
270     rx_SetMinProcs(tservice, 2);
271     rx_SetMaxProcs(tservice, 3);
272
273     /* start an rx_ServerProc to handle incoming RPC's in particular the 
274      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
275      * the "steplock" problem in ubik initialization. Defect 11037.
276      */
277     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
278                       0, "rx_ServerProc", &junk);
279
280     /* do basic initialization */
281     code = uvote_Init();
282     if (code) return code;
283     code = urecovery_Initialize(tdb);
284     if (code) return code;
285     if (info)
286         code = ubeacon_InitServerListByInfo(myHost, info, clones);
287     else 
288         code = ubeacon_InitServerList(myHost, serverList);
289     if (code) return code;
290
291     /* now start up async processes */
292     code = LWP_CreateProcess(ubeacon_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
293                              0, "beacon", &junk);
294     if (code) return code;
295     code = LWP_CreateProcess(urecovery_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
296                              0, "recovery", &junk);
297     return code;
298 }
299
300 /*  This routine begins a read or write transaction on the transaction
301     identified by transPtr, in the dbase named by dbase.  An open mode of
302     ubik_READTRANS identifies this as a read transaction, while a mode of
303     ubik_WRITETRANS identifies this as a write transaction.  transPtr 
304     is set to the returned transaction control block. The readAny flag is
305     set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
306     ubik_BeginTransReadAny() below.
307
308     We can only begin transaction when we have an up-to-date database.
309 */
310
311 static int BeginTrans(dbase, transMode, transPtr, readAny)
312     register struct ubik_dbase *dbase;  /* in */
313     int readAny;
314     afs_int32 transMode; /* in */
315     struct ubik_trans **transPtr;       /* out */ {
316     struct ubik_trans *jt;
317     register struct ubik_trans *tt;
318     register afs_int32 code;
319
320     if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
321     DBHOLD(dbase);
322     if (urecovery_AllBetter(dbase, readAny)==0) {
323         DBRELE(dbase);
324         return UNOQUORUM;
325     }
326     /* otherwise we have a quorum, use it */
327
328     /* make sure that at most one write transaction occurs at any one time.  This
329         has nothing to do with transaction locking; that's enforced by the lock package.  However,
330         we can't even handle two non-conflicting writes, since our log and recovery modules
331         don't know how to restore one without possibly picking up some data from the other. */
332     if (transMode == UBIK_WRITETRANS) {
333         /* if we're writing already, wait */
334         while(dbase->flags & DBWRITING) {
335             DBRELE(dbase);
336             LWP_WaitProcess(&dbase->flags);
337             DBHOLD(dbase);
338         }
339         if (!ubeacon_AmSyncSite()) {
340             DBRELE(dbase);
341             return UNOTSYNC;
342         }
343     }
344
345     /* create the transaction */
346     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
347     tt = jt;        /* move to a register */
348     if (code || tt == (struct ubik_trans *)NULL) {
349         DBRELE(dbase);
350         return code;
351     }
352     if (readAny) tt->flags |= TRREADANY;
353     /* label trans and dbase with new tid */
354     tt->tid.epoch = ubik_epochTime;
355     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
356     tt->tid.counter = (dbase->tidCounter += 2);
357
358     if (transMode == UBIK_WRITETRANS) {
359       /* for a write trans, we have to keep track of the write tid counter too */
360       dbase->writeTidCounter += 2;
361
362         /* next try to start transaction on appropriate number of machines */
363         code = ContactQuorum(DISK_Begin, tt, 0);
364         if (code) {
365             /* we must abort the operation */
366             udisk_abort(tt);
367             ContactQuorum(DISK_Abort, tt, 0);    /* force aborts to the others */
368             udisk_end(tt);
369             DBRELE(dbase);
370             return code;
371         }
372     }
373
374     *transPtr = tt;
375     DBRELE(dbase);
376     return 0;
377 }
378
379 int ubik_BeginTrans(dbase, transMode, transPtr)
380     register struct ubik_dbase *dbase;  /* in */
381     afs_int32 transMode; /* in */
382     struct ubik_trans **transPtr;       /* out */ {
383         return BeginTrans(dbase, transMode, transPtr, 0);
384 }
385
386 int ubik_BeginTransReadAny(dbase, transMode, transPtr)
387     register struct ubik_dbase *dbase;  /* in */
388     afs_int32 transMode; /* in */
389     struct ubik_trans **transPtr;       /* out */ {
390         return BeginTrans(dbase, transMode, transPtr, 1);
391
392  
393 /* this routine ends a read or write transaction by aborting it */
394 int ubik_AbortTrans(transPtr)
395     register struct ubik_trans *transPtr; /* in */ {
396     register afs_int32 code;
397     afs_int32 code2;
398     register struct ubik_dbase *dbase;
399     
400     dbase = transPtr->dbase;
401     DBHOLD(dbase);
402     bzero(&dbase->cachedVersion, sizeof(struct ubik_version));
403     /* see if we're still up-to-date */
404     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
405         udisk_abort(transPtr);
406         udisk_end(transPtr);
407         DBRELE(dbase);
408         return UNOQUORUM;
409     }
410
411     if (transPtr->type == UBIK_READTRANS) {
412         code = udisk_abort(transPtr);
413         udisk_end(transPtr);
414         DBRELE(dbase);
415         return code;
416     }
417
418     /* below here, we know we're doing a write transaction */
419     if (!ubeacon_AmSyncSite()) {
420         udisk_abort(transPtr);
421         udisk_end(transPtr);
422         DBRELE(dbase);
423         return UNOTSYNC;
424     }
425     
426     /* now it is safe to try remote abort */
427     code = ContactQuorum(DISK_Abort, transPtr, 0);
428     code2 = udisk_abort(transPtr);
429     udisk_end(transPtr);
430     DBRELE(dbase);
431     return (code? code : code2);
432 }
433
434 /* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
435 int ubik_EndTrans(transPtr)
436     register struct ubik_trans *transPtr; /* in */ {
437     register afs_int32 code;
438     struct timeval tv;
439     afs_int32 realStart;
440     register struct ubik_server *ts;
441     afs_int32 now;
442     register struct ubik_dbase *dbase;
443     
444     if (transPtr->type == UBIK_WRITETRANS) {
445        code = ubik_Flush(transPtr);
446        if (code) {
447           ubik_AbortTrans(transPtr);
448           return(code);
449        }
450     }
451
452     dbase = transPtr->dbase;
453     DBHOLD(dbase);
454     bzero(&dbase->cachedVersion, sizeof(struct ubik_version));
455
456     /* give up if no longer current */
457     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
458         udisk_abort(transPtr);
459         udisk_end(transPtr);
460         DBRELE(dbase);
461         return UNOQUORUM;
462     }
463
464     if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
465         code = udisk_commit(transPtr);
466         if (code == 0) goto success;    /* update cachedVersion correctly */
467         udisk_end(transPtr);
468         DBRELE(dbase);
469         return code;
470     }
471
472     if (!ubeacon_AmSyncSite()) {    /* no longer sync site */
473         udisk_abort(transPtr);
474         udisk_end(transPtr);
475         DBRELE(dbase);
476         return UNOTSYNC;
477     }
478     
479     /* now it is safe to do commit */
480     code = udisk_commit(transPtr);
481     if (code == 0) code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
482     if (code) {
483         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
484             Note that we don't know if this transaction will eventually commit at this point.
485             If it made it to a site that will be present in the next quorum, we win, otherwise
486             we lose.  If we contact a majority of sites, then we won't be here: contacting
487             a majority guarantees commit, since it guarantees that one dude will be a
488             member of the next quorum. */
489         ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
490         udisk_end(transPtr);
491         DBRELE(dbase);
492         return code;
493     }
494     /* before we can start sending unlock messages, we must wait until all servers
495         that are possibly still functioning on the other side of a network partition
496         have timed out.  Check the server structures, compute how long to wait, then
497         start the unlocks */
498     realStart = FT_ApproxTime();
499     while (1) {
500         /* wait for all servers to time out */
501         code = 0;
502         now = FT_ApproxTime();
503         /* check if we're still sync site, the guy should either come up
504             to us, or timeout.  Put safety check in anyway */
505         if (now - realStart > 10 * BIGTIME) {
506             ubik_stats.escapes++;
507             ubik_print("ubik escaping from commit wait\n");
508             break;
509         }
510         for(ts = ubik_servers; ts; ts=ts->next) {
511             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
512                 /* this guy could have some damaged data, wait for him */
513                 code = 1;
514                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
515                 tv.tv_usec = 0;
516                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
517                 break;
518             }
519         }
520         if (code == 0) break;       /* no down ones still pseudo-active */
521     }
522
523     /* finally, unlock all the dudes.  We can return success independent of the number of servers
524         that really unlock the dbase; the others will do it if/when they elect a new sync site.
525         The transaction is committed anyway, since we succeeded in contacting a quorum
526         at the start (when invoking the DiskCommit function).
527     */
528     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
529
530   success:
531     udisk_end(transPtr);
532     /* update version on successful EndTrans */
533     bcopy(&dbase->version, &dbase->cachedVersion, sizeof(struct ubik_version));
534     
535     DBRELE(dbase);
536     return 0;
537 }
538
539 /* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
540
541 int ubik_Read(transPtr, buffer, length)
542     register struct ubik_trans *transPtr;       /* in */
543     char *buffer;   /* in */
544     afs_int32 length;   /* in */ {
545     register afs_int32 code;
546
547     /* reads are easy to do: handle locally */
548     DBHOLD(transPtr->dbase);
549     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
550         DBRELE(transPtr->dbase);
551         return UNOQUORUM;
552     }
553     
554     code = udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos, length);
555     if (code == 0) {
556         transPtr->seekPos += length;
557     }
558     DBRELE(transPtr->dbase);
559     return code;
560 }
561
562 /* This routine will flush the io data in the iovec structures. It first
563  * flushes to the local disk and then uses ContactQuorum to write it to 
564  * the other servers.
565  */
566 int ubik_Flush(transPtr)
567     struct ubik_trans *transPtr;
568 {
569     afs_int32 code, error=0;
570
571     if (transPtr->type != UBIK_WRITETRANS)
572        return UBADTYPE;
573     if (!transPtr->iovec_info.iovec_wrt_len || !transPtr->iovec_info.iovec_wrt_val)
574        return 0;
575
576     DBHOLD(transPtr->dbase);
577     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
578        ERROR_EXIT(UNOQUORUM);
579     if (!ubeacon_AmSyncSite())      /* only sync site can write */
580        ERROR_EXIT(UNOTSYNC);
581
582     /* Update the rest of the servers in the quorum */
583     code = ContactQuorum(DISK_WriteV, transPtr, 0, 
584                          &transPtr->iovec_info, &transPtr->iovec_data);
585     if (code) {
586        udisk_abort(transPtr);
587        ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
588        transPtr->iovec_info.iovec_wrt_len = 0;
589        transPtr->iovec_data.iovec_buf_len = 0;
590        ERROR_EXIT(code);
591     }
592
593     /* Wrote the buffers out, so start at scratch again */
594     transPtr->iovec_info.iovec_wrt_len = 0;
595     transPtr->iovec_data.iovec_buf_len = 0;
596
597   error_exit:
598     DBRELE(transPtr->dbase);
599     return error;
600 }
601
602 int ubik_Write(transPtr, buffer, length)
603     register struct ubik_trans *transPtr;       /* in */
604     char *buffer;       /* in */
605     afs_int32 length;   /* in */
606 {
607     struct ubik_iovec *iovec;
608     afs_int32 code, error=0;
609     afs_int32 pos, len, size;
610
611     if (transPtr->type != UBIK_WRITETRANS)
612        return UBADTYPE;
613     if (!length)
614        return 0;
615
616     if (length > IOVEC_MAXBUF) {
617        for (pos=0, len=length; len>0; len-=size, pos+=size) {
618           size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
619           code = ubik_Write(transPtr, &buffer[pos], size);
620           if (code) return (code);
621        }
622        return 0;
623     }
624
625     if (!transPtr->iovec_info.iovec_wrt_val) {
626        transPtr->iovec_info.iovec_wrt_len  = 0;
627        transPtr->iovec_info.iovec_wrt_val  = 
628          (struct ubik_iovec *)malloc(IOVEC_MAXWRT*sizeof(struct ubik_iovec));
629        transPtr->iovec_data.iovec_buf_len = 0;
630        transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
631        if (!transPtr->iovec_info.iovec_wrt_val || !transPtr->iovec_data.iovec_buf_val) {
632           if (transPtr->iovec_info.iovec_wrt_val) free(transPtr->iovec_info.iovec_wrt_val);
633           transPtr->iovec_info.iovec_wrt_val = 0;
634           if (transPtr->iovec_data.iovec_buf_val) free(transPtr->iovec_data.iovec_buf_val);
635           transPtr->iovec_data.iovec_buf_val = 0;
636           return UNOMEM;
637        }
638     }
639
640     /* If this write won't fit in the structure, then flush it out and start anew */
641     if ( (transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT) ||
642          ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF) ) {
643        code = ubik_Flush(transPtr);
644        if (code) return (code);
645     }
646
647     DBHOLD(transPtr->dbase);
648     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
649        ERROR_EXIT(UNOQUORUM);
650     if (!ubeacon_AmSyncSite())      /* only sync site can write */
651        ERROR_EXIT(UNOTSYNC);
652
653     /* Write to the local disk */
654     code = udisk_write(transPtr, transPtr->seekFile, buffer, 
655                                  transPtr->seekPos, length);
656     if (code) {
657        udisk_abort(transPtr);
658        transPtr->iovec_info.iovec_wrt_len = 0;
659        transPtr->iovec_data.iovec_buf_len = 0;
660        DBRELE(transPtr->dbase);
661        return(code);
662     }
663
664     /* Collect writes for the other ubik servers (to be done in bulk) */
665     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
666     iovec[transPtr->iovec_info.iovec_wrt_len].file     = transPtr->seekFile;
667     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
668     iovec[transPtr->iovec_info.iovec_wrt_len].length   = length;
669     
670     bcopy(buffer, 
671           &transPtr->iovec_data.iovec_buf_val[transPtr->iovec_data.iovec_buf_len],
672           length);
673
674     transPtr->iovec_info.iovec_wrt_len++;
675     transPtr->iovec_data.iovec_buf_len += length;
676     transPtr->seekPos += length;
677
678   error_exit:
679     DBRELE(transPtr->dbase);
680     return error;
681 }
682
683 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
684
685 int ubik_Seek(transPtr, fileid, position)
686     register struct ubik_trans *transPtr;       /* IN */
687     afs_int32 fileid;    /* IN */
688     afs_int32 position;  /* IN */ {
689     register afs_int32 code;
690
691     DBHOLD(transPtr->dbase);
692     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
693         code = UNOQUORUM;
694     } else  {
695         transPtr->seekFile = fileid;
696         transPtr->seekPos = position;
697         code = 0;
698     }
699     DBRELE(transPtr->dbase);
700     return code;
701 }
702
703 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
704
705 int ubik_Tell(transPtr, fileid, position)
706     register struct ubik_trans *transPtr;       /* IN */
707     afs_int32 *fileid;   /* OUT */
708     afs_int32 *position; /* OUT */ {
709     DBHOLD(transPtr->dbase);
710     *fileid = transPtr->seekFile;
711     *position = transPtr->seekPos;
712     DBRELE(transPtr->dbase);
713     return 0;
714 }
715
716 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
717
718 int ubik_Truncate(transPtr, length)
719     register struct ubik_trans *transPtr;       /* in */
720     afs_int32 length;    /* in */ {
721     afs_int32 code, error=0;
722
723     /* Will also catch if not UBIK_WRITETRANS */
724     code = ubik_Flush(transPtr);
725     if (code) return(code);
726
727     DBHOLD(transPtr->dbase);
728     /* first, check that quorum is still good, and that dbase is up-to-date */
729     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
730        ERROR_EXIT(UNOQUORUM);
731     if (!ubeacon_AmSyncSite())
732        ERROR_EXIT(UNOTSYNC);
733     
734     /* now do the operation locally, and propagate it out */
735     code = udisk_truncate(transPtr, transPtr->seekFile, length);
736     if (!code) {
737         code = ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile, length);
738     }
739     if (code) {
740         /* we must abort the operation */
741         udisk_abort(transPtr);
742         ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
743         ERROR_EXIT(code);
744     }
745
746   error_exit:
747     DBRELE(transPtr->dbase);
748     return error;
749 }
750
751 /* set a lock; all locks are released on transaction end (commit/abort) */
752 ubik_SetLock(atrans, apos, alen, atype)
753     struct ubik_trans *atrans;
754     afs_int32 apos, alen;                  /* apos and alen are not used */
755     int atype; {
756     afs_int32 code=0, error=0;
757     
758     if (atype == LOCKWRITE) {
759        if (atrans->type == UBIK_READTRANS) return UBADTYPE;
760        code = ubik_Flush(atrans);
761        if (code) return(code);
762     }
763
764     DBHOLD(atrans->dbase);
765     if (atype == LOCKREAD) {
766         code = ulock_getLock(atrans, atype, 1);
767         if (code) ERROR_EXIT(code);
768     }
769     else {
770         /* first, check that quorum is still good, and that dbase is up-to-date */
771         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
772            ERROR_EXIT(UNOQUORUM);
773         if (!ubeacon_AmSyncSite())
774            ERROR_EXIT(UNOTSYNC);
775
776         /* now do the operation locally, and propagate it out */
777         code = ulock_getLock(atrans, atype, 1);
778         if (code == 0) {
779             code = ContactQuorum(DISK_Lock, atrans, 0, 0, 
780                                  1/*unused*/, 1/*unused*/, LOCKWRITE);
781         }
782         if (code) {
783             /* we must abort the operation */
784             udisk_abort(atrans);
785             ContactQuorum(DISK_Abort, atrans, 0);    /* force aborts to the others */
786             ERROR_EXIT(code);
787         }
788     }
789
790   error_exit:
791     DBRELE(atrans->dbase);
792     return error;
793 }
794
795 /* utility to wait for a version # to change */
796 int ubik_WaitVersion(adatabase, aversion)
797 register struct ubik_version *aversion;
798 register struct ubik_dbase *adatabase; {
799     while (1) {
800         /* wait until version # changes, and then return */
801         if (vcmp(*aversion, adatabase->version) != 0)
802             return 0;
803         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
804     }
805 }
806
807 /* utility to get the version of the dbase a transaction is dealing with */
808 int ubik_GetVersion(atrans, avers)
809 register struct ubik_trans *atrans;
810 register struct ubik_version *avers; {
811     *avers = atrans->dbase->version;
812     return 0;
813 }
814
815 /* Facility to simplify database caching.  Returns zero if last trans was done
816    on the local server and was successful.  If return value is non-zero and the
817    caller is a server caching part of the Ubik database, it should invalidate
818    that cache.  A return value of -1 means bad (NULL) argument. */
819
820 int ubik_CacheUpdate (atrans)
821   register struct ubik_trans *atrans;
822 {
823     if (!(atrans && atrans->dbase)) return -1;
824     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
825 }
826
827 int panic(a, b, c, d)
828     char *a, *b, *c, *d;
829 {
830     ubik_print("Ubik PANIC: ");
831     ubik_print(a, b, c, d);
832     abort();
833     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
834     exit(1);                        /* never know, though  */
835 }
836
837 /*
838 ** This functions takes an IP addresses as its parameter. It returns the
839 ** the primary IP address that is on the host passed in.
840 */
841 afs_uint32
842 ubikGetPrimaryInterfaceAddr(addr)
843 afs_uint32  addr;                       /* network byte order */
844 {
845         struct ubik_server *ts;
846         int j;
847
848         for ( ts=ubik_servers; ts; ts=ts->next )
849             for ( j=0; j < UBIK_MAX_INTERFACE_ADDR; j++)
850                 if ( ts->addr[j] == addr )
851                     return  ts->addr[0];        /* net byte order */
852         return 0; /* if not in server database, return error */
853 }
854