diff --git a/packages/cactus-plugin-satp-hermes/package.json b/packages/cactus-plugin-satp-hermes/package.json index 0199fc67b0..c986939b89 100644 --- a/packages/cactus-plugin-satp-hermes/package.json +++ b/packages/cactus-plugin-satp-hermes/package.json @@ -136,6 +136,7 @@ "jsonc": "2.0.0", "knex": "2.4.0", "kubo-rpc-client": "3.0.1", + "node-cron": "3.0.2", "npm-run-all": "4.1.5", "openzeppelin-solidity": "3.4.2", "pg": "8.13.1", @@ -163,6 +164,7 @@ "@types/fs-extra": "11.0.4", "@types/google-protobuf": "3.15.12", "@types/node": "18.18.2", + "@types/node-cron": "3.0.11", "@types/pg": "8.11.10", "@types/swagger-ui-express": "4.1.6", "@types/tape": "4.13.4", diff --git a/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.ts b/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.ts index 6227cc4aad..336b358e68 100644 --- a/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.ts +++ b/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.ts @@ -2,7 +2,7 @@ import { Knex } from "knex"; export function up(knex: Knex): Knex.SchemaBuilder { return knex.schema.createTable("logs", (table) => { - table.string("sessionID").notNullable(); + table.string("sessionId").notNullable(); table.string("type").notNullable(); table.string("key").notNullable().primary(); table.string("operation").notNullable(); diff --git a/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto b/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto index 6ba42bc554..39d881338b 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto +++ b/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto @@ -9,7 +9,7 @@ service CrashRecovery { // step RPCs rpc RecoverV2Message(RecoverMessage) returns (RecoverUpdateMessage); - rpc RecoverV2SuccessMessage(RecoverSuccessMessage) returns (google.protobuf.Empty); + rpc RecoverV2SuccessMessage(RecoverSuccessMessage) returns (RecoverSuccessMessageResponse); rpc RollbackV2Message(RollbackMessage) returns (RollbackAckMessage); } @@ -41,6 +41,12 @@ message RecoverSuccessMessage { string sender_signature = 6; } +message RecoverSuccessMessageResponse { + string session_id = 1; + bool received = 2; + string sender_signature = 3; +} + message RollbackMessage { string session_id = 1; string message_type = 2; diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/client-service.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/client-service.ts new file mode 100644 index 0000000000..0abe39df1a --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/client-service.ts @@ -0,0 +1,100 @@ +import { + RecoverMessage, + RecoverMessageSchema, + RecoverSuccessMessage, + RecoverSuccessMessageSchema, + RollbackMessage, + RollbackMessageSchema, + RollbackState, +} from "../../../typescript/generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../satp-session"; +import { ILocalLogRepository } from "../../repository/interfaces/repository"; +import { create } from "@bufbuild/protobuf"; +import { SATPLogger } from "../../logging"; + +export class CrashRecoveryClientService { + private readonly log: Logger; + + constructor( + private readonly logRepository: ILocalLogRepository, + private readonly sessions: Map, + private readonly dbLogger: SATPLogger, + loggerLabel: string = "CrashRecoveryClientService", + ) { + this.log = LoggerProvider.getOrCreate({ label: loggerLabel }); + this.log.trace(`Initialized ${CrashRecoveryClientService.name}`); + } + + public createRecoverMessage(session: SATPSession): RecoverMessage { + const fnTag = `${CrashRecoveryClientService.name}#createRecoverMessage`; + this.log.debug( + `${fnTag} - Creating RecoverMessage for sessionId: ${session.getSessionId()}`, + ); + + const sessionData = session.getClientSessionData(); + + const recoverMessage = create(RecoverMessageSchema, { + sessionId: session.getSessionId(), + messageType: "urn:ietf:SATP-2pc:msgtype:recover-msg", + satpPhase: "phase0", + sequenceNumber: Number(sessionData.lastSequenceNumber), + isBackup: false, + newIdentityPublicKey: "", + lastEntryTimestamp: BigInt(0), + senderSignature: "", + }); + //await this.dbLogger.persistLogEntry({}); + this.log.debug(`${fnTag} - RecoverMessage created:`, recoverMessage); + + return recoverMessage; + } + + public async createRecoverSuccessMessage( + session: SATPSession, + ): Promise { + const fnTag = `${CrashRecoveryClientService.name}#createRecoverSuccessMessage`; + this.log.debug( + `${fnTag} - Creating RecoverSuccessMessage for sessionId: ${session.getSessionId()}`, + ); + + const recoverSuccessMessage = create(RecoverSuccessMessageSchema, { + sessionId: session.getSessionId(), + messageType: "urn:ietf:SATP-2pc:msgtype:recover-success-msg", + hashRecoverUpdateMessage: "", + success: true, + entriesChanged: [], + senderSignature: "", + }); + //await this.dbLogger.persistLogEntry({}); + this.log.debug( + `${fnTag} - RecoverSuccessMessage created:`, + recoverSuccessMessage, + ); + + return recoverSuccessMessage; + } + + public async createRollbackMessage( + session: SATPSession, + rollbackState: RollbackState, + ): Promise { + const fnTag = `${CrashRecoveryClientService.name}#createRollbackMessage`; + this.log.debug( + `${fnTag} - Creating RollbackMessage for sessionId: ${session.getSessionId()}`, + ); + + const rollbackMessage = create(RollbackMessageSchema, { + sessionId: session.getSessionId(), + messageType: "urn:ietf:SATP-2pc:msgtype:rollback-msg", + success: rollbackState.status === "completed", + actionsPerformed: [], + proofs: [], + senderSignature: "", + }); + //await this.dbLogger.persistLogEntry({}); + this.log.debug(`${fnTag} - RollbackMessage created:`, rollbackMessage); + + return rollbackMessage; + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/crash-handler.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/crash-handler.ts new file mode 100644 index 0000000000..b5242612bc --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/crash-handler.ts @@ -0,0 +1,146 @@ +import { ConnectRouter } from "@connectrpc/connect"; +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { + CrashRecovery, + RecoverSuccessMessageResponse, +} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { CrashRecoveryServerService } from "./server-service"; +import { CrashRecoveryClientService } from "./client-service"; +import { SATPSession } from "../satp-session"; +import { + RecoverMessage, + RecoverUpdateMessage, + RecoverSuccessMessage, + RollbackMessage, + RollbackAckMessage, + RollbackState, +} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPHandler, SATPHandlerType } from "../../types/satp-protocol"; + +export class CrashRecoveryHandler implements SATPHandler { + private readonly log: Logger; + + constructor( + private readonly serverService: CrashRecoveryServerService, + private readonly clientService: CrashRecoveryClientService, + loggerLabel: string = "CrashRecoveryHandler", + ) { + this.log = LoggerProvider.getOrCreate({ label: loggerLabel }); + this.log.trace(`Initialized ${CrashRecoveryHandler.name}`); + } + + public getHandlerIdentifier(): SATPHandlerType { + return SATPHandlerType.CRASH; + } + + public getHandlerSessions(): string[] { + return []; + } + + public getStage(): string { + return "crash"; + } + + // Server-side + + private async recoverV2MessageImplementation( + req: RecoverMessage, + ): Promise { + const fnTag = `${CrashRecoveryHandler.name}#recoverV2MessageImplementation`; + this.log.debug(`${fnTag} - Handling RecoverMessage: ${req}`); + try { + return await this.serverService.handleRecover(req); + } catch (error) { + this.log.error(`${fnTag} - Error:`, error); + throw error; + } + } + + private async recoverV2SuccessMessageImplementation( + req: RecoverSuccessMessage, + ): Promise { + const fnTag = `${CrashRecoveryHandler.name}#recoverV2SuccessMessageImplementation`; + this.log.debug(`${fnTag} - Handling RecoverSuccessMessage:${req}`); + try { + return await this.serverService.handleRecoverSuccess(req); + } catch (error) { + this.log.error(`${fnTag} - Error:`, error); + throw error; + } + } + + private async rollbackV2MessageImplementation( + req: RollbackMessage, + ): Promise { + const fnTag = `${CrashRecoveryHandler.name}#rollbackV2MessageImplementation`; + this.log.debug(`${fnTag} - Handling RollbackMessage: ${req}`); + try { + return await this.serverService.handleRollback(req); + } catch (error) { + this.log.error(`${fnTag} - Error:`, error); + throw error; + } + } + + public setupRouter(router: ConnectRouter): void { + // eslint-disable-next-line @typescript-eslint/no-this-alias + const that = this; + router.service(CrashRecovery, { + async recoverV2Message(req) { + return await that.recoverV2MessageImplementation(req); + }, + async recoverV2SuccessMessage(req) { + return await that.recoverV2SuccessMessageImplementation(req); + }, + async rollbackV2Message(req) { + return await that.rollbackV2MessageImplementation(req); + }, + }); + + this.log.info("Router setup completed for CrashRecoveryHandler"); + } + + // Client-side + + public async createRecoverMessage( + session: SATPSession, + ): Promise { + const fnTag = `${this.constructor.name}#createRecoverMessage`; + try { + return this.clientService.createRecoverMessage(session); + } catch (error) { + this.log.error(`${fnTag} - Failed to create RecoverMessage: ${error}`); + throw new Error(`Error in createRecoverMessage: ${error}`); + } + } + + public async createRecoverSuccessMessage( + session: SATPSession, + ): Promise { + const fnTag = `${this.constructor.name}#createRecoverSuccessMessage`; + try { + return await this.clientService.createRecoverSuccessMessage(session); + } catch (error) { + this.log.error( + `${fnTag} - Failed to create RecoverSuccessMessage: ${error}`, + ); + throw new Error(`Error in createRecoverSuccessMessage: ${error}`); + } + } + + public async createRollbackMessage( + session: SATPSession, + rollbackState: RollbackState, + ): Promise { + const fnTag = `${this.constructor.name}#createRollbackMessage`; + try { + return await this.clientService.createRollbackMessage( + session, + rollbackState, + ); + } catch (error) { + this.log.error(`${fnTag} - Failed to create RollbackMessage: ${error}`); + throw new Error(`Error in createRollbackMessage: ${error}`); + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/recovery-handler.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/recovery-handler.ts new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback-handler.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback-handler.ts new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/rollback-strategy-factory.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/rollback-strategy-factory.ts new file mode 100644 index 0000000000..44be5d1e7e --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/rollback-strategy-factory.ts @@ -0,0 +1,69 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { Stage0RollbackStrategy } from "./stage0-rollback-strategy"; +import { Stage1RollbackStrategy } from "./stage1-rollback-strategy"; +import { Stage2RollbackStrategy } from "./stage2-rollback-strategy"; +import { Stage3RollbackStrategy } from "./stage3-rollback-strategy"; +import { SATPBridgesManager } from "../../../gol/satp-bridges-manager"; +import { RollbackState } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; +//import { SATPLogger } from "../../../logging"; + +export interface RollbackStrategy { + execute(session: SATPSession): Promise; + // todo do we want to return any information? + cleanup(session: SATPSession, state: RollbackState): Promise; +} + +export class RollbackStrategyFactory { + private log: Logger; + private bridgesManager: SATPBridgesManager; + private logRepository: ILocalLogRepository; + //private dbLogger: SATPLogger; + + constructor( + bridgesManager: SATPBridgesManager, + localLog: ILocalLogRepository, + ) { + this.log = LoggerProvider.getOrCreate({ label: "RollbackStrategyFactory" }); + this.bridgesManager = bridgesManager; + this.logRepository = localLog; + //this.dbLogger = dbLogger; + } + + createStrategy(session: SATPSession): RollbackStrategy { + const fnTag = "RollbackStrategyFactory#createStrategy"; + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData.hashes) { + this.log.debug(`${fnTag} Creating Stage0RollbackStrategy`); + return new Stage0RollbackStrategy(this.logRepository); + } else if ( + !sessionData.hashes.stage2 || + Object.keys(sessionData.hashes.stage2).length === 0 + ) { + this.log.debug(`${fnTag} Creating Stage1RollbackStrategy`); + return new Stage1RollbackStrategy( + this.bridgesManager, + this.logRepository, + ); + } else if ( + !sessionData.hashes.stage3 || + Object.keys(sessionData.hashes.stage3).length === 0 + ) { + this.log.debug(`${fnTag} Creating Stage2RollbackStrategy`); + return new Stage2RollbackStrategy( + this.bridgesManager, + this.logRepository, + ); + } else { + this.log.debug(`${fnTag} Creating Stage3RollbackStrategy`); + return new Stage3RollbackStrategy( + this.bridgesManager, + this.logRepository, + ); + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage0-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage0-rollback-strategy.ts new file mode 100644 index 0000000000..f8b1f4e54e --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage0-rollback-strategy.ts @@ -0,0 +1,91 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { RollbackStrategy } from "./rollback-strategy-factory"; +import { + RollbackLogEntrySchema, + RollbackState, + RollbackStateSchema, +} from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; +import { create } from "@bufbuild/protobuf"; +//import { SATPLogger } from "../../../logging"; +//import { stringify as safeStableStringify } from "safe-stable-stringify"; + +export class Stage0RollbackStrategy implements RollbackStrategy { + private log: Logger; + private logRepository: ILocalLogRepository; + + constructor(localLog: ILocalLogRepository) { + this.log = LoggerProvider.getOrCreate({ label: "Stage0RollbackStrategy" }); + this.logRepository = localLog; + } + + async execute(session: SATPSession): Promise { + const fnTag = "Stage0RollbackStrategy#execute"; + this.log.info(`${fnTag} Executing rollback for Stage 0`); + + if (!session) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + const rollbackState = create(RollbackStateSchema, { + sessionId: session.getSessionId(), + currentStage: String(sessionData.hashes?.stage0), + stepsRemaining: 0, + rollbackLogEntries: [], + estimatedTimeToCompletion: "", + status: "IN_PROGRESS", + details: "", + }); + try { + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: session.getSessionId(), + stage: "stage0", + timestamp: new Date().toISOString(), + action: "NO_ACTION_REQUIRED", + status: "SUCCESS", + details: "", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "COMPLETED"; + rollbackState.details = "Rollback of Stage 0 completed successfully"; + + this.log.info(`${fnTag} Rollback of Stage 0 completed successfully`); + // todo: add logs for rollback + //await this.logRepository.create(logEntry); + + return rollbackState; + } catch (error) { + this.log.error(`${fnTag} Error during rollback of Stage 0: ${error}`); + + rollbackState.status = "FAILED"; + rollbackState.details = `Rollback of Stage 0 failed: ${error}`; + + return rollbackState; + } + } + + async cleanup(session: SATPSession): Promise { + const fnTag = "Stage0RollbackStrategy#cleanup"; + this.log.info(`${fnTag} Cleanup not required for Stage 0`); + + const rollbackState = create(RollbackStateSchema, { + sessionId: session.getSessionId(), + currentStage: "Stage0", + stepsRemaining: 0, + rollbackLogEntries: [], + estimatedTimeToCompletion: "", + status: "", + details: "", + }); + + return rollbackState; + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage1-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage1-rollback-strategy.ts new file mode 100644 index 0000000000..0ea993b68f --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage1-rollback-strategy.ts @@ -0,0 +1,160 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { RollbackStrategy } from "./rollback-strategy-factory"; +import { + RollbackLogEntrySchema, + RollbackState, + RollbackStateSchema, +} from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgesManager } from "../../../gol/satp-bridges-manager"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; +import { create } from "@bufbuild/protobuf"; + +export class Stage1RollbackStrategy implements RollbackStrategy { + private log: Logger; + private bridgeManager: SATPBridgesManager; + private logRepository: ILocalLogRepository; + + constructor( + bridgesManager: SATPBridgesManager, + localLog: ILocalLogRepository, + ) { + this.log = LoggerProvider.getOrCreate({ label: "Stage1RollbackStrategy" }); + this.bridgeManager = bridgesManager; + this.logRepository = localLog; + } + + async execute(session: SATPSession): Promise { + const fnTag = "Stage1RollbackStrategy#execute"; + this.log.info(`${fnTag} Executing rollback for Stage 1`); + + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + + const isClient = session.hasClientSessionData(); + const network = isClient + ? sessionData.senderGatewayNetworkId + : sessionData.recipientGatewayNetworkId; + + if (isClient && !network) { + throw new Error( + `${fnTag}: Unable to determine client network from session data.`, + ); + } + this.log.info(`${fnTag} network: ${network}`); + + const bridge = this.bridgeManager.getBridge(network); + if (!bridge) { + throw new Error(`${fnTag}: No bridge found for network: ${network}`); + } + + const rollbackState = create(RollbackStateSchema, { + sessionId: session.getSessionId(), + currentStage: "Stage1", + stepsRemaining: isClient ? 1 : 0, + rollbackLogEntries: [], + estimatedTimeToCompletion: "", + status: "IN_PROGRESS", + details: "", + }); + + try { + if (isClient) { + // Client-side: + const assetId = sessionData.senderAsset?.tokenId; + + if (!assetId) { + throw new Error(`${fnTag}: Asset ID is undefined`); + } + + this.log.info(`${fnTag} Unwrapping Asset ID: ${assetId}`); + + await bridge.unwrapAsset(assetId); + + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: session.getSessionId(), + stage: "Stage1", + timestamp: new Date().toISOString(), + action: "UNWRAP_ASSET", + status: "SUCCESS", + details: "", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.stepsRemaining = 0; + rollbackState.estimatedTimeToCompletion = ""; + rollbackState.status = "COMPLETED"; + rollbackState.details = "Rollback of Stage 1 completed successfully"; + } else { + // Server-side: + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: session.getSessionId(), + stage: "Stage1", + timestamp: new Date().toISOString(), + action: "NO_ACTION_REQUIRED", + status: "SUCCESS", + details: "No rollback action required for server in Stage 1.", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "COMPLETED"; + rollbackState.details = + "No rollback action required for server in Stage 1."; + } + + this.log.info( + `${fnTag} Successfully rolled back Stage 1 for session ${session.getSessionId}`, + ); + // todo: add logs for rollback + //await this.logRepository.create(logEntry); + return rollbackState; + } catch (error) { + this.log.error(`${fnTag} Failed to rollback Stage 1: ${error}`); + + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: sessionData.id, + stage: "Stage1", + timestamp: new Date().toISOString(), + action: isClient ? "UNWRAP_ASSET" : "NO_ACTION_REQUIRED", + status: "FAILED", + details: `Rollback of Stage 1 failed: ${error}`, + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "FAILED"; + rollbackState.details = `Rollback of Stage 1 failed: ${error}`; + + return rollbackState; + } + } + + async cleanup( + session: SATPSession, + state: RollbackState, + ): Promise { + const fnTag = "Stage1RollbackStrategy#cleanup"; + this.log.info(`${fnTag} Cleaning up after Stage 1 rollback`); + + if (!session) { + this.log.error(`${fnTag} Session not found`); + return state; + } + + try { + // TODO: Implement Stage 1 specific cleanup logic + + //state.currentStage = ""; + // TODO: Update other state properties as needed + + return state; + } catch (error) { + this.log.error(`${fnTag} Cleanup failed: ${error}`); + return state; + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage2-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage2-rollback-strategy.ts new file mode 100644 index 0000000000..d0ca8d6dfc --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage2-rollback-strategy.ts @@ -0,0 +1,171 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { RollbackStrategy } from "./rollback-strategy-factory"; +import { + RollbackLogEntrySchema, + RollbackState, + RollbackStateSchema, +} from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgesManager } from "../../../gol/satp-bridges-manager"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; +import { create } from "@bufbuild/protobuf"; + +export class Stage2RollbackStrategy implements RollbackStrategy { + private log: Logger; + private bridgeManager: SATPBridgesManager; + private logRepository: ILocalLogRepository; + + constructor( + bridgesManager: SATPBridgesManager, + localLog: ILocalLogRepository, + ) { + this.log = LoggerProvider.getOrCreate({ label: "Stage2RollbackStrategy" }); + this.bridgeManager = bridgesManager; + this.logRepository = localLog; + } + + async execute(session: SATPSession): Promise { + const fnTag = "Stage2RollbackStrategy#execute"; + this.log.info(`${fnTag} Executing rollback for Stage 2`); + + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + + const isClient = session.hasClientSessionData(); + const network = isClient + ? sessionData.senderGatewayNetworkId + : sessionData.recipientGatewayNetworkId; + + if (isClient && !network) { + throw new Error( + `${fnTag}: Unable to determine client network from session data.`, + ); + } + this.log.info(`${fnTag} network: ${network}`); + + const bridge = this.bridgeManager.getBridge(network); + if (!bridge) { + throw new Error(`${fnTag}: No bridge found for network: ${network}`); + } + + const rollbackState = create(RollbackStateSchema, { + sessionId: session.getSessionId(), + currentStage: "Stage2", + stepsRemaining: isClient ? 1 : 0, + rollbackLogEntries: [], + estimatedTimeToCompletion: "", + status: "IN_PROGRESS", + details: "", + }); + + try { + if (isClient) { + // Client-side: Unlock the asset to revert the lock action + const assetId = sessionData.senderAsset?.tokenId; + const amount = sessionData.senderAsset?.amount; + + if (!assetId) { + throw new Error(`${fnTag}: Asset ID is undefined`); + } + + if (amount === undefined || amount === null) { + throw new Error(`${fnTag}: Amount is missing`); + } + + this.log.info( + `${fnTag} Unlocking Asset ID: ${assetId}, Amount: ${amount}`, + ); + + await bridge.unlockAsset(assetId, Number(amount)); + + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: session.getSessionId(), + stage: "Stage2", + timestamp: new Date().toISOString(), + action: "UNLOCK_ASSET", + status: "SUCCESS", + details: "", + }); + + rollbackState.stepsRemaining = 1; + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.stepsRemaining = 1; + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.stepsRemaining = 1; + rollbackState.status = "COMPLETED"; + rollbackState.estimatedTimeToCompletion = ""; + rollbackState.details = "Rollback of Stage 2 completed successfully"; + } else { + // Server-side: + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: session.getSessionId(), + stage: "Stage2", + timestamp: new Date().toISOString(), + action: "NO_ACTION_REQUIRED", + status: "SUCCESS", + details: "No rollback action required for server in Stage 2.", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "COMPLETED"; + rollbackState.details = + "No rollback action required for server in Stage 2."; + } + + this.log.info( + `${fnTag} Successfully rolled back Stage 2 for session ${session.getSessionId}`, + ); + // todo: add logs for rollback + //await this.logRepository.create(logEntry); + return rollbackState; + } catch (error) { + this.log.error(`${fnTag} Failed to rollback Stage 2: ${error}`); + + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: session.getSessionId(), + stage: "Stage2", + timestamp: new Date().toISOString(), + action: isClient ? "UNLOCK_ASSET" : "NO_ACTION_REQUIRED", + status: "FAILED", + details: `Rollback of Stage 2 failed: ${error}`, + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "FAILED"; + rollbackState.details = `Rollback of Stage 2 failed: ${error}`; + + return rollbackState; + } + } + + async cleanup( + session: SATPSession, + state: RollbackState, + ): Promise { + const fnTag = "Stage2RollbackStrategy#cleanup"; + this.log.info(`${fnTag} Cleaning up after Stage 2 rollback`); + + if (!session) { + this.log.error(`${fnTag} Session not found`); + return state; + } + + try { + // TODO: Implement Stage 2 specific cleanup logic + + //state.currentStage = "Stage2"; + // TODO: Update other state properties as needed + + return state; + } catch (error) { + this.log.error(`${fnTag} Cleanup failed: ${error}`); + return state; + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage3-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage3-rollback-strategy.ts new file mode 100644 index 0000000000..8689880a52 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/rollback/stage3-rollback-strategy.ts @@ -0,0 +1,163 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { RollbackStrategy } from "./rollback-strategy-factory"; +import { + RollbackLogEntrySchema, + RollbackState, + RollbackStateSchema, +} from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgesManager } from "../../../gol/satp-bridges-manager"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; +import { create } from "@bufbuild/protobuf"; + +export class Stage3RollbackStrategy implements RollbackStrategy { + private log: Logger; + private bridgeManager: SATPBridgesManager; + private logRepository: ILocalLogRepository; + + constructor( + bridgesManager: SATPBridgesManager, + localLog: ILocalLogRepository, + ) { + this.log = LoggerProvider.getOrCreate({ label: "Stage3RollbackStrategy" }); + this.bridgeManager = bridgesManager; + this.logRepository = localLog; + } + + async execute(session: SATPSession): Promise { + const fnTag = "Stage3RollbackStrategy#execute"; + this.log.info(`${fnTag} Executing rollback for Stage 3`); + + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + + const isClient = session.hasClientSessionData(); + const network = isClient + ? sessionData.senderGatewayNetworkId + : sessionData.recipientGatewayNetworkId; + + if (isClient && !network) { + throw new Error( + `${fnTag}: Unable to determine client network from session data.`, + ); + } + this.log.info(`${fnTag} network: ${network}`); + + const bridge = this.bridgeManager.getBridge(network); + if (!bridge) { + throw new Error(`${fnTag}: No bridge found for network: ${network}`); + } + + const rollbackState = create(RollbackStateSchema, { + sessionId: session.getSessionId(), + currentStage: "Stage3", + stepsRemaining: isClient ? 1 : 1, + rollbackLogEntries: [], + estimatedTimeToCompletion: "", + status: "IN_PROGRESS", + details: "", + }); + + try { + if (isClient) { + // Client-side: + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: session.getSessionId(), + stage: "Stage3", + timestamp: new Date().toISOString(), + action: "NO_ACTION_REQUIRED", + status: "SUCCESS", + details: "No rollback action required for client in Stage 3.", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + } + + if (!isClient) { + // Server-side: + const assetId = sessionData.receiverAsset?.tokenId; + const amount = sessionData.receiverAsset?.amount; + + if (!assetId) { + throw new Error(`${fnTag}: Sender Asset ID is undefined`); + } + + if (amount === undefined || amount === null) { + throw new Error(`${fnTag}: Amount is missing`); + } + + this.log.info(`${fnTag} Burning Asset ID at Destination: ${assetId}`); + + await bridge.burnAsset(assetId, Number(amount)); + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: session.getSessionId(), + stage: "Stage3", + timestamp: new Date().toISOString(), + action: "BURN_ASSET_DESTINATION", + status: "SUCCESS", + details: "", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + } + + rollbackState.status = "COMPLETED"; + rollbackState.estimatedTimeToCompletion = ""; + rollbackState.details = "Rollback of Stage 3 completed successfully"; + + this.log.info( + `${fnTag} Successfully rolled back Stage 3 for session ${session.getSessionId()}`, + ); + // todo: add logs for rollback + //await this.logRepository.create(logEntry); + return rollbackState; + } catch (error) { + this.log.error(`${fnTag} Failed to rollback Stage 3: ${error}`); + + const rollbackLogEntry = create(RollbackLogEntrySchema, { + sessionId: session.getSessionId(), + stage: "Stage3", + timestamp: new Date().toISOString(), + action: isClient ? "BURN_ASSET_ORIGIN" : "BURN_ASSET_DESTINATION", + status: "FAILED", + details: `Rollback of Stage 3 failed: ${error}`, + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "FAILED"; + rollbackState.details = `Rollback of Stage 3 failed: ${error}`; + + return rollbackState; + } + } + + async cleanup( + session: SATPSession, + state: RollbackState, + ): Promise { + const fnTag = "Stage3RollbackStrategy#cleanup"; + this.log.info(`${fnTag} Cleaning up after Stage 3 rollback`); + + if (!session) { + this.log.error(`${fnTag} Session not found`); + return state; + } + + try { + // TODO: Implement Stage 3 specific cleanup logic + + //state.currentStage = ""; + // TODO: Update other state properties as needed + + return state; + } catch (error) { + this.log.error(`${fnTag} Cleanup failed: ${error}`); + return state; + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/server-service.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/server-service.ts new file mode 100644 index 0000000000..a91a3102d9 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/crash-management/server-service.ts @@ -0,0 +1,165 @@ +import { + RecoverMessage, + RecoverUpdateMessage, + RecoverSuccessMessage, + RollbackMessage, + RollbackAckMessage, + RecoverUpdateMessageSchema, + RollbackAckMessageSchema, + RecoverSuccessMessageResponse, + RecoverSuccessMessageResponseSchema, +} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPSession } from "../satp-session"; +import { ILocalLogRepository } from "../../repository/interfaces/repository"; +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { RollbackStrategyFactory } from "./rollback/rollback-strategy-factory"; +import { SATPBridgesManager } from "../../gol/satp-bridges-manager"; +import { create } from "@bufbuild/protobuf"; +import { SATPLogger } from "../../logging"; +import { stringify as safeStableStringify } from "safe-stable-stringify"; + +export class CrashRecoveryServerService { + private readonly log: Logger; + + constructor( + private readonly bridgesManager: SATPBridgesManager, + private readonly logRepository: ILocalLogRepository, + private readonly sessions: Map, + private readonly dbLogger: SATPLogger, + loggerLabel: string = "CrashRecoveryServerService", + ) { + this.log = LoggerProvider.getOrCreate({ label: loggerLabel }); + this.log.trace(`Initialized ${CrashRecoveryServerService.name}`); + } + + public async handleRecover( + req: RecoverMessage, + ): Promise { + const fnTag = `${CrashRecoveryServerService.name}#handleRecover`; + + try { + this.log.debug(`${fnTag} - Handling RecoverMessage:`, req); + + const session = this.sessions.get(req.sessionId); + const sessionData = session?.getServerSessionData(); + if (!session) { + this.log.error(`${fnTag} - Session not found: ${req.sessionId}`); + throw new Error(`Session not found: ${req.sessionId}`); + } + + if (!sessionData) { + this.log.error(`${fnTag} - SessionData not found: ${req.sessionId}`); + throw new Error(`Error: ${req.sessionId}`); + } + + const recoveredLogs = await this.logRepository.fetchLogsFromSequence( + req.sessionId, + Number(session.getServerSessionData().lastSequenceNumber), + ); + + const recoverUpdateMessage = create(RecoverUpdateMessageSchema, { + sessionId: req.sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", + hashRecoverMessage: "", + recoveredLogs: recoveredLogs, + senderSignature: "", + }); + + await this.dbLogger.persistLogEntry({ + sessionID: sessionData.id, + type: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", + operation: "done", + data: safeStableStringify(sessionData), + sequenceNumber: Number(sessionData.lastSequenceNumber), + }); + this.log.debug( + `${fnTag} - RecoverUpdateMessage created:`, + recoverUpdateMessage, + ); + + return recoverUpdateMessage; + } catch (error) { + this.log.error(`${fnTag} - Error handling RecoverMessage: ${error}`); + throw error; + } + } + + public async handleRecoverSuccess( + req: RecoverSuccessMessage, + ): Promise { + const fnTag = `${CrashRecoveryServerService.name}#handleRecoverSuccess`; + + try { + this.log.debug(`${fnTag} - Handling RecoverSuccessMessage:`, req); + + const session = this.sessions.get(req.sessionId); + if (!session) { + this.log.error(`${fnTag} - Session not found: ${req.sessionId}`); + throw new Error(`Session not found: ${req.sessionId}`); + } + + const recoverSuccessMessageResponse = create( + RecoverSuccessMessageResponseSchema, + { + sessionId: req.sessionId, + received: true, + senderSignature: "", + }, + ); + //await this.dbLogger.persistLogEntry({}); + this.log.info(`${fnTag} - Session marked as recovered: ${req.sessionId}`); + return recoverSuccessMessageResponse; + } catch (error) { + this.log.error( + `${fnTag} - Error handling RecoverSuccessMessage: ${error}`, + ); + throw error; + } + } + + public async handleRollback( + req: RollbackMessage, + ): Promise { + const fnTag = `${CrashRecoveryServerService.name}#handleRollback`; + + try { + this.log.debug(`${fnTag} - Handling RollbackMessage:`, req); + + const session = this.sessions.get(req.sessionId); + if (!session) { + this.log.error(`${fnTag} - Session not found: ${req.sessionId}`); + throw new Error(`Session not found: ${req.sessionId}`); + } + + const factory = new RollbackStrategyFactory( + this.bridgesManager, + this.logRepository, + //this.dbLogger, + ); + + const strategy = factory.createStrategy(session); + + const rollbackState = await strategy.execute(session); + + const rollbackAckMessage = create(RollbackAckMessageSchema, { + sessionId: req.sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:rollback-ack-msg", + success: rollbackState.status === "COMPLETED", + actionsPerformed: rollbackState.rollbackLogEntries.map( + (entry) => entry.action, + ), + proofs: [], + senderSignature: "", + }); + //await this.dbLogger.persistLogEntry({}); + this.log.info( + `${fnTag} - Rollback performed for session: ${req.sessionId}`, + ); + + return rollbackAckMessage; + } catch (error) { + this.log.error(`${fnTag} - Error handling RollbackMessage: ${error}`); + throw error; + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts index dab36d5960..350ab60e8f 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts @@ -143,6 +143,29 @@ export class SATPSession { return this.clientSessionData; } + public static fromSessionData(sessionData: SessionData): SATPSession { + // Determine if it's a client or server session based on the presence of gateway pubkeys + const isServer = sessionData.serverGatewayPubkey !== ""; + const isClient = sessionData.clientGatewayPubkey !== ""; + + const session = new SATPSession({ + contextID: sessionData.transferContextId, + sessionID: sessionData.id, + server: isServer, + client: isClient, + }); + + // Assign the sessionData to the appropriate property + if (isServer) { + session.serverSessionData = sessionData; + } + if (isClient) { + session.clientSessionData = sessionData; + } + + return session; + } + public createSessionData( type: SessionType, sessionId: string, diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/types.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/types.ts index b7613010c8..2e1a23803c 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/types.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/types.ts @@ -107,7 +107,7 @@ export function isOfType( } export interface LocalLog { - sessionID: string; + sessionId: string; type: string; key: string; operation: string; @@ -127,3 +127,11 @@ export interface SATPBridgeConfig { logLevel?: LogLevelDesc; } export { SATPServiceInstance }; + +export enum CrashStatus { + IN_RECOVERY = "IN_RECOVERY", + RECOVERED = "RECOVERED", + NO_CRASH = "NO_CRASH", + ROLLBACK = "ROLLBACK_REQUIRED", + ERROR = "ERROR", +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/SATPWrapperContract.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/SATPWrapperContract.ts index 74b5cfc08c..4ae4af7732 100755 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/SATPWrapperContract.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/SATPWrapperContract.ts @@ -1,12 +1,12 @@ -import BN from 'bn.js'; -import BigNumber from 'bignumber.js'; +import BN from "bn.js"; +import BigNumber from "bignumber.js"; import { PromiEvent, TransactionReceipt, EventResponse, EventData, Web3ContractContext, -} from 'ethereum-abi-types-generator'; +} from "ethereum-abi-types-generator"; export interface CallOptions { from?: string; @@ -31,12 +31,12 @@ export interface MethodPayableReturnContext { send(options: SendOptions): PromiEvent; send( options: SendOptions, - callback: (error: Error, result: any) => void + callback: (error: Error, result: any) => void, ): PromiEvent; estimateGas(options: EstimateGasOptions): Promise; estimateGas( options: EstimateGasOptions, - callback: (error: Error, result: any) => void + callback: (error: Error, result: any) => void, ): Promise; encodeABI(): string; } @@ -46,7 +46,7 @@ export interface MethodConstantReturnContext { call(options: CallOptions): Promise; call( options: CallOptions, - callback: (error: Error, result: TCallReturn) => void + callback: (error: Error, result: TCallReturn) => void, ): Promise; encodeABI(): string; } @@ -60,60 +60,60 @@ export type ContractContext = Web3ContractContext< SATPWrapperContractEvents >; export type SATPWrapperContractEvents = - | 'Assign' - | 'Burn' - | 'Changed' - | 'Lock' - | 'Mint' - | 'OwnershipTransferred' - | 'Unlock' - | 'Unwrap' - | 'Wrap'; + | "Assign" + | "Burn" + | "Changed" + | "Lock" + | "Mint" + | "OwnershipTransferred" + | "Unlock" + | "Unwrap" + | "Wrap"; export interface SATPWrapperContractEventsContext { Assign( parameters: { filter?: { tokenId?: string | string[] }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Burn( parameters: { filter?: { tokenId?: string | string[] }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Changed( parameters: { filter?: { id?: string | string[] }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Lock( parameters: { filter?: { tokenId?: string | string[] }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Mint( parameters: { filter?: { tokenId?: string | string[] }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; OwnershipTransferred( parameters: { @@ -122,57 +122,57 @@ export interface SATPWrapperContractEventsContext { newOwner?: string | string[]; }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Unlock( parameters: { filter?: { tokenId?: string | string[] }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Unwrap( parameters: { filter?: { tokenId?: string | string[] }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Wrap( parameters: { filter?: { tokenId?: string | string[] }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; } export type SATPWrapperContractMethodNames = - | 'new' - | 'assign' - | 'bridge_address' - | 'burn' - | 'getAllAssetsIDs' - | 'getToken' - | 'lock' - | 'mint' - | 'owner' - | 'renounceOwnership' - | 'tokens' - | 'tokensInteractions' - | 'transferOwnership' - | 'unlock' - | 'unwrap' - | 'wrap' - | 'wrap'; + | "new" + | "assign" + | "bridge_address" + | "burn" + | "getAllAssetsIDs" + | "getToken" + | "lock" + | "mint" + | "owner" + | "renounceOwnership" + | "tokens" + | "tokensInteractions" + | "transferOwnership" + | "unlock" + | "unwrap" + | "wrap" + | "wrap"; export interface TokenResponse { contractAddress: string; tokenType: string; @@ -243,7 +243,7 @@ export interface SATPWrapperContract { * Type: constructor * @param _bridge_address Type: address, Indexed: false */ - 'new'(_bridge_address: string): MethodReturnContext; + "new"(_bridge_address: string): MethodReturnContext; /** * Payable: false * Constant: false @@ -256,7 +256,7 @@ export interface SATPWrapperContract { assign( tokenId: string, receiver_account: string, - amount: string + amount: string, ): MethodReturnContext; /** * Payable: false @@ -339,7 +339,7 @@ export interface SATPWrapperContract { */ tokensInteractions( parameter0: string, - parameter1: string | number + parameter1: string | number, ): MethodConstantReturnContext; /** * Payable: false @@ -382,7 +382,7 @@ export interface SATPWrapperContract { tokenType: string | number, tokenId: string, owner: string, - interactions: InteractionsRequest[] + interactions: InteractionsRequest[], ): MethodReturnContext; /** * Payable: false @@ -398,6 +398,6 @@ export interface SATPWrapperContract { contractAddress: string, tokenType: string | number, tokenId: string, - owner: string + owner: string, ): MethodReturnContext; } diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_connect.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_connect.ts index 5c58c6797e..d5c0f27c49 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_connect.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_connect.ts @@ -3,9 +3,10 @@ /* eslint-disable */ // @ts-nocheck +import { RecoverMessage, RecoverSuccessMessage, RecoverSuccessMessageResponse, RecoverUpdateMessage, RollbackAckMessage, RollbackMessage } from "./crash_recovery_pb.js"; +import { MethodKind } from "@bufbuild/protobuf"; + /** - * TODO: Rollback and crash-recovery related - * * util RPCs * * @generated from service cacti.satp.v02.crash.CrashRecovery @@ -13,6 +14,35 @@ export const CrashRecovery = { typeName: "cacti.satp.v02.crash.CrashRecovery", methods: { + /** + * step RPCs + * + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RecoverV2Message + */ + recoverV2Message: { + name: "RecoverV2Message", + I: RecoverMessage, + O: RecoverUpdateMessage, + kind: MethodKind.Unary, + }, + /** + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RecoverV2SuccessMessage + */ + recoverV2SuccessMessage: { + name: "RecoverV2SuccessMessage", + I: RecoverSuccessMessage, + O: RecoverSuccessMessageResponse, + kind: MethodKind.Unary, + }, + /** + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RollbackV2Message + */ + rollbackV2Message: { + name: "RollbackV2Message", + I: RollbackMessage, + O: RollbackAckMessage, + kind: MethodKind.Unary, + }, } } as const; diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts index 21d6c15775..42892a53bf 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts @@ -2,24 +2,437 @@ // @generated from file cacti/satp/v02/crash_recovery.proto (package cacti.satp.v02.crash, syntax proto3) /* eslint-disable */ -import type { GenFile, GenService } from "@bufbuild/protobuf/codegenv1"; -import { fileDesc, serviceDesc } from "@bufbuild/protobuf/codegenv1"; +import type { GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv1"; +import { fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv1"; import { file_google_protobuf_empty } from "@bufbuild/protobuf/wkt"; +import type { Message } from "@bufbuild/protobuf"; /** * Describes the file cacti/satp/v02/crash_recovery.proto. */ export const file_cacti_satp_v02_crash_recovery: GenFile = /*@__PURE__*/ - fileDesc("CiNjYWN0aS9zYXRwL3YwMi9jcmFzaF9yZWNvdmVyeS5wcm90bxIUY2FjdGkuc2F0cC52MDIuY3Jhc2gyDwoNQ3Jhc2hSZWNvdmVyeWIGcHJvdG8z", [file_google_protobuf_empty]); + fileDesc("CiNjYWN0aS9zYXRwL3YwMi9jcmFzaF9yZWNvdmVyeS5wcm90bxIUY2FjdGkuc2F0cC52MDIuY3Jhc2gi0wEKDlJlY292ZXJNZXNzYWdlEhIKCnNlc3Npb25faWQYASABKAkSFAoMbWVzc2FnZV90eXBlGAIgASgJEhIKCnNhdHBfcGhhc2UYAyABKAkSFwoPc2VxdWVuY2VfbnVtYmVyGAQgASgFEhEKCWlzX2JhY2t1cBgFIAEoCBIfChduZXdfaWRlbnRpdHlfcHVibGljX2tleRgGIAEoCRIcChRsYXN0X2VudHJ5X3RpbWVzdGFtcBgHIAEoAxIYChBzZW5kZXJfc2lnbmF0dXJlGAggASgJIrABChRSZWNvdmVyVXBkYXRlTWVzc2FnZRISCgpzZXNzaW9uX2lkGAEgASgJEhQKDG1lc3NhZ2VfdHlwZRgCIAEoCRIcChRoYXNoX3JlY292ZXJfbWVzc2FnZRgDIAEoCRI2Cg5yZWNvdmVyZWRfbG9ncxgEIAMoCzIeLmNhY3RpLnNhdHAudjAyLmNyYXNoLkxvY2FsTG9nEhgKEHNlbmRlcl9zaWduYXR1cmUYBSABKAkiqgEKFVJlY292ZXJTdWNjZXNzTWVzc2FnZRISCgpzZXNzaW9uX2lkGAEgASgJEhQKDG1lc3NhZ2VfdHlwZRgCIAEoCRIjChtoYXNoX3JlY292ZXJfdXBkYXRlX21lc3NhZ2UYAyABKAkSDwoHc3VjY2VzcxgEIAEoCBIXCg9lbnRyaWVzX2NoYW5nZWQYBSADKAkSGAoQc2VuZGVyX3NpZ25hdHVyZRgGIAEoCSJfCh1SZWNvdmVyU3VjY2Vzc01lc3NhZ2VSZXNwb25zZRISCgpzZXNzaW9uX2lkGAEgASgJEhAKCHJlY2VpdmVkGAIgASgIEhgKEHNlbmRlcl9zaWduYXR1cmUYAyABKAkikQEKD1JvbGxiYWNrTWVzc2FnZRISCgpzZXNzaW9uX2lkGAEgASgJEhQKDG1lc3NhZ2VfdHlwZRgCIAEoCRIPCgdzdWNjZXNzGAMgASgIEhkKEWFjdGlvbnNfcGVyZm9ybWVkGAQgAygJEg4KBnByb29mcxgFIAMoCRIYChBzZW5kZXJfc2lnbmF0dXJlGAYgASgJIpQBChJSb2xsYmFja0Fja01lc3NhZ2USEgoKc2Vzc2lvbl9pZBgBIAEoCRIUCgxtZXNzYWdlX3R5cGUYAiABKAkSDwoHc3VjY2VzcxgDIAEoCBIZChFhY3Rpb25zX3BlcmZvcm1lZBgEIAMoCRIOCgZwcm9vZnMYBSADKAkSGAoQc2VuZGVyX3NpZ25hdHVyZRgGIAEoCSKGAQoITG9jYWxMb2cSEgoKc2Vzc2lvbl9pZBgBIAEoCRIMCgR0eXBlGAIgASgJEgsKA2tleRgDIAEoCRIRCglvcGVyYXRpb24YBCABKAkSEQoJdGltZXN0YW1wGAUgASgJEgwKBGRhdGEYBiABKAkSFwoPc2VxdWVuY2VfbnVtYmVyGAcgASgFInkKEFJvbGxiYWNrTG9nRW50cnkSEgoKc2Vzc2lvbl9pZBgBIAEoCRINCgVzdGFnZRgCIAEoCRIRCgl0aW1lc3RhbXAYAyABKAkSDgoGYWN0aW9uGAQgASgJEg4KBnN0YXR1cxgFIAEoCRIPCgdkZXRhaWxzGAYgASgJIuABCg1Sb2xsYmFja1N0YXRlEhIKCnNlc3Npb25faWQYASABKAkSFQoNY3VycmVudF9zdGFnZRgCIAEoCRIXCg9zdGVwc19yZW1haW5pbmcYAyABKAUSRAoUcm9sbGJhY2tfbG9nX2VudHJpZXMYBCADKAsyJi5jYWN0aS5zYXRwLnYwMi5jcmFzaC5Sb2xsYmFja0xvZ0VudHJ5EiQKHGVzdGltYXRlZF90aW1lX3RvX2NvbXBsZXRpb24YBSABKAkSDgoGc3RhdHVzGAYgASgJEg8KB2RldGFpbHMYByABKAky2AIKDUNyYXNoUmVjb3ZlcnkSZAoQUmVjb3ZlclYyTWVzc2FnZRIkLmNhY3RpLnNhdHAudjAyLmNyYXNoLlJlY292ZXJNZXNzYWdlGiouY2FjdGkuc2F0cC52MDIuY3Jhc2guUmVjb3ZlclVwZGF0ZU1lc3NhZ2USewoXUmVjb3ZlclYyU3VjY2Vzc01lc3NhZ2USKy5jYWN0aS5zYXRwLnYwMi5jcmFzaC5SZWNvdmVyU3VjY2Vzc01lc3NhZ2UaMy5jYWN0aS5zYXRwLnYwMi5jcmFzaC5SZWNvdmVyU3VjY2Vzc01lc3NhZ2VSZXNwb25zZRJkChFSb2xsYmFja1YyTWVzc2FnZRIlLmNhY3RpLnNhdHAudjAyLmNyYXNoLlJvbGxiYWNrTWVzc2FnZRooLmNhY3RpLnNhdHAudjAyLmNyYXNoLlJvbGxiYWNrQWNrTWVzc2FnZWIGcHJvdG8z", [file_google_protobuf_empty]); + +/** + * @generated from message cacti.satp.v02.crash.RecoverMessage + */ +export type RecoverMessage = Message<"cacti.satp.v02.crash.RecoverMessage"> & { + /** + * @generated from field: string session_id = 1; + */ + sessionId: string; + + /** + * @generated from field: string message_type = 2; + */ + messageType: string; + + /** + * @generated from field: string satp_phase = 3; + */ + satpPhase: string; + + /** + * @generated from field: int32 sequence_number = 4; + */ + sequenceNumber: number; + + /** + * @generated from field: bool is_backup = 5; + */ + isBackup: boolean; + + /** + * @generated from field: string new_identity_public_key = 6; + */ + newIdentityPublicKey: string; + + /** + * @generated from field: int64 last_entry_timestamp = 7; + */ + lastEntryTimestamp: bigint; + + /** + * @generated from field: string sender_signature = 8; + */ + senderSignature: string; +}; + +/** + * Describes the message cacti.satp.v02.crash.RecoverMessage. + * Use `create(RecoverMessageSchema)` to create a new message. + */ +export const RecoverMessageSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_cacti_satp_v02_crash_recovery, 0); + +/** + * @generated from message cacti.satp.v02.crash.RecoverUpdateMessage + */ +export type RecoverUpdateMessage = Message<"cacti.satp.v02.crash.RecoverUpdateMessage"> & { + /** + * @generated from field: string session_id = 1; + */ + sessionId: string; + + /** + * @generated from field: string message_type = 2; + */ + messageType: string; + + /** + * @generated from field: string hash_recover_message = 3; + */ + hashRecoverMessage: string; + + /** + * @generated from field: repeated cacti.satp.v02.crash.LocalLog recovered_logs = 4; + */ + recoveredLogs: LocalLog[]; + + /** + * @generated from field: string sender_signature = 5; + */ + senderSignature: string; +}; + +/** + * Describes the message cacti.satp.v02.crash.RecoverUpdateMessage. + * Use `create(RecoverUpdateMessageSchema)` to create a new message. + */ +export const RecoverUpdateMessageSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_cacti_satp_v02_crash_recovery, 1); + +/** + * @generated from message cacti.satp.v02.crash.RecoverSuccessMessage + */ +export type RecoverSuccessMessage = Message<"cacti.satp.v02.crash.RecoverSuccessMessage"> & { + /** + * @generated from field: string session_id = 1; + */ + sessionId: string; + + /** + * @generated from field: string message_type = 2; + */ + messageType: string; + + /** + * @generated from field: string hash_recover_update_message = 3; + */ + hashRecoverUpdateMessage: string; + + /** + * @generated from field: bool success = 4; + */ + success: boolean; + + /** + * @generated from field: repeated string entries_changed = 5; + */ + entriesChanged: string[]; + + /** + * @generated from field: string sender_signature = 6; + */ + senderSignature: string; +}; + +/** + * Describes the message cacti.satp.v02.crash.RecoverSuccessMessage. + * Use `create(RecoverSuccessMessageSchema)` to create a new message. + */ +export const RecoverSuccessMessageSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_cacti_satp_v02_crash_recovery, 2); + +/** + * @generated from message cacti.satp.v02.crash.RecoverSuccessMessageResponse + */ +export type RecoverSuccessMessageResponse = Message<"cacti.satp.v02.crash.RecoverSuccessMessageResponse"> & { + /** + * @generated from field: string session_id = 1; + */ + sessionId: string; + + /** + * @generated from field: bool received = 2; + */ + received: boolean; + + /** + * @generated from field: string sender_signature = 3; + */ + senderSignature: string; +}; + +/** + * Describes the message cacti.satp.v02.crash.RecoverSuccessMessageResponse. + * Use `create(RecoverSuccessMessageResponseSchema)` to create a new message. + */ +export const RecoverSuccessMessageResponseSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_cacti_satp_v02_crash_recovery, 3); + +/** + * @generated from message cacti.satp.v02.crash.RollbackMessage + */ +export type RollbackMessage = Message<"cacti.satp.v02.crash.RollbackMessage"> & { + /** + * @generated from field: string session_id = 1; + */ + sessionId: string; + + /** + * @generated from field: string message_type = 2; + */ + messageType: string; + + /** + * @generated from field: bool success = 3; + */ + success: boolean; + + /** + * @generated from field: repeated string actions_performed = 4; + */ + actionsPerformed: string[]; + + /** + * @generated from field: repeated string proofs = 5; + */ + proofs: string[]; + + /** + * @generated from field: string sender_signature = 6; + */ + senderSignature: string; +}; + +/** + * Describes the message cacti.satp.v02.crash.RollbackMessage. + * Use `create(RollbackMessageSchema)` to create a new message. + */ +export const RollbackMessageSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_cacti_satp_v02_crash_recovery, 4); + +/** + * @generated from message cacti.satp.v02.crash.RollbackAckMessage + */ +export type RollbackAckMessage = Message<"cacti.satp.v02.crash.RollbackAckMessage"> & { + /** + * @generated from field: string session_id = 1; + */ + sessionId: string; + + /** + * @generated from field: string message_type = 2; + */ + messageType: string; + + /** + * @generated from field: bool success = 3; + */ + success: boolean; + + /** + * @generated from field: repeated string actions_performed = 4; + */ + actionsPerformed: string[]; + + /** + * @generated from field: repeated string proofs = 5; + */ + proofs: string[]; + + /** + * @generated from field: string sender_signature = 6; + */ + senderSignature: string; +}; + +/** + * Describes the message cacti.satp.v02.crash.RollbackAckMessage. + * Use `create(RollbackAckMessageSchema)` to create a new message. + */ +export const RollbackAckMessageSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_cacti_satp_v02_crash_recovery, 5); + +/** + * @generated from message cacti.satp.v02.crash.LocalLog + */ +export type LocalLog = Message<"cacti.satp.v02.crash.LocalLog"> & { + /** + * @generated from field: string session_id = 1; + */ + sessionId: string; + + /** + * @generated from field: string type = 2; + */ + type: string; + + /** + * @generated from field: string key = 3; + */ + key: string; + + /** + * @generated from field: string operation = 4; + */ + operation: string; + + /** + * @generated from field: string timestamp = 5; + */ + timestamp: string; + + /** + * @generated from field: string data = 6; + */ + data: string; + + /** + * @generated from field: int32 sequence_number = 7; + */ + sequenceNumber: number; +}; + +/** + * Describes the message cacti.satp.v02.crash.LocalLog. + * Use `create(LocalLogSchema)` to create a new message. + */ +export const LocalLogSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_cacti_satp_v02_crash_recovery, 6); + +/** + * @generated from message cacti.satp.v02.crash.RollbackLogEntry + */ +export type RollbackLogEntry = Message<"cacti.satp.v02.crash.RollbackLogEntry"> & { + /** + * @generated from field: string session_id = 1; + */ + sessionId: string; + + /** + * @generated from field: string stage = 2; + */ + stage: string; + + /** + * @generated from field: string timestamp = 3; + */ + timestamp: string; + + /** + * action performed during rollback + * + * @generated from field: string action = 4; + */ + action: string; + + /** + * status of rollback (e.g., SUCCESS, FAILED) + * + * @generated from field: string status = 5; + */ + status: string; + + /** + * Additional details or metadata about the rollback + * + * @generated from field: string details = 6; + */ + details: string; +}; + +/** + * Describes the message cacti.satp.v02.crash.RollbackLogEntry. + * Use `create(RollbackLogEntrySchema)` to create a new message. + */ +export const RollbackLogEntrySchema: GenMessage = /*@__PURE__*/ + messageDesc(file_cacti_satp_v02_crash_recovery, 7); + +/** + * @generated from message cacti.satp.v02.crash.RollbackState + */ +export type RollbackState = Message<"cacti.satp.v02.crash.RollbackState"> & { + /** + * @generated from field: string session_id = 1; + */ + sessionId: string; + + /** + * @generated from field: string current_stage = 2; + */ + currentStage: string; + + /** + * @generated from field: int32 steps_remaining = 3; + */ + stepsRemaining: number; + + /** + * @generated from field: repeated cacti.satp.v02.crash.RollbackLogEntry rollback_log_entries = 4; + */ + rollbackLogEntries: RollbackLogEntry[]; + + /** + * @generated from field: string estimated_time_to_completion = 5; + */ + estimatedTimeToCompletion: string; + + /** + * Overall status (e.g., IN_PROGRESS, COMPLETED, FAILED) + * + * @generated from field: string status = 6; + */ + status: string; + + /** + * Additional metadata or information + * + * @generated from field: string details = 7; + */ + details: string; +}; + +/** + * Describes the message cacti.satp.v02.crash.RollbackState. + * Use `create(RollbackStateSchema)` to create a new message. + */ +export const RollbackStateSchema: GenMessage = /*@__PURE__*/ + messageDesc(file_cacti_satp_v02_crash_recovery, 8); /** - * TODO: Rollback and crash-recovery related - * * util RPCs * * @generated from service cacti.satp.v02.crash.CrashRecovery */ export const CrashRecovery: GenService<{ + /** + * step RPCs + * + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RecoverV2Message + */ + recoverV2Message: { + methodKind: "unary"; + input: typeof RecoverMessageSchema; + output: typeof RecoverUpdateMessageSchema; + }, + /** + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RecoverV2SuccessMessage + */ + recoverV2SuccessMessage: { + methodKind: "unary"; + input: typeof RecoverSuccessMessageSchema; + output: typeof RecoverSuccessMessageResponseSchema; + }, + /** + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RollbackV2Message + */ + rollbackV2Message: { + methodKind: "unary"; + input: typeof RollbackMessageSchema; + output: typeof RollbackAckMessageSchema; + }, }> = /*@__PURE__*/ serviceDesc(file_cacti_satp_v02_crash_recovery, 0); diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/crash-manager.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/crash-manager.ts new file mode 100644 index 0000000000..2ddf881e7f --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/crash-manager.ts @@ -0,0 +1,621 @@ +import { + Logger, + LoggerProvider, + Checks, + LogLevelDesc, + JsObjectSigner, +} from "@hyperledger/cactus-common"; +import { + SessionData, + State, +} from "../generated/proto/cacti/satp/v02/common/session_pb"; +import { CrashRecoveryHandler } from "../core/crash-management/crash-handler"; +import { SATPSession } from "../core/satp-session"; +import { + RollbackStrategy, + RollbackStrategyFactory, +} from "../core/crash-management/rollback/rollback-strategy-factory"; +import { + ILocalLogRepository, + IRemoteLogRepository, +} from "../repository/interfaces/repository"; +import { + RecoverUpdateMessage, + RollbackState, + RollbackAckMessage, +} from "../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SessionType } from "../core/session-utils"; +import { SATPBridgesManager } from "./satp-bridges-manager"; +import cron, { ScheduledTask } from "node-cron"; +import { CrashRecoveryServerService } from "../core/crash-management/server-service"; +import { CrashRecoveryClientService } from "../core/crash-management/client-service"; +import { GatewayOrchestrator } from "./gateway-orchestrator"; +import { Client as PromiseConnectClient } from "@connectrpc/connect"; +import { GatewayIdentity, SupportedChain } from "../core/types"; +import { CrashRecovery } from "../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPHandler } from "../types/satp-protocol"; +import { ISATPLoggerConfig, SATPLogger } from "../logging"; +import { CrashStatus } from "../core/types"; + +export interface ICrashRecoveryManagerOptions { + logLevel?: LogLevelDesc; + localRepository: ILocalLogRepository; + remoteRepository: IRemoteLogRepository; + instanceId: string; + bridgeConfig: SATPBridgesManager; + orchestrator: GatewayOrchestrator; + signer: JsObjectSigner; + pubKey: string; +} + +export class CrashRecoveryManager { + public static readonly CLASS_NAME = "CrashRecoveryManager"; + private readonly log: Logger; + private readonly instanceId: string; + private sessions: Map; + private crashRecoveryHandler: CrashRecoveryHandler; + private factory: RollbackStrategyFactory; + public localRepository: ILocalLogRepository; + public remoteRepository: IRemoteLogRepository; + private crashDetectionTask!: ScheduledTask; + private crashRecoveryServerService: CrashRecoveryServerService; + private crashRecoveryClientService: CrashRecoveryClientService; + private orchestrator: GatewayOrchestrator; + private gatewaysPubKeys: Map = new Map(); + private readonly bridgesManager: SATPBridgesManager; + public dbLogger: SATPLogger; + private signer: JsObjectSigner; + private _pubKey: string; + + constructor(public readonly options: ICrashRecoveryManagerOptions) { + const fnTag = `${CrashRecoveryManager.CLASS_NAME}#constructor()`; + Checks.truthy(options, `${fnTag} arg options`); + + const level = this.options.logLevel || "DEBUG"; + const label = this.className; + this.log = LoggerProvider.getOrCreate({ level, label }); + this.instanceId = options.instanceId; + this.sessions = new Map(); + this.log.info(`Instantiated ${this.className} OK`); + this.localRepository = options.localRepository; + this.remoteRepository = options.remoteRepository; + this._pubKey = options.pubKey; + this.signer = options.signer; + this.orchestrator = options.orchestrator; + this.bridgesManager = options.bridgeConfig; + this.loadPubKeys(this.orchestrator.getCounterPartyGateways()); + + const satpLoggerConfig: ISATPLoggerConfig = { + localRepository: this.localRepository, + remoteRepository: this.remoteRepository, + signer: this.signer, + pubKey: this._pubKey, + }; + + this.dbLogger = new SATPLogger(satpLoggerConfig); + this.log.debug( + `${fnTag}SATPManager dbLogger initialized: ${!!this.dbLogger}`, + ); + + this.factory = new RollbackStrategyFactory( + this.bridgesManager, + this.localRepository, + //this.dbLogger, + ); + this.crashRecoveryServerService = new CrashRecoveryServerService( + this.bridgesManager, + this.localRepository, + this.sessions, + this.dbLogger, + ); + + this.crashRecoveryClientService = new CrashRecoveryClientService( + this.localRepository, + this.sessions, + this.dbLogger, + ); + + this.crashRecoveryHandler = new CrashRecoveryHandler( + this.crashRecoveryServerService, + this.crashRecoveryClientService, + ); + this.crashRecoveryHandler = new CrashRecoveryHandler( + this.crashRecoveryServerService, + this.crashRecoveryClientService, + ); + + const crashRecoveryHandlers = new Map(); + crashRecoveryHandlers.set("crash-handler", this.crashRecoveryHandler); + this.orchestrator.addHandlers(crashRecoveryHandlers); + } + + get className(): string { + return CrashRecoveryManager.CLASS_NAME; + } + + public async recoverSessions() { + const fnTag = `${this.className}#recoverSessions()`; + + try { + const allLogs = await this.localRepository.readLogsNotProofs(); + for (const log of allLogs) { + const sessionId = log.sessionId; + this.log.info( + `${fnTag}, recovering session from database: ${sessionId}`, + ); + + if (!log || !log.data) { + throw new Error(`${fnTag}, invalid log`); + } + + try { + const sessionData: SessionData = JSON.parse(log.data); + const satpSession = SATPSession.fromSessionData(sessionData); + this.sessions.set(sessionId, satpSession); + } catch (error) { + this.log.error( + `Error parsing log data for session Id: ${sessionId}: ${error}`, + ); + } + } + this.detectCrash(); + } catch (error) { + this.log.error(`Error initializing sessions: ${error}`); + } + } + + private detectCrash() { + const fnTag = `${this.className}#detectCrash()`; + + if (this.sessions.size === 0) { + this.log.warn( + `${fnTag} No active sessions. skipping cron job scheduling.`, + ); + return; + } + + this.crashDetectionTask = cron.schedule("*/15 * * * * *", async () => { + this.log.debug(`${fnTag} Running crash detection cron job.`); + await this.checkAndResolveCrashes(); + + // stop the cron job if all sessions are resolved + if (this.sessions.size === 0) { + this.log.info(`${fnTag} all sessions resolved. Stopping cron job.`); + this.stopCrashDetection(); + } + }); + + this.log.info(`${fnTag} crash detection cron job scheduled.`); + } + + public stopCrashDetection() { + if (this.crashDetectionTask) { + this.crashDetectionTask.stop(); + this.log.info(`${this.className}#stopCrashDetection() Cron job stopped.`); + } + } + + public async checkAndResolveCrashes(): Promise { + const fnTag = `${this.className}#checkAndResolveCrashes()`; + + for (const [sessionId, session] of this.sessions.entries()) { + await this.checkAndResolveCrash(session); + + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + // remove resolved sessions + if (sessionData.state === State.COMPLETED) { + this.sessions.delete(sessionId); + this.log.info(`${fnTag} session ${sessionId} resolved and removed.`); + } + } + } + + public async checkAndResolveCrash(session: SATPSession): Promise { + const fnTag = `${this.className}#checkAndResolveCrash()`; + + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + + try { + let attempts = 0; + const maxRetries = Number(sessionData.maxRetries); + + while (attempts < maxRetries) { + const crashStatus = await this.checkCrash(session); + + if (crashStatus === CrashStatus.IN_RECOVERY) { + this.log.info(`${fnTag} Crash detected! Attempting recovery`); + + const recoverySuccess = await this.handleRecovery(session); + if (recoverySuccess) { + this.log.info( + `${fnTag} Recovery successful for sessionID: ${session.getSessionId()}`, + ); + return; + } else { + attempts++; + this.log.info( + `${fnTag} Recovery attempt ${attempts} failed for sessionID: ${session.getSessionId()}`, + ); + } + } else if (crashStatus === CrashStatus.ROLLBACK) { + this.log.warn( + `${fnTag} Crash requires rollback. Initiating rollback.`, + ); + await this.initiateRollback(session, true); + return; // Exit after rollback + } else if (crashStatus === CrashStatus.NO_CRASH) { + this.log.info( + `${fnTag} No crash detected for session ID: ${session.getSessionId()}`, + ); + return; // Exit if no crash + } else { + this.log.warn(`${fnTag} Unexpected crash status: ${crashStatus}`); + return; + } + } + + this.log.warn( + `${fnTag} All recovery attempts exhausted. Initiating rollback.`, + ); + await this.initiateRollback(session, true); + } catch (error) { + this.log.error(`${fnTag} Error during crash resolution: ${error}`); + } + } + + private async checkCrash(session: SATPSession): Promise { + const fnTag = `${this.className}#checkCrash()`; + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + try { + session.verify( + fnTag, + session.hasClientSessionData() + ? SessionType.CLIENT + : SessionType.SERVER, + ); + + const lastLog = await this.localRepository.readLastestLog( + session.getSessionId(), + ); + + if (lastLog && lastLog.operation !== "done") { + this.log.debug( + `${fnTag} Crash detected for session ID: ${session.getSessionId()}, last log operation: ${lastLog.operation}`, + ); + return CrashStatus.IN_RECOVERY; + } + + const logTimestamp = new Date(lastLog?.timestamp ?? 0).getTime(); + const currentTime = new Date().getTime(); + const timeDifference = currentTime - logTimestamp; + + if (timeDifference > Number(sessionData.maxTimeout)) { + this.log.warn( + `${fnTag} Timeout exceeded by ${timeDifference} ms for session ID: ${session.getSessionId()}`, + ); + return CrashStatus.ROLLBACK; + } + + return CrashStatus.NO_CRASH; + } catch (error) { + this.log.error(`${fnTag} Error occurred during crash check: ${error}`); + return CrashStatus.ERROR; + } + } + + public async handleRecovery(session: SATPSession): Promise { + const fnTag = `${this.className}#handleRecovery()`; + this.log.debug( + `${fnTag} - Starting crash recovery for sessionId: ${session.getSessionId()}`, + ); + + try { + const channel = this.orchestrator.getChannel( + session.getClientSessionData() + .recipientGatewayNetworkId as SupportedChain, + ); + + if (!channel) { + throw new Error( + `${fnTag} - Channel not found for the recipient gateway network ID.`, + ); + } + + const counterGatewayID = this.orchestrator.getGatewayIdentity( + channel.toGatewayID, + ); + if (!counterGatewayID) { + throw new Error(`${fnTag} - Counterparty gateway ID not found.`); + } + + const clientCrashRecovery: PromiseConnectClient = + channel.clients.get("crash") as PromiseConnectClient< + typeof CrashRecovery + >; + + if (!clientCrashRecovery) { + throw new Error(`${fnTag} - Failed to get clientCrashRecovery.`); + } + + const recoverMessage = + await this.crashRecoveryHandler.createRecoverMessage(session); + + const recoverUpdateMessage = + await clientCrashRecovery.recoverV2Message(recoverMessage); + + const sequenceNumbers = recoverUpdateMessage.recoveredLogs.map( + (log) => log.sequenceNumber, + ); + this.log.info( + `${fnTag} - Received logs sequence numbers: ${sequenceNumbers}`, + ); + + await this.processRecoverUpdateMessage(recoverUpdateMessage); + + const recoverSuccessMessage = + await this.crashRecoveryHandler.createRecoverSuccessMessage(session); + + await clientCrashRecovery.recoverV2SuccessMessage(recoverSuccessMessage); + + this.log.info( + `${fnTag} - Crash recovery completed for sessionId: ${session.getSessionId()}`, + ); + + return true; + } catch (error) { + this.log.error( + `${fnTag} Error during recovery process for session ID: ${session.getSessionId()} - ${error}`, + ); + throw new Error( + `Recovery failed for session ID: ${session.getSessionId()}`, + ); + } + } + + private async processRecoverUpdateMessage( + message: RecoverUpdateMessage, + ): Promise { + const fnTag = `${this.className}#processRecoverUpdate()`; + try { + const sessionId = message.sessionId; + const recoveredLogs = message.recoveredLogs; + + if (!recoveredLogs || recoveredLogs.length === 0) { + this.log.warn(`${fnTag} No recovered logs to process.`); + return true; + } + + for (const logEntry of recoveredLogs) { + console.log("data receveived: ", logEntry); + await this.localRepository.create(logEntry); + } + + for (const log of recoveredLogs) { + const sessionId = log.sessionId; + this.log.info(`${fnTag}, recovering session: ${sessionId}`); + + if (!log || !log.data) { + throw new Error(`${fnTag}, invalid log`); + } + + try { + const sessionData: SessionData = JSON.parse(log.data); + const satpSession = SATPSession.fromSessionData(sessionData); + this.sessions.set(sessionId, satpSession); + } catch (error) { + this.log.error( + `Error parsing log data for session Id: ${sessionId}: ${error}`, + ); + } + } + this.log.info( + `${fnTag} Session data successfully reconstructed for session ID: ${sessionId}`, + ); + + return true; + } catch (error) { + this.log.error( + `${fnTag} Error processing RecoverUpdateMessage: ${error}`, + ); + return false; + } + } + + public async initiateRollback( + session: SATPSession, + forceRollback?: boolean, + ): Promise { + const fnTag = `CrashRecoveryManager#initiateRollback()`; + + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + this.log.info( + `${fnTag} Initiating rollback for session ${session.getSessionId()}`, + ); + + try { + if (forceRollback) { + const strategy = this.factory.createStrategy(session); + const rollbackState = await this.executeRollback(strategy, session); + + if (rollbackState) { + const cleanupSuccess = await this.performCleanup( + strategy, + session, + rollbackState, + ); + + const rollbackSuccess = await this.sendRollbackMessage( + session, + rollbackState, + ); + return cleanupSuccess && rollbackSuccess; + } else { + this.log.error( + `${fnTag} Rollback execution failed for session ${session.getSessionId()}`, + ); + return false; + } + } else { + this.log.info( + `${fnTag} Rollback not needed for session ${session.getSessionId()}`, + ); + return true; + } + } catch (error) { + this.log.error(`${fnTag} Error during rollback initiation: ${error}`); + return false; + } + } + + private async executeRollback( + strategy: RollbackStrategy, + session: SATPSession, + ): Promise { + const fnTag = `CrashRecoveryManager#executeRollback`; + this.log.debug( + `${fnTag} Executing rollback strategy for session ${session.getSessionId()}`, + ); + + try { + return await strategy.execute(session); + } catch (error) { + this.log.error(`${fnTag} Error executing rollback strategy: ${error}`); + return undefined; + } + } + + private async sendRollbackMessage( + session: SATPSession, + rollbackState: RollbackState, + ): Promise { + const fnTag = `${this.className}#sendRollbackMessage()`; + this.log.debug( + `${fnTag} - Starting to send RollbackMessage for sessionId: ${session.getSessionId()}`, + ); + + try { + const channel = this.orchestrator.getChannel( + session.getClientSessionData() + .recipientGatewayNetworkId as SupportedChain, + ); + + if (!channel) { + throw new Error( + `${fnTag} - Channel not found for the recipient gateway network ID.`, + ); + } + + const counterGatewayID = this.orchestrator.getGatewayIdentity( + channel.toGatewayID, + ); + if (!counterGatewayID) { + throw new Error(`${fnTag} - Counterparty gateway ID not found.`); + } + + const clientCrashRecovery: PromiseConnectClient = + channel.clients.get("4") as PromiseConnectClient; + + if (!clientCrashRecovery) { + throw new Error(`${fnTag} - Failed to get clientCrashRecovery.`); + } + + const rollbackMessage = + await this.crashRecoveryHandler.createRollbackMessage( + session, + rollbackState, + ); + + const rollbackAckMessage = + await clientCrashRecovery.rollbackV2Message(rollbackMessage); + + this.log.info( + `${fnTag} - Received RollbackAckMessage: ${JSON.stringify(rollbackAckMessage)}`, + ); + + const success = await this.processRollbackAckMessage(rollbackAckMessage); + + return success; + } catch (error) { + this.log.error( + `${fnTag} Error during rollback message sending: ${error}`, + ); + return false; + } + } + + private async processRollbackAckMessage( + message: RollbackAckMessage, + ): Promise { + const fnTag = `${this.className}#processRollbackAckMessage()`; + try { + if (message.success) { + this.log.info( + `${fnTag} Rollback acknowledged by the counterparty for session ID: ${message.sessionId}`, + ); + return true; + } else { + this.log.warn( + `${fnTag} Rollback failed at counterparty for session ID: ${message.sessionId}`, + ); + return false; + } + } catch (error) { + this.log.error(`${fnTag} Error processing RollbackAckMessage: ${error}`); + return false; + } + } + + private async performCleanup( + strategy: RollbackStrategy, + session: SATPSession, + state: RollbackState, + ): Promise { + const fnTag = `CrashRecoveryManager#performCleanup`; + this.log.debug( + `${fnTag} Performing cleanup after rollback for session ${session.getSessionId()}`, + ); + + try { + const updatedState = await strategy.cleanup(session, state); + + // TODO: Handle the updated state, perhaps update session data or perform additional actions + this.log.info( + `${fnTag} Cleanup completed. Updated state: ${JSON.stringify(updatedState)}`, + ); + + return true; + } catch (error) { + this.log.error(`${fnTag} Error during cleanup: ${error}`); + return false; + } + } + + private loadPubKeys(gateways: Map): void { + gateways.forEach((gateway) => { + if (gateway.pubKey) { + this.gatewaysPubKeys.set(gateway.id, gateway.pubKey); + } + }); + this.gatewaysPubKeys.set( + this.orchestrator.getSelfId(), + this.orchestrator.ourGateway.pubKey!, + ); + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/gateway-orchestrator.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/gateway-orchestrator.ts index e266764964..b2f133f5d5 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/gateway-orchestrator.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/gateway-orchestrator.ts @@ -26,6 +26,7 @@ import { SatpStage0Service } from "../generated/proto/cacti/satp/v02/stage_0_pb" import { SatpStage1Service } from "../generated/proto/cacti/satp/v02/stage_1_pb"; import { SatpStage2Service } from "../generated/proto/cacti/satp/v02/stage_2_pb"; import { SatpStage3Service } from "../generated/proto/cacti/satp/v02/stage_3_pb"; +import { CrashRecovery } from "../generated/proto/cacti/satp/v02/crash_recovery_pb"; export interface IGatewayOrchestratorOptions { logLevel?: LogLevelDesc; @@ -327,12 +328,19 @@ export class GatewayOrchestrator { httpVersion: "1.1", }); + const transport4 = createGrpcWebTransport({ + baseUrl: + identity.address + ":" + identity.gatewayServerPort + `/${"crash"}`, + httpVersion: "1.1", + }); + const clients: Map> = new Map(); clients.set("0", this.createStage0ServiceClient(transport0)); clients.set("1", this.createStage1ServiceClient(transport1)); clients.set("2", this.createStage2ServiceClient(transport2)); clients.set("3", this.createStage3ServiceClient(transport3)); + clients.set("crash", this.createCrashServiceClient(transport4)); // todo perform healthcheck on startup; should be in stage 0 return clients; @@ -382,6 +390,17 @@ export class GatewayOrchestrator { return client; } + private createCrashServiceClient( + transport: ConnectTransport, + ): ConnectClient { + this.logger.debug( + "Creating crash-manager client, with transport: ", + transport, + ); + const client = createClient(CrashRecovery, transport); + return client; + } + public async resolveAndAddGateways(IDs: string[]): Promise { const fnTag = `${this.label}#addGateways()`; this.logger.trace(`Entering ${fnTag}`); diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/satp-manager.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/satp-manager.ts index 4595c63172..957b756797 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/satp-manager.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/gol/satp-manager.ts @@ -157,7 +157,9 @@ export class SATPManager { }; this.dbLogger = new SATPLogger(satpLoggerConfig); - this.logger.debug(`SATPManager dbLogger initialized: ${!!this.dbLogger}`); + this.logger.debug( + `${fnTag}SATPManager dbLogger initialized: ${!!this.dbLogger}`, + ); const serviceClasses = [ Stage0ServerService as unknown as SATPServiceInstance, diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/logging.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/logging.ts index 9f024f015c..d7dcf6f65e 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/logging.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/logging.ts @@ -67,7 +67,7 @@ export class SATPLogger { logEntry.operation, ); const localLog: LocalLog = { - sessionID: logEntry.sessionID, + sessionId: logEntry.sessionID, type: logEntry.type, key: key, timestamp: Date.now().toString(), @@ -96,7 +96,7 @@ export class SATPLogger { logEntry.operation, ); const localLog: LocalLog = { - sessionID: logEntry.sessionID, + sessionId: logEntry.sessionID, type: logEntry.type, key: key, timestamp: Date.now().toString(), @@ -116,7 +116,7 @@ export class SATPLogger { private getHash(logEntry: LocalLog): string { const fnTag = `SATPLogger#getHash()`; this.log.debug( - `${fnTag} - generating hash for log entry with sessionID: ${logEntry.sessionID}`, + `${fnTag} - generating hash for log entry with sessionID: ${logEntry.sessionId}`, ); return SHA256( diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts index 4b2cda6d96..b13b14bf95 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts @@ -57,6 +57,10 @@ import { SATPBridgesManager, } from "./gol/satp-bridges-manager"; import bodyParser from "body-parser"; +import { + CrashRecoveryManager, + ICrashRecoveryManagerOptions, +} from "./gol/crash-manager"; import cors from "cors"; import * as OAS from "../json/openapi-blo-bundled.json"; @@ -97,6 +101,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { public localRepository?: ILocalLogRepository; public remoteRepository?: IRemoteLogRepository; private readonly shutdownHooks: ShutdownHook[]; + private readonly crashManager: CrashRecoveryManager; constructor(public readonly options: SATPGatewayConfig) { const fnTag = `${this.className}#constructor()`; @@ -181,6 +186,19 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { this.OAPIServerEnabled = this.config.enableOpenAPI ?? true; this.OAS = OAS; + + // After setup, initialize crash manager and check if we crashed; + const crashOptions: ICrashRecoveryManagerOptions = { + instanceId: this.instanceId, + logLevel: this.config.logLevel, + bridgeConfig: this.bridgesManager, + orchestrator: this.gatewayOrchestrator, + localRepository: this.localRepository, + remoteRepository: this.remoteRepository, + signer: this.signer, + pubKey: this.pubKey, + }; + this.crashManager = new CrashRecoveryManager(crashOptions); } /* ICactus Plugin methods */ diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/types/satp-protocol.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/types/satp-protocol.ts index 624a7326c1..c0b5ad025e 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/types/satp-protocol.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/types/satp-protocol.ts @@ -12,6 +12,7 @@ import { Stage0SATPHandler } from "../core/stage-handlers/stage0-handler"; import { Stage1SATPHandler } from "../core/stage-handlers/stage1-handler"; import { Stage2SATPHandler } from "../core/stage-handlers/stage2-handler"; import { Stage3SATPHandler } from "../core/stage-handlers/stage3-handler"; +import { CrashRecoveryHandler } from "../core/crash-management/crash-handler"; /** * Represents a handler for various stages of the SATP (Secure Asset Transfer Protocol). @@ -24,6 +25,7 @@ export enum SATPHandlerType { STAGE1 = "stage-1-handler", STAGE2 = "stage-2-handler", STAGE3 = "stage-3-handler", + CRASH = "crash-handler", } export enum Stage { @@ -52,7 +54,8 @@ export type SATPHandlerInstance = | (typeof Stage0SATPHandler & ISATPHandler) | (typeof Stage1SATPHandler & ISATPHandler) | (typeof Stage2SATPHandler & ISATPHandler) - | (typeof Stage3SATPHandler & ISATPHandler); + | (typeof Stage3SATPHandler & ISATPHandler) + | (typeof CrashRecoveryHandler & ISATPHandler); export interface SATPHandler { setupRouter(router: ConnectRouter): void; diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/integration/satp-e2e-transfer-2-gateways.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/integration/satp-e2e-transfer-2-gateways.test.ts index 877d314100..26431a64d1 100644 --- a/packages/cactus-plugin-satp-hermes/src/test/typescript/integration/satp-e2e-transfer-2-gateways.test.ts +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/integration/satp-e2e-transfer-2-gateways.test.ts @@ -203,7 +203,6 @@ describe("2 SATPGateway sending a token from Besu to Fabric", () => { }; gateway1 = await factory.create(options1); expect(gateway1).toBeInstanceOf(SATPGateway); - const identity1 = gateway1.Identity; // default servers expect(identity1.gatewayServerPort).toBe(3010); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/knex.config.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/knex.config.ts index c32d533a4b..9e4f6aace2 100644 --- a/packages/cactus-plugin-satp-hermes/src/test/typescript/knex.config.ts +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/knex.config.ts @@ -28,7 +28,21 @@ export const knexServerConnection = { useNullAsDefault: true, }; -export const knexRemoteConnection = { +export const knexRemoteConnection1 = { + client: "sqlite3", + connection: { + filename: + "./packages/cactus-plugin-satp-hermes/src/knex/.dev.remote-" + + uuidv4() + + ".sqlite3", + }, + migrations: { + directory: "./packages/cactus-plugin-satp-hermes/src/knex/migrations", + }, + useNullAsDefault: true, +}; + +export const knexRemoteConnection2 = { client: "sqlite3", connection: { filename: diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/cron.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/cron.test.ts new file mode 100644 index 0000000000..e72fc535c9 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/cron.test.ts @@ -0,0 +1,247 @@ +import "jest-extended"; +import { CrashRecoveryManager } from "../../../../main/typescript/gol/crash-manager"; +import { + LogLevelDesc, + Secp256k1Keys, + JsObjectSigner, + IJsObjectSignerOptions, +} from "@hyperledger/cactus-common"; +import { ICrashRecoveryManagerOptions } from "../../../../main/typescript/gol/crash-manager"; +import { Knex, knex } from "knex"; +import { + LocalLog, + SupportedChain, + GatewayIdentity, + Address, +} from "../../../../main/typescript/core/types"; +import { + AssetSchema, + CredentialProfile, + LockType, + SignatureAlgorithm, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/message_pb"; +import { v4 as uuidv4 } from "uuid"; +import { SATP_VERSION } from "../../../../main/typescript/core/constants"; +import { SATPSession } from "../../../../main/typescript/core/satp-session"; +import { knexClientConnection, knexRemoteConnection1 } from "../../knex.config"; +import { + bufArray2HexStr, + getSatpLogKey, +} from "../../../../main/typescript/gateway-utils"; +import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset"; +import { + GatewayOrchestrator, + IGatewayOrchestratorOptions, +} from "../../../../main/typescript/gol/gateway-orchestrator"; +import { + ISATPBridgesOptions, + SATPBridgesManager, +} from "../../../../main/typescript/gol/satp-bridges-manager"; +import { create } from "@bufbuild/protobuf"; +import { KnexLocalLogRepository } from "../../../../main/typescript/repository/knex-local-log-repository"; +import { KnexRemoteLogRepository } from "../../../../main/typescript/repository/knex-remote-log-repository"; +import { + ILocalLogRepository, + IRemoteLogRepository, +} from "../../../../main/typescript/repository/interfaces/repository"; +import { + SATP_ARCHITETURE_VERSION, + SATP_CORE_VERSION, + SATP_CRASH_VERSION, +} from "../../../../main/typescript/core/constants"; +import { stringify as safeStableStringify } from "safe-stable-stringify"; + +let crashManager: CrashRecoveryManager; + +const createMockSession = ( + sessionId: string, + maxTimeout: string, + maxRetries: string, +) => { + const mockSession = new SATPSession({ + contextID: "MOCK_CONTEXT_ID", + server: false, + client: true, + }); + + const sessionData = mockSession.getClientSessionData(); + + sessionData.id = sessionId; + sessionData.maxTimeout = maxTimeout; + sessionData.maxRetries = maxRetries; + sessionData.version = SATP_VERSION; + const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); + sessionData.clientGatewayPubkey = Buffer.from(keyPairs.publicKey).toString( + "hex", + ); + sessionData.serverGatewayPubkey = sessionData.clientGatewayPubkey; + sessionData.originatorPubkey = "MOCK_ORIGINATOR_PUBKEY"; + sessionData.beneficiaryPubkey = "MOCK_BENEFICIARY_PUBKEY"; + sessionData.digitalAssetId = "MOCK_DIGITAL_ASSET_ID"; + sessionData.assetProfileId = "MOCK_ASSET_PROFILE_ID"; + sessionData.receiverGatewayOwnerId = "MOCK_RECEIVER_GATEWAY_OWNER_ID"; + sessionData.recipientGatewayNetworkId = SupportedChain.FABRIC; + sessionData.senderGatewayOwnerId = "MOCK_SENDER_GATEWAY_OWNER_ID"; + sessionData.senderGatewayNetworkId = SupportedChain.BESU; + sessionData.signatureAlgorithm = SignatureAlgorithm.RSA; + sessionData.lockType = LockType.FAUCET; + sessionData.lockExpirationTime = BigInt(1000); + sessionData.credentialProfile = CredentialProfile.X509; + sessionData.loggingProfile = "MOCK_LOGGING_PROFILE"; + sessionData.accessControlProfile = "MOCK_ACCESS_CONTROL_PROFILE"; + sessionData.resourceUrl = "MOCK_RESOURCE_URL"; + sessionData.lockAssertionExpiration = BigInt(99999); + sessionData.receiverContractOntology = "MOCK_RECEIVER_CONTRACT_ONTOLOGY"; + sessionData.senderContractOntology = "MOCK_SENDER_CONTRACT_ONTOLOGY"; + sessionData.sourceLedgerAssetId = "MOCK_SOURCE_LEDGER_ASSET_ID"; + sessionData.senderAsset = create(AssetSchema, { + tokenId: "MOCK_TOKEN_ID", + tokenType: TokenType.ERC20, + amount: BigInt(100), + owner: "MOCK_SENDER_ASSET_OWNER", + ontology: "MOCK_SENDER_ASSET_ONTOLOGY", + contractName: "MOCK_SENDER_ASSET_CONTRACT_NAME", + contractAddress: "MOCK_SENDER_ASSET_CONTRACT_ADDRESS", + }); + sessionData.receiverAsset = create(AssetSchema, { + tokenType: TokenType.ERC20, + amount: BigInt(100), + owner: "MOCK_RECEIVER_ASSET_OWNER", + ontology: "MOCK_RECEIVER_ASSET_ONTOLOGY", + contractName: "MOCK_RECEIVER_ASSET_CONTRACT_NAME", + mspId: "MOCK_RECEIVER_ASSET_MSP_ID", + channelName: "MOCK_CHANNEL_ID", + }); + + return mockSession; +}; + +let knexInstanceClient: Knex; +let knexInstanceRemote: Knex; +let localRepository: ILocalLogRepository; +let remoteRepository: IRemoteLogRepository; +beforeAll(async () => { + knexInstanceClient = knex(knexClientConnection); + await knexInstanceClient.migrate.latest(); + knexInstanceRemote = knex(knexRemoteConnection1); + await knexInstanceRemote.migrate.latest(); + + localRepository = new KnexLocalLogRepository(knexClientConnection); + remoteRepository = new KnexRemoteLogRepository(knexClientConnection); + + const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); + const signerOptions: IJsObjectSignerOptions = { + privateKey: bufArray2HexStr(keyPairs.privateKey), + logLevel: "debug", + }; + const signer = new JsObjectSigner(signerOptions); + + const gatewayIdentity = { + id: "mockID-1", + name: "CustomGateway", + version: [ + { + Core: SATP_CORE_VERSION, + Architecture: SATP_ARCHITETURE_VERSION, + Crash: SATP_CRASH_VERSION, + }, + ], + supportedDLTs: [SupportedChain.BESU], + proofID: "mockProofID10", + address: "http://localhost" as Address, + } as GatewayIdentity; + + const orchestratorOptions: IGatewayOrchestratorOptions = { + logLevel: "DEBUG", + localGateway: gatewayIdentity, + counterPartyGateways: [], + signer: signer, + }; + const gatewayOrchestrator = new GatewayOrchestrator(orchestratorOptions); + + const bridgesManagerOptions: ISATPBridgesOptions = { + logLevel: "DEBUG", + supportedDLTs: gatewayIdentity.supportedDLTs, + networks: [], + }; + const bridgesManager = new SATPBridgesManager(bridgesManagerOptions); + + const crashOptions: ICrashRecoveryManagerOptions = { + instanceId: "test-instance", + logLevel: "DEBUG" as LogLevelDesc, + bridgeConfig: bridgesManager, + orchestrator: gatewayOrchestrator, + localRepository: localRepository, + remoteRepository: remoteRepository, + signer: signer, + pubKey: bufArray2HexStr(keyPairs.publicKey), + }; + crashManager = new CrashRecoveryManager(crashOptions); +}); + +beforeEach(async () => { + crashManager["sessions"].clear(); +}); + +afterEach(async () => { + jest.clearAllMocks(); + jest.useRealTimers(); + crashManager["sessions"].clear(); +}); + +afterAll(async () => { + if (crashManager) { + crashManager.stopCrashDetection(); + crashManager.localRepository.destroy(); + crashManager.remoteRepository.destroy(); + } + if (knexInstanceClient || knexInstanceRemote) { + await knexInstanceClient.destroy(); + await knexInstanceRemote.destroy(); + } +}); + +describe("CrashRecoveryManager Tests", () => { + it("should trigger checkAndResolveCrashes via cron schedule every 15 seconds for 75 seconds", async () => { + jest.useFakeTimers(); + + const sessionId = uuidv4(); + const mockSession = createMockSession(sessionId, "10000", "3"); + const sessionData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const key = getSatpLogKey(sessionId, "type", "operation"); + const mockLogEntry: LocalLog = { + sessionId: sessionId, + type: "type", + key: key, + operation: "operation", + timestamp: new Date().toISOString(), + data: safeStableStringify(sessionData), + sequenceNumber: Number(sessionData.lastSequenceNumber), + }; + const mockLogRepository = crashManager["localRepository"]; + + await mockLogRepository.create(mockLogEntry); + + const mockCheckAndResolveCrash = jest + .spyOn(CrashRecoveryManager.prototype, "checkAndResolveCrash") + .mockImplementation(() => Promise.resolve()); + + await crashManager.recoverSessions(); + + for (let i = 1; i <= 5; i++) { + jest.advanceTimersByTime(15000); + await Promise.resolve(); + } + + expect(mockCheckAndResolveCrash).toHaveBeenCalledTimes(5); + expect(mockCheckAndResolveCrash).toHaveBeenCalledWith( + expect.any(SATPSession), + ); + + mockCheckAndResolveCrash.mockRestore(); + jest.useRealTimers(); + }); +}); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/recovery.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/recovery.test.ts new file mode 100644 index 0000000000..da263ecedd --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/recovery.test.ts @@ -0,0 +1,280 @@ +import "jest-extended"; +import { Secp256k1Keys } from "@hyperledger/cactus-common"; +import { CrashRecoveryManager } from "../../../../main/typescript/gol/crash-manager"; +import { + LocalLog, + SupportedChain, + GatewayIdentity, + Address, +} from "../../../../main/typescript/core/types"; +import { + AssetSchema, + CredentialProfile, + LockType, + SignatureAlgorithm, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/message_pb"; +import { v4 as uuidv4 } from "uuid"; +import { SATP_VERSION } from "../../../../main/typescript/core/constants"; +import { SATPSession } from "../../../../main/typescript/core/satp-session"; +import { getSatpLogKey } from "../../../../main/typescript/gateway-utils"; +import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset"; +import { + SATPGatewayConfig, + PluginFactorySATPGateway, + SATPGateway, +} from "../../../../main/typescript"; +import { + IPluginFactoryOptions, + PluginImportType, +} from "@hyperledger/cactus-core-api"; +import { bufArray2HexStr } from "../../../../main/typescript/gateway-utils"; +import { + knexClientConnection, + knexRemoteConnection1, + knexRemoteConnection2, + knexServerConnection, +} from "../../knex.config"; +import { Knex, knex } from "knex"; +import { create } from "@bufbuild/protobuf"; +import { stringify as safeStableStringify } from "safe-stable-stringify"; + +let knexInstanceClient: Knex; +let knexInstanceServer: Knex; +let knexInstanceRemote1: Knex; +let knexInstanceRemote2: Knex; + +let gateway1: SATPGateway; +let gateway2: SATPGateway; + +const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); +let crashManager1: CrashRecoveryManager; +let crashManager2: CrashRecoveryManager; + +const createMockSession = ( + maxTimeout: string, + maxRetries: string, + isClient: boolean, +): SATPSession => { + const sessionId = uuidv4(); + const mockSession = new SATPSession({ + contextID: "MOCK_CONTEXT_ID", + server: !isClient, + client: isClient, + }); + const sessionData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + sessionData.id = sessionId; + sessionData.maxTimeout = maxTimeout; + sessionData.maxRetries = maxRetries; + sessionData.version = SATP_VERSION; + sessionData.clientGatewayPubkey = Buffer.from(keyPairs.publicKey).toString( + "hex", + ); + sessionData.serverGatewayPubkey = sessionData.clientGatewayPubkey; + sessionData.originatorPubkey = "MOCK_ORIGINATOR_PUBKEY"; + sessionData.beneficiaryPubkey = "MOCK_BENEFICIARY_PUBKEY"; + sessionData.digitalAssetId = "MOCK_DIGITAL_ASSET_ID"; + sessionData.assetProfileId = "MOCK_ASSET_PROFILE_ID"; + sessionData.receiverGatewayOwnerId = "MOCK_RECEIVER_GATEWAY_OWNER_ID"; + sessionData.recipientGatewayNetworkId = SupportedChain.FABRIC; + sessionData.senderGatewayOwnerId = "MOCK_SENDER_GATEWAY_OWNER_ID"; + sessionData.senderGatewayNetworkId = SupportedChain.BESU; + sessionData.signatureAlgorithm = SignatureAlgorithm.RSA; + sessionData.lockType = LockType.FAUCET; + sessionData.lockExpirationTime = BigInt(1000); + sessionData.credentialProfile = CredentialProfile.X509; + sessionData.loggingProfile = "MOCK_LOGGING_PROFILE"; + sessionData.accessControlProfile = "MOCK_ACCESS_CONTROL_PROFILE"; + sessionData.resourceUrl = "MOCK_RESOURCE_URL"; + sessionData.lockAssertionExpiration = BigInt(99999); + sessionData.receiverContractOntology = "MOCK_RECEIVER_CONTRACT_ONTOLOGY"; + sessionData.senderContractOntology = "MOCK_SENDER_CONTRACT_ONTOLOGY"; + sessionData.sourceLedgerAssetId = "MOCK_SOURCE_LEDGER_ASSET_ID"; + sessionData.senderAsset = create(AssetSchema, { + tokenId: "MOCK_TOKEN_ID", + tokenType: TokenType.ERC20, + amount: BigInt(100), + owner: "MOCK_SENDER_ASSET_OWNER", + ontology: "MOCK_SENDER_ASSET_ONTOLOGY", + contractName: "MOCK_SENDER_ASSET_CONTRACT_NAME", + contractAddress: "MOCK_SENDER_ASSET_CONTRACT_ADDRESS", + }); + sessionData.receiverAsset = create(AssetSchema, { + tokenType: TokenType.ERC20, + amount: BigInt(100), + owner: "MOCK_RECEIVER_ASSET_OWNER", + ontology: "MOCK_RECEIVER_ASSET_ONTOLOGY", + contractName: "MOCK_RECEIVER_ASSET_CONTRACT_NAME", + mspId: "MOCK_RECEIVER_ASSET_MSP_ID", + channelName: "MOCK_CHANNEL_ID", + }); + + return mockSession; +}; + +beforeAll(async () => { + const factoryOptions: IPluginFactoryOptions = { + pluginImportType: PluginImportType.Local, + }; + const factory = new PluginFactorySATPGateway(factoryOptions); + + const gateway1KeyPair = Secp256k1Keys.generateKeyPairsBuffer(); + const gateway2KeyPair = Secp256k1Keys.generateKeyPairsBuffer(); + + const gatewayIdentity1: GatewayIdentity = { + id: "mockID-1", + name: "CustomGateway1", + pubKey: bufArray2HexStr(gateway1KeyPair.publicKey), + version: [ + { + Core: "v02", + Architecture: "v02", + Crash: "v02", + }, + ], + supportedDLTs: [SupportedChain.BESU], + proofID: "mockProofID10", + address: "http://localhost" as Address, + gatewayServerPort: 3006, + gatewayClientPort: 3001, + gatewayOpenAPIPort: 3002, + }; + + const gatewayIdentity2: GatewayIdentity = { + id: "mockID-2", + name: "CustomGateway2", + pubKey: bufArray2HexStr(gateway2KeyPair.publicKey), + version: [ + { + Core: "v02", + Architecture: "v02", + Crash: "v02", + }, + ], + supportedDLTs: [SupportedChain.FABRIC], + proofID: "mockProofID11", + address: "http://localhost" as Address, + gatewayServerPort: 3228, + gatewayClientPort: 3211, + gatewayOpenAPIPort: 4210, + }; + + knexInstanceClient = knex(knexClientConnection); + await knexInstanceClient.migrate.latest(); + + knexInstanceRemote1 = knex(knexRemoteConnection1); + await knexInstanceRemote1.migrate.latest(); + + const options1: SATPGatewayConfig = { + logLevel: "DEBUG", + gid: gatewayIdentity1, + counterPartyGateways: [gatewayIdentity2], + keyPair: gateway1KeyPair, + knexLocalConfig: knexClientConnection, + knexRemoteConfig: knexRemoteConnection1, + }; + + knexInstanceServer = knex(knexServerConnection); + await knexInstanceServer.migrate.latest(); + + knexInstanceRemote2 = knex(knexRemoteConnection2); + await knexInstanceRemote2.migrate.latest(); + + const options2: SATPGatewayConfig = { + logLevel: "DEBUG", + gid: gatewayIdentity2, + counterPartyGateways: [gatewayIdentity1], + keyPair: gateway2KeyPair, + knexLocalConfig: knexServerConnection, + knexRemoteConfig: knexRemoteConnection2, + }; + + gateway1 = (await factory.create(options1)) as SATPGateway; + expect(gateway1).toBeInstanceOf(SATPGateway); + await gateway1.startup(); + + gateway2 = (await factory.create(options2)) as SATPGateway; + expect(gateway2).toBeInstanceOf(SATPGateway); + await gateway2.startup(); +}); + +afterEach(async () => { + jest.clearAllMocks(); +}); + +afterAll(async () => { + if (gateway1) await gateway1.shutdown(); + if (gateway2) await gateway2.shutdown(); + + if (crashManager1 || crashManager2) { + crashManager1.stopCrashDetection(); + crashManager1.localRepository.destroy(); + + crashManager2.stopCrashDetection(); + crashManager2.localRepository.destroy(); + } + if (knexInstanceClient || knexInstanceServer) { + await knexInstanceClient.destroy(); + await knexInstanceServer.destroy(); + } +}); + +describe("Crash Recovery Testing", () => { + it("should recover when handle recovery is called", async () => { + crashManager1 = gateway1["crashManager"] as CrashRecoveryManager; + expect(crashManager1).toBeInstanceOf(CrashRecoveryManager); + + crashManager2 = gateway2["crashManager"] as CrashRecoveryManager; + expect(crashManager2).toBeInstanceOf(CrashRecoveryManager); + + const clientSession = createMockSession("1000", "3", true); + const serverSession = createMockSession("1000", "3", false); + const clientSessionData = clientSession.getClientSessionData(); + const serverSessionData = serverSession.getServerSessionData(); + const sessionId = clientSessionData.id; + + serverSessionData.id = sessionId; + + // Incomplete logs on gateway1 (client) + const key1 = getSatpLogKey(sessionId, "type", "operation1"); + const mockLogEntry1: LocalLog = { + sessionId: sessionId, + type: "type", + key: key1, + operation: "operation1", + timestamp: new Date().toISOString(), + data: safeStableStringify(clientSessionData), + sequenceNumber: Number(clientSessionData.lastSequenceNumber), + }; + + const mockLogRepository1 = crashManager1["localRepository"]; + await mockLogRepository1.create(mockLogEntry1); + + // Logs on gateway2 (server) + const key2 = getSatpLogKey(sessionId, "type2", "done"); + const mockLogEntry2: LocalLog = { + sessionId: sessionId, + type: "type2", + key: key2, + operation: "done", + timestamp: new Date().toISOString(), + data: safeStableStringify(serverSessionData), + sequenceNumber: Number(serverSessionData.lastSequenceNumber) + 1, + }; + + const mockLogRepository2 = crashManager2["localRepository"]; + await mockLogRepository2.create(mockLogEntry2); + + // recover sessions on server side + await crashManager2.recoverSessions(); + + const result = await crashManager1.handleRecovery(clientSession); + expect(result).toBeTrue(); + + // verify session data in gateway1 is updated + const updatedSession = crashManager1["sessions"].get(sessionId); + expect(updatedSession).toBeDefined(); + }); +}); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/rollback.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/rollback.test.ts new file mode 100644 index 0000000000..926af78315 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/rollback.test.ts @@ -0,0 +1,349 @@ +import "jest-extended"; +import { Secp256k1Keys } from "@hyperledger/cactus-common"; +import { CrashRecoveryManager } from "../../../../main/typescript/gol/crash-manager"; +import { + LocalLog, + SupportedChain, + GatewayIdentity, + Address, +} from "../../../../main/typescript/core/types"; +import { + pruneDockerAllIfGithubAction, + Containers, +} from "@hyperledger/cactus-test-tooling"; +import { BesuTestEnvironment, FabricTestEnvironment } from "../../test-utils"; +import { + AssetSchema, + CredentialProfile, + LockType, + SignatureAlgorithm, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/message_pb"; +import { v4 as uuidv4 } from "uuid"; +import { SATP_VERSION } from "../../../../main/typescript/core/constants"; +import { SATPSession } from "../../../../main/typescript/core/satp-session"; +import { getSatpLogKey } from "../../../../main/typescript/gateway-utils"; +import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset"; +import { + SATPGatewayConfig, + PluginFactorySATPGateway, + SATPGateway, +} from "../../../../main/typescript"; +import { + IPluginFactoryOptions, + PluginImportType, +} from "@hyperledger/cactus-core-api"; +import { bufArray2HexStr } from "../../../../main/typescript/gateway-utils"; +import { + knexClientConnection, + knexRemoteConnection1, + knexRemoteConnection2, + knexServerConnection, +} from "../../knex.config"; +import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common"; +import { ClaimFormat } from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/message_pb"; +import { Knex, knex } from "knex"; +import { create } from "@bufbuild/protobuf"; + +let fabricEnv: FabricTestEnvironment; +let besuEnv: BesuTestEnvironment; + +let knexInstanceClient: Knex; +let knexInstanceServer: Knex; +let knexInstanceRemote1: Knex; +let knexInstanceRemote2: Knex; + +let gateway1: SATPGateway; +let gateway2: SATPGateway; +const bridge_id = + "x509::/OU=org2/OU=client/OU=department1/CN=bridge::/C=UK/ST=Hampshire/L=Hursley/O=org2.example.com/CN=ca.org2.example.com"; + +let crashManager1: CrashRecoveryManager; +let crashManager2: CrashRecoveryManager; + +const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); +const logLevel: LogLevelDesc = "DEBUG"; +const log = LoggerProvider.getOrCreate({ + level: logLevel, + label: "BUNGEE - Hermes", +}); + +const createMockSession = ( + maxTimeout: string, + maxRetries: string, + isClient: boolean, +): SATPSession => { + const sessionId = uuidv4(); + const mockSession = new SATPSession({ + contextID: "MOCK_CONTEXT_ID", + server: !isClient, + client: isClient, + }); + + const sessionData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + sessionData.id = sessionId; + sessionData.maxTimeout = maxTimeout; + sessionData.maxRetries = maxRetries; + sessionData.version = SATP_VERSION; + sessionData.clientGatewayPubkey = Buffer.from(keyPairs.publicKey).toString( + "hex", + ); + sessionData.serverGatewayPubkey = sessionData.clientGatewayPubkey; + sessionData.originatorPubkey = "MOCK_ORIGINATOR_PUBKEY"; + sessionData.beneficiaryPubkey = "MOCK_BENEFICIARY_PUBKEY"; + sessionData.digitalAssetId = "MOCK_DIGITAL_ASSET_ID"; + sessionData.assetProfileId = "MOCK_ASSET_PROFILE_ID"; + sessionData.receiverGatewayOwnerId = "MOCK_RECEIVER_GATEWAY_OWNER_ID"; + sessionData.senderGatewayOwnerId = "MOCK_SENDER_GATEWAY_OWNER_ID"; + sessionData.signatureAlgorithm = SignatureAlgorithm.RSA; + sessionData.lockType = LockType.FAUCET; + sessionData.lockExpirationTime = BigInt(1000); + sessionData.credentialProfile = CredentialProfile.X509; + sessionData.loggingProfile = "MOCK_LOGGING_PROFILE"; + sessionData.accessControlProfile = "MOCK_ACCESS_CONTROL_PROFILE"; + sessionData.resourceUrl = "MOCK_RESOURCE_URL"; + sessionData.lockAssertionExpiration = BigInt(99999); + sessionData.receiverContractOntology = "MOCK_RECEIVER_CONTRACT_ONTOLOGY"; + sessionData.senderContractOntology = "MOCK_SENDER_CONTRACT_ONTOLOGY"; + sessionData.sourceLedgerAssetId = "MOCK_SOURCE_LEDGER_ASSET_ID"; + sessionData.senderAsset = create(AssetSchema, { + tokenId: "MOCK_TOKEN_ID", + tokenType: TokenType.ERC20, + amount: BigInt(100), + owner: "MOCK_SENDER_ASSET_OWNER", + ontology: "MOCK_SENDER_ASSET_ONTOLOGY", + contractName: "MOCK_SENDER_ASSET_CONTRACT_NAME", + contractAddress: "MOCK_SENDER_ASSET_CONTRACT_ADDRESS", + }); + sessionData.receiverAsset = create(AssetSchema, { + tokenType: TokenType.ERC20, + amount: BigInt(100), + owner: "MOCK_RECEIVER_ASSET_OWNER", + ontology: "MOCK_RECEIVER_ASSET_ONTOLOGY", + contractName: "MOCK_RECEIVER_ASSET_CONTRACT_NAME", + mspId: "MOCK_RECEIVER_ASSET_MSP_ID", + channelName: "MOCK_CHANNEL_ID", + }); + + if (isClient) { + sessionData.senderGatewayNetworkId = SupportedChain.BESU; + sessionData.recipientGatewayNetworkId = SupportedChain.FABRIC; + } else { + sessionData.senderGatewayNetworkId = SupportedChain.FABRIC; + sessionData.recipientGatewayNetworkId = SupportedChain.BESU; + } + + return mockSession; +}; + +beforeAll(async () => { + pruneDockerAllIfGithubAction({ logLevel }) + .then(() => { + log.info("Pruning throw OK"); + }) + .catch(async () => { + await Containers.logDiagnostics({ logLevel }); + fail("Pruning didn't throw OK"); + }); + + { + const satpContractName = "satp-contract"; + fabricEnv = await FabricTestEnvironment.setupTestEnvironment( + satpContractName, + bridge_id, + logLevel, + ); + log.info("Fabric Ledger started successfully"); + + await fabricEnv.deployAndSetupContracts(ClaimFormat.DEFAULT); + } + + { + const erc20TokenContract = "SATPContract"; + const contractNameWrapper = "SATPWrapperContract"; + + besuEnv = await BesuTestEnvironment.setupTestEnvironment( + erc20TokenContract, + contractNameWrapper, + logLevel, + ); + log.info("Besu Ledger started successfully"); + + await besuEnv.deployAndSetupContracts(ClaimFormat.DEFAULT); + } +}); + +afterEach(async () => { + jest.clearAllMocks(); +}); + +afterAll(async () => { + if (gateway1) await gateway1.shutdown(); + if (gateway2) await gateway2.shutdown(); + + if (crashManager1 || crashManager2) { + crashManager1.stopCrashDetection(); + crashManager1.localRepository.destroy(); + + crashManager2.stopCrashDetection(); + crashManager2.localRepository.destroy(); + } + if (knexInstanceClient || knexInstanceServer) { + await knexInstanceClient.destroy(); + await knexInstanceServer.destroy(); + } + + await besuEnv.tearDown(); + await fabricEnv.tearDown(); + + await pruneDockerAllIfGithubAction({ logLevel }) + .then(() => { + log.info("Pruning throw OK"); + }) + .catch(async () => { + await Containers.logDiagnostics({ logLevel }); + fail("Pruning didn't throw OK"); + }); +}); + +describe("Crash Recovery Manager - Rollback", () => { + it("should successfully initiate rollback", async () => { + const factoryOptions: IPluginFactoryOptions = { + pluginImportType: PluginImportType.Local, + }; + const factory = new PluginFactorySATPGateway(factoryOptions); + + const gateway1KeyPair = Secp256k1Keys.generateKeyPairsBuffer(); + const gateway2KeyPair = Secp256k1Keys.generateKeyPairsBuffer(); + + const gatewayIdentity1: GatewayIdentity = { + id: "mockID-1", + name: "CustomGateway1", + pubKey: bufArray2HexStr(gateway1KeyPair.publicKey), + version: [ + { + Core: "v02", + Architecture: "v02", + Crash: "v02", + }, + ], + supportedDLTs: [SupportedChain.BESU], + proofID: "mockProofID10", + address: "http://localhost" as Address, + gatewayServerPort: 3005, + gatewayClientPort: 3001, + gatewayOpenAPIPort: 3002, + }; + + const gatewayIdentity2: GatewayIdentity = { + id: "mockID-2", + name: "CustomGateway2", + pubKey: bufArray2HexStr(gateway2KeyPair.publicKey), + version: [ + { + Core: "v02", + Architecture: "v02", + Crash: "v02", + }, + ], + supportedDLTs: [SupportedChain.FABRIC], + proofID: "mockProofID11", + address: "http://localhost" as Address, + gatewayServerPort: 3225, + gatewayClientPort: 3211, + gatewayOpenAPIPort: 4210, + }; + + knexInstanceClient = knex(knexClientConnection); + await knexInstanceClient.migrate.latest(); + + knexInstanceRemote1 = knex(knexRemoteConnection1); + await knexInstanceRemote1.migrate.latest(); + + const options1: SATPGatewayConfig = { + logLevel: "DEBUG", + gid: gatewayIdentity1, + counterPartyGateways: [gatewayIdentity2], + keyPair: gateway1KeyPair, + bridgesConfig: [besuEnv.besuConfig], + knexLocalConfig: knexClientConnection, + knexRemoteConfig: knexRemoteConnection1, + }; + + knexInstanceServer = knex(knexServerConnection); + await knexInstanceServer.migrate.latest(); + + knexInstanceRemote2 = knex(knexRemoteConnection2); + await knexInstanceRemote2.migrate.latest(); + + const options2: SATPGatewayConfig = { + logLevel: "DEBUG", + gid: gatewayIdentity2, + counterPartyGateways: [gatewayIdentity1], + keyPair: gateway2KeyPair, + bridgesConfig: [fabricEnv.fabricConfig], + knexLocalConfig: knexServerConnection, + knexRemoteConfig: knexRemoteConnection2, + }; + + gateway1 = (await factory.create(options1)) as SATPGateway; + expect(gateway1).toBeInstanceOf(SATPGateway); + await gateway1.startup(); + + gateway2 = (await factory.create(options2)) as SATPGateway; + expect(gateway2).toBeInstanceOf(SATPGateway); + await gateway2.startup(); + + crashManager1 = gateway1["crashManager"] as CrashRecoveryManager; + expect(crashManager1).toBeInstanceOf(CrashRecoveryManager); + + crashManager2 = gateway2["crashManager"] as CrashRecoveryManager; + expect(crashManager2).toBeInstanceOf(CrashRecoveryManager); + + const clientSession = createMockSession("1000", "3", true); + const serverSession = createMockSession("1000", "3", false); + const clientSessionData = clientSession.getClientSessionData(); + const serverSessionData = serverSession.getServerSessionData(); + const sessionId = clientSessionData.id; + + serverSessionData.id = sessionId; + + // Incomplete logs on gateway1 (client) + const key1 = getSatpLogKey(sessionId, "type", "operation1"); + const mockLogEntry1: LocalLog = { + sessionId: sessionId, + type: "type", + key: key1, + operation: "operation1", + timestamp: new Date().toISOString(), + data: JSON.stringify(clientSessionData), + sequenceNumber: Number(clientSessionData.lastSequenceNumber), + }; + + const mockLogRepository1 = crashManager1["localRepository"]; + await mockLogRepository1.create(mockLogEntry1); + + // Logs on gateway2 (server) + const key2 = getSatpLogKey(sessionId, "type2", "done"); + const mockLogEntry2: LocalLog = { + sessionId: sessionId, + type: "type2", + key: key2, + operation: "done", + timestamp: new Date().toISOString(), + data: JSON.stringify(serverSessionData), + sequenceNumber: Number(serverSessionData.lastSequenceNumber) + 1, + }; + + const mockLogRepository2 = crashManager2["localRepository"]; + await mockLogRepository2.create(mockLogEntry2); + + await crashManager2.recoverSessions(); + + const result = await crashManager1.initiateRollback(clientSession, true); + + expect(result).toBeTrue(); + }); +}); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/scenarios.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/scenarios.test.ts new file mode 100644 index 0000000000..525d109397 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/crashmanager/scenarios.test.ts @@ -0,0 +1,523 @@ +import "jest-extended"; +import { + LogLevelDesc, + Secp256k1Keys, + JsObjectSigner, + IJsObjectSignerOptions, +} from "@hyperledger/cactus-common"; +import { CrashRecoveryManager } from "../../../../main/typescript/gol/crash-manager"; +import { CrashStatus } from "../../../../main/typescript/core/types"; +import { ICrashRecoveryManagerOptions } from "../../../../main/typescript/gol/crash-manager"; +import { Knex, knex } from "knex"; +import { + LocalLog, + SupportedChain, + GatewayIdentity, + Address, +} from "../../../../main/typescript/core/types"; +import { + AssetSchema, + CredentialProfile, + LockType, + SignatureAlgorithm, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/message_pb"; +import { v4 as uuidv4 } from "uuid"; +import { SessionData } from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/session_pb"; +import { SATP_VERSION } from "../../../../main/typescript/core/constants"; +import { SATPSession } from "../../../../main/typescript/core/satp-session"; +import { knexClientConnection, knexRemoteConnection1 } from "../../knex.config"; +import { + bufArray2HexStr, + getSatpLogKey, +} from "../../../../main/typescript/gateway-utils"; +import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset"; +import { RecoverUpdateMessage } from "../../../../main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { + GatewayOrchestrator, + IGatewayOrchestratorOptions, +} from "../../../../main/typescript/gol/gateway-orchestrator"; +import { + ISATPBridgesOptions, + SATPBridgesManager, +} from "../../../../main/typescript/gol/satp-bridges-manager"; +import { create } from "@bufbuild/protobuf"; + +import { + SATP_ARCHITETURE_VERSION, + SATP_CORE_VERSION, + SATP_CRASH_VERSION, +} from "../../../../main/typescript/core/constants"; +import { KnexLocalLogRepository } from "../../../../main/typescript/repository/knex-local-log-repository"; +import { KnexRemoteLogRepository } from "../../../../main/typescript/repository/knex-remote-log-repository"; +import { + ILocalLogRepository, + IRemoteLogRepository, +} from "../../../../main/typescript/repository/interfaces/repository"; +import { stringify as safeStableStringify } from "safe-stable-stringify"; + +let mockSession: SATPSession; +const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); + +const createMockSession = (maxTimeout: string, maxRetries: string) => { + const sessionId = uuidv4(); + const mockSession = new SATPSession({ + contextID: "MOCK_CONTEXT_ID", + server: false, + client: true, + }); + + const sessionData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + sessionData.id = sessionId; + sessionData.maxTimeout = maxTimeout; + sessionData.maxRetries = maxRetries; + sessionData.version = SATP_VERSION; + sessionData.clientGatewayPubkey = Buffer.from(keyPairs.publicKey).toString( + "hex", + ); + sessionData.serverGatewayPubkey = sessionData.clientGatewayPubkey; + sessionData.originatorPubkey = "MOCK_ORIGINATOR_PUBKEY"; + sessionData.beneficiaryPubkey = "MOCK_BENEFICIARY_PUBKEY"; + sessionData.digitalAssetId = "MOCK_DIGITAL_ASSET_ID"; + sessionData.assetProfileId = "MOCK_ASSET_PROFILE_ID"; + sessionData.receiverGatewayOwnerId = "MOCK_RECEIVER_GATEWAY_OWNER_ID"; + sessionData.recipientGatewayNetworkId = SupportedChain.FABRIC; + sessionData.senderGatewayOwnerId = "MOCK_SENDER_GATEWAY_OWNER_ID"; + sessionData.senderGatewayNetworkId = SupportedChain.BESU; + sessionData.signatureAlgorithm = SignatureAlgorithm.RSA; + sessionData.lockType = LockType.FAUCET; + sessionData.lockExpirationTime = BigInt(1000); + sessionData.credentialProfile = CredentialProfile.X509; + sessionData.loggingProfile = "MOCK_LOGGING_PROFILE"; + sessionData.accessControlProfile = "MOCK_ACCESS_CONTROL_PROFILE"; + sessionData.resourceUrl = "MOCK_RESOURCE_URL"; + sessionData.lockAssertionExpiration = BigInt(99999); + sessionData.receiverContractOntology = "MOCK_RECEIVER_CONTRACT_ONTOLOGY"; + sessionData.senderContractOntology = "MOCK_SENDER_CONTRACT_ONTOLOGY"; + sessionData.sourceLedgerAssetId = "MOCK_SOURCE_LEDGER_ASSET_ID"; + sessionData.senderAsset = create(AssetSchema, { + tokenId: "MOCK_TOKEN_ID", + tokenType: TokenType.ERC20, + amount: BigInt(100), + owner: "MOCK_SENDER_ASSET_OWNER", + ontology: "MOCK_SENDER_ASSET_ONTOLOGY", + contractName: "MOCK_SENDER_ASSET_CONTRACT_NAME", + contractAddress: "MOCK_SENDER_ASSET_CONTRACT_ADDRESS", + }); + sessionData.receiverAsset = create(AssetSchema, { + tokenType: TokenType.ERC20, + amount: BigInt(100), + owner: "MOCK_RECEIVER_ASSET_OWNER", + ontology: "MOCK_RECEIVER_ASSET_ONTOLOGY", + contractName: "MOCK_RECEIVER_ASSET_CONTRACT_NAME", + mspId: "MOCK_RECEIVER_ASSET_MSP_ID", + channelName: "MOCK_CHANNEL_ID", + }); + return mockSession; +}; +let crashManager: CrashRecoveryManager; +let knexInstanceClient: Knex; +let knexInstanceRemote: Knex; +let localRepository: ILocalLogRepository; +let remoteRepository: IRemoteLogRepository; + +beforeAll(async () => { + knexInstanceClient = knex(knexClientConnection); + await knexInstanceClient.migrate.latest(); + knexInstanceRemote = knex(knexRemoteConnection1); + await knexInstanceRemote.migrate.latest(); + + localRepository = new KnexLocalLogRepository(knexClientConnection); + remoteRepository = new KnexRemoteLogRepository(knexClientConnection); + + const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); + const signerOptions: IJsObjectSignerOptions = { + privateKey: bufArray2HexStr(keyPairs.privateKey), + logLevel: "debug", + }; + const signer = new JsObjectSigner(signerOptions); + + const gatewayIdentity = { + id: "mockID-1", + name: "CustomGateway", + version: [ + { + Core: SATP_CORE_VERSION, + Architecture: SATP_ARCHITETURE_VERSION, + Crash: SATP_CRASH_VERSION, + }, + ], + supportedDLTs: [SupportedChain.BESU], + proofID: "mockProofID10", + address: "http://localhost" as Address, + } as GatewayIdentity; + + const orchestratorOptions: IGatewayOrchestratorOptions = { + logLevel: "DEBUG", + localGateway: gatewayIdentity, + counterPartyGateways: [], + signer: signer, + }; + const gatewayOrchestrator = new GatewayOrchestrator(orchestratorOptions); + + const bridgesManagerOptions: ISATPBridgesOptions = { + logLevel: "DEBUG", + supportedDLTs: gatewayIdentity.supportedDLTs, + networks: [], + }; + const bridgesManager = new SATPBridgesManager(bridgesManagerOptions); + + const crashOptions: ICrashRecoveryManagerOptions = { + instanceId: "test-instance", + logLevel: "DEBUG" as LogLevelDesc, + bridgeConfig: bridgesManager, + orchestrator: gatewayOrchestrator, + localRepository: localRepository, + remoteRepository: remoteRepository, + signer: signer, + pubKey: bufArray2HexStr(keyPairs.publicKey), + }; + crashManager = new CrashRecoveryManager(crashOptions); +}); + +afterEach(async () => { + crashManager["sessions"].clear(); + jest.clearAllMocks(); +}); + +afterAll(async () => { + if (crashManager) { + crashManager.stopCrashDetection(); + crashManager.localRepository.destroy(); + crashManager.remoteRepository.destroy(); + } + if (knexInstanceClient || knexInstanceRemote) { + await knexInstanceClient.destroy(); + await knexInstanceRemote.destroy(); + } +}); + +describe("CrashRecoveryManager Tests", () => { + it("should reconstruct session by fetching logs", async () => { + mockSession = createMockSession("1000", "3"); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + const sessionId = testData.id; + console.log(" id cgeck : ", sessionId); + // load sample log in database + const key = getSatpLogKey(sessionId, "type", "operation"); + const mockLogEntry: LocalLog = { + sessionId: sessionId, + type: "type", + key: key, + operation: "operation", + timestamp: new Date().toISOString(), + data: safeStableStringify(testData), + sequenceNumber: Number(testData.lastSequenceNumber), + }; + const mockLogRepository = crashManager["localRepository"]; + + await mockLogRepository.create(mockLogEntry); + await crashManager.recoverSessions(); + + expect(crashManager["sessions"].has(sessionId)).toBeTrue(); + + const recoveredSession = crashManager["sessions"].get(sessionId); + + expect(recoveredSession).toBeDefined(); + + if (recoveredSession) { + const parsedSessionData: SessionData = JSON.parse(mockLogEntry.data); + const sessionData = recoveredSession.hasClientSessionData() + ? recoveredSession.getClientSessionData() + : recoveredSession.getServerSessionData(); + + expect(sessionData).toEqual(parsedSessionData); + } + }); + + it("should invoke rollback based on session timeout", async () => { + mockSession = createMockSession("1000", "3"); // timeout of 1 sec + // client-side test + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + const sessionId = testData.id; + + const handleRollbackSpy = jest + .spyOn(crashManager, "initiateRollback") + .mockImplementation(async () => true); + + const key = getSatpLogKey(sessionId, "type_o", "done"); + + const pastTime = new Date(Date.now() - 10000).toISOString(); + + const mockLogEntry: LocalLog = { + sessionId: sessionId, + type: "type_o", + key: key, + operation: "done", + timestamp: pastTime, + data: safeStableStringify(testData), + sequenceNumber: Number(testData.lastSequenceNumber), + }; + + const mockLogRepository = crashManager["localRepository"]; + + await mockLogRepository.create(mockLogEntry); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRollbackSpy).toHaveBeenCalled(); + + handleRollbackSpy.mockRestore(); + }); + + it("should not recover if no crash is detected", async () => { + mockSession = createMockSession("10000", "3"); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const mockLogEntry: LocalLog = { + sessionId: testData.id, + type: "type", + key: getSatpLogKey(testData.id, "type", "done"), + operation: "done", + timestamp: new Date().toISOString(), + data: safeStableStringify(testData), + sequenceNumber: Number(testData.lastSequenceNumber), + }; + + await crashManager.localRepository.create(mockLogEntry); + + const handleRecoverySpy = jest.spyOn(crashManager, "handleRecovery"); + const initiateRollbackSpy = jest.spyOn(crashManager, "initiateRollback"); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).not.toHaveBeenCalled(); + expect(initiateRollbackSpy).not.toHaveBeenCalled(); + }); + + it("should invoke handleRecovery when crash is initially detected", async () => { + mockSession = createMockSession("1000", "3"); + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => true); + + jest + .spyOn(crashManager as any, "checkCrash") + .mockImplementation(() => Promise.resolve(CrashStatus.IN_RECOVERY)); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + }); + + it("should invoke initiateRollback when recovery attempts are exhausted", async () => { + mockSession = createMockSession("1000", "3"); + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => false); + + const initiateRollbackSpy = jest + .spyOn(crashManager, "initiateRollback") + .mockImplementation(async () => true); + + jest + .spyOn(crashManager as any, "checkCrash") + .mockImplementation(() => Promise.resolve(CrashStatus.IN_RECOVERY)); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + expect(initiateRollbackSpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + initiateRollbackSpy.mockRestore(); + }); + + it("should detect crash based on incomplete operation in logs", async () => { + mockSession = createMockSession("10000", "3"); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + const sessionId = testData.id; + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => true); + + const key = getSatpLogKey(sessionId, "type", "init"); + + const mockLogEntry: LocalLog = { + sessionId: sessionId, + type: "type", + key: key, + operation: "init", // operation!=done + timestamp: new Date().toISOString(), + data: safeStableStringify(testData), + sequenceNumber: Number(testData.lastSequenceNumber), + }; + + const mockLogRepository = crashManager["localRepository"]; + + await mockLogRepository.create(mockLogEntry); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + }); + + it("should detect crash based on incomplete operation in logs and initiate rollback when recovery fails", async () => { + mockSession = createMockSession("10000", "3"); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + const sessionId = testData.id; + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => false); + + const handleInitiateRollBackSpy = jest + .spyOn(crashManager, "initiateRollback") + .mockImplementation(async () => true); + + const key = getSatpLogKey(sessionId, "type3", "init"); + + const mockLogEntry: LocalLog = { + sessionId: sessionId, + type: "type3", + key: key, + operation: "init", // operation!=done + timestamp: new Date().toISOString(), + data: safeStableStringify(testData), + sequenceNumber: Number(testData.lastSequenceNumber), + }; + + const mockLogRepository = crashManager["localRepository"]; + + await mockLogRepository.create(mockLogEntry); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + expect(handleInitiateRollBackSpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + handleInitiateRollBackSpy.mockRestore(); + }); + + it("should process recovered logs and add missing logs", async () => { + const mockSession = createMockSession("1000", "3"); + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + const sessionId = testData.id; + + const recoveredLogs: LocalLog[] = [ + { + sessionId: sessionId, + type: "type_1", + key: getSatpLogKey(sessionId, "type_1", "init"), + operation: "init", + timestamp: new Date().toISOString(), + data: safeStableStringify(testData), + sequenceNumber: Number(testData.lastSequenceNumber), + }, + ]; + + const recoverUpdateMessage = { + sessionId: sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", + hashRecoverMessage: "", + recoveredLogs: recoveredLogs, + senderSignature: "", + } as RecoverUpdateMessage; + + const result = + await crashManager["processRecoverUpdateMessage"](recoverUpdateMessage); + + expect(result).toBeTrue(); + + const reconstructedSessionData = crashManager["sessions"].get(sessionId); + expect(reconstructedSessionData).toBeDefined(); + }); + + it("should process logs received from gateway(server)", async () => { + mockSession = createMockSession("10000", "3"); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + const sessionId = testData.id; + + // Create an existing log entry for client + const existingLogEntry: LocalLog = { + sessionId: sessionId, + type: "type_client", + key: getSatpLogKey(sessionId, "type_client", "operation_client"), + operation: "operation_client", + timestamp: new Date().toISOString(), + data: safeStableStringify(testData), + sequenceNumber: Number(testData.lastSequenceNumber), + }; + + await crashManager.localRepository.create(existingLogEntry); + + // Create the log entry for server + const extraLogEntry: LocalLog = { + sessionId: sessionId, + type: "type_server", + key: getSatpLogKey(sessionId, "type_server", "operation_server"), + operation: "operation_server", + timestamp: new Date().toISOString(), + data: safeStableStringify(testData), + sequenceNumber: Number(testData.lastSequenceNumber) + 1, + }; + + // RecoverUpdateMessage to simulate receiving the log from server + const recoverUpdateMessage = { + sessionId: sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", + hashRecoverMessage: "", + recoveredLogs: [extraLogEntry], + senderSignature: "", + } as RecoverUpdateMessage; + + const result = + await crashManager["processRecoverUpdateMessage"](recoverUpdateMessage); + + expect(result).toBeTrue(); + + const reconstructedSession = crashManager["sessions"].get(sessionId); + expect(reconstructedSession).toBeDefined(); + + if (reconstructedSession) { + const reconstructedSessionData = + reconstructedSession.hasClientSessionData() + ? reconstructedSession.getClientSessionData() + : reconstructedSession.getServerSessionData(); + + expect(reconstructedSessionData).toBeDefined(); + expect(BigInt(reconstructedSessionData.lastSequenceNumber)).toEqual( + testData.lastSequenceNumber, + ); + } + }); +}); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/services.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/services.test.ts index a81a66110c..318bdd4014 100644 --- a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/services.test.ts +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/services.test.ts @@ -71,15 +71,15 @@ import { ILocalLogRepository, IRemoteLogRepository, } from "../../../main/typescript/repository/interfaces/repository"; -import { knexClientConnection, knexRemoteConnection } from "../knex.config"; +import { knexClientConnection, knexRemoteConnection1 } from "../knex.config"; import { Knex, knex } from "knex"; import { KnexLocalLogRepository as LocalLogRepository } from "../../../main/typescript/repository/knex-local-log-repository"; import { KnexRemoteLogRepository as RemoteLogRepository } from "../../../main/typescript/repository/knex-remote-log-repository"; import { SATPLogger } from "../../../main/typescript/logging"; +import { create, isMessage } from "@bufbuild/protobuf"; let knexInstanceClient: Knex; // test as a client let knexInstanceRemote: Knex; -import { create, isMessage } from "@bufbuild/protobuf"; const logLevel: LogLevelDesc = "DEBUG"; @@ -146,11 +146,11 @@ beforeAll(async () => { knexInstanceClient = knex(knexClientConnection); await knexInstanceClient.migrate.latest(); - knexInstanceRemote = knex(knexRemoteConnection); + knexInstanceRemote = knex(knexRemoteConnection1); await knexInstanceRemote.migrate.latest(); localRepository = new LocalLogRepository(knexClientConnection); - remoteRepository = new RemoteLogRepository(knexRemoteConnection); + remoteRepository = new RemoteLogRepository(knexRemoteConnection1); dbLogger = new SATPLogger({ localRepository, remoteRepository, diff --git a/yarn.lock b/yarn.lock index 4e03bbca31..43161132b1 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8990,6 +8990,7 @@ __metadata: "@types/fs-extra": "npm:11.0.4" "@types/google-protobuf": "npm:3.15.12" "@types/node": "npm:18.18.2" + "@types/node-cron": "npm:3.0.11" "@types/pg": "npm:8.11.10" "@types/swagger-ui-express": "npm:4.1.6" "@types/tape": "npm:4.13.4" @@ -9018,6 +9019,7 @@ __metadata: knex: "npm:2.4.0" kubo-rpc-client: "npm:3.0.1" make-dir-cli: "npm:3.1.0" + node-cron: "npm:3.0.2" npm-run-all: "npm:4.1.5" openzeppelin-solidity: "npm:3.4.2" pg: "npm:8.13.1" @@ -15751,6 +15753,13 @@ __metadata: languageName: node linkType: hard +"@types/node-cron@npm:3.0.11": + version: 3.0.11 + resolution: "@types/node-cron@npm:3.0.11" + checksum: 10/a73f69bcca52a5f3b1671cfb00a8e4a1d150d0aef36a611564a2f94e66b6981bade577e267ceeeca6fcee241768902d55eb8cf3a81f9ef4ed767a23112fdb16d + languageName: node + linkType: hard + "@types/node-fetch@npm:2.6.2": version: 2.6.2 resolution: "@types/node-fetch@npm:2.6.2" @@ -38068,6 +38077,15 @@ __metadata: languageName: node linkType: hard +"node-cron@npm:3.0.2": + version: 3.0.2 + resolution: "node-cron@npm:3.0.2" + dependencies: + uuid: "npm:8.3.2" + checksum: 10/71d4ce22425d0f2a7bd9753149da475317725f6890d5a55e5e43a97062456faeda984d55c17aee9699552460525cee948662fb75124065810e4038b2f56a9d32 + languageName: node + linkType: hard + "node-domexception@npm:^1.0.0": version: 1.0.0 resolution: "node-domexception@npm:1.0.0" @@ -44735,13 +44753,20 @@ __metadata: languageName: node linkType: hard -"safe-stable-stringify@npm:2.5.0, safe-stable-stringify@npm:^2.3.1, safe-stable-stringify@npm:^2.4.3": +"safe-stable-stringify@npm:2.5.0, safe-stable-stringify@npm:^2.4.3": version: 2.5.0 resolution: "safe-stable-stringify@npm:2.5.0" checksum: 10/2697fa186c17c38c3ca5309637b4ac6de2f1c3d282da27cd5e1e3c88eca0fb1f9aea568a6aabdf284111592c8782b94ee07176f17126031be72ab1313ed46c5c languageName: node linkType: hard +"safe-stable-stringify@npm:^2.3.1": + version: 2.3.1 + resolution: "safe-stable-stringify@npm:2.3.1" + checksum: 10/8a6ed4e5fb80694970f1939538518c44a59c71c74305e12b5964cbe3850636212eddac881da1f676b0232015213676e07750fe75bc402afbfe29851c8b52381e + languageName: node + linkType: hard + "safer-buffer@npm:>= 2.1.2 < 3, safer-buffer@npm:>= 2.1.2 < 3.0.0, safer-buffer@npm:^2.0.2, safer-buffer@npm:^2.1.0, safer-buffer@npm:~2.1.0": version: 2.1.2 resolution: "safer-buffer@npm:2.1.2"