2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afs/param.h>
11 #include <afsconfig.h>
15 #include <sys/types.h>
20 #include <netinet/in.h>
28 #define UBIK_INTERNALS
34 struct ubik_dbase *dbase; /* dbase within which the buffer resides */
35 afs_int32 file; /* Unique cache key */
36 afs_int32 page; /* page number */
37 struct buffer *lru_next;
38 struct buffer *lru_prev;
39 struct buffer *hashNext; /* next dude in hash table */
40 char *data; /* ptr to the data */
41 char lockers; /* usage ref count */
42 char dirty; /* is buffer modified */
43 char hashIndex; /* back ptr to hash table */
46 #define pHash(page) ((page) & (PHSIZE-1))
48 afs_int32 ubik_nBuffers = NBUFFERS;
49 static struct buffer *phTable[PHSIZE]; /* page hash table */
50 static struct buffer *LruBuffer;
52 static int calls=0, ios=0, lastb=0;
53 static char *BufferData;
54 static struct buffer *newslot();
56 #define BADFID 0xffffffff
60 static struct ubik_trunc *freeTruncList=0;
62 /* remove a transaction from the database's active transaction list. Don't free it */
63 static unthread(atrans)
64 struct ubik_trans *atrans; {
65 struct ubik_trans **lt, *tt;
66 lt = &atrans->dbase->activeTrans;
67 for(tt = *lt; tt; lt = &tt->next, tt = *lt) {
74 return 2; /* no entry */
77 /* some debugging assistance */
79 struct ubik_debug *aparm; {
83 bcopy(&ubik_dbase->version, &aparm->localVersion, sizeof(struct ubik_version));
84 aparm->lockedPages = 0;
85 aparm->writeLockedPages = 0;
87 for(i=0;i<nbuffers;i++, tb++) {
90 if (tb->dirty) aparm->writeLockedPages++;
95 /* log format is defined here, and implicitly in recovery.c
97 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
98 * are logged in network standard byte order, in case we want to move logs
99 * from machine-to-machine someday.
101 * Begin transaction: opcode
102 * Commit transaction: opcode, version (8 bytes)
103 * Truncate file: opcode, file number, length
104 * Abort transaction: opcode
105 * Write data: opcode, file, position, length, <length> data bytes
108 /* write an opcode to the log; the log is synced afterwards only if async is nonzero */
109 udisk_LogOpcode(adbase, aopcode, async)
110 struct ubik_dbase *adbase;
113 struct ubik_stat ustat;
116 /* figure out where to write */
117 code = (*adbase->stat)(adbase, LOGFILE, &ustat);
118 if (code < 0) return code;
120 /* setup data and do write */
121 aopcode = htonl(aopcode);
122 code = (*adbase->write)(adbase, LOGFILE, &aopcode, ustat.size, sizeof(afs_int32));
123 if (code != sizeof(afs_int32)) return UIOERROR;
125 /* optionally sync data */
126 if (async) code = (*adbase->sync)(adbase, LOGFILE);
131 /* log a commit, then sync the log */
132 udisk_LogEnd(adbase, aversion)
133 struct ubik_dbase *adbase;
134 struct ubik_version *aversion; {
137 struct ubik_stat ustat;
139 /* figure out where to write */
140 code = (*adbase->stat)(adbase, LOGFILE, &ustat);
141 if (code) return code;
144 data[0] = htonl(LOGEND);
145 data[1] = htonl(aversion->epoch);
146 data[2] = htonl(aversion->counter);
149 code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
150 if (code != 3*sizeof(afs_int32)) return UIOERROR;
152 /* finally sync the log */
153 code = (*adbase->sync)(adbase, LOGFILE);
157 /* log a truncate operation, never syncing */
158 udisk_LogTruncate(adbase, afile, alength)
159 struct ubik_dbase *adbase;
160 afs_int32 afile, alength; {
163 struct ubik_stat ustat;
165 /* figure out where to write */
166 code = (*adbase->stat)(adbase, LOGFILE, &ustat);
167 if (code < 0) return code;
170 data[0] = htonl(LOGTRUNCATE);
171 data[1] = htonl(afile);
172 data[2] = htonl(alength);
175 code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
176 if (code != 3*sizeof(afs_int32)) return UIOERROR;
180 /* write some data to the log, never syncing */
181 udisk_LogWriteData(adbase, afile, abuffer, apos, alen)
182 struct ubik_dbase *adbase;
187 struct ubik_stat ustat;
192 /* find end of log */
193 code = (*adbase->stat)(adbase, LOGFILE, &ustat);
195 if (code < 0) return code;
198 data[0] = htonl(LOGDATA);
199 data[1] = htonl(afile);
200 data[2] = htonl(apos);
201 data[3] = htonl(alen);
204 code = (*adbase->write)(adbase, LOGFILE, data, lpos, 4*sizeof(afs_int32));
205 if (code != 4*sizeof(afs_int32)) return UIOERROR;
206 lpos += 4*sizeof(afs_int32);
209 code = (*adbase->write)(adbase, LOGFILE, abuffer, lpos, alen);
210 if (code != alen) return UIOERROR;
214 static int DInit (abuffers)
216 /* Initialize the ubik buffer system. */
219 Buffers = (struct buffer *) malloc(abuffers * sizeof(struct buffer));
220 bzero(Buffers, abuffers * sizeof(struct buffer));
221 BufferData = (char *) malloc(abuffers * PAGESIZE);
223 for(i=0;i<PHSIZE;i++) phTable[i] = 0;
224 for (i=0;i<abuffers;i++) {
225 /* Fill in each buffer with an empty indication. */
227 tb->lru_next = &(Buffers[i+1]);
228 tb->lru_prev = &(Buffers[i-1]);
229 tb->data = &BufferData[PAGESIZE*i];
232 Buffers[0].lru_prev = &(Buffers[abuffers-1]);
233 Buffers[abuffers-1].lru_next = &(Buffers[0]);
234 LruBuffer = &(Buffers[0]);
238 /* Take a buffer and mark it as the least recently used buffer */
239 static int Dlru(abuf)
242 if (LruBuffer == abuf)
245 /* Unthread from where it is in the list */
246 abuf->lru_next->lru_prev = abuf->lru_prev;
247 abuf->lru_prev->lru_next = abuf->lru_next;
249 /* Thread onto beginning of LRU list */
250 abuf->lru_next = LruBuffer;
251 abuf->lru_prev = LruBuffer->lru_prev;
253 LruBuffer->lru_prev->lru_next = abuf;
254 LruBuffer->lru_prev = abuf;
258 /* Take a buffer and mark it as the most recently used buffer */
259 static int Dmru(abuf)
262 if (LruBuffer == abuf) {
263 LruBuffer = LruBuffer->lru_next;
267 /* Unthread from where it is in the list */
268 abuf->lru_next->lru_prev = abuf->lru_prev;
269 abuf->lru_prev->lru_next = abuf->lru_next;
271 /* Thread onto end of LRU list - making it the MRU buffer */
272 abuf->lru_next = LruBuffer;
273 abuf->lru_prev = LruBuffer->lru_prev;
274 LruBuffer->lru_prev->lru_next = abuf;
275 LruBuffer->lru_prev = abuf;
279 /* get a pointer to a particular buffer */
280 static char *DRead(dbase, fid, page)
281 struct ubik_dbase *dbase;
284 /* Read a page from the disk. */
285 struct buffer *tb, *lastbuffer;
286 afs_int32 trys, code;
289 lastbuffer = LruBuffer->lru_prev;
291 if ((lastbuffer->page == page ) &&
292 (lastbuffer->file == fid ) &&
293 (lastbuffer->dbase == dbase)) {
299 for(tb=phTable[pHash(page)]; tb; tb=tb->hashNext) {
300 if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
307 tb = newslot(dbase, fid, page);
309 bzero(tb->data, PAGESIZE);
312 code = (*dbase->read)(dbase, fid, tb->data, page*PAGESIZE, PAGESIZE);
317 ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
322 /* Note that newslot sets the page field in the buffer equal to
323 * what it is searching for.
328 /* zap truncated pages */
329 static DTrunc(dbase, fid, length)
330 struct ubik_dbase *dbase;
337 maxPage = (length+PAGESIZE-1)>>LOGPAGESIZE; /* first invalid page now in file */
338 for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
339 if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
347 /* allocate a truncation entry. We allocate special entries representing truncations, rather than
348 performing them immediately, so that we can abort a transaction easily by simply purging
349 the in-core memory buffers and discarding these truncation entries.
351 static struct ubik_trunc *GetTrunc() {
352 struct ubik_trunc *tt;
353 if (!freeTruncList) {
354 freeTruncList = (struct ubik_trunc *) malloc(sizeof(struct ubik_trunc));
355 freeTruncList->next = (struct ubik_trunc *) 0;
358 freeTruncList = tt->next;
362 /* free a truncation entry */
364 struct ubik_trunc *at; {
365 at->next = freeTruncList;
370 /* find a truncation entry for a file, if any */
371 static struct ubik_trunc *FindTrunc(atrans, afile)
372 struct ubik_trans *atrans;
374 struct ubik_trunc *tt;
375 for(tt=atrans->activeTruncs; tt; tt=tt->next) {
376 if (tt->file == afile) return tt;
378 return (struct ubik_trunc *) 0;
381 /* do truncates associated with trans, and free them */
382 static DoTruncs(atrans)
383 struct ubik_trans *atrans; {
384 struct ubik_trunc *tt, *nt;
386 afs_int32 rcode=0, code;
388 tproc = atrans->dbase->truncate;
389 for(tt = atrans->activeTruncs; tt; tt=nt) {
391 DTrunc(atrans->dbase, tt->file, tt->length); /* zap pages from buffer cache */
392 code = (*tproc)(atrans->dbase, tt->file, tt->length);
393 if (code) rcode = code;
396 /* don't unthread, because we do the entire list's worth here */
397 atrans->activeTruncs = (struct ubik_trunc *) 0;
401 /* mark a fid as invalid */
402 udisk_Invalidate(adbase, afid)
403 struct ubik_dbase *adbase;
408 for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
409 if (tb->file == afid) {
417 /* move this page into the correct hash bucket */
418 static FixupBucket(ap)
420 struct buffer **lp, *tp;
422 /* first try to get it out of its current hash bucket, in which it might not be */
425 for(tp = *lp; tp; tp=tp->hashNext) {
432 /* now figure the new hash bucket */
434 ap->hashIndex = i; /* remember where we are for deletion */
435 ap->hashNext = phTable[i]; /* add us to the list */
439 /* create a new slot for a particular dbase page */
440 static struct buffer *newslot (adbase, afid, apage)
441 struct ubik_dbase *adbase;
442 afs_int32 afid, apage; {
443 /* Find a usable buffer slot */
445 struct buffer *pp, *tp;
447 pp = 0; /* last pure */
448 for (i=0,tp=LruBuffer; i<nbuffers; i++,tp=tp->lru_next) {
449 if (!tp->lockers && !tp->dirty) {
456 /* There are no unlocked buffers that don't need to be written to the disk. */
457 ubik_print("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
458 return (struct buffer *) 0;
461 /* Now fill in the header. */
466 FixupBucket(pp); /* move to the right hash bucket */
471 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
472 static DRelease (ap,flag)
479 index = (ap - (char *)BufferData) >> LOGPAGESIZE;
480 bp = &(Buffers[index]);
482 if (flag) bp->dirty=1;
486 /* flush all modified buffers, leaves dirty bits set (they're cleared
487 * by DSync). Note interaction with DSync: you call this thing first,
488 * writing the buffers to the disk. Then you call DSync to sync all the
489 * files that were written, and to clear the dirty bits. You should
490 * always call DFlush/DSync as a pair.
492 static DFlush (adbase)
493 struct ubik_dbase *adbase; {
499 for(i=0;i<nbuffers;i++,tb++) {
501 code = tb->page * PAGESIZE; /* offset within file */
502 code = (*adbase->write)(adbase, tb->file, tb->data, code, PAGESIZE);
503 if (code != PAGESIZE) return UIOERROR;
509 /* remove all dirty pages (called when a write transaction is aborted) */
510 static DAbort (adbase)
511 struct ubik_dbase *adbase; {
516 for(i=0;i<nbuffers;i++,tb++) {
526 /* must only be called after DFlush, due to its interpretation of dirty flag */
528 struct ubik_dbase *adbase; {
538 for(i=0,tb = Buffers; i<nbuffers; i++,tb++) {
539 if (tb->dirty == 1) {
540 if (file == BADFID) file = tb->file;
541 if (file != BADFID && tb->file == file) tb->dirty = 0;
544 if (file == BADFID) break;
545 /* otherwise we have a file to sync */
546 code = (*adbase->sync)(adbase, file);
547 if (code) rCode = code;
552 /* Same as read, only do not even try to read the page */
553 static char *DNew (dbase, fid, page)
554 struct ubik_dbase *dbase;
559 if ((tb = newslot(dbase, fid, page)) == 0) return (char *) 0;
561 bzero(tb->data, PAGESIZE);
565 /* read data from database */
566 udisk_read(atrans, afile, abuffer, apos, alen)
569 afs_int32 apos, alen;
570 struct ubik_trans *atrans; {
572 afs_int32 offset, len, totalLen;
573 struct ubik_dbase *dbase;
575 if (atrans->flags & TRDONE) return UDONE;
577 dbase = atrans->dbase;
579 bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
580 if (!bp) return UEOF;
581 /* otherwise, min of remaining bytes and end of buffer to user mode */
582 offset = apos & (PAGESIZE-1);
583 len = PAGESIZE - offset;
584 if (len > alen) len = alen;
585 bcopy(bp+offset, abuffer, len);
596 udisk_truncate(atrans, afile, alength)
597 struct ubik_trans *atrans;
601 struct ubik_trunc *tt;
603 if (atrans->flags & TRDONE) return UDONE;
604 if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
606 /* write a truncate log record */
607 code = udisk_LogTruncate(atrans->dbase, afile, alength);
609 /* don't truncate until commit time */
610 tt = FindTrunc(atrans, afile);
612 /* this file not truncated yet */
614 tt->next = atrans->activeTruncs;
615 atrans->activeTruncs = tt;
617 tt->length = alength;
620 /* already truncated to a certain length */
621 if (tt->length > alength) tt->length = alength;
626 /* write data to database, using logs */
627 udisk_write(atrans, afile, abuffer, apos, alen)
630 afs_int32 apos, alen;
631 struct ubik_trans *atrans; {
633 afs_int32 offset, len, totalLen;
634 struct ubik_dbase *dbase;
635 struct ubik_trunc *tt;
638 if (atrans->flags & TRDONE) return UDONE;
639 if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
641 dbase = atrans->dbase;
642 /* first write the data to the log */
643 code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
644 if (code) return code;
646 /* expand any truncations of this file */
647 tt = FindTrunc(atrans, afile);
649 if (tt->length < apos + alen) {
650 tt->length = apos + alen;
657 bp = DRead(dbase, afile, apos>>LOGPAGESIZE);
659 bp = DNew(dbase, afile, apos>>LOGPAGESIZE);
660 if (!bp) return UIOERROR;
663 /* otherwise, min of remaining bytes and end of buffer to user mode */
664 offset = apos & (PAGESIZE-1);
665 len = PAGESIZE-offset;
666 if (len > alen) len = alen;
667 bcopy(abuffer, bp+offset, len);
672 DRelease(bp, 1); /* buffer modified */
677 /* begin a new local transaction */
678 udisk_begin(adbase, atype, atrans)
679 struct ubik_trans **atrans;
681 struct ubik_dbase *adbase; {
683 struct ubik_trans *tt;
685 *atrans = (struct ubik_trans *)NULL;
686 /* Make sure system is initialized before doing anything */
689 DInit(ubik_nBuffers);
691 if (atype == UBIK_WRITETRANS) {
692 if (adbase->flags & DBWRITING) return USYNC;
693 code = udisk_LogOpcode(adbase, LOGNEW, 0);
694 if (code) return code;
696 tt = (struct ubik_trans *) malloc(sizeof(struct ubik_trans));
697 bzero(tt, sizeof(struct ubik_trans));
699 tt->next = adbase->activeTrans;
700 adbase->activeTrans = tt;
702 if (atype == UBIK_READTRANS) adbase->readers++;
703 else if (atype == UBIK_WRITETRANS) adbase->flags |= DBWRITING;
708 /* commit transaction */
710 struct ubik_trans *atrans; {
711 struct ubik_dbase *dbase;
713 struct ubik_version oldversion, newversion;
715 if (atrans->flags & TRDONE)
718 if (atrans->type == UBIK_WRITETRANS) {
719 dbase = atrans->dbase;
721 /* On the first write to the database, we update the versions */
722 if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
723 oldversion = dbase->version;
724 newversion.epoch = FT_ApproxTime();;
725 newversion.counter = 1;
727 code = (*dbase->setlabel)(dbase, 0, &newversion);
728 if (code) return(code);
729 ubik_epochTime = newversion.epoch;
730 dbase->version = newversion;
732 /* Ignore the error here. If the call fails, the site is
733 * marked down and when we detect it is up again, we will
734 * send the entire database to it.
736 ContactQuorum(DISK_SetVersion, atrans, 1/*CStampVersion*/,
737 &oldversion, &newversion);
738 urecovery_state |= UBIK_RECLABELDB;
741 dbase->version.counter++; /* bump commit count */
742 LWP_NoYieldSignal(&dbase->version);
744 code = udisk_LogEnd(dbase, &dbase->version);
746 dbase->version.counter--;
750 /* If we fail anytime after this, then panic and let the
751 * recovery replay the log.
753 code = DFlush(dbase); /* write dirty pages to respective files */
754 if (code) panic("Writing Ubik DB modifications\n");
755 code = DSync(dbase); /* sync the files and mark pages not dirty */
756 if (code) panic("Synchronizing Ubik DB modifications\n");
758 code = DoTruncs(atrans); /* Perform requested truncations */
759 if (code) panic("Truncating Ubik DB\n");
761 /* label the committed dbase */
762 code = (*dbase->setlabel)(dbase, 0, &dbase->version);
763 if (code) panic("Truncating Ubik DB\n");
765 code = (*dbase->truncate)(dbase, LOGFILE, 0); /* discard log (optional) */
766 if (code) panic("Truncating Ubik logfile\n");
770 /* When the transaction is marked done, it also means the logfile
771 * has been truncated.
773 atrans->flags |= TRDONE;
777 /* abort transaction */
779 struct ubik_trans *atrans;
781 struct ubik_dbase *dbase;
784 if (atrans->flags & TRDONE)
787 /* Check if we are the write trans before logging abort, lest we
788 * abort a good write trans in progress.
789 * We don't really care if the LOGABORT gets to the log because we
790 * truncate the log next. If the truncate fails, we panic, because
791 * otherwise the log entries would remain. On restart, replay of the log
792 * will do nothing because the abort record is there or there is no LogEnd opcode.
794 dbase = atrans->dbase;
795 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
796 udisk_LogOpcode(dbase, LOGABORT, 1);
797 code = (*dbase->truncate)(dbase, LOGFILE, 0);
798 if (code) panic("Truncating Ubik logfile during an abort\n");
799 DAbort(dbase); /* remove all dirty pages */
802 /* When the transaction is marked done, it also means the logfile
803 * has been truncated.
805 atrans->flags |= (TRABORT | TRDONE);
809 /* destroy a transaction after it has been committed or aborted. If
810 * it hasn't been marked done before you call this routine, we'll abort the
811 * transaction for you.
814 struct ubik_trans *atrans; {
815 struct ubik_dbase *dbase;
817 if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
818 dbase = atrans->dbase;
820 ulock_relLock(atrans);
823 /* check if we are the write trans before unsetting the DBWRITING bit, else
824 * we could be unsetting someone else's bit.
826 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
827 dbase->flags &= ~DBWRITING;
831 if (atrans->iovec_info.iovec_wrt_val) free(atrans->iovec_info.iovec_wrt_val);
832 if (atrans->iovec_data.iovec_buf_val) free(atrans->iovec_data.iovec_buf_val);
835 /* Wakeup any writers waiting in BeginTrans() */
836 LWP_NoYieldSignal(&dbase->flags);