2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
14 #include <sys/types.h>
23 #include <netinet/in.h>
30 #include <afs/afsutil.h>
32 #define UBIK_INTERNALS
/*
 * Optional security hook. When this pointer is non-NULL, ubik_CheckAuth()
 * calls it with ubik_CheckRXSecurityRock and the incoming rx_call and uses
 * the value it returns as the result of the check.
 */
36 int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
/* Opaque argument passed back to ubik_CheckRXSecurityProc as its first parameter. */
37 void *ubik_CheckRXSecurityRock;
/* Forward declaration; printServerInfo() is defined further down in this file. */
39 static void printServerInfo(void);
42 * routines for handling requests remotely-submitted by the sync site. These are
43 * only write transactions (we don't propagate read trans), and there is at most one
44 * write transaction extant at any one time.
/*
 * The write transaction currently open on this server on behalf of the sync
 * site, or 0 when none is active. SDISK_Begin stores the transaction here;
 * the other SDISK_* write handlers test it before doing any work, and
 * SDISK_ReleaseLocks / SDISK_Abort reset it to 0.
 */
47 struct ubik_trans *ubik_currentTrans = 0;
/*
 * Authorization gate called at the top of the SDISK_* handlers in this file.
 *
 * acall: the incoming Rx call to be checked.
 *
 * When a ubik_CheckRXSecurityProc hook is installed, it is invoked with
 * ubik_CheckRXSecurityRock and acall and its result is stored in code.
 * Callers branch on a non-zero result of this function.
 *
 * NOTE(review): the return type, the declaration of code, the return
 * statements and the behaviour when no hook is installed are not visible in
 * this excerpt -- confirm against the full file.
 */
50 ubik_CheckAuth(struct rx_call *acall)
53 if (ubik_CheckRXSecurityProc) {
54 code = (*ubik_CheckRXSecurityProc) (ubik_CheckRXSecurityRock, acall);
60 /* the rest of these guys handle remote execution of write
61 * transactions: this is the code executed on the other servers when a
62 * sync site is executing a write transaction.
/*
 * Handler for the request to begin a remote write transaction.
 *
 * rxcall: incoming Rx call, passed to ubik_CheckAuth().
 * atid:   transaction id supplied by the caller; its epoch and counter are
 *         copied onto the newly created transaction.
 *
 * After the authorization check, urecovery_CheckTid(atid, 1) is called and
 * udisk_begin() opens a UBIK_WRITETRANS on ubik_dbase, storing the result in
 * ubik_currentTrans. Only when udisk_begin() returned 0 and produced a
 * transaction is that transaction labelled with the caller's tid.
 *
 * NOTE(review): the body of the auth-failure branch, any locking and the
 * return statements are not visible in this excerpt.
 */
65 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
69 if ((code = ubik_CheckAuth(rxcall))) {
73 urecovery_CheckTid(atid, 1);
74 code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
75 if (!code && ubik_currentTrans) {
76 /* label this trans with the right trans id */
77 ubik_currentTrans->tid.epoch = atid->epoch;
78 ubik_currentTrans->tid.counter = atid->counter;
/*
 * Handler for the request to commit the current remote write transaction.
 *
 * rxcall: incoming Rx call, passed to ubik_CheckAuth().
 * atid:   transaction id supplied by the caller, handed to
 *         urecovery_CheckTid().
 *
 * Visible sequence: authorization check; require a current transaction of
 * type UBIK_WRITETRANS; take the database's cache_lock for writing; run
 * urecovery_CheckTid(atid, 0); re-test ubik_currentTrans (releasing the
 * lock if it is gone); udisk_commit(); copy ubik_dbase->version into
 * ubik_dbVersion; release cache_lock.
 *
 * NOTE(review): the bodies of the failure branches, the condition (if any)
 * guarding the ubik_dbVersion update and the return statements are not
 * visible in this excerpt.
 */
86 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
89 struct ubik_dbase *dbase;
91 if ((code = ubik_CheckAuth(rxcall))) {
95 if (!ubik_currentTrans) {
99 * sanity check to make sure only write trans appear here
101 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
/* remember the database now: ubik_currentTrans is re-tested after urecovery_CheckTid() */
105 dbase = ubik_currentTrans->dbase;
/* cache_lock is held across the tid check and the commit itself */
107 ObtainWriteLock(&dbase->cache_lock);
111 urecovery_CheckTid(atid, 0);
112 if (!ubik_currentTrans) {
114 ReleaseWriteLock(&dbase->cache_lock);
118 code = udisk_commit(ubik_currentTrans);
120 /* sync site should now match */
121 ubik_dbVersion = ubik_dbase->version;
124 ReleaseWriteLock(&dbase->cache_lock);
/*
 * Handler for the request to release the locks of the current remote write
 * transaction, which ends that transaction on this server.
 *
 * rxcall: incoming Rx call, passed to ubik_CheckAuth().
 * atid:   transaction id supplied by the caller, handed to
 *         urecovery_CheckTid().
 *
 * Visible sequence: authorization check; require a current transaction of
 * type UBIK_WRITETRANS; urecovery_CheckTid(atid, 0); re-test
 * ubik_currentTrans; call udisk_end() on it; reset ubik_currentTrans to 0.
 * When UBIK_PAUSE is not defined, udisk_end() is only called if the
 * transaction's locktype is not LOCKWAIT; with UBIK_PAUSE defined the call
 * is unconditional.
 *
 * NOTE(review): the bodies of the failure branches, any use of dbase beyond
 * the assignment and the return statements are not visible in this excerpt.
 */
129 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
131 struct ubik_dbase *dbase;
134 if ((code = ubik_CheckAuth(rxcall))) {
138 if (!ubik_currentTrans) {
141 /* sanity check to make sure only write trans appear here */
142 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
146 dbase = ubik_currentTrans->dbase;
148 urecovery_CheckTid(atid, 0);
149 if (!ubik_currentTrans) {
154 /* If the thread is not waiting for lock - ok to end it */
155 #if !defined(UBIK_PAUSE)
156 if (ubik_currentTrans->locktype != LOCKWAIT) {
157 #endif /* UBIK_PAUSE */
158 udisk_end(ubik_currentTrans);
159 #if !defined(UBIK_PAUSE)
161 #endif /* UBIK_PAUSE */
/* the global pointer is cleared whether or not udisk_end() was called above */
162 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * Handler for the request to abort the current remote write transaction.
 *
 * rxcall: incoming Rx call, passed to ubik_CheckAuth().
 * atid:   transaction id supplied by the caller, handed to
 *         urecovery_CheckTid().
 *
 * Visible sequence: authorization check; require a current transaction of
 * type UBIK_WRITETRANS; urecovery_CheckTid(atid, 0); re-test
 * ubik_currentTrans; udisk_abort() (its result is kept in code); then
 * udisk_end() and reset of ubik_currentTrans to 0. When UBIK_PAUSE is not
 * defined, udisk_end() is only called if the transaction's locktype is not
 * LOCKWAIT; with UBIK_PAUSE defined the call is unconditional.
 *
 * NOTE(review): the bodies of the failure branches and the return
 * statements are not visible in this excerpt.
 */
168 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
171 struct ubik_dbase *dbase;
173 if ((code = ubik_CheckAuth(rxcall))) {
177 if (!ubik_currentTrans) {
180 /* sanity check to make sure only write trans appear here */
181 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
185 dbase = ubik_currentTrans->dbase;
187 urecovery_CheckTid(atid, 0);
188 if (!ubik_currentTrans) {
193 code = udisk_abort(ubik_currentTrans);
194 /* If the thread is not waiting for lock - ok to end it */
195 #if !defined(UBIK_PAUSE)
196 if (ubik_currentTrans->locktype != LOCKWAIT) {
197 #endif /* UBIK_PAUSE */
198 udisk_end(ubik_currentTrans);
199 #if !defined(UBIK_PAUSE)
201 #endif /* UBIK_PAUSE */
202 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * Handler for the request to take a lock for the current remote write
 * transaction.
 *
 * rxcall: incoming Rx call, passed to ubik_CheckAuth().
 * atid:   transaction id supplied by the caller, handed to
 *         urecovery_CheckTid().
 * atype:  lock type, passed through to ulock_getLock().
 * afile, apos, alen: not referenced in the visible lines (the existing
 *         comment states apos and alen are unused).
 *
 * Visible sequence: authorization check; require a current transaction of
 * type UBIK_WRITETRANS; urecovery_CheckTid(atid, 0); re-test
 * ubik_currentTrans; remember the transaction in ubik_thisTrans and call
 * ulock_getLock(ubik_currentTrans, atype, 1). If that call returned 0 but
 * ubik_currentTrans no longer equals the remembered pointer, the remembered
 * transaction is ended with udisk_end().
 *
 * NOTE(review): the bodies of the failure branches and the return
 * statements are not visible in this excerpt.
 */
207 /* apos and alen are not used */
209 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
210 afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
213 struct ubik_dbase *dbase;
214 struct ubik_trans *ubik_thisTrans;
216 if ((code = ubik_CheckAuth(rxcall))) {
219 if (!ubik_currentTrans) {
222 /* sanity check to make sure only write trans appear here */
223 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
229 dbase = ubik_currentTrans->dbase;
231 urecovery_CheckTid(atid, 0);
232 if (!ubik_currentTrans) {
237 ubik_thisTrans = ubik_currentTrans;
238 code = ulock_getLock(ubik_currentTrans, atype, 1);
240 /* While waiting, the transaction may have been ended/
241 * aborted from under us (urecovery_CheckTid). In that
242 * case, end the transaction here.
244 if (!code && (ubik_currentTrans != ubik_thisTrans)) {
245 udisk_end(ubik_thisTrans);
254 * \brief Write a vector of data
/*
 * Handler that applies a vector of writes to the current remote write
 * transaction.
 *
 * rxcall:    incoming Rx call, passed to ubik_CheckAuth().
 * atid:      transaction id supplied by the caller, handed to
 *            urecovery_CheckTid().
 * io_vector: iovec_wrt_len entries (iovec_wrt_val), each read here as a
 *            struct ubik_iovec giving file, position and length.
 * io_buffer: iovec_buf_len bytes (iovec_buf_val) holding the data for all
 *            entries back to back, in entry order.
 *
 * After the usual checks (authorization, current UBIK_WRITETRANS present,
 * urecovery_CheckTid, re-test), the loop walks the entries while keeping a
 * running offset into the data buffer. Each entry is first checked so that
 * offset + length does not exceed the buffer length, then written with
 * udisk_write(), and offset is advanced by the entry's length.
 *
 * NOTE(review): the declaration of iobuf, what happens when the bounds
 * check fails or udisk_write() reports an error, and the return statements
 * are not visible in this excerpt.
 */
257 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
258 iovec_wrt *io_vector, iovec_buf *io_buffer)
260 afs_int32 code, i, offset;
261 struct ubik_dbase *dbase;
262 struct ubik_iovec *iovec;
265 if ((code = ubik_CheckAuth(rxcall))) {
268 if (!ubik_currentTrans) {
271 /* sanity check to make sure only write trans appear here */
272 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
276 dbase = ubik_currentTrans->dbase;
278 urecovery_CheckTid(atid, 0);
279 if (!ubik_currentTrans) {
284 iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
285 iobuf = (char *)io_buffer->iovec_buf_val;
/* offset tracks how much of the shared data buffer earlier entries consumed */
286 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
287 /* Sanity check for going off end of buffer */
288 if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
292 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
293 iovec[i].position, iovec[i].length);
298 offset += iovec[i].length;
/*
 * Handler that applies a single write to the current remote write
 * transaction.
 *
 * rxcall: incoming Rx call, passed to ubik_CheckAuth().
 * atid:   transaction id supplied by the caller, handed to
 *         urecovery_CheckTid().
 * afile:  file argument passed to udisk_write().
 * apos:   position argument passed to udisk_write().
 * adata:  data to write; bulkdata_len bytes starting at bulkdata_val.
 *
 * After the usual checks (authorization, current UBIK_WRITETRANS present,
 * urecovery_CheckTid, re-test) the data is handed to udisk_write() on
 * ubik_currentTrans.
 *
 * NOTE(review): how the udisk_write() result is captured, the bodies of the
 * failure branches and the return statements are not visible in this
 * excerpt.
 */
306 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
307 afs_int32 afile, afs_int32 apos, bulkdata *adata)
310 struct ubik_dbase *dbase;
312 if ((code = ubik_CheckAuth(rxcall))) {
315 if (!ubik_currentTrans) {
318 /* sanity check to make sure only write trans appear here */
319 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
323 dbase = ubik_currentTrans->dbase;
325 urecovery_CheckTid(atid, 0);
326 if (!ubik_currentTrans) {
331 udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
332 adata->bulkdata_len);
/*
 * Handler that truncates a file within the current remote write
 * transaction.
 *
 * rxcall: incoming Rx call, passed to ubik_CheckAuth().
 * atid:   transaction id supplied by the caller, handed to
 *         urecovery_CheckTid().
 * afile:  file argument passed to udisk_truncate().
 * alen:   length argument passed to udisk_truncate().
 *
 * After the usual checks (authorization, current UBIK_WRITETRANS present,
 * urecovery_CheckTid, re-test) the request is forwarded to
 * udisk_truncate() on ubik_currentTrans and its result kept in code.
 *
 * NOTE(review): the bodies of the failure branches and the return
 * statements are not visible in this excerpt.
 */
338 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
339 afs_int32 afile, afs_int32 alen)
342 struct ubik_dbase *dbase;
344 if ((code = ubik_CheckAuth(rxcall))) {
347 if (!ubik_currentTrans) {
350 /* sanity check to make sure only write trans appear here */
351 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
355 dbase = ubik_currentTrans->dbase;
357 urecovery_CheckTid(atid, 0);
358 if (!ubik_currentTrans) {
362 code = udisk_truncate(ubik_currentTrans, afile, alen);
/*
 * Handler that reports this server's database version to the caller.
 *
 * rxcall:   incoming Rx call, passed to ubik_CheckAuth().
 * aversion: output; filled in by the database's getlabel operation for
 *           file 0.
 *
 * Visible sequence: authorization check; a test of ubeacon_AmSyncSite()
 * (the existing comment explains that the RPC should be rejected while this
 * server is the sync site); then (*ubik_dbase->getlabel)(ubik_dbase, 0,
 * aversion). On the path commented "tell other side there's no dbase",
 * aversion->counter is set to 0.
 *
 * NOTE(review): the body of the sync-site branch, the condition leading to
 * the "no dbase" path, any other fields set there, locking and the return
 * statements are not visible in this excerpt.
 */
368 SDISK_GetVersion(struct rx_call *rxcall,
369 struct ubik_version *aversion)
373 if ((code = ubik_CheckAuth(rxcall))) {
378 * If we are the sync site, recovery shouldn't be running on any
379 * other site. We shouldn't be getting this RPC as long as we are
380 * the sync site. To prevent any unforseen activity, we should
381 * reject this RPC until we have recognized that we are not the
382 * sync site anymore, and/or if we have any pending WRITE
383 * transactions that have to complete. This way we can be assured
384 * that this RPC would not block any pending transactions that
385 * should either fail or pass. If we have recognized the fact that
386 * we are not the sync site any more, all write transactions would
387 * fail with UNOQUORUM anyway.
389 if (ubeacon_AmSyncSite()) {
394 code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
397 /* tell other side there's no dbase */
399 aversion->counter = 0;
/*
 * Handler that streams a database file to the caller over the Rx call.
 *
 * rxcall:  incoming Rx call; also the stream the data is written to.
 * file:    file argument passed to the database stat/read/getlabel
 *          operations.
 * version: output; filled in by the database's getlabel operation after
 *          the data has been sent.
 *
 * Visible sequence: authorization check; (*dbase->stat) to learn the file
 * size; the size is converted with htonl() and sent first as one afs_int32
 * via rx_Write(), with a short write reported through ubik_dprint(). The
 * contents are then read with (*dbase->read) in pieces of at most
 * sizeof(tbuffer) bytes and forwarded with rx_Write(), read and write
 * failures being reported through ubik_dprint(). Finally (*dbase->getlabel)
 * stores the file's version in *version.
 *
 * NOTE(review): the assignment of dbase, the declarations of tbuffer,
 * length, offset and tlen, the loop that advances offset, locking, the
 * error exits and the return statements are not visible in this excerpt.
 */
405 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
406 struct ubik_version *version)
409 struct ubik_dbase *dbase;
411 struct ubik_stat ubikstat;
416 if ((code = ubik_CheckAuth(rxcall))) {
419 /* temporarily disabled because it causes problems for migration tool. Hey, it's just
420 * a sanity check, anyway.
421 if (ubeacon_AmSyncSite()) {
427 code = (*dbase->stat) (dbase, file, &ubikstat);
432 length = ubikstat.size;
433 tlen = htonl(length);
434 code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
435 if (code != sizeof(afs_int32)) {
437 ubik_dprint("Rx-write length error=%d\n", code);
442 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
443 code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
446 ubik_dprint("read failed error=%d\n", code);
449 code = rx_Write(rxcall, tbuffer, tlen);
452 ubik_dprint("Rx-write length error=%d\n", code);
458 code = (*dbase->getlabel) (dbase, file, version); /* return the dbase, too */
/*
 * Handler that receives a complete database file from the caller and
 * installs it on this server.
 *
 * rxcall: incoming Rx call; also the stream the file data is read from.
 * file:   file number; used for the database operations and to build the
 *         on-disk names "<pathName>.DB<n>" or "<pathName>.DBSYS<n>" (the
 *         latter when file is negative), plus ".TMP" and ".OLD" variants.
 * length: number of bytes the caller is sending; consumed in pieces of at
 *         most sizeof(tbuffer) bytes.
 * avers:  version label to put on the database once the transfer is done.
 *
 * Visible steps, in order:
 *  - authorization check;
 *  - the caller's address (primary interface address of the Rx peer) is
 *    compared with uvote_GetSyncSite(); a known, different sync site is
 *    treated as the wrong sender;
 *  - urecovery_AbortAll() aborts active transactions and a "Synchronize
 *    database with server" message is printed;
 *  - the in-transit database is labelled invalid (epoch 0) via setlabel,
 *    and the visible lines also truncate the file and zero the counter;
 *  - a ".TMP" file is created (O_CREAT | O_RDWR | O_TRUNC, mode 0600) and
 *    positioned at HDRSIZE;
 *  - data is read with rx_Read() and written either through
 *    (*dbase->write) or with write() on the temporary file descriptor;
 *  - on completion the database is synced, any ".OLD" file is unlinked, the
 *    current file is renamed to ".OLD", ".TMP" is renamed to the current
 *    name, the file is re-opened, labelled with *avers, ubik_dbase->version
 *    is updated, disk buffers are invalidated with udisk_Invalidate() and
 *    waiters on the version are woken (pthread_cond_broadcast or
 *    LWP_NoYieldSignal depending on AFS_PTHREAD_ENV);
 *  - on failure the label is restored using the saved epoch and a failure
 *    message with the error code is printed; otherwise a completion message
 *    is printed.
 *
 * NOTE(review): several #if/#else/#endif lines are missing from this
 * excerpt, so which of the statements above belong to the OLD_URECOVERY
 * build and which to the non-OLD_URECOVERY build cannot be determined here.
 * The assignment of dbase, the declarations of tbuffer, pbuffer, hoststr,
 * fd, epoch, offset and tlen, the read loop, locking, error exits and the
 * return statements are likewise not visible.
 */
464 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
465 afs_int32 length, struct ubik_version *avers)
468 struct ubik_dbase *dbase = NULL;
471 struct ubik_version tversion;
473 struct rx_peer *tpeer;
474 struct rx_connection *tconn;
475 afs_uint32 otherHost = 0;
477 #ifndef OLD_URECOVERY
484 /* send the file back to the requester */
486 if ((code = ubik_CheckAuth(rxcall))) {
490 /* next, we do a sanity check to see if the guy sending us the database is
491 * the guy we think is the sync site. It turns out that we might not have
492 * decided yet that someone's the sync site, but they could have enough
493 * votes from others to be sync site anyway, and could send us the database
494 * in advance of getting our votes. This is fine, what we're really trying
495 * to check is that some authenticated bogon isn't sending a random database
496 * into another configuration. This could happen on a bad configuration
497 * screwup. Thus, we only object if we're sure we know who the sync site
498 * is, and it ain't the guy talking to us.
500 offset = uvote_GetSyncSite();
501 tconn = rx_ConnectionOf(rxcall);
502 tpeer = rx_PeerOf(tconn);
503 otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
504 if (offset && offset != otherHost) {
505 /* we *know* this is the wrong guy */
513 /* abort any active trans that may scribble over the database */
514 urecovery_AbortAll(dbase);
516 ubik_print("Ubik: Synchronize database with server %s\n",
517 afs_inet_ntoa_r(otherHost, hoststr));
521 (*dbase->truncate) (dbase, file, 0); /* truncate first */
522 tversion.counter = 0;
526 tversion.epoch = 0; /* start off by labelling in-transit db as invalid */
527 (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */
528 #ifndef OLD_URECOVERY
530 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
531 fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
536 code = lseek(fd, HDRSIZE, 0);
537 if (code != HDRSIZE) {
543 memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
545 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
546 #if !defined(OLD_URECOVERY) && !defined(AFS_PTHREAD_ENV)
550 code = rx_Read(rxcall, tbuffer, tlen);
553 ubik_dprint("Rx-read length error=%d\n", code);
559 code = (*dbase->write) (dbase, file, tbuffer, offset, tlen);
561 code = write(fd, tbuffer, tlen);
566 ubik_dprint("write failed error=%d\n", code);
574 #ifndef OLD_URECOVERY
580 /* sync data first, then write label and resync (resync done by setlabel call).
581 * This way, good label is only on good database. */
583 (*ubik_dbase->sync) (dbase, file);
585 afs_snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
587 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
588 code = unlink(pbuffer);
590 code = rename(tbuffer, pbuffer);
591 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
594 code = rename(pbuffer, tbuffer);
596 (*ubik_dbase->open) (ubik_dbase, file);
598 code = (*ubik_dbase->setlabel) (dbase, file, avers);
599 #ifndef OLD_URECOVERY
602 afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD", ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
606 memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
607 udisk_Invalidate(dbase, file); /* new dbase, flush disk buffers */
608 #ifdef AFS_PTHREAD_ENV
609 assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
611 LWP_NoYieldSignal(&dbase->version);
616 #ifndef OLD_URECOVERY
618 /* Failed to sync. Allow reads again for now. */
620 tversion.epoch = epoch;
621 (*dbase->setlabel) (dbase, file, &tversion);
625 ("Ubik: Synchronize database with server %s failed (error = %d)\n",
626 afs_inet_ntoa_r(otherHost, hoststr), code);
628 ubik_print("Ubik: Synchronize database completed\n");
/*
 * RPC handler that takes only the incoming Rx call.
 * NOTE(review): the return type and the whole body are missing from this
 * excerpt, so nothing further can be stated about its behaviour here.
 */
635 SDISK_Probe(struct rx_call *rxcall)
641 * \brief Update the remote machine's addresses in my server list
643 * Send back my addresses to caller of this RPC
644 * \return zero on success, else 1.
/*
 * Exchange interface address lists with a remote server.
 *
 * rxcall:  incoming Rx call (not referenced in the visible lines).
 * inAddr:  the caller's addresses; each entry is passed through htonl()
 *          before being compared with or stored in ubik_server.addr[],
 *          which the inline comments describe as network byte order.
 * outAddr: output; receives ntohl(ubik_host[i]) for every slot up to
 *          UBIK_MAX_INTERFACE_ADDR.
 *
 * Visible steps:
 *  - copy this server's addresses into outAddr;
 *  - search ubik_servers for the entry whose addr[0] equals the caller's
 *    first address;
 *  - scan the caller's non-zero addresses against every other server
 *    entry, looking for an address that also appears there;
 *  - if !probableMatch || found, print "Inconsistent Cell Info from
 *    server: " followed by the caller's addresses;
 *  - otherwise overwrite ts->addr[1..] with the caller's addresses (slot 0
 *    is left as is) and print the resulting list.
 *
 * NOTE(review): the statements that set probableMatch and found, the
 * declaration of hoststr, the calls (if any) to printServerInfo() and the
 * return statements are not visible in this excerpt.
 */
647 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
648 UbikInterfaceAddr *inAddr,
649 UbikInterfaceAddr *outAddr)
651 struct ubik_server *ts, *tmp;
652 afs_uint32 remoteAddr; /* in net byte order */
653 int i, j, found = 0, probableMatch = 0;
656 /* copy the output parameters */
657 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
658 outAddr->hostAddr[i] = ntohl(ubik_host[i]);
660 remoteAddr = htonl(inAddr->hostAddr[0]);
661 for (ts = ubik_servers; ts; ts = ts->next)
662 if (ts->addr[0] == remoteAddr) { /* both in net byte order */
668 /* verify that all addresses in the incoming RPC are
669 ** not part of other server entries in my CellServDB
671 for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
672 && inAddr->hostAddr[i]; i++) {
673 remoteAddr = htonl(inAddr->hostAddr[i]);
674 for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
675 if (ts == tmp) /* this is my server */
677 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
679 if (remoteAddr == tmp->addr[j]) {
687 /* if (probableMatch) */
688 /* inconsistent addresses in CellServDB */
689 if (!probableMatch || found) {
690 ubik_print("Inconsistent Cell Info from server: ");
691 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
692 ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
700 /* update our data structures */
701 for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
702 ts->addr[i] = htonl(inAddr->hostAddr[i]);
704 ubik_print("ubik: A Remote Server has addresses: ");
705 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
706 ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
/*
 * Print the locally known server list through ubik_print().
 *
 * Emits the heading "Local CellServDB:" and then, for every entry on the
 * ubik_servers list, "Server <j>: " followed by each of its addresses up to
 * the first zero slot (at most UBIK_MAX_INTERFACE_ADDR), formatted with
 * afs_inet_ntoa_r().
 *
 * NOTE(review): the declarations of i, j and hoststr, the initial value of
 * j and any line terminators printed between entries are not visible in
 * this excerpt.
 */
713 printServerInfo(void)
715 struct ubik_server *ts;
719 ubik_print("Local CellServDB:");
720 for (ts = ubik_servers; ts; ts = ts->next, j++) {
721 ubik_print("Server %d: ", j);
722 for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
723 ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
/*
 * Handler that relabels the local database with a new version as part of
 * the current remote write transaction.
 *
 * rxcall:      incoming Rx call, passed to ubik_CheckAuth().
 * atid:        transaction id supplied by the caller, handed to
 *              urecovery_CheckTid().
 * oldversionp: version the caller expects this server to hold; compared
 *              field by field (epoch, counter) with ubik_dbVersion.
 * newversionp: version to install when the comparison succeeds.
 *
 * Visible sequence: authorization check; require a current transaction of
 * type UBIK_WRITETRANS; test ubeacon_AmSyncSite() (per the existing
 * comment, this RPC is not expected on the sync site);
 * urecovery_CheckTid(atid, 0); re-test ubik_currentTrans. If *oldversionp
 * equals ubik_dbVersion, the label of file 0 is set to *newversionp via
 * (*dbase->setlabel) and both ubik_dbase->version and ubik_dbVersion are
 * assigned *newversionp.
 *
 * NOTE(review): the bodies of the failure branches, the condition (if any)
 * on the setlabel result guarding the two version assignments, the
 * behaviour when the versions do not match and the return statements are
 * not visible in this excerpt.
 */
729 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
730 struct ubik_version *oldversionp,
731 struct ubik_version *newversionp)
734 struct ubik_dbase *dbase;
736 if ((code = ubik_CheckAuth(rxcall))) {
740 if (!ubik_currentTrans) {
743 /* sanity check to make sure only write trans appear here */
744 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
748 /* Should not get this for the sync site */
749 if (ubeacon_AmSyncSite()) {
753 dbase = ubik_currentTrans->dbase;
755 urecovery_CheckTid(atid, 0);
756 if (!ubik_currentTrans) {
761 /* Set the label if its version matches the sync-site's */
762 if ((oldversionp->epoch == ubik_dbVersion.epoch)
763 && (oldversionp->counter == ubik_dbVersion.counter)) {
764 code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
766 ubik_dbase->version = *newversionp;
767 ubik_dbVersion = *newversionp;