Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/synchronizer #5

Merged
merged 22 commits into from
Aug 9, 2023
Merged

Feature/synchronizer #5

merged 22 commits into from
Aug 9, 2023

Conversation

christophercampbell
Copy link
Contributor

This PR adds a data synchronizer.

  1. sync_info table for tracking last processed block for SequenceBatches
  2. DataCommitteeWatcher for data committee membership changes
  3. BatchSynchronizer that watches for SequenceBatches events starting from last block in sync_info
  • checks if keys in batch are stored
  • retrieves values for missing keys from other data committee members
  • stores resolved offchain data & sync_info

db/db.go Outdated
@@ -60,11 +61,55 @@ func (p *DB) GetOffChainData(ctx context.Context, key common.Hash, dbTx pgx.Tx)
valueStr string
)

println(key.Hex())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this line

db/db.go Outdated
if err := dbTx.QueryRow(ctx, getOffchainDataSQL, key.Hex()).Scan(&valueStr); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, state.ErrStateNotSynchronized
}
return nil, err
}
valueStr = strings.TrimPrefix(valueStr, "0x") // is this right?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, the expected response is 0x...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the right way to decode the hex? common.HexToBytes fails if the '0x' is included. maybe common.FromHex?

synchronizer/batches.go Show resolved Hide resolved
Value: bytes,
}
}
return data, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should double check that the answer is correct: key == hash(data)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually we could implement this check on the client, as otherwise it needs to be implemented by all the clients consumers like here

In fact the logic of handling multiple DAC nodes could be moved to the client as well, as this is also repeated code here

We can add both improvements later on

)

if err := db.pg.QueryRow(ctx, keyExists, key.Hex()).Scan(&count); err != nil {
return false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should handle the case of error in a different way as false. For instance if there is a DB error we should retry instead of assuming that the entry is not stored there

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retrying on DB errors can be tricky, there are classes of errors that won't resolve with retry, so can get into a loop. I guess maybe have a fixed number of retries, but then what is the behavior if you reach that?

synchronizer/batches.go Show resolved Hide resolved
}

// wait on events, timeouts, and signals to stop
select {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the select be inside an endless for loop so the subscription is re-used until there is a problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

// Start starts the BatchSynchronizer event subscription
func (bs *BatchSynchronizer) Start() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this implementation will handle reorgs well, have no experience working with the Watch* methods

)

// DataCommitteeTracker tracks changes to the data committee membership
type DataCommitteeTracker struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood well, the DAC info is only used to query data that for some reason in missing on the local DB. If this is the case, I think it's simpler and cleaner to ask for the current DAC members when needed, rather than keeping track of all the updates.

At least this is how I've implemented this on the node here 😇

The idea is that:

  1. You ask for the current committee only when you actually need to get data from it
  2. Once you've get the current committee, you cache on memory this info
  3. Only update if at some point all the (cached) committee members fail to response to a query

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, much better

@christophercampbell christophercampbell merged commit e1f21ce into main Aug 9, 2023
3 checks passed
@christophercampbell christophercampbell deleted the Feature/synchronizer branch August 9, 2023 16:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants