2 * Copyright 2006, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * localhost interprocess communication for servers
13 * currently handled by a localhost socket
14 * (yes, this needs to be replaced someday)
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
27 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
46 #include <afs/afsint.h>
48 #include <afs/errors.h>
49 #include "daemon_com.h"
52 #include <afs/afssyscalls.h>
56 #include "partition.h"
57 #include <rx/rx_queue.h>
59 #ifdef USE_UNIX_SOCKETS
60 #include <afs/afsutil.h>
/* Logging routine defined elsewhere; it takes a printf-style format string. */
64 /*@printflike@*/ extern void Log(const char *format, ...);
/* As written on this line, osi_Assert only evaluates its argument and
 * discards the result; it does not abort when the expression is false. */
69 #define osi_Assert(e) (void)(e)
/* Function pointer with an unspecified parameter list, returning int.  It is
 * not referenced anywhere else in the visible part of this file. */
71 int (*V_BreakVolumeCallbacks) ();
73 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
74 * move = dump+restore can run on single server */
76 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
/* Forward declarations of the file-private helpers defined further down.
 * The address type taken by getport() follows the socket flavour selected by
 * USE_UNIX_SOCKETS.
 * NOTE(review): the getport() prototypes carry no return type and so rely on
 * implicit int, which C99 and later no longer allow. */
78 #ifdef USE_UNIX_SOCKETS
79 static getport(SYNC_client_state * state, struct sockaddr_un *addr);
80 #else /* USE_UNIX_SOCKETS */
81 static getport(SYNC_client_state * state, struct sockaddr_in *addr);
82 #endif /* USE_UNIX_SOCKETS */
84 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
86 /* daemon com SYNC client interface */
/*
 * Establish the client's connection to the sync server.
 *
 * state: client state; state->fd receives the value returned by getport().
 *
 * The visible lines obtain a socket and peer address from getport(), call
 * connect() on them, log a retry message and drop the socket through
 * SYNC_disconnect() after a failed attempt, and report the final failure
 * with perror().
 * NOTE(review): the retry loop, any delay between attempts and the return
 * values are on lines not shown here -- confirm against the full file.
 */
89 SYNC_connect(SYNC_client_state * state)
91 #ifdef USE_UNIX_SOCKETS
92 struct sockaddr_un addr;
93 #else /* USE_UNIX_SOCKETS */
94 struct sockaddr_in addr;
95 #endif /* USE_UNIX_SOCKETS */
96 /* I can't believe the following is needed for localhost connections!! */
/* Presumably per-attempt delays, with the trailing 0 marking the end of the
 * schedule -- TODO confirm; the code that consumes the table is not shown. */
97 static time_t backoff[] =
98 { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
99 time_t *timeout = &backoff[0];
/* Presumably detects a descriptor that is already open; the body of this
 * branch is not shown. */
101 if (state->fd >= 0) {
/* The value returned by getport() becomes the descriptor, and addr (filled in
 * by getport()) is the peer address handed to connect(). */
106 state->fd = getport(state, &addr);
107 if (connect(state->fd, (struct sockaddr *)&addr, sizeof(addr)) >= 0)
112 Log("SYNC_connect temporary failure (will retry)\n");
113 SYNC_disconnect(state);
116 perror("SYNC_connect failed (giving up!)");
/*
 * Close the socket recorded in state->fd using closesocket().
 * NOTE(review): any further update of the state (for example resetting fd)
 * and the return value are on lines not shown here.
 */
121 SYNC_disconnect(SYNC_client_state * state)
124 closesocket(state->fd);
/*
 * Ask the server to shut the channel down, then close the client socket.
 *
 * state: client state for the channel being closed.
 *
 * Sends a header-only SYNC_COM_CHANNEL_CLOSE command through SYNC_ask(),
 * logs when the request is refused, mishandled or could not be delivered,
 * and calls SYNC_disconnect().
 * Side effect: state->retry_limit and state->hard_timeout are set to 0 and
 * are not restored in the visible lines.
 */
133 SYNC_closeChannel(SYNC_client_state * state)
/* ores is used below as the buffer for any response payload. */
138 SYNC_PROTO_BUF_DECL(ores);
143 memset(&com, 0, sizeof(com));
144 memset(&res, 0, sizeof(res));
146 res.payload.len = SYNC_PROTO_MAX_LEN;
147 res.payload.buf = ores;
/* The command carries no payload: its length is exactly one header. */
149 com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
150 com.hdr.command_len = sizeof(SYNC_command_hdr);
152 /* in case the other end dropped, don't do any retries */
153 state->retry_limit = 0;
154 state->hard_timeout = 0;
156 code = SYNC_ask(state, &com, &res);
/* A transport-level success still has to be checked for the server's own
 * verdict and for the shutdown flag in the response header. */
158 if (code == SYNC_OK) {
159 if (res.hdr.response != SYNC_OK) {
160 Log("SYNC_closeChannel: channel shutdown request denied; closing socket anyway\n");
161 } else if (!(res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN)) {
162 Log("SYNC_closeChannel: channel shutdown request mishandled by server\n");
/* NOTE(review): unlike the other messages in this function, the string below
 * has no trailing newline. */
165 Log("SYNC_closeChannel: channel communications problem");
168 SYNC_disconnect(state);
/*
 * Drop the current connection and connect again.
 *
 * state: client state to reconnect.
 * Returns whatever SYNC_connect() returns; the result of SYNC_disconnect()
 * is ignored.
 */
174 SYNC_reconnect(SYNC_client_state * state)
176 SYNC_disconnect(state);
177 return SYNC_connect(state);
180 /* private function to fill in the sockaddr struct for us */
/*
 * Two variants exist, selected by USE_UNIX_SOCKETS.  Each zeroes *addr,
 * fills in the server address and creates a stream socket into sd.
 *
 * state: client state; only the AF_INET variant reads it (state->port).
 * addr:  output, the address to connect to.
 *
 * NOTE(review): the return statement is not shown; SYNC_connect() stores the
 * result in state->fd, so presumably sd is returned -- confirm.
 * NOTE(review): in both variants the socket() call sits inside assert(); if
 * the assert macro in effect is compiled out (e.g. NDEBUG), the socket would
 * never be created -- confirm which assert definition applies here.
 */
181 #ifdef USE_UNIX_SOCKETS
183 getport(SYNC_client_state * state, struct sockaddr_un *addr)
186 char tbuffer[AFSDIR_PATH_MAX];
/* Socket path is AFSDIR_SERVER_LOCAL_DIRPATH followed by "/fssync.sock". */
188 strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
189 "fssync.sock", NULL);
190 memset(addr, 0, sizeof(*addr));
191 addr->sun_family = AF_UNIX;
/* NOTE(review): strncpy() adds no terminator when the source is at least as
 * long as the limit.  The memset above zeroes the struct, but whether a NUL
 * survives depends on how this limit compares with the size of sun_path --
 * confirm. */
192 strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
193 assert((sd = socket(AF_UNIX, SOCK_STREAM, 0)) >= 0);
196 #else /* USE_UNIX_SOCKETS */
198 getport(SYNC_client_state * state, struct sockaddr_in *addr)
201 memset(addr, 0, sizeof(*addr));
202 assert((sd = socket(AF_INET, SOCK_STREAM, 0)) >= 0);
/* Platforms whose sockaddr carries a length field need it filled in. */
203 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
204 addr->sin_len = sizeof(struct sockaddr_in);
/* 0x7f000001 is 127.0.0.1, i.e. the loopback address. */
206 addr->sin_addr.s_addr = htonl(0x7f000001);
207 addr->sin_family = AF_INET; /* was localhost->h_addrtype */
208 addr->sin_port = htons(state->port); /* XXXX htons not _really_ necessary */
211 #endif /* USE_UNIX_SOCKETS */
/*
 * Send a command to the server and collect the response, retrying on
 * communication failures.
 *
 * state: client state (descriptor, retry_limit, hard_timeout, fatal_error).
 * com:   command to send; its header flags may be modified here.
 * res:   response storage, filled in by SYNC_ask_internal().
 *
 * Returns SYNC_COM_ERROR immediately when the state is already marked fatal
 * or no descriptor is available.
 * NOTE(review): the final return statement is not shown; presumably the last
 * code obtained from SYNC_ask_internal() is returned -- confirm.
 */
214 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
217 afs_uint32 now, timeout, code=SYNC_OK;
/* Once a fatal error has been recorded, no further traffic is attempted. */
219 if (state->fatal_error) {
220 return SYNC_COM_ERROR;
/* NOTE(review): the body of the first check is not shown; presumably it tries
 * to connect.  If the descriptor is still -1 afterwards, the state is marked
 * fatal. */
223 if (state->fd == -1) {
227 if (state->fd == -1) {
228 state->fatal_error = 1;
229 return SYNC_COM_ERROR;
232 #ifdef AFS_DEMAND_ATTACH_FS
/* Set the DAFS extensions flag in the command header. */
233 com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
/* timeout is an absolute deadline: current approximate time plus
 * state->hard_timeout. */
236 now = FT_ApproxTime();
237 timeout = now + state->hard_timeout;
/* Loop while both the retry budget and the deadline allow; now is refreshed
 * after every attempt. */
239 (tries <= state->retry_limit) && (now <= timeout);
240 tries++, now = FT_ApproxTime()) {
241 code = SYNC_ask_internal(state, com, res);
242 if (code == SYNC_OK) {
244 } else if (code == SYNC_BAD_COMMAND) {
245 Log("SYNC_ask: protocol mismatch; make sure fileserver, volserver, salvageserver and salvager are same version\n");
/* A communications failure triggers a reconnect. */
247 } else if (code == SYNC_COM_ERROR) {
248 Log("SYNC_ask: protocol communications failure; attempting reconnect to server\n");
249 SYNC_reconnect(state);
252 /* unknown (probably protocol-specific) response code, pass it up to the caller, and let them deal with it */
/* A communications error that is still pending here marks the state fatal,
 * so later calls return SYNC_COM_ERROR at the top of this function. */
257 if (code == SYNC_COM_ERROR) {
258 Log("SYNC_ask: fatal protocol error; disabling sync protocol to server running on port %d until next server restart\n",
260 state->fatal_error = 1;
/*
 * Perform one request/response exchange on state->fd.
 *
 * state: client state supplying the descriptor and protocol version.
 * com:   command to send; hdr.proto_version is overwritten from the state.
 * res:   response storage; hdr.response is set to SYNC_COM_ERROR on every
 *        failure path visible here.
 *
 * Returns res->hdr.response on the path that reaches the final return.
 * NOTE(review): the statements following each error assignment, and several
 * preprocessor guards, are on lines not shown here.
 */
267 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
270 SYNC_PROTO_BUF_DECL(buf);
276 if (state->fd == -1) {
277 Log("SYNC_ask: invalid sync file descriptor\n");
278 res->hdr.response = SYNC_COM_ERROR;
/* The whole command must fit within SYNC_PROTO_MAX_LEN bytes. */
282 if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
283 Log("SYNC_ask: internal SYNC buffer too small; please file a bug\n");
284 res->hdr.response = SYNC_COM_ERROR;
288 com->hdr.proto_version = state->proto_version;
/* Serialize the header followed by the payload into buf.
 * NOTE(review): the payload copy length is derived from hdr.command_len, not
 * from payload.len -- the two are assumed to agree. */
290 memcpy(buf, &com->hdr, sizeof(com->hdr));
291 if (com->payload.len) {
292 memcpy(buf + sizeof(com->hdr), com->payload.buf,
293 com->hdr.command_len - sizeof(com->hdr));
/* send()/recv() variant: the reply is read back into buf.  A short send
 * counts as failure. */
297 n = send(state->fd, buf, com->hdr.command_len, 0);
298 if (n != com->hdr.command_len) {
299 Log("SYNC_ask: write failed\n");
300 res->hdr.response = SYNC_COM_ERROR;
304 n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
/* A negative result with WSAEINTR is not treated as a failure here. */
305 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
306 Log("SYNC_ask: No response\n");
307 res->hdr.response = SYNC_COM_ERROR;
310 #else /* !AFS_NT40_ENV */
/* write()/readv() variant: the reply is scattered directly into res->hdr
 * and, when a payload buffer length is set, into res->payload.buf. */
311 n = write(state->fd, buf, com->hdr.command_len);
312 if (com->hdr.command_len != n) {
313 Log("SYNC_ask: write failed\n");
314 res->hdr.response = SYNC_COM_ERROR;
318 /* receive the response */
319 iov[0].iov_base = (char *)&res->hdr;
320 iov[0].iov_len = sizeof(res->hdr);
321 if (res->payload.len) {
322 iov[1].iov_base = (char *)res->payload.buf;
323 iov[1].iov_len = res->payload.len;
328 n = readv(state->fd, iov, iovcnt);
/* A negative result with errno == EINTR is not treated as a failure here. */
329 if (n == 0 || (n < 0 && errno != EINTR)) {
330 Log("SYNC_ask: No response\n");
331 res->hdr.response = SYNC_COM_ERROR;
334 #endif /* !AFS_NT40_ENV */
/* NOTE(review): the unpacking below reads from buf, which only the
 * send()/recv() variant fills with the reply; presumably it sits under a
 * guard on lines not shown -- confirm. */
338 if (n < sizeof(res->hdr)) {
339 Log("SYNC_ask: response too short\n");
340 res->hdr.response = SYNC_COM_ERROR;
344 memcpy(&res->hdr, buf, sizeof(res->hdr));
/* Reject a reply whose payload would not fit in the caller's buffer. */
347 if ((n - sizeof(res->hdr)) > res->payload.len) {
348 Log("SYNC_ask: response too long\n");
349 res->hdr.response = SYNC_COM_ERROR;
353 memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
/* The length recorded in the header must match the byte count received. */
356 if (res->hdr.response_len != n) {
357 Log("SYNC_ask: length field in response inconsistent\n");
358 res->hdr.response = SYNC_COM_ERROR;
/* A denial is logged here; the response code itself is left unchanged. */
361 if (res->hdr.response == SYNC_DENIED) {
362 Log("SYNC_ask: negative response\n");
366 return res->hdr.response;
371 * daemon com SYNC server-side interfaces
/*
 * Server side: read one command from fd into *com.
 *
 * fd:  connected descriptor to read from.
 * com: command storage; com->payload.buf and com->payload.len describe the
 *      caller's payload buffer and its capacity.
 *
 * code starts as SYNC_OK and is set to SYNC_COM_ERROR on each failure path
 * visible here.
 * NOTE(review): the return statement is not shown; presumably code is
 * returned -- confirm.
 */
376 SYNC_getCom(int fd, SYNC_command * com)
379 afs_int32 code = SYNC_OK;
381 SYNC_PROTO_BUF_DECL(buf);
/* recv() variant: the raw bytes land in buf and are unpacked further down. */
388 n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
/* A negative result with WSAEINTR is not treated as a failure here. */
390 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
391 Log("SYNC_getCom: error receiving command\n");
392 code = SYNC_COM_ERROR;
395 #else /* !AFS_NT40_ENV */
/* readv() variant: the header and, when a payload buffer length is set, the
 * payload are read directly into *com. */
396 iov[0].iov_base = (char *)&com->hdr;
397 iov[0].iov_len = sizeof(com->hdr);
398 if (com->payload.len) {
399 iov[1].iov_base = (char *)com->payload.buf;
400 iov[1].iov_len = com->payload.len;
406 n = readv(fd, iov, iovcnt);
/* A negative result with errno == EINTR is not treated as a failure here. */
407 if (n == 0 || (n < 0 && errno != EINTR)) {
408 Log("SYNC_getCom: error receiving command\n");
409 code = SYNC_COM_ERROR;
412 #endif /* !AFS_NT40_ENV */
/* NOTE(review): the unpacking below reads from buf, which only the recv()
 * variant fills; presumably it sits under a guard on lines not shown --
 * confirm. */
416 if (n < sizeof(com->hdr)) {
417 Log("SYNC_getCom: command too short\n");
418 code = SYNC_COM_ERROR;
422 memcpy(&com->hdr, buf, sizeof(com->hdr));
/* Reject a command whose payload would not fit in the caller's buffer. */
425 if ((n - sizeof(com->hdr)) > com->payload.len) {
426 Log("SYNC_getCom: command too long\n");
427 code = SYNC_COM_ERROR;
431 memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
/*
 * Server side: serialize *res and write it to fd.
 *
 * fd:  connected descriptor to write to.
 * res: response to send; hdr.response_len gives the number of bytes written.
 *
 * code starts as SYNC_OK and is set to SYNC_COM_ERROR by the two length
 * checks visible here.
 * NOTE(review): the return statement is not shown; presumably code is
 * returned -- confirm.
 */
440 SYNC_putRes(int fd, SYNC_response * res)
443 afs_int32 code = SYNC_OK;
444 SYNC_PROTO_BUF_DECL(buf);
/* The declared length may not exceed header plus payload... */
446 if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
447 Log("SYNC_putRes: response_len field in response header inconsistent\n");
448 code = SYNC_COM_ERROR;
/* ...and must fit within SYNC_PROTO_MAX_LEN bytes. */
452 if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
453 Log("SYNC_putRes: internal SYNC buffer too small; please file a bug\n");
454 code = SYNC_COM_ERROR;
458 #ifdef AFS_DEMAND_ATTACH_FS
/* Set the DAFS extensions flag in the response header. */
459 res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
/* Serialize the header followed by the payload into buf; the payload copy
 * length is derived from hdr.response_len. */
462 memcpy(buf, &res->hdr, sizeof(res->hdr));
463 if (res->payload.len) {
464 memcpy(buf + sizeof(res->hdr), res->payload.buf,
465 res->hdr.response_len - sizeof(res->hdr));
469 n = send(fd, buf, res->hdr.response_len, 0);
470 #else /* !AFS_NT40_ENV */
471 n = write(fd, buf, res->hdr.response_len);
472 #endif /* !AFS_NT40_ENV */
/* A short write counts as failure.
 * NOTE(review): this path sets res->hdr.response, whereas the checks above
 * set the local code; no assignment to code is visible here -- confirm the
 * caller still sees the failure. */
474 if (res->hdr.response_len != n) {
475 Log("SYNC_putRes: write failed\n");
476 res->hdr.response = SYNC_COM_ERROR;
484 /* return 0 for legal (null-terminated) string,
485 * 1 for illegal (unterminated) string */
/*
 * buf: start of the string to check.
 * len: number of bytes of buf that may be examined.
 *
 * The string is reported as unterminated when afs_strnlen() returns exactly
 * len.  Assumes afs_strnlen() behaves like strnlen(), i.e. it stops at len
 * when no NUL is found -- TODO confirm.
 */
487 SYNC_verifyProtocolString(char * buf, size_t len)
492 s_len = afs_strnlen(buf, len);
494 return (s_len == len) ? 1 : 0;