2 * Copyright (c) 2010, Linux Box Corporation.
5 * Portions Copyright (c) 2007, Hartmut Reuter,
6 * RZG, Max-Planck-Institut f. Plasmaphysik.
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
19 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
20 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
21 * AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
26 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include <afsconfig.h>
32 #include <afs/param.h>
35 #include "rpc_test_procs.h"
38 #include <sys/types.h>
43 #include <sys/param.h>
45 #include <sys/ioctl.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
57 #include <WINNT/afsevent.h>
60 #include <afs/venus.h>
64 #include <afs/afsint.h>
65 #define FSINT_COMMON_XG 1
72 #include <afs/cellconfig.h>
74 #include <afs/com_err.h>
82 #include <sys/malloc.h>
86 #include <afs/errors.h>
87 #include <afs/sys_prototypes.h>
88 #include <rx_prototypes.h>
89 #ifdef AFS_PTHREAD_ENV
93 extern const char *prog;
94 const int ctx_key = 1;
97 #define RPC_TEST_GLOBAL_RX_INIT 1
99 #undef RPC_TEST_GLOBAL_RX_INIT
102 const afs_uint32 fs_port = 7000;
104 typedef struct rpc_test_pkg_params {
106 pthread_mutexattr_t mtx_attrs;
107 afs_uint32 cb_next_port;
109 } rpc_test_pkg_params;
110 static rpc_test_pkg_params rpc_test_params;
112 afs_int32 rpc_test_PkgInit()
115 static afs_uint32 rpc_test_initialized = 0; /* once */
117 if (!rpc_test_initialized) {
118 rpc_test_initialized = 1;
120 printf("%s: rpc_test_PkgInit: package already initialized\n");
125 code = pthread_mutexattr_init(&rpc_test_params.mtx_attrs);
127 printf("%s: rpc_test_PkgInit: pthread_mutexattr_init failed\n", prog);
130 code = pthread_mutex_init(&rpc_test_params.mtx, &rpc_test_params.mtx_attrs);
132 printf("%s: rpc_test_PkgInit: pthread_mutex_init failed\n", prog);
137 /* start connection sequence */
138 rpc_test_params.next_cno = 1;
140 /* set the starting port in sequence */
141 rpc_test_params.cb_next_port = 7105;
143 #if defined(RPC_TEST_GLOBAL_RX_INIT)
149 } /* rpc_test_PkgInit */
152 init_callback_service_lwp(void *arg)
154 struct rx_securityClass *sc;
155 struct rx_service *svc;
158 rpc_test_request_ctx *ctx = (rpc_test_request_ctx *) arg;
160 printf("%s: init_callback_service_lwp: listen_addr: %s "
161 "(%d) cb_port: %d\n",
162 prog, ctx->cb_listen_addr_s, ctx->cb_listen_addr.addr_in[0],
165 sc = (struct rx_securityClass *) rxnull_NewServerSecurityObject();
167 fprintf(stderr,"rxnull_NewServerSecurityObject failed for callback "
172 #if defined(RPC_TEST_GLOBAL_RX_INIT)
173 svc = rx_NewServiceHost(htonl(INADDR_ANY), htons(ctx->cb_port), 1,
174 ctx->cb_svc_name, &sc, 1, RXAFSCB_ExecuteRequest);
176 svc = rx_NewService(0, 1, ctx->cb_svc_name, &sc, 1, RXAFSCB_ExecuteRequest);
179 rx_SetServiceSpecific(svc, ctx_key, ctx);
182 fprintf(stderr,"rx_NewServiceHost failed for callback service\n");
186 /* XXX stash service so we can hijack its rx_socket when inititiating
190 /* release pkg mutex before entering rx processing loop */
191 pthread_mutex_unlock(&rpc_test_params.mtx);
195 printf("%s: init_callback_service_lwp: finished");
199 } /* callback_service_lwp */
201 afs_int32 init_callback_service(rpc_test_request_ctx *ctx)
204 pthread_attr_t tattr;
207 afs_uuid_create(&(ctx->cb_listen_addr.uuid));
209 #if !defined(RPC_TEST_GLOBAL_RX_INIT)
211 code = rx_InitHost(ctx->cb_listen_addr.addr_in[0],
212 (int) htons(ctx->cb_port));
214 code = rx_Init((int) htons(ctx->cb_port));
216 #endif /* RPC_TEST_GLOBAL_RX_INIT */
218 assert(pthread_attr_init(&tattr) == 0);
219 assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
220 assert(pthread_create(&tid, &tattr, init_callback_service_lwp, ctx) == 0);
224 } /* init_callback_service */
226 afs_int32 init_fs_channel(rpc_test_request_ctx **octx, char *cb_if,
227 char *listen_addr_s, char *prefix, char *fs_addr_s,
231 rpc_test_request_ctx *ctx;
234 afs_int32 sslen = sizeof(struct sockaddr);
237 ctx = *octx = (rpc_test_request_ctx *) malloc(sizeof(rpc_test_request_ctx));
238 memset(ctx, 0, sizeof(rpc_test_request_ctx));
240 /* initialize a local mutex */
241 code = pthread_mutex_init(&ctx->mtx, &rpc_test_params.mtx_attrs);
243 /* lock package before rx setup--which has global deps, atm */
244 pthread_mutex_lock(&rpc_test_params.mtx);
246 ctx->cno = rpc_test_params.next_cno++;
249 /* afscbint (server) */
250 sprintf(ctx->cb_svc_name, "cb_%d", ctx->cno);
251 sprintf(ctx->cb_if_s, cb_if);
252 sprintf(ctx->cb_listen_addr_s, listen_addr_s);
253 sprintf(ctx->cb_prefix_s, prefix);
254 sprintf(ctx->fs_addr_s, fs_addr_s);
256 #if defined(RPC_TEST_ADD_ADDRESSES)
257 #if defined(AFS_LINUX26_ENV)
258 sprintf(cmd, "ip addr add %s/%s dev %s label %s", listen_addr_s, prefix,
262 #endif /* RPC_TEST_ADD_ADDRESSES */
265 pthread_mutex_lock(&ctx->mtx);
268 ctx->cb_port = rpc_test_params.cb_next_port++;
269 ctx->cb_listen_addr.numberOfInterfaces = 1;
272 code = WSAStringToAddressA(listen_addr_s, AF_INET, NULL,
273 (struct sockaddr*) &(ctx->cb_listen_addr), &sslen);
275 code = inet_pton(AF_INET, listen_addr_s,
276 (void*) &(ctx->cb_listen_addr.addr_in[0]));
279 code = init_callback_service(ctx /* LOCKED, && rpc_test_params->mtx LOCKED */);
284 code = WSAStringToAddressA(fs_addr_s, AF_INET, NULL,
285 (struct sockaddr*) &(ctx->fs_addr.addr_in[0]), &sslen);
287 code = inet_pton(AF_INET, fs_addr_s, (void*) &(ctx->fs_addr.addr_in[0]));
289 ctx->sc = rxnull_NewClientSecurityObject();
290 ctx->sc_index = RX_SECIDX_NULL;
291 ctx->conn = rx_NewConnection(ctx->fs_addr.addr_in[0], (int) htons(fs_port),
292 1, ctx->sc, ctx->sc_index);
295 pthread_mutex_unlock(&ctx->mtx);
300 } /* init_fs_channel */
302 /* XXX use the pkg lock to protect the state of rx_socket for
303 * the duration of the call, switching it out for the stashed
304 * rx_socket created by rx_NewService for this channel */
305 #define RXCALL_WITH_SOCK(code, ctx, call) \
307 osi_socket prev_rx_socket; \
308 pthread_mutex_lock(&rpc_test_params.mtx); \
309 prev_rx_socket = rx_socket; \
310 rx_socket = ctx->svc->socket; \
312 rx_socket = prev_rx_socket; \
313 pthread_mutex_unlock(&rpc_test_params.mtx); \
317 rpc_test_afs_fetch_status(rpc_test_request_ctx *ctx, AFSFid *fid,
318 AFSFetchStatus *outstatus)
320 struct rx_call *tcall;
321 struct AFSVolSync tsync;
322 struct AFSCallBack tcb;
325 RXCALL_WITH_SOCK(code, ctx,
326 (RXAFS_FetchStatus(ctx->conn, fid, outstatus, &tcb, &tsync)));
330 } /* rpc_test_afs_fetch_status */
333 rpc_test_afs_store_status(rpc_test_request_ctx *ctx, AFSFid *fid,
334 AFSStoreStatus *instatus, AFSFetchStatus *outstatus)
336 struct rx_call *tcall;
337 struct AFSVolSync tsync;
340 RXCALL_WITH_SOCK(code, ctx,
341 (RXAFS_StoreStatus(ctx->conn, fid, instatus, outstatus, &tsync)));
345 } /* rpc_test_afs_fetch_status */
347 #if defined(AFS_BYTE_RANGE_FLOCKS)
348 afs_int32 rpc_test_afs_set_byterangelock(rpc_test_request_ctx *ctx,
349 AFSByteRangeLock * lock)
351 struct rx_call *tcall;
354 RXCALL_WITH_SOCK(code, ctx,
355 (RXAFS_SetByteRangeLock(ctx->conn, lock)));
359 } /* rpc_test_afs_set_byterangelock */
361 afs_int32 rpc_test_afs_release_byterangelock(rpc_test_request_ctx *ctx,
362 AFSByteRangeLock * lock)
364 struct rx_call *tcall;
367 RXCALL_WITH_SOCK(code, ctx,
368 (RXAFS_ReleaseByteRangeLock(ctx->conn, lock)));
372 } /* rpc_test_afs_release_byterangelock */
374 afs_int32 rpc_test_afs_upgrade_byterangelock(rpc_test_request_ctx *ctx,
375 AFSByteRangeLock * lock)
379 /* TODO: implement */
383 } /* rpc_test_afs_upgrade_byterangelock */
385 afs_int32 rpc_test_afs_downgrade_byterangelock(rpc_test_request_ctx *ctx,
386 AFSByteRangeLock * Lock)
390 /* TODO: implement */
394 } /* rpc_test_afs_downgrade_byterangelock */
395 #endif /* AFS_BYTE_RANGE_FLOCKS */
398 destroy_fs_channel(rpc_test_request_ctx *ctx)
402 #if defined(RPC_TEST_ADD_ADDRESSES)
403 #if defined(AFS_LINUX26_ENV)
404 sprintf(cmd, "ip addr del %s/%s dev %s label %s", ctx->cb_listen_addr_s,
405 ctx->cb_prefix_s, ctx->cb_if_s, ctx->cb_if_s);
408 #endif /* RPC_TEST_ADD_ADDRESSES */
413 } /* destroy_fs_channel */
416 rpc_test_PkgShutdown()
420 } /* rpc_test_PkgShutdown */