2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #include <sys/types.h>
22 #include <netinet/in.h>
29 #include <afs/afsutil.h>
31 #define UBIK_INTERNALS
/* Optional security hook.  When this pointer is non-zero the auth check in
 * this file calls it as (*ubik_CheckRXSecurityProc)(ubik_CheckRXSecurityRock,
 * acall) and stores the result in its local code. */
34 int (*ubik_CheckRXSecurityProc) ();
/* Value passed as the first argument of ubik_CheckRXSecurityProc. */
35 char *ubik_CheckRXSecurityRock;
/* Declared here without a parameter list. */
36 void printServerInfo();
38 /* routines for handling requests remotely-submitted by the sync site. These are
39 only write transactions (we don't propagate read trans), and there is at most one
40 write transaction extant at any one time.
43 struct ubik_trans *ubik_currentTrans = 0;
47 register struct rx_call *acall;
49 register afs_int32 code;
50 if (ubik_CheckRXSecurityProc) {
51 code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
57 /* the rest of these guys handle remote execution of write
58 * transactions: this is the code executed on the other servers when a
59 * sync site is executing a write transaction.
/*
 * SDISK_Begin: begin the write transaction that a sync site is running
 * remotely on this server.
 *
 * rxcall - incoming Rx call; handed to ubik_CheckAuth(), whose non-zero
 *          result is captured in code.
 * atid   - transaction id supplied by the caller; its epoch and counter are
 *          copied onto the newly begun transaction.
 *
 * Flow: authenticate, run urecovery_CheckTid(atid), get rid of whatever
 * transaction is still recorded in ubik_currentTrans, then call
 * udisk_begin() for a UBIK_WRITETRANS on ubik_dbase.  If that succeeds and
 * ubik_currentTrans is set, the new transaction is labelled with atid.
 * Unless UBIK_PAUSE is defined, the stale transaction is handed to
 * udisk_end() only when its locktype is not LOCKWAIT.
 * NOTE(review): confirm what is returned when ubik_CheckAuth() fails.
 */
62 SDISK_Begin(rxcall, atid)
63 register struct rx_call *rxcall;
64 struct ubik_tid *atid;
66 register afs_int32 code;
68 if ((code = ubik_CheckAuth(rxcall))) {
72 urecovery_CheckTid(atid);
73 if (ubik_currentTrans) {
74 /* If the thread is not waiting for lock - ok to end it */
75 #if !defined(UBIK_PAUSE)
76 if (ubik_currentTrans->locktype != LOCKWAIT) {
77 #endif /* UBIK_PAUSE */
78 udisk_end(ubik_currentTrans);
79 #if !defined(UBIK_PAUSE)
81 #endif /* UBIK_PAUSE */
82 ubik_currentTrans = (struct ubik_trans *)0;
84 code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
85 if (!code && ubik_currentTrans) {
86 /* label this trans with the right trans id */
87 ubik_currentTrans->tid.epoch = atid->epoch;
88 ubik_currentTrans->tid.counter = atid->counter;
/*
 * SDISK_Commit: commit the current remotely-driven write transaction.
 *
 * rxcall - incoming Rx call, checked with ubik_CheckAuth().
 * atid   - transaction id from the caller, handed to urecovery_CheckTid().
 *
 * The handler tests that ubik_currentTrans is set and that its type is
 * UBIK_WRITETRANS.  ubik_currentTrans is tested a second time after
 * urecovery_CheckTid(), since that call can end the transaction, and only
 * then is udisk_commit() invoked.  Afterwards ubik_dbVersion is refreshed
 * from ubik_dbase->version.
 * NOTE(review): the version copy reads the global ubik_dbase rather than
 * the local dbase taken from the transaction -- presumably the same object;
 * confirm.  Also confirm the codes returned when the checks fail.
 */
96 SDISK_Commit(rxcall, atid)
97 register struct rx_call *rxcall;
98 struct ubik_tid *atid;
100 register afs_int32 code;
101 register struct ubik_dbase *dbase;
103 if ((code = ubik_CheckAuth(rxcall))) {
107 if (!ubik_currentTrans) {
111 * sanity check to make sure only write trans appear here
113 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
117 dbase = ubik_currentTrans->dbase;
119 urecovery_CheckTid(atid);
120 if (!ubik_currentTrans) {
125 code = udisk_commit(ubik_currentTrans);
127 /* sync site should now match */
128 ubik_dbVersion = ubik_dbase->version;
/*
 * SDISK_ReleaseLocks: finish with the current remote write transaction by
 * ending it and clearing ubik_currentTrans.
 *
 * rxcall - incoming Rx call, checked with ubik_CheckAuth().
 * atid   - transaction id from the caller, handed to urecovery_CheckTid().
 *
 * The handler tests that ubik_currentTrans is set and is a UBIK_WRITETRANS,
 * and tests the pointer again after urecovery_CheckTid() because that call
 * can end the transaction.  Unless UBIK_PAUSE is defined, udisk_end() is
 * called only when the transaction's locktype is not LOCKWAIT.
 * NOTE(review): confirm the codes returned when the checks fail.
 */
135 SDISK_ReleaseLocks(rxcall, atid)
136 register struct rx_call *rxcall;
137 struct ubik_tid *atid;
139 register struct ubik_dbase *dbase;
140 register afs_int32 code;
142 if ((code = ubik_CheckAuth(rxcall))) {
146 if (!ubik_currentTrans) {
149 /* sanity check to make sure only write trans appear here */
150 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
154 dbase = ubik_currentTrans->dbase;
156 urecovery_CheckTid(atid);
157 if (!ubik_currentTrans) {
162 /* If the thread is not waiting for lock - ok to end it */
163 #if !defined(UBIK_PAUSE)
164 if (ubik_currentTrans->locktype != LOCKWAIT) {
165 #endif /* UBIK_PAUSE */
166 udisk_end(ubik_currentTrans);
167 #if !defined(UBIK_PAUSE)
169 #endif /* UBIK_PAUSE */
170 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * SDISK_Abort: abort the current remotely-driven write transaction.
 *
 * rxcall - incoming Rx call, checked with ubik_CheckAuth().
 * atid   - transaction id from the caller, handed to urecovery_CheckTid().
 *
 * After the same guards used by the other handlers (transaction present,
 * type UBIK_WRITETRANS, still present after urecovery_CheckTid()), the
 * transaction is passed to udisk_abort() and that result is kept in code.
 * The transaction is then ended and ubik_currentTrans cleared; unless
 * UBIK_PAUSE is defined, udisk_end() is called only when the locktype is
 * not LOCKWAIT.
 * NOTE(review): confirm the codes returned when the checks fail.
 */
176 SDISK_Abort(rxcall, atid)
177 register struct rx_call *rxcall;
178 struct ubik_tid *atid;
180 register afs_int32 code;
181 register struct ubik_dbase *dbase;
183 if ((code = ubik_CheckAuth(rxcall))) {
187 if (!ubik_currentTrans) {
190 /* sanity check to make sure only write trans appear here */
191 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
195 dbase = ubik_currentTrans->dbase;
197 urecovery_CheckTid(atid);
198 if (!ubik_currentTrans) {
203 code = udisk_abort(ubik_currentTrans);
204 /* If the thread is not waiting for lock - ok to end it */
205 #if !defined(UBIK_PAUSE)
206 if (ubik_currentTrans->locktype != LOCKWAIT) {
207 #endif /* UBIK_PAUSE */
208 udisk_end(ubik_currentTrans);
209 #if !defined(UBIK_PAUSE)
211 #endif /* UBIK_PAUSE */
212 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * SDISK_Lock: take a lock on behalf of the current remote write transaction.
 *
 * rxcall - incoming Rx call, checked with ubik_CheckAuth().
 * atid   - transaction id from the caller, handed to urecovery_CheckTid().
 * afile  - not referenced in the statements visible below.
 * apos, alen - unused, as the declaration comment states.
 * atype  - passed through as the second argument of ulock_getLock().
 *
 * After the usual guards (transaction present, type UBIK_WRITETRANS, still
 * present after urecovery_CheckTid()), the current transaction pointer is
 * saved in ubik_thisTrans before ulock_getLock() is called.  If the call
 * succeeds but ubik_currentTrans no longer equals the saved pointer, the
 * saved transaction is passed to udisk_end().
 * NOTE(review): the literal 1 given to ulock_getLock() is presumably a
 * "wait for the lock" flag -- confirm against ulock_getLock().
 */
218 SDISK_Lock(rxcall, atid, afile, apos, alen, atype)
219 register struct rx_call *rxcall;
220 struct ubik_tid *atid;
221 afs_int32 afile, apos, alen, atype; /* apos and alen are not used */
223 register afs_int32 code;
224 register struct ubik_dbase *dbase;
225 struct ubik_trans *ubik_thisTrans;
227 if ((code = ubik_CheckAuth(rxcall))) {
230 if (!ubik_currentTrans) {
233 /* sanity check to make sure only write trans appear here */
234 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
240 dbase = ubik_currentTrans->dbase;
242 urecovery_CheckTid(atid);
243 if (!ubik_currentTrans) {
248 ubik_thisTrans = ubik_currentTrans;
249 code = ulock_getLock(ubik_currentTrans, atype, 1);
251 /* While waiting, the transaction may have been ended/
252 * aborted from under us (urecovery_CheckTid). In that
253 * case, end the transaction here.
255 if (!code && (ubik_currentTrans != ubik_thisTrans)) {
256 udisk_end(ubik_thisTrans);
264 /* Write a vector of data */
/*
 * SDISK_WriteV: apply a vector of writes to the current remote write
 * transaction.
 *
 * rxcall    - incoming Rx call, checked with ubik_CheckAuth().
 * atid      - transaction id from the caller, handed to urecovery_CheckTid().
 * io_vector - iovec_wrt_len entries (viewed as struct ubik_iovec), each
 *             naming a file, a position and a length.
 * io_buffer - iovec_buf_len bytes of payload; the data for entry i starts
 *             where the data for entry i-1 stopped, because offset advances
 *             by each entry's length.
 *
 * After the usual guards (transaction present, type UBIK_WRITETRANS, still
 * present after urecovery_CheckTid()), every entry is bounds-checked against
 * iovec_buf_len and then passed to udisk_write().
 * NOTE(review): offset is a signed afs_int32 and the check only rejects
 * offset + length > iovec_buf_len; confirm that a negative or overflowing
 * length supplied by the peer cannot slip past it.
 */
266 SDISK_WriteV(rxcall, atid, io_vector, io_buffer)
267 register struct rx_call *rxcall;
268 struct ubik_tid *atid;
269 iovec_wrt *io_vector;
270 iovec_buf *io_buffer;
272 afs_int32 code, i, offset;
273 struct ubik_dbase *dbase;
274 struct ubik_iovec *iovec;
277 if ((code = ubik_CheckAuth(rxcall))) {
280 if (!ubik_currentTrans) {
283 /* sanity check to make sure only write trans appear here */
284 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
288 dbase = ubik_currentTrans->dbase;
290 urecovery_CheckTid(atid);
291 if (!ubik_currentTrans) {
296 iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
297 iobuf = (char *)io_buffer->iovec_buf_val;
298 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
299 /* Sanity check for going off end of buffer */
300 if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
304 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
305 iovec[i].position, iovec[i].length);
310 offset += iovec[i].length;
/*
 * SDISK_Write: apply a single write to the current remote write transaction.
 *
 * rxcall - incoming Rx call, checked with ubik_CheckAuth().
 * atid   - transaction id from the caller, handed to urecovery_CheckTid().
 * afile  - file argument forwarded to udisk_write().
 * apos   - position argument forwarded to udisk_write().
 * adata  - payload; bulkdata_val and bulkdata_len are forwarded as the data
 *          pointer and length.
 *
 * The write is issued only after the usual guards: transaction present,
 * type UBIK_WRITETRANS, and still present after urecovery_CheckTid().
 * NOTE(review): confirm the codes returned when the checks fail.
 */
318 SDISK_Write(rxcall, atid, afile, apos, adata)
319 register struct rx_call *rxcall;
320 struct ubik_tid *atid;
321 afs_int32 afile, apos;
322 register bulkdata *adata;
324 register afs_int32 code;
325 register struct ubik_dbase *dbase;
327 if ((code = ubik_CheckAuth(rxcall))) {
330 if (!ubik_currentTrans) {
333 /* sanity check to make sure only write trans appear here */
334 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
338 dbase = ubik_currentTrans->dbase;
340 urecovery_CheckTid(atid);
341 if (!ubik_currentTrans) {
346 udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
347 adata->bulkdata_len);
/*
 * SDISK_Truncate: truncate a file within the current remote write
 * transaction.
 *
 * rxcall - incoming Rx call, checked with ubik_CheckAuth().
 * atid   - transaction id from the caller, handed to urecovery_CheckTid().
 * afile, alen - forwarded unchanged to udisk_truncate(), whose result is
 *               kept in code.
 *
 * The truncate is issued only after the usual guards: transaction present,
 * type UBIK_WRITETRANS, and still present after urecovery_CheckTid().
 * NOTE(review): confirm the codes returned when the checks fail.
 */
353 SDISK_Truncate(rxcall, atid, afile, alen)
354 register struct rx_call *rxcall;
355 struct ubik_tid *atid;
359 register afs_int32 code;
360 register struct ubik_dbase *dbase;
362 if ((code = ubik_CheckAuth(rxcall))) {
365 if (!ubik_currentTrans) {
368 /* sanity check to make sure only write trans appear here */
369 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
373 dbase = ubik_currentTrans->dbase;
375 urecovery_CheckTid(atid);
376 if (!ubik_currentTrans) {
380 code = udisk_truncate(ubik_currentTrans, afile, alen);
386 SDISK_GetVersion(rxcall, aversion)
387 register struct rx_call *rxcall;
388 register struct ubik_version *aversion;
390 register afs_int32 code;
392 if ((code = ubik_CheckAuth(rxcall))) {
397 * If we are the sync site, recovery shouldn't be running on any
398 * other site. We shouldn't be getting this RPC as long as we are
399 * the sync site. To prevent any unforeseen activity, we should
400 * reject this RPC until we have recognized that we are not the
401 * sync site anymore, and/or if we have any pending WRITE
402 * transactions that have to complete. This way we can be assured
403 * that this RPC would not block any pending transactions that
404 * should either fail or pass. If we have recognized the fact that
405 * we are not the sync site any more, all write transactions would
406 * fail with UNOQUORUM anyway.
408 if (ubeacon_AmSyncSite()) {
413 code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
416 /* tell other side there's no dbase */
418 aversion->counter = 0;
/*
 * SDISK_GetFile: stream a database file to the caller over the Rx call.
 *
 * rxcall  - incoming Rx call; checked with ubik_CheckAuth() and used as the
 *           destination of rx_Write().
 * file    - file number handed to the dbase stat, read and getlabel
 *           operations.
 * version - filled in by the dbase getlabel operation after the transfer.
 *
 * Data written to the call: first the file size (from the stat result) as
 * an afs_int32 converted with htonl(), then file contents obtained through
 * the dbase read operation in pieces no larger than sizeof(tbuffer).
 * A short rx_Write() or a failed read is reported through ubik_dprint().
 * NOTE(review): confirm the error codes returned on those failure paths.
 */
424 SDISK_GetFile(rxcall, file, version)
425 register struct rx_call *rxcall;
426 register afs_int32 file;
427 struct ubik_version *version;
429 register afs_int32 code;
430 register struct ubik_dbase *dbase;
431 register afs_int32 offset;
432 struct ubik_stat ubikstat;
437 if ((code = ubik_CheckAuth(rxcall))) {
440 /* temporarily disabled because it causes problems for migration tool. Hey, it's just
441 * a sanity check, anyway.
442 if (ubeacon_AmSyncSite()) {
448 code = (*dbase->stat) (dbase, file, &ubikstat);
453 length = ubikstat.size;
454 tlen = htonl(length);
455 code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
456 if (code != sizeof(afs_int32)) {
458 ubik_dprint("Rx-write length error=%d\n", code);
463 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
464 code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
467 ubik_dprint("read failed error=%d\n", code);
470 code = rx_Write(rxcall, tbuffer, tlen);
473 ubik_dprint("Rx-write length error=%d\n", code);
479 code = (*dbase->getlabel) (dbase, file, version); /* return the dbase, too */
/*
 * SDISK_SendFile: receive a database file from the caller over the Rx call
 * and install it locally.
 *
 * rxcall - incoming Rx call; checked with ubik_CheckAuth(), used to find
 *          the peer's address, and read from with rx_Read().
 * file   - file number handed to the dbase truncate, setlabel, write and
 *          sync operations.
 * length - number of bytes expected; data is consumed in pieces no larger
 *          than sizeof(tbuffer).
 * avers  - version written as the label once the data is in place, and
 *          copied into ubik_dbase->version.
 *
 * Flow:
 *  - The peer's primary interface address is compared with the value from
 *    uvote_GetSyncSite(); a known sync site that differs from the peer is
 *    treated as the wrong sender.  The variable offset is borrowed to hold
 *    that sync-site address before it is later used for file data.
 *  - urecovery_AbortAll() is called so no active transaction scribbles on
 *    the database.
 *  - The in-transit database is labelled 0.0 (invalid) until the transfer
 *    is complete.
 *  - Incoming data goes either through the dbase write operation or to a
 *    "<pathName>.DB0.TMP" file opened with mode 0600 and positioned at
 *    HDRSIZE; the latter is moved into place by renaming .DB0 to .DB0.OLD
 *    and .DB0.TMP to .DB0.
 *  - Finally the label is set to avers, disk buffers are invalidated, and
 *    waiters on dbase->version are signalled with LWP_NoYieldSignal().
 *
 * NOTE(review): OLD_URECOVERY selects between the in-place and the
 * temp-file variants -- confirm which statements belong to which branch.
 * NOTE(review): the "send the file back to the requester" comment below
 * looks stale, since this handler reads from the call; confirm and reword.
 */
485 SDISK_SendFile(rxcall, file, length, avers)
486 register struct rx_call *rxcall;
489 struct ubik_version *avers;
491 register afs_int32 code;
492 register struct ubik_dbase *dbase;
495 struct ubik_version tversion;
497 struct rx_peer *tpeer;
498 struct rx_connection *tconn;
499 afs_uint32 otherHost;
500 #ifndef OLD_URECOVERY
505 /* send the file back to the requester */
507 if ((code = ubik_CheckAuth(rxcall))) {
511 /* next, we do a sanity check to see if the guy sending us the database is
512 * the guy we think is the sync site. It turns out that we might not have
513 * decided yet that someone's the sync site, but they could have enough
514 * votes from others to be sync site anyway, and could send us the database
515 * in advance of getting our votes. This is fine, what we're really trying
516 * to check is that some authenticated bogon isn't sending a random database
517 * into another configuration. This could happen on a bad configuration
518 * screwup. Thus, we only object if we're sure we know who the sync site
519 * is, and it ain't the guy talking to us.
521 offset = uvote_GetSyncSite();
522 tconn = rx_ConnectionOf(rxcall);
523 tpeer = rx_PeerOf(tconn);
524 otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
525 if (offset && offset != otherHost) {
526 /* we *know* this is the wrong guy */
534 /* abort any active trans that may scribble over the database */
535 urecovery_AbortAll(dbase);
537 ubik_print("Ubik: Synchronize database with server %s\n",
538 afs_inet_ntoa(otherHost));
542 (*dbase->truncate) (dbase, file, 0); /* truncate first */
543 tversion.epoch = 0; /* start off by labelling in-transit db as invalid */
544 tversion.counter = 0;
545 (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */
548 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
549 fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
554 code = lseek(fd, HDRSIZE, 0);
555 if (code != HDRSIZE) {
560 memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
562 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
563 code = rx_Read(rxcall, tbuffer, tlen);
566 ubik_dprint("Rx-read length error=%d\n", code);
572 code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
574 code = write(fd, tbuffer, tlen);
578 ubik_dprint("write failed error=%d\n", code);
586 #ifndef OLD_URECOVERY
592 /* sync data first, then write label and resync (resync done by setlabel call).
593 * This way, good label is only on good database. */
595 (*ubik_dbase->sync) (dbase, file);
597 afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB0", ubik_dbase->pathName);
599 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
600 code = unlink(pbuffer);
602 code = rename(tbuffer, pbuffer);
603 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName);
606 code = rename(pbuffer, tbuffer);
609 code = (*ubik_dbase->setlabel) (dbase, file, avers);
610 #ifndef OLD_URECOVERY
612 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.OLD", ubik_dbase->pathName);
616 memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
617 udisk_Invalidate(dbase, file); /* new dbase, flush disk buffers */
618 LWP_NoYieldSignal(&dbase->version);
622 #ifndef OLD_URECOVERY
626 ("Ubik: Synchronize database with server %s failed (error = %d)\n",
627 afs_inet_ntoa(otherHost), code);
629 ubik_print("Ubik: Synchronize database completed\n");
637 register struct rx_call *rxcall;
643 * Update the remote machine's addresses in my server list.
644 * Send my own addresses back to the caller of this RPC.
645 * Returns zero on success, else 1.
/*
 * SDISK_UpdateInterfaceAddr: exchange interface address lists with a peer.
 *
 * rxcall  - incoming Rx call (not referenced in the statements visible
 *           below).
 * inAddr  - the caller's addresses; each entry is converted with htonl()
 *           before being compared with or stored in a server entry.
 * outAddr - receives all UBIK_MAX_INTERFACE_ADDR entries of ubik_host,
 *           converted with ntohl().
 *
 * Flow:
 *  - outAddr is filled in first.
 *  - ubik_servers is searched for the entry ts whose addr[0] equals the
 *    caller's first address.
 *  - Every non-zero incoming address is compared with the addresses of all
 *    other server entries; a hit sets found.
 *  - If there was no probable match, or an address was found under another
 *    server, "Inconsistent Cell Info" is printed along with the incoming
 *    addresses.
 *  - Otherwise ts->addr[1] onward are overwritten from inAddr (addr[0] is
 *    left as is) and the resulting list is printed.
 * NOTE(review): probableMatch is presumably set when the addr[0] search
 * succeeds -- confirm, since ts is used afterwards.
 */
648 SDISK_UpdateInterfaceAddr(rxcall, inAddr, outAddr)
649 register struct rx_call *rxcall;
650 UbikInterfaceAddr *inAddr, *outAddr;
652 struct ubik_server *ts, *tmp;
653 afs_uint32 remoteAddr; /* in net byte order */
654 int i, j, found = 0, probableMatch = 0;
656 /* copy the output parameters */
657 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
658 outAddr->hostAddr[i] = ntohl(ubik_host[i]);
660 remoteAddr = htonl(inAddr->hostAddr[0]);
661 for (ts = ubik_servers; ts; ts = ts->next)
662 if (ts->addr[0] == remoteAddr) { /* both in net byte order */
668 /* verify that all addresses in the incoming RPC are
669 ** not part of other server entries in my CellServDB
671 for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
672 && inAddr->hostAddr[i]; i++) {
673 remoteAddr = htonl(inAddr->hostAddr[i]);
674 for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
675 if (ts == tmp) /* this is my server */
677 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
679 if (remoteAddr == tmp->addr[j]) {
687 /* if (probableMatch) */
688 /* inconsistent addresses in CellServDB */
689 if (!probableMatch || found) {
690 ubik_print("Inconsistent Cell Info from server: ");
691 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
692 ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
700 /* update our data structures */
701 for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
702 ts->addr[i] = htonl(inAddr->hostAddr[i]);
704 ubik_print("ubik: A Remote Server has addresses: ");
705 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
706 ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
715 struct ubik_server *ts;
718 ubik_print("Local CellServDB:");
719 for (ts = ubik_servers; ts; ts = ts->next, j++) {
720 ubik_print("Server %d: ", j);
721 for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
722 ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
/*
 * SDISK_SetVersion: relabel the local database within the current remote
 * write transaction.
 *
 * rxcall      - incoming Rx call, checked with ubik_CheckAuth().
 * atid        - transaction id from the caller, handed to
 *               urecovery_CheckTid().
 * oldversionp - version the caller expects this site to hold; its epoch and
 *               counter must both equal ubik_dbVersion for the label to be
 *               changed.
 * newversionp - version to install through the dbase setlabel operation and
 *               to copy into ubik_dbase->version and ubik_dbVersion.
 *
 * Besides the usual guards (transaction present, type UBIK_WRITETRANS,
 * still present after urecovery_CheckTid()), the handler also tests
 * ubeacon_AmSyncSite(), since a sync site should not receive this request.
 * NOTE(review): setlabel is reached through the local dbase but is passed
 * the global ubik_dbase -- presumably the same object; confirm.  Also
 * confirm whether the two version copies depend on setlabel succeeding.
 */
728 SDISK_SetVersion(rxcall, atid, oldversionp, newversionp)
729 struct rx_call *rxcall;
730 struct ubik_tid *atid;
731 struct ubik_version *oldversionp;
732 struct ubik_version *newversionp;
735 struct ubik_dbase *dbase;
737 if ((code = ubik_CheckAuth(rxcall))) {
741 if (!ubik_currentTrans) {
744 /* sanity check to make sure only write trans appear here */
745 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
749 /* Should not get this for the sync site */
750 if (ubeacon_AmSyncSite()) {
754 dbase = ubik_currentTrans->dbase;
756 urecovery_CheckTid(atid);
757 if (!ubik_currentTrans) {
762 /* Set the label if its version matches the sync-site's */
763 if ((oldversionp->epoch == ubik_dbVersion.epoch)
764 && (oldversionp->counter == ubik_dbVersion.counter)) {
765 code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
767 ubik_dbase->version = *newversionp;
768 ubik_dbVersion = *newversionp;