923306f36585bd3e37c482da458f3d57411a66cc
[openafs.git] / src / vol / daemon_com.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * localhost interprocess communication for servers
12  *
13  * currently handled by a localhost socket
14  * (yes, this needs to be replaced someday)
15  */
16
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25
26 #include <sys/types.h>
27 #include <stdio.h>
28 #ifdef AFS_NT40_ENV
29 #include <winsock2.h>
30 #include <time.h>
31 #else
32 #include <sys/param.h>
33 #include <sys/socket.h>
34 #include <netinet/in.h>
35 #include <netdb.h>
36 #include <sys/time.h>
37 #include <unistd.h>
38 #endif
39 #include <errno.h>
40 #include <afs/afs_assert.h>
41 #include <signal.h>
42 #include <string.h>
43
44
45 #include <rx/xdr.h>
46 #include <afs/afsint.h>
47 #include "nfs.h"
48 #include <afs/errors.h>
49 #include "daemon_com.h"
50 #include "lwp.h"
51 #include "lock.h"
52 #include <afs/afssyscalls.h>
53 #include "ihandle.h"
54 #include "vnode.h"
55 #include "volume.h"
56 #include "partition.h"
57 #include "common.h"
58 #include <rx/rx_queue.h>
59
60 #ifdef USE_UNIX_SOCKETS
61 #include <afs/afsutil.h>
62 #include <sys/un.h>
63 #endif
64
65 int (*V_BreakVolumeCallbacks) (VolumeId);
66
67 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
68                                  * move = dump+restore can run on single server */
69
70 #define MAX_BIND_TRIES  5       /* Number of times to retry socket bind */
71
72 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
73
74
75 /*
76  * On AIX, connect() and bind() require use of SUN_LEN() macro;
77  * sizeof(struct sockaddr_un) will not suffice.
78  */
79 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
80 #define AFS_SOCKADDR_LEN(sa)  SUN_LEN(sa)
81 #else
82 #define AFS_SOCKADDR_LEN(sa)  sizeof(*sa)
83 #endif
84
85
86 /* daemon com SYNC general interfaces */
87
88 /**
89  * fill in sockaddr structure.
90  *
91  * @param[in]  endpoint pointer to sync endpoint object
92  * @param[out] addr     pointer to sockaddr structure
93  *
94  * @post sockaddr structure populated using information from
95  *       endpoint structure.
96  */
97 void
98 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
99 {
100 #ifdef USE_UNIX_SOCKETS
101     char tbuffer[AFSDIR_PATH_MAX];
102 #endif /* USE_UNIX_SOCKETS */
103
104     memset(addr, 0, sizeof(*addr));
105
106 #ifdef USE_UNIX_SOCKETS
107     strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
108                endpoint->un, NULL);
109     addr->sun_family = AF_UNIX;
110     strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
111 #else  /* !USE_UNIX_SOCKETS */
112 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
113     addr->sin_len = sizeof(struct sockaddr_in);
114 #endif
115     addr->sin_addr.s_addr = htonl(0x7f000001);
116     addr->sin_family = AF_INET; /* was localhost->h_addrtype */
117     addr->sin_port = htons(endpoint->in);       /* XXXX htons not _really_ neccessary */
118 #endif /* !USE_UNIX_SOCKETS */
119 }
120
121 /**
122  * get a socket descriptor of the appropriate domain.
123  *
124  * @param[in]  endpoint pointer to sync endpoint object
125  *
126  * @return socket descriptor
127  *
128  * @post socket of domain specified in endpoint structure is created and
129  *       returned to caller.
130  */
131 osi_socket
132 SYNC_getSock(SYNC_endpoint_t * endpoint)
133 {
134     osi_socket sd;
135     osi_Assert((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
136     return sd;
137 }
138
139 /* daemon com SYNC client interface */
140
141 /**
142  * open a client connection to a sync server
143  *
144  * @param[in] state  pointer to sync client handle
145  *
146  * @return operation status
147  *    @retval 1 success
148  *
149  * @note at present, this routine aborts rather than returning an error code
150  */
151 int
152 SYNC_connect(SYNC_client_state * state)
153 {
154     SYNC_sockaddr_t addr;
155     /* I can't believe the following is needed for localhost connections!! */
156     static time_t backoff[] =
157         { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
158     time_t *timeout = &backoff[0];
159
160     if (state->fd != OSI_NULLSOCKET) {
161         return 1;
162     }
163
164     SYNC_getAddr(&state->endpoint, &addr);
165
166     for (;;) {
167         state->fd = SYNC_getSock(&state->endpoint);
168         if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
169             return 1;
170         if (!*timeout)
171             break;
172         if (!(*timeout & 1))
173             Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
174                 state->proto_name);
175         SYNC_disconnect(state);
176         sleep(*timeout++);
177     }
178     perror("SYNC_connect failed (giving up!)");
179     return 0;
180 }
181
182 /**
183  * forcibly disconnect a sync client handle.
184  *
185  * @param[in] state  pointer to sync client handle
186  *
187  * @retval operation status
188  *    @retval 0 success
189  */
190 int
191 SYNC_disconnect(SYNC_client_state * state)
192 {
193 #ifdef AFS_NT40_ENV
194     closesocket(state->fd);
195 #else
196     close(state->fd);
197 #endif
198     state->fd = OSI_NULLSOCKET;
199     return 0;
200 }
201
202 /**
203  * gracefully disconnect a sync client handle.
204  *
205  * @param[in] state  pointer to sync client handle
206  *
207  * @return operation status
208  *    @retval SYNC_OK success
209  */
210 afs_int32
211 SYNC_closeChannel(SYNC_client_state * state)
212 {
213     SYNC_command com;
214     SYNC_response res;
215     SYNC_PROTO_BUF_DECL(ores);
216
217     if (state->fd == OSI_NULLSOCKET)
218         return SYNC_OK;
219
220     memset(&com, 0, sizeof(com));
221     memset(&res, 0, sizeof(res));
222
223     res.payload.len = SYNC_PROTO_MAX_LEN;
224     res.payload.buf = ores;
225
226     com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
227     com.hdr.command_len = sizeof(SYNC_command_hdr);
228     com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
229
230     /* in case the other end dropped, don't do any retries */
231     state->retry_limit = 0;
232     state->hard_timeout = 0;
233
234     SYNC_ask(state, &com, &res);
235     SYNC_disconnect(state);
236
237     return SYNC_OK;
238 }
239
240 /**
241  * forcibly break a client connection, and then create a new connection.
242  *
243  * @param[in] state  pointer to sync client handle
244  *
245  * @post old connection dropped; new connection established
246  *
247  * @return @see SYNC_connect()
248  */
249 int
250 SYNC_reconnect(SYNC_client_state * state)
251 {
252     SYNC_disconnect(state);
253     return SYNC_connect(state);
254 }
255
256 /**
257  * send a command to a sync server and wait for a response.
258  *
259  * @param[in]  state  pointer to sync client handle
260  * @param[in]  com    command object
261  * @param[out] res    response object
262  *
263  * @return operation status
264  *    @retval SYNC_OK success
265  *    @retval SYNC_COM_ERROR communications error
266  *    @retval SYNC_BAD_COMMAND server did not recognize command code
267  *
268  * @note this routine merely handles error processing; SYNC_ask_internal()
269  *       handles the low-level details of communicating with the SYNC server.
270  *
271  * @see SYNC_ask_internal
272  */
273 afs_int32
274 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
275 {
276     int tries;
277     afs_uint32 now, timeout, code=SYNC_OK;
278
279     if (state->fatal_error) {
280         return SYNC_COM_ERROR;
281     }
282
283     if (state->fd == OSI_NULLSOCKET) {
284         SYNC_connect(state);
285     }
286
287     if (state->fd == OSI_NULLSOCKET) {
288         state->fatal_error = 1;
289         return SYNC_COM_ERROR;
290     }
291
292 #ifdef AFS_DEMAND_ATTACH_FS
293     com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
294 #endif
295
296     now = FT_ApproxTime();
297     timeout = now + state->hard_timeout;
298     for (tries = 0;
299          (tries <= state->retry_limit) && (now <= timeout);
300          tries++, now = FT_ApproxTime()) {
301         code = SYNC_ask_internal(state, com, res);
302         if (code == SYNC_OK) {
303             break;
304         } else if (code == SYNC_BAD_COMMAND) {
305             Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
306                 "fileserver, volserver, salvageserver and salvager are same "
307                 "version\n", state->proto_name);
308             break;
309         } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
310             Log("SYNC_ask: protocol communications failure on circuit '%s'; "
311                 "attempting reconnect to server\n", state->proto_name);
312             SYNC_reconnect(state);
313             /* try again */
314         } else {
315             /*
316              * unknown (probably protocol-specific) response code, pass it up to
317              * the caller, and let them deal with it
318              */
319             break;
320         }
321     }
322
323     if (code == SYNC_COM_ERROR) {
324         Log("SYNC_ask: fatal protocol error on circuit '%s'; disabling sync "
325             "protocol until next server restart\n",
326             state->proto_name);
327         state->fatal_error = 1;
328     }
329
330     return code;
331 }
332
333 /**
334  * send a command to a sync server and wait for a response.
335  *
336  * @param[in]  state  pointer to sync client handle
337  * @param[in]  com    command object
338  * @param[out] res    response object
339  *
340  * @return operation status
341  *    @retval SYNC_OK success
342  *    @retval SYNC_COM_ERROR communications error
343  *
344  * @internal
345  */
346 static afs_int32
347 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
348 {
349     int n;
350     SYNC_PROTO_BUF_DECL(buf);
351 #ifndef AFS_NT40_ENV
352     int iovcnt;
353     struct iovec iov[2];
354 #endif
355
356     if (state->fd == OSI_NULLSOCKET) {
357         Log("SYNC_ask:  invalid sync file descriptor on circuit '%s'\n",
358             state->proto_name);
359         res->hdr.response = SYNC_COM_ERROR;
360         goto done;
361     }
362
363     if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
364         Log("SYNC_ask:  internal SYNC buffer too small on circuit '%s'; "
365             "please file a bug\n", state->proto_name);
366         res->hdr.response = SYNC_COM_ERROR;
367         goto done;
368     }
369
370     /*
371      * fill in some common header fields
372      */
373     com->hdr.proto_version = state->proto_version;
374     com->hdr.pkt_seq = ++state->pkt_seq;
375     com->hdr.com_seq = ++state->com_seq;
376 #ifdef AFS_NT40_ENV
377     com->hdr.pid = 0;
378     com->hdr.tid = 0;
379 #else
380     com->hdr.pid = getpid();
381 #ifdef AFS_PTHREAD_ENV
382     com->hdr.tid = afs_pointer_to_int(pthread_self());
383 #else
384     {
385         PROCESS handle = LWP_ThreadId();
386         com->hdr.tid = (handle) ? handle->index : 0;
387     }
388 #endif /* !AFS_PTHREAD_ENV */
389 #endif /* !AFS_NT40_ENV */
390
391     memcpy(buf, &com->hdr, sizeof(com->hdr));
392     if (com->payload.len) {
393         memcpy(buf + sizeof(com->hdr), com->payload.buf,
394                com->hdr.command_len - sizeof(com->hdr));
395     }
396
397 #ifdef AFS_NT40_ENV
398     n = send(state->fd, buf, com->hdr.command_len, 0);
399     if (n != com->hdr.command_len) {
400         Log("SYNC_ask:  write failed on circuit '%s'\n", state->proto_name);
401         res->hdr.response = SYNC_COM_ERROR;
402         goto done;
403     }
404
405     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
406         /* short circuit close channel requests */
407         res->hdr.response = SYNC_OK;
408         goto done;
409     }
410
411     n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
412     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
413         Log("SYNC_ask:  No response on circuit '%s'\n", state->proto_name);
414         res->hdr.response = SYNC_COM_ERROR;
415         goto done;
416     }
417 #else /* !AFS_NT40_ENV */
418     n = write(state->fd, buf, com->hdr.command_len);
419     if (com->hdr.command_len != n) {
420         Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
421         res->hdr.response = SYNC_COM_ERROR;
422         goto done;
423     }
424
425     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
426         /* short circuit close channel requests */
427         res->hdr.response = SYNC_OK;
428         goto done;
429     }
430
431     /* receive the response */
432     iov[0].iov_base = (char *)&res->hdr;
433     iov[0].iov_len = sizeof(res->hdr);
434     if (res->payload.len) {
435         iov[1].iov_base = (char *)res->payload.buf;
436         iov[1].iov_len = res->payload.len;
437         iovcnt = 2;
438     } else {
439         iovcnt = 1;
440     }
441     n = readv(state->fd, iov, iovcnt);
442     if (n == 0 || (n < 0 && errno != EINTR)) {
443         Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
444         res->hdr.response = SYNC_COM_ERROR;
445         goto done;
446     }
447 #endif /* !AFS_NT40_ENV */
448
449     res->recv_len = n;
450
451     if (n < sizeof(res->hdr)) {
452         Log("SYNC_ask:  response too short on circuit '%s'\n",
453             state->proto_name);
454         res->hdr.response = SYNC_COM_ERROR;
455         goto done;
456     }
457 #ifdef AFS_NT40_ENV
458     memcpy(&res->hdr, buf, sizeof(res->hdr));
459 #endif
460
461     if ((n - sizeof(res->hdr)) > res->payload.len) {
462         Log("SYNC_ask:  response too long on circuit '%s'\n",
463             state->proto_name);
464         res->hdr.response = SYNC_COM_ERROR;
465         goto done;
466     }
467 #ifdef AFS_NT40_ENV
468     memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
469 #endif
470
471     if (res->hdr.response_len != n) {
472         Log("SYNC_ask:  length field in response inconsistent "
473             "on circuit '%s'\n", state->proto_name);
474         res->hdr.response = SYNC_COM_ERROR;
475         goto done;
476     }
477     if (res->hdr.response == SYNC_DENIED) {
478         Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
479     }
480
481   done:
482     return res->hdr.response;
483 }
484
485
486 /*
487  * daemon com SYNC server-side interfaces
488  */
489
490 /**
491  * receive a command structure off a sync socket.
492  *
493  * @param[in]  state  pointer to server-side state object
494  * @param[in]  fd     file descriptor on which to perform i/o
495  * @param[out] com    sync command object to be populated
496  *
497  * @return operation status
498  *    @retval SYNC_OK command received
499  *    @retval SYNC_COM_ERROR there was a socket communications error
500  */
501 afs_int32
502 SYNC_getCom(SYNC_server_state_t * state,
503             osi_socket fd,
504             SYNC_command * com)
505 {
506     int n;
507     afs_int32 code = SYNC_OK;
508 #ifdef AFS_NT40_ENV
509     SYNC_PROTO_BUF_DECL(buf);
510 #else
511     struct iovec iov[2];
512     int iovcnt;
513 #endif
514
515 #ifdef AFS_NT40_ENV
516     n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
517
518     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
519         Log("SYNC_getCom:  error receiving command\n");
520         code = SYNC_COM_ERROR;
521         goto done;
522     }
523 #else /* !AFS_NT40_ENV */
524     iov[0].iov_base = (char *)&com->hdr;
525     iov[0].iov_len = sizeof(com->hdr);
526     if (com->payload.len) {
527         iov[1].iov_base = (char *)com->payload.buf;
528         iov[1].iov_len = com->payload.len;
529         iovcnt = 2;
530     } else {
531         iovcnt = 1;
532     }
533
534     n = readv(fd, iov, iovcnt);
535     if (n == 0 || (n < 0 && errno != EINTR)) {
536         Log("SYNC_getCom:  error receiving command\n");
537         code = SYNC_COM_ERROR;
538         goto done;
539     }
540 #endif /* !AFS_NT40_ENV */
541
542     com->recv_len = n;
543
544     if (n < sizeof(com->hdr)) {
545         Log("SYNC_getCom:  command too short\n");
546         code = SYNC_COM_ERROR;
547         goto done;
548     }
549 #ifdef AFS_NT40_ENV
550     memcpy(&com->hdr, buf, sizeof(com->hdr));
551 #endif
552
553     if ((n - sizeof(com->hdr)) > com->payload.len) {
554         Log("SYNC_getCom:  command too long\n");
555         code = SYNC_COM_ERROR;
556         goto done;
557     }
558 #ifdef AFS_NT40_ENV
559     memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
560 #endif
561
562  done:
563     return code;
564 }
565
566 /**
567  * write a response structure to a sync socket.
568  *
569  * @param[in] state  handle to server-side state object
570  * @param[in] fd     file descriptor on which to perform i/o
571  * @param[in] res    handle to response packet
572  *
573  * @return operation status
574  *    @retval SYNC_OK
575  *    @retval SYNC_COM_ERROR
576  */
577 afs_int32
578 SYNC_putRes(SYNC_server_state_t * state,
579             osi_socket fd,
580             SYNC_response * res)
581 {
582     int n;
583     afs_int32 code = SYNC_OK;
584     SYNC_PROTO_BUF_DECL(buf);
585
586     if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
587         Log("SYNC_putRes:  response_len field in response header inconsistent\n");
588         code = SYNC_COM_ERROR;
589         goto done;
590     }
591
592     if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
593         Log("SYNC_putRes:  internal SYNC buffer too small; please file a bug\n");
594         code = SYNC_COM_ERROR;
595         goto done;
596     }
597
598 #ifdef AFS_DEMAND_ATTACH_FS
599     res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
600 #endif
601     res->hdr.proto_version = state->proto_version;
602     res->hdr.pkt_seq = ++state->pkt_seq;
603     res->hdr.res_seq = ++state->res_seq;
604
605     memcpy(buf, &res->hdr, sizeof(res->hdr));
606     if (res->payload.len) {
607         memcpy(buf + sizeof(res->hdr), res->payload.buf,
608                res->hdr.response_len - sizeof(res->hdr));
609     }
610
611 #ifdef AFS_NT40_ENV
612     n = send(fd, buf, res->hdr.response_len, 0);
613 #else /* !AFS_NT40_ENV */
614     n = write(fd, buf, res->hdr.response_len);
615 #endif /* !AFS_NT40_ENV */
616
617     if (res->hdr.response_len != n) {
618         Log("SYNC_putRes: write failed\n");
619         res->hdr.response = SYNC_COM_ERROR;
620         goto done;
621     }
622
623  done:
624     return code;
625 }
626
627 /* return 0 for legal (null-terminated) string,
628  * 1 for illegal (unterminated) string */
629 int
630 SYNC_verifyProtocolString(char * buf, size_t len)
631 {
632     size_t s_len;
633
634     s_len = afs_strnlen(buf, len);
635
636     return (s_len == len) ? 1 : 0;
637 }
638
639 /**
640  * clean up old sockets.
641  *
642  * @param[in]  state  server state object
643  *
644  * @post unix domain sockets are cleaned up
645  */
646 void
647 SYNC_cleanupSock(SYNC_server_state_t * state)
648 {
649 #ifdef USE_UNIX_SOCKETS
650     remove(state->addr.sun_path);
651 #endif
652 }
653
654 /**
655  * bind socket and set it to listen state.
656  *
657  * @param[in] state  server state object
658  *
659  * @return operation status
660  *    @retval 0 success
661  *    @retval nonzero failure
662  *
663  * @post socket bound and set to listen state
664  */
665 int
666 SYNC_bindSock(SYNC_server_state_t * state)
667 {
668     int code;
669     int on = 1;
670     int numTries;
671
672     /* Reuseaddr needed because system inexplicably leaves crud lying around */
673     code =
674         setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
675                    sizeof(on));
676     if (code)
677         Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
678
679     for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
680         code = bind(state->fd,
681                     (struct sockaddr *)&state->addr,
682                     AFS_SOCKADDR_LEN(&state->addr));
683         if (code == 0)
684             break;
685         Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
686             errno);
687         sleep(5);
688     }
689     listen(state->fd, state->listen_depth);
690
691     return code;
692 }