bf9785b9f20f82016ff70e11081899ba2de057df
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #else
19 #include <sys/file.h>
20 #include <netinet/in.h>
21 #endif
22 #include <errno.h>
23 #include <lock.h>
24 #include <rx/xdr.h>
25
26
27
28 #define UBIK_INTERNALS
29 #include "ubik.h"
30 #include "ubik_int.h"
31
32 #define PHSIZE  128
33 struct buffer  {
34     struct ubik_dbase *dbase;       /* dbase within which the buffer resides */
35     afs_int32 file;                         /* Unique cache key */
36     afs_int32 page;                         /* page number */
37     struct buffer *lru_next;
38     struct buffer *lru_prev;
39     struct buffer *hashNext;        /* next dude in hash table */
40     char *data;                     /* ptr to the data */
41     char lockers;                   /* usage ref count */
42     char dirty;                     /* is buffer modified */
43     char hashIndex;                 /* back ptr to hash table */
44 } *Buffers;
45
46 #define pHash(page) ((page) & (PHSIZE-1))
47
48 afs_int32 ubik_nBuffers = NBUFFERS;
49 static struct buffer *phTable[PHSIZE];  /* page hash table */
50 static struct buffer *LruBuffer;
51 static int nbuffers;
52 static int calls=0, ios=0, lastb=0;
53 static char *BufferData;
54 static struct buffer *newslot();
55 static initd = 0;
56 #define BADFID      0xffffffff
57
58 static DTrunc();
59
60 static struct ubik_trunc *freeTruncList=0;
61
62 /* remove a transaction from the database's active transaction list.  Don't free it */
63 static unthread(atrans)
64     struct ubik_trans *atrans; {
65     struct ubik_trans **lt, *tt;
66     lt = &atrans->dbase->activeTrans;
67     for(tt = *lt; tt; lt = &tt->next, tt = *lt) {
68         if (tt == atrans) {
69             /* found it */
70             *lt = tt->next;
71             return 0;
72         }
73     }
74     return 2;   /* no entry */
75 }
76
77 /* some debugging assistance */
78 udisk_Debug(aparm)
79     struct ubik_debug *aparm; {
80     struct buffer *tb;
81     int i;
82
83     memcpy(&aparm->localVersion, &ubik_dbase->version, sizeof(struct ubik_version));
84     aparm->lockedPages = 0;
85     aparm->writeLockedPages = 0;
86     tb = Buffers;
87     for(i=0;i<nbuffers;i++, tb++) {
88         if (tb->lockers) {
89             aparm->lockedPages++;
90             if (tb->dirty) aparm->writeLockedPages++;
91         }
92     }
93 }
94
95 /* log format is defined here, and implicitly in recovery.c
96  *
97  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
98  * are in logged in network standard byte order, in case we want to move logs
99  * from machine-to-machine someday.
100  *
101  * Begin transaction: opcode
102  * Commit transaction: opcode, version (8 bytes)
103  * Truncate file: opcode, file number, length
104  * Abort transaction: opcode
105  * Write data: opcode, file, position, length, <length> data bytes
106  */
107
108 /* write an opcode to the log */
109 udisk_LogOpcode(adbase, aopcode, async)
110     struct ubik_dbase *adbase;
111     afs_int32 aopcode;
112     int async; {
113     struct ubik_stat ustat;
114     afs_int32 code;
115     
116     /* figure out where to write */
117     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
118     if (code < 0) return code;
119
120     /* setup data and do write */
121     aopcode = htonl(aopcode);
122     code = (*adbase->write)(adbase, LOGFILE, &aopcode, ustat.size, sizeof(afs_int32));
123     if (code != sizeof(afs_int32)) return UIOERROR;
124
125     /* optionally sync data */
126     if (async) code = (*adbase->sync)(adbase, LOGFILE);
127     else code = 0;
128     return code;
129 }
130
131 /* log a commit, never syncing */
132 udisk_LogEnd(adbase, aversion)
133     struct ubik_dbase *adbase;
134     struct ubik_version *aversion; {
135     afs_int32 code;
136     afs_int32 data[3];
137     struct ubik_stat ustat;
138     
139     /* figure out where to write */
140     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
141     if (code) return code;
142
143     /* setup data */
144     data[0] = htonl(LOGEND);
145     data[1] = htonl(aversion->epoch);
146     data[2] = htonl(aversion->counter);
147     
148     /* do write */
149     code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
150     if (code != 3*sizeof(afs_int32)) return UIOERROR;
151
152     /* finally sync the log */
153     code = (*adbase->sync)(adbase, LOGFILE);
154     return code;
155 }
156     
157 /* log a truncate operation, never syncing */
158 udisk_LogTruncate(adbase, afile, alength)
159     struct ubik_dbase *adbase;
160     afs_int32 afile, alength; {
161     afs_int32 code;
162     afs_int32 data[3];
163     struct ubik_stat ustat;
164     
165     /* figure out where to write */
166     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
167     if (code < 0) return code;
168
169     /* setup data */
170     data[0] = htonl(LOGTRUNCATE);
171     data[1] = htonl(afile);
172     data[2] = htonl(alength);
173     
174     /* do write */
175     code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
176     if (code != 3*sizeof(afs_int32)) return UIOERROR;
177     return 0;
178 }
179     
180 /* write some data to the log, never syncing */
181 udisk_LogWriteData(adbase, afile, abuffer, apos, alen)
182     struct ubik_dbase *adbase;
183     char *abuffer;
184     afs_int32 afile;
185     afs_int32 apos;
186     afs_int32 alen; {
187     struct ubik_stat ustat;
188     afs_int32 code;
189     afs_int32 data[4];
190     afs_int32 lpos;
191     
192     /* find end of log */
193     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
194     lpos = ustat.size;
195     if (code < 0) return code;
196
197     /* setup header */
198     data[0] = htonl(LOGDATA);
199     data[1] = htonl(afile);
200     data[2] = htonl(apos);
201     data[3] = htonl(alen);
202
203     /* write header */
204     code = (*adbase->write)(adbase, LOGFILE, data, lpos, 4*sizeof(afs_int32));
205     if (code != 4*sizeof(afs_int32)) return UIOERROR;
206     lpos += 4*sizeof(afs_int32);
207     
208     /* write data */
209     code = (*adbase->write)(adbase, LOGFILE, abuffer, lpos, alen);
210     if (code != alen) return UIOERROR;
211     return 0;
212 }
213
214 static int DInit (abuffers)
215     int abuffers; {
216     /* Initialize the venus buffer system. */
217     int i;
218     struct buffer *tb;
219     Buffers = (struct buffer *) malloc(abuffers * sizeof(struct buffer));
220     memset(Buffers, 0, abuffers * sizeof(struct buffer));
221     BufferData = (char *) malloc(abuffers * PAGESIZE);
222     nbuffers = abuffers;
223     for(i=0;i<PHSIZE;i++) phTable[i] = 0;
224     for (i=0;i<abuffers;i++) {
225         /* Fill in each buffer with an empty indication. */
226         tb = &Buffers[i];
227         tb->lru_next = &(Buffers[i+1]);
228         tb->lru_prev = &(Buffers[i-1]);
229         tb->data = &BufferData[PAGESIZE*i];
230         tb->file = BADFID;
231     }
232     Buffers[0].lru_prev = &(Buffers[abuffers-1]);
233     Buffers[abuffers-1].lru_next = &(Buffers[0]);
234     LruBuffer  = &(Buffers[0]);
235     return 0;
236 }
237
238 /* Take a buffer and mark it as the least recently used buffer */
239 static void Dlru(abuf)
240   struct buffer *abuf;
241 {
242   if (LruBuffer == abuf)
243      return;
244
245   /* Unthread from where it is in the list */
246   abuf->lru_next->lru_prev = abuf->lru_prev;
247   abuf->lru_prev->lru_next = abuf->lru_next;
248
249   /* Thread onto beginning of LRU list */
250   abuf->lru_next = LruBuffer;
251   abuf->lru_prev = LruBuffer->lru_prev;
252
253   LruBuffer->lru_prev->lru_next = abuf;
254   LruBuffer->lru_prev = abuf;
255   LruBuffer = abuf;
256 }
257
258 /* Take a buffer and mark it as the most recently used buffer */
259 static void Dmru(abuf)
260      struct buffer *abuf;
261 {
262   if (LruBuffer == abuf) {
263      LruBuffer = LruBuffer->lru_next;
264      return;
265   }
266
267   /* Unthread from where it is in the list */
268   abuf->lru_next->lru_prev = abuf->lru_prev;
269   abuf->lru_prev->lru_next = abuf->lru_next;
270
271   /* Thread onto end of LRU list - making it the MRU buffer */
272   abuf->lru_next = LruBuffer;
273   abuf->lru_prev = LruBuffer->lru_prev;
274   LruBuffer->lru_prev->lru_next = abuf;
275   LruBuffer->lru_prev = abuf;
276 }
277
278 /* get a pointer to a particular buffer */
279 static char *DRead(dbase, fid, page)
280     struct ubik_dbase *dbase;
281     afs_int32 fid;
282     int page; {
283     /* Read a page from the disk. */
284     struct buffer *tb, *lastbuffer;
285     afs_int32 code;
286
287     calls++;
288     lastbuffer = LruBuffer->lru_prev;
289
290     if ((lastbuffer->page  == page ) && 
291         (lastbuffer->file  == fid  ) && 
292         (lastbuffer->dbase == dbase)) {
293         tb = lastbuffer;
294         tb->lockers++;
295         lastb++;
296         return tb->data;
297     }
298     for(tb=phTable[pHash(page)]; tb; tb=tb->hashNext) {
299         if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
300             Dmru(tb);
301             tb->lockers++;
302             return tb->data;
303         }
304     }
305     /* can't find it */
306     tb = newslot(dbase, fid, page);
307     if (!tb) return 0;
308     memset(tb->data, 0, PAGESIZE);
309
310     tb->lockers++;
311     code = (*dbase->read)(dbase, fid, tb->data, page*PAGESIZE, PAGESIZE);
312     if (code < 0) {
313        tb->file = BADFID;
314        Dlru(tb);
315        tb->lockers--;
316        ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
317        return 0;
318     }
319     ios++;
320
321     /* Note that findslot sets the page field in the buffer equal to
322      * what it is searching for.
323      */
324     return tb->data;
325 }
326
327 /* zap truncated pages */
328 static DTrunc(dbase, fid, length)
329     struct ubik_dbase *dbase;
330     afs_int32 fid;
331     afs_int32 length; {
332     afs_int32 maxPage;
333     struct buffer *tb;
334     int   i;
335     
336     maxPage = (length+PAGESIZE-1)>>LOGPAGESIZE; /* first invalid page now in file */
337     for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
338         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
339             tb->file = BADFID;
340             Dlru(tb);
341         }
342     }
343     return 0;
344 }
345
346 /* allocate a truncation entry.  We allocate special entries representing truncations, rather than
347     performing them immediately, so that we can abort a transaction easily by simply purging
348     the in-core memory buffers and discarding these truncation entries.
349 */
350 static struct ubik_trunc *GetTrunc() {
351     struct ubik_trunc *tt;
352     if (!freeTruncList) {
353         freeTruncList = (struct ubik_trunc *) malloc(sizeof(struct ubik_trunc));
354         freeTruncList->next = (struct ubik_trunc *) 0;
355     }
356     tt = freeTruncList;
357     freeTruncList = tt->next;
358     return tt;
359 }
360
361 /* free a truncation entry */
362 static PutTrunc(at)
363     struct ubik_trunc *at; {
364     at->next = freeTruncList;
365     freeTruncList = at;
366     return 0;
367 }
368
369 /* find a truncation entry for a file, if any */
370 static struct ubik_trunc *FindTrunc(atrans, afile)
371     struct ubik_trans *atrans;
372     afs_int32 afile; {
373     struct ubik_trunc *tt;
374     for(tt=atrans->activeTruncs; tt; tt=tt->next) {
375         if (tt->file == afile) return tt;
376     }
377     return (struct ubik_trunc *) 0;
378 }
379
380 /* do truncates associated with trans, and free them */
381 static DoTruncs(atrans)
382     struct ubik_trans *atrans; {
383     struct ubik_trunc *tt, *nt;
384     int (*tproc)();
385     afs_int32 rcode=0, code;
386
387     tproc = atrans->dbase->truncate;
388     for(tt = atrans->activeTruncs; tt; tt=nt) {
389         nt = tt->next;
390         DTrunc(atrans->dbase, tt->file, tt->length);    /* zap pages from buffer cache */
391         code = (*tproc)(atrans->dbase, tt->file, tt->length);
392         if (code) rcode = code;
393         PutTrunc(tt);
394     }
395     /* don't unthread, because we do the entire list's worth here */
396     atrans->activeTruncs = (struct ubik_trunc *) 0;
397     return(rcode);
398 }
399
400 /* mark a fid as invalid */
401 udisk_Invalidate(adbase, afid)
402 struct ubik_dbase *adbase;
403 afs_int32 afid; {
404     struct buffer *tb;
405     int    i;
406
407     for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
408         if (tb->file == afid) {
409             tb->file = BADFID;
410             Dlru(tb);
411         }
412     }
413     return 0;
414 }
415
416 /* move this page into the correct hash bucket */
417 static FixupBucket(ap)
418     struct buffer *ap; {
419     struct buffer **lp, *tp;
420     int i;
421     /* first try to get it out of its current hash bucket, in which it might not be */
422     i = ap->hashIndex;
423     lp = &phTable[i];
424     for(tp = *lp; tp; tp=tp->hashNext) {
425         if (tp == ap) {
426             *lp = tp->hashNext;
427             break;
428         }
429         lp = &tp->hashNext;
430     }
431     /* now figure the new hash bucket */
432     i = pHash(ap->page);
433     ap->hashIndex = i;          /* remember where we are for deletion */
434     ap->hashNext = phTable[i];  /* add us to the list */
435     phTable[i] = ap;
436 }
437
438 /* create a new slot for a particular dbase page */
439 static struct buffer *newslot (adbase, afid, apage)
440     struct ubik_dbase *adbase;
441     afs_int32 afid, apage; {
442     /* Find a usable buffer slot */
443     afs_int32 i;
444     struct buffer *pp, *tp;
445
446     pp = 0;             /* last pure */
447     for (i=0,tp=LruBuffer; i<nbuffers; i++,tp=tp->lru_next) {
448        if (!tp->lockers && !tp->dirty) {
449           pp = tp;
450           break;
451        }
452     }
453
454     if (pp == 0) {
455         /* There are no unlocked buffers that don't need to be written to the disk. */
456         ubik_print("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
457         return (struct buffer *) 0;
458     }
459
460     /* Now fill in the header. */
461     pp->dbase = adbase;
462     pp->file = afid;
463     pp->page = apage;
464
465     FixupBucket(pp);            /* move to the right hash bucket */
466     Dmru(pp);
467     return pp;
468 }
469
470 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
471 static void DRelease (ap,flag)
472     char *ap;
473     int flag; {
474     int index;
475     struct buffer *bp;
476
477     if (!ap) return;
478     index = (ap - (char *)BufferData) >> LOGPAGESIZE;
479     bp = &(Buffers[index]);
480     bp->lockers--;
481     if (flag) bp->dirty=1;
482     return;
483 }
484
485 /* flush all modified buffers, leaves dirty bits set (they're cleared
486  * by DSync).  Note interaction with DSync: you call this thing first,
487  * writing the buffers to the disk.  Then you call DSync to sync all the
488  * files that were written, and to clear the dirty bits.  You should
489  * always call DFlush/DSync as a pair.
490  */
491 static DFlush (adbase)
492     struct ubik_dbase *adbase; {
493     int i;
494     afs_int32 code;
495     struct buffer *tb;
496
497     tb = Buffers;
498     for(i=0;i<nbuffers;i++,tb++) {
499         if (tb->dirty) {
500             code = tb->page * PAGESIZE; /* offset within file */
501             code = (*adbase->write)(adbase, tb->file, tb->data, code, PAGESIZE);
502             if (code != PAGESIZE) return UIOERROR;
503         }
504     }
505     return 0;
506 }
507
508 /* flush all modified buffers */
509 static DAbort (adbase)
510     struct ubik_dbase *adbase; {
511     int i;
512     struct buffer *tb;
513
514     tb = Buffers;
515     for(i=0;i<nbuffers;i++,tb++) {
516         if (tb->dirty) {
517             tb->dirty = 0;
518             tb->file = BADFID;
519             Dlru(tb);
520         }
521     }
522     return 0;
523 }
524
525 /* must only be called after DFlush, due to its interpretation of dirty flag */
526 static DSync(adbase)
527     struct ubik_dbase *adbase; {
528     int i;
529     afs_int32 code;
530     struct buffer *tb;
531     afs_int32 file;
532     afs_int32 rCode;
533
534     rCode = 0;
535     while (1) {
536         file = BADFID;
537         for(i=0,tb = Buffers; i<nbuffers; i++,tb++) {
538             if (tb->dirty == 1) {
539                 if (file == BADFID) file = tb->file;
540                 if (file != BADFID && tb->file == file) tb->dirty = 0;
541             }
542         }
543         if (file == BADFID) break;
544         /* otherwise we have a file to sync */
545         code = (*adbase->sync)(adbase, file);
546         if (code) rCode = code;
547     }
548     return rCode;
549 }
550
551 /* Same as read, only do not even try to read the page */
552 static char *DNew (dbase, fid, page)
553     struct ubik_dbase *dbase;
554     int page;
555     afs_int32 fid; {
556     struct buffer *tb;
557
558     if ((tb = newslot(dbase, fid, page)) == 0) return (char *) 0;
559     tb->lockers++;
560     memset(tb->data, 0, PAGESIZE);
561     return tb->data;
562 }
563
564 /* read data from database */
565 udisk_read(atrans, afile, abuffer, apos, alen)
566     afs_int32 afile;
567     char *abuffer;
568     afs_int32 apos, alen;
569     struct ubik_trans *atrans; {
570     char *bp;
571     afs_int32 offset, len, totalLen;
572     struct ubik_dbase *dbase;
573
574     if (atrans->flags & TRDONE) return UDONE;
575     totalLen = 0;
576     dbase = atrans->dbase;
577     while (alen > 0) {
578         bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
579         if (!bp) return UEOF;
580         /* otherwise, min of remaining bytes and end of buffer to user mode */
581         offset = apos & (PAGESIZE-1);
582         len = PAGESIZE - offset;
583         if (len > alen) len = alen;
584         memcpy(abuffer, bp+offset, len);
585         abuffer += len;
586         apos += len;
587         alen -= len;
588         totalLen += len;
589         DRelease(bp, 0);
590     }
591     return 0;
592 }
593
594 /* truncate file */
595 udisk_truncate(atrans, afile, alength)
596     struct ubik_trans *atrans;
597     afs_int32 afile;
598     afs_int32 alength; {
599     afs_int32 code;
600     struct ubik_trunc *tt;
601
602     if (atrans->flags & TRDONE) return UDONE;
603     if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
604
605     /* write a truncate log record */
606     code = udisk_LogTruncate(atrans->dbase, afile, alength);
607
608     /* don't truncate until commit time */
609     tt = FindTrunc(atrans, afile);
610     if (!tt) {
611         /* this file not truncated yet */
612         tt=GetTrunc();
613         tt->next = atrans->activeTruncs;
614         atrans->activeTruncs = tt;
615         tt->file = afile;
616         tt->length = alength;
617     }
618     else {
619         /* already truncated to a certain length */
620         if (tt->length > alength) tt->length = alength;
621     }
622     return code;
623 }
624
625 /* write data to database, using logs */
626 udisk_write(atrans, afile, abuffer, apos, alen)
627     afs_int32 afile;
628     char *abuffer;
629     afs_int32 apos, alen;
630     struct ubik_trans *atrans; {
631     char *bp;
632     afs_int32 offset, len, totalLen;
633     struct ubik_dbase *dbase;
634     struct ubik_trunc *tt;
635     afs_int32 code;
636
637     if (atrans->flags & TRDONE) return UDONE;
638     if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
639
640     dbase = atrans->dbase;
641     /* first write the data to the log */
642     code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
643     if (code) return code;
644
645     /* expand any truncations of this file */
646     tt = FindTrunc(atrans, afile);
647     if (tt) {
648         if (tt->length < apos + alen) {
649             tt->length = apos + alen;
650         }
651     }
652
653     /* now update vm */
654     totalLen = 0;
655     while (alen > 0) {
656         bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
657         if (!bp) {
658             bp = DNew(dbase, afile, apos>>LOGPAGESIZE);
659             if (!bp) return UIOERROR;
660             memset(bp, 0, PAGESIZE);
661         }
662         /* otherwise, min of remaining bytes and end of buffer to user mode */
663         offset = apos & (PAGESIZE-1);
664         len = PAGESIZE-offset;
665         if (len > alen) len = alen;
666         memcpy(bp+offset, abuffer, len);
667         abuffer += len;
668         apos += len;
669         alen -= len;
670         totalLen += len;
671         DRelease(bp, 1);    /* buffer modified */
672     }
673     return 0;
674 }
675
676 /* begin a new local transaction */
677 udisk_begin(adbase, atype, atrans)
678     struct ubik_trans **atrans;
679     int atype;
680     struct ubik_dbase *adbase; {
681     afs_int32 code;
682     struct ubik_trans *tt;
683
684     *atrans = (struct ubik_trans *)NULL;
685     /* Make sure system is initialized before doing anything */
686     if (!initd) {
687         initd = 1;
688         DInit(ubik_nBuffers);
689     }
690     if (atype == UBIK_WRITETRANS) {
691         if (adbase->flags & DBWRITING) return USYNC;
692         code = udisk_LogOpcode(adbase, LOGNEW, 0);
693         if (code) return code;
694     }
695     tt = (struct ubik_trans *) malloc(sizeof(struct ubik_trans));
696     memset(tt, 0, sizeof(struct ubik_trans));
697     tt->dbase = adbase;
698     tt->next = adbase->activeTrans;
699     adbase->activeTrans = tt;
700     tt->type = atype;
701     if (atype == UBIK_READTRANS) adbase->readers++;
702     else if (atype == UBIK_WRITETRANS) adbase->flags |= DBWRITING;
703     *atrans = tt;
704     return 0;
705 }
706
707 /* commit transaction */
708 udisk_commit(atrans)
709     struct ubik_trans *atrans; {
710     struct ubik_dbase *dbase;
711     afs_int32 code=0;
712     struct ubik_version oldversion, newversion;
713
714     if (atrans->flags & TRDONE)
715        return(UTWOENDS);
716
717     if (atrans->type == UBIK_WRITETRANS) {
718         dbase = atrans->dbase;
719
720         /* On the first write to the database. We update the versions */
721         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
722            oldversion = dbase->version;
723            newversion.epoch   = FT_ApproxTime();;
724            newversion.counter = 1;
725           
726            code = (*dbase->setlabel)(dbase, 0, &newversion);
727            if (code) return(code);
728            ubik_epochTime = newversion.epoch;
729            dbase->version = newversion;
730
731            /* Ignore the error here. If the call fails, the site is
732             * marked down and when we detect it is up again, we will 
733             * send the entire database to it.
734             */
735            ContactQuorum(DISK_SetVersion, atrans, 1/*CStampVersion*/,
736                          &oldversion, &newversion);
737            urecovery_state |= UBIK_RECLABELDB;
738         }
739
740         dbase->version.counter++;       /* bump commit count */
741         LWP_NoYieldSignal(&dbase->version);
742
743         code = udisk_LogEnd(dbase, &dbase->version);
744         if (code) {
745            dbase->version.counter--;
746            return(code);
747         }
748
749         /* If we fail anytime after this, then panic and let the
750          * recovery replay the log. 
751          */
752         code = DFlush(dbase);   /* write dirty pages to respective files */
753         if (code) panic("Writing Ubik DB modifications\n");
754         code = DSync(dbase);    /* sync the files and mark pages not dirty */
755         if (code) panic("Synchronizing Ubik DB modifications\n");
756
757         code = DoTruncs(atrans); /* Perform requested truncations */
758         if (code) panic("Truncating Ubik DB\n");
759
760         /* label the committed dbase */
761         code = (*dbase->setlabel)(dbase, 0, &dbase->version);
762         if (code) panic("Truncating Ubik DB\n");
763
764         code = (*dbase->truncate)(dbase, LOGFILE, 0);   /* discard log (optional) */
765         if (code) panic("Truncating Ubik logfile\n");
766
767     }
768
769     /* When the transaction is marked done, it also means the logfile
770      * has been truncated.
771      */
772     atrans->flags |= TRDONE;
773     return code;
774 }
775
776 /* abort transaction */
777 udisk_abort(atrans)
778     struct ubik_trans *atrans;
779 {
780     struct ubik_dbase *dbase;
781     afs_int32 code;
782     
783     if (atrans->flags & TRDONE)
784        return UTWOENDS;
785
786     /* Check if we are the write trans before logging abort, lest we
787      * abort a good write trans in progress. 
788      * We don't really care if the LOGABORT gets to the log because we 
789      * truncate the log next. If the truncate fails, we panic; for 
790      * otherwise, the log entries remain. On restart, replay of the log
791      * will do nothing because the abort is there or no LogEnd opcode.
792      */
793     dbase = atrans->dbase;
794     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
795         udisk_LogOpcode(dbase, LOGABORT, 1);
796         code = (*dbase->truncate)(dbase, LOGFILE, 0);
797         if (code) panic("Truncating Ubik logfile during an abort\n");
798         DAbort(dbase);         /* remove all dirty pages */
799     }
800
801     /* When the transaction is marked done, it also means the logfile
802      * has been truncated.
803      */
804     atrans->flags |= (TRABORT | TRDONE);
805     return 0;
806 }
807
808 /* destroy a transaction after it has been committed or aborted.  if
809  * it hasn't committed before you call this routine, we'll abort the
810  * transaction for you.
811  */
812 udisk_end(atrans)
813     struct ubik_trans *atrans; {
814     struct ubik_dbase *dbase;
815
816     if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
817     dbase = atrans->dbase;
818
819     ulock_relLock(atrans);
820     unthread(atrans);
821
822     /* check if we are the write trans before unsetting the DBWRITING bit, else
823      * we could be unsetting someone else's bit.
824      */
825     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
826        dbase->flags &= ~DBWRITING;
827     } else {
828        dbase->readers--;
829     }
830     if (atrans->iovec_info.iovec_wrt_val) free(atrans->iovec_info.iovec_wrt_val);
831     if (atrans->iovec_data.iovec_buf_val) free(atrans->iovec_data.iovec_buf_val);
832     free(atrans);
833
834     /* Wakeup any writers waiting in BeginTrans() */
835     LWP_NoYieldSignal(&dbase->flags);
836     return 0;
837 }