2 * Copyright (c) 2010, Linux Box Corporation.
5 * Portions Copyright (c) 2007, Hartmut Reuter,
6 * RZG, Max-Planck-Institut f. Plasmaphysik.
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
19 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
20 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
21 * AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
26 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include <afsconfig.h>
32 #include <afs/param.h>
38 #include "rpc_test_procs.h"
42 #include <WINNT/afsevent.h>
45 #include <afs/venus.h>
48 #include <afs/afsint.h>
49 #define FSINT_COMMON_XG 1
53 #include <afs/cellconfig.h>
55 #include <afs/com_err.h>
59 #include <afs/errors.h>
60 #include <afs/sys_prototypes.h>
61 #include <rx/rx_prototypes.h>
62 #ifdef AFS_PTHREAD_ENV
/* Program name, defined elsewhere; used as the prefix of the log messages
 * printed by the functions in this file. */
66 extern const char *prog;
/* Key under which each callback service stores its request context (it is
 * the key argument of the rx_SetServiceSpecific call in this file). */
67 const int ctx_key = 1;
/* NOTE(review): the lines surrounding this define/undef pair are not visible
 * here; presumably a conditional selects one of the two -- confirm. */
70 #define RPC_TEST_GLOBAL_RX_INIT 1
72 #undef RPC_TEST_GLOBAL_RX_INIT
/* File server port; passed through htons() when the client connection is
 * created in init_fs_channel. */
75 const afs_uint32 fs_port = 7000;
/*
 * Package-wide state shared by every channel created through this file.
 * NOTE(review): the mtx and next_cno members referenced elsewhere in this
 * file are declared on lines that are not visible here.
 */
77 typedef struct rpc_test_pkg_params {
/* Mutex attributes: initialized in rpc_test_PkgInit and passed to
 * pthread_mutex_init for both the package mutex and each channel mutex. */
79 pthread_mutexattr_t mtx_attrs;
/* Next callback-service port to hand out: seeded in rpc_test_PkgInit and
 * post-incremented in init_fs_channel. */
80 afs_uint32 cb_next_port;
82 } rpc_test_pkg_params;
/* The single instance of the package state. */
83 static rpc_test_pkg_params rpc_test_params;
/*
 * rpc_test_PkgInit -- set up the package-wide state in rpc_test_params.
 *
 * Visible steps: guard against repeated initialization with a function-local
 * static flag, initialize the mutex attributes and the package mutex
 * (printing a message when either call fails), then seed the connection
 * number sequence at 1 and the callback port sequence at 7105.
 *
 * NOTE(review): the declaration of code, the branch structure and the
 * return statements are on lines not visible here -- confirm the return
 * contract against the full file.
 * NOTE(review): the once-flag is a plain static that is tested and set with
 * no visible locking; presumably this runs before any threads are started.
 */
85 afs_int32 rpc_test_PkgInit(void)
88 static afs_uint32 rpc_test_initialized = 0; /* once */
90 if (!rpc_test_initialized) {
91 rpc_test_initialized = 1;
93 printf("%s: rpc_test_PkgInit: package already initialized\n", prog);
/* the attributes initialized here are reused for every mutex this file creates */
98 code = pthread_mutexattr_init(&rpc_test_params.mtx_attrs);
100 printf("%s: rpc_test_PkgInit: pthread_mutexattr_init failed\n", prog);
103 code = pthread_mutex_init(&rpc_test_params.mtx, &rpc_test_params.mtx_attrs);
105 printf("%s: rpc_test_PkgInit: pthread_mutex_init failed\n", prog);
110 /* start connection sequence */
111 rpc_test_params.next_cno = 1;
113 /* set the starting port in sequence */
114 rpc_test_params.cb_next_port = 7105;
116 #if defined(RPC_TEST_GLOBAL_RX_INIT)
122 } /* rpc_test_PkgInit */
/*
 * init_callback_service_lwp -- thread entry point started by
 * init_callback_service; arg is the rpc_test_request_ctx of the channel.
 *
 * Visible steps: log the listen address and callback port, create a null
 * server security object, create the callback Rx service (via
 * rx_NewServiceHost on ctx->cb_port when RPC_TEST_GLOBAL_RX_INIT is defined,
 * otherwise via rx_NewService), attach ctx to the service under ctx_key,
 * and release the package mutex before the rx processing loop.
 *
 * NOTE(review): the return type, the error-handling branches and the rx
 * processing call itself are on lines not visible here.
 */
125 init_callback_service_lwp(void *arg)
127 struct rx_securityClass *sc;
128 struct rx_service *svc;
130 rpc_test_request_ctx *ctx = (rpc_test_request_ctx *) arg;
132 printf("%s: init_callback_service_lwp: listen_addr: %s "
133 "(%d) cb_port: %d\n",
134 prog, ctx->cb_listen_addr_s, ctx->cb_listen_addr.addr_in[0],
137 sc = (struct rx_securityClass *) rxnull_NewServerSecurityObject();
139 fprintf(stderr,"rxnull_NewServerSecurityObject failed for callback "
144 #if defined(RPC_TEST_GLOBAL_RX_INIT)
145 svc = rx_NewServiceHost(htonl(INADDR_ANY), htons(ctx->cb_port), 1,
146 ctx->cb_svc_name, &sc, 1, RXAFSCB_ExecuteRequest);
148 svc = rx_NewService(0, 1, ctx->cb_svc_name, &sc, 1, RXAFSCB_ExecuteRequest);
/* NOTE(review): as visible here, svc is handed to rx_SetServiceSpecific
 * before the failure message below -- confirm that a NULL check on svc
 * precedes this call in the full file. */
151 rx_SetServiceSpecific(svc, ctx_key, ctx);
154 fprintf(stderr,"rx_NewServiceHost failed for callback service\n");
158 /* XXX stash service so we can hijack its rx_socket when initiating
162 /* release pkg mutex before entering rx processing loop */
163 pthread_mutex_unlock(&rpc_test_params.mtx);
167 printf("%s: init_callback_service_lwp: finished", prog);
171 } /* init_callback_service_lwp */
/*
 * init_callback_service -- start the callback service thread for a channel.
 *
 * Visible steps: generate a UUID into ctx->cb_listen_addr.uuid, call rx_Init
 * on the channel's callback port when RPC_TEST_GLOBAL_RX_INIT is not
 * defined, then create a detached thread running init_callback_service_lwp
 * with ctx as its argument.
 *
 * The call site in init_fs_channel notes that both ctx->mtx and the package
 * mutex are held on entry.
 *
 * NOTE(review): the declarations of code and tid and the return statement
 * are on lines not visible here.
 */
173 afs_int32 init_callback_service(rpc_test_request_ctx *ctx)
176 pthread_attr_t tattr;
179 afs_uuid_create(&(ctx->cb_listen_addr.uuid));
181 #if !defined(RPC_TEST_GLOBAL_RX_INIT)
182 code = rx_Init((int) htons(ctx->cb_port));
183 #endif /* RPC_TEST_GLOBAL_RX_INIT */
/* NOTE(review): the pthread calls below are evaluated inside assert(); when
 * the file is compiled with NDEBUG the assert arguments are not evaluated,
 * so the attribute setup and the thread creation would not happen. */
185 assert(pthread_attr_init(&tattr) == 0);
186 assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
187 assert(pthread_create(&tid, &tattr, init_callback_service_lwp, ctx) == 0);
191 } /* init_callback_service */
/*
 * init_fs_channel -- allocate and set up one test channel: a callback
 * service plus a client connection to the file server.
 *
 * octx           receives the newly allocated request context
 * cb_if          interface name, copied into ctx->cb_if_s
 * listen_addr_s  callback listen address string, parsed as AF_INET
 * prefix         address prefix string, copied into ctx->cb_prefix_s
 * fs_addr_s      file server address string, parsed as AF_INET
 * (any further parameters are on a signature line not visible here)
 *
 * Visible steps: allocate and zero the context, initialize its mutex, lock
 * the package mutex, assign the next connection number, fill in the string
 * fields, optionally add the listen address with an "ip addr add" command,
 * lock the context mutex, assign the next callback port, parse the listen
 * address, start the callback service, parse the file server address,
 * create a null client security object and an Rx connection to fs_port,
 * and unlock the context mutex.
 *
 * The package mutex taken here is not released in the visible lines of this
 * function; init_callback_service_lwp unlocks it before its rx processing
 * loop.
 *
 * NOTE(review): the declarations of code and cmd, the conditionals that
 * choose between the WSAStringToAddressA and inet_pton variants, and the
 * return statement are on lines not visible here.
 */
193 afs_int32 init_fs_channel(rpc_test_request_ctx **octx, char *cb_if,
194 char *listen_addr_s, char *prefix, char *fs_addr_s,
197 rpc_test_request_ctx *ctx;
200 afs_int32 sslen = sizeof(struct sockaddr);
/* NOTE(review): the malloc result is passed to memset with no visible NULL
 * check. */
203 ctx = *octx = (rpc_test_request_ctx *) malloc(sizeof(rpc_test_request_ctx));
204 memset(ctx, 0, sizeof(rpc_test_request_ctx));
206 /* initialize a local mutex */
207 code = pthread_mutex_init(&ctx->mtx, &rpc_test_params.mtx_attrs);
209 /* lock package before rx setup--which has global deps, atm */
210 pthread_mutex_lock(&rpc_test_params.mtx);
212 ctx->cno = rpc_test_params.next_cno++;
215 /* afscbint (server) */
/* NOTE(review): the sprintf calls below copy caller-supplied strings with no
 * length bound; the sizes of the destination fields are not visible here. */
216 sprintf(ctx->cb_svc_name, "cb_%d", ctx->cno);
217 sprintf(ctx->cb_if_s, "%s", cb_if);
218 sprintf(ctx->cb_listen_addr_s, "%s", listen_addr_s);
219 sprintf(ctx->cb_prefix_s, "%s", prefix);
220 sprintf(ctx->fs_addr_s, "%s", fs_addr_s);
222 #if defined(RPC_TEST_ADD_ADDRESSES)
223 #if defined(AFS_LINUX26_ENV)
224 sprintf(cmd, "ip addr add %s/%s dev %s label %s", listen_addr_s, prefix,
228 #endif /* RPC_TEST_ADD_ADDRESSES */
231 pthread_mutex_lock(&ctx->mtx);
/* each channel gets its own callback port from the package-wide sequence */
234 ctx->cb_port = rpc_test_params.cb_next_port++;
235 ctx->cb_listen_addr.numberOfInterfaces = 1;
238 code = WSAStringToAddressA(listen_addr_s, AF_INET, NULL,
239 (struct sockaddr*) &(ctx->cb_listen_addr), &sslen);
241 code = inet_pton(AF_INET, listen_addr_s,
242 (void*) &(ctx->cb_listen_addr.addr_in[0]));
245 code = init_callback_service(ctx /* LOCKED, && rpc_test_params->mtx LOCKED */);
250 code = WSAStringToAddressA(fs_addr_s, AF_INET, NULL,
251 (struct sockaddr*) &(ctx->fs_addr.addr_in[0]), &sslen);
253 code = inet_pton(AF_INET, fs_addr_s, (void*) &(ctx->fs_addr.addr_in[0]));
/* client side: null security object and a connection to the file server */
255 ctx->sc = rxnull_NewClientSecurityObject();
256 ctx->sc_index = RX_SECIDX_NULL;
257 ctx->conn = rx_NewConnection(ctx->fs_addr.addr_in[0], (int) htons(fs_port),
258 1, ctx->sc, ctx->sc_index);
261 pthread_mutex_unlock(&ctx->mtx);
265 } /* init_fs_channel */
267 /* XXX use the pkg lock to protect the state of rx_socket for
268 * the duration of the call, switching it out for the stashed
269 * rx_socket created by rx_NewService for this channel.
 *
 * Visible steps: lock rpc_test_params.mtx, save the current rx_socket,
 * point rx_socket at ctx->svc->socket, restore the saved rx_socket, and
 * unlock rpc_test_params.mtx. The use of the code and call arguments is on
 * lines not visible here.
 *
 * NOTE(review): the ctx argument is expanded without parentheses
 * (ctx->svc->socket), so callers should pass a plain identifier. */
270 #define RXCALL_WITH_SOCK(code, ctx, call) \
272 osi_socket prev_rx_socket; \
273 pthread_mutex_lock(&rpc_test_params.mtx); \
274 prev_rx_socket = rx_socket; \
275 rx_socket = ctx->svc->socket; \
277 rx_socket = prev_rx_socket; \
278 pthread_mutex_unlock(&rpc_test_params.mtx); \
/*
 * rpc_test_afs_fetch_status -- issue RXAFS_FetchStatus for fid on the
 * channel's connection, wrapped in RXCALL_WITH_SOCK.
 *
 * ctx        channel whose conn is used for the call
 * fid        file identifier passed to the RPC
 * outstatus  passed to the RPC as its status output argument
 *
 * The callback and volume-sync outputs go into the locals tcb and tsync and
 * are not handed back to the caller in the visible lines.
 *
 * NOTE(review): the return type, the declaration of code and the return
 * statement are on lines not visible here.
 */
282 rpc_test_afs_fetch_status(rpc_test_request_ctx *ctx, AFSFid *fid,
283 AFSFetchStatus *outstatus)
285 struct AFSVolSync tsync;
286 struct AFSCallBack tcb;
289 RXCALL_WITH_SOCK(code, ctx,
290 (RXAFS_FetchStatus(ctx->conn, fid, outstatus, &tcb, &tsync)));
294 } /* rpc_test_afs_fetch_status */
/*
 * rpc_test_afs_store_status -- issue RXAFS_StoreStatus for fid on the
 * channel's connection, wrapped in RXCALL_WITH_SOCK.
 *
 * ctx        channel whose conn is used for the call
 * fid        file identifier passed to the RPC
 * instatus   status values passed to the RPC
 * outstatus  passed to the RPC as its status output argument
 *
 * The volume-sync output goes into the local tsync and is not handed back
 * to the caller in the visible lines.
 *
 * NOTE(review): the return type, the declaration of code and the return
 * statement are on lines not visible here.
 */
297 rpc_test_afs_store_status(rpc_test_request_ctx *ctx, AFSFid *fid,
298 AFSStoreStatus *instatus, AFSFetchStatus *outstatus)
300 struct AFSVolSync tsync;
303 RXCALL_WITH_SOCK(code, ctx,
304 (RXAFS_StoreStatus(ctx->conn, fid, instatus, outstatus, &tsync)));
308 } /* rpc_test_afs_store_status */
310 #if defined(AFS_BYTE_RANGE_FLOCKS)
/*
 * rpc_test_afs_set_byterangelock -- issue RXAFS_SetByteRangeLock for lock
 * on the channel's connection, wrapped in RXCALL_WITH_SOCK. Only compiled
 * when AFS_BYTE_RANGE_FLOCKS is defined.
 *
 * NOTE(review): tcall is not referenced in the visible lines; the
 * declaration of code and the return statement are on lines not visible
 * here.
 */
311 afs_int32 rpc_test_afs_set_byterangelock(rpc_test_request_ctx *ctx,
312 AFSByteRangeLock * lock)
314 struct rx_call *tcall;
317 RXCALL_WITH_SOCK(code, ctx,
318 (RXAFS_SetByteRangeLock(ctx->conn, lock)));
322 } /* rpc_test_afs_set_byterangelock */
/*
 * rpc_test_afs_release_byterangelock -- issue RXAFS_ReleaseByteRangeLock
 * for lock on the channel's connection, wrapped in RXCALL_WITH_SOCK.
 *
 * NOTE(review): tcall is not referenced in the visible lines; the
 * declaration of code and the return statement are on lines not visible
 * here.
 */
324 afs_int32 rpc_test_afs_release_byterangelock(rpc_test_request_ctx *ctx,
325 AFSByteRangeLock * lock)
327 struct rx_call *tcall;
330 RXCALL_WITH_SOCK(code, ctx,
331 (RXAFS_ReleaseByteRangeLock(ctx->conn, lock)));
335 } /* rpc_test_afs_release_byterangelock */
/*
 * rpc_test_afs_upgrade_byterangelock -- placeholder; the only visible body
 * content is the TODO below, and no RPC is issued in the visible lines.
 * NOTE(review): the return value is on a line not visible here.
 */
337 afs_int32 rpc_test_afs_upgrade_byterangelock(rpc_test_request_ctx *ctx,
338 AFSByteRangeLock * lock)
342 /* TODO: implement */
346 } /* rpc_test_afs_upgrade_byterangelock */
/*
 * rpc_test_afs_downgrade_byterangelock -- placeholder; the only visible
 * body content is the TODO below, and no RPC is issued in the visible lines.
 * NOTE(review): the second parameter is spelled Lock here but lock in the
 * sibling byte-range functions; the return value is on a line not visible
 * here.
 */
348 afs_int32 rpc_test_afs_downgrade_byterangelock(rpc_test_request_ctx *ctx,
349 AFSByteRangeLock * Lock)
353 /* TODO: implement */
357 } /* rpc_test_afs_downgrade_byterangelock */
358 #endif /* AFS_BYTE_RANGE_FLOCKS */
/*
 * destroy_fs_channel -- tear down a channel created by init_fs_channel.
 *
 * In the visible lines the only work is formatting an "ip addr del" command
 * from the context's listen address, prefix and interface name, and that
 * only when both RPC_TEST_ADD_ADDRESSES and AFS_LINUX26_ENV are defined.
 *
 * NOTE(review): the return type, the declaration of cmd, whatever runs the
 * command, and any release of ctx are on lines not visible here.
 */
361 destroy_fs_channel(rpc_test_request_ctx *ctx)
364 #if defined(RPC_TEST_ADD_ADDRESSES)
365 #if defined(AFS_LINUX26_ENV)
/* the interface name is used for both the dev and the label argument */
366 sprintf(cmd, "ip addr del %s/%s dev %s label %s", ctx->cb_listen_addr_s,
367 ctx->cb_prefix_s, ctx->cb_if_s, ctx->cb_if_s);
370 #endif /* RPC_TEST_ADD_ADDRESSES */
375 } /* destroy_fs_channel */
/*
 * rpc_test_PkgShutdown -- package shutdown hook, the counterpart by name of
 * rpc_test_PkgInit.
 * NOTE(review): no body statements are visible here; confirm whether the
 * package mutex and mutex attributes are destroyed in the full file.
 */
378 rpc_test_PkgShutdown(void)
380 } /* rpc_test_PkgShutdown */