rx: Assert call error for RXS_PreparePacket error
[openafs.git] / src / rx / rx_packet.c
index d0cb5d3..442f598 100644 (file)
 # include <sys/sysmacros.h>
 #endif
 
+#include <opr/queue.h>
+
 #include "rx.h"
 #include "rx_clock.h"
-#include "rx_queue.h"
 #include "rx_packet.h"
 #include "rx_atomic.h"
 #include "rx_globals.h"
@@ -82,7 +83,7 @@ static afs_uint32       rx_packet_id = 0;
 
 extern char cml_version_number[];
 
-static int AllocPacketBufs(int class, int num_pkts, struct rx_queue *q);
+static int AllocPacketBufs(int class, int num_pkts, struct opr_queue *q);
 
 static void rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
                                afs_uint32 ahost, short aport,
@@ -102,9 +103,11 @@ static void rxi_AdjustLocalPacketsTSFPQ(int num_keep_local,
 static void rxi_FreePacketNoLock(struct rx_packet *p);
 static int rxi_FreeDataBufsNoLock(struct rx_packet *p, afs_uint32 first);
 static int rxi_FreeDataBufsToQueue(struct rx_packet *p, afs_uint32 first,
-                                  struct rx_queue * q);
+                                  struct opr_queue * q);
 #endif
 
+extern struct opr_queue rx_idleServerQueue;
+
 /* some rules about packets:
  * 1.  When a packet is allocated, the final iov_buf contains room for
  * a security trailer, but iov_len masks that fact.  If the security
@@ -238,14 +241,14 @@ rx_SlowWritePacket(struct rx_packet * packet, int offset, int resid, char *in)
 }
 
 int
-rxi_AllocPackets(int class, int num_pkts, struct rx_queue * q)
+rxi_AllocPackets(int class, int num_pkts, struct opr_queue * q)
 {
-    struct rx_packet *p, *np;
+    struct opr_queue *c;
 
     num_pkts = AllocPacketBufs(class, num_pkts, q);
 
-    for (queue_Scan(q, p, np, rx_packet)) {
-        RX_PACKET_IOV_FULLINIT(p);
+    for (opr_queue_Scan(q, c)) {
+        RX_PACKET_IOV_FULLINIT(opr_queue_Entry(c, struct rx_packet, entry));
     }
 
     return num_pkts;
@@ -253,7 +256,7 @@ rxi_AllocPackets(int class, int num_pkts, struct rx_queue * q)
 
 #ifdef RX_ENABLE_TSFPQ
 static int
-AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
+AllocPacketBufs(int class, int num_pkts, struct opr_queue * q)
 {
     struct rx_ts_info_t * rx_ts_info;
     int transfer;
@@ -283,7 +286,7 @@ AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
 }
 #else /* RX_ENABLE_TSFPQ */
 static int
-AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
+AllocPacketBufs(int class, int num_pkts, struct opr_queue * q)
 {
     struct rx_packet *c;
     int i;
@@ -336,13 +339,13 @@ AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
     }
 #endif /* KERNEL */
 
-    for (i=0, c=queue_First(&rx_freePacketQueue, rx_packet);
+    for (i=0, c=opr_queue_First(&rx_freePacketQueue, struct rx_packet, entry);
         i < num_pkts;
-        i++, c=queue_Next(c, rx_packet)) {
+        i++, c=opr_queue_Next(&c->entry, struct rx_packet, entry)) {
         RX_FPQ_MARK_USED(c);
     }
 
-    queue_SplitBeforeAppend(&rx_freePacketQueue,q,c);
+    opr_queue_SplitBeforeAppend(&rx_freePacketQueue, q, &c->entry);
 
     rx_nFreePackets -= num_pkts;
 
@@ -362,22 +365,25 @@ AllocPacketBufs(int class, int num_pkts, struct rx_queue * q)
 #ifdef RX_ENABLE_TSFPQ
 /* num_pkts=0 means queue length is unknown */
 int
-rxi_FreePackets(int num_pkts, struct rx_queue * q)
+rxi_FreePackets(int num_pkts, struct opr_queue * q)
 {
     struct rx_ts_info_t * rx_ts_info;
-    struct rx_packet *c, *nc;
+    struct opr_queue *cursor, *store;
     SPLVAR;
 
     osi_Assert(num_pkts >= 0);
     RX_TS_INFO_GET(rx_ts_info);
 
     if (!num_pkts) {
-       for (queue_Scan(q, c, nc, rx_packet), num_pkts++) {
-           rxi_FreeDataBufsTSFPQ(c, 2, 0);
+       for (opr_queue_ScanSafe(q, cursor, store)) {
+           num_pkts++;
+           rxi_FreeDataBufsTSFPQ(opr_queue_Entry(cursor, struct rx_packet, 
+                                                entry), 2, 0);
        }
     } else {
-       for (queue_Scan(q, c, nc, rx_packet)) {
-           rxi_FreeDataBufsTSFPQ(c, 2, 0);
+       for (opr_queue_ScanSafe(q, cursor, store)) {
+           rxi_FreeDataBufsTSFPQ(opr_queue_Entry(cursor, struct rx_packet, 
+                                                entry), 2, 0);
        }
     }
 
@@ -403,27 +409,33 @@ rxi_FreePackets(int num_pkts, struct rx_queue * q)
 #else /* RX_ENABLE_TSFPQ */
 /* num_pkts=0 means queue length is unknown */
 int
-rxi_FreePackets(int num_pkts, struct rx_queue *q)
+rxi_FreePackets(int num_pkts, struct opr_queue *q)
 {
-    struct rx_queue cbs;
-    struct rx_packet *p, *np;
+    struct opr_queue cbs;
+    struct opr_queue *cursor, *store;
     int qlen = 0;
     SPLVAR;
 
     osi_Assert(num_pkts >= 0);
-    queue_Init(&cbs);
+    opr_queue_Init(&cbs);
 
     if (!num_pkts) {
-        for (queue_Scan(q, p, np, rx_packet), num_pkts++) {
+        for (opr_queue_ScanSafe(q, cursor, store)) {
+           struct rx_packet *p
+               = opr_queue_Entry(cursor, struct rx_packet, entry);
            if (p->niovecs > 2) {
                qlen += rxi_FreeDataBufsToQueue(p, 2, &cbs);
            }
             RX_FPQ_MARK_FREE(p);
+           num_pkts++;
        }
        if (!num_pkts)
            return 0;
     } else {
-        for (queue_Scan(q, p, np, rx_packet)) {
+        for (opr_queue_ScanSafe(q, cursor, store)) {
+           struct rx_packet *p
+               = opr_queue_Entry(cursor, struct rx_packet, entry);
+
            if (p->niovecs > 2) {
                qlen += rxi_FreeDataBufsToQueue(p, 2, &cbs);
            }
@@ -432,7 +444,7 @@ rxi_FreePackets(int num_pkts, struct rx_queue *q)
     }
 
     if (qlen) {
-       queue_SpliceAppend(q, &cbs);
+       opr_queue_SpliceAppend(q, &cbs);
        qlen += num_pkts;
     } else
        qlen = num_pkts;
@@ -440,7 +452,7 @@ rxi_FreePackets(int num_pkts, struct rx_queue *q)
     NETPRI;
     MUTEX_ENTER(&rx_freePktQ_lock);
 
-    queue_SpliceAppend(&rx_freePacketQueue, q);
+    opr_queue_SpliceAppend(&rx_freePacketQueue, q);
     rx_nFreePackets += qlen;
 
     /* Wakeup anyone waiting for packets */
@@ -490,8 +502,7 @@ int
 rxi_AllocDataBuf(struct rx_packet *p, int nb, int class)
 {
     int i, nv;
-    struct rx_queue q;
-    struct rx_packet *cb, *ncb;
+    struct opr_queue q, *cursor, *store;
 
     /* compute the number of cbuf's we need */
     nv = nb / RX_CBUFFERSIZE;
@@ -503,14 +514,19 @@ rxi_AllocDataBuf(struct rx_packet *p, int nb, int class)
         return nb;
 
     /* allocate buffers */
-    queue_Init(&q);
+    opr_queue_Init(&q);
     nv = AllocPacketBufs(class, nv, &q);
 
     /* setup packet iovs */
-    for (i = p->niovecs, queue_Scan(&q, cb, ncb, rx_packet), i++) {
-        queue_Remove(cb);
+    i = p ->niovecs;
+    for (opr_queue_ScanSafe(&q, cursor, store)) {
+       struct rx_packet *cb
+           = opr_queue_Entry(cursor, struct rx_packet, entry);
+
+        opr_queue_Remove(&cb->entry);
         p->wirevec[i].iov_base = (caddr_t) cb->localdata;
         p->wirevec[i].iov_len = RX_CBUFFERSIZE;
+       i++;
     }
 
     nb -= (nv * RX_CBUFFERSIZE);
@@ -600,7 +616,7 @@ rxi_MorePackets(int apackets)
 #endif
        p->niovecs = 2;
 
-       queue_Append(&rx_freePacketQueue, p);
+       opr_queue_Append(&rx_freePacketQueue, &p->entry);
 #ifdef RXDEBUG_PACKET
         p->packetId = rx_packet_id++;
         p->allNextp = rx_mallocedP;
@@ -710,7 +726,7 @@ rxi_MorePacketsNoLock(int apackets)
 #endif
        p->niovecs = 2;
 
-       queue_Append(&rx_freePacketQueue, p);
+       opr_queue_Append(&rx_freePacketQueue, &p->entry);
 #ifdef RXDEBUG_PACKET
         p->packetId = rx_packet_id++;
         p->allNextp = rx_mallocedP;
@@ -811,7 +827,7 @@ rxi_FreePacketNoLock(struct rx_packet *p)
 
     RX_FPQ_MARK_FREE(p);
     rx_nFreePackets++;
-    queue_Append(&rx_freePacketQueue, p);
+    opr_queue_Append(&rx_freePacketQueue, &p->entry);
 }
 #endif /* RX_ENABLE_TSFPQ */
 
@@ -852,7 +868,7 @@ rxi_FreePacketTSFPQ(struct rx_packet *p, int flush_global)
  */
 #ifndef RX_ENABLE_TSFPQ
 static int
-rxi_FreeDataBufsToQueue(struct rx_packet *p, afs_uint32 first, struct rx_queue * q)
+rxi_FreeDataBufsToQueue(struct rx_packet *p, afs_uint32 first, struct opr_queue * q)
 {
     struct iovec *iov;
     struct rx_packet * cb;
@@ -864,7 +880,7 @@ rxi_FreeDataBufsToQueue(struct rx_packet *p, afs_uint32 first, struct rx_queue *
            osi_Panic("rxi_FreeDataBufsToQueue: unexpected NULL iov");
        cb = RX_CBUF_TO_PACKET(iov->iov_base, p);
        RX_FPQ_MARK_FREE(cb);
-       queue_Append(q, cb);
+       opr_queue_Append(q, &cb->entry);
     }
     p->length = 0;
     p->niovecs = 0;
@@ -1129,13 +1145,13 @@ rxi_AllocPacketNoLock(int class)
 
     if (rx_stats_active)
         rx_atomic_inc(&rx_stats.packetRequests);
-    if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+    if (opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
 
 #ifdef KERNEL
-        if (queue_IsEmpty(&rx_freePacketQueue))
+        if (opr_queue_IsEmpty(&rx_freePacketQueue))
            osi_Panic("rxi_AllocPacket error");
 #else /* KERNEL */
-        if (queue_IsEmpty(&rx_freePacketQueue))
+        if (opr_queue_IsEmpty(&rx_freePacketQueue))
            rxi_MorePacketsNoLock(rx_maxSendWindow);
 #endif /* KERNEL */
 
@@ -1191,16 +1207,16 @@ rxi_AllocPacketNoLock(int class)
         rx_atomic_inc(&rx_stats.packetRequests);
 
 #ifdef KERNEL
-    if (queue_IsEmpty(&rx_freePacketQueue))
+    if (opr_queue_IsEmpty(&rx_freePacketQueue))
        osi_Panic("rxi_AllocPacket error");
 #else /* KERNEL */
-    if (queue_IsEmpty(&rx_freePacketQueue))
+    if (opr_queue_IsEmpty(&rx_freePacketQueue))
        rxi_MorePacketsNoLock(rx_maxSendWindow);
 #endif /* KERNEL */
 
     rx_nFreePackets--;
-    p = queue_First(&rx_freePacketQueue, rx_packet);
-    queue_Remove(p);
+    p = opr_queue_First(&rx_freePacketQueue, struct rx_packet, entry);
+    opr_queue_Remove(&p->entry);
     RX_FPQ_MARK_USED(p);
 
     dpf(("Alloc %"AFS_PTR_FMT", class %d\n", p, class));
@@ -1226,16 +1242,16 @@ rxi_AllocPacketTSFPQ(int class, int pull_global)
 
     if (rx_stats_active)
         rx_atomic_inc(&rx_stats.packetRequests);
-    if (pull_global && queue_IsEmpty(&rx_ts_info->_FPQ)) {
+    if (pull_global && opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
         MUTEX_ENTER(&rx_freePktQ_lock);
 
-        if (queue_IsEmpty(&rx_freePacketQueue))
+        if (opr_queue_IsEmpty(&rx_freePacketQueue))
            rxi_MorePacketsNoLock(rx_maxSendWindow);
 
        RX_TS_FPQ_GTOL(rx_ts_info);
 
         MUTEX_EXIT(&rx_freePktQ_lock);
-    } else if (queue_IsEmpty(&rx_ts_info->_FPQ)) {
+    } else if (opr_queue_IsEmpty(&rx_ts_info->_FPQ.queue)) {
         return NULL;
     }
 
@@ -1746,7 +1762,6 @@ rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
 {
     struct rx_debugIn tin;
     afs_int32 tl;
-    struct rx_serverQueueEntry *np, *nqe;
 
     /*
      * Only respond to client-initiated Rx debug packets,
@@ -1784,8 +1799,7 @@ rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
            tstat.usedFDs = CountFDs(64);
            tstat.nWaiting = htonl(rx_atomic_read(&rx_nWaiting));
            tstat.nWaited = htonl(rx_atomic_read(&rx_nWaited));
-           queue_Count(&rx_idleServerQueue, np, nqe, rx_serverQueueEntry,
-                       tstat.idleThreads);
+           tstat.idleThreads = opr_queue_Count(&rx_idleServerQueue);
            MUTEX_EXIT(&rx_serverPool_lock);
            tstat.idleThreads = htonl(tstat.idleThreads);
            tl = sizeof(struct rx_debugStats) - ap->length;
@@ -1846,11 +1860,11 @@ rxi_ReceiveDebugPacket(struct rx_packet *ap, osi_socket asocket,
                            tconn.callNumber[j] = htonl(tc->callNumber[j]);
                            if ((tcall = tc->call[j])) {
                                tconn.callState[j] = tcall->state;
-                               tconn.callMode[j] = tcall->mode;
+                               tconn.callMode[j] = tcall->app.mode;
                                tconn.callFlags[j] = tcall->flags;
-                               if (queue_IsNotEmpty(&tcall->rq))
+                               if (!opr_queue_IsEmpty(&tcall->rq))
                                    tconn.callOther[j] |= RX_OTHER_IN;
-                               if (queue_IsNotEmpty(&tcall->tq))
+                               if (!opr_queue_IsEmpty(&tcall->tq))
                                    tconn.callOther[j] |= RX_OTHER_OUT;
                            } else
                                tconn.callState[j] = RX_STATE_NOTINIT;
@@ -2143,6 +2157,31 @@ rxi_SendDebugPacket(struct rx_packet *apacket, osi_socket asocket,
 
 }
 
+static void
+rxi_NetSendError(struct rx_call *call, int code)
+{
+    int down = 0;
+#ifdef AFS_NT40_ENV
+    if (code == -1 && WSAGetLastError() == WSAEHOSTUNREACH) {
+       down = 1;
+    }
+    if (code == -WSAEHOSTUNREACH) {
+       down = 1;
+    }
+#elif defined(AFS_LINUX20_ENV)
+    if (code == -ENETUNREACH) {
+       down = 1;
+    }
+#elif defined(AFS_DARWIN_ENV)
+    if (code == EHOSTUNREACH) {
+       down = 1;
+    }
+#endif
+    if (down) {
+       call->lastReceiveTime = 0;
+    }
+}
+
 /* Send the packet to appropriate destination for the specified
  * call.  The header is first encoded and placed in the packet.
  */
@@ -2259,18 +2298,9 @@ rxi_SendPacket(struct rx_call *call, struct rx_connection *conn,
             * So, when this happens let's "down" the host NOW so
             * we don't sit around waiting for this host to timeout later.
             */
-           if (call &&
-#ifdef AFS_NT40_ENV
-               (code == -1 && WSAGetLastError() == WSAEHOSTUNREACH) || (code == -WSAEHOSTUNREACH)
-#elif defined(AFS_LINUX20_ENV)
-               code == -ENETUNREACH
-#elif defined(AFS_DARWIN_ENV)
-               code == EHOSTUNREACH
-#else
-               0
-#endif
-               )
-               call->lastReceiveTime = 0;
+           if (call) {
+               rxi_NetSendError(call, code);
+           }
        }
 #ifdef KERNEL
 #ifdef RX_KERNEL_TRACE
@@ -2469,18 +2499,9 @@ rxi_SendPacketList(struct rx_call *call, struct rx_connection *conn,
             * So, when this happens let's "down" the host NOW so
             * we don't sit around waiting for this host to timeout later.
             */
-           if (call &&
-#ifdef AFS_NT40_ENV
-               (code == -1 && WSAGetLastError() == WSAEHOSTUNREACH) || (code == -WSAEHOSTUNREACH)
-#elif defined(AFS_LINUX20_ENV)
-               code == -ENETUNREACH
-#elif defined(AFS_DARWIN_ENV)
-               code == EHOSTUNREACH
-#else
-               0
-#endif
-               )
-               call->lastReceiveTime = 0;
+           if (call) {
+               rxi_NetSendError(call, code);
+           }
        }
 #if    defined(AFS_SUN5_ENV) && defined(KERNEL)
        if (!istack && waslocked)
@@ -2523,6 +2544,14 @@ rxi_SendRawAbort(osi_socket socket, afs_uint32 host, u_short port,
     theader.securityIndex = source->header.securityIndex;
     theader.cid = htonl(source->header.cid);
 
+    /*
+     * If the abort is being sent in response to a server initiated packet,
+     * set client_initiated in the abort to ensure it is not associated by
+     * the receiver with a connection in the opposite direction.
+     */
+    if ((source->header.flags & RX_CLIENT_INITIATED) != RX_CLIENT_INITIATED)
+        theader.flags |= RX_CLIENT_INITIATED;
+
     error = htonl(error);
 
     iov[0].iov_base = &theader;
@@ -2696,6 +2725,7 @@ rxi_PrepareSendPacket(struct rx_call *call,
     afs_uint32 seq = call->tnext++;
     unsigned int i;
     afs_int32 len;             /* len must be a signed type; it can go negative */
+    int code;
 
     /* No data packets on call 0. Where do these come from? */
     if (*call->callNumber == 0)
@@ -2747,7 +2777,20 @@ rxi_PrepareSendPacket(struct rx_call *call,
     if (len)
         p->wirevec[i - 1].iov_len += len;
     MUTEX_ENTER(&call->lock);
-    RXS_PreparePacket(conn->securityObject, call, p);
+    code = RXS_PreparePacket(conn->securityObject, call, p);
+    if (code) {
+       MUTEX_EXIT(&call->lock);
+       rxi_ConnectionError(conn, code);
+       MUTEX_ENTER(&conn->conn_data_lock);
+       p = rxi_SendConnectionAbort(conn, p, 0, 0);
+       MUTEX_EXIT(&conn->conn_data_lock);
+       MUTEX_ENTER(&call->lock);
+       /* setting a connection error means all calls for that conn are also
+        * error'd. if this call does not have an error by now, something is
+        * very wrong, and we risk sending data in the clear that is supposed
+        * to be encrypted. */
+       osi_Assert(call->error);
+    }
 }
 
 /* Given an interface MTU size, calculate an adjusted MTU size that