2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * localhost interprocess communication for servers
13 * currently handled by a localhost socket
14 * (yes, this needs to be replaced someday)
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
26 #include <sys/types.h>
32 #include <sys/param.h>
33 #include <sys/socket.h>
34 #include <netinet/in.h>
40 #include <afs/afs_assert.h>
46 #include <afs/afsint.h>
48 #include <afs/errors.h>
49 #include "daemon_com.h"
52 #include <afs/afssyscalls.h>
56 #include "partition.h"
58 #include <rx/rx_queue.h>
60 #ifdef USE_UNIX_SOCKETS
61 #include <afs/afsutil.h>
65 int (*V_BreakVolumeCallbacks) (VolumeId);
67 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
68 * move = dump+restore can run on single server */
70 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
72 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
/*
 * On AIX, connect() and bind() require use of SUN_LEN() macro;
 * sizeof(struct sockaddr_un) will not suffice.
 *
 * AFS_SOCKADDR_LEN(sa) yields the address length to pass to connect() or
 * bind() for the sockaddr object that sa points to.  The argument is
 * parenthesized in the sizeof form so that pointer expressions such as
 * `base + 1` expand to the size of the pointed-to object.
 */
#if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
#define AFS_SOCKADDR_LEN(sa)  SUN_LEN(sa)
#else
#define AFS_SOCKADDR_LEN(sa)  sizeof(*(sa))
#endif
86 /* daemon com SYNC general interfaces */
89 * fill in sockaddr structure.
91 * @param[in] endpoint pointer to sync endpoint object
92 * @param[out] addr pointer to sockaddr structure
94 * @post sockaddr structure populated using information from
98 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
100 #ifdef USE_UNIX_SOCKETS
101 char tbuffer[AFSDIR_PATH_MAX];
102 #endif /* USE_UNIX_SOCKETS */
104 memset(addr, 0, sizeof(*addr));
106 #ifdef USE_UNIX_SOCKETS
107 strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
109 addr->sun_family = AF_UNIX;
110 strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
111 #else /* !USE_UNIX_SOCKETS */
112 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
113 addr->sin_len = sizeof(struct sockaddr_in);
115 addr->sin_addr.s_addr = htonl(0x7f000001);
116 addr->sin_family = AF_INET; /* was localhost->h_addrtype */
117 addr->sin_port = htons(endpoint->in); /* XXXX htons not _really_ neccessary */
118 #endif /* !USE_UNIX_SOCKETS */
/*
 * SYNC_getSock: create a stream socket in the domain named by the endpoint.
 */
122 * get a socket descriptor of the appropriate domain.
124 * @param[in] endpoint pointer to sync endpoint object
126 * @return socket descriptor
128 * @post socket of domain specified in endpoint structure is created and
129 * returned to caller.
132 SYNC_getSock(SYNC_endpoint_t * endpoint)
/*
 * The socket() call sits inside the assertion expression: sd receives the
 * new SOCK_STREAM descriptor for endpoint->domain, and the assertion
 * requires it to be non-negative.
 * NOTE(review): a socket() failure is therefore not reported through a
 * return code; what happens then depends on osi_Assert -- confirm.
 */
135 osi_Assert((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
139 /* daemon com SYNC client interface */
/*
 * SYNC_connect: obtain a socket for the handle's endpoint and connect() it
 * to the address produced by SYNC_getAddr().
 */
142 * open a client connection to a sync server
144 * @param[in] state pointer to sync client handle
146 * @return operation status
149 * @note at present, this routine aborts rather than returning an error code
152 SYNC_connect(SYNC_client_state * state)
154 SYNC_sockaddr_t addr;
155 /* I can't believe the following is needed for localhost connections!! */
/*
 * Table of delay values reached through the `timeout` cursor below.
 * NOTE(review): presumably seconds to wait between attempts, with the
 * trailing 0 marking the end of the schedule -- confirm against the loop.
 */
156 static time_t backoff[] =
157 { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
158 time_t *timeout = &backoff[0];
/* A handle whose fd is not OSI_NULLSOCKET already has a socket. */
160 if (state->fd != OSI_NULLSOCKET) {
/* Build the server address for this handle's endpoint. */
164 SYNC_getAddr(&state->endpoint, &addr);
/* Get a socket and try to connect it; a result >= 0 is success. */
167 state->fd = SYNC_getSock(&state->endpoint);
168 if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
173 Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
/* Release the socket of the failed attempt. */
175 SYNC_disconnect(state);
/* Final failure report; perror() appends the text for the current errno. */
178 perror("SYNC_connect failed (giving up!)");
/*
 * SYNC_disconnect: close the handle's socket without notifying the peer and
 * reset the handle's descriptor.
 */
183 * forcibly disconnect a sync client handle.
185 * @param[in] state pointer to sync client handle
187 * @retval operation status
191 SYNC_disconnect(SYNC_client_state * state)
/*
 * NOTE(review): closesocket() is the Winsock close call; presumably this is
 * one arm of a platform conditional -- confirm.
 */
194 closesocket(state->fd);
/* Record that the handle no longer owns a socket. */
198 state->fd = OSI_NULLSOCKET;
/*
 * SYNC_closeChannel: send a SYNC_COM_CHANNEL_CLOSE command over the handle
 * and then disconnect it.
 */
203 * gracefully disconnect a sync client handle.
205 * @param[in] state pointer to sync client handle
207 * @return operation status
208 * @retval SYNC_OK success
211 SYNC_closeChannel(SYNC_client_state * state)
/* Buffer that backs the response payload (see res.payload.buf below). */
215 SYNC_PROTO_BUF_DECL(ores);
/* A handle without a socket gets special treatment here. */
217 if (state->fd == OSI_NULLSOCKET)
/* Start from zeroed command and response objects. */
220 memset(&com, 0, sizeof(com));
221 memset(&res, 0, sizeof(res));
/* Allow a response payload of up to SYNC_PROTO_MAX_LEN bytes in ores. */
223 res.payload.len = SYNC_PROTO_MAX_LEN;
224 res.payload.buf = ores;
/* Header-only command: command_len is just the header size. */
226 com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
227 com.hdr.command_len = sizeof(SYNC_command_hdr);
228 com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
230 /* in case the other end dropped, don't do any retries */
/* These overwrite the handle's own retry settings in place. */
231 state->retry_limit = 0;
232 state->hard_timeout = 0;
/* The result of SYNC_ask() is not examined; the socket is dropped anyway. */
234 SYNC_ask(state, &com, &res);
235 SYNC_disconnect(state);
/*
 * SYNC_reconnect: SYNC_disconnect() followed by SYNC_connect() on the same
 * handle; the value returned is whatever SYNC_connect() returns.
 */
241 * forcibly break a client connection, and then create a new connection.
243 * @param[in] state pointer to sync client handle
245 * @post old connection dropped; new connection established
247 * @return @see SYNC_connect()
250 SYNC_reconnect(SYNC_client_state * state)
252 SYNC_disconnect(state);
253 return SYNC_connect(state);
/*
 * SYNC_ask: retry/error-handling wrapper around SYNC_ask_internal().  It
 * refuses to run on a handle marked fatal, retries within the handle's
 * retry_limit and hard_timeout, and marks the handle fatal when the final
 * status is SYNC_COM_ERROR.
 */
257 * send a command to a sync server and wait for a response.
259 * @param[in] state pointer to sync client handle
260 * @param[in] com command object
261 * @param[out] res response object
263 * @return operation status
264 * @retval SYNC_OK success
265 * @retval SYNC_COM_ERROR communications error
266 * @retval SYNC_BAD_COMMAND server did not recognize command code
268 * @note this routine merely handles error processing; SYNC_ask_internal()
269 * handles the low-level details of communicating with the SYNC server.
271 * @see SYNC_ask_internal
274 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
/*
 * NOTE(review): code is declared unsigned here although it carries SYNC_*
 * status values -- confirm that none of them is negative.
 */
277 afs_uint32 now, timeout, code=SYNC_OK;
/* A handle already marked fatal rejects every further request. */
279 if (state->fatal_error) {
280 return SYNC_COM_ERROR;
/*
 * No socket on the handle yet.
 * NOTE(review): presumably a connection attempt is made here -- confirm.
 */
283 if (state->fd == OSI_NULLSOCKET) {
/* Still no socket: mark the handle fatal and fail the request. */
287 if (state->fd == OSI_NULLSOCKET) {
288 state->fatal_error = 1;
289 return SYNC_COM_ERROR;
/* DAFS builds tag every outgoing command with the DAFS-extensions flag. */
292 #ifdef AFS_DEMAND_ATTACH_FS
293 com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
/*
 * Retry window: attempts continue while tries <= retry_limit and the
 * approximate clock has not passed the start time plus hard_timeout.
 */
296 now = FT_ApproxTime();
297 timeout = now + state->hard_timeout;
299 (tries <= state->retry_limit) && (now <= timeout);
300 tries++, now = FT_ApproxTime()) {
301 code = SYNC_ask_internal(state, com, res);
302 if (code == SYNC_OK) {
304 } else if (code == SYNC_BAD_COMMAND) {
305 Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
306 "fileserver, volserver, salvageserver and salvager are same "
307 "version\n", state->proto_name);
/*
 * Communications errors trigger a reconnect only while at least one more
 * attempt remains (tries < retry_limit).
 */
309 } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
310 Log("SYNC_ask: protocol communications failure on circuit '%s'; "
311 "attempting reconnect to server\n", state->proto_name);
312 SYNC_reconnect(state);
316 * unknown (probably protocol-specific) response code, pass it up to
317 * the caller, and let them deal with it
/* A final status of SYNC_COM_ERROR disables the handle via fatal_error. */
323 if (code == SYNC_COM_ERROR) {
324 Log("SYNC_ask: fatal protocol error on circuit '%s'; disabling sync "
325 "protocol until next server restart\n",
327 state->fatal_error = 1;
/*
 * SYNC_ask_internal: one request/response exchange on the handle's socket.
 * The command header and payload are packed into a local buffer and written
 * out; unless the command is SYNC_COM_CHANNEL_CLOSE, a response is then read
 * and its length validated.  Every failure path shown stores SYNC_COM_ERROR
 * in res->hdr.response, and the function returns res->hdr.response.
 */
334 * send a command to a sync server and wait for a response.
336 * @param[in] state pointer to sync client handle
337 * @param[in] com command object
338 * @param[out] res response object
340 * @return operation status
341 * @retval SYNC_OK success
342 * @retval SYNC_COM_ERROR communications error
347 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
/* Local staging buffer used for the outgoing packet (and for recv()). */
350 SYNC_PROTO_BUF_DECL(buf);
/* Refuse to operate on a handle that has no socket. */
356 if (state->fd == OSI_NULLSOCKET) {
357 Log("SYNC_ask: invalid sync file descriptor on circuit '%s'\n",
359 res->hdr.response = SYNC_COM_ERROR;
/* The whole command must fit within SYNC_PROTO_MAX_LEN. */
363 if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
364 Log("SYNC_ask: internal SYNC buffer too small on circuit '%s'; "
365 "please file a bug\n", state->proto_name);
366 res->hdr.response = SYNC_COM_ERROR;
371 * fill in some common header fields
/* Both sequence counters on the handle are pre-incremented per command. */
373 com->hdr.proto_version = state->proto_version;
374 com->hdr.pkt_seq = ++state->pkt_seq;
375 com->hdr.com_seq = ++state->com_seq;
/* Identify the sender: process id, plus a pthread- or LWP-derived id. */
380 com->hdr.pid = getpid();
381 #ifdef AFS_PTHREAD_ENV
382 com->hdr.tid = afs_pointer_to_int(pthread_self());
385 PROCESS handle = LWP_ThreadId();
386 com->hdr.tid = (handle) ? handle->index : 0;
388 #endif /* !AFS_PTHREAD_ENV */
389 #endif /* !AFS_NT40_ENV */
/*
 * Serialize header then payload into buf.
 * NOTE(review): the payload length is command_len - sizeof(com->hdr); only
 * an upper bound on command_len is checked above, so a command_len smaller
 * than the header would make this subtraction wrap -- confirm callers.
 */
391 memcpy(buf, &com->hdr, sizeof(com->hdr));
392 if (com->payload.len) {
393 memcpy(buf + sizeof(com->hdr), com->payload.buf,
394 com->hdr.command_len - sizeof(com->hdr));
/* send()/recv() variant; a short send counts as failure. */
398 n = send(state->fd, buf, com->hdr.command_len, 0);
399 if (n != com->hdr.command_len) {
400 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
401 res->hdr.response = SYNC_COM_ERROR;
405 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
406 /* short circuit close channel requests */
407 res->hdr.response = SYNC_OK;
/* The response lands in buf first and is copied out further below. */
411 n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
412 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
413 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
414 res->hdr.response = SYNC_COM_ERROR;
417 #else /* !AFS_NT40_ENV */
/* write()/readv() variant; a short write counts as failure. */
418 n = write(state->fd, buf, com->hdr.command_len);
419 if (com->hdr.command_len != n) {
420 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
421 res->hdr.response = SYNC_COM_ERROR;
425 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
426 /* short circuit close channel requests */
427 res->hdr.response = SYNC_OK;
431 /* receive the response */
/* Scatter the read directly into the response header and payload buffer. */
432 iov[0].iov_base = (char *)&res->hdr;
433 iov[0].iov_len = sizeof(res->hdr);
434 if (res->payload.len) {
435 iov[1].iov_base = (char *)res->payload.buf;
436 iov[1].iov_len = res->payload.len;
/* A negative result with errno == EINTR is not treated as failure here. */
441 n = readv(state->fd, iov, iovcnt);
442 if (n == 0 || (n < 0 && errno != EINTR)) {
443 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
444 res->hdr.response = SYNC_COM_ERROR;
447 #endif /* !AFS_NT40_ENV */
/*
 * Length validation of what was received.
 * NOTE(review): n is compared with sizeof results; if n is a signed int, a
 * negative n is converted to a large unsigned value in these comparisons --
 * confirm the declaration of n.
 */
451 if (n < sizeof(res->hdr)) {
452 Log("SYNC_ask: response too short on circuit '%s'\n",
454 res->hdr.response = SYNC_COM_ERROR;
/* Copies out of buf: these match the recv() path, which filled buf. */
458 memcpy(&res->hdr, buf, sizeof(res->hdr));
461 if ((n - sizeof(res->hdr)) > res->payload.len) {
462 Log("SYNC_ask: response too long on circuit '%s'\n",
464 res->hdr.response = SYNC_COM_ERROR;
468 memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
/* The header's own length field must equal the number of bytes received. */
471 if (res->hdr.response_len != n) {
472 Log("SYNC_ask: length field in response inconsistent "
473 "on circuit '%s'\n", state->proto_name);
474 res->hdr.response = SYNC_COM_ERROR;
/* A SYNC_DENIED answer is logged; the response code is left unchanged. */
477 if (res->hdr.response == SYNC_DENIED) {
478 Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
482 return res->hdr.response;
487 * daemon com SYNC server-side interfaces
/*
 * SYNC_getCom: read one command from fd into com and validate its length
 * against the header size and the caller-provided payload capacity
 * (com->payload.len).
 */
491 * receive a command structure off a sync socket.
493 * @param[in] state pointer to server-side state object
494 * @param[in] fd file descriptor on which to perform i/o
495 * @param[out] com sync command object to be populated
497 * @return operation status
498 * @retval SYNC_OK command received
499 * @retval SYNC_COM_ERROR there was a socket communications error
502 SYNC_getCom(SYNC_server_state_t * state,
507 afs_int32 code = SYNC_OK;
/* Staging buffer used by the recv() variant below. */
509 SYNC_PROTO_BUF_DECL(buf);
/* recv() variant: the packet lands in buf and is copied out further down. */
516 n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
518 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
519 Log("SYNC_getCom: error receiving command\n");
520 code = SYNC_COM_ERROR;
523 #else /* !AFS_NT40_ENV */
/* readv() variant: scatter directly into the header and payload buffer. */
524 iov[0].iov_base = (char *)&com->hdr;
525 iov[0].iov_len = sizeof(com->hdr);
526 if (com->payload.len) {
527 iov[1].iov_base = (char *)com->payload.buf;
528 iov[1].iov_len = com->payload.len;
/* A negative result with errno == EINTR is not treated as failure here. */
534 n = readv(fd, iov, iovcnt);
535 if (n == 0 || (n < 0 && errno != EINTR)) {
536 Log("SYNC_getCom: error receiving command\n");
537 code = SYNC_COM_ERROR;
540 #endif /* !AFS_NT40_ENV */
/*
 * Length validation.
 * NOTE(review): n is compared with sizeof results; if n is a signed int, a
 * negative n is converted to a large unsigned value in these comparisons --
 * confirm the declaration of n.
 */
544 if (n < sizeof(com->hdr)) {
545 Log("SYNC_getCom: command too short\n");
546 code = SYNC_COM_ERROR;
/* Copies out of buf: these match the recv() path, which filled buf. */
550 memcpy(&com->hdr, buf, sizeof(com->hdr));
553 if ((n - sizeof(com->hdr)) > com->payload.len) {
554 Log("SYNC_getCom: command too long\n");
555 code = SYNC_COM_ERROR;
559 memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
/*
 * SYNC_putRes: stamp the response header with protocol version and sequence
 * numbers, pack header and payload into a local buffer, and write
 * res->hdr.response_len bytes to fd.
 */
567 * write a response structure to a sync socket.
569 * @param[in] state handle to server-side state object
570 * @param[in] fd file descriptor on which to perform i/o
571 * @param[in] res handle to response packet
573 * @return operation status
575 * @retval SYNC_COM_ERROR
578 SYNC_putRes(SYNC_server_state_t * state,
583 afs_int32 code = SYNC_OK;
584 SYNC_PROTO_BUF_DECL(buf);
/* response_len may not claim more than header plus payload. */
586 if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
587 Log("SYNC_putRes: response_len field in response header inconsistent\n");
588 code = SYNC_COM_ERROR;
/* The whole response must fit within SYNC_PROTO_MAX_LEN. */
592 if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
593 Log("SYNC_putRes: internal SYNC buffer too small; please file a bug\n");
594 code = SYNC_COM_ERROR;
/* DAFS builds tag every response with the DAFS-extensions flag. */
598 #ifdef AFS_DEMAND_ATTACH_FS
599 res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
/* Both sequence counters on the server state are pre-incremented. */
601 res->hdr.proto_version = state->proto_version;
602 res->hdr.pkt_seq = ++state->pkt_seq;
603 res->hdr.res_seq = ++state->res_seq;
/*
 * Serialize header then payload into buf.
 * NOTE(review): the payload length is response_len - sizeof(res->hdr); only
 * upper bounds on response_len are checked above, so a response_len smaller
 * than the header would make this subtraction wrap -- confirm callers.
 */
605 memcpy(buf, &res->hdr, sizeof(res->hdr));
606 if (res->payload.len) {
607 memcpy(buf + sizeof(res->hdr), res->payload.buf,
608 res->hdr.response_len - sizeof(res->hdr));
612 n = send(fd, buf, res->hdr.response_len, 0);
613 #else /* !AFS_NT40_ENV */
614 n = write(fd, buf, res->hdr.response_len);
615 #endif /* !AFS_NT40_ENV */
/*
 * A short or failed write is logged and recorded in the response header.
 * NOTE(review): unlike the two checks at the top, this branch does not
 * visibly assign SYNC_COM_ERROR to code -- confirm what the caller gets.
 */
617 if (res->hdr.response_len != n) {
618 Log("SYNC_putRes: write failed\n");
619 res->hdr.response = SYNC_COM_ERROR;
/**
 * check that a protocol string is terminated inside its buffer.
 *
 * @param[in] buf  buffer expected to hold a C string
 * @param[in] len  size of buf in bytes
 *
 * @return legality of the string
 *   @retval 0 legal (null-terminated) string
 *   @retval 1 illegal (unterminated) string, or buf is NULL
 */
int
SYNC_verifyProtocolString(char * buf, size_t len)
{
    if (buf == NULL) {
        return 1;
    }

    /*
     * memchr() is ISO C, whereas strnlen() is a POSIX extension that is not
     * declared everywhere.  "No NUL within the first len bytes" is exactly
     * the condition strnlen(buf, len) == len.
     */
    return (memchr(buf, '\0', len) == NULL) ? 1 : 0;
}
/*
 * SYNC_cleanupSock: when built with USE_UNIX_SOCKETS, remove the filesystem
 * entry named by state->addr.sun_path.
 */
640 * clean up old sockets.
642 * @param[in] state server state object
644 * @post unix domain sockets are cleaned up
647 SYNC_cleanupSock(SYNC_server_state_t * state)
649 #ifdef USE_UNIX_SOCKETS
/* The result of remove() is not checked. */
650 remove(state->addr.sun_path);
655 * bind socket and set it to listen state.
657 * @param[in] state server state object
659 * @return operation status
661 * @retval nonzero failure
663 * @post socket bound and set to listen state
666 SYNC_bindSock(SYNC_server_state_t * state)
672 /* Reuseaddr needed because system inexplicably leaves crud lying around */
674 setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
677 Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
679 for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
680 code = bind(state->fd,
681 (struct sockaddr *)&state->addr,
682 AFS_SOCKADDR_LEN(&state->addr));
685 Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
689 listen(state->fd, state->listen_depth);