rxperf: use parallel connections
[openafs.git] / src / rx / test / rxperf.c
index 2985f73..3f4a918 100644 (file)
@@ -79,9 +79,9 @@ nn * We are using getopt since we want it to be possible to link to
                                 * is hopefully a err.h */
 #endif
 #include <getopt.h>
-#include "rx.h"
-#include "rx_null.h"
-#include "rx_globals.h"
+#include <rx/rx.h>
+#include <rx/rx_null.h>
+#include <rx/rx_globals.h>
 
 #ifdef AFS_PTHREAD_ENV
 #include <pthread.h>
@@ -277,25 +277,37 @@ char somebuf[RXPERF_BUFSIZE];
 
 afs_int32 rxwrite_size = sizeof(somebuf);
 afs_int32 rxread_size = sizeof(somebuf);
+afs_int32 use_rx_readv = 0;
 
 static int
-readbytes(struct rx_call *call, afs_int32 bytes)
+do_readbytes(struct rx_call *call, afs_int32 bytes)
 {
+    struct iovec tiov[RX_MAXIOVECS];
     afs_int32 size;
+    int tnio;
+    int code;
 
     while (bytes > 0) {
        size = rxread_size;
+
        if (size > bytes)
            size = bytes;
-       if (rx_Read(call, somebuf, size) != size)
-           return 1;
+       if (use_rx_readv) {
+            if (size > RX_MAX_PACKET_DATA_SIZE)
+                size = RX_MAX_PACKET_DATA_SIZE;
+            code = rx_Readv(call, tiov, &tnio, RX_MAXIOVECS, size);
+        } else
+            code = rx_Read(call, somebuf, size);
+        if (code != size)
+            return 1;
+
        bytes -= size;
     }
     return 0;
 }
 
 static int
-sendbytes(struct rx_call *call, afs_int32 bytes)
+do_sendbytes(struct rx_call *call, afs_int32 bytes)
 {
     afs_int32 size;
 
@@ -374,7 +386,7 @@ rxperf_ExecuteRequest(struct rx_call *call)
        bytes = ntohl(bytes);
 
        DBFPRINT(("reading(%d) ", bytes));
-       readbytes(call, bytes);
+       do_readbytes(call, bytes);
 
        data = htonl(RXPERF_MAGIC_COOKIE);
        if (rx_Write32(call, &data) != 4) {
@@ -399,12 +411,12 @@ rxperf_ExecuteRequest(struct rx_call *call)
        sendb = ntohl(sendb);
 
        DBFPRINT(("read(%d) ", recvb));
-       if (readbytes(call, recvb)) {
-           warnx("readbytes failed");
+       if (do_readbytes(call, recvb)) {
+           warnx("do_readbytes failed");
            return -1;
        }
        DBFPRINT(("send(%d) ", sendb));
-       if (sendbytes(call, sendb)) {
+       if (do_sendbytes(call, sendb)) {
            warnx("sendbytes failed");
            return -1;
        }
@@ -441,9 +453,9 @@ rxperf_ExecuteRequest(struct rx_call *call)
 
            if (readp) {
                DBFPRINT(("read\n"));
-               readbytes(call, bytes);
+               do_readbytes(call, bytes);
            } else {
-               sendbytes(call, bytes);
+               do_sendbytes(call, bytes);
                DBFPRINT(("send\n"));
            }
        }
@@ -459,7 +471,7 @@ rxperf_ExecuteRequest(struct rx_call *call)
        bytes = ntohl(bytes);
 
        DBFPRINT(("sending(%d) ", bytes));
-       sendbytes(call, bytes);
+       do_sendbytes(call, bytes);
 
        data = htonl(RXPERF_MAGIC_COOKIE);
        if (rx_Write32(call, &data) != 4) {
@@ -505,6 +517,8 @@ do_server(short port, int nojumbo, int maxmtu, int maxwsize, int minpeertimeout,
     if (nostats)
         rx_enable_stats = 0;
 
+    rx_SetUdpBufSize(udpbufsz);
+
     ret = rx_Init(htons(port));
     if (ret)
        errx(1, "rx_Init failed");
@@ -523,7 +537,6 @@ do_server(short port, int nojumbo, int maxmtu, int maxwsize, int minpeertimeout,
     if (minpeertimeout)
         rx_SetMinPeerTimeout(minpeertimeout);
 
-    rx_SetUdpBufSize(udpbufsz);
 
     get_sec(1, &secureobj, &secureindex);
 
@@ -555,9 +568,10 @@ readfile(const char *filename, afs_uint32 ** readwrite, afs_uint32 * size)
     afs_uint32 num = 0;
     afs_uint32 data;
     char *ptr;
-    char buf[RXPERF_BUFSIZE];
+    char *buf;
 
     *readwrite = malloc(sizeof(afs_uint32) * len);
+    buf = malloc(RXPERF_BUFSIZE);
 
     if (*readwrite == NULL)
        err(1, "malloc");
@@ -591,6 +605,7 @@ readfile(const char *filename, afs_uint32 ** readwrite, afs_uint32 * size)
 
     if (fclose(f) == -1)
        err(1, "fclose");
+    free(buf);
 }
 
 struct client_data {
@@ -599,8 +614,8 @@ struct client_data {
     int command;
     afs_int32 times;
     afs_int32 bytes;
-    afs_int32 sendtimes;
-    afs_int32 recvtimes;
+    afs_int32 sendbytes;
+    afs_int32 readbytes;
 };
 
 static void *
@@ -648,7 +663,7 @@ client_thread( void *vparams)
                errx(1, "rx_Write failed to send size (err %d)", rx_Error(call));
 
            DBFPRINT(("sending(%d) ", params->bytes));
-           if (readbytes(call, params->bytes))
+           if (do_readbytes(call, params->bytes))
                errx(1, "sendbytes (err %d)", rx_Error(call));
 
            if (rx_Read32(call, &data) != 4)
@@ -668,7 +683,7 @@ client_thread( void *vparams)
                errx(1, "rx_Write failed to send size (err %d)", rx_Error(call));
 
            DBFPRINT(("sending(%d) ", params->bytes));
-           if (sendbytes(call, params->bytes))
+           if (do_sendbytes(call, params->bytes))
                errx(1, "sendbytes (err %d)", rx_Error(call));
 
            if (rx_Read32(call, &data) != 4)
@@ -683,20 +698,20 @@ client_thread( void *vparams)
        case RX_PERF_RPC:
            DBFPRINT(("commands "));
 
-           data = htonl(params->sendtimes);
+           data = htonl(params->sendbytes);
            if (rx_Write32(call, &data) != 4)
                errx(1, "rx_Write failed to send command (err %d)", rx_Error(call));
 
-           data = htonl(params->recvtimes);
+           data = htonl(params->readbytes);
            if (rx_Write32(call, &data) != 4)
                errx(1, "rx_Write failed to send command (err %d)", rx_Error(call));
 
-           DBFPRINT(("send(%d) ", params->sendtimes));
-           if (sendbytes(call, params->sendtimes))
+           DBFPRINT(("send(%d) ", params->sendbytes));
+           if (do_sendbytes(call, params->sendbytes))
                errx(1, "sendbytes (err %d)", rx_Error(call));
 
-           DBFPRINT(("recv(%d) ", params->recvtimes));
-           if (readbytes(call, params->recvtimes))
+           DBFPRINT(("recv(%d) ", params->readbytes));
+           if (do_readbytes(call, params->readbytes))
                errx(1, "sendbytes (err %d)", rx_Error(call));
 
            if (rx_Read32(call, &data) != 4)
@@ -727,11 +742,11 @@ client_thread( void *vparams)
                size = ntohl(readwrite[j]) * sizeof(afs_uint32);
 
                if (readp) {
-                   if (readbytes(call, size))
+                   if (do_readbytes(call, size))
                        errx(1, "sendbytes (err %d)", rx_Error(call));
                    DBFPRINT(("read\n"));
                } else {
-                   if (sendbytes(call, size))
+                   if (do_sendbytes(call, size))
                        errx(1, "sendbytes (err %d)", rx_Error(call));
                    DBFPRINT(("send\n"));
                }
@@ -757,7 +772,7 @@ client_thread( void *vparams)
 
 static void
 do_client(const char *server, short port, char *filename, afs_int32 command,
-         afs_int32 times, afs_int32 bytes, afs_int32 sendtimes, afs_int32 recvtimes,
+         afs_int32 times, afs_int32 bytes, afs_int32 sendbytes, afs_int32 readbytes,
           int dumpstats, int nojumbo, int maxmtu, int maxwsize, int minpeertimeout,
           int udpbufsz, int nostats, int hotthread, int threads)
 {
@@ -767,7 +782,7 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
     int secureindex;
     int ret;
     char stamp[2048];
-    struct client_data params;
+    struct client_data *params;
 
 #ifdef AFS_PTHREAD_ENV
     int i;
@@ -776,6 +791,9 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
     void *status;
 #endif
 
+    params = malloc(sizeof(struct client_data));
+    memset(params, 0, sizeof(struct client_data));
+
 #ifdef AFS_NT40_ENV
     if (afs_winsockInit() < 0) {
        printf("Can't initialize winsock.\n");
@@ -791,6 +809,8 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
 
     addr = str2addr(server);
 
+    rx_SetUdpBufSize(udpbufsz);
+
     ret = rx_Init(0);
     if (ret)
        errx(1, "rx_Init failed");
@@ -809,31 +829,13 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
     if (minpeertimeout)
         rx_SetMinPeerTimeout(minpeertimeout);
 
-    rx_SetUdpBufSize(udpbufsz);
 
     get_sec(0, &secureobj, &secureindex);
 
-    conn = rx_NewConnection(addr, htons(port), RX_SERVER_ID, secureobj, secureindex);
-    if (conn == NULL)
-       errx(1, "failed to contact server");
-
-#ifdef AFS_PTHREAD_ENV
-    pthread_attr_init(&tattr);
-    pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_JOINABLE);
-#endif
-
-    params.conn = conn;
-    params.filename = filename;
-    params.command = command;
-    params.times = times;
-    params.bytes = bytes;
-    params.sendtimes = sendtimes;
-    params.recvtimes = recvtimes;
-
     switch (command) {
     case RX_PERF_RPC:
-        sprintf(stamp, "RPC: threads\t%d, times\t%d, writes\t%d, reads\t%d",
-                 threads, times, sendtimes, recvtimes);
+        sprintf(stamp, "RPC: threads\t%d, times\t%d, write bytes\t%d, read bytes\t%d",
+                 threads, times, sendbytes, readbytes);
         break;
     case RX_PERF_RECV:
         sprintf(stamp, "RECV: threads\t%d, times\t%d, bytes\t%d",
@@ -849,13 +851,40 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
         break;
     }
 
+    conn = rx_NewConnection(addr, htons(port), RX_SERVER_ID, secureobj, secureindex);
+    if (conn == NULL)
+       errx(1, "failed to contact server");
+
+#ifdef AFS_PTHREAD_ENV
+    pthread_attr_init(&tattr);
+    pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_JOINABLE);
+#endif
+
+    params->conn = conn;
+    params->filename = filename;
+    params->command = command;
+    params->times = times;
+    params->bytes = bytes;
+    params->sendbytes = sendbytes;
+    params->readbytes = readbytes;
+
     start_timer();
 
 #ifdef AFS_PTHREAD_ENV
-    for ( i=0; i<threads; i++)
-        pthread_create(&thread[i], &tattr, client_thread, &params);
+    for ( i=0; i<threads; i++) {
+        pthread_create(&thread[i], &tattr, client_thread, params);
+        if ( (i + 1) % RX_MAXCALLS == 0 ) {
+            conn = rx_NewConnection(addr, htons(port), RX_SERVER_ID, secureobj, secureindex);
+            if (conn != NULL) {
+                struct client_data *new_params = malloc(sizeof(struct client_data));
+                memcpy(new_params, params, sizeof(struct client_data));
+                new_params->conn = conn;
+                params = new_params;
+            }
+        }
+    }
 #else
-        client_thread(&params);
+        client_thread(params);
 #endif
 
 #ifdef AFS_PTHREAD_ENV
@@ -875,6 +904,8 @@ do_client(const char *server, short port, char *filename, afs_int32 command,
 #ifdef AFS_PTHREAD_ENV
     pthread_attr_destroy(&tattr);
 #endif
+
+    free(params);
 }
 
 static void
@@ -919,7 +950,7 @@ rxperf_server(int argc, char **argv)
     char *ptr;
     int ch;
 
-    while ((ch = getopt(argc, argv, "r:d:p:P:w:W:HNjm:u:4:s:S")) != -1) {
+    while ((ch = getopt(argc, argv, "r:d:p:P:w:W:HNjm:u:4:s:SV")) != -1) {
        switch (ch) {
        case 'd':
 #ifdef RXDEBUG
@@ -985,6 +1016,9 @@ rxperf_server(int argc, char **argv)
            if (ptr && *ptr != '\0')
                errx(1, "can't resolve upd buffer size (Kbytes)");
            break;
+       case 'V':
+           use_rx_readv = 1;
+           break;
        case 'W':
            maxwsize = strtol(optarg, &ptr, 0);
            if (ptr && *ptr != '\0')
@@ -1019,8 +1053,8 @@ rxperf_client(int argc, char **argv)
     short port = DEFAULT_PORT;
     char *filename = NULL;
     afs_int32 cmd;
-    int sendtimes = 3;
-    int recvtimes = 30;
+    int sendbytes = 3;
+    int readbytes = 30;
     int times = 100;
     int dumpstats = 0;
     int nojumbo = 0;
@@ -1036,7 +1070,7 @@ rxperf_client(int argc, char **argv)
 
     cmd = RX_PERF_UNKNOWN;
 
-    while ((ch = getopt(argc, argv, "T:S:R:b:c:d:p:P:r:s:w:W:f:HDNjm:u:4:t:")) != -1) {
+    while ((ch = getopt(argc, argv, "T:S:R:b:c:d:p:P:r:s:w:W:f:HDNjm:u:4:t:V")) != -1) {
        switch (ch) {
        case 'b':
            bytes = strtol(optarg, &ptr, 0);
@@ -1087,6 +1121,9 @@ rxperf_client(int argc, char **argv)
            if (host == NULL)
                err(1, "strdup");
            break;
+       case 'V':
+           use_rx_readv = 1;
+           break;
        case 'w':
            rxwrite_size = strtol(optarg, &ptr, 0);
            if (ptr != 0 && ptr[0] != '\0')
@@ -1106,12 +1143,12 @@ rxperf_client(int argc, char **argv)
                errx(1, "can't resolve number of times to execute rpc");
            break;
        case 'S':
-           sendtimes = strtol(optarg, &ptr, 0);
+           sendbytes = strtol(optarg, &ptr, 0);
            if (ptr && *ptr != '\0')
                errx(1, "can't resolve number of bytes to send");
            break;
        case 'R':
-           recvtimes = strtol(optarg, &ptr, 0);
+           readbytes = strtol(optarg, &ptr, 0);
            if (ptr && *ptr != '\0')
                errx(1, "can't resolve number of bytes to receive");
            break;
@@ -1171,8 +1208,8 @@ rxperf_client(int argc, char **argv)
     if (cmd == RX_PERF_UNKNOWN)
        errx(1, "no command given to the client");
 
-    do_client(host, port, filename, cmd, times, bytes, sendtimes,
-             recvtimes, dumpstats, nojumbo, maxmtu, maxwsize, minpeertimeout,
+    do_client(host, port, filename, cmd, times, bytes, sendbytes,
+             readbytes, dumpstats, nojumbo, maxmtu, maxwsize, minpeertimeout,
               udpbufsz, nostats, hotthreads, threads);
 
     return 0;