diff --git a/src/fetch/payouts.py b/src/fetch/payouts.py index c3af462d..bda38fb8 100644 --- a/src/fetch/payouts.py +++ b/src/fetch/payouts.py @@ -475,9 +475,13 @@ def construct_payouts( batch_rewards_df = batch_rewards_df.drop( ["partner_list", "partner_fee_eth"], axis=1 ) + + assert batch_rewards_df["solver"].is_unique, "solver not unique in batch rewards" + assert quote_rewards_df["solver"].is_unique, "solver not unique in quote rewards" merged_df = pandas.merge( quote_rewards_df, batch_rewards_df, on="solver", how="outer" ).fillna(0) + service_fee_df = pandas.DataFrame(dune.get_service_fee_status()) service_fee_df["service_fee"] = [ datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S.%f %Z") <= dune.period.start diff --git a/src/pg_client.py b/src/pg_client.py index caaa4166..b7921dd5 100644 --- a/src/pg_client.py +++ b/src/pg_client.py @@ -4,12 +4,15 @@ import pandas as pd -from pandas import DataFrame +from pandas import DataFrame, Series from sqlalchemy import create_engine from sqlalchemy.engine import Engine +from src.logger import set_log from src.utils.query_file import open_query +log = set_log(__name__) + class MultiInstanceDBFetcher: """ @@ -57,7 +60,28 @@ def get_solver_rewards(self, start_block: str, end_block: str) -> DataFrame: self.exec_query(query=batch_reward_query_barn, engine=engine) ) - return pd.concat(results) + results_df = pd.concat(results) + + # warn and merge in case of solvers in both environments + if not results_df["solver"].is_unique: + log_duplicate_rows(results_df) + + results_df = ( + results_df.groupby("solver") + .agg( + { + "primary_reward_eth": "sum", + "protocol_fee_eth": "sum", + "network_fee_eth": "sum", + # there can be duplicate entries in partner_list now + "partner_list": merge_lists, + "partner_fee_eth": merge_lists, + } + ) + .reset_index() + ) + + return results_df def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame: """Returns aggregated solver quote rewards for block range""" @@ -70,8 +94,17 
@@ def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame: self.exec_query(query=quote_reward_query, engine=engine) for engine in self.connections ] + results_df = pd.concat(results) + + # warn and merge in case of solvers in both environments + if not results_df["solver"].is_unique: + log_duplicate_rows(results_df) - return pd.concat(results) + results_df = ( + results_df.groupby("solver").agg({"num_quotes": "sum"}).reset_index() + ) + + return results_df def pg_hex2bytea(hex_address: str) -> str: @@ -80,3 +113,31 @@ def pg_hex2bytea(hex_address: str) -> str: compatible bytea by replacing `0x` with `\\x`. """ return hex_address.replace("0x", "\\x") + + + def log_duplicate_rows(df: DataFrame) -> None: + """Log rows with duplicate solver entries. + Printing defaults are changed to show all column entries.""" + duplicated_entries = df[df["solver"].duplicated(keep=False)] + with pd.option_context( + "display.max_columns", + None, + "display.width", + None, + "display.max_colwidth", + None, + ): + log.warning( + f"Solvers found in both environments:\n {duplicated_entries}.\n" + "Merging results." + ) + + + def merge_lists(series: Series) -> list | None: + """Merges series containing lists into a single list. + Returns None if the result would be an empty list.""" + merged = [] + for lst in series: + if lst is not None: + merged.extend(lst) + return merged if merged else None