
Commit

warn and merge if solvers appear in prod and barn
fhenneke committed Oct 25, 2024
1 parent fa1e14a commit 0e1a806
Showing 2 changed files with 51 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/fetch/payouts.py
@@ -468,9 +468,13 @@ def construct_payouts(
batch_rewards_df = batch_rewards_df.drop(
["partner_list", "partner_fee_eth"], axis=1
)

assert batch_rewards_df["solver"].is_unique, "solver not unique in batch rewards"
assert quote_rewards_df["solver"].is_unique, "solver not unique in quote rewards"
merged_df = pandas.merge(
quote_rewards_df, batch_rewards_df, on="solver", how="outer"
).fillna(0)

service_fee_df = pandas.DataFrame(dune.get_service_fee_status())
service_fee_df["service_fee"] = [
datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S.%f %Z") <= dune.period.start
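For context, here is a minimal, self-contained sketch (not part of the commit; the solver addresses and amounts are invented) of what the merge above does: the outer join keeps solvers that appear in only one of the two reward frames, fillna(0) zeroes out the missing side, and the new assertions guarantee the result stays one row per solver.

import pandas

quote_rewards_df = pandas.DataFrame(
    {"solver": ["0xaaa", "0xbbb"], "num_quotes": [10, 3]}
)
batch_rewards_df = pandas.DataFrame(
    {"solver": ["0xbbb", "0xccc"], "primary_reward_eth": [1.5, 0.7]}
)

# each frame must have one row per solver, otherwise the outer merge duplicates rows
assert quote_rewards_df["solver"].is_unique
assert batch_rewards_df["solver"].is_unique

merged_df = pandas.merge(
    quote_rewards_df, batch_rewards_df, on="solver", how="outer"
).fillna(0)
# 0xaaa ends up with primary_reward_eth == 0.0, 0xccc with num_quotes == 0.0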
49 changes: 47 additions & 2 deletions src/pg_client.py
@@ -8,8 +8,11 @@
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

from src.logger import set_log
from src.utils.query_file import open_query

log = set_log(__name__)


class MultiInstanceDBFetcher:
"""
@@ -57,7 +60,38 @@ def get_solver_rewards(self, start_block: str, end_block: str) -> DataFrame:
self.exec_query(query=batch_reward_query_barn, engine=engine)
)

return pd.concat(results)
results_df = pd.concat(results)

# warn and merge in case of solvers in both environments
if not results_df["solver"].is_unique:
duplicated_entries = results_df[results_df["solver"].duplicated(keep=False)]
log.warning(
f"Solvers found in both environments:\n {duplicated_entries}.\n Merging results."
)

def merge_lists(series):
merged = []
for lst in series:
if lst is not None:
merged.extend(lst)
return merged if merged else None

results_df = (
results_df.groupby("solver")
.agg(
{
"primary_reward_eth": "sum",
"protocol_fee_eth": "sum",
"network_fee_eth": "sum",
# there can be duplicate entries in partner_list now
"partner_list": merge_lists,
"partner_fee_eth": merge_lists,
}
)
.reset_index()
)

return results_df
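As a side note (not part of the committed file), the following standalone sketch, with invented addresses and amounts, illustrates how the groupby/agg above collapses a solver that appears in both prod and barn: the numeric columns are summed, while the partner columns are concatenated via merge_lists.

import pandas as pd

results_df = pd.DataFrame(
    {
        "solver": ["0xaaa", "0xaaa"],  # same solver, one prod row and one barn row
        "primary_reward_eth": [1.0, 0.5],
        "protocol_fee_eth": [0.2, 0.0],
        "network_fee_eth": [0.1, 0.3],
        "partner_list": [["0xp1"], None],
        "partner_fee_eth": [[0.01], None],
    }
)

def merge_lists(series):
    # concatenate per-solver lists, skipping missing entries
    merged = []
    for lst in series:
        if lst is not None:
            merged.extend(lst)
    return merged if merged else None

merged_df = (
    results_df.groupby("solver")
    .agg(
        {
            "primary_reward_eth": "sum",
            "protocol_fee_eth": "sum",
            "network_fee_eth": "sum",
            "partner_list": merge_lists,
            "partner_fee_eth": merge_lists,
        }
    )
    .reset_index()
)
# one row remains for 0xaaa: rewards and fees summed, partner_list == ["0xp1"]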

def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame:
"""Returns aggregated solver quote rewards for block range"""
@@ -70,8 +104,19 @@ def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame:
self.exec_query(query=quote_reward_query, engine=engine)
for engine in self.connections
]
results_df = pd.concat(results)

# warn and merge in case of solvers in both environments
if not results_df["solver"].is_unique:
duplicated_entries = results_df[results_df["solver"].duplicated(keep=False)]
log.warning(
f"Solvers found in both environments:\n {duplicated_entries}.\n Merging results."
)
results_df = (
results_df.groupby("solver").agg({"num_quotes": "sum"}).reset_index()
)

return pd.concat(results)
return results_df
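Analogously for quote rewards, a small hypothetical example (invented data): duplicate solver rows from the two environments collapse into a single row whose num_quotes is the sum over both.

import pandas as pd

results_df = pd.DataFrame(
    {"solver": ["0xaaa", "0xaaa", "0xbbb"], "num_quotes": [10, 2, 5]}
)
merged_df = results_df.groupby("solver").agg({"num_quotes": "sum"}).reset_index()
# two rows remain: 0xaaa with num_quotes == 12, 0xbbb with num_quotes == 5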


def pg_hex2bytea(hex_address: str) -> str:
