lwp-protoize-20080310
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #include <sys/param.h>
23 #endif
24 #include <time.h>
25 #include <lock.h>
26 #include <string.h>
27 #include <rx/xdr.h>
28 #include <rx/rx.h>
29 #include <afs/cellconfig.h>
30
31 #define UBIK_INTERNALS
32 #include "ubik.h"
33 #include "ubik_int.h"
34
35 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
36
37 /*  This system is organized in a hierarchical set of related modules.  Modules
38     at one level can only call modules at the same level or below.
39     
40     At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
41     operating system primitives.
42     
43     At the next level (1) we have
44
45         VOTER--The module responsible for casting votes when asked.  It is also
46         responsible for determining whether this server should try to become
47         a synchronization site.
48         
49         BEACONER--The module responsible for sending keep-alives out when a
50         server is actually the sync site, or trying to become a sync site.
51         
52         DISK--The module responsible for representing atomic transactions
53         on the local disk.  It maintains a new-value only log.
54         
55         LOCK--The module responsible for locking byte ranges in the database file.
56         
57     At the next level (2) we have
58           
59         RECOVERY--The module responsible for ensuring that all members of a quorum
60         have the same up-to-date database after a new synchronization site is
61         elected.  This module runs only on the synchronization site.
62         
63     At the next level (3) we have
64     
65         REMOTE--The module responsible for interpreting requests from the sync
66         site and applying them to the database, after obtaining the appropriate
67         locks.
68         
69     At the next level (4) we have
70     
71         UBIK--The module users call to perform operations on the database.
72 */
73
74
75 /* some globals */
76 afs_int32 ubik_quorum = 0;
77 struct ubik_dbase *ubik_dbase = 0;
78 struct ubik_stats ubik_stats;
79 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
80 afs_int32 ubik_epochTime = 0;
81 afs_int32 urecovery_state = 0;
82 int (*ubik_SRXSecurityProc) ();
83 char *ubik_SRXSecurityRock;
84 struct ubik_server *ubik_servers;
85 short ubik_callPortal;
86
87 static int BeginTrans();
88
89 struct rx_securityClass *ubik_sc[3];
90
91 /* perform an operation at a quorum, handling error conditions.  return 0 if
92     all worked, otherwise mark failing server as down and return UERROR
93
94     Note that if any server misses an update, we must wait BIGTIME seconds before
95     allowing the transaction to commit, to ensure that the missing and possibly still
96     functioning server times out and stop handing out old data.  This is done in the commit
97     code, where we wait for a server marked down to have stayed down for BIGTIME seconds
98     before we allow a transaction to commit.  A server that fails but comes back up won't give
99     out old data because it is sent the sync count along with the beacon message that
100     marks it as *really* up (beaconSinceDown).
101 */
102 #define CStampVersion       1   /* meaning set ts->version */
103 afs_int32
104 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4,
105               aparm5)
106      int (*aproc) ();
107      int aflags;
108      register struct ubik_trans *atrans;
109      long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5;
110 {
111     register struct ubik_server *ts;
112     register afs_int32 code;
113     afs_int32 rcode, okcalls;
114
115     rcode = 0;
116     okcalls = 0;
117     for (ts = ubik_servers; ts; ts = ts->next) {
118         /* for each server */
119         if (!ts->up || !ts->currentDB) {
120             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
121             continue;           /* not up-to-date, don't bother */
122         }
123         code =
124             (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2,
125                       aparm3, aparm4, aparm5);
126         if ((aproc == DISK_WriteV) && (code <= -450) && (code > -500)) {
127             /* An RPC interface mismatch (as defined in comerr/error_msg.c).
128              * Un-bulk the entries and do individual DISK_Write calls
129              * instead of DISK_WriteV.
130              */
131             iovec_wrt *iovec_infoP = (iovec_wrt *) aparm0;
132             iovec_buf *iovec_dataP = (iovec_buf *) aparm1;
133             struct ubik_iovec *iovec =
134                 (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
135             char *iobuf = (char *)iovec_dataP->iovec_buf_val;
136             bulkdata tcbs;
137             afs_int32 i, offset;
138
139             for (i = 0, offset = 0; i < iovec_infoP->iovec_wrt_len; i++) {
140                 /* Sanity check for going off end of buffer */
141                 if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
142                     code = UINTERNAL;
143                     break;
144                 }
145                 tcbs.bulkdata_len = iovec[i].length;
146                 tcbs.bulkdata_val = &iobuf[offset];
147                 code =
148                     DISK_Write(ts->disk_rxcid, &atrans->tid, iovec[i].file,
149                                iovec[i].position, &tcbs);
150                 if (code)
151                     break;
152
153                 offset += iovec[i].length;
154             }
155         }
156         if (code) {             /* failure */
157             rcode = code;
158             ts->up = 0;         /* mark as down now; beacons will no longer be sent */
159             ts->currentDB = 0;
160             ts->beaconSinceDown = 0;
161             urecovery_LostServer();     /* tell recovery to try to resend dbase later */
162         } else {                /* success */
163             if (!ts->isClone)
164                 okcalls++;      /* count up how many worked */
165             if (aflags & CStampVersion) {
166                 ts->version = atrans->dbase->version;
167             }
168         }
169     }
170     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
171     if (okcalls + 1 >= ubik_quorum)
172         return 0;
173     else
174         return rcode;
175 }
176
177 /* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
178
179     Note that the host named by myHost should not also be listed in serverList.
180 */
181
182 int
183 ubik_ServerInitCommon(afs_int32 myHost, short myPort,
184                       struct afsconf_cell *info, char clones[],
185                       afs_int32 serverList[], char *pathName,
186                       struct ubik_dbase **dbase)
187 {
188     register struct ubik_dbase *tdb;
189     register afs_int32 code;
190     PROCESS junk;
191     afs_int32 secIndex;
192     struct rx_securityClass *secClass;
193
194     struct rx_service *tservice;
195     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
196     extern int rx_stackSize;
197
198     initialize_U_error_table();
199
200     tdb = (struct ubik_dbase *)malloc(sizeof(struct ubik_dbase));
201     tdb->pathName = (char *)malloc(strlen(pathName) + 1);
202     strcpy(tdb->pathName, pathName);
203     tdb->activeTrans = (struct ubik_trans *)0;
204     memset(&tdb->version, 0, sizeof(struct ubik_version));
205     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
206     Lock_Init(&tdb->versionLock);
207     tdb->flags = 0;
208     tdb->read = uphys_read;
209     tdb->write = uphys_write;
210     tdb->truncate = uphys_truncate;
211     tdb->open = 0;              /* this function isn't used any more */
212     tdb->sync = uphys_sync;
213     tdb->stat = uphys_stat;
214     tdb->getlabel = uphys_getlabel;
215     tdb->setlabel = uphys_setlabel;
216     tdb->getnfiles = uphys_getnfiles;
217     tdb->readers = 0;
218     tdb->tidCounter = tdb->writeTidCounter = 0;
219     *dbase = tdb;
220     ubik_dbase = tdb;           /* for now, only one db per server; can fix later when we have names for the other dbases */
221
222     /* initialize RX */
223     ubik_callPortal = myPort;
224     /* try to get an additional security object */
225     ubik_sc[0] = rxnull_NewServerSecurityObject();
226     ubik_sc[1] = 0;
227     ubik_sc[2] = 0;
228     if (ubik_SRXSecurityProc) {
229         code =
230             (*ubik_SRXSecurityProc) (ubik_SRXSecurityRock, &secClass,
231                                      &secIndex);
232         if (code == 0) {
233             ubik_sc[secIndex] = secClass;
234         }
235     }
236     /* for backwards compat this should keep working as it does now 
237        and not host bind */
238     code = rx_Init(myPort);
239     if (code < 0)
240         return code;
241     tservice =
242         rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3,
243                       VOTE_ExecuteRequest);
244     if (tservice == (struct rx_service *)0) {
245         ubik_dprint("Could not create VOTE rx service!\n");
246         return -1;
247     }
248     rx_SetMinProcs(tservice, 2);
249     rx_SetMaxProcs(tservice, 3);
250
251     tservice =
252         rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3,
253                       DISK_ExecuteRequest);
254     if (tservice == (struct rx_service *)0) {
255         ubik_dprint("Could not create DISK rx service!\n");
256         return -1;
257     }
258     rx_SetMinProcs(tservice, 2);
259     rx_SetMaxProcs(tservice, 3);
260
261     /* start an rx_ServerProc to handle incoming RPC's in particular the 
262      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
263      * the "steplock" problem in ubik initialization. Defect 11037.
264      */
265     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
266                       (void *)0, "rx_ServerProc", &junk);
267
268     /* do basic initialization */
269     code = uvote_Init();
270     if (code)
271         return code;
272     code = urecovery_Initialize(tdb);
273     if (code)
274         return code;
275     if (info)
276         code = ubeacon_InitServerListByInfo(myHost, info, clones);
277     else
278         code = ubeacon_InitServerList(myHost, serverList);
279     if (code)
280         return code;
281
282     /* now start up async processes */
283     code = LWP_CreateProcess(ubeacon_Interact, 16384 /*8192 */ ,
284                              LWP_MAX_PRIORITY - 1, (void *)0, "beacon",
285                              &junk);
286     if (code)
287         return code;
288     code = LWP_CreateProcess(urecovery_Interact, 16384 /*8192 */ ,
289                              LWP_MAX_PRIORITY - 1, (void *)0, "recovery",
290                              &junk);
291     return code;
292 }
293
294 int
295 ubik_ServerInitByInfo(afs_int32 myHost, short myPort,
296                       struct afsconf_cell *info, char clones[],
297                       char *pathName, struct ubik_dbase **dbase)
298 {
299     afs_int32 code;
300
301     code =
302         ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName,
303                               dbase);
304     return code;
305 }
306
307 int
308 ubik_ServerInit(afs_int32 myHost, short myPort, afs_int32 serverList[],
309                 char *pathName, struct ubik_dbase **dbase)
310 {
311     afs_int32 code;
312
313     code =
314         ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
315                               serverList, pathName, dbase);
316     return code;
317 }
318
319 /*  This routine begins a read or write transaction on the transaction
320     identified by transPtr, in the dbase named by dbase.  An open mode of
321     ubik_READTRANS identifies this as a read transaction, while a mode of
322     ubik_WRITETRANS identifies this as a write transaction.  transPtr 
323     is set to the returned transaction control block. The readAny flag is
324     set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
325     ubik_BeginTransReadAny() below.
326
327     We can only begin transaction when we have an up-to-date database.
328 */
329
330 static int
331 BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
332            struct ubik_trans **transPtr, int readAny)
333 {
334     struct ubik_trans *jt;
335     register struct ubik_trans *tt;
336     register afs_int32 code;
337 #if defined(UBIK_PAUSE)
338     int count;
339 #endif /* UBIK_PAUSE */
340
341     if ((transMode != UBIK_READTRANS) && readAny)
342         return UBADTYPE;
343     DBHOLD(dbase);
344 #if defined(UBIK_PAUSE)
345     /* if we're polling the slave sites, wait until the returns
346      *  are all in.  Otherwise, the urecovery_CheckTid call may
347      *  glitch us. 
348      */
349     if (transMode == UBIK_WRITETRANS)
350         for (count = 75; dbase->flags & DBVOTING; --count) {
351             DBRELE(dbase);
352 #ifdef GRAND_PAUSE_DEBUGGING
353             if (count == 75)
354                 fprintf(stderr,
355                         "%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n",
356                         time(0), ntohs(ubik_callPortal));
357             else
358 #endif
359             if (count <= 0) {
360 #if 1
361                 fprintf(stderr,
362                         "%ld: myport=%d: BeginTrans failed because of voting conflict\n",
363                         time(0), ntohs(ubik_callPortal));
364 #endif
365                 return UNOQUORUM;       /* a white lie */
366             }
367             IOMGR_Sleep(2);
368             DBHOLD(dbase);
369         }
370 #endif /* UBIK_PAUSE */
371     if (urecovery_AllBetter(dbase, readAny) == 0) {
372         DBRELE(dbase);
373         return UNOQUORUM;
374     }
375     /* otherwise we have a quorum, use it */
376
377     /* make sure that at most one write transaction occurs at any one time.  This
378      * has nothing to do with transaction locking; that's enforced by the lock package.  However,
379      * we can't even handle two non-conflicting writes, since our log and recovery modules
380      * don't know how to restore one without possibly picking up some data from the other. */
381     if (transMode == UBIK_WRITETRANS) {
382         /* if we're writing already, wait */
383         while (dbase->flags & DBWRITING) {
384             DBRELE(dbase);
385             LWP_WaitProcess(&dbase->flags);
386             DBHOLD(dbase);
387         }
388         if (!ubeacon_AmSyncSite()) {
389             DBRELE(dbase);
390             return UNOTSYNC;
391         }
392     }
393
394     /* create the transaction */
395     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
396     tt = jt;                    /* move to a register */
397     if (code || tt == (struct ubik_trans *)NULL) {
398         DBRELE(dbase);
399         return code;
400     }
401     if (readAny)
402         tt->flags |= TRREADANY;
403     /* label trans and dbase with new tid */
404     tt->tid.epoch = ubik_epochTime;
405     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
406     tt->tid.counter = (dbase->tidCounter += 2);
407
408     if (transMode == UBIK_WRITETRANS) {
409         /* for a write trans, we have to keep track of the write tid counter too */
410 #if defined(UBIK_PAUSE)
411         dbase->writeTidCounter = tt->tid.counter;
412 #else
413         dbase->writeTidCounter += 2;
414 #endif /* UBIK_PAUSE */
415
416         /* next try to start transaction on appropriate number of machines */
417         code = ContactQuorum(DISK_Begin, tt, 0);
418         if (code) {
419             /* we must abort the operation */
420             udisk_abort(tt);
421             ContactQuorum(DISK_Abort, tt, 0);   /* force aborts to the others */
422             udisk_end(tt);
423             DBRELE(dbase);
424             return code;
425         }
426     }
427
428     *transPtr = tt;
429     DBRELE(dbase);
430     return 0;
431 }
432
433 int
434 ubik_BeginTrans(register struct ubik_dbase *dbase, afs_int32 transMode,
435                 struct ubik_trans **transPtr)
436 {
437     return BeginTrans(dbase, transMode, transPtr, 0);
438 }
439
440 int
441 ubik_BeginTransReadAny(register struct ubik_dbase *dbase, afs_int32 transMode,
442                        struct ubik_trans **transPtr)
443 {
444     return BeginTrans(dbase, transMode, transPtr, 1);
445 }
446
447 /* this routine ends a read or write transaction by aborting it */
448 int
449 ubik_AbortTrans(register struct ubik_trans *transPtr)
450 {
451     register afs_int32 code;
452     afs_int32 code2;
453     register struct ubik_dbase *dbase;
454
455     dbase = transPtr->dbase;
456     DBHOLD(dbase);
457     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
458     /* see if we're still up-to-date */
459     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
460         udisk_abort(transPtr);
461         udisk_end(transPtr);
462         DBRELE(dbase);
463         return UNOQUORUM;
464     }
465
466     if (transPtr->type == UBIK_READTRANS) {
467         code = udisk_abort(transPtr);
468         udisk_end(transPtr);
469         DBRELE(dbase);
470         return code;
471     }
472
473     /* below here, we know we're doing a write transaction */
474     if (!ubeacon_AmSyncSite()) {
475         udisk_abort(transPtr);
476         udisk_end(transPtr);
477         DBRELE(dbase);
478         return UNOTSYNC;
479     }
480
481     /* now it is safe to try remote abort */
482     code = ContactQuorum(DISK_Abort, transPtr, 0);
483     code2 = udisk_abort(transPtr);
484     udisk_end(transPtr);
485     DBRELE(dbase);
486     return (code ? code : code2);
487 }
488
489 /* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
490 int
491 ubik_EndTrans(register struct ubik_trans *transPtr)
492 {
493     register afs_int32 code;
494     struct timeval tv;
495     afs_int32 realStart;
496     register struct ubik_server *ts;
497     afs_int32 now;
498     register struct ubik_dbase *dbase;
499
500     if (transPtr->type == UBIK_WRITETRANS) {
501         code = ubik_Flush(transPtr);
502         if (code) {
503             ubik_AbortTrans(transPtr);
504             return (code);
505         }
506     }
507
508     dbase = transPtr->dbase;
509     DBHOLD(dbase);
510     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
511
512     /* give up if no longer current */
513     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
514         udisk_abort(transPtr);
515         udisk_end(transPtr);
516         DBRELE(dbase);
517         return UNOQUORUM;
518     }
519
520     if (transPtr->type == UBIK_READTRANS) {     /* reads are easy */
521         code = udisk_commit(transPtr);
522         if (code == 0)
523             goto success;       /* update cachedVersion correctly */
524         udisk_end(transPtr);
525         DBRELE(dbase);
526         return code;
527     }
528
529     if (!ubeacon_AmSyncSite()) {        /* no longer sync site */
530         udisk_abort(transPtr);
531         udisk_end(transPtr);
532         DBRELE(dbase);
533         return UNOTSYNC;
534     }
535
536     /* now it is safe to do commit */
537     code = udisk_commit(transPtr);
538     if (code == 0)
539         code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
540     if (code) {
541         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
542          * Note that we don't know if this transaction will eventually commit at this point.
543          * If it made it to a site that will be present in the next quorum, we win, otherwise
544          * we lose.  If we contact a majority of sites, then we won't be here: contacting
545          * a majority guarantees commit, since it guarantees that one dude will be a
546          * member of the next quorum. */
547         ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
548         udisk_end(transPtr);
549         DBRELE(dbase);
550         return code;
551     }
552     /* before we can start sending unlock messages, we must wait until all servers
553      * that are possibly still functioning on the other side of a network partition
554      * have timed out.  Check the server structures, compute how long to wait, then
555      * start the unlocks */
556     realStart = FT_ApproxTime();
557     while (1) {
558         /* wait for all servers to time out */
559         code = 0;
560         now = FT_ApproxTime();
561         /* check if we're still sync site, the guy should either come up
562          * to us, or timeout.  Put safety check in anyway */
563         if (now - realStart > 10 * BIGTIME) {
564             ubik_stats.escapes++;
565             ubik_print("ubik escaping from commit wait\n");
566             break;
567         }
568         for (ts = ubik_servers; ts; ts = ts->next) {
569             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
570                 /* this guy could have some damaged data, wait for him */
571                 code = 1;
572                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
573                 tv.tv_usec = 0;
574                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
575                 break;
576             }
577         }
578         if (code == 0)
579             break;              /* no down ones still pseudo-active */
580     }
581
582     /* finally, unlock all the dudes.  We can return success independent of the number of servers
583      * that really unlock the dbase; the others will do it if/when they elect a new sync site.
584      * The transaction is committed anyway, since we succeeded in contacting a quorum
585      * at the start (when invoking the DiskCommit function).
586      */
587     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
588
589   success:
590     udisk_end(transPtr);
591     /* update version on successful EndTrans */
592     memcpy(&dbase->cachedVersion, &dbase->version,
593            sizeof(struct ubik_version));
594
595     DBRELE(dbase);
596     return 0;
597 }
598
599 /* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
600
601 int
602 ubik_Read(register struct ubik_trans *transPtr, char *buffer,
603           afs_int32 length)
604 {
605     register afs_int32 code;
606
607     /* reads are easy to do: handle locally */
608     DBHOLD(transPtr->dbase);
609     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
610         DBRELE(transPtr->dbase);
611         return UNOQUORUM;
612     }
613
614     code =
615         udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
616                    length);
617     if (code == 0) {
618         transPtr->seekPos += length;
619     }
620     DBRELE(transPtr->dbase);
621     return code;
622 }
623
624 /* This routine will flush the io data in the iovec structures. It first
625  * flushes to the local disk and then uses ContactQuorum to write it to 
626  * the other servers.
627  */
628 int
629 ubik_Flush(struct ubik_trans *transPtr)
630 {
631     afs_int32 code, error = 0;
632
633     if (transPtr->type != UBIK_WRITETRANS)
634         return UBADTYPE;
635     if (!transPtr->iovec_info.iovec_wrt_len
636         || !transPtr->iovec_info.iovec_wrt_val)
637         return 0;
638
639     DBHOLD(transPtr->dbase);
640     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
641         ERROR_EXIT(UNOQUORUM);
642     if (!ubeacon_AmSyncSite())  /* only sync site can write */
643         ERROR_EXIT(UNOTSYNC);
644
645     /* Update the rest of the servers in the quorum */
646     code =
647         ContactQuorum(DISK_WriteV, transPtr, 0, &transPtr->iovec_info,
648                       &transPtr->iovec_data);
649     if (code) {
650         udisk_abort(transPtr);
651         ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
652         transPtr->iovec_info.iovec_wrt_len = 0;
653         transPtr->iovec_data.iovec_buf_len = 0;
654         ERROR_EXIT(code);
655     }
656
657     /* Wrote the buffers out, so start at scratch again */
658     transPtr->iovec_info.iovec_wrt_len = 0;
659     transPtr->iovec_data.iovec_buf_len = 0;
660
661   error_exit:
662     DBRELE(transPtr->dbase);
663     return error;
664 }
665
666 int
667 ubik_Write(register struct ubik_trans *transPtr, char *buffer,
668            afs_int32 length)
669 {
670     struct ubik_iovec *iovec;
671     afs_int32 code, error = 0;
672     afs_int32 pos, len, size;
673
674     if (transPtr->type != UBIK_WRITETRANS)
675         return UBADTYPE;
676     if (!length)
677         return 0;
678
679     if (length > IOVEC_MAXBUF) {
680         for (pos = 0, len = length; len > 0; len -= size, pos += size) {
681             size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
682             code = ubik_Write(transPtr, &buffer[pos], size);
683             if (code)
684                 return (code);
685         }
686         return 0;
687     }
688
689     if (!transPtr->iovec_info.iovec_wrt_val) {
690         transPtr->iovec_info.iovec_wrt_len = 0;
691         transPtr->iovec_info.iovec_wrt_val =
692             (struct ubik_iovec *)malloc(IOVEC_MAXWRT *
693                                         sizeof(struct ubik_iovec));
694         transPtr->iovec_data.iovec_buf_len = 0;
695         transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
696         if (!transPtr->iovec_info.iovec_wrt_val
697             || !transPtr->iovec_data.iovec_buf_val) {
698             if (transPtr->iovec_info.iovec_wrt_val)
699                 free(transPtr->iovec_info.iovec_wrt_val);
700             transPtr->iovec_info.iovec_wrt_val = 0;
701             if (transPtr->iovec_data.iovec_buf_val)
702                 free(transPtr->iovec_data.iovec_buf_val);
703             transPtr->iovec_data.iovec_buf_val = 0;
704             return UNOMEM;
705         }
706     }
707
708     /* If this write won't fit in the structure, then flush it out and start anew */
709     if ((transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT)
710         || ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF)) {
711         code = ubik_Flush(transPtr);
712         if (code)
713             return (code);
714     }
715
716     DBHOLD(transPtr->dbase);
717     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
718         ERROR_EXIT(UNOQUORUM);
719     if (!ubeacon_AmSyncSite())  /* only sync site can write */
720         ERROR_EXIT(UNOTSYNC);
721
722     /* Write to the local disk */
723     code =
724         udisk_write(transPtr, transPtr->seekFile, buffer, transPtr->seekPos,
725                     length);
726     if (code) {
727         udisk_abort(transPtr);
728         transPtr->iovec_info.iovec_wrt_len = 0;
729         transPtr->iovec_data.iovec_buf_len = 0;
730         DBRELE(transPtr->dbase);
731         return (code);
732     }
733
734     /* Collect writes for the other ubik servers (to be done in bulk) */
735     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
736     iovec[transPtr->iovec_info.iovec_wrt_len].file = transPtr->seekFile;
737     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
738     iovec[transPtr->iovec_info.iovec_wrt_len].length = length;
739
740     memcpy(&transPtr->iovec_data.
741            iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
742
743     transPtr->iovec_info.iovec_wrt_len++;
744     transPtr->iovec_data.iovec_buf_len += length;
745     transPtr->seekPos += length;
746
747   error_exit:
748     DBRELE(transPtr->dbase);
749     return error;
750 }
751
752 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
753
754 int
755 ubik_Seek(register struct ubik_trans *transPtr, afs_int32 fileid,
756           afs_int32 position)
757 {
758     register afs_int32 code;
759
760     DBHOLD(transPtr->dbase);
761     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
762         code = UNOQUORUM;
763     } else {
764         transPtr->seekFile = fileid;
765         transPtr->seekPos = position;
766         code = 0;
767     }
768     DBRELE(transPtr->dbase);
769     return code;
770 }
771
772 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
773
774 int
775 ubik_Tell(register struct ubik_trans *transPtr, afs_int32 * fileid,
776           afs_int32 * position)
777 {
778     DBHOLD(transPtr->dbase);
779     *fileid = transPtr->seekFile;
780     *position = transPtr->seekPos;
781     DBRELE(transPtr->dbase);
782     return 0;
783 }
784
785 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
786
787 int
788 ubik_Truncate(register struct ubik_trans *transPtr, afs_int32 length)
789 {
790     afs_int32 code, error = 0;
791
792     /* Will also catch if not UBIK_WRITETRANS */
793     code = ubik_Flush(transPtr);
794     if (code)
795         return (code);
796
797     DBHOLD(transPtr->dbase);
798     /* first, check that quorum is still good, and that dbase is up-to-date */
799     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
800         ERROR_EXIT(UNOQUORUM);
801     if (!ubeacon_AmSyncSite())
802         ERROR_EXIT(UNOTSYNC);
803
804     /* now do the operation locally, and propagate it out */
805     code = udisk_truncate(transPtr, transPtr->seekFile, length);
806     if (!code) {
807         code =
808             ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile,
809                           length);
810     }
811     if (code) {
812         /* we must abort the operation */
813         udisk_abort(transPtr);
814         ContactQuorum(DISK_Abort, transPtr, 0); /* force aborts to the others */
815         ERROR_EXIT(code);
816     }
817
818   error_exit:
819     DBRELE(transPtr->dbase);
820     return error;
821 }
822
823 /* set a lock; all locks are released on transaction end (commit/abort) */
824 int
825 ubik_SetLock(struct ubik_trans *atrans, afs_int32 apos, afs_int32 alen,
826              int atype)
827 {
828     afs_int32 code = 0, error = 0;
829
830     if (atype == LOCKWRITE) {
831         if (atrans->type == UBIK_READTRANS)
832             return UBADTYPE;
833         code = ubik_Flush(atrans);
834         if (code)
835             return (code);
836     }
837
838     DBHOLD(atrans->dbase);
839     if (atype == LOCKREAD) {
840         code = ulock_getLock(atrans, atype, 1);
841         if (code)
842             ERROR_EXIT(code);
843     } else {
844         /* first, check that quorum is still good, and that dbase is up-to-date */
845         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
846             ERROR_EXIT(UNOQUORUM);
847         if (!ubeacon_AmSyncSite())
848             ERROR_EXIT(UNOTSYNC);
849
850         /* now do the operation locally, and propagate it out */
851         code = ulock_getLock(atrans, atype, 1);
852         if (code == 0) {
853             code = ContactQuorum(DISK_Lock, atrans, 0, 0, 1 /*unused */ ,
854                                  1 /*unused */ , LOCKWRITE);
855         }
856         if (code) {
857             /* we must abort the operation */
858             udisk_abort(atrans);
859             ContactQuorum(DISK_Abort, atrans, 0);       /* force aborts to the others */
860             ERROR_EXIT(code);
861         }
862     }
863
864   error_exit:
865     DBRELE(atrans->dbase);
866     return error;
867 }
868
869 /* utility to wait for a version # to change */
870 int
871 ubik_WaitVersion(register struct ubik_dbase *adatabase,
872                  register struct ubik_version *aversion)
873 {
874     while (1) {
875         /* wait until version # changes, and then return */
876         if (vcmp(*aversion, adatabase->version) != 0)
877             return 0;
878         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
879     }
880 }
881
882 /* utility to get the version of the dbase a transaction is dealing with */
883 int
884 ubik_GetVersion(register struct ubik_trans *atrans,
885                 register struct ubik_version *avers)
886 {
887     *avers = atrans->dbase->version;
888     return 0;
889 }
890
891 /* Facility to simplify database caching.  Returns zero if last trans was done
892    on the local server and was successful.  If return value is non-zero and the
893    caller is a server caching part of the Ubik database, it should invalidate
894    that cache.  A return value of -1 means bad (NULL) argument. */
895
896 int
897 ubik_CacheUpdate(register struct ubik_trans *atrans)
898 {
899     if (!(atrans && atrans->dbase))
900         return -1;
901     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
902 }
903
904 int
905 panic(char *a, char *b, char *c, char *d)
906 {
907     ubik_print("Ubik PANIC: ");
908     ubik_print(a, b, c, d);
909     abort();
910     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
911     exit(1);                    /* never know, though  */
912 }
913
914 /*
915 ** This functions takes an IP addresses as its parameter. It returns the
916 ** the primary IP address that is on the host passed in.
917 */
918 afs_uint32
919 ubikGetPrimaryInterfaceAddr(afs_uint32 addr)
920 {
921     struct ubik_server *ts;
922     int j;
923
924     for (ts = ubik_servers; ts; ts = ts->next)
925         for (j = 0; j < UBIK_MAX_INTERFACE_ADDR; j++)
926             if (ts->addr[j] == addr)
927                 return ts->addr[0];     /* net byte order */
928     return 0;                   /* if not in server database, return error */
929 }