#include "rx.h"
#include "rx_globals.h"
#include "rx_trace.h"
+#include "rx_atomic.h"
#define AFSOP_STOP_RXCALLBACK 210 /* Stop CALLBACK process */
#define AFSOP_STOP_AFS 211 /* Stop AFS process */
#define AFSOP_STOP_BKG 212 /* Stop BKG process */
# include "rx_user.h"
# include "rx_clock.h"
# include "rx_queue.h"
+# include "rx_atomic.h"
# include "rx_globals.h"
# include "rx_trace.h"
# include <afs/rxgen_consts.h>
#include <stddef.h> /* for definition of offsetof() */
#endif
+#ifdef RX_ENABLE_LOCKS
+afs_kmutex_t rx_atomic_mutex;
+#endif
+
#ifdef AFS_PTHREAD_ENV
#include <assert.h>
MUTEX_INIT(&rx_clock_mutex, "clock", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_stats_mutex, "stats", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_waiting_mutex, "waiting", MUTEX_DEFAULT, 0);
+ MUTEX_INIT(&rx_atomic_mutex, "atomic", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_quota_mutex, "quota", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_pthread_mutex, "pthread", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_packets_mutex, "packets", MUTEX_DEFAULT, 0);
* multi_handle->lock
* rxevent_lock
* rx_stats_mutex
+ * rx_atomic_mutex
*
* Do we need a lock to protect the peer field in the conn structure?
* conn->peer was previously a constant for all intents and so has no
if (tservice->beforeProc)
(*tservice->beforeProc) (call);
- code = call->conn->service->executeRequestProc(call);
+ code = tservice->executeRequestProc(call);
if (tservice->afterProc)
(*tservice->afterProc) (call, code);
afs_uint32 skew = 0;
int nbytes;
int missing;
- int backedOff = 0;
int acked;
int nNacked = 0;
int newAckCount = 0;
* timeout value for future packets until a successful response
* is received for an initial transmission.
*/
- if (missing && !backedOff) {
+ if (missing && !peer->backedOff) {
struct clock c = peer->timeout;
struct clock max_to = {3, 0};
clock_Add(&peer->timeout, &c);
if (clock_Gt(&peer->timeout, &max_to))
peer->timeout = max_to;
- backedOff = 1;
+ peer->backedOff = 1;
}
/* If packet isn't yet acked, and it has been transmitted at least
rxi_ClearReceiveQueue(call);
/* why init the queue if you just emptied it? queue_Init(&call->rq); */
- if (call->currentPacket) {
-#ifdef RX_TRACK_PACKETS
- call->currentPacket->flags &= ~RX_PKTFLAG_CP;
- call->currentPacket->flags |= RX_PKTFLAG_IOVQ;
-#endif
- queue_Prepend(&call->iovq, call->currentPacket);
-#ifdef RXDEBUG_PACKET
- call->iovqc++;
-#endif /* RXDEBUG_PACKET */
- call->currentPacket = (struct rx_packet *)0;
- }
- call->curlen = call->nLeft = call->nFree = 0;
-
-#ifdef RXDEBUG_PACKET
- call->iovqc -=
-#endif
- rxi_FreePackets(0, &call->iovq);
call->error = 0;
call->twind = call->conn->twind[call->channel];
peer->nSent += len;
if (resending)
peer->reSends += len;
- if (rx_stats_active)
- rx_MutexAdd(rx_stats.dataPacketsSent, len, rx_stats_mutex);
MUTEX_EXIT(&peer->peer_lock);
+ if (rx_stats_active) {
+ if (resending)
+ rx_MutexAdd(rx_stats.dataPacketsReSent, len, rx_stats_mutex);
+ else
+ rx_MutexAdd(rx_stats.dataPacketsSent, len, rx_stats_mutex);
+ }
+
if (list[len - 1]->header.flags & RX_LAST_PACKET) {
lastPacket = 1;
}
* packet until the congestion window reaches the ack rate. */
if (list[i]->header.serial) {
requestAck = 1;
- if (rx_stats_active)
- rx_MutexIncrement(rx_stats.dataPacketsReSent, rx_stats_mutex);
} else {
/* improved RTO calculation- not Karn */
list[i]->firstSent = *now;
* recent additions.
* Do a dance to avoid blocking after setting now. */
MUTEX_ENTER(&peer->peer_lock);
- retryTime = peer->timeout;
+ retryTime = peer->timeout;
MUTEX_EXIT(&peer->peer_lock);
+
clock_GetTime(&now);
clock_Add(&retryTime, &now);
usenow = now;
clock_Zero(&(peer->timeout));
clock_Addmsec(&(peer->timeout), rtt_timeout);
+ /* Reset the backedOff flag since we just computed a new timeout value */
+ peer->backedOff = 0;
+
dpf(("rxi_ComputeRoundTripTime(call=%d packet=%"AFS_PTR_FMT" rtt=%d ms, srtt=%d ms, rtt_dev=%d ms, timeout=%d.%06d sec)\n",
p->header.callNumber, p, MSEC(rttp), peer->rtt >> 3, peer->rtt_dev >> 2, (peer->timeout.sec), (peer->timeout.usec)));
}
} else {
return;
}
- xferSize = rx_AckDataSize(rx_Window) + RX_HEADER_SIZE;
+ xferSize = rx_AckDataSize(rx_maxSendWindow) + RX_HEADER_SIZE;
break;
default:
* one packet exchange */
if (clock_Gt(&newTO, &peer->timeout)) {
- dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u, ps %u)",
+ dpf(("CONG peer %lx/%u: timeout %d.%06d ==> %ld.%06d (rtt %u)",
ntohl(peer->host), ntohs(peer->port), peer->timeout.sec, peer->timeout.usec,
- newTO.sec, newTO.usec, peer->smRtt, peer->packetSize));
+ newTO.sec, newTO.usec, peer->smRtt));
peer->timeout = newTO;
}
/* Now, convert to the number of full packets that could fit in a
* reasonable fraction of that interval */
minTime /= (peer->smRtt << 1);
+ minTime = MAX(minTime, rx_minPeerTimeout);
xferSize = minTime; /* (make a copy) */
/* Now clamp the size to reasonable bounds. */
if (minTime <= 1)
minTime = 1;
- else if (minTime > rx_Window)
- minTime = rx_Window;
+ else if (minTime > rx_maxSendWindow)
+ minTime = rx_maxSendWindow;
/* if (minTime != peer->maxWindow) {
- dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u, ps %u)",
+ dpf(("CONG peer %lx/%u: windowsize %lu ==> %lu (to %lu.%06lu, rtt %u)",
ntohl(peer->host), ntohs(peer->port), peer->maxWindow, minTime,
- peer->timeout.sec, peer->timeout.usec, peer->smRtt,
- peer->packetSize));
+ peer->timeout.sec, peer->timeout.usec, peer->smRtt));
peer->maxWindow = minTime;
elide... call->twind = minTime;
}
/* Cut back on the peer timeout if it had earlier grown unreasonably.
* Discern this by calculating the timeout necessary for rx_Window
* packets. */
- if ((xferSize > rx_Window) && (peer->timeout.sec >= 3)) {
+ if ((xferSize > rx_maxSendWindow) && (peer->timeout.sec >= 3)) {
/* calculate estimate for transmission interval in milliseconds */
- minTime = rx_Window * peer->smRtt;
+ minTime = rx_maxSendWindow * peer->smRtt;
if (minTime < 1000) {
- dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u, ps %u)",
+ dpf(("CONG peer %lx/%u: cut TO %d.%06d by 0.5 (rtt %u)",
ntohl(peer->host), ntohs(peer->port), peer->timeout.sec,
- peer->timeout.usec, peer->smRtt, peer->packetSize));
+ peer->timeout.usec, peer->smRtt));
newTO.sec = 0; /* cut back on timeout by half a second */
newTO.usec = 500000;
rx_PrintTheseStats(FILE * file, struct rx_statistics *s, int size,
afs_int32 freePackets, char version)
{
-#ifdef RXDEBUG
int i;
if (size != sizeof(struct rx_statistics)) {
#if !defined(AFS_PTHREAD_ENV) && !defined(AFS_USE_GETTIMEOFDAY)
fprintf(file, " %d clock updates\n", clock_nUpdates);
#endif
-#else
- fprintf(file, "ERROR: compiled without RXDEBUG\n");
-#endif
}
/* for backward compatibility */
rx_PrintPeerStats(FILE * file, struct rx_peer *peer)
{
fprintf(file, "Peer %x.%d. " "Burst size %d, " "burst wait %d.%06d.\n",
- ntohl(peer->host), (int)peer->port, (int)peer->burstSize,
+ ntohl(peer->host), (int)ntohs(peer->port), (int)peer->burstSize,
(int)peer->burstWait.sec, (int)peer->burstWait.usec);
fprintf(file,
#define UNLOCK_RX_DEBUG
#endif /* AFS_PTHREAD_ENV */
-#ifdef RXDEBUG
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
static int
MakeDebugCall(osi_socket socket, afs_uint32 remoteAddr, afs_uint16 remotePort,
u_char type, void *inputData, size_t inputLength,
afs_uint16 remotePort, struct rx_debugStats * stat,
afs_uint32 * supportedValues)
{
-#ifndef RXDEBUG
- afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
afs_int32 rc = 0;
struct rx_debugIn in;
stat->nWaited = ntohl(stat->nWaited);
stat->nPackets = ntohl(stat->nPackets);
}
+#else
+ afs_int32 rc = -1;
#endif
return rc;
}
afs_uint16 remotePort, struct rx_statistics * stat,
afs_uint32 * supportedValues)
{
-#ifndef RXDEBUG
- afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
afs_int32 rc = 0;
struct rx_debugIn in;
afs_int32 *lp = (afs_int32 *) stat;
*lp = ntohl(*lp);
}
}
+#else
+ afs_int32 rc = -1;
#endif
return rc;
}
afs_uint16 remotePort, size_t version_length,
char *version)
{
-#ifdef RXDEBUG
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
char a[1] = { 0 };
return MakeDebugCall(socket, remoteAddr, remotePort,
RX_PACKET_TYPE_VERSION, a, 1, version,
struct rx_debugConn * conn,
afs_uint32 * supportedValues)
{
-#ifndef RXDEBUG
- afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
afs_int32 rc = 0;
struct rx_debugIn in;
int i;
conn->epoch = ntohl(conn->epoch);
conn->natMTU = ntohl(conn->natMTU);
}
+#else
+ afs_int32 rc = -1;
#endif
return rc;
}
afs_uint32 debugSupportedValues, struct rx_debugPeer * peer,
afs_uint32 * supportedValues)
{
-#ifndef RXDEBUG
- afs_int32 rc = -1;
-#else
+#if defined(RXDEBUG) || defined(MAKEDEBUGCALL)
afs_int32 rc = 0;
struct rx_debugIn in;
peer->bytesReceived.high = ntohl(peer->bytesReceived.high);
peer->bytesReceived.low = ntohl(peer->bytesReceived.low);
}
+#else
+ afs_int32 rc = -1;
#endif
return rc;
}