Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add popup with unprocessed messages #603

Merged
merged 5 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from microchain.functions import Reasoning, Stop
from prediction_market_agent_tooling.tools.balances import get_balances
from prediction_market_agent_tooling.tools.utils import check_not_none
from prediction_market_agent_tooling.tools.web3_utils import wei_to_xdai
from streamlit_extras.stylable_container import stylable_container

from prediction_market_agent.agents.identifiers import AgentIdentifier
Expand All @@ -27,6 +28,9 @@
DEPLOYED_NFT_AGENTS,
DeployableAgentNFTGameAbstract,
)
from prediction_market_agent.db.blockchain_transaction_fetcher import (
BlockchainTransactionFetcher,
)
from prediction_market_agent.db.long_term_memory_table_handler import (
LongTermMemories,
LongTermMemoryTableHandler,
Expand All @@ -43,6 +47,11 @@ class DummyFunctionName(str, Enum):
RESPONSE_FUNCTION_NAME = "Response"


@st.cache_resource
def blockchain_transaction_fetcher() -> BlockchainTransactionFetcher:
return BlockchainTransactionFetcher()


@st.cache_resource
def long_term_memory_table_handler(
identifier: AgentIdentifier,
Expand Down Expand Up @@ -191,6 +200,24 @@ def show_about_agent_part(nft_agent: type[DeployableAgentNFTGameAbstract]) -> No
value=system_prompt,
disabled=True,
)
st.markdown("---")
with st.popover("Show unprocessed incoming messages"):
transactions = blockchain_transaction_fetcher().fetch_unseen_transactions(
nft_agent.wallet_address
)
Comment on lines +205 to +207
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I click 2x on the button, does it trigger fetch_unseen_transactions 2x?
spice is caching results, but still I would suggest placing the transactions inside a st.session_state, updating that through st.fragment and just reading from that inside the popover. Wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole thing is in the fragment already:

Screenshot by Dropbox Capture

I don't see how saving it manually to state would help?

As for the button, I tested it and code inside of with block is executed regardless of clicking the button:

Screenshot by Dropbox Capture

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ow, understood - so button simply toggles the disabled property and shows stuff, but doesn't trigger any fetching logic. Fine.


if not transactions:
st.info("No unprocessed messages")
else:
for transaction in transactions:
st.markdown(
f"""
**From:** {transaction.sender_address}
**Message:** {transaction.data_field}
**Value:** {wei_to_xdai(transaction.value_wei_parsed)} xDai
"""
)
Comment on lines +213 to +219
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure proper sanitization when displaying user-generated content

Displaying transaction.data_field, which may contain user-generated content, using st.markdown can pose security risks if the content includes malicious code.

Verify that st.markdown is safe from injection attacks in this context. If necessary, escape or sanitize transaction.data_field before rendering:

import html

st.markdown(
    f"""
    **From:** {transaction.sender_address}  
    **Message:** {html.escape(transaction.data_field)}  
    **Value:** {wei_to_xdai(transaction.value_wei_parsed)} xDai
    """
)

This ensures that any HTML or special characters in data_field are properly escaped, mitigating potential security vulnerabilities.

st.divider()


@st.fragment(run_every=timedelta(seconds=5))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ def fetch_all_transaction_hashes(
return list(set(tx_hashes))

def save_multiple(self, items: t.Sequence[BlockchainMessage]) -> None:
return self.sql_handler.save_multiple(items)
return self.sql_handler.save_multiple(
# Re-create the items to avoid SQLModel errors. This is a workaround. It's weird, but it works. :shrug:
[BlockchainMessage(**i.model_dump()) for i in items]
)
Comment on lines +51 to +54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider documenting the SQLModel issue and exploring alternative solutions

While the workaround functions, recreating objects for every save operation could impact performance with large datasets. Additionally, the casual comment style (":shrug:") should be replaced with proper documentation.

Consider:

  1. Document the specific SQLModel error being worked around
  2. Investigate the root cause - is it related to SQLModel's handling of nested models or serialization?
  3. Add error handling in case model_dump() fails
  4. Replace the casual comment with proper technical documentation
def save_multiple(self, items: t.Sequence[BlockchainMessage]) -> None:
    return self.sql_handler.save_multiple(
-        # Re-create the items to avoid SQLModel errors. This is a workaround. It's weird, but it works. :shrug:
+        # TODO: Temporary workaround for SQLModel serialization issue #<issue_number>
+        # When saving BlockchainMessage instances directly, SQLModel raises:
+        # <specific error message>
+        # This recreates clean instances to avoid the error.
        [BlockchainMessage(**i.model_dump()) for i in items]
    )

Committable suggestion skipped: line range outside the PR's diff.

47 changes: 26 additions & 21 deletions prediction_market_agent/db/blockchain_transaction_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

import polars as pl
import spice
from eth_typing import ChecksumAddress
Expand Down Expand Up @@ -27,9 +29,21 @@ def unzip_message_else_do_nothing(self, data_field: str) -> str:
except:
return data_field

def fetch_unseen_transactions_df(
def blockchain_message_from_dune_df_row(
self, consumer_address: ChecksumAddress, x: dict[str, Any]
) -> BlockchainMessage:
return BlockchainMessage(
consumer_address=consumer_address,
transaction_hash=x["hash"],
value_wei=str(x["value"]),
block=str(x["block_number"]),
sender_address=x["from"],
data_field=self.unzip_message_else_do_nothing(x["data"]),
)
Comment on lines +32 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential KeyError exceptions when accessing dictionary keys

In blockchain_message_from_dune_df_row, accessing dictionary keys directly (e.g., x["hash"]) without verifying their existence can lead to KeyError exceptions if any keys are missing from the x dictionary. This could cause the application to crash unexpectedly when processing incomplete data from external sources.

Consider modifying the method to handle missing keys gracefully:

def blockchain_message_from_dune_df_row(
    self, consumer_address: ChecksumAddress, x: dict[str, Any]
) -> BlockchainMessage:
    return BlockchainMessage(
        consumer_address=consumer_address,
-       transaction_hash=x["hash"],
-       value_wei=str(x["value"]),
-       block=str(x["block_number"]),
-       sender_address=x["from"],
-       data_field=self.unzip_message_else_do_nothing(x["data"]),
+       transaction_hash=x.get("hash", ""),
+       value_wei=str(x.get("value", 0)),
+       block=str(x.get("block_number", 0)),
+       sender_address=x.get("from", ""),
+       data_field=self.unzip_message_else_do_nothing(x.get("data", "")),
    )

This ensures that missing keys default to safe values, preventing KeyError exceptions.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def blockchain_message_from_dune_df_row(
self, consumer_address: ChecksumAddress, x: dict[str, Any]
) -> BlockchainMessage:
return BlockchainMessage(
consumer_address=consumer_address,
transaction_hash=x["hash"],
value_wei=str(x["value"]),
block=str(x["block_number"]),
sender_address=x["from"],
data_field=self.unzip_message_else_do_nothing(x["data"]),
)
def blockchain_message_from_dune_df_row(
self, consumer_address: ChecksumAddress, x: dict[str, Any]
) -> BlockchainMessage:
return BlockchainMessage(
consumer_address=consumer_address,
transaction_hash=x.get("hash", ""),
value_wei=str(x.get("value", 0)),
block=str(x.get("block_number", 0)),
sender_address=x.get("from", ""),
data_field=self.unzip_message_else_do_nothing(x.get("data", "")),
)


def fetch_unseen_transactions(
self, consumer_address: ChecksumAddress
) -> pl.DataFrame:
) -> list[BlockchainMessage]:
keys = APIKeys()
latest_blockchain_message = (
self.blockchain_table_handler.fetch_latest_blockchain_message(
Expand All @@ -50,13 +64,16 @@ def fetch_unseen_transactions_df(
)
# Filter out existing hashes - hashes are by default lowercase
df = df.filter(~pl.col("hash").is_in([i.hex() for i in existing_hashes]))
return df
return [
self.blockchain_message_from_dune_df_row(consumer_address, x)
for x in df.iter_rows(named=True)
]

def fetch_count_unprocessed_transactions(
self, consumer_address: ChecksumAddress
) -> int:
df = self.fetch_unseen_transactions_df(consumer_address=consumer_address)
return len(df)
transactions = self.fetch_unseen_transactions(consumer_address=consumer_address)
return len(transactions)

Comment on lines +75 to 77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid fetching all transactions just to get the count

Calling fetch_unseen_transactions to get the count of unprocessed transactions fetches all transaction data, which is inefficient and could impact performance.

Refactor fetch_count_unprocessed_transactions to fetch only the count from the database:

def fetch_count_unprocessed_transactions(
    self, consumer_address: ChecksumAddress
) -> int:
-    transactions = self.fetch_unseen_transactions(consumer_address=consumer_address)
-    return len(transactions)
+    keys = APIKeys()
+    latest_blockchain_message = (
+        self.blockchain_table_handler.fetch_latest_blockchain_message(
+            consumer_address
+        )
+    )
+    min_block_number = (
+        0 if not latest_blockchain_message else latest_blockchain_message.block
+    )
+    query = f'SELECT COUNT(*) AS count FROM gnosis.transactions WHERE "to" = {Web3.to_checksum_address(consumer_address)} AND block_number >= {min_block_number} AND value >= {xdai_to_wei(MicrochainAgentKeys().RECEIVER_MINIMUM_AMOUNT)}'
+    df = spice.query(query, api_key=keys.dune_api_key.get_secret_value())
+    existing_hashes = self.blockchain_table_handler.fetch_all_transaction_hashes(
+        consumer_address=consumer_address
+    )
+    total_count = df.select("count")[0, 0]
+    unseen_count = total_count - len(existing_hashes)
+    return max(unseen_count, 0)

This change reduces unnecessary data retrieval and improves performance.

Committable suggestion skipped: line range outside the PR's diff.

def fetch_one_unprocessed_blockchain_message_and_store_as_processed(
self, consumer_address: ChecksumAddress
Expand All @@ -65,25 +82,13 @@ def fetch_one_unprocessed_blockchain_message_and_store_as_processed(
Method for fetching oldest unprocessed transaction sent to the consumer address.
After being fetched, it is stored in the DB as processed.
"""
df = self.fetch_unseen_transactions_df(consumer_address=consumer_address)
if df.is_empty():
transactions = self.fetch_unseen_transactions(consumer_address=consumer_address)
if not transactions:
return None

# We only want the oldest non-processed message.
oldest_non_processed_message = df.row(0, named=True)
blockchain_message = BlockchainMessage(
consumer_address=consumer_address,
transaction_hash=oldest_non_processed_message["hash"],
value_wei=str(oldest_non_processed_message["value"]),
block=str(oldest_non_processed_message["block_number"]),
sender_address=oldest_non_processed_message["from"],
data_field=self.unzip_message_else_do_nothing(
oldest_non_processed_message["data"]
),
)
blockchain_message = transactions[0]

# Store here to avoid having to refresh after session was closed.
item = blockchain_message.model_copy(deep=True)
# mark unseen transaction as processed in DB
self.blockchain_table_handler.save_multiple([blockchain_message])
return item
return blockchain_message
Comment on lines +85 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Optimize fetching of a single unprocessed transaction

Fetching all unseen transactions and selecting the first one is inefficient, especially when only one transaction is needed.

Modify the method to fetch only the oldest unseen transaction:

def fetch_one_unprocessed_blockchain_message_and_store_as_processed(
    self, consumer_address: ChecksumAddress
) -> BlockchainMessage | None:
    """
    Method for fetching the oldest unprocessed transaction sent to the consumer address.
    After being fetched, it is stored in the DB as processed.
    """
-    transactions = self.fetch_unseen_transactions(consumer_address=consumer_address)
-    if not transactions:
+    transactions = self.fetch_unseen_transactions(consumer_address=consumer_address, limit=1)
+    if not transactions:
        return None

    blockchain_message = transactions[0]

    # Mark unseen transaction as processed in DB
    self.blockchain_table_handler.save_multiple([blockchain_message])
    return blockchain_message

Adjust fetch_unseen_transactions to accept a limit parameter and modify the query accordingly to fetch only the required transaction.

Committable suggestion skipped: line range outside the PR's diff.

Loading