/* RX: Extended Remote Procedure Call */
#include <afsconfig.h>
-#ifdef KERNEL
-#include "afs/param.h"
-#else
#include <afs/param.h>
-#endif
-
#ifdef KERNEL
-#include "afs/sysincludes.h"
-#include "afsincludes.h"
-#ifndef UKERNEL
-#include "h/types.h"
-#include "h/time.h"
-#include "h/stat.h"
-#ifdef AFS_OSF_ENV
-#include <net/net_globals.h>
-#endif /* AFS_OSF_ENV */
-#ifdef AFS_LINUX20_ENV
-#include "h/socket.h"
-#endif
-#include "netinet/in.h"
-#ifdef AFS_SUN57_ENV
-#include "inet/common.h"
-#include "inet/ip.h"
-#include "inet/ip_ire.h"
-#endif
-#include "afs/afs_args.h"
-#include "afs/afs_osi.h"
-#ifdef RX_KERNEL_TRACE
-#include "rx_kcommon.h"
-#endif
-#if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
-#include "h/systm.h"
-#endif
-#ifdef RXDEBUG
-#undef RXDEBUG /* turn off debugging */
-#endif /* RXDEBUG */
-#if defined(AFS_SGI_ENV)
-#include "sys/debug.h"
-#endif
-#include "afsint.h"
-#ifdef AFS_OSF_ENV
-#undef kmem_alloc
-#undef kmem_free
-#undef mem_alloc
-#undef mem_free
-#endif /* AFS_OSF_ENV */
-#else /* !UKERNEL */
-#include "afs/sysincludes.h"
-#include "afsincludes.h"
-#endif /* !UKERNEL */
-#include "afs/lock.h"
-#include "rx_kmutex.h"
-#include "rx_kernel.h"
-#include "rx_clock.h"
-#include "rx_queue.h"
-#include "rx.h"
-#include "rx_globals.h"
-#include "rx_trace.h"
-#include "rx_atomic.h"
-#include "rx_internal.h"
-#include "rx_stats.h"
-#define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
-#define AFSOP_STOP_AFS 211 /* Stop AFS process */
-#define AFSOP_STOP_BKG 212 /* Stop BKG process */
-#include "afsint.h"
+# include "afs/sysincludes.h"
+# include "afsincludes.h"
+# ifndef UKERNEL
+# include "h/types.h"
+# include "h/time.h"
+# include "h/stat.h"
+# ifdef AFS_LINUX20_ENV
+# include "h/socket.h"
+# endif
+# include "netinet/in.h"
+# ifdef AFS_SUN5_ENV
+# include "netinet/ip6.h"
+# include "inet/common.h"
+# include "inet/ip.h"
+# include "inet/ip_ire.h"
+# endif
+# include "afs/afs_args.h"
+# include "afs/afs_osi.h"
+# ifdef RX_KERNEL_TRACE
+# include "rx_kcommon.h"
+# endif
+# if defined(AFS_AIX_ENV)
+# include "h/systm.h"
+# endif
+# ifdef RXDEBUG
+# undef RXDEBUG /* turn off debugging */
+# endif /* RXDEBUG */
+# if defined(AFS_SGI_ENV)
+# include "sys/debug.h"
+# endif
+# else /* !UKERNEL */
+# include "afs/sysincludes.h"
+# include "afsincludes.h"
+# endif /* !UKERNEL */
+# include "afs/lock.h"
+# include "rx_kmutex.h"
+# include "rx_kernel.h"
+# define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
+# define AFSOP_STOP_AFS 211 /* Stop AFS process */
+# define AFSOP_STOP_BKG 212 /* Stop BKG process */
extern afs_int32 afs_termState;
-#ifdef AFS_AIX41_ENV
-#include "sys/lockl.h"
-#include "sys/lock_def.h"
-#endif /* AFS_AIX41_ENV */
+# ifdef AFS_AIX41_ENV
+# include "sys/lockl.h"
+# include "sys/lock_def.h"
+# endif /* AFS_AIX41_ENV */
# include "afs/rxgen_consts.h"
#else /* KERNEL */
-# include <sys/types.h>
-# include <string.h>
-# include <stdarg.h>
-# include <errno.h>
-# ifdef HAVE_STDINT_H
-# include <stdint.h>
+# include <roken.h>
+
+# ifdef AFS_NT40_ENV
+# include <afs/afsutil.h>
+# include <WINNT\afsreg.h>
# endif
-#ifdef AFS_NT40_ENV
-# include <stdlib.h>
-# include <fcntl.h>
-# include <afs/afsutil.h>
-# include <WINNT\afsreg.h>
-#else
-# include <sys/socket.h>
-# include <sys/file.h>
-# include <netdb.h>
-# include <sys/stat.h>
-# include <netinet/in.h>
-# include <sys/time.h>
-#endif
-# include "rx.h"
+
+# include <afs/opr.h>
+
# include "rx_user.h"
-# include "rx_clock.h"
-# include "rx_queue.h"
-# include "rx_atomic.h"
-# include "rx_globals.h"
-# include "rx_trace.h"
-# include "rx_internal.h"
-# include "rx_stats.h"
-# include <afs/rxgen_consts.h>
#endif /* KERNEL */
+#include "rx.h"
+#include "rx_clock.h"
+#include "rx_queue.h"
+#include "rx_atomic.h"
+#include "rx_globals.h"
+#include "rx_trace.h"
+#include "rx_internal.h"
+#include "rx_stats.h"
+#include "rx_event.h"
+
+#include "rx_peer.h"
+#include "rx_conn.h"
+#include "rx_call.h"
+#include "rx_packet.h"
+
+#include <afs/rxgen_consts.h>
+
#ifndef KERNEL
#ifdef AFS_PTHREAD_ENV
#ifndef AFS_NT40_ENV
/* Local static routines */
static void rxi_DestroyConnectionNoLock(struct rx_connection *conn);
-static void rxi_ComputeRoundTripTime(struct rx_packet *, struct clock *,
- struct rx_peer *, struct clock *);
+static void rxi_ComputeRoundTripTime(struct rx_packet *, struct rx_ackPacket *,
+ struct rx_call *, struct rx_peer *,
+ struct clock *);
+static void rxi_Resend(struct rxevent *event, void *arg0, void *arg1,
+ int istack);
+static void rxi_SendDelayedAck(struct rxevent *event, void *call,
+ void *dummy, int dummy2);
+static void rxi_SendDelayedCallAbort(struct rxevent *event, void *arg1,
+ void *dummy, int dummy2);
+static void rxi_SendDelayedConnAbort(struct rxevent *event, void *arg1,
+ void *unused, int unused2);
+static void rxi_ReapConnections(struct rxevent *unused, void *unused1,
+ void *unused2, int unused3);
+static struct rx_packet *rxi_SendCallAbort(struct rx_call *call,
+ struct rx_packet *packet,
+ int istack, int force);
+static void rxi_AckAll(struct rx_call *call);
+static struct rx_connection
+ *rxi_FindConnection(osi_socket socket, afs_uint32 host, u_short port,
+ u_short serviceId, afs_uint32 cid,
+ afs_uint32 epoch, int type, u_int securityIndex);
+static struct rx_packet
+ *rxi_ReceiveDataPacket(struct rx_call *call, struct rx_packet *np,
+ int istack, osi_socket socket,
+ afs_uint32 host, u_short port, int *tnop,
+ struct rx_call **newcallp);
+static struct rx_packet
+ *rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
+ int istack);
+static struct rx_packet
+ *rxi_ReceiveResponsePacket(struct rx_connection *conn,
+ struct rx_packet *np, int istack);
+static struct rx_packet
+ *rxi_ReceiveChallengePacket(struct rx_connection *conn,
+ struct rx_packet *np, int istack);
+static void rxi_AttachServerProc(struct rx_call *call, osi_socket socket,
+ int *tnop, struct rx_call **newcallp);
+static void rxi_ClearTransmitQueue(struct rx_call *call, int force);
+static void rxi_ClearReceiveQueue(struct rx_call *call);
+static void rxi_ResetCall(struct rx_call *call, int newcall);
+static void rxi_ScheduleKeepAliveEvent(struct rx_call *call);
+static void rxi_ScheduleNatKeepAliveEvent(struct rx_connection *conn);
+static void rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs);
+static void rxi_KeepAliveOn(struct rx_call *call);
+static void rxi_GrowMTUOn(struct rx_call *call);
+static void rxi_ChallengeOn(struct rx_connection *conn);
#ifdef RX_ENABLE_LOCKS
+static int rxi_CheckCall(struct rx_call *call, int haveCTLock);
static void rxi_SetAcksInTransmitQueue(struct rx_call *call);
+#else
+static int rxi_CheckCall(struct rx_call *call);
#endif
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
} rx_tq_debug;
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+/* Constant delay time before sending an acknowledge of the last packet
+ * received. This is to avoid sending an extra acknowledge when the
+ * client is about to make another call, anyway, or the server is
+ * about to respond.
+ *
+ * The lastAckDelay may not exceed 400ms without causing peers to
+ * unnecessarily timeout.
+ */
+struct clock rx_lastAckDelay = {0, 400000};
+
+/* Constant delay time before sending a soft ack when none was requested.
+ * This is to make sure we send soft acks before the sender times out,
+ * Normally we wait and send a hard ack when the receiver consumes the packet
+ *
+ * This value has been 100ms in all shipping versions of OpenAFS. Changing it
+ * will require changes to the peer's RTT calculations.
+ */
+struct clock rx_softAckDelay = {0, 100000};
+
/*
* rxi_rpc_peer_stat_cnt counts the total number of peer stat structures
* currently allocated within rx. This number is used to allocate the
static unsigned int rxi_rpc_process_stat_cnt;
+/*
+ * rxi_busyChannelError is a boolean. It indicates whether or not RX_CALL_BUSY
+ * errors should be reported to the application when a call channel appears busy
+ * (inferred from the receipt of RX_PACKET_TYPE_BUSY packets on the channel),
+ * and there are other call channels in the connection that are not busy.
+ * If 0, we do not return errors upon receiving busy packets; we just keep
+ * trying on the same call channel until we hit a timeout.
+ */
+static afs_int32 rxi_busyChannelError = 0;
+
rx_atomic_t rx_nWaiting = RX_ATOMIC_INIT(0);
rx_atomic_t rx_nWaited = RX_ATOMIC_INIT(0);
afs_kmutex_t rx_atomic_mutex;
#endif
+/* Forward prototypes */
+static struct rx_call * rxi_NewCall(struct rx_connection *, int);
+
+/*
+ * Drop one reference on a connection.
+ *
+ * Decrements conn->refCount between MUTEX_ENTER/MUTEX_EXIT on
+ * rx_refcnt_mutex. The connection is neither inspected nor released here.
+ */
+static_inline void
+putConnection(struct rx_connection *conn)
+{
+    MUTEX_ENTER(&rx_refcnt_mutex);
+    conn->refCount -= 1;
+    MUTEX_EXIT(&rx_refcnt_mutex);
+}
+
#ifdef AFS_PTHREAD_ENV
/*
extern afs_kmutex_t des_random_mutex;
extern afs_kmutex_t rx_clock_mutex;
extern afs_kmutex_t rxi_connCacheMutex;
-extern afs_kmutex_t rx_event_mutex;
-extern afs_kmutex_t osi_malloc_mutex;
extern afs_kmutex_t event_handler_mutex;
extern afs_kmutex_t listener_mutex;
extern afs_kmutex_t rx_if_init_mutex;
extern afs_kmutex_t rx_if_mutex;
-extern afs_kmutex_t rxkad_client_uid_mutex;
-extern afs_kmutex_t rxkad_random_mutex;
extern afs_kcondvar_t rx_event_handler_cond;
extern afs_kcondvar_t rx_listener_cond;
MUTEX_INIT(&rx_refcnt_mutex, "refcnts", MUTEX_DEFAULT, 0);
MUTEX_INIT(&epoch_mutex, "epoch", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_init_mutex, "init", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&rx_event_mutex, "event", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&des_init_mutex, "des", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&des_random_mutex, "random", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&osi_malloc_mutex, "malloc", MUTEX_DEFAULT, 0);
MUTEX_INIT(&event_handler_mutex, "event handler", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rxi_connCacheMutex, "conn cache", MUTEX_DEFAULT, 0);
MUTEX_INIT(&listener_mutex, "listener", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_if_init_mutex, "if init", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_if_mutex, "if", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&rxkad_client_uid_mutex, "uid", MUTEX_DEFAULT, 0);
- MUTEX_INIT(&rxkad_random_mutex, "rxkad random", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_debug_mutex, "debug", MUTEX_DEFAULT, 0);
- osi_Assert(pthread_cond_init
- (&rx_event_handler_cond, (const pthread_condattr_t *)0) == 0);
- osi_Assert(pthread_cond_init(&rx_listener_cond, (const pthread_condattr_t *)0)
- == 0);
+ CV_INIT(&rx_event_handler_cond, "evhand", CV_DEFAULT, 0);
+ CV_INIT(&rx_listener_cond, "rxlisten", CV_DEFAULT, 0);
+
osi_Assert(pthread_key_create(&rx_thread_id_key, NULL) == 0);
osi_Assert(pthread_key_create(&rx_ts_info_key, NULL) == 0);
- rxkad_global_stats_init();
-
MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
#ifdef RX_ENABLE_LOCKS
* to manipulate the queue.
*/
-#if defined(RX_ENABLE_LOCKS) && defined(KERNEL)
+#if defined(RX_ENABLE_LOCKS)
static afs_kmutex_t rx_rpc_stats;
-void rxi_StartUnlocked(struct rxevent *event, void *call,
- void *arg1, int istack);
#endif
/* We keep a "last conn pointer" in rxi_FindConnection. The odds are
#endif /* RX_LOCKS_DB */
MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_quota_mutex, "rx_quota_mutex", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_atomic_mutex, "rx_atomic_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_pthread_mutex, "rx_pthread_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_packets_mutex, "rx_packets_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_refcnt_mutex, "rx_refcnt_mutex", MUTEX_DEFAULT, 0);
rx_connDeadTime = 12;
rx_tranquil = 0; /* reset flag */
rxi_ResetStatistics();
- htable = (char *)
- osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
+ htable = osi_Alloc(rx_hashTableSize * sizeof(struct rx_connection *));
PIN(htable, rx_hashTableSize * sizeof(struct rx_connection *)); /* XXXXX */
memset(htable, 0, rx_hashTableSize * sizeof(struct rx_connection *));
- ptable = (char *)osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
+ ptable = osi_Alloc(rx_hashTableSize * sizeof(struct rx_peer *));
PIN(ptable, rx_hashTableSize * sizeof(struct rx_peer *)); /* XXXXX */
memset(ptable, 0, rx_hashTableSize * sizeof(struct rx_peer *));
#endif
if (getsockname((intptr_t)rx_socket, (struct sockaddr *)&addr, &addrlen)) {
rx_Finalize();
+ osi_Free(htable, rx_hashTableSize * sizeof(struct rx_connection *));
return -1;
}
rx_port = addr.sin_port;
rx_connHashTable = (struct rx_connection **)htable;
rx_peerHashTable = (struct rx_peer **)ptable;
- rx_lastAckDelay.sec = 0;
- rx_lastAckDelay.usec = 400000; /* 400 milliseconds */
rx_hardAckDelay.sec = 0;
rx_hardAckDelay.usec = 100000; /* 100 milliseconds */
- rx_softAckDelay.sec = 0;
- rx_softAckDelay.usec = 100000; /* 100 milliseconds */
rxevent_Init(20, rxi_ReScheduleEvents);
rx_GetIFInfo();
#endif
+#if defined(RXK_LISTENER_ENV) || !defined(KERNEL)
/* Start listener process (exact function is dependent on the
* implementation environment--kernel or user space) */
rxi_StartListener();
+#endif
USERPRI;
tmp_status = rxinit_status = 0;
return rx_InitHost(htonl(INADDR_ANY), port);
}
+/* RTT Timer
+ * ---------
+ *
+ * The rxi_rto functions implement a TCP (RFC2988) style algorithm for
+ * maintaining the round trip timer.
+ *
+ */
+
+/*!
+ * Arm the retransmission (RTO) timer for a call.
+ *
+ * Posts an rxi_Resend event for "now + call->rto" and stores it in
+ * call->resendEvent, after taking a RX_CALL_REFCOUNT_RESEND hold on the
+ * call. Whatever was in call->resendEvent is overwritten without being
+ * cancelled, so there must be no resendEvent pending for this call,
+ * otherwise that event is leaked. Intended for internal use within the
+ * RTO code only.
+ *
+ * @param[in] call
+ *	the RX call to start the timer for
+ * @param[in] lastPacket
+ *	non-zero if the last packet of the call has been sent
+ * @param[in] istack
+ *	handed through unchanged to rxevent_Post
+ *
+ * @pre call must be locked before calling this function
+ */
+static_inline void
+rxi_rto_startTimer(struct rx_call *call, int lastPacket, int istack)
+{
+    struct clock tNow, tRetry;
+    int clientLastPacket;
+
+    clock_GetTime(&tNow);
+    tRetry = tNow;
+    clock_Add(&tRetry, &call->rto);
+
+    /* A client that has just sent its last packet gives the server an
+     * extra 400ms: the server may delay its ACK by that long, and waiting
+     * for it is better than hitting a timeout. */
+    clientLastPacket =
+        (lastPacket && call->conn->type == RX_CLIENT_CONNECTION);
+    if (clientLastPacket)
+        clock_Addmsec(&tRetry, 400);
+
+    CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
+    call->resendEvent =
+        rxevent_Post(&tRetry, &tNow, rxi_Resend, call, NULL, istack);
+}
+
+/*!
+ * Cancel the RTO (retransmission) timer for a given call.
+ *
+ * Thin wrapper that hands the address of call->resendEvent to
+ * rxevent_Cancel together with the call and RX_CALL_REFCOUNT_RESEND, the
+ * same reference type that rxi_rto_startTimer holds before posting the
+ * event.
+ *
+ * NOTE(review): presumably rxevent_Cancel copes with no event being
+ * pending and clears call->resendEvent - confirm against rx_event.
+ *
+ * @param[in] call
+ * the RX call to cancel the timer for
+ *
+ * @pre call must be locked before calling this function
+ *
+ */
+
+static_inline void
+rxi_rto_cancel(struct rx_call *call)
+{
+ rxevent_Cancel(&call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+}
+
+/*!
+ * Notify the RTO code that a packet has been sent on a call.
+ *
+ * The timer is started only when call->resendEvent is not set; a timer
+ * that is already running is left untouched.
+ *
+ * @param[in] call
+ *	the RX call that the packet has been sent on
+ * @param[in] lastPacket
+ *	true if this is the last packet for the call
+ * @param[in] istack
+ *	handed through unchanged to rxi_rto_startTimer
+ *
+ * @pre The call must be locked before calling this function
+ */
+static_inline void
+rxi_rto_packet_sent(struct rx_call *call, int lastPacket, int istack)
+{
+    if (call->resendEvent == NULL)
+        rxi_rto_startTimer(call, lastPacket, istack);
+}
+
+/*!
+ * Notify the RTO code that a new ACK message has been received.
+ *
+ * To be called whenever a call receives an ACK that acknowledges new
+ * packets. The current timer is always cancelled. The transmit queue
+ * (call->tq) is then scanned in order: the scan ends at the first packet
+ * whose sequence number exceeds call->tfirst + call->twind, and the timer
+ * is restarted from now for the first packet found that has been sent
+ * but not yet acknowledged. If no such packet is found the timer stays
+ * stopped.
+ *
+ * @param[in] call
+ *	the RX call that the ACK has been received on
+ * @param[in] istack
+ *	handed through unchanged to rxi_rto_startTimer
+ */
+static_inline void
+rxi_rto_packet_acked(struct rx_call *call, int istack)
+{
+    struct rx_packet *pkt, *nextPkt;
+
+    rxi_rto_cancel(call);
+
+    if (queue_IsEmpty(&call->tq))
+        return;
+
+    for (queue_Scan(&call->tq, pkt, nextPkt, rx_packet)) {
+        /* Nothing at or past this point is considered. */
+        if (pkt->header.seq > call->tfirst + call->twind)
+            break;
+
+        /* Skip packets already acknowledged or never sent. */
+        if ((pkt->flags & RX_PKTFLAG_ACKED)
+            || !(pkt->flags & RX_PKTFLAG_SENT))
+            continue;
+
+        rxi_rto_startTimer(call, pkt->header.flags & RX_LAST_PACKET, istack);
+        break;
+    }
+}
+
+
+/**
+ * Set an initial round trip timeout for a peer connection
+ *
+ * Stores secs * 8000 in peer->rtt.
+ * NOTE(review): the factor suggests peer->rtt is kept in units of 1/8
+ * millisecond - confirm against the definition of struct rx_peer.
+ *
+ * @param[in] peer The peer whose rtt field is written
+ * @param[in] secs The timeout to set in seconds
+ */
+void
+rx_rto_setPeerTimeoutSecs(struct rx_peer *peer, int secs)
+{
+    int scaled = secs * 8000;
+
+    peer->rtt = scaled;
+}
+
+/**
+ * Enables or disables the busy call channel error (RX_CALL_BUSY).
+ *
+ * The setting is normalised to 0 or 1 and stored in rxi_busyChannelError.
+ * The function asserts that rxinit_status is non-zero; the init path in
+ * this file sets rxinit_status to 0 once it has finished, so the assertion
+ * enforces the precondition below.
+ *
+ * @param[in] onoff Non-zero to enable busy call channel errors.
+ *
+ * @pre Neither rx_Init nor rx_InitHost have been called yet
+ */
+void
+rx_SetBusyChannelError(afs_int32 onoff)
+{
+    osi_Assert(rxinit_status != 0);
+    rxi_busyChannelError = (onoff != 0);
+}
+
+/**
+ * Schedule, or bring forward, the delayed ack event of a call.
+ *
+ * The target time is "now + offset". If an event is already pending and
+ * call->delayedAckTime is not later than the target, nothing is changed.
+ * Otherwise the existing event (if any) is handed to rxevent_Cancel, a
+ * RX_CALL_REFCOUNT_DELAY hold is taken on the call, rxi_SendDelayedAck is
+ * posted for the target time and the target is recorded in
+ * call->delayedAckTime.
+ *
+ * @param[in] call - the call on which to set the event
+ * @param[in] offset - the delay from now after which the event fires
+ */
+void
+rxi_PostDelayedAckEvent(struct rx_call *call, struct clock *offset)
+{
+    struct clock tNow, fireAt;
+
+    clock_GetTime(&tNow);
+    fireAt = tNow;
+    clock_Add(&fireAt, offset);
+
+    /* A pending event that fires no later than fireAt is kept as is. */
+    if (call->delayedAckEvent
+        && !clock_Gt(&call->delayedAckTime, &fireAt))
+        return;
+
+    rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+    CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
+
+    call->delayedAckEvent =
+        rxevent_Post(&fireAt, &tNow, rxi_SendDelayedAck, call, NULL, 0);
+    call->delayedAckTime = fireAt;
+}
+
/* called with unincremented nRequestsRunning to see if it is OK to start
* a new thread in this service. Could be "no" for two reasons: over the
* max quota, or would prevent others from reaching their min quota.
/* Called by rx_StartServer to start up lwp's to service calls.
NExistingProcs gives the number of procs already existing, and which
therefore needn't be created. */
-void
+static void
rxi_StartServerProcs(int nExistingProcs)
{
struct rx_service *service;
}
/* Turn on reaping of idle server connections */
- rxi_ReapConnections(NULL, NULL, NULL);
+ rxi_ReapConnections(NULL, NULL, NULL, 0);
USERPRI;
for (i = 0; i < RX_MAXCALLS; i++) {
conn->twind[i] = rx_initSendWindow;
conn->rwind[i] = rx_initReceiveWindow;
+ conn->lastBusy[i] = 0;
}
RXS_NewConnection(securityObject, conn);
rx_SetConnIdleDeadTime(struct rx_connection *conn, int seconds)
{
conn->idleDeadTime = seconds;
+ conn->idleDeadDetection = (seconds ? 1 : 0);
rxi_CheckConnTimeouts(conn);
}
* Cleanup a connection that was destroyed in rxi_DestroyConnectionNoLock.
* NOTE: must not be called with rx_connHashTable_lock held.
*/
-void
+static void
rxi_CleanupConnection(struct rx_connection *conn)
{
/* Notify the service exporter, if requested, that this connection
MUTEX_EXIT(&conn->conn_data_lock);
/* Check for extant references to this connection */
+ MUTEX_ENTER(&conn->conn_call_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
struct rx_call *call = conn->call[i];
if (call) {
/* Push the final acknowledgment out now--there
* won't be a subsequent call to acknowledge the
* last reply packets */
- rxevent_Cancel(call->delayedAckEvent, call,
+ rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
if (call->state == RX_STATE_PRECALL
|| call->state == RX_STATE_ACTIVE) {
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
} else {
- rxi_AckAll(NULL, call, 0);
+ rxi_AckAll(call);
}
}
MUTEX_EXIT(&call->lock);
}
}
}
+ MUTEX_EXIT(&conn->conn_call_lock);
+
#ifdef RX_ENABLE_LOCKS
if (!havecalls) {
if (MUTEX_TRYENTER(&conn->conn_data_lock)) {
}
if (conn->delayedAbortEvent) {
- rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
+ rxevent_Cancel(&conn->delayedAbortEvent, NULL, 0);
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
if (packet) {
MUTEX_ENTER(&conn->conn_data_lock);
/* Make sure the connection is completely reset before deleting it. */
/* get rid of pending events that could zap us later */
- if (conn->challengeEvent)
- rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
- if (conn->checkReachEvent)
- rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
- if (conn->natKeepAliveEvent)
- rxevent_Cancel(conn->natKeepAliveEvent, (struct rx_call *)0, 0);
+ rxevent_Cancel(&conn->challengeEvent, NULL, 0);
+ rxevent_Cancel(&conn->checkReachEvent, NULL, 0);
+ rxevent_Cancel(&conn->natKeepAliveEvent, NULL, 0);
/* Add the connection to the list of destroyed connections that
* need to be cleaned up. This is necessary to avoid deadlocks
}
#endif
+/*
+ * Wake up anything waiting on a call's transmit queue.
+ *
+ * Does nothing unless call->tqWaiters is non-zero or RX_CALL_TQ_WAIT is
+ * set in call->flags. With RX_ENABLE_LOCKS the caller must own call->lock
+ * (asserted) and call->cv_tq is broadcast; without it, osi_rxWakeup is
+ * called on &call->tq.
+ *
+ * NOTE(review): the assertion tag still reads "rxi_Start start", which
+ * looks like a leftover from the code this helper was split out of -
+ * confirm whether it should name this function instead.
+ */
+static void
+rxi_WakeUpTransmitQueue(struct rx_call *call)
+{
+    if (!call->tqWaiters && !(call->flags & RX_CALL_TQ_WAIT))
+        return;
+
+    dpf(("call %"AFS_PTR_FMT" has %d waiters and flags %d\n",
+         call, call->tqWaiters, call->flags));
+#ifdef RX_ENABLE_LOCKS
+    osirx_AssertMine(&call->lock, "rxi_Start start");
+    CV_BROADCAST(&call->cv_tq);
+#else /* RX_ENABLE_LOCKS */
+    osi_rxWakeup(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+}
+
/* Start a new rx remote procedure call, on the specified connection.
* If wait is set to 1, wait for a free call channel; otherwise return
* 0. Maxtime gives the maximum number of seconds this call may take,
struct rx_call *
rx_NewCall(struct rx_connection *conn)
{
- int i, wait;
+ int i, wait, ignoreBusy = 1;
struct rx_call *call;
struct clock queueTime;
+ afs_uint32 leastBusy = 0;
SPLVAR;
clock_NewTime();
for (i = 0; i < RX_MAXCALLS; i++) {
call = conn->call[i];
if (call) {
+ if (!ignoreBusy && conn->lastBusy[i] != leastBusy) {
+ /* we're not ignoring busy call slots; only look at the
+ * call slot that is the "least" busy */
+ continue;
+ }
+
if (call->state == RX_STATE_DALLY) {
MUTEX_ENTER(&call->lock);
if (call->state == RX_STATE_DALLY) {
+ if (ignoreBusy && conn->lastBusy[i]) {
+ /* if we're ignoring busy call slots, skip any ones that
+ * have lastBusy set */
+ if (leastBusy == 0 || conn->lastBusy[i] < leastBusy) {
+ leastBusy = conn->lastBusy[i];
+ }
+ MUTEX_EXIT(&call->lock);
+ continue;
+ }
+
/*
* We are setting the state to RX_STATE_RESET to
* ensure that no one else will attempt to use this
* effect on overall system performance.
*/
call->state = RX_STATE_RESET;
+ (*call->callNumber)++;
MUTEX_EXIT(&conn->conn_call_lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
rxi_ResetCall(call, 0);
- (*call->callNumber)++;
if (MUTEX_TRYENTER(&conn->conn_call_lock))
break;
* Instead, cycle through one more time to see if
* we can find a call that can call our own.
*/
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
wait = 0;
}
MUTEX_EXIT(&call->lock);
}
} else {
+ if (ignoreBusy && conn->lastBusy[i]) {
+ /* if we're ignoring busy call slots, skip any ones that
+ * have lastBusy set */
+ if (leastBusy == 0 || conn->lastBusy[i] < leastBusy) {
+ leastBusy = conn->lastBusy[i];
+ }
+ continue;
+ }
+
/* rxi_NewCall returns with mutex locked */
call = rxi_NewCall(conn, i);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
break;
}
}
if (i < RX_MAXCALLS) {
+ conn->lastBusy[i] = 0;
+ call->flags &= ~RX_CALL_PEER_BUSY;
break;
}
if (!wait)
continue;
+ if (leastBusy && ignoreBusy) {
+ /* we didn't find a useable call slot, but we did see at least one
+ * 'busy' slot; look again and only use a slot with the 'least
+ * busy time */
+ ignoreBusy = 0;
+ continue;
+ }
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags |= RX_CONN_MAKECALL_WAITING;
else
call->mode = RX_MODE_SENDING;
+#ifdef AFS_RXERRQ_ENV
+ /* remember how many network errors the peer has when we started, so if
+ * more errors are encountered after the call starts, we know the other endpoint won't be
+ * responding to us */
+ call->neterr_gen = rx_atomic_read(&conn->peer->neterrs);
+#endif
+
/* remember start time for call in case we have hard dead time limit */
call->queueTime = queueTime;
clock_GetTime(&call->startTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
+ call->bytesSent = 0;
+ call->bytesRcvd = 0;
/* Turn on busy protocol. */
rxi_KeepAliveOn(call);
return call;
}
-int
+static int
rxi_HasActiveCalls(struct rx_connection *aconn)
{
int i;
SPLVAR;
NETPRI;
+ MUTEX_ENTER(&aconn->conn_call_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
aint32s[i] = aconn->callNumber[i] + 1;
else
aint32s[i] = aconn->callNumber[i];
}
+ MUTEX_EXIT(&aconn->conn_call_lock);
USERPRI;
return 0;
}
SPLVAR;
NETPRI;
+ MUTEX_ENTER(&aconn->conn_call_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
if ((tcall = aconn->call[i]) && (tcall->state == RX_STATE_DALLY))
aconn->callNumber[i] = aint32s[i] - 1;
else
aconn->callNumber[i] = aint32s[i];
}
+ MUTEX_EXIT(&aconn->conn_call_lock);
USERPRI;
return 0;
}
}
}
+#ifdef KERNEL
+ if (afs_termState == AFSOP_STOP_RXCALLBACK) {
+#ifdef RX_ENABLE_LOCKS
+ AFS_GLOCK();
+#endif /* RX_ENABLE_LOCKS */
+ afs_termState = AFSOP_STOP_AFS;
+ afs_osi_Wakeup(&afs_termState);
+#ifdef RX_ENABLE_LOCKS
+ AFS_GUNLOCK();
+#endif /* RX_ENABLE_LOCKS */
+ return;
+ }
+#endif
+
/* if server is restarting( typically smooth shutdown) then do not
* allow any new calls.
*/
MUTEX_EXIT(&call->lock);
USERPRI;
+ continue;
}
-#ifdef KERNEL
- if (afs_termState == AFSOP_STOP_RXCALLBACK) {
-#ifdef RX_ENABLE_LOCKS
- AFS_GLOCK();
-#endif /* RX_ENABLE_LOCKS */
- afs_termState = AFSOP_STOP_AFS;
- afs_osi_Wakeup(&afs_termState);
-#ifdef RX_ENABLE_LOCKS
- AFS_GUNLOCK();
-#endif /* RX_ENABLE_LOCKS */
- return;
- }
-#endif
tservice = call->conn->service;
(*tservice->afterProc) (call, code);
rx_EndCall(call, code);
+
+ if (tservice->postProc)
+ (*tservice->postProc) (code);
+
if (rx_stats_active) {
MUTEX_ENTER(&rx_stats_mutex);
rxi_nCalls++;
struct rx_serverQueueEntry *sq;
struct rx_call *call = (struct rx_call *)0;
struct rx_service *service = NULL;
- SPLVAR;
MUTEX_ENTER(&freeSQEList_lock);
}
MUTEX_ENTER(&rx_pthread_mutex);
if (tno == rxi_fcfs_thread_num
- || !tcall->queue_item_header.next) {
+ || queue_IsLast(&rx_incomingCallQueue, tcall)) {
MUTEX_EXIT(&rx_pthread_mutex);
/* If we're the fcfs thread , then we'll just use
* this call. If we haven't been able to find an optimal
call));
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
} else {
dpf(("rx_GetCall(socketp=%p, *socketp=0x%x)\n", socketp, *socketp));
}
} else {
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
- rxevent_Cancel(call->keepAliveEvent, call,
+ rxi_rto_cancel(call);
+ rxevent_Cancel(&call->keepAliveEvent, call,
RX_CALL_REFCOUNT_ALIVE);
}
} else { /* Client connection */
* and force-send it now.
*/
if (call->delayedAckEvent) {
- rxevent_Cancel(call->delayedAckEvent, call,
+ rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
- call->delayedAckEvent = NULL;
- rxi_SendDelayedAck(NULL, call, NULL);
+ rxi_SendDelayedAck(NULL, call, NULL, 0);
}
/* We need to release the call lock since it's lower than the
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_call_lock);
MUTEX_ENTER(&call->lock);
+
+ if (!(call->flags & RX_CALL_PEER_BUSY)) {
+ conn->lastBusy[call->channel] = 0;
+ }
+
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags |= RX_CONN_BUSY;
if (conn->flags & RX_CONN_MAKECALL_WAITING) {
rxi_FreePackets(0, &call->iovq);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
if (conn->type == RX_CLIENT_CONNECTION) {
MUTEX_ENTER(&conn->conn_data_lock);
conn->flags &= ~RX_CONN_BUSY;
/* Return this process's service structure for the
* specified socket and service */
-struct rx_service *
+static struct rx_service *
rxi_FindService(osi_socket socket, u_short serviceId)
{
struct rx_service **sp;
/* Allocate a call structure, for the indicated channel of the
* supplied connection. The mode and state of the call must be set by
* the caller. Returns the call with mutex locked. */
-struct rx_call *
+static struct rx_call *
rxi_NewCall(struct rx_connection *conn, int channel)
{
struct rx_call *call;
*
* call->lock and rx_refcnt_mutex are held upon entry.
* haveCTLock is set when called from rxi_ReapConnections.
+ *
+ * return 1 if the call is freed, 0 if not.
*/
-void
+static int
rxi_FreeCall(struct rx_call *call, int haveCTLock)
{
int channel = call->channel;
struct rx_connection *conn = call->conn;
+ u_char state = call->state;
-
- if (call->state == RX_STATE_DALLY || call->state == RX_STATE_HOLD)
- (*call->callNumber)++;
- rxi_ResetCall(call, 0);
- call->conn->call[channel] = (struct rx_call *)0;
+ /*
+ * We are setting the state to RX_STATE_RESET to
+ * ensure that no one else will attempt to use this
+ * call once we drop the refcnt lock. We must drop
+ * the refcnt lock before calling rxi_ResetCall
+ * because it cannot be held across acquiring the
+ * freepktQ lock. NewCall does the same.
+ */
+ call->state = RX_STATE_RESET;
MUTEX_EXIT(&rx_refcnt_mutex);
+ rxi_ResetCall(call, 0);
+
+ if (MUTEX_TRYENTER(&conn->conn_call_lock))
+ {
+ if (state == RX_STATE_DALLY || state == RX_STATE_HOLD)
+ (*call->callNumber)++;
+
+ if (call->conn->call[channel] == call)
+ call->conn->call[channel] = 0;
+ MUTEX_EXIT(&conn->conn_call_lock);
+ } else {
+ /*
+ * We couldn't obtain the conn_call_lock so we can't
+ * disconnect the call from the connection. Set the
+ * call state to dally so that the call can be reused.
+ */
+ MUTEX_ENTER(&rx_refcnt_mutex);
+ call->state = RX_STATE_DALLY;
+ return 0;
+ }
MUTEX_ENTER(&rx_freeCallQueue_lock);
SET_CALL_QUEUE_LOCK(call, &rx_freeCallQueue_lock);
MUTEX_EXIT(&conn->conn_data_lock);
}
MUTEX_ENTER(&rx_refcnt_mutex);
+ return 1;
}
rx_atomic_t rxi_Allocsize = RX_ATOMIC_INIT(0);
MUTEX_EXIT(&rx_peerHashTable_lock);
}
-/* Find the peer process represented by the supplied (host,port)
- * combination. If there is no appropriate active peer structure, a
- * new one will be allocated and initialized
- * The origPeer, if set, is a pointer to a peer structure on which the
- * refcount will be be decremented. This is used to replace the peer
- * structure hanging off a connection structure */
-struct rx_peer *
-rxi_FindPeer(afs_uint32 host, u_short port,
- struct rx_peer *origPeer, int create)
+#ifdef AFS_RXERRQ_ENV
+static void
+rxi_SetPeerDead(afs_uint32 host, afs_uint16 port)
{
- struct rx_peer *pp;
- int hashIndex;
- hashIndex = PEER_HASH(host, port);
+ int hashIndex = PEER_HASH(host, port);
+ struct rx_peer *peer;
+ /*
+ * Purpose: record one network error against the peer identified by
+ * (host, port). The peer is looked up in rx_peerHashTable; when no
+ * entry matches nothing is recorded and no peer is created. The lookup
+ * and the increment both happen while rx_peerHashTable_lock is held.
+ */
MUTEX_ENTER(&rx_peerHashTable_lock);
- for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
- if ((pp->host == host) && (pp->port == port))
+ /* Walk the chain of this hash bucket; peer ends up NULL if nothing matches. */
+ for (peer = rx_peerHashTable[hashIndex]; peer; peer = peer->next) {
+ if (peer->host == host && peer->port == port) {
+ break;
+ }
+ }
+ /* Bump the peer's neterrs counter only when a matching peer was found. */
+ if (peer) {
+ rx_atomic_inc(&peer->neterrs);
+ }
+
+ MUTEX_EXIT(&rx_peerHashTable_lock);
+}
+
+/*
+ * Act on an extended socket error reported for a remote address.
+ *
+ * With AFS_ADAPT_PMTU, an EMSGSIZE error whose ee_info is at least 68 is
+ * turned into a call to rxi_SetPeerMtu with ee_info - RX_IPUDP_SIZE, and
+ * nothing else is done. (NOTE(review): 68 is presumably the minimum IPv4
+ * MTU - confirm.)
+ *
+ * Otherwise, an ICMP destination-unreachable error with one of the codes
+ * listed in the switch below is recorded against the peer through
+ * rxi_SetPeerDead. Every other error is ignored.
+ *
+ * err  - the extended error to examine
+ * addr - address passed on to rxi_SetPeerMtu / rxi_SetPeerDead
+ * port - port passed on to rxi_SetPeerMtu / rxi_SetPeerDead
+ */
+void
+rxi_ProcessNetError(struct sock_extended_err *err, afs_uint32 addr, afs_uint16 port)
+{
+# ifdef AFS_ADAPT_PMTU
+    if (err->ee_errno == EMSGSIZE && err->ee_info >= 68) {
+        rxi_SetPeerMtu(NULL, addr, port, err->ee_info - RX_IPUDP_SIZE);
+        return;
+    }
+# endif
+    if (err->ee_origin != SO_EE_ORIGIN_ICMP)
+        return;
+    if (err->ee_type != ICMP_DEST_UNREACH)
+        return;
+
+    switch (err->ee_code) {
+    case ICMP_NET_UNREACH:
+    case ICMP_HOST_UNREACH:
+    case ICMP_PORT_UNREACH:
+    case ICMP_NET_ANO:
+    case ICMP_HOST_ANO:
+        rxi_SetPeerDead(addr, port);
+        break;
+    default:
+        /* any other unreachable code is not counted */
+        break;
+    }
+}
+#endif /* AFS_RXERRQ_ENV */
+
+/* Find the peer process represented by the supplied (host,port)
+ * combination. If there is no appropriate active peer structure, a
+ * new one will be allocated and initialized
+ * The origPeer, if set, is a pointer to a peer structure on which the
+ * refcount will be decremented. This is used to replace the peer
+ * structure hanging off a connection structure */
+struct rx_peer *
+rxi_FindPeer(afs_uint32 host, u_short port,
+ struct rx_peer *origPeer, int create)
+{
+ struct rx_peer *pp;
+ int hashIndex;
+ hashIndex = PEER_HASH(host, port);
+ MUTEX_ENTER(&rx_peerHashTable_lock);
+ for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
+ if ((pp->host == host) && (pp->port == port))
break;
}
if (!pp) {
pp = rxi_AllocPeer(); /* This bzero's *pp */
pp->host = host; /* set here or in InitPeerParams is zero */
pp->port = port;
+#ifdef AFS_RXERRQ_ENV
+ rx_atomic_set(&pp->neterrs, 0);
+#endif
MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
- queue_Init(&pp->congestionQueue);
queue_Init(&pp->rpcStats);
pp->next = rx_peerHashTable[hashIndex];
rx_peerHashTable[hashIndex] = pp;
* parameter must match the existing index for the connection. If a
* server connection is created, it will be created using the supplied
* index, if the index is valid for this service */
-struct rx_connection *
+static struct rx_connection *
rxi_FindConnection(osi_socket socket, afs_uint32 host,
u_short port, u_short serviceId, afs_uint32 cid,
afs_uint32 epoch, int type, u_int securityIndex)
conn->lastSendTime = clock_Sec(); /* don't GC immediately */
conn->epoch = epoch;
conn->cid = cid & RX_CIDMASK;
- /* conn->serial = conn->lastSerial = 0; */
- /* conn->timeout = 0; */
conn->ackRate = RX_FAST_ACK_RATE;
conn->service = service;
conn->serviceId = serviceId;
conn->nSpecific = 0;
conn->specific = NULL;
rx_SetConnDeadTime(conn, service->connDeadTime);
- rx_SetConnIdleDeadTime(conn, service->idleDeadTime);
- rx_SetServerConnIdleDeadErr(conn, service->idleDeadErr);
+ conn->idleDeadTime = service->idleDeadTime;
+ conn->idleDeadDetection = service->idleDeadErr ? 1 : 0;
for (i = 0; i < RX_MAXCALLS; i++) {
conn->twind[i] = rx_initSendWindow;
conn->rwind[i] = rx_initReceiveWindow;
return conn;
}
+/**
+ * Timeout a call on a busy call channel if appropriate.
+ *
+ * Scans the other call channels of the call's connection for one that
+ * looks usable. If one is found, and the call is still the same call
+ * and still flagged RX_CALL_PEER_BUSY once its lock has been retaken,
+ * the call is failed with RX_CALL_BUSY so that the application can
+ * retry. Otherwise the call is left untouched.
+ *
+ * @param[in] call The busy call.
+ *
+ * @pre 'call' is marked as busy (namely,
+ * call->conn->lastBusy[call->channel] != 0)
+ *
+ * @pre call->lock is held
+ * @pre rxi_busyChannelError is nonzero
+ * (NOTE(review): this function never reads rxi_busyChannelError
+ * itself; presumably the caller checks it -- confirm)
+ *
+ * @note call->lock is dropped and reacquired; it is held again on return
+ * @note conn->conn_call_lock is acquired here and released before
+ * returning, so the caller must not already hold it
+ */
+static void
+rxi_CheckBusy(struct rx_call *call)
+{
+ struct rx_connection *conn = call->conn;
+ int channel = call->channel;
+ int freechannel = 0;
+ int i;
+ afs_uint32 callNumber;
+
+ MUTEX_EXIT(&call->lock);
+ /* Take conn_call_lock first; call->lock is retaken below while it is held. */
+ MUTEX_ENTER(&conn->conn_call_lock);
+ callNumber = *call->callNumber; /* snapshot, rechecked after relocking */
+
+ /* Are there any other call slots on this conn that we should try? Look for
+ * slots that are empty and are either non-busy, or were marked as busy
+ * longer than conn->secondsUntilDead seconds before this call started.
+ * The scan stops at the first usable slot. */
+
+ for (i = 0; i < RX_MAXCALLS && !freechannel; i++) {
+ if (i == channel) {
+ /* only look at channels that aren't us */
+ continue;
+ }
+
+ if (conn->lastBusy[i]) {
+ /* if this channel looked busy too recently, don't look at it */
+ if (conn->lastBusy[i] >= call->startTime.sec) {
+ continue;
+ }
+ if (call->startTime.sec - conn->lastBusy[i] < conn->secondsUntilDead) {
+ continue;
+ }
+ }
+ /* A slot counts as free if it has no call, or its call is in DALLY state. */
+ if (conn->call[i]) {
+ struct rx_call *tcall = conn->call[i];
+ MUTEX_ENTER(&tcall->lock);
+ if (tcall->state == RX_STATE_DALLY) {
+ freechannel = 1;
+ }
+ MUTEX_EXIT(&tcall->lock);
+ } else {
+ freechannel = 1;
+ }
+ }
+
+ MUTEX_ENTER(&call->lock);
+
+ /* Since the call->lock and conn->conn_call_lock have been released it is
+ * possible that (1) the call may no longer be busy and/or (2) the call may
+ * have been reused by another waiting thread. Therefore, we must confirm
+ * that the call state has not changed when deciding whether or not to
+ * force this application thread to retry by forcing a Timeout error. */
+
+ if (freechannel && *call->callNumber == callNumber &&
+ (call->flags & RX_CALL_PEER_BUSY)) {
+ /* Since 'freechannel' is set, there exists another channel in this
+ * rx_conn that the application thread might be able to use. We know
+ * that we have the correct call since callNumber is unchanged, and we
+ * know that the call is still busy. So, set the call error state to
+ * RX_CALL_BUSY so the application can retry the request,
+ * presumably on a less-busy call channel. */
+
+ rxi_CallError(call, RX_CALL_BUSY);
+ }
+ MUTEX_EXIT(&conn->conn_call_lock);
+}
+
/* There are two packet tracing routines available for testing and monitoring
* Rx. One is called just after every packet is received and the other is
* called just before every packet is sent. Received packets, have had their
int channel;
afs_uint32 currentCallNumber;
int type;
- int skew;
#ifdef RXDEBUG
char *packetType;
#endif
np->header.seq, np->header.flags, np));
#endif
+ /* Account for connectionless packets */
+ if (rx_stats_active &&
+ ((np->header.type == RX_PACKET_TYPE_VERSION) ||
+ (np->header.type == RX_PACKET_TYPE_DEBUG))) {
+ struct rx_peer *peer;
+
+ /* Try to look up the peer structure, but don't create one */
+ peer = rxi_FindPeer(host, port, 0, 0);
+
+ /* Since this may not be associated with a connection, it may have
+ * no refCount, meaning we could race with ReapConnections
+ */
+
+ if (peer && (peer->refCount > 0)) {
+#ifdef AFS_RXERRQ_ENV
+ if (rx_atomic_read(&peer->neterrs)) {
+ rx_atomic_set(&peer->neterrs, 0);
+ }
+#endif
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->bytesReceived += np->length;
+ MUTEX_EXIT(&peer->peer_lock);
+ }
+ }
+
if (np->header.type == RX_PACKET_TYPE_VERSION) {
return rxi_ReceiveVersionPacket(np, socket, host, port, 1);
}
np->header.cid, np->header.epoch, type,
np->header.securityIndex);
+ /* To avoid having 2 connections just abort at each other,
+ don't abort an abort. */
if (!conn) {
- /* If no connection found or fabricated, just ignore the packet.
- * (An argument could be made for sending an abort packet for
- * the conn) */
- return np;
+ if (np->header.type != RX_PACKET_TYPE_ABORT)
+ rxi_SendRawAbort(socket, host, port, RX_INVALID_OPERATION,
+ np, 0);
+ return np;
}
- MUTEX_ENTER(&conn->conn_data_lock);
- if (conn->maxSerial < np->header.serial)
- conn->maxSerial = np->header.serial;
- MUTEX_EXIT(&conn->conn_data_lock);
+#ifdef AFS_RXERRQ_ENV
+ if (rx_atomic_read(&conn->peer->neterrs)) {
+ rx_atomic_set(&conn->peer->neterrs, 0);
+ }
+#endif
+
+ /* If we're doing statistics, then account for the incoming packet */
+ if (rx_stats_active) {
+ MUTEX_ENTER(&conn->peer->peer_lock);
+ conn->peer->bytesReceived += np->length;
+ MUTEX_EXIT(&conn->peer->peer_lock);
+ }
/* If the connection is in an error state, send an abort packet and ignore
* the incoming packet */
MUTEX_ENTER(&conn->conn_data_lock);
if (np->header.type != RX_PACKET_TYPE_ABORT)
np = rxi_SendConnectionAbort(conn, np, 1, 0);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
MUTEX_EXIT(&conn->conn_data_lock);
return np;
}
afs_int32 errcode = ntohl(rx_GetInt32(np, 0));
dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d\n", errcode));
rxi_ConnectionError(conn, errcode);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
}
case RX_PACKET_TYPE_CHALLENGE:
tnp = rxi_ReceiveChallengePacket(conn, np, 1);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return tnp;
case RX_PACKET_TYPE_RESPONSE:
tnp = rxi_ReceiveResponsePacket(conn, np, 1);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return tnp;
case RX_PACKET_TYPE_PARAMS:
case RX_PACKET_TYPE_PARAMS + 1:
case RX_PACKET_TYPE_PARAMS + 2:
/* ignore these packet types for now */
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
-
default:
/* Should not reach here, unless the peer is broken: send an
* abort packet */
rxi_ConnectionError(conn, RX_PROTOCOL_ERROR);
MUTEX_ENTER(&conn->conn_data_lock);
tnp = rxi_SendConnectionAbort(conn, np, 1, 0);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
MUTEX_EXIT(&conn->conn_data_lock);
return tnp;
}
}
channel = np->header.cid & RX_CHANNELMASK;
+ MUTEX_ENTER(&conn->conn_call_lock);
call = conn->call[channel];
-#ifdef RX_ENABLE_LOCKS
- if (call)
- MUTEX_ENTER(&call->lock);
- /* Test to see if call struct is still attached to conn. */
- if (call != conn->call[channel]) {
- if (call)
- MUTEX_EXIT(&call->lock);
- if (type == RX_SERVER_CONNECTION) {
- call = conn->call[channel];
- /* If we started with no call attached and there is one now,
- * another thread is also running this routine and has gotten
- * the connection channel. We should drop this packet in the tests
- * below. If there was a call on this connection and it's now
- * gone, then we'll be making a new call below.
- * If there was previously a call and it's now different then
- * the old call was freed and another thread running this routine
- * has created a call on this channel. One of these two threads
- * has a packet for the old call and the code below handles those
- * cases.
- */
- if (call)
- MUTEX_ENTER(&call->lock);
- } else {
- /* This packet can't be for this call. If the new call address is
- * 0 then no call is running on this channel. If there is a call
- * then, since this is a client connection we're getting data for
- * it must be for the previous call.
- */
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
- }
-#endif
- currentCallNumber = conn->callNumber[channel];
- if (type == RX_SERVER_CONNECTION) { /* We're the server */
- if (np->header.callNumber < currentCallNumber) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-#ifdef RX_ENABLE_LOCKS
- if (call)
- MUTEX_EXIT(&call->lock);
-#endif
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
- return np;
- }
- if (!call) {
- MUTEX_ENTER(&conn->conn_call_lock);
- call = rxi_NewCall(conn, channel);
- MUTEX_EXIT(&conn->conn_call_lock);
- *call->callNumber = np->header.callNumber;
+ if (call) {
+ MUTEX_ENTER(&call->lock);
+ currentCallNumber = conn->callNumber[channel];
+ MUTEX_EXIT(&conn->conn_call_lock);
+ } else if (type == RX_SERVER_CONNECTION) { /* No call allocated */
+ call = conn->call[channel];
+ if (call) {
+ MUTEX_ENTER(&call->lock);
+ currentCallNumber = conn->callNumber[channel];
+ MUTEX_EXIT(&conn->conn_call_lock);
+ } else {
+ call = rxi_NewCall(conn, channel); /* returns locked call */
+ *call->callNumber = currentCallNumber = np->header.callNumber;
+ MUTEX_EXIT(&conn->conn_call_lock);
#ifdef RXDEBUG
- if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%.06d len %d\n",
- np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
- np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+ if (np->header.callNumber == 0)
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
+ np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
+ np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
+ np->header.flags, np, np->length));
#endif
- call->state = RX_STATE_PRECALL;
- clock_GetTime(&call->queueTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
- /*
- * If the number of queued calls exceeds the overload
- * threshold then abort this call.
- */
- if ((rx_BusyThreshold > 0) &&
- (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
- struct rx_packet *tp;
-
- rxi_CallError(call, rx_BusyError);
- tp = rxi_SendCallAbort(call, np, 1, 0);
- MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ call->state = RX_STATE_PRECALL;
+ clock_GetTime(&call->queueTime);
+ call->bytesSent = 0;
+ call->bytesRcvd = 0;
+ /*
+ * If the number of queued calls exceeds the overload
+ * threshold then abort this call.
+ */
+ if ((rx_BusyThreshold > 0) &&
+ (rx_atomic_read(&rx_nWaiting) > rx_BusyThreshold)) {
+ struct rx_packet *tp;
+
+ rxi_CallError(call, rx_BusyError);
+ tp = rxi_SendCallAbort(call, np, 1, 0);
+ MUTEX_EXIT(&call->lock);
+ putConnection(conn);
if (rx_stats_active)
rx_atomic_inc(&rx_stats.nBusies);
- return tp;
- }
- rxi_KeepAliveOn(call);
- } else if (np->header.callNumber != currentCallNumber) {
+ return tp;
+ }
+ rxi_KeepAliveOn(call);
+ }
+ } else { /* RX_CLIENT_CONNECTION and No call allocated */
+ /* This packet can't be for this call. If the new call address is
+ * 0 then no call is running on this channel. If there is a call
+ * then, since this is a client connection we're getting data for
+ * it must be for the previous call.
+ */
+ MUTEX_EXIT(&conn->conn_call_lock);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ putConnection(conn);
+ return np;
+ }
+
+ /* There is a non-NULL locked call at this point */
+ if (type == RX_SERVER_CONNECTION) { /* We're the server */
+ if (np->header.callNumber < currentCallNumber) {
+ MUTEX_EXIT(&call->lock);
+ if (rx_stats_active)
+ rx_atomic_inc(&rx_stats.spuriousPacketsRead);
+ putConnection(conn);
+ return np;
+ } else if (np->header.callNumber != currentCallNumber) {
/* Wait until the transmit queue is idle before deciding
* whether to reset the current call. Chances are that the
* call will be in either DALLY or HOLD state once the TQ_BUSY
if (call->error) {
rxi_CallError(call, call->error);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
}
}
tp = rxi_SendSpecial(call, conn, np, RX_PACKET_TYPE_BUSY,
NULL, 0, 1);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return tp;
}
rxi_ResetCall(call, 0);
+ /*
+ * The conn_call_lock is not held but no one else should be
+ * using this call channel while we are processing this incoming
+ * packet. This assignment should be safe.
+ */
*call->callNumber = np->header.callNumber;
#ifdef RXDEBUG
if (np->header.callNumber == 0)
- dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" resend %d.%06d len %d\n",
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %"AFS_PTR_FMT" len %d\n",
np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port),
np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq,
- np->header.flags, np, np->retryTime.sec, np->retryTime.usec, np->length));
+ np->header.flags, np, np->length));
#endif
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
- hzero(call->bytesSent);
- hzero(call->bytesRcvd);
+ call->bytesSent = 0;
+ call->bytesRcvd = 0;
/*
* If the number of queued calls exceeds the overload
* threshold then abort this call.
rxi_CallError(call, rx_BusyError);
tp = rxi_SendCallAbort(call, np, 1, 0);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
if (rx_stats_active)
rx_atomic_inc(&rx_stats.nBusies);
return tp;
}
} else { /* we're the client */
/* Ignore all incoming acknowledgements for calls in DALLY state */
- if (call && (call->state == RX_STATE_DALLY)
+ if ((call->state == RX_STATE_DALLY)
&& (np->header.type == RX_PACKET_TYPE_ACK)) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.ignorePacketDally);
-#ifdef RX_ENABLE_LOCKS
- if (call) {
- MUTEX_EXIT(&call->lock);
- }
-#endif
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ MUTEX_EXIT(&call->lock);
+ putConnection(conn);
return np;
}
/* Ignore anything that's not relevant to the current call. If there
* isn't a current call, then no packet is relevant. */
- if (!call || (np->header.callNumber != currentCallNumber)) {
+ if (np->header.callNumber != currentCallNumber) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.spuriousPacketsRead);
-#ifdef RX_ENABLE_LOCKS
- if (call) {
- MUTEX_EXIT(&call->lock);
- }
-#endif
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ MUTEX_EXIT(&call->lock);
+ putConnection(conn);
return np;
}
/* If the service security object index stamped in the packet does not
* match the connection's security index, ignore the packet */
if (np->header.securityIndex != conn->securityIndex) {
-#ifdef RX_ENABLE_LOCKS
MUTEX_EXIT(&call->lock);
-#endif
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
}
#ifdef RX_ENABLE_LOCKS
rxi_SetAcksInTransmitQueue(call);
#else
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np; /* xmitting; drop packet */
#endif
} else {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.spuriousPacketsRead);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
}
}
/* Set remote user defined status from packet */
call->remoteStatus = np->header.userStatus;
- /* Note the gap between the expected next packet and the actual
- * packet that arrived, when the new packet has a smaller serial number
- * than expected. Rioses frequently reorder packets all by themselves,
- * so this will be quite important with very large window sizes.
- * Skew is checked against 0 here to avoid any dependence on the type of
- * inPacketSkew (which may be unsigned). In C, -1 > (unsigned) 0 is always
- * true!
- * The inPacketSkew should be a smoothed running value, not just a maximum. MTUXXX
- * see CalculateRoundTripTime for an example of how to keep smoothed values.
- * I think using a beta of 1/8 is probably appropriate. 93.04.21
- */
- MUTEX_ENTER(&conn->conn_data_lock);
- skew = conn->lastSerial - np->header.serial;
- conn->lastSerial = np->header.serial;
- MUTEX_EXIT(&conn->conn_data_lock);
- if (skew > 0) {
- struct rx_peer *peer;
- peer = conn->peer;
- if (skew > peer->inPacketSkew) {
- dpf(("*** In skew changed from %d to %d\n",
- peer->inPacketSkew, skew));
- peer->inPacketSkew = skew;
- }
- }
-
/* Now do packet type-specific processing */
switch (np->header.type) {
case RX_PACKET_TYPE_DATA:
dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d\n", errdata));
rxi_CallError(call, errdata);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np; /* xmitting; drop packet */
}
- case RX_PACKET_TYPE_BUSY:
- /* XXXX */
- break;
+ case RX_PACKET_TYPE_BUSY: {
+ struct clock busyTime;
+ clock_NewTime();
+ clock_GetTime(&busyTime);
+
+ MUTEX_EXIT(&call->lock);
+
+ MUTEX_ENTER(&conn->conn_call_lock);
+ MUTEX_ENTER(&call->lock);
+ conn->lastBusy[call->channel] = busyTime.sec;
+ call->flags |= RX_CALL_PEER_BUSY;
+ MUTEX_EXIT(&call->lock);
+ MUTEX_EXIT(&conn->conn_call_lock);
+
+ putConnection(conn);
+ return np;
+ }
+
case RX_PACKET_TYPE_ACKALL:
/* All packets acknowledged, so we can drop all packets previously
* readied for sending */
break;
#else /* RX_ENABLE_LOCKS */
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np; /* xmitting; drop packet */
#endif /* RX_ENABLE_LOCKS */
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
rxi_ClearTransmitQueue(call, 0);
- rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
break;
default:
/* Should not reach here, unless the peer is broken: send an abort
* the packet will be delivered to the user before any get time is required
* (if not, then the time won't actually be re-evaluated here). */
call->lastReceiveTime = clock_Sec();
+ /* we've received a legit packet, so the channel is not busy */
+ call->flags &= ~RX_CALL_PEER_BUSY;
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
return np;
}
}
#endif /* KERNEL */
+/*!
+ * Clear the attach wait flag on a connection and proceed.
+ *
+ * Any processing waiting for a connection to be attached should be
+ * unblocked. We clear the flag and do any other needed tasks.
+ * The only other task performed here: if RX_CONN_NAT_PING is set on
+ * the connection, that flag is cleared as well and
+ * rxi_ScheduleNatKeepAliveEvent() is called for the connection.
+ *
+ * @param[in] conn
+ * the conn to unmark waiting for attach
+ *
+ * @pre conn's conn_data_lock must be locked before calling this function
+ *
+ */
static void
-rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2)
+rxi_ConnClearAttachWait(struct rx_connection *conn)
+{
+ /* Indicate that rxi_CheckReachEvent is no longer running by
+ * clearing the flag. Must be atomic under conn_data_lock to
+ * avoid a new call slipping by: rxi_CheckConnReach holds
+ * conn_data_lock while checking RX_CONN_ATTACHWAIT.
+ *
+ * RX_CONN_NAT_PING is presumably a NAT keepalive request that was
+ * deferred while the attach wait was pending -- TODO confirm
+ * against the code that sets the flag.
+ */
+ conn->flags &= ~RX_CONN_ATTACHWAIT;
+ if (conn->flags & RX_CONN_NAT_PING) {
+ conn->flags &= ~RX_CONN_NAT_PING;
+ rxi_ScheduleNatKeepAliveEvent(conn);
+ }
+}
+
+static void
+rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2, int dummy)
{
struct rx_connection *conn = arg1;
struct rx_call *acall = arg2;
int i, waiting;
MUTEX_ENTER(&conn->conn_data_lock);
- conn->checkReachEvent = NULL;
+
+ if (event) {
+ rxevent_Put(conn->checkReachEvent);
+ conn->checkReachEvent = NULL;
+ }
+
waiting = conn->flags & RX_CONN_ATTACHWAIT;
if (event) {
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ putConnection(conn);
}
MUTEX_EXIT(&conn->conn_data_lock);
}
}
if (!call)
- /* Indicate that rxi_CheckReachEvent is no longer running by
- * clearing the flag. Must be atomic under conn_data_lock to
- * avoid a new call slipping by: rxi_CheckConnReach holds
- * conn_data_lock while checking RX_CONN_ATTACHWAIT.
- */
- conn->flags &= ~RX_CONN_ATTACHWAIT;
+ rxi_ConnClearAttachWait(conn);
MUTEX_EXIT(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_call_lock);
}
MUTEX_ENTER(&rx_refcnt_mutex);
conn->refCount++;
MUTEX_EXIT(&rx_refcnt_mutex);
- conn->checkReachEvent =
- rxevent_PostNow(&when, &now, rxi_CheckReachEvent, conn,
- NULL);
+ conn->checkReachEvent = rxevent_Post(&when, &now,
+ rxi_CheckReachEvent, conn,
+ NULL, 0);
}
MUTEX_EXIT(&conn->conn_data_lock);
}
conn->flags |= RX_CONN_ATTACHWAIT;
MUTEX_EXIT(&conn->conn_data_lock);
if (!conn->checkReachEvent)
- rxi_CheckReachEvent(NULL, conn, call);
+ rxi_CheckReachEvent(NULL, conn, call, 0);
return 1;
}
* appropriate to the call (the call is in the right state, etc.). This
* routine can return a packet to the caller, for re-use */
-struct rx_packet *
+static struct rx_packet *
rxi_ReceiveDataPacket(struct rx_call *call,
struct rx_packet *np, int istack,
osi_socket socket, afs_uint32 host, u_short port,
afs_uint32 serial=0, flags=0;
int isFirst;
struct rx_packet *tnp;
- struct clock when, now;
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dataPacketsRead);
MUTEX_EXIT(&rx_freePktQ_lock);
if (rx_stats_active)
rx_atomic_inc(&rx_stats.noPacketBuffersOnRead);
- call->rprev = np->header.serial;
rxi_calltrace(RX_TRACE_DROP, call);
dpf(("packet %"AFS_PTR_FMT" dropped on receipt - quota problems\n", np));
- if (rxi_doreclaim)
- rxi_ClearReceiveQueue(call);
- clock_GetTime(&now);
- when = now;
- clock_Add(&when, &rx_softAckDelay);
- if (!call->delayedAckEvent
- || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
- MUTEX_EXIT(&rx_refcnt_mutex);
-
- call->delayedAckEvent =
- rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
- }
- /* we've damaged this call already, might as well do it in. */
+ /* We used to clear the receive queue here, in an attempt to free
+ * packets. However this is unsafe if the queue has received a
+ * soft ACK for the final packet */
+ rxi_PostDelayedAckEvent(call, &rx_softAckDelay);
return np;
}
#endif /* KERNEL */
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate\n", np));
- rxevent_Cancel(call->delayedAckEvent, call,
+ rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
ackNeeded = 0;
if (seq < call->rnext) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
- rxevent_Cancel(call->delayedAckEvent, call,
+ rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
ackNeeded = 0;
* accommodated by the current window, then send a negative
* acknowledge and drop the packet */
if ((call->rnext + call->rwind) <= seq) {
- rxevent_Cancel(call->delayedAckEvent, call,
+ rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_EXCEEDS_WINDOW,
istack);
if (seq == tp->header.seq) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
- rxevent_Cancel(call->delayedAckEvent, call,
+ rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE,
istack);
* Send an ack when requested by the peer, or once every
* rxi_SoftAckRate packets until the last packet has been
* received. Always send a soft ack for the last packet in
- * the server's reply.
- *
- * If we have received all of the packets for the call
- * immediately send an RX_PACKET_TYPE_ACKALL packet so that
- * the peer can empty its packet queue and cancel all resend
- * events.
- */
- if (call->flags & RX_CALL_RECEIVE_DONE) {
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
- rxi_AckAll(NULL, call, 0);
- } else if (ackNeeded) {
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ * the server's reply. */
+ if (ackNeeded) {
+ rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, ackNeeded, istack);
} else if (call->nSoftAcks > (u_short) rxi_SoftAckRate) {
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
} else if (call->nSoftAcks) {
- clock_GetTime(&now);
- when = now;
- if (haveLast && !(flags & RX_CLIENT_INITIATED)) {
- clock_Add(&when, &rx_lastAckDelay);
- } else {
- clock_Add(&when, &rx_softAckDelay);
- }
- if (!call->delayedAckEvent
- || clock_Gt(&call->delayedAckEvent->eventTime, &when)) {
- rxevent_Cancel(call->delayedAckEvent, call,
- RX_CALL_REFCOUNT_DELAY);
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
- MUTEX_EXIT(&rx_refcnt_mutex);
- call->delayedAckEvent =
- rxevent_PostNow(&when, &now, rxi_SendDelayedAck, call, 0);
- }
+ if (haveLast && !(flags & RX_CLIENT_INITIATED))
+ rxi_PostDelayedAckEvent(call, &rx_lastAckDelay);
+ else
+ rxi_PostDelayedAckEvent(call, &rx_softAckDelay);
+ } else if (call->flags & RX_CALL_RECEIVE_DONE) {
+ rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
}
return np;
}
-#ifdef ADAPT_WINDOW
-static void rxi_ComputeRate();
-#endif
-
static void
rxi_UpdatePeerReach(struct rx_connection *conn, struct rx_call *acall)
{
if (conn->flags & RX_CONN_ATTACHWAIT) {
int i;
- conn->flags &= ~RX_CONN_ATTACHWAIT;
+ rxi_ConnClearAttachWait(conn);
MUTEX_EXIT(&conn->conn_data_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
/* The real smarts of the whole thing. */
-struct rx_packet *
+static struct rx_packet *
rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
int istack)
{
afs_uint32 first;
afs_uint32 prev;
afs_uint32 serial;
- /* because there are CM's that are bogus, sending weird values for this. */
- afs_uint32 skew = 0;
int nbytes;
int missing;
int acked;
first = ntohl(ap->firstPacket);
prev = ntohl(ap->previousPacket);
serial = ntohl(ap->serial);
- /* temporarily disabled -- needs to degrade over time
- * skew = ntohs(ap->maxSkew); */
- /* Ignore ack packets received out of order */
+ /*
+ * Ignore ack packets received out of order while protecting
+ * against peers that set the previousPacket field to a packet
+ * serial number instead of a sequence number.
+ */
if (first < call->tfirst ||
- (first == call->tfirst && prev < call->tprev)) {
+ (first == call->tfirst && prev < call->tprev && prev < call->tfirst
+ + call->twind)) {
return np;
}
size_t len;
len = _snprintf(msg, sizeof(msg),
- "tid[%d] RACK: reason %s serial %u previous %u seq %u skew %d first %u acks %u space %u ",
+ "tid[%d] RACK: reason %s serial %u previous %u seq %u first %u acks %u space %u ",
GetCurrentThreadId(), rx_ack_reason(ap->reason),
ntohl(ap->serial), ntohl(ap->previousPacket),
- (unsigned int)np->header.seq, (unsigned int)skew,
- ntohl(ap->firstPacket), ap->nAcks, ntohs(ap->bufferSpace) );
+ (unsigned int)np->header.seq, ntohl(ap->firstPacket),
+ ap->nAcks, ntohs(ap->bufferSpace) );
if (nAcks) {
int offset;
#else /* AFS_NT40_ENV */
if (rx_Log) {
fprintf(rx_Log,
- "RACK: reason %x previous %u seq %u serial %u skew %d first %u",
+ "RACK: reason %x previous %u seq %u serial %u first %u",
ap->reason, ntohl(ap->previousPacket),
(unsigned int)np->header.seq, (unsigned int)serial,
- (unsigned int)skew, ntohl(ap->firstPacket));
+ ntohl(ap->firstPacket));
if (nAcks) {
int offset;
for (offset = 0; offset < nAcks; offset++)
}
}
- /* Update the outgoing packet skew value to the latest value of
- * the peer's incoming packet skew value. The ack packet, of
- * course, could arrive out of order, but that won't affect things
- * much */
- peer->outPacketSkew = skew;
+ clock_GetTime(&now);
+
+ /* The transmit queue splits into 4 sections.
+ *
+ * The first section is packets which have now been acknowledged
+ * by a window size change in the ack. These have reached the
+ * application layer, and may be discarded. These are packets
+ * with sequence numbers < ap->firstPacket.
+ *
+ * The second section is packets which have sequence numbers in
+ * the range ap->firstPacket to ap->firstPacket + ap->nAcks. The
+ * contents of the packet's ack array determines whether these
+ * packets are acknowledged or not.
+ *
+ * The third section is packets which fall above the range
+ * addressed in the ack packet. These have not yet been received
+ * by the peer.
+ *
+ * The fourth section is packets which have not yet been transmitted.
+ * These packets will have a header.serial of 0.
+ */
- /* Check for packets that no longer need to be transmitted, and
- * discard them. This only applies to packets positively
- * acknowledged as having been sent to the peer's upper level.
- * All other packets must be retained. So only packets with
- * sequence numbers < ap->firstPacket are candidates. */
+ /* First section - implicitly acknowledged packets that can be
+ * disposed of
+ */
- clock_GetTime(&now);
+ tp = queue_First(&call->tq, rx_packet);
+ while(!queue_IsEnd(&call->tq, tp) && tp->header.seq < first) {
+ struct rx_packet *next;
- for (queue_Scan(&call->tq, tp, nxp, rx_packet)) {
- if (tp->header.seq >= first)
- break;
+ next = queue_Next(tp, rx_packet);
call->tfirst = tp->header.seq + 1;
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
- if (ap->reason != RX_ACK_DELAY &&
- clock_Eq(&tp->timeSent, &tp->firstSent)) {
- rxi_ComputeRoundTripTime(tp, &tp->timeSent, call->conn->peer,
- &now);
- }
+ rxi_ComputeRoundTripTime(tp, ap, call, peer, &now);
}
-#ifdef ADAPT_WINDOW
- rxi_ComputeRate(call->conn->peer, call, p, np, ap->reason);
-#endif
-
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* XXX Hack. Because we have to release the global rx lock when sending
* packets (osi_NetSend) we drop all acks while we're traversing the tq
#endif /* RXDEBUG_PACKET */
rxi_FreePacket(tp); /* rxi_FreePacket mustn't wake up anyone, preemptively. */
}
+ tp = next;
}
-#ifdef ADAPT_WINDOW
- /* Give rate detector a chance to respond to ping requests */
- if (ap->reason == RX_ACK_PING_RESPONSE) {
- rxi_ComputeRate(peer, call, 0, np, ap->reason);
- }
-#endif
-
/* N.B. we don't turn off any timers here. They'll go away by themselves, anyway */
- /* Now go through explicit acks/nacks and record the results in
+ /* Second section of the queue - packets for which we are receiving
+ * soft ACKs
+ *
+ * Go through the explicit acks/nacks and record the results in
* the waiting packets. These are packets that can't be released
* yet, even with a positive acknowledge. This positive
* acknowledge only means the packet has been received by the
* because this packet was out of sequence) */
call->nSoftAcked = 0;
- for (missing = 0, queue_Scan(&call->tq, tp, nxp, rx_packet)) {
-
- /* Set the acknowledge flag per packet based on the
+ missing = 0;
+ while (!queue_IsEnd(&call->tq, tp) && tp->header.seq < first + nAcks) {
+ /* Set the acknowledge flag per packet based on the
* information in the ack packet. An acknowlegded packet can
* be downgraded when the server has discarded a packet it
* soacked previously, or when an ack packet is received
* out of sequence. */
- if (tp->header.seq < first) {
- /* Implicit ack information */
+ if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
newAckCount++;
+ tp->flags |= RX_PKTFLAG_ACKED;
+ rxi_ComputeRoundTripTime(tp, ap, call, peer, &now);
}
- tp->flags |= RX_PKTFLAG_ACKED;
- } else if (tp->header.seq < first + nAcks) {
- /* Explicit ack information: set it in the packet appropriately */
- if (ap->acks[tp->header.seq - first] == RX_ACK_TYPE_ACK) {
- if (!(tp->flags & RX_PKTFLAG_ACKED)) {
- newAckCount++;
- tp->flags |= RX_PKTFLAG_ACKED;
-
- if (ap->reason != RX_ACK_DELAY &&
- clock_Eq(&tp->timeSent, &tp->firstSent)) {
- rxi_ComputeRoundTripTime(tp, &tp->timeSent,
- call->conn->peer, &now);
- }
-#ifdef ADAPT_WINDOW
- rxi_ComputeRate(call->conn->peer, call, tp, np,
- ap->reason);
-#endif
- }
- if (missing) {
- nNacked++;
- } else {
- call->nSoftAcked++;
- }
- } else /* RX_ACK_TYPE_NACK */ {
- tp->flags &= ~RX_PKTFLAG_ACKED;
- missing = 1;
- }
- } else {
- if (tp->flags & RX_PKTFLAG_ACKED) {
- tp->flags &= ~RX_PKTFLAG_ACKED;
- missing = 1;
+ if (missing) {
+ nNacked++;
+ } else {
+ call->nSoftAcked++;
}
+ } else /* RX_ACK_TYPE_NACK */ {
+ tp->flags &= ~RX_PKTFLAG_ACKED;
+ missing = 1;
}
- /*
- * Following the suggestion of Phil Kern, we back off the peer's
- * timeout value for future packets until a successful response
- * is received for an initial transmission.
- */
- if (missing && !peer->backedOff) {
- struct clock c = peer->timeout;
- struct clock max_to = {3, 0};
-
- clock_Add(&peer->timeout, &c);
- if (clock_Gt(&peer->timeout, &max_to))
- peer->timeout = max_to;
- peer->backedOff = 1;
- }
-
- /* If packet isn't yet acked, and it has been transmitted at least
- * once, reset retransmit time using latest timeout
- * ie, this should readjust the retransmit timer for all outstanding
- * packets... So we don't just retransmit when we should know better*/
-
- if (!(tp->flags & RX_PKTFLAG_ACKED) && !clock_IsZero(&tp->retryTime)) {
- tp->retryTime = tp->timeSent;
- clock_Add(&tp->retryTime, &peer->timeout);
- /* shift by eight because one quarter-sec ~ 256 milliseconds */
- clock_Addmsec(&(tp->retryTime), ((afs_uint32) tp->backoff) << 8);
- }
+ tp = queue_Next(tp, rx_packet);
}
+ /* We don't need to take any action with the 3rd or 4th section in the
+ * queue - they're not addressed by the contents of this ACK packet.
+ */
+
/* If the window has been extended by this acknowledge packet,
* then wakeup a sender waiting in alloc for window space, or try
* sending packets now, if he's been sitting on packets due to
maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
maxDgramPackets =
MIN(maxDgramPackets, (int)(peer->ifDgramPackets));
- maxDgramPackets = MIN(maxDgramPackets, tSize);
if (maxDgramPackets > 1) {
peer->maxDgramPackets = maxDgramPackets;
call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
call->nNacks = 0;
}
+ /* If the packet contained new acknowledgements, rather than just
+ * being a duplicate of one we have previously seen, then we can restart
+ * the RTT timer
+ */
+ if (newAckCount > 0)
+ rxi_rto_packet_acked(call, istack);
+
if (call->flags & RX_CALL_FAST_RECOVER) {
- if (nNacked) {
+ if (newAckCount == 0) {
call->cwind = MIN((int)(call->cwind + 1), rx_maxSendWindow);
} else {
call->flags &= ~RX_CALL_FAST_RECOVER;
call->nCwindAcks = 0;
} else if (nNacked && call->nNacks >= (u_short) rx_nackThreshold) {
/* Three negative acks in a row trigger congestion recovery */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- MUTEX_EXIT(&peer->peer_lock);
- if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
- /* someone else is waiting to start recovery */
- return np;
- }
- call->flags |= RX_CALL_FAST_RECOVER_WAIT;
- rxi_WaitforTQBusy(call);
- MUTEX_ENTER(&peer->peer_lock);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
call->flags |= RX_CALL_FAST_RECOVER;
call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
call->cwind =
peer->nDgramPackets = call->nDgramPackets;
peer->congestSeq++;
call->congestSeq = peer->congestSeq;
+
/* Reset the resend times on the packets that were nacked
- * so we will retransmit as soon as the window permits*/
+ * so we will retransmit as soon as the window permits
+ */
+
for (acked = 0, queue_ScanBackwards(&call->tq, tp, nxp, rx_packet)) {
if (acked) {
if (!(tp->flags & RX_PKTFLAG_ACKED)) {
- clock_Zero(&tp->retryTime);
+ tp->flags &= ~RX_PKTFLAG_SENT;
}
} else if (tp->flags & RX_PKTFLAG_ACKED) {
acked = 1;
&& call->tfirst + call->nSoftAcked >= call->tnext) {
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
- rxevent_Cancel(call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
+ rxevent_Cancel(&call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
} else if (!queue_IsEmpty(&call->tq)) {
- rxi_Start(0, call, 0, istack);
+ rxi_Start(call, istack);
}
return np;
}
/* Received a response to a challenge packet */
-struct rx_packet *
+static struct rx_packet *
rxi_ReceiveResponsePacket(struct rx_connection *conn,
struct rx_packet *np, int istack)
{
* back to the server. The server is responsible for retrying the
* challenge if it fails to get a response. */
-struct rx_packet *
+static struct rx_packet *
rxi_ReceiveChallengePacket(struct rx_connection *conn,
struct rx_packet *np, int istack)
{
/* Find an available server process to service the current request in
* the given call structure. If one isn't available, queue up this
* call so it eventually gets one */
-void
+static void
rxi_AttachServerProc(struct rx_call *call,
osi_socket socket, int *tnop,
struct rx_call **newcallp)
queue_Append(&rx_incomingCallQueue, call);
}
} else {
- sq = queue_First(&rx_idleServerQueue, rx_serverQueueEntry);
+ sq = queue_Last(&rx_idleServerQueue, rx_serverQueueEntry);
/* If hot threads are enabled, and both newcallp and sq->socketp
* are non-null, then this thread will process the call, and the
*tnop = sq->tno;
*sq->socketp = socket;
clock_GetTime(&call->startTime);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
- MUTEX_EXIT(&rx_refcnt_mutex);
} else {
sq->newcall = call;
}
if (call->flags & RX_CALL_WAIT_PROC) {
/* Conservative: I don't think this should happen */
call->flags &= ~RX_CALL_WAIT_PROC;
+ rx_atomic_dec(&rx_nWaiting);
if (queue_IsOnQueue(call)) {
queue_Remove(call);
-
- rx_atomic_dec(&rx_nWaiting);
}
}
call->state = RX_STATE_ACTIVE;
* a new call is being prepared (in the case of a client) or a reply
* is being prepared (in the case of a server). Rather than sending
* an ack packet, an ACKALL packet is sent. */
-void
-rxi_AckAll(struct rxevent *event, struct rx_call *call, char *dummy)
+static void
+rxi_AckAll(struct rx_call *call)
{
-#ifdef RX_ENABLE_LOCKS
- if (event) {
- MUTEX_ENTER(&call->lock);
- call->delayedAckEvent = NULL;
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL);
- MUTEX_EXIT(&rx_refcnt_mutex);
- }
- rxi_SendSpecial(call, call->conn, (struct rx_packet *)0,
- RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
- if (event)
- MUTEX_EXIT(&call->lock);
-#else /* RX_ENABLE_LOCKS */
- if (event)
- call->delayedAckEvent = NULL;
- rxi_SendSpecial(call, call->conn, (struct rx_packet *)0,
- RX_PACKET_TYPE_ACKALL, NULL, 0, 0);
-#endif /* RX_ENABLE_LOCKS */
+ rxi_SendSpecial(call, call->conn, NULL, RX_PACKET_TYPE_ACKALL, /* no packet supplied by the caller */
+ NULL, 0, 0);
+ call->flags |= RX_CALL_ACKALL_SENT; /* rxi_SendAck tests this flag when it fills in ap->firstPacket */
}
-void
-rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused)
+static void
+rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused1,
+ int unused2)
{
struct rx_call *call = arg1;
#ifdef RX_ENABLE_LOCKS
if (event) {
MUTEX_ENTER(&call->lock);
- if (event == call->delayedAckEvent)
+ if (event == call->delayedAckEvent) { /* only clear the slot if it still holds the event that fired */
+ rxevent_Put(call->delayedAckEvent); /* presumably drops the call's reference on the event -- confirm against rxevent_Put */
call->delayedAckEvent = NULL;
- MUTEX_ENTER(&rx_refcnt_mutex);
+ }
CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
- MUTEX_EXIT(&rx_refcnt_mutex);
}
(void)rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
if (event)
MUTEX_EXIT(&call->lock);
#else /* RX_ENABLE_LOCKS */
- if (event)
+ if (event) { /* NOTE(review): unlike the locked build above, this does not compare event with call->delayedAckEvent */
+ rxevent_Put(call->delayedAckEvent);
call->delayedAckEvent = NULL;
+ }
(void)rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
#endif /* RX_ENABLE_LOCKS */
}
call->flags |= RX_CALL_TQ_SOME_ACKED;
}
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
+
call->tfirst = call->tnext;
call->nSoftAcked = 0;
/* Clear out the transmit queue for the current call (all packets have
* been received by peer) */
-void
+static void
rxi_ClearTransmitQueue(struct rx_call *call, int force)
{
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
call->tqc -=
#endif /* RXDEBUG_PACKET */
rxi_FreePackets(0, &call->tq);
- if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
-#ifdef RX_ENABLE_LOCKS
- CV_BROADCAST(&call->cv_tq);
-#else /* RX_ENABLE_LOCKS */
- osi_rxWakeup(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WakeUpTransmitQueue(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
call->flags &= ~RX_CALL_TQ_CLEARME;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
+ rxi_rto_cancel(call);
call->tfirst = call->tnext; /* implicitly acknowledge all data already sent */
call->nSoftAcked = 0;
#endif
}
-void
+static void
rxi_ClearReceiveQueue(struct rx_call *call)
{
if (queue_IsNotEmpty(&call->rq)) {
}
/* Send an abort packet for the specified call */
-struct rx_packet *
+static struct rx_packet *
rxi_SendCallAbort(struct rx_call *call, struct rx_packet *packet,
int istack, int force)
{
- afs_int32 error;
+ afs_int32 error, cerror;
struct clock when, now;
if (!call->error)
return packet;
+ switch (call->error) {
+ case RX_CALL_IDLE:
+ case RX_CALL_BUSY:
+ cerror = RX_CALL_TIMEOUT;
+ break;
+ default:
+ cerror = call->error;
+ }
+
/* Clients should never delay abort messages */
if (rx_IsClientConn(call->conn))
force = 1;
- if (call->abortCode != call->error) {
- call->abortCode = call->error;
+ if (call->abortCode != cerror) {
+ call->abortCode = cerror;
call->abortCount = 0;
}
if (force || rxi_callAbortThreshhold == 0
|| call->abortCount < rxi_callAbortThreshhold) {
if (call->delayedAbortEvent) {
- rxevent_Cancel(call->delayedAbortEvent, call,
+ rxevent_Cancel(&call->delayedAbortEvent, call,
RX_CALL_REFCOUNT_ABORT);
}
- error = htonl(call->error);
+ error = htonl(cerror);
call->abortCount++;
packet =
rxi_SendSpecial(call, call->conn, packet, RX_PACKET_TYPE_ABORT,
clock_GetTime(&now);
when = now;
clock_Addmsec(&when, rxi_callAbortDelay);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_ABORT);
- MUTEX_EXIT(&rx_refcnt_mutex);
call->delayedAbortEvent =
- rxevent_PostNow(&when, &now, rxi_SendDelayedCallAbort, call, 0);
+ rxevent_Post(&when, &now, rxi_SendDelayedCallAbort, call, 0, 0);
}
return packet;
}
if (force || rxi_connAbortThreshhold == 0
|| conn->abortCount < rxi_connAbortThreshhold) {
- if (conn->delayedAbortEvent) {
- rxevent_Cancel(conn->delayedAbortEvent, (struct rx_call *)0, 0);
- }
+
+ rxevent_Cancel(&conn->delayedAbortEvent, NULL, 0);
error = htonl(conn->error);
conn->abortCount++;
MUTEX_EXIT(&conn->conn_data_lock);
when = now;
clock_Addmsec(&when, rxi_connAbortDelay);
conn->delayedAbortEvent =
- rxevent_PostNow(&when, &now, rxi_SendDelayedConnAbort, conn, 0);
+ rxevent_Post(&when, &now, rxi_SendDelayedConnAbort, conn, NULL, 0);
}
return packet;
}
dpf(("rxi_ConnectionError conn %"AFS_PTR_FMT" error %d\n", conn, error));
MUTEX_ENTER(&conn->conn_data_lock);
- if (conn->challengeEvent)
- rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
- if (conn->natKeepAliveEvent)
- rxevent_Cancel(conn->natKeepAliveEvent, (struct rx_call *)0, 0);
+ rxevent_Cancel(&conn->challengeEvent, NULL, 0);
+ rxevent_Cancel(&conn->natKeepAliveEvent, NULL, 0);
if (conn->checkReachEvent) {
- rxevent_Cancel(conn->checkReachEvent, (struct rx_call *)0, 0);
- conn->checkReachEvent = 0;
- conn->flags &= ~RX_CONN_ATTACHWAIT;
- MUTEX_ENTER(&rx_refcnt_mutex);
- conn->refCount--;
- MUTEX_EXIT(&rx_refcnt_mutex);
+ rxevent_Cancel(&conn->checkReachEvent, NULL, 0);
+ conn->flags &= ~(RX_CONN_ATTACHWAIT|RX_CONN_NAT_PING);
+ putConnection(conn);
}
MUTEX_EXIT(&conn->conn_data_lock);
for (i = 0; i < RX_MAXCALLS; i++) {
* nFree are not reset, since these fields are manipulated by
* unprotected macros, and may only be reset by non-interrupting code.
*/
-#ifdef ADAPT_WINDOW
-/* this code requires that call->conn be set properly as a pre-condition. */
-#endif /* ADAPT_WINDOW */
-void
+static void
rxi_ResetCall(struct rx_call *call, int newcall)
{
int flags;
call->arrivalProc = (void (*)())0;
}
+
+ rxevent_Cancel(&call->growMTUEvent, call, RX_CALL_REFCOUNT_MTU);
+
if (call->delayedAbortEvent) {
- rxevent_Cancel(call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
+ rxevent_Cancel(&call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
if (packet) {
rxi_SendCallAbort(call, packet, 0, 1);
call->ssthresh = rx_maxSendWindow;
call->nDgramPackets = peer->nDgramPackets;
call->congestSeq = peer->congestSeq;
+ call->rtt = peer->rtt;
+ call->rtt_dev = peer->rtt_dev;
+ clock_Zero(&call->rto);
+ clock_Addmsec(&call->rto,
+ MAX(((call->rtt >> 3) + call->rtt_dev), rx_minPeerTimeout) + 200);
MUTEX_EXIT(&peer->peer_lock);
flags = call->flags;
}
call->flags = 0;
+ if (!newcall && (flags & RX_CALL_PEER_BUSY)) {
+ /* The call channel is still busy; resetting the call doesn't change
+ * that. However, if 'newcall' is set, we are processing a call
+ * structure that has either been recycled from the free list, or has
+ * been newly allocated. So, RX_CALL_PEER_BUSY is not relevant if
+ * 'newcall' is set, since it describes a completely different call
+ * channel which we do not care about. */
+ call->flags |= RX_CALL_PEER_BUSY;
+ }
+
rxi_ClearReceiveQueue(call);
/* why init the queue if you just emptied it? queue_Init(&call->rq); */
osi_rxWakeup(&call->twind);
#endif
+ if (flags & RX_CALL_WAIT_PROC) {
+ rx_atomic_dec(&rx_nWaiting);
+ }
#ifdef RX_ENABLE_LOCKS
/* The following ensures that we don't mess with any queue while some
* other thread might also be doing so. The call_queue_lock field is
MUTEX_ENTER(call->call_queue_lock);
if (queue_IsOnQueue(call)) {
queue_Remove(call);
- if (flags & RX_CALL_WAIT_PROC) {
- rx_atomic_dec(&rx_nWaiting);
- }
}
MUTEX_EXIT(call->call_queue_lock);
CLEAR_CALL_QUEUE_LOCK(call);
#else /* RX_ENABLE_LOCKS */
if (queue_IsOnQueue(call)) {
queue_Remove(call);
- if (flags & RX_CALL_WAIT_PROC)
- rx_atomic_dec(&rx_nWaiting);
}
#endif /* RX_ENABLE_LOCKS */
rxi_KeepAliveOff(call);
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
}
/* Send an acknowledge for the indicated packet (seq,serial) of the
struct rx_packet *rqp;
struct rx_packet *nxp; /* For queue_Scan */
struct rx_packet *p;
- u_char offset;
+ u_char offset = 0;
afs_int32 templ;
afs_uint32 padbytes = 0;
#ifdef RX_ENABLE_TSFPQ
ap->serial = htonl(serial);
ap->maxSkew = 0; /* used to be peer->inPacketSkew */
- ap->firstPacket = htonl(call->rnext); /* First packet not yet forwarded to reader */
- ap->previousPacket = htonl(call->rprev); /* Previous packet received */
+ /*
+ * First packet not yet forwarded to reader. When ACKALL has been
+ * sent the peer has been told that all received packets will be
+ * delivered to the reader. The value 'rnext' is used internally
+ * to refer to the next packet in the receive queue that must be
+ * delivered to the reader. From the peer's perspective those packets
+ * have already been delivered, so report the last sequence number plus
+ * one if there are packets in the receive queue awaiting processing.
+ */
+ if ((call->flags & RX_CALL_ACKALL_SENT) &&
+ !queue_IsEmpty(&call->rq)) {
+ ap->firstPacket = htonl(queue_Last(&call->rq, rx_packet)->header.seq + 1);
+ } else {
+ ap->firstPacket = htonl(call->rnext);
+
+ ap->previousPacket = htonl(call->rprev); /* Previous packet received */
- /* No fear of running out of ack packet here because there can only be at most
- * one window full of unacknowledged packets. The window size must be constrained
- * to be less than the maximum ack size, of course. Also, an ack should always
- * fit into a single packet -- it should not ever be fragmented. */
- for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
- if (!rqp || !call->rq.next
- || (rqp->header.seq > (call->rnext + call->rwind))) {
+ /* No fear of running out of ack packet here because there can only be at most
+ * one window full of unacknowledged packets. The window size must be constrained
+ * to be less than the maximum ack size, of course. Also, an ack should always
+ * fit into a single packet -- it should not ever be fragmented. */
+ for (offset = 0, queue_Scan(&call->rq, rqp, nxp, rx_packet)) {
+ if (!rqp || !call->rq.next
+ || (rqp->header.seq > (call->rnext + call->rwind))) {
#ifndef RX_ENABLE_TSFPQ
- if (!optionalPacket)
- rxi_FreePacket(p);
+ if (!optionalPacket)
+ rxi_FreePacket(p);
#endif
- rxi_CallError(call, RX_CALL_DEAD);
- return optionalPacket;
- }
+ rxi_CallError(call, RX_CALL_DEAD);
+ return optionalPacket;
+ }
- while (rqp->header.seq > call->rnext + offset)
- ap->acks[offset++] = RX_ACK_TYPE_NACK;
- ap->acks[offset++] = RX_ACK_TYPE_ACK;
+ while (rqp->header.seq > call->rnext + offset)
+ ap->acks[offset++] = RX_ACK_TYPE_NACK;
+ ap->acks[offset++] = RX_ACK_TYPE_ACK;
- if ((offset > (u_char) rx_maxReceiveWindow) || (offset > call->rwind)) {
+ if ((offset > (u_char) rx_maxReceiveWindow) || (offset > call->rwind)) {
#ifndef RX_ENABLE_TSFPQ
- if (!optionalPacket)
- rxi_FreePacket(p);
+ if (!optionalPacket)
+ rxi_FreePacket(p);
#endif
- rxi_CallError(call, RX_CALL_DEAD);
- return optionalPacket;
+ rxi_CallError(call, RX_CALL_DEAD);
+ return optionalPacket;
+ }
}
}
p->header.flags = RX_SLOW_START_OK;
if (reason == RX_ACK_PING) {
p->header.flags |= RX_REQUEST_ACK;
-#ifdef ADAPT_WINDOW
- clock_GetTime(&call->pingRequestTime);
-#endif
if (padbytes) {
p->length = padbytes +
rx_AckDataSize(call->rwind) + 4 * sizeof(afs_int32);
return optionalPacket; /* Return packet for re-use by caller */
}
+struct xmitlist { /* a run of packets handed to rxi_SendList as one batch */
+ struct rx_packet **list; /* start of the run within the caller's packet array */
+ int len; /* number of packets in the run */
+ int resending; /* set when the run includes a packet whose header.serial is non-zero */
+};
+
/* Send all of the packets in the list in single datagram */
static void
-rxi_SendList(struct rx_call *call, struct rx_packet **list, int len,
- int istack, int moreFlag, struct clock *now,
- struct clock *retryTime, int resending)
+rxi_SendList(struct rx_call *call, struct xmitlist *xmit,
+ int istack, int moreFlag)
{
int i;
int requestAck = 0;
int lastPacket = 0;
+ struct clock now; /* one timestamp, taken below, stamped on every packet in the batch */
struct rx_connection *conn = call->conn;
struct rx_peer *peer = conn->peer;
MUTEX_ENTER(&peer->peer_lock);
- peer->nSent += len;
- if (resending)
- peer->reSends += len;
+ peer->nSent += xmit->len;
+ if (xmit->resending)
+ peer->reSends += xmit->len;
MUTEX_EXIT(&peer->peer_lock);
if (rx_stats_active) {
- if (resending)
- rx_atomic_add(&rx_stats.dataPacketsReSent, len);
+ if (xmit->resending)
+ rx_atomic_add(&rx_stats.dataPacketsReSent, xmit->len);
else
- rx_atomic_add(&rx_stats.dataPacketsSent, len);
+ rx_atomic_add(&rx_stats.dataPacketsSent, xmit->len);
}
- if (list[len - 1]->header.flags & RX_LAST_PACKET) {
+ clock_GetTime(&now);
+
+ if (xmit->list[xmit->len - 1]->header.flags & RX_LAST_PACKET) {
lastPacket = 1;
}
/* Set the packet flags and schedule the resend events */
/* Only request an ack for the last packet in the list */
- for (i = 0; i < len; i++) {
- list[i]->retryTime = *retryTime;
- if (list[i]->header.serial) {
- /* Exponentially backoff retry times */
- if (list[i]->backoff < MAXBACKOFF) {
- /* so it can't stay == 0 */
- list[i]->backoff = (list[i]->backoff << 1) + 1;
- } else
- list[i]->backoff++;
- clock_Addmsec(&(list[i]->retryTime),
- ((afs_uint32) list[i]->backoff) << 8);
- }
-
- /* Wait a little extra for the ack on the last packet */
- if (lastPacket && !(list[i]->header.flags & RX_CLIENT_INITIATED)) {
- clock_Addmsec(&(list[i]->retryTime), 400);
- }
+ for (i = 0; i < xmit->len; i++) {
+ struct rx_packet *packet = xmit->list[i];
/* Record the time sent */
- list[i]->timeSent = *now;
+ packet->timeSent = now;
+ packet->flags |= RX_PKTFLAG_SENT; /* rxi_Resend clears this again on packets it treats as lost */
/* Ask for an ack on retransmitted packets, on every other packet
* if the peer doesn't support slow start. Ask for an ack on every
* packet until the congestion window reaches the ack rate. */
- if (list[i]->header.serial) {
+ if (packet->header.serial) {
requestAck = 1;
} else {
- /* improved RTO calculation- not Karn */
- list[i]->firstSent = *now;
+ packet->firstSent = now; /* serial still 0: this is the packet's first transmission */
if (!lastPacket && (call->cwind <= (u_short) (conn->ackRate + 1)
|| (!(call->flags & RX_CALL_SLOW_START_OK)
- && (list[i]->header.seq & 1)))) {
+ && (packet->header.seq & 1)))) {
requestAck = 1;
}
}
/* Tag this packet as not being the last in this group,
* for the receiver's benefit */
- if (i < len - 1 || moreFlag) {
- list[i]->header.flags |= RX_MORE_PACKETS;
+ if (i < xmit->len - 1 || moreFlag) {
+ packet->header.flags |= RX_MORE_PACKETS;
}
-
- /* Install the new retransmit time for the packet, and
- * record the time sent */
- list[i]->timeSent = *now;
}
if (requestAck) {
- list[len - 1]->header.flags |= RX_REQUEST_ACK;
+ xmit->list[xmit->len - 1]->header.flags |= RX_REQUEST_ACK;
}
/* Since we're about to send a data packet to the peer, it's
* safe to nuke any scheduled end-of-packets ack */
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
- if (len > 1) {
- rxi_SendPacketList(call, conn, list, len, istack);
+ if (xmit->len > 1) {
+ rxi_SendPacketList(call, conn, xmit->list, xmit->len, istack);
} else {
- rxi_SendPacket(call, conn, list[0], istack);
+ rxi_SendPacket(call, conn, xmit->list[0], istack);
}
MUTEX_ENTER(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
+
+ /* Tell the RTO calculation engine that we have sent a packet, and
+ * if it was the last one */
+ rxi_rto_packet_sent(call, lastPacket, istack);
/* Update last send time for this call (for keep-alive
* processing), and for the connection (so that we can discover
* idle connections) */
conn->lastSendTime = call->lastSendTime = clock_Sec();
/* Let a set of retransmits trigger an idle timeout */
- if (!resending)
+ if (!xmit->resending)
call->lastSendData = call->lastSendTime;
}
* We always keep the last list we should have sent so we
* can set the RX_MORE_PACKETS flags correctly.
*/
+
static void
rxi_SendXmitList(struct rx_call *call, struct rx_packet **list, int len,
- int istack, struct clock *now, struct clock *retryTime,
- int resending)
+ int istack)
{
- int i, cnt, lastCnt = 0;
- struct rx_packet **listP, **lastP = 0;
+ int i;
+ int recovery;
+ struct xmitlist working;
+ struct xmitlist last;
+
struct rx_peer *peer = call->conn->peer;
int morePackets = 0;
- for (cnt = 0, listP = &list[0], i = 0; i < len; i++) {
+ memset(&last, 0, sizeof(struct xmitlist));
+ working.list = &list[0];
+ working.len = 0;
+ working.resending = 0;
+
+ recovery = call->flags & RX_CALL_FAST_RECOVER;
+
+ for (i = 0; i < len; i++) {
/* Does the current packet force us to flush the current list? */
- if (cnt > 0
+ if (working.len > 0
&& (list[i]->header.serial || (list[i]->flags & RX_PKTFLAG_ACKED)
|| list[i]->length > RX_JUMBOBUFFERSIZE)) {
- if (lastCnt > 0) {
- rxi_SendList(call, lastP, lastCnt, istack, 1, now, retryTime,
- resending);
+
+ /* This sends the 'last' list and then rolls the current working
+ * set into the 'last' one, and resets the working set */
+
+ if (last.len > 0) {
+ rxi_SendList(call, &last, istack, 1);
/* If the call enters an error state stop sending, or if
* we entered congestion recovery mode, stop sending */
- if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
+ if (call->error
+ || (!recovery && (call->flags & RX_CALL_FAST_RECOVER)))
return;
}
- lastP = listP;
- lastCnt = cnt;
- listP = &list[i];
- cnt = 0;
+ last = working;
+ working.len = 0;
+ working.resending = 0;
+ working.list = &list[i];
}
/* Add the current packet to the list if it hasn't been acked.
* Otherwise adjust the list pointer to skip the current packet. */
if (!(list[i]->flags & RX_PKTFLAG_ACKED)) {
- cnt++;
+ working.len++;
+
+ if (list[i]->header.serial)
+ working.resending = 1;
+
/* Do we need to flush the list? */
- if (cnt >= (int)peer->maxDgramPackets
- || cnt >= (int)call->nDgramPackets || cnt >= (int)call->cwind
+ if (working.len >= (int)peer->maxDgramPackets
+ || working.len >= (int)call->nDgramPackets
+ || working.len >= (int)call->cwind
|| list[i]->header.serial
|| list[i]->length != RX_JUMBOBUFFERSIZE) {
- if (lastCnt > 0) {
- rxi_SendList(call, lastP, lastCnt, istack, 1, now,
- retryTime, resending);
+ if (last.len > 0) {
+ rxi_SendList(call, &last, istack, 1);
/* If the call enters an error state stop sending, or if
* we entered congestion recovery mode, stop sending */
if (call->error
- || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
+ || (!recovery && (call->flags & RX_CALL_FAST_RECOVER)))
return;
}
- lastP = listP;
- lastCnt = cnt;
- listP = &list[i + 1];
- cnt = 0;
+ last = working;
+ working.len = 0;
+ working.resending = 0;
+ working.list = &list[i + 1];
}
} else {
- if (cnt != 0) {
+ if (working.len != 0) {
osi_Panic("rxi_SendList error");
}
- listP = &list[i + 1];
+ working.list = &list[i + 1];
}
}
* an acked packet. Since we always send retransmissions
* in a separate packet, we only need to check the first
* packet in the list */
- if (cnt > 0 && !(listP[0]->flags & RX_PKTFLAG_ACKED)) {
+ if (working.len > 0 && !(working.list[0]->flags & RX_PKTFLAG_ACKED)) {
morePackets = 1;
}
- if (lastCnt > 0) {
- rxi_SendList(call, lastP, lastCnt, istack, morePackets, now,
- retryTime, resending);
+ if (last.len > 0) {
+ rxi_SendList(call, &last, istack, morePackets);
/* If the call enters an error state stop sending, or if
* we entered congestion recovery mode, stop sending */
- if (call->error || (call->flags & RX_CALL_FAST_RECOVER_WAIT))
+ if (call->error
+ || (!recovery && (call->flags & RX_CALL_FAST_RECOVER)))
return;
}
if (morePackets) {
- rxi_SendList(call, listP, cnt, istack, 0, now, retryTime,
- resending);
+ rxi_SendList(call, &working, istack, 0);
}
- } else if (lastCnt > 0) {
- rxi_SendList(call, lastP, lastCnt, istack, 0, now, retryTime,
- resending);
+ } else if (last.len > 0) {
+ rxi_SendList(call, &last, istack, 0);
+ /* Packets which are in 'working' are not sent by this call */
}
}
-#ifdef RX_ENABLE_LOCKS
-/* Call rxi_Start, below, but with the call lock held. */
-void
-rxi_StartUnlocked(struct rxevent *event,
- void *arg0, void *arg1, int istack)
+/**
+ * Decide whether a call's peer should be treated as dead
+ *
+ * When AFS_RXERRQ_ENV is defined, the call's recorded error generation
+ * (call->neterr_gen) is compared with the peer's network error counter.
+ * A counter that has moved past the call's generation means errors have
+ * been seen since the call recorded it; an active call is then failed
+ * with RX_CALL_DEAD. A counter that has dropped below the call's
+ * generation is taken as a reset, and the call's generation is lowered
+ * to match so later errors are still noticed. Calls in RX_STATE_DALLY
+ * are never examined. Without AFS_RXERRQ_ENV this always reports 0.
+ *
+ * @param[in] call The call to examine
+ *
+ * @return status
+ * @retval 0 Nothing wrong was found; the call was not failed
+ * @retval nonzero The peer looks dead; the call was failed with
+ *                 RX_CALL_DEAD if it was in RX_STATE_ACTIVE
+ *
+ * @pre call->lock must be locked
+ */
+static int
+rxi_CheckPeerDead(struct rx_call *call)
+{
+    int verdict = 0;
+
+#ifdef AFS_RXERRQ_ENV
+    if (call->state != RX_STATE_DALLY) {
+        int seen = rx_atomic_read(&call->conn->peer->neterrs);
+
+        if (call->neterr_gen < seen) {
+            /* errors have arrived since this call's generation was
+             * recorded; only an active call is actually failed */
+            if (call->state == RX_STATE_ACTIVE) {
+                rxi_CallError(call, RX_CALL_DEAD);
+            }
+            verdict = -1;
+        } else if (call->neterr_gen > seen) {
+            /* the peer's counter was reset underneath us; resync so
+             * that any further errors are detected */
+            call->neterr_gen = seen;
+        }
+    }
+#endif
+    return verdict;
+}
+
+/**
+ * Retransmission timeout handler for a call
+ *
+ * Takes call->lock, then: releases the call's resend event if it is the
+ * one that fired, runs the dead-peer and busy-channel checks, and, if the
+ * transmit queue is not empty, puts the call into loss recovery. Recovery
+ * clears RX_PKTFLAG_SENT on every un-acked packet, doubles call->rto (capped
+ * at 60 seconds), resets the congestion state of the call and peer to a
+ * window of one packet, and calls rxi_Start to begin sending again.
+ *
+ * @param[in] event  The event being delivered; compared with
+ *                   call->resendEvent before that event is released
+ * @param[in] arg0   The call (struct rx_call *) the timer belongs to
+ * @param[in] arg1   Not used
+ * @param[in] istack Passed through unchanged to rxi_Start
+ *
+ * @note call->lock is acquired and released inside this function.
+ */
+static void
+rxi_Resend(struct rxevent *event, void *arg0, void *arg1, int istack)
{
struct rx_call *call = arg0;
+ struct rx_peer *peer;
+ struct rx_packet *p, *nxp;
+ struct clock maxTimeout = { 60, 0 };
MUTEX_ENTER(&call->lock);
- rxi_Start(event, call, arg1, istack);
+
+ peer = call->conn->peer;
+
+ /* Make sure that the event pointer is removed from the call
+ * structure, since there is no longer a per-call retransmission
+ * event pending. */
+ if (event == call->resendEvent) {
+ CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
+ rxevent_Put(call->resendEvent);
+ call->resendEvent = NULL;
+ }
+
+ rxi_CheckPeerDead(call);
+
+ if (rxi_busyChannelError && (call->flags & RX_CALL_PEER_BUSY)) {
+ rxi_CheckBusy(call);
+ }
+
+ if (queue_IsEmpty(&call->tq)) {
+ /* Nothing to do. This means that we've been raced, and that an
+ * ACK has come in between when we were triggered, and when we
+ * actually got to run. */
+ goto out;
+ }
+
+ /* We're in loss recovery */
+ call->flags |= RX_CALL_FAST_RECOVER;
+
+ /* Mark all of the pending packets in the queue as being lost */
+ for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
+ if (!(p->flags & RX_PKTFLAG_ACKED))
+ p->flags &= ~RX_PKTFLAG_SENT;
+ }
+
+ /* We're resending, so we double the timeout of the call. This will be
+ * dropped back down by the first successful ACK that we receive.
+ *
+ * We apply a maximum value here of 60 seconds
+ */
+ clock_Add(&call->rto, &call->rto);
+ if (clock_Gt(&call->rto, &maxTimeout))
+ call->rto = maxTimeout;
+
+ /* Packet loss is most likely due to congestion, so drop our window size
+ * and start again from the beginning.
+ *
+ * The two MTU choices are alternatives: a peer that accepts jumbograms
+ * gets the jumbo buffer size, any other peer gets the smaller of its
+ * natural and maximum MTU. Assigning both in sequence would discard
+ * the first value and leave the non-jumbo case unset. */
+ if (peer->maxDgramPackets >1) {
+ call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
+ } else {
+ call->MTU = MIN(peer->natMTU, peer->maxMTU);
+ }
+ call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
+ call->nDgramPackets = 1;
+ call->cwind = 1;
+ call->nextCwind = 1;
+ call->nAcks = 0;
+ call->nNacks = 0;
+ MUTEX_ENTER(&peer->peer_lock);
+ peer->MTU = call->MTU;
+ peer->cwind = call->cwind;
+ peer->nDgramPackets = 1;
+ peer->congestSeq++;
+ call->congestSeq = peer->congestSeq;
+ MUTEX_EXIT(&peer->peer_lock);
+
+ rxi_Start(call, istack);
+
+out:
MUTEX_EXIT(&call->lock);
}
-#endif /* RX_ENABLE_LOCKS */
/* This routine is called when new packets are readied for
* transmission and when retransmission may be necessary, or when the
* better optimized for new packets, the usual case, now that we've
* got rid of queues of send packets. XXXXXXXXXXX */
void
-rxi_Start(struct rxevent *event,
- void *arg0, void *arg1, int istack)
+rxi_Start(struct rx_call *call, int istack)
{
- struct rx_call *call = arg0;
struct rx_packet *p;
struct rx_packet *nxp; /* Next pointer for queue_Scan */
- struct rx_peer *peer = call->conn->peer;
- struct clock now, usenow, retryTime;
- int haveEvent;
int nXmitPackets;
int maxXmitPackets;
- int resending = 0;
-
- /* If rxi_Start is being called as a result of a resend event,
- * then make sure that the event pointer is removed from the call
- * structure, since there is no longer a per-call retransmission
- * event pending. */
- if (event && event == call->resendEvent) {
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
- call->resendEvent = NULL;
- resending = 1;
- if (queue_IsEmpty(&call->tq)) {
- /* Nothing to do */
- return;
- }
- /* Timeouts trigger congestion recovery */
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
- /* someone else is waiting to start recovery */
- return;
- }
- call->flags |= RX_CALL_FAST_RECOVER_WAIT;
- rxi_WaitforTQBusy(call);
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (call->error) {
- if (rx_stats_active)
- rx_atomic_inc(&rx_tq_debug.rxi_start_in_error);
- return;
- }
-#endif
- call->flags |= RX_CALL_FAST_RECOVER;
- if (peer->maxDgramPackets > 1) {
- call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
- } else {
- call->MTU = MIN(peer->natMTU, peer->maxMTU);
- }
- call->ssthresh = MAX(4, MIN((int)call->cwind, (int)call->twind)) >> 1;
- call->nDgramPackets = 1;
- call->cwind = 1;
- call->nextCwind = 1;
- call->nAcks = 0;
- call->nNacks = 0;
- MUTEX_ENTER(&peer->peer_lock);
- peer->MTU = call->MTU;
- peer->cwind = call->cwind;
- peer->nDgramPackets = 1;
- peer->congestSeq++;
- call->congestSeq = peer->congestSeq;
- MUTEX_EXIT(&peer->peer_lock);
- /* Clear retry times on packets. Otherwise, it's possible for
- * some packets in the queue to force resends at rates faster
- * than recovery rates.
- */
- for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
- if (!(p->flags & RX_PKTFLAG_ACKED)) {
- clock_Zero(&p->retryTime);
- }
- }
- }
if (call->error) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
if (rx_stats_active)
}
if (queue_IsNotEmpty(&call->tq)) { /* If we have anything to send */
- /* Get clock to compute the re-transmit time for any packets
- * in this burst. Note, if we back off, it's reasonable to
- * back off all of the packets in the same manner, even if
- * some of them have been retransmitted more times than more
- * recent additions.
- * Do a dance to avoid blocking after setting now. */
- MUTEX_ENTER(&peer->peer_lock);
- retryTime = peer->timeout;
- MUTEX_EXIT(&peer->peer_lock);
- clock_GetTime(&now);
- clock_Add(&retryTime, &now);
- usenow = now;
/* Send (or resend) any packets that need it, subject to
* window restrictions and congestion burst control
* restrictions. Ask for an ack on the last packet sent in
nXmitPackets = 0;
maxXmitPackets = MIN(call->twind, call->cwind);
for (queue_Scan(&call->tq, p, nxp, rx_packet)) {
- if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
- /* We shouldn't be sending packets if a thread is waiting
- * to initiate congestion recovery */
- dpf(("call %d waiting to initiate fast recovery\n",
- *(call->callNumber)));
- break;
- }
- if ((nXmitPackets)
- && (call->flags & RX_CALL_FAST_RECOVER)) {
- /* Only send one packet during fast recovery */
- dpf(("call %d restricted to one packet per send during fast recovery\n",
- *(call->callNumber)));
- break;
- }
#ifdef RX_TRACK_PACKETS
if ((p->flags & RX_PKTFLAG_FREE)
|| (!queue_IsEnd(&call->tq, nxp)
#endif
if (p->flags & RX_PKTFLAG_ACKED) {
/* Since we may block, don't trust this */
- usenow.sec = usenow.usec = 0;
if (rx_stats_active)
rx_atomic_inc(&rx_stats.ignoreAckedPacket);
continue; /* Ignore this packet if it has been acknowledged */
}
/* Transmit the packet if it needs to be sent. */
- if (!clock_Lt(&now, &p->retryTime)) {
+ if (!(p->flags & RX_PKTFLAG_SENT)) {
if (nXmitPackets == maxXmitPackets) {
rxi_SendXmitList(call, call->xmitList,
- nXmitPackets, istack, &now,
- &retryTime, resending);
+ nXmitPackets, istack);
goto restart;
}
- dpf(("call %d xmit packet %"AFS_PTR_FMT" now %u.%06u retryTime %u.%06u nextRetry %u.%06u\n",
- *(call->callNumber), p,
- now.sec, now.usec,
- p->retryTime.sec, p->retryTime.usec,
- retryTime.sec, retryTime.usec));
+ dpf(("call %d xmit packet %"AFS_PTR_FMT"\n",
+ *(call->callNumber), p));
call->xmitList[nXmitPackets++] = p;
}
}
* ready to send. Now we loop to send the packets */
if (nXmitPackets > 0) {
rxi_SendXmitList(call, call->xmitList, nXmitPackets,
- istack, &now, &retryTime, resending);
+ istack);
}
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- /*
- * TQ references no longer protected by this flag; they must remain
- * protected by the global lock.
- */
- if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
- call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
- dpf(("call %"AFS_PTR_FMT" has %d waiters and flags %d\n",
- call, call->tqWaiters, call->flags));
-#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_Start start");
- CV_BROADCAST(&call->cv_tq);
-#else /* RX_ENABLE_LOCKS */
- osi_rxWakeup(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
- return;
- }
if (call->error) {
/* We went into the error state while sending packets. Now is
* the time to reset the call. This will also inform the using
if (rx_stats_active)
rx_atomic_inc(&rx_tq_debug.rxi_start_aborted);
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
- dpf(("call error %d while xmit %p has %d waiters and flags %d\n",
- call->error, call, call->tqWaiters, call->flags));
-#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_Start middle");
- CV_BROADCAST(&call->cv_tq);
-#else /* RX_ENABLE_LOCKS */
- osi_rxWakeup(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WakeUpTransmitQueue(call);
rxi_CallError(call, call->error);
return;
}
call->flags |= RX_CALL_TQ_CLEARME;
}
#endif /* RX_ENABLE_LOCKS */
- /* Don't bother doing retransmits if the TQ is cleared. */
- if (call->flags & RX_CALL_TQ_CLEARME) {
+ if (call->flags & RX_CALL_TQ_CLEARME)
rxi_ClearTransmitQueue(call, 1);
- } else
-#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- {
-
- /* Always post a resend event, if there is anything in the
- * queue, and resend is possible. There should be at least
- * one unacknowledged packet in the queue ... otherwise none
- * of these packets should be on the queue in the first place.
- */
- if (call->resendEvent) {
- /* Cancel the existing event and post a new one */
- rxevent_Cancel(call->resendEvent, call,
- RX_CALL_REFCOUNT_RESEND);
- }
-
- /* The retry time is the retry time on the first unacknowledged
- * packet inside the current window */
- for (haveEvent =
- 0, queue_Scan(&call->tq, p, nxp, rx_packet)) {
- /* Don't set timers for packets outside the window */
- if (p->header.seq >= call->tfirst + call->twind) {
- break;
- }
-
- if (!(p->flags & RX_PKTFLAG_ACKED)
- && !clock_IsZero(&p->retryTime)) {
- haveEvent = 1;
- retryTime = p->retryTime;
- break;
- }
- }
-
- /* Post a new event to re-run rxi_Start when retries may be needed */
- if (haveEvent && !(call->flags & RX_CALL_NEED_START)) {
-#ifdef RX_ENABLE_LOCKS
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_HOLD(call, RX_CALL_REFCOUNT_RESEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
- call->resendEvent =
- rxevent_PostNow2(&retryTime, &usenow,
- rxi_StartUnlocked,
- (void *)call, 0, istack);
-#else /* RX_ENABLE_LOCKS */
- call->resendEvent =
- rxevent_PostNow2(&retryTime, &usenow, rxi_Start,
- (void *)call, 0, istack);
-#endif /* RX_ENABLE_LOCKS */
- }
- }
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
} while (call->flags & RX_CALL_NEED_START);
/*
* TQ references no longer protected by this flag; they must remain
* protected by the global lock.
*/
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
- dpf(("call %"AFS_PTR_FMT" has %d waiters and flags %d\n",
- call, call->tqWaiters, call->flags));
-#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_Start end");
- CV_BROADCAST(&call->cv_tq);
-#else /* RX_ENABLE_LOCKS */
- osi_rxWakeup(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WakeUpTransmitQueue(call);
} else {
call->flags |= RX_CALL_NEED_START;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
} else {
- if (call->resendEvent) {
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
- }
+ rxi_rto_cancel(call);
}
}
/* Since we're about to send SOME sort of packet to the peer, it's
* safe to nuke any scheduled end-of-packets ack */
- rxevent_Cancel(call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
+ rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
/* Actually send the packet, filling in more connection-specific fields */
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
rxi_SendPacket(call, conn, p, istack);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_SEND);
- MUTEX_EXIT(&rx_refcnt_mutex);
MUTEX_ENTER(&call->lock);
/* Update last send time for this call (for keep-alive
* haveCTLock Set if calling from rxi_ReapConnections
*/
#ifdef RX_ENABLE_LOCKS
-int
+static int
rxi_CheckCall(struct rx_call *call, int haveCTLock)
#else /* RX_ENABLE_LOCKS */
-int
+static int
rxi_CheckCall(struct rx_call *call)
#endif /* RX_ENABLE_LOCKS */
{
afs_uint32 fudgeFactor;
int cerror = 0;
int newmtu = 0;
+ int idle_timeout = 0;
+ afs_int32 clock_diff = 0;
+
+ if (rxi_CheckPeerDead(call)) {
+ return -1;
+ }
+
+ now = clock_Sec();
+
+ /* Large swings in the clock can have a significant impact on
+ * the performance of RX call processing. Forward clock shifts
+ * will result in premature event triggering or timeouts.
+ * Backward shifts can result in calls not completing until
+ * the clock catches up with the original start clock value.
+ *
+ * If a backward clock shift of more than five minutes is noticed,
+ * just fail the call.
+ */
+ if (now < call->lastSendTime)
+ clock_diff = call->lastSendTime - now;
+ if (now < call->startWait)
+ clock_diff = MAX(clock_diff, call->startWait - now);
+ if (now < call->lastReceiveTime)
+ clock_diff = MAX(clock_diff, call->lastReceiveTime - now);
+ if (clock_diff > 5 * 60)
+ {
+ if (call->state == RX_STATE_ACTIVE)
+ rxi_CallError(call, RX_CALL_TIMEOUT);
+ return -1;
+ }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
if (call->flags & RX_CALL_TQ_BUSY) {
}
#endif
/* RTT + 8*MDEV, rounded up to the next second. */
- fudgeFactor = (((afs_uint32) conn->peer->rtt >> 3) +
- ((afs_uint32) conn->peer->rtt_dev << 1) + 1023) >> 10;
+ fudgeFactor = (((afs_uint32) call->rtt >> 3) +
+ ((afs_uint32) call->rtt_dev << 1) + 1023) >> 10;
deadTime = conn->secondsUntilDead + fudgeFactor;
- now = clock_Sec();
/* These are computed to the second (+- 1 second). But that's
* good enough for these values, which should be a significant
* number of seconds. */
if (now > (call->lastReceiveTime + deadTime)) {
if (call->state == RX_STATE_ACTIVE) {
-#ifdef ADAPT_PMTU
-#if defined(KERNEL) && defined(AFS_SUN57_ENV)
+#ifdef AFS_ADAPT_PMTU
+# if defined(KERNEL) && defined(AFS_SUN5_ENV)
ire_t *ire;
-#if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
- netstack_t *ns = netstack_find_by_stackid(GLOBAL_NETSTACKID);
+# if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
+ netstack_t *ns = netstack_find_by_stackid(GLOBAL_NETSTACKID);
ip_stack_t *ipst = ns->netstack_ip;
-#endif
+# endif
ire = ire_cache_lookup(conn->peer->host
-#if defined(AFS_SUN510_ENV) && defined(ALL_ZONES)
+# if defined(AFS_SUN510_ENV) && defined(ALL_ZONES)
, ALL_ZONES
-#if defined(AFS_SUN510_ENV) && (defined(ICL_3_ARG) || defined(GLOBAL_NETSTACKID))
+# if defined(ICL_3_ARG) || defined(GLOBAL_NETSTACKID)
, NULL
-#if defined(AFS_SUN510_ENV) && defined(GLOBAL_NETSTACKID)
+# if defined(GLOBAL_NETSTACKID)
, ipst
-#endif
-#endif
-#endif
+# endif
+# endif
+# endif
);
if (ire && ire->ire_max_frag > 0)
rxi_SetPeerMtu(NULL, conn->peer->host, 0,
ire->ire_max_frag);
-#if defined(GLOBAL_NETSTACKID)
+# if defined(GLOBAL_NETSTACKID)
netstack_rele(ns);
-#endif
-#endif
-#endif /* ADAPT_PMTU */
+# endif
+# endif
+#endif /* AFS_ADAPT_PMTU */
cerror = RX_CALL_DEAD;
goto mtuout;
} else {
#ifdef RX_ENABLE_LOCKS
/* Cancel pending events */
- rxevent_Cancel(call->delayedAckEvent, call,
+ rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
- rxevent_Cancel(call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
- rxevent_Cancel(call->keepAliveEvent, call,
+ rxi_rto_cancel(call);
+ rxevent_Cancel(&call->keepAliveEvent, call,
RX_CALL_REFCOUNT_ALIVE);
+ rxevent_Cancel(&call->growMTUEvent, call,
+ RX_CALL_REFCOUNT_MTU);
MUTEX_ENTER(&rx_refcnt_mutex);
- if (call->refCount == 0) {
- rxi_FreeCall(call, haveCTLock);
+ /* if rxi_FreeCall returns 1 it has freed the call */
+ if (call->refCount == 0 &&
+ rxi_FreeCall(call, haveCTLock))
+ {
MUTEX_EXIT(&rx_refcnt_mutex);
- return -2;
+ return -2;
}
MUTEX_EXIT(&rx_refcnt_mutex);
return -1;
* attached process can die reasonably gracefully. */
}
- if (conn->idleDeadTime) {
- idleDeadTime = conn->idleDeadTime + fudgeFactor;
- }
+ if (conn->idleDeadDetection) {
+ if (conn->idleDeadTime) {
+ idleDeadTime = conn->idleDeadTime + fudgeFactor;
+ }
- /* see if we have a non-activity timeout */
- if (call->startWait && idleDeadTime
- && ((call->startWait + idleDeadTime) < now) &&
- (call->flags & RX_CALL_READER_WAIT)) {
- if (call->state == RX_STATE_ACTIVE) {
- cerror = RX_CALL_TIMEOUT;
- goto mtuout;
- }
- }
- if (call->lastSendData && idleDeadTime && (conn->idleDeadErr != 0)
- && ((call->lastSendData + idleDeadTime) < now)) {
- if (call->state == RX_STATE_ACTIVE) {
- cerror = conn->idleDeadErr;
- goto mtuout;
- }
+ if (idleDeadTime) {
+ /* see if we have a non-activity timeout */
+ if (call->startWait && ((call->startWait + idleDeadTime) < now) &&
+ (call->flags & RX_CALL_READER_WAIT)) {
+ if (call->state == RX_STATE_ACTIVE) {
+ cerror = RX_CALL_TIMEOUT;
+ goto mtuout;
+ }
+ }
+
+ if (call->lastSendData && ((call->lastSendData + idleDeadTime) < now)) {
+ if (call->state == RX_STATE_ACTIVE) {
+ cerror = conn->service ? conn->service->idleDeadErr : RX_CALL_IDLE;
+ idle_timeout = 1;
+ goto mtuout;
+ }
+ }
+ }
}
- if (hardDeadTime) {
+ if (conn->hardDeadTime) {
hardDeadTime = conn->hardDeadTime + fudgeFactor;
}
}
return 0;
mtuout:
- if (conn->msgsizeRetryErr && cerror != RX_CALL_TIMEOUT
- && call->lastReceiveTime) {
+ if (conn->msgsizeRetryErr && cerror != RX_CALL_TIMEOUT && !idle_timeout &&
+ call->lastReceiveTime) {
int oldMTU = conn->peer->ifMTU;
/* if we thought we could send more, perhaps things got worse */
}
void
-rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1, void *dummy)
+rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1,
+ void *dummy, int dummy2)
{
struct rx_connection *conn = arg1;
struct rx_header theader;
- char tbuffer[1500];
+ char tbuffer[1 + sizeof(struct rx_header)];
struct sockaddr_in taddr;
char *tp;
char a[1] = { 0 };
MUTEX_ENTER(&rx_refcnt_mutex);
/* Only reschedule ourselves if the connection would not be destroyed */
if (conn->refCount <= 1) {
+ rxevent_Put(conn->natKeepAliveEvent);
conn->natKeepAliveEvent = NULL;
MUTEX_EXIT(&rx_refcnt_mutex);
MUTEX_EXIT(&conn->conn_data_lock);
} else {
conn->refCount--; /* drop the reference for this */
MUTEX_EXIT(&rx_refcnt_mutex);
+ rxevent_Put(conn->natKeepAliveEvent);
conn->natKeepAliveEvent = NULL;
rxi_ScheduleNatKeepAliveEvent(conn);
MUTEX_EXIT(&conn->conn_data_lock);
}
}
-void
+static void
rxi_ScheduleNatKeepAliveEvent(struct rx_connection *conn)
{
if (!conn->natKeepAliveEvent && conn->secondsUntilNatPing) {
conn->refCount++; /* hold a reference for this */
MUTEX_EXIT(&rx_refcnt_mutex);
conn->natKeepAliveEvent =
- rxevent_PostNow(&when, &now, rxi_NatKeepAliveEvent, conn, 0);
+ rxevent_Post(&when, &now, rxi_NatKeepAliveEvent, conn, NULL, 0);
}
}
{
MUTEX_ENTER(&conn->conn_data_lock);
conn->secondsUntilNatPing = seconds;
- if (seconds != 0)
- rxi_ScheduleNatKeepAliveEvent(conn);
- MUTEX_EXIT(&conn->conn_data_lock);
-}
-
-void
-rxi_NatKeepAliveOn(struct rx_connection *conn)
-{
- MUTEX_ENTER(&conn->conn_data_lock);
- rxi_ScheduleNatKeepAliveEvent(conn);
+ if (seconds != 0) {
+ if (!(conn->flags & RX_CONN_ATTACHWAIT))
+ rxi_ScheduleNatKeepAliveEvent(conn);
+ else
+ conn->flags |= RX_CONN_NAT_PING;
+ }
MUTEX_EXIT(&conn->conn_data_lock);
}
* keep-alive packet (if we're actually trying to keep the call alive)
*/
void
-rxi_KeepAliveEvent(struct rxevent *event, void *arg1, void *dummy)
+rxi_KeepAliveEvent(struct rxevent *event, void *arg1, void *dummy,
+ int dummy2)
{
struct rx_call *call = arg1;
struct rx_connection *conn;
afs_uint32 now;
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
- MUTEX_EXIT(&rx_refcnt_mutex);
MUTEX_ENTER(&call->lock);
- if (event == call->keepAliveEvent)
+
+ if (event == call->keepAliveEvent) {
+ rxevent_Put(call->keepAliveEvent);
call->keepAliveEvent = NULL;
+ }
+
now = clock_Sec();
#ifdef RX_ENABLE_LOCKS
/* Does what's on the nameplate. */
void
-rxi_GrowMTUEvent(struct rxevent *event, void *arg1, void *dummy)
+rxi_GrowMTUEvent(struct rxevent *event, void *arg1, void *dummy, int dummy2)
{
struct rx_call *call = arg1;
struct rx_connection *conn;
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
- MUTEX_EXIT(&rx_refcnt_mutex);
+ CALL_RELE(call, RX_CALL_REFCOUNT_MTU);
MUTEX_ENTER(&call->lock);
- if (event == call->growMTUEvent)
+ if (event == call->growMTUEvent) {
+ rxevent_Put(call->growMTUEvent);
call->growMTUEvent = NULL;
+ }
#ifdef RX_ENABLE_LOCKS
if (rxi_CheckCall(call, 0)) {
*/
if ((conn->peer->maxPacketSize != 0) &&
(conn->peer->natMTU < RX_MAX_PACKET_SIZE) &&
- (conn->idleDeadErr))
+ conn->idleDeadDetection)
(void)rxi_SendAck(call, NULL, 0, RX_ACK_MTU, 0);
rxi_ScheduleGrowMTUEvent(call, 0);
MUTEX_EXIT(&call->lock);
}
-void
+static void
rxi_ScheduleKeepAliveEvent(struct rx_call *call)
{
if (!call->keepAliveEvent) {
clock_GetTime(&now);
when = now;
when.sec += call->conn->secondsUntilPing;
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
- MUTEX_EXIT(&rx_refcnt_mutex);
call->keepAliveEvent =
- rxevent_PostNow(&when, &now, rxi_KeepAliveEvent, call, 0);
+ rxevent_Post(&when, &now, rxi_KeepAliveEvent, call, NULL, 0);
}
}
-void
+static void
rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs)
{
if (!call->growMTUEvent) {
}
when.sec += secs;
- MUTEX_ENTER(&rx_refcnt_mutex);
- CALL_HOLD(call, RX_CALL_REFCOUNT_ALIVE);
- MUTEX_EXIT(&rx_refcnt_mutex);
+ CALL_HOLD(call, RX_CALL_REFCOUNT_MTU);
call->growMTUEvent =
- rxevent_PostNow(&when, &now, rxi_GrowMTUEvent, call, 0);
+ rxevent_Post(&when, &now, rxi_GrowMTUEvent, call, NULL, 0);
}
}
/* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
-void
+static void
rxi_KeepAliveOn(struct rx_call *call)
{
/* Pretend last packet received was received now--i.e. if another
rxi_ScheduleKeepAliveEvent(call);
}
+/*
+ * Solely in order that callers not need to include rx_call.h
+ */
+void
+rx_KeepAliveOff(struct rx_call *call)
+{
+ rxi_KeepAliveOff(call);
+}
void
+rx_KeepAliveOn(struct rx_call *call)
+{
+ rxi_KeepAliveOn(call);
+}
+
+static void
rxi_GrowMTUOn(struct rx_call *call)
{
struct rx_connection *conn = call->conn;
/* This routine is called to send connection abort messages
* that have been delayed to throttle looping clients. */
-void
-rxi_SendDelayedConnAbort(struct rxevent *event,
- void *arg1, void *unused)
+static void
+rxi_SendDelayedConnAbort(struct rxevent *event, void *arg1, void *unused,
+ int unused2)
{
struct rx_connection *conn = arg1;
struct rx_packet *packet;
MUTEX_ENTER(&conn->conn_data_lock);
+ rxevent_Put(conn->delayedAbortEvent);
conn->delayedAbortEvent = NULL;
error = htonl(conn->error);
conn->abortCount++;
/* This routine is called to send call abort messages
* that have been delayed to throttle looping clients. */
-void
-rxi_SendDelayedCallAbort(struct rxevent *event,
- void *arg1, void *dummy)
+static void
+rxi_SendDelayedCallAbort(struct rxevent *event, void *arg1, void *dummy,
+ int dummy2)
{
struct rx_call *call = arg1;
struct rx_packet *packet;
MUTEX_ENTER(&call->lock);
+ rxevent_Put(call->delayedAbortEvent);
call->delayedAbortEvent = NULL;
error = htonl(call->error);
call->abortCount++;
rxi_FreePacket(packet);
}
MUTEX_EXIT(&call->lock);
- MUTEX_ENTER(&rx_refcnt_mutex);
CALL_RELE(call, RX_CALL_REFCOUNT_ABORT);
- MUTEX_EXIT(&rx_refcnt_mutex);
}
/* This routine is called periodically (every RX_AUTH_REQUEST_TIMEOUT
* seconds) to ask the client to authenticate itself. The routine
* issues a challenge to the client, which is obtained from the
* security object associated with the connection */
-void
+static void
rxi_ChallengeEvent(struct rxevent *event,
void *arg0, void *arg1, int tries)
{
struct rx_connection *conn = arg0;
- conn->challengeEvent = NULL;
+ if (event) {
+ rxevent_Put(conn->challengeEvent);
+ conn->challengeEvent = NULL;
+ }
+
if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) {
struct rx_packet *packet;
struct clock when, now;
when = now;
when.sec += RX_CHALLENGE_TIMEOUT;
conn->challengeEvent =
- rxevent_PostNow2(&when, &now, rxi_ChallengeEvent, conn, 0,
+ rxevent_Post(&when, &now, rxi_ChallengeEvent, conn, 0,
(tries - 1));
}
}
* security object associated with the connection is asked to create
* the challenge at this time. N.B. rxi_ChallengeOff is a macro,
* defined earlier. */
-void
+static void
rxi_ChallengeOn(struct rx_connection *conn)
{
if (!conn->challengeEvent) {
}
-/* Compute round trip time of the packet provided, in *rttp.
- */
-
/* rxi_ComputeRoundTripTime is called with peer locked. */
-/* sentp and/or peer may be null */
+/* peer must not be NULL: it is dereferenced unconditionally to store the call's rtt and rtt_dev */
static void
rxi_ComputeRoundTripTime(struct rx_packet *p,
- struct clock *sentp,
+ struct rx_ackPacket *ack,
+ struct rx_call *call,
struct rx_peer *peer,
struct clock *now)
{
- struct clock thisRtt, *rttp = &thisRtt;
+ struct clock thisRtt, *sentp;
int rtt_timeout;
+ int serial;
+
+ /* If the ACK is delayed, then do nothing */
+ if (ack->reason == RX_ACK_DELAY)
+ return;
+
+ /* On the wire, jumbograms are a single UDP packet. We shouldn't count
+ * their RTT multiple times, so only include the RTT of the last packet
+ * in a jumbogram */
+ if (p->flags & RX_JUMBO_PACKET)
+ return;
+
+ /* Use the serial number to determine which transmission the ACK is for,
+ * and set the sent time to match this. If we have no serial number, then
+ * only use the ACK for RTT calculations if the packet has not been
+ * retransmitted
+ */
+
+ serial = ntohl(ack->serial);
+ if (serial) {
+ if (serial == p->header.serial) {
+ sentp = &p->timeSent;
+ } else if (serial == p->firstSerial) {
+ sentp = &p->firstSent;
+ } else if (clock_Eq(&p->timeSent, &p->firstSent)) {
+ sentp = &p->firstSent;
+ } else
+ return;
+ } else {
+ if (clock_Eq(&p->timeSent, &p->firstSent)) {
+ sentp = &p->firstSent;
+ } else
+ return;
+ }
thisRtt = *now;
- if (clock_Lt(rttp, sentp))
+ if (clock_Lt(&thisRtt, sentp))
return; /* somebody set the clock back, don't count this time. */
- clock_Sub(rttp, sentp);
+ clock_Sub(&thisRtt, sentp);
dpf(("rxi_ComputeRoundTripTime(call=%d packet=%"AFS_PTR_FMT" rttp=%d.%06d sec)\n",
- p->header.callNumber, p, rttp->sec, rttp->usec));
+ p->header.callNumber, p, thisRtt.sec, thisRtt.usec));
- if (rttp->sec == 0 && rttp->usec == 0) {
+ if (clock_IsZero(&thisRtt)) {
/*
* The actual round trip time is shorter than the
* clock_GetTime resolution. It is most likely 1ms or 100ns.
* Since we can't tell which at the moment we will assume 1ms.
*/
- rttp->usec = 1000;
+ thisRtt.usec = 1000;
}
if (rx_stats_active) {
MUTEX_ENTER(&rx_stats_mutex);
- if (clock_Lt(rttp, &rx_stats.minRtt))
- rx_stats.minRtt = *rttp;
- if (clock_Gt(rttp, &rx_stats.maxRtt)) {
- if (rttp->sec > 60) {
+ if (clock_Lt(&thisRtt, &rx_stats.minRtt))
+ rx_stats.minRtt = thisRtt;
+ if (clock_Gt(&thisRtt, &rx_stats.maxRtt)) {
+ if (thisRtt.sec > 60) {
MUTEX_EXIT(&rx_stats_mutex);
return; /* somebody set the clock ahead */
}
- rx_stats.maxRtt = *rttp;
+ rx_stats.maxRtt = thisRtt;
}
- clock_Add(&rx_stats.totalRtt, rttp);
+ clock_Add(&rx_stats.totalRtt, &thisRtt);
rx_atomic_inc(&rx_stats.nRttSamples);
MUTEX_EXIT(&rx_stats_mutex);
}
/* better rtt calculation courtesy of UMich crew (dave,larry,peter,?) */
/* Apply VanJacobson round-trip estimations */
- if (peer->rtt) {
+ if (call->rtt) {
int delta;
/*
- * srtt (peer->rtt) is in units of one-eighth-milliseconds.
+ * srtt (call->rtt) is in units of one-eighth-milliseconds.
* srtt is stored as fixed point with 3 bits after the binary
* point (i.e., scaled by 8). The following magic is
* equivalent to the smoothing algorithm in rfc793 with an
* srtt' = srtt + (rtt - srtt)/8
*/
- delta = _8THMSEC(rttp) - peer->rtt;
- peer->rtt += (delta >> 3);
+ delta = _8THMSEC(&thisRtt) - call->rtt;
+ call->rtt += (delta >> 3);
/*
* We accumulate a smoothed rtt variance (actually, a smoothed
if (delta < 0)
delta = -delta;
- delta -= (peer->rtt_dev << 1);
- peer->rtt_dev += (delta >> 3);
+ delta -= (call->rtt_dev << 1);
+ call->rtt_dev += (delta >> 3);
} else {
/* I don't have a stored RTT so I start with this value. Since I'm
* probably just starting a call, and will be pushing more data down
* little, and I set deviance to half the rtt. In practice,
* deviance tends to approach something a little less than
* half the smoothed rtt. */
- peer->rtt = _8THMSEC(rttp) + 8;
- peer->rtt_dev = peer->rtt >> 2; /* rtt/2: they're scaled differently */
+ call->rtt = _8THMSEC(&thisRtt) + 8;
+ call->rtt_dev = call->rtt >> 2; /* rtt/2: they're scaled differently */
}
- /* the timeout is RTT + 4*MDEV but no less than rx_minPeerTimeout msec.
- * This is because one end or the other of these connections is usually
- * in a user process, and can be switched and/or swapped out. So on fast,
- * reliable networks, the timeout would otherwise be too short. */
- rtt_timeout = MAX(((peer->rtt >> 3) + peer->rtt_dev), rx_minPeerTimeout);
- clock_Zero(&(peer->timeout));
- clock_Addmsec(&(peer->timeout), rtt_timeout);
+ /* the smoothed RTT time is RTT + 4*MDEV
+ *
+ * We allow a user specified minimum to be set for this, to allow clamping
+ * at a minimum value in the same way as TCP. In addition, we have to allow
+ * for the possibility that this packet is answered by a delayed ACK, so we
+ * add on a fixed 200ms to account for that timer expiring.
+ */
+
+ rtt_timeout = MAX(((call->rtt >> 3) + call->rtt_dev),
+ rx_minPeerTimeout) + 200;
+ clock_Zero(&call->rto);
+ clock_Addmsec(&call->rto, rtt_timeout);
- /* Reset the backedOff flag since we just computed a new timeout value */
- peer->backedOff = 0;
+ /* Update the peer, so any new calls start with our values */
+ peer->rtt_dev = call->rtt_dev;
+ peer->rtt = call->rtt;
dpf(("rxi_ComputeRoundTripTime(call=%d packet=%"AFS_PTR_FMT" rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%06d sec)\n",
- p->header.callNumber, p, MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2, (peer->timeout.sec), (peer->timeout.usec)));
+ p->header.callNumber, p, MSEC(&thisRtt), call->rtt >> 3, call->rtt_dev >> 2, (call->rto.sec), (call->rto.usec)));
}
/* Find all server connections that have not been active for a long time, and
* toss them */
-void
-rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2)
+static void
+rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2,
+ int unused3)
{
struct clock now, when;
clock_GetTime(&now);
when = now;
when.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */
- rxevent_Post(&when, rxi_ReapConnections, 0, 0);
+ rxevent_Put(rxevent_Post(&when, &now, rxi_ReapConnections, 0, NULL, 0));
}
return RXS_Close(aobj);
}
-#ifdef ADAPT_WINDOW
-#define RXRATE_PKT_OH (RX_HEADER_SIZE + RX_IPUDP_SIZE)
-#define RXRATE_SMALL_PKT (RXRATE_PKT_OH + sizeof(struct rx_ackPacket))
-#define RXRATE_AVG_SMALL_PKT (RXRATE_PKT_OH + (sizeof(struct rx_ackPacket)/2))
-#define RXRATE_LARGE_PKT (RXRATE_SMALL_PKT + 256)
-
-/* Adjust our estimate of the transmission rate to this peer, given
- * that the packet p was just acked. We can adjust peer->timeout and
- * call->twind. Pragmatically, this is called
- * only with packets of maximal length.
- * Called with peer and call locked.
- */
-
-static void
-rxi_ComputeRate(struct rx_peer *peer, struct rx_call *call,
- struct rx_packet *p, struct rx_packet *ackp, u_char ackReason)
-{
- afs_int32 xferSize, xferMs;
- afs_int32 minTime;
- struct clock newTO;
-
- /* Count down packets */
- if (peer->rateFlag > 0)
- peer->rateFlag--;
- /* Do nothing until we're enabled */
- if (peer->rateFlag != 0)
- return;
- if (!call->conn)
- return;
-
- /* Count only when the ack seems legitimate */
- switch (ackReason) {
- case RX_ACK_REQUESTED:
- xferSize =
- p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize;
- xferMs = peer->rtt;
- break;
-
- case RX_ACK_PING_RESPONSE:
- if (p) /* want the response to ping-request, not data send */
- return;
- clock_GetTime(&newTO);
- if (clock_Gt(&newTO, &call->pingRequestTime)) {
- clock_Sub(&newTO, &call->pingRequestTime);
- xferMs = (newTO.sec * 1000) + (newTO.usec / 1000);
- } else {
- return;
- }
- xferSize = rx_AckDataSize(rx_maxSendWindow) + RX_HEADER_SIZE;
- break;
-
- default:
- return;
- }
-
- dpf(("CONG peer %lx/%u: sample (%s) size %ld, %ld ms (to %d.%06d, rtt %u, ps %u)\n",
- ntohl(peer->host), ntohs(peer->port), (ackReason == RX_ACK_REQUESTED ? "dataack" : "pingack"),
- xferSize, xferMs, peer->timeout.sec, peer->timeout.usec, peer->smRtt, peer->ifMTU));
-
- /* Track only packets that are big enough. */
- if ((p->length + RX_HEADER_SIZE + call->conn->securityMaxTrailerSize) <
- peer->ifMTU)
- return;
-
- /* absorb RTT data (in milliseconds) for these big packets */
- if (peer->smRtt == 0) {
- peer->smRtt = xferMs;
- } else {
- peer->smRtt = ((peer->smRtt * 15) + xferMs + 4) >> 4;
- if (!peer->smRtt)
- peer->smRtt = 1;
- }
-
- if (peer->countDown) {
- peer->countDown--;
- return;
- }
- peer->countDown = 10; /* recalculate only every so often */
-
- /* In practice, we can measure only the RTT for full packets,
- * because of the way Rx acks the data that it receives. (If it's
- * smaller than a full packet, it often gets implicitly acked
- * either by the call response (from a server) or by the next call
- * (from a client), and either case confuses transmission times
- * with processing times.) Therefore, replace the above
- * more-sophisticated processing with a simpler version, where the
- * smoothed RTT is kept for full-size packets, and the time to
- * transmit a windowful of full-size packets is simply RTT *
- * windowSize. Again, we take two steps:
- - ensure the timeout is large enough for a single packet's RTT;
- - ensure that the window is small enough to fit in the desired timeout.*/
-
- /* First, the timeout check. */
- minTime = peer->smRtt;
- /* Get a reasonable estimate for a timeout period */
- minTime += minTime;
- newTO.sec = minTime / 1000;
- newTO.usec = (minTime - (newTO.sec * 1000)) * 1000;
-
- /* Increase the timeout period so that we can always do at least
- * one packet exchange */
- if (clock_Gt(&newTO, &peer->timeout)) {
-
- dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u)\n",
- ntohl(peer->host), ntohs(peer->port), peer->timeout.sec, peer->timeout.usec,
- newTO.sec, newTO.usec, peer->smRtt));
-
- peer->timeout = newTO;
- }
-
- /* Now, get an estimate for the transmit window size. */
- minTime = peer->timeout.sec * 1000 + (peer->timeout.usec / 1000);
- /* Now, convert to the number of full packets that could fit in a
- * reasonable fraction of that interval */
- minTime /= (peer->smRtt << 1);
- minTime = MAX(minTime, rx_minPeerTimeout);
- xferSize = minTime; /* (make a copy) */
-
- /* Now clamp the size to reasonable bounds. */
- if (minTime <= 1)
- minTime = 1;
- else if (minTime > rx_maxSendWindow)
- minTime = rx_maxSendWindow;
-/* if (minTime != peer->maxWindow) {
- dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u)\n",
- ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
- peer->timeout.sec, peer->timeout.usec, peer->smRtt));
- peer->maxWindow = minTime;
- elide... call->twind = minTime;
- }
-*/
-
- /* Cut back on the peer timeout if it had earlier grown unreasonably.
- * Discern this by calculating the timeout necessary for rx_Window
- * packets. */
- if ((xferSize > rx_maxSendWindow) && (peer->timeout.sec >= 3)) {
- /* calculate estimate for transmission interval in milliseconds */
- minTime = rx_maxSendWindow * peer->smRtt;
- if (minTime < 1000) {
- dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u)\n",
- ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
- peer->timeout.usec, peer->smRtt));
-
- newTO.sec = 0; /* cut back on timeout by half a second */
- newTO.usec = 500000;
- clock_Sub(&peer->timeout, &newTO);
- }
- }
-
- return;
-} /* end of rxi_ComputeRate */
-#endif /* ADAPT_WINDOW */
-
-
void
rxi_DebugInit(void)
{
void
rx_StatsOnOff(int on)
{
-#ifdef RXDEBUG
rx_stats_active = on;
-#endif
}
void
rx_PrintPeerStats(FILE * file, struct rx_peer *peer)
{
- fprintf(file, "Peer %x.%d. " "Burst size %d, " "burst wait %d.%06d.\n",
- ntohl(peer->host), (int)ntohs(peer->port), (int)peer->burstSize,
- (int)peer->burstWait.sec, (int)peer->burstWait.usec);
+ fprintf(file, "Peer %x.%d.\n",
+ ntohl(peer->host), (int)ntohs(peer->port));
fprintf(file,
- " Rtt %d, " "retry time %u.%06d, " "total sent %d, "
- "resent %d\n", peer->rtt, (int)peer->timeout.sec,
- (int)peer->timeout.usec, peer->nSent, peer->reSends);
+ " Rtt %d, " "total sent %d, " "resent %d\n",
+ peer->rtt, peer->nSent, peer->reSends);
- fprintf(file,
- " Packet size %d, " "max in packet skew %d, "
- "max out packet skew %d\n", peer->ifMTU, (int)peer->inPacketSkew,
- (int)peer->outPacketSkew);
+ fprintf(file, " Packet size %d\n", peer->ifMTU);
}
#endif
(struct sockaddr *)&taddr, sizeof(struct sockaddr_in));
/* see if there's a packet available */
- gettimeofday(&tv_wake,0);
+ gettimeofday(&tv_wake, NULL);
tv_wake.tv_sec += waitTime;
for (;;) {
FD_ZERO(&imask);
FD_SET(socket, &imask);
tv_delta.tv_sec = tv_wake.tv_sec;
tv_delta.tv_usec = tv_wake.tv_usec;
- gettimeofday(&tv_now, 0);
+ gettimeofday(&tv_now, NULL);
if (tv_delta.tv_usec < tv_now.tv_usec) {
/* borrow */
peer->ifMTU = ntohs(peer->ifMTU);
peer->idleWhen = ntohl(peer->idleWhen);
peer->refCount = ntohs(peer->refCount);
- peer->burstWait.sec = ntohl(peer->burstWait.sec);
- peer->burstWait.usec = ntohl(peer->burstWait.usec);
peer->rtt = ntohl(peer->rtt);
peer->rtt_dev = ntohl(peer->rtt_dev);
- peer->timeout.sec = ntohl(peer->timeout.sec);
- peer->timeout.usec = ntohl(peer->timeout.usec);
+ peer->timeout.sec = 0;
+ peer->timeout.usec = 0;
peer->nSent = ntohl(peer->nSent);
peer->reSends = ntohl(peer->reSends);
- peer->inPacketSkew = ntohl(peer->inPacketSkew);
- peer->outPacketSkew = ntohl(peer->outPacketSkew);
- peer->rateFlag = ntohl(peer->rateFlag);
peer->natMTU = ntohs(peer->natMTU);
peer->maxMTU = ntohs(peer->maxMTU);
peer->maxDgramPackets = ntohs(peer->maxDgramPackets);
peerStats->ifMTU = tp->ifMTU;
peerStats->idleWhen = tp->idleWhen;
peerStats->refCount = tp->refCount;
- peerStats->burstSize = tp->burstSize;
- peerStats->burst = tp->burst;
- peerStats->burstWait.sec = tp->burstWait.sec;
- peerStats->burstWait.usec = tp->burstWait.usec;
+ peerStats->burstSize = 0;
+ peerStats->burst = 0;
+ peerStats->burstWait.sec = 0;
+ peerStats->burstWait.usec = 0;
peerStats->rtt = tp->rtt;
peerStats->rtt_dev = tp->rtt_dev;
- peerStats->timeout.sec = tp->timeout.sec;
- peerStats->timeout.usec = tp->timeout.usec;
+ peerStats->timeout.sec = 0;
+ peerStats->timeout.usec = 0;
peerStats->nSent = tp->nSent;
peerStats->reSends = tp->reSends;
- peerStats->inPacketSkew = tp->inPacketSkew;
- peerStats->outPacketSkew = tp->outPacketSkew;
- peerStats->rateFlag = tp->rateFlag;
peerStats->natMTU = tp->natMTU;
peerStats->maxMTU = tp->maxMTU;
peerStats->maxDgramPackets = tp->maxDgramPackets;
peerStats->cwind = tp->cwind;
peerStats->nDgramPackets = tp->nDgramPackets;
peerStats->congestSeq = tp->congestSeq;
- peerStats->bytesSent.high = tp->bytesSent.high;
- peerStats->bytesSent.low = tp->bytesSent.low;
- peerStats->bytesReceived.high = tp->bytesReceived.high;
- peerStats->bytesReceived.low = tp->bytesReceived.low;
+ peerStats->bytesSent.high = tp->bytesSent >> 32;
+ peerStats->bytesSent.low = tp->bytesSent & MAX_AFS_UINT32;
+ peerStats->bytesReceived.high = tp->bytesReceived >> 32;
+ peerStats->bytesReceived.low
+ = tp->bytesReceived & MAX_AFS_UINT32;
MUTEX_EXIT(&tp->peer_lock);
MUTEX_ENTER(&rx_peerHashTable_lock);
int i;
MUTEX_ENTER(&conn->conn_data_lock);
if (!conn->specific) {
- conn->specific = (void **)malloc((key + 1) * sizeof(void *));
+ conn->specific = malloc((key + 1) * sizeof(void *));
for (i = 0; i < key; i++)
conn->specific[i] = NULL;
conn->nSpecific = key + 1;
int i;
MUTEX_ENTER(&svc->svc_data_lock);
if (!svc->specific) {
- svc->specific = (void **)malloc((key + 1) * sizeof(void *));
+ svc->specific = malloc((key + 1) * sizeof(void *));
for (i = 0; i < key; i++)
svc->specific[i] = NULL;
svc->nSpecific = key + 1;
static int rxi_monitor_peerStats = 0;
-/*
- * rxi_AddRpcStat - given all of the information for a particular rpc
- * call, create (if needed) and update the stat totals for the rpc.
- *
- * PARAMETERS
- *
- * IN stats - the queue of stats that will be updated with the new value
- *
- * IN rxInterface - a unique number that identifies the rpc interface
- *
- * IN currentFunc - the index of the function being invoked
- *
- * IN totalFunc - the total number of functions in this interface
- *
- * IN queueTime - the amount of time this function waited for a thread
+
+/*!
+ * Reset the counters and timing accumulators of a single RPC operation's
+ * statistics entry. Only the fields assigned below are touched; the rest
+ * of the entry is left as it was.
+ *
+ * @param rpc_stat
+ *      the per-operation statistics entry to reset
+ */
+void
+rxi_ClearRPCOpStat(rx_function_entry_v1_p rpc_stat)
+{
+    /* Call and byte counters. */
+    rpc_stat->invocations = 0;
+    rpc_stat->bytes_sent = 0;
+    rpc_stat->bytes_rcvd = 0;
+
+    /* Running sums and sums of squares start from zero. */
+    rpc_stat->queue_time_sum.sec = 0;
+    rpc_stat->queue_time_sum.usec = 0;
+    rpc_stat->execution_time_sum.sec = 0;
+    rpc_stat->execution_time_sum.usec = 0;
+    rpc_stat->queue_time_sum_sqr.sec = 0;
+    rpc_stat->queue_time_sum_sqr.usec = 0;
+    rpc_stat->execution_time_sum_sqr.sec = 0;
+    rpc_stat->execution_time_sum_sqr.usec = 0;
+
+    /* Maxima start at zero. */
+    rpc_stat->queue_time_max.sec = 0;
+    rpc_stat->queue_time_max.usec = 0;
+    rpc_stat->execution_time_max.sec = 0;
+    rpc_stat->execution_time_max.usec = 0;
+
+    /* Minima start at a large sentinel -- presumably so that the first
+     * recorded sample compares as smaller and replaces it. */
+    rpc_stat->queue_time_min.sec = 9999999;
+    rpc_stat->queue_time_min.usec = 9999999;
+    rpc_stat->execution_time_min.sec = 9999999;
+    rpc_stat->execution_time_min.usec = 9999999;
+}
+
+/*!
+ * Given all of the information for a particular rpc
+ * call, find or create (if requested) the stat structure for the rpc.
*
- * IN execTime - the amount of time this function invocation took to execute
+ * @param stats
+ * the queue of stats that will be updated with the new value
*
- * IN bytesSent - the number bytes sent by this invocation
+ * @param rxInterface
+ * a unique number that identifies the rpc interface
*
- * IN bytesRcvd - the number bytes received by this invocation
+ * @param totalFunc
+ * the total number of functions in this interface. this is only
+ * required if create is true
*
- * IN isServer - if true, this invocation was made to a server
+ * @param isServer
+ * if true, this invocation was made to a server
*
- * IN remoteHost - the ip address of the remote host
+ * @param remoteHost
+ * the ip address of the remote host. this is only required if create
+ * and addToPeerList are true
*
- * IN remotePort - the port of the remote host
+ * @param remotePort
+ * the port of the remote host. this is only required if create
+ * and addToPeerList are true
*
- * IN addToPeerList - if != 0, add newly created stat to the global peer list
+ * @param addToPeerList
+ * if != 0, add newly created stat to the global peer list
*
- * INOUT counter - if a new stats structure is allocated, the counter will
- * be updated with the new number of allocated stat structures
+ * @param counter
+ * if a new stats structure is allocated, the counter will
+ * be updated with the new number of allocated stat structures.
+ * only required if create is true
*
- * RETURN CODES
+ * @param create
+ * if no stats structure exists, allocate one
*
- * Returns void.
*/
-static int
-rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
- afs_uint32 currentFunc, afs_uint32 totalFunc,
- struct clock *queueTime, struct clock *execTime,
- afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd, int isServer,
- afs_uint32 remoteHost, afs_uint32 remotePort,
- int addToPeerList, unsigned int *counter)
+static rx_interface_stat_p
+rxi_FindRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
+ afs_uint32 totalFunc, int isServer, afs_uint32 remoteHost,
+ afs_uint32 remotePort, int addToPeerList,
+ unsigned int *counter, int create)
{
- int rc = 0;
rx_interface_stat_p rpc_stat, nrpc_stat;
/*
break;
}
+ /* if they didn't ask us to create, we're done */
+ if (!create) {
+ if (queue_IsEnd(stats, rpc_stat))
+ return NULL;
+ else
+ return rpc_stat;
+ }
+
+ /* can't proceed without these */
+ if (!totalFunc || !counter)
+ return NULL;
+
/*
* Didn't find a match so allocate a new structure and add it to the
* queue.
totalFunc * sizeof(rx_function_entry_v1_t);
rpc_stat = rxi_Alloc(space);
- if (rpc_stat == NULL) {
- rc = 1;
- goto fail;
- }
+ if (rpc_stat == NULL)
+ return NULL;
+
*counter += totalFunc;
for (i = 0; i < totalFunc; i++) {
+ rxi_ClearRPCOpStat(&(rpc_stat->stats[i]));
rpc_stat->stats[i].remote_peer = remoteHost;
rpc_stat->stats[i].remote_port = remotePort;
rpc_stat->stats[i].remote_is_server = isServer;
rpc_stat->stats[i].interfaceId = rxInterface;
rpc_stat->stats[i].func_total = totalFunc;
rpc_stat->stats[i].func_index = i;
- hzero(rpc_stat->stats[i].invocations);
- hzero(rpc_stat->stats[i].bytes_sent);
- hzero(rpc_stat->stats[i].bytes_rcvd);
- rpc_stat->stats[i].queue_time_sum.sec = 0;
- rpc_stat->stats[i].queue_time_sum.usec = 0;
- rpc_stat->stats[i].queue_time_sum_sqr.sec = 0;
- rpc_stat->stats[i].queue_time_sum_sqr.usec = 0;
- rpc_stat->stats[i].queue_time_min.sec = 9999999;
- rpc_stat->stats[i].queue_time_min.usec = 9999999;
- rpc_stat->stats[i].queue_time_max.sec = 0;
- rpc_stat->stats[i].queue_time_max.usec = 0;
- rpc_stat->stats[i].execution_time_sum.sec = 0;
- rpc_stat->stats[i].execution_time_sum.usec = 0;
- rpc_stat->stats[i].execution_time_sum_sqr.sec = 0;
- rpc_stat->stats[i].execution_time_sum_sqr.usec = 0;
- rpc_stat->stats[i].execution_time_min.sec = 9999999;
- rpc_stat->stats[i].execution_time_min.usec = 9999999;
- rpc_stat->stats[i].execution_time_max.sec = 0;
- rpc_stat->stats[i].execution_time_max.usec = 0;
}
queue_Prepend(stats, rpc_stat);
if (addToPeerList) {
queue_Prepend(&peerStats, &rpc_stat->all_peers);
}
}
+ return rpc_stat;
+}
+
+/*!
+ * Reset every per-operation entry of the process-wide statistics kept for
+ * one rx interface. Nothing happens if no statistics exist for it.
+ *
+ * @param rxInterface
+ *      the rpc interface whose statistics are cleared; -1 is rejected
+ */
+void
+rx_ClearProcessRPCStats(afs_int32 rxInterface)
+{
+    rx_interface_stat_p entry;
+    int nFuncs;
+    int idx;
+
+    if (rxInterface == -1)
+        return;
+
+    MUTEX_ENTER(&rx_rpc_stats);
+    /* Lookup only: create is 0, so a missing interface yields NULL. */
+    entry = rxi_FindRpcStat(&processStats, rxInterface, 0, 0,
+                            0, 0, 0, 0, 0);
+    if (entry != NULL) {
+        nFuncs = entry->stats[0].func_total;
+        idx = 0;
+        while (idx < nFuncs) {
+            rxi_ClearRPCOpStat(&entry->stats[idx]);
+            idx++;
+        }
+    }
+    MUTEX_EXIT(&rx_rpc_stats);
+}
+
+/*!
+ * Reset every per-operation entry of the statistics kept for one rx
+ * interface of one peer. Nothing happens if the peer is unknown or has no
+ * statistics for the interface.
+ *
+ * @param rxInterface
+ *      the rpc interface whose statistics are cleared; -1 is rejected
+ *
+ * @param peerHost
+ *      host of the peer, as passed to rxi_FindPeer
+ *
+ * @param peerPort
+ *      port of the peer, as passed to rxi_FindPeer
+ */
+void
+rx_ClearPeerRPCStats(afs_int32 rxInterface, afs_uint32 peerHost, afs_uint16 peerPort)
+{
+    struct rx_peer *target;
+    rx_interface_stat_p entry;
+    int nFuncs;
+    int idx;
+
+    if (rxInterface == -1)
+        return;
+
+    target = rxi_FindPeer(peerHost, peerPort, 0, 0);
+    if (target == NULL)
+        return;
+
+    MUTEX_ENTER(&rx_rpc_stats);
+    /* Lookup only (create is 0); the peer list is searched with
+     * isServer set to 1. */
+    entry = rxi_FindRpcStat(&target->rpcStats, rxInterface, 0, 1,
+                            0, 0, 0, 0, 0);
+    if (entry != NULL) {
+        nFuncs = entry->stats[0].func_total;
+        idx = 0;
+        while (idx < nFuncs) {
+            rxi_ClearRPCOpStat(&entry->stats[idx]);
+            idx++;
+        }
+    }
+    MUTEX_EXIT(&rx_rpc_stats);
+}
+
+/*!
+ * Return a newly allocated copy of the process-wide statistics for a
+ * single RPC operation.
+ *
+ * @param op
+ *      the operation to look up: the rx interface id in the upper 32 bits
+ *      and the function index within that interface in the lower 32 bits
+ *
+ * @return a copy of the operation's rx_function_entry_v1_t, or NULL if
+ *      process statistics are not being monitored, the interface id is -1,
+ *      no statistics exist for the interface, the function index is not
+ *      below the interface's func_total, or the allocation fails. A
+ *      non-NULL result is released with rx_ReleaseRPCStats().
+ */
+void *
+rx_CopyProcessRPCStats(afs_uint64 op)
+{
+    rx_interface_stat_p rpc_stat;
+    rx_function_entry_v1_p rpcop_stat;
+    afs_uint32 currentFunc = (afs_uint32)(op & MAX_AFS_UINT32);
+    afs_int32 rxInterface = (op >> 32);
+    int copied = 0;
+
+    if (!rxi_monitor_processStats)
+        return NULL;
+
+    if (rxInterface == -1)
+        return NULL;
+
+    /* Allocate only after the cheap rejections above, so that an early
+     * return cannot leak the buffer. */
+    rpcop_stat = rxi_Alloc(sizeof(rx_function_entry_v1_t));
+    if (rpcop_stat == NULL)
+        return NULL;
+
+    MUTEX_ENTER(&rx_rpc_stats);
+    rpc_stat = rxi_FindRpcStat(&processStats, rxInterface, 0, 0,
+                               0, 0, 0, NULL, 0);
+    /* Refuse a function index outside the interface's table rather than
+     * copying from beyond the end of stats[]. */
+    if (rpc_stat && currentFunc < rpc_stat->stats[0].func_total) {
+        memcpy(rpcop_stat, &(rpc_stat->stats[currentFunc]),
+               sizeof(rx_function_entry_v1_t));
+        copied = 1;
+    }
+    MUTEX_EXIT(&rx_rpc_stats);
+
+    if (!copied) {
+        rxi_Free(rpcop_stat, sizeof(rx_function_entry_v1_t));
+        return NULL;
+    }
+    return rpcop_stat;
+}
+
+/*!
+ * Return a newly allocated copy of the statistics one peer has
+ * accumulated for a single RPC operation.
+ *
+ * @param op
+ *      the operation to look up: the rx interface id in the upper 32 bits
+ *      and the function index within that interface in the lower 32 bits
+ *
+ * @param peerHost
+ *      host of the peer, as passed to rxi_FindPeer
+ *
+ * @param peerPort
+ *      port of the peer, as passed to rxi_FindPeer
+ *
+ * @return a copy of the operation's rx_function_entry_v1_t, or NULL if
+ *      peer statistics are not being monitored, the interface id is -1,
+ *      the peer is unknown, no statistics exist for the interface, the
+ *      function index is not below the interface's func_total, or the
+ *      allocation fails. A non-NULL result is released with
+ *      rx_ReleaseRPCStats().
+ */
+void *
+rx_CopyPeerRPCStats(afs_uint64 op, afs_uint32 peerHost, afs_uint16 peerPort)
+{
+    rx_interface_stat_p rpc_stat;
+    rx_function_entry_v1_p rpcop_stat;
+    afs_uint32 currentFunc = (afs_uint32)(op & MAX_AFS_UINT32);
+    afs_int32 rxInterface = (op >> 32);
+    struct rx_peer *peer;
+    int copied = 0;
+
+    if (!rxi_monitor_peerStats)
+        return NULL;
+
+    if (rxInterface == -1)
+        return NULL;
+
+    peer = rxi_FindPeer(peerHost, peerPort, 0, 0);
+    if (!peer)
+        return NULL;
+
+    /* Allocate only once every early rejection has passed, so that none
+     * of the returns above can leak the buffer. */
+    rpcop_stat = rxi_Alloc(sizeof(rx_function_entry_v1_t));
+    if (rpcop_stat == NULL)
+        return NULL;
+
+    MUTEX_ENTER(&rx_rpc_stats);
+    rpc_stat = rxi_FindRpcStat(&peer->rpcStats, rxInterface, 0, 1,
+                               0, 0, 0, NULL, 0);
+    /* Refuse a function index outside the interface's table rather than
+     * copying from beyond the end of stats[]. */
+    if (rpc_stat && currentFunc < rpc_stat->stats[0].func_total) {
+        memcpy(rpcop_stat, &(rpc_stat->stats[currentFunc]),
+               sizeof(rx_function_entry_v1_t));
+        copied = 1;
+    }
+    MUTEX_EXIT(&rx_rpc_stats);
+
+    if (!copied) {
+        rxi_Free(rpcop_stat, sizeof(rx_function_entry_v1_t));
+        return NULL;
+    }
+    return rpcop_stat;
+}
+
+/*!
+ * Free a statistics buffer of sizeof(rx_function_entry_v1_t) bytes, as
+ * returned by rx_CopyProcessRPCStats or rx_CopyPeerRPCStats.
+ *
+ * @param stats
+ *      the buffer to free; NULL is accepted and ignored
+ */
+void
+rx_ReleaseRPCStats(void *stats)
+{
+    if (stats == NULL)
+        return;
+
+    rxi_Free(stats, sizeof(rx_function_entry_v1_t));
+}
+
+/*!
+ * Given all of the information for a particular rpc
+ * call, create (if needed) and update the stat totals for the rpc.
+ *
+ * @param stats
+ * the queue of stats that will be updated with the new value
+ *
+ * @param rxInterface
+ * a unique number that identifies the rpc interface
+ *
+ * @param currentFunc
+ * the index of the function being invoked
+ *
+ * @param totalFunc
+ * the total number of functions in this interface
+ *
+ * @param queueTime
+ * the amount of time this function waited for a thread
+ *
+ * @param execTime
+ * the amount of time this function invocation took to execute
+ *
+ * @param bytesSent
+ * the number of bytes sent by this invocation
+ *
+ * @param bytesRcvd
+ * the number of bytes received by this invocation
+ *
+ * @param isServer
+ * if true, this invocation was made to a server
+ *
+ * @param remoteHost
+ * the ip address of the remote host
+ *
+ * @param remotePort
+ * the port of the remote host
+ *
+ * @param addToPeerList
+ * if != 0, add newly created stat to the global peer list
+ *
+ * @param counter
+ * if a new stats structure is allocated, the counter will
+ * be updated with the new number of allocated stat structures
+ *
+ */
+
+static int
+rxi_AddRpcStat(struct rx_queue *stats, afs_uint32 rxInterface,
+ afs_uint32 currentFunc, afs_uint32 totalFunc,
+ struct clock *queueTime, struct clock *execTime,
+ afs_uint64 bytesSent, afs_uint64 bytesRcvd, int isServer,
+ afs_uint32 remoteHost, afs_uint32 remotePort,
+ int addToPeerList, unsigned int *counter)
+{
+ int rc = 0;
+ rx_interface_stat_p rpc_stat;
+
+ rpc_stat = rxi_FindRpcStat(stats, rxInterface, totalFunc, isServer,
+ remoteHost, remotePort, addToPeerList, counter,
+ 1);
+ if (!rpc_stat) {
+ rc = -1;
+ goto fail;
+ }
/*
* Increment the stats for this function
*/
- hadd32(rpc_stat->stats[currentFunc].invocations, 1);
- hadd(rpc_stat->stats[currentFunc].bytes_sent, *bytesSent);
- hadd(rpc_stat->stats[currentFunc].bytes_rcvd, *bytesRcvd);
+ rpc_stat->stats[currentFunc].invocations++;
+ rpc_stat->stats[currentFunc].bytes_sent += bytesSent;
+ rpc_stat->stats[currentFunc].bytes_rcvd += bytesRcvd;
clock_Add(&rpc_stat->stats[currentFunc].queue_time_sum, queueTime);
clock_AddSq(&rpc_stat->stats[currentFunc].queue_time_sum_sqr, queueTime);
if (clock_Lt(queueTime, &rpc_stat->stats[currentFunc].queue_time_min)) {
return rc;
}
-/*
- * rx_IncrementTimeAndCount - increment the times and count for a particular
- * rpc function.
- *
- * PARAMETERS
- *
- * IN peer - the peer who invoked the rpc
- *
- * IN rxInterface - a unique number that identifies the rpc interface
- *
- * IN currentFunc - the index of the function being invoked
- *
- * IN totalFunc - the total number of functions in this interface
- *
- * IN queueTime - the amount of time this function waited for a thread
- *
- * IN execTime - the amount of time this function invocation took to execute
- *
- * IN bytesSent - the number bytes sent by this invocation
- *
- * IN bytesRcvd - the number bytes received by this invocation
- *
- * IN isServer - if true, this invocation was made to a server
- *
- * RETURN CODES
- *
- * Returns void.
- */
-
void
-rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
- afs_uint32 currentFunc, afs_uint32 totalFunc,
- struct clock *queueTime, struct clock *execTime,
- afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd,
- int isServer)
+rxi_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
+ afs_uint32 currentFunc, afs_uint32 totalFunc,
+ struct clock *queueTime, struct clock *execTime,
+ afs_uint64 bytesSent, afs_uint64 bytesRcvd,
+ int isServer)
{
if (!(rxi_monitor_peerStats || rxi_monitor_processStats))
}
MUTEX_EXIT(&rx_rpc_stats);
+}
+
+/*!
+ * Increment the times and count for a particular rpc function.
+ *
+ * Traditionally this call was invoked from rxgen stubs. Modern stubs
+ * call rx_RecordCallStatistics instead, so the public version of this
+ * function is left purely for legacy callers.
+ *
+ * @param peer
+ * The peer who invoked the rpc
+ *
+ * @param rxInterface
+ * A unique number that identifies the rpc interface
+ *
+ * @param currentFunc
+ * The index of the function being invoked
+ *
+ * @param totalFunc
+ * The total number of functions in this interface
+ *
+ * @param queueTime
+ * The amount of time this function waited for a thread
+ *
+ * @param execTime
+ * The amount of time this function invocation took to execute
+ *
+ * @param bytesSent
+ * The number of bytes sent by this invocation
+ *
+ * @param bytesRcvd
+ * The number of bytes received by this invocation
+ *
+ * @param isServer
+ * If true, this invocation was made to a server
+ *
+ */
+void
+rx_IncrementTimeAndCount(struct rx_peer *peer, afs_uint32 rxInterface,
+                         afs_uint32 currentFunc, afs_uint32 totalFunc,
+                         struct clock *queueTime, struct clock *execTime,
+                         afs_hyper_t * bytesSent, afs_hyper_t * bytesRcvd,
+                         int isServer)
+{
+    /* Fold each hyper's high/low halves into one 64-bit byte count, then
+     * hand off to the internal 64-bit implementation. */
+    afs_uint64 totalSent = ((afs_uint64)bytesSent->high << 32) + bytesSent->low;
+    afs_uint64 totalRcvd = ((afs_uint64)bytesRcvd->high << 32) + bytesRcvd->low;
+
+    rxi_IncrementTimeAndCount(peer, rxInterface, currentFunc, totalFunc,
+                              queueTime, execTime, totalSent, totalRcvd,
+                              isServer);
}
+
+
/*
* rx_MarshallProcessRPCStats - marshall an array of rpc statistics
*
*(ptr++) = stats->interfaceId;
*(ptr++) = stats->func_total;
*(ptr++) = stats->func_index;
- *(ptr++) = hgethi(stats->invocations);
- *(ptr++) = hgetlo(stats->invocations);
- *(ptr++) = hgethi(stats->bytes_sent);
- *(ptr++) = hgetlo(stats->bytes_sent);
- *(ptr++) = hgethi(stats->bytes_rcvd);
- *(ptr++) = hgetlo(stats->bytes_rcvd);
+ *(ptr++) = stats->invocations >> 32;
+ *(ptr++) = stats->invocations & MAX_AFS_UINT32;
+ *(ptr++) = stats->bytes_sent >> 32;
+ *(ptr++) = stats->bytes_sent & MAX_AFS_UINT32;
+ *(ptr++) = stats->bytes_rcvd >> 32;
+ *(ptr++) = stats->bytes_rcvd & MAX_AFS_UINT32;
*(ptr++) = stats->queue_time_sum.sec;
*(ptr++) = stats->queue_time_sum.usec;
*(ptr++) = stats->queue_time_sum_sqr.sec;
num_funcs = rpc_stat->stats[0].func_total;
for (i = 0; i < num_funcs; i++) {
if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
- hzero(rpc_stat->stats[i].invocations);
+ rpc_stat->stats[i].invocations = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
- hzero(rpc_stat->stats[i].bytes_sent);
+ rpc_stat->stats[i].bytes_sent = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
- hzero(rpc_stat->stats[i].bytes_rcvd);
+ rpc_stat->stats[i].bytes_rcvd = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
rpc_stat->stats[i].queue_time_sum.sec = 0;
num_funcs = rpc_stat->stats[0].func_total;
for (i = 0; i < num_funcs; i++) {
if (clearFlag & AFS_RX_STATS_CLEAR_INVOCATIONS) {
- hzero(rpc_stat->stats[i].invocations);
+ rpc_stat->stats[i].invocations = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_SENT) {
- hzero(rpc_stat->stats[i].bytes_sent);
+ rpc_stat->stats[i].bytes_sent = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_BYTES_RCVD) {
- hzero(rpc_stat->stats[i].bytes_rcvd);
+ rpc_stat->stats[i].bytes_rcvd = 0;
}
if (clearFlag & AFS_RX_STATS_CLEAR_QUEUE_TIME_SUM) {
rpc_stat->stats[i].queue_time_sum.sec = 0;