openafs-string-header-cleanup-20071030
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #endif
23 #include <string.h>
24 #include <lock.h>
25 #include <rx/xdr.h>
26 #include <rx/rx.h>
27 #include <afs/afsutil.h>
28
29 #define UBIK_INTERNALS
30 #include "ubik.h"
31 #include "ubik_int.h"
32 int (*ubik_CheckRXSecurityProc) ();
33 char *ubik_CheckRXSecurityRock;
34 void printServerInfo();
35
36 /* routines for handling requests remotely-submitted by the sync site.  These are
37     only write transactions (we don't propagate read trans), and there is at most one
38     write transaction extant at any one time.
39 */
40
41 struct ubik_trans *ubik_currentTrans = 0;
42
43
44 ubik_CheckAuth(acall)
45      register struct rx_call *acall;
46 {
47     register afs_int32 code;
48     if (ubik_CheckRXSecurityProc) {
49         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
50         return code;
51     } else
52         return 0;
53 }
54
55 /* the rest of these guys handle remote execution of write
56  * transactions: this is the code executed on the other servers when a
57  * sync site is executing a write transaction.
58  */
59 afs_int32
60 SDISK_Begin(rxcall, atid)
61      register struct rx_call *rxcall;
62      struct ubik_tid *atid;
63 {
64     register afs_int32 code;
65
66     if ((code = ubik_CheckAuth(rxcall))) {
67         return code;
68     }
69     DBHOLD(ubik_dbase);
70     urecovery_CheckTid(atid);
71     if (ubik_currentTrans) {
72         /* If the thread is not waiting for lock - ok to end it */
73 #if !defined(UBIK_PAUSE)
74         if (ubik_currentTrans->locktype != LOCKWAIT) {
75 #endif /* UBIK_PAUSE */
76             udisk_end(ubik_currentTrans);
77 #if !defined(UBIK_PAUSE)
78         }
79 #endif /* UBIK_PAUSE */
80         ubik_currentTrans = (struct ubik_trans *)0;
81     }
82     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
83     if (!code && ubik_currentTrans) {
84         /* label this trans with the right trans id */
85         ubik_currentTrans->tid.epoch = atid->epoch;
86         ubik_currentTrans->tid.counter = atid->counter;
87     }
88     DBRELE(ubik_dbase);
89     return code;
90 }
91
92
93 afs_int32
94 SDISK_Commit(rxcall, atid)
95      register struct rx_call *rxcall;
96      struct ubik_tid *atid;
97 {
98     register afs_int32 code;
99     register struct ubik_dbase *dbase;
100
101     if ((code = ubik_CheckAuth(rxcall))) {
102         return code;
103     }
104
105     if (!ubik_currentTrans) {
106         return USYNC;
107     }
108     /*
109      * sanity check to make sure only write trans appear here
110      */
111     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
112         return UBADTYPE;
113     }
114
115     dbase = ubik_currentTrans->dbase;
116     DBHOLD(dbase);
117     urecovery_CheckTid(atid);
118     if (!ubik_currentTrans) {
119         DBRELE(dbase);
120         return USYNC;
121     }
122
123     code = udisk_commit(ubik_currentTrans);
124     if (code == 0) {
125         /* sync site should now match */
126         ubik_dbVersion = ubik_dbase->version;
127     }
128     DBRELE(dbase);
129     return code;
130 }
131
132 afs_int32
133 SDISK_ReleaseLocks(rxcall, atid)
134      register struct rx_call *rxcall;
135      struct ubik_tid *atid;
136 {
137     register struct ubik_dbase *dbase;
138     register afs_int32 code;
139
140     if ((code = ubik_CheckAuth(rxcall))) {
141         return code;
142     }
143
144     if (!ubik_currentTrans) {
145         return USYNC;
146     }
147     /* sanity check to make sure only write trans appear here */
148     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
149         return UBADTYPE;
150     }
151
152     dbase = ubik_currentTrans->dbase;
153     DBHOLD(dbase);
154     urecovery_CheckTid(atid);
155     if (!ubik_currentTrans) {
156         DBRELE(dbase);
157         return USYNC;
158     }
159
160     /* If the thread is not waiting for lock - ok to end it */
161 #if !defined(UBIK_PAUSE)
162     if (ubik_currentTrans->locktype != LOCKWAIT) {
163 #endif /* UBIK_PAUSE */
164         udisk_end(ubik_currentTrans);
165 #if !defined(UBIK_PAUSE)
166     }
167 #endif /* UBIK_PAUSE */
168     ubik_currentTrans = (struct ubik_trans *)0;
169     DBRELE(dbase);
170     return 0;
171 }
172
173 afs_int32
174 SDISK_Abort(rxcall, atid)
175      register struct rx_call *rxcall;
176      struct ubik_tid *atid;
177 {
178     register afs_int32 code;
179     register struct ubik_dbase *dbase;
180
181     if ((code = ubik_CheckAuth(rxcall))) {
182         return code;
183     }
184
185     if (!ubik_currentTrans) {
186         return USYNC;
187     }
188     /* sanity check to make sure only write trans appear here  */
189     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
190         return UBADTYPE;
191     }
192
193     dbase = ubik_currentTrans->dbase;
194     DBHOLD(dbase);
195     urecovery_CheckTid(atid);
196     if (!ubik_currentTrans) {
197         DBRELE(dbase);
198         return USYNC;
199     }
200
201     code = udisk_abort(ubik_currentTrans);
202     /* If the thread is not waiting for lock - ok to end it */
203 #if !defined(UBIK_PAUSE)
204     if (ubik_currentTrans->locktype != LOCKWAIT) {
205 #endif /* UBIK_PAUSE */
206         udisk_end(ubik_currentTrans);
207 #if !defined(UBIK_PAUSE)
208     }
209 #endif /* UBIK_PAUSE */
210     ubik_currentTrans = (struct ubik_trans *)0;
211     DBRELE(dbase);
212     return code;
213 }
214
215 afs_int32
216 SDISK_Lock(rxcall, atid, afile, apos, alen, atype)
217      register struct rx_call *rxcall;
218      struct ubik_tid *atid;
219      afs_int32 afile, apos, alen, atype;        /* apos and alen are not used */
220 {
221     register afs_int32 code;
222     register struct ubik_dbase *dbase;
223     struct ubik_trans *ubik_thisTrans;
224
225     if ((code = ubik_CheckAuth(rxcall))) {
226         return code;
227     }
228     if (!ubik_currentTrans) {
229         return USYNC;
230     }
231     /* sanity check to make sure only write trans appear here */
232     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
233         return UBADTYPE;
234     }
235     if (alen != 1) {
236         return UBADLOCK;
237     }
238     dbase = ubik_currentTrans->dbase;
239     DBHOLD(dbase);
240     urecovery_CheckTid(atid);
241     if (!ubik_currentTrans) {
242         DBRELE(dbase);
243         return USYNC;
244     }
245
246     ubik_thisTrans = ubik_currentTrans;
247     code = ulock_getLock(ubik_currentTrans, atype, 1);
248
249     /* While waiting, the transaction may have been ended/
250      * aborted from under us (urecovery_CheckTid). In that
251      * case, end the transaction here.
252      */
253     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
254         udisk_end(ubik_thisTrans);
255         code = USYNC;
256     }
257
258     DBRELE(dbase);
259     return code;
260 }
261
262 /* Write a vector of data */
263 afs_int32
264 SDISK_WriteV(rxcall, atid, io_vector, io_buffer)
265      register struct rx_call *rxcall;
266      struct ubik_tid *atid;
267      iovec_wrt *io_vector;
268      iovec_buf *io_buffer;
269 {
270     afs_int32 code, i, offset;
271     struct ubik_dbase *dbase;
272     struct ubik_iovec *iovec;
273     char *iobuf;
274
275     if ((code = ubik_CheckAuth(rxcall))) {
276         return code;
277     }
278     if (!ubik_currentTrans) {
279         return USYNC;
280     }
281     /* sanity check to make sure only write trans appear here */
282     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
283         return UBADTYPE;
284     }
285
286     dbase = ubik_currentTrans->dbase;
287     DBHOLD(dbase);
288     urecovery_CheckTid(atid);
289     if (!ubik_currentTrans) {
290         DBRELE(dbase);
291         return USYNC;
292     }
293
294     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
295     iobuf = (char *)io_buffer->iovec_buf_val;
296     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
297         /* Sanity check for going off end of buffer */
298         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
299             code = UINTERNAL;
300         } else {
301             code =
302                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
303                             iovec[i].position, iovec[i].length);
304         }
305         if (code)
306             break;
307
308         offset += iovec[i].length;
309     }
310
311     DBRELE(dbase);
312     return code;
313 }
314
315 afs_int32
316 SDISK_Write(rxcall, atid, afile, apos, adata)
317      register struct rx_call *rxcall;
318      struct ubik_tid *atid;
319      afs_int32 afile, apos;
320      register bulkdata *adata;
321 {
322     register afs_int32 code;
323     register struct ubik_dbase *dbase;
324
325     if ((code = ubik_CheckAuth(rxcall))) {
326         return code;
327     }
328     if (!ubik_currentTrans) {
329         return USYNC;
330     }
331     /* sanity check to make sure only write trans appear here */
332     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
333         return UBADTYPE;
334     }
335
336     dbase = ubik_currentTrans->dbase;
337     DBHOLD(dbase);
338     urecovery_CheckTid(atid);
339     if (!ubik_currentTrans) {
340         DBRELE(dbase);
341         return USYNC;
342     }
343     code =
344         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
345                     adata->bulkdata_len);
346     DBRELE(dbase);
347     return code;
348 }
349
350 afs_int32
351 SDISK_Truncate(rxcall, atid, afile, alen)
352      register struct rx_call *rxcall;
353      struct ubik_tid *atid;
354      afs_int32 afile;
355      afs_int32 alen;
356 {
357     register afs_int32 code;
358     register struct ubik_dbase *dbase;
359
360     if ((code = ubik_CheckAuth(rxcall))) {
361         return code;
362     }
363     if (!ubik_currentTrans) {
364         return USYNC;
365     }
366     /* sanity check to make sure only write trans appear here */
367     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
368         return UBADTYPE;
369     }
370
371     dbase = ubik_currentTrans->dbase;
372     DBHOLD(dbase);
373     urecovery_CheckTid(atid);
374     if (!ubik_currentTrans) {
375         DBRELE(dbase);
376         return USYNC;
377     }
378     code = udisk_truncate(ubik_currentTrans, afile, alen);
379     DBRELE(dbase);
380     return code;
381 }
382
383 afs_int32
384 SDISK_GetVersion(rxcall, aversion)
385      register struct rx_call *rxcall;
386      register struct ubik_version *aversion;
387 {
388     register afs_int32 code;
389
390     if ((code = ubik_CheckAuth(rxcall))) {
391         return code;
392     }
393
394     /*
395      * If we are the sync site, recovery shouldn't be running on any
396      * other site. We shouldn't be getting this RPC as long as we are
397      * the sync site.  To prevent any unforseen activity, we should
398      * reject this RPC until we have recognized that we are not the
399      * sync site anymore, and/or if we have any pending WRITE
400      * transactions that have to complete. This way we can be assured
401      * that this RPC would not block any pending transactions that
402      * should either fail or pass. If we have recognized the fact that
403      * we are not the sync site any more, all write transactions would
404      * fail with UNOQUORUM anyway.
405      */
406     if (ubeacon_AmSyncSite()) {
407         return UDEADLOCK;
408     }
409
410     DBHOLD(ubik_dbase);
411     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
412     DBRELE(ubik_dbase);
413     if (code) {
414         /* tell other side there's no dbase */
415         aversion->epoch = 0;
416         aversion->counter = 0;
417     }
418     return 0;
419 }
420
421 afs_int32
422 SDISK_GetFile(rxcall, file, version)
423      register struct rx_call *rxcall;
424      register afs_int32 file;
425      struct ubik_version *version;
426 {
427     register afs_int32 code;
428     register struct ubik_dbase *dbase;
429     register afs_int32 offset;
430     struct ubik_stat ubikstat;
431     char tbuffer[256];
432     afs_int32 tlen;
433     afs_int32 length;
434
435     if ((code = ubik_CheckAuth(rxcall))) {
436         return code;
437     }
438 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
439  * a sanity check, anyway. 
440     if (ubeacon_AmSyncSite()) {
441       return UDEADLOCK;
442     }
443 */
444     dbase = ubik_dbase;
445     DBHOLD(dbase);
446     code = (*dbase->stat) (dbase, file, &ubikstat);
447     if (code < 0) {
448         DBRELE(dbase);
449         return code;
450     }
451     length = ubikstat.size;
452     tlen = htonl(length);
453     code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
454     if (code != sizeof(afs_int32)) {
455         DBRELE(dbase);
456         ubik_dprint("Rx-write length error=%d\n", code);
457         return BULK_ERROR;
458     }
459     offset = 0;
460     while (length > 0) {
461         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
462         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
463         if (code != tlen) {
464             DBRELE(dbase);
465             ubik_dprint("read failed error=%d\n", code);
466             return UIOERROR;
467         }
468         code = rx_Write(rxcall, tbuffer, tlen);
469         if (code != tlen) {
470             DBRELE(dbase);
471             ubik_dprint("Rx-write length error=%d\n", code);
472             return BULK_ERROR;
473         }
474         length -= tlen;
475         offset += tlen;
476     }
477     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
478     DBRELE(dbase);
479     return code;
480 }
481
482 afs_int32
483 SDISK_SendFile(rxcall, file, length, avers)
484      register struct rx_call *rxcall;
485      afs_int32 file;
486      afs_int32 length;
487      struct ubik_version *avers;
488 {
489     register afs_int32 code;
490     register struct ubik_dbase *dbase;
491     char tbuffer[256];
492     afs_int32 offset;
493     struct ubik_version tversion;
494     register int tlen;
495     struct rx_peer *tpeer;
496     struct rx_connection *tconn;
497     afs_uint32 otherHost;
498
499     /* send the file back to the requester */
500
501     if ((code = ubik_CheckAuth(rxcall))) {
502         goto failed;
503     }
504
505     /* next, we do a sanity check to see if the guy sending us the database is
506      * the guy we think is the sync site.  It turns out that we might not have
507      * decided yet that someone's the sync site, but they could have enough
508      * votes from others to be sync site anyway, and could send us the database
509      * in advance of getting our votes.  This is fine, what we're really trying
510      * to check is that some authenticated bogon isn't sending a random database
511      * into another configuration.  This could happen on a bad configuration
512      * screwup.  Thus, we only object if we're sure we know who the sync site
513      * is, and it ain't the guy talking to us.
514      */
515     offset = uvote_GetSyncSite();
516     tconn = rx_ConnectionOf(rxcall);
517     tpeer = rx_PeerOf(tconn);
518     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
519     if (offset && offset != otherHost) {
520         /* we *know* this is the wrong guy */
521         code = USYNC;
522         goto failed;
523     }
524
525     dbase = ubik_dbase;
526     DBHOLD(dbase);
527
528     /* abort any active trans that may scribble over the database */
529     urecovery_AbortAll(dbase);
530
531     ubik_print("Ubik: Synchronize database with server %s\n",
532                afs_inet_ntoa(otherHost));
533
534     offset = 0;
535     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
536     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
537     tversion.counter = 0;
538     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
539     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
540     while (length > 0) {
541         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
542         code = rx_Read(rxcall, tbuffer, tlen);
543         if (code != tlen) {
544             DBRELE(dbase);
545             ubik_dprint("Rx-read length error=%d\n", code);
546             code = BULK_ERROR;
547             goto failed;
548         }
549         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
550         if (code != tlen) {
551             DBRELE(dbase);
552             ubik_dprint("write failed error=%d\n", code);
553             code = UIOERROR;
554             goto failed;
555         }
556         offset += tlen;
557         length -= tlen;
558     }
559
560     /* sync data first, then write label and resync (resync done by setlabel call).
561      * This way, good label is only on good database. */
562     (*ubik_dbase->sync) (dbase, file);
563     code = (*ubik_dbase->setlabel) (dbase, file, avers);
564     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
565     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
566     LWP_NoYieldSignal(&dbase->version);
567     DBRELE(dbase);
568   failed:
569     if (code) {
570         ubik_print
571             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
572              afs_inet_ntoa(otherHost), code);
573     } else {
574         ubik_print("Ubik: Synchronize database completed\n");
575     }
576     return code;
577 }
578
579
580 afs_int32
581 SDISK_Probe(rxcall)
582      register struct rx_call *rxcall;
583 {
584     return 0;
585 }
586
587 /*
588 * Update remote machines addresses in my server list
589 * Send back my addresses to caller of this RPC
590 * Returns zero on success, else 1.
591 */
592 afs_int32
593 SDISK_UpdateInterfaceAddr(rxcall, inAddr, outAddr)
594      register struct rx_call *rxcall;
595      UbikInterfaceAddr *inAddr, *outAddr;
596 {
597     struct ubik_server *ts, *tmp;
598     afs_uint32 remoteAddr;      /* in net byte order */
599     int i, j, found = 0, probableMatch = 0;
600
601     /* copy the output parameters */
602     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
603         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
604
605     remoteAddr = htonl(inAddr->hostAddr[0]);
606     for (ts = ubik_servers; ts; ts = ts->next)
607         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
608             probableMatch = 1;
609             break;
610         }
611
612     if (probableMatch) {
613         /* verify that all addresses in the incoming RPC are
614          ** not part of other server entries in my CellServDB
615          */
616         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
617              && inAddr->hostAddr[i]; i++) {
618             remoteAddr = htonl(inAddr->hostAddr[i]);
619             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
620                 if (ts == tmp)  /* this is my server */
621                     continue;
622                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
623                      j++)
624                     if (remoteAddr == tmp->addr[j]) {
625                         found = 1;
626                         break;
627                     }
628             }
629         }
630     }
631
632     /* if (probableMatch) */
633     /* inconsistent addresses in CellServDB */
634     if (!probableMatch || found) {
635         ubik_print("Inconsistent Cell Info from server: ");
636         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
637             ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
638         ubik_print("\n");
639         fflush(stdout);
640         fflush(stderr);
641         printServerInfo();
642         return UBADHOST;
643     }
644
645     /* update our data structures */
646     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
647         ts->addr[i] = htonl(inAddr->hostAddr[i]);
648
649     ubik_print("ubik: A Remote Server has addresses: ");
650     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
651         ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
652     ubik_print("\n");
653
654     return 0;
655 }
656
657 void
658 printServerInfo()
659 {
660     struct ubik_server *ts;
661     int i, j = 1;
662
663     ubik_print("Local CellServDB:");
664     for (ts = ubik_servers; ts; ts = ts->next, j++) {
665         ubik_print("Server %d: ", j);
666         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
667             ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
668     }
669     ubik_print("\n");
670 }
671
672 afs_int32
673 SDISK_SetVersion(rxcall, atid, oldversionp, newversionp)
674      struct rx_call *rxcall;
675      struct ubik_tid *atid;
676      struct ubik_version *oldversionp;
677      struct ubik_version *newversionp;
678 {
679     afs_int32 code = 0;
680     struct ubik_dbase *dbase;
681
682     if ((code = ubik_CheckAuth(rxcall))) {
683         return (code);
684     }
685
686     if (!ubik_currentTrans) {
687         return USYNC;
688     }
689     /* sanity check to make sure only write trans appear here */
690     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
691         return UBADTYPE;
692     }
693
694     /* Should not get this for the sync site */
695     if (ubeacon_AmSyncSite()) {
696         return UDEADLOCK;
697     }
698
699     dbase = ubik_currentTrans->dbase;
700     DBHOLD(dbase);
701     urecovery_CheckTid(atid);
702     if (!ubik_currentTrans) {
703         DBRELE(dbase);
704         return USYNC;
705     }
706
707     /* Set the label if its version matches the sync-site's */
708     if ((oldversionp->epoch == ubik_dbVersion.epoch)
709         && (oldversionp->counter == ubik_dbVersion.counter)) {
710         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
711         if (!code) {
712             ubik_dbase->version = *newversionp;
713             ubik_dbVersion = *newversionp;
714         }
715     } else {
716         code = USYNC;
717     }
718
719     DBRELE(dbase);
720     return code;
721 }