diff --git a/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py b/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py index 001d5d9c..62c79287 100644 --- a/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py +++ b/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py @@ -12,6 +12,7 @@ from microchain.functions import Reasoning, Stop from prediction_market_agent_tooling.tools.balances import get_balances from prediction_market_agent_tooling.tools.utils import check_not_none +from prediction_market_agent_tooling.tools.web3_utils import wei_to_xdai from streamlit_extras.stylable_container import stylable_container from prediction_market_agent.agents.identifiers import AgentIdentifier @@ -27,6 +28,9 @@ DEPLOYED_NFT_AGENTS, DeployableAgentNFTGameAbstract, ) +from prediction_market_agent.db.blockchain_transaction_fetcher import ( + BlockchainTransactionFetcher, +) from prediction_market_agent.db.long_term_memory_table_handler import ( LongTermMemories, LongTermMemoryTableHandler, @@ -43,6 +47,11 @@ class DummyFunctionName(str, Enum): RESPONSE_FUNCTION_NAME = "Response" +@st.cache_resource +def blockchain_transaction_fetcher() -> BlockchainTransactionFetcher: + return BlockchainTransactionFetcher() + + @st.cache_resource def long_term_memory_table_handler( identifier: AgentIdentifier, @@ -191,6 +200,24 @@ def show_about_agent_part(nft_agent: type[DeployableAgentNFTGameAbstract]) -> No value=system_prompt, disabled=True, ) + st.markdown("---") + with st.popover("Show unprocessed incoming messages"): + transactions = blockchain_transaction_fetcher().fetch_unseen_transactions( + nft_agent.wallet_address + ) + + if not transactions: + st.info("No unprocessed messages") + else: + for transaction in transactions: + st.markdown( + f""" + **From:** {transaction.sender_address} + **Message:** {transaction.data_field} + **Value:** {wei_to_xdai(transaction.value_wei_parsed)} xDai + """ + ) + st.divider() @st.fragment(run_every=timedelta(seconds=5)) diff --git a/prediction_market_agent/db/blockchain_message_table_handler.py b/prediction_market_agent/db/blockchain_message_table_handler.py index 22950dc9..97080ae9 100644 --- a/prediction_market_agent/db/blockchain_message_table_handler.py +++ b/prediction_market_agent/db/blockchain_message_table_handler.py @@ -48,4 +48,7 @@ def fetch_all_transaction_hashes( return list(set(tx_hashes)) def save_multiple(self, items: t.Sequence[BlockchainMessage]) -> None: - return self.sql_handler.save_multiple(items) + return self.sql_handler.save_multiple( + # Re-create the items to avoid SQLModel errors. This is a workaround. It's weird, but it works. :shrug: + [BlockchainMessage(**i.model_dump()) for i in items] + ) diff --git a/prediction_market_agent/db/blockchain_transaction_fetcher.py b/prediction_market_agent/db/blockchain_transaction_fetcher.py index c1988d12..d2977b24 100644 --- a/prediction_market_agent/db/blockchain_transaction_fetcher.py +++ b/prediction_market_agent/db/blockchain_transaction_fetcher.py @@ -1,3 +1,5 @@ +from typing import Any + import polars as pl import spice from eth_typing import ChecksumAddress @@ -27,9 +29,21 @@ def unzip_message_else_do_nothing(self, data_field: str) -> str: except: return data_field - def fetch_unseen_transactions_df( + def blockchain_message_from_dune_df_row( + self, consumer_address: ChecksumAddress, x: dict[str, Any] + ) -> BlockchainMessage: + return BlockchainMessage( + consumer_address=consumer_address, + transaction_hash=x["hash"], + value_wei=str(x["value"]), + block=str(x["block_number"]), + sender_address=x["from"], + data_field=self.unzip_message_else_do_nothing(x["data"]), + ) + + def fetch_unseen_transactions( self, consumer_address: ChecksumAddress - ) -> pl.DataFrame: + ) -> list[BlockchainMessage]: keys = APIKeys() latest_blockchain_message = ( self.blockchain_table_handler.fetch_latest_blockchain_message( @@ -50,13 +64,16 @@ def fetch_unseen_transactions_df( ) # Filter out existing hashes - hashes are by default lowercase df = df.filter(~pl.col("hash").is_in([i.hex() for i in existing_hashes])) - return df + return [ + self.blockchain_message_from_dune_df_row(consumer_address, x) + for x in df.iter_rows(named=True) + ] def fetch_count_unprocessed_transactions( self, consumer_address: ChecksumAddress ) -> int: - df = self.fetch_unseen_transactions_df(consumer_address=consumer_address) - return len(df) + transactions = self.fetch_unseen_transactions(consumer_address=consumer_address) + return len(transactions) def fetch_one_unprocessed_blockchain_message_and_store_as_processed( self, consumer_address: ChecksumAddress @@ -65,25 +82,13 @@ def fetch_one_unprocessed_blockchain_message_and_store_as_processed( Method for fetching oldest unprocessed transaction sent to the consumer address. After being fetched, it is stored in the DB as processed. """ - df = self.fetch_unseen_transactions_df(consumer_address=consumer_address) - if df.is_empty(): + transactions = self.fetch_unseen_transactions(consumer_address=consumer_address) + if not transactions: return None # We only want the oldest non-processed message. - oldest_non_processed_message = df.row(0, named=True) - blockchain_message = BlockchainMessage( - consumer_address=consumer_address, - transaction_hash=oldest_non_processed_message["hash"], - value_wei=str(oldest_non_processed_message["value"]), - block=str(oldest_non_processed_message["block_number"]), - sender_address=oldest_non_processed_message["from"], - data_field=self.unzip_message_else_do_nothing( - oldest_non_processed_message["data"] - ), - ) + blockchain_message = transactions[0] - # Store here to avoid having to refresh after session was closed. - item = blockchain_message.model_copy(deep=True) # mark unseen transaction as processed in DB self.blockchain_table_handler.save_multiple([blockchain_message]) - return item + return blockchain_message