58fa8270cc7d6c17c5d91f923f87e114e0c2dc09
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <sys/types.h>
16 #include <string.h>
17 #include <stdarg.h>
18 #include <errno.h>
19
20 #ifdef AFS_NT40_ENV
21 #include <winsock2.h>
22 #else
23 #include <sys/file.h>
24 #include <netinet/in.h>
25 #endif
26
27 #include <lock.h>
28 #include <rx/xdr.h>
29
30
31
32 #define UBIK_INTERNALS
33 #include "ubik.h"
34 #include "ubik_int.h"
35
36 #define PHSIZE  128
37 static struct buffer {
38     struct ubik_dbase *dbase;   /*!< dbase within which the buffer resides */
39     afs_int32 file;             /*!< Unique cache key */
40     afs_int32 page;             /*!< page number */
41     struct buffer *lru_next;
42     struct buffer *lru_prev;
43     struct buffer *hashNext;    /*!< next dude in hash table */
44     char *data;                 /*!< ptr to the data */
45     char lockers;               /*!< usage ref count */
46     char dirty;                 /*!< is buffer modified */
47     char hashIndex;             /*!< back ptr to hash table */
48 } *Buffers;
49
50 #define pHash(page) ((page) & (PHSIZE-1))
51
52 afs_int32 ubik_nBuffers = NBUFFERS;
53 static struct buffer *phTable[PHSIZE];  /*!< page hash table */
54 static struct buffer *LruBuffer;
55 static int nbuffers;
56 static int calls = 0, ios = 0, lastb = 0;
57 static char *BufferData;
58 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
59                               afs_int32 apage);
60 static int initd = 0;
61 #define BADFID      0xffffffff
62
63 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
64
65 static struct ubik_trunc *freeTruncList = 0;
66
67 /*!
68  * \brief Remove a transaction from the database's active transaction list.  Don't free it.
69  */
70 static int
71 unthread(struct ubik_trans *atrans)
72 {
73     struct ubik_trans **lt, *tt;
74     lt = &atrans->dbase->activeTrans;
75     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
76         if (tt == atrans) {
77             /* found it */
78             *lt = tt->next;
79             return 0;
80         }
81     }
82     return 2;                   /* no entry */
83 }
84
85 /*!
86  * \brief some debugging assistance
87  */
88 void
89 udisk_Debug(struct ubik_debug *aparm)
90 {
91     struct buffer *tb;
92     int i;
93
94     memcpy(&aparm->localVersion, &ubik_dbase->version,
95            sizeof(struct ubik_version));
96     aparm->lockedPages = 0;
97     aparm->writeLockedPages = 0;
98     tb = Buffers;
99     for (i = 0; i < nbuffers; i++, tb++) {
100         if (tb->lockers) {
101             aparm->lockedPages++;
102             if (tb->dirty)
103                 aparm->writeLockedPages++;
104         }
105     }
106 }
107
108 /*!
109  * \brief Write an opcode to the log.
110  *
111  * log format is defined here, and implicitly in recovery.c
112  *
113  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
114  * are in logged in network standard byte order, in case we want to move logs
115  * from machine-to-machine someday.
116  *
117  * Begin transaction: opcode \n
118  * Commit transaction: opcode, version (8 bytes) \n
119  * Truncate file: opcode, file number, length \n
120  * Abort transaction: opcode \n
121  * Write data: opcode, file, position, length, <length> data bytes \n
122  */
123 int
124 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
125 {
126     struct ubik_stat ustat;
127     afs_int32 code;
128
129     /* figure out where to write */
130     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
131     if (code < 0)
132         return code;
133
134     /* setup data and do write */
135     aopcode = htonl(aopcode);
136     code =
137         (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
138                           sizeof(afs_int32));
139     if (code != sizeof(afs_int32))
140         return UIOERROR;
141
142     /* optionally sync data */
143     if (async)
144         code = (*adbase->sync) (adbase, LOGFILE);
145     else
146         code = 0;
147     return code;
148 }
149
150 /*!
151  * \brief Log a commit, never syncing.
152  */
153 int
154 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
155 {
156     afs_int32 code;
157     afs_int32 data[3];
158     struct ubik_stat ustat;
159
160     /* figure out where to write */
161     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
162     if (code)
163         return code;
164
165     /* setup data */
166     data[0] = htonl(LOGEND);
167     data[1] = htonl(aversion->epoch);
168     data[2] = htonl(aversion->counter);
169
170     /* do write */
171     code =
172         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
173                           3 * sizeof(afs_int32));
174     if (code != 3 * sizeof(afs_int32))
175         return UIOERROR;
176
177     /* finally sync the log */
178     code = (*adbase->sync) (adbase, LOGFILE);
179     return code;
180 }
181
182 /*!
183  * \brief Log a truncate operation, never syncing.
184  */
185 int
186 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
187                   afs_int32 alength)
188 {
189     afs_int32 code;
190     afs_int32 data[3];
191     struct ubik_stat ustat;
192
193     /* figure out where to write */
194     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
195     if (code < 0)
196         return code;
197
198     /* setup data */
199     data[0] = htonl(LOGTRUNCATE);
200     data[1] = htonl(afile);
201     data[2] = htonl(alength);
202
203     /* do write */
204     code =
205         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
206                           3 * sizeof(afs_int32));
207     if (code != 3 * sizeof(afs_int32))
208         return UIOERROR;
209     return 0;
210 }
211
212 /*!
213  * \brief Write some data to the log, never syncing.
214  */
215 int
216 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
217                    afs_int32 apos, afs_int32 alen)
218 {
219     struct ubik_stat ustat;
220     afs_int32 code;
221     afs_int32 data[4];
222     afs_int32 lpos;
223
224     /* find end of log */
225     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
226     lpos = ustat.size;
227     if (code < 0)
228         return code;
229
230     /* setup header */
231     data[0] = htonl(LOGDATA);
232     data[1] = htonl(afile);
233     data[2] = htonl(apos);
234     data[3] = htonl(alen);
235
236     /* write header */
237     code =
238         (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
239     if (code != 4 * sizeof(afs_int32))
240         return UIOERROR;
241     lpos += 4 * sizeof(afs_int32);
242
243     /* write data */
244     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
245     if (code != alen)
246         return UIOERROR;
247     return 0;
248 }
249
250 static int
251 DInit(int abuffers)
252 {
253     /* Initialize the venus buffer system. */
254     int i;
255     struct buffer *tb;
256     Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
257     memset(Buffers, 0, abuffers * sizeof(struct buffer));
258     BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
259     nbuffers = abuffers;
260     for (i = 0; i < PHSIZE; i++)
261         phTable[i] = 0;
262     for (i = 0; i < abuffers; i++) {
263         /* Fill in each buffer with an empty indication. */
264         tb = &Buffers[i];
265         tb->lru_next = &(Buffers[i + 1]);
266         tb->lru_prev = &(Buffers[i - 1]);
267         tb->data = &BufferData[UBIK_PAGESIZE * i];
268         tb->file = BADFID;
269     }
270     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
271     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
272     LruBuffer = &(Buffers[0]);
273     return 0;
274 }
275
276 /*!
277  * \brief Take a buffer and mark it as the least recently used buffer.
278  */
279 static void
280 Dlru(struct buffer *abuf)
281 {
282     if (LruBuffer == abuf)
283         return;
284
285     /* Unthread from where it is in the list */
286     abuf->lru_next->lru_prev = abuf->lru_prev;
287     abuf->lru_prev->lru_next = abuf->lru_next;
288
289     /* Thread onto beginning of LRU list */
290     abuf->lru_next = LruBuffer;
291     abuf->lru_prev = LruBuffer->lru_prev;
292
293     LruBuffer->lru_prev->lru_next = abuf;
294     LruBuffer->lru_prev = abuf;
295     LruBuffer = abuf;
296 }
297
298 /*!
299  * \brief Take a buffer and mark it as the most recently used buffer.
300  */
301 static void
302 Dmru(struct buffer *abuf)
303 {
304     if (LruBuffer == abuf) {
305         LruBuffer = LruBuffer->lru_next;
306         return;
307     }
308
309     /* Unthread from where it is in the list */
310     abuf->lru_next->lru_prev = abuf->lru_prev;
311     abuf->lru_prev->lru_next = abuf->lru_next;
312
313     /* Thread onto end of LRU list - making it the MRU buffer */
314     abuf->lru_next = LruBuffer;
315     abuf->lru_prev = LruBuffer->lru_prev;
316     LruBuffer->lru_prev->lru_next = abuf;
317     LruBuffer->lru_prev = abuf;
318 }
319
320 static_inline int
321 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
322             struct ubik_trans *atrans)
323 {
324     if (buf->page != page) {
325         return 0;
326     }
327     if (buf->file != fid) {
328         return 0;
329     }
330     if (atrans->type == UBIK_READTRANS && buf->dirty) {
331         /* if 'buf' is dirty, it has uncommitted changes; we do not want to
332          * see uncommitted changes if we are a read transaction, so skip over
333          * it. */
334         return 0;
335     }
336     if (buf->dbase != atrans->dbase) {
337         return 0;
338     }
339     return 1;
340 }
341
342 /*!
343  * \brief Get a pointer to a particular buffer.
344  */
345 static char *
346 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
347 {
348     /* Read a page from the disk. */
349     struct buffer *tb, *lastbuffer;
350     afs_int32 code;
351     struct ubik_dbase *dbase = atrans->dbase;
352
353     calls++;
354     lastbuffer = LruBuffer->lru_prev;
355
356     if (MatchBuffer(lastbuffer, page, fid, atrans)) {
357         tb = lastbuffer;
358         tb->lockers++;
359         lastb++;
360         return tb->data;
361     }
362     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
363         if (MatchBuffer(tb, page, fid, atrans)) {
364             Dmru(tb);
365             tb->lockers++;
366             return tb->data;
367         }
368     }
369     /* can't find it */
370     tb = newslot(dbase, fid, page);
371     if (!tb)
372         return 0;
373     memset(tb->data, 0, UBIK_PAGESIZE);
374
375     tb->lockers++;
376     code =
377         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
378                         UBIK_PAGESIZE);
379     if (code < 0) {
380         tb->file = BADFID;
381         Dlru(tb);
382         tb->lockers--;
383         ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
384         return 0;
385     }
386     ios++;
387
388     /* Note that findslot sets the page field in the buffer equal to
389      * what it is searching for.
390      */
391     return tb->data;
392 }
393
394 /*!
395  * \brief Zap truncated pages.
396  */
397 static int
398 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
399 {
400     afs_int32 maxPage;
401     struct buffer *tb;
402     int i;
403     struct ubik_dbase *dbase = atrans->dbase;
404
405     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
406     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
407         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
408             tb->file = BADFID;
409             Dlru(tb);
410         }
411     }
412     return 0;
413 }
414
415 /*!
416  * \brief Allocate a truncation entry.
417  *
418  * We allocate special entries representing truncations, rather than
419  * performing them immediately, so that we can abort a transaction easily by simply purging
420  * the in-core memory buffers and discarding these truncation entries.
421  */
422 static struct ubik_trunc *
423 GetTrunc(void)
424 {
425     struct ubik_trunc *tt;
426     if (!freeTruncList) {
427         freeTruncList =
428             (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
429         freeTruncList->next = (struct ubik_trunc *)0;
430     }
431     tt = freeTruncList;
432     freeTruncList = tt->next;
433     return tt;
434 }
435
436 /*!
437  * \brief Free a truncation entry.
438  */
439 static int
440 PutTrunc(struct ubik_trunc *at)
441 {
442     at->next = freeTruncList;
443     freeTruncList = at;
444     return 0;
445 }
446
447 /*!
448  * \brief Find a truncation entry for a file, if any.
449  */
450 static struct ubik_trunc *
451 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
452 {
453     struct ubik_trunc *tt;
454     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
455         if (tt->file == afile)
456             return tt;
457     }
458     return (struct ubik_trunc *)0;
459 }
460
461 /*!
462  * \brief Do truncates associated with \p atrans, and free them.
463  */
464 static int
465 DoTruncs(struct ubik_trans *atrans)
466 {
467     struct ubik_trunc *tt, *nt;
468     int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
469     afs_int32 rcode = 0, code;
470
471     tproc = atrans->dbase->truncate;
472     for (tt = atrans->activeTruncs; tt; tt = nt) {
473         nt = tt->next;
474         DTrunc(atrans, tt->file, tt->length);   /* zap pages from buffer cache */
475         code = (*tproc) (atrans->dbase, tt->file, tt->length);
476         if (code)
477             rcode = code;
478         PutTrunc(tt);
479     }
480     /* don't unthread, because we do the entire list's worth here */
481     atrans->activeTruncs = (struct ubik_trunc *)0;
482     return (rcode);
483 }
484
485 /*!
486  * \brief Mark an \p fid as invalid.
487  */
488 int
489 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
490 {
491     struct buffer *tb;
492     int i;
493
494     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
495         if (tb->file == afid) {
496             tb->file = BADFID;
497             Dlru(tb);
498         }
499     }
500     return 0;
501 }
502
503 /*!
504  * \brief Move this page into the correct hash bucket.
505  */
506 static int
507 FixupBucket(struct buffer *ap)
508 {
509     struct buffer **lp, *tp;
510     int i;
511     /* first try to get it out of its current hash bucket, in which it might not be */
512     i = ap->hashIndex;
513     lp = &phTable[i];
514     for (tp = *lp; tp; tp = tp->hashNext) {
515         if (tp == ap) {
516             *lp = tp->hashNext;
517             break;
518         }
519         lp = &tp->hashNext;
520     }
521     /* now figure the new hash bucket */
522     i = pHash(ap->page);
523     ap->hashIndex = i;          /* remember where we are for deletion */
524     ap->hashNext = phTable[i];  /* add us to the list */
525     phTable[i] = ap;
526     return 0;
527 }
528
529 /*!
530  * \brief Create a new slot for a particular dbase page.
531  */
532 static struct buffer *
533 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
534 {
535     /* Find a usable buffer slot */
536     afs_int32 i;
537     struct buffer *pp, *tp;
538
539     pp = 0;                     /* last pure */
540     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
541         if (!tp->lockers && !tp->dirty) {
542             pp = tp;
543             break;
544         }
545     }
546
547     if (pp == 0) {
548         /* There are no unlocked buffers that don't need to be written to the disk. */
549         ubik_print
550             ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
551         return NULL;
552     }
553
554     /* Now fill in the header. */
555     pp->dbase = adbase;
556     pp->file = afid;
557     pp->page = apage;
558
559     FixupBucket(pp);            /* move to the right hash bucket */
560     Dmru(pp);
561     return pp;
562 }
563
564 /*!
565  * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
566  */
567 static void
568 DRelease(char *ap, int flag)
569 {
570     int index;
571     struct buffer *bp;
572
573     if (!ap)
574         return;
575     index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
576     bp = &(Buffers[index]);
577     bp->lockers--;
578     if (flag)
579         bp->dirty = 1;
580     return;
581 }
582
583 /*!
584  * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
585  * by DSync()).
586  *
587  * \note Note interaction with DSync(): you call this thing first,
588  * writing the buffers to the disk.  Then you call DSync() to sync all the
589  * files that were written, and to clear the dirty bits.  You should
590  * always call DFlush/DSync as a pair.
591  */
592 static int
593 DFlush(struct ubik_trans *atrans)
594 {
595     int i;
596     afs_int32 code;
597     struct buffer *tb;
598     struct ubik_dbase *adbase = atrans->dbase;
599
600     tb = Buffers;
601     for (i = 0; i < nbuffers; i++, tb++) {
602         if (tb->dirty) {
603             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
604             code =
605                 (*adbase->write) (adbase, tb->file, tb->data, code,
606                                   UBIK_PAGESIZE);
607             if (code != UBIK_PAGESIZE)
608                 return UIOERROR;
609         }
610     }
611     return 0;
612 }
613
614 /*!
615  * \brief Flush all modified buffers.
616  */
617 static int
618 DAbort(struct ubik_trans *atrans)
619 {
620     int i;
621     struct buffer *tb;
622
623     tb = Buffers;
624     for (i = 0; i < nbuffers; i++, tb++) {
625         if (tb->dirty) {
626             tb->dirty = 0;
627             tb->file = BADFID;
628             Dlru(tb);
629         }
630     }
631     return 0;
632 }
633
634 /**
635  * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
636  * can appear if a read transaction reads a page that is dirty, then that
637  * dirty page is synced. The read transaction will skip over the dirty page,
638  * and create a new buffer, and when the dirty page is synced, it will be
639  * identical (except for contents) to the read-transaction buffer.
640  */
641 static void
642 DedupBuffer(struct buffer *abuf)
643 {
644     struct buffer *tb;
645     for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
646         if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
647             && tb->dbase == abuf->dbase) {
648
649             tb->file = BADFID;
650             Dlru(tb);
651         }
652     }
653 }
654
655 /*!
656  * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
657  */
658 static int
659 DSync(struct ubik_trans *atrans)
660 {
661     int i;
662     afs_int32 code;
663     struct buffer *tb;
664     afs_int32 file;
665     afs_int32 rCode;
666     struct ubik_dbase *adbase = atrans->dbase;
667
668     rCode = 0;
669     while (1) {
670         file = BADFID;
671         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
672             if (tb->dirty == 1) {
673                 if (file == BADFID)
674                     file = tb->file;
675                 if (file != BADFID && tb->file == file) {
676                     tb->dirty = 0;
677                     DedupBuffer(tb);
678                 }
679             }
680         }
681         if (file == BADFID)
682             break;
683         /* otherwise we have a file to sync */
684         code = (*adbase->sync) (adbase, file);
685         if (code)
686             rCode = code;
687     }
688     return rCode;
689 }
690
691 /*!
692  * \brief Same as DRead(), only do not even try to read the page.
693  */
694 static char *
695 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
696 {
697     struct buffer *tb;
698     struct ubik_dbase *dbase = atrans->dbase;
699
700     if ((tb = newslot(dbase, fid, page)) == 0)
701         return NULL;
702     tb->lockers++;
703     memset(tb->data, 0, UBIK_PAGESIZE);
704     return tb->data;
705 }
706
707 /*!
708  * \brief Read data from database.
709  */
710 int
711 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
712            afs_int32 apos, afs_int32 alen)
713 {
714     char *bp;
715     afs_int32 offset, len, totalLen;
716
717     if (atrans->flags & TRDONE)
718         return UDONE;
719     totalLen = 0;
720     while (alen > 0) {
721         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
722         if (!bp)
723             return UEOF;
724         /* otherwise, min of remaining bytes and end of buffer to user mode */
725         offset = apos & (UBIK_PAGESIZE - 1);
726         len = UBIK_PAGESIZE - offset;
727         if (len > alen)
728             len = alen;
729         memcpy(abuffer, bp + offset, len);
730         abuffer = (char *)abuffer + len;
731         apos += len;
732         alen -= len;
733         totalLen += len;
734         DRelease(bp, 0);
735     }
736     return 0;
737 }
738
739 /*!
740  * \brief Truncate file.
741  */
742 int
743 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
744 {
745     afs_int32 code;
746     struct ubik_trunc *tt;
747
748     if (atrans->flags & TRDONE)
749         return UDONE;
750     if (atrans->type != UBIK_WRITETRANS)
751         return UBADTYPE;
752
753     /* write a truncate log record */
754     code = udisk_LogTruncate(atrans->dbase, afile, alength);
755
756     /* don't truncate until commit time */
757     tt = FindTrunc(atrans, afile);
758     if (!tt) {
759         /* this file not truncated yet */
760         tt = GetTrunc();
761         tt->next = atrans->activeTruncs;
762         atrans->activeTruncs = tt;
763         tt->file = afile;
764         tt->length = alength;
765     } else {
766         /* already truncated to a certain length */
767         if (tt->length > alength)
768             tt->length = alength;
769     }
770     return code;
771 }
772
773 /*!
774  * \brief Write data to database, using logs.
775  */
776 int
777 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
778             afs_int32 apos, afs_int32 alen)
779 {
780     char *bp;
781     afs_int32 offset, len, totalLen;
782     struct ubik_trunc *tt;
783     afs_int32 code;
784
785     if (atrans->flags & TRDONE)
786         return UDONE;
787     if (atrans->type != UBIK_WRITETRANS)
788         return UBADTYPE;
789
790     /* first write the data to the log */
791     code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
792     if (code)
793         return code;
794
795     /* expand any truncations of this file */
796     tt = FindTrunc(atrans, afile);
797     if (tt) {
798         if (tt->length < apos + alen) {
799             tt->length = apos + alen;
800         }
801     }
802
803     /* now update vm */
804     totalLen = 0;
805     while (alen > 0) {
806         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
807         if (!bp) {
808             bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
809             if (!bp)
810                 return UIOERROR;
811             memset(bp, 0, UBIK_PAGESIZE);
812         }
813         /* otherwise, min of remaining bytes and end of buffer to user mode */
814         offset = apos & (UBIK_PAGESIZE - 1);
815         len = UBIK_PAGESIZE - offset;
816         if (len > alen)
817             len = alen;
818         memcpy(bp + offset, abuffer, len);
819         abuffer = (char *)abuffer + len;
820         apos += len;
821         alen -= len;
822         totalLen += len;
823         DRelease(bp, 1);        /* buffer modified */
824     }
825     return 0;
826 }
827
828 /*!
829  * \brief Begin a new local transaction.
830  */
831 int
832 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
833 {
834     afs_int32 code;
835     struct ubik_trans *tt;
836
837     *atrans = (struct ubik_trans *)NULL;
838     /* Make sure system is initialized before doing anything */
839     if (!initd) {
840         initd = 1;
841         DInit(ubik_nBuffers);
842     }
843     if (atype == UBIK_WRITETRANS) {
844         if (adbase->flags & DBWRITING)
845             return USYNC;
846         code = udisk_LogOpcode(adbase, LOGNEW, 0);
847         if (code)
848             return code;
849     }
850     tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
851     memset(tt, 0, sizeof(struct ubik_trans));
852     tt->dbase = adbase;
853     tt->next = adbase->activeTrans;
854     adbase->activeTrans = tt;
855     tt->type = atype;
856     if (atype == UBIK_READTRANS)
857         adbase->readers++;
858     else if (atype == UBIK_WRITETRANS)
859         adbase->flags |= DBWRITING;
860     *atrans = tt;
861     return 0;
862 }
863
864 /*!
865  * \brief Commit transaction.
866  */
867 int
868 udisk_commit(struct ubik_trans *atrans)
869 {
870     struct ubik_dbase *dbase;
871     afs_int32 code = 0;
872     struct ubik_version oldversion, newversion;
873
874     if (atrans->flags & TRDONE)
875         return (UTWOENDS);
876
877     if (atrans->type == UBIK_WRITETRANS) {
878         dbase = atrans->dbase;
879
880         /* On the first write to the database. We update the versions */
881         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
882             oldversion = dbase->version;
883             newversion.epoch = FT_ApproxTime();;
884             newversion.counter = 1;
885
886             code = (*dbase->setlabel) (dbase, 0, &newversion);
887             if (code)
888                 return (code);
889             ubik_epochTime = newversion.epoch;
890             dbase->version = newversion;
891
892             /* Ignore the error here. If the call fails, the site is
893              * marked down and when we detect it is up again, we will
894              * send the entire database to it.
895              */
896             ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
897                                            &oldversion, &newversion);
898             urecovery_state |= UBIK_RECLABELDB;
899         }
900
901         dbase->version.counter++;       /* bump commit count */
902 #ifdef AFS_PTHREAD_ENV
903         CV_BROADCAST(&dbase->version_cond);
904 #else
905         LWP_NoYieldSignal(&dbase->version);
906 #endif
907         code = udisk_LogEnd(dbase, &dbase->version);
908         if (code) {
909             dbase->version.counter--;
910             return (code);
911         }
912
913         /* If we fail anytime after this, then panic and let the
914          * recovery replay the log.
915          */
916         code = DFlush(atrans);  /* write dirty pages to respective files */
917         if (code)
918             panic("Writing Ubik DB modifications\n");
919         code = DSync(atrans);   /* sync the files and mark pages not dirty */
920         if (code)
921             panic("Synchronizing Ubik DB modifications\n");
922
923         code = DoTruncs(atrans);        /* Perform requested truncations */
924         if (code)
925             panic("Truncating Ubik DB\n");
926
927         /* label the committed dbase */
928         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
929         if (code)
930             panic("Truncating Ubik DB\n");
931
932         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
933         if (code)
934             panic("Truncating Ubik logfile\n");
935
936     }
937
938     /* When the transaction is marked done, it also means the logfile
939      * has been truncated.
940      */
941     atrans->flags |= TRDONE;
942     return code;
943 }
944
945 /*!
946  * \brief Abort transaction.
947  */
948 int
949 udisk_abort(struct ubik_trans *atrans)
950 {
951     struct ubik_dbase *dbase;
952     afs_int32 code;
953
954     if (atrans->flags & TRDONE)
955         return UTWOENDS;
956
957     /* Check if we are the write trans before logging abort, lest we
958      * abort a good write trans in progress.
959      * We don't really care if the LOGABORT gets to the log because we
960      * truncate the log next. If the truncate fails, we panic; for
961      * otherwise, the log entries remain. On restart, replay of the log
962      * will do nothing because the abort is there or no LogEnd opcode.
963      */
964     dbase = atrans->dbase;
965     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
966         udisk_LogOpcode(dbase, LOGABORT, 1);
967         code = (*dbase->truncate) (dbase, LOGFILE, 0);
968         if (code)
969             panic("Truncating Ubik logfile during an abort\n");
970         DAbort(atrans);         /* remove all dirty pages */
971     }
972
973     /* When the transaction is marked done, it also means the logfile
974      * has been truncated.
975      */
976     atrans->flags |= (TRABORT | TRDONE);
977     return 0;
978 }
979
980 /*!
981  * \brief Destroy a transaction after it has been committed or aborted.
982  *
983  * If it hasn't committed before you call this routine, we'll abort the
984  * transaction for you.
985  */
986 int
987 udisk_end(struct ubik_trans *atrans)
988 {
989     struct ubik_dbase *dbase;
990
991 #if defined(UBIK_PAUSE)
992     /* Another thread is trying to lock this transaction.
993      * That can only be an RPC doing SDISK_Lock.
994      * Unlock the transaction, 'cause otherwise the other
995      * thread will never wake up.  Don't free it because
996      * the caller will do that already.
997      */
998     if (atrans->flags & TRSETLOCK) {
999         atrans->flags |= TRSTALE;
1000         ulock_relLock(atrans);
1001         return UINTERNAL;
1002     }
1003 #endif /* UBIK_PAUSE */
1004     if (!(atrans->flags & TRDONE))
1005         udisk_abort(atrans);
1006     dbase = atrans->dbase;
1007
1008     ulock_relLock(atrans);
1009     unthread(atrans);
1010
1011     /* check if we are the write trans before unsetting the DBWRITING bit, else
1012      * we could be unsetting someone else's bit.
1013      */
1014     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
1015         dbase->flags &= ~DBWRITING;
1016     } else {
1017         dbase->readers--;
1018     }
1019     if (atrans->iovec_info.iovec_wrt_val)
1020         free(atrans->iovec_info.iovec_wrt_val);
1021     if (atrans->iovec_data.iovec_buf_val)
1022         free(atrans->iovec_data.iovec_buf_val);
1023     free(atrans);
1024
1025     /* Wakeup any writers waiting in BeginTrans() */
1026 #ifdef AFS_PTHREAD_ENV
1027     CV_BROADCAST(&dbase->flags_cond);
1028 #else
1029     LWP_NoYieldSignal(&dbase->flags);
1030 #endif
1031     return 0;
1032 }