Remove the RCSID macro
[openafs.git] / src / vol / daemon_com.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * localhost interprocess communication for servers
12  *
13  * currently handled by a localhost socket
14  * (yes, this needs to be replaced someday)
15  */
16
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24
25 #include <sys/types.h>
26 #include <stdio.h>
27 #ifdef AFS_NT40_ENV
28 #include <winsock2.h>
29 #include <time.h>
30 #else
31 #include <sys/param.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netdb.h>
35 #include <sys/time.h>
36 #endif
37 #include <errno.h>
38 #include <assert.h>
39 #include <signal.h>
40 #include <string.h>
41
42
43 #include <rx/xdr.h>
44 #include <afs/afsint.h>
45 #include "nfs.h"
46 #include <afs/errors.h>
47 #include "daemon_com.h"
48 #include "lwp.h"
49 #include "lock.h"
50 #include <afs/afssyscalls.h>
51 #include "ihandle.h"
52 #include "vnode.h"
53 #include "volume.h"
54 #include "partition.h"
55 #include <rx/rx_queue.h>
56
57 #ifdef USE_UNIX_SOCKETS
58 #include <afs/afsutil.h>
59 #include <sys/un.h>
60 #endif
61
62 /*@printflike@*/ extern void Log(const char *format, ...);
63
64 int (*V_BreakVolumeCallbacks) (VolumeId);
65
66 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
67                                  * move = dump+restore can run on single server */
68
69 #define MAX_BIND_TRIES  5       /* Number of times to retry socket bind */
70
71 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
72
73
74 /*
75  * On AIX, connect() and bind() require use of SUN_LEN() macro;
76  * sizeof(struct sockaddr_un) will not suffice.
77  */
78 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
79 #define AFS_SOCKADDR_LEN(sa)  SUN_LEN(sa)
80 #else
81 #define AFS_SOCKADDR_LEN(sa)  sizeof(*sa)
82 #endif
83
84
85 /* daemon com SYNC general interfaces */
86
87 /**
88  * fill in sockaddr structure.
89  *
90  * @param[in]  endpoint pointer to sync endpoint object
91  * @param[out] addr     pointer to sockaddr structure
92  *
93  * @post sockaddr structure populated using information from
94  *       endpoint structure.
95  */
96 void
97 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
98 {
99 #ifdef USE_UNIX_SOCKETS
100     char tbuffer[AFSDIR_PATH_MAX];
101 #endif /* USE_UNIX_SOCKETS */
102
103     memset(addr, 0, sizeof(*addr));
104
105 #ifdef USE_UNIX_SOCKETS
106     strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
107                endpoint->un, NULL);
108     addr->sun_family = AF_UNIX;
109     strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
110 #else  /* !USE_UNIX_SOCKETS */
111 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
112     addr->sin_len = sizeof(struct sockaddr_in);
113 #endif
114     addr->sin_addr.s_addr = htonl(0x7f000001);
115     addr->sin_family = AF_INET; /* was localhost->h_addrtype */
116     addr->sin_port = htons(endpoint->in);       /* XXXX htons not _really_ neccessary */
117 #endif /* !USE_UNIX_SOCKETS */
118 }
119
120 /**
121  * get a socket descriptor of the appropriate domain.
122  *
123  * @param[in]  endpoint pointer to sync endpoint object
124  *
125  * @return socket descriptor
126  *
127  * @post socket of domain specified in endpoint structure is created and
128  *       returned to caller.
129  */
130 osi_socket
131 SYNC_getSock(SYNC_endpoint_t * endpoint)
132 {
133     osi_socket sd;
134     assert((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
135     return sd;
136 }
137
138 /* daemon com SYNC client interface */
139
140 /**
141  * open a client connection to a sync server
142  *
143  * @param[in] state  pointer to sync client handle
144  *
145  * @return operation status
146  *    @retval 1 success
147  *
148  * @note at present, this routine aborts rather than returning an error code
149  */
150 int
151 SYNC_connect(SYNC_client_state * state)
152 {
153     SYNC_sockaddr_t addr;
154     /* I can't believe the following is needed for localhost connections!! */
155     static time_t backoff[] =
156         { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
157     time_t *timeout = &backoff[0];
158
159     if (state->fd >= 0) {
160         return 1;
161     }
162
163     SYNC_getAddr(&state->endpoint, &addr);
164
165     for (;;) {
166         state->fd = SYNC_getSock(&state->endpoint);
167         if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
168             return 1;
169         if (!*timeout)
170             break;
171         if (!(*timeout & 1))
172             Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
173                 state->proto_name);
174         SYNC_disconnect(state);
175         sleep(*timeout++);
176     }
177     perror("SYNC_connect failed (giving up!)");
178     return 0;
179 }
180
181 /**
182  * forcibly disconnect a sync client handle.
183  *
184  * @param[in] state  pointer to sync client handle
185  *
186  * @retval operation status
187  *    @retval 0 success
188  */
189 int
190 SYNC_disconnect(SYNC_client_state * state)
191 {
192 #ifdef AFS_NT40_ENV
193     closesocket(state->fd);
194 #else
195     close(state->fd);
196 #endif
197     state->fd = -1;
198     return 0;
199 }
200
201 /**
202  * gracefully disconnect a sync client handle.
203  *
204  * @param[in] state  pointer to sync client handle
205  *
206  * @return operation status
207  *    @retval SYNC_OK success
208  */
209 afs_int32
210 SYNC_closeChannel(SYNC_client_state * state)
211 {
212     SYNC_command com;
213     SYNC_response res;
214     SYNC_PROTO_BUF_DECL(ores);
215
216     if (state->fd == -1)
217         return SYNC_OK;
218
219     memset(&com, 0, sizeof(com));
220     memset(&res, 0, sizeof(res));
221
222     res.payload.len = SYNC_PROTO_MAX_LEN;
223     res.payload.buf = ores;
224
225     com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
226     com.hdr.command_len = sizeof(SYNC_command_hdr);
227     com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
228
229     /* in case the other end dropped, don't do any retries */
230     state->retry_limit = 0;
231     state->hard_timeout = 0;
232
233     SYNC_ask(state, &com, &res);
234     SYNC_disconnect(state);
235
236     return SYNC_OK;
237 }
238
239 /**
240  * forcibly break a client connection, and then create a new connection.
241  *
242  * @param[in] state  pointer to sync client handle
243  *
244  * @post old connection dropped; new connection established
245  *
246  * @return @see SYNC_connect()
247  */
248 int
249 SYNC_reconnect(SYNC_client_state * state)
250 {
251     SYNC_disconnect(state);
252     return SYNC_connect(state);
253 }
254
255 /**
256  * send a command to a sync server and wait for a response.
257  *
258  * @param[in]  state  pointer to sync client handle
259  * @param[in]  com    command object
260  * @param[out] res    response object
261  *
262  * @return operation status
263  *    @retval SYNC_OK success
264  *    @retval SYNC_COM_ERROR communications error
265  *    @retval SYNC_BAD_COMMAND server did not recognize command code
266  *
267  * @note this routine merely handles error processing; SYNC_ask_internal()
268  *       handles the low-level details of communicating with the SYNC server.
269  *
270  * @see SYNC_ask_internal
271  */
272 afs_int32
273 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
274 {
275     int tries;
276     afs_uint32 now, timeout, code=SYNC_OK;
277
278     if (state->fatal_error) {
279         return SYNC_COM_ERROR;
280     }
281
282     if (state->fd == -1) {
283         SYNC_connect(state);
284     }
285
286     if (state->fd == -1) {
287         state->fatal_error = 1;
288         return SYNC_COM_ERROR;
289     }
290
291 #ifdef AFS_DEMAND_ATTACH_FS
292     com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
293 #endif
294
295     now = FT_ApproxTime();
296     timeout = now + state->hard_timeout;
297     for (tries = 0; 
298          (tries <= state->retry_limit) && (now <= timeout);
299          tries++, now = FT_ApproxTime()) {
300         code = SYNC_ask_internal(state, com, res);
301         if (code == SYNC_OK) {
302             break;
303         } else if (code == SYNC_BAD_COMMAND) {
304             Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
305                 "fileserver, volserver, salvageserver and salvager are same "
306                 "version\n", state->proto_name);
307             break;
308         } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
309             Log("SYNC_ask: protocol communications failure on circuit '%s'; "
310                 "attempting reconnect to server\n", state->proto_name);
311             SYNC_reconnect(state);
312             /* try again */
313         } else {
314             /* 
315              * unknown (probably protocol-specific) response code, pass it up to 
316              * the caller, and let them deal with it 
317              */
318             break;
319         }
320     }
321
322     if (code == SYNC_COM_ERROR) {
323         Log("SYNC_ask: fatal protocol error on circuit '%s'; disabling sync "
324             "protocol until next server restart\n", 
325             state->proto_name);
326         state->fatal_error = 1;
327     }
328
329     return code;
330 }
331
332 /**
333  * send a command to a sync server and wait for a response.
334  *
335  * @param[in]  state  pointer to sync client handle
336  * @param[in]  com    command object
337  * @param[out] res    response object
338  *
339  * @return operation status
340  *    @retval SYNC_OK success
341  *    @retval SYNC_COM_ERROR communications error
342  *
343  * @internal
344  */
345 static afs_int32
346 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
347 {
348     int n;
349     SYNC_PROTO_BUF_DECL(buf);
350 #ifndef AFS_NT40_ENV
351     int iovcnt;
352     struct iovec iov[2];
353 #endif
354
355     if (state->fd == -1) {
356         Log("SYNC_ask:  invalid sync file descriptor on circuit '%s'\n",
357             state->proto_name);
358         res->hdr.response = SYNC_COM_ERROR;
359         goto done;
360     }
361
362     if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
363         Log("SYNC_ask:  internal SYNC buffer too small on circuit '%s'; "
364             "please file a bug\n", state->proto_name);
365         res->hdr.response = SYNC_COM_ERROR;
366         goto done;
367     }
368
369     /*
370      * fill in some common header fields
371      */
372     com->hdr.proto_version = state->proto_version;
373     com->hdr.pkt_seq = ++state->pkt_seq;
374     com->hdr.com_seq = ++state->com_seq;
375 #ifdef AFS_NT40_ENV
376     com->hdr.pid = 0;
377     com->hdr.tid = 0;
378 #else
379     com->hdr.pid = getpid();
380 #ifdef AFS_PTHREAD_ENV
381     com->hdr.tid = (afs_int32)pthread_self();
382 #else
383     {
384         PROCESS handle = LWP_ThreadId();
385         com->hdr.tid = (handle) ? handle->index : 0;
386     }
387 #endif /* !AFS_PTHREAD_ENV */
388 #endif /* !AFS_NT40_ENV */
389
390     memcpy(buf, &com->hdr, sizeof(com->hdr));
391     if (com->payload.len) {
392         memcpy(buf + sizeof(com->hdr), com->payload.buf, 
393                com->hdr.command_len - sizeof(com->hdr));
394     }
395
396 #ifdef AFS_NT40_ENV
397     n = send(state->fd, buf, com->hdr.command_len, 0);
398     if (n != com->hdr.command_len) {
399         Log("SYNC_ask:  write failed on circuit '%s'\n", state->proto_name);
400         res->hdr.response = SYNC_COM_ERROR;
401         goto done;
402     }
403
404     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
405         /* short circuit close channel requests */
406         res->hdr.response = SYNC_OK;
407         goto done;
408     }
409
410     n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
411     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
412         Log("SYNC_ask:  No response on circuit '%s'\n", state->proto_name);
413         res->hdr.response = SYNC_COM_ERROR;
414         goto done;
415     }
416 #else /* !AFS_NT40_ENV */
417     n = write(state->fd, buf, com->hdr.command_len);
418     if (com->hdr.command_len != n) {
419         Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
420         res->hdr.response = SYNC_COM_ERROR;
421         goto done;
422     }
423
424     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
425         /* short circuit close channel requests */
426         res->hdr.response = SYNC_OK;
427         goto done;
428     }
429
430     /* receive the response */
431     iov[0].iov_base = (char *)&res->hdr;
432     iov[0].iov_len = sizeof(res->hdr);
433     if (res->payload.len) {
434         iov[1].iov_base = (char *)res->payload.buf;
435         iov[1].iov_len = res->payload.len;
436         iovcnt = 2;
437     } else {
438         iovcnt = 1;
439     }
440     n = readv(state->fd, iov, iovcnt);
441     if (n == 0 || (n < 0 && errno != EINTR)) {
442         Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
443         res->hdr.response = SYNC_COM_ERROR;
444         goto done;
445     }
446 #endif /* !AFS_NT40_ENV */
447
448     res->recv_len = n;
449
450     if (n < sizeof(res->hdr)) {
451         Log("SYNC_ask:  response too short on circuit '%s'\n", 
452             state->proto_name);
453         res->hdr.response = SYNC_COM_ERROR;
454         goto done;
455     }
456 #ifdef AFS_NT40_ENV
457     memcpy(&res->hdr, buf, sizeof(res->hdr));
458 #endif
459
460     if ((n - sizeof(res->hdr)) > res->payload.len) {
461         Log("SYNC_ask:  response too long on circuit '%s'\n", 
462             state->proto_name);
463         res->hdr.response = SYNC_COM_ERROR;
464         goto done;
465     }
466 #ifdef AFS_NT40_ENV
467     memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
468 #endif
469
470     if (res->hdr.response_len != n) {
471         Log("SYNC_ask:  length field in response inconsistent "
472             "on circuit '%s'\n", state->proto_name);
473         res->hdr.response = SYNC_COM_ERROR;
474         goto done;
475     }
476     if (res->hdr.response == SYNC_DENIED) {
477         Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
478     }
479
480   done:
481     return res->hdr.response;
482 }
483
484
485 /* 
486  * daemon com SYNC server-side interfaces 
487  */
488
489 /**
490  * receive a command structure off a sync socket.
491  *
492  * @param[in]  state  pointer to server-side state object
493  * @param[in]  fd     file descriptor on which to perform i/o
494  * @param[out] com    sync command object to be populated
495  *
496  * @return operation status
497  *    @retval SYNC_OK command received
498  *    @retval SYNC_COM_ERROR there was a socket communications error
499  */
500 afs_int32
501 SYNC_getCom(SYNC_server_state_t * state,
502             osi_socket fd,
503             SYNC_command * com)
504 {
505     int n;
506     afs_int32 code = SYNC_OK;
507 #ifdef AFS_NT40_ENV
508     SYNC_PROTO_BUF_DECL(buf);
509 #else
510     struct iovec iov[2];
511     int iovcnt;
512 #endif
513
514 #ifdef AFS_NT40_ENV
515     n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
516
517     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
518         Log("SYNC_getCom:  error receiving command\n");
519         code = SYNC_COM_ERROR;
520         goto done;
521     }
522 #else /* !AFS_NT40_ENV */
523     iov[0].iov_base = (char *)&com->hdr;
524     iov[0].iov_len = sizeof(com->hdr);
525     if (com->payload.len) {
526         iov[1].iov_base = (char *)com->payload.buf;
527         iov[1].iov_len = com->payload.len;
528         iovcnt = 2;
529     } else {
530         iovcnt = 1;
531     }
532
533     n = readv(fd, iov, iovcnt);
534     if (n == 0 || (n < 0 && errno != EINTR)) {
535         Log("SYNC_getCom:  error receiving command\n");
536         code = SYNC_COM_ERROR;
537         goto done;
538     }
539 #endif /* !AFS_NT40_ENV */
540
541     com->recv_len = n;
542
543     if (n < sizeof(com->hdr)) {
544         Log("SYNC_getCom:  command too short\n");
545         code = SYNC_COM_ERROR;
546         goto done;
547     }
548 #ifdef AFS_NT40_ENV
549     memcpy(&com->hdr, buf, sizeof(com->hdr));
550 #endif
551
552     if ((n - sizeof(com->hdr)) > com->payload.len) {
553         Log("SYNC_getCom:  command too long\n");
554         code = SYNC_COM_ERROR;
555         goto done;
556     }
557 #ifdef AFS_NT40_ENV
558     memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
559 #endif
560
561  done:
562     return code;
563 }
564
565 /**
566  * write a response structure to a sync socket.
567  *
568  * @param[in] state  handle to server-side state object
569  * @param[in] fd     file descriptor on which to perform i/o
570  * @param[in] res    handle to response packet
571  *
572  * @return operation status
573  *    @retval SYNC_OK
574  *    @retval SYNC_COM_ERROR
575  */
576 afs_int32
577 SYNC_putRes(SYNC_server_state_t * state, 
578             osi_socket fd,
579             SYNC_response * res)
580 {
581     int n;
582     afs_int32 code = SYNC_OK;
583     SYNC_PROTO_BUF_DECL(buf);
584
585     if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
586         Log("SYNC_putRes:  response_len field in response header inconsistent\n");
587         code = SYNC_COM_ERROR;
588         goto done;
589     }
590
591     if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
592         Log("SYNC_putRes:  internal SYNC buffer too small; please file a bug\n");
593         code = SYNC_COM_ERROR;
594         goto done;
595     }
596
597 #ifdef AFS_DEMAND_ATTACH_FS
598     res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
599 #endif
600     res->hdr.proto_version = state->proto_version;
601     res->hdr.pkt_seq = ++state->pkt_seq;
602     res->hdr.res_seq = ++state->res_seq;
603
604     memcpy(buf, &res->hdr, sizeof(res->hdr));
605     if (res->payload.len) {
606         memcpy(buf + sizeof(res->hdr), res->payload.buf, 
607                res->hdr.response_len - sizeof(res->hdr));
608     }
609
610 #ifdef AFS_NT40_ENV
611     n = send(fd, buf, res->hdr.response_len, 0);
612 #else /* !AFS_NT40_ENV */
613     n = write(fd, buf, res->hdr.response_len);
614 #endif /* !AFS_NT40_ENV */
615
616     if (res->hdr.response_len != n) {
617         Log("SYNC_putRes: write failed\n");
618         res->hdr.response = SYNC_COM_ERROR;
619         goto done;
620     }
621
622  done:
623     return code;
624 }
625
626 /* return 0 for legal (null-terminated) string,
627  * 1 for illegal (unterminated) string */
628 int
629 SYNC_verifyProtocolString(char * buf, size_t len)
630 {
631     size_t s_len;
632
633     s_len = afs_strnlen(buf, len);
634
635     return (s_len == len) ? 1 : 0;
636 }
637
638 /**
639  * clean up old sockets.
640  *
641  * @param[in]  state  server state object
642  *
643  * @post unix domain sockets are cleaned up
644  */
645 void
646 SYNC_cleanupSock(SYNC_server_state_t * state)
647 {
648 #ifdef USE_UNIX_SOCKETS
649     remove(state->addr.sun_path);
650 #endif
651 }
652
653 /**
654  * bind socket and set it to listen state.
655  *
656  * @param[in] state  server state object
657  *
658  * @return operation status
659  *    @retval 0 success
660  *    @retval nonzero failure
661  *
662  * @post socket bound and set to listen state
663  */
664 int
665 SYNC_bindSock(SYNC_server_state_t * state)
666 {
667     int code;
668     int on = 1;
669     int numTries;
670
671     /* Reuseaddr needed because system inexplicably leaves crud lying around */
672     code =
673         setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
674                    sizeof(on));
675     if (code)
676         Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
677
678     for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
679         code = bind(state->fd, 
680                     (struct sockaddr *)&state->addr, 
681                     AFS_SOCKADDR_LEN(&state->addr));
682         if (code == 0)
683             break;
684         Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
685             errno);
686         sleep(5);
687     }
688     listen(state->fd, state->listen_depth);
689
690     return code;
691 }