Skip to content

Commit

Permalink
sidechain/deploy: Use Waiter to track pending transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jul 9, 2023
1 parent 2e9161f commit 570e46b
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 224 deletions.
7 changes: 6 additions & 1 deletion pkg/morph/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ type Prm struct {
//
// See project documentation for details.
func Deploy(ctx context.Context, prm Prm) error {
// wrap the parent context into the context of the current function so that
// transaction wait routines do not leak
ctx, cancel := context.WithCancel(ctx)
defer cancel()

committee, err := prm.Blockchain.GetCommittee()
if err != nil {
return fmt.Errorf("get Neo committee of the network: %w", err)
Expand Down Expand Up @@ -198,7 +203,7 @@ func Deploy(ctx context.Context, prm Prm) error {

prm.Logger.Info("Notary service successfully enabled for the committee")

onNotaryDepositDeficiency, err := initNotaryDepositDeficiencyHandler(prm.Logger, prm.Blockchain, monitor, prm.LocalAccount)
onNotaryDepositDeficiency, err := initNotaryDepositDeficiencyHandler(ctx, prm.Logger, prm.Blockchain, prm.LocalAccount)
if err != nil {
return fmt.Errorf("construct action depositing funds to the local account's Notary balance: %w", err)
}
Expand Down
76 changes: 37 additions & 39 deletions pkg/morph/deploy/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type initCommitteeGroupPrm struct {

// initCommitteeGroup initializes committee group and returns corresponding private key.
func initCommitteeGroup(ctx context.Context, prm initCommitteeGroupPrm) (*keys.PrivateKey, error) {
// wrap the parent context into the context of the current function so that
// transaction wait routines do not leak
ctx, cancel := context.WithCancel(ctx)
defer cancel()

inv := invoker.New(prm.blockchain, nil)
const leaderCommitteeIndex = 0
var committeeGroupKey *keys.PrivateKey
Expand Down Expand Up @@ -108,7 +113,7 @@ upperLoop:
}

if leaderTick == nil {
leaderTick, err = initShareCommitteeGroupKeyAsLeaderTick(prm, committeeGroupKey)
leaderTick, err = initShareCommitteeGroupKeyAsLeaderTick(ctx, prm, committeeGroupKey)
if err != nil {
prm.logger.Error("failed to construct action sharing committee group key between committee members as leader, will try again later",
zap.Error(err))
Expand All @@ -123,7 +128,7 @@ upperLoop:
// initShareCommitteeGroupKeyAsLeaderTick returns a function that preserves
// context of the committee group key distribution by leading committee member
// between calls.
func initShareCommitteeGroupKeyAsLeaderTick(prm initCommitteeGroupPrm, committeeGroupKey *keys.PrivateKey) (func(), error) {
func initShareCommitteeGroupKeyAsLeaderTick(ctx context.Context, prm initCommitteeGroupPrm, committeeGroupKey *keys.PrivateKey) (func(), error) {
localActor, err := actor.NewSimple(prm.blockchain, prm.localAcc)
if err != nil {
return nil, fmt.Errorf("init transaction sender from local account: %w", err)
Expand All @@ -132,7 +137,8 @@ func initShareCommitteeGroupKeyAsLeaderTick(prm initCommitteeGroupPrm, committee
invkr := invoker.New(prm.blockchain, nil)

// multi-tick context
mDomainsToVubs := make(map[string][2]uint32) // 1st - register, 2nd - addRecord
mDomainsToRegisterTxs := make(map[string]*transactionGroupMonitor, len(prm.committee))
mDomainsToSetRecordTxs := make(map[string]*transactionGroupMonitor, len(prm.committee))

return func() {
prm.logger.Info("distributing committee group key between committee members using NNS...")
Expand All @@ -148,58 +154,52 @@ func initShareCommitteeGroupKeyAsLeaderTick(prm initCommitteeGroupPrm, committee
if errors.Is(err, errMissingDomain) {
l.Info("NNS domain is missing, registration is needed")

vubs, ok := mDomainsToVubs[domain]
if ok && vubs[0] > 0 {
l.Info("transaction registering NNS domain was sent earlier, checking relevance...")

if cur := prm.monitor.currentHeight(); cur <= vubs[0] {
l.Info("previously sent transaction registering NNS domain may still be relevant, will wait for the outcome",
zap.Uint32("current height", cur), zap.Uint32("retry after height", vubs[0]))
return
txMonitor, ok := mDomainsToRegisterTxs[domain]
if ok {
if txMonitor.isPending() {
l.Info("previously sent transaction registering NNS domain is still pending, will wait for the outcome")
continue
}

l.Info("previously sent transaction registering NNS domain expired without side-effect")
} else {
txMonitor = newTransactionGroupMonitor(localActor)
mDomainsToRegisterTxs[domain] = txMonitor
}

l.Info("sending new transaction registering domain in the NNS...")

_, vub, err := localActor.SendCall(prm.nnsOnChainAddress, methodNNSRegister,
txID, vub, err := localActor.SendCall(prm.nnsOnChainAddress, methodNNSRegister,
domain, localActor.Sender(), prm.systemEmail, nnsRefresh, nnsRetry, nnsExpire, nnsMinimum)
if err != nil {
vubs[0] = 0
mDomainsToVubs[domain] = vubs
if isErrNotEnoughGAS(err) {
l.Info("not enough GAS to register domain in the NNS, will try again later")
} else {
l.Error("failed to send transaction registering domain in the NNS, will try again later", zap.Error(err))
}
return
continue
}

vubs[0] = vub
mDomainsToVubs[domain] = vubs
l.Info("transaction registering domain in the NNS has been successfully sent, will wait for the outcome",
zap.Stringer("tx", txID), zap.Uint32("vub", vub))

l.Info("transaction registering domain in the NNS has been successfully sent, will wait for the outcome")
txMonitor.trackPendingTransactionsAsync(ctx, vub, txID)

continue
} else if !errors.Is(err, errMissingDomainRecord) {
l.Error("failed to lookup NNS domain record, will try again later", zap.Error(err))
return
continue
}

l.Info("missing record of the NNS domain, needed to be set")

vubs, ok := mDomainsToVubs[domain]
if ok && vubs[1] > 0 {
l.Info("transaction setting NNS domain record was sent earlier, checking relevance...")

if cur := prm.monitor.currentHeight(); cur <= vubs[1] {
l.Info("previously sent transaction setting NNS domain record may still be relevant, will wait for the outcome",
zap.Uint32("current height", cur), zap.Uint32("retry after height", vubs[1]))
return
txMonitor, ok := mDomainsToSetRecordTxs[domain]
if ok {
if txMonitor.isPending() {
l.Info("previously sent transaction setting NNS domain record is still pending, will wait for the outcome")
continue
}

l.Info("previously sent transaction setting NNS domain record expired without side-effect")
} else {
txMonitor = newTransactionGroupMonitor(localActor)
mDomainsToSetRecordTxs[domain] = txMonitor
}

l.Info("sharing encrypted committee group key with the committee member...")
Expand All @@ -208,28 +208,26 @@ func initShareCommitteeGroupKeyAsLeaderTick(prm initCommitteeGroupPrm, committee
if err != nil {
l.Error("failed to encrypt committee group key to share with the committee member, will try again later",
zap.Error(err))
return
continue
}

l.Info("sending new transaction setting domain record in the NNS...")

_, vub, err := localActor.SendCall(prm.nnsOnChainAddress, methodNNSAddRecord,
txID, vub, err := localActor.SendCall(prm.nnsOnChainAddress, methodNNSAddRecord,
domain, int64(nns.TXT), keyCipher)
if err != nil {
vubs[1] = 0
mDomainsToVubs[domain] = vubs
if isErrNotEnoughGAS(err) {
l.Info("not enough GAS to set NNS domain record, will try again later")
} else {
l.Error("failed to send transaction setting NNS domain record, will try again later", zap.Error(err))
}
return
continue
}

vubs[1] = vub
mDomainsToVubs[domain] = vubs
l.Info("transaction setting NNS domain record has been successfully sent, will wait for the outcome",
zap.Stringer("tx", txID), zap.Uint32("vub", vub))

l.Info("transaction setting NNS domain record has been successfully sent, will wait for the outcome")
txMonitor.trackPendingTransactionsAsync(ctx, vub, txID)

continue
}
Expand Down
86 changes: 39 additions & 47 deletions pkg/morph/deploy/nns.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,19 @@ type deployNNSContractPrm struct {
// If contract is missing and deployNNSContractPrm.initCommitteeGroupKey is provided,
// initNNSContract attempts to deploy local contract.
func initNNSContract(ctx context.Context, prm deployNNSContractPrm) (res util.Uint160, err error) {
var managementContract *management.Contract
var sentTxValidUntilBlock uint32
localActor, err := actor.NewSimple(prm.blockchain, prm.localAcc)
if err != nil {
return res, fmt.Errorf("init transaction sender from local account: %w", err)
}

// wrap the parent context into the context of the current function so that
// transaction wait routines do not leak
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var committeeGroupKey *keys.PrivateKey
txMonitor := newTransactionGroupMonitor(localActor)
managementContract := management.New(localActor)

for ; ; prm.monitor.waitForNextBlock(ctx) {
select {
Expand Down Expand Up @@ -141,47 +151,29 @@ func initNNSContract(ctx context.Context, prm deployNNSContractPrm) (res util.Ui
continue
}

setGroupInManifest(&prm.localManifest, prm.localNEF, committeeGroupKey, prm.localAcc.ScriptHash())

prm.logger.Info("private key of the committee group has been initialized", zap.Stringer("public key", committeeGroupKey.PublicKey()))
}

if sentTxValidUntilBlock > 0 {
prm.logger.Info("transaction deploying NNS contract was sent earlier, checking relevance...")

if cur := prm.monitor.currentHeight(); cur <= sentTxValidUntilBlock {
prm.logger.Info("previously sent transaction deploying NNS contract may still be relevant, will wait for the outcome",
zap.Uint32("current height", cur), zap.Uint32("retry after height", sentTxValidUntilBlock))
continue
}

prm.logger.Info("previously sent transaction deploying NNS contract expired without side-effect")
if txMonitor.isPending() {
prm.logger.Info("previously sent transaction updating NNS contract is still pending, will wait for the outcome")
continue
}

prm.logger.Info("sending new transaction deploying NNS contract...")

if managementContract == nil {
localActor, err := actor.NewSimple(prm.blockchain, prm.localAcc)
if err != nil {
prm.logger.Warn("NNS contract is missing on the chain but attempts to deploy are disabled, will try again later")
continue
}

managementContract = management.New(localActor)

setGroupInManifest(&prm.localManifest, prm.localNEF, committeeGroupKey, prm.localAcc.ScriptHash())
}

// just to definitely avoid mutation
nefCp := prm.localNEF
manifestCp := prm.localManifest

_, vub, err := managementContract.Deploy(&nefCp, &manifestCp, []interface{}{
txID, vub, err := managementContract.Deploy(&nefCp, &manifestCp, []interface{}{
[]interface{}{
[]interface{}{domainBootstrap, prm.systemEmail},
[]interface{}{domainContractAddresses, prm.systemEmail},
},
})
if err != nil {
sentTxValidUntilBlock = 0
if isErrNotEnoughGAS(err) {
prm.logger.Info("not enough GAS to deploy NNS contract, will try again later")
} else {
Expand All @@ -190,9 +182,11 @@ func initNNSContract(ctx context.Context, prm deployNNSContractPrm) (res util.Ui
continue
}

sentTxValidUntilBlock = vub
prm.logger.Info("transaction deploying NNS contract has been successfully sent, will wait for the outcome",
zap.Stringer("tx", txID), zap.Uint32("vub", vub),
)

prm.logger.Info("transaction deploying NNS contract has been successfully sent, will wait for the outcome")
txMonitor.trackPendingTransactionsAsync(ctx, vub, txID)
}
}

Expand Down Expand Up @@ -281,7 +275,12 @@ func updateNNSContract(ctx context.Context, prm updateNNSContractPrm) error {
return fmt.Errorf("create Notary service client sending transactions to be signed by the committee: %w", err)
}

var updateTxValidUntilBlock uint32
// wrap the parent context into the context of the current function so that
// transaction wait routines do not leak
ctx, cancel := context.WithCancel(ctx)
defer cancel()

txMonitor := newTransactionGroupMonitor(committeeActor)

for ; ; prm.monitor.waitForNextBlock(ctx) {
select {
Expand Down Expand Up @@ -332,8 +331,6 @@ func updateNNSContract(ctx context.Context, prm updateNNSContractPrm) error {

setGroupInManifest(&prm.localManifest, prm.localNEF, prm.committeeGroupKey, prm.localAcc.ScriptHash())

var vub uint32

// we pre-check 'already updated' case via MakeCall in order to not potentially
// wait for previously sent transaction to be expired (condition below) and
// immediately succeed
Expand All @@ -350,23 +347,17 @@ func updateNNSContract(ctx context.Context, prm updateNNSContractPrm) error {
prm.logger.Info("NNS contract has already been updated, skip")
return nil
}
} else {
if updateTxValidUntilBlock > 0 {
prm.logger.Info("transaction updating NNS contract was sent earlier, checking relevance...")

if cur := prm.monitor.currentHeight(); cur <= updateTxValidUntilBlock {
prm.logger.Info("previously sent transaction updating NNS contract may still be relevant, will wait for the outcome",
zap.Uint32("current height", cur), zap.Uint32("retry after height", updateTxValidUntilBlock))
continue
}

prm.logger.Info("previously sent transaction updating NNS contract expired without side-effect")
}

prm.logger.Info("sending new transaction updating NNS contract...")
prm.logger.Error("failed to make transaction updating NNS contract, will try again later", zap.Error(err))
continue
}

_, _, vub, err = committeeActor.Notarize(tx, nil)
if txMonitor.isPending() {
prm.logger.Info("previously sent notary request updating NNS contract is still pending, will wait for the outcome")
continue
}

mainTxID, fallbackTxID, vub, err := committeeActor.Notarize(tx, nil)
if err != nil {
lackOfGAS := isErrNotEnoughGAS(err)
// here lackOfGAS=true always means lack of Notary balance and not related to
Expand All @@ -389,8 +380,9 @@ func updateNNSContract(ctx context.Context, prm updateNNSContractPrm) error {
continue
}

updateTxValidUntilBlock = vub
prm.logger.Info("notary request updating NNS contract has been successfully sent, will wait for the outcome",
zap.Stringer("main tx", mainTxID), zap.Stringer("fallback tx", fallbackTxID), zap.Uint32("vub", vub))

prm.logger.Info("transaction updating NNS contract has been successfully sent, will wait for the outcome")
txMonitor.trackPendingTransactionsAsync(ctx, vub, mainTxID, fallbackTxID)
}
}
Loading

0 comments on commit 570e46b

Please sign in to comment.