2 * Copyright (c) 2010, Linux Box Corporation.
5 * Portions Copyright (c) 2007, Hartmut Reuter,
6 * RZG, Max-Planck-Institut f. Plasmaphysik.
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
19 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
20 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
21 * AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
26 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include <afsconfig.h>
32 #include <afs/param.h>
38 #include "rpc_test_procs.h"
41 #include <sys/types.h>
46 #include <sys/param.h>
48 #include <sys/ioctl.h>
49 #include <sys/socket.h>
50 #include <netinet/in.h>
51 #include <arpa/inet.h>
60 #include <WINNT/afsevent.h>
63 #include <afs/venus.h>
67 #include <afs/afsint.h>
68 #define FSINT_COMMON_XG 1
75 #include <afs/cellconfig.h>
77 #include <afs/com_err.h>
85 #include <sys/malloc.h>
89 #include <afs/errors.h>
90 #include <afs/sys_prototypes.h>
91 #include <rx/rx_prototypes.h>
92 #ifdef AFS_PTHREAD_ENV
/* Program name used as the prefix of this file's diagnostic messages;
 * only declared here, so it is defined in another translation unit. */
96 extern const char *prog;
/* Key under which a channel's request context is attached to its callback
 * service via rx_SetServiceSpecific(). */
97 const int ctx_key = 1;
/* NOTE(review): this define/undef pair is presumably wrapped in a
 * preprocessor conditional whose lines are not visible here -- confirm
 * which one is active.  Elsewhere in this file the macro guards the
 * rx_NewServiceHost() call, and init_callback_service() initializes rx
 * itself only when the macro is NOT defined. */
100 #define RPC_TEST_GLOBAL_RX_INIT 1
102 #undef RPC_TEST_GLOBAL_RX_INIT
/* Port used when opening the client connection to the file server; it is
 * converted with htons() at the call site. */
105 const afs_uint32 fs_port = 7000;
/* Package-wide state shared by every test channel created in this file.
 * NOTE(review): code in this file also accesses `mtx` and `next_cno`
 * members that are not among the lines visible here -- confirm against the
 * full declaration. */
107 typedef struct rpc_test_pkg_params {
/* attributes used for the package mutex and for each per-channel mutex */
109 pthread_mutexattr_t mtx_attrs;
/* next callback port to hand out; seeded in rpc_test_PkgInit() and
 * post-incremented once per channel in init_fs_channel() */
110 afs_uint32 cb_next_port;
112 } rpc_test_pkg_params;
/* the single, file-local instance of the package state */
113 static rpc_test_pkg_params rpc_test_params;
/*
 * rpc_test_PkgInit -- one-time setup of the package state.
 *
 * Initializes the shared mutex attributes and the package mutex, then seeds
 * the connection-number sequence (1) and the callback-port sequence (7105).
 * A function-local static flag records that initialization has been
 * attempted; the "package already initialized" message is presumably the
 * branch taken on a repeated call.
 *
 * Returns an afs_int32.
 * NOTE(review): the declaration of `code`, the handling after each failure
 * message, the body of the RPC_TEST_GLOBAL_RX_INIT conditional and the
 * return statement are not among the lines visible here.
 */
115 afs_int32 rpc_test_PkgInit()
118 static afs_uint32 rpc_test_initialized = 0; /* once */
120 if (!rpc_test_initialized) {
121 rpc_test_initialized = 1;
/* the format consumes one "%s", so the program name has to be supplied,
 * as the other diagnostics in this function already do */
123 printf("%s: rpc_test_PkgInit: package already initialized\n", prog);
128 code = pthread_mutexattr_init(&rpc_test_params.mtx_attrs);
130 printf("%s: rpc_test_PkgInit: pthread_mutexattr_init failed\n", prog);
133 code = pthread_mutex_init(&rpc_test_params.mtx, &rpc_test_params.mtx_attrs);
135 printf("%s: rpc_test_PkgInit: pthread_mutex_init failed\n", prog);
140 /* start connection sequence */
141 rpc_test_params.next_cno = 1;
143 /* set the starting port in sequence */
144 rpc_test_params.cb_next_port = 7105;
146 #if defined(RPC_TEST_GLOBAL_RX_INIT)
152 } /* rpc_test_PkgInit */
/*
 * init_callback_service_lwp -- thread body that brings up the callback
 * service for one test channel.
 *
 * arg is the channel's rpc_test_request_ctx.  The routine logs the listen
 * address and callback port, creates a null server security object,
 * registers a service named ctx->cb_svc_name with RXAFSCB_ExecuteRequest as
 * its request handler, attaches ctx to that service under ctx_key, and
 * releases the package mutex.
 *
 * NOTE(review): the return type, the conditions guarding the two failure
 * messages, the rx processing loop mentioned below and the return statement
 * are not among the lines visible here.
 */
155 init_callback_service_lwp(void *arg)
157 struct rx_securityClass *sc;
158 struct rx_service *svc;
161 rpc_test_request_ctx *ctx = (rpc_test_request_ctx *) arg;
163 printf("%s: init_callback_service_lwp: listen_addr: %s "
164 "(%d) cb_port: %d\n",
165 prog, ctx->cb_listen_addr_s, ctx->cb_listen_addr.addr_in[0],
168 sc = (struct rx_securityClass *) rxnull_NewServerSecurityObject();
170 fprintf(stderr,"rxnull_NewServerSecurityObject failed for callback "
175 #if defined(RPC_TEST_GLOBAL_RX_INIT)
176 svc = rx_NewServiceHost(htonl(INADDR_ANY), htons(ctx->cb_port), 1,
177 ctx->cb_svc_name, &sc, 1, RXAFSCB_ExecuteRequest);
179 svc = rx_NewService(0, 1, ctx->cb_svc_name, &sc, 1, RXAFSCB_ExecuteRequest);
/* NOTE(review): svc is handed to rx_SetServiceSpecific() before the
 * service-creation failure message below -- confirm svc is checked for
 * NULL before this use. */
182 rx_SetServiceSpecific(svc, ctx_key, ctx);
185 fprintf(stderr,"rx_NewServiceHost failed for callback service\n");
189 /* XXX stash service so we can hijack its rx_socket when initiating
193 /* release pkg mutex before entering rx processing loop */
194 pthread_mutex_unlock(&rpc_test_params.mtx);
/* the format consumes one "%s", so the program name has to be supplied */
198 printf("%s: init_callback_service_lwp: finished", prog);
202 } /* init_callback_service_lwp */
/*
 * init_callback_service -- prepare and launch the callback service thread
 * for one channel.
 *
 * Generates a UUID into ctx->cb_listen_addr, initializes rx on the
 * channel's callback port when RPC_TEST_GLOBAL_RX_INIT is not defined, and
 * starts init_callback_service_lwp() on a detached thread with ctx as its
 * argument.
 *
 * NOTE(review): the three pthread calls below are made inside assert(); if
 * this file were compiled with NDEBUG they would not run at all -- consider
 * hoisting the calls out of the assertions.
 * NOTE(review): the declarations of `code` and `tid` and the return
 * statement are not among the lines visible here.
 */
204 afs_int32 init_callback_service(rpc_test_request_ctx *ctx)
207 pthread_attr_t tattr;
210 afs_uuid_create(&(ctx->cb_listen_addr.uuid));
212 #if !defined(RPC_TEST_GLOBAL_RX_INIT)
/* NOTE(review): rx_InitHost() and rx_Init() look like alternatives chosen
 * by an inner conditional that is not visible here. */
214 code = rx_InitHost(ctx->cb_listen_addr.addr_in[0],
215 (int) htons(ctx->cb_port));
217 code = rx_Init((int) htons(ctx->cb_port));
219 #endif /* RPC_TEST_GLOBAL_RX_INIT */
/* create the service thread detached: nothing joins it later in this
 * function */
221 assert(pthread_attr_init(&tattr) == 0);
222 assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
223 assert(pthread_create(&tid, &tattr, init_callback_service_lwp, ctx) == 0);
227 } /* init_callback_service */
/*
 * init_fs_channel -- allocate and set up one test channel: a callback
 * service plus a client connection to the file server.
 *
 * octx           out: receives the newly allocated rpc_test_request_ctx
 * cb_if          interface name recorded in the context (used as both the
 *                device and the label of the "ip addr" commands)
 * listen_addr_s  callback listen address, as a string
 * prefix         the "/prefix" part of the listen address, as a string
 * fs_addr_s      file server address, as a string
 * NOTE(review): the parameter list continues on a line not visible here.
 *
 * The package mutex is taken before the rx setup and is released by
 * init_callback_service_lwp(); the channel mutex is held from the port
 * assignment until the client connection has been created.
 *
 * NOTE(review): the malloc() result is used without a NULL check, and the
 * string copies are unbounded sprintf() calls; the sizes of the destination
 * fields are not visible here -- confirm they cannot overflow.
 * NOTE(review): the declarations of `code` and `cmd`, the platform
 * conditionals around the address conversions and the return statement are
 * not among the lines visible here.
 */
229 afs_int32 init_fs_channel(rpc_test_request_ctx **octx, char *cb_if,
230 char *listen_addr_s, char *prefix, char *fs_addr_s,
234 rpc_test_request_ctx *ctx;
237 afs_int32 sslen = sizeof(struct sockaddr);
240 ctx = *octx = (rpc_test_request_ctx *) malloc(sizeof(rpc_test_request_ctx));
241 memset(ctx, 0, sizeof(rpc_test_request_ctx));
243 /* initialize a local mutex */
244 code = pthread_mutex_init(&ctx->mtx, &rpc_test_params.mtx_attrs);
246 /* lock package before rx setup--which has global deps, atm */
247 pthread_mutex_lock(&rpc_test_params.mtx);
249 ctx->cno = rpc_test_params.next_cno++;
252 /* afscbint (server) */
253 sprintf(ctx->cb_svc_name, "cb_%d", ctx->cno);
/* copy the caller's strings through a literal "%s" so that a '%' inside
 * any of them is copied verbatim instead of being read as a conversion */
254 sprintf(ctx->cb_if_s, "%s", cb_if);
255 sprintf(ctx->cb_listen_addr_s, "%s", listen_addr_s);
256 sprintf(ctx->cb_prefix_s, "%s", prefix);
257 sprintf(ctx->fs_addr_s, "%s", fs_addr_s);
259 #if defined(RPC_TEST_ADD_ADDRESSES)
260 #if defined(AFS_LINUX26_ENV)
261 sprintf(cmd, "ip addr add %s/%s dev %s label %s", listen_addr_s, prefix,
265 #endif /* RPC_TEST_ADD_ADDRESSES */
268 pthread_mutex_lock(&ctx->mtx);
/* each channel gets the next callback port in the package sequence */
271 ctx->cb_port = rpc_test_params.cb_next_port++;
272 ctx->cb_listen_addr.numberOfInterfaces = 1;
275 code = WSAStringToAddressA(listen_addr_s, AF_INET, NULL,
276 (struct sockaddr*) &(ctx->cb_listen_addr), &sslen);
278 code = inet_pton(AF_INET, listen_addr_s,
279 (void*) &(ctx->cb_listen_addr.addr_in[0]));
282 code = init_callback_service(ctx /* LOCKED, && rpc_test_params->mtx LOCKED */);
287 code = WSAStringToAddressA(fs_addr_s, AF_INET, NULL,
288 (struct sockaddr*) &(ctx->fs_addr.addr_in[0]), &sslen);
290 code = inet_pton(AF_INET, fs_addr_s, (void*) &(ctx->fs_addr.addr_in[0]));
/* client side: null security, connection to the file server on fs_port */
292 ctx->sc = rxnull_NewClientSecurityObject();
293 ctx->sc_index = RX_SECIDX_NULL;
294 ctx->conn = rx_NewConnection(ctx->fs_addr.addr_in[0], (int) htons(fs_port),
295 1, ctx->sc, ctx->sc_index);
298 pthread_mutex_unlock(&ctx->mtx);
303 } /* init_fs_channel */
305 /* XXX use the pkg lock to protect the state of rx_socket for
306 * the duration of the call, switching it out for the stashed
307 * rx_socket created by rx_NewService for this channel */
/* RXCALL_WITH_SOCK(code, ctx, call): while holding the package mutex, save
 * the global rx_socket, point it at ctx->svc->socket, and afterwards put
 * the saved value back and unlock.
 * NOTE(review): the line between the swap and the restore is not visible
 * here; presumably it evaluates `call` and stores the result in `code`.
 * NOTE(review): `ctx` is expanded without parentheses, so callers should
 * pass a plain identifier. */
308 #define RXCALL_WITH_SOCK(code, ctx, call) \
310 osi_socket prev_rx_socket; \
311 pthread_mutex_lock(&rpc_test_params.mtx); \
312 prev_rx_socket = rx_socket; \
313 rx_socket = ctx->svc->socket; \
315 rx_socket = prev_rx_socket; \
316 pthread_mutex_unlock(&rpc_test_params.mtx); \
/*
 * rpc_test_afs_fetch_status -- issue RXAFS_FetchStatus for fid on the
 * channel's connection, wrapped in RXCALL_WITH_SOCK.
 *
 * ctx        channel whose connection (ctx->conn) carries the call
 * fid        file identifier handed to the RPC
 * outstatus  status buffer handed to the RPC
 *
 * The callback and volume-sync arguments of the RPC are locals (tcb,
 * tsync) and are not passed back to the caller in the lines visible here.
 * NOTE(review): tcall is declared but not used in the visible lines; the
 * return type, the declaration of `code` and the return statement are not
 * visible either.
 */
320 rpc_test_afs_fetch_status(rpc_test_request_ctx *ctx, AFSFid *fid,
321 AFSFetchStatus *outstatus)
323 struct rx_call *tcall;
324 struct AFSVolSync tsync;
325 struct AFSCallBack tcb;
328 RXCALL_WITH_SOCK(code, ctx,
329 (RXAFS_FetchStatus(ctx->conn, fid, outstatus, &tcb, &tsync)));
333 } /* rpc_test_afs_fetch_status */
/*
 * rpc_test_afs_store_status -- issue RXAFS_StoreStatus for fid on the
 * channel's connection, wrapped in RXCALL_WITH_SOCK.
 *
 * ctx        channel whose connection (ctx->conn) carries the call
 * fid        file identifier handed to the RPC
 * instatus   status values handed to the RPC as input
 * outstatus  status buffer handed to the RPC as output
 *
 * The volume-sync argument of the RPC is a local (tsync) and is not passed
 * back to the caller in the lines visible here.
 * NOTE(review): tcall is declared but not used in the visible lines; the
 * return type, the declaration of `code` and the return statement are not
 * visible either.
 */
336 rpc_test_afs_store_status(rpc_test_request_ctx *ctx, AFSFid *fid,
337 AFSStoreStatus *instatus, AFSFetchStatus *outstatus)
339 struct rx_call *tcall;
340 struct AFSVolSync tsync;
343 RXCALL_WITH_SOCK(code, ctx,
344 (RXAFS_StoreStatus(ctx->conn, fid, instatus, outstatus, &tsync)));
348 } /* rpc_test_afs_store_status */
350 #if defined(AFS_BYTE_RANGE_FLOCKS)
/*
 * rpc_test_afs_set_byterangelock -- issue RXAFS_SetByteRangeLock with the
 * given lock on the channel's connection, wrapped in RXCALL_WITH_SOCK.
 * Only compiled when AFS_BYTE_RANGE_FLOCKS is defined.
 *
 * ctx   channel whose connection (ctx->conn) carries the call
 * lock  byte-range lock description handed to the RPC
 *
 * NOTE(review): tcall is declared but not used in the visible lines; the
 * declaration of `code` and the return statement are not visible here.
 */
351 afs_int32 rpc_test_afs_set_byterangelock(rpc_test_request_ctx *ctx,
352 AFSByteRangeLock * lock)
354 struct rx_call *tcall;
357 RXCALL_WITH_SOCK(code, ctx,
358 (RXAFS_SetByteRangeLock(ctx->conn, lock)));
362 } /* rpc_test_afs_set_byterangelock */
/*
 * rpc_test_afs_release_byterangelock -- issue RXAFS_ReleaseByteRangeLock
 * with the given lock on the channel's connection, wrapped in
 * RXCALL_WITH_SOCK.  Only compiled when AFS_BYTE_RANGE_FLOCKS is defined.
 *
 * ctx   channel whose connection (ctx->conn) carries the call
 * lock  byte-range lock description handed to the RPC
 *
 * NOTE(review): tcall is declared but not used in the visible lines; the
 * declaration of `code` and the return statement are not visible here.
 */
364 afs_int32 rpc_test_afs_release_byterangelock(rpc_test_request_ctx *ctx,
365 AFSByteRangeLock * lock)
367 struct rx_call *tcall;
370 RXCALL_WITH_SOCK(code, ctx,
371 (RXAFS_ReleaseByteRangeLock(ctx->conn, lock)));
375 } /* rpc_test_afs_release_byterangelock */
/*
 * rpc_test_afs_upgrade_byterangelock -- placeholder for upgrading a
 * byte-range lock; the body is marked TODO and no RPC is issued in the
 * lines visible here.  Only compiled when AFS_BYTE_RANGE_FLOCKS is defined.
 * NOTE(review): the value returned is not visible here.
 */
377 afs_int32 rpc_test_afs_upgrade_byterangelock(rpc_test_request_ctx *ctx,
378 AFSByteRangeLock * lock)
382 /* TODO: implement */
386 } /* rpc_test_afs_upgrade_byterangelock */
/*
 * rpc_test_afs_downgrade_byterangelock -- placeholder for downgrading a
 * byte-range lock; the body is marked TODO and no RPC is issued in the
 * lines visible here.  Only compiled when AFS_BYTE_RANGE_FLOCKS is defined.
 * NOTE(review): the parameter is spelled `Lock` here but `lock` in the
 * sibling byte-range functions; the value returned is not visible here.
 */
388 afs_int32 rpc_test_afs_downgrade_byterangelock(rpc_test_request_ctx *ctx,
389 AFSByteRangeLock * Lock)
393 /* TODO: implement */
397 } /* rpc_test_afs_downgrade_byterangelock */
398 #endif /* AFS_BYTE_RANGE_FLOCKS */
/*
 * destroy_fs_channel -- tear down a channel created by init_fs_channel().
 *
 * When both RPC_TEST_ADD_ADDRESSES and AFS_LINUX26_ENV are defined, an
 * "ip addr del" command line is formatted into cmd from the listen address,
 * prefix and interface name stored in ctx.
 *
 * NOTE(review): how cmd is executed, any other cleanup (the context and its
 * mutex, the rx connection), the return type and the return statement are
 * not among the lines visible here.
 */
401 destroy_fs_channel(rpc_test_request_ctx *ctx)
405 #if defined(RPC_TEST_ADD_ADDRESSES)
406 #if defined(AFS_LINUX26_ENV)
407 sprintf(cmd, "ip addr del %s/%s dev %s label %s", ctx->cb_listen_addr_s,
408 ctx->cb_prefix_s, ctx->cb_if_s, ctx->cb_if_s);
411 #endif /* RPC_TEST_ADD_ADDRESSES */
416 } /* destroy_fs_channel */
/*
 * rpc_test_PkgShutdown -- package shutdown entry point, the counterpart in
 * name to rpc_test_PkgInit().
 * NOTE(review): the return type and the body are not among the lines
 * visible here, so what (if anything) is released cannot be stated.
 */
419 rpc_test_PkgShutdown()
423 } /* rpc_test_PkgShutdown */