diff --git a/src/helpers/blockchain_data.py b/src/helpers/blockchain_data.py index 072e9c4..b4d55f7 100644 --- a/src/helpers/blockchain_data.py +++ b/src/helpers/blockchain_data.py @@ -83,19 +83,6 @@ def get_transaction_timestamp(self, tx_hash: str) -> tuple[str, int]: return tx_hash, timestamp - def get_transaction_tokens(self, tx_hash: str) -> list[tuple[str, str]]: - receipt = self.web3.eth.get_transaction_receipt(HexStr(tx_hash)) - - transfer_topic = self.web3.keccak(text="Transfer(address,address,uint256)") - - token_addresses: set[str] = set() - for log in receipt["logs"]: - if log["topics"] and log["topics"][0] == transfer_topic: - token_address = log["address"] - token_addresses.add(token_address) - - return [(tx_hash, token_address) for token_address in token_addresses] - def get_token_decimals(self, token_address: str) -> int: """Get number of decimals for a token.""" contract = self.web3.eth.contract( diff --git a/src/helpers/database.py b/src/helpers/database.py index e465376..7fa9fd7 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -43,6 +43,18 @@ def execute_and_commit(self, query: str, params: dict): connection.rollback() raise + def get_price(self, token_address, time, source): + query = "SELECT * FROM prices WHERE token_address = :token_address AND time = :time AND source = :source" + result = self.execute_query( + query, + { + "token_address": bytes.fromhex(token_address[2:]), + "time": datetime.fromtimestamp(time, tz=timezone.utc), + "source": source, + }, + ) + return result.fetchone() + def write_token_imbalances( self, tx_hash: str, @@ -173,6 +185,12 @@ def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: ) for token_address, time, price, source in prices: try: + if self.get_price(token_address, time, source) is not None: + logger.info( + "Skipping INSERT operation as entry already exists in PRICES table." + ) + continue + self.execute_and_commit( query, { diff --git a/src/transaction_processor.py b/src/transaction_processor.py index af42ace..a003826 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -118,11 +118,6 @@ def process_single_transaction( """Function processes a single tx to find imbalances, fees, prices including writing to database.""" self.log_message = [] try: - # compute raw token imbalances - token_imbalances = self.process_token_imbalances( - tx_hash, auction_id, block_number - ) - # get transaction timestamp transaction_timestamp = self.blockchain_data.get_transaction_timestamp( tx_hash @@ -130,13 +125,12 @@ def process_single_transaction( # store transaction timestamp self.db.write_transaction_timestamp(transaction_timestamp) - # get transaction tokens - # transaction_tokens = self.blockchain_data.get_transaction_tokens(tx_hash) - # store transaction tokens + # get transaction tokens by first computing raw token imbalances + token_imbalances = self.process_token_imbalances(tx_hash) transaction_tokens = [] - for token_address, imbalance in token_imbalances.items(): - if imbalance != 0: - transaction_tokens.append((tx_hash, token_address)) + for token_address in token_imbalances.keys(): + transaction_tokens.append((tx_hash, token_address)) + # store transaction tokens self.db.write_transaction_tokens(transaction_tokens) # update token decimals @@ -202,7 +196,8 @@ def process_single_transaction( return def process_token_imbalances( - self, tx_hash: str, auction_id: int, block_number: int + self, + tx_hash: str, ) -> dict[str, int]: """Process token imbalances for a given transaction and return imbalances.""" try: diff --git a/tests/e2e/test_blockchain_data.py b/tests/e2e/test_blockchain_data.py index 370b33d..c5fcb29 100644 --- a/tests/e2e/test_blockchain_data.py +++ b/tests/e2e/test_blockchain_data.py @@ -25,22 +25,3 @@ def test_get_transaction_timestamp(): transaction_timestamp = blockchain.get_transaction_timestamp(tx_hash) assert transaction_timestamp == (tx_hash, 1728044411) - - -def test_get_transaction_tokens(): - web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) - blockchain = BlockchainData(web3) - tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - - transaction_tokens = blockchain.get_transaction_tokens(tx_hash) - - assert all(h == tx_hash for h, _ in transaction_tokens) - assert set(token_address for _, token_address in transaction_tokens) == { - "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", - "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", - "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee", - "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", - "0xdAC17F958D2ee523a2206206994597C13D831ec7", - "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637", - "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9", - } diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 67fe97c..730e629 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -99,7 +99,47 @@ def tests_write_prices(): ).all() for i, (token_address, time, price, source) in enumerate(token_prices): assert HexBytes(res[i][0]) == HexBytes(token_address) - assert res[i][1].timestamp() == time + assert res[i][1].replace(tzinfo=timezone.utc).timestamp() == time + assert float(res[i][2]) == price + assert res[i][3] == source + + +def tests_write_duplicate_prices(): + engine = create_engine( + f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + # list contains duplicate entry in order to test how this is handled + token_prices = [ + ( + "0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48", + int(datetime.fromisoformat("2024-10-10 16:48:47.000000").timestamp()), + 0.000420454193230350, + "coingecko", + ), + ( + "0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48", + int(datetime.fromisoformat("2024-10-10 16:48:47.000000").timestamp()), + 0.000420454193230350, + "coingecko", + ), + ] + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE prices")) + conn.commit() + # write data + db.write_prices_new(token_prices) + # read data + with engine.connect() as conn: + res = conn.execute( + text("SELECT token_address, time, price, source FROM prices") + ).all() + # cleaning up the duplicate entry + token_prices = token_prices[:1] + for i, (token_address, time, price, source) in enumerate(token_prices): + assert HexBytes(res[i][0]) == HexBytes(token_address) + assert res[i][1].replace(tzinfo=timezone.utc).timestamp() == time assert float(res[i][2]) == price assert res[i][3] == source