windows-rx-debug-tid-20050917
[openafs.git] / src / rx / rx.c
index 19b14fb..92b3457 100644 (file)
@@ -48,13 +48,13 @@ RCSID
 #include "sys/debug.h"
 #endif
 #include "afsint.h"
-#ifdef AFS_ALPHA_ENV
+#ifdef AFS_OSF_ENV
 #undef kmem_alloc
 #undef kmem_free
 #undef mem_alloc
 #undef mem_free
 #undef register
-#endif /* AFS_ALPHA_ENV */
+#endif /* AFS_OSF_ENV */
 #else /* !UKERNEL */
 #include "afs/sysincludes.h"
 #include "afsincludes.h"
@@ -153,7 +153,6 @@ static unsigned int rxi_rpc_process_stat_cnt;
  */
 
 extern pthread_mutex_t rx_stats_mutex;
-extern pthread_mutex_t rxkad_stats_mutex;
 extern pthread_mutex_t des_init_mutex;
 extern pthread_mutex_t des_random_mutex;
 extern pthread_mutex_t rx_clock_mutex;
@@ -207,8 +206,6 @@ rxi_InitPthread(void)
           (&rxkad_client_uid_mutex, (const pthread_mutexattr_t *)0) == 0);
     assert(pthread_mutex_init
           (&rxkad_random_mutex, (const pthread_mutexattr_t *)0) == 0);
-    assert(pthread_mutex_init
-          (&rxkad_stats_mutex, (const pthread_mutexattr_t *)0) == 0);
     assert(pthread_mutex_init(&rx_debug_mutex, (const pthread_mutexattr_t *)0)
           == 0);
 
@@ -217,6 +214,9 @@ rxi_InitPthread(void)
     assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0)
           == 0);
     assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
+    assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
+    rxkad_global_stats_init();
 }
 
 pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
@@ -461,14 +461,18 @@ rx_InitHost(u_int host, u_int port)
 
     /* Malloc up a bunch of packets & buffers */
     rx_nFreePackets = 0;
-    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;  /* fudge */
     queue_Init(&rx_freePacketQueue);
     rxi_NeedMorePackets = FALSE;
+#ifdef RX_ENABLE_TSFPQ
+    rx_nPackets = 0;   /* in TSFPQ version, rx_nPackets is managed by rxi_MorePackets* */
+    rxi_MorePacketsTSFPQ(rx_extraPackets + RX_MAX_QUOTA + 2, RX_TS_FPQ_FLUSH_GLOBAL, 0);
+#else /* RX_ENABLE_TSFPQ */
+    rx_nPackets = rx_extraPackets + RX_MAX_QUOTA + 2;  /* fudge */
     rxi_MorePackets(rx_nPackets);
+#endif /* RX_ENABLE_TSFPQ */
     rx_CheckPackets();
 
     NETPRI;
-    AFS_RXGLOCK();
 
     clock_Init();
 
@@ -534,7 +538,6 @@ rx_InitHost(u_int host, u_int port)
      * implementation environment--kernel or user space) */
     rxi_StartListener();
 
-    AFS_RXGUNLOCK();
     USERPRI;
     tmp_status = rxinit_status = 0;
     UNLOCK_RX_INIT;
@@ -655,6 +658,18 @@ rxi_StartServerProcs(int nExistingProcs)
 }
 #endif /* KERNEL */
 
+#ifdef AFS_NT40_ENV
+/* This routine is only required on Windows */
+void
+rx_StartClientThread(void)
+{
+#ifdef AFS_PTHREAD_ENV
+    int pid;
+    pid = (int) pthread_self();
+#endif /* AFS_PTHREAD_ENV */
+}
+#endif /* AFS_NT40_ENV */
+
 /* This routine must be called if any services are exported.  If the
  * donateMe flag is set, the calling process is donated to the server
  * process pool */
@@ -667,7 +682,6 @@ rx_StartServer(int donateMe)
     clock_NewTime();
 
     NETPRI;
-    AFS_RXGLOCK();
     /* Start server processes, if necessary (exact function is dependent
      * on the implementation environment--kernel or user space).  DonateMe
      * will be 1 if there is 1 pre-existing proc, i.e. this one.  In this
@@ -694,7 +708,6 @@ rx_StartServer(int donateMe)
     /* Turn on reaping of idle server connections */
     rxi_ReapConnections();
 
-    AFS_RXGUNLOCK();
     USERPRI;
 
     if (donateMe) {
@@ -717,6 +730,12 @@ rx_StartServer(int donateMe)
 #endif /* AFS_NT40_ENV */
        rx_ServerProc();        /* Never returns */
     }
+#ifdef RX_ENABLE_TSFPQ
+    /* no use leaving packets around in this thread's local queue if
+     * it isn't getting donated to the server thread pool. 
+     */
+    rxi_FlushLocalPacketsTSFPQ();
+#endif /* RX_ENABLE_TSFPQ */
     return;
 }
 
@@ -746,7 +765,6 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
     CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
 #endif
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&rx_connHashTable_lock);
     cid = (rx_nextCid += RX_MAXCALLS);
     conn->type = RX_CLIENT_CONNECTION;
@@ -780,7 +798,6 @@ rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
     MUTEX_EXIT(&rx_stats_mutex);
 
     MUTEX_EXIT(&rx_connHashTable_lock);
-    AFS_RXGUNLOCK();
     USERPRI;
     return conn;
 }
@@ -1010,9 +1027,7 @@ rx_DestroyConnection(register struct rx_connection *conn)
     SPLVAR;
 
     NETPRI;
-    AFS_RXGLOCK();
     rxi_DestroyConnection(conn);
-    AFS_RXGUNLOCK();
     USERPRI;
 }
 
@@ -1022,11 +1037,9 @@ rx_GetConnection(register struct rx_connection *conn)
     SPLVAR;
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&conn->conn_data_lock);
     conn->refCount++;
     MUTEX_EXIT(&conn->conn_data_lock);
-    AFS_RXGUNLOCK();
     USERPRI;
 }
 
@@ -1052,7 +1065,6 @@ rx_NewCall(register struct rx_connection *conn)
 
     NETPRI;
     clock_GetTime(&queueTime);
-    AFS_RXGLOCK();
     MUTEX_ENTER(&conn->conn_call_lock);
 
     /*
@@ -1060,14 +1072,32 @@ rx_NewCall(register struct rx_connection *conn)
      * If so, let them go first to avoid starving them.
      * This is a fairly simple scheme, and might not be
      * a complete solution for large numbers of waiters.
+     * 
+     * makeCallWaiters keeps track of the number of 
+     * threads waiting to make calls and the 
+     * RX_CONN_MAKECALL_WAITING flag bit is used to 
+     * indicate that there are indeed calls waiting.
+     * The flag is set when the waiter is incremented.
+     * It is only cleared in rx_EndCall when 
+     * makeCallWaiters is 0.  This prevents us from 
+     * accidently destroying the connection while it
+     * is potentially about to be used.
      */
+    MUTEX_ENTER(&conn->conn_data_lock);
     if (conn->makeCallWaiters) {
+       conn->flags |= RX_CONN_MAKECALL_WAITING;
+       conn->makeCallWaiters++;
+        MUTEX_EXIT(&conn->conn_data_lock);
+
 #ifdef RX_ENABLE_LOCKS
-       CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
+        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
 #else
-       osi_rxSleep(conn);
+        osi_rxSleep(conn);
 #endif
-    }
+       MUTEX_ENTER(&conn->conn_data_lock);
+       conn->makeCallWaiters--;
+    } 
+    MUTEX_EXIT(&conn->conn_data_lock);
 
     for (;;) {
        for (i = 0; i < RX_MAXCALLS; i++) {
@@ -1090,15 +1120,17 @@ rx_NewCall(register struct rx_connection *conn)
        }
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_MAKECALL_WAITING;
+       conn->makeCallWaiters++;
        MUTEX_EXIT(&conn->conn_data_lock);
 
-       conn->makeCallWaiters++;
 #ifdef RX_ENABLE_LOCKS
        CV_WAIT(&conn->conn_call_cv, &conn->conn_call_lock);
 #else
        osi_rxSleep(conn);
 #endif
+       MUTEX_ENTER(&conn->conn_data_lock);
        conn->makeCallWaiters--;
+       MUTEX_EXIT(&conn->conn_data_lock);
     }
     /*
      * Wake up anyone else who might be giving us a chance to
@@ -1114,8 +1146,12 @@ rx_NewCall(register struct rx_connection *conn)
 
     /* Client is initially in send mode */
     call->state = RX_STATE_ACTIVE;
-    call->mode = RX_MODE_SENDING;
-
+    call->error = conn->error;
+    if (call->error)
+       call->mode = RX_MODE_ERROR;
+    else
+       call->mode = RX_MODE_SENDING;
+    
     /* remember start time for call in case we have hard dead time limit */
     call->queueTime = queueTime;
     clock_GetTime(&call->startTime);
@@ -1127,27 +1163,30 @@ rx_NewCall(register struct rx_connection *conn)
 
     MUTEX_EXIT(&call->lock);
     MUTEX_EXIT(&conn->conn_call_lock);
-    AFS_RXGUNLOCK();
     USERPRI;
 
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
     /* Now, if TQ wasn't cleared earlier, do it now. */
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
     while (call->flags & RX_CALL_TQ_BUSY) {
        call->flags |= RX_CALL_TQ_WAIT;
+       call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
+       osirx_AssertMine(&call->lock, "rxi_Start lock4");
        CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
        osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+       call->tqWaiters--;
+       if (call->tqWaiters == 0) {
+           call->flags &= ~RX_CALL_TQ_WAIT;
+       }
     }
     if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
     }
     MUTEX_EXIT(&call->lock);
-    AFS_RXGUNLOCK();
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
     return call;
@@ -1250,7 +1289,6 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
 
     tservice = rxi_AllocService();
     NETPRI;
-    AFS_RXGLOCK();
     for (i = 0; i < RX_MAX_SERVICES; i++) {
        register struct rx_service *service = rx_services[i];
        if (service) {
@@ -1263,7 +1301,6 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
                    (osi_Msg
                     "rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n",
                     serviceName, serviceId, service->serviceName);
-                   AFS_RXGUNLOCK();
                    USERPRI;
                    rxi_FreeService(tservice);
                    return service;
@@ -1278,7 +1315,6 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
                 * service on same port) get a new one */
                socket = rxi_GetHostUDPSocket(htonl(INADDR_ANY), port);
                if (socket == OSI_NULLSOCKET) {
-                   AFS_RXGUNLOCK();
                    USERPRI;
                    rxi_FreeService(tservice);
                    return 0;
@@ -1298,12 +1334,10 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
            service->executeRequestProc = serviceProc;
            service->checkReach = 0;
            rx_services[i] = service;   /* not visible until now */
-           AFS_RXGUNLOCK();
            USERPRI;
            return service;
        }
     }
-    AFS_RXGUNLOCK();
     USERPRI;
     rxi_FreeService(tservice);
     (osi_Msg "rx_NewService: cannot support > %d services\n",
@@ -1343,14 +1377,12 @@ rxi_ServerProc(int threadID, struct rx_call *newcall, osi_socket * socketp)
            SPLVAR;
 
            NETPRI;
-           AFS_RXGLOCK();
            MUTEX_ENTER(&call->lock);
 
            rxi_CallError(call, RX_RESTARTING);
            rxi_SendCallAbort(call, (struct rx_packet *)0, 0, 0);
 
            MUTEX_EXIT(&call->lock);
-           AFS_RXGUNLOCK();
            USERPRI;
        }
 #ifdef KERNEL
@@ -1392,7 +1424,6 @@ rx_WakeupServerProcs(void)
     SPLVAR;
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&rx_serverPool_lock);
 
 #ifdef RX_ENABLE_LOCKS
@@ -1420,7 +1451,6 @@ rx_WakeupServerProcs(void)
 #endif /* RX_ENABLE_LOCKS */
     }
     MUTEX_EXIT(&rx_serverPool_lock);
-    AFS_RXGUNLOCK();
     USERPRI;
 }
 
@@ -1631,7 +1661,6 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
     SPLVAR;
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&freeSQEList_lock);
 
     if ((sq = rx_FreeSQEList)) {
@@ -1726,7 +1755,6 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
            osi_rxSleep(sq);
 #ifdef KERNEL
            if (afs_termState == AFSOP_STOP_RXCALLBACK) {
-               AFS_RXGUNLOCK();
                USERPRI;
                rxi_Free(sq, sizeof(struct rx_serverQueueEntry));
                return (struct rx_call *)0;
@@ -1767,7 +1795,6 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
        dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
     }
 
-    AFS_RXGUNLOCK();
     USERPRI;
 
     return call;
@@ -1815,7 +1842,6 @@ rx_EndCall(register struct rx_call *call, afs_int32 rc)
     dpf(("rx_EndCall(call %x)\n", call));
 
     NETPRI;
-    AFS_RXGLOCK();
     MUTEX_ENTER(&call->lock);
 
     if (rc == 0 && call->error == 0) {
@@ -1879,6 +1905,9 @@ rx_EndCall(register struct rx_call *call, afs_int32 rc)
         * rx_NewCall is in a stable state. Otherwise, rx_NewCall may
         * have checked this call, found it active and by the time it
         * goes to sleep, will have missed the signal.
+         *
+         * Do not clear the RX_CONN_MAKECALL_WAITING flag as long as
+         * there are threads waiting to use the conn object.
         */
        MUTEX_EXIT(&call->lock);
        MUTEX_ENTER(&conn->conn_call_lock);
@@ -1886,7 +1915,8 @@ rx_EndCall(register struct rx_call *call, afs_int32 rc)
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->flags |= RX_CONN_BUSY;
        if (conn->flags & RX_CONN_MAKECALL_WAITING) {
-           conn->flags &= (~RX_CONN_MAKECALL_WAITING);
+            if (conn->makeCallWaiters == 0)
+                conn->flags &= (~RX_CONN_MAKECALL_WAITING);
            MUTEX_EXIT(&conn->conn_data_lock);
 #ifdef RX_ENABLE_LOCKS
            CV_BROADCAST(&conn->conn_call_cv);
@@ -1908,17 +1938,14 @@ rx_EndCall(register struct rx_call *call, afs_int32 rc)
      * kernel version, and may interrupt the macros rx_Read or
      * rx_Write, which run at normal priority for efficiency. */
     if (call->currentPacket) {
-       rxi_FreePacket(call->currentPacket);
+       queue_Prepend(&call->iovq, call->currentPacket);
        call->currentPacket = (struct rx_packet *)0;
-       call->nLeft = call->nFree = call->curlen = 0;
-    } else
-       call->nLeft = call->nFree = call->curlen = 0;
+    }
+       
+    call->nLeft = call->nFree = call->curlen = 0;
 
     /* Free any packets from the last call to ReadvProc/WritevProc */
-    for (queue_Scan(&call->iovq, tp, nxp, rx_packet)) {
-       queue_Remove(tp);
-       rxi_FreePacket(tp);
-    }
+    rxi_FreePackets(0, &call->iovq);
 
     CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
     MUTEX_EXIT(&call->lock);
@@ -1926,7 +1953,6 @@ rx_EndCall(register struct rx_call *call, afs_int32 rc)
        MUTEX_EXIT(&conn->conn_call_lock);
        conn->flags &= ~RX_CONN_BUSY;
     }
-    AFS_RXGUNLOCK();
     USERPRI;
     /*
      * Map errors to the local host's errno.h format.
@@ -2173,7 +2199,7 @@ rxi_FreeCall(register struct rx_call *call)
      * If someone else destroys a connection, they either have no
      * call lock held or are going through this section of code.
      */
-    if (conn->flags & RX_CONN_DESTROY_ME) {
+    if (conn->flags & RX_CONN_DESTROY_ME && !(conn->flags & RX_CONN_MAKECALL_WAITING)) {
        MUTEX_ENTER(&conn->conn_data_lock);
        conn->refCount++;
        MUTEX_EXIT(&conn->conn_data_lock);
@@ -2194,30 +2220,13 @@ rxi_Alloc(register size_t size)
 {
     register char *p;
 
-#if defined(AFS_AIX41_ENV) && defined(KERNEL)
-    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
-     * implementation.
-     */
-    int glockOwner = ISAFS_GLOCK();
-    if (!glockOwner)
-       AFS_GLOCK();
-#endif
     MUTEX_ENTER(&rx_stats_mutex);
     rxi_Alloccnt++;
     rxi_Allocsize += size;
     MUTEX_EXIT(&rx_stats_mutex);
-#if    (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
-    if (size > AFS_SMALLOCSIZ) {
-       p = (char *)osi_AllocMediumSpace(size);
-    } else
-       p = (char *)osi_AllocSmall(size, 1);
-#if defined(AFS_AIX41_ENV) && defined(KERNEL)
-    if (!glockOwner)
-       AFS_GUNLOCK();
-#endif
-#else
+
     p = (char *)osi_Alloc(size);
-#endif
+
     if (!p)
        osi_Panic("rxi_Alloc error");
     memset(p, 0, size);
@@ -2227,30 +2236,12 @@ rxi_Alloc(register size_t size)
 void
 rxi_Free(void *addr, register size_t size)
 {
-#if defined(AFS_AIX41_ENV) && defined(KERNEL)
-    /* Grab the AFS filesystem lock. See afs/osi.h for the lock
-     * implementation.
-     */
-    int glockOwner = ISAFS_GLOCK();
-    if (!glockOwner)
-       AFS_GLOCK();
-#endif
     MUTEX_ENTER(&rx_stats_mutex);
     rxi_Alloccnt--;
     rxi_Allocsize -= size;
     MUTEX_EXIT(&rx_stats_mutex);
-#if    (defined(AFS_AIX32_ENV) || defined(AFS_HPUX_ENV)) && !defined(AFS_HPUX100_ENV) && defined(KERNEL)
-    if (size > AFS_SMALLOCSIZ)
-       osi_FreeMediumSpace(addr);
-    else
-       osi_FreeSmall(addr);
-#if defined(AFS_AIX41_ENV) && defined(KERNEL)
-    if (!glockOwner)
-       AFS_GUNLOCK();
-#endif
-#else
+
     osi_Free(addr, size);
-#endif
 }
 
 /* Find the peer process represented by the supplied (host,port)
@@ -2617,10 +2608,31 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
            call = rxi_NewCall(conn, channel);
            MUTEX_EXIT(&conn->conn_call_lock);
            *call->callNumber = np->header.callNumber;
+           if (np->header.callNumber == 0) 
+               dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], conn->peer->host, conn->peer->port, np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
            hzero(call->bytesSent);
            hzero(call->bytesRcvd);
+           /*
+            * If the number of queued calls exceeds the overload
+            * threshold then abort this call.
+            */
+           if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
+               struct rx_packet *tp;
+               
+               rxi_CallError(call, rx_BusyError);
+               tp = rxi_SendCallAbort(call, np, 1, 0);
+               MUTEX_EXIT(&call->lock);
+               MUTEX_ENTER(&conn->conn_data_lock);
+               conn->refCount--;
+               MUTEX_EXIT(&conn->conn_data_lock);
+               MUTEX_ENTER(&rx_stats_mutex);
+               rx_stats.nBusies++;
+               MUTEX_EXIT(&rx_stats_mutex);
+               return tp;
+           }
            rxi_KeepAliveOn(call);
        } else if (np->header.callNumber != currentCallNumber) {
            /* Wait until the transmit queue is idle before deciding
@@ -2632,11 +2644,16 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
            while ((call->state == RX_STATE_ACTIVE)
                   && (call->flags & RX_CALL_TQ_BUSY)) {
                call->flags |= RX_CALL_TQ_WAIT;
+               call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
+               osirx_AssertMine(&call->lock, "rxi_Start lock3");
                CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
                osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+               call->tqWaiters--;
+               if (call->tqWaiters == 0)
+                   call->flags &= ~RX_CALL_TQ_WAIT;
            }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
            /* If the new call cannot be taken right now send a busy and set
@@ -2656,6 +2673,9 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
            }
            rxi_ResetCall(call, 0);
            *call->callNumber = np->header.callNumber;
+           if (np->header.callNumber == 0) 
+               dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], conn->peer->host, conn->peer->port, np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
            hzero(call->bytesSent);
@@ -3793,11 +3813,16 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
        while (call->flags & RX_CALL_TQ_BUSY) {
            call->flags |= RX_CALL_TQ_WAIT;
+           call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
+           osirx_AssertMine(&call->lock, "rxi_Start lock2");
            CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
            osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+           call->tqWaiters--;
+           if (call->tqWaiters == 0)
+               call->flags &= ~RX_CALL_TQ_WAIT;
        }
        MUTEX_ENTER(&peer->peer_lock);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
@@ -4126,8 +4151,6 @@ rxi_SetAcksInTransmitQueue(register struct rx_call *call)
     int someAcked = 0;
 
     for (queue_Scan(&call->tq, p, tp, rx_packet)) {
-       if (!p)
-           break;
        p->flags |= RX_PKTFLAG_ACKED;
        someAcked = 1;
     }
@@ -4162,8 +4185,6 @@ rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
     if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
        int someAcked = 0;
        for (queue_Scan(&call->tq, p, tp, rx_packet)) {
-           if (!p)
-               break;
            p->flags |= RX_PKTFLAG_ACKED;
            someAcked = 1;
        }
@@ -4173,12 +4194,7 @@ rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
        }
     } else {
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-       for (queue_Scan(&call->tq, p, tp, rx_packet)) {
-           if (!p)
-               break;
-           queue_Remove(p);
-           rxi_FreePacket(p);
-       }
+       rxi_FreePackets(0, &call->tq);
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
        call->flags &= ~RX_CALL_TQ_CLEARME;
     }
@@ -4203,15 +4219,8 @@ rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
 void
 rxi_ClearReceiveQueue(register struct rx_call *call)
 {
-    register struct rx_packet *p, *tp;
     if (queue_IsNotEmpty(&call->rq)) {
-       for (queue_Scan(&call->rq, p, tp, rx_packet)) {
-           if (!p)
-               break;
-           queue_Remove(p);
-           rxi_FreePacket(p);
-           rx_packetReclaims++;
-       }
+       rx_packetReclaims += rxi_FreePackets(0, &call->rq);
        call->flags &= ~(RX_CALL_RECEIVE_DONE | RX_CALL_HAVE_LAST);
     }
     if (call->state == RX_STATE_PRECALL) {
@@ -4347,7 +4356,7 @@ rxi_CallError(register struct rx_call *call, afs_int32 error)
     if (call->error)
        error = call->error;
 #ifdef RX_GLOBAL_RXLOCK_KERNEL
-    if (!(call->flags & RX_CALL_TQ_BUSY)) {
+    if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) {
        rxi_ResetCall(call, 0);
     }
 #else
@@ -4423,7 +4432,7 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
     flags = call->flags;
     rxi_ClearReceiveQueue(call);
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
-    if (call->flags & RX_CALL_TQ_BUSY) {
+    if (flags & RX_CALL_TQ_BUSY) {
        call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
        call->flags |= (flags & RX_CALL_TQ_WAIT);
     } else
@@ -4431,7 +4440,18 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
     {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
+       if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
+           dpf(("rcall %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+       }
        call->flags = 0;
+       while (call->tqWaiters) {
+#ifdef RX_ENABLE_LOCKS
+           CV_BROADCAST(&call->cv_tq);
+#else /* RX_ENABLE_LOCKS */
+           osi_rxWakeup(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+           call->tqWaiters--;
+       }
     }
     queue_Init(&call->rq);
     call->error = 0;
@@ -4539,6 +4559,9 @@ rxi_SendAck(register struct rx_call *call,
     register struct rx_packet *p;
     u_char offset;
     afs_int32 templ;
+#ifdef RX_ENABLE_TSFPQ
+    struct rx_ts_info_t * rx_ts_info;
+#endif
 
     /*
      * Open the receive window once a thread starts reading packets
@@ -4556,24 +4579,41 @@ rxi_SendAck(register struct rx_call *call,
     if (p) {
        rx_computelen(p, p->length);    /* reset length, you never know */
     } /* where that's been...         */
+#ifdef RX_ENABLE_TSFPQ
+    else {
+        RX_TS_INFO_GET(rx_ts_info);
+        if ((p = rx_ts_info->local_special_packet)) {
+            rx_computelen(p, p->length);
+        } else if ((p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
+            rx_ts_info->local_special_packet = p;
+        } else { /* We won't send the ack, but don't panic. */
+            return optionalPacket;
+        }
+    }
+#else
     else if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL))) {
        /* We won't send the ack, but don't panic. */
        return optionalPacket;
     }
+#endif
 
     templ =
        rx_AckDataSize(call->rwind) + 4 * sizeof(afs_int32) -
        rx_GetDataSize(p);
     if (templ > 0) {
-       if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL)) {
+       if (rxi_AllocDataBuf(p, templ, RX_PACKET_CLASS_SPECIAL) > 0) {
+#ifndef RX_ENABLE_TSFPQ
            if (!optionalPacket)
                rxi_FreePacket(p);
+#endif
            return optionalPacket;
        }
        templ = rx_AckDataSize(call->rwind) + 2 * sizeof(afs_int32);
        if (rx_Contiguous(p) < templ) {
+#ifndef RX_ENABLE_TSFPQ
            if (!optionalPacket)
                rxi_FreePacket(p);
+#endif
            return optionalPacket;
        }
     }
@@ -4601,8 +4641,10 @@ rxi_SendAck(register struct rx_call *call,
     for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
        if (!rqp || !call->rq.next
            || (rqp->header.seq > (call->rnext + call->rwind))) {
+#ifndef RX_ENABLE_TSFPQ
            if (!optionalPacket)
                rxi_FreePacket(p);
+#endif
            rxi_CallError(call, RX_CALL_DEAD);
            return optionalPacket;
        }
@@ -4612,8 +4654,10 @@ rxi_SendAck(register struct rx_call *call,
        ap->acks[offset++] = RX_ACK_TYPE_ACK;
 
        if ((offset > (u_char) rx_maxReceiveWindow) || (offset > call->rwind)) {
+#ifndef RX_ENABLE_TSFPQ
            if (!optionalPacket)
                rxi_FreePacket(p);
+#endif
            rxi_CallError(call, RX_CALL_DEAD);
            return optionalPacket;
        }
@@ -4693,8 +4737,10 @@ rxi_SendAck(register struct rx_call *call,
     MUTEX_ENTER(&rx_stats_mutex);
     rx_stats.ackPacketsSent++;
     MUTEX_EXIT(&rx_stats_mutex);
+#ifndef RX_ENABLE_TSFPQ
     if (!optionalPacket)
        rxi_FreePacket(p);
+#endif
     return optionalPacket;     /* Return packet for re-use by caller */
 }
 
@@ -4958,11 +5004,16 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
        while (call->flags & RX_CALL_TQ_BUSY) {
            call->flags |= RX_CALL_TQ_WAIT;
+           call->tqWaiters++;
 #ifdef RX_ENABLE_LOCKS
+           osirx_AssertMine(&call->lock, "rxi_Start lock1");
            CV_WAIT(&call->cv_tq, &call->lock);
 #else /* RX_ENABLE_LOCKS */
            osi_rxSleep(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
+           call->tqWaiters--;
+           if (call->tqWaiters == 0)
+               call->flags &= ~RX_CALL_TQ_WAIT;
        }
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
@@ -5118,14 +5169,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
                 */
                if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
                    call->flags &= ~RX_CALL_TQ_BUSY;
-                   if (call->flags & RX_CALL_TQ_WAIT) {
-                       call->flags &= ~RX_CALL_TQ_WAIT;
+                   if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+                       dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+                   }
 #ifdef RX_ENABLE_LOCKS
-                       CV_BROADCAST(&call->cv_tq);
+                   osirx_AssertMine(&call->lock, "rxi_Start start");
+                   CV_BROADCAST(&call->cv_tq);
 #else /* RX_ENABLE_LOCKS */
-                       osi_rxWakeup(&call->tq);
+                   osi_rxWakeup(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
-                   }
                    return;
                }
                if (call->error) {
@@ -5137,14 +5189,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
                    rx_tq_debug.rxi_start_aborted++;
                    MUTEX_EXIT(&rx_stats_mutex);
                    call->flags &= ~RX_CALL_TQ_BUSY;
-                   if (call->flags & RX_CALL_TQ_WAIT) {
-                       call->flags &= ~RX_CALL_TQ_WAIT;
+                   if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+                       dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+                   }
 #ifdef RX_ENABLE_LOCKS
-                       CV_BROADCAST(&call->cv_tq);
+                   osirx_AssertMine(&call->lock, "rxi_Start middle");
+                   CV_BROADCAST(&call->cv_tq);
 #else /* RX_ENABLE_LOCKS */
-                       osi_rxWakeup(&call->tq);
+                   osi_rxWakeup(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
-                   }
                    rxi_CallError(call, call->error);
                    return;
                }
@@ -5224,14 +5277,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
             * protected by the global lock.
             */
            call->flags &= ~RX_CALL_TQ_BUSY;
-           if (call->flags & RX_CALL_TQ_WAIT) {
-               call->flags &= ~RX_CALL_TQ_WAIT;
+           if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+               dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+           }
 #ifdef RX_ENABLE_LOCKS
-               CV_BROADCAST(&call->cv_tq);
+           osirx_AssertMine(&call->lock, "rxi_Start end");
+           CV_BROADCAST(&call->cv_tq);
 #else /* RX_ENABLE_LOCKS */
-               osi_rxWakeup(&call->tq);
+           osi_rxWakeup(&call->tq);
 #endif /* RX_ENABLE_LOCKS */
-           }
        } else {
            call->flags |= RX_CALL_NEED_START;
        }
@@ -5558,20 +5612,10 @@ rxi_ComputeRoundTripTime(register struct rx_packet *p,
 {
     struct clock thisRtt, *rttp = &thisRtt;
 
-#if defined(AFS_ALPHA_LINUX22_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
-    /* making year 2038 bugs to get this running now - stroucki */
-    struct timeval temptime;
-#endif
     register int rtt_timeout;
 
-#if defined(AFS_ALPHA_LINUX20_ENV) && defined(AFS_PTHREAD_ENV) && !defined(KERNEL)
-    /* yet again. This was the worst Heisenbug of the port - stroucki */
-    clock_GetTime(&temptime);
-    rttp->sec = (afs_int32) temptime.tv_sec;
-    rttp->usec = (afs_int32) temptime.tv_usec;
-#else
     clock_GetTime(rttp);
-#endif
+
     if (clock_Lt(rttp, sentp)) {
        clock_Zero(rttp);
        return;                 /* somebody set the clock back, don't count this time. */
@@ -5989,6 +6033,26 @@ rxi_DebugPrint(char *format, int a1, int a2, int a3, int a4, int a5, int a6,
               int a7, int a8, int a9, int a10, int a11, int a12, int a13,
               int a14, int a15)
 {
+#ifdef AFS_NT40_ENV
+    char msg[512];
+    char tformat[256];
+    int len;
+
+    len = _snprintf(tformat, sizeof(tformat), "tid[%d] %s", GetCurrentThreadId(), format);
+
+    if (len > 0) {
+       len = _snprintf(msg, sizeof(msg)-2, 
+                       tformat, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, 
+                       a11, a12, a13, a14, a15);
+       if (len > 0) {
+           if (msg[len-1] != '\n') {
+               msg[len] = '\n';
+               msg[len+1] = '\0';
+           }
+           OutputDebugString(msg);
+       }
+    }
+#else
     struct clock now;
     clock_GetTime(&now);
     fprintf(rx_Log, " %u.%.3u:", (unsigned int)now.sec,
@@ -5996,10 +6060,9 @@ rxi_DebugPrint(char *format, int a1, int a2, int a3, int a4, int a5, int a6,
     fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12,
            a13, a14, a15);
     putc('\n', rx_Log);
-}
 #endif
+}
 
-#ifdef RXDEBUG
 /*
  * This function is used to process the rx_stats structure that is local
  * to a process as well as an rx_stats structure received from a remote