static void rxi_GrowMTUOn(struct rx_call *call);
static void rxi_ChallengeOn(struct rx_connection *conn);
static int rxi_CheckCall(struct rx_call *call, int haveCTLock);
+static void rxi_AckAllInTransmitQueue(struct rx_call *call);
#ifdef RX_ENABLE_LOCKS
-static void rxi_SetAcksInTransmitQueue(struct rx_call *call);
-
struct rx_tq_debug {
rx_atomic_t rxi_start_aborted; /* rxi_start awoke after rxi_Send in error.*/
rx_atomic_t rxi_start_in_error;
MUTEX_EXIT(&conn->conn_call_lock);
}
+/*!
+ * Abort the call if the server is over the busy threshold. This
+ * can be used without requiring a call structure be initialised,
+ * or connected to a particular channel
+ */
+static_inline int
+rxi_AbortIfServerBusy(osi_socket socket, struct rx_connection *conn,
+ struct rx_packet *np)
+{
+ if ((rx_BusyThreshold > 0) &&
+ (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
+ rxi_SendRawAbort(socket, conn->peer->host, conn->peer->port,
+ rx_BusyError, np, 0);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.nBusies);
+ return 1;
+ }
+
+ return 0;
+}
+
+static_inline struct rx_call *
+rxi_ReceiveClientCall(struct rx_packet *np, struct rx_connection *conn)
+{
+ int channel;
+ struct rx_call *call;
+
+ channel = np->header.cid & RX_CHANNELMASK;
+ MUTEX_ENTER(&conn->conn_call_lock);
+ call = conn->call[channel];
+ if (!call || conn->callNumber[channel] != np->header.callNumber) {
+ MUTEX_EXIT(&conn->conn_call_lock);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ return NULL;
+ }
+
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ if ((call->state == RX_STATE_DALLY)
+ && np->header.type == RX_PACKET_TYPE_ACK) {
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.ignorePacketDally);
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+
+ return call;
+}
+
+static_inline struct rx_call *
+rxi_ReceiveServerCall(osi_socket socket, struct rx_packet *np,
+ struct rx_connection *conn)
+{
+ int channel;
+ struct rx_call *call;
+
+ channel = np->header.cid & RX_CHANNELMASK;
+ MUTEX_ENTER(&conn->conn_call_lock);
+ call = conn->call[channel];
+
+ if (!call) {
+ if (rxi_AbortIfServerBusy(socket, conn, np)) {
+ MUTEX_EXIT(&conn->conn_call_lock);
+ return NULL;
+ }
+
+ call = rxi_NewCall(conn, channel); /* returns locked call */
+ *call->callNumber = np->header.callNumber;
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ call->state = RX_STATE_PRECALL;
+ clock_GetTime(&call->queueTime);
+ call->app.bytesSent = 0;
+ call->app.bytesRcvd = 0;
+ rxi_KeepAliveOn(call);
+
+ return call;
+ }
+
+ if (np->header.callNumber == conn->callNumber[channel]) {
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+ return call;
+ }
+
+ if (np->header.callNumber < conn->callNumber[channel]) {
+ MUTEX_EXIT(&conn->conn_call_lock);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ return NULL;
+ }
+
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ /* Wait until the transmit queue is idle before deciding
+ * whether to reset the current call. Chances are that the
+ * call will be in ether DALLY or HOLD state once the TQ_BUSY
+ * flag is cleared.
+ */
+#ifdef RX_ENABLE_LOCKS
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_WaitforTQBusy(call);
+ /* If we entered error state while waiting,
+ * must call rxi_CallError to permit rxi_ResetCall
+ * to processed when the tqWaiter count hits zero.
+ */
+ if (call->error) {
+ rxi_CallError(call, call->error);
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+ }
+#endif /* RX_ENABLE_LOCKS */
+ /* If the new call cannot be taken right now send a busy and set
+ * the error condition in this call, so that it terminates as
+ * quickly as possible */
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_CallError(call, RX_CALL_DEAD);
+ rxi_SendSpecial(call, conn, NULL, RX_PACKET_TYPE_BUSY,
+ NULL, 0, 1);
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+
+ if (rxi_AbortIfServerBusy(socket, conn, np)) {
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+
+ rxi_ResetCall(call, 0);
+ /* The conn_call_lock is not held but no one else should be
+ * using this call channel while we are processing this incoming
+ * packet. This assignment should be safe.
+ */
+ *call->callNumber = np->header.callNumber;
+ call->state = RX_STATE_PRECALL;
+ clock_GetTime(&call->queueTime);
+ call->app.bytesSent = 0;
+ call->app.bytesRcvd = 0;
+ rxi_KeepAliveOn(call);
+
+ return call;
+}
+
+
/* There are two packet tracing routines available for testing and monitoring
* Rx. One is called just after every packet is received and the other is
* called just before every packet is sent. Received packets, have had their
{
struct rx_call *call;
struct rx_connection *conn;
- int channel;
- afs_uint32 currentCallNumber;
int type;
#ifdef RXDEBUG
char *packetType;
}
}
- channel = np->header.cid & RX_CHANNELMASK;
- MUTEX_ENTER(&conn->conn_call_lock);
- call = conn->call[channel];
-
- if (call) {
- MUTEX_ENTER(&call->lock);
- currentCallNumber = conn->callNumber[channel];
- MUTEX_EXIT(&conn->conn_call_lock);
- } else if (type == RX_SERVER_CONNECTION) { /* No call allocated */
- call = rxi_NewCall(conn, channel); /* returns locked call */
- *call->callNumber = currentCallNumber = np->header.callNumber;
- MUTEX_EXIT(&conn->conn_call_lock);
-#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, "
- "packet %"AFS_PTR_FMT" len %d\n",
- np->header.serial, rx_packetTypes[np->header.type - 1],
- ntohl(conn->peer->host), ntohs(conn->peer->port),
- np->header.serial, np->header.epoch, np->header.cid,
- np->header.callNumber, np->header.seq,
- np->header.flags, np, np->length));
-#endif
- call->state = RX_STATE_PRECALL;
- clock_GetTime(&call->queueTime);
- call->app.bytesSent = 0;
- call->app.bytesRcvd = 0;
- /*
- * If the number of queued calls exceeds the overload
- * threshold then abort this call.
- */
- if ((rx_BusyThreshold > 0) &&
- (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
- struct rx_packet *tp;
+ if (type == RX_SERVER_CONNECTION)
+ call = rxi_ReceiveServerCall(socket, np, conn);
+ else
+ call = rxi_ReceiveClientCall(np, conn);
- rxi_CallError(call, rx_BusyError);
- tp = rxi_SendCallAbort(call, np, 1, 0);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.nBusies);
- return tp;
- }
- rxi_KeepAliveOn(call);
- } else { /* RX_CLIENT_CONNECTION and No call allocated */
- /* This packet can't be for this call. If the new call address is
- * 0 then no call is running on this channel. If there is a call
- * then, since this is a client connection we're getting data for
- * it must be for the previous call.
- */
- MUTEX_EXIT(&conn->conn_call_lock);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ if (call == NULL) {
putConnection(conn);
- return np;
- }
-
- /* There is a non-NULL locked call at this point */
- if (type == RX_SERVER_CONNECTION) { /* We're the server */
- if (np->header.callNumber < currentCallNumber) {
- MUTEX_EXIT(&call->lock);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- putConnection(conn);
- return np;
- } else if (np->header.callNumber != currentCallNumber) {
- /* Wait until the transmit queue is idle before deciding
- * whether to reset the current call. Chances are that the
- * call will be in ether DALLY or HOLD state once the TQ_BUSY
- * flag is cleared.
- */
-#ifdef RX_ENABLE_LOCKS
- if (call->state == RX_STATE_ACTIVE) {
- rxi_WaitforTQBusy(call);
- /*
- * If we entered error state while waiting,
- * must call rxi_CallError to permit rxi_ResetCall
- * to processed when the tqWaiter count hits zero.
- */
- if (call->error) {
- rxi_CallError(call, call->error);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
- }
-#endif /* RX_ENABLE_LOCKS */
- /* If the new call cannot be taken right now send a busy and set
- * the error condition in this call, so that it terminates as
- * quickly as possible */
- if (call->state == RX_STATE_ACTIVE) {
- struct rx_packet *tp;
-
- rxi_CallError(call, RX_CALL_DEAD);
- tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
- NULL, 0, 1);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return tp;
- }
- rxi_ResetCall(call, 0);
- /*
- * The conn_call_lock is not held but no one else should be
- * using this call channel while we are processing this incoming
- * packet. This assignment should be safe.
- */
- *call->callNumber = np->header.callNumber;
-#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
- np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
- np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->length));
-#endif
- call->state = RX_STATE_PRECALL;
- clock_GetTime(&call->queueTime);
- call->app.bytesSent = 0;
- call->app.bytesRcvd = 0;
- /*
- * If the number of queued calls exceeds the overload
- * threshold then abort this call.
- */
- if ((rx_BusyThreshold > 0) &&
- (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
- struct rx_packet *tp;
-
- rxi_CallError(call, rx_BusyError);
- tp = rxi_SendCallAbort(call, np, 1, 0);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.nBusies);
- return tp;
- }
- rxi_KeepAliveOn(call);
- } else {
- /* Continuing call; do nothing here. */
- }
- } else { /* we're the client */
- /* Ignore all incoming acknowledgements for calls in DALLY state */
- if ((call->state == RX_STATE_DALLY)
- && (np->header.type == RX_PACKET_TYPE_ACK)) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.ignorePacketDally);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
-
- /* Ignore anything that's not relevant to the current call. If there
- * isn't a current call, then no packet is relevant. */
- if (np->header.callNumber != currentCallNumber) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
- /* If the service security object index stamped in the packet does not
- * match the connection's security index, ignore the packet */
- if (np->header.securityIndex != conn->securityIndex) {
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
-
- /* If we're receiving the response, then all transmit packets are
- * implicitly acknowledged. Get rid of them. */
- if (np->header.type == RX_PACKET_TYPE_DATA) {
-#ifdef RX_ENABLE_LOCKS
- /* XXX Hack. Because we must release the call lock when
- * sending packets (osi_NetSend) we drop all acks while we're
- * traversing the tq in rxi_Start sending packets out because
- * packets may move to the freePacketQueue as result of being here!
- * So we drop these packets until we're safely out of the
- * traversing. Really ugly!
- * For fine grain RX locking, we set the acked field in the
- * packets and let rxi_Start remove them from the transmit queue.
- */
- if (call->flags & RX_CALL_TQ_BUSY) {
- rxi_SetAcksInTransmitQueue(call);
- } else {
- rxi_ClearTransmitQueue(call, 0);
- }
-#else /* RX_ENABLE_LOCKS */
- rxi_ClearTransmitQueue(call, 0);
-#endif /* RX_ENABLE_LOCKS */
- } else {
- if (np->header.type == RX_PACKET_TYPE_ACK) {
- /* now check to see if this is an ack packet acknowledging that the
- * server actually *lost* some hard-acked data. If this happens we
- * ignore this packet, as it may indicate that the server restarted in
- * the middle of a call. It is also possible that this is an old ack
- * packet. We don't abort the connection in this case, because this
- * *might* just be an old ack packet. The right way to detect a server
- * restart in the midst of a call is to notice that the server epoch
- * changed, btw. */
- /* XXX I'm not sure this is exactly right, since tfirst **IS**
- * XXX unacknowledged. I think that this is off-by-one, but
- * XXX I don't dare change it just yet, since it will
- * XXX interact badly with the server-restart detection
- * XXX code in receiveackpacket. */
- if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
- }
- } /* else not a data packet */
+ return np;
}
osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
/* Now do packet type-specific processing */
switch (np->header.type) {
case RX_PACKET_TYPE_DATA:
+ /* If we're a client, and receiving a response, then all the packets
+ * we transmitted packets are implicitly acknowledged. */
+ if (type == RX_CLIENT_CONNECTION && !opr_queue_IsEmpty(&call->tq))
+ rxi_AckAllInTransmitQueue(call);
+
np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port, tnop,
newcallp);
break;
case RX_PACKET_TYPE_ACKALL:
/* All packets acknowledged, so we can drop all packets previously
* readied for sending */
-#ifdef RX_ENABLE_LOCKS
- /* XXX Hack. We because we can't release the call lock when
- * sending packets (osi_NetSend) we drop all ack pkts while we're
- * traversing the tq in rxi_Start sending packets out because
- * packets may move to the freePacketQueue as result of being
- * here! So we drop these packets until we're safely out of the
- * traversing. Really ugly!
- * For fine grain RX locking, we set the acked field in the packets
- * and let rxi_Start remove the packets from the transmit queue.
- */
- if (call->flags & RX_CALL_TQ_BUSY) {
- rxi_SetAcksInTransmitQueue(call);
- break;
- }
-#endif /* RX_ENABLE_LOCKS */
- rxi_ClearTransmitQueue(call, 0);
+ rxi_AckAllInTransmitQueue(call);
break;
default:
/* Should not reach here, unless the peer is broken: send an abort
#endif /* RX_ENABLE_LOCKS */
}
-
#ifdef RX_ENABLE_LOCKS
/* Set ack in all packets in transmit queue. rxi_Start will deal with
* clearing them out.
}
#endif /* RX_ENABLE_LOCKS */
+/*!
+ * Acknowledge the whole transmit queue.
+ *
+ * If we're running without locks, or the transmit queue isn't busy, then
+ * we can just clear the queue now. Otherwise, we have to mark all of the
+ * packets as acknowledged, and let rxi_Start clear it later on
+ */
+static void
+rxi_AckAllInTransmitQueue(struct rx_call *call)
+{
+#ifdef RX_ENABLE_LOCKS
+ if (call->flags & RX_CALL_TQ_BUSY) {
+ rxi_SetAcksInTransmitQueue(call);
+ return;
+ }
+#endif
+ rxi_ClearTransmitQueue(call, 0);
+}
/* Clear out the transmit queue for the current call (all packets have
* been received by peer) */
static void
MUTEX_EXIT(&peer->peer_lock);
flags = call->flags;
-#ifdef RX_ENABLE_LOCKS
rxi_WaitforTQBusy(call);
-#endif /* RX_ENABLE_LOCKS */
rxi_ClearTransmitQueue(call, 1);
if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {