From 80ff7cc13ed6cfcd1e5d8473f718797be3592bf8 Mon Sep 17 00:00:00 2001 From: Jorge Silva Date: Wed, 21 Sep 2022 12:54:15 +0100 Subject: [PATCH] Revert "perf: memory usage" --- .../corecontracts/test/core_accounts_test.go | 5 +- .../wasm/corecontracts/test/core_blob_test.go | 3 +- .../corecontracts/test/core_blocklog_test.go | 5 +- .../test/core_governance_test.go | 5 +- .../wasm/corecontracts/test/core_root_test.go | 3 +- .../corecontracts/test/corecontracts_test.go | 3 +- contracts/wasm/dividend/test/dividend_test.go | 3 +- .../test/donatewithfeedback_test.go | 3 +- contracts/wasm/erc20/test/erc20_test.go | 3 +- contracts/wasm/erc20/test/init_test.go | 3 +- contracts/wasm/erc721/test/erc721_test.go | 3 +- .../wasm/fairauction/test/fairauction_test.go | 3 +- .../fairroulette/test/fairroulette_test.go | 3 +- .../executiontime/test/executiontime_test.go | 3 +- .../gascalibration/memory/test/memory_test.go | 3 +- .../storage/test/storage_test.go | 3 +- .../wasm/helloworld/test/helloworld_test.go | 3 +- .../wasm/inccounter/test/inccounter_test.go | 3 +- .../schemacomment/test/schemacomment_test.go | 3 +- contracts/wasm/testcore/test/call_test.go | 3 +- .../wasm/testcore/test/check_ctx_test.go | 3 +- .../wasm/testcore/test/concurrency_test.go | 3 +- .../wasm/testcore/test/init_fail_test.go | 3 +- .../wasm/testcore/test/misc_call_test.go | 3 +- .../wasm/testcore/test/offledger_test.go | 3 +- .../wasm/testcore/test/sandbox_panic_test.go | 3 +- contracts/wasm/testcore/test/spawn_test.go | 3 +- contracts/wasm/testcore/test/testcore_test.go | 3 +- contracts/wasm/testcore/test/transfer_test.go | 3 +- contracts/wasm/testcore/test/types_test.go | 3 +- .../test/testwasmlib_bigint_test.go | 3 +- .../test/testwasmlib_client_test.go | 3 +- .../test/testwasmlib_proxy_test.go | 3 +- .../wasm/testwasmlib/test/testwasmlib_test.go | 3 +- .../wasm/timestamp/test/timestamp_test.go | 3 +- .../tokenregistry/test/tokenregistry_test.go | 3 +- packages/chain/chain.go | 47 ++- packages/chain/chainimpl/chainimpl.go | 31 +- .../chain/chainimpl/{dismiss.go => entry.go} | 5 +- packages/chain/chainimpl/eventproc.go | 13 +- packages/chain/chainimpl/l1handlers.go | 79 ----- packages/chain/consensus/action.go | 68 ++++- packages/chain/consensus/consensus.go | 33 +- packages/chain/consensus/consensus_test.go | 2 +- packages/chain/consensus/eventproc.go | 15 + packages/chain/consensus/mocked_node_test.go | 78 +++-- packages/chain/consensus/pipeMetrics.go | 5 + .../chain/nodeconnchain/nodeconn_chain.go | 286 ++++++++++++++++++ packages/chain/statemgr/action.go | 4 +- packages/chain/statemgr/mocked_node_test.go | 34 +-- packages/chain/statemgr/statemgr.go | 4 +- .../templates/metrics_chain_consensus.tmpl | 5 + packages/nodeconn/nc_chain.go | 13 +- packages/nodeconn/nodeconn.go | 109 +++++-- packages/testutil/testchain/mock_ledger.go | 114 ++++--- packages/testutil/testchain/mock_ledgers.go | 16 +- packages/testutil/testchain/mock_nodeconn.go | 207 +++++++++++-- packages/webapi/admapi/chainmetrics.go | 1 + packages/webapi/model/consensus_metrics.go | 2 + tools/cluster/templates/waspconfig.go | 2 +- tools/cluster/tests/nodeconn_test.go | 38 ++- tools/cluster/tests/spam_test.go | 1 - tools/wasp-cli/metrics/consensus.go | 1 + 63 files changed, 939 insertions(+), 388 deletions(-) rename packages/chain/chainimpl/{dismiss.go => entry.go} (92%) delete mode 100644 packages/chain/chainimpl/l1handlers.go create mode 100644 packages/chain/nodeconnchain/nodeconn_chain.go diff --git a/contracts/wasm/corecontracts/test/core_accounts_test.go b/contracts/wasm/corecontracts/test/core_accounts_test.go index 0b2b90f197..3f6cc7ec5b 100644 --- a/contracts/wasm/corecontracts/test/core_accounts_test.go +++ b/contracts/wasm/corecontracts/test/core_accounts_test.go @@ -8,9 +8,6 @@ import ( "math/big" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/iotaledger/hive.go/serializer/v2" iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/packages/isc" @@ -19,6 +16,8 @@ import ( "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/coreaccounts" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( diff --git a/contracts/wasm/corecontracts/test/core_blob_test.go b/contracts/wasm/corecontracts/test/core_blob_test.go index 07431dac34..0af0d38e85 100644 --- a/contracts/wasm/corecontracts/test/core_blob_test.go +++ b/contracts/wasm/corecontracts/test/core_blob_test.go @@ -6,11 +6,10 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/coreblob" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) // this is the expected blob hash for key0/val0 key1/val1 diff --git a/contracts/wasm/corecontracts/test/core_blocklog_test.go b/contracts/wasm/corecontracts/test/core_blocklog_test.go index 5d18712a9a..e9cb43b0af 100644 --- a/contracts/wasm/corecontracts/test/core_blocklog_test.go +++ b/contracts/wasm/corecontracts/test/core_blocklog_test.go @@ -6,13 +6,12 @@ package test import ( "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/packages/vm/core/blocklog" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/coreblocklog" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func setupBlockLog(t *testing.T) *wasmsolo.SoloContext { diff --git a/contracts/wasm/corecontracts/test/core_governance_test.go b/contracts/wasm/corecontracts/test/core_governance_test.go index 05c085d3c6..bb4816ba84 100644 --- a/contracts/wasm/corecontracts/test/core_governance_test.go +++ b/contracts/wasm/corecontracts/test/core_governance_test.go @@ -6,15 +6,14 @@ package test import ( "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/packages/vm/core/governance" "github.com/iotaledger/wasp/packages/vm/gas" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/coregovernance" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func setupGovernance(t *testing.T) *wasmsolo.SoloContext { diff --git a/contracts/wasm/corecontracts/test/core_root_test.go b/contracts/wasm/corecontracts/test/core_root_test.go index fbe13872c1..7fd9004bf9 100644 --- a/contracts/wasm/corecontracts/test/core_root_test.go +++ b/contracts/wasm/corecontracts/test/core_root_test.go @@ -7,12 +7,11 @@ import ( "os" "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/packages/vm/core/root" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/coreblob" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/coreroot" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func setupRoot(t *testing.T) *wasmsolo.SoloContext { diff --git a/contracts/wasm/corecontracts/test/corecontracts_test.go b/contracts/wasm/corecontracts/test/corecontracts_test.go index a6e5f14f9c..793169c116 100644 --- a/contracts/wasm/corecontracts/test/corecontracts_test.go +++ b/contracts/wasm/corecontracts/test/corecontracts_test.go @@ -6,10 +6,9 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/corecontracts/go/corecontracts" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func setup(t *testing.T) *wasmsolo.SoloContext { diff --git a/contracts/wasm/dividend/test/dividend_test.go b/contracts/wasm/dividend/test/dividend_test.go index a87425ff64..aef88b3792 100644 --- a/contracts/wasm/dividend/test/dividend_test.go +++ b/contracts/wasm/dividend/test/dividend_test.go @@ -6,11 +6,10 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/dividend/go/dividend" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func dividendMember(ctx *wasmsolo.SoloContext, agent *wasmsolo.SoloAgent, factor uint64) { diff --git a/contracts/wasm/donatewithfeedback/test/donatewithfeedback_test.go b/contracts/wasm/donatewithfeedback/test/donatewithfeedback_test.go index 75cf6a5dee..9b794f4cd4 100644 --- a/contracts/wasm/donatewithfeedback/test/donatewithfeedback_test.go +++ b/contracts/wasm/donatewithfeedback/test/donatewithfeedback_test.go @@ -6,11 +6,10 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/donatewithfeedback/go/donatewithfeedback" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func setupTest(t *testing.T) *wasmsolo.SoloContext { diff --git a/contracts/wasm/erc20/test/erc20_test.go b/contracts/wasm/erc20/test/erc20_test.go index 4fe74a936f..7708904f6d 100644 --- a/contracts/wasm/erc20/test/erc20_test.go +++ b/contracts/wasm/erc20/test/erc20_test.go @@ -3,13 +3,12 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/erc20/go/erc20" "github.com/iotaledger/wasp/packages/solo" "github.com/iotaledger/wasp/packages/utxodb" "github.com/iotaledger/wasp/packages/vm/core/corecontracts" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) var ( diff --git a/contracts/wasm/erc20/test/init_test.go b/contracts/wasm/erc20/test/init_test.go index 13c0c8dc9a..c20edeea7a 100644 --- a/contracts/wasm/erc20/test/init_test.go +++ b/contracts/wasm/erc20/test/init_test.go @@ -3,12 +3,11 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/erc20/go/erc20" "github.com/iotaledger/wasp/packages/utxodb" "github.com/iotaledger/wasp/packages/vm/core/corecontracts" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func TestDeployErc20(t *testing.T) { diff --git a/contracts/wasm/erc721/test/erc721_test.go b/contracts/wasm/erc721/test/erc721_test.go index 8d7812d303..9bdbe14232 100644 --- a/contracts/wasm/erc721/test/erc721_test.go +++ b/contracts/wasm/erc721/test/erc721_test.go @@ -6,11 +6,10 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/erc721/go/erc721" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func TestDeploy(t *testing.T) { diff --git a/contracts/wasm/fairauction/test/fairauction_test.go b/contracts/wasm/fairauction/test/fairauction_test.go index b7a7be001f..38f1596f4c 100644 --- a/contracts/wasm/fairauction/test/fairauction_test.go +++ b/contracts/wasm/fairauction/test/fairauction_test.go @@ -7,12 +7,11 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/fairauction/go/fairauction" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) const ( diff --git a/contracts/wasm/fairroulette/test/fairroulette_test.go b/contracts/wasm/fairroulette/test/fairroulette_test.go index 694f0d8cd5..7361f6f09f 100644 --- a/contracts/wasm/fairroulette/test/fairroulette_test.go +++ b/contracts/wasm/fairroulette/test/fairroulette_test.go @@ -7,10 +7,9 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/fairroulette/go/fairroulette" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func setupTest(t *testing.T) *wasmsolo.SoloContext { diff --git a/contracts/wasm/gascalibration/executiontime/test/executiontime_test.go b/contracts/wasm/gascalibration/executiontime/test/executiontime_test.go index 9f0f8ca1f6..78e1d20117 100644 --- a/contracts/wasm/gascalibration/executiontime/test/executiontime_test.go +++ b/contracts/wasm/gascalibration/executiontime/test/executiontime_test.go @@ -7,12 +7,11 @@ import ( "flag" "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/gascalibration" "github.com/iotaledger/wasp/contracts/wasm/gascalibration/executiontime/go/executiontime" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) var force = flag.Bool("force", false, "") diff --git a/contracts/wasm/gascalibration/memory/test/memory_test.go b/contracts/wasm/gascalibration/memory/test/memory_test.go index 28c7afaf4b..f59f67f278 100644 --- a/contracts/wasm/gascalibration/memory/test/memory_test.go +++ b/contracts/wasm/gascalibration/memory/test/memory_test.go @@ -7,12 +7,11 @@ import ( "flag" "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/gascalibration" "github.com/iotaledger/wasp/contracts/wasm/gascalibration/memory/go/memory" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) var force = flag.Bool("force", false, "") diff --git a/contracts/wasm/gascalibration/storage/test/storage_test.go b/contracts/wasm/gascalibration/storage/test/storage_test.go index 63d017e726..02a5f7cb9f 100644 --- a/contracts/wasm/gascalibration/storage/test/storage_test.go +++ b/contracts/wasm/gascalibration/storage/test/storage_test.go @@ -7,12 +7,11 @@ import ( "flag" "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/gascalibration" "github.com/iotaledger/wasp/contracts/wasm/gascalibration/storage/go/storage" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) var force = flag.Bool("force", false, "") diff --git a/contracts/wasm/helloworld/test/helloworld_test.go b/contracts/wasm/helloworld/test/helloworld_test.go index 0310e5e184..a027332cfb 100644 --- a/contracts/wasm/helloworld/test/helloworld_test.go +++ b/contracts/wasm/helloworld/test/helloworld_test.go @@ -6,10 +6,9 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/helloworld/go/helloworld" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func setupTest(t *testing.T) *wasmsolo.SoloContext { diff --git a/contracts/wasm/inccounter/test/inccounter_test.go b/contracts/wasm/inccounter/test/inccounter_test.go index 5b81249195..16d0e2a5f0 100644 --- a/contracts/wasm/inccounter/test/inccounter_test.go +++ b/contracts/wasm/inccounter/test/inccounter_test.go @@ -8,11 +8,10 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/inccounter/go/inccounter" "github.com/iotaledger/wasp/packages/wasmvm/wasmhost" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func setupTest(t *testing.T) *wasmsolo.SoloContext { diff --git a/contracts/wasm/schemacomment/test/schemacomment_test.go b/contracts/wasm/schemacomment/test/schemacomment_test.go index e055bd2ab1..b175572392 100644 --- a/contracts/wasm/schemacomment/test/schemacomment_test.go +++ b/contracts/wasm/schemacomment/test/schemacomment_test.go @@ -6,10 +6,9 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/schemacomment/go/schemacomment" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func TestDeploy(t *testing.T) { diff --git a/contracts/wasm/testcore/test/call_test.go b/contracts/wasm/testcore/test/call_test.go index 4f5e699fae..b66785cd45 100644 --- a/contracts/wasm/testcore/test/call_test.go +++ b/contracts/wasm/testcore/test/call_test.go @@ -3,10 +3,9 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" + "github.com/stretchr/testify/require" ) // N Fib(N) Calls diff --git a/contracts/wasm/testcore/test/check_ctx_test.go b/contracts/wasm/testcore/test/check_ctx_test.go index 98be76b8a0..ce20d25d49 100644 --- a/contracts/wasm/testcore/test/check_ctx_test.go +++ b/contracts/wasm/testcore/test/check_ctx_test.go @@ -3,9 +3,8 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" + "github.com/stretchr/testify/require" ) func TestMainCallsFromFullEP(t *testing.T) { diff --git a/contracts/wasm/testcore/test/concurrency_test.go b/contracts/wasm/testcore/test/concurrency_test.go index c919798707..05549f5c61 100644 --- a/contracts/wasm/testcore/test/concurrency_test.go +++ b/contracts/wasm/testcore/test/concurrency_test.go @@ -5,13 +5,12 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" "github.com/iotaledger/wasp/packages/kv/codec" "github.com/iotaledger/wasp/packages/solo" "github.com/iotaledger/wasp/packages/utxodb" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func TestCounter(t *testing.T) { diff --git a/contracts/wasm/testcore/test/init_fail_test.go b/contracts/wasm/testcore/test/init_fail_test.go index cfc83300eb..80ca104644 100644 --- a/contracts/wasm/testcore/test/init_fail_test.go +++ b/contracts/wasm/testcore/test/init_fail_test.go @@ -3,11 +3,10 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" "github.com/iotaledger/wasp/packages/vm/core/corecontracts" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func TestInitSuccess(t *testing.T) { diff --git a/contracts/wasm/testcore/test/misc_call_test.go b/contracts/wasm/testcore/test/misc_call_test.go index 127344b442..9554c3b355 100644 --- a/contracts/wasm/testcore/test/misc_call_test.go +++ b/contracts/wasm/testcore/test/misc_call_test.go @@ -3,9 +3,8 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" + "github.com/stretchr/testify/require" ) func TestChainOwnerIDView(t *testing.T) { diff --git a/contracts/wasm/testcore/test/offledger_test.go b/contracts/wasm/testcore/test/offledger_test.go index 29dc74e53b..f46f387812 100644 --- a/contracts/wasm/testcore/test/offledger_test.go +++ b/contracts/wasm/testcore/test/offledger_test.go @@ -3,11 +3,10 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" "github.com/iotaledger/wasp/packages/utxodb" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func TestOffLedgerFailNoAccount(t *testing.T) { diff --git a/contracts/wasm/testcore/test/sandbox_panic_test.go b/contracts/wasm/testcore/test/sandbox_panic_test.go index a615986bb6..eae3aea37b 100644 --- a/contracts/wasm/testcore/test/sandbox_panic_test.go +++ b/contracts/wasm/testcore/test/sandbox_panic_test.go @@ -4,11 +4,10 @@ import ( "strings" "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" "github.com/iotaledger/wasp/packages/vm/core/testcore/sbtests/sbtestsc" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func verifyErrorInReceipts(t *testing.T, ctx *wasmsolo.SoloContext, msg string) { diff --git a/contracts/wasm/testcore/test/spawn_test.go b/contracts/wasm/testcore/test/spawn_test.go index b21b95544e..b18eed8a82 100644 --- a/contracts/wasm/testcore/test/spawn_test.go +++ b/contracts/wasm/testcore/test/spawn_test.go @@ -3,10 +3,9 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" "github.com/iotaledger/wasp/packages/vm/core/corecontracts" + "github.com/stretchr/testify/require" ) func TestSpawn(t *testing.T) { diff --git a/contracts/wasm/testcore/test/testcore_test.go b/contracts/wasm/testcore/test/testcore_test.go index 8e62dc016b..051445b4b2 100644 --- a/contracts/wasm/testcore/test/testcore_test.go +++ b/contracts/wasm/testcore/test/testcore_test.go @@ -4,8 +4,6 @@ import ( "fmt" "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" "github.com/iotaledger/wasp/packages/solo" "github.com/iotaledger/wasp/packages/util" @@ -15,6 +13,7 @@ import ( "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/coreaccounts" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/coreroot" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func deployTestCore(t *testing.T, runWasm bool, addCreator ...bool) *wasmsolo.SoloContext { diff --git a/contracts/wasm/testcore/test/transfer_test.go b/contracts/wasm/testcore/test/transfer_test.go index f5e5fc2ae5..484440499a 100644 --- a/contracts/wasm/testcore/test/transfer_test.go +++ b/contracts/wasm/testcore/test/transfer_test.go @@ -3,11 +3,10 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/solo" + "github.com/stretchr/testify/require" ) func TestDoNothing(t *testing.T) { diff --git a/contracts/wasm/testcore/test/types_test.go b/contracts/wasm/testcore/test/types_test.go index 6cada0ef5b..04a4b3cab8 100644 --- a/contracts/wasm/testcore/test/types_test.go +++ b/contracts/wasm/testcore/test/types_test.go @@ -3,11 +3,10 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testcore/go/testcore" "github.com/iotaledger/wasp/packages/hashing" "github.com/iotaledger/wasp/packages/isc" + "github.com/stretchr/testify/require" ) //nolint:dupl diff --git a/contracts/wasm/testwasmlib/test/testwasmlib_bigint_test.go b/contracts/wasm/testwasmlib/test/testwasmlib_bigint_test.go index 7291319381..4b21efaf3f 100644 --- a/contracts/wasm/testwasmlib/test/testwasmlib_bigint_test.go +++ b/contracts/wasm/testwasmlib/test/testwasmlib_bigint_test.go @@ -7,11 +7,10 @@ import ( "math/rand" "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testwasmlib/go/testwasmlib" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) const ( diff --git a/contracts/wasm/testwasmlib/test/testwasmlib_client_test.go b/contracts/wasm/testwasmlib/test/testwasmlib_client_test.go index 137bb11e28..f5d29751f3 100644 --- a/contracts/wasm/testwasmlib/test/testwasmlib_client_test.go +++ b/contracts/wasm/testwasmlib/test/testwasmlib_client_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/client/chainclient" "github.com/iotaledger/wasp/contracts/wasm/testwasmlib/go/testwasmlib" "github.com/iotaledger/wasp/packages/cryptolib" @@ -15,6 +13,7 @@ import ( "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" cluster_tests "github.com/iotaledger/wasp/tools/cluster/tests" + "github.com/stretchr/testify/require" ) const ( diff --git a/contracts/wasm/testwasmlib/test/testwasmlib_proxy_test.go b/contracts/wasm/testwasmlib/test/testwasmlib_proxy_test.go index bb54e93cd3..f5651ed964 100644 --- a/contracts/wasm/testwasmlib/test/testwasmlib_proxy_test.go +++ b/contracts/wasm/testwasmlib/test/testwasmlib_proxy_test.go @@ -6,12 +6,11 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/testwasmlib/go/testwasmlib" "github.com/iotaledger/wasp/packages/wasmvm/wasmhost" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func TestStringMapOfStringArrayClear(t *testing.T) { diff --git a/contracts/wasm/testwasmlib/test/testwasmlib_test.go b/contracts/wasm/testwasmlib/test/testwasmlib_test.go index f41b81313b..8c02a6c8c4 100644 --- a/contracts/wasm/testwasmlib/test/testwasmlib_test.go +++ b/contracts/wasm/testwasmlib/test/testwasmlib_test.go @@ -11,8 +11,6 @@ import ( "strings" "testing" - "github.com/stretchr/testify/require" - iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/contracts/wasm/testwasmlib/go/testwasmlib" "github.com/iotaledger/wasp/packages/hashing" @@ -22,6 +20,7 @@ import ( "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/coreblocklog" "github.com/iotaledger/wasp/packages/wasmvm/wasmlib/go/wasmlib/wasmtypes" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) var ( diff --git a/contracts/wasm/timestamp/test/timestamp_test.go b/contracts/wasm/timestamp/test/timestamp_test.go index d3fdb9e554..1eb2423fee 100644 --- a/contracts/wasm/timestamp/test/timestamp_test.go +++ b/contracts/wasm/timestamp/test/timestamp_test.go @@ -3,10 +3,9 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/timestamp/go/timestamp" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func TestDeploy(t *testing.T) { diff --git a/contracts/wasm/tokenregistry/test/tokenregistry_test.go b/contracts/wasm/tokenregistry/test/tokenregistry_test.go index e4bf69155b..1fe41d8298 100644 --- a/contracts/wasm/tokenregistry/test/tokenregistry_test.go +++ b/contracts/wasm/tokenregistry/test/tokenregistry_test.go @@ -6,10 +6,9 @@ package test import ( "testing" - "github.com/stretchr/testify/require" - "github.com/iotaledger/wasp/contracts/wasm/tokenregistry/go/tokenregistry" "github.com/iotaledger/wasp/packages/wasmvm/wasmsolo" + "github.com/stretchr/testify/require" ) func setupTest(t *testing.T) *wasmsolo.SoloContext { diff --git a/packages/chain/chain.go b/packages/chain/chain.go index dd9fbfd8e3..5dedb6de1c 100644 --- a/packages/chain/chain.go +++ b/packages/chain/chain.go @@ -9,8 +9,8 @@ import ( "github.com/iotaledger/hive.go/core/events" "github.com/iotaledger/hive.go/core/kvstore" "github.com/iotaledger/hive.go/core/logger" - "github.com/iotaledger/inx-app/nodebridge" iotago "github.com/iotaledger/iota.go/v3" + "github.com/iotaledger/iota.go/v3/nodeclient" "github.com/iotaledger/trie.go/trie" "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/chain/messages" @@ -93,26 +93,52 @@ type Committee interface { GetRandomValidators(upToN int) []*cryptolib.PublicKey // TODO: Remove after OffLedgerRequest dissemination is changed. } +type ( + NodeConnectionAliasOutputHandlerFun func(*isc.AliasOutputWithID) + NodeConnectionOnLedgerRequestHandlerFun func(isc.OnLedgerRequest) + NodeConnectionInclusionStateHandlerFun func(iotago.TransactionID, string) + NodeConnectionMilestonesHandlerFun func(*nodeclient.MilestoneInfo) +) + type NodeConnection interface { - RegisterChain( - chainID *isc.ChainID, - stateOutputHandler, - outputHandler func(iotago.OutputID, iotago.Output), - milestoneHandler func(*nodebridge.Milestone), - ) + RegisterChain(chainID *isc.ChainID, stateOutputHandler, outputHandler func(iotago.OutputID, iotago.Output)) UnregisterChain(chainID *isc.ChainID) - PublishTransaction(chainID *isc.ChainID, tx *iotago.Transaction) error + PublishStateTransaction(chainID *isc.ChainID, stateIndex uint32, tx *iotago.Transaction) error + PublishGovernanceTransaction(chainID *isc.ChainID, tx *iotago.Transaction) error PullLatestOutput(chainID *isc.ChainID) + PullTxInclusionState(chainID *isc.ChainID, txid iotago.TransactionID) PullStateOutputByID(chainID *isc.ChainID, id *iotago.UTXOInput) - AttachMilestones(handler func(*nodebridge.Milestone)) *events.Closure + AttachTxInclusionStateEvents(chainID *isc.ChainID, handler NodeConnectionInclusionStateHandlerFun) (*events.Closure, error) + DetachTxInclusionStateEvents(chainID *isc.ChainID, closure *events.Closure) error + AttachMilestones(handler NodeConnectionMilestonesHandlerFun) *events.Closure DetachMilestones(attachID *events.Closure) SetMetrics(metrics nodeconnmetrics.NodeConnectionMetrics) GetMetrics() nodeconnmetrics.NodeConnectionMetrics } +type ChainNodeConnection interface { + AttachToAliasOutput(NodeConnectionAliasOutputHandlerFun) + DetachFromAliasOutput() + AttachToOnLedgerRequest(NodeConnectionOnLedgerRequestHandlerFun) + DetachFromOnLedgerRequest() + AttachToTxInclusionState(NodeConnectionInclusionStateHandlerFun) + DetachFromTxInclusionState() + AttachToMilestones(NodeConnectionMilestonesHandlerFun) + DetachFromMilestones() + Close() + + PublishStateTransaction(stateIndex uint32, tx *iotago.Transaction) error + PublishGovernanceTransaction(tx *iotago.Transaction) error + PullLatestOutput() + PullTxInclusionState(txid iotago.TransactionID) + PullStateOutputByID(*iotago.UTXOInput) + + GetMetrics() nodeconnmetrics.NodeConnectionMessagesMetrics +} + type StateManager interface { Ready() *ready.Ready EnqueueGetBlockMsg(msg *messages.GetBlockMsgIn) @@ -129,6 +155,7 @@ type Consensus interface { EnqueueStateTransitionMsg(bool, state.VirtualStateAccess, *isc.AliasOutputWithID, time.Time) EnqueueDssIndexProposalMsg(msg *messages.DssIndexProposalMsg) EnqueueDssSignatureMsg(msg *messages.DssSignatureMsg) + EnqueueTxInclusionsStateMsg(iotago.TransactionID, string) EnqueueAsynchronousCommonSubsetMsg(msg *messages.AsynchronousCommonSubsetMsg) EnqueueVMResultMsg(msg *messages.VMResultMsg) EnqueueTimerMsg(messages.TimerTick) @@ -138,7 +165,6 @@ type Consensus interface { GetWorkflowStatus() ConsensusWorkflowStatus ShouldReceiveMissingRequest(req isc.Request) bool GetPipeMetrics() ConsensusPipeMetrics - SetTimeData(time.Time) } type AsynchronousCommonSubsetRunner interface { @@ -193,6 +219,7 @@ type ConsensusWorkflowStatus interface { type ConsensusPipeMetrics interface { GetEventStateTransitionMsgPipeSize() int GetEventPeerLogIndexMsgPipeSize() int + GetEventInclusionStateMsgPipeSize() int GetEventACSMsgPipeSize() int GetEventVMResultMsgPipeSize() int GetEventTimerMsgPipeSize() int diff --git a/packages/chain/chainimpl/chainimpl.go b/packages/chain/chainimpl/chainimpl.go index a3923bf1cc..de7347b433 100644 --- a/packages/chain/chainimpl/chainimpl.go +++ b/packages/chain/chainimpl/chainimpl.go @@ -17,6 +17,7 @@ import ( dss_node_pkg "github.com/iotaledger/wasp/packages/chain/dss/node" mempool_pkg "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/chain/messages" + "github.com/iotaledger/wasp/packages/chain/nodeconnchain" "github.com/iotaledger/wasp/packages/chain/statemgr" "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" @@ -37,6 +38,11 @@ import ( const maxMsgBuffer = 1000 +var ( + _ chain.Chain = &chainObj{} + _ map[cryptolib.PublicKeyKey]bool // We rely on value comparison on the pubkeys, just assert that here. +) + type chainObj struct { committee atomic.Value mempool mempool_pkg.Mempool @@ -52,15 +58,13 @@ type chainObj struct { consensus chain.Consensus dssNode dss_node_pkg.DSSNode log *logger.Logger - nodeConn chain.NodeConnection + nodeConn chain.ChainNodeConnection db kvstore.KVStore netProvider peering.NetworkProvider dksProvider registry.DKShareRegistryProvider eventRequestProcessed *events.Event eventChainTransition *events.Event eventChainTransitionClosure *events.Closure - txInclusingClosure *events.Closure - milestonesClosure *events.Closure receiveChainPeerMessagesAttachID interface{} detachFromCommitteePeerMessagesFun func() chainPeers peering.PeerDomainProvider @@ -146,7 +150,11 @@ func NewChain( consensusJournalRegistry: consensusJournalRegistry, wal: wal, dssNode: dss_node_pkg.New(&peeringID, netProvider, nodeIdentity, log), - nodeConn: nc, + } + ret.nodeConn, err = nodeconnchain.NewChainNodeConnection(chainID, nc, chainLog) + if err != nil { + ret.log.Errorf("NewChain: unable to create chain node connection: %v", err) + return nil } ret.committee.Store(&committeeStruct{}) @@ -168,14 +176,8 @@ func NewChain( ret.eventChainTransitionClosure = events.NewClosure(ret.processChainTransition) ret.eventChainTransition.Hook(ret.eventChainTransitionClosure) - - nc.RegisterChain( - chainID, - ret.stateOutputHandler, - ret.outputHandler, - ret.handleMilestone, - ) - + ret.nodeConn.AttachToOnLedgerRequest(ret.receiveOnLedgerRequest) + ret.nodeConn.AttachToAliasOutput(ret.EnqueueAliasOutput) ret.receiveChainPeerMessagesAttachID = ret.chainPeers.Attach(peering.PeerMessageReceiverChain, ret.receiveChainPeerMessages) go ret.recvLoop() ret.startTimer() @@ -194,6 +196,11 @@ func (c *chainObj) startTimer() { }() } +func (c *chainObj) receiveOnLedgerRequest(request isc.OnLedgerRequest) { + c.log.Debugf("receiveOnLedgerRequest: %s", request.ID()) + c.mempool.ReceiveRequest(request) +} + func (c *chainObj) receiveCommitteePeerMessages(peerMsg *peering.PeerMessageGroupIn) { if peerMsg.MsgType != chain.PeerMsgTypeMissingRequestIDs { c.log.Warnf("Wrong type of chain message (with committee peering ID): %v, ignoring it", peerMsg.MsgType) diff --git a/packages/chain/chainimpl/dismiss.go b/packages/chain/chainimpl/entry.go similarity index 92% rename from packages/chain/chainimpl/dismiss.go rename to packages/chain/chainimpl/entry.go index fb6fbc9746..8698a400a3 100644 --- a/packages/chain/chainimpl/dismiss.go +++ b/packages/chain/chainimpl/entry.go @@ -14,10 +14,10 @@ func (c *chainObj) Dismiss(reason string) { c.dismissOnce.Do(func() { c.dismissed.Store(true) c.chainPeers.Detach(c.receiveChainPeerMessagesAttachID) + c.nodeConn.DetachFromOnLedgerRequest() + c.nodeConn.DetachFromAliasOutput() c.eventChainTransition.Detach(c.eventChainTransitionClosure) - c.nodeConn.UnregisterChain(c.chainID) - c.mempool.Close() c.stateMgr.Close() cmt := c.getCommittee() @@ -32,6 +32,7 @@ func (c *chainObj) Dismiss(reason string) { c.eventRequestProcessed.DetachAll() c.eventChainTransition.DetachAll() c.chainPeers.Close() + c.nodeConn.Close() c.dismissChainMsgPipe.Close() c.aliasOutputPipe.Close() diff --git a/packages/chain/chainimpl/eventproc.go b/packages/chain/chainimpl/eventproc.go index c8b3723c46..b9d937deac 100644 --- a/packages/chain/chainimpl/eventproc.go +++ b/packages/chain/chainimpl/eventproc.go @@ -247,18 +247,7 @@ func (c *chainObj) createNewCommitteeAndConsensus(dkShare tcrypto.DKShare) error if err != nil { return xerrors.Errorf("cannot load consensus journal: %w", err) } - c.consensus = consensus.New( - c, - c.mempool, - cmt, - cmtPeerGroup, - c.pullMissingRequestsFromCommittee, - c.chainMetrics, - c.dssNode, - consensusJournal, - c.wal, - c.nodeConn.PublishTransaction, - ) + c.consensus = consensus.New(c, c.mempool, cmt, cmtPeerGroup, c.nodeConn, c.pullMissingRequestsFromCommittee, c.chainMetrics, c.dssNode, consensusJournal, c.wal) c.setCommittee(cmt) return nil } diff --git a/packages/chain/chainimpl/l1handlers.go b/packages/chain/chainimpl/l1handlers.go deleted file mode 100644 index e5f3701202..0000000000 --- a/packages/chain/chainimpl/l1handlers.go +++ /dev/null @@ -1,79 +0,0 @@ -package chainimpl - -import ( - "time" - - "github.com/iotaledger/inx-app/nodebridge" - iotago "github.com/iotaledger/iota.go/v3" - "github.com/iotaledger/wasp/packages/isc" - "github.com/iotaledger/wasp/packages/util" -) - -func (c *chainObj) handleMilestone(metadata *nodebridge.Milestone) { - c.log.Debugf("received milestone index : %d", metadata.Milestone.Index) - if c.consensus != nil { - c.consensus.SetTimeData(time.Unix(int64(metadata.Milestone.Timestamp), 0)) - } -} - -func (c *chainObj) stateOutputHandler(outputID iotago.OutputID, output iotago.Output) { - c.nodeConn.GetMetrics().GetInStateOutput().CountLastMessage(struct { - OutputID iotago.OutputID - Output iotago.Output - }{ - OutputID: outputID, - Output: output, - }) - outputIDUTXO := outputID.UTXOInput() - outputIDstring := isc.OID(outputIDUTXO) - c.log.Debugf("handling state output ID %v", outputIDstring) - aliasOutput, ok := output.(*iotago.AliasOutput) - if !ok { - c.log.Panicf("unexpected output ID %v type %T received as state update to chain ID %s; alias output expected", - outputIDstring, output, c.chainID) - } - if aliasOutput.AliasID.Empty() && aliasOutput.StateIndex != 0 { - c.log.Panicf("unexpected output ID %v index %v with empty alias ID received as state update to chain ID %s; alias ID may be empty for initial alias output only", - outputIDstring, aliasOutput.StateIndex, c.chainID) - } - if !util.AliasIDFromAliasOutput(aliasOutput, outputID).ToAddress().Equal(c.chainID.AsAddress()) { - c.log.Panicf("unexpected output ID %v address %s index %v received as state update to chain ID %s, address %s", - outputIDstring, aliasOutput.AliasID.ToAddress(), aliasOutput.StateIndex, c.chainID, c.chainID.AsAddress()) - } - c.log.Debugf("handling state output ID %v: writing alias output to channel", outputIDstring) - c.nodeConn.GetMetrics().GetInAliasOutput().CountLastMessage(aliasOutput) - c.EnqueueAliasOutput(isc.NewAliasOutputWithID(aliasOutput, outputIDUTXO)) - c.log.Debugf("handling state output ID %v: alias output handled", outputIDstring) -} - -func (c *chainObj) outputHandler(outputID iotago.OutputID, output iotago.Output) { - c.nodeConn.GetMetrics().GetInOutput().CountLastMessage(struct { - OutputID iotago.OutputID - Output iotago.Output - }{ - OutputID: outputID, - Output: output, - }) - outputIDUTXO := outputID.UTXOInput() - outputIDstring := isc.OID(outputIDUTXO) - c.log.Debugf("handling output ID %v", outputIDstring) - onLedgerRequest, err := isc.OnLedgerFromUTXO(output, outputIDUTXO) - if err != nil { - c.log.Warnf("handling output ID %v: unknown output type; ignoring it", outputIDstring) - return - } - c.log.Debugf("handling output ID %v: writing on ledger request to channel", outputIDstring) - c.nodeConn.GetMetrics().GetInOnLedgerRequest().CountLastMessage(onLedgerRequest) - c.mempool.ReceiveRequest(onLedgerRequest) - c.log.Debugf("handling output ID %v: on ledger request handled", outputIDstring) -} - -func (c *chainObj) PullLatestOutput() { - c.nodeConn.GetMetrics().GetOutPullLatestOutput().CountLastMessage(nil) - c.nodeConn.PullLatestOutput(c.chainID) -} - -func (c *chainObj) PullStateOutputByID(outputID *iotago.UTXOInput) { - c.nodeConn.GetMetrics().GetOutPullOutputByID().CountLastMessage(outputID) - c.nodeConn.PullStateOutputByID(c.chainID, outputID) -} diff --git a/packages/chain/consensus/action.go b/packages/chain/consensus/action.go index fa1034d09e..715a118aab 100644 --- a/packages/chain/consensus/action.go +++ b/packages/chain/consensus/action.go @@ -44,6 +44,7 @@ func (c *consensus) takeAction() { c.runVMIfNeeded() c.checkQuorum() c.postTransactionIfNeeded() + c.pullInclusionStateIfNeeded() } // proposeBatchIfNeeded when non empty ready batch is available is in mempool propose it as a candidate @@ -386,11 +387,8 @@ func (c *consensus) postTransactionIfNeeded() { } var logMsgTypeStr string var logMsgStateIndexStr string - - // `c.publishTx` takes care of waiting for the tx to confirm, and handled re-attchment/promotions - c.workflow.setTransactionPosted() if c.resultState == nil { // governance transaction - if err := c.publishTx(c.chain.ID(), c.finalTx); err != nil { + if err := c.nodeConn.PublishGovernanceTransaction(c.finalTx); err != nil { c.log.Errorf("postTransaction: error publishing gov transaction: %w", err) return } @@ -398,17 +396,15 @@ func (c *consensus) postTransactionIfNeeded() { logMsgStateIndexStr = "" } else { stateIndex := c.resultState.BlockIndex() - if err := c.publishTx(c.chain.ID(), c.finalTx); err != nil { + if err := c.nodeConn.PublishStateTransaction(stateIndex, c.finalTx); err != nil { c.log.Errorf("postTransaction: error publishing state transaction: %v", err) return } logMsgTypeStr = "STATE" logMsgStateIndexStr = fmt.Sprintf(" for state %v", stateIndex) } - c.workflow.setTransactionSeen() - c.workflow.setCompleted() - c.refreshConsensusInfo() + c.workflow.setTransactionPosted() // TODO: Fix it, retries should be in place for robustness. logMsgStart := fmt.Sprintf("postTransaction: POSTED %s TRANSACTION%s:", logMsgTypeStr, logMsgStateIndexStr) logMsgEnd := fmt.Sprintf("number of inputs: %d, outputs: %d", len(c.finalTx.Essence.Inputs), len(c.finalTx.Essence.Outputs)) txID, err := c.finalTx.ID() @@ -419,6 +415,30 @@ func (c *consensus) postTransactionIfNeeded() { } } +// pullInclusionStateIfNeeded periodic pull to know the inclusions state of the transaction. Note that pulling +// starts immediately after finalization of the transaction, not after posting it +func (c *consensus) pullInclusionStateIfNeeded() { + if !c.workflow.IsTransactionFinalized() { + c.log.Debugf("pullInclusionState not needed: transaction is not finalized") + return + } + if c.workflow.IsTransactionSeen() { + c.log.Debugf("pullInclusionState not needed: transaction already seen") + return + } + if time.Now().Before(c.pullInclusionStateDeadline) { + c.log.Debugf("pullInclusionState not needed: delayed till %v", c.pullInclusionStateDeadline) + return + } + finalTxID, err := c.finalTx.ID() + if err != nil { + c.log.Panicf("pullInclusionState: cannot calculate final transaction id: %v", err) + } + c.nodeConn.PullTxInclusionState(finalTxID) + c.pullInclusionStateDeadline = time.Now().Add(c.timers.PullInclusionStateRetry) + c.log.Debugf("pullInclusionState: request for inclusion state sent") +} + // prepareBatchProposal creates a batch proposal structure out of requests func (c *consensus) prepareBatchProposal(reqs []isc.Request, dssNonceIndexProposal []int) *BatchProposal { consensusManaPledge := identity.ID{} @@ -589,6 +609,38 @@ func (c *consensus) receiveACS(values [][]byte, sessionID uint64, logIndex journ c.runVMIfNeeded() } +func (c *consensus) processTxInclusionState(msg *messages.TxInclusionStateMsg) { + if !c.workflow.IsTransactionFinalized() { + c.log.Debugf("processTxInclusionState: transaction not finalized -> skipping.") + return + } + finalTxID, err := c.finalTx.ID() + finalTxIDStr := isc.TxID(finalTxID) + if err != nil { + c.log.Panicf("processTxInclusionState: cannot calculate final transaction id: %v", err) + } + if msg.TxID != finalTxID { + c.log.Debugf("processTxInclusionState: current transaction id %v does not match the received one %v -> skipping.", + finalTxIDStr, isc.TxID(msg.TxID)) + return + } + switch msg.State { + case "noTransaction": + c.log.Debugf("processTxInclusionState: transaction id %v is not known.", finalTxIDStr) + case "included": + c.workflow.setTransactionSeen() + c.workflow.setCompleted() + c.refreshConsensusInfo() + c.log.Debugf("processTxInclusionState: transaction id %s is included; workflow finished", finalTxIDStr) + case "conflicting": + c.workflow.setTransactionSeen() + c.log.Infof("processTxInclusionState: transaction id %s is conflicting; restarting consensus.", finalTxIDStr) + c.resetWorkflow() + default: + c.log.Warnf("processTxInclusionState: unknown inclusion state %s for transaction id %s; ignoring", msg.State, finalTxIDStr) + } +} + func (c *consensus) finalizeTransaction() (*iotago.Transaction, *isc.AliasOutputWithID, error) { if c.dssSignature == nil { return nil, nil, fmt.Errorf("DSS signature not ready yet") diff --git a/packages/chain/consensus/consensus.go b/packages/chain/consensus/consensus.go index b9b3a6d133..0b0230138f 100644 --- a/packages/chain/consensus/consensus.go +++ b/packages/chain/consensus/consensus.go @@ -11,6 +11,7 @@ import ( "github.com/iotaledger/hive.go/core/logger" iotago "github.com/iotaledger/iota.go/v3" + "github.com/iotaledger/iota.go/v3/nodeclient" "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/chain/consensus/journal" dss_node "github.com/iotaledger/wasp/packages/chain/dss/node" @@ -33,6 +34,7 @@ type consensus struct { committee chain.Committee committeePeerGroup peering.GroupProvider mempool mempool_pkg.Mempool + nodeConn chain.ChainNodeConnection vmRunner vm.VMRunner currentState state.VirtualStateAccess stateOutput *isc.AliasOutputWithID @@ -60,6 +62,7 @@ type consensus struct { eventDssIndexProposalMsgPipe pipe.Pipe eventDssSignatureMsgPipe pipe.Pipe eventPeerLogIndexMsgPipe pipe.Pipe + eventInclusionStateMsgPipe pipe.Pipe eventACSMsgPipe pipe.Pipe eventVMResultMsgPipe pipe.Pipe eventTimerMsgPipe pipe.Pipe @@ -77,7 +80,6 @@ type consensus struct { consensusJournal journal.ConsensusJournal consensusJournalLogIndex journal.LogIndex // Index of the currently running log index. wal chain.WAL - publishTx func(chainID *isc.ChainID, tx *iotago.Transaction) error } var _ chain.Consensus = &consensus{} @@ -93,12 +95,12 @@ func New( mempool mempool_pkg.Mempool, committee chain.Committee, peerGroup peering.GroupProvider, + nodeConn chain.ChainNodeConnection, pullMissingRequestsFromCommittee bool, consensusMetrics metrics.ConsensusMetrics, dssNode dss_node.DSSNode, consensusJournal journal.ConsensusJournal, wal chain.WAL, - publishTx func(chainID *isc.ChainID, tx *iotago.Transaction) error, timersOpt ...ConsensusTimers, ) chain.Consensus { var timers ConsensusTimers @@ -113,6 +115,7 @@ func New( committee: committee, committeePeerGroup: peerGroup, mempool: mempool, + nodeConn: nodeConn, vmRunner: runvm.NewVMRunner(), workflow: newWorkflowStatus(false), timers: timers, @@ -121,6 +124,7 @@ func New( eventDssIndexProposalMsgPipe: pipe.NewLimitInfinitePipe(maxMsgBuffer), eventDssSignatureMsgPipe: pipe.NewLimitInfinitePipe(maxMsgBuffer), eventPeerLogIndexMsgPipe: pipe.NewLimitInfinitePipe(maxMsgBuffer), + eventInclusionStateMsgPipe: pipe.NewLimitInfinitePipe(maxMsgBuffer), eventACSMsgPipe: pipe.NewLimitInfinitePipe(maxMsgBuffer), eventVMResultMsgPipe: pipe.NewLimitInfinitePipe(maxMsgBuffer), eventTimerMsgPipe: pipe.NewLimitInfinitePipe(1), @@ -130,9 +134,14 @@ func New( dssNode: dssNode, consensusJournal: consensusJournal, wal: wal, - publishTx: publishTx, } ret.receivePeerMessagesAttachID = ret.committeePeerGroup.Attach(peering.PeerMessageReceiverConsensus, ret.receiveCommitteePeerMessages) // TODO: Don't need to attach here at all. + ret.nodeConn.AttachToMilestones(func(milestonePointer *nodeclient.MilestoneInfo) { + ret.timeData = time.Unix(int64(milestonePointer.Timestamp), 0) + }) + ret.nodeConn.AttachToTxInclusionState(func(txID iotago.TransactionID, inclusionState string) { + ret.EnqueueTxInclusionsStateMsg(txID, inclusionState) + }) ret.refreshConsensusInfo() go ret.recvLoop() return ret @@ -160,6 +169,7 @@ func (c *consensus) IsReady() bool { } func (c *consensus) Close() { + c.nodeConn.DetachFromTxInclusionState() c.committeePeerGroup.Detach(c.receivePeerMessagesAttachID) c.eventStateTransitionMsgPipe.Close() @@ -170,6 +180,7 @@ func (c *consensus) Close() { c.eventDssSignatureMsgPipe.Close() c.eventPeerLogIndexMsgPipe.Close() + c.eventInclusionStateMsgPipe.Close() c.eventACSMsgPipe.Close() c.eventVMResultMsgPipe.Close() c.eventTimerMsgPipe.Close() @@ -181,6 +192,7 @@ func (c *consensus) recvLoop() { eventDssIndexProposalMsgCh := c.eventDssIndexProposalMsgPipe.Out() eventDssSignatureMsgCh := c.eventDssSignatureMsgPipe.Out() eventPeerLogIndexMsgCh := c.eventPeerLogIndexMsgPipe.Out() + eventInclusionStateMsgCh := c.eventInclusionStateMsgPipe.Out() eventACSMsgCh := c.eventACSMsgPipe.Out() eventVMResultMsgCh := c.eventVMResultMsgPipe.Out() eventTimerMsgCh := c.eventTimerMsgPipe.Out() @@ -189,7 +201,7 @@ func (c *consensus) recvLoop() { eventDssIndexProposalMsgCh == nil && eventDssSignatureMsgCh == nil && eventPeerLogIndexMsgCh == nil && - // eventInclusionStateMsgCh == nil && + eventInclusionStateMsgCh == nil && eventACSMsgCh == nil && eventVMResultMsgCh == nil && eventTimerMsgCh == nil @@ -238,6 +250,14 @@ func (c *consensus) recvLoop() { } else { eventPeerLogIndexMsgCh = nil } + case msg, ok := <-eventInclusionStateMsgCh: + if ok { + c.log.Debugf("Consensus::recvLoop, eventTxInclusionState...") + c.handleTxInclusionState(msg.(*messages.TxInclusionStateMsg)) + c.log.Debugf("Consensus::recvLoop, eventTxInclusionState... Done") + } else { + eventInclusionStateMsgCh = nil + } case msg, ok := <-eventACSMsgCh: if ok { c.log.Debugf("Consensus::recvLoop, eventAsynchronousCommonSubset...") @@ -269,10 +289,6 @@ func (c *consensus) recvLoop() { } } -func (c *consensus) SetTimeData(t time.Time) { - c.timeData = t -} - func (c *consensus) refreshConsensusInfo() { index := uint32(0) if c.currentState != nil { @@ -317,6 +333,7 @@ func (c *consensus) GetPipeMetrics() chain.ConsensusPipeMetrics { return &pipeMetrics{ eventStateTransitionMsgPipeSize: c.eventStateTransitionMsgPipe.Len(), eventPeerLogIndexMsgPipeSize: c.eventPeerLogIndexMsgPipe.Len(), + eventInclusionStateMsgPipeSize: c.eventInclusionStateMsgPipe.Len(), eventTimerMsgPipeSize: c.eventTimerMsgPipe.Len(), eventVMResultMsgPipeSize: c.eventVMResultMsgPipe.Len(), eventACSMsgPipeSize: c.eventACSMsgPipe.Len(), diff --git a/packages/chain/consensus/consensus_test.go b/packages/chain/consensus/consensus_test.go index 283d6fc3c6..352549a852 100644 --- a/packages/chain/consensus/consensus_test.go +++ b/packages/chain/consensus/consensus_test.go @@ -14,7 +14,7 @@ import ( "github.com/iotaledger/wasp/packages/chain/consensus" ) -const waitMempoolTimeout = 3 * time.Minute // JS: isn't 3 minutes way too long? +const waitMempoolTimeout = 3 * time.Minute func TestConsensusEnvMockedACS(t *testing.T) { t.Run("wait index mocked ACS", func(t *testing.T) { diff --git a/packages/chain/consensus/eventproc.go b/packages/chain/consensus/eventproc.go index 1b9d337420..a41bbab0cc 100644 --- a/packages/chain/consensus/eventproc.go +++ b/packages/chain/consensus/eventproc.go @@ -7,6 +7,7 @@ import ( "fmt" "time" + iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/packages/chain/messages" "github.com/iotaledger/wasp/packages/hashing" "github.com/iotaledger/wasp/packages/isc" @@ -73,6 +74,20 @@ func (c *consensus) handlePeerLogIndexMsg(msg *messages.PeerLogIndexMsgIn) { } } +func (c *consensus) EnqueueTxInclusionsStateMsg(txID iotago.TransactionID, inclusionState string) { + c.eventInclusionStateMsgPipe.In() <- &messages.TxInclusionStateMsg{ + TxID: txID, + State: inclusionState, + } +} + +func (c *consensus) handleTxInclusionState(msg *messages.TxInclusionStateMsg) { + c.log.Debugf("TxInclusionStateMsg received: %s: '%s'", isc.TxID(msg.TxID), msg.State) + c.processTxInclusionState(msg) + + c.takeAction() +} + func (c *consensus) EnqueueAsynchronousCommonSubsetMsg(msg *messages.AsynchronousCommonSubsetMsg) { c.eventACSMsgPipe.In() <- msg } diff --git a/packages/chain/consensus/mocked_node_test.go b/packages/chain/consensus/mocked_node_test.go index 0f39eb52b6..94b3f0d4b0 100644 --- a/packages/chain/consensus/mocked_node_test.go +++ b/packages/chain/consensus/mocked_node_test.go @@ -7,7 +7,6 @@ import ( "github.com/iotaledger/hive.go/core/kvstore/mapdb" "github.com/iotaledger/hive.go/core/logger" - "github.com/iotaledger/inx-app/nodebridge" iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/trie.go/trie" "github.com/iotaledger/wasp/packages/chain" @@ -16,6 +15,7 @@ import ( dss_node "github.com/iotaledger/wasp/packages/chain/dss/node" "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/chain/messages" + "github.com/iotaledger/wasp/packages/chain/nodeconnchain" "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/isc/coreutil" @@ -57,7 +57,6 @@ func NewNode(env *MockedEnv, nodeIndex uint16, timers ConsensusTimers) *mockedNo SolidStates: make(map[uint32]state.VirtualStateAccess), Log: log, } - ret.stateSync = coreutil.NewChainStateSync() store := mapdb.NewMapDB() ret.ChainCore.OnGlobalStateSync(func() coreutil.ChainStateSync { @@ -93,6 +92,8 @@ func NewNode(env *MockedEnv, nodeIndex uint16, timers ConsensusTimers) *mockedNo require.Equal(env.T, uint32(0), originState.BlockIndex()) require.True(env.T, ret.addNewState(originState)) + chainNodeConn, err := nodeconnchain.NewChainNodeConnection(env.ChainID, ret.NodeConn, log) + require.NoError(env.T, err) var peeringID peering.PeeringID copy(peeringID[:], env.ChainID[:]) dss := dss_node.New(&peeringID, env.NetworkProviders[nodeIndex], ret.NodeKeyPair, log) @@ -100,22 +101,10 @@ func NewNode(env *MockedEnv, nodeIndex uint16, timers ConsensusTimers) *mockedNo cmtF := cmtN - int(cmt.Quorum()) registry, err := journal.LoadConsensusJournal(*env.ChainID, cmt.Address(), testchain.NewMockedConsensusJournalRegistry(), cmtN, cmtF, log) require.NoError(env.T, err) - - cons := New(ret.ChainCore, ret.Mempool, cmt, cmtPeerGroup, true, metrics.DefaultChainMetrics(), dss, registry, wal.NewDefault(), ret.NodeConn.PublishTransaction, timers) + cons := New(ret.ChainCore, ret.Mempool, cmt, cmtPeerGroup, chainNodeConn, true, metrics.DefaultChainMetrics(), dss, registry, wal.NewDefault(), timers) cons.(*consensus).vmRunner = testchain.NewMockedVMRunner(env.T, log) ret.Consensus = cons - ret.NodeConn.RegisterChain( - env.ChainID, - func(oid iotago.OutputID, o iotago.Output) { - ret.receiveStateOutput(isc.NewAliasOutputWithID(o.(*iotago.AliasOutput), oid.UTXOInput())) - }, - func(iotago.OutputID, iotago.Output) {}, - func(metadata *nodebridge.Milestone) { - ret.Consensus.SetTimeData(time.Unix(int64(metadata.Milestone.Timestamp), 0)) - }, - ) - ret.doStateApproved(originState, env.InitStateOutput) ret.ChainCore.OnStateCandidate(func(newState state.VirtualStateAccess, approvingOutputID *iotago.UTXOInput) { // State manager mock: state candidate received and is approved by checking that L1 has approving output @@ -156,6 +145,7 @@ func NewNode(env *MockedEnv, nodeIndex uint16, timers ConsensusTimers) *mockedNo ret.doStateApproved(newState, isc.NewAliasOutputWithID(output, approvingOutputID)) }() }) + go ret.pullStateLoop() ret.Log.Debugf("Mocked node %v started: id %v public key %v", ret.NodeIndex, ret.NodeID, ret.NodeKeyPair.GetPublicKey().String()) return ret } @@ -181,24 +171,22 @@ func (n *mockedNode) addNewState(newState state.VirtualStateAccess) bool { return false } - if newStateIndex > 0 { - calcState := n.getState(newStateIndex - 1) - if calcState != nil { - block, err := newState.ExtractBlock() - if err != nil { - n.Log.Panicf("State manager mock: error extracting block: %v", err) - } - calcState = calcState.Copy() - err = calcState.ApplyBlock(block) - if err != nil { - n.Log.Panicf("State manager mock: error applying to previous state: %v", err) - } - calcState.Commit() - csCommitment := trie.RootCommitment(calcState.TrieNodeStore()) - if !state.EqualCommitments(nsCommitment, csCommitment) { - n.Log.Panicf("State manager mock: calculated state commitment %s differs from new state commitment %s", - csCommitment, nsCommitment) - } + calcState := n.getState(newStateIndex - 1) + if calcState != nil { + block, err := newState.ExtractBlock() + if err != nil { + n.Log.Panicf("State manager mock: error extracting block: %v", err) + } + calcState = calcState.Copy() + err = calcState.ApplyBlock(block) + if err != nil { + n.Log.Panicf("State manager mock: error applying to previous state: %v", err) + } + calcState.Commit() + csCommitment := trie.RootCommitment(calcState.TrieNodeStore()) + if !state.EqualCommitments(nsCommitment, csCommitment) { + n.Log.Panicf("State manager mock: calculated state commitment %s differs from new state commitment %s", + csCommitment, nsCommitment) } } @@ -266,19 +254,23 @@ func (n *mockedNode) doStateApproved(newState state.VirtualStateAccess, newState n.StateOutput.GetStateIndex(), trie.RootCommitment(newState.TrieNodeStore()), isc.OID(n.StateOutput.ID())) } -func (n *mockedNode) receiveStateOutput(stateOutput *isc.AliasOutputWithID) { // State manager mock: when node is behind and tries to catchup using state output from L1 and blocks (virtual states in mocke environment) from other nodes - stateIndex := stateOutput.GetStateIndex() - if stateOutput != nil && (stateIndex > n.StateOutput.GetStateIndex()) { - n.Log.Debugf("State manager mock (pullStateLoop): new state output received: index %v, id %v", - stateIndex, isc.OID(stateOutput.ID())) - vstate := n.getState(stateIndex) - if vstate == nil { - vstate = n.getStateFromNodes(stateIndex) +func (n *mockedNode) pullStateLoop() { // State manager mock: when node is behind and tries to catchup using state output from L1 and blocks (virtual states in mocke environment) from other nodes + for { + time.Sleep(200 * time.Millisecond) + stateOutput := n.Env.Ledgers.GetLedger(n.Env.ChainID).GetLatestOutput() + stateIndex := stateOutput.GetStateIndex() + if stateOutput != nil && (stateIndex > n.StateOutput.GetStateIndex()) { + n.Log.Debugf("State manager mock (pullStateLoop): new state output received: index %v, id %v", + stateIndex, isc.OID(stateOutput.ID())) + vstate := n.getState(stateIndex) if vstate == nil { - n.Log.Panicf("State manager mock (pullStateLoop): state obtained from nodes is nil") + vstate = n.getStateFromNodes(stateIndex) + if vstate == nil { + n.Log.Panicf("State manager mock (pullStateLoop): state obtained from nodes is nil") + } } + n.doStateApproved(vstate, stateOutput) } - n.doStateApproved(vstate, stateOutput) } } diff --git a/packages/chain/consensus/pipeMetrics.go b/packages/chain/consensus/pipeMetrics.go index 9cfb3330f2..31a20b43ae 100644 --- a/packages/chain/consensus/pipeMetrics.go +++ b/packages/chain/consensus/pipeMetrics.go @@ -3,6 +3,7 @@ package consensus type pipeMetrics struct { eventStateTransitionMsgPipeSize int eventPeerLogIndexMsgPipeSize int + eventInclusionStateMsgPipeSize int eventACSMsgPipeSize int eventVMResultMsgPipeSize int eventTimerMsgPipeSize int @@ -16,6 +17,10 @@ func (p *pipeMetrics) GetEventPeerLogIndexMsgPipeSize() int { return p.eventPeerLogIndexMsgPipeSize } +func (p *pipeMetrics) GetEventInclusionStateMsgPipeSize() int { + return p.eventInclusionStateMsgPipeSize +} + func (p *pipeMetrics) GetEventACSMsgPipeSize() int { return p.eventACSMsgPipeSize } diff --git a/packages/chain/nodeconnchain/nodeconn_chain.go b/packages/chain/nodeconnchain/nodeconn_chain.go new file mode 100644 index 0000000000..8984cfb7d5 --- /dev/null +++ b/packages/chain/nodeconnchain/nodeconn_chain.go @@ -0,0 +1,286 @@ +// Copyright 2020 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +package nodeconnchain + +import ( + "sync" + + "github.com/iotaledger/hive.go/core/events" + "github.com/iotaledger/hive.go/core/logger" + iotago "github.com/iotaledger/iota.go/v3" + "github.com/iotaledger/wasp/packages/chain" + "github.com/iotaledger/wasp/packages/isc" + "github.com/iotaledger/wasp/packages/metrics/nodeconnmetrics" + "github.com/iotaledger/wasp/packages/util" +) + +// nodeconnChain is responsible for maintaining the information related to a single chain. +type nodeconnChain struct { + nc chain.NodeConnection + chainID *isc.ChainID + log *logger.Logger + + aliasOutputIsHandled bool + aliasOutputCh chan *isc.AliasOutputWithID + aliasOutputStopCh chan bool + onLedgerRequestIsHandled bool + onLedgerRequestCh chan isc.OnLedgerRequest + onLedgerRequestStopCh chan bool + txInclusionStateIsHandled bool + txInclusionStateCh chan *txInclusionStateMsg + txInclusionStateStopCh chan bool + txInclusionStateHandlerRef *events.Closure + milestonesHandlerRef *events.Closure + metrics nodeconnmetrics.NodeConnectionMessagesMetrics + mutex sync.Mutex // NOTE: mutexes might also be separated for aliasOutput, onLedgerRequest and txInclusionState; however, it is not going to be used heavily, so the common one is used. +} + +type txInclusionStateMsg struct { + txID iotago.TransactionID + state string +} + +var _ chain.ChainNodeConnection = &nodeconnChain{} + +func NewChainNodeConnection(chainID *isc.ChainID, nc chain.NodeConnection, log *logger.Logger) (chain.ChainNodeConnection, error) { + var err error + result := nodeconnChain{ + nc: nc, + chainID: chainID, + log: log.Named("ncc-" + chainID.String()[2:8]), + aliasOutputCh: make(chan *isc.AliasOutputWithID), + aliasOutputStopCh: make(chan bool), + onLedgerRequestCh: make(chan isc.OnLedgerRequest), + onLedgerRequestStopCh: make(chan bool), + txInclusionStateCh: make(chan *txInclusionStateMsg), + txInclusionStateStopCh: make(chan bool), + metrics: nc.GetMetrics().NewMessagesMetrics(chainID), + } + result.nc.RegisterChain(result.chainID, result.stateOutputHandler, result.outputHandler) + result.txInclusionStateHandlerRef, err = result.nc.AttachTxInclusionStateEvents(result.chainID, result.txInclusionStateHandler) + if err != nil { + result.log.Errorf("cannot create chain nodeconnection: %v", err) + return nil, err + } + result.log.Debugf("chain nodeconnection created") + return &result, nil +} + +func (nccT *nodeconnChain) stateOutputHandler(outputID iotago.OutputID, output iotago.Output) { + nccT.metrics.GetInStateOutput().CountLastMessage(struct { + OutputID iotago.OutputID + Output iotago.Output + }{ + OutputID: outputID, + Output: output, + }) + outputIDUTXO := outputID.UTXOInput() + outputIDstring := isc.OID(outputIDUTXO) + nccT.log.Debugf("handling state output ID %v", outputIDstring) + aliasOutput, ok := output.(*iotago.AliasOutput) + if !ok { + nccT.log.Panicf("unexpected output ID %v type %T received as state update to chain ID %s; alias output expected", + outputIDstring, output, nccT.chainID) + } + if aliasOutput.AliasID.Empty() && aliasOutput.StateIndex != 0 { + nccT.log.Panicf("unexpected output ID %v index %v with empty alias ID received as state update to chain ID %s; alias ID may be empty for initial alias output only", + outputIDstring, aliasOutput.StateIndex, nccT.chainID) + } + if !util.AliasIDFromAliasOutput(aliasOutput, outputID).ToAddress().Equal(nccT.chainID.AsAddress()) { + nccT.log.Panicf("unexpected output ID %v address %s index %v received as state update to chain ID %s, address %s", + outputIDstring, aliasOutput.AliasID.ToAddress(), aliasOutput.StateIndex, nccT.chainID, nccT.chainID.AsAddress()) + } + nccT.log.Debugf("handling state output ID %v: writing alias output to channel", outputIDstring) + nccT.aliasOutputCh <- isc.NewAliasOutputWithID(aliasOutput, outputIDUTXO) + nccT.log.Debugf("handling state output ID %v: alias output handled", outputIDstring) +} + +func (nccT *nodeconnChain) outputHandler(outputID iotago.OutputID, output iotago.Output) { + nccT.metrics.GetInOutput().CountLastMessage(struct { + OutputID iotago.OutputID + Output iotago.Output + }{ + OutputID: outputID, + Output: output, + }) + outputIDUTXO := outputID.UTXOInput() + outputIDstring := isc.OID(outputIDUTXO) + nccT.log.Debugf("handling output ID %v", outputIDstring) + onLedgerRequest, err := isc.OnLedgerFromUTXO(output, outputIDUTXO) + if err != nil { + nccT.log.Warnf("handling output ID %v: unknown output type; ignoring it", outputIDstring) + return + } + nccT.log.Debugf("handling output ID %v: writing on ledger request to channel", outputIDstring) + nccT.onLedgerRequestCh <- onLedgerRequest + nccT.log.Debugf("handling output ID %v: on ledger request handled", outputIDstring) +} + +func (nccT *nodeconnChain) txInclusionStateHandler(txID iotago.TransactionID, state string) { + txIDStr := isc.TxID(txID) + nccT.log.Debugf("handling inclusion state of tx ID %v: %v", txIDStr, state) + nccT.txInclusionStateCh <- &txInclusionStateMsg{ + txID: txID, + state: state, + } + nccT.log.Debugf("handling inclusion state of tx ID %v finished", txIDStr) +} + +func (nccT *nodeconnChain) AttachToAliasOutput(handler chain.NodeConnectionAliasOutputHandlerFun) { + nccT.mutex.Lock() + defer nccT.mutex.Unlock() + if nccT.aliasOutputIsHandled { + nccT.log.Errorf("alias output handler already started!") // NOTE: this should not happen; maybe panic? + return + } + nccT.aliasOutputIsHandled = true + go func() { + for { + select { + case aliasOutput := <-nccT.aliasOutputCh: + nccT.metrics.GetInAliasOutput().CountLastMessage(aliasOutput) + handler(aliasOutput) + case <-nccT.aliasOutputStopCh: + nccT.log.Debugf("alias output handler stopped") + return + } + } + }() + nccT.log.Debugf("alias output handler started") +} + +func (nccT *nodeconnChain) DetachFromAliasOutput() { + nccT.mutex.Lock() + defer nccT.mutex.Unlock() + if nccT.aliasOutputIsHandled { + nccT.aliasOutputStopCh <- true + nccT.aliasOutputIsHandled = false + } +} + +func (nccT *nodeconnChain) AttachToOnLedgerRequest(handler chain.NodeConnectionOnLedgerRequestHandlerFun) { + nccT.mutex.Lock() + defer nccT.mutex.Unlock() + if nccT.onLedgerRequestIsHandled { + nccT.log.Errorf("on ledger request handler already started!") // NOTE: this should not happen; maybe panic? + return + } + nccT.onLedgerRequestIsHandled = true + go func() { + for { + select { + case onLedgerRequest := <-nccT.onLedgerRequestCh: + nccT.metrics.GetInOnLedgerRequest().CountLastMessage(onLedgerRequest) + handler(onLedgerRequest) + case <-nccT.onLedgerRequestStopCh: + nccT.log.Debugf("on ledger request handler stopped") + return + } + } + }() + nccT.log.Debugf("on ledger request handler started") +} + +func (nccT *nodeconnChain) DetachFromOnLedgerRequest() { + nccT.mutex.Lock() + defer nccT.mutex.Unlock() + if nccT.onLedgerRequestIsHandled { + nccT.onLedgerRequestStopCh <- true + nccT.onLedgerRequestIsHandled = false + } +} + +func (nccT *nodeconnChain) AttachToTxInclusionState(handler chain.NodeConnectionInclusionStateHandlerFun) { + nccT.mutex.Lock() + defer nccT.mutex.Unlock() + if nccT.txInclusionStateIsHandled { + nccT.log.Errorf("transaction inclusion state handler already started!") + return + } + nccT.txInclusionStateIsHandled = true + go func() { + for { + select { + case msg := <-nccT.txInclusionStateCh: + nccT.metrics.GetInTxInclusionState().CountLastMessage(msg) + handler(msg.txID, msg.state) + case <-nccT.txInclusionStateStopCh: + nccT.log.Debugf("transaction inclusion state handler stopped") + return + } + } + }() + nccT.log.Debugf("transaction inclusion state handler started") +} + +func (nccT *nodeconnChain) DetachFromTxInclusionState() { + nccT.mutex.Lock() + defer nccT.mutex.Unlock() + if nccT.txInclusionStateIsHandled { + nccT.txInclusionStateStopCh <- true + nccT.txInclusionStateIsHandled = false + } +} + +func (nccT *nodeconnChain) AttachToMilestones(handler chain.NodeConnectionMilestonesHandlerFun) { + nccT.mutex.Lock() + defer nccT.mutex.Unlock() + nccT.detachFromMilestones() + nccT.milestonesHandlerRef = nccT.nc.AttachMilestones(handler) +} + +func (nccT *nodeconnChain) DetachFromMilestones() { + nccT.mutex.Lock() + defer nccT.mutex.Unlock() + nccT.detachFromMilestones() +} + +func (nccT *nodeconnChain) detachFromMilestones() { + if nccT.milestonesHandlerRef != nil { + nccT.nc.DetachMilestones(nccT.milestonesHandlerRef) + nccT.milestonesHandlerRef = nil + } +} + +func (nccT *nodeconnChain) PublishStateTransaction(stateIndex uint32, tx *iotago.Transaction) error { + nccT.metrics.GetOutPublishStateTransaction().CountLastMessage(struct { + StateIndex uint32 + Transaction *iotago.Transaction + }{ + StateIndex: stateIndex, + Transaction: tx, + }) + return nccT.nc.PublishStateTransaction(nccT.chainID, stateIndex, tx) +} + +func (nccT *nodeconnChain) PublishGovernanceTransaction(tx *iotago.Transaction) error { + nccT.metrics.GetOutPublishGovernanceTransaction().CountLastMessage(tx) + return nccT.nc.PublishGovernanceTransaction(nccT.chainID, tx) +} + +func (nccT *nodeconnChain) PullLatestOutput() { + nccT.metrics.GetOutPullLatestOutput().CountLastMessage(nil) + nccT.nc.PullLatestOutput(nccT.chainID) +} + +func (nccT *nodeconnChain) PullTxInclusionState(txID iotago.TransactionID) { + nccT.metrics.GetOutPullTxInclusionState().CountLastMessage(txID) + nccT.nc.PullTxInclusionState(nccT.chainID, txID) +} + +func (nccT *nodeconnChain) PullStateOutputByID(outputID *iotago.UTXOInput) { + nccT.metrics.GetOutPullOutputByID().CountLastMessage(outputID) + nccT.nc.PullStateOutputByID(nccT.chainID, outputID) +} + +func (nccT *nodeconnChain) GetMetrics() nodeconnmetrics.NodeConnectionMessagesMetrics { + return nccT.metrics +} + +func (nccT *nodeconnChain) Close() { + nccT.DetachFromMilestones() + _ = nccT.nc.DetachTxInclusionStateEvents(nccT.chainID, nccT.txInclusionStateHandlerRef) + nccT.nc.UnregisterChain(nccT.chainID) + nccT.log.Debugf("chain nodeconnection closed") +} diff --git a/packages/chain/statemgr/action.go b/packages/chain/statemgr/action.go index 7dbef72b33..da9c98dea5 100644 --- a/packages/chain/statemgr/action.go +++ b/packages/chain/statemgr/action.go @@ -65,7 +65,7 @@ func (sm *stateManager) isSynced() bool { func (sm *stateManager) pullStateIfNeeded() { currentTime := time.Now() if currentTime.After(sm.pullStateRetryTime) { - sm.nodeConn.PullLatestOutput(sm.chain.ID()) + sm.nodeConn.PullLatestOutput() sm.pullStateRetryTime = currentTime.Add(sm.timers.PullStateRetry) sm.log.Debugf("pullState: pulling state for address %s. Next pull in: %v", sm.chain.ID().AsAddress(), sm.pullStateRetryTime.Sub(currentTime)) @@ -135,7 +135,7 @@ func (sm *stateManager) addBlockFromPeer(block state.Block) bool { if !sm.syncingBlocks.hasApprovedBlockCandidate(block.BlockIndex()) { // TODO: make the timer to not spam L1 // ask for approving output sm.log.Debugf("addBlockFromPeer: requesting approving output ID %v", isc.OID(block.ApprovingOutputID())) - sm.nodeConn.PullStateOutputByID(sm.chain.ID(), block.ApprovingOutputID()) + sm.nodeConn.PullStateOutputByID(block.ApprovingOutputID()) } return true } diff --git a/packages/chain/statemgr/mocked_node_test.go b/packages/chain/statemgr/mocked_node_test.go index b43954924d..662260e182 100644 --- a/packages/chain/statemgr/mocked_node_test.go +++ b/packages/chain/statemgr/mocked_node_test.go @@ -11,10 +11,10 @@ import ( "github.com/iotaledger/hive.go/core/kvstore/mapdb" "github.com/iotaledger/hive.go/core/logger" - "github.com/iotaledger/inx-app/nodebridge" iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/chain/messages" + "github.com/iotaledger/wasp/packages/chain/nodeconnchain" "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/isc/coreutil" @@ -25,12 +25,13 @@ import ( ) type MockedNode struct { - PubKey *cryptolib.PublicKey - Env *MockedEnv - NodeConn *testchain.MockedNodeConn - ChainCore *testchain.MockedChainCore - StateManager chain.StateManager - Log *logger.Logger + PubKey *cryptolib.PublicKey + Env *MockedEnv + NodeConn *testchain.MockedNodeConn + ChainNodeConn chain.ChainNodeConnection + ChainCore *testchain.MockedChainCore + StateManager chain.StateManager + Log *logger.Logger } type MockedStateManagerMetrics struct{} @@ -63,23 +64,15 @@ func NewMockedNode(env *MockedEnv, nodeIndex int, timers StateManagerTimers) *Mo ret.ChainCore.OnGetStateReader(func() state.OptimisticStateReader { return state.NewOptimisticStateReader(store, stateSync) }) - ret.StateManager = New(store, ret.ChainCore, stateMgrDomain, ret.NodeConn, stateMgrMetrics, wal.NewDefault(), false, "", true, timers) + ret.ChainNodeConn, err = nodeconnchain.NewChainNodeConnection(env.ChainID, ret.NodeConn, log) + require.NoError(env.T, err) + ret.StateManager = New(store, ret.ChainCore, stateMgrDomain, ret.ChainNodeConn, stateMgrMetrics, wal.NewDefault(), false, "", true, timers) ret.Log.Debugf("Mocked node %v created: id %v public key %v", nodeIndex, nodeID, ret.PubKey.String()) - - ret.NodeConn.RegisterChain( - env.ChainID, - func(oid iotago.OutputID, o iotago.Output) { - ret.StateManager.EnqueueAliasOutput(isc.NewAliasOutputWithID(o.(*iotago.AliasOutput), oid.UTXOInput())) - }, - func(iotago.OutputID, iotago.Output) {}, - func(*nodebridge.Milestone) {}, - ) - return ret } func (node *MockedNode) Start() { - // node.ChainNodeConn.AttachToAliasOutput(node.StateManager.EnqueueAliasOutput) + node.ChainNodeConn.AttachToAliasOutput(node.StateManager.EnqueueAliasOutput) node.startTimer() node.Log.Debugf("Mocked node %v started", node.PubKey.String()) } @@ -131,8 +124,7 @@ func (node *MockedNode) MakeNewStateTransition() { func (node *MockedNode) NextState(vstate state.VirtualStateAccess, chainOutput *isc.AliasOutputWithID) { node.Log.Debugf("NextState: from state %d, output ID %v", vstate.BlockIndex(), isc.OID(chainOutput.ID())) nextState, tx, aliasOutputID := testchain.NextState(node.Env.T, node.Env.StateKeyPair, vstate, chainOutput, time.Now()) - cid := isc.ChainIDFromAliasID(chainOutput.GetAliasID()) - go node.NodeConn.PublishTransaction(&cid, tx) + go node.ChainNodeConn.PublishStateTransaction(vstate.BlockIndex(), tx) go node.StateManager.EnqueueStateCandidateMsg(nextState, aliasOutputID) node.Log.Debugf("NextState: result state %d, output ID %v", nextState.BlockIndex(), isc.OID(aliasOutputID)) } diff --git a/packages/chain/statemgr/statemgr.go b/packages/chain/statemgr/statemgr.go index 07acfe2dfa..d457cd3071 100644 --- a/packages/chain/statemgr/statemgr.go +++ b/packages/chain/statemgr/statemgr.go @@ -30,7 +30,7 @@ type stateManager struct { store kvstore.KVStore chain chain.ChainCore domain *DomainWithFallback - nodeConn chain.NodeConnection + nodeConn chain.ChainNodeConnection pullStateRetryTime time.Time solidState state.VirtualStateAccess stateOutput *isc.AliasOutputWithID @@ -70,7 +70,7 @@ func New( store kvstore.KVStore, c chain.ChainCore, domain *DomainWithFallback, - nodeconn chain.NodeConnection, + nodeconn chain.ChainNodeConnection, stateManagerMetrics metrics.StateManagerMetrics, wal chain.WAL, rawBlocksEnabled bool, diff --git a/packages/dashboard/templates/metrics_chain_consensus.tmpl b/packages/dashboard/templates/metrics_chain_consensus.tmpl index 45f1e0718f..d043f960ad 100644 --- a/packages/dashboard/templates/metrics_chain_consensus.tmpl +++ b/packages/dashboard/templates/metrics_chain_consensus.tmpl @@ -88,6 +88,11 @@ {{ .PipeMetrics.GetEventACSMsgPipeSize }} + + Event inclusion state message pipe size + {{ .PipeMetrics.GetEventInclusionStateMsgPipeSize }} + + Event vm result message pipe size {{ .PipeMetrics.GetEventVMResultMsgPipeSize }} diff --git a/packages/nodeconn/nc_chain.go b/packages/nodeconn/nc_chain.go index 327ce901bf..e490e19c5b 100644 --- a/packages/nodeconn/nc_chain.go +++ b/packages/nodeconn/nc_chain.go @@ -13,10 +13,10 @@ import ( "github.com/iotaledger/hive.go/core/events" "github.com/iotaledger/hive.go/core/logger" "github.com/iotaledger/hive.go/serializer/v2" - "github.com/iotaledger/inx-app/nodebridge" inx "github.com/iotaledger/inx/go" iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/iota.go/v3/nodeclient" + "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/parameters" ) @@ -27,7 +27,7 @@ type ncChain struct { chainID *isc.ChainID outputHandler func(iotago.OutputID, iotago.Output) stateOutputHandler func(iotago.OutputID, iotago.Output) - milestoneClosure *events.Closure + inclusionStates *events.Event log *logger.Logger } @@ -36,15 +36,18 @@ func newNCChain( chainID *isc.ChainID, stateOutputHandler, outputHandler func(iotago.OutputID, iotago.Output), - milestoneHandler func(*nodebridge.Milestone), ) *ncChain { + inclusionStates := events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(chain.NodeConnectionInclusionStateHandlerFun)(params[0].(iotago.TransactionID), params[1].(string)) + }) + ncc := ncChain{ nc: nc, chainID: chainID, outputHandler: outputHandler, stateOutputHandler: stateOutputHandler, + inclusionStates: inclusionStates, log: nc.log.Named(chainID.String()[:6]), - milestoneClosure: nc.AttachMilestones(milestoneHandler), } ncc.run() return &ncc @@ -55,7 +58,7 @@ func (ncc *ncChain) Key() string { } func (ncc *ncChain) Close() { - ncc.nc.DetachMilestones(ncc.milestoneClosure) + // Nothing. The ncc.nc.ctx is used for that. } func (ncc *ncChain) PublishTransaction(tx *iotago.Transaction, timeout ...time.Duration) error { diff --git a/packages/nodeconn/nodeconn.go b/packages/nodeconn/nodeconn.go index 6671bf08a4..90dd29b718 100644 --- a/packages/nodeconn/nodeconn.go +++ b/packages/nodeconn/nodeconn.go @@ -16,6 +16,7 @@ import ( "golang.org/x/xerrors" "github.com/iotaledger/hive.go/core/events" + hivecore "github.com/iotaledger/hive.go/core/events" "github.com/iotaledger/hive.go/core/logger" "github.com/iotaledger/hive.go/core/workerpool" "github.com/iotaledger/hive.go/serializer/v2" @@ -50,6 +51,7 @@ type nodeConn struct { chains map[string]*ncChain // key = iotago.Address.Key() chainsLock sync.RWMutex indexerClient nodeclient.IndexerClient + milestones *events.Event metrics nodeconnmetrics.NodeConnectionMetrics log *logger.Logger nodeBridge *nodebridge.NodeBridge @@ -61,6 +63,8 @@ type nodeConn struct { reattachWorkerPool *workerpool.WorkerPool } +var _ chain.NodeConnection = &nodeConn{} + func setL1ProtocolParams(protocolParameters *iotago.ProtocolParameters, baseToken *nodeclient.InfoResBaseToken) { parameters.InitL1(¶meters.L1Params{ // There are no limits on how big from a size perspective an essence can be, so it is just derived from 32KB - Block fields without payload = max size of the payload @@ -101,10 +105,13 @@ func New(ctx context.Context, log *logger.Logger, nodeBridge *nodebridge.NodeBri } nc := nodeConn{ - ctx: ctx, - chains: make(map[string]*ncChain), - chainsLock: sync.RWMutex{}, - indexerClient: indexerClient, + ctx: ctx, + chains: make(map[string]*ncChain), + chainsLock: sync.RWMutex{}, + indexerClient: indexerClient, + milestones: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(chain.NodeConnectionMilestonesHandlerFun)(params[0].(*nodeclient.MilestoneInfo)) + }), metrics: nodeconnmetrics.NewEmptyNodeConnectionMetrics(), log: log.Named("nc"), nodeBridge: nodeBridge, @@ -117,6 +124,7 @@ func New(ctx context.Context, log *logger.Logger, nodeBridge *nodebridge.NodeBri nc.reattachWorkerPool.Start() go nc.subscribeToLedgerUpdates() + nc.enableMilestoneTrigger() return &nc } @@ -129,23 +137,23 @@ func (nc *nodeConn) subscribeToLedgerUpdates() { } func (nc *nodeConn) handleLedgerUpdate(update *nodebridge.LedgerUpdate) error { + // create maps for faster lookup. + // outputs that are created and consumed in the same milestone exist in both maps. + newSpentsMap := make(map[iotago.OutputID]struct{}) + for _, spent := range update.Consumed { + newSpentsMap[spent.GetOutput().GetOutputId().Unwrap()] = struct{}{} + } + + newOutputsMap := make(map[iotago.OutputID]struct{}) + for _, output := range update.Created { + newOutputsMap[output.GetOutputId().Unwrap()] = struct{}{} + } + nc.chainsLock.RLock() defer nc.chainsLock.RUnlock() // inline function used to release the lock with defer - go func() { - // create maps for faster lookup. - // outputs that are created and consumed in the same milestone exist in both maps. - newSpentsMap := make(map[iotago.OutputID]struct{}) - for _, spent := range update.Consumed { - newSpentsMap[spent.GetOutput().GetOutputId().Unwrap()] = struct{}{} - } - - newOutputsMap := make(map[iotago.OutputID]struct{}) - for _, output := range update.Created { - newOutputsMap[output.GetOutputId().Unwrap()] = struct{}{} - } - + func() { nc.pendingTransactionsLock.Lock() defer nc.pendingTransactionsLock.Unlock() @@ -201,7 +209,7 @@ func (nc *nodeConn) handleLedgerUpdate(update *nodebridge.LedgerUpdate) error { chainID := isc.ChainIDFromAliasID(aliasID) ncChain := nc.chains[chainID.Key()] if ncChain != nil { - go ncChain.HandleStateUpdate(outputID, aliasOutput) + ncChain.HandleStateUpdate(outputID, aliasOutput) } } @@ -214,7 +222,7 @@ func (nc *nodeConn) handleLedgerUpdate(update *nodebridge.LedgerUpdate) error { chainID := isc.ChainIDFromAliasID(unlockAliasAddr.AliasID()) ncChain := nc.chains[chainID.Key()] if ncChain != nil { - go ncChain.HandleUnlockableOutput(ledgerOutput.GetOutputId().Unwrap(), output) + ncChain.HandleUnlockableOutput(ledgerOutput.GetOutputId().Unwrap(), output) } } } @@ -222,6 +230,21 @@ func (nc *nodeConn) handleLedgerUpdate(update *nodebridge.LedgerUpdate) error { return nil } +func (nc *nodeConn) enableMilestoneTrigger() { + nc.nodeBridge.Events.LatestMilestoneChanged.Hook(hivecore.NewClosure(func(metadata *nodebridge.Milestone) { + milestone := nodeclient.MilestoneInfo{ + MilestoneID: metadata.MilestoneID.String(), + Index: metadata.Milestone.Index, + Timestamp: metadata.Milestone.Timestamp, + } + + nc.log.Debugf("Milestone received, index=%v, timestamp=%v", milestone.Index, milestone.Timestamp) + + nc.metrics.GetInMilestone().CountLastMessage(milestone) + nc.milestones.Trigger(&milestone) + })) +} + func (nc *nodeConn) SetMetrics(metrics nodeconnmetrics.NodeConnectionMetrics) { nc.metrics = metrics } @@ -231,10 +254,9 @@ func (nc *nodeConn) RegisterChain( chainID *isc.ChainID, stateOutputHandler, outputHandler func(iotago.OutputID, iotago.Output), - milestoneHandler func(*nodebridge.Milestone), ) { nc.metrics.SetRegistered(chainID) - ncc := newNCChain(nc, chainID, stateOutputHandler, outputHandler, milestoneHandler) + ncc := newNCChain(nc, chainID, stateOutputHandler, outputHandler) nc.chainsLock.Lock() defer nc.chainsLock.Unlock() nc.chains[chainID.Key()] = ncc @@ -268,7 +290,7 @@ func (nc *nodeConn) GetChain(chainID *isc.ChainID) (*ncChain, error) { } // PublishStateTransaction implements chain.NodeConnection. -func (nc *nodeConn) PublishTransaction(chainID *isc.ChainID, tx *iotago.Transaction) error { +func (nc *nodeConn) PublishStateTransaction(chainID *isc.ChainID, stateIndex uint32, tx *iotago.Transaction) error { ncc, err := nc.GetChain(chainID) if err != nil { return err @@ -277,16 +299,48 @@ func (nc *nodeConn) PublishTransaction(chainID *isc.ChainID, tx *iotago.Transact return ncc.PublishTransaction(tx, inxTimeoutPublishTransaction) } +// PublishGovernanceTransaction implements chain.NodeConnection. +// TODO: identical to PublishStateTransaction; needs to be reviewed +func (nc *nodeConn) PublishGovernanceTransaction(chainID *isc.ChainID, tx *iotago.Transaction) error { + ncc, err := nc.GetChain(chainID) + if err != nil { + return err + } + + return ncc.PublishTransaction(tx, inxTimeoutPublishTransaction) +} + +func (nc *nodeConn) AttachTxInclusionStateEvents(chainID *isc.ChainID, handler chain.NodeConnectionInclusionStateHandlerFun) (*events.Closure, error) { + ncc, err := nc.GetChain(chainID) + if err != nil { + return nil, err + } + + closure := events.NewClosure(handler) + ncc.inclusionStates.Hook(closure) + return closure, nil +} + +func (nc *nodeConn) DetachTxInclusionStateEvents(chainID *isc.ChainID, closure *events.Closure) error { + ncc, err := nc.GetChain(chainID) + if err != nil { + return err + } + + ncc.inclusionStates.Detach(closure) + return nil +} + // AttachMilestones implements chain.NodeConnection. -func (nc *nodeConn) AttachMilestones(handler func(*nodebridge.Milestone)) *events.Closure { +func (nc *nodeConn) AttachMilestones(handler chain.NodeConnectionMilestonesHandlerFun) *events.Closure { closure := events.NewClosure(handler) - nc.nodeBridge.Events.LatestMilestoneChanged.Hook(closure) + nc.milestones.Hook(closure) return closure } // DetachMilestones implements chain.NodeConnection. func (nc *nodeConn) DetachMilestones(attachID *events.Closure) { - nc.nodeBridge.Events.LatestMilestoneChanged.Detach(attachID) + nc.milestones.Detach(attachID) } func (nc *nodeConn) PullLatestOutput(chainID *isc.ChainID) { @@ -298,6 +352,11 @@ func (nc *nodeConn) PullLatestOutput(chainID *isc.ChainID) { ncc.queryLatestChainStateUTXO() } +func (nc *nodeConn) PullTxInclusionState(chainID *isc.ChainID, txid iotago.TransactionID) { + // TODO - is this needed? - output should come from INX subscription + // we are also constantly polling for confirmation in the promotion/reattachment logic +} + func (nc *nodeConn) PullStateOutputByID(chainID *isc.ChainID, id *iotago.UTXOInput) { ncc := nc.chains[chainID.Key()] if ncc == nil { diff --git a/packages/testutil/testchain/mock_ledger.go b/packages/testutil/testchain/mock_ledger.go index e0d240ff5d..ac33bbfa57 100644 --- a/packages/testutil/testchain/mock_ledger.go +++ b/packages/testutil/testchain/mock_ledger.go @@ -8,24 +8,26 @@ import ( "github.com/iotaledger/hive.go/core/logger" iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/iota.go/v3/tpkg" + "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/state" ) type MockedLedger struct { - latestOutputID *iotago.UTXOInput - outputs map[iotago.UTXOInput]*iotago.AliasOutput - txIDs map[iotago.TransactionID]bool - publishTransactionAllowedFun func(tx *iotago.Transaction) bool - pullLatestOutputAllowed bool - pullTxInclusionStateAllowedFun func(iotago.TransactionID) bool - pullOutputByIDAllowedFun func(*iotago.UTXOInput) bool - pushOutputToNodesNeededFun func(*iotago.Transaction, *iotago.UTXOInput, iotago.Output) bool - stateOutputHandlerFuns map[string]func(iotago.OutputID, iotago.Output) - outputHandlerFuns map[string]func(iotago.OutputID, iotago.Output) - inclusionStateEvents map[string]*events.Event - mutex sync.RWMutex - log *logger.Logger + latestOutputID *iotago.UTXOInput + outputs map[iotago.UTXOInput]*iotago.AliasOutput + txIDs map[iotago.TransactionID]bool + publishStateTransactionAllowedFun func(stateIndex uint32, tx *iotago.Transaction) bool + publishGovernanceTransactionAllowedFun func(tx *iotago.Transaction) bool + pullLatestOutputAllowed bool + pullTxInclusionStateAllowedFun func(iotago.TransactionID) bool + pullOutputByIDAllowedFun func(*iotago.UTXOInput) bool + pushOutputToNodesNeededFun func(uint32, *iotago.Transaction, *iotago.UTXOInput, iotago.Output) bool + stateOutputHandlerFuns map[string]func(iotago.OutputID, iotago.Output) + outputHandlerFuns map[string]func(iotago.OutputID, iotago.Output) + inclusionStateEvents map[string]*events.Event + mutex sync.RWMutex + log *logger.Logger } func NewMockedLedger(stateAddress iotago.Address, log *logger.Logger) (*MockedLedger, *isc.ChainID) { @@ -85,46 +87,62 @@ func (mlT *MockedLedger) Unregister(nodeID string) { delete(mlT.outputHandlerFuns, nodeID) } -func (mlT *MockedLedger) PublishTransaction(tx *iotago.Transaction) error { +func (mlT *MockedLedger) PublishStateTransaction(stateIndex uint32, tx *iotago.Transaction) error { mlT.mutex.Lock() defer mlT.mutex.Unlock() - if mlT.publishTransactionAllowedFun(tx) { - mlT.log.Debugf("Publishing transaction allowed, transaction has %v inputs, %v outputs, %v unlock blocks", - len(tx.Essence.Inputs), len(tx.Essence.Outputs), len(tx.Unlocks)) + mlT.log.Debugf("Publishing state transaction for state %v", stateIndex) + if mlT.publishStateTransactionAllowedFun(stateIndex, tx) { + mlT.log.Debugf("Publishing state transaction for state %v allowed, transaction has %v inputs, %v outputs, %v unlock blocks", + stateIndex, len(tx.Essence.Inputs), len(tx.Essence.Outputs), len(tx.Unlocks)) txID, err := tx.ID() if err != nil { - mlT.log.Panicf("Publishing transaction: cannot calculate transaction id: %v", err) + mlT.log.Panicf("Publishing state transaction for state %v: cannot calculate transaction id: %v", stateIndex, err) } - mlT.log.Debugf("Publishing transaction: transaction id is %s", isc.TxID(txID)) + mlT.log.Debugf("Publishing state transaction for state %v: transaction id is %s", stateIndex, isc.TxID(txID)) mlT.txIDs[txID] = true for index, output := range tx.Essence.Outputs { aliasOutput, ok := output.(*iotago.AliasOutput) outputID := iotago.OutputIDFromTransactionIDAndIndex(txID, uint16(index)).UTXOInput() - mlT.log.Debugf("Publishing transaction: outputs[%v] has id %v", index, isc.OID(outputID)) + mlT.log.Debugf("Publishing state transaction for state %v: outputs[%v] has id %v", stateIndex, index, isc.OID(outputID)) if ok { - mlT.log.Debugf("Publishing transaction: outputs[%v] is alias output", index) + mlT.log.Debugf("Publishing state transaction for state %v: outputs[%v] is alias output", stateIndex, index) mlT.outputs[*outputID] = aliasOutput currentLatestOutput := mlT.getOutput(mlT.latestOutputID) if currentLatestOutput == nil || currentLatestOutput.StateIndex < aliasOutput.StateIndex { - mlT.log.Debugf("Publishing transaction: outputs[%v] is newer than current newest output (%v -> %v)", - index, currentLatestOutput.StateIndex, aliasOutput.StateIndex) + mlT.log.Debugf("Publishing state transaction for state %v: outputs[%v] is newer than current newest output (%v -> %v)", + stateIndex, index, currentLatestOutput.StateIndex, aliasOutput.StateIndex) mlT.latestOutputID = outputID } } - if mlT.pushOutputToNodesNeededFun(tx, outputID, output) { - mlT.log.Debugf("Publishing transaction: pushing it to nodes") + if mlT.pushOutputToNodesNeededFun(stateIndex, tx, outputID, output) { + mlT.log.Debugf("Publishing state transaction for state %v: pushing it to nodes", stateIndex) for nodeID, handler := range mlT.stateOutputHandlerFuns { - mlT.log.Debugf("Publishing transaction: pushing it to node %v", nodeID) + mlT.log.Debugf("Publishing state transaction for state %v: pushing it to node %v", stateIndex, nodeID) go handler(outputID.ID(), output) } } else { - mlT.log.Debugf("Publishing transaction: pushing it to nodes not needed") + mlT.log.Debugf("Publishing state transaction for state %v: pushing it to nodes not needed", stateIndex) } } return nil } - return fmt.Errorf("Publishing transaction not allowed") + return fmt.Errorf("Publishing state transaction for state %v not allowed", stateIndex) +} + +func (mlT *MockedLedger) PublishGovernanceTransaction(tx *iotago.Transaction) error { + mlT.log.Debugf("Publishing governance rotation transaction") + if mlT.publishGovernanceTransactionAllowedFun(tx) { + mlT.log.Debugf("Publishing governance rotation transaction allowed, transaction has %v inputs, %v outputs, %v unlock blocks", + len(tx.Essence.Inputs), len(tx.Essence.Outputs), len(tx.Unlocks)) + txID, err := tx.ID() + if err != nil { + mlT.log.Panicf("Publishing governance rotation transaction: cannot calculate transaction id: %v", err) + } + mlT.log.Debugf("Publishing governance rotation transaction %s", isc.TxID(txID)) + return nil + } + return fmt.Errorf("Publishing governance rotation transaction not allowed") } func (mlT *MockedLedger) PullLatestOutput(nodeID string) { @@ -228,15 +246,43 @@ func (mlT *MockedLedger) getOutput(id *iotago.UTXOInput) *iotago.AliasOutput { return nil } +func (mlT *MockedLedger) AttachTxInclusionStateEvents(nodeID string, handler chain.NodeConnectionInclusionStateHandlerFun) (*events.Closure, error) { + mlT.mutex.Lock() + defer mlT.mutex.Unlock() + + closure := events.NewClosure(handler) + event, ok := mlT.inclusionStateEvents[nodeID] + if !ok { + event = events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(chain.NodeConnectionInclusionStateHandlerFun)(params[0].(iotago.TransactionID), params[1].(string)) + }) + mlT.inclusionStateEvents[nodeID] = event + } + event.Hook(closure) + return closure, nil +} + +func (mlT *MockedLedger) DetachTxInclusionStateEvents(nodeID string, closure *events.Closure) error { + mlT.mutex.Lock() + defer mlT.mutex.Unlock() + + event, ok := mlT.inclusionStateEvents[nodeID] + if !ok { + mlT.log.Panicf("Cannot dettach from event of node %v: no such event", nodeID) + } + event.Detach(closure) + return nil +} + func (mlT *MockedLedger) SetPublishStateTransactionAllowed(flag bool) { - mlT.SetPublishStateTransactionAllowedFun(func(*iotago.Transaction) bool { return flag }) + mlT.SetPublishStateTransactionAllowedFun(func(uint32, *iotago.Transaction) bool { return flag }) } -func (mlT *MockedLedger) SetPublishStateTransactionAllowedFun(fun func(tx *iotago.Transaction) bool) { +func (mlT *MockedLedger) SetPublishStateTransactionAllowedFun(fun func(stateIndex uint32, tx *iotago.Transaction) bool) { mlT.mutex.Lock() defer mlT.mutex.Unlock() - mlT.publishTransactionAllowedFun = fun + mlT.publishStateTransactionAllowedFun = fun } func (mlT *MockedLedger) SetPublishGovernanceTransactionAllowed(flag bool) { @@ -247,7 +293,7 @@ func (mlT *MockedLedger) SetPublishGovernanceTransactionAllowedFun(fun func(tx * mlT.mutex.Lock() defer mlT.mutex.Unlock() - mlT.publishTransactionAllowedFun = fun + mlT.publishGovernanceTransactionAllowedFun = fun } func (mlT *MockedLedger) SetPullLatestOutputAllowed(flag bool) { @@ -280,10 +326,10 @@ func (mlT *MockedLedger) SetPullOutputByIDAllowedFun(fun func(outputID *iotago.U } func (mlT *MockedLedger) SetPushOutputToNodesNeeded(flag bool) { - mlT.SetPushOutputToNodesNeededFun(func(*iotago.Transaction, *iotago.UTXOInput, iotago.Output) bool { return flag }) + mlT.SetPushOutputToNodesNeededFun(func(uint32, *iotago.Transaction, *iotago.UTXOInput, iotago.Output) bool { return flag }) } -func (mlT *MockedLedger) SetPushOutputToNodesNeededFun(fun func(tx *iotago.Transaction, outputID *iotago.UTXOInput, output iotago.Output) bool) { +func (mlT *MockedLedger) SetPushOutputToNodesNeededFun(fun func(state uint32, tx *iotago.Transaction, outputID *iotago.UTXOInput, output iotago.Output) bool) { mlT.mutex.Lock() defer mlT.mutex.Unlock() diff --git a/packages/testutil/testchain/mock_ledgers.go b/packages/testutil/testchain/mock_ledgers.go index 1a0e87e0a5..0a739e2db9 100644 --- a/packages/testutil/testchain/mock_ledgers.go +++ b/packages/testutil/testchain/mock_ledgers.go @@ -6,8 +6,9 @@ import ( "github.com/iotaledger/hive.go/core/events" "github.com/iotaledger/hive.go/core/logger" - "github.com/iotaledger/inx-app/nodebridge" iotago "github.com/iotaledger/iota.go/v3" + "github.com/iotaledger/iota.go/v3/nodeclient" + "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/isc" ) @@ -23,7 +24,7 @@ func NewMockedLedgers(log *logger.Logger) *MockedLedgers { result := &MockedLedgers{ ledgers: make(map[string]*MockedLedger), milestones: events.NewEvent(func(handler interface{}, params ...interface{}) { - handler.(func(*nodebridge.Milestone))(params[0].(*nodebridge.Milestone)) + handler.(chain.NodeConnectionMilestonesHandlerFun)(params[0].(*nodeclient.MilestoneInfo)) }), log: log.Named("mls"), } @@ -51,7 +52,7 @@ func (mlT *MockedLedgers) GetLedger(chainID *isc.ChainID) *MockedLedger { return result } -func (mlT *MockedLedgers) AttachMilestones(handler func(*nodebridge.Milestone)) *events.Closure { +func (mlT *MockedLedgers) AttachMilestones(handler chain.NodeConnectionMilestonesHandlerFun) *events.Closure { closure := events.NewClosure(handler) mlT.milestones.Hook(closure) return closure @@ -68,12 +69,9 @@ func (mlT *MockedLedgers) pushMilestonesLoop() { mlT.log.Debugf("Milestone %v reached, will push to nodes: %v", milestone, mlT.pushMilestonesNeeded) } if mlT.pushMilestonesNeeded { - mlT.milestones.Trigger(&nodebridge.Milestone{ - MilestoneID: [32]byte{}, - Milestone: &iotago.Milestone{ - Index: milestone, - Timestamp: uint32(time.Now().Unix()), - }, + mlT.milestones.Trigger(&nodeclient.MilestoneInfo{ + Index: milestone, + Timestamp: uint32(time.Now().Unix()), }) } time.Sleep(100 * time.Millisecond) diff --git a/packages/testutil/testchain/mock_nodeconn.go b/packages/testutil/testchain/mock_nodeconn.go index e8944a4c29..f48d6aceac 100644 --- a/packages/testutil/testchain/mock_nodeconn.go +++ b/packages/testutil/testchain/mock_nodeconn.go @@ -5,7 +5,6 @@ import ( "github.com/iotaledger/hive.go/core/events" "github.com/iotaledger/hive.go/core/logger" - "github.com/iotaledger/inx-app/nodebridge" iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/isc" @@ -16,24 +15,22 @@ type MockedNodeConn struct { log *logger.Logger ledgers *MockedLedgers id string - publishTransactionAllowedFun func(chainID *isc.ChainID, tx *iotago.Transaction) bool + publishStateTransactionAllowedFun func(chainID *isc.ChainID, stateIndex uint32, tx *iotago.Transaction) bool publishGovernanceTransactionAllowedFun func(chainID *isc.ChainID, tx *iotago.Transaction) bool pullLatestOutputAllowed bool pullTxInclusionStateAllowedFun func(chainID *isc.ChainID, txID iotago.TransactionID) bool pullOutputByIDAllowedFun func(chainID *isc.ChainID, outputID *iotago.UTXOInput) bool stopChannel chan bool - attachMilestonesClosures map[isc.ChainID]*events.Closure } var _ chain.NodeConnection = &MockedNodeConn{} func NewMockedNodeConnection(id string, ledgers *MockedLedgers, log *logger.Logger) *MockedNodeConn { result := &MockedNodeConn{ - log: log.Named("mnc"), - id: id, - ledgers: ledgers, - stopChannel: make(chan bool), - attachMilestonesClosures: make(map[isc.ChainID]*events.Closure), + log: log.Named("mnc"), + id: id, + ledgers: ledgers, + stopChannel: make(chan bool), } result.SetPublishStateTransactionAllowed(true) result.SetPublishGovernanceTransactionAllowed(true) @@ -48,26 +45,26 @@ func (mncT *MockedNodeConn) ID() string { return mncT.id } -func (mncT *MockedNodeConn) RegisterChain( - chainID *isc.ChainID, - stateOutputHandler, - outputHandler func(iotago.OutputID, iotago.Output), - milestoneHandler func(*nodebridge.Milestone), -) { +func (mncT *MockedNodeConn) RegisterChain(chainID *isc.ChainID, stateOutputHandler, outputHandler func(iotago.OutputID, iotago.Output)) { mncT.ledgers.GetLedger(chainID).Register(mncT.id, stateOutputHandler, outputHandler) - mncT.attachMilestonesClosures[*chainID] = mncT.AttachMilestones(milestoneHandler) } func (mncT *MockedNodeConn) UnregisterChain(chainID *isc.ChainID) { mncT.ledgers.GetLedger(chainID).Unregister(mncT.id) - mncT.DetachMilestones(mncT.attachMilestonesClosures[*chainID]) } -func (mncT *MockedNodeConn) PublishTransaction(chainID *isc.ChainID, tx *iotago.Transaction) error { - if mncT.publishTransactionAllowedFun(chainID, tx) { - return mncT.ledgers.GetLedger(chainID).PublishTransaction(tx) +func (mncT *MockedNodeConn) PublishStateTransaction(chainID *isc.ChainID, stateIndex uint32, tx *iotago.Transaction) error { + if mncT.publishStateTransactionAllowedFun(chainID, stateIndex, tx) { + return mncT.ledgers.GetLedger(chainID).PublishStateTransaction(stateIndex, tx) } - return fmt.Errorf("Publishing state transaction for chain %s is not allowed", chainID) + return fmt.Errorf("Publishing state transaction for address %s of index %v is not allowed", chainID, stateIndex) +} + +func (mncT *MockedNodeConn) PublishGovernanceTransaction(chainID *isc.ChainID, tx *iotago.Transaction) error { + if mncT.publishGovernanceTransactionAllowedFun(chainID, tx) { + return mncT.ledgers.GetLedger(chainID).PublishGovernanceTransaction(tx) + } + return fmt.Errorf("Publishing governance rotation transaction for address %s is not allowed", chainID) } func (mncT *MockedNodeConn) PullLatestOutput(chainID *isc.ChainID) { @@ -78,6 +75,14 @@ func (mncT *MockedNodeConn) PullLatestOutput(chainID *isc.ChainID) { } } +func (mncT *MockedNodeConn) PullTxInclusionState(chainID *isc.ChainID, txid iotago.TransactionID) { + if mncT.pullTxInclusionStateAllowedFun(chainID, txid) { + mncT.ledgers.GetLedger(chainID).PullTxInclusionState(mncT.id, txid) + } else { + mncT.log.Errorf("Pull transaction inclusion state for address %s txID %v is not allowed", chainID, isc.TxID(txid)) + } +} + func (mncT *MockedNodeConn) PullStateOutputByID(chainID *isc.ChainID, id *iotago.UTXOInput) { if mncT.pullOutputByIDAllowedFun(chainID, id) { mncT.ledgers.GetLedger(chainID).PullStateOutputByID(mncT.id, id) @@ -86,6 +91,22 @@ func (mncT *MockedNodeConn) PullStateOutputByID(chainID *isc.ChainID, id *iotago } } +func (mncT *MockedNodeConn) AttachTxInclusionStateEvents(chainID *isc.ChainID, handler chain.NodeConnectionInclusionStateHandlerFun) (*events.Closure, error) { + return mncT.ledgers.GetLedger(chainID).AttachTxInclusionStateEvents(mncT.id, handler) +} + +func (mncT *MockedNodeConn) DetachTxInclusionStateEvents(chainID *isc.ChainID, closure *events.Closure) error { + return mncT.ledgers.GetLedger(chainID).DetachTxInclusionStateEvents(mncT.id, closure) +} + +func (mncT *MockedNodeConn) AttachMilestones(handler chain.NodeConnectionMilestonesHandlerFun) *events.Closure { + return mncT.ledgers.AttachMilestones(handler) +} + +func (mncT *MockedNodeConn) DetachMilestones(attachID *events.Closure) { + mncT.ledgers.DetachMilestones(attachID) +} + func (mncT *MockedNodeConn) SetMetrics(metrics nodeconnmetrics.NodeConnectionMetrics) { } @@ -97,11 +118,11 @@ func (mncT *MockedNodeConn) Close() { } func (mncT *MockedNodeConn) SetPublishStateTransactionAllowed(flag bool) { - mncT.SetPublishStateTransactionAllowedFun(func(*isc.ChainID, *iotago.Transaction) bool { return flag }) + mncT.SetPublishStateTransactionAllowedFun(func(*isc.ChainID, uint32, *iotago.Transaction) bool { return flag }) } -func (mncT *MockedNodeConn) SetPublishStateTransactionAllowedFun(fun func(chainID *isc.ChainID, tx *iotago.Transaction) bool) { - mncT.publishTransactionAllowedFun = fun +func (mncT *MockedNodeConn) SetPublishStateTransactionAllowedFun(fun func(chainID *isc.ChainID, stateIndex uint32, tx *iotago.Transaction) bool) { + mncT.publishStateTransactionAllowedFun = fun } func (mncT *MockedNodeConn) SetPublishGovernanceTransactionAllowed(flag bool) { @@ -132,10 +153,142 @@ func (mncT *MockedNodeConn) SetPullOutputByIDAllowedFun(fun func(chainID *isc.Ch mncT.pullOutputByIDAllowedFun = fun } -func (mncT *MockedNodeConn) AttachMilestones(handler func(*nodebridge.Milestone)) *events.Closure { - return mncT.ledgers.AttachMilestones(handler) +/*func (m *MockedNodeConn) PullLatestOutput() { + m.log.Debugf("Pull latest state output") + if m.pullLatestStateOutputAllowed { + m.log.Debugf("Pull latest state output allowed") + output := m.ledger.PullState() + if output != nil { + m.log.Debugf("Pull latest state output successful") + go m.handleUnspentAliasOutputFun(output, time.Now()) + } + } } -func (mncT *MockedNodeConn) DetachMilestones(attachID *events.Closure) { - mncT.ledgers.DetachMilestones(attachID) +func (m *MockedNodeConn) PullTxInclusionState(txid iotago.TransactionID) { + panic("TODO implement") +} + +func (m *MockedNodeConn) PullOutputByID(outputID *iotago.UTXOInput) { + m.log.Debugf("Pull output by id %v", isc.OID(outputID)) + if m.pullOutputByIDAllowedFun(outputID) { + m.log.Debugf("Pull output by id %v allowed", isc.OID(outputID)) + output := m.ledger.PullConfirmedOutput(outputID) + if output != nil { + m.log.Debugf("Pull confirmed output %v successful", isc.OID(outputID)) + go m.handleOutputFun(output, outputID) + } + } +} + +func (m *MockedNodeConn) PublishTransaction(stateIndex uint32, tx *iotago.Transaction) error { + m.log.Debugf("Publishing transaction for state %v", stateIndex) + if m.receiveTxAllowedFun(stateIndex, tx) { + m.log.Debugf("Publishing transaction for state %v allowed", stateIndex) + m.ledger.receiveTx(tx) + return nil + } + return fmt.Errorf("Publishing transaction for state %v not allowed", stateIndex) +} + +func (m *MockedNodeConn) SetPullLatestStateOutputAllowed(flag bool) { + m.pullLatestStateOutputAllowed = flag +} + +func (m *MockedNodeConn) SetPullConfirmedOutputAllowed(flag bool) { + m.SetPullConfirmedOutputAllowedFun(func(*iotago.UTXOInput) bool { return flag }) +} + +func (m *MockedNodeConn) SetPullOutputByIDAllowedFun(fun func(*iotago.UTXOInput) bool) { + m.pullOutputByIDAllowedFun = fun +} + +func (m *MockedNodeConn) SetReceiveTxAllowed(flag bool) { + m.SetReceiveTxAllowedFun(func(uint32, *iotago.Transaction) bool { return flag }) +} + +func (m *MockedNodeConn) SetReceiveTxAllowedFun(fun func(stateIndex uint32, tx *iotago.Transaction) bool) { + m.receiveTxAllowedFun = fun } + +func (m *MockedNodeConn) defaultHandleTimeDataFun(*isc.TimeData) {} + +func (m *MockedNodeConn) AttachToTimeData(fun chain.NodeConnectionHandleTimeDataFun) { + m.handleTimeDataFun = fun +} + +func (m *MockedNodeConn) DetachFromTimeData() { + m.handleTimeDataFun = m.defaultHandleTimeDataFun +} + +func (m *MockedNodeConn) defaultHandleTransactionFun(*iotago.Transaction) {} + +func (m *MockedNodeConn) AttachToTransactionReceived(fun chain.NodeConnectionHandleTransactionFun) { + m.handleTransactionFun = fun +} + +func (m *MockedNodeConn) DetachFromTransactionReceived() { + m.handleTransactionFun = m.defaultHandleTransactionFun +}*/ + +// func (m *MockedNodeConn) DetachFromInclusionStateReceived() { /* TODO */ } + +/*func (m *MockedNodeConn) defaultHandleOutputFun(iotago.Output, *iotago.UTXOInput) {} + +func (m *MockedNodeConn) AttachToOutputReceived(fun chain.NodeConnectionHandleOutputFun) { + m.handleOutputFun = fun +} + +func (m *MockedNodeConn) DetachFromOutputReceived() { + m.handleOutputFun = m.defaultHandleOutputFun +} + +func (m *MockedNodeConn) defaultHandleUnspentAliasOutputFun(*isc.AliasOutputWithID, time.Time) {} + +func (m *MockedNodeConn) AttachToUnspentAliasOutputReceived(fun chain.NodeConnectionHandleUnspentAliasOutputFun) { + m.handleUnspentAliasOutputFun = fun +} + +func (m *MockedNodeConn) DetachFromUnspentAliasOutputReceived() { + m.handleUnspentAliasOutputFun = m.defaultHandleUnspentAliasOutputFun +} + +func (m *MockedNodeConn) Close() { + close(m.stopChannel) +} + +func (m *MockedNodeConn) GetMetrics() nodeconnmetrics.NodeConnectionMessagesMetrics { + return nodeconnmetrics.NewEmptyNodeConnectionMessagesMetrics() +} + +func (m *MockedNodeConn) pushMilestonesLoop() { + milestone := uint32(0) + for { + select { + case <-time.After(100 * time.Millisecond): + m.handleTimeDataFun(&isc.TimeData{ + MilestoneIndex: milestone, + Time: time.Now(), + }) + milestone++ + case <-m.stopChannel: + return + } + } +}*/ + +/*AttachToAliasOutput(NodeConnectionAliasOutputHandlerFun) +DetachFromAliasOutput() +AttachToOnLedgerRequest(NodeConnectionOnLedgerRequestHandlerFun) +DetachFromOnLedgerRequest() +AttachToTxInclusionState(NodeConnectionInclusionStateHandlerFun) +DetachFromTxInclusionState() +AttachToMilestones(NodeConnectionMilestonesHandlerFun) +DetachFromMilestones() +Close() + ++PublishTransaction(stateIndex uint32, tx *iotago.Transaction) error ++PullLatestOutput() ++PullTxInclusionState(txid iotago.TransactionID) +PullOutputByID(*iotago.UTXOInput) +*/ diff --git a/packages/webapi/admapi/chainmetrics.go b/packages/webapi/admapi/chainmetrics.go index c765c49375..51f399b5cd 100644 --- a/packages/webapi/admapi/chainmetrics.go +++ b/packages/webapi/admapi/chainmetrics.go @@ -137,6 +137,7 @@ func addChainConcensusPipeMetricsEndpoints(adm echoswagger.ApiGroup, cms *chainM example := &model.ConsensusPipeMetrics{ EventStateTransitionMsgPipeSize: 0, EventPeerLogIndexMsgPipeSize: 0, + EventInclusionStateMsgPipeSize: 0, EventACSMsgPipeSize: 0, EventVMResultMsgPipeSize: 0, EventTimerMsgPipeSize: 0, diff --git a/packages/webapi/model/consensus_metrics.go b/packages/webapi/model/consensus_metrics.go index 2094049f33..7715e37409 100644 --- a/packages/webapi/model/consensus_metrics.go +++ b/packages/webapi/model/consensus_metrics.go @@ -57,6 +57,7 @@ func NewConsensusWorkflowStatus(status chain.ConsensusWorkflowStatus) *Consensus type ConsensusPipeMetrics struct { EventStateTransitionMsgPipeSize int EventPeerLogIndexMsgPipeSize int + EventInclusionStateMsgPipeSize int EventACSMsgPipeSize int EventVMResultMsgPipeSize int EventTimerMsgPipeSize int @@ -66,6 +67,7 @@ func NewConsensusPipeMetrics(pipeMetrics chain.ConsensusPipeMetrics) *ConsensusP return &ConsensusPipeMetrics{ EventStateTransitionMsgPipeSize: pipeMetrics.GetEventStateTransitionMsgPipeSize(), EventPeerLogIndexMsgPipeSize: pipeMetrics.GetEventPeerLogIndexMsgPipeSize(), + EventInclusionStateMsgPipeSize: pipeMetrics.GetEventInclusionStateMsgPipeSize(), EventACSMsgPipeSize: pipeMetrics.GetEventACSMsgPipeSize(), EventVMResultMsgPipeSize: pipeMetrics.GetEventVMResultMsgPipeSize(), EventTimerMsgPipeSize: pipeMetrics.GetEventTimerMsgPipeSize(), diff --git a/tools/cluster/templates/waspconfig.go b/tools/cluster/templates/waspconfig.go index c859b81961..beb1480645 100644 --- a/tools/cluster/templates/waspconfig.go +++ b/tools/cluster/templates/waspconfig.go @@ -66,7 +66,7 @@ const WaspConfig = ` }, "profiling": { "bindAddress": "0.0.0.0:{{.ProfilingPort}}", - "writeProfiles": false, + "writeProfiles": true, "enabled": false }, "wal": { diff --git a/tools/cluster/tests/nodeconn_test.go b/tools/cluster/tests/nodeconn_test.go index d91e55b124..2d84deed25 100644 --- a/tools/cluster/tests/nodeconn_test.go +++ b/tools/cluster/tests/nodeconn_test.go @@ -13,8 +13,10 @@ import ( "github.com/stretchr/testify/require" + "github.com/iotaledger/hive.go/core/logger" "github.com/iotaledger/inx-app/nodebridge" iotago "github.com/iotaledger/iota.go/v3" + "github.com/iotaledger/iota.go/v3/nodeclient" "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/l1connection" @@ -67,7 +69,7 @@ func TestNodeConn(t *testing.T) { // Start a peering network. // peeringID := peering.RandomPeeringID() peerNetIDs, peerIdentities := testpeers.SetupKeys(uint16(peerCount)) - networkLog := testlogger.WithLevel(log.Named("Network"), 0, false) + networkLog := testlogger.WithLevel(log.Named("Network"), logger.LevelInfo, false) _, networkCloser := testpeers.SetupNet( peerNetIDs, peerIdentities, @@ -76,15 +78,23 @@ func TestNodeConn(t *testing.T) { ) t.Logf("Peering network created.") - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() nodeBridge, err := nodebridge.NewNodeBridge(ctx, l1.Config.INXAddress, 10, log.Named("NodeBridge")) require.NoError(t, err) - go nodeBridge.Run(ctx) nc := nodeconn.New(ctx, log, nodeBridge) + // + // Check milestone attach/detach. + mChan := make(chan *nodeclient.MilestoneInfo, 10) + mSub := nc.AttachMilestones(func(m *nodeclient.MilestoneInfo) { + mChan <- m + }) + <-mChan + nc.DetachMilestones(mSub) + // // Check the chain operations. chainID := createChain(t) @@ -92,7 +102,6 @@ func TestNodeConn(t *testing.T) { chainOICh := make(chan iotago.OutputID) chainStateOuts := make(map[iotago.OutputID]iotago.Output) chainStateOutsICh := make(chan iotago.OutputID) - mChan := make(chan *nodebridge.Milestone, 10) nc.RegisterChain( chainID, func(oi iotago.OutputID, o iotago.Output) { @@ -102,12 +111,7 @@ func TestNodeConn(t *testing.T) { func(oi iotago.OutputID, o iotago.Output) { chainOuts[oi] = o chainOICh <- oi - }, - func(m *nodebridge.Milestone) { - mChan <- m - }, - ) - <-mChan + }) client := l1connection.NewClient(l1.Config, log) // Post a TX directly, and wait for it in the message stream (e.g. a request). @@ -117,17 +121,29 @@ func TestNodeConn(t *testing.T) { oid := <-chainOICh t.Logf("Waiting for outputs posted via tangle... Done, have %v=%v", oid.ToHex(), chainOuts[oid]) + // Post a TX via the NodeConn (e.g. alias output). + tiseCh := make(chan bool) + tise, err := nc.AttachTxInclusionStateEvents(chainID, func(txID iotago.TransactionID, inclusionState string) { + t.Logf("TX Inclusion state changed, txID=%v, state=%v", txID, inclusionState) + if inclusionState == "included" { + tiseCh <- true + } + }) require.NoError(t, err) wallet := cryptolib.NewKeyPair() client.RequestFunds(wallet.Address()) tx, err := l1connection.MakeSimpleValueTX(client, wallet, chainID.AsAddress(), 1*isc.Million) require.NoError(t, err) - err = nc.PublishTransaction(chainID, tx) + err = nc.PublishStateTransaction(chainID, uint32(0), tx) require.NoError(t, err) t.Logf("Waiting for outputs posted via nodeConn...") oid = <-chainOICh t.Logf("Waiting for outputs posted via nodeConn... Done, have %v=%v", oid.ToHex(), chainOuts[oid]) + t.Logf("Waiting for TX incusion event...") + <-tiseCh + t.Logf("Waiting for TX incusion event... Done") + nc.DetachTxInclusionStateEvents(chainID, tise) nc.UnregisterChain(chainID) // diff --git a/tools/cluster/tests/spam_test.go b/tools/cluster/tests/spam_test.go index eae8f05a98..4c3e0cb4a4 100644 --- a/tools/cluster/tests/spam_test.go +++ b/tools/cluster/tests/spam_test.go @@ -25,7 +25,6 @@ func TestSpamOnledger(t *testing.T) { testutil.RunHeavy(t) // in the privtangle setup, with 1s milestones, this test takes ~50m to process 10k requests const numRequests = 10_000 - // env := setupAdvancedInccounterTest(t, 4, []int{0, 1, 2, 3}) env := setupAdvancedInccounterTest(t, 1, []int{0}) // send requests from many different wallets to speed things up diff --git a/tools/wasp-cli/metrics/consensus.go b/tools/wasp-cli/metrics/consensus.go index 08fc248601..199a3bab08 100644 --- a/tools/wasp-cli/metrics/consensus.go +++ b/tools/wasp-cli/metrics/consensus.go @@ -40,6 +40,7 @@ var consensusMetricsCmd = &cobra.Command{ table[8] = makeWorkflowTableRow("Consensus is completed", !(workflowStatus.FlagInProgress), workflowStatus.TimeCompleted) table[9] = makeWorkflowTableRow("Current state index", workflowStatus.CurrentStateIndex, time.Time{}) table[10] = makeWorkflowTableRow("Event state transition message pipe size", pipeMetrics.EventStateTransitionMsgPipeSize, time.Time{}) + table[11] = makeWorkflowTableRow("Event inclusion state message pipe size", pipeMetrics.EventInclusionStateMsgPipeSize, time.Time{}) table[12] = makeWorkflowTableRow("Event ACS message pipe size", pipeMetrics.EventACSMsgPipeSize, time.Time{}) table[13] = makeWorkflowTableRow("Event VM result message pipe size", pipeMetrics.EventVMResultMsgPipeSize, time.Time{}) table[14] = makeWorkflowTableRow("Event timer message pipe size", pipeMetrics.EventTimerMsgPipeSize, time.Time{})