Skip to content

Commit

Permalink
chore: more demonstrative examples
Browse files Browse the repository at this point in the history
  • Loading branch information
joe-p committed Nov 7, 2024
1 parent 7b005b7 commit 6fe4b1e
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 132 deletions.
23 changes: 23 additions & 0 deletions examples/governance/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Subscriber Example: Governance

This example demonstrates how to use the `AlgorandSubscriber` to parse governance commitment transactions. Every 10 seconds, the subscriber will print out all of the governance commitments made since the last sync. The subscriber in this example uses `"sync_behaviour": "catchup-with-indexer"` to catch up because we are expecting to have a large amount of transactions with a common note prefix. This is an example of where the indexer's server-side filtering is useful. It should be noted that the exact same behavior can be achieved without indexer using algod, but indexer allows for a quicker catchup with fewer API calls. This example also only polls the chain every 10 seconds, since it is primarily useful for historical data and we don't care about live data.

## Governance Prefix

This example uses the `af/gov1` governance prefix to find governance transactions. For more information on Algorand Governance transactions, see the [Govenor's Guide](https://forum.algorand.org/t/governors-guide-2021-2024/12013).

## Running the example

To run the example, execute the following commands:

### Install dependencies

```bash
poetry install
```

### Run the script

```bash
poetry run python governance.py
```
117 changes: 117 additions & 0 deletions examples/governance/governance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import base64
import json
import random

from algokit_subscriber.subscriber import AlgorandSubscriber
from algokit_subscriber.types.subscription import SubscribedTransaction
from algokit_utils.beta.algorand_client import AlgorandClient
from algokit_utils.beta.composer import PayParams

algorand = AlgorandClient.default_local_net()


dispenser = algorand.account.localnet_dispenser()
sender = algorand.account.random()

# Fund the sender
algorand.send.payment(
PayParams(sender=dispenser.address, receiver=sender.address, amount=1_000_000)
)

# Send a governance commitment message
algorand.send.payment(
PayParams(
sender=sender.address,
receiver=sender.address,
amount=0,
# Commit a random amount of ALGO
note=f'af/gov1:j{{"com":{random.randint(1_000_000, 100_000_000)}}}'.encode(),
)
)

# Send an unrelated message
algorand.send.payment(
PayParams(
sender=sender.address,
receiver=sender.address,
amount=0,
note=b"Some random txn",
)
)

# Every subscriber instance uses a water to track what block it processed last.
# In this example we are using a variable to track the watermark

watermark = 0


# To implement a watermark in the subscriber, we must define a get and set function
def get_watermark() -> int:
"""
Get the current watermark value
"""
return watermark


def set_watermark(new_watermark: int) -> None:
"""
Set our watermark variable to the new watermark from the subscriber
"""
global watermark # noqa: PLW0603
watermark = new_watermark


subscriber = AlgorandSubscriber(
algod_client=algorand.client.algod,
indexer_client=algorand.client.indexer,
config={
"filters": [
{
"name": "Governance",
# Only match non-zero USDC transfers
"filter": {
"type": "pay",
"note_prefix": "af/gov1:j",
},
},
],
# Instead of always waiting for the next block, just poll for new blocks every 10 seconds
"wait_for_block_when_at_tip": False,
"frequency_in_seconds": 10,
# The watermark persistence functions are used to get and set the watermark
"watermark_persistence": {"get": get_watermark, "set": set_watermark},
# Indexer has the ability to filter transactions server-side, resulting in less API calls
# This is only useful if we have a very specific query, such as a note prefix
"sync_behaviour": "catchup-with-indexer",
},
)


def print_transfer(transaction: SubscribedTransaction, _: str) -> None:
"""
This is an EventListener callback. We use the .on function below to attach this callback to specific events.
Every EventListener callback will receive two arguments:
* The transaction data
* The filter name (from the 'filters' list) that the transaction matched
"""
json_data = (
base64.b64decode(transaction["note"])
.decode()
.split(":j")[1]
.replace("”", '"')
.replace("“", '"')
)

amount = json.loads(json_data)["com"] * 1e-6

print(
f"Transaction {transaction['sender']} committed {amount} ALGO on round {transaction['confirmed-round']} in transaction {transaction['id']}"
)


# Attach the callback to the events we are interested in
subscriber.on("Governance", print_transfer)

# Start the subscriber
subscriber.start()
23 changes: 23 additions & 0 deletions examples/live_monitoring/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Subscriber Example: Live Monitoring

This example demonstrates how to use the `AlgorandSubscriber` to get live transactions from the Algorand blockchain.

Each round, the subscriber will print out all of the USDC and ALGO transactions that have ocurred.

This is an example of using the subscriber for live monitoring where you don't care about historical data. This behavior is primarily driven by the `"sync_behaviour": "skip-sync-newest"` configuration which skips syncing older blocks. Since we don't care about historical data, the watermark of the last round processed is not persisted and only a non-archival algod is required for the subscriber to function. This makes this setup lightweight with low infrastructure requirements.

## Running the example

To run the example, execute the following commands:

### Install dependencies

```bash
poetry install
```

### Run the script

```bash
poetry run python live_monitoring.py
```
88 changes: 88 additions & 0 deletions examples/live_monitoring/live_monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from algokit_subscriber.subscriber import AlgorandSubscriber
from algokit_subscriber.types.subscription import SubscribedTransaction
from algokit_utils.beta.algorand_client import AlgorandClient

algorand = AlgorandClient.main_net()

# Every subscriber instance uses a water to track what block it processed last.
# In this example we are using a variable to track the watermark

watermark = 0


# To implement a watermark in the subscriber, we must define a get and set function
def get_watermark() -> int:
"""
Get the current watermark value
"""
return watermark


def set_watermark(new_watermark: int) -> None:
"""
Set our watermark variable to the new watermark from the subscriber
"""
global watermark # noqa: PLW0603
watermark = new_watermark


subscriber = AlgorandSubscriber(
algod_client=algorand.client.algod,
config={
"filters": [
{
"name": "USDC",
# Only match non-zero USDC transfers
"filter": {
"type": "axfer",
"asset_id": 31566704, # mainnet usdc
"min_amount": 1,
},
},
{
"name": "ALGO",
# Only match non-zero ALGO transfers
"filter": {
"type": "pay",
"min_amount": 1,
},
},
],
# Once we are caught up, always wait until the next block is available and process it immediately once available
"wait_for_block_when_at_tip": True,
# The watermark persistence functions are used to get and set the watermark
"watermark_persistence": {"get": get_watermark, "set": set_watermark},
# Skip the sync process and immediately get the latest block in the network
"sync_behaviour": "skip-sync-newest",
# Max rounds to sync defines how many rounds to lookback when first starting the subscriber
# If syncing via a non-archival node, this could be up to 1000 rounds back
# In this example we want to immediately start processing the latest block without looking back
"max_rounds_to_sync": 1,
},
)


def print_transfer(transaction: SubscribedTransaction, filter_name: str) -> None:
"""
This is an EventListener callback. We use the .on function below to attach this callback to specific events.
Every EventListener callback will receive two arguments:
* The transaction data
* The filter name (from the 'filters' list) that the transaction matched
"""
if filter_name == "USDC":
details = transaction["asset-transfer-transaction"]
elif filter_name == "ALGO":
details = transaction["payment-transaction"]

print(
f"{transaction['sender']} sent {details['receiver']} {details['amount'] * 1e-6} {filter_name} in transaction {transaction['id']}"
)


# Attach the callback to the events we are interested in
subscriber.on("ALGO", print_transfer)
subscriber.on("USDC", print_transfer)

# Start the subscriber
subscriber.start()
69 changes: 0 additions & 69 deletions examples/payments/payments.py

This file was deleted.

2 changes: 2 additions & 0 deletions examples/transaction_recording/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
transactions.csv
watermark
25 changes: 25 additions & 0 deletions examples/transaction_recording/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Subscriber Example: Transaction Record

This example demonstrates how to use the `AlgorandSubscriber` to record all of the balance changes for a given account. This example uses the filesystem to persist the watermark and record transactions in a CSV file. This example makes use of the `"balance_changes"` filter option which will include ANY transaction that affects the balance of the given account. This example uses `"sync_behaviour": "sync-oldest"` to ensure that we get all historical data from an archival node. An indexer could be used for catchup, but due to the complex nature of the query it would not save any API calls like it would with a more simple query (such as the one in the [governance example](../governance/README.md)).

## Created Files

`watermark` will be created with the last processed round and updated with each new round processed.

`transactions.csv` will be created with the header `round,sender,receiver,amount` and will append a new row for each transaction processed.

## Running the example

To run the example, execute the following commands:

### Install dependencies

```bash
poetry install
```

### Run the script

```bash
poetry run python governance.py
```
Loading

0 comments on commit 6fe4b1e

Please sign in to comment.