Add batch and order data jobs #448
Conversation
It seems that you want to merge the solver rewards code with the dune sync code. That would require some code restructuring to keep the code consistent, or a massive cleanup later if it is merged in a state close to the current state of the PR.
Is the long-term plan to turn this repo into a solver-accounting repo? Would all code have to be compatible, or should it live in separate folders with independent code?
@@ -0,0 +1,251 @@
"""Basic client for connecting to postgres database with login credentials"""
If we really want to merge the two codebases, this needs to be combined with pg_client.py.
src/dbt/common.py
Outdated
def compute_block_and_month_range(  # pylint: disable=too-many-locals
    node: Web3, recompute_previous_month: bool
) -> Tuple[List[Tuple[int, int]], List[str], List[bool]]:
    """
    This determines the block range and the relevant months
    for which we will compute and upload data on Dune.
    """
This function might benefit from documenting the return value in a bit more detail.
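For instance, the return-value documentation could look roughly like the sketch below. The wording and the interpretation of the three returned lists are inferred from the signature and surrounding context, so treat the details as assumptions rather than text from the PR.

```python
# Possible docstring expansion along the lines of the review comment.
# The meaning attributed to each returned list is an assumption.
def compute_block_and_month_range(  # pylint: disable=too-many-locals
    node: Web3, recompute_previous_month: bool
) -> Tuple[List[Tuple[int, int]], List[str], List[bool]]:
    """
    Determine the block ranges and the relevant months for which data
    will be computed and uploaded to Dune.

    Returns a tuple of three lists of equal length:
    - inclusive (start_block, end_block) ranges, one per month,
    - the corresponding month labels used to name the target tables,
    - booleans marking, for each month, whether its data is final
      (assumed here to mean the month has already ended).
    """
```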
src/dbt/sync_data.py
Outdated
for i, _ in enumerate(block_range_list):
    start_block = block_range_list[i][0]
    end_block = block_range_list[i][1]
Suggested change:
-for i, _ in enumerate(block_range_list):
-    start_block = block_range_list[i][0]
-    end_block = block_range_list[i][1]
+for i, (start_block, end_block) in enumerate(block_range_list):
Or better yet, zip all those lists and avoid the index i.
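A minimal sketch of the zip-based loop being suggested, assuming the block ranges, month labels, and flags come out of compute_block_and_month_range as parallel lists (the names months_list and is_final_list are assumptions, not code from the PR):

```python
# Sketch only: iterate over the parallel lists without an explicit index.
for (start_block, end_block), month, is_final in zip(
    block_range_list, months_list, is_final_list
):
    ...  # process the range (start_block, end_block) for the given month
```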
Co-authored-by: Felix Henneke <[email protected]>
Some code is currently commented out; it is meant to do proper type casting but doesn't work as expected. It is also not clear whether type casting is really needed, so I will investigate further. For now, I am creating a tag from this branch so that we can proceed with a test deployment.
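For reference, explicit type casting of the resulting data frame would typically look something like the sketch below; the column names and target dtypes are purely illustrative placeholders and not taken from the PR.

```python
import pandas as pd

# Illustrative only: cast selected columns to explicit dtypes before upload.
# Column names and dtypes here are hypothetical.
def cast_types(df: pd.DataFrame) -> pd.DataFrame:
    return df.astype(
        {
            "block_number": "int64",
            "gas_used": "int64",
            "tx_hash": "string",
        }
    )
```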
I checked the logic of get_batch_data and it looks fine.
) -- Most efficient column order for sorting would be having tx_hash or order_uid first

select
    '{{env}}' as environment,
Looks like a good idea!
def get_order_data(
    cls, block_range: BlockRange, config: AccountingConfig
) -> DataFrame:
    """
    Decomposes the block range into buckets of 10k blocks each,
    so as to ensure the batch data query runs fast enough.
    At the end, it concatenates everything into one data frame
    """
    load_dotenv()
    start = block_range.block_from
    end = block_range.block_to
    bucket_size = config.data_processing_config.bucket_size
    res = []
    while start < end:
        size = min(end - start + 1, bucket_size)
        log.info(f"About to process block range ({start}, {start + size - 1})")
        res.append(
            cls.run_order_data_query(
                BlockRange(block_from=start, block_to=start + size - 1)
            )
        )
        start = start + size
    return pd.concat(res)
I know this is not horrible code duplication: not many lines would be saved by moving the chunking into its own function, and one would have to awkwardly pass a function as an argument to it.
Then again, chunking logic always has potential for off-by-one errors, so I still think it is worth refactoring this at some point into a single chunking function that can then be properly tested.
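A minimal sketch of such a standalone chunking helper, assuming inclusive block ranges as in get_order_data; the function name is an assumption and the helper is not part of the PR:

```python
from typing import Iterator, Tuple

def split_into_buckets(
    start: int, end: int, bucket_size: int
) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (bucket_start, bucket_end) ranges covering [start, end]."""
    while start <= end:
        bucket_end = min(start + bucket_size - 1, end)
        yield start, bucket_end
        start = bucket_end + 1
```

With a helper like this, get_order_data (and its batch counterpart) could reduce to a loop or comprehension over the yielded ranges, and the off-by-one behaviour could be unit-tested on the helper alone.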
.replace(
    "{{EPSILON_UPPER}}", str(config.reward_config.batch_reward_cap_upper)
)
.replace("{{results}}", "dune_sync_batch_data_table")
Is it possible to also add an {{env}} to the batch rewards query?
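A rough sketch of what this could look like, assuming the batch rewards query text is held in a string variable (here called query_str) and that an environment name is available from the config; both names are placeholders, not code from the PR:

```python
# Sketch only: add an {{env}} substitution alongside the existing replacements.
# query_str and config.dune_config.environment are assumed names.
query = (
    query_str.replace("{{env}}", config.dune_config.environment)
    .replace("{{EPSILON_UPPER}}", str(config.reward_config.batch_reward_cap_upper))
    .replace("{{results}}", "dune_sync_batch_data_table")
)
```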
This PR adds functionality that is disjoint from the rest of the payouts script and can be used to set up a cronjob that syncs batch and order data to the analytics DB. A separate dune-sync job is meant to be introduced later in order to sync these tables to the corresponding Dune tables.
To avoid conflicts with the current state of the script, the batch and order data queries, which are almost identical to the order rewards and batch rewards queries, are kept as separate files. The block intervals on which the queries operate still need to be double-checked.
To trigger the scripts locally and test the functionality, one can run
`python -m src.data_sync.sync_data --sync-table order_data`
or
`python -m src.data_sync.sync_data --sync-table batch_data`