rx: rxi_FindRpcStat must test for empty queue
[openafs.git] / src / rx / rx.c
index f6aa9ff..0f30eb6 100644 (file)
@@ -556,11 +556,10 @@ rx_InitHost(u_int host, u_int port)
     rx_connDeadTime = 12;
     rx_tranquil = 0;           /* reset flag */
     rxi_ResetStatistics();
-    htable = (char *)
-       osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
+    htable = osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
     PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *));    /* XXXXX */
     memset(htable, 0, rx_hashTableSize * sizeof(struct rx_connection *));
-    ptable = (char *)osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
+    ptable = osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
     PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *));  /* XXXXX */
     memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));
 
@@ -608,6 +607,7 @@ rx_InitHost(u_int host, u_int port)
 #endif
        if (getsockname((intptr_t)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
            rx_Finalize();
+           osi_Free(htable, rx_hashTableSize * sizeof(struct rx_connection *));
            return -1;
        }
        rx_port = addr.sin_port;
@@ -1628,8 +1628,8 @@ rx_NewCall(struct rx_connection *conn)
     /* remember start time for call in case we have hard dead time limit */
     call->queueTime = queueTime;
     clock_GetTime(&call->startTime);
-    hzero(call->bytesSent);
-    hzero(call->bytesRcvd);
+    call->bytesSent = 0;
+    call->bytesRcvd = 0;
 
     /* Turn on busy protocol. */
     rxi_KeepAliveOn(call);
@@ -2933,7 +2933,6 @@ rxi_FindPeer(afs_uint32 host, u_short port,
            pp->host = host;    /* set here or in InitPeerParams is zero */
            pp->port = port;
            MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
-           queue_Init(&pp->congestionQueue);
            queue_Init(&pp->rpcStats);
            pp->next = rx_peerHashTable[hashIndex];
            rx_peerHashTable[hashIndex] = pp;
@@ -3028,8 +3027,6 @@ rxi_FindConnection(osi_socket socket, afs_uint32 host,
        conn->lastSendTime = clock_Sec();       /* don't GC immediately */
        conn->epoch = epoch;
        conn->cid = cid & RX_CIDMASK;
-       /* conn->serial = conn->lastSerial = 0; */
-       /* conn->timeout = 0; */
        conn->ackRate = RX_FAST_ACK_RATE;
        conn->service = service;
        conn->serviceId = serviceId;
@@ -3170,7 +3167,6 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
     int channel;
     afs_uint32 currentCallNumber;
     int type;
-    int skew;
 #ifdef RXDEBUG
     char *packetType;
 #endif
@@ -3189,6 +3185,26 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
         np->header.seq, np->header.flags, np));
 #endif
 
+    /* Account for connectionless packets */
+    if (rx_stats_active &&
+       ((np->header.type == RX_PACKET_TYPE_VERSION) ||
+         (np->header.type == RX_PACKET_TYPE_DEBUG))) {
+       struct rx_peer *peer;
+
+       /* Try to look up the peer structure, but don't create one */
+       peer = rxi_FindPeer(host, port, 0, 0);
+
+       /* Since this may not be associated with a connection, it may have
+        * no refCount, meaning we could race with ReapConnections
+        */
+
+       if (peer && (peer->refCount > 0)) {
+           MUTEX_ENTER(&peer->peer_lock);
+           peer->bytesReceived += np->length;
+           MUTEX_EXIT(&peer->peer_lock);
+       }
+    }
+
     if (np->header.type == RX_PACKET_TYPE_VERSION) {
        return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
     }
@@ -3228,11 +3244,20 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
                           np->header.cid, np->header.epoch, type,
                           np->header.securityIndex);
 
+    /* To avoid having 2 connections just abort at each other,
+       don't abort an abort. */
     if (!conn) {
-       /* If no connection found or fabricated, just ignore the packet.
-        * (An argument could be made for sending an abort packet for
-        * the conn) */
-       return np;
+        if (np->header.type != RX_PACKET_TYPE_ABORT)
+            rxi_SendRawAbort(socket, host, port, RX_INVALID_OPERATION,
+                             np, 0);
+        return np;
+    }
+
+    /* If we're doing statistics, then account for the incoming packet */
+    if (rx_stats_active) {
+       MUTEX_ENTER(&conn->peer->peer_lock);
+       conn->peer->bytesReceived += np->length;
+       MUTEX_EXIT(&conn->peer->peer_lock);
     }
 
     /* If the connection is in an error state, send an abort packet and ignore
@@ -3312,8 +3337,8 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
 #endif
             call->state = RX_STATE_PRECALL;
             clock_GetTime(&call->queueTime);
-            hzero(call->bytesSent);
-            hzero(call->bytesRcvd);
+            call->bytesSent = 0;
+            call->bytesRcvd = 0;
             /*
              * If the number of queued calls exceeds the overload
              * threshold then abort this call.
@@ -3338,6 +3363,7 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
          * then, since this is a client connection we're getting data for
          * it must be for the previous call.
          */
+        MUTEX_EXIT(&conn->conn_call_lock);
         if (rx_stats_active)
             rx_atomic_inc(&rx_stats.spuriousPacketsRead);
        putConnection(conn);
@@ -3403,8 +3429,8 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
 #endif
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
-           hzero(call->bytesSent);
-           hzero(call->bytesRcvd);
+           call->bytesSent = 0;
+           call->bytesRcvd = 0;
            /*
             * If the number of queued calls exceeds the overload
             * threshold then abort this call.
@@ -3509,31 +3535,6 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
     /* Set remote user defined status from packet */
     call->remoteStatus = np->header.userStatus;
 
-    /* Note the gap between the expected next packet and the actual
-     * packet that arrived, when the new packet has a smaller serial number
-     * than expected.  Rioses frequently reorder packets all by themselves,
-     * so this will be quite important with very large window sizes.
-     * Skew is checked against 0 here to avoid any dependence on the type of
-     * inPacketSkew (which may be unsigned).  In C, -1 > (unsigned) 0 is always
-     * true!
-     * The inPacketSkew should be a smoothed running value, not just a maximum.  MTUXXX
-     * see CalculateRoundTripTime for an example of how to keep smoothed values.
-     * I think using a beta of 1/8 is probably appropriate.  93.04.21
-     */
-    MUTEX_ENTER(&conn->conn_data_lock);
-    skew = conn->lastSerial - np->header.serial;
-    conn->lastSerial = np->header.serial;
-    MUTEX_EXIT(&conn->conn_data_lock);
-    if (skew > 0) {
-       struct rx_peer *peer;
-       peer = conn->peer;
-       if (skew > peer->inPacketSkew) {
-           dpf(("*** In skew changed from %d to %d\n",
-                  peer->inPacketSkew, skew));
-           peer->inPacketSkew = skew;
-       }
-    }
-
     /* Now do packet type-specific processing */
     switch (np->header.type) {
     case RX_PACKET_TYPE_DATA:
@@ -3851,15 +3852,12 @@ rxi_ReceiveDataPacket(struct rx_call *call,
        MUTEX_EXIT(&rx_freePktQ_lock);
         if (rx_stats_active)
             rx_atomic_inc(&rx_stats.noPacketBuffersOnRead);
-       call->rprev = np->header.serial;
        rxi_calltrace(RX_TRACE_DROP, call);
        dpf(("packet %"AFS_PTR_FMT" dropped on receipt - quota problems\n", np));
         /* We used to clear the receive queue here, in an attempt to free
          * packets. However this is unsafe if the queue has received a
          * soft ACK for the final packet */
        rxi_PostDelayedAckEvent(call, &rx_softAckDelay);
-
-       /* we've damaged this call already, might as well do it in. */
        return np;
     }
 #endif /* KERNEL */
@@ -4231,8 +4229,6 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
     afs_uint32 first;
     afs_uint32 prev;
     afs_uint32 serial;
-    /* because there are CM's that are bogus, sending weird values for this. */
-    afs_uint32 skew = 0;
     int nbytes;
     int missing;
     int acked;
@@ -4254,8 +4250,6 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
     first = ntohl(ap->firstPacket);
     prev = ntohl(ap->previousPacket);
     serial = ntohl(ap->serial);
-    /* temporarily disabled -- needs to degrade over time
-     * skew = ntohs(ap->maxSkew); */
 
     /* Ignore ack packets received out of order */
     if (first < call->tfirst ||
@@ -4303,11 +4297,11 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
        size_t len;
 
        len = _snprintf(msg, sizeof(msg),
-                       "tid[%d] RACK: reason %s serial %u previous %u seq %u skew %d first %u acks %u space %u ",
+                       "tid[%d] RACK: reason %s serial %u previous %u seq %u first %u acks %u space %u ",
                         GetCurrentThreadId(), rx_ack_reason(ap->reason),
                         ntohl(ap->serial), ntohl(ap->previousPacket),
-                        (unsigned int)np->header.seq, (unsigned int)skew,
-                        ntohl(ap->firstPacket), ap->nAcks, ntohs(ap->bufferSpace) );
+                        (unsigned int)np->header.seq, ntohl(ap->firstPacket),
+                        ap->nAcks, ntohs(ap->bufferSpace) );
        if (nAcks) {
            int offset;
 
@@ -4321,10 +4315,10 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
 #else /* AFS_NT40_ENV */
     if (rx_Log) {
        fprintf(rx_Log,
-               "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
+               "RACK: reason %x previous %u seq %u serial %u first %u",
                ap->reason, ntohl(ap->previousPacket),
                (unsigned int)np->header.seq, (unsigned int)serial,
-               (unsigned int)skew, ntohl(ap->firstPacket));
+               ntohl(ap->firstPacket));
        if (nAcks) {
            int offset;
            for (offset = 0; offset < nAcks; offset++)
@@ -4355,13 +4349,6 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
        }
     }
 
-    /* Update the outgoing packet skew value to the latest value of
-     * the peer's incoming packet skew value.  The ack packet, of
-     * course, could arrive out of order, but that won't affect things
-     * much */
-    peer->outPacketSkew = skew;
-
-
     clock_GetTime(&now);
 
     /* The transmit queue splits into 4 sections.
@@ -4871,10 +4858,9 @@ rxi_AttachServerProc(struct rx_call *call,
        if (call->flags & RX_CALL_WAIT_PROC) {
            /* Conservative:  I don't think this should happen */
            call->flags &= ~RX_CALL_WAIT_PROC;
+           rx_atomic_dec(&rx_nWaiting);
            if (queue_IsOnQueue(call)) {
                queue_Remove(call);
-
-               rx_atomic_dec(&rx_nWaiting);
            }
        }
        call->state = RX_STATE_ACTIVE;
@@ -5350,6 +5336,9 @@ rxi_ResetCall(struct rx_call *call, int newcall)
        osi_rxWakeup(&call->twind);
 #endif
 
+    if (flags & RX_CALL_WAIT_PROC) {
+       rx_atomic_dec(&rx_nWaiting);
+    }
 #ifdef RX_ENABLE_LOCKS
     /* The following ensures that we don't mess with any queue while some
      * other thread might also be doing so. The call_queue_lock field is
@@ -5364,9 +5353,6 @@ rxi_ResetCall(struct rx_call *call, int newcall)
        MUTEX_ENTER(call->call_queue_lock);
        if (queue_IsOnQueue(call)) {
            queue_Remove(call);
-           if (flags & RX_CALL_WAIT_PROC) {
-               rx_atomic_dec(&rx_nWaiting);
-           }
        }
        MUTEX_EXIT(call->call_queue_lock);
        CLEAR_CALL_QUEUE_LOCK(call);
@@ -5374,8 +5360,6 @@ rxi_ResetCall(struct rx_call *call, int newcall)
 #else /* RX_ENABLE_LOCKS */
     if (queue_IsOnQueue(call)) {
        queue_Remove(call);
-       if (flags & RX_CALL_WAIT_PROC)
-           rx_atomic_dec(&rx_nWaiting);
     }
 #endif /* RX_ENABLE_LOCKS */
 
@@ -7314,18 +7298,14 @@ rx_PrintStats(FILE * file)
 void
 rx_PrintPeerStats(FILE * file, struct rx_peer *peer)
 {
-    fprintf(file, "Peer %x.%d.  " "Burst size %d, " "burst wait %d.%06d.\n",
-           ntohl(peer->host), (int)ntohs(peer->port), (int)peer->burstSize,
-           (int)peer->burstWait.sec, (int)peer->burstWait.usec);
+    fprintf(file, "Peer %x.%d.\n",
+           ntohl(peer->host), (int)ntohs(peer->port));
 
     fprintf(file,
            "   Rtt %d, " "total sent %d, " "resent %d\n",
            peer->rtt, peer->nSent, peer->reSends);
 
-    fprintf(file,
-           "   Packet size %d, " "max in packet skew %d, "
-           "max out packet skew %d\n", peer->ifMTU, (int)peer->inPacketSkew,
-           (int)peer->outPacketSkew);
+    fprintf(file, "   Packet size %d\n", peer->ifMTU);
 }
 #endif
 
@@ -7694,16 +7674,12 @@ rx_GetServerPeers(osi_socket socket, afs_uint32 remoteAddr,
        peer->ifMTU = ntohs(peer->ifMTU);
        peer->idleWhen = ntohl(peer->idleWhen);
        peer->refCount = ntohs(peer->refCount);
-       peer->burstWait.sec = ntohl(peer->burstWait.sec);
-       peer->burstWait.usec = ntohl(peer->burstWait.usec);
        peer->rtt = ntohl(peer->rtt);
        peer->rtt_dev = ntohl(peer->rtt_dev);
        peer->timeout.sec = 0;
        peer->timeout.usec = 0;
        peer->nSent = ntohl(peer->nSent);
        peer->reSends = ntohl(peer->reSends);
-       peer->inPacketSkew = ntohl(peer->inPacketSkew);
-       peer->outPacketSkew = ntohl(peer->outPacketSkew);
        peer->natMTU = ntohs(peer->natMTU);
        peer->maxMTU = ntohs(peer->maxMTU);
        peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
@@ -7750,18 +7726,16 @@ rx_GetLocalPeers(afs_uint32 peerHost, afs_uint16 peerPort,
                peerStats->ifMTU = tp->ifMTU;
                peerStats->idleWhen = tp->idleWhen;
                peerStats->refCount = tp->refCount;
-               peerStats->burstSize = tp->burstSize;
-               peerStats->burst = tp->burst;
-               peerStats->burstWait.sec = tp->burstWait.sec;
-               peerStats->burstWait.usec = tp->burstWait.usec;
+               peerStats->burstSize = 0;
+               peerStats->burst = 0;
+               peerStats->burstWait.sec = 0;
+               peerStats->burstWait.usec = 0;
                peerStats->rtt = tp->rtt;
                peerStats->rtt_dev = tp->rtt_dev;
                peerStats->timeout.sec = 0;
                peerStats->timeout.usec = 0;
                peerStats->nSent = tp->nSent;
                peerStats->reSends = tp->reSends;
-               peerStats->inPacketSkew = tp->inPacketSkew;
-               peerStats->outPacketSkew = tp->outPacketSkew;
                peerStats->natMTU = tp->natMTU;
                peerStats->maxMTU = tp->maxMTU;
                peerStats->maxDgramPackets = tp->maxDgramPackets;
@@ -7770,10 +7744,11 @@ rx_GetLocalPeers(afs_uint32 peerHost, afs_uint16 peerPort,
                peerStats->cwind = tp->cwind;
                peerStats->nDgramPackets = tp->nDgramPackets;
                peerStats->congestSeq = tp->congestSeq;
-               peerStats->bytesSent.high = tp->bytesSent.high;
-               peerStats->bytesSent.low = tp->bytesSent.low;
-               peerStats->bytesReceived.high = tp->bytesReceived.high;
-               peerStats->bytesReceived.low = tp->bytesReceived.low;
+               peerStats->bytesSent.high = tp->bytesSent >> 32;
+               peerStats->bytesSent.low = tp->bytesSent & MAX_AFS_UINT32;
+               peerStats->bytesReceived.high = tp->bytesReceived >> 32;
+               peerStats->bytesReceived.low
+                               = tp->bytesReceived & MAX_AFS_UINT32;
                 MUTEX_EXIT(&tp->peer_lock);
 
                 MUTEX_ENTER(&rx_peerHashTable_lock);
@@ -7959,7 +7934,7 @@ rx_SetSpecific(struct rx_connection *conn, int key, void *ptr)
     int i;
     MUTEX_ENTER(&conn->conn_data_lock);
     if (!conn->specific) {
-       conn->specific = (void **)malloc((key + 1) * sizeof(void *));
+       conn->specific = malloc((key + 1) * sizeof(void *));
        for (i = 0; i < key; i++)
            conn->specific[i] = NULL;
        conn->nSpecific = key + 1;
@@ -7985,7 +7960,7 @@ rx_SetServiceSpecific(struct rx_service *svc, int key, void *ptr)
     int i;
     MUTEX_ENTER(&svc->svc_data_lock);
     if (!svc->specific) {
-       svc->specific = (void **)malloc((key + 1) * sizeof(void *));
+       svc->specific = malloc((key + 1) * sizeof(void *));
        for (i = 0; i < key; i++)
            svc->specific[i] = NULL;
        svc->nSpecific = key + 1;
@@ -8065,55 +8040,80 @@ static int rxi_monitor_processStats = 0;
 
 static int rxi_monitor_peerStats = 0;
 
-/*
- * rxi_AddRpcStat - given all of the information for a particular rpc
- * call, create (if needed) and update the stat totals for the rpc.
- *
- * PARAMETERS
- *
- * IN stats - the queue of stats that will be updated with the new value
- *
- * IN rxInterface - a unique number that identifies the rpc interface
- *
- * IN currentFunc - the index of the function being invoked
- *
- * IN totalFunc - the total number of functions in this interface
+
+void
+rxi_ClearRPCOpStat(rx_function_entry_v1_p rpc_stat)
+{
+    rpc_stat->invocations = 0;
+    rpc_stat->bytes_sent = 0;
+    rpc_stat->bytes_rcvd = 0;
+    rpc_stat->queue_time_sum.sec = 0;
+    rpc_stat->queue_time_sum.usec = 0;
+    rpc_stat->queue_time_sum_sqr.sec = 0;
+    rpc_stat->queue_time_sum_sqr.usec = 0;
+    rpc_stat->queue_time_min.sec = 9999999;
+    rpc_stat->queue_time_min.usec = 9999999;
+    rpc_stat->queue_time_max.sec = 0;
+    rpc_stat->queue_time_max.usec = 0;
+    rpc_stat->execution_time_sum.sec = 0;
+    rpc_stat->execution_time_sum.usec = 0;
+    rpc_stat->execution_time_sum_sqr.sec = 0;
+    rpc_stat->execution_time_sum_sqr.usec = 0;
+    rpc_stat->execution_time_min.sec = 9999999;
+    rpc_stat->execution_time_min.usec = 9999999;
+    rpc_stat->execution_time_max.sec = 0;
+    rpc_stat->execution_time_max.usec = 0;
+}
+
+/*!
+ * Given all of the information for a particular rpc
+ * call, find or create (if requested) the stat structure for the rpc.
  *
- * IN queueTime - the amount of time this function waited for a thread
+ * @param stats
+ *     the queue of stats that will be updated with the new value
  *
- * IN execTime - the amount of time this function invocation took to execute
+ * @param rxInterface
+ *     a unique number that identifies the rpc interface
  *
- * IN bytesSent - the number bytes sent by this invocation
+ * @param totalFunc
+ *     the total number of functions in this interface. this is only
+ *      required if create is true
  *
- * IN bytesRcvd - the number bytes received by this invocation
+ * @param isServer
+ *     if true, this invocation was made to a server
  *
- * IN isServer - if true, this invocation was made to a server
+ * @param remoteHost
+ *     the ip address of the remote host. this is only required if create
+ *      and addToPeerList are true
  *
- * IN remoteHost - the ip address of the remote host
+ * @param remotePort
+ *     the port of the remote host. this is only required if create
+ *      and addToPeerList are true
  *
- * IN remotePort - the port of the remote host
+ * @param addToPeerList
+ *     if != 0, add newly created stat to the global peer list
  *
- * IN addToPeerList - if != 0, add newly created stat to the global peer list
+ * @param counter
+ *     if a new stats structure is allocated, the counter will
+ *     be updated with the new number of allocated stat structures.
+ *      only required if create is true
  *
- * INOUT counter - if a new stats structure is allocated, the counter will
- * be updated with the new number of allocated stat structures
+ * @param create
+ *     if no stats structure exists, allocate one
  *
- * RETURN CODES
- *
- * Returns void.
  */
 
-static int
-rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
-              afs_uint32 currentFunc, afs_uint32 totalFunc,
-              struct clock *queueTime, struct clock *execTime,
-              afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd, int isServer,
-              afs_uint32 remoteHost, afs_uint32 remotePort,
-              int addToPeerList, unsigned int *counter)
+static rx_interface_stat_p
+rxi_FindRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
+               afs_uint32 totalFunc, int isServer, afs_uint32 remoteHost,
+               afs_uint32 remotePort, int addToPeerList,
+               unsigned int *counter, int create)
 {
-    int rc = 0;
     rx_interface_stat_p rpc_stat, nrpc_stat;
 
+    if (queue_IsEmpty(stats) && !create)
+        return NULL;
+
     /*
      * See if there's already a structure for this interface
      */
@@ -8124,6 +8124,14 @@ rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
            break;
     }
 
+    /* if they didn't ask us to create, we're done */
+    if (!create)
+       return rpc_stat;
+
+    /* can't proceed without these */
+    if (!totalFunc || !counter)
+       return NULL;
+
     /*
      * Didn't find a match so allocate a new structure and add it to the
      * queue.
@@ -8140,51 +8148,216 @@ rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
            totalFunc * sizeof(rx_function_entry_v1_t);
 
        rpc_stat = rxi_Alloc(space);
-       if (rpc_stat == NULL) {
-           rc = 1;
-           goto fail;
-       }
+       if (rpc_stat == NULL)
+           return NULL;
+
        *counter += totalFunc;
        for (i = 0; i < totalFunc; i++) {
+           rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
            rpc_stat->stats[i].remote_peer = remoteHost;
            rpc_stat->stats[i].remote_port = remotePort;
            rpc_stat->stats[i].remote_is_server = isServer;
            rpc_stat->stats[i].interfaceId = rxInterface;
            rpc_stat->stats[i].func_total = totalFunc;
            rpc_stat->stats[i].func_index = i;
-           hzero(rpc_stat->stats[i].invocations);
-           hzero(rpc_stat->stats[i].bytes_sent);
-           hzero(rpc_stat->stats[i].bytes_rcvd);
-           rpc_stat->stats[i].queue_time_sum.sec = 0;
-           rpc_stat->stats[i].queue_time_sum.usec = 0;
-           rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
-           rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
-           rpc_stat->stats[i].queue_time_min.sec = 9999999;
-           rpc_stat->stats[i].queue_time_min.usec = 9999999;
-           rpc_stat->stats[i].queue_time_max.sec = 0;
-           rpc_stat->stats[i].queue_time_max.usec = 0;
-           rpc_stat->stats[i].execution_time_sum.sec = 0;
-           rpc_stat->stats[i].execution_time_sum.usec = 0;
-           rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
-           rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
-           rpc_stat->stats[i].execution_time_min.sec = 9999999;
-           rpc_stat->stats[i].execution_time_min.usec = 9999999;
-           rpc_stat->stats[i].execution_time_max.sec = 0;
-           rpc_stat->stats[i].execution_time_max.usec = 0;
        }
        queue_Prepend(stats, rpc_stat);
        if (addToPeerList) {
            queue_Prepend(&peerStats, &rpc_stat->all_peers);
        }
     }
+    return rpc_stat;
+}
+
+void
+rx_ClearProcessRPCStats(afs_int32 rxInterface)
+{
+    rx_interface_stat_p rpc_stat;
+    int totalFunc, i;
+
+    if (rxInterface == -1)
+        return;
+
+    MUTEX_ENTER(&rx_rpc_stats);
+    rpc_stat = rxi_FindRpcStat(&processStats, rxInterface, 0, 0,
+                              0, 0, 0, 0, 0);
+    if (rpc_stat) {
+       totalFunc = rpc_stat->stats[0].func_total;
+       for (i = 0; i < totalFunc; i++)
+           rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
+    }
+    MUTEX_EXIT(&rx_rpc_stats);
+    return;
+}
+
+void
+rx_ClearPeerRPCStats(afs_int32 rxInterface, afs_uint32 peerHost, afs_uint16 peerPort)
+{
+    rx_interface_stat_p rpc_stat;
+    int totalFunc, i;
+    struct rx_peer * peer;
+
+    if (rxInterface == -1)
+        return;
+
+    peer = rxi_FindPeer(peerHost, peerPort, 0, 0);
+    if (!peer)
+        return;
+
+    MUTEX_ENTER(&rx_rpc_stats);
+    rpc_stat = rxi_FindRpcStat(&peer->rpcStats, rxInterface, 0, 1,
+                              0, 0, 0, 0, 0);
+    if (rpc_stat) {
+       totalFunc = rpc_stat->stats[0].func_total;
+       for (i = 0; i < totalFunc; i++)
+           rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
+    }
+    MUTEX_EXIT(&rx_rpc_stats);
+    return;
+}
+
+void *
+rx_CopyProcessRPCStats(afs_uint64 op)
+{
+    rx_interface_stat_p rpc_stat;
+    rx_function_entry_v1_p rpcop_stat =
+       rxi_Alloc(sizeof(rx_function_entry_v1_t));
+    int currentFunc = (op & MAX_AFS_UINT32);
+    afs_int32 rxInterface = (op >> 32);
+
+    if (!rxi_monitor_processStats)
+        return NULL;
+
+    if (rxInterface == -1)
+        return NULL;
+
+    MUTEX_ENTER(&rx_rpc_stats);
+    rpc_stat = rxi_FindRpcStat(&processStats, rxInterface, 0, 0,
+                              0, 0, 0, 0, 0);
+    if (rpc_stat)
+       memcpy(rpcop_stat, &(rpc_stat->stats[currentFunc]),
+              sizeof(rx_function_entry_v1_t));
+    MUTEX_EXIT(&rx_rpc_stats);
+    if (!rpc_stat) {
+       rxi_Free(rpcop_stat, sizeof(rx_function_entry_v1_t));
+       return NULL;
+    }
+    return rpcop_stat;
+}
+
+void *
+rx_CopyPeerRPCStats(afs_uint64 op, afs_uint32 peerHost, afs_uint16 peerPort)
+{
+    rx_interface_stat_p rpc_stat;
+    rx_function_entry_v1_p rpcop_stat =
+       rxi_Alloc(sizeof(rx_function_entry_v1_t));
+    int currentFunc = (op & MAX_AFS_UINT32);
+    afs_int32 rxInterface = (op >> 32);
+    struct rx_peer *peer;
+
+    if (!rxi_monitor_peerStats)
+        return NULL;
+
+    if (rxInterface == -1)
+        return NULL;
+
+    peer = rxi_FindPeer(peerHost, peerPort, 0, 0);
+    if (!peer)
+        return NULL;
+
+    MUTEX_ENTER(&rx_rpc_stats);
+    rpc_stat = rxi_FindRpcStat(&peer->rpcStats, rxInterface, 0, 1,
+                              0, 0, 0, 0, 0);
+    if (rpc_stat)
+       memcpy(rpcop_stat, &(rpc_stat->stats[currentFunc]),
+              sizeof(rx_function_entry_v1_t));
+    MUTEX_EXIT(&rx_rpc_stats);
+    if (!rpc_stat) {
+       rxi_Free(rpcop_stat, sizeof(rx_function_entry_v1_t));
+       return NULL;
+    }
+    return rpcop_stat;
+}
+
+void
+rx_ReleaseRPCStats(void *stats)
+{
+    if (stats)
+       rxi_Free(stats, sizeof(rx_function_entry_v1_t));
+}
+
+/*!
+ * Given all of the information for a particular rpc
+ * call, create (if needed) and update the stat totals for the rpc.
+ *
+ * @param stats
+ *     the queue of stats that will be updated with the new value
+ *
+ * @param rxInterface
+ *     a unique number that identifies the rpc interface
+ *
+ * @param currentFunc
+ *     the index of the function being invoked
+ *
+ * @param totalFunc
+ *     the total number of functions in this interface
+ *
+ * @param queueTime
+ *     the amount of time this function waited for a thread
+ *
+ * @param execTime
+ *     the amount of time this function invocation took to execute
+ *
+ * @param bytesSent
+ *     the number bytes sent by this invocation
+ *
+ * @param bytesRcvd
+ *     the number bytes received by this invocation
+ *
+ * @param isServer
+ *     if true, this invocation was made to a server
+ *
+ * @param remoteHost
+ *     the ip address of the remote host
+ *
+ * @param remotePort
+ *     the port of the remote host
+ *
+ * @param addToPeerList
+ *     if != 0, add newly created stat to the global peer list
+ *
+ * @param counter
+ *     if a new stats structure is allocated, the counter will
+ *     be updated with the new number of allocated stat structures
+ *
+ */
+
+static int
+rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
+              afs_uint32 currentFunc, afs_uint32 totalFunc,
+              struct clock *queueTime, struct clock *execTime,
+              afs_uint64 bytesSent, afs_uint64 bytesRcvd, int isServer,
+              afs_uint32 remoteHost, afs_uint32 remotePort,
+              int addToPeerList, unsigned int *counter)
+{
+    int rc = 0;
+    rx_interface_stat_p rpc_stat;
+
+    rpc_stat = rxi_FindRpcStat(stats, rxInterface, totalFunc, isServer,
+                              remoteHost, remotePort, addToPeerList, counter,
+                              1);
+    if (!rpc_stat) {
+       rc = -1;
+       goto fail;
+    }
 
     /*
      * Increment the stats for this function
      */
 
-    hadd32(rpc_stat->stats[currentFunc].invocations, 1);
-    hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
-    hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
+    rpc_stat->stats[currentFunc].invocations++;
+    rpc_stat->stats[currentFunc].bytes_sent += bytesSent;
+    rpc_stat->stats[currentFunc].bytes_rcvd += bytesRcvd;
     clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum, queueTime);
     clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr, queueTime);
     if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
@@ -8207,41 +8380,12 @@ rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
     return rc;
 }
 
-/*
- * rx_IncrementTimeAndCount - increment the times and count for a particular
- * rpc function.
- *
- * PARAMETERS
- *
- * IN peer - the peer who invoked the rpc
- *
- * IN rxInterface - a unique number that identifies the rpc interface
- *
- * IN currentFunc - the index of the function being invoked
- *
- * IN totalFunc - the total number of functions in this interface
- *
- * IN queueTime - the amount of time this function waited for a thread
- *
- * IN execTime - the amount of time this function invocation took to execute
- *
- * IN bytesSent - the number bytes sent by this invocation
- *
- * IN bytesRcvd - the number bytes received by this invocation
- *
- * IN isServer - if true, this invocation was made to a server
- *
- * RETURN CODES
- *
- * Returns void.
- */
-
 void
-rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
-                        afs_uint32 currentFunc, afs_uint32 totalFunc,
-                        struct clock *queueTime, struct clock *execTime,
-                        afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd,
-                        int isServer)
+rxi_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
+                         afs_uint32 currentFunc, afs_uint32 totalFunc,
+                         struct clock *queueTime, struct clock *execTime,
+                         afs_uint64 bytesSent, afs_uint64 bytesRcvd,
+                         int isServer)
 {
 
     if (!(rxi_monitor_peerStats || rxi_monitor_processStats))
@@ -8264,9 +8408,63 @@ rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
     }
 
     MUTEX_EXIT(&rx_rpc_stats);
+}
 
+/*!
+ * Increment the times and count for a particular rpc function.
+ *
+ * Traditionally this call was invoked from rxgen stubs. Modern stubs
+ * call rx_RecordCallStatistics instead, so the public version of this
+ * function is left purely for legacy callers.
+ *
+ * @param peer
+ *     The peer who invoked the rpc
+ *
+ * @param rxInterface
+ *     A unique number that identifies the rpc interface
+ *
+ * @param currentFunc
+ *     The index of the function being invoked
+ *
+ * @param totalFunc
+ *     The total number of functions in this interface
+ *
+ * @param queueTime
+ *     The amount of time this function waited for a thread
+ *
+ * @param execTime
+ *     The amount of time this function invocation took to execute
+ *
+ * @param bytesSent
+ *     The number bytes sent by this invocation
+ *
+ * @param bytesRcvd
+ *     The number bytes received by this invocation
+ *
+ * @param isServer
+ *     If true, this invocation was made to a server
+ *
+ */
+void
+rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
+                        afs_uint32 currentFunc, afs_uint32 totalFunc,
+                        struct clock *queueTime, struct clock *execTime,
+                        afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd,
+                        int isServer)
+{
+    afs_uint64 sent64;
+    afs_uint64 rcvd64;
+
+    sent64 = ((afs_uint64)bytesSent->high << 32) + bytesSent->low;
+    rcvd64 = ((afs_uint64)bytesRcvd->high << 32) + bytesRcvd->low;
+
+    rxi_IncrementTimeAndCount(peer, rxInterface, currentFunc, totalFunc,
+                             queueTime, execTime, sent64, rcvd64,
+                             isServer);
 }
 
+
+
 /*
  * rx_MarshallProcessRPCStats - marshall an array of rpc statistics
  *
@@ -8301,12 +8499,12 @@ rx_MarshallProcessRPCStats(afs_uint32 callerVersion, int count,
        *(ptr++) = stats->interfaceId;
        *(ptr++) = stats->func_total;
        *(ptr++) = stats->func_index;
-       *(ptr++) = hgethi(stats->invocations);
-       *(ptr++) = hgetlo(stats->invocations);
-       *(ptr++) = hgethi(stats->bytes_sent);
-       *(ptr++) = hgetlo(stats->bytes_sent);
-       *(ptr++) = hgethi(stats->bytes_rcvd);
-       *(ptr++) = hgetlo(stats->bytes_rcvd);
+       *(ptr++) = stats->invocations >> 32;
+       *(ptr++) = stats->invocations & MAX_AFS_UINT32;
+       *(ptr++) = stats->bytes_sent >> 32;
+       *(ptr++) = stats->bytes_sent & MAX_AFS_UINT32;
+       *(ptr++) = stats->bytes_rcvd >> 32;
+       *(ptr++) = stats->bytes_rcvd & MAX_AFS_UINT32;
        *(ptr++) = stats->queue_time_sum.sec;
        *(ptr++) = stats->queue_time_sum.usec;
        *(ptr++) = stats->queue_time_sum_sqr.sec;
@@ -8796,13 +8994,13 @@ rx_clearProcessRPCStats(afs_uint32 clearFlag)
        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
-               hzero(rpc_stat->stats[i].invocations);
+               rpc_stat->stats[i].invocations = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
-               hzero(rpc_stat->stats[i].bytes_sent);
+               rpc_stat->stats[i].bytes_sent = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
-               hzero(rpc_stat->stats[i].bytes_rcvd);
+               rpc_stat->stats[i].bytes_rcvd = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;
@@ -8882,13 +9080,13 @@ rx_clearPeerRPCStats(afs_uint32 clearFlag)
        num_funcs = rpc_stat->stats[0].func_total;
        for (i = 0; i < num_funcs; i++) {
            if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
-               hzero(rpc_stat->stats[i].invocations);
+               rpc_stat->stats[i].invocations = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
-               hzero(rpc_stat->stats[i].bytes_sent);
+               rpc_stat->stats[i].bytes_sent = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
-               hzero(rpc_stat->stats[i].bytes_rcvd);
+               rpc_stat->stats[i].bytes_rcvd = 0;
            }
            if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
                rpc_stat->stats[i].queue_time_sum.sec = 0;