rx-tq-busy-20070516
[openafs.git] / src / rx / rx.c
index 0f4defc..df35abc 100644 (file)
@@ -383,13 +383,9 @@ rx_InitHost(u_int host, u_int port)
 #endif /* KERNEL */
     char *htable, *ptable;
     int tmp_status;
-
-#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
-    __djgpp_set_quiet_socket(1);
-#endif
-
+    
     SPLVAR;
-
+    
     INIT_PTHREAD_LOCKS;
     LOCK_RX_INIT;
     if (rxinit_status == 0) {
@@ -404,7 +400,7 @@ rx_InitHost(u_int host, u_int port)
     if (afs_winsockInit() < 0)
        return -1;
 #endif
-
+    
 #ifndef KERNEL
     /*
      * Initialize anything necessary to provide a non-premptive threading
@@ -412,10 +408,10 @@ rx_InitHost(u_int host, u_int port)
      */
     rxi_InitializeThreadSupport();
 #endif
-
+    
     /* Allocate and initialize a socket for client and perhaps server
      * connections. */
-
+    
     rx_socket = rxi_GetHostUDPSocket(host, (u_short) port);
     if (rx_socket == OSI_NULLSOCKET) {
        UNLOCK_RX_INIT;
@@ -1043,10 +1039,28 @@ rx_GetConnection(register struct rx_connection *conn)
     USERPRI;
 }
 
+/* Wait for the transmit queue to no longer be busy. 
+ * requires the call->lock to be held */
+static void rxi_WaitforTQBusy(struct rx_call *call) {
+    while (call->flags & RX_CALL_TQ_BUSY) {
+       call->flags |= RX_CALL_TQ_WAIT;
+       call->tqWaiters++;
+#ifdef RX_ENABLE_LOCKS
+       osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
+       CV_WAIT(&call->cv_tq, &call->lock);
+#else /* RX_ENABLE_LOCKS */
+       osi_rxSleep(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+       call->tqWaiters--;
+       if (call->tqWaiters == 0) {
+           call->flags &= ~RX_CALL_TQ_WAIT;
+       }
+    }
+}
 /* Start a new rx remote procedure call, on the specified connection.
  * If wait is set to 1, wait for a free call channel; otherwise return
  * 0.  Maxtime gives the maximum number of seconds this call may take,
- * after rx_MakeCall returns.  After this time interval, a call to any
+ * after rx_NewCall returns.  After this time interval, a call to any
  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
  * For fine grain locking, we hold the conn_call_lock in order to 
  * to ensure that we don't get signalle after we found a call in an active
@@ -1061,7 +1075,7 @@ rx_NewCall(register struct rx_connection *conn)
     SPLVAR;
 
     clock_NewTime();
-    dpf(("rx_MakeCall(conn %x)\n", conn));
+    dpf(("rx_NewCall(conn %x)\n", conn));
 
     NETPRI;
     clock_GetTime(&queueTime);
@@ -1168,20 +1182,7 @@ rx_NewCall(register struct rx_connection *conn)
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
     /* Now, if TQ wasn't cleared earlier, do it now. */
     MUTEX_ENTER(&call->lock);
-    while (call->flags & RX_CALL_TQ_BUSY) {
-       call->flags |= RX_CALL_TQ_WAIT;
-       call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-       osirx_AssertMine(&call->lock, "rxi_Start lock4");
-       CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-       osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-       call->tqWaiters--;
-       if (call->tqWaiters == 0) {
-           call->flags &= ~RX_CALL_TQ_WAIT;
-       }
-    }
+    rxi_WaitforTQBusy(call);
     if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
@@ -1189,6 +1190,7 @@ rx_NewCall(register struct rx_connection *conn)
     MUTEX_EXIT(&call->lock);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
+    dpf(("rx_NewCall(call %x)\n", call));
     return call;
 }
 
@@ -1259,9 +1261,10 @@ rxi_SetCallNumberVector(register struct rx_connection *aconn,
                          service name might be used for probing for
                          statistics) */
 struct rx_service *
-rx_NewService(u_short port, u_short serviceId, char *serviceName,
-             struct rx_securityClass **securityObjects, int nSecurityObjects,
-             afs_int32(*serviceProc) (struct rx_call * acall))
+rx_NewServiceHost(afs_uint32 host, u_short port, u_short serviceId, 
+                 char *serviceName, struct rx_securityClass **securityObjects,
+                 int nSecurityObjects, 
+                 afs_int32(*serviceProc) (struct rx_call * acall))
 {
     osi_socket socket = OSI_NULLSOCKET;
     register struct rx_service *tservice;
@@ -1292,7 +1295,7 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
     for (i = 0; i < RX_MAX_SERVICES; i++) {
        register struct rx_service *service = rx_services[i];
        if (service) {
-           if (port == service->servicePort) {
+           if (port == service->servicePort && host == service->serviceHost) {
                if (service->serviceId == serviceId) {
                    /* The identical service has already been
                     * installed; if the caller was intending to
@@ -1322,6 +1325,7 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
            }
            service = tservice;
            service->socket = socket;
+           service->serviceHost = host;
            service->servicePort = port;
            service->serviceId = serviceId;
            service->serviceName = serviceName;
@@ -1345,6 +1349,14 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
     return 0;
 }
 
+struct rx_service *
+rx_NewService(u_short port, u_short serviceId, char *serviceName,
+             struct rx_securityClass **securityObjects, int nSecurityObjects,
+             afs_int32(*serviceProc) (struct rx_call * acall))
+{
+    return rx_NewServiceHost(htonl(INADDR_ANY), port, serviceId, serviceName, securityObjects, nSecurityObjects, serviceProc);
+}
+
 /* Generic request processing loop. This routine should be called
  * by the implementation dependent rx_ServerProc. If socketp is
  * non-null, it will be set to the file descriptor that this thread
@@ -2015,6 +2027,10 @@ rx_Finalize(void)
     }
     rxi_flushtrace();
 
+#ifdef AFS_NT40_ENV
+    afs_winsockCleanup();
+#endif
+
     rxinit_status = 1;
     UNLOCK_RX_INIT;
 }
@@ -2070,6 +2086,8 @@ rxi_NewCall(register struct rx_connection *conn, register int channel)
     register struct rx_call *nxp;      /* Next call pointer, for queue_Scan */
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
+    dpf(("rxi_NewCall(conn %x, channel %d)\n", conn, channel));
+
     /* Grab an existing call structure, or allocate a new one.
      * Existing call structures are assumed to have been left reset by
      * rxi_FreeCall */
@@ -2252,7 +2270,7 @@ rxi_Free(void *addr, register size_t size)
  * structure hanging off a connection structure */
 struct rx_peer *
 rxi_FindPeer(register afs_uint32 host, register u_short port,
-            struct rx_peer *origPeer, int create)
+             struct rx_peer *origPeer, int create)
 {
     register struct rx_peer *pp;
     int hashIndex;
@@ -2260,12 +2278,12 @@ rxi_FindPeer(register afs_uint32 host, register u_short port,
     MUTEX_ENTER(&rx_peerHashTable_lock);
     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
        if ((pp->host == host) && (pp->port == port))
-           break;
+            break;
     }
     if (!pp) {
-       if (create) {
-           pp = rxi_AllocPeer();       /* This bzero's *pp */
-           pp->host = host;    /* set here or in InitPeerParams is zero */
+        if (create) {
+            pp = rxi_AllocPeer();       /* This bzero's *pp */
+           pp->host = host;    /* set here or in InitPeerParams is zero */
            pp->port = port;
            MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
            queue_Init(&pp->congestionQueue);
@@ -2435,7 +2453,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
        ? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
-        np->header.serial, packetType, ntohl(host), htohs(port), np->header.serviceId,
+        np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
         np->header.epoch, np->header.cid, np->header.callNumber,
         np->header.seq, np->header.flags, np));
 #endif
@@ -3342,9 +3360,11 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
 
            /* We need to send an ack of the packet is out of sequence, 
             * or if an ack was requested by the peer. */
-           if (seq != prev + 1 || missing || (flags & RX_REQUEST_ACK)) {
+           if (seq != prev + 1 || missing) {
                ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
-           }
+           } else if (flags & RX_REQUEST_ACK) {
+               ackNeeded = RX_ACK_REQUESTED;
+            }
 
            /* Acknowledge the last packet for each call */
            if (flags & RX_LAST_PACKET) {
@@ -3611,6 +3631,9 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
        if (serial
            && (tp->header.serial == serial || tp->firstSerial == serial))
            rxi_ComputePeerNetStats(call, tp, ap, np);
+       if (!(tp->flags & RX_PKTFLAG_ACKED)) {
+           newAckCount++;
+       }
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
        /* XXX Hack. Because we have to release the global rx lock when sending
         * packets (osi_NetSend) we drop all acks while we're traversing the tq
@@ -3621,9 +3644,6 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
         * set the ack bits in the packets and have rxi_Start remove the packets
         * when it's done transmitting.
         */
-       if (!(tp->flags & RX_PKTFLAG_ACKED)) {
-           newAckCount++;
-       }
        if (call->flags & RX_CALL_TQ_BUSY) {
 #ifdef RX_ENABLE_LOCKS
            tp->flags |= RX_PKTFLAG_ACKED;
@@ -3757,10 +3777,11 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
         * be unable to accept packets of the size that prior AFS versions would
         * send without asking.  */
        if (peer->maxMTU != tSize) {
+           if (peer->maxMTU > tSize) /* possible cong., maxMTU decreased */
+               peer->congestSeq++;
            peer->maxMTU = tSize;
            peer->MTU = MIN(tSize, peer->MTU);
            call->MTU = MIN(call->MTU, tSize);
-           peer->congestSeq++;
        }
 
        if (np->length == rx_AckDataSize(ap->nAcks) + 3 * sizeof(afs_int32)) {
@@ -3811,9 +3832,9 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
                          sizeof(afs_int32), &tSize);
            maxDgramPackets = (afs_uint32) ntohl(tSize);
            maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
-           maxDgramPackets =
-               MIN(maxDgramPackets, (int)(peer->ifDgramPackets));
-           maxDgramPackets = MIN(maxDgramPackets, tSize);
+           maxDgramPackets = MIN(maxDgramPackets, peer->ifDgramPackets);
+           if (peer->natMTU < peer->ifMTU)
+               maxDgramPackets = MIN(maxDgramPackets, rxi_AdjustDgramPackets(1, peer->natMTU));
            if (maxDgramPackets > 1) {
                peer->maxDgramPackets = maxDgramPackets;
                call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
@@ -3875,19 +3896,7 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
            return np;
        }
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
-       while (call->flags & RX_CALL_TQ_BUSY) {
-           call->flags |= RX_CALL_TQ_WAIT;
-           call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-           osirx_AssertMine(&call->lock, "rxi_Start lock2");
-           CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-           osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-           call->tqWaiters--;
-           if (call->tqWaiters == 0)
-               call->flags &= ~RX_CALL_TQ_WAIT;
-       }
+       rxi_WaitforTQBusy(call);
        MUTEX_ENTER(&peer->peer_lock);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
@@ -4452,6 +4461,8 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
     register struct rx_peer *peer;
     struct rx_packet *packet;
 
+    dpf(("rxi_ResetCall(call %x, newcall %d)\n", call, newcall));
+
     /* Notify anyone who is waiting for asynchronous packet arrival */
     if (call->arrivalProc) {
        (*call->arrivalProc) (call, call->arrivalProcHandle,
@@ -5093,19 +5104,7 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
            return;
        }
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
-       while (call->flags & RX_CALL_TQ_BUSY) {
-           call->flags |= RX_CALL_TQ_WAIT;
-           call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-           osirx_AssertMine(&call->lock, "rxi_Start lock1");
-           CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-           osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-           call->tqWaiters--;
-           if (call->tqWaiters == 0)
-               call->flags &= ~RX_CALL_TQ_WAIT;
-       }
+       rxi_WaitforTQBusy(call);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
        call->flags |= RX_CALL_FAST_RECOVER;
@@ -5620,6 +5619,7 @@ rxi_SendDelayedCallAbort(struct rxevent *event, register struct rx_call *call,
                            (char *)&error, sizeof(error), 0);
        rxi_FreePacket(packet);
     }
+    CALL_RELE(call, RX_CALL_REFCOUNT_ABORT);
     MUTEX_EXIT(&call->lock);
 }
 
@@ -6320,17 +6320,19 @@ MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
              void *outputData, size_t outputLength)
 {
     static afs_int32 counter = 100;
-    time_t endTime;
+    time_t waitTime, waitCount, startTime, endTime;
     struct rx_header theader;
     char tbuffer[1500];
     register afs_int32 code;
-    struct timeval tv;
+    struct timeval tv_now, tv_wake, tv_delta;
     struct sockaddr_in taddr, faddr;
     int faddrLen;
     fd_set imask;
     register char *tp;
 
-    endTime = time(0) + 20;    /* try for 20 seconds */
+    startTime = time(0);
+    waitTime = 1;
+    waitCount = 5;
     LOCK_RX_DEBUG;
     counter++;
     UNLOCK_RX_DEBUG;
@@ -6359,29 +6361,54 @@ MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
                   (struct sockaddr *)&taddr, sizeof(struct sockaddr_in));
 
        /* see if there's a packet available */
-       FD_ZERO(&imask);
-       FD_SET(socket, &imask);
-       tv.tv_sec = 1;
-       tv.tv_usec = 0;
-       code = select((int)(socket + 1), &imask, 0, 0, &tv);
-       if (code == 1 && FD_ISSET(socket, &imask)) {
-           /* now receive a packet */
-           faddrLen = sizeof(struct sockaddr_in);
-           code =
-               recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
-                        (struct sockaddr *)&faddr, &faddrLen);
-
-           if (code > 0) {
-               memcpy(&theader, tbuffer, sizeof(struct rx_header));
-               if (counter == ntohl(theader.callNumber))
-                   break;
+       gettimeofday(&tv_wake,0);
+       tv_wake.tv_sec += waitTime;
+       for (;;) {
+           FD_ZERO(&imask);
+           FD_SET(socket, &imask);
+           tv_delta.tv_sec = tv_wake.tv_sec;
+           tv_delta.tv_usec = tv_wake.tv_usec;
+           gettimeofday(&tv_now, 0);
+           
+           if (tv_delta.tv_usec < tv_now.tv_usec) {
+               /* borrow */
+               tv_delta.tv_usec += 1000000;
+               tv_delta.tv_sec--;
            }
+           tv_delta.tv_usec -= tv_now.tv_usec;
+           
+           if (tv_delta.tv_sec < tv_now.tv_sec) {
+               /* time expired */
+               break;
+           }
+           tv_delta.tv_sec -= tv_now.tv_sec;
+           
+           code = select(socket + 1, &imask, 0, 0, &tv_delta);
+           if (code == 1 && FD_ISSET(socket, &imask)) {
+               /* now receive a packet */
+               faddrLen = sizeof(struct sockaddr_in);
+               code =
+                   recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
+                            (struct sockaddr *)&faddr, &faddrLen);
+               
+               if (code > 0) {
+                   memcpy(&theader, tbuffer, sizeof(struct rx_header));
+                   if (counter == ntohl(theader.callNumber))
+                       goto success;
+                   continue;
+               }
+           }
+           break;
        }
 
        /* see if we've timed out */
-       if (endTime < time(0))
-           return -1;
+       if (!--waitCount) {
+            return -1;
+       }
+       waitTime <<= 1;
     }
+    
+ success:
     code -= sizeof(struct rx_header);
     if (code > outputLength)
        code = outputLength;
@@ -7738,3 +7765,31 @@ rx_RxStatUserOk(struct rx_call *call)
        return 0;
     return rxi_rxstat_userok(call);
 }
+
+#ifdef AFS_NT40_ENV
+/*
+ * DllMain() -- Entry-point function called by the DllMainCRTStartup()
+ *     function in the MSVC runtime DLL (msvcrt.dll).
+ *
+ *     Note: the system serializes calls to this function.
+ */
+BOOL WINAPI
+DllMain(HINSTANCE dllInstHandle,       /* instance handle for this DLL module */
+       DWORD reason,                   /* reason function is being called */
+       LPVOID reserved)                /* reserved for future use */
+{
+    switch (reason) {
+    case DLL_PROCESS_ATTACH:
+       /* library is being attached to a process */
+       INIT_PTHREAD_LOCKS;
+       return TRUE;
+
+    case DLL_PROCESS_DETACH:
+       return TRUE;
+
+    default:
+       return FALSE;
+    }
+}
+#endif
+