ubik-clone-support-20010212
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afs/param.h>
11 #include <sys/types.h>
12 #ifdef AFS_NT40_ENV
13 #include <winsock2.h>
14 #else
15 #include <sys/file.h>
16 #include <netinet/in.h>
17 #include <sys/param.h>
18 #endif
19 #include <time.h>
20 #include <lock.h>
21 #include <rx/xdr.h>
22 #include <rx/rx.h>
23 #include <afs/cellconfig.h>
24
25 #define UBIK_INTERNALS
26 #include "ubik.h"
27 #include "ubik_int.h"
28
29 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
30
31 /*  This system is organized in a hierarchical set of related modules.  Modules
32     at one level can only call modules at the same level or below.
33     
34     At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
35     operating system primitives.
36     
37     At the next level (1) we have
38
39         VOTER--The module responsible for casting votes when asked.  It is also
40         responsible for determining whether this server should try to become
41         a synchronization site.
42         
43         BEACONER--The module responsible for sending keep-alives out when a
44         server is actually the sync site, or trying to become a sync site.
45         
46         DISK--The module responsible for representing atomic transactions
47         on the local disk.  It maintains a new-value only log.
48         
49         LOCK--The module responsible for locking byte ranges in the database file.
50         
51     At the next level (2) we have
52           
53         RECOVERY--The module responsible for ensuring that all members of a quorum
54         have the same up-to-date database after a new synchronization site is
55         elected.  This module runs only on the synchronization site.
56         
57     At the next level (3) we have
58     
59         REMOTE--The module responsible for interpreting requests from the sync
60         site and applying them to the database, after obtaining the appropriate
61         locks.
62         
63     At the next level (4) we have
64     
65         UBIK--The module users call to perform operations on the database.
66 */
67
68
69 /* some globals */
70 afs_int32 ubik_quorum=0;
71 struct ubik_dbase *ubik_dbase=0;
72 struct ubik_stats ubik_stats;
73 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
74 afs_int32 ubik_epochTime = 0;
75 afs_int32 urecovery_state = 0;
76 int (*ubik_SRXSecurityProc)();
77 char *ubik_SRXSecurityRock;
78 struct ubik_server *ubik_servers;
79 short ubik_callPortal;
80
81 static int BeginTrans();
82
83 struct rx_securityClass *ubik_sc[3];
84
85 /* perform an operation at a quorum, handling error conditions.  return 0 if
86     all worked, otherwise mark failing server as down and return UERROR
87
88     Note that if any server misses an update, we must wait BIGTIME seconds before
89     allowing the transaction to commit, to ensure that the missing and possibly still
90     functioning server times out and stop handing out old data.  This is done in the commit
91     code, where we wait for a server marked down to have stayed down for BIGTIME seconds
92     before we allow a transaction to commit.  A server that fails but comes back up won't give
93     out old data because it is sent the sync count along with the beacon message that
94     marks it as *really* up (beaconSinceDown).
95 */
96 #define CStampVersion       1       /* meaning set ts->version */
97 afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5)
98     int (*aproc)();
99     int aflags;
100     register struct ubik_trans *atrans;
101     long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5; {
102     register struct ubik_server *ts;
103     register afs_int32 code;
104     afs_int32 rcode, okcalls;
105
106     rcode = 0;
107     okcalls = 0;
108     for(ts = ubik_servers; ts; ts=ts->next) {
109         /* for each server */
110         if (!ts->up || !ts->currentDB) {
111             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
112             continue;    /* not up-to-date, don't bother */
113         }
114         code = (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5);
115         if ( (aproc == DISK_WriteV) && (code <= -450) && (code > -500) ) {
116            /* An RPC interface mismatch (as defined in comerr/error_msg.c).
117             * Un-bulk the entries and do individual DISK_Write calls
118             * instead of DISK_WriteV.
119             */
120            iovec_wrt         *iovec_infoP = (iovec_wrt *)aparm0;
121            iovec_buf         *iovec_dataP = (iovec_buf *)aparm1;
122            struct ubik_iovec *iovec       = (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
123            char              *iobuf       = (char *)iovec_dataP->iovec_buf_val;
124            bulkdata          tcbs;
125            afs_int32             i, offset;
126
127            for (i=0, offset=0; i<iovec_infoP->iovec_wrt_len; i++) {
128               /* Sanity check for going off end of buffer */
129               if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
130                  code = UINTERNAL;
131                  break;
132               }
133               tcbs.bulkdata_len = iovec[i].length;
134               tcbs.bulkdata_val = &iobuf[offset];
135               code = DISK_Write(ts->disk_rxcid, &atrans->tid,
136                                 iovec[i].file, iovec[i].position, &tcbs);
137               if (code) break;
138
139               offset += iovec[i].length;
140            }
141         }
142         if (code) {         /* failure */
143             rcode = code;
144             ts->up = 0;     /* mark as down now; beacons will no longer be sent */
145             ts->currentDB = 0;
146             ts->beaconSinceDown = 0;
147             urecovery_LostServer(); /* tell recovery to try to resend dbase later */
148         } else {            /* success */
149             if (!ts->isClone)
150                 okcalls++;          /* count up how many worked */
151             if (aflags & CStampVersion) {
152                 ts->version = atrans->dbase->version;
153             }
154         }
155     }
156     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
157     if (okcalls+1 >= ubik_quorum) return 0;
158     else return rcode;
159 }
160
161 /* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
162
163     Note that the host named by myHost should not also be listed in serverList.
164 */
165
166 int ubik_ServerInitByInfo(myHost, myPort, info, clones, pathName, dbase)
167     struct afsconf_cell *info;  /* in */
168     char clones[];
169     afs_int32 myHost;
170     short myPort;
171     char *pathName;     /* in */
172     struct ubik_dbase **dbase; /* out */ 
173 {
174      afs_int32 code;
175     
176      code = ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName, dbase);
177      return code;
178 }
179
180 int ubik_ServerInit(myHost, myPort, serverList, pathName, dbase)
181     afs_int32 serverList[];    /* in */
182     afs_int32 myHost;
183     short myPort;
184     char *pathName;     /* in */
185     struct ubik_dbase **dbase; /* out */ 
186 {
187      afs_int32 code;
188     
189      code = ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
190                         serverList, pathName, dbase);
191      return code;
192 }
193
194 int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, dbase)
195     afs_int32 myHost;
196     short myPort;
197     struct afsconf_cell *info;  /* in */
198     char clones[];
199     afs_int32 serverList[];    /* in */
200     char *pathName;     /* in */
201     struct ubik_dbase **dbase; /* out */ 
202 {
203     register struct ubik_dbase *tdb;
204     register afs_int32 code;
205     PROCESS junk;
206     afs_int32 secIndex;
207     struct rx_securityClass *secClass;
208
209     struct rx_service *tservice;
210     extern struct rx_securityClass *rxnull_NewServerSecurityObject();
211     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
212     extern void rx_ServerProc();
213     extern int rx_stackSize;
214
215     initialize_u_error_table();
216
217     tdb = (struct ubik_dbase *) malloc(sizeof(struct ubik_dbase));
218     tdb->pathName = (char *) malloc(strlen(pathName)+1);
219     strcpy(tdb->pathName, pathName);
220     tdb->activeTrans = (struct ubik_trans *) 0;
221     bzero(&tdb->version, sizeof(struct ubik_version));
222     bzero(&tdb->cachedVersion, sizeof(struct ubik_version));
223     Lock_Init(&tdb->versionLock);
224     tdb->flags = 0;
225     tdb->read = uphys_read;
226     tdb->write = uphys_write;
227     tdb->truncate = uphys_truncate;
228     tdb->open = 0;  /* this function isn't used any more */
229     tdb->sync = uphys_sync;
230     tdb->stat = uphys_stat;
231     tdb->getlabel = uphys_getlabel;
232     tdb->setlabel = uphys_setlabel;
233     tdb->getnfiles = uphys_getnfiles;
234     tdb->readers=0;
235     tdb->tidCounter=tdb->writeTidCounter=0;
236     *dbase = tdb;
237     ubik_dbase = tdb;   /* for now, only one db per server; can fix later when we have names for the other dbases */
238
239     /* initialize RX */
240     ubik_callPortal = myPort;
241     /* try to get an additional security object */
242     ubik_sc[0] = rxnull_NewServerSecurityObject();
243     ubik_sc[1] = 0;
244     ubik_sc[2] = 0;
245     if (ubik_SRXSecurityProc) {
246         code = (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex);
247         if (code == 0) {
248             ubik_sc[secIndex] = secClass;
249         }
250     }
251     code = rx_Init(myPort);
252     if (code < 0) return code;
253     tservice = rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3, VOTE_ExecuteRequest);
254     if (tservice == (struct rx_service *)0) {
255         ubik_dprint("Could not create VOTE rx service!\n");
256         return -1;
257     }
258     rx_SetMinProcs(tservice, 2);
259     rx_SetMaxProcs(tservice, 3);
260
261     tservice = rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3, DISK_ExecuteRequest);
262     if (tservice == (struct rx_service *)0) {
263         ubik_dprint("Could not create DISK rx service!\n");
264         return -1;
265     }
266     rx_SetMinProcs(tservice, 2);
267     rx_SetMaxProcs(tservice, 3);
268
269     /* start an rx_ServerProc to handle incoming RPC's in particular the 
270      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
271      * the "steplock" problem in ubik initialization. Defect 11037.
272      */
273     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
274                       0, "rx_ServerProc", &junk);
275
276     /* do basic initialization */
277     code = uvote_Init();
278     if (code) return code;
279     code = urecovery_Initialize(tdb);
280     if (code) return code;
281     if (info)
282         code = ubeacon_InitServerListByInfo(myHost, info, clones);
283     else 
284         code = ubeacon_InitServerList(myHost, serverList);
285     if (code) return code;
286
287     /* now start up async processes */
288     code = LWP_CreateProcess(ubeacon_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
289                              0, "beacon", &junk);
290     if (code) return code;
291     code = LWP_CreateProcess(urecovery_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
292                              0, "recovery", &junk);
293     return code;
294 }
295
296 /*  This routine begins a read or write transaction on the transaction
297     identified by transPtr, in the dbase named by dbase.  An open mode of
298     ubik_READTRANS identifies this as a read transaction, while a mode of
299     ubik_WRITETRANS identifies this as a write transaction.  transPtr 
300     is set to the returned transaction control block. The readAny flag is
301     set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
302     ubik_BeginTransReadAny() below.
303
304     We can only begin transaction when we have an up-to-date database.
305 */
306
307 static int BeginTrans(dbase, transMode, transPtr, readAny)
308     register struct ubik_dbase *dbase;  /* in */
309     int readAny;
310     afs_int32 transMode; /* in */
311     struct ubik_trans **transPtr;       /* out */ {
312     struct ubik_trans *jt;
313     register struct ubik_trans *tt;
314     register afs_int32 code;
315
316     if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
317     DBHOLD(dbase);
318     if (urecovery_AllBetter(dbase, readAny)==0) {
319         DBRELE(dbase);
320         return UNOQUORUM;
321     }
322     /* otherwise we have a quorum, use it */
323
324     /* make sure that at most one write transaction occurs at any one time.  This
325         has nothing to do with transaction locking; that's enforced by the lock package.  However,
326         we can't even handle two non-conflicting writes, since our log and recovery modules
327         don't know how to restore one without possibly picking up some data from the other. */
328     if (transMode == UBIK_WRITETRANS) {
329         /* if we're writing already, wait */
330         while(dbase->flags & DBWRITING) {
331             DBRELE(dbase);
332             LWP_WaitProcess(&dbase->flags);
333             DBHOLD(dbase);
334         }
335         if (!ubeacon_AmSyncSite()) {
336             DBRELE(dbase);
337             return UNOTSYNC;
338         }
339     }
340
341     /* create the transaction */
342     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
343     tt = jt;        /* move to a register */
344     if (code || tt == (struct ubik_trans *)NULL) {
345         DBRELE(dbase);
346         return code;
347     }
348     if (readAny) tt->flags |= TRREADANY;
349     /* label trans and dbase with new tid */
350     tt->tid.epoch = ubik_epochTime;
351     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
352     tt->tid.counter = (dbase->tidCounter += 2);
353
354     if (transMode == UBIK_WRITETRANS) {
355       /* for a write trans, we have to keep track of the write tid counter too */
356       dbase->writeTidCounter += 2;
357
358         /* next try to start transaction on appropriate number of machines */
359         code = ContactQuorum(DISK_Begin, tt, 0);
360         if (code) {
361             /* we must abort the operation */
362             udisk_abort(tt);
363             ContactQuorum(DISK_Abort, tt, 0);    /* force aborts to the others */
364             udisk_end(tt);
365             DBRELE(dbase);
366             return code;
367         }
368     }
369
370     *transPtr = tt;
371     DBRELE(dbase);
372     return 0;
373 }
374
375 int ubik_BeginTrans(dbase, transMode, transPtr)
376     register struct ubik_dbase *dbase;  /* in */
377     afs_int32 transMode; /* in */
378     struct ubik_trans **transPtr;       /* out */ {
379         return BeginTrans(dbase, transMode, transPtr, 0);
380 }
381
382 int ubik_BeginTransReadAny(dbase, transMode, transPtr)
383     register struct ubik_dbase *dbase;  /* in */
384     afs_int32 transMode; /* in */
385     struct ubik_trans **transPtr;       /* out */ {
386         return BeginTrans(dbase, transMode, transPtr, 1);
387
388  
389 /* this routine ends a read or write transaction by aborting it */
390 int ubik_AbortTrans(transPtr)
391     register struct ubik_trans *transPtr; /* in */ {
392     register afs_int32 code;
393     afs_int32 code2;
394     register struct ubik_dbase *dbase;
395     
396     dbase = transPtr->dbase;
397     DBHOLD(dbase);
398     bzero(&dbase->cachedVersion, sizeof(struct ubik_version));
399     /* see if we're still up-to-date */
400     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
401         udisk_abort(transPtr);
402         udisk_end(transPtr);
403         DBRELE(dbase);
404         return UNOQUORUM;
405     }
406
407     if (transPtr->type == UBIK_READTRANS) {
408         code = udisk_abort(transPtr);
409         udisk_end(transPtr);
410         DBRELE(dbase);
411         return code;
412     }
413
414     /* below here, we know we're doing a write transaction */
415     if (!ubeacon_AmSyncSite()) {
416         udisk_abort(transPtr);
417         udisk_end(transPtr);
418         DBRELE(dbase);
419         return UNOTSYNC;
420     }
421     
422     /* now it is safe to try remote abort */
423     code = ContactQuorum(DISK_Abort, transPtr, 0);
424     code2 = udisk_abort(transPtr);
425     udisk_end(transPtr);
426     DBRELE(dbase);
427     return (code? code : code2);
428 }
429
430 /* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
431 int ubik_EndTrans(transPtr)
432     register struct ubik_trans *transPtr; /* in */ {
433     register afs_int32 code;
434     struct timeval tv;
435     afs_int32 realStart;
436     register struct ubik_server *ts;
437     afs_int32 now;
438     register struct ubik_dbase *dbase;
439     
440     if (transPtr->type == UBIK_WRITETRANS) {
441        code = ubik_Flush(transPtr);
442        if (code) {
443           ubik_AbortTrans(transPtr);
444           return(code);
445        }
446     }
447
448     dbase = transPtr->dbase;
449     DBHOLD(dbase);
450     bzero(&dbase->cachedVersion, sizeof(struct ubik_version));
451
452     /* give up if no longer current */
453     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
454         udisk_abort(transPtr);
455         udisk_end(transPtr);
456         DBRELE(dbase);
457         return UNOQUORUM;
458     }
459
460     if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
461         code = udisk_commit(transPtr);
462         if (code == 0) goto success;    /* update cachedVersion correctly */
463         udisk_end(transPtr);
464         DBRELE(dbase);
465         return code;
466     }
467
468     if (!ubeacon_AmSyncSite()) {    /* no longer sync site */
469         udisk_abort(transPtr);
470         udisk_end(transPtr);
471         DBRELE(dbase);
472         return UNOTSYNC;
473     }
474     
475     /* now it is safe to do commit */
476     code = udisk_commit(transPtr);
477     if (code == 0) code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
478     if (code) {
479         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
480             Note that we don't know if this transaction will eventually commit at this point.
481             If it made it to a site that will be present in the next quorum, we win, otherwise
482             we lose.  If we contact a majority of sites, then we won't be here: contacting
483             a majority guarantees commit, since it guarantees that one dude will be a
484             member of the next quorum. */
485         ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
486         udisk_end(transPtr);
487         DBRELE(dbase);
488         return code;
489     }
490     /* before we can start sending unlock messages, we must wait until all servers
491         that are possibly still functioning on the other side of a network partition
492         have timed out.  Check the server structures, compute how long to wait, then
493         start the unlocks */
494     realStart = FT_ApproxTime();
495     while (1) {
496         /* wait for all servers to time out */
497         code = 0;
498         now = FT_ApproxTime();
499         /* check if we're still sync site, the guy should either come up
500             to us, or timeout.  Put safety check in anyway */
501         if (now - realStart > 10 * BIGTIME) {
502             ubik_stats.escapes++;
503             printf("ubik escaping from commit wait\n");
504             break;
505         }
506         for(ts = ubik_servers; ts; ts=ts->next) {
507             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
508                 /* this guy could have some damaged data, wait for him */
509                 code = 1;
510                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
511                 tv.tv_usec = 0;
512                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
513                 break;
514             }
515         }
516         if (code == 0) break;       /* no down ones still pseudo-active */
517     }
518
519     /* finally, unlock all the dudes.  We can return success independent of the number of servers
520         that really unlock the dbase; the others will do it if/when they elect a new sync site.
521         The transaction is committed anyway, since we succeeded in contacting a quorum
522         at the start (when invoking the DiskCommit function).
523     */
524     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
525
526   success:
527     udisk_end(transPtr);
528     /* update version on successful EndTrans */
529     bcopy(&dbase->version, &dbase->cachedVersion, sizeof(struct ubik_version));
530     
531     DBRELE(dbase);
532     return 0;
533 }
534
535 /* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
536
537 int ubik_Read(transPtr, buffer, length)
538     register struct ubik_trans *transPtr;       /* in */
539     char *buffer;   /* in */
540     afs_int32 length;   /* in */ {
541     register afs_int32 code;
542
543     /* reads are easy to do: handle locally */
544     DBHOLD(transPtr->dbase);
545     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
546         DBRELE(transPtr->dbase);
547         return UNOQUORUM;
548     }
549     
550     code = udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos, length);
551     if (code == 0) {
552         transPtr->seekPos += length;
553     }
554     DBRELE(transPtr->dbase);
555     return code;
556 }
557
558 /* This routine will flush the io data in the iovec structures. It first
559  * flushes to the local disk and then uses ContactQuorum to write it to 
560  * the other servers.
561  */
562 int ubik_Flush(transPtr)
563     struct ubik_trans *transPtr;
564 {
565     afs_int32 code, error=0;
566
567     if (transPtr->type != UBIK_WRITETRANS)
568        return UBADTYPE;
569     if (!transPtr->iovec_info.iovec_wrt_len || !transPtr->iovec_info.iovec_wrt_val)
570        return 0;
571
572     DBHOLD(transPtr->dbase);
573     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
574        ERROR_EXIT(UNOQUORUM);
575     if (!ubeacon_AmSyncSite())      /* only sync site can write */
576        ERROR_EXIT(UNOTSYNC);
577
578     /* Update the rest of the servers in the quorum */
579     code = ContactQuorum(DISK_WriteV, transPtr, 0, 
580                          &transPtr->iovec_info, &transPtr->iovec_data);
581     if (code) {
582        udisk_abort(transPtr);
583        ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
584        transPtr->iovec_info.iovec_wrt_len = 0;
585        transPtr->iovec_data.iovec_buf_len = 0;
586        ERROR_EXIT(code);
587     }
588
589     /* Wrote the buffers out, so start at scratch again */
590     transPtr->iovec_info.iovec_wrt_len = 0;
591     transPtr->iovec_data.iovec_buf_len = 0;
592
593   error_exit:
594     DBRELE(transPtr->dbase);
595     return error;
596 }
597
598 int ubik_Write(transPtr, buffer, length)
599     register struct ubik_trans *transPtr;       /* in */
600     char *buffer;       /* in */
601     afs_int32 length;   /* in */
602 {
603     struct ubik_iovec *iovec;
604     afs_int32 code, error=0;
605     afs_int32 pos, len, size;
606
607     if (transPtr->type != UBIK_WRITETRANS)
608        return UBADTYPE;
609     if (!length)
610        return 0;
611
612     if (length > IOVEC_MAXBUF) {
613        for (pos=0, len=length; len>0; len-=size, pos+=size) {
614           size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
615           code = ubik_Write(transPtr, &buffer[pos], size);
616           if (code) return (code);
617        }
618        return 0;
619     }
620
621     if (!transPtr->iovec_info.iovec_wrt_val) {
622        transPtr->iovec_info.iovec_wrt_len  = 0;
623        transPtr->iovec_info.iovec_wrt_val  = 
624          (struct ubik_iovec *)malloc(IOVEC_MAXWRT*sizeof(struct ubik_iovec));
625        transPtr->iovec_data.iovec_buf_len = 0;
626        transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
627        if (!transPtr->iovec_info.iovec_wrt_val || !transPtr->iovec_data.iovec_buf_val) {
628           if (transPtr->iovec_info.iovec_wrt_val) free(transPtr->iovec_info.iovec_wrt_val);
629           transPtr->iovec_info.iovec_wrt_val = 0;
630           if (transPtr->iovec_data.iovec_buf_val) free(transPtr->iovec_data.iovec_buf_val);
631           transPtr->iovec_data.iovec_buf_val = 0;
632           return UNOMEM;
633        }
634     }
635
636     /* If this write won't fit in the structure, then flush it out and start anew */
637     if ( (transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT) ||
638          ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF) ) {
639        code = ubik_Flush(transPtr);
640        if (code) return (code);
641     }
642
643     DBHOLD(transPtr->dbase);
644     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
645        ERROR_EXIT(UNOQUORUM);
646     if (!ubeacon_AmSyncSite())      /* only sync site can write */
647        ERROR_EXIT(UNOTSYNC);
648
649     /* Write to the local disk */
650     code = udisk_write(transPtr, transPtr->seekFile, buffer, 
651                                  transPtr->seekPos, length);
652     if (code) {
653        udisk_abort(transPtr);
654        transPtr->iovec_info.iovec_wrt_len = 0;
655        transPtr->iovec_data.iovec_buf_len = 0;
656        DBRELE(transPtr->dbase);
657        return(code);
658     }
659
660     /* Collect writes for the other ubik servers (to be done in bulk) */
661     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
662     iovec[transPtr->iovec_info.iovec_wrt_len].file     = transPtr->seekFile;
663     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
664     iovec[transPtr->iovec_info.iovec_wrt_len].length   = length;
665     
666     bcopy(buffer, 
667           &transPtr->iovec_data.iovec_buf_val[transPtr->iovec_data.iovec_buf_len],
668           length);
669
670     transPtr->iovec_info.iovec_wrt_len++;
671     transPtr->iovec_data.iovec_buf_len += length;
672     transPtr->seekPos += length;
673
674   error_exit:
675     DBRELE(transPtr->dbase);
676     return error;
677 }
678
679 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
680
681 int ubik_Seek(transPtr, fileid, position)
682     register struct ubik_trans *transPtr;       /* IN */
683     afs_int32 fileid;    /* IN */
684     afs_int32 position;  /* IN */ {
685     register afs_int32 code;
686
687     DBHOLD(transPtr->dbase);
688     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
689         code = UNOQUORUM;
690     } else  {
691         transPtr->seekFile = fileid;
692         transPtr->seekPos = position;
693         code = 0;
694     }
695     DBRELE(transPtr->dbase);
696     return code;
697 }
698
699 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
700
701 int ubik_Tell(transPtr, fileid, position)
702     register struct ubik_trans *transPtr;       /* IN */
703     afs_int32 *fileid;   /* OUT */
704     afs_int32 *position; /* OUT */ {
705     DBHOLD(transPtr->dbase);
706     *fileid = transPtr->seekFile;
707     *position = transPtr->seekPos;
708     DBRELE(transPtr->dbase);
709     return 0;
710 }
711
712 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
713
714 int ubik_Truncate(transPtr, length)
715     register struct ubik_trans *transPtr;       /* in */
716     afs_int32 length;    /* in */ {
717     afs_int32 code, error=0;
718
719     /* Will also catch if not UBIK_WRITETRANS */
720     code = ubik_Flush(transPtr);
721     if (code) return(code);
722
723     DBHOLD(transPtr->dbase);
724     /* first, check that quorum is still good, and that dbase is up-to-date */
725     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
726        ERROR_EXIT(UNOQUORUM);
727     if (!ubeacon_AmSyncSite())
728        ERROR_EXIT(UNOTSYNC);
729     
730     /* now do the operation locally, and propagate it out */
731     code = udisk_truncate(transPtr, transPtr->seekFile, length);
732     if (!code) {
733         code = ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile, length);
734     }
735     if (code) {
736         /* we must abort the operation */
737         udisk_abort(transPtr);
738         ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
739         ERROR_EXIT(code);
740     }
741
742   error_exit:
743     DBRELE(transPtr->dbase);
744     return error;
745 }
746
747 /* set a lock; all locks are released on transaction end (commit/abort) */
748 ubik_SetLock(atrans, apos, alen, atype)
749     struct ubik_trans *atrans;
750     afs_int32 apos, alen;                  /* apos and alen are not used */
751     int atype; {
752     afs_int32 code=0, error=0;
753     
754     if (atype == LOCKWRITE) {
755        if (atrans->type == UBIK_READTRANS) return UBADTYPE;
756        code = ubik_Flush(atrans);
757        if (code) return(code);
758     }
759
760     DBHOLD(atrans->dbase);
761     if (atype == LOCKREAD) {
762         code = ulock_getLock(atrans, atype, 1);
763         if (code) ERROR_EXIT(code);
764     }
765     else {
766         /* first, check that quorum is still good, and that dbase is up-to-date */
767         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
768            ERROR_EXIT(UNOQUORUM);
769         if (!ubeacon_AmSyncSite())
770            ERROR_EXIT(UNOTSYNC);
771
772         /* now do the operation locally, and propagate it out */
773         code = ulock_getLock(atrans, atype, 1);
774         if (code == 0) {
775             code = ContactQuorum(DISK_Lock, atrans, 0, 0, 
776                                  1/*unused*/, 1/*unused*/, LOCKWRITE);
777         }
778         if (code) {
779             /* we must abort the operation */
780             udisk_abort(atrans);
781             ContactQuorum(DISK_Abort, atrans, 0);    /* force aborts to the others */
782             ERROR_EXIT(code);
783         }
784     }
785
786   error_exit:
787     DBRELE(atrans->dbase);
788     return error;
789 }
790
791 /* utility to wait for a version # to change */
792 int ubik_WaitVersion(adatabase, aversion)
793 register struct ubik_version *aversion;
794 register struct ubik_dbase *adatabase; {
795     while (1) {
796         /* wait until version # changes, and then return */
797         if (vcmp(*aversion, adatabase->version) != 0)
798             return 0;
799         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
800     }
801 }
802
803 /* utility to get the version of the dbase a transaction is dealing with */
804 int ubik_GetVersion(atrans, avers)
805 register struct ubik_trans *atrans;
806 register struct ubik_version *avers; {
807     *avers = atrans->dbase->version;
808     return 0;
809 }
810
811 /* Facility to simplify database caching.  Returns zero if last trans was done
812    on the local server and was successful.  If return value is non-zero and the
813    caller is a server caching part of the Ubik database, it should invalidate
814    that cache.  A return value of -1 means bad (NULL) argument. */
815
816 int ubik_CacheUpdate (atrans)
817   register struct ubik_trans *atrans;
818 {
819     if (!(atrans && atrans->dbase)) return -1;
820     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
821 }
822
823 int panic(a, b, c, d)
824     char *a, *b, *c, *d;
825 {
826     ubik_print("Ubik PANIC: ");
827     printf(a, b, c, d);
828     abort();
829     printf("BACK FROM ABORT\n");    /* shouldn't come back */
830     exit(1);                        /* never know, though  */
831 }
832
833 /*
834 ** This functions takes an IP addresses as its parameter. It returns the
835 ** the primary IP address that is on the host passed in.
836 */
837 afs_uint32
838 ubikGetPrimaryInterfaceAddr(addr)
839 afs_uint32  addr;                       /* network byte order */
840 {
841         struct ubik_server *ts;
842         int j;
843
844         for ( ts=ubik_servers; ts; ts=ts->next )
845             for ( j=0; j < UBIK_MAX_INTERFACE_ADDR; j++)
846                 if ( ts->addr[j] == addr )
847                     return  ts->addr[0];        /* net byte order */
848         return 0; /* if not in server database, return error */
849 }
850