Skip to content

Commit

Permalink
Merge pull request #453 from iotaledger/feat/inx-accepted-transactions
Browse files Browse the repository at this point in the history
Added ListenToAcceptedTransactions to INX
  • Loading branch information
alexsporn authored Oct 20, 2023
2 parents 46c4923 + d7f69bc commit 1f0f14a
Show file tree
Hide file tree
Showing 15 changed files with 315 additions and 242 deletions.
14 changes: 4 additions & 10 deletions components/dashboard/visualizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,11 @@ func sendTipInfo(block *blocks.Block, isTip bool) {
func runVisualizer(component *app.Component) {
if err := component.Daemon().BackgroundWorker("Dashboard[Visualizer]", func(ctx context.Context) {

deps.Protocol.MainEngineInstance().Ledger.MemPool().OnSignedTransactionAttached(func(signedTransactionMetadata mempool.SignedTransactionMetadata) {
signedTransactionMetadata.OnSignaturesValid(func() {
transactionMetadata := signedTransactionMetadata.TransactionMetadata()
transactionMetadata.OnAccepted(func() {
attachmentID := transactionMetadata.EarliestIncludedAttachment()
sendTxAccepted(attachmentID, true)
})
})
})

unhook := lo.Batch(
deps.Protocol.Events.Engine.Booker.TransactionAccepted.Hook(func(transactionMetadata mempool.TransactionMetadata) {
attachmentID := transactionMetadata.EarliestIncludedAttachment()
sendTxAccepted(attachmentID, true)
}, event.WithWorkerPool(component.WorkerPool)).Unhook,
deps.Protocol.Events.Engine.BlockDAG.BlockAttached.Hook(func(block *blocks.Block) {
sendVertex(block, false)
}, event.WithWorkerPool(component.WorkerPool)).Unhook,
Expand Down
136 changes: 104 additions & 32 deletions components/inx/server_utxo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"google.golang.org/grpc/status"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/hive.go/runtime/workerpool"
inx "github.com/iotaledger/inx/go"
"github.com/iotaledger/iota-core/pkg/protocol/engine/mempool"
"github.com/iotaledger/iota-core/pkg/protocol/engine/utxoledger"
iotago "github.com/iotaledger/iota.go/v4"
)
Expand All @@ -26,11 +28,11 @@ func NewLedgerOutput(o *utxoledger.Output) (*inx.LedgerOutput, error) {
},
}

includedSlotIndex := o.SlotBooked()
if includedSlotIndex <= latestCommitment.Slot() {
includedCommitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(includedSlotIndex)
includedSlot := o.SlotBooked()
if includedSlot <= latestCommitment.Slot() {
includedCommitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(includedSlot)
if err != nil {
return nil, ierrors.Wrapf(err, "failed to load commitment with index: %d", includedSlotIndex)
return nil, ierrors.Wrapf(err, "failed to load commitment with slot: %d", includedSlot)
}
l.CommitmentIdIncluded = inx.NewCommitmentId(includedCommitment.ID())
}
Expand All @@ -51,11 +53,11 @@ func NewLedgerSpent(s *utxoledger.Spent) (*inx.LedgerSpent, error) {
}

latestCommitment := deps.Protocol.MainEngineInstance().SyncManager.LatestCommitment()
spentSlotIndex := s.SlotSpent()
if spentSlotIndex <= latestCommitment.Slot() {
spentCommitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(spentSlotIndex)
spentSlot := s.SlotSpent()
if spentSlot <= latestCommitment.Slot() {
spentCommitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(spentSlot)
if err != nil {
return nil, ierrors.Wrapf(err, "failed to load commitment with index: %d", spentSlotIndex)
return nil, ierrors.Wrapf(err, "failed to load commitment with slot: %d", spentSlot)
}
l.CommitmentIdSpent = inx.NewCommitmentId(spentCommitment.ID())
}
Expand Down Expand Up @@ -226,11 +228,11 @@ func (s *Server) ListenToLedgerUpdates(req *inx.SlotRangeRequest, srv inx.INX_Li
return nil
}

sendStateDiffsRange := func(startIndex iotago.SlotIndex, endIndex iotago.SlotIndex) error {
for currentIndex := startIndex; currentIndex <= endIndex; currentIndex++ {
stateDiff, err := deps.Protocol.MainEngineInstance().Ledger.SlotDiffs(currentIndex)
sendStateDiffsRange := func(startSlot iotago.SlotIndex, endSlot iotago.SlotIndex) error {
for currentSlot := startSlot; currentSlot <= endSlot; currentSlot++ {
stateDiff, err := deps.Protocol.MainEngineInstance().Ledger.SlotDiffs(currentSlot)
if err != nil {
return status.Errorf(codes.NotFound, "ledger update for milestoneIndex %d not found", currentIndex)
return status.Errorf(codes.NotFound, "ledger update for slot %d not found", currentSlot)
}

if err := createLedgerUpdatePayloadAndSend(stateDiff.Slot, stateDiff.Outputs, stateDiff.Spents); err != nil {
Expand All @@ -241,37 +243,37 @@ func (s *Server) ListenToLedgerUpdates(req *inx.SlotRangeRequest, srv inx.INX_Li
return nil
}

// if a startIndex is given, we send all available milestone diffs including the start index.
// if an endIndex is given, we send all available milestone diffs up to and including min(ledgerIndex, endIndex).
// if no startIndex is given, but an endIndex, we don't send previous milestone diffs.
sendPreviousStateDiffs := func(startIndex iotago.SlotIndex, endIndex iotago.SlotIndex) (iotago.SlotIndex, error) {
if startIndex == 0 {
// no need to send previous milestone diffs
// if a startSlot is given, we send all available state diffs including the start slot.
// if an endSlot is given, we send all available state diffs up to and including min(ledgerSlot, endSlot).
// if no startSlot is given, but an endSlot we don't send previous state diffs.
sendPreviousStateDiffs := func(startSlot iotago.SlotIndex, endSlot iotago.SlotIndex) (iotago.SlotIndex, error) {
if startSlot == 0 {
// no need to send previous state diffs
return 0, nil
}

latestCommitment := deps.Protocol.MainEngineInstance().SyncManager.LatestCommitment()

if startIndex > latestCommitment.Slot() {
// no need to send previous milestone diffs
if startSlot > latestCommitment.Slot() {
// no need to send previous state diffs
return 0, nil
}

// Stream all available milestone diffs first
pruningIndex := deps.Protocol.MainEngineInstance().SyncManager.LatestFinalizedSlot()
if startIndex <= pruningIndex {
return 0, status.Errorf(codes.InvalidArgument, "given startMilestoneIndex %d is older than the current pruningIndex %d", startIndex, pruningIndex)
prunedEpoch, hasPruned := deps.Protocol.MainEngineInstance().SyncManager.LastPrunedEpoch()
if hasPruned && startSlot <= deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch) {
return 0, status.Errorf(codes.InvalidArgument, "given startSlot %d is older than the current pruningSlot %d", startSlot, deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch))
}

if endIndex == 0 || endIndex > latestCommitment.Slot() {
endIndex = latestCommitment.Slot()
if endSlot == 0 || endSlot > latestCommitment.Slot() {
endSlot = latestCommitment.Slot()
}

if err := sendStateDiffsRange(startIndex, endIndex); err != nil {
if err := sendStateDiffsRange(startSlot, endSlot); err != nil {
return 0, err
}

return endIndex, nil
return endSlot, nil
}

stream := &streamRange{
Expand All @@ -292,16 +294,16 @@ func (s *Server) ListenToLedgerUpdates(req *inx.SlotRangeRequest, srv inx.INX_Li

catchUpFunc := func(start iotago.SlotIndex, end iotago.SlotIndex) error {
if err := sendStateDiffsRange(start, end); err != nil {
Component.LogErrorf("sendMilestoneDiffsRange error: %v", err)
Component.LogErrorf("sendStateDiffsRange error: %v", err)

return err
}

return nil
}

sendFunc := func(index iotago.SlotIndex, newOutputs utxoledger.Outputs, newSpents utxoledger.Spents) error {
if err := createLedgerUpdatePayloadAndSend(index, newOutputs, newSpents); err != nil {
sendFunc := func(slot iotago.SlotIndex, newOutputs utxoledger.Outputs, newSpents utxoledger.Spents) error {
if err := createLedgerUpdatePayloadAndSend(slot, newOutputs, newSpents); err != nil {
Component.LogErrorf("send error: %v", err)

return err
Expand All @@ -315,8 +317,8 @@ func (s *Server) ListenToLedgerUpdates(req *inx.SlotRangeRequest, srv inx.INX_Li

wp := workerpool.New("ListenToLedgerUpdates", workerpool.WithWorkerCount(workerCount)).Start()

unhook := deps.Protocol.Events.Engine.Ledger.StateDiffApplied.Hook(func(index iotago.SlotIndex, newOutputs utxoledger.Outputs, newSpents utxoledger.Spents) {
done, err := handleRangedSend2(index, newOutputs, newSpents, stream, catchUpFunc, sendFunc)
unhook := deps.Protocol.Events.Engine.Ledger.StateDiffApplied.Hook(func(slot iotago.SlotIndex, newOutputs utxoledger.Outputs, newSpents utxoledger.Spents) {
done, err := handleRangedSend2(slot, newOutputs, newSpents, stream, catchUpFunc, sendFunc)
switch {
case err != nil:
innerErr = err
Expand All @@ -338,3 +340,73 @@ func (s *Server) ListenToLedgerUpdates(req *inx.SlotRangeRequest, srv inx.INX_Li

return innerErr
}

func (s *Server) ListenToAcceptedTransactions(_ *inx.NoParams, srv inx.INX_ListenToAcceptedTransactionsServer) error {
ctx, cancel := context.WithCancel(Component.Daemon().ContextStopped())

wp := workerpool.New("ListenToAcceptedTransactions", workerpool.WithWorkerCount(workerCount)).Start()

unhook := deps.Protocol.Events.Engine.Booker.TransactionAccepted.Hook(func(transactionMetadata mempool.TransactionMetadata) {
slot := transactionMetadata.EarliestIncludedAttachment().Slot()

var consumed []*inx.LedgerSpent
if err := transactionMetadata.Inputs().ForEach(func(stateMetadata mempool.StateMetadata) error {
spentOutput, ok := stateMetadata.State().(*utxoledger.Output)
if !ok {
return ierrors.Errorf("unexpected state metadata type: %T", stateMetadata.State())
}

inxSpent, err := NewLedgerSpent(utxoledger.NewSpent(spentOutput, transactionMetadata.ID(), slot))
if err != nil {
return err
}
consumed = append(consumed, inxSpent)

return nil
}); err != nil {
Component.LogErrorf("error creating payload: %v", err)
cancel()
}

var created []*inx.LedgerOutput
if err := transactionMetadata.Outputs().ForEach(func(stateMetadata mempool.StateMetadata) error {
output, ok := stateMetadata.State().(*utxoledger.Output)
if !ok {
return ierrors.Errorf("unexpected state metadata type: %T", stateMetadata.State())
}

inxOutput, err := NewLedgerOutput(output)
if err != nil {
return err
}
created = append(created, inxOutput)

return nil
}); err != nil {
Component.LogErrorf("error creating payload: %v", err)
cancel()
}

payload := &inx.AcceptedTransaction{
TransactionId: inx.NewTransactionId(transactionMetadata.ID()),
Slot: uint32(slot),
Consumed: consumed,
Created: created,
}
if err := srv.Send(payload); err != nil {
Component.LogErrorf("send error: %v", err)
cancel()
}
}, event.WithWorkerPool(wp)).Unhook

<-ctx.Done()
unhook()

// We need to wait until all tasks are done, otherwise we might call
// "SendMsg" and "CloseSend" in parallel on the grpc stream, which is
// not safe according to the grpc docs.
wp.Shutdown()
wp.ShutdownComplete.Wait()

return ctx.Err()
}
1 change: 1 addition & 0 deletions components/restapi/core/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
restapipkg "github.com/iotaledger/iota-core/pkg/restapi"
)

//nolint:goconst // don't care about the number of constants
const (
// RouteInfo is the route for getting the node info.
// GET returns the node info.
Expand Down
38 changes: 19 additions & 19 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@ require (
github.com/google/uuid v1.3.1
github.com/gorilla/websocket v1.5.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/iotaledger/hive.go/ads v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/app v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/constraints v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231019113503-7986872a7a38
github.com/iotaledger/hive.go/crypto v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/ds v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/ierrors v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/kvstore v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/lo v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/logger v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/runtime v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231019113503-7986872a7a38
github.com/iotaledger/hive.go/stringify v0.0.0-20231019113503-7986872a7a38
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231011161248-cf0bd6e08811
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231011154428-257141868dad
github.com/iotaledger/iota.go/v4 v4.0.0-20231019112751-e9872df31648
github.com/iotaledger/hive.go/ads v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/app v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/constraints v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231020115340-13da292c580b
github.com/iotaledger/hive.go/crypto v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/ds v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/ierrors v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/kvstore v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/lo v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/logger v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/runtime v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231020115340-13da292c580b
github.com/iotaledger/hive.go/stringify v0.0.0-20231020115340-13da292c580b
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231020152103-b6ea7ff7a4af
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231020151337-569450d5bf7d
github.com/iotaledger/iota.go/v4 v4.0.0-20231019174124-aa2290512bcd
github.com/labstack/echo/v4 v4.11.2
github.com/labstack/gommon v0.4.0
github.com/libp2p/go-libp2p v0.30.0
Expand All @@ -39,9 +39,9 @@ require (
github.com/wollac/iota-crypto-demo v0.0.0-20221117162917-b10619eccb98
github.com/zyedidia/generic v1.2.1
go.uber.org/atomic v1.11.0
go.uber.org/dig v1.17.0
go.uber.org/dig v1.17.1
golang.org/x/crypto v0.14.0
google.golang.org/grpc v1.58.3
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
)

Expand Down Expand Up @@ -179,7 +179,7 @@ require (
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
gonum.org/v1/gonum v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
Expand Down
Loading

0 comments on commit 1f0f14a

Please sign in to comment.