rx: Convert rxinit_status to rx_IsRunning()
[openafs.git] / src / rx / rx.c
index 9dc2e2b..7615825 100644 (file)
@@ -160,6 +160,12 @@ static void rxi_CancelDelayedAbortEvent(struct rx_call *call);
 static void rxi_CancelGrowMTUEvent(struct rx_call *call);
 static void update_nextCid(void);
 
+#ifndef KERNEL
+static void rxi_Finalize_locked(void);
+#elif defined(UKERNEL)
+# define rxi_Finalize_locked() do { } while (0)
+#endif
+
 #ifdef RX_ENABLE_LOCKS
 struct rx_tq_debug {
     rx_atomic_t rxi_start_aborted; /* rxi_start awoke after rxi_Send in error.*/
@@ -442,19 +448,35 @@ static int rxdb_fileID = RXDB_FILE_RX;
 #endif /* RX_ENABLE_LOCKS */
 struct rx_serverQueueEntry *rx_waitForPacket = 0;
 
+/*
+ * This mutex serializes calls to our initialization and shutdown routines
+ * (rx_InitHost, rx_Finalize and shutdown_rx). Only one thread can be running
+ * these at any time; all other threads must wait for it to finish running, and
+ * then examine the value of rxi_running afterwards.
+ */
+#ifdef AFS_PTHREAD_ENV
+# define LOCK_RX_INIT MUTEX_ENTER(&rx_init_mutex)
+# define UNLOCK_RX_INIT MUTEX_EXIT(&rx_init_mutex)
+#else
+# define LOCK_RX_INIT
+# define UNLOCK_RX_INIT
+#endif
+
 /* ------------Exported Interfaces------------- */
 
+static rx_atomic_t rxi_running = RX_ATOMIC_INIT(0);
+int
+rxi_IsRunning(void)
+{
+    return rx_atomic_read(&rxi_running);
+}
+
 /* Initialize rx.  A port number may be mentioned, in which case this
  * becomes the default port number for any service installed later.
  * If 0 is provided for the port number, a random port will be chosen
  * by the kernel.  Whether this will ever overlap anything in
  * /etc/services is anybody's guess...  Returns 0 on success, -1 on
  * error. */
-#if !(defined(AFS_NT40_ENV) || defined(RXK_UPCALL_ENV))
-static
-#endif
-rx_atomic_t rxinit_status = RX_ATOMIC_INIT(1);
-
 int
 rx_InitHost(u_int host, u_int port)
 {
@@ -468,15 +490,17 @@ rx_InitHost(u_int host, u_int port)
     SPLVAR;
 
     INIT_PTHREAD_LOCKS;
-    if (!rx_atomic_test_and_clear_bit(&rxinit_status, 0))
+    LOCK_RX_INIT;
+    if (rxi_IsRunning()) {
+       UNLOCK_RX_INIT;
        return 0; /* already started */
-
+    }
 #ifdef RXDEBUG
     rxi_DebugInit();
 #endif
 #ifdef AFS_NT40_ENV
     if (afs_winsockInit() < 0)
-       return -1;
+       goto error;
 #endif
 
 #ifndef KERNEL
@@ -492,7 +516,7 @@ rx_InitHost(u_int host, u_int port)
 
     rx_socket = rxi_GetHostUDPSocket(host, (u_short) port);
     if (rx_socket == OSI_NULLSOCKET) {
-       return RX_ADDRINUSE;
+        goto addrinuse;
     }
 #if defined(RX_ENABLE_LOCKS) && defined(KERNEL)
 #ifdef RX_LOCKS_DB
@@ -580,19 +604,19 @@ rx_InitHost(u_int host, u_int port)
        socklen_t addrlen = sizeof(addr);
 #endif
        if (getsockname((intptr_t)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
-           rx_Finalize();
+           rxi_Finalize_locked();
            osi_Free(htable, rx_hashTableSize * sizeof(struct rx_connection *));
-           return -1;
+           goto error;
        }
        rx_port = addr.sin_port;
 #endif
     }
     rx_stats.minRtt.sec = 9999999;
     if (RAND_bytes(&rx_epoch, sizeof(rx_epoch)) != 1)
-       return -1;
+       goto error;
     rx_epoch  = (rx_epoch & ~0x40000000) | 0x80000000;
     if (RAND_bytes(&rx_nextCid, sizeof(rx_nextCid)) != 1)
-       return -1;
+       goto error;
     rx_nextCid &= RX_CIDMASK;
     MUTEX_ENTER(&rx_quota_mutex);
     rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
@@ -623,8 +647,19 @@ rx_InitHost(u_int host, u_int port)
     rxi_StartListener();
 
     USERPRI;
-    rx_atomic_clear_bit(&rxinit_status, 0);
+
+    rx_atomic_set(&rxi_running, 1);
+    UNLOCK_RX_INIT;
+
     return 0;
+
+ addrinuse:
+    UNLOCK_RX_INIT;
+    return RX_ADDRINUSE;
+
+ error:
+    UNLOCK_RX_INIT;
+    return -1;
 }
 
 int
@@ -2492,12 +2527,21 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
 void
 rx_Finalize(void)
 {
-    struct rx_connection **conn_ptr, **conn_end;
-
     INIT_PTHREAD_LOCKS;
-    if (rx_atomic_test_and_set_bit(&rxinit_status, 0))
+    LOCK_RX_INIT;
+    if (!rxi_IsRunning()) {
+       UNLOCK_RX_INIT;
        return;                 /* Already shutdown. */
+    }
+    rxi_Finalize_locked();
+    UNLOCK_RX_INIT;
+}
 
+static void
+rxi_Finalize_locked(void)
+{
+    struct rx_connection **conn_ptr, **conn_end;
+    rx_atomic_set(&rxi_running, 0);
     rxi_DeleteCachedConnections();
     if (rx_connHashTable) {
        MUTEX_ENTER(&rx_connHashTable_lock);
@@ -2534,7 +2578,6 @@ rx_Finalize(void)
 #ifdef AFS_NT40_ENV
     afs_winsockCleanup();
 #endif
-
 }
 #endif
 
@@ -3165,10 +3208,15 @@ static_inline int
 rxi_AbortIfServerBusy(osi_socket socket, struct rx_connection *conn,
                      struct rx_packet *np)
 {
+    afs_uint32 serial;
+
     if ((rx_BusyThreshold > 0) &&
        (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
+       MUTEX_ENTER(&conn->conn_data_lock);
+       serial = ++conn->serial;
+       MUTEX_EXIT(&conn->conn_data_lock);
        rxi_SendRawAbort(socket, conn->peer->host, conn->peer->port,
-                        rx_BusyError, np, 0);
+                        serial, rx_BusyError, np, 0);
        if (rx_stats_active)
            rx_atomic_inc(&rx_stats.nBusies);
        return 1;
@@ -3395,7 +3443,7 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
        memset(&addr.sin_zero, 0, sizeof(addr.sin_zero));
 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
        addr.sin_len = sizeof(addr);
-#endif /* AFS_OSF_ENV */
+#endif
        drop = (*rx_justReceived) (np, &addr);
        /* drop packet if return value is non-zero */
        if (drop)
@@ -3420,7 +3468,7 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
        don't abort an abort. */
     if (!conn) {
         if (unknownService && (np->header.type != RX_PACKET_TYPE_ABORT))
-            rxi_SendRawAbort(socket, host, port, RX_INVALID_OPERATION,
+           rxi_SendRawAbort(socket, host, port, 0, RX_INVALID_OPERATION,
                              np, 0);
         return np;
     }
@@ -4427,12 +4475,20 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
        rx_packetread(np, rx_AckDataSize(ap->nAcks) + (int)sizeof(afs_int32),
                      (int)sizeof(afs_int32), &tSize);
        tSize = (afs_uint32) ntohl(tSize);
+       if (tSize > RX_MAX_PACKET_SIZE)
+           tSize = RX_MAX_PACKET_SIZE;
+       if (tSize < RX_MIN_PACKET_SIZE)
+           tSize = RX_MIN_PACKET_SIZE;
        peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
 
        /* Get the maximum packet size to send to this peer */
        rx_packetread(np, rx_AckDataSize(ap->nAcks), (int)sizeof(afs_int32),
                      &tSize);
        tSize = (afs_uint32) ntohl(tSize);
+       if (tSize > RX_MAX_PACKET_SIZE)
+           tSize = RX_MAX_PACKET_SIZE;
+       if (tSize < RX_MIN_PACKET_SIZE)
+           tSize = RX_MIN_PACKET_SIZE;
        tSize = (afs_uint32) MIN(tSize, rx_MyMaxSendSize);
        tSize = rxi_AdjustMaxMTU(peer->natMTU, tSize);
 
@@ -4454,6 +4510,10 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
                          rx_AckDataSize(ap->nAcks) + 2 * (int)sizeof(afs_int32),
                          (int)sizeof(afs_int32), &tSize);
            tSize = (afs_uint32) ntohl(tSize);  /* peer's receive window, if it's */
+           if (tSize == 0)
+               tSize = 1;
+           if (tSize >= rx_maxSendWindow)
+               tSize = rx_maxSendWindow;
            if (tSize < call->twind) {  /* smaller than our send */
                call->twind = tSize;    /* window, we must send less... */
                call->ssthresh = MIN(call->twind, call->ssthresh);
@@ -4475,6 +4535,10 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
                          rx_AckDataSize(ap->nAcks) + 2 * (int)sizeof(afs_int32),
                          sizeof(afs_int32), &tSize);
            tSize = (afs_uint32) ntohl(tSize);
+           if (tSize == 0)
+               tSize = 1;
+           if (tSize >= rx_maxSendWindow)
+               tSize = rx_maxSendWindow;
            /*
             * As of AFS 3.5 we set the send window to match the receive window.
             */
@@ -5070,7 +5134,14 @@ rxi_SendCallAbort(struct rx_call *call, struct rx_packet *packet,
     if (rx_IsClientConn(call->conn))
        force = 1;
 
-    if (call->abortCode != call->error) {
+    /*
+     * An opcode that has been deprecated or has yet to be implemented is not
+     * a misbehavior of the client.  Do not punish the client by introducing
+     * delays.
+     */
+    if (call->error == RXGEN_OPCODE) {
+       force = 1;
+    } else if (call->abortCode != call->error) {
        call->abortCode = call->error;
        call->abortCount = 0;
     }
@@ -5079,7 +5150,8 @@ rxi_SendCallAbort(struct rx_call *call, struct rx_packet *packet,
        || call->abortCount < rxi_callAbortThreshhold) {
        rxi_CancelDelayedAbortEvent(call);
        error = htonl(call->error);
-       call->abortCount++;
+       if (!force)
+           call->abortCount++;
        packet =
            rxi_SendSpecial(call, call->conn, packet, RX_PACKET_TYPE_ABORT,
                            (char *)&error, sizeof(error), istack);
@@ -6673,27 +6745,30 @@ rxi_SendDelayedCallAbort(struct rxevent *event, void *arg1, void *dummy,
  *
  * This routine is both an event handler and a function called directly;
  * when called directly the passed |event| is NULL and the
- * conn->conn->data>lock must must not be held.
+ * conn->conn->data>lock must must not be held.  Also, when called as an
+ * an event handler, we must putConnection before we exit; but when called
+ * directly (the first challenge), we must NOT putConnection.
  */
 static void
 rxi_ChallengeEvent(struct rxevent *event,
                   void *arg0, void *arg1, int tries)
 {
     struct rx_connection *conn = arg0;
+    int event_raised = 0;      /* assume we were called directly */
 
     MUTEX_ENTER(&conn->conn_data_lock);
-    if (event != NULL && event == conn->challengeEvent)
+    if (event != NULL && event == conn->challengeEvent) {
+       event_raised = 1;       /* called as an event */
        rxevent_Put(&conn->challengeEvent);
+    }
     MUTEX_EXIT(&conn->conn_data_lock);
 
     /* If there are no active calls it is not worth re-issuing the
      * challenge.  If the client issues another call on this connection
      * the challenge can be requested at that time.
      */
-    if (!rxi_HasActiveCalls(conn)) {
-       putConnection(conn);
-        return;
-    }
+    if (!rxi_HasActiveCalls(conn))
+       goto done;
 
     if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
        struct rx_packet *packet;
@@ -6719,8 +6794,7 @@ rxi_ChallengeEvent(struct rxevent *event,
                }
            }
            MUTEX_EXIT(&conn->conn_call_lock);
-           putConnection(conn);
-           return;
+           goto done;
        }
 
        packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
@@ -6745,7 +6819,9 @@ rxi_ChallengeEvent(struct rxevent *event,
        }
        MUTEX_EXIT(&conn->conn_data_lock);
     }
-    putConnection(conn);
+ done:
+    if (event_raised)
+       putConnection(conn);
 }
 
 /* Call this routine to start requesting the client to authenticate
@@ -7810,9 +7886,12 @@ shutdown_rx(void)
     struct rx_serverQueueEntry *sq;
 #endif /* KERNEL */
 
-    if (rx_atomic_test_and_set_bit(&rxinit_status, 0))
+    LOCK_RX_INIT;
+    if (!rxi_IsRunning()) {
+       UNLOCK_RX_INIT;
        return;                 /* Already shutdown. */
-
+    }
+    rx_atomic_set(&rxi_running, 0);
 #ifndef KERNEL
     rx_port = 0;
 #ifndef AFS_PTHREAD_ENV
@@ -7934,6 +8013,7 @@ shutdown_rx(void)
     rxi_dataQuota = RX_MAX_QUOTA;
     rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
     MUTEX_EXIT(&rx_quota_mutex);
+    UNLOCK_RX_INIT;
 }
 
 #ifndef KERNEL