2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <sys/types.h>
21 #include <netinet/in.h>
30 #define UBIK_INTERNALS
/*
 * In-core descriptor for one cached database page.  A buffer is identified
 * by the (dbase, file, page) triple, is linked into the LRU ring through
 * lru_next/lru_prev and into a page-hash chain through hashNext.
 * NOTE(review): lockers, dirty and hashIndex are plain char, whose signedness
 * is implementation-defined -- confirm the lock count and every hash index
 * always fit in the non-negative range of char.
 */
35 static struct buffer {
36 struct ubik_dbase *dbase; /* dbase within which the buffer resides */
37 afs_int32 file; /* file id; together with dbase and page forms the cache key */
38 afs_int32 page; /* page number */
39 struct buffer *lru_next; /* next buffer on the LRU ring */
40 struct buffer *lru_prev; /* previous buffer on the LRU ring */
41 struct buffer *hashNext; /* next buffer on the same hash chain */
42 char *data; /* ptr to the data */
43 char lockers; /* usage ref count */
44 char dirty; /* is buffer modified */
45 char hashIndex; /* back ptr to hash table */
/* Map a page number to a phTable bucket; assumes PHSIZE is a power of two -- TODO confirm. */
48 #define pHash(page) ((page) & (PHSIZE-1))
/* Buffer count that udisk_begin() hands to DInit(); initialised to NBUFFERS. */
50 afs_int32 ubik_nBuffers = NBUFFERS;
51 static struct buffer *phTable[PHSIZE]; /* page hash table */
/* Entry point into the circular LRU list: the least recently used buffer. */
52 static struct buffer *LruBuffer;
/* NOTE(review): presumably statistics counters; their use is not shown here -- verify. */
54 static int calls = 0, ios = 0, lastb = 0;
/* One allocation holding the page data of every buffer, UBIK_PAGESIZE bytes each. */
55 static char *BufferData;
56 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
/*
 * Sentinel file id; DSync() compares its current file against it.
 * NOTE(review): 0xffffffff is an unsigned constant while buffer file ids are
 * afs_int32, so comparisons rely on the usual arithmetic conversions.
 */
59 #define BADFID 0xffffffff
/*
 * Forward declaration of DTrunc(), used by DoTruncs() to drop pages.
 * NOTE(review): no return type is spelled out here (implicit int).
 */
61 static DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length);
/* Free list of recycled truncation entries, chained through ->next. */
63 static struct ubik_trunc *freeTruncList = 0;
65 /* remove a transaction from the database's active transaction list. Don't free it */
/*
 * Walks atrans->dbase->activeTrans while keeping lt aimed at the link that
 * points to tt (presumably so a match can be unlinked by storing through
 * *lt -- the loop body should confirm).
 * Returns 2 when the end of the list is reached without a match.
 */
67 unthread(struct ubik_trans *atrans)
69 struct ubik_trans **lt, *tt;
70 lt = &atrans->dbase->activeTrans;
71 for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
78 return 2; /* no entry */
81 /* some debugging assistance */
/*
 * Fills *aparm with a snapshot of local state: copies ubik_dbase->version
 * into aparm->localVersion, zeroes the lockedPages and writeLockedPages
 * counters, then walks all nbuffers buffers and counts write-locked pages
 * (and presumably locked pages as well -- the tests are not shown here).
 */
83 udisk_Debug(struct ubik_debug *aparm)
88 memcpy(&aparm->localVersion, &ubik_dbase->version,
89 sizeof(struct ubik_version));
90 aparm->lockedPages = 0;
91 aparm->writeLockedPages = 0;
93 for (i = 0; i < nbuffers; i++, tb++) {
97 aparm->writeLockedPages++;
103 /* log format is defined here, and implicitly in recovery.c
105 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
106 * are logged in network standard byte order, in case we want to move logs
107 * from machine-to-machine someday.
109 * Begin transaction: opcode
110 * Commit transaction: opcode, version (8 bytes)
111 * Truncate file: opcode, file number, length
112 * Abort transaction: opcode
113 * Write data: opcode, file, position, length, <length> data bytes
116 /* write an opcode to the log */
/*
 * Appends one 4-byte opcode record to LOGFILE.
 *   adbase  - database whose stat/write/sync methods are used
 *   aopcode - opcode in host order; converted with htonl() before writing
 *   async   - presumably selects whether the log is synced afterwards
 *             (the test guarding the sync call is not shown -- confirm)
 * The write is treated as successful only if exactly sizeof(afs_int32)
 * bytes were written.
 */
118 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
120 struct ubik_stat ustat;
123 /* figure out where to write */
/* the current size of the log is the append offset */
124 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
128 /* setup data and do write */
129 aopcode = htonl(aopcode);
131 (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
133 if (code != sizeof(afs_int32))
136 /* optionally sync data */
138 code = (*adbase->sync) (adbase, LOGFILE);
144 /* log a commit, then sync the log */
/*
 * Appends a three-word commit record to LOGFILE -- LOGEND, version epoch,
 * version counter, each converted with htonl() -- at the current end of the
 * log, and then calls the database's sync method on LOGFILE.
 * The write is treated as successful only if exactly 3 * sizeof(afs_int32)
 * bytes were written.
 */
146 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
150 struct ubik_stat ustat;
152 /* figure out where to write */
153 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
158 data[0] = htonl(LOGEND);
159 data[1] = htonl(aversion->epoch);
160 data[2] = htonl(aversion->counter);
164 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
165 3 * sizeof(afs_int32));
166 if (code != 3 * sizeof(afs_int32))
169 /* finally sync the log */
170 code = (*adbase->sync) (adbase, LOGFILE);
174 /* log a truncate operation, never syncing */
/*
 * Appends a three-word record to LOGFILE -- LOGTRUNCATE, file number, new
 * length, each converted with htonl() -- at the current end of the log.
 * The write is treated as successful only if exactly 3 * sizeof(afs_int32)
 * bytes were written.
 */
176 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
181 struct ubik_stat ustat;
183 /* figure out where to write */
184 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
189 data[0] = htonl(LOGTRUNCATE);
190 data[1] = htonl(afile);
191 data[2] = htonl(alength);
195 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
196 3 * sizeof(afs_int32));
197 if (code != 3 * sizeof(afs_int32))
202 /* write some data to the log, never syncing */
/*
 * Appends a data record to LOGFILE in two writes: a four-word header
 * (LOGDATA, file, position, length, each converted with htonl()) followed by
 * the alen payload bytes from abuffer.
 *   afile, apos, alen - target file, byte position and length being logged
 *   abuffer           - payload, written to the log unmodified
 */
204 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, char *abuffer,
205 afs_int32 apos, afs_int32 alen)
207 struct ubik_stat ustat;
212 /* find end of log */
213 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
219 data[0] = htonl(LOGDATA);
220 data[1] = htonl(afile);
221 data[2] = htonl(apos);
222 data[3] = htonl(alen);
226 (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
227 if (code != 4 * sizeof(afs_int32))
/* the payload goes immediately after the header */
229 lpos += 4 * sizeof(afs_int32);
232 code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
241 /* Initialize the buffer system with abuffers page buffers. */
/*
 * Allocates the buffer descriptors (zero-filled) and one contiguous data
 * area of abuffers * UBIK_PAGESIZE bytes, then links the descriptors into a
 * circular doubly linked LRU list headed by Buffers[0].
 * NOTE(review): no NULL check on either malloc() result is visible before
 * the memory is used -- confirm allocation failure is handled.
 */
244 Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
245 memset(Buffers, 0, abuffers * sizeof(struct buffer));
246 BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
248 for (i = 0; i < PHSIZE; i++)
250 for (i = 0; i < abuffers; i++) {
251 /* Fill in each buffer with an empty indication. */
/*
 * For i == 0 and i == abuffers - 1 one of these neighbours lies outside the
 * array; both are overwritten right after the loop to close the ring.
 */
253 tb->lru_next = &(Buffers[i + 1]);
254 tb->lru_prev = &(Buffers[i - 1]);
/* each buffer owns the i-th UBIK_PAGESIZE slice of BufferData */
255 tb->data = &BufferData[UBIK_PAGESIZE * i];
258 Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
259 Buffers[abuffers - 1].lru_next = &(Buffers[0]);
260 LruBuffer = &(Buffers[0]);
264 /* Take a buffer and mark it as the least recently used buffer */
/*
 * The first test handles abuf already being LruBuffer.  Otherwise abuf is
 * unlinked from the ring and re-inserted immediately in front of the
 * current LruBuffer; LruBuffer is presumably then pointed at abuf (that
 * assignment is not shown here -- confirm).
 */
266 Dlru(struct buffer *abuf)
268 if (LruBuffer == abuf)
271 /* Unthread from where it is in the list */
272 abuf->lru_next->lru_prev = abuf->lru_prev;
273 abuf->lru_prev->lru_next = abuf->lru_next;
275 /* Thread onto beginning of LRU list */
276 abuf->lru_next = LruBuffer;
277 abuf->lru_prev = LruBuffer->lru_prev;
279 LruBuffer->lru_prev->lru_next = abuf;
280 LruBuffer->lru_prev = abuf;
284 /* Take a buffer and mark it as the most recently used buffer */
/*
 * The list is circular, so the buffer just before LruBuffer is the most
 * recently used one.  If abuf is the current LruBuffer, advancing LruBuffer
 * by one is enough to leave abuf in that position; otherwise abuf is
 * unlinked and re-inserted immediately before LruBuffer.
 */
286 Dmru(struct buffer *abuf)
288 if (LruBuffer == abuf) {
289 LruBuffer = LruBuffer->lru_next;
293 /* Unthread from where it is in the list */
294 abuf->lru_next->lru_prev = abuf->lru_prev;
295 abuf->lru_prev->lru_next = abuf->lru_next;
297 /* Thread onto end of LRU list - making it the MRU buffer */
298 abuf->lru_next = LruBuffer;
299 abuf->lru_prev = LruBuffer->lru_prev;
300 LruBuffer->lru_prev->lru_next = abuf;
301 LruBuffer->lru_prev = abuf;
304 /* get a pointer to a particular buffer */
/*
 * Looks up page `page` of file `fid` in `dbase`, in three steps:
 *   1. compare against the most recently used buffer (LruBuffer->lru_prev);
 *   2. search the hash chain selected by pHash(page);
 *   3. on a miss, take a buffer from newslot(), zero its data and fill it
 *      through the database's read method at byte offset
 *      page * UBIK_PAGESIZE.
 * A read problem is reported with ubik_print(), including errno.
 */
306 DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
308 /* Read a page from the disk. */
309 struct buffer *tb, *lastbuffer;
/* fast path: the most recently used buffer sits just before LruBuffer */
313 lastbuffer = LruBuffer->lru_prev;
315 if ((lastbuffer->page == page) && (lastbuffer->file == fid)
316 && (lastbuffer->dbase == dbase)) {
322 for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
323 if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
/* cache miss: claim a slot and load the page into it */
330 tb = newslot(dbase, fid, page);
333 memset(tb->data, 0, UBIK_PAGESIZE);
337 (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
343 ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
348 /* Note that newslot sets the page field in the buffer equal to
349 * what it is searching for.
354 /* zap truncated pages */
/*
 * Scans every buffer for pages of file `fid` in `dbase` whose page number is
 * at or beyond maxPage, the first page index not covered by a file of
 * `length` bytes (length rounded up to whole pages).  What is done to a
 * matching buffer is not shown here; per the comment above it is discarded.
 */
356 DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length)
362 maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
363 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
364 if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
372 /* allocate a truncation entry. We allocate special entries representing truncations, rather than
373 performing them immediately, so that we can abort a transaction easily by simply purging
374 the in-core memory buffers and discarding these truncation entries.
/*
 * Hands out a truncation entry taken from freeTruncList.  When the list is
 * empty a new entry is obtained with malloc() first.
 * NOTE(review): the malloc() result appears to be dereferenced
 * (freeTruncList->next) with no NULL check in between -- confirm.
 */
376 static struct ubik_trunc *
379 struct ubik_trunc *tt;
380 if (!freeTruncList) {
382 (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
383 freeTruncList->next = (struct ubik_trunc *)0;
386 freeTruncList = tt->next;
390 /* free a truncation entry */
/*
 * The entry is chained in front of freeTruncList through at->next for later
 * reuse (presumably freeTruncList is then pointed at it -- that assignment
 * is not shown here).
 */
392 PutTrunc(struct ubik_trunc *at)
394 at->next = freeTruncList;
399 /* find a truncation entry for a file, if any */
/*
 * Linear search of atrans->activeTruncs for an entry whose file equals
 * afile.  Returns a null pointer when the list holds no such entry.
 */
400 static struct ubik_trunc *
401 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
403 struct ubik_trunc *tt;
404 for (tt = atrans->activeTruncs; tt; tt = tt->next) {
405 if (tt->file == afile)
408 return (struct ubik_trunc *)0;
411 /* do truncates associated with trans, and free them */
/*
 * For each entry on atrans->activeTruncs: drop the affected pages from the
 * buffer cache with DTrunc(), then call the database's truncate method for
 * the entry's file and length.  The list head is cleared once at the end
 * instead of unlinking entries one by one.
 * rcode starts at 0; presumably it records a failing truncate code and is
 * the return value -- confirm.
 */
413 DoTruncs(struct ubik_trans *atrans)
415 struct ubik_trunc *tt, *nt;
417 afs_int32 rcode = 0, code;
419 tproc = atrans->dbase->truncate;
420 for (tt = atrans->activeTruncs; tt; tt = nt) {
422 DTrunc(atrans->dbase, tt->file, tt->length); /* zap pages from buffer cache */
423 code = (*tproc) (atrans->dbase, tt->file, tt->length);
428 /* don't unthread, because we do the entire list's worth here */
429 atrans->activeTruncs = (struct ubik_trunc *)0;
433 /* mark a fid as invalid */
/*
 * Scans every buffer for pages belonging to file afid.
 * NOTE(review): the visible test matches on tb->file only and does not
 * compare tb->dbase with adbase -- confirm that is intended.
 */
435 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
440 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
441 if (tb->file == afid) {
449 /* move this page into the correct hash bucket */
/*
 * Two phases: first search a hash chain for ap so it can be taken out of its
 * old bucket (it may not be on any chain), then compute the new bucket index
 * i, record it in ap->hashIndex and chain ap in front of phTable[i].
 * Presumably phTable[i] is then set to ap -- that store is not shown here.
 */
451 FixupBucket(struct buffer *ap)
453 struct buffer **lp, *tp;
455 /* first try to get it out of its current hash bucket, in which it might not be */
458 for (tp = *lp; tp; tp = tp->hashNext) {
465 /* now figure the new hash bucket */
467 ap->hashIndex = i; /* remember where we are for deletion */
468 ap->hashNext = phTable[i]; /* add us to the list */
473 /* create a new slot for a particular dbase page */
/*
 * Walks the LRU ring from LruBuffer (least recently used first) looking for
 * a buffer that is neither locked nor dirty.  If none qualifies, an internal
 * error message is emitted.  A chosen buffer is re-labelled for the
 * requested page and moved to its hash bucket with FixupBucket().
 * DNew() tests the result against 0, so a null return is possible.
 */
474 static struct buffer *
475 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
477 /* Find a usable buffer slot */
479 struct buffer *pp, *tp;
481 pp = 0; /* last pure */
482 for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
483 if (!tp->lockers && !tp->dirty) {
490 /* There are no unlocked buffers that don't need to be written to the disk. */
492 ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
496 /* Now fill in the header. */
501 FixupBucket(pp); /* move to the right hash bucket */
506 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
/*
 * ap is a pointer into BufferData, i.e. the page data of some buffer.
 * The owning descriptor is recovered from the byte offset of ap within
 * BufferData divided by the page size (shift by UBIK_LOGPAGESIZE).
 */
508 DRelease(char *ap, int flag)
515 index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
516 bp = &(Buffers[index]);
523 /* flush all modified buffers, leaves dirty bits set (they're cleared
524 * by DSync). Note interaction with DSync: you call this thing first,
525 * writing the buffers to the disk. Then you call DSync to sync all the
526 * files that were written, and to clear the dirty bits. You should
527 * always call DFlush/DSync as a pair.
/*
 * Walks all nbuffers buffers and writes pages out through the database's
 * write method (which buffers qualify is decided by a test not shown here;
 * presumably the dirty ones).  The variable `code` first holds the byte
 * offset of the page within its file and afterwards the value compared
 * against UBIK_PAGESIZE to detect a short write.
 * NOTE(review): the offset is the product tb->page * UBIK_PAGESIZE --
 * confirm it cannot overflow the type of `code` for large files.
 */
530 DFlush(struct ubik_dbase *adbase)
537 for (i = 0; i < nbuffers; i++, tb++) {
539 code = tb->page * UBIK_PAGESIZE; /* offset within file */
541 (*adbase->write) (adbase, tb->file, tb->data, code,
543 if (code != UBIK_PAGESIZE)
550 /* discard all modified buffers (udisk_abort calls this to remove all dirty pages) */
/* Walks all nbuffers buffers; the per-buffer action is not shown here. */
552 DAbort(struct ubik_dbase *adbase)
558 for (i = 0; i < nbuffers; i++, tb++) {
568 /* must only be called after DFlush, due to its interpretation of dirty flag */
/*
 * Scans the buffers for entries whose dirty flag equals 1, tracking one
 * file id at a time in `file` (BADFID meaning none selected), and calls the
 * database's sync method for that file.
 * Presumably the scan repeats until no dirty buffer is left and dirty flags
 * are cleared along the way -- the surrounding loop is not shown; confirm.
 */
570 DSync(struct ubik_dbase *adbase)
581 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
582 if (tb->dirty == 1) {
585 if (file != BADFID && tb->file == file)
591 /* otherwise we have a file to sync */
592 code = (*adbase->sync) (adbase, file);
599 /* Same as read, only do not even try to read the page */
/*
 * Obtains a buffer for (dbase, fid, page) from newslot() and zero-fills its
 * UBIK_PAGESIZE bytes of data.  A null result from newslot() is detected
 * before the buffer is touched.
 */
601 DNew(struct ubik_dbase *dbase, afs_int32 fid, int page)
605 if ((tb = newslot(dbase, fid, page)) == 0)
608 memset(tb->data, 0, UBIK_PAGESIZE);
612 /* read data from database */
/*
 * Copies data of file afile, starting at byte position apos, into abuffer.
 * The transaction's TRDONE flag is tested first.  Data is fetched one page
 * at a time through DRead(); offset is the position inside the page and len
 * starts as the number of bytes from there to the end of the page
 * (presumably clamped to the bytes still wanted -- the clamp and the
 * enclosing loop are not shown here).
 */
614 udisk_read(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
615 afs_int32 apos, afs_int32 alen)
618 afs_int32 offset, len, totalLen;
619 struct ubik_dbase *dbase;
621 if (atrans->flags & TRDONE)
624 dbase = atrans->dbase;
626 bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
629 /* otherwise, min of remaining bytes and end of buffer to user mode */
630 offset = apos & (UBIK_PAGESIZE - 1);
631 len = UBIK_PAGESIZE - offset;
634 memcpy(abuffer, bp + offset, len);
/*
 * Records a truncation of file afile to alength bytes within a write
 * transaction.  The TRDONE flag and the transaction type are tested first.
 * A truncate record is appended to the log, but the file itself is not
 * truncated here: the request is remembered on atrans->activeTruncs until
 * commit time.  If the file already has an entry, its length is only ever
 * lowered (the smaller of the two lengths is kept).
 */
646 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
649 struct ubik_trunc *tt;
651 if (atrans->flags & TRDONE)
653 if (atrans->type != UBIK_WRITETRANS)
656 /* write a truncate log record */
657 code = udisk_LogTruncate(atrans->dbase, afile, alength);
659 /* don't truncate until commit time */
660 tt = FindTrunc(atrans, afile);
662 /* this file not truncated yet */
664 tt->next = atrans->activeTruncs;
665 atrans->activeTruncs = tt;
667 tt->length = alength;
669 /* already truncated to a certain length */
670 if (tt->length > alength)
671 tt->length = alength;
676 /* write data to database, using logs */
/*
 * Writes alen bytes from abuffer to file afile at byte position apos within
 * a write transaction.  The TRDONE flag and the transaction type are tested
 * first.  The data is logged with udisk_LogWriteData() before any page is
 * modified.  A pending truncation of the same file is grown so that it
 * covers the end of this write.  Pages are then obtained with DRead(), or
 * DNew() as the alternative, patched with memcpy() and released as modified.
 * NOTE(review): tt->length is compared with apos + alen, an afs_int32 sum --
 * confirm callers cannot make it overflow.
 */
678 udisk_write(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
679 afs_int32 apos, afs_int32 alen)
682 afs_int32 offset, len, totalLen;
683 struct ubik_dbase *dbase;
684 struct ubik_trunc *tt;
687 if (atrans->flags & TRDONE)
689 if (atrans->type != UBIK_WRITETRANS)
692 dbase = atrans->dbase;
693 /* first write the data to the log */
694 code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
698 /* expand any truncations of this file */
699 tt = FindTrunc(atrans, afile);
701 if (tt->length < apos + alen) {
702 tt->length = apos + alen;
709 bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
711 bp = DNew(dbase, afile, apos >> UBIK_LOGPAGESIZE);
/* NOTE(review): DNew() already zero-fills the page, so this memset looks redundant. */
714 memset(bp, 0, UBIK_PAGESIZE);
716 /* otherwise, min of remaining bytes and end of buffer to user mode */
717 offset = apos & (UBIK_PAGESIZE - 1);
718 len = UBIK_PAGESIZE - offset;
721 memcpy(bp + offset, abuffer, len);
726 DRelease(bp, 1); /* buffer modified */
731 /* begin a new local transaction */
/*
 * Creates a transaction of kind atype on adbase and returns it via *atrans,
 * which is set to NULL up front so callers see NULL on early exits.
 * The buffer system is set up with DInit(ubik_nBuffers) when needed (the
 * guard for that call is not shown here).  For UBIK_WRITETRANS the DBWRITING
 * flag is tested first, a LOGNEW opcode is logged, and DBWRITING is set once
 * the transaction has been created.  The new transaction is zero-filled and
 * pushed on the front of adbase->activeTrans.
 * NOTE(review): no NULL check on the malloc() result is visible before the
 * memset() -- confirm allocation failure is handled.
 */
733 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
736 struct ubik_trans *tt;
738 *atrans = (struct ubik_trans *)NULL;
739 /* Make sure system is initialized before doing anything */
742 DInit(ubik_nBuffers);
744 if (atype == UBIK_WRITETRANS) {
745 if (adbase->flags & DBWRITING)
747 code = udisk_LogOpcode(adbase, LOGNEW, 0);
751 tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
752 memset(tt, 0, sizeof(struct ubik_trans));
754 tt->next = adbase->activeTrans;
755 adbase->activeTrans = tt;
757 if (atype == UBIK_READTRANS)
759 else if (atype == UBIK_WRITETRANS)
760 adbase->flags |= DBWRITING;
765 /* commit transaction */
/*
 * The TRDONE flag is tested first.  For a UBIK_WRITETRANS the visible
 * sequence is:
 *   1. when this is the sync site and UBIK_RECLABELDB is not yet set in
 *      urecovery_state, write a fresh label (epoch = FT_ApproxTime(),
 *      counter = 1), adopt it as dbase->version, push it to the quorum with
 *      DISK_SetVersion and set UBIK_RECLABELDB;
 *   2. bump version.counter, signal waiters on the version and append the
 *      commit record with udisk_LogEnd(); the counter is decremented again
 *      on a failure path (presumably when udisk_LogEnd() fails);
 *   3. DFlush(), DSync(), DoTruncs(), setlabel and finally truncation of
 *      LOGFILE to length 0, each followed by a panic() for its failure case.
 * Finally TRDONE is set in atrans->flags.
 * NOTE(review): pthread_cond_broadcast() is called inside assert(); building
 * with NDEBUG would remove the call itself, not just the check.
 * NOTE(review): the panic() after the final setlabel reuses the message
 * "Truncating Ubik DB", which looks copied from the DoTruncs() case.
 * NOTE(review): the FT_ApproxTime() statement ends in a stray second ';'.
 */
767 udisk_commit(struct ubik_trans *atrans)
769 struct ubik_dbase *dbase;
771 struct ubik_version oldversion, newversion;
773 if (atrans->flags & TRDONE)
776 if (atrans->type == UBIK_WRITETRANS) {
777 dbase = atrans->dbase;
779 /* On the first write to the database. We update the versions */
780 if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
781 oldversion = dbase->version;
782 newversion.epoch = FT_ApproxTime();;
783 newversion.counter = 1;
785 code = (*dbase->setlabel) (dbase, 0, &newversion);
788 ubik_epochTime = newversion.epoch;
789 dbase->version = newversion;
791 /* Ignore the error here. If the call fails, the site is
792 * marked down and when we detect it is up again, we will
793 * send the entire database to it.
795 ContactQuorum(DISK_SetVersion, atrans, 1 /*CStampVersion */ ,
796 &oldversion, &newversion);
797 urecovery_state |= UBIK_RECLABELDB;
800 dbase->version.counter++; /* bump commit count */
801 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
802 assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
804 LWP_NoYieldSignal(&dbase->version);
806 code = udisk_LogEnd(dbase, &dbase->version);
808 dbase->version.counter--;
812 /* If we fail anytime after this, then panic and let the
813 * recovery replay the log.
815 code = DFlush(dbase); /* write dirty pages to respective files */
817 panic("Writing Ubik DB modifications\n");
818 code = DSync(dbase); /* sync the files and mark pages not dirty */
820 panic("Synchronizing Ubik DB modifications\n");
822 code = DoTruncs(atrans); /* Perform requested truncations */
824 panic("Truncating Ubik DB\n");
826 /* label the committed dbase */
827 code = (*dbase->setlabel) (dbase, 0, &dbase->version);
829 panic("Truncating Ubik DB\n");
831 code = (*dbase->truncate) (dbase, LOGFILE, 0); /* discard log (optional) */
833 panic("Truncating Ubik logfile\n");
837 /* When the transaction is marked done, it also means the logfile
838 * has been truncated.
840 atrans->flags |= TRDONE;
844 /* abort transaction */
/*
 * The TRDONE flag is tested first.  For a write transaction on a database
 * flagged DBWRITING: a LOGABORT opcode is appended to the log (its return
 * value is ignored), LOGFILE is truncated to length 0 with a panic() for the
 * failure case, and DAbort() is called to drop the dirty pages.
 * The transaction is then flagged TRABORT | TRDONE.
 */
846 udisk_abort(struct ubik_trans *atrans)
848 struct ubik_dbase *dbase;
851 if (atrans->flags & TRDONE)
854 /* Check if we are the write trans before logging abort, lest we
855 * abort a good write trans in progress.
856 * We don't really care if the LOGABORT gets to the log because we
857 * truncate the log next. If the truncate fails, we panic; for
858 * otherwise, the log entries remain. On restart, replay of the log
859 * will do nothing because the abort is there or no LogEnd opcode.
861 dbase = atrans->dbase;
862 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
863 udisk_LogOpcode(dbase, LOGABORT, 1);
864 code = (*dbase->truncate) (dbase, LOGFILE, 0);
866 panic("Truncating Ubik logfile during an abort\n");
867 DAbort(dbase); /* remove all dirty pages */
870 /* When the transaction is marked done, it also means the logfile
871 * has been truncated.
873 atrans->flags |= (TRABORT | TRDONE);
877 /* destroy a transaction after it has been committed or aborted. if
878 * it hasn't committed before you call this routine, we'll abort the
879 * transaction for you.
882 udisk_end(struct ubik_trans *atrans)
884 struct ubik_dbase *dbase;
886 #if defined(UBIK_PAUSE)
887 /* Another thread is trying to lock this transaction.
888 * That can only be an RPC doing SDISK_Lock.
889 * Unlock the transaction, 'cause otherwise the other
890 * thread will never wake up. Don't free it because
891 * the caller will do that already.
893 if (atrans->flags & TRSETLOCK) {
894 atrans->flags |= TRSTALE;
895 ulock_relLock(atrans);
898 #endif /* UBIK_PAUSE */
899 if (!(atrans->flags & TRDONE))
901 dbase = atrans->dbase;
903 ulock_relLock(atrans);
906 /* check if we are the write trans before unsetting the DBWRITING bit, else
907 * we could be unsetting someone else's bit.
909 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
910 dbase->flags &= ~DBWRITING;
914 if (atrans->iovec_info.iovec_wrt_val)
915 free(atrans->iovec_info.iovec_wrt_val);
916 if (atrans->iovec_data.iovec_buf_val)
917 free(atrans->iovec_data.iovec_buf_val);
920 /* Wakeup any writers waiting in BeginTrans() */
921 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
922 assert(pthread_cond_broadcast(&dbase->flags_cond) == 0);
924 LWP_NoYieldSignal(&dbase->flags);