Additional functionality for rxperf
authorJeffrey Altman <jaltman@your-file-system.com>
Wed, 22 Sep 2010 23:28:48 +0000 (16:28 -0700)
committerDerrick Brashear <shadow@dementia.org>
Thu, 23 Sep 2010 09:09:40 +0000 (02:09 -0700)
Add the ability to initiate multiple pthread threads issuing
calls on the same connection to the server.

Add ability to set max window size.

Add ability to adjust min peer timeout.

Change-Id: I896650b033ae0adfa42d3e5831ff799ca1331bd5
Reviewed-on: http://gerrit.openafs.org/2835
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Derrick Brashear <shadow@dementia.org>

src/rx/test/rxperf.c

index efa112e..91ad688 100644 (file)
@@ -83,6 +83,11 @@ nn * We are using getopt since we want it to be possible to link to
 #include "rx_null.h"
 #include "rx_globals.h"
 
+#ifdef AFS_PTHREAD_ENV
+#include <pthread.h>
+#define MAX_THREADS 128
+#endif
+
 static const char *__progname;
 
 #ifndef HAVE_WARNX
@@ -155,7 +160,7 @@ err(int eval, const char *fmt, ...)
 
 #define DEFAULT_PORT 7009      /* To match tcpdump */
 #define DEFAULT_HOST "127.0.0.1"
-#define DEFAULT_BYTES 1000000
+#define DEFAULT_BYTES 1024 * 1024
 #define RXPERF_BUFSIZE 512 * 1024
 
 enum { RX_PERF_VERSION = 3 };
@@ -478,7 +483,9 @@ rxperf_ExecuteRequest(struct rx_call *call)
  */
 
 static void
-do_server(short port, int nojumbo, int maxmtu, int udpbufsz)
+do_server(short port, int nojumbo, int maxmtu, int maxwsize, int minpeertimeout,
+          int udpbufsz, int nostats, int hotthread,
+          int minprocs, int maxprocs)
 {
     struct rx_service *service;
     struct rx_securityClass *secureobj;
@@ -490,9 +497,14 @@ do_server(short port, int nojumbo, int maxmtu, int udpbufsz)
        printf("Can't initialize winsock.\n");
        exit(1);
     }
-    rx_EnableHotThread();
 #endif
 
+    if (hotthread)
+        rx_EnableHotThread();
+
+    if (nostats)
+        rx_enable_stats = 0;
+
     ret = rx_Init(htons(port));
     if (ret)
        errx(1, "rx_Init failed");
@@ -503,6 +515,14 @@ do_server(short port, int nojumbo, int maxmtu, int udpbufsz)
     if (maxmtu)
       rx_SetMaxMTU(maxmtu);
 
+    if (maxwsize) {
+        rx_SetMaxReceiveWindow(maxwsize);
+        rx_SetMaxSendWindow(maxwsize);
+    }
+
+    if (minpeertimeout)
+        rx_SetMinPeerTimeout(minpeertimeout);
+
     rx_SetUdpBufSize(udpbufsz);
 
     get_sec(1, &secureobj, &secureindex);
@@ -513,11 +533,13 @@ do_server(short port, int nojumbo, int maxmtu, int udpbufsz)
     if (service == NULL)
        errx(1, "Cant create server");
 
-    rx_SetMinProcs(service, 2);
-    rx_SetMaxProcs(service, 100);
+    rx_SetMinProcs(service, minprocs);
+    rx_SetMaxProcs(service, maxprocs);
+
     rx_SetCheckReach(service, 1);
 
     rx_StartServer(1);
+
     abort();
 }
 
@@ -571,68 +593,33 @@ readfile(const char *filename, afs_uint32 ** readwrite, afs_uint32 * size)
        err(1, "fclose");
 }
 
-
-/*
- *
- */
+struct client_data {
+    struct rx_connection *conn;
+    char *filename;
+    int command;
+    afs_int32 times;
+    afs_int32 bytes;
+    afs_int32 sendtimes;
+    afs_int32 recvtimes;
+};
 
 static void
-do_client(const char *server, short port, char *filename, afs_int32 command,
-         afs_int32 times, afs_int32 bytes, afs_int32 sendtimes, afs_int32 recvtimes,
-          int dumpstats, int nojumbo, int maxmtu, int udpbufsz)
+client_thread( void *vparams)
 {
-    struct rx_connection *conn;
+    struct client_data *params = (struct client_data *)vparams;
     struct rx_call *call;
-    afs_uint32 addr;
-    struct rx_securityClass *secureobj;
-    int secureindex;
     afs_int32 data;
-    afs_uint32 num;
-    int ret;
-    int i;
+    int i, j;
+    afs_uint32 *readwrite;
     int readp = FALSE;
-    char stamp[1024];
     afs_uint32 size;
+    afs_int32 num;
 
-    afs_uint32 *readwrite;
-
-#ifdef AFS_NT40_ENV
-    if (afs_winsockInit() < 0) {
-       printf("Can't initialize winsock.\n");
-       exit(1);
-    }
-    rx_EnableHotThread();
-#endif
-
-    addr = str2addr(server);
-
-    ret = rx_Init(0);
-    if (ret)
-       errx(1, "rx_Init failed");
-
-    if (nojumbo)
-      rx_SetNoJumbo();
-
-    if (maxmtu)
-      rx_SetMaxMTU(maxmtu);
-
-    rx_SetUdpBufSize(udpbufsz);
-
-    get_sec(0, &secureobj, &secureindex);
-
-    conn = rx_NewConnection(addr, htons(port), RX_SERVER_ID, secureobj, secureindex);
-    if (conn == NULL)
-       errx(1, "failed to contact server");
-
-    sprintf(stamp, "send\t%d times\t%d writes\t%d reads", times, sendtimes,
-           recvtimes);
-    start_timer();
-
-    for (i = 0; i < times; i++) {
+    for (i = 0; i < params->times; i++) {
 
        DBFPRINT(("starting command "));
 
-       call = rx_NewCall(conn);
+       call = rx_NewCall(params->conn);
        if (call == NULL)
            errx(1, "rx_NewCall failed");
 
@@ -640,7 +627,7 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
        if (rx_Write32(call, &data) != 4)
            errx(1, "rx_Write failed to send version (err %d)", rx_Error(call));
 
-       data = htonl(command);
+       data = htonl(params->command);
        if (rx_Write32(call, &data) != 4)
            errx(1, "rx_Write failed to send command (err %d)", rx_Error(call));
 
@@ -652,16 +639,16 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
            errx(1, "rx_Write failed to send write read (err %d)", rx_Error(call));
 
 
-       switch (command) {
+       switch (params->command) {
        case RX_PERF_RECV:
            DBFPRINT(("command "));
 
-           data = htonl(bytes);
+           data = htonl(params->bytes);
            if (rx_Write32(call, &data) != 4)
                errx(1, "rx_Write failed to send size (err %d)", rx_Error(call));
 
-           DBFPRINT(("sending(%d) ", bytes));
-           if (readbytes(call, bytes))
+           DBFPRINT(("sending(%d) ", params->bytes));
+           if (readbytes(call, params->bytes))
                errx(1, "sendbytes (err %d)", rx_Error(call));
 
            if (rx_Read32(call, &data) != 4)
@@ -676,12 +663,12 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
        case RX_PERF_SEND:
            DBFPRINT(("command "));
 
-           data = htonl(bytes);
+           data = htonl(params->bytes);
            if (rx_Write32(call, &data) != 4)
                errx(1, "rx_Write failed to send size (err %d)", rx_Error(call));
 
-           DBFPRINT(("sending(%d) ", bytes));
-           if (sendbytes(call, bytes))
+           DBFPRINT(("sending(%d) ", params->bytes));
+           if (sendbytes(call, params->bytes))
                errx(1, "sendbytes (err %d)", rx_Error(call));
 
            if (rx_Read32(call, &data) != 4)
@@ -696,33 +683,34 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
        case RX_PERF_RPC:
            DBFPRINT(("commands "));
 
-           data = htonl(sendtimes);
+           data = htonl(params->sendtimes);
            if (rx_Write32(call, &data) != 4)
                errx(1, "rx_Write failed to send command (err %d)", rx_Error(call));
 
-           data = htonl(recvtimes);
+           data = htonl(params->recvtimes);
            if (rx_Write32(call, &data) != 4)
                errx(1, "rx_Write failed to send command (err %d)", rx_Error(call));
 
-           DBFPRINT(("send(%d) ", sendtimes));
-           if (sendbytes(call, sendtimes))
+           DBFPRINT(("send(%d) ", params->sendtimes));
+           if (sendbytes(call, params->sendtimes))
                errx(1, "sendbytes (err %d)", rx_Error(call));
 
-           DBFPRINT(("recv(%d) ", recvtimes));
-           if (readbytes(call, recvtimes))
+           DBFPRINT(("recv(%d) ", params->recvtimes));
+           if (readbytes(call, params->recvtimes))
                errx(1, "sendbytes (err %d)", rx_Error(call));
 
-           if (rx_Read32(call, &bytes) != 4)
+           if (rx_Read32(call, &data) != 4)
                errx(1, "failed to read result from server (err %d)", rx_Error(call));
 
-           if (bytes != htonl(RXPERF_MAGIC_COOKIE))
+           if (data != htonl(RXPERF_MAGIC_COOKIE))
                warn("server send wrong magic cookie in responce");
 
            DBFPRINT(("done\n"));
 
            break;
+
        case RX_PERF_FILE:
-           readfile(filename, &readwrite, &num);
+           readfile(params->filename, &readwrite, &num);
 
            data = htonl(num);
            if (rx_Write32(call, &data) != 4)
@@ -732,11 +720,11 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
                != num * sizeof(afs_uint32))
                errx(1, "rx_Write failed to send list (err %d)", rx_Error(call));
 
-           for (i = 0; i < num; i++) {
-               if (readwrite[i] == 0)
+           for (j = 0; j < num; j++) {
+               if (readwrite[j] == 0)
                    readp = !readp;
 
-               size = ntohl(readwrite[i]) * sizeof(afs_uint32);
+               size = ntohl(readwrite[j]) * sizeof(afs_uint32);
 
                if (readp) {
                    if (readbytes(call, size))
@@ -756,6 +744,123 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
        rx_EndCall(call, 0);
     }
 
+#ifdef AFS_PTHREAD_ENV
+    pthread_exit(NULL);
+#endif
+}
+
+/*
+ *
+ */
+
+static void
+do_client(const char *server, short port, char *filename, afs_int32 command,
+         afs_int32 times, afs_int32 bytes, afs_int32 sendtimes, afs_int32 recvtimes,
+          int dumpstats, int nojumbo, int maxmtu, int maxwsize, int minpeertimeout,
+          int udpbufsz, int nostats, int hotthread, int threads)
+{
+    struct rx_connection *conn;
+    afs_uint32 addr;
+    struct rx_securityClass *secureobj;
+    int secureindex;
+    int ret;
+    int i;
+    char stamp[2048];
+    struct client_data params;
+
+#ifdef AFS_PTHREAD_ENV
+    pthread_t thread[MAX_THREADS];
+    pthread_attr_t tattr;
+    void *status;
+#endif
+
+#ifdef AFS_NT40_ENV
+    if (afs_winsockInit() < 0) {
+       printf("Can't initialize winsock.\n");
+       exit(1);
+    }
+#endif
+
+    if (hotthread)
+        rx_EnableHotThread();
+
+    if (nostats)
+        rx_enable_stats = 0;
+
+    addr = str2addr(server);
+
+    ret = rx_Init(0);
+    if (ret)
+       errx(1, "rx_Init failed");
+
+    if (nojumbo)
+      rx_SetNoJumbo();
+
+    if (maxmtu)
+      rx_SetMaxMTU(maxmtu);
+
+    if (maxwsize) {
+        rx_SetMaxReceiveWindow(maxwsize);
+        rx_SetMaxSendWindow(maxwsize);
+    }
+
+    if (minpeertimeout)
+        rx_SetMinPeerTimeout(minpeertimeout);
+
+    rx_SetUdpBufSize(udpbufsz);
+
+    get_sec(0, &secureobj, &secureindex);
+
+    conn = rx_NewConnection(addr, htons(port), RX_SERVER_ID, secureobj, secureindex);
+    if (conn == NULL)
+       errx(1, "failed to contact server");
+
+#ifdef AFS_PTHREAD_ENV
+    pthread_attr_init(&tattr);
+    pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_JOINABLE);
+#endif
+
+    params.conn = conn;
+    params.filename = filename;
+    params.command = command;
+    params.times = times;
+    params.bytes = bytes;
+    params.sendtimes = sendtimes;
+    params.recvtimes = recvtimes;
+
+    switch (command) {
+    case RX_PERF_RPC:
+        sprintf(stamp, "RPC: threads\t%d, times\t%d, writes\t%d, reads\t%d",
+                 threads, times, sendtimes, recvtimes);
+        break;
+    case RX_PERF_RECV:
+        sprintf(stamp, "RECV: threads\t%d, times\t%d, bytes\t%d",
+                 threads, times, bytes);
+        break;
+    case RX_PERF_SEND:
+        sprintf(stamp, "SEND: threads\t%d, times\t%d, bytes\t%d",
+                 threads, times, bytes);
+        break;
+    case RX_PERF_FILE:
+        sprintf(stamp, "FILE %s: threads\t%d, times\t%d, bytes\t%d",
+                 filename, threads, times, bytes);
+        break;
+    }
+
+    start_timer();
+
+#ifdef AFS_PTHREAD_ENV
+    for ( i=0; i<threads; i++)
+        pthread_create(&thread[i], &tattr, client_thread, &params);
+#else
+        client_thread(&params);
+#endif
+
+#ifdef AFS_PTHREAD_ENV
+    for ( i=0; i<threads; i++)
+        pthread_join(thread[i], &status);
+#endif
+
     end_and_print_timer(stamp);
     DBFPRINT(("done for good\n"));
 
@@ -764,6 +869,10 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
        rx_PrintPeerStats(stdout, conn->peer);
     }
     rx_Finalize();
+
+#ifdef AFS_PTHREAD_ENV
+    pthread_attr_destroy(&tattr);
+#endif
 }
 
 static void
@@ -786,6 +895,8 @@ usage(void)
     exit(1);
 }
 
+
+
 /*
  * do argument processing and call networking functions
  */
@@ -796,11 +907,17 @@ rxperf_server(int argc, char **argv)
     short port = DEFAULT_PORT;
     int nojumbo = 0;
     int maxmtu = 0;
+    int nostats = 0;
     int udpbufsz = 64 * 1024;
+    int hotthreads = 0;
+    int minprocs = 2;
+    int maxprocs = 20;
+    int maxwsize = 0;
+    int minpeertimeout = 0;
     char *ptr;
     int ch;
 
-    while ((ch = getopt(argc, argv, "r:d:p:w:jm:u:4")) != -1) {
+    while ((ch = getopt(argc, argv, "r:d:p:P:w:W:HNjm:u:4:s:S")) != -1) {
        switch (ch) {
        case 'd':
 #ifdef RXDEBUG
@@ -819,8 +936,23 @@ rxperf_server(int argc, char **argv)
                errx(1, "%d > sizeof(somebuf) (%d)", rxread_size,
                     sizeof(somebuf));
            break;
+       case 's':
+           minprocs = strtol(optarg, &ptr, 0);
+           if (ptr != 0 && ptr[0] != '\0')
+               errx(1, "can't resolve minprocs");
+           break;
+       case 'S':
+           maxprocs = strtol(optarg, &ptr, 0);
+           if (ptr != 0 && ptr[0] != '\0')
+               errx(1, "can't resolve maxprocs");
+           break;
+       case 'P':
+           minpeertimeout = strtol(optarg, &ptr, 0);
+           if (ptr != 0 && ptr[0] != '\0')
+               errx(1, "can't resolve min peer timeout");
+           break;
        case 'p':
-           port = strtol(optarg, &ptr, 0);
+           port = (short) strtol(optarg, &ptr, 0);
            if (ptr != 0 && ptr[0] != '\0')
                errx(1, "can't resolve portname");
            break;
@@ -835,6 +967,12 @@ rxperf_server(int argc, char **argv)
        case 'j':
          nojumbo=1;
          break;
+       case 'N':
+         nostats=1;
+         break;
+       case 'H':
+           hotthreads = 1;
+           break;
        case 'm':
          maxmtu = strtol(optarg, &ptr, 0);
          if (ptr && *ptr != '\0')
@@ -845,6 +983,11 @@ rxperf_server(int argc, char **argv)
            if (ptr && *ptr != '\0')
                errx(1, "can't resolve upd buffer size (Kbytes)");
            break;
+       case 'W':
+           maxwsize = strtol(optarg, &ptr, 0);
+           if (ptr && *ptr != '\0')
+               errx(1, "can't resolve max send/recv window size (packets)");
+           break;
        case '4':
          RX_IPUDP_SIZE = 28;
          break;
@@ -856,7 +999,8 @@ rxperf_server(int argc, char **argv)
     if (optind != argc)
        usage();
 
-    do_server(port, nojumbo, maxmtu, udpbufsz);
+    do_server(port, nojumbo, maxmtu, maxwsize, minpeertimeout, udpbufsz,
+              nostats, hotthreads, minprocs, maxprocs);
 
     return 0;
 }
@@ -878,14 +1022,19 @@ rxperf_client(int argc, char **argv)
     int times = 100;
     int dumpstats = 0;
     int nojumbo = 0;
+    int nostats = 0;
     int maxmtu = 0;
+    int hotthreads = 0;
+    int threads = 1;
     int udpbufsz = 64 * 1024;
+    int maxwsize = 0;
+    int minpeertimeout = 0;
     char *ptr;
     int ch;
 
     cmd = RX_PERF_UNKNOWN;
 
-    while ((ch = getopt(argc, argv, "T:S:R:b:c:d:p:r:s:w:f:Djm:u:4")) != -1) {
+    while ((ch = getopt(argc, argv, "T:S:R:b:c:d:p:P:r:s:w:W:f:HDNjm:u:4:t:")) != -1) {
        switch (ch) {
        case 'b':
            bytes = strtol(optarg, &ptr, 0);
@@ -913,8 +1062,13 @@ rxperf_client(int argc, char **argv)
            errx(1, "compiled without RXDEBUG");
 #endif
            break;
+       case 'P':
+           minpeertimeout = strtol(optarg, &ptr, 0);
+           if (ptr != 0 && ptr[0] != '\0')
+               errx(1, "can't resolve min peer timeout");
+           break;
        case 'p':
-           port = strtol(optarg, &ptr, 0);
+           port = (short) strtol(optarg, &ptr, 0);
            if (ptr != 0 && ptr[0] != '\0')
                errx(1, "can't resolve portname");
            break;
@@ -939,6 +1093,11 @@ rxperf_client(int argc, char **argv)
                errx(1, "%d > sizeof(somebuf) (%d)", rxwrite_size,
                     sizeof(somebuf));
            break;
+       case 'W':
+           maxwsize = strtol(optarg, &ptr, 0);
+           if (ptr && *ptr != '\0')
+               errx(1, "can't resolve max send/recv window size (packets)");
+           break;
        case 'T':
            times = strtol(optarg, &ptr, 0);
            if (ptr && *ptr != '\0')
@@ -954,12 +1113,29 @@ rxperf_client(int argc, char **argv)
            if (ptr && *ptr != '\0')
                errx(1, "can't resolve number of bytes to receive");
            break;
+       case 't':
+#ifdef AFS_PTHREAD_ENV
+           threads = strtol(optarg, &ptr, 0);
+           if (ptr && *ptr != '\0')
+               errx(1, "can't resolve number of threads to execute");
+            if (threads > MAX_THREADS)
+               errx(1, "too many threads");
+#else
+            errx(1, "Not built for pthreads");
+#endif
+           break;
        case 'f':
            filename = optarg;
            break;
        case 'D':
            dumpstats = 1;
            break;
+       case 'N':
+           nostats = 1;
+           break;
+       case 'H':
+           hotthreads = 1;
+           break;
        case 'j':
          nojumbo=1;
          break;
@@ -981,6 +1157,12 @@ rxperf_client(int argc, char **argv)
        }
     }
 
+    if (nostats && dumpstats)
+        errx(1, "cannot set both -N and -D");
+
+    if (threads > 1 && cmd == RX_PERF_FILE)
+        errx(1, "cannot use multiple threads with file command");
+
     if (optind != argc)
        usage();
 
@@ -988,7 +1170,8 @@ rxperf_client(int argc, char **argv)
        errx(1, "no command given to the client");
 
     do_client(host, port, filename, cmd, times, bytes, sendtimes,
-             recvtimes, dumpstats, nojumbo, maxmtu, udpbufsz);
+             recvtimes, dumpstats, nojumbo, maxmtu, maxwsize, minpeertimeout,
+              udpbufsz, nostats, hotthreads, threads);
 
     return 0;
 }