2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * localhost interprocess communication for servers
13 * currently handled by a localhost socket
14 * (yes, this needs to be replaced someday)
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
27 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
46 #include <afs/afsint.h>
48 #include <afs/errors.h>
49 #include "daemon_com.h"
52 #include <afs/afssyscalls.h>
56 #include "partition.h"
57 #include <rx/rx_queue.h>
59 #ifdef USE_UNIX_SOCKETS
60 #include <afs/afsutil.h>
64 /*@printflike@*/ extern void Log(const char *format, ...);
66 int (*V_BreakVolumeCallbacks) ();
68 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
69 * move = dump+restore can run on single server */
71 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
73 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
77 * On AIX, connect() and bind() require use of SUN_LEN() macro;
78 * sizeof(struct sockaddr_un) will not suffice.
80 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
81 #define AFS_SOCKADDR_LEN(sa) SUN_LEN(sa)
83 #define AFS_SOCKADDR_LEN(sa) sizeof(*sa)
87 /* daemon com SYNC general interfaces */
90 * fill in sockaddr structure.
92 * @param[in] endpoint pointer to sync endpoint object
93 * @param[out] addr pointer to sockaddr structure
95 * @post sockaddr structure populated using information from
99 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
101 #ifdef USE_UNIX_SOCKETS
102 char tbuffer[AFSDIR_PATH_MAX];
103 #endif /* USE_UNIX_SOCKETS */
105 memset(addr, 0, sizeof(*addr));
107 #ifdef USE_UNIX_SOCKETS
108 strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
110 addr->sun_family = AF_UNIX;
111 strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
112 #else /* !USE_UNIX_SOCKETS */
113 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
114 addr->sin_len = sizeof(struct sockaddr_in);
116 addr->sin_addr.s_addr = htonl(0x7f000001);
117 addr->sin_family = AF_INET; /* was localhost->h_addrtype */
118 addr->sin_port = htons(endpoint->in); /* XXXX htons not _really_ neccessary */
119 #endif /* !USE_UNIX_SOCKETS */
123 * get a socket descriptor of the appropriate domain.
125 * @param[in] endpoint pointer to sync endpoint object
127 * @return socket descriptor
129 * @post socket of domain specified in endpoint structure is created and
130 * returned to caller.
133 SYNC_getSock(SYNC_endpoint_t * endpoint)
136 assert((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
140 /* daemon com SYNC client interface */
143 * open a client connection to a sync server
145 * @param[in] state pointer to sync client handle
147 * @return operation status
150 * @note at present, this routine aborts rather than returning an error code
153 SYNC_connect(SYNC_client_state * state)
155 SYNC_sockaddr_t addr;
156 /* I can't believe the following is needed for localhost connections!! */
157 static time_t backoff[] =
158 { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
159 time_t *timeout = &backoff[0];
161 if (state->fd >= 0) {
165 SYNC_getAddr(&state->endpoint, &addr);
168 state->fd = SYNC_getSock(&state->endpoint);
169 if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
174 Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
176 SYNC_disconnect(state);
179 perror("SYNC_connect failed (giving up!)");
184 * forcibly disconnect a sync client handle.
186 * @param[in] state pointer to sync client handle
188 * @retval operation status
192 SYNC_disconnect(SYNC_client_state * state)
195 closesocket(state->fd);
204 * gracefully disconnect a sync client handle.
206 * @param[in] state pointer to sync client handle
208 * @return operation status
209 * @retval SYNC_OK success
212 SYNC_closeChannel(SYNC_client_state * state)
217 SYNC_PROTO_BUF_DECL(ores);
222 memset(&com, 0, sizeof(com));
223 memset(&res, 0, sizeof(res));
225 res.payload.len = SYNC_PROTO_MAX_LEN;
226 res.payload.buf = ores;
228 com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
229 com.hdr.command_len = sizeof(SYNC_command_hdr);
230 com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
232 /* in case the other end dropped, don't do any retries */
233 state->retry_limit = 0;
234 state->hard_timeout = 0;
236 SYNC_ask(state, &com, &res);
237 SYNC_disconnect(state);
243 * forcibly break a client connection, and then create a new connection.
245 * @param[in] state pointer to sync client handle
247 * @post old connection dropped; new connection established
249 * @return @see SYNC_connect()
252 SYNC_reconnect(SYNC_client_state * state)
254 SYNC_disconnect(state);
255 return SYNC_connect(state);
259 * send a command to a sync server and wait for a response.
261 * @param[in] state pointer to sync client handle
262 * @param[in] com command object
263 * @param[out] res response object
265 * @return operation status
266 * @retval SYNC_OK success
267 * @retval SYNC_COM_ERROR communications error
268 * @retval SYNC_BAD_COMMAND server did not recognize command code
270 * @note this routine merely handles error processing; SYNC_ask_internal()
271 * handles the low-level details of communicating with the SYNC server.
273 * @see SYNC_ask_internal
276 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
279 afs_uint32 now, timeout, code=SYNC_OK;
281 if (state->fatal_error) {
282 return SYNC_COM_ERROR;
285 if (state->fd == -1) {
289 if (state->fd == -1) {
290 state->fatal_error = 1;
291 return SYNC_COM_ERROR;
294 #ifdef AFS_DEMAND_ATTACH_FS
295 com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
298 now = FT_ApproxTime();
299 timeout = now + state->hard_timeout;
301 (tries <= state->retry_limit) && (now <= timeout);
302 tries++, now = FT_ApproxTime()) {
303 code = SYNC_ask_internal(state, com, res);
304 if (code == SYNC_OK) {
306 } else if (code == SYNC_BAD_COMMAND) {
307 Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
308 "fileserver, volserver, salvageserver and salvager are same "
309 "version\n", state->proto_name);
311 } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
312 Log("SYNC_ask: protocol communications failure on circuit '%s'; "
313 "attempting reconnect to server\n", state->proto_name);
314 SYNC_reconnect(state);
318 * unknown (probably protocol-specific) response code, pass it up to
319 * the caller, and let them deal with it
325 if (code == SYNC_COM_ERROR) {
326 Log("SYNC_ask: fatal protocol error on circuit '%s'; disabling sync "
327 "protocol until next server restart\n",
329 state->fatal_error = 1;
336 * send a command to a sync server and wait for a response.
338 * @param[in] state pointer to sync client handle
339 * @param[in] com command object
340 * @param[out] res response object
342 * @return operation status
343 * @retval SYNC_OK success
344 * @retval SYNC_COM_ERROR communications error
349 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
352 SYNC_PROTO_BUF_DECL(buf);
358 if (state->fd == -1) {
359 Log("SYNC_ask: invalid sync file descriptor on circuit '%s'\n",
361 res->hdr.response = SYNC_COM_ERROR;
365 if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
366 Log("SYNC_ask: internal SYNC buffer too small on circuit '%s'; "
367 "please file a bug\n", state->proto_name);
368 res->hdr.response = SYNC_COM_ERROR;
372 com->hdr.proto_version = state->proto_version;
374 memcpy(buf, &com->hdr, sizeof(com->hdr));
375 if (com->payload.len) {
376 memcpy(buf + sizeof(com->hdr), com->payload.buf,
377 com->hdr.command_len - sizeof(com->hdr));
381 n = send(state->fd, buf, com->hdr.command_len, 0);
382 if (n != com->hdr.command_len) {
383 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
384 res->hdr.response = SYNC_COM_ERROR;
388 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
389 /* short circuit close channel requests */
390 res->hdr.response = SYNC_OK;
394 n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
395 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
396 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
397 res->hdr.response = SYNC_COM_ERROR;
400 #else /* !AFS_NT40_ENV */
401 n = write(state->fd, buf, com->hdr.command_len);
402 if (com->hdr.command_len != n) {
403 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
404 res->hdr.response = SYNC_COM_ERROR;
408 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
409 /* short circuit close channel requests */
410 res->hdr.response = SYNC_OK;
414 /* receive the response */
415 iov[0].iov_base = (char *)&res->hdr;
416 iov[0].iov_len = sizeof(res->hdr);
417 if (res->payload.len) {
418 iov[1].iov_base = (char *)res->payload.buf;
419 iov[1].iov_len = res->payload.len;
424 n = readv(state->fd, iov, iovcnt);
425 if (n == 0 || (n < 0 && errno != EINTR)) {
426 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
427 res->hdr.response = SYNC_COM_ERROR;
430 #endif /* !AFS_NT40_ENV */
434 if (n < sizeof(res->hdr)) {
435 Log("SYNC_ask: response too short on circuit '%s'\n",
437 res->hdr.response = SYNC_COM_ERROR;
441 memcpy(&res->hdr, buf, sizeof(res->hdr));
444 if ((n - sizeof(res->hdr)) > res->payload.len) {
445 Log("SYNC_ask: response too long on circuit '%s'\n",
447 res->hdr.response = SYNC_COM_ERROR;
451 memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
454 if (res->hdr.response_len != n) {
455 Log("SYNC_ask: length field in response inconsistent "
456 "on circuit '%s'\n", state->proto_name);
457 res->hdr.response = SYNC_COM_ERROR;
460 if (res->hdr.response == SYNC_DENIED) {
461 Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
465 return res->hdr.response;
470 * daemon com SYNC server-side interfaces
474 * receive a command structure off a sync socket.
476 * @param[in] fd socket descriptor
477 * @param[out] com sync command object to be populated
479 * @return operation status
480 * @retval SYNC_OK command received
481 * @retval SYNC_COM_ERROR there was a socket communications error
484 SYNC_getCom(int fd, SYNC_command * com)
487 afs_int32 code = SYNC_OK;
489 SYNC_PROTO_BUF_DECL(buf);
496 n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
498 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
499 Log("SYNC_getCom: error receiving command\n");
500 code = SYNC_COM_ERROR;
503 #else /* !AFS_NT40_ENV */
504 iov[0].iov_base = (char *)&com->hdr;
505 iov[0].iov_len = sizeof(com->hdr);
506 if (com->payload.len) {
507 iov[1].iov_base = (char *)com->payload.buf;
508 iov[1].iov_len = com->payload.len;
514 n = readv(fd, iov, iovcnt);
515 if (n == 0 || (n < 0 && errno != EINTR)) {
516 Log("SYNC_getCom: error receiving command\n");
517 code = SYNC_COM_ERROR;
520 #endif /* !AFS_NT40_ENV */
524 if (n < sizeof(com->hdr)) {
525 Log("SYNC_getCom: command too short\n");
526 code = SYNC_COM_ERROR;
530 memcpy(&com->hdr, buf, sizeof(com->hdr));
533 if ((n - sizeof(com->hdr)) > com->payload.len) {
534 Log("SYNC_getCom: command too long\n");
535 code = SYNC_COM_ERROR;
539 memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
547 * write a response structure to a sync socket.
552 * @return operation status
554 * @retval SYNC_COM_ERROR
557 SYNC_putRes(int fd, SYNC_response * res)
560 afs_int32 code = SYNC_OK;
561 SYNC_PROTO_BUF_DECL(buf);
563 if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
564 Log("SYNC_putRes: response_len field in response header inconsistent\n");
565 code = SYNC_COM_ERROR;
569 if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
570 Log("SYNC_putRes: internal SYNC buffer too small; please file a bug\n");
571 code = SYNC_COM_ERROR;
575 #ifdef AFS_DEMAND_ATTACH_FS
576 res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
579 memcpy(buf, &res->hdr, sizeof(res->hdr));
580 if (res->payload.len) {
581 memcpy(buf + sizeof(res->hdr), res->payload.buf,
582 res->hdr.response_len - sizeof(res->hdr));
586 n = send(fd, buf, res->hdr.response_len, 0);
587 #else /* !AFS_NT40_ENV */
588 n = write(fd, buf, res->hdr.response_len);
589 #endif /* !AFS_NT40_ENV */
591 if (res->hdr.response_len != n) {
592 Log("SYNC_putRes: write failed\n");
593 res->hdr.response = SYNC_COM_ERROR;
601 /* return 0 for legal (null-terminated) string,
602 * 1 for illegal (unterminated) string */
604 SYNC_verifyProtocolString(char * buf, size_t len)
609 s_len = afs_strnlen(buf, len);
611 return (s_len == len) ? 1 : 0;
615 * clean up old sockets.
617 * @param[in] state server state object
619 * @post unix domain sockets are cleaned up
622 SYNC_cleanupSock(SYNC_server_state_t * state)
624 #ifdef USE_UNIX_SOCKETS
625 remove(state->addr.sun_path);
630 * bind socket and set it to listen state.
632 * @param[in] state server state object
634 * @return operation status
636 * @retval nonzero failure
638 * @post socket bound and set to listen state
641 SYNC_bindSock(SYNC_server_state_t * state)
647 /* Reuseaddr needed because system inexplicably leaves crud lying around */
649 setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
652 Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
654 for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
655 code = bind(state->fd,
656 (struct sockaddr *)&state->addr,
657 AFS_SOCKADDR_LEN(&state->addr));
660 Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
664 listen(state->fd, state->listen_depth);