2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <sys/types.h>
21 #include <netinet/in.h>
36 #define UBIK_INTERNALS
42 struct ubik_dbase *dbase; /* dbase within which the buffer resides */
43 afs_int32 file; /* Unique cache key */
44 afs_int32 page; /* page number */
45 struct buffer *lru_next;
46 struct buffer *lru_prev;
47 struct buffer *hashNext; /* next dude in hash table */
48 char *data; /* ptr to the data */
49 char lockers; /* usage ref count */
50 char dirty; /* is buffer modified */
51 char hashIndex; /* back ptr to hash table */
54 #define pHash(page) ((page) & (PHSIZE-1))
56 afs_int32 ubik_nBuffers = NBUFFERS;
57 static struct buffer *phTable[PHSIZE]; /* page hash table */
58 static struct buffer *LruBuffer;
60 static int calls = 0, ios = 0, lastb = 0;
61 static char *BufferData;
62 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
65 #define BADFID 0xffffffff
67 static DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length);
69 static struct ubik_trunc *freeTruncList = 0;
71 /* remove a transaction from the database's active transaction list. Don't free it */
73 unthread(struct ubik_trans *atrans)
75 struct ubik_trans **lt, *tt;
76 lt = &atrans->dbase->activeTrans;
77 for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
84 return 2; /* no entry */
87 /* some debugging assistance */
89 udisk_Debug(struct ubik_debug *aparm)
94 memcpy(&aparm->localVersion, &ubik_dbase->version,
95 sizeof(struct ubik_version));
96 aparm->lockedPages = 0;
97 aparm->writeLockedPages = 0;
99 for (i = 0; i < nbuffers; i++, tb++) {
101 aparm->lockedPages++;
103 aparm->writeLockedPages++;
108 /* log format is defined here, and implicitly in recovery.c
110 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
111 * are logged in network standard byte order, in case we want to move logs
112 * from machine-to-machine someday.
114 * Begin transaction: opcode
115 * Commit transaction: opcode, version (8 bytes)
116 * Truncate file: opcode, file number, length
117 * Abort transaction: opcode
118 * Write data: opcode, file, position, length, <length> data bytes
121 /* write an opcode to the log */
123 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
125 struct ubik_stat ustat;
128 /* figure out where to write */
129 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
133 /* setup data and do write */
134 aopcode = htonl(aopcode);
136 (*adbase->write) (adbase, LOGFILE, &aopcode, ustat.size,
138 if (code != sizeof(afs_int32))
141 /* optionally sync data */
143 code = (*adbase->sync) (adbase, LOGFILE);
149 /* log a commit, then sync the log */
151 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
155 struct ubik_stat ustat;
157 /* figure out where to write */
158 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
163 data[0] = htonl(LOGEND);
164 data[1] = htonl(aversion->epoch);
165 data[2] = htonl(aversion->counter);
169 (*adbase->write) (adbase, LOGFILE, data, ustat.size,
170 3 * sizeof(afs_int32));
171 if (code != 3 * sizeof(afs_int32))
174 /* finally sync the log */
175 code = (*adbase->sync) (adbase, LOGFILE);
179 /* log a truncate operation, never syncing */
181 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
186 struct ubik_stat ustat;
188 /* figure out where to write */
189 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
194 data[0] = htonl(LOGTRUNCATE);
195 data[1] = htonl(afile);
196 data[2] = htonl(alength);
200 (*adbase->write) (adbase, LOGFILE, data, ustat.size,
201 3 * sizeof(afs_int32));
202 if (code != 3 * sizeof(afs_int32))
207 /* write some data to the log, never syncing */
209 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, char *abuffer,
210 afs_int32 apos, afs_int32 alen)
212 struct ubik_stat ustat;
217 /* find end of log */
218 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
224 data[0] = htonl(LOGDATA);
225 data[1] = htonl(afile);
226 data[2] = htonl(apos);
227 data[3] = htonl(alen);
231 (*adbase->write) (adbase, LOGFILE, data, lpos, 4 * sizeof(afs_int32));
232 if (code != 4 * sizeof(afs_int32))
234 lpos += 4 * sizeof(afs_int32);
237 code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
246 /* Initialize the ubik buffer system. */
249 Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
250 memset(Buffers, 0, abuffers * sizeof(struct buffer));
251 BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
253 for (i = 0; i < PHSIZE; i++)
255 for (i = 0; i < abuffers; i++) {
256 /* Fill in each buffer with an empty indication. */
258 tb->lru_next = &(Buffers[i + 1]);
259 tb->lru_prev = &(Buffers[i - 1]);
260 tb->data = &BufferData[UBIK_PAGESIZE * i];
263 Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
264 Buffers[abuffers - 1].lru_next = &(Buffers[0]);
265 LruBuffer = &(Buffers[0]);
269 /* Take a buffer and mark it as the least recently used buffer */
271 Dlru(struct buffer *abuf)
273 if (LruBuffer == abuf)
276 /* Unthread from where it is in the list */
277 abuf->lru_next->lru_prev = abuf->lru_prev;
278 abuf->lru_prev->lru_next = abuf->lru_next;
280 /* Thread onto beginning of LRU list */
281 abuf->lru_next = LruBuffer;
282 abuf->lru_prev = LruBuffer->lru_prev;
284 LruBuffer->lru_prev->lru_next = abuf;
285 LruBuffer->lru_prev = abuf;
289 /* Take a buffer and mark it as the most recently used buffer */
291 Dmru(struct buffer *abuf)
293 if (LruBuffer == abuf) {
294 LruBuffer = LruBuffer->lru_next;
298 /* Unthread from where it is in the list */
299 abuf->lru_next->lru_prev = abuf->lru_prev;
300 abuf->lru_prev->lru_next = abuf->lru_next;
302 /* Thread onto end of LRU list - making it the MRU buffer */
303 abuf->lru_next = LruBuffer;
304 abuf->lru_prev = LruBuffer->lru_prev;
305 LruBuffer->lru_prev->lru_next = abuf;
306 LruBuffer->lru_prev = abuf;
309 /* get a pointer to a particular buffer */
311 DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
313 /* Read a page from the disk. */
314 struct buffer *tb, *lastbuffer;
318 lastbuffer = LruBuffer->lru_prev;
320 if ((lastbuffer->page == page) && (lastbuffer->file == fid)
321 && (lastbuffer->dbase == dbase)) {
327 for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
328 if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
335 tb = newslot(dbase, fid, page);
338 memset(tb->data, 0, UBIK_PAGESIZE);
342 (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
348 ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
353 /* Note that newslot sets the page field in the buffer equal to
354 * what it is searching for.
359 /* zap truncated pages */
361 DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length)
367 maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
368 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
369 if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
377 /* allocate a truncation entry. We allocate special entries representing truncations, rather than
378 performing them immediately, so that we can abort a transaction easily by simply purging
379 the in-core memory buffers and discarding these truncation entries.
381 static struct ubik_trunc *
384 struct ubik_trunc *tt;
385 if (!freeTruncList) {
387 (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
388 freeTruncList->next = (struct ubik_trunc *)0;
391 freeTruncList = tt->next;
395 /* free a truncation entry */
397 PutTrunc(struct ubik_trunc *at)
399 at->next = freeTruncList;
404 /* find a truncation entry for a file, if any */
405 static struct ubik_trunc *
406 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
408 struct ubik_trunc *tt;
409 for (tt = atrans->activeTruncs; tt; tt = tt->next) {
410 if (tt->file == afile)
413 return (struct ubik_trunc *)0;
416 /* do truncates associated with trans, and free them */
418 DoTruncs(struct ubik_trans *atrans)
420 struct ubik_trunc *tt, *nt;
422 afs_int32 rcode = 0, code;
424 tproc = atrans->dbase->truncate;
425 for (tt = atrans->activeTruncs; tt; tt = nt) {
427 DTrunc(atrans->dbase, tt->file, tt->length); /* zap pages from buffer cache */
428 code = (*tproc) (atrans->dbase, tt->file, tt->length);
433 /* don't unthread, because we do the entire list's worth here */
434 atrans->activeTruncs = (struct ubik_trunc *)0;
438 /* mark a fid as invalid */
440 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
445 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
446 if (tb->file == afid) {
454 /* move this page into the correct hash bucket */
456 FixupBucket(struct buffer *ap)
458 struct buffer **lp, *tp;
460 /* first try to get it out of its current hash bucket, in which it might not be */
463 for (tp = *lp; tp; tp = tp->hashNext) {
470 /* now figure the new hash bucket */
472 ap->hashIndex = i; /* remember where we are for deletion */
473 ap->hashNext = phTable[i]; /* add us to the list */
477 /* create a new slot for a particular dbase page */
478 static struct buffer *
479 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
481 /* Find a usable buffer slot */
483 struct buffer *pp, *tp;
485 pp = 0; /* last pure */
486 for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
487 if (!tp->lockers && !tp->dirty) {
494 /* There are no unlocked buffers that don't need to be written to the disk. */
496 ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
500 /* Now fill in the header. */
505 FixupBucket(pp); /* move to the right hash bucket */
510 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
512 DRelease(char *ap, int flag)
519 index = (ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
520 bp = &(Buffers[index]);
527 /* flush all modified buffers, leaves dirty bits set (they're cleared
528 * by DSync). Note interaction with DSync: you call this thing first,
529 * writing the buffers to the disk. Then you call DSync to sync all the
530 * files that were written, and to clear the dirty bits. You should
531 * always call DFlush/DSync as a pair.
534 DFlush(struct ubik_dbase *adbase)
541 for (i = 0; i < nbuffers; i++, tb++) {
543 code = tb->page * UBIK_PAGESIZE; /* offset within file */
545 (*adbase->write) (adbase, tb->file, tb->data, code,
547 if (code != UBIK_PAGESIZE)
554 /* discard all modified (dirty) buffers */
556 DAbort(struct ubik_dbase *adbase)
562 for (i = 0; i < nbuffers; i++, tb++) {
572 /* must only be called after DFlush, due to its interpretation of dirty flag */
574 DSync(struct ubik_dbase *adbase)
585 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
586 if (tb->dirty == 1) {
589 if (file != BADFID && tb->file == file)
595 /* otherwise we have a file to sync */
596 code = (*adbase->sync) (adbase, file);
603 /* Same as read, only do not even try to read the page */
605 DNew(struct ubik_dbase *dbase, afs_int32 fid, int page)
609 if ((tb = newslot(dbase, fid, page)) == 0)
612 memset(tb->data, 0, UBIK_PAGESIZE);
616 /* read data from database */
618 udisk_read(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
619 afs_int32 apos, afs_int32 alen)
622 afs_int32 offset, len, totalLen;
623 struct ubik_dbase *dbase;
625 if (atrans->flags & TRDONE)
628 dbase = atrans->dbase;
630 bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
633 /* otherwise, min of remaining bytes and end of buffer to user mode */
634 offset = apos & (UBIK_PAGESIZE - 1);
635 len = UBIK_PAGESIZE - offset;
638 memcpy(abuffer, bp + offset, len);
650 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
653 struct ubik_trunc *tt;
655 if (atrans->flags & TRDONE)
657 if (atrans->type != UBIK_WRITETRANS)
660 /* write a truncate log record */
661 code = udisk_LogTruncate(atrans->dbase, afile, alength);
663 /* don't truncate until commit time */
664 tt = FindTrunc(atrans, afile);
666 /* this file not truncated yet */
668 tt->next = atrans->activeTruncs;
669 atrans->activeTruncs = tt;
671 tt->length = alength;
673 /* already truncated to a certain length */
674 if (tt->length > alength)
675 tt->length = alength;
680 /* write data to database, using logs */
682 udisk_write(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
683 afs_int32 apos, afs_int32 alen)
686 afs_int32 offset, len, totalLen;
687 struct ubik_dbase *dbase;
688 struct ubik_trunc *tt;
691 if (atrans->flags & TRDONE)
693 if (atrans->type != UBIK_WRITETRANS)
696 dbase = atrans->dbase;
697 /* first write the data to the log */
698 code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
702 /* expand any truncations of this file */
703 tt = FindTrunc(atrans, afile);
705 if (tt->length < apos + alen) {
706 tt->length = apos + alen;
713 bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
715 bp = DNew(dbase, afile, apos >> UBIK_LOGPAGESIZE);
718 memset(bp, 0, UBIK_PAGESIZE);
720 /* otherwise, min of remaining bytes and end of buffer to user mode */
721 offset = apos & (UBIK_PAGESIZE - 1);
722 len = UBIK_PAGESIZE - offset;
725 memcpy(bp + offset, abuffer, len);
730 DRelease(bp, 1); /* buffer modified */
735 /* begin a new local transaction */
737 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
740 struct ubik_trans *tt;
742 *atrans = (struct ubik_trans *)NULL;
743 /* Make sure system is initialized before doing anything */
746 DInit(ubik_nBuffers);
748 if (atype == UBIK_WRITETRANS) {
749 if (adbase->flags & DBWRITING)
751 code = udisk_LogOpcode(adbase, LOGNEW, 0);
755 tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
756 memset(tt, 0, sizeof(struct ubik_trans));
758 tt->next = adbase->activeTrans;
759 adbase->activeTrans = tt;
761 if (atype == UBIK_READTRANS)
763 else if (atype == UBIK_WRITETRANS)
764 adbase->flags |= DBWRITING;
769 /* commit transaction */
771 udisk_commit(struct ubik_trans *atrans)
773 struct ubik_dbase *dbase;
775 struct ubik_version oldversion, newversion;
777 if (atrans->flags & TRDONE)
780 if (atrans->type == UBIK_WRITETRANS) {
781 dbase = atrans->dbase;
783 /* On the first write to the database, we update the versions */
784 if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
785 oldversion = dbase->version;
786 newversion.epoch = FT_ApproxTime();;
787 newversion.counter = 1;
789 code = (*dbase->setlabel) (dbase, 0, &newversion);
792 ubik_epochTime = newversion.epoch;
793 dbase->version = newversion;
795 /* Ignore the error here. If the call fails, the site is
796 * marked down and when we detect it is up again, we will
797 * send the entire database to it.
799 ContactQuorum(DISK_SetVersion, atrans, 1 /*CStampVersion */ ,
800 &oldversion, &newversion);
801 urecovery_state |= UBIK_RECLABELDB;
804 dbase->version.counter++; /* bump commit count */
805 LWP_NoYieldSignal(&dbase->version);
807 code = udisk_LogEnd(dbase, &dbase->version);
809 dbase->version.counter--;
813 /* If we fail anytime after this, then panic and let the
814 * recovery replay the log.
816 code = DFlush(dbase); /* write dirty pages to respective files */
818 panic("Writing Ubik DB modifications\n");
819 code = DSync(dbase); /* sync the files and mark pages not dirty */
821 panic("Synchronizing Ubik DB modifications\n");
823 code = DoTruncs(atrans); /* Perform requested truncations */
825 panic("Truncating Ubik DB\n");
827 /* label the committed dbase */
828 code = (*dbase->setlabel) (dbase, 0, &dbase->version);
830 panic("Truncating Ubik DB\n");
832 code = (*dbase->truncate) (dbase, LOGFILE, 0); /* discard log (optional) */
834 panic("Truncating Ubik logfile\n");
838 /* When the transaction is marked done, it also means the logfile
839 * has been truncated.
841 atrans->flags |= TRDONE;
845 /* abort transaction */
847 udisk_abort(struct ubik_trans *atrans)
849 struct ubik_dbase *dbase;
852 if (atrans->flags & TRDONE)
855 /* Check if we are the write trans before logging abort, lest we
856 * abort a good write trans in progress.
857 * We don't really care if the LOGABORT gets to the log because we
858 * truncate the log next. If the truncate fails, we panic, because
859 * otherwise the log entries would remain. On restart, replay of the log
860 * will do nothing because the abort is there or no LogEnd opcode.
862 dbase = atrans->dbase;
863 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
864 udisk_LogOpcode(dbase, LOGABORT, 1);
865 code = (*dbase->truncate) (dbase, LOGFILE, 0);
867 panic("Truncating Ubik logfile during an abort\n");
868 DAbort(dbase); /* remove all dirty pages */
871 /* When the transaction is marked done, it also means the logfile
872 * has been truncated.
874 atrans->flags |= (TRABORT | TRDONE);
878 /* destroy a transaction after it has been committed or aborted. if
879 * it hasn't committed before you call this routine, we'll abort the
880 * transaction for you.
883 udisk_end(struct ubik_trans *atrans)
885 struct ubik_dbase *dbase;
887 #if defined(UBIK_PAUSE)
888 /* Another thread is trying to lock this transaction.
889 * That can only be an RPC doing SDISK_Lock.
890 * Unlock the transaction, 'cause otherwise the other
891 * thread will never wake up. Don't free it because
892 * the caller will do that already.
894 if (atrans->flags & TRSETLOCK) {
895 atrans->flags |= TRSTALE;
896 ulock_relLock(atrans);
899 #endif /* UBIK_PAUSE */
900 if (!(atrans->flags & TRDONE))
902 dbase = atrans->dbase;
904 ulock_relLock(atrans);
907 /* check if we are the write trans before unsetting the DBWRITING bit, else
908 * we could be unsetting someone else's bit.
910 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
911 dbase->flags &= ~DBWRITING;
915 if (atrans->iovec_info.iovec_wrt_val)
916 free(atrans->iovec_info.iovec_wrt_val);
917 if (atrans->iovec_data.iovec_buf_val)
918 free(atrans->iovec_data.iovec_buf_val);
921 /* Wakeup any writers waiting in BeginTrans() */
922 LWP_NoYieldSignal(&dbase->flags);