Move string manipulation functions out of util
[openafs.git] / src / vol / daemon_com.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * localhost interprocess communication for servers
12  *
13  * currently handled by a localhost socket
14  * (yes, this needs to be replaced someday)
15  */
16
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25 #include <afs/opr.h>
26
27 #include <afs/afs_assert.h>
28 #include <rx/xdr.h>
29 #include <afs/afsint.h>
30 #include <afs/errors.h>
31
32 #include "nfs.h"
33 #include "daemon_com.h"
34 #include "lwp.h"
35 #include "lock.h"
36 #include <afs/afssyscalls.h>
37 #include "ihandle.h"
38 #include "vnode.h"
39 #include "volume.h"
40 #include "partition.h"
41 #include "common.h"
42 #include <rx/rx_queue.h>
43
44 #ifdef USE_UNIX_SOCKETS
45 #include <afs/afsutil.h>
46 #include <sys/un.h>
47 #endif
48
49 int (*V_BreakVolumeCallbacks) (VolumeId);
50
51 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
52                                  * move = dump+restore can run on single server */
53
54 #define MAX_BIND_TRIES  5       /* Number of times to retry socket bind */
55
56 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
57
58
59 /*
60  * On AIX, connect() and bind() require use of SUN_LEN() macro;
61  * sizeof(struct sockaddr_un) will not suffice.
62  */
63 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
64 #define AFS_SOCKADDR_LEN(sa)  SUN_LEN(sa)
65 #else
66 #define AFS_SOCKADDR_LEN(sa)  sizeof(*sa)
67 #endif
68
69
70 /* daemon com SYNC general interfaces */
71
72 /**
73  * fill in sockaddr structure.
74  *
75  * @param[in]  endpoint pointer to sync endpoint object
76  * @param[out] addr     pointer to sockaddr structure
77  *
78  * @post sockaddr structure populated using information from
79  *       endpoint structure.
80  */
81 void
82 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
83 {
84 #ifdef USE_UNIX_SOCKETS
85     char tbuffer[AFSDIR_PATH_MAX];
86 #endif /* USE_UNIX_SOCKETS */
87
88     memset(addr, 0, sizeof(*addr));
89
90 #ifdef USE_UNIX_SOCKETS
91     strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
92                endpoint->un, NULL);
93     addr->sun_family = AF_UNIX;
94     strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
95 #else  /* !USE_UNIX_SOCKETS */
96 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
97     addr->sin_len = sizeof(struct sockaddr_in);
98 #endif
99     addr->sin_addr.s_addr = htonl(0x7f000001);
100     addr->sin_family = AF_INET; /* was localhost->h_addrtype */
101     addr->sin_port = htons(endpoint->in);       /* XXXX htons not _really_ neccessary */
102 #endif /* !USE_UNIX_SOCKETS */
103 }
104
105 /**
106  * get a socket descriptor of the appropriate domain.
107  *
108  * @param[in]  endpoint pointer to sync endpoint object
109  *
110  * @return socket descriptor
111  *
112  * @post socket of domain specified in endpoint structure is created and
113  *       returned to caller.
114  */
115 osi_socket
116 SYNC_getSock(SYNC_endpoint_t * endpoint)
117 {
118     osi_socket sd;
119     osi_Assert((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
120     return sd;
121 }
122
123 /* daemon com SYNC client interface */
124
125 /**
126  * open a client connection to a sync server
127  *
128  * @param[in] state  pointer to sync client handle
129  *
130  * @return operation status
131  *    @retval 1 success
132  *
133  * @note at present, this routine aborts rather than returning an error code
134  */
135 int
136 SYNC_connect(SYNC_client_state * state)
137 {
138     SYNC_sockaddr_t addr;
139     /* I can't believe the following is needed for localhost connections!! */
140     static time_t backoff[] =
141         { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
142     time_t *timeout = &backoff[0];
143
144     if (state->fd != OSI_NULLSOCKET) {
145         return 1;
146     }
147
148     SYNC_getAddr(&state->endpoint, &addr);
149
150     for (;;) {
151         state->fd = SYNC_getSock(&state->endpoint);
152         if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
153             return 1;
154         if (!*timeout)
155             break;
156         if (!(*timeout & 1))
157             Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
158                 state->proto_name);
159         SYNC_disconnect(state);
160         sleep(*timeout++);
161     }
162     perror("SYNC_connect failed (giving up!)");
163     return 0;
164 }
165
166 /**
167  * forcibly disconnect a sync client handle.
168  *
169  * @param[in] state  pointer to sync client handle
170  *
171  * @retval operation status
172  *    @retval 0 success
173  */
174 int
175 SYNC_disconnect(SYNC_client_state * state)
176 {
177     rk_closesocket(state->fd);
178     state->fd = OSI_NULLSOCKET;
179     return 0;
180 }
181
182 /**
183  * gracefully disconnect a sync client handle.
184  *
185  * @param[in] state  pointer to sync client handle
186  *
187  * @return operation status
188  *    @retval SYNC_OK success
189  */
190 afs_int32
191 SYNC_closeChannel(SYNC_client_state * state)
192 {
193     SYNC_command com;
194     SYNC_response res;
195     SYNC_PROTO_BUF_DECL(ores);
196
197     if (state->fd == OSI_NULLSOCKET)
198         return SYNC_OK;
199
200     memset(&com, 0, sizeof(com));
201     memset(&res, 0, sizeof(res));
202
203     res.payload.len = SYNC_PROTO_MAX_LEN;
204     res.payload.buf = ores;
205
206     com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
207     com.hdr.command_len = sizeof(SYNC_command_hdr);
208     com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
209
210     /* in case the other end dropped, don't do any retries */
211     state->retry_limit = 0;
212     state->hard_timeout = 0;
213
214     SYNC_ask(state, &com, &res);
215     SYNC_disconnect(state);
216
217     return SYNC_OK;
218 }
219
220 /**
221  * forcibly break a client connection, and then create a new connection.
222  *
223  * @param[in] state  pointer to sync client handle
224  *
225  * @post old connection dropped; new connection established
226  *
227  * @return @see SYNC_connect()
228  */
229 int
230 SYNC_reconnect(SYNC_client_state * state)
231 {
232     SYNC_disconnect(state);
233     return SYNC_connect(state);
234 }
235
236 /**
237  * send a command to a sync server and wait for a response.
238  *
239  * @param[in]  state  pointer to sync client handle
240  * @param[in]  com    command object
241  * @param[out] res    response object
242  *
243  * @return operation status
244  *    @retval SYNC_OK success
245  *    @retval SYNC_COM_ERROR communications error
246  *    @retval SYNC_BAD_COMMAND server did not recognize command code
247  *
248  * @note this routine merely handles error processing; SYNC_ask_internal()
249  *       handles the low-level details of communicating with the SYNC server.
250  *
251  * @see SYNC_ask_internal
252  */
253 afs_int32
254 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
255 {
256     int tries;
257     afs_uint32 now, timeout, code=SYNC_OK;
258
259     if (state->fatal_error) {
260         return SYNC_COM_ERROR;
261     }
262
263     if (state->fd == OSI_NULLSOCKET) {
264         SYNC_connect(state);
265     }
266
267     if (state->fd == OSI_NULLSOCKET) {
268         state->fatal_error = 1;
269         return SYNC_COM_ERROR;
270     }
271
272 #ifdef AFS_DEMAND_ATTACH_FS
273     com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
274 #endif
275
276     now = FT_ApproxTime();
277     timeout = now + state->hard_timeout;
278     for (tries = 0;
279          (tries <= state->retry_limit) && (now <= timeout);
280          tries++, now = FT_ApproxTime()) {
281         code = SYNC_ask_internal(state, com, res);
282         if (code == SYNC_OK) {
283             break;
284         } else if (code == SYNC_BAD_COMMAND) {
285             Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
286                 "fileserver, volserver, salvageserver and salvager are same "
287                 "version\n", state->proto_name);
288             break;
289         } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
290             Log("SYNC_ask: protocol communications failure on circuit '%s'; "
291                 "attempting reconnect to server\n", state->proto_name);
292             SYNC_reconnect(state);
293             /* try again */
294         } else {
295             /*
296              * unknown (probably protocol-specific) response code, pass it up to
297              * the caller, and let them deal with it
298              */
299             break;
300         }
301     }
302
303     if (code == SYNC_COM_ERROR) {
304         Log("SYNC_ask: fatal protocol error on circuit '%s'; disabling sync "
305             "protocol until next server restart\n",
306             state->proto_name);
307         state->fatal_error = 1;
308     }
309
310     return code;
311 }
312
313 /**
314  * send a command to a sync server and wait for a response.
315  *
316  * @param[in]  state  pointer to sync client handle
317  * @param[in]  com    command object
318  * @param[out] res    response object
319  *
320  * @return operation status
321  *    @retval SYNC_OK success
322  *    @retval SYNC_COM_ERROR communications error
323  *
324  * @internal
325  */
326 static afs_int32
327 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
328 {
329     int n;
330     SYNC_PROTO_BUF_DECL(buf);
331 #ifndef AFS_NT40_ENV
332     int iovcnt;
333     struct iovec iov[2];
334 #endif
335
336     if (state->fd == OSI_NULLSOCKET) {
337         Log("SYNC_ask:  invalid sync file descriptor on circuit '%s'\n",
338             state->proto_name);
339         res->hdr.response = SYNC_COM_ERROR;
340         goto done;
341     }
342
343     if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
344         Log("SYNC_ask:  internal SYNC buffer too small on circuit '%s'; "
345             "please file a bug\n", state->proto_name);
346         res->hdr.response = SYNC_COM_ERROR;
347         goto done;
348     }
349
350     /*
351      * fill in some common header fields
352      */
353     com->hdr.proto_version = state->proto_version;
354     com->hdr.pkt_seq = ++state->pkt_seq;
355     com->hdr.com_seq = ++state->com_seq;
356 #ifdef AFS_NT40_ENV
357     com->hdr.pid = 0;
358     com->hdr.tid = 0;
359 #else
360     com->hdr.pid = getpid();
361 #ifdef AFS_PTHREAD_ENV
362     com->hdr.tid = afs_pointer_to_int(pthread_self());
363 #else
364     {
365         PROCESS handle = LWP_ThreadId();
366         com->hdr.tid = (handle) ? handle->index : 0;
367     }
368 #endif /* !AFS_PTHREAD_ENV */
369 #endif /* !AFS_NT40_ENV */
370
371     memcpy(buf, &com->hdr, sizeof(com->hdr));
372     if (com->payload.len) {
373         memcpy(buf + sizeof(com->hdr), com->payload.buf,
374                com->hdr.command_len - sizeof(com->hdr));
375     }
376
377 #ifdef AFS_NT40_ENV
378     n = send(state->fd, buf, com->hdr.command_len, 0);
379     if (n != com->hdr.command_len) {
380         Log("SYNC_ask:  write failed on circuit '%s'\n", state->proto_name);
381         res->hdr.response = SYNC_COM_ERROR;
382         goto done;
383     }
384
385     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
386         /* short circuit close channel requests */
387         res->hdr.response = SYNC_OK;
388         goto done;
389     }
390
391     n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
392     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
393         Log("SYNC_ask:  No response on circuit '%s'\n", state->proto_name);
394         res->hdr.response = SYNC_COM_ERROR;
395         goto done;
396     }
397 #else /* !AFS_NT40_ENV */
398     n = write(state->fd, buf, com->hdr.command_len);
399     if (com->hdr.command_len != n) {
400         Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
401         res->hdr.response = SYNC_COM_ERROR;
402         goto done;
403     }
404
405     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
406         /* short circuit close channel requests */
407         res->hdr.response = SYNC_OK;
408         goto done;
409     }
410
411     /* receive the response */
412     iov[0].iov_base = (char *)&res->hdr;
413     iov[0].iov_len = sizeof(res->hdr);
414     if (res->payload.len) {
415         iov[1].iov_base = (char *)res->payload.buf;
416         iov[1].iov_len = res->payload.len;
417         iovcnt = 2;
418     } else {
419         iovcnt = 1;
420     }
421     n = readv(state->fd, iov, iovcnt);
422     if (n == 0 || (n < 0 && errno != EINTR)) {
423         Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
424         res->hdr.response = SYNC_COM_ERROR;
425         goto done;
426     }
427 #endif /* !AFS_NT40_ENV */
428
429     res->recv_len = n;
430
431     if (n < sizeof(res->hdr)) {
432         Log("SYNC_ask:  response too short on circuit '%s'\n",
433             state->proto_name);
434         res->hdr.response = SYNC_COM_ERROR;
435         goto done;
436     }
437 #ifdef AFS_NT40_ENV
438     memcpy(&res->hdr, buf, sizeof(res->hdr));
439 #endif
440
441     if ((n - sizeof(res->hdr)) > res->payload.len) {
442         Log("SYNC_ask:  response too long on circuit '%s'\n",
443             state->proto_name);
444         res->hdr.response = SYNC_COM_ERROR;
445         goto done;
446     }
447 #ifdef AFS_NT40_ENV
448     memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
449 #endif
450
451     if (res->hdr.response_len != n) {
452         Log("SYNC_ask:  length field in response inconsistent "
453             "on circuit '%s'\n", state->proto_name);
454         res->hdr.response = SYNC_COM_ERROR;
455         goto done;
456     }
457     if (res->hdr.response == SYNC_DENIED) {
458         Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
459     }
460
461   done:
462     return res->hdr.response;
463 }
464
465
466 /*
467  * daemon com SYNC server-side interfaces
468  */
469
470 /**
471  * receive a command structure off a sync socket.
472  *
473  * @param[in]  state  pointer to server-side state object
474  * @param[in]  fd     file descriptor on which to perform i/o
475  * @param[out] com    sync command object to be populated
476  *
477  * @return operation status
478  *    @retval SYNC_OK command received
479  *    @retval SYNC_COM_ERROR there was a socket communications error
480  */
481 afs_int32
482 SYNC_getCom(SYNC_server_state_t * state,
483             osi_socket fd,
484             SYNC_command * com)
485 {
486     int n;
487     afs_int32 code = SYNC_OK;
488 #ifdef AFS_NT40_ENV
489     SYNC_PROTO_BUF_DECL(buf);
490 #else
491     struct iovec iov[2];
492     int iovcnt;
493 #endif
494
495 #ifdef AFS_NT40_ENV
496     n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
497
498     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
499         Log("SYNC_getCom:  error receiving command\n");
500         code = SYNC_COM_ERROR;
501         goto done;
502     }
503 #else /* !AFS_NT40_ENV */
504     iov[0].iov_base = (char *)&com->hdr;
505     iov[0].iov_len = sizeof(com->hdr);
506     if (com->payload.len) {
507         iov[1].iov_base = (char *)com->payload.buf;
508         iov[1].iov_len = com->payload.len;
509         iovcnt = 2;
510     } else {
511         iovcnt = 1;
512     }
513
514     n = readv(fd, iov, iovcnt);
515     if (n == 0 || (n < 0 && errno != EINTR)) {
516         Log("SYNC_getCom:  error receiving command\n");
517         code = SYNC_COM_ERROR;
518         goto done;
519     }
520 #endif /* !AFS_NT40_ENV */
521
522     com->recv_len = n;
523
524     if (n < sizeof(com->hdr)) {
525         Log("SYNC_getCom:  command too short\n");
526         code = SYNC_COM_ERROR;
527         goto done;
528     }
529 #ifdef AFS_NT40_ENV
530     memcpy(&com->hdr, buf, sizeof(com->hdr));
531 #endif
532
533     if ((n - sizeof(com->hdr)) > com->payload.len) {
534         Log("SYNC_getCom:  command too long\n");
535         code = SYNC_COM_ERROR;
536         goto done;
537     }
538 #ifdef AFS_NT40_ENV
539     memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
540 #endif
541
542  done:
543     return code;
544 }
545
546 /**
547  * write a response structure to a sync socket.
548  *
549  * @param[in] state  handle to server-side state object
550  * @param[in] fd     file descriptor on which to perform i/o
551  * @param[in] res    handle to response packet
552  *
553  * @return operation status
554  *    @retval SYNC_OK
555  *    @retval SYNC_COM_ERROR
556  */
557 afs_int32
558 SYNC_putRes(SYNC_server_state_t * state,
559             osi_socket fd,
560             SYNC_response * res)
561 {
562     int n;
563     afs_int32 code = SYNC_OK;
564     SYNC_PROTO_BUF_DECL(buf);
565
566     if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
567         Log("SYNC_putRes:  response_len field in response header inconsistent\n");
568         code = SYNC_COM_ERROR;
569         goto done;
570     }
571
572     if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
573         Log("SYNC_putRes:  internal SYNC buffer too small; please file a bug\n");
574         code = SYNC_COM_ERROR;
575         goto done;
576     }
577
578 #ifdef AFS_DEMAND_ATTACH_FS
579     res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
580 #endif
581     res->hdr.proto_version = state->proto_version;
582     res->hdr.pkt_seq = ++state->pkt_seq;
583     res->hdr.res_seq = ++state->res_seq;
584
585     memcpy(buf, &res->hdr, sizeof(res->hdr));
586     if (res->payload.len) {
587         memcpy(buf + sizeof(res->hdr), res->payload.buf,
588                res->hdr.response_len - sizeof(res->hdr));
589     }
590
591 #ifdef AFS_NT40_ENV
592     n = send(fd, buf, res->hdr.response_len, 0);
593 #else /* !AFS_NT40_ENV */
594     n = write(fd, buf, res->hdr.response_len);
595 #endif /* !AFS_NT40_ENV */
596
597     if (res->hdr.response_len != n) {
598         Log("SYNC_putRes: write failed\n");
599         res->hdr.response = SYNC_COM_ERROR;
600         goto done;
601     }
602
603  done:
604     return code;
605 }
606
607 /* return 0 for legal (null-terminated) string,
608  * 1 for illegal (unterminated) string */
609 int
610 SYNC_verifyProtocolString(char * buf, size_t len)
611 {
612     size_t s_len;
613
614     s_len = strnlen(buf, len);
615
616     return (s_len == len) ? 1 : 0;
617 }
618
619 /**
620  * clean up old sockets.
621  *
622  * @param[in]  state  server state object
623  *
624  * @post unix domain sockets are cleaned up
625  */
626 void
627 SYNC_cleanupSock(SYNC_server_state_t * state)
628 {
629 #ifdef USE_UNIX_SOCKETS
630     remove(state->addr.sun_path);
631 #endif
632 }
633
634 /**
635  * bind socket and set it to listen state.
636  *
637  * @param[in] state  server state object
638  *
639  * @return operation status
640  *    @retval 0 success
641  *    @retval nonzero failure
642  *
643  * @post socket bound and set to listen state
644  */
645 int
646 SYNC_bindSock(SYNC_server_state_t * state)
647 {
648     int code;
649     int on = 1;
650     int numTries;
651
652     /* Reuseaddr needed because system inexplicably leaves crud lying around */
653     code =
654         setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
655                    sizeof(on));
656     if (code)
657         Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
658
659     for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
660         code = bind(state->fd,
661                     (struct sockaddr *)&state->addr,
662                     AFS_SOCKADDR_LEN(&state->addr));
663         if (code == 0)
664             break;
665         Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
666             errno);
667         sleep(5);
668     }
669     listen(state->fd, state->listen_depth);
670
671     return code;
672 }