/*!
 * \brief Reset the recovery state word.
 *
 * Stores 0 into urecovery_state and returns.
 *
 * The lines prefixed with '-' below appear to be deleted by this change:
 * an LWP_NoYieldSignal() on &urecovery_state that was compiled only when
 * AFS_PTHREAD_ENV was not defined. Their own comment already notes that no
 * matching LWP_WaitProcess() could be found.
 *
 * \return always 0
 */
urecovery_ResetState(void)
{
urecovery_state = 0;
-#if !defined(AFS_PTHREAD_ENV)
- /* No corresponding LWP_WaitProcess found anywhere for this -- klm */
- LWP_NoYieldSignal(&urecovery_state);
-#endif
return 0;
}
/*!
 * \brief Handle a server being reported lost.
 *
 * Passes the server to ubeacon_ReinitServer() and returns.
 *
 * The lines prefixed with '-' below appear to be deleted by this change:
 * an LWP_NoYieldSignal() on &urecovery_state that was compiled only when
 * AFS_PTHREAD_ENV was not defined.
 *
 * \param[in] ts  server handed to ubeacon_ReinitServer() (presumably the
 *                server that was lost -- confirm against callers)
 *
 * \return always 0
 */
urecovery_LostServer(struct ubik_server *ts)
{
ubeacon_ReinitServer(ts);
-#if !defined(AFS_PTHREAD_ENV)
- /* No corresponding LWP_WaitProcess found anywhere for this -- klm */
- LWP_NoYieldSignal(&urecovery_state);
-#endif
return 0;
}
{
afs_int32 rcode;
- ubik_dprint_25("allbetter checking\n");
+ ViceLog(25, ("allbetter checking\n"));
rcode = 0;
rcode = 1;
}
- ubik_dprint_25("allbetter: returning %d\n", rcode);
+ ViceLog(25, ("allbetter: returning %d\n", rcode));
return rcode;
}
/*!
 * \brief Abort every transaction on a database's active list.
 *
 * Walks the list starting at adbase->activeTrans (linked through ->next)
 * and calls udisk_abort() on each entry. The lines prefixed with '+' add a
 * tally: entries whose type is UBIK_WRITETRANS are counted as writes, all
 * others as reads, and the two totals are logged at level 0 after the loop
 * (the message is emitted even when both counts are zero).
 *
 * \param[in] adbase  database whose activeTrans list is walked
 *
 * \return always 0; the result of udisk_abort() is not examined
 */
urecovery_AbortAll(struct ubik_dbase *adbase)
{
struct ubik_trans *tt;
+ int reads = 0, writes = 0;
+
/* NOTE(review): tt->next is read after udisk_abort(tt); this assumes
 * udisk_abort() neither frees nor unlinks tt -- confirm. */
for (tt = adbase->activeTrans; tt; tt = tt->next) {
+ if (tt->type == UBIK_WRITETRANS)
+ writes++;
+ else
+ reads++;
udisk_abort(tt);
}
+ ViceLog(0, ("urecovery_AbortAll: just aborted %d read and %d write transactions\n",
+ reads, writes));
return 0;
}
if (atid->epoch != ubik_currentTrans->tid.epoch
|| atid->counter > ubik_currentTrans->tid.counter || abortalways) {
/* don't match, abort it */
+ int endit = 0;
/* If the thread is not waiting for lock - ok to end it */
if (ubik_currentTrans->locktype != LOCKWAIT) {
+ endit = 1;
+ }
+
+ ViceLog(0, ("urecovery_CheckTid: Aborting/ending bad remote "
+ "transaction. (tx %d.%d, atid %d.%d, abortalways %d, "
+ "endit %d)\n",
+ ubik_currentTrans->tid.epoch,
+ ubik_currentTrans->tid.counter,
+ atid->epoch, atid->counter,
+ abortalways, endit));
+ if (endit) {
udisk_end(ubik_currentTrans);
}
ubik_currentTrans = (struct ubik_trans *)0;
/* otherwise, skip over the data bytes, too */
tpos += ntohl(buffer[2]) + 3 * sizeof(afs_int32);
} else {
- ubik_print("corrupt log opcode (%d) at position %d\n", opcode,
- tpos);
+ ViceLog(0, ("corrupt log opcode (%d) at position %d\n", opcode,
+ tpos));
break; /* corrupt log! */
}
}
code = (*adbase->setlabel) (adbase, 0, &version);
if (code)
return code;
- ubik_print("Successfully replayed log for interrupted "
+ ViceLog(0, ("Successfully replayed log for interrupted "
"transaction; db version is now %ld.%ld\n",
- (long) version.epoch, (long) version.counter);
+ (long) version.epoch, (long) version.counter));
logIsGood = 1;
break; /* all done now */
} else if (opcode == LOGTRUNCATE) {
len -= thisSize;
}
} else {
- ubik_print("corrupt log opcode (%d) at position %d\n",
- opcode, tpos);
+ ViceLog(0, ("corrupt log opcode (%d) at position %d\n",
+ opcode, tpos));
break; /* corrupt log! */
}
}
if (code)
return code;
} else {
- ubik_print("Log read error on pass 2\n");
+ ViceLog(0, ("Log read error on pass 2\n"));
return UBADLOG;
}
}
adbase->version.counter = 0;
(*adbase->setlabel) (adbase, 0, &adbase->version);
}
-#ifdef AFS_PTHREAD_ENV
- opr_cv_broadcast(&adbase->version_cond);
-#else
- LWP_NoYieldSignal(&adbase->version);
-#endif
UBIK_VERSION_UNLOCK;
}
return 0;
int fd = -1;
afs_int32 pass;
- afs_pthread_setname_self("recovery");
+ opr_threadname_set("recovery");
/* otherwise, begin interaction */
urecovery_state = 0;
IOMGR_Select(0, 0, 0, 0, &tv);
#endif
- ubik_dprint("recovery running in state %x\n", urecovery_state);
+ ViceLog(5, ("recovery running in state %x\n", urecovery_state));
/* Every 30 seconds, check all the down servers and mark them
* as up if they respond. When a server comes up or found to
UBIK_ADDR_LOCK;
rxcall = rx_NewCall(bestServer->disk_rxcid);
- ubik_print("Ubik: Synchronize database with server %s\n",
- afs_inet_ntoa_r(bestServer->addr[0], hoststr));
+ ViceLog(0, ("Ubik: Synchronize database: receive (via GetFile) "
+ "from server %s begin\n",
+ afs_inet_ntoa_r(bestServer->addr[0], hoststr)));
UBIK_ADDR_UNLOCK;
code = StartDISK_GetFile(rxcall, file);
if (code) {
- ubik_dprint("StartDiskGetFile failed=%d\n", code);
+ ViceLog(0, ("StartDiskGetFile failed=%d\n", code));
goto FetchEndCall;
}
nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
length = ntohl(length);
if (nbytes != sizeof(afs_int32)) {
- ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR);
+ ViceLog(0, ("Rx-read length error=%d\n", BULK_ERROR));
code = EIO;
goto FetchEndCall;
}
code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
UBIK_VERSION_UNLOCK;
if (code) {
- ubik_dprint("setlabel io error=%d\n", code);
+ ViceLog(0, ("setlabel io error=%d\n", code));
goto FetchEndCall;
}
snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
#endif
nbytes = rx_Read(rxcall, tbuffer, tlen);
if (nbytes != tlen) {
- ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
+ ViceLog(0, ("Rx-read bulk error=%d\n", BULK_ERROR));
code = EIO;
close(fd);
goto FetchEndCall;
ubik_dbase->version.epoch = 0;
ubik_dbase->version.counter = 0;
UBIK_VERSION_UNLOCK;
- ubik_print("Ubik: Synchronize database failed (error = %d)\n",
- code);
+ ViceLog(0,
+ ("Ubik: Synchronize database: receive (via GetFile) "
+ "from server %s failed (error = %d)\n",
+ afs_inet_ntoa_r(bestServer->addr[0], hoststr), code));
} else {
- ubik_print("Ubik: Synchronize database completed\n");
+ ViceLog(0,
+ ("Ubik: Synchronize database: receive (via GetFile) "
+ "from server %s complete, version: %d.%d\n",
+ afs_inet_ntoa_r(bestServer->addr[0], hoststr),
+ ubik_dbase->version.epoch, ubik_dbase->version.counter));
+
urecovery_state |= UBIK_RECHAVEDB;
}
udisk_Invalidate(ubik_dbase, 0); /* data has changed */
-#ifdef AFS_PTHREAD_ENV
- opr_cv_broadcast(&ubik_dbase->version_cond);
-#else
- LWP_NoYieldSignal(&ubik_dbase->version);
-#endif
}
if (!(urecovery_state & UBIK_RECHAVEDB)) {
DBRELE(ubik_dbase);
(*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
UBIK_VERSION_UNLOCK;
udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
-#ifdef AFS_PTHREAD_ENV
- opr_cv_broadcast(&ubik_dbase->version_cond);
-#else
- LWP_NoYieldSignal(&ubik_dbase->version);
-#endif
}
/* Check the other sites and send the database to them if they
* the write-lock above if there is a write transaction in progress,
* but then, it won't hurt to check, will it?
*/
- if (ubik_dbase->flags & DBWRITING) {
+ if (ubik_dbase->dbFlags & DBWRITING) {
struct timeval tv;
int safety = 0;
long cur_usec = 50000;
- while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
+ while ((ubik_dbase->dbFlags & DBWRITING) && (safety < 500)) {
DBRELE(ubik_dbase);
/* sleep for a little while */
tv.tv_sec = 0;
UBIK_BEACON_LOCK;
if (!ts->up) {
UBIK_BEACON_UNLOCK;
- ubik_dprint("recovery cannot send version to %s\n",
- afs_inet_ntoa_r(inAddr.s_addr, hoststr));
+ /* It would be nice to have this message at loglevel
+ * 0 as well, but it will log once every 4s for each
+ * down server while in this recovery state. This
+ * should only be changed to loglevel 0 if it is
+ * also rate-limited.
+ */
+ ViceLog(5, ("recovery cannot send version to %s\n",
+ afs_inet_ntoa_r(inAddr.s_addr, hoststr)));
dbok = 0;
continue;
}
UBIK_BEACON_UNLOCK;
- ubik_dprint("recovery sending version to %s\n",
- afs_inet_ntoa_r(inAddr.s_addr, hoststr));
+
if (vcmp(ts->version, ubik_dbase->version) != 0) {
- ubik_dprint("recovery stating local database\n");
+ ViceLog(0, ("Synchronize database: send (via SendFile) "
+ "to server %s begin\n",
+ afs_inet_ntoa_r(inAddr.s_addr, hoststr)));
/* Rx code to do the Bulk Store */
code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
StartDISK_SendFile(rxcall, file, length,
&ubik_dbase->version);
if (code) {
- ubik_dprint("StartDiskSendFile failed=%d\n",
- code);
+ ViceLog(0, ("StartDiskSendFile failed=%d\n",
+ code));
goto StoreEndCall;
}
while (length > 0) {
(*ubik_dbase->read) (ubik_dbase, file,
tbuffer, offset, tlen);
if (nbytes != tlen) {
- ubik_dprint("Local disk read error=%d\n",
- code = UIOERROR);
+ code = UIOERROR;
+ ViceLog(0, ("Local disk read error=%d\n", code));
goto StoreEndCall;
}
nbytes = rx_Write(rxcall, tbuffer, tlen);
if (nbytes != tlen) {
- ubik_dprint("Rx-write bulk error=%d\n", code =
- BULK_ERROR);
+ code = BULK_ERROR;
+ ViceLog(0, ("Rx-write bulk error=%d\n", code));
goto StoreEndCall;
}
offset += tlen;
StoreEndCall:
code = rx_EndCall(rxcall, code);
}
+
if (code == 0) {
/* we set a new file, process its header */
ts->version = ubik_dbase->version;
ts->currentDB = 1;
- } else
+ ViceLog(0,
+ ("Ubik: Synchronize database: send (via SendFile) "
+ "to server %s complete, version: %d.%d\n",
+ afs_inet_ntoa_r(inAddr.s_addr, hoststr),
+ ts->version.epoch, ts->version.counter));
+
+ } else {
dbok = 0;
+ ViceLog(0,
+ ("Ubik: Synchronize database: send (via SendFile) "
+ "to server %s failed (error = %d)\n",
+ afs_inet_ntoa_r(inAddr.s_addr, hoststr), code));
+ }
} else {
/* mark file up to date */
ts->currentDB = 1;
}
DBRELE(ubik_dbase);
}
- return NULL;
+ AFS_UNREACHED(return(NULL));
}
/*!
connSuccess = conns[success_i];
strcpy(buffer, afs_inet_ntoa_r(server->addr[0], hoststr));
- ubik_print("ubik:server %s is back up: will be contacted through %s\n",
- buffer, afs_inet_ntoa_r(addr, hoststr));
+ ViceLog(0, ("ubik:server %s is back up: will be contacted through %s\n",
+ buffer, afs_inet_ntoa_r(addr, hoststr)));
UBIK_ADDR_UNLOCK;
}
rx_DestroyConnection(conns[i]);
if (!connSuccess)
- ubik_dprint("ubik:server %s still down\n",
- afs_inet_ntoa_r(server->addr[0], hoststr));
+ ViceLog(5, ("ubik:server %s still down\n",
+ afs_inet_ntoa_r(server->addr[0], hoststr)));
if (connSuccess)
return 0; /* success */