vol-osi-assert-20080401
[openafs.git] / src / vol / daemon_com.c
1 /*
2  * Copyright 2006-2008, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * localhost interprocess communication for servers
12  *
13  * currently handled by a localhost socket
14  * (yes, this needs to be replaced someday)
15  */
16
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 RCSID
25     ("$Header$");
26
27 #include <sys/types.h>
28 #include <stdio.h>
29 #ifdef AFS_NT40_ENV
30 #include <winsock2.h>
31 #include <time.h>
32 #else
33 #include <sys/param.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36 #include <netdb.h>
37 #include <sys/time.h>
38 #endif
39 #include <errno.h>
40 #include <assert.h>
41 #include <signal.h>
42 #include <string.h>
43
44
45 #include <rx/xdr.h>
46 #include <afs/afsint.h>
47 #include "nfs.h"
48 #include <afs/errors.h>
49 #include "daemon_com.h"
50 #include "lwp.h"
51 #include "lock.h"
52 #include <afs/afssyscalls.h>
53 #include "ihandle.h"
54 #include "vnode.h"
55 #include "volume.h"
56 #include "partition.h"
57 #include <rx/rx_queue.h>
58
59 #ifdef USE_UNIX_SOCKETS
60 #include <afs/afsutil.h>
61 #include <sys/un.h>
62 #endif
63
64 /*@printflike@*/ extern void Log(const char *format, ...);
65
66 int (*V_BreakVolumeCallbacks) ();
67
68 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
69                                  * move = dump+restore can run on single server */
70
71 #define MAX_BIND_TRIES  5       /* Number of times to retry socket bind */
72
73 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
74
75
76 /*
77  * On AIX, connect() and bind() require use of SUN_LEN() macro;
78  * sizeof(struct sockaddr_un) will not suffice.
79  */
80 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
81 #define AFS_SOCKADDR_LEN(sa)  SUN_LEN(sa)
82 #else
83 #define AFS_SOCKADDR_LEN(sa)  sizeof(*sa)
84 #endif
85
86
87 /* daemon com SYNC general interfaces */
88
89 /**
90  * fill in sockaddr structure.
91  *
92  * @param[in]  endpoint pointer to sync endpoint object
93  * @param[out] addr     pointer to sockaddr structure
94  *
95  * @post sockaddr structure populated using information from
96  *       endpoint structure.
97  */
98 void
99 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
100 {
101 #ifdef USE_UNIX_SOCKETS
102     char tbuffer[AFSDIR_PATH_MAX];
103 #endif /* USE_UNIX_SOCKETS */
104
105     memset(addr, 0, sizeof(*addr));
106
107 #ifdef USE_UNIX_SOCKETS
108     strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
109                endpoint->un, NULL);
110     addr->sun_family = AF_UNIX;
111     strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
112 #else  /* !USE_UNIX_SOCKETS */
113 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
114     addr->sin_len = sizeof(struct sockaddr_in);
115 #endif
116     addr->sin_addr.s_addr = htonl(0x7f000001);
117     addr->sin_family = AF_INET; /* was localhost->h_addrtype */
118     addr->sin_port = htons(endpoint->in);       /* XXXX htons not _really_ neccessary */
119 #endif /* !USE_UNIX_SOCKETS */
120 }
121
122 /**
123  * get a socket descriptor of the appropriate domain.
124  *
125  * @param[in]  endpoint pointer to sync endpoint object
126  *
127  * @return socket descriptor
128  *
129  * @post socket of domain specified in endpoint structure is created and
130  *       returned to caller.
131  */
132 int
133 SYNC_getSock(SYNC_endpoint_t * endpoint)
134 {
135     int sd;
136     assert((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
137     return sd;
138 }
139
140 /* daemon com SYNC client interface */
141
142 /**
143  * open a client connection to a sync server
144  *
145  * @param[in] state  pointer to sync client handle
146  *
147  * @return operation status
148  *    @retval 1 success
149  *
150  * @note at present, this routine aborts rather than returning an error code
151  */
152 int
153 SYNC_connect(SYNC_client_state * state)
154 {
155     SYNC_sockaddr_t addr;
156     /* I can't believe the following is needed for localhost connections!! */
157     static time_t backoff[] =
158         { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
159     time_t *timeout = &backoff[0];
160
161     if (state->fd >= 0) {
162         return 1;
163     }
164
165     SYNC_getAddr(&state->endpoint, &addr);
166
167     for (;;) {
168         state->fd = SYNC_getSock(&state->endpoint);
169         if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
170             return 1;
171         if (!*timeout)
172             break;
173         if (!(*timeout & 1))
174             Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
175                 state->proto_name);
176         SYNC_disconnect(state);
177         sleep(*timeout++);
178     }
179     perror("SYNC_connect failed (giving up!)");
180     return 0;
181 }
182
183 /**
184  * forcibly disconnect a sync client handle.
185  *
186  * @param[in] state  pointer to sync client handle
187  *
188  * @retval operation status
189  *    @retval 0 success
190  */
191 int
192 SYNC_disconnect(SYNC_client_state * state)
193 {
194 #ifdef AFS_NT40_ENV
195     closesocket(state->fd);
196 #else
197     close(state->fd);
198 #endif
199     state->fd = -1;
200     return 0;
201 }
202
203 /**
204  * gracefully disconnect a sync client handle.
205  *
206  * @param[in] state  pointer to sync client handle
207  *
208  * @return operation status
209  *    @retval SYNC_OK success
210  */
211 afs_int32
212 SYNC_closeChannel(SYNC_client_state * state)
213 {
214     afs_int32 code;
215     SYNC_command com;
216     SYNC_response res;
217     SYNC_PROTO_BUF_DECL(ores);
218
219     if (state->fd == -1)
220         return SYNC_OK;
221
222     memset(&com, 0, sizeof(com));
223     memset(&res, 0, sizeof(res));
224
225     res.payload.len = SYNC_PROTO_MAX_LEN;
226     res.payload.buf = ores;
227
228     com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
229     com.hdr.command_len = sizeof(SYNC_command_hdr);
230     com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
231
232     /* in case the other end dropped, don't do any retries */
233     state->retry_limit = 0;
234     state->hard_timeout = 0;
235
236     SYNC_ask(state, &com, &res);
237     SYNC_disconnect(state);
238
239     return SYNC_OK;
240 }
241
242 /**
243  * forcibly break a client connection, and then create a new connection.
244  *
245  * @param[in] state  pointer to sync client handle
246  *
247  * @post old connection dropped; new connection established
248  *
249  * @return @see SYNC_connect()
250  */
251 int
252 SYNC_reconnect(SYNC_client_state * state)
253 {
254     SYNC_disconnect(state);
255     return SYNC_connect(state);
256 }
257
258 /**
259  * send a command to a sync server and wait for a response.
260  *
261  * @param[in]  state  pointer to sync client handle
262  * @param[in]  com    command object
263  * @param[out] res    response object
264  *
265  * @return operation status
266  *    @retval SYNC_OK success
267  *    @retval SYNC_COM_ERROR communications error
268  *    @retval SYNC_BAD_COMMAND server did not recognize command code
269  *
270  * @note this routine merely handles error processing; SYNC_ask_internal()
271  *       handles the low-level details of communicating with the SYNC server.
272  *
273  * @see SYNC_ask_internal
274  */
275 afs_int32
276 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
277 {
278     int tries;
279     afs_uint32 now, timeout, code=SYNC_OK;
280
281     if (state->fatal_error) {
282         return SYNC_COM_ERROR;
283     }
284
285     if (state->fd == -1) {
286         SYNC_connect(state);
287     }
288
289     if (state->fd == -1) {
290         state->fatal_error = 1;
291         return SYNC_COM_ERROR;
292     }
293
294 #ifdef AFS_DEMAND_ATTACH_FS
295     com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
296 #endif
297
298     now = FT_ApproxTime();
299     timeout = now + state->hard_timeout;
300     for (tries = 0; 
301          (tries <= state->retry_limit) && (now <= timeout);
302          tries++, now = FT_ApproxTime()) {
303         code = SYNC_ask_internal(state, com, res);
304         if (code == SYNC_OK) {
305             break;
306         } else if (code == SYNC_BAD_COMMAND) {
307             Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
308                 "fileserver, volserver, salvageserver and salvager are same "
309                 "version\n", state->proto_name);
310             break;
311         } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
312             Log("SYNC_ask: protocol communications failure on circuit '%s'; "
313                 "attempting reconnect to server\n", state->proto_name);
314             SYNC_reconnect(state);
315             /* try again */
316         } else {
317             /* 
318              * unknown (probably protocol-specific) response code, pass it up to 
319              * the caller, and let them deal with it 
320              */
321             break;
322         }
323     }
324
325     if (code == SYNC_COM_ERROR) {
326         Log("SYNC_ask: fatal protocol error on circuit '%s'; disabling sync "
327             "protocol until next server restart\n", 
328             state->proto_name);
329         state->fatal_error = 1;
330     }
331
332     return code;
333 }
334
335 /**
336  * send a command to a sync server and wait for a response.
337  *
338  * @param[in]  state  pointer to sync client handle
339  * @param[in]  com    command object
340  * @param[out] res    response object
341  *
342  * @return operation status
343  *    @retval SYNC_OK success
344  *    @retval SYNC_COM_ERROR communications error
345  *
346  * @internal
347  */
348 static afs_int32
349 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
350 {
351     int n;
352     SYNC_PROTO_BUF_DECL(buf);
353 #ifndef AFS_NT40_ENV
354     int iovcnt;
355     struct iovec iov[2];
356 #endif
357
358     if (state->fd == -1) {
359         Log("SYNC_ask:  invalid sync file descriptor on circuit '%s'\n",
360             state->proto_name);
361         res->hdr.response = SYNC_COM_ERROR;
362         goto done;
363     }
364
365     if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
366         Log("SYNC_ask:  internal SYNC buffer too small on circuit '%s'; "
367             "please file a bug\n", state->proto_name);
368         res->hdr.response = SYNC_COM_ERROR;
369         goto done;
370     }
371
372     com->hdr.proto_version = state->proto_version;
373
374     memcpy(buf, &com->hdr, sizeof(com->hdr));
375     if (com->payload.len) {
376         memcpy(buf + sizeof(com->hdr), com->payload.buf, 
377                com->hdr.command_len - sizeof(com->hdr));
378     }
379
380 #ifdef AFS_NT40_ENV
381     n = send(state->fd, buf, com->hdr.command_len, 0);
382     if (n != com->hdr.command_len) {
383         Log("SYNC_ask:  write failed on circuit '%s'\n", state->proto_name);
384         res->hdr.response = SYNC_COM_ERROR;
385         goto done;
386     }
387
388     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
389         /* short circuit close channel requests */
390         res->hdr.response = SYNC_OK;
391         goto done;
392     }
393
394     n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
395     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
396         Log("SYNC_ask:  No response on circuit '%s'\n", state->proto_name);
397         res->hdr.response = SYNC_COM_ERROR;
398         goto done;
399     }
400 #else /* !AFS_NT40_ENV */
401     n = write(state->fd, buf, com->hdr.command_len);
402     if (com->hdr.command_len != n) {
403         Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
404         res->hdr.response = SYNC_COM_ERROR;
405         goto done;
406     }
407
408     if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
409         /* short circuit close channel requests */
410         res->hdr.response = SYNC_OK;
411         goto done;
412     }
413
414     /* receive the response */
415     iov[0].iov_base = (char *)&res->hdr;
416     iov[0].iov_len = sizeof(res->hdr);
417     if (res->payload.len) {
418         iov[1].iov_base = (char *)res->payload.buf;
419         iov[1].iov_len = res->payload.len;
420         iovcnt = 2;
421     } else {
422         iovcnt = 1;
423     }
424     n = readv(state->fd, iov, iovcnt);
425     if (n == 0 || (n < 0 && errno != EINTR)) {
426         Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
427         res->hdr.response = SYNC_COM_ERROR;
428         goto done;
429     }
430 #endif /* !AFS_NT40_ENV */
431
432     res->recv_len = n;
433
434     if (n < sizeof(res->hdr)) {
435         Log("SYNC_ask:  response too short on circuit '%s'\n", 
436             state->proto_name);
437         res->hdr.response = SYNC_COM_ERROR;
438         goto done;
439     }
440 #ifdef AFS_NT40_ENV
441     memcpy(&res->hdr, buf, sizeof(res->hdr));
442 #endif
443
444     if ((n - sizeof(res->hdr)) > res->payload.len) {
445         Log("SYNC_ask:  response too long on circuit '%s'\n", 
446             state->proto_name);
447         res->hdr.response = SYNC_COM_ERROR;
448         goto done;
449     }
450 #ifdef AFS_NT40_ENV
451     memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
452 #endif
453
454     if (res->hdr.response_len != n) {
455         Log("SYNC_ask:  length field in response inconsistent "
456             "on circuit '%s'\n", state->proto_name);
457         res->hdr.response = SYNC_COM_ERROR;
458         goto done;
459     }
460     if (res->hdr.response == SYNC_DENIED) {
461         Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
462     }
463
464   done:
465     return res->hdr.response;
466 }
467
468
469 /* 
470  * daemon com SYNC server-side interfaces 
471  */
472
473 /**
474  * receive a command structure off a sync socket.
475  *
476  * @param[in] fd    socket descriptor
477  * @param[out] com  sync command object to be populated
478  *
479  * @return operation status
480  *    @retval SYNC_OK command received
481  *    @retval SYNC_COM_ERROR there was a socket communications error
482  */
483 afs_int32
484 SYNC_getCom(int fd, SYNC_command * com)
485 {
486     int n;
487     afs_int32 code = SYNC_OK;
488 #ifdef AFS_NT40_ENV
489     SYNC_PROTO_BUF_DECL(buf);
490 #else
491     struct iovec iov[2];
492     int iovcnt;
493 #endif
494
495 #ifdef AFS_NT40_ENV
496     n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
497
498     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
499         Log("SYNC_getCom:  error receiving command\n");
500         code = SYNC_COM_ERROR;
501         goto done;
502     }
503 #else /* !AFS_NT40_ENV */
504     iov[0].iov_base = (char *)&com->hdr;
505     iov[0].iov_len = sizeof(com->hdr);
506     if (com->payload.len) {
507         iov[1].iov_base = (char *)com->payload.buf;
508         iov[1].iov_len = com->payload.len;
509         iovcnt = 2;
510     } else {
511         iovcnt = 1;
512     }
513
514     n = readv(fd, iov, iovcnt);
515     if (n == 0 || (n < 0 && errno != EINTR)) {
516         Log("SYNC_getCom:  error receiving command\n");
517         code = SYNC_COM_ERROR;
518         goto done;
519     }
520 #endif /* !AFS_NT40_ENV */
521
522     com->recv_len = n;
523
524     if (n < sizeof(com->hdr)) {
525         Log("SYNC_getCom:  command too short\n");
526         code = SYNC_COM_ERROR;
527         goto done;
528     }
529 #ifdef AFS_NT40_ENV
530     memcpy(&com->hdr, buf, sizeof(com->hdr));
531 #endif
532
533     if ((n - sizeof(com->hdr)) > com->payload.len) {
534         Log("SYNC_getCom:  command too long\n");
535         code = SYNC_COM_ERROR;
536         goto done;
537     }
538 #ifdef AFS_NT40_ENV
539     memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
540 #endif
541
542  done:
543     return code;
544 }
545
546 /**
547  * write a response structure to a sync socket.
548  *
549  * @param[in] fd
550  * @param[in] res
551  *
552  * @return operation status
553  *    @retval SYNC_OK
554  *    @retval SYNC_COM_ERROR
555  */
556 afs_int32
557 SYNC_putRes(int fd, SYNC_response * res)
558 {
559     int n;
560     afs_int32 code = SYNC_OK;
561     SYNC_PROTO_BUF_DECL(buf);
562
563     if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
564         Log("SYNC_putRes:  response_len field in response header inconsistent\n");
565         code = SYNC_COM_ERROR;
566         goto done;
567     }
568
569     if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
570         Log("SYNC_putRes:  internal SYNC buffer too small; please file a bug\n");
571         code = SYNC_COM_ERROR;
572         goto done;
573     }
574
575 #ifdef AFS_DEMAND_ATTACH_FS
576     res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
577 #endif
578
579     memcpy(buf, &res->hdr, sizeof(res->hdr));
580     if (res->payload.len) {
581         memcpy(buf + sizeof(res->hdr), res->payload.buf, 
582                res->hdr.response_len - sizeof(res->hdr));
583     }
584
585 #ifdef AFS_NT40_ENV
586     n = send(fd, buf, res->hdr.response_len, 0);
587 #else /* !AFS_NT40_ENV */
588     n = write(fd, buf, res->hdr.response_len);
589 #endif /* !AFS_NT40_ENV */
590
591     if (res->hdr.response_len != n) {
592         Log("SYNC_putRes: write failed\n");
593         res->hdr.response = SYNC_COM_ERROR;
594         goto done;
595     }
596
597  done:
598     return code;
599 }
600
601 /* return 0 for legal (null-terminated) string,
602  * 1 for illegal (unterminated) string */
603 int
604 SYNC_verifyProtocolString(char * buf, size_t len)
605 {
606     int ret = 0;
607     size_t s_len;
608
609     s_len = afs_strnlen(buf, len);
610
611     return (s_len == len) ? 1 : 0;
612 }
613
614 /**
615  * clean up old sockets.
616  *
617  * @param[in]  state  server state object
618  *
619  * @post unix domain sockets are cleaned up
620  */
621 void
622 SYNC_cleanupSock(SYNC_server_state_t * state)
623 {
624 #ifdef USE_UNIX_SOCKETS
625     remove(state->addr.sun_path);
626 #endif
627 }
628
629 /**
630  * bind socket and set it to listen state.
631  *
632  * @param[in] state  server state object
633  *
634  * @return operation status
635  *    @retval 0 success
636  *    @retval nonzero failure
637  *
638  * @post socket bound and set to listen state
639  */
640 int
641 SYNC_bindSock(SYNC_server_state_t * state)
642 {
643     int code;
644     int on = 1;
645     int numTries;
646
647     /* Reuseaddr needed because system inexplicably leaves crud lying around */
648     code =
649         setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
650                    sizeof(on));
651     if (code)
652         Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
653
654     for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
655         code = bind(state->fd, 
656                     (struct sockaddr *)&state->addr, 
657                     AFS_SOCKADDR_LEN(&state->addr));
658         if (code == 0)
659             break;
660         Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
661             errno);
662         sleep(5);
663     }
664     listen(state->fd, state->listen_depth);
665
666     return code;
667 }