vlserver: Allow reading during ubik writes
authorAndrew Deason <adeason@sinenomine.net>
Fri, 21 May 2010 21:26:53 +0000 (16:26 -0500)
committerDerrick Brashear <shadow@dementia.org>
Mon, 23 Aug 2010 16:32:52 +0000 (09:32 -0700)
Turn on the new ubik_BeginTransReadAnyWrite functionality for the
vlserver, which allows us to read data from ubik during a conflicting
ubik write lock. When writing, we now update a copy of the
application-level cache, and write back the changes during a commit.

Change-Id: Ica958af704f40e89c2062e43641d883865613802
Reviewed-on: http://gerrit.openafs.org/2106
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Derrick Brashear <shadow@dementia.org>

src/vlserver/vlprocs.c
src/vlserver/vlserver.c
src/vlserver/vlserver_internal.h
src/vlserver/vlutils.c

index b42b821..d020c79 100644 (file)
@@ -133,7 +133,7 @@ Init_VLdbase(struct vl_ctx *ctx,
            wl = 1;
        } else if (locktype == LOCKREAD) {
            errorcode =
-               ubik_BeginTransReadAny(VL_dbase, UBIK_READTRANS, &ctx->trans);
+               ubik_BeginTransReadAnyWrite(VL_dbase, UBIK_READTRANS, &ctx->trans);
            wl = 0;
        } else {
            errorcode = ubik_BeginTrans(VL_dbase, UBIK_WRITETRANS, &ctx->trans);
index d123377..67bad77 100644 (file)
@@ -60,6 +60,7 @@ int lwps = 9;
 struct vldstats dynamic_statistics;
 struct ubik_dbase *VL_dbase;
 afs_uint32 rd_HostAddress[MAXSERVERID + 1];
+afs_uint32 wr_HostAddress[MAXSERVERID + 1];
 
 static void *CheckSignal(void*);
 int LogLevel = 0;
@@ -348,6 +349,7 @@ main(int argc, char **argv)
     ubik_SRXSecurityRock = (char *)tdir;
     ubik_CheckRXSecurityProc = afsconf_CheckAuth;
     ubik_CheckRXSecurityRock = (char *)tdir;
+    ubik_SyncWriterCacheProc = vlsynccache;
     code =
        ubik_ServerInitByInfo(myHost, htons(AFSCONF_VLDBPORT), &info, clones,
                              vl_dbaseName, &VL_dbase);
@@ -364,6 +366,7 @@ main(int argc, char **argv)
     rx_SetRxDeadTime(50);
 
     memset(rd_HostAddress, 0, sizeof(rd_HostAddress));
+    memset(wr_HostAddress, 0, sizeof(wr_HostAddress));
     initialize_dstats();
 
     afsconf_BuildServerSecurityObjects(tdir, 0, &securityClasses, &numClasses);
index c23e99c..9650aa3 100644 (file)
@@ -63,4 +63,5 @@ extern afs_int32 NextEntry(struct vl_ctx *ctx, afs_int32 blockindex,
                           struct nvlentry *tentry, afs_int32 *remaining);
 extern int FreeBlock(struct vl_ctx *ctx, afs_int32 blockindex);
 extern int vlsetcache(struct vl_ctx *ctx, int locktype);
+extern int vlsynccache(void);
 #endif
index ebc080f..65ca56d 100644 (file)
 struct vlheader xheader;
 extern int maxnservers;
 struct extentaddr extentaddr;
+extern afs_uint32 rd_HostAddress[MAXSERVERID + 1];
+extern afs_uint32 wr_HostAddress[MAXSERVERID + 1];
 struct extentaddr *rd_ex_addr[VL_MAX_ADDREXTBLKS] = { 0, 0, 0, 0 };
+struct extentaddr *wr_ex_addr[VL_MAX_ADDREXTBLKS] = { 0, 0, 0, 0 };
 struct vlheader rd_cheader;    /* kept in network byte order */
+struct vlheader wr_cheader;
 int vldbversion = 0;
 
 static int index_OK(struct vl_ctx *ctx, afs_int32 blockindex);
@@ -306,7 +310,6 @@ UpdateCache(struct ubik_trans *trans, void *rock)
     int *builddb_rock = rock;
     int builddb = *builddb_rock;
     afs_int32 error = 0, i, code, ubcode;
-    extern afs_uint32 rd_HostAddress[];
 
     /* if version changed (or first call), read the header */
     ubcode = vlread(trans, 0, (char *)&rd_cheader, sizeof(rd_cheader));
@@ -1064,13 +1067,55 @@ index_OK(struct vl_ctx *ctx, afs_int32 blockindex)
     return 1;
 }
 
+/* makes a deep copy of src_ex into dst_ex */
+static int
+vlexcpy(struct extentaddr **dst_ex, struct extentaddr **src_ex)
+{
+    int i;
+    for (i = 0; i < VL_MAX_ADDREXTBLKS; i++) {
+       if (src_ex[i]) {
+           if (!dst_ex[i]) {
+               dst_ex[i] = malloc(VL_ADDREXTBLK_SIZE);
+           }
+           if (!dst_ex[i]) {
+               return VL_NOMEM;
+           }
+           memcpy(dst_ex[i], src_ex[i], VL_ADDREXTBLK_SIZE);
+
+       } else if (dst_ex[i]) {
+           /* we have no src, but we have a dst... meaning, this block
+            * has gone away */
+           free(dst_ex[i]);
+           dst_ex[i] = NULL;
+       }
+    }
+    return 0;
+}
+
 int
 vlsetcache(struct vl_ctx *ctx, int locktype)
 {
-    extern afs_uint32 rd_HostAddress[];
+    if (locktype == LOCKREAD) {
+       ctx->hostaddress = rd_HostAddress;
+       ctx->ex_addr = rd_ex_addr;
+       ctx->cheader = &rd_cheader;
+       return 0;
+    } else {
+       memcpy(wr_HostAddress, rd_HostAddress, sizeof(wr_HostAddress));
+       memcpy(&wr_cheader, &rd_cheader, sizeof(wr_cheader));
 
-    ctx->hostaddress = rd_HostAddress;
-    ctx->ex_addr = rd_ex_addr;
-    ctx->cheader = &rd_cheader;
-    return 0;
+       ctx->hostaddress = wr_HostAddress;
+       ctx->ex_addr = wr_ex_addr;
+       ctx->cheader = &wr_cheader;
+
+       return vlexcpy(wr_ex_addr, rd_ex_addr);
+    }
+}
+
+int
+vlsynccache(void)
+{
+    memcpy(rd_HostAddress, wr_HostAddress, sizeof(rd_HostAddress));
+    memcpy(&rd_cheader, &wr_cheader, sizeof(rd_cheader));
+    return vlexcpy(rd_ex_addr, wr_ex_addr);
 }