2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
19 #include <afs/afsutil.h>
21 #define UBIK_INTERNALS
/* Forward declaration: printServerInfo() prints the local server list
 * (ubik_servers) and each server's addresses through ubik_print. */
25 static void printServerInfo(void);
28 * routines for handling requests remotely-submitted by the sync site. These are
29 * only write transactions (we don't propagate read trans), and there is at most one
30 * write transaction extant at any one time.
/* The single write transaction driven remotely by the sync site, or 0 when
 * none is open.  SDISK_Begin creates it via udisk_begin(); SDISK_ReleaseLocks
 * and SDISK_Abort reset it to 0; urecovery_CheckTid() may also end it, which
 * is why the handlers below re-check it after calling that function. */
33 struct ubik_trans *ubik_currentTrans = 0;
37 /* the rest of these guys handle remote execution of write
38 * transactions: this is the code executed on the other servers when a
39 * sync site is executing a write transaction.
/*
 * SDISK_Begin: RPC from the sync site asking this server to open the remote
 * write transaction.  The caller is authenticated with ubik_CheckAuth(), the
 * transaction id is passed to urecovery_CheckTid(atid, 1), and udisk_begin()
 * creates a UBIK_WRITETRANS in ubik_currentTrans.  On success the new
 * transaction is stamped with the sync site's tid (epoch and counter),
 * presumably so later urecovery_CheckTid(atid, 0) calls can match it — confirm.
 */
42 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
46 if ((code = ubik_CheckAuth(rxcall))) {
50 urecovery_CheckTid(atid, 1);
/* Open the local write transaction; on success ubik_currentTrans points at it. */
51 code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
52 if (!code && ubik_currentTrans) {
53 /* label this trans with the right trans id */
54 ubik_currentTrans->tid.epoch = atid->epoch;
55 ubik_currentTrans->tid.counter = atid->counter;
/*
 * SDISK_Commit: RPC from the sync site asking this server to commit the
 * current remote write transaction.
 *
 * Steps:
 *  - authenticate the caller;
 *  - require that ubik_currentTrans exists and is a UBIK_WRITETRANS;
 *  - hold the database's cache_lock for writing across urecovery_CheckTid()
 *    and udisk_commit().
 *
 * ubik_currentTrans is re-checked after urecovery_CheckTid() because that
 * call can end the transaction.  On that early-exit path the cache lock is
 * released before returning.
 */
63 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
66 struct ubik_dbase *dbase;
68 if ((code = ubik_CheckAuth(rxcall))) {
72 if (!ubik_currentTrans) {
76 * sanity check to make sure only write trans appear here
78 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
82 dbase = ubik_currentTrans->dbase;
/* Serialize the tid check and the commit against other users of the cache. */
84 ObtainWriteLock(&dbase->cache_lock);
88 urecovery_CheckTid(atid, 0);
89 if (!ubik_currentTrans) {
91 ReleaseWriteLock(&dbase->cache_lock);
95 code = udisk_commit(ubik_currentTrans);
/* NOTE(review): this reads the global ubik_dbase rather than the local dbase
 * taken from the transaction; presumably the same database — confirm. */
97 /* sync site should now match */
98 uvote_set_dbVersion(ubik_dbase->version);
101 ReleaseWriteLock(&dbase->cache_lock);
/*
 * SDISK_ReleaseLocks: RPC from the sync site that finishes the current remote
 * write transaction.
 *
 * Steps:
 *  - authenticate the caller and apply the usual write-transaction checks;
 *  - call urecovery_CheckTid() and re-check ubik_currentTrans;
 *  - end the transaction with udisk_end(), but only if it is not waiting
 *    for a lock (locktype != LOCKWAIT).  A transaction still waiting is left
 *    for SDISK_Lock to end once its lock wait returns;
 *  - reset ubik_currentTrans to 0.
 */
106 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
108 struct ubik_dbase *dbase;
111 if ((code = ubik_CheckAuth(rxcall))) {
115 if (!ubik_currentTrans) {
118 /* sanity check to make sure only write trans appear here */
119 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
123 dbase = ubik_currentTrans->dbase;
/* urecovery_CheckTid() can end the current transaction, so re-check it. */
125 urecovery_CheckTid(atid, 0);
126 if (!ubik_currentTrans) {
131 /* If the thread is not waiting for lock - ok to end it */
132 if (ubik_currentTrans->locktype != LOCKWAIT) {
133 udisk_end(ubik_currentTrans);
135 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * SDISK_Abort: RPC from the sync site asking this server to abort the current
 * remote write transaction.
 *
 * After the auth and write-transaction checks and the urecovery_CheckTid()
 * re-check, the transaction is rolled back with udisk_abort().  Its result is
 * kept in code.  As in SDISK_ReleaseLocks:
 *  - the transaction is ended with udisk_end() only if it is not waiting for
 *    a lock;
 *  - ubik_currentTrans is reset to 0 either way.
 */
141 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
144 struct ubik_dbase *dbase;
146 if ((code = ubik_CheckAuth(rxcall))) {
150 if (!ubik_currentTrans) {
153 /* sanity check to make sure only write trans appear here */
154 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
158 dbase = ubik_currentTrans->dbase;
/* urecovery_CheckTid() can end the current transaction, so re-check it. */
160 urecovery_CheckTid(atid, 0);
161 if (!ubik_currentTrans) {
166 code = udisk_abort(ubik_currentTrans);
167 /* If the thread is not waiting for lock - ok to end it */
168 if (ubik_currentTrans->locktype != LOCKWAIT) {
169 udisk_end(ubik_currentTrans);
171 ubik_currentTrans = (struct ubik_trans *)0;
/*
 * SDISK_Lock: RPC from the sync site asking this server to take a lock of type
 * atype for the current remote write transaction, via
 * ulock_getLock(ubik_currentTrans, atype, 1).
 *
 * The lock call can wait, and during the wait the transaction may be ended or
 * aborted by another RPC (urecovery_CheckTid).  If ubik_currentTrans no longer
 * refers to the transaction we locked for, that transaction is ended here with
 * udisk_end().
 */
176 /* apos and alen are not used */
178 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
179 afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
182 struct ubik_dbase *dbase;
183 struct ubik_trans *ubik_thisTrans;
185 if ((code = ubik_CheckAuth(rxcall))) {
188 if (!ubik_currentTrans) {
191 /* sanity check to make sure only write trans appear here */
192 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
198 dbase = ubik_currentTrans->dbase;
200 urecovery_CheckTid(atid, 0);
201 if (!ubik_currentTrans) {
/* Remember which transaction this lock is for; ubik_currentTrans may change
 * while ulock_getLock() waits (the final argument presumably requests a
 * blocking wait — confirm in ulock_getLock). */
206 ubik_thisTrans = ubik_currentTrans;
207 code = ulock_getLock(ubik_currentTrans, atype, 1);
209 /* While waiting, the transaction may have been ended/
210 * aborted from under us (urecovery_CheckTid). In that
211 * case, end the transaction here.
213 if (!code && (ubik_currentTrans != ubik_thisTrans)) {
214 udisk_end(ubik_thisTrans);
/*
 * SDISK_WriteV: RPC from the sync site carrying several writes for the current
 * remote write transaction.
 *
 * io_vector holds ubik_iovec entries (file, position, length).  io_buffer holds
 * their data packed back to back.  Each entry is applied with udisk_write(),
 * reading from the running offset into io_buffer, after checking that the
 * entry does not run past io_buffer->iovec_buf_len.
 */
223 * \brief Write a vector of data
226 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
227 iovec_wrt *io_vector, iovec_buf *io_buffer)
229 afs_int32 code, i, offset;
230 struct ubik_dbase *dbase;
231 struct ubik_iovec *iovec;
234 if ((code = ubik_CheckAuth(rxcall))) {
237 if (!ubik_currentTrans) {
240 /* sanity check to make sure only write trans appear here */
241 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
245 dbase = ubik_currentTrans->dbase;
/* urecovery_CheckTid() can end the current transaction, so re-check it. */
247 urecovery_CheckTid(atid, 0);
248 if (!ubik_currentTrans) {
253 iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
254 iobuf = (char *)io_buffer->iovec_buf_val;
255 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
/* NOTE(review): offset is a signed afs_int32.  If iovec[i].length is also
 * signed, a negative or very large length supplied by the RPC could pass this
 * check (or overflow the sum) and index outside iobuf.  Confirm the field's
 * type and consider rejecting negative lengths explicitly. */
256 /* Sanity check for going off end of buffer */
257 if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
261 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
262 iovec[i].position, iovec[i].length);
/* Advance to the next entry's data in the packed buffer. */
267 offset += iovec[i].length;
/*
 * SDISK_Write: RPC from the sync site that writes adata->bulkdata_len bytes
 * from adata->bulkdata_val into file afile at position apos, within the
 * current remote write transaction (via udisk_write()).
 * The usual auth, write-transaction and urecovery_CheckTid() checks come first.
 */
275 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
276 afs_int32 afile, afs_int32 apos, bulkdata *adata)
279 struct ubik_dbase *dbase;
281 if ((code = ubik_CheckAuth(rxcall))) {
284 if (!ubik_currentTrans) {
287 /* sanity check to make sure only write trans appear here */
288 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
292 dbase = ubik_currentTrans->dbase;
/* urecovery_CheckTid() can end the current transaction, so re-check it. */
294 urecovery_CheckTid(atid, 0);
295 if (!ubik_currentTrans) {
300 udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
301 adata->bulkdata_len);
/*
 * SDISK_Truncate: RPC from the sync site that truncates file afile to alen
 * within the current remote write transaction (via udisk_truncate()).
 * The usual auth, write-transaction and urecovery_CheckTid() checks come first.
 */
307 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
308 afs_int32 afile, afs_int32 alen)
311 struct ubik_dbase *dbase;
313 if ((code = ubik_CheckAuth(rxcall))) {
316 if (!ubik_currentTrans) {
319 /* sanity check to make sure only write trans appear here */
320 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
324 dbase = ubik_currentTrans->dbase;
/* urecovery_CheckTid() can end the current transaction, so re-check it. */
326 urecovery_CheckTid(atid, 0);
327 if (!ubik_currentTrans) {
331 code = udisk_truncate(ubik_currentTrans, afile, alen);
/*
 * SDISK_GetVersion: RPC returning this server's database version in *aversion.
 *
 * After the auth check, the request is refused while this server believes it
 * is the sync site (rationale in the comment below).  Otherwise the label of
 * file 0 is read through ubik_dbase->getlabel.  When no database is available
 * (presumably when getlabel fails — confirm), a version with counter 0 is
 * reported instead.
 */
337 SDISK_GetVersion(struct rx_call *rxcall,
338 struct ubik_version *aversion)
342 if ((code = ubik_CheckAuth(rxcall))) {
347 * If we are the sync site, recovery shouldn't be running on any
348 * other site. We shouldn't be getting this RPC as long as we are
349 * the sync site. To prevent any unforseen activity, we should
350 * reject this RPC until we have recognized that we are not the
351 * sync site anymore, and/or if we have any pending WRITE
352 * transactions that have to complete. This way we can be assured
353 * that this RPC would not block any pending transactions that
354 * should either fail or pass. If we have recognized the fact that
355 * we are not the sync site any more, all write transactions would
356 * fail with UNOQUORUM anyway.
358 if (ubeacon_AmSyncSite()) {
363 code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
366 /* tell other side there's no dbase */
368 aversion->counter = 0;
/*
 * SDISK_GetFile: RPC that streams database file `file` to the caller.
 *
 * Steps:
 *  - authenticate the caller;
 *  - stat the file through dbase->stat;
 *  - send its size as an afs_int32 in network byte order (htonl);
 *  - send the contents in chunks of at most sizeof(tbuffer) bytes, each read
 *    with dbase->read;
 *  - return the file's label in *version via dbase->getlabel.
 *
 * A short rx_Write or a failed read is logged with ubik_dprint.
 */
374 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
375 struct ubik_version *version)
378 struct ubik_dbase *dbase;
380 struct ubik_stat ubikstat;
385 if ((code = ubik_CheckAuth(rxcall))) {
388 /* temporarily disabled because it causes problems for migration tool. Hey, it's just
389 * a sanity check, anyway.
390 if (ubeacon_AmSyncSite()) {
396 code = (*dbase->stat) (dbase, file, &ubikstat);
401 length = ubikstat.size;
/* The length prefix is sent in network byte order. */
402 tlen = htonl(length);
403 code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
404 if (code != sizeof(afs_int32)) {
406 ubik_dprint("Rx-write length error=%d\n", code);
/* Transfer at most one tbuffer's worth of data at a time. */
411 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
412 code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
415 ubik_dprint("read failed error=%d\n", code);
418 code = rx_Write(rxcall, tbuffer, tlen);
421 ubik_dprint("Rx-write length error=%d\n", code);
427 code = (*dbase->getlabel) (dbase, file, version); /* return the dbase, too */
/*
 * SDISK_SendFile: RPC in which another server pushes a complete copy of
 * database file `file` (length bytes, version *avers) to this server.
 *
 * Steps:
 *  - authenticate the caller;
 *  - refuse the data if a sync site is known and it is not the sender;
 *  - abort active transactions (urecovery_AbortAll);
 *  - label the current file invalid (epoch 0);
 *  - copy the incoming bytes into "<pathName>.DB[SYS]<n>.TMP", starting at
 *    offset HDRSIZE;
 *  - rename the live file to .OLD and the .TMP file to the live name;
 *  - reopen the file and label it with *avers;
 *  - update ubik_dbase->version, invalidate cached disk buffers and signal
 *    waiters on the version.
 *
 * On failure the label is rewritten with tversion.epoch = epoch and the
 * failure is logged.
 */
433 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
434 afs_int32 length, struct ubik_version *avers)
437 struct ubik_dbase *dbase = NULL;
440 struct ubik_version tversion;
442 struct rx_peer *tpeer;
443 struct rx_connection *tconn;
444 afs_uint32 otherHost = 0;
451 /* receive the database file that the requester is sending to us */
455 if ((code = ubik_CheckAuth(rxcall))) {
460 /* next, we do a sanity check to see if the guy sending us the database is
461 * the guy we think is the sync site. It turns out that we might not have
462 * decided yet that someone's the sync site, but they could have enough
463 * votes from others to be sync site anyway, and could send us the database
464 * in advance of getting our votes. This is fine, what we're really trying
465 * to check is that some authenticated bogon isn't sending a random database
466 * into another configuration. This could happen on a bad configuration
467 * screwup. Thus, we only object if we're sure we know who the sync site
468 * is, and it ain't the guy talking to us.
/* NOTE(review): offset is reused here to hold the sync site's address (0 when
 * none is known yet); a dedicated variable would read more clearly. */
470 offset = uvote_GetSyncSite();
471 tconn = rx_ConnectionOf(rxcall);
472 tpeer = rx_PeerOf(tconn);
473 otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
474 if (offset && offset != otherHost) {
475 /* we *know* this is the wrong guy */
483 /* abort any active trans that may scribble over the database */
484 urecovery_AbortAll(dbase);
486 ubik_print("Ubik: Synchronize database with server %s\n",
487 afs_inet_ntoa_r(otherHost, hoststr));
490 epoch = tversion.epoch = 0; /* start off by labelling in-transit db as invalid */
491 (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */
/* Incoming data goes to a .TMP file; the live file is replaced only after the
 * whole transfer has been written. */
492 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
493 ubik_dbase->pathName, (file<0)?"SYS":"",
494 (file<0)?-file:file);
495 fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
/* Data starts HDRSIZE bytes into the file (presumably leaving room for the
 * label header — confirm). */
500 code = lseek(fd, HDRSIZE, 0);
501 if (code != HDRSIZE) {
506 memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
/* Copy at most one tbuffer's worth of data per read/write. */
508 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
509 #if !defined(AFS_PTHREAD_ENV)
513 code = rx_Read(rxcall, tbuffer, tlen);
515 ubik_dprint("Rx-read length error=%d\n", code);
520 code = write(fd, tbuffer, tlen);
523 ubik_dprint("write failed error=%d\n", code);
535 /* sync data first, then write label and resync (resync done by setlabel call).
536 * This way, good label is only on good database. */
/* Swap files: live -> .OLD, .TMP -> live; then reopen and apply *avers. */
537 snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
538 ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
540 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
541 ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
542 code = unlink(pbuffer);
544 code = rename(tbuffer, pbuffer);
545 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
546 ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
549 code = rename(pbuffer, tbuffer);
/* NOTE(review): the result of open() below is discarded, so a failed reopen
 * is not detected here.  The setlabel call after it mixes ubik_dbase->setlabel
 * with the local dbase; presumably the same database object — confirm. */
551 (*ubik_dbase->open) (ubik_dbase, file);
552 code = (*ubik_dbase->setlabel) (dbase, file, avers);
555 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
556 ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
559 memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
560 udisk_Invalidate(dbase, file); /* new dbase, flush disk buffers */
/* Wake threads waiting on the database version: version_cond under pthreads,
 * otherwise an LWP signal on &dbase->version. */
561 #ifdef AFS_PTHREAD_ENV
562 assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
564 LWP_NoYieldSignal(&dbase->version);
/* NOTE(review): the only visible assignment to epoch sets it to 0 at the top,
 * so this appears to rewrite the invalid epoch-0 label rather than restore an
 * earlier one.  Confirm whether a previous epoch was meant to be saved. */
570 /* Failed to sync. Allow reads again for now. */
572 tversion.epoch = epoch;
573 (*dbase->setlabel) (dbase, file, &tversion);
576 ("Ubik: Synchronize database with server %s failed (error = %d)\n",
577 afs_inet_ntoa_r(otherHost, hoststr), code);
579 ubik_print("Ubik: Synchronize database completed\n");
/* SDISK_Probe: RPC taking only the call handle; presumably used by peer
 * servers to check that this server responds — confirm. */
587 SDISK_Probe(struct rx_call *rxcall)
/*
 * SDISK_UpdateInterfaceAddr: exchange interface addresses with another server.
 *
 * Steps:
 *  - reply with this server's addresses in outAddr, converted from ubik_host
 *    with ntohl;
 *  - find the ubik_servers entry whose primary address matches
 *    inAddr->hostAddr[0].  inAddr values are in host byte order and are
 *    converted with htonl before comparison;
 *  - check that none of the caller's addresses belong to a different server
 *    entry;
 *  - if the visible condition (!probableMatch || found) holds, print the
 *    caller's addresses as inconsistent;
 *  - otherwise copy inAddr->hostAddr[1..] into the matching entry and print
 *    its addresses.
 */
593 * \brief Update remote machines addresses in my server list
595 * Send back my addresses to caller of this RPC
596 * \return zero on success, else 1.
599 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
600 UbikInterfaceAddr *inAddr,
601 UbikInterfaceAddr *outAddr)
603 struct ubik_server *ts, *tmp;
604 afs_uint32 remoteAddr; /* in net byte order */
605 int i, j, found = 0, probableMatch = 0;
608 /* copy the output parameters */
609 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
610 outAddr->hostAddr[i] = ntohl(ubik_host[i]);
/* Locate the server entry whose primary address is the caller's first address. */
612 remoteAddr = htonl(inAddr->hostAddr[0]);
613 for (ts = ubik_servers; ts; ts = ts->next)
614 if (ts->addr[0] == remoteAddr) { /* both in net byte order */
620 /* verify that all addresses in the incoming RPC are
621 ** not part of other server entries in my CellServDB
623 for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
624 && inAddr->hostAddr[i]; i++) {
625 remoteAddr = htonl(inAddr->hostAddr[i]);
626 for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
627 if (ts == tmp) /* this is my server */
629 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
631 if (remoteAddr == tmp->addr[j]) {
639 /* if (probableMatch) */
640 /* inconsistent addresses in CellServDB */
641 if (!probableMatch || found) {
642 ubik_print("Inconsistent Cell Info from server: ");
643 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
644 ubik_print("%s ", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr));
/* Index 0 is the address the entry was matched on; only the additional
 * addresses are overwritten. */
652 /* update our data structures */
653 for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
654 ts->addr[i] = htonl(inAddr->hostAddr[i]);
656 ubik_print("ubik: A Remote Server has addresses: ");
657 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
658 ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
/*
 * printServerInfo: log the local server list (ubik_servers) with ubik_print.
 * It prints a "Server N:" line per entry, followed by each of that server's
 * addresses up to the first zero address slot.
 */
665 printServerInfo(void)
667 struct ubik_server *ts;
671 ubik_print("Local CellServDB:");
672 for (ts = ubik_servers; ts; ts = ts->next, j++) {
673 ubik_print("Server %d: ", j);
674 for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
675 ubik_print("%s ", afs_inet_ntoa_r(ts->addr[i], hoststr));
/*
 * SDISK_SetVersion: RPC from the sync site that relabels this server's
 * database.
 *
 * After the usual auth and write-transaction checks, the request is refused if
 * this server believes it is the sync site.  When the local database version
 * equals *oldversionp (uvote_eq_dbVersion), the following are set to
 * *newversionp:
 *  - the label of file 0;
 *  - ubik_dbase->version;
 *  - the voting module's copy of the version.
 */
681 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
682 struct ubik_version *oldversionp,
683 struct ubik_version *newversionp)
686 struct ubik_dbase *dbase;
688 if ((code = ubik_CheckAuth(rxcall))) {
692 if (!ubik_currentTrans) {
695 /* sanity check to make sure only write trans appear here */
696 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
700 /* Should not get this for the sync site */
701 if (ubeacon_AmSyncSite()) {
705 dbase = ubik_currentTrans->dbase;
/* urecovery_CheckTid() can end the current transaction, so re-check it. */
707 urecovery_CheckTid(atid, 0);
708 if (!ubik_currentTrans) {
713 /* Set the label if its version matches the sync-site's */
714 if (uvote_eq_dbVersion(*oldversionp)) {
/* NOTE(review): calls dbase->setlabel but passes the global ubik_dbase as its
 * database argument; presumably the same object — confirm. */
715 code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
717 ubik_dbase->version = *newversionp;
718 uvote_set_dbVersion(*newversionp);