2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
14 #include <sys/types.h>
23 #include <netinet/in.h>
30 #include <afs/afsutil.h>
32 #define UBIK_INTERNALS
/* Optional authorization hook: when non-NULL, ubik_CheckAuth() calls it with
 * ubik_CheckRXSecurityRock and the incoming rx_call. */
36 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
/* Opaque value handed to ubik_CheckRXSecurityProc as its first argument. */
37 void *ubik_CheckRXSecurityRock;
/* Forward declaration; the definition appears further down in this file. */
39 static void printServerInfo(void);
42 * Routines for handling requests submitted remotely by the sync site. These are
43 * only write transactions (read transactions are not propagated), and there is at
44 * most one write transaction extant at any one time.
/* The remote write transaction currently in progress on this server, or 0 when
 * there is none.  The SDISK_* handlers in this file test it before acting and
 * reset it to 0 after ending a transaction. */
47 struct ubik_trans *ubik_currentTrans = 0;
/*!
 * \brief Run the installed security check, if any, against an incoming call.
 *
 * When ubik_CheckRXSecurityProc is set, it is invoked with
 * ubik_CheckRXSecurityRock and \p acall and its result is stored in code.
 *
 * \param acall  the Rx call to be checked
 *
 * NOTE(review): the return statements are not visible in this listing;
 * callers treat a non-zero result as failure, so presumably the hook's result
 * is returned and 0 is returned when no hook is installed -- confirm.
 */
50 ubik_CheckAuth(register struct rx_call *acall)
52 register afs_int32 code;
53 if (ubik_CheckRXSecurityProc) {
54 code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
60 /* the rest of these guys handle remote execution of write
61 * transactions: this is the code executed on the other servers when a
62 * sync site is executing a write transaction.
/*!
 * \brief Begin the remote write transaction identified by \p atid.
 *
 * Visible steps: authorize the call with ubik_CheckAuth(), run
 * urecovery_CheckTid() on \p atid, dispose of any transaction still recorded
 * in ubik_currentTrans, open a new UBIK_WRITETRANS on ubik_dbase with
 * udisk_begin(), and on success stamp it with the epoch and counter taken
 * from \p atid.
 *
 * \param rxcall  incoming Rx call, used for the authorization check
 * \param atid    transaction id to assign to the new transaction
 *
 * NOTE(review): the return type, braces and return statements are not
 * visible in this listing; presumably a ubik error code is returned -- confirm.
 */
65 SDISK_Begin(register struct rx_call *rxcall, struct ubik_tid *atid)
67 register afs_int32 code;
69 if ((code = ubik_CheckAuth(rxcall))) {
73 urecovery_CheckTid(atid);
74 if (ubik_currentTrans) {
75 /* If the thread is not waiting for lock - ok to end it */
/* Without UBIK_PAUSE the udisk_end() call is guarded by the LOCKWAIT test;
 * with UBIK_PAUSE defined the guard is compiled out. */
76 #if !defined(UBIK_PAUSE)
77 if (ubik_currentTrans->locktype != LOCKWAIT) {
78 #endif /* UBIK_PAUSE */
79 udisk_end(ubik_currentTrans);
80 #if !defined(UBIK_PAUSE)
82 #endif /* UBIK_PAUSE */
83 ubik_currentTrans = (struct ubik_trans *)0;
85 code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
86 if (!code && ubik_currentTrans) {
87 /* label this trans with the right trans id */
88 ubik_currentTrans->tid.epoch = atid->epoch;
89 ubik_currentTrans->tid.counter = atid->counter;
/*!
 * \brief Commit the current remote write transaction.
 *
 * Visible steps: authorize the call, require that ubik_currentTrans exists
 * and is of type UBIK_WRITETRANS, run urecovery_CheckTid() on \p atid,
 * re-test ubik_currentTrans, call udisk_commit(), and copy
 * ubik_dbase->version into ubik_dbVersion.
 *
 * \param rxcall  incoming Rx call, used for the authorization check
 * \param atid    transaction id supplied by the caller
 *
 * NOTE(review): ubik_currentTrans is tested again after
 * urecovery_CheckTid(), presumably because that call can discard the
 * transaction -- confirm.  The bodies of the failure branches, any condition
 * guarding the ubik_dbVersion update, and the return statements are not
 * visible in this listing.
 */
97 SDISK_Commit(register struct rx_call *rxcall, struct ubik_tid *atid)
99 register afs_int32 code;
100 register struct ubik_dbase *dbase;
102 if ((code = ubik_CheckAuth(rxcall))) {
106 if (!ubik_currentTrans) {
110 * sanity check to make sure only write trans appear here
112 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
116 dbase = ubik_currentTrans->dbase;
118 urecovery_CheckTid(atid);
119 if (!ubik_currentTrans) {
124 code = udisk_commit(ubik_currentTrans);
126 /* sync site should now match */
127 ubik_dbVersion = ubik_dbase->version;
/*!
 * \brief End the current remote write transaction and forget it.
 *
 * Visible steps: authorize the call, require that ubik_currentTrans exists
 * and is of type UBIK_WRITETRANS, run urecovery_CheckTid() on \p atid,
 * re-test ubik_currentTrans, call udisk_end() on it and reset
 * ubik_currentTrans to 0.
 *
 * \param rxcall  incoming Rx call, used for the authorization check
 * \param atid    transaction id supplied by the caller
 *
 * NOTE(review): the bodies of the failure branches and the return
 * statements are not visible in this listing.
 */
134 SDISK_ReleaseLocks(register struct rx_call *rxcall, struct ubik_tid *atid)
136 register struct ubik_dbase *dbase;
137 register afs_int32 code;
139 if ((code = ubik_CheckAuth(rxcall))) {
143 if (!ubik_currentTrans) {
146 /* sanity check to make sure only write trans appear here */
147 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
151 dbase = ubik_currentTrans->dbase;
153 urecovery_CheckTid(atid);
154 if (!ubik_currentTrans) {
159 /* If the thread is not waiting for lock - ok to end it */
/* Without UBIK_PAUSE the udisk_end() call is guarded by the LOCKWAIT test;
 * with UBIK_PAUSE defined the guard is compiled out. */
160 #if !defined(UBIK_PAUSE)
161 if (ubik_currentTrans->locktype != LOCKWAIT) {
162 #endif /* UBIK_PAUSE */
163 udisk_end(ubik_currentTrans);
164 #if !defined(UBIK_PAUSE)
166 #endif /* UBIK_PAUSE */
167 ubik_currentTrans = (struct ubik_trans *)0;
/*!
 * \brief Abort the current remote write transaction.
 *
 * Visible steps: authorize the call, require that ubik_currentTrans exists
 * and is of type UBIK_WRITETRANS, run urecovery_CheckTid() on \p atid,
 * re-test ubik_currentTrans, call udisk_abort(), then udisk_end(), and reset
 * ubik_currentTrans to 0.
 *
 * \param rxcall  incoming Rx call, used for the authorization check
 * \param atid    transaction id supplied by the caller
 *
 * NOTE(review): the bodies of the failure branches and the return
 * statements are not visible in this listing; presumably the result of
 * udisk_abort() is what gets returned -- confirm.
 */
173 SDISK_Abort(register struct rx_call *rxcall, struct ubik_tid *atid)
175 register afs_int32 code;
176 register struct ubik_dbase *dbase;
178 if ((code = ubik_CheckAuth(rxcall))) {
182 if (!ubik_currentTrans) {
185 /* sanity check to make sure only write trans appear here */
186 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
190 dbase = ubik_currentTrans->dbase;
192 urecovery_CheckTid(atid);
193 if (!ubik_currentTrans) {
198 code = udisk_abort(ubik_currentTrans);
199 /* If the thread is not waiting for lock - ok to end it */
200 #if !defined(UBIK_PAUSE)
201 if (ubik_currentTrans->locktype != LOCKWAIT) {
202 #endif /* UBIK_PAUSE */
203 udisk_end(ubik_currentTrans);
204 #if !defined(UBIK_PAUSE)
206 #endif /* UBIK_PAUSE */
207 ubik_currentTrans = (struct ubik_trans *)0;
212 /* apos and alen are not used */
/*!
 * \brief Acquire a lock of type \p atype for the current remote write
 * transaction.
 *
 * Visible steps: authorize the call, require that ubik_currentTrans exists
 * and is of type UBIK_WRITETRANS, run urecovery_CheckTid() on \p atid,
 * re-test ubik_currentTrans, remember the transaction pointer and call
 * ulock_getLock(ubik_currentTrans, atype, 1).  If the lock call succeeds but
 * ubik_currentTrans no longer equals the remembered pointer, the remembered
 * transaction is ended with udisk_end().
 *
 * \param rxcall  incoming Rx call, used for the authorization check
 * \param atid    transaction id supplied by the caller
 * \param afile   not referenced in the visible lines
 * \param apos    not referenced in the visible lines
 * \param alen    not referenced in the visible lines
 * \param atype   lock type passed through to ulock_getLock()
 *
 * NOTE(review): the bodies of the failure branches, anything following the
 * udisk_end() call, and the return statements are not visible in this listing.
 */
214 SDISK_Lock(register struct rx_call *rxcall, struct ubik_tid *atid,
215 afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
217 register afs_int32 code;
218 register struct ubik_dbase *dbase;
219 struct ubik_trans *ubik_thisTrans;
221 if ((code = ubik_CheckAuth(rxcall))) {
224 if (!ubik_currentTrans) {
227 /* sanity check to make sure only write trans appear here */
228 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
234 dbase = ubik_currentTrans->dbase;
236 urecovery_CheckTid(atid);
237 if (!ubik_currentTrans) {
/* Keep our own copy of the pointer so a change of ubik_currentTrans during
 * the lock call can be detected afterwards. */
242 ubik_thisTrans = ubik_currentTrans;
243 code = ulock_getLock(ubik_currentTrans, atype, 1);
245 /* While waiting, the transaction may have been ended/
246 * aborted from under us (urecovery_CheckTid). In that
247 * case, end the transaction here.
249 if (!code && (ubik_currentTrans != ubik_thisTrans)) {
250 udisk_end(ubik_thisTrans);
259 * \brief Write a vector of data
/*!
 * Applies a vector of writes to the current remote write transaction.
 *
 * Visible steps: authorize the call, require that ubik_currentTrans exists
 * and is of type UBIK_WRITETRANS, run urecovery_CheckTid() on \p atid and
 * re-test ubik_currentTrans.  Then, for each of the
 * io_vector->iovec_wrt_len entries, check that offset + length does not
 * exceed io_buffer->iovec_buf_len and call udisk_write() with the entry's
 * file, position and length, taking the data from io_buffer at the running
 * offset; offset then advances by the entry's length.
 *
 * \param rxcall     incoming Rx call, used for the authorization check
 * \param atid       transaction id supplied by the caller
 * \param io_vector  array of (file, position, length) write descriptors
 * \param io_buffer  concatenated data for all descriptors, in order
 *
 * NOTE(review): the declaration of iobuf, the handling of an out-of-range
 * entry or a udisk_write() failure, and the return statements are not visible
 * in this listing.
 */
262 SDISK_WriteV(register struct rx_call *rxcall, struct ubik_tid *atid,
263 iovec_wrt *io_vector, iovec_buf *io_buffer)
265 afs_int32 code, i, offset;
266 struct ubik_dbase *dbase;
267 struct ubik_iovec *iovec;
270 if ((code = ubik_CheckAuth(rxcall))) {
273 if (!ubik_currentTrans) {
276 /* sanity check to make sure only write trans appear here */
277 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
281 dbase = ubik_currentTrans->dbase;
283 urecovery_CheckTid(atid);
284 if (!ubik_currentTrans) {
289 iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
290 iobuf = (char *)io_buffer->iovec_buf_val;
291 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
292 /* Sanity check for going off end of buffer */
293 if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
297 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
298 iovec[i].position, iovec[i].length);
303 offset += iovec[i].length;
/*!
 * \brief Apply a single write to the current remote write transaction.
 *
 * Visible steps: authorize the call, require that ubik_currentTrans exists
 * and is of type UBIK_WRITETRANS, run urecovery_CheckTid() on \p atid,
 * re-test ubik_currentTrans, then call udisk_write() with \p afile, the
 * bytes in \p adata, \p apos and the byte count adata->bulkdata_len.
 *
 * \param rxcall  incoming Rx call, used for the authorization check
 * \param atid    transaction id supplied by the caller
 * \param afile   file number passed to udisk_write()
 * \param apos    position passed to udisk_write()
 * \param adata   data to write (bulkdata_val / bulkdata_len)
 *
 * NOTE(review): the bodies of the failure branches and the return
 * statements are not visible in this listing.
 */
311 SDISK_Write(register struct rx_call *rxcall, struct ubik_tid *atid,
312 afs_int32 afile, afs_int32 apos, register bulkdata *adata)
314 register afs_int32 code;
315 register struct ubik_dbase *dbase;
317 if ((code = ubik_CheckAuth(rxcall))) {
320 if (!ubik_currentTrans) {
323 /* sanity check to make sure only write trans appear here */
324 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
328 dbase = ubik_currentTrans->dbase;
330 urecovery_CheckTid(atid);
331 if (!ubik_currentTrans) {
336 udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
337 adata->bulkdata_len);
/*!
 * \brief Truncate a file within the current remote write transaction.
 *
 * Visible steps: authorize the call, require that ubik_currentTrans exists
 * and is of type UBIK_WRITETRANS, run urecovery_CheckTid() on \p atid,
 * re-test ubik_currentTrans, then call
 * udisk_truncate(ubik_currentTrans, afile, alen).
 *
 * \param rxcall  incoming Rx call, used for the authorization check
 * \param atid    transaction id supplied by the caller
 * \param afile   file number passed to udisk_truncate()
 * \param alen    length passed to udisk_truncate()
 *
 * NOTE(review): the bodies of the failure branches and the return
 * statements are not visible in this listing.
 */
343 SDISK_Truncate(register struct rx_call *rxcall, struct ubik_tid *atid,
344 afs_int32 afile, afs_int32 alen)
346 register afs_int32 code;
347 register struct ubik_dbase *dbase;
349 if ((code = ubik_CheckAuth(rxcall))) {
352 if (!ubik_currentTrans) {
355 /* sanity check to make sure only write trans appear here */
356 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
360 dbase = ubik_currentTrans->dbase;
362 urecovery_CheckTid(atid);
363 if (!ubik_currentTrans) {
367 code = udisk_truncate(ubik_currentTrans, afile, alen);
/*!
 * \brief Report the label (version) of the local database to the caller.
 *
 * Visible steps: authorize the call, test ubeacon_AmSyncSite(), fetch the
 * label of file 0 through ubik_dbase->getlabel into \p aversion, and set
 * aversion->counter to 0 on the "no dbase" path.
 *
 * \param rxcall    incoming Rx call, used for the authorization check
 * \param aversion  output: receives the database version
 *
 * NOTE(review): per the comment inside the function, the call is meant to be
 * rejected while this server is the sync site; the body of that branch, the
 * condition selecting the "no dbase" path, and the return statements are not
 * visible in this listing -- confirm.
 */
373 SDISK_GetVersion(register struct rx_call *rxcall,
374 register struct ubik_version *aversion)
376 register afs_int32 code;
378 if ((code = ubik_CheckAuth(rxcall))) {
383 * If we are the sync site, recovery shouldn't be running on any
384 * other site. We shouldn't be getting this RPC as long as we are
385 * the sync site. To prevent any unforseen activity, we should
386 * reject this RPC until we have recognized that we are not the
387 * sync site anymore, and/or if we have any pending WRITE
388 * transactions that have to complete. This way we can be assured
389 * that this RPC would not block any pending transactions that
390 * should either fail or pass. If we have recognized the fact that
391 * we are not the sync site any more, all write transactions would
392 * fail with UNOQUORUM anyway.
394 if (ubeacon_AmSyncSite()) {
399 code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
402 /* tell other side there's no dbase */
404 aversion->counter = 0;
/*!
 * \brief Stream one database file to the caller over the Rx call.
 *
 * Visible steps: authorize the call; obtain the file size with dbase->stat;
 * send that length first as a 32-bit value converted with htonl(); then
 * read the file in pieces of at most sizeof(tbuffer) bytes with dbase->read
 * and forward each piece with rx_Write(); finally fetch the file's label
 * into \p version with dbase->getlabel.  Read and write failures are logged
 * with ubik_dprint().
 *
 * \param rxcall   incoming Rx call; also the channel the data is written to
 * \param file     number of the database file to send
 * \param version  output: receives the label of the file
 *
 * NOTE(review): the declarations of tbuffer, tlen and length, the
 * assignment of dbase, the loop construct around the read/write pair, and
 * the return statements are not visible in this listing.
 */
410 SDISK_GetFile(register struct rx_call *rxcall, register afs_int32 file,
411 struct ubik_version *version)
413 register afs_int32 code;
414 register struct ubik_dbase *dbase;
415 register afs_int32 offset;
416 struct ubik_stat ubikstat;
421 if ((code = ubik_CheckAuth(rxcall))) {
424 /* temporarily disabled because it causes problems for migration tool. Hey, it's just
425 * a sanity check, anyway.
426 if (ubeacon_AmSyncSite()) {
432 code = (*dbase->stat) (dbase, file, &ubikstat);
437 length = ubikstat.size;
438 tlen = htonl(length);
439 code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
440 if (code != sizeof(afs_int32)) {
442 ubik_dprint("Rx-write length error=%d\n", code);
447 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
448 code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
451 ubik_dprint("read failed error=%d\n", code);
454 code = rx_Write(rxcall, tbuffer, tlen);
457 ubik_dprint("Rx-write length error=%d\n", code);
463 code = (*dbase->getlabel) (dbase, file, version); /* return the dbase, too */
/*!
 * \brief Receive a complete database file from the caller and install it.
 *
 * Visible steps:
 *  - authorize the call;
 *  - compare the caller's primary address (otherHost) with the value of
 *    uvote_GetSyncSite() and object only when a sync site is known and it is
 *    a different host;
 *  - abort active transactions with urecovery_AbortAll();
 *  - truncate the file and label it with a zeroed version so the in-transit
 *    database is marked invalid;
 *  - read the data from the Rx call in pieces of at most sizeof(tbuffer)
 *    bytes with rx_Read(), storing each piece either through dbase->write or
 *    with write() on a "<pathName>.DB<n>.TMP" file opened earlier;
 *  - sync, move the existing file to "<pathName>.DB<n>.OLD", rename the .TMP
 *    file into place and reopen the database;
 *  - label the file with \p avers, copy \p avers into ubik_dbase->version,
 *    invalidate disk buffers with udisk_Invalidate() and wake waiters on the
 *    database version;
 *  - on failure, relabel the file using the saved epoch and log the error.
 *
 * \param rxcall  incoming Rx call; also the channel the data is read from
 * \param file    number of the database file being replaced
 * \param length  number of bytes the caller will send
 * \param avers   version label to apply once the file has been received
 *
 * NOTE(review): several declarations (tbuffer, pbuffer, hoststr, tlen,
 * offset, fd, epoch), the receive loop construct, most branch bodies and the
 * exact preprocessor branches selecting between the dbase->write path and
 * the temporary-file path are not visible in this listing -- confirm the
 * control flow against the full file before relying on this summary.
 */
469 SDISK_SendFile(register struct rx_call *rxcall, afs_int32 file,
470 afs_int32 length, struct ubik_version *avers)
472 register afs_int32 code;
473 struct ubik_dbase *dbase = NULL;
476 struct ubik_version tversion;
478 struct rx_peer *tpeer;
479 struct rx_connection *tconn;
480 afs_uint32 otherHost = 0;
482 #ifndef OLD_URECOVERY
489 /* receive the file sent by the caller (the data is read with rx_Read below) */
491 if ((code = ubik_CheckAuth(rxcall))) {
495 /* next, we do a sanity check to see if the guy sending us the database is
496 * the guy we think is the sync site. It turns out that we might not have
497 * decided yet that someone's the sync site, but they could have enough
498 * votes from others to be sync site anyway, and could send us the database
499 * in advance of getting our votes. This is fine, what we're really trying
500 * to check is that some authenticated bogon isn't sending a random database
501 * into another configuration. This could happen on a bad configuration
502 * screwup. Thus, we only object if we're sure we know who the sync site
503 * is, and it ain't the guy talking to us.
505 offset = uvote_GetSyncSite();
506 tconn = rx_ConnectionOf(rxcall);
507 tpeer = rx_PeerOf(tconn);
508 otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
509 if (offset && offset != otherHost) {
510 /* we *know* this is the wrong guy */
518 /* abort any active trans that may scribble over the database */
519 urecovery_AbortAll(dbase);
521 ubik_print("Ubik: Synchronize database with server %s\n",
522 afs_inet_ntoa_r(otherHost, hoststr));
526 (*dbase->truncate) (dbase, file, 0); /* truncate first */
527 tversion.counter = 0;
531 tversion.epoch = 0; /* start off by labelling in-transit db as invalid */
532 (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */
533 #ifndef OLD_URECOVERY
535 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
536 fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
541 code = lseek(fd, HDRSIZE, 0);
542 if (code != HDRSIZE) {
548 memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
550 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
551 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
555 code = rx_Read(rxcall, tbuffer, tlen);
558 ubik_dprint("Rx-read length error=%d\n", code);
564 code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
566 code = write(fd, tbuffer, tlen);
571 ubik_dprint("write failed error=%d\n", code);
579 #ifndef OLD_URECOVERY
585 /* sync data first, then write label and resync (resync done by setlabel call).
586 * This way, good label is only on good database. */
588 (*ubik_dbase->sync) (dbase, file);
590 afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
592 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
593 code = unlink(pbuffer);
595 code = rename(tbuffer, pbuffer);
596 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
599 code = rename(pbuffer, tbuffer);
601 (*ubik_dbase->open) (ubik_dbase, 0);
603 code = (*ubik_dbase->setlabel) (dbase, file, avers);
604 #ifndef OLD_URECOVERY
607 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
611 memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
612 udisk_Invalidate(dbase, file); /* new dbase, flush disk buffers */
613 #ifdef AFS_PTHREAD_ENV
614 assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
616 LWP_NoYieldSignal(&dbase->version);
621 #ifndef OLD_URECOVERY
623 /* Failed to sync. Allow reads again for now. */
625 tversion.epoch = epoch;
626 (*dbase->setlabel) (dbase, file, &tversion);
630 ("Ubik: Synchronize database with server %s failed (error = %d)\n",
631 afs_inet_ntoa_r(otherHost, hoststr), code);
633 ubik_print("Ubik: Synchronize database completed\n");
/*!
 * \brief Probe RPC handler.
 *
 * \param rxcall  incoming Rx call
 *
 * NOTE(review): only the signature is visible in this listing; the body and
 * return value cannot be described from here -- confirm against the full file.
 */
640 SDISK_Probe(register struct rx_call *rxcall)
646 * \brief Update remote machines addresses in my server list
648 * Send back my addresses to caller of this RPC
649 * \return zero on success, else 1.
/*!
 * Exchanges interface address lists with a remote server.
 *
 * Visible steps:
 *  - copy this server's addresses (ubik_host[], converted with ntohl()) into
 *    \p outAddr;
 *  - look through ubik_servers for the entry whose addr[0] equals the
 *    caller's first address (inAddr->hostAddr[0] converted with htonl());
 *  - check that none of the caller's addresses equals an address belonging
 *    to a different entry in ubik_servers;
 *  - when there is no probable match, or a conflicting address was found,
 *    print "Inconsistent Cell Info from server: " followed by the caller's
 *    addresses;
 *  - otherwise store the caller's addresses 1..UBIK_MAX_INTERFACE_ADDR-1 in
 *    the matched entry (converted with htonl()) and print the resulting list.
 *
 * \param rxcall   incoming Rx call (not referenced in the visible lines)
 * \param inAddr   caller's interface addresses
 * \param outAddr  output: this server's interface addresses
 *
 * NOTE(review): the declaration of hoststr, the statements that set
 * probableMatch and found, and the return statements are not visible in this
 * listing.
 */
652 SDISK_UpdateInterfaceAddr(register struct rx_call *rxcall,
653 UbikInterfaceAddr *inAddr,
654 UbikInterfaceAddr *outAddr)
656 struct ubik_server *ts, *tmp;
657 afs_uint32 remoteAddr; /* in net byte order */
658 int i, j, found = 0, probableMatch = 0;
661 /* copy the output parameters */
662 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
663 outAddr->hostAddr[i] = ntohl(ubik_host[i]);
665 remoteAddr = htonl(inAddr->hostAddr[0]);
666 for (ts = ubik_servers; ts; ts = ts->next)
667 if (ts->addr[0] == remoteAddr) { /* both in net byte order */
673 /* verify that all addresses in the incoming RPC are
674 ** not part of other server entries in my CellServDB
676 for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
677 && inAddr->hostAddr[i]; i++) {
678 remoteAddr = htonl(inAddr->hostAddr[i]);
679 for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
680 if (ts == tmp) /* this is my server */
682 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
684 if (remoteAddr == tmp->addr[j]) {
692 /* if (probableMatch) */
693 /* inconsistent addresses in CellServDB */
694 if (!probableMatch || found) {
695 ubik_print("Inconsistent Cell Info from server: ");
696 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
697 ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
705 /* update our data structures */
706 for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
707 ts->addr[i] = htonl(inAddr->hostAddr[i]);
709 ubik_print("ubik: A Remote Server has addresses: ");
710 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
711 ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
/*!
 * \brief Print the local server list.
 *
 * Prints "Local CellServDB:" and then, for every entry in ubik_servers, a
 * "Server <j>: " prefix followed by each non-zero address of that entry (at
 * most UBIK_MAX_INTERFACE_ADDR of them), all through ubik_print().
 *
 * NOTE(review): the declarations and initial values of i, j and hoststr
 * are not visible in this listing.
 */
718 printServerInfo(void)
720 struct ubik_server *ts;
724 ubik_print("Local CellServDB:");
725 for (ts = ubik_servers; ts; ts = ts->next, j++) {
726 ubik_print("Server %d: ", j);
727 for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
728 ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
/*!
 * \brief Relabel the local database as part of the current remote write
 * transaction.
 *
 * Visible steps: authorize the call, require that ubik_currentTrans exists
 * and is of type UBIK_WRITETRANS, test ubeacon_AmSyncSite(), run
 * urecovery_CheckTid() on \p atid and re-test ubik_currentTrans.  If
 * \p oldversionp matches ubik_dbVersion in both epoch and counter, the label
 * of file 0 is set to \p newversionp through dbase->setlabel, and
 * ubik_dbase->version and ubik_dbVersion are assigned \p newversionp.
 *
 * \param rxcall       incoming Rx call, used for the authorization check
 * \param atid         transaction id supplied by the caller
 * \param oldversionp  version the caller expects this server to hold
 * \param newversionp  version to install
 *
 * NOTE(review): the declaration of code, the bodies of the failure
 * branches, any condition guarding the two version assignments, the handling
 * of a version mismatch, and the return statements are not visible in this
 * listing.
 */
734 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
735 struct ubik_version *oldversionp,
736 struct ubik_version *newversionp)
739 struct ubik_dbase *dbase;
741 if ((code = ubik_CheckAuth(rxcall))) {
745 if (!ubik_currentTrans) {
748 /* sanity check to make sure only write trans appear here */
749 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
753 /* Should not get this for the sync site */
754 if (ubeacon_AmSyncSite()) {
758 dbase = ubik_currentTrans->dbase;
760 urecovery_CheckTid(atid);
761 if (!ubik_currentTrans) {
766 /* Set the label if its version matches the sync-site's */
767 if ((oldversionp->epoch == ubik_dbVersion.epoch)
768 && (oldversionp->counter == ubik_dbVersion.counter)) {
769 code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
771 ubik_dbase->version = *newversionp;
772 ubik_dbVersion = *newversionp;