2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <sys/types.h>
21 #include <netinet/in.h>
30 #define UBIK_INTERNALS
35 static struct buffer {
36 struct ubik_dbase *dbase; /*!< dbase within which the buffer resides */
37 afs_int32 file; /*!< Unique cache key */
38 afs_int32 page; /*!< page number */
39 struct buffer *lru_next;
40 struct buffer *lru_prev;
41 struct buffer *hashNext; /*!< next dude in hash table */
42 char *data; /*!< ptr to the data */
43 char lockers; /*!< usage ref count */
44 char dirty; /*!< is buffer modified */
45 char hashIndex; /*!< back ptr to hash table */
48 #define pHash(page) ((page) & (PHSIZE-1)) /*!< phTable bucket index for a page number (used by DRead); NOTE(review): mask form assumes PHSIZE is a power of two - confirm */
50 afs_int32 ubik_nBuffers = NBUFFERS; /*!< buffer count that udisk_begin() hands to DInit() */
51 static struct buffer *phTable[PHSIZE]; /*!< page hash table */
52 static struct buffer *LruBuffer; /*!< entry point into the circular LRU list built by DInit(); Dmru() links the most recently used buffer in as LruBuffer->lru_prev */
54 static int calls = 0, ios = 0, lastb = 0; /*!< NOTE(review): presumably usage statistics; no use is visible in this excerpt - confirm */
55 static char *BufferData; /*!< one allocation of abuffers * UBIK_PAGESIZE bytes; each buffer's data pointer points into it */
56 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
59 #define BADFID 0xffffffff /*!< file id sentinel compared against in DSync(); NOTE(review): presumably means "no valid file" - confirm */
61 static int DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length); /*!< forward declaration; return type spelled out because implicit int is not valid since C99 */
63 static struct ubik_trunc *freeTruncList = 0; /*!< free list of struct ubik_trunc entries, chained through ->next */
66 * \brief Remove a transaction from the database's active transaction list. Don't free it.
69 unthread(struct ubik_trans *atrans)
71 struct ubik_trans **lt, *tt;
72 lt = &atrans->dbase->activeTrans;
73 for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
80 return 2; /* no entry */
84 * \brief some debugging assistance
87 udisk_Debug(struct ubik_debug *aparm)
92 memcpy(&aparm->localVersion, &ubik_dbase->version,
93 sizeof(struct ubik_version));
94 aparm->lockedPages = 0;
95 aparm->writeLockedPages = 0;
97 for (i = 0; i < nbuffers; i++, tb++) {
101 aparm->writeLockedPages++;
108 * \brief Write an opcode to the log.
110 * log format is defined here, and implicitly in recovery.c
112 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
113 * are logged in network standard byte order, in case we want to move logs
114 * from machine-to-machine someday.
116 * Begin transaction: opcode \n
117 * Commit transaction: opcode, version (8 bytes) \n
118 * Truncate file: opcode, file number, length \n
119 * Abort transaction: opcode \n
120 * Write data: opcode, file, position, length, <length> data bytes \n
123 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
125 struct ubik_stat ustat;
128 /* figure out where to write */
129 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
133 /* setup data and do write */
134 aopcode = htonl(aopcode);
136 (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
138 if (code != sizeof(afs_int32))
141 /* optionally sync data */
143 code = (*adbase->sync) (adbase, LOGFILE);
150 * \brief Log a commit, never syncing.
153 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
157 struct ubik_stat ustat;
159 /* figure out where to write */
160 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
165 data[0] = htonl(LOGEND);
166 data[1] = htonl(aversion->epoch);
167 data[2] = htonl(aversion->counter);
171 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
172 3 * sizeof(afs_int32));
173 if (code != 3 * sizeof(afs_int32))
176 /* finally sync the log */
177 code = (*adbase->sync) (adbase, LOGFILE);
182 * \brief Log a truncate operation, never syncing.
185 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
190 struct ubik_stat ustat;
192 /* figure out where to write */
193 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
198 data[0] = htonl(LOGTRUNCATE);
199 data[1] = htonl(afile);
200 data[2] = htonl(alength);
204 (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
205 3 * sizeof(afs_int32));
206 if (code != 3 * sizeof(afs_int32))
212 * \brief Write some data to the log, never syncing.
215 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, char *abuffer,
216 afs_int32 apos, afs_int32 alen)
218 struct ubik_stat ustat;
223 /* find end of log */
224 code = (*adbase->stat) (adbase, LOGFILE, &ustat);
230 data[0] = htonl(LOGDATA);
231 data[1] = htonl(afile);
232 data[2] = htonl(apos);
233 data[3] = htonl(alen);
237 (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
238 if (code != 4 * sizeof(afs_int32))
240 lpos += 4 * sizeof(afs_int32);
243 code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
252 /* Initialize the venus buffer system. */
255 Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
256 memset(Buffers, 0, abuffers * sizeof(struct buffer));
257 BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
259 for (i = 0; i < PHSIZE; i++)
261 for (i = 0; i < abuffers; i++) {
262 /* Fill in each buffer with an empty indication. */
264 tb->lru_next = &(Buffers[i + 1]);
265 tb->lru_prev = &(Buffers[i - 1]);
266 tb->data = &BufferData[UBIK_PAGESIZE * i];
269 Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
270 Buffers[abuffers - 1].lru_next = &(Buffers[0]);
271 LruBuffer = &(Buffers[0]);
276 * \brief Take a buffer and mark it as the least recently used buffer.
279 Dlru(struct buffer *abuf)
281 if (LruBuffer == abuf)
284 /* Unthread from where it is in the list */
285 abuf->lru_next->lru_prev = abuf->lru_prev;
286 abuf->lru_prev->lru_next = abuf->lru_next;
288 /* Thread onto beginning of LRU list */
289 abuf->lru_next = LruBuffer;
290 abuf->lru_prev = LruBuffer->lru_prev;
292 LruBuffer->lru_prev->lru_next = abuf;
293 LruBuffer->lru_prev = abuf;
298 * \brief Take a buffer and mark it as the most recently used buffer.
301 Dmru(struct buffer *abuf)
303 if (LruBuffer == abuf) {
304 LruBuffer = LruBuffer->lru_next;
308 /* Unthread from where it is in the list */
309 abuf->lru_next->lru_prev = abuf->lru_prev;
310 abuf->lru_prev->lru_next = abuf->lru_next;
312 /* Thread onto end of LRU list - making it the MRU buffer */
313 abuf->lru_next = LruBuffer;
314 abuf->lru_prev = LruBuffer->lru_prev;
315 LruBuffer->lru_prev->lru_next = abuf;
316 LruBuffer->lru_prev = abuf;
320 * \brief Get a pointer to a particular buffer.
323 DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
325 /* Read a page from the disk. */
326 struct buffer *tb, *lastbuffer;
330 lastbuffer = LruBuffer->lru_prev;
332 if ((lastbuffer->page == page) && (lastbuffer->file == fid)
333 && (lastbuffer->dbase == dbase)) {
339 for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
340 if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
347 tb = newslot(dbase, fid, page);
350 memset(tb->data, 0, UBIK_PAGESIZE);
354 (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
360 ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
365 /* Note that findslot sets the page field in the buffer equal to
366 * what it is searching for.
372 * \brief Zap truncated pages.
375 DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length)
381 maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
382 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
383 if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
392 * \brief Allocate a truncation entry.
394 * We allocate special entries representing truncations, rather than
395 * performing them immediately, so that we can abort a transaction easily by simply purging
396 * the in-core memory buffers and discarding these truncation entries.
398 static struct ubik_trunc *
401 struct ubik_trunc *tt;
402 if (!freeTruncList) {
404 (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
405 freeTruncList->next = (struct ubik_trunc *)0;
408 freeTruncList = tt->next;
413 * \brief Free a truncation entry.
416 PutTrunc(struct ubik_trunc *at)
418 at->next = freeTruncList;
424 * \brief Find a truncation entry for a file, if any.
426 static struct ubik_trunc *
427 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
429 struct ubik_trunc *tt;
430 for (tt = atrans->activeTruncs; tt; tt = tt->next) {
431 if (tt->file == afile)
434 return (struct ubik_trunc *)0;
438 * \brief Do truncates associated with \p atrans, and free them.
441 DoTruncs(struct ubik_trans *atrans)
443 struct ubik_trunc *tt, *nt;
445 afs_int32 rcode = 0, code;
447 tproc = atrans->dbase->truncate;
448 for (tt = atrans->activeTruncs; tt; tt = nt) {
450 DTrunc(atrans->dbase, tt->file, tt->length); /* zap pages from buffer cache */
451 code = (*tproc) (atrans->dbase, tt->file, tt->length);
456 /* don't unthread, because we do the entire list's worth here */
457 atrans->activeTruncs = (struct ubik_trunc *)0;
462 * \brief Mark an \p fid as invalid.
465 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
470 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
471 if (tb->file == afid) {
480 * \brief Move this page into the correct hash bucket.
483 FixupBucket(struct buffer *ap)
485 struct buffer **lp, *tp;
487 /* first try to get it out of its current hash bucket, in which it might not be */
490 for (tp = *lp; tp; tp = tp->hashNext) {
497 /* now figure the new hash bucket */
499 ap->hashIndex = i; /* remember where we are for deletion */
500 ap->hashNext = phTable[i]; /* add us to the list */
506 * \brief Create a new slot for a particular dbase page.
508 static struct buffer *
509 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
511 /* Find a usable buffer slot */
513 struct buffer *pp, *tp;
515 pp = 0; /* last pure */
516 for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
517 if (!tp->lockers && !tp->dirty) {
524 /* There are no unlocked buffers that don't need to be written to the disk. */
526 ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
530 /* Now fill in the header. */
535 FixupBucket(pp); /* move to the right hash bucket */
541 * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
544 DRelease(char *ap, int flag)
551 index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
552 bp = &(Buffers[index]);
560 * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
563 * \note Note interaction with DSync(): you call this thing first,
564 * writing the buffers to the disk. Then you call DSync() to sync all the
565 * files that were written, and to clear the dirty bits. You should
566 * always call DFlush/DSync as a pair.
569 DFlush(struct ubik_dbase *adbase)
576 for (i = 0; i < nbuffers; i++, tb++) {
578 code = tb->page * UBIK_PAGESIZE; /* offset within file */
580 (*adbase->write) (adbase, tb->file, tb->data, code,
582 if (code != UBIK_PAGESIZE)
590 * \brief Remove all dirty pages (called when a write transaction is aborted).
593 DAbort(struct ubik_dbase *adbase)
599 for (i = 0; i < nbuffers; i++, tb++) {
610 * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
613 DSync(struct ubik_dbase *adbase)
624 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
625 if (tb->dirty == 1) {
628 if (file != BADFID && tb->file == file)
634 /* otherwise we have a file to sync */
635 code = (*adbase->sync) (adbase, file);
643 * \brief Same as DRead(), only do not even try to read the page.
646 DNew(struct ubik_dbase *dbase, afs_int32 fid, int page)
650 if ((tb = newslot(dbase, fid, page)) == 0)
653 memset(tb->data, 0, UBIK_PAGESIZE);
658 * \brief Read data from database.
661 udisk_read(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
662 afs_int32 apos, afs_int32 alen)
665 afs_int32 offset, len, totalLen;
666 struct ubik_dbase *dbase;
668 if (atrans->flags & TRDONE)
671 dbase = atrans->dbase;
673 bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
676 /* otherwise, min of remaining bytes and end of buffer to user mode */
677 offset = apos & (UBIK_PAGESIZE - 1);
678 len = UBIK_PAGESIZE - offset;
681 memcpy(abuffer, bp + offset, len);
692 * \brief Truncate file.
695 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
698 struct ubik_trunc *tt;
700 if (atrans->flags & TRDONE)
702 if (atrans->type != UBIK_WRITETRANS)
705 /* write a truncate log record */
706 code = udisk_LogTruncate(atrans->dbase, afile, alength);
708 /* don't truncate until commit time */
709 tt = FindTrunc(atrans, afile);
711 /* this file not truncated yet */
713 tt->next = atrans->activeTruncs;
714 atrans->activeTruncs = tt;
716 tt->length = alength;
718 /* already truncated to a certain length */
719 if (tt->length > alength)
720 tt->length = alength;
726 * \brief Write data to database, using logs.
729 udisk_write(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
730 afs_int32 apos, afs_int32 alen)
733 afs_int32 offset, len, totalLen;
734 struct ubik_dbase *dbase;
735 struct ubik_trunc *tt;
738 if (atrans->flags & TRDONE)
740 if (atrans->type != UBIK_WRITETRANS)
743 dbase = atrans->dbase;
744 /* first write the data to the log */
745 code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
749 /* expand any truncations of this file */
750 tt = FindTrunc(atrans, afile);
752 if (tt->length < apos + alen) {
753 tt->length = apos + alen;
760 bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
762 bp = DNew(dbase, afile, apos >> UBIK_LOGPAGESIZE);
765 memset(bp, 0, UBIK_PAGESIZE);
767 /* otherwise, min of remaining bytes and end of buffer to user mode */
768 offset = apos & (UBIK_PAGESIZE - 1);
769 len = UBIK_PAGESIZE - offset;
772 memcpy(bp + offset, abuffer, len);
777 DRelease(bp, 1); /* buffer modified */
783 * \brief Begin a new local transaction.
786 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
789 struct ubik_trans *tt;
791 *atrans = (struct ubik_trans *)NULL;
792 /* Make sure system is initialized before doing anything */
795 DInit(ubik_nBuffers);
797 if (atype == UBIK_WRITETRANS) {
798 if (adbase->flags & DBWRITING)
800 code = udisk_LogOpcode(adbase, LOGNEW, 0);
804 tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
805 memset(tt, 0, sizeof(struct ubik_trans));
807 tt->next = adbase->activeTrans;
808 adbase->activeTrans = tt;
810 if (atype == UBIK_READTRANS)
812 else if (atype == UBIK_WRITETRANS)
813 adbase->flags |= DBWRITING;
819 * \brief Commit transaction.
822 udisk_commit(struct ubik_trans *atrans)
824 struct ubik_dbase *dbase;
826 struct ubik_version oldversion, newversion;
828 if (atrans->flags & TRDONE)
831 if (atrans->type == UBIK_WRITETRANS) {
832 dbase = atrans->dbase;
834 /* On the first write to the database, we update the versions */
835 if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
836 oldversion = dbase->version;
837 newversion.epoch = FT_ApproxTime();;
838 newversion.counter = 1;
840 code = (*dbase->setlabel) (dbase, 0, &newversion);
843 ubik_epochTime = newversion.epoch;
844 dbase->version = newversion;
846 /* Ignore the error here. If the call fails, the site is
847 * marked down and when we detect it is up again, we will
848 * send the entire database to it.
850 ContactQuorum(DISK_SetVersion, atrans, 1 /*CStampVersion */ ,
851 &oldversion, &newversion);
852 urecovery_state |= UBIK_RECLABELDB;
855 dbase->version.counter++; /* bump commit count */
856 #ifdef AFS_PTHREAD_ENV
857 assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
859 LWP_NoYieldSignal(&dbase->version);
861 code = udisk_LogEnd(dbase, &dbase->version);
863 dbase->version.counter--;
867 /* If we fail anytime after this, then panic and let the
868 * recovery replay the log.
870 code = DFlush(dbase); /* write dirty pages to respective files */
872 panic("Writing Ubik DB modifications\n");
873 code = DSync(dbase); /* sync the files and mark pages not dirty */
875 panic("Synchronizing Ubik DB modifications\n");
877 code = DoTruncs(atrans); /* Perform requested truncations */
879 panic("Truncating Ubik DB\n");
881 /* label the committed dbase */
882 code = (*dbase->setlabel) (dbase, 0, &dbase->version);
884 panic("Truncating Ubik DB\n");
886 code = (*dbase->truncate) (dbase, LOGFILE, 0); /* discard log (optional) */
888 panic("Truncating Ubik logfile\n");
892 /* When the transaction is marked done, it also means the logfile
893 * has been truncated.
895 atrans->flags |= TRDONE;
900 * \brief Abort transaction.
903 udisk_abort(struct ubik_trans *atrans)
905 struct ubik_dbase *dbase;
908 if (atrans->flags & TRDONE)
911 /* Check if we are the write trans before logging abort, lest we
912 * abort a good write trans in progress.
913 * We don't really care if the LOGABORT gets to the log because we
914 * truncate the log next. If the truncate fails, we panic, because
915 * otherwise the log entries remain. On restart, replay of the log
916 * will do nothing because the abort is there or no LogEnd opcode.
918 dbase = atrans->dbase;
919 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
920 udisk_LogOpcode(dbase, LOGABORT, 1);
921 code = (*dbase->truncate) (dbase, LOGFILE, 0);
923 panic("Truncating Ubik logfile during an abort\n");
924 DAbort(dbase); /* remove all dirty pages */
927 /* When the transaction is marked done, it also means the logfile
928 * has been truncated.
930 atrans->flags |= (TRABORT | TRDONE);
935 * \brief Destroy a transaction after it has been committed or aborted.
937 * If it hasn't committed before you call this routine, we'll abort the
938 * transaction for you.
941 udisk_end(struct ubik_trans *atrans)
943 struct ubik_dbase *dbase;
945 #if defined(UBIK_PAUSE)
946 /* Another thread is trying to lock this transaction.
947 * That can only be an RPC doing SDISK_Lock.
948 * Unlock the transaction, 'cause otherwise the other
949 * thread will never wake up. Don't free it because
950 * the caller will do that already.
952 if (atrans->flags & TRSETLOCK) {
953 atrans->flags |= TRSTALE;
954 ulock_relLock(atrans);
957 #endif /* UBIK_PAUSE */
958 if (!(atrans->flags & TRDONE))
960 dbase = atrans->dbase;
962 ulock_relLock(atrans);
965 /* check if we are the write trans before unsetting the DBWRITING bit, else
966 * we could be unsetting someone else's bit.
968 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
969 dbase->flags &= ~DBWRITING;
973 if (atrans->iovec_info.iovec_wrt_val)
974 free(atrans->iovec_info.iovec_wrt_val);
975 if (atrans->iovec_data.iovec_buf_val)
976 free(atrans->iovec_data.iovec_buf_val);
979 /* Wakeup any writers waiting in BeginTrans() */
980 #ifdef AFS_PTHREAD_ENV
981 assert(pthread_cond_broadcast(&dbase->flags_cond) == 0);
983 LWP_NoYieldSignal(&dbase->flags);