9ad89f882cdcbc72a26274544aa351135b55d640
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #endif
23 #include <errno.h>
24 #include <string.h>
25 #include <lock.h>
26 #include <rx/xdr.h>
27
28
29
30 #define UBIK_INTERNALS
31 #include "ubik.h"
32 #include "ubik_int.h"
33
34 #define PHSIZE  128
35 static struct buffer {
36     struct ubik_dbase *dbase;   /*!< dbase within which the buffer resides */
37     afs_int32 file;             /*!< Unique cache key */
38     afs_int32 page;             /*!< page number */
39     struct buffer *lru_next;
40     struct buffer *lru_prev;
41     struct buffer *hashNext;    /*!< next dude in hash table */
42     char *data;                 /*!< ptr to the data */
43     char lockers;               /*!< usage ref count */
44     char dirty;                 /*!< is buffer modified */
45     char hashIndex;             /*!< back ptr to hash table */
46 } *Buffers;
47
48 #define pHash(page) ((page) & (PHSIZE-1))
49
50 afs_int32 ubik_nBuffers = NBUFFERS;
51 static struct buffer *phTable[PHSIZE];  /*!< page hash table */
52 static struct buffer *LruBuffer;
53 static int nbuffers;
54 static int calls = 0, ios = 0, lastb = 0;
55 static char *BufferData;
56 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
57                               afs_int32 apage);
58 static initd = 0;
59 #define BADFID      0xffffffff
60
61 static DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length);
62
63 static struct ubik_trunc *freeTruncList = 0;
64
65 /*!
66  * \brief Remove a transaction from the database's active transaction list.  Don't free it.
67  */
68 static int
69 unthread(struct ubik_trans *atrans)
70 {
71     struct ubik_trans **lt, *tt;
72     lt = &atrans->dbase->activeTrans;
73     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
74         if (tt == atrans) {
75             /* found it */
76             *lt = tt->next;
77             return 0;
78         }
79     }
80     return 2;                   /* no entry */
81 }
82
83 /*!
84  * \brief some debugging assistance
85  */
86 int
87 udisk_Debug(struct ubik_debug *aparm)
88 {
89     struct buffer *tb;
90     int i;
91
92     memcpy(&aparm->localVersion, &ubik_dbase->version,
93            sizeof(struct ubik_version));
94     aparm->lockedPages = 0;
95     aparm->writeLockedPages = 0;
96     tb = Buffers;
97     for (i = 0; i < nbuffers; i++, tb++) {
98         if (tb->lockers) {
99             aparm->lockedPages++;
100             if (tb->dirty)
101                 aparm->writeLockedPages++;
102         }
103     }
104     return 0;
105 }
106
107 /*!
108  * \brief Write an opcode to the log.
109  *
110  * log format is defined here, and implicitly in recovery.c
111  *
112  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
113  * are in logged in network standard byte order, in case we want to move logs
114  * from machine-to-machine someday.
115  *
116  * Begin transaction: opcode \n
117  * Commit transaction: opcode, version (8 bytes) \n
118  * Truncate file: opcode, file number, length \n
119  * Abort transaction: opcode \n
120  * Write data: opcode, file, position, length, <length> data bytes \n
121  */
122 int
123 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
124 {
125     struct ubik_stat ustat;
126     afs_int32 code;
127
128     /* figure out where to write */
129     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
130     if (code < 0)
131         return code;
132
133     /* setup data and do write */
134     aopcode = htonl(aopcode);
135     code =
136         (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
137                           sizeof(afs_int32));
138     if (code != sizeof(afs_int32))
139         return UIOERROR;
140
141     /* optionally sync data */
142     if (async)
143         code = (*adbase->sync) (adbase, LOGFILE);
144     else
145         code = 0;
146     return code;
147 }
148
149 /*!
150  * \brief Log a commit, never syncing.
151  */
152 int
153 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
154 {
155     afs_int32 code;
156     afs_int32 data[3];
157     struct ubik_stat ustat;
158
159     /* figure out where to write */
160     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
161     if (code)
162         return code;
163
164     /* setup data */
165     data[0] = htonl(LOGEND);
166     data[1] = htonl(aversion->epoch);
167     data[2] = htonl(aversion->counter);
168
169     /* do write */
170     code =
171         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
172                           3 * sizeof(afs_int32));
173     if (code != 3 * sizeof(afs_int32))
174         return UIOERROR;
175
176     /* finally sync the log */
177     code = (*adbase->sync) (adbase, LOGFILE);
178     return code;
179 }
180
181 /*!
182  * \brief Log a truncate operation, never syncing.
183  */
184 int
185 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
186                   afs_int32 alength)
187 {
188     afs_int32 code;
189     afs_int32 data[3];
190     struct ubik_stat ustat;
191
192     /* figure out where to write */
193     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
194     if (code < 0)
195         return code;
196
197     /* setup data */
198     data[0] = htonl(LOGTRUNCATE);
199     data[1] = htonl(afile);
200     data[2] = htonl(alength);
201
202     /* do write */
203     code =
204         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
205                           3 * sizeof(afs_int32));
206     if (code != 3 * sizeof(afs_int32))
207         return UIOERROR;
208     return 0;
209 }
210
211 /*!
212  * \brief Write some data to the log, never syncing.
213  */
214 int
215 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, char *abuffer,
216                    afs_int32 apos, afs_int32 alen)
217 {
218     struct ubik_stat ustat;
219     afs_int32 code;
220     afs_int32 data[4];
221     afs_int32 lpos;
222
223     /* find end of log */
224     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
225     lpos = ustat.size;
226     if (code < 0)
227         return code;
228
229     /* setup header */
230     data[0] = htonl(LOGDATA);
231     data[1] = htonl(afile);
232     data[2] = htonl(apos);
233     data[3] = htonl(alen);
234
235     /* write header */
236     code =
237         (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
238     if (code != 4 * sizeof(afs_int32))
239         return UIOERROR;
240     lpos += 4 * sizeof(afs_int32);
241
242     /* write data */
243     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
244     if (code != alen)
245         return UIOERROR;
246     return 0;
247 }
248
249 static int
250 DInit(int abuffers)
251 {
252     /* Initialize the venus buffer system. */
253     int i;
254     struct buffer *tb;
255     Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
256     memset(Buffers, 0, abuffers * sizeof(struct buffer));
257     BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
258     nbuffers = abuffers;
259     for (i = 0; i < PHSIZE; i++)
260         phTable[i] = 0;
261     for (i = 0; i < abuffers; i++) {
262         /* Fill in each buffer with an empty indication. */
263         tb = &Buffers[i];
264         tb->lru_next = &(Buffers[i + 1]);
265         tb->lru_prev = &(Buffers[i - 1]);
266         tb->data = &BufferData[UBIK_PAGESIZE * i];
267         tb->file = BADFID;
268     }
269     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
270     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
271     LruBuffer = &(Buffers[0]);
272     return 0;
273 }
274
275 /*!
276  * \brief Take a buffer and mark it as the least recently used buffer.
277  */
278 static void
279 Dlru(struct buffer *abuf)
280 {
281     if (LruBuffer == abuf)
282         return;
283
284     /* Unthread from where it is in the list */
285     abuf->lru_next->lru_prev = abuf->lru_prev;
286     abuf->lru_prev->lru_next = abuf->lru_next;
287
288     /* Thread onto beginning of LRU list */
289     abuf->lru_next = LruBuffer;
290     abuf->lru_prev = LruBuffer->lru_prev;
291
292     LruBuffer->lru_prev->lru_next = abuf;
293     LruBuffer->lru_prev = abuf;
294     LruBuffer = abuf;
295 }
296
297 /*!
298  * \brief Take a buffer and mark it as the most recently used buffer.
299  */
300 static void
301 Dmru(struct buffer *abuf)
302 {
303     if (LruBuffer == abuf) {
304         LruBuffer = LruBuffer->lru_next;
305         return;
306     }
307
308     /* Unthread from where it is in the list */
309     abuf->lru_next->lru_prev = abuf->lru_prev;
310     abuf->lru_prev->lru_next = abuf->lru_next;
311
312     /* Thread onto end of LRU list - making it the MRU buffer */
313     abuf->lru_next = LruBuffer;
314     abuf->lru_prev = LruBuffer->lru_prev;
315     LruBuffer->lru_prev->lru_next = abuf;
316     LruBuffer->lru_prev = abuf;
317 }
318
319 /*!
320  * \brief Get a pointer to a particular buffer.
321  */
322 static char *
323 DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
324 {
325     /* Read a page from the disk. */
326     struct buffer *tb, *lastbuffer;
327     afs_int32 code;
328
329     calls++;
330     lastbuffer = LruBuffer->lru_prev;
331
332     if ((lastbuffer->page == page) && (lastbuffer->file == fid)
333         && (lastbuffer->dbase == dbase)) {
334         tb = lastbuffer;
335         tb->lockers++;
336         lastb++;
337         return tb->data;
338     }
339     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
340         if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
341             Dmru(tb);
342             tb->lockers++;
343             return tb->data;
344         }
345     }
346     /* can't find it */
347     tb = newslot(dbase, fid, page);
348     if (!tb)
349         return 0;
350     memset(tb->data, 0, UBIK_PAGESIZE);
351
352     tb->lockers++;
353     code =
354         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
355                         UBIK_PAGESIZE);
356     if (code < 0) {
357         tb->file = BADFID;
358         Dlru(tb);
359         tb->lockers--;
360         ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
361         return 0;
362     }
363     ios++;
364
365     /* Note that findslot sets the page field in the buffer equal to
366      * what it is searching for.
367      */
368     return tb->data;
369 }
370
371 /*!
372  * \brief Zap truncated pages.
373  */
374 static int
375 DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length)
376 {
377     afs_int32 maxPage;
378     struct buffer *tb;
379     int i;
380
381     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
382     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
383         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
384             tb->file = BADFID;
385             Dlru(tb);
386         }
387     }
388     return 0;
389 }
390
391 /*!
392  * \brief Allocate a truncation entry.
393  *
394  * We allocate special entries representing truncations, rather than
395  * performing them immediately, so that we can abort a transaction easily by simply purging
396  * the in-core memory buffers and discarding these truncation entries.
397  */
398 static struct ubik_trunc *
399 GetTrunc(void)
400 {
401     struct ubik_trunc *tt;
402     if (!freeTruncList) {
403         freeTruncList =
404             (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
405         freeTruncList->next = (struct ubik_trunc *)0;
406     }
407     tt = freeTruncList;
408     freeTruncList = tt->next;
409     return tt;
410 }
411
412 /*!
413  * \brief Free a truncation entry.
414  */
415 static int
416 PutTrunc(struct ubik_trunc *at)
417 {
418     at->next = freeTruncList;
419     freeTruncList = at;
420     return 0;
421 }
422
423 /*!
424  * \brief Find a truncation entry for a file, if any.
425  */
426 static struct ubik_trunc *
427 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
428 {
429     struct ubik_trunc *tt;
430     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
431         if (tt->file == afile)
432             return tt;
433     }
434     return (struct ubik_trunc *)0;
435 }
436
437 /*!
438  * \brief Do truncates associated with \p atrans, and free them.
439  */
440 static int
441 DoTruncs(struct ubik_trans *atrans)
442 {
443     struct ubik_trunc *tt, *nt;
444     int (*tproc) ();
445     afs_int32 rcode = 0, code;
446
447     tproc = atrans->dbase->truncate;
448     for (tt = atrans->activeTruncs; tt; tt = nt) {
449         nt = tt->next;
450         DTrunc(atrans->dbase, tt->file, tt->length);    /* zap pages from buffer cache */
451         code = (*tproc) (atrans->dbase, tt->file, tt->length);
452         if (code)
453             rcode = code;
454         PutTrunc(tt);
455     }
456     /* don't unthread, because we do the entire list's worth here */
457     atrans->activeTruncs = (struct ubik_trunc *)0;
458     return (rcode);
459 }
460
461 /*!
462  * \brief Mark an \p fid as invalid.
463  */
464 int
465 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
466 {
467     struct buffer *tb;
468     int i;
469
470     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
471         if (tb->file == afid) {
472             tb->file = BADFID;
473             Dlru(tb);
474         }
475     }
476     return 0;
477 }
478
479 /*!
480  * \brief Move this page into the correct hash bucket.
481  */
482 static int
483 FixupBucket(struct buffer *ap)
484 {
485     struct buffer **lp, *tp;
486     int i;
487     /* first try to get it out of its current hash bucket, in which it might not be */
488     i = ap->hashIndex;
489     lp = &phTable[i];
490     for (tp = *lp; tp; tp = tp->hashNext) {
491         if (tp == ap) {
492             *lp = tp->hashNext;
493             break;
494         }
495         lp = &tp->hashNext;
496     }
497     /* now figure the new hash bucket */
498     i = pHash(ap->page);
499     ap->hashIndex = i;          /* remember where we are for deletion */
500     ap->hashNext = phTable[i];  /* add us to the list */
501     phTable[i] = ap;
502     return 0;
503 }
504
505 /*!
506  * \brief Create a new slot for a particular dbase page.
507  */
508 static struct buffer *
509 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
510 {
511     /* Find a usable buffer slot */
512     afs_int32 i;
513     struct buffer *pp, *tp;
514
515     pp = 0;                     /* last pure */
516     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
517         if (!tp->lockers && !tp->dirty) {
518             pp = tp;
519             break;
520         }
521     }
522
523     if (pp == 0) {
524         /* There are no unlocked buffers that don't need to be written to the disk. */
525         ubik_print
526             ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
527         return NULL;
528     }
529
530     /* Now fill in the header. */
531     pp->dbase = adbase;
532     pp->file = afid;
533     pp->page = apage;
534
535     FixupBucket(pp);            /* move to the right hash bucket */
536     Dmru(pp);
537     return pp;
538 }
539
540 /*!
541  * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
542  */
543 static void
544 DRelease(char *ap, int flag)
545 {
546     int index;
547     struct buffer *bp;
548
549     if (!ap)
550         return;
551     index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
552     bp = &(Buffers[index]);
553     bp->lockers--;
554     if (flag)
555         bp->dirty = 1;
556     return;
557 }
558
559 /*!
560  * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
561  * by DSync()).  
562  *
563  * \note Note interaction with DSync(): you call this thing first,
564  * writing the buffers to the disk.  Then you call DSync() to sync all the
565  * files that were written, and to clear the dirty bits.  You should
566  * always call DFlush/DSync as a pair.
567  */
568 static int
569 DFlush(struct ubik_dbase *adbase)
570 {
571     int i;
572     afs_int32 code;
573     struct buffer *tb;
574
575     tb = Buffers;
576     for (i = 0; i < nbuffers; i++, tb++) {
577         if (tb->dirty) {
578             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
579             code =
580                 (*adbase->write) (adbase, tb->file, tb->data, code,
581                                   UBIK_PAGESIZE);
582             if (code != UBIK_PAGESIZE)
583                 return UIOERROR;
584         }
585     }
586     return 0;
587 }
588
589 /*!
590  * \brief Flush all modified buffers.
591  */
592 static int
593 DAbort(struct ubik_dbase *adbase)
594 {
595     int i;
596     struct buffer *tb;
597
598     tb = Buffers;
599     for (i = 0; i < nbuffers; i++, tb++) {
600         if (tb->dirty) {
601             tb->dirty = 0;
602             tb->file = BADFID;
603             Dlru(tb);
604         }
605     }
606     return 0;
607 }
608
609 /*!
610  * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
611  */
612 static int
613 DSync(struct ubik_dbase *adbase)
614 {
615     int i;
616     afs_int32 code;
617     struct buffer *tb;
618     afs_int32 file;
619     afs_int32 rCode;
620
621     rCode = 0;
622     while (1) {
623         file = BADFID;
624         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
625             if (tb->dirty == 1) {
626                 if (file == BADFID)
627                     file = tb->file;
628                 if (file != BADFID && tb->file == file)
629                     tb->dirty = 0;
630             }
631         }
632         if (file == BADFID)
633             break;
634         /* otherwise we have a file to sync */
635         code = (*adbase->sync) (adbase, file);
636         if (code)
637             rCode = code;
638     }
639     return rCode;
640 }
641
642 /*!
643  * \brief Same as DRead(), only do not even try to read the page.
644  */
645 static char *
646 DNew(struct ubik_dbase *dbase, afs_int32 fid, int page)
647 {
648     struct buffer *tb;
649
650     if ((tb = newslot(dbase, fid, page)) == 0)
651         return NULL;
652     tb->lockers++;
653     memset(tb->data, 0, UBIK_PAGESIZE);
654     return tb->data;
655 }
656
657 /*!
658  * \brief Read data from database.
659  */
660 int
661 udisk_read(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
662            afs_int32 apos, afs_int32 alen)
663 {
664     char *bp;
665     afs_int32 offset, len, totalLen;
666     struct ubik_dbase *dbase;
667
668     if (atrans->flags & TRDONE)
669         return UDONE;
670     totalLen = 0;
671     dbase = atrans->dbase;
672     while (alen > 0) {
673         bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
674         if (!bp)
675             return UEOF;
676         /* otherwise, min of remaining bytes and end of buffer to user mode */
677         offset = apos & (UBIK_PAGESIZE - 1);
678         len = UBIK_PAGESIZE - offset;
679         if (len > alen)
680             len = alen;
681         memcpy(abuffer, bp + offset, len);
682         abuffer += len;
683         apos += len;
684         alen -= len;
685         totalLen += len;
686         DRelease(bp, 0);
687     }
688     return 0;
689 }
690
691 /*!
692  * \brief Truncate file.
693  */
694 int
695 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
696 {
697     afs_int32 code;
698     struct ubik_trunc *tt;
699
700     if (atrans->flags & TRDONE)
701         return UDONE;
702     if (atrans->type != UBIK_WRITETRANS)
703         return UBADTYPE;
704
705     /* write a truncate log record */
706     code = udisk_LogTruncate(atrans->dbase, afile, alength);
707
708     /* don't truncate until commit time */
709     tt = FindTrunc(atrans, afile);
710     if (!tt) {
711         /* this file not truncated yet */
712         tt = GetTrunc();
713         tt->next = atrans->activeTruncs;
714         atrans->activeTruncs = tt;
715         tt->file = afile;
716         tt->length = alength;
717     } else {
718         /* already truncated to a certain length */
719         if (tt->length > alength)
720             tt->length = alength;
721     }
722     return code;
723 }
724
725 /*!
726  * \brief Write data to database, using logs.
727  */
728 int
729 udisk_write(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
730             afs_int32 apos, afs_int32 alen)
731 {
732     char *bp;
733     afs_int32 offset, len, totalLen;
734     struct ubik_dbase *dbase;
735     struct ubik_trunc *tt;
736     afs_int32 code;
737
738     if (atrans->flags & TRDONE)
739         return UDONE;
740     if (atrans->type != UBIK_WRITETRANS)
741         return UBADTYPE;
742
743     dbase = atrans->dbase;
744     /* first write the data to the log */
745     code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
746     if (code)
747         return code;
748
749     /* expand any truncations of this file */
750     tt = FindTrunc(atrans, afile);
751     if (tt) {
752         if (tt->length < apos + alen) {
753             tt->length = apos + alen;
754         }
755     }
756
757     /* now update vm */
758     totalLen = 0;
759     while (alen > 0) {
760         bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
761         if (!bp) {
762             bp = DNew(dbase, afile, apos >> UBIK_LOGPAGESIZE);
763             if (!bp)
764                 return UIOERROR;
765             memset(bp, 0, UBIK_PAGESIZE);
766         }
767         /* otherwise, min of remaining bytes and end of buffer to user mode */
768         offset = apos & (UBIK_PAGESIZE - 1);
769         len = UBIK_PAGESIZE - offset;
770         if (len > alen)
771             len = alen;
772         memcpy(bp + offset, abuffer, len);
773         abuffer += len;
774         apos += len;
775         alen -= len;
776         totalLen += len;
777         DRelease(bp, 1);        /* buffer modified */
778     }
779     return 0;
780 }
781
782 /*!
783  * \brief Begin a new local transaction.
784  */
785 int
786 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
787 {
788     afs_int32 code;
789     struct ubik_trans *tt;
790
791     *atrans = (struct ubik_trans *)NULL;
792     /* Make sure system is initialized before doing anything */
793     if (!initd) {
794         initd = 1;
795         DInit(ubik_nBuffers);
796     }
797     if (atype == UBIK_WRITETRANS) {
798         if (adbase->flags & DBWRITING)
799             return USYNC;
800         code = udisk_LogOpcode(adbase, LOGNEW, 0);
801         if (code)
802             return code;
803     }
804     tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
805     memset(tt, 0, sizeof(struct ubik_trans));
806     tt->dbase = adbase;
807     tt->next = adbase->activeTrans;
808     adbase->activeTrans = tt;
809     tt->type = atype;
810     if (atype == UBIK_READTRANS)
811         adbase->readers++;
812     else if (atype == UBIK_WRITETRANS)
813         adbase->flags |= DBWRITING;
814     *atrans = tt;
815     return 0;
816 }
817
818 /*!
819  * \brief Commit transaction.
820  */
821 int
822 udisk_commit(struct ubik_trans *atrans)
823 {
824     struct ubik_dbase *dbase;
825     afs_int32 code = 0;
826     struct ubik_version oldversion, newversion;
827
828     if (atrans->flags & TRDONE)
829         return (UTWOENDS);
830
831     if (atrans->type == UBIK_WRITETRANS) {
832         dbase = atrans->dbase;
833
834         /* On the first write to the database. We update the versions */
835         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
836             oldversion = dbase->version;
837             newversion.epoch = FT_ApproxTime();;
838             newversion.counter = 1;
839
840             code = (*dbase->setlabel) (dbase, 0, &newversion);
841             if (code)
842                 return (code);
843             ubik_epochTime = newversion.epoch;
844             dbase->version = newversion;
845
846             /* Ignore the error here. If the call fails, the site is
847              * marked down and when we detect it is up again, we will 
848              * send the entire database to it.
849              */
850             ContactQuorum(DISK_SetVersion, atrans, 1 /*CStampVersion */ ,
851                           &oldversion, &newversion);
852             urecovery_state |= UBIK_RECLABELDB;
853         }
854
855         dbase->version.counter++;       /* bump commit count */
856 #ifdef AFS_PTHREAD_ENV
857         assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
858 #else
859         LWP_NoYieldSignal(&dbase->version);
860 #endif
861         code = udisk_LogEnd(dbase, &dbase->version);
862         if (code) {
863             dbase->version.counter--;
864             return (code);
865         }
866
867         /* If we fail anytime after this, then panic and let the
868          * recovery replay the log. 
869          */
870         code = DFlush(dbase);   /* write dirty pages to respective files */
871         if (code)
872             panic("Writing Ubik DB modifications\n");
873         code = DSync(dbase);    /* sync the files and mark pages not dirty */
874         if (code)
875             panic("Synchronizing Ubik DB modifications\n");
876
877         code = DoTruncs(atrans);        /* Perform requested truncations */
878         if (code)
879             panic("Truncating Ubik DB\n");
880
881         /* label the committed dbase */
882         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
883         if (code)
884             panic("Truncating Ubik DB\n");
885
886         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
887         if (code)
888             panic("Truncating Ubik logfile\n");
889
890     }
891
892     /* When the transaction is marked done, it also means the logfile
893      * has been truncated.
894      */
895     atrans->flags |= TRDONE;
896     return code;
897 }
898
899 /*!
900  * \brief Abort transaction.
901  */
902 int
903 udisk_abort(struct ubik_trans *atrans)
904 {
905     struct ubik_dbase *dbase;
906     afs_int32 code;
907
908     if (atrans->flags & TRDONE)
909         return UTWOENDS;
910
911     /* Check if we are the write trans before logging abort, lest we
912      * abort a good write trans in progress. 
913      * We don't really care if the LOGABORT gets to the log because we 
914      * truncate the log next. If the truncate fails, we panic; for 
915      * otherwise, the log entries remain. On restart, replay of the log
916      * will do nothing because the abort is there or no LogEnd opcode.
917      */
918     dbase = atrans->dbase;
919     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
920         udisk_LogOpcode(dbase, LOGABORT, 1);
921         code = (*dbase->truncate) (dbase, LOGFILE, 0);
922         if (code)
923             panic("Truncating Ubik logfile during an abort\n");
924         DAbort(dbase);          /* remove all dirty pages */
925     }
926
927     /* When the transaction is marked done, it also means the logfile
928      * has been truncated.
929      */
930     atrans->flags |= (TRABORT | TRDONE);
931     return 0;
932 }
933
934 /*!
935  * \brief Destroy a transaction after it has been committed or aborted.
936  *
937  * If it hasn't committed before you call this routine, we'll abort the
938  * transaction for you.
939  */
940 int
941 udisk_end(struct ubik_trans *atrans)
942 {
943     struct ubik_dbase *dbase;
944
945 #if defined(UBIK_PAUSE)
946     /* Another thread is trying to lock this transaction.
947      * That can only be an RPC doing SDISK_Lock.
948      * Unlock the transaction, 'cause otherwise the other
949      * thread will never wake up.  Don't free it because
950      * the caller will do that already.
951      */
952     if (atrans->flags & TRSETLOCK) {
953         atrans->flags |= TRSTALE;
954         ulock_relLock(atrans);
955         return;
956     }
957 #endif /* UBIK_PAUSE */
958     if (!(atrans->flags & TRDONE))
959         udisk_abort(atrans);
960     dbase = atrans->dbase;
961
962     ulock_relLock(atrans);
963     unthread(atrans);
964
965     /* check if we are the write trans before unsetting the DBWRITING bit, else
966      * we could be unsetting someone else's bit.
967      */
968     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
969         dbase->flags &= ~DBWRITING;
970     } else {
971         dbase->readers--;
972     }
973     if (atrans->iovec_info.iovec_wrt_val)
974         free(atrans->iovec_info.iovec_wrt_val);
975     if (atrans->iovec_data.iovec_buf_val)
976         free(atrans->iovec_data.iovec_buf_val);
977     free(atrans);
978
979     /* Wakeup any writers waiting in BeginTrans() */
980 #ifdef AFS_PTHREAD_ENV
981         assert(pthread_cond_broadcast(&dbase->flags_cond) == 0);
982 #else
983     LWP_NoYieldSignal(&dbase->flags);
984 #endif
985     return 0;
986 }