# include <WINNT\afsreg.h>
# endif
+# include <afs/opr.h>
+
# include "rx_user.h"
#endif /* KERNEL */
+#include <opr/queue.h>
+
#include "rx.h"
#include "rx_clock.h"
-#include "rx_queue.h"
#include "rx_atomic.h"
#include "rx_globals.h"
#include "rx_trace.h"
#include "rx_conn.h"
#include "rx_call.h"
#include "rx_packet.h"
+#include "rx_server.h"
#include <afs/rxgen_consts.h>
rx_atomic_t rx_nWaiting = RX_ATOMIC_INIT(0);
rx_atomic_t rx_nWaited = RX_ATOMIC_INIT(0);
+/* Incoming calls wait on this queue when there are no available
+ * server processes */
+struct opr_queue rx_incomingCallQueue;
+
+/* Server processes wait on this queue when there are no appropriate
+ * calls to process */
+struct opr_queue rx_idleServerQueue;
+
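+/*
+ * Throughout this change the old rx_queue macros are replaced by the
+ * opr_queue API from opr/queue.h: a queue head and each element's
+ * membership are both plain struct opr_queue links, and opr_queue_Entry()
+ * recovers the containing structure from a link.  A minimal sketch of the
+ * idiom, with made-up names (illustrative only, not part of this module):
+ *
+ *     struct item { struct opr_queue entry; int value; };
+ *
+ *     struct opr_queue q;
+ *     struct opr_queue *cursor;
+ *     struct item it;
+ *
+ *     opr_queue_Init(&q);
+ *     opr_queue_Append(&q, &it.entry);
+ *     for (opr_queue_Scan(&q, cursor)) {
+ *         struct item *p = opr_queue_Entry(cursor, struct item, entry);
+ *         ... use p->value ...
+ *     }
+ */
+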
#if !defined(offsetof)
#include <stddef.h> /* for definition of offsetof() */
#endif
#endif /* RX_LOCKS_DB */
MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_quota_mutex, "rx_quota_mutex", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_atomic_mutex, "rx_atomic_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_pthread_mutex, "rx_pthread_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_packets_mutex, "rx_packets_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_refcnt_mutex, "rx_refcnt_mutex", MUTEX_DEFAULT, 0);
rx_connDeadTime = 12;
rx_tranquil = 0; /* reset flag */
rxi_ResetStatistics();
- htable = (char *)
- osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
+ htable = osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *)); /* XXXXX */
memset(htable, 0, rx_hashTableSize * sizeof(struct rx_connection *));
- ptable = (char *)osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
+ ptable = osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *)); /* XXXXX */
memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));
/* Malloc up a bunch of packets & buffers */
rx_nFreePackets = 0;
- queue_Init(&rx_freePacketQueue);
+ opr_queue_Init(&rx_freePacketQueue);
rxi_NeedMorePackets = FALSE;
rx_nPackets = 0; /* rx_nPackets is managed by rxi_MorePackets* */
#endif
if (getsockname((intptr_t)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
rx_Finalize();
+ osi_Free(htable, rx_hashTableSize * sizeof(struct rx_connection *));
return -1;
}
rx_port = addr.sin_port;
rxevent_Init(20, rxi_ReScheduleEvents);
/* Initialize various global queues */
- queue_Init(&rx_idleServerQueue);
- queue_Init(&rx_incomingCallQueue);
- queue_Init(&rx_freeCallQueue);
+ opr_queue_Init(&rx_idleServerQueue);
+ opr_queue_Init(&rx_incomingCallQueue);
+ opr_queue_Init(&rx_freeCallQueue);
#if defined(AFS_NT40_ENV) && !defined(KERNEL)
/* Initialize our list of usable IP addresses. */
static_inline void
rxi_rto_packet_acked(struct rx_call *call, int istack)
{
- struct rx_packet *p, *nxp;
+ struct opr_queue *cursor;
rxi_rto_cancel(call);
- if (queue_IsEmpty(&call->tq))
+ if (opr_queue_IsEmpty(&call->tq))
return;
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
if (p->header.seq > call->tfirst + call->twind)
return;
call->state = RX_STATE_ACTIVE;
call->error = conn->error;
if (call->error)
- call->mode = RX_MODE_ERROR;
+ call->app.mode = RX_MODE_ERROR;
else
- call->mode = RX_MODE_SENDING;
+ call->app.mode = RX_MODE_SENDING;
+
+#ifdef AFS_RXERRQ_ENV
+ /* remember how many network errors the peer has when we started, so if
+     * more errors are encountered after the call starts, we know the other
+     * endpoint won't be responding to us */
+ call->neterr_gen = rx_atomic_read(&conn->peer->neterrs);
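+    /* This snapshot behaves as a generation count: rxi_SetPeerDead() bumps
+     * peer->neterrs on fatal ICMP errors and incoming traffic resets it, so
+     * if rxi_CheckPeerDead() (later in this change) finds neterrs above the
+     * value recorded here, the peer has failed since the call began and the
+     * call is killed with RX_CALL_DEAD. */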
+#endif
/* remember start time for call in case we have hard dead time limit */
call->queueTime = queueTime;
clock_GetTime(&call->startTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
+ call->bytesSent = 0;
+ call->bytesRcvd = 0;
/* Turn on busy protocol. */
rxi_KeepAliveOn(call);
rx_WakeupServerProcs(void)
{
struct rx_serverQueueEntry *np, *tqp;
+ struct opr_queue *cursor;
SPLVAR;
NETPRI;
#endif /* RX_ENABLE_LOCKS */
}
MUTEX_EXIT(&freeSQEList_lock);
- for (queue_Scan(&rx_idleServerQueue, np, tqp, rx_serverQueueEntry)) {
+ for (opr_queue_Scan(&rx_idleServerQueue, cursor)) {
+ np = opr_queue_Entry(cursor, struct rx_serverQueueEntry, entry);
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&np->cv);
#else /* RX_ENABLE_LOCKS */
ReturnToServerPool(cur_service);
}
while (1) {
- if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
- struct rx_call *tcall, *ncall, *choice2 = NULL;
+ if (!opr_queue_IsEmpty(&rx_incomingCallQueue)) {
+ struct rx_call *tcall, *choice2 = NULL;
+ struct opr_queue *cursor;
/* Scan for eligible incoming calls. A call is not eligible
* if the maximum number of calls for its service type are
* while the other threads may run ahead looking for calls which
* have all their input data available immediately. This helps
* keep threads from blocking, waiting for data from the client. */
- for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
+ for (opr_queue_Scan(&rx_incomingCallQueue, cursor)) {
+ tcall = opr_queue_Entry(cursor, struct rx_call, entry);
+
service = tcall->conn->service;
if (!QuotaOK(service)) {
continue;
}
MUTEX_ENTER(&rx_pthread_mutex);
if (tno == rxi_fcfs_thread_num
- || queue_IsLast(&rx_incomingCallQueue, tcall)) {
+ || opr_queue_IsEnd(&rx_incomingCallQueue, cursor)) {
MUTEX_EXIT(&rx_pthread_mutex);
		    /* If we're the fcfs thread, then we'll just use
* this call. If we haven't been able to find an optimal
service = call->conn->service;
} else {
MUTEX_EXIT(&rx_pthread_mutex);
- if (!queue_IsEmpty(&tcall->rq)) {
+ if (!opr_queue_IsEmpty(&tcall->rq)) {
struct rx_packet *rp;
- rp = queue_First(&tcall->rq, rx_packet);
+ rp = opr_queue_First(&tcall->rq, struct rx_packet,
+ entry);
if (rp->header.seq == 1) {
if (!meltdown_1pkt
|| (rp->header.flags & RX_LAST_PACKET)) {
}
if (call) {
- queue_Remove(call);
+ opr_queue_Remove(&call->entry);
MUTEX_EXIT(&rx_serverPool_lock);
MUTEX_ENTER(&call->lock);
continue;
}
- if (queue_IsEmpty(&call->rq)
- || queue_First(&call->rq, rx_packet)->header.seq != 1)
+ if (opr_queue_IsEmpty(&call->rq)
+ || opr_queue_First(&call->rq, struct rx_packet, entry)->header.seq != 1)
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
CLEAR_CALL_QUEUE_LOCK(call);
*socketp = OSI_NULLSOCKET;
}
sq->socketp = socketp;
- queue_Append(&rx_idleServerQueue, sq);
+ opr_queue_Append(&rx_idleServerQueue, &sq->entry);
#ifndef AFS_AIX41_ENV
rx_waitForPacket = sq;
#else
if (call) {
clock_GetTime(&call->startTime);
call->state = RX_STATE_ACTIVE;
- call->mode = RX_MODE_RECEIVING;
+ call->app.mode = RX_MODE_RECEIVING;
#ifdef RX_KERNEL_TRACE
if (ICL_SETACTIVE(afs_iclSetp)) {
int glockOwner = ISAFS_GLOCK();
rxi_availProcs++;
MUTEX_EXIT(&rx_quota_mutex);
}
- if (queue_IsNotEmpty(&rx_incomingCallQueue)) {
- struct rx_call *tcall, *ncall;
+ if (!opr_queue_IsEmpty(&rx_incomingCallQueue)) {
+ struct rx_call *tcall;
+ struct opr_queue *cursor;
/* Scan for eligible incoming calls. A call is not eligible
* if the maximum number of calls for its service type are
* already executing */
* have all their input data available immediately. This helps
* keep threads from blocking, waiting for data from the client. */
choice2 = (struct rx_call *)0;
- for (queue_Scan(&rx_incomingCallQueue, tcall, ncall, rx_call)) {
+ for (opr_queue_Scan(&rx_incomingCallQueue, cursor)) {
+ tcall = opr_queue_Entry(cursor, struct rx_call, entry);
service = tcall->conn->service;
if (QuotaOK(service)) {
MUTEX_ENTER(&rx_pthread_mutex);
- if (tno == rxi_fcfs_thread_num
- || !tcall->queue_item_header.next) {
+ /* XXX - If tcall->entry.next is NULL, then we're no longer
+ * on a queue at all. This shouldn't happen. */
+ if (tno == rxi_fcfs_thread_num || !tcall->entry.next) {
MUTEX_EXIT(&rx_pthread_mutex);
/* If we're the fcfs thread, then we'll just use
* this call. If we haven't been able to find an optimal
service = call->conn->service;
} else {
MUTEX_EXIT(&rx_pthread_mutex);
- if (!queue_IsEmpty(&tcall->rq)) {
+ if (!opr_queue_IsEmpty(&tcall->rq)) {
struct rx_packet *rp;
- rp = queue_First(&tcall->rq, rx_packet);
+ rp = opr_queue_First(&tcall->rq, struct rx_packet,
+ entry);
if (rp->header.seq == 1
&& (!meltdown_1pkt
|| (rp->header.flags & RX_LAST_PACKET))) {
}
if (call) {
- queue_Remove(call);
+ opr_queue_Remove(&call->entry);
/* we can't schedule a call if there's no data!!! */
/* send an ack if there's no data, if we're missing the
* first packet, or we're missing something between first
* and last -- there's a "hole" in the incoming data. */
- if (queue_IsEmpty(&call->rq)
- || queue_First(&call->rq, rx_packet)->header.seq != 1
- || call->rprev != queue_Last(&call->rq, rx_packet)->header.seq)
+ if (opr_queue_IsEmpty(&call->rq)
+ || opr_queue_First(&call->rq, struct rx_packet, entry)->header.seq != 1
+ || call->rprev != opr_queue_Last(&call->rq, struct rx_packet, entry)->header.seq)
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
call->flags &= (~RX_CALL_WAIT_PROC);
*socketp = OSI_NULLSOCKET;
}
sq->socketp = socketp;
- queue_Append(&rx_idleServerQueue, sq);
+ opr_queue_Append(&rx_idleServerQueue, &sq->entry);
do {
osi_rxSleep(sq);
#ifdef KERNEL
if (call) {
clock_GetTime(&call->startTime);
call->state = RX_STATE_ACTIVE;
- call->mode = RX_MODE_RECEIVING;
+ call->app.mode = RX_MODE_RECEIVING;
#ifdef RX_KERNEL_TRACE
if (ICL_SETACTIVE(afs_iclSetp)) {
int glockOwner = ISAFS_GLOCK();
call->arrivalProc = (void (*)())0;
if (rc && call->error == 0) {
rxi_CallError(call, rc);
- call->mode = RX_MODE_ERROR;
+ call->app.mode = RX_MODE_ERROR;
/* Send an abort message to the peer if this error code has
* only just been set. If it was set previously, assume the
* peer has already been sent the error code or will request it
}
if (conn->type == RX_SERVER_CONNECTION) {
/* Make sure reply or at least dummy reply is sent */
- if (call->mode == RX_MODE_RECEIVING) {
+ if (call->app.mode == RX_MODE_RECEIVING) {
MUTEX_EXIT(&call->lock);
rxi_WriteProc(call, 0, 0);
MUTEX_ENTER(&call->lock);
}
- if (call->mode == RX_MODE_SENDING) {
+ if (call->app.mode == RX_MODE_SENDING) {
MUTEX_EXIT(&call->lock);
rxi_FlushWrite(call);
MUTEX_ENTER(&call->lock);
char dummy;
/* Make sure server receives input packets, in the case where
* no reply arguments are expected */
- if ((call->mode == RX_MODE_SENDING)
- || (call->mode == RX_MODE_RECEIVING && call->rnext == 1)) {
+
+ if ((call->app.mode == RX_MODE_SENDING)
+ || (call->app.mode == RX_MODE_RECEIVING && call->rnext == 1)) {
MUTEX_EXIT(&call->lock);
(void)rxi_ReadProc(call, &dummy, 1);
MUTEX_ENTER(&call->lock);
* ResetCall cannot: ResetCall may be called at splnet(), in the
* kernel version, and may interrupt the macros rx_Read or
* rx_Write, which run at normal priority for efficiency. */
- if (call->currentPacket) {
+ if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
- call->currentPacket->flags &= ~RX_PKTFLAG_CP;
+ call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
- rxi_FreePacket(call->currentPacket);
- call->currentPacket = (struct rx_packet *)0;
+ rxi_FreePacket(call->app.currentPacket);
+ call->app.currentPacket = (struct rx_packet *)0;
}
- call->nLeft = call->nFree = call->curlen = 0;
+ call->app.nLeft = call->app.nFree = call->app.curlen = 0;
/* Free any packets from the last call to ReadvProc/WritevProc */
#ifdef RXDEBUG_PACKET
call->iovqc -=
#endif /* RXDEBUG_PACKET */
- rxi_FreePackets(0, &call->iovq);
+ rxi_FreePackets(0, &call->app.iovq);
MUTEX_EXIT(&call->lock);
CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
struct rx_call *call;
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
struct rx_call *cp; /* Call pointer temp */
- struct rx_call *nxp; /* Next call pointer, for queue_Scan */
+ struct opr_queue *cursor;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
dpf(("rxi_NewCall(conn %"AFS_PTR_FMT", channel %d)\n", conn, channel));
* Skip over those with in-use TQs.
*/
call = NULL;
- for (queue_Scan(&rx_freeCallQueue, cp, nxp, rx_call)) {
+ for (opr_queue_Scan(&rx_freeCallQueue, cursor)) {
+ cp = opr_queue_Entry(cursor, struct rx_call, entry);
if (!(cp->flags & RX_CALL_TQ_BUSY)) {
call = cp;
break;
}
if (call) {
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
- if (queue_IsNotEmpty(&rx_freeCallQueue)) {
- call = queue_First(&rx_freeCallQueue, rx_call);
+ if (!opr_queue_IsEmpty(&rx_freeCallQueue)) {
+ call = opr_queue_First(&rx_freeCallQueue, struct rx_call, entry);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- queue_Remove(call);
+ opr_queue_Remove(&call->entry);
if (rx_stats_active)
rx_atomic_dec(&rx_stats.nFreeCallStructs);
MUTEX_EXIT(&rx_freeCallQueue_lock);
CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
/* Initialize once-only items */
- queue_Init(&call->tq);
- queue_Init(&call->rq);
- queue_Init(&call->iovq);
+ opr_queue_Init(&call->tq);
+ opr_queue_Init(&call->rq);
+ opr_queue_Init(&call->app.iovq);
#ifdef RXDEBUG_PACKET
call->rqc = call->tqc = call->iovqc = 0;
#endif /* RXDEBUG_PACKET */
* the head of the list, and idle calls at the tail.
*/
if (call->flags & RX_CALL_TQ_BUSY)
- queue_Prepend(&rx_freeCallQueue, call);
+ opr_queue_Prepend(&rx_freeCallQueue, &call->entry);
else
- queue_Append(&rx_freeCallQueue, call);
+ opr_queue_Append(&rx_freeCallQueue, &call->entry);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
- queue_Append(&rx_freeCallQueue, call);
+ opr_queue_Append(&rx_freeCallQueue, &call->entry);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
if (rx_stats_active)
rx_atomic_inc(&rx_stats.nFreeCallStructs);
MUTEX_EXIT(&rx_peerHashTable_lock);
}
+#ifdef AFS_RXERRQ_ENV
+static void
+rxi_SetPeerDead(afs_uint32 host, afs_uint16 port)
+{
+ int hashIndex = PEER_HASH(host, port);
+ struct rx_peer *peer;
+
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+
+ for (peer = rx_peerHashTable[hashIndex]; peer; peer = peer->next) {
+ if (peer->host == host && peer->port == port) {
+ break;
+ }
+ }
+
+ if (peer) {
+ rx_atomic_inc(&peer->neterrs);
+ }
+
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+}
+
+void
+rxi_ProcessNetError(struct sock_extended_err *err, afs_uint32 addr, afs_uint16 port)
+{
+# ifdef AFS_ADAPT_PMTU
+ if (err->ee_errno == EMSGSIZE && err->ee_info >= 68) {
+ rxi_SetPeerMtu(NULL, addr, port, err->ee_info - RX_IPUDP_SIZE);
+ return;
+ }
+# endif
+ if (err->ee_origin == SO_EE_ORIGIN_ICMP && err->ee_type == ICMP_DEST_UNREACH) {
+ switch (err->ee_code) {
+ case ICMP_NET_UNREACH:
+ case ICMP_HOST_UNREACH:
+ case ICMP_PORT_UNREACH:
+ case ICMP_NET_ANO:
+ case ICMP_HOST_ANO:
+ rxi_SetPeerDead(addr, port);
+ break;
+ }
+ }
+}
+#endif /* AFS_RXERRQ_ENV */
+
/* Find the peer process represented by the supplied (host,port)
* combination. If there is no appropriate active peer structure, a
* new one will be allocated and initialized
pp = rxi_AllocPeer(); /* This bzero's *pp */
pp->host = host; /* set here or in InitPeerParams is zero */
pp->port = port;
+#ifdef AFS_RXERRQ_ENV
+ rx_atomic_set(&pp->neterrs, 0);
+#endif
MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
- queue_Init(&pp->rpcStats);
+ opr_queue_Init(&pp->rpcStats);
pp->next = rx_peerHashTable[hashIndex];
rx_peerHashTable[hashIndex] = pp;
rxi_InitPeerParams(pp);
np->header.seq, np->header.flags, np));
#endif
+ /* Account for connectionless packets */
+ if (rx_stats_active &&
+ ((np->header.type == RX_PACKET_TYPE_VERSION) ||
+ (np->header.type == RX_PACKET_TYPE_DEBUG))) {
+ struct rx_peer *peer;
+
+ /* Try to look up the peer structure, but don't create one */
+ peer = rxi_FindPeer(host, port, 0, 0);
+
+ /* Since this may not be associated with a connection, it may have
+ * no refCount, meaning we could race with ReapConnections
+ */
+
+ if (peer && (peer->refCount > 0)) {
+#ifdef AFS_RXERRQ_ENV
+ if (rx_atomic_read(&peer->neterrs)) {
+ rx_atomic_set(&peer->neterrs, 0);
+ }
+#endif
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->bytesReceived += np->length;
+ MUTEX_EXIT(&peer->peer_lock);
+ }
+ }
+
if (np->header.type == RX_PACKET_TYPE_VERSION) {
return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
}
np->header.cid, np->header.epoch, type,
np->header.securityIndex);
+ /* To avoid having 2 connections just abort at each other,
+     * don't abort an abort. */
if (!conn) {
- /* If no connection found or fabricated, just ignore the packet.
- * (An argument could be made for sending an abort packet for
- * the conn) */
- return np;
+ if (np->header.type != RX_PACKET_TYPE_ABORT)
+ rxi_SendRawAbort(socket, host, port, RX_INVALID_OPERATION,
+ np, 0);
+ return np;
+ }
+
+#ifdef AFS_RXERRQ_ENV
+ if (rx_atomic_read(&conn->peer->neterrs)) {
+ rx_atomic_set(&conn->peer->neterrs, 0);
+ }
+#endif
+
+ /* If we're doing statistics, then account for the incoming packet */
+ if (rx_stats_active) {
+ MUTEX_ENTER(&conn->peer->peer_lock);
+ conn->peer->bytesReceived += np->length;
+ MUTEX_EXIT(&conn->peer->peer_lock);
}
/* If the connection is in an error state, send an abort packet and ignore
#endif
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
+ call->bytesSent = 0;
+ call->bytesRcvd = 0;
/*
* If the number of queued calls exceeds the overload
* threshold then abort this call.
#endif
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
+ call->bytesSent = 0;
+ call->bytesRcvd = 0;
/*
* If the number of queued calls exceeds the overload
* threshold then abort this call.
if ((tcall->state == RX_STATE_PRECALL)
|| (tcall->state == RX_STATE_ACTIVE))
return 1;
- if ((tcall->mode == RX_MODE_SENDING)
- || (tcall->mode == RX_MODE_RECEIVING))
+ if ((tcall->app.mode == RX_MODE_SENDING)
+ || (tcall->app.mode == RX_MODE_RECEIVING))
return 1;
}
}
MUTEX_EXIT(&rx_freePktQ_lock);
if (rx_stats_active)
rx_atomic_inc(&rx_stats.noPacketBuffersOnRead);
- call->rprev = np->header.serial;
rxi_calltrace(RX_TRACE_DROP, call);
dpf(("packet %"AFS_PTR_FMT" dropped on receipt - quota problems\n", np));
/* We used to clear the receive queue here, in an attempt to free
* packets. However this is unsafe if the queue has received a
* soft ACK for the final packet */
rxi_PostDelayedAckEvent(call, &rx_softAckDelay);
-
- /* we've damaged this call already, might as well do it in. */
return np;
}
#endif /* KERNEL */
if (seq == call->rnext) {
/* Check to make sure it is not a duplicate of one already queued */
- if (queue_IsNotEmpty(&call->rq)
- && queue_First(&call->rq, rx_packet)->header.seq == seq) {
+ if (!opr_queue_IsEmpty(&call->rq)
+ && opr_queue_First(&call->rq, struct rx_packet, entry)->header.seq == seq) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate\n", np));
#ifdef RX_TRACK_PACKETS
np->flags |= RX_PKTFLAG_RQ;
#endif
- queue_Prepend(&call->rq, np);
+ opr_queue_Prepend(&call->rq, &np->entry);
#ifdef RXDEBUG_PACKET
call->rqc++;
#endif /* RXDEBUG_PACKET */
/* Check whether we have all of the packets for this call */
if (call->flags & RX_CALL_HAVE_LAST) {
afs_uint32 tseq; /* temporary sequence number */
- struct rx_packet *tp; /* Temporary packet pointer */
- struct rx_packet *nxp; /* Next pointer, for queue_Scan */
+ struct opr_queue *cursor;
- for (tseq = seq, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
+ for (tseq = seq, opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *tp;
+
+ tp = opr_queue_Entry(cursor, struct rx_packet, entry);
if (tseq != tp->header.seq)
break;
if (tp->header.flags & RX_LAST_PACKET) {
* any of this packets predecessors are missing. */
afs_uint32 prev; /* "Previous packet" sequence number */
- struct rx_packet *tp; /* Temporary packet pointer */
- struct rx_packet *nxp; /* Next pointer, for queue_Scan */
+ struct opr_queue *cursor;
int missing; /* Are any predecessors missing? */
/* If the new packet's sequence number has been sent to the
}
/* Look for the packet in the queue of old received packets */
- for (prev = call->rnext - 1, missing =
- 0, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
+ prev = call->rnext - 1;
+ missing = 0;
+ for (opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *tp
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
/*Check for duplicate packet */
if (seq == tp->header.seq) {
if (rx_stats_active)
#ifdef RXDEBUG_PACKET
call->rqc++;
#endif /* RXDEBUG_PACKET */
- queue_InsertBefore(tp, np);
+ opr_queue_InsertBefore(cursor, &np->entry);
call->nSoftAcks++;
np = NULL;
&& !(call->flags & RX_CALL_RECEIVE_DONE)) {
afs_uint32 tseq; /* temporary sequence number */
- for (tseq =
- call->rnext, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
+ tseq = call->rnext;
+ for (opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *tp
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
if (tseq != tp->header.seq)
break;
if (tp->header.flags & RX_LAST_PACKET) {
struct rx_ackPacket *ap;
int nAcks;
struct rx_packet *tp;
- struct rx_packet *nxp; /* Next packet pointer for queue_Scan */
struct rx_connection *conn = call->conn;
struct rx_peer *peer = conn->peer;
+ struct opr_queue *cursor;
struct clock now; /* Current time, for RTT calculations */
afs_uint32 first;
afs_uint32 prev;
prev = ntohl(ap->previousPacket);
serial = ntohl(ap->serial);
- /* Ignore ack packets received out of order */
+ /*
+ * Ignore ack packets received out of order while protecting
+ * against peers that set the previousPacket field to a packet
+ * serial number instead of a sequence number.
+ */
if (first < call->tfirst ||
- (first == call->tfirst && prev < call->tprev)) {
+ (first == call->tfirst && prev < call->tprev && prev < call->tfirst
+ + call->twind)) {
return np;
}
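+    /*
+     * Illustrative numbers: with tfirst = 10 and twind = 32, only prev
+     * values below 42 can be genuine "previous packet" sequence numbers,
+     * so an ack carrying prev = 5000 (a serial from a misbehaving peer) is
+     * no longer taken as evidence that the ack arrived out of order, and
+     * is processed normally.
+     */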
* disposed of
*/
- tp = queue_First(&call->tq, rx_packet);
- while(!queue_IsEnd(&call->tq, tp) && tp->header.seq < first) {
+ tp = opr_queue_First(&call->tq, struct rx_packet, entry);
+    while (!opr_queue_IsEnd(&call->tq, &tp->entry) && tp->header.seq < first) {
struct rx_packet *next;
- next = queue_Next(tp, rx_packet);
+ next = opr_queue_Next(&tp->entry, struct rx_packet, entry);
call->tfirst = tp->header.seq + 1;
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
} else
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
{
- queue_Remove(tp);
+ opr_queue_Remove(&tp->entry);
#ifdef RX_TRACK_PACKETS
tp->flags &= ~RX_PKTFLAG_TQ;
#endif
call->nSoftAcked = 0;
missing = 0;
- while (!queue_IsEnd(&call->tq, tp) && tp->header.seq < first + nAcks) {
+ while (!opr_queue_IsEnd(&call->tq, &tp->entry)
+ && tp->header.seq < first + nAcks) {
/* Set the acknowledge flag per packet based on the
* information in the ack packet. An acknowlegded packet can
* be downgraded when the server has discarded a packet it
missing = 1;
}
- tp = queue_Next(tp, rx_packet);
+ tp = opr_queue_Next(&tp->entry, struct rx_packet, entry);
}
/* We don't need to take any action with the 3rd or 4th section in the
* so we will retransmit as soon as the window permits
*/
- for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
+ acked = 0;
+ for (opr_queue_ScanBackwards(&call->tq, cursor)) {
+ struct rx_packet *tp =
+ opr_queue_Entry(cursor, struct rx_packet, entry);
if (acked) {
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
tp->flags &= ~RX_PKTFLAG_SENT;
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
rxevent_Cancel(&call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
- } else if (!queue_IsEmpty(&call->tq)) {
+ } else if (!opr_queue_IsEmpty(&call->tq)) {
rxi_Start(call, istack);
}
return np;
MUTEX_ENTER(&rx_serverPool_lock);
haveQuota = QuotaOK(service);
- if ((!haveQuota) || queue_IsEmpty(&rx_idleServerQueue)) {
+ if ((!haveQuota) || opr_queue_IsEmpty(&rx_idleServerQueue)) {
/* If there are no processes available to service this call,
* put the call on the incoming call queue (unless it's
* already on the queue).
rx_atomic_inc(&rx_nWaited);
rxi_calltrace(RX_CALL_ARRIVAL, call);
SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
- queue_Append(&rx_incomingCallQueue, call);
+ opr_queue_Append(&rx_incomingCallQueue, &call->entry);
}
} else {
- sq = queue_Last(&rx_idleServerQueue, rx_serverQueueEntry);
+ sq = opr_queue_Last(&rx_idleServerQueue,
+ struct rx_serverQueueEntry, entry);
/* If hot threads are enabled, and both newcallp and sq->socketp
* are non-null, then this thread will process the call, and the
* idle server thread will start listening on this threads socket.
*/
- queue_Remove(sq);
+ opr_queue_Remove(&sq->entry);
+
if (rx_enable_hot_thread && newcallp && sq->socketp) {
*newcallp = call;
*tnop = sq->tno;
if (call->flags & RX_CALL_WAIT_PROC) {
/* Conservative: I don't think this should happen */
call->flags &= ~RX_CALL_WAIT_PROC;
- if (queue_IsOnQueue(call)) {
- queue_Remove(call);
-
- rx_atomic_dec(&rx_nWaiting);
+	rx_atomic_dec(&rx_nWaiting);
+ if (opr_queue_IsOnQueue(&call->entry)) {
+ opr_queue_Remove(&call->entry);
}
}
call->state = RX_STATE_ACTIVE;
- call->mode = RX_MODE_RECEIVING;
+ call->app.mode = RX_MODE_RECEIVING;
#ifdef RX_KERNEL_TRACE
{
int glockOwner = ISAFS_GLOCK();
static void
rxi_SetAcksInTransmitQueue(struct rx_call *call)
{
- struct rx_packet *p, *tp;
+ struct opr_queue *cursor;
int someAcked = 0;
- for (queue_Scan(&call->tq, p, tp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
+
if (someAcked) {
call->flags |= RX_CALL_TQ_CLEARME;
call->flags |= RX_CALL_TQ_SOME_ACKED;
rxi_ClearTransmitQueue(struct rx_call *call, int force)
{
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- struct rx_packet *p, *tp;
-
+ struct opr_queue *cursor;
if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
int someAcked = 0;
- for (queue_Scan(&call->tq, p, tp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
p->flags |= RX_PKTFLAG_ACKED;
someAcked = 1;
}
static void
rxi_ClearReceiveQueue(struct rx_call *call)
{
- if (queue_IsNotEmpty(&call->rq)) {
+ if (!opr_queue_IsEmpty(&call->rq)) {
u_short count;
count = rxi_FreePackets(0, &call->rq);
osi_rxWakeup(&call->twind);
#endif
+ if (flags & RX_CALL_WAIT_PROC) {
+ rx_atomic_dec(&rx_nWaiting);
+ }
#ifdef RX_ENABLE_LOCKS
/* The following ensures that we don't mess with any queue while some
* other thread might also be doing so. The call_queue_lock field is
*/
if (call->call_queue_lock) {
MUTEX_ENTER(call->call_queue_lock);
- if (queue_IsOnQueue(call)) {
- queue_Remove(call);
- if (flags & RX_CALL_WAIT_PROC) {
- rx_atomic_dec(&rx_nWaiting);
- }
+ if (opr_queue_IsOnQueue(&call->entry)) {
+ opr_queue_Remove(&call->entry);
}
MUTEX_EXIT(call->call_queue_lock);
CLEAR_CALL_QUEUE_LOCK(call);
}
#else /* RX_ENABLE_LOCKS */
- if (queue_IsOnQueue(call)) {
- queue_Remove(call);
- if (flags & RX_CALL_WAIT_PROC)
- rx_atomic_dec(&rx_nWaiting);
+ if (opr_queue_IsOnQueue(&call->entry)) {
+ opr_queue_Remove(&call->entry);
}
#endif /* RX_ENABLE_LOCKS */
int istack)
{
struct rx_ackPacket *ap;
- struct rx_packet *rqp;
- struct rx_packet *nxp; /* For queue_Scan */
struct rx_packet *p;
+ struct opr_queue *cursor;
u_char offset = 0;
afs_int32 templ;
afs_uint32 padbytes = 0;
* are packets in the receive queue awaiting processing.
*/
if ((call->flags & RX_CALL_ACKALL_SENT) &&
- !queue_IsEmpty(&call->rq)) {
- ap->firstPacket = htonl(queue_Last(&call->rq, rx_packet)->header.seq + 1);
+ !opr_queue_IsEmpty(&call->rq)) {
+	ap->firstPacket = htonl(opr_queue_Last(&call->rq, struct rx_packet,
+					       entry)->header.seq + 1);
} else {
ap->firstPacket = htonl(call->rnext);
ap->previousPacket = htonl(call->rprev); /* Previous packet received */
- /* No fear of running out of ack packet here because there can only be at most
- * one window full of unacknowledged packets. The window size must be constrained
- * to be less than the maximum ack size, of course. Also, an ack should always
- * fit into a single packet -- it should not ever be fragmented. */
- for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
+ /* No fear of running out of ack packet here because there can only
+ * be at most one window full of unacknowledged packets. The window
+ * size must be constrained to be less than the maximum ack size,
+ * of course. Also, an ack should always fit into a single packet
+ * -- it should not ever be fragmented. */
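+    /* (For scale: assuming the usual RX_MAXACKS of 255 from rx_packet.h
+     * and receive windows capped well below that, even a completely
+     * unacknowledged window needs far fewer ack slots than a single
+     * packet provides.) */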
+ offset = 0;
+ for (opr_queue_Scan(&call->rq, cursor)) {
+ struct rx_packet *rqp
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (!rqp || !call->rq.next
|| (rqp->header.seq > (call->rnext + call->rwind))) {
#ifndef RX_ENABLE_TSFPQ
/* Send the whole list when the call is in receive mode, when
* the call is in eof mode, when we are in fast recovery mode,
* and when we have the last packet */
+    /* XXX - The accesses to app.mode aren't safe, as this may be called by
+     * the listener or event threads. */
if ((list[len - 1]->header.flags & RX_LAST_PACKET)
- || call->mode == RX_MODE_RECEIVING || call->mode == RX_MODE_EOF
+ || call->app.mode == RX_MODE_RECEIVING || call->app.mode == RX_MODE_EOF
|| (call->flags & RX_CALL_FAST_RECOVER)) {
/* Check for the case where the current list contains
* an acked packet. Since we always send retransmissions
}
}
+/**
+ * Check if the peer for the given call is known to be dead
+ *
+ * If the call's peer appears dead (it has encountered fatal network errors
+ * since the call started) the call is killed with RX_CALL_DEAD if the call
+ * is active. Otherwise, we do nothing.
+ *
+ * @param[in] call The call to check
+ *
+ * @return status
+ * @retval 0 The call is fine, and we haven't done anything to the call
+ * @retval nonzero The call's peer appears dead, and the call has been
+ * terminated if it was active
+ *
+ * @pre call->lock must be locked
+ */
+static int
+rxi_CheckPeerDead(struct rx_call *call)
+{
+#ifdef AFS_RXERRQ_ENV
+ int peererrs;
+
+ if (call->state == RX_STATE_DALLY) {
+ return 0;
+ }
+
+ peererrs = rx_atomic_read(&call->conn->peer->neterrs);
+ if (call->neterr_gen < peererrs) {
+ /* we have received network errors since this call started; kill
+ * the call */
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_CallError(call, RX_CALL_DEAD);
+ }
+ return -1;
+ }
+ if (call->neterr_gen > peererrs) {
+ /* someone has reset the number of peer errors; set the call error gen
+ * so we can detect if more errors are encountered */
+ call->neterr_gen = peererrs;
+ }
+#endif
+ return 0;
+}
+
static void
rxi_Resend(struct rxevent *event, void *arg0, void *arg1, int istack)
{
struct rx_call *call = arg0;
struct rx_peer *peer;
- struct rx_packet *p, *nxp;
+ struct opr_queue *cursor;
struct clock maxTimeout = { 60, 0 };
MUTEX_ENTER(&call->lock);
call->resendEvent = NULL;
}
+ rxi_CheckPeerDead(call);
+
if (rxi_busyChannelError && (call->flags & RX_CALL_PEER_BUSY)) {
rxi_CheckBusy(call);
}
- if (queue_IsEmpty(&call->tq)) {
+ if (opr_queue_IsEmpty(&call->tq)) {
/* Nothing to do. This means that we've been raced, and that an
* ACK has come in between when we were triggered, and when we
* actually got to run. */
call->flags |= RX_CALL_FAST_RECOVER;
/* Mark all of the pending packets in the queue as being lost */
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
if (!(p->flags & RX_PKTFLAG_ACKED))
p->flags &= ~RX_PKTFLAG_SENT;
}
void
rxi_Start(struct rx_call *call, int istack)
{
-
- struct rx_packet *p;
- struct rx_packet *nxp; /* Next pointer for queue_Scan */
+ struct opr_queue *cursor;
+#ifdef RX_ENABLE_LOCKS
+ struct opr_queue *store;
+#endif
int nXmitPackets;
int maxXmitPackets;
return;
}
- if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
-
+ if (!opr_queue_IsEmpty(&call->tq)) { /* If we have anything to send */
/* Send (or resend) any packets that need it, subject to
* window restrictions and congestion burst control
* restrictions. Ask for an ack on the last packet sent in
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
nXmitPackets = 0;
maxXmitPackets = MIN(call->twind, call->cwind);
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
-#ifdef RX_TRACK_PACKETS
- if ((p->flags & RX_PKTFLAG_FREE)
- || (!queue_IsEnd(&call->tq, nxp)
- && (nxp->flags & RX_PKTFLAG_FREE))
- || (p == (struct rx_packet *)&rx_freePacketQueue)
- || (nxp == (struct rx_packet *)&rx_freePacketQueue)) {
- osi_Panic("rxi_Start: xmit queue clobbered");
- }
-#endif
+ for (opr_queue_Scan(&call->tq, cursor)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (p->flags & RX_PKTFLAG_ACKED) {
/* Since we may block, don't trust this */
if (rx_stats_active)
*(call->callNumber), p));
call->xmitList[nXmitPackets++] = p;
}
- }
+	} /* end of the opr_queue_Scan */
/* xmitList now hold pointers to all of the packets that are
* ready to send. Now we loop to send the packets */
/* Some packets have received acks. If they all have, we can clear
* the transmit queue.
*/
- for (missing =
- 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ missing = 0;
+ for (opr_queue_ScanSafe(&call->tq, cursor, store)) {
+ struct rx_packet *p
+ = opr_queue_Entry(cursor, struct rx_packet, entry);
+
if (p->header.seq < call->tfirst
&& (p->flags & RX_PKTFLAG_ACKED)) {
- queue_Remove(p);
+ opr_queue_Remove(&p->entry);
#ifdef RX_TRACK_PACKETS
p->flags &= ~RX_PKTFLAG_TQ;
#endif
* haveCTLock Set if calling from rxi_ReapConnections
*/
#ifdef RX_ENABLE_LOCKS
-int
-static rxi_CheckCall(struct rx_call *call, int haveCTLock)
+static int
+rxi_CheckCall(struct rx_call *call, int haveCTLock)
#else /* RX_ENABLE_LOCKS */
-int
-static rxi_CheckCall(struct rx_call *call)
+static int
+rxi_CheckCall(struct rx_call *call)
#endif /* RX_ENABLE_LOCKS */
{
struct rx_connection *conn = call->conn;
int idle_timeout = 0;
afs_int32 clock_diff = 0;
+ if (rxi_CheckPeerDead(call)) {
+ return -1;
+ }
+
now = clock_Sec();
/* Large swings in the clock can have a significant impact on
* number of seconds. */
if (now > (call->lastReceiveTime + deadTime)) {
if (call->state == RX_STATE_ACTIVE) {
-#ifdef ADAPT_PMTU
-#if defined(KERNEL) && defined(AFS_SUN5_ENV)
+#ifdef AFS_ADAPT_PMTU
+# if defined(KERNEL) && defined(AFS_SUN5_ENV)
ire_t *ire;
-#if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
- netstack_t *ns = netstack_find_by_stackid(GLOBAL_NETSTACKID);
+# if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
+ netstack_t *ns = netstack_find_by_stackid(GLOBAL_NETSTACKID);
ip_stack_t *ipst = ns->netstack_ip;
-#endif
+# endif
ire = ire_cache_lookup(conn->peer->host
-#if defined(AFS_SUN510_ENV) && defined(ALL_ZONES)
+# if defined(AFS_SUN510_ENV) && defined(ALL_ZONES)
, ALL_ZONES
-#if defined(AFS_SUN510_ENV) && (defined(ICL_3_ARG) || defined(GLOBAL_NETSTACKID))
+# if defined(ICL_3_ARG) || defined(GLOBAL_NETSTACKID)
, NULL
-#if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
+# if defined(GLOBAL_NETSTACKID)
, ipst
-#endif
-#endif
-#endif
+# endif
+# endif
+# endif
);
if (ire && ire->ire_max_frag > 0)
rxi_SetPeerMtu(NULL, conn->peer->host, 0,
ire->ire_max_frag);
-#if defined(GLOBAL_NETSTACKID)
+# if defined(GLOBAL_NETSTACKID)
netstack_rele(ns);
-#endif
-#endif
-#endif /* ADAPT_PMTU */
+# endif
+# endif
+#endif /* AFS_ADAPT_PMTU */
cerror = RX_CALL_DEAD;
goto mtuout;
} else {
code = MUTEX_TRYENTER(&peer->peer_lock);
if ((code) && (peer->refCount == 0)
&& ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor, *store;
size_t space;
/*
MUTEX_EXIT(&peer->peer_lock);
MUTEX_DESTROY(&peer->peer_lock);
- for (queue_Scan
- (&peer->rpcStats, rpc_stat, nrpc_stat,
- rx_interface_stat)) {
+
+ for (opr_queue_ScanSafe(&peer->rpcStats, cursor, store)) {
unsigned int num_funcs;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entry);
if (!rpc_stat)
break;
- queue_Remove(&rpc_stat->queue_header);
- queue_Remove(&rpc_stat->all_peers);
+
+ opr_queue_Remove(&rpc_stat->entry);
+ opr_queue_Remove(&rpc_stat->entryPeers);
+
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
peerStats->cwind = tp->cwind;
peerStats->nDgramPackets = tp->nDgramPackets;
peerStats->congestSeq = tp->congestSeq;
- peerStats->bytesSent.high = tp->bytesSent.high;
- peerStats->bytesSent.low = tp->bytesSent.low;
- peerStats->bytesReceived.high = tp->bytesReceived.high;
- peerStats->bytesReceived.low = tp->bytesReceived.low;
+ peerStats->bytesSent.high = tp->bytesSent >> 32;
+ peerStats->bytesSent.low = tp->bytesSent & MAX_AFS_UINT32;
+ peerStats->bytesReceived.high = tp->bytesReceived >> 32;
+ peerStats->bytesReceived.low
+ = tp->bytesReceived & MAX_AFS_UINT32;
MUTEX_EXIT(&tp->peer_lock);
MUTEX_ENTER(&rx_peerHashTable_lock);
#endif /* AFS_USE_GETTIMEOFDAY */
#endif /* AFS_PTHREAD_ENV */
- while (!queue_IsEmpty(&rx_freeCallQueue)) {
- call = queue_First(&rx_freeCallQueue, rx_call);
- queue_Remove(call);
+ while (!opr_queue_IsEmpty(&rx_freeCallQueue)) {
+ call = opr_queue_First(&rx_freeCallQueue, struct rx_call, entry);
+ opr_queue_Remove(&call->entry);
rxi_Free(call, sizeof(struct rx_call));
}
- while (!queue_IsEmpty(&rx_idleServerQueue)) {
- sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
- queue_Remove(sq);
+ while (!opr_queue_IsEmpty(&rx_idleServerQueue)) {
+ sq = opr_queue_First(&rx_idleServerQueue, struct rx_serverQueueEntry,
+ entry);
+ opr_queue_Remove(&sq->entry);
}
#endif /* KERNEL */
MUTEX_ENTER(&rx_peerHashTable_lock);
for (peer = *peer_ptr; peer; peer = next) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor, *store;
size_t space;
MUTEX_ENTER(&rx_rpc_stats);
MUTEX_ENTER(&peer->peer_lock);
- for (queue_Scan
- (&peer->rpcStats, rpc_stat, nrpc_stat,
- rx_interface_stat)) {
+ for (opr_queue_ScanSafe(&peer->rpcStats, cursor, store)) {
unsigned int num_funcs;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entry);
if (!rpc_stat)
break;
- queue_Remove(&rpc_stat->queue_header);
- queue_Remove(&rpc_stat->all_peers);
+ opr_queue_Remove(&rpc_stat->entry);
+ opr_queue_Remove(&rpc_stat->entryPeers);
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
int i;
MUTEX_ENTER(&conn->conn_data_lock);
if (!conn->specific) {
- conn->specific = (void **)malloc((key + 1) * sizeof(void *));
+ conn->specific = malloc((key + 1) * sizeof(void *));
for (i = 0; i < key; i++)
conn->specific[i] = NULL;
conn->nSpecific = key + 1;
int i;
MUTEX_ENTER(&svc->svc_data_lock);
if (!svc->specific) {
- svc->specific = (void **)malloc((key + 1) * sizeof(void *));
+ svc->specific = malloc((key + 1) * sizeof(void *));
for (i = 0; i < key; i++)
svc->specific[i] = NULL;
svc->nSpecific = key + 1;
* which can come and go based upon the peer lifetime.
*/
-static struct rx_queue processStats = { &processStats, &processStats };
+static struct opr_queue processStats = { &processStats, &processStats };
/*
* peerStats is a queue used to store the statistics for all peer structs.
* Its contents are the union of all the peer rpcStats queues.
*/
-static struct rx_queue peerStats = { &peerStats, &peerStats };
+static struct opr_queue peerStats = { &peerStats, &peerStats };
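+
+/*
+ * An rx_interface_stat can sit on both of these lists at once (a peer's
+ * rpcStats queue and the global peerStats queue) because it embeds two
+ * opr_queue links, roughly (a sketch; see the rx headers for the real
+ * definition):
+ *
+ *     struct rx_interface_stat {
+ *         struct opr_queue entry;       (rpcStats / processStats chain)
+ *         struct opr_queue entryPeers;  (global peerStats chain)
+ *         rx_function_entry_v1_t stats[1];
+ *     };
+ *
+ * opr_queue_Entry() with the appropriate member name recovers the
+ * container from either link, which is why the old fix_offset pointer
+ * arithmetic can be removed later in this change.
+ */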
/*
* rxi_monitor_processStats is used to turn process wide stat collection
static int rxi_monitor_peerStats = 0;
-/*
- * rxi_AddRpcStat - given all of the information for a particular rpc
- * call, create (if needed) and update the stat totals for the rpc.
- *
- * PARAMETERS
- *
- * IN stats - the queue of stats that will be updated with the new value
- *
- * IN rxInterface - a unique number that identifies the rpc interface
- *
- * IN currentFunc - the index of the function being invoked
- *
- * IN totalFunc - the total number of functions in this interface
- *
- * IN queueTime - the amount of time this function waited for a thread
+
+void
+rxi_ClearRPCOpStat(rx_function_entry_v1_p rpc_stat)
+{
+ rpc_stat->invocations = 0;
+ rpc_stat->bytes_sent = 0;
+ rpc_stat->bytes_rcvd = 0;
+ rpc_stat->queue_time_sum.sec = 0;
+ rpc_stat->queue_time_sum.usec = 0;
+ rpc_stat->queue_time_sum_sqr.sec = 0;
+ rpc_stat->queue_time_sum_sqr.usec = 0;
+ rpc_stat->queue_time_min.sec = 9999999;
+ rpc_stat->queue_time_min.usec = 9999999;
+ rpc_stat->queue_time_max.sec = 0;
+ rpc_stat->queue_time_max.usec = 0;
+ rpc_stat->execution_time_sum.sec = 0;
+ rpc_stat->execution_time_sum.usec = 0;
+ rpc_stat->execution_time_sum_sqr.sec = 0;
+ rpc_stat->execution_time_sum_sqr.usec = 0;
+ rpc_stat->execution_time_min.sec = 9999999;
+ rpc_stat->execution_time_min.usec = 9999999;
+ rpc_stat->execution_time_max.sec = 0;
+ rpc_stat->execution_time_max.usec = 0;
+}
+
+/*!
+ * Given all of the information for a particular rpc
+ * call, find or create (if requested) the stat structure for the rpc.
*
- * IN execTime - the amount of time this function invocation took to execute
+ * @param stats
+ * the queue of stats that will be updated with the new value
*
- * IN bytesSent - the number bytes sent by this invocation
+ * @param rxInterface
+ * a unique number that identifies the rpc interface
*
- * IN bytesRcvd - the number bytes received by this invocation
+ * @param totalFunc
+ *	the total number of functions in this interface. This is only
+ * required if create is true
*
- * IN isServer - if true, this invocation was made to a server
+ * @param isServer
+ * if true, this invocation was made to a server
*
- * IN remoteHost - the ip address of the remote host
+ * @param remoteHost
+ *	the ip address of the remote host. This is only required if create
+ * and addToPeerList are true
*
- * IN remotePort - the port of the remote host
+ * @param remotePort
+ *	the port of the remote host. This is only required if create
+ * and addToPeerList are true
*
- * IN addToPeerList - if != 0, add newly created stat to the global peer list
+ * @param addToPeerList
+ * if != 0, add newly created stat to the global peer list
*
- * INOUT counter - if a new stats structure is allocated, the counter will
- * be updated with the new number of allocated stat structures
+ * @param counter
+ * if a new stats structure is allocated, the counter will
+ * be updated with the new number of allocated stat structures.
+ *	This is only required if create is true
*
- * RETURN CODES
+ * @param create
+ * if no stats structure exists, allocate one
*
- * Returns void.
*/
-static int
-rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
- afs_uint32 currentFunc, afs_uint32 totalFunc,
- struct clock *queueTime, struct clock *execTime,
- afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd, int isServer,
- afs_uint32 remoteHost, afs_uint32 remotePort,
- int addToPeerList, unsigned int *counter)
+static rx_interface_stat_p
+rxi_FindRpcStat(struct opr_queue *stats, afs_uint32 rxInterface,
+ afs_uint32 totalFunc, int isServer, afs_uint32 remoteHost,
+ afs_uint32 remotePort, int addToPeerList,
+ unsigned int *counter, int create)
{
- int rc = 0;
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ rx_interface_stat_p rpc_stat = NULL;
+ struct opr_queue *cursor;
/*
* See if there's already a structure for this interface
*/
- for (queue_Scan(stats, rpc_stat, nrpc_stat, rx_interface_stat)) {
+ for (opr_queue_Scan(stats, cursor)) {
+ rpc_stat = opr_queue_Entry(cursor, struct rx_interface_stat, entry);
+
if ((rpc_stat->stats[0].interfaceId == rxInterface)
&& (rpc_stat->stats[0].remote_is_server == isServer))
break;
}
+ /* if they didn't ask us to create, we're done */
+ if (!create) {
+ if (opr_queue_IsEnd(stats, cursor))
+ return NULL;
+ else
+ return rpc_stat;
+ }
+
+ /* can't proceed without these */
+ if (!totalFunc || !counter)
+ return NULL;
+
/*
* Didn't find a match so allocate a new structure and add it to the
* queue.
*/
- if (queue_IsEnd(stats, rpc_stat) || (rpc_stat == NULL)
+ if (opr_queue_IsEnd(stats, cursor) || (rpc_stat == NULL)
|| (rpc_stat->stats[0].interfaceId != rxInterface)
|| (rpc_stat->stats[0].remote_is_server != isServer)) {
int i;
totalFunc * sizeof(rx_function_entry_v1_t);
rpc_stat = rxi_Alloc(space);
- if (rpc_stat == NULL) {
- rc = 1;
- goto fail;
- }
+ if (rpc_stat == NULL)
+ return NULL;
+
*counter += totalFunc;
for (i = 0; i < totalFunc; i++) {
+ rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
rpc_stat->stats[i].remote_peer = remoteHost;
rpc_stat->stats[i].remote_port = remotePort;
rpc_stat->stats[i].remote_is_server = isServer;
rpc_stat->stats[i].interfaceId = rxInterface;
rpc_stat->stats[i].func_total = totalFunc;
rpc_stat->stats[i].func_index = i;
- hzero(rpc_stat->stats[i].invocations);
- hzero(rpc_stat->stats[i].bytes_sent);
- hzero(rpc_stat->stats[i].bytes_rcvd);
- rpc_stat->stats[i].queue_time_sum.sec = 0;
- rpc_stat->stats[i].queue_time_sum.usec = 0;
- rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
- rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
- rpc_stat->stats[i].queue_time_min.sec = 9999999;
- rpc_stat->stats[i].queue_time_min.usec = 9999999;
- rpc_stat->stats[i].queue_time_max.sec = 0;
- rpc_stat->stats[i].queue_time_max.usec = 0;
- rpc_stat->stats[i].execution_time_sum.sec = 0;
- rpc_stat->stats[i].execution_time_sum.usec = 0;
- rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
- rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
- rpc_stat->stats[i].execution_time_min.sec = 9999999;
- rpc_stat->stats[i].execution_time_min.usec = 9999999;
- rpc_stat->stats[i].execution_time_max.sec = 0;
- rpc_stat->stats[i].execution_time_max.usec = 0;
- }
- queue_Prepend(stats, rpc_stat);
+ }
+ opr_queue_Prepend(stats, &rpc_stat->entry);
if (addToPeerList) {
- queue_Prepend(&peerStats, &rpc_stat->all_peers);
+ opr_queue_Prepend(&peerStats, &rpc_stat->entryPeers);
}
}
+ return rpc_stat;
+}
+
+void
+rx_ClearProcessRPCStats(afs_int32 rxInterface)
+{
+ rx_interface_stat_p rpc_stat;
+ int totalFunc, i;
+
+ if (rxInterface == -1)
+ return;
+
+ MUTEX_ENTER(&rx_rpc_stats);
+ rpc_stat = rxi_FindRpcStat(&processStats, rxInterface, 0, 0,
+ 0, 0, 0, 0, 0);
+ if (rpc_stat) {
+ totalFunc = rpc_stat->stats[0].func_total;
+ for (i = 0; i < totalFunc; i++)
+ rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
+ }
+ MUTEX_EXIT(&rx_rpc_stats);
+ return;
+}
+
+void
+rx_ClearPeerRPCStats(afs_int32 rxInterface, afs_uint32 peerHost, afs_uint16 peerPort)
+{
+ rx_interface_stat_p rpc_stat;
+ int totalFunc, i;
+ struct rx_peer * peer;
+
+ if (rxInterface == -1)
+ return;
+
+ peer = rxi_FindPeer(peerHost, peerPort, 0, 0);
+ if (!peer)
+ return;
+
+ MUTEX_ENTER(&rx_rpc_stats);
+ rpc_stat = rxi_FindRpcStat(&peer->rpcStats, rxInterface, 0, 1,
+ 0, 0, 0, 0, 0);
+ if (rpc_stat) {
+ totalFunc = rpc_stat->stats[0].func_total;
+ for (i = 0; i < totalFunc; i++)
+ rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
+ }
+ MUTEX_EXIT(&rx_rpc_stats);
+ return;
+}
+
+void *
+rx_CopyProcessRPCStats(afs_uint64 op)
+{
+ rx_interface_stat_p rpc_stat;
+    rx_function_entry_v1_p rpcop_stat;
+    int currentFunc = (op & MAX_AFS_UINT32);
+    afs_int32 rxInterface = (op >> 32);
+
+    if (!rxi_monitor_processStats)
+	return NULL;
+
+    if (rxInterface == -1)
+	return NULL;
+
+    /* allocate only after the early-return checks so nothing can leak */
+    rpcop_stat = rxi_Alloc(sizeof(rx_function_entry_v1_t));
+    if (rpcop_stat == NULL)
+	return NULL;
+
+ MUTEX_ENTER(&rx_rpc_stats);
+ rpc_stat = rxi_FindRpcStat(&processStats, rxInterface, 0, 0,
+ 0, 0, 0, 0, 0);
+ if (rpc_stat)
+ memcpy(rpcop_stat, &(rpc_stat->stats[currentFunc]),
+ sizeof(rx_function_entry_v1_t));
+ MUTEX_EXIT(&rx_rpc_stats);
+ if (!rpc_stat) {
+ rxi_Free(rpcop_stat, sizeof(rx_function_entry_v1_t));
+ return NULL;
+ }
+ return rpcop_stat;
+}
+
+void *
+rx_CopyPeerRPCStats(afs_uint64 op, afs_uint32 peerHost, afs_uint16 peerPort)
+{
+ rx_interface_stat_p rpc_stat;
+    rx_function_entry_v1_p rpcop_stat;
+    int currentFunc = (op & MAX_AFS_UINT32);
+    afs_int32 rxInterface = (op >> 32);
+    struct rx_peer *peer;
+
+    if (!rxi_monitor_peerStats)
+	return NULL;
+
+    if (rxInterface == -1)
+	return NULL;
+
+    peer = rxi_FindPeer(peerHost, peerPort, 0, 0);
+    if (!peer)
+	return NULL;
+
+    /* allocate only after the early-return checks so nothing can leak */
+    rpcop_stat = rxi_Alloc(sizeof(rx_function_entry_v1_t));
+    if (rpcop_stat == NULL)
+	return NULL;
+
+ MUTEX_ENTER(&rx_rpc_stats);
+ rpc_stat = rxi_FindRpcStat(&peer->rpcStats, rxInterface, 0, 1,
+ 0, 0, 0, 0, 0);
+ if (rpc_stat)
+ memcpy(rpcop_stat, &(rpc_stat->stats[currentFunc]),
+ sizeof(rx_function_entry_v1_t));
+ MUTEX_EXIT(&rx_rpc_stats);
+ if (!rpc_stat) {
+ rxi_Free(rpcop_stat, sizeof(rx_function_entry_v1_t));
+ return NULL;
+ }
+ return rpcop_stat;
+}
+
+void
+rx_ReleaseRPCStats(void *stats)
+{
+ if (stats)
+ rxi_Free(stats, sizeof(rx_function_entry_v1_t));
+}
+
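+/*
+ * Usage sketch for the copy/release pair above (illustrative): the
+ * interface id is packed into the high 32 bits of 'op' and the function
+ * index into the low 32 bits, and the returned copy must be released:
+ *
+ *     afs_uint64 op = ((afs_uint64)rxInterface << 32) | currentFunc;
+ *     rx_function_entry_v1_p st = rx_CopyProcessRPCStats(op);
+ *     if (st != NULL) {
+ *         ... examine st->invocations, st->bytes_sent, ...
+ *         rx_ReleaseRPCStats(st);
+ *     }
+ */
+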
+/*!
+ * Given all of the information for a particular rpc
+ * call, create (if needed) and update the stat totals for the rpc.
+ *
+ * @param stats
+ * the queue of stats that will be updated with the new value
+ *
+ * @param rxInterface
+ * a unique number that identifies the rpc interface
+ *
+ * @param currentFunc
+ * the index of the function being invoked
+ *
+ * @param totalFunc
+ * the total number of functions in this interface
+ *
+ * @param queueTime
+ * the amount of time this function waited for a thread
+ *
+ * @param execTime
+ * the amount of time this function invocation took to execute
+ *
+ * @param bytesSent
+ *	the number of bytes sent by this invocation
+ *
+ * @param bytesRcvd
+ *	the number of bytes received by this invocation
+ *
+ * @param isServer
+ * if true, this invocation was made to a server
+ *
+ * @param remoteHost
+ * the ip address of the remote host
+ *
+ * @param remotePort
+ * the port of the remote host
+ *
+ * @param addToPeerList
+ * if != 0, add newly created stat to the global peer list
+ *
+ * @param counter
+ * if a new stats structure is allocated, the counter will
+ * be updated with the new number of allocated stat structures
+ *
+ */
+
+static int
+rxi_AddRpcStat(struct opr_queue *stats, afs_uint32 rxInterface,
+ afs_uint32 currentFunc, afs_uint32 totalFunc,
+ struct clock *queueTime, struct clock *execTime,
+ afs_uint64 bytesSent, afs_uint64 bytesRcvd, int isServer,
+ afs_uint32 remoteHost, afs_uint32 remotePort,
+ int addToPeerList, unsigned int *counter)
+{
+ int rc = 0;
+ rx_interface_stat_p rpc_stat;
+
+ rpc_stat = rxi_FindRpcStat(stats, rxInterface, totalFunc, isServer,
+ remoteHost, remotePort, addToPeerList, counter,
+ 1);
+ if (!rpc_stat) {
+ rc = -1;
+ goto fail;
+ }
/*
* Increment the stats for this function
*/
- hadd32(rpc_stat->stats[currentFunc].invocations, 1);
- hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
- hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
+ rpc_stat->stats[currentFunc].invocations++;
+ rpc_stat->stats[currentFunc].bytes_sent += bytesSent;
+ rpc_stat->stats[currentFunc].bytes_rcvd += bytesRcvd;
clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum, queueTime);
clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr, queueTime);
if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
return rc;
}
-/*
- * rx_IncrementTimeAndCount - increment the times and count for a particular
- * rpc function.
- *
- * PARAMETERS
- *
- * IN peer - the peer who invoked the rpc
- *
- * IN rxInterface - a unique number that identifies the rpc interface
- *
- * IN currentFunc - the index of the function being invoked
- *
- * IN totalFunc - the total number of functions in this interface
- *
- * IN queueTime - the amount of time this function waited for a thread
- *
- * IN execTime - the amount of time this function invocation took to execute
- *
- * IN bytesSent - the number bytes sent by this invocation
- *
- * IN bytesRcvd - the number bytes received by this invocation
- *
- * IN isServer - if true, this invocation was made to a server
- *
- * RETURN CODES
- *
- * Returns void.
- */
-
void
-rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
- afs_uint32 currentFunc, afs_uint32 totalFunc,
- struct clock *queueTime, struct clock *execTime,
- afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd,
- int isServer)
+rxi_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
+ afs_uint32 currentFunc, afs_uint32 totalFunc,
+ struct clock *queueTime, struct clock *execTime,
+ afs_uint64 bytesSent, afs_uint64 bytesRcvd,
+ int isServer)
{
if (!(rxi_monitor_peerStats || rxi_monitor_processStats))
}
MUTEX_EXIT(&rx_rpc_stats);
+}
+
+/*!
+ * Increment the times and count for a particular rpc function.
+ *
+ * Traditionally this call was invoked from rxgen stubs. Modern stubs
+ * call rx_RecordCallStatistics instead, so the public version of this
+ * function is left purely for legacy callers.
+ *
+ * @param peer
+ * The peer who invoked the rpc
+ *
+ * @param rxInterface
+ * A unique number that identifies the rpc interface
+ *
+ * @param currentFunc
+ * The index of the function being invoked
+ *
+ * @param totalFunc
+ * The total number of functions in this interface
+ *
+ * @param queueTime
+ * The amount of time this function waited for a thread
+ *
+ * @param execTime
+ * The amount of time this function invocation took to execute
+ *
+ * @param bytesSent
+ *	The number of bytes sent by this invocation
+ *
+ * @param bytesRcvd
+ *	The number of bytes received by this invocation
+ *
+ * @param isServer
+ * If true, this invocation was made to a server
+ *
+ */
+void
+rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
+ afs_uint32 currentFunc, afs_uint32 totalFunc,
+ struct clock *queueTime, struct clock *execTime,
+ afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd,
+ int isServer)
+{
+ afs_uint64 sent64;
+ afs_uint64 rcvd64;
+
+ sent64 = ((afs_uint64)bytesSent->high << 32) + bytesSent->low;
+ rcvd64 = ((afs_uint64)bytesRcvd->high << 32) + bytesRcvd->low;
+
+ rxi_IncrementTimeAndCount(peer, rxInterface, currentFunc, totalFunc,
+ queueTime, execTime, sent64, rcvd64,
+ isServer);
}
+
+
/*
* rx_MarshallProcessRPCStats - marshall an array of rpc statistics
*
*(ptr++) = stats->interfaceId;
*(ptr++) = stats->func_total;
*(ptr++) = stats->func_index;
- *(ptr++) = hgethi(stats->invocations);
- *(ptr++) = hgetlo(stats->invocations);
- *(ptr++) = hgethi(stats->bytes_sent);
- *(ptr++) = hgetlo(stats->bytes_sent);
- *(ptr++) = hgethi(stats->bytes_rcvd);
- *(ptr++) = hgetlo(stats->bytes_rcvd);
+ *(ptr++) = stats->invocations >> 32;
+ *(ptr++) = stats->invocations & MAX_AFS_UINT32;
+ *(ptr++) = stats->bytes_sent >> 32;
+ *(ptr++) = stats->bytes_sent & MAX_AFS_UINT32;
+ *(ptr++) = stats->bytes_rcvd >> 32;
+ *(ptr++) = stats->bytes_rcvd & MAX_AFS_UINT32;
*(ptr++) = stats->queue_time_sum.sec;
*(ptr++) = stats->queue_time_sum.usec;
*(ptr++) = stats->queue_time_sum_sqr.sec;
ptr = *stats = rxi_Alloc(space);
if (ptr != NULL) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor;
-
- for (queue_Scan
- (&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
+ for (opr_queue_Scan(&processStats, cursor)) {
+ struct rx_interface_stat *rpc_stat =
+ opr_queue_Entry(cursor, struct rx_interface_stat, entry);
/*
* Copy the data based upon the caller version
*/
ptr = *stats = rxi_Alloc(space);
if (ptr != NULL) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
- char *fix_offset;
+ struct opr_queue *cursor;
- for (queue_Scan
- (&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
- /*
- * We have to fix the offset of rpc_stat since we are
- * keeping this structure on two rx_queues. The rx_queue
- * package assumes that the rx_queue member is the first
- * member of the structure. That is, rx_queue assumes that
- * any one item is only on one queue at a time. We are
- * breaking that assumption and so we have to do a little
- * math to fix our pointers.
- */
-
- fix_offset = (char *)rpc_stat;
- fix_offset -= offsetof(rx_interface_stat_t, all_peers);
- rpc_stat = (rx_interface_stat_p) fix_offset;
+ for (opr_queue_Scan(&peerStats, cursor)) {
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entryPeers);
/*
* Copy the data based upon the caller version
void
rx_disableProcessRPCStats(void)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor, *store;
size_t space;
MUTEX_ENTER(&rx_rpc_stats);
rx_enable_stats = 0;
}
- for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
- unsigned int num_funcs = 0;
- if (!rpc_stat)
- break;
- queue_Remove(rpc_stat);
+ for (opr_queue_ScanSafe(&processStats, cursor, store)) {
+ unsigned int num_funcs = 0;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat, entry);
+
+ opr_queue_Remove(&rpc_stat->entry);
+
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
next = peer->next;
code = MUTEX_TRYENTER(&peer->peer_lock);
if (code) {
- rx_interface_stat_p rpc_stat, nrpc_stat;
size_t space;
+ struct opr_queue *cursor, *store;
if (prev == *peer_ptr) {
*peer_ptr = next;
peer->refCount++;
MUTEX_EXIT(&rx_peerHashTable_lock);
- for (queue_Scan
- (&peer->rpcStats, rpc_stat, nrpc_stat,
- rx_interface_stat)) {
+ for (opr_queue_ScanSafe(&peer->rpcStats, cursor, store)) {
unsigned int num_funcs = 0;
- if (!rpc_stat)
- break;
- queue_Remove(&rpc_stat->queue_header);
- queue_Remove(&rpc_stat->all_peers);
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat,
+ entry);
+
+ opr_queue_Remove(&rpc_stat->entry);
+ opr_queue_Remove(&rpc_stat->entryPeers);
num_funcs = rpc_stat->stats[0].func_total;
space =
sizeof(rx_interface_stat_t) +
void
rx_clearProcessRPCStats(afs_uint32 clearFlag)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor;
MUTEX_ENTER(&rx_rpc_stats);
- for (queue_Scan(&processStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
+ for (opr_queue_Scan(&processStats, cursor)) {
unsigned int num_funcs = 0, i;
+ struct rx_interface_stat *rpc_stat
+	    = opr_queue_Entry(cursor, struct rx_interface_stat, entry);
+
num_funcs = rpc_stat->stats[0].func_total;
for (i = 0; i < num_funcs; i++) {
if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
- hzero(rpc_stat->stats[i].invocations);
+ rpc_stat->stats[i].invocations = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
- hzero(rpc_stat->stats[i].bytes_sent);
+ rpc_stat->stats[i].bytes_sent = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
- hzero(rpc_stat->stats[i].bytes_rcvd);
+ rpc_stat->stats[i].bytes_rcvd = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
rpc_stat->stats[i].queue_time_sum.sec = 0;
void
rx_clearPeerRPCStats(afs_uint32 clearFlag)
{
- rx_interface_stat_p rpc_stat, nrpc_stat;
+ struct opr_queue *cursor;
MUTEX_ENTER(&rx_rpc_stats);
- for (queue_Scan(&peerStats, rpc_stat, nrpc_stat, rx_interface_stat)) {
- unsigned int num_funcs = 0, i;
- char *fix_offset;
- /*
- * We have to fix the offset of rpc_stat since we are
- * keeping this structure on two rx_queues. The rx_queue
- * package assumes that the rx_queue member is the first
- * member of the structure. That is, rx_queue assumes that
- * any one item is only on one queue at a time. We are
- * breaking that assumption and so we have to do a little
- * math to fix our pointers.
- */
-
- fix_offset = (char *)rpc_stat;
- fix_offset -= offsetof(rx_interface_stat_t, all_peers);
- rpc_stat = (rx_interface_stat_p) fix_offset;
+ for (opr_queue_Scan(&peerStats, cursor)) {
+ unsigned int num_funcs, i;
+ struct rx_interface_stat *rpc_stat
+ = opr_queue_Entry(cursor, struct rx_interface_stat, entryPeers);
num_funcs = rpc_stat->stats[0].func_total;
for (i = 0; i < num_funcs; i++) {
if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
- hzero(rpc_stat->stats[i].invocations);
+ rpc_stat->stats[i].invocations = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
- hzero(rpc_stat->stats[i].bytes_sent);
+ rpc_stat->stats[i].bytes_sent = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
- hzero(rpc_stat->stats[i].bytes_rcvd);
+ rpc_stat->stats[i].bytes_rcvd = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
rpc_stat->stats[i].queue_time_sum.sec = 0;
for (c = rx_allCallsp; c; c = c->allNextp) {
u_short rqc, tqc, iovqc;
- struct rx_packet *p, *np;
MUTEX_ENTER(&c->lock);
- queue_Count(&c->rq, p, np, rx_packet, rqc);
- queue_Count(&c->tq, p, np, rx_packet, tqc);
- queue_Count(&c->iovq, p, np, rx_packet, iovqc);
+ rqc = opr_queue_Count(&c->rq);
+ tqc = opr_queue_Count(&c->tq);
+ iovqc = opr_queue_Count(&c->app.iovq);
RXDPRINTF(RXDPRINTOUT, "%s - call=0x%p, id=%u, state=%u, mode=%u, conn=%p, epoch=%u, cid=%u, callNum=%u, connFlags=0x%x, flags=0x%x, "
"rqc=%u,%u, tqc=%u,%u, iovqc=%u,%u, "
"refCountAlive=%u, refCountPacket=%u, refCountSend=%u, refCountAckAll=%u, refCountAbort=%u"
#endif
"\r\n",
- cookie, c, c->call_id, (afs_uint32)c->state, (afs_uint32)c->mode, c->conn, c->conn?c->conn->epoch:0, c->conn?c->conn->cid:0,
+ cookie, c, c->call_id, (afs_uint32)c->state, (afs_uint32)c->app.mode, c->conn, c->conn?c->conn->epoch:0, c->conn?c->conn->cid:0,
c->callNumber?*c->callNumber:0, c->conn?c->conn->flags:0, c->flags,
(afs_uint32)c->rqc, (afs_uint32)rqc, (afs_uint32)c->tqc, (afs_uint32)tqc, (afs_uint32)c->iovqc, (afs_uint32)iovqc,
(afs_uint32)c->localStatus, (afs_uint32)c->remoteStatus, c->error, c->timeout,