# include <stdlib.h>
# include <fcntl.h>
# include <afs/afsutil.h>
+# include <WINNT\afsreg.h>
#else
# include <sys/socket.h>
# include <sys/file.h>
#endif /* KERNEL */
char *htable, *ptable;
int tmp_status;
-
-#if defined(AFS_DJGPP_ENV) && !defined(DEBUG)
- __djgpp_set_quiet_socket(1);
-#endif
-
+
SPLVAR;
-
+
INIT_PTHREAD_LOCKS;
LOCK_RX_INIT;
if (rxinit_status == 0) {
UNLOCK_RX_INIT;
return tmp_status; /* Already started; return previous error code. */
}
+#ifdef RXDEBUG
+ rxi_DebugInit();
+#endif
#ifdef AFS_NT40_ENV
if (afs_winsockInit() < 0)
return -1;
#endif
-
+
#ifndef KERNEL
/*
 * Initialize anything necessary to provide a non-preemptive threading
*/
rxi_InitializeThreadSupport();
#endif
-
+
/* Allocate and initialize a socket for client and perhaps server
* connections. */
-
+
rx_socket = rxi_GetHostUDPSocket(host, (u_short) port);
if (rx_socket == OSI_NULLSOCKET) {
UNLOCK_RX_INIT;
if (!uniprocessor)
rx_sleepLock = alloc_spinlock(LAST_HELD_ORDER - 10, "rx_sleepLock");
#endif /* KERNEL && AFS_HPUX110_ENV */
-#else /* RX_ENABLE_LOCKS */
-#if defined(KERNEL) && defined(AFS_GLOBAL_SUNLOCK) && !defined(AFS_HPUX_ENV) && !defined(AFS_OBSD_ENV)
- mutex_init(&afs_rxglobal_lock, "afs_rxglobal_lock", MUTEX_DEFAULT, NULL);
-#endif /* AFS_GLOBAL_SUNLOCK */
#endif /* RX_ENABLE_LOCKS */
rxi_nCalls = 0;
SPLVAR;
clock_NewTime();
- dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", shost, sport, sservice, securityObject, serviceSecurityIndex));
+ dpf(("rx_NewConnection(host %x, port %u, service %u, securityObject %x, serviceSecurityIndex %d)\n", ntohl(shost), ntohs(sport), sservice, securityObject, serviceSecurityIndex));
/* Vasilsi said: "NETPRI protects Cid and Alloc", but can this be true in
* the case of kmem_alloc? */
USERPRI;
}
+/* Wait for the transmit queue to no longer be busy.
+ * requires the call->lock to be held */
+static void rxi_WaitforTQBusy(struct rx_call *call) {
+ while (call->flags & RX_CALL_TQ_BUSY) {
+ call->flags |= RX_CALL_TQ_WAIT;
+ call->tqWaiters++;
+#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
+ CV_WAIT(&call->cv_tq, &call->lock);
+#else /* RX_ENABLE_LOCKS */
+ osi_rxSleep(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ if (call->tqWaiters == 0) {
+ call->flags &= ~RX_CALL_TQ_WAIT;
+ }
+ }
+}
/* Start a new rx remote procedure call, on the specified connection.
* If wait is set to 1, wait for a free call channel; otherwise return
* 0. Maxtime gives the maximum number of seconds this call may take,
- * after rx_MakeCall returns. After this time interval, a call to any
+ * after rx_NewCall returns. After this time interval, a call to any
* of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
* For fine grain locking, we hold the conn_call_lock in order to
 * ensure that we don't get signalled after we found a call in an active
SPLVAR;
clock_NewTime();
- dpf(("rx_MakeCall(conn %x)\n", conn));
+ dpf(("rx_NewCall(conn %x)\n", conn));
NETPRI;
clock_GetTime(&queueTime);
/* Client is initially in send mode */
call->state = RX_STATE_ACTIVE;
- call->mode = RX_MODE_SENDING;
-
+ call->error = conn->error;
+ if (call->error)
+ call->mode = RX_MODE_ERROR;
+ else
+ call->mode = RX_MODE_SENDING;
+
/* remember start time for call in case we have hard dead time limit */
call->queueTime = queueTime;
clock_GetTime(&call->startTime);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* Now, if TQ wasn't cleared earlier, do it now. */
MUTEX_ENTER(&call->lock);
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WaitforTQBusy(call);
if (call->flags & RX_CALL_TQ_CLEARME) {
rxi_ClearTransmitQueue(call, 0);
queue_Init(&call->tq);
MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ dpf(("rx_NewCall(call %x)\n", call));
return call;
}
service name might be used for probing for
statistics) */
struct rx_service *
-rx_NewService(u_short port, u_short serviceId, char *serviceName,
- struct rx_securityClass **securityObjects, int nSecurityObjects,
- afs_int32(*serviceProc) (struct rx_call * acall))
+rx_NewServiceHost(afs_uint32 host, u_short port, u_short serviceId,
+ char *serviceName, struct rx_securityClass **securityObjects,
+ int nSecurityObjects,
+ afs_int32(*serviceProc) (struct rx_call * acall))
{
osi_socket socket = OSI_NULLSOCKET;
register struct rx_service *tservice;
for (i = 0; i < RX_MAX_SERVICES; i++) {
register struct rx_service *service = rx_services[i];
if (service) {
- if (port == service->servicePort) {
+ if (port == service->servicePort && host == service->serviceHost) {
if (service->serviceId == serviceId) {
/* The identical service has already been
* installed; if the caller was intending to
}
service = tservice;
service->socket = socket;
+ service->serviceHost = host;
service->servicePort = port;
service->serviceId = serviceId;
service->serviceName = serviceName;
return 0;
}
+struct rx_service *
+rx_NewService(u_short port, u_short serviceId, char *serviceName,
+ struct rx_securityClass **securityObjects, int nSecurityObjects,
+ afs_int32(*serviceProc) (struct rx_call * acall))
+{
+ return rx_NewServiceHost(htonl(INADDR_ANY), port, serviceId, serviceName, securityObjects, nSecurityObjects, serviceProc);
+}
+
/* Generic request processing loop. This routine should be called
* by the implementation dependent rx_ServerProc. If socketp is
* non-null, it will be set to the file descriptor that this thread
{
register struct rx_connection *conn = call->conn;
register struct rx_service *service;
- register struct rx_packet *tp; /* Temporary packet pointer */
- register struct rx_packet *nxp; /* Next packet pointer, for queue_Scan */
afs_int32 error;
SPLVAR;
- dpf(("rx_EndCall(call %x)\n", call));
+
+
+ dpf(("rx_EndCall(call %x rc %d error %d abortCode %d)\n", call, rc, call->error, call->abortCode));
NETPRI;
MUTEX_ENTER(&call->lock);
}
rxi_flushtrace();
+#ifdef AFS_NT40_ENV
+ afs_winsockCleanup();
+#endif
+
rxinit_status = 1;
UNLOCK_RX_INIT;
}
register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ dpf(("rxi_NewCall(conn %x, channel %d)\n", conn, channel));
+
/* Grab an existing call structure, or allocate a new one.
* Existing call structures are assumed to have been left reset by
* rxi_FreeCall */
MUTEX_ENTER(&rx_stats_mutex);
rxi_Alloccnt++;
- rxi_Allocsize += size;
+ rxi_Allocsize += (afs_int32)size;
MUTEX_EXIT(&rx_stats_mutex);
p = (char *)osi_Alloc(size);
{
MUTEX_ENTER(&rx_stats_mutex);
rxi_Alloccnt--;
- rxi_Allocsize -= size;
+ rxi_Allocsize -= (afs_int32)size;
MUTEX_EXIT(&rx_stats_mutex);
osi_Free(addr, size);
* structure hanging off a connection structure */
struct rx_peer *
rxi_FindPeer(register afs_uint32 host, register u_short port,
- struct rx_peer *origPeer, int create)
+ struct rx_peer *origPeer, int create)
{
register struct rx_peer *pp;
int hashIndex;
MUTEX_ENTER(&rx_peerHashTable_lock);
for (pp = rx_peerHashTable[hashIndex]; pp; pp = pp->next) {
if ((pp->host == host) && (pp->port == port))
- break;
+ break;
}
if (!pp) {
- if (create) {
- pp = rxi_AllocPeer(); /* This bzero's *pp */
- pp->host = host; /* set here or in InitPeerParams is zero */
+ if (create) {
+ pp = rxi_AllocPeer(); /* This bzero's *pp */
+ pp->host = host; /* set here or in InitPeerParams is zero */
pp->port = port;
MUTEX_INIT(&pp->peer_lock, "peer_lock", MUTEX_DEFAULT, 0);
queue_Init(&pp->congestionQueue);
packetType = (np->header.type > 0 && np->header.type < RX_N_PACKET_TYPES)
? rx_packetTypes[np->header.type - 1] : "*UNKNOWN*";
dpf(("R %d %s: %x.%d.%d.%d.%d.%d.%d flags %d, packet %x",
- np->header.serial, packetType, host, port, np->header.serviceId,
+ np->header.serial, packetType, ntohl(host), ntohs(port), np->header.serviceId,
np->header.epoch, np->header.cid, np->header.callNumber,
np->header.seq, np->header.flags, np));
#endif
/* Check for connection-only requests (i.e. not call specific). */
if (np->header.callNumber == 0) {
switch (np->header.type) {
- case RX_PACKET_TYPE_ABORT:
+ case RX_PACKET_TYPE_ABORT: {
/* What if the supplied error is zero? */
- rxi_ConnectionError(conn, ntohl(rx_GetInt32(np, 0)));
+ afs_int32 errcode = ntohl(rx_GetInt32(np, 0));
+ dpf(("rxi_ReceivePacket ABORT rx_GetInt32 = %d", errcode));
+ rxi_ConnectionError(conn, errcode);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
MUTEX_EXIT(&conn->conn_data_lock);
return np;
+ }
case RX_PACKET_TYPE_CHALLENGE:
tnp = rxi_ReceiveChallengePacket(conn, np, 1);
MUTEX_ENTER(&conn->conn_data_lock);
call = rxi_NewCall(conn, channel);
MUTEX_EXIT(&conn->conn_call_lock);
*call->callNumber = np->header.callNumber;
+ if (np->header.callNumber == 0)
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
hzero(call->bytesSent);
hzero(call->bytesRcvd);
+ /*
+ * If the number of queued calls exceeds the overload
+ * threshold then abort this call.
+ */
+ if ((rx_BusyThreshold > 0) && (rx_nWaiting > rx_BusyThreshold)) {
+ struct rx_packet *tp;
+
+ rxi_CallError(call, rx_BusyError);
+ tp = rxi_SendCallAbort(call, np, 1, 0);
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->refCount--;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ MUTEX_ENTER(&rx_stats_mutex);
+ rx_stats.nBusies++;
+ MUTEX_EXIT(&rx_stats_mutex);
+ return tp;
+ }
rxi_KeepAliveOn(call);
} else if (np->header.callNumber != currentCallNumber) {
/* Wait until the transmit queue is idle before deciding
while ((call->state == RX_STATE_ACTIVE)
&& (call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_WAIT;
+ call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start lock3");
CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ if (call->tqWaiters == 0)
+ call->flags &= ~RX_CALL_TQ_WAIT;
}
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* If the new call cannot be taken right now send a busy and set
}
rxi_ResetCall(call, 0);
*call->callNumber = np->header.callNumber;
+ if (np->header.callNumber == 0)
+ dpf(("RecPacket call 0 %d %s: %x.%u.%u.%u.%u.%u.%u flags %d, packet %lx resend %d.%0.3d len %d", np->header.serial, rx_packetTypes[np->header.type - 1], ntohl(conn->peer->host), ntohs(conn->peer->port), np->header.serial, np->header.epoch, np->header.cid, np->header.callNumber, np->header.seq, np->header.flags, (unsigned long)np, np->retryTime.sec, np->retryTime.usec / 1000, np->length));
+
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
hzero(call->bytesSent);
}
np = rxi_ReceiveAckPacket(call, np, 1);
break;
- case RX_PACKET_TYPE_ABORT:
- /* An abort packet: reset the connection, passing the error up to
- * the user */
+ case RX_PACKET_TYPE_ABORT: {
+ /* An abort packet: reset the call, passing the error up to the user. */
/* What if error is zero? */
- rxi_CallError(call, ntohl(*(afs_int32 *) rx_DataOf(np)));
- break;
+ /* What if the error is -1? the application will treat it as a timeout. */
+ afs_int32 errdata = ntohl(*(afs_int32 *) rx_DataOf(np));
+ dpf(("rxi_ReceivePacket ABORT rx_DataOf = %d", errdata));
+ rxi_CallError(call, errdata);
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_data_lock);
+ conn->refCount--;
+ MUTEX_EXIT(&conn->conn_data_lock);
+ return np; /* xmitting; drop packet */
+ }
case RX_PACKET_TYPE_BUSY:
/* XXXX */
break;
rxi_SetAcksInTransmitQueue(call);
break;
#else /* RX_ENABLE_LOCKS */
+ MUTEX_EXIT(&call->lock);
+ MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
+ MUTEX_EXIT(&conn->conn_data_lock);
return np; /* xmitting; drop packet */
#endif /* RX_ENABLE_LOCKS */
}
/* We need to send an ack of the packet is out of sequence,
* or if an ack was requested by the peer. */
- if (seq != prev + 1 || missing || (flags & RX_REQUEST_ACK)) {
+ if (seq != prev + 1 || missing) {
ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
- }
+ } else if (flags & RX_REQUEST_ACK) {
+ ackNeeded = RX_ACK_REQUESTED;
+ }
/* Acknowledge the last packet for each call */
if (flags & RX_LAST_PACKET) {
MUTEX_EXIT(&conn->conn_data_lock);
}
+static const char *
+rx_ack_reason(int reason)
+{
+ switch (reason) {
+ case RX_ACK_REQUESTED:
+ return "requested";
+ case RX_ACK_DUPLICATE:
+ return "duplicate";
+ case RX_ACK_OUT_OF_SEQUENCE:
+ return "sequence";
+ case RX_ACK_EXCEEDS_WINDOW:
+ return "window";
+ case RX_ACK_NOSPACE:
+ return "nospace";
+ case RX_ACK_PING:
+ return "ping";
+ case RX_ACK_PING_RESPONSE:
+ return "response";
+ case RX_ACK_DELAY:
+ return "delay";
+ case RX_ACK_IDLE:
+ return "idle";
+ default:
+ return "unknown!!";
+ }
+}
+
+
/* rxi_ComputePeerNetStats
*
* Called exclusively by rxi_ReceiveAckPacket to compute network link
rx_stats.ackPacketsRead++;
MUTEX_EXIT(&rx_stats_mutex);
ap = (struct rx_ackPacket *)rx_DataOf(np);
- nbytes = rx_Contiguous(np) - ((ap->acks) - (u_char *) ap);
+ nbytes = rx_Contiguous(np) - (int)((ap->acks) - (u_char *) ap);
if (nbytes < 0)
return np; /* truncated ack packet */
rxi_UpdatePeerReach(conn, call);
#ifdef RXDEBUG
+#ifdef AFS_NT40_ENV
+ if (rxdebug_active) {
+ char msg[512];
+ size_t len;
+
+ len = _snprintf(msg, sizeof(msg),
+ "tid[%d] RACK: reason %s serial %u previous %u seq %u skew %d first %u acks %u space %u ",
+ GetCurrentThreadId(), rx_ack_reason(ap->reason),
+ ntohl(ap->serial), ntohl(ap->previousPacket),
+ (unsigned int)np->header.seq, (unsigned int)skew,
+ ntohl(ap->firstPacket), ap->nAcks, ntohs(ap->bufferSpace) );
+ if (nAcks) {
+ int offset;
+
+ for (offset = 0; offset < nAcks && len < sizeof(msg); offset++)
+ msg[len++] = (ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*');
+ }
+ msg[len++]='\n';
+ msg[len] = '\0';
+ OutputDebugString(msg);
+ }
+#else /* AFS_NT40_ENV */
if (rx_Log) {
fprintf(rx_Log,
"RACK: reason %x previous %u seq %u serial %u skew %d first %u",
}
putc('\n', rx_Log);
}
+#endif /* AFS_NT40_ENV */
#endif
/* Update the outgoing packet skew value to the latest value of
if (serial
&& (tp->header.serial == serial || tp->firstSerial == serial))
rxi_ComputePeerNetStats(call, tp, ap, np);
+ if (!(tp->flags & RX_PKTFLAG_ACKED)) {
+ newAckCount++;
+ }
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* XXX Hack. Because we have to release the global rx lock when sending
* packets (osi_NetSend) we drop all acks while we're traversing the tq
* set the ack bits in the packets and have rxi_Start remove the packets
* when it's done transmitting.
*/
- if (!(tp->flags & RX_PKTFLAG_ACKED)) {
- newAckCount++;
- }
if (call->flags & RX_CALL_TQ_BUSY) {
#ifdef RX_ENABLE_LOCKS
tp->flags |= RX_PKTFLAG_ACKED;
} else {
call->nSoftAcked++;
}
- } else {
+ } else /* RX_ACK_TYPE_NACK */ {
tp->flags &= ~RX_PKTFLAG_ACKED;
missing = 1;
}
/* If the ack packet has a "recommended" size that is less than
* what I am using now, reduce my size to match */
rx_packetread(np, rx_AckDataSize(ap->nAcks) + sizeof(afs_int32),
- sizeof(afs_int32), &tSize);
+ (int)sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize);
peer->natMTU = rxi_AdjustIfMTU(MIN(tSize, peer->ifMTU));
/* Get the maximum packet size to send to this peer */
- rx_packetread(np, rx_AckDataSize(ap->nAcks), sizeof(afs_int32),
+ rx_packetread(np, rx_AckDataSize(ap->nAcks), (int)sizeof(afs_int32),
&tSize);
tSize = (afs_uint32) ntohl(tSize);
tSize = (afs_uint32) MIN(tSize, rx_MyMaxSendSize);
* be unable to accept packets of the size that prior AFS versions would
* send without asking. */
if (peer->maxMTU != tSize) {
+ if (peer->maxMTU > tSize) /* possible cong., maxMTU decreased */
+ peer->congestSeq++;
peer->maxMTU = tSize;
peer->MTU = MIN(tSize, peer->MTU);
call->MTU = MIN(call->MTU, tSize);
- peer->congestSeq++;
}
if (np->length == rx_AckDataSize(ap->nAcks) + 3 * sizeof(afs_int32)) {
/* AFS 3.4a */
rx_packetread(np,
rx_AckDataSize(ap->nAcks) + 2 * sizeof(afs_int32),
- sizeof(afs_int32), &tSize);
+ (int)sizeof(afs_int32), &tSize);
tSize = (afs_uint32) ntohl(tSize); /* peer's receive window, if it's */
if (tSize < call->twind) { /* smaller than our send */
call->twind = tSize; /* window, we must send less... */
sizeof(afs_int32), &tSize);
maxDgramPackets = (afs_uint32) ntohl(tSize);
maxDgramPackets = MIN(maxDgramPackets, rxi_nDgramPackets);
- maxDgramPackets =
- MIN(maxDgramPackets, (int)(peer->ifDgramPackets));
- maxDgramPackets = MIN(maxDgramPackets, tSize);
+ maxDgramPackets = MIN(maxDgramPackets, peer->ifDgramPackets);
+ if (peer->natMTU < peer->ifMTU)
+ maxDgramPackets = MIN(maxDgramPackets, rxi_AdjustDgramPackets(1, peer->natMTU));
if (maxDgramPackets > 1) {
peer->maxDgramPackets = maxDgramPackets;
call->MTU = RX_JUMBOBUFFERSIZE + RX_HEADER_SIZE;
return np;
}
call->flags |= RX_CALL_FAST_RECOVER_WAIT;
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WaitforTQBusy(call);
MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
void
rxi_ClearTransmitQueue(register struct rx_call *call, register int force)
{
+#ifdef AFS_GLOBAL_RXLOCK_KERNEL
register struct rx_packet *p, *tp;
-#ifdef AFS_GLOBAL_RXLOCK_KERNEL
if (!force && (call->flags & RX_CALL_TQ_BUSY)) {
int someAcked = 0;
for (queue_Scan(&call->tq, p, tp, rx_packet)) {
{
if (error) {
register int i;
+
+ dpf(("rxi_ConnectionError conn %x error %d", conn, error));
+
MUTEX_ENTER(&conn->conn_data_lock);
if (conn->challengeEvent)
rxevent_Cancel(conn->challengeEvent, (struct rx_call *)0, 0);
void
rxi_CallError(register struct rx_call *call, afs_int32 error)
{
+ dpf(("rxi_CallError call %x error %d call->error %d", call, error, call->error));
if (call->error)
error = call->error;
+
#ifdef RX_GLOBAL_RXLOCK_KERNEL
- if (!(call->flags & RX_CALL_TQ_BUSY)) {
+ if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) {
rxi_ResetCall(call, 0);
}
#else
register struct rx_peer *peer;
struct rx_packet *packet;
+ dpf(("rxi_ResetCall(call %x, newcall %d)\n", call, newcall));
+
/* Notify anyone who is waiting for asynchronous packet arrival */
if (call->arrivalProc) {
(*call->arrivalProc) (call, call->arrivalProcHandle,
flags = call->flags;
rxi_ClearReceiveQueue(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- if (call->flags & RX_CALL_TQ_BUSY) {
+ if (flags & RX_CALL_TQ_BUSY) {
call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
call->flags |= (flags & RX_CALL_TQ_WAIT);
} else
{
rxi_ClearTransmitQueue(call, 0);
queue_Init(&call->tq);
+ if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
+ dpf(("rcall %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
+ }
call->flags = 0;
+ while (call->tqWaiters) {
+#ifdef RX_ENABLE_LOCKS
+ CV_BROADCAST(&call->cv_tq);
+#else /* RX_ENABLE_LOCKS */
+ osi_rxWakeup(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ }
}
queue_Init(&call->rq);
call->error = 0;
p->header.flags |= RX_CLIENT_INITIATED;
#ifdef RXDEBUG
+#ifdef AFS_NT40_ENV
+ if (rxdebug_active) {
+ char msg[512];
+ size_t len;
+
+ len = _snprintf(msg, sizeof(msg),
+ "tid[%d] SACK: reason %s serial %u previous %u seq %u first %u acks %u space %u ",
+ GetCurrentThreadId(), rx_ack_reason(ap->reason),
+ ntohl(ap->serial), ntohl(ap->previousPacket),
+ (unsigned int)p->header.seq, ntohl(ap->firstPacket),
+ ap->nAcks, ntohs(ap->bufferSpace) );
+ if (ap->nAcks) {
+ int offset;
+
+ for (offset = 0; offset < ap->nAcks && len < sizeof(msg); offset++)
+ msg[len++] = (ap->acks[offset] == RX_ACK_TYPE_NACK ? '-' : '*');
+ }
+ msg[len++]='\n';
+ msg[len] = '\0';
+ OutputDebugString(msg);
+ }
+#else /* AFS_NT40_ENV */
if (rx_Log) {
- fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u",
+ fprintf(rx_Log, "SACK: reason %x previous %u seq %u first %u ",
ap->reason, ntohl(ap->previousPacket),
(unsigned int)p->header.seq, ntohl(ap->firstPacket));
if (ap->nAcks) {
}
putc('\n', rx_Log);
}
+#endif /* AFS_NT40_ENV */
#endif
-
{
register int i, nbytes = p->length;
return;
}
call->flags |= RX_CALL_FAST_RECOVER_WAIT;
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
-#ifdef RX_ENABLE_LOCKS
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- }
+ rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
call->flags |= RX_CALL_FAST_RECOVER;
*/
if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->flags & RX_CALL_TQ_WAIT) {
- call->flags &= ~RX_CALL_TQ_WAIT;
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+ dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start start");
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
rx_tq_debug.rxi_start_aborted++;
MUTEX_EXIT(&rx_stats_mutex);
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->flags & RX_CALL_TQ_WAIT) {
- call->flags &= ~RX_CALL_TQ_WAIT;
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+ dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start middle");
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
* protected by the global lock.
*/
call->flags &= ~RX_CALL_TQ_BUSY;
- if (call->flags & RX_CALL_TQ_WAIT) {
- call->flags &= ~RX_CALL_TQ_WAIT;
+ if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
+ dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_Start end");
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
(char *)&error, sizeof(error), 0);
rxi_FreePacket(packet);
}
+ CALL_RELE(call, RX_CALL_REFCOUNT_ABORT);
MUTEX_EXIT(&call->lock);
}
#endif /* ADAPT_WINDOW */
+#ifdef RXDEBUG
/* Initialize run-time debug tracing.  On Windows this reads the AFS
 * client service's "TraceOption" registry value and enables
 * OutputDebugString tracing when the DEBUGLOG bit is set; on other
 * platforms it is a no-op. */
void
rxi_DebugInit(void)
{
#ifdef AFS_NT40_ENV
#define TRACE_OPTION_DEBUGLOG 4
    HKEY parmKey;
    DWORD dummyLen;
    DWORD TraceOption;
    long code;

    /* Default to off; only the registry can turn tracing on. */
    rxdebug_active = 0;

    code = RegOpenKeyEx(HKEY_LOCAL_MACHINE, AFSREG_CLT_SVC_PARAM_SUBKEY,
			0, KEY_QUERY_VALUE, &parmKey);
    if (code != ERROR_SUCCESS)
	return;

    dummyLen = sizeof(TraceOption);
    code = RegQueryValueEx(parmKey, "TraceOption", NULL, NULL,
			   (BYTE *) &TraceOption, &dummyLen);
    if (code == ERROR_SUCCESS)
	rxdebug_active = (TraceOption & TRACE_OPTION_DEBUGLOG) ? 1 : 0;

    RegCloseKey(parmKey);
#endif /* AFS_NT40_ENV */
}
#ifdef AFS_NT40_ENV
/* Toggle debug-log tracing at run time (Windows only).  Nonzero `on`
 * enables rxdebug output; zero disables it. */
void
rx_DebugOnOff(int on)
{
    rxdebug_active = on;
}
#endif /* AFS_NT40_ENV */
-#ifdef RXDEBUG
/* Don't call this debugging routine directly; use dpf */
void
rxi_DebugPrint(char *format, int a1, int a2, int a3, int a4, int a5, int a6,
int a7, int a8, int a9, int a10, int a11, int a12, int a13,
int a14, int a15)
{
+#ifdef AFS_NT40_ENV
+ char msg[512];
+ char tformat[256];
+ size_t len;
+
+ len = _snprintf(tformat, sizeof(tformat), "tid[%d] %s", GetCurrentThreadId(), format);
+
+ if (len > 0) {
+ len = _snprintf(msg, sizeof(msg)-2,
+ tformat, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
+ a11, a12, a13, a14, a15);
+ if (len > 0) {
+ if (msg[len-1] != '\n') {
+ msg[len] = '\n';
+ msg[len+1] = '\0';
+ }
+ OutputDebugString(msg);
+ }
+ }
+#else
struct clock now;
clock_GetTime(&now);
fprintf(rx_Log, " %u.%.3u:", (unsigned int)now.sec,
fprintf(rx_Log, format, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12,
a13, a14, a15);
putc('\n', rx_Log);
-}
#endif
+}
-#ifdef RXDEBUG
/*
* This function is used to process the rx_stats structure that is local
* to a process as well as an rx_stats structure received from a remote
void *outputData, size_t outputLength)
{
static afs_int32 counter = 100;
- afs_int32 endTime;
+ time_t waitTime, waitCount, startTime, endTime;
struct rx_header theader;
char tbuffer[1500];
register afs_int32 code;
- struct timeval tv;
+ struct timeval tv_now, tv_wake, tv_delta;
struct sockaddr_in taddr, faddr;
int faddrLen;
fd_set imask;
register char *tp;
- endTime = time(0) + 20; /* try for 20 seconds */
+ startTime = time(0);
+ waitTime = 1;
+ waitCount = 5;
LOCK_RX_DEBUG;
counter++;
UNLOCK_RX_DEBUG;
(struct sockaddr *)&taddr, sizeof(struct sockaddr_in));
/* see if there's a packet available */
- FD_ZERO(&imask);
- FD_SET(socket, &imask);
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- code = select(socket + 1, &imask, 0, 0, &tv);
- if (code == 1 && FD_ISSET(socket, &imask)) {
- /* now receive a packet */
- faddrLen = sizeof(struct sockaddr_in);
- code =
- recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
- (struct sockaddr *)&faddr, &faddrLen);
-
- if (code > 0) {
- memcpy(&theader, tbuffer, sizeof(struct rx_header));
- if (counter == ntohl(theader.callNumber))
- break;
+ gettimeofday(&tv_wake,0);
+ tv_wake.tv_sec += waitTime;
+ for (;;) {
+ FD_ZERO(&imask);
+ FD_SET(socket, &imask);
+ tv_delta.tv_sec = tv_wake.tv_sec;
+ tv_delta.tv_usec = tv_wake.tv_usec;
+ gettimeofday(&tv_now, 0);
+
+ if (tv_delta.tv_usec < tv_now.tv_usec) {
+ /* borrow */
+ tv_delta.tv_usec += 1000000;
+ tv_delta.tv_sec--;
+ }
+ tv_delta.tv_usec -= tv_now.tv_usec;
+
+ if (tv_delta.tv_sec < tv_now.tv_sec) {
+ /* time expired */
+ break;
+ }
+ tv_delta.tv_sec -= tv_now.tv_sec;
+
+ code = select(socket + 1, &imask, 0, 0, &tv_delta);
+ if (code == 1 && FD_ISSET(socket, &imask)) {
+ /* now receive a packet */
+ faddrLen = sizeof(struct sockaddr_in);
+ code =
+ recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
+ (struct sockaddr *)&faddr, &faddrLen);
+
+ if (code > 0) {
+ memcpy(&theader, tbuffer, sizeof(struct rx_header));
+ if (counter == ntohl(theader.callNumber))
+ goto success;
+ continue;
+ }
}
+ break;
}
/* see if we've timed out */
- if (endTime < time(0))
- return -1;
+ if (!--waitCount) {
+ return -1;
+ }
+ waitTime <<= 1;
}
+
+ success:
code -= sizeof(struct rx_header);
if (code > outputLength)
code = outputLength;
return 0;
return rxi_rxstat_userok(call);
}
+
#ifdef AFS_NT40_ENV
/*
 * DllMain() -- DLL entry point, invoked by DllMainCRTStartup() in the
 * MSVC runtime (msvcrt.dll) when the library is loaded or unloaded.
 *
 * Note: the system serializes calls to this function.
 */
BOOL WINAPI
DllMain(HINSTANCE dllInstHandle, /* instance handle for this DLL module */
        DWORD reason,            /* reason function is being called */
        LPVOID reserved)         /* reserved for future use */
{
    if (reason == DLL_PROCESS_ATTACH) {
        /* Library is being attached to a process: set up locking. */
        INIT_PTHREAD_LOCKS;
        return TRUE;
    }
    if (reason == DLL_PROCESS_DETACH)
        return TRUE;
    /* Thread attach/detach and anything unexpected: nothing to do. */
    return FALSE;
}
#endif

+