static void rxi_KeepAliveOn(struct rx_call *call);
static void rxi_GrowMTUOn(struct rx_call *call);
static void rxi_ChallengeOn(struct rx_connection *conn);
-
-#ifdef RX_ENABLE_LOCKS
static int rxi_CheckCall(struct rx_call *call, int haveCTLock);
-static void rxi_SetAcksInTransmitQueue(struct rx_call *call);
-#else
-static int rxi_CheckCall(struct rx_call *call);
-#endif
+static void rxi_AckAllInTransmitQueue(struct rx_call *call);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
struct rx_tq_debug {
rx_atomic_t rxi_start_aborted; /* rxi_start awoke after rxi_Send in error.*/
rx_atomic_t rxi_start_in_error;
} rx_tq_debug;
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
/* Constant delay time before sending an acknowledge of the last packet
* received. This is to avoid sending an extra acknowledge when the
USERPRI;
}
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
/* Wait for the transmit queue to no longer be busy.
* requires the call->lock to be held */
void
while (!call->error && (call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_WAIT;
call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
call->tqWaiters--;
if (call->tqWaiters == 0) {
call->flags &= ~RX_CALL_TQ_WAIT;
/* remember start time for call in case we have hard dead time limit */
call->queueTime = queueTime;
clock_GetTime(&call->startTime);
- call->bytesSent = 0;
- call->bytesRcvd = 0;
+ call->app.bytesSent = 0;
+ call->app.bytesRcvd = 0;
/* Turn on busy protocol. */
rxi_KeepAliveOn(call);
* run (see code above that avoids resource starvation).
*/
#ifdef RX_ENABLE_LOCKS
+ if (call->flags & (RX_CALL_TQ_BUSY | RX_CALL_TQ_CLEARME)) {
+ osi_Panic("rx_NewCall call about to be used without an empty tq");
+ }
+
CV_BROADCAST(&conn->conn_call_cv);
#else
osi_rxWakeup(conn);
#endif
MUTEX_EXIT(&conn->conn_call_lock);
-
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (call->flags & (RX_CALL_TQ_BUSY | RX_CALL_TQ_CLEARME)) {
- osi_Panic("rx_NewCall call about to be used without an empty tq");
- }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
-
MUTEX_EXIT(&call->lock);
USERPRI;
tservice = rxi_AllocService();
NETPRI;
-#ifdef RX_ENABLE_LOCKS
MUTEX_INIT(&tservice->svc_data_lock, "svc data lock", MUTEX_DEFAULT, 0);
-#endif
for (i = 0; i < RX_MAX_SERVICES; i++) {
struct rx_service *service = rx_services[i];
rxi_NewCall(struct rx_connection *conn, int channel)
{
struct rx_call *call;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
struct rx_call *cp; /* Call pointer temp */
struct opr_queue *cursor;
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif
dpf(("rxi_NewCall(conn %"AFS_PTR_FMT", channel %d)\n", conn, channel));
* rxi_FreeCall */
MUTEX_ENTER(&rx_freeCallQueue_lock);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
/*
* EXCEPT that the TQ might not yet be cleared out.
* Skip over those with in-use TQs.
}
}
if (call) {
-#else /* AFS_GLOBAL_RXLOCK_KERNEL */
+#else /* RX_ENABLE_LOCKS */
if (!opr_queue_IsEmpty(&rx_freeCallQueue)) {
call = opr_queue_First(&rx_freeCallQueue, struct rx_call, entry);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
opr_queue_Remove(&call->entry);
if (rx_stats_active)
rx_atomic_dec(&rx_stats.nFreeCallStructs);
MUTEX_EXIT(&rx_freeCallQueue_lock);
MUTEX_ENTER(&call->lock);
CLEAR_CALL_QUEUE_LOCK(call);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
/* Now, if TQ wasn't cleared earlier, do it now. */
rxi_WaitforTQBusy(call);
if (call->flags & RX_CALL_TQ_CLEARME) {
rxi_ClearTransmitQueue(call, 1);
/*queue_Init(&call->tq);*/
}
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
/* Bind the call to its connection structure */
call->conn = conn;
rxi_ResetCall(call, 1);
MUTEX_ENTER(&rx_freeCallQueue_lock);
SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
/* A call may be free even though its transmit queue is still in use.
* Since we search the call list from head to tail, put busy calls at
* the head of the list, and idle calls at the tail.
opr_queue_Prepend(&rx_freeCallQueue, &call->entry);
else
opr_queue_Append(&rx_freeCallQueue, &call->entry);
-#else /* AFS_GLOBAL_RXLOCK_KERNEL */
+#else /* RX_ENABLE_LOCKS */
opr_queue_Append(&rx_freeCallQueue, &call->entry);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
if (rx_stats_active)
rx_atomic_inc(&rx_stats.nFreeCallStructs);
MUTEX_EXIT(&rx_freeCallQueue_lock);
MUTEX_EXIT(&conn->conn_call_lock);
}
+/*!
+ * Abort the call if the server is over the busy threshold. This
+ * can be used without requiring a call structure be initialised,
+ * or connected to a particular channel
+ */
+static_inline int
+rxi_AbortIfServerBusy(osi_socket socket, struct rx_connection *conn,
+ struct rx_packet *np)
+{
+ if ((rx_BusyThreshold > 0) &&
+ (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
+ rxi_SendRawAbort(socket, conn->peer->host, conn->peer->port,
+ rx_BusyError, np, 0);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.nBusies);
+ return 1;
+ }
+
+ return 0;
+}
+
+static_inline struct rx_call *
+rxi_ReceiveClientCall(struct rx_packet *np, struct rx_connection *conn)
+{
+ int channel;
+ struct rx_call *call;
+
+ channel = np->header.cid & RX_CHANNELMASK;
+ MUTEX_ENTER(&conn->conn_call_lock);
+ call = conn->call[channel];
+ if (!call || conn->callNumber[channel] != np->header.callNumber) {
+ MUTEX_EXIT(&conn->conn_call_lock);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ return NULL;
+ }
+
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ if ((call->state == RX_STATE_DALLY)
+ && np->header.type == RX_PACKET_TYPE_ACK) {
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.ignorePacketDally);
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+
+ return call;
+}
+
+static_inline struct rx_call *
+rxi_ReceiveServerCall(osi_socket socket, struct rx_packet *np,
+ struct rx_connection *conn)
+{
+ int channel;
+ struct rx_call *call;
+
+ channel = np->header.cid & RX_CHANNELMASK;
+ MUTEX_ENTER(&conn->conn_call_lock);
+ call = conn->call[channel];
+
+ if (!call) {
+ if (rxi_AbortIfServerBusy(socket, conn, np)) {
+ MUTEX_EXIT(&conn->conn_call_lock);
+ return NULL;
+ }
+
+ call = rxi_NewCall(conn, channel); /* returns locked call */
+ *call->callNumber = np->header.callNumber;
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ call->state = RX_STATE_PRECALL;
+ clock_GetTime(&call->queueTime);
+ call->app.bytesSent = 0;
+ call->app.bytesRcvd = 0;
+ rxi_KeepAliveOn(call);
+
+ return call;
+ }
+
+ if (np->header.callNumber == conn->callNumber[channel]) {
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+ return call;
+ }
+
+ if (np->header.callNumber < conn->callNumber[channel]) {
+ MUTEX_EXIT(&conn->conn_call_lock);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ return NULL;
+ }
+
+ MUTEX_ENTER(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ /* Wait until the transmit queue is idle before deciding
+ * whether to reset the current call. Chances are that the
+ * call will be in ether DALLY or HOLD state once the TQ_BUSY
+ * flag is cleared.
+ */
+#ifdef RX_ENABLE_LOCKS
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_WaitforTQBusy(call);
+ /* If we entered error state while waiting,
+ * must call rxi_CallError to permit rxi_ResetCall
+ * to processed when the tqWaiter count hits zero.
+ */
+ if (call->error) {
+ rxi_CallError(call, call->error);
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+ }
+#endif /* RX_ENABLE_LOCKS */
+ /* If the new call cannot be taken right now send a busy and set
+ * the error condition in this call, so that it terminates as
+ * quickly as possible */
+ if (call->state == RX_STATE_ACTIVE) {
+ rxi_CallError(call, RX_CALL_DEAD);
+ rxi_SendSpecial(call, conn, NULL, RX_PACKET_TYPE_BUSY,
+ NULL, 0, 1);
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+
+ if (rxi_AbortIfServerBusy(socket, conn, np)) {
+ MUTEX_EXIT(&call->lock);
+ return NULL;
+ }
+
+ rxi_ResetCall(call, 0);
+ /* The conn_call_lock is not held but no one else should be
+ * using this call channel while we are processing this incoming
+ * packet. This assignment should be safe.
+ */
+ *call->callNumber = np->header.callNumber;
+ call->state = RX_STATE_PRECALL;
+ clock_GetTime(&call->queueTime);
+ call->app.bytesSent = 0;
+ call->app.bytesRcvd = 0;
+ rxi_KeepAliveOn(call);
+
+ return call;
+}
+
+
/* There are two packet tracing routines available for testing and monitoring
* Rx. One is called just after every packet is received and the other is
* called just before every packet is sent. Received packets, have had their
{
struct rx_call *call;
struct rx_connection *conn;
- int channel;
- afs_uint32 currentCallNumber;
int type;
#ifdef RXDEBUG
char *packetType;
}
}
- channel = np->header.cid & RX_CHANNELMASK;
- MUTEX_ENTER(&conn->conn_call_lock);
- call = conn->call[channel];
+ if (type == RX_SERVER_CONNECTION)
+ call = rxi_ReceiveServerCall(socket, np, conn);
+ else
+ call = rxi_ReceiveClientCall(np, conn);
- if (call) {
- MUTEX_ENTER(&call->lock);
- currentCallNumber = conn->callNumber[channel];
- MUTEX_EXIT(&conn->conn_call_lock);
- } else if (type == RX_SERVER_CONNECTION) { /* No call allocated */
- call = conn->call[channel];
- if (call) {
- MUTEX_ENTER(&call->lock);
- currentCallNumber = conn->callNumber[channel];
- MUTEX_EXIT(&conn->conn_call_lock);
- } else {
- call = rxi_NewCall(conn, channel); /* returns locked call */
- *call->callNumber = currentCallNumber = np->header.callNumber;
- MUTEX_EXIT(&conn->conn_call_lock);
-#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
- np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
- np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->length));
-#endif
- call->state = RX_STATE_PRECALL;
- clock_GetTime(&call->queueTime);
- call->bytesSent = 0;
- call->bytesRcvd = 0;
- /*
- * If the number of queued calls exceeds the overload
- * threshold then abort this call.
- */
- if ((rx_BusyThreshold > 0) &&
- (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
- struct rx_packet *tp;
-
- rxi_CallError(call, rx_BusyError);
- tp = rxi_SendCallAbort(call, np, 1, 0);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.nBusies);
- return tp;
- }
- rxi_KeepAliveOn(call);
- }
- } else { /* RX_CLIENT_CONNECTION and No call allocated */
- /* This packet can't be for this call. If the new call address is
- * 0 then no call is running on this channel. If there is a call
- * then, since this is a client connection we're getting data for
- * it must be for the previous call.
- */
- MUTEX_EXIT(&conn->conn_call_lock);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ if (call == NULL) {
putConnection(conn);
- return np;
- }
-
- /* There is a non-NULL locked call at this point */
- if (type == RX_SERVER_CONNECTION) { /* We're the server */
- if (np->header.callNumber < currentCallNumber) {
- MUTEX_EXIT(&call->lock);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- putConnection(conn);
- return np;
- } else if (np->header.callNumber != currentCallNumber) {
- /* Wait until the transmit queue is idle before deciding
- * whether to reset the current call. Chances are that the
- * call will be in ether DALLY or HOLD state once the TQ_BUSY
- * flag is cleared.
- */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (call->state == RX_STATE_ACTIVE) {
- rxi_WaitforTQBusy(call);
- /*
- * If we entered error state while waiting,
- * must call rxi_CallError to permit rxi_ResetCall
- * to processed when the tqWaiter count hits zero.
- */
- if (call->error) {
- rxi_CallError(call, call->error);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
- }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- /* If the new call cannot be taken right now send a busy and set
- * the error condition in this call, so that it terminates as
- * quickly as possible */
- if (call->state == RX_STATE_ACTIVE) {
- struct rx_packet *tp;
-
- rxi_CallError(call, RX_CALL_DEAD);
- tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
- NULL, 0, 1);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return tp;
- }
- rxi_ResetCall(call, 0);
- /*
- * The conn_call_lock is not held but no one else should be
- * using this call channel while we are processing this incoming
- * packet. This assignment should be safe.
- */
- *call->callNumber = np->header.callNumber;
-#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
- np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
- np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->length));
-#endif
- call->state = RX_STATE_PRECALL;
- clock_GetTime(&call->queueTime);
- call->bytesSent = 0;
- call->bytesRcvd = 0;
- /*
- * If the number of queued calls exceeds the overload
- * threshold then abort this call.
- */
- if ((rx_BusyThreshold > 0) &&
- (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
- struct rx_packet *tp;
-
- rxi_CallError(call, rx_BusyError);
- tp = rxi_SendCallAbort(call, np, 1, 0);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.nBusies);
- return tp;
- }
- rxi_KeepAliveOn(call);
- } else {
- /* Continuing call; do nothing here. */
- }
- } else { /* we're the client */
- /* Ignore all incoming acknowledgements for calls in DALLY state */
- if ((call->state == RX_STATE_DALLY)
- && (np->header.type == RX_PACKET_TYPE_ACK)) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.ignorePacketDally);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
-
- /* Ignore anything that's not relevant to the current call. If there
- * isn't a current call, then no packet is relevant. */
- if (np->header.callNumber != currentCallNumber) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
- /* If the service security object index stamped in the packet does not
- * match the connection's security index, ignore the packet */
- if (np->header.securityIndex != conn->securityIndex) {
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
-
- /* If we're receiving the response, then all transmit packets are
- * implicitly acknowledged. Get rid of them. */
- if (np->header.type == RX_PACKET_TYPE_DATA) {
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* XXX Hack. Because we must release the global rx lock when
- * sending packets (osi_NetSend) we drop all acks while we're
- * traversing the tq in rxi_Start sending packets out because
- * packets may move to the freePacketQueue as result of being here!
- * So we drop these packets until we're safely out of the
- * traversing. Really ugly!
- * For fine grain RX locking, we set the acked field in the
- * packets and let rxi_Start remove them from the transmit queue.
- */
- if (call->flags & RX_CALL_TQ_BUSY) {
-#ifdef RX_ENABLE_LOCKS
- rxi_SetAcksInTransmitQueue(call);
-#else
- putConnection(conn);
- return np; /* xmitting; drop packet */
-#endif
- } else {
- rxi_ClearTransmitQueue(call, 0);
- }
-#else /* AFS_GLOBAL_RXLOCK_KERNEL */
- rxi_ClearTransmitQueue(call, 0);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- } else {
- if (np->header.type == RX_PACKET_TYPE_ACK) {
- /* now check to see if this is an ack packet acknowledging that the
- * server actually *lost* some hard-acked data. If this happens we
- * ignore this packet, as it may indicate that the server restarted in
- * the middle of a call. It is also possible that this is an old ack
- * packet. We don't abort the connection in this case, because this
- * *might* just be an old ack packet. The right way to detect a server
- * restart in the midst of a call is to notice that the server epoch
- * changed, btw. */
- /* XXX I'm not sure this is exactly right, since tfirst **IS**
- * XXX unacknowledged. I think that this is off-by-one, but
- * XXX I don't dare change it just yet, since it will
- * XXX interact badly with the server-restart detection
- * XXX code in receiveackpacket. */
- if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np;
- }
- }
- } /* else not a data packet */
+ return np;
}
osirx_AssertMine(&call->lock, "rxi_ReceivePacket middle");
/* Now do packet type-specific processing */
switch (np->header.type) {
case RX_PACKET_TYPE_DATA:
+ /* If we're a client, and receiving a response, then all the packets
+ * we transmitted packets are implicitly acknowledged. */
+ if (type == RX_CLIENT_CONNECTION && !opr_queue_IsEmpty(&call->tq))
+ rxi_AckAllInTransmitQueue(call);
+
np = rxi_ReceiveDataPacket(call, np, 1, socket, host, port, tnop,
newcallp);
break;
case RX_PACKET_TYPE_ACKALL:
/* All packets acknowledged, so we can drop all packets previously
* readied for sending */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* XXX Hack. We because we can't release the global rx lock when
- * sending packets (osi_NetSend) we drop all ack pkts while we're
- * traversing the tq in rxi_Start sending packets out because
- * packets may move to the freePacketQueue as result of being
- * here! So we drop these packets until we're safely out of the
- * traversing. Really ugly!
- * For fine grain RX locking, we set the acked field in the packets
- * and let rxi_Start remove the packets from the transmit queue.
- */
- if (call->flags & RX_CALL_TQ_BUSY) {
-#ifdef RX_ENABLE_LOCKS
- rxi_SetAcksInTransmitQueue(call);
- break;
-#else /* RX_ENABLE_LOCKS */
- MUTEX_EXIT(&call->lock);
- putConnection(conn);
- return np; /* xmitting; drop packet */
-#endif /* RX_ENABLE_LOCKS */
- }
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- rxi_ClearTransmitQueue(call, 0);
+ rxi_AckAllInTransmitQueue(call);
break;
default:
/* Should not reach here, unless the peer is broken: send an abort
rxi_ComputeRoundTripTime(tp, ap, call, peer, &now);
}
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /* XXX Hack. Because we have to release the global rx lock when sending
+#ifdef RX_ENABLE_LOCKS
+ /* XXX Hack. Because we have to release the global call lock when sending
* packets (osi_NetSend) we drop all acks while we're traversing the tq
* in rxi_Start sending packets out because packets may move to the
* freePacketQueue as result of being here! So we drop these packets until
* when it's done transmitting.
*/
if (call->flags & RX_CALL_TQ_BUSY) {
-#ifdef RX_ENABLE_LOCKS
tp->flags |= RX_PKTFLAG_ACKED;
call->flags |= RX_CALL_TQ_SOME_ACKED;
-#else /* RX_ENABLE_LOCKS */
- break;
-#endif /* RX_ENABLE_LOCKS */
} else
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
{
opr_queue_Remove(&tp->entry);
#ifdef RX_TRACK_PACKETS
#endif /* RX_ENABLE_LOCKS */
}
-
#ifdef RX_ENABLE_LOCKS
/* Set ack in all packets in transmit queue. rxi_Start will deal with
* clearing them out.
}
#endif /* RX_ENABLE_LOCKS */
+/*!
+ * Acknowledge the whole transmit queue.
+ *
+ * If we're running without locks, or the transmit queue isn't busy, then
+ * we can just clear the queue now. Otherwise, we have to mark all of the
+ * packets as acknowledged, and let rxi_Start clear it later on
+ */
+static void
+rxi_AckAllInTransmitQueue(struct rx_call *call)
+{
+#ifdef RX_ENABLE_LOCKS
+ if (call->flags & RX_CALL_TQ_BUSY) {
+ rxi_SetAcksInTransmitQueue(call);
+ return;
+ }
+#endif
+ rxi_ClearTransmitQueue(call, 0);
+}
/* Clear out the transmit queue for the current call (all packets have
* been received by peer) */
static void
rxi_ClearTransmitQueue(struct rx_call *call, int force)
{
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
struct opr_queue *cursor;
if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
int someAcked = 0;
call->flags |= RX_CALL_TQ_SOME_ACKED;
}
} else {
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
#ifdef RXDEBUG_PACKET
call->tqc -=
#endif /* RXDEBUG_PACKET */
rxi_FreePackets(0, &call->tq);
rxi_WakeUpTransmitQueue(call);
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
call->flags &= ~RX_CALL_TQ_CLEARME;
}
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif
rxi_rto_cancel(call);
call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
if (call->error)
error = call->error;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) {
rxi_ResetCall(call, 0);
}
MUTEX_EXIT(&peer->peer_lock);
flags = call->flags;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
rxi_WaitforTQBusy(call);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
rxi_ClearTransmitQueue(call, 1);
if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
* the listener or event threads
*/
if ((list[len - 1]->header.flags & RX_LAST_PACKET)
- || call->app.mode == RX_MODE_RECEIVING || call->app.mode == RX_MODE_EOF
+ || (call->flags & RX_CALL_FLUSH)
|| (call->flags & RX_CALL_FAST_RECOVER)) {
/* Check for the case where the current list contains
* an acked packet. Since we always send retransmissions
int maxXmitPackets;
if (call->error) {
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (rx_stats_active)
rx_atomic_inc(&rx_tq_debug.rxi_start_in_error);
#endif
* But check whether we're here recursively, and let the other guy
* do the work.
*/
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (!(call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_BUSY;
do {
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
restart:
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
call->flags &= ~RX_CALL_NEED_START;
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
nXmitPackets = 0;
maxXmitPackets = MIN(call->twind, call->cwind);
for (opr_queue_Scan(&call->tq, cursor)) {
istack);
}
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (call->error) {
/* We went into the error state while sending packets. Now is
* the time to reset the call. This will also inform the using
rxi_CallError(call, call->error);
return;
}
-#ifdef RX_ENABLE_LOCKS
+
if (call->flags & RX_CALL_TQ_SOME_ACKED) {
int missing;
call->flags &= ~RX_CALL_TQ_SOME_ACKED;
if (!missing)
call->flags |= RX_CALL_TQ_CLEARME;
}
-#endif /* RX_ENABLE_LOCKS */
if (call->flags & RX_CALL_TQ_CLEARME)
rxi_ClearTransmitQueue(call, 1);
} while (call->flags & RX_CALL_NEED_START);
/*
* TQ references no longer protected by this flag; they must remain
- * protected by the global lock.
+ * protected by the call lock.
*/
call->flags &= ~RX_CALL_TQ_BUSY;
rxi_WakeUpTransmitQueue(call);
} else {
call->flags |= RX_CALL_NEED_START;
}
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+#endif /* RX_ENABLE_LOCKS */
} else {
rxi_rto_cancel(call);
}
* may be freed!
* haveCTLock Set if calling from rxi_ReapConnections
*/
-#ifdef RX_ENABLE_LOCKS
static int
rxi_CheckCall(struct rx_call *call, int haveCTLock)
-#else /* RX_ENABLE_LOCKS */
-static int
-rxi_CheckCall(struct rx_call *call)
-#endif /* RX_ENABLE_LOCKS */
{
struct rx_connection *conn = call->conn;
afs_uint32 now;
return -1;
}
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
+#ifdef RX_ENABLE_LOCKS
if (call->flags & RX_CALL_TQ_BUSY) {
/* Call is active and will be reset by rxi_Start if it's
* in an error state.
now = clock_Sec();
-#ifdef RX_ENABLE_LOCKS
if (rxi_CheckCall(call, 0)) {
MUTEX_EXIT(&call->lock);
return;
}
-#else /* RX_ENABLE_LOCKS */
- if (rxi_CheckCall(call))
- return;
-#endif /* RX_ENABLE_LOCKS */
/* Don't try to keep alive dallying calls */
if (call->state == RX_STATE_DALLY) {
call->growMTUEvent = NULL;
}
-#ifdef RX_ENABLE_LOCKS
if (rxi_CheckCall(call, 0)) {
MUTEX_EXIT(&call->lock);
return;
}
-#else /* RX_ENABLE_LOCKS */
- if (rxi_CheckCall(call))
- return;
-#endif /* RX_ENABLE_LOCKS */
/* Don't bother with dallying calls */
if (call->state == RX_STATE_DALLY) {
code = MUTEX_TRYENTER(&call->lock);
if (!code)
continue;
-#ifdef RX_ENABLE_LOCKS
result = rxi_CheckCall(call, 1);
-#else /* RX_ENABLE_LOCKS */
- result = rxi_CheckCall(call);
-#endif /* RX_ENABLE_LOCKS */
MUTEX_EXIT(&call->lock);
if (result == -2) {
/* If CheckCall freed the call, it might