ubik: Buffer log writes with stdio 70/13070/5
authorAndrew Deason <adeason@sinenomine.net>
Thu, 10 May 2018 21:22:52 +0000 (16:22 -0500)
committerBenjamin Kaduk <kaduk@mit.edu>
Sat, 15 Sep 2018 01:52:33 +0000 (21:52 -0400)
Currently, when we write ubik i/o operations to the db log, we tend to
issue several syscalls involving small writes and fstat()s. This is
because each "log" operation involves at least one write, and each log
operation tends to be pretty small.

Each logged operation hitting disk separately is unnecessary, since
the db log does not need to hit the disk at all until we are ready to
commit the transaction. So to reduce the number of syscalls when
writing to the db, change our log writes to be buffered in memory
(using stdio calls). This also avoids needing to fstat() the
underlying log file, since we open the underlying file in append-only
mode, since we only ever append to (and truncate) the log file.

To implement this, we introduce a new 'buffered_append' phys
operation, to explicitly separate our buffered and non-buffered
operations, to try to avoid any bugs from mixing buffered and
non-buffered i/o. This new operation is only used for the db log.

Change-Id: I5596117c6c71ab7c2d552f71b0ef038f387e358a
Reviewed-on: https://gerrit.openafs.org/13070
Reviewed-by: Mark Vitale <mvitale@sinenomine.net>
Reviewed-by: Michael Meffie <mmeffie@sinenomine.net>
Reviewed-by: Joe Gorse <jhgorse@gmail.com>
Reviewed-by: Benjamin Kaduk <kaduk@mit.edu>
Reviewed-by: Marcio Brito Barbosa <mbarbosa@sinenomine.net>
Tested-by: BuildBot <buildbot@rampaginggeek.com>

src/ubik/disk.c
src/ubik/phys.c
src/ubik/ubik.c
src/ubik/ubik.p.h

index 24d6a68..ed2b55b 100644 (file)
@@ -113,19 +113,11 @@ udisk_Debug(struct ubik_debug *aparm)
 static int
 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
 {
-    struct ubik_stat ustat;
     afs_int32 code;
 
-    /* figure out where to write */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    if (code < 0)
-       return code;
-
     /* setup data and do write */
     aopcode = htonl(aopcode);
-    code =
-       (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
-                         sizeof(afs_int32));
+    code = (*adbase->buffered_append)(adbase, LOGFILE, &aopcode, sizeof(afs_int32));
     if (code != sizeof(afs_int32))
        return UIOERROR;
 
@@ -145,12 +137,6 @@ udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
 {
     afs_int32 code;
     afs_int32 data[3];
-    struct ubik_stat ustat;
-
-    /* figure out where to write */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    if (code)
-       return code;
 
     /* setup data */
     data[0] = htonl(LOGEND);
@@ -159,8 +145,7 @@ udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
 
     /* do write */
     code =
-       (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
-                         3 * sizeof(afs_int32));
+       (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
     if (code != 3 * sizeof(afs_int32))
        return UIOERROR;
 
@@ -178,12 +163,6 @@ udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
 {
     afs_int32 code;
     afs_int32 data[3];
-    struct ubik_stat ustat;
-
-    /* figure out where to write */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    if (code < 0)
-       return code;
 
     /* setup data */
     data[0] = htonl(LOGTRUNCATE);
@@ -192,8 +171,7 @@ udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
 
     /* do write */
     code =
-       (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
-                         3 * sizeof(afs_int32));
+       (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
     if (code != 3 * sizeof(afs_int32))
        return UIOERROR;
     return 0;
@@ -206,16 +184,8 @@ static int
 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
                   afs_int32 apos, afs_int32 alen)
 {
-    struct ubik_stat ustat;
     afs_int32 code;
     afs_int32 data[4];
-    afs_int32 lpos;
-
-    /* find end of log */
-    code = (*adbase->stat) (adbase, LOGFILE, &ustat);
-    lpos = ustat.size;
-    if (code < 0)
-       return code;
 
     /* setup header */
     data[0] = htonl(LOGDATA);
@@ -225,13 +195,12 @@ udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
 
     /* write header */
     code =
-       (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
+       (*adbase->buffered_append)(adbase, LOGFILE, data, 4 * sizeof(afs_int32));
     if (code != 4 * sizeof(afs_int32))
        return UIOERROR;
-    lpos += 4 * sizeof(afs_int32);
 
     /* write data */
-    code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
+    code = (*adbase->buffered_append)(adbase, LOGFILE, abuffer, alen);
     if (code != alen)
        return UIOERROR;
     return 0;
index 0bc24f1..bea42cf 100644 (file)
@@ -34,8 +34,18 @@ static struct fdcache {
     int refCount;
 } fdcache[MAXFDCACHE];
 
+/* Cache a stdio handle for a given database file, for uphys_buf_append
+ * operations. We only use buf_append for one file at a time, so only try to
+ * cache a single file handle, since that's all we should need. */
+static struct buf_fdcache {
+    int fileID;
+    FILE *stream;
+} buf_fdcache;
+
 static char pbuffer[1024];
 
+static int uphys_buf_flush(struct ubik_dbase *adbase, afs_int32 afid);
+
 /*!
  * \warning Beware, when using this function, of the header in front of most files.
  */
@@ -216,6 +226,13 @@ uphys_truncate(struct ubik_dbase *adbase, afs_int32 afile,
               afs_int32 asize)
 {
     afs_int32 code, fd;
+
+    /* Just in case there's memory-buffered data for this file, flush it before
+     * truncating. */
+    if (uphys_buf_flush(adbase, afile) < 0) {
+        return UIOERROR;
+    }
+
     fd = uphys_open(adbase, afile);
     if (fd < 0)
        return UNOENT;
@@ -293,6 +310,12 @@ int
 uphys_sync(struct ubik_dbase *adbase, afs_int32 afile)
 {
     afs_int32 code, fd;
+
+    /* Flush any in-memory data, so we can sync it. */
+    if (uphys_buf_flush(adbase, afile) < 0) {
+        return -1;
+    }
+
     fd = uphys_open(adbase, afile);
     code = fsync(fd);
     uphys_close(fd);
@@ -317,3 +340,58 @@ uphys_invalidate(struct ubik_dbase *adbase, afs_int32 afid)
        }
     }
 }
+
+static FILE *
+uphys_buf_append_open(struct ubik_dbase *adbase, afs_int32 afid)
+{
+    /* If we have a cached handle open for this file, just return it. */
+    if (buf_fdcache.stream && buf_fdcache.fileID == afid) {
+        return buf_fdcache.stream;
+    }
+
+    /* Otherwise, close the existing handle, and open a new handle for the
+     * given file. */
+
+    if (buf_fdcache.stream) {
+        fclose(buf_fdcache.stream);
+    }
+
+    snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d", adbase->pathName,
+            (afid<0)?"SYS":"", (afid<0)?-afid:afid);
+    buf_fdcache.stream = fopen(pbuffer, "a");
+    buf_fdcache.fileID = afid;
+    return buf_fdcache.stream;
+}
+
+static int
+uphys_buf_flush(struct ubik_dbase *adbase, afs_int32 afid)
+{
+    if (buf_fdcache.stream && buf_fdcache.fileID == afid) {
+        int code = fflush(buf_fdcache.stream);
+        if (code) {
+            return -1;
+        }
+    }
+    return 0;
+}
+
+/* Append the given data to the given database file, allowing the data to be
+ * buffered in memory. */
+int
+uphys_buf_append(struct ubik_dbase *adbase, afs_int32 afid, void *adata,
+                 afs_int32 alength)
+{
+    FILE *stream;
+    size_t code;
+
+    stream = uphys_buf_append_open(adbase, afid);
+    if (!stream) {
+        return -1;
+    }
+
+    code = fwrite(adata, alength, 1, stream);
+    if (code != 1) {
+        return -1;
+    }
+    return alength;
+}
index f28a69b..f8a8b48 100644 (file)
@@ -431,6 +431,7 @@ ubik_ServerInitCommon(afs_uint32 myHost, short myPort,
     tdb->getlabel = uphys_getlabel;
     tdb->setlabel = uphys_setlabel;
     tdb->getnfiles = uphys_getnfiles;
+    tdb->buffered_append = uphys_buf_append;
     tdb->readers = 0;
     tdb->tidCounter = tdb->writeTidCounter = 0;
     *dbase = tdb;
index d703ce0..4585ba9 100644 (file)
@@ -159,6 +159,7 @@ struct ubik_dbase {
     int (*setlabel) (struct ubik_dbase * adbase, afs_int32 afile, struct ubik_version * aversion);     /*!< set the version label */
     int (*getlabel) (struct ubik_dbase * adbase, afs_int32 afile, struct ubik_version * aversion);     /*!< retrieve the version label */
     int (*getnfiles) (struct ubik_dbase * adbase);     /*!< find out number of files */
+    int (*buffered_append)(struct ubik_dbase *adbase, afs_int32 afid, void *adata, afs_int32 alength);
     short readers;             /*!< number of current read transactions */
     struct ubik_version cachedVersion; /*!< version of caller's cached data */
     struct Lock cache_lock; /*!< protects cached application data */
@@ -448,6 +449,8 @@ extern int uphys_setlabel(struct ubik_dbase *adbase, afs_int32 afile,
 extern int uphys_sync(struct ubik_dbase *adbase, afs_int32 afile);
 extern void uphys_invalidate(struct ubik_dbase *adbase,
                             afs_int32 afid);
+extern int uphys_buf_append(struct ubik_dbase *adbase, afs_int32 afid,
+                            void *buf, afs_int32 alength);
 
 /*! \name recovery.c */
 extern int urecovery_ResetState(void);