rx: Reorganise transmit queue walk
authorSimon Wilkinson <sxw@your-file-system.com>
Sun, 5 Jun 2011 10:04:12 +0000 (11:04 +0100)
committerJeffrey Altman <jaltman@openafs.org>
Tue, 7 Jun 2011 15:55:28 +0000 (08:55 -0700)
The transmit queue is stored in the order that we transmitted the
packets (by sequence number). This means that we can do all of the
ACK processing by just doing a single walk of this queue, rather
than having to walk the queue multiple times, once for each type of
ACK.

This clarifies the queue processing, and should reduce the amount of
time that we spending iterating large transmit queues.

Change-Id: I59578956e81197bbea7ce496e2f520a2995a3e95
Reviewed-on: http://gerrit.openafs.org/4796
Tested-by: Jeffrey Altman <jaltman@openafs.org>
Reviewed-by: Jeffrey Altman <jaltman@openafs.org>

src/rx/rx.c

index 97ede25..b3a6f5a 100644 (file)
@@ -4206,22 +4206,42 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
      * much */
     peer->outPacketSkew = skew;
 
-    /* Check for packets that no longer need to be transmitted, and
-     * discard them.  This only applies to packets positively
-     * acknowledged as having been sent to the peer's upper level.
-     * All other packets must be retained.  So only packets with
-     * sequence numbers < ap->firstPacket are candidates. */
 
     clock_GetTime(&now);
 
-    for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
-       if (tp->header.seq >= first)
-           break;
+    /* The transmit queue splits into 4 sections.
+     *
+     * The first section is packets which have now been acknowledged
+     * by a window size change in the ack. These have reached the
+     * application layer, and may be discarded. These are packets
+     * with sequence numbers < ap->firstPacket.
+     *
+     * The second section is packets which have sequence numbers in
+     * the range ap->firstPacket to ap->firstPacket + ap->nAcks. The
+     * contents of the packet's ack array determines whether these
+     * packets are acknowledged or not.
+     *
+     * The third section is packets which fall above the range
+     * addressed in the ack packet. These have not yet been received
+     * by the peer.
+     *
+     * The four section is packets which have not yet been transmitted.
+     * These packets will have a retryTime of 0.
+     */
+
+    /* First section - implicitly acknowledged packets that can be
+     * disposed of
+     */
+
+    tp = queue_First(&call->tq, rx_packet);
+    while(!queue_IsEnd(&call->tq, tp) && tp->header.seq < first) {
+       struct rx_packet *next;
+
+       next = queue_Next(tp, rx_packet);
        call->tfirst = tp->header.seq + 1;
 
        if (!(tp->flags & RX_PKTFLAG_ACKED)) {
            newAckCount++;
-
            rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
        }
 
@@ -4258,6 +4278,7 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
 #endif /* RXDEBUG_PACKET */
            rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
        }
+       tp = next;
     }
 
 #ifdef ADAPT_WINDOW
@@ -4269,7 +4290,10 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
 
     /* N.B. we don't turn off any timers here.  They'll go away by themselves, anyway */
 
-    /* Now go through explicit acks/nacks and record the results in
+    /* Second section of the queue - packets for which we are receiving
+     * soft ACKs
+     *
+     * Go through the explicit acks/nacks and record the results in
      * the waiting packets.  These are packets that can't be released
      * yet, even with a positive acknowledge.  This positive
      * acknowledge only means the packet has been received by the
@@ -4279,46 +4303,31 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
      * because this packet was out of sequence) */
 
     call->nSoftAcked = 0;
-    for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
-
-       /* Set the acknowledge flag per packet based on the
+    missing = 0;
+    while (!queue_IsEnd(&call->tq, tp) && tp->header.seq < first + nAcks) {
+       /* Set the acknowledge flag per packet based on the
         * information in the ack packet. An acknowlegded packet can
         * be downgraded when the server has discarded a packet it
         * soacked previously, or when an ack packet is received
         * out of sequence. */
-       if (tp->header.seq < first) {
-           /* Implicit ack information */
+       if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
            if (!(tp->flags & RX_PKTFLAG_ACKED)) {
                newAckCount++;
-           }
-           tp->flags |= RX_PKTFLAG_ACKED;
-       } else if (tp->header.seq < first + nAcks) {
-           /* Explicit ack information:  set it in the packet appropriately */
-           if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
-               if (!(tp->flags & RX_PKTFLAG_ACKED)) {
-                   newAckCount++;
-                   tp->flags |= RX_PKTFLAG_ACKED;
+               tp->flags |= RX_PKTFLAG_ACKED;
 
-                   rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
+               rxi_ComputeRoundTripTime(tp, ap, call->conn->peer, &now);
 #ifdef ADAPT_WINDOW
-                   rxi_ComputeRate(call->conn->peer, call, tp, np,
-                                   ap->reason);
+               rxi_ComputeRate(call->conn->peer, call, tp, np, ap->reason);
 #endif
-               }
-               if (missing) {
-                   nNacked++;
-               } else {
-                   call->nSoftAcked++;
-               }
-           } else /* RX_ACK_TYPE_NACK */ {
-               tp->flags &= ~RX_PKTFLAG_ACKED;
-               missing = 1;
            }
-       } else {
-           if (tp->flags & RX_PKTFLAG_ACKED) {
-               tp->flags &= ~RX_PKTFLAG_ACKED;
-               missing = 1;
+           if (missing) {
+               nNacked++;
+           } else {
+               call->nSoftAcked++;
            }
+       } else /* RX_ACK_TYPE_NACK */ {
+           tp->flags &= ~RX_PKTFLAG_ACKED;
+           missing = 1;
        }
 
         /*
@@ -4347,8 +4356,27 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
            /* shift by eight because one quarter-sec ~ 256 milliseconds */
            clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
        }
+
+       tp = queue_Next(tp, rx_packet);
+    }
+
+    /* The third case, packets which the ack packet tells us
+     * nothing about at all. We just need to adjust the retryTime to match
+     * any new timeouts that have been calculated for this peer.
+     * We use the fact that we send in order to terminate this loop as soon as
+     * we find a packet that has not been sent.
+     */
+
+    while (!queue_IsEnd(&call->tq, tp) && !clock_IsZero(&tp->retryTime)) {
+       tp->retryTime = tp->timeSent;
+       clock_Add(&tp->retryTime, &peer->timeout);
+       clock_Addmsec(&tp->retryTime, ((afs_uint32) tp->backoff) << 8);
+       tp = queue_Next(tp, rx_packet);
     }
 
+    /* The fourth set of packets - those which have yet to be transmitted,
+     * we don't care about at all here */
+
     /* If the window has been extended by this acknowledge packet,
      * then wakeup a sender waiting in alloc for window space, or try
      * sending packets now, if he's been sitting on packets due to