rx-tq-busy-20070516
[openafs.git] / src / rx / rx.c
index 6694514..df35abc 100644 (file)
@@ -373,58 +373,19 @@ static int rxinit_status = 1;
 #define UNLOCK_RX_INIT
 #endif
 
-/*
- * Now, rx_InitHost is just a stub for rx_InitAddrs
- * Parameters are in network byte order.
- */
-
 int
 rx_InitHost(u_int host, u_int port)
 {
-    struct sockaddr_storage saddr;
-    int type = SOCK_DGRAM, len = sizeof(struct sockaddr_in);
-
-    memset((void *) &saddr, 0, sizeof(saddr));
-    rx_ssfamily(&saddr) = AF_INET;
-    ((struct sockaddr_in *) &saddr)->sin_addr.s_addr = host;
-    ((struct sockaddr_in *) &saddr)->sin_port = (u_short)port;
-#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
-    ((struct sockaddr_in *) &saddr)->sin_len = sizeof(struct sockaddr_in);
-#endif
-    return rx_InitAddrs(&saddr, &type, &len, 1);
-}
-
-/*
- * New API: rx_InitAddrs(struct sockaddr_storage *, int *, int)
- *
- * Arguments:
- *
- * struct sockaddr_storage - array of struct sockaddr_storage elements,
- *                          each one listing an interface/protocol to
- *                          be listened on.
- * int *                  - array of integers listing the socket type
- *                          (SOCK_STREAM or SOCK_DGRAM) to be used
- *                          by the corresponding struct sockaddr_storage
- * int *                  - array of integers listing saddr sizes
- * int                    - Number of elements in sockaddr_storage array.
- *
- * Note that in general only servers should call this function; clients
- * should (for now) continue to call rx_Init().
- */
-
-int rx_InitAddrs(struct sockaddr_storage *saddrs, int *types, int *salens,
-                int nelem)
-{
 #ifdef KERNEL
     osi_timeval_t tv;
 #else /* KERNEL */
     struct timeval tv;
 #endif /* KERNEL */
     char *htable, *ptable;
-    int tmp_status, i;
-
+    int tmp_status;
+    
     SPLVAR;
-
+    
     INIT_PTHREAD_LOCKS;
     LOCK_RX_INIT;
     if (rxinit_status == 0) {
@@ -439,7 +400,7 @@ int rx_InitAddrs(struct sockaddr_storage *saddrs, int *types, int *salens,
     if (afs_winsockInit() < 0)
        return -1;
 #endif
-
+    
 #ifndef KERNEL
     /*
      * Initialize anything necessary to provide a non-premptive threading
@@ -447,29 +408,15 @@ int rx_InitAddrs(struct sockaddr_storage *saddrs, int *types, int *salens,
      */
     rxi_InitializeThreadSupport();
 #endif
-
+    
     /* Allocate and initialize a socket for client and perhaps server
      * connections. */
-
-    rx_socket = OSI_NULLSOCKET;
-    rx_port = 0;
-
-    for (i = 0; i < nelem; i++) {
-       switch (types[i]) {
-       case SOCK_DGRAM:
-           rx_socket = rxi_GetHostUDPSocket(&saddrs[i], salens[i]);
-           if (rx_socket == OSI_NULLSOCKET) {
-               UNLOCK_RX_INIT;
-               return RX_ADDRINUSE;
-           }
-           rx_port = rx_ss2pn(&saddrs[i]);
-           break;
-       default:
-           return RX_INVALID_OPERATION;
-       }
-
+    
+    rx_socket = rxi_GetHostUDPSocket(host, (u_short) port);
+    if (rx_socket == OSI_NULLSOCKET) {
+       UNLOCK_RX_INIT;
+       return RX_ADDRINUSE;
     }
-
 #ifdef RX_ENABLE_LOCKS
 #ifdef RX_LOCKS_DB
     rxdb_init();
@@ -532,19 +479,20 @@ int rx_InitAddrs(struct sockaddr_storage *saddrs, int *types, int *salens,
 #else
     osi_GetTime(&tv);
 #endif
-
-    if (! rx_port) {
+    if (port) {
+       rx_port = port;
+    } else {
 #if defined(KERNEL) && !defined(UKERNEL)
        /* Really, this should never happen in a real kernel */
        rx_port = 0;
 #else
-       struct sockaddr_storage sn;
-       socklen_t addrlen = sizeof(sn);
-       if (getsockname((int)rx_socket, (struct sockaddr *)&sn, &addrlen)) {
+       struct sockaddr_in addr;
+       int addrlen = sizeof(addr);
+       if (getsockname((int)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
            rx_Finalize();
            return -1;
        }
-       rx_port = rx_ss2pn(&sn);
+       rx_port = addr.sin_port;
 #endif
     }
     rx_stats.minRtt.sec = 9999999;
@@ -598,7 +546,6 @@ rx_Init(u_int port)
     return rx_InitHost(htonl(INADDR_ANY), port);
 }
 
-
 /* called with unincremented nRequestsRunning to see if it is OK to start
  * a new thread in this service.  Could be "no" for two reasons: over the
  * max quota, or would prevent others from reaching their min quota.
@@ -788,50 +735,22 @@ rx_StartServer(int donateMe)
     return;
 }
 
-/*
- * Now, rx_NewConnection is just a stub for rx_NewConnectionAddrs()
- */
-
+/* Create a new client connection to the specified service, using the
+ * specified security object to implement the security model for this
+ * connection. */
 struct rx_connection *
 rx_NewConnection(register afs_uint32 shost, u_short sport, u_short sservice,
                 register struct rx_securityClass *securityObject,
                 int serviceSecurityIndex)
 {
-    struct sockaddr_in sin;
-    int len = sizeof(sin), type = SOCK_DGRAM;
-
-    memset((void *) &sin, 0, sizeof(sin));
-
-    sin.sin_family = AF_INET;
-    sin.sin_addr.s_addr = shost;
-    sin.sin_port = sport;
-
-    return rx_NewConnectionAddrs((struct sockaddr_storage *) &sin, &type,
-                                &len, 1, sservice, securityObject,
-                                serviceSecurityIndex);
-}
-
-/* Create a new client connection to the specified service, using the
- * specified security object to implement the security model for this
- * connection
- *
- * This follows the same logic as rx_InitAddrs() for the first four
- * arguments.
- */
-struct rx_connection *
-rx_NewConnectionAddrs(struct sockaddr_storage *saddr, int *type, int *slen,
-                     int nelem, u_short sservice,
-                     struct rx_securityClass *securityObject,
-                     int serviceSecurityIndex)
-{
-    int hashindex, i;
+    int hashindex;
     afs_int32 cid;
     register struct rx_connection *conn;
 
     SPLVAR;
 
     clock_NewTime();
-    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", ntohl(rx_ss2v4addr(saddr)), ntohs(rx_ss2pn(saddr)), sservice, securityObject, serviceSecurityIndex));
+    dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", ntohl(shost), ntohs(sport), sservice, securityObject, serviceSecurityIndex));
 
     /* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
      * the case of kmem_alloc? */
@@ -847,16 +766,7 @@ rx_NewConnectionAddrs(struct sockaddr_storage *saddr, int *type, int *slen,
     conn->type = RX_CLIENT_CONNECTION;
     conn->cid = cid;
     conn->epoch = rx_epoch;
-    /*
-     * Right now we're going to just call rxi_FindPeer for UDP connections
-     * We're only going to support one.
-     */
-    for (i = 0; i < nelem; i++) {
-       if (type[i] == SOCK_DGRAM) {
-           conn->peer = rxi_FindPeer(&saddr[i], slen[i], type[i], 0, 1);
-           break;
-       }
-    }
+    conn->peer = rxi_FindPeer(shost, sport, 0, 1);
     conn->serviceId = sservice;
     conn->securityObject = securityObject;
     /* This doesn't work in all compilers with void (they're buggy), so fake it
@@ -1129,10 +1039,28 @@ rx_GetConnection(register struct rx_connection *conn)
     USERPRI;
 }
 
+/* Wait for the transmit queue to no longer be busy. 
+ * requires the call->lock to be held */
+static void rxi_WaitforTQBusy(struct rx_call *call) {
+    while (call->flags & RX_CALL_TQ_BUSY) {
+       call->flags |= RX_CALL_TQ_WAIT;
+       call->tqWaiters++;
+#ifdef RX_ENABLE_LOCKS
+       osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
+       CV_WAIT(&call->cv_tq, &call->lock);
+#else /* RX_ENABLE_LOCKS */
+       osi_rxSleep(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+       call->tqWaiters--;
+       if (call->tqWaiters == 0) {
+           call->flags &= ~RX_CALL_TQ_WAIT;
+       }
+    }
+}
 /* Start a new rx remote procedure call, on the specified connection.
  * If wait is set to 1, wait for a free call channel; otherwise return
  * 0.  Maxtime gives the maximum number of seconds this call may take,
- * after rx_MakeCall returns.  After this time interval, a call to any
+ * after rx_NewCall returns.  After this time interval, a call to any
  * of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
  * For fine grain locking, we hold the conn_call_lock in order to 
  * to ensure that we don't get signalle after we found a call in an active
@@ -1147,7 +1075,7 @@ rx_NewCall(register struct rx_connection *conn)
     SPLVAR;
 
     clock_NewTime();
-    dpf(("rx_MakeCall(conn %x)\n", conn));
+    dpf(("rx_NewCall(conn %x)\n", conn));
 
     NETPRI;
     clock_GetTime(&queueTime);
@@ -1254,20 +1182,7 @@ rx_NewCall(register struct rx_connection *conn)
 #ifdef AFS_GLOBAL_RXLOCK_KERNEL
     /* Now, if TQ wasn't cleared earlier, do it now. */
     MUTEX_ENTER(&call->lock);
-    while (call->flags & RX_CALL_TQ_BUSY) {
-       call->flags |= RX_CALL_TQ_WAIT;
-       call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-       osirx_AssertMine(&call->lock, "rxi_Start lock4");
-       CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-       osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-       call->tqWaiters--;
-       if (call->tqWaiters == 0) {
-           call->flags &= ~RX_CALL_TQ_WAIT;
-       }
-    }
+    rxi_WaitforTQBusy(call);
     if (call->flags & RX_CALL_TQ_CLEARME) {
        rxi_ClearTransmitQueue(call, 0);
        queue_Init(&call->tq);
@@ -1275,6 +1190,7 @@ rx_NewCall(register struct rx_connection *conn)
     MUTEX_EXIT(&call->lock);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
+    dpf(("rx_NewCall(call %x)\n", call));
     return call;
 }
 
@@ -1400,17 +1316,7 @@ rx_NewServiceHost(afs_uint32 host, u_short port, u_short serviceId,
            if (socket == OSI_NULLSOCKET) {
                /* If we don't already have a socket (from another
                 * service on same port) get a new one */
-               struct sockaddr_in sin;
-
-               memset((void *) &sin, 0, sizeof(sin));
-               sin.sin_family = AF_INET;
-               sin.sin_addr.s_addr = htonl(INADDR_ANY);
-               sin.sin_port = port;
-#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
-               sin.sin_len = sizeof(sin);
-#endif
-               socket = rxi_GetHostUDPSocket((struct sockaddr_storage *) &sin,
-                                             sizeof(sin));
+               socket = rxi_GetHostUDPSocket(htonl(INADDR_ANY), port);
                if (socket == OSI_NULLSOCKET) {
                    USERPRI;
                    rxi_FreeService(tservice);
@@ -2180,6 +2086,8 @@ rxi_NewCall(register struct rx_connection *conn, register int channel)
     register struct rx_call *nxp;      /* Next call pointer, for queue_Scan */
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
+    dpf(("rxi_NewCall(conn %x, channel %d)\n", conn, channel));
+
     /* Grab an existing call structure, or allocate a new one.
      * Existing call structures are assumed to have been left reset by
      * rxi_FreeCall */
@@ -2361,62 +2269,22 @@ rxi_Free(void *addr, register size_t size)
  * refcount will be be decremented. This is used to replace the peer
  * structure hanging off a connection structure */
 struct rx_peer *
-rxi_FindPeer(struct sockaddr_storage *saddr, int slen, int stype,
-            struct rx_peer *origPeer, int create)
+rxi_FindPeer(register afs_uint32 host, register u_short port,
+             struct rx_peer *origPeer, int create)
 {
     register struct rx_peer *pp;
-    int hashIndex, i, j;
-    for (i = 0, j = 0; i < slen; i++)
-       j += ((unsigned char *) saddr)[i];
-    hashIndex = j % rx_hashTableSize;
+    int hashIndex;
+    hashIndex = PEER_HASH(host, port);
     MUTEX_ENTER(&rx_peerHashTable_lock);
     for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
-       if (memcmp(saddr, &pp->saddr, slen) == 0 && stype == pp->socktype)
-           break;
+       if ((pp->host == host) && (pp->port == port))
+            break;
     }
     if (!pp) {
-       if (create) {
-           pp = rxi_AllocPeer();       /* This bzero's *pp */
-           memcpy(&pp->saddr, saddr, slen);
-           pp->saddrlen = slen;
-           pp->socktype = stype;
-           switch (rx_ssfamily(saddr)) {
-           case AF_INET:
-               /*
-                * Should be enough storage for a dotted quad
-                */
-               snprintf(pp->addrstring, sizeof pp->addrstring, "%d.%d.%d.%d",
-                       rx_ss2addrp(saddr)[0], rx_ss2addrp(saddr)[1],
-                       rx_ss2addrp(saddr)[2], rx_ss2addrp(saddr)[3]);
-               break;
-#ifdef AF_INET6
-           case AF_INET6:
-               /*
-                * This gets more complicated, unfortunately
-                */
-               if (IN6_IS_ADDR_V4COMPAT(&(rx_ss2sin6(saddr)->sin6_addr))) {
-                   snprintf(pp->addrstring,
-                           sizeof pp->addrstring, "%d.%d.%d.%d",
-                           rx_ss2addrp(saddr)[12], rx_ss2addrp(saddr)[13],
-                           rx_ss2addrp(saddr)[14], rx_ss2addrp(saddr)[15]);
-               } else {
-                   snprintf(pp->addrstring,
-                            sizeof pp->addrstring, "%x:%x:%x:%x:%x:%x:%x:%x",
-                            ntohs(rx_ss2addrp6(saddr)[0]),
-                            ntohs(rx_ss2addrp6(saddr)[1]),
-                            ntohs(rx_ss2addrp6(saddr)[2]),
-                            ntohs(rx_ss2addrp6(saddr)[3]),
-                            ntohs(rx_ss2addrp6(saddr)[4]),
-                            ntohs(rx_ss2addrp6(saddr)[5]),
-                            ntohs(rx_ss2addrp6(saddr)[6]),
-                            ntohs(rx_ss2addrp6(saddr)[7]));
-               }
-               break;
-#endif /* AF_INET6 */
-           default:
-               strcpy(pp->addrstring, "??.??.??.??");
-               break;
-           }
+        if (create) {
+            pp = rxi_AllocPeer();       /* This bzero's *pp */
+           pp->host = host;    /* set here or in InitPeerParams is zero */
+           pp->port = port;
            MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
            queue_Init(&pp->congestionQueue);
            queue_Init(&pp->rpcStats);
@@ -2451,8 +2319,8 @@ rxi_FindPeer(struct sockaddr_storage *saddr, int slen, int stype,
  * server connection is created, it will be created using the supplied
  * index, if the index is valid for this service */
 struct rx_connection *
-rxi_FindConnection(osi_socket socket, struct sockaddr_storage *saddr,
-                  int slen, int socktype, u_short serviceId, afs_uint32 cid,
+rxi_FindConnection(osi_socket socket, register afs_int32 host,
+                  register u_short port, u_short serviceId, afs_uint32 cid,
                   afs_uint32 epoch, int type, u_int securityIndex)
 {
     int hashindex, flag;
@@ -2474,11 +2342,9 @@ rxi_FindConnection(osi_socket socket, struct sockaddr_storage *saddr,
                MUTEX_EXIT(&rx_connHashTable_lock);
                return (struct rx_connection *)0;
            }
-           if (memcmp(&pp->saddr, saddr, slen) == 0 &&
-                                               socktype == pp->socktype)
+           if (pp->host == host && pp->port == port)
                break;
-           if (type == RX_CLIENT_CONNECTION &&
-                                       rx_ss2pn(&pp->saddr) == rx_ss2pn(saddr))
+           if (type == RX_CLIENT_CONNECTION && pp->port == port)
                break;
            /* So what happens when it's a callback connection? */
            if (                /*type == RX_CLIENT_CONNECTION && */
@@ -2511,7 +2377,7 @@ rxi_FindConnection(osi_socket socket, struct sockaddr_storage *saddr,
        CV_INIT(&conn->conn_call_cv, "conn call cv", CV_DEFAULT, 0);
        conn->next = rx_connHashTable[hashindex];
        rx_connHashTable[hashindex] = conn;
-       conn->peer = rxi_FindPeer(saddr, slen, socktype, 0, 1);
+       conn->peer = rxi_FindPeer(host, port, 0, 1);
        conn->type = RX_SERVER_CONNECTION;
        conn->lastSendTime = clock_Sec();       /* don't GC immediately */
        conn->epoch = epoch;
@@ -2565,7 +2431,7 @@ int (*rx_almostSent) () = 0;
 
 struct rx_packet *
 rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
-                 struct sockaddr_storage *saddr, int slen, int *tnop,
+                 afs_uint32 host, u_short port, int *tnop,
                  struct rx_call **newcallp)
 {
     register struct rx_call *call;
@@ -2587,29 +2453,36 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
     packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
        ? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
     dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
-        np->header.serial, packetType, ntohl(rx_ss2v4addr(saddr)),
-        ntohs(rx_ss2pn(saddr)), np->header.serviceId,
+        np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
         np->header.epoch, np->header.cid, np->header.callNumber,
         np->header.seq, np->header.flags, np));
 #endif
 
     if (np->header.type == RX_PACKET_TYPE_VERSION) {
-       return rxi_ReceiveVersionPacket(np, socket, saddr, slen, 1);
+       return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
     }
 
     if (np->header.type == RX_PACKET_TYPE_DEBUG) {
-       return rxi_ReceiveDebugPacket(np, socket, saddr, slen, 1);
+       return rxi_ReceiveDebugPacket(np, socket, host, port, 1);
     }
 #ifdef RXDEBUG
     /* If an input tracer function is defined, call it with the packet and
      * network address.  Note this function may modify its arguments. */
     if (rx_justReceived) {
-       struct sockaddr_in *addr = (struct sockaddr_in *) saddr;
+       struct sockaddr_in addr;
        int drop;
-       drop = (*rx_justReceived) (np, addr);
+       addr.sin_family = AF_INET;
+       addr.sin_port = port;
+       addr.sin_addr.s_addr = host;
+#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
+       addr.sin_len = sizeof(addr);
+#endif /* AFS_OSF_ENV */
+       drop = (*rx_justReceived) (np, &addr);
        /* drop packet if return value is non-zero */
        if (drop)
            return np;
+       port = addr.sin_port;   /* in case fcn changed addr */
+       host = addr.sin_addr.s_addr;
     }
 #endif
 
@@ -2620,9 +2493,9 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
     /* Find the connection (or fabricate one, if we're the server & if
      * necessary) associated with this packet */
     conn =
-       rxi_FindConnection(socket, saddr, slen, SOCK_DGRAM,
-                          np->header.serviceId, np->header.cid,
-                          np->header.epoch, type, np->header.securityIndex);
+       rxi_FindConnection(socket, host, port, np->header.serviceId,
+                          np->header.cid, np->header.epoch, type,
+                          np->header.securityIndex);
 
     if (!conn) {
        /* If no connection found or fabricated, just ignore the packet.
@@ -2757,7 +2630,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
            MUTEX_EXIT(&conn->conn_call_lock);
            *call->callNumber = np->header.callNumber;
            if (np->header.callNumber == 0) 
-               dpf(("RecPacket call 0 %d %s: %s.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], rx_AddrStringOf(conn->peer), ntohs(rx_PortOf(conn->peer)), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+               dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
 
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
@@ -2822,7 +2695,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
            rxi_ResetCall(call, 0);
            *call->callNumber = np->header.callNumber;
            if (np->header.callNumber == 0) 
-               dpf(("RecPacket call 0 %d %s: %s.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], rx_AddrStringOf(conn->peer), ntohs(rx_PortOf(conn->peer)), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+               dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
 
            call->state = RX_STATE_PRECALL;
            clock_GetTime(&call->queueTime);
@@ -2983,7 +2856,7 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
     /* Now do packet type-specific processing */
     switch (np->header.type) {
     case RX_PACKET_TYPE_DATA:
-       np = rxi_ReceiveDataPacket(call, np, 1, socket, saddr, slen, tnop,
+       np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port, tnop,
                                   newcallp);
        break;
     case RX_PACKET_TYPE_ACK:
@@ -3227,8 +3100,8 @@ TryAttach(register struct rx_call *acall, register osi_socket socket,
 struct rx_packet *
 rxi_ReceiveDataPacket(register struct rx_call *call,
                      register struct rx_packet *np, int istack,
-                     osi_socket socket, struct sockaddr_storage *saddr,
-                     int slen, int *tnop, struct rx_call **newcallp)
+                     osi_socket socket, afs_uint32 host, u_short port,
+                     int *tnop, struct rx_call **newcallp)
 {
     int ackNeeded = 0;         /* 0 means no, otherwise ack_reason */
     int newPackets = 0;
@@ -3299,7 +3172,7 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
        /* The RX_JUMBO_PACKET is set in all but the last packet in each
         * AFS 3.5 jumbogram. */
        if (flags & RX_JUMBO_PACKET) {
-           tnp = rxi_SplitJumboPacket(np, saddr, slen, isFirst);
+           tnp = rxi_SplitJumboPacket(np, host, port, isFirst);
        } else {
            tnp = NULL;
        }
@@ -3487,9 +3360,11 @@ rxi_ReceiveDataPacket(register struct rx_call *call,
 
            /* We need to send an ack of the packet is out of sequence, 
             * or if an ack was requested by the peer. */
-           if (seq != prev + 1 || missing || (flags & RX_REQUEST_ACK)) {
+           if (seq != prev + 1 || missing) {
                ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
-           }
+           } else if (flags & RX_REQUEST_ACK) {
+               ackNeeded = RX_ACK_REQUESTED;
+            }
 
            /* Acknowledge the last packet for each call */
            if (flags & RX_LAST_PACKET) {
@@ -3957,9 +3832,9 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
                          sizeof(afs_int32), &tSize);
            maxDgramPackets = (afs_uint32) ntohl(tSize);
            maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
-           maxDgramPackets =
-               MIN(maxDgramPackets, (int)(peer->ifDgramPackets));
-           maxDgramPackets = MIN(maxDgramPackets, tSize);
+           maxDgramPackets = MIN(maxDgramPackets, peer->ifDgramPackets);
+           if (peer->natMTU < peer->ifMTU)
+               maxDgramPackets = MIN(maxDgramPackets, rxi_AdjustDgramPackets(1, peer->natMTU));
            if (maxDgramPackets > 1) {
                peer->maxDgramPackets = maxDgramPackets;
                call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
@@ -4021,19 +3896,7 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
            return np;
        }
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
-       while (call->flags & RX_CALL_TQ_BUSY) {
-           call->flags |= RX_CALL_TQ_WAIT;
-           call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-           osirx_AssertMine(&call->lock, "rxi_Start lock2");
-           CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-           osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-           call->tqWaiters--;
-           if (call->tqWaiters == 0)
-               call->flags &= ~RX_CALL_TQ_WAIT;
-       }
+       rxi_WaitforTQBusy(call);
        MUTEX_ENTER(&peer->peer_lock);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
@@ -4598,6 +4461,8 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
     register struct rx_peer *peer;
     struct rx_packet *packet;
 
+    dpf(("rxi_ResetCall(call %x, newcall %d)\n", call, newcall));
+
     /* Notify anyone who is waiting for asynchronous packet arrival */
     if (call->arrivalProc) {
        (*call->arrivalProc) (call, call->arrivalProcHandle,
@@ -5239,19 +5104,7 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
            return;
        }
        call->flags |= RX_CALL_FAST_RECOVER_WAIT;
-       while (call->flags & RX_CALL_TQ_BUSY) {
-           call->flags |= RX_CALL_TQ_WAIT;
-           call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
-           osirx_AssertMine(&call->lock, "rxi_Start lock1");
-           CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
-           osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
-           call->tqWaiters--;
-           if (call->tqWaiters == 0)
-               call->flags &= ~RX_CALL_TQ_WAIT;
-       }
+       rxi_WaitforTQBusy(call);
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
        call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
        call->flags |= RX_CALL_FAST_RECOVER;
@@ -5766,6 +5619,7 @@ rxi_SendDelayedCallAbort(struct rxevent *event, register struct rx_call *call,
                            (char *)&error, sizeof(error), 0);
        rxi_FreePacket(packet);
     }
+    CALL_RELE(call, RX_CALL_REFCOUNT_ABORT);
     MUTEX_EXIT(&call->lock);
 }
 
@@ -6432,9 +6286,9 @@ rx_PrintStats(FILE * file)
 void
 rx_PrintPeerStats(FILE * file, struct rx_peer *peer)
 {
-/*    fprintf(file, "Peer %x.%d.  " "Burst size %d, " "burst wait %u.%d.\n",
+    fprintf(file, "Peer %x.%d.  " "Burst size %d, " "burst wait %u.%d.\n",
            ntohl(peer->host), (int)peer->port, (int)peer->burstSize,
-           (int)peer->burstWait.sec, (int)peer->burstWait.usec); */
+           (int)peer->burstWait.sec, (int)peer->burstWait.usec);
 
     fprintf(file,
            "   Rtt %d, " "retry time %u.%06d, " "total sent %d, "
@@ -6466,17 +6320,19 @@ MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
              void *outputData, size_t outputLength)
 {
     static afs_int32 counter = 100;
-    time_t endTime;
+    time_t waitTime, waitCount, startTime, endTime;
     struct rx_header theader;
     char tbuffer[1500];
     register afs_int32 code;
-    struct timeval tv;
+    struct timeval tv_now, tv_wake, tv_delta;
     struct sockaddr_in taddr, faddr;
     int faddrLen;
     fd_set imask;
     register char *tp;
 
-    endTime = time(0) + 20;    /* try for 20 seconds */
+    startTime = time(0);
+    waitTime = 1;
+    waitCount = 5;
     LOCK_RX_DEBUG;
     counter++;
     UNLOCK_RX_DEBUG;
@@ -6505,29 +6361,54 @@ MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
                   (struct sockaddr *)&taddr, sizeof(struct sockaddr_in));
 
        /* see if there's a packet available */
-       FD_ZERO(&imask);
-       FD_SET(socket, &imask);
-       tv.tv_sec = 1;
-       tv.tv_usec = 0;
-       code = select((int)(socket + 1), &imask, 0, 0, &tv);
-       if (code == 1 && FD_ISSET(socket, &imask)) {
-           /* now receive a packet */
-           faddrLen = sizeof(struct sockaddr_in);
-           code =
-               recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
-                        (struct sockaddr *)&faddr, &faddrLen);
-
-           if (code > 0) {
-               memcpy(&theader, tbuffer, sizeof(struct rx_header));
-               if (counter == ntohl(theader.callNumber))
-                   break;
+       gettimeofday(&tv_wake,0);
+       tv_wake.tv_sec += waitTime;
+       for (;;) {
+           FD_ZERO(&imask);
+           FD_SET(socket, &imask);
+           tv_delta.tv_sec = tv_wake.tv_sec;
+           tv_delta.tv_usec = tv_wake.tv_usec;
+           gettimeofday(&tv_now, 0);
+           
+           if (tv_delta.tv_usec < tv_now.tv_usec) {
+               /* borrow */
+               tv_delta.tv_usec += 1000000;
+               tv_delta.tv_sec--;
+           }
+           tv_delta.tv_usec -= tv_now.tv_usec;
+           
+           if (tv_delta.tv_sec < tv_now.tv_sec) {
+               /* time expired */
+               break;
+           }
+           tv_delta.tv_sec -= tv_now.tv_sec;
+           
+           code = select(socket + 1, &imask, 0, 0, &tv_delta);
+           if (code == 1 && FD_ISSET(socket, &imask)) {
+               /* now receive a packet */
+               faddrLen = sizeof(struct sockaddr_in);
+               code =
+                   recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
+                            (struct sockaddr *)&faddr, &faddrLen);
+               
+               if (code > 0) {
+                   memcpy(&theader, tbuffer, sizeof(struct rx_header));
+                   if (counter == ntohl(theader.callNumber))
+                       goto success;
+                   continue;
+               }
            }
+           break;
        }
 
        /* see if we've timed out */
-       if (endTime < time(0))
-           return -1;
+       if (!--waitCount) {
+            return -1;
+       }
+       waitTime <<= 1;
     }
+    
+ success:
     code -= sizeof(struct rx_header);
     if (code > outputLength)
        code = outputLength;
@@ -7064,7 +6945,7 @@ rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
               afs_uint32 currentFunc, afs_uint32 totalFunc,
               struct clock *queueTime, struct clock *execTime,
               afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd, int isServer,
-              struct sockaddr_storage *saddr,
+              afs_uint32 remoteHost, afs_uint32 remotePort,
               int addToPeerList, unsigned int *counter)
 {
     int rc = 0;
@@ -7102,19 +6983,8 @@ rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
        }
        *counter += totalFunc;
        for (i = 0; i < totalFunc; i++) {
-           switch (rx_ssfamily(saddr)) {
-           case AF_INET:
-               rpc_stat->stats[i].remote_peer =
-                                       rx_ss2sin(saddr)->sin_addr.s_addr;
-               break;
-           default:
-#ifdef AF_INET6
-           case AF_INET6:
-               rpc_stat->stats[i].remote_peer = 0xffffffff;
-               break;
-#endif /* AF_INET6 */
-           }
-           rpc_stat->stats[i].remote_port = rx_ss2pn(saddr);
+           rpc_stat->stats[i].remote_peer = remoteHost;
+           rpc_stat->stats[i].remote_port = remotePort;
            rpc_stat->stats[i].remote_is_server = isServer;
            rpc_stat->stats[i].interfaceId = rxInterface;
            rpc_stat->stats[i].func_total = totalFunc;
@@ -7217,18 +7087,13 @@ rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
     if (rxi_monitor_peerStats) {
        rxi_AddRpcStat(&peer->rpcStats, rxInterface, currentFunc, totalFunc,
                       queueTime, execTime, bytesSent, bytesRcvd, isServer,
-                      &peer->saddr, 1, &rxi_rpc_peer_stat_cnt);
+                      peer->host, peer->port, 1, &rxi_rpc_peer_stat_cnt);
     }
 
     if (rxi_monitor_processStats) {
-       struct sockaddr_in sin;
-       sin.sin_family = AF_INET;
-       sin.sin_addr.s_addr = 0xffffffff;
-       sin.sin_port = 0xffff;
        rxi_AddRpcStat(&processStats, rxInterface, currentFunc, totalFunc,
                       queueTime, execTime, bytesSent, bytesRcvd, isServer,
-                      (struct sockaddr_storage *) &sin, 0,
-                      &rxi_rpc_process_stat_cnt);
+                      0xffffffff, 0xffffffff, 0, &rxi_rpc_process_stat_cnt);
     }
 
     MUTEX_EXIT(&peer->peer_lock);