02a482aec3cfefcaf0ef64a07787c3c24e0b63ad
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #endif
23 #include <errno.h>
24 #ifdef HAVE_STRING_H
25 #include <string.h>
26 #else
27 #ifdef HAVE_STRINGS_H
28 #include <strings.h>
29 #endif
30 #endif
31 #include <lock.h>
32 #include <rx/xdr.h>
33
34
35
36 #define UBIK_INTERNALS
37 #include "ubik.h"
38 #include "ubik_int.h"
39
40 #define PHSIZE  128
41 static struct buffer {
42     struct ubik_dbase *dbase;   /* dbase within which the buffer resides */
43     afs_int32 file;             /* Unique cache key */
44     afs_int32 page;             /* page number */
45     struct buffer *lru_next;
46     struct buffer *lru_prev;
47     struct buffer *hashNext;    /* next dude in hash table */
48     char *data;                 /* ptr to the data */
49     char lockers;               /* usage ref count */
50     char dirty;                 /* is buffer modified */
51     char hashIndex;             /* back ptr to hash table */
52 } *Buffers;
53
54 #define pHash(page) ((page) & (PHSIZE-1))
55
56 afs_int32 ubik_nBuffers = NBUFFERS;
57 static struct buffer *phTable[PHSIZE];  /* page hash table */
58 static struct buffer *LruBuffer;
59 static int nbuffers;
60 static int calls = 0, ios = 0, lastb = 0;
61 static char *BufferData;
62 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
63                               afs_int32 apage);
64 static initd = 0;
65 #define BADFID      0xffffffff
66
67 static DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length);
68
69 static struct ubik_trunc *freeTruncList = 0;
70
71 /* remove a transaction from the database's active transaction list.  Don't free it */
72 static int
73 unthread(struct ubik_trans *atrans)
74 {
75     struct ubik_trans **lt, *tt;
76     lt = &atrans->dbase->activeTrans;
77     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
78         if (tt == atrans) {
79             /* found it */
80             *lt = tt->next;
81             return 0;
82         }
83     }
84     return 2;                   /* no entry */
85 }
86
87 /* some debugging assistance */
88 int
89 udisk_Debug(struct ubik_debug *aparm)
90 {
91     struct buffer *tb;
92     int i;
93
94     memcpy(&aparm->localVersion, &ubik_dbase->version,
95            sizeof(struct ubik_version));
96     aparm->lockedPages = 0;
97     aparm->writeLockedPages = 0;
98     tb = Buffers;
99     for (i = 0; i < nbuffers; i++, tb++) {
100         if (tb->lockers) {
101             aparm->lockedPages++;
102             if (tb->dirty)
103                 aparm->writeLockedPages++;
104         }
105     }
106 }
107
108 /* log format is defined here, and implicitly in recovery.c
109  *
110  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
111  * are in logged in network standard byte order, in case we want to move logs
112  * from machine-to-machine someday.
113  *
114  * Begin transaction: opcode
115  * Commit transaction: opcode, version (8 bytes)
116  * Truncate file: opcode, file number, length
117  * Abort transaction: opcode
118  * Write data: opcode, file, position, length, <length> data bytes
119  */
120
121 /* write an opcode to the log */
122 int
123 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
124 {
125     struct ubik_stat ustat;
126     afs_int32 code;
127
128     /* figure out where to write */
129     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
130     if (code < 0)
131         return code;
132
133     /* setup data and do write */
134     aopcode = htonl(aopcode);
135     code =
136         (*adbase->write) (adbase, LOGFILE, &aopcode, ustat.size,
137                           sizeof(afs_int32));
138     if (code != sizeof(afs_int32))
139         return UIOERROR;
140
141     /* optionally sync data */
142     if (async)
143         code = (*adbase->sync) (adbase, LOGFILE);
144     else
145         code = 0;
146     return code;
147 }
148
149 /* log a commit, never syncing */
150 int
151 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
152 {
153     afs_int32 code;
154     afs_int32 data[3];
155     struct ubik_stat ustat;
156
157     /* figure out where to write */
158     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
159     if (code)
160         return code;
161
162     /* setup data */
163     data[0] = htonl(LOGEND);
164     data[1] = htonl(aversion->epoch);
165     data[2] = htonl(aversion->counter);
166
167     /* do write */
168     code =
169         (*adbase->write) (adbase, LOGFILE, data, ustat.size,
170                           3 * sizeof(afs_int32));
171     if (code != 3 * sizeof(afs_int32))
172         return UIOERROR;
173
174     /* finally sync the log */
175     code = (*adbase->sync) (adbase, LOGFILE);
176     return code;
177 }
178
179 /* log a truncate operation, never syncing */
180 int
181 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
182                   afs_int32 alength)
183 {
184     afs_int32 code;
185     afs_int32 data[3];
186     struct ubik_stat ustat;
187
188     /* figure out where to write */
189     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
190     if (code < 0)
191         return code;
192
193     /* setup data */
194     data[0] = htonl(LOGTRUNCATE);
195     data[1] = htonl(afile);
196     data[2] = htonl(alength);
197
198     /* do write */
199     code =
200         (*adbase->write) (adbase, LOGFILE, data, ustat.size,
201                           3 * sizeof(afs_int32));
202     if (code != 3 * sizeof(afs_int32))
203         return UIOERROR;
204     return 0;
205 }
206
207 /* write some data to the log, never syncing */
208 int
209 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, char *abuffer,
210                    afs_int32 apos, afs_int32 alen)
211 {
212     struct ubik_stat ustat;
213     afs_int32 code;
214     afs_int32 data[4];
215     afs_int32 lpos;
216
217     /* find end of log */
218     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
219     lpos = ustat.size;
220     if (code < 0)
221         return code;
222
223     /* setup header */
224     data[0] = htonl(LOGDATA);
225     data[1] = htonl(afile);
226     data[2] = htonl(apos);
227     data[3] = htonl(alen);
228
229     /* write header */
230     code =
231         (*adbase->write) (adbase, LOGFILE, data, lpos, 4 * sizeof(afs_int32));
232     if (code != 4 * sizeof(afs_int32))
233         return UIOERROR;
234     lpos += 4 * sizeof(afs_int32);
235
236     /* write data */
237     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
238     if (code != alen)
239         return UIOERROR;
240     return 0;
241 }
242
243 static int
244 DInit(int abuffers)
245 {
246     /* Initialize the venus buffer system. */
247     int i;
248     struct buffer *tb;
249     Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
250     memset(Buffers, 0, abuffers * sizeof(struct buffer));
251     BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
252     nbuffers = abuffers;
253     for (i = 0; i < PHSIZE; i++)
254         phTable[i] = 0;
255     for (i = 0; i < abuffers; i++) {
256         /* Fill in each buffer with an empty indication. */
257         tb = &Buffers[i];
258         tb->lru_next = &(Buffers[i + 1]);
259         tb->lru_prev = &(Buffers[i - 1]);
260         tb->data = &BufferData[UBIK_PAGESIZE * i];
261         tb->file = BADFID;
262     }
263     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
264     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
265     LruBuffer = &(Buffers[0]);
266     return 0;
267 }
268
269 /* Take a buffer and mark it as the least recently used buffer */
270 static void
271 Dlru(struct buffer *abuf)
272 {
273     if (LruBuffer == abuf)
274         return;
275
276     /* Unthread from where it is in the list */
277     abuf->lru_next->lru_prev = abuf->lru_prev;
278     abuf->lru_prev->lru_next = abuf->lru_next;
279
280     /* Thread onto beginning of LRU list */
281     abuf->lru_next = LruBuffer;
282     abuf->lru_prev = LruBuffer->lru_prev;
283
284     LruBuffer->lru_prev->lru_next = abuf;
285     LruBuffer->lru_prev = abuf;
286     LruBuffer = abuf;
287 }
288
289 /* Take a buffer and mark it as the most recently used buffer */
290 static void
291 Dmru(struct buffer *abuf)
292 {
293     if (LruBuffer == abuf) {
294         LruBuffer = LruBuffer->lru_next;
295         return;
296     }
297
298     /* Unthread from where it is in the list */
299     abuf->lru_next->lru_prev = abuf->lru_prev;
300     abuf->lru_prev->lru_next = abuf->lru_next;
301
302     /* Thread onto end of LRU list - making it the MRU buffer */
303     abuf->lru_next = LruBuffer;
304     abuf->lru_prev = LruBuffer->lru_prev;
305     LruBuffer->lru_prev->lru_next = abuf;
306     LruBuffer->lru_prev = abuf;
307 }
308
309 /* get a pointer to a particular buffer */
310 static char *
311 DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
312 {
313     /* Read a page from the disk. */
314     struct buffer *tb, *lastbuffer;
315     afs_int32 code;
316
317     calls++;
318     lastbuffer = LruBuffer->lru_prev;
319
320     if ((lastbuffer->page == page) && (lastbuffer->file == fid)
321         && (lastbuffer->dbase == dbase)) {
322         tb = lastbuffer;
323         tb->lockers++;
324         lastb++;
325         return tb->data;
326     }
327     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
328         if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
329             Dmru(tb);
330             tb->lockers++;
331             return tb->data;
332         }
333     }
334     /* can't find it */
335     tb = newslot(dbase, fid, page);
336     if (!tb)
337         return 0;
338     memset(tb->data, 0, UBIK_PAGESIZE);
339
340     tb->lockers++;
341     code =
342         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
343                         UBIK_PAGESIZE);
344     if (code < 0) {
345         tb->file = BADFID;
346         Dlru(tb);
347         tb->lockers--;
348         ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
349         return 0;
350     }
351     ios++;
352
353     /* Note that findslot sets the page field in the buffer equal to
354      * what it is searching for.
355      */
356     return tb->data;
357 }
358
359 /* zap truncated pages */
360 static int
361 DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length)
362 {
363     afs_int32 maxPage;
364     struct buffer *tb;
365     int i;
366
367     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
368     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
369         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
370             tb->file = BADFID;
371             Dlru(tb);
372         }
373     }
374     return 0;
375 }
376
377 /* allocate a truncation entry.  We allocate special entries representing truncations, rather than
378     performing them immediately, so that we can abort a transaction easily by simply purging
379     the in-core memory buffers and discarding these truncation entries.
380 */
381 static struct ubik_trunc *
382 GetTrunc(void)
383 {
384     struct ubik_trunc *tt;
385     if (!freeTruncList) {
386         freeTruncList =
387             (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
388         freeTruncList->next = (struct ubik_trunc *)0;
389     }
390     tt = freeTruncList;
391     freeTruncList = tt->next;
392     return tt;
393 }
394
395 /* free a truncation entry */
396 static int
397 PutTrunc(struct ubik_trunc *at)
398 {
399     at->next = freeTruncList;
400     freeTruncList = at;
401     return 0;
402 }
403
404 /* find a truncation entry for a file, if any */
405 static struct ubik_trunc *
406 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
407 {
408     struct ubik_trunc *tt;
409     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
410         if (tt->file == afile)
411             return tt;
412     }
413     return (struct ubik_trunc *)0;
414 }
415
416 /* do truncates associated with trans, and free them */
417 static int
418 DoTruncs(struct ubik_trans *atrans)
419 {
420     struct ubik_trunc *tt, *nt;
421     int (*tproc) ();
422     afs_int32 rcode = 0, code;
423
424     tproc = atrans->dbase->truncate;
425     for (tt = atrans->activeTruncs; tt; tt = nt) {
426         nt = tt->next;
427         DTrunc(atrans->dbase, tt->file, tt->length);    /* zap pages from buffer cache */
428         code = (*tproc) (atrans->dbase, tt->file, tt->length);
429         if (code)
430             rcode = code;
431         PutTrunc(tt);
432     }
433     /* don't unthread, because we do the entire list's worth here */
434     atrans->activeTruncs = (struct ubik_trunc *)0;
435     return (rcode);
436 }
437
438 /* mark a fid as invalid */
439 int
440 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
441 {
442     struct buffer *tb;
443     int i;
444
445     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
446         if (tb->file == afid) {
447             tb->file = BADFID;
448             Dlru(tb);
449         }
450     }
451     return 0;
452 }
453
454 /* move this page into the correct hash bucket */
455 static int
456 FixupBucket(struct buffer *ap)
457 {
458     struct buffer **lp, *tp;
459     int i;
460     /* first try to get it out of its current hash bucket, in which it might not be */
461     i = ap->hashIndex;
462     lp = &phTable[i];
463     for (tp = *lp; tp; tp = tp->hashNext) {
464         if (tp == ap) {
465             *lp = tp->hashNext;
466             break;
467         }
468         lp = &tp->hashNext;
469     }
470     /* now figure the new hash bucket */
471     i = pHash(ap->page);
472     ap->hashIndex = i;          /* remember where we are for deletion */
473     ap->hashNext = phTable[i];  /* add us to the list */
474     phTable[i] = ap;
475 }
476
477 /* create a new slot for a particular dbase page */
478 static struct buffer *
479 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
480 {
481     /* Find a usable buffer slot */
482     afs_int32 i;
483     struct buffer *pp, *tp;
484
485     pp = 0;                     /* last pure */
486     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
487         if (!tp->lockers && !tp->dirty) {
488             pp = tp;
489             break;
490         }
491     }
492
493     if (pp == 0) {
494         /* There are no unlocked buffers that don't need to be written to the disk. */
495         ubik_print
496             ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
497         return NULL;
498     }
499
500     /* Now fill in the header. */
501     pp->dbase = adbase;
502     pp->file = afid;
503     pp->page = apage;
504
505     FixupBucket(pp);            /* move to the right hash bucket */
506     Dmru(pp);
507     return pp;
508 }
509
510 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
511 static void
512 DRelease(char *ap, int flag)
513 {
514     int index;
515     struct buffer *bp;
516
517     if (!ap)
518         return;
519     index = (ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
520     bp = &(Buffers[index]);
521     bp->lockers--;
522     if (flag)
523         bp->dirty = 1;
524     return;
525 }
526
527 /* flush all modified buffers, leaves dirty bits set (they're cleared
528  * by DSync).  Note interaction with DSync: you call this thing first,
529  * writing the buffers to the disk.  Then you call DSync to sync all the
530  * files that were written, and to clear the dirty bits.  You should
531  * always call DFlush/DSync as a pair.
532  */
533 static int
534 DFlush(struct ubik_dbase *adbase)
535 {
536     int i;
537     afs_int32 code;
538     struct buffer *tb;
539
540     tb = Buffers;
541     for (i = 0; i < nbuffers; i++, tb++) {
542         if (tb->dirty) {
543             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
544             code =
545                 (*adbase->write) (adbase, tb->file, tb->data, code,
546                                   UBIK_PAGESIZE);
547             if (code != UBIK_PAGESIZE)
548                 return UIOERROR;
549         }
550     }
551     return 0;
552 }
553
554 /* flush all modified buffers */
555 static int
556 DAbort(struct ubik_dbase *adbase)
557 {
558     int i;
559     struct buffer *tb;
560
561     tb = Buffers;
562     for (i = 0; i < nbuffers; i++, tb++) {
563         if (tb->dirty) {
564             tb->dirty = 0;
565             tb->file = BADFID;
566             Dlru(tb);
567         }
568     }
569     return 0;
570 }
571
572 /* must only be called after DFlush, due to its interpretation of dirty flag */
573 static int
574 DSync(struct ubik_dbase *adbase)
575 {
576     int i;
577     afs_int32 code;
578     struct buffer *tb;
579     afs_int32 file;
580     afs_int32 rCode;
581
582     rCode = 0;
583     while (1) {
584         file = BADFID;
585         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
586             if (tb->dirty == 1) {
587                 if (file == BADFID)
588                     file = tb->file;
589                 if (file != BADFID && tb->file == file)
590                     tb->dirty = 0;
591             }
592         }
593         if (file == BADFID)
594             break;
595         /* otherwise we have a file to sync */
596         code = (*adbase->sync) (adbase, file);
597         if (code)
598             rCode = code;
599     }
600     return rCode;
601 }
602
603 /* Same as read, only do not even try to read the page */
604 static char *
605 DNew(struct ubik_dbase *dbase, afs_int32 fid, int page)
606 {
607     struct buffer *tb;
608
609     if ((tb = newslot(dbase, fid, page)) == 0)
610         return NULL;
611     tb->lockers++;
612     memset(tb->data, 0, UBIK_PAGESIZE);
613     return tb->data;
614 }
615
616 /* read data from database */
617 int
618 udisk_read(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
619            afs_int32 apos, afs_int32 alen)
620 {
621     char *bp;
622     afs_int32 offset, len, totalLen;
623     struct ubik_dbase *dbase;
624
625     if (atrans->flags & TRDONE)
626         return UDONE;
627     totalLen = 0;
628     dbase = atrans->dbase;
629     while (alen > 0) {
630         bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
631         if (!bp)
632             return UEOF;
633         /* otherwise, min of remaining bytes and end of buffer to user mode */
634         offset = apos & (UBIK_PAGESIZE - 1);
635         len = UBIK_PAGESIZE - offset;
636         if (len > alen)
637             len = alen;
638         memcpy(abuffer, bp + offset, len);
639         abuffer += len;
640         apos += len;
641         alen -= len;
642         totalLen += len;
643         DRelease(bp, 0);
644     }
645     return 0;
646 }
647
648 /* truncate file */
649 int
650 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
651 {
652     afs_int32 code;
653     struct ubik_trunc *tt;
654
655     if (atrans->flags & TRDONE)
656         return UDONE;
657     if (atrans->type != UBIK_WRITETRANS)
658         return UBADTYPE;
659
660     /* write a truncate log record */
661     code = udisk_LogTruncate(atrans->dbase, afile, alength);
662
663     /* don't truncate until commit time */
664     tt = FindTrunc(atrans, afile);
665     if (!tt) {
666         /* this file not truncated yet */
667         tt = GetTrunc();
668         tt->next = atrans->activeTruncs;
669         atrans->activeTruncs = tt;
670         tt->file = afile;
671         tt->length = alength;
672     } else {
673         /* already truncated to a certain length */
674         if (tt->length > alength)
675             tt->length = alength;
676     }
677     return code;
678 }
679
680 /* write data to database, using logs */
681 int
682 udisk_write(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
683             afs_int32 apos, afs_int32 alen)
684 {
685     char *bp;
686     afs_int32 offset, len, totalLen;
687     struct ubik_dbase *dbase;
688     struct ubik_trunc *tt;
689     afs_int32 code;
690
691     if (atrans->flags & TRDONE)
692         return UDONE;
693     if (atrans->type != UBIK_WRITETRANS)
694         return UBADTYPE;
695
696     dbase = atrans->dbase;
697     /* first write the data to the log */
698     code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
699     if (code)
700         return code;
701
702     /* expand any truncations of this file */
703     tt = FindTrunc(atrans, afile);
704     if (tt) {
705         if (tt->length < apos + alen) {
706             tt->length = apos + alen;
707         }
708     }
709
710     /* now update vm */
711     totalLen = 0;
712     while (alen > 0) {
713         bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
714         if (!bp) {
715             bp = DNew(dbase, afile, apos >> UBIK_LOGPAGESIZE);
716             if (!bp)
717                 return UIOERROR;
718             memset(bp, 0, UBIK_PAGESIZE);
719         }
720         /* otherwise, min of remaining bytes and end of buffer to user mode */
721         offset = apos & (UBIK_PAGESIZE - 1);
722         len = UBIK_PAGESIZE - offset;
723         if (len > alen)
724             len = alen;
725         memcpy(bp + offset, abuffer, len);
726         abuffer += len;
727         apos += len;
728         alen -= len;
729         totalLen += len;
730         DRelease(bp, 1);        /* buffer modified */
731     }
732     return 0;
733 }
734
735 /* begin a new local transaction */
736 int
737 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
738 {
739     afs_int32 code;
740     struct ubik_trans *tt;
741
742     *atrans = (struct ubik_trans *)NULL;
743     /* Make sure system is initialized before doing anything */
744     if (!initd) {
745         initd = 1;
746         DInit(ubik_nBuffers);
747     }
748     if (atype == UBIK_WRITETRANS) {
749         if (adbase->flags & DBWRITING)
750             return USYNC;
751         code = udisk_LogOpcode(adbase, LOGNEW, 0);
752         if (code)
753             return code;
754     }
755     tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
756     memset(tt, 0, sizeof(struct ubik_trans));
757     tt->dbase = adbase;
758     tt->next = adbase->activeTrans;
759     adbase->activeTrans = tt;
760     tt->type = atype;
761     if (atype == UBIK_READTRANS)
762         adbase->readers++;
763     else if (atype == UBIK_WRITETRANS)
764         adbase->flags |= DBWRITING;
765     *atrans = tt;
766     return 0;
767 }
768
769 /* commit transaction */
770 int
771 udisk_commit(struct ubik_trans *atrans)
772 {
773     struct ubik_dbase *dbase;
774     afs_int32 code = 0;
775     struct ubik_version oldversion, newversion;
776
777     if (atrans->flags & TRDONE)
778         return (UTWOENDS);
779
780     if (atrans->type == UBIK_WRITETRANS) {
781         dbase = atrans->dbase;
782
783         /* On the first write to the database. We update the versions */
784         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
785             oldversion = dbase->version;
786             newversion.epoch = FT_ApproxTime();;
787             newversion.counter = 1;
788
789             code = (*dbase->setlabel) (dbase, 0, &newversion);
790             if (code)
791                 return (code);
792             ubik_epochTime = newversion.epoch;
793             dbase->version = newversion;
794
795             /* Ignore the error here. If the call fails, the site is
796              * marked down and when we detect it is up again, we will 
797              * send the entire database to it.
798              */
799             ContactQuorum(DISK_SetVersion, atrans, 1 /*CStampVersion */ ,
800                           &oldversion, &newversion);
801             urecovery_state |= UBIK_RECLABELDB;
802         }
803
804         dbase->version.counter++;       /* bump commit count */
805         LWP_NoYieldSignal(&dbase->version);
806
807         code = udisk_LogEnd(dbase, &dbase->version);
808         if (code) {
809             dbase->version.counter--;
810             return (code);
811         }
812
813         /* If we fail anytime after this, then panic and let the
814          * recovery replay the log. 
815          */
816         code = DFlush(dbase);   /* write dirty pages to respective files */
817         if (code)
818             panic("Writing Ubik DB modifications\n");
819         code = DSync(dbase);    /* sync the files and mark pages not dirty */
820         if (code)
821             panic("Synchronizing Ubik DB modifications\n");
822
823         code = DoTruncs(atrans);        /* Perform requested truncations */
824         if (code)
825             panic("Truncating Ubik DB\n");
826
827         /* label the committed dbase */
828         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
829         if (code)
830             panic("Truncating Ubik DB\n");
831
832         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
833         if (code)
834             panic("Truncating Ubik logfile\n");
835
836     }
837
838     /* When the transaction is marked done, it also means the logfile
839      * has been truncated.
840      */
841     atrans->flags |= TRDONE;
842     return code;
843 }
844
845 /* abort transaction */
846 int
847 udisk_abort(struct ubik_trans *atrans)
848 {
849     struct ubik_dbase *dbase;
850     afs_int32 code;
851
852     if (atrans->flags & TRDONE)
853         return UTWOENDS;
854
855     /* Check if we are the write trans before logging abort, lest we
856      * abort a good write trans in progress. 
857      * We don't really care if the LOGABORT gets to the log because we 
858      * truncate the log next. If the truncate fails, we panic; for 
859      * otherwise, the log entries remain. On restart, replay of the log
860      * will do nothing because the abort is there or no LogEnd opcode.
861      */
862     dbase = atrans->dbase;
863     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
864         udisk_LogOpcode(dbase, LOGABORT, 1);
865         code = (*dbase->truncate) (dbase, LOGFILE, 0);
866         if (code)
867             panic("Truncating Ubik logfile during an abort\n");
868         DAbort(dbase);          /* remove all dirty pages */
869     }
870
871     /* When the transaction is marked done, it also means the logfile
872      * has been truncated.
873      */
874     atrans->flags |= (TRABORT | TRDONE);
875     return 0;
876 }
877
878 /* destroy a transaction after it has been committed or aborted.  if
879  * it hasn't committed before you call this routine, we'll abort the
880  * transaction for you.
881  */
882 int
883 udisk_end(struct ubik_trans *atrans)
884 {
885     struct ubik_dbase *dbase;
886
887 #if defined(UBIK_PAUSE)
888     /* Another thread is trying to lock this transaction.
889      * That can only be an RPC doing SDISK_Lock.
890      * Unlock the transaction, 'cause otherwise the other
891      * thread will never wake up.  Don't free it because
892      * the caller will do that already.
893      */
894     if (atrans->flags & TRSETLOCK) {
895         atrans->flags |= TRSTALE;
896         ulock_relLock(atrans);
897         return;
898     }
899 #endif /* UBIK_PAUSE */
900     if (!(atrans->flags & TRDONE))
901         udisk_abort(atrans);
902     dbase = atrans->dbase;
903
904     ulock_relLock(atrans);
905     unthread(atrans);
906
907     /* check if we are the write trans before unsetting the DBWRITING bit, else
908      * we could be unsetting someone else's bit.
909      */
910     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
911         dbase->flags &= ~DBWRITING;
912     } else {
913         dbase->readers--;
914     }
915     if (atrans->iovec_info.iovec_wrt_val)
916         free(atrans->iovec_info.iovec_wrt_val);
917     if (atrans->iovec_data.iovec_buf_val)
918         free(atrans->iovec_data.iovec_buf_val);
919     free(atrans);
920
921     /* Wakeup any writers waiting in BeginTrans() */
922     LWP_NoYieldSignal(&dbase->flags);
923     return 0;
924 }