docs: event queue connection class design #71

Open
wants to merge 5 commits into
base: master
@@ -0,0 +1,198 @@
# Summary
[summary]: #summary

A new connection class that streams events from the fullnode event queue.

# Motivation
[motivation]: #motivation

The event queue streams events sequentially, ensuring we do not miss any transaction or transaction update. It also lets us continue where we left off after a connection issue. Currently, any connection issue forces us to sync the entire wallet history from the beginning.

# Guide-level explanation
[guide-level-explanation]: #guide-level-explanation

The fullnode's event queue (a.k.a. reliable integration) has two ways to stream events: a websocket and an HTTP API (not server-sent events (SSE), just polling events with event-id-based pagination).
We will focus on the websocket implementation, but we can later create another class that uses the HTTP API.

## Events

The events from the event queue are not filtered in any way, meaning we have to implement our own filter.
The events that represent new transactions or blocks are called `NEW_VERTEX_ACCEPTED`, and updates to the transaction data are called `VERTEX_METADATA_CHANGED`. Both events come with the current transaction data, which we can use to filter for transactions of our wallet.

## Address subscription

The current implementation uses the fullnode pubsub to listen only for addresses of our wallet, meaning that during startup we send a subscribe command for each address of our wallet.
Since the events will be filtered locally, we can instead use the `subscribeAddresses` method to create a local list of addresses being listened to and use this list to filter the events.

The "list of addresses" will be an object where the address is the key. Since determining whether an object has a key is O(1), this lookup will not become a bottleneck for wallets with many addresses.
Alternatively, we could use the storage `isAddressMine` method, which is already O(1).
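
A minimal sketch of the local filter described above. The `AddressList`, `QueueEvent` and `isWalletEvent` names are hypothetical, and the sketch assumes the addresses of a transaction have already been decoded from its scripts by some other helper, since the event data only carries scripts.

```typescript
// Event types that carry transaction data.
const TX_EVENT_TYPES = ['NEW_VERTEX_ACCEPTED', 'VERTEX_METADATA_CHANGED'];

// Local "list of addresses": the address is the key, so lookups are O(1).
type AddressList = Record<string, true>;

// Hypothetical, simplified event shape with only the fields the filter needs.
interface QueueEvent {
  type: string;
  // Assumption: addresses were already decoded from the tx scripts.
  addresses: string[];
}

// Adds addresses to the local list instead of sending subscribe commands.
function subscribeAddresses(list: AddressList, addresses: string[]): void {
  for (const address of addresses) {
    list[address] = true;
  }
}

// True when the event is a transaction event touching a wallet address.
function isWalletEvent(list: AddressList, event: QueueEvent): boolean {
  if (TX_EVENT_TYPES.indexOf(event.type) === -1) {
    return false;
  }
  return event.addresses.some((address) =>
    Object.prototype.hasOwnProperty.call(list, address));
}
```

Using `hasOwnProperty` rather than a plain truthiness check avoids false positives for keys inherited from the object prototype.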

## Event streaming

To get the full state of the wallet we would need to stream all events of the fullnode, but we can still use the address history api to fetch the balance and transactions of our addresses and then start listening for newer events.

The best way to achieve this is to use the event api to fetch a single event, since the response also includes the latest event id.
Example response of `GET ${FULLNODE_URL}/v1a/event?size=1`

```json
{
  "events": [
    {
      "peer_id": "ca084565aa4ac6f84c452cb0085c1bc03e64317b64e0761f119b389c34fcfede",
      "id": 0,
      "timestamp": 1686186579.306944,
      "type": "LOAD_STARTED",
      "data": {},
      "group_id": null
    }
  ],
  "latest_event_id": 9038
}
```

Then we save `latest_event_id` and sync the history with the address history api.
Once the wallet is at the current state we can start streaming from the `latest_event_id`.
Transactions may arrive during this process, in which case we would add them both during the history sync and during the event streaming, but this does not affect the balance or how we process the history.
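
One way to see why the duplicated transactions are harmless is sketched below. It assumes the history is stored keyed by `tx_id` (the `TxStore` and `upsertTx` names are hypothetical), so adding the same transaction from the history sync and again from the event stream overwrites the entry instead of duplicating it.

```typescript
// Hypothetical minimal stored transaction.
interface StoredTx {
  tx_id: string;
  is_voided: boolean;
}

// Assumption: the history is kept in a structure keyed by tx_id.
type TxStore = Record<string, StoredTx>;

// Inserts or overwrites a transaction; returns true if it was not known yet.
function upsertTx(store: TxStore, tx: StoredTx): boolean {
  const isNew = !Object.prototype.hasOwnProperty.call(store, tx.tx_id);
  store[tx.tx_id] = tx;
  return isNew;
}
```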

## Best block update

The fullnode pubsub sends updates whenever the best chain height changes so that our wallets can unlock block rewards. The event queue does not send an update like this, but we receive all blocks as events. This means we can listen for any transaction with `version` 0 or 3 (block or merged mining block) whose metadata `voided_by` is `null` (a block that is not voided is on the main chain) and derive the best chain height from it.

We will always expect the latest unvoided block to be the newest block of the best chain, since during re-orgs (where the best chain changes) we will receive updates and the blocks of the new best chain will be updated with `voided_by` as `null`.
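
The rule above can be sketched as a small pure function. The names are hypothetical, and the sketch assumes that an empty `voided_by` list should be treated the same as `null`, since the event queue metadata in this document types `voided_by` as a list.

```typescript
const BLOCK_VERSION = 0;
const MERGED_MINING_BLOCK_VERSION = 3;

// Only the fields needed to derive the height.
interface BlockLikeTx {
  version: number;
  metadata: { voided_by: string[] | null; height: number };
}

function isUnvoidedBlock(tx: BlockLikeTx): boolean {
  const isBlock = tx.version === BLOCK_VERSION
    || tx.version === MERGED_MINING_BLOCK_VERSION;
  const voidedBy = tx.metadata.voided_by;
  // Assumption: null and an empty list both mean "not voided".
  const isVoided = voidedBy !== null && voidedBy.length > 0;
  return isBlock && !isVoided;
}

// The latest unvoided block defines the height; anything else keeps it.
function nextBestHeight(currentHeight: number, tx: BlockLikeTx): number {
  return isUnvoidedBlock(tx) ? tx.metadata.height : currentHeight;
}
```

Note that the new height replaces the current one even when it is lower, which is what lets a re-org move the best chain height.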

## EventQueueConnection class

The `EventQueueConnection` class will manage a websocket instance connected to the event queue api and emit a `wallet-update` event. This keeps compatibility with the existing `Connection` class.

The `wallet-update` event will work with the schema:

```ts
interface WalletUpdateEvent {
  type: 'wallet:address_history';
  history: IHistoryTx;
}

// Where IHistoryTx is defined as:

interface IHistoryTx {
  tx_id: string;
  signalBits: number;
  version: number;
  weight: number;
  timestamp: number;
  is_voided: boolean;
  nonce: number;
  inputs: IHistoryInput[];
  outputs: IHistoryOutput[];
  parents: string[];
  token_name?: string;
  token_symbol?: string;
  tokens: string[];
  height?: number;
}

export interface IHistoryInput {
  value: number;
  token_data: number;
  script: string;
  decoded: IHistoryOutputDecoded;
  token: string;
  tx_id: string;
  index: number;
}

export interface IHistoryOutputDecoded {
  type?: string;
  address?: string;
  timelock?: number | null;
  data?: string;
}

export interface IHistoryOutput {
  value: number;
  token_data: number;
  script: string;
  decoded: IHistoryOutputDecoded;
  token: string;
  spent_by: string | null;
}
```

The transaction data from the event queue is in a different format as described below:

```ts
interface EventQueueTxData {
  hash: string;
  version: number;
  weight: number;
  timestamp: number;
  nonce?: number;
  inputs: EventQueueTxInput[];
  outputs: EventQueueTxOutput[];
  parents: string[];
  token_name?: string;
  token_symbol?: string;
  tokens: string[];
  metadata: EventQueueTxMetadata;
  aux_pow?: string;
}

interface EventQueueTxMetadata {
  hash: string;
  spent_outputs: EventQueueSpentOutput[];
  conflict_with: string[];
  voided_by: string[];
  received_by: string[];
  children: string[];
  twins: string[];
  accumulated_weight: number;
  score: number;
  first_block?: string;
  height: number;
  validation: string;
}

interface EventQueueTxInput {
  tx_id: string;
  index: number;
  data: string;
}

interface EventQueueTxOutput {
  value: number;
  script: string;
  token_data: number;
}

interface EventQueueSpentOutput {
  index: number;
  tx_ids: string[];
}
```

### Data conversion process

To keep compatibility with the current `Connection` class we need to convert the data with the following process:

1. Convert `hash` to `tx_id`
2. Assert `nonce` is valid
3. Assign `signalBits` from `version`
4. Assign `is_voided` from metadata's `voided_by`
5. Assign `height` from metadata's `height`
6. Convert outputs
7. Convert inputs
8. Remove `metadata` and `aux_pow` fields

Process to convert outputs:

1. Derive `token` from `token_data` and `tx.tokens`
2. Derive `spent_by` from the output index and `tx.metadata.spent_outputs`
3. Derive `decoded` from the script
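
The output steps above could look roughly like the sketch below. Several details are assumptions not stated in this document: that the low 7 bits of `token_data` hold the token index, that index 0 is the native token with uid `00`, and that the first entry of `tx_ids` is taken as the spender. Script decoding is left to a caller-supplied `decodeScript` function, which is hypothetical.

```typescript
interface QueueTxOutput { value: number; script: string; token_data: number; }
interface QueueSpentOutput { index: number; tx_ids: string[]; }
interface DecodedScript { type?: string; address?: string; timelock?: number | null; data?: string; }
interface HistoryOutput {
  value: number;
  token_data: number;
  script: string;
  decoded: DecodedScript;
  token: string;
  spent_by: string | null;
}

// Assumptions: native token uid and the mask selecting the token index.
const NATIVE_TOKEN_UID = '00';
const TOKEN_INDEX_MASK = 0x7f;

function convertOutput(
  output: QueueTxOutput,
  index: number,
  tokens: string[],
  spentOutputs: QueueSpentOutput[],
  decodeScript: (script: string) => DecodedScript,
): HistoryOutput {
  // 1. Derive `token` from `token_data` and `tx.tokens`.
  const tokenIndex = output.token_data & TOKEN_INDEX_MASK;
  const token = tokenIndex === 0 ? NATIVE_TOKEN_UID : tokens[tokenIndex - 1];
  // 2. Derive `spent_by` from the output index and `spent_outputs`.
  const spent = spentOutputs.filter((s) => s.index === index && s.tx_ids.length > 0);
  const spentBy = spent.length > 0 ? spent[0].tx_ids[0] : null;
  // 3. Derive `decoded` from the script.
  return {
    value: output.value,
    token_data: output.token_data,
    script: output.script,
    decoded: decodeScript(output.script),
    token,
    spent_by: spentBy,
  };
}
```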

Process to convert inputs:

1. Try to find the transaction with the input's `tx_id` in storage
   1. If not found, we must fetch the tx from the fullnode api
2. Assign `value`, `token_data`, `script` and `token` from the spent output

Now that the data matches the current websocket transaction we can emit the data and all processes to manage history from the facade will work as intended.
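
A sketch of the top-level conversion steps follows, limited to a subset of the non-input, non-output fields. Two points are assumptions rather than something this document specifies: a "valid" nonce is taken to mean a present, non-negative integer, and `signalBits` is taken to be the high byte of the event's `version` field, with the low byte kept as `version`. All names are hypothetical.

```typescript
interface EventTxHeader {
  hash: string;
  version: number;
  weight: number;
  timestamp: number;
  nonce?: number;
  parents: string[];
  tokens: string[];
  metadata: { voided_by: string[] | null; height: number };
  aux_pow?: string;
}

interface HistoryTxHeader {
  tx_id: string;
  signalBits: number;
  version: number;
  weight: number;
  timestamp: number;
  is_voided: boolean;
  nonce: number;
  parents: string[];
  tokens: string[];
  height: number;
}

function convertTxHeader(tx: EventTxHeader): HistoryTxHeader {
  // Assumption: valid means present and a non-negative integer.
  const nonce = tx.nonce;
  if (typeof nonce !== 'number' || nonce < 0 || Math.floor(nonce) !== nonce) {
    throw new Error(`Invalid nonce for tx ${tx.hash}`);
  }
  const voidedBy = tx.metadata.voided_by;
  // Building a new object drops `metadata` and `aux_pow` implicitly.
  return {
    tx_id: tx.hash,
    // Assumption: signal bits are the high byte of the version field.
    signalBits: (tx.version >> 8) & 0xff,
    version: tx.version & 0xff,
    weight: tx.weight,
    timestamp: tx.timestamp,
    is_voided: voidedBy !== null && voidedBy.length > 0,
    nonce,
    parents: tx.parents,
    tokens: tx.tokens,
    height: tx.metadata.height,
  };
}
```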
@@ -0,0 +1,173 @@
# Summary
[summary]: #summary

Organize the connection manager classes and move responsibilities from the wallet facade to the connection manager.

# Motivation
[motivation]: #motivation

The connection manager should be responsible for managing the state of the
connection and how the events from the connection are used. This way we can have
multiple connection classes that handle different behaviors while maintaining
a common interface.

# Guide-level explanation
[guide-level-explanation]: #guide-level-explanation


## Current implementation

The current `Connection` class implementation is a wrapper around a `WebSocket`
class. The `WebSocket` class is responsible for managing the connection and
actually sending and receiving data.
The `Connection` class interprets the events from the websocket instance and
emits events that are used by the facade. It also subscribes to specific events
and handles them differently depending on the type of event.

The layers of abstraction make the connection very easy to instantiate and use
but create a black box of events that are interpreted by the facade.

## New connection classes

### FullnodePubSubConnection and FullnodeEventQueueWSConnection

Both classes will connect to the fullnode websocket and subscribe to the
appropriate events, meaning the best block updates and the transaction events.

Transaction events will be inserted into the storage and the history will be
processed (calculating metadata, balance, etc) by the connection instance.
Best block updates will be processed as usual.

#### Events

- `new-tx` and `update-tx`
  - The event data will have the transaction id. // or data
- `best-block-update`
  - The event data will have the new height
- `conn_state`
  - Event data is a `ConnectionState`, so the wallet can know if it is receiving events in real time or not.
- `sync-history-partial-update`
  - This is so the wallet facade can update the UI when we are waiting for the
    wallet to load.
  - Will have 3 stages: `syncing`, `processing` and `fetching-tokens`.
    - `syncing` means that the wallet is still fetching the history from the
      fullnode.
    - `processing` means that we are calculating the balance of the wallet.
    - `fetching-tokens` means we are fetching the token data for our tokens.

Review comments on `new-tx` and `update-tx`:

Contributor: Semi-unrelated to line number, but remember to add a ping/pong mechanism

Member Author: The document of the event queue and the implementation did not have ping/pong anywhere, but from what I understand it comes with the websocket lib we use, so I'll add the ping/pong to the design

Contributor: Autobahn answers the PING frame with a PONG and also disconnects when no messages are received in a window. It also doesn't send a PING, so the implementation must be done in the client

#### Methods

- subscribeAddresses, unsubscribeAddresse(s)
  - Start/stop listening for transactions with the provided addresses.
- start
  - Start the websocket and add listeners
- stop
  - Close the websocket and remove listeners
- getter methods for the configuration
  - getNetwork, getServerURL

The other methods will be internal.


#### pubsub vs event queue

These classes have the same methods and the same events because they are meant
to be used by the same wallet facade (i.e. `HathorWallet`).
The difference between them is the underlying websocket class and how it is
managed.

The events are also derived in a very different way: the pubsub will receive
only transactions from the wallet, but the event queue will receive all events
from the fullnode and will have to filter them locally.

Despite the different implementations, both classes expose a common interface,
so the facade will be able to work with either of them interchangeably.
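
To illustrate the shared surface, here is a minimal sketch in which the facade-side code depends only on a common base type. The `BaseWalletConnection`, `StubConnection` and `collectNewTxIds` names are hypothetical and the stub stands in for a real pubsub or event queue connection; this is not the proposed implementation.

```typescript
type Listener = (data: unknown) => void;

// Hypothetical shared base exposing the common methods and events.
abstract class BaseWalletConnection {
  private listeners: Record<string, Listener[]> = {};
  protected addresses: Record<string, true> = {};

  on(event: string, listener: Listener): void {
    if (!this.listeners[event]) {
      this.listeners[event] = [];
    }
    this.listeners[event].push(listener);
  }

  protected emit(event: string, data: unknown): void {
    for (const listener of this.listeners[event] || []) {
      listener(data);
    }
  }

  subscribeAddresses(addresses: string[]): void {
    for (const address of addresses) {
      this.addresses[address] = true;
    }
  }

  unsubscribeAddresses(addresses: string[]): void {
    for (const address of addresses) {
      delete this.addresses[address];
    }
  }

  abstract start(): void;
  abstract stop(): void;
}

// Stand-in for a real connection: emits one fixed event on start.
class StubConnection extends BaseWalletConnection {
  start(): void {
    this.emit('new-tx', 'tx-1');
  }

  stop(): void {}
}

// Facade-side code only knows about the base type.
function collectNewTxIds(conn: BaseWalletConnection): string[] {
  const seen: string[] = [];
  conn.on('new-tx', (txId) => seen.push(String(txId)));
  conn.start();
  return seen;
}
```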

### WalletServiceConnection

The wallet-service connection does not have to handle transaction processing
since all of this is handled by the wallet-service.
The main concern of this connection is to re-emit events from the wallet-service
and to signal the wallet facade that it needs to update the "new addresses" list.

#### Events

- `new-tx` and `update-tx`
  - The event data will have the transaction id. // or data
- `conn_state`
  - Event data is a `ConnectionState`, so the wallet can know if it is receiving events in real time or not.
- `reload-data`
  - When the connection becomes available after it went offline, this event will
    be emitted to signal that the data needs to be reloaded.

#### Methods

- start
- stop
- setWalletId
- getter methods for the configuration

The other methods will be internal.

### FullnodeDataConnection

This is the only connection class not meant to be used by a wallet facade but to
be used by an external client to get real-time events from the fullnode, an
example of how to use this is the explorer main page, to update with new
transactions and blocks.

#### Events

- `network-update`
  - Network events are related to peers of the connected node or when a new tx
    is accepted by the network.
- `dashboard`
  - Dashboard data from the fullnode.

## Changes to the wallet facade

The wallet facade will be updated to use the new connection classes.
This means that instead of receiving a connection instance the wallet facade
will receive the connection params and options (i.e. `connection_params`) and
will instantiate the appropriate connection class.

The wallet-service facade can only use the `WalletServiceConnection`, but the
`HathorWallet` will need an additional parameter to choose which connection class
to use.
The `connection_mode` argument will be introduced to resolve this issue. It will
be one of the following:

- `pubsub`
  - Uses `FullnodePubSubConnection`
- `event_queue`
  - Uses `FullnodeEventQueueConnection`
- `auto`

If not provided, the `pubsub` mode will be used, since this is the most common
connection used by our wallets.

Ideally the `auto` mode would decide to use either `FullnodePubSubConnection` or
`FullnodeEventQueueConnection` based on the size of the wallet and other
parameters, but until we have a better strategy it will simply use
`FullnodePubSubConnection`.
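
The selection rule can be sketched as a small function. `resolveConnectionClass` is a hypothetical name, and it returns class names as strings only to keep the example self-contained.

```typescript
type ConnectionMode = 'pubsub' | 'event_queue' | 'auto';
type ConnectionClassName =
  | 'FullnodePubSubConnection'
  | 'FullnodeEventQueueConnection';

// Defaults to `pubsub` when no mode is provided.
function resolveConnectionClass(
  mode: ConnectionMode = 'pubsub',
): ConnectionClassName {
  switch (mode) {
    case 'event_queue':
      return 'FullnodeEventQueueConnection';
    case 'pubsub':
    case 'auto': // until a better strategy exists, `auto` behaves like `pubsub`
    default:
      return 'FullnodePubSubConnection';
  }
}
```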

## FullnodeEventQueueWSConnection connection state management

Now that the connection class is responsible for syncing the history, we can
implement special logic for the event queue.
When we are reestablishing the connection we can start from the last
acknowledged event.

Usually when the connection is lost we will need to clean and sync the history
of transactions from the beginning since we may have lost some events, but the
event queue allows us to start streaming events from the moment we lost
connection.
This makes it so we don't have to clean the history and can just start the
stream from where we left off.
However, this is only applicable when we are connected to the same fullnode, so
we need to check if the fullnode we are connected to is the same, and if it isn't
we will need to re-sync as usual.

To make sure we are connected to the same fullnode we will use the peer-id (32-byte identifier) and the `stream_id` (uuid4).
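
The reconnection decision can be sketched as a pure function. The names below are hypothetical, and whether streaming resumes at the last acknowledged event or at the one after it is not specified here, so the sketch returns the last acknowledged id unchanged.

```typescript
interface StreamIdentity {
  peerId: string;   // fullnode peer-id
  streamId: string; // `stream_id` of the event stream
}

interface ResumeState {
  identity: StreamIdentity;
  lastAckedEventId: number;
}

type ReconnectPlan =
  | { action: 'resume'; fromEventId: number }
  | { action: 'resync' };

// Resume only when both identifiers match the ones seen before the drop.
function planReconnect(
  saved: ResumeState | null,
  current: StreamIdentity,
): ReconnectPlan {
  if (
    saved !== null
    && saved.identity.peerId === current.peerId
    && saved.identity.streamId === current.streamId
  ) {
    return { action: 'resume', fromEventId: saved.lastAckedEventId };
  }
  // Different fullnode or stream, or no saved state: clean and re-sync.
  return { action: 'resync' };
}
```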