Skip to content

Commit

Permalink
Merge pull request #31 from 0xPolygon/changes-sync
Browse files Browse the repository at this point in the history
feat: new changes
  • Loading branch information
nitinmittal23 committed Mar 4, 2024
2 parents a11fab7 + cf4bb98 commit dbc73e2
Show file tree
Hide file tree
Showing 22 changed files with 165 additions and 141 deletions.
6 changes: 3 additions & 3 deletions examples/matic_transfer/consumer/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ dotenv.config()
export default async function startConsuming(transferService: TransferService, transferMapper: TransferMapper): Promise<void> {
try {
consume({
"metadata.broker.list": process.env.KAFKA_CONNECTION_URL || "localhost:9092",
"group.id": process.env.CONSUMER_GROUP_ID || "matic.transfer.consumer",
"metadata.broker.list": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092",
"group.id": process.env.CONSUMER_GROUP_ID ?? "matic.transfer.consumer",
"security.protocol": "plaintext",
topic: process.env.TRANSFER_TOPIC || "apps.1.matic.transfer",
topic: process.env.TRANSFER_TOPIC ?? "apps.1.matic.transfer",
coders: {
fileName: "matic_transfer",
packageName: "matictransferpackage",
Expand Down
2 changes: 1 addition & 1 deletion examples/matic_transfer/consumer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async function start(): Promise<void> {
}
});

const database = new Database(process.env.MONGO_URL || 'mongodb://localhost:27017/chain-indexer');
const database = new Database(process.env.MONGO_URL ?? 'mongodb://localhost:27017/chain-indexer');
await database.connect();

const transferService = new TransferService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const statics = {
//@ts-ignore
const tx = await this.findOne().sort({ timestamp: -1 }).exec();

return tx?.blockNumber || 0;
return tx?.blockNumber ?? 0;
},

/**
Expand Down
6 changes: 3 additions & 3 deletions examples/matic_transfer/producer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ Logger.create({
const producer = produce<ErigonBlockProducer>({
startBlock: parseInt(process.env.START_BLOCK as string),
rpcWsEndpoints: process.env.RPC_WS_ENDPOINT_URL_LIST?.split(','),
topic: process.env.PRODUCER_TOPIC || "polygon.1.blocks",
topic: process.env.PRODUCER_TOPIC ?? "polygon.1.blocks",
maxReOrgDepth: 96,
maxRetries: 5,
mongoUrl: process.env.MONGO_URL || 'mongodb://localhost:27017/chain-indexer',
mongoUrl: process.env.MONGO_URL ?? 'mongodb://localhost:27017/chain-indexer',
blockSubscriptionTimeout: 120000,
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092",
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092",
"security.protocol": "plaintext",
type: "blocks:erigon"
});
Expand Down
8 changes: 4 additions & 4 deletions examples/matic_transfer/transformer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Logger.create({
try {
startTransforming(
{
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092",
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092",
"group.id": "matic.transfer.transformer",
"security.protocol": "plaintext",
"message.max.bytes": 26214400,
Expand All @@ -35,11 +35,11 @@ try {
packageName: "blockpackage",
messageType: "Block"
},
topic: process.env.CONSUMER_TOPIC || "polygon.1.blocks",
topic: process.env.CONSUMER_TOPIC ?? "polygon.1.blocks",
},
{
topic: process.env.PRODUCER_TOPIC || "apps.1.matic.transfer",
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092",
topic: process.env.PRODUCER_TOPIC ?? "apps.1.matic.transfer",
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092",
"security.protocol": "plaintext",
"message.max.bytes": 26214400,
coder: {
Expand Down
6 changes: 3 additions & 3 deletions examples/nft_balancer/consumer/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ dotenv.config()
export default async function startConsuming(transferTokenService: TransferTokenService, transferTokenMapper: TransferTokenMapper): Promise<void> {
try {
consume({
"metadata.broker.list": process.env.KAFKA_CONNECTION_URL || "localhost:9092",
"group.id": process.env.CONSUMER_GROUP_ID || "matic.transfer.consumer",
"metadata.broker.list": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092",
"group.id": process.env.CONSUMER_GROUP_ID ?? "matic.transfer.consumer",
"security.protocol": "plaintext",
topic: process.env.TRANSFER_TOPIC || "apps.1.matic.transfer",
topic: process.env.TRANSFER_TOPIC ?? "apps.1.matic.transfer",
coders: {
fileName: "nft_transfer",
packageName: "nfttransferpackage",
Expand Down
2 changes: 1 addition & 1 deletion examples/nft_balancer/consumer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async function start(): Promise<void> {
}
});

const database = new Database(process.env.MONGO_URL || 'mongodb://localhost:27017/chain-indexer');
const database = new Database(process.env.MONGO_URL ?? 'mongodb://localhost:27017/chain-indexer');
await database.connect();

const transferService = new TransferTokenService(
Expand Down
6 changes: 3 additions & 3 deletions examples/nft_balancer/producer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ const producer = produce<BlockPollerProducer>({
startBlock: parseInt(process.env.START_BLOCK as string),
rpcWsEndpoints: process.env.HTTP_PROVIDER ? [process.env.HTTP_PROVIDER] : undefined,
blockPollingTimeout: parseInt(process.env.BLOCK_POLLING_TIMEOUT as string),
topic: process.env.PRODUCER_TOPIC || "polygon.1442.blocks",
topic: process.env.PRODUCER_TOPIC ?? "polygon.1442.blocks",
maxReOrgDepth: 0,
maxRetries: 5,
mongoUrl: process.env.MONGO_URL || 'mongodb://localhost:27017/chain-indexer',
mongoUrl: process.env.MONGO_URL ?? 'mongodb://localhost:27017/chain-indexer',
// blockSubscriptionTimeout: 120000,
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092",
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092",
"security.protocol": "plaintext",
type: "blocks:polling"
});
Expand Down
8 changes: 4 additions & 4 deletions examples/nft_balancer/transformer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Logger.create({
try {
startTransforming(
{
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092",
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092",
"group.id": "matic.transfer.transformer",
"security.protocol": "plaintext",
"message.max.bytes": 26214400,
Expand All @@ -35,11 +35,11 @@ try {
packageName: "blockpackage",
messageType: "Block"
},
topic: process.env.CONSUMER_TOPIC || "polygon.1.blocks",
topic: process.env.CONSUMER_TOPIC ?? "polygon.1.blocks",
},
{
topic: process.env.PRODUCER_TOPIC || "apps.1.matic.transfer",
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092",
topic: process.env.PRODUCER_TOPIC ?? "apps.1.matic.transfer",
"bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092",
"security.protocol": "plaintext",
"message.max.bytes": 26214400,
coder: {
Expand Down
53 changes: 32 additions & 21 deletions internal/block_getters/quicknode_block_getter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,42 @@ export class QuickNodeBlockGetter extends BlockGetter implements IBlockGetter {
*/
public async getBlockWithTransactionReceipts(blockNumber: number | string, retryCount: number = 0): Promise<IBlock> {
try {
const response: IQuickNodeResponse = await new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Request timed out for block: ${blockNumber}`));
}, this.rpcTimeout || 4000);
const response: IQuickNodeResponse = await new Promise(
async (resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Request timed out for block: ${blockNumber}`));
}, this.rpcTimeout ?? 4000);

let eth: Eth = this.eth;
if (retryCount > 0 && this.alternateEth) {
eth = this.alternateEth;
}

(eth.currentProvider as WebsocketProvider).send({
method: "qn_getBlockWithReceipts",
id: Date.now().toString() + blockNumber,
params: [utils.numberToHex(blockNumber)],
jsonrpc: "2.0"
}, (error, response) => {
if (error) {
clearTimeout(timeout);
reject(error);
let eth: Eth = this.eth;
if (retryCount > 0 && this.alternateEth) {
await new Promise(r => setTimeout(r, 2000));
eth = this.alternateEth;
}

clearTimeout(timeout);
resolve(response?.result);
(eth.currentProvider as WebsocketProvider).send({
method: "qn_getBlockWithReceipts",
id: Date.now().toString() + blockNumber,
params: [utils.numberToHex(blockNumber)],
jsonrpc: "2.0"
}, (error, response) => {
if (error) {
clearTimeout(timeout);
reject(error);
}

if (!(response?.result)) {
clearTimeout(timeout);
reject(
new Error(
`null response received for block: ${blockNumber}`
)
);
}

clearTimeout(timeout);
resolve(response?.result);
});
});
});

const transactions: ITransaction[] = [];

Expand Down
6 changes: 6 additions & 0 deletions internal/block_producers/block_producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ export class BlockProducer extends AsynchronousProducer {
await this.database.connect();
const metadata = await super.start();

Logger.info({
location: "block_producer",
function: "start",
message: "Producer started",
});

this.on("delivered", async (report: DeliveryReport) => {
if (report.partition === -1) {
const error = new BlockProducerError(
Expand Down
2 changes: 2 additions & 0 deletions internal/block_subscription/block_subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export class BlockSubscription extends AbstractBlockSubscription {
workerData
}
);
worker.setMaxListeners(1000);

worker.on("exit", () => {
this.workers[i] = new Worker(
Expand All @@ -79,6 +80,7 @@ export class BlockSubscription extends AbstractBlockSubscription {
workerData
}
);
this.workers[i].setMaxListeners(1000);
});

workers.push(worker);
Expand Down
8 changes: 8 additions & 0 deletions internal/coder/protobuf_coder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { CoderError } from "../errors/coder_error.js";
import protobuf, { Root, Type } from "protobufjs";
import { ICoder } from "../interfaces/coder.js";
import { createRequire } from "module";
import { Logger } from "../logger/logger.js";
const { load } = protobuf;

/**
Expand Down Expand Up @@ -76,6 +77,13 @@ export class Coder implements ICoder {
}

try {
if (this.messageType === "L1StateBlock") {
Logger.info({message: "In coder deserialize - for L1StateBlock", data: {
base64: buffer.toString("base64"),
stringData: buffer.toString(),
buffer
}});
}
return this.protobufType.decode(buffer);
} catch (error) {
throw new CoderError(
Expand Down
Loading

0 comments on commit dbc73e2

Please sign in to comment.