2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 #include <sys/types.h>
20 #include <netinet/in.h>
35 #define UBIK_INTERNALS
41 struct ubik_dbase *dbase; /* dbase within which the buffer resides */
42 afs_int32 file; /* Unique cache key */
43 afs_int32 page; /* page number */
44 struct buffer *lru_next;
45 struct buffer *lru_prev;
46 struct buffer *hashNext; /* next dude in hash table */
47 char *data; /* ptr to the data */
48 char lockers; /* usage ref count */
49 char dirty; /* is buffer modified */
50 char hashIndex; /* back ptr to hash table */
53 #define pHash(page) ((page) & (PHSIZE-1))
55 afs_int32 ubik_nBuffers = NBUFFERS;
56 static struct buffer *phTable[PHSIZE]; /* page hash table */
57 static struct buffer *LruBuffer;
59 static int calls=0, ios=0, lastb=0;
60 static char *BufferData;
61 static struct buffer *newslot();
63 #define BADFID 0xffffffff
67 static struct ubik_trunc *freeTruncList=0;
69 /* remove a transaction from the database's active transaction list. Don't free it */
70 static unthread(atrans)
71 struct ubik_trans *atrans; {
72 struct ubik_trans **lt, *tt;
73 lt = &atrans->dbase->activeTrans;
74 for(tt = *lt; tt; lt = &tt->next, tt = *lt) {
81 return 2; /* no entry */
84 /* some debugging assistance */
86 struct ubik_debug *aparm; {
90 memcpy(&aparm->localVersion, &ubik_dbase->version, sizeof(struct ubik_version));
91 aparm->lockedPages = 0;
92 aparm->writeLockedPages = 0;
94 for(i=0;i<nbuffers;i++, tb++) {
97 if (tb->dirty) aparm->writeLockedPages++;
102 /* log format is defined here, and implicitly in recovery.c
104 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
105 * are logged in network standard byte order, in case we want to move logs
106 * from machine-to-machine someday.
108 * Begin transaction: opcode
109 * Commit transaction: opcode, version (8 bytes)
110 * Truncate file: opcode, file number, length
111 * Abort transaction: opcode
112 * Write data: opcode, file, position, length, <length> data bytes
115 /* write an opcode to the log */
116 udisk_LogOpcode(adbase, aopcode, async)
117 struct ubik_dbase *adbase;
120 struct ubik_stat ustat;
123 /* figure out where to write */
124 code = (*adbase->stat)(adbase, LOGFILE, &ustat);
125 if (code < 0) return code;
127 /* setup data and do write */
128 aopcode = htonl(aopcode);
129 code = (*adbase->write)(adbase, LOGFILE, &aopcode, ustat.size, sizeof(afs_int32));
130 if (code != sizeof(afs_int32)) return UIOERROR;
132 /* optionally sync data */
133 if (async) code = (*adbase->sync)(adbase, LOGFILE);
138 /* log a commit (LOGEND opcode plus version), always syncing the log afterwards */
139 udisk_LogEnd(adbase, aversion)
140 struct ubik_dbase *adbase;
141 struct ubik_version *aversion; {
144 struct ubik_stat ustat;
146 /* figure out where to write */
147 code = (*adbase->stat)(adbase, LOGFILE, &ustat);
148 if (code) return code;
151 data[0] = htonl(LOGEND);
152 data[1] = htonl(aversion->epoch);
153 data[2] = htonl(aversion->counter);
156 code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
157 if (code != 3*sizeof(afs_int32)) return UIOERROR;
159 /* finally sync the log */
160 code = (*adbase->sync)(adbase, LOGFILE);
164 /* log a truncate operation, never syncing */
165 udisk_LogTruncate(adbase, afile, alength)
166 struct ubik_dbase *adbase;
167 afs_int32 afile, alength; {
170 struct ubik_stat ustat;
172 /* figure out where to write */
173 code = (*adbase->stat)(adbase, LOGFILE, &ustat);
174 if (code < 0) return code;
177 data[0] = htonl(LOGTRUNCATE);
178 data[1] = htonl(afile);
179 data[2] = htonl(alength);
182 code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
183 if (code != 3*sizeof(afs_int32)) return UIOERROR;
187 /* write some data to the log, never syncing */
188 udisk_LogWriteData(adbase, afile, abuffer, apos, alen)
189 struct ubik_dbase *adbase;
194 struct ubik_stat ustat;
199 /* find end of log */
200 code = (*adbase->stat)(adbase, LOGFILE, &ustat);
202 if (code < 0) return code;
205 data[0] = htonl(LOGDATA);
206 data[1] = htonl(afile);
207 data[2] = htonl(apos);
208 data[3] = htonl(alen);
211 code = (*adbase->write)(adbase, LOGFILE, data, lpos, 4*sizeof(afs_int32));
212 if (code != 4*sizeof(afs_int32)) return UIOERROR;
213 lpos += 4*sizeof(afs_int32);
216 code = (*adbase->write)(adbase, LOGFILE, abuffer, lpos, alen);
217 if (code != alen) return UIOERROR;
221 static int DInit (abuffers)
223 /* Initialize the ubik buffer system. */
226 Buffers = (struct buffer *) malloc(abuffers * sizeof(struct buffer));
227 memset(Buffers, 0, abuffers * sizeof(struct buffer));
228 BufferData = (char *) malloc(abuffers * UBIK_PAGESIZE);
230 for(i=0;i<PHSIZE;i++) phTable[i] = 0;
231 for (i=0;i<abuffers;i++) {
232 /* Fill in each buffer with an empty indication. */
234 tb->lru_next = &(Buffers[i+1]);
235 tb->lru_prev = &(Buffers[i-1]);
236 tb->data = &BufferData[UBIK_PAGESIZE*i];
239 Buffers[0].lru_prev = &(Buffers[abuffers-1]);
240 Buffers[abuffers-1].lru_next = &(Buffers[0]);
241 LruBuffer = &(Buffers[0]);
245 /* Take a buffer and mark it as the least recently used buffer */
246 static void Dlru(abuf)
249 if (LruBuffer == abuf)
252 /* Unthread from where it is in the list */
253 abuf->lru_next->lru_prev = abuf->lru_prev;
254 abuf->lru_prev->lru_next = abuf->lru_next;
256 /* Thread onto beginning of LRU list */
257 abuf->lru_next = LruBuffer;
258 abuf->lru_prev = LruBuffer->lru_prev;
260 LruBuffer->lru_prev->lru_next = abuf;
261 LruBuffer->lru_prev = abuf;
265 /* Take a buffer and mark it as the most recently used buffer */
266 static void Dmru(abuf)
269 if (LruBuffer == abuf) {
270 LruBuffer = LruBuffer->lru_next;
274 /* Unthread from where it is in the list */
275 abuf->lru_next->lru_prev = abuf->lru_prev;
276 abuf->lru_prev->lru_next = abuf->lru_next;
278 /* Thread onto end of LRU list - making it the MRU buffer */
279 abuf->lru_next = LruBuffer;
280 abuf->lru_prev = LruBuffer->lru_prev;
281 LruBuffer->lru_prev->lru_next = abuf;
282 LruBuffer->lru_prev = abuf;
285 /* get a pointer to a particular buffer */
286 static char *DRead(dbase, fid, page)
287 struct ubik_dbase *dbase;
290 /* Read a page from the disk. */
291 struct buffer *tb, *lastbuffer;
295 lastbuffer = LruBuffer->lru_prev;
297 if ((lastbuffer->page == page ) &&
298 (lastbuffer->file == fid ) &&
299 (lastbuffer->dbase == dbase)) {
305 for(tb=phTable[pHash(page)]; tb; tb=tb->hashNext) {
306 if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
313 tb = newslot(dbase, fid, page);
315 memset(tb->data, 0, UBIK_PAGESIZE);
318 code = (*dbase->read)(dbase, fid, tb->data, page*UBIK_PAGESIZE, UBIK_PAGESIZE);
323 ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
328 /* Note that newslot sets the page field in the buffer equal to
329 * what it is searching for.
334 /* zap truncated pages */
335 static DTrunc(dbase, fid, length)
336 struct ubik_dbase *dbase;
343 maxPage = (length+UBIK_PAGESIZE-1)>>UBIK_LOGPAGESIZE; /* first invalid page now in file */
344 for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
345 if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
353 /* allocate a truncation entry. We allocate special entries representing truncations, rather than
354 performing them immediately, so that we can abort a transaction easily by simply purging
355 the in-core memory buffers and discarding these truncation entries.
357 static struct ubik_trunc *GetTrunc() {
358 struct ubik_trunc *tt;
359 if (!freeTruncList) {
360 freeTruncList = (struct ubik_trunc *) malloc(sizeof(struct ubik_trunc));
361 freeTruncList->next = (struct ubik_trunc *) 0;
364 freeTruncList = tt->next;
368 /* free a truncation entry */
370 struct ubik_trunc *at; {
371 at->next = freeTruncList;
376 /* find a truncation entry for a file, if any */
377 static struct ubik_trunc *FindTrunc(atrans, afile)
378 struct ubik_trans *atrans;
380 struct ubik_trunc *tt;
381 for(tt=atrans->activeTruncs; tt; tt=tt->next) {
382 if (tt->file == afile) return tt;
384 return (struct ubik_trunc *) 0;
387 /* do truncates associated with trans, and free them */
388 static DoTruncs(atrans)
389 struct ubik_trans *atrans; {
390 struct ubik_trunc *tt, *nt;
392 afs_int32 rcode=0, code;
394 tproc = atrans->dbase->truncate;
395 for(tt = atrans->activeTruncs; tt; tt=nt) {
397 DTrunc(atrans->dbase, tt->file, tt->length); /* zap pages from buffer cache */
398 code = (*tproc)(atrans->dbase, tt->file, tt->length);
399 if (code) rcode = code;
402 /* don't unthread, because we do the entire list's worth here */
403 atrans->activeTruncs = (struct ubik_trunc *) 0;
407 /* mark a fid as invalid */
408 udisk_Invalidate(adbase, afid)
409 struct ubik_dbase *adbase;
414 for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
415 if (tb->file == afid) {
423 /* move this page into the correct hash bucket */
424 static FixupBucket(ap)
426 struct buffer **lp, *tp;
428 /* first try to get it out of its current hash bucket, in which it might not be */
431 for(tp = *lp; tp; tp=tp->hashNext) {
438 /* now figure the new hash bucket */
440 ap->hashIndex = i; /* remember where we are for deletion */
441 ap->hashNext = phTable[i]; /* add us to the list */
445 /* create a new slot for a particular dbase page */
446 static struct buffer *newslot (adbase, afid, apage)
447 struct ubik_dbase *adbase;
448 afs_int32 afid, apage; {
449 /* Find a usable buffer slot */
451 struct buffer *pp, *tp;
453 pp = 0; /* last pure */
454 for (i=0,tp=LruBuffer; i<nbuffers; i++,tp=tp->lru_next) {
455 if (!tp->lockers && !tp->dirty) {
462 /* There are no unlocked buffers that don't need to be written to the disk. */
463 ubik_print("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
467 /* Now fill in the header. */
472 FixupBucket(pp); /* move to the right hash bucket */
477 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
478 static void DRelease (ap,flag)
485 index = (ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
486 bp = &(Buffers[index]);
488 if (flag) bp->dirty=1;
492 /* flush all modified buffers, leaves dirty bits set (they're cleared
493 * by DSync). Note interaction with DSync: you call this thing first,
494 * writing the buffers to the disk. Then you call DSync to sync all the
495 * files that were written, and to clear the dirty bits. You should
496 * always call DFlush/DSync as a pair.
498 static DFlush (adbase)
499 struct ubik_dbase *adbase; {
505 for(i=0;i<nbuffers;i++,tb++) {
507 code = tb->page * UBIK_PAGESIZE; /* offset within file */
508 code = (*adbase->write)(adbase, tb->file, tb->data, code, UBIK_PAGESIZE);
509 if (code != UBIK_PAGESIZE) return UIOERROR;
515 /* discard all modified buffers (called when a write transaction is aborted) */
516 static DAbort (adbase)
517 struct ubik_dbase *adbase; {
522 for(i=0;i<nbuffers;i++,tb++) {
532 /* must only be called after DFlush, due to its interpretation of dirty flag */
534 struct ubik_dbase *adbase; {
544 for(i=0,tb = Buffers; i<nbuffers; i++,tb++) {
545 if (tb->dirty == 1) {
546 if (file == BADFID) file = tb->file;
547 if (file != BADFID && tb->file == file) tb->dirty = 0;
550 if (file == BADFID) break;
551 /* otherwise we have a file to sync */
552 code = (*adbase->sync)(adbase, file);
553 if (code) rCode = code;
558 /* Same as read, only do not even try to read the page */
559 static char *DNew (dbase, fid, page)
560 struct ubik_dbase *dbase;
565 if ((tb = newslot(dbase, fid, page)) == 0) return NULL;
567 memset(tb->data, 0, UBIK_PAGESIZE);
571 /* read data from database */
572 udisk_read(atrans, afile, abuffer, apos, alen)
575 afs_int32 apos, alen;
576 struct ubik_trans *atrans; {
578 afs_int32 offset, len, totalLen;
579 struct ubik_dbase *dbase;
581 if (atrans->flags & TRDONE) return UDONE;
583 dbase = atrans->dbase;
585 bp = DRead(dbase, afile, apos>>UBIK_LOGPAGESIZE);
586 if (!bp) return UEOF;
587 /* otherwise, min of remaining bytes and end of buffer to user mode */
588 offset = apos & (UBIK_PAGESIZE-1);
589 len = UBIK_PAGESIZE - offset;
590 if (len > alen) len = alen;
591 memcpy(abuffer, bp+offset, len);
602 udisk_truncate(atrans, afile, alength)
603 struct ubik_trans *atrans;
607 struct ubik_trunc *tt;
609 if (atrans->flags & TRDONE) return UDONE;
610 if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
612 /* write a truncate log record */
613 code = udisk_LogTruncate(atrans->dbase, afile, alength);
615 /* don't truncate until commit time */
616 tt = FindTrunc(atrans, afile);
618 /* this file not truncated yet */
620 tt->next = atrans->activeTruncs;
621 atrans->activeTruncs = tt;
623 tt->length = alength;
626 /* already truncated to a certain length */
627 if (tt->length > alength) tt->length = alength;
632 /* write data to database, using logs */
633 udisk_write(atrans, afile, abuffer, apos, alen)
636 afs_int32 apos, alen;
637 struct ubik_trans *atrans; {
639 afs_int32 offset, len, totalLen;
640 struct ubik_dbase *dbase;
641 struct ubik_trunc *tt;
644 if (atrans->flags & TRDONE) return UDONE;
645 if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
647 dbase = atrans->dbase;
648 /* first write the data to the log */
649 code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
650 if (code) return code;
652 /* expand any truncations of this file */
653 tt = FindTrunc(atrans, afile);
655 if (tt->length < apos + alen) {
656 tt->length = apos + alen;
663 bp = DRead(dbase, afile, apos>>UBIK_LOGPAGESIZE);
665 bp = DNew(dbase, afile, apos>>UBIK_LOGPAGESIZE);
666 if (!bp) return UIOERROR;
667 memset(bp, 0, UBIK_PAGESIZE);
669 /* otherwise, min of remaining bytes and end of buffer to user mode */
670 offset = apos & (UBIK_PAGESIZE-1);
671 len = UBIK_PAGESIZE-offset;
672 if (len > alen) len = alen;
673 memcpy(bp+offset, abuffer, len);
678 DRelease(bp, 1); /* buffer modified */
683 /* begin a new local transaction */
684 udisk_begin(adbase, atype, atrans)
685 struct ubik_trans **atrans;
687 struct ubik_dbase *adbase; {
689 struct ubik_trans *tt;
691 *atrans = (struct ubik_trans *)NULL;
692 /* Make sure system is initialized before doing anything */
695 DInit(ubik_nBuffers);
697 if (atype == UBIK_WRITETRANS) {
698 if (adbase->flags & DBWRITING) return USYNC;
699 code = udisk_LogOpcode(adbase, LOGNEW, 0);
700 if (code) return code;
702 tt = (struct ubik_trans *) malloc(sizeof(struct ubik_trans));
703 memset(tt, 0, sizeof(struct ubik_trans));
705 tt->next = adbase->activeTrans;
706 adbase->activeTrans = tt;
708 if (atype == UBIK_READTRANS) adbase->readers++;
709 else if (atype == UBIK_WRITETRANS) adbase->flags |= DBWRITING;
714 /* commit transaction */
716 struct ubik_trans *atrans; {
717 struct ubik_dbase *dbase;
719 struct ubik_version oldversion, newversion;
721 if (atrans->flags & TRDONE)
724 if (atrans->type == UBIK_WRITETRANS) {
725 dbase = atrans->dbase;
727 /* On the first write to the database, we update the versions */
728 if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
729 oldversion = dbase->version;
730 newversion.epoch = FT_ApproxTime();;
731 newversion.counter = 1;
733 code = (*dbase->setlabel)(dbase, 0, &newversion);
734 if (code) return(code);
735 ubik_epochTime = newversion.epoch;
736 dbase->version = newversion;
738 /* Ignore the error here. If the call fails, the site is
739 * marked down and when we detect it is up again, we will
740 * send the entire database to it.
742 ContactQuorum(DISK_SetVersion, atrans, 1/*CStampVersion*/,
743 &oldversion, &newversion);
744 urecovery_state |= UBIK_RECLABELDB;
747 dbase->version.counter++; /* bump commit count */
748 LWP_NoYieldSignal(&dbase->version);
750 code = udisk_LogEnd(dbase, &dbase->version);
752 dbase->version.counter--;
756 /* If we fail anytime after this, then panic and let the
757 * recovery replay the log.
759 code = DFlush(dbase); /* write dirty pages to respective files */
760 if (code) panic("Writing Ubik DB modifications\n");
761 code = DSync(dbase); /* sync the files and mark pages not dirty */
762 if (code) panic("Synchronizing Ubik DB modifications\n");
764 code = DoTruncs(atrans); /* Perform requested truncations */
765 if (code) panic("Truncating Ubik DB\n");
767 /* label the committed dbase */
768 code = (*dbase->setlabel)(dbase, 0, &dbase->version);
769 if (code) panic("Truncating Ubik DB\n");
771 code = (*dbase->truncate)(dbase, LOGFILE, 0); /* discard log (optional) */
772 if (code) panic("Truncating Ubik logfile\n");
776 /* When the transaction is marked done, it also means the logfile
777 * has been truncated.
779 atrans->flags |= TRDONE;
783 /* abort transaction */
785 struct ubik_trans *atrans;
787 struct ubik_dbase *dbase;
790 if (atrans->flags & TRDONE)
793 /* Check if we are the write trans before logging abort, lest we
794 * abort a good write trans in progress.
795 * We don't really care if the LOGABORT gets to the log because we
796 * truncate the log next. If the truncate fails, we panic, because
797 * otherwise the log entries would remain. On restart, replay of the log
798 * will do nothing because the abort is there or no LogEnd opcode.
800 dbase = atrans->dbase;
801 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
802 udisk_LogOpcode(dbase, LOGABORT, 1);
803 code = (*dbase->truncate)(dbase, LOGFILE, 0);
804 if (code) panic("Truncating Ubik logfile during an abort\n");
805 DAbort(dbase); /* remove all dirty pages */
808 /* When the transaction is marked done, it also means the logfile
809 * has been truncated.
811 atrans->flags |= (TRABORT | TRDONE);
815 /* destroy a transaction after it has been committed or aborted. if
816 * it hasn't committed before you call this routine, we'll abort the
817 * transaction for you.
820 struct ubik_trans *atrans; {
821 struct ubik_dbase *dbase;
823 #if defined(UBIK_PAUSE)
824 /* Another thread is trying to lock this transaction.
825 * That can only be an RPC doing SDISK_Lock.
826 * Unlock the transaction, 'cause otherwise the other
827 * thread will never wake up. Don't free it because
828 * the caller will do that already.
830 if (atrans->flags & TRSETLOCK) {
831 atrans->flags |= TRSTALE;
832 ulock_relLock(atrans);
835 #endif /* UBIK_PAUSE */
836 if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
837 dbase = atrans->dbase;
839 ulock_relLock(atrans);
842 /* check if we are the write trans before unsetting the DBWRITING bit, else
843 * we could be unsetting someone else's bit.
845 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
846 dbase->flags &= ~DBWRITING;
850 if (atrans->iovec_info.iovec_wrt_val) free(atrans->iovec_info.iovec_wrt_val);
851 if (atrans->iovec_data.iovec_buf_val) free(atrans->iovec_data.iovec_buf_val);
854 /* Wakeup any writers waiting in BeginTrans() */
855 LWP_NoYieldSignal(&dbase->flags);