ubik: convert ubik_print to ViceLog
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14 #include <afs/opr.h>
15
16 #ifdef AFS_PTHREAD_ENV
17 # include <opr/lock.h>
18 #else
19 # include <opr/lockstub.h>
20 #endif
21 #include <afs/afsutil.h>
22
23 #define UBIK_INTERNALS
24 #include "ubik.h"
25 #include "ubik_int.h"
26
27 #define PHSIZE  128
28 static struct buffer {
29     struct ubik_dbase *dbase;   /*!< dbase within which the buffer resides */
30     afs_int32 file;             /*!< Unique cache key */
31     afs_int32 page;             /*!< page number */
32     struct buffer *lru_next;
33     struct buffer *lru_prev;
34     struct buffer *hashNext;    /*!< next dude in hash table */
35     char *data;                 /*!< ptr to the data */
36     char lockers;               /*!< usage ref count */
37     char dirty;                 /*!< is buffer modified */
38     char hashIndex;             /*!< back ptr to hash table */
39 } *Buffers;
40
41 #define pHash(page) ((page) & (PHSIZE-1))
42
43 afs_int32 ubik_nBuffers = NBUFFERS;
44 static struct buffer *phTable[PHSIZE];  /*!< page hash table */
45 static struct buffer *LruBuffer;
46 static int nbuffers;
47 static int calls = 0, ios = 0, lastb = 0;
48 static char *BufferData;
49 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
50                               afs_int32 apage);
51 #define BADFID      0xffffffff
52
53 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
54
55 static struct ubik_trunc *freeTruncList = 0;
56
57 /*!
58  * \brief Remove a transaction from the database's active transaction list.  Don't free it.
59  */
60 static int
61 unthread(struct ubik_trans *atrans)
62 {
63     struct ubik_trans **lt, *tt;
64     lt = &atrans->dbase->activeTrans;
65     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
66         if (tt == atrans) {
67             /* found it */
68             *lt = tt->next;
69             return 0;
70         }
71     }
72     return 2;                   /* no entry */
73 }
74
75 /*!
76  * \brief some debugging assistance
77  */
78 void
79 udisk_Debug(struct ubik_debug *aparm)
80 {
81     struct buffer *tb;
82     int i;
83
84     memcpy(&aparm->localVersion, &ubik_dbase->version,
85            sizeof(struct ubik_version));
86     aparm->lockedPages = 0;
87     aparm->writeLockedPages = 0;
88     tb = Buffers;
89     for (i = 0; i < nbuffers; i++, tb++) {
90         if (tb->lockers) {
91             aparm->lockedPages++;
92             if (tb->dirty)
93                 aparm->writeLockedPages++;
94         }
95     }
96 }
97
98 /*!
99  * \brief Write an opcode to the log.
100  *
101  * log format is defined here, and implicitly in recovery.c
102  *
103  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
104  * are in logged in network standard byte order, in case we want to move logs
105  * from machine-to-machine someday.
106  *
107  * Begin transaction: opcode \n
108  * Commit transaction: opcode, version (8 bytes) \n
109  * Truncate file: opcode, file number, length \n
110  * Abort transaction: opcode \n
111  * Write data: opcode, file, position, length, <length> data bytes \n
112  */
113 static int
114 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
115 {
116     struct ubik_stat ustat;
117     afs_int32 code;
118
119     /* figure out where to write */
120     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
121     if (code < 0)
122         return code;
123
124     /* setup data and do write */
125     aopcode = htonl(aopcode);
126     code =
127         (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
128                           sizeof(afs_int32));
129     if (code != sizeof(afs_int32))
130         return UIOERROR;
131
132     /* optionally sync data */
133     if (async)
134         code = (*adbase->sync) (adbase, LOGFILE);
135     else
136         code = 0;
137     return code;
138 }
139
140 /*!
141  * \brief Log a commit, never syncing.
142  */
143 static int
144 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
145 {
146     afs_int32 code;
147     afs_int32 data[3];
148     struct ubik_stat ustat;
149
150     /* figure out where to write */
151     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
152     if (code)
153         return code;
154
155     /* setup data */
156     data[0] = htonl(LOGEND);
157     data[1] = htonl(aversion->epoch);
158     data[2] = htonl(aversion->counter);
159
160     /* do write */
161     code =
162         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
163                           3 * sizeof(afs_int32));
164     if (code != 3 * sizeof(afs_int32))
165         return UIOERROR;
166
167     /* finally sync the log */
168     code = (*adbase->sync) (adbase, LOGFILE);
169     return code;
170 }
171
172 /*!
173  * \brief Log a truncate operation, never syncing.
174  */
175 static int
176 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
177                   afs_int32 alength)
178 {
179     afs_int32 code;
180     afs_int32 data[3];
181     struct ubik_stat ustat;
182
183     /* figure out where to write */
184     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
185     if (code < 0)
186         return code;
187
188     /* setup data */
189     data[0] = htonl(LOGTRUNCATE);
190     data[1] = htonl(afile);
191     data[2] = htonl(alength);
192
193     /* do write */
194     code =
195         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
196                           3 * sizeof(afs_int32));
197     if (code != 3 * sizeof(afs_int32))
198         return UIOERROR;
199     return 0;
200 }
201
202 /*!
203  * \brief Write some data to the log, never syncing.
204  */
205 static int
206 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
207                    afs_int32 apos, afs_int32 alen)
208 {
209     struct ubik_stat ustat;
210     afs_int32 code;
211     afs_int32 data[4];
212     afs_int32 lpos;
213
214     /* find end of log */
215     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
216     lpos = ustat.size;
217     if (code < 0)
218         return code;
219
220     /* setup header */
221     data[0] = htonl(LOGDATA);
222     data[1] = htonl(afile);
223     data[2] = htonl(apos);
224     data[3] = htonl(alen);
225
226     /* write header */
227     code =
228         (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
229     if (code != 4 * sizeof(afs_int32))
230         return UIOERROR;
231     lpos += 4 * sizeof(afs_int32);
232
233     /* write data */
234     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
235     if (code != alen)
236         return UIOERROR;
237     return 0;
238 }
239
240 int
241 udisk_Init(int abuffers)
242 {
243     /* Initialize the venus buffer system. */
244     int i;
245     struct buffer *tb;
246     Buffers = calloc(abuffers, sizeof(struct buffer));
247     BufferData = malloc(abuffers * UBIK_PAGESIZE);
248     nbuffers = abuffers;
249     for (i = 0; i < PHSIZE; i++)
250         phTable[i] = 0;
251     for (i = 0; i < abuffers; i++) {
252         /* Fill in each buffer with an empty indication. */
253         tb = &Buffers[i];
254         tb->lru_next = &(Buffers[i + 1]);
255         tb->lru_prev = &(Buffers[i - 1]);
256         tb->data = &BufferData[UBIK_PAGESIZE * i];
257         tb->file = BADFID;
258     }
259     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
260     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
261     LruBuffer = &(Buffers[0]);
262     return 0;
263 }
264
265 /*!
266  * \brief Take a buffer and mark it as the least recently used buffer.
267  */
268 static void
269 Dlru(struct buffer *abuf)
270 {
271     if (LruBuffer == abuf)
272         return;
273
274     /* Unthread from where it is in the list */
275     abuf->lru_next->lru_prev = abuf->lru_prev;
276     abuf->lru_prev->lru_next = abuf->lru_next;
277
278     /* Thread onto beginning of LRU list */
279     abuf->lru_next = LruBuffer;
280     abuf->lru_prev = LruBuffer->lru_prev;
281
282     LruBuffer->lru_prev->lru_next = abuf;
283     LruBuffer->lru_prev = abuf;
284     LruBuffer = abuf;
285 }
286
287 /*!
288  * \brief Take a buffer and mark it as the most recently used buffer.
289  */
290 static void
291 Dmru(struct buffer *abuf)
292 {
293     if (LruBuffer == abuf) {
294         LruBuffer = LruBuffer->lru_next;
295         return;
296     }
297
298     /* Unthread from where it is in the list */
299     abuf->lru_next->lru_prev = abuf->lru_prev;
300     abuf->lru_prev->lru_next = abuf->lru_next;
301
302     /* Thread onto end of LRU list - making it the MRU buffer */
303     abuf->lru_next = LruBuffer;
304     abuf->lru_prev = LruBuffer->lru_prev;
305     LruBuffer->lru_prev->lru_next = abuf;
306     LruBuffer->lru_prev = abuf;
307 }
308
309 static_inline int
310 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
311             struct ubik_trans *atrans)
312 {
313     if (buf->page != page) {
314         return 0;
315     }
316     if (buf->file != fid) {
317         return 0;
318     }
319     if (atrans->type == UBIK_READTRANS && buf->dirty) {
320         /* if 'buf' is dirty, it has uncommitted changes; we do not want to
321          * see uncommitted changes if we are a read transaction, so skip over
322          * it. */
323         return 0;
324     }
325     if (buf->dbase != atrans->dbase) {
326         return 0;
327     }
328     return 1;
329 }
330
331 /*!
332  * \brief Get a pointer to a particular buffer.
333  */
334 static char *
335 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
336 {
337     /* Read a page from the disk. */
338     struct buffer *tb, *lastbuffer, *found_tb = NULL;
339     afs_int32 code;
340     struct ubik_dbase *dbase = atrans->dbase;
341
342     calls++;
343     lastbuffer = LruBuffer->lru_prev;
344
345     /* Skip for write transactions for a clean page - this may not be the right page to use */
346     if (MatchBuffer(lastbuffer, page, fid, atrans)
347                 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
348         tb = lastbuffer;
349         tb->lockers++;
350         lastb++;
351         return tb->data;
352     }
353     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
354         if (MatchBuffer(tb, page, fid, atrans)) {
355             if (tb->dirty || atrans->type == UBIK_READTRANS) {
356                 found_tb = tb;
357                 break;
358             }
359             /* Remember this clean page - we might use it */
360             found_tb = tb;
361         }
362     }
363     /* For a write transaction, use a matching clean page if no dirty one was found */
364     if (found_tb) {
365         Dmru(found_tb);
366         found_tb->lockers++;
367         return found_tb->data;
368     }
369
370     /* can't find it */
371     tb = newslot(dbase, fid, page);
372     if (!tb)
373         return 0;
374     memset(tb->data, 0, UBIK_PAGESIZE);
375
376     tb->lockers++;
377     code =
378         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
379                         UBIK_PAGESIZE);
380     if (code < 0) {
381         tb->file = BADFID;
382         Dlru(tb);
383         tb->lockers--;
384         ViceLog(0, ("Ubik: Error reading database file: errno=%d\n", errno));
385         return 0;
386     }
387     ios++;
388
389     /* Note that findslot sets the page field in the buffer equal to
390      * what it is searching for.
391      */
392     return tb->data;
393 }
394
395 /*!
396  * \brief Zap truncated pages.
397  */
398 static int
399 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
400 {
401     afs_int32 maxPage;
402     struct buffer *tb;
403     int i;
404     struct ubik_dbase *dbase = atrans->dbase;
405
406     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
407     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
408         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
409             tb->file = BADFID;
410             Dlru(tb);
411         }
412     }
413     return 0;
414 }
415
416 /*!
417  * \brief Allocate a truncation entry.
418  *
419  * We allocate special entries representing truncations, rather than
420  * performing them immediately, so that we can abort a transaction easily by simply purging
421  * the in-core memory buffers and discarding these truncation entries.
422  */
423 static struct ubik_trunc *
424 GetTrunc(void)
425 {
426     struct ubik_trunc *tt;
427     if (!freeTruncList) {
428         freeTruncList = malloc(sizeof(struct ubik_trunc));
429         freeTruncList->next = (struct ubik_trunc *)0;
430     }
431     tt = freeTruncList;
432     freeTruncList = tt->next;
433     return tt;
434 }
435
436 /*!
437  * \brief Free a truncation entry.
438  */
439 static int
440 PutTrunc(struct ubik_trunc *at)
441 {
442     at->next = freeTruncList;
443     freeTruncList = at;
444     return 0;
445 }
446
447 /*!
448  * \brief Find a truncation entry for a file, if any.
449  */
450 static struct ubik_trunc *
451 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
452 {
453     struct ubik_trunc *tt;
454     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
455         if (tt->file == afile)
456             return tt;
457     }
458     return (struct ubik_trunc *)0;
459 }
460
461 /*!
462  * \brief Do truncates associated with \p atrans, and free them.
463  */
464 static int
465 DoTruncs(struct ubik_trans *atrans)
466 {
467     struct ubik_trunc *tt, *nt;
468     int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
469     afs_int32 rcode = 0, code;
470
471     tproc = atrans->dbase->truncate;
472     for (tt = atrans->activeTruncs; tt; tt = nt) {
473         nt = tt->next;
474         DTrunc(atrans, tt->file, tt->length);   /* zap pages from buffer cache */
475         code = (*tproc) (atrans->dbase, tt->file, tt->length);
476         if (code)
477             rcode = code;
478         PutTrunc(tt);
479     }
480     /* don't unthread, because we do the entire list's worth here */
481     atrans->activeTruncs = (struct ubik_trunc *)0;
482     return (rcode);
483 }
484
485 /*!
486  * \brief Mark an \p fid as invalid.
487  */
488 int
489 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
490 {
491     struct buffer *tb;
492     int i;
493
494     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
495         if (tb->file == afid) {
496             tb->file = BADFID;
497             Dlru(tb);
498         }
499     }
500     return 0;
501 }
502
503 /*!
504  * \brief Move this page into the correct hash bucket.
505  */
506 static int
507 FixupBucket(struct buffer *ap)
508 {
509     struct buffer **lp, *tp;
510     int i;
511     /* first try to get it out of its current hash bucket, in which it might not be */
512     i = ap->hashIndex;
513     lp = &phTable[i];
514     for (tp = *lp; tp; tp = tp->hashNext) {
515         if (tp == ap) {
516             *lp = tp->hashNext;
517             break;
518         }
519         lp = &tp->hashNext;
520     }
521     /* now figure the new hash bucket */
522     i = pHash(ap->page);
523     ap->hashIndex = i;          /* remember where we are for deletion */
524     ap->hashNext = phTable[i];  /* add us to the list */
525     phTable[i] = ap;
526     return 0;
527 }
528
529 /*!
530  * \brief Create a new slot for a particular dbase page.
531  */
532 static struct buffer *
533 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
534 {
535     /* Find a usable buffer slot */
536     afs_int32 i;
537     struct buffer *pp, *tp;
538
539     pp = 0;                     /* last pure */
540     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
541         if (!tp->lockers && !tp->dirty) {
542             pp = tp;
543             break;
544         }
545     }
546
547     if (pp == 0) {
548         /* There are no unlocked buffers that don't need to be written to the disk. */
549         ViceLog(0, ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n"));
550         return NULL;
551     }
552
553     /* Now fill in the header. */
554     pp->dbase = adbase;
555     pp->file = afid;
556     pp->page = apage;
557
558     FixupBucket(pp);            /* move to the right hash bucket */
559     Dmru(pp);
560     return pp;
561 }
562
563 /*!
564  * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
565  */
566 static void
567 DRelease(char *ap, int flag)
568 {
569     int index;
570     struct buffer *bp;
571
572     if (!ap)
573         return;
574     index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
575     bp = &(Buffers[index]);
576     bp->lockers--;
577     if (flag)
578         bp->dirty = 1;
579     return;
580 }
581
582 /*!
583  * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
584  * by DSync()).
585  *
586  * \note Note interaction with DSync(): you call this thing first,
587  * writing the buffers to the disk.  Then you call DSync() to sync all the
588  * files that were written, and to clear the dirty bits.  You should
589  * always call DFlush/DSync as a pair.
590  */
591 static int
592 DFlush(struct ubik_trans *atrans)
593 {
594     int i;
595     afs_int32 code;
596     struct buffer *tb;
597     struct ubik_dbase *adbase = atrans->dbase;
598
599     tb = Buffers;
600     for (i = 0; i < nbuffers; i++, tb++) {
601         if (tb->dirty) {
602             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
603             code =
604                 (*adbase->write) (adbase, tb->file, tb->data, code,
605                                   UBIK_PAGESIZE);
606             if (code != UBIK_PAGESIZE)
607                 return UIOERROR;
608         }
609     }
610     return 0;
611 }
612
613 /*!
614  * \brief Flush all modified buffers.
615  */
616 static int
617 DAbort(struct ubik_trans *atrans)
618 {
619     int i;
620     struct buffer *tb;
621
622     tb = Buffers;
623     for (i = 0; i < nbuffers; i++, tb++) {
624         if (tb->dirty) {
625             tb->dirty = 0;
626             tb->file = BADFID;
627             Dlru(tb);
628         }
629     }
630     return 0;
631 }
632
633 /**
634  * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
635  * can appear if a read transaction reads a page that is dirty, then that
636  * dirty page is synced. The read transaction will skip over the dirty page,
637  * and create a new buffer, and when the dirty page is synced, it will be
638  * identical (except for contents) to the read-transaction buffer.
639  */
640 static void
641 DedupBuffer(struct buffer *abuf)
642 {
643     struct buffer *tb;
644     for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
645         if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
646             && tb->dbase == abuf->dbase) {
647
648             tb->file = BADFID;
649             Dlru(tb);
650         }
651     }
652 }
653
654 /*!
655  * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
656  */
657 static int
658 DSync(struct ubik_trans *atrans)
659 {
660     int i;
661     afs_int32 code;
662     struct buffer *tb;
663     afs_int32 file;
664     afs_int32 rCode;
665     struct ubik_dbase *adbase = atrans->dbase;
666
667     rCode = 0;
668     while (1) {
669         file = BADFID;
670         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
671             if (tb->dirty == 1) {
672                 if (file == BADFID)
673                     file = tb->file;
674                 if (file != BADFID && tb->file == file) {
675                     tb->dirty = 0;
676                     DedupBuffer(tb);
677                 }
678             }
679         }
680         if (file == BADFID)
681             break;
682         /* otherwise we have a file to sync */
683         code = (*adbase->sync) (adbase, file);
684         if (code)
685             rCode = code;
686     }
687     return rCode;
688 }
689
690 /*!
691  * \brief Same as DRead(), only do not even try to read the page.
692  */
693 static char *
694 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
695 {
696     struct buffer *tb;
697     struct ubik_dbase *dbase = atrans->dbase;
698
699     if ((tb = newslot(dbase, fid, page)) == 0)
700         return NULL;
701     tb->lockers++;
702     memset(tb->data, 0, UBIK_PAGESIZE);
703     return tb->data;
704 }
705
706 /*!
707  * \brief Read data from database.
708  */
709 int
710 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
711            afs_int32 apos, afs_int32 alen)
712 {
713     char *bp;
714     afs_int32 offset, len, totalLen;
715
716     if (atrans->flags & TRDONE)
717         return UDONE;
718     totalLen = 0;
719     while (alen > 0) {
720         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
721         if (!bp)
722             return UEOF;
723         /* otherwise, min of remaining bytes and end of buffer to user mode */
724         offset = apos & (UBIK_PAGESIZE - 1);
725         len = UBIK_PAGESIZE - offset;
726         if (len > alen)
727             len = alen;
728         memcpy(abuffer, bp + offset, len);
729         abuffer = (char *)abuffer + len;
730         apos += len;
731         alen -= len;
732         totalLen += len;
733         DRelease(bp, 0);
734     }
735     return 0;
736 }
737
738 /*!
739  * \brief Truncate file.
740  */
741 int
742 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
743 {
744     afs_int32 code;
745     struct ubik_trunc *tt;
746
747     if (atrans->flags & TRDONE)
748         return UDONE;
749     if (atrans->type != UBIK_WRITETRANS)
750         return UBADTYPE;
751
752     /* write a truncate log record */
753     code = udisk_LogTruncate(atrans->dbase, afile, alength);
754
755     /* don't truncate until commit time */
756     tt = FindTrunc(atrans, afile);
757     if (!tt) {
758         /* this file not truncated yet */
759         tt = GetTrunc();
760         tt->next = atrans->activeTruncs;
761         atrans->activeTruncs = tt;
762         tt->file = afile;
763         tt->length = alength;
764     } else {
765         /* already truncated to a certain length */
766         if (tt->length > alength)
767             tt->length = alength;
768     }
769     return code;
770 }
771
772 /*!
773  * \brief Write data to database, using logs.
774  */
775 int
776 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
777             afs_int32 apos, afs_int32 alen)
778 {
779     char *bp;
780     afs_int32 offset, len, totalLen;
781     struct ubik_trunc *tt;
782     afs_int32 code;
783
784     if (atrans->flags & TRDONE)
785         return UDONE;
786     if (atrans->type != UBIK_WRITETRANS)
787         return UBADTYPE;
788
789     /* first write the data to the log */
790     code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
791     if (code)
792         return code;
793
794     /* expand any truncations of this file */
795     tt = FindTrunc(atrans, afile);
796     if (tt) {
797         if (tt->length < apos + alen) {
798             tt->length = apos + alen;
799         }
800     }
801
802     /* now update vm */
803     totalLen = 0;
804     while (alen > 0) {
805         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
806         if (!bp) {
807             bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
808             if (!bp)
809                 return UIOERROR;
810         }
811         /* otherwise, min of remaining bytes and end of buffer to user mode */
812         offset = apos & (UBIK_PAGESIZE - 1);
813         len = UBIK_PAGESIZE - offset;
814         if (len > alen)
815             len = alen;
816         memcpy(bp + offset, abuffer, len);
817         abuffer = (char *)abuffer + len;
818         apos += len;
819         alen -= len;
820         totalLen += len;
821         DRelease(bp, 1);        /* buffer modified */
822     }
823     return 0;
824 }
825
826 /*!
827  * \brief Begin a new local transaction.
828  */
829 int
830 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
831 {
832     afs_int32 code;
833     struct ubik_trans *tt;
834
835     *atrans = NULL;
836     if (atype == UBIK_WRITETRANS) {
837         if (adbase->flags & DBWRITING)
838             return USYNC;
839         code = udisk_LogOpcode(adbase, LOGNEW, 0);
840         if (code)
841             return code;
842     }
843     tt = calloc(1, sizeof(struct ubik_trans));
844     tt->dbase = adbase;
845     tt->next = adbase->activeTrans;
846     adbase->activeTrans = tt;
847     tt->type = atype;
848     if (atype == UBIK_READTRANS)
849         adbase->readers++;
850     else if (atype == UBIK_WRITETRANS) {
851         UBIK_VERSION_LOCK;
852         adbase->flags |= DBWRITING;
853         UBIK_VERSION_UNLOCK;
854     }
855     *atrans = tt;
856     return 0;
857 }
858
859 /*!
860  * \brief Commit transaction.
861  */
862 int
863 udisk_commit(struct ubik_trans *atrans)
864 {
865     struct ubik_dbase *dbase;
866     afs_int32 code = 0;
867     struct ubik_version oldversion, newversion;
868     afs_int32 now = FT_ApproxTime();
869
870     if (atrans->flags & TRDONE)
871         return (UTWOENDS);
872
873     if (atrans->type == UBIK_WRITETRANS) {
874         dbase = atrans->dbase;
875
876         /* On the first write to the database. We update the versions */
877         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
878             UBIK_VERSION_LOCK;
879             if (version_globals.ubik_epochTime < UBIK_MILESTONE
880                 || version_globals.ubik_epochTime > now) {
881                 ViceLog(0,
882                     ("Ubik: New database label %d is out of the valid range (%d - %d)\n",
883                      version_globals.ubik_epochTime, UBIK_MILESTONE, now));
884                 panic("Writing Ubik DB label\n");
885             }
886             oldversion = dbase->version;
887             newversion.epoch = version_globals.ubik_epochTime;
888             newversion.counter = 1;
889
890             code = (*dbase->setlabel) (dbase, 0, &newversion);
891             if (code) {
892                 UBIK_VERSION_UNLOCK;
893                 return code;
894             }
895
896             dbase->version = newversion;
897             UBIK_VERSION_UNLOCK;
898
899             urecovery_state |= UBIK_RECLABELDB;
900
901             /* Ignore the error here. If the call fails, the site is
902              * marked down and when we detect it is up again, we will
903              * send the entire database to it.
904              */
905             ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
906                                            &oldversion, &newversion);
907         }
908
909         UBIK_VERSION_LOCK;
910         dbase->version.counter++;       /* bump commit count */
911 #ifdef AFS_PTHREAD_ENV
912         opr_cv_broadcast(&dbase->version_cond);
913 #else
914         LWP_NoYieldSignal(&dbase->version);
915 #endif
916         code = udisk_LogEnd(dbase, &dbase->version);
917         if (code) {
918             dbase->version.counter--;
919             UBIK_VERSION_UNLOCK;
920             return code;
921         }
922         UBIK_VERSION_UNLOCK;
923
924         /* If we fail anytime after this, then panic and let the
925          * recovery replay the log.
926          */
927         code = DFlush(atrans);  /* write dirty pages to respective files */
928         if (code)
929             panic("Writing Ubik DB modifications\n");
930         code = DSync(atrans);   /* sync the files and mark pages not dirty */
931         if (code)
932             panic("Synchronizing Ubik DB modifications\n");
933
934         code = DoTruncs(atrans);        /* Perform requested truncations */
935         if (code)
936             panic("Truncating Ubik DB\n");
937
938         /* label the committed dbase */
939         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
940         if (code)
941             panic("Truncating Ubik DB\n");
942
943         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
944         if (code)
945             panic("Truncating Ubik logfile\n");
946
947     }
948
949     /* When the transaction is marked done, it also means the logfile
950      * has been truncated.
951      */
952     atrans->flags |= TRDONE;
953     return code;
954 }
955
956 /*!
957  * \brief Abort transaction.
958  */
959 int
960 udisk_abort(struct ubik_trans *atrans)
961 {
962     struct ubik_dbase *dbase;
963     afs_int32 code;
964
965     if (atrans->flags & TRDONE)
966         return UTWOENDS;
967
968     /* Check if we are the write trans before logging abort, lest we
969      * abort a good write trans in progress.
970      * We don't really care if the LOGABORT gets to the log because we
971      * truncate the log next. If the truncate fails, we panic; for
972      * otherwise, the log entries remain. On restart, replay of the log
973      * will do nothing because the abort is there or no LogEnd opcode.
974      */
975     dbase = atrans->dbase;
976     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
977         udisk_LogOpcode(dbase, LOGABORT, 1);
978         code = (*dbase->truncate) (dbase, LOGFILE, 0);
979         if (code)
980             panic("Truncating Ubik logfile during an abort\n");
981         DAbort(atrans);         /* remove all dirty pages */
982     }
983
984     /* When the transaction is marked done, it also means the logfile
985      * has been truncated.
986      */
987     atrans->flags |= (TRABORT | TRDONE);
988     return 0;
989 }
990
991 /*!
992  * \brief Destroy a transaction after it has been committed or aborted.
993  *
994  * If it hasn't committed before you call this routine, we'll abort the
995  * transaction for you.
996  */
997 int
998 udisk_end(struct ubik_trans *atrans)
999 {
1000     struct ubik_dbase *dbase;
1001
1002     if (!(atrans->flags & TRDONE))
1003         udisk_abort(atrans);
1004     dbase = atrans->dbase;
1005
1006     ulock_relLock(atrans);
1007     unthread(atrans);
1008
1009     /* check if we are the write trans before unsetting the DBWRITING bit, else
1010      * we could be unsetting someone else's bit.
1011      */
1012     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
1013         UBIK_VERSION_LOCK;
1014         dbase->flags &= ~DBWRITING;
1015         UBIK_VERSION_UNLOCK;
1016     } else {
1017         dbase->readers--;
1018     }
1019     if (atrans->iovec_info.iovec_wrt_val)
1020         free(atrans->iovec_info.iovec_wrt_val);
1021     if (atrans->iovec_data.iovec_buf_val)
1022         free(atrans->iovec_data.iovec_buf_val);
1023     free(atrans);
1024
1025     /* Wakeup any writers waiting in BeginTrans() */
1026 #ifdef AFS_PTHREAD_ENV
1027     opr_cv_broadcast(&dbase->flags_cond);
1028 #else
1029     LWP_NoYieldSignal(&dbase->flags);
1030 #endif
1031     return 0;
1032 }