ubik: Buffer log writes with stdio
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14 #include <afs/opr.h>
15
16 #ifdef AFS_PTHREAD_ENV
17 # include <opr/lock.h>
18 #else
19 # include <opr/lockstub.h>
20 #endif
21 #include <afs/afsutil.h>
22
23 #define UBIK_INTERNALS
24 #include "ubik.h"
25 #include "ubik_int.h"
26
27 #define PHSIZE  128
28 static struct buffer {
29     struct ubik_dbase *dbase;   /*!< dbase within which the buffer resides */
30     afs_int32 file;             /*!< Unique cache key */
31     afs_int32 page;             /*!< page number */
32     struct buffer *lru_next;
33     struct buffer *lru_prev;
34     struct buffer *hashNext;    /*!< next dude in hash table */
35     char *data;                 /*!< ptr to the data */
36     char lockers;               /*!< usage ref count */
37     char dirty;                 /*!< is buffer modified */
38     char hashIndex;             /*!< back ptr to hash table */
39 } *Buffers;
40
41 #define pHash(page) ((page) & (PHSIZE-1))
42
43 afs_int32 ubik_nBuffers = NBUFFERS;
44 static struct buffer *phTable[PHSIZE];  /*!< page hash table */
45 static struct buffer *LruBuffer;
46 static int nbuffers;
47 static int calls = 0, ios = 0, lastb = 0;
48 static char *BufferData;
49 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
50                               afs_int32 apage);
51 #define BADFID      0xffffffff
52
53 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
54
55 static struct ubik_trunc *freeTruncList = 0;
56
57 /*!
58  * \brief Remove a transaction from the database's active transaction list.  Don't free it.
59  */
60 static int
61 unthread(struct ubik_trans *atrans)
62 {
63     struct ubik_trans **lt, *tt;
64     lt = &atrans->dbase->activeTrans;
65     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
66         if (tt == atrans) {
67             /* found it */
68             *lt = tt->next;
69             return 0;
70         }
71     }
72     return 2;                   /* no entry */
73 }
74
75 /*!
76  * \brief some debugging assistance
77  */
78 void
79 udisk_Debug(struct ubik_debug *aparm)
80 {
81     struct buffer *tb;
82     int i;
83
84     memcpy(&aparm->localVersion, &ubik_dbase->version,
85            sizeof(struct ubik_version));
86     aparm->lockedPages = 0;
87     aparm->writeLockedPages = 0;
88     tb = Buffers;
89     for (i = 0; i < nbuffers; i++, tb++) {
90         if (tb->lockers) {
91             aparm->lockedPages++;
92             if (tb->dirty)
93                 aparm->writeLockedPages++;
94         }
95     }
96 }
97
98 /*!
99  * \brief Write an opcode to the log.
100  *
101  * log format is defined here, and implicitly in recovery.c
102  *
103  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
104  * are in logged in network standard byte order, in case we want to move logs
105  * from machine-to-machine someday.
106  *
107  * Begin transaction: opcode \n
108  * Commit transaction: opcode, version (8 bytes) \n
109  * Truncate file: opcode, file number, length \n
110  * Abort transaction: opcode \n
111  * Write data: opcode, file, position, length, <length> data bytes \n
112  */
113 static int
114 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
115 {
116     afs_int32 code;
117
118     /* setup data and do write */
119     aopcode = htonl(aopcode);
120     code = (*adbase->buffered_append)(adbase, LOGFILE, &aopcode, sizeof(afs_int32));
121     if (code != sizeof(afs_int32))
122         return UIOERROR;
123
124     /* optionally sync data */
125     if (async)
126         code = (*adbase->sync) (adbase, LOGFILE);
127     else
128         code = 0;
129     return code;
130 }
131
132 /*!
133  * \brief Log a commit, never syncing.
134  */
135 static int
136 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
137 {
138     afs_int32 code;
139     afs_int32 data[3];
140
141     /* setup data */
142     data[0] = htonl(LOGEND);
143     data[1] = htonl(aversion->epoch);
144     data[2] = htonl(aversion->counter);
145
146     /* do write */
147     code =
148         (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
149     if (code != 3 * sizeof(afs_int32))
150         return UIOERROR;
151
152     /* finally sync the log */
153     code = (*adbase->sync) (adbase, LOGFILE);
154     return code;
155 }
156
157 /*!
158  * \brief Log a truncate operation, never syncing.
159  */
160 static int
161 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
162                   afs_int32 alength)
163 {
164     afs_int32 code;
165     afs_int32 data[3];
166
167     /* setup data */
168     data[0] = htonl(LOGTRUNCATE);
169     data[1] = htonl(afile);
170     data[2] = htonl(alength);
171
172     /* do write */
173     code =
174         (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
175     if (code != 3 * sizeof(afs_int32))
176         return UIOERROR;
177     return 0;
178 }
179
180 /*!
181  * \brief Write some data to the log, never syncing.
182  */
183 static int
184 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
185                    afs_int32 apos, afs_int32 alen)
186 {
187     afs_int32 code;
188     afs_int32 data[4];
189
190     /* setup header */
191     data[0] = htonl(LOGDATA);
192     data[1] = htonl(afile);
193     data[2] = htonl(apos);
194     data[3] = htonl(alen);
195
196     /* write header */
197     code =
198         (*adbase->buffered_append)(adbase, LOGFILE, data, 4 * sizeof(afs_int32));
199     if (code != 4 * sizeof(afs_int32))
200         return UIOERROR;
201
202     /* write data */
203     code = (*adbase->buffered_append)(adbase, LOGFILE, abuffer, alen);
204     if (code != alen)
205         return UIOERROR;
206     return 0;
207 }
208
209 int
210 udisk_Init(int abuffers)
211 {
212     /* Initialize the venus buffer system. */
213     int i;
214     struct buffer *tb;
215     Buffers = calloc(abuffers, sizeof(struct buffer));
216     BufferData = malloc(abuffers * UBIK_PAGESIZE);
217     nbuffers = abuffers;
218     for (i = 0; i < PHSIZE; i++)
219         phTable[i] = 0;
220     for (i = 0; i < abuffers; i++) {
221         /* Fill in each buffer with an empty indication. */
222         tb = &Buffers[i];
223         tb->lru_next = &(Buffers[i + 1]);
224         tb->lru_prev = &(Buffers[i - 1]);
225         tb->data = &BufferData[UBIK_PAGESIZE * i];
226         tb->file = BADFID;
227     }
228     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
229     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
230     LruBuffer = &(Buffers[0]);
231     return 0;
232 }
233
234 /*!
235  * \brief Take a buffer and mark it as the least recently used buffer.
236  */
237 static void
238 Dlru(struct buffer *abuf)
239 {
240     if (LruBuffer == abuf)
241         return;
242
243     /* Unthread from where it is in the list */
244     abuf->lru_next->lru_prev = abuf->lru_prev;
245     abuf->lru_prev->lru_next = abuf->lru_next;
246
247     /* Thread onto beginning of LRU list */
248     abuf->lru_next = LruBuffer;
249     abuf->lru_prev = LruBuffer->lru_prev;
250
251     LruBuffer->lru_prev->lru_next = abuf;
252     LruBuffer->lru_prev = abuf;
253     LruBuffer = abuf;
254 }
255
256 /*!
257  * \brief Take a buffer and mark it as the most recently used buffer.
258  */
259 static void
260 Dmru(struct buffer *abuf)
261 {
262     if (LruBuffer == abuf) {
263         LruBuffer = LruBuffer->lru_next;
264         return;
265     }
266
267     /* Unthread from where it is in the list */
268     abuf->lru_next->lru_prev = abuf->lru_prev;
269     abuf->lru_prev->lru_next = abuf->lru_next;
270
271     /* Thread onto end of LRU list - making it the MRU buffer */
272     abuf->lru_next = LruBuffer;
273     abuf->lru_prev = LruBuffer->lru_prev;
274     LruBuffer->lru_prev->lru_next = abuf;
275     LruBuffer->lru_prev = abuf;
276 }
277
278 static_inline int
279 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
280             struct ubik_trans *atrans)
281 {
282     if (buf->page != page) {
283         return 0;
284     }
285     if (buf->file != fid) {
286         return 0;
287     }
288     if (atrans->type == UBIK_READTRANS && buf->dirty) {
289         /* if 'buf' is dirty, it has uncommitted changes; we do not want to
290          * see uncommitted changes if we are a read transaction, so skip over
291          * it. */
292         return 0;
293     }
294     if (buf->dbase != atrans->dbase) {
295         return 0;
296     }
297     return 1;
298 }
299
300 /*!
301  * \brief Get a pointer to a particular buffer.
302  */
303 static char *
304 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
305 {
306     /* Read a page from the disk. */
307     struct buffer *tb, *lastbuffer, *found_tb = NULL;
308     afs_int32 code;
309     struct ubik_dbase *dbase = atrans->dbase;
310
311     calls++;
312     lastbuffer = LruBuffer->lru_prev;
313
314     /* Skip for write transactions for a clean page - this may not be the right page to use */
315     if (MatchBuffer(lastbuffer, page, fid, atrans)
316                 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
317         tb = lastbuffer;
318         tb->lockers++;
319         lastb++;
320         return tb->data;
321     }
322     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
323         if (MatchBuffer(tb, page, fid, atrans)) {
324             if (tb->dirty || atrans->type == UBIK_READTRANS) {
325                 found_tb = tb;
326                 break;
327             }
328             /* Remember this clean page - we might use it */
329             found_tb = tb;
330         }
331     }
332     /* For a write transaction, use a matching clean page if no dirty one was found */
333     if (found_tb) {
334         Dmru(found_tb);
335         found_tb->lockers++;
336         return found_tb->data;
337     }
338
339     /* can't find it */
340     tb = newslot(dbase, fid, page);
341     if (!tb)
342         return 0;
343     memset(tb->data, 0, UBIK_PAGESIZE);
344
345     tb->lockers++;
346     code =
347         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
348                         UBIK_PAGESIZE);
349     if (code < 0) {
350         tb->file = BADFID;
351         Dlru(tb);
352         tb->lockers--;
353         ViceLog(0, ("Ubik: Error reading database file: errno=%d\n", errno));
354         return 0;
355     }
356     ios++;
357
358     /* Note that findslot sets the page field in the buffer equal to
359      * what it is searching for.
360      */
361     return tb->data;
362 }
363
364 /*!
365  * \brief Zap truncated pages.
366  */
367 static int
368 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
369 {
370     afs_int32 maxPage;
371     struct buffer *tb;
372     int i;
373     struct ubik_dbase *dbase = atrans->dbase;
374
375     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
376     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
377         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
378             tb->file = BADFID;
379             Dlru(tb);
380         }
381     }
382     return 0;
383 }
384
385 /*!
386  * \brief Allocate a truncation entry.
387  *
388  * We allocate special entries representing truncations, rather than
389  * performing them immediately, so that we can abort a transaction easily by simply purging
390  * the in-core memory buffers and discarding these truncation entries.
391  */
392 static struct ubik_trunc *
393 GetTrunc(void)
394 {
395     struct ubik_trunc *tt;
396     if (!freeTruncList) {
397         freeTruncList = malloc(sizeof(struct ubik_trunc));
398         freeTruncList->next = (struct ubik_trunc *)0;
399     }
400     tt = freeTruncList;
401     freeTruncList = tt->next;
402     return tt;
403 }
404
405 /*!
406  * \brief Free a truncation entry.
407  */
408 static int
409 PutTrunc(struct ubik_trunc *at)
410 {
411     at->next = freeTruncList;
412     freeTruncList = at;
413     return 0;
414 }
415
416 /*!
417  * \brief Find a truncation entry for a file, if any.
418  */
419 static struct ubik_trunc *
420 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
421 {
422     struct ubik_trunc *tt;
423     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
424         if (tt->file == afile)
425             return tt;
426     }
427     return (struct ubik_trunc *)0;
428 }
429
430 /*!
431  * \brief Do truncates associated with \p atrans, and free them.
432  */
433 static int
434 DoTruncs(struct ubik_trans *atrans)
435 {
436     struct ubik_trunc *tt, *nt;
437     int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
438     afs_int32 rcode = 0, code;
439
440     tproc = atrans->dbase->truncate;
441     for (tt = atrans->activeTruncs; tt; tt = nt) {
442         nt = tt->next;
443         DTrunc(atrans, tt->file, tt->length);   /* zap pages from buffer cache */
444         code = (*tproc) (atrans->dbase, tt->file, tt->length);
445         if (code)
446             rcode = code;
447         PutTrunc(tt);
448     }
449     /* don't unthread, because we do the entire list's worth here */
450     atrans->activeTruncs = (struct ubik_trunc *)0;
451     return (rcode);
452 }
453
454 /*!
455  * \brief Mark an \p fid as invalid.
456  */
457 int
458 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
459 {
460     struct buffer *tb;
461     int i;
462
463     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
464         if (tb->file == afid) {
465             tb->file = BADFID;
466             Dlru(tb);
467         }
468     }
469     return 0;
470 }
471
472 /*!
473  * \brief Move this page into the correct hash bucket.
474  */
475 static int
476 FixupBucket(struct buffer *ap)
477 {
478     struct buffer **lp, *tp;
479     int i;
480     /* first try to get it out of its current hash bucket, in which it might not be */
481     i = ap->hashIndex;
482     lp = &phTable[i];
483     for (tp = *lp; tp; tp = tp->hashNext) {
484         if (tp == ap) {
485             *lp = tp->hashNext;
486             break;
487         }
488         lp = &tp->hashNext;
489     }
490     /* now figure the new hash bucket */
491     i = pHash(ap->page);
492     ap->hashIndex = i;          /* remember where we are for deletion */
493     ap->hashNext = phTable[i];  /* add us to the list */
494     phTable[i] = ap;
495     return 0;
496 }
497
498 /*!
499  * \brief Create a new slot for a particular dbase page.
500  */
501 static struct buffer *
502 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
503 {
504     /* Find a usable buffer slot */
505     afs_int32 i;
506     struct buffer *pp, *tp;
507
508     pp = 0;                     /* last pure */
509     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
510         if (!tp->lockers && !tp->dirty) {
511             pp = tp;
512             break;
513         }
514     }
515
516     if (pp == 0) {
517         /* There are no unlocked buffers that don't need to be written to the disk. */
518         ViceLog(0, ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n"));
519         return NULL;
520     }
521
522     /* Now fill in the header. */
523     pp->dbase = adbase;
524     pp->file = afid;
525     pp->page = apage;
526
527     FixupBucket(pp);            /* move to the right hash bucket */
528     Dmru(pp);
529     return pp;
530 }
531
532 /*!
533  * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
534  */
535 static void
536 DRelease(char *ap, int flag)
537 {
538     int index;
539     struct buffer *bp;
540
541     if (!ap)
542         return;
543     index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
544     bp = &(Buffers[index]);
545     bp->lockers--;
546     if (flag)
547         bp->dirty = 1;
548     return;
549 }
550
551 /*!
552  * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
553  * by DSync()).
554  *
555  * \note Note interaction with DSync(): you call this thing first,
556  * writing the buffers to the disk.  Then you call DSync() to sync all the
557  * files that were written, and to clear the dirty bits.  You should
558  * always call DFlush/DSync as a pair.
559  */
560 static int
561 DFlush(struct ubik_trans *atrans)
562 {
563     int i;
564     afs_int32 code;
565     struct buffer *tb;
566     struct ubik_dbase *adbase = atrans->dbase;
567
568     tb = Buffers;
569     for (i = 0; i < nbuffers; i++, tb++) {
570         if (tb->dirty) {
571             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
572             code =
573                 (*adbase->write) (adbase, tb->file, tb->data, code,
574                                   UBIK_PAGESIZE);
575             if (code != UBIK_PAGESIZE)
576                 return UIOERROR;
577         }
578     }
579     return 0;
580 }
581
582 /*!
583  * \brief Flush all modified buffers.
584  */
585 static int
586 DAbort(struct ubik_trans *atrans)
587 {
588     int i;
589     struct buffer *tb;
590
591     tb = Buffers;
592     for (i = 0; i < nbuffers; i++, tb++) {
593         if (tb->dirty) {
594             tb->dirty = 0;
595             tb->file = BADFID;
596             Dlru(tb);
597         }
598     }
599     return 0;
600 }
601
602 /**
603  * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
604  * can appear if a read transaction reads a page that is dirty, then that
605  * dirty page is synced. The read transaction will skip over the dirty page,
606  * and create a new buffer, and when the dirty page is synced, it will be
607  * identical (except for contents) to the read-transaction buffer.
608  */
609 static void
610 DedupBuffer(struct buffer *abuf)
611 {
612     struct buffer *tb;
613     for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
614         if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
615             && tb->dbase == abuf->dbase) {
616
617             tb->file = BADFID;
618             Dlru(tb);
619         }
620     }
621 }
622
623 /*!
624  * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
625  */
626 static int
627 DSync(struct ubik_trans *atrans)
628 {
629     int i;
630     afs_int32 code;
631     struct buffer *tb;
632     afs_int32 file;
633     afs_int32 rCode;
634     struct ubik_dbase *adbase = atrans->dbase;
635
636     rCode = 0;
637     while (1) {
638         file = BADFID;
639         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
640             if (tb->dirty == 1) {
641                 if (file == BADFID)
642                     file = tb->file;
643                 if (file != BADFID && tb->file == file) {
644                     tb->dirty = 0;
645                     DedupBuffer(tb);
646                 }
647             }
648         }
649         if (file == BADFID)
650             break;
651         /* otherwise we have a file to sync */
652         code = (*adbase->sync) (adbase, file);
653         if (code)
654             rCode = code;
655     }
656     return rCode;
657 }
658
659 /*!
660  * \brief Same as DRead(), only do not even try to read the page.
661  */
662 static char *
663 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
664 {
665     struct buffer *tb;
666     struct ubik_dbase *dbase = atrans->dbase;
667
668     if ((tb = newslot(dbase, fid, page)) == 0)
669         return NULL;
670     tb->lockers++;
671     memset(tb->data, 0, UBIK_PAGESIZE);
672     return tb->data;
673 }
674
675 /*!
676  * \brief Read data from database.
677  */
678 int
679 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
680            afs_int32 apos, afs_int32 alen)
681 {
682     char *bp;
683     afs_int32 offset, len, totalLen;
684
685     if (atrans->flags & TRDONE)
686         return UDONE;
687     totalLen = 0;
688     while (alen > 0) {
689         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
690         if (!bp)
691             return UEOF;
692         /* otherwise, min of remaining bytes and end of buffer to user mode */
693         offset = apos & (UBIK_PAGESIZE - 1);
694         len = UBIK_PAGESIZE - offset;
695         if (len > alen)
696             len = alen;
697         memcpy(abuffer, bp + offset, len);
698         abuffer = (char *)abuffer + len;
699         apos += len;
700         alen -= len;
701         totalLen += len;
702         DRelease(bp, 0);
703     }
704     return 0;
705 }
706
707 /*!
708  * \brief Truncate file.
709  */
710 int
711 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
712 {
713     afs_int32 code;
714     struct ubik_trunc *tt;
715
716     if (atrans->flags & TRDONE)
717         return UDONE;
718     if (atrans->type != UBIK_WRITETRANS)
719         return UBADTYPE;
720
721     /* write a truncate log record */
722     code = udisk_LogTruncate(atrans->dbase, afile, alength);
723
724     /* don't truncate until commit time */
725     tt = FindTrunc(atrans, afile);
726     if (!tt) {
727         /* this file not truncated yet */
728         tt = GetTrunc();
729         tt->next = atrans->activeTruncs;
730         atrans->activeTruncs = tt;
731         tt->file = afile;
732         tt->length = alength;
733     } else {
734         /* already truncated to a certain length */
735         if (tt->length > alength)
736             tt->length = alength;
737     }
738     return code;
739 }
740
741 /*!
742  * \brief Write data to database, using logs.
743  */
744 int
745 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
746             afs_int32 apos, afs_int32 alen)
747 {
748     char *bp;
749     afs_int32 offset, len, totalLen;
750     struct ubik_trunc *tt;
751     afs_int32 code;
752
753     if (atrans->flags & TRDONE)
754         return UDONE;
755     if (atrans->type != UBIK_WRITETRANS)
756         return UBADTYPE;
757
758     /* first write the data to the log */
759     code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
760     if (code)
761         return code;
762
763     /* expand any truncations of this file */
764     tt = FindTrunc(atrans, afile);
765     if (tt) {
766         if (tt->length < apos + alen) {
767             tt->length = apos + alen;
768         }
769     }
770
771     /* now update vm */
772     totalLen = 0;
773     while (alen > 0) {
774         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
775         if (!bp) {
776             bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
777             if (!bp)
778                 return UIOERROR;
779         }
780         /* otherwise, min of remaining bytes and end of buffer to user mode */
781         offset = apos & (UBIK_PAGESIZE - 1);
782         len = UBIK_PAGESIZE - offset;
783         if (len > alen)
784             len = alen;
785         memcpy(bp + offset, abuffer, len);
786         abuffer = (char *)abuffer + len;
787         apos += len;
788         alen -= len;
789         totalLen += len;
790         DRelease(bp, 1);        /* buffer modified */
791     }
792     return 0;
793 }
794
795 /*!
796  * \brief Begin a new local transaction.
797  */
798 int
799 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
800 {
801     afs_int32 code;
802     struct ubik_trans *tt;
803
804     *atrans = NULL;
805     if (atype == UBIK_WRITETRANS) {
806         if (adbase->flags & DBWRITING)
807             return USYNC;
808         code = udisk_LogOpcode(adbase, LOGNEW, 0);
809         if (code)
810             return code;
811     }
812     tt = calloc(1, sizeof(struct ubik_trans));
813     tt->dbase = adbase;
814     tt->next = adbase->activeTrans;
815     adbase->activeTrans = tt;
816     tt->type = atype;
817     if (atype == UBIK_READTRANS)
818         adbase->readers++;
819     else if (atype == UBIK_WRITETRANS) {
820         UBIK_VERSION_LOCK;
821         adbase->flags |= DBWRITING;
822         UBIK_VERSION_UNLOCK;
823     }
824     *atrans = tt;
825     return 0;
826 }
827
828 /*!
829  * \brief Commit transaction.
830  */
831 int
832 udisk_commit(struct ubik_trans *atrans)
833 {
834     struct ubik_dbase *dbase;
835     afs_int32 code = 0;
836     struct ubik_version oldversion, newversion;
837     afs_int32 now = FT_ApproxTime();
838
839     if (atrans->flags & TRDONE)
840         return (UTWOENDS);
841
842     if (atrans->type == UBIK_WRITETRANS) {
843         dbase = atrans->dbase;
844
845         /* On the first write to the database. We update the versions */
846         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
847             UBIK_VERSION_LOCK;
848             if (version_globals.ubik_epochTime < UBIK_MILESTONE
849                 || version_globals.ubik_epochTime > now) {
850                 ViceLog(0,
851                     ("Ubik: New database label %d is out of the valid range (%d - %d)\n",
852                      version_globals.ubik_epochTime, UBIK_MILESTONE, now));
853                 panic("Writing Ubik DB label\n");
854             }
855             oldversion = dbase->version;
856             newversion.epoch = version_globals.ubik_epochTime;
857             newversion.counter = 1;
858
859             code = (*dbase->setlabel) (dbase, 0, &newversion);
860             if (code) {
861                 UBIK_VERSION_UNLOCK;
862                 return code;
863             }
864
865             dbase->version = newversion;
866             UBIK_VERSION_UNLOCK;
867
868             urecovery_state |= UBIK_RECLABELDB;
869
870             /* Ignore the error here. If the call fails, the site is
871              * marked down and when we detect it is up again, we will
872              * send the entire database to it.
873              */
874             ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
875                                            &oldversion, &newversion);
876         }
877
878         UBIK_VERSION_LOCK;
879         dbase->version.counter++;       /* bump commit count */
880 #ifdef AFS_PTHREAD_ENV
881         opr_cv_broadcast(&dbase->version_cond);
882 #else
883         LWP_NoYieldSignal(&dbase->version);
884 #endif
885         code = udisk_LogEnd(dbase, &dbase->version);
886         if (code) {
887             dbase->version.counter--;
888             UBIK_VERSION_UNLOCK;
889             return code;
890         }
891         UBIK_VERSION_UNLOCK;
892
893         /* If we fail anytime after this, then panic and let the
894          * recovery replay the log.
895          */
896         code = DFlush(atrans);  /* write dirty pages to respective files */
897         if (code)
898             panic("Writing Ubik DB modifications\n");
899         code = DSync(atrans);   /* sync the files and mark pages not dirty */
900         if (code)
901             panic("Synchronizing Ubik DB modifications\n");
902
903         code = DoTruncs(atrans);        /* Perform requested truncations */
904         if (code)
905             panic("Truncating Ubik DB\n");
906
907         /* label the committed dbase */
908         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
909         if (code)
910             panic("Truncating Ubik DB\n");
911
912         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
913         if (code)
914             panic("Truncating Ubik logfile\n");
915
916     }
917
918     /* When the transaction is marked done, it also means the logfile
919      * has been truncated.
920      */
921     atrans->flags |= TRDONE;
922     return code;
923 }
924
925 /*!
926  * \brief Abort transaction.
927  */
928 int
929 udisk_abort(struct ubik_trans *atrans)
930 {
931     struct ubik_dbase *dbase;
932     afs_int32 code;
933
934     if (atrans->flags & TRDONE)
935         return UTWOENDS;
936
937     /* Check if we are the write trans before logging abort, lest we
938      * abort a good write trans in progress.
939      * We don't really care if the LOGABORT gets to the log because we
940      * truncate the log next. If the truncate fails, we panic; for
941      * otherwise, the log entries remain. On restart, replay of the log
942      * will do nothing because the abort is there or no LogEnd opcode.
943      */
944     dbase = atrans->dbase;
945     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
946         udisk_LogOpcode(dbase, LOGABORT, 1);
947         code = (*dbase->truncate) (dbase, LOGFILE, 0);
948         if (code)
949             panic("Truncating Ubik logfile during an abort\n");
950         DAbort(atrans);         /* remove all dirty pages */
951     }
952
953     /* When the transaction is marked done, it also means the logfile
954      * has been truncated.
955      */
956     atrans->flags |= (TRABORT | TRDONE);
957     return 0;
958 }
959
960 /*!
961  * \brief Destroy a transaction after it has been committed or aborted.
962  *
963  * If it hasn't committed before you call this routine, we'll abort the
964  * transaction for you.
965  */
966 int
967 udisk_end(struct ubik_trans *atrans)
968 {
969     struct ubik_dbase *dbase;
970
971     if (!(atrans->flags & TRDONE))
972         udisk_abort(atrans);
973     dbase = atrans->dbase;
974
975     ulock_relLock(atrans);
976     unthread(atrans);
977
978     /* check if we are the write trans before unsetting the DBWRITING bit, else
979      * we could be unsetting someone else's bit.
980      */
981     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
982         UBIK_VERSION_LOCK;
983         dbase->flags &= ~DBWRITING;
984         UBIK_VERSION_UNLOCK;
985     } else {
986         dbase->readers--;
987     }
988     if (atrans->iovec_info.iovec_wrt_val)
989         free(atrans->iovec_info.iovec_wrt_val);
990     if (atrans->iovec_data.iovec_buf_val)
991         free(atrans->iovec_data.iovec_buf_val);
992     free(atrans);
993
994     /* Wakeup any writers waiting in BeginTrans() */
995 #ifdef AFS_PTHREAD_ENV
996     opr_cv_broadcast(&dbase->flags_cond);
997 #else
998     LWP_NoYieldSignal(&dbase->flags);
999 #endif
1000     return 0;
1001 }