int
afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok)
{
- int isAFSGlocked = ISAFS_GLOCK();
+ int seq, isAFSGlocked = ISAFS_GLOCK();
sigset_t saved_set;
#ifdef DECLARE_WAITQUEUE
DECLARE_WAITQUEUE(wait, current);
struct wait_queue wait = { current, NULL };
#endif
- add_wait_queue(cv, &wait);
+ seq = cv->seq;
+
set_current_state(TASK_INTERRUPTIBLE);
+ add_wait_queue(&cv->waitq, &wait);
if (isAFSGlocked)
AFS_GUNLOCK();
SIG_UNLOCK(current);
}
- schedule();
- remove_wait_queue(cv, &wait);
+ while(seq == cv->seq) {
+ schedule();
+ /* should we refrigerate? */
+ }
+
+ remove_wait_queue(&cv->waitq, &wait);
+ set_current_state(TASK_RUNNING);
if (!sigok) {
SIG_LOCK(current);
void
afs_cv_timedwait(afs_kcondvar_t * cv, afs_kmutex_t * l, int waittime)
{
- int isAFSGlocked = ISAFS_GLOCK();
+ int seq, isAFSGlocked = ISAFS_GLOCK();
long t = waittime * HZ / 1000;
#ifdef DECLARE_WAITQUEUE
DECLARE_WAITQUEUE(wait, current);
#else
struct wait_queue wait = { current, NULL };
#endif
+ seq = cv->seq;
- add_wait_queue(cv, &wait);
set_current_state(TASK_INTERRUPTIBLE);
+ add_wait_queue(&cv->waitq, &wait);
if (isAFSGlocked)
AFS_GUNLOCK();
MUTEX_EXIT(l);
- t = schedule_timeout(t);
- remove_wait_queue(cv, &wait);
+ while(seq == cv->seq) {
+ t = schedule_timeout(t);
+ if (!t) /* timeout */
+ break;
+ }
+
+ remove_wait_queue(&cv->waitq, &wait);
+ set_current_state(TASK_RUNNING);
if (isAFSGlocked)
AFS_GLOCK();
#define set_current_state(X) current->state=X
#endif
+typedef struct afs_kcondvar {
+ int seq;
#if defined(AFS_LINUX24_ENV)
-typedef wait_queue_head_t afs_kcondvar_t;
+ wait_queue_head_t waitq;
#else
-typedef struct wait_queue *afs_kcondvar_t;
+ struct wait_queue *waitq;
#endif
+} afs_kcondvar_t;
static inline int
MUTEX_ISMINE(afs_kmutex_t * l)
#define MUTEX_EXIT afs_mutex_exit
#if defined(AFS_LINUX24_ENV)
-#define CV_INIT(cv,b,c,d) init_waitqueue_head((wait_queue_head_t *)(cv))
+#define CV_INIT(cv,b,c,d) do { (cv)->seq = 0; init_waitqueue_head(&(cv)->waitq); } while (0)
#else
#define CV_INIT(cv,b,c,d) init_waitqueue((struct wait_queue**)(cv))
#endif
#define CV_WAIT(cv, m) afs_cv_wait(cv, m, 0)
#define CV_TIMEDWAIT afs_cv_timedwait
+#define CV_SIGNAL(cv) do { ++(cv)->seq; wake_up(&(cv)->waitq); } while (0)
#if defined(AFS_LINUX24_ENV)
-#define CV_SIGNAL(cv) wake_up((wait_queue_head_t *)cv)
-#define CV_BROADCAST(cv) wake_up((wait_queue_head_t *)cv)
+#define CV_BROADCAST(cv) do { ++(cv)->seq; wake_up_all(&(cv)->waitq); } while (0)
#else
-#define CV_SIGNAL(cv) wake_up((struct wait_queue**)cv)
-#define CV_BROADCAST(cv) wake_up((struct wait_queue**)cv)
+#define CV_BROADCAST(cv) do { ++(cv)->seq; wake_up(&(cv)->waitq); } while (0)
#endif
#endif /* RX_KMUTEX_H_ */
MUTEX_ENTER(&call->lock);
while (call->flags & RX_CALL_TQ_BUSY) {
call->flags |= RX_CALL_TQ_WAIT;
+ call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start lock4");
CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ if (call->tqWaiters == 0) {
+ call->flags &= ~RX_CALL_TQ_WAIT;
+ }
}
if (call->flags & RX_CALL_TQ_CLEARME) {
rxi_ClearTransmitQueue(call, 0);
while ((call->state == RX_STATE_ACTIVE)
&& (call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_WAIT;
+ call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start lock3");
CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ if (call->tqWaiters == 0)
+ call->flags &= ~RX_CALL_TQ_WAIT;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* If the new call cannot be taken right now send a busy and set
call->flags |= RX_CALL_FAST_RECOVER_WAIT;
while (call->flags & RX_CALL_TQ_BUSY) {
call->flags |= RX_CALL_TQ_WAIT;
+ call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start lock2");
CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ if (call->tqWaiters == 0)
+ call->flags &= ~RX_CALL_TQ_WAIT;
}
MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
if (call->error)
error = call->error;
#ifdef RX_GLOBAL_RXLOCK_KERNEL
- if (!(call->flags & RX_CALL_TQ_BUSY)) {
+ if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) {
rxi_ResetCall(call, 0);
}
#else
flags = call->flags;
rxi_ClearReceiveQueue(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (call->flags & RX_CALL_TQ_BUSY) {
+ if (flags & RX_CALL_TQ_BUSY) {
call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
call->flags |= (flags & RX_CALL_TQ_WAIT);
} else
{
rxi_ClearTransmitQueue(call, 0);
queue_Init(&call->tq);
+ if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
+ dpf(("rcall %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+ }
call->flags = 0;
+ while (call->tqWaiters) {
+#ifdef RX_ENABLE_LOCKS
+ CV_BROADCAST(&call->cv_tq);
+#else /* RX_ENABLE_LOCKS */
+ osi_rxWakeup(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ }
}
queue_Init(&call->rq);
call->error = 0;
call->flags |= RX_CALL_FAST_RECOVER_WAIT;
while (call->flags & RX_CALL_TQ_BUSY) {
call->flags |= RX_CALL_TQ_WAIT;
+ call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start lock1");
CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ if (call->tqWaiters == 0)
+ call->flags &= ~RX_CALL_TQ_WAIT;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
*/
if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->flags & RX_CALL_TQ_WAIT) {
- call->flags &= ~RX_CALL_TQ_WAIT;
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+ dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+ }
#ifdef RX_ENABLE_LOCKS
- CV_BROADCAST(&call->cv_tq);
+ osirx_AssertMine(&call->lock, "rxi_Start start");
+ CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
- osi_rxWakeup(&call->tq);
+ osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
- }
return;
}
if (call->error) {
rx_tq_debug.rxi_start_aborted++;
MUTEX_EXIT(&rx_stats_mutex);
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->flags & RX_CALL_TQ_WAIT) {
- call->flags &= ~RX_CALL_TQ_WAIT;
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+ dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+ }
#ifdef RX_ENABLE_LOCKS
- CV_BROADCAST(&call->cv_tq);
+ osirx_AssertMine(&call->lock, "rxi_Start middle");
+ CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
- osi_rxWakeup(&call->tq);
+ osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
- }
rxi_CallError(call, call->error);
return;
}
* protected by the global lock.
*/
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->flags & RX_CALL_TQ_WAIT) {
- call->flags &= ~RX_CALL_TQ_WAIT;
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+ dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+ }
#ifdef RX_ENABLE_LOCKS
- CV_BROADCAST(&call->cv_tq);
+ osirx_AssertMine(&call->lock, "rxi_Start end");
+ CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
- osi_rxWakeup(&call->tq);
+ osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
- }
} else {
call->flags |= RX_CALL_NEED_START;
}