rx: reset packet header userStatus field on reuse
[openafs.git] / src / rx / rx_packet.c
index 2c51036..9340da3 100644 (file)
 #include "rx_conn.h"
 #include "rx_call.h"
 
+/*!
+ * \brief structure used to keep track of allocated packets
+ */
+struct rx_mallocedPacket {
+    struct opr_queue entry;    /*!< chained using opr_queue */
+    struct rx_packet *addr;    /*!< address of the first element */
+    afs_uint32 size;           /*!< array size in bytes */
+};
+
 #ifdef RX_LOCKS_DB
 /* rxdb_fileID is used to identify the lock location, along with line#. */
 static int rxdb_fileID = RXDB_FILE_RX_PACKET;
@@ -536,6 +545,33 @@ rxi_AllocDataBuf(struct rx_packet *p, int nb, int class)
     return nb;
 }
 
+/**
+ * Register allocated packets.
+ *
+ * @param[in] addr array of packets
+ * @param[in] npkt number of packets
+ *
+ * @return none
+ */
+static void
+registerPackets(struct rx_packet *addr, afs_uint32 npkt)
+{
+    struct rx_mallocedPacket *mp;
+
+    mp = osi_Alloc(sizeof(*mp));
+
+    osi_Assert(mp != NULL);
+    memset(mp, 0, sizeof(*mp));
+
+    mp->addr = addr;
+    mp->size = npkt * sizeof(struct rx_packet);
+    osi_Assert(npkt <= MAX_AFS_UINT32 / sizeof(struct rx_packet));
+
+    MUTEX_ENTER(&rx_mallocedPktQ_lock);
+    opr_queue_Append(&rx_mallocedPacketQueue, &mp->entry);
+    MUTEX_EXIT(&rx_mallocedPktQ_lock);
+}
+
 /* Add more packet buffers */
 #ifdef RX_ENABLE_TSFPQ
 void
@@ -549,6 +585,7 @@ rxi_MorePackets(int apackets)
     getme = apackets * sizeof(struct rx_packet);
     p = osi_Alloc(getme);
     osi_Assert(p);
+    registerPackets(p, apackets);
 
     PIN(p, getme);             /* XXXXX */
     memset(p, 0, getme);
@@ -603,6 +640,7 @@ rxi_MorePackets(int apackets)
     getme = apackets * sizeof(struct rx_packet);
     p = osi_Alloc(getme);
     osi_Assert(p);
+    registerPackets(p, apackets);
 
     PIN(p, getme);             /* XXXXX */
     memset(p, 0, getme);
@@ -645,6 +683,7 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local)
 
     getme = apackets * sizeof(struct rx_packet);
     p = osi_Alloc(getme);
+    registerPackets(p, apackets);
 
     PIN(p, getme);             /* XXXXX */
     memset(p, 0, getme);
@@ -713,6 +752,7 @@ rxi_MorePacketsNoLock(int apackets)
         }
     } while(p == NULL);
     memset(p, 0, getme);
+    registerPackets(p, apackets);
 
 #ifdef RX_ENABLE_TSFPQ
     RX_TS_INFO_GET(rx_ts_info);
@@ -749,11 +789,19 @@ rxi_MorePacketsNoLock(int apackets)
 void
 rxi_FreeAllPackets(void)
 {
-    /* must be called at proper interrupt level, etcetera */
-    /* MTUXXX need to free all Packets */
-    osi_Free(rx_mallocedP,
-            (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet));
-    UNPIN(rx_mallocedP, (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet));
+    struct rx_mallocedPacket *mp;
+
+    MUTEX_ENTER(&rx_mallocedPktQ_lock);
+
+    while (!opr_queue_IsEmpty(&rx_mallocedPacketQueue)) {
+       mp = opr_queue_First(&rx_mallocedPacketQueue,
+                            struct rx_mallocedPacket, entry);
+       opr_queue_Remove(&mp->entry);
+       osi_Free(mp->addr, mp->size);
+       UNPIN(mp->addr, mp->size);
+       osi_Free(mp, sizeof(*mp));
+    }
+    MUTEX_EXIT(&rx_mallocedPktQ_lock);
 }
 
 #ifdef RX_ENABLE_TSFPQ
@@ -1550,6 +1598,7 @@ rxi_SplitJumboPacket(struct rx_packet *p, afs_uint32 host, short port,
     np->header = p->header;
     np->header.serial = p->header.serial + 1;
     np->header.seq = p->header.seq + 1;
+    np->header.userStatus = 0;
     np->header.flags = jp->flags;
     np->header.spare = jp->cksum;
 
@@ -2006,16 +2055,16 @@ rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
                return ap;
 
            /* Since its all int32s convert to network order with a loop. */
-        if (rx_stats_active)
-           MUTEX_ENTER(&rx_stats_mutex);
+           if (rx_stats_active)
+               MUTEX_ENTER(&rx_stats_mutex);
            s = (afs_int32 *) & rx_stats;
            for (i = 0; i < sizeof(rx_stats) / sizeof(afs_int32); i++, s++)
                rx_PutInt32(ap, i * sizeof(afs_int32), htonl(*s));
 
            tl = ap->length;
            ap->length = sizeof(rx_stats);
-        if (rx_stats_active)
-           MUTEX_EXIT(&rx_stats_mutex);
+           if (rx_stats_active)
+               MUTEX_EXIT(&rx_stats_mutex);
            rxi_SendDebugPacket(ap, asocket, ahost, aport, istack);
            ap->length = tl;
            break;
@@ -2504,7 +2553,8 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn,
 /* Send a raw abort packet, without any call or connection structures */
 void
 rxi_SendRawAbort(osi_socket socket, afs_uint32 host, u_short port,
-                afs_int32 error, struct rx_packet *source, int istack)
+                afs_uint32 serial, afs_int32 error,
+                struct rx_packet *source, int istack)
 {
     struct rx_header theader;
     struct sockaddr_in addr;
@@ -2513,7 +2563,7 @@ rxi_SendRawAbort(osi_socket socket, afs_uint32 host, u_short port,
     memset(&theader, 0, sizeof(theader));
     theader.epoch = htonl(source->header.epoch);
     theader.callNumber = htonl(source->header.callNumber);
-    theader.serial = htonl(1);
+    theader.serial = htonl(serial);
     theader.type = RX_PACKET_TYPE_ABORT;
     theader.serviceId = htons(source->header.serviceId);
     theader.securityIndex = source->header.securityIndex;
@@ -2596,6 +2646,7 @@ rxi_SendSpecial(struct rx_call *call,
     p->header.seq = 0;
     p->header.epoch = conn->epoch;
     p->header.type = type;
+    p->header.userStatus = 0;
     p->header.flags = 0;
     if (conn->type == RX_CLIENT_CONNECTION)
        p->header.flags |= RX_CLIENT_INITIATED;
@@ -2718,6 +2769,7 @@ rxi_PrepareSendPacket(struct rx_call *call,
     p->header.seq = seq;
     p->header.epoch = conn->epoch;
     p->header.type = RX_PACKET_TYPE_DATA;
+    p->header.userStatus = 0;
     p->header.flags = 0;
     p->header.spare = 0;
     if (conn->type == RX_CLIENT_CONNECTION)