Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(polling): poll per rollapp, only non-finalized orders #63

Merged
merged 9 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eibc/lp.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (or *orderTracker) loadLPs(ctx context.Context) error {
if len(or.lps) == 0 {
or.logger.Info("no LPs found")
} else {
or.logger.Info("loaded LPs", zap.Int("count", len(or.lps)))
or.logger.Debug("loaded LPs", zap.Int("count", len(or.lps)))
}

if lpsUpdated || (currentLPCount > 0 && len(or.lps) > currentLPCount) {
Expand Down
7 changes: 6 additions & 1 deletion eibc/order_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,15 @@ func NewOrderClient(cfg config.Config, logger *zap.Logger) (*orderClient, error)
}

if cfg.OrderPolling.Enabled {
var rollapps []string
for r, _ := range cfg.Rollapps {
rollapps = append(rollapps, r)
}
oc.orderPoller = newOrderPoller(
hubClient.Context().ChainID,
hubClient.Context(),
oc.orderTracker,
cfg.OrderPolling,
rollapps,
logger,
)
oc.orderTracker.resetPoller = oc.orderPoller.resetOrderPolling
Expand Down
15 changes: 9 additions & 6 deletions eibc/order_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestOrderClient(t *testing.T) {

func setupTestOrderClient(
cfg config.Config,
pollOrders func() ([]Order, error),
pollOrders func(ctx context.Context) ([]Order, error),
hubClient mockNodeClient,
fullNodeClient *nodeClient,
grantsFn getLPGrantsFn,
Expand Down Expand Up @@ -373,8 +373,6 @@ func setupTestOrderClient(
logger,
)

chainID := "test-chain-id"

for i := range cfg.Fulfillers.Scale {
fulfillerName := fmt.Sprintf("fulfiller-%d", i+1)

Expand Down Expand Up @@ -403,10 +401,15 @@ func setupTestOrderClient(
// poller
var poller *orderPoller
if cfg.OrderPolling.Enabled {
var rollapps []string
for r, _ := range cfg.Rollapps {
rollapps = append(rollapps, r)
}
poller = newOrderPoller(
chainID,
hubClient.Context(),
ordTracker,
cfg.OrderPolling,
rollapps,
logger,
)
poller.getOrders = pollOrders
Expand All @@ -425,8 +428,8 @@ func setupTestOrderClient(
return oc, nil
}

func mockGetPollerOrders(orders []Order) func() ([]Order, error) {
return func() ([]Order, error) {
func mockGetPollerOrders(orders []Order) func(ctx context.Context) ([]Order, error) {
return func(ctx context.Context) ([]Order, error) {
// after polling once, remove orders
defer func() {
orders = nil
Expand Down
53 changes: 43 additions & 10 deletions eibc/order_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,51 @@ import (
"sync/atomic"
"time"

"github.com/cosmos/cosmos-sdk/client"
sdk "github.com/cosmos/cosmos-sdk/types"
"go.uber.org/zap"

"github.com/dymensionxyz/eibc-client/config"
"github.com/dymensionxyz/eibc-client/types"
)

type orderPoller struct {
chainID string
indexerURL string
rollapps []string
interval time.Duration
indexerClient *http.Client
rollappClient types.QueryClient
logger *zap.Logger

getOrders func() ([]Order, error)
getOrders func(ctx context.Context) ([]Order, error)
orderTracker *orderTracker
lastBlockHeight atomic.Uint64
}

func newOrderPoller(
chainID string,
clientCtx client.Context,
orderTracker *orderTracker,
pollingCfg config.OrderPollingConfig,
rollapps []string,
logger *zap.Logger,
) *orderPoller {
o := &orderPoller{
chainID: chainID,
chainID: clientCtx.ChainID,
rollapps: rollapps,
indexerURL: pollingCfg.IndexerURL,
interval: pollingCfg.Interval,
logger: logger.With(zap.String("module", "order-poller")),
orderTracker: orderTracker,
rollappClient: types.NewQueryClient(clientCtx),
indexerClient: &http.Client{Timeout: 25 * time.Second},
}
o.getOrders = o.getDemandOrdersFromIndexer
return o
}

const (
ordersQuery = `{"query": "{ibcTransferDetails(filter: {network: {equalTo: \"%s\"} status: { in: [EibcPending, Refunding] }, blockHeight: { greaterThan: \"%s\" }}) {nodes { eibcOrderId amount proofHeight blockHeight price rollappId eibcFee }}}"}`
rollappOrdersQuery = `{"query": "{ibcTransferDetails(filter: {network: {equalTo: \"%s\"} status: { in: [EibcPending, Refunding] }, blockHeight: { greaterThan: \"%s\" }, rollappId: { equalTo: \"%s\"}, proofHeight: {greaterThan: \"%s\"}}) {nodes { eibcOrderId amount proofHeight blockHeight price rollappId eibcFee }}}"}`
)

type Order struct {
Expand All @@ -70,7 +77,7 @@ type ordersResponse struct {
}

func (p *orderPoller) start(ctx context.Context) error {
if err := p.pollPendingDemandOrders(); err != nil {
if err := p.pollPendingDemandOrders(ctx); err != nil {
return fmt.Errorf("failed to refresh demand orders: %w", err)
}

Expand All @@ -80,7 +87,7 @@ func (p *orderPoller) start(ctx context.Context) error {
case <-ctx.Done():
return
default:
if err := p.pollPendingDemandOrders(); err != nil {
if err := p.pollPendingDemandOrders(ctx); err != nil {
p.logger.Error("failed to refresh demand orders", zap.Error(err))
}
}
Expand All @@ -89,8 +96,8 @@ func (p *orderPoller) start(ctx context.Context) error {
return nil
}

func (p *orderPoller) pollPendingDemandOrders() error {
newDemandOrders, err := p.getOrders()
func (p *orderPoller) pollPendingDemandOrders(ctx context.Context) error {
newDemandOrders, err := p.getOrders(ctx)
if err != nil {
return fmt.Errorf("failed to get demand orders: %w", err)
}
Expand Down Expand Up @@ -209,8 +216,34 @@ func (p *orderPoller) convertOrders(demandOrders []Order) (orders []*demandOrder
return orders
}

func (p *orderPoller) getDemandOrdersFromIndexer() ([]Order, error) {
queryStr := fmt.Sprintf(ordersQuery, p.chainID, fmt.Sprint(p.lastBlockHeight.Load()))
func (p *orderPoller) getDemandOrdersFromIndexer(ctx context.Context) ([]Order, error) {
var demandOrders []Order
for _, rollapp := range p.rollapps {
orders, err := p.getRollappDemandOrdersFromIndexer(ctx, rollapp)
if err != nil {
return nil, fmt.Errorf("failed to get demand orders: %w", err)
}
demandOrders = append(demandOrders, orders...)
}

p.logger.Debug("got demand orders", zap.Int("count", len(demandOrders)))

return demandOrders, nil
}

func (p *orderPoller) getRollappDemandOrdersFromIndexer(ctx context.Context, rollappId string) ([]Order, error) {
lastFinalizedHeight := "0"
lastHeightResp, err := p.rollappClient.LatestHeight(ctx, &types.QueryGetLatestHeightRequest{
RollappId: rollappId,
Finalized: true,
})
if err != nil {
p.logger.Warn("failed to get latest height, using 0", zap.Error(err))
} else {
lastFinalizedHeight = fmt.Sprint(lastHeightResp.Height)
}

queryStr := fmt.Sprintf(rollappOrdersQuery, p.chainID, fmt.Sprint(p.lastBlockHeight.Load()), rollappId, lastFinalizedHeight)
body := strings.NewReader(queryStr)

resp, err := p.indexerClient.Post(p.indexerURL, "application/json", body)
Expand Down
8 changes: 0 additions & 8 deletions eibc/order_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,3 @@ func (or *orderTracker) isRollappSupported(rollappID string) bool {
_, ok := or.fullNodeClient.rollapps[rollappID]
return ok
}

func (or *orderTracker) isOrderFulfilled(id string) bool {
or.fomu.Lock()
defer or.fomu.Unlock()

_, ok := or.fulfilledOrders[id]
return ok
}
Loading
Loading