include-afsconfig-before-param-h-20010712
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #else
19 #include <sys/file.h>
20 #include <netinet/in.h>
21 #endif
22 #include <errno.h>
23 #include <lock.h>
24 #include <rx/xdr.h>
25
26
27
28 #define UBIK_INTERNALS
29 #include "ubik.h"
30 #include "ubik_int.h"
31
32 #define PHSIZE  128
33 struct buffer  {
34     struct ubik_dbase *dbase;       /* dbase within which the buffer resides */
35     afs_int32 file;                         /* Unique cache key */
36     afs_int32 page;                         /* page number */
37     struct buffer *lru_next;
38     struct buffer *lru_prev;
39     struct buffer *hashNext;        /* next dude in hash table */
40     char *data;                     /* ptr to the data */
41     char lockers;                   /* usage ref count */
42     char dirty;                     /* is buffer modified */
43     char hashIndex;                 /* back ptr to hash table */
44 } *Buffers;
45
46 #define pHash(page) ((page) & (PHSIZE-1))
47
48 afs_int32 ubik_nBuffers = NBUFFERS;
49 static struct buffer *phTable[PHSIZE];  /* page hash table */
50 static struct buffer *LruBuffer;
51 static int nbuffers;
52 static int calls=0, ios=0, lastb=0;
53 static char *BufferData;
54 static struct buffer *newslot();
55 static initd = 0;
56 #define BADFID      0xffffffff
57
58 static DTrunc();
59
60 static struct ubik_trunc *freeTruncList=0;
61
62 /* remove a transaction from the database's active transaction list.  Don't free it */
63 static unthread(atrans)
64     struct ubik_trans *atrans; {
65     struct ubik_trans **lt, *tt;
66     lt = &atrans->dbase->activeTrans;
67     for(tt = *lt; tt; lt = &tt->next, tt = *lt) {
68         if (tt == atrans) {
69             /* found it */
70             *lt = tt->next;
71             return 0;
72         }
73     }
74     return 2;   /* no entry */
75 }
76
77 /* some debugging assistance */
78 udisk_Debug(aparm)
79     struct ubik_debug *aparm; {
80     struct buffer *tb;
81     int i;
82
83     bcopy(&ubik_dbase->version, &aparm->localVersion, sizeof(struct ubik_version));
84     aparm->lockedPages = 0;
85     aparm->writeLockedPages = 0;
86     tb = Buffers;
87     for(i=0;i<nbuffers;i++, tb++) {
88         if (tb->lockers) {
89             aparm->lockedPages++;
90             if (tb->dirty) aparm->writeLockedPages++;
91         }
92     }
93 }
94
95 /* log format is defined here, and implicitly in recovery.c
96  *
97  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
98  * are in logged in network standard byte order, in case we want to move logs
99  * from machine-to-machine someday.
100  *
101  * Begin transaction: opcode
102  * Commit transaction: opcode, version (8 bytes)
103  * Truncate file: opcode, file number, length
104  * Abort transaction: opcode
105  * Write data: opcode, file, position, length, <length> data bytes
106  */
107
108 /* write an opcode to the log */
109 udisk_LogOpcode(adbase, aopcode, async)
110     struct ubik_dbase *adbase;
111     afs_int32 aopcode;
112     int async; {
113     struct ubik_stat ustat;
114     afs_int32 code;
115     
116     /* figure out where to write */
117     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
118     if (code < 0) return code;
119
120     /* setup data and do write */
121     aopcode = htonl(aopcode);
122     code = (*adbase->write)(adbase, LOGFILE, &aopcode, ustat.size, sizeof(afs_int32));
123     if (code != sizeof(afs_int32)) return UIOERROR;
124
125     /* optionally sync data */
126     if (async) code = (*adbase->sync)(adbase, LOGFILE);
127     else code = 0;
128     return code;
129 }
130
131 /* log a commit, never syncing */
132 udisk_LogEnd(adbase, aversion)
133     struct ubik_dbase *adbase;
134     struct ubik_version *aversion; {
135     afs_int32 code;
136     afs_int32 data[3];
137     struct ubik_stat ustat;
138     
139     /* figure out where to write */
140     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
141     if (code) return code;
142
143     /* setup data */
144     data[0] = htonl(LOGEND);
145     data[1] = htonl(aversion->epoch);
146     data[2] = htonl(aversion->counter);
147     
148     /* do write */
149     code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
150     if (code != 3*sizeof(afs_int32)) return UIOERROR;
151
152     /* finally sync the log */
153     code = (*adbase->sync)(adbase, LOGFILE);
154     return code;
155 }
156     
157 /* log a truncate operation, never syncing */
158 udisk_LogTruncate(adbase, afile, alength)
159     struct ubik_dbase *adbase;
160     afs_int32 afile, alength; {
161     afs_int32 code;
162     afs_int32 data[3];
163     struct ubik_stat ustat;
164     
165     /* figure out where to write */
166     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
167     if (code < 0) return code;
168
169     /* setup data */
170     data[0] = htonl(LOGTRUNCATE);
171     data[1] = htonl(afile);
172     data[2] = htonl(alength);
173     
174     /* do write */
175     code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
176     if (code != 3*sizeof(afs_int32)) return UIOERROR;
177     return 0;
178 }
179     
180 /* write some data to the log, never syncing */
181 udisk_LogWriteData(adbase, afile, abuffer, apos, alen)
182     struct ubik_dbase *adbase;
183     char *abuffer;
184     afs_int32 afile;
185     afs_int32 apos;
186     afs_int32 alen; {
187     struct ubik_stat ustat;
188     afs_int32 code;
189     afs_int32 data[4];
190     afs_int32 lpos;
191     
192     /* find end of log */
193     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
194     lpos = ustat.size;
195     if (code < 0) return code;
196
197     /* setup header */
198     data[0] = htonl(LOGDATA);
199     data[1] = htonl(afile);
200     data[2] = htonl(apos);
201     data[3] = htonl(alen);
202
203     /* write header */
204     code = (*adbase->write)(adbase, LOGFILE, data, lpos, 4*sizeof(afs_int32));
205     if (code != 4*sizeof(afs_int32)) return UIOERROR;
206     lpos += 4*sizeof(afs_int32);
207     
208     /* write data */
209     code = (*adbase->write)(adbase, LOGFILE, abuffer, lpos, alen);
210     if (code != alen) return UIOERROR;
211     return 0;
212 }
213
214 static int DInit (abuffers)
215     int abuffers; {
216     /* Initialize the venus buffer system. */
217     int i;
218     struct buffer *tb;
219     Buffers = (struct buffer *) malloc(abuffers * sizeof(struct buffer));
220     bzero(Buffers, abuffers * sizeof(struct buffer));
221     BufferData = (char *) malloc(abuffers * PAGESIZE);
222     nbuffers = abuffers;
223     for(i=0;i<PHSIZE;i++) phTable[i] = 0;
224     for (i=0;i<abuffers;i++) {
225         /* Fill in each buffer with an empty indication. */
226         tb = &Buffers[i];
227         tb->lru_next = &(Buffers[i+1]);
228         tb->lru_prev = &(Buffers[i-1]);
229         tb->data = &BufferData[PAGESIZE*i];
230         tb->file = BADFID;
231     }
232     Buffers[0].lru_prev = &(Buffers[abuffers-1]);
233     Buffers[abuffers-1].lru_next = &(Buffers[0]);
234     LruBuffer  = &(Buffers[0]);
235     return 0;
236 }
237
238 /* Take a buffer and mark it as the least recently used buffer */
239 static int Dlru(abuf)
240   struct buffer *abuf;
241 {
242   if (LruBuffer == abuf)
243      return 0;
244
245   /* Unthread from where it is in the list */
246   abuf->lru_next->lru_prev = abuf->lru_prev;
247   abuf->lru_prev->lru_next = abuf->lru_next;
248
249   /* Thread onto beginning of LRU list */
250   abuf->lru_next = LruBuffer;
251   abuf->lru_prev = LruBuffer->lru_prev;
252
253   LruBuffer->lru_prev->lru_next = abuf;
254   LruBuffer->lru_prev = abuf;
255   LruBuffer = abuf;
256 }
257
258 /* Take a buffer and mark it as the most recently used buffer */
259 static int Dmru(abuf)
260      struct buffer *abuf;
261 {
262   if (LruBuffer == abuf) {
263      LruBuffer = LruBuffer->lru_next;
264      return 0;
265   }
266
267   /* Unthread from where it is in the list */
268   abuf->lru_next->lru_prev = abuf->lru_prev;
269   abuf->lru_prev->lru_next = abuf->lru_next;
270
271   /* Thread onto end of LRU list - making it the MRU buffer */
272   abuf->lru_next = LruBuffer;
273   abuf->lru_prev = LruBuffer->lru_prev;
274   LruBuffer->lru_prev->lru_next = abuf;
275   LruBuffer->lru_prev = abuf;
276
277 }
278
279 /* get a pointer to a particular buffer */
280 static char *DRead(dbase, fid, page)
281     struct ubik_dbase *dbase;
282     afs_int32 fid;
283     int page; {
284     /* Read a page from the disk. */
285     struct buffer *tb, *lastbuffer;
286     afs_int32 trys, code;
287
288     calls++;
289     lastbuffer = LruBuffer->lru_prev;
290
291     if ((lastbuffer->page  == page ) && 
292         (lastbuffer->file  == fid  ) && 
293         (lastbuffer->dbase == dbase)) {
294         tb = lastbuffer;
295         tb->lockers++;
296         lastb++;
297         return tb->data;
298     }
299     for(tb=phTable[pHash(page)]; tb; tb=tb->hashNext) {
300         if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
301             Dmru(tb);
302             tb->lockers++;
303             return tb->data;
304         }
305     }
306     /* can't find it */
307     tb = newslot(dbase, fid, page);
308     if (!tb) return 0;
309     bzero(tb->data, PAGESIZE);
310
311     tb->lockers++;
312     code = (*dbase->read)(dbase, fid, tb->data, page*PAGESIZE, PAGESIZE);
313     if (code < 0) {
314        tb->file = BADFID;
315        Dlru(tb);
316        tb->lockers--;
317        ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
318        return 0;
319     }
320     ios++;
321
322     /* Note that findslot sets the page field in the buffer equal to
323      * what it is searching for.
324      */
325     return tb->data;
326 }
327
328 /* zap truncated pages */
329 static DTrunc(dbase, fid, length)
330     struct ubik_dbase *dbase;
331     afs_int32 fid;
332     afs_int32 length; {
333     afs_int32 maxPage;
334     struct buffer *tb;
335     int   i;
336     
337     maxPage = (length+PAGESIZE-1)>>LOGPAGESIZE; /* first invalid page now in file */
338     for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
339         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
340             tb->file = BADFID;
341             Dlru(tb);
342         }
343     }
344     return 0;
345 }
346
347 /* allocate a truncation entry.  We allocate special entries representing truncations, rather than
348     performing them immediately, so that we can abort a transaction easily by simply purging
349     the in-core memory buffers and discarding these truncation entries.
350 */
351 static struct ubik_trunc *GetTrunc() {
352     struct ubik_trunc *tt;
353     if (!freeTruncList) {
354         freeTruncList = (struct ubik_trunc *) malloc(sizeof(struct ubik_trunc));
355         freeTruncList->next = (struct ubik_trunc *) 0;
356     }
357     tt = freeTruncList;
358     freeTruncList = tt->next;
359     return tt;
360 }
361
362 /* free a truncation entry */
363 static PutTrunc(at)
364     struct ubik_trunc *at; {
365     at->next = freeTruncList;
366     freeTruncList = at;
367     return 0;
368 }
369
370 /* find a truncation entry for a file, if any */
371 static struct ubik_trunc *FindTrunc(atrans, afile)
372     struct ubik_trans *atrans;
373     afs_int32 afile; {
374     struct ubik_trunc *tt;
375     for(tt=atrans->activeTruncs; tt; tt=tt->next) {
376         if (tt->file == afile) return tt;
377     }
378     return (struct ubik_trunc *) 0;
379 }
380
381 /* do truncates associated with trans, and free them */
382 static DoTruncs(atrans)
383     struct ubik_trans *atrans; {
384     struct ubik_trunc *tt, *nt;
385     int (*tproc)();
386     afs_int32 rcode=0, code;
387
388     tproc = atrans->dbase->truncate;
389     for(tt = atrans->activeTruncs; tt; tt=nt) {
390         nt = tt->next;
391         DTrunc(atrans->dbase, tt->file, tt->length);    /* zap pages from buffer cache */
392         code = (*tproc)(atrans->dbase, tt->file, tt->length);
393         if (code) rcode = code;
394         PutTrunc(tt);
395     }
396     /* don't unthread, because we do the entire list's worth here */
397     atrans->activeTruncs = (struct ubik_trunc *) 0;
398     return(rcode);
399 }
400
401 /* mark a fid as invalid */
402 udisk_Invalidate(adbase, afid)
403 struct ubik_dbase *adbase;
404 afs_int32 afid; {
405     struct buffer *tb;
406     int    i;
407
408     for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
409         if (tb->file == afid) {
410             tb->file = BADFID;
411             Dlru(tb);
412         }
413     }
414     return 0;
415 }
416
417 /* move this page into the correct hash bucket */
418 static FixupBucket(ap)
419     struct buffer *ap; {
420     struct buffer **lp, *tp;
421     int i;
422     /* first try to get it out of its current hash bucket, in which it might not be */
423     i = ap->hashIndex;
424     lp = &phTable[i];
425     for(tp = *lp; tp; tp=tp->hashNext) {
426         if (tp == ap) {
427             *lp = tp->hashNext;
428             break;
429         }
430         lp = &tp->hashNext;
431     }
432     /* now figure the new hash bucket */
433     i = pHash(ap->page);
434     ap->hashIndex = i;          /* remember where we are for deletion */
435     ap->hashNext = phTable[i];  /* add us to the list */
436     phTable[i] = ap;
437 }
438
439 /* create a new slot for a particular dbase page */
440 static struct buffer *newslot (adbase, afid, apage)
441     struct ubik_dbase *adbase;
442     afs_int32 afid, apage; {
443     /* Find a usable buffer slot */
444     afs_int32 i;
445     struct buffer *pp, *tp;
446
447     pp = 0;             /* last pure */
448     for (i=0,tp=LruBuffer; i<nbuffers; i++,tp=tp->lru_next) {
449        if (!tp->lockers && !tp->dirty) {
450           pp = tp;
451           break;
452        }
453     }
454
455     if (pp == 0) {
456         /* There are no unlocked buffers that don't need to be written to the disk. */
457         ubik_print("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
458         return (struct buffer *) 0;
459     }
460
461     /* Now fill in the header. */
462     pp->dbase = adbase;
463     pp->file = afid;
464     pp->page = apage;
465
466     FixupBucket(pp);            /* move to the right hash bucket */
467     Dmru(pp);
468     return pp;
469 }
470
471 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
472 static DRelease (ap,flag)
473     char *ap;
474     int flag; {
475     int index;
476     struct buffer *bp;
477
478     if (!ap) return;
479     index = (ap - (char *)BufferData) >> LOGPAGESIZE;
480     bp = &(Buffers[index]);
481     bp->lockers--;
482     if (flag) bp->dirty=1;
483     return 0;
484 }
485
486 /* flush all modified buffers, leaves dirty bits set (they're cleared
487  * by DSync).  Note interaction with DSync: you call this thing first,
488  * writing the buffers to the disk.  Then you call DSync to sync all the
489  * files that were written, and to clear the dirty bits.  You should
490  * always call DFlush/DSync as a pair.
491  */
492 static DFlush (adbase)
493     struct ubik_dbase *adbase; {
494     int i;
495     afs_int32 code;
496     struct buffer *tb;
497
498     tb = Buffers;
499     for(i=0;i<nbuffers;i++,tb++) {
500         if (tb->dirty) {
501             code = tb->page * PAGESIZE; /* offset within file */
502             code = (*adbase->write)(adbase, tb->file, tb->data, code, PAGESIZE);
503             if (code != PAGESIZE) return UIOERROR;
504         }
505     }
506     return 0;
507 }
508
509 /* flush all modified buffers */
510 static DAbort (adbase)
511     struct ubik_dbase *adbase; {
512     int i;
513     struct buffer *tb;
514
515     tb = Buffers;
516     for(i=0;i<nbuffers;i++,tb++) {
517         if (tb->dirty) {
518             tb->dirty = 0;
519             tb->file = BADFID;
520             Dlru(tb);
521         }
522     }
523     return 0;
524 }
525
526 /* must only be called after DFlush, due to its interpretation of dirty flag */
527 static DSync(adbase)
528     struct ubik_dbase *adbase; {
529     int i;
530     afs_int32 code;
531     struct buffer *tb;
532     afs_int32 file;
533     afs_int32 rCode;
534
535     rCode = 0;
536     while (1) {
537         file = BADFID;
538         for(i=0,tb = Buffers; i<nbuffers; i++,tb++) {
539             if (tb->dirty == 1) {
540                 if (file == BADFID) file = tb->file;
541                 if (file != BADFID && tb->file == file) tb->dirty = 0;
542             }
543         }
544         if (file == BADFID) break;
545         /* otherwise we have a file to sync */
546         code = (*adbase->sync)(adbase, file);
547         if (code) rCode = code;
548     }
549     return rCode;
550 }
551
552 /* Same as read, only do not even try to read the page */
553 static char *DNew (dbase, fid, page)
554     struct ubik_dbase *dbase;
555     int page;
556     afs_int32 fid; {
557     struct buffer *tb;
558
559     if ((tb = newslot(dbase, fid, page)) == 0) return (char *) 0;
560     tb->lockers++;
561     bzero(tb->data, PAGESIZE);
562     return tb->data;
563 }
564
565 /* read data from database */
566 udisk_read(atrans, afile, abuffer, apos, alen)
567     afs_int32 afile;
568     char *abuffer;
569     afs_int32 apos, alen;
570     struct ubik_trans *atrans; {
571     char *bp;
572     afs_int32 offset, len, totalLen;
573     struct ubik_dbase *dbase;
574
575     if (atrans->flags & TRDONE) return UDONE;
576     totalLen = 0;
577     dbase = atrans->dbase;
578     while (alen > 0) {
579         bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
580         if (!bp) return UEOF;
581         /* otherwise, min of remaining bytes and end of buffer to user mode */
582         offset = apos & (PAGESIZE-1);
583         len = PAGESIZE - offset;
584         if (len > alen) len = alen;
585         bcopy(bp+offset, abuffer, len);
586         abuffer += len;
587         apos += len;
588         alen -= len;
589         totalLen += len;
590         DRelease(bp, 0);
591     }
592     return 0;
593 }
594
595 /* truncate file */
596 udisk_truncate(atrans, afile, alength)
597     struct ubik_trans *atrans;
598     afs_int32 afile;
599     afs_int32 alength; {
600     afs_int32 code;
601     struct ubik_trunc *tt;
602
603     if (atrans->flags & TRDONE) return UDONE;
604     if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
605
606     /* write a truncate log record */
607     code = udisk_LogTruncate(atrans->dbase, afile, alength);
608
609     /* don't truncate until commit time */
610     tt = FindTrunc(atrans, afile);
611     if (!tt) {
612         /* this file not truncated yet */
613         tt=GetTrunc();
614         tt->next = atrans->activeTruncs;
615         atrans->activeTruncs = tt;
616         tt->file = afile;
617         tt->length = alength;
618     }
619     else {
620         /* already truncated to a certain length */
621         if (tt->length > alength) tt->length = alength;
622     }
623     return code;
624 }
625
626 /* write data to database, using logs */
627 udisk_write(atrans, afile, abuffer, apos, alen)
628     afs_int32 afile;
629     char *abuffer;
630     afs_int32 apos, alen;
631     struct ubik_trans *atrans; {
632     char *bp;
633     afs_int32 offset, len, totalLen;
634     struct ubik_dbase *dbase;
635     struct ubik_trunc *tt;
636     afs_int32 code;
637
638     if (atrans->flags & TRDONE) return UDONE;
639     if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
640
641     dbase = atrans->dbase;
642     /* first write the data to the log */
643     code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
644     if (code) return code;
645
646     /* expand any truncations of this file */
647     tt = FindTrunc(atrans, afile);
648     if (tt) {
649         if (tt->length < apos + alen) {
650             tt->length = apos + alen;
651         }
652     }
653
654     /* now update vm */
655     totalLen = 0;
656     while (alen > 0) {
657         bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
658         if (!bp) {
659             bp = DNew(dbase, afile, apos>>LOGPAGESIZE);
660             if (!bp) return UIOERROR;
661             bzero(bp, PAGESIZE);
662         }
663         /* otherwise, min of remaining bytes and end of buffer to user mode */
664         offset = apos & (PAGESIZE-1);
665         len = PAGESIZE-offset;
666         if (len > alen) len = alen;
667         bcopy(abuffer, bp+offset, len);
668         abuffer += len;
669         apos += len;
670         alen -= len;
671         totalLen += len;
672         DRelease(bp, 1);    /* buffer modified */
673     }
674     return 0;
675 }
676
677 /* begin a new local transaction */
678 udisk_begin(adbase, atype, atrans)
679     struct ubik_trans **atrans;
680     int atype;
681     struct ubik_dbase *adbase; {
682     afs_int32 code;
683     struct ubik_trans *tt;
684
685     *atrans = (struct ubik_trans *)NULL;
686     /* Make sure system is initialized before doing anything */
687     if (!initd) {
688         initd = 1;
689         DInit(ubik_nBuffers);
690     }
691     if (atype == UBIK_WRITETRANS) {
692         if (adbase->flags & DBWRITING) return USYNC;
693         code = udisk_LogOpcode(adbase, LOGNEW, 0);
694         if (code) return code;
695     }
696     tt = (struct ubik_trans *) malloc(sizeof(struct ubik_trans));
697     bzero(tt, sizeof(struct ubik_trans));
698     tt->dbase = adbase;
699     tt->next = adbase->activeTrans;
700     adbase->activeTrans = tt;
701     tt->type = atype;
702     if (atype == UBIK_READTRANS) adbase->readers++;
703     else if (atype == UBIK_WRITETRANS) adbase->flags |= DBWRITING;
704     *atrans = tt;
705     return 0;
706 }
707
708 /* commit transaction */
709 udisk_commit(atrans)
710     struct ubik_trans *atrans; {
711     struct ubik_dbase *dbase;
712     afs_int32 code=0;
713     struct ubik_version oldversion, newversion;
714
715     if (atrans->flags & TRDONE)
716        return(UTWOENDS);
717
718     if (atrans->type == UBIK_WRITETRANS) {
719         dbase = atrans->dbase;
720
721         /* On the first write to the database. We update the versions */
722         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
723            oldversion = dbase->version;
724            newversion.epoch   = FT_ApproxTime();;
725            newversion.counter = 1;
726           
727            code = (*dbase->setlabel)(dbase, 0, &newversion);
728            if (code) return(code);
729            ubik_epochTime = newversion.epoch;
730            dbase->version = newversion;
731
732            /* Ignore the error here. If the call fails, the site is
733             * marked down and when we detect it is up again, we will 
734             * send the entire database to it.
735             */
736            ContactQuorum(DISK_SetVersion, atrans, 1/*CStampVersion*/,
737                          &oldversion, &newversion);
738            urecovery_state |= UBIK_RECLABELDB;
739         }
740
741         dbase->version.counter++;       /* bump commit count */
742         LWP_NoYieldSignal(&dbase->version);
743
744         code = udisk_LogEnd(dbase, &dbase->version);
745         if (code) {
746            dbase->version.counter--;
747            return(code);
748         }
749
750         /* If we fail anytime after this, then panic and let the
751          * recovery replay the log. 
752          */
753         code = DFlush(dbase);   /* write dirty pages to respective files */
754         if (code) panic("Writing Ubik DB modifications\n");
755         code = DSync(dbase);    /* sync the files and mark pages not dirty */
756         if (code) panic("Synchronizing Ubik DB modifications\n");
757
758         code = DoTruncs(atrans); /* Perform requested truncations */
759         if (code) panic("Truncating Ubik DB\n");
760
761         /* label the committed dbase */
762         code = (*dbase->setlabel)(dbase, 0, &dbase->version);
763         if (code) panic("Truncating Ubik DB\n");
764
765         code = (*dbase->truncate)(dbase, LOGFILE, 0);   /* discard log (optional) */
766         if (code) panic("Truncating Ubik logfile\n");
767
768     }
769
770     /* When the transaction is marked done, it also means the logfile
771      * has been truncated.
772      */
773     atrans->flags |= TRDONE;
774     return code;
775 }
776
777 /* abort transaction */
778 udisk_abort(atrans)
779     struct ubik_trans *atrans;
780 {
781     struct ubik_dbase *dbase;
782     afs_int32 code;
783     
784     if (atrans->flags & TRDONE)
785        return UTWOENDS;
786
787     /* Check if we are the write trans before logging abort, lest we
788      * abort a good write trans in progress. 
789      * We don't really care if the LOGABORT gets to the log because we 
790      * truncate the log next. If the truncate fails, we panic; for 
791      * otherwise, the log entries remain. On restart, replay of the log
792      * will do nothing because the abort is there or no LogEnd opcode.
793      */
794     dbase = atrans->dbase;
795     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
796         udisk_LogOpcode(dbase, LOGABORT, 1);
797         code = (*dbase->truncate)(dbase, LOGFILE, 0);
798         if (code) panic("Truncating Ubik logfile during an abort\n");
799         DAbort(dbase);         /* remove all dirty pages */
800     }
801
802     /* When the transaction is marked done, it also means the logfile
803      * has been truncated.
804      */
805     atrans->flags |= (TRABORT | TRDONE);
806     return 0;
807 }
808
809 /* destroy a transaction after it has been committed or aborted.  if
810  * it hasn't committed before you call this routine, we'll abort the
811  * transaction for you.
812  */
813 udisk_end(atrans)
814     struct ubik_trans *atrans; {
815     struct ubik_dbase *dbase;
816
817     if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
818     dbase = atrans->dbase;
819
820     ulock_relLock(atrans);
821     unthread(atrans);
822
823     /* check if we are the write trans before unsetting the DBWRITING bit, else
824      * we could be unsetting someone else's bit.
825      */
826     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
827        dbase->flags &= ~DBWRITING;
828     } else {
829        dbase->readers--;
830     }
831     if (atrans->iovec_info.iovec_wrt_val) free(atrans->iovec_info.iovec_wrt_val);
832     if (atrans->iovec_data.iovec_buf_val) free(atrans->iovec_data.iovec_buf_val);
833     free(atrans);
834
835     /* Wakeup any writers waiting in BeginTrans() */
836     LWP_NoYieldSignal(&dbase->flags);
837     return 0;
838 }