# include "rxgen_consts.h"
#else /* KERNEL */
# include <sys/types.h>
+# include <string.h>
# include <errno.h>
#ifdef AFS_NT40_ENV
# include <stdlib.h>
# include <netinet/in.h>
# include <sys/time.h>
#endif
-#ifdef HAVE_STRING_H
-#include <string.h>
-#else
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
-#endif
# include "rx.h"
# include "rx_user.h"
# include "rx_clock.h"
rx_StartClientThread(void)
{
#ifdef AFS_PTHREAD_ENV
- int pid;
- pid = (int) pthread_self();
+ pthread_t pid;
+ pid = pthread_self();
#endif /* AFS_PTHREAD_ENV */
}
#endif /* AFS_NT40_ENV */
(*registerProgram) (pid, name);
#endif /* KERNEL */
#endif /* AFS_NT40_ENV */
- rx_ServerProc(); /* Never returns */
+ rx_ServerProc(NULL); /* Never returns */
}
#ifdef RX_ENABLE_TSFPQ
/* no use leaving packets around in this thread's local queue if
conn->peer = rxi_FindPeer(shost, sport, 0, 1);
conn->serviceId = sservice;
conn->securityObject = securityObject;
- /* This doesn't work in all compilers with void (they're buggy), so fake it
- * with VOID */
- conn->securityData = (VOID *) 0;
+ conn->securityData = (void *) 0;
conn->securityIndex = serviceSecurityIndex;
rx_SetConnDeadTime(conn, rx_connDeadTime);
conn->ackRate = RX_FAST_ACK_RATE;
conn->refCount++; /* no lock required since only this thread knows... */
conn->next = rx_connHashTable[hashindex];
rx_connHashTable[hashindex] = conn;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nClientConns++;
- MUTEX_EXIT(&rx_stats_mutex);
-
+ rx_MutexIncrement(rx_stats.nClientConns, rx_stats_mutex);
MUTEX_EXIT(&rx_connHashTable_lock);
USERPRI;
return conn;
conn->peer->refCount--;
MUTEX_EXIT(&rx_peerHashTable_lock);
- MUTEX_ENTER(&rx_stats_mutex);
if (conn->type == RX_SERVER_CONNECTION)
- rx_stats.nServerConns--;
+ rx_MutexDecrement(rx_stats.nServerConns, rx_stats_mutex);
else
- rx_stats.nClientConns--;
- MUTEX_EXIT(&rx_stats_mutex);
-
+ rx_MutexDecrement(rx_stats.nClientConns, rx_stats_mutex);
#ifndef KERNEL
if (conn->specific) {
int i;
USERPRI;
}
+/* Wait for the transmit queue to no longer be busy.
+ * requires the call->lock to be held */
+static void rxi_WaitforTQBusy(struct rx_call *call) {
+ while (call->flags & RX_CALL_TQ_BUSY) {
+ call->flags |= RX_CALL_TQ_WAIT;
+ call->tqWaiters++;
+#ifdef RX_ENABLE_LOCKS
+ osirx_AssertMine(&call->lock, "rxi_WaitforTQ lock");
+ CV_WAIT(&call->cv_tq, &call->lock);
+#else /* RX_ENABLE_LOCKS */
+ osi_rxSleep(&call->tq);
+#endif /* RX_ENABLE_LOCKS */
+ call->tqWaiters--;
+ if (call->tqWaiters == 0) {
+ call->flags &= ~RX_CALL_TQ_WAIT;
+ }
+ }
+}
/* Start a new rx remote procedure call, on the specified connection.
* If wait is set to 1, wait for a free call channel; otherwise return
* 0. Maxtime gives the maximum number of seconds this call may take,
- * after rx_MakeCall returns. After this time interval, a call to any
+ * after rx_NewCall returns. After this time interval, a call to any
* of rx_SendData, rx_ReadData, etc. will fail with RX_CALL_TIMEOUT.
* For fine grain locking, we hold the conn_call_lock in order to
* to ensure that we don't get signalle after we found a call in an active
SPLVAR;
clock_NewTime();
- dpf(("rx_MakeCall(conn %x)\n", conn));
+ dpf(("rx_NewCall(conn %x)\n", conn));
NETPRI;
clock_GetTime(&queueTime);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
/* Now, if TQ wasn't cleared earlier, do it now. */
MUTEX_ENTER(&call->lock);
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
- call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_Start lock4");
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- call->tqWaiters--;
- if (call->tqWaiters == 0) {
- call->flags &= ~RX_CALL_TQ_WAIT;
- }
- }
+ rxi_WaitforTQBusy(call);
if (call->flags & RX_CALL_TQ_CLEARME) {
rxi_ClearTransmitQueue(call, 0);
queue_Init(&call->tq);
MUTEX_EXIT(&call->lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ dpf(("rx_NewCall(call %x)\n", call));
return call;
}
return 0;
}
+/* Set configuration options for all of a service's security objects */
+
+afs_int32
+rx_SetSecurityConfiguration(struct rx_service *service,
+ rx_securityConfigVariables type,
+ void *value)
+{
+ int i;
+ for (i = 0; i<service->nSecurityObjects; i++) {
+ if (service->securityObjects[i]) {
+ RXS_SetConfiguration(service->securityObjects[i], NULL, type,
+ value, NULL);
+ }
+ }
+ return 0;
+}
+
struct rx_service *
rx_NewService(u_short port, u_short serviceId, char *serviceName,
struct rx_securityClass **securityObjects, int nSecurityObjects,
void
rx_SetArrivalProc(register struct rx_call *call,
register void (*proc) (register struct rx_call * call,
- register VOID * mh,
+ register void * mh,
register int index),
- register VOID * handle, register int arg)
+ register void * handle, register int arg)
{
call->arrivalProc = proc;
call->arrivalProcHandle = handle;
register struct rx_call *nxp; /* Next call pointer, for queue_Scan */
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
+ dpf(("rxi_NewCall(conn %x, channel %d)\n", conn, channel));
+
/* Grab an existing call structure, or allocate a new one.
* Existing call structures are assumed to have been left reset by
* rxi_FreeCall */
call = queue_First(&rx_freeCallQueue, rx_call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
queue_Remove(call);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nFreeCallStructs--;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexDecrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
MUTEX_EXIT(&rx_freeCallQueue_lock);
MUTEX_ENTER(&call->lock);
CLEAR_CALL_QUEUE_LOCK(call);
CV_INIT(&call->cv_rq, "call rq", CV_DEFAULT, 0);
CV_INIT(&call->cv_tq, "call tq", CV_DEFAULT, 0);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nCallStructs++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
/* Initialize once-only items */
queue_Init(&call->tq);
queue_Init(&call->rq);
#else /* AFS_GLOBAL_RXLOCK_KERNEL */
queue_Append(&rx_freeCallQueue, call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nFreeCallStructs++;
- MUTEX_EXIT(&rx_stats_mutex);
-
+ rx_MutexIncrement(rx_stats.nFreeCallStructs, rx_stats_mutex);
MUTEX_EXIT(&rx_freeCallQueue_lock);
/* Destroy the connection if it was previously slated for
{
register char *p;
- MUTEX_ENTER(&rx_stats_mutex);
- rxi_Alloccnt++;
- rxi_Allocsize += (afs_int32)size;
- MUTEX_EXIT(&rx_stats_mutex);
-
+ rx_MutexAdd1Increment2(rxi_Allocsize, (afs_int32)size, rxi_Alloccnt, rx_stats_mutex);
p = (char *)osi_Alloc(size);
if (!p)
void
rxi_Free(void *addr, register size_t size)
{
- MUTEX_ENTER(&rx_stats_mutex);
- rxi_Alloccnt--;
- rxi_Allocsize -= (afs_int32)size;
- MUTEX_EXIT(&rx_stats_mutex);
-
+ rx_MutexAdd1Decrement2(rxi_Allocsize, -(afs_int32)size, rxi_Alloccnt, rx_stats_mutex);
osi_Free(addr, size);
}
pp->next = rx_peerHashTable[hashIndex];
rx_peerHashTable[hashIndex] = pp;
rxi_InitPeerParams(pp);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nPeerStructs++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.nPeerStructs, rx_stats_mutex);
}
}
if (pp && create) {
/* XXXX Connection timeout? */
if (service->newConnProc)
(*service->newConnProc) (conn);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nServerConns++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.nServerConns, rx_stats_mutex);
}
MUTEX_ENTER(&conn->conn_data_lock);
* then, since this is a client connection we're getting data for
* it must be for the previous call.
*/
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.spuriousPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
MUTEX_EXIT(&conn->conn_data_lock);
if (type == RX_SERVER_CONNECTION) { /* We're the server */
if (np->header.callNumber < currentCallNumber) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.spuriousPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
if (call)
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
MUTEX_EXIT(&conn->conn_data_lock);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nBusies++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.nBusies, rx_stats_mutex);
return tp;
}
rxi_KeepAliveOn(call);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
MUTEX_EXIT(&conn->conn_data_lock);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nBusies++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.nBusies, rx_stats_mutex);
return tp;
}
rxi_KeepAliveOn(call);
/* Ignore all incoming acknowledgements for calls in DALLY state */
if (call && (call->state == RX_STATE_DALLY)
&& (np->header.type == RX_PACKET_TYPE_ACK)) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.ignorePacketDally++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.ignorePacketDally, rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
if (call) {
MUTEX_EXIT(&call->lock);
/* Ignore anything that's not relevant to the current call. If there
* isn't a current call, then no packet is relevant. */
if (!call || (np->header.callNumber != currentCallNumber)) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.spuriousPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
#ifdef RX_ENABLE_LOCKS
if (call) {
MUTEX_EXIT(&call->lock);
* XXX interact badly with the server-restart detection
* XXX code in receiveackpacket. */
if (ntohl(rx_GetInt32(np, FIRSTACKOFFSET)) < call->tfirst) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.spuriousPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.spuriousPacketsRead, rx_stats_mutex);
MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount--;
int isFirst;
struct rx_packet *tnp;
struct clock when;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dataPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dataPacketsRead, rx_stats_mutex);
#ifdef KERNEL
/* If there are no packet buffers, drop this new packet, unless we can find
MUTEX_ENTER(&rx_freePktQ_lock);
rxi_NeedMorePackets = TRUE;
MUTEX_EXIT(&rx_freePktQ_lock);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.noPacketBuffersOnRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.nPacketBuffersOnRead, rx_stats_mutex);
call->rprev = np->header.serial;
rxi_calltrace(RX_TRACE_DROP, call);
dpf(("packet %x dropped on receipt - quota problems", np));
/* Check to make sure it is not a duplicate of one already queued */
if (queue_IsNotEmpty(&call->rq)
&& queue_First(&call->rq, rx_packet)->header.seq == seq) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dupPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
dpf(("packet %x dropped on receipt - duplicate", np));
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
/* If the new packet's sequence number has been sent to the
* application already, then this is a duplicate */
if (seq < call->rnext) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dupPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
0, queue_Scan(&call->rq, tp, nxp, rx_packet)) {
/*Check for duplicate packet */
if (seq == tp->header.seq) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dupPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dupPacketsRead, rx_stats_mutex);
rxevent_Cancel(call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE,
/* We need to send an ack of the packet is out of sequence,
* or if an ack was requested by the peer. */
- if (seq != prev + 1 || missing || (flags & RX_REQUEST_ACK)) {
+ if (seq != prev + 1 || missing) {
ackNeeded = RX_ACK_OUT_OF_SEQUENCE;
- }
+ } else if (flags & RX_REQUEST_ACK) {
+ ackNeeded = RX_ACK_REQUESTED;
+ }
/* Acknowledge the last packet for each call */
if (flags & RX_LAST_PACKET) {
u_short maxMTU = 0; /* Set if peer supports AFS 3.4a jumbo datagrams */
int maxDgramPackets = 0; /* Set if peer supports AFS 3.5 jumbo datagrams */
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.ackPacketsRead++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.ackPacketsRead, rx_stats_mutex);
ap = (struct rx_ackPacket *)rx_DataOf(np);
nbytes = rx_Contiguous(np) - (int)((ap->acks) - (u_char *) ap);
if (nbytes < 0)
return np;
}
call->flags |= RX_CALL_FAST_RECOVER_WAIT;
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
- call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_Start lock2");
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- call->tqWaiters--;
- if (call->tqWaiters == 0)
- call->flags &= ~RX_CALL_TQ_WAIT;
- }
+ rxi_WaitforTQBusy(call);
MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
}
}
conn->error = error;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.fatalErrors++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.fatalErrors, rx_stats_mutex);
}
}
register struct rx_peer *peer;
struct rx_packet *packet;
+ dpf(("rxi_ResetCall(call %x, newcall %d)\n", call, newcall));
+
/* Notify anyone who is waiting for asynchronous packet arrival */
if (call->arrivalProc) {
(*call->arrivalProc) (call, call->arrivalProcHandle,
nbytes -= p->wirevec[i].iov_len;
}
}
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.ackPacketsSent++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.ackPacketsSent, rx_stats_mutex);
#ifndef RX_ENABLE_TSFPQ
if (!optionalPacket)
rxi_FreePacket(p);
peer->nSent += len;
if (resending)
peer->reSends += len;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dataPacketsSent += len;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dataPacketsSent, rx_stats_mutex);
MUTEX_EXIT(&peer->peer_lock);
if (list[len - 1]->header.flags & RX_LAST_PACKET) {
* packet until the congestion window reaches the ack rate. */
if (list[i]->header.serial) {
requestAck = 1;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dataPacketsReSent++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dataPacketsReSent, rx_stats_mutex);
} else {
/* improved RTO calculation- not Karn */
list[i]->firstSent = *now;
peer->nSent++;
if (resending)
peer->reSends++;
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.dataPacketsSent++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.dataPacketsSent, rx_stats_mutex);
MUTEX_EXIT(&peer->peer_lock);
/* Tag this packet as not being the last in this group,
return;
}
call->flags |= RX_CALL_FAST_RECOVER_WAIT;
- while (call->flags & RX_CALL_TQ_BUSY) {
- call->flags |= RX_CALL_TQ_WAIT;
- call->tqWaiters++;
-#ifdef RX_ENABLE_LOCKS
- osirx_AssertMine(&call->lock, "rxi_Start lock1");
- CV_WAIT(&call->cv_tq, &call->lock);
-#else /* RX_ENABLE_LOCKS */
- osi_rxSleep(&call->tq);
-#endif /* RX_ENABLE_LOCKS */
- call->tqWaiters--;
- if (call->tqWaiters == 0)
- call->flags &= ~RX_CALL_TQ_WAIT;
- }
+ rxi_WaitforTQBusy(call);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
call->flags |= RX_CALL_FAST_RECOVER;
}
if (call->error) {
#ifdef AFS_GLOBAL_RXLOCK_KERNEL
- MUTEX_ENTER(&rx_stats_mutex);
- rx_tq_debug.rxi_start_in_error++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_tq_debug.rxi_start_in_error, rx_stats_mutex);
#endif
return;
}
osi_Panic("rxi_Start: xmit queue clobbered");
}
if (p->flags & RX_PKTFLAG_ACKED) {
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.ignoreAckedPacket++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_stats.ignoreAckedPacket, rx_stats_mutex);
continue; /* Ignore this packet if it has been acknowledged */
}
* the time to reset the call. This will also inform the using
* process that the call is in an error state.
*/
- MUTEX_ENTER(&rx_stats_mutex);
- rx_tq_debug.rxi_start_aborted++;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexIncrement(rx_tq_debug.rxi_start_aborted, rx_stats_mutex);
call->flags &= ~RX_CALL_TQ_BUSY;
if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
(char *)&error, sizeof(error), 0);
rxi_FreePacket(packet);
}
+ CALL_RELE(call, RX_CALL_REFCOUNT_ABORT);
MUTEX_EXIT(&call->lock);
}
rxi_rpc_peer_stat_cnt -= num_funcs;
}
rxi_FreePeer(peer);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nPeerStructs--;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
if (peer == *peer_ptr) {
*peer_ptr = next;
prev = next;
void *outputData, size_t outputLength)
{
static afs_int32 counter = 100;
- time_t endTime;
+ time_t waitTime, waitCount, startTime;
struct rx_header theader;
char tbuffer[1500];
register afs_int32 code;
- struct timeval tv;
+ struct timeval tv_now, tv_wake, tv_delta;
struct sockaddr_in taddr, faddr;
int faddrLen;
fd_set imask;
register char *tp;
- endTime = time(0) + 20; /* try for 20 seconds */
+ startTime = time(0);
+ waitTime = 1;
+ waitCount = 5;
LOCK_RX_DEBUG;
counter++;
UNLOCK_RX_DEBUG;
(struct sockaddr *)&taddr, sizeof(struct sockaddr_in));
/* see if there's a packet available */
- FD_ZERO(&imask);
- FD_SET(socket, &imask);
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- code = select((int)(socket + 1), &imask, 0, 0, &tv);
- if (code == 1 && FD_ISSET(socket, &imask)) {
- /* now receive a packet */
- faddrLen = sizeof(struct sockaddr_in);
- code =
- recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
- (struct sockaddr *)&faddr, &faddrLen);
-
- if (code > 0) {
- memcpy(&theader, tbuffer, sizeof(struct rx_header));
- if (counter == ntohl(theader.callNumber))
- break;
+ gettimeofday(&tv_wake,0);
+ tv_wake.tv_sec += waitTime;
+ for (;;) {
+ FD_ZERO(&imask);
+ FD_SET(socket, &imask);
+ tv_delta.tv_sec = tv_wake.tv_sec;
+ tv_delta.tv_usec = tv_wake.tv_usec;
+ gettimeofday(&tv_now, 0);
+
+ if (tv_delta.tv_usec < tv_now.tv_usec) {
+ /* borrow */
+ tv_delta.tv_usec += 1000000;
+ tv_delta.tv_sec--;
+ }
+ tv_delta.tv_usec -= tv_now.tv_usec;
+
+ if (tv_delta.tv_sec < tv_now.tv_sec) {
+ /* time expired */
+ break;
+ }
+ tv_delta.tv_sec -= tv_now.tv_sec;
+
+ code = select(socket + 1, &imask, 0, 0, &tv_delta);
+ if (code == 1 && FD_ISSET(socket, &imask)) {
+ /* now receive a packet */
+ faddrLen = sizeof(struct sockaddr_in);
+ code =
+ recvfrom(socket, tbuffer, sizeof(tbuffer), 0,
+ (struct sockaddr *)&faddr, &faddrLen);
+
+ if (code > 0) {
+ memcpy(&theader, tbuffer, sizeof(struct rx_header));
+ if (counter == ntohl(theader.callNumber))
+ goto success;
+ continue;
+ }
}
+ break;
}
/* see if we've timed out */
- if (endTime < time(0))
- return -1;
+ if (!--waitCount) {
+ return -1;
+ }
+ waitTime <<= 1;
}
+
+ success:
code -= sizeof(struct rx_header);
if (code > outputLength)
code = outputLength;
}
next = peer->next;
rxi_FreePeer(peer);
- MUTEX_ENTER(&rx_stats_mutex);
- rx_stats.nPeerStructs--;
- MUTEX_EXIT(&rx_stats_mutex);
+ rx_MutexDecrement(rx_stats.nPeerStructs, rx_stats_mutex);
}
}
}
int isServer)
{
+ if (!(rxi_monitor_peerStats || rxi_monitor_processStats))
+ return;
+
MUTEX_ENTER(&rx_rpc_stats);
MUTEX_ENTER(&peer->peer_lock);