fecfcd8a94c5ea3eccee5c92cb178d6847541614
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #endif
23 #include <errno.h>
24 #include <string.h>
25 #include <lock.h>
26 #include <rx/xdr.h>
27
28
29
30 #define UBIK_INTERNALS
31 #include "ubik.h"
32 #include "ubik_int.h"
33
34 #define PHSIZE  128
35 static struct buffer {
36     struct ubik_dbase *dbase;   /* dbase within which the buffer resides */
37     afs_int32 file;             /* Unique cache key */
38     afs_int32 page;             /* page number */
39     struct buffer *lru_next;
40     struct buffer *lru_prev;
41     struct buffer *hashNext;    /* next dude in hash table */
42     char *data;                 /* ptr to the data */
43     char lockers;               /* usage ref count */
44     char dirty;                 /* is buffer modified */
45     char hashIndex;             /* back ptr to hash table */
46 } *Buffers;
47
48 #define pHash(page) ((page) & (PHSIZE-1))
49
50 afs_int32 ubik_nBuffers = NBUFFERS;
51 static struct buffer *phTable[PHSIZE];  /* page hash table */
52 static struct buffer *LruBuffer;
53 static int nbuffers;
54 static int calls = 0, ios = 0, lastb = 0;
55 static char *BufferData;
56 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
57                               afs_int32 apage);
58 static initd = 0;
59 #define BADFID      0xffffffff
60
61 static DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length);
62
63 static struct ubik_trunc *freeTruncList = 0;
64
65 /* remove a transaction from the database's active transaction list.  Don't free it */
66 static int
67 unthread(struct ubik_trans *atrans)
68 {
69     struct ubik_trans **lt, *tt;
70     lt = &atrans->dbase->activeTrans;
71     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
72         if (tt == atrans) {
73             /* found it */
74             *lt = tt->next;
75             return 0;
76         }
77     }
78     return 2;                   /* no entry */
79 }
80
81 /* some debugging assistance */
82 int
83 udisk_Debug(struct ubik_debug *aparm)
84 {
85     struct buffer *tb;
86     int i;
87
88     memcpy(&aparm->localVersion, &ubik_dbase->version,
89            sizeof(struct ubik_version));
90     aparm->lockedPages = 0;
91     aparm->writeLockedPages = 0;
92     tb = Buffers;
93     for (i = 0; i < nbuffers; i++, tb++) {
94         if (tb->lockers) {
95             aparm->lockedPages++;
96             if (tb->dirty)
97                 aparm->writeLockedPages++;
98         }
99     }
100     return 0;
101 }
102
103 /* log format is defined here, and implicitly in recovery.c
104  *
105  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
106  * are in logged in network standard byte order, in case we want to move logs
107  * from machine-to-machine someday.
108  *
109  * Begin transaction: opcode
110  * Commit transaction: opcode, version (8 bytes)
111  * Truncate file: opcode, file number, length
112  * Abort transaction: opcode
113  * Write data: opcode, file, position, length, <length> data bytes
114  */
115
116 /* write an opcode to the log */
117 int
118 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
119 {
120     struct ubik_stat ustat;
121     afs_int32 code;
122
123     /* figure out where to write */
124     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
125     if (code < 0)
126         return code;
127
128     /* setup data and do write */
129     aopcode = htonl(aopcode);
130     code =
131         (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
132                           sizeof(afs_int32));
133     if (code != sizeof(afs_int32))
134         return UIOERROR;
135
136     /* optionally sync data */
137     if (async)
138         code = (*adbase->sync) (adbase, LOGFILE);
139     else
140         code = 0;
141     return code;
142 }
143
144 /* log a commit, never syncing */
145 int
146 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
147 {
148     afs_int32 code;
149     afs_int32 data[3];
150     struct ubik_stat ustat;
151
152     /* figure out where to write */
153     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
154     if (code)
155         return code;
156
157     /* setup data */
158     data[0] = htonl(LOGEND);
159     data[1] = htonl(aversion->epoch);
160     data[2] = htonl(aversion->counter);
161
162     /* do write */
163     code =
164         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
165                           3 * sizeof(afs_int32));
166     if (code != 3 * sizeof(afs_int32))
167         return UIOERROR;
168
169     /* finally sync the log */
170     code = (*adbase->sync) (adbase, LOGFILE);
171     return code;
172 }
173
174 /* log a truncate operation, never syncing */
175 int
176 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
177                   afs_int32 alength)
178 {
179     afs_int32 code;
180     afs_int32 data[3];
181     struct ubik_stat ustat;
182
183     /* figure out where to write */
184     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
185     if (code < 0)
186         return code;
187
188     /* setup data */
189     data[0] = htonl(LOGTRUNCATE);
190     data[1] = htonl(afile);
191     data[2] = htonl(alength);
192
193     /* do write */
194     code =
195         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
196                           3 * sizeof(afs_int32));
197     if (code != 3 * sizeof(afs_int32))
198         return UIOERROR;
199     return 0;
200 }
201
202 /* write some data to the log, never syncing */
203 int
204 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, char *abuffer,
205                    afs_int32 apos, afs_int32 alen)
206 {
207     struct ubik_stat ustat;
208     afs_int32 code;
209     afs_int32 data[4];
210     afs_int32 lpos;
211
212     /* find end of log */
213     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
214     lpos = ustat.size;
215     if (code < 0)
216         return code;
217
218     /* setup header */
219     data[0] = htonl(LOGDATA);
220     data[1] = htonl(afile);
221     data[2] = htonl(apos);
222     data[3] = htonl(alen);
223
224     /* write header */
225     code =
226         (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
227     if (code != 4 * sizeof(afs_int32))
228         return UIOERROR;
229     lpos += 4 * sizeof(afs_int32);
230
231     /* write data */
232     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
233     if (code != alen)
234         return UIOERROR;
235     return 0;
236 }
237
238 static int
239 DInit(int abuffers)
240 {
241     /* Initialize the venus buffer system. */
242     int i;
243     struct buffer *tb;
244     Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
245     memset(Buffers, 0, abuffers * sizeof(struct buffer));
246     BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
247     nbuffers = abuffers;
248     for (i = 0; i < PHSIZE; i++)
249         phTable[i] = 0;
250     for (i = 0; i < abuffers; i++) {
251         /* Fill in each buffer with an empty indication. */
252         tb = &Buffers[i];
253         tb->lru_next = &(Buffers[i + 1]);
254         tb->lru_prev = &(Buffers[i - 1]);
255         tb->data = &BufferData[UBIK_PAGESIZE * i];
256         tb->file = BADFID;
257     }
258     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
259     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
260     LruBuffer = &(Buffers[0]);
261     return 0;
262 }
263
264 /* Take a buffer and mark it as the least recently used buffer */
265 static void
266 Dlru(struct buffer *abuf)
267 {
268     if (LruBuffer == abuf)
269         return;
270
271     /* Unthread from where it is in the list */
272     abuf->lru_next->lru_prev = abuf->lru_prev;
273     abuf->lru_prev->lru_next = abuf->lru_next;
274
275     /* Thread onto beginning of LRU list */
276     abuf->lru_next = LruBuffer;
277     abuf->lru_prev = LruBuffer->lru_prev;
278
279     LruBuffer->lru_prev->lru_next = abuf;
280     LruBuffer->lru_prev = abuf;
281     LruBuffer = abuf;
282 }
283
284 /* Take a buffer and mark it as the most recently used buffer */
285 static void
286 Dmru(struct buffer *abuf)
287 {
288     if (LruBuffer == abuf) {
289         LruBuffer = LruBuffer->lru_next;
290         return;
291     }
292
293     /* Unthread from where it is in the list */
294     abuf->lru_next->lru_prev = abuf->lru_prev;
295     abuf->lru_prev->lru_next = abuf->lru_next;
296
297     /* Thread onto end of LRU list - making it the MRU buffer */
298     abuf->lru_next = LruBuffer;
299     abuf->lru_prev = LruBuffer->lru_prev;
300     LruBuffer->lru_prev->lru_next = abuf;
301     LruBuffer->lru_prev = abuf;
302 }
303
304 /* get a pointer to a particular buffer */
305 static char *
306 DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
307 {
308     /* Read a page from the disk. */
309     struct buffer *tb, *lastbuffer;
310     afs_int32 code;
311
312     calls++;
313     lastbuffer = LruBuffer->lru_prev;
314
315     if ((lastbuffer->page == page) && (lastbuffer->file == fid)
316         && (lastbuffer->dbase == dbase)) {
317         tb = lastbuffer;
318         tb->lockers++;
319         lastb++;
320         return tb->data;
321     }
322     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
323         if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
324             Dmru(tb);
325             tb->lockers++;
326             return tb->data;
327         }
328     }
329     /* can't find it */
330     tb = newslot(dbase, fid, page);
331     if (!tb)
332         return 0;
333     memset(tb->data, 0, UBIK_PAGESIZE);
334
335     tb->lockers++;
336     code =
337         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
338                         UBIK_PAGESIZE);
339     if (code < 0) {
340         tb->file = BADFID;
341         Dlru(tb);
342         tb->lockers--;
343         ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
344         return 0;
345     }
346     ios++;
347
348     /* Note that findslot sets the page field in the buffer equal to
349      * what it is searching for.
350      */
351     return tb->data;
352 }
353
354 /* zap truncated pages */
355 static int
356 DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length)
357 {
358     afs_int32 maxPage;
359     struct buffer *tb;
360     int i;
361
362     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
363     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
364         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
365             tb->file = BADFID;
366             Dlru(tb);
367         }
368     }
369     return 0;
370 }
371
372 /* allocate a truncation entry.  We allocate special entries representing truncations, rather than
373     performing them immediately, so that we can abort a transaction easily by simply purging
374     the in-core memory buffers and discarding these truncation entries.
375 */
376 static struct ubik_trunc *
377 GetTrunc(void)
378 {
379     struct ubik_trunc *tt;
380     if (!freeTruncList) {
381         freeTruncList =
382             (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
383         freeTruncList->next = (struct ubik_trunc *)0;
384     }
385     tt = freeTruncList;
386     freeTruncList = tt->next;
387     return tt;
388 }
389
390 /* free a truncation entry */
391 static int
392 PutTrunc(struct ubik_trunc *at)
393 {
394     at->next = freeTruncList;
395     freeTruncList = at;
396     return 0;
397 }
398
399 /* find a truncation entry for a file, if any */
400 static struct ubik_trunc *
401 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
402 {
403     struct ubik_trunc *tt;
404     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
405         if (tt->file == afile)
406             return tt;
407     }
408     return (struct ubik_trunc *)0;
409 }
410
411 /* do truncates associated with trans, and free them */
412 static int
413 DoTruncs(struct ubik_trans *atrans)
414 {
415     struct ubik_trunc *tt, *nt;
416     int (*tproc) ();
417     afs_int32 rcode = 0, code;
418
419     tproc = atrans->dbase->truncate;
420     for (tt = atrans->activeTruncs; tt; tt = nt) {
421         nt = tt->next;
422         DTrunc(atrans->dbase, tt->file, tt->length);    /* zap pages from buffer cache */
423         code = (*tproc) (atrans->dbase, tt->file, tt->length);
424         if (code)
425             rcode = code;
426         PutTrunc(tt);
427     }
428     /* don't unthread, because we do the entire list's worth here */
429     atrans->activeTruncs = (struct ubik_trunc *)0;
430     return (rcode);
431 }
432
433 /* mark a fid as invalid */
434 int
435 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
436 {
437     struct buffer *tb;
438     int i;
439
440     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
441         if (tb->file == afid) {
442             tb->file = BADFID;
443             Dlru(tb);
444         }
445     }
446     return 0;
447 }
448
449 /* move this page into the correct hash bucket */
450 static int
451 FixupBucket(struct buffer *ap)
452 {
453     struct buffer **lp, *tp;
454     int i;
455     /* first try to get it out of its current hash bucket, in which it might not be */
456     i = ap->hashIndex;
457     lp = &phTable[i];
458     for (tp = *lp; tp; tp = tp->hashNext) {
459         if (tp == ap) {
460             *lp = tp->hashNext;
461             break;
462         }
463         lp = &tp->hashNext;
464     }
465     /* now figure the new hash bucket */
466     i = pHash(ap->page);
467     ap->hashIndex = i;          /* remember where we are for deletion */
468     ap->hashNext = phTable[i];  /* add us to the list */
469     phTable[i] = ap;
470     return 0;
471 }
472
473 /* create a new slot for a particular dbase page */
474 static struct buffer *
475 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
476 {
477     /* Find a usable buffer slot */
478     afs_int32 i;
479     struct buffer *pp, *tp;
480
481     pp = 0;                     /* last pure */
482     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
483         if (!tp->lockers && !tp->dirty) {
484             pp = tp;
485             break;
486         }
487     }
488
489     if (pp == 0) {
490         /* There are no unlocked buffers that don't need to be written to the disk. */
491         ubik_print
492             ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
493         return NULL;
494     }
495
496     /* Now fill in the header. */
497     pp->dbase = adbase;
498     pp->file = afid;
499     pp->page = apage;
500
501     FixupBucket(pp);            /* move to the right hash bucket */
502     Dmru(pp);
503     return pp;
504 }
505
506 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
507 static void
508 DRelease(char *ap, int flag)
509 {
510     int index;
511     struct buffer *bp;
512
513     if (!ap)
514         return;
515     index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
516     bp = &(Buffers[index]);
517     bp->lockers--;
518     if (flag)
519         bp->dirty = 1;
520     return;
521 }
522
523 /* flush all modified buffers, leaves dirty bits set (they're cleared
524  * by DSync).  Note interaction with DSync: you call this thing first,
525  * writing the buffers to the disk.  Then you call DSync to sync all the
526  * files that were written, and to clear the dirty bits.  You should
527  * always call DFlush/DSync as a pair.
528  */
529 static int
530 DFlush(struct ubik_dbase *adbase)
531 {
532     int i;
533     afs_int32 code;
534     struct buffer *tb;
535
536     tb = Buffers;
537     for (i = 0; i < nbuffers; i++, tb++) {
538         if (tb->dirty) {
539             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
540             code =
541                 (*adbase->write) (adbase, tb->file, tb->data, code,
542                                   UBIK_PAGESIZE);
543             if (code != UBIK_PAGESIZE)
544                 return UIOERROR;
545         }
546     }
547     return 0;
548 }
549
550 /* flush all modified buffers */
551 static int
552 DAbort(struct ubik_dbase *adbase)
553 {
554     int i;
555     struct buffer *tb;
556
557     tb = Buffers;
558     for (i = 0; i < nbuffers; i++, tb++) {
559         if (tb->dirty) {
560             tb->dirty = 0;
561             tb->file = BADFID;
562             Dlru(tb);
563         }
564     }
565     return 0;
566 }
567
568 /* must only be called after DFlush, due to its interpretation of dirty flag */
569 static int
570 DSync(struct ubik_dbase *adbase)
571 {
572     int i;
573     afs_int32 code;
574     struct buffer *tb;
575     afs_int32 file;
576     afs_int32 rCode;
577
578     rCode = 0;
579     while (1) {
580         file = BADFID;
581         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
582             if (tb->dirty == 1) {
583                 if (file == BADFID)
584                     file = tb->file;
585                 if (file != BADFID && tb->file == file)
586                     tb->dirty = 0;
587             }
588         }
589         if (file == BADFID)
590             break;
591         /* otherwise we have a file to sync */
592         code = (*adbase->sync) (adbase, file);
593         if (code)
594             rCode = code;
595     }
596     return rCode;
597 }
598
599 /* Same as read, only do not even try to read the page */
600 static char *
601 DNew(struct ubik_dbase *dbase, afs_int32 fid, int page)
602 {
603     struct buffer *tb;
604
605     if ((tb = newslot(dbase, fid, page)) == 0)
606         return NULL;
607     tb->lockers++;
608     memset(tb->data, 0, UBIK_PAGESIZE);
609     return tb->data;
610 }
611
612 /* read data from database */
613 int
614 udisk_read(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
615            afs_int32 apos, afs_int32 alen)
616 {
617     char *bp;
618     afs_int32 offset, len, totalLen;
619     struct ubik_dbase *dbase;
620
621     if (atrans->flags & TRDONE)
622         return UDONE;
623     totalLen = 0;
624     dbase = atrans->dbase;
625     while (alen > 0) {
626         bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
627         if (!bp)
628             return UEOF;
629         /* otherwise, min of remaining bytes and end of buffer to user mode */
630         offset = apos & (UBIK_PAGESIZE - 1);
631         len = UBIK_PAGESIZE - offset;
632         if (len > alen)
633             len = alen;
634         memcpy(abuffer, bp + offset, len);
635         abuffer += len;
636         apos += len;
637         alen -= len;
638         totalLen += len;
639         DRelease(bp, 0);
640     }
641     return 0;
642 }
643
644 /* truncate file */
645 int
646 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
647 {
648     afs_int32 code;
649     struct ubik_trunc *tt;
650
651     if (atrans->flags & TRDONE)
652         return UDONE;
653     if (atrans->type != UBIK_WRITETRANS)
654         return UBADTYPE;
655
656     /* write a truncate log record */
657     code = udisk_LogTruncate(atrans->dbase, afile, alength);
658
659     /* don't truncate until commit time */
660     tt = FindTrunc(atrans, afile);
661     if (!tt) {
662         /* this file not truncated yet */
663         tt = GetTrunc();
664         tt->next = atrans->activeTruncs;
665         atrans->activeTruncs = tt;
666         tt->file = afile;
667         tt->length = alength;
668     } else {
669         /* already truncated to a certain length */
670         if (tt->length > alength)
671             tt->length = alength;
672     }
673     return code;
674 }
675
676 /* write data to database, using logs */
677 int
678 udisk_write(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
679             afs_int32 apos, afs_int32 alen)
680 {
681     char *bp;
682     afs_int32 offset, len, totalLen;
683     struct ubik_dbase *dbase;
684     struct ubik_trunc *tt;
685     afs_int32 code;
686
687     if (atrans->flags & TRDONE)
688         return UDONE;
689     if (atrans->type != UBIK_WRITETRANS)
690         return UBADTYPE;
691
692     dbase = atrans->dbase;
693     /* first write the data to the log */
694     code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
695     if (code)
696         return code;
697
698     /* expand any truncations of this file */
699     tt = FindTrunc(atrans, afile);
700     if (tt) {
701         if (tt->length < apos + alen) {
702             tt->length = apos + alen;
703         }
704     }
705
706     /* now update vm */
707     totalLen = 0;
708     while (alen > 0) {
709         bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
710         if (!bp) {
711             bp = DNew(dbase, afile, apos >> UBIK_LOGPAGESIZE);
712             if (!bp)
713                 return UIOERROR;
714             memset(bp, 0, UBIK_PAGESIZE);
715         }
716         /* otherwise, min of remaining bytes and end of buffer to user mode */
717         offset = apos & (UBIK_PAGESIZE - 1);
718         len = UBIK_PAGESIZE - offset;
719         if (len > alen)
720             len = alen;
721         memcpy(bp + offset, abuffer, len);
722         abuffer += len;
723         apos += len;
724         alen -= len;
725         totalLen += len;
726         DRelease(bp, 1);        /* buffer modified */
727     }
728     return 0;
729 }
730
731 /* begin a new local transaction */
732 int
733 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
734 {
735     afs_int32 code;
736     struct ubik_trans *tt;
737
738     *atrans = (struct ubik_trans *)NULL;
739     /* Make sure system is initialized before doing anything */
740     if (!initd) {
741         initd = 1;
742         DInit(ubik_nBuffers);
743     }
744     if (atype == UBIK_WRITETRANS) {
745         if (adbase->flags & DBWRITING)
746             return USYNC;
747         code = udisk_LogOpcode(adbase, LOGNEW, 0);
748         if (code)
749             return code;
750     }
751     tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
752     memset(tt, 0, sizeof(struct ubik_trans));
753     tt->dbase = adbase;
754     tt->next = adbase->activeTrans;
755     adbase->activeTrans = tt;
756     tt->type = atype;
757     if (atype == UBIK_READTRANS)
758         adbase->readers++;
759     else if (atype == UBIK_WRITETRANS)
760         adbase->flags |= DBWRITING;
761     *atrans = tt;
762     return 0;
763 }
764
765 /* commit transaction */
766 int
767 udisk_commit(struct ubik_trans *atrans)
768 {
769     struct ubik_dbase *dbase;
770     afs_int32 code = 0;
771     struct ubik_version oldversion, newversion;
772
773     if (atrans->flags & TRDONE)
774         return (UTWOENDS);
775
776     if (atrans->type == UBIK_WRITETRANS) {
777         dbase = atrans->dbase;
778
779         /* On the first write to the database. We update the versions */
780         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
781             oldversion = dbase->version;
782             newversion.epoch = FT_ApproxTime();;
783             newversion.counter = 1;
784
785             code = (*dbase->setlabel) (dbase, 0, &newversion);
786             if (code)
787                 return (code);
788             ubik_epochTime = newversion.epoch;
789             dbase->version = newversion;
790
791             /* Ignore the error here. If the call fails, the site is
792              * marked down and when we detect it is up again, we will 
793              * send the entire database to it.
794              */
795             ContactQuorum(DISK_SetVersion, atrans, 1 /*CStampVersion */ ,
796                           &oldversion, &newversion);
797             urecovery_state |= UBIK_RECLABELDB;
798         }
799
800         dbase->version.counter++;       /* bump commit count */
801 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
802         assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
803 #else
804         LWP_NoYieldSignal(&dbase->version);
805 #endif
806         code = udisk_LogEnd(dbase, &dbase->version);
807         if (code) {
808             dbase->version.counter--;
809             return (code);
810         }
811
812         /* If we fail anytime after this, then panic and let the
813          * recovery replay the log. 
814          */
815         code = DFlush(dbase);   /* write dirty pages to respective files */
816         if (code)
817             panic("Writing Ubik DB modifications\n");
818         code = DSync(dbase);    /* sync the files and mark pages not dirty */
819         if (code)
820             panic("Synchronizing Ubik DB modifications\n");
821
822         code = DoTruncs(atrans);        /* Perform requested truncations */
823         if (code)
824             panic("Truncating Ubik DB\n");
825
826         /* label the committed dbase */
827         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
828         if (code)
829             panic("Truncating Ubik DB\n");
830
831         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
832         if (code)
833             panic("Truncating Ubik logfile\n");
834
835     }
836
837     /* When the transaction is marked done, it also means the logfile
838      * has been truncated.
839      */
840     atrans->flags |= TRDONE;
841     return code;
842 }
843
844 /* abort transaction */
845 int
846 udisk_abort(struct ubik_trans *atrans)
847 {
848     struct ubik_dbase *dbase;
849     afs_int32 code;
850
851     if (atrans->flags & TRDONE)
852         return UTWOENDS;
853
854     /* Check if we are the write trans before logging abort, lest we
855      * abort a good write trans in progress. 
856      * We don't really care if the LOGABORT gets to the log because we 
857      * truncate the log next. If the truncate fails, we panic; for 
858      * otherwise, the log entries remain. On restart, replay of the log
859      * will do nothing because the abort is there or no LogEnd opcode.
860      */
861     dbase = atrans->dbase;
862     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
863         udisk_LogOpcode(dbase, LOGABORT, 1);
864         code = (*dbase->truncate) (dbase, LOGFILE, 0);
865         if (code)
866             panic("Truncating Ubik logfile during an abort\n");
867         DAbort(dbase);          /* remove all dirty pages */
868     }
869
870     /* When the transaction is marked done, it also means the logfile
871      * has been truncated.
872      */
873     atrans->flags |= (TRABORT | TRDONE);
874     return 0;
875 }
876
877 /* destroy a transaction after it has been committed or aborted.  if
878  * it hasn't committed before you call this routine, we'll abort the
879  * transaction for you.
880  */
881 int
882 udisk_end(struct ubik_trans *atrans)
883 {
884     struct ubik_dbase *dbase;
885
886 #if defined(UBIK_PAUSE)
887     /* Another thread is trying to lock this transaction.
888      * That can only be an RPC doing SDISK_Lock.
889      * Unlock the transaction, 'cause otherwise the other
890      * thread will never wake up.  Don't free it because
891      * the caller will do that already.
892      */
893     if (atrans->flags & TRSETLOCK) {
894         atrans->flags |= TRSTALE;
895         ulock_relLock(atrans);
896         return;
897     }
898 #endif /* UBIK_PAUSE */
899     if (!(atrans->flags & TRDONE))
900         udisk_abort(atrans);
901     dbase = atrans->dbase;
902
903     ulock_relLock(atrans);
904     unthread(atrans);
905
906     /* check if we are the write trans before unsetting the DBWRITING bit, else
907      * we could be unsetting someone else's bit.
908      */
909     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
910         dbase->flags &= ~DBWRITING;
911     } else {
912         dbase->readers--;
913     }
914     if (atrans->iovec_info.iovec_wrt_val)
915         free(atrans->iovec_info.iovec_wrt_val);
916     if (atrans->iovec_data.iovec_buf_val)
917         free(atrans->iovec_data.iovec_buf_val);
918     free(atrans);
919
920     /* Wakeup any writers waiting in BeginTrans() */
921 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
922         assert(pthread_cond_broadcast(&dbase->flags_cond) == 0);
923 #else
924     LWP_NoYieldSignal(&dbase->flags);
925 #endif
926     return 0;
927 }