From 800318b43fdf461ad95cd7f3940718f3f0a609a7 Mon Sep 17 00:00:00 2001 From: Andrew Deason Date: Thu, 10 May 2018 16:22:52 -0500 Subject: [PATCH] ubik: Buffer log writes with stdio Currently, when we write ubik i/o operations to the db log, we tend to issue several syscalls involving small writes and fstat()s. This is because each "log" operation involves at least one write, and each log operation tends to be pretty small. Each logged operation hitting disk separately is unnecessary, since the db log does not need to hit the disk at all until we are ready to commit the transaction. So to reduce the number of syscalls when writing to the db, change our log writes to be buffered in memory (using stdio calls). This also avoids needing to fstat() the underlying log file: because we only ever append to (and truncate) the log file, we can open it in append-only mode and never need to look up the current end of the file ourselves. To implement this, we introduce a new 'buffered_append' phys operation, to explicitly separate our buffered and non-buffered operations, to try to avoid any bugs from mixing buffered and non-buffered i/o. This new operation is only used for the db log. 
Change-Id: I5596117c6c71ab7c2d552f71b0ef038f387e358a Reviewed-on: https://gerrit.openafs.org/13070 Reviewed-by: Mark Vitale Reviewed-by: Michael Meffie Reviewed-by: Joe Gorse Reviewed-by: Benjamin Kaduk Reviewed-by: Marcio Brito Barbosa Tested-by: BuildBot --- src/ubik/disk.c | 41 ++++------------------------- src/ubik/phys.c | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/ubik/ubik.c | 1 + src/ubik/ubik.p.h | 3 +++ 4 files changed, 87 insertions(+), 36 deletions(-) diff --git a/src/ubik/disk.c b/src/ubik/disk.c index 24d6a68..ed2b55b 100644 --- a/src/ubik/disk.c +++ b/src/ubik/disk.c @@ -113,19 +113,11 @@ udisk_Debug(struct ubik_debug *aparm) static int udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async) { - struct ubik_stat ustat; afs_int32 code; - /* figure out where to write */ - code = (*adbase->stat) (adbase, LOGFILE, &ustat); - if (code < 0) - return code; - /* setup data and do write */ aopcode = htonl(aopcode); - code = - (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size, - sizeof(afs_int32)); + code = (*adbase->buffered_append)(adbase, LOGFILE, &aopcode, sizeof(afs_int32)); if (code != sizeof(afs_int32)) return UIOERROR; @@ -145,12 +137,6 @@ udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion) { afs_int32 code; afs_int32 data[3]; - struct ubik_stat ustat; - - /* figure out where to write */ - code = (*adbase->stat) (adbase, LOGFILE, &ustat); - if (code) - return code; /* setup data */ data[0] = htonl(LOGEND); @@ -159,8 +145,7 @@ udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion) /* do write */ code = - (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size, - 3 * sizeof(afs_int32)); + (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32)); if (code != 3 * sizeof(afs_int32)) return UIOERROR; @@ -178,12 +163,6 @@ udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile, { afs_int32 code; afs_int32 data[3]; - struct ubik_stat ustat; - - /* 
figure out where to write */ - code = (*adbase->stat) (adbase, LOGFILE, &ustat); - if (code < 0) - return code; /* setup data */ data[0] = htonl(LOGTRUNCATE); @@ -192,8 +171,7 @@ udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile, /* do write */ code = - (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size, - 3 * sizeof(afs_int32)); + (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32)); if (code != 3 * sizeof(afs_int32)) return UIOERROR; return 0; @@ -206,16 +184,8 @@ static int udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer, afs_int32 apos, afs_int32 alen) { - struct ubik_stat ustat; afs_int32 code; afs_int32 data[4]; - afs_int32 lpos; - - /* find end of log */ - code = (*adbase->stat) (adbase, LOGFILE, &ustat); - lpos = ustat.size; - if (code < 0) - return code; /* setup header */ data[0] = htonl(LOGDATA); @@ -225,13 +195,12 @@ udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer, /* write header */ code = - (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32)); + (*adbase->buffered_append)(adbase, LOGFILE, data, 4 * sizeof(afs_int32)); if (code != 4 * sizeof(afs_int32)) return UIOERROR; - lpos += 4 * sizeof(afs_int32); /* write data */ - code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen); + code = (*adbase->buffered_append)(adbase, LOGFILE, abuffer, alen); if (code != alen) return UIOERROR; return 0; diff --git a/src/ubik/phys.c b/src/ubik/phys.c index 0bc24f1..bea42cf 100644 --- a/src/ubik/phys.c +++ b/src/ubik/phys.c @@ -34,8 +34,18 @@ static struct fdcache { int refCount; } fdcache[MAXFDCACHE]; +/* Cache a stdio handle for a given database file, for uphys_buf_append + * operations. We only use buf_append for one file at a time, so only try to + * cache a single file handle, since that's all we should need. 
*/ +static struct buf_fdcache { + int fileID; + FILE *stream; +} buf_fdcache; + static char pbuffer[1024]; +static int uphys_buf_flush(struct ubik_dbase *adbase, afs_int32 afid); + /*! * \warning Beware, when using this function, of the header in front of most files. */ @@ -216,6 +226,13 @@ uphys_truncate(struct ubik_dbase *adbase, afs_int32 afile, afs_int32 asize) { afs_int32 code, fd; + + /* Just in case there's memory-buffered data for this file, flush it before + * truncating. */ + if (uphys_buf_flush(adbase, afile) < 0) { + return UIOERROR; + } + fd = uphys_open(adbase, afile); if (fd < 0) return UNOENT; @@ -293,6 +310,12 @@ int uphys_sync(struct ubik_dbase *adbase, afs_int32 afile) { afs_int32 code, fd; + + /* Flush any in-memory data, so we can sync it. */ + if (uphys_buf_flush(adbase, afile) < 0) { + return -1; + } + fd = uphys_open(adbase, afile); code = fsync(fd); uphys_close(fd); @@ -317,3 +340,58 @@ uphys_invalidate(struct ubik_dbase *adbase, afs_int32 afid) } } } + +static FILE * +uphys_buf_append_open(struct ubik_dbase *adbase, afs_int32 afid) +{ + /* If we have a cached handle open for this file, just return it. */ + if (buf_fdcache.stream && buf_fdcache.fileID == afid) { + return buf_fdcache.stream; + } + + /* Otherwise, close the existing handle, and open a new handle for the + * given file. */ + + if (buf_fdcache.stream) { + fclose(buf_fdcache.stream); + } + + snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d", adbase->pathName, + (afid<0)?"SYS":"", (afid<0)?-afid:afid); + buf_fdcache.stream = fopen(pbuffer, "a"); + buf_fdcache.fileID = afid; + return buf_fdcache.stream; +} + +static int +uphys_buf_flush(struct ubik_dbase *adbase, afs_int32 afid) +{ + if (buf_fdcache.stream && buf_fdcache.fileID == afid) { + int code = fflush(buf_fdcache.stream); + if (code) { + return -1; + } + } + return 0; +} + +/* Append the given data to the given database file, allowing the data to be + * buffered in memory. 
*/ +int +uphys_buf_append(struct ubik_dbase *adbase, afs_int32 afid, void *adata, + afs_int32 alength) +{ + FILE *stream; + size_t code; + + stream = uphys_buf_append_open(adbase, afid); + if (!stream) { + return -1; + } + + code = fwrite(adata, alength, 1, stream); + if (code != 1) { + return -1; + } + return alength; +} diff --git a/src/ubik/ubik.c b/src/ubik/ubik.c index f28a69b..f8a8b48 100644 --- a/src/ubik/ubik.c +++ b/src/ubik/ubik.c @@ -431,6 +431,7 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort, tdb->getlabel = uphys_getlabel; tdb->setlabel = uphys_setlabel; tdb->getnfiles = uphys_getnfiles; + tdb->buffered_append = uphys_buf_append; tdb->readers = 0; tdb->tidCounter = tdb->writeTidCounter = 0; *dbase = tdb; diff --git a/src/ubik/ubik.p.h b/src/ubik/ubik.p.h index d703ce0..4585ba9 100644 --- a/src/ubik/ubik.p.h +++ b/src/ubik/ubik.p.h @@ -159,6 +159,7 @@ struct ubik_dbase { int (*setlabel) (struct ubik_dbase * adbase, afs_int32 afile, struct ubik_version * aversion); /*!< set the version label */ int (*getlabel) (struct ubik_dbase * adbase, afs_int32 afile, struct ubik_version * aversion); /*!< retrieve the version label */ int (*getnfiles) (struct ubik_dbase * adbase); /*!< find out number of files */ + int (*buffered_append)(struct ubik_dbase *adbase, afs_int32 afid, void *adata, afs_int32 alength); short readers; /*!< number of current read transactions */ struct ubik_version cachedVersion; /*!< version of caller's cached data */ struct Lock cache_lock; /*!< protects cached application data */ @@ -448,6 +449,8 @@ extern int uphys_setlabel(struct ubik_dbase *adbase, afs_int32 afile, extern int uphys_sync(struct ubik_dbase *adbase, afs_int32 afile); extern void uphys_invalidate(struct ubik_dbase *adbase, afs_int32 afid); +extern int uphys_buf_append(struct ubik_dbase *adbase, afs_int32 afid, + void *buf, afs_int32 alength); /*! \name recovery.c */ extern int urecovery_ResetState(void); -- 1.9.4