rx-atomic-macros-and-variables-20090127
author Jeffrey Altman <jaltman@your-file-system.com>
Wed, 28 Jan 2009 20:35:50 +0000 (20:35 +0000)
committer Derrick Brashear <shadow@dementia.org>
Wed, 28 Jan 2009 20:35:50 +0000 (20:35 +0000)
LICENSE BSD

adds macros to support accessing some variables as atomics, when atomic
support is available; otherwise falls back to mutex-protected access.

src/rx/rx.c
src/rx/rx.h
src/rx/rx_internal.h
src/rx/rx_kcommon.c
src/rx/rx_lwp.c
src/rx/rx_multi.c
src/rx/rx_packet.c
src/rx/rx_user.c

index 3f80a37..9476aea 100644 (file)
@@ -867,11 +867,11 @@ rx_NewConnection(afs_uint32 shost, u_short sport, u_short sservice,
        hashindex =
            CONN_HASH(shost, sport, tconn->cid, tconn->epoch,
                      RX_CLIENT_CONNECTION);
-        tconn->refCount++;    /* no lock required since only this thread knows */
+       rx_AtomicIncrement_NL(tconn->refCount); /* no lock required since only this thread knows */
        tconn->next = rx_connHashTable[hashindex];
        rx_connHashTable[hashindex] = tconn;
         if (rx_stats_active)
-            rx_MutexIncrement(rx_stats.nClientConns, rx_stats_mutex);
+            rx_AtomicIncrement(rx_stats.nClientConns, rx_stats_mutex);
     }
        
     MUTEX_EXIT(&rx_connHashTable_lock);
@@ -891,8 +891,8 @@ rx_SetConnDeadTime(struct rx_connection *conn, int seconds)
     tconn->secondsUntilPing = rx_ConnSecondsUntilDead(tconn) / 6;
 }
 
-int rxi_lowPeerRefCount = 0;
-int rxi_lowConnRefCount = 0;
+rx_atomic_t rxi_lowPeerRefCount = 0;
+rx_atomic_t rxi_lowConnRefCount = 0;
 
 /*
  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
@@ -914,23 +914,23 @@ rxi_CleanupConnection(struct rx_connection *conn)
      * idle (refCount == 0) after rx_idlePeerTime (60 seconds) have passed.
      */
     MUTEX_ENTER(&rx_peerHashTable_lock);
-    if (conn->peer->refCount < 2) {
+    if (rx_AtomicDecrement_NL(conn->peer->refCount) < 1) {
        conn->peer->idleWhen = clock_Sec();
-       if (conn->peer->refCount < 1) {
-           conn->peer->refCount = 1;
+       if (rx_AtomicPeek_NL(conn->peer->refCount) < 0) {
+           rx_AtomicSwap_NL(&conn->peer->refCount, 0);
+            dpf(("UNDERCOUNT(peer %x)\n", conn->peer));
            if (rx_stats_active)
-                rx_MutexIncrement(rxi_lowPeerRefCount, rx_stats_mutex);
+                rx_AtomicIncrement(rxi_lowPeerRefCount, rx_stats_mutex);
        }
     }
-    conn->peer->refCount--;
     MUTEX_EXIT(&rx_peerHashTable_lock);
 
     if (rx_stats_active) 
     {
         if (conn->type == RX_SERVER_CONNECTION)
-            rx_MutexDecrement(rx_stats.nServerConns, rx_stats_mutex);
+            rx_AtomicDecrement(rx_stats.nServerConns, rx_stats_mutex);
         else
-            rx_MutexDecrement(rx_stats.nClientConns, rx_stats_mutex);
+            rx_AtomicDecrement(rx_stats.nClientConns, rx_stats_mutex);
     }
 #ifndef KERNEL
     if (conn->specific) {
@@ -1014,17 +1014,15 @@ rxi_DestroyConnectionNoLock(register struct rx_connection *conn)
 
     NETPRI;
     MUTEX_ENTER(&conn->conn_data_lock);
-    if (conn->refCount > 0)
-       conn->refCount--;
-    else {
+    /* This requires the atomic type to be signed */
+    if (rx_AtomicDecrement_NL(conn->refCount) < 0) {
+        dpf(("UNDERCOUNT(conn %x)\n", conn));
         if (rx_stats_active) {
-            MUTEX_ENTER(&rx_stats_mutex);
-            rxi_lowConnRefCount++;
-            MUTEX_EXIT(&rx_stats_mutex);
+           rx_AtomicIncrement(rxi_lowConnRefCount, rx_stats_mutex);
         }
     }
 
-    if ((conn->refCount > 0) || (conn->flags & RX_CONN_BUSY)) {
+    if ((rx_AtomicPeek_NL(conn->refCount) > 0) || (conn->flags & RX_CONN_BUSY)) {
        /* Busy; wait till the last guy before proceeding */
        MUTEX_EXIT(&conn->conn_data_lock);
        USERPRI;
@@ -1145,7 +1143,7 @@ rx_GetConnection(register struct rx_connection *conn)
     SPLVAR;
 
     NETPRI;
-    rx_MutexIncrement(conn->refCount, conn->conn_data_lock);
+    rx_AtomicIncrement(conn->refCount, conn->conn_data_lock);
     USERPRI;
 }
 
@@ -2144,7 +2142,7 @@ rx_Finalize(void)
                next = conn->next;
                if (conn->type == RX_CLIENT_CONNECTION) {
                    /* MUTEX_ENTER(&conn->conn_data_lock); when used in kernel */
-                   conn->refCount++;
+                   rx_AtomicIncrement(conn->refCount, conn->conn_data_lock);
                    /* MUTEX_EXIT(&conn->conn_data_lock); when used in kernel */
 #ifdef RX_ENABLE_LOCKS
                    rxi_DestroyConnectionNoLock(conn);
@@ -2261,7 +2259,7 @@ rxi_NewCall(register struct rx_connection *conn, register int channel)
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        queue_Remove(call);
         if (rx_stats_active)
-            rx_MutexDecrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
+            rx_AtomicDecrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
        MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_ENTER(&call->lock);
        CLEAR_CALL_QUEUE_LOCK(call);
@@ -2283,7 +2281,7 @@ rxi_NewCall(register struct rx_connection *conn, register int channel)
         rx_allCallsp = call;
         call->call_id = 
 #endif /* RXDEBUG_PACKET */
-            rx_MutexIncrement(rx_stats.nCallStructs, rx_stats_mutex);
+            rx_AtomicIncrement(rx_stats.nCallStructs, rx_stats_mutex);
         
         MUTEX_EXIT(&rx_freeCallQueue_lock);
        MUTEX_INIT(&call->lock, "call lock", MUTEX_DEFAULT, NULL);
@@ -2357,7 +2355,7 @@ rxi_FreeCall(register struct rx_call *call)
     queue_Append(&rx_freeCallQueue, call);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
     MUTEX_EXIT(&rx_freeCallQueue_lock);
 
     /* Destroy the connection if it was previously slated for
@@ -2373,7 +2371,7 @@ rxi_FreeCall(register struct rx_call *call)
      * call lock held or are going through this section of code.
      */
     if (conn->flags & RX_CONN_DESTROY_ME && !(conn->flags & RX_CONN_MAKECALL_WAITING)) {
-       rx_MutexIncrement(conn->refCount, conn->conn_data_lock);
+       rx_AtomicIncrement(conn->refCount, conn->conn_data_lock);
 #ifdef RX_ENABLE_LOCKS
        if (haveCTLock)
            rxi_DestroyConnectionNoLock(conn);
@@ -2481,14 +2479,14 @@ rxi_FindPeer(register afs_uint32 host, register u_short port,
            rx_peerHashTable[hashIndex] = pp;
            rxi_InitPeerParams(pp);
            if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.nPeerStructs, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.nPeerStructs, rx_stats_mutex);
        }
     }
     if (pp && create) {
-       pp->refCount++;
+       rx_AtomicIncrement_NL(pp->refCount);
     }
     if (origPeer)
-       origPeer->refCount--;
+       rx_AtomicDecrement_NL(origPeer->refCount);
     MUTEX_EXIT(&rx_peerHashTable_lock);
     return pp;
 }
@@ -2592,10 +2590,10 @@ rxi_FindConnection(osi_socket socket, register afs_int32 host,
        if (service->newConnProc)
            (*service->newConnProc) (conn);
         if (rx_stats_active)
-            rx_MutexIncrement(rx_stats.nServerConns, rx_stats_mutex);
+            rx_AtomicIncrement(rx_stats.nServerConns, rx_stats_mutex);
     }
 
-    rx_MutexIncrement(conn->refCount, conn->conn_data_lock);
+    rx_AtomicIncrement(conn->refCount, conn->conn_data_lock);
 
     rxLastConn = conn;         /* store this connection as the last conn used */
     MUTEX_EXIT(&rx_connHashTable_lock);
@@ -2706,7 +2704,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
        MUTEX_ENTER(&conn->conn_data_lock);
        if (np->header.type != RX_PACKET_TYPE_ABORT)
            np = rxi_SendConnectionAbort(conn, np, 1, 0);
-       conn->refCount--;
+       rx_AtomicDecrement_NL(conn->refCount);
        MUTEX_EXIT(&conn->conn_data_lock);
        return np;
     }
@@ -2719,22 +2717,22 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
            afs_int32 errcode = ntohl(rx_GetInt32(np, 0));
            dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d", errcode));
            rxi_ConnectionError(conn, errcode);
-           rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+           rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
            return np;
        }
        case RX_PACKET_TYPE_CHALLENGE:
            tnp = rxi_ReceiveChallengePacket(conn, np, 1);
-           rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+           rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
            return tnp;
        case RX_PACKET_TYPE_RESPONSE:
            tnp = rxi_ReceiveResponsePacket(conn, np, 1);
-           rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+           rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
            return tnp;
        case RX_PACKET_TYPE_PARAMS:
        case RX_PACKET_TYPE_PARAMS + 1:
        case RX_PACKET_TYPE_PARAMS + 2:
            /* ignore these packet types for now */
-           rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+           rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
            return np;
 
 
@@ -2744,7 +2742,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
            rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
            MUTEX_ENTER(&conn->conn_data_lock);
            tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
-           conn->refCount--;
+           rx_AtomicDecrement_NL(conn->refCount);
            MUTEX_EXIT(&conn->conn_data_lock);
            return tnp;
        }
@@ -2781,8 +2779,8 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
             * it must be for the previous call.
             */
            if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
-           rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+                rx_AtomicIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
+           rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
            return np;
        }
     }
@@ -2792,12 +2790,12 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
     if (type == RX_SERVER_CONNECTION) {        /* We're the server */
        if (np->header.callNumber < currentCallNumber) {
            if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
 #ifdef RX_ENABLE_LOCKS
            if (call)
                MUTEX_EXIT(&call->lock);
 #endif
-           rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+           rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
            return np;
        }
        if (!call) {
@@ -2823,9 +2821,9 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
                rxi_CallError(call, rx_BusyError);
                tp = rxi_SendCallAbort(call, np, 1, 0);
                MUTEX_EXIT(&call->lock);
-               rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+               rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
                 if (rx_stats_active)
-                    rx_MutexIncrement(rx_stats.nBusies, rx_stats_mutex);
+                    rx_AtomicIncrement(rx_stats.nBusies, rx_stats_mutex);
                return tp;
            }
            rxi_KeepAliveOn(call);
@@ -2861,7 +2859,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
                tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
                                     NULL, 0, 1);
                MUTEX_EXIT(&call->lock);
-               rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+               rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
                return tp;
            }
            rxi_ResetCall(call, 0);
@@ -2884,9 +2882,9 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
                rxi_CallError(call, rx_BusyError);
                tp = rxi_SendCallAbort(call, np, 1, 0);
                MUTEX_EXIT(&call->lock);
-               rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+               rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
                 if (rx_stats_active)
-                    rx_MutexIncrement(rx_stats.nBusies, rx_stats_mutex);
+                    rx_AtomicIncrement(rx_stats.nBusies, rx_stats_mutex);
                return tp;
            }
            rxi_KeepAliveOn(call);
@@ -2898,13 +2896,13 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
        if (call && (call->state == RX_STATE_DALLY)
            && (np->header.type == RX_PACKET_TYPE_ACK)) {
            if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.ignorePacketDally, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.ignorePacketDally, rx_stats_mutex);
 #ifdef  RX_ENABLE_LOCKS
            if (call) {
                MUTEX_EXIT(&call->lock);
            }
 #endif
-           rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+           rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
            return np;
        }
 
@@ -2912,13 +2910,13 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
         * isn't a current call, then no packet is relevant. */
        if (!call || (np->header.callNumber != currentCallNumber)) {
            if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
 #ifdef RX_ENABLE_LOCKS
            if (call) {
                MUTEX_EXIT(&call->lock);
            }
 #endif
-           rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+           rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
            return np;
        }
        /* If the service security object index stamped in the packet does not
@@ -2927,7 +2925,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
 #ifdef RX_ENABLE_LOCKS
            MUTEX_EXIT(&call->lock);
 #endif
-           rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+           rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
            return np;
        }
 
@@ -2974,9 +2972,9 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
                 * XXX code in receiveackpacket.  */
                if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
                     if (rx_stats_active)
-                        rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
+                        rx_AtomicIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
                    MUTEX_EXIT(&call->lock);
-                   rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+                   rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
                    return np;
                }
            }
@@ -3038,7 +3036,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
        dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d", errdata));
        rxi_CallError(call, errdata);
        MUTEX_EXIT(&call->lock);
-       rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+       rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
        return np;              /* xmitting; drop packet */
     }
     case RX_PACKET_TYPE_BUSY:
@@ -3084,7 +3082,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
      * (if not, then the time won't actually be re-evaluated here). */
     call->lastReceiveTime = clock_Sec();
     MUTEX_EXIT(&call->lock);
-    rx_MutexDecrement(conn->refCount, conn->conn_data_lock);
+    rx_AtomicDecrement(conn->refCount, conn->conn_data_lock);
     return np;
 }
 
@@ -3151,7 +3149,7 @@ rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2)
     conn->checkReachEvent = NULL;
     waiting = conn->flags & RX_CONN_ATTACHWAIT;
     if (event)
-       conn->refCount--;
+       rx_AtomicDecrement_NL(conn->refCount);
     MUTEX_EXIT(&conn->conn_data_lock);
 
     if (waiting) {
@@ -3188,7 +3186,7 @@ rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2)
            when.sec += RX_CHECKREACH_TIMEOUT;
            MUTEX_ENTER(&conn->conn_data_lock);
            if (!conn->checkReachEvent) {
-               conn->refCount++;
+               rx_AtomicIncrement_NL(conn->refCount);
                conn->checkReachEvent =
                    rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn, 
                                    NULL);
@@ -3270,7 +3268,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
     struct rx_packet *tnp;
     struct clock when, now;
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.dataPacketsRead, rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.dataPacketsRead, rx_stats_mutex);
 
 #ifdef KERNEL
     /* If there are no packet buffers, drop this new packet, unless we can find
@@ -3281,7 +3279,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
        rxi_NeedMorePackets = TRUE;
        MUTEX_EXIT(&rx_freePktQ_lock);
         if (rx_stats_active)
-            rx_MutexIncrement(rx_stats.noPacketBuffersOnRead, rx_stats_mutex);
+            rx_AtomicIncrement(rx_stats.noPacketBuffersOnRead, rx_stats_mutex);
        call->rprev = np->header.serial;
        rxi_calltrace(RX_TRACE_DROP, call);
        dpf(("packet %x dropped on receipt - quota problems", np));
@@ -3347,7 +3345,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
            if (queue_IsNotEmpty(&call->rq)
                && queue_First(&call->rq, rx_packet)->header.seq == seq) {
                 if (rx_stats_active)
-                    rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
+                    rx_AtomicIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
                dpf(("packet %x dropped on receipt - duplicate", np));
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
@@ -3436,7 +3434,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
             * application already, then this is a duplicate */
            if (seq < call->rnext) {
                 if (rx_stats_active)
-                    rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
+                    rx_AtomicIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
                rxevent_Cancel(call->delayedAckEvent, call,
                               RX_CALL_REFCOUNT_DELAY);
                np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
@@ -3464,7 +3462,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
                /*Check for duplicate packet */
                if (seq == tp->header.seq) {
                     if (rx_stats_active)
-                        rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
+                        rx_AtomicIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
                    rxevent_Cancel(call->delayedAckEvent, call,
                                   RX_CALL_REFCOUNT_DELAY);
                    np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE,
@@ -3709,7 +3707,7 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
     int maxDgramPackets = 0;   /* Set if peer supports AFS 3.5 jumbo datagrams */
 
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.ackPacketsRead, rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.ackPacketsRead, rx_stats_mutex);
     ap = (struct rx_ackPacket *)rx_DataOf(np);
     nbytes = rx_Contiguous(np) - (int)((ap->acks) - (u_char *) ap);
     if (nbytes < 0)
@@ -4591,7 +4589,7 @@ rxi_ConnectionError(register struct rx_connection *conn,
            rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
            conn->checkReachEvent = 0;
            conn->flags &= ~RX_CONN_ATTACHWAIT;
-           conn->refCount--;
+           rx_AtomicDecrement_NL(conn->refCount);
        }
        MUTEX_EXIT(&conn->conn_data_lock);
 
@@ -4609,7 +4607,7 @@ rxi_ConnectionError(register struct rx_connection *conn,
        }
         rx_SetConnError(conn, error);
         if (rx_stats_active)
-            rx_MutexIncrement(rx_stats.fatalErrors, rx_stats_mutex);
+            rx_AtomicIncrement(rx_stats.fatalErrors, rx_stats_mutex);
     }
 }
 
@@ -5048,7 +5046,7 @@ rxi_SendAck(register struct rx_call *call,
        }
     }
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.ackPacketsSent, rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.ackPacketsSent, rx_stats_mutex);
 #ifndef RX_ENABLE_TSFPQ
     if (!optionalPacket)
        rxi_FreePacket(p);
@@ -5073,7 +5071,7 @@ rxi_SendList(struct rx_call *call, struct rx_packet **list, int len,
     if (resending)
        peer->reSends += len;
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.dataPacketsSent, rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.dataPacketsSent, rx_stats_mutex);
     MUTEX_EXIT(&peer->peer_lock);
 
     if (list[len - 1]->header.flags & RX_LAST_PACKET) {
@@ -5109,7 +5107,7 @@ rxi_SendList(struct rx_call *call, struct rx_packet **list, int len,
        if (list[i]->header.serial) {
            requestAck = 1;
             if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.dataPacketsReSent, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.dataPacketsReSent, rx_stats_mutex);
        } else {
            /* improved RTO calculation- not Karn */
            list[i]->firstSent = *now;
@@ -5125,7 +5123,7 @@ rxi_SendList(struct rx_call *call, struct rx_packet **list, int len,
        if (resending)
            peer->reSends++;
         if (rx_stats_active)
-            rx_MutexIncrement(rx_stats.dataPacketsSent, rx_stats_mutex);
+            rx_AtomicIncrement(rx_stats.dataPacketsSent, rx_stats_mutex);
        MUTEX_EXIT(&peer->peer_lock);
 
        /* Tag this packet as not being the last in this group,
@@ -5426,7 +5424,7 @@ rxi_Start(struct rxevent *event,
                        /* Since we may block, don't trust this */
                        usenow.sec = usenow.usec = 0;
                         if (rx_stats_active)
-                            rx_MutexIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex);
+                            rx_AtomicIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex);
                        continue;       /* Ignore this packet if it has been acknowledged */
                    }
 
@@ -5991,7 +5989,7 @@ rxi_ComputeRoundTripTime(register struct rx_packet *p,
             rx_stats.maxRtt = *rttp;
         }
         clock_Add(&rx_stats.totalRtt, rttp);
-        rx_stats.nRttSamples++;
+        rx_AtomicIncrement_NL(rx_stats.nRttSamples);
         MUTEX_EXIT(&rx_stats_mutex);
     }
 
@@ -6106,10 +6104,10 @@ rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2)
                    /* This only actually destroys the connection if
                     * there are no outstanding calls */
                    MUTEX_ENTER(&conn->conn_data_lock);
-                   if (!havecalls && !conn->refCount
+                   if (!havecalls && (rx_AtomicPeek_NL(conn->refCount) == 0)
                        && ((conn->lastSendTime + rx_idleConnectionTime) <
                            now.sec)) {
-                       conn->refCount++;       /* it will be decr in rx_DestroyConn */
+                       rx_AtomicIncrement_NL(conn->refCount);  /* it will be decr in rx_DestroyConn */
                        MUTEX_EXIT(&conn->conn_data_lock);
 #ifdef RX_ENABLE_LOCKS
                        rxi_DestroyConnectionNoLock(conn);
@@ -6152,7 +6150,7 @@ rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2)
            for (prev = peer = *peer_ptr; peer; peer = next) {
                next = peer->next;
                code = MUTEX_TRYENTER(&peer->peer_lock);
-               if ((code) && (peer->refCount == 0)
+               if ((code) && (rx_AtomicPeek_NL(peer->refCount) == 0)
                    && ((peer->idleWhen + rx_idlePeerTime) < now.sec)) {
                    rx_interface_stat_p rpc_stat, nrpc_stat;
                    size_t space;
@@ -6177,7 +6175,7 @@ rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2)
                    }
                    rxi_FreePeer(peer);
                     if (rx_stats_active)
-                        rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
+                        rx_AtomicDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
                    if (peer == *peer_ptr) {
                        *peer_ptr = next;
                        prev = next;
@@ -6492,57 +6490,70 @@ rx_PrintTheseStats(FILE * file, struct rx_statistics *s, int size,
     }
 
     fprintf(file, "rx stats: free packets %d, allocs %d, ", (int)freePackets,
-           s->packetRequests);
+           rx_AtomicPeek_NL(s->packetRequests));
 
     if (version >= RX_DEBUGI_VERSION_W_NEWPACKETTYPES) {
        fprintf(file, "alloc-failures(rcv %d/%d,send %d/%d,ack %d)\n",
-               s->receivePktAllocFailures, s->receiveCbufPktAllocFailures,
-               s->sendPktAllocFailures, s->sendCbufPktAllocFailures,
-               s->specialPktAllocFailures);
+               rx_AtomicPeek_NL(s->receivePktAllocFailures),
+               rx_AtomicPeek_NL(s->receiveCbufPktAllocFailures),
+               rx_AtomicPeek_NL(s->sendPktAllocFailures),
+               rx_AtomicPeek_NL(s->sendCbufPktAllocFailures),
+               rx_AtomicPeek_NL(s->specialPktAllocFailures));
     } else {
        fprintf(file, "alloc-failures(rcv %d,send %d,ack %d)\n",
-               s->receivePktAllocFailures, s->sendPktAllocFailures,
-               s->specialPktAllocFailures);
+               rx_AtomicPeek_NL(s->receivePktAllocFailures),
+               rx_AtomicPeek_NL(s->sendPktAllocFailures),
+               rx_AtomicPeek_NL(s->specialPktAllocFailures));
     }
 
     fprintf(file,
            "   greedy %d, " "bogusReads %d (last from host %x), "
            "noPackets %d, " "noBuffers %d, " "selects %d, "
-           "sendSelects %d\n", s->socketGreedy, s->bogusPacketOnRead,
-           s->bogusHost, s->noPacketOnRead, s->noPacketBuffersOnRead,
-           s->selects, s->sendSelects);
+           "sendSelects %d\n", 
+           rx_AtomicPeek_NL(s->socketGreedy), 
+           rx_AtomicPeek_NL(s->bogusPacketOnRead),
+           rx_AtomicPeek_NL(s->bogusHost), 
+           rx_AtomicPeek_NL(s->noPacketOnRead), 
+           rx_AtomicPeek_NL(s->noPacketBuffersOnRead),
+           rx_AtomicPeek_NL(s->selects),
+           rx_AtomicPeek_NL(s->sendSelects));
 
     fprintf(file, "   packets read: ");
     for (i = 0; i < RX_N_PACKET_TYPES; i++) {
-       fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsRead[i]);
+      fprintf(file, "%s %d ", rx_packetTypes[i], rx_AtomicPeek_NL(s->packetsRead[i]));
     }
     fprintf(file, "\n");
 
     fprintf(file,
            "   other read counters: data %d, " "ack %d, " "dup %d "
-           "spurious %d " "dally %d\n", s->dataPacketsRead,
-           s->ackPacketsRead, s->dupPacketsRead, s->spuriousPacketsRead,
-           s->ignorePacketDally);
+           "spurious %d " "dally %d\n", rx_AtomicPeek_NL(s->dataPacketsRead),
+           rx_AtomicPeek_NL(s->ackPacketsRead), 
+           rx_AtomicPeek_NL(s->dupPacketsRead), 
+           rx_AtomicPeek_NL(s->spuriousPacketsRead),
+           rx_AtomicPeek_NL(s->ignorePacketDally));
 
     fprintf(file, "   packets sent: ");
     for (i = 0; i < RX_N_PACKET_TYPES; i++) {
-       fprintf(file, "%s %d ", rx_packetTypes[i], s->packetsSent[i]);
+      fprintf(file, "%s %d ", rx_packetTypes[i], rx_AtomicPeek_NL(s->packetsSent[i]));
     }
     fprintf(file, "\n");
 
     fprintf(file,
            "   other send counters: ack %d, " "data %d (not resends), "
            "resends %d, " "pushed %d, " "acked&ignored %d\n",
-           s->ackPacketsSent, s->dataPacketsSent, s->dataPacketsReSent,
-           s->dataPacketsPushed, s->ignoreAckedPacket);
+           rx_AtomicPeek_NL(s->ackPacketsSent), 
+           rx_AtomicPeek_NL(s->dataPacketsSent), 
+           rx_AtomicPeek_NL(s->dataPacketsReSent),
+           rx_AtomicPeek_NL(s->dataPacketsPushed), 
+           rx_AtomicPeek_NL(s->ignoreAckedPacket));
 
     fprintf(file,
            "   \t(these should be small) sendFailed %d, " "fatalErrors %d\n",
-           s->netSendFailures, (int)s->fatalErrors);
+           rx_AtomicPeek_NL(s->netSendFailures), rx_AtomicPeek_NL(s->fatalErrors));
 
-    if (s->nRttSamples) {
+    if (rx_AtomicPeek_NL(s->nRttSamples)) {
        fprintf(file, "   Average rtt is %0.3f, with %d samples\n",
-               clock_Float(&s->totalRtt) / s->nRttSamples, s->nRttSamples);
+               clock_Float(&s->totalRtt) / rx_AtomicPeek_NL(s->nRttSamples), rx_AtomicPeek_NL(s->nRttSamples));
 
        fprintf(file, "   Minimum rtt is %0.3f, maximum is %0.3f\n",
                clock_Float(&s->minRtt), clock_Float(&s->maxRtt));
@@ -6551,8 +6562,11 @@ rx_PrintTheseStats(FILE * file, struct rx_statistics *s, int size,
     fprintf(file,
            "   %d server connections, " "%d client connections, "
            "%d peer structs, " "%d call structs, " "%d free call structs\n",
-           s->nServerConns, s->nClientConns, s->nPeerStructs,
-           s->nCallStructs, s->nFreeCallStructs);
+           rx_AtomicPeek_NL(s->nServerConns),
+           rx_AtomicPeek_NL(s->nClientConns),
+           rx_AtomicPeek_NL(s->nPeerStructs),
+           rx_AtomicPeek_NL(s->nCallStructs),
+           rx_AtomicPeek_NL(s->nFreeCallStructs));
 
 #if    !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
     fprintf(file, "   %d clock updates\n", clock_nUpdates);
@@ -7056,7 +7070,7 @@ shutdown_rx(void)
                next = peer->next;
                rxi_FreePeer(peer);
                 if (rx_stats_active)
-                    rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
+                    rx_AtomicDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
            }
        }
     }
index 3f63d04..bfe8c6a 100644 (file)
@@ -255,6 +255,8 @@ do {\
                rx_max_clones_per_connection = v; \
 } while(0);
 
+typedef afs_int32 rx_atomic_t;
+
 #define rx_PutConnection(conn) rx_DestroyConnection(conn)
 
 /* A connection is an authenticated communication path, allowing 
@@ -296,7 +298,7 @@ struct rx_connection {
     /* client-- to retransmit the challenge */
     struct rx_service *service;        /* used by servers only */
     u_short serviceId;         /* To stamp on requests (clients only) */
-    afs_uint32 refCount;               /* Reference count */
+    rx_atomic_t refCount;      /* Reference count */
     u_char flags;              /* Defined below */
     u_char type;               /* Type of connection, defined below */
     u_char secondsUntilPing;   /* how often to ping for each active call */
@@ -417,7 +419,8 @@ struct rx_peer {
 
     /* For garbage collection */
     afs_uint32 idleWhen;       /* When the refcountwent to zero */
-    afs_uint32 refCount;               /* Reference count for this structure */
+    rx_atomic_t refCount;              /* Reference count */
+
 
     /* Congestion control parameters */
     u_char burstSize;          /* Reinitialization size for the burst parameter */
@@ -852,47 +855,47 @@ struct rx_securityClass {
  * must equal sizeof(afs_int32). */
 
 struct rx_statistics {         /* General rx statistics */
-    int packetRequests;                /* Number of packet allocation requests */
-    int receivePktAllocFailures;
-    int sendPktAllocFailures;
-    int specialPktAllocFailures;
-    int socketGreedy;          /* Whether SO_GREEDY succeeded */
-    int bogusPacketOnRead;     /* Number of inappropriately short packets received */
-    int bogusHost;             /* Host address from bogus packets */
-    int noPacketOnRead;                /* Number of read packets attempted when there was actually no packet to read off the wire */
-    int noPacketBuffersOnRead; /* Number of dropped data packets due to lack of packet buffers */
-    int selects;               /* Number of selects waiting for packet or timeout */
-    int sendSelects;           /* Number of selects forced when sending packet */
-    int packetsRead[RX_N_PACKET_TYPES];        /* Total number of packets read, per type */
-    int dataPacketsRead;       /* Number of unique data packets read off the wire */
-    int ackPacketsRead;                /* Number of ack packets read */
-    int dupPacketsRead;                /* Number of duplicate data packets read */
-    int spuriousPacketsRead;   /* Number of inappropriate data packets */
-    int packetsSent[RX_N_PACKET_TYPES];        /* Number of rxi_Sends: packets sent over the wire, per type */
-    int ackPacketsSent;                /* Number of acks sent */
-    int pingPacketsSent;       /* Total number of ping packets sent */
-    int abortPacketsSent;      /* Total number of aborts */
-    int busyPacketsSent;       /* Total number of busies sent received */
-    int dataPacketsSent;       /* Number of unique data packets sent */
-    int dataPacketsReSent;     /* Number of retransmissions */
-    int dataPacketsPushed;     /* Number of retransmissions pushed early by a NACK */
-    int ignoreAckedPacket;     /* Number of packets with acked flag, on rxi_Start */
+    rx_atomic_t packetRequests;                /* Number of packet allocation requests */
+    rx_atomic_t receivePktAllocFailures;
+    rx_atomic_t sendPktAllocFailures;
+    rx_atomic_t specialPktAllocFailures;
+    rx_atomic_t socketGreedy;          /* Whether SO_GREEDY succeeded */
+    rx_atomic_t bogusPacketOnRead;     /* Number of inappropriately short packets received */
+    rx_atomic_t bogusHost;             /* Host address from bogus packets */
+    rx_atomic_t noPacketOnRead;                /* Number of read packets attempted when there was actually no packet to read off the wire */
+    rx_atomic_t noPacketBuffersOnRead; /* Number of dropped data packets due to lack of packet buffers */
+    rx_atomic_t selects;               /* Number of selects waiting for packet or timeout */
+    rx_atomic_t sendSelects;           /* Number of selects forced when sending packet */
+    rx_atomic_t packetsRead[RX_N_PACKET_TYPES];        /* Total number of packets read, per type */
+    rx_atomic_t dataPacketsRead;       /* Number of unique data packets read off the wire */
+    rx_atomic_t ackPacketsRead;                /* Number of ack packets read */
+    rx_atomic_t dupPacketsRead;                /* Number of duplicate data packets read */
+    rx_atomic_t spuriousPacketsRead;   /* Number of inappropriate data packets */
+    rx_atomic_t packetsSent[RX_N_PACKET_TYPES];        /* Number of rxi_Sends: packets sent over the wire, per type */
+    rx_atomic_t ackPacketsSent;                /* Number of acks sent */
+    rx_atomic_t pingPacketsSent;       /* Total number of ping packets sent */
+    rx_atomic_t abortPacketsSent;      /* Total number of aborts */
+    rx_atomic_t busyPacketsSent;       /* Total number of busies sent received */
+    rx_atomic_t dataPacketsSent;       /* Number of unique data packets sent */
+    rx_atomic_t dataPacketsReSent;     /* Number of retransmissions */
+    rx_atomic_t dataPacketsPushed;     /* Number of retransmissions pushed early by a NACK */
+    rx_atomic_t ignoreAckedPacket;     /* Number of packets with acked flag, on rxi_Start */
     struct clock totalRtt;     /* Total round trip time measured (use to compute average) */
     struct clock minRtt;       /* Minimum round trip time measured */
     struct clock maxRtt;       /* Maximum round trip time measured */
-    int nRttSamples;           /* Total number of round trip samples */
-    int nServerConns;          /* Total number of server connections */
-    int nClientConns;          /* Total number of client connections */
-    int nPeerStructs;          /* Total number of peer structures */
-    int nCallStructs;          /* Total number of call structures allocated */
-    int nFreeCallStructs;      /* Total number of previously allocated free call structures */
-    int netSendFailures;
-    afs_int32 fatalErrors;
-    int ignorePacketDally;     /* packets dropped because call is in dally state */
-    int receiveCbufPktAllocFailures;
-    int sendCbufPktAllocFailures;
-    int nBusies;
-    int spares[4];
+    rx_atomic_t nRttSamples;           /* Total number of round trip samples */
+    rx_atomic_t nServerConns;          /* Total number of server connections */
+    rx_atomic_t nClientConns;          /* Total number of client connections */
+    rx_atomic_t nPeerStructs;          /* Total number of peer structures */
+    rx_atomic_t nCallStructs;          /* Total number of call structures allocated */
+    rx_atomic_t nFreeCallStructs;      /* Total number of previously allocated free call structures */
+    rx_atomic_t netSendFailures;
+    rx_atomic_t fatalErrors;
+    rx_atomic_t ignorePacketDally;     /* packets dropped because call is in dally state */
+    rx_atomic_t receiveCbufPktAllocFailures;
+    rx_atomic_t sendCbufPktAllocFailures;
+    rx_atomic_t nBusies;
+    rx_atomic_t spares[4];
 };
 
 /* structures for debug input and output packets */
index f03f62b..6e716d3 100644 (file)
 #include <intrin.h>
 #pragma intrinsic(_InterlockedOr)
 #pragma intrinsic(_InterlockedAnd)
-#define rx_MutexOr(object, operand, mutex) _InterlockedOr(&object, operand)
-#define rx_MutexAnd(object, operand, mutex) _InterlockedAnd(&object, operand)
-#endif
-#else
-#define rx_MutexOr(object, operand, mutex) InterlockedOr(&object, operand)
-#define rx_MutexAnd(object, operand, mutex) InterlockedAnd(&object, operand)
-#endif
-#define rx_MutexIncrement(object, mutex) InterlockedIncrement(&object)
-#define rx_MutexXor(object, operand, mutex) InterlockedXor(&object, operand)
-#define rx_MutexAdd(object, addend, mutex) InterlockedExchangeAdd(&object, addend)
-#define rx_MutexDecrement(object, mutex) InterlockedDecrement(&object)
-#define rx_MutexAdd1Increment2(object1, addend, object2, mutex) \
-    do { \
-        MUTEX_ENTER(&mutex); \
-        object1 += addend; \
-        InterlockedIncrement(&object2); \
-        MUTEX_EXIT(&mutex); \
-    } while (0)
-#define rx_MutexAdd1Decrement2(object1, addend, object2, mutex) \
-    do { \
-        MUTEX_ENTER(&mutex); \
-        object1 += addend; \
-        InterlockedDecrement(&object2); \
-        MUTEX_EXIT(&mutex); \
-    } while (0)
+#define rx_AtomicOr(object, operand, mutex) _InterlockedOr(&object, operand)
+#define rx_AtomicAnd(object, operand, mutex) _InterlockedAnd(&object, operand)
+#endif /* __cplusplus */
+#else /* !WIN64 */
+#define rx_AtomicOr(object, operand, mutex) InterlockedOr(&object, operand)
+#define rx_AtomicAnd(object, operand, mutex) InterlockedAnd(&object, operand)
+#endif /* WIN64 */
+#define rx_AtomicIncrement_NL(object) InterlockedIncrement(&object)
+#define rx_AtomicIncrement(object, mutex) InterlockedIncrement(&object)
+#define rx_AtomicXor(object, operand, mutex) InterlockedXor(&object, operand)
+#define rx_AtomicAdd_NL(object, addend) InterlockedExchangeAdd(&object, addend)
+#define rx_AtomicAdd(object, addend, mutex) InterlockedExchangeAdd(&object, addend)
+#define rx_AtomicDecrement_NL(object) InterlockedDecrement(&object)
+#define rx_AtomicDecrement(object, mutex) InterlockedDecrement(&object)
+#define rx_AtomicSwap_NL(object1, object2) InterlockedExchange((volatile LONG *) object1, object2)
+#define rx_AtomicSwap(object1, object2, mutex) InterlockedExchange((volatile LONG *) object1, object2)
 #elif defined(AFS_DARWIN80_ENV)
-#define rx_MutexIncrement(object, mutex) OSAtomicIncrement32(&object)
-#define rx_MutexOr(object, operand, mutex) OSAtomicOr32(operand, &object)
-#define rx_MutexAnd(object, operand, mutex) OSAtomicAnd32(operand, &object)
-#define rx_MutexXor(object, operand, mutex) OSAtomicXor32(operand, &object)
-#define rx_MutexAdd(object, addend, mutex) OSAtomicAdd32(addend, &object)
-#define rx_MutexDecrement(object, mutex) OSAtomicDecrement32(&object)
-#define rx_MutexAdd1Increment2(object1, addend, object2, mutex) \
-    do { \
-        MUTEX_ENTER(&mutex); \
-        object1 += addend; \
-        OSAtomicIncrement32(&object2); \
-        MUTEX_EXIT(&mutex); \
-    } while (0)
-#define rx_MutexAdd1Decrement2(object1, addend, object2, mutex) \
-    do { \
-        MUTEX_ENTER(&mutex); \
-        object1 += addend; \
-        OSAtomicDecrement32(&object2); \
-        MUTEX_EXIT(&mutex); \
-    } while (0)
+#define rx_AtomicIncrement_NL(object) OSAtomicIncrement32(&object)
+#define rx_AtomicIncrement(object, mutex) OSAtomicIncrement32(&object)
+#define rx_AtomicOr(object, operand, mutex) OSAtomicOr32(operand, &object)
+#define rx_AtomicAnd(object, operand, mutex) OSAtomicAnd32(operand, &object)
+#define rx_AtomicXor(object, operand, mutex) OSAtomicXor32(operand, &object)
+#define rx_AtomicAdd_NL(object, addend) OSAtomicAdd32(addend, &object)
+#define rx_AtomicAdd(object, addend, mutex) OSAtomicAdd32(addend, &object)
+#define rx_AtomicDecrement_NL(object) OSAtomicDecrement32(&object)
+#define rx_AtomicDecrement(object, mutex) OSAtomicDecrement32(&object)
+#define rx_AtomicSwap_NL(oldval, newval) rx_AtomicSwap_int(oldval, newval)
+#define rx_AtomicSwap(oldval, newval, mutex) rx_AtomicSwap_int(oldval, newval)
+static inline afs_int32 rx_AtomicSwap_int(afs_int32 *oldval, afs_int32 newval) {
+    afs_int32 ret = *oldval;
+    OSAtomicCompareAndSwap32 ((int32_t) ret, (int32_t) newval,
+                             (volatile int32_t *) oldval);
+    return ret;
+}
 #elif defined(AFS_SUN58_ENV)
-#define rx_MutexIncrement(object, mutex) atomic_inc_32(&object)
-#define rx_MutexOr(object, operand, mutex) atomic_or_32(&object, operand)
-#define rx_MutexAnd(object, operand, mutex) atomic_and_32(&object, operand)
-#define rx_MutexXor(object, operand, mutex) \
-    do { \
-        MUTEX_ENTER(&mutex); \
-        object ^= operand; \
-        MUTEX_EXIT(&mutex); \
-    } while(0)
-#define rx_MutexXor(object, operand, mutex) OSAtomicXor32Barrier(operand, &object)
-#define rx_MutexAdd(object, addend, mutex) atomic_add_32(&object, addend)
-#define rx_MutexDecrement(object, mutex) atomic_dec_32(&object)
-#define rx_MutexAdd1Increment2(object1, addend, object2, mutex) \
-    do { \
-        MUTEX_ENTER(&mutex); \
-        object1 += addend; \
-        atomic_inc_32(&object2); \
-        MUTEX_EXIT(&mutex); \
-    } while (0)
-#define rx_MutexAdd1Decrement2(object1, addend, object2, mutex) \
-    do { \
-        MUTEX_ENTER(&mutex); \
-        object1 += addend; \
-        atomic_dec_32(&object2); \
-        MUTEX_EXIT(&mutex); \
-    } while (0)
+#define rx_AtomicIncrement_NL(object) atomic_inc_32_nv(&object)
+#define rx_AtomicIncrement(object, mutex) atomic_inc_32_nv(&object)
+#define rx_AtomicOr(object, operand, mutex) atomic_or_32(&object, operand)
+#define rx_AtomicAnd(object, operand, mutex) atomic_and_32(&object, operand)
+#define rx_AtomicAdd_NL(object, addend) atomic_add_32_nv(&object, addend)
+#define rx_AtomicAdd(object, addend, mutex) atomic_add_32_nv(&object, addend)
+#define rx_AtomicDecrement_NL(object) atomic_dec_32_nv(&object)
+#define rx_AtomicDecrement(object, mutex) atomic_dec_32_nv(&object)
+#define rx_AtomicSwap_NL(oldval, newval) rx_AtomicSwap_int(oldval, newval)
+#define rx_AtomicSwap(oldval, newval, mutex) rx_AtomicSwap_int(oldval, newval)
+static inline afs_int32 rx_AtomicSwap_int(afs_int32 *oldval, afs_int32 newval) {
+    afs_int32 ret = *oldval;
+    atomic_cas_32((volatile uint32_t *) oldval, (uint32_t) ret,
+                 (uint32_t) newval);
+    return ret;
+}
 #else
+#define rx_AtomicIncrement_NL(object) (++(object))
+#define rx_AtomicIncrement(object, mutex) rx_MutexIncrement(object, mutex)
+#define rx_AtomicOr(object, operand, mutex) rx_MutexOr(object, operand, mutex)
+#define rx_AtomicAnd(object, operand, mutex) rx_MutexAnd(object, operand, mutex)
+#define rx_AtomicAdd_NL(object, addend) ((object) += (addend))
+#define rx_AtomicAdd(object, addend, mutex) rx_MutexAdd(object, addend, mutex)
+#define rx_AtomicDecrement_NL(object) (--(object))
+#define rx_AtomicDecrement(object, mutex) rx_MutexDecrement(object, mutex)
+#define rx_AtomicSwap_NL(oldval, newval) rx_AtomicSwap_int(oldval, newval)
+#define rx_AtomicSwap(oldval, newval, mutex) rx_AtomicSwap_int(oldval, newval)
+static inline afs_int32 rx_AtomicSwap_int(afs_int32 *oldval, afs_int32 newval) {
+    afs_int32 ret = *oldval;
+    *oldval = newval;
+    return ret;
+}
+#endif
+#define rx_AtomicPeek_NL(object) rx_AtomicAdd_NL(object, 0)
+#define rx_AtomicPeek(object, mutex) rx_AtomicAdd(object, 0, mutex)
 #define rx_MutexIncrement(object, mutex) \
     do { \
         MUTEX_ENTER(&mutex); \
        object++; \
         MUTEX_EXIT(&mutex); \
     } while(0)
+#define rx_MutexDecrement(object, mutex) \
+    do { \
+        MUTEX_ENTER(&mutex); \
+        object--; \
+        MUTEX_EXIT(&mutex); \
+    } while(0)
 #define rx_MutexAdd1Increment2(object1, addend, object2, mutex) \
     do { \
         MUTEX_ENTER(&mutex); \
        object1 += addend; \
        object2++; \
         MUTEX_EXIT(&mutex); \
     } while(0)
-#define rx_MutexDecrement(object, mutex) \
+
+#define rx_MutexAdd1AtomicIncrement2(object1, addend, object2, mutex) \
     do { \
         MUTEX_ENTER(&mutex); \
-        object--; \
+        object1 += addend; \
+        rx_AtomicIncrement_NL(object2); \
         MUTEX_EXIT(&mutex); \
-    } while(0)
-#endif 
-
+    } while (0)
+#define rx_MutexAdd1AtomicDecrement2(object1, addend, object2, mutex) \
+    do { \
+        MUTEX_ENTER(&mutex); \
+        object1 += addend; \
+        rx_AtomicDecrement_NL(object2); \
+        MUTEX_EXIT(&mutex); \
+    } while (0)
 #endif /* _RX_INTERNAL_H */
index 0b56e0f..9dac112 100644 (file)
@@ -1197,8 +1197,8 @@ rxk_ReadPacket(osi_socket so, struct rx_packet *p, int *host, int *port)
            if (nbytes <= 0) {
                 if (rx_stats_active) {
                     MUTEX_ENTER(&rx_stats_mutex);
-                    rx_stats.bogusPacketOnRead++;
-                    rx_stats.bogusHost = from.sin_addr.s_addr;
+                    rx_AtomicIncrement_NL(rx_stats.bogusPacketOnRead);
+                    rx_AtomicSwap_NL(&rx_stats.bogusHost, from.sin_addr.s_addr);
                     MUTEX_EXIT(&rx_stats_mutex);
                 }
                dpf(("B: bogus packet from [%x,%d] nb=%d",
@@ -1212,11 +1212,8 @@ rxk_ReadPacket(osi_socket so, struct rx_packet *p, int *host, int *port)
            *host = from.sin_addr.s_addr;
            *port = from.sin_port;
            if (p->header.type > 0 && p->header.type < RX_N_PACKET_TYPES) {
-                if (rx_stats_active) {
-                    MUTEX_ENTER(&rx_stats_mutex);
-                    rx_stats.packetsRead[p->header.type - 1]++;
-                    MUTEX_EXIT(&rx_stats_mutex);
-                }
+                if (rx_stats_active) 
+                    rx_AtomicIncrement(rx_stats.packetsRead[p->header.type - 1], rx_stats_mutex);
            }
 
            /* Free any empty packet buffers at the end of this packet */
index bef6846..f4c80a9 100644 (file)
@@ -212,7 +212,7 @@ rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
            tv.tv_usec = cv.usec;
            tvp = &tv;
        }
-       rx_stats.selects++;
+       rx_AtomicIncrement(rx_stats.selects, rx_stats_mutex);
 
        *rfds = rx_selectMask;
 
@@ -435,7 +435,7 @@ rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
     fd_set *sfds = (fd_set *) 0;
     while (sendmsg(socket, msg_p, flags) == -1) {
        int err;
-       rx_stats.sendSelects++;
+       rx_AtomicIncrement(rx_stats.sendSelects, rx_stats_mutex);
 
        if (!sfds) {
            if (!(sfds = IOMGR_AllocFDSet())) {
index d15fc92..9201081 100644 (file)
@@ -17,9 +17,11 @@ RCSID
 #include "afs/sysincludes.h"
 #include "rx/rx_internal.h"
 #include "rx/rx.h"
+#include "rx/rx_globals.h"
 #else /* KERNEL */
 # include "rx_internal.h"
 # include "rx.h"
+# include "rx_globals.h"
 #endif /* KERNEL */
 
 /*
index c11fbb9..10fb2e3 100644 (file)
@@ -316,19 +316,19 @@ AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
         if (rx_stats_active) {
             switch (class) {
             case RX_PACKET_CLASS_RECEIVE:
-                rx_MutexIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_SEND:
-                rx_MutexIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_SPECIAL:
-                rx_MutexIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_RECV_CBUF:
-                rx_MutexIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_SEND_CBUF:
-                rx_MutexIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex);
                 break;
             }
        }
@@ -1127,19 +1127,19 @@ rxi_AllocPacketNoLock(int class)
         if (rx_stats_active) {
             switch (class) {
             case RX_PACKET_CLASS_RECEIVE:
-                rx_MutexIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_SEND:
-                rx_MutexIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_SPECIAL:
-                rx_MutexIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_RECV_CBUF:
-                rx_MutexIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_SEND_CBUF:
-                rx_MutexIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex);
                 break;
             }
        }
@@ -1148,7 +1148,7 @@ rxi_AllocPacketNoLock(int class)
 #endif /* KERNEL */
 
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.packetRequests, rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.packetRequests, rx_stats_mutex);
     if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
 
 #ifdef KERNEL
@@ -1187,19 +1187,19 @@ rxi_AllocPacketNoLock(int class)
         if (rx_stats_active) {
             switch (class) {
             case RX_PACKET_CLASS_RECEIVE:
-                rx_MutexIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.receivePktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_SEND:
-                rx_MutexIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.sendPktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_SPECIAL:
-                rx_MutexIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.specialPktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_RECV_CBUF:
-                rx_MutexIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.receiveCbufPktAllocFailures, rx_stats_mutex);
                 break;
             case RX_PACKET_CLASS_SEND_CBUF:
-                rx_MutexIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.sendCbufPktAllocFailures, rx_stats_mutex);
                 break;
             }
         }
@@ -1208,7 +1208,7 @@ rxi_AllocPacketNoLock(int class)
 #endif /* KERNEL */
 
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.packetRequests, rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.packetRequests, rx_stats_mutex);
 
 #ifdef KERNEL
     if (queue_IsEmpty(&rx_freePacketQueue))
@@ -1245,7 +1245,7 @@ rxi_AllocPacketTSFPQ(int class, int pull_global)
     RX_TS_INFO_GET(rx_ts_info);
 
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.packetRequests, rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.packetRequests, rx_stats_mutex);
     if (pull_global && queue_IsEmpty(&rx_ts_info->_FPQ)) {
         MUTEX_ENTER(&rx_freePktQ_lock);
 
@@ -1466,12 +1466,12 @@ rxi_ReadPacket(osi_socket socket, register struct rx_packet *p, afs_uint32 * hos
     if ((nbytes > tlen) || (p->length & 0x8000)) {     /* Bogus packet */
        if (nbytes < 0 && errno == EWOULDBLOCK) {
             if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.noPacketOnRead, rx_stats_mutex);
+               rx_AtomicIncrement(rx_stats.noPacketOnRead, rx_stats_mutex);
        } else if (nbytes <= 0) {
             if (rx_stats_active) {
                 MUTEX_ENTER(&rx_stats_mutex);
-                rx_stats.bogusPacketOnRead++;
-                rx_stats.bogusHost = from.sin_addr.s_addr;
+                rx_AtomicIncrement_NL(rx_stats.bogusPacketOnRead);
+                rx_AtomicSwap(&rx_stats.bogusHost, from.sin_addr.s_addr, rx_stats_mutex);
                 MUTEX_EXIT(&rx_stats_mutex);
             }
            dpf(("B: bogus packet from [%x,%d] nb=%d", ntohl(from.sin_addr.s_addr),
@@ -1504,7 +1504,7 @@ rxi_ReadPacket(osi_socket socket, register struct rx_packet *p, afs_uint32 * hos
        if (p->header.type > 0 && p->header.type < RX_N_PACKET_TYPES) {
            struct rx_peer *peer;
             if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.packetsRead[p->header.type - 1], rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.packetsRead[p->header.type - 1], rx_stats_mutex);
            /*
             * Try to look up this peer structure.  If it doesn't exist,
             * don't create a new one - 
@@ -1521,7 +1521,7 @@ rxi_ReadPacket(osi_socket socket, register struct rx_packet *p, afs_uint32 * hos
             * it may have no refCount, meaning we could race with
             * ReapConnections
             */
-           if (peer && (peer->refCount > 0)) {
+           if (peer && (rx_AtomicPeek_NL(peer->refCount) > 0)) {
                MUTEX_ENTER(&peer->peer_lock);
                hadd32(peer->bytesReceived, p->length);
                MUTEX_EXIT(&peer->peer_lock);
@@ -1984,7 +1984,7 @@ rxi_ReceiveDebugPacket(register struct rx_packet *ap, osi_socket asocket,
                        tpeer.port = tp->port;
                        tpeer.ifMTU = htons(tp->ifMTU);
                        tpeer.idleWhen = htonl(tp->idleWhen);
-                       tpeer.refCount = htons(tp->refCount);
+                       tpeer.refCount = htons(rx_AtomicPeek_NL(tp->refCount));
                        tpeer.burstSize = tp->burstSize;
                        tpeer.burst = tp->burst;
                        tpeer.burstWait.sec = htonl(tp->burstWait.sec);
@@ -2273,7 +2273,7 @@ rxi_SendPacket(struct rx_call *call, struct rx_connection *conn,
                         p->length + RX_HEADER_SIZE, istack)) != 0) {
            /* send failed, so let's hurry up the resend, eh? */
             if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.netSendFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.netSendFailures, rx_stats_mutex);
            p->retryTime = p->timeSent; /* resend it very soon */
            clock_Addmsec(&(p->retryTime),
                          10 + (((afs_uint32) p->backoff) << 8));
@@ -2314,7 +2314,7 @@ rxi_SendPacket(struct rx_call *call, struct rx_connection *conn,
     dpf(("%c %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", deliveryType, p->header.serial, rx_packetTypes[p->header.type - 1], ntohl(peer->host), ntohs(peer->port), p->header.serial, p->header.epoch, p->header.cid, p->header.callNumber, p->header.seq, p->header.flags, (unsigned long)p, p->retryTime.sec, p->retryTime.usec / 1000, p->length));
 #endif
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.packetsSent[p->header.type - 1], rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.packetsSent[p->header.type - 1], rx_stats_mutex);
     MUTEX_ENTER(&peer->peer_lock);
     hadd32(peer->bytesSent, p->length);
     MUTEX_EXIT(&peer->peer_lock);
@@ -2460,7 +2460,7 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn,
                         istack)) != 0) {
            /* send failed, so let's hurry up the resend, eh? */
             if (rx_stats_active)
-                rx_MutexIncrement(rx_stats.netSendFailures, rx_stats_mutex);
+                rx_AtomicIncrement(rx_stats.netSendFailures, rx_stats_mutex);
            for (i = 0; i < len; i++) {
                p = list[i];
                p->retryTime = p->timeSent;     /* resend it very soon */
@@ -2498,7 +2498,7 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn,
 
 #endif
     if (rx_stats_active)
-        rx_MutexIncrement(rx_stats.packetsSent[p->header.type - 1], rx_stats_mutex);
+        rx_AtomicIncrement(rx_stats.packetsSent[p->header.type - 1], rx_stats_mutex);
     MUTEX_ENTER(&peer->peer_lock);
     hadd32(peer->bytesSent, p->length);
     MUTEX_EXIT(&peer->peer_lock);
index 1a059d9..000caf7 100644 (file)
@@ -200,9 +200,7 @@ rxi_GetHostUDPSocket(u_int ahost, u_short port)
            (osi_Msg "%s*WARNING* Unable to increase buffering on socket\n",
             name);
        if (rx_stats_active) {
-            MUTEX_ENTER(&rx_stats_mutex);
-            rx_stats.socketGreedy = greedy;
-            MUTEX_EXIT(&rx_stats_mutex);
+           rx_AtomicSwap(&rx_stats.socketGreedy, greedy, rx_stats_mutex);
         }
     }