dafs-20060317
[openafs.git] / src / vol / daemon_com.c
1 /*
2  * Copyright 2006, Sine Nomine Associates and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 /*
11  * localhost interprocess communication for servers
12  *
13  * currently handled by a localhost socket
14  * (yes, this needs to be replaced someday)
15  */
16
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 RCSID
25     ("$Header$");
26
27 #include <sys/types.h>
28 #include <stdio.h>
29 #ifdef AFS_NT40_ENV
30 #include <winsock2.h>
31 #include <time.h>
32 #else
33 #include <sys/param.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36 #include <netdb.h>
37 #include <sys/time.h>
38 #endif
39 #include <errno.h>
40 #include <assert.h>
41 #include <signal.h>
42
43 #ifdef HAVE_STRING_H
44 #include <string.h>
45 #else
46 #ifdef HAVE_STRINGS_H
47 #include <strings.h>
48 #endif
49 #endif
50
51
52 #include <rx/xdr.h>
53 #include <afs/afsint.h>
54 #include "nfs.h"
55 #include <afs/errors.h>
56 #include "daemon_com.h"
57 #include "lwp.h"
58 #include "lock.h"
59 #include <afs/afssyscalls.h>
60 #include "ihandle.h"
61 #include "vnode.h"
62 #include "volume.h"
63 #include "partition.h"
64 #include <rx/rx_queue.h>
65
66 /*@printflike@*/ extern void Log(const char *format, ...);
67
68 #ifdef osi_Assert
69 #undef osi_Assert
70 #endif
71 #define osi_Assert(e) (void)(e)
72
73 int (*V_BreakVolumeCallbacks) ();
74
75 #define MAXHANDLERS     4       /* Up to 4 clients; must be at least 2, so that
76                                  * move = dump+restore can run on single server */
77
78 #define MAX_BIND_TRIES  5       /* Number of times to retry socket bind */
79
80 static int getport(SYNC_client_state * state, struct sockaddr_in *addr);
81 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
82
83 /* daemon com SYNC client interface */
84
85 int
86 SYNC_connect(SYNC_client_state * state)
87 {
88     struct sockaddr_in addr;
89     /* I can't believe the following is needed for localhost connections!! */
90     static time_t backoff[] =
91         { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
92     time_t *timeout = &backoff[0];
93
94     if (state->fd >= 0) {
95         return 1;
96     }
97
98     for (;;) {
99         state->fd = getport(state, &addr);
100         if (connect(state->fd, (struct sockaddr *)&addr, sizeof(addr)) >= 0)
101             return 1;
102         if (!*timeout)
103             break;
104         if (!(*timeout & 1))
105             Log("SYNC_connect temporary failure (will retry)\n");
106         SYNC_disconnect(state);
107         sleep(*timeout++);
108     }
109     perror("SYNC_connect failed (giving up!)");
110     return 0;
111 }
112
113 int
114 SYNC_disconnect(SYNC_client_state * state)
115 {
116 #ifdef AFS_NT40_ENV
117     closesocket(state->fd);
118 #else
119     close(state->fd);
120 #endif
121     state->fd = -1;
122     return 0;
123 }
124
125 afs_int32
126 SYNC_closeChannel(SYNC_client_state * state)
127 {
128     afs_int32 code;
129     SYNC_command com;
130     SYNC_response res;
131     SYNC_PROTO_BUF_DECL(ores);
132
133     if (state->fd == -1)
134         return SYNC_OK;
135
136     memset(&com, 0, sizeof(com));
137     memset(&res, 0, sizeof(res));
138
139     res.payload.len = SYNC_PROTO_MAX_LEN;
140     res.payload.buf = ores;
141
142     com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
143     com.hdr.command_len = sizeof(SYNC_command_hdr);
144
145     /* in case the other end dropped, don't do any retries */
146     state->retry_limit = 0;
147     state->hard_timeout = 0;
148
149     code = SYNC_ask(state, &com, &res);
150
151     if (code == SYNC_OK) {
152         if (res.hdr.response != SYNC_OK) {
153             Log("SYNC_closeChannel:  channel shutdown request denied; closing socket anyway\n");
154         } else if (!(res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN)) {
155             Log("SYNC_closeChannel:  channel shutdown request mishandled by server\n");
156         }
157     } else {
158         Log("SYNC_closeChannel: channel communications problem");
159     }
160
161     SYNC_disconnect(state);
162
163     return code;
164 }
165
166 int
167 SYNC_reconnect(SYNC_client_state * state)
168 {
169     SYNC_disconnect(state);
170     return SYNC_connect(state);
171 }
172
173 /* private function to fill in the sockaddr struct for us */
174 static int
175 getport(SYNC_client_state * state, struct sockaddr_in *addr)
176 {
177     int sd;
178
179     memset(addr, 0, sizeof(*addr));
180     assert((sd = socket(AF_INET, SOCK_STREAM, 0)) >= 0);
181 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
182     addr->sin_len = sizeof(struct sockaddr_in);
183 #endif
184     addr->sin_addr.s_addr = htonl(0x7f000001);
185     addr->sin_family = AF_INET; /* was localhost->h_addrtype */
186     addr->sin_port = htons(state->port);        /* XXXX htons not _really_ neccessary */
187
188     return sd;
189 }
190
191 afs_int32
192 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
193 {
194     int tries;
195     afs_uint32 now, timeout, code=SYNC_OK;
196
197     if (state->fatal_error) {
198         return SYNC_COM_ERROR;
199     }
200
201     if (state->fd == -1) {
202         SYNC_connect(state);
203     }
204
205     if (state->fd == -1) {
206         state->fatal_error = 1;
207         return SYNC_COM_ERROR;
208     }
209
210 #ifdef AFS_DEMAND_ATTACH_FS
211     com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
212 #endif
213
214     now = FT_ApproxTime();
215     timeout = now + state->hard_timeout;
216     for (tries = 0; 
217          (tries <= state->retry_limit) && (now <= timeout);
218          tries++, now = FT_ApproxTime()) {
219         code = SYNC_ask_internal(state, com, res);
220         if (code == SYNC_OK) {
221             break;
222         } else if (code == SYNC_BAD_COMMAND) {
223             Log("SYNC_ask: protocol mismatch; make sure fileserver, volserver, salvageserver and salvager are same version\n");
224             break;
225         } else if (code == SYNC_COM_ERROR) {
226             Log("SYNC_ask: protocol communications failure; attempting reconnect to server\n");
227             SYNC_reconnect(state);
228             /* try again */
229         } else {
230             /* unknown (probably protocol-specific) response code, pass it up to the caller, and let them deal with it */
231             break;
232         }
233     }
234
235     if (code == SYNC_COM_ERROR) {
236         Log("SYNC_ask: fatal protocol error; disabling sync protocol to server running on port %d until next server restart\n", 
237             state->port);
238         state->fatal_error = 1;
239     }
240
241     return code;
242 }
243
244 static afs_int32
245 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
246 {
247     int n;
248     SYNC_PROTO_BUF_DECL(buf);
249 #ifndef AFS_NT40_ENV
250     int iovcnt;
251     struct iovec iov[2];
252 #endif
253
254     if (state->fd == -1) {
255         Log("SYNC_ask:  invalid sync file descriptor\n");
256         res->hdr.response = SYNC_COM_ERROR;
257         goto done;
258     }
259
260     if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
261         Log("SYNC_ask:  internal SYNC buffer too small; please file a bug\n");
262         res->hdr.response = SYNC_COM_ERROR;
263         goto done;
264     }
265
266     com->hdr.proto_version = state->proto_version;
267
268     memcpy(buf, &com->hdr, sizeof(com->hdr));
269     if (com->payload.len) {
270         memcpy(buf + sizeof(com->hdr), com->payload.buf, 
271                com->hdr.command_len - sizeof(com->hdr));
272     }
273
274 #ifdef AFS_NT40_ENV
275     n = send(state->fd, buf, com->hdr.command_len, 0);
276     if (n != com->hdr.command_len) {
277         Log("SYNC_ask:  write failed\n");
278         res->hdr.response = SYNC_COM_ERROR;
279         goto done;
280     }
281
282     n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
283     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
284         Log("SYNC_ask:  No response\n");
285         res->hdr.response = SYNC_COM_ERROR;
286         goto done;
287     }
288 #else /* !AFS_NT40_ENV */
289     n = write(state->fd, buf, com->hdr.command_len);
290     if (com->hdr.command_len != n) {
291         Log("SYNC_ask: write failed\n");
292         res->hdr.response = SYNC_COM_ERROR;
293         goto done;
294     }
295
296     /* receive the response */
297     iov[0].iov_base = (char *)&res->hdr;
298     iov[0].iov_len = sizeof(res->hdr);
299     if (res->payload.len) {
300         iov[1].iov_base = (char *)res->payload.buf;
301         iov[1].iov_len = res->payload.len;
302         iovcnt = 2;
303     } else {
304         iovcnt = 1;
305     }
306     n = readv(state->fd, iov, iovcnt);
307     if (n == 0 || (n < 0 && errno != EINTR)) {
308         Log("SYNC_ask: No response\n");
309         res->hdr.response = SYNC_COM_ERROR;
310         goto done;
311     }
312 #endif /* !AFS_NT40_ENV */
313
314     res->recv_len = n;
315
316     if (n < sizeof(res->hdr)) {
317         Log("SYNC_ask:  response too short\n");
318         res->hdr.response = SYNC_COM_ERROR;
319         goto done;
320     }
321 #ifdef AFS_NT40_ENV
322     memcpy(&res->hdr, buf, sizeof(res->hdr));
323 #endif
324
325     if ((n - sizeof(res->hdr)) > res->payload.len) {
326         Log("SYNC_ask:  response too long\n");
327         res->hdr.response = SYNC_COM_ERROR;
328         goto done;
329     }
330 #ifdef AFS_NT40_ENV
331     memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
332 #endif
333
334     if (res->hdr.response_len != n) {
335         Log("SYNC_ask:  length field in response inconsistent\n");
336         res->hdr.response = SYNC_COM_ERROR;
337         goto done;
338     }
339     if (res->hdr.response == SYNC_DENIED) {
340         Log("SYNC_ask: negative response\n");
341     }
342
343   done:
344     return res->hdr.response;
345 }
346
347
348 /* 
349  * daemon com SYNC server-side interfaces 
350  */
351
352 /* get a command */
353 afs_int32
354 SYNC_getCom(int fd, SYNC_command * com)
355 {
356     int n;
357     afs_int32 code = SYNC_OK;
358 #ifdef AFS_NT40_ENV
359     SYNC_PROTO_BUF_DECL(buf);
360 #else
361     struct iovec iov[2];
362     int iovcnt;
363 #endif
364
365 #ifdef AFS_NT40_ENV
366     n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
367
368     if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
369         Log("SYNC_getCom:  error receiving command\n");
370         code = SYNC_COM_ERROR;
371         goto done;
372     }
373 #else /* !AFS_NT40_ENV */
374     iov[0].iov_base = (char *)&com->hdr;
375     iov[0].iov_len = sizeof(com->hdr);
376     if (com->payload.len) {
377         iov[1].iov_base = (char *)com->payload.buf;
378         iov[1].iov_len = com->payload.len;
379         iovcnt = 2;
380     } else {
381         iovcnt = 1;
382     }
383
384     n = readv(fd, iov, iovcnt);
385     if (n == 0 || (n < 0 && errno != EINTR)) {
386         Log("SYNC_getCom:  error receiving command\n");
387         code = SYNC_COM_ERROR;
388         goto done;
389     }
390 #endif /* !AFS_NT40_ENV */
391
392     com->recv_len = n;
393
394     if (n < sizeof(com->hdr)) {
395         Log("SYNC_getCom:  command too short\n");
396         code = SYNC_COM_ERROR;
397         goto done;
398     }
399 #ifdef AFS_NT40_ENV
400     memcpy(&com->hdr, buf, sizeof(com->hdr));
401 #endif
402
403     if ((n - sizeof(com->hdr)) > com->payload.len) {
404         Log("SYNC_getCom:  command too long\n");
405         code = SYNC_COM_ERROR;
406         goto done;
407     }
408 #ifdef AFS_NT40_ENV
409     memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
410 #endif
411
412  done:
413     return code;
414 }
415
416 /* put a response */
417 afs_int32
418 SYNC_putRes(int fd, SYNC_response * res)
419 {
420     int n;
421     afs_int32 code = SYNC_OK;
422     SYNC_PROTO_BUF_DECL(buf);
423
424     if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
425         Log("SYNC_putRes:  response_len field in response header inconsistent\n");
426         code = SYNC_COM_ERROR;
427         goto done;
428     }
429
430     if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
431         Log("SYNC_putRes:  internal SYNC buffer too small; please file a bug\n");
432         code = SYNC_COM_ERROR;
433         goto done;
434     }
435
436 #ifdef AFS_DEMAND_ATTACH_FS
437     res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
438 #endif
439
440     memcpy(buf, &res->hdr, sizeof(res->hdr));
441     if (res->payload.len) {
442         memcpy(buf + sizeof(res->hdr), res->payload.buf, 
443                res->hdr.response_len - sizeof(res->hdr));
444     }
445
446 #ifdef AFS_NT40_ENV
447     n = send(fd, buf, res->hdr.response_len, 0);
448 #else /* !AFS_NT40_ENV */
449     n = write(fd, buf, res->hdr.response_len);
450 #endif /* !AFS_NT40_ENV */
451
452     if (res->hdr.response_len != n) {
453         Log("SYNC_putRes: write failed\n");
454         res->hdr.response = SYNC_COM_ERROR;
455         goto done;
456     }
457
458  done:
459     return code;
460 }
461
462 /* return 0 for legal (null-terminated) string,
463  * 1 for illegal (unterminated) string */
464 int
465 SYNC_verifyProtocolString(char * buf, size_t len)
466 {
467     int ret = 0;
468     size_t s_len;
469
470     s_len = afs_strnlen(buf, len);
471
472     return (s_len == len) ? 1 : 0;
473 }