2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * localhost interprocess communication for servers
13 * currently handled by a localhost socket
14 * (yes, this needs to be replaced someday)
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
28 #include <afs/afsint.h>
29 #include <afs/errors.h>
32 #include "daemon_com.h"
35 #include <afs/afssyscalls.h>
39 #include "partition.h"
41 #include <rx/rx_queue.h>
43 #ifdef USE_UNIX_SOCKETS
44 #include <afs/afsutil.h>
/* Global hook: pointer to a function that takes a VolumeId and returns int.
 * It is only declared here; the visible code neither assigns nor calls it. */
48 int (*V_BreakVolumeCallbacks) (VolumeId);
50 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
51 * move = dump+restore can run on single server */
53 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
/* Forward declaration: SYNC_ask() calls this helper ahead of its definition. */
55 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
59 * On AIX, connect() and bind() require use of SUN_LEN() macro;
60 * sizeof(struct sockaddr_un) will not suffice.
/* AFS_SOCKADDR_LEN(sa): address length handed to connect()/bind().
 * NOTE(review): the generic variant expands its argument unparenthesized
 * (sizeof(*sa)).  The visible callers pass &addr and &state->addr, which
 * expand as intended, but sizeof(*(sa)) would be the safer spelling.
 * NOTE(review): the directive separating the two definitions is not visible
 * in this listing. */
62 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
63 #define AFS_SOCKADDR_LEN(sa) SUN_LEN(sa)
65 #define AFS_SOCKADDR_LEN(sa) sizeof(*sa)
69 /* daemon com SYNC general interfaces */
72 * fill in sockaddr structure.
74 * @param[in] endpoint pointer to sync endpoint object
75 * @param[out] addr pointer to sockaddr structure
77 * @post sockaddr structure populated using information from
/*
 * Fills *addr for the given endpoint: a unix-domain address when
 * USE_UNIX_SOCKETS is defined, otherwise an IPv4 loopback address whose
 * port comes from endpoint->in.
 * NOTE(review): this listing omits some lines of the function (for example
 * the directive that closes the STRUCT_SOCKADDR_HAS_SA_LEN conditional).
 */
81 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
83 #ifdef USE_UNIX_SOCKETS
/* scratch buffer that receives the composed socket path */
84 char tbuffer[AFSDIR_PATH_MAX];
85 #endif /* USE_UNIX_SOCKETS */
/* zero the whole address first so every field not set below is 0 */
87 memset(addr, 0, sizeof(*addr));
89 #ifdef USE_UNIX_SOCKETS
/* presumably builds AFSDIR_SERVER_LOCAL_DIRPATH "/" endpoint->un in tbuffer,
 * bounded by AFSDIR_PATH_MAX -- confirm against strcompose() */
90 strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
91 endpoint->un, (char *)NULL);
92 addr->sun_family = AF_UNIX;
/* NOTE(review): strncpy() does not NUL-terminate when the source is at least
 * as long as the count, and the count is derived from
 * sizeof(struct sockaddr_un) rather than sizeof(addr->sun_path).  The memset
 * above only guarantees a terminator when the path is shorter than the
 * count -- confirm the path can never fill sun_path completely. */
93 strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
94 #else /* !USE_UNIX_SOCKETS */
95 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
96 addr->sin_len = sizeof(struct sockaddr_in);
/* 0x7f000001 is 127.0.0.1; htonl() stores it in network byte order */
98 addr->sin_addr.s_addr = htonl(0x7f000001);
99 addr->sin_family = AF_INET; /* was localhost->h_addrtype */
100 addr->sin_port = htons(endpoint->in); /* XXXX htons not _really_ necessary */
101 #endif /* !USE_UNIX_SOCKETS */
105 * get a socket descriptor of the appropriate domain.
107 * @param[in] endpoint pointer to sync endpoint object
109 * @return socket descriptor
111 * @post socket of domain specified in endpoint structure is created and
112 * returned to caller.
/*
 * Creates a SOCK_STREAM socket in the domain named by endpoint->domain and
 * stores the descriptor in sd.
 * NOTE(review): the declaration of sd and the return statement are not
 * visible in this listing.
 */
115 SYNC_getSock(SYNC_endpoint_t * endpoint)
/* opr_Verify() wraps the "descriptor is non-negative" check; presumably it
 * aborts the process when socket() fails -- confirm against the opr headers */
118 opr_Verify((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
122 /* daemon com SYNC client interface */
125 * open a client connection to a sync server
127 * @param[in] state pointer to sync client handle
129 * @return operation status
132 * @note at present, this routine aborts rather than returning an error code
/*
 * Opens the client socket for a sync handle: builds the endpoint address,
 * creates a socket, and calls connect() on it.
 * NOTE(review): the loop control, the success/early-return statements and
 * the code that walks the backoff table are not visible in this listing, so
 * the comments below describe only the lines that are.
 */
135 SYNC_connect(SYNC_client_state * state)
137 SYNC_sockaddr_t addr;
138 /* I can't believe the following is needed for localhost connections!! */
/* table of time_t values ending in 0; presumably per-attempt delays with 0
 * as the end marker -- confirm against the (not visible) retry loop */
139 static time_t backoff[] =
140 { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
141 time_t *timeout = &backoff[0];
/* a handle whose fd is not OSI_NULLSOCKET is treated specially here; the
 * body of this branch is not visible */
143 if (state->fd != OSI_NULLSOCKET) {
147 SYNC_getAddr(&state->endpoint, &addr);
/* a new socket is created for the attempt and stored in the handle */
150 state->fd = SYNC_getSock(&state->endpoint);
/* connect() returning >= 0 is the success test */
151 if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
/* failed attempt: log it, then drop the socket via SYNC_disconnect() */
156 Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
158 SYNC_disconnect(state);
/* final failure report goes to stderr via perror() */
161 perror("SYNC_connect failed (giving up!)");
166 * forcibly disconnect a sync client handle.
168 * @param[in] state pointer to sync client handle
170 * @return operation status
/*
 * Closes the handle's socket and marks the handle as unconnected.
 * NOTE(review): the result of rk_closesocket() is not checked, and the
 * function's return statement is not visible in this listing.
 */
174 SYNC_disconnect(SYNC_client_state * state)
176 rk_closesocket(state->fd);
/* OSI_NULLSOCKET is the value the other routines test for "no connection" */
177 state->fd = OSI_NULLSOCKET;
182 * gracefully disconnect a sync client handle.
184 * @param[in] state pointer to sync client handle
186 * @return operation status
187 * @retval SYNC_OK success
/*
 * Sends a SYNC_COM_CHANNEL_CLOSE command over the handle and then drops the
 * socket with SYNC_disconnect().
 * NOTE(review): the declarations of com and res, the body of the
 * "fd == OSI_NULLSOCKET" branch and the return statement are not visible in
 * this listing.
 */
190 SYNC_closeChannel(SYNC_client_state * state)
/* response payload buffer, declared through the protocol's buffer macro */
194 SYNC_PROTO_BUF_DECL(ores);
196 if (state->fd == OSI_NULLSOCKET)
199 memset(&com, 0, sizeof(com));
200 memset(&res, 0, sizeof(res));
/* give the response a payload area of SYNC_PROTO_MAX_LEN bytes backed by ores */
202 res.payload.len = SYNC_PROTO_MAX_LEN;
203 res.payload.buf = ores;
/* header-only command: command_len is exactly the header size */
205 com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
206 com.hdr.command_len = sizeof(SYNC_command_hdr);
207 com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
209 /* in case the other end dropped, don't do any retries */
/* note: these assignments modify the caller's handle and are not restored
 * in the visible code */
210 state->retry_limit = 0;
211 state->hard_timeout = 0;
/* the result of SYNC_ask() is not examined here */
213 SYNC_ask(state, &com, &res);
214 SYNC_disconnect(state);
220 * forcibly break a client connection, and then create a new connection.
222 * @param[in] state pointer to sync client handle
224 * @post old connection dropped; new connection established
226 * @return @see SYNC_connect()
/*
 * Drops the current connection with SYNC_disconnect(), then returns whatever
 * SYNC_connect() returns for a fresh attempt on the same handle.
 */
229 SYNC_reconnect(SYNC_client_state * state)
231 SYNC_disconnect(state);
232 return SYNC_connect(state);
236 * send a command to a sync server and wait for a response.
238 * @param[in] state pointer to sync client handle
239 * @param[in] com command object
240 * @param[out] res response object
242 * @return operation status
243 * @retval SYNC_OK success
244 * @retval SYNC_COM_ERROR communications error
245 * @retval SYNC_BAD_COMMAND server did not recognize command code
247 * @note this routine merely handles error processing; SYNC_ask_internal()
248 * handles the low-level details of communicating with the SYNC server.
250 * @see SYNC_ask_internal
/*
 * Retry wrapper around SYNC_ask_internal(): repeats the request while both
 * the retry count and the deadline allow, reconnecting after communication
 * errors, and logs when it gives up.
 * NOTE(review): the declaration and initialisation of tries, the loop
 * keyword, the branch bodies that leave the loop and the return statement
 * are not visible in this listing.
 */
253 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
256 afs_uint32 now, timeout, code=SYNC_OK;
/* two successive "no socket" tests; the statement between them is not
 * visible (presumably a connect attempt -- confirm) */
258 if (state->fd == OSI_NULLSOCKET) {
262 if (state->fd == OSI_NULLSOCKET) {
263 return SYNC_COM_ERROR;
266 #ifdef AFS_DEMAND_ATTACH_FS
267 com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
/* deadline = current approximate time + the handle's hard_timeout */
270 now = FT_ApproxTime();
271 timeout = now + state->hard_timeout;
/* loop condition: tries within retry_limit AND deadline not passed; now is
 * refreshed after every iteration */
273 (tries <= state->retry_limit) && (now <= timeout);
274 tries++, now = FT_ApproxTime()) {
275 code = SYNC_ask_internal(state, com, res);
276 if (code == SYNC_OK) {
278 } else if (code == SYNC_BAD_COMMAND) {
279 Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
280 "fileserver, volserver, salvageserver and salvager are same "
281 "version\n", state->proto_name);
/* reconnect only while another attempt is still allowed */
283 } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
284 Log("SYNC_ask: protocol communications failure on circuit '%s'; "
285 "attempting reconnect to server\n", state->proto_name);
286 SYNC_reconnect(state);
290 * unknown (probably protocol-specific) response code, pass it up to
291 * the caller, and let them deal with it
/* NOTE(review): timeout is afs_uint32 but is printed with %d below; the type
 * of tries is not visible -- confirm the format specifiers match */
297 if (code == SYNC_COM_ERROR) {
298 Log("SYNC_ask: too many / too latent fatal protocol errors on circuit "
299 "'%s'; giving up (tries %d timeout %d)\n",
300 state->proto_name, tries, timeout);
307 * send a command to a sync server and wait for a response.
309 * @param[in] state pointer to sync client handle
310 * @param[in] com command object
311 * @param[out] res response object
313 * @return operation status
314 * @retval SYNC_OK success
315 * @retval SYNC_COM_ERROR communications error
/*
 * Performs one request/response exchange on the handle's socket: validates
 * the request, stamps the command header, serialises header and payload into
 * a local buffer, writes it, then reads and validates the response.  The
 * value returned on the last visible line is res->hdr.response.
 * NOTE(review): this listing omits several lines (local declarations, the
 * statements that leave after each error, and some preprocessor directives,
 * including the one that opens the first send/recv variant), so the comments
 * below describe only what is visible.
 */
320 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
/* staging buffer for the outgoing packet (also the receive buffer in the
 * recv() variant); presumably SYNC_PROTO_MAX_LEN bytes -- confirm the macro */
323 SYNC_PROTO_BUF_DECL(buf);
329 if (state->fd == OSI_NULLSOCKET) {
330 Log("SYNC_ask: invalid sync file descriptor on circuit '%s'\n",
332 res->hdr.response = SYNC_COM_ERROR;
/* refuse commands larger than the protocol maximum before copying into buf */
336 if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
337 Log("SYNC_ask: internal SYNC buffer too small on circuit '%s'; "
338 "please file a bug\n", state->proto_name);
339 res->hdr.response = SYNC_COM_ERROR;
344 * fill in some common header fields
/* both sequence counters in the handle are incremented before being stored
 * in the header */
346 com->hdr.proto_version = state->proto_version;
347 com->hdr.pkt_seq = ++state->pkt_seq;
348 com->hdr.com_seq = ++state->com_seq;
/* sender identification: process id, plus a thread id taken from
 * pthread_self() or from the LWP handle's index (0 when there is no handle) */
353 com->hdr.pid = getpid();
354 #ifdef AFS_PTHREAD_ENV
355 com->hdr.tid = afs_pointer_to_int(pthread_self());
358 PROCESS handle = LWP_ThreadId();
359 com->hdr.tid = (handle) ? handle->index : 0;
361 #endif /* !AFS_PTHREAD_ENV */
362 #endif /* !AFS_NT40_ENV */
/* serialise: header first, then the payload directly after it.
 * NOTE(review): the payload copy length is command_len - sizeof(hdr), not
 * payload.len, and no visible check ensures command_len >= sizeof(hdr) or
 * that it agrees with payload.len -- confirm callers guarantee this. */
364 memcpy(buf, &com->hdr, sizeof(com->hdr));
365 if (com->payload.len) {
366 memcpy(buf + sizeof(com->hdr), com->payload.buf,
367 com->hdr.command_len - sizeof(com->hdr));
/* first variant: send()/recv() (the later "#else !AFS_NT40_ENV" suggests
 * this is the AFS_NT40_ENV branch).  A short send counts as failure. */
371 n = send(state->fd, buf, com->hdr.command_len, 0);
372 if (n != com->hdr.command_len) {
373 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
374 res->hdr.response = SYNC_COM_ERROR;
378 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
379 /* short circuit close channel requests */
380 res->hdr.response = SYNC_OK;
/* whole response is received into buf; 0 bytes, or an error other than
 * WSAEINTR, is treated as "no response" */
384 n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
385 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
386 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
387 res->hdr.response = SYNC_COM_ERROR;
390 #else /* !AFS_NT40_ENV */
/* second variant: write()/readv(); a short write counts as failure */
391 n = write(state->fd, buf, com->hdr.command_len);
392 if (com->hdr.command_len != n) {
393 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
394 res->hdr.response = SYNC_COM_ERROR;
398 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
399 /* short circuit close channel requests */
400 res->hdr.response = SYNC_OK;
404 /* receive the response */
/* scatter read: element 0 lands directly in res->hdr, element 1 (only when
 * the caller supplied a payload area) in res->payload.buf; the lines that
 * set iovcnt are not visible */
405 iov[0].iov_base = (char *)&res->hdr;
406 iov[0].iov_len = sizeof(res->hdr);
407 if (res->payload.len) {
408 iov[1].iov_base = (char *)res->payload.buf;
409 iov[1].iov_len = res->payload.len;
/* 0 bytes, or an error other than EINTR, is treated as "no response" */
414 n = readv(state->fd, iov, iovcnt);
415 if (n == 0 || (n < 0 && errno != EINTR)) {
416 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
417 res->hdr.response = SYNC_COM_ERROR;
420 #endif /* !AFS_NT40_ENV */
/* the next checks unpack from buf, so they presumably belong to the recv()
 * variant only -- the enclosing directives are not visible.
 * NOTE(review): n is compared against sizeof(), an unsigned value; a
 * negative n that got past the interrupted-call test would be converted to
 * a large unsigned number here -- confirm n's type and this path. */
424 if (n < sizeof(res->hdr)) {
425 Log("SYNC_ask: response too short on circuit '%s'\n",
427 res->hdr.response = SYNC_COM_ERROR;
431 memcpy(&res->hdr, buf, sizeof(res->hdr));
/* reject a payload larger than the caller's payload area before copying it */
434 if ((n - sizeof(res->hdr)) > res->payload.len) {
435 Log("SYNC_ask: response too long on circuit '%s'\n",
437 res->hdr.response = SYNC_COM_ERROR;
441 memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
/* the length recorded in the header must equal the bytes actually received */
444 if (res->hdr.response_len != n) {
445 Log("SYNC_ask: length field in response inconsistent "
446 "on circuit '%s'\n", state->proto_name);
447 res->hdr.response = SYNC_COM_ERROR;
/* a SYNC_DENIED response is logged; no response field is changed in the
 * visible lines of this branch */
450 if (res->hdr.response == SYNC_DENIED) {
451 Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
455 return res->hdr.response;
460 * daemon com SYNC server-side interfaces
464 * receive a command structure off a sync socket.
466 * @param[in] state pointer to server-side state object
467 * @param[in] fd file descriptor on which to perform i/o
468 * @param[out] com sync command object to be populated
470 * @return operation status
471 * @retval SYNC_OK command received
472 * @retval SYNC_COM_ERROR there was a socket communications error
/*
 * Reads one command from fd into *com.  In the recv() variant the bytes go
 * through a local buffer and are unpacked into com->hdr and
 * com->payload.buf; in the readv() variant they are read straight into
 * those two areas.  Failures are recorded in code as SYNC_COM_ERROR.
 * NOTE(review): the remaining parameters, some local declarations, the
 * directive opening the first variant, the statements that leave after each
 * error and the return statement are not visible in this listing.
 */
475 SYNC_getCom(SYNC_server_state_t * state,
480 afs_int32 code = SYNC_OK;
/* receive buffer, declared through the protocol's buffer macro */
482 SYNC_PROTO_BUF_DECL(buf);
/* first variant: one recv() of up to SYNC_PROTO_MAX_LEN bytes */
489 n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
/* 0 bytes, or an error other than WSAEINTR, is a communications error */
491 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
492 Log("SYNC_getCom: error receiving command\n");
493 code = SYNC_COM_ERROR;
496 #else /* !AFS_NT40_ENV */
/* second variant: scatter read; header into com->hdr, payload (only when
 * the caller supplied a payload area) into com->payload.buf */
497 iov[0].iov_base = (char *)&com->hdr;
498 iov[0].iov_len = sizeof(com->hdr);
499 if (com->payload.len) {
500 iov[1].iov_base = (char *)com->payload.buf;
501 iov[1].iov_len = com->payload.len;
/* 0 bytes, or an error other than EINTR, is a communications error */
507 n = readv(fd, iov, iovcnt);
508 if (n == 0 || (n < 0 && errno != EINTR)) {
509 Log("SYNC_getCom: error receiving command\n");
510 code = SYNC_COM_ERROR;
513 #endif /* !AFS_NT40_ENV */
/* the checks below unpack from buf, so they presumably apply to the recv()
 * variant only -- the enclosing directives are not visible.
 * NOTE(review): n is compared with sizeof(), an unsigned value -- confirm
 * n's type and the interrupted-call (negative n) path. */
517 if (n < sizeof(com->hdr)) {
518 Log("SYNC_getCom: command too short\n");
519 code = SYNC_COM_ERROR;
523 memcpy(&com->hdr, buf, sizeof(com->hdr));
/* reject a payload larger than the caller's payload area before copying it */
526 if ((n - sizeof(com->hdr)) > com->payload.len) {
527 Log("SYNC_getCom: command too long\n");
528 code = SYNC_COM_ERROR;
532 memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
540 * write a response structure to a sync socket.
542 * @param[in] state handle to server-side state object
543 * @param[in] fd file descriptor on which to perform i/o
544 * @param[in] res handle to response packet
546 * @return operation status
548 * @retval SYNC_COM_ERROR
/*
 * Serialises *res (header, then payload) into a local buffer and writes
 * res->hdr.response_len bytes of it to fd, after validating response_len
 * and stamping the header from the server state.
 * NOTE(review): the remaining parameters, the declaration of n, the
 * statements that leave after each error and the return statement are not
 * visible in this listing.
 */
551 SYNC_putRes(SYNC_server_state_t * state,
556 afs_int32 code = SYNC_OK;
/* staging buffer for the outgoing packet, declared via the protocol macro */
557 SYNC_PROTO_BUF_DECL(buf);
/* response_len may not claim more bytes than header + payload provide */
559 if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
560 Log("SYNC_putRes: response_len field in response header inconsistent\n");
561 code = SYNC_COM_ERROR;
/* ...nor more than the protocol maximum, checked before copying into buf */
565 if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
566 Log("SYNC_putRes: internal SYNC buffer too small; please file a bug\n");
567 code = SYNC_COM_ERROR;
571 #ifdef AFS_DEMAND_ATTACH_FS
572 res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
/* stamp the header; both sequence counters in the server state are
 * incremented before being stored */
574 res->hdr.proto_version = state->proto_version;
575 res->hdr.pkt_seq = ++state->pkt_seq;
576 res->hdr.res_seq = ++state->res_seq;
/* NOTE(review): the payload copy length is response_len - sizeof(hdr); only
 * upper bounds on response_len are checked above, so a response_len smaller
 * than the header with a non-zero payload.len would underflow this
 * subtraction -- confirm callers always set response_len >= sizeof(hdr). */
578 memcpy(buf, &res->hdr, sizeof(res->hdr));
579 if (res->payload.len) {
580 memcpy(buf + sizeof(res->hdr), res->payload.buf,
581 res->hdr.response_len - sizeof(res->hdr));
585 n = send(fd, buf, res->hdr.response_len, 0);
586 #else /* !AFS_NT40_ENV */
587 n = write(fd, buf, res->hdr.response_len);
588 #endif /* !AFS_NT40_ENV */
/* a short or failed write counts as failure.
 * NOTE(review): unlike the two checks above, this path stores the error in
 * res->hdr.response and, in the visible lines, leaves code at its previous
 * value -- confirm the function's return value reflects a failed write. */
590 if (res->hdr.response_len != n) {
591 Log("SYNC_putRes: write failed\n");
592 res->hdr.response = SYNC_COM_ERROR;
/**
 * check that a protocol string is NUL-terminated within its buffer.
 *
 * @param[in] buf  pointer to the buffer holding the string
 * @param[in] len  size of the buffer in bytes
 *
 * @return string validity
 *   @retval 0 legal string (a NUL byte occurs within the first len bytes)
 *   @retval 1 illegal string (no NUL byte within len bytes, or buf is NULL)
 *
 * @note a zero-length buffer cannot hold a terminator, so it is reported
 *       as illegal.
 */
int
SYNC_verifyProtocolString(char * buf, size_t len)
{
    /* a missing buffer can never hold a legal string */
    if (buf == NULL) {
	return 1;
    }

    /*
     * memchr() is ISO C and never reads past len bytes; it finds a NUL
     * exactly when strnlen(buf, len) would return a value below len.
     */
    return (memchr(buf, '\0', len) == NULL) ? 1 : 0;
}
613 * clean up old sockets.
615 * @param[in] state server state object
617 * @post unix domain sockets are cleaned up
/*
 * When built with USE_UNIX_SOCKETS, removes the filesystem entry named by
 * the server's unix-domain socket path (state->addr.sun_path).
 * NOTE(review): the result of remove() is not checked, and the closing
 * directive is not visible in this listing.
 */
620 SYNC_cleanupSock(SYNC_server_state_t * state)
622 #ifdef USE_UNIX_SOCKETS
623 remove(state->addr.sun_path);
628 * bind socket and set it to listen state.
630 * @param[in] state server state object
632 * @return operation status
634 * @retval nonzero failure
636 * @post socket bound and set to listen state
639 SYNC_bindSock(SYNC_server_state_t * state)
645 /* Reuseaddr needed because system inexplicably leaves crud lying around */
647 setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
650 Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
652 for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
653 code = bind(state->fd,
654 (struct sockaddr *)&state->addr,
655 AFS_SOCKADDR_LEN(&state->addr));
658 Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
662 listen(state->fd, state->listen_depth);