2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 #include <sys/types.h>
24 #include <netinet/in.h>
31 #include <afs/afsutil.h>
/* NOTE(review): presumably selects the internal declarations of the ubik
 * headers for this file -- confirm against the header that tests it. */
33 #define UBIK_INTERNALS
/* Optional security hook. When this pointer is non-null, ubik_CheckAuth
 * calls it with ubik_CheckRXSecurityRock and the incoming rx call and keeps
 * the value it returns as its result code. */
37 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
/* Opaque argument passed as the first parameter of ubik_CheckRXSecurityProc. */
38 void *ubik_CheckRXSecurityRock;
/* Forward declaration; the definition appears later in this file. */
40 static void printServerInfo(void);
43 * routines for handling requests remotely-submitted by the sync site. These are
44 * only write transactions (we don't propagate read trans), and there is at most one
45 * write transaction extant at any one time.
/* The remotely driven transaction currently open on this server, or 0 when
 * there is none. SDISK_Begin fills it in through udisk_begin; the other
 * SDISK_* transaction handlers in this file test it before acting, and
 * SDISK_ReleaseLocks and SDISK_Abort reset it to a null pointer. */
48 struct ubik_trans *ubik_currentTrans = 0;
/*
 * Authorization check used at the top of the SDISK_* handlers.
 *
 * acall is the incoming rx call. When the ubik_CheckRXSecurityProc hook is
 * set, it is invoked with ubik_CheckRXSecurityRock and acall, and its return
 * value is taken as the result code. Callers treat a non-zero result as a
 * failed check.
 */
51 ubik_CheckAuth(struct rx_call *acall)
54 if (ubik_CheckRXSecurityProc) {
55 code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
61 /* the rest of these guys handle remote execution of write
62 * transactions: this is the code executed on the other servers when a
63 * sync site is executing a write transaction.
/*
 * Begin the remote write transaction identified by atid.
 *
 * rxcall is checked with ubik_CheckAuth first. urecovery_CheckTid is then
 * called with atid and the flag 1, and udisk_begin opens a UBIK_WRITETRANS
 * transaction on ubik_dbase, storing it in ubik_currentTrans. When
 * udisk_begin reports no error and a transaction was stored, that
 * transaction's tid is set to the epoch and counter carried in atid.
 */
66 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
70 if ((code = ubik_CheckAuth(rxcall))) {
74 urecovery_CheckTid(atid, 1);
75 code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
76 if (!code && ubik_currentTrans) {
77 /* label this trans with the right trans id */
78 ubik_currentTrans->tid.epoch = atid->epoch;
79 ubik_currentTrans->tid.counter = atid->counter;
/*
 * Commit the current remote write transaction.
 *
 * Checks performed before the commit: ubik_CheckAuth on rxcall, a non-null
 * ubik_currentTrans, and a transaction type of UBIK_WRITETRANS. The
 * database's cache_lock is taken for writing, urecovery_CheckTid is called
 * with atid and the flag 0, and ubik_currentTrans is tested a second time
 * after that call (the cache lock is released on that path). The commit
 * itself is udisk_commit on ubik_currentTrans; the code also copies
 * ubik_dbase->version into ubik_dbVersion and releases the cache lock.
 */
87 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
90 struct ubik_dbase *dbase;
92 if ((code = ubik_CheckAuth(rxcall))) {
96 if (!ubik_currentTrans) {
100 * sanity check to make sure only write trans appear here
102 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
106 dbase = ubik_currentTrans->dbase;
108 ObtainWriteLock(&dbase->cache_lock);
112 urecovery_CheckTid(atid, 0);
113 if (!ubik_currentTrans) {
115 ReleaseWriteLock(&dbase->cache_lock);
119 code = udisk_commit(ubik_currentTrans);
121 /* sync site should now match */
122 ubik_dbVersion = ubik_dbase->version;
125 ReleaseWriteLock(&dbase->cache_lock);
/*
 * Finish the current remote write transaction and forget it.
 *
 * After ubik_CheckAuth, the handler requires a non-null ubik_currentTrans of
 * type UBIK_WRITETRANS, calls urecovery_CheckTid with atid and the flag 0,
 * and tests ubik_currentTrans again. udisk_end is called on the transaction
 * only when its locktype is not LOCKWAIT, and ubik_currentTrans is reset to
 * a null pointer.
 */
130 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
132 struct ubik_dbase *dbase;
135 if ((code = ubik_CheckAuth(rxcall))) {
139 if (!ubik_currentTrans) {
142 /* sanity check to make sure only write trans appear here */
143 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
147 dbase = ubik_currentTrans->dbase;
149 urecovery_CheckTid(atid, 0);
150 if (!ubik_currentTrans) {
155 /* If the thread is not waiting for lock - ok to end it */
156 if (ubik_currentTrans->locktype != LOCKWAIT) {
157 udisk_end(ubik_currentTrans);
159 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * Abort the current remote write transaction.
 *
 * After ubik_CheckAuth, the handler requires a non-null ubik_currentTrans of
 * type UBIK_WRITETRANS, calls urecovery_CheckTid with atid and the flag 0,
 * and tests ubik_currentTrans again. The result code comes from udisk_abort
 * on the transaction. udisk_end is called afterwards only when the
 * transaction's locktype is not LOCKWAIT, and ubik_currentTrans is reset to
 * a null pointer.
 */
165 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
168 struct ubik_dbase *dbase;
170 if ((code = ubik_CheckAuth(rxcall))) {
174 if (!ubik_currentTrans) {
177 /* sanity check to make sure only write trans appear here */
178 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
182 dbase = ubik_currentTrans->dbase;
184 urecovery_CheckTid(atid, 0);
185 if (!ubik_currentTrans) {
190 code = udisk_abort(ubik_currentTrans);
191 /* If the thread is not waiting for lock - ok to end it */
192 if (ubik_currentTrans->locktype != LOCKWAIT) {
193 udisk_end(ubik_currentTrans);
195 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * Acquire a lock of type atype for the current remote write transaction.
 *
 * After ubik_CheckAuth, the handler requires a non-null ubik_currentTrans of
 * type UBIK_WRITETRANS, calls urecovery_CheckTid with atid and the flag 0,
 * and tests ubik_currentTrans again. The transaction pointer is saved in
 * ubik_thisTrans before ulock_getLock(ubik_currentTrans, atype, 1) is
 * called. If that call succeeds but ubik_currentTrans no longer equals the
 * saved pointer, the saved transaction is ended with udisk_end.
 */
200 /* apos and alen are not used */
202 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
203 afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
206 struct ubik_dbase *dbase;
207 struct ubik_trans *ubik_thisTrans;
209 if ((code = ubik_CheckAuth(rxcall))) {
212 if (!ubik_currentTrans) {
215 /* sanity check to make sure only write trans appear here */
216 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
222 dbase = ubik_currentTrans->dbase;
224 urecovery_CheckTid(atid, 0);
225 if (!ubik_currentTrans) {
230 ubik_thisTrans = ubik_currentTrans;
231 code = ulock_getLock(ubik_currentTrans, atype, 1);
233 /* While waiting, the transaction may have been ended/
234 * aborted from under us (urecovery_CheckTid). In that
235 * case, end the transaction here.
237 if (!code && (ubik_currentTrans != ubik_thisTrans)) {
238 udisk_end(ubik_thisTrans);
247 * \brief Write a vector of data
/*
 * Apply a vector of writes to the current remote write transaction.
 *
 * io_vector holds the write descriptors (file, position, length) and
 * io_buffer holds the bytes for all of them back to back. After
 * ubik_CheckAuth, the handler requires a non-null ubik_currentTrans of type
 * UBIK_WRITETRANS, calls urecovery_CheckTid with atid and the flag 0, and
 * tests ubik_currentTrans again. It then walks the iovec_wrt_len entries
 * with a running offset into the buffer: each entry is first checked so that
 * offset + length does not exceed iovec_buf_len, then written with
 * udisk_write from &iobuf[offset], and offset advances by the entry's length.
 *
 * NOTE(review): offset is an afs_int32 and the bound check is a plain sum;
 * a negative or very large length in an entry does not look like it is
 * rejected by the check visible here -- confirm it is validated elsewhere.
 */
250 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
251 iovec_wrt *io_vector, iovec_buf *io_buffer)
253 afs_int32 code, i, offset;
254 struct ubik_dbase *dbase;
255 struct ubik_iovec *iovec;
258 if ((code = ubik_CheckAuth(rxcall))) {
261 if (!ubik_currentTrans) {
264 /* sanity check to make sure only write trans appear here */
265 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
269 dbase = ubik_currentTrans->dbase;
271 urecovery_CheckTid(atid, 0);
272 if (!ubik_currentTrans) {
277 iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
278 iobuf = (char *)io_buffer->iovec_buf_val;
279 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
280 /* Sanity check for going off end of buffer */
281 if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
285 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
286 iovec[i].position, iovec[i].length);
291 offset += iovec[i].length;
/*
 * Apply a single write to the current remote write transaction.
 *
 * After ubik_CheckAuth, the handler requires a non-null ubik_currentTrans of
 * type UBIK_WRITETRANS, calls urecovery_CheckTid with atid and the flag 0,
 * and tests ubik_currentTrans again. The write is udisk_write on file afile
 * at position apos, taking bulkdata_len bytes from adata->bulkdata_val.
 */
299 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
300 afs_int32 afile, afs_int32 apos, bulkdata *adata)
303 struct ubik_dbase *dbase;
305 if ((code = ubik_CheckAuth(rxcall))) {
308 if (!ubik_currentTrans) {
311 /* sanity check to make sure only write trans appear here */
312 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
316 dbase = ubik_currentTrans->dbase;
318 urecovery_CheckTid(atid, 0);
319 if (!ubik_currentTrans) {
324 udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
325 adata->bulkdata_len);
/*
 * Truncate file afile to alen within the current remote write transaction.
 *
 * After ubik_CheckAuth, the handler requires a non-null ubik_currentTrans of
 * type UBIK_WRITETRANS, calls urecovery_CheckTid with atid and the flag 0,
 * and tests ubik_currentTrans again. The result code comes from
 * udisk_truncate(ubik_currentTrans, afile, alen).
 */
331 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
332 afs_int32 afile, afs_int32 alen)
335 struct ubik_dbase *dbase;
337 if ((code = ubik_CheckAuth(rxcall))) {
340 if (!ubik_currentTrans) {
343 /* sanity check to make sure only write trans appear here */
344 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
348 dbase = ubik_currentTrans->dbase;
350 urecovery_CheckTid(atid, 0);
351 if (!ubik_currentTrans) {
355 code = udisk_truncate(ubik_currentTrans, afile, alen);
/*
 * Report this server's database version label to the caller.
 *
 * rxcall is checked with ubik_CheckAuth. The handler then tests
 * ubeacon_AmSyncSite(); the comment below explains that the RPC is meant to
 * be rejected while this server still considers itself the sync site.
 * Otherwise the label of file 0 is read into *aversion through the
 * database's getlabel operation. On the path marked "tell other side
 * there's no dbase", aversion->counter is set to 0.
 */
361 SDISK_GetVersion(struct rx_call *rxcall,
362 struct ubik_version *aversion)
366 if ((code = ubik_CheckAuth(rxcall))) {
371 * If we are the sync site, recovery shouldn't be running on any
372 * other site. We shouldn't be getting this RPC as long as we are
373 * the sync site. To prevent any unforseen activity, we should
374 * reject this RPC until we have recognized that we are not the
375 * sync site anymore, and/or if we have any pending WRITE
376 * transactions that have to complete. This way we can be assured
377 * that this RPC would not block any pending transactions that
378 * should either fail or pass. If we have recognized the fact that
379 * we are not the sync site any more, all write transactions would
380 * fail with UNOQUORUM anyway.
382 if (ubeacon_AmSyncSite()) {
387 code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
390 /* tell other side there's no dbase */
392 aversion->counter = 0;
/*
 * Stream database file `file` to the caller over rxcall.
 *
 * rxcall is checked with ubik_CheckAuth. The file's size is obtained with
 * the database's stat operation and sent first as a 4-byte value in network
 * byte order (htonl) via rx_Write; a short write is logged with ubik_dprint.
 * File data is then read through the database's read operation in pieces of
 * at most sizeof(tbuffer) bytes and passed to rx_Write, with read and write
 * failures logged the same way. Finally the file's label is fetched into
 * *version with the getlabel operation.
 */
398 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
399 struct ubik_version *version)
402 struct ubik_dbase *dbase;
404 struct ubik_stat ubikstat;
409 if ((code = ubik_CheckAuth(rxcall))) {
412 /* temporarily disabled because it causes problems for migration tool. Hey, it's just
413 * a sanity check, anyway.
414 if (ubeacon_AmSyncSite()) {
420 code = (*dbase->stat) (dbase, file, &ubikstat);
425 length = ubikstat.size;
426 tlen = htonl(length);
427 code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
428 if (code != sizeof(afs_int32)) {
430 ubik_dprint("Rx-write length error=%d\n", code);
435 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
436 code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
439 ubik_dprint("read failed error=%d\n", code);
442 code = rx_Write(rxcall, tbuffer, tlen);
445 ubik_dprint("Rx-write length error=%d\n", code);
451 code = (*dbase->getlabel) (dbase, file, version); /* return the dbase, too */
/*
 * Receive a complete copy of database file `file` (length bytes, version
 * *avers) from the caller and install it in place of the local copy.
 *
 * Steps visible in this handler, in order:
 *  - ubik_CheckAuth on rxcall.
 *  - The caller's address (primary interface address of the rx peer) is
 *    compared with uvote_GetSyncSite(); a known sync site that differs from
 *    the caller is treated as the wrong sender.
 *  - urecovery_AbortAll stops active transactions, and the local file is
 *    labelled with an epoch-0 version through setlabel while the transfer
 *    is in progress.
 *  - Incoming data is written to "<pathName>.DB[SYS]<n>.TMP", created with
 *    mode 0600 and positioned at HDRSIZE, in pieces of at most
 *    sizeof(tbuffer) bytes read with rx_Read; read and write failures are
 *    logged with ubik_dprint.
 *  - The existing database file is renamed to ".OLD" (after unlinking any
 *    previous ".OLD"), the ".TMP" file is renamed into its place, the file
 *    is reopened, and setlabel writes *avers as its label.
 *  - *avers is copied into ubik_dbase->version, cached buffers are dropped
 *    with udisk_Invalidate, and waiters on the version are woken
 *    (pthread_cond_broadcast under AFS_PTHREAD_ENV, LWP_NoYieldSignal
 *    otherwise).
 *  - On failure the label is rewritten from tversion with the saved epoch
 *    and a failure message naming the peer is printed; otherwise a
 *    completion message is printed.
 *
 * NOTE(review): `epoch` is assigned 0 in the same statement that zeroes
 * tversion.epoch, so the failure path's `tversion.epoch = epoch` appears to
 * relabel with epoch 0 unless epoch is set elsewhere -- confirm.
 */
457 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
458 afs_int32 length, struct ubik_version *avers)
461 struct ubik_dbase *dbase = NULL;
464 struct ubik_version tversion;
466 struct rx_peer *tpeer;
467 struct rx_connection *tconn;
468 afs_uint32 otherHost = 0;
475 /* send the file back to the requester */
477 if ((code = ubik_CheckAuth(rxcall))) {
481 /* next, we do a sanity check to see if the guy sending us the database is
482 * the guy we think is the sync site. It turns out that we might not have
483 * decided yet that someone's the sync site, but they could have enough
484 * votes from others to be sync site anyway, and could send us the database
485 * in advance of getting our votes. This is fine, what we're really trying
486 * to check is that some authenticated bogon isn't sending a random database
487 * into another configuration. This could happen on a bad configuration
488 * screwup. Thus, we only object if we're sure we know who the sync site
489 * is, and it ain't the guy talking to us.
491 offset = uvote_GetSyncSite();
492 tconn = rx_ConnectionOf(rxcall);
493 tpeer = rx_PeerOf(tconn);
494 otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
495 if (offset && offset != otherHost) {
496 /* we *know* this is the wrong guy */
504 /* abort any active trans that may scribble over the database */
505 urecovery_AbortAll(dbase);
507 ubik_print("Ubik: Synchronize database with server %s\n",
508 afs_inet_ntoa_r(otherHost, hoststr));
511 epoch = tversion.epoch = 0; /* start off by labelling in-transit db as invalid */
512 (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */
513 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
514 fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
519 code = lseek(fd, HDRSIZE, 0);
520 if (code != HDRSIZE) {
525 memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
527 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
528 #if !defined(AFS_PTHREAD_ENV)
532 code = rx_Read(rxcall, tbuffer, tlen);
535 ubik_dprint("Rx-read length error=%d\n", code);
540 code = write(fd, tbuffer, tlen);
544 ubik_dprint("write failed error=%d\n", code);
556 /* sync data first, then write label and resync (resync done by setlabel call).
557 * This way, good label is only on good database. */
558 afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
560 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
561 code = unlink(pbuffer);
563 code = rename(tbuffer, pbuffer);
564 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
567 code = rename(pbuffer, tbuffer);
569 (*ubik_dbase->open) (ubik_dbase, file);
570 code = (*ubik_dbase->setlabel) (dbase, file, avers);
573 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
576 memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
577 udisk_Invalidate(dbase, file); /* new dbase, flush disk buffers */
578 #ifdef AFS_PTHREAD_ENV
579 assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
581 LWP_NoYieldSignal(&dbase->version);
587 /* Failed to sync. Allow reads again for now. */
589 tversion.epoch = epoch;
590 (*dbase->setlabel) (dbase, file, &tversion);
593 ("Ubik: Synchronize database with server %s failed (error = %d)\n",
594 afs_inet_ntoa_r(otherHost, hoststr), code);
596 ubik_print("Ubik: Synchronize database completed\n");
603 SDISK_Probe(struct rx_call *rxcall)
609 * \brief Update remote machines addresses in my server list
611 * Send back my addresses to caller of this RPC
612 * \return zero on success, else 1.
/*
 * Exchange interface address lists with a peer server.
 *
 * inAddr carries the caller's addresses and outAddr receives this server's.
 * outAddr->hostAddr[] is filled from ubik_host[] converted with ntohl. The
 * caller's first address (converted with htonl) is looked up among the
 * addr[0] fields of the ubik_servers list to find its entry, ts. Each
 * non-zero incoming address is then compared against the addresses of every
 * other entry in that list. When `!probableMatch || found` holds, the
 * incoming addresses are printed after "Inconsistent Cell Info from
 * server: ". Otherwise ts->addr[1..] is overwritten with the incoming
 * addresses in network byte order and the resulting non-zero addresses are
 * printed.
 */
615 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
616 UbikInterfaceAddr *inAddr,
617 UbikInterfaceAddr *outAddr)
619 struct ubik_server *ts, *tmp;
620 afs_uint32 remoteAddr; /* in net byte order */
621 int i, j, found = 0, probableMatch = 0;
624 /* copy the output parameters */
625 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
626 outAddr->hostAddr[i] = ntohl(ubik_host[i]);
628 remoteAddr = htonl(inAddr->hostAddr[0]);
629 for (ts = ubik_servers; ts; ts = ts->next)
630 if (ts->addr[0] == remoteAddr) { /* both in net byte order */
636 /* verify that all addresses in the incoming RPC are
637 ** not part of other server entries in my CellServDB
639 for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
640 && inAddr->hostAddr[i]; i++) {
641 remoteAddr = htonl(inAddr->hostAddr[i]);
642 for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
643 if (ts == tmp) /* this is my server */
645 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
647 if (remoteAddr == tmp->addr[j]) {
655 /* if (probableMatch) */
656 /* inconsistent addresses in CellServDB */
657 if (!probableMatch || found) {
658 ubik_print("Inconsistent Cell Info from server: ");
659 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
660 ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
668 /* update our data structures */
669 for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
670 ts->addr[i] = htonl(inAddr->hostAddr[i]);
672 ubik_print("ubik: A Remote Server has addresses: ");
673 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
674 ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
/*
 * Print the server list held in ubik_servers.
 *
 * Emits "Local CellServDB:" and then, for each entry in list order, a
 * "Server <j>: " prefix followed by the entry's addresses up to the first
 * zero slot (at most UBIK_MAX_INTERFACE_ADDR), formatted with
 * afs_inet_ntoa_r. All output goes through ubik_print.
 */
681 printServerInfo(void)
683 struct ubik_server *ts;
687 ubik_print("Local CellServDB:");
688 for (ts = ubik_servers; ts; ts = ts->next, j++) {
689 ubik_print("Server %d: ", j);
690 for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
691 ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
/*
 * Relabel the local database from *oldversionp to *newversionp as part of
 * the current remote write transaction.
 *
 * After ubik_CheckAuth, the handler requires a non-null ubik_currentTrans of
 * type UBIK_WRITETRANS, tests ubeacon_AmSyncSite() (the request is not
 * expected on the sync site), calls urecovery_CheckTid with atid and the
 * flag 0, and tests ubik_currentTrans again. The new label is written with
 * setlabel on file 0 only when both the epoch and the counter of
 * *oldversionp equal those of ubik_dbVersion; ubik_dbase->version and
 * ubik_dbVersion are then set to *newversionp.
 */
697 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
698 struct ubik_version *oldversionp,
699 struct ubik_version *newversionp)
702 struct ubik_dbase *dbase;
704 if ((code = ubik_CheckAuth(rxcall))) {
708 if (!ubik_currentTrans) {
711 /* sanity check to make sure only write trans appear here */
712 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
716 /* Should not get this for the sync site */
717 if (ubeacon_AmSyncSite()) {
721 dbase = ubik_currentTrans->dbase;
723 urecovery_CheckTid(atid, 0);
724 if (!ubik_currentTrans) {
729 /* Set the label if its version matches the sync-site's */
730 if ((oldversionp->epoch == ubik_dbVersion.epoch)
731 && (oldversionp->counter == ubik_dbVersion.counter)) {
732 code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
734 ubik_dbase->version = *newversionp;
735 ubik_dbVersion = *newversionp;