# include "rx_user.h"
#endif /* KERNEL */
+#include <opr/queue.h>
+
#include "rx.h"
#include "rx_clock.h"
-#include "rx_queue.h"
#include "rx_atomic.h"
#include "rx_globals.h"
#include "rx_trace.h"
/* Incoming calls wait on this queue when there are no available
* server processes */
-struct rx_queue rx_incomingCallQueue;
+struct opr_queue rx_incomingCallQueue;
/* Server processes wait on this queue when there are no appropriate
* calls to process */
-struct rx_queue rx_idleServerQueue;
+struct opr_queue rx_idleServerQueue;
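For orientation, here is a minimal sketch of the opr_queue idiom this patch converts to, assuming the <opr/queue.h> interface used throughout the diff: both list heads and per-item links are plain struct opr_queue nodes, and opr_queue_Entry recovers the enclosing item from a node, so the link no longer has to be the first member of the structure (the assumption the old rx_queue package relied on). 'struct example_item' is illustrative, not part of the patch.

    #include <opr/queue.h>

    struct example_item {
        int value;
        struct opr_queue entry;     /* embedded link, any position */
    };

    static void
    example(void)
    {
        struct opr_queue head, *cursor;
        struct example_item a, b;

        opr_queue_Init(&head);
        opr_queue_Append(&head, &a.entry);
        opr_queue_Append(&head, &b.entry);

        for (opr_queue_Scan(&head, cursor)) {
            struct example_item *item
                = opr_queue_Entry(cursor, struct example_item, entry);
            item->value = 0;    /* use the recovered item */
        }
    }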
#if !defined(offsetof)
#include <stddef.h> /* for definition of offsetof() */
/* Malloc up a bunch of packets & buffers */
rx_nFreePackets = 0;
- queue_Init(&rx_freePacketQueue);
+ opr_queue_Init(&rx_freePacketQueue);
rxi_NeedMorePackets = FALSE;
rx_nPackets = 0; /* rx_nPackets is managed by rxi_MorePackets* */
rxevent_Init(20, rxi_ReScheduleEvents);
/* Initialize various global queues */
- queue_Init(&rx_idleServerQueue);
- queue_Init(&rx_incomingCallQueue);
- queue_Init(&rx_freeCallQueue);
+ opr_queue_Init(&rx_idleServerQueue);
+ opr_queue_Init(&rx_incomingCallQueue);
+ opr_queue_Init(&rx_freeCallQueue);
#if defined(AFS_NT40_ENV) && !defined(KERNEL)
/* Initialize our list of usable IP addresses. */
static_inline void
rxi_rto_packet_acked(struct rx_call *call, int istack)
{
- struct rx_packet *p, *nxp;
+ struct opr_queue *cursor;
rxi_rto_cancel(call);
- if (queue_IsEmpty(&call->tq))
+ if (opr_queue_IsEmpty(&call->tq))
return;
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
if (p->header.seq > call->tfirst + call->twind)
return;
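The hunk above is the recurring translation pattern for iteration in this patch: the old queue_Scan supplied a typed item pointer plus a lookahead pointer, while opr_queue_Scan walks a bare cursor and the body recovers the item explicitly. Side by side, with the names from this hunk:

    /* before: item and lookahead supplied by the macro */
    struct rx_packet *p, *nxp;
    for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
        /* ... use p ... */
    }

    /* after: bare cursor; the entry is derived in the body */
    struct opr_queue *cursor;
    for (opr_queue_Scan(&call->tq, cursor)) {
        struct rx_packet *p
            = opr_queue_Entry(cursor, struct rx_packet, entry);
        /* ... use p ... */
    }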
rx_WakeupServerProcs(void)
{
- struct rx_serverQueueEntry *np, *tqp;
+ struct rx_serverQueueEntry *np;
+ struct opr_queue *cursor;
SPLVAR;
NETPRI;
#endif /* RX_ENABLE_LOCKS */
}
MUTEX_EXIT(&freeSQEList_lock);
- for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
+ for (opr_queue_Scan(&rx_idleServerQueue, cursor)) {
+ np = opr_queue_Entry(cursor, struct rx_serverQueueEntry, entry);
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
ReturnToServerPool(cur_service);
}
while (1) {
- if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
- struct rx_call *tcall, *ncall, *choice2 = NULL;
+ if (!opr_queue_IsEmpty(&rx_incomingCallQueue)) {
+ struct rx_call *tcall, *choice2 = NULL;
+ struct opr_queue *cursor;
/* Scan for eligible incoming calls. A call is not eligible
* if the maximum number of calls for its service type are
* while the other threads may run ahead looking for calls which
* have all their input data available immediately. This helps
* keep threads from blocking, waiting for data from the client. */
- for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
+ for (opr_queue_Scan(&rx_incomingCallQueue, cursor)) {
+ tcall = opr_queue_Entry(cursor, struct rx_call, entry);
+
service = tcall->conn->service;
if (!QuotaOK(service)) {
continue;
}
MUTEX_ENTER(&rx_pthread_mutex);
if (tno == rxi_fcfs_thread_num
- || queue_IsLast(&rx_incomingCallQueue, tcall)) {
+ || opr_queue_IsEnd(&rx_incomingCallQueue, cursor->next)) {
MUTEX_EXIT(&rx_pthread_mutex);
/* If we're the fcfs thread, then we'll just use
* this call. If we haven't been able to find an optimal
service = call->conn->service;
} else {
MUTEX_EXIT(&rx_pthread_mutex);
- if (!queue_IsEmpty(&tcall->rq)) {
+ if (!opr_queue_IsEmpty(&tcall->rq)) {
struct rx_packet *rp;
- rp = queue_First(&tcall->rq, rx_packet);
+ rp = opr_queue_First(&tcall->rq, struct rx_packet,
+ entry);
if (rp->header.seq == 1) {
if (!meltdown_1pkt
|| (rp->header.flags & RX_LAST_PACKET)) {
}
if (call) {
- queue_Remove(call);
+ opr_queue_Remove(&call->entry);
MUTEX_EXIT(&rx_serverPool_lock);
MUTEX_ENTER(&call->lock);
continue;
}
- if (queue_IsEmpty(&call->rq)
- || queue_First(&call->rq, rx_packet)->header.seq != 1)
+ if (opr_queue_IsEmpty(&call->rq)
+ || opr_queue_First(&call->rq, struct rx_packet, entry)->header.seq != 1)
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
CLEAR_CALL_QUEUE_LOCK(call);
*socketp = OSI_NULLSOCKET;
}
sq->socketp = socketp;
- queue_Append(&rx_idleServerQueue, sq);
+ opr_queue_Append(&rx_idleServerQueue, &sq->entry);
#ifndef AFS_AIX41_ENV
rx_waitForPacket = sq;
#else
rxi_availProcs++;
MUTEX_EXIT(&rx_quota_mutex);
}
- if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
- struct rx_call *tcall, *ncall;
+ if (!opr_queue_IsEmpty(&rx_incomingCallQueue)) {
+ struct rx_call *tcall;
+ struct opr_queue *cursor;
/* Scan for eligible incoming calls. A call is not eligible
* if the maximum number of calls for its service type are
* already executing */
* have all their input data available immediately. This helps
* keep threads from blocking, waiting for data from the client. */
choice2 = (struct rx_call *)0;
- for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
+ for (opr_queue_Scan(&rx_incomingCallQueue, cursor)) {
+ tcall = opr_queue_Entry(cursor, struct rx_call, entry);
service = tcall->conn->service;
if (QuotaOK(service)) {
MUTEX_ENTER(&rx_pthread_mutex);
- if (tno == rxi_fcfs_thread_num
- || !tcall->queue_item_header.next) {
+ /* XXX - If tcall->entry.next is NULL, then we're no longer
+ * on a queue at all. This shouldn't happen. */
+ if (tno == rxi_fcfs_thread_num || !tcall->entry.next) {
MUTEX_EXIT(&rx_pthread_mutex);
/* If we're the fcfs thread, then we'll just use
* this call. If we haven't been able to find an optimal
service = call->conn->service;
} else {
MUTEX_EXIT(&rx_pthread_mutex);
- if (!queue_IsEmpty(&tcall->rq)) {
+ if (!opr_queue_IsEmpty(&tcall->rq)) {
struct rx_packet *rp;
- rp = queue_First(&tcall->rq, rx_packet);
+ rp = opr_queue_First(&tcall->rq, struct rx_packet,
+ entry);
if (rp->header.seq == 1
&& (!meltdown_1pkt
|| (rp->header.flags & RX_LAST_PACKET))) {
}
if (call) {
- queue_Remove(call);
+ opr_queue_Remove(&call->entry);
/* we can't schedule a call if there's no data!!! */
/* send an ack if there's no data, if we're missing the
* first packet, or we're missing something between first
* and last -- there's a "hole" in the incoming data. */
- if (queue_IsEmpty(&call->rq)
- || queue_First(&call->rq, rx_packet)->header.seq != 1
- || call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
+ if (opr_queue_IsEmpty(&call->rq)
+ || opr_queue_First(&call->rq, struct rx_packet, entry)->header.seq != 1
+ || call->rprev != opr_queue_Last(&call->rq, struct rx_packet, entry)->header.seq)
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
call->flags &= (~RX_CALL_WAIT_PROC);
*socketp = OSI_NULLSOCKET;
}
sq->socketp = socketp;
- queue_Append(&rx_idleServerQueue, sq);
+ opr_queue_Append(&rx_idleServerQueue, &sq->entry);
do {
osi_rxSleep(sq);
#ifdef KERNEL
struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
struct rx_call *cp; /* Call pointer temp */
- struct rx_call *nxp; /* Next call pointer, for queue_Scan */
+ struct opr_queue *cursor;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
dpf(("rxi_NewCall(conn %"AFS_PTR_FMT", channel %d)\n", conn, channel));
* Skip over those with in-use TQs.
*/
call = NULL;
- for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
+ for (opr_queue_Scan(&rx_freeCallQueue, cursor)) {
+ cp = opr_queue_Entry(cursor, struct rx_call, entry);
if (!(cp->flags & RX_CALL_TQ_BUSY)) {
call = cp;
break;
}
if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
- if (queue_IsNotEmpty(&rx_freeCallQueue)) {
- call = queue_First(&rx_freeCallQueue, rx_call);
+ if (!opr_queue_IsEmpty(&rx_freeCallQueue)) {
+ call = opr_queue_First(&rx_freeCallQueue, struct rx_call, entry);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- queue_Remove(call);
+ opr_queue_Remove(&call->entry);
if (rx_stats_active)
rx_atomic_dec(&rx_stats.nFreeCallStructs);
MUTEX_EXIT(&rx_freeCallQueue_lock);
CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
/* Initialize once-only items */
- queue_Init(&call->tq);
- queue_Init(&call->rq);
- queue_Init(&call->iovq);
+ opr_queue_Init(&call->tq);
+ opr_queue_Init(&call->rq);
+ opr_queue_Init(&call->iovq);
#ifdef RXDEBUG_PACKET
call->rqc = call->tqc = call->iovqc = 0;
#endif /* RXDEBUG_PACKET */
* the head of the list, and idle calls at the tail.
*/
if (call->flags & RX_CALL_TQ_BUSY)
- queue_Prepend(&rx_freeCallQueue, call);
+ opr_queue_Prepend(&rx_freeCallQueue, &call->entry);
else
- queue_Append(&rx_freeCallQueue, call);
+ opr_queue_Append(&rx_freeCallQueue, &call->entry);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
- queue_Append(&rx_freeCallQueue, call);
+ opr_queue_Append(&rx_freeCallQueue, &call->entry);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
if (rx_stats_active)
rx_atomic_inc(&rx_stats.nFreeCallStructs);
rx_atomic_set(&pp->neterrs, 0);
#endif
MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
- queue_Init(&pp->rpcStats);
+ opr_queue_Init(&pp->rpcStats);
pp->next = rx_peerHashTable[hashIndex];
rx_peerHashTable[hashIndex] = pp;
rxi_InitPeerParams(pp);
if (seq == call->rnext) {
/* Check to make sure it is not a duplicate of one already queued */
- if (queue_IsNotEmpty(&call->rq)
- && queue_First(&call->rq, rx_packet)->header.seq == seq) {
+ if (!opr_queue_IsEmpty(&call->rq)
+ && opr_queue_First(&call->rq, struct rx_packet, entry)->header.seq == seq) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate\n", np));
#ifdef RX_TRACK_PACKETS
np->flags |= RX_PKTFLAG_RQ;
#endif
- queue_Prepend(&call->rq, np);
+ opr_queue_Prepend(&call->rq, &np->entry);
#ifdef RXDEBUG_PACKET
call->rqc++;
#endif /* RXDEBUG_PACKET */
/* Check whether we have all of the packets for this call */
if (call->flags & RX_CALL_HAVE_LAST) {
afs_uint32 tseq; /* temporary sequence number */
- struct rx_packet *tp; /* Temporary packet pointer */
- struct rx_packet *nxp; /* Next pointer, for queue_Scan */
+ struct opr_queue *cursor;
- for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
+ for (tseq = seq, opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *tp;
+
+ tp = opr_queue_Entry(cursor, struct rx_packet, entry);
if (tseq != tp->header.seq)
break;
if (tp->header.flags & RX_LAST_PACKET) {
* any of this packet's predecessors are missing. */
afs_uint32 prev; /* "Previous packet" sequence number */
- struct rx_packet *tp; /* Temporary packet pointer */
- struct rx_packet *nxp; /* Next pointer, for queue_Scan */
+ struct opr_queue *cursor;
int missing; /* Are any predecessors missing? */
/* If the new packet's sequence number has been sent to the
}
/* Look for the packet in the queue of old received packets */
- for (prev = call->rnext - 1, missing =
- 0, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
+ prev = call->rnext - 1;
+ missing = 0;
+ for (opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *tp
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
/* Check for duplicate packet */
if (seq == tp->header.seq) {
if (rx_stats_active)
#ifdef RXDEBUG_PACKET
call->rqc++;
#endif /* RXDEBUG_PACKET */
- queue_InsertBefore(tp, np);
+ opr_queue_InsertBefore(cursor, &np->entry);
call->nSoftAcks++;
np = NULL;
&& !(call->flags & RX_CALL_RECEIVE_DONE)) {
afs_uint32 tseq; /* temporary sequence number */
- for (tseq =
- call->rnext, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
+ tseq = call->rnext;
+ for (opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *tp
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
if (tseq != tp->header.seq)
break;
if (tp->header.flags & RX_LAST_PACKET) {
struct rx_ackPacket *ap;
int nAcks;
struct rx_packet *tp;
- struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
struct rx_connection *conn = call->conn;
struct rx_peer *peer = conn->peer;
+ struct opr_queue *cursor;
struct clock now; /* Current time, for RTT calculations */
afs_uint32 first;
afs_uint32 prev;
* disposed of
*/
- tp = queue_First(&call->tq, rx_packet);
- while(!queue_IsEnd(&call->tq, tp) && tp->header.seq < first) {
+ tp = opr_queue_First(&call->tq, struct rx_packet, entry);
+ while(!opr_queue_IsEnd(&call->tq, &tp->entry) && tp->header.seq < first) {
struct rx_packet *next;
- next = queue_Next(tp, rx_packet);
+ next = opr_queue_Next(&tp->entry, struct rx_packet, entry);
call->tfirst = tp->header.seq + 1;
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
} else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
{
- queue_Remove(tp);
+ opr_queue_Remove(&tp->entry);
#ifdef RX_TRACK_PACKETS
tp->flags &= ~RX_PKTFLAG_TQ;
#endif
call->nSoftAcked = 0;
missing = 0;
- while (!queue_IsEnd(&call->tq, tp) && tp->header.seq < first + nAcks) {
+ while (!opr_queue_IsEnd(&call->tq, &tp->entry)
+ && tp->header.seq < first + nAcks) {
/* Set the acknowledge flag per packet based on the
* information in the ack packet. An acknowledged packet can
* be downgraded when the server has discarded a packet it
missing = 1;
}
- tp = queue_Next(tp, rx_packet);
+ tp = opr_queue_Next(&tp->entry, struct rx_packet, entry);
}
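One subtlety in the open-coded walk converted above: opr_queue_First and opr_queue_Next compute an item pointer from the next node even when that node is the list head itself, so the result is only meaningful once opr_queue_IsEnd has been checked; the loop tests IsEnd before touching tp->header for exactly that reason. A condensed sketch, with illustrative names:

    /* Manual walk: check IsEnd before dereferencing, since 'it' may be
     * a bogus pointer derived from the list head itself. */
    struct example_item *it
        = opr_queue_First(&head, struct example_item, entry);
    while (!opr_queue_IsEnd(&head, &it->entry)) {
        consume(it);                                    /* safe here */
        it = opr_queue_Next(&it->entry, struct example_item, entry);
    }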
/* We don't need to take any action with the 3rd or 4th section in the
* so we will retransmit as soon as the window permits
*/
- for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
+ acked = 0;
+ for (opr_queue_ScanBackwards(&call->tq, cursor)) {
+ struct rx_packet *tp =
+ opr_queue_Entry(cursor, struct rx_packet, entry);
if (acked) {
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
tp->flags &= ~RX_PKTFLAG_SENT;
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
rxevent_Cancel(&call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
- } else if (!queue_IsEmpty(&call->tq)) {
+ } else if (!opr_queue_IsEmpty(&call->tq)) {
rxi_Start(call, istack);
}
return np;
MUTEX_ENTER(&rx_serverPool_lock);
haveQuota = QuotaOK(service);
- if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
+ if ((!haveQuota) || opr_queue_IsEmpty(&rx_idleServerQueue)) {
/* If there are no processes available to service this call,
* put the call on the incoming call queue (unless it's
* already on the queue).
rx_atomic_inc(&rx_nWaited);
rxi_calltrace(RX_CALL_ARRIVAL, call);
SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
- queue_Append(&rx_incomingCallQueue, call);
+ opr_queue_Append(&rx_incomingCallQueue, &call->entry);
}
} else {
- sq = queue_Last(&rx_idleServerQueue, rx_serverQueueEntry);
+ sq = opr_queue_Last(&rx_idleServerQueue,
+ struct rx_serverQueueEntry, entry);
/* If hot threads are enabled, and both newcallp and sq->socketp
* are non-null, then this thread will process the call, and the
* idle server thread will start listening on this threads socket.
*/
- queue_Remove(sq);
+ opr_queue_Remove(&sq->entry);
+
if (rx_enable_hot_thread && newcallp && sq->socketp) {
*newcallp = call;
*tnop = sq->tno;
/* Conservative: I don't think this should happen */
call->flags &= ~RX_CALL_WAIT_PROC;
rx_atomic_dec(&rx_nWaiting);
- if (queue_IsOnQueue(call)) {
- queue_Remove(call);
+ if (opr_queue_IsOnQueue(&call->entry)) {
+ opr_queue_Remove(&call->entry);
}
}
call->state = RX_STATE_ACTIVE;
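A detail behind the opr_queue_IsOnQueue() checks above, assuming the opr/queue.h semantics this patch relies on (see also the XXX comment about tcall->entry.next earlier in the diff): removing a node is expected to clear its link pointers, which is what lets a detached node be told apart from an attached one.

    /* Assumed semantics: opr_queue_Remove NULLs the node's links, so
     * IsOnQueue can test whether the node is currently attached. */
    if (opr_queue_IsOnQueue(&call->entry))
        opr_queue_Remove(&call->entry);   /* links cleared on removal */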
static void
rxi_SetAcksInTransmitQueue(struct rx_call *call)
{
- struct rx_packet *p, *tp;
+ struct opr_queue *cursor;
int someAcked = 0;
- for (queue_Scan(&call->tq, p, tp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
+
if (someAcked) {
call->flags |= RX_CALL_TQ_CLEARME;
call->flags |= RX_CALL_TQ_SOME_ACKED;
rxi_ClearTransmitQueue(struct rx_call *call, int force)
{
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- struct rx_packet *p, *tp;
-
+ struct opr_queue *cursor;
if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
int someAcked = 0;
- for (queue_Scan(&call->tq, p, tp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
static void
rxi_ClearReceiveQueue(struct rx_call *call)
{
- if (queue_IsNotEmpty(&call->rq)) {
+ if (!opr_queue_IsEmpty(&call->rq)) {
u_short count;
count = rxi_FreePackets(0, &call->rq);
*/
if (call->call_queue_lock) {
MUTEX_ENTER(call->call_queue_lock);
- if (queue_IsOnQueue(call)) {
- queue_Remove(call);
+ if (opr_queue_IsOnQueue(&call->entry)) {
+ opr_queue_Remove(&call->entry);
}
MUTEX_EXIT(call->call_queue_lock);
CLEAR_CALL_QUEUE_LOCK(call);
}
#else /* RX_ENABLE_LOCKS */
- if (queue_IsOnQueue(call)) {
- queue_Remove(call);
+ if (opr_queue_IsOnQueue(&call->entry)) {
+ opr_queue_Remove(&call->entry);
}
#endif /* RX_ENABLE_LOCKS */
int istack)
{
struct rx_ackPacket *ap;
- struct rx_packet *rqp;
- struct rx_packet *nxp; /* For queue_Scan */
struct rx_packet *p;
+ struct opr_queue *cursor;
u_char offset = 0;
afs_int32 templ;
afs_uint32 padbytes = 0;
* are packets in the receive queue awaiting processing.
*/
if ((call->flags & RX_CALL_ACKALL_SENT) &&
- !queue_IsEmpty(&call->rq)) {
- ap->firstPacket = htonl(queue_Last(&call->rq, rx_packet)->header.seq + 1);
+ !opr_queue_IsEmpty(&call->rq)) {
+ ap->firstPacket = htonl(opr_queue_Last(&call->rq, struct rx_packet, entry)->header.seq + 1);
} else {
ap->firstPacket = htonl(call->rnext);
ap->previousPacket = htonl(call->rprev); /* Previous packet received */
- /* No fear of running out of ack packet here because there can only be at most
- * one window full of unacknowledged packets. The window size must be constrained
- * to be less than the maximum ack size, of course. Also, an ack should always
- * fit into a single packet -- it should not ever be fragmented. */
- for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
+ /* No fear of running out of ack packet here because there can only
+ * be at most one window full of unacknowledged packets. The window
+ * size must be constrained to be less than the maximum ack size,
+ * of course. Also, an ack should always fit into a single packet
+ * -- it should not ever be fragmented. */
+ offset = 0;
+ for (opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *rqp
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (!rqp || !call->rq.next
|| (rqp->header.seq > (call->rnext + call->rwind))) {
#ifndef RX_ENABLE_TSFPQ
{
struct rx_call *call = arg0;
struct rx_peer *peer;
- struct rx_packet *p, *nxp;
+ struct opr_queue *cursor;
struct clock maxTimeout = { 60, 0 };
MUTEX_ENTER(&call->lock);
rxi_CheckBusy(call);
}
- if (queue_IsEmpty(&call->tq)) {
+ if (opr_queue_IsEmpty(&call->tq)) {
/* Nothing to do. This means that we've been raced, and that an
* ACK has come in between when we were triggered, and when we
* actually got to run. */
call->flags |= RX_CALL_FAST_RECOVER;
/* Mark all of the pending packets in the queue as being lost */
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
if (!(p->flags & RX_PKTFLAG_ACKED))
p->flags &= ~RX_PKTFLAG_SENT;
}
void
rxi_Start(struct rx_call *call, int istack)
{
-
- struct rx_packet *p;
- struct rx_packet *nxp; /* Next pointer for queue_Scan */
+ struct opr_queue *cursor;
+#ifdef RX_ENABLE_LOCKS
+ struct opr_queue *store;
+#endif
int nXmitPackets;
int maxXmitPackets;
return;
}
- if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
-
+ if (!opr_queue_IsEmpty(&call->tq)) { /* If we have anything to send */
/* Send (or resend) any packets that need it, subject to
* window restrictions and congestion burst control
* restrictions. Ask for an ack on the last packet sent in
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
nXmitPackets = 0;
maxXmitPackets = MIN(call->twind, call->cwind);
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
-#ifdef RX_TRACK_PACKETS
- if ((p->flags & RX_PKTFLAG_FREE)
- || (!queue_IsEnd(&call->tq, nxp)
- && (nxp->flags & RX_PKTFLAG_FREE))
- || (p == (struct rx_packet *)&rx_freePacketQueue)
- || (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
- osi_Panic("rxi_Start: xmit queue clobbered");
- }
-#endif
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (p->flags & RX_PKTFLAG_ACKED) {
/* Since we may block, don't trust this */
if (rx_stats_active)
*(call->callNumber), p));
call->xmitList[nXmitPackets++] = p;
}
- }
+ } /* end of the opr_queue_Scan */
/* xmitList now hold pointers to all of the packets that are
* ready to send. Now we loop to send the packets */
/* Some packets have received acks. If they all have, we can clear
* the transmit queue.
*/
- for (missing =
- 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ missing = 0;
+ for (opr_queue_ScanSafe(&call->tq, cursor, store)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (p->header.seq < call->tfirst
&& (p->flags & RX_PKTFLAG_ACKED)) {
- queue_Remove(p);
+ opr_queue_Remove(&p->entry);
#ifdef RX_TRACK_PACKETS
p->flags &= ~RX_PKTFLAG_TQ;
#endif
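The removal loop above uses the Safe scan variant: opr_queue_ScanSafe takes a second cursor that saves the successor before the body runs, so the body may unlink (and even free) the current element without derailing the walk. A sketch with illustrative names:

    /* Deleting while iterating: 'store' holds cursor->next up front. */
    struct opr_queue *cursor, *store;

    for (opr_queue_ScanSafe(&head, cursor, store)) {
        struct example_item *it
            = opr_queue_Entry(cursor, struct example_item, entry);
        if (it->expired) {
            opr_queue_Remove(&it->entry);   /* safe: successor saved */
            free(it);
        }
    }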
code = MUTEX_TRYENTER(&peer->peer_lock);
if ((code) && (peer->refCount == 0)
&& ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor, *store;
size_t space;
/*
MUTEX_EXIT(&peer->peer_lock);
MUTEX_DESTROY(&peer->peer_lock);
- for (queue_Scan
- (&peer->rpcStats, rpc_stat, nrpc_stat,
- rx_interface_stat)) {
+
+ for (opr_queue_ScanSafe(&peer->rpcStats, cursor, store)) {
unsigned int num_funcs;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entry);
if (!rpc_stat)
break;
- queue_Remove(&rpc_stat->queue_header);
- queue_Remove(&rpc_stat->all_peers);
+
+ opr_queue_Remove(&rpc_stat->entry);
+ opr_queue_Remove(&rpc_stat->entryPeers);
+
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
#endif /* AFS_USE_GETTIMEOFDAY */
#endif /* AFS_PTHREAD_ENV */
- while (!queue_IsEmpty(&rx_freeCallQueue)) {
- call = queue_First(&rx_freeCallQueue, rx_call);
- queue_Remove(call);
+ while (!opr_queue_IsEmpty(&rx_freeCallQueue)) {
+ call = opr_queue_First(&rx_freeCallQueue, struct rx_call, entry);
+ opr_queue_Remove(&call->entry);
rxi_Free(call, sizeof(struct rx_call));
}
- while (!queue_IsEmpty(&rx_idleServerQueue)) {
- sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
- queue_Remove(sq);
+ while (!opr_queue_IsEmpty(&rx_idleServerQueue)) {
+ sq = opr_queue_First(&rx_idleServerQueue, struct rx_serverQueueEntry,
+ entry);
+ opr_queue_Remove(&sq->entry);
}
#endif /* KERNEL */
MUTEX_ENTER(&rx_peerHashTable_lock);
for (peer = *peer_ptr; peer; peer = next) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor, *store;
size_t space;
MUTEX_ENTER(&rx_rpc_stats);
MUTEX_ENTER(&peer->peer_lock);
- for (queue_Scan
- (&peer->rpcStats, rpc_stat, nrpc_stat,
- rx_interface_stat)) {
+ for (opr_queue_ScanSafe(&peer->rpcStats, cursor, store)) {
unsigned int num_funcs;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entry);
if (!rpc_stat)
break;
- queue_Remove(&rpc_stat->queue_header);
- queue_Remove(&rpc_stat->all_peers);
+ opr_queue_Remove(&rpc_stat->entry);
+ opr_queue_Remove(&rpc_stat->entryPeers);
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
* which can come and go based upon the peer lifetime.
*/
-static struct rx_queue processStats = { &processStats, &processStats };
+static struct opr_queue processStats = { &processStats, &processStats };
/*
* peerStats is a queue used to store the statistics for all peer structs.
* Its contents are the union of all the peer rpcStats queues.
*/
-static struct rx_queue peerStats = { &peerStats, &peerStats };
+static struct opr_queue peerStats = { &peerStats, &peerStats };
/*
* rxi_monitor_processStats is used to turn process wide stat collection
*/
static rx_interface_stat_p
-rxi_FindRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
+rxi_FindRpcStat(struct opr_queue *stats, afs_uint32 rxInterface,
afs_uint32 totalFunc, int isServer, afs_uint32 remoteHost,
afs_uint32 remotePort, int addToPeerList,
unsigned int *counter, int create)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ rx_interface_stat_p rpc_stat = NULL;
+ struct opr_queue *cursor;
/*
* See if there's already a structure for this interface
*/
- for (queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
+ for (opr_queue_Scan(stats, cursor)) {
+ rpc_stat = opr_queue_Entry(cursor, struct rx_interface_stat, entry);
+
if ((rpc_stat->stats[0].interfaceId == rxInterface)
&& (rpc_stat->stats[0].remote_is_server == isServer))
break;
/* if they didn't ask us to create, we're done */
if (!create) {
- if (queue_IsEnd(stats, rpc_stat))
+ if (opr_queue_IsEnd(stats, cursor))
return NULL;
else
return rpc_stat;
* queue.
*/
- if (queue_IsEnd(stats, rpc_stat) || (rpc_stat == NULL)
+ if (opr_queue_IsEnd(stats, cursor) || (rpc_stat == NULL)
|| (rpc_stat->stats[0].interfaceId != rxInterface)
|| (rpc_stat->stats[0].remote_is_server != isServer)) {
int i;
rpc_stat->stats[i].func_total = totalFunc;
rpc_stat->stats[i].func_index = i;
}
- queue_Prepend(stats, rpc_stat);
+ opr_queue_Prepend(stats, &rpc_stat->entry);
if (addToPeerList) {
- queue_Prepend(&peerStats, &rpc_stat->all_peers);
+ opr_queue_Prepend(&peerStats, &rpc_stat->entryPeers);
}
}
return rpc_stat;
*/
static int
-rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
+rxi_AddRpcStat(struct opr_queue *stats, afs_uint32 rxInterface,
afs_uint32 currentFunc, afs_uint32 totalFunc,
struct clock *queueTime, struct clock *execTime,
afs_uint64 bytesSent, afs_uint64 bytesRcvd, int isServer,
ptr = *stats = rxi_Alloc(space);
if (ptr != NULL) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor;
-
- for (queue_Scan
- (&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
+ for (opr_queue_Scan(&processStats, cursor)) {
+ struct rx_interface_stat *rpc_stat =
+ opr_queue_Entry(cursor, struct rx_interface_stat, entry);
/*
* Copy the data based upon the caller version
*/
ptr = *stats = rxi_Alloc(space);
if (ptr != NULL) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
- char *fix_offset;
+ struct opr_queue *cursor;
- for (queue_Scan
- (&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
- /*
- * We have to fix the offset of rpc_stat since we are
- * keeping this structure on two rx_queues. The rx_queue
- * package assumes that the rx_queue member is the first
- * member of the structure. That is, rx_queue assumes that
- * any one item is only on one queue at a time. We are
- * breaking that assumption and so we have to do a little
- * math to fix our pointers.
- */
-
- fix_offset = (char *)rpc_stat;
- fix_offset -= offsetof(rx_interface_stat_t, all_peers);
- rpc_stat = (rx_interface_stat_p) fix_offset;
+ for (opr_queue_Scan(&peerStats, cursor)) {
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entryPeers);
/*
* Copy the data based upon the caller version
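This hunk shows the main payoff of the conversion. rx_interface_stat lives on two queues at once, so it now embeds two link nodes, entry (a peer's rpcStats queue) and entryPeers (the global peerStats queue), and opr_queue_Entry simply names whichever member the queue links through; the removed offsetof() arithmetic existed only because rx_queue required the link to be the structure's first member. The two-queue pattern, sketched with illustrative names:

    struct stat_item {
        struct opr_queue entry;        /* links into a per-peer queue */
        struct opr_queue entryPeers;   /* links into the global queue */
        int data;
    };

    struct opr_queue perPeer, global, *cursor;
    struct stat_item item;

    opr_queue_Init(&perPeer);
    opr_queue_Init(&global);
    opr_queue_Prepend(&perPeer, &item.entry);
    opr_queue_Prepend(&global, &item.entryPeers);

    /* The member argument selects which embedded node to map back from. */
    for (opr_queue_Scan(&global, cursor)) {
        struct stat_item *s
            = opr_queue_Entry(cursor, struct stat_item, entryPeers);
        s->data++;
    }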
void
rx_disableProcessRPCStats(void)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor, *store;
size_t space;
MUTEX_ENTER(&rx_rpc_stats);
rx_enable_stats = 0;
}
- for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
- unsigned int num_funcs = 0;
- if (!rpc_stat)
- break;
- queue_Remove(rpc_stat);
+ for (opr_queue_ScanSafe(&processStats, cursor, store)) {
+ unsigned int num_funcs = 0;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat, entry);
+
+ opr_queue_Remove(&rpc_stat->entry);
+
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
next = peer->next;
code = MUTEX_TRYENTER(&peer->peer_lock);
if (code) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
size_t space;
+ struct opr_queue *cursor, *store;
if (prev == *peer_ptr) {
*peer_ptr = next;
peer->refCount++;
MUTEX_EXIT(&rx_peerHashTable_lock);
- for (queue_Scan
- (&peer->rpcStats, rpc_stat, nrpc_stat,
- rx_interface_stat)) {
+ for (opr_queue_ScanSafe(&peer->rpcStats, cursor, store)) {
unsigned int num_funcs = 0;
- if (!rpc_stat)
- break;
- queue_Remove(&rpc_stat->queue_header);
- queue_Remove(&rpc_stat->all_peers);
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entry);
+
+ opr_queue_Remove(&rpc_stat->entry);
+ opr_queue_Remove(&rpc_stat->entryPeers);
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
void
rx_clearProcessRPCStats(afs_uint32 clearFlag)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor;
MUTEX_ENTER(&rx_rpc_stats);
- for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
+ for (opr_queue_Scan(&processStats, cursor)) {
unsigned int num_funcs = 0, i;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat, entry);
+
num_funcs = rpc_stat->stats[0].func_total;
for (i = 0; i < num_funcs; i++) {
if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
void
rx_clearPeerRPCStats(afs_uint32 clearFlag)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor;
MUTEX_ENTER(&rx_rpc_stats);
- for (queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
- unsigned int num_funcs = 0, i;
- char *fix_offset;
- /*
- * We have to fix the offset of rpc_stat since we are
- * keeping this structure on two rx_queues. The rx_queue
- * package assumes that the rx_queue member is the first
- * member of the structure. That is, rx_queue assumes that
- * any one item is only on one queue at a time. We are
- * breaking that assumption and so we have to do a little
- * math to fix our pointers.
- */
-
- fix_offset = (char *)rpc_stat;
- fix_offset -= offsetof(rx_interface_stat_t, all_peers);
- rpc_stat = (rx_interface_stat_p) fix_offset;
+ for (opr_queue_Scan(&peerStats, cursor)) {
+ unsigned int num_funcs, i;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat, entryPeers);
num_funcs = rpc_stat->stats[0].func_total;
for (i = 0; i < num_funcs; i++) {
for (c = rx_allCallsp; c; c = c->allNextp) {
u_short rqc, tqc, iovqc;
- struct rx_packet *p, *np;
MUTEX_ENTER(&c->lock);
- queue_Count(&c->rq, p, np, rx_packet, rqc);
- queue_Count(&c->tq, p, np, rx_packet, tqc);
- queue_Count(&c->iovq, p, np, rx_packet, iovqc);
+ rqc = opr_queue_Count(&c->rq);
+ tqc = opr_queue_Count(&c->tq);
+ iovqc = opr_queue_Count(&c->iovq);
RXDPRINTF(RXDPRINTOUT, "%s - call=0x%p, id=%u, state=%u, mode=%u, conn=%p, epoch=%u, cid=%u, callNum=%u, connFlags=0x%x, flags=0x%x, "
"rqc=%u,%u, tqc=%u,%u, iovqc=%u,%u, "
# include <sys/sysmacros.h>
#endif
+#include <opr/queue.h>
+
#include "rx.h"
#include "rx_clock.h"
-#include "rx_queue.h"
#include "rx_packet.h"
#include "rx_atomic.h"
#include "rx_globals.h"
extern char cml_version_number[];
-static int AllocPacketBufs(int class, int num_pkts, struct rx_queue *q);
+static int AllocPacketBufs(int class, int num_pkts, struct opr_queue *q);
static void rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
afs_uint32 ahost, short aport,
static void rxi_FreePacketNoLock(struct rx_packet *p);
static int rxi_FreeDataBufsNoLock(struct rx_packet *p, afs_uint32 first);
static int rxi_FreeDataBufsToQueue(struct rx_packet *p, afs_uint32 first,
- struct rx_queue * q);
+ struct opr_queue * q);
#endif
-extern struct rx_queue rx_idleServerQueue;
+extern struct opr_queue rx_idleServerQueue;
/* some rules about packets:
* 1. When a packet is allocated, the final iov_buf contains room for
}
int
-rxi_AllocPackets(int class, int num_pkts, struct rx_queue * q)
+rxi_AllocPackets(int class, int num_pkts, struct opr_queue * q)
{
- struct rx_packet *p, *np;
+ struct opr_queue *c;
num_pkts = AllocPacketBufs(class, num_pkts, q);
- for (queue_Scan(q, p, np, rx_packet)) {
- RX_PACKET_IOV_FULLINIT(p);
+ for (opr_queue_Scan(q, c)) {
+ RX_PACKET_IOV_FULLINIT(opr_queue_Entry(c, struct rx_packet, entry));
}
return num_pkts;
#ifdef RX_ENABLE_TSFPQ
static int
-AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
+AllocPacketBufs(int class, int num_pkts, struct opr_queue * q)
{
struct rx_ts_info_t * rx_ts_info;
int transfer;
}
#else /* RX_ENABLE_TSFPQ */
static int
-AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
+AllocPacketBufs(int class, int num_pkts, struct opr_queue * q)
{
struct rx_packet *c;
int i;
}
#endif /* KERNEL */
- for (i=0, c=queue_First(&rx_freePacketQueue, rx_packet);
+ for (i=0, c=opr_queue_First(&rx_freePacketQueue, struct rx_packet, entry);
i < num_pkts;
- i++, c=queue_Next(c, rx_packet)) {
+ i++, c=opr_queue_Next(&c->entry, struct rx_packet, entry)) {
RX_FPQ_MARK_USED(c);
}
- queue_SplitBeforeAppend(&rx_freePacketQueue,q,c);
+ opr_queue_SplitBeforeAppend(&rx_freePacketQueue, q, &c->entry);
rx_nFreePackets -= num_pkts;
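In AllocPacketBufs above, the loop only advances a pointer past the num_pkts packets being claimed; the actual transfer is a single splice. opr_queue_SplitBeforeAppend, as used here, moves every element ahead of the given node onto the tail of the destination queue and leaves the node itself (the first packet not taken) on the source. Sketched with illustrative names:

    /* Carve the first n elements of 'src' onto the tail of 'dst'. */
    int i;
    struct example_item *c;

    for (i = 0, c = opr_queue_First(&src, struct example_item, entry);
         i < n;
         i++, c = opr_queue_Next(&c->entry, struct example_item, entry))
        ;   /* just walk past the n elements being taken */

    /* Everything before c->entry moves to 'dst'; c stays on 'src'. */
    opr_queue_SplitBeforeAppend(&src, &dst, &c->entry);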
#ifdef RX_ENABLE_TSFPQ
/* num_pkts=0 means queue length is unknown */
int
-rxi_FreePackets(int num_pkts, struct rx_queue * q)
+rxi_FreePackets(int num_pkts, struct opr_queue * q)
{
struct rx_ts_info_t * rx_ts_info;
- struct rx_packet *c, *nc;
+ struct opr_queue *cursor, *store;
SPLVAR;
osi_Assert(num_pkts >= 0);
RX_TS_INFO_GET(rx_ts_info);
if (!num_pkts) {
- for (queue_Scan(q, c, nc, rx_packet), num_pkts++) {
- rxi_FreeDataBufsTSFPQ(c, 2, 0);
+ for (opr_queue_ScanSafe(q, cursor, store)) {
+ num_pkts++;
+ rxi_FreeDataBufsTSFPQ(opr_queue_Entry(cursor, struct rx_packet,
+ entry), 2, 0);
}
} else {
- for (queue_Scan(q, c, nc, rx_packet)) {
- rxi_FreeDataBufsTSFPQ(c, 2, 0);
+ for (opr_queue_ScanSafe(q, cursor, store)) {
+ rxi_FreeDataBufsTSFPQ(opr_queue_Entry(cursor, struct rx_packet,
+ entry), 2, 0);
}
}
#else /* RX_ENABLE_TSFPQ */
/* num_pkts=0 means queue length is unknown */
int
-rxi_FreePackets(int num_pkts, struct rx_queue *q)
+rxi_FreePackets(int num_pkts, struct opr_queue *q)
{
- struct rx_queue cbs;
- struct rx_packet *p, *np;
+ struct opr_queue cbs;
+ struct opr_queue *cursor, *store;
int qlen = 0;
SPLVAR;
osi_Assert(num_pkts >= 0);
- queue_Init(&cbs);
+ opr_queue_Init(&cbs);
if (!num_pkts) {
- for (queue_Scan(q, p, np, rx_packet), num_pkts++) {
+ for (opr_queue_ScanSafe(q, cursor, store)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
if (p->niovecs > 2) {
qlen += rxi_FreeDataBufsToQueue(p, 2, &cbs);
}
+ num_pkts++;
}
if (!num_pkts)
return 0;
} else {
- for (queue_Scan(q, p, np, rx_packet)) {
+ for (opr_queue_ScanSafe(q, cursor, store)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (p->niovecs > 2) {
qlen += rxi_FreeDataBufsToQueue(p, 2, &cbs);
}
}
if (qlen) {
- queue_SpliceAppend(q, &cbs);
+ opr_queue_SpliceAppend(q, &cbs);
qlen += num_pkts;
} else
qlen = num_pkts;
NETPRI;
MUTEX_ENTER(&rx_freePktQ_lock);
- queue_SpliceAppend(&rx_freePacketQueue, q);
+ opr_queue_SpliceAppend(&rx_freePacketQueue, q);
rx_nFreePackets += qlen;
/* Wakeup anyone waiting for packets */
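The free path batches packets (and their continuation buffers) onto a local queue and returns the whole batch under one lock acquisition; opr_queue_SpliceAppend, as used here, appends the entire contents of the source queue to the destination and leaves the source empty, which is why qlen can be credited in one step. Sketch:

    struct opr_queue batch;

    opr_queue_Init(&batch);
    /* ... opr_queue_Append() freed packets onto 'batch' ... */

    MUTEX_ENTER(&rx_freePktQ_lock);
    opr_queue_SpliceAppend(&rx_freePacketQueue, &batch); /* batch now empty */
    rx_nFreePackets += qlen;
    MUTEX_EXIT(&rx_freePktQ_lock);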
rxi_AllocDataBuf(struct rx_packet *p, int nb, int class)
{
int i, nv;
- struct rx_queue q;
- struct rx_packet *cb, *ncb;
+ struct opr_queue q, *cursor, *store;
/* compute the number of cbuf's we need */
nv = nb / RX_CBUFFERSIZE;
return nb;
/* allocate buffers */
- queue_Init(&q);
+ opr_queue_Init(&q);
nv = AllocPacketBufs(class, nv, &q);
/* setup packet iovs */
- for (i = p->niovecs, queue_Scan(&q, cb, ncb, rx_packet), i++) {
- queue_Remove(cb);
+ i = p->niovecs;
+ for (opr_queue_ScanSafe(&q, cursor, store)) {
+ struct rx_packet *cb
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
+ opr_queue_Remove(&cb->entry);
p->wirevec[i].iov_base = (caddr_t) cb->localdata;
p->wirevec[i].iov_len = RX_CBUFFERSIZE;
+ i++;
}
nb -= (nv * RX_CBUFFERSIZE);
#endif
p->niovecs = 2;
- queue_Append(&rx_freePacketQueue, p);
+ opr_queue_Append(&rx_freePacketQueue, &p->entry);
#ifdef RXDEBUG_PACKET
p->packetId = rx_packet_id++;
p->allNextp = rx_mallocedP;
#endif
p->niovecs = 2;
- queue_Append(&rx_freePacketQueue, p);
+ opr_queue_Append(&rx_freePacketQueue, &p->entry);
#ifdef RXDEBUG_PACKET
p->packetId = rx_packet_id++;
p->allNextp = rx_mallocedP;
RX_FPQ_MARK_FREE(p);
rx_nFreePackets++;
- queue_Append(&rx_freePacketQueue, p);
+ opr_queue_Append(&rx_freePacketQueue, &p->entry);
}
#endif /* RX_ENABLE_TSFPQ */
*/
#ifndef RX_ENABLE_TSFPQ
static int
-rxi_FreeDataBufsToQueue(struct rx_packet *p, afs_uint32 first, struct rx_queue * q)
+rxi_FreeDataBufsToQueue(struct rx_packet *p, afs_uint32 first, struct opr_queue * q)
{
struct iovec *iov;
struct rx_packet * cb;
osi_Panic("rxi_FreeDataBufsToQueue: unexpected NULL iov");
cb = RX_CBUF_TO_PACKET(iov->iov_base, p);
RX_FPQ_MARK_FREE(cb);
- queue_Append(q, cb);
+ opr_queue_Append(q, &cb->entry);
}
p->length = 0;
p->niovecs = 0;
if (rx_stats_active)
rx_atomic_inc(&rx_stats.packetRequests);
- if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+ if (opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
#ifdef KERNEL
- if (queue_IsEmpty(&rx_freePacketQueue))
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
osi_Panic("rxi_AllocPacket error");
#else /* KERNEL */
- if (queue_IsEmpty(&rx_freePacketQueue))
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
rxi_MorePacketsNoLock(rx_maxSendWindow);
#endif /* KERNEL */
rx_atomic_inc(&rx_stats.packetRequests);
#ifdef KERNEL
- if (queue_IsEmpty(&rx_freePacketQueue))
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
osi_Panic("rxi_AllocPacket error");
#else /* KERNEL */
- if (queue_IsEmpty(&rx_freePacketQueue))
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
rxi_MorePacketsNoLock(rx_maxSendWindow);
#endif /* KERNEL */
rx_nFreePackets--;
- p = queue_First(&rx_freePacketQueue, rx_packet);
- queue_Remove(p);
+ p = opr_queue_First(&rx_freePacketQueue, struct rx_packet, entry);
+ opr_queue_Remove(&p->entry);
RX_FPQ_MARK_USED(p);
dpf(("Alloc %"AFS_PTR_FMT", class %d\n", p, class));
if (rx_stats_active)
rx_atomic_inc(&rx_stats.packetRequests);
- if (pull_global && queue_IsEmpty(&rx_ts_info->_FPQ)) {
+ if (pull_global && opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
MUTEX_ENTER(&rx_freePktQ_lock);
- if (queue_IsEmpty(&rx_freePacketQueue))
+ if (opr_queue_IsEmpty(&rx_freePacketQueue))
rxi_MorePacketsNoLock(rx_maxSendWindow);
RX_TS_FPQ_GTOL(rx_ts_info);
MUTEX_EXIT(&rx_freePktQ_lock);
- } else if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+ } else if (opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
return NULL;
}
{
struct rx_debugIn tin;
afs_int32 tl;
- struct rx_serverQueueEntry *np, *nqe;
/*
* Only respond to client-initiated Rx debug packets,
tstat.usedFDs = CountFDs(64);
tstat.nWaiting = htonl(rx_atomic_read(&rx_nWaiting));
tstat.nWaited = htonl(rx_atomic_read(&rx_nWaited));
- queue_Count(&rx_idleServerQueue, np, nqe, rx_serverQueueEntry,
- tstat.idleThreads);
+ tstat.idleThreads = opr_queue_Count(&rx_idleServerQueue);
MUTEX_EXIT(&rx_serverPool_lock);
tstat.idleThreads = htonl(tstat.idleThreads);
tl = sizeof(struct rx_debugStats) - ap->length;
tconn.callState[j] = tcall->state;
tconn.callMode[j] = tcall->mode;
tconn.callFlags[j] = tcall->flags;
- if (queue_IsNotEmpty(&tcall->rq))
+ if (!opr_queue_IsEmpty(&tcall->rq))
tconn.callOther[j] |= RX_OTHER_IN;
- if (queue_IsNotEmpty(&tcall->tq))
+ if (!opr_queue_IsEmpty(&tcall->tq))
tconn.callOther[j] |= RX_OTHER_OUT;
} else
tconn.callState[j] = RX_STATE_NOTINIT;