ubik-dont-serve-data-while-syncing-20030927
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #endif
23 #ifdef HAVE_STRING_H
24 #include <string.h>
25 #else
26 #ifdef HAVE_STRINGS_H
27 #include <strings.h>
28 #endif
29 #endif
30 #include <lock.h>
31 #include <rx/xdr.h>
32 #include <rx/rx.h>
33
34 #define UBIK_INTERNALS
35 #include "ubik.h"
36 #include "ubik_int.h"
37 int (*ubik_CheckRXSecurityProc) ();
38 char *ubik_CheckRXSecurityRock;
39 void printServerInfo();
40
41 /* routines for handling requests remotely-submitted by the sync site.  These are
42     only write transactions (we don't propagate read trans), and there is at most one
43     write transaction extant at any one time.
44 */
45
46 struct ubik_trans *ubik_currentTrans = 0;
47
48
49 ubik_CheckAuth(acall)
50      register struct rx_call *acall;
51 {
52     register afs_int32 code;
53     if (ubik_CheckRXSecurityProc) {
54         code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
55         return code;
56     } else
57         return 0;
58 }
59
60 /* the rest of these guys handle remote execution of write
61  * transactions: this is the code executed on the other servers when a
62  * sync site is executing a write transaction.
63  */
64 afs_int32
65 SDISK_Begin(rxcall, atid)
66      register struct rx_call *rxcall;
67      struct ubik_tid *atid;
68 {
69     register afs_int32 code;
70
71     if ((code = ubik_CheckAuth(rxcall))) {
72         return code;
73     }
74     DBHOLD(ubik_dbase);
75     urecovery_CheckTid(atid);
76     if (ubik_currentTrans) {
77         /* If the thread is not waiting for lock - ok to end it */
78 #if !defined(UBIK_PAUSE)
79         if (ubik_currentTrans->locktype != LOCKWAIT) {
80 #endif /* UBIK_PAUSE */
81             udisk_end(ubik_currentTrans);
82 #if !defined(UBIK_PAUSE)
83         }
84 #endif /* UBIK_PAUSE */
85         ubik_currentTrans = (struct ubik_trans *)0;
86     }
87     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
88     if (!code && ubik_currentTrans) {
89         /* label this trans with the right trans id */
90         ubik_currentTrans->tid.epoch = atid->epoch;
91         ubik_currentTrans->tid.counter = atid->counter;
92     }
93     DBRELE(ubik_dbase);
94     return code;
95 }
96
97
98 afs_int32
99 SDISK_Commit(rxcall, atid)
100      register struct rx_call *rxcall;
101      struct ubik_tid *atid;
102 {
103     register afs_int32 code;
104     register struct ubik_dbase *dbase;
105
106     if ((code = ubik_CheckAuth(rxcall))) {
107         return code;
108     }
109
110     if (!ubik_currentTrans) {
111         return USYNC;
112     }
113     /*
114      * sanity check to make sure only write trans appear here
115      */
116     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
117         return UBADTYPE;
118     }
119
120     dbase = ubik_currentTrans->dbase;
121     DBHOLD(dbase);
122     urecovery_CheckTid(atid);
123     if (!ubik_currentTrans) {
124         DBRELE(dbase);
125         return USYNC;
126     }
127
128     code = udisk_commit(ubik_currentTrans);
129     if (code == 0) {
130         /* sync site should now match */
131         ubik_dbVersion = ubik_dbase->version;
132     }
133     DBRELE(dbase);
134     return code;
135 }
136
137 afs_int32
138 SDISK_ReleaseLocks(rxcall, atid)
139      register struct rx_call *rxcall;
140      struct ubik_tid *atid;
141 {
142     register struct ubik_dbase *dbase;
143     register afs_int32 code;
144
145     if ((code = ubik_CheckAuth(rxcall))) {
146         return code;
147     }
148
149     if (!ubik_currentTrans) {
150         return USYNC;
151     }
152     /* sanity check to make sure only write trans appear here */
153     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
154         return UBADTYPE;
155     }
156
157     dbase = ubik_currentTrans->dbase;
158     DBHOLD(dbase);
159     urecovery_CheckTid(atid);
160     if (!ubik_currentTrans) {
161         DBRELE(dbase);
162         return USYNC;
163     }
164
165     /* If the thread is not waiting for lock - ok to end it */
166 #if !defined(UBIK_PAUSE)
167     if (ubik_currentTrans->locktype != LOCKWAIT) {
168 #endif /* UBIK_PAUSE */
169         udisk_end(ubik_currentTrans);
170 #if !defined(UBIK_PAUSE)
171     }
172 #endif /* UBIK_PAUSE */
173     ubik_currentTrans = (struct ubik_trans *)0;
174     DBRELE(dbase);
175     return 0;
176 }
177
178 afs_int32
179 SDISK_Abort(rxcall, atid)
180      register struct rx_call *rxcall;
181      struct ubik_tid *atid;
182 {
183     register afs_int32 code;
184     register struct ubik_dbase *dbase;
185
186     if ((code = ubik_CheckAuth(rxcall))) {
187         return code;
188     }
189
190     if (!ubik_currentTrans) {
191         return USYNC;
192     }
193     /* sanity check to make sure only write trans appear here  */
194     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
195         return UBADTYPE;
196     }
197
198     dbase = ubik_currentTrans->dbase;
199     DBHOLD(dbase);
200     urecovery_CheckTid(atid);
201     if (!ubik_currentTrans) {
202         DBRELE(dbase);
203         return USYNC;
204     }
205
206     code = udisk_abort(ubik_currentTrans);
207     /* If the thread is not waiting for lock - ok to end it */
208 #if !defined(UBIK_PAUSE)
209     if (ubik_currentTrans->locktype != LOCKWAIT) {
210 #endif /* UBIK_PAUSE */
211         udisk_end(ubik_currentTrans);
212 #if !defined(UBIK_PAUSE)
213     }
214 #endif /* UBIK_PAUSE */
215     ubik_currentTrans = (struct ubik_trans *)0;
216     DBRELE(dbase);
217     return code;
218 }
219
220 afs_int32
221 SDISK_Lock(rxcall, atid, afile, apos, alen, atype)
222      register struct rx_call *rxcall;
223      struct ubik_tid *atid;
224      afs_int32 afile, apos, alen, atype;        /* apos and alen are not used */
225 {
226     register afs_int32 code;
227     register struct ubik_dbase *dbase;
228     struct ubik_trans *ubik_thisTrans;
229
230     if ((code = ubik_CheckAuth(rxcall))) {
231         return code;
232     }
233     if (!ubik_currentTrans) {
234         return USYNC;
235     }
236     /* sanity check to make sure only write trans appear here */
237     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
238         return UBADTYPE;
239     }
240     if (alen != 1) {
241         return UBADLOCK;
242     }
243     dbase = ubik_currentTrans->dbase;
244     DBHOLD(dbase);
245     urecovery_CheckTid(atid);
246     if (!ubik_currentTrans) {
247         DBRELE(dbase);
248         return USYNC;
249     }
250
251     ubik_thisTrans = ubik_currentTrans;
252     code = ulock_getLock(ubik_currentTrans, atype, 1);
253
254     /* While waiting, the transaction may have been ended/
255      * aborted from under us (urecovery_CheckTid). In that
256      * case, end the transaction here.
257      */
258     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
259         udisk_end(ubik_thisTrans);
260         code = USYNC;
261     }
262
263     DBRELE(dbase);
264     return code;
265 }
266
267 /* Write a vector of data */
268 afs_int32
269 SDISK_WriteV(rxcall, atid, io_vector, io_buffer)
270      register struct rx_call *rxcall;
271      struct ubik_tid *atid;
272      iovec_wrt *io_vector;
273      iovec_buf *io_buffer;
274 {
275     afs_int32 code, i, offset;
276     struct ubik_dbase *dbase;
277     struct ubik_iovec *iovec;
278     char *iobuf;
279
280     if ((code = ubik_CheckAuth(rxcall))) {
281         return code;
282     }
283     if (!ubik_currentTrans) {
284         return USYNC;
285     }
286     /* sanity check to make sure only write trans appear here */
287     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
288         return UBADTYPE;
289     }
290
291     dbase = ubik_currentTrans->dbase;
292     DBHOLD(dbase);
293     urecovery_CheckTid(atid);
294     if (!ubik_currentTrans) {
295         DBRELE(dbase);
296         return USYNC;
297     }
298
299     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
300     iobuf = (char *)io_buffer->iovec_buf_val;
301     for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
302         /* Sanity check for going off end of buffer */
303         if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
304             code = UINTERNAL;
305         } else {
306             code =
307                 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
308                             iovec[i].position, iovec[i].length);
309         }
310         if (code)
311             break;
312
313         offset += iovec[i].length;
314     }
315
316     DBRELE(dbase);
317     return code;
318 }
319
320 afs_int32
321 SDISK_Write(rxcall, atid, afile, apos, adata)
322      register struct rx_call *rxcall;
323      struct ubik_tid *atid;
324      afs_int32 afile, apos;
325      register bulkdata *adata;
326 {
327     register afs_int32 code;
328     register struct ubik_dbase *dbase;
329
330     if ((code = ubik_CheckAuth(rxcall))) {
331         return code;
332     }
333     if (!ubik_currentTrans) {
334         return USYNC;
335     }
336     /* sanity check to make sure only write trans appear here */
337     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
338         return UBADTYPE;
339     }
340
341     dbase = ubik_currentTrans->dbase;
342     DBHOLD(dbase);
343     urecovery_CheckTid(atid);
344     if (!ubik_currentTrans) {
345         DBRELE(dbase);
346         return USYNC;
347     }
348     code =
349         udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
350                     adata->bulkdata_len);
351     DBRELE(dbase);
352     return code;
353 }
354
355 afs_int32
356 SDISK_Truncate(rxcall, atid, afile, alen)
357      register struct rx_call *rxcall;
358      struct ubik_tid *atid;
359      afs_int32 afile;
360      afs_int32 alen;
361 {
362     register afs_int32 code;
363     register struct ubik_dbase *dbase;
364
365     if ((code = ubik_CheckAuth(rxcall))) {
366         return code;
367     }
368     if (!ubik_currentTrans) {
369         return USYNC;
370     }
371     /* sanity check to make sure only write trans appear here */
372     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
373         return UBADTYPE;
374     }
375
376     dbase = ubik_currentTrans->dbase;
377     DBHOLD(dbase);
378     urecovery_CheckTid(atid);
379     if (!ubik_currentTrans) {
380         DBRELE(dbase);
381         return USYNC;
382     }
383     code = udisk_truncate(ubik_currentTrans, afile, alen);
384     DBRELE(dbase);
385     return code;
386 }
387
388 afs_int32
389 SDISK_GetVersion(rxcall, aversion)
390      register struct rx_call *rxcall;
391      register struct ubik_version *aversion;
392 {
393     register afs_int32 code;
394
395     if ((code = ubik_CheckAuth(rxcall))) {
396         return code;
397     }
398
399     /*
400      * If we are the sync site, recovery shouldn't be running on any
401      * other site. We shouldn't be getting this RPC as long as we are
402      * the sync site.  To prevent any unforseen activity, we should
403      * reject this RPC until we have recognized that we are not the
404      * sync site anymore, and/or if we have any pending WRITE
405      * transactions that have to complete. This way we can be assured
406      * that this RPC would not block any pending transactions that
407      * should either fail or pass. If we have recognized the fact that
408      * we are not the sync site any more, all write transactions would
409      * fail with UNOQUORUM anyway.
410      */
411     if (ubeacon_AmSyncSite()) {
412         return UDEADLOCK;
413     }
414
415     DBHOLD(ubik_dbase);
416     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
417     DBRELE(ubik_dbase);
418     if (code) {
419         /* tell other side there's no dbase */
420         aversion->epoch = 0;
421         aversion->counter = 0;
422     }
423     return 0;
424 }
425
426 afs_int32
427 SDISK_GetFile(rxcall, file, version)
428      register struct rx_call *rxcall;
429      register afs_int32 file;
430      struct ubik_version *version;
431 {
432     register afs_int32 code;
433     register struct ubik_dbase *dbase;
434     register afs_int32 offset;
435     struct ubik_stat ubikstat;
436     char tbuffer[256];
437     afs_int32 tlen;
438     afs_int32 length;
439
440     if ((code = ubik_CheckAuth(rxcall))) {
441         return code;
442     }
443 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
444  * a sanity check, anyway. 
445     if (ubeacon_AmSyncSite()) {
446       return UDEADLOCK;
447     }
448 */
449     dbase = ubik_dbase;
450     DBHOLD(dbase);
451     code = (*dbase->stat) (dbase, file, &ubikstat);
452     if (code < 0) {
453         DBRELE(dbase);
454         return code;
455     }
456     length = ubikstat.size;
457     tlen = htonl(length);
458     code = rx_Write(rxcall, &tlen, sizeof(afs_int32));
459     if (code != sizeof(afs_int32)) {
460         DBRELE(dbase);
461         return BULK_ERROR;
462     }
463     offset = 0;
464     while (length > 0) {
465         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
466         code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
467         if (code != tlen) {
468             DBRELE(dbase);
469             return UIOERROR;
470         }
471         code = rx_Write(rxcall, tbuffer, tlen);
472         if (code != tlen) {
473             DBRELE(dbase);
474             return BULK_ERROR;
475         }
476         length -= tlen;
477         offset += tlen;
478     }
479     code = (*dbase->getlabel) (dbase, file, version);   /* return the dbase, too */
480     DBRELE(dbase);
481     return code;
482 }
483
484 afs_int32
485 SDISK_SendFile(rxcall, file, length, avers)
486      register struct rx_call *rxcall;
487      afs_int32 file;
488      afs_int32 length;
489      struct ubik_version *avers;
490 {
491     register afs_int32 code;
492     register struct ubik_dbase *dbase;
493     char tbuffer[256];
494     afs_int32 offset;
495     struct ubik_version tversion;
496     register int tlen;
497     struct rx_peer *tpeer;
498     struct rx_connection *tconn;
499     afs_uint32 otherHost;
500
501     /* send the file back to the requester */
502
503     if ((code = ubik_CheckAuth(rxcall))) {
504         goto failed;
505     }
506
507     /* next, we do a sanity check to see if the guy sending us the database is
508      * the guy we think is the sync site.  It turns out that we might not have
509      * decided yet that someone's the sync site, but they could have enough
510      * votes from others to be sync site anyway, and could send us the database
511      * in advance of getting our votes.  This is fine, what we're really trying
512      * to check is that some authenticated bogon isn't sending a random database
513      * into another configuration.  This could happen on a bad configuration
514      * screwup.  Thus, we only object if we're sure we know who the sync site
515      * is, and it ain't the guy talking to us.
516      */
517     offset = uvote_GetSyncSite();
518     tconn = rx_ConnectionOf(rxcall);
519     tpeer = rx_PeerOf(tconn);
520     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
521     if (offset && offset != otherHost) {
522         /* we *know* this is the wrong guy */
523         code = USYNC;
524         goto failed;
525     }
526
527     dbase = ubik_dbase;
528     DBHOLD(dbase);
529
530     /* abort any active trans that may scribble over the database */
531     urecovery_AbortAll(dbase);
532
533     ubik_print("Ubik: Synchronize database with server %s\n",
534                afs_inet_ntoa(otherHost));
535
536     offset = 0;
537     (*dbase->truncate) (dbase, file, 0);        /* truncate first */
538     tversion.epoch = 0;         /* start off by labelling in-transit db as invalid */
539     tversion.counter = 0;
540     (*dbase->setlabel) (dbase, file, &tversion);/* setlabel does sync */
541     memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
542     while (length > 0) {
543         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
544         code = rx_Read(rxcall, tbuffer, tlen);
545         if (code != tlen) {
546             DBRELE(dbase);
547             code = BULK_ERROR;
548             goto failed;
549         }
550         code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
551         if (code != tlen) {
552             DBRELE(dbase);
553             code = UIOERROR;
554             goto failed;
555         }
556         offset += tlen;
557         length -= tlen;
558     }
559
560     /* sync data first, then write label and resync (resync done by setlabel call).
561      * This way, good label is only on good database. */
562     (*ubik_dbase->sync) (dbase, file);
563     code = (*ubik_dbase->setlabel) (dbase, file, avers);
564     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
565     udisk_Invalidate(dbase, file);      /* new dbase, flush disk buffers */
566     LWP_NoYieldSignal(&dbase->version);
567     DBRELE(dbase);
568   failed:
569     if (code) {
570         ubik_print
571             ("Ubik: Synchronize database with server %s failed (error = %d)\n",
572              afs_inet_ntoa(otherHost), code);
573     } else {
574         ubik_print("Ubik: Synchronize database completed\n");
575     }
576     return code;
577 }
578
579
580 afs_int32
581 SDISK_Probe(rxcall)
582      register struct rx_call *rxcall;
583 {
584     return 0;
585 }
586
587 /*
588 * Update remote machines addresses in my server list
589 * Send back my addresses to caller of this RPC
590 * Returns zero on success, else 1.
591 */
592 afs_int32
593 SDISK_UpdateInterfaceAddr(rxcall, inAddr, outAddr)
594      register struct rx_call *rxcall;
595      UbikInterfaceAddr *inAddr, *outAddr;
596 {
597     struct ubik_server *ts, *tmp;
598     afs_uint32 remoteAddr;      /* in net byte order */
599     int i, j, found = 0, probableMatch = 0;
600
601     /* copy the output parameters */
602     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
603         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
604
605     remoteAddr = htonl(inAddr->hostAddr[0]);
606     for (ts = ubik_servers; ts; ts = ts->next)
607         if (ts->addr[0] == remoteAddr) {        /* both in net byte order */
608             probableMatch = 1;
609             break;
610         }
611
612     if (probableMatch) {
613         /* verify that all addresses in the incoming RPC are
614          ** not part of other server entries in my CellServDB
615          */
616         for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
617              && inAddr->hostAddr[i]; i++) {
618             remoteAddr = htonl(inAddr->hostAddr[i]);
619             for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
620                 if (ts == tmp)  /* this is my server */
621                     continue;
622                 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
623                      j++)
624                     if (remoteAddr == tmp->addr[j]) {
625                         found = 1;
626                         break;
627                     }
628             }
629         }
630     }
631
632     /* if (probableMatch) */
633     /* inconsistent addresses in CellServDB */
634     if (!probableMatch || found) {
635         ubik_print("Inconsistent Cell Info from server: ");
636         for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
637             ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
638         ubik_print("\n");
639         fflush(stdout);
640         fflush(stderr);
641         printServerInfo();
642         return UBADHOST;
643     }
644
645     /* update our data structures */
646     for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
647         ts->addr[i] = htonl(inAddr->hostAddr[i]);
648
649     ubik_print("ubik: A Remote Server has addresses: ");
650     for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
651         ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
652     ubik_print("\n");
653
654     return 0;
655 }
656
657 void
658 printServerInfo()
659 {
660     struct ubik_server *ts;
661     int i, j = 1;
662
663     ubik_print("Local CellServDB:");
664     for (ts = ubik_servers; ts; ts = ts->next, j++) {
665         ubik_print("Server %d: ", j);
666         for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
667             ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
668     }
669     ubik_print("\n");
670 }
671
672 afs_int32
673 SDISK_SetVersion(rxcall, atid, oldversionp, newversionp)
674      struct rx_call *rxcall;
675      struct ubik_tid *atid;
676      struct ubik_version *oldversionp;
677      struct ubik_version *newversionp;
678 {
679     afs_int32 code = 0;
680     struct ubik_dbase *dbase;
681
682     if ((code = ubik_CheckAuth(rxcall))) {
683         return (code);
684     }
685
686     if (!ubik_currentTrans) {
687         return USYNC;
688     }
689     /* sanity check to make sure only write trans appear here */
690     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
691         return UBADTYPE;
692     }
693
694     /* Should not get this for the sync site */
695     if (ubeacon_AmSyncSite()) {
696         return UDEADLOCK;
697     }
698
699     dbase = ubik_currentTrans->dbase;
700     DBHOLD(dbase);
701     urecovery_CheckTid(atid);
702     if (!ubik_currentTrans) {
703         DBRELE(dbase);
704         return USYNC;
705     }
706
707     /* Set the label if its version matches the sync-site's */
708     if ((oldversionp->epoch == ubik_dbVersion.epoch)
709         && (oldversionp->counter == ubik_dbVersion.counter)) {
710         code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
711         if (!code) {
712             ubik_dbase->version = *newversionp;
713             ubik_dbVersion = *newversionp;
714         }
715     } else {
716         code = USYNC;
717     }
718
719     DBRELE(dbase);
720     return code;
721 }