From b0e7bd5a89ba31a58f38a54af01bd6b447cc5583 Mon Sep 17 00:00:00 2001 From: Felix Henneke Date: Mon, 28 Oct 2024 16:53:43 +0100 Subject: [PATCH] Warn and merge if solvers appear in prod and barn (#411) This PR adds default handling in case solvers appear in prod and barn. In such cases, a warning is emitted and data is merged. At the moment, if settlement in prod and barn are settled by the same address (or rather if the same address is reported during bidding), solvers can appear in multiple rows in `batch_rewards_df` here. Since we later merge on solvers, we might assign slippage or quote rewards to a solver twice. A similar thing can happen with `quote_rewards_df`. With this PR, duplicate entries are detected after fetching data and suitably merged. All numerical columns are summed in merging. The columns containing list entries on partner fees are concatenated. The assumption on unique solvers is later checked at the point where it is required. This is more of a hot fix and refactoring is required to make the code understandable. One alternative would be to implement all processing of the fetched data in `payouts.py` where the rest of the processing happens. With this PR there is a mix of fetching, checking, merging of rewards, followed by checking and merging of rewards, followed by checking them again in `construct_payout_dataframe`. Before this PR, there was fetching, merging of rewards, then merging of rewards, then checking and merging. So generally quote confusing. # Testing plan I tested the code for the accounting week 2024-10-15 -- 2024-10-22 and it produced the roughly same amounts which were used for payments. There was a difference of 0.003 ETH and around 100 COW. This difference if probably because in the actual payment, data was not merged but removed. I have not added a test yet. --- src/fetch/payouts.py | 4 +++ src/pg_client.py | 67 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/fetch/payouts.py b/src/fetch/payouts.py index c81ed242..b1b39fbf 100644 --- a/src/fetch/payouts.py +++ b/src/fetch/payouts.py @@ -468,9 +468,13 @@ def construct_payouts( batch_rewards_df = batch_rewards_df.drop( ["partner_list", "partner_fee_eth"], axis=1 ) + + assert batch_rewards_df["solver"].is_unique, "solver not unique in batch rewards" + assert quote_rewards_df["solver"].is_unique, "solver not unique in quote rewards" merged_df = pandas.merge( quote_rewards_df, batch_rewards_df, on="solver", how="outer" ).fillna(0) + service_fee_df = pandas.DataFrame(dune.get_service_fee_status()) service_fee_df["service_fee"] = [ datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S.%f %Z") <= dune.period.start diff --git a/src/pg_client.py b/src/pg_client.py index caaa4166..b7921dd5 100644 --- a/src/pg_client.py +++ b/src/pg_client.py @@ -4,12 +4,15 @@ import pandas as pd -from pandas import DataFrame +from pandas import DataFrame, Series from sqlalchemy import create_engine from sqlalchemy.engine import Engine +from src.logger import set_log from src.utils.query_file import open_query +log = set_log(__name__) + class MultiInstanceDBFetcher: """ @@ -57,7 +60,28 @@ def get_solver_rewards(self, start_block: str, end_block: str) -> DataFrame: self.exec_query(query=batch_reward_query_barn, engine=engine) ) - return pd.concat(results) + results_df = pd.concat(results) + + # warn and merge in case of solvers in both environments + if not results_df["solver"].is_unique: + log_duplicate_rows(results_df) + + results_df = ( + results_df.groupby("solver") + .agg( + { + "primary_reward_eth": "sum", + "protocol_fee_eth": "sum", + "network_fee_eth": "sum", + # there can be duplicate entries in partner_list now + "partner_list": merge_lists, + "partner_fee_eth": merge_lists, + } + ) + .reset_index() + ) + + return results_df def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame: """Returns aggregated solver quote rewards for block range""" @@ -70,8 +94,17 @@ def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame: self.exec_query(query=quote_reward_query, engine=engine) for engine in self.connections ] + results_df = pd.concat(results) + + # warn and merge in case of solvers in both environments + if not results_df["solver"].is_unique: + log_duplicate_rows(results_df) - return pd.concat(results) + results_df = ( + results_df.groupby("solver").agg({"num_quotes": "sum"}).reset_index() + ) + + return results_df def pg_hex2bytea(hex_address: str) -> str: @@ -80,3 +113,31 @@ def pg_hex2bytea(hex_address: str) -> str: compatible bytea by replacing `0x` with `\\x`. """ return hex_address.replace("0x", "\\x") + + +def log_duplicate_rows(df: DataFrame) -> None: + """Log rows with duplicate solvers entries. + Printing defaults are changed to show all column entries.""" + duplicated_entries = df[df["solver"].duplicated(keep=False)] + with pd.option_context( + "display.max_columns", + None, + "display.width", + None, + "display.max_colwidth", + None, + ): + log.warning( + f"Solvers found in both environments:\n {duplicated_entries}.\n" + "Merging results." + ) + + +def merge_lists(series: Series) -> list | None: + """Merges series containing lists into large list. + Returns None if the result would be an empty list.""" + merged = [] + for lst in series: + if lst is not None: + merged.extend(lst) + return merged if merged else None