ubik-warning-cleanup-20011005
[openafs.git] / src / ubik / ubik.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #else
19 #include <sys/file.h>
20 #include <netinet/in.h>
21 #include <sys/param.h>
22 #endif
23 #include <time.h>
24 #include <lock.h>
25 #ifdef HAVE_STRING_H
26 #include <string.h>
27 #else
28 #ifdef HAVE_STRINGS_H
29 #include <strings.h>
30 #endif
31 #endif
32 #include <rx/xdr.h>
33 #include <rx/rx.h>
34 #include <afs/cellconfig.h>
35
36 #define UBIK_INTERNALS
37 #include "ubik.h"
38 #include "ubik_int.h"
39
40 #define ERROR_EXIT(code) {error=(code); goto error_exit;}
41
42 /*  This system is organized in a hierarchical set of related modules.  Modules
43     at one level can only call modules at the same level or below.
44     
45     At the bottom level (0) we have R, RFTP, LWP and IOMGR, i.e. the basic
46     operating system primitives.
47     
48     At the next level (1) we have
49
50         VOTER--The module responsible for casting votes when asked.  It is also
51         responsible for determining whether this server should try to become
52         a synchronization site.
53         
54         BEACONER--The module responsible for sending keep-alives out when a
55         server is actually the sync site, or trying to become a sync site.
56         
57         DISK--The module responsible for representing atomic transactions
58         on the local disk.  It maintains a new-value only log.
59         
60         LOCK--The module responsible for locking byte ranges in the database file.
61         
62     At the next level (2) we have
63           
64         RECOVERY--The module responsible for ensuring that all members of a quorum
65         have the same up-to-date database after a new synchronization site is
66         elected.  This module runs only on the synchronization site.
67         
68     At the next level (3) we have
69     
70         REMOTE--The module responsible for interpreting requests from the sync
71         site and applying them to the database, after obtaining the appropriate
72         locks.
73         
74     At the next level (4) we have
75     
76         UBIK--The module users call to perform operations on the database.
77 */
78
79
80 /* some globals */
81 afs_int32 ubik_quorum=0;
82 struct ubik_dbase *ubik_dbase=0;
83 struct ubik_stats ubik_stats;
84 afs_uint32 ubik_host[UBIK_MAX_INTERFACE_ADDR];
85 afs_int32 ubik_epochTime = 0;
86 afs_int32 urecovery_state = 0;
87 int (*ubik_SRXSecurityProc)();
88 char *ubik_SRXSecurityRock;
89 struct ubik_server *ubik_servers;
90 short ubik_callPortal;
91
92 static int BeginTrans();
93
94 struct rx_securityClass *ubik_sc[3];
95
96 /* perform an operation at a quorum, handling error conditions.  return 0 if
97     all worked, otherwise mark failing server as down and return UERROR
98
99     Note that if any server misses an update, we must wait BIGTIME seconds before
100     allowing the transaction to commit, to ensure that the missing and possibly still
101     functioning server times out and stop handing out old data.  This is done in the commit
102     code, where we wait for a server marked down to have stayed down for BIGTIME seconds
103     before we allow a transaction to commit.  A server that fails but comes back up won't give
104     out old data because it is sent the sync count along with the beacon message that
105     marks it as *really* up (beaconSinceDown).
106 */
107 #define CStampVersion       1       /* meaning set ts->version */
108 afs_int32 ContactQuorum(aproc, atrans, aflags, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5)
109     int (*aproc)();
110     int aflags;
111     register struct ubik_trans *atrans;
112     long aparm0, aparm1, aparm2, aparm3, aparm4, aparm5; {
113     register struct ubik_server *ts;
114     register afs_int32 code;
115     afs_int32 rcode, okcalls;
116
117     rcode = 0;
118     okcalls = 0;
119     for(ts = ubik_servers; ts; ts=ts->next) {
120         /* for each server */
121         if (!ts->up || !ts->currentDB) {
122             ts->currentDB = 0;  /* db is no longer current; we just missed an update */
123             continue;    /* not up-to-date, don't bother */
124         }
125         code = (*aproc) (ts->disk_rxcid, &atrans->tid, aparm0, aparm1, aparm2, aparm3, aparm4, aparm5);
126         if ( (aproc == DISK_WriteV) && (code <= -450) && (code > -500) ) {
127            /* An RPC interface mismatch (as defined in comerr/error_msg.c).
128             * Un-bulk the entries and do individual DISK_Write calls
129             * instead of DISK_WriteV.
130             */
131            iovec_wrt         *iovec_infoP = (iovec_wrt *)aparm0;
132            iovec_buf         *iovec_dataP = (iovec_buf *)aparm1;
133            struct ubik_iovec *iovec       = (struct ubik_iovec *)iovec_infoP->iovec_wrt_val;
134            char              *iobuf       = (char *)iovec_dataP->iovec_buf_val;
135            bulkdata          tcbs;
136            afs_int32             i, offset;
137
138            for (i=0, offset=0; i<iovec_infoP->iovec_wrt_len; i++) {
139               /* Sanity check for going off end of buffer */
140               if ((offset + iovec[i].length) > iovec_dataP->iovec_buf_len) {
141                  code = UINTERNAL;
142                  break;
143               }
144               tcbs.bulkdata_len = iovec[i].length;
145               tcbs.bulkdata_val = &iobuf[offset];
146               code = DISK_Write(ts->disk_rxcid, &atrans->tid,
147                                 iovec[i].file, iovec[i].position, &tcbs);
148               if (code) break;
149
150               offset += iovec[i].length;
151            }
152         }
153         if (code) {         /* failure */
154             rcode = code;
155             ts->up = 0;     /* mark as down now; beacons will no longer be sent */
156             ts->currentDB = 0;
157             ts->beaconSinceDown = 0;
158             urecovery_LostServer(); /* tell recovery to try to resend dbase later */
159         } else {            /* success */
160             if (!ts->isClone)
161                 okcalls++;          /* count up how many worked */
162             if (aflags & CStampVersion) {
163                 ts->version = atrans->dbase->version;
164             }
165         }
166     }
167     /* return 0 if we successfully contacted a quorum, otherwise return error code.  We don't have to contact ourselves (that was done locally) */
168     if (okcalls+1 >= ubik_quorum) return 0;
169     else return rcode;
170 }
171
172 /* This routine initializes the ubik system for a set of servers.  It returns 0 for success, or an error code on failure.  The set of servers is specified by serverList; nServers gives the number of entries in this array.  Finally, dbase is the returned structure representing this instance of a ubik; it is passed to various calls below.  The variable pathName provides an initial prefix used for naming storage files used by this system.  It should perhaps be generalized to a low-level disk interface providing read, write, file enumeration and sync operations.
173
174     Note that the host named by myHost should not also be listed in serverList.
175 */
176
177 int ubik_ServerInitByInfo(myHost, myPort, info, clones, pathName, dbase)
178     struct afsconf_cell *info;  /* in */
179     char clones[];
180     afs_int32 myHost;
181     short myPort;
182     char *pathName;     /* in */
183     struct ubik_dbase **dbase; /* out */ 
184 {
185      afs_int32 code;
186     
187      code = ubik_ServerInitCommon(myHost, myPort, info, clones, 0, pathName, dbase);
188      return code;
189 }
190
191 int ubik_ServerInit(myHost, myPort, serverList, pathName, dbase)
192     afs_int32 serverList[];    /* in */
193     afs_int32 myHost;
194     short myPort;
195     char *pathName;     /* in */
196     struct ubik_dbase **dbase; /* out */ 
197 {
198      afs_int32 code;
199     
200      code = ubik_ServerInitCommon(myHost, myPort, (struct afsconf_cell *)0, 0,
201                         serverList, pathName, dbase);
202      return code;
203 }
204
205 int ubik_ServerInitCommon(myHost, myPort, info, clones, serverList, pathName, dbase)
206     afs_int32 myHost;
207     short myPort;
208     struct afsconf_cell *info;  /* in */
209     char clones[];
210     afs_int32 serverList[];    /* in */
211     char *pathName;     /* in */
212     struct ubik_dbase **dbase; /* out */ 
213 {
214     register struct ubik_dbase *tdb;
215     register afs_int32 code;
216     PROCESS junk;
217     afs_int32 secIndex;
218     struct rx_securityClass *secClass;
219
220     struct rx_service *tservice;
221     extern struct rx_securityClass *rxnull_NewServerSecurityObject();
222     extern int VOTE_ExecuteRequest(), DISK_ExecuteRequest();
223     extern void rx_ServerProc();
224     extern int rx_stackSize;
225
226     initialize_U_error_table();
227
228     tdb = (struct ubik_dbase *) malloc(sizeof(struct ubik_dbase));
229     tdb->pathName = (char *) malloc(strlen(pathName)+1);
230     strcpy(tdb->pathName, pathName);
231     tdb->activeTrans = (struct ubik_trans *) 0;
232     memset(&tdb->version, 0, sizeof(struct ubik_version));
233     memset(&tdb->cachedVersion, 0, sizeof(struct ubik_version));
234     Lock_Init(&tdb->versionLock);
235     tdb->flags = 0;
236     tdb->read = uphys_read;
237     tdb->write = uphys_write;
238     tdb->truncate = uphys_truncate;
239     tdb->open = 0;  /* this function isn't used any more */
240     tdb->sync = uphys_sync;
241     tdb->stat = uphys_stat;
242     tdb->getlabel = uphys_getlabel;
243     tdb->setlabel = uphys_setlabel;
244     tdb->getnfiles = uphys_getnfiles;
245     tdb->readers=0;
246     tdb->tidCounter=tdb->writeTidCounter=0;
247     *dbase = tdb;
248     ubik_dbase = tdb;   /* for now, only one db per server; can fix later when we have names for the other dbases */
249
250     /* initialize RX */
251     ubik_callPortal = myPort;
252     /* try to get an additional security object */
253     ubik_sc[0] = rxnull_NewServerSecurityObject();
254     ubik_sc[1] = 0;
255     ubik_sc[2] = 0;
256     if (ubik_SRXSecurityProc) {
257         code = (*ubik_SRXSecurityProc)(ubik_SRXSecurityRock, &secClass, &secIndex);
258         if (code == 0) {
259             ubik_sc[secIndex] = secClass;
260         }
261     }
262     code = rx_Init(myPort);
263     if (code < 0) return code;
264     tservice = rx_NewService(0, VOTE_SERVICE_ID, "VOTE", ubik_sc, 3, VOTE_ExecuteRequest);
265     if (tservice == (struct rx_service *)0) {
266         ubik_dprint("Could not create VOTE rx service!\n");
267         return -1;
268     }
269     rx_SetMinProcs(tservice, 2);
270     rx_SetMaxProcs(tservice, 3);
271
272     tservice = rx_NewService(0, DISK_SERVICE_ID, "DISK", ubik_sc, 3, DISK_ExecuteRequest);
273     if (tservice == (struct rx_service *)0) {
274         ubik_dprint("Could not create DISK rx service!\n");
275         return -1;
276     }
277     rx_SetMinProcs(tservice, 2);
278     rx_SetMaxProcs(tservice, 3);
279
280     /* start an rx_ServerProc to handle incoming RPC's in particular the 
281      * UpdateInterfaceAddr RPC that occurs in ubeacon_InitServerList. This avoids
282      * the "steplock" problem in ubik initialization. Defect 11037.
283      */
284     LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY,
285                       0, "rx_ServerProc", &junk);
286
287     /* do basic initialization */
288     code = uvote_Init();
289     if (code) return code;
290     code = urecovery_Initialize(tdb);
291     if (code) return code;
292     if (info)
293         code = ubeacon_InitServerListByInfo(myHost, info, clones);
294     else 
295         code = ubeacon_InitServerList(myHost, serverList);
296     if (code) return code;
297
298     /* now start up async processes */
299     code = LWP_CreateProcess(ubeacon_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
300                              0, "beacon", &junk);
301     if (code) return code;
302     code = LWP_CreateProcess(urecovery_Interact, 16384/*8192*/, LWP_MAX_PRIORITY-1,
303                              0, "recovery", &junk);
304     return code;
305 }
306
307 /*  This routine begins a read or write transaction on the transaction
308     identified by transPtr, in the dbase named by dbase.  An open mode of
309     ubik_READTRANS identifies this as a read transaction, while a mode of
310     ubik_WRITETRANS identifies this as a write transaction.  transPtr 
311     is set to the returned transaction control block. The readAny flag is
312     set to 0 or 1 by the wrapper functions ubik_BeginTrans() or 
313     ubik_BeginTransReadAny() below.
314
315     We can only begin transaction when we have an up-to-date database.
316 */
317
318 static int BeginTrans(dbase, transMode, transPtr, readAny)
319     register struct ubik_dbase *dbase;  /* in */
320     int readAny;
321     afs_int32 transMode; /* in */
322     struct ubik_trans **transPtr;       /* out */ {
323     struct ubik_trans *jt;
324     register struct ubik_trans *tt;
325     register afs_int32 code;
326
327     if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
328     DBHOLD(dbase);
329     if (urecovery_AllBetter(dbase, readAny)==0) {
330         DBRELE(dbase);
331         return UNOQUORUM;
332     }
333     /* otherwise we have a quorum, use it */
334
335     /* make sure that at most one write transaction occurs at any one time.  This
336         has nothing to do with transaction locking; that's enforced by the lock package.  However,
337         we can't even handle two non-conflicting writes, since our log and recovery modules
338         don't know how to restore one without possibly picking up some data from the other. */
339     if (transMode == UBIK_WRITETRANS) {
340         /* if we're writing already, wait */
341         while(dbase->flags & DBWRITING) {
342             DBRELE(dbase);
343             LWP_WaitProcess(&dbase->flags);
344             DBHOLD(dbase);
345         }
346         if (!ubeacon_AmSyncSite()) {
347             DBRELE(dbase);
348             return UNOTSYNC;
349         }
350     }
351
352     /* create the transaction */
353     code = udisk_begin(dbase, transMode, &jt);  /* can't take address of register var */
354     tt = jt;        /* move to a register */
355     if (code || tt == (struct ubik_trans *)NULL) {
356         DBRELE(dbase);
357         return code;
358     }
359     if (readAny) tt->flags |= TRREADANY;
360     /* label trans and dbase with new tid */
361     tt->tid.epoch = ubik_epochTime;
362     /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
363     tt->tid.counter = (dbase->tidCounter += 2);
364
365     if (transMode == UBIK_WRITETRANS) {
366       /* for a write trans, we have to keep track of the write tid counter too */
367       dbase->writeTidCounter += 2;
368
369         /* next try to start transaction on appropriate number of machines */
370         code = ContactQuorum(DISK_Begin, tt, 0);
371         if (code) {
372             /* we must abort the operation */
373             udisk_abort(tt);
374             ContactQuorum(DISK_Abort, tt, 0);    /* force aborts to the others */
375             udisk_end(tt);
376             DBRELE(dbase);
377             return code;
378         }
379     }
380
381     *transPtr = tt;
382     DBRELE(dbase);
383     return 0;
384 }
385
386 int ubik_BeginTrans(dbase, transMode, transPtr)
387     register struct ubik_dbase *dbase;  /* in */
388     afs_int32 transMode; /* in */
389     struct ubik_trans **transPtr;       /* out */ {
390         return BeginTrans(dbase, transMode, transPtr, 0);
391 }
392
393 int ubik_BeginTransReadAny(dbase, transMode, transPtr)
394     register struct ubik_dbase *dbase;  /* in */
395     afs_int32 transMode; /* in */
396     struct ubik_trans **transPtr;       /* out */ {
397         return BeginTrans(dbase, transMode, transPtr, 1);
398
399  
400 /* this routine ends a read or write transaction by aborting it */
401 int ubik_AbortTrans(transPtr)
402     register struct ubik_trans *transPtr; /* in */ {
403     register afs_int32 code;
404     afs_int32 code2;
405     register struct ubik_dbase *dbase;
406     
407     dbase = transPtr->dbase;
408     DBHOLD(dbase);
409     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
410     /* see if we're still up-to-date */
411     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
412         udisk_abort(transPtr);
413         udisk_end(transPtr);
414         DBRELE(dbase);
415         return UNOQUORUM;
416     }
417
418     if (transPtr->type == UBIK_READTRANS) {
419         code = udisk_abort(transPtr);
420         udisk_end(transPtr);
421         DBRELE(dbase);
422         return code;
423     }
424
425     /* below here, we know we're doing a write transaction */
426     if (!ubeacon_AmSyncSite()) {
427         udisk_abort(transPtr);
428         udisk_end(transPtr);
429         DBRELE(dbase);
430         return UNOTSYNC;
431     }
432     
433     /* now it is safe to try remote abort */
434     code = ContactQuorum(DISK_Abort, transPtr, 0);
435     code2 = udisk_abort(transPtr);
436     udisk_end(transPtr);
437     DBRELE(dbase);
438     return (code? code : code2);
439 }
440
441 /* This routine ends a read or write transaction on the open transaction identified by transPtr.  It returns an error code. */
442 int ubik_EndTrans(transPtr)
443     register struct ubik_trans *transPtr; /* in */ {
444     register afs_int32 code;
445     struct timeval tv;
446     afs_int32 realStart;
447     register struct ubik_server *ts;
448     afs_int32 now;
449     register struct ubik_dbase *dbase;
450     
451     if (transPtr->type == UBIK_WRITETRANS) {
452        code = ubik_Flush(transPtr);
453        if (code) {
454           ubik_AbortTrans(transPtr);
455           return(code);
456        }
457     }
458
459     dbase = transPtr->dbase;
460     DBHOLD(dbase);
461     memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
462
463     /* give up if no longer current */
464     if (!urecovery_AllBetter(dbase, transPtr->flags & TRREADANY)) {
465         udisk_abort(transPtr);
466         udisk_end(transPtr);
467         DBRELE(dbase);
468         return UNOQUORUM;
469     }
470
471     if (transPtr->type == UBIK_READTRANS) { /* reads are easy */
472         code = udisk_commit(transPtr);
473         if (code == 0) goto success;    /* update cachedVersion correctly */
474         udisk_end(transPtr);
475         DBRELE(dbase);
476         return code;
477     }
478
479     if (!ubeacon_AmSyncSite()) {    /* no longer sync site */
480         udisk_abort(transPtr);
481         udisk_end(transPtr);
482         DBRELE(dbase);
483         return UNOTSYNC;
484     }
485     
486     /* now it is safe to do commit */
487     code = udisk_commit(transPtr);
488     if (code == 0) code = ContactQuorum(DISK_Commit, transPtr, CStampVersion);
489     if (code) {
490         /* failed to commit, so must return failure.  Try to clear locks first, just for fun
491             Note that we don't know if this transaction will eventually commit at this point.
492             If it made it to a site that will be present in the next quorum, we win, otherwise
493             we lose.  If we contact a majority of sites, then we won't be here: contacting
494             a majority guarantees commit, since it guarantees that one dude will be a
495             member of the next quorum. */
496         ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
497         udisk_end(transPtr);
498         DBRELE(dbase);
499         return code;
500     }
501     /* before we can start sending unlock messages, we must wait until all servers
502         that are possibly still functioning on the other side of a network partition
503         have timed out.  Check the server structures, compute how long to wait, then
504         start the unlocks */
505     realStart = FT_ApproxTime();
506     while (1) {
507         /* wait for all servers to time out */
508         code = 0;
509         now = FT_ApproxTime();
510         /* check if we're still sync site, the guy should either come up
511             to us, or timeout.  Put safety check in anyway */
512         if (now - realStart > 10 * BIGTIME) {
513             ubik_stats.escapes++;
514             ubik_print("ubik escaping from commit wait\n");
515             break;
516         }
517         for(ts = ubik_servers; ts; ts=ts->next) {
518             if (!ts->beaconSinceDown && now <= ts->lastBeaconSent + BIGTIME) {
519                 /* this guy could have some damaged data, wait for him */
520                 code = 1;
521                 tv.tv_sec = 1;  /* try again after a while (ha ha) */
522                 tv.tv_usec = 0;
523                 IOMGR_Select(0, 0, 0, 0, &tv);  /* poll, should we wait on something? */
524                 break;
525             }
526         }
527         if (code == 0) break;       /* no down ones still pseudo-active */
528     }
529
530     /* finally, unlock all the dudes.  We can return success independent of the number of servers
531         that really unlock the dbase; the others will do it if/when they elect a new sync site.
532         The transaction is committed anyway, since we succeeded in contacting a quorum
533         at the start (when invoking the DiskCommit function).
534     */
535     ContactQuorum(DISK_ReleaseLocks, transPtr, 0);
536
537   success:
538     udisk_end(transPtr);
539     /* update version on successful EndTrans */
540     memcpy(&dbase->cachedVersion, &dbase->version, sizeof(struct ubik_version));
541     
542     DBRELE(dbase);
543     return 0;
544 }
545
546 /* This routine reads length bytes into buffer from the current position in the database.  The file pointer is updated appropriately (by adding the number of bytes actually transferred), and the length actually transferred is stored in the long integer pointed to by length.  Note that *length is an INOUT parameter: at the start it represents the size of the buffer, and when done, it contains the number of bytes actually transferred.  A short read returns zero for an error code. */
547
548 int ubik_Read(transPtr, buffer, length)
549     register struct ubik_trans *transPtr;       /* in */
550     char *buffer;   /* in */
551     afs_int32 length;   /* in */ {
552     register afs_int32 code;
553
554     /* reads are easy to do: handle locally */
555     DBHOLD(transPtr->dbase);
556     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
557         DBRELE(transPtr->dbase);
558         return UNOQUORUM;
559     }
560     
561     code = udisk_read(transPtr, transPtr->seekFile, buffer, transPtr->seekPos, length);
562     if (code == 0) {
563         transPtr->seekPos += length;
564     }
565     DBRELE(transPtr->dbase);
566     return code;
567 }
568
569 /* This routine will flush the io data in the iovec structures. It first
570  * flushes to the local disk and then uses ContactQuorum to write it to 
571  * the other servers.
572  */
573 int ubik_Flush(transPtr)
574     struct ubik_trans *transPtr;
575 {
576     afs_int32 code, error=0;
577
578     if (transPtr->type != UBIK_WRITETRANS)
579        return UBADTYPE;
580     if (!transPtr->iovec_info.iovec_wrt_len || !transPtr->iovec_info.iovec_wrt_val)
581        return 0;
582
583     DBHOLD(transPtr->dbase);
584     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
585        ERROR_EXIT(UNOQUORUM);
586     if (!ubeacon_AmSyncSite())      /* only sync site can write */
587        ERROR_EXIT(UNOTSYNC);
588
589     /* Update the rest of the servers in the quorum */
590     code = ContactQuorum(DISK_WriteV, transPtr, 0, 
591                          &transPtr->iovec_info, &transPtr->iovec_data);
592     if (code) {
593        udisk_abort(transPtr);
594        ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
595        transPtr->iovec_info.iovec_wrt_len = 0;
596        transPtr->iovec_data.iovec_buf_len = 0;
597        ERROR_EXIT(code);
598     }
599
600     /* Wrote the buffers out, so start at scratch again */
601     transPtr->iovec_info.iovec_wrt_len = 0;
602     transPtr->iovec_data.iovec_buf_len = 0;
603
604   error_exit:
605     DBRELE(transPtr->dbase);
606     return error;
607 }
608
609 int ubik_Write(transPtr, buffer, length)
610     register struct ubik_trans *transPtr;       /* in */
611     char *buffer;       /* in */
612     afs_int32 length;   /* in */
613 {
614     struct ubik_iovec *iovec;
615     afs_int32 code, error=0;
616     afs_int32 pos, len, size;
617
618     if (transPtr->type != UBIK_WRITETRANS)
619        return UBADTYPE;
620     if (!length)
621        return 0;
622
623     if (length > IOVEC_MAXBUF) {
624        for (pos=0, len=length; len>0; len-=size, pos+=size) {
625           size = ((len < IOVEC_MAXBUF) ? len : IOVEC_MAXBUF);
626           code = ubik_Write(transPtr, &buffer[pos], size);
627           if (code) return (code);
628        }
629        return 0;
630     }
631
632     if (!transPtr->iovec_info.iovec_wrt_val) {
633        transPtr->iovec_info.iovec_wrt_len  = 0;
634        transPtr->iovec_info.iovec_wrt_val  = 
635          (struct ubik_iovec *)malloc(IOVEC_MAXWRT*sizeof(struct ubik_iovec));
636        transPtr->iovec_data.iovec_buf_len = 0;
637        transPtr->iovec_data.iovec_buf_val = (char *)malloc(IOVEC_MAXBUF);
638        if (!transPtr->iovec_info.iovec_wrt_val || !transPtr->iovec_data.iovec_buf_val) {
639           if (transPtr->iovec_info.iovec_wrt_val) free(transPtr->iovec_info.iovec_wrt_val);
640           transPtr->iovec_info.iovec_wrt_val = 0;
641           if (transPtr->iovec_data.iovec_buf_val) free(transPtr->iovec_data.iovec_buf_val);
642           transPtr->iovec_data.iovec_buf_val = 0;
643           return UNOMEM;
644        }
645     }
646
647     /* If this write won't fit in the structure, then flush it out and start anew */
648     if ( (transPtr->iovec_info.iovec_wrt_len >= IOVEC_MAXWRT) ||
649          ((length + transPtr->iovec_data.iovec_buf_len) > IOVEC_MAXBUF) ) {
650        code = ubik_Flush(transPtr);
651        if (code) return (code);
652     }
653
654     DBHOLD(transPtr->dbase);
655     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
656        ERROR_EXIT(UNOQUORUM);
657     if (!ubeacon_AmSyncSite())      /* only sync site can write */
658        ERROR_EXIT(UNOTSYNC);
659
660     /* Write to the local disk */
661     code = udisk_write(transPtr, transPtr->seekFile, buffer, 
662                                  transPtr->seekPos, length);
663     if (code) {
664        udisk_abort(transPtr);
665        transPtr->iovec_info.iovec_wrt_len = 0;
666        transPtr->iovec_data.iovec_buf_len = 0;
667        DBRELE(transPtr->dbase);
668        return(code);
669     }
670
671     /* Collect writes for the other ubik servers (to be done in bulk) */
672     iovec = (struct ubik_iovec *)transPtr->iovec_info.iovec_wrt_val;
673     iovec[transPtr->iovec_info.iovec_wrt_len].file     = transPtr->seekFile;
674     iovec[transPtr->iovec_info.iovec_wrt_len].position = transPtr->seekPos;
675     iovec[transPtr->iovec_info.iovec_wrt_len].length   = length;
676     
677     memcpy(&transPtr->iovec_data.iovec_buf_val[transPtr->iovec_data.iovec_buf_len], buffer, length);
678
679     transPtr->iovec_info.iovec_wrt_len++;
680     transPtr->iovec_data.iovec_buf_len += length;
681     transPtr->seekPos += length;
682
683   error_exit:
684     DBRELE(transPtr->dbase);
685     return error;
686 }
687
688 /* This sets the file pointer associated with the current transaction to the appropriate file and byte position.  Unlike Unix files, a transaction is labelled by both a file number (fileid) and a byte position relative to the specified file (position). */
689
690 int ubik_Seek(transPtr, fileid, position)
691     register struct ubik_trans *transPtr;       /* IN */
692     afs_int32 fileid;    /* IN */
693     afs_int32 position;  /* IN */ {
694     register afs_int32 code;
695
696     DBHOLD(transPtr->dbase);
697     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY)) {
698         code = UNOQUORUM;
699     } else  {
700         transPtr->seekFile = fileid;
701         transPtr->seekPos = position;
702         code = 0;
703     }
704     DBRELE(transPtr->dbase);
705     return code;
706 }
707
708 /* This call returns the file pointer associated with the specified transaction in fileid and position. */
709
710 int ubik_Tell(transPtr, fileid, position)
711     register struct ubik_trans *transPtr;       /* IN */
712     afs_int32 *fileid;   /* OUT */
713     afs_int32 *position; /* OUT */ {
714     DBHOLD(transPtr->dbase);
715     *fileid = transPtr->seekFile;
716     *position = transPtr->seekPos;
717     DBRELE(transPtr->dbase);
718     return 0;
719 }
720
721 /* This sets the file size for the currently-selected file to length bytes, if length is less than the file's current size. */
722
723 int ubik_Truncate(transPtr, length)
724     register struct ubik_trans *transPtr;       /* in */
725     afs_int32 length;    /* in */ {
726     afs_int32 code, error=0;
727
728     /* Will also catch if not UBIK_WRITETRANS */
729     code = ubik_Flush(transPtr);
730     if (code) return(code);
731
732     DBHOLD(transPtr->dbase);
733     /* first, check that quorum is still good, and that dbase is up-to-date */
734     if (!urecovery_AllBetter(transPtr->dbase, transPtr->flags & TRREADANY))
735        ERROR_EXIT(UNOQUORUM);
736     if (!ubeacon_AmSyncSite())
737        ERROR_EXIT(UNOTSYNC);
738     
739     /* now do the operation locally, and propagate it out */
740     code = udisk_truncate(transPtr, transPtr->seekFile, length);
741     if (!code) {
742         code = ContactQuorum(DISK_Truncate, transPtr, 0, transPtr->seekFile, length);
743     }
744     if (code) {
745         /* we must abort the operation */
746         udisk_abort(transPtr);
747         ContactQuorum(DISK_Abort, transPtr, 0);    /* force aborts to the others */
748         ERROR_EXIT(code);
749     }
750
751   error_exit:
752     DBRELE(transPtr->dbase);
753     return error;
754 }
755
756 /* set a lock; all locks are released on transaction end (commit/abort) */
757 ubik_SetLock(atrans, apos, alen, atype)
758     struct ubik_trans *atrans;
759     afs_int32 apos, alen;                  /* apos and alen are not used */
760     int atype; {
761     afs_int32 code=0, error=0;
762     
763     if (atype == LOCKWRITE) {
764        if (atrans->type == UBIK_READTRANS) return UBADTYPE;
765        code = ubik_Flush(atrans);
766        if (code) return(code);
767     }
768
769     DBHOLD(atrans->dbase);
770     if (atype == LOCKREAD) {
771         code = ulock_getLock(atrans, atype, 1);
772         if (code) ERROR_EXIT(code);
773     }
774     else {
775         /* first, check that quorum is still good, and that dbase is up-to-date */
776         if (!urecovery_AllBetter(atrans->dbase, atrans->flags & TRREADANY))
777            ERROR_EXIT(UNOQUORUM);
778         if (!ubeacon_AmSyncSite())
779            ERROR_EXIT(UNOTSYNC);
780
781         /* now do the operation locally, and propagate it out */
782         code = ulock_getLock(atrans, atype, 1);
783         if (code == 0) {
784             code = ContactQuorum(DISK_Lock, atrans, 0, 0, 
785                                  1/*unused*/, 1/*unused*/, LOCKWRITE);
786         }
787         if (code) {
788             /* we must abort the operation */
789             udisk_abort(atrans);
790             ContactQuorum(DISK_Abort, atrans, 0);    /* force aborts to the others */
791             ERROR_EXIT(code);
792         }
793     }
794
795   error_exit:
796     DBRELE(atrans->dbase);
797     return error;
798 }
799
800 /* utility to wait for a version # to change */
801 int ubik_WaitVersion(adatabase, aversion)
802 register struct ubik_version *aversion;
803 register struct ubik_dbase *adatabase; {
804     while (1) {
805         /* wait until version # changes, and then return */
806         if (vcmp(*aversion, adatabase->version) != 0)
807             return 0;
808         LWP_WaitProcess(&adatabase->version);   /* same vers, just wait */
809     }
810 }
811
812 /* utility to get the version of the dbase a transaction is dealing with */
813 int ubik_GetVersion(atrans, avers)
814 register struct ubik_trans *atrans;
815 register struct ubik_version *avers; {
816     *avers = atrans->dbase->version;
817     return 0;
818 }
819
820 /* Facility to simplify database caching.  Returns zero if last trans was done
821    on the local server and was successful.  If return value is non-zero and the
822    caller is a server caching part of the Ubik database, it should invalidate
823    that cache.  A return value of -1 means bad (NULL) argument. */
824
825 int ubik_CacheUpdate (atrans)
826   register struct ubik_trans *atrans;
827 {
828     if (!(atrans && atrans->dbase)) return -1;
829     return vcmp(atrans->dbase->cachedVersion, atrans->dbase->version) != 0;
830 }
831
832 int panic(a, b, c, d)
833     char *a, *b, *c, *d;
834 {
835     ubik_print("Ubik PANIC: ");
836     ubik_print(a, b, c, d);
837     abort();
838     ubik_print("BACK FROM ABORT\n");    /* shouldn't come back */
839     exit(1);                        /* never know, though  */
840 }
841
842 /*
843 ** This functions takes an IP addresses as its parameter. It returns the
844 ** the primary IP address that is on the host passed in.
845 */
846 afs_uint32
847 ubikGetPrimaryInterfaceAddr(addr)
848 afs_uint32  addr;                       /* network byte order */
849 {
850         struct ubik_server *ts;
851         int j;
852
853         for ( ts=ubik_servers; ts; ts=ts->next )
854             for ( j=0; j < UBIK_MAX_INTERFACE_ADDR; j++)
855                 if ( ts->addr[j] == addr )
856                     return  ts->addr[0];        /* net byte order */
857         return 0; /* if not in server database, return error */
858 }
859