ubik-pause-collapsing-20020624
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #else
19 #include <sys/file.h>
20 #include <netinet/in.h>
21 #include <sys/param.h>
22 #endif
23 #include <time.h>
24 #include <lock.h>
25 #ifdef HAVE_STRING_H
26 #include <string.h>
27 #else
28 #ifdef HAVE_STRINGS_H
29 #include <strings.h>
30 #endif
31 #endif
32 #include <rx/xdr.h>
33 #include <rx/rx.h>
34 #include <afs/cellconfig.h>
35
36 #define UBIK_INTERNALS
37 #include "ubik.h"
38 #include "ubik_int.h"
39
40 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
41
42 /*  This system is organized in a hierarchical set of related modules.  Modules
43     at one level can only call modules at the same level or below.
44     
45     At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
46     operating system primitives.
47     
48     At the next level (1) we have
49
50         VOTER--The module responsible for casting votes when asked.  It is also
51         responsible for determining whether this server should try to become
52         a synchronization site.
53         
54         BEACONER--The module responsible for sending keep-alives out when a
55         server is actually the sync site, or trying to become a sync site.
56         
57         DISK--The module responsible for representing atomic transactions
58         on the local disk.  It maintains a new-value only log.
59         
60         LOCK--The module responsible for locking byte ranges in the database file.
61         
62     At the next level (2) we have
63           
64         RECOVERY--The module responsible for ensuring that all members of a quorum
65         have the same up-to-date database after a new synchronization site is
66         elected.  This module runs only on the synchronization site.
67         
68     At the next level (3) we have
69     
70         REMOTE--The module responsible for interpreting requests from the sync
71         site and applying them to the database, after obtaining the appropriate
72         locks.
73         
74     At the next level (4) we have
75     
76         UBIK--The module users call to perform operations on the database.
77 */
78
79
80 /* some globals */
81 afs_int32 ubik_quorum=0;
82 struct ubik_dbase *ubik_dbase=0;
83 struct ubik_stats ubik_stats;
84 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
85 afs_int32 ubik_epochTime = 0;
86 afs_int32 urecovery_state = 0;
87 int (*ubik_SRXSecurityProc)();
88 char *ubik_SRXSecurityRock;
89 struct ubik_server *ubik_servers;
90 short ubik_callPortal;
91
92 static int BeginTrans();
93
94 struct rx_securityClass *ubik_sc[3];
95
96 /* perform an operation at a quorum, handling error conditions.  return 0 if
97     all worked, otherwise mark failing server as down and return UERROR
98
99     Note that if any server misses an update, we must wait BIGTIME seconds before
100     allowing the transaction to commit, to ensure that the missing and possibly still
101     functioning server times out and stop handing out old data.  This is done in the commit
102     code, where we wait for a server marked down to have stayed down for BIGTIME seconds
103     before we allow a transaction to commit.  A server that fails but comes back up won't give
104     out old data because it is sent the sync count along with the beacon message that
105     marks it as *really* up (beaconSinceDown).
106 */
107 #define CStampVersion       1       /* meaning set ts->version */
108 afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5)
109     int (*aproc)();
110     int aflags;
111     register struct ubik_trans *atrans;
112     long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5; {
113     register struct ubik_server *ts;
114     register afs_int32 code;
115     afs_int32 rcode, okcalls;
116
117     rcode = 0;
118     okcalls = 0;
119     for(ts = ubik_servers; ts; ts=ts->next) {
120         /* for each server */
121         if (!ts->up || !ts->currentDB) {
122             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
123             continue;    /* not up-to-date, don't bother */
124         }
125         code = (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5);
126         if ( (aproc == DISK_WriteV) && (code <= -450) && (code > -500) ) {
127            /* An RPC interface mismatch (as defined in comerr/error_msg.c).
128             * Un-bulk the entries and do individual DISK_Write calls
129             * instead of DISK_WriteV.
130             */
131            iovec_wrt         *iovec_infoP = (iovec_wrt *)aparm0;
132            iovec_buf         *iovec_dataP = (iovec_buf *)aparm1;
133            struct ubik_iovec *iovec       = (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
134            char              *iobuf       = (char *)iovec_dataP->iovec_buf_val;
135            bulkdata          tcbs;
136            afs_int32             i, offset;
137
138            for (i=0, offset=0; i<iovec_infoP->iovec_wrt_len; i++) {
139               /* Sanity check for going off end of buffer */
140               if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
141                  code = UINTERNAL;
142                  break;
143               }
144               tcbs.bulkdata_len = iovec[i].length;
145               tcbs.bulkdata_val = &iobuf[offset];
146               code = DISK_Write(ts->disk_rxcid, &atrans->tid,
147                                 iovec[i].file, iovec[i].position, &tcbs);
148               if (code) break;
149
150               offset += iovec[i].length;
151            }
152         }
153         if (code) {         /* failure */
154             rcode = code;
155             ts->up = 0;     /* mark as down now; beacons will no longer be sent */
156             ts->currentDB = 0;
157             ts->beaconSinceDown = 0;
158             urecovery_LostServer(); /* tell recovery to try to resend dbase later */
159         } else {            /* success */
160             if (!ts->isClone)
161                 okcalls++;          /* count up how many worked */
162             if (aflags & CStampVersion) {
163                 ts->version = atrans->dbase->version;
164             }
165         }
166     }
167     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
168     if (okcalls+1 >= ubik_quorum) return 0;
169     else return rcode;
170 }
171
172 /* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
173
174     Note that the host named by myHost should not also be listed in serverList.
175 */
176
177 int ubik_ServerInitByInfo(myHost, myPort, info, clones, pathName, dbase)
178     struct afsconf_cell *info;  /* in */
179     char clones[];
180     afs_int32 myHost;
181     short myPort;
182     char *pathName;     /* in */
183     struct ubik_dbase **dbase; /* out */ 
184 {
185      afs_int32 code;
186     
187      code = ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName, dbase);
188      return code;
189 }
190
191 int ubik_ServerInit(myHost, myPort, serverList, pathName, dbase)
192     afs_int32 serverList[];    /* in */
193     afs_int32 myHost;
194     short myPort;
195     char *pathName;     /* in */
196     struct ubik_dbase **dbase; /* out */ 
197 {
198      afs_int32 code;
199     
200      code = ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
201                         serverList, pathName, dbase);
202      return code;
203 }
204
205 int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, dbase)
206     afs_int32 myHost;
207     short myPort;
208     struct afsconf_cell *info;  /* in */
209     char clones[];
210     afs_int32 serverList[];    /* in */
211     char *pathName;     /* in */
212     struct ubik_dbase **dbase; /* out */ 
213 {
214     register struct ubik_dbase *tdb;
215     register afs_int32 code;
216     PROCESS junk;
217     afs_int32 secIndex;
218     struct rx_securityClass *secClass;
219
220     struct rx_service *tservice;
221     extern struct rx_securityClass *rxnull_NewServerSecurityObject();
222     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
223     extern void rx_ServerProc();
224     extern int rx_stackSize;
225
226     initialize_U_error_table();
227
228     tdb = (struct ubik_dbase *) malloc(sizeof(struct ubik_dbase));
229     tdb->pathName = (char *) malloc(strlen(pathName)+1);
230     strcpy(tdb->pathName, pathName);
231     tdb->activeTrans = (struct ubik_trans *) 0;
232     memset(&tdb->version, 0, sizeof(struct ubik_version));
233     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
234     Lock_Init(&tdb->versionLock);
235     tdb->flags = 0;
236     tdb->read = uphys_read;
237     tdb->write = uphys_write;
238     tdb->truncate = uphys_truncate;
239     tdb->open = 0;  /* this function isn't used any more */
240     tdb->sync = uphys_sync;
241     tdb->stat = uphys_stat;
242     tdb->getlabel = uphys_getlabel;
243     tdb->setlabel = uphys_setlabel;
244     tdb->getnfiles = uphys_getnfiles;
245     tdb->readers=0;
246     tdb->tidCounter=tdb->writeTidCounter=0;
247     *dbase = tdb;
248     ubik_dbase = tdb;   /* for now, only one db per server; can fix later when we have names for the other dbases */
249
250     /* initialize RX */
251     ubik_callPortal = myPort;
252     /* try to get an additional security object */
253     ubik_sc[0] = rxnull_NewServerSecurityObject();
254     ubik_sc[1] = 0;
255     ubik_sc[2] = 0;
256     if (ubik_SRXSecurityProc) {
257         code = (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex);
258         if (code == 0) {
259             ubik_sc[secIndex] = secClass;
260         }
261     }
262     code = rx_Init(myPort);
263     if (code < 0) return code;
264     tservice = rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3, VOTE_ExecuteRequest);
265     if (tservice == (struct rx_service *)0) {
266         ubik_dprint("Could not create VOTE rx service!\n");
267         return -1;
268     }
269     rx_SetMinProcs(tservice, 2);
270     rx_SetMaxProcs(tservice, 3);
271
272     tservice = rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3, DISK_ExecuteRequest);
273     if (tservice == (struct rx_service *)0) {
274         ubik_dprint("Could not create DISK rx service!\n");
275         return -1;
276     }
277     rx_SetMinProcs(tservice, 2);
278     rx_SetMaxProcs(tservice, 3);
279
280     /* start an rx_ServerProc to handle incoming RPC's in particular the 
281      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
282      * the "steplock" problem in ubik initialization. Defect 11037.
283      */
284     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
285                       0, "rx_ServerProc", &junk);
286
287     /* do basic initialization */
288     code = uvote_Init();
289     if (code) return code;
290     code = urecovery_Initialize(tdb);
291     if (code) return code;
292     if (info)
293         code = ubeacon_InitServerListByInfo(myHost, info, clones);
294     else 
295         code = ubeacon_InitServerList(myHost, serverList);
296     if (code) return code;
297
298     /* now start up async processes */
299     code = LWP_CreateProcess(ubeacon_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
300                              0, "beacon", &junk);
301     if (code) return code;
302     code = LWP_CreateProcess(urecovery_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
303                              0, "recovery", &junk);
304     return code;
305 }
306
307 /*  This routine begins a read or write transaction on the transaction
308     identified by transPtr, in the dbase named by dbase.  An open mode of
309     ubik_READTRANS identifies this as a read transaction, while a mode of
310     ubik_WRITETRANS identifies this as a write transaction.  transPtr 
311     is set to the returned transaction control block. The readAny flag is
312     set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
313     ubik_BeginTransReadAny() below.
314
315     We can only begin transaction when we have an up-to-date database.
316 */
317
318 static int BeginTrans(dbase, transMode, transPtr, readAny)
319     register struct ubik_dbase *dbase;  /* in */
320     int readAny;
321     afs_int32 transMode; /* in */
322     struct ubik_trans **transPtr;       /* out */ {
323     struct ubik_trans *jt;
324     register struct ubik_trans *tt;
325     register afs_int32 code;
326 #if defined(UBIK_PAUSE)
327     int count;
328 #endif /* UBIK_PAUSE */
329
330     if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
331     DBHOLD(dbase);
332 #if defined(UBIK_PAUSE)
333     /* if we're polling the slave sites, wait until the returns
334      *  are all in.  Otherwise, the urecovery_CheckTid call may
335      *  glitch us. 
336      */
337     if (transMode == UBIK_WRITETRANS)
338         for (count = 75; dbase->flags & DBVOTING; --count) {
339             DBRELE(dbase);
340 #ifdef GRAND_PAUSE_DEBUGGING
341             if (count==75)
342                 fprintf (stderr,"%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n", time(0), ntohs(ubik_callPortal)); 
343             else
344 #endif
345                 if (count <= 0) {
346 #if 1
347                     fprintf (stderr,"%ld: myport=%d: BeginTrans failed because of voting conflict\n", time(0), ntohs(ubik_callPortal));
348 #endif
349                     return UNOQUORUM;   /* a white lie */
350                 }
351             IOMGR_Sleep(2);
352             DBHOLD(dbase);
353         }
354 #endif /* UBIK_PAUSE */
355     if (urecovery_AllBetter(dbase, readAny)==0) {
356         DBRELE(dbase);
357         return UNOQUORUM;
358     }
359     /* otherwise we have a quorum, use it */
360
361     /* make sure that at most one write transaction occurs at any one time.  This
362         has nothing to do with transaction locking; that's enforced by the lock package.  However,
363         we can't even handle two non-conflicting writes, since our log and recovery modules
364         don't know how to restore one without possibly picking up some data from the other. */
365     if (transMode == UBIK_WRITETRANS) {
366         /* if we're writing already, wait */
367         while(dbase->flags & DBWRITING) {
368             DBRELE(dbase);
369             LWP_WaitProcess(&dbase->flags);
370             DBHOLD(dbase);
371         }
372         if (!ubeacon_AmSyncSite()) {
373             DBRELE(dbase);
374             return UNOTSYNC;
375         }
376     }
377
378     /* create the transaction */
379     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
380     tt = jt;        /* move to a register */
381     if (code || tt == (struct ubik_trans *)NULL) {
382         DBRELE(dbase);
383         return code;
384     }
385     if (readAny) tt->flags |= TRREADANY;
386     /* label trans and dbase with new tid */
387     tt->tid.epoch = ubik_epochTime;
388     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
389     tt->tid.counter = (dbase->tidCounter += 2);
390
391     if (transMode == UBIK_WRITETRANS) {
392       /* for a write trans, we have to keep track of the write tid counter too */
393 #if defined(UBIK_PAUSE)
394       dbase->writeTidCounter = tt->tid.counter;
395 #else
396       dbase->writeTidCounter += 2;
397 #endif /* UBIK_PAUSE */
398
399         /* next try to start transaction on appropriate number of machines */
400         code = ContactQuorum(DISK_Begin, tt, 0);
401         if (code) {
402             /* we must abort the operation */
403             udisk_abort(tt);
404             ContactQuorum(DISK_Abort, tt, 0);    /* force aborts to the others */
405             udisk_end(tt);
406             DBRELE(dbase);
407             return code;
408         }
409     }
410
411     *transPtr = tt;
412     DBRELE(dbase);
413     return 0;
414 }
415
416 int ubik_BeginTrans(dbase, transMode, transPtr)
417     register struct ubik_dbase *dbase;  /* in */
418     afs_int32 transMode; /* in */
419     struct ubik_trans **transPtr;       /* out */ {
420         return BeginTrans(dbase, transMode, transPtr, 0);
421 }
422
423 int ubik_BeginTransReadAny(dbase, transMode, transPtr)
424     register struct ubik_dbase *dbase;  /* in */
425     afs_int32 transMode; /* in */
426     struct ubik_trans **transPtr;       /* out */ {
427         return BeginTrans(dbase, transMode, transPtr, 1);
428
429  
430 /* this routine ends a read or write transaction by aborting it */
431 int ubik_AbortTrans(transPtr)
432     register struct ubik_trans *transPtr; /* in */ {
433     register afs_int32 code;
434     afs_int32 code2;
435     register struct ubik_dbase *dbase;
436     
437     dbase = transPtr->dbase;
438     DBHOLD(dbase);
439     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
440     /* see if we're still up-to-date */
441     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
442         udisk_abort(transPtr);
443         udisk_end(transPtr);
444         DBRELE(dbase);
445         return UNOQUORUM;
446     }
447
448     if (transPtr->type == UBIK_READTRANS) {
449         code = udisk_abort(transPtr);
450         udisk_end(transPtr);
451         DBRELE(dbase);
452         return code;
453     }
454
455     /* below here, we know we're doing a write transaction */
456     if (!ubeacon_AmSyncSite()) {
457         udisk_abort(transPtr);
458         udisk_end(transPtr);
459         DBRELE(dbase);
460         return UNOTSYNC;
461     }
462     
463     /* now it is safe to try remote abort */
464     code = ContactQuorum(DISK_Abort, transPtr, 0);
465     code2 = udisk_abort(transPtr);
466     udisk_end(transPtr);
467     DBRELE(dbase);
468     return (code? code : code2);
469 }
470
471 /* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
472 int ubik_EndTrans(transPtr)
473     register struct ubik_trans *transPtr; /* in */ {
474     register afs_int32 code;
475     struct timeval tv;
476     afs_int32 realStart;
477     register struct ubik_server *ts;
478     afs_int32 now;
479     register struct ubik_dbase *dbase;
480     
481     if (transPtr->type == UBIK_WRITETRANS) {
482        code = ubik_Flush(transPtr);
483        if (code) {
484           ubik_AbortTrans(transPtr);
485           return(code);
486        }
487     }
488
489     dbase = transPtr->dbase;
490     DBHOLD(dbase);
491     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
492
493     /* give up if no longer current */
494     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
495         udisk_abort(transPtr);
496         udisk_end(transPtr);
497         DBRELE(dbase);
498         return UNOQUORUM;
499     }
500
501     if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
502         code = udisk_commit(transPtr);
503         if (code == 0) goto success;    /* update cachedVersion correctly */
504         udisk_end(transPtr);
505         DBRELE(dbase);
506         return code;
507     }
508
509     if (!ubeacon_AmSyncSite()) {    /* no longer sync site */
510         udisk_abort(transPtr);
511         udisk_end(transPtr);
512         DBRELE(dbase);
513         return UNOTSYNC;
514     }
515     
516     /* now it is safe to do commit */
517     code = udisk_commit(transPtr);
518     if (code == 0) code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
519     if (code) {
520         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
521             Note that we don't know if this transaction will eventually commit at this point.
522             If it made it to a site that will be present in the next quorum, we win, otherwise
523             we lose.  If we contact a majority of sites, then we won't be here: contacting
524             a majority guarantees commit, since it guarantees that one dude will be a
525             member of the next quorum. */
526         ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
527         udisk_end(transPtr);
528         DBRELE(dbase);
529         return code;
530     }
531     /* before we can start sending unlock messages, we must wait until all servers
532         that are possibly still functioning on the other side of a network partition
533         have timed out.  Check the server structures, compute how long to wait, then
534         start the unlocks */
535     realStart = FT_ApproxTime();
536     while (1) {
537         /* wait for all servers to time out */
538         code = 0;
539         now = FT_ApproxTime();
540         /* check if we're still sync site, the guy should either come up
541             to us, or timeout.  Put safety check in anyway */
542         if (now - realStart > 10 * BIGTIME) {
543             ubik_stats.escapes++;
544             ubik_print("ubik escaping from commit wait\n");
545             break;
546         }
547         for(ts = ubik_servers; ts; ts=ts->next) {
548             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
549                 /* this guy could have some damaged data, wait for him */
550                 code = 1;
551                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
552                 tv.tv_usec = 0;
553                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
554                 break;
555             }
556         }
557         if (code == 0) break;       /* no down ones still pseudo-active */
558     }
559
560     /* finally, unlock all the dudes.  We can return success independent of the number of servers
561         that really unlock the dbase; the others will do it if/when they elect a new sync site.
562         The transaction is committed anyway, since we succeeded in contacting a quorum
563         at the start (when invoking the DiskCommit function).
564     */
565     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
566
567   success:
568     udisk_end(transPtr);
569     /* update version on successful EndTrans */
570     memcpy(&dbase->cachedVersion, &dbase->version, sizeof(struct ubik_version));
571     
572     DBRELE(dbase);
573     return 0;
574 }
575
576 /* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
577
578 int ubik_Read(transPtr, buffer, length)
579     register struct ubik_trans *transPtr;       /* in */
580     char *buffer;   /* in */
581     afs_int32 length;   /* in */ {
582     register afs_int32 code;
583
584     /* reads are easy to do: handle locally */
585     DBHOLD(transPtr->dbase);
586     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
587         DBRELE(transPtr->dbase);
588         return UNOQUORUM;
589     }
590     
591     code = udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos, length);
592     if (code == 0) {
593         transPtr->seekPos += length;
594     }
595     DBRELE(transPtr->dbase);
596     return code;
597 }
598
599 /* This routine will flush the io data in the iovec structures. It first
600  * flushes to the local disk and then uses ContactQuorum to write it to 
601  * the other servers.
602  */
603 int ubik_Flush(transPtr)
604     struct ubik_trans *transPtr;
605 {
606     afs_int32 code, error=0;
607
608     if (transPtr->type != UBIK_WRITETRANS)
609        return UBADTYPE;
610     if (!transPtr->iovec_info.iovec_wrt_len || !transPtr->iovec_info.iovec_wrt_val)
611        return 0;
612
613     DBHOLD(transPtr->dbase);
614     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
615        ERROR_EXIT(UNOQUORUM);
616     if (!ubeacon_AmSyncSite())      /* only sync site can write */
617        ERROR_EXIT(UNOTSYNC);
618
619     /* Update the rest of the servers in the quorum */
620     code = ContactQuorum(DISK_WriteV, transPtr, 0, 
621                          &transPtr->iovec_info, &transPtr->iovec_data);
622     if (code) {
623        udisk_abort(transPtr);
624        ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
625        transPtr->iovec_info.iovec_wrt_len = 0;
626        transPtr->iovec_data.iovec_buf_len = 0;
627        ERROR_EXIT(code);
628     }
629
630     /* Wrote the buffers out, so start at scratch again */
631     transPtr->iovec_info.iovec_wrt_len = 0;
632     transPtr->iovec_data.iovec_buf_len = 0;
633
634   error_exit:
635     DBRELE(transPtr->dbase);
636     return error;
637 }
638
639 int ubik_Write(transPtr, buffer, length)
640     register struct ubik_trans *transPtr;       /* in */
641     char *buffer;       /* in */
642     afs_int32 length;   /* in */
643 {
644     struct ubik_iovec *iovec;
645     afs_int32 code, error=0;
646     afs_int32 pos, len, size;
647
648     if (transPtr->type != UBIK_WRITETRANS)
649        return UBADTYPE;
650     if (!length)
651        return 0;
652
653     if (length > IOVEC_MAXBUF) {
654        for (pos=0, len=length; len>0; len-=size, pos+=size) {
655           size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
656           code = ubik_Write(transPtr, &buffer[pos], size);
657           if (code) return (code);
658        }
659        return 0;
660     }
661
662     if (!transPtr->iovec_info.iovec_wrt_val) {
663        transPtr->iovec_info.iovec_wrt_len  = 0;
664        transPtr->iovec_info.iovec_wrt_val  = 
665          (struct ubik_iovec *)malloc(IOVEC_MAXWRT*sizeof(struct ubik_iovec));
666        transPtr->iovec_data.iovec_buf_len = 0;
667        transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
668        if (!transPtr->iovec_info.iovec_wrt_val || !transPtr->iovec_data.iovec_buf_val) {
669           if (transPtr->iovec_info.iovec_wrt_val) free(transPtr->iovec_info.iovec_wrt_val);
670           transPtr->iovec_info.iovec_wrt_val = 0;
671           if (transPtr->iovec_data.iovec_buf_val) free(transPtr->iovec_data.iovec_buf_val);
672           transPtr->iovec_data.iovec_buf_val = 0;
673           return UNOMEM;
674        }
675     }
676
677     /* If this write won't fit in the structure, then flush it out and start anew */
678     if ( (transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT) ||
679          ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF) ) {
680        code = ubik_Flush(transPtr);
681        if (code) return (code);
682     }
683
684     DBHOLD(transPtr->dbase);
685     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
686        ERROR_EXIT(UNOQUORUM);
687     if (!ubeacon_AmSyncSite())      /* only sync site can write */
688        ERROR_EXIT(UNOTSYNC);
689
690     /* Write to the local disk */
691     code = udisk_write(transPtr, transPtr->seekFile, buffer, 
692                                  transPtr->seekPos, length);
693     if (code) {
694        udisk_abort(transPtr);
695        transPtr->iovec_info.iovec_wrt_len = 0;
696        transPtr->iovec_data.iovec_buf_len = 0;
697        DBRELE(transPtr->dbase);
698        return(code);
699     }
700
701     /* Collect writes for the other ubik servers (to be done in bulk) */
702     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
703     iovec[transPtr->iovec_info.iovec_wrt_len].file     = transPtr->seekFile;
704     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
705     iovec[transPtr->iovec_info.iovec_wrt_len].length   = length;
706     
707     memcpy(&transPtr->iovec_data.iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
708
709     transPtr->iovec_info.iovec_wrt_len++;
710     transPtr->iovec_data.iovec_buf_len += length;
711     transPtr->seekPos += length;
712
713   error_exit:
714     DBRELE(transPtr->dbase);
715     return error;
716 }
717
718 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
719
720 int ubik_Seek(transPtr, fileid, position)
721     register struct ubik_trans *transPtr;       /* IN */
722     afs_int32 fileid;    /* IN */
723     afs_int32 position;  /* IN */ {
724     register afs_int32 code;
725
726     DBHOLD(transPtr->dbase);
727     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
728         code = UNOQUORUM;
729     } else  {
730         transPtr->seekFile = fileid;
731         transPtr->seekPos = position;
732         code = 0;
733     }
734     DBRELE(transPtr->dbase);
735     return code;
736 }
737
738 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
739
740 int ubik_Tell(transPtr, fileid, position)
741     register struct ubik_trans *transPtr;       /* IN */
742     afs_int32 *fileid;   /* OUT */
743     afs_int32 *position; /* OUT */ {
744     DBHOLD(transPtr->dbase);
745     *fileid = transPtr->seekFile;
746     *position = transPtr->seekPos;
747     DBRELE(transPtr->dbase);
748     return 0;
749 }
750
751 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
752
753 int ubik_Truncate(transPtr, length)
754     register struct ubik_trans *transPtr;       /* in */
755     afs_int32 length;    /* in */ {
756     afs_int32 code, error=0;
757
758     /* Will also catch if not UBIK_WRITETRANS */
759     code = ubik_Flush(transPtr);
760     if (code) return(code);
761
762     DBHOLD(transPtr->dbase);
763     /* first, check that quorum is still good, and that dbase is up-to-date */
764     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
765        ERROR_EXIT(UNOQUORUM);
766     if (!ubeacon_AmSyncSite())
767        ERROR_EXIT(UNOTSYNC);
768     
769     /* now do the operation locally, and propagate it out */
770     code = udisk_truncate(transPtr, transPtr->seekFile, length);
771     if (!code) {
772         code = ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile, length);
773     }
774     if (code) {
775         /* we must abort the operation */
776         udisk_abort(transPtr);
777         ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
778         ERROR_EXIT(code);
779     }
780
781   error_exit:
782     DBRELE(transPtr->dbase);
783     return error;
784 }
785
786 /* set a lock; all locks are released on transaction end (commit/abort) */
787 ubik_SetLock(atrans, apos, alen, atype)
788     struct ubik_trans *atrans;
789     afs_int32 apos, alen;                  /* apos and alen are not used */
790     int atype; {
791     afs_int32 code=0, error=0;
792     
793     if (atype == LOCKWRITE) {
794        if (atrans->type == UBIK_READTRANS) return UBADTYPE;
795        code = ubik_Flush(atrans);
796        if (code) return(code);
797     }
798
799     DBHOLD(atrans->dbase);
800     if (atype == LOCKREAD) {
801         code = ulock_getLock(atrans, atype, 1);
802         if (code) ERROR_EXIT(code);
803     }
804     else {
805         /* first, check that quorum is still good, and that dbase is up-to-date */
806         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
807            ERROR_EXIT(UNOQUORUM);
808         if (!ubeacon_AmSyncSite())
809            ERROR_EXIT(UNOTSYNC);
810
811         /* now do the operation locally, and propagate it out */
812         code = ulock_getLock(atrans, atype, 1);
813         if (code == 0) {
814             code = ContactQuorum(DISK_Lock, atrans, 0, 0, 
815                                  1/*unused*/, 1/*unused*/, LOCKWRITE);
816         }
817         if (code) {
818             /* we must abort the operation */
819             udisk_abort(atrans);
820             ContactQuorum(DISK_Abort, atrans, 0);    /* force aborts to the others */
821             ERROR_EXIT(code);
822         }
823     }
824
825   error_exit:
826     DBRELE(atrans->dbase);
827     return error;
828 }
829
830 /* utility to wait for a version # to change */
831 int ubik_WaitVersion(adatabase, aversion)
832 register struct ubik_version *aversion;
833 register struct ubik_dbase *adatabase; {
834     while (1) {
835         /* wait until version # changes, and then return */
836         if (vcmp(*aversion, adatabase->version) != 0)
837             return 0;
838         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
839     }
840 }
841
842 /* utility to get the version of the dbase a transaction is dealing with */
843 int ubik_GetVersion(atrans, avers)
844 register struct ubik_trans *atrans;
845 register struct ubik_version *avers; {
846     *avers = atrans->dbase->version;
847     return 0;
848 }
849
850 /* Facility to simplify database caching.  Returns zero if last trans was done
851    on the local server and was successful.  If return value is non-zero and the
852    caller is a server caching part of the Ubik database, it should invalidate
853    that cache.  A return value of -1 means bad (NULL) argument. */
854
855 int ubik_CacheUpdate (atrans)
856   register struct ubik_trans *atrans;
857 {
858     if (!(atrans && atrans->dbase)) return -1;
859     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
860 }
861
862 int panic(a, b, c, d)
863     char *a, *b, *c, *d;
864 {
865     ubik_print("Ubik PANIC: ");
866     ubik_print(a, b, c, d);
867     abort();
868     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
869     exit(1);                        /* never know, though  */
870 }
871
872 /*
873 ** This functions takes an IP addresses as its parameter. It returns the
874 ** the primary IP address that is on the host passed in.
875 */
876 afs_uint32
877 ubikGetPrimaryInterfaceAddr(addr)
878 afs_uint32  addr;                       /* network byte order */
879 {
880         struct ubik_server *ts;
881         int j;
882
883         for ( ts=ubik_servers; ts; ts=ts->next )
884             for ( j=0; j < UBIK_MAX_INTERFACE_ADDR; j++)
885                 if ( ts->addr[j] == addr )
886                     return  ts->addr[0];        /* net byte order */
887         return 0; /* if not in server database, return error */
888 }
889