2 * Copyright 2006, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * localhost interprocess communication for servers
13 * currently handled by a localhost socket
14 * (yes, this needs to be replaced someday)
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
27 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
53 #include <afs/afsint.h>
55 #include <afs/errors.h>
56 #include "daemon_com.h"
59 #include <afs/afssyscalls.h>
63 #include "partition.h"
64 #include <rx/rx_queue.h>
66 #ifdef USE_UNIX_SOCKETS
67 #include <afs/afsutil.h>
71 /*@printflike@*/ extern void Log(const char *format, ...);
76 #define osi_Assert(e) (void)(e)
78 int (*V_BreakVolumeCallbacks) ();
80 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
81 * move = dump+restore can run on single server */
83 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
85 #ifdef USE_UNIX_SOCKETS
86 static getport(SYNC_client_state * state, struct sockaddr_un *addr);
87 #else /* USE_UNIX_SOCKETS */
88 static getport(SYNC_client_state * state, struct sockaddr_in *addr);
89 #endif /* USE_UNIX_SOCKETS */
91 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
93 /* daemon com SYNC client interface */
96 SYNC_connect(SYNC_client_state * state)
98 #ifdef USE_UNIX_SOCKETS
99 struct sockaddr_un addr;
100 #else /* USE_UNIX_SOCKETS */
101 struct sockaddr_in addr;
102 #endif /* USE_UNIX_SOCKETS */
103 /* I can't believe the following is needed for localhost connections!! */
104 static time_t backoff[] =
105 { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
106 time_t *timeout = &backoff[0];
108 if (state->fd >= 0) {
113 state->fd = getport(state, &addr);
114 if (connect(state->fd, (struct sockaddr *)&addr, sizeof(addr)) >= 0)
119 Log("SYNC_connect temporary failure (will retry)\n");
120 SYNC_disconnect(state);
123 perror("SYNC_connect failed (giving up!)");
128 SYNC_disconnect(SYNC_client_state * state)
131 closesocket(state->fd);
140 SYNC_closeChannel(SYNC_client_state * state)
145 SYNC_PROTO_BUF_DECL(ores);
150 memset(&com, 0, sizeof(com));
151 memset(&res, 0, sizeof(res));
153 res.payload.len = SYNC_PROTO_MAX_LEN;
154 res.payload.buf = ores;
156 com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
157 com.hdr.command_len = sizeof(SYNC_command_hdr);
159 /* in case the other end dropped, don't do any retries */
160 state->retry_limit = 0;
161 state->hard_timeout = 0;
163 code = SYNC_ask(state, &com, &res);
165 if (code == SYNC_OK) {
166 if (res.hdr.response != SYNC_OK) {
167 Log("SYNC_closeChannel: channel shutdown request denied; closing socket anyway\n");
168 } else if (!(res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN)) {
169 Log("SYNC_closeChannel: channel shutdown request mishandled by server\n");
172 Log("SYNC_closeChannel: channel communications problem");
175 SYNC_disconnect(state);
181 SYNC_reconnect(SYNC_client_state * state)
183 SYNC_disconnect(state);
184 return SYNC_connect(state);
187 /* private function to fill in the sockaddr struct for us */
188 #ifdef USE_UNIX_SOCKETS
190 getport(SYNC_client_state * state, struct sockaddr_un *addr)
193 char tbuffer[AFSDIR_PATH_MAX];
195 strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
196 "fssync.sock", NULL);
197 memset(addr, 0, sizeof(*addr));
198 addr->sun_family = AF_UNIX;
199 strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
200 assert((sd = socket(AF_UNIX, SOCK_STREAM, 0)) >= 0);
203 #else /* USE_UNIX_SOCKETS */
205 getport(SYNC_client_state * state, struct sockaddr_in *addr)
208 memset(addr, 0, sizeof(*addr));
209 assert((sd = socket(AF_INET, SOCK_STREAM, 0)) >= 0);
210 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
211 addr->sin_len = sizeof(struct sockaddr_in);
213 addr->sin_addr.s_addr = htonl(0x7f000001);
214 addr->sin_family = AF_INET; /* was localhost->h_addrtype */
215 addr->sin_port = htons(state->port); /* XXXX htons not _really_ neccessary */
218 #endif /* USE_UNIX_SOCKETS */
221 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
224 afs_uint32 now, timeout, code=SYNC_OK;
226 if (state->fatal_error) {
227 return SYNC_COM_ERROR;
230 if (state->fd == -1) {
234 if (state->fd == -1) {
235 state->fatal_error = 1;
236 return SYNC_COM_ERROR;
239 #ifdef AFS_DEMAND_ATTACH_FS
240 com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
243 now = FT_ApproxTime();
244 timeout = now + state->hard_timeout;
246 (tries <= state->retry_limit) && (now <= timeout);
247 tries++, now = FT_ApproxTime()) {
248 code = SYNC_ask_internal(state, com, res);
249 if (code == SYNC_OK) {
251 } else if (code == SYNC_BAD_COMMAND) {
252 Log("SYNC_ask: protocol mismatch; make sure fileserver, volserver, salvageserver and salvager are same version\n");
254 } else if (code == SYNC_COM_ERROR) {
255 Log("SYNC_ask: protocol communications failure; attempting reconnect to server\n");
256 SYNC_reconnect(state);
259 /* unknown (probably protocol-specific) response code, pass it up to the caller, and let them deal with it */
264 if (code == SYNC_COM_ERROR) {
265 Log("SYNC_ask: fatal protocol error; disabling sync protocol to server running on port %d until next server restart\n",
267 state->fatal_error = 1;
274 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
277 SYNC_PROTO_BUF_DECL(buf);
283 if (state->fd == -1) {
284 Log("SYNC_ask: invalid sync file descriptor\n");
285 res->hdr.response = SYNC_COM_ERROR;
289 if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
290 Log("SYNC_ask: internal SYNC buffer too small; please file a bug\n");
291 res->hdr.response = SYNC_COM_ERROR;
295 com->hdr.proto_version = state->proto_version;
297 memcpy(buf, &com->hdr, sizeof(com->hdr));
298 if (com->payload.len) {
299 memcpy(buf + sizeof(com->hdr), com->payload.buf,
300 com->hdr.command_len - sizeof(com->hdr));
304 n = send(state->fd, buf, com->hdr.command_len, 0);
305 if (n != com->hdr.command_len) {
306 Log("SYNC_ask: write failed\n");
307 res->hdr.response = SYNC_COM_ERROR;
311 n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
312 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
313 Log("SYNC_ask: No response\n");
314 res->hdr.response = SYNC_COM_ERROR;
317 #else /* !AFS_NT40_ENV */
318 n = write(state->fd, buf, com->hdr.command_len);
319 if (com->hdr.command_len != n) {
320 Log("SYNC_ask: write failed\n");
321 res->hdr.response = SYNC_COM_ERROR;
325 /* receive the response */
326 iov[0].iov_base = (char *)&res->hdr;
327 iov[0].iov_len = sizeof(res->hdr);
328 if (res->payload.len) {
329 iov[1].iov_base = (char *)res->payload.buf;
330 iov[1].iov_len = res->payload.len;
335 n = readv(state->fd, iov, iovcnt);
336 if (n == 0 || (n < 0 && errno != EINTR)) {
337 Log("SYNC_ask: No response\n");
338 res->hdr.response = SYNC_COM_ERROR;
341 #endif /* !AFS_NT40_ENV */
345 if (n < sizeof(res->hdr)) {
346 Log("SYNC_ask: response too short\n");
347 res->hdr.response = SYNC_COM_ERROR;
351 memcpy(&res->hdr, buf, sizeof(res->hdr));
354 if ((n - sizeof(res->hdr)) > res->payload.len) {
355 Log("SYNC_ask: response too long\n");
356 res->hdr.response = SYNC_COM_ERROR;
360 memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
363 if (res->hdr.response_len != n) {
364 Log("SYNC_ask: length field in response inconsistent\n");
365 res->hdr.response = SYNC_COM_ERROR;
368 if (res->hdr.response == SYNC_DENIED) {
369 Log("SYNC_ask: negative response\n");
373 return res->hdr.response;
378 * daemon com SYNC server-side interfaces
383 SYNC_getCom(int fd, SYNC_command * com)
386 afs_int32 code = SYNC_OK;
388 SYNC_PROTO_BUF_DECL(buf);
395 n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
397 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
398 Log("SYNC_getCom: error receiving command\n");
399 code = SYNC_COM_ERROR;
402 #else /* !AFS_NT40_ENV */
403 iov[0].iov_base = (char *)&com->hdr;
404 iov[0].iov_len = sizeof(com->hdr);
405 if (com->payload.len) {
406 iov[1].iov_base = (char *)com->payload.buf;
407 iov[1].iov_len = com->payload.len;
413 n = readv(fd, iov, iovcnt);
414 if (n == 0 || (n < 0 && errno != EINTR)) {
415 Log("SYNC_getCom: error receiving command\n");
416 code = SYNC_COM_ERROR;
419 #endif /* !AFS_NT40_ENV */
423 if (n < sizeof(com->hdr)) {
424 Log("SYNC_getCom: command too short\n");
425 code = SYNC_COM_ERROR;
429 memcpy(&com->hdr, buf, sizeof(com->hdr));
432 if ((n - sizeof(com->hdr)) > com->payload.len) {
433 Log("SYNC_getCom: command too long\n");
434 code = SYNC_COM_ERROR;
438 memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
447 SYNC_putRes(int fd, SYNC_response * res)
450 afs_int32 code = SYNC_OK;
451 SYNC_PROTO_BUF_DECL(buf);
453 if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
454 Log("SYNC_putRes: response_len field in response header inconsistent\n");
455 code = SYNC_COM_ERROR;
459 if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
460 Log("SYNC_putRes: internal SYNC buffer too small; please file a bug\n");
461 code = SYNC_COM_ERROR;
465 #ifdef AFS_DEMAND_ATTACH_FS
466 res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
469 memcpy(buf, &res->hdr, sizeof(res->hdr));
470 if (res->payload.len) {
471 memcpy(buf + sizeof(res->hdr), res->payload.buf,
472 res->hdr.response_len - sizeof(res->hdr));
476 n = send(fd, buf, res->hdr.response_len, 0);
477 #else /* !AFS_NT40_ENV */
478 n = write(fd, buf, res->hdr.response_len);
479 #endif /* !AFS_NT40_ENV */
481 if (res->hdr.response_len != n) {
482 Log("SYNC_putRes: write failed\n");
483 res->hdr.response = SYNC_COM_ERROR;
491 /* return 0 for legal (null-terminated) string,
492 * 1 for illegal (unterminated) string */
494 SYNC_verifyProtocolString(char * buf, size_t len)
499 s_len = afs_strnlen(buf, len);
501 return (s_len == len) ? 1 : 0;