Skip to content

Commit

Permalink
feat: use unified skyway event handler
Browse files Browse the repository at this point in the history
  • Loading branch information
maharifu committed Aug 5, 2024
1 parent 82e6ed4 commit 81ca12f
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 597 deletions.
194 changes: 17 additions & 177 deletions chain/evm/compass.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,16 @@ type evmClienter interface {
GetEthClient() ethClientConn
}

type observedHeights struct {
batchSendEvent int64
sendToPalomaEvent int64
lightNodeSaleEvent int64
}

type compass struct {
paloma PalomaClienter
evm evmClienter
compassAbi *abi.ABI
chainID *big.Int
CompassID string
ChainReferenceID string
lastObservedBlockHeights observedHeights
lastObservedBlockHeight int64
startingBlockHeight int64
smartContractAddr common.Address
paloma PalomaClienter
evm evmClienter
compassAbi *abi.ABI
chainID *big.Int
CompassID string
ChainReferenceID string
lastObservedBlockHeight int64
startingBlockHeight int64
smartContractAddr common.Address
}

func newCompassClient(
Expand Down Expand Up @@ -695,165 +688,6 @@ func (t compass) provideEvidenceForReferenceBlock(ctx context.Context, queueType
return nil
}

func (t *compass) GetBatchSendEvents(ctx context.Context, orchestrator string) ([]chain.BatchSendEvent, error) {
filter, err := ethfilter.Factory().
WithFromBlockNumberProvider(t.evm.FindCurrentBlockNumber).
WithFromBlockNumberSafetyMargin(1).
WithTopics([]common.Hash{batchSendEvent}).
WithAddresses(t.smartContractAddr).
Filter(ctx)
if err != nil {
return nil, err
}

currentBlockNumber := filter.FromBlock.Int64()
if t.lastObservedBlockHeights.batchSendEvent == 0 {
t.lastObservedBlockHeights.batchSendEvent = currentBlockNumber - 10_000
}

filter.FromBlock = big.NewInt(t.lastObservedBlockHeights.batchSendEvent)
filter.ToBlock = big.NewInt(min(t.lastObservedBlockHeights.batchSendEvent+10_000, currentBlockNumber))

var events []chain.BatchSendEvent

logs, err := t.evm.GetEthClient().FilterLogs(ctx, filter)
if err != nil {
return nil, err
}

lastSkywayNonce, err := t.paloma.QueryLastObservedSkywayNonceByAddr(ctx, t.ChainReferenceID, orchestrator)
if err != nil {
return nil, err
}

for _, ethLog := range logs {
evt, err := t.parseBatchSendEvent(ethLog.Data, ethLog.BlockNumber)
if err != nil {
return nil, err
}

if evt.GetSkywayNonce() <= lastSkywayNonce {
liblog.WithContext(ctx).
WithField("last-event-nonce", lastSkywayNonce).
WithField("skyway-nonce", evt.GetSkywayNonce()).
Info("Skipping already observed event...")
continue
}

events = append(events, evt)
}

t.lastObservedBlockHeights.batchSendEvent = filter.ToBlock.Int64()

return events, err
}

func (t *compass) GetSendToPalomaEvents(ctx context.Context, orchestrator string) ([]chain.SendToPalomaEvent, error) {
filter, err := ethfilter.Factory().
WithFromBlockNumberProvider(t.evm.FindCurrentBlockNumber).
WithFromBlockNumberSafetyMargin(1).
WithTopics([]common.Hash{sendToPalomaEvent}).
WithAddresses(t.smartContractAddr).
Filter(ctx)
if err != nil {
return nil, err
}

currentBlockNumber := filter.FromBlock.Int64()
if t.lastObservedBlockHeights.sendToPalomaEvent == 0 {
t.lastObservedBlockHeights.sendToPalomaEvent = currentBlockNumber - 10_000
}

filter.FromBlock = big.NewInt(t.lastObservedBlockHeights.sendToPalomaEvent)
filter.ToBlock = big.NewInt(min(t.lastObservedBlockHeights.sendToPalomaEvent+10_000, currentBlockNumber))

var events []chain.SendToPalomaEvent

logs, err := t.evm.GetEthClient().FilterLogs(ctx, filter)
if err != nil {
return nil, err
}

lastSkywayNonce, err := t.paloma.QueryLastObservedSkywayNonceByAddr(ctx, t.ChainReferenceID, orchestrator)
if err != nil {
return nil, err
}

for _, ethLog := range logs {
evt, err := t.parseSendToPalomaEvent(ethLog.Data, ethLog.BlockNumber)
if err != nil {
return nil, err
}

if evt.GetSkywayNonce() <= lastSkywayNonce {
liblog.WithContext(ctx).
WithField("last-event-nonce", lastSkywayNonce).
WithField("skyway-nonce", evt.GetSkywayNonce()).
Info("Skipping already observed event...")
continue
}

events = append(events, evt)
}

t.lastObservedBlockHeights.sendToPalomaEvent = filter.ToBlock.Int64()

return events, err
}

func (t *compass) GetLightNodeSaleEvents(ctx context.Context, orchestrator string) ([]chain.LightNodeSaleEvent, error) {
filter, err := ethfilter.Factory().
WithFromBlockNumberProvider(t.evm.FindCurrentBlockNumber).
WithFromBlockNumberSafetyMargin(1).
WithTopics([]common.Hash{nodeSaleEvent}).
WithAddresses(t.smartContractAddr).
Filter(ctx)
if err != nil {
return nil, err
}

currentBlockNumber := filter.FromBlock.Int64()
if t.lastObservedBlockHeights.lightNodeSaleEvent == 0 {
t.lastObservedBlockHeights.lightNodeSaleEvent = currentBlockNumber - 10_000
}

filter.FromBlock = big.NewInt(t.lastObservedBlockHeights.lightNodeSaleEvent)
filter.ToBlock = big.NewInt(min(t.lastObservedBlockHeights.lightNodeSaleEvent+10_000, currentBlockNumber))

var events []chain.LightNodeSaleEvent

logs, err := t.evm.GetEthClient().FilterLogs(ctx, filter)
if err != nil {
return nil, err
}

lastSkywayNonce, err := t.paloma.QueryLastObservedSkywayNonceByAddr(ctx, t.ChainReferenceID, orchestrator)
if err != nil {
return nil, err
}

for _, ethLog := range logs {
evt, err := t.parseLightNodeSaleEvent(ethLog.Data, ethLog.BlockNumber)
if err != nil {
return nil, err
}

if evt.SkywayNonce <= lastSkywayNonce {
liblog.WithContext(ctx).
WithField("last-event-nonce", lastSkywayNonce).
WithField("skyway-nonce", evt.SkywayNonce).
Info("Skipping already observed event...")
continue
}

events = append(events, evt)
}

t.lastObservedBlockHeights.lightNodeSaleEvent = filter.ToBlock.Int64()

return events, err
}

func (t *compass) GetSkywayEvents(
ctx context.Context,
orchestrator string,
Expand Down Expand Up @@ -892,27 +726,33 @@ func (t *compass) GetSkywayEvents(
return nil, err
}

logger := liblog.WithContext(ctx)

var evt chain.SkywayEventer

for _, ethLog := range logs {
switch ethLog.Topics[0] {
case nodeSaleEvent:
logger.Info("Parsing light node sale event")
evt, err = t.parseLightNodeSaleEvent(ethLog.Data, ethLog.BlockNumber)
case batchSendEvent:
logger.Info("Parsing batch send event")
evt, err = t.parseBatchSendEvent(ethLog.Data, ethLog.BlockNumber)
case sendToPalomaEvent:
logger.Info("Parsing send to paloma event")
evt, err = t.parseSendToPalomaEvent(ethLog.Data, ethLog.BlockNumber)
default:
logger.WithField("event", ethLog).Warn("Unknown event from compass")
continue
}

if err != nil {
logger.WithError(err).Warn("Error parsing event")
return nil, err
}

if evt.GetSkywayNonce() <= lastSkywayNonce {
liblog.WithContext(ctx).
WithField("last-event-nonce", lastSkywayNonce).
logger.WithField("last-event-nonce", lastSkywayNonce).
WithField("skyway-nonce", evt.GetSkywayNonce()).
Info("Skipping already observed event...")
continue
Expand Down
39 changes: 0 additions & 39 deletions chain/evm/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,49 +217,10 @@ func (p Processor) isRightChain(blockHash common.Hash) error {
return nil
}

func (p Processor) GetBatchSendEvents(ctx context.Context, orchestrator string) ([]chain.BatchSendEvent, error) {
return p.compass.GetBatchSendEvents(ctx, orchestrator)
}

func (p Processor) GetSendToPalomaEvents(ctx context.Context, orchestrator string) ([]chain.SendToPalomaEvent, error) {
return p.compass.GetSendToPalomaEvents(ctx, orchestrator)
}

func (p Processor) GetLightNodeSaleEvents(ctx context.Context, orchestrator string) ([]chain.LightNodeSaleEvent, error) {
return p.compass.GetLightNodeSaleEvents(ctx, orchestrator)
}

func (p Processor) GetSkywayEvents(ctx context.Context, orchestrator string) ([]chain.SkywayEventer, error) {
return p.compass.GetSkywayEvents(ctx, orchestrator)
}

func (p Processor) SubmitBatchSendToRemoteClaims(ctx context.Context, batchSendEvents []chain.BatchSendEvent, orchestrator string) error {
for _, batchSendEvent := range batchSendEvents {
if err := p.compass.submitBatchSendToEVMClaim(ctx, batchSendEvent, orchestrator); err != nil {
return err
}
}
return nil
}

func (p Processor) SubmitSendToPalomaClaims(ctx context.Context, batchSendEvents []chain.SendToPalomaEvent, orchestrator string) error {
for _, batchSendEvent := range batchSendEvents {
if err := p.compass.submitSendToPalomaClaim(ctx, batchSendEvent, orchestrator); err != nil {
return err
}
}
return nil
}

func (p Processor) SubmitLightNodeSaleClaims(ctx context.Context, batchSaleEvents []chain.LightNodeSaleEvent, orchestrator string) error {
for _, batchSaleEvent := range batchSaleEvents {
if err := p.compass.submitLightNodeSaleClaim(ctx, batchSaleEvent, orchestrator); err != nil {
return err
}
}
return nil
}

// Submit all gathered events to Paloma. Events need to be sent in order to
// preserve skyway nonce order.
func (p Processor) SubmitEventClaims(
Expand Down
Loading

0 comments on commit 81ca12f

Please sign in to comment.