rx: Add atomic operations code
[openafs.git] / src / rx / rx.c
index f54d510..267ebcb 100644 (file)
@@ -69,6 +69,7 @@
 #include "rx.h"
 #include "rx_globals.h"
 #include "rx_trace.h"
+#include "rx_atomic.h"
 #define        AFSOP_STOP_RXCALLBACK   210     /* Stop CALLBACK process */
 #define        AFSOP_STOP_AFS          211     /* Stop AFS process */
 #define        AFSOP_STOP_BKG          212     /* Stop BKG process */
@@ -104,6 +105,7 @@ extern afs_int32 afs_termState;
 # include "rx_user.h"
 # include "rx_clock.h"
 # include "rx_queue.h"
+# include "rx_atomic.h"
 # include "rx_globals.h"
 # include "rx_trace.h"
 # include <afs/rxgen_consts.h>
@@ -156,6 +158,10 @@ static unsigned int rxi_rpc_process_stat_cnt;
 #include <stddef.h>            /* for definition of offsetof() */
 #endif
 
+#ifdef RX_ENABLE_LOCKS
+afs_kmutex_t rx_atomic_mutex;
+#endif
+
 #ifdef AFS_PTHREAD_ENV
 #include <assert.h>
 
@@ -196,6 +202,7 @@ rxi_InitPthread(void)
     MUTEX_INIT(&rx_clock_mutex, "clock", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_stats_mutex, "stats", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_waiting_mutex, "waiting", MUTEX_DEFAULT, 0);
+    MUTEX_INIT(&rx_atomic_mutex, "atomic", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_quota_mutex, "quota", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_pthread_mutex, "pthread", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_packets_mutex, "packets", MUTEX_DEFAULT, 0);
@@ -346,6 +353,7 @@ struct rx_connection *rxLastConn = 0;
  *     multi_handle->lock
  *     rxevent_lock
  *     rx_stats_mutex
+ *     rx_atomic_mutex
  *
  * Do we need a lock to protect the peer field in the conn structure?
  *      conn->peer was previously a constant for all intents and so has no
@@ -4899,23 +4907,6 @@ rxi_ResetCall(struct rx_call *call, int newcall)
     rxi_ClearReceiveQueue(call);
     /* why init the queue if you just emptied it? queue_Init(&call->rq); */
 
-    if (call->currentPacket) {
-#ifdef RX_TRACK_PACKETS
-        call->currentPacket->flags &= ~RX_PKTFLAG_CP;
-        call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
-#endif
-        queue_Prepend(&call->iovq, call->currentPacket);
-#ifdef RXDEBUG_PACKET
-        call->iovqc++;
-#endif /* RXDEBUG_PACKET */
-        call->currentPacket = (struct rx_packet *)0;
-    }
-    call->curlen = call->nLeft = call->nFree = 0;
-
-#ifdef RXDEBUG_PACKET
-    call->iovqc -=
-#endif
-        rxi_FreePackets(0, &call->iovq);
 
     call->error = 0;
     call->twind = call->conn->twind[call->channel];
@@ -5568,8 +5559,9 @@ rxi_Start(struct rxevent *event,
         * recent additions.
         * Do a dance to avoid blocking after setting now. */
        MUTEX_ENTER(&peer->peer_lock);
-       retryTime = peer->timeout;
+        retryTime = peer->timeout;
        MUTEX_EXIT(&peer->peer_lock);
+
        clock_GetTime(&now);
        clock_Add(&retryTime, &now);
        usenow = now;
@@ -6768,7 +6760,7 @@ rxi_ComputeRate(struct rx_peer *peer, struct rx_call *call,
        } else {
            return;
        }
-       xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
+       xferSize = rx_AckDataSize(rx_maxSendWindow) + RX_HEADER_SIZE;
        break;
 
     default:
@@ -6823,9 +6815,9 @@ rxi_ComputeRate(struct rx_peer *peer, struct rx_call *call,
      * one packet exchange */
     if (clock_Gt(&newTO, &peer->timeout)) {
 
-       dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u, ps %u)",
+       dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u)",
               ntohl(peer->host), ntohs(peer->port), peer->timeout.sec, peer->timeout.usec,
-              newTO.sec, newTO.usec, peer->smRtt, peer->packetSize));
+              newTO.sec, newTO.usec, peer->smRtt));
 
        peer->timeout = newTO;
     }
@@ -6835,18 +6827,18 @@ rxi_ComputeRate(struct rx_peer *peer, struct rx_call *call,
     /* Now, convert to the number of full packets that could fit in a
      * reasonable fraction of that interval */
     minTime /= (peer->smRtt << 1);
+    minTime = MAX(minTime, rx_minPeerTimeout);
     xferSize = minTime;                /* (make a copy) */
 
     /* Now clamp the size to reasonable bounds. */
     if (minTime <= 1)
        minTime = 1;
-    else if (minTime > rx_Window)
-       minTime = rx_Window;
+    else if (minTime > rx_maxSendWindow)
+       minTime = rx_maxSendWindow;
 /*    if (minTime != peer->maxWindow) {
-      dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
+      dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u)",
             ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
-            peer->timeout.sec, peer->timeout.usec, peer->smRtt,
-            peer->packetSize));
+            peer->timeout.sec, peer->timeout.usec, peer->smRtt));
       peer->maxWindow = minTime;
        elide... call->twind = minTime;
     }
@@ -6855,13 +6847,13 @@ rxi_ComputeRate(struct rx_peer *peer, struct rx_call *call,
     /* Cut back on the peer timeout if it had earlier grown unreasonably.
      * Discern this by calculating the timeout necessary for rx_Window
      * packets. */
-    if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
+    if ((xferSize > rx_maxSendWindow) && (peer->timeout.sec >= 3)) {
        /* calculate estimate for transmission interval in milliseconds */
-       minTime = rx_Window * peer->smRtt;
+       minTime = rx_maxSendWindow * peer->smRtt;
        if (minTime < 1000) {
-           dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u, ps %u)",
+           dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u)",
                 ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
-                peer->timeout.usec, peer->smRtt, peer->packetSize));
+                peer->timeout.usec, peer->smRtt));
 
            newTO.sec = 0;      /* cut back on timeout by half a second */
            newTO.usec = 500000;
@@ -6974,7 +6966,6 @@ void
 rx_PrintTheseStats(FILE * file, struct rx_statistics *s, int size,
                   afs_int32 freePackets, char version)
 {
-#ifdef RXDEBUG
     int i;
 
     if (size != sizeof(struct rx_statistics)) {
@@ -7049,9 +7040,6 @@ rx_PrintTheseStats(FILE * file, struct rx_statistics *s, int size,
 #if    !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
     fprintf(file, "   %d clock updates\n", clock_nUpdates);
 #endif
-#else
-    fprintf(file, "ERROR: compiled without RXDEBUG\n");
-#endif
 }
 
 /* for backward compatibility */
@@ -7068,7 +7056,7 @@ void
 rx_PrintPeerStats(FILE * file, struct rx_peer *peer)
 {
     fprintf(file, "Peer %x.%d.  " "Burst size %d, " "burst wait %d.%06d.\n",
-           ntohl(peer->host), (int)peer->port, (int)peer->burstSize,
+           ntohl(peer->host), (int)ntohs(peer->port), (int)peer->burstSize,
            (int)peer->burstWait.sec, (int)peer->burstWait.usec);
 
     fprintf(file,
@@ -7096,7 +7084,7 @@ rx_PrintPeerStats(FILE * file, struct rx_peer *peer)
 #define UNLOCK_RX_DEBUG
 #endif /* AFS_PTHREAD_ENV */
 
-#ifdef RXDEBUG
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
 static int
 MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
              u_char type, void *inputData, size_t inputLength,
@@ -7212,9 +7200,7 @@ rx_GetServerDebug(osi_socket socket, afs_uint32 remoteAddr,
                  afs_uint16 remotePort, struct rx_debugStats * stat,
                  afs_uint32 * supportedValues)
 {
-#ifndef RXDEBUG
-     afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
     afs_int32 rc = 0;
     struct rx_debugIn in;
 
@@ -7267,6 +7253,8 @@ rx_GetServerDebug(osi_socket socket, afs_uint32 remoteAddr,
         stat->nWaited = ntohl(stat->nWaited);
         stat->nPackets = ntohl(stat->nPackets);
     }
+#else
+    afs_int32 rc = -1;
 #endif
     return rc;
 }
@@ -7276,9 +7264,7 @@ rx_GetServerStats(osi_socket socket, afs_uint32 remoteAddr,
                  afs_uint16 remotePort, struct rx_statistics * stat,
                  afs_uint32 * supportedValues)
 {
-#ifndef RXDEBUG
-     afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
     afs_int32 rc = 0;
     struct rx_debugIn in;
     afs_int32 *lp = (afs_int32 *) stat;
@@ -7307,6 +7293,8 @@ rx_GetServerStats(osi_socket socket, afs_uint32 remoteAddr,
            *lp = ntohl(*lp);
        }
     }
+#else
+    afs_int32 rc = -1;
 #endif
     return rc;
 }
@@ -7316,7 +7304,7 @@ rx_GetServerVersion(osi_socket socket, afs_uint32 remoteAddr,
                    afs_uint16 remotePort, size_t version_length,
                    char *version)
 {
-#ifdef RXDEBUG
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
     char a[1] = { 0 };
     return MakeDebugCall(socket, remoteAddr, remotePort,
                         RX_PACKET_TYPE_VERSION, a, 1, version,
@@ -7333,9 +7321,7 @@ rx_GetServerConnections(osi_socket socket, afs_uint32 remoteAddr,
                        struct rx_debugConn * conn,
                        afs_uint32 * supportedValues)
 {
-#ifndef RXDEBUG
-    afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
     afs_int32 rc = 0;
     struct rx_debugIn in;
     int i;
@@ -7409,6 +7395,8 @@ rx_GetServerConnections(osi_socket socket, afs_uint32 remoteAddr,
        conn->epoch = ntohl(conn->epoch);
        conn->natMTU = ntohl(conn->natMTU);
     }
+#else
+    afs_int32 rc = -1;
 #endif
     return rc;
 }
@@ -7419,9 +7407,7 @@ rx_GetServerPeers(osi_socket socket, afs_uint32 remoteAddr,
                  afs_uint32 debugSupportedValues, struct rx_debugPeer * peer,
                  afs_uint32 * supportedValues)
 {
-#ifndef RXDEBUG
-    afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
     afs_int32 rc = 0;
     struct rx_debugIn in;
 
@@ -7474,6 +7460,8 @@ rx_GetServerPeers(osi_socket socket, afs_uint32 remoteAddr,
        peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
        peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
     }
+#else
+    afs_int32 rc = -1;
 #endif
     return rc;
 }