vol: Tidy header includes
[openafs.git] / src / vol / daemon_com.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * localhost interprocess communication for servers
12  *
13  * currently handled by a localhost socket
14  * (yes, this needs to be replaced someday)
15  */
16
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25
26 #include <afs/afs_assert.h>
27 #include <rx/xdr.h>
28 #include <afs/afsint.h>
29 #include <afs/errors.h>
30
31 #include "nfs.h"
32 #include "daemon_com.h"
33 #include "lwp.h"
34 #include "lock.h"
35 #include <afs/afssyscalls.h>
36 #include "ihandle.h"
37 #include "vnode.h"
38 #include "volume.h"
39 #include "partition.h"
40 #include "common.h"
41 #include <rx/rx_queue.h>
42
43 #ifdef USE_UNIX_SOCKETS
44 #include <afs/afsutil.h>
45 #include <sys/un.h>
46 #endif
47
48 int (*V_BreakVolumeCallbacks) (VolumeId);
49
50 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
51                                  * move = dump+restore can run on single server */
52
53 #define MAX_BIND_TRIES  5       /* Number of times to retry socket bind */
54
55 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
56
57
58 /*
59  * On AIX, connect() and bind() require use of SUN_LEN() macro;
60  * sizeof(struct sockaddr_un) will not suffice.
61  */
62 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
63 #define AFS_SOCKADDR_LEN(sa)  SUN_LEN(sa)
64 #else
65 #define AFS_SOCKADDR_LEN(sa)  sizeof(*sa)
66 #endif
67
68
69 /* daemon com SYNC general interfaces */
70
71 /**
72  * fill in sockaddr structure.
73  *
74  * @param[in]  endpoint pointer to sync endpoint object
75  * @param[out] addr     pointer to sockaddr structure
76  *
77  * @post sockaddr structure populated using information from
78  *       endpoint structure.
79  */
80 void
81 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
82 {
83 #ifdef USE_UNIX_SOCKETS
84     char tbuffer[AFSDIR_PATH_MAX];
85 #endif /* USE_UNIX_SOCKETS */
86
87     memset(addr, 0, sizeof(*addr));
88
89 #ifdef USE_UNIX_SOCKETS
90     strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
91                endpoint->un, NULL);
92     addr->sun_family = AF_UNIX;
93     strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
94 #else  /* !USE_UNIX_SOCKETS */
95 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
96     addr->sin_len = sizeof(struct sockaddr_in);
97 #endif
98     addr->sin_addr.s_addr = htonl(0x7f000001);
99     addr->sin_family = AF_INET; /* was localhost->h_addrtype */
100     addr->sin_port = htons(endpoint->in);       /* XXXX htons not _really_ neccessary */
101 #endif /* !USE_UNIX_SOCKETS */
102 }
103
104 /**
105  * get a socket descriptor of the appropriate domain.
106  *
107  * @param[in]  endpoint pointer to sync endpoint object
108  *
109  * @return socket descriptor
110  *
111  * @post socket of domain specified in endpoint structure is created and
112  *       returned to caller.
113  */
114 osi_socket
115 SYNC_getSock(SYNC_endpoint_t * endpoint)
116 {
117     osi_socket sd;
118     osi_Assert((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
119     return sd;
120 }
121
122 /* daemon com SYNC client interface */
123
124 /**
125  * open a client connection to a sync server
126  *
127  * @param[in] state  pointer to sync client handle
128  *
129  * @return operation status
130  *    @retval 1 success
131  *
132  * @note at present, this routine aborts rather than returning an error code
133  */
134 int
135 SYNC_connect(SYNC_client_state * state)
136 {
137     SYNC_sockaddr_t addr;
138     /* I can't believe the following is needed for localhost connections!! */
139     static time_t backoff[] =
140         { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
141     time_t *timeout = &backoff[0];
142
143     if (state->fd != OSI_NULLSOCKET) {
144         return 1;
145     }
146
147     SYNC_getAddr(&state->endpoint, &addr);
148
149     for (;;) {
150         state->fd = SYNC_getSock(&state->endpoint);
151         if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
152             return 1;
153         if (!*timeout)
154             break;
155         if (!(*timeout & 1))
156             Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
157                 state->proto_name);
158         SYNC_disconnect(state);
159         sleep(*timeout++);
160     }
161     perror("SYNC_connect failed (giving up!)");
162     return 0;
163 }
164
165 /**
166  * forcibly disconnect a sync client handle.
167  *
168  * @param[in] state  pointer to sync client handle
169  *
170  * @retval operation status
171  *    @retval 0 success
172  */
173 int
174 SYNC_disconnect(SYNC_client_state * state)
175 {
176 #ifdef AFS_NT40_ENV
177     closesocket(state->fd);
178 #else
179     close(state->fd);
180 #endif
181     state->fd = OSI_NULLSOCKET;
182     return 0;
183 }
184
185 /**
186  * gracefully disconnect a sync client handle.
187  *
188  * @param[in] state  pointer to sync client handle
189  *
190  * @return operation status
191  *    @retval SYNC_OK success
192  */
193 afs_int32
194 SYNC_closeChannel(SYNC_client_state * state)
195 {
196     SYNC_command com;
197     SYNC_response res;
198     SYNC_PROTO_BUF_DECL(ores);
199
200     if (state->fd == OSI_NULLSOCKET)
201         return SYNC_OK;
202
203     memset(&com, 0, sizeof(com));
204     memset(&res, 0, sizeof(res));
205
206     res.payload.len = SYNC_PROTO_MAX_LEN;
207     res.payload.buf = ores;
208
209     com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
210     com.hdr.command_len = sizeof(SYNC_command_hdr);
211     com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
212
213     /* in case the other end dropped, don't do any retries */
214     state->retry_limit = 0;
215     state->hard_timeout = 0;
216
217     SYNC_ask(state, &com, &res);
218     SYNC_disconnect(state);
219
220     return SYNC_OK;
221 }
222
223 /**
224  * forcibly break a client connection, and then create a new connection.
225  *
226  * @param[in] state  pointer to sync client handle
227  *
228  * @post old connection dropped; new connection established
229  *
230  * @return @see SYNC_connect()
231  */
232 int
233 SYNC_reconnect(SYNC_client_state * state)
234 {
235     SYNC_disconnect(state);
236     return SYNC_connect(state);
237 }
238
239 /**
240  * send a command to a sync server and wait for a response.
241  *
242  * @param[in]  state  pointer to sync client handle
243  * @param[in]  com    command object
244  * @param[out] res    response object
245  *
246  * @return operation status
247  *    @retval SYNC_OK success
248  *    @retval SYNC_COM_ERROR communications error
249  *    @retval SYNC_BAD_COMMAND server did not recognize command code
250  *
251  * @note this routine merely handles error processing; SYNC_ask_internal()
252  *       handles the low-level details of communicating with the SYNC server.
253  *
254  * @see SYNC_ask_internal
255  */
256 afs_int32
257 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
258 {
259     int tries;
260     afs_uint32 now, timeout, code=SYNC_OK;
261
262     if (state->fatal_error) {
263         return SYNC_COM_ERROR;
264     }
265
266     if (state->fd == OSI_NULLSOCKET) {
267         SYNC_connect(state);
268     }
269
270     if (state->fd == OSI_NULLSOCKET) {
271         state->fatal_error = 1;
272         return SYNC_COM_ERROR;
273     }
274
275 #ifdef AFS_DEMAND_ATTACH_FS
276     com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
277 #endif
278
279     now = FT_ApproxTime();
280     timeout = now + state->hard_timeout;
281     for (tries = 0;
282          (tries <= state->retry_limit) && (now <= timeout);
283          tries++, now = FT_ApproxTime()) {
284         code = SYNC_ask_internal(state, com, res);
285         if (code == SYNC_OK) {
286             break;
287         } else if (code == SYNC_BAD_COMMAND) {
288             Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
289                 "fileserver, volserver, salvageserver and salvager are same "
290                 "version\n", state->proto_name);
291             break;
292         } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
293             Log("SYNC_ask: protocol communications failure on circuit '%s'; "
294                 "attempting reconnect to server\n", state->proto_name);
295             SYNC_reconnect(state);
296             /* try again */
297         } else {
298             /*
299              * unknown (probably protocol-specific) response code, pass it up to
300              * the caller, and let them deal with it
301              */
302             break;
303         }
304     }
305
306     if (code == SYNC_COM_ERROR) {
307         Log("SYNC_ask: fatal protocol error on circuit '%s'; disabling sync "
308             "protocol until next server restart\n",
309             state->proto_name);
310         state->fatal_error = 1;
311     }
312
313     return code;
314 }
315
316 /**
317  * send a command to a sync server and wait for a response.
318  *
319  * @param[in]  state  pointer to sync client handle
320  * @param[in]  com    command object
321  * @param[out] res    response object
322  *
323  * @return operation status
324  *    @retval SYNC_OK success
325  *    @retval SYNC_COM_ERROR communications error
326  *
327  * @internal
328  */
329 static afs_int32
330 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
331 {
332     int n;
333     SYNC_PROTO_BUF_DECL(buf);
334 #ifndef AFS_NT40_ENV
335     int iovcnt;
336     struct iovec iov[2];
337 #endif
338
339     if (state->fd == OSI_NULLSOCKET) {
340         Log("SYNC_ask:  invalid sync file descriptor on circuit '%s'\n",
341             state->proto_name);
342         res->hdr.response = SYNC_COM_ERROR;
343         goto done;
344     }
345
346     if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
347         Log("SYNC_ask:  internal SYNC buffer too small on circuit '%s'; "
348             "please file a bug\n", state->proto_name);
349         res->hdr.response = SYNC_COM_ERROR;
350         goto done;
351     }
352
353     /*
354      * fill in some common header fields
355      */
356     com->hdr.proto_version = state->proto_version;
357     com->hdr.pkt_seq = ++state->pkt_seq;
358     com->hdr.com_seq = ++state->com_seq;
359 #ifdef AFS_NT40_ENV
360     com->hdr.pid = 0;
361     com->hdr.tid = 0;
362 #else
363     com->hdr.pid = getpid();
364 #ifdef AFS_PTHREAD_ENV
365     com->hdr.tid = afs_pointer_to_int(pthread_self());
366 #else
367     {
368         PROCESS handle = LWP_ThreadId();
369         com->hdr.tid = (handle) ? handle->index : 0;
370     }
371 #endif /* !AFS_PTHREAD_ENV */
372 #endif /* !AFS_NT40_ENV */
373
374     memcpy(buf, &com->hdr, sizeof(com->hdr));
375     if (com->payload.len) {
376         memcpy(buf + sizeof(com->hdr), com->payload.buf,
377                com->hdr.command_len - sizeof(com->hdr));
378     }
379
380 #ifdef AFS_NT40_ENV
381     n = send(state->fd, buf, com->hdr.command_len, 0);
382     if (n != com->hdr.command_len) {
383         Log("SYNC_ask:  write failed on circuit '%s'\n", state->proto_name);
384         res->hdr.response = SYNC_COM_ERROR;
385         goto done;
386     }
387
388     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
389         /* short circuit close channel requests */
390         res->hdr.response = SYNC_OK;
391         goto done;
392     }
393
394     n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
395     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
396         Log("SYNC_ask:  No response on circuit '%s'\n", state->proto_name);
397         res->hdr.response = SYNC_COM_ERROR;
398         goto done;
399     }
400 #else /* !AFS_NT40_ENV */
401     n = write(state->fd, buf, com->hdr.command_len);
402     if (com->hdr.command_len != n) {
403         Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
404         res->hdr.response = SYNC_COM_ERROR;
405         goto done;
406     }
407
408     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
409         /* short circuit close channel requests */
410         res->hdr.response = SYNC_OK;
411         goto done;
412     }
413
414     /* receive the response */
415     iov[0].iov_base = (char *)&res->hdr;
416     iov[0].iov_len = sizeof(res->hdr);
417     if (res->payload.len) {
418         iov[1].iov_base = (char *)res->payload.buf;
419         iov[1].iov_len = res->payload.len;
420         iovcnt = 2;
421     } else {
422         iovcnt = 1;
423     }
424     n = readv(state->fd, iov, iovcnt);
425     if (n == 0 || (n < 0 && errno != EINTR)) {
426         Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
427         res->hdr.response = SYNC_COM_ERROR;
428         goto done;
429     }
430 #endif /* !AFS_NT40_ENV */
431
432     res->recv_len = n;
433
434     if (n < sizeof(res->hdr)) {
435         Log("SYNC_ask:  response too short on circuit '%s'\n",
436             state->proto_name);
437         res->hdr.response = SYNC_COM_ERROR;
438         goto done;
439     }
440 #ifdef AFS_NT40_ENV
441     memcpy(&res->hdr, buf, sizeof(res->hdr));
442 #endif
443
444     if ((n - sizeof(res->hdr)) > res->payload.len) {
445         Log("SYNC_ask:  response too long on circuit '%s'\n",
446             state->proto_name);
447         res->hdr.response = SYNC_COM_ERROR;
448         goto done;
449     }
450 #ifdef AFS_NT40_ENV
451     memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
452 #endif
453
454     if (res->hdr.response_len != n) {
455         Log("SYNC_ask:  length field in response inconsistent "
456             "on circuit '%s'\n", state->proto_name);
457         res->hdr.response = SYNC_COM_ERROR;
458         goto done;
459     }
460     if (res->hdr.response == SYNC_DENIED) {
461         Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
462     }
463
464   done:
465     return res->hdr.response;
466 }
467
468
469 /*
470  * daemon com SYNC server-side interfaces
471  */
472
473 /**
474  * receive a command structure off a sync socket.
475  *
476  * @param[in]  state  pointer to server-side state object
477  * @param[in]  fd     file descriptor on which to perform i/o
478  * @param[out] com    sync command object to be populated
479  *
480  * @return operation status
481  *    @retval SYNC_OK command received
482  *    @retval SYNC_COM_ERROR there was a socket communications error
483  */
484 afs_int32
485 SYNC_getCom(SYNC_server_state_t * state,
486             osi_socket fd,
487             SYNC_command * com)
488 {
489     int n;
490     afs_int32 code = SYNC_OK;
491 #ifdef AFS_NT40_ENV
492     SYNC_PROTO_BUF_DECL(buf);
493 #else
494     struct iovec iov[2];
495     int iovcnt;
496 #endif
497
498 #ifdef AFS_NT40_ENV
499     n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
500
501     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
502         Log("SYNC_getCom:  error receiving command\n");
503         code = SYNC_COM_ERROR;
504         goto done;
505     }
506 #else /* !AFS_NT40_ENV */
507     iov[0].iov_base = (char *)&com->hdr;
508     iov[0].iov_len = sizeof(com->hdr);
509     if (com->payload.len) {
510         iov[1].iov_base = (char *)com->payload.buf;
511         iov[1].iov_len = com->payload.len;
512         iovcnt = 2;
513     } else {
514         iovcnt = 1;
515     }
516
517     n = readv(fd, iov, iovcnt);
518     if (n == 0 || (n < 0 && errno != EINTR)) {
519         Log("SYNC_getCom:  error receiving command\n");
520         code = SYNC_COM_ERROR;
521         goto done;
522     }
523 #endif /* !AFS_NT40_ENV */
524
525     com->recv_len = n;
526
527     if (n < sizeof(com->hdr)) {
528         Log("SYNC_getCom:  command too short\n");
529         code = SYNC_COM_ERROR;
530         goto done;
531     }
532 #ifdef AFS_NT40_ENV
533     memcpy(&com->hdr, buf, sizeof(com->hdr));
534 #endif
535
536     if ((n - sizeof(com->hdr)) > com->payload.len) {
537         Log("SYNC_getCom:  command too long\n");
538         code = SYNC_COM_ERROR;
539         goto done;
540     }
541 #ifdef AFS_NT40_ENV
542     memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
543 #endif
544
545  done:
546     return code;
547 }
548
549 /**
550  * write a response structure to a sync socket.
551  *
552  * @param[in] state  handle to server-side state object
553  * @param[in] fd     file descriptor on which to perform i/o
554  * @param[in] res    handle to response packet
555  *
556  * @return operation status
557  *    @retval SYNC_OK
558  *    @retval SYNC_COM_ERROR
559  */
560 afs_int32
561 SYNC_putRes(SYNC_server_state_t * state,
562             osi_socket fd,
563             SYNC_response * res)
564 {
565     int n;
566     afs_int32 code = SYNC_OK;
567     SYNC_PROTO_BUF_DECL(buf);
568
569     if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
570         Log("SYNC_putRes:  response_len field in response header inconsistent\n");
571         code = SYNC_COM_ERROR;
572         goto done;
573     }
574
575     if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
576         Log("SYNC_putRes:  internal SYNC buffer too small; please file a bug\n");
577         code = SYNC_COM_ERROR;
578         goto done;
579     }
580
581 #ifdef AFS_DEMAND_ATTACH_FS
582     res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
583 #endif
584     res->hdr.proto_version = state->proto_version;
585     res->hdr.pkt_seq = ++state->pkt_seq;
586     res->hdr.res_seq = ++state->res_seq;
587
588     memcpy(buf, &res->hdr, sizeof(res->hdr));
589     if (res->payload.len) {
590         memcpy(buf + sizeof(res->hdr), res->payload.buf,
591                res->hdr.response_len - sizeof(res->hdr));
592     }
593
594 #ifdef AFS_NT40_ENV
595     n = send(fd, buf, res->hdr.response_len, 0);
596 #else /* !AFS_NT40_ENV */
597     n = write(fd, buf, res->hdr.response_len);
598 #endif /* !AFS_NT40_ENV */
599
600     if (res->hdr.response_len != n) {
601         Log("SYNC_putRes: write failed\n");
602         res->hdr.response = SYNC_COM_ERROR;
603         goto done;
604     }
605
606  done:
607     return code;
608 }
609
610 /* return 0 for legal (null-terminated) string,
611  * 1 for illegal (unterminated) string */
612 int
613 SYNC_verifyProtocolString(char * buf, size_t len)
614 {
615     size_t s_len;
616
617     s_len = strnlen(buf, len);
618
619     return (s_len == len) ? 1 : 0;
620 }
621
622 /**
623  * clean up old sockets.
624  *
625  * @param[in]  state  server state object
626  *
627  * @post unix domain sockets are cleaned up
628  */
629 void
630 SYNC_cleanupSock(SYNC_server_state_t * state)
631 {
632 #ifdef USE_UNIX_SOCKETS
633     remove(state->addr.sun_path);
634 #endif
635 }
636
637 /**
638  * bind socket and set it to listen state.
639  *
640  * @param[in] state  server state object
641  *
642  * @return operation status
643  *    @retval 0 success
644  *    @retval nonzero failure
645  *
646  * @post socket bound and set to listen state
647  */
648 int
649 SYNC_bindSock(SYNC_server_state_t * state)
650 {
651     int code;
652     int on = 1;
653     int numTries;
654
655     /* Reuseaddr needed because system inexplicably leaves crud lying around */
656     code =
657         setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
658                    sizeof(on));
659     if (code)
660         Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
661
662     for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
663         code = bind(state->fd,
664                     (struct sockaddr *)&state->addr,
665                     AFS_SOCKADDR_LEN(&state->addr));
666         if (code == 0)
667             break;
668         Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
669             errno);
670         sleep(5);
671     }
672     listen(state->fd, state->listen_depth);
673
674     return code;
675 }