From 0db7d83b76bb71b2f149916e3b050f9890a84be3 Mon Sep 17 00:00:00 2001 From: Greg Szabo <16846635+greg-szabo@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:16:07 +0100 Subject: [PATCH 1/8] feat(ci): Codecov self-hosted test (#562) * Codecov self-hosted test * chore(ci): Enable Codecov Test Analytics feature for detecting flaky tests (#563) https://docs.codecov.com/docs/test-analytics * Update Codecov badges on README --------- Co-authored-by: Romain Ruetschi --- .github/workflows/coverage.yml | 22 ++++++++++++++++++---- README.md | 4 ++-- code/.config/nextest.toml | 6 ++++++ 3 files changed, 26 insertions(+), 6 deletions(-) create mode 100644 code/.config/nextest.toml diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index fa20e4007..2f024d638 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -43,28 +43,41 @@ jobs: with: toolchain: nightly components: llvm-tools-preview + - name: Install cargo-nextest + uses: taiki-e/install-action@cargo-nextest - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov - name: Generate code coverage run: | - cargo llvm-cov test \ + cargo llvm-cov nextest \ --workspace \ --exclude malachite-test-mbt \ --ignore-filename-regex crates/cli \ --all-features \ - --jobs 1 \ + --no-capture \ --ignore-run-fail \ --lcov \ --output-path lcov.info - name: Generate text report run: cargo llvm-cov report - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 + if: ${{ !cancelled() }} with: token: ${{ secrets.CODECOV_TOKEN }} files: code/lcov.info flags: integration fail_ci_if_error: false + url: https://codecov.informal.systems + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + file: code/target/nextest/default/junit.xml + flags: integration + fail_ci_if_error: false + url: https://codecov.informal.systems mbt: name: MBT @@ -101,9 +114,10 @@ jobs: - name: Generate text report run: cargo llvm-cov report - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: token: ${{ secrets.CODECOV_TOKEN }} files: code/lcov.info flags: mbt fail_ci_if_error: false + url: https://codecov.informal.systems diff --git a/README.md b/README.md index 7b22a6b33..d47adacd2 100644 --- a/README.md +++ b/README.md @@ -41,8 +41,8 @@ Unless required by applicable law or agreed to in writing, software distributed [quint-link]: https://github.com/informalsystems/malachite/actions/workflows/quint.yml [mbt-test-image]: https://github.com/informalsystems/malachite/actions/workflows/mbt.yml/badge.svg [mbt-test-link]: https://github.com/informalsystems/malachite/actions/workflows/mbt.yml -[coverage-image]: https://codecov.io/gh/informalsystems/malachite/graph/badge.svg?token=B9KY7B6DJF -[coverage-link]: https://codecov.io/gh/informalsystems/malachite +[coverage-image]: https://codecov.informal.systems/gh/informalsystems/malachite/graph/badge.svg?token=LO0NSEJ9FC +[coverage-link]: https://codecov.informal.systems/gh/informalsystems/malachite [license-image]: https://img.shields.io/badge/license-Apache_2.0-blue.svg [license-link]: https://github.com/informalsystems/hermes/blob/master/LICENSE [rustc-image]: https://img.shields.io/badge/Rust-stable-orange.svg diff --git a/code/.config/nextest.toml b/code/.config/nextest.toml new file mode 100644 index 000000000..a23314383 --- /dev/null +++ b/code/.config/nextest.toml @@ -0,0 +1,6 @@ +[profile.default] +retries = 1 +fail-fast = false + +[profile.default.junit] +path = "junit.xml" From d39ce993e61820a31d286e481313eedec8baac87 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 03:58:36 -0500 Subject: [PATCH 2/8] chore(deps): bump serde from 1.0.214 to 1.0.215 in /code (#573) Bumps [serde](https://github.com/serde-rs/serde) from 1.0.214 to 1.0.215. - [Release notes](https://github.com/serde-rs/serde/releases) - [Commits](https://github.com/serde-rs/serde/compare/v1.0.214...v1.0.215) --- updated-dependencies: - dependency-name: serde dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- code/Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/code/Cargo.lock b/code/Cargo.lock index 831a6535e..4134482ba 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -3857,18 +3857,18 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", From e70ed65f144d0db2c66c0c4e40782b3f800bfc61 Mon Sep 17 00:00:00 2001 From: Josef Widder <44643235+josef-widder@users.noreply.github.com> Date: Wed, 20 Nov 2024 10:43:34 +0100 Subject: [PATCH 3/8] Update README.md (#577) layout, minor changes. --- specs/quint/specs/reset/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/specs/quint/specs/reset/README.md b/specs/quint/specs/reset/README.md index 0edb1ab84..04b98de30 100644 --- a/specs/quint/specs/reset/README.md +++ b/specs/quint/specs/reset/README.md @@ -31,9 +31,9 @@ The following actions add faulty behaviors to the standard `step` action: - `L1ForkIDMonotonic`: L2 forkID in L1 blocks is non-decreasing - Proofs validation invariants - - `InvalidRegistrationProofRejectedInv`: If the latest block in L1 does not include a (valid) proof or the proof contains an invalid registration, then the proof should be rejected. We check that by attesting that L1's provenHeight remains unchanged (checked also for `--step "stepWithInvalidRegs"`) - - `OldProofRejectedInv`: L1 blocks should not accept proofs with non monotonically increasing proven L2 heights. As a consequence, the latest L2 proven height in L1 should remain unchanged with such a proof is submitted (checked also with `--step stepWithPotentiallyOldProofs`) - - `FutureProofRejectedInv`: If the proof starts from a block with height greater than provenHeight + 1 it is rejected. (checked also with `--step stepWithPotentiallyFutureProofs`) + - `InvalidRegistrationProofRejectedInv`: If the latest block in L1 does not include a (valid) proof or the proof contains an invalid registration, then the proof should be rejected. We check that by attesting that L1's provenHeight remains unchanged (checked also for `--step "stepWithInvalidRegs"`) + - `OldProofRejectedInv`: L1 blocks should not accept proofs with non monotonically increasing proven L2 heights. As a consequence, the latest L2 proven height in L1 should remain unchanged with such a proof is submitted (checked also with `--step stepWithPotentiallyOldProofs`) + - `FutureProofRejectedInv`: If the proof starts from a block with height greater than provenHeight + 1 it is rejected. (checked also with `--step stepWithPotentiallyFutureProofs`) - Local L2 invariants - `monotonicForkIDInv`: ForkID on L2 is non-decreasing From dbc32bc0c82ce00a27c0d61b361d784763459a3f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 04:53:58 -0500 Subject: [PATCH 4/8] chore(deps): bump axum from 0.7.7 to 0.7.9 in /code (#572) Bumps [axum](https://github.com/tokio-rs/axum) from 0.7.7 to 0.7.9. - [Release notes](https://github.com/tokio-rs/axum/releases) - [Changelog](https://github.com/tokio-rs/axum/blob/main/CHANGELOG.md) - [Commits](https://github.com/tokio-rs/axum/compare/axum-v0.7.7...axum-v0.7.9) --- updated-dependencies: - dependency-name: axum dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Anca Zamfir --- code/Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/code/Cargo.lock b/code/Cargo.lock index 4134482ba..8f5368527 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -278,9 +278,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "axum" -version = "0.7.7" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", "axum-core", From f84ecfaaf314bf377aa469650862a8db54b08957 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 04:54:50 -0500 Subject: [PATCH 5/8] chore(deps): bump clap from 4.5.20 to 4.5.21 in /code (#570) Bumps [clap](https://github.com/clap-rs/clap) from 4.5.20 to 4.5.21. - [Release notes](https://github.com/clap-rs/clap/releases) - [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md) - [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.5.20...clap_complete-v4.5.21) --- updated-dependencies: - dependency-name: clap dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Anca Zamfir --- code/Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/code/Cargo.lock b/code/Cargo.lock index 8f5368527..f8016b17c 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -557,9 +557,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" dependencies = [ "clap_builder", "clap_derive", @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" dependencies = [ "anstream", "anstyle", From 36e130b57f35ce5c604d7c19a26b15aaae82f49c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 04:55:56 -0500 Subject: [PATCH 6/8] chore(deps): bump serde_json from 1.0.132 to 1.0.133 in /code (#571) Bumps [serde_json](https://github.com/serde-rs/json) from 1.0.132 to 1.0.133. - [Release notes](https://github.com/serde-rs/json/releases) - [Commits](https://github.com/serde-rs/json/compare/v1.0.132...v1.0.133) --- updated-dependencies: - dependency-name: serde_json dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- code/Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/code/Cargo.lock b/code/Cargo.lock index f8016b17c..9f54df8ee 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -3877,9 +3877,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", From ba5d49c61af02e0b2b0057875550fc12190c17db Mon Sep 17 00:00:00 2001 From: Josef Widder <44643235+josef-widder@users.noreply.github.com> Date: Wed, 20 Nov 2024 11:01:20 +0100 Subject: [PATCH 7/8] feat(spec): refactor blocksync into two statemachines (#564) * refactor towards to mock * mock * refactor * consensus mocked * bluespec * a bit more refactor * typo * spec/quint: re-tabbing bsyncMock.qnt * spec/quint: re-tabbing client.qnt * spec/quint: re-tabbing server.qnt * spec/quint: re-tabbing syncStatemachine.qnt * spec/quint: re-tabbing bsyncWithConsensus.qnt * Apply suggestions from code review Co-authored-by: Daniel * fix * decide guarded by client height * Apply suggestions from code review Co-authored-by: Daniel --------- Co-authored-by: Daniel Cason --- specs/quint/specs/blocksync/bsyncMock.qnt | 95 +++++ .../specs/blocksync/bsyncWithConsensus.qnt | 148 ++++++++ specs/quint/specs/blocksync/client.qnt | 348 +++++++++--------- specs/quint/specs/blocksync/server.qnt | 236 ++++++------ specs/quint/specs/blocksync/sync.qnt | 254 ------------- .../specs/blocksync/syncStatemachine.qnt | 118 ++++++ 6 files changed, 659 insertions(+), 540 deletions(-) create mode 100644 specs/quint/specs/blocksync/bsyncMock.qnt create mode 100644 specs/quint/specs/blocksync/bsyncWithConsensus.qnt delete mode 100644 specs/quint/specs/blocksync/sync.qnt create mode 100644 specs/quint/specs/blocksync/syncStatemachine.qnt diff --git a/specs/quint/specs/blocksync/bsyncMock.qnt b/specs/quint/specs/blocksync/bsyncMock.qnt new file mode 100644 index 000000000..1aa090f35 --- /dev/null +++ b/specs/quint/specs/blocksync/bsyncMock.qnt @@ -0,0 +1,95 @@ +// -*- mode: Bluespec; -*- + +module bsyncMock { + + import blocksync.* from "./blocksync" + import bsync_client.* from "./client" + import bsync_server.* from "./server" + import syncStatemachine.* from "./syncStatemachine" + + val validators = Set("v1", "v2", "v3", "v4") + val Correct = Set("v2", "v3", "v4") + + type Output = { + v : Address, + out: BsyncClientOutput + } + + var outputs: List[Output] + var chains: Address -> List[BlockStoreEntry] + + /// initialize consensus and synchronizer + action initMockedConsensus = all { + outputs' = [], + chains' = Correct.mapBy(_ => []), + syncInit(validators) + } + + action unchangedMock = all { + outputs' = outputs, + chains' = chains, + } + + /// Consensus mocked logic decides the latest height. + /// The blockchain increases in size. + /// The server becomes aware of that once `updateServer` is invoked. + /// The client becomes aware of that once `newHeightAction` is invoked. + /// This is an abstract version. We just add a (mocked) block with the right + /// height on top of the chain. Because of this, implicitly all nodes will + /// decide on the same block + action decideMock(v) = all { + bsyncClients.get(v).height == chains.get(v).length(), + outputs' = outputs, + chains' = chains.put(v, chains.get(v).append({ + decision: mkProposal("", chains.get(v).length(), 0, "", -1), + commit: Set(), + })), + syncUnchangedAll, + } + + /// Environment sends the node to the next height. + /// This implicitly requires `decideMock(v)` to be previous executed. + action newHeightActionAll(v, valset, h) = all { + chains.get(v).length() == h, // precondition for calling this + newHeightActionSync(v, valset, h), + unchangedMock, + } + + /// Update server v from the consensus' blockchain + /// This abstracts as pull-based mechanism: the server consults the chain state. + action mockUpdateServer(v) = all { + all { + updateServer(v, chains.get(v)), + unchangedClient, + unchangedMock, + } + } + + action writeAction(v, so) = all { + outputs' = outputs.append({v: v, out: so}), + chains' = chains, + } + + action mockStep(v, act) = all { + act(v), + unchangedMock + } + + /// For simple repl evaluation + action bsyncStep (v, act) = all { + act(v), + unchangedMock + } + + /// main step function: either a consensus state-machine step or a sync protocol step + action stepMockedConsensus = + nondet v = oneOf(Correct) + any { + pureSyncStep(v, unchangedMock), + // consensus-specific steps + syncStepClient(v, writeAction), // is checking if there are responses and check whether it need to requ + mockUpdateServer(v), + newHeightActionAll(v, validators, chains.get(v).length()), + decideMock(v), + } +} diff --git a/specs/quint/specs/blocksync/bsyncWithConsensus.qnt b/specs/quint/specs/blocksync/bsyncWithConsensus.qnt new file mode 100644 index 000000000..64c1f911e --- /dev/null +++ b/specs/quint/specs/blocksync/bsyncWithConsensus.qnt @@ -0,0 +1,148 @@ +// -*- mode: Bluespec; -*- + +module bsyncWithConsensus { + + import blocksync.* from "./blocksync" + import bsync_client.* from "./client" + import bsync_server.* from "./server" + + import statemachineAsync( + validators = Set("v1", "v2", "v3", "v4"), + validatorSet = Set("v1", "v2", "v3", "v4").mapBy(x => 1), + Faulty = Set("v1"), + Values = Set("red", "blue"), + Rounds = Set(0, 1, 2, 3), + Heights = Set(0) // , 1, 2, 3) + ).* from "../statemachineAsync" + + import syncStatemachine.* from "./syncStatemachine" + + /// initialize consensus and block sync + action initAll = all { + init, + syncInit(validators) + } + + /// environment sends the node to the next height. + action newHeightActionAll(v, valset, h) = all { + system.get(v).es.chain.length() == h, // precondition for calling this + newHeightActionSync(v, valset, h), + newHeightAction(v, valset, h), + } + + // Update server v from the consensus' blockchain + action syncUpdateServer(v) = all { + val chain = system.get(v).es.chain + all { + updateServer(v, chain), + unchangedClient, + unchangedAll, + } + } + + // validator step in the consensus protocol, no changes to block sync + action syncValStep(v) = all { + valStep(v), + syncUnchangedAll + } + + /// main step function: either a consensus state-machine step or a block sync protocol step + action stepWithBlockSync = any { + // consensus takes a step + all { + step, + syncUnchangedAll + }, + // block sync takes a step + nondet v = oneOf(Correct) + any { + pureSyncStep(v, unchangedAll), + // Block sync client check if there are responses to apply to consensus + syncStepClient(v, putSyncOutputIntoNode), + syncUpdateServer(v), + }, + // a node goes to next height + nondet v = oneOf(Correct) + newHeightActionAll(v, validatorSet, system.get(v).es.chain.length()) + } + + // + // Interesting scenarios + // + + /// auxiliary function for initHeight + /// sets the chain + pure def setChain(s: DriverState, c: List[{decision: Proposal, commit: Set[Vote]}]): DriverState = + {... s, chain: c} + + /// auxiliary function for initHeight + /// constructs a commit certificate for a height and value + pure def commitSet (h: Height, v: Value) : Set[Vote] = + Set("v1", "v2", "v3").map(n => mkVote(Precommit, n, h, 0, v)) + + /// An action to set up an initial state with some nodes already decided up to height h + /// this sets up an initial state where v4 starts late, and v2 and v3 have reached + /// height h + action initHeight(h) = all { + val special = "v4" // TODO proper selection from correct set + val initsystem = Correct.mapBy(v => + // hack + if (v == special) initNode(v, validatorSet, 0) + else initNode(v, validatorSet, h)) + nondet decisionValMap = 0.to(h-1).setOfMaps(Values).oneOf() + val propMap = decisionValMap.keys().mapBy(i => + mkProposal( proposer(validatorSet, i,0), + i, + 0, + decisionValMap.get(i), + 0)) + val list = range(0, h).foldl(List(), (acc, i) => acc.append(propMap.get(i))) + val chain = list.foldl(List(), (acc, i) => acc.append({decision: i, commit: commitSet(i.height, Val(i.proposal))})) + all { + system' = initsystem.keys().mapBy(x => + // hack + if (x == special) initsystem.get(x) + else {... initsystem.get(x), es: setChain(initsystem.get(x).es, chain) }), + propBuffer' = Correct.mapBy(v => Set()), + voteBuffer' = Correct.mapBy(v => Set()), + certBuffer' = Correct.mapBy(v => Set()), + _hist' = { validator: "INIT", input: NoDInput, output: NoConsensusOutput }, + syncInit(validators) + } + } + + action syncStep = + nondet v = oneOf(Correct) + pureSyncStep(v, unchangedAll) + + /// a simple scenario where v4 starts height h + run syncCycle(h) = + newHeightActionAll("v4", validatorSet, h) + .then(all{unchangedAll, syncStatusStep("v2")}) + .then(all{unchangedAll, syncDeliverStatus("v4")}) + .then(syncStepClient("v4", putSyncOutputIntoNode)) // ask for certificate + .then(all{unchangedAll, syncDeliverReq("v2")}) + .then(all{unchangedAll, syncStepServer("v2")}) + .then(all{unchangedAll, syncDeliverResp("v4")}) + .then(syncStepClient("v4", putSyncOutputIntoNode)) // ask for block and give certificate to node + .expect(system.get("v4").incomingSyncCertificates.size() > 0) + .then(all{unchangedAll, syncDeliverReq("v2")}) + .then(all{unchangedAll, syncStepServer("v2")}) + .then(all{unchangedAll, syncDeliverResp("v4")}) + .then(syncStepClient("v4", putSyncOutputIntoNode)) + .expect(system.get("v4").incomingSyncProposals.size() > 0) + .then(3.reps(_ => syncValStep("v4"))) + .expect(system.get("v4").es.chain.length() > h) + + run retreat = + nondet heightToReach = 1.to(4).oneOf() + initHeight( q::debug ("Height:", heightToReach)) + // FIXME: I had to put it here, instead of syncCycle(h) + // This is not ideal, but it works. + .then(syncUpdateServer("v2")) + .then(heightToReach.reps(i => syncCycle(i))) + .expect(system.get("v4").es.chain == system.get("v2").es.chain) + .then(newHeightActionAll("v4", validatorSet, heightToReach)) + .expect(system.get("v4").es.cs.height == system.get("v2").es.cs.height) + // and now v4 has synced ! +} diff --git a/specs/quint/specs/blocksync/client.qnt b/specs/quint/specs/blocksync/client.qnt index 84215d511..2bd1db37c 100644 --- a/specs/quint/specs/blocksync/client.qnt +++ b/specs/quint/specs/blocksync/client.qnt @@ -3,173 +3,185 @@ // Blocksync protocol: client side. module bsync_client { - import blocksync.* from "./blocksync" - - /// The state of the synchronizer - type BsyncClient = { - id: Address, - - peerStatus: Address -> BlockRange, - openRequests: Set[RequestMsg], - - height: Height, - lastSyncedHeight: Height, // "done" if greater than or equal to height - // TODO: we could add buffers for certificates and values - // inbuffers - statusMsgs: Set[StatusMsg], - responseMsgs: Set[ResponseMsg], - } - - type BsyncClientOutput = - | SOCertificate(Set[Vote]) - | SOBlock(Proposal) - | SONoOutput - - // - // BsyncClient functions - // - - /// Initialize the synchronizer - pure def initBsyncClient(id: Address, peers: Set[Address]) : BsyncClient = - { - id: id, - peerStatus: peers.mapBy(x => {base:-1, top:-1}), - openRequests: Set(), - height: -1, - lastSyncedHeight: -1, - statusMsgs: Set(), - responseMsgs: Set(), - } - - /// Auxiliary function to iterate over the received status messages - pure def updatePeerStatus (ps: Address -> BlockRange, msgs: Set[StatusMsg]) : Address -> BlockRange = - msgs.fold(ps, (newStatus , msg) => - if (newStatus.get(msg.peer).top < msg.top) // TODO: think about base? - newStatus.put(msg.peer, {base: msg.base, top: msg.top}) - else - newStatus - ) - - /// inform the synchronizer that consensus has entered height h - pure def syncNewHeight (s: BsyncClient, h: Height) : BsyncClient = - if (h <= s.height) - s - else - s.with("height", h) - - /// returned by the synchronizer: sync is the new state, so is the output towards - /// the consensus driver, req are messages sent towards peers/servers - type ClientResult = { - sync: BsyncClient, - so: BsyncClientOutput, - req: Option[RequestMsg] - } - - /// We have received a certificate. now we need to issue the - /// corresponding block request and generate a certificate output. - pure def syncHandleCertificate (s: BsyncClient, cert: Set[Vote], peer: str ) : ClientResult = - val blockReq = { client: s.id, - server: peer, - rtype: SyncBlock, - height: s.height} - { sync: {...s, openRequests: Set(blockReq)}, // If we have parallelization we need to be more precise here - so: SOCertificate(cert), - req: Some(blockReq)} - - /// we have received a block. now we need to generate a block output - pure def syncHandleBlock (s: BsyncClient, p: Proposal) : ClientResult = - { sync: {...s, openRequests: Set(), // If we have parallelization we need to remove one element, - lastSyncedHeight: s.height }, // blockheight, - so: SOBlock(p), - req: None} - - /// step of a client: - /// 1. update peer statuses, 2. if there is no open request, request something - /// 3. otherwise check whether we have a response and act accordingly - pure def bsyncClient (s: BsyncClient) : ClientResult = - val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs) - val newS = { ...s, peerStatus: newPeerStates} - if (s.lastSyncedHeight >= s.height) - // nothing to do - { sync: newS, - so: SONoOutput, - req: None} - else - val goodPeers = s.peerStatus.keys().filter(p => newPeerStates.get(p).base <= s.height - and s.height <= newPeerStates.get(p).top ) - if (goodPeers.size() > 0) - if (s.openRequests.size() == 0) - // we start the sync "round" by asking for a certificate - val req = { client: s.id, - server: goodPeers.fold("", (acc, i) => i), //chooseSome(), - rtype: SyncCertificate, - height: s.height} - { sync: {... newS, openRequests: s.openRequests.union(Set(req))}, - so: SONoOutput, - req: Some(req) - } - else - // we issued a request before, let's see whether there is a response - if (s.responseMsgs.size()> 0) - val resp = s.responseMsgs.fold(emptyResponseMsg, (acc, i) => i) //chooseSome() // in the future there might be parallelization - match resp.response { - | RespBlock(prop) => syncHandleBlock(newS, prop) - | RespCertificate(cert) => syncHandleCertificate(newS, cert, goodPeers.fold("", (s,x) => x)) - } - else - // I don't have response - // this might be timeout logic - { sync: newS, - so: SONoOutput, - req: None} - else - // no peers - { sync: newS, - so: SONoOutput, - req: None} - - // State machine - - var bsyncClients: Address -> BsyncClient - - action initClient(nodes) = all { - bsyncClients' = nodes.mapBy(v => initBsyncClient(v, nodes.exclude(Set(v)))), - } - - action unchangedClient = all { - bsyncClients' = bsyncClients, - } - - action newHeightClient(node, h) = all { - bsyncClients' = bsyncClients.put(node, syncNewHeight(bsyncClients.get(node), h)), - } - - // deliver a status message, from the statusBuffer, to node - action deliverStatus(node) = all { - statusBuffer.get(node).size() > 0, - val client = bsyncClients.get(node) - nondet msg = statusBuffer.get(node).oneOf() - all { - bsyncClients' = bsyncClients.put(node, {... client, - statusMsgs: client.statusMsgs.union(Set(msg))}), - statusBuffer' = statusBuffer.put(node, statusBuffer.get(node).exclude(Set(msg))), - requestsBuffer' = requestsBuffer, - responsesBuffer' = responsesBuffer, - } - } - - // deliver a response message, from responsesBuffer, to node - action deliverResponse(node) = all { - responsesBuffer.get(node).size() > 0, - val client = bsyncClients.get(node) - nondet msg = responsesBuffer.get(node).oneOf() - all { - bsyncClients' = bsyncClients.put(node, {... client, - responseMsgs: client.responseMsgs.union(Set(msg))}), - requestsBuffer' = requestsBuffer, - responsesBuffer' = responsesBuffer.put(node, responsesBuffer.get(node).exclude(Set(msg))), - statusBuffer' = statusBuffer, - } - } + import blocksync.* from "./blocksync" + + /// The state of the synchronizer + type BsyncClient = { + id: Address, + + peerStatus: Address -> BlockRange, + openRequests: Set[RequestMsg], + + height: Height, + lastSyncedHeight: Height, // "done" if greater than or equal to height + // TODO: we could add buffers for certificates and values + // inbuffers + statusMsgs: Set[StatusMsg], + responseMsgs: Set[ResponseMsg], + } + + type BsyncClientOutput = + | SOCertificate(Set[Vote]) + | SOBlock(Proposal) + | SONoOutput + + // + // BsyncClient functions + // + + /// Initialize the synchronizer + pure def initBsyncClient(id: Address, peers: Set[Address]) : BsyncClient = + { + id: id, + peerStatus: peers.mapBy(x => {base:-1, top:-1}), + openRequests: Set(), + height: -1, + lastSyncedHeight: -1, + statusMsgs: Set(), + responseMsgs: Set(), + } + + /// Auxiliary function to iterate over the received status messages + pure def updatePeerStatus (ps: Address -> BlockRange, msgs: Set[StatusMsg]) : Address -> BlockRange = + msgs.fold(ps, (newStatus , msg) => + if (newStatus.get(msg.peer).top < msg.top) // TODO: think about base? + newStatus.put(msg.peer, {base: msg.base, top: msg.top}) + else + newStatus + ) + + /// inform the synchronizer that consensus has entered height h + pure def syncNewHeight (s: BsyncClient, h: Height) : BsyncClient = + if (h <= s.height) + s + else + s.with("height", h) + + /// returned by the synchronizer: sync is the new state, so is the output towards + /// the consensus driver, req are messages sent towards peers/servers + type ClientResult = { + sync: BsyncClient, + so: BsyncClientOutput, + req: Option[RequestMsg] + } + + + + /// We have received a certificate. now we need to issue the + /// corresponding block request and generate a certificate output. + pure def syncHandleCertificate (s: BsyncClient, cert: Set[Vote], peer: str ) : ClientResult = + val blockReq = { client: s.id, + server: peer, + rtype: SyncBlock, + height: s.height} + { sync: {...s, openRequests: Set(blockReq)}, // If we have parallelization we need to be more precise here + so: SOCertificate(cert), + req: Some(blockReq)} + + /// we have received a block. now we need to generate a block output + pure def syncHandleBlock (s: BsyncClient, p: Proposal) : ClientResult = + { sync: {...s, openRequests: Set(), // If we have parallelization we need to remove one element, + lastSyncedHeight: s.height }, // blockheight, + so: SOBlock(p), + req: None} + + /// step of a client: + /// 1. update peer statuses, 2. if there is no open request, request something + /// 3. otherwise check whether we have a response and act accordingly + pure def bsyncClient (s: BsyncClient) : ClientResult = + val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs) + val newS = { ...s, peerStatus: newPeerStates} + if (s.lastSyncedHeight >= s.height) + // nothing to do + { sync: newS, + so: SONoOutput, + req: None} + else + val goodPeers = s.peerStatus.keys().filter(p => newPeerStates.get(p).base <= s.height + and s.height <= newPeerStates.get(p).top ) + if (goodPeers.size() > 0) + if (s.openRequests.size() == 0) + // we start the sync "round" by asking for a certificate + val req = { client: s.id, + server: goodPeers.fold("", (acc, i) => i), //chooseSome(), + rtype: SyncCertificate, + height: s.height} + { sync: {... newS, openRequests: s.openRequests.union(Set(req))}, + so: SONoOutput, + req: Some(req) + } + else + // we issued a request before, let's see whether there is a response + if (s.responseMsgs.size()> 0) + val resp = s.responseMsgs.fold(emptyResponseMsg, (acc, i) => i) //chooseSome() // in the future there might be parallelization + match resp.response { + | RespBlock(prop) => syncHandleBlock(newS, prop) + | RespCertificate(cert) => syncHandleCertificate(newS, cert, goodPeers.fold("", (s,x) => x)) + } + else + // I don't have response + // this might be timeout logic + { sync: newS, + so: SONoOutput, + req: None} + else + // no peers + { sync: newS, + so: SONoOutput, + req: None} + + // State machine + + var bsyncClients: Address -> BsyncClient + + action initClient(nodes) = all { + bsyncClients' = nodes.mapBy(v => initBsyncClient(v, nodes.exclude(Set(v)))), + } + + action unchangedClient = all { + bsyncClients' = bsyncClients, + } + + action newHeightClient(node, h) = all { + bsyncClients' = bsyncClients.put(node, syncNewHeight(bsyncClients.get(node), h)), + } + + // deliver a status message, from the statusBuffer, to node + action deliverStatus(node) = all { + statusBuffer.get(node).size() > 0, + val client = bsyncClients.get(node) + nondet msg = statusBuffer.get(node).oneOf() + all { + bsyncClients' = bsyncClients.put(node, {... client, + statusMsgs: client.statusMsgs.union(Set(msg))}), + statusBuffer' = statusBuffer.put(node, statusBuffer.get(node).exclude(Set(msg))), + requestsBuffer' = requestsBuffer, + responsesBuffer' = responsesBuffer, + } + } + + // deliver a response message, from responsesBuffer, to node + action deliverResponse(node) = all { + responsesBuffer.get(node).size() > 0, + val client = bsyncClients.get(node) + nondet msg = responsesBuffer.get(node).oneOf() + all { + bsyncClients' = bsyncClients.put(node, {... client, + responseMsgs: client.responseMsgs.union(Set(msg))}), + requestsBuffer' = requestsBuffer, + responsesBuffer' = responsesBuffer.put(node, responsesBuffer.get(node).exclude(Set(msg))), + statusBuffer' = statusBuffer, + } + } + + action applyClientUpdate(v, result) = all { + bsyncClients' = bsyncClients.put(v, result.sync), + statusBuffer' = statusBuffer, + requestsBuffer' = match result.req { + | Some(m) => requestsBuffer.put(m.server, requestsBuffer.get(m.server).union(Set(m))) + | None => requestsBuffer + }, + responsesBuffer' = responsesBuffer, + } } diff --git a/specs/quint/specs/blocksync/server.qnt b/specs/quint/specs/blocksync/server.qnt index 5d6c359da..5890a4ec5 100644 --- a/specs/quint/specs/blocksync/server.qnt +++ b/specs/quint/specs/blocksync/server.qnt @@ -3,123 +3,123 @@ // Blocksync protocol: server side. module bsync_server { - import blocksync.* from "./blocksync" - - type Server = { - id: Address, - - chain: List[BlockStoreEntry], - - // Incoming requests - requestMsgs: Set[RequestMsg] - } - - pure def newServer(addr: Address) : Server = { - id: addr, - chain: List(), - requestMsgs: Set() - } - - // generate a status message - pure def syncStatus (server: Server) : StatusMsg = - // TODO: perhaps we should add to height to the chain entries to capture non-zero bases - { peer: server.id , base: 0, top: server.chain.length() - 1 } - - /// new server state and response messages to be sent - type ServerOutput = { - sync: Server, - msg: Option[ResponseMsg], - } - - // main method: respond to incoming request, if any - pure def syncServer (s: Server) : ServerOutput = - if (s.requestMsgs.size() > 0) - val m = s.requestMsgs.fold(emptyReqMsg, (acc, i) => i) // chooseSome() // TODO: fix - val result = - if (m.height < s.chain.length()) - match m.rtype { - | SyncCertificate => - val cm = { client: m.client, - server: s.id, - height: m.height, - response: RespCertificate(s.chain[m.height].commit)} - Some(cm) - | SyncBlock => - val bl = { client: m.client, - server: s.id, - height: m.height, - response: RespBlock(s.chain[m.height].decision)} - Some(bl) - } - else None - { sync: { ...s, requestMsgs: s.requestMsgs.exclude(Set(m))}, - msg: result} - else { - sync: s, - msg: None} - - // State machine - - var bsyncServers: Address -> Server - - action initServer(nodes) = all { - bsyncServers' = nodes.mapBy(v => newServer(v)), - } - - action unchangedServer = all { - bsyncServers' = bsyncServers, - } - - // Deliver a request message, from requestsBuffer, to node - action deliverRequest(node) = all { - requestsBuffer.get(node).size() > 0, - val server = bsyncServers.get(node) - nondet msg = requestsBuffer.get(node).oneOf() - all { - bsyncServers' = bsyncServers.put(node, {... server, - requestMsgs: server.requestMsgs.union(Set(msg))}), - statusBuffer' = statusBuffer, - requestsBuffer' = requestsBuffer.put(node, requestsBuffer.get(node).exclude(Set(msg))), - responsesBuffer' = responsesBuffer, - } - } - - // Server at node broadcasts its blockchain status - action broadcastStatus(node) = all { - val server = bsyncServers.get(node) - val msg = server.syncStatus() - all { - bsyncServers' = bsyncServers.put(node, server), - statusBuffer' = broadcastStatusMsg(statusBuffer, msg), - requestsBuffer' = requestsBuffer, - responsesBuffer' = responsesBuffer, - } - } - - // Server at node takes a step (checking for requests and responding) - action stepServer(node) = all { - val server = bsyncServers.get(node) - val result = server.syncServer() - all { - bsyncServers' = bsyncServers.put(node, server), - statusBuffer' = statusBuffer, - requestsBuffer' = requestsBuffer, - responsesBuffer' = match result.msg { - | Some(m) => responsesBuffer.sendResponse(m) - | None => responsesBuffer - }, - } - } - - // Updates the server status, the latest available blockchain content. - // This action must be called by the component that knows the chain. - action updateServer(node, chain) = all { - val server = bsyncServers.get(node) - all { - chain != server.chain, - bsyncServers' = bsyncServers.put(node, { ...server, chain: chain }), - unchangedBsync, - } - } + import blocksync.* from "./blocksync" + + type Server = { + id: Address, + + chain: List[BlockStoreEntry], + + // Incoming requests + requestMsgs: Set[RequestMsg] + } + + pure def newServer(addr: Address) : Server = { + id: addr, + chain: List(), + requestMsgs: Set() + } + + // generate a status message + pure def syncStatus (server: Server) : StatusMsg = + // TODO: perhaps we should add to height to the chain entries to capture non-zero bases + { peer: server.id , base: 0, top: server.chain.length() - 1 } + + /// new server state and response messages to be sent + type ServerOutput = { + sync: Server, + msg: Option[ResponseMsg], + } + + // main method: respond to incoming request, if any + pure def syncServer (s: Server) : ServerOutput = + if (s.requestMsgs.size() > 0) + val m = s.requestMsgs.fold(emptyReqMsg, (acc, i) => i) // chooseSome() // TODO: fix + val result = + if (m.height < s.chain.length()) + match m.rtype { + | SyncCertificate => + val cm = { client: m.client, + server: s.id, + height: m.height, + response: RespCertificate(s.chain[m.height].commit)} + Some(cm) + | SyncBlock => + val bl = { client: m.client, + server: s.id, + height: m.height, + response: RespBlock(s.chain[m.height].decision)} + Some(bl) + } + else None + { sync: { ...s, requestMsgs: s.requestMsgs.exclude(Set(m))}, + msg: result} + else { + sync: s, + msg: None} + + // State machine + + var bsyncServers: Address -> Server + + action initServer(nodes) = all { + bsyncServers' = nodes.mapBy(v => newServer(v)), + } + + action unchangedServer = all { + bsyncServers' = bsyncServers, + } + + // Deliver a request message, from requestsBuffer, to node + action deliverRequest(node) = all { + requestsBuffer.get(node).size() > 0, + val server = bsyncServers.get(node) + nondet msg = requestsBuffer.get(node).oneOf() + all { + bsyncServers' = bsyncServers.put(node, {... server, + requestMsgs: server.requestMsgs.union(Set(msg))}), + statusBuffer' = statusBuffer, + requestsBuffer' = requestsBuffer.put(node, requestsBuffer.get(node).exclude(Set(msg))), + responsesBuffer' = responsesBuffer, + } + } + + // Server at node broadcasts its blockchain status + action broadcastStatus(node) = all { + val server = bsyncServers.get(node) + val msg = server.syncStatus() + all { + bsyncServers' = bsyncServers.put(node, server), + statusBuffer' = broadcastStatusMsg(statusBuffer, msg), + requestsBuffer' = requestsBuffer, + responsesBuffer' = responsesBuffer, + } + } + + // Server at node takes a step (checking for requests and responding) + action stepServer(node) = all { + val server = bsyncServers.get(node) + val result = server.syncServer() + all { + bsyncServers' = bsyncServers.put(node, server), + statusBuffer' = statusBuffer, + requestsBuffer' = requestsBuffer, + responsesBuffer' = match result.msg { + | Some(m) => responsesBuffer.sendResponse(m) + | None => responsesBuffer + }, + } + } + + // Updates the server status, the latest available blockchain content. + // This action must be called by the component that knows the chain. + action updateServer(node, chain) = all { + val server = bsyncServers.get(node) + all { + chain != server.chain, + bsyncServers' = bsyncServers.put(node, { ...server, chain: chain }), + unchangedBsync, + } + } } diff --git a/specs/quint/specs/blocksync/sync.qnt b/specs/quint/specs/blocksync/sync.qnt deleted file mode 100644 index 3c64d9786..000000000 --- a/specs/quint/specs/blocksync/sync.qnt +++ /dev/null @@ -1,254 +0,0 @@ -// -*- mode: Bluespec; -*- - -module sync { - - // General definitions - import blocksync.* from "./blocksync" - - import statemachineAsync( - validators = Set("v1", "v2", "v3", "v4"), - validatorSet = Set("v1", "v2", "v3", "v4").mapBy(x => 1), - Faulty = Set("v1"), - Values = Set("red", "blue"), - Rounds = Set(0, 1, 2, 3), - Heights = Set(0) // , 1, 2, 3) - ).* from "../statemachineAsync" - - import bsync_client.* from "./client" - import bsync_server.* from "./server" - - // **************************************************************************** - // State machine - // **************************************************************************** - // - // The statemachine is put on top of statemachineAsync, that is, we use its - // initialization and steps, and add the updates to the variables defined below - // - - /// initializing the variables of the sync part of the state machine - action syncInit = all { - initClient(validators), - initServer(validators), - initBsync(validators), - } - - action syncUnchangedAll = all { - unchangedServer, - unchangedClient, - unchangedBsync, - } - - /// initialize consensus and synchronizer - action initAll = all { - init, - syncInit - } - - // - // Actions for the Environment to send a node to a new height - // - - /// environment sends the node to the next height. - /// initializes synchronizer - action newHeightActionSync(v, valset, h) = all { - newHeightClient(v, h), - unchangedBsync, - unchangedServer, - } - - /// environment sends the node to the next height. - action newHeightActionAll(v, valset, h) = all { - newHeightActionSync(v, valset, h), - newHeightAction(v, valset, h), - } - - // - // Actions for process steps in the sync protocol - // - - // Update server v from the consensus' blockchain - action syncUpdateServer(v) = all { - val chain = system.get(v).es.chain - all { - updateServer(v, chain), - unchangedClient, - unchangedAll, - } - } - - // Server v announces its status - action syncStatusStep(v) = all { - all { - broadcastStatus(v), - unchangedClient, - unchangedAll, - } - } - - // Server v takes a step (checking for requests and responding) - action syncStepServer(v) = all { - all { - stepServer(v), - unchangedClient, - unchangedAll, - } - } - - // Client v takes a step - // - // FIXME: here we need to apply the client output to the consensus - // state machine. Refactoring this method is much more complex. - // - action syncStepClient(v) = all { - val result = bsyncClient(bsyncClients.get(v)) - all { - // TODO: this block should be implemented in client.qnt - bsyncClients' = bsyncClients.put(v, result.sync), - statusBuffer' = statusBuffer, - requestsBuffer' = match result.req { - | Some(m) => requestsBuffer.put(m.server, requestsBuffer.get(m.server).union(Set(m))) - | None => requestsBuffer - }, - responsesBuffer' = responsesBuffer, - - unchangedServer, - - // This is the part that interacts with the consensus state - // machine. Moreover, the logic's code should be here, but due - // to Quint issues we need to keep it in stateMachineAsync. - putSyncOutputIntoNode(v, result.so), - } - } - - // - // Actions for the environment to deliver messages in the sync protocol - // Implemented by the blocksync server or client. - // - - // deliver a status message to client v - action syncDeliverStatus(v) = all { - deliverStatus(v), - unchangedServer, - unchangedAll - } - - // deliver a response to client v - action syncDeliverResp(v) = all { - deliverResponse(v), - unchangedServer, - unchangedAll - } - - // deliver a request to server v - action syncDeliverReq(v) = all { - deliverRequest(v), - unchangedClient, - unchangedAll - } - - // - // Complex actions for a validator: consensus and blocksync - // - - // validator step in the system with sync protocol - action syncValStep(v) = all { - valStep(v), - syncUnchangedAll - } - - /// main step function: either a consensus state-machine step or a sync protocol step - action stepWithBlockSync = any { - all { - step, - syncUnchangedAll - }, - nondet v = oneOf(Correct) - any { - syncDeliverReq(v), - syncDeliverResp(v), - syncDeliverStatus(v), - syncUpdateServer(v), // update the server with the latest state of the chain - syncStepServer(v), // looking for a request and sends responses - syncStepClient(v), // is checking if there are responses and check whether it need to requ - syncStatusStep(v), - //syncTimeout(v) // TODO: - } - } - - // - // Interesting scenarios - // - - /// auxiliary function for initHeight - /// sets the chain - pure def setChain(s: DriverState, c: List[{decision: Proposal, commit: Set[Vote]}]): DriverState = - {... s, chain: c} - - /// auxiliary function for initHeight - /// constructs a commit certificate for a height and value - pure def commitSet (h: Height, v: Value) : Set[Vote] = - Set("v1", "v2", "v3").map(n => mkVote(Precommit, n, h, 0, v)) - - /// An action to set up an initial state with some nodes already decided up to height h - /// this sets up an initial state where v4 starts late, and v2 and v3 have reached - /// height h - action initHeight(h) = all { - val special = "v4" // TODO proper selection from correct set - val initsystem = Correct.mapBy(v => - // hack - if (v == special) initNode(v, validatorSet, 0) - else initNode(v, validatorSet, h)) - nondet decisionValMap = 0.to(h-1).setOfMaps(Values).oneOf() - val propMap = decisionValMap.keys().mapBy(i => - mkProposal( proposer(validatorSet, i,0), - i, - 0, - decisionValMap.get(i), - 0)) - val list = range(0, h).foldl(List(), (acc, i) => acc.append(propMap.get(i))) - val chain = list.foldl(List(), (acc, i) => acc.append({decision: i, commit: commitSet(i.height, Val(i.proposal))})) - all { - system' = initsystem.keys().mapBy(x => - // hack - if (x == special) initsystem.get(x) - else {... initsystem.get(x), es: setChain(initsystem.get(x).es, chain) }), - propBuffer' = Correct.mapBy(v => Set()), - voteBuffer' = Correct.mapBy(v => Set()), - certBuffer' = Correct.mapBy(v => Set()), - _hist' = { validator: "INIT", input: NoDInput, output: NoConsensusOutput }, - syncInit - } - } - - /// a simple scenario where v4 starts height h - run syncCycle(h) = - newHeightActionAll("v4", validatorSet, h) - .then(syncStatusStep("v2")) - .then(syncDeliverStatus("v4")) - .then(syncStepClient("v4")) // ask for certificate - .then(syncDeliverReq("v2")) - .then(syncStepServer("v2")) - .then(syncDeliverResp("v4")) - .then(syncStepClient("v4")) // ask for block and give certificate to node - .expect(system.get("v4").incomingSyncCertificates.size() > 0) - .then(syncDeliverReq("v2")) - .then(syncStepServer("v2")) - .then(syncDeliverResp("v4")) - .then(syncStepClient("v4")) - .expect(system.get("v4").incomingSyncProposals.size() > 0) - .then(3.reps(_ => syncValStep("v4"))) - .expect(system.get("v4").es.chain.length() > h) - - run retreat = - nondet heightToReach = 1.to(4).oneOf() - initHeight( q::debug ("Height:", heightToReach)) - // FIXME: I had to put it here, instead of syncCycle(h) - // This is not ideal, but it works. - .then(syncUpdateServer("v2")) - .then(heightToReach.reps(i => syncCycle(i))) - .expect(system.get("v4").es.chain == system.get("v2").es.chain) - .then(newHeightActionAll("v4", validatorSet, heightToReach)) - .expect(system.get("v4").es.cs.height == system.get("v2").es.cs.height) - // and now v4 has synced ! - -} diff --git a/specs/quint/specs/blocksync/syncStatemachine.qnt b/specs/quint/specs/blocksync/syncStatemachine.qnt new file mode 100644 index 000000000..252779ce1 --- /dev/null +++ b/specs/quint/specs/blocksync/syncStatemachine.qnt @@ -0,0 +1,118 @@ +// -*- mode: Bluespec; -*- + +module syncStatemachine { + + // General definitions + import blocksync.* from "./blocksync" + + import bsync_client.* from "./client" + import bsync_server.* from "./server" + + // **************************************************************************** + // State machine + // **************************************************************************** + // + // The statemachine is put on top of statemachineAsync, that is, we use its + // initialization and steps, and add the updates to the variables defined below + // + + /// initializing the variables of the sync part of the state machine + action syncInit(validators) = all { + initClient(validators), + initServer(validators), + initBsync(validators), + } + + action syncUnchangedAll = all { + unchangedServer, + unchangedClient, + unchangedBsync, + } + + // + // Actions for the Environment to send a node to a new height + // + + /// environment sends the node to the next height. + /// initializes client + action newHeightActionSync(v, valset, h) = all { + newHeightClient(v, h), + unchangedBsync, + unchangedServer, + } + + // + // Actions for process steps in the sync protocol + // + + // Server v announces its status + action syncStatusStep(v) = all { + all { + broadcastStatus(v), + unchangedClient, + } + } + + // Server v takes a step (checking for requests and responding) + action syncStepServer(v) = all { + all { + stepServer(v), + unchangedClient, + } + } + + // + // Actions for the environment to deliver messages in the sync protocol + // Implemented by the blocksync server or client. + // + + // deliver a status message to client v + action syncDeliverStatus(v) = all { + deliverStatus(v), + unchangedServer, + } + + // deliver a response to client v + action syncDeliverResp(v) = all { + deliverResponse(v), + unchangedServer, + } + + // deliver a request to server v + action syncDeliverReq(v) = all { + deliverRequest(v), + unchangedClient, + } + + /// any blocksync step independent of consensus. unchange is an action + /// that can be used for composition + action pureSyncStep (v, unchange) = all { + any { + syncDeliverReq(v), + syncDeliverResp(v), + syncDeliverStatus(v), + syncStepServer(v), + syncStatusStep(v), + }, + unchange + } + + + // Client v takes a step + action syncStepClient(v, outputAction) = all { + val result = bsyncClient(bsyncClients.get(v)) + all { + // updates the client's state + applyClientUpdate(v, result), + // This is the part that interacts with the consensus. + // It can be the driver or the mocked consensus logic. + outputAction(v, result.so), + unchangedServer, + } + } + + + + + +} From 6a744fb8cc684f357067cadbfde0e5fc2cebc1c7 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 20 Nov 2024 11:40:27 +0100 Subject: [PATCH 8/8] chore(code/blockstore): Store blocks and certificates in separate tables (#565) --- code/crates/starknet/host/src/block_store.rs | 76 +++++++++++--------- code/crates/starknet/host/src/codec.rs | 4 +- 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/code/crates/starknet/host/src/block_store.rs b/code/crates/starknet/host/src/block_store.rs index 8884fa912..73881a1d7 100644 --- a/code/crates/starknet/host/src/block_store.rs +++ b/code/crates/starknet/host/src/block_store.rs @@ -2,7 +2,6 @@ use std::ops::RangeBounds; use std::path::Path; use std::sync::Arc; -use malachite_blocksync::SyncedBlock; use malachite_common::CommitCertificate; use malachite_proto::Protobuf; @@ -10,7 +9,7 @@ use prost::Message; use redb::ReadableTable; use thiserror::Error; -use crate::codec::{decode_sync_block, encode_synced_block}; +use crate::codec; use crate::proto::{self as proto, Error as ProtoError}; use crate::types::MockContext; use crate::types::{Block, Height, Transaction, Transactions}; @@ -21,27 +20,14 @@ pub struct DecidedBlock { pub certificate: CommitCertificate, } -impl DecidedBlock { - fn into_bytes(self) -> Result, ProtoError> { - let synced_block = SyncedBlock { - certificate: self.certificate.clone(), - block_bytes: self.block.to_bytes().unwrap(), - }; - - let proto = encode_synced_block(synced_block)?; - Ok(proto.encode_to_vec()) - } - - fn from_bytes(bytes: &[u8]) -> Option { - let synced_block = proto::blocksync::SyncedBlock::decode(bytes).ok()?; - let synced_block = decode_sync_block(synced_block).ok()?; - let block = Block::from_bytes(synced_block.block_bytes.as_ref()).ok()?; +fn decode_certificate(bytes: &[u8]) -> Result, ProtoError> { + let proto = proto::CommitCertificate::decode(bytes)?; + codec::decode_certificate(proto) +} - Some(Self { - block, - certificate: synced_block.certificate, - }) - } +fn encode_certificate(certificate: CommitCertificate) -> Result, ProtoError> { + let proto = codec::encode_certificate(certificate)?; + Ok(proto.encode_to_vec()) } #[derive(Debug, Error)] @@ -112,6 +98,8 @@ impl redb::Key for HeightKey { } const BLOCK_TABLE: redb::TableDefinition> = redb::TableDefinition::new("blocks"); +const CERTIFICATE_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("certificates"); struct Db { db: redb::Database, @@ -126,10 +114,22 @@ impl Db { fn get(&self, height: Height) -> Result, StoreError> { let tx = self.db.begin_read()?; - let table = tx.open_table(BLOCK_TABLE)?; - let value = table.get(&height)?; - let block = value.and_then(|value| DecidedBlock::from_bytes(&value.value())); - Ok(block) + let block = { + let table = tx.open_table(BLOCK_TABLE)?; + let value = table.get(&height)?; + value.and_then(|value| Block::from_bytes(&value.value()).ok()) + }; + let certificate = { + let table = tx.open_table(CERTIFICATE_TABLE)?; + let value = table.get(&height)?; + value.and_then(|value| decode_certificate(&value.value()).ok()) + }; + + let decided_block = block + .zip(certificate) + .map(|(block, certificate)| DecidedBlock { block, certificate }); + + Ok(decided_block) } fn insert(&self, decided_block: DecidedBlock) -> Result<(), StoreError> { @@ -137,10 +137,15 @@ impl Db { let tx = self.db.begin_write()?; { - let mut table = tx.open_table(BLOCK_TABLE)?; - table.insert(height, decided_block.into_bytes()?)?; + let mut blocks = tx.open_table(BLOCK_TABLE)?; + blocks.insert(height, decided_block.block.to_bytes()?.to_vec())?; + } + { + let mut certificates = tx.open_table(CERTIFICATE_TABLE)?; + certificates.insert(height, encode_certificate(decided_block.certificate)?)?; } tx.commit()?; + Ok(()) } @@ -162,10 +167,12 @@ impl Db { fn prune(&self, retain_height: Height) -> Result, StoreError> { let tx = self.db.begin_write().unwrap(); let pruned = { - let mut table = tx.open_table(BLOCK_TABLE)?; - let keys = self.range(&table, ..retain_height)?; + let mut blocks = tx.open_table(BLOCK_TABLE)?; + let mut certificates = tx.open_table(CERTIFICATE_TABLE)?; + let keys = self.range(&blocks, ..retain_height)?; for key in &keys { - table.remove(key)?; + blocks.remove(key)?; + certificates.remove(key)?; } keys }; @@ -190,8 +197,9 @@ impl Db { fn create_tables(&self) -> Result<(), StoreError> { let tx = self.db.begin_write()?; - // Implicitly creates the "blocks" table if it does not exists + // Implicitly creates the tables if they do not exist yet let _ = tx.open_table(BLOCK_TABLE)?; + let _ = tx.open_table(CERTIFICATE_TABLE)?; tx.commit()?; Ok(()) } @@ -228,12 +236,10 @@ impl BlockStore { certificate: &CommitCertificate, txes: &[Transaction], ) -> Result<(), StoreError> { - let block_id = certificate.value_id; - let decided_block = DecidedBlock { block: Block { height: certificate.height, - block_hash: block_id, + block_hash: certificate.value_id, transactions: Transactions::new(txes.to_vec()), }, certificate: certificate.clone(), diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs index 0606b34fd..f6add0572 100644 --- a/code/crates/starknet/host/src/codec.rs +++ b/code/crates/starknet/host/src/codec.rs @@ -95,7 +95,7 @@ impl NetworkCodec> for ProtobufCodec { Ok(blocksync::Response { height: Height::new(response.block_number, response.fork_id), - block: response.block.map(decode_sync_block).transpose()?, + block: response.block.map(decode_synced_block).transpose()?, }) } @@ -309,7 +309,7 @@ pub(crate) fn decode_certificate( Ok(certificate) } -pub(crate) fn decode_sync_block( +pub(crate) fn decode_synced_block( synced_block: proto::blocksync::SyncedBlock, ) -> Result, ProtoError> { let certificate = if let Some(certificate) = synced_block.certificate {