rx-tq-waiter-rework-20050914
authorDerrick Brashear <shadow@dementia.org>
Wed, 14 Sep 2005 09:51:56 +0000 (09:51 +0000)
committerDerrick Brashear <shadow@dementia.org>
Wed, 14 Sep 2005 09:51:56 +0000 (09:51 +0000)
heavily reworked by jaltman@secure-endpoints.com
and then a little further editing by me

see if we can avoid going to sleep forever waiting on the tq to flush

====================
This delta was composed from multiple commits as part of the CVS->Git migration.
The checkin message with each commit was inconsistent.
The following are the additional commit messages.
====================

do not decrement tqWaiters in the while-loop condition. Doing so
results in an invalid count if the value was zero to begin with.

src/rx/LINUX/rx_kmutex.c
src/rx/LINUX/rx_kmutex.h
src/rx/rx.c
src/rx/rx.h

index fc04981..ea31a87 100644 (file)
@@ -72,7 +72,7 @@ afs_mutex_exit(afs_kmutex_t * l)
 int
 afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok)
 {
-    int isAFSGlocked = ISAFS_GLOCK();
+    int seq, isAFSGlocked = ISAFS_GLOCK();
     sigset_t saved_set;
 #ifdef DECLARE_WAITQUEUE
     DECLARE_WAITQUEUE(wait, current);
@@ -80,8 +80,10 @@ afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok)
     struct wait_queue wait = { current, NULL };
 #endif
 
-    add_wait_queue(cv, &wait);
+    seq = cv->seq;
+    
     set_current_state(TASK_INTERRUPTIBLE);
+    add_wait_queue(&cv->waitq, &wait);
 
     if (isAFSGlocked)
        AFS_GUNLOCK();
@@ -95,8 +97,13 @@ afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok)
        SIG_UNLOCK(current);
     }
 
-    schedule();
-    remove_wait_queue(cv, &wait);
+    while(seq == cv->seq) {
+       schedule();
+       /* should we refrigerate? */
+    }
+
+    remove_wait_queue(&cv->waitq, &wait);
+    set_current_state(TASK_RUNNING);
 
     if (!sigok) {
        SIG_LOCK(current);
@@ -115,23 +122,30 @@ afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok)
 void
 afs_cv_timedwait(afs_kcondvar_t * cv, afs_kmutex_t * l, int waittime)
 {
-    int isAFSGlocked = ISAFS_GLOCK();
+    int seq, isAFSGlocked = ISAFS_GLOCK();
     long t = waittime * HZ / 1000;
 #ifdef DECLARE_WAITQUEUE
     DECLARE_WAITQUEUE(wait, current);
 #else
     struct wait_queue wait = { current, NULL };
 #endif
+    seq = cv->seq;
 
-    add_wait_queue(cv, &wait);
     set_current_state(TASK_INTERRUPTIBLE);
+    add_wait_queue(&cv->waitq, &wait);
 
     if (isAFSGlocked)
        AFS_GUNLOCK();
     MUTEX_EXIT(l);
 
-    t = schedule_timeout(t);
-    remove_wait_queue(cv, &wait);
+    while(seq == cv->seq) {
+       t = schedule_timeout(t);
+       if (!t)         /* timeout */
+           break;
+    }
+    
+    remove_wait_queue(&cv->waitq, &wait);
+    set_current_state(TASK_RUNNING);
 
     if (isAFSGlocked)
        AFS_GLOCK();
index 8473e19..8e08f00 100644 (file)
@@ -43,11 +43,14 @@ typedef struct afs_kmutex {
 #define set_current_state(X) current->state=X
 #endif
 
+typedef struct afs_kcondvar {
+    int seq;
 #if defined(AFS_LINUX24_ENV)
-typedef wait_queue_head_t afs_kcondvar_t;
+    wait_queue_head_t waitq;
 #else
-typedef struct wait_queue *afs_kcondvar_t;
+    struct wait_queue *waitq;
 #endif
+} afs_kcondvar_t;
 
 static inline int
 MUTEX_ISMINE(afs_kmutex_t * l)
@@ -62,7 +65,7 @@ MUTEX_ISMINE(afs_kmutex_t * l)
 #define MUTEX_EXIT             afs_mutex_exit
 
 #if defined(AFS_LINUX24_ENV)
-#define CV_INIT(cv,b,c,d)      init_waitqueue_head((wait_queue_head_t *)(cv))
+#define CV_INIT(cv,b,c,d)      do { (cv)->seq = 0; init_waitqueue_head(&(cv)->waitq); } while (0)
 #else
 #define CV_INIT(cv,b,c,d)      init_waitqueue((struct wait_queue**)(cv))
 #endif
@@ -71,12 +74,11 @@ MUTEX_ISMINE(afs_kmutex_t * l)
 #define CV_WAIT(cv, m)         afs_cv_wait(cv, m, 0)
 #define CV_TIMEDWAIT           afs_cv_timedwait
 
+#define CV_SIGNAL(cv)          do { ++(cv)->seq; wake_up(&(cv)->waitq); } while (0)
 #if defined(AFS_LINUX24_ENV)
-#define CV_SIGNAL(cv)          wake_up((wait_queue_head_t *)cv)
-#define CV_BROADCAST(cv)       wake_up((wait_queue_head_t *)cv)
+#define CV_BROADCAST(cv)       do { ++(cv)->seq; wake_up_all(&(cv)->waitq); } while (0)
 #else
-#define CV_SIGNAL(cv)          wake_up((struct wait_queue**)cv)
-#define CV_BROADCAST(cv)       wake_up((struct wait_queue**)cv)
+#define CV_BROADCAST(cv)       do { ++(cv)->seq; wake_up(&(cv)->waitq); } while (0)
 #endif
 
 #endif /* RX_KMUTEX_H_ */
index 9c24720..7b44ff6 100644 (file)
@@ -1170,11 +1170,17 @@ rx_NewCall(register struct rx_connection *conn)
     MUTEX_ENTER(&call->lock);
     while (call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
+       call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
+       osirx_AssertMine(&call->lock, "rxi_Start lock4");
        CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+       call->tqWaiters--;
+       if (call->tqWaiters == 0) {
+           call->flags &= ~RX_CALL_TQ_WAIT;
+       }
     }
     if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
@@ -2638,11 +2644,16 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
            while ((call->state == RX_STATE_ACTIVE)
                   && (call->flags & RX_CALL_TQ_BUSY)) {
                call->flags |= RX_CALL_TQ_WAIT;
+               call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
+               osirx_AssertMine(&call->lock, "rxi_Start lock3");
                CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
                osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+               call->tqWaiters--;
+               if (call->tqWaiters == 0)
+                   call->flags &= ~RX_CALL_TQ_WAIT;
            }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
            /* If the new call cannot be taken right now send a busy and set
@@ -3802,11 +3813,16 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
        while (call->flags & RX_CALL_TQ_BUSY) {
            call->flags |= RX_CALL_TQ_WAIT;
+           call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
+           osirx_AssertMine(&call->lock, "rxi_Start lock2");
            CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
            osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+           call->tqWaiters--;
+           if (call->tqWaiters == 0)
+               call->flags &= ~RX_CALL_TQ_WAIT;
        }
        MUTEX_ENTER(&peer->peer_lock);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
@@ -4340,7 +4356,7 @@ rxi_CallError(register struct rx_call *call, afs_int32 error)
     if (call->error)
        error = call->error;
 #ifdef RX_GLOBAL_RXLOCK_KERNEL
-    if (!(call->flags & RX_CALL_TQ_BUSY)) {
+    if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) {
        rxi_ResetCall(call, 0);
     }
 #else
@@ -4416,7 +4432,7 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
     flags = call->flags;
     rxi_ClearReceiveQueue(call);
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-    if (call->flags & RX_CALL_TQ_BUSY) {
+    if (flags & RX_CALL_TQ_BUSY) {
        call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
        call->flags |= (flags & RX_CALL_TQ_WAIT);
     } else
@@ -4424,7 +4440,18 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
     {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
+       if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
+           dpf(("rcall %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+       }
        call->flags = 0;
+       while (call->tqWaiters) {
+#ifdef RX_ENABLE_LOCKS
+           CV_BROADCAST(&call->cv_tq);
+#else /* RX_ENABLE_LOCKS */
+           osi_rxWakeup(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+           call->tqWaiters--;
+       }
     }
     queue_Init(&call->rq);
     call->error = 0;
@@ -4977,11 +5004,16 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
        while (call->flags & RX_CALL_TQ_BUSY) {
            call->flags |= RX_CALL_TQ_WAIT;
+           call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
+           osirx_AssertMine(&call->lock, "rxi_Start lock1");
            CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
            osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+           call->tqWaiters--;
+           if (call->tqWaiters == 0)
+               call->flags &= ~RX_CALL_TQ_WAIT;
        }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
@@ -5137,14 +5169,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
                 */
                if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
                    call->flags &= ~RX_CALL_TQ_BUSY;
-                   if (call->flags & RX_CALL_TQ_WAIT) {
-                       call->flags &= ~RX_CALL_TQ_WAIT;
+                   if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+                       dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+                   }
 #ifdef RX_ENABLE_LOCKS
-                       CV_BROADCAST(&call->cv_tq);
+                   osirx_AssertMine(&call->lock, "rxi_Start start");
+                   CV_BROADCAST(&call->cv_tq);
 #else /* RX_ENABLE_LOCKS */
-                       osi_rxWakeup(&call->tq);
+                   osi_rxWakeup(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
-                   }
                    return;
                }
                if (call->error) {
@@ -5156,14 +5189,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
                    rx_tq_debug.rxi_start_aborted++;
                    MUTEX_EXIT(&rx_stats_mutex);
                    call->flags &= ~RX_CALL_TQ_BUSY;
-                   if (call->flags & RX_CALL_TQ_WAIT) {
-                       call->flags &= ~RX_CALL_TQ_WAIT;
+                   if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+                       dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+                   }
 #ifdef RX_ENABLE_LOCKS
-                       CV_BROADCAST(&call->cv_tq);
+                   osirx_AssertMine(&call->lock, "rxi_Start middle");
+                   CV_BROADCAST(&call->cv_tq);
 #else /* RX_ENABLE_LOCKS */
-                       osi_rxWakeup(&call->tq);
+                   osi_rxWakeup(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
-                   }
                    rxi_CallError(call, call->error);
                    return;
                }
@@ -5243,14 +5277,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
             * protected by the global lock.
             */
            call->flags &= ~RX_CALL_TQ_BUSY;
-           if (call->flags & RX_CALL_TQ_WAIT) {
-               call->flags &= ~RX_CALL_TQ_WAIT;
+           if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+               dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+           }
 #ifdef RX_ENABLE_LOCKS
-               CV_BROADCAST(&call->cv_tq);
+           osirx_AssertMine(&call->lock, "rxi_Start end");
+           CV_BROADCAST(&call->cv_tq);
 #else /* RX_ENABLE_LOCKS */
-               osi_rxWakeup(&call->tq);
+           osi_rxWakeup(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
-           }
        } else {
            call->flags |= RX_CALL_NEED_START;
        }
index 03c035e..77ead1a 100644 (file)
@@ -540,6 +540,7 @@ struct rx_call {
     struct clock startTime;    /* time call was started */
     afs_hyper_t bytesSent;     /* Number bytes sent */
     afs_hyper_t bytesRcvd;     /* Number bytes received */
+    u_short tqWaiters;
 };
 
 #ifndef KDUMP_RX_LOCK