From f4a5a040f48d1d618985d419533b363787a9c397 Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Sun, 10 Nov 2024 10:34:44 +0100 Subject: [PATCH 01/11] cherry pick https://github.com/dashpay/dash/pull/2889 Remove unused function parameters --- src/evo/evonotificationinterface.cpp | 2 +- src/llmq/quorums_chainlocks.cpp | 2 +- src/llmq/quorums_chainlocks.h | 2 +- src/llmq/quorums_init.h | 2 -- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/evo/evonotificationinterface.cpp b/src/evo/evonotificationinterface.cpp index cecb97b68d70c..0739224d31101 100644 --- a/src/evo/evonotificationinterface.cpp +++ b/src/evo/evonotificationinterface.cpp @@ -25,7 +25,7 @@ void EvoNotificationInterface::AcceptedBlockHeader(const CBlockIndex* pindexNew) void EvoNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { // background thread updates - llmq::chainLocksHandler->UpdatedBlockTip(pindexNew, pindexFork); + llmq::chainLocksHandler->UpdatedBlockTip(pindexNew); llmq::quorumDKGSessionManager->UpdatedBlockTip(pindexNew, fInitialDownload); llmq::quorumManager->UpdatedBlockTip(pindexNew, fInitialDownload); } diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index cd98c92fb7a5a..86293d21a37b0 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -184,7 +184,7 @@ void CChainLocksHandler::AcceptedBlockHeader(const CBlockIndex* pindexNew) } } -void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork) +void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew) { // don't call TrySignChainTip directly but instead let the scheduler call it. This way we ensure that cs_main is // never locked and TrySignChainTip is not called twice in parallel. Also avoids recursive calls due to diff --git a/src/llmq/quorums_chainlocks.h b/src/llmq/quorums_chainlocks.h index 06d444cf49e35..6289210e22629 100644 --- a/src/llmq/quorums_chainlocks.h +++ b/src/llmq/quorums_chainlocks.h @@ -74,7 +74,7 @@ class CChainLocksHandler : public CRecoveredSigsListener void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); void ProcessNewChainLock(NodeId from, const CChainLockSig& clsig, const uint256& hash); void AcceptedBlockHeader(const CBlockIndex* pindexNew); - void UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork); + void UpdatedBlockTip(const CBlockIndex* pindexNew); void TrySignChainTip(); void EnforceBestChainLock(); virtual void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig); diff --git a/src/llmq/quorums_init.h b/src/llmq/quorums_init.h index 0f4e3b852cc3c..cc56b5ff7c5c7 100644 --- a/src/llmq/quorums_init.h +++ b/src/llmq/quorums_init.h @@ -14,8 +14,6 @@ class CEvoDB; namespace llmq { -extern CDBWrapper* llmqDb; - // Init/destroy LLMQ globals void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests); void DestroyLLMQSystem(); From 965a4a79154f6cb64d669a044248478f8a440d01 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Thu, 11 Apr 2019 22:11:39 +0200 Subject: [PATCH 02/11] Use lazy BLS signatures more often and don't always verify self-recovered sigs (https://github.com/dashpay/dash/pull/2860) * Make CBLSLazySignature thread safe * Perform malleability check in CBLSLazySignature * Use CBLSLazySignature in CRecoveredSig and CInstantSendLock * Only sporadically verify self-recovered signatures * test --- src/llmq/quorums_chainlocks.cpp | 2 +- src/llmq/quorums_signing.cpp | 15 ++++++++------- src/llmq/quorums_signing.h | 30 ++++++++--------------------- src/llmq/quorums_signing_shares.cpp | 21 ++++++++++++-------- src/llmq/quorums_signing_shares.h | 1 + 5 files changed, 31 insertions(+), 38 deletions(-) diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index 86293d21a37b0..06fdbdba776d4 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -345,7 +345,7 @@ void CChainLocksHandler::HandleNewRecoveredSig(const llmq::CRecoveredSig& recove clsig.nHeight = lastSignedHeight; clsig.blockHash = lastSignedMsgHash; - clsig.sig = recoveredSig.sig; + clsig.sig = recoveredSig.sig.Get(); } ProcessNewChainLock(-1, clsig, ::SerializeHash(clsig)); } diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 2cf1804aabc89..a4aee81026af9 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -100,7 +100,7 @@ bool CRecoveredSigsDb::ReadRecoveredSig(Consensus::LLMQType llmqType, const uint } try { - ret.Unserialize(ds, false); + ret.Unserialize(ds); return true; } catch (std::exception&) { return false; @@ -326,11 +326,6 @@ bool CSigningManager::PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& return false; } - if (!recoveredSig.sig.IsValid()) { - retBan = true; - return false; - } - return true; } @@ -417,8 +412,14 @@ bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman) auto& v = p.second; for (auto& recSig : v) { + // we didn't verify the lazy signature until now + if (!recSig.sig.Get().IsValid()) { + batchVerifier.badSources.emplace(nodeId); + break; + } + const auto& quorum = quorums.at(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.quorumHash)); - batchVerifier.PushMessage(nodeId, recSig.GetHash(), llmq::utils::BuildSignHash(recSig), recSig.sig, quorum->quorumPublicKey); + batchVerifier.PushMessage(nodeId, recSig.GetHash(), llmq::utils::BuildSignHash(recSig), recSig.sig.Get(), quorum->quorumPublicKey); verifyCount++; } } diff --git a/src/llmq/quorums_signing.h b/src/llmq/quorums_signing.h index a68bded653b3f..619b4518c5811 100644 --- a/src/llmq/quorums_signing.h +++ b/src/llmq/quorums_signing.h @@ -26,34 +26,20 @@ class CRecoveredSig uint256 quorumHash; uint256 id; uint256 msgHash; - CBLSSignature sig; + CBLSLazySignature sig; // only in-memory uint256 hash; public: - template - inline void Serialize(Stream& s) const + SERIALIZE_METHODS(CRecoveredSig, obj) { - s << llmqType; - s << quorumHash; - s << id; - s << msgHash; - s << sig; - } - template - inline void Unserialize(Stream& s, bool checkMalleable = true, bool updateHash = true, bool skipSig = false) - { - s >> llmqType; - s >> quorumHash; - s >> id; - s >> msgHash; - if (!skipSig) { - sig.Unserialize(s, checkMalleable); - if (updateHash) { - UpdateHash(); - } - } + READWRITE(obj.llmqType); + READWRITE(obj.quorumHash); + READWRITE(obj.id); + READWRITE(obj.msgHash); + READWRITE(obj.sig); + SER_READ(obj, obj.UpdateHash()); } void UpdateHash() diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 6ecc124ab1287..1a47dcaeab2f0 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -753,16 +753,21 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& rs.quorumHash = quorum->pindexQuorum->GetBlockHash(); rs.id = id; rs.msgHash = msgHash; - rs.sig = recoveredSig; + rs.sig.Set(recoveredSig); rs.UpdateHash(); - auto signHash = llmq::utils::BuildSignHash(rs); - bool valid = rs.sig.VerifyInsecure(quorum->quorumPublicKey, signHash); - if (!valid) { - // this should really not happen as we have verified all signature shares before - LogPrintf("CSigSharesManager::%s -- own recovered signature is invalid. id=%s, msgHash=%s\n", __func__, - id.ToString(), msgHash.ToString()); - return; + // There should actually be no need to verify the self-recovered signatures as it should always succeed. Let's + // however still verify it from time to time, so that we have a chance to catch bugs. We do only this sporadic + // verification because this is unbatched and thus slow verification that happens here. + if (((recoveredSigsCounter++) % 100) == 0) { + auto signHash = llmq::utils::BuildSignHash(rs); + bool valid = recoveredSig.VerifyInsecure(quorum->quorumPublicKey, signHash); + if (!valid) { + // this should really not happen as we have verified all signature shares before + LogPrintf("CSigSharesManager::%s -- own recovered signature is invalid. id=%s, msgHash=%s\n", __func__, + id.ToString(), msgHash.ToString()); + return; + } } quorumSigningManager->ProcessRecoveredSig(-1, rs, quorum, connman); diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index ccad1312386b3..25e819ab317b2 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -353,6 +353,7 @@ class CSigSharesManager : public CRecoveredSigsListener FastRandomContext rnd; int64_t lastCleanupTime{0}; + std::atomic recoveredSigsCounter{0}; public: CSigSharesManager(); From dce46adc11d9034d781a1f1ec9577caa844e92f6 Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Tue, 30 Apr 2019 15:55:11 +0300 Subject: [PATCH 03/11] Bail out in few more places when blockchain is not synced yet (https://github.com/dashpay/dash/pull/2888) * Bail out in few more places when blockchain is not synced yet * Apply review suggestion --- src/llmq/quorums.cpp | 3 ++- src/llmq/quorums_chainlocks.cpp | 16 +++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index 62afd98ede414..f1ec5a3831665 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -15,6 +15,7 @@ #include "quorums_commitment.h" #include "quorums_dkgsessionmgr.h" #include "shutdown.h" +#include "tiertwo/tiertwo_sync_state.h" #include "univalue.h" #include "validation.h" @@ -165,7 +166,7 @@ CQuorumManager::CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessi void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) { - if (fInitialDownload || !activeMasternodeManager || !deterministicMNManager->IsDIP3Enforced(pindexNew->nHeight)) { + if (!g_tiertwo_sync_state.IsBlockchainSynced() || !activeMasternodeManager) { return; } diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index 06fdbdba776d4..af480fdbdb754 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -13,6 +13,7 @@ #include "scheduler.h" #include "spork.h" #include "sporkid.h" +#include "tiertwo/tiertwo_sync_state.h" #include "validation.h" namespace llmq @@ -207,15 +208,20 @@ void CChainLocksHandler::TrySignChainTip() { Cleanup(); + if (!fMasterNode) { + return; + } + + if (!g_tiertwo_sync_state.IsBlockchainSynced()) { + return; + } + const CBlockIndex* pindex; { LOCK(cs_main); pindex = chainActive.Tip(); } - if (!fMasterNode) { - return; - } if (!pindex->pprev) { return; } @@ -440,6 +446,10 @@ bool CChainLocksHandler::InternalHasConflictingChainLock(int nHeight, const uint void CChainLocksHandler::Cleanup() { + if (!g_tiertwo_sync_state.IsBlockchainSynced()) { + return; + } + { LOCK(cs); if (GetTimeMillis() - lastCleanupTime < CLEANUP_INTERVAL) { From d2a2d15e047210482f0877fd589474ac22a16807 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 10 May 2019 11:24:04 +0200 Subject: [PATCH 04/11] Print inputs on which we voted and quorums used for signing (https://github.com/dashpay/dash/pull/2907) * Print inputs on which we voted Also print the corresponding requestId. This makes debugging InstantSend issues easier. * Print quorum hash when signing a share * Remove unused nodesByAddress map in CSigSharesManager::SendMessages Not related to this PR, but a simple cleanup that should have no side effects. --- src/llmq/quorums_signing_shares.cpp | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 1a47dcaeab2f0..df69c3b6a3805 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -975,11 +975,6 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map nodesByAddress; - g_connman->ForEachNode([&nodesByAddress](CNode* pnode) { - nodesByAddress.emplace(pnode->addr, pnode->GetId()); - }); - std::unordered_map> sigSharesToRequest; std::unordered_map> sigSharesToSend; std::unordered_map> sigSharesToAnnounce; @@ -1418,8 +1413,8 @@ void CSigSharesManager::Sign(const CQuorumCPtr& quorum, const uint256& id, const sigShare.UpdateKey(); - LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signed sigShare. signHash=%s, id=%s, msgHash=%s, time=%s\n", __func__, - signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); + LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signed sigShare. signHash=%s, id=%s, msgHash=%s, llmqType=%d, quorum=%s, time=%s\n", __func__, + signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), quorum->params.type, quorum->pindexQuorum->GetBlockHash().ToString(), t.count()); ProcessSigShare(-1, sigShare, *g_connman, quorum); } From e01ad46a9664094abdb92b0cce526e852906dbe6 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 13 May 2019 10:41:48 +0200 Subject: [PATCH 05/11] Fix db leaks in LLMQ db (https://github.com/dashpay/dash/pull/2914) * Store rs_t key time in big endian Also implement ConvertInvalidTimeKeys to convert old entries. We can remove this later when we know that most MNs have run this code on testnet. The way we stored the time field in the past lead to CleanupOldRecoveredSigs iterating the keys in a strange order, causing no deletion at all and the LLMQ DB filling up. * Write batch in CleanupOldRecoveredSigs when it gets too large This avoids RAM filling up and OOM getting triggered. * Keep track of when a vote was written to the DB and clean up after week Instead of only deleting when the corresponding recovered sig is deleted. It sometimes happens that a masternode votes on something but a recovered sig is never created, which leaves us with a vote that will never be deleted. * Apply suggestions from code review Co-Authored-By: PastaPastaPasta --- src/llmq/quorums_signing.cpp | 160 ++++++++++++++++++++++++++++++++--- src/llmq/quorums_signing.h | 5 ++ 2 files changed, 151 insertions(+), 14 deletions(-) diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index a4aee81026af9..1ec720af014d9 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -26,6 +26,90 @@ std::unique_ptr quorumSigningManager{nullptr}; CRecoveredSigsDb::CRecoveredSigsDb(CDBWrapper& _db) : db(_db) { + if (Params().NetworkIDString() == CBaseChainParams::TESTNET) { + // TODO this can be completely removed after some time (when we're pretty sure the conversion has been run on most testnet MNs) + if (db.Exists(std::string("rs_upgraded"))) { + return; + } + + ConvertInvalidTimeKeys(); + AddVoteTimeKeys(); + + db.Write(std::string("rs_upgraded"), (uint8_t)1); + } +} + +// This converts time values in "rs_t" from host endiannes to big endiannes, which is required to have proper ordering of the keys +void CRecoveredSigsDb::ConvertInvalidTimeKeys() +{ + LogPrintf("CRecoveredSigsDb::%s -- converting invalid rs_t keys\n", __func__); + + std::unique_ptr pcursor(db.NewIterator()); + + auto start = std::make_tuple(std::string("rs_t"), (uint32_t)0, (uint8_t)0, uint256()); + pcursor->Seek(start); + + CDBBatch batch(CLIENT_VERSION | ADDRV2_FORMAT); + size_t cnt = 0; + while (pcursor->Valid()) { + decltype(start) k; + + if (!pcursor->GetKey(k) || std::get<0>(k) != "rs_t") { + break; + } + + batch.Erase(k); + std::get<1>(k) = htobe32(std::get<1>(k)); + batch.Write(k, (uint8_t)1); + + cnt++; + + pcursor->Next(); + } + pcursor.reset(); + + db.WriteBatch(batch); + + LogPrintf("CRecoveredSigsDb::%s -- converted %d invalid rs_t keys\n", __func__, cnt); +} + +// This adds rs_vt keys for every rs_v entry to the DB. The time in the key is set to the current time. +// This causes cleanup of all these votes a week later. +void CRecoveredSigsDb::AddVoteTimeKeys() +{ + LogPrintf("CRecoveredSigsDb::%s -- adding rs_vt keys with current time\n", __func__); + + auto curTime = GetAdjustedTime(); + + std::unique_ptr pcursor(db.NewIterator()); + + auto start = std::make_tuple(std::string("rs_v"), (uint8_t)0, uint256()); + pcursor->Seek(start); + + CDBBatch batch(CLIENT_VERSION | ADDRV2_FORMAT); + size_t cnt = 0; + while (pcursor->Valid()) { + decltype(start) k; + + if (!pcursor->GetKey(k) || std::get<0>(k) != "rs_v") { + break; + } + + uint8_t llmqType = std::get<1>(k); + const uint256& id = std::get<2>(k); + + auto k2 = std::make_tuple(std::string("rs_vt"), (uint32_t)htobe32(curTime), llmqType, id); + batch.Write(k2, (uint8_t)1); + + cnt++; + + pcursor->Next(); + } + pcursor.reset(); + + db.WriteBatch(batch); + + LogPrintf("CRecoveredSigsDb::%s -- added %d rs_vt entries\n", __func__, cnt); } bool CRecoveredSigsDb::HasRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash) @@ -143,13 +227,9 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig) auto k4 = std::make_tuple(std::string("rs_s"), signHash); batch.Write(k4, (uint8_t)1); - // remove the votedForId entry as we won't need it anymore - auto k5 = std::make_tuple(std::string("rs_v"), recSig.llmqType, recSig.id); - batch.Erase(k5); - // store by current time. Allows fast cleanup of old recSigs - auto k6 = std::make_tuple(std::string("rs_t"), (uint32_t)GetAdjustedTime(), recSig.llmqType, recSig.id); - batch.Write(k6, (uint8_t)1); + auto k5 = std::make_tuple(std::string("rs_t"), (uint32_t)htobe32(GetAdjustedTime()), recSig.llmqType, recSig.id); + batch.Write(k5, (uint8_t)1); db.WriteBatch(batch); @@ -167,9 +247,8 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge) { std::unique_ptr pcursor(db.NewIterator()); - static const uint256 maxUint256 = uint256S("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"); auto start = std::make_tuple(std::string("rs_t"), (uint32_t)0, (uint8_t)0, uint256()); - auto end = std::make_tuple(std::string("rs_t"), (uint32_t)(GetAdjustedTime() - maxAge), (uint8_t)255, maxUint256); + uint32_t endTime = (uint32_t)(GetAdjustedTime() - maxAge); pcursor->Seek(start); std::vector> toDelete; @@ -178,10 +257,10 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge) while (pcursor->Valid()) { decltype(start) k; - if (!pcursor->GetKey(k)) { + if (!pcursor->GetKey(k) || std::get<0>(k) != "rs_t") { break; } - if (k >= end) { + if (be32toh(std::get<1>(k)) >= endTime) { break; } @@ -211,16 +290,19 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge) auto k2 = std::make_tuple(std::string("rs_r"), recSig.llmqType, recSig.id, recSig.msgHash); auto k3 = std::make_tuple(std::string("rs_h"), recSig.GetHash()); auto k4 = std::make_tuple(std::string("rs_s"), signHash); - auto k5 = std::make_tuple(std::string("rs_v"), recSig.llmqType, recSig.id); batch.Erase(k1); batch.Erase(k2); batch.Erase(k3); batch.Erase(k4); - batch.Erase(k5); hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)); hasSigForSessionCache.erase(signHash); hasSigForHashCache.erase(recSig.GetHash()); + + if (batch.SizeEstimate() >= (1 << 24)) { + db.WriteBatch(batch); + batch.Clear(); + } } } @@ -229,6 +311,8 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge) } db.WriteBatch(batch); + + LogPrint(BCLog::LLMQ, "CRecoveredSigsDb::%d -- deleted %d entries\n", __func__, toDelete.size()); } bool CRecoveredSigsDb::HasVotedOnId(Consensus::LLMQType llmqType, const uint256& id) @@ -245,8 +329,55 @@ bool CRecoveredSigsDb::GetVoteForId(Consensus::LLMQType llmqType, const uint256& void CRecoveredSigsDb::WriteVoteForId(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash) { - auto k = std::make_tuple(std::string("rs_v"), (uint8_t)llmqType, id); - db.Write(k, msgHash); + auto k1 = std::make_tuple(std::string("rs_v"), (uint8_t)llmqType, id); + auto k2 = std::make_tuple(std::string("rs_vt"), (uint32_t)htobe32(GetAdjustedTime()), (uint8_t)llmqType, id); + + CDBBatch batch(CLIENT_VERSION | ADDRV2_FORMAT); + batch.Write(k1, msgHash); + batch.Write(k2, (uint8_t)1); + + db.WriteBatch(batch); +} + +void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge) +{ + std::unique_ptr pcursor(db.NewIterator()); + + auto start = std::make_tuple(std::string("rs_vt"), (uint32_t)0, (uint8_t)0, uint256()); + uint32_t endTime = (uint32_t)(GetAdjustedTime() - maxAge); + pcursor->Seek(start); + + CDBBatch batch(CLIENT_VERSION | ADDRV2_FORMAT); + size_t cnt = 0; + while (pcursor->Valid()) { + decltype(start) k; + + if (!pcursor->GetKey(k) || std::get<0>(k) != "rs_vt") { + break; + } + if (be32toh(std::get<1>(k)) >= endTime) { + break; + } + + uint8_t llmqType = std::get<2>(k); + const uint256& id = std::get<3>(k); + + batch.Erase(k); + batch.Erase(std::make_tuple(std::string("rs_v"), llmqType, id)); + + cnt++; + + pcursor->Next(); + } + pcursor.reset(); + + if (cnt == 0) { + return; + } + + db.WriteBatch(batch); + + LogPrint(BCLog::LLMQ, "CRecoveredSigsDb::%d -- deleted %d entries\n", __func__, cnt); } ////////////////// @@ -520,6 +651,7 @@ void CSigningManager::Cleanup() int64_t maxAge = DEFAULT_MAX_RECOVERED_SIGS_AGE; db.CleanupOldRecoveredSigs(maxAge); + db.CleanupOldVotes(maxAge); lastCleanupTime = GetTimeMillis(); } diff --git a/src/llmq/quorums_signing.h b/src/llmq/quorums_signing.h index 619b4518c5811..f90c740492536 100644 --- a/src/llmq/quorums_signing.h +++ b/src/llmq/quorums_signing.h @@ -67,6 +67,9 @@ class CRecoveredSigsDb public: CRecoveredSigsDb(CDBWrapper& _db); + void ConvertInvalidTimeKeys(); + void AddVoteTimeKeys(); + bool HasRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash); bool HasRecoveredSigForId(Consensus::LLMQType llmqType, const uint256& id); bool HasRecoveredSigForSession(const uint256& signHash); @@ -82,6 +85,8 @@ class CRecoveredSigsDb bool GetVoteForId(Consensus::LLMQType llmqType, const uint256& id, uint256& msgHashRet); void WriteVoteForId(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash); + void CleanupOldVotes(int64_t maxAge); + private: bool ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret); }; From 44ad484c1f95ff51d39fd37e10b68a9e88e4418e Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Sat, 16 Feb 2019 15:49:19 +0100 Subject: [PATCH 06/11] Optimize LLMQs sending of sig shares (https://github.com/dashpay/dash/pull/2704) --- src/llmq/quorums_signing_shares.cpp | 20 +++++---- src/net.cpp | 63 ++++++++++++++++++++++++++++- src/net.h | 10 ++++- 3 files changed, 82 insertions(+), 11 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index df69c3b6a3805..e84082f508ebb 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -1038,13 +1038,13 @@ bool CSigSharesManager::SendMessages() llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId()); msgs.emplace_back(sigSesAnn); if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false); didSend = true; } } @@ -1058,13 +1058,13 @@ bool CSigSharesManager::SendMessages() p.first.ToString(), p.second.ToString(), pnode->GetId()); msgs.emplace_back(std::move(p.second)); if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false); didSend = true; } } @@ -1078,7 +1078,7 @@ bool CSigSharesManager::SendMessages() LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", p.first.ToString(), p.second.ToInvString(), pnode->GetId()); if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs), false); msgs.clear(); totalSigsCount = 0; didSend = true; @@ -1087,7 +1087,7 @@ bool CSigSharesManager::SendMessages() msgs.emplace_back(std::move(p.second)); } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs))); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)), false); didSend = true; } } @@ -1101,13 +1101,13 @@ bool CSigSharesManager::SendMessages() p.first.ToString(), p.second.ToString(), pnode->GetId()); msgs.emplace_back(std::move(p.second)); if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false); didSend = true; } } @@ -1116,6 +1116,10 @@ bool CSigSharesManager::SendMessages() // looped through all nodes, release them g_connman->ReleaseNodeVector(vNodesCopy); + if (didSend) { + g_connman->WakeSelect(); + } + return didSend; } diff --git a/src/net.cpp b/src/net.cpp index ee4dafe96f4c6..4cfc951ef1b06 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1295,6 +1295,15 @@ bool CConnman::GenerateSelectSet(std::set& recv_set, std::set& s } } +#ifndef WIN32 + // We add a pipe to the read set so that the select() call can be woken up from the outside + // This is done when data is added to send buffers (vSendMsg) or when new peers are added + // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to + // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually + // run on Linux and friends. + recv_set.insert(wakeupPipe[0]); +#endif + return !recv_set.empty() || !send_set.empty() || !error_set.empty(); } @@ -1420,6 +1429,20 @@ void CConnman::SocketHandler() std::set recv_set, send_set, error_set; SocketEvents(recv_set, send_set, error_set); +#ifndef WIN32 + // drain the wakeup pipe + if (recv_set.count(wakeupPipe[0])) { + LogPrint(BCLog::NET, "woke up select()\n"); + char buf[128]; + while (true) { + int r = read(wakeupPipe[0], buf, sizeof(buf)); + if (r <= 0) { + break; + } + } + } +#endif + if (interruptNet) return; // @@ -1534,6 +1557,21 @@ void CConnman::WakeMessageHandler() condMsgProc.notify_one(); } +void CConnman::WakeSelect() +{ +#ifndef WIN32 + if (wakeupPipe[1] == -1) { + return; + } + + LogPrint(BCLog::NET, "waking up select()\n"); + + char buf[1]; + if (write(wakeupPipe[1], buf, 1) != 1) { + LogPrint(BCLog::NET, "write to wakeupPipe failed\n"); + } +#endif +} static std::string GetDNSHost(const CDNSSeedData& data, ServiceFlags* requiredServiceBits) { @@ -2242,6 +2280,22 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) fMsgProcWake = false; } +#ifndef WIN32 + if (pipe(wakeupPipe) != 0) { + wakeupPipe[0] = wakeupPipe[1] = -1; + LogPrint(BCLog::NET, "pipe() for wakeupPipe failed\n"); + } else { + int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0); + if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) { + LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n"); + } + fFlags = fcntl(wakeupPipe[1], F_GETFL, 0); + if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) { + LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n"); + } + } +#endif + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); @@ -2371,6 +2425,11 @@ void CConnman::Stop() vhListenSocket.clear(); semOutbound.reset(); semAddnode.reset(); +#ifndef WIN32 + if (wakeupPipe[0] != -1) close(wakeupPipe[0]); + if (wakeupPipe[1] != -1) close(wakeupPipe[1]); + wakeupPipe[0] = wakeupPipe[1] = -1; +#endif } void CConnman::DeleteNode(CNode* pnode) @@ -2683,7 +2742,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; } -void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) +void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend) { size_t nMessageSize = msg.data.size(); size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE; @@ -2700,7 +2759,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); - bool optimisticSend(pnode->vSendMsg.empty()); + bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty()); //log total amount of bytes per command pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; diff --git a/src/net.h b/src/net.h index 581321f36633a..8ba02f3580b57 100644 --- a/src/net.h +++ b/src/net.h @@ -209,7 +209,7 @@ class CConnman bool ForNode(NodeId id, std::function func); bool ForNode(const CService& addr, const std::function& cond, const std::function& func); - void PushMessage(CNode* pnode, CSerializedNetMsg&& msg); + void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = true); template bool ForEachNodeContinueIf(Callable&& func) @@ -360,6 +360,9 @@ class CConnman TierTwoConnMan* GetTierTwoConnMan() { return m_tiertwo_conn_man.get(); }; /** Update the node to be a iqr member if needed */ void UpdateQuorumRelayMemberIfNeeded(CNode* pnode); + /** Interrupt the select/poll system call **/ + void WakeSelect(); + private: struct ListenSocket { SOCKET socket; @@ -479,6 +482,11 @@ class CConnman CThreadInterrupt interruptNet; +#ifndef WIN32 + /** a pipe which is added to select() calls to wakeup before the timeout */ + int wakeupPipe[2]{-1, -1}; +#endif + std::thread threadDNSAddressSeed; std::thread threadSocketHandler; std::thread threadOpenAddedConnections; From 436300d00d18502b952b4b9928fac574e8415cdd Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Thu, 11 Apr 2019 14:43:22 +0200 Subject: [PATCH 07/11] Disable optimistic send in PushMessage by default (https://github.com/dashpay/dash/pull/2859) * Automatically wake up select() when optimistic send was not used But only when we know that we are actually inside select() and that it currenlty is unlikely for it to have selected the node's socket for sending. We accept race conditions here as the select() timeout will ensure that we always send the data. * Don't manually call WakeSelect() in CSigSharesManager::SendMessages Not needed anymore * Disable optimistic send in PushMessage by default --- src/llmq/quorums_signing_shares.cpp | 4 ---- src/net.cpp | 8 ++++++++ src/net.h | 16 +++++++++++++++- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index e84082f508ebb..e6b4fefe3d23c 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -1116,10 +1116,6 @@ bool CSigSharesManager::SendMessages() // looped through all nodes, release them g_connman->ReleaseNodeVector(vNodesCopy); - if (didSend) { - g_connman->WakeSelect(); - } - return didSend; } diff --git a/src/net.cpp b/src/net.cpp index 4cfc951ef1b06..6ecf8c3a26967 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1339,7 +1339,9 @@ void CConnman::SocketEvents(std::set& recv_set, std::set& send_s vpollfds.push_back(std::move(it.second)); } + isInSelect = true; if (poll(vpollfds.data(), vpollfds.size(), SELECT_TIMEOUT_MILLISECONDS) < 0) return; + isInSelect = false; if (interruptNet) return; @@ -1388,7 +1390,9 @@ void CConnman::SocketEvents(std::set &recv_set, std::set &send_s hSocketMax = std::max(hSocketMax, hSocket); } + isInSelect = true; int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); + isInSelect = false; if (interruptNet) return; @@ -2759,6 +2763,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOpti size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); + bool hasPendingData = !pnode->vSendMsg.empty(); bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty()); //log total amount of bytes per command @@ -2774,6 +2779,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOpti // If write queue empty, attempt "optimistic write" if (optimisticSend == true) nBytesSent = SocketSendData(pnode); + // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) + else if (!hasPendingData && isInSelect) + WakeSelect(); } if (nBytesSent) RecordBytesSent(nBytesSent); diff --git a/src/net.h b/src/net.h index 8ba02f3580b57..7e0b0fb6bf618 100644 --- a/src/net.h +++ b/src/net.h @@ -35,6 +35,19 @@ #include #endif +// "Optimistic send" was introduced in the beginning of the Bitcoin project. I assume this was done because it was +// thought that "send" would be very cheap when the send buffer is empty. This is not true, as shown by profiling. +// When a lot of load is seen on the network, the "send" call done in the message handler thread can easily use up 20% +// of time, effectively blocking things that could be done in parallel. We have introduced a way to wake up the select() +// call in the network thread, which allows us to disable optimistic send without introducing an artificial latency/delay +// when sending data. This however only works on non-WIN32 platforms for now. When we add support for WIN32 platforms, +// we can completely remove optimistic send. +#ifdef WIN32 +#define DEFAULT_ALLOW_OPTIMISTIC_SEND true +#else +#define DEFAULT_ALLOW_OPTIMISTIC_SEND false +#endif + class CAddrMan; class CBlockIndex; class CScheduler; @@ -209,7 +222,7 @@ class CConnman bool ForNode(NodeId id, std::function func); bool ForNode(const CService& addr, const std::function& cond, const std::function& func); - void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = true); + void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = DEFAULT_ALLOW_OPTIMISTIC_SEND); template bool ForEachNodeContinueIf(Callable&& func) @@ -486,6 +499,7 @@ class CConnman /** a pipe which is added to select() calls to wakeup before the timeout */ int wakeupPipe[2]{-1, -1}; #endif + std::atomic isInSelect{false}; std::thread threadDNSAddressSeed; std::thread threadSocketHandler; From 802a933b004a8e36ad70c78e65e6b16d5854b19e Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 12 Apr 2019 12:58:42 +0200 Subject: [PATCH 08/11] Don't wake up select if it was already woken up (https://github.com/dashpay/dash/pull/2863) This avoids calling WakeupSelect() for each node instead of just once. --- src/net.cpp | 27 ++++++++++++++++----------- src/net.h | 5 +++-- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 6ecf8c3a26967..951c7444e662f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1295,7 +1295,7 @@ bool CConnman::GenerateSelectSet(std::set& recv_set, std::set& s } } -#ifndef WIN32 +#ifdef USE_WAKEUP_PIPE // We add a pipe to the read set so that the select() call can be woken up from the outside // This is done when data is added to send buffers (vSendMsg) or when new peers are added // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to @@ -1339,9 +1339,12 @@ void CConnman::SocketEvents(std::set& recv_set, std::set& send_s vpollfds.push_back(std::move(it.second)); } - isInSelect = true; - if (poll(vpollfds.data(), vpollfds.size(), SELECT_TIMEOUT_MILLISECONDS) < 0) return; - isInSelect = false; + wakeupSelectNeeded = true; + int r = poll(vpollfds.data(), vpollfds.size(), SELECT_TIMEOUT_MILLISECONDS); + wakeupSelectNeeded = false; + if (r < 0) { + return; + } if (interruptNet) return; @@ -1390,9 +1393,9 @@ void CConnman::SocketEvents(std::set &recv_set, std::set &send_s hSocketMax = std::max(hSocketMax, hSocket); } - isInSelect = true; + wakeupSelectNeeded = true; int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - isInSelect = false; + wakeupSelectNeeded = false; if (interruptNet) return; @@ -1433,7 +1436,7 @@ void CConnman::SocketHandler() std::set recv_set, send_set, error_set; SocketEvents(recv_set, send_set, error_set); -#ifndef WIN32 +#ifdef USE_WAKEUP_PIPE // drain the wakeup pipe if (recv_set.count(wakeupPipe[0])) { LogPrint(BCLog::NET, "woke up select()\n"); @@ -1563,7 +1566,7 @@ void CConnman::WakeMessageHandler() void CConnman::WakeSelect() { -#ifndef WIN32 +#ifdef USE_WAKEUP_PIPE if (wakeupPipe[1] == -1) { return; } @@ -1575,6 +1578,8 @@ void CConnman::WakeSelect() LogPrint(BCLog::NET, "write to wakeupPipe failed\n"); } #endif + + wakeupSelectNeeded = false; } static std::string GetDNSHost(const CDNSSeedData& data, ServiceFlags* requiredServiceBits) @@ -2284,7 +2289,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) fMsgProcWake = false; } -#ifndef WIN32 +#ifdef USE_WAKEUP_PIPE if (pipe(wakeupPipe) != 0) { wakeupPipe[0] = wakeupPipe[1] = -1; LogPrint(BCLog::NET, "pipe() for wakeupPipe failed\n"); @@ -2429,7 +2434,7 @@ void CConnman::Stop() vhListenSocket.clear(); semOutbound.reset(); semAddnode.reset(); -#ifndef WIN32 +#ifdef USE_WAKEUP_PIPE if (wakeupPipe[0] != -1) close(wakeupPipe[0]); if (wakeupPipe[1] != -1) close(wakeupPipe[1]); wakeupPipe[0] = wakeupPipe[1] = -1; @@ -2780,7 +2785,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOpti if (optimisticSend == true) nBytesSent = SocketSendData(pnode); // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) - else if (!hasPendingData && isInSelect) + else if (!hasPendingData && wakeupSelectNeeded) WakeSelect(); } if (nBytesSent) diff --git a/src/net.h b/src/net.h index 7e0b0fb6bf618..4d5b15a70262c 100644 --- a/src/net.h +++ b/src/net.h @@ -46,6 +46,7 @@ #define DEFAULT_ALLOW_OPTIMISTIC_SEND true #else #define DEFAULT_ALLOW_OPTIMISTIC_SEND false +#define USE_WAKEUP_PIPE #endif class CAddrMan; @@ -495,11 +496,11 @@ class CConnman CThreadInterrupt interruptNet; -#ifndef WIN32 +#ifdef USE_WAKEUP_PIPE /** a pipe which is added to select() calls to wakeup before the timeout */ int wakeupPipe[2]{-1, -1}; #endif - std::atomic isInSelect{false}; + std::atomic wakeupSelectNeeded{false}; std::thread threadDNSAddressSeed; std::thread threadSocketHandler; From be20a71bad6dc06f0e1ba8427783bab3a39973ee Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 12 Aug 2019 09:35:44 +0200 Subject: [PATCH 09/11] Remove recovered sigs from the LLMQ db when corresponding IS locks get confirmed (https://github.com/dashpay/dash/pull/3048) * Remove unused overload of RemoveInstantSendLock * Move deletion of recovered sigs into own method * Remove recovered sigs for fully confirmed IS locks * Also remove rs_t entries when removing recovered sigs from the outside CleanupOldRecoveredSigs already does this as the last step, but when RemoveRecoveredSig is called from the outside (e.g. from InstantSend), these keys are not removed. This PR fixes this by storing the write time into rs_r and later uses it to remove the rs_t entry. Old entries will be incompatible with this (1 byte written in the past, 4 bytes written now). This checked by comparing the data size with sizeof(uint32_t). * Add TODO --- src/llmq/quorums_signing.cpp | 76 ++++++++++++++++++++++++++---------- src/llmq/quorums_signing.h | 6 +++ 2 files changed, 61 insertions(+), 21 deletions(-) diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 1ec720af014d9..40041bcd8dcf8 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -211,12 +211,15 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig) { CDBBatch batch(CLIENT_VERSION | ADDRV2_FORMAT); + uint32_t curTime = GetAdjustedTime(); + // we put these close to each other to leverage leveldb's key compaction // this way, the second key can be used for fast HasRecoveredSig checks while the first key stores the recSig auto k1 = std::make_tuple(std::string("rs_r"), recSig.llmqType, recSig.id); auto k2 = std::make_tuple(std::string("rs_r"), recSig.llmqType, recSig.id, recSig.msgHash); batch.Write(k1, recSig); - batch.Write(k2, (uint8_t)1); + // this key is also used to store the current time, so that we can easily get to the "rs_t" key when we have the id + batch.Write(k2, curTime); // store by object hash auto k3 = std::make_tuple(std::string("rs_h"), recSig.GetHash()); @@ -228,7 +231,7 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig) batch.Write(k4, (uint8_t)1); // store by current time. Allows fast cleanup of old recSigs - auto k5 = std::make_tuple(std::string("rs_t"), (uint32_t)htobe32(GetAdjustedTime()), recSig.llmqType, recSig.id); + auto k5 = std::make_tuple(std::string("rs_t"), (uint32_t)htobe32(curTime), recSig.llmqType, recSig.id); batch.Write(k5, (uint8_t)1); db.WriteBatch(batch); @@ -243,6 +246,50 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig) } } +void CRecoveredSigsDb::RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType llmqType, const uint256& id, bool deleteTimeKey) +{ + AssertLockHeld(cs); + + CRecoveredSig recSig; + if (!ReadRecoveredSig(llmqType, id, recSig)) { + return; + } + + auto signHash = llmq::utils::BuildSignHash(recSig); + + auto k1 = std::make_tuple(std::string("rs_r"), recSig.llmqType, recSig.id); + auto k2 = std::make_tuple(std::string("rs_r"), recSig.llmqType, recSig.id, recSig.msgHash); + auto k3 = std::make_tuple(std::string("rs_h"), recSig.GetHash()); + auto k4 = std::make_tuple(std::string("rs_s"), signHash); + batch.Erase(k1); + batch.Erase(k2); + batch.Erase(k3); + batch.Erase(k4); + + if (deleteTimeKey) { + CDataStream writeTimeDs(SER_DISK, CLIENT_VERSION); + // TODO remove the size() == sizeof(uint32_t) in a future version (when we stop supporting upgrades from < 0.14.1) + if (db.ReadDataStream(k2, writeTimeDs) && writeTimeDs.size() == sizeof(uint32_t)) { + uint32_t writeTime; + writeTimeDs >> writeTime; + auto k5 = std::make_tuple(std::string("rs_t"), (uint32_t)htobe32(writeTime), recSig.llmqType, recSig.id); + batch.Erase(k5); + } + } + + hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)); + hasSigForSessionCache.erase(signHash); + hasSigForHashCache.erase(recSig.GetHash()); +} + +void CRecoveredSigsDb::RemoveRecoveredSig(Consensus::LLMQType llmqType, const uint256& id) +{ + LOCK(cs); + CDBBatch batch(CLIENT_VERSION | ADDRV2_FORMAT); + RemoveRecoveredSig(batch, llmqType, id, true); + db.WriteBatch(batch); +} + void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge) { std::unique_ptr pcursor(db.NewIterator()); @@ -279,25 +326,7 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge) { LOCK(cs); for (auto& e : toDelete) { - CRecoveredSig recSig; - if (!ReadRecoveredSig(e.first, e.second, recSig)) { - continue; - } - - auto signHash = llmq::utils::BuildSignHash(recSig); - - auto k1 = std::make_tuple(std::string("rs_r"), recSig.llmqType, recSig.id); - auto k2 = std::make_tuple(std::string("rs_r"), recSig.llmqType, recSig.id, recSig.msgHash); - auto k3 = std::make_tuple(std::string("rs_h"), recSig.GetHash()); - auto k4 = std::make_tuple(std::string("rs_s"), signHash); - batch.Erase(k1); - batch.Erase(k2); - batch.Erase(k3); - batch.Erase(k4); - - hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)); - hasSigForSessionCache.erase(signHash); - hasSigForHashCache.erase(recSig.GetHash()); + RemoveRecoveredSig(batch, e.first, e.second, false); if (batch.SizeEstimate() >= (1 << 24)) { db.WriteBatch(batch); @@ -641,6 +670,11 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re } } +void CSigningManager::RemoveRecoveredSig(Consensus::LLMQType llmqType, const uint256& id) +{ + db.RemoveRecoveredSig(llmqType, id); +} + void CSigningManager::Cleanup() { int64_t now = GetTimeMillis(); diff --git a/src/llmq/quorums_signing.h b/src/llmq/quorums_signing.h index f90c740492536..0d9a91ab1ef2e 100644 --- a/src/llmq/quorums_signing.h +++ b/src/llmq/quorums_signing.h @@ -77,6 +77,7 @@ class CRecoveredSigsDb bool GetRecoveredSigByHash(const uint256& hash, CRecoveredSig& ret); bool GetRecoveredSigById(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret); void WriteRecoveredSig(const CRecoveredSig& recSig); + void RemoveRecoveredSig(Consensus::LLMQType llmqType, const uint256& id); void CleanupOldRecoveredSigs(int64_t maxAge); @@ -89,6 +90,7 @@ class CRecoveredSigsDb private: bool ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret); + void RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType llmqType, const uint256& id, bool deleteTimeKey); }; class CRecoveredSigsListener @@ -132,6 +134,10 @@ class CSigningManager void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); + // This is called when a recovered signature can be safely removed from the DB. This is only safe when some other + // mechanism prevents possible conflicts. As an example, ChainLocks prevent conflicts in confirmed TXs InstantSend votes + void RemoveRecoveredSig(Consensus::LLMQType llmqType, const uint256& id); + private: void ProcessMessageRecoveredSig(CNode* pfrom, const CRecoveredSig& recoveredSig, CConnman& connman); bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan); From af7bb99ddc0977e7154700c08377b80980f2795d Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 12 Aug 2019 09:36:09 +0200 Subject: [PATCH 10/11] Re-verify invalid IS sigs when the active quorum set rotated (https://github.com/dashpay/dash/pull/3052) * Split ProcessPendingInstantSendLocks into two methods * Split SelectQuorumForSigning into SelectQuorumForSigning and GetActiveQuorumSet * Implement retrying of IS lock verification when the LLMQ active set rotates --- src/llmq/quorums_signing.cpp | 11 ++++++++--- src/llmq/quorums_signing.h | 2 ++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 40041bcd8dcf8..5609157ce9164 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -802,7 +802,7 @@ bool CSigningManager::GetVoteForId(Consensus::LLMQType llmqType, const uint256& return db.GetVoteForId(llmqType, id, msgHashRet); } -CQuorumCPtr CSigningManager::SelectQuorumForSigning(Consensus::LLMQType llmqType, int signHeight, const uint256& selectionHash) +std::vector CSigningManager::GetActiveQuorumSet(Consensus::LLMQType llmqType, int signHeight) { auto& llmqParams = Params().GetConsensus().llmqs.at(llmqType); size_t poolSize = (size_t)llmqParams.signingActiveQuorumCount; @@ -812,12 +812,17 @@ CQuorumCPtr CSigningManager::SelectQuorumForSigning(Consensus::LLMQType llmqType LOCK(cs_main); int startBlockHeight = signHeight - SIGN_HEIGHT_OFFSET; if (startBlockHeight > chainActive.Height()) { - return nullptr; + return {}; } pindexStart = chainActive[startBlockHeight]; } - auto quorums = quorumManager->ScanQuorums(llmqType, pindexStart, poolSize); + return quorumManager->ScanQuorums(llmqType, pindexStart, poolSize); +} + +CQuorumCPtr CSigningManager::SelectQuorumForSigning(Consensus::LLMQType llmqType, int signHeight, const uint256& selectionHash) +{ + auto quorums = GetActiveQuorumSet(llmqType, signHeight); if (quorums.empty()) { return nullptr; } diff --git a/src/llmq/quorums_signing.h b/src/llmq/quorums_signing.h index 0d9a91ab1ef2e..ac458a3b92d5e 100644 --- a/src/llmq/quorums_signing.h +++ b/src/llmq/quorums_signing.h @@ -160,6 +160,8 @@ class CSigningManager bool IsConflicting(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash); bool HasVotedOnId(Consensus::LLMQType llmqType, const uint256& id); bool GetVoteForId(Consensus::LLMQType llmqType, const uint256& id, uint256& msgHashRet); + + std::vector GetActiveQuorumSet(Consensus::LLMQType llmqType, int signHeight); CQuorumCPtr SelectQuorumForSigning(Consensus::LLMQType llmqType, int signHeight, const uint256& selectionHash); // Verifies a recovered sig that was signed while the chain tip was at signedAtTip bool VerifyRecoveredSig(Consensus::LLMQType llmqType, int signedAtHeight, const uint256& id, const uint256& msgHash, const CBLSSignature& sig); From c7e2bebef16aacc965d549f46aacbfeec978502c Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Tue, 24 Sep 2019 12:38:50 +0300 Subject: [PATCH 11/11] scripted-diff: Refactor llmq type consensus param names (https://github.com/dashpay/dash/pull/3093) -BEGIN VERIFY SCRIPT- sed -i 's/llmqChainLocks/llmqTypeChainLocks/g' src/*.cpp src/*.h src/*/*.cpp src/*/*.h sed -i 's/llmqForInstantSend/llmqTypeInstantSend/g' src/*.cpp src/*.h src/*/*.cpp src/*/*.h -END VERIFY SCRIPT- --- src/chainparams.cpp | 6 +++--- src/consensus/params.h | 2 +- src/llmq/quorums_chainlocks.cpp | 4 ++-- src/llmq/quorums_signing.cpp | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/chainparams.cpp b/src/chainparams.cpp index 790a37089cb6a..9d7b7fa34da46 100644 --- a/src/chainparams.cpp +++ b/src/chainparams.cpp @@ -358,7 +358,7 @@ class CMainParams : public CChainParams nLLMQConnectionRetryTimeout = 60; - consensus.llmqChainLocks = Consensus::LLMQ_400_60; + consensus.llmqTypeChainLocks = Consensus::LLMQ_400_60; // Tier two nFulfilledRequestExpireTime = 60 * 60; // fulfilled requests expire in 1 hour @@ -505,7 +505,7 @@ class CTestNetParams : public CChainParams nLLMQConnectionRetryTimeout = 60; - consensus.llmqChainLocks = Consensus::LLMQ_400_60; + consensus.llmqTypeChainLocks = Consensus::LLMQ_400_60; // Tier two nFulfilledRequestExpireTime = 60 * 60; // fulfilled requests expire in 1 hour @@ -649,7 +649,7 @@ class CRegTestParams : public CChainParams consensus.llmqs[Consensus::LLMQ_TEST] = llmq_test; nLLMQConnectionRetryTimeout = 10; - consensus.llmqChainLocks = Consensus::LLMQ_TEST; + consensus.llmqTypeChainLocks = Consensus::LLMQ_TEST; // Tier two nFulfilledRequestExpireTime = 60 * 60; // fulfilled requests expire in 1 hour diff --git a/src/consensus/params.h b/src/consensus/params.h index fb93f00fbb1d1..44b00a9e091f4 100644 --- a/src/consensus/params.h +++ b/src/consensus/params.h @@ -278,7 +278,7 @@ struct Params { // LLMQ std::map llmqs; Optional GetLLMQParams(uint8_t llmqtype) const; - LLMQType llmqChainLocks; + LLMQType llmqTypeChainLocks; }; } // namespace Consensus diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index af480fdbdb754..811a694cdc105 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -109,7 +109,7 @@ void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLock uint256 requestId = ::SerializeHash(std::make_pair(CLSIG_REQUESTID_PREFIX, clsig.nHeight)); uint256 msgHash = clsig.blockHash; - if (!quorumSigningManager->VerifyRecoveredSig(Params().GetConsensus().llmqChainLocks, clsig.nHeight, requestId, msgHash, clsig.sig)) { + if (!quorumSigningManager->VerifyRecoveredSig(Params().GetConsensus().llmqTypeChainLocks, clsig.nHeight, requestId, msgHash, clsig.sig)) { LogPrintf("CChainLocksHandler::%s -- invalid CLSIG (%s), peer=%d\n", __func__, clsig.ToString(), from); if (from != -1) { LOCK(cs_main); @@ -270,7 +270,7 @@ void CChainLocksHandler::TrySignChainTip() lastSignedMsgHash = msgHash; } - quorumSigningManager->AsyncSignIfMember(Params().GetConsensus().llmqChainLocks, requestId, msgHash); + quorumSigningManager->AsyncSignIfMember(Params().GetConsensus().llmqTypeChainLocks, requestId, msgHash); } // WARNING: cs_main and cs should not be held! diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 5609157ce9164..1816614823602 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -842,7 +842,7 @@ CQuorumCPtr CSigningManager::SelectQuorumForSigning(Consensus::LLMQType llmqType bool CSigningManager::VerifyRecoveredSig(Consensus::LLMQType llmqType, int signedAtHeight, const uint256& id, const uint256& msgHash, const CBLSSignature& sig) { - auto& llmqParams = Params().GetConsensus().llmqs.at(Params().GetConsensus().llmqChainLocks); + auto& llmqParams = Params().GetConsensus().llmqs.at(Params().GetConsensus().llmqTypeChainLocks); auto quorum = SelectQuorumForSigning(llmqParams.type, signedAtHeight, id); if (!quorum) {