rx: Account for delayed ACKS when computing RTO
[openafs.git] / src / rx / rx.c
index 949e89d..fa1ba86 100644 (file)
@@ -10,7 +10,7 @@
 /* RX:  Extended Remote Procedure Call */
 
 #include <afsconfig.h>
-#include "afs/param.h"
+#include <afs/param.h>
 
 #ifdef KERNEL
 # include "afs/sysincludes.h"
@@ -63,26 +63,12 @@ extern afs_int32 afs_termState;
 # include "afs/rxgen_consts.h"
 #else /* KERNEL */
 # include <roken.h>
-# include <sys/types.h>
-# include <string.h>
-# include <stdarg.h>
-# include <errno.h>
-# ifdef HAVE_STDINT_H
-#  include <stdint.h>
-# endif
+
 # ifdef AFS_NT40_ENV
-#  include <stdlib.h>
-#  include <fcntl.h>
 #  include <afs/afsutil.h>
 #  include <WINNT\afsreg.h>
-# else
-#  include <sys/socket.h>
-#  include <sys/file.h>
-#  include <netdb.h>
-#  include <sys/stat.h>
-#  include <netinet/in.h>
-#  include <sys/time.h>
 # endif
+
 # include "rx_user.h"
 #endif /* KERNEL */
 
@@ -125,6 +111,25 @@ struct rx_tq_debug {
 } rx_tq_debug;
 #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
 
+/* Constant delay time before sending an acknowledge of the last packet
+ * received.  This is to avoid sending an extra acknowledge when the
+ * client is about to make another call, anyway, or the server is
+ * about to respond.
+ *
+ * The lastAckDelay may not exceeed 400ms without causing peers to
+ * unecessarily timeout.
+ */
+struct clock rx_lastAckDelay = {0, 400000};
+
+/* Constant delay time before sending a soft ack when none was requested.
+ * This is to make sure we send soft acks before the sender times out,
+ * Normally we wait and send a hard ack when the receiver consumes the packet
+ *
+ * This value has been 100ms in all shipping versions of OpenAFS. Changing it
+ * will require changes to the peer's RTT calculations.
+ */
+struct clock rx_softAckDelay = {0, 100000};
+
 /*
  * rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
  * currently allocated within rx.  This number is used to allocate the
@@ -143,6 +148,16 @@ static unsigned int rxi_rpc_peer_stat_cnt;
 
 static unsigned int rxi_rpc_process_stat_cnt;
 
+/*
+ * rxi_busyChannelError is the error to return to the application when a call
+ * channel appears busy (inferred from the receipt of RX_PACKET_TYPE_BUSY
+ * packets on the channel), and there are other call channels in the
+ * connection that are not busy. If 0, we do not return errors upon receiving
+ * busy packets; we just keep trying on the same call channel until we hit a
+ * timeout.
+ */
+static afs_int32 rxi_busyChannelError = 0;
+
 rx_atomic_t rx_nWaiting = RX_ATOMIC_INIT(0);
 rx_atomic_t rx_nWaited = RX_ATOMIC_INIT(0);
 
@@ -154,6 +169,9 @@ rx_atomic_t rx_nWaited = RX_ATOMIC_INIT(0);
 afs_kmutex_t rx_atomic_mutex;
 #endif
 
+/* Forward prototypes */
+static struct rx_call * rxi_NewCall(struct rx_connection *, int);
+
 #ifdef AFS_PTHREAD_ENV
 
 /*
@@ -170,7 +188,6 @@ extern afs_kmutex_t des_random_mutex;
 extern afs_kmutex_t rx_clock_mutex;
 extern afs_kmutex_t rxi_connCacheMutex;
 extern afs_kmutex_t rx_event_mutex;
-extern afs_kmutex_t osi_malloc_mutex;
 extern afs_kmutex_t event_handler_mutex;
 extern afs_kmutex_t listener_mutex;
 extern afs_kmutex_t rx_if_init_mutex;
@@ -199,7 +216,6 @@ rxi_InitPthread(void)
     MUTEX_INIT(&epoch_mutex, "epoch", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_init_mutex, "init", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rx_event_mutex, "event", MUTEX_DEFAULT, 0);
-    MUTEX_INIT(&osi_malloc_mutex, "malloc", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&event_handler_mutex, "event handler", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&rxi_connCacheMutex, "conn cache", MUTEX_DEFAULT, 0);
     MUTEX_INIT(&listener_mutex, "listener", MUTEX_DEFAULT, 0);
@@ -304,7 +320,7 @@ pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
 
 #if defined(RX_ENABLE_LOCKS) && defined(KERNEL)
 static afs_kmutex_t rx_rpc_stats;
-void rxi_StartUnlocked(struct rxevent *event, void *call,
+static void rxi_StartUnlocked(struct rxevent *event, void *call,
                        void *arg1, int istack);
 #endif
 
@@ -565,12 +581,8 @@ rx_InitHost(u_int host, u_int port)
     rx_connHashTable = (struct rx_connection **)htable;
     rx_peerHashTable = (struct rx_peer **)ptable;
 
-    rx_lastAckDelay.sec = 0;
-    rx_lastAckDelay.usec = 400000;     /* 400 milliseconds */
     rx_hardAckDelay.sec = 0;
     rx_hardAckDelay.usec = 100000;     /* 100 milliseconds */
-    rx_softAckDelay.sec = 0;
-    rx_softAckDelay.usec = 100000;     /* 100 milliseconds */
 
     rxevent_Init(20, rxi_ReScheduleEvents);
 
@@ -584,9 +596,11 @@ rx_InitHost(u_int host, u_int port)
     rx_GetIFInfo();
 #endif
 
+#if defined(RXK_LISTENER_ENV) || !defined(KERNEL)
     /* Start listener process (exact function is dependent on the
      * implementation environment--kernel or user space) */
     rxi_StartListener();
+#endif
 
     USERPRI;
     tmp_status = rxinit_status = 0;
@@ -600,6 +614,20 @@ rx_Init(u_int port)
     return rx_InitHost(htonl(INADDR_ANY), port);
 }
 
+/**
+ * Sets the error generated when a busy call channel is detected.
+ *
+ * @param[in] error The error to return for a call on a busy channel.
+ *
+ * @pre Neither rx_Init nor rx_InitHost have been called yet
+ */
+void
+rx_SetBusyChannelError(afs_int32 error)
+{
+    osi_Assert(rxinit_status != 0);
+    rxi_busyChannelError = error;
+}
+
 /* called with unincremented nRequestsRunning to see if it is OK to start
  * a new thread in this service.  Could be "no" for two reasons: over the
  * max quota, or would prevent others from reaching their min quota.
@@ -677,7 +705,7 @@ QuotaOK(struct rx_service *aservice)
 /* Called by rx_StartServer to start up lwp's to service calls.
    NExistingProcs gives the number of procs already existing, and which
    therefore needn't be created. */
-void
+static void
 rxi_StartServerProcs(int nExistingProcs)
 {
     struct rx_service *service;
@@ -924,7 +952,7 @@ int rxi_lowConnRefCount = 0;
  * Cleanup a connection that was destroyed in rxi_DestroyConnectioNoLock.
  * NOTE: must not be called with rx_connHashTable_lock held.
  */
-void
+static void
 rxi_CleanupConnection(struct rx_connection *conn)
 {
     /* Notify the service exporter, if requested, that this connection
@@ -1438,7 +1466,7 @@ rx_NewCall(struct rx_connection *conn)
     return call;
 }
 
-int
+static int
 rxi_HasActiveCalls(struct rx_connection *aconn)
 {
     int i;
@@ -1774,7 +1802,6 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
     struct rx_serverQueueEntry *sq;
     struct rx_call *call = (struct rx_call *)0;
     struct rx_service *service = NULL;
-    SPLVAR;
 
     MUTEX_ENTER(&freeSQEList_lock);
 
@@ -2360,7 +2387,7 @@ rxi_PacketsUnWait(void)
 
 /* Return this process's service structure for the
  * specified socket and service */
-struct rx_service *
+static struct rx_service *
 rxi_FindService(osi_socket socket, u_short serviceId)
 {
     struct rx_service **sp;
@@ -2382,7 +2409,7 @@ static struct rx_call *rx_allCallsp = 0;
 /* Allocate a call structure, for the indicated channel of the
  * supplied connection.  The mode and state of the call must be set by
  * the caller. Returns the call with mutex locked. */
-struct rx_call *
+static struct rx_call *
 rxi_NewCall(struct rx_connection *conn, int channel)
 {
     struct rx_call *call;
@@ -2485,7 +2512,7 @@ rxi_NewCall(struct rx_connection *conn, int channel)
  * call->lock amd rx_refcnt_mutex are held upon entry.
  * haveCTLock is set when called from rxi_ReapConnections.
  */
-void
+static void
 rxi_FreeCall(struct rx_call *call, int haveCTLock)
 {
     int channel = call->channel;
@@ -2818,6 +2845,7 @@ rxi_FindConnection(osi_socket socket, afs_uint32 host,
  *      call->conn->lastBusy[call->channel] != 0)
  *
  * @pre call->lock is held
+ * @pre rxi_busyChannelError is nonzero
  *
  * @note call->lock is dropped and reacquired
  */
@@ -2882,10 +2910,10 @@ rxi_CheckBusy(struct rx_call *call)
         * rx_conn that the application thread might be able to use. We know
         * that we have the correct call since callNumber is unchanged, and we
         * know that the call is still busy. So, set the call error state to
-        * RX_CALL_TIMEOUT so the application can retry the request, presumably
-        * on a less-busy call channel. */
+        * rxi_busyChannelError so the application can retry the request,
+        * presumably on a less-busy call channel. */
 
-       rxi_CallError(call, RX_CALL_TIMEOUT);
+       rxi_CallError(call, rxi_busyChannelError);
     }
 }
 
@@ -2981,11 +3009,6 @@ rxi_ReceivePacket(struct rx_packet *np, osi_socket socket,
        return np;
     }
 
-    MUTEX_ENTER(&conn->conn_data_lock);
-    if (conn->maxSerial < np->header.serial)
-       conn->maxSerial = np->header.serial;
-    MUTEX_EXIT(&conn->conn_data_lock);
-
     /* If the connection is in an error state, send an abort packet and ignore
      * the incoming packet */
     if (conn->error) {
@@ -3940,12 +3963,18 @@ rxi_ReceiveDataPacket(struct rx_call *call,
      * received. Always send a soft ack for the last packet in
      * the server's reply.
      *
-     * If we have received all of the packets for the call
-     * immediately send an RX_PACKET_TYPE_ACKALL packet so that
-     * the peer can empty its packet queue and cancel all resend
-     * events.
+     * If there was more than one packet received for the call
+     * and we have received all of them, immediately send an
+     * RX_PACKET_TYPE_ACKALL packet so that the peer can empty
+     * its packet transmit queue and cancel all resend events.
+     *
+     * When there is only one packet in the call there is a
+     * chance that we can race with Ping ACKs sent as part of
+     * connection establishment if the udp packets are delivered
+     * out of order.  When the race occurs, a two second delay
+     * will occur while waiting for a new Ping ACK to be sent.
      */
-    if (call->flags & RX_CALL_RECEIVE_DONE) {
+    if (!isFirst && (call->flags & RX_CALL_RECEIVE_DONE)) {
         rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
         rxi_AckAll(NULL, call, 0);
     } else if (ackNeeded) {
@@ -4187,22 +4216,42 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
      * much */
     peer->outPacketSkew = skew;
 
-    /* Check for packets that no longer need to be transmitted, and
-     * discard them.  This only applies to packets positively
-     * acknowledged as having been sent to the peer's upper level.
-     * All other packets must be retained.  So only packets with
-     * sequence numbers < ap->firstPacket are candidates. */
 
     clock_GetTime(&now);
 
-    for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
-       if (tp->header.seq >= first)
-           break;
+    /* The transmit queue splits into 4 sections.
+     *
+     * The first section is packets which have now been acknowledged
+     * by a window size change in the ack. These have reached the
+     * application layer, and may be discarded. These are packets
+     * with sequence numbers < ap->firstPacket.
+     *
+     * The second section is packets which have sequence numbers in
+     * the range ap->firstPacket to ap->firstPacket + ap->nAcks. The
+     * contents of the packet's ack array determines whether these
+     * packets are acknowledged or not.
+     *
+     * The third section is packets which fall above the range
+     * addressed in the ack packet. These have not yet been received
+     * by the peer.
+     *
+     * The four section is packets which have not yet been transmitted.
+     * These packets will have a retryTime of 0.
+     */
+
+    /* First section - implicitly acknowledged packets that can be
+     * disposed of
+     */
+
+    tp = queue_First(&call->tq, rx_packet);
+    while(!queue_IsEnd(&call->tq, tp) && tp->header.seq < first) {
+       struct rx_packet *next;
+
+       next = queue_Next(tp, rx_packet);
        call->tfirst = tp->header.seq + 1;
 
        if (!(tp->flags & RX_PKTFLAG_ACKED)) {
            newAckCount++;
-
            rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
        }
 
@@ -4239,6 +4288,7 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
 #endif /* RXDEBUG_PACKET */
            rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
        }
+       tp = next;
     }
 
 #ifdef ADAPT_WINDOW
@@ -4250,7 +4300,10 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
 
     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
 
-    /* Now go through explicit acks/nacks and record the results in
+    /* Second section of the queue - packets for which we are receiving
+     * soft ACKs
+     *
+     * Go through the explicit acks/nacks and record the results in
      * the waiting packets.  These are packets that can't be released
      * yet, even with a positive acknowledge.  This positive
      * acknowledge only means the packet has been received by the
@@ -4260,46 +4313,31 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
      * because this packet was out of sequence) */
 
     call->nSoftAcked = 0;
-    for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
-
-       /* Set the acknowledge flag per packet based on the
+    missing = 0;
+    while (!queue_IsEnd(&call->tq, tp) && tp->header.seq < first + nAcks) {
+       /* Set the acknowledge flag per packet based on the
         * information in the ack packet. An acknowlegded packet can
         * be downgraded when the server has discarded a packet it
         * soacked previously, or when an ack packet is received
         * out of sequence. */
-       if (tp->header.seq < first) {
-           /* Implicit ack information */
+       if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
            if (!(tp->flags & RX_PKTFLAG_ACKED)) {
                newAckCount++;
-           }
-           tp->flags |= RX_PKTFLAG_ACKED;
-       } else if (tp->header.seq < first + nAcks) {
-           /* Explicit ack information:  set it in the packet appropriately */
-           if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
-               if (!(tp->flags & RX_PKTFLAG_ACKED)) {
-                   newAckCount++;
-                   tp->flags |= RX_PKTFLAG_ACKED;
+               tp->flags |= RX_PKTFLAG_ACKED;
 
-                   rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
+               rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
 #ifdef ADAPT_WINDOW
-                   rxi_ComputeRate(call->conn->peer, call, tp, np,
-                                   ap->reason);
+               rxi_ComputeRate(call->conn->peer, call, tp, np, ap->reason);
 #endif
-               }
-               if (missing) {
-                   nNacked++;
-               } else {
-                   call->nSoftAcked++;
-               }
-           } else /* RX_ACK_TYPE_NACK */ {
-               tp->flags &= ~RX_PKTFLAG_ACKED;
-               missing = 1;
            }
-       } else {
-           if (tp->flags & RX_PKTFLAG_ACKED) {
-               tp->flags &= ~RX_PKTFLAG_ACKED;
-               missing = 1;
+           if (missing) {
+               nNacked++;
+           } else {
+               call->nSoftAcked++;
            }
+       } else /* RX_ACK_TYPE_NACK */ {
+           tp->flags &= ~RX_PKTFLAG_ACKED;
+           missing = 1;
        }
 
         /*
@@ -4328,8 +4366,27 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
            /* shift by eight because one quarter-sec ~ 256 milliseconds */
            clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
        }
+
+       tp = queue_Next(tp, rx_packet);
+    }
+
+    /* The third case, packets which the ack packet tells us
+     * nothing about at all. We just need to adjust the retryTime to match
+     * any new timeouts that have been calculated for this peer.
+     * We use the fact that we send in order to terminate this loop as soon as
+     * we find a packet that has not been sent.
+     */
+
+    while (!queue_IsEnd(&call->tq, tp) && !clock_IsZero(&tp->retryTime)) {
+       tp->retryTime = tp->timeSent;
+       clock_Add(&tp->retryTime, &peer->timeout);
+       clock_Addmsec(&tp->retryTime, ((afs_uint32) tp->backoff) << 8);
+       tp = queue_Next(tp, rx_packet);
     }
 
+    /* The fourth set of packets - those which have yet to be transmitted,
+     * we don't care about at all here */
+
     /* If the window has been extended by this acknowledge packet,
      * then wakeup a sender waiting in alloc for window space, or try
      * sending packets now, if he's been sitting on packets due to
@@ -4784,6 +4841,7 @@ rxi_AckAll(struct rxevent *event, struct rx_call *call, char *dummy)
     }
     rxi_SendSpecial(call, call->conn, (struct rx_packet *)0,
                    RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
+    call->flags |= RX_CALL_ACKALL_SENT;
     if (event)
        MUTEX_EXIT(&call->lock);
 #else /* RX_ENABLE_LOCKS */
@@ -4791,6 +4849,7 @@ rxi_AckAll(struct rxevent *event, struct rx_call *call, char *dummy)
        call->delayedAckEvent = NULL;
     rxi_SendSpecial(call, call->conn, (struct rx_packet *)0,
                    RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
+    call->flags |= RX_CALL_ACKALL_SENT;
 #endif /* RX_ENABLE_LOCKS */
 }
 
@@ -5373,7 +5432,21 @@ rxi_SendAck(struct rx_call *call,
     ap->serial = htonl(serial);
     ap->maxSkew = 0;           /* used to be peer->inPacketSkew */
 
-    ap->firstPacket = htonl(call->rnext);      /* First packet not yet forwarded to reader */
+    /*
+     * First packet not yet forwarded to reader. When ACKALL has been
+     * sent the peer has been told that all received packets will be
+     * delivered to the reader.  The value 'rnext' is used internally
+     * to refer to the next packet in the receive queue that must be
+     * delivered to the reader.  From the perspective of the peer it
+     * already has so report the last sequence number plus one if there
+     * are packets in the receive queue awaiting processing.
+     */
+    if ((call->flags & RX_CALL_ACKALL_SENT) &&
+        !queue_IsEmpty(&call->rq)) {
+        ap->firstPacket = htonl(queue_Last(&call->rq, rx_packet)->header.seq + 1);
+    } else
+        ap->firstPacket = htonl(call->rnext);
+
     ap->previousPacket = htonl(call->rprev);   /* Previous packet received */
 
     /* No fear of running out of ack packet here because there can only be at most
@@ -5789,7 +5862,7 @@ rxi_Start(struct rxevent *event,
         MUTEX_EXIT(&rx_refcnt_mutex);
        call->resendEvent = NULL;
 
-       if ((call->flags & RX_CALL_PEER_BUSY)) {
+       if (rxi_busyChannelError && (call->flags & RX_CALL_PEER_BUSY)) {
            rxi_CheckBusy(call);
        }
 
@@ -6745,11 +6818,16 @@ rxi_ComputeRoundTripTime(struct rx_packet *p,
        peer->rtt = _8THMSEC(&thisRtt) + 8;
        peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
     }
-    /* the timeout is RTT + 4*MDEV + rx_minPeerTimeout msec.
-     * This is because one end or the other of these connections is usually
-     * in a user process, and can be switched and/or swapped out.  So on fast,
-     * reliable networks, the timeout would otherwise be too short. */
-    rtt_timeout = ((peer->rtt >> 3) + peer->rtt_dev) + rx_minPeerTimeout;
+    /* the smoothed RTT time is RTT + 4*MDEV
+     *
+     * We allow a user specified minimum to be set for this, to allow clamping
+     * at a minimum value in the same way as TCP. In addition, we have to allow
+     * for the possibility that this packet is answered by a delayed ACK, so we
+     * add on a fixed 200ms to account for that timer expiring.
+     */
+
+    rtt_timeout = MAX(((peer->rtt >> 3) + peer->rtt_dev),
+                     rx_minPeerTimeout) + 200;
     clock_Zero(&(peer->timeout));
     clock_Addmsec(&(peer->timeout), rtt_timeout);
 
@@ -7406,14 +7484,14 @@ MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
                   (struct sockaddr *)&taddr, sizeof(struct sockaddr_in));
 
        /* see if there's a packet available */
-       gettimeofday(&tv_wake,0);
+       gettimeofday(&tv_wake, NULL);
        tv_wake.tv_sec += waitTime;
        for (;;) {
            FD_ZERO(&imask);
            FD_SET(socket, &imask);
            tv_delta.tv_sec = tv_wake.tv_sec;
            tv_delta.tv_usec = tv_wake.tv_usec;
-           gettimeofday(&tv_now, 0);
+           gettimeofday(&tv_now, NULL);
 
            if (tv_delta.tv_usec < tv_now.tv_usec) {
                /* borrow */