9b5149a29935e1db41c7dab8035c836da1049b98
[openafs.git] / src / vol / daemon_com.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * localhost interprocess communication for servers
12  *
13  * currently handled by a localhost socket
14  * (yes, this needs to be replaced someday)
15  */
16
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25 #include <afs/opr.h>
26
27 #include <rx/xdr.h>
28 #include <afs/afsint.h>
29 #include <afs/errors.h>
30 #include <rx/rx_queue.h>
31
32 #include "nfs.h"
33 #include "daemon_com.h"
34 #include "lwp.h"
35 #include "lock.h"
36 #include <afs/afssyscalls.h>
37 #include "ihandle.h"
38 #include "vnode.h"
39 #include "volume.h"
40 #include "partition.h"
41 #include "common.h"
42 #include <rx/rx_queue.h>
43
44 #ifdef USE_UNIX_SOCKETS
45 #include <afs/afsutil.h>
46 #include <sys/un.h>
47 #endif
48
49 int (*V_BreakVolumeCallbacks) (VolumeId);
50
51 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
52                                  * move = dump+restore can run on single server */
53
54 #define MAX_BIND_TRIES  5       /* Number of times to retry socket bind */
55
56 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
57
58
59 /*
60  * On AIX, connect() and bind() require use of SUN_LEN() macro;
61  * sizeof(struct sockaddr_un) will not suffice.
62  */
63 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
64 #define AFS_SOCKADDR_LEN(sa)  SUN_LEN(sa)
65 #else
66 #define AFS_SOCKADDR_LEN(sa)  sizeof(*sa)
67 #endif
68
69
70 /* daemon com SYNC general interfaces */
71
72 /**
73  * fill in sockaddr structure.
74  *
75  * @param[in]  endpoint pointer to sync endpoint object
76  * @param[out] addr     pointer to sockaddr structure
77  *
78  * @post sockaddr structure populated using information from
79  *       endpoint structure.
80  */
81 void
82 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
83 {
84     memset(addr, 0, sizeof(*addr));
85
86 #ifdef USE_UNIX_SOCKETS
87     addr->sun_family = AF_UNIX;
88     snprintf(addr->sun_path, sizeof(addr->sun_path), "%s/%s",
89              AFSDIR_SERVER_LOCAL_DIRPATH, endpoint->un);
90     addr->sun_path[sizeof(addr->sun_path) - 1] = '\0';
91 #else  /* !USE_UNIX_SOCKETS */
92 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
93     addr->sin_len = sizeof(struct sockaddr_in);
94 #endif
95     addr->sin_addr.s_addr = htonl(0x7f000001);
96     addr->sin_family = AF_INET; /* was localhost->h_addrtype */
97     addr->sin_port = htons(endpoint->in);       /* XXXX htons not _really_ neccessary */
98 #endif /* !USE_UNIX_SOCKETS */
99 }
100
101 /**
102  * get a socket descriptor of the appropriate domain.
103  *
104  * @param[in]  endpoint pointer to sync endpoint object
105  *
106  * @return socket descriptor
107  *
108  * @post socket of domain specified in endpoint structure is created and
109  *       returned to caller.
110  */
111 osi_socket
112 SYNC_getSock(SYNC_endpoint_t * endpoint)
113 {
114     osi_socket sd;
115     opr_Verify((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
116     return sd;
117 }
118
119 /* daemon com SYNC client interface */
120
121 /**
122  * open a client connection to a sync server
123  *
124  * @param[in] state  pointer to sync client handle
125  *
126  * @return operation status
127  *    @retval 1 success
128  *
129  * @note at present, this routine aborts rather than returning an error code
130  */
131 int
132 SYNC_connect(SYNC_client_state * state)
133 {
134     SYNC_sockaddr_t addr;
135     /* I can't believe the following is needed for localhost connections!! */
136     static time_t backoff[] =
137         { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
138     time_t *timeout = &backoff[0];
139
140     if (state->fd != OSI_NULLSOCKET) {
141         return 1;
142     }
143
144     SYNC_getAddr(&state->endpoint, &addr);
145
146     for (;;) {
147         state->fd = SYNC_getSock(&state->endpoint);
148         if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
149             return 1;
150         if (!*timeout)
151             break;
152         if (!(*timeout & 1))
153             Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
154                 state->proto_name);
155         SYNC_disconnect(state);
156         sleep(*timeout++);
157     }
158     perror("SYNC_connect failed (giving up!)");
159     return 0;
160 }
161
162 /**
163  * forcibly disconnect a sync client handle.
164  *
165  * @param[in] state  pointer to sync client handle
166  *
167  * @retval operation status
168  *    @retval 0 success
169  */
170 int
171 SYNC_disconnect(SYNC_client_state * state)
172 {
173     rk_closesocket(state->fd);
174     state->fd = OSI_NULLSOCKET;
175     return 0;
176 }
177
178 /**
179  * gracefully disconnect a sync client handle.
180  *
181  * @param[in] state  pointer to sync client handle
182  *
183  * @return operation status
184  *    @retval SYNC_OK success
185  */
186 afs_int32
187 SYNC_closeChannel(SYNC_client_state * state)
188 {
189     SYNC_command com;
190     SYNC_response res;
191     SYNC_PROTO_BUF_DECL(ores);
192
193     if (state->fd == OSI_NULLSOCKET)
194         return SYNC_OK;
195
196     memset(&com, 0, sizeof(com));
197     memset(&res, 0, sizeof(res));
198
199     res.payload.len = SYNC_PROTO_MAX_LEN;
200     res.payload.buf = ores;
201
202     com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
203     com.hdr.command_len = sizeof(SYNC_command_hdr);
204     com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
205
206     /* in case the other end dropped, don't do any retries */
207     state->retry_limit = 0;
208     state->hard_timeout = 0;
209
210     SYNC_ask(state, &com, &res);
211     SYNC_disconnect(state);
212
213     return SYNC_OK;
214 }
215
216 /**
217  * forcibly break a client connection, and then create a new connection.
218  *
219  * @param[in] state  pointer to sync client handle
220  *
221  * @post old connection dropped; new connection established
222  *
223  * @return @see SYNC_connect()
224  */
225 int
226 SYNC_reconnect(SYNC_client_state * state)
227 {
228     SYNC_disconnect(state);
229     return SYNC_connect(state);
230 }
231
232 /**
233  * send a command to a sync server and wait for a response.
234  *
235  * @param[in]  state  pointer to sync client handle
236  * @param[in]  com    command object
237  * @param[out] res    response object
238  *
239  * @return operation status
240  *    @retval SYNC_OK success
241  *    @retval SYNC_COM_ERROR communications error
242  *    @retval SYNC_BAD_COMMAND server did not recognize command code
243  *
244  * @note this routine merely handles error processing; SYNC_ask_internal()
245  *       handles the low-level details of communicating with the SYNC server.
246  *
247  * @see SYNC_ask_internal
248  */
249 afs_int32
250 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
251 {
252     int tries;
253     afs_uint32 now, timeout, code=SYNC_OK;
254
255     if (state->fd == OSI_NULLSOCKET) {
256         SYNC_connect(state);
257     }
258
259     if (state->fd == OSI_NULLSOCKET) {
260         return SYNC_COM_ERROR;
261     }
262
263 #ifdef AFS_DEMAND_ATTACH_FS
264     com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
265 #endif
266
267     now = FT_ApproxTime();
268     timeout = now + state->hard_timeout;
269     for (tries = 0;
270          (tries <= state->retry_limit) && (now <= timeout);
271          tries++, now = FT_ApproxTime()) {
272         code = SYNC_ask_internal(state, com, res);
273         if (code == SYNC_OK) {
274             break;
275         } else if (code == SYNC_BAD_COMMAND) {
276             Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
277                 "fileserver, volserver, salvageserver and salvager are same "
278                 "version\n", state->proto_name);
279             break;
280         } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
281             Log("SYNC_ask: protocol communications failure on circuit '%s'; "
282                 "attempting reconnect to server\n", state->proto_name);
283             SYNC_reconnect(state);
284             /* try again */
285         } else {
286             /*
287              * unknown (probably protocol-specific) response code, pass it up to
288              * the caller, and let them deal with it
289              */
290             break;
291         }
292     }
293
294     if (code == SYNC_COM_ERROR) {
295         Log("SYNC_ask: too many / too latent fatal protocol errors on circuit "
296             "'%s'; giving up (tries %d timeout %d)\n",
297             state->proto_name, tries, timeout);
298     }
299
300     return code;
301 }
302
303 /**
304  * send a command to a sync server and wait for a response.
305  *
306  * @param[in]  state  pointer to sync client handle
307  * @param[in]  com    command object
308  * @param[out] res    response object
309  *
310  * @return operation status
311  *    @retval SYNC_OK success
312  *    @retval SYNC_COM_ERROR communications error
313  *
314  * @internal
315  */
316 static afs_int32
317 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
318 {
319     int n;
320     SYNC_PROTO_BUF_DECL(buf);
321 #ifndef AFS_NT40_ENV
322     int iovcnt;
323     struct iovec iov[2];
324 #endif
325
326     if (state->fd == OSI_NULLSOCKET) {
327         Log("SYNC_ask:  invalid sync file descriptor on circuit '%s'\n",
328             state->proto_name);
329         res->hdr.response = SYNC_COM_ERROR;
330         goto done;
331     }
332
333     if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
334         Log("SYNC_ask:  internal SYNC buffer too small on circuit '%s'; "
335             "please file a bug\n", state->proto_name);
336         res->hdr.response = SYNC_COM_ERROR;
337         goto done;
338     }
339
340     /*
341      * fill in some common header fields
342      */
343     com->hdr.proto_version = state->proto_version;
344     com->hdr.pkt_seq = ++state->pkt_seq;
345     com->hdr.com_seq = ++state->com_seq;
346 #ifdef AFS_NT40_ENV
347     com->hdr.pid = 0;
348     com->hdr.tid = 0;
349 #else
350     com->hdr.pid = getpid();
351 #ifdef AFS_PTHREAD_ENV
352     com->hdr.tid = afs_pointer_to_int(pthread_self());
353 #else
354     {
355         PROCESS handle = LWP_ThreadId();
356         com->hdr.tid = (handle) ? handle->index : 0;
357     }
358 #endif /* !AFS_PTHREAD_ENV */
359 #endif /* !AFS_NT40_ENV */
360
361     memcpy(buf, &com->hdr, sizeof(com->hdr));
362     if (com->payload.len) {
363         memcpy(buf + sizeof(com->hdr), com->payload.buf,
364                com->hdr.command_len - sizeof(com->hdr));
365     }
366
367 #ifdef AFS_NT40_ENV
368     n = send(state->fd, buf, com->hdr.command_len, 0);
369     if (n != com->hdr.command_len) {
370         Log("SYNC_ask:  write failed on circuit '%s'\n", state->proto_name);
371         res->hdr.response = SYNC_COM_ERROR;
372         goto done;
373     }
374
375     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
376         /* short circuit close channel requests */
377         res->hdr.response = SYNC_OK;
378         goto done;
379     }
380
381     n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
382     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
383         Log("SYNC_ask:  No response on circuit '%s'\n", state->proto_name);
384         res->hdr.response = SYNC_COM_ERROR;
385         goto done;
386     }
387 #else /* !AFS_NT40_ENV */
388     n = write(state->fd, buf, com->hdr.command_len);
389     if (com->hdr.command_len != n) {
390         Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
391         res->hdr.response = SYNC_COM_ERROR;
392         goto done;
393     }
394
395     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
396         /* short circuit close channel requests */
397         res->hdr.response = SYNC_OK;
398         goto done;
399     }
400
401     /* receive the response */
402     iov[0].iov_base = (char *)&res->hdr;
403     iov[0].iov_len = sizeof(res->hdr);
404     if (res->payload.len) {
405         iov[1].iov_base = (char *)res->payload.buf;
406         iov[1].iov_len = res->payload.len;
407         iovcnt = 2;
408     } else {
409         iovcnt = 1;
410     }
411     n = readv(state->fd, iov, iovcnt);
412     if (n == 0 || (n < 0 && errno != EINTR)) {
413         Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
414         res->hdr.response = SYNC_COM_ERROR;
415         goto done;
416     }
417 #endif /* !AFS_NT40_ENV */
418
419     res->recv_len = n;
420
421     if (n < sizeof(res->hdr)) {
422         Log("SYNC_ask:  response too short on circuit '%s'\n",
423             state->proto_name);
424         res->hdr.response = SYNC_COM_ERROR;
425         goto done;
426     }
427 #ifdef AFS_NT40_ENV
428     memcpy(&res->hdr, buf, sizeof(res->hdr));
429 #endif
430
431     if ((n - sizeof(res->hdr)) > res->payload.len) {
432         Log("SYNC_ask:  response too long on circuit '%s'\n",
433             state->proto_name);
434         res->hdr.response = SYNC_COM_ERROR;
435         goto done;
436     }
437 #ifdef AFS_NT40_ENV
438     memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
439 #endif
440
441     if (res->hdr.response_len != n) {
442         Log("SYNC_ask:  length field in response inconsistent "
443             "on circuit '%s'\n", state->proto_name);
444         res->hdr.response = SYNC_COM_ERROR;
445         goto done;
446     }
447     if (res->hdr.response == SYNC_DENIED) {
448         Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
449     }
450
451   done:
452     return res->hdr.response;
453 }
454
455
456 /*
457  * daemon com SYNC server-side interfaces
458  */
459
460 /**
461  * receive a command structure off a sync socket.
462  *
463  * @param[in]  state  pointer to server-side state object
464  * @param[in]  fd     file descriptor on which to perform i/o
465  * @param[out] com    sync command object to be populated
466  *
467  * @return operation status
468  *    @retval SYNC_OK command received
469  *    @retval SYNC_COM_ERROR there was a socket communications error
470  */
471 afs_int32
472 SYNC_getCom(SYNC_server_state_t * state,
473             osi_socket fd,
474             SYNC_command * com)
475 {
476     int n;
477     afs_int32 code = SYNC_OK;
478 #ifdef AFS_NT40_ENV
479     SYNC_PROTO_BUF_DECL(buf);
480 #else
481     struct iovec iov[2];
482     int iovcnt;
483 #endif
484
485 #ifdef AFS_NT40_ENV
486     n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
487
488     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
489         Log("SYNC_getCom:  error receiving command\n");
490         code = SYNC_COM_ERROR;
491         goto done;
492     }
493 #else /* !AFS_NT40_ENV */
494     iov[0].iov_base = (char *)&com->hdr;
495     iov[0].iov_len = sizeof(com->hdr);
496     if (com->payload.len) {
497         iov[1].iov_base = (char *)com->payload.buf;
498         iov[1].iov_len = com->payload.len;
499         iovcnt = 2;
500     } else {
501         iovcnt = 1;
502     }
503
504     n = readv(fd, iov, iovcnt);
505     if (n == 0 || (n < 0 && errno != EINTR)) {
506         Log("SYNC_getCom:  error receiving command\n");
507         code = SYNC_COM_ERROR;
508         goto done;
509     }
510 #endif /* !AFS_NT40_ENV */
511
512     com->recv_len = n;
513
514     if (n < sizeof(com->hdr)) {
515         Log("SYNC_getCom:  command too short\n");
516         code = SYNC_COM_ERROR;
517         goto done;
518     }
519 #ifdef AFS_NT40_ENV
520     memcpy(&com->hdr, buf, sizeof(com->hdr));
521 #endif
522
523     if ((n - sizeof(com->hdr)) > com->payload.len) {
524         Log("SYNC_getCom:  command too long\n");
525         code = SYNC_COM_ERROR;
526         goto done;
527     }
528 #ifdef AFS_NT40_ENV
529     memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
530 #endif
531
532  done:
533     return code;
534 }
535
536 /**
537  * write a response structure to a sync socket.
538  *
539  * @param[in] state  handle to server-side state object
540  * @param[in] fd     file descriptor on which to perform i/o
541  * @param[in] res    handle to response packet
542  *
543  * @return operation status
544  *    @retval SYNC_OK
545  *    @retval SYNC_COM_ERROR
546  */
547 afs_int32
548 SYNC_putRes(SYNC_server_state_t * state,
549             osi_socket fd,
550             SYNC_response * res)
551 {
552     int n;
553     afs_int32 code = SYNC_OK;
554     SYNC_PROTO_BUF_DECL(buf);
555
556     if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
557         Log("SYNC_putRes:  response_len field in response header inconsistent\n");
558         code = SYNC_COM_ERROR;
559         goto done;
560     }
561
562     if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
563         Log("SYNC_putRes:  internal SYNC buffer too small; please file a bug\n");
564         code = SYNC_COM_ERROR;
565         goto done;
566     }
567
568 #ifdef AFS_DEMAND_ATTACH_FS
569     res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
570 #endif
571     res->hdr.proto_version = state->proto_version;
572     res->hdr.pkt_seq = ++state->pkt_seq;
573     res->hdr.res_seq = ++state->res_seq;
574
575     memcpy(buf, &res->hdr, sizeof(res->hdr));
576     if (res->payload.len) {
577         memcpy(buf + sizeof(res->hdr), res->payload.buf,
578                res->hdr.response_len - sizeof(res->hdr));
579     }
580
581 #ifdef AFS_NT40_ENV
582     n = send(fd, buf, res->hdr.response_len, 0);
583 #else /* !AFS_NT40_ENV */
584     n = write(fd, buf, res->hdr.response_len);
585 #endif /* !AFS_NT40_ENV */
586
587     if (res->hdr.response_len != n) {
588         Log("SYNC_putRes: write failed\n");
589         res->hdr.response = SYNC_COM_ERROR;
590         goto done;
591     }
592
593  done:
594     return code;
595 }
596
597 /* return 0 for legal (null-terminated) string,
598  * 1 for illegal (unterminated) string */
599 int
600 SYNC_verifyProtocolString(char * buf, size_t len)
601 {
602     size_t s_len;
603
604     s_len = strnlen(buf, len);
605
606     return (s_len == len) ? 1 : 0;
607 }
608
609 /**
610  * clean up old sockets.
611  *
612  * @param[in]  state  server state object
613  *
614  * @post unix domain sockets are cleaned up
615  */
616 void
617 SYNC_cleanupSock(SYNC_server_state_t * state)
618 {
619 #ifdef USE_UNIX_SOCKETS
620     remove(state->addr.sun_path);
621 #endif
622 }
623
624 /**
625  * bind socket and set it to listen state.
626  *
627  * @param[in] state  server state object
628  *
629  * @return operation status
630  *    @retval 0 success
631  *    @retval nonzero failure
632  *
633  * @post socket bound and set to listen state
634  */
635 int
636 SYNC_bindSock(SYNC_server_state_t * state)
637 {
638     int code;
639     int on = 1;
640     int numTries;
641
642     /* Reuseaddr needed because system inexplicably leaves crud lying around */
643     code =
644         setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
645                    sizeof(on));
646     if (code)
647         Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
648
649     for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
650         code = bind(state->fd,
651                     (struct sockaddr *)&state->addr,
652                     AFS_SOCKADDR_LEN(&state->addr));
653         if (code == 0)
654             break;
655         Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
656             errno);
657         sleep(5);
658     }
659     listen(state->fd, state->listen_depth);
660
661     return code;
662 }