Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Move worker to nest #16

Merged
merged 7 commits into from
Mar 27, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import { PairLiquidityInfoHistoryService } from './pair-liquidity-info-history.s
import { ApiOperation, ApiQuery, ApiResponse } from '@nestjs/swagger';
import * as dto from '../../dto';
import { OrderQueryEnum } from '../../dto';
import { ContractAddress } from '../../lib/utils';

import { ContractAddress } from '../../clients/sdk-client.model';

@Controller('history/liquidity')
export class PairLiquidityInfoHistoryController {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { Injectable } from '@nestjs/common';
import { PairLiquidityInfoHistoryDbService } from '../../database/pair-liquidity-info-history/pair-liquidity-info-history-db.service';
import { Pair, PairLiquidityInfoHistory } from '@prisma/client';
import { OrderQueryEnum } from '../../dto';
import { ContractAddress } from '../../lib/utils';

import { ContractAddress } from '../../clients/sdk-client.model';

@Injectable()
export class PairLiquidityInfoHistoryService {
Expand Down
3 changes: 2 additions & 1 deletion src/api/tokens/tokens.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import {
ApiParam,
ApiHeaders,
} from '@nestjs/swagger';
import { removeId, ContractAddress } from '../../lib/utils';
import { removeId } from '../../lib/utils';
import * as prisma from '@prisma/client';
import { ContractAddress } from '../../clients/sdk-client.model';

const withTokenAuthorization = async <T>(
auth: string,
Expand Down
3 changes: 2 additions & 1 deletion src/api/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Injectable } from '@nestjs/common';
import { Pair, PairLiquidityInfo, Token } from '@prisma/client';
import { ContractAddress, presentInvalidTokens } from '../../lib/utils';
import { presentInvalidTokens } from '../../lib/utils';
import { TokenDbService } from '../../database/token/token-db.service';
import { ContractAddress } from '../../clients/sdk-client.model';

@Injectable()
export class TokensService {
Expand Down
4 changes: 3 additions & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import { TokensService } from './api/tokens/tokens.service';
import { PairsService } from './api/pairs/pairs.service';
import { ClientsModule } from './clients/clients.module';
import { ApiModule } from './api/api.module';
import { PairSyncService } from './tasks/pair-sync.service';
import { MdwWsClientService } from './clients/mdw-ws-client.service';

@Module({
imports: [ApiModule, ClientsModule, DatabaseModule],
controllers: [AppController],
providers: [TokensService, PairsService],
providers: [MdwWsClientService, PairsService, TokensService, PairSyncService],
})
export class AppModule {}
7 changes: 4 additions & 3 deletions src/clients/clients.module.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Module } from '@nestjs/common';
import { MdwClientService } from './mdw-client.service';
import { MdwHttpClientService } from './mdw-http-client.service';
import { SdkClientService } from './sdk-client.service';

@Module({
providers: [MdwClientService],
exports: [MdwClientService],
providers: [MdwHttpClientService, MdwHttpClientService, SdkClientService],
exports: [MdwHttpClientService, MdwHttpClientService, SdkClientService],
})
export class ClientsModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
KeyBlockHash,
MicroBlockHash,
TxHash,
} from '../lib/utils';
} from './sdk-client.model';

export type MdwPaginatedResponse<T> = {
next?: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import { Injectable } from '@nestjs/common';
import NETWORKS from '../lib/networks';
import {
AccountAddress,
ContractAddress,
KeyBlockHash,
MicroBlockHash,
nonNullable,
} from '../lib/utils';
import { nonNullable } from '../lib/utils';
import {
AccountBalance,
ContractBalance,
Contract,
ContractLog,
MdwMicroBlock,
MdwPaginatedResponse,
} from './mdw-client.model';
} from './mdw-http-client.model';
import {
AccountAddress,
ContractAddress,
KeyBlockHash,
MicroBlockHash,
} from './sdk-client.model';

@Injectable()
export class MdwClientService {
export class MdwHttpClientService {
private readonly LIMIT = 100;
private readonly DIRECTION = 'forward';
private readonly INT_AS_STRING = true;
Expand Down
43 changes: 43 additions & 0 deletions src/clients/mdw-ws-client.model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {
CallData,
ContractAddress,
MicroBlockHash,
Payload,
Signature,
TxHash,
WalletAddress,
} from './sdk-client.model';

export type SubscriptionEvent = {
subscription: 'Object' | 'Transactions'; // add any other additional enum values if are used
source: string;
payload: {
tx: {
version: number;
nonce: number;
fee: number;
amount: number;
} & (
| {
type: 'ContractCallTx'; // add any other additional enum values if are used
gas_price: number;
gas: number;
contract_id: ContractAddress;
caller_id: WalletAddress;
call_data: CallData;
abi_version: number;
}
| {
type: 'SpendTx';
ttl: number;
sender_id: WalletAddress;
recipient_id: WalletAddress;
payload: Payload;
}
);
signatures: Signature[];
hash: TxHash;
block_height: number;
block_hash: MicroBlockHash;
};
};
156 changes: 156 additions & 0 deletions src/clients/mdw-ws-client.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import { Injectable, Logger } from '@nestjs/common';
import * as WebSocket from 'ws';
import NETWORKS from '../lib/networks';
import { nonNullable, pluralize } from '../lib/utils';
import { SubscriptionEvent } from './mdw-ws-client.model';
import { ContractAddress } from './sdk-client.model';

export type Callbacks = {
onDisconnected?: (error?: Error) => any;
onEventReceived?: (event: SubscriptionEvent) => any;
onConnected?: () => any;
};

@Injectable()
export class MdwWsClientService {
readonly logger = new Logger(MdwWsClientService.name);

createNewConnection = async (callbacks: Callbacks = {}) => {
//1. connect
const ws = this.createWebSocketConnection();

//2. crate ping time-out checker
const { setAlive, stopPing } = this.startPingMechanism(ws);

//
// set up the subscription
//

//3. on connect...
const openHandler = async () => {
setAlive();
ws.on('pong', setAlive);

const { ROUTER_ADDRESS, SUBSCRIBE_TO_ALL_TXS } = process.env;
if (SUBSCRIBE_TO_ALL_TXS && parseInt(SUBSCRIBE_TO_ALL_TXS)) {
this.subscribeToAllTxs(ws);
} else {
this.subscribeToContract(
ws,
nonNullable(ROUTER_ADDRESS) as ContractAddress,
);
}
callbacks.onConnected && callbacks.onConnected();
};

//4. when receive new messages
const messageHandler = this.createMessageHandler(
callbacks,
ws,
this.logger,
);

const errorHandler = (error?: Error) => {
callbacks.onDisconnected && callbacks.onDisconnected(error);
stopPing();
ws.removeAllListeners();
};
const closeHandler = () => errorHandler();
const onPing = (event: Buffer) => ws.pong(event);

ws.on('error', errorHandler);
ws.on('message', messageHandler);
ws.on('open', openHandler);
ws.on('close', closeHandler);
ws.on('ping', onPing);

return ws;
};

private createMessageHandler =
(callbacks: Callbacks, ws: WebSocket, logger: Logger) =>
async (msg: WebSocket.RawData) => {
const stringMessage = msg.toString();
const objMessage = JSON.parse(stringMessage);
const onUnknownMessage = () => {
ws.close();
throw new Error(`Unknown message received: ${stringMessage}`);
};
if (Array.isArray(objMessage)) {
if (objMessage.some((x) => x === 'Transactions')) {
logger.debug(`Subscribed to all transactions`);
} else {
logger.debug(
`Subscribed to ${pluralize(objMessage.length, 'contract')}`,
);
}
return;
}
if (typeof objMessage === 'string') {
// if the message doesn't represent an already existing subscription
if (objMessage.indexOf('already subscribed to target')) {
onUnknownMessage();
}
// there is nothing of interest here, let's exit
return;
} else if (
!['Object', 'Transactions'].some((x) => objMessage.subscription === x)
) {
onUnknownMessage();
return;
}
const event: SubscriptionEvent = objMessage;
//if pair update subscribe to pair
const callback = callbacks.onEventReceived;
callback && (await callback(event));
};

private createWebSocketConnection = () =>
new WebSocket(
NETWORKS[nonNullable(process.env.NETWORK_NAME)].middlewareWebsocketUrl,
);

private subscribeToContract = (ws: WebSocket, address: ContractAddress) =>
ws.send(
JSON.stringify({
op: 'Subscribe',
payload: 'Object',
target: address,
}),
);

private subscribeToAllTxs = (ws: WebSocket) =>
ws.send(
JSON.stringify({
op: 'Subscribe',
payload: 'Transactions',
}),
);

private startPingMechanism = (ws: WebSocket) => {
let isAlive = false;

const pingTimeOut = parseInt(process.env.MDW_PING_TIMEOUT_MS || '0');
const interval = pingTimeOut
? setInterval(() => {
if (!isAlive) {
this.logger.warn('Ws terminate because of ping-timeout');
interval && clearInterval(interval);
ws.terminate();
return;
}

isAlive = false;
ws.ping();
}, pingTimeOut)
: null;
return {
setAlive: () => {
isAlive = true;
},
stopPing: () => {
interval && clearInterval(interval);
},
};
};
}
15 changes: 15 additions & 0 deletions src/clients/sdk-client.model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Encoded } from '@aeternity/aepp-sdk';

export type AccountAddress = Encoded.AccountAddress;
export type ContractAddress = Encoded.ContractAddress;
export type WalletAddress = Encoded.AccountAddress;
export type CallData = Encoded.ContractBytearray;
export type Signature = Encoded.Signature;
export type TxHash = Encoded.TxHash;
export type MicroBlockHash = Encoded.MicroBlockHash;
export type KeyBlockHash = Encoded.KeyBlockHash;
export type Payload = Encoded.Bytearray;

export const contractAddrToAccountAddr = (
contractAddress: ContractAddress,
): AccountAddress => contractAddress.replace('ct_', 'ak_') as AccountAddress;
28 changes: 28 additions & 0 deletions src/clients/sdk-client.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Injectable } from '@nestjs/common';
import { AeSdk, Node } from '@aeternity/aepp-sdk';
import { nonNullable } from '../lib/utils';
import NETWORKS from '../lib/networks';

@Injectable()
export class SdkClientService {
private client: AeSdk;
private node: Node;

async getClient(): Promise<[AeSdk, Node]> {
const NETWORK_NAME = nonNullable(process.env.NETWORK_NAME);
if (!this.client) {
this.node = new Node(NETWORKS[NETWORK_NAME].nodeUrl, {
ignoreVersion: true,
});

this.client = new AeSdk({
nodes: [{ name: NETWORK_NAME, instance: this.node }],
});
}
return [this.client, this.node];
}

async getHeight(): Promise<number> {
return await this.getClient().then(([client]) => client.getHeight());
}
}
3 changes: 0 additions & 3 deletions src/dal/client.ts

This file was deleted.

2 changes: 0 additions & 2 deletions src/dal/index.ts

This file was deleted.

Loading