Skip to content

Commit

Permalink
feat(demux): ✨ support get blockchain data from hyperion api
Browse files Browse the repository at this point in the history
  • Loading branch information
quyvo authored and manh-vv committed Dec 16, 2020
1 parent 4d83757 commit 1b38168
Show file tree
Hide file tree
Showing 12 changed files with 1,808 additions and 7 deletions.
268 changes: 268 additions & 0 deletions packages/demux/src/HyperionActionReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
import * as http from 'http';
import * as https from 'https';
import { AbstractActionReader, NotInitializedError, ActionReaderOptions } from 'demux';
import fetch from 'node-fetch';
import { retry } from './utils';
import { HyperionBlock } from './HyperionBlock';

export interface HyperionActionReaderOptions extends ActionReaderOptions {
hyperionEndpoint?: string;
}

/**
* Reads from an hyperion node to get blocks of actions.
* deferred transactions and inline actions will be included,
*/
export class HyperionActionReader extends AbstractActionReader {
protected hyperionEndpoint: string;
protected httpAgent: http.Agent;
protected httpsAgent: https.Agent;

constructor(options: HyperionActionReaderOptions = {}) {
super(options);
const hyperionEndpoint = options.hyperionEndpoint
? options.hyperionEndpoint
: 'http://localhost:8888';
this.hyperionEndpoint = hyperionEndpoint.replace(/\/+$/g, ''); // Removes trailing slashes
}

/**
* Returns a http/https agent use to request.
* create new agent if not existed
*/
private getConnectionAgent(_parsedURL) {
if (_parsedURL.protocol === 'http:') {
if (!this.httpAgent) {
this.httpAgent = new http.Agent({
keepAlive: true,
keepAliveMsecs: 3000,
});
}
return this.httpAgent;
} else {
if (!this.httpsAgent) {
this.httpsAgent = new https.Agent({
keepAlive: true,
keepAliveMsecs: 3000,
});
}
return this.httpsAgent;
}
}

private async processBlockMeta(
block: any,
numRetries: number = 120,
waitTimeMs: number = 250,
){
const processedTransactions = [];
for (const transaction of block.transactions) {
if (this.includeMetaAction(transaction)) {
processedTransactions.push({
id: transaction.id,
actions: await this.getActionMeta(transaction.id, numRetries, waitTimeMs),
})
} else {
processedTransactions.push(transaction);
}
}

block.transactions = processedTransactions;
return block;
}

/**
* get meta data of action.
*/
private async getActionMeta(
transactionId: string,
numRetries: number = 120,
waitTimeMs: number = 250,
): Promise<any> {
try {
const rawTransaction = await retry(
async () => {
return await fetch(
`${this.hyperionEndpoint}/v2/history/get_transaction?id=${transactionId}`,
{
method: 'get',
headers: { 'Content-Type': 'application/json' },
agent: this.getConnectionAgent.bind(this),
},
).then(res => {
if (res.ok) {
return res.json();
} else {
throw new Error(res.statusText);
}
});
},
numRetries,
waitTimeMs,
);

if (!rawTransaction.actions) {
return [];
}

const actionMeta = [];

for (const action of rawTransaction.actions) {
for (const receipt of action.receipts) {
actionMeta.push({
receiver: receipt.receiver,
account: action.act.account,
action: action.act.name,
authorization: action.act.authorization.map(auth => {
return { account: auth.actor, permission: auth.permission };
}),
data: action.act.data,
});
}
}

return actionMeta;

} catch (err) {
this.log.error(err);
throw new Error('Error get action meta, max retries failed');
}
}

private includeMetaAction(transaction: any) {
for (const action of transaction.actions) {
if (action.receiver === 'eosio') {
if (
action.action === 'newaccount' ||
action.action === 'updateauth' ||
action.action === 'unstaketorex' ||
action.action === 'buyrex' ||
action.action === 'buyram' ||
action.action === 'buyrambytes' ||
action.action === 'undelegatebw' ||
action.action === 'delegatebw'
) {
return true;
}
}

if (action.action === 'transfer') {
return true;
}
}

return false;
}

/**
* Returns a promise for the head block number.
*/
public async getHeadBlockNumber(
numRetries: number = 120,
waitTimeMs: number = 250,
): Promise<number> {
try {
return await retry(
async () => {
const blockInfo = await fetch(`${this.hyperionEndpoint}/v1/chain/get_info`, {
agent: this.getConnectionAgent.bind(this),
}).then(res => {
if (res.ok) {
return res.json();
} else {
throw new Error(res.statusText);
}
});
return blockInfo.head_block_num;
},
numRetries,
waitTimeMs,
);
} catch (err) {
throw new Error('Error retrieving head block, max retries failed');
}
}

public async getLastIrreversibleBlockNumber(
numRetries: number = 120,
waitTimeMs: number = 250,
): Promise<number> {
try {
return await retry(
async () => {
const blockInfo = await fetch(`${this.hyperionEndpoint}/v1/chain/get_info`, {
agent: this.getConnectionAgent.bind(this),
}).then(res => {
if (res.ok) {
return res.json();
} else {
throw new Error(res.statusText);
}
});
return blockInfo.last_irreversible_block_num;
},
numRetries,
waitTimeMs,
);
} catch (err) {
this.log.error(err);
throw new Error('Error retrieving last irreversible block, max retries failed');
}
}

/**
* Returns a promise for a `NodeosBlock`.
*/
public async getBlock(
blockNumber: number,
numRetries: number = 120,
waitTimeMs: number = 250,
): Promise<HyperionBlock> {
try {
const block = await retry(
async () => {
return await fetch(`${this.hyperionEndpoint}/v1/trace_api/get_block`, {
method: 'post',
body: JSON.stringify({ block_num: blockNumber }),
headers: { 'Content-Type': 'application/json' },
agent: this.getConnectionAgent.bind(this),
}).then(res => {
if (res.ok) {
return res.json();
} else {
throw new Error(res.statusText);
}
});
},
numRetries,
waitTimeMs,
);

const processedBlock = await this.processBlockMeta(block, numRetries, waitTimeMs);
return new HyperionBlock(processedBlock, this.log);
} catch (err) {
this.log.error(err);
throw new Error('Error block, max retries failed');
}
}

protected async setup(): Promise<void> {
if (this.initialized) {
return;
}

try {
await fetch(`${this.hyperionEndpoint}/v1/chain/get_info`, {
agent: this.getConnectionAgent.bind(this),
}).then(res => {
if (res.ok) {
return res.json();
} else {
throw new Error(res.statusText);
}
});
} catch (err) {
throw new NotInitializedError('Cannot reach supplied nodeos endpoint.', err);
}
}
}
76 changes: 76 additions & 0 deletions packages/demux/src/HyperionBlock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import * as Logger from 'bunyan';
import { Block, BlockInfo } from 'demux';
import { Action } from 'demux';

export interface EosAuthorization {
actor: string
permission: string
}

export interface EosPayload<ActionStruct = any> {
account: string
authorization: EosAuthorization[]
data: ActionStruct
name: string
transactionId: string
actionIndex?: number
actionOrdinal?: number
producer?: string
notifiedAccounts?: string[]
receiver?: string
isContextFree?: boolean
isInline?: boolean
isNotification?: boolean
contextFreeData?: Buffer[]
transactionActions?: TransactionActions
}

export interface EosAction<ActionStruct = any> extends Action {
payload: EosPayload<ActionStruct>
}

export interface TransactionActions {
contextFreeActions: EosAction[]
actions: EosAction[]
inlineActions: EosAction[]
}

export class HyperionBlock implements Block {
public actions: EosAction[];
public blockInfo: BlockInfo;
constructor(
rawBlock: any,
private log: Logger,
) {
this.blockInfo = {
blockNumber: rawBlock.number,
blockHash: rawBlock.id,
previousBlockHash: rawBlock.previous_id,
timestamp: new Date(rawBlock.timestamp),
};
this.actions = this.collectActionsFromBlock(rawBlock);
}

protected collectActionsFromBlock(rawBlock: any): EosAction[] {
const producer = rawBlock.producer;
return this.flattenArray(rawBlock.transactions.map((transaction: any) => {
return transaction.actions.map((action: any, actionIndex: number) => {
const block = {
type: `${action.receiver}::${action.action}`,
payload: {
producer,
transactionId: transaction.id,
actionIndex,
...action,
},
};
return block
})
}))
}

private flattenArray(arr: any[]): any[] {
return arr.reduce((flat, toFlatten) =>
flat.concat(Array.isArray(toFlatten) ? this.flattenArray(toFlatten) : toFlatten), [])
}
}
1 change: 1 addition & 0 deletions packages/demux/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type { DbUpdaterActionPayload } from './DbUpdater';

export { BaseHandlerVersion } from './BaseHandlerVersion';
export { AloxideActionHandler } from './AloxideActionHandler';
export { HyperionActionReader } from './HyperionActionReader';
export { DbUpdater } from './DbUpdater';
export { AloxideDataManager } from './AloxideDataManager';
export { indexStateSchema } from './indexStateSchema';
Expand Down
32 changes: 32 additions & 0 deletions packages/demux/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import Logger from 'bunyan';

function wait(ms: number): Promise<void> {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
}

async function retry<T>(
func: () => Promise<T>,
maxNumAttempts: number,
waitMs: number,
log?: Logger,
): Promise<T> {
let numAttempts = 1;
while (numAttempts <= maxNumAttempts) {
try {
return await func();
} catch (err) {
if (numAttempts === maxNumAttempts) {
throw err;
}
}
numAttempts += 1;
log?.debug('retry count:', numAttempts);

await wait(waitMs);
}
throw new Error(`${maxNumAttempts} retries failed`);
}

export { retry };
Loading

0 comments on commit 1b38168

Please sign in to comment.