2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
18 #define UBIK_INTERNALS
23 static struct buffer {
24 struct ubik_dbase *dbase; /*!< dbase within which the buffer resides */
25 afs_int32 file; /*!< Unique cache key */
26 afs_int32 page; /*!< page number */
27 struct buffer *lru_next;
28 struct buffer *lru_prev;
29 struct buffer *hashNext; /*!< next dude in hash table */
30 char *data; /*!< ptr to the data */
31 char lockers; /*!< usage ref count */
32 char dirty; /*!< is buffer modified */
33 char hashIndex; /*!< back ptr to hash table */
36 #define pHash(page) ((page) & (PHSIZE-1)) /* bucket index into phTable for a page number; the mask form presumes PHSIZE is a power of two -- confirm at its definition */
38 afs_int32 ubik_nBuffers = NBUFFERS; /*!< buffer count setting; defaults to NBUFFERS */
39 static struct buffer *phTable[PHSIZE]; /*!< page hash table */
40 static struct buffer *LruBuffer; /*!< least recently used end of the circular LRU list; newslot() starts its search here */
42 static int calls = 0, ios = 0, lastb = 0; /* NOTE(review): presumably lookup/IO statistics counters -- their updates are not visible here, confirm */
43 static char *BufferData; /*!< single allocation holding UBIK_PAGESIZE bytes per buffer; each buffer's data pointer indexes into it */
44 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
46 #define BADFID 0xffffffff
48 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
50 static struct ubik_trunc *freeTruncList = 0;
53 * \brief Remove a transaction from the database's active transaction list. Don't free it.
56 unthread(struct ubik_trans *atrans)
58 struct ubik_trans **lt, *tt;
59 lt = &atrans->dbase->activeTrans;
60 for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
67 return 2; /* no entry */
71 * \brief some debugging assistance
74 udisk_Debug(struct ubik_debug *aparm)
79 memcpy(&aparm->localVersion, &ubik_dbase->version,
80 sizeof(struct ubik_version));
81 aparm->lockedPages = 0;
82 aparm->writeLockedPages = 0;
84 for (i = 0; i < nbuffers; i++, tb++) {
88 aparm->writeLockedPages++;
94 * \brief Write an opcode to the log.
96 * log format is defined here, and implicitly in recovery.c
98 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
99 * are logged in network standard byte order, in case we want to move logs
100 * from machine-to-machine someday.
102 * Begin transaction: opcode \n
103 * Commit transaction: opcode, version (8 bytes) \n
104 * Truncate file: opcode, file number, length \n
105 * Abort transaction: opcode \n
106 * Write data: opcode, file, position, length, <length> data bytes \n
109 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
111 struct ubik_stat ustat;
114 /* figure out where to write */
115 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
119 /* setup data and do write */
120 aopcode = htonl(aopcode);
122 (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
124 if (code != sizeof(afs_int32))
127 /* optionally sync data */
129 code = (*adbase->sync) (adbase, LOGFILE);
136 * \brief Log a commit, never syncing.
139 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
143 struct ubik_stat ustat;
145 /* figure out where to write */
146 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
151 data[0] = htonl(LOGEND);
152 data[1] = htonl(aversion->epoch);
153 data[2] = htonl(aversion->counter);
157 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
158 3 * sizeof(afs_int32));
159 if (code != 3 * sizeof(afs_int32))
162 /* finally sync the log */
163 code = (*adbase->sync) (adbase, LOGFILE);
168 * \brief Log a truncate operation, never syncing.
171 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
176 struct ubik_stat ustat;
178 /* figure out where to write */
179 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
184 data[0] = htonl(LOGTRUNCATE);
185 data[1] = htonl(afile);
186 data[2] = htonl(alength);
190 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
191 3 * sizeof(afs_int32));
192 if (code != 3 * sizeof(afs_int32))
198 * \brief Write some data to the log, never syncing.
201 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
202 afs_int32 apos, afs_int32 alen)
204 struct ubik_stat ustat;
209 /* find end of log */
210 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
216 data[0] = htonl(LOGDATA);
217 data[1] = htonl(afile);
218 data[2] = htonl(apos);
219 data[3] = htonl(alen);
223 (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
224 if (code != 4 * sizeof(afs_int32))
226 lpos += 4 * sizeof(afs_int32);
229 code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
236 udisk_Init(int abuffers)
238 /* Initialize the ubik buffer system. */
241 Buffers = calloc(abuffers, sizeof(struct buffer));
242 BufferData = malloc(abuffers * UBIK_PAGESIZE);
244 for (i = 0; i < PHSIZE; i++)
246 for (i = 0; i < abuffers; i++) {
247 /* Fill in each buffer with an empty indication. */
249 tb->lru_next = &(Buffers[i + 1]);
250 tb->lru_prev = &(Buffers[i - 1]);
251 tb->data = &BufferData[UBIK_PAGESIZE * i];
254 Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
255 Buffers[abuffers - 1].lru_next = &(Buffers[0]);
256 LruBuffer = &(Buffers[0]);
261 * \brief Take a buffer and mark it as the least recently used buffer.
264 Dlru(struct buffer *abuf)
266 if (LruBuffer == abuf)
269 /* Unthread from where it is in the list */
270 abuf->lru_next->lru_prev = abuf->lru_prev;
271 abuf->lru_prev->lru_next = abuf->lru_next;
273 /* Thread onto beginning of LRU list */
274 abuf->lru_next = LruBuffer;
275 abuf->lru_prev = LruBuffer->lru_prev;
277 LruBuffer->lru_prev->lru_next = abuf;
278 LruBuffer->lru_prev = abuf;
283 * \brief Take a buffer and mark it as the most recently used buffer.
286 Dmru(struct buffer *abuf)
288 if (LruBuffer == abuf) {
289 LruBuffer = LruBuffer->lru_next;
293 /* Unthread from where it is in the list */
294 abuf->lru_next->lru_prev = abuf->lru_prev;
295 abuf->lru_prev->lru_next = abuf->lru_next;
297 /* Thread onto end of LRU list - making it the MRU buffer */
298 abuf->lru_next = LruBuffer;
299 abuf->lru_prev = LruBuffer->lru_prev;
300 LruBuffer->lru_prev->lru_next = abuf;
301 LruBuffer->lru_prev = abuf;
305 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
306 struct ubik_trans *atrans)
308 if (buf->page != page) {
311 if (buf->file != fid) {
314 if (atrans->type == UBIK_READTRANS && buf->dirty) {
315 /* if 'buf' is dirty, it has uncommitted changes; we do not want to
316 * see uncommitted changes if we are a read transaction, so skip over
320 if (buf->dbase != atrans->dbase) {
327 * \brief Get a pointer to a particular buffer.
330 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
332 /* Read a page from the disk. */
333 struct buffer *tb, *lastbuffer, *found_tb = NULL;
335 struct ubik_dbase *dbase = atrans->dbase;
338 lastbuffer = LruBuffer->lru_prev;
340 /* Skip for write transactions for a clean page - this may not be the right page to use */
341 if (MatchBuffer(lastbuffer, page, fid, atrans)
342 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
348 for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
349 if (MatchBuffer(tb, page, fid, atrans)) {
350 if (tb->dirty || atrans->type == UBIK_READTRANS) {
354 /* Remember this clean page - we might use it */
358 /* For a write transaction, use a matching clean page if no dirty one was found */
362 return found_tb->data;
366 tb = newslot(dbase, fid, page);
369 memset(tb->data, 0, UBIK_PAGESIZE);
373 (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
379 ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
384 /* Note that newslot sets the page field in the buffer equal to
385 * what it is searching for.
391 * \brief Zap truncated pages.
394 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
399 struct ubik_dbase *dbase = atrans->dbase;
401 maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
402 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
403 if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
412 * \brief Allocate a truncation entry.
414 * We allocate special entries representing truncations, rather than
415 * performing them immediately, so that we can abort a transaction easily by simply purging
416 * the in-core memory buffers and discarding these truncation entries.
418 static struct ubik_trunc *
421 struct ubik_trunc *tt;
422 if (!freeTruncList) {
424 (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
425 freeTruncList->next = (struct ubik_trunc *)0;
428 freeTruncList = tt->next;
433 * \brief Free a truncation entry.
436 PutTrunc(struct ubik_trunc *at)
438 at->next = freeTruncList;
444 * \brief Find a truncation entry for a file, if any.
446 static struct ubik_trunc *
447 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
449 struct ubik_trunc *tt;
450 for (tt = atrans->activeTruncs; tt; tt = tt->next) {
451 if (tt->file == afile)
454 return (struct ubik_trunc *)0;
458 * \brief Do truncates associated with \p atrans, and free them.
461 DoTruncs(struct ubik_trans *atrans)
463 struct ubik_trunc *tt, *nt;
464 int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
465 afs_int32 rcode = 0, code;
467 tproc = atrans->dbase->truncate;
468 for (tt = atrans->activeTruncs; tt; tt = nt) {
470 DTrunc(atrans, tt->file, tt->length); /* zap pages from buffer cache */
471 code = (*tproc) (atrans->dbase, tt->file, tt->length);
476 /* don't unthread, because we do the entire list's worth here */
477 atrans->activeTruncs = (struct ubik_trunc *)0;
482 * \brief Mark an \p fid as invalid.
485 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
490 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
491 if (tb->file == afid) {
500 * \brief Move this page into the correct hash bucket.
503 FixupBucket(struct buffer *ap)
505 struct buffer **lp, *tp;
507 /* first try to get it out of its current hash bucket, in which it might not be */
510 for (tp = *lp; tp; tp = tp->hashNext) {
517 /* now figure the new hash bucket */
519 ap->hashIndex = i; /* remember where we are for deletion */
520 ap->hashNext = phTable[i]; /* add us to the list */
526 * \brief Create a new slot for a particular dbase page.
528 static struct buffer *
529 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
531 /* Find a usable buffer slot */
533 struct buffer *pp, *tp;
535 pp = 0; /* last pure */
536 for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
537 if (!tp->lockers && !tp->dirty) {
544 /* There are no unlocked buffers that don't need to be written to the disk. */
546 ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
550 /* Now fill in the header. */
555 FixupBucket(pp); /* move to the right hash bucket */
561 * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
564 DRelease(char *ap, int flag)
571 index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
572 bp = &(Buffers[index]);
580 * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
583 * \note Note interaction with DSync(): you call this thing first,
584 * writing the buffers to the disk. Then you call DSync() to sync all the
585 * files that were written, and to clear the dirty bits. You should
586 * always call DFlush/DSync as a pair.
589 DFlush(struct ubik_trans *atrans)
594 struct ubik_dbase *adbase = atrans->dbase;
597 for (i = 0; i < nbuffers; i++, tb++) {
599 code = tb->page * UBIK_PAGESIZE; /* offset within file */
601 (*adbase->write) (adbase, tb->file, tb->data, code,
603 if (code != UBIK_PAGESIZE)
611 * \brief Discard all modified buffers.
614 DAbort(struct ubik_trans *atrans)
620 for (i = 0; i < nbuffers; i++, tb++) {
631 * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
632 * can appear if a read transaction reads a page that is dirty, then that
633 * dirty page is synced. The read transaction will skip over the dirty page,
634 * and create a new buffer, and when the dirty page is synced, it will be
635 * identical (except for contents) to the read-transaction buffer.
638 DedupBuffer(struct buffer *abuf)
641 for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
642 if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
643 && tb->dbase == abuf->dbase) {
652 * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
655 DSync(struct ubik_trans *atrans)
662 struct ubik_dbase *adbase = atrans->dbase;
667 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
668 if (tb->dirty == 1) {
671 if (file != BADFID && tb->file == file) {
679 /* otherwise we have a file to sync */
680 code = (*adbase->sync) (adbase, file);
688 * \brief Same as DRead(), only do not even try to read the page.
691 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
694 struct ubik_dbase *dbase = atrans->dbase;
696 if ((tb = newslot(dbase, fid, page)) == 0)
699 memset(tb->data, 0, UBIK_PAGESIZE);
704 * \brief Read data from database.
707 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
708 afs_int32 apos, afs_int32 alen)
711 afs_int32 offset, len, totalLen;
713 if (atrans->flags & TRDONE)
717 bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
720 /* otherwise, min of remaining bytes and end of buffer to user mode */
721 offset = apos & (UBIK_PAGESIZE - 1);
722 len = UBIK_PAGESIZE - offset;
725 memcpy(abuffer, bp + offset, len);
726 abuffer = (char *)abuffer + len;
736 * \brief Truncate file.
739 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
742 struct ubik_trunc *tt;
744 if (atrans->flags & TRDONE)
746 if (atrans->type != UBIK_WRITETRANS)
749 /* write a truncate log record */
750 code = udisk_LogTruncate(atrans->dbase, afile, alength);
752 /* don't truncate until commit time */
753 tt = FindTrunc(atrans, afile);
755 /* this file not truncated yet */
757 tt->next = atrans->activeTruncs;
758 atrans->activeTruncs = tt;
760 tt->length = alength;
762 /* already truncated to a certain length */
763 if (tt->length > alength)
764 tt->length = alength;
770 * \brief Write data to database, using logs.
773 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
774 afs_int32 apos, afs_int32 alen)
777 afs_int32 offset, len, totalLen;
778 struct ubik_trunc *tt;
781 if (atrans->flags & TRDONE)
783 if (atrans->type != UBIK_WRITETRANS)
786 /* first write the data to the log */
787 code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
791 /* expand any truncations of this file */
792 tt = FindTrunc(atrans, afile);
794 if (tt->length < apos + alen) {
795 tt->length = apos + alen;
802 bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
804 bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
807 memset(bp, 0, UBIK_PAGESIZE);
809 /* otherwise, min of remaining bytes and end of buffer to user mode */
810 offset = apos & (UBIK_PAGESIZE - 1);
811 len = UBIK_PAGESIZE - offset;
814 memcpy(bp + offset, abuffer, len);
815 abuffer = (char *)abuffer + len;
819 DRelease(bp, 1); /* buffer modified */
825 * \brief Begin a new local transaction.
828 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
831 struct ubik_trans *tt;
833 *atrans = (struct ubik_trans *)NULL;
834 if (atype == UBIK_WRITETRANS) {
835 if (adbase->flags & DBWRITING)
837 code = udisk_LogOpcode(adbase, LOGNEW, 0);
841 tt = calloc(1, sizeof(struct ubik_trans));
843 tt->next = adbase->activeTrans;
844 adbase->activeTrans = tt;
846 if (atype == UBIK_READTRANS)
848 else if (atype == UBIK_WRITETRANS) {
850 adbase->flags |= DBWRITING;
858 * \brief Commit transaction.
861 udisk_commit(struct ubik_trans *atrans)
863 struct ubik_dbase *dbase;
865 struct ubik_version oldversion, newversion;
867 if (atrans->flags & TRDONE)
870 if (atrans->type == UBIK_WRITETRANS) {
871 dbase = atrans->dbase;
873 /* On the first write to the database, we update the versions */
874 if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
876 oldversion = dbase->version;
877 newversion.epoch = FT_ApproxTime();;
878 newversion.counter = 1;
880 code = (*dbase->setlabel) (dbase, 0, &newversion);
883 version_globals.ubik_epochTime = newversion.epoch;
884 dbase->version = newversion;
887 urecovery_state |= UBIK_RECLABELDB;
889 /* Ignore the error here. If the call fails, the site is
890 * marked down and when we detect it is up again, we will
891 * send the entire database to it.
893 ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
894 &oldversion, &newversion);
898 dbase->version.counter++; /* bump commit count */
899 #ifdef AFS_PTHREAD_ENV
900 CV_BROADCAST(&dbase->version_cond);
902 LWP_NoYieldSignal(&dbase->version);
904 code = udisk_LogEnd(dbase, &dbase->version);
906 dbase->version.counter--;
911 /* If we fail anytime after this, then panic and let the
912 * recovery replay the log.
914 code = DFlush(atrans); /* write dirty pages to respective files */
916 panic("Writing Ubik DB modifications\n");
917 code = DSync(atrans); /* sync the files and mark pages not dirty */
919 panic("Synchronizing Ubik DB modifications\n");
921 code = DoTruncs(atrans); /* Perform requested truncations */
923 panic("Truncating Ubik DB\n");
925 /* label the committed dbase */
926 code = (*dbase->setlabel) (dbase, 0, &dbase->version);
928 panic("Truncating Ubik DB\n");
930 code = (*dbase->truncate) (dbase, LOGFILE, 0); /* discard log (optional) */
932 panic("Truncating Ubik logfile\n");
936 /* When the transaction is marked done, it also means the logfile
937 * has been truncated.
939 atrans->flags |= TRDONE;
944 * \brief Abort transaction.
947 udisk_abort(struct ubik_trans *atrans)
949 struct ubik_dbase *dbase;
952 if (atrans->flags & TRDONE)
955 /* Check if we are the write trans before logging abort, lest we
956 * abort a good write trans in progress.
957 * We don't really care if the LOGABORT gets to the log because we
958 * truncate the log next. If the truncate fails, we panic, because
959 * otherwise the log entries would remain. On restart, replay of the log
960 * will do nothing because the abort is there or no LogEnd opcode.
962 dbase = atrans->dbase;
963 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
964 udisk_LogOpcode(dbase, LOGABORT, 1);
965 code = (*dbase->truncate) (dbase, LOGFILE, 0);
967 panic("Truncating Ubik logfile during an abort\n");
968 DAbort(atrans); /* remove all dirty pages */
971 /* When the transaction is marked done, it also means the logfile
972 * has been truncated.
974 atrans->flags |= (TRABORT | TRDONE);
979 * \brief Destroy a transaction after it has been committed or aborted.
981 * If it hasn't committed before you call this routine, we'll abort the
982 * transaction for you.
985 udisk_end(struct ubik_trans *atrans)
987 struct ubik_dbase *dbase;
989 if (!(atrans->flags & TRDONE))
991 dbase = atrans->dbase;
993 ulock_relLock(atrans);
996 /* check if we are the write trans before unsetting the DBWRITING bit, else
997 * we could be unsetting someone else's bit.
999 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
1001 dbase->flags &= ~DBWRITING;
1002 UBIK_VERSION_UNLOCK;
1006 if (atrans->iovec_info.iovec_wrt_val)
1007 free(atrans->iovec_info.iovec_wrt_val);
1008 if (atrans->iovec_data.iovec_buf_val)
1009 free(atrans->iovec_data.iovec_buf_val);
1012 /* Wakeup any writers waiting in BeginTrans() */
1013 #ifdef AFS_PTHREAD_ENV
1014 CV_BROADCAST(&dbase->flags_cond);
1016 LWP_NoYieldSignal(&dbase->flags);