2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 #include <sys/types.h>
24 #include <netinet/in.h>
32 #define UBIK_INTERNALS
37 static struct buffer {
38 struct ubik_dbase *dbase; /*!< dbase within which the buffer resides */
39 afs_int32 file; /*!< Unique cache key */
40 afs_int32 page; /*!< page number */
41 struct buffer *lru_next;
42 struct buffer *lru_prev;
43 struct buffer *hashNext; /*!< next dude in hash table */
44 char *data; /*!< ptr to the data */
45 char lockers; /*!< usage ref count */
46 char dirty; /*!< is buffer modified */
47 char hashIndex; /*!< back ptr to hash table */
50 #define pHash(page) ((page) & (PHSIZE-1))
52 afs_int32 ubik_nBuffers = NBUFFERS;
53 static struct buffer *phTable[PHSIZE]; /*!< page hash table */
54 static struct buffer *LruBuffer;
56 static int calls = 0, ios = 0, lastb = 0;
57 static char *BufferData;
58 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
61 #define BADFID 0xffffffff
63 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
65 static struct ubik_trunc *freeTruncList = 0;
68 * \brief Remove a transaction from the database's active transaction list. Don't free it.
71 unthread(struct ubik_trans *atrans)
73 struct ubik_trans **lt, *tt;
74 lt = &atrans->dbase->activeTrans;
75 for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
82 return 2; /* no entry */
86 * \brief some debugging assistance
89 udisk_Debug(struct ubik_debug *aparm)
94 memcpy(&aparm->localVersion, &ubik_dbase->version,
95 sizeof(struct ubik_version));
96 aparm->lockedPages = 0;
97 aparm->writeLockedPages = 0;
99 for (i = 0; i < nbuffers; i++, tb++) {
101 aparm->lockedPages++;
103 aparm->writeLockedPages++;
109 * \brief Write an opcode to the log.
111 * log format is defined here, and implicitly in recovery.c
113 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
114 * are logged in network standard byte order, in case we want to move logs
115 * from machine-to-machine someday.
117 * Begin transaction: opcode \n
118 * Commit transaction: opcode, version (8 bytes) \n
119 * Truncate file: opcode, file number, length \n
120 * Abort transaction: opcode \n
121 * Write data: opcode, file, position, length, <length> data bytes \n
124 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
126 struct ubik_stat ustat;
129 /* figure out where to write */
130 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
134 /* setup data and do write */
135 aopcode = htonl(aopcode);
137 (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
139 if (code != sizeof(afs_int32))
142 /* optionally sync data */
144 code = (*adbase->sync) (adbase, LOGFILE);
151 * \brief Log a commit, then sync the log.
154 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
158 struct ubik_stat ustat;
160 /* figure out where to write */
161 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
166 data[0] = htonl(LOGEND);
167 data[1] = htonl(aversion->epoch);
168 data[2] = htonl(aversion->counter);
172 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
173 3 * sizeof(afs_int32));
174 if (code != 3 * sizeof(afs_int32))
177 /* finally sync the log */
178 code = (*adbase->sync) (adbase, LOGFILE);
183 * \brief Log a truncate operation, never syncing.
186 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
191 struct ubik_stat ustat;
193 /* figure out where to write */
194 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
199 data[0] = htonl(LOGTRUNCATE);
200 data[1] = htonl(afile);
201 data[2] = htonl(alength);
205 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
206 3 * sizeof(afs_int32));
207 if (code != 3 * sizeof(afs_int32))
213 * \brief Write some data to the log, never syncing.
216 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
217 afs_int32 apos, afs_int32 alen)
219 struct ubik_stat ustat;
224 /* find end of log */
225 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
231 data[0] = htonl(LOGDATA);
232 data[1] = htonl(afile);
233 data[2] = htonl(apos);
234 data[3] = htonl(alen);
238 (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
239 if (code != 4 * sizeof(afs_int32))
241 lpos += 4 * sizeof(afs_int32);
244 code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
253 /* Initialize the ubik buffer system. */
256 Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
257 memset(Buffers, 0, abuffers * sizeof(struct buffer));
258 BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
260 for (i = 0; i < PHSIZE; i++)
262 for (i = 0; i < abuffers; i++) {
263 /* Fill in each buffer with an empty indication. */
265 tb->lru_next = &(Buffers[i + 1]);
266 tb->lru_prev = &(Buffers[i - 1]);
267 tb->data = &BufferData[UBIK_PAGESIZE * i];
270 Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
271 Buffers[abuffers - 1].lru_next = &(Buffers[0]);
272 LruBuffer = &(Buffers[0]);
277 * \brief Take a buffer and mark it as the least recently used buffer.
280 Dlru(struct buffer *abuf)
282 if (LruBuffer == abuf)
285 /* Unthread from where it is in the list */
286 abuf->lru_next->lru_prev = abuf->lru_prev;
287 abuf->lru_prev->lru_next = abuf->lru_next;
289 /* Thread onto beginning of LRU list */
290 abuf->lru_next = LruBuffer;
291 abuf->lru_prev = LruBuffer->lru_prev;
293 LruBuffer->lru_prev->lru_next = abuf;
294 LruBuffer->lru_prev = abuf;
299 * \brief Take a buffer and mark it as the most recently used buffer.
302 Dmru(struct buffer *abuf)
304 if (LruBuffer == abuf) {
305 LruBuffer = LruBuffer->lru_next;
309 /* Unthread from where it is in the list */
310 abuf->lru_next->lru_prev = abuf->lru_prev;
311 abuf->lru_prev->lru_next = abuf->lru_next;
313 /* Thread onto end of LRU list - making it the MRU buffer */
314 abuf->lru_next = LruBuffer;
315 abuf->lru_prev = LruBuffer->lru_prev;
316 LruBuffer->lru_prev->lru_next = abuf;
317 LruBuffer->lru_prev = abuf;
321 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
322 struct ubik_trans *atrans)
324 if (buf->page != page) {
327 if (buf->file != fid) {
330 if (atrans->type == UBIK_READTRANS && buf->dirty) {
331 /* if 'buf' is dirty, it has uncommitted changes; we do not want to
332 * see uncommitted changes if we are a read transaction, so skip over
336 if (buf->dbase != atrans->dbase) {
343 * \brief Get a pointer to a particular buffer.
346 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
348 /* Read a page from the disk. */
349 struct buffer *tb, *lastbuffer, *found_tb = NULL;
351 struct ubik_dbase *dbase = atrans->dbase;
354 lastbuffer = LruBuffer->lru_prev;
356 /* Skip for write transactions for a clean page - this may not be the right page to use */
357 if (MatchBuffer(lastbuffer, page, fid, atrans)
358 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
364 for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
365 if (MatchBuffer(tb, page, fid, atrans)) {
366 if (tb->dirty || atrans->type == UBIK_READTRANS) {
370 /* Remember this clean page - we might use it */
374 /* For a write transaction, use a matching clean page if no dirty one was found */
378 return found_tb->data;
382 tb = newslot(dbase, fid, page);
385 memset(tb->data, 0, UBIK_PAGESIZE);
389 (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
395 ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
400 /* Note that newslot sets the page field in the buffer equal to
401 * what it is searching for.
407 * \brief Zap truncated pages.
410 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
415 struct ubik_dbase *dbase = atrans->dbase;
417 maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
418 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
419 if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
428 * \brief Allocate a truncation entry.
430 * We allocate special entries representing truncations, rather than
431 * performing them immediately, so that we can abort a transaction easily by simply purging
432 * the in-core memory buffers and discarding these truncation entries.
434 static struct ubik_trunc *
437 struct ubik_trunc *tt;
438 if (!freeTruncList) {
440 (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
441 freeTruncList->next = (struct ubik_trunc *)0;
444 freeTruncList = tt->next;
449 * \brief Free a truncation entry.
452 PutTrunc(struct ubik_trunc *at)
454 at->next = freeTruncList;
460 * \brief Find a truncation entry for a file, if any.
462 static struct ubik_trunc *
463 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
465 struct ubik_trunc *tt;
466 for (tt = atrans->activeTruncs; tt; tt = tt->next) {
467 if (tt->file == afile)
470 return (struct ubik_trunc *)0;
474 * \brief Do truncates associated with \p atrans, and free them.
477 DoTruncs(struct ubik_trans *atrans)
479 struct ubik_trunc *tt, *nt;
480 int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
481 afs_int32 rcode = 0, code;
483 tproc = atrans->dbase->truncate;
484 for (tt = atrans->activeTruncs; tt; tt = nt) {
486 DTrunc(atrans, tt->file, tt->length); /* zap pages from buffer cache */
487 code = (*tproc) (atrans->dbase, tt->file, tt->length);
492 /* don't unthread, because we do the entire list's worth here */
493 atrans->activeTruncs = (struct ubik_trunc *)0;
498 * \brief Mark an \p fid as invalid.
501 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
506 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
507 if (tb->file == afid) {
516 * \brief Move this page into the correct hash bucket.
519 FixupBucket(struct buffer *ap)
521 struct buffer **lp, *tp;
523 /* first try to get it out of its current hash bucket, in which it might not be */
526 for (tp = *lp; tp; tp = tp->hashNext) {
533 /* now figure the new hash bucket */
535 ap->hashIndex = i; /* remember where we are for deletion */
536 ap->hashNext = phTable[i]; /* add us to the list */
542 * \brief Create a new slot for a particular dbase page.
544 static struct buffer *
545 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
547 /* Find a usable buffer slot */
549 struct buffer *pp, *tp;
551 pp = 0; /* last pure */
552 for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
553 if (!tp->lockers && !tp->dirty) {
560 /* There are no unlocked buffers that don't need to be written to the disk. */
562 ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
566 /* Now fill in the header. */
571 FixupBucket(pp); /* move to the right hash bucket */
577 * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
580 DRelease(char *ap, int flag)
587 index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
588 bp = &(Buffers[index]);
596 * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
599 * \note Note interaction with DSync(): you call this thing first,
600 * writing the buffers to the disk. Then you call DSync() to sync all the
601 * files that were written, and to clear the dirty bits. You should
602 * always call DFlush/DSync as a pair.
605 DFlush(struct ubik_trans *atrans)
610 struct ubik_dbase *adbase = atrans->dbase;
613 for (i = 0; i < nbuffers; i++, tb++) {
615 code = tb->page * UBIK_PAGESIZE; /* offset within file */
617 (*adbase->write) (adbase, tb->file, tb->data, code,
619 if (code != UBIK_PAGESIZE)
627 * \brief Discard all modified (dirty) buffers; used when aborting a transaction.
630 DAbort(struct ubik_trans *atrans)
636 for (i = 0; i < nbuffers; i++, tb++) {
647 * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
648 * can appear if a read transaction reads a page that is dirty, then that
649 * dirty page is synced. The read transaction will skip over the dirty page,
650 * and create a new buffer, and when the dirty page is synced, it will be
651 * identical (except for contents) to the read-transaction buffer.
654 DedupBuffer(struct buffer *abuf)
657 for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
658 if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
659 && tb->dbase == abuf->dbase) {
668 * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
671 DSync(struct ubik_trans *atrans)
678 struct ubik_dbase *adbase = atrans->dbase;
683 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
684 if (tb->dirty == 1) {
687 if (file != BADFID && tb->file == file) {
695 /* otherwise we have a file to sync */
696 code = (*adbase->sync) (adbase, file);
704 * \brief Same as DRead(), only do not even try to read the page.
707 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
710 struct ubik_dbase *dbase = atrans->dbase;
712 if ((tb = newslot(dbase, fid, page)) == 0)
715 memset(tb->data, 0, UBIK_PAGESIZE);
720 * \brief Read data from database.
723 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
724 afs_int32 apos, afs_int32 alen)
727 afs_int32 offset, len, totalLen;
729 if (atrans->flags & TRDONE)
733 bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
736 /* otherwise, min of remaining bytes and end of buffer to user mode */
737 offset = apos & (UBIK_PAGESIZE - 1);
738 len = UBIK_PAGESIZE - offset;
741 memcpy(abuffer, bp + offset, len);
742 abuffer = (char *)abuffer + len;
752 * \brief Truncate file.
755 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
758 struct ubik_trunc *tt;
760 if (atrans->flags & TRDONE)
762 if (atrans->type != UBIK_WRITETRANS)
765 /* write a truncate log record */
766 code = udisk_LogTruncate(atrans->dbase, afile, alength);
768 /* don't truncate until commit time */
769 tt = FindTrunc(atrans, afile);
771 /* this file not truncated yet */
773 tt->next = atrans->activeTruncs;
774 atrans->activeTruncs = tt;
776 tt->length = alength;
778 /* already truncated to a certain length */
779 if (tt->length > alength)
780 tt->length = alength;
786 * \brief Write data to database, using logs.
789 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
790 afs_int32 apos, afs_int32 alen)
793 afs_int32 offset, len, totalLen;
794 struct ubik_trunc *tt;
797 if (atrans->flags & TRDONE)
799 if (atrans->type != UBIK_WRITETRANS)
802 /* first write the data to the log */
803 code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
807 /* expand any truncations of this file */
808 tt = FindTrunc(atrans, afile);
810 if (tt->length < apos + alen) {
811 tt->length = apos + alen;
818 bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
820 bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
823 memset(bp, 0, UBIK_PAGESIZE);
825 /* otherwise, min of remaining bytes and end of buffer to user mode */
826 offset = apos & (UBIK_PAGESIZE - 1);
827 len = UBIK_PAGESIZE - offset;
830 memcpy(bp + offset, abuffer, len);
831 abuffer = (char *)abuffer + len;
835 DRelease(bp, 1); /* buffer modified */
841 * \brief Begin a new local transaction.
844 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
847 struct ubik_trans *tt;
849 *atrans = (struct ubik_trans *)NULL;
850 /* Make sure system is initialized before doing anything */
853 DInit(ubik_nBuffers);
855 if (atype == UBIK_WRITETRANS) {
856 if (adbase->flags & DBWRITING)
858 code = udisk_LogOpcode(adbase, LOGNEW, 0);
862 tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
863 memset(tt, 0, sizeof(struct ubik_trans));
865 tt->next = adbase->activeTrans;
866 adbase->activeTrans = tt;
868 if (atype == UBIK_READTRANS)
870 else if (atype == UBIK_WRITETRANS)
871 adbase->flags |= DBWRITING;
877 * \brief Commit transaction.
880 udisk_commit(struct ubik_trans *atrans)
882 struct ubik_dbase *dbase;
884 struct ubik_version oldversion, newversion;
886 if (atrans->flags & TRDONE)
889 if (atrans->type == UBIK_WRITETRANS) {
890 dbase = atrans->dbase;
892 /* On the first write to the database, we update the versions */
893 if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
894 oldversion = dbase->version;
895 newversion.epoch = FT_ApproxTime();;
896 newversion.counter = 1;
898 code = (*dbase->setlabel) (dbase, 0, &newversion);
901 ubik_epochTime = newversion.epoch;
902 dbase->version = newversion;
904 /* Ignore the error here. If the call fails, the site is
905 * marked down and when we detect it is up again, we will
906 * send the entire database to it.
908 ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
909 &oldversion, &newversion);
910 urecovery_state |= UBIK_RECLABELDB;
913 dbase->version.counter++; /* bump commit count */
914 #ifdef AFS_PTHREAD_ENV
915 CV_BROADCAST(&dbase->version_cond);
917 LWP_NoYieldSignal(&dbase->version);
919 code = udisk_LogEnd(dbase, &dbase->version);
921 dbase->version.counter--;
925 /* If we fail anytime after this, then panic and let the
926 * recovery replay the log.
928 code = DFlush(atrans); /* write dirty pages to respective files */
930 panic("Writing Ubik DB modifications\n");
931 code = DSync(atrans); /* sync the files and mark pages not dirty */
933 panic("Synchronizing Ubik DB modifications\n");
935 code = DoTruncs(atrans); /* Perform requested truncations */
937 panic("Truncating Ubik DB\n");
939 /* label the committed dbase */
940 code = (*dbase->setlabel) (dbase, 0, &dbase->version);
942 panic("Truncating Ubik DB\n");
944 code = (*dbase->truncate) (dbase, LOGFILE, 0); /* discard log (optional) */
946 panic("Truncating Ubik logfile\n");
950 /* When the transaction is marked done, it also means the logfile
951 * has been truncated.
953 atrans->flags |= TRDONE;
958 * \brief Abort transaction.
961 udisk_abort(struct ubik_trans *atrans)
963 struct ubik_dbase *dbase;
966 if (atrans->flags & TRDONE)
969 /* Check if we are the write trans before logging abort, lest we
970 * abort a good write trans in progress.
971 * We don't really care if the LOGABORT gets to the log because we
972 * truncate the log next. If the truncate fails, we panic, because
973 * otherwise the log entries would remain. On restart, replay of the log
974 * will do nothing because the abort is there or there is no LogEnd opcode.
976 dbase = atrans->dbase;
977 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
978 udisk_LogOpcode(dbase, LOGABORT, 1);
979 code = (*dbase->truncate) (dbase, LOGFILE, 0);
981 panic("Truncating Ubik logfile during an abort\n");
982 DAbort(atrans); /* remove all dirty pages */
985 /* When the transaction is marked done, it also means the logfile
986 * has been truncated.
988 atrans->flags |= (TRABORT | TRDONE);
993 * \brief Destroy a transaction after it has been committed or aborted.
995 * If it hasn't committed before you call this routine, we'll abort the
996 * transaction for you.
999 udisk_end(struct ubik_trans *atrans)
1001 struct ubik_dbase *dbase;
1003 #if defined(UBIK_PAUSE)
1004 /* Another thread is trying to lock this transaction.
1005 * That can only be an RPC doing SDISK_Lock.
1006 * Unlock the transaction, 'cause otherwise the other
1007 * thread will never wake up. Don't free it because
1008 * the caller will do that already.
1010 if (atrans->flags & TRSETLOCK) {
1011 atrans->flags |= TRSTALE;
1012 ulock_relLock(atrans);
1015 #endif /* UBIK_PAUSE */
1016 if (!(atrans->flags & TRDONE))
1017 udisk_abort(atrans);
1018 dbase = atrans->dbase;
1020 ulock_relLock(atrans);
1023 /* check if we are the write trans before unsetting the DBWRITING bit, else
1024 * we could be unsetting someone else's bit.
1026 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
1027 dbase->flags &= ~DBWRITING;
1031 if (atrans->iovec_info.iovec_wrt_val)
1032 free(atrans->iovec_info.iovec_wrt_val);
1033 if (atrans->iovec_data.iovec_buf_val)
1034 free(atrans->iovec_data.iovec_buf_val);
1037 /* Wakeup any writers waiting in BeginTrans() */
1038 #ifdef AFS_PTHREAD_ENV
1039 CV_BROADCAST(&dbase->flags_cond);
1041 LWP_NoYieldSignal(&dbase->flags);