ubik: rename DInit, call from initialization sequence
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <sys/types.h>
16 #include <string.h>
17 #include <stdarg.h>
18 #include <errno.h>
19
20 #ifdef AFS_NT40_ENV
21 #include <winsock2.h>
22 #else
23 #include <sys/file.h>
24 #include <netinet/in.h>
25 #endif
26
27 #include <lock.h>
28 #include <rx/xdr.h>
29
30
31
32 #define UBIK_INTERNALS
33 #include "ubik.h"
34 #include "ubik_int.h"
35
36 #define PHSIZE  128
37 static struct buffer {
38     struct ubik_dbase *dbase;   /*!< dbase within which the buffer resides */
39     afs_int32 file;             /*!< Unique cache key */
40     afs_int32 page;             /*!< page number */
41     struct buffer *lru_next;
42     struct buffer *lru_prev;
43     struct buffer *hashNext;    /*!< next dude in hash table */
44     char *data;                 /*!< ptr to the data */
45     char lockers;               /*!< usage ref count */
46     char dirty;                 /*!< is buffer modified */
47     char hashIndex;             /*!< back ptr to hash table */
48 } *Buffers;
49
50 #define pHash(page) ((page) & (PHSIZE-1))
51
52 afs_int32 ubik_nBuffers = NBUFFERS;
53 static struct buffer *phTable[PHSIZE];  /*!< page hash table */
54 static struct buffer *LruBuffer;
55 static int nbuffers;
56 static int calls = 0, ios = 0, lastb = 0;
57 static char *BufferData;
58 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
59                               afs_int32 apage);
60 #define BADFID      0xffffffff
61
62 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
63
64 static struct ubik_trunc *freeTruncList = 0;
65
66 /*!
67  * \brief Remove a transaction from the database's active transaction list.  Don't free it.
68  */
69 static int
70 unthread(struct ubik_trans *atrans)
71 {
72     struct ubik_trans **lt, *tt;
73     lt = &atrans->dbase->activeTrans;
74     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
75         if (tt == atrans) {
76             /* found it */
77             *lt = tt->next;
78             return 0;
79         }
80     }
81     return 2;                   /* no entry */
82 }
83
84 /*!
85  * \brief some debugging assistance
86  */
87 void
88 udisk_Debug(struct ubik_debug *aparm)
89 {
90     struct buffer *tb;
91     int i;
92
93     memcpy(&aparm->localVersion, &ubik_dbase->version,
94            sizeof(struct ubik_version));
95     aparm->lockedPages = 0;
96     aparm->writeLockedPages = 0;
97     tb = Buffers;
98     for (i = 0; i < nbuffers; i++, tb++) {
99         if (tb->lockers) {
100             aparm->lockedPages++;
101             if (tb->dirty)
102                 aparm->writeLockedPages++;
103         }
104     }
105 }
106
107 /*!
108  * \brief Write an opcode to the log.
109  *
110  * log format is defined here, and implicitly in recovery.c
111  *
112  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
113  * are in logged in network standard byte order, in case we want to move logs
114  * from machine-to-machine someday.
115  *
116  * Begin transaction: opcode \n
117  * Commit transaction: opcode, version (8 bytes) \n
118  * Truncate file: opcode, file number, length \n
119  * Abort transaction: opcode \n
120  * Write data: opcode, file, position, length, <length> data bytes \n
121  */
122 int
123 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
124 {
125     struct ubik_stat ustat;
126     afs_int32 code;
127
128     /* figure out where to write */
129     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
130     if (code < 0)
131         return code;
132
133     /* setup data and do write */
134     aopcode = htonl(aopcode);
135     code =
136         (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
137                           sizeof(afs_int32));
138     if (code != sizeof(afs_int32))
139         return UIOERROR;
140
141     /* optionally sync data */
142     if (async)
143         code = (*adbase->sync) (adbase, LOGFILE);
144     else
145         code = 0;
146     return code;
147 }
148
149 /*!
150  * \brief Log a commit, never syncing.
151  */
152 int
153 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
154 {
155     afs_int32 code;
156     afs_int32 data[3];
157     struct ubik_stat ustat;
158
159     /* figure out where to write */
160     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
161     if (code)
162         return code;
163
164     /* setup data */
165     data[0] = htonl(LOGEND);
166     data[1] = htonl(aversion->epoch);
167     data[2] = htonl(aversion->counter);
168
169     /* do write */
170     code =
171         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
172                           3 * sizeof(afs_int32));
173     if (code != 3 * sizeof(afs_int32))
174         return UIOERROR;
175
176     /* finally sync the log */
177     code = (*adbase->sync) (adbase, LOGFILE);
178     return code;
179 }
180
181 /*!
182  * \brief Log a truncate operation, never syncing.
183  */
184 int
185 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
186                   afs_int32 alength)
187 {
188     afs_int32 code;
189     afs_int32 data[3];
190     struct ubik_stat ustat;
191
192     /* figure out where to write */
193     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
194     if (code < 0)
195         return code;
196
197     /* setup data */
198     data[0] = htonl(LOGTRUNCATE);
199     data[1] = htonl(afile);
200     data[2] = htonl(alength);
201
202     /* do write */
203     code =
204         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
205                           3 * sizeof(afs_int32));
206     if (code != 3 * sizeof(afs_int32))
207         return UIOERROR;
208     return 0;
209 }
210
211 /*!
212  * \brief Write some data to the log, never syncing.
213  */
214 int
215 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
216                    afs_int32 apos, afs_int32 alen)
217 {
218     struct ubik_stat ustat;
219     afs_int32 code;
220     afs_int32 data[4];
221     afs_int32 lpos;
222
223     /* find end of log */
224     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
225     lpos = ustat.size;
226     if (code < 0)
227         return code;
228
229     /* setup header */
230     data[0] = htonl(LOGDATA);
231     data[1] = htonl(afile);
232     data[2] = htonl(apos);
233     data[3] = htonl(alen);
234
235     /* write header */
236     code =
237         (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
238     if (code != 4 * sizeof(afs_int32))
239         return UIOERROR;
240     lpos += 4 * sizeof(afs_int32);
241
242     /* write data */
243     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
244     if (code != alen)
245         return UIOERROR;
246     return 0;
247 }
248
249 int
250 udisk_Init(int abuffers)
251 {
252     /* Initialize the venus buffer system. */
253     int i;
254     struct buffer *tb;
255     Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
256     memset(Buffers, 0, abuffers * sizeof(struct buffer));
257     BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
258     nbuffers = abuffers;
259     for (i = 0; i < PHSIZE; i++)
260         phTable[i] = 0;
261     for (i = 0; i < abuffers; i++) {
262         /* Fill in each buffer with an empty indication. */
263         tb = &Buffers[i];
264         tb->lru_next = &(Buffers[i + 1]);
265         tb->lru_prev = &(Buffers[i - 1]);
266         tb->data = &BufferData[UBIK_PAGESIZE * i];
267         tb->file = BADFID;
268     }
269     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
270     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
271     LruBuffer = &(Buffers[0]);
272     return 0;
273 }
274
275 /*!
276  * \brief Take a buffer and mark it as the least recently used buffer.
277  */
278 static void
279 Dlru(struct buffer *abuf)
280 {
281     if (LruBuffer == abuf)
282         return;
283
284     /* Unthread from where it is in the list */
285     abuf->lru_next->lru_prev = abuf->lru_prev;
286     abuf->lru_prev->lru_next = abuf->lru_next;
287
288     /* Thread onto beginning of LRU list */
289     abuf->lru_next = LruBuffer;
290     abuf->lru_prev = LruBuffer->lru_prev;
291
292     LruBuffer->lru_prev->lru_next = abuf;
293     LruBuffer->lru_prev = abuf;
294     LruBuffer = abuf;
295 }
296
297 /*!
298  * \brief Take a buffer and mark it as the most recently used buffer.
299  */
300 static void
301 Dmru(struct buffer *abuf)
302 {
303     if (LruBuffer == abuf) {
304         LruBuffer = LruBuffer->lru_next;
305         return;
306     }
307
308     /* Unthread from where it is in the list */
309     abuf->lru_next->lru_prev = abuf->lru_prev;
310     abuf->lru_prev->lru_next = abuf->lru_next;
311
312     /* Thread onto end of LRU list - making it the MRU buffer */
313     abuf->lru_next = LruBuffer;
314     abuf->lru_prev = LruBuffer->lru_prev;
315     LruBuffer->lru_prev->lru_next = abuf;
316     LruBuffer->lru_prev = abuf;
317 }
318
319 static_inline int
320 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
321             struct ubik_trans *atrans)
322 {
323     if (buf->page != page) {
324         return 0;
325     }
326     if (buf->file != fid) {
327         return 0;
328     }
329     if (atrans->type == UBIK_READTRANS && buf->dirty) {
330         /* if 'buf' is dirty, it has uncommitted changes; we do not want to
331          * see uncommitted changes if we are a read transaction, so skip over
332          * it. */
333         return 0;
334     }
335     if (buf->dbase != atrans->dbase) {
336         return 0;
337     }
338     return 1;
339 }
340
341 /*!
342  * \brief Get a pointer to a particular buffer.
343  */
344 static char *
345 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
346 {
347     /* Read a page from the disk. */
348     struct buffer *tb, *lastbuffer, *found_tb = NULL;
349     afs_int32 code;
350     struct ubik_dbase *dbase = atrans->dbase;
351
352     calls++;
353     lastbuffer = LruBuffer->lru_prev;
354
355     /* Skip for write transactions for a clean page - this may not be the right page to use */
356     if (MatchBuffer(lastbuffer, page, fid, atrans)
357                 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
358         tb = lastbuffer;
359         tb->lockers++;
360         lastb++;
361         return tb->data;
362     }
363     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
364         if (MatchBuffer(tb, page, fid, atrans)) {
365             if (tb->dirty || atrans->type == UBIK_READTRANS) {
366                 found_tb = tb;
367                 break;
368             }
369             /* Remember this clean page - we might use it */
370             found_tb = tb;
371         }
372     }
373     /* For a write transaction, use a matching clean page if no dirty one was found */
374     if (found_tb) {
375         Dmru(found_tb);
376         found_tb->lockers++;
377         return found_tb->data;
378     }
379
380     /* can't find it */
381     tb = newslot(dbase, fid, page);
382     if (!tb)
383         return 0;
384     memset(tb->data, 0, UBIK_PAGESIZE);
385
386     tb->lockers++;
387     code =
388         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
389                         UBIK_PAGESIZE);
390     if (code < 0) {
391         tb->file = BADFID;
392         Dlru(tb);
393         tb->lockers--;
394         ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
395         return 0;
396     }
397     ios++;
398
399     /* Note that findslot sets the page field in the buffer equal to
400      * what it is searching for.
401      */
402     return tb->data;
403 }
404
405 /*!
406  * \brief Zap truncated pages.
407  */
408 static int
409 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
410 {
411     afs_int32 maxPage;
412     struct buffer *tb;
413     int i;
414     struct ubik_dbase *dbase = atrans->dbase;
415
416     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
417     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
418         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
419             tb->file = BADFID;
420             Dlru(tb);
421         }
422     }
423     return 0;
424 }
425
426 /*!
427  * \brief Allocate a truncation entry.
428  *
429  * We allocate special entries representing truncations, rather than
430  * performing them immediately, so that we can abort a transaction easily by simply purging
431  * the in-core memory buffers and discarding these truncation entries.
432  */
433 static struct ubik_trunc *
434 GetTrunc(void)
435 {
436     struct ubik_trunc *tt;
437     if (!freeTruncList) {
438         freeTruncList =
439             (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
440         freeTruncList->next = (struct ubik_trunc *)0;
441     }
442     tt = freeTruncList;
443     freeTruncList = tt->next;
444     return tt;
445 }
446
447 /*!
448  * \brief Free a truncation entry.
449  */
450 static int
451 PutTrunc(struct ubik_trunc *at)
452 {
453     at->next = freeTruncList;
454     freeTruncList = at;
455     return 0;
456 }
457
458 /*!
459  * \brief Find a truncation entry for a file, if any.
460  */
461 static struct ubik_trunc *
462 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
463 {
464     struct ubik_trunc *tt;
465     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
466         if (tt->file == afile)
467             return tt;
468     }
469     return (struct ubik_trunc *)0;
470 }
471
472 /*!
473  * \brief Do truncates associated with \p atrans, and free them.
474  */
475 static int
476 DoTruncs(struct ubik_trans *atrans)
477 {
478     struct ubik_trunc *tt, *nt;
479     int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
480     afs_int32 rcode = 0, code;
481
482     tproc = atrans->dbase->truncate;
483     for (tt = atrans->activeTruncs; tt; tt = nt) {
484         nt = tt->next;
485         DTrunc(atrans, tt->file, tt->length);   /* zap pages from buffer cache */
486         code = (*tproc) (atrans->dbase, tt->file, tt->length);
487         if (code)
488             rcode = code;
489         PutTrunc(tt);
490     }
491     /* don't unthread, because we do the entire list's worth here */
492     atrans->activeTruncs = (struct ubik_trunc *)0;
493     return (rcode);
494 }
495
496 /*!
497  * \brief Mark an \p fid as invalid.
498  */
499 int
500 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
501 {
502     struct buffer *tb;
503     int i;
504
505     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
506         if (tb->file == afid) {
507             tb->file = BADFID;
508             Dlru(tb);
509         }
510     }
511     return 0;
512 }
513
514 /*!
515  * \brief Move this page into the correct hash bucket.
516  */
517 static int
518 FixupBucket(struct buffer *ap)
519 {
520     struct buffer **lp, *tp;
521     int i;
522     /* first try to get it out of its current hash bucket, in which it might not be */
523     i = ap->hashIndex;
524     lp = &phTable[i];
525     for (tp = *lp; tp; tp = tp->hashNext) {
526         if (tp == ap) {
527             *lp = tp->hashNext;
528             break;
529         }
530         lp = &tp->hashNext;
531     }
532     /* now figure the new hash bucket */
533     i = pHash(ap->page);
534     ap->hashIndex = i;          /* remember where we are for deletion */
535     ap->hashNext = phTable[i];  /* add us to the list */
536     phTable[i] = ap;
537     return 0;
538 }
539
540 /*!
541  * \brief Create a new slot for a particular dbase page.
542  */
543 static struct buffer *
544 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
545 {
546     /* Find a usable buffer slot */
547     afs_int32 i;
548     struct buffer *pp, *tp;
549
550     pp = 0;                     /* last pure */
551     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
552         if (!tp->lockers && !tp->dirty) {
553             pp = tp;
554             break;
555         }
556     }
557
558     if (pp == 0) {
559         /* There are no unlocked buffers that don't need to be written to the disk. */
560         ubik_print
561             ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
562         return NULL;
563     }
564
565     /* Now fill in the header. */
566     pp->dbase = adbase;
567     pp->file = afid;
568     pp->page = apage;
569
570     FixupBucket(pp);            /* move to the right hash bucket */
571     Dmru(pp);
572     return pp;
573 }
574
575 /*!
576  * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
577  */
578 static void
579 DRelease(char *ap, int flag)
580 {
581     int index;
582     struct buffer *bp;
583
584     if (!ap)
585         return;
586     index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
587     bp = &(Buffers[index]);
588     bp->lockers--;
589     if (flag)
590         bp->dirty = 1;
591     return;
592 }
593
594 /*!
595  * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
596  * by DSync()).
597  *
598  * \note Note interaction with DSync(): you call this thing first,
599  * writing the buffers to the disk.  Then you call DSync() to sync all the
600  * files that were written, and to clear the dirty bits.  You should
601  * always call DFlush/DSync as a pair.
602  */
603 static int
604 DFlush(struct ubik_trans *atrans)
605 {
606     int i;
607     afs_int32 code;
608     struct buffer *tb;
609     struct ubik_dbase *adbase = atrans->dbase;
610
611     tb = Buffers;
612     for (i = 0; i < nbuffers; i++, tb++) {
613         if (tb->dirty) {
614             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
615             code =
616                 (*adbase->write) (adbase, tb->file, tb->data, code,
617                                   UBIK_PAGESIZE);
618             if (code != UBIK_PAGESIZE)
619                 return UIOERROR;
620         }
621     }
622     return 0;
623 }
624
625 /*!
626  * \brief Flush all modified buffers.
627  */
628 static int
629 DAbort(struct ubik_trans *atrans)
630 {
631     int i;
632     struct buffer *tb;
633
634     tb = Buffers;
635     for (i = 0; i < nbuffers; i++, tb++) {
636         if (tb->dirty) {
637             tb->dirty = 0;
638             tb->file = BADFID;
639             Dlru(tb);
640         }
641     }
642     return 0;
643 }
644
645 /**
646  * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
647  * can appear if a read transaction reads a page that is dirty, then that
648  * dirty page is synced. The read transaction will skip over the dirty page,
649  * and create a new buffer, and when the dirty page is synced, it will be
650  * identical (except for contents) to the read-transaction buffer.
651  */
652 static void
653 DedupBuffer(struct buffer *abuf)
654 {
655     struct buffer *tb;
656     for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
657         if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
658             && tb->dbase == abuf->dbase) {
659
660             tb->file = BADFID;
661             Dlru(tb);
662         }
663     }
664 }
665
666 /*!
667  * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
668  */
669 static int
670 DSync(struct ubik_trans *atrans)
671 {
672     int i;
673     afs_int32 code;
674     struct buffer *tb;
675     afs_int32 file;
676     afs_int32 rCode;
677     struct ubik_dbase *adbase = atrans->dbase;
678
679     rCode = 0;
680     while (1) {
681         file = BADFID;
682         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
683             if (tb->dirty == 1) {
684                 if (file == BADFID)
685                     file = tb->file;
686                 if (file != BADFID && tb->file == file) {
687                     tb->dirty = 0;
688                     DedupBuffer(tb);
689                 }
690             }
691         }
692         if (file == BADFID)
693             break;
694         /* otherwise we have a file to sync */
695         code = (*adbase->sync) (adbase, file);
696         if (code)
697             rCode = code;
698     }
699     return rCode;
700 }
701
702 /*!
703  * \brief Same as DRead(), only do not even try to read the page.
704  */
705 static char *
706 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
707 {
708     struct buffer *tb;
709     struct ubik_dbase *dbase = atrans->dbase;
710
711     if ((tb = newslot(dbase, fid, page)) == 0)
712         return NULL;
713     tb->lockers++;
714     memset(tb->data, 0, UBIK_PAGESIZE);
715     return tb->data;
716 }
717
718 /*!
719  * \brief Read data from database.
720  */
721 int
722 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
723            afs_int32 apos, afs_int32 alen)
724 {
725     char *bp;
726     afs_int32 offset, len, totalLen;
727
728     if (atrans->flags & TRDONE)
729         return UDONE;
730     totalLen = 0;
731     while (alen > 0) {
732         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
733         if (!bp)
734             return UEOF;
735         /* otherwise, min of remaining bytes and end of buffer to user mode */
736         offset = apos & (UBIK_PAGESIZE - 1);
737         len = UBIK_PAGESIZE - offset;
738         if (len > alen)
739             len = alen;
740         memcpy(abuffer, bp + offset, len);
741         abuffer = (char *)abuffer + len;
742         apos += len;
743         alen -= len;
744         totalLen += len;
745         DRelease(bp, 0);
746     }
747     return 0;
748 }
749
750 /*!
751  * \brief Truncate file.
752  */
753 int
754 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
755 {
756     afs_int32 code;
757     struct ubik_trunc *tt;
758
759     if (atrans->flags & TRDONE)
760         return UDONE;
761     if (atrans->type != UBIK_WRITETRANS)
762         return UBADTYPE;
763
764     /* write a truncate log record */
765     code = udisk_LogTruncate(atrans->dbase, afile, alength);
766
767     /* don't truncate until commit time */
768     tt = FindTrunc(atrans, afile);
769     if (!tt) {
770         /* this file not truncated yet */
771         tt = GetTrunc();
772         tt->next = atrans->activeTruncs;
773         atrans->activeTruncs = tt;
774         tt->file = afile;
775         tt->length = alength;
776     } else {
777         /* already truncated to a certain length */
778         if (tt->length > alength)
779             tt->length = alength;
780     }
781     return code;
782 }
783
784 /*!
785  * \brief Write data to database, using logs.
786  */
787 int
788 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
789             afs_int32 apos, afs_int32 alen)
790 {
791     char *bp;
792     afs_int32 offset, len, totalLen;
793     struct ubik_trunc *tt;
794     afs_int32 code;
795
796     if (atrans->flags & TRDONE)
797         return UDONE;
798     if (atrans->type != UBIK_WRITETRANS)
799         return UBADTYPE;
800
801     /* first write the data to the log */
802     code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
803     if (code)
804         return code;
805
806     /* expand any truncations of this file */
807     tt = FindTrunc(atrans, afile);
808     if (tt) {
809         if (tt->length < apos + alen) {
810             tt->length = apos + alen;
811         }
812     }
813
814     /* now update vm */
815     totalLen = 0;
816     while (alen > 0) {
817         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
818         if (!bp) {
819             bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
820             if (!bp)
821                 return UIOERROR;
822             memset(bp, 0, UBIK_PAGESIZE);
823         }
824         /* otherwise, min of remaining bytes and end of buffer to user mode */
825         offset = apos & (UBIK_PAGESIZE - 1);
826         len = UBIK_PAGESIZE - offset;
827         if (len > alen)
828             len = alen;
829         memcpy(bp + offset, abuffer, len);
830         abuffer = (char *)abuffer + len;
831         apos += len;
832         alen -= len;
833         totalLen += len;
834         DRelease(bp, 1);        /* buffer modified */
835     }
836     return 0;
837 }
838
839 /*!
840  * \brief Begin a new local transaction.
841  */
842 int
843 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
844 {
845     afs_int32 code;
846     struct ubik_trans *tt;
847
848     *atrans = (struct ubik_trans *)NULL;
849     if (atype == UBIK_WRITETRANS) {
850         if (adbase->flags & DBWRITING)
851             return USYNC;
852         code = udisk_LogOpcode(adbase, LOGNEW, 0);
853         if (code)
854             return code;
855     }
856     tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
857     memset(tt, 0, sizeof(struct ubik_trans));
858     tt->dbase = adbase;
859     tt->next = adbase->activeTrans;
860     adbase->activeTrans = tt;
861     tt->type = atype;
862     if (atype == UBIK_READTRANS)
863         adbase->readers++;
864     else if (atype == UBIK_WRITETRANS)
865         adbase->flags |= DBWRITING;
866     *atrans = tt;
867     return 0;
868 }
869
870 /*!
871  * \brief Commit transaction.
872  */
873 int
874 udisk_commit(struct ubik_trans *atrans)
875 {
876     struct ubik_dbase *dbase;
877     afs_int32 code = 0;
878     struct ubik_version oldversion, newversion;
879
880     if (atrans->flags & TRDONE)
881         return (UTWOENDS);
882
883     if (atrans->type == UBIK_WRITETRANS) {
884         dbase = atrans->dbase;
885
886         /* On the first write to the database. We update the versions */
887         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
888             oldversion = dbase->version;
889             newversion.epoch = FT_ApproxTime();;
890             newversion.counter = 1;
891
892             code = (*dbase->setlabel) (dbase, 0, &newversion);
893             if (code)
894                 return (code);
895             ubik_epochTime = newversion.epoch;
896             dbase->version = newversion;
897
898             /* Ignore the error here. If the call fails, the site is
899              * marked down and when we detect it is up again, we will
900              * send the entire database to it.
901              */
902             ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
903                                            &oldversion, &newversion);
904             urecovery_state |= UBIK_RECLABELDB;
905         }
906
907         dbase->version.counter++;       /* bump commit count */
908 #ifdef AFS_PTHREAD_ENV
909         CV_BROADCAST(&dbase->version_cond);
910 #else
911         LWP_NoYieldSignal(&dbase->version);
912 #endif
913         code = udisk_LogEnd(dbase, &dbase->version);
914         if (code) {
915             dbase->version.counter--;
916             return (code);
917         }
918
919         /* If we fail anytime after this, then panic and let the
920          * recovery replay the log.
921          */
922         code = DFlush(atrans);  /* write dirty pages to respective files */
923         if (code)
924             panic("Writing Ubik DB modifications\n");
925         code = DSync(atrans);   /* sync the files and mark pages not dirty */
926         if (code)
927             panic("Synchronizing Ubik DB modifications\n");
928
929         code = DoTruncs(atrans);        /* Perform requested truncations */
930         if (code)
931             panic("Truncating Ubik DB\n");
932
933         /* label the committed dbase */
934         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
935         if (code)
936             panic("Truncating Ubik DB\n");
937
938         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
939         if (code)
940             panic("Truncating Ubik logfile\n");
941
942     }
943
944     /* When the transaction is marked done, it also means the logfile
945      * has been truncated.
946      */
947     atrans->flags |= TRDONE;
948     return code;
949 }
950
951 /*!
952  * \brief Abort transaction.
953  */
954 int
955 udisk_abort(struct ubik_trans *atrans)
956 {
957     struct ubik_dbase *dbase;
958     afs_int32 code;
959
960     if (atrans->flags & TRDONE)
961         return UTWOENDS;
962
963     /* Check if we are the write trans before logging abort, lest we
964      * abort a good write trans in progress.
965      * We don't really care if the LOGABORT gets to the log because we
966      * truncate the log next. If the truncate fails, we panic; for
967      * otherwise, the log entries remain. On restart, replay of the log
968      * will do nothing because the abort is there or no LogEnd opcode.
969      */
970     dbase = atrans->dbase;
971     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
972         udisk_LogOpcode(dbase, LOGABORT, 1);
973         code = (*dbase->truncate) (dbase, LOGFILE, 0);
974         if (code)
975             panic("Truncating Ubik logfile during an abort\n");
976         DAbort(atrans);         /* remove all dirty pages */
977     }
978
979     /* When the transaction is marked done, it also means the logfile
980      * has been truncated.
981      */
982     atrans->flags |= (TRABORT | TRDONE);
983     return 0;
984 }
985
986 /*!
987  * \brief Destroy a transaction after it has been committed or aborted.
988  *
989  * If it hasn't committed before you call this routine, we'll abort the
990  * transaction for you.
991  */
992 int
993 udisk_end(struct ubik_trans *atrans)
994 {
995     struct ubik_dbase *dbase;
996
997     if (!(atrans->flags & TRDONE))
998         udisk_abort(atrans);
999     dbase = atrans->dbase;
1000
1001     ulock_relLock(atrans);
1002     unthread(atrans);
1003
1004     /* check if we are the write trans before unsetting the DBWRITING bit, else
1005      * we could be unsetting someone else's bit.
1006      */
1007     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
1008         dbase->flags &= ~DBWRITING;
1009     } else {
1010         dbase->readers--;
1011     }
1012     if (atrans->iovec_info.iovec_wrt_val)
1013         free(atrans->iovec_info.iovec_wrt_val);
1014     if (atrans->iovec_data.iovec_buf_val)
1015         free(atrans->iovec_data.iovec_buf_val);
1016     free(atrans);
1017
1018     /* Wakeup any writers waiting in BeginTrans() */
1019 #ifdef AFS_PTHREAD_ENV
1020     CV_BROADCAST(&dbase->flags_cond);
1021 #else
1022     LWP_NoYieldSignal(&dbase->flags);
1023 #endif
1024     return 0;
1025 }