Skip to content

services: Implement call to getLedgers and start parsing the xdr.LedgerCloseMeta #197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

marcelosalloum
Copy link
Collaborator

@marcelosalloum marcelosalloum commented Jun 18, 2025

What

Update the indexer code to stream from {RPC}.getLedgers and break down the results from ledgerRange -> ledgers -> transactions.

Also, (pseudo) process the ledgers in parallel.

Why

As part of the new indexer implementation.

Future Work

  • evaluate transactions, filter by participant account & store in the DB
  • evaluate operations, filter by participant account & store in the DB
  • generate state changes
  • unlock channel accounts

I haven't added many tests to the ingest service yet, since this code will still evolve quite a bit.

Issue that this PR addresses

Partially address #128

Checklist

PR Structure

  • It is not possible to break this PR down into smaller PRs.
  • This PR does not mix refactoring changes with feature changes.
  • This PR's title starts with name of package that is most changed in the PR, or all if the changes are broad or impact many packages.

Thoroughness

  • This PR adds tests for the new functionality or fixes.
  • All updated queries have been tested (refer to this check if the data set returned by the updated query is expected to be same as the original one).

Release

  • This is not a breaking change.
  • This is ready to be tested in development.
  • The new functionality is gated with a feature flag if this is not ready for production.

@marcelosalloum marcelosalloum self-assigned this Jun 18, 2025
@marcelosalloum marcelosalloum requested a review from Copilot June 18, 2025 23:46
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements changes for processing ledgers in the new indexer service. Key updates include the addition of a sqlExec parameter to several functions for improved transactional control, modifications to the rpcService error messages, and a new ingestion flow with an advisory lock and parallel ledger processing.

Reviewed Changes

Copilot reviewed 20 out of 21 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
internal/signing/store/types.go Added sqlExec parameter to UnassignTxAndUnlockChannelAccounts for transaction consistency.
internal/signing/store/mocks.go Updated mock signature for the new sqlExec parameter.
internal/services/rpc_service*.go Modified getLedgers interface and error messages for clarity.
internal/services/ingest.go Introduced a new ingestion Run function with advisory locking and a worker pool for parallel ledger processing; adjusted metrics updates.
internal/ingest/ingest.go Deprecated the old Run function and updated logging messages.
internal/indexer/types/types.go Changed LedgerNumber type from int64 to uint32.
internal/data/models.go & ingest_store* Refactored ledger synced cursor logic by moving it from PaymentModel to IngestStoreModel.
go.mod & .golangci.yml Added new dependencies and refined linter exclusion patterns.
Comments suppressed due to low confidence (1)

internal/signing/store/types.go:26

  • Introducing the sqlExec parameter improves transactional control; ensure that all callers are updated accordingly to pass the correct SQL executer.
	UnassignTxAndUnlockChannelAccounts(ctx context.Context, sqlExec db.SQLExecuter, txHashes ...string) (int64, error)

Comment on lines +300 to +310
mu := sync.Mutex{}
var allTxHashes []string
var errs []error

// Submit tasks to the pool
for _, ledger := range getLedgersResponse.Ledgers {
ledger := ledger // Create a new variable to avoid closure issues
pool.Submit(func() {
txHashes, err := m.processLedger(ctx, ledger)
mu.Lock()
if err != nil {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to use toid to order the transactions in the database. It's calculated like this.

@marcelosalloum marcelosalloum marked this pull request as ready for review June 19, 2025 00:18
@marcelosalloum marcelosalloum changed the title [WIP] services: Implement call to getLedgers and start parsing the xdr.LedgerCloseMeta services: Implement call to getLedgers and start parsing the xdr.LedgerCloseMeta Jun 19, 2025
@marcelosalloum marcelosalloum changed the title services: Implement call to getLedgers and start parsing the xdr.LedgerCloseMeta services: Implement call to getLedgers and start parsing the xdr.LedgerCloseMeta Jun 23, 2025
@marcelosalloum marcelosalloum added this to the Wallet Backend Indexer milestone Jun 23, 2025
@aditya1702 aditya1702 removed this from the Wallet Backend Indexer milestone Jun 23, 2025
@@ -38,7 +38,7 @@ func Ingest(cfg Configs) error {
}

if err = ingestService.Run(ctx, uint32(cfg.StartLedger), uint32(cfg.EndLedger)); err != nil {
log.Ctx(ctx).Fatalf("Running ingest from %d to %d: %v", cfg.StartLedger, cfg.EndLedger, err)
log.Ctx(ctx).Fatalf("Error running 'ingest' from %d to %d: %v", cfg.StartLedger, cfg.EndLedger, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit

Suggested change
log.Ctx(ctx).Fatalf("Error running 'ingest' from %d to %d: %v", cfg.StartLedger, cfg.EndLedger, err)
log.Ctx(ctx).Fatalf("running 'ingest' from %d to %d: %v", cfg.StartLedger, cfg.EndLedger, err)

func (m *ingestService) fetchNextLedgersBatch(ctx context.Context, startLedger uint32) (GetLedgersResponse, error) {
rpcHealth, err := m.rpcService.GetHealth()
if err != nil {
return GetLedgersResponse{}, fmt.Errorf("getting rpc health: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In cases where RPC becomes unhealthy for a few mins and then healthy again, this would stop the ingestion and terminate right? How about we terminate ingestion process only if RPC stays unhealthy for a certain threshold. Otherwise we keep letting it try and fetch the ledgers batch until then.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, good point! I'll work on that 👍

if latestLedgerSynced >= rpcNewestLedger {
return LedgerSeqRange{}, true
}
ledgerRange.StartLedger = max(latestLedgerSynced+1, rpcOldestLedger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do a max() here would that mean if wallet-backend is at 5 and rpc's oldest ledger is at 8, it would skip ingesting 6, 7? If we are ingesting using GetLedgers by getting ledgers in bulk, do we want to allow WB catch up to RPC instead of skipping ledgers?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do a max() here would that mean if wallet-backend is at 5 and rpc's oldest ledger is at 8, it would skip ingesting 6, 7?

I think we don't have another option here, right? If we have [oldestLedger: 8, newestLedger: 20], we can only ingest within that window, as the RPC doesn't have anything older than ledger 8 available for ingestion.


func (m *ingestService) processLedgerResponse(ctx context.Context, getLedgersResponse GetLedgersResponse) error {
// Create a worker pool with
const poolSize = 16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: lets move this with other constants at the top

log.Ctx(ctx).Debugf("🚧 Got %d transactions for ledger %d", len(transactions), xdrLedgerCloseMeta.LedgerSequence())
txHashes := make([]string, 0, len(transactions))
for _, tx := range transactions {
txHashes = append(txHashes, tx.Hash.HexString())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is still in TODO, I am assuming this will return a list of types.Transaction structs instead of tx hash strings? Because what would be the use of the tx hashes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right! I'm returning txHashes because I'd like to log something short. But the output will definitely be actual objects!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants