2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <sys/types.h>
22 #include <netinet/in.h>
29 #include <afs/afsutil.h>
31 #define UBIK_INTERNALS
34 int (*ubik_CheckRXSecurityProc) ();
35 char *ubik_CheckRXSecurityRock;
36 void printServerInfo();
38 /* Routines for handling requests remotely submitted by the sync site. These are
39 only write transactions (we don't propagate read transactions), and there is at most one
40 write transaction extant at any one time.
43 struct ubik_trans *ubik_currentTrans = 0;
47 register struct rx_call *acall;
49 register afs_int32 code;
50 if (ubik_CheckRXSecurityProc) {
51 code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
57 /* the rest of these guys handle remote execution of write
58 * transactions: this is the code executed on the other servers when a
59 * sync site is executing a write transaction.
/*
 * SDISK_Begin - open a write transaction on this server for a remote caller.
 *
 * rxcall  the incoming Rx call; passed to ubik_CheckAuth before anything else.
 * atid    transaction id supplied by the caller; handed to
 *         urecovery_CheckTid and then copied into the new transaction.
 *
 * If ubik_currentTrans is already set, it is passed to udisk_end and
 * ubik_currentTrans is reset to null before udisk_begin is asked for a new
 * UBIK_WRITETRANS transaction on ubik_dbase.  Unless UBIK_PAUSE is defined,
 * udisk_end is only called when the old transaction's locktype is not
 * LOCKWAIT.  When udisk_begin reports success and produced a transaction,
 * its tid is overwritten with the epoch and counter from atid.
 */
62 SDISK_Begin(rxcall, atid)
63 register struct rx_call *rxcall;
64 struct ubik_tid *atid;
66 register afs_int32 code;
68 if ((code = ubik_CheckAuth(rxcall))) {
72 urecovery_CheckTid(atid);
73 if (ubik_currentTrans) {
74 /* If the thread is not waiting for lock - ok to end it */
75 #if !defined(UBIK_PAUSE)
76 if (ubik_currentTrans->locktype != LOCKWAIT) {
77 #endif /* UBIK_PAUSE */
78 udisk_end(ubik_currentTrans);
79 #if !defined(UBIK_PAUSE)
81 #endif /* UBIK_PAUSE */
82 ubik_currentTrans = (struct ubik_trans *)0;
84 code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
85 if (!code && ubik_currentTrans) {
86 /* label this trans with the right trans id */
87 ubik_currentTrans->tid.epoch = atid->epoch;
88 ubik_currentTrans->tid.counter = atid->counter;
/*
 * SDISK_Commit - commit the write transaction held in ubik_currentTrans.
 *
 * rxcall  the incoming Rx call; checked with ubik_CheckAuth.
 * atid    transaction id supplied by the caller; passed to
 *         urecovery_CheckTid.
 *
 * Before committing, the function tests that ubik_currentTrans is set and
 * that its type is UBIK_WRITETRANS.  ubik_currentTrans is tested a second
 * time after urecovery_CheckTid (presumably because that call can clear
 * it - confirm).  The commit itself is udisk_commit(ubik_currentTrans);
 * afterwards ubik_dbVersion is set from ubik_dbase->version.
 */
96 SDISK_Commit(rxcall, atid)
97 register struct rx_call *rxcall;
98 struct ubik_tid *atid;
100 register afs_int32 code;
101 register struct ubik_dbase *dbase;
103 if ((code = ubik_CheckAuth(rxcall))) {
107 if (!ubik_currentTrans) {
111 * sanity check to make sure only write trans appear here
113 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
117 dbase = ubik_currentTrans->dbase;
119 urecovery_CheckTid(atid);
120 if (!ubik_currentTrans) {
125 code = udisk_commit(ubik_currentTrans);
127 /* sync site should now match */
128 ubik_dbVersion = ubik_dbase->version;
/*
 * SDISK_ReleaseLocks - end the write transaction held in ubik_currentTrans.
 *
 * rxcall  the incoming Rx call; checked with ubik_CheckAuth.
 * atid    transaction id supplied by the caller; passed to
 *         urecovery_CheckTid.
 *
 * The function tests that ubik_currentTrans is set and is a
 * UBIK_WRITETRANS transaction, runs urecovery_CheckTid, and tests
 * ubik_currentTrans again.  It then calls udisk_end on the transaction and
 * resets ubik_currentTrans to null.  Unless UBIK_PAUSE is defined,
 * udisk_end is only called when the transaction's locktype is not
 * LOCKWAIT.
 */
135 SDISK_ReleaseLocks(rxcall, atid)
136 register struct rx_call *rxcall;
137 struct ubik_tid *atid;
139 register struct ubik_dbase *dbase;
140 register afs_int32 code;
142 if ((code = ubik_CheckAuth(rxcall))) {
146 if (!ubik_currentTrans) {
149 /* sanity check to make sure only write trans appear here */
150 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
154 dbase = ubik_currentTrans->dbase;
156 urecovery_CheckTid(atid);
157 if (!ubik_currentTrans) {
162 /* If the thread is not waiting for lock - ok to end it */
163 #if !defined(UBIK_PAUSE)
164 if (ubik_currentTrans->locktype != LOCKWAIT) {
165 #endif /* UBIK_PAUSE */
166 udisk_end(ubik_currentTrans);
167 #if !defined(UBIK_PAUSE)
169 #endif /* UBIK_PAUSE */
170 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * SDISK_Abort - abort the write transaction held in ubik_currentTrans.
 *
 * rxcall  the incoming Rx call; checked with ubik_CheckAuth.
 * atid    transaction id supplied by the caller; passed to
 *         urecovery_CheckTid.
 *
 * The function tests that ubik_currentTrans is set and is a
 * UBIK_WRITETRANS transaction, runs urecovery_CheckTid, and tests
 * ubik_currentTrans again.  It then calls udisk_abort, keeping its result
 * in code, followed by udisk_end, and resets ubik_currentTrans to null.
 * Unless UBIK_PAUSE is defined, udisk_end is only called when the
 * transaction's locktype is not LOCKWAIT.
 */
176 SDISK_Abort(rxcall, atid)
177 register struct rx_call *rxcall;
178 struct ubik_tid *atid;
180 register afs_int32 code;
181 register struct ubik_dbase *dbase;
183 if ((code = ubik_CheckAuth(rxcall))) {
187 if (!ubik_currentTrans) {
190 /* sanity check to make sure only write trans appear here */
191 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
195 dbase = ubik_currentTrans->dbase;
197 urecovery_CheckTid(atid);
198 if (!ubik_currentTrans) {
203 code = udisk_abort(ubik_currentTrans);
204 /* If the thread is not waiting for lock - ok to end it */
205 #if !defined(UBIK_PAUSE)
206 if (ubik_currentTrans->locktype != LOCKWAIT) {
207 #endif /* UBIK_PAUSE */
208 udisk_end(ubik_currentTrans);
209 #if !defined(UBIK_PAUSE)
211 #endif /* UBIK_PAUSE */
212 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * SDISK_Lock - take a lock of type atype for the current write transaction.
 *
 * rxcall  the incoming Rx call; checked with ubik_CheckAuth.
 * atid    transaction id supplied by the caller; passed to
 *         urecovery_CheckTid.
 * atype   lock type forwarded to ulock_getLock.
 * apos, alen are not used; afile is not referenced in the code shown here.
 *
 * The function tests that ubik_currentTrans is set and is a
 * UBIK_WRITETRANS transaction, runs urecovery_CheckTid, and tests
 * ubik_currentTrans again.  It saves the transaction pointer in
 * ubik_thisTrans, then calls ulock_getLock(ubik_currentTrans, atype, 1).
 * If that call returns zero but ubik_currentTrans no longer equals the
 * saved pointer, the saved transaction is passed to udisk_end.
 */
218 SDISK_Lock(rxcall, atid, afile, apos, alen, atype)
219 register struct rx_call *rxcall;
220 struct ubik_tid *atid;
221 afs_int32 afile, apos, alen, atype; /* apos and alen are not used */
223 register afs_int32 code;
224 register struct ubik_dbase *dbase;
225 struct ubik_trans *ubik_thisTrans;
227 if ((code = ubik_CheckAuth(rxcall))) {
230 if (!ubik_currentTrans) {
233 /* sanity check to make sure only write trans appear here */
234 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
240 dbase = ubik_currentTrans->dbase;
242 urecovery_CheckTid(atid);
243 if (!ubik_currentTrans) {
248 ubik_thisTrans = ubik_currentTrans;
249 code = ulock_getLock(ubik_currentTrans, atype, 1);
251 /* While waiting, the transaction may have been ended/
252 * aborted from under us (urecovery_CheckTid). In that
253 * case, end the transaction here.
255 if (!code && (ubik_currentTrans != ubik_thisTrans)) {
256 udisk_end(ubik_thisTrans);
264 /* Write a vector of data.
 *
 * SDISK_WriteV applies a list of writes to the current write transaction.
 *
 * rxcall     the incoming Rx call; checked with ubik_CheckAuth.
 * atid       transaction id supplied by the caller; passed to
 *            urecovery_CheckTid.
 * io_vector  iovec_wrt_len entries at iovec_wrt_val, each read as a
 *            struct ubik_iovec giving file, position and length.
 * io_buffer  the data for all entries, packed back to back in
 *            iovec_buf_val (iovec_buf_len bytes).
 *
 * The function tests that ubik_currentTrans is set and is a
 * UBIK_WRITETRANS transaction, runs urecovery_CheckTid, and tests
 * ubik_currentTrans again.  It then walks the vector, keeping a running
 * offset into the buffer: each entry is checked so that offset + length
 * does not exceed iovec_buf_len, written with udisk_write, and its length
 * added to the offset.
 *
 * NOTE(review): the bounds test shown only guards the upper end; if
 * iovec[i].length is a signed type, a negative length is not rejected
 * here - confirm against the struct definition and udisk_write.
 */
266 SDISK_WriteV(rxcall, atid, io_vector, io_buffer)
267 register struct rx_call *rxcall;
268 struct ubik_tid *atid;
269 iovec_wrt *io_vector;
270 iovec_buf *io_buffer;
272 afs_int32 code, i, offset;
273 struct ubik_dbase *dbase;
274 struct ubik_iovec *iovec;
277 if ((code = ubik_CheckAuth(rxcall))) {
280 if (!ubik_currentTrans) {
283 /* sanity check to make sure only write trans appear here */
284 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
288 dbase = ubik_currentTrans->dbase;
290 urecovery_CheckTid(atid);
291 if (!ubik_currentTrans) {
296 iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
297 iobuf = (char *)io_buffer->iovec_buf_val;
298 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
299 /* Sanity check for going off end of buffer */
300 if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
304 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
305 iovec[i].position, iovec[i].length);
310 offset += iovec[i].length;
/*
 * SDISK_Write - apply a single write to the current write transaction.
 *
 * rxcall  the incoming Rx call; checked with ubik_CheckAuth.
 * atid    transaction id supplied by the caller; passed to
 *         urecovery_CheckTid.
 * afile   file argument forwarded to udisk_write.
 * apos    position argument forwarded to udisk_write.
 * adata   the bytes to write: bulkdata_len bytes at bulkdata_val.
 *
 * The function tests that ubik_currentTrans is set and is a
 * UBIK_WRITETRANS transaction, runs urecovery_CheckTid, and tests
 * ubik_currentTrans again before calling udisk_write.
 */
318 SDISK_Write(rxcall, atid, afile, apos, adata)
319 register struct rx_call *rxcall;
320 struct ubik_tid *atid;
321 afs_int32 afile, apos;
322 register bulkdata *adata;
324 register afs_int32 code;
325 register struct ubik_dbase *dbase;
327 if ((code = ubik_CheckAuth(rxcall))) {
330 if (!ubik_currentTrans) {
333 /* sanity check to make sure only write trans appear here */
334 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
338 dbase = ubik_currentTrans->dbase;
340 urecovery_CheckTid(atid);
341 if (!ubik_currentTrans) {
346 udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
347 adata->bulkdata_len);
/*
 * SDISK_Truncate - truncate a file within the current write transaction.
 *
 * rxcall  the incoming Rx call; checked with ubik_CheckAuth.
 * atid    transaction id supplied by the caller; passed to
 *         urecovery_CheckTid.
 * afile   file argument forwarded to udisk_truncate.
 * alen    length argument forwarded to udisk_truncate.
 *
 * The function tests that ubik_currentTrans is set and is a
 * UBIK_WRITETRANS transaction, runs urecovery_CheckTid, and tests
 * ubik_currentTrans again before calling udisk_truncate, whose result is
 * stored in code.
 */
353 SDISK_Truncate(rxcall, atid, afile, alen)
354 register struct rx_call *rxcall;
355 struct ubik_tid *atid;
359 register afs_int32 code;
360 register struct ubik_dbase *dbase;
362 if ((code = ubik_CheckAuth(rxcall))) {
365 if (!ubik_currentTrans) {
368 /* sanity check to make sure only write trans appear here */
369 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
373 dbase = ubik_currentTrans->dbase;
375 urecovery_CheckTid(atid);
376 if (!ubik_currentTrans) {
380 code = udisk_truncate(ubik_currentTrans, afile, alen);
/*
 * SDISK_GetVersion - report this server's database version label.
 *
 * rxcall    the incoming Rx call; checked with ubik_CheckAuth.
 * aversion  output; filled by ubik_dbase->getlabel(ubik_dbase, 0, aversion).
 *
 * ubeacon_AmSyncSite() is tested before the label is read; the comment in
 * the body gives the reason (the RPC should be rejected while this server
 * still considers itself the sync site).  On the "no dbase" path
 * aversion->counter is set to 0.
 */
386 SDISK_GetVersion(rxcall, aversion)
387 register struct rx_call *rxcall;
388 register struct ubik_version *aversion;
390 register afs_int32 code;
392 if ((code = ubik_CheckAuth(rxcall))) {
397 * If we are the sync site, recovery shouldn't be running on any
398 * other site. We shouldn't be getting this RPC as long as we are
399 * the sync site. To prevent any unforseen activity, we should
400 * reject this RPC until we have recognized that we are not the
401 * sync site anymore, and/or if we have any pending WRITE
402 * transactions that have to complete. This way we can be assured
403 * that this RPC would not block any pending transactions that
404 * should either fail or pass. If we have recognized the fact that
405 * we are not the sync site any more, all write transactions would
406 * fail with UNOQUORUM anyway.
408 if (ubeacon_AmSyncSite()) {
413 code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
416 /* tell other side there's no dbase */
418 aversion->counter = 0;
/*
 * SDISK_GetFile - stream a database file to the caller over rxcall.
 *
 * rxcall   the incoming Rx call; checked with ubik_CheckAuth and used as
 *          the output stream for rx_Write.
 * file     file number passed to the dbase stat/read/getlabel operations.
 * version  output; filled by dbase->getlabel after the data is sent.
 *
 * The file size comes from dbase->stat (ubikstat.size).  It is sent first
 * as one afs_int32 converted with htonl; a short rx_Write is logged with
 * ubik_dprint.  The contents are then read with dbase->read and sent with
 * rx_Write in pieces of at most sizeof(tbuffer) bytes; read and write
 * failures are logged with ubik_dprint.
 */
424 SDISK_GetFile(rxcall, file, version)
425 register struct rx_call *rxcall;
426 register afs_int32 file;
427 struct ubik_version *version;
429 register afs_int32 code;
430 register struct ubik_dbase *dbase;
431 register afs_int32 offset;
432 struct ubik_stat ubikstat;
437 if ((code = ubik_CheckAuth(rxcall))) {
440 /* temporarily disabled because it causes problems for migration tool. Hey, it's just
441 * a sanity check, anyway.
442 if (ubeacon_AmSyncSite()) {
448 code = (*dbase->stat) (dbase, file, &ubikstat);
453 length = ubikstat.size;
454 tlen = htonl(length);
455 code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
456 if (code != sizeof(afs_int32)) {
458 ubik_dprint("Rx-write length error=%d\n", code);
463 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
464 code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
467 ubik_dprint("read failed error=%d\n", code);
470 code = rx_Write(rxcall, tbuffer, tlen);
473 ubik_dprint("Rx-write length error=%d\n", code);
479 code = (*dbase->getlabel) (dbase, file, version); /* return the dbase, too */
/*
 * SDISK_SendFile - accept a copy of a database file pushed by the caller
 * and install it as the local database.
 *
 * rxcall  the incoming Rx call; checked with ubik_CheckAuth and used as the
 *         input stream for rx_Read.
 * file    file number passed to the dbase operations.
 * length  byte count of the incoming data; each read is capped at
 *         sizeof(tbuffer).
 * avers   version label applied with setlabel once the data is in place
 *         and then copied into ubik_dbase->version.
 *
 * Steps visible here:
 *  - compare uvote_GetSyncSite() with the primary interface address of the
 *    calling peer (ubikGetPrimaryInterfaceAddr of rx_HostOf);
 *  - urecovery_AbortAll(dbase), then label the database with a zeroed
 *    tversion while the transfer is in progress;
 *  - read the data with rx_Read and store it, either through dbase->write
 *    or with write() into "<pathName>.DB0.TMP" opened at offset HDRSIZE;
 *  - for the temporary-file path, rename "<pathName>.DB0" to
 *    "<pathName>.DB0.OLD" and the ".TMP" file to "<pathName>.DB0";
 *  - setlabel with avers, udisk_Invalidate, and wake waiters on the
 *    version (pthread_cond_broadcast or LWP_NoYieldSignal);
 *  - on failure, relabel with tversion after restoring its epoch from the
 *    local `epoch`, and log the error; otherwise log completion.
 *
 * NOTE(review): which storage path is compiled is selected by
 * OLD_URECOVERY (and AFS_PTHREAD_ENV for one fragment) - confirm the exact
 * conditional nesting.
 */
485 SDISK_SendFile(rxcall, file, length, avers)
486 register struct rx_call *rxcall;
489 struct ubik_version *avers;
491 register afs_int32 code;
492 register struct ubik_dbase *dbase;
495 struct ubik_version tversion;
497 struct rx_peer *tpeer;
498 struct rx_connection *tconn;
499 afs_uint32 otherHost;
500 #ifndef OLD_URECOVERY
503 afs_int32 epoch, pass;
506 /* receive the database file sent by the caller and install it locally */
508 if ((code = ubik_CheckAuth(rxcall))) {
512 /* next, we do a sanity check to see if the guy sending us the database is
513 * the guy we think is the sync site. It turns out that we might not have
514 * decided yet that someone's the sync site, but they could have enough
515 * votes from others to be sync site anyway, and could send us the database
516 * in advance of getting our votes. This is fine, what we're really trying
517 * to check is that some authenticated bogon isn't sending a random database
518 * into another configuration. This could happen on a bad configuration
519 * screwup. Thus, we only object if we're sure we know who the sync site
520 * is, and it ain't the guy talking to us.
522 offset = uvote_GetSyncSite();
523 tconn = rx_ConnectionOf(rxcall);
524 tpeer = rx_PeerOf(tconn);
525 otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
526 if (offset && offset != otherHost) {
527 /* we *know* this is the wrong guy */
535 /* abort any active trans that may scribble over the database */
536 urecovery_AbortAll(dbase);
538 ubik_print("Ubik: Synchronize database with server %s\n",
539 afs_inet_ntoa(otherHost));
543 (*dbase->truncate) (dbase, file, 0); /* truncate first */
544 tversion.counter = 0;
548 tversion.epoch = 0; /* start off by labelling in-transit db as invalid */
549 (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */
550 #ifndef OLD_URECOVERY
552 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
553 fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
558 code = lseek(fd, HDRSIZE, 0);
559 if (code != HDRSIZE) {
566 memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
568 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
569 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
573 code = rx_Read(rxcall, tbuffer, tlen);
576 ubik_dprint("Rx-read length error=%d\n", code);
582 code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
584 code = write(fd, tbuffer, tlen);
589 ubik_dprint("write failed error=%d\n", code);
597 #ifndef OLD_URECOVERY
603 /* sync data first, then write label and resync (resync done by setlabel call).
604 * This way, good label is only on good database. */
606 (*ubik_dbase->sync) (dbase, file);
608 afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
610 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
611 code = unlink(pbuffer);
613 code = rename(tbuffer, pbuffer);
614 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
617 code = rename(pbuffer, tbuffer);
620 code = (*ubik_dbase->setlabel) (dbase, file, avers);
621 #ifndef OLD_URECOVERY
623 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
627 memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
628 udisk_Invalidate(dbase, file); /* new dbase, flush disk buffers */
629 #if defined(AFS_PTHREAD_ENV) && defined(UBIK_PTHREAD_ENV)
630 assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
632 LWP_NoYieldSignal(&dbase->version);
637 #ifndef OLD_URECOVERY
639 /* Failed to sync. Allow reads again for now. */
640 tversion.epoch = epoch;
641 (*dbase->setlabel) (dbase, file, &tversion);
644 ("Ubik: Synchronize database with server %s failed (error = %d)\n",
645 afs_inet_ntoa(otherHost), code);
647 ubik_print("Ubik: Synchronize database completed\n");
655 register struct rx_call *rxcall;
661 * Update the remote machine's addresses in my server list.
662 * Send back my addresses to the caller of this RPC.
663 * Returns zero on success, else 1.
/*
 * SDISK_UpdateInterfaceAddr - exchange interface address lists with a
 * remote server.
 *
 * rxcall   the incoming Rx call (not referenced in the code shown here).
 * inAddr   the caller's addresses; each hostAddr[i] is converted with
 *          htonl before being compared with or stored into server entries.
 * outAddr  output; filled with ubik_host[i] converted with ntohl.
 *
 * The entry in ubik_servers whose addr[0] equals the caller's first address
 * is looked up as ts.  Every non-zero incoming address is then compared
 * with the addresses of all other server entries; a hit sets found.  If
 * there was no match for the first address, or an address belongs to
 * another entry, "Inconsistent Cell Info from server: " is printed with
 * the incoming addresses.  Otherwise ts->addr[1..] is overwritten from
 * inAddr and the resulting address list is printed.
 */
666 SDISK_UpdateInterfaceAddr(rxcall, inAddr, outAddr)
667 register struct rx_call *rxcall;
668 UbikInterfaceAddr *inAddr, *outAddr;
670 struct ubik_server *ts, *tmp;
671 afs_uint32 remoteAddr; /* in net byte order */
672 int i, j, found = 0, probableMatch = 0;
674 /* copy the output parameters */
675 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
676 outAddr->hostAddr[i] = ntohl(ubik_host[i]);
678 remoteAddr = htonl(inAddr->hostAddr[0]);
679 for (ts = ubik_servers; ts; ts = ts->next)
680 if (ts->addr[0] == remoteAddr) { /* both in net byte order */
686 /* verify that all addresses in the incoming RPC are
687 ** not part of other server entries in my CellServDB
689 for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
690 && inAddr->hostAddr[i]; i++) {
691 remoteAddr = htonl(inAddr->hostAddr[i]);
692 for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
693 if (ts == tmp) /* this is my server */
695 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
697 if (remoteAddr == tmp->addr[j]) {
705 /* if (probableMatch) */
706 /* inconsistent addresses in CellServDB */
707 if (!probableMatch || found) {
708 ubik_print("Inconsistent Cell Info from server: ");
709 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
710 ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
718 /* update our data structures */
719 for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
720 ts->addr[i] = htonl(inAddr->hostAddr[i]);
722 ubik_print("ubik: A Remote Server has addresses: ");
723 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
724 ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
733 struct ubik_server *ts;
736 ubik_print("Local CellServDB:");
737 for (ts = ubik_servers; ts; ts = ts->next, j++) {
738 ubik_print("Server %d: ", j);
739 for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
740 ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
/*
 * SDISK_SetVersion - relabel the local database with a new version inside
 * the current write transaction.
 *
 * rxcall       the incoming Rx call; checked with ubik_CheckAuth.
 * atid         transaction id supplied by the caller; passed to
 *              urecovery_CheckTid.
 * oldversionp  version the caller expects this server to hold.
 * newversionp  version to install.
 *
 * The function tests that ubik_currentTrans is set and is a
 * UBIK_WRITETRANS transaction, tests ubeacon_AmSyncSite() (the request is
 * not expected on the sync site), runs urecovery_CheckTid, and tests
 * ubik_currentTrans again.  Only when oldversionp's epoch and counter both
 * equal ubik_dbVersion is dbase->setlabel called with newversionp; the new
 * version is then also stored in ubik_dbase->version and ubik_dbVersion.
 */
746 SDISK_SetVersion(rxcall, atid, oldversionp, newversionp)
747 struct rx_call *rxcall;
748 struct ubik_tid *atid;
749 struct ubik_version *oldversionp;
750 struct ubik_version *newversionp;
753 struct ubik_dbase *dbase;
755 if ((code = ubik_CheckAuth(rxcall))) {
759 if (!ubik_currentTrans) {
762 /* sanity check to make sure only write trans appear here */
763 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
767 /* Should not get this for the sync site */
768 if (ubeacon_AmSyncSite()) {
772 dbase = ubik_currentTrans->dbase;
774 urecovery_CheckTid(atid);
775 if (!ubik_currentTrans) {
780 /* Set the label if its version matches the sync-site's */
781 if ((oldversionp->epoch == ubik_dbVersion.epoch)
782 && (oldversionp->counter == ubik_dbVersion.counter)) {
783 code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
785 ubik_dbase->version = *newversionp;
786 ubik_dbVersion = *newversionp;