Windows: NOPROBE means probe in background thread
[openafs.git] / src / WINNT / afsd / cm_server.c
index 32eeff8..1b0b6c8 100644 (file)
@@ -50,6 +50,28 @@ cm_ForceNewConnectionsAllServers(void)
     lock_ReleaseRead(&cm_serverLock);
 }
 
+void
+cm_ServerClearRPCStats(void) {
+    cm_server_t *tsp;
+    afs_uint16 port;
+
+    lock_ObtainRead(&cm_serverLock);
+    for (tsp = cm_allServersp; tsp; tsp = tsp->allNextp) {
+        switch (tsp->type) {
+        case CM_SERVER_VLDB:
+           port = htons(7003);
+            rx_ClearPeerRPCStats(opcode_VL_ProbeServer>>32, tsp->addr.sin_addr.s_addr, port);
+           break;
+       case CM_SERVER_FILE:
+           port = htons(7000);
+            rx_ClearPeerRPCStats(opcode_RXAFS_GetCapabilities>>32, tsp->addr.sin_addr.s_addr, port);
+            rx_ClearPeerRPCStats(opcode_RXAFS_GetTime>>32, tsp->addr.sin_addr.s_addr, port);
+           break;
+        }
+    }
+    lock_ReleaseRead(&cm_serverLock);
+}
+
 /*
  * lock_ObtainMutex must be held prior to calling
  * this function.
@@ -59,37 +81,102 @@ cm_RankServer(cm_server_t * tsp)
 {
     afs_int32 code = 0; /* start with "success" */
     struct rx_debugPeer tpeer;
+    struct rx_peer * rxPeer;
     afs_uint16 port;
-    afs_uint16 newRank;
+    afs_uint64 newRank;
+    afs_uint64 perfRank = 0;
+    afs_uint64 rtt = 0;
+    double log_rtt;
+
+    int isDown = (tsp->flags & CM_SERVERFLAG_DOWN);
+    void *peerRpcStats = NULL;
+    afs_uint64 opcode = 0;
 
     switch(tsp->type) {
        case CM_SERVER_VLDB:
            port = htons(7003);
+            opcode = opcode_VL_ProbeServer;
            break;
        case CM_SERVER_FILE:
            port = htons(7000);
+            opcode = opcode_RXAFS_GetCapabilities;
            break;
        default:
            return -1;
     }
 
-    code = rx_GetLocalPeers(tsp->addr.sin_addr.s_addr, port, &tpeer);
+    cm_SetServerIPRank(tsp);
+
+    if (isDown) {
+        newRank = 0xFFFF;
+    } else {
+        /*
+        * There are three potential components to the ranking:
+        *  1. Any administrative set preference whether it be
+        *     via "fs setserverprefs", registry or dns.
+        *
+        *  2. Network subnet mask comparison.
+        *
+        *  3. Performance data.
+        *
+        * If there is an administrative rank, that is the
+        * the primary factor.  If not the primary factor
+        * is the network ranking.
+        */
+
+        code = rx_GetLocalPeers(tsp->addr.sin_addr.s_addr, port, &tpeer);
+        if (code == 0) {
+            peerRpcStats = rx_CopyPeerRPCStats(opcode, tsp->addr.sin_addr.s_addr, port);
+            if (peerRpcStats == NULL && tsp->type == CM_SERVER_FILE)
+                peerRpcStats = rx_CopyPeerRPCStats(opcode_RXAFS_GetTime, tsp->addr.sin_addr.s_addr, port);
+            if (peerRpcStats) {
+                afs_uint64 execTimeSum = _8THMSEC(RPCOpStat_ExecTimeSum(peerRpcStats));
+                afs_uint64 queueTimeSum = _8THMSEC(RPCOpStat_QTimeSum(peerRpcStats));
+                afs_uint64 numCalls = RPCOpStat_NumCalls(peerRpcStats);
+
+                if (numCalls > 0)
+                    rtt = (execTimeSum - queueTimeSum) / numCalls;
+
+                rx_ReleaseRPCStats(peerRpcStats);
+            }
 
-    /*check if rx_GetLocalPeers succeeded and if there is data for tsp */
-    if(code == 0 && (tpeer.rtt == 0 && tpeer.rtt_dev == 0))
-       code = -1;
+            if (rtt == 0 && tpeer.rtt) {
+                /* rtt is ms/8 */
+                rtt = tpeer.rtt;
+            }
+
+            if (rtt > 0) {
+                log_rtt = log(rtt);
+                perfRank += (6000 * log_rtt / 5000) * 5000;
+
+                if (tsp->type == CM_SERVER_FILE) {
+                    /* give an edge to servers with high congestion windows */
+                    perfRank -= (tpeer.cwind - 1)* 15;
+                }
+            }
+        }
 
-    if(code == 0) {
-       if((tsp->flags & CM_SERVERFLAG_PREF_SET))
-           newRank = tsp->adminRank +
-                ((int)(623 * log(tpeer.rtt) / 10) * 10 + 5);
-       else /* rank has not been set by admin, derive rank from rtt */
-           newRank = (int)(7200 * log(tpeer.rtt) / 5000) * 5000 + 5000;
+        if (tsp->adminRank) {
+            newRank = tsp->adminRank * 0.8;
+            newRank += tsp->ipRank * 0.2;
+        } else {
+            newRank = tsp->ipRank;
+        }
+        if (perfRank) {
+            newRank *= 0.9;
+            newRank += perfRank * 0.1;
+        }
+        newRank += (rand() & 0x000f); /* randomize */
 
-       newRank += (rand() & 0x000f); /* randomize */
+        if (newRank > 0xFFFF)
+            osi_Log1(afsd_logp, "new server rank %I64u exceeds 0xFFFF", newRank);
 
-        if (abs(newRank - tsp->ipRank) > 0xf) {
-            tsp->ipRank = newRank;
+        /*
+         * If the ranking changes by more than the randomization
+         * factor, update the server reference lists.
+         */
+        if (abs(newRank - tsp->activeRank) > 0xf) {
+            tsp->activeRank = newRank;
 
             lock_ReleaseMutex(&tsp->mx);
             switch (tsp->type) {
@@ -142,7 +229,7 @@ cm_PingServer(cm_server_t *tsp)
     afs_inet_ntoa_r(tsp->addr.sin_addr.S_un.S_addr, hoststr);
     lock_ReleaseMutex(&tsp->mx);
 
-    code = cm_ConnByServer(tsp, cm_rootUserp, &connp);
+    code = cm_ConnByServer(tsp, cm_rootUserp, FALSE, &connp);
     if (code == 0) {
        /* now call the appropriate ping call.  Drop the timeout if
        * the server is known to be down, so that we don't waste a
@@ -172,13 +259,13 @@ cm_PingServer(cm_server_t *tsp)
     }  /* got an unauthenticated connection to this server */
 
     lock_ObtainMutex(&tsp->mx);
-    if (code >= 0 || code == RXGEN_OPCODE || code == CM_RX_RETRY_BUSY_CALL) {
+    if (code >= 0 || code == RXGEN_OPCODE || code == RX_CALL_BUSY) {
        /* mark server as up */
        _InterlockedAnd(&tsp->flags, ~CM_SERVERFLAG_DOWN);
         tsp->downTime = 0;
 
        /* we currently handle 32-bits of capabilities */
-       if (code != RXGEN_OPCODE && code != CM_RX_RETRY_BUSY_CALL &&
+       if (code != RXGEN_OPCODE && code != RX_CALL_BUSY &&
             caps.Capabilities_len > 0) {
            tsp->capabilities = caps.Capabilities_val[0];
            xdr_free((xdrproc_t) xdr_Capabilities, &caps);
@@ -215,6 +302,7 @@ cm_PingServer(cm_server_t *tsp)
                     }
                 }
             }
+            cm_RankServer(tsp);
         }
     } else {
        /* mark server as down */
@@ -254,6 +342,7 @@ cm_PingServer(cm_server_t *tsp)
                     }
                 }
             }
+            cm_RankServer(tsp);
         }
     }
 
@@ -371,7 +460,7 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
     cm_server_t ** serversp, *tsp;
     afs_uint32 isDown, wasDown;
     afs_uint32 code;
-    time_t start, *deltas;
+    time_t start;
     char hoststr[16];
 
     cm_InitReq(&req);
@@ -382,7 +471,6 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
     conns = (cm_conn_t **)malloc(maxconns * sizeof(cm_conn_t *));
     rxconns = (struct rx_connection **)malloc(maxconns * sizeof(struct rx_connection *));
     conntimer = (afs_int32 *)malloc(maxconns * sizeof (afs_int32));
-    deltas = (time_t *)malloc(maxconns * sizeof (time_t));
     results = (afs_int32 *)malloc(maxconns * sizeof (afs_int32));
     serversp = (cm_server_t **)malloc(maxconns * sizeof(cm_server_t *));
     caps = (Capabilities *)malloc(maxconns * sizeof(Capabilities));
@@ -418,7 +506,7 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
             lock_ReleaseMutex(&tsp->mx);
 
             serversp[nconns] = tsp;
-            code = cm_ConnByServer(tsp, cm_rootUserp, &conns[nconns]);
+            code = cm_ConnByServer(tsp, cm_rootUserp, FALSE, &conns[nconns]);
             if (code) {
                 lock_ObtainRead(&cm_serverLock);
                 cm_PutServerNoLock(tsp);
@@ -457,13 +545,13 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
             wasDown = tsp->flags & CM_SERVERFLAG_DOWN;
 
             if (results[i] >= 0 || results[i] == RXGEN_OPCODE ||
-                results[i] == CM_RX_RETRY_BUSY_CALL)  {
+                results[i] == RX_CALL_BUSY)  {
                 /* mark server as up */
                 _InterlockedAnd(&tsp->flags, ~CM_SERVERFLAG_DOWN);
                 tsp->downTime = 0;
 
                 /* we currently handle 32-bits of capabilities */
-                if (results[i] != RXGEN_OPCODE && results[i] != CM_RX_RETRY_BUSY_CALL &&
+                if (results[i] != RXGEN_OPCODE && results[i] != RX_CALL_BUSY &&
                     caps[i].Capabilities_len > 0) {
                     tsp->capabilities = caps[i].Capabilities_val[0];
                     xdr_free((xdrproc_t) xdr_Capabilities, &caps[i]);
@@ -501,6 +589,7 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
                             }
                         }
                     }
+                    cm_RankServer(tsp);
                 }
             } else {
                 /* mark server as down */
@@ -541,6 +630,7 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
                             }
                         }
                     }
+                    cm_RankServer(tsp);
                 }
             }
 
@@ -584,7 +674,7 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
             lock_ReleaseMutex(&tsp->mx);
 
             serversp[nconns] = tsp;
-            code = cm_ConnByServer(tsp, cm_rootUserp, &conns[nconns]);
+            code = cm_ConnByServer(tsp, cm_rootUserp, FALSE, &conns[nconns]);
             if (code) {
                 lock_ObtainRead(&cm_serverLock);
                 cm_PutServerNoLock(tsp);
@@ -623,7 +713,7 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
             lock_ObtainMutex(&tsp->mx);
             wasDown = tsp->flags & CM_SERVERFLAG_DOWN;
 
-            if (results[i] >= 0 || results[i] == CM_RX_RETRY_BUSY_CALL)  {
+            if (results[i] >= 0 || results[i] == RX_CALL_BUSY)  {
                 /* mark server as up */
                 _InterlockedAnd(&tsp->flags, ~CM_SERVERFLAG_DOWN);
                 tsp->downTime = 0;
@@ -634,6 +724,8 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
                           osi_LogSaveString(afsd_logp, hoststr),
                           tsp->type == CM_SERVER_VLDB ? "vldb" : "file",
                           tsp->capabilities);
+                if (wasDown)
+                    cm_RankServer(tsp);
             } else {
                 /* mark server as down */
                 if (!(tsp->flags & CM_SERVERFLAG_DOWN)) {
@@ -650,6 +742,8 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
                           osi_LogSaveString(afsd_logp, hoststr),
                           tsp->type == CM_SERVER_VLDB ? "vldb" : "file",
                           tsp->capabilities);
+                if (!wasDown)
+                    cm_RankServer(tsp);
             }
 
             if (tsp->waitCount == 0)
@@ -666,7 +760,6 @@ static void cm_CheckServersMulti(afs_uint32 flags, cm_cell_t *cellp)
     free(conns);
     free(rxconns);
     free(conntimer);
-    free(deltas);
     free(results);
     free(serversp);
     free(caps);
@@ -767,19 +860,17 @@ void cm_SetServerNoInlineBulk(cm_server_t * serverp, int no)
     lock_ReleaseMutex(&serverp->mx);
 }
 
-void cm_SetServerPrefs(cm_server_t * serverp)
+void cm_SetServerIPRank(cm_server_t * serverp)
 {
     unsigned long      serverAddr;     /* in host byte order */
     unsigned long      myAddr, myNet, mySubnet;/* in host byte order */
     unsigned long      netMask;
     int                i;
     long code;
-    int writeLock = 0;
 
     lock_ObtainRead(&cm_syscfgLock);
     if (cm_LanAdapterChangeDetected) {
         lock_ConvertRToW(&cm_syscfgLock);
-        writeLock = 1;
         if (cm_LanAdapterChangeDetected) {
             /* get network related info */
             cm_noIPAddr = CM_MAXINTERFACE_ADDR;
@@ -816,19 +907,18 @@ void cm_SetServerPrefs(cm_server_t * serverp)
        {
            if ( (serverAddr & cm_SubnetMask[i]) == mySubnet)
            {
-               if ( serverAddr == myAddr )
+               if ( serverAddr == myAddr ) {
                    serverp->ipRank = min(serverp->ipRank,
                                           CM_IPRANK_TOP);/* same machine */
-               else serverp->ipRank = min(serverp->ipRank,
-                                           CM_IPRANK_HI); /* same subnet */
-           }
-           else serverp->ipRank = min(serverp->ipRank,CM_IPRANK_MED);
-           /* same net */
+               } else {
+                    serverp->ipRank = min(serverp->ipRank,
+                                          CM_IPRANK_HI); /* same subnet */
+                }
+           } else {
+                serverp->ipRank = min(serverp->ipRank, CM_IPRANK_MED); /* same net */
+            }
        }
     } /* and of for loop */
-
-    /* random between 0..15*/
-    serverp->ipRank += (rand() % 0x000f);
     lock_ReleaseRead(&cm_syscfgLock);
 }
 
@@ -864,8 +954,6 @@ cm_server_t *cm_NewServer(struct sockaddr_in *socketp, int type, cm_cell_t *cell
         lock_InitializeMutex(&tsp->mx, "cm_server_t mutex", LOCK_HIERARCHY_SERVER);
         tsp->addr = *socketp;
 
-        cm_SetServerPrefs(tsp);
-
         tsp->allNextp = cm_allServersp;
         cm_allServersp = tsp;
 
@@ -880,11 +968,30 @@ cm_server_t *cm_NewServer(struct sockaddr_in *socketp, int type, cm_cell_t *cell
     }
     lock_ReleaseWrite(&cm_serverLock);         /* release server lock */
 
-    if (!(flags & CM_FLAG_NOPROBE) && tsp) {
-        _InterlockedOr(&tsp->flags, CM_SERVERFLAG_DOWN);       /* assume down; ping will mark up if available */
-        cm_PingServer(tsp);                                    /* Obtain Capabilities and check up/down state */
-    }
+    if (tsp) {
+        if (!(flags & CM_FLAG_NOPROBE)) {
+            _InterlockedOr(&tsp->flags, CM_SERVERFLAG_DOWN);   /* assume down; ping will mark up if available */
+            lock_ObtainMutex(&tsp->mx);
+            cm_RankServer(tsp);
+            lock_ReleaseMutex(&tsp->mx);
+            cm_PingServer(tsp);                                        /* Obtain Capabilities and check up/down state */
+        } else {
+            pthread_t phandle;
+            pthread_attr_t tattr;
+            int pstatus;
+
+            /* Probe the server in the background to determine if it is up or down */
+            pthread_attr_init(&tattr);
+            pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
+
+            lock_ObtainMutex(&tsp->mx);
+            cm_RankServer(tsp);
+            lock_ReleaseMutex(&tsp->mx);
+            pstatus = pthread_create(&phandle, &tattr, cm_PingServer, tsp);
 
+            pthread_attr_destroy(&tattr);
+        }
+    }
     return tsp;
 }
 
@@ -1044,7 +1151,21 @@ afs_int32 cm_PutServerRef(cm_serverRef_t *tsrp, int locked)
     return refCount;
 }
 
+afs_uint32
+cm_ServerListSize(cm_serverRef_t* serversp)
+{
+    afs_uint32 count = 0;
+    cm_serverRef_t *tsrp;
 
+    lock_ObtainRead(&cm_serverLock);
+    for (tsrp = serversp; tsrp; tsrp=tsrp->next) {
+        if (tsrp->status == srv_deleted)
+            continue;
+        count++;
+    }
+    lock_ReleaseRead(&cm_serverLock);
+    return count;
+}
 
 LONG_PTR cm_ChecksumServerList(cm_serverRef_t *serversp)
 {
@@ -1076,7 +1197,7 @@ LONG_PTR cm_ChecksumServerList(cm_serverRef_t *serversp)
 void cm_InsertServerList(cm_serverRef_t** list, cm_serverRef_t* element)
 {
     cm_serverRef_t     *current;
-    unsigned short ipRank;
+    unsigned short rank;
 
     lock_ObtainWrite(&cm_serverLock);
     /*
@@ -1144,10 +1265,10 @@ void cm_InsertServerList(cm_serverRef_t** list, cm_serverRef_t* element)
         goto done;
     }
 
-    ipRank = element->server->ipRank;
+    rank = element->server->activeRank;
 
        /* insertion at the beginning of the list */
-    if ((*list)->server->ipRank > ipRank)
+    if ((*list)->server->activeRank > rank)
     {
         element->next = *list;
         *list = element;
@@ -1157,7 +1278,7 @@ void cm_InsertServerList(cm_serverRef_t** list, cm_serverRef_t* element)
     /* find appropriate place to insert */
     for ( current = *list; current->next; current = current->next)
     {
-        if ( current->next->server->ipRank > ipRank )
+        if ( current->next->server->activeRank > rank )
             break;
     }
     element->next = current->next;
@@ -1228,10 +1349,10 @@ void cm_RandomizeServer(cm_serverRef_t** list)
     }
 
     /* count the number of servers with the lowest rank */
-    lowestRank = tsrp->server->ipRank;
+    lowestRank = tsrp->server->activeRank;
     for ( count=1, tsrp=tsrp->next; tsrp; tsrp=tsrp->next)
     {
-        if ( tsrp->server->ipRank != lowestRank)
+        if ( tsrp->server->activeRank != lowestRank)
             break;
         else
             count++;
@@ -1345,6 +1466,26 @@ void cm_RemoveVolumeFromServer(cm_server_t * serverp, afs_uint32 volID)
     }
 }
 
+int cm_IsServerListEmpty(cm_serverRef_t *serversp)
+{
+    cm_serverRef_t *tsrp;
+    int allDeleted = 1;
+
+    if (serversp == NULL)
+        return CM_ERROR_EMPTY;
+
+    lock_ObtainRead(&cm_serverLock);
+    for (tsrp = serversp; tsrp; tsrp=tsrp->next) {
+        if (tsrp->status == srv_deleted)
+            continue;
+        allDeleted = 0;
+        break;
+    }
+    lock_ReleaseRead(&cm_serverLock);
+
+    return ( allDeleted ? CM_ERROR_EMPTY : 0 );
+}
+
 void cm_FreeServerList(cm_serverRef_t** list, afs_uint32 flags)
 {
     cm_serverRef_t  **current;
@@ -1433,7 +1574,7 @@ int cm_DumpServers(FILE *outputFile, char *cookie, int lock)
                  "flags=0x%x waitCount=%u rank=%u downTime=\"%s\" refCount=%u\r\n",
                  cookie, tsp, tsp->cellp ? tsp->cellp->name : "", hoststr,
                  ntohs(tsp->addr.sin_port), uuidstr, type,
-                 tsp->capabilities, tsp->flags, tsp->waitCount, tsp->ipRank,
+                 tsp->capabilities, tsp->flags, tsp->waitCount, tsp->activeRank,
                  (tsp->flags & CM_SERVERFLAG_DOWN) ?  down : "up",
                  tsp->refCount);
         WriteFile(outputFile, output, (DWORD)strlen(output), &zilch, NULL);