Skip to content

Commit

Permalink
feat(cactus-connector-ethereum): add RunTransactionV1Exchange to shar…
Browse files Browse the repository at this point in the history
…e receipt data

Also improves watch blocks

Authored-by: Bruno Mateus <[email protected]>
Co-authored-by: Rafael Belchior <[email protected]>
Signed-off-by: Rafael Belchior <[email protected]>
  • Loading branch information
RafaelAPB authored and petermetz committed Dec 5, 2024
1 parent 6249eb6 commit 8f4ed85
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ import {
Web3StringReturnFormat,
convertWeb3ReceiptStatusToBool,
} from "./types/util-types";
import { Observable, ReplaySubject } from "rxjs";

export interface RunTransactionV1Exchange {
request: InvokeContractV1Request;
response: RunTransactionResponse;
timestamp: Date;
}

// Used when waiting for WS requests to be send correctly before disconnecting
const waitForWsProviderRequestsTimeout = 5 * 1000; // 5s
Expand Down Expand Up @@ -150,6 +157,9 @@ export class PluginLedgerConnectorEthereum
private watchBlocksSubscriptions: Map<string, WatchBlocksV1Endpoint> =
new Map();

private txSubject: ReplaySubject<RunTransactionV1Exchange> =
new ReplaySubject();

public get className(): string {
return PluginLedgerConnectorEthereum.CLASS_NAME;
}
Expand Down Expand Up @@ -235,6 +245,10 @@ export class PluginLedgerConnectorEthereum
return this.instanceId;
}

public getTxSubjectObservable(): Observable<RunTransactionV1Exchange> {
return this.txSubject.asObservable();
}

private async removeWatchBlocksSubscriptionForSocket(socketId: string) {
try {
const subscription = this.watchBlocksSubscriptions.get(socketId);
Expand Down Expand Up @@ -655,6 +669,16 @@ export class PluginLedgerConnectorEthereum
});
const success = out.transactionReceipt.status;
const data = { success, out };

// create RunTransactionV1Exchange for transaction monitoring
const receiptData: RunTransactionV1Exchange = {
request: req,
response: out,
timestamp: new Date(),
};
this.log.debug(`RunTransactionV1Exchange created ${receiptData}`);
this.txSubject.next(receiptData);

return data;
} else {
throw new Error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export * from "./generated/openapi/typescript-axios";
export {
PluginLedgerConnectorEthereum,
IPluginLedgerConnectorEthereumOptions,
RunTransactionV1Exchange,
} from "./plugin-ledger-connector-ethereum";

export * from "./sign-utils";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ export function signTransaction(
| FeeMarketEIP1559Transaction
| BlobEIP4844Transaction;
} {
let chainConfiguration = new Common({ chain: Chain.Mainnet });
let chainConfiguration = new Common({
chain: Chain.Mainnet,
hardfork: "istanbul",
});
if (customChainInfo) {
chainConfiguration = Common.custom(customChainInfo);
}

const transaction = TransactionFactory.fromTxData(txData, {
common: chainConfiguration,
common: chainConfiguration as any,
});
if (privateKey.toLowerCase().startsWith("0x")) {
privateKey = privateKey.slice(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import Web3, { BlockHeaderOutput, FMT_BYTES, FMT_NUMBER } from "web3";
import { NewHeadsSubscription } from "web3-eth";
import { NewHeadsSubscription, RegisteredSubscription } from "web3-eth";
import type { Socket as SocketIoSocket } from "socket.io";

import {
Expand All @@ -21,10 +21,7 @@ import {
WatchBlocksV1,
Web3Transaction,
} from "../generated/openapi/typescript-axios";
import {
ConvertWeb3ReturnToString,
Web3StringReturnFormat,
} from "../types/util-types";
import { ConvertWeb3ReturnToString } from "../types/util-types";

const DEFAULT_HTTP_POLL_INTERVAL = 1000 * 5; // 5 seconds
const LAST_SEEN_LATEST_BLOCK = -1; // must be negative number, will be replaced with latest block in code
Expand Down Expand Up @@ -337,7 +334,7 @@ export class WatchBlocksV1SubscriptionEndpoint extends WatchBlocksV1Endpoint {
}

public async subscribe() {
const { socket, log, web3, isGetBlockData } = this;
const { socket, log, isGetBlockData } = this;
log.info(`${WatchBlocksV1.Subscribe} [WS Subscription] => ${socket.id}`);

if (this.isSubscribed) {
Expand All @@ -352,46 +349,53 @@ export class WatchBlocksV1SubscriptionEndpoint extends WatchBlocksV1Endpoint {
await this.emitAllSinceLastSeenBlock();

log.debug("Subscribing to Web3 new block headers event...");
this.newBlocksSubscription = await web3.eth.subscribe(
"newBlockHeaders",
undefined,
Web3StringReturnFormat,
);

this.newBlocksSubscription.on("data", async (blockHeader) => {
try {
log.debug("newBlockHeaders:", blockHeader);

if (typeof blockHeader.number === undefined) {
throw new Error(
`Missing block number in received block header (number: ${blockHeader.number}, hash: ${blockHeader.hash})`,
);
}
const blockNumber = Number(blockHeader.number);
if (blockNumber - this.lastSeenBlock > 2) {
log.info(
`Detected missing blocks since latest one (blockNumber: ${blockNumber}, lastSeenBlock: ${this.lastSeenBlock})`,
);
await this.emitAllSinceLastSeenBlock();
}
const options = {
subscription: "newBlockHeaders" as keyof RegisteredSubscription,
parameters: {},
};

let next: WatchBlocksV1Progress;
if (isGetBlockData) {
next = await this.getFullBlockProgress(blockNumber);
} else {
next = await this.headerDataToBlockProgress(blockHeader);
this.newBlocksSubscription = (await this.web3.eth.subscribe(
options.subscription,
options.parameters,
)) as unknown as NewHeadsSubscription;

this.newBlocksSubscription?.on(
"data",
async (blockHeader: BlockHeaderOutput) => {
try {
log.debug("newBlockHeaders:", blockHeader);

if (blockHeader.number === undefined || blockHeader.number === null) {
throw new Error(
`Missing block number in received block header (number: ${blockHeader.number}, hash: ${blockHeader.hash})`,
);
}
const blockNumber = Number(blockHeader.number);
if (blockNumber - this.lastSeenBlock > 2) {
log.info(
`Detected missing blocks since latest one (blockNumber: ${blockNumber}, lastSeenBlock: ${this.lastSeenBlock})`,
);
await this.emitAllSinceLastSeenBlock();
}

let next: WatchBlocksV1Progress;
if (isGetBlockData) {
next = await this.getFullBlockProgress(blockNumber);
} else {
next = await this.headerDataToBlockProgress(blockHeader);
}

socket.emit(WatchBlocksV1.Next, next);

this.lastSeenBlock = blockNumber;
} catch (error) {
log.warn("Error when parsing subscribed block data:", error);
}
},
);

socket.emit(WatchBlocksV1.Next, next);

this.lastSeenBlock = blockNumber;
} catch (error) {
log.warn("Error when parsing subscribed block data:", error);
}
});

this.newBlocksSubscription.on("error", async (error) => {
console.log("Error when subscribing to new block header: ", error);
this.newBlocksSubscription?.on("error", async (error: Error) => {
log.error("Error when subscribing to new block header: ", error);
socket.emit(WatchBlocksV1.Error, safeStringifyException(error));
await this.unsubscribe();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { v4 as uuidV4 } from "uuid";
import { AddressInfo } from "net";
import { Server as SocketIoServer } from "socket.io";
import Web3, { HexString } from "web3";
import { Address } from "@ethereumjs/util";

import {
LogLevelDesc,
Expand Down Expand Up @@ -351,11 +352,11 @@ describe("Ethereum contract deploy and invoke using keychain tests", () => {

test("invoke Web3SigningCredentialType.None", async () => {
const testEthAccount2 = web3.eth.accounts.create();

const address = Address.fromString(testEthAccount2.address);
const value = 10e6;
const { serializedTransactionHex } = signTransaction(
{
to: testEthAccount2.address,
to: address,
value,
maxPriorityFeePerGas: 0,
maxFeePerGas: 0x40000000,
Expand Down

0 comments on commit 8f4ed85

Please sign in to comment.