rx: Rework code which pulls packet from recv queue
authorSimon Wilkinson <sxw@your-file-system.com>
Sun, 8 Jan 2012 11:45:57 +0000 (11:45 +0000)
committerDerrick Brashear <shadow@dementix.org>
Sat, 14 Apr 2012 00:42:24 +0000 (17:42 -0700)
Both rxi_ReadProc and rxi_FillReadVector contained copies of the
same code to pull a packet out of the receive queue, and turn it
into the call's currentPacket. Abstract this out into a single common
function, so we're not maintaining the same code in two different
places.

Change-Id: I20af6b4ff19f05e21ffde1a80609be12ad6cfeee
Reviewed-on: http://gerrit.openafs.org/7195
Reviewed-by: Jeffrey Altman <jaltman@secure-endpoints.com>
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Derrick Brashear <shadow@dementix.org>

src/rx/rx_rdwr.c

index a711572..b95bb6f 100644 (file)
 /* rxdb_fileID is used to identify the lock location, along with line#. */
 static int rxdb_fileID = RXDB_FILE_RX_RDWR;
 #endif /* RX_LOCKS_DB */
+
+/* Get the next packet in the receive queue
+ *
+ * Dispose of the call's currentPacket, and move the next packet in the
+ * receive queue into the currentPacket field. If the next packet isn't
+ * available, then currentPacket is left NULL.
+ *
+ * @param call
+ *     The RX call to manipulate
+ * @returns
+ *     0 on success, an error code on failure
+ *
+ * @notes
+ *     Must be called with the call locked. Unlocks the call if returning
+ *     with an error.
+ */
+
+static int
+rxi_GetNextPacket(struct rx_call *call) {
+    struct rx_packet *rp;
+    int error;
+
+    if (call->currentPacket != NULL) {
+#ifdef RX_TRACK_PACKETS
+       call->currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
+       rxi_FreePacket(call->currentPacket);
+       call->currentPacket = NULL;
+    }
+
+    if (queue_IsEmpty(&call->rq))
+       return 0;
+
+    /* Check that next packet available is next in sequence */
+    rp = queue_First(&call->rq, rx_packet);
+    if (rp->header.seq != call->rnext)
+       return 0;
+
+    queue_Remove(rp);
+#ifdef RX_TRACK_PACKETS
+    rp->flags &= ~RX_PKTFLAG_RQ;
+#endif
+#ifdef RXDEBUG_PACKET
+    call->rqc--;
+#endif /* RXDEBUG_PACKET */
+
+    /* RXS_CheckPacket called to undo RXS_PreparePacket's work.  It may
+     * reduce the length of the packet by up to conn->maxTrailerSize,
+     * to reflect the length of the data + the header. */
+    if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
+       /* Used to merely shut down the call, but now we shut down the whole
+        * connection since this may indicate an attempt to hijack it */
+
+       MUTEX_EXIT(&call->lock);
+       rxi_ConnectionError(call->conn, error);
+       MUTEX_ENTER(&call->conn->conn_data_lock);
+       rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
+       MUTEX_EXIT(&call->conn->conn_data_lock);
+       rxi_FreePacket(rp);
+
+       return error;
+     }
+
+    call->rnext++;
+    call->currentPacket = rp;
+#ifdef RX_TRACK_PACKETS
+    call->currentPacket->flags |= RX_PKTFLAG_CP;
+#endif
+    call->curvec = 1;  /* 0th vec is always header */
+
+    /* begin at the beginning [ more or less ], continue on until the end,
+     * then stop. */
+    call->curpos = (char *)call->currentPacket->wirevec[1].iov_base +
+                  call->conn->securityHeaderSize;
+    call->curlen = call->currentPacket->wirevec[1].iov_len -
+                  call->conn->securityHeaderSize;
+
+    call->nLeft = call->currentPacket->length;
+    hadd32(call->bytesRcvd, call->currentPacket->length);
+
+    call->nHardAcks++;
+
+    return 0;
+}
+
 /* rxi_ReadProc -- internal version.
  *
  * LOCKS USED -- called at netpri
@@ -77,8 +162,8 @@ int
 rxi_ReadProc(struct rx_call *call, char *buf,
             int nbytes)
 {
-    struct rx_packet *rp;
     int requestCount;
+    int code;
     unsigned int t;
 
 /* XXXX took out clock_NewTime from here.  Was it needed? */
@@ -110,83 +195,23 @@ rxi_ReadProc(struct rx_call *call, char *buf,
                        continue;
                    }
                }
-               if (queue_IsNotEmpty(&call->rq)) {
-                   /* Check that next packet available is next in sequence */
-                   rp = queue_First(&call->rq, rx_packet);
-                   if (rp->header.seq == call->rnext) {
-                       afs_int32 error;
-                       struct rx_connection *conn = call->conn;
-                       queue_Remove(rp);
-#ifdef RX_TRACK_PACKETS
-                       rp->flags &= ~RX_PKTFLAG_RQ;
-#endif
-#ifdef RXDEBUG_PACKET
-                        call->rqc--;
-#endif /* RXDEBUG_PACKET */
 
-                       /* RXS_CheckPacket called to undo RXS_PreparePacket's
-                        * work.  It may reduce the length of the packet by up
-                        * to conn->maxTrailerSize, to reflect the length of the
-                        * data + the header. */
-                       if ((error =
-                            RXS_CheckPacket(conn->securityObject, call,
-                                            rp))) {
-                           /* Used to merely shut down the call, but now we
-                            * shut down the whole connection since this may
-                            * indicate an attempt to hijack it */
-
-                           MUTEX_EXIT(&call->lock);
-                           rxi_ConnectionError(conn, error);
-                           MUTEX_ENTER(&conn->conn_data_lock);
-                           rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
-                           MUTEX_EXIT(&conn->conn_data_lock);
-                           rxi_FreePacket(rp);
-
-                           return 0;
-                       }
-                       call->rnext++;
-                       call->currentPacket = rp;
-#ifdef RX_TRACK_PACKETS
-                       call->currentPacket->flags |= RX_PKTFLAG_CP;
-#endif
-                       call->curvec = 1;       /* 0th vec is always header */
-                       /* begin at the beginning [ more or less ], continue
-                        * on until the end, then stop. */
-                       call->curpos =
-                           (char *) call->currentPacket->wirevec[1].iov_base +
-                           call->conn->securityHeaderSize;
-                       call->curlen =
-                           call->currentPacket->wirevec[1].iov_len -
-                           call->conn->securityHeaderSize;
-
-                       /* Notice that this code works correctly if the data
-                        * size is 0 (which it may be--no reply arguments from
-                        * server, for example).  This relies heavily on the
-                        * fact that the code below immediately frees the packet
-                        * (no yields, etc.).  If it didn't, this would be a
-                        * problem because a value of zero for call->nLeft
-                        * normally means that there is no read packet */
-                       call->nLeft = call->currentPacket->length;
-                       hadd32(call->bytesRcvd, call->currentPacket->length);
-
-                       /* Send a hard ack for every rxi_HardAckRate+1 packets
-                        * consumed. Otherwise schedule an event to send
-                        * the hard ack later on.
-                        */
-                       call->nHardAcks++;
-                       if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
-                           if (call->nHardAcks > (u_short) rxi_HardAckRate) {
-                               rxevent_Cancel(&call->delayedAckEvent, call,
-                                              RX_CALL_REFCOUNT_DELAY);
-                               rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
-                           } else {
-                               /* Delay to consolidate ack packets */
-                               rxi_PostDelayedAckEvent(call,
-                                                       &rx_hardAckDelay);
-                           }
+               code = rxi_GetNextPacket(call);
+               if (code)
+                    return 0;
+
+               if (call->currentPacket) {
+                   if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
+                       if (call->nHardAcks > (u_short) rxi_HardAckRate) {
+                           rxevent_Cancel(&call->delayedAckEvent, call,
+                                          RX_CALL_REFCOUNT_DELAY);
+                           rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
+                       } else {
+                           /* Delay to consolidate ack packets */
+                           rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
                        }
-                       break;
                    }
+                   break;
                }
 
                 /*
@@ -371,8 +396,8 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
 {
     int didConsume = 0;
     int didHardAck = 0;
+    int code;
     unsigned int t;
-    struct rx_packet *rp;
     struct iovec *call_iov;
     struct iovec *cur_iov = NULL;
 
@@ -384,76 +409,19 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
     while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
        if (call->nLeft == 0) {
            /* Get next packet */
-           if (queue_IsNotEmpty(&call->rq)) {
-               /* Check that next packet available is next in sequence */
-               rp = queue_First(&call->rq, rx_packet);
-               if (rp->header.seq == call->rnext) {
-                   afs_int32 error;
-                   struct rx_connection *conn = call->conn;
-                   queue_Remove(rp);
-#ifdef RX_TRACK_PACKETS
-                   rp->flags &= ~RX_PKTFLAG_RQ;
-#endif
-#ifdef RXDEBUG_PACKET
-                    call->rqc--;
-#endif /* RXDEBUG_PACKET */
-
-                   /* RXS_CheckPacket called to undo RXS_PreparePacket's
-                    * work.  It may reduce the length of the packet by up
-                    * to conn->maxTrailerSize, to reflect the length of the
-                    * data + the header. */
-                   if ((error =
-                        RXS_CheckPacket(conn->securityObject, call, rp))) {
-                       /* Used to merely shut down the call, but now we
-                        * shut down the whole connection since this may
-                        * indicate an attempt to hijack it */
+           code = rxi_GetNextPacket(call);
+           if (code) {
+               MUTEX_ENTER(&call->lock);
+               return 1;
+           }
 
-                       MUTEX_EXIT(&call->lock);
-                       rxi_ConnectionError(conn, error);
-                       MUTEX_ENTER(&conn->conn_data_lock);
-                       rp = rxi_SendConnectionAbort(conn, rp, 0, 0);
-                       MUTEX_EXIT(&conn->conn_data_lock);
-                       rxi_FreePacket(rp);
-                       MUTEX_ENTER(&call->lock);
-
-                       return 1;
-                   }
-                   call->rnext++;
-                   call->currentPacket = rp;
-#ifdef RX_TRACK_PACKETS
-                   call->currentPacket->flags |= RX_PKTFLAG_CP;
-#endif
-                   call->curvec = 1;   /* 0th vec is always header */
-                   cur_iov = &call->currentPacket->wirevec[1];
-                   /* begin at the beginning [ more or less ], continue
-                    * on until the end, then stop. */
-                   call->curpos =
-                       (char *)call->currentPacket->wirevec[1].iov_base +
-                       call->conn->securityHeaderSize;
-                   call->curlen =
-                       call->currentPacket->wirevec[1].iov_len -
-                       call->conn->securityHeaderSize;
-
-                   /* Notice that this code works correctly if the data
-                    * size is 0 (which it may be--no reply arguments from
-                    * server, for example).  This relies heavily on the
-                    * fact that the code below immediately frees the packet
-                    * (no yields, etc.).  If it didn't, this would be a
-                    * problem because a value of zero for call->nLeft
-                    * normally means that there is no read packet */
-                   call->nLeft = call->currentPacket->length;
-                   hadd32(call->bytesRcvd, call->currentPacket->length);
-
-                   /* Send a hard ack for every rxi_HardAckRate+1 packets
-                    * consumed. Otherwise schedule an event to send
-                    * the hard ack later on.
-                    */
-                   call->nHardAcks++;
-                   didConsume = 1;
-                   continue;
-               }
+           if (call->currentPacket) {
+               cur_iov = &call->currentPacket->wirevec[1];
+               didConsume = 1;
+               continue;
+           } else {
+               break;
            }
-           break;
        }
 
        /* It's possible for call->nLeft to be smaller than any particular