pthreaded-ubik-20080402
[openafs.git] / src / budb / dbs_dump.c
index d9efc99..0dcd34d 100644 (file)
@@ -115,7 +115,14 @@ DumpDB(call, firstcall, maxLength, charListPtr, done)
      charListT *charListPtr;
      afs_int32 *done;
 {
+#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+    pthread_t dumperPid, watcherPid;
+    pthread_attr_t dumperPid_tattr;
+    pthread_attr_t watcherPid_tattr;
+
+#else
     PROCESS dumperPid, watcherPid;
+#endif
     int readSize;
     afs_int32 code = 0;
 
@@ -155,20 +162,45 @@ DumpDB(call, firstcall, maxLength, charListPtr, done)
        if (code)
            ERROR(errno);
 
+#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+       /* Initialize the condition variables and the mutexes we use
+        * to signal and synchronize the reader and writer threads.
+        */
+       assert(pthread_cond_init(&dumpSyncPtr->ds_readerStatus_cond, (const pthread_condattr_t *)0) == 0);
+       assert(pthread_cond_init(&dumpSyncPtr->ds_writerStatus_cond, (const pthread_condattr_t *)0) == 0);
+       assert(pthread_mutex_init(&dumpSyncPtr->ds_readerStatus_mutex, (const pthread_mutexattr_t *)0) == 0);
+       assert(pthread_mutex_init(&dumpSyncPtr->ds_writerStatus_mutex, (const pthread_mutexattr_t *)0) == 0);
+
+       /* Initialize the thread attributes and launch the thread */
+
+       assert(pthread_attr_init(&dumperPid_tattr) == 0);
+       assert(pthread_attr_setdetachstate(&dumperPid_tattr, PTHREAD_CREATE_DETACHED) == 0);
+       assert(pthread_create(&dumperPid, &dumperPid_tattr, (void *)setupDbDump, NULL) == 0);
+
+#else
        code =
            LWP_CreateProcess(setupDbDump, 16384, 1,
                              (void *)dumpSyncPtr->pipeFid[1],
                              "Database Dumper", &dumperPid);
        if (code)
            goto error_exit;
+#endif
 
        dumpSyncPtr->dumperPid = dumperPid;
        dumpSyncPtr->timeToLive = time(0) + DUMP_TTL_INC;
 
+#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+       /* Initialize the thread attributes and launch the thread */
+
+       assert(pthread_attr_init(&watcherPid_tattr) == 0);
+       assert(pthread_attr_setdetachstate(&watcherPid_tattr, PTHREAD_CREATE_DETACHED) == 0);
+       assert(pthread_create(&watcherPid, &watcherPid_tattr, (void *)dumpWatcher, NULL) == 0);
+#else
        /* now create the watcher thread */
        code =
            LWP_CreateProcess(dumpWatcher, 16384, 1, 0,
                              "Database Dump Watchdog", &watcherPid);
+#endif
     } else if (firstcall)
        ERROR(BUDB_LOCKED);
 
@@ -185,14 +217,24 @@ DumpDB(call, firstcall, maxLength, charListPtr, done)
        if (dumpSyncPtr->ds_writerStatus == DS_WAITING) {
            LogDebug(6, "wakup writer\n");
            dumpSyncPtr->ds_writerStatus = 0;
+#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+           assert(pthread_cond_broadcast(&dumpSyncPtr->ds_writerStatus_cond) == 0);
+#else
            code = LWP_SignalProcess(&dumpSyncPtr->ds_writerStatus);
            if (code)
                LogError(code, "BUDB_DumpDB: signal delivery failed\n");
+#endif
        }
        LogDebug(6, "wait for writer\n");
        dumpSyncPtr->ds_readerStatus = DS_WAITING;
        ReleaseWriteLock(&dumpSyncPtr->ds_lock);
+#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+        assert(pthread_mutex_lock(&dumpSyncPtr->ds_readerStatus_mutex) == 0);
+        assert(pthread_cond_wait(&dumpSyncPtr->ds_readerStatus_cond, &dumpSyncPtr->ds_readerStatus_mutex) == 0);
+        assert(pthread_mutex_unlock(&dumpSyncPtr->ds_readerStatus_mutex) == 0);
+#else
        LWP_WaitProcess(&dumpSyncPtr->ds_readerStatus);
+#endif
        ObtainWriteLock(&dumpSyncPtr->ds_lock);
     }
 
@@ -217,9 +259,13 @@ DumpDB(call, firstcall, maxLength, charListPtr, done)
     dumpSyncPtr->ds_bytes -= readSize;
     if (dumpSyncPtr->ds_writerStatus == DS_WAITING) {
        dumpSyncPtr->ds_writerStatus = 0;
+#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+       assert(pthread_cond_broadcast(&dumpSyncPtr->ds_writerStatus_cond) == 0);
+#else
        code = LWP_SignalProcess(&dumpSyncPtr->ds_writerStatus);
        if (code)
            LogError(code, "BUDB_DumpDB: signal delivery failed\n");
+#endif
     }
 
   error_exit:
@@ -313,10 +359,13 @@ dumpWatcher(void *unused)
 
            close(dumpSyncPtr->pipeFid[0]);
            close(dumpSyncPtr->pipeFid[1]);
-
+#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+           assert(pthread_cancel(dumpSyncPtr->dumperPid) == 0);
+#else
            code = LWP_DestroyProcess(dumpSyncPtr->dumperPid);
            if (code)
                LogError(code, "dumpWatcher: failed to kill dump thread\n");
+#endif
 
            if (dumpSyncPtr->ut) {
                code = ubik_AbortTrans(dumpSyncPtr->ut);
@@ -329,7 +378,11 @@ dumpWatcher(void *unused)
        }
        /*i */
        ReleaseWriteLock(&dumpSyncPtr->ds_lock);
+#if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
+       sleep(5);
+#else
        IOMGR_Sleep(5);
+#endif
     }                          /*w */
 
   exit: