Skip to content

Commit

Permalink
Merge branch 'main' into add_flag_to_skip_slippage_computations
Browse files Browse the repository at this point in the history
  • Loading branch information
harisang authored Oct 29, 2024
2 parents 890c046 + b0e7bd5 commit c6b3e88
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/fetch/payouts.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,13 @@ def construct_payouts(
batch_rewards_df = batch_rewards_df.drop(
["partner_list", "partner_fee_eth"], axis=1
)

assert batch_rewards_df["solver"].is_unique, "solver not unique in batch rewards"
assert quote_rewards_df["solver"].is_unique, "solver not unique in quote rewards"
merged_df = pandas.merge(
quote_rewards_df, batch_rewards_df, on="solver", how="outer"
).fillna(0)

service_fee_df = pandas.DataFrame(dune.get_service_fee_status())
service_fee_df["service_fee"] = [
datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S.%f %Z") <= dune.period.start
Expand Down
67 changes: 64 additions & 3 deletions src/pg_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@


import pandas as pd
from pandas import DataFrame
from pandas import DataFrame, Series
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

from src.logger import set_log
from src.utils.query_file import open_query

log = set_log(__name__)


class MultiInstanceDBFetcher:
"""
Expand Down Expand Up @@ -57,7 +60,28 @@ def get_solver_rewards(self, start_block: str, end_block: str) -> DataFrame:
self.exec_query(query=batch_reward_query_barn, engine=engine)
)

return pd.concat(results)
results_df = pd.concat(results)

# warn and merge in case of solvers in both environments
if not results_df["solver"].is_unique:
log_duplicate_rows(results_df)

results_df = (
results_df.groupby("solver")
.agg(
{
"primary_reward_eth": "sum",
"protocol_fee_eth": "sum",
"network_fee_eth": "sum",
# there can be duplicate entries in partner_list now
"partner_list": merge_lists,
"partner_fee_eth": merge_lists,
}
)
.reset_index()
)

return results_df

def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame:
"""Returns aggregated solver quote rewards for block range"""
Expand All @@ -70,8 +94,17 @@ def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame:
self.exec_query(query=quote_reward_query, engine=engine)
for engine in self.connections
]
results_df = pd.concat(results)

# warn and merge in case of solvers in both environments
if not results_df["solver"].is_unique:
log_duplicate_rows(results_df)

return pd.concat(results)
results_df = (
results_df.groupby("solver").agg({"num_quotes": "sum"}).reset_index()
)

return results_df


def pg_hex2bytea(hex_address: str) -> str:
Expand All @@ -80,3 +113,31 @@ def pg_hex2bytea(hex_address: str) -> str:
compatible bytea by replacing `0x` with `\\x`.
"""
return hex_address.replace("0x", "\\x")


def log_duplicate_rows(df: DataFrame) -> None:
    """Log a warning listing all rows whose ``solver`` value is duplicated.

    Pandas display options are temporarily widened so that every column
    and the full contents of each cell appear in the log message.

    Args:
        df: Frame with a ``solver`` column. Rows sharing a solver are
            presumed to originate from both environments (prod/barn) and
            are merged by the caller after this warning is emitted.
    """
    # keep=False marks *every* occurrence of a duplicated solver, so the
    # log shows all conflicting rows, not just the later ones.
    duplicated_entries = df[df["solver"].duplicated(keep=False)]
    with pd.option_context(
        "display.max_columns",
        None,
        "display.width",
        None,
        "display.max_colwidth",
        None,
    ):
        # Lazy %-style args: the (potentially large) frame is only rendered
        # if the warning record is actually emitted by a handler.
        log.warning(
            "Solvers found in both environments:\n %s.\nMerging results.",
            duplicated_entries,
        )


def merge_lists(series: Series) -> list | None:
"""Merges series containing lists into large list.
Returns None if the result would be an empty list."""
merged = []
for lst in series:
if lst is not None:
merged.extend(lst)
return merged if merged else None

0 comments on commit c6b3e88

Please sign in to comment.