diff --git a/src/fetch/payouts.py b/src/fetch/payouts.py index c81ed242..b1b39fbf 100644 --- a/src/fetch/payouts.py +++ b/src/fetch/payouts.py @@ -468,9 +468,13 @@ def construct_payouts( batch_rewards_df = batch_rewards_df.drop( ["partner_list", "partner_fee_eth"], axis=1 ) + + assert batch_rewards_df["solver"].is_unique, "solver not unique in batch rewards" + assert quote_rewards_df["solver"].is_unique, "solver not unique in quote rewards" merged_df = pandas.merge( quote_rewards_df, batch_rewards_df, on="solver", how="outer" ).fillna(0) + service_fee_df = pandas.DataFrame(dune.get_service_fee_status()) service_fee_df["service_fee"] = [ datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S.%f %Z") <= dune.period.start diff --git a/src/pg_client.py b/src/pg_client.py index caaa4166..556cea39 100644 --- a/src/pg_client.py +++ b/src/pg_client.py @@ -8,8 +8,11 @@ from sqlalchemy import create_engine from sqlalchemy.engine import Engine +from src.logger import set_log from src.utils.query_file import open_query +log = set_log(__name__) + class MultiInstanceDBFetcher: """ @@ -57,7 +60,38 @@ def get_solver_rewards(self, start_block: str, end_block: str) -> DataFrame: self.exec_query(query=batch_reward_query_barn, engine=engine) ) - return pd.concat(results) + results_df = pd.concat(results) + + # warn and merge in case of solvers in both environments + if not results_df["solver"].is_unique: + duplicated_entries = results_df[results_df["solver"].duplicated(keep=False)] + log.warning( + f"Solvers found in both environments:\n {duplicated_entries}.\n Merging results." + ) + + def merge_lists(series): + merged = [] + for lst in series: + if lst is not None: + merged.extend(lst) + return merged if merged else None + + results_df = ( + results_df.groupby("solver") + .agg( + { + "primary_reward_eth": "sum", + "protocol_fee_eth": "sum", + "network_fee_eth": "sum", + # there can be duplicate entries in partner_list now + "partner_list": merge_lists, + "partner_fee_eth": merge_lists, + } + ) + .reset_index() + ) + + return results_df def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame: """Returns aggregated solver quote rewards for block range""" @@ -70,8 +104,19 @@ def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame: self.exec_query(query=quote_reward_query, engine=engine) for engine in self.connections ] + results_df = pd.concat(results) + + # warn and merge in case of solvers in both environments + if not results_df["solver"].is_unique: + duplicated_entries = results_df[results_df["solver"].duplicated(keep=False)] + log.warning( + f"Solvers found in both environments:\n {duplicated_entries}.\n Merging results." + ) + results_df = ( + results_df.groupby("solver").agg({"num_quotes": "sum"}).reset_index() + ) - return pd.concat(results) + return results_df def pg_hex2bytea(hex_address: str) -> str: