pull-prototypes-to-head-20020821
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #else
19 #include <sys/file.h>
20 #include <netinet/in.h>
21 #include <sys/param.h>
22 #endif
23 #include <time.h>
24 #include <lock.h>
25 #ifdef HAVE_STRING_H
26 #include <string.h>
27 #else
28 #ifdef HAVE_STRINGS_H
29 #include <strings.h>
30 #endif
31 #endif
32 #include <rx/xdr.h>
33 #include <rx/rx.h>
34 #include <afs/cellconfig.h>
35
36 #define UBIK_INTERNALS
37 #include "ubik.h"
38 #include "ubik_int.h"
39
40 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
41
42 /*  This system is organized in a hierarchical set of related modules.  Modules
43     at one level can only call modules at the same level or below.
44     
45     At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
46     operating system primitives.
47     
48     At the next level (1) we have
49
50         VOTER--The module responsible for casting votes when asked.  It is also
51         responsible for determining whether this server should try to become
52         a synchronization site.
53         
54         BEACONER--The module responsible for sending keep-alives out when a
55         server is actually the sync site, or trying to become a sync site.
56         
57         DISK--The module responsible for representing atomic transactions
58         on the local disk.  It maintains a new-value only log.
59         
60         LOCK--The module responsible for locking byte ranges in the database file.
61         
62     At the next level (2) we have
63           
64         RECOVERY--The module responsible for ensuring that all members of a quorum
65         have the same up-to-date database after a new synchronization site is
66         elected.  This module runs only on the synchronization site.
67         
68     At the next level (3) we have
69     
70         REMOTE--The module responsible for interpreting requests from the sync
71         site and applying them to the database, after obtaining the appropriate
72         locks.
73         
74     At the next level (4) we have
75     
76         UBIK--The module users call to perform operations on the database.
77 */
78
79
80 /* some globals */
81 afs_int32 ubik_quorum=0;
82 struct ubik_dbase *ubik_dbase=0;
83 struct ubik_stats ubik_stats;
84 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
85 afs_int32 ubik_epochTime = 0;
86 afs_int32 urecovery_state = 0;
87 int (*ubik_SRXSecurityProc)();
88 char *ubik_SRXSecurityRock;
89 struct ubik_server *ubik_servers;
90 short ubik_callPortal;
91
92 static int BeginTrans();
93
94 struct rx_securityClass *ubik_sc[3];
95
96 /* perform an operation at a quorum, handling error conditions.  return 0 if
97     all worked, otherwise mark failing server as down and return UERROR
98
99     Note that if any server misses an update, we must wait BIGTIME seconds before
100     allowing the transaction to commit, to ensure that the missing and possibly still
101     functioning server times out and stop handing out old data.  This is done in the commit
102     code, where we wait for a server marked down to have stayed down for BIGTIME seconds
103     before we allow a transaction to commit.  A server that fails but comes back up won't give
104     out old data because it is sent the sync count along with the beacon message that
105     marks it as *really* up (beaconSinceDown).
106 */
107 #define CStampVersion       1       /* meaning set ts->version */
108 afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5)
109     int (*aproc)();
110     int aflags;
111     register struct ubik_trans *atrans;
112     long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5; {
113     register struct ubik_server *ts;
114     register afs_int32 code;
115     afs_int32 rcode, okcalls;
116
117     rcode = 0;
118     okcalls = 0;
119     for(ts = ubik_servers; ts; ts=ts->next) {
120         /* for each server */
121         if (!ts->up || !ts->currentDB) {
122             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
123             continue;    /* not up-to-date, don't bother */
124         }
125         code = (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5);
126         if ( (aproc == DISK_WriteV) && (code <= -450) && (code > -500) ) {
127            /* An RPC interface mismatch (as defined in comerr/error_msg.c).
128             * Un-bulk the entries and do individual DISK_Write calls
129             * instead of DISK_WriteV.
130             */
131            iovec_wrt         *iovec_infoP = (iovec_wrt *)aparm0;
132            iovec_buf         *iovec_dataP = (iovec_buf *)aparm1;
133            struct ubik_iovec *iovec       = (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
134            char              *iobuf       = (char *)iovec_dataP->iovec_buf_val;
135            bulkdata          tcbs;
136            afs_int32             i, offset;
137
138            for (i=0, offset=0; i<iovec_infoP->iovec_wrt_len; i++) {
139               /* Sanity check for going off end of buffer */
140               if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
141                  code = UINTERNAL;
142                  break;
143               }
144               tcbs.bulkdata_len = iovec[i].length;
145               tcbs.bulkdata_val = &iobuf[offset];
146               code = DISK_Write(ts->disk_rxcid, &atrans->tid,
147                                 iovec[i].file, iovec[i].position, &tcbs);
148               if (code) break;
149
150               offset += iovec[i].length;
151            }
152         }
153         if (code) {         /* failure */
154             rcode = code;
155             ts->up = 0;     /* mark as down now; beacons will no longer be sent */
156             ts->currentDB = 0;
157             ts->beaconSinceDown = 0;
158             urecovery_LostServer(); /* tell recovery to try to resend dbase later */
159         } else {            /* success */
160             if (!ts->isClone)
161                 okcalls++;          /* count up how many worked */
162             if (aflags & CStampVersion) {
163                 ts->version = atrans->dbase->version;
164             }
165         }
166     }
167     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
168     if (okcalls+1 >= ubik_quorum) return 0;
169     else return rcode;
170 }
171
172 /* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
173
174     Note that the host named by myHost should not also be listed in serverList.
175 */
176
177 int ubik_ServerInitByInfo(myHost, myPort, info, clones, pathName, dbase)
178     struct afsconf_cell *info;  /* in */
179     char clones[];
180     afs_int32 myHost;
181     short myPort;
182     char *pathName;     /* in */
183     struct ubik_dbase **dbase; /* out */ 
184 {
185      afs_int32 code;
186     
187      code = ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName, dbase);
188      return code;
189 }
190
191 int ubik_ServerInit(myHost, myPort, serverList, pathName, dbase)
192     afs_int32 serverList[];    /* in */
193     afs_int32 myHost;
194     short myPort;
195     char *pathName;     /* in */
196     struct ubik_dbase **dbase; /* out */ 
197 {
198      afs_int32 code;
199     
200      code = ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
201                         serverList, pathName, dbase);
202      return code;
203 }
204
205 int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, dbase)
206     afs_int32 myHost;
207     short myPort;
208     struct afsconf_cell *info;  /* in */
209     char clones[];
210     afs_int32 serverList[];    /* in */
211     char *pathName;     /* in */
212     struct ubik_dbase **dbase; /* out */ 
213 {
214     register struct ubik_dbase *tdb;
215     register afs_int32 code;
216     PROCESS junk;
217     afs_int32 secIndex;
218     struct rx_securityClass *secClass;
219
220     struct rx_service *tservice;
221     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
222     extern void rx_ServerProc();
223     extern int rx_stackSize;
224
225     initialize_U_error_table();
226
227     tdb = (struct ubik_dbase *) malloc(sizeof(struct ubik_dbase));
228     tdb->pathName = (char *) malloc(strlen(pathName)+1);
229     strcpy(tdb->pathName, pathName);
230     tdb->activeTrans = (struct ubik_trans *) 0;
231     memset(&tdb->version, 0, sizeof(struct ubik_version));
232     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
233     Lock_Init(&tdb->versionLock);
234     tdb->flags = 0;
235     tdb->read = uphys_read;
236     tdb->write = uphys_write;
237     tdb->truncate = uphys_truncate;
238     tdb->open = 0;  /* this function isn't used any more */
239     tdb->sync = uphys_sync;
240     tdb->stat = uphys_stat;
241     tdb->getlabel = uphys_getlabel;
242     tdb->setlabel = uphys_setlabel;
243     tdb->getnfiles = uphys_getnfiles;
244     tdb->readers=0;
245     tdb->tidCounter=tdb->writeTidCounter=0;
246     *dbase = tdb;
247     ubik_dbase = tdb;   /* for now, only one db per server; can fix later when we have names for the other dbases */
248
249     /* initialize RX */
250     ubik_callPortal = myPort;
251     /* try to get an additional security object */
252     ubik_sc[0] = rxnull_NewServerSecurityObject();
253     ubik_sc[1] = 0;
254     ubik_sc[2] = 0;
255     if (ubik_SRXSecurityProc) {
256         code = (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex);
257         if (code == 0) {
258             ubik_sc[secIndex] = secClass;
259         }
260     }
261     code = rx_Init(myPort);
262     if (code < 0) return code;
263     tservice = rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3, VOTE_ExecuteRequest);
264     if (tservice == (struct rx_service *)0) {
265         ubik_dprint("Could not create VOTE rx service!\n");
266         return -1;
267     }
268     rx_SetMinProcs(tservice, 2);
269     rx_SetMaxProcs(tservice, 3);
270
271     tservice = rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3, DISK_ExecuteRequest);
272     if (tservice == (struct rx_service *)0) {
273         ubik_dprint("Could not create DISK rx service!\n");
274         return -1;
275     }
276     rx_SetMinProcs(tservice, 2);
277     rx_SetMaxProcs(tservice, 3);
278
279     /* start an rx_ServerProc to handle incoming RPC's in particular the 
280      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
281      * the "steplock" problem in ubik initialization. Defect 11037.
282      */
283     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
284                       0, "rx_ServerProc", &junk);
285
286     /* do basic initialization */
287     code = uvote_Init();
288     if (code) return code;
289     code = urecovery_Initialize(tdb);
290     if (code) return code;
291     if (info)
292         code = ubeacon_InitServerListByInfo(myHost, info, clones);
293     else 
294         code = ubeacon_InitServerList(myHost, serverList);
295     if (code) return code;
296
297     /* now start up async processes */
298     code = LWP_CreateProcess(ubeacon_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
299                              0, "beacon", &junk);
300     if (code) return code;
301     code = LWP_CreateProcess(urecovery_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
302                              0, "recovery", &junk);
303     return code;
304 }
305
306 /*  This routine begins a read or write transaction on the transaction
307     identified by transPtr, in the dbase named by dbase.  An open mode of
308     ubik_READTRANS identifies this as a read transaction, while a mode of
309     ubik_WRITETRANS identifies this as a write transaction.  transPtr 
310     is set to the returned transaction control block. The readAny flag is
311     set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
312     ubik_BeginTransReadAny() below.
313
314     We can only begin transaction when we have an up-to-date database.
315 */
316
317 static int BeginTrans(dbase, transMode, transPtr, readAny)
318     register struct ubik_dbase *dbase;  /* in */
319     int readAny;
320     afs_int32 transMode; /* in */
321     struct ubik_trans **transPtr;       /* out */ {
322     struct ubik_trans *jt;
323     register struct ubik_trans *tt;
324     register afs_int32 code;
325 #if defined(UBIK_PAUSE)
326     int count;
327 #endif /* UBIK_PAUSE */
328
329     if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
330     DBHOLD(dbase);
331 #if defined(UBIK_PAUSE)
332     /* if we're polling the slave sites, wait until the returns
333      *  are all in.  Otherwise, the urecovery_CheckTid call may
334      *  glitch us. 
335      */
336     if (transMode == UBIK_WRITETRANS)
337         for (count = 75; dbase->flags & DBVOTING; --count) {
338             DBRELE(dbase);
339 #ifdef GRAND_PAUSE_DEBUGGING
340             if (count==75)
341                 fprintf (stderr,"%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n", time(0), ntohs(ubik_callPortal)); 
342             else
343 #endif
344                 if (count <= 0) {
345 #if 1
346                     fprintf (stderr,"%ld: myport=%d: BeginTrans failed because of voting conflict\n", time(0), ntohs(ubik_callPortal));
347 #endif
348                     return UNOQUORUM;   /* a white lie */
349                 }
350             IOMGR_Sleep(2);
351             DBHOLD(dbase);
352         }
353 #endif /* UBIK_PAUSE */
354     if (urecovery_AllBetter(dbase, readAny)==0) {
355         DBRELE(dbase);
356         return UNOQUORUM;
357     }
358     /* otherwise we have a quorum, use it */
359
360     /* make sure that at most one write transaction occurs at any one time.  This
361         has nothing to do with transaction locking; that's enforced by the lock package.  However,
362         we can't even handle two non-conflicting writes, since our log and recovery modules
363         don't know how to restore one without possibly picking up some data from the other. */
364     if (transMode == UBIK_WRITETRANS) {
365         /* if we're writing already, wait */
366         while(dbase->flags & DBWRITING) {
367             DBRELE(dbase);
368             LWP_WaitProcess(&dbase->flags);
369             DBHOLD(dbase);
370         }
371         if (!ubeacon_AmSyncSite()) {
372             DBRELE(dbase);
373             return UNOTSYNC;
374         }
375     }
376
377     /* create the transaction */
378     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
379     tt = jt;        /* move to a register */
380     if (code || tt == (struct ubik_trans *)NULL) {
381         DBRELE(dbase);
382         return code;
383     }
384     if (readAny) tt->flags |= TRREADANY;
385     /* label trans and dbase with new tid */
386     tt->tid.epoch = ubik_epochTime;
387     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
388     tt->tid.counter = (dbase->tidCounter += 2);
389
390     if (transMode == UBIK_WRITETRANS) {
391       /* for a write trans, we have to keep track of the write tid counter too */
392 #if defined(UBIK_PAUSE)
393       dbase->writeTidCounter = tt->tid.counter;
394 #else
395       dbase->writeTidCounter += 2;
396 #endif /* UBIK_PAUSE */
397
398         /* next try to start transaction on appropriate number of machines */
399         code = ContactQuorum(DISK_Begin, tt, 0);
400         if (code) {
401             /* we must abort the operation */
402             udisk_abort(tt);
403             ContactQuorum(DISK_Abort, tt, 0);    /* force aborts to the others */
404             udisk_end(tt);
405             DBRELE(dbase);
406             return code;
407         }
408     }
409
410     *transPtr = tt;
411     DBRELE(dbase);
412     return 0;
413 }
414
415 int ubik_BeginTrans(dbase, transMode, transPtr)
416     register struct ubik_dbase *dbase;  /* in */
417     afs_int32 transMode; /* in */
418     struct ubik_trans **transPtr;       /* out */ {
419         return BeginTrans(dbase, transMode, transPtr, 0);
420 }
421
422 int ubik_BeginTransReadAny(dbase, transMode, transPtr)
423     register struct ubik_dbase *dbase;  /* in */
424     afs_int32 transMode; /* in */
425     struct ubik_trans **transPtr;       /* out */ {
426         return BeginTrans(dbase, transMode, transPtr, 1);
427
428  
429 /* this routine ends a read or write transaction by aborting it */
430 int ubik_AbortTrans(transPtr)
431     register struct ubik_trans *transPtr; /* in */ {
432     register afs_int32 code;
433     afs_int32 code2;
434     register struct ubik_dbase *dbase;
435     
436     dbase = transPtr->dbase;
437     DBHOLD(dbase);
438     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
439     /* see if we're still up-to-date */
440     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
441         udisk_abort(transPtr);
442         udisk_end(transPtr);
443         DBRELE(dbase);
444         return UNOQUORUM;
445     }
446
447     if (transPtr->type == UBIK_READTRANS) {
448         code = udisk_abort(transPtr);
449         udisk_end(transPtr);
450         DBRELE(dbase);
451         return code;
452     }
453
454     /* below here, we know we're doing a write transaction */
455     if (!ubeacon_AmSyncSite()) {
456         udisk_abort(transPtr);
457         udisk_end(transPtr);
458         DBRELE(dbase);
459         return UNOTSYNC;
460     }
461     
462     /* now it is safe to try remote abort */
463     code = ContactQuorum(DISK_Abort, transPtr, 0);
464     code2 = udisk_abort(transPtr);
465     udisk_end(transPtr);
466     DBRELE(dbase);
467     return (code? code : code2);
468 }
469
470 /* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
471 int ubik_EndTrans(transPtr)
472     register struct ubik_trans *transPtr; /* in */ {
473     register afs_int32 code;
474     struct timeval tv;
475     afs_int32 realStart;
476     register struct ubik_server *ts;
477     afs_int32 now;
478     register struct ubik_dbase *dbase;
479     
480     if (transPtr->type == UBIK_WRITETRANS) {
481        code = ubik_Flush(transPtr);
482        if (code) {
483           ubik_AbortTrans(transPtr);
484           return(code);
485        }
486     }
487
488     dbase = transPtr->dbase;
489     DBHOLD(dbase);
490     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
491
492     /* give up if no longer current */
493     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
494         udisk_abort(transPtr);
495         udisk_end(transPtr);
496         DBRELE(dbase);
497         return UNOQUORUM;
498     }
499
500     if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
501         code = udisk_commit(transPtr);
502         if (code == 0) goto success;    /* update cachedVersion correctly */
503         udisk_end(transPtr);
504         DBRELE(dbase);
505         return code;
506     }
507
508     if (!ubeacon_AmSyncSite()) {    /* no longer sync site */
509         udisk_abort(transPtr);
510         udisk_end(transPtr);
511         DBRELE(dbase);
512         return UNOTSYNC;
513     }
514     
515     /* now it is safe to do commit */
516     code = udisk_commit(transPtr);
517     if (code == 0) code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
518     if (code) {
519         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
520             Note that we don't know if this transaction will eventually commit at this point.
521             If it made it to a site that will be present in the next quorum, we win, otherwise
522             we lose.  If we contact a majority of sites, then we won't be here: contacting
523             a majority guarantees commit, since it guarantees that one dude will be a
524             member of the next quorum. */
525         ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
526         udisk_end(transPtr);
527         DBRELE(dbase);
528         return code;
529     }
530     /* before we can start sending unlock messages, we must wait until all servers
531         that are possibly still functioning on the other side of a network partition
532         have timed out.  Check the server structures, compute how long to wait, then
533         start the unlocks */
534     realStart = FT_ApproxTime();
535     while (1) {
536         /* wait for all servers to time out */
537         code = 0;
538         now = FT_ApproxTime();
539         /* check if we're still sync site, the guy should either come up
540             to us, or timeout.  Put safety check in anyway */
541         if (now - realStart > 10 * BIGTIME) {
542             ubik_stats.escapes++;
543             ubik_print("ubik escaping from commit wait\n");
544             break;
545         }
546         for(ts = ubik_servers; ts; ts=ts->next) {
547             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
548                 /* this guy could have some damaged data, wait for him */
549                 code = 1;
550                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
551                 tv.tv_usec = 0;
552                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
553                 break;
554             }
555         }
556         if (code == 0) break;       /* no down ones still pseudo-active */
557     }
558
559     /* finally, unlock all the dudes.  We can return success independent of the number of servers
560         that really unlock the dbase; the others will do it if/when they elect a new sync site.
561         The transaction is committed anyway, since we succeeded in contacting a quorum
562         at the start (when invoking the DiskCommit function).
563     */
564     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
565
566   success:
567     udisk_end(transPtr);
568     /* update version on successful EndTrans */
569     memcpy(&dbase->cachedVersion, &dbase->version, sizeof(struct ubik_version));
570     
571     DBRELE(dbase);
572     return 0;
573 }
574
575 /* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
576
577 int ubik_Read(transPtr, buffer, length)
578     register struct ubik_trans *transPtr;       /* in */
579     char *buffer;   /* in */
580     afs_int32 length;   /* in */ {
581     register afs_int32 code;
582
583     /* reads are easy to do: handle locally */
584     DBHOLD(transPtr->dbase);
585     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
586         DBRELE(transPtr->dbase);
587         return UNOQUORUM;
588     }
589     
590     code = udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos, length);
591     if (code == 0) {
592         transPtr->seekPos += length;
593     }
594     DBRELE(transPtr->dbase);
595     return code;
596 }
597
598 /* This routine will flush the io data in the iovec structures. It first
599  * flushes to the local disk and then uses ContactQuorum to write it to 
600  * the other servers.
601  */
602 int ubik_Flush(transPtr)
603     struct ubik_trans *transPtr;
604 {
605     afs_int32 code, error=0;
606
607     if (transPtr->type != UBIK_WRITETRANS)
608        return UBADTYPE;
609     if (!transPtr->iovec_info.iovec_wrt_len || !transPtr->iovec_info.iovec_wrt_val)
610        return 0;
611
612     DBHOLD(transPtr->dbase);
613     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
614        ERROR_EXIT(UNOQUORUM);
615     if (!ubeacon_AmSyncSite())      /* only sync site can write */
616        ERROR_EXIT(UNOTSYNC);
617
618     /* Update the rest of the servers in the quorum */
619     code = ContactQuorum(DISK_WriteV, transPtr, 0, 
620                          &transPtr->iovec_info, &transPtr->iovec_data);
621     if (code) {
622        udisk_abort(transPtr);
623        ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
624        transPtr->iovec_info.iovec_wrt_len = 0;
625        transPtr->iovec_data.iovec_buf_len = 0;
626        ERROR_EXIT(code);
627     }
628
629     /* Wrote the buffers out, so start at scratch again */
630     transPtr->iovec_info.iovec_wrt_len = 0;
631     transPtr->iovec_data.iovec_buf_len = 0;
632
633   error_exit:
634     DBRELE(transPtr->dbase);
635     return error;
636 }
637
638 int ubik_Write(transPtr, buffer, length)
639     register struct ubik_trans *transPtr;       /* in */
640     char *buffer;       /* in */
641     afs_int32 length;   /* in */
642 {
643     struct ubik_iovec *iovec;
644     afs_int32 code, error=0;
645     afs_int32 pos, len, size;
646
647     if (transPtr->type != UBIK_WRITETRANS)
648        return UBADTYPE;
649     if (!length)
650        return 0;
651
652     if (length > IOVEC_MAXBUF) {
653        for (pos=0, len=length; len>0; len-=size, pos+=size) {
654           size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
655           code = ubik_Write(transPtr, &buffer[pos], size);
656           if (code) return (code);
657        }
658        return 0;
659     }
660
661     if (!transPtr->iovec_info.iovec_wrt_val) {
662        transPtr->iovec_info.iovec_wrt_len  = 0;
663        transPtr->iovec_info.iovec_wrt_val  = 
664          (struct ubik_iovec *)malloc(IOVEC_MAXWRT*sizeof(struct ubik_iovec));
665        transPtr->iovec_data.iovec_buf_len = 0;
666        transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
667        if (!transPtr->iovec_info.iovec_wrt_val || !transPtr->iovec_data.iovec_buf_val) {
668           if (transPtr->iovec_info.iovec_wrt_val) free(transPtr->iovec_info.iovec_wrt_val);
669           transPtr->iovec_info.iovec_wrt_val = 0;
670           if (transPtr->iovec_data.iovec_buf_val) free(transPtr->iovec_data.iovec_buf_val);
671           transPtr->iovec_data.iovec_buf_val = 0;
672           return UNOMEM;
673        }
674     }
675
676     /* If this write won't fit in the structure, then flush it out and start anew */
677     if ( (transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT) ||
678          ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF) ) {
679        code = ubik_Flush(transPtr);
680        if (code) return (code);
681     }
682
683     DBHOLD(transPtr->dbase);
684     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
685        ERROR_EXIT(UNOQUORUM);
686     if (!ubeacon_AmSyncSite())      /* only sync site can write */
687        ERROR_EXIT(UNOTSYNC);
688
689     /* Write to the local disk */
690     code = udisk_write(transPtr, transPtr->seekFile, buffer, 
691                                  transPtr->seekPos, length);
692     if (code) {
693        udisk_abort(transPtr);
694        transPtr->iovec_info.iovec_wrt_len = 0;
695        transPtr->iovec_data.iovec_buf_len = 0;
696        DBRELE(transPtr->dbase);
697        return(code);
698     }
699
700     /* Collect writes for the other ubik servers (to be done in bulk) */
701     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
702     iovec[transPtr->iovec_info.iovec_wrt_len].file     = transPtr->seekFile;
703     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
704     iovec[transPtr->iovec_info.iovec_wrt_len].length   = length;
705     
706     memcpy(&transPtr->iovec_data.iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
707
708     transPtr->iovec_info.iovec_wrt_len++;
709     transPtr->iovec_data.iovec_buf_len += length;
710     transPtr->seekPos += length;
711
712   error_exit:
713     DBRELE(transPtr->dbase);
714     return error;
715 }
716
717 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
718
719 int ubik_Seek(transPtr, fileid, position)
720     register struct ubik_trans *transPtr;       /* IN */
721     afs_int32 fileid;    /* IN */
722     afs_int32 position;  /* IN */ {
723     register afs_int32 code;
724
725     DBHOLD(transPtr->dbase);
726     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
727         code = UNOQUORUM;
728     } else  {
729         transPtr->seekFile = fileid;
730         transPtr->seekPos = position;
731         code = 0;
732     }
733     DBRELE(transPtr->dbase);
734     return code;
735 }
736
737 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
738
739 int ubik_Tell(transPtr, fileid, position)
740     register struct ubik_trans *transPtr;       /* IN */
741     afs_int32 *fileid;   /* OUT */
742     afs_int32 *position; /* OUT */ {
743     DBHOLD(transPtr->dbase);
744     *fileid = transPtr->seekFile;
745     *position = transPtr->seekPos;
746     DBRELE(transPtr->dbase);
747     return 0;
748 }
749
750 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
751
752 int ubik_Truncate(transPtr, length)
753     register struct ubik_trans *transPtr;       /* in */
754     afs_int32 length;    /* in */ {
755     afs_int32 code, error=0;
756
757     /* Will also catch if not UBIK_WRITETRANS */
758     code = ubik_Flush(transPtr);
759     if (code) return(code);
760
761     DBHOLD(transPtr->dbase);
762     /* first, check that quorum is still good, and that dbase is up-to-date */
763     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
764        ERROR_EXIT(UNOQUORUM);
765     if (!ubeacon_AmSyncSite())
766        ERROR_EXIT(UNOTSYNC);
767     
768     /* now do the operation locally, and propagate it out */
769     code = udisk_truncate(transPtr, transPtr->seekFile, length);
770     if (!code) {
771         code = ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile, length);
772     }
773     if (code) {
774         /* we must abort the operation */
775         udisk_abort(transPtr);
776         ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
777         ERROR_EXIT(code);
778     }
779
780   error_exit:
781     DBRELE(transPtr->dbase);
782     return error;
783 }
784
785 /* set a lock; all locks are released on transaction end (commit/abort) */
786 ubik_SetLock(atrans, apos, alen, atype)
787     struct ubik_trans *atrans;
788     afs_int32 apos, alen;                  /* apos and alen are not used */
789     int atype; {
790     afs_int32 code=0, error=0;
791     
792     if (atype == LOCKWRITE) {
793        if (atrans->type == UBIK_READTRANS) return UBADTYPE;
794        code = ubik_Flush(atrans);
795        if (code) return(code);
796     }
797
798     DBHOLD(atrans->dbase);
799     if (atype == LOCKREAD) {
800         code = ulock_getLock(atrans, atype, 1);
801         if (code) ERROR_EXIT(code);
802     }
803     else {
804         /* first, check that quorum is still good, and that dbase is up-to-date */
805         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
806            ERROR_EXIT(UNOQUORUM);
807         if (!ubeacon_AmSyncSite())
808            ERROR_EXIT(UNOTSYNC);
809
810         /* now do the operation locally, and propagate it out */
811         code = ulock_getLock(atrans, atype, 1);
812         if (code == 0) {
813             code = ContactQuorum(DISK_Lock, atrans, 0, 0, 
814                                  1/*unused*/, 1/*unused*/, LOCKWRITE);
815         }
816         if (code) {
817             /* we must abort the operation */
818             udisk_abort(atrans);
819             ContactQuorum(DISK_Abort, atrans, 0);    /* force aborts to the others */
820             ERROR_EXIT(code);
821         }
822     }
823
824   error_exit:
825     DBRELE(atrans->dbase);
826     return error;
827 }
828
829 /* utility to wait for a version # to change */
830 int ubik_WaitVersion(adatabase, aversion)
831 register struct ubik_version *aversion;
832 register struct ubik_dbase *adatabase; {
833     while (1) {
834         /* wait until version # changes, and then return */
835         if (vcmp(*aversion, adatabase->version) != 0)
836             return 0;
837         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
838     }
839 }
840
841 /* utility to get the version of the dbase a transaction is dealing with */
842 int ubik_GetVersion(atrans, avers)
843 register struct ubik_trans *atrans;
844 register struct ubik_version *avers; {
845     *avers = atrans->dbase->version;
846     return 0;
847 }
848
849 /* Facility to simplify database caching.  Returns zero if last trans was done
850    on the local server and was successful.  If return value is non-zero and the
851    caller is a server caching part of the Ubik database, it should invalidate
852    that cache.  A return value of -1 means bad (NULL) argument. */
853
854 int ubik_CacheUpdate (atrans)
855   register struct ubik_trans *atrans;
856 {
857     if (!(atrans && atrans->dbase)) return -1;
858     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
859 }
860
861 int panic(a, b, c, d)
862     char *a, *b, *c, *d;
863 {
864     ubik_print("Ubik PANIC: ");
865     ubik_print(a, b, c, d);
866     abort();
867     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
868     exit(1);                        /* never know, though  */
869 }
870
871 /*
872 ** This functions takes an IP addresses as its parameter. It returns the
873 ** the primary IP address that is on the host passed in.
874 */
875 afs_uint32
876 ubikGetPrimaryInterfaceAddr(addr)
877 afs_uint32  addr;                       /* network byte order */
878 {
879         struct ubik_server *ts;
880         int j;
881
882         for ( ts=ubik_servers; ts; ts=ts->next )
883             for ( j=0; j < UBIK_MAX_INTERFACE_ADDR; j++)
884                 if ( ts->addr[j] == addr )
885                     return  ts->addr[0];        /* net byte order */
886         return 0; /* if not in server database, return error */
887 }
888