4b6618dadfe418dea7b74cf3cacb24a84be935b9
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <sys/types.h>
12 #ifdef AFS_NT40_ENV
13 #include <winsock2.h>
14 #else
15 #include <sys/file.h>
16 #include <netinet/in.h>
17 #include <sys/param.h>
18 #endif
19 #include <time.h>
20 #include <lock.h>
21 #include <rx/xdr.h>
22 #include <rx/rx.h>
23
24 #define UBIK_INTERNALS
25 #include "ubik.h"
26 #include "ubik_int.h"
27
28 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
29
30 /*  This system is organized in a hierarchical set of related modules.  Modules
31     at one level can only call modules at the same level or below.
32     
33     At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
34     operating system primitives.
35     
36     At the next level (1) we have
37
38         VOTER--The module responsible for casting votes when asked.  It is also
39         responsible for determining whether this server should try to become
40         a synchronization site.
41         
42         BEACONER--The module responsible for sending keep-alives out when a
43         server is actually the sync site, or trying to become a sync site.
44         
45         DISK--The module responsible for representing atomic transactions
46         on the local disk.  It maintains a new-value only log.
47         
48         LOCK--The module responsible for locking byte ranges in the database file.
49         
50     At the next level (2) we have
51           
52         RECOVERY--The module responsible for ensuring that all members of a quorum
53         have the same up-to-date database after a new synchronization site is
54         elected.  This module runs only on the synchronization site.
55         
56     At the next level (3) we have
57     
58         REMOTE--The module responsible for interpreting requests from the sync
59         site and applying them to the database, after obtaining the appropriate
60         locks.
61         
62     At the next level (4) we have
63     
64         UBIK--The module users call to perform operations on the database.
65 */
66
67
68 /* some globals */
69 afs_int32 ubik_quorum=0;
70 struct ubik_dbase *ubik_dbase=0;
71 struct ubik_stats ubik_stats;
72 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
73 afs_int32 ubik_epochTime = 0;
74 afs_int32 urecovery_state = 0;
75 int (*ubik_SRXSecurityProc)();
76 char *ubik_SRXSecurityRock;
77 struct ubik_server *ubik_servers;
78 short ubik_callPortal;
79
80 static int BeginTrans();
81
82 struct rx_securityClass *ubik_sc[3];
83
84 /* perform an operation at a quorum, handling error conditions.  return 0 if
85     all worked, otherwise mark failing server as down and return UERROR
86
87     Note that if any server misses an update, we must wait BIGTIME seconds before
88     allowing the transaction to commit, to ensure that the missing and possibly still
89     functioning server times out and stop handing out old data.  This is done in the commit
90     code, where we wait for a server marked down to have stayed down for BIGTIME seconds
91     before we allow a transaction to commit.  A server that fails but comes back up won't give
92     out old data because it is sent the sync count along with the beacon message that
93     marks it as *really* up (beaconSinceDown).
94 */
95 #define CStampVersion       1       /* meaning set ts->version */
96 afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5)
97     int (*aproc)();
98     int aflags;
99     register struct ubik_trans *atrans;
100     long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5; {
101     register struct ubik_server *ts;
102     register afs_int32 code;
103     afs_int32 rcode, okcalls;
104
105     rcode = 0;
106     okcalls = 0;
107     for(ts = ubik_servers; ts; ts=ts->next) {
108         /* for each server */
109         if (!ts->up || !ts->currentDB) {
110             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
111             continue;    /* not up-to-date, don't bother */
112         }
113         code = (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5);
114         if ( (aproc == DISK_WriteV) && (code <= -450) && (code > -500) ) {
115            /* An RPC interface mismatch (as defined in comerr/error_msg.c).
116             * Un-bulk the entries and do individual DISK_Write calls
117             * instead of DISK_WriteV.
118             */
119            iovec_wrt         *iovec_infoP = (iovec_wrt *)aparm0;
120            iovec_buf         *iovec_dataP = (iovec_buf *)aparm1;
121            struct ubik_iovec *iovec       = (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
122            char              *iobuf       = (char *)iovec_dataP->iovec_buf_val;
123            bulkdata          tcbs;
124            afs_int32             i, offset;
125
126            for (i=0, offset=0; i<iovec_infoP->iovec_wrt_len; i++) {
127               /* Sanity check for going off end of buffer */
128               if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
129                  code = UINTERNAL;
130                  break;
131               }
132               tcbs.bulkdata_len = iovec[i].length;
133               tcbs.bulkdata_val = &iobuf[offset];
134               code = DISK_Write(ts->disk_rxcid, &atrans->tid,
135                                 iovec[i].file, iovec[i].position, &tcbs);
136               if (code) break;
137
138               offset += iovec[i].length;
139            }
140         }
141         if (code) {         /* failure */
142             rcode = code;
143             ts->up = 0;     /* mark as down now; beacons will no longer be sent */
144             ts->currentDB = 0;
145             ts->beaconSinceDown = 0;
146             urecovery_LostServer(); /* tell recovery to try to resend dbase later */
147         } else {            /* success */
148             okcalls++;      /* count up how many worked */
149             if (aflags & CStampVersion) {
150                 ts->version = atrans->dbase->version;
151             }
152         }
153     }
154     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
155     if (okcalls+1 >= ubik_quorum) return 0;
156     else return rcode;
157 }
158
159 /* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
160
161     Note that the host named by myHost should not also be listed in serverList.
162 */
163
164 int ubik_ServerInit(myHost, myPort, serverList, pathName, dbase)
165     afs_int32 myHost;
166     short myPort;
167     afs_int32 serverList[];    /* in */
168     char *pathName;     /* in */
169     struct ubik_dbase **dbase; /* out */ {
170     register struct ubik_dbase *tdb;
171     register afs_int32 code;
172     PROCESS junk;
173     afs_int32 secIndex;
174     struct rx_securityClass *secClass;
175
176     struct rx_service *tservice;
177     extern struct rx_securityClass *rxnull_NewServerSecurityObject();
178     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
179     extern void rx_ServerProc();
180     extern int rx_stackSize;
181
182     initialize_u_error_table();
183
184     tdb = (struct ubik_dbase *) malloc(sizeof(struct ubik_dbase));
185     tdb->pathName = (char *) malloc(strlen(pathName)+1);
186     strcpy(tdb->pathName, pathName);
187     tdb->activeTrans = (struct ubik_trans *) 0;
188     bzero(&tdb->version, sizeof(struct ubik_version));
189     bzero(&tdb->cachedVersion, sizeof(struct ubik_version));
190     Lock_Init(&tdb->versionLock);
191     tdb->flags = 0;
192     tdb->read = uphys_read;
193     tdb->write = uphys_write;
194     tdb->truncate = uphys_truncate;
195     tdb->open = 0;  /* this function isn't used any more */
196     tdb->sync = uphys_sync;
197     tdb->stat = uphys_stat;
198     tdb->getlabel = uphys_getlabel;
199     tdb->setlabel = uphys_setlabel;
200     tdb->getnfiles = uphys_getnfiles;
201     tdb->readers=0;
202     tdb->tidCounter=tdb->writeTidCounter=0;
203     *dbase = tdb;
204     ubik_dbase = tdb;   /* for now, only one db per server; can fix later when we have names for the other dbases */
205
206     /* initialize RX */
207     ubik_callPortal = myPort;
208     /* try to get an additional security object */
209     ubik_sc[0] = rxnull_NewServerSecurityObject();
210     ubik_sc[1] = 0;
211     ubik_sc[2] = 0;
212     if (ubik_SRXSecurityProc) {
213         code = (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex);
214         if (code == 0) {
215             ubik_sc[secIndex] = secClass;
216         }
217     }
218     code = rx_Init(myPort);
219     if (code < 0) return code;
220     tservice = rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3, VOTE_ExecuteRequest);
221     if (tservice == (struct rx_service *)0) {
222         ubik_dprint("Could not create VOTE rx service!\n");
223         return -1;
224     }
225     rx_SetMinProcs(tservice, 2);
226     rx_SetMaxProcs(tservice, 3);
227
228     tservice = rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3, DISK_ExecuteRequest);
229     if (tservice == (struct rx_service *)0) {
230         ubik_dprint("Could not create DISK rx service!\n");
231         return -1;
232     }
233     rx_SetMinProcs(tservice, 2);
234     rx_SetMaxProcs(tservice, 3);
235
236     /* start an rx_ServerProc to handle incoming RPC's in particular the 
237      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
238      * the "steplock" problem in ubik initialization. Defect 11037.
239      */
240     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
241                       0, "rx_ServerProc", &junk);
242
243     /* do basic initialization */
244     code = uvote_Init();
245     if (code) return code;
246     code = urecovery_Initialize(tdb);
247     if (code) return code;
248     code = ubeacon_InitServerList(myHost, serverList);
249     if (code) return code;
250
251     /* now start up async processes */
252     code = LWP_CreateProcess(ubeacon_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
253                              0, "beacon", &junk);
254     if (code) return code;
255     code = LWP_CreateProcess(urecovery_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
256                              0, "recovery", &junk);
257     return code;
258 }
259
260 /*  This routine begins a read or write transaction on the transaction
261     identified by transPtr, in the dbase named by dbase.  An open mode of
262     ubik_READTRANS identifies this as a read transaction, while a mode of
263     ubik_WRITETRANS identifies this as a write transaction.  transPtr 
264     is set to the returned transaction control block. The readAny flag is
265     set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
266     ubik_BeginTransReadAny() below.
267
268     We can only begin transaction when we have an up-to-date database.
269 */
270
271 static int BeginTrans(dbase, transMode, transPtr, readAny)
272     register struct ubik_dbase *dbase;  /* in */
273     int readAny;
274     afs_int32 transMode; /* in */
275     struct ubik_trans **transPtr;       /* out */ {
276     struct ubik_trans *jt;
277     register struct ubik_trans *tt;
278     register afs_int32 code;
279
280     if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
281     DBHOLD(dbase);
282     if (urecovery_AllBetter(dbase, readAny)==0) {
283         DBRELE(dbase);
284         return UNOQUORUM;
285     }
286     /* otherwise we have a quorum, use it */
287
288     /* make sure that at most one write transaction occurs at any one time.  This
289         has nothing to do with transaction locking; that's enforced by the lock package.  However,
290         we can't even handle two non-conflicting writes, since our log and recovery modules
291         don't know how to restore one without possibly picking up some data from the other. */
292     if (transMode == UBIK_WRITETRANS) {
293         /* if we're writing already, wait */
294         while(dbase->flags & DBWRITING) {
295             DBRELE(dbase);
296             LWP_WaitProcess(&dbase->flags);
297             DBHOLD(dbase);
298         }
299         if (!ubeacon_AmSyncSite()) {
300             DBRELE(dbase);
301             return UNOTSYNC;
302         }
303     }
304
305     /* create the transaction */
306     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
307     tt = jt;        /* move to a register */
308     if (code || tt == (struct ubik_trans *)NULL) {
309         DBRELE(dbase);
310         return code;
311     }
312     if (readAny) tt->flags |= TRREADANY;
313     /* label trans and dbase with new tid */
314     tt->tid.epoch = ubik_epochTime;
315     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
316     tt->tid.counter = (dbase->tidCounter += 2);
317
318     if (transMode == UBIK_WRITETRANS) {
319       /* for a write trans, we have to keep track of the write tid counter too */
320       dbase->writeTidCounter += 2;
321
322         /* next try to start transaction on appropriate number of machines */
323         code = ContactQuorum(DISK_Begin, tt, 0);
324         if (code) {
325             /* we must abort the operation */
326             udisk_abort(tt);
327             ContactQuorum(DISK_Abort, tt, 0);    /* force aborts to the others */
328             udisk_end(tt);
329             DBRELE(dbase);
330             return code;
331         }
332     }
333
334     *transPtr = tt;
335     DBRELE(dbase);
336     return 0;
337 }
338
339 int ubik_BeginTrans(dbase, transMode, transPtr)
340     register struct ubik_dbase *dbase;  /* in */
341     afs_int32 transMode; /* in */
342     struct ubik_trans **transPtr;       /* out */ {
343         return BeginTrans(dbase, transMode, transPtr, 0);
344 }
345
346 int ubik_BeginTransReadAny(dbase, transMode, transPtr)
347     register struct ubik_dbase *dbase;  /* in */
348     afs_int32 transMode; /* in */
349     struct ubik_trans **transPtr;       /* out */ {
350         return BeginTrans(dbase, transMode, transPtr, 1);
351
352  
353 /* this routine ends a read or write transaction by aborting it */
354 int ubik_AbortTrans(transPtr)
355     register struct ubik_trans *transPtr; /* in */ {
356     register afs_int32 code;
357     afs_int32 code2;
358     register struct ubik_dbase *dbase;
359     
360     dbase = transPtr->dbase;
361     DBHOLD(dbase);
362     bzero(&dbase->cachedVersion, sizeof(struct ubik_version));
363     /* see if we're still up-to-date */
364     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
365         udisk_abort(transPtr);
366         udisk_end(transPtr);
367         DBRELE(dbase);
368         return UNOQUORUM;
369     }
370
371     if (transPtr->type == UBIK_READTRANS) {
372         code = udisk_abort(transPtr);
373         udisk_end(transPtr);
374         DBRELE(dbase);
375         return code;
376     }
377
378     /* below here, we know we're doing a write transaction */
379     if (!ubeacon_AmSyncSite()) {
380         udisk_abort(transPtr);
381         udisk_end(transPtr);
382         DBRELE(dbase);
383         return UNOTSYNC;
384     }
385     
386     /* now it is safe to try remote abort */
387     code = ContactQuorum(DISK_Abort, transPtr, 0);
388     code2 = udisk_abort(transPtr);
389     udisk_end(transPtr);
390     DBRELE(dbase);
391     return (code? code : code2);
392 }
393
394 /* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
395 int ubik_EndTrans(transPtr)
396     register struct ubik_trans *transPtr; /* in */ {
397     register afs_int32 code;
398     struct timeval tv;
399     afs_int32 realStart;
400     register struct ubik_server *ts;
401     afs_int32 now;
402     register struct ubik_dbase *dbase;
403     
404     if (transPtr->type == UBIK_WRITETRANS) {
405        code = ubik_Flush(transPtr);
406        if (code) {
407           ubik_AbortTrans(transPtr);
408           return(code);
409        }
410     }
411
412     dbase = transPtr->dbase;
413     DBHOLD(dbase);
414     bzero(&dbase->cachedVersion, sizeof(struct ubik_version));
415
416     /* give up if no longer current */
417     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
418         udisk_abort(transPtr);
419         udisk_end(transPtr);
420         DBRELE(dbase);
421         return UNOQUORUM;
422     }
423
424     if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
425         code = udisk_commit(transPtr);
426         if (code == 0) goto success;    /* update cachedVersion correctly */
427         udisk_end(transPtr);
428         DBRELE(dbase);
429         return code;
430     }
431
432     if (!ubeacon_AmSyncSite()) {    /* no longer sync site */
433         udisk_abort(transPtr);
434         udisk_end(transPtr);
435         DBRELE(dbase);
436         return UNOTSYNC;
437     }
438     
439     /* now it is safe to do commit */
440     code = udisk_commit(transPtr);
441     if (code == 0) code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
442     if (code) {
443         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
444             Note that we don't know if this transaction will eventually commit at this point.
445             If it made it to a site that will be present in the next quorum, we win, otherwise
446             we lose.  If we contact a majority of sites, then we won't be here: contacting
447             a majority guarantees commit, since it guarantees that one dude will be a
448             member of the next quorum. */
449         ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
450         udisk_end(transPtr);
451         DBRELE(dbase);
452         return code;
453     }
454     /* before we can start sending unlock messages, we must wait until all servers
455         that are possibly still functioning on the other side of a network partition
456         have timed out.  Check the server structures, compute how long to wait, then
457         start the unlocks */
458     realStart = FT_ApproxTime();
459     while (1) {
460         /* wait for all servers to time out */
461         code = 0;
462         now = FT_ApproxTime();
463         /* check if we're still sync site, the guy should either come up
464             to us, or timeout.  Put safety check in anyway */
465         if (now - realStart > 10 * BIGTIME) {
466             ubik_stats.escapes++;
467             printf("ubik escaping from commit wait\n");
468             break;
469         }
470         for(ts = ubik_servers; ts; ts=ts->next) {
471             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
472                 /* this guy could have some damaged data, wait for him */
473                 code = 1;
474                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
475                 tv.tv_usec = 0;
476                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
477                 break;
478             }
479         }
480         if (code == 0) break;       /* no down ones still pseudo-active */
481     }
482
483     /* finally, unlock all the dudes.  We can return success independent of the number of servers
484         that really unlock the dbase; the others will do it if/when they elect a new sync site.
485         The transaction is committed anyway, since we succeeded in contacting a quorum
486         at the start (when invoking the DiskCommit function).
487     */
488     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
489
490   success:
491     udisk_end(transPtr);
492     /* update version on successful EndTrans */
493     bcopy(&dbase->version, &dbase->cachedVersion, sizeof(struct ubik_version));
494     
495     DBRELE(dbase);
496     return 0;
497 }
498
499 /* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
500
501 int ubik_Read(transPtr, buffer, length)
502     register struct ubik_trans *transPtr;       /* in */
503     char *buffer;   /* in */
504     afs_int32 length;   /* in */ {
505     register afs_int32 code;
506
507     /* reads are easy to do: handle locally */
508     DBHOLD(transPtr->dbase);
509     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
510         DBRELE(transPtr->dbase);
511         return UNOQUORUM;
512     }
513     
514     code = udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos, length);
515     if (code == 0) {
516         transPtr->seekPos += length;
517     }
518     DBRELE(transPtr->dbase);
519     return code;
520 }
521
522 /* This routine will flush the io data in the iovec structures. It first
523  * flushes to the local disk and then uses ContactQuorum to write it to 
524  * the other servers.
525  */
526 int ubik_Flush(transPtr)
527     struct ubik_trans *transPtr;
528 {
529     afs_int32 code, error=0;
530
531     if (transPtr->type != UBIK_WRITETRANS)
532        return UBADTYPE;
533     if (!transPtr->iovec_info.iovec_wrt_len || !transPtr->iovec_info.iovec_wrt_val)
534        return 0;
535
536     DBHOLD(transPtr->dbase);
537     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
538        ERROR_EXIT(UNOQUORUM);
539     if (!ubeacon_AmSyncSite())      /* only sync site can write */
540        ERROR_EXIT(UNOTSYNC);
541
542     /* Update the rest of the servers in the quorum */
543     code = ContactQuorum(DISK_WriteV, transPtr, 0, 
544                          &transPtr->iovec_info, &transPtr->iovec_data);
545     if (code) {
546        udisk_abort(transPtr);
547        ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
548        transPtr->iovec_info.iovec_wrt_len = 0;
549        transPtr->iovec_data.iovec_buf_len = 0;
550        ERROR_EXIT(code);
551     }
552
553     /* Wrote the buffers out, so start at scratch again */
554     transPtr->iovec_info.iovec_wrt_len = 0;
555     transPtr->iovec_data.iovec_buf_len = 0;
556
557   error_exit:
558     DBRELE(transPtr->dbase);
559     return error;
560 }
561
562 int ubik_Write(transPtr, buffer, length)
563     register struct ubik_trans *transPtr;       /* in */
564     char *buffer;       /* in */
565     afs_int32 length;   /* in */
566 {
567     struct ubik_iovec *iovec;
568     afs_int32 code, error=0;
569     afs_int32 pos, len, size;
570
571     if (transPtr->type != UBIK_WRITETRANS)
572        return UBADTYPE;
573     if (!length)
574        return 0;
575
576     if (length > IOVEC_MAXBUF) {
577        for (pos=0, len=length; len>0; len-=size, pos+=size) {
578           size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
579           code = ubik_Write(transPtr, &buffer[pos], size);
580           if (code) return (code);
581        }
582        return 0;
583     }
584
585     if (!transPtr->iovec_info.iovec_wrt_val) {
586        transPtr->iovec_info.iovec_wrt_len  = 0;
587        transPtr->iovec_info.iovec_wrt_val  = 
588          (struct ubik_iovec *)malloc(IOVEC_MAXWRT*sizeof(struct ubik_iovec));
589        transPtr->iovec_data.iovec_buf_len = 0;
590        transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
591        if (!transPtr->iovec_info.iovec_wrt_val || !transPtr->iovec_data.iovec_buf_val) {
592           if (transPtr->iovec_info.iovec_wrt_val) free(transPtr->iovec_info.iovec_wrt_val);
593           transPtr->iovec_info.iovec_wrt_val = 0;
594           if (transPtr->iovec_data.iovec_buf_val) free(transPtr->iovec_data.iovec_buf_val);
595           transPtr->iovec_data.iovec_buf_val = 0;
596           return UNOMEM;
597        }
598     }
599
600     /* If this write won't fit in the structure, then flush it out and start anew */
601     if ( (transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT) ||
602          ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF) ) {
603        code = ubik_Flush(transPtr);
604        if (code) return (code);
605     }
606
607     DBHOLD(transPtr->dbase);
608     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
609        ERROR_EXIT(UNOQUORUM);
610     if (!ubeacon_AmSyncSite())      /* only sync site can write */
611        ERROR_EXIT(UNOTSYNC);
612
613     /* Write to the local disk */
614     code = udisk_write(transPtr, transPtr->seekFile, buffer, 
615                                  transPtr->seekPos, length);
616     if (code) {
617        udisk_abort(transPtr);
618        transPtr->iovec_info.iovec_wrt_len = 0;
619        transPtr->iovec_data.iovec_buf_len = 0;
620        DBRELE(transPtr->dbase);
621        return(code);
622     }
623
624     /* Collect writes for the other ubik servers (to be done in bulk) */
625     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
626     iovec[transPtr->iovec_info.iovec_wrt_len].file     = transPtr->seekFile;
627     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
628     iovec[transPtr->iovec_info.iovec_wrt_len].length   = length;
629     
630     bcopy(buffer, 
631           &transPtr->iovec_data.iovec_buf_val[transPtr->iovec_data.iovec_buf_len],
632           length);
633
634     transPtr->iovec_info.iovec_wrt_len++;
635     transPtr->iovec_data.iovec_buf_len += length;
636     transPtr->seekPos += length;
637
638   error_exit:
639     DBRELE(transPtr->dbase);
640     return error;
641 }
642
643 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
644
645 int ubik_Seek(transPtr, fileid, position)
646     register struct ubik_trans *transPtr;       /* IN */
647     afs_int32 fileid;    /* IN */
648     afs_int32 position;  /* IN */ {
649     register afs_int32 code;
650
651     DBHOLD(transPtr->dbase);
652     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
653         code = UNOQUORUM;
654     } else  {
655         transPtr->seekFile = fileid;
656         transPtr->seekPos = position;
657         code = 0;
658     }
659     DBRELE(transPtr->dbase);
660     return code;
661 }
662
663 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
664
665 int ubik_Tell(transPtr, fileid, position)
666     register struct ubik_trans *transPtr;       /* IN */
667     afs_int32 *fileid;   /* OUT */
668     afs_int32 *position; /* OUT */ {
669     DBHOLD(transPtr->dbase);
670     *fileid = transPtr->seekFile;
671     *position = transPtr->seekPos;
672     DBRELE(transPtr->dbase);
673     return 0;
674 }
675
676 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
677
678 int ubik_Truncate(transPtr, length)
679     register struct ubik_trans *transPtr;       /* in */
680     afs_int32 length;    /* in */ {
681     afs_int32 code, error=0;
682
683     /* Will also catch if not UBIK_WRITETRANS */
684     code = ubik_Flush(transPtr);
685     if (code) return(code);
686
687     DBHOLD(transPtr->dbase);
688     /* first, check that quorum is still good, and that dbase is up-to-date */
689     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
690        ERROR_EXIT(UNOQUORUM);
691     if (!ubeacon_AmSyncSite())
692        ERROR_EXIT(UNOTSYNC);
693     
694     /* now do the operation locally, and propagate it out */
695     code = udisk_truncate(transPtr, transPtr->seekFile, length);
696     if (!code) {
697         code = ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile, length);
698     }
699     if (code) {
700         /* we must abort the operation */
701         udisk_abort(transPtr);
702         ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
703         ERROR_EXIT(code);
704     }
705
706   error_exit:
707     DBRELE(transPtr->dbase);
708     return error;
709 }
710
711 /* set a lock; all locks are released on transaction end (commit/abort) */
712 ubik_SetLock(atrans, apos, alen, atype)
713     struct ubik_trans *atrans;
714     afs_int32 apos, alen;                  /* apos and alen are not used */
715     int atype; {
716     afs_int32 code=0, error=0;
717     
718     if (atype == LOCKWRITE) {
719        if (atrans->type == UBIK_READTRANS) return UBADTYPE;
720        code = ubik_Flush(atrans);
721        if (code) return(code);
722     }
723
724     DBHOLD(atrans->dbase);
725     if (atype == LOCKREAD) {
726         code = ulock_getLock(atrans, atype, 1);
727         if (code) ERROR_EXIT(code);
728     }
729     else {
730         /* first, check that quorum is still good, and that dbase is up-to-date */
731         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
732            ERROR_EXIT(UNOQUORUM);
733         if (!ubeacon_AmSyncSite())
734            ERROR_EXIT(UNOTSYNC);
735
736         /* now do the operation locally, and propagate it out */
737         code = ulock_getLock(atrans, atype, 1);
738         if (code == 0) {
739             code = ContactQuorum(DISK_Lock, atrans, 0, 0, 
740                                  1/*unused*/, 1/*unused*/, LOCKWRITE);
741         }
742         if (code) {
743             /* we must abort the operation */
744             udisk_abort(atrans);
745             ContactQuorum(DISK_Abort, atrans, 0);    /* force aborts to the others */
746             ERROR_EXIT(code);
747         }
748     }
749
750   error_exit:
751     DBRELE(atrans->dbase);
752     return error;
753 }
754
755 /* utility to wait for a version # to change */
756 int ubik_WaitVersion(adatabase, aversion)
757 register struct ubik_version *aversion;
758 register struct ubik_dbase *adatabase; {
759     while (1) {
760         /* wait until version # changes, and then return */
761         if (vcmp(*aversion, adatabase->version) != 0)
762             return 0;
763         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
764     }
765 }
766
767 /* utility to get the version of the dbase a transaction is dealing with */
768 int ubik_GetVersion(atrans, avers)
769 register struct ubik_trans *atrans;
770 register struct ubik_version *avers; {
771     *avers = atrans->dbase->version;
772     return 0;
773 }
774
775 /* Facility to simplify database caching.  Returns zero if last trans was done
776    on the local server and was successful.  If return value is non-zero and the
777    caller is a server caching part of the Ubik database, it should invalidate
778    that cache.  A return value of -1 means bad (NULL) argument. */
779
780 int ubik_CacheUpdate (atrans)
781   register struct ubik_trans *atrans;
782 {
783     if (!(atrans && atrans->dbase)) return -1;
784     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
785 }
786
787 int panic(a, b, c, d)
788     char *a, *b, *c, *d;
789 {
790     ubik_print("Ubik PANIC: ");
791     printf(a, b, c, d);
792     abort();
793     printf("BACK FROM ABORT\n");    /* shouldn't come back */
794     exit(1);                        /* never know, though  */
795 }
796
797 /*
798 ** This functions takes an IP addresses as its parameter. It returns the
799 ** the primary IP address that is on the host passed in.
800 */
801 afs_uint32
802 ubikGetPrimaryInterfaceAddr(addr)
803 afs_uint32  addr;                       /* network byte order */
804 {
805         struct ubik_server *ts;
806         int j;
807
808         for ( ts=ubik_servers; ts; ts=ts->next )
809             for ( j=0; j < UBIK_MAX_INTERFACE_ADDR; j++)
810                 if ( ts->addr[j] == addr )
811                     return  ts->addr[0];        /* net byte order */
812         return 0; /* if not in server database, return error */
813 }
814