acf007700f3ad26ebe6278a8e5d02a231511ffd5
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14 #include <afs/opr.h>
15
16 #ifdef AFS_PTHREAD_ENV
17 # include <opr/lock.h>
18 #else
19 # include <opr/lockstub.h>
20 #endif
21
22 #define UBIK_INTERNALS
23 #include "ubik.h"
24 #include "ubik_int.h"
25
26 #define PHSIZE  128
27 static struct buffer {
28     struct ubik_dbase *dbase;   /*!< dbase within which the buffer resides */
29     afs_int32 file;             /*!< Unique cache key */
30     afs_int32 page;             /*!< page number */
31     struct buffer *lru_next;
32     struct buffer *lru_prev;
33     struct buffer *hashNext;    /*!< next dude in hash table */
34     char *data;                 /*!< ptr to the data */
35     char lockers;               /*!< usage ref count */
36     char dirty;                 /*!< is buffer modified */
37     char hashIndex;             /*!< back ptr to hash table */
38 } *Buffers;
39
40 #define pHash(page) ((page) & (PHSIZE-1))
41
42 afs_int32 ubik_nBuffers = NBUFFERS;
43 static struct buffer *phTable[PHSIZE];  /*!< page hash table */
44 static struct buffer *LruBuffer;
45 static int nbuffers;
46 static int calls = 0, ios = 0, lastb = 0;
47 static char *BufferData;
48 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
49                               afs_int32 apage);
50 #define BADFID      0xffffffff
51
52 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
53
54 static struct ubik_trunc *freeTruncList = 0;
55
56 /*!
57  * \brief Remove a transaction from the database's active transaction list.  Don't free it.
58  */
59 static int
60 unthread(struct ubik_trans *atrans)
61 {
62     struct ubik_trans **lt, *tt;
63     lt = &atrans->dbase->activeTrans;
64     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
65         if (tt == atrans) {
66             /* found it */
67             *lt = tt->next;
68             return 0;
69         }
70     }
71     return 2;                   /* no entry */
72 }
73
74 /*!
75  * \brief some debugging assistance
76  */
77 void
78 udisk_Debug(struct ubik_debug *aparm)
79 {
80     struct buffer *tb;
81     int i;
82
83     memcpy(&aparm->localVersion, &ubik_dbase->version,
84            sizeof(struct ubik_version));
85     aparm->lockedPages = 0;
86     aparm->writeLockedPages = 0;
87     tb = Buffers;
88     for (i = 0; i < nbuffers; i++, tb++) {
89         if (tb->lockers) {
90             aparm->lockedPages++;
91             if (tb->dirty)
92                 aparm->writeLockedPages++;
93         }
94     }
95 }
96
97 /*!
98  * \brief Write an opcode to the log.
99  *
100  * log format is defined here, and implicitly in recovery.c
101  *
102  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
103  * are in logged in network standard byte order, in case we want to move logs
104  * from machine-to-machine someday.
105  *
106  * Begin transaction: opcode \n
107  * Commit transaction: opcode, version (8 bytes) \n
108  * Truncate file: opcode, file number, length \n
109  * Abort transaction: opcode \n
110  * Write data: opcode, file, position, length, <length> data bytes \n
111  */
112 int
113 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
114 {
115     struct ubik_stat ustat;
116     afs_int32 code;
117
118     /* figure out where to write */
119     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
120     if (code < 0)
121         return code;
122
123     /* setup data and do write */
124     aopcode = htonl(aopcode);
125     code =
126         (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
127                           sizeof(afs_int32));
128     if (code != sizeof(afs_int32))
129         return UIOERROR;
130
131     /* optionally sync data */
132     if (async)
133         code = (*adbase->sync) (adbase, LOGFILE);
134     else
135         code = 0;
136     return code;
137 }
138
139 /*!
140  * \brief Log a commit, never syncing.
141  */
142 int
143 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
144 {
145     afs_int32 code;
146     afs_int32 data[3];
147     struct ubik_stat ustat;
148
149     /* figure out where to write */
150     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
151     if (code)
152         return code;
153
154     /* setup data */
155     data[0] = htonl(LOGEND);
156     data[1] = htonl(aversion->epoch);
157     data[2] = htonl(aversion->counter);
158
159     /* do write */
160     code =
161         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
162                           3 * sizeof(afs_int32));
163     if (code != 3 * sizeof(afs_int32))
164         return UIOERROR;
165
166     /* finally sync the log */
167     code = (*adbase->sync) (adbase, LOGFILE);
168     return code;
169 }
170
171 /*!
172  * \brief Log a truncate operation, never syncing.
173  */
174 int
175 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
176                   afs_int32 alength)
177 {
178     afs_int32 code;
179     afs_int32 data[3];
180     struct ubik_stat ustat;
181
182     /* figure out where to write */
183     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
184     if (code < 0)
185         return code;
186
187     /* setup data */
188     data[0] = htonl(LOGTRUNCATE);
189     data[1] = htonl(afile);
190     data[2] = htonl(alength);
191
192     /* do write */
193     code =
194         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
195                           3 * sizeof(afs_int32));
196     if (code != 3 * sizeof(afs_int32))
197         return UIOERROR;
198     return 0;
199 }
200
201 /*!
202  * \brief Write some data to the log, never syncing.
203  */
204 int
205 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
206                    afs_int32 apos, afs_int32 alen)
207 {
208     struct ubik_stat ustat;
209     afs_int32 code;
210     afs_int32 data[4];
211     afs_int32 lpos;
212
213     /* find end of log */
214     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
215     lpos = ustat.size;
216     if (code < 0)
217         return code;
218
219     /* setup header */
220     data[0] = htonl(LOGDATA);
221     data[1] = htonl(afile);
222     data[2] = htonl(apos);
223     data[3] = htonl(alen);
224
225     /* write header */
226     code =
227         (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
228     if (code != 4 * sizeof(afs_int32))
229         return UIOERROR;
230     lpos += 4 * sizeof(afs_int32);
231
232     /* write data */
233     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
234     if (code != alen)
235         return UIOERROR;
236     return 0;
237 }
238
239 int
240 udisk_Init(int abuffers)
241 {
242     /* Initialize the venus buffer system. */
243     int i;
244     struct buffer *tb;
245     Buffers = calloc(abuffers, sizeof(struct buffer));
246     BufferData = malloc(abuffers * UBIK_PAGESIZE);
247     nbuffers = abuffers;
248     for (i = 0; i < PHSIZE; i++)
249         phTable[i] = 0;
250     for (i = 0; i < abuffers; i++) {
251         /* Fill in each buffer with an empty indication. */
252         tb = &Buffers[i];
253         tb->lru_next = &(Buffers[i + 1]);
254         tb->lru_prev = &(Buffers[i - 1]);
255         tb->data = &BufferData[UBIK_PAGESIZE * i];
256         tb->file = BADFID;
257     }
258     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
259     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
260     LruBuffer = &(Buffers[0]);
261     return 0;
262 }
263
264 /*!
265  * \brief Take a buffer and mark it as the least recently used buffer.
266  */
267 static void
268 Dlru(struct buffer *abuf)
269 {
270     if (LruBuffer == abuf)
271         return;
272
273     /* Unthread from where it is in the list */
274     abuf->lru_next->lru_prev = abuf->lru_prev;
275     abuf->lru_prev->lru_next = abuf->lru_next;
276
277     /* Thread onto beginning of LRU list */
278     abuf->lru_next = LruBuffer;
279     abuf->lru_prev = LruBuffer->lru_prev;
280
281     LruBuffer->lru_prev->lru_next = abuf;
282     LruBuffer->lru_prev = abuf;
283     LruBuffer = abuf;
284 }
285
286 /*!
287  * \brief Take a buffer and mark it as the most recently used buffer.
288  */
289 static void
290 Dmru(struct buffer *abuf)
291 {
292     if (LruBuffer == abuf) {
293         LruBuffer = LruBuffer->lru_next;
294         return;
295     }
296
297     /* Unthread from where it is in the list */
298     abuf->lru_next->lru_prev = abuf->lru_prev;
299     abuf->lru_prev->lru_next = abuf->lru_next;
300
301     /* Thread onto end of LRU list - making it the MRU buffer */
302     abuf->lru_next = LruBuffer;
303     abuf->lru_prev = LruBuffer->lru_prev;
304     LruBuffer->lru_prev->lru_next = abuf;
305     LruBuffer->lru_prev = abuf;
306 }
307
308 static_inline int
309 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
310             struct ubik_trans *atrans)
311 {
312     if (buf->page != page) {
313         return 0;
314     }
315     if (buf->file != fid) {
316         return 0;
317     }
318     if (atrans->type == UBIK_READTRANS && buf->dirty) {
319         /* if 'buf' is dirty, it has uncommitted changes; we do not want to
320          * see uncommitted changes if we are a read transaction, so skip over
321          * it. */
322         return 0;
323     }
324     if (buf->dbase != atrans->dbase) {
325         return 0;
326     }
327     return 1;
328 }
329
330 /*!
331  * \brief Get a pointer to a particular buffer.
332  */
333 static char *
334 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
335 {
336     /* Read a page from the disk. */
337     struct buffer *tb, *lastbuffer, *found_tb = NULL;
338     afs_int32 code;
339     struct ubik_dbase *dbase = atrans->dbase;
340
341     calls++;
342     lastbuffer = LruBuffer->lru_prev;
343
344     /* Skip for write transactions for a clean page - this may not be the right page to use */
345     if (MatchBuffer(lastbuffer, page, fid, atrans)
346                 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
347         tb = lastbuffer;
348         tb->lockers++;
349         lastb++;
350         return tb->data;
351     }
352     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
353         if (MatchBuffer(tb, page, fid, atrans)) {
354             if (tb->dirty || atrans->type == UBIK_READTRANS) {
355                 found_tb = tb;
356                 break;
357             }
358             /* Remember this clean page - we might use it */
359             found_tb = tb;
360         }
361     }
362     /* For a write transaction, use a matching clean page if no dirty one was found */
363     if (found_tb) {
364         Dmru(found_tb);
365         found_tb->lockers++;
366         return found_tb->data;
367     }
368
369     /* can't find it */
370     tb = newslot(dbase, fid, page);
371     if (!tb)
372         return 0;
373     memset(tb->data, 0, UBIK_PAGESIZE);
374
375     tb->lockers++;
376     code =
377         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
378                         UBIK_PAGESIZE);
379     if (code < 0) {
380         tb->file = BADFID;
381         Dlru(tb);
382         tb->lockers--;
383         ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
384         return 0;
385     }
386     ios++;
387
388     /* Note that findslot sets the page field in the buffer equal to
389      * what it is searching for.
390      */
391     return tb->data;
392 }
393
394 /*!
395  * \brief Zap truncated pages.
396  */
397 static int
398 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
399 {
400     afs_int32 maxPage;
401     struct buffer *tb;
402     int i;
403     struct ubik_dbase *dbase = atrans->dbase;
404
405     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
406     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
407         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
408             tb->file = BADFID;
409             Dlru(tb);
410         }
411     }
412     return 0;
413 }
414
415 /*!
416  * \brief Allocate a truncation entry.
417  *
418  * We allocate special entries representing truncations, rather than
419  * performing them immediately, so that we can abort a transaction easily by simply purging
420  * the in-core memory buffers and discarding these truncation entries.
421  */
422 static struct ubik_trunc *
423 GetTrunc(void)
424 {
425     struct ubik_trunc *tt;
426     if (!freeTruncList) {
427         freeTruncList = malloc(sizeof(struct ubik_trunc));
428         freeTruncList->next = (struct ubik_trunc *)0;
429     }
430     tt = freeTruncList;
431     freeTruncList = tt->next;
432     return tt;
433 }
434
435 /*!
436  * \brief Free a truncation entry.
437  */
438 static int
439 PutTrunc(struct ubik_trunc *at)
440 {
441     at->next = freeTruncList;
442     freeTruncList = at;
443     return 0;
444 }
445
446 /*!
447  * \brief Find a truncation entry for a file, if any.
448  */
449 static struct ubik_trunc *
450 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
451 {
452     struct ubik_trunc *tt;
453     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
454         if (tt->file == afile)
455             return tt;
456     }
457     return (struct ubik_trunc *)0;
458 }
459
460 /*!
461  * \brief Do truncates associated with \p atrans, and free them.
462  */
463 static int
464 DoTruncs(struct ubik_trans *atrans)
465 {
466     struct ubik_trunc *tt, *nt;
467     int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
468     afs_int32 rcode = 0, code;
469
470     tproc = atrans->dbase->truncate;
471     for (tt = atrans->activeTruncs; tt; tt = nt) {
472         nt = tt->next;
473         DTrunc(atrans, tt->file, tt->length);   /* zap pages from buffer cache */
474         code = (*tproc) (atrans->dbase, tt->file, tt->length);
475         if (code)
476             rcode = code;
477         PutTrunc(tt);
478     }
479     /* don't unthread, because we do the entire list's worth here */
480     atrans->activeTruncs = (struct ubik_trunc *)0;
481     return (rcode);
482 }
483
484 /*!
485  * \brief Mark an \p fid as invalid.
486  */
487 int
488 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
489 {
490     struct buffer *tb;
491     int i;
492
493     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
494         if (tb->file == afid) {
495             tb->file = BADFID;
496             Dlru(tb);
497         }
498     }
499     return 0;
500 }
501
502 /*!
503  * \brief Move this page into the correct hash bucket.
504  */
505 static int
506 FixupBucket(struct buffer *ap)
507 {
508     struct buffer **lp, *tp;
509     int i;
510     /* first try to get it out of its current hash bucket, in which it might not be */
511     i = ap->hashIndex;
512     lp = &phTable[i];
513     for (tp = *lp; tp; tp = tp->hashNext) {
514         if (tp == ap) {
515             *lp = tp->hashNext;
516             break;
517         }
518         lp = &tp->hashNext;
519     }
520     /* now figure the new hash bucket */
521     i = pHash(ap->page);
522     ap->hashIndex = i;          /* remember where we are for deletion */
523     ap->hashNext = phTable[i];  /* add us to the list */
524     phTable[i] = ap;
525     return 0;
526 }
527
528 /*!
529  * \brief Create a new slot for a particular dbase page.
530  */
531 static struct buffer *
532 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
533 {
534     /* Find a usable buffer slot */
535     afs_int32 i;
536     struct buffer *pp, *tp;
537
538     pp = 0;                     /* last pure */
539     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
540         if (!tp->lockers && !tp->dirty) {
541             pp = tp;
542             break;
543         }
544     }
545
546     if (pp == 0) {
547         /* There are no unlocked buffers that don't need to be written to the disk. */
548         ubik_print
549             ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
550         return NULL;
551     }
552
553     /* Now fill in the header. */
554     pp->dbase = adbase;
555     pp->file = afid;
556     pp->page = apage;
557
558     FixupBucket(pp);            /* move to the right hash bucket */
559     Dmru(pp);
560     return pp;
561 }
562
563 /*!
564  * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
565  */
566 static void
567 DRelease(char *ap, int flag)
568 {
569     int index;
570     struct buffer *bp;
571
572     if (!ap)
573         return;
574     index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
575     bp = &(Buffers[index]);
576     bp->lockers--;
577     if (flag)
578         bp->dirty = 1;
579     return;
580 }
581
582 /*!
583  * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
584  * by DSync()).
585  *
586  * \note Note interaction with DSync(): you call this thing first,
587  * writing the buffers to the disk.  Then you call DSync() to sync all the
588  * files that were written, and to clear the dirty bits.  You should
589  * always call DFlush/DSync as a pair.
590  */
591 static int
592 DFlush(struct ubik_trans *atrans)
593 {
594     int i;
595     afs_int32 code;
596     struct buffer *tb;
597     struct ubik_dbase *adbase = atrans->dbase;
598
599     tb = Buffers;
600     for (i = 0; i < nbuffers; i++, tb++) {
601         if (tb->dirty) {
602             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
603             code =
604                 (*adbase->write) (adbase, tb->file, tb->data, code,
605                                   UBIK_PAGESIZE);
606             if (code != UBIK_PAGESIZE)
607                 return UIOERROR;
608         }
609     }
610     return 0;
611 }
612
613 /*!
614  * \brief Flush all modified buffers.
615  */
616 static int
617 DAbort(struct ubik_trans *atrans)
618 {
619     int i;
620     struct buffer *tb;
621
622     tb = Buffers;
623     for (i = 0; i < nbuffers; i++, tb++) {
624         if (tb->dirty) {
625             tb->dirty = 0;
626             tb->file = BADFID;
627             Dlru(tb);
628         }
629     }
630     return 0;
631 }
632
633 /**
634  * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
635  * can appear if a read transaction reads a page that is dirty, then that
636  * dirty page is synced. The read transaction will skip over the dirty page,
637  * and create a new buffer, and when the dirty page is synced, it will be
638  * identical (except for contents) to the read-transaction buffer.
639  */
640 static void
641 DedupBuffer(struct buffer *abuf)
642 {
643     struct buffer *tb;
644     for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
645         if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
646             && tb->dbase == abuf->dbase) {
647
648             tb->file = BADFID;
649             Dlru(tb);
650         }
651     }
652 }
653
654 /*!
655  * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
656  */
657 static int
658 DSync(struct ubik_trans *atrans)
659 {
660     int i;
661     afs_int32 code;
662     struct buffer *tb;
663     afs_int32 file;
664     afs_int32 rCode;
665     struct ubik_dbase *adbase = atrans->dbase;
666
667     rCode = 0;
668     while (1) {
669         file = BADFID;
670         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
671             if (tb->dirty == 1) {
672                 if (file == BADFID)
673                     file = tb->file;
674                 if (file != BADFID && tb->file == file) {
675                     tb->dirty = 0;
676                     DedupBuffer(tb);
677                 }
678             }
679         }
680         if (file == BADFID)
681             break;
682         /* otherwise we have a file to sync */
683         code = (*adbase->sync) (adbase, file);
684         if (code)
685             rCode = code;
686     }
687     return rCode;
688 }
689
690 /*!
691  * \brief Same as DRead(), only do not even try to read the page.
692  */
693 static char *
694 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
695 {
696     struct buffer *tb;
697     struct ubik_dbase *dbase = atrans->dbase;
698
699     if ((tb = newslot(dbase, fid, page)) == 0)
700         return NULL;
701     tb->lockers++;
702     memset(tb->data, 0, UBIK_PAGESIZE);
703     return tb->data;
704 }
705
706 /*!
707  * \brief Read data from database.
708  */
709 int
710 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
711            afs_int32 apos, afs_int32 alen)
712 {
713     char *bp;
714     afs_int32 offset, len, totalLen;
715
716     if (atrans->flags & TRDONE)
717         return UDONE;
718     totalLen = 0;
719     while (alen > 0) {
720         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
721         if (!bp)
722             return UEOF;
723         /* otherwise, min of remaining bytes and end of buffer to user mode */
724         offset = apos & (UBIK_PAGESIZE - 1);
725         len = UBIK_PAGESIZE - offset;
726         if (len > alen)
727             len = alen;
728         memcpy(abuffer, bp + offset, len);
729         abuffer = (char *)abuffer + len;
730         apos += len;
731         alen -= len;
732         totalLen += len;
733         DRelease(bp, 0);
734     }
735     return 0;
736 }
737
738 /*!
739  * \brief Truncate file.
740  */
741 int
742 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
743 {
744     afs_int32 code;
745     struct ubik_trunc *tt;
746
747     if (atrans->flags & TRDONE)
748         return UDONE;
749     if (atrans->type != UBIK_WRITETRANS)
750         return UBADTYPE;
751
752     /* write a truncate log record */
753     code = udisk_LogTruncate(atrans->dbase, afile, alength);
754
755     /* don't truncate until commit time */
756     tt = FindTrunc(atrans, afile);
757     if (!tt) {
758         /* this file not truncated yet */
759         tt = GetTrunc();
760         tt->next = atrans->activeTruncs;
761         atrans->activeTruncs = tt;
762         tt->file = afile;
763         tt->length = alength;
764     } else {
765         /* already truncated to a certain length */
766         if (tt->length > alength)
767             tt->length = alength;
768     }
769     return code;
770 }
771
772 /*!
773  * \brief Write data to database, using logs.
774  */
775 int
776 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
777             afs_int32 apos, afs_int32 alen)
778 {
779     char *bp;
780     afs_int32 offset, len, totalLen;
781     struct ubik_trunc *tt;
782     afs_int32 code;
783
784     if (atrans->flags & TRDONE)
785         return UDONE;
786     if (atrans->type != UBIK_WRITETRANS)
787         return UBADTYPE;
788
789     /* first write the data to the log */
790     code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
791     if (code)
792         return code;
793
794     /* expand any truncations of this file */
795     tt = FindTrunc(atrans, afile);
796     if (tt) {
797         if (tt->length < apos + alen) {
798             tt->length = apos + alen;
799         }
800     }
801
802     /* now update vm */
803     totalLen = 0;
804     while (alen > 0) {
805         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
806         if (!bp) {
807             bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
808             if (!bp)
809                 return UIOERROR;
810             memset(bp, 0, UBIK_PAGESIZE);
811         }
812         /* otherwise, min of remaining bytes and end of buffer to user mode */
813         offset = apos & (UBIK_PAGESIZE - 1);
814         len = UBIK_PAGESIZE - offset;
815         if (len > alen)
816             len = alen;
817         memcpy(bp + offset, abuffer, len);
818         abuffer = (char *)abuffer + len;
819         apos += len;
820         alen -= len;
821         totalLen += len;
822         DRelease(bp, 1);        /* buffer modified */
823     }
824     return 0;
825 }
826
827 /*!
828  * \brief Begin a new local transaction.
829  */
830 int
831 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
832 {
833     afs_int32 code;
834     struct ubik_trans *tt;
835
836     *atrans = NULL;
837     if (atype == UBIK_WRITETRANS) {
838         if (adbase->flags & DBWRITING)
839             return USYNC;
840         code = udisk_LogOpcode(adbase, LOGNEW, 0);
841         if (code)
842             return code;
843     }
844     tt = calloc(1, sizeof(struct ubik_trans));
845     tt->dbase = adbase;
846     tt->next = adbase->activeTrans;
847     adbase->activeTrans = tt;
848     tt->type = atype;
849     if (atype == UBIK_READTRANS)
850         adbase->readers++;
851     else if (atype == UBIK_WRITETRANS) {
852         UBIK_VERSION_LOCK;
853         adbase->flags |= DBWRITING;
854         UBIK_VERSION_UNLOCK;
855     }
856     *atrans = tt;
857     return 0;
858 }
859
860 /*!
861  * \brief Commit transaction.
862  */
863 int
864 udisk_commit(struct ubik_trans *atrans)
865 {
866     struct ubik_dbase *dbase;
867     afs_int32 code = 0;
868     struct ubik_version oldversion, newversion;
869
870     if (atrans->flags & TRDONE)
871         return (UTWOENDS);
872
873     if (atrans->type == UBIK_WRITETRANS) {
874         dbase = atrans->dbase;
875
876         /* On the first write to the database. We update the versions */
877         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
878             UBIK_VERSION_LOCK;
879             oldversion = dbase->version;
880             newversion.epoch = FT_ApproxTime();;
881             newversion.counter = 1;
882
883             code = (*dbase->setlabel) (dbase, 0, &newversion);
884             if (code)
885                 return (code);
886             version_globals.ubik_epochTime = newversion.epoch;
887             dbase->version = newversion;
888             UBIK_VERSION_UNLOCK;
889
890             urecovery_state |= UBIK_RECLABELDB;
891
892             /* Ignore the error here. If the call fails, the site is
893              * marked down and when we detect it is up again, we will
894              * send the entire database to it.
895              */
896             ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
897                                            &oldversion, &newversion);
898         }
899
900         UBIK_VERSION_LOCK;
901         dbase->version.counter++;       /* bump commit count */
902 #ifdef AFS_PTHREAD_ENV
903         opr_cv_broadcast(&dbase->version_cond);
904 #else
905         LWP_NoYieldSignal(&dbase->version);
906 #endif
907         code = udisk_LogEnd(dbase, &dbase->version);
908         if (code) {
909             dbase->version.counter--;
910             return (code);
911         }
912         UBIK_VERSION_UNLOCK;
913
914         /* If we fail anytime after this, then panic and let the
915          * recovery replay the log.
916          */
917         code = DFlush(atrans);  /* write dirty pages to respective files */
918         if (code)
919             panic("Writing Ubik DB modifications\n");
920         code = DSync(atrans);   /* sync the files and mark pages not dirty */
921         if (code)
922             panic("Synchronizing Ubik DB modifications\n");
923
924         code = DoTruncs(atrans);        /* Perform requested truncations */
925         if (code)
926             panic("Truncating Ubik DB\n");
927
928         /* label the committed dbase */
929         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
930         if (code)
931             panic("Truncating Ubik DB\n");
932
933         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
934         if (code)
935             panic("Truncating Ubik logfile\n");
936
937     }
938
939     /* When the transaction is marked done, it also means the logfile
940      * has been truncated.
941      */
942     atrans->flags |= TRDONE;
943     return code;
944 }
945
946 /*!
947  * \brief Abort transaction.
948  */
949 int
950 udisk_abort(struct ubik_trans *atrans)
951 {
952     struct ubik_dbase *dbase;
953     afs_int32 code;
954
955     if (atrans->flags & TRDONE)
956         return UTWOENDS;
957
958     /* Check if we are the write trans before logging abort, lest we
959      * abort a good write trans in progress.
960      * We don't really care if the LOGABORT gets to the log because we
961      * truncate the log next. If the truncate fails, we panic; for
962      * otherwise, the log entries remain. On restart, replay of the log
963      * will do nothing because the abort is there or no LogEnd opcode.
964      */
965     dbase = atrans->dbase;
966     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
967         udisk_LogOpcode(dbase, LOGABORT, 1);
968         code = (*dbase->truncate) (dbase, LOGFILE, 0);
969         if (code)
970             panic("Truncating Ubik logfile during an abort\n");
971         DAbort(atrans);         /* remove all dirty pages */
972     }
973
974     /* When the transaction is marked done, it also means the logfile
975      * has been truncated.
976      */
977     atrans->flags |= (TRABORT | TRDONE);
978     return 0;
979 }
980
981 /*!
982  * \brief Destroy a transaction after it has been committed or aborted.
983  *
984  * If it hasn't committed before you call this routine, we'll abort the
985  * transaction for you.
986  */
987 int
988 udisk_end(struct ubik_trans *atrans)
989 {
990     struct ubik_dbase *dbase;
991
992     if (!(atrans->flags & TRDONE))
993         udisk_abort(atrans);
994     dbase = atrans->dbase;
995
996     ulock_relLock(atrans);
997     unthread(atrans);
998
999     /* check if we are the write trans before unsetting the DBWRITING bit, else
1000      * we could be unsetting someone else's bit.
1001      */
1002     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
1003         UBIK_VERSION_LOCK;
1004         dbase->flags &= ~DBWRITING;
1005         UBIK_VERSION_UNLOCK;
1006     } else {
1007         dbase->readers--;
1008     }
1009     if (atrans->iovec_info.iovec_wrt_val)
1010         free(atrans->iovec_info.iovec_wrt_val);
1011     if (atrans->iovec_data.iovec_buf_val)
1012         free(atrans->iovec_data.iovec_buf_val);
1013     free(atrans);
1014
1015     /* Wakeup any writers waiting in BeginTrans() */
1016 #ifdef AFS_PTHREAD_ENV
1017     opr_cv_broadcast(&dbase->flags_cond);
1018 #else
1019     LWP_NoYieldSignal(&dbase->flags);
1020 #endif
1021     return 0;
1022 }