lock_ReleaseRead(&cm_serverLock);
}
+void
+cm_ServerClearRPCStats(void) {
+ cm_server_t *tsp;
+ afs_uint16 port;
+
+ lock_ObtainRead(&cm_serverLock);
+ for (tsp = cm_allServersp; tsp; tsp = tsp->allNextp) {
+ switch (tsp->type) {
+ case CM_SERVER_VLDB:
+ port = htons(7003);
+ rx_ClearPeerRPCStats(opcode_VL_ProbeServer>>32, tsp->addr.sin_addr.s_addr, port);
+ break;
+ case CM_SERVER_FILE:
+ port = htons(7000);
+ rx_ClearPeerRPCStats(opcode_RXAFS_GetCapabilities>>32, tsp->addr.sin_addr.s_addr, port);
+ rx_ClearPeerRPCStats(opcode_RXAFS_GetTime>>32, tsp->addr.sin_addr.s_addr, port);
+ break;
+ }
+ }
+ lock_ReleaseRead(&cm_serverLock);
+}
+
/*
* lock_ObtainMutex must be held prior to calling
* this function.
{
afs_int32 code = 0; /* start with "success" */
struct rx_debugPeer tpeer;
+ struct rx_peer * rxPeer;
afs_uint16 port;
- afs_uint16 newRank;
+ afs_uint64 newRank;
+ afs_uint64 perfRank = 0;
+ afs_uint64 rtt = 0;
+ double log_rtt;
+
+ int isDown = (tsp->flags & CM_SERVERFLAG_DOWN);
+ void *peerRpcStats = NULL;
+ afs_uint64 opcode = 0;
switch(tsp->type) {
case CM_SERVER_VLDB:
port = htons(7003);
+ opcode = opcode_VL_ProbeServer;
break;
case CM_SERVER_FILE:
port = htons(7000);
+ opcode = opcode_RXAFS_GetCapabilities;
break;
default:
return -1;
}
- code = rx_GetLocalPeers(tsp->addr.sin_addr.s_addr, port, &tpeer);
+ cm_SetServerIPRank(tsp);
+
+ if (isDown) {
+ newRank = 0xFFFF;
+ } else {
+ /*
+ * There are three potential components to the ranking:
+ * 1. Any administrative set preference whether it be
+ * via "fs setserverprefs", registry or dns.
+ *
+ * 2. Network subnet mask comparison.
+ *
+ * 3. Performance data.
+ *
+ * If there is an administrative rank, that is the
+ * the primary factor. If not the primary factor
+ * is the network ranking.
+ */
+
+ code = rx_GetLocalPeers(tsp->addr.sin_addr.s_addr, port, &tpeer);
+ if (code == 0) {
+ peerRpcStats = rx_CopyPeerRPCStats(opcode, tsp->addr.sin_addr.s_addr, port);
+ if (peerRpcStats == NULL && tsp->type == CM_SERVER_FILE)
+ peerRpcStats = rx_CopyPeerRPCStats(opcode_RXAFS_GetTime, tsp->addr.sin_addr.s_addr, port);
+ if (peerRpcStats) {
+ afs_uint64 execTimeSum = _8THMSEC(RPCOpStat_ExecTimeSum(peerRpcStats));
+ afs_uint64 queueTimeSum = _8THMSEC(RPCOpStat_QTimeSum(peerRpcStats));
+ afs_uint64 numCalls = RPCOpStat_NumCalls(peerRpcStats);
+
+ if (numCalls > 0)
+ rtt = (execTimeSum - queueTimeSum) / numCalls;
+
+ rx_ReleaseRPCStats(peerRpcStats);
+ }
- /*check if rx_GetLocalPeers succeeded and if there is data for tsp */
- if(code == 0 && (tpeer.rtt == 0 && tpeer.rtt_dev == 0))
- code = -1;
+ if (rtt == 0 && tpeer.rtt) {
+ /* rtt is ms/8 */
+ rtt = tpeer.rtt;
+ }
+
+ if (rtt > 0) {
+ log_rtt = log(rtt);
+ perfRank += (6000 * log_rtt / 5000) * 5000;
+
+ if (tsp->type == CM_SERVER_FILE) {
+ /* give an edge to servers with high congestion windows */
+ perfRank -= (tpeer.cwind - 1)* 15;
+ }
+ }
+ }
- if(code == 0) {
- if((tsp->flags & CM_SERVERFLAG_PREF_SET))
- newRank = tsp->adminRank +
- ((int)(623 * log(tpeer.rtt) / 10) * 10 + 5);
- else /* rank has not been set by admin, derive rank from rtt */
- newRank = (int)(7200 * log(tpeer.rtt) / 5000) * 5000 + 5000;
+ if (tsp->adminRank) {
+ newRank = tsp->adminRank * 0.8;
+ newRank += tsp->ipRank * 0.2;
+ } else {
+ newRank = tsp->ipRank;
+ }
+ if (perfRank) {
+ newRank *= 0.9;
+ newRank += perfRank * 0.1;
+ }
+ newRank += (rand() & 0x000f); /* randomize */
- newRank += (rand() & 0x000f); /* randomize */
+ if (newRank > 0xFFFF)
+ osi_Log1(afsd_logp, "new server rank %I64u exceeds 0xFFFF", newRank);
- if (abs(newRank - tsp->ipRank) > 0xf) {
- tsp->ipRank = newRank;
+ /*
+ * If the ranking changes by more than the randomization
+ * factor, update the server reference lists.
+ */
+ if (abs(newRank - tsp->activeRank) > 0xf) {
+ tsp->activeRank = newRank;
lock_ReleaseMutex(&tsp->mx);
switch (tsp->type) {
afs_inet_ntoa_r(tsp->addr.sin_addr.S_un.S_addr, hoststr);
lock_ReleaseMutex(&tsp->mx);
- code = cm_ConnByServer(tsp, cm_rootUserp, &connp);
+ code = cm_ConnByServer(tsp, cm_rootUserp, FALSE, &connp);
if (code == 0) {
/* now call the appropriate ping call. Drop the timeout if
* the server is known to be down, so that we don't waste a
} /* got an unauthenticated connection to this server */
lock_ObtainMutex(&tsp->mx);
- if (code >= 0 || code == RXGEN_OPCODE) {
+ if (code >= 0 || code == RXGEN_OPCODE || code == RX_CALL_BUSY) {
/* mark server as up */
_InterlockedAnd(&tsp->flags, ~CM_SERVERFLAG_DOWN);
tsp->downTime = 0;
/* we currently handle 32-bits of capabilities */
- if (code != RXGEN_OPCODE && caps.Capabilities_len > 0) {
+ if (code != RXGEN_OPCODE && code != RX_CALL_BUSY &&
+ caps.Capabilities_len > 0) {
tsp->capabilities = caps.Capabilities_val[0];
xdr_free((xdrproc_t) xdr_Capabilities, &caps);
caps.Capabilities_len = 0;
}
}
}
+ cm_RankServer(tsp);
}
} else {
/* mark server as down */
}
}
}
+ cm_RankServer(tsp);
}
}
cm_server_t ** serversp, *tsp;
afs_uint32 isDown, wasDown;
afs_uint32 code;
- time_t start, *deltas;
+ time_t start;
char hoststr[16];
cm_InitReq(&req);
conns = (cm_conn_t **)malloc(maxconns * sizeof(cm_conn_t *));
rxconns = (struct rx_connection **)malloc(maxconns * sizeof(struct rx_connection *));
conntimer = (afs_int32 *)malloc(maxconns * sizeof (afs_int32));
- deltas = (time_t *)malloc(maxconns * sizeof (time_t));
results = (afs_int32 *)malloc(maxconns * sizeof (afs_int32));
serversp = (cm_server_t **)malloc(maxconns * sizeof(cm_server_t *));
caps = (Capabilities *)malloc(maxconns * sizeof(Capabilities));
lock_ReleaseMutex(&tsp->mx);
serversp[nconns] = tsp;
- code = cm_ConnByServer(tsp, cm_rootUserp, &conns[nconns]);
+ code = cm_ConnByServer(tsp, cm_rootUserp, FALSE, &conns[nconns]);
if (code) {
lock_ObtainRead(&cm_serverLock);
cm_PutServerNoLock(tsp);
lock_ObtainMutex(&tsp->mx);
wasDown = tsp->flags & CM_SERVERFLAG_DOWN;
- if (results[i] >= 0 || results[i] == RXGEN_OPCODE) {
+ if (results[i] >= 0 || results[i] == RXGEN_OPCODE ||
+ results[i] == RX_CALL_BUSY) {
/* mark server as up */
_InterlockedAnd(&tsp->flags, ~CM_SERVERFLAG_DOWN);
tsp->downTime = 0;
/* we currently handle 32-bits of capabilities */
- if (results[i] != RXGEN_OPCODE && caps[i].Capabilities_len > 0) {
+ if (results[i] != RXGEN_OPCODE && results[i] != RX_CALL_BUSY &&
+ caps[i].Capabilities_len > 0) {
tsp->capabilities = caps[i].Capabilities_val[0];
xdr_free((xdrproc_t) xdr_Capabilities, &caps[i]);
caps[i].Capabilities_len = 0;
}
}
}
+ cm_RankServer(tsp);
}
} else {
/* mark server as down */
}
}
}
+ cm_RankServer(tsp);
}
}
lock_ReleaseMutex(&tsp->mx);
serversp[nconns] = tsp;
- code = cm_ConnByServer(tsp, cm_rootUserp, &conns[nconns]);
+ code = cm_ConnByServer(tsp, cm_rootUserp, FALSE, &conns[nconns]);
if (code) {
lock_ObtainRead(&cm_serverLock);
cm_PutServerNoLock(tsp);
lock_ObtainMutex(&tsp->mx);
wasDown = tsp->flags & CM_SERVERFLAG_DOWN;
- if (results[i] >= 0) {
+ if (results[i] >= 0 || results[i] == RX_CALL_BUSY) {
/* mark server as up */
_InterlockedAnd(&tsp->flags, ~CM_SERVERFLAG_DOWN);
tsp->downTime = 0;
osi_LogSaveString(afsd_logp, hoststr),
tsp->type == CM_SERVER_VLDB ? "vldb" : "file",
tsp->capabilities);
+ if (wasDown)
+ cm_RankServer(tsp);
} else {
/* mark server as down */
if (!(tsp->flags & CM_SERVERFLAG_DOWN)) {
osi_LogSaveString(afsd_logp, hoststr),
tsp->type == CM_SERVER_VLDB ? "vldb" : "file",
tsp->capabilities);
+ if (!wasDown)
+ cm_RankServer(tsp);
}
if (tsp->waitCount == 0)
free(conns);
free(rxconns);
free(conntimer);
- free(deltas);
free(results);
free(serversp);
free(caps);
lock_ReleaseMutex(&serverp->mx);
}
-void cm_SetServerPrefs(cm_server_t * serverp)
+void cm_SetServerIPRank(cm_server_t * serverp)
{
unsigned long serverAddr; /* in host byte order */
unsigned long myAddr, myNet, mySubnet;/* in host byte order */
unsigned long netMask;
int i;
long code;
- int writeLock = 0;
lock_ObtainRead(&cm_syscfgLock);
if (cm_LanAdapterChangeDetected) {
lock_ConvertRToW(&cm_syscfgLock);
- writeLock = 1;
if (cm_LanAdapterChangeDetected) {
/* get network related info */
cm_noIPAddr = CM_MAXINTERFACE_ADDR;
{
if ( (serverAddr & cm_SubnetMask[i]) == mySubnet)
{
- if ( serverAddr == myAddr )
+ if ( serverAddr == myAddr ) {
serverp->ipRank = min(serverp->ipRank,
CM_IPRANK_TOP);/* same machine */
- else serverp->ipRank = min(serverp->ipRank,
- CM_IPRANK_HI); /* same subnet */
- }
- else serverp->ipRank = min(serverp->ipRank,CM_IPRANK_MED);
- /* same net */
+ } else {
+ serverp->ipRank = min(serverp->ipRank,
+ CM_IPRANK_HI); /* same subnet */
+ }
+ } else {
+ serverp->ipRank = min(serverp->ipRank, CM_IPRANK_MED); /* same net */
+ }
}
} /* and of for loop */
-
- /* random between 0..15*/
- serverp->ipRank += (rand() % 0x000f);
lock_ReleaseRead(&cm_syscfgLock);
}
lock_InitializeMutex(&tsp->mx, "cm_server_t mutex", LOCK_HIERARCHY_SERVER);
tsp->addr = *socketp;
- cm_SetServerPrefs(tsp);
-
tsp->allNextp = cm_allServersp;
cm_allServersp = tsp;
}
lock_ReleaseWrite(&cm_serverLock); /* release server lock */
- if (!(flags & CM_FLAG_NOPROBE) && tsp) {
- _InterlockedOr(&tsp->flags, CM_SERVERFLAG_DOWN); /* assume down; ping will mark up if available */
- cm_PingServer(tsp); /* Obtain Capabilities and check up/down state */
- }
+ if (tsp) {
+ if (!(flags & CM_FLAG_NOPROBE)) {
+ _InterlockedOr(&tsp->flags, CM_SERVERFLAG_DOWN); /* assume down; ping will mark up if available */
+ lock_ObtainMutex(&tsp->mx);
+ cm_RankServer(tsp);
+ lock_ReleaseMutex(&tsp->mx);
+ cm_PingServer(tsp); /* Obtain Capabilities and check up/down state */
+ } else {
+ pthread_t phandle;
+ pthread_attr_t tattr;
+ int pstatus;
+
+ /* Probe the server in the background to determine if it is up or down */
+ pthread_attr_init(&tattr);
+ pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
+
+ lock_ObtainMutex(&tsp->mx);
+ cm_RankServer(tsp);
+ lock_ReleaseMutex(&tsp->mx);
+ pstatus = pthread_create(&phandle, &tattr, cm_PingServer, tsp);
+ pthread_attr_destroy(&tattr);
+ }
+ }
return tsp;
}
{
cm_server_t *tsp;
- if (locked)
+ if (!locked)
lock_ObtainRead(&cm_serverLock);
for (tsp = cm_allServersp; tsp; tsp = tsp->allNextp) {
return refCount;
}
+afs_uint32
+cm_ServerListSize(cm_serverRef_t* serversp)
+{
+ afs_uint32 count = 0;
+ cm_serverRef_t *tsrp;
+ lock_ObtainRead(&cm_serverLock);
+ for (tsrp = serversp; tsrp; tsrp=tsrp->next) {
+ if (tsrp->status == srv_deleted)
+ continue;
+ count++;
+ }
+ lock_ReleaseRead(&cm_serverLock);
+ return count;
+}
LONG_PTR cm_ChecksumServerList(cm_serverRef_t *serversp)
{
void cm_InsertServerList(cm_serverRef_t** list, cm_serverRef_t* element)
{
cm_serverRef_t *current;
- unsigned short ipRank;
+ unsigned short rank;
lock_ObtainWrite(&cm_serverLock);
/*
goto done;
}
- ipRank = element->server->ipRank;
+ rank = element->server->activeRank;
/* insertion at the beginning of the list */
- if ((*list)->server->ipRank > ipRank)
+ if ((*list)->server->activeRank > rank)
{
element->next = *list;
*list = element;
/* find appropriate place to insert */
for ( current = *list; current->next; current = current->next)
{
- if ( current->next->server->ipRank > ipRank )
+ if ( current->next->server->activeRank > rank )
break;
}
element->next = current->next;
}
/* count the number of servers with the lowest rank */
- lowestRank = tsrp->server->ipRank;
+ lowestRank = tsrp->server->activeRank;
for ( count=1, tsrp=tsrp->next; tsrp; tsrp=tsrp->next)
{
- if ( tsrp->server->ipRank != lowestRank)
+ if ( tsrp->server->activeRank != lowestRank)
break;
else
count++;
}
}
+int cm_IsServerListEmpty(cm_serverRef_t *serversp)
+{
+ cm_serverRef_t *tsrp;
+ int allDeleted = 1;
+
+ if (serversp == NULL)
+ return CM_ERROR_EMPTY;
+
+ lock_ObtainRead(&cm_serverLock);
+ for (tsrp = serversp; tsrp; tsrp=tsrp->next) {
+ if (tsrp->status == srv_deleted)
+ continue;
+ allDeleted = 0;
+ break;
+ }
+ lock_ReleaseRead(&cm_serverLock);
+
+ return ( allDeleted ? CM_ERROR_EMPTY : 0 );
+}
+
void cm_FreeServerList(cm_serverRef_t** list, afs_uint32 flags)
{
cm_serverRef_t **current;
"flags=0x%x waitCount=%u rank=%u downTime=\"%s\" refCount=%u\r\n",
cookie, tsp, tsp->cellp ? tsp->cellp->name : "", hoststr,
ntohs(tsp->addr.sin_port), uuidstr, type,
- tsp->capabilities, tsp->flags, tsp->waitCount, tsp->ipRank,
+ tsp->capabilities, tsp->flags, tsp->waitCount, tsp->activeRank,
(tsp->flags & CM_SERVERFLAG_DOWN) ? down : "up",
tsp->refCount);
WriteFile(outputFile, output, (DWORD)strlen(output), &zilch, NULL);