rx: Introduce rxi_NetSend
diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c
index 28441ec..d9822af 100644
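
The commit title introduces rxi_NetSend, but the wrapper's definition is not part of this excerpt; the hunks below only convert existing osi_NetSend call sites and trace strings over to it. For orientation only, a minimal sketch of such a wrapper, with the parameter list inferred from the call sites below and no behaviour invented beyond forwarding, could sit in rx_packet.c (where osi_NetSend and these types are already in scope) roughly as follows:

    /*
     * Sketch only -- not the committed implementation.  The presumed value of
     * the wrapper is a single choke point in front of osi_NetSend (the natural
     * place for a shutdown or quiesce check); that check is not visible in
     * this excerpt, so none is invented here.
     */
    static int
    rxi_NetSend(osi_socket socket, struct sockaddr_in *addr, struct iovec *dvec,
                int nvecs, int length, int istack)
    {
        return osi_NetSend(socket, addr, dvec, nvecs, length, istack);
    }
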
@@ -31,7 +31,7 @@
 #  endif
 #  include "h/socket.h"
 #  if !defined(AFS_SUN5_ENV) &&  !defined(AFS_LINUX20_ENV) && !defined(AFS_HPUX110_ENV)
-#   if !defined(AFS_OSF_ENV) && !defined(AFS_AIX41_ENV)
+#   if !defined(AFS_AIX41_ENV)
 #    include "sys/mount.h"             /* it gets pulled in by something later anyway */
 #   endif
 #   include "h/mbuf.h"
 #include "rx_conn.h"
 #include "rx_call.h"
 
+/*!
+ * \brief structure used to keep track of allocated packets
+ */
+struct rx_mallocedPacket {
+    struct opr_queue entry;    /*!< chained using opr_queue */
+    struct rx_packet *addr;    /*!< address of the first element */
+    afs_uint32 size;           /*!< array size in bytes */
+};
+
 #ifdef RX_LOCKS_DB
 /* rxdb_fileID is used to identify the lock location, along with line#. */
 static int rxdb_fileID = RXDB_FILE_RX_PACKET;
@@ -427,6 +436,7 @@ rxi_FreePackets(int num_pkts, struct opr_queue *q)
                qlen += rxi_FreeDataBufsToQueue(p, 2, &cbs);
            }
             RX_FPQ_MARK_FREE(p);
+           num_pkts++;
        }
        if (!num_pkts)
            return 0;
@@ -535,6 +545,33 @@ rxi_AllocDataBuf(struct rx_packet *p, int nb, int class)
     return nb;
 }
 
+/**
+ * Register allocated packets.
+ *
+ * @param[in] addr array of packets
+ * @param[in] npkt number of packets
+ *
+ * @return none
+ */
+static void
+registerPackets(struct rx_packet *addr, afs_uint32 npkt)
+{
+    struct rx_mallocedPacket *mp;
+
+    mp = osi_Alloc(sizeof(*mp));
+
+    osi_Assert(mp != NULL);
+    memset(mp, 0, sizeof(*mp));
+
+    osi_Assert(npkt <= MAX_AFS_UINT32 / sizeof(struct rx_packet));
+    mp->addr = addr;
+    mp->size = npkt * sizeof(struct rx_packet);
+
+    MUTEX_ENTER(&rx_mallocedPktQ_lock);
+    opr_queue_Append(&rx_mallocedPacketQueue, &mp->entry);
+    MUTEX_EXIT(&rx_mallocedPktQ_lock);
+}
+
 /* Add more packet buffers */
 #ifdef RX_ENABLE_TSFPQ
 void
@@ -548,6 +585,7 @@ rxi_MorePackets(int apackets)
     getme = apackets * sizeof(struct rx_packet);
     p = osi_Alloc(getme);
     osi_Assert(p);
+    registerPackets(p, apackets);
 
     PIN(p, getme);             /* XXXXX */
     memset(p, 0, getme);
@@ -602,6 +640,7 @@ rxi_MorePackets(int apackets)
     getme = apackets * sizeof(struct rx_packet);
     p = osi_Alloc(getme);
     osi_Assert(p);
+    registerPackets(p, apackets);
 
     PIN(p, getme);             /* XXXXX */
     memset(p, 0, getme);
@@ -644,6 +683,7 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local)
 
     getme = apackets * sizeof(struct rx_packet);
     p = osi_Alloc(getme);
+    registerPackets(p, apackets);
 
     PIN(p, getme);             /* XXXXX */
     memset(p, 0, getme);
@@ -712,6 +752,7 @@ rxi_MorePacketsNoLock(int apackets)
         }
     } while(p == NULL);
     memset(p, 0, getme);
+    registerPackets(p, apackets);
 
 #ifdef RX_ENABLE_TSFPQ
     RX_TS_INFO_GET(rx_ts_info);
@@ -748,11 +789,19 @@ rxi_MorePacketsNoLock(int apackets)
 void
 rxi_FreeAllPackets(void)
 {
-    /* must be called at proper interrupt level, etcetera */
-    /* MTUXXX need to free all Packets */
-    osi_Free(rx_mallocedP,
-            (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet));
-    UNPIN(rx_mallocedP, (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet));
+    struct rx_mallocedPacket *mp;
+
+    MUTEX_ENTER(&rx_mallocedPktQ_lock);
+
+    while (!opr_queue_IsEmpty(&rx_mallocedPacketQueue)) {
+       mp = opr_queue_First(&rx_mallocedPacketQueue,
+                            struct rx_mallocedPacket, entry);
+       opr_queue_Remove(&mp->entry);
+       osi_Free(mp->addr, mp->size);
+       UNPIN(mp->addr, mp->size);
+       osi_Free(mp, sizeof(*mp));
+    }
+    MUTEX_EXIT(&rx_mallocedPktQ_lock);
 }
 
 #ifdef RX_ENABLE_TSFPQ
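
Taken together, struct rx_mallocedPacket, registerPackets() and the rewritten rxi_FreeAllPackets() replace the old fixed guess of (rx_maxReceiveWindow + 2) * sizeof(struct rx_packet) with exact bookkeeping: every packet array handed out by osi_Alloc in the rxi_MorePackets* paths is recorded on rx_mallocedPacketQueue, and shutdown walks that queue and frees precisely what was registered. A standalone sketch of the same pattern in plain C, for readers unfamiliar with the idiom (hypothetical names, a singly linked list instead of opr_queue, and no locking):

    /*
     * Illustration only, not OpenAFS code: record each allocation on a list
     * so a single shutdown routine can free everything that was handed out.
     */
    #include <assert.h>
    #include <stdint.h>
    #include <stdlib.h>

    struct tracked_block {
        struct tracked_block *next;
        void *addr;             /* address of the allocated array */
        size_t size;            /* array size in bytes */
    };

    static struct tracked_block *tracked_head;

    static void *
    alloc_tracked(size_t nelem, size_t elem_size)
    {
        struct tracked_block *tb = malloc(sizeof(*tb));

        assert(tb != NULL);
        assert(nelem <= SIZE_MAX / elem_size);  /* guard the multiplication */
        tb->size = nelem * elem_size;
        tb->addr = calloc(nelem, elem_size);
        assert(tb->addr != NULL);
        tb->next = tracked_head;
        tracked_head = tb;
        return tb->addr;
    }

    static void
    free_all_tracked(void)
    {
        while (tracked_head != NULL) {
            struct tracked_block *tb = tracked_head;

            tracked_head = tb->next;
            free(tb->addr);
            free(tb);
        }
    }

The real code differs in the expected ways: osi_Alloc and osi_Free take explicit sizes, opr_queue provides the list, and rx_mallocedPktQ_lock serialises access to the queue.
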
@@ -1116,32 +1165,6 @@ rxi_AllocPacketNoLock(int class)
 
     RX_TS_INFO_GET(rx_ts_info);
 
-#ifdef KERNEL
-    if (rxi_OverQuota(class)) {
-       rxi_NeedMorePackets = TRUE;
-        if (rx_stats_active) {
-            switch (class) {
-            case RX_PACKET_CLASS_RECEIVE:
-                rx_atomic_inc(&rx_stats.receivePktAllocFailures);
-                break;
-            case RX_PACKET_CLASS_SEND:
-                rx_atomic_inc(&rx_stats.sendPktAllocFailures);
-                break;
-            case RX_PACKET_CLASS_SPECIAL:
-                rx_atomic_inc(&rx_stats.specialPktAllocFailures);
-                break;
-            case RX_PACKET_CLASS_RECV_CBUF:
-                rx_atomic_inc(&rx_stats.receiveCbufPktAllocFailures);
-                break;
-            case RX_PACKET_CLASS_SEND_CBUF:
-                rx_atomic_inc(&rx_stats.sendCbufPktAllocFailures);
-                break;
-            }
-       }
-        return (struct rx_packet *)0;
-    }
-#endif /* KERNEL */
-
     if (rx_stats_active)
         rx_atomic_inc(&rx_stats.packetRequests);
     if (opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
@@ -1575,6 +1598,7 @@ rxi_SplitJumboPacket(struct rx_packet *p, afs_uint32 host, short port,
     np->header = p->header;
     np->header.serial = p->header.serial + 1;
     np->header.seq = p->header.seq + 1;
+    np->header.userStatus = 0;
     np->header.flags = jp->flags;
     np->header.spare = jp->cksum;
 
@@ -1850,6 +1874,7 @@ rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
                for (tc = rx_connHashTable[i]; tc; tc = tc->next) {
                    if ((all || rxi_IsConnInteresting(tc))
                        && tin.index-- <= 0) {
+                       int do_secstats = 0;
                        tconn.host = tc->peer->host;
                        tconn.port = tc->peer->port;
                        tconn.cid = htonl(tc->cid);
@@ -1859,7 +1884,7 @@ rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
                            tconn.callNumber[j] = htonl(tc->callNumber[j]);
                            if ((tcall = tc->call[j])) {
                                tconn.callState[j] = tcall->state;
-                               tconn.callMode[j] = tcall->mode;
+                               tconn.callMode[j] = tcall->app.mode;
                                tconn.callFlags[j] = tcall->flags;
                                if (!opr_queue_IsEmpty(&tcall->rq))
                                    tconn.callOther[j] |= RX_OTHER_IN;
@@ -1875,8 +1900,14 @@ rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
                        tconn.type = tc->type;
                        tconn.securityIndex = tc->securityIndex;
                        if (tc->securityObject) {
-                           RXS_GetStats(tc->securityObject, tc,
-                                        &tconn.secStats);
+                           int code;
+                           code = RXS_GetStats(tc->securityObject, tc,
+                                               &tconn.secStats);
+                           if (code == 0) {
+                               do_secstats = 1;
+                           }
+                       }
+                       if (do_secstats) {
 #define DOHTONL(a) (tconn.secStats.a = htonl(tconn.secStats.a))
 #define DOHTONS(a) (tconn.secStats.a = htons(tconn.secStats.a))
                            DOHTONL(flags);
@@ -1895,6 +1926,8 @@ rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
                                 sizeof(tconn.secStats.sparel) /
                                 sizeof(afs_int32); i++)
                                DOHTONL(sparel[i]);
+                       } else {
+                           memset(&tconn.secStats, 0, sizeof(tconn.secStats));
                        }
 
                        MUTEX_EXIT(&rx_connHashTable_lock);
@@ -2031,16 +2064,16 @@ rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
                return ap;
 
            /* Since its all int32s convert to network order with a loop. */
-        if (rx_stats_active)
-           MUTEX_ENTER(&rx_stats_mutex);
+           if (rx_stats_active)
+               MUTEX_ENTER(&rx_stats_mutex);
            s = (afs_int32 *) & rx_stats;
            for (i = 0; i < sizeof(rx_stats) / sizeof(afs_int32); i++, s++)
                rx_PutInt32(ap, i * sizeof(afs_int32), htonl(*s));
 
            tl = ap->length;
            ap->length = sizeof(rx_stats);
-        if (rx_stats_active)
-           MUTEX_EXIT(&rx_stats_mutex);
+           if (rx_stats_active)
+               MUTEX_EXIT(&rx_stats_mutex);
            rxi_SendDebugPacket(ap, asocket, ahost, aport, istack);
            ap->length = tl;
            break;
@@ -2103,6 +2136,7 @@ rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
     taddr.sin_family = AF_INET;
     taddr.sin_port = aport;
     taddr.sin_addr.s_addr = ahost;
+    memset(&taddr.sin_zero, 0, sizeof(taddr.sin_zero));
 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
     taddr.sin_len = sizeof(struct sockaddr_in);
 #endif
@@ -2124,7 +2158,7 @@ rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
        if (!waslocked)
            AFS_GLOCK();
        afs_Trace1(afs_iclSetp, CM_TRACE_TIMESTAMP, ICL_TYPE_STRING,
-                  "before osi_NetSend()");
+                  "before rxi_NetSend()");
        AFS_GUNLOCK();
     }
 #else
@@ -2133,14 +2167,14 @@ rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
 #endif
 #endif
     /* debug packets are not reliably delivered, hence the cast below. */
-    (void)osi_NetSend(asocket, &taddr, apacket->wirevec, apacket->niovecs,
+    (void)rxi_NetSend(asocket, &taddr, apacket->wirevec, apacket->niovecs,
                      apacket->length + RX_HEADER_SIZE, istack);
 #ifdef KERNEL
 #ifdef RX_KERNEL_TRACE
     if (ICL_SETACTIVE(afs_iclSetp)) {
        AFS_GLOCK();
        afs_Trace1(afs_iclSetp, CM_TRACE_TIMESTAMP, ICL_TYPE_STRING,
-                  "after osi_NetSend()");
+                  "after rxi_NetSend()");
        if (!waslocked)
            AFS_GUNLOCK();
     }
@@ -2203,6 +2237,7 @@ rxi_SendPacket(struct rx_call *call, struct rx_connection *conn,
     addr.sin_family = AF_INET;
     addr.sin_port = peer->port;
     addr.sin_addr.s_addr = peer->host;
+    memset(&addr.sin_zero, 0, sizeof(addr.sin_zero));
 
     /* This stuff should be revamped, I think, so that most, if not
      * all, of the header stuff is always added here.  We could
@@ -2276,7 +2311,7 @@ rxi_SendPacket(struct rx_call *call, struct rx_connection *conn,
            if (!waslocked)
                AFS_GLOCK();
            afs_Trace1(afs_iclSetp, CM_TRACE_TIMESTAMP, ICL_TYPE_STRING,
-                      "before osi_NetSend()");
+                      "before rxi_NetSend()");
            AFS_GUNLOCK();
        }
 #else
@@ -2285,7 +2320,7 @@ rxi_SendPacket(struct rx_call *call, struct rx_connection *conn,
 #endif
 #endif
        if ((code =
-            osi_NetSend(socket, &addr, p->wirevec, p->niovecs,
+            rxi_NetSend(socket, &addr, p->wirevec, p->niovecs,
                         p->length + RX_HEADER_SIZE, istack)) != 0) {
            /* send failed, so let's hurry up the resend, eh? */
             if (rx_stats_active)
@@ -2306,7 +2341,7 @@ rxi_SendPacket(struct rx_call *call, struct rx_connection *conn,
        if (ICL_SETACTIVE(afs_iclSetp)) {
            AFS_GLOCK();
            afs_Trace1(afs_iclSetp, CM_TRACE_TIMESTAMP, ICL_TYPE_STRING,
-                      "after osi_NetSend()");
+                      "after rxi_NetSend()");
            if (!waslocked)
                AFS_GUNLOCK();
        }
@@ -2356,6 +2391,7 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn,
     addr.sin_family = AF_INET;
     addr.sin_port = peer->port;
     addr.sin_addr.s_addr = peer->host;
+    memset(&addr.sin_zero, 0, sizeof(addr.sin_zero));
 
     if (len + 1 > RX_MAXIOVECS) {
        osi_Panic("rxi_SendPacketList, len > RX_MAXIOVECS\n");
@@ -2369,19 +2405,17 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn,
     conn->serial += len;
     for (i = 0; i < len; i++) {
        p = list[i];
+       /* a ping *or* a sequenced packet can count */
        if (p->length > conn->peer->maxPacketSize) {
-           /* a ping *or* a sequenced packet can count */
-           if ((p->length > conn->peer->maxPacketSize)) {
-               if (((p->header.type == RX_PACKET_TYPE_ACK) &&
-                    (p->header.flags & RX_REQUEST_ACK)) &&
-                   ((i == 0) || (p->length >= conn->lastPingSize))) {
-                   conn->lastPingSize = p->length;
-                   conn->lastPingSizeSer = serial + i;
-               } else if ((p->header.seq != 0) &&
-                          ((i == 0) || (p->length >= conn->lastPacketSize))) {
-                   conn->lastPacketSize = p->length;
-                   conn->lastPacketSizeSeq = p->header.seq;
-               }
+           if (((p->header.type == RX_PACKET_TYPE_ACK) &&
+                (p->header.flags & RX_REQUEST_ACK)) &&
+               ((i == 0) || (p->length >= conn->lastPingSize))) {
+               conn->lastPingSize = p->length;
+               conn->lastPingSizeSer = serial + i;
+           } else if ((p->header.seq != 0) &&
+                      ((i == 0) || (p->length >= conn->lastPacketSize))) {
+               conn->lastPacketSize = p->length;
+               conn->lastPacketSizeSeq = p->header.seq;
            }
        }
     }
@@ -2484,7 +2518,7 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn,
            AFS_GUNLOCK();
 #endif
        if ((code =
-            osi_NetSend(socket, &addr, &wirevec[0], len + 1, length,
+            rxi_NetSend(socket, &addr, &wirevec[0], len + 1, length,
                         istack)) != 0) {
            /* send failed, so let's hurry up the resend, eh? */
             if (rx_stats_active)
@@ -2528,7 +2562,8 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn,
 /* Send a raw abort packet, without any call or connection structures */
 void
 rxi_SendRawAbort(osi_socket socket, afs_uint32 host, u_short port,
-                afs_int32 error, struct rx_packet *source, int istack)
+                afs_uint32 serial, afs_int32 error,
+                struct rx_packet *source, int istack)
 {
     struct rx_header theader;
     struct sockaddr_in addr;
@@ -2537,12 +2572,20 @@ rxi_SendRawAbort(osi_socket socket, afs_uint32 host, u_short port,
     memset(&theader, 0, sizeof(theader));
     theader.epoch = htonl(source->header.epoch);
     theader.callNumber = htonl(source->header.callNumber);
-    theader.serial = htonl(1);
+    theader.serial = htonl(serial);
     theader.type = RX_PACKET_TYPE_ABORT;
     theader.serviceId = htons(source->header.serviceId);
     theader.securityIndex = source->header.securityIndex;
     theader.cid = htonl(source->header.cid);
 
+    /*
+     * If the abort is being sent in response to a server initiated packet,
+     * set client_initiated in the abort to ensure it is not associated by
+     * the receiver with a connection in the opposite direction.
+     */
+    if ((source->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
+        theader.flags |= RX_CLIENT_INITIATED;
+
     error = htonl(error);
 
     iov[0].iov_base = &theader;
@@ -2553,11 +2596,12 @@ rxi_SendRawAbort(osi_socket socket, afs_uint32 host, u_short port,
     addr.sin_family = AF_INET;
     addr.sin_addr.s_addr = host;
     addr.sin_port = port;
+    memset(&addr.sin_zero, 0, sizeof(addr.sin_zero));
 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
     addr.sin_len = sizeof(struct sockaddr_in);
 #endif
 
-    osi_NetSend(socket, &addr, iov, 2,
+    rxi_NetSend(socket, &addr, iov, 2,
                sizeof(struct rx_header) + sizeof(error), istack);
 }
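
Two things change in rxi_SendRawAbort above: the caller now supplies the serial number rather than a hard-coded 1, and the abort's RX_CLIENT_INITIATED flag is set to the opposite of the flag on the packet being answered, so the receiver files the abort under the correct direction of the connection. The inversion is easy to misread, so here is a standalone check of it (plain C, illustrative flag value, not OpenAFS code):

    #include <assert.h>

    #define RX_CLIENT_INITIATED 1   /* illustrative value only */

    /* Mirrors the flag handling in the hunk above: the abort claims the
     * opposite role from the packet it is answering. */
    static int
    abort_flags_for(int incoming_flags)
    {
        int flags = 0;

        if ((incoming_flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
            flags |= RX_CLIENT_INITIATED;
        return flags;
    }

    int
    main(void)
    {
        /* answering a client-initiated packet: abort is server-side, flag clear */
        assert(abort_flags_for(RX_CLIENT_INITIATED) == 0);
        /* answering a server-initiated packet: abort is client-side, flag set */
        assert(abort_flags_for(0) == RX_CLIENT_INITIATED);
        return 0;
    }
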
 
@@ -2611,6 +2655,7 @@ rxi_SendSpecial(struct rx_call *call,
     p->header.seq = 0;
     p->header.epoch = conn->epoch;
     p->header.type = type;
+    p->header.userStatus = 0;
     p->header.flags = 0;
     if (conn->type == RX_CLIENT_CONNECTION)
        p->header.flags |= RX_CLIENT_INITIATED;
@@ -2716,6 +2761,7 @@ rxi_PrepareSendPacket(struct rx_call *call,
     afs_uint32 seq = call->tnext++;
     unsigned int i;
     afs_int32 len;             /* len must be a signed type; it can go negative */
+    int code;
 
     /* No data packets on call 0. Where do these come from? */
     if (*call->callNumber == 0)
@@ -2732,6 +2778,7 @@ rxi_PrepareSendPacket(struct rx_call *call,
     p->header.seq = seq;
     p->header.epoch = conn->epoch;
     p->header.type = RX_PACKET_TYPE_DATA;
+    p->header.userStatus = 0;
     p->header.flags = 0;
     p->header.spare = 0;
     if (conn->type == RX_CLIENT_CONNECTION)
@@ -2767,7 +2814,20 @@ rxi_PrepareSendPacket(struct rx_call *call,
     if (len)
         p->wirevec[i - 1].iov_len += len;
     MUTEX_ENTER(&call->lock);
-    RXS_PreparePacket(conn->securityObject, call, p);
+    code = RXS_PreparePacket(conn->securityObject, call, p);
+    if (code) {
+       MUTEX_EXIT(&call->lock);
+       rxi_ConnectionError(conn, code);
+       MUTEX_ENTER(&conn->conn_data_lock);
+       p = rxi_SendConnectionAbort(conn, p, 0, 0);
+       MUTEX_EXIT(&conn->conn_data_lock);
+       MUTEX_ENTER(&call->lock);
+       /* setting a connection error means all calls for that conn are also
+        * error'd. if this call does not have an error by now, something is
+        * very wrong, and we risk sending data in the clear that is supposed
+        * to be encrypted. */
+       osi_Assert(call->error);
+    }
 }
 
 /* Given an interface MTU size, calculate an adjusted MTU size that