DEVEL15-kill-ubik-pthread-env-20080718
[openafs.git] / src / budb / dbs_dump.c
index f345383..2b94325 100644 (file)
@@ -36,7 +36,6 @@ RCSID
 #include <string.h>
 #include <des.h>
 #include <afs/cellconfig.h>
-#include <afs/auth.h>
 #include <errno.h>
 #include "budb.h"
 #include "budb_errs.h"
@@ -45,6 +44,7 @@ RCSID
 #include "globals.h"
 #include "afs/audit.h"
 
+void *dumpWatcher(void *);
 
 /* dump ubik database - interface routines */
 
@@ -64,9 +64,10 @@ badEntry(dbAddr)
  *     decode the arguments passed via LWP and dump the database.
  */
 
-setupDbDump(writeFid)
-     int writeFid;
+void *
+setupDbDump(void *param)
 {
+    int writeFid = (int)param;
     afs_int32 code = 0;
 
     code = InitRPC(&dumpSyncPtr->ut, LOCKREAD, 1);
@@ -86,7 +87,7 @@ setupDbDump(writeFid)
   error_exit:
     if (dumpSyncPtr->ut)
        ubik_EndTrans(dumpSyncPtr->ut);
-    return (code);
+    return (void *)(code);
 }
 
 
@@ -114,10 +115,15 @@ DumpDB(call, firstcall, maxLength, charListPtr, done)
      charListT *charListPtr;
      afs_int32 *done;
 {
+#ifdef AFS_PTHREAD_ENV
+    pthread_t dumperPid, watcherPid;
+    pthread_attr_t dumperPid_tattr;
+    pthread_attr_t watcherPid_tattr;
+#else
     PROCESS dumperPid, watcherPid;
+#endif
     int readSize;
     afs_int32 code = 0;
-    extern dumpWatcher();
 
     if (callPermitted(call) == 0)
        ERROR(BUDB_NOTPERMITTED);
@@ -155,20 +161,45 @@ DumpDB(call, firstcall, maxLength, charListPtr, done)
        if (code)
            ERROR(errno);
 
+#ifdef AFS_PTHREAD_ENV
+       /* Initialize the condition variables and the mutexes we use
+        * to signal and synchronize the reader and writer threads.
+        */
+       assert(pthread_cond_init(&dumpSyncPtr->ds_readerStatus_cond, (const pthread_condattr_t *)0) == 0);
+       assert(pthread_cond_init(&dumpSyncPtr->ds_writerStatus_cond, (const pthread_condattr_t *)0) == 0);
+       assert(pthread_mutex_init(&dumpSyncPtr->ds_readerStatus_mutex, (const pthread_mutexattr_t *)0) == 0);
+       assert(pthread_mutex_init(&dumpSyncPtr->ds_writerStatus_mutex, (const pthread_mutexattr_t *)0) == 0);
+
+       /* Initialize the thread attributes and launch the thread */
+
+       assert(pthread_attr_init(&dumperPid_tattr) == 0);
+       assert(pthread_attr_setdetachstate(&dumperPid_tattr, PTHREAD_CREATE_DETACHED) == 0);
+       assert(pthread_create(&dumperPid, &dumperPid_tattr, (void *)setupDbDump, NULL) == 0);
+
+#else
        code =
            LWP_CreateProcess(setupDbDump, 16384, 1,
                              (void *)dumpSyncPtr->pipeFid[1],
                              "Database Dumper", &dumperPid);
        if (code)
            goto error_exit;
+#endif
 
        dumpSyncPtr->dumperPid = dumperPid;
        dumpSyncPtr->timeToLive = time(0) + DUMP_TTL_INC;
 
+#ifdef AFS_PTHREAD_ENV
+       /* Initialize the thread attributes and launch the thread */
+
+       assert(pthread_attr_init(&watcherPid_tattr) == 0);
+       assert(pthread_attr_setdetachstate(&watcherPid_tattr, PTHREAD_CREATE_DETACHED) == 0);
+       assert(pthread_create(&watcherPid, &watcherPid_tattr, (void *)dumpWatcher, NULL) == 0);
+#else
        /* now create the watcher thread */
        code =
            LWP_CreateProcess(dumpWatcher, 16384, 1, 0,
                              "Database Dump Watchdog", &watcherPid);
+#endif
     } else if (firstcall)
        ERROR(BUDB_LOCKED);
 
@@ -185,14 +216,24 @@ DumpDB(call, firstcall, maxLength, charListPtr, done)
        if (dumpSyncPtr->ds_writerStatus == DS_WAITING) {
            LogDebug(6, "wakup writer\n");
            dumpSyncPtr->ds_writerStatus = 0;
+#ifdef AFS_PTHREAD_ENV
+           assert(pthread_cond_broadcast(&dumpSyncPtr->ds_writerStatus_cond) == 0);
+#else
            code = LWP_SignalProcess(&dumpSyncPtr->ds_writerStatus);
            if (code)
                LogError(code, "BUDB_DumpDB: signal delivery failed\n");
+#endif
        }
        LogDebug(6, "wait for writer\n");
        dumpSyncPtr->ds_readerStatus = DS_WAITING;
        ReleaseWriteLock(&dumpSyncPtr->ds_lock);
+#ifdef AFS_PTHREAD_ENV
+        assert(pthread_mutex_lock(&dumpSyncPtr->ds_readerStatus_mutex) == 0);
+        assert(pthread_cond_wait(&dumpSyncPtr->ds_readerStatus_cond, &dumpSyncPtr->ds_readerStatus_mutex) == 0);
+        assert(pthread_mutex_unlock(&dumpSyncPtr->ds_readerStatus_mutex) == 0);
+#else
        LWP_WaitProcess(&dumpSyncPtr->ds_readerStatus);
+#endif
        ObtainWriteLock(&dumpSyncPtr->ds_lock);
     }
 
@@ -217,9 +258,13 @@ DumpDB(call, firstcall, maxLength, charListPtr, done)
     dumpSyncPtr->ds_bytes -= readSize;
     if (dumpSyncPtr->ds_writerStatus == DS_WAITING) {
        dumpSyncPtr->ds_writerStatus = 0;
+#ifdef AFS_PTHREAD_ENV
+       assert(pthread_cond_broadcast(&dumpSyncPtr->ds_writerStatus_cond) == 0);
+#else
        code = LWP_SignalProcess(&dumpSyncPtr->ds_writerStatus);
        if (code)
            LogError(code, "BUDB_DumpDB: signal delivery failed\n");
+#endif
     }
 
   error_exit:
@@ -289,7 +334,8 @@ RestoreDbHeader(call, header)
  *     transactions can proceed.
  */
 
-dumpWatcher()
+void *
+dumpWatcher(void *unused)
 {
     afs_int32 code;
 
@@ -312,10 +358,13 @@ dumpWatcher()
 
            close(dumpSyncPtr->pipeFid[0]);
            close(dumpSyncPtr->pipeFid[1]);
-
+#ifdef AFS_PTHREAD_ENV
+           assert(pthread_cancel(dumpSyncPtr->dumperPid) == 0);
+#else
            code = LWP_DestroyProcess(dumpSyncPtr->dumperPid);
            if (code)
                LogError(code, "dumpWatcher: failed to kill dump thread\n");
+#endif
 
            if (dumpSyncPtr->ut) {
                code = ubik_AbortTrans(dumpSyncPtr->ut);
@@ -328,7 +377,11 @@ dumpWatcher()
        }
        /*i */
        ReleaseWriteLock(&dumpSyncPtr->ds_lock);
+#ifdef AFS_PTHREAD_ENV
+       sleep(5);
+#else
        IOMGR_Sleep(5);
+#endif
     }                          /*w */
 
   exit: