#include "rx_null.h"
#include "rx_globals.h"
+#ifdef AFS_PTHREAD_ENV
+#include <pthread.h>
+#define MAX_THREADS 128
+#endif
+
static const char *__progname;
#ifndef HAVE_WARNX
#define DEFAULT_PORT 7009 /* To match tcpdump */
#define DEFAULT_HOST "127.0.0.1"
-#define DEFAULT_BYTES 1000000
+#define DEFAULT_BYTES 1024 * 1024
#define RXPERF_BUFSIZE 512 * 1024
enum { RX_PERF_VERSION = 3 };
afs_int32 rxwrite_size = sizeof(somebuf);
afs_int32 rxread_size = sizeof(somebuf);
+afs_int32 use_rx_readv = 0;
static int
-readbytes(struct rx_call *call, afs_int32 bytes)
+do_readbytes(struct rx_call *call, afs_int32 bytes)
{
+ struct iovec tiov[RX_MAXIOVECS];
afs_int32 size;
+ int tnio;
+ int code;
while (bytes > 0) {
size = rxread_size;
+
if (size > bytes)
size = bytes;
- if (rx_Read(call, somebuf, size) != size)
- return 1;
+ if (use_rx_readv) {
+ if (size > RX_MAX_PACKET_DATA_SIZE)
+ size = RX_MAX_PACKET_DATA_SIZE;
+ code = rx_Readv(call, tiov, &tnio, RX_MAXIOVECS, size);
+ } else
+ code = rx_Read(call, somebuf, size);
+ if (code != size)
+ return 1;
+
bytes -= size;
}
return 0;
}
static int
-sendbytes(struct rx_call *call, afs_int32 bytes)
+do_sendbytes(struct rx_call *call, afs_int32 bytes)
{
afs_int32 size;
{
afs_int32 version;
afs_int32 command;
- afs_uint32 bytes;
- afs_uint32 recvb;
- afs_uint32 sendb;
- afs_uint32 data;
+ afs_int32 bytes;
+ afs_int32 recvb;
+ afs_int32 sendb;
+ afs_int32 data;
afs_uint32 num;
afs_uint32 *readwrite;
afs_uint32 i;
bytes = ntohl(bytes);
DBFPRINT(("reading(%d) ", bytes));
- readbytes(call, bytes);
+ do_readbytes(call, bytes);
data = htonl(RXPERF_MAGIC_COOKIE);
if (rx_Write32(call, &data) != 4) {
sendb = ntohl(sendb);
DBFPRINT(("read(%d) ", recvb));
- if (readbytes(call, recvb)) {
- warnx("readbytes failed");
+ if (do_readbytes(call, recvb)) {
+ warnx("do_readbytes failed");
return -1;
}
DBFPRINT(("send(%d) ", sendb));
- if (sendbytes(call, sendb)) {
+ if (do_sendbytes(call, sendb)) {
warnx("sendbytes failed");
return -1;
}
if (readp) {
DBFPRINT(("read\n"));
- readbytes(call, bytes);
+ do_readbytes(call, bytes);
} else {
- sendbytes(call, bytes);
+ do_sendbytes(call, bytes);
DBFPRINT(("send\n"));
}
}
bytes = ntohl(bytes);
DBFPRINT(("sending(%d) ", bytes));
- sendbytes(call, bytes);
+ do_sendbytes(call, bytes);
data = htonl(RXPERF_MAGIC_COOKIE);
if (rx_Write32(call, &data) != 4) {
*/
static void
-do_server(short port, int nojumbo, int maxmtu)
+do_server(short port, int nojumbo, int maxmtu, int maxwsize, int minpeertimeout,
+ int udpbufsz, int nostats, int hotthread,
+ int minprocs, int maxprocs)
{
struct rx_service *service;
struct rx_securityClass *secureobj;
printf("Can't initialize winsock.\n");
exit(1);
}
- rx_EnableHotThread();
#endif
+ if (hotthread)
+ rx_EnableHotThread();
+
+ if (nostats)
+ rx_enable_stats = 0;
+
+ rx_SetUdpBufSize(udpbufsz);
+
ret = rx_Init(htons(port));
if (ret)
errx(1, "rx_Init failed");
if (maxmtu)
rx_SetMaxMTU(maxmtu);
+ if (maxwsize) {
+ rx_SetMaxReceiveWindow(maxwsize);
+ rx_SetMaxSendWindow(maxwsize);
+ }
+
+ if (minpeertimeout)
+ rx_SetMinPeerTimeout(minpeertimeout);
+
+
get_sec(1, &secureobj, &secureindex);
service =
if (service == NULL)
errx(1, "Cant create server");
- rx_SetMinProcs(service, 2);
- rx_SetMaxProcs(service, 100);
+ rx_SetMinProcs(service, minprocs);
+ rx_SetMaxProcs(service, maxprocs);
+
rx_SetCheckReach(service, 1);
rx_StartServer(1);
+
abort();
}
afs_uint32 num = 0;
afs_uint32 data;
char *ptr;
- char buf[RXPERF_BUFSIZE];
+ char *buf;
*readwrite = malloc(sizeof(afs_uint32) * len);
+ buf = malloc(RXPERF_BUFSIZE);
if (*readwrite == NULL)
err(1, "malloc");
if (fclose(f) == -1)
err(1, "fclose");
+ free(buf);
}
+struct client_data {
+ struct rx_connection *conn;
+ char *filename;
+ int command;
+ afs_int32 times;
+ afs_int32 bytes;
+ afs_int32 sendbytes;
+ afs_int32 readbytes;
+};
-/*
- *
- */
-
-static void
-do_client(const char *server, short port, char *filename, afs_int32 command,
- afs_int32 times, afs_int32 bytes, afs_int32 sendtimes, afs_int32 recvtimes,
- int dumpstats, int nojumbo, int maxmtu)
+static void *
+client_thread( void *vparams)
{
- struct rx_connection *conn;
+ struct client_data *params = (struct client_data *)vparams;
struct rx_call *call;
- afs_uint32 addr;
- struct rx_securityClass *secureobj;
- int secureindex;
afs_int32 data;
- afs_int32 num;
- int ret;
- int i;
+ int i, j;
+ afs_uint32 *readwrite;
int readp = FALSE;
- char stamp[1024];
afs_uint32 size;
+ afs_uint32 num;
- afs_uint32 *readwrite;
-
-#ifdef AFS_NT40_ENV
- if (afs_winsockInit() < 0) {
- printf("Can't initialize winsock.\n");
- exit(1);
- }
- rx_EnableHotThread();
-#endif
-
- addr = str2addr(server);
-
- ret = rx_Init(0);
- if (ret)
- errx(1, "rx_Init failed");
-
- if (nojumbo)
- rx_SetNoJumbo();
-
- if (maxmtu)
- rx_SetMaxMTU(maxmtu);
-
- get_sec(0, &secureobj, &secureindex);
-
- conn = rx_NewConnection(addr, htons(port), RX_SERVER_ID, secureobj, secureindex);
- if (conn == NULL)
- errx(1, "failed to contact server");
-
- sprintf(stamp, "send\t%d times\t%d writes\t%d reads", times, sendtimes,
- recvtimes);
- start_timer();
-
- for (i = 0; i < times; i++) {
+ for (i = 0; i < params->times; i++) {
DBFPRINT(("starting command "));
- call = rx_NewCall(conn);
+ call = rx_NewCall(params->conn);
if (call == NULL)
errx(1, "rx_NewCall failed");
if (rx_Write32(call, &data) != 4)
errx(1, "rx_Write failed to send version (err %d)", rx_Error(call));
- data = htonl(command);
+ data = htonl(params->command);
if (rx_Write32(call, &data) != 4)
errx(1, "rx_Write failed to send command (err %d)", rx_Error(call));
errx(1, "rx_Write failed to send write read (err %d)", rx_Error(call));
- switch (command) {
+ switch (params->command) {
case RX_PERF_RECV:
DBFPRINT(("command "));
- data = htonl(bytes);
+ data = htonl(params->bytes);
if (rx_Write32(call, &data) != 4)
errx(1, "rx_Write failed to send size (err %d)", rx_Error(call));
- DBFPRINT(("sending(%d) ", bytes));
- if (readbytes(call, bytes))
+ DBFPRINT(("sending(%d) ", params->bytes));
+ if (do_readbytes(call, params->bytes))
errx(1, "sendbytes (err %d)", rx_Error(call));
if (rx_Read32(call, &data) != 4)
case RX_PERF_SEND:
DBFPRINT(("command "));
- data = htonl(bytes);
+ data = htonl(params->bytes);
if (rx_Write32(call, &data) != 4)
errx(1, "rx_Write failed to send size (err %d)", rx_Error(call));
- DBFPRINT(("sending(%d) ", bytes));
- if (sendbytes(call, bytes))
+ DBFPRINT(("sending(%d) ", params->bytes));
+ if (do_sendbytes(call, params->bytes))
errx(1, "sendbytes (err %d)", rx_Error(call));
if (rx_Read32(call, &data) != 4)
case RX_PERF_RPC:
DBFPRINT(("commands "));
- data = htonl(sendtimes);
+ data = htonl(params->sendbytes);
if (rx_Write32(call, &data) != 4)
errx(1, "rx_Write failed to send command (err %d)", rx_Error(call));
- data = htonl(recvtimes);
+ data = htonl(params->readbytes);
if (rx_Write32(call, &data) != 4)
errx(1, "rx_Write failed to send command (err %d)", rx_Error(call));
- DBFPRINT(("send(%d) ", sendtimes));
- if (sendbytes(call, sendtimes))
+ DBFPRINT(("send(%d) ", params->sendbytes));
+ if (do_sendbytes(call, params->sendbytes))
errx(1, "sendbytes (err %d)", rx_Error(call));
- DBFPRINT(("recv(%d) ", recvtimes));
- if (readbytes(call, recvtimes))
+ DBFPRINT(("recv(%d) ", params->readbytes));
+ if (do_readbytes(call, params->readbytes))
errx(1, "sendbytes (err %d)", rx_Error(call));
- if (rx_Read32(call, &bytes) != 4)
+ if (rx_Read32(call, &data) != 4)
errx(1, "failed to read result from server (err %d)", rx_Error(call));
- if (bytes != htonl(RXPERF_MAGIC_COOKIE))
+ if (data != htonl(RXPERF_MAGIC_COOKIE))
warn("server send wrong magic cookie in responce");
DBFPRINT(("done\n"));
break;
+
case RX_PERF_FILE:
- readfile(filename, &readwrite, &num);
+ readfile(params->filename, &readwrite, &num);
data = htonl(num);
if (rx_Write32(call, &data) != 4)
!= num * sizeof(afs_uint32))
errx(1, "rx_Write failed to send list (err %d)", rx_Error(call));
- for (i = 0; i < num; i++) {
- if (readwrite[i] == 0)
+ for (j = 0; j < num; j++) {
+ if (readwrite[j] == 0)
readp = !readp;
- size = ntohl(readwrite[i]) * sizeof(afs_uint32);
+ size = ntohl(readwrite[j]) * sizeof(afs_uint32);
if (readp) {
- if (readbytes(call, size))
+ if (do_readbytes(call, size))
errx(1, "sendbytes (err %d)", rx_Error(call));
DBFPRINT(("read\n"));
} else {
- if (sendbytes(call, size))
+ if (do_sendbytes(call, size))
errx(1, "sendbytes (err %d)", rx_Error(call));
DBFPRINT(("send\n"));
}
rx_EndCall(call, 0);
}
+#ifdef AFS_PTHREAD_ENV
+ pthread_exit(NULL);
+#endif
+
+ return NULL;
+}
+
+/*
+ *
+ */
+
+static void
+do_client(const char *server, short port, char *filename, afs_int32 command,
+ afs_int32 times, afs_int32 bytes, afs_int32 sendbytes, afs_int32 readbytes,
+ int dumpstats, int nojumbo, int maxmtu, int maxwsize, int minpeertimeout,
+ int udpbufsz, int nostats, int hotthread, int threads)
+{
+ struct rx_connection *conn;
+ afs_uint32 addr;
+ struct rx_securityClass *secureobj;
+ int secureindex;
+ int ret;
+ char stamp[2048];
+ struct client_data *params;
+
+#ifdef AFS_PTHREAD_ENV
+ int i;
+ pthread_t thread[MAX_THREADS];
+ pthread_attr_t tattr;
+ void *status;
+#endif
+
+ params = malloc(sizeof(struct client_data));
+ memset(params, 0, sizeof(struct client_data));
+
+#ifdef AFS_NT40_ENV
+ if (afs_winsockInit() < 0) {
+ printf("Can't initialize winsock.\n");
+ exit(1);
+ }
+#endif
+
+ if (hotthread)
+ rx_EnableHotThread();
+
+ if (nostats)
+ rx_enable_stats = 0;
+
+ addr = str2addr(server);
+
+ rx_SetUdpBufSize(udpbufsz);
+
+ ret = rx_Init(0);
+ if (ret)
+ errx(1, "rx_Init failed");
+
+ if (nojumbo)
+ rx_SetNoJumbo();
+
+ if (maxmtu)
+ rx_SetMaxMTU(maxmtu);
+
+ if (maxwsize) {
+ rx_SetMaxReceiveWindow(maxwsize);
+ rx_SetMaxSendWindow(maxwsize);
+ }
+
+ if (minpeertimeout)
+ rx_SetMinPeerTimeout(minpeertimeout);
+
+
+ get_sec(0, &secureobj, &secureindex);
+
+ conn = rx_NewConnection(addr, htons(port), RX_SERVER_ID, secureobj, secureindex);
+ if (conn == NULL)
+ errx(1, "failed to contact server");
+
+#ifdef AFS_PTHREAD_ENV
+ pthread_attr_init(&tattr);
+ pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_JOINABLE);
+#endif
+
+ params->conn = conn;
+ params->filename = filename;
+ params->command = command;
+ params->times = times;
+ params->bytes = bytes;
+ params->sendbytes = sendbytes;
+ params->readbytes = readbytes;
+
+ switch (command) {
+ case RX_PERF_RPC:
+ sprintf(stamp, "RPC: threads\t%d, times\t%d, write bytes\t%d, read bytes\t%d",
+ threads, times, sendbytes, readbytes);
+ break;
+ case RX_PERF_RECV:
+ sprintf(stamp, "RECV: threads\t%d, times\t%d, bytes\t%d",
+ threads, times, bytes);
+ break;
+ case RX_PERF_SEND:
+ sprintf(stamp, "SEND: threads\t%d, times\t%d, bytes\t%d",
+ threads, times, bytes);
+ break;
+ case RX_PERF_FILE:
+ sprintf(stamp, "FILE %s: threads\t%d, times\t%d, bytes\t%d",
+ filename, threads, times, bytes);
+ break;
+ }
+
+ start_timer();
+
+#ifdef AFS_PTHREAD_ENV
+ for ( i=0; i<threads; i++)
+ pthread_create(&thread[i], &tattr, client_thread, params);
+#else
+ client_thread(params);
+#endif
+
+#ifdef AFS_PTHREAD_ENV
+ for ( i=0; i<threads; i++)
+ pthread_join(thread[i], &status);
+#endif
+
end_and_print_timer(stamp);
DBFPRINT(("done for good\n"));
rx_PrintPeerStats(stdout, conn->peer);
}
rx_Finalize();
+
+#ifdef AFS_PTHREAD_ENV
+ pthread_attr_destroy(&tattr);
+#endif
+
+ free(params);
}
static void
-usage()
+usage(void)
{
#define COMMON ""
exit(1);
}
+
+
/*
* do argument processing and call networking functions
*/
short port = DEFAULT_PORT;
int nojumbo = 0;
int maxmtu = 0;
+ int nostats = 0;
+ int udpbufsz = 64 * 1024;
+ int hotthreads = 0;
+ int minprocs = 2;
+ int maxprocs = 20;
+ int maxwsize = 0;
+ int minpeertimeout = 0;
char *ptr;
int ch;
- while ((ch = getopt(argc, argv, "r:d:p:w:jm:4")) != -1) {
+ while ((ch = getopt(argc, argv, "r:d:p:P:w:W:HNjm:u:4:s:SV")) != -1) {
switch (ch) {
case 'd':
#ifdef RXDEBUG
errx(1, "%d > sizeof(somebuf) (%d)", rxread_size,
sizeof(somebuf));
break;
+ case 's':
+ minprocs = strtol(optarg, &ptr, 0);
+ if (ptr != 0 && ptr[0] != '\0')
+ errx(1, "can't resolve minprocs");
+ break;
+ case 'S':
+ maxprocs = strtol(optarg, &ptr, 0);
+ if (ptr != 0 && ptr[0] != '\0')
+ errx(1, "can't resolve maxprocs");
+ break;
+ case 'P':
+ minpeertimeout = strtol(optarg, &ptr, 0);
+ if (ptr != 0 && ptr[0] != '\0')
+ errx(1, "can't resolve min peer timeout");
+ break;
case 'p':
- port = strtol(optarg, &ptr, 0);
+ port = (short) strtol(optarg, &ptr, 0);
if (ptr != 0 && ptr[0] != '\0')
errx(1, "can't resolve portname");
break;
case 'j':
nojumbo=1;
break;
+ case 'N':
+ nostats=1;
+ break;
+ case 'H':
+ hotthreads = 1;
+ break;
case 'm':
maxmtu = strtol(optarg, &ptr, 0);
if (ptr && *ptr != '\0')
errx(1, "can't resolve rx maxmtu to use");
break;
+ case 'u':
+ udpbufsz = strtol(optarg, &ptr, 0) * 1024;
+ if (ptr && *ptr != '\0')
+ errx(1, "can't resolve upd buffer size (Kbytes)");
+ break;
+ case 'V':
+ use_rx_readv = 1;
+ break;
+ case 'W':
+ maxwsize = strtol(optarg, &ptr, 0);
+ if (ptr && *ptr != '\0')
+ errx(1, "can't resolve max send/recv window size (packets)");
+ break;
case '4':
RX_IPUDP_SIZE = 28;
break;
if (optind != argc)
usage();
- do_server(port, nojumbo, maxmtu);
+ do_server(port, nojumbo, maxmtu, maxwsize, minpeertimeout, udpbufsz,
+ nostats, hotthreads, minprocs, maxprocs);
return 0;
}
short port = DEFAULT_PORT;
char *filename = NULL;
afs_int32 cmd;
- int sendtimes = 3;
- int recvtimes = 30;
+ int sendbytes = 3;
+ int readbytes = 30;
int times = 100;
int dumpstats = 0;
int nojumbo = 0;
+ int nostats = 0;
int maxmtu = 0;
+ int hotthreads = 0;
+ int threads = 1;
+ int udpbufsz = 64 * 1024;
+ int maxwsize = 0;
+ int minpeertimeout = 0;
char *ptr;
int ch;
cmd = RX_PERF_UNKNOWN;
- while ((ch = getopt(argc, argv, "T:S:R:b:c:d:p:r:s:w:f:Djm:4")) != -1) {
+ while ((ch = getopt(argc, argv, "T:S:R:b:c:d:p:P:r:s:w:W:f:HDNjm:u:4:t:V")) != -1) {
switch (ch) {
case 'b':
bytes = strtol(optarg, &ptr, 0);
errx(1, "compiled without RXDEBUG");
#endif
break;
+ case 'P':
+ minpeertimeout = strtol(optarg, &ptr, 0);
+ if (ptr != 0 && ptr[0] != '\0')
+ errx(1, "can't resolve min peer timeout");
+ break;
case 'p':
- port = strtol(optarg, &ptr, 0);
+ port = (short) strtol(optarg, &ptr, 0);
if (ptr != 0 && ptr[0] != '\0')
errx(1, "can't resolve portname");
break;
if (host == NULL)
err(1, "strdup");
break;
+ case 'V':
+ use_rx_readv = 1;
+ break;
case 'w':
rxwrite_size = strtol(optarg, &ptr, 0);
if (ptr != 0 && ptr[0] != '\0')
errx(1, "%d > sizeof(somebuf) (%d)", rxwrite_size,
sizeof(somebuf));
break;
+ case 'W':
+ maxwsize = strtol(optarg, &ptr, 0);
+ if (ptr && *ptr != '\0')
+ errx(1, "can't resolve max send/recv window size (packets)");
+ break;
case 'T':
times = strtol(optarg, &ptr, 0);
if (ptr && *ptr != '\0')
errx(1, "can't resolve number of times to execute rpc");
break;
case 'S':
- sendtimes = strtol(optarg, &ptr, 0);
+ sendbytes = strtol(optarg, &ptr, 0);
if (ptr && *ptr != '\0')
errx(1, "can't resolve number of bytes to send");
break;
case 'R':
- recvtimes = strtol(optarg, &ptr, 0);
+ readbytes = strtol(optarg, &ptr, 0);
if (ptr && *ptr != '\0')
errx(1, "can't resolve number of bytes to receive");
break;
+ case 't':
+#ifdef AFS_PTHREAD_ENV
+ threads = strtol(optarg, &ptr, 0);
+ if (ptr && *ptr != '\0')
+ errx(1, "can't resolve number of threads to execute");
+ if (threads > MAX_THREADS)
+ errx(1, "too many threads");
+#else
+ errx(1, "Not built for pthreads");
+#endif
+ break;
case 'f':
filename = optarg;
break;
case 'D':
dumpstats = 1;
break;
+ case 'N':
+ nostats = 1;
+ break;
+ case 'H':
+ hotthreads = 1;
+ break;
case 'j':
nojumbo=1;
break;
if (ptr && *ptr != '\0')
errx(1, "can't resolve rx maxmtu to use");
break;
+ case 'u':
+ udpbufsz = strtol(optarg, &ptr, 0) * 1024;
+ if (ptr && *ptr != '\0')
+ errx(1, "can't resolve upd buffer size (Kbytes)");
+ break;
case '4':
RX_IPUDP_SIZE = 28;
break;
}
}
+ if (nostats && dumpstats)
+ errx(1, "cannot set both -N and -D");
+
+ if (threads > 1 && cmd == RX_PERF_FILE)
+ errx(1, "cannot use multiple threads with file command");
+
if (optind != argc)
usage();
if (cmd == RX_PERF_UNKNOWN)
errx(1, "no command given to the client");
- do_client(host, port, filename, cmd, times, bytes, sendtimes,
- recvtimes, dumpstats, nojumbo, maxmtu);
+ do_client(host, port, filename, cmd, times, bytes, sendbytes,
+ readbytes, dumpstats, nojumbo, maxmtu, maxwsize, minpeertimeout,
+ udpbufsz, nostats, hotthreads, threads);
return 0;
}