ubik: set UBIK_RECLABELDB before propagating version
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <lock.h>
16 #include <rx/xdr.h>
17
18 #define UBIK_INTERNALS
19 #include "ubik.h"
20 #include "ubik_int.h"
21
22 #define PHSIZE  128
23 static struct buffer {
24     struct ubik_dbase *dbase;   /*!< dbase within which the buffer resides */
25     afs_int32 file;             /*!< Unique cache key */
26     afs_int32 page;             /*!< page number */
27     struct buffer *lru_next;
28     struct buffer *lru_prev;
29     struct buffer *hashNext;    /*!< next dude in hash table */
30     char *data;                 /*!< ptr to the data */
31     char lockers;               /*!< usage ref count */
32     char dirty;                 /*!< is buffer modified */
33     char hashIndex;             /*!< back ptr to hash table */
34 } *Buffers;
35
36 #define pHash(page) ((page) & (PHSIZE-1))
37
38 afs_int32 ubik_nBuffers = NBUFFERS;
39 static struct buffer *phTable[PHSIZE];  /*!< page hash table */
40 static struct buffer *LruBuffer;
41 static int nbuffers;
42 static int calls = 0, ios = 0, lastb = 0;
43 static char *BufferData;
44 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
45                               afs_int32 apage);
46 #define BADFID      0xffffffff
47
48 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
49
50 static struct ubik_trunc *freeTruncList = 0;
51
52 /*!
53  * \brief Remove a transaction from the database's active transaction list.  Don't free it.
54  */
55 static int
56 unthread(struct ubik_trans *atrans)
57 {
58     struct ubik_trans **lt, *tt;
59     lt = &atrans->dbase->activeTrans;
60     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
61         if (tt == atrans) {
62             /* found it */
63             *lt = tt->next;
64             return 0;
65         }
66     }
67     return 2;                   /* no entry */
68 }
69
70 /*!
71  * \brief some debugging assistance
72  */
73 void
74 udisk_Debug(struct ubik_debug *aparm)
75 {
76     struct buffer *tb;
77     int i;
78
79     memcpy(&aparm->localVersion, &ubik_dbase->version,
80            sizeof(struct ubik_version));
81     aparm->lockedPages = 0;
82     aparm->writeLockedPages = 0;
83     tb = Buffers;
84     for (i = 0; i < nbuffers; i++, tb++) {
85         if (tb->lockers) {
86             aparm->lockedPages++;
87             if (tb->dirty)
88                 aparm->writeLockedPages++;
89         }
90     }
91 }
92
93 /*!
94  * \brief Write an opcode to the log.
95  *
96  * log format is defined here, and implicitly in recovery.c
97  *
98  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
99  * are in logged in network standard byte order, in case we want to move logs
100  * from machine-to-machine someday.
101  *
102  * Begin transaction: opcode \n
103  * Commit transaction: opcode, version (8 bytes) \n
104  * Truncate file: opcode, file number, length \n
105  * Abort transaction: opcode \n
106  * Write data: opcode, file, position, length, <length> data bytes \n
107  */
108 int
109 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
110 {
111     struct ubik_stat ustat;
112     afs_int32 code;
113
114     /* figure out where to write */
115     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
116     if (code < 0)
117         return code;
118
119     /* setup data and do write */
120     aopcode = htonl(aopcode);
121     code =
122         (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
123                           sizeof(afs_int32));
124     if (code != sizeof(afs_int32))
125         return UIOERROR;
126
127     /* optionally sync data */
128     if (async)
129         code = (*adbase->sync) (adbase, LOGFILE);
130     else
131         code = 0;
132     return code;
133 }
134
135 /*!
136  * \brief Log a commit, never syncing.
137  */
138 int
139 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
140 {
141     afs_int32 code;
142     afs_int32 data[3];
143     struct ubik_stat ustat;
144
145     /* figure out where to write */
146     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
147     if (code)
148         return code;
149
150     /* setup data */
151     data[0] = htonl(LOGEND);
152     data[1] = htonl(aversion->epoch);
153     data[2] = htonl(aversion->counter);
154
155     /* do write */
156     code =
157         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
158                           3 * sizeof(afs_int32));
159     if (code != 3 * sizeof(afs_int32))
160         return UIOERROR;
161
162     /* finally sync the log */
163     code = (*adbase->sync) (adbase, LOGFILE);
164     return code;
165 }
166
167 /*!
168  * \brief Log a truncate operation, never syncing.
169  */
170 int
171 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
172                   afs_int32 alength)
173 {
174     afs_int32 code;
175     afs_int32 data[3];
176     struct ubik_stat ustat;
177
178     /* figure out where to write */
179     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
180     if (code < 0)
181         return code;
182
183     /* setup data */
184     data[0] = htonl(LOGTRUNCATE);
185     data[1] = htonl(afile);
186     data[2] = htonl(alength);
187
188     /* do write */
189     code =
190         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
191                           3 * sizeof(afs_int32));
192     if (code != 3 * sizeof(afs_int32))
193         return UIOERROR;
194     return 0;
195 }
196
197 /*!
198  * \brief Write some data to the log, never syncing.
199  */
200 int
201 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
202                    afs_int32 apos, afs_int32 alen)
203 {
204     struct ubik_stat ustat;
205     afs_int32 code;
206     afs_int32 data[4];
207     afs_int32 lpos;
208
209     /* find end of log */
210     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
211     lpos = ustat.size;
212     if (code < 0)
213         return code;
214
215     /* setup header */
216     data[0] = htonl(LOGDATA);
217     data[1] = htonl(afile);
218     data[2] = htonl(apos);
219     data[3] = htonl(alen);
220
221     /* write header */
222     code =
223         (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
224     if (code != 4 * sizeof(afs_int32))
225         return UIOERROR;
226     lpos += 4 * sizeof(afs_int32);
227
228     /* write data */
229     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
230     if (code != alen)
231         return UIOERROR;
232     return 0;
233 }
234
235 int
236 udisk_Init(int abuffers)
237 {
238     /* Initialize the venus buffer system. */
239     int i;
240     struct buffer *tb;
241     Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
242     memset(Buffers, 0, abuffers * sizeof(struct buffer));
243     BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
244     nbuffers = abuffers;
245     for (i = 0; i < PHSIZE; i++)
246         phTable[i] = 0;
247     for (i = 0; i < abuffers; i++) {
248         /* Fill in each buffer with an empty indication. */
249         tb = &Buffers[i];
250         tb->lru_next = &(Buffers[i + 1]);
251         tb->lru_prev = &(Buffers[i - 1]);
252         tb->data = &BufferData[UBIK_PAGESIZE * i];
253         tb->file = BADFID;
254     }
255     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
256     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
257     LruBuffer = &(Buffers[0]);
258     return 0;
259 }
260
261 /*!
262  * \brief Take a buffer and mark it as the least recently used buffer.
263  */
264 static void
265 Dlru(struct buffer *abuf)
266 {
267     if (LruBuffer == abuf)
268         return;
269
270     /* Unthread from where it is in the list */
271     abuf->lru_next->lru_prev = abuf->lru_prev;
272     abuf->lru_prev->lru_next = abuf->lru_next;
273
274     /* Thread onto beginning of LRU list */
275     abuf->lru_next = LruBuffer;
276     abuf->lru_prev = LruBuffer->lru_prev;
277
278     LruBuffer->lru_prev->lru_next = abuf;
279     LruBuffer->lru_prev = abuf;
280     LruBuffer = abuf;
281 }
282
283 /*!
284  * \brief Take a buffer and mark it as the most recently used buffer.
285  */
286 static void
287 Dmru(struct buffer *abuf)
288 {
289     if (LruBuffer == abuf) {
290         LruBuffer = LruBuffer->lru_next;
291         return;
292     }
293
294     /* Unthread from where it is in the list */
295     abuf->lru_next->lru_prev = abuf->lru_prev;
296     abuf->lru_prev->lru_next = abuf->lru_next;
297
298     /* Thread onto end of LRU list - making it the MRU buffer */
299     abuf->lru_next = LruBuffer;
300     abuf->lru_prev = LruBuffer->lru_prev;
301     LruBuffer->lru_prev->lru_next = abuf;
302     LruBuffer->lru_prev = abuf;
303 }
304
305 static_inline int
306 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
307             struct ubik_trans *atrans)
308 {
309     if (buf->page != page) {
310         return 0;
311     }
312     if (buf->file != fid) {
313         return 0;
314     }
315     if (atrans->type == UBIK_READTRANS && buf->dirty) {
316         /* if 'buf' is dirty, it has uncommitted changes; we do not want to
317          * see uncommitted changes if we are a read transaction, so skip over
318          * it. */
319         return 0;
320     }
321     if (buf->dbase != atrans->dbase) {
322         return 0;
323     }
324     return 1;
325 }
326
327 /*!
328  * \brief Get a pointer to a particular buffer.
329  */
330 static char *
331 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
332 {
333     /* Read a page from the disk. */
334     struct buffer *tb, *lastbuffer, *found_tb = NULL;
335     afs_int32 code;
336     struct ubik_dbase *dbase = atrans->dbase;
337
338     calls++;
339     lastbuffer = LruBuffer->lru_prev;
340
341     /* Skip for write transactions for a clean page - this may not be the right page to use */
342     if (MatchBuffer(lastbuffer, page, fid, atrans)
343                 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
344         tb = lastbuffer;
345         tb->lockers++;
346         lastb++;
347         return tb->data;
348     }
349     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
350         if (MatchBuffer(tb, page, fid, atrans)) {
351             if (tb->dirty || atrans->type == UBIK_READTRANS) {
352                 found_tb = tb;
353                 break;
354             }
355             /* Remember this clean page - we might use it */
356             found_tb = tb;
357         }
358     }
359     /* For a write transaction, use a matching clean page if no dirty one was found */
360     if (found_tb) {
361         Dmru(found_tb);
362         found_tb->lockers++;
363         return found_tb->data;
364     }
365
366     /* can't find it */
367     tb = newslot(dbase, fid, page);
368     if (!tb)
369         return 0;
370     memset(tb->data, 0, UBIK_PAGESIZE);
371
372     tb->lockers++;
373     code =
374         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
375                         UBIK_PAGESIZE);
376     if (code < 0) {
377         tb->file = BADFID;
378         Dlru(tb);
379         tb->lockers--;
380         ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
381         return 0;
382     }
383     ios++;
384
385     /* Note that findslot sets the page field in the buffer equal to
386      * what it is searching for.
387      */
388     return tb->data;
389 }
390
391 /*!
392  * \brief Zap truncated pages.
393  */
394 static int
395 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
396 {
397     afs_int32 maxPage;
398     struct buffer *tb;
399     int i;
400     struct ubik_dbase *dbase = atrans->dbase;
401
402     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
403     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
404         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
405             tb->file = BADFID;
406             Dlru(tb);
407         }
408     }
409     return 0;
410 }
411
412 /*!
413  * \brief Allocate a truncation entry.
414  *
415  * We allocate special entries representing truncations, rather than
416  * performing them immediately, so that we can abort a transaction easily by simply purging
417  * the in-core memory buffers and discarding these truncation entries.
418  */
419 static struct ubik_trunc *
420 GetTrunc(void)
421 {
422     struct ubik_trunc *tt;
423     if (!freeTruncList) {
424         freeTruncList =
425             (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
426         freeTruncList->next = (struct ubik_trunc *)0;
427     }
428     tt = freeTruncList;
429     freeTruncList = tt->next;
430     return tt;
431 }
432
433 /*!
434  * \brief Free a truncation entry.
435  */
436 static int
437 PutTrunc(struct ubik_trunc *at)
438 {
439     at->next = freeTruncList;
440     freeTruncList = at;
441     return 0;
442 }
443
444 /*!
445  * \brief Find a truncation entry for a file, if any.
446  */
447 static struct ubik_trunc *
448 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
449 {
450     struct ubik_trunc *tt;
451     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
452         if (tt->file == afile)
453             return tt;
454     }
455     return (struct ubik_trunc *)0;
456 }
457
458 /*!
459  * \brief Do truncates associated with \p atrans, and free them.
460  */
461 static int
462 DoTruncs(struct ubik_trans *atrans)
463 {
464     struct ubik_trunc *tt, *nt;
465     int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
466     afs_int32 rcode = 0, code;
467
468     tproc = atrans->dbase->truncate;
469     for (tt = atrans->activeTruncs; tt; tt = nt) {
470         nt = tt->next;
471         DTrunc(atrans, tt->file, tt->length);   /* zap pages from buffer cache */
472         code = (*tproc) (atrans->dbase, tt->file, tt->length);
473         if (code)
474             rcode = code;
475         PutTrunc(tt);
476     }
477     /* don't unthread, because we do the entire list's worth here */
478     atrans->activeTruncs = (struct ubik_trunc *)0;
479     return (rcode);
480 }
481
482 /*!
483  * \brief Mark an \p fid as invalid.
484  */
485 int
486 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
487 {
488     struct buffer *tb;
489     int i;
490
491     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
492         if (tb->file == afid) {
493             tb->file = BADFID;
494             Dlru(tb);
495         }
496     }
497     return 0;
498 }
499
500 /*!
501  * \brief Move this page into the correct hash bucket.
502  */
503 static int
504 FixupBucket(struct buffer *ap)
505 {
506     struct buffer **lp, *tp;
507     int i;
508     /* first try to get it out of its current hash bucket, in which it might not be */
509     i = ap->hashIndex;
510     lp = &phTable[i];
511     for (tp = *lp; tp; tp = tp->hashNext) {
512         if (tp == ap) {
513             *lp = tp->hashNext;
514             break;
515         }
516         lp = &tp->hashNext;
517     }
518     /* now figure the new hash bucket */
519     i = pHash(ap->page);
520     ap->hashIndex = i;          /* remember where we are for deletion */
521     ap->hashNext = phTable[i];  /* add us to the list */
522     phTable[i] = ap;
523     return 0;
524 }
525
526 /*!
527  * \brief Create a new slot for a particular dbase page.
528  */
529 static struct buffer *
530 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
531 {
532     /* Find a usable buffer slot */
533     afs_int32 i;
534     struct buffer *pp, *tp;
535
536     pp = 0;                     /* last pure */
537     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
538         if (!tp->lockers && !tp->dirty) {
539             pp = tp;
540             break;
541         }
542     }
543
544     if (pp == 0) {
545         /* There are no unlocked buffers that don't need to be written to the disk. */
546         ubik_print
547             ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
548         return NULL;
549     }
550
551     /* Now fill in the header. */
552     pp->dbase = adbase;
553     pp->file = afid;
554     pp->page = apage;
555
556     FixupBucket(pp);            /* move to the right hash bucket */
557     Dmru(pp);
558     return pp;
559 }
560
561 /*!
562  * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
563  */
564 static void
565 DRelease(char *ap, int flag)
566 {
567     int index;
568     struct buffer *bp;
569
570     if (!ap)
571         return;
572     index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
573     bp = &(Buffers[index]);
574     bp->lockers--;
575     if (flag)
576         bp->dirty = 1;
577     return;
578 }
579
580 /*!
581  * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
582  * by DSync()).
583  *
584  * \note Note interaction with DSync(): you call this thing first,
585  * writing the buffers to the disk.  Then you call DSync() to sync all the
586  * files that were written, and to clear the dirty bits.  You should
587  * always call DFlush/DSync as a pair.
588  */
589 static int
590 DFlush(struct ubik_trans *atrans)
591 {
592     int i;
593     afs_int32 code;
594     struct buffer *tb;
595     struct ubik_dbase *adbase = atrans->dbase;
596
597     tb = Buffers;
598     for (i = 0; i < nbuffers; i++, tb++) {
599         if (tb->dirty) {
600             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
601             code =
602                 (*adbase->write) (adbase, tb->file, tb->data, code,
603                                   UBIK_PAGESIZE);
604             if (code != UBIK_PAGESIZE)
605                 return UIOERROR;
606         }
607     }
608     return 0;
609 }
610
611 /*!
612  * \brief Flush all modified buffers.
613  */
614 static int
615 DAbort(struct ubik_trans *atrans)
616 {
617     int i;
618     struct buffer *tb;
619
620     tb = Buffers;
621     for (i = 0; i < nbuffers; i++, tb++) {
622         if (tb->dirty) {
623             tb->dirty = 0;
624             tb->file = BADFID;
625             Dlru(tb);
626         }
627     }
628     return 0;
629 }
630
631 /**
632  * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
633  * can appear if a read transaction reads a page that is dirty, then that
634  * dirty page is synced. The read transaction will skip over the dirty page,
635  * and create a new buffer, and when the dirty page is synced, it will be
636  * identical (except for contents) to the read-transaction buffer.
637  */
638 static void
639 DedupBuffer(struct buffer *abuf)
640 {
641     struct buffer *tb;
642     for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
643         if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
644             && tb->dbase == abuf->dbase) {
645
646             tb->file = BADFID;
647             Dlru(tb);
648         }
649     }
650 }
651
652 /*!
653  * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
654  */
655 static int
656 DSync(struct ubik_trans *atrans)
657 {
658     int i;
659     afs_int32 code;
660     struct buffer *tb;
661     afs_int32 file;
662     afs_int32 rCode;
663     struct ubik_dbase *adbase = atrans->dbase;
664
665     rCode = 0;
666     while (1) {
667         file = BADFID;
668         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
669             if (tb->dirty == 1) {
670                 if (file == BADFID)
671                     file = tb->file;
672                 if (file != BADFID && tb->file == file) {
673                     tb->dirty = 0;
674                     DedupBuffer(tb);
675                 }
676             }
677         }
678         if (file == BADFID)
679             break;
680         /* otherwise we have a file to sync */
681         code = (*adbase->sync) (adbase, file);
682         if (code)
683             rCode = code;
684     }
685     return rCode;
686 }
687
688 /*!
689  * \brief Same as DRead(), only do not even try to read the page.
690  */
691 static char *
692 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
693 {
694     struct buffer *tb;
695     struct ubik_dbase *dbase = atrans->dbase;
696
697     if ((tb = newslot(dbase, fid, page)) == 0)
698         return NULL;
699     tb->lockers++;
700     memset(tb->data, 0, UBIK_PAGESIZE);
701     return tb->data;
702 }
703
704 /*!
705  * \brief Read data from database.
706  */
707 int
708 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
709            afs_int32 apos, afs_int32 alen)
710 {
711     char *bp;
712     afs_int32 offset, len, totalLen;
713
714     if (atrans->flags & TRDONE)
715         return UDONE;
716     totalLen = 0;
717     while (alen > 0) {
718         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
719         if (!bp)
720             return UEOF;
721         /* otherwise, min of remaining bytes and end of buffer to user mode */
722         offset = apos & (UBIK_PAGESIZE - 1);
723         len = UBIK_PAGESIZE - offset;
724         if (len > alen)
725             len = alen;
726         memcpy(abuffer, bp + offset, len);
727         abuffer = (char *)abuffer + len;
728         apos += len;
729         alen -= len;
730         totalLen += len;
731         DRelease(bp, 0);
732     }
733     return 0;
734 }
735
736 /*!
737  * \brief Truncate file.
738  */
739 int
740 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
741 {
742     afs_int32 code;
743     struct ubik_trunc *tt;
744
745     if (atrans->flags & TRDONE)
746         return UDONE;
747     if (atrans->type != UBIK_WRITETRANS)
748         return UBADTYPE;
749
750     /* write a truncate log record */
751     code = udisk_LogTruncate(atrans->dbase, afile, alength);
752
753     /* don't truncate until commit time */
754     tt = FindTrunc(atrans, afile);
755     if (!tt) {
756         /* this file not truncated yet */
757         tt = GetTrunc();
758         tt->next = atrans->activeTruncs;
759         atrans->activeTruncs = tt;
760         tt->file = afile;
761         tt->length = alength;
762     } else {
763         /* already truncated to a certain length */
764         if (tt->length > alength)
765             tt->length = alength;
766     }
767     return code;
768 }
769
770 /*!
771  * \brief Write data to database, using logs.
772  */
773 int
774 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
775             afs_int32 apos, afs_int32 alen)
776 {
777     char *bp;
778     afs_int32 offset, len, totalLen;
779     struct ubik_trunc *tt;
780     afs_int32 code;
781
782     if (atrans->flags & TRDONE)
783         return UDONE;
784     if (atrans->type != UBIK_WRITETRANS)
785         return UBADTYPE;
786
787     /* first write the data to the log */
788     code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
789     if (code)
790         return code;
791
792     /* expand any truncations of this file */
793     tt = FindTrunc(atrans, afile);
794     if (tt) {
795         if (tt->length < apos + alen) {
796             tt->length = apos + alen;
797         }
798     }
799
800     /* now update vm */
801     totalLen = 0;
802     while (alen > 0) {
803         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
804         if (!bp) {
805             bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
806             if (!bp)
807                 return UIOERROR;
808             memset(bp, 0, UBIK_PAGESIZE);
809         }
810         /* otherwise, min of remaining bytes and end of buffer to user mode */
811         offset = apos & (UBIK_PAGESIZE - 1);
812         len = UBIK_PAGESIZE - offset;
813         if (len > alen)
814             len = alen;
815         memcpy(bp + offset, abuffer, len);
816         abuffer = (char *)abuffer + len;
817         apos += len;
818         alen -= len;
819         totalLen += len;
820         DRelease(bp, 1);        /* buffer modified */
821     }
822     return 0;
823 }
824
825 /*!
826  * \brief Begin a new local transaction.
827  */
828 int
829 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
830 {
831     afs_int32 code;
832     struct ubik_trans *tt;
833
834     *atrans = (struct ubik_trans *)NULL;
835     if (atype == UBIK_WRITETRANS) {
836         if (adbase->flags & DBWRITING)
837             return USYNC;
838         code = udisk_LogOpcode(adbase, LOGNEW, 0);
839         if (code)
840             return code;
841     }
842     tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
843     memset(tt, 0, sizeof(struct ubik_trans));
844     tt->dbase = adbase;
845     tt->next = adbase->activeTrans;
846     adbase->activeTrans = tt;
847     tt->type = atype;
848     if (atype == UBIK_READTRANS)
849         adbase->readers++;
850     else if (atype == UBIK_WRITETRANS) {
851         UBIK_VERSION_LOCK;
852         adbase->flags |= DBWRITING;
853         UBIK_VERSION_UNLOCK;
854     }
855     *atrans = tt;
856     return 0;
857 }
858
859 /*!
860  * \brief Commit transaction.
861  */
862 int
863 udisk_commit(struct ubik_trans *atrans)
864 {
865     struct ubik_dbase *dbase;
866     afs_int32 code = 0;
867     struct ubik_version oldversion, newversion;
868
869     if (atrans->flags & TRDONE)
870         return (UTWOENDS);
871
872     if (atrans->type == UBIK_WRITETRANS) {
873         dbase = atrans->dbase;
874
875         /* On the first write to the database. We update the versions */
876         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
877             UBIK_VERSION_LOCK;
878             oldversion = dbase->version;
879             newversion.epoch = FT_ApproxTime();;
880             newversion.counter = 1;
881
882             code = (*dbase->setlabel) (dbase, 0, &newversion);
883             if (code)
884                 return (code);
885             version_globals.ubik_epochTime = newversion.epoch;
886             dbase->version = newversion;
887             UBIK_VERSION_UNLOCK;
888
889             urecovery_state |= UBIK_RECLABELDB;
890
891             /* Ignore the error here. If the call fails, the site is
892              * marked down and when we detect it is up again, we will
893              * send the entire database to it.
894              */
895             ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
896                                            &oldversion, &newversion);
897         }
898
899         UBIK_VERSION_LOCK;
900         dbase->version.counter++;       /* bump commit count */
901 #ifdef AFS_PTHREAD_ENV
902         CV_BROADCAST(&dbase->version_cond);
903 #else
904         LWP_NoYieldSignal(&dbase->version);
905 #endif
906         code = udisk_LogEnd(dbase, &dbase->version);
907         if (code) {
908             dbase->version.counter--;
909             return (code);
910         }
911         UBIK_VERSION_UNLOCK;
912
913         /* If we fail anytime after this, then panic and let the
914          * recovery replay the log.
915          */
916         code = DFlush(atrans);  /* write dirty pages to respective files */
917         if (code)
918             panic("Writing Ubik DB modifications\n");
919         code = DSync(atrans);   /* sync the files and mark pages not dirty */
920         if (code)
921             panic("Synchronizing Ubik DB modifications\n");
922
923         code = DoTruncs(atrans);        /* Perform requested truncations */
924         if (code)
925             panic("Truncating Ubik DB\n");
926
927         /* label the committed dbase */
928         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
929         if (code)
930             panic("Truncating Ubik DB\n");
931
932         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
933         if (code)
934             panic("Truncating Ubik logfile\n");
935
936     }
937
938     /* When the transaction is marked done, it also means the logfile
939      * has been truncated.
940      */
941     atrans->flags |= TRDONE;
942     return code;
943 }
944
945 /*!
946  * \brief Abort transaction.
947  */
948 int
949 udisk_abort(struct ubik_trans *atrans)
950 {
951     struct ubik_dbase *dbase;
952     afs_int32 code;
953
954     if (atrans->flags & TRDONE)
955         return UTWOENDS;
956
957     /* Check if we are the write trans before logging abort, lest we
958      * abort a good write trans in progress.
959      * We don't really care if the LOGABORT gets to the log because we
960      * truncate the log next. If the truncate fails, we panic; for
961      * otherwise, the log entries remain. On restart, replay of the log
962      * will do nothing because the abort is there or no LogEnd opcode.
963      */
964     dbase = atrans->dbase;
965     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
966         udisk_LogOpcode(dbase, LOGABORT, 1);
967         code = (*dbase->truncate) (dbase, LOGFILE, 0);
968         if (code)
969             panic("Truncating Ubik logfile during an abort\n");
970         DAbort(atrans);         /* remove all dirty pages */
971     }
972
973     /* When the transaction is marked done, it also means the logfile
974      * has been truncated.
975      */
976     atrans->flags |= (TRABORT | TRDONE);
977     return 0;
978 }
979
980 /*!
981  * \brief Destroy a transaction after it has been committed or aborted.
982  *
983  * If it hasn't committed before you call this routine, we'll abort the
984  * transaction for you.
985  */
986 int
987 udisk_end(struct ubik_trans *atrans)
988 {
989     struct ubik_dbase *dbase;
990
991     if (!(atrans->flags & TRDONE))
992         udisk_abort(atrans);
993     dbase = atrans->dbase;
994
995     ulock_relLock(atrans);
996     unthread(atrans);
997
998     /* check if we are the write trans before unsetting the DBWRITING bit, else
999      * we could be unsetting someone else's bit.
1000      */
1001     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
1002         UBIK_VERSION_LOCK;
1003         dbase->flags &= ~DBWRITING;
1004         UBIK_VERSION_UNLOCK;
1005     } else {
1006         dbase->readers--;
1007     }
1008     if (atrans->iovec_info.iovec_wrt_val)
1009         free(atrans->iovec_info.iovec_wrt_val);
1010     if (atrans->iovec_data.iovec_buf_val)
1011         free(atrans->iovec_data.iovec_buf_val);
1012     free(atrans);
1013
1014     /* Wakeup any writers waiting in BeginTrans() */
1015 #ifdef AFS_PTHREAD_ENV
1016     CV_BROADCAST(&dbase->flags_cond);
1017 #else
1018     LWP_NoYieldSignal(&dbase->flags);
1019 #endif
1020     return 0;
1021 }