diff --git a/packages/cactus-plugin-satp-hermes/package.json b/packages/cactus-plugin-satp-hermes/package.json index cd23c9f7c6b..3894052240a 100644 --- a/packages/cactus-plugin-satp-hermes/package.json +++ b/packages/cactus-plugin-satp-hermes/package.json @@ -129,7 +129,7 @@ "kubo-rpc-client": "3.0.1", "npm-run-all": "4.1.5", "openzeppelin-solidity": "3.4.2", - "pg": "^8.8.0", + "pg": "^8.13.0", "secp256k1": "4.0.3", "socket.io": "4.6.2", "sqlite3": "5.1.5", diff --git a/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto b/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto index 2b3abdb061e..d94d083b05d 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto +++ b/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto @@ -78,4 +78,14 @@ message RollbackLogEntry { string action = 4; // action performed during rollback string status = 5; // status of rollback (e.g., SUCCESS, FAILED) string details = 6; // Additional details or metadata about the rollback +} + +message RollbackState { + string session_id = 1; + string current_stage = 2; + int32 steps_remaining = 3; + repeated RollbackLogEntry rollback_log_entries = 4; + string estimated_time_to_completion = 5; + string status = 6; // Overall status (e.g., IN_PROGRESS, COMPLETED, FAILED) + string details = 7; // Additional metadata or information } \ No newline at end of file diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts index 90f6984602c..93c7ac2d528 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts @@ -8,7 +8,6 @@ import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session import { CrashRecoveryHandler } from "./crash-recovery-handler"; import { SATPSession } from "../satp-session"; import { - RollbackState, RollbackStrategy, RollbackStrategyFactory, } from "./rollback/rollback-strategy-factory"; @@ -16,13 +15,20 @@ import { CrashRecoveryService } from "./crash-utils"; import { KnexLocalLogRepository as LocalLogRepository } from "../../repository/knex-local-log-repository"; import { ILocalLogRepository } from "../../repository/interfaces/repository"; import { Knex } from "knex"; -import { SATPBridgeConfig, LocalLog } from "../types"; +import { + RecoverMessage, + RecoverSuccessMessage, + RecoverUpdateMessage, + RollbackState, +} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb"; import { SessionType } from "../session-utils"; +import { ISATPBridgesOptions } from "../../gol/satp-bridges-manager"; -enum CrashStatus { +export enum CrashStatus { IN_RECOVERY = "IN_RECOVERY", RECOVERED = "RECOVERED", NO_CRASH = "NO_CRASH", + ERROR = "ERROR", } class CrashOccurrence { @@ -37,7 +43,7 @@ export interface ICrashRecoveryManagerOptions { logLevel?: LogLevelDesc; instanceId: string; knexConfig?: Knex.Config; - bridgeConfig: SATPBridgeConfig; + bridgeConfig: ISATPBridgesOptions; } export class CrashRecoveryManager { @@ -59,8 +65,11 @@ export class CrashRecoveryManager { this.instanceId = options.instanceId; this.sessions = new Map(); this.log.info(`Instantiated ${this.className} OK`); - this.factory = new RollbackStrategyFactory(options.bridgeConfig); this.logRepository = new LocalLogRepository(options.knexConfig); + this.factory = new RollbackStrategyFactory( + options.bridgeConfig, + this.logRepository, + ); const crashRecoveryServiceOptions = { logLevel: this.options.logLevel, instanceId: this.instanceId, @@ -85,72 +94,39 @@ export class CrashRecoveryManager { return CrashRecoveryManager.CLASS_NAME; } - public async init(): Promise { - this.sessions = await this.getSessions(); - } - - // todo read from local log to get session data - /*private async getSessions(): Map { - const sessionMap = new Map(); - try { - const allSessions = await this.logRepository.readLogsNotProofs(); - allSessions.forEach((log) => { - const sessionData = new SessionData(); - sessionData.id = log.sessionID; - - sessionMap.set(log.sessionID, sessionData); - }); - } catch (error) { - this.log.error(`Error initializing sessions: ${error}`); - } - - return sessionMap; - }*/ - - private async getSessions(): Promise> { - const sessionMap = new Map(); + public async recoverSessions() { + const fnTag = `${this.className}#recoverSessions()`; try { const allLogs = await this.logRepository.readLogsNotProofs(); - for (const log of allLogs) { const sessionId = log.sessionID; + this.log.info(`${fnTag}, recovering session: ${sessionId}`); - let sessionData = sessionMap.get(sessionId); - if (!sessionData) { - sessionData = new SessionData(); - sessionData.id = sessionId; - sessionMap.set(sessionId, sessionData); + if (log == undefined || log.data == undefined) { + throw new Error(`${fnTag}, invalid log}`); } try { - const logEntry = JSON.parse(log.data); - - Object.assign(sessionData, logEntry); - - if (logEntry.sequenceNumber !== undefined) { - sessionData.lastSequenceNumber = logEntry.sequenceNumber; - } + const logEntry: SessionData = JSON.parse(log.data); + this.sessions.set(sessionId, logEntry); } catch (error) { this.log.error( - `Error parsing log data for session ${sessionId}: ${error}`, + `Error parsing log data for session Id: ${sessionId}: ${error}`, ); } } } catch (error) { this.log.error(`Error initializing sessions: ${error}`); } - - return sessionMap; } - // todo create util functoin that retrieves sessionid and checks if it is valid; i believe it is implemented in the satp services, refactor making it reusable private async checkCrash(session: SATPSession): Promise { - // todo implement crash check - check logs and understsands if there was a crash; might use timouts, etc - const fnTag = `${this.className}#checkCrash()`; + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); - // check the logs and from the timeout logic make out try { session.verify( fnTag, @@ -158,27 +134,25 @@ export class CrashRecoveryManager { ? SessionType.CLIENT : SessionType.SERVER, ); + const lastLog = await this.logRepository.readLastestLog( session.getSessionId(), ); - if (lastLog && lastLog.operation !== "COMPLETED") { + + if (lastLog && lastLog.operation !== "done") { this.log.debug( - `${fnTag} Crash detected for session ${session.getSessionId()}`, + `${fnTag} Crash detected for session ID: ${session.getSessionId()} last log operation: ${lastLog.operation}`, ); return CrashStatus.IN_RECOVERY; } - const sessionData = session.hasClientSessionData() - ? session.getClientSessionData() - : session.getServerSessionData(); - - const logTimestamp = Number(lastLog.timestamp); + const logTimestamp = new Date(lastLog?.timestamp ?? 0).getTime(); const currentTime = new Date().getTime(); const timeDifference = currentTime - logTimestamp; if (timeDifference > Number(sessionData.maxTimeout)) { this.log.warn( - `${fnTag} Timeout exceeded for session ID: ${session.getSessionId()}`, + `${fnTag} Timeout exceeded by ${timeDifference} ms for session ID: ${session.getSessionId()}`, ); return CrashStatus.IN_RECOVERY; } @@ -188,34 +162,31 @@ export class CrashRecoveryManager { ); return CrashStatus.NO_CRASH; } catch (error) { - this.log.error(`${fnTag} Error detecting crash: ${error}`); - return CrashStatus.NO_CRASH; + this.log.error(`${fnTag} Error occured !`); + return CrashStatus.ERROR; } } - public async checkAndResolveCrash(sessionId: SATPSession): Promise { + public async checkAndResolveCrash(session: SATPSession): Promise { const fnTag = `${this.className}#checkAndResolveCrash()`; - this.log.info(`${fnTag} Checking crash status for session ${sessionId}`); - try { - const sessionData = sessionId.hasClientSessionData() - ? sessionId.getClientSessionData() - : sessionId.getServerSessionData(); + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); - if (!sessionData) { - throw new Error(`${fnTag}, session data is not correctly initialized`); - } + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + try { let attempts = 0; let crashOccurrence: CrashOccurrence | undefined; while (attempts < BigInt(sessionData.maxRetries)) { - const crashStatus = await this.checkCrash(sessionId); + const crashStatus = await this.checkCrash(session); if (crashStatus === CrashStatus.IN_RECOVERY) { - this.log.info( - `${fnTag} Crash detected. Attempting recovery for session ${sessionId}`, - ); + this.log.info(`${fnTag} Crash detected! Attempting recovery`); if (!crashOccurrence) { crashOccurrence = new CrashOccurrence( @@ -227,55 +198,90 @@ export class CrashRecoveryManager { crashOccurrence.lastUpdate = new Date(); } - await this.handleRecovery(sessionId); - this.log.info(`${fnTag} Recovery successful.`); - - crashOccurrence.status = CrashStatus.RECOVERED; - - return true; + const status = await this.handleRecovery(session); + if (status) { + crashOccurrence.status = CrashStatus.RECOVERED; + this.log.info( + `${fnTag} Recovery successful for sessionID: ${session.getSessionId()}`, + ); + return; + } } attempts++; this.log.info( - `${fnTag} Retry attempt ${attempts} for session ${sessionId}`, + `${fnTag} Retry attempt ${attempts} for sessionID: ${session.getSessionId()}`, ); } - - this.log.warn(`${fnTag} All retries exhausted. Initiating rollback.`); - await this.initiateRollback(sessionId, true); - return false; + if (attempts != 0) { + this.log.warn(`${fnTag} All retries exhausted ! Initiating Rollback`); + const rollBackStatus = await this.initiateRollback(session, true); + if (rollBackStatus) { + this.log.info( + `${fnTag} rollback was success: ${session.getSessionId()}`, + ); + } else { + this.log.error( + `${fnTag} rollback failed ! ${session.getSessionId()}`, + ); + } + } } catch (error) { this.log.error(`${fnTag} Error during crash resolution: ${error}`); - return false; } } - public async handleRecovery(session: SATPSession): Promise { + public async handleRecovery(session: SATPSession): Promise { const fnTag = `${this.className}#handleRecovery()`; + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + try { - if (session.hasServerSessionData()) { - this.log.info( - `${fnTag} Initiating recovery as a server for session ID: ${session.getSessionId()}`, + this.log.info( + `${fnTag} Initiating recovery for session ID: ${session.getSessionId()}`, + ); + + const recoverMessage = new RecoverMessage({ + sessionId: session.getSessionId(), + messageType: "urn:ietf:SATP-2pc:msgtype:recover-msg", + satpPhase: "sessionData.hashes?.stage0", //todo: get phase info + sequenceNumber: Number(sessionData.lastSequenceNumber), + isBackup: false, + newIdentityPublicKey: "", + lastEntryTimestamp: sessionData.lastSequenceNumber, + senderSignature: "", + }); + + const recoverUpdateMessage = + await this.crashRecoveryHandler.handleRecover(recoverMessage); + + const response = await this.processRecoverUpdate(recoverUpdateMessage); + + if (response) { + const recoverSuccessMessage = new RecoverSuccessMessage({ + sessionId: session.getSessionId(), + messageType: "urn:ietf:SATP-2pc:msgtype:recover-success-msg", + hashRecoverUpdateMessage: "", + success: true, + entriesChanged: [], + senderSignature: "", + }); + + await this.crashRecoveryHandler.handleRecoverSuccess( + recoverSuccessMessage, ); - } else if (session.hasClientSessionData()) { + this.log.info( - `${fnTag} Initiating recovery as a client for session ID: ${session.getSessionId()}`, + `${fnTag} Recovery handled successfully for session ID: ${session.getSessionId()}`, ); + return true; } else { - throw new Error( - `${fnTag} Neither client nor server session data is available for session ID: ${session.getSessionId()}`, - ); + return false; } - - const recoverMessage = - await this.crashRecoveryHandler.sendRecover(session); - const recoverUpdateMessage = - await this.crashRecoveryHandler.sendRecoverUpdate(recoverMessage); - await this.crashRecoveryHandler.sendRecoverSuccess(recoverUpdateMessage); - - this.log.info( - `${fnTag} Recovery handled successfully for session ID: ${session.getSessionId()}`, - ); } catch (error) { this.log.error( `${fnTag} Error during recovery process for session ID: ${session.getSessionId()} - ${error}`, @@ -286,29 +292,34 @@ export class CrashRecoveryManager { } } + private processRecoverUpdate( + message: RecoverUpdateMessage, + ): Promise { + this.log.debug("Message received: ", message.messageType); + // get the logs from counterparty gateway and sync with current session + this.log.debug(`Session processed & updated with RecoverUpdateMessage`); + + return Promise.resolve(true); + } + public async initiateRollback( session: SATPSession, forceRollback?: boolean, ): Promise { const fnTag = `CrashRecoveryManager#initiateRollback()`; + + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } this.log.info( `${fnTag} Initiating rollback for session ${session.getSessionId()}`, ); try { - // Implement check for rollback (needs to read logs, etc) OR we assume that at satp handler/service layer this check is done and rollback is good to do - - const sessionLog: LocalLog = await this.logRepository.readLastestLog( - session.getSessionId(), - ); - - let shouldRollback = false; - if (sessionLog.operation !== "COMPLETED") { - shouldRollback = true; - } - - if (forceRollback || shouldRollback) { - // send bridge manager and possibly others to factory + if (forceRollback) { const strategy = this.factory.createStrategy(session); const rollbackState = await this.executeRollback(strategy, session); diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts index a8af8ceafb7..3872e66e48e 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts @@ -15,8 +15,8 @@ import { } from "@hyperledger/cactus-common"; import { Empty } from "@bufbuild/protobuf"; import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb"; -import { SATPSession } from "../satp-session"; import { ILocalLogRepository } from "../../repository/interfaces/repository"; +import { getSatpLogKey } from "../../gateway-utils"; interface HandlerOptions { crashService: CrashRecoveryService; @@ -27,16 +27,16 @@ interface HandlerOptions { export class CrashRecoveryHandler { public static readonly CLASS_NAME = "CrashRecoveryHandler"; - private sessions: Map; + public sessions: Map; private service: CrashRecoveryService; - private logger: Logger; + private log: Logger; private logRepository: ILocalLogRepository; constructor(ops: HandlerOptions) { this.sessions = ops.sessions; this.service = ops.crashService; - this.logger = LoggerProvider.getOrCreate(ops.loggerOptions); - this.logger.trace(`Initialized ${CrashRecoveryHandler.CLASS_NAME}`); + this.log = LoggerProvider.getOrCreate(ops.loggerOptions); + this.log.trace(`Initialized ${CrashRecoveryHandler.CLASS_NAME}`); this.logRepository = ops.logRepository; } @@ -44,189 +44,137 @@ export class CrashRecoveryHandler { return CrashRecoveryHandler.CLASS_NAME; } - public get Log(): Logger { - return this.logger; - } - - private generateKey(): string { - //todo: key generation logic - return "key"; - } + async handleRecover(req: RecoverMessage): Promise { + const fnTag = `${this.getHandlerIdentifier()}#handleRecover`; + this.log.debug(`${fnTag}, handling RecoverMessage: ${JSON.stringify(req)}`); - async sendRecover(req: SATPSession): Promise { - const fnTag = `${this.getHandlerIdentifier()}#sendRecover`; try { - this.Log.debug(`${fnTag}, Recover V2 Message...`); - - const sessionId = req.getSessionId(); - const sessionData = this.sessions.get(sessionId); + const sessionData = this.sessions.get(req.sessionId); if (!sessionData) { throw new Error(`${fnTag}, Session not found`); } - const recoverMessage = new RecoverMessage({ - sessionId: sessionId, - messageType: "Recover", - satpPhase: "phase", - sequenceNumber: Number(sessionData.lastSequenceNumber), - isBackup: false, - newIdentityPublicKey: "", - lastEntryTimestamp: sessionData.lastSequenceNumber, - senderSignature: "", - }); - - const updateMessage = - this.service.createRecoverUpdateMessage(recoverMessage); + const updateMessage = await this.service.createRecoverUpdateMessage(req); + this.log.debug(`${fnTag}, Created RecoverUpdateMessage`); const logEntry = { - sessionID: sessionId, + sessionID: req.sessionId, type: "RECOVER", - key: "key", // generateKey(), - operation: "RECOVER_MESSAGE_SENT", + key: getSatpLogKey(req.sessionId, "RECOVER", "init"), + operation: "init", timestamp: new Date().toISOString(), - data: "", + data: JSON.stringify(sessionData), }; await this.logRepository.create(logEntry); return updateMessage; } catch (error) { - throw new Error(`${fnTag}, Failed to process RecoverV2Message ${error}`); + this.log.error(`${fnTag}, Failed to handle RecoverMessage: ${error}`); + throw error; } } - async sendRecoverUpdate( - req: RecoverUpdateMessage, - ): Promise { - const fnTag = `${this.getHandlerIdentifier()}#handleRecoverUpdateMessage()`; - try { - this.Log.debug(`${fnTag}, Handling Recover Update Message...`); + async handleRecoverSuccess(req: RecoverSuccessMessage): Promise { + const fnTag = `${this.getHandlerIdentifier()}#handleRecoverSuccess`; + this.log.debug(`${fnTag}, Handling RecoverSuccessMessage`); + try { const sessionData = this.sessions.get(req.sessionId); if (!sessionData) { - throw new Error( - `${fnTag}, session data not found for ID: ${req.sessionId}`, - ); - } - - const successMessage = this.service.createRecoverSuccessMessage(req); - - this.Log.debug(`${fnTag}, Recover Success Message created`); - const logEntry = { - sessionID: req.sessionId, - type: "RECOVER_UPDATE", - key: "key", // generateKey(), - operation: "RECOVER_UPDATE_MESSAGE_SENT", - timestamp: new Date().toISOString(), - data: "", - }; - - await this.logRepository.create(logEntry); - - return successMessage; - } catch (error) { - throw new Error( - `${fnTag}, Error handling Recover Update Message: ${error}`, - ); - } - } - - async sendRecoverSuccess(req: RecoverSuccessMessage): Promise { - const fnTag = `${this.getHandlerIdentifier()}#handleRecoverSuccessMessage()`; - try { - this.Log.debug(`${fnTag}, Handling Recover Success Message...`); - - const session = this.sessions.get(req.sessionId); - if (!session) { throw new Error(`${fnTag}, Session not found`); } - this.Log.debug(`${fnTag}, Session recovery successfully completed`); const logEntry = { sessionID: req.sessionId, type: "RECOVER_SUCCESS", - key: "key", // generateKey(), + key: getSatpLogKey(req.sessionId, "RECOVER_SUCCESS", "init"), operation: "RECOVER_SUCCESS_MESSAGE_SENT", timestamp: new Date().toISOString(), - data: "", + data: JSON.stringify(sessionData), }; await this.logRepository.create(logEntry); - return new Empty(); } catch (error) { - throw new Error( - `${fnTag}, Error handling Recover Success Message: ${error}`, + this.log.error( + `${fnTag}, Error handling RecoverSuccessMessage: ${error}`, ); + throw error; } } - async sendRollback(req: RollbackMessage): Promise { - const fnTag = `${this.getHandlerIdentifier()}#handleRollbackMessage()`; - try { - this.Log.debug(`${fnTag}, Handling Rollback Message...`); + async handleRollback(req: RollbackMessage): Promise { + const fnTag = `${this.getHandlerIdentifier()}#handleRollback`; + this.log.debug(`${fnTag}, Handling RollbackMessage`); - const session = this.sessions.get(req.sessionId); - if (!session) { + try { + const sessionData = this.sessions.get(req.sessionId); + if (!sessionData) { throw new Error(`${fnTag}, Session not found`); } const ackMessage = this.service.createRollbackAckMessage(req); - this.Log.debug(`${fnTag}, Rollback Ack Message created`); const logEntry = { sessionID: req.sessionId, type: "ROLLBACK", - key: "key", //generateKey(), + key: getSatpLogKey(req.sessionId, "ROLLBACK", "init"), operation: "ROLLBACK_MESSAGE_SENT", timestamp: new Date().toISOString(), - data: "", + data: JSON.stringify(sessionData), }; await this.logRepository.create(logEntry); - return ackMessage; } catch (error) { - throw new Error(`${fnTag}, Error handling Rollback Message: ${error}`); + this.log.error(`${fnTag}, Error handling RollbackMessage: ${error}`); + throw error; } } - async sendRollbackAck(req: RollbackAckMessage): Promise { - const fnTag = `${this.getHandlerIdentifier()}#handleRollbackAckMessage()`; - try { - this.Log.debug(`${fnTag}, Handling Rollback Ack Message...`); + async handleRollbackAck(req: RollbackAckMessage): Promise { + const fnTag = `${this.getHandlerIdentifier()}#handleRollbackAck`; + this.log.debug(`${fnTag}, Handling RollbackAckMessage`); - const session = this.sessions.get(req.sessionId); - if (!session) { + try { + const sessionData = this.sessions.get(req.sessionId); + if (!sessionData) { throw new Error(`${fnTag}, Session not found`); } - this.Log.debug(`${fnTag}, Rollback successfully acknowledged`); const logEntry = { sessionID: req.sessionId, - type: "ROLLBACK_", - key: "key", //generateKey(), - operation: "ROLLBACK_", + type: "ROLLBACK_ACK", + key: getSatpLogKey(req.sessionId, "ROLLBACK_ACK", "init"), + operation: "ROLLBACK_ACK", timestamp: new Date().toISOString(), - data: "", + data: JSON.stringify(sessionData), }; await this.logRepository.create(logEntry); - return new Empty(); } catch (error) { - throw new Error( - `${fnTag}, Error handling Rollback Ack Message: ${error}`, - ); + this.log.error(`${fnTag}, Error handling RollbackAckMessage: ${error}`); + throw error; } } setupRouter(router: ConnectRouter): void { + // eslint-disable-next-line @typescript-eslint/no-this-alias + const that = this; router.service(CrashRecovery, { - recoverV2Message: this.sendRecover, - recoverV2UpdateMessage: this.sendRecoverUpdate, - recoverV2SuccessMessage: this.sendRecoverSuccess, - rollbackV2Message: this.sendRollback, - rollbackV2AckMessage: this.sendRollbackAck, + async recoverV2Message(req) { + return await that.handleRecover(req); + }, + async recoverV2SuccessMessage(req) { + return await that.handleRecoverSuccess(req); + }, + async rollbackV2Message(req) { + return await that.handleRollback(req); + }, + async rollbackV2AckMessage(req) { + return await that.handleRollbackAck(req); + }, }); } } diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts index 4a73a8635bf..77982d857a7 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts @@ -6,7 +6,6 @@ import { import { RecoverMessage, RecoverUpdateMessage, - RecoverSuccessMessage, RollbackMessage, RollbackAckMessage, } from "../../generated/proto/cacti/satp/v02/crash_recovery_pb"; @@ -18,48 +17,33 @@ interface ICrashRecoveryServiceOptions { } export class CrashRecoveryService { - private readonly logger: Logger; + private readonly log: Logger; private readonly logRepository: ILocalLogRepository; constructor(options: ICrashRecoveryServiceOptions) { - this.logger = LoggerProvider.getOrCreate(options.loggerOptions); + this.log = LoggerProvider.getOrCreate(options.loggerOptions); this.logRepository = options.logRepository; } async createRecoverUpdateMessage( request: RecoverMessage, ): Promise { - this.logger.debug("Creating RecoverUpdateMessage..."); + this.log.debug("Creating RecoverUpdateMessage..."); const recoveredLogs = await this.logRepository.readLogsMoreRecentThanTimestamp( request.lastEntryTimestamp.toString(), ); - return new RecoverUpdateMessage({ sessionId: request.sessionId, - messageType: "urn:ietf:SATP-2pc:msgtype:recover-msg", + messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", hashRecoverMessage: "", recoveredLogs: recoveredLogs, senderSignature: "", }); } - createRecoverSuccessMessage( - request: RecoverUpdateMessage, - ): RecoverSuccessMessage { - this.logger.debug("Creating RecoverSuccessMessage..."); - return new RecoverSuccessMessage({ - sessionId: request.sessionId, - messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", - hashRecoverUpdateMessage: "", - success: true, - entriesChanged: [], - senderSignature: "", - }); - } - createRollbackAckMessage(request: RollbackMessage): RollbackAckMessage { - this.logger.debug("Creating RollbackAckMessage..."); + this.log.debug("Creating RollbackAckMessage..."); return new RollbackAckMessage({ sessionId: request.sessionId, messageType: "urn:ietf:SATP-2pc:msgtype:rollback-msg", @@ -73,23 +57,15 @@ export class CrashRecoveryService { async sendRecoverMessage( message: RecoverMessage, ): Promise { - this.logger.debug("Sending RecoverMessage..."); + this.log.debug("Sending RecoverMessage..."); const updateMessage = await this.createRecoverUpdateMessage(message); return updateMessage; } - async sendRecoverUpdateMessage( - message: RecoverUpdateMessage, - ): Promise { - this.logger.debug("Sending RecoverUpdateMessage..."); - const successMessage = this.createRecoverSuccessMessage(message); - return successMessage; - } - async sendRollbackMessage( message: RollbackMessage, ): Promise { - this.logger.debug("Sending RollbackMessage..."); + this.log.debug("Sending RollbackMessage..."); const ackMessage = this.createRollbackAckMessage(message); return ackMessage; } diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts index 901d2194bea..54f432d8d3d 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts @@ -4,17 +4,19 @@ import { Stage0RollbackStrategy } from "./stage0-rollback-strategy"; import { Stage1RollbackStrategy } from "./stage1-rollback-strategy"; import { Stage2RollbackStrategy } from "./stage2-rollback-strategy"; import { Stage3RollbackStrategy } from "./stage3-rollback-strategy"; -import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; -import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager"; -import { SATPBridgeConfig } from "../../types"; +import { + ISATPBridgesOptions, + SATPBridgesManager, +} from "../../../gol/satp-bridges-manager"; +import { RollbackState } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; -export interface RollbackState { +/*export interface RollbackState { currentStage: string; // todo add rollback state // placeholder, should import RollbackLogEntry from protos. // RollbackLogEntry in spec = RollbackState in code - rollbackLogEntry: RollbackLogEntry; -} +}*/ export interface RollbackStrategy { execute(session: SATPSession): Promise; @@ -24,39 +26,48 @@ export interface RollbackStrategy { export class RollbackStrategyFactory { private log: Logger; - private bridgeManager: SATPBridgeManager; + private bridgesManager: SATPBridgesManager; + private logRepository: ILocalLogRepository; - constructor(config: SATPBridgeConfig) { + constructor(options: ISATPBridgesOptions, localLog: ILocalLogRepository) { this.log = LoggerProvider.getOrCreate({ label: "RollbackStrategyFactory" }); - this.bridgeManager = new SATPBridgeManager(config); + this.bridgesManager = new SATPBridgesManager(options); + this.logRepository = localLog; } - // todo add bridge manager and possibly others so each strategy can connect to satp bridge createStrategy(session: SATPSession): RollbackStrategy { const fnTag = "RollbackStrategyFactory#createStrategy"; const sessionData = session.hasClientSessionData() - ? session.getClientSessionData()! - : session.getServerSessionData()!; - const rollbackLogEntry = new RollbackLogEntry(); + ? session.getClientSessionData() + : session.getServerSessionData(); if (!sessionData.hashes) { this.log.debug(`${fnTag} Creating Stage0RollbackStrategy`); - return new Stage0RollbackStrategy(rollbackLogEntry); + return new Stage0RollbackStrategy(this.logRepository); } else if ( !sessionData.hashes.stage2 || Object.keys(sessionData.hashes.stage2).length === 0 ) { this.log.debug(`${fnTag} Creating Stage1RollbackStrategy`); - return new Stage1RollbackStrategy(this.bridgeManager, rollbackLogEntry); + return new Stage1RollbackStrategy( + this.bridgesManager, + this.logRepository, + ); } else if ( !sessionData.hashes.stage3 || Object.keys(sessionData.hashes.stage3).length === 0 ) { this.log.debug(`${fnTag} Creating Stage2RollbackStrategy`); - return new Stage2RollbackStrategy(this.bridgeManager, rollbackLogEntry); + return new Stage2RollbackStrategy( + this.bridgesManager, + this.logRepository, + ); } else { this.log.debug(`${fnTag} Creating Stage3RollbackStrategy`); - return new Stage3RollbackStrategy(this.bridgeManager, rollbackLogEntry); + return new Stage3RollbackStrategy( + this.bridgesManager, + this.logRepository, + ); } } } diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts index fd2b5ec709c..b4498218660 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts @@ -1,74 +1,87 @@ import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; import { SATPSession } from "../../satp-session"; -import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory"; -import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { RollbackStrategy } from "./rollback-strategy-factory"; +import { + RollbackLogEntry, + RollbackState, +} from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; export class Stage0RollbackStrategy implements RollbackStrategy { private log: Logger; - private rollbackLogEntry: RollbackLogEntry; + private logRepository: ILocalLogRepository; - constructor(logEntry: RollbackLogEntry) { + constructor(localLog: ILocalLogRepository) { this.log = LoggerProvider.getOrCreate({ label: "Stage0RollbackStrategy" }); - this.rollbackLogEntry = logEntry; + this.logRepository = localLog; } - // return a rollback state in all strategies async execute(session: SATPSession): Promise { const fnTag = "Stage0RollbackStrategy#execute"; this.log.info(`${fnTag} Executing rollback for Stage 0`); - // check session exists if (!session) { throw new Error(`${fnTag}, session data is not correctly initialized`); } + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + const rollbackState = new RollbackState({ + sessionId: session.getSessionId(), + currentStage: String(sessionData.hashes?.stage0), + stepsRemaining: 0, + rollbackLogEntries: [], + estimatedTimeToCompletion: "0", + status: "IN_PROGRESS", + details: "", + }); try { - // TODO record the rollback on the log. Implement RollbackLogEntry - this.log.debug("Persisting rollback log entry"); + const rollbackLogEntry = new RollbackLogEntry({ + sessionId: session.getSessionId(), + stage: String(sessionData.hashes?.stage0), + timestamp: new Date().toISOString(), + action: "NO_ACTION_REQUIRED", + status: "SUCCESS", + details: "", + }); - this.rollbackLogEntry.sessionId = session.getSessionId(); - this.rollbackLogEntry.stage = "Stage0"; - this.rollbackLogEntry.timestamp = Date.now().toString(); - this.rollbackLogEntry.action = ""; - this.rollbackLogEntry.status = "SUCCESS"; - this.rollbackLogEntry.details = ""; + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "COMPLETED"; + rollbackState.details = "Rollback of Stage 0 completed successfully"; - this.log.info(`Successfully rolled back Stage 0`); + this.log.info(`${fnTag} Rollback of Stage 0 completed successfully`); - const state: RollbackState = { - currentStage: "Stage0", - rollbackLogEntry: this.rollbackLogEntry, - }; - await this.rollbackLogs.create(state); // todo: log for the rollbackentry + //await this.logRepository.create(logEntry); - return state; + return rollbackState; } catch (error) { - this.log.error(`Failed to rollback Stage 0: ${error}`); + this.log.error(`${fnTag} Error during rollback of Stage 0: ${error}`); - this.rollbackLogEntry.sessionId = session.getSessionId(); - this.rollbackLogEntry.stage = "Stage0"; - this.rollbackLogEntry.timestamp = Date.now().toString(); - this.rollbackLogEntry.action = ""; - this.rollbackLogEntry.status = "FAILURE"; - this.rollbackLogEntry.details = ""; + rollbackState.status = "FAILED"; + rollbackState.details = `Rollback of Stage 0 failed: ${error}`; - const state: RollbackState = { - currentStage: "Stage0", - rollbackLogEntry: this.rollbackLogEntry, - }; - await this.rollbackLogs.create(state); // todo: implement the correct log support - return state; + return rollbackState; } } async cleanup(session: SATPSession): Promise { const fnTag = "Stage0RollbackStrategy#cleanup"; - // for stage 0, do nothing - const state: RollbackState = { + this.log.info(`${fnTag} Cleanup not required for Stage 0`); + + const rollbackState = new RollbackState({ + sessionId: session.getSessionId(), currentStage: "Stage0", - }; - if (!session) { - this.log.error(`${fnTag} Session not found`); - } - return state; + stepsRemaining: 0, + rollbackLogEntries: [], + estimatedTimeToCompletion: "", + status: "", + details: "", + }); + + return rollbackState; } } diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts index bd50d4ca1e8..fbce7a40f75 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts @@ -1,58 +1,107 @@ import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; import { SATPSession } from "../../satp-session"; -import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory"; -import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; -import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager"; +import { RollbackStrategy } from "./rollback-strategy-factory"; +import { + RollbackLogEntry, + RollbackState, +} from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgesManager } from "../../../gol/satp-bridges-manager"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; export class Stage1RollbackStrategy implements RollbackStrategy { private log: Logger; - private bridgeManager: SATPBridgeManager; - private rollbackLogEntry: RollbackLogEntry; + private bridgesManager: SATPBridgesManager; + private logRepository: ILocalLogRepository; - constructor(bridgeManager: SATPBridgeManager, logEntry: RollbackLogEntry) { + constructor( + bridgeManager: SATPBridgesManager, + localLog: ILocalLogRepository, + ) { this.log = LoggerProvider.getOrCreate({ label: "Stage1RollbackStrategy" }); - this.bridgeManager = bridgeManager; - this.rollbackLogEntry = logEntry; + this.bridgesManager = bridgeManager; + this.logRepository = localLog; } async execute(session: SATPSession): Promise { const fnTag = "Stage1RollbackStrategy#execute"; this.log.info(`${fnTag} Executing rollback for Stage 1`); - if (!session) { + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData) { throw new Error(`${fnTag}, session data is not correctly initialized`); } - try { - // TODO: Implement Stage 1 specific rollback logic - - // TODO: Record the rollback on the log. Implement RollbackLogEntry - - const receipt = await this.bridgeManager.unwrapAsset("assetId"); - - this.log.info(`${fnTag}, Asset unlocked: ${receipt}`); - - this.rollbackLogEntry.sessionId = session.getSessionId(); - this.rollbackLogEntry.stage = "Stage1"; - this.rollbackLogEntry.timestamp = Date.now().toString(); - this.rollbackLogEntry.action = "UNWRAP"; - this.rollbackLogEntry.status = "SUCCESS"; - this.rollbackLogEntry.details = ""; - - this.log.debug("Persisting rollback log entry"); - - this.log.info(`Successfully rolled back Stage 1`); + const rollbackState = new RollbackState({ + sessionId: session.getSessionId(), + currentStage: String(sessionData.hashes?.stage1), + stepsRemaining: 0, + rollbackLogEntries: [], + estimatedTimeToCompletion: "", + status: "IN_PROGRESS", + details: "", + }); + + const network = sessionData.senderGatewayNetworkId; + + if (!network) { + throw new Error( + `${fnTag}: Unable to determine network from session data.`, + ); + } - const state: RollbackState = { - currentStage: "Stage1", - rollbackLogEntry: this.rollbackLogEntry, - }; + const bridgeManager = this.bridgesManager.getBridge(network); - await this.rollbackLogs.create(state); // todo: log support - return state; + try { + const assetId = sessionData.senderAsset?.tokenId; + + if (!assetId) { + throw new Error(`${fnTag}: Asset ID is undefined`); + } + + this.log.info(`${fnTag}, Asset Id: ${assetId}`); + + await bridgeManager.unwrapAsset(assetId); + + const rollbackLogEntry = new RollbackLogEntry({ + sessionId: session.getSessionId(), + stage: String(sessionData.hashes?.stage1), + timestamp: new Date().toISOString(), + action: "UNWRAP_ASSET", + status: "SUCCESS", + details: "", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.stepsRemaining = 0; + rollbackState.status = "COMPLETED"; + rollbackState.estimatedTimeToCompletion = "0"; + rollbackState.details = "Rollback of Stage 1 completed successfully"; + + this.log.info( + `${fnTag} Successfully rolled back Stage 1 for session ${session.getSessionId}`, + ); + //await this.logRepository.create(logEntry); + return rollbackState; } catch (error) { - this.log.error(`Failed to rollback Stage 1: ${error}`); - return false; + this.log.error(`${fnTag} Failed to rollback Stage 1: ${error}`); + + const rollbackLogEntry = new RollbackLogEntry({ + sessionId: sessionData.id, + stage: String(sessionData.hashes?.stage1), + timestamp: new Date().toISOString(), + action: "UNWRAP_ASSET", + status: "FAILED", + details: "", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "FAILED"; + rollbackState.details = `Rollback of Stage 1 failed: ${error}`; + + return rollbackState; } } @@ -71,7 +120,7 @@ export class Stage1RollbackStrategy implements RollbackStrategy { try { // TODO: Implement Stage 1 specific cleanup logic - state.currentStage = "Stage1"; + //state.currentStage = ""; // TODO: Update other state properties as needed return state; diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts index 261e13384c6..35dcea9e38c 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts @@ -1,56 +1,110 @@ import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; import { SATPSession } from "../../satp-session"; -import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory"; -import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; -import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager"; +import { RollbackStrategy } from "./rollback-strategy-factory"; +import { + RollbackLogEntry, + RollbackState, +} from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgesManager } from "../../../gol/satp-bridges-manager"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; export class Stage2RollbackStrategy implements RollbackStrategy { private log: Logger; - private bridgeManager: SATPBridgeManager; - private rollbackLogEntry: RollbackLogEntry; + private bridgesManager: SATPBridgesManager; + private logRepository: ILocalLogRepository; - constructor(bridgeManager: SATPBridgeManager, logEntry: RollbackLogEntry) { + constructor( + bridgeManager: SATPBridgesManager, + localLog: ILocalLogRepository, + ) { this.log = LoggerProvider.getOrCreate({ label: "Stage2RollbackStrategy" }); - this.bridgeManager = bridgeManager; - this.rollbackLogEntry = logEntry; + this.bridgesManager = bridgeManager; + this.logRepository = localLog; } async execute(session: SATPSession): Promise { const fnTag = "Stage2RollbackStrategy#execute"; this.log.info(`${fnTag} Executing rollback for Stage 2`); - if (!session) { + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData) { throw new Error(`${fnTag}, session data is not correctly initialized`); } + const network = sessionData.senderGatewayNetworkId; - try { - // TODO: Implement Stage 2 specific rollback logic - - // TODO: Record the rollback on the log. Implement RollbackLogEntry - this.log.debug("Persisting rollback log entry"); - - const receipt = await this.bridgeManager.unlockAsset("assetId", Number()); - - this.log.info(`${fnTag}, Asset unlocked: ${receipt}`); - this.rollbackLogEntry.sessionId = session.getSessionId(); - this.rollbackLogEntry.stage = "Stage2"; - this.rollbackLogEntry.timestamp = Date.now().toString(); - this.rollbackLogEntry.action = "UNLOCK"; - this.rollbackLogEntry.status = "SUCCESS"; - this.rollbackLogEntry.details = ""; + if (!network) { + throw new Error( + `${fnTag}: Unable to determine network from session data.`, + ); + } - this.log.info(`Successfully rolled back Stage 2`); + const bridgeManager = this.bridgesManager.getBridge(network); - const state: RollbackState = { - currentStage: "Stage2", - rollbackLogEntry: this.rollbackLogEntry, - }; + const rollbackState = new RollbackState({ + sessionId: session.getSessionId(), + currentStage: String(sessionData.hashes?.stage2), + stepsRemaining: 1, + rollbackLogEntries: [], + estimatedTimeToCompletion: "0", + status: "IN_PROGRESS", + details: "", + }); - await this.rollbackLogs.create(state); // todo: log support - return state; + try { + const assetId = sessionData.senderAsset?.tokenId; + const amount = sessionData.senderAsset?.amount; + if (!assetId) { + throw new Error(`${fnTag}: Asset ID is undefined`); + } + + if (!amount) { + throw new Error(`${fnTag}, Amount is missing`); + } + + this.log.info(`${fnTag} Asset Id: ${assetId} amount: ${amount}`); + + await bridgeManager.unlockAsset(assetId, Number(amount)); + + const rollbackLogEntry = new RollbackLogEntry({ + sessionId: session.getSessionId(), + stage: String(sessionData.hashes?.stage2), + timestamp: new Date().toISOString(), + action: "UNLOCK_ASSET", + status: "SUCCESS", + details: "", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.stepsRemaining = 1; + rollbackState.status = "COMPLETED"; + rollbackState.estimatedTimeToCompletion = "0"; + rollbackState.details = "Rollback of Stage 2 completed successfully"; + + this.log.info( + `${fnTag} Successfully rolled back Stage 2 for session ${session.getSessionId()}`, + ); + //await this.logRepository.create(logEntry); + return rollbackState; } catch (error) { - this.log.error(`Failed to rollback Stage 2: ${error}`); - return false; + this.log.error(`${fnTag} Failed to rollback Stage 2: ${error}`); + + const rollbackLogEntry = new RollbackLogEntry({ + sessionId: session.getSessionId(), + stage: String(sessionData.hashes?.stage2), + timestamp: new Date().toISOString(), + action: "UNLOCK_ASSET", + status: "FAILED", + details: "", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "FAILED"; + rollbackState.details = `Rollback of Stage 2 failed: ${error}`; + + return rollbackState; } } @@ -69,7 +123,7 @@ export class Stage2RollbackStrategy implements RollbackStrategy { try { // TODO: Implement Stage 2 specific cleanup logic - state.currentStage = "Stage2"; + //state.currentStage = "Stage2"; // TODO: Update other state properties as needed return state; diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts index 1bf35232c4d..1e0507ca104 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts @@ -1,57 +1,111 @@ import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; import { SATPSession } from "../../satp-session"; -import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory"; -import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; -import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager"; +import { RollbackStrategy } from "./rollback-strategy-factory"; +import { + RollbackLogEntry, + RollbackState, +} from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgesManager } from "../../../gol/satp-bridges-manager"; +import { ILocalLogRepository } from "../../../repository/interfaces/repository"; export class Stage3RollbackStrategy implements RollbackStrategy { private log: Logger; - private bridgeManager: SATPBridgeManager; - private rollbackLogEntry: RollbackLogEntry; + private bridgesManager: SATPBridgesManager; + private logRepository: ILocalLogRepository; - constructor(bridgeManager: SATPBridgeManager, logEntry: RollbackLogEntry) { + constructor( + bridgeManager: SATPBridgesManager, + localLog: ILocalLogRepository, + ) { this.log = LoggerProvider.getOrCreate({ label: "Stage3RollbackStrategy" }); - this.bridgeManager = bridgeManager; - this.rollbackLogEntry = logEntry; + this.bridgesManager = bridgeManager; + this.logRepository = localLog; } async execute(session: SATPSession): Promise { const fnTag = "Stage3RollbackStrategy#execute"; this.log.info(`${fnTag} Executing rollback for Stage 3`); - if (!session) { + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + if (!sessionData) { throw new Error(`${fnTag}, session data is not correctly initialized`); } - try { - // TODO: Implement Stage 3 specific rollback logic - - // TODO: Record the rollback on the log. Implement RollbackLogEntry - - const receipt = await this.bridgeManager.burnAsset("assetId", Number()); + const network = sessionData.senderGatewayNetworkId; - this.log.info(`${fnTag}, Asset unlocked: ${receipt}`); - - this.rollbackLogEntry.sessionId = session.getSessionId(); - this.rollbackLogEntry.stage = "Stage3"; - this.rollbackLogEntry.timestamp = Date.now().toString(); - this.rollbackLogEntry.action = "BURN"; - this.rollbackLogEntry.status = "SUCCESS"; - this.rollbackLogEntry.details = ""; + if (!network) { + throw new Error( + `${fnTag}: Unable to determine network from session data.`, + ); + } - this.log.debug("Persisting rollback log entry"); + const bridgeManager = this.bridgesManager.getBridge(network); - this.log.info(`Successfully rolled back Stage 3`); - const state: RollbackState = { - currentStage: "Stage3", - rollbackLogEntry: this.rollbackLogEntry, - }; + const rollbackState = new RollbackState({ + sessionId: session.getSessionId(), + currentStage: String(sessionData.hashes?.stage3), + stepsRemaining: 2, + rollbackLogEntries: [], + estimatedTimeToCompletion: "", + status: "IN_PROGRESS", + details: "", + }); - await this.rollbackLogs.create(state); // todo: log support - return state; + try { + const assetId = sessionData.senderAsset?.tokenId; + const amount = sessionData.senderAsset?.amount; + if (!assetId) { + throw new Error(`${fnTag}: Asset ID is undefined`); + } + + if (!amount) { + throw new Error(`${fnTag}, Amount is missing`); + } + + this.log.info(`${fnTag} Asset Id: ${assetId} amount: ${amount}`); + + await bridgeManager.burnAsset(assetId, Number(amount)); + + const rollbackLogEntry = new RollbackLogEntry({ + sessionId: session.getSessionId(), + stage: String(sessionData.hashes?.stage0), + timestamp: new Date().toISOString(), + action: "BURN_ASSET", + status: "SUCCESS", + details: "", + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.stepsRemaining = 2; + rollbackState.status = "COMPLETED"; + rollbackState.estimatedTimeToCompletion = "0"; + rollbackState.details = "Rollback of Stage 3 completed successfully"; + + this.log.info( + `${fnTag} Successfully rolled back Stage 3 for session ${session.getSessionId()}`, + ); + //await this.logRepository.create(logEntry); + return rollbackState; } catch (error) { - this.log.error(`Failed to rollback Stage 3: ${error}`); - return false; + this.log.error(`${fnTag} Failed to rollback Stage 3: ${error}`); + + const rollbackLogEntry = new RollbackLogEntry({ + sessionId: session.getSessionId(), + stage: String(sessionData.hashes?.stage3), + timestamp: new Date().toISOString(), + action: "BURN_ASSET", + status: "FAILED", + details: `Error burning asset: ${error}`, + }); + + rollbackState.rollbackLogEntries.push(rollbackLogEntry); + rollbackState.status = "FAILED"; + rollbackState.details = `Rollback of Stage 3 failed: ${error}`; + + return rollbackState; } } @@ -70,7 +124,7 @@ export class Stage3RollbackStrategy implements RollbackStrategy { try { // TODO: Implement Stage 3 specific cleanup logic - state.currentStage = "Stage3"; + //state.currentStage = ""; // TODO: Update other state properties as needed return state; diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/health_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/health_pb.ts index 6ef53cb139d..06f94e0ab6e 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/health_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/health_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" // @generated from file cacti/satp/v02/common/health.proto (package cacti.satp.v02.common, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/message_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/message_pb.ts index 7f8248ffddc..36d20c534e5 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/message_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/message_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" // @generated from file cacti/satp/v02/common/message.proto (package cacti.satp.v02.common, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/session_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/session_pb.ts index 23b33c82ad5..29c9d725f33 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/session_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/session_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" // @generated from file cacti/satp/v02/common/session.proto (package cacti.satp.v02.common, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts index a3943ab64e4..b4659aa2e8f 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" // @generated from file cacti/satp/v02/crash_recovery.proto (package cacti.satp.v02.crash, syntax proto3) /* eslint-disable */ // @ts-nocheck @@ -487,3 +487,80 @@ export class RollbackLogEntry extends Message { } } +/** + * @generated from message cacti.satp.v02.crash.RollbackState + */ +export class RollbackState extends Message { + /** + * @generated from field: string session_id = 1; + */ + sessionId = ""; + + /** + * @generated from field: string current_stage = 2; + */ + currentStage = ""; + + /** + * @generated from field: int32 steps_remaining = 3; + */ + stepsRemaining = 0; + + /** + * @generated from field: repeated cacti.satp.v02.crash.RollbackLogEntry rollback_log_entries = 4; + */ + rollbackLogEntries: RollbackLogEntry[] = []; + + /** + * @generated from field: string estimated_time_to_completion = 5; + */ + estimatedTimeToCompletion = ""; + + /** + * Overall status (e.g., IN_PROGRESS, COMPLETED, FAILED) + * + * @generated from field: string status = 6; + */ + status = ""; + + /** + * Additional metadata or information + * + * @generated from field: string details = 7; + */ + details = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "cacti.satp.v02.crash.RollbackState"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "session_id", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "current_stage", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "steps_remaining", kind: "scalar", T: 5 /* ScalarType.INT32 */ }, + { no: 4, name: "rollback_log_entries", kind: "message", T: RollbackLogEntry, repeated: true }, + { no: 5, name: "estimated_time_to_completion", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 6, name: "status", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 7, name: "details", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): RollbackState { + return new RollbackState().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): RollbackState { + return new RollbackState().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): RollbackState { + return new RollbackState().fromJsonString(jsonString, options); + } + + static equals(a: RollbackState | PlainMessage | undefined, b: RollbackState | PlainMessage | undefined): boolean { + return proto3.util.equals(RollbackState, a, b); + } +} + diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_0_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_0_pb.ts index 9a74c558900..cacc5148051 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_0_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_0_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" // @generated from file cacti/satp/v02/stage_0.proto (package cacti.satp.v02, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_1_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_1_pb.ts index ed70ded73b5..e40e13871f1 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_1_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_1_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" // @generated from file cacti/satp/v02/stage_1.proto (package cacti.satp.v02, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_2_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_2_pb.ts index fc39e916dc4..2dcddb32d34 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_2_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_2_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" // @generated from file cacti/satp/v02/stage_2.proto (package cacti.satp.v02, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_3_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_3_pb.ts index 59d65e98f95..5e64da5a952 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_3_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_3_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" // @generated from file cacti/satp/v02/stage_3.proto (package cacti.satp.v02, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/view/bungee_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/view/bungee_pb.ts index 03e37d7dd70..e3147a86292 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/view/bungee_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/view/bungee_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" // @generated from file cacti/satp/v02/view/bungee.proto (package cacti.satp.v02.view.bungee, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts index 59adfd5556e..1faccb7bdc6 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts @@ -56,7 +56,7 @@ import { } from "./gol/satp-bridges-manager"; import bodyParser from "body-parser"; import { - CrashOcurrence, + //CrashOcurrence, CrashRecoveryManager, ICrashRecoveryManagerOptions, } from "./core/recovery/crash-manager"; @@ -186,10 +186,9 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { const crashOptions: ICrashRecoveryManagerOptions = { instanceId: this.instanceId, logLevel: this.config.logLevel, - bridgeConfig: SATPBridgeConfig, + bridgeConfig: bridgesManagerOptions, }; this.crashManager = new CrashRecoveryManager(crashOptions); - this.crashManager.checkAndResolveCrash(); } /* ICactus Plugin methods */ diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts new file mode 100644 index 00000000000..a7f643a1dfc --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts @@ -0,0 +1,340 @@ +import "jest-extended"; +import { + CrashRecoveryManager, + CrashStatus, +} from "../../../../main/typescript/core/recovery/crash-manager"; +import { LogLevelDesc, Secp256k1Keys } from "@hyperledger/cactus-common"; +import { ICrashRecoveryManagerOptions } from "../../../../main/typescript/core/recovery/crash-manager"; +import knex from "knex"; +import { + LocalLog, + SupportedChain, +} from "../../../../main/typescript/core/types"; +import { + Asset, + CredentialProfile, + LockType, + SignatureAlgorithm, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/message_pb"; +import { v4 as uuidv4 } from "uuid"; +import { SessionData } from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/session_pb"; +import { SATP_VERSION } from "../../../../main/typescript/core/constants"; +import { SATPSession } from "../../../../main/typescript/core/satp-session"; +import { knexClientConnection } from "../../knex.config"; +import { getSatpLogKey } from "../../../../main/typescript/gateway-utils"; +import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset"; + +const logLevel: LogLevelDesc = "DEBUG"; + +let mockSession: SATPSession; +const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); +const sessionId = uuidv4(); + +const createMockSession = (maxTimeout: string, maxRetries: string) => { + const mockSession = new SATPSession({ + contextID: "MOCK_CONTEXT_ID", + server: false, + client: true, + }); + + const sessionData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + sessionData.id = sessionId; + sessionData.maxTimeout = maxTimeout; + sessionData.maxRetries = maxRetries; + sessionData.version = SATP_VERSION; + sessionData.clientGatewayPubkey = Buffer.from(keyPairs.publicKey).toString( + "hex", + ); + sessionData.serverGatewayPubkey = sessionData.clientGatewayPubkey; + sessionData.originatorPubkey = "MOCK_ORIGINATOR_PUBKEY"; + sessionData.beneficiaryPubkey = "MOCK_BENEFICIARY_PUBKEY"; + sessionData.digitalAssetId = "MOCK_DIGITAL_ASSET_ID"; + sessionData.assetProfileId = "MOCK_ASSET_PROFILE_ID"; + sessionData.receiverGatewayOwnerId = "MOCK_RECEIVER_GATEWAY_OWNER_ID"; + sessionData.recipientGatewayNetworkId = SupportedChain.FABRIC; + sessionData.senderGatewayOwnerId = "MOCK_SENDER_GATEWAY_OWNER_ID"; + sessionData.senderGatewayNetworkId = SupportedChain.BESU; + sessionData.signatureAlgorithm = SignatureAlgorithm.RSA; + sessionData.lockType = LockType.FAUCET; + sessionData.lockExpirationTime = BigInt(1000); + sessionData.credentialProfile = CredentialProfile.X509; + sessionData.loggingProfile = "MOCK_LOGGING_PROFILE"; + sessionData.accessControlProfile = "MOCK_ACCESS_CONTROL_PROFILE"; + sessionData.resourceUrl = "MOCK_RESOURCE_URL"; + sessionData.lockAssertionExpiration = BigInt(99999); + sessionData.receiverContractOntology = "MOCK_RECEIVER_CONTRACT_ONTOLOGY"; + sessionData.senderContractOntology = "MOCK_SENDER_CONTRACT_ONTOLOGY"; + sessionData.sourceLedgerAssetId = "MOCK_SOURCE_LEDGER_ASSET_ID"; + sessionData.senderAsset = new Asset(); + sessionData.senderAsset.tokenId = "MOCK_TOKEN_ID"; + sessionData.senderAsset.tokenType = TokenType.ERC20; + sessionData.senderAsset.amount = BigInt(0); + sessionData.senderAsset.owner = "MOCK_SENDER_ASSET_OWNER"; + sessionData.senderAsset.ontology = "MOCK_SENDER_ASSET_ONTOLOGY"; + sessionData.senderAsset.contractName = "MOCK_SENDER_ASSET_CONTRACT_NAME"; + sessionData.senderAsset.contractAddress = + "MOCK_SENDER_ASSET_CONTRACT_ADDRESS"; + sessionData.receiverAsset = new Asset(); + + sessionData.receiverAsset.tokenType = TokenType.ERC20; + sessionData.receiverAsset.amount = BigInt(0); + sessionData.receiverAsset.owner = "MOCK_RECEIVER_ASSET_OWNER"; + sessionData.receiverAsset.ontology = "MOCK_RECEIVER_ASSET_ONTOLOGY"; + sessionData.receiverAsset.contractName = "MOCK_RECEIVER_ASSET_CONTRACT_NAME"; + sessionData.receiverAsset.mspId = "MOCK_RECEIVER_ASSET_MSP_ID"; + sessionData.receiverAsset.channelName = "MOCK_CHANNEL_ID"; + + return mockSession; +}; +let crashManager: CrashRecoveryManager; + +beforeAll(async () => { + const knexInstance = knex(knexClientConnection); + await knexInstance.migrate.latest(); + + const crashManagerOptions: ICrashRecoveryManagerOptions = { + instanceId: uuidv4(), + logLevel: logLevel, + knexConfig: knexClientConnection, + bridgeConfig: { + logLevel: logLevel, + networks: [], + supportedDLTs: [SupportedChain.BESU, SupportedChain.FABRIC], + }, + }; + + crashManager = new CrashRecoveryManager(crashManagerOptions); +}); + +afterEach(() => { + jest.clearAllMocks(); +}); + +describe("CrashRecoveryManager Tests", () => { + it("should reconstruct session by fetching logs", async () => { + mockSession = createMockSession("1000", "3"); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + // load sample log in database + const key = getSatpLogKey(sessionId, "type", "operation"); + const mockLogEntry: LocalLog = { + sessionID: sessionId, + type: "type", + key: key, + operation: "operation", + timestamp: new Date().toISOString(), + data: JSON.stringify(testData), + }; + const mockLogRepository = crashManager["logRepository"]; + + await mockLogRepository.create(mockLogEntry); + await crashManager.recoverSessions(); + + expect(crashManager["sessions"].has(sessionId)).toBeTrue(); + + const recoveredSession = crashManager["sessions"].get(sessionId); + + expect(recoveredSession).toBeDefined(); + + if (recoveredSession) { + const parsedSessionData: SessionData = JSON.parse(mockLogEntry.data); + expect(recoveredSession).toEqual(parsedSessionData); + } + }); + + it("should invoke handleRecovery when crash is initially detected", async () => { + mockSession = createMockSession("1000", "3"); + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => true); + + jest + .spyOn(crashManager as any, "checkCrash") + .mockImplementation(() => Promise.resolve(CrashStatus.IN_RECOVERY)); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + }); + + it("should invoke initiateRollback when recovery attempts are exhausted", async () => { + mockSession = createMockSession("1000", "3"); + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => false); + + const initiateRollbackSpy = jest + .spyOn(crashManager, "initiateRollback") + .mockImplementation(async () => true); + + jest + .spyOn(crashManager as any, "checkCrash") + .mockImplementation(() => Promise.resolve(CrashStatus.IN_RECOVERY)); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + expect(initiateRollbackSpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + initiateRollbackSpy.mockRestore(); + }); + + it("should detect crash based on incomplete operation in logs", async () => { + mockSession = createMockSession("10000", "3"); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => true); + + const key = getSatpLogKey(sessionId, "type", "init"); + + const mockLogEntry: LocalLog = { + sessionID: sessionId, + type: "type", + key: key, + operation: "init", // operation!=done + timestamp: new Date().toISOString(), + data: JSON.stringify(testData), + }; + + const mockLogRepository = crashManager["logRepository"]; + + await mockLogRepository.create(mockLogEntry); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + }); + + it("should detect crash based on session timeout", async () => { + mockSession = createMockSession("1000", "3"); // timeout of 1 sec + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => true); + + const key = getSatpLogKey(sessionId, "type", "done"); + + const pastTime = new Date(Date.now() - 10000).toISOString(); + + const mockLogEntry: LocalLog = { + sessionID: sessionId, + type: "type", + key: key, + operation: "done", + timestamp: pastTime, + data: JSON.stringify(testData), + }; + + const mockLogRepository = crashManager["logRepository"]; + + await mockLogRepository.create(mockLogEntry); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + }); + + it("should detect crash based on incomplete operation in logs and initiate rollback when recovery fails", async () => { + mockSession = createMockSession("10000", "3"); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => false); + + const handleInitiateRollBackSpy = jest + .spyOn(crashManager, "initiateRollback") + .mockImplementation(async () => true); + + const key = getSatpLogKey(sessionId, "type3", "init"); + + const mockLogEntry: LocalLog = { + sessionID: sessionId, + type: "type3", + key: key, + operation: "init", // operation!=done + timestamp: new Date().toISOString(), + data: JSON.stringify(testData), + }; + + const mockLogRepository = crashManager["logRepository"]; + + await mockLogRepository.create(mockLogEntry); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + expect(handleInitiateRollBackSpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + handleInitiateRollBackSpy.mockRestore(); + }); + + it("should detect crash based on session timeout and initiate rollback when recovery fails", async () => { + mockSession = createMockSession("1000", "3"); // timeout of 1 sec + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const handleRecoverySpy = jest + .spyOn(crashManager, "handleRecovery") + .mockImplementation(async () => false); + + const handleInitiateRollBackSpy = jest + .spyOn(crashManager, "initiateRollback") + .mockImplementation(async () => true); + + const key = getSatpLogKey(sessionId, "type1", "done"); + + const pastTime = new Date(Date.now() - 10000).toISOString(); + + const mockLogEntry: LocalLog = { + sessionID: sessionId, + type: "type1", + key: key, + operation: "done", + timestamp: pastTime, + data: JSON.stringify(testData), + }; + + const mockLogRepository = crashManager["logRepository"]; + + await mockLogRepository.create(mockLogEntry); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).toHaveBeenCalled(); + expect(handleInitiateRollBackSpy).toHaveBeenCalled(); + + handleRecoverySpy.mockRestore(); + handleInitiateRollBackSpy.mockRestore(); + }); +}); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/services.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/services.test.ts new file mode 100644 index 00000000000..706d87a74d0 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/services.test.ts @@ -0,0 +1,158 @@ +import "jest-extended"; +import { LogLevelDesc, Secp256k1Keys } from "@hyperledger/cactus-common"; +import { + CrashRecoveryManager, + ICrashRecoveryManagerOptions, +} from "../../../../main/typescript/core/recovery/crash-manager"; +import knex from "knex"; +import { SATPSession } from "../../../../main/typescript/core/satp-session"; +import { knexClientConnection } from "../../knex.config"; +import { SupportedChain } from "../../../../main/typescript/core/types"; +import { + Asset, + CredentialProfile, + LockType, + SignatureAlgorithm, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/message_pb"; +import { SATP_VERSION } from "../../../../main/typescript/core/constants"; +import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset"; +import { v4 as uuidv4 } from "uuid"; +import { getSatpLogKey } from "../../../../main/typescript/gateway-utils"; + +const logLevel: LogLevelDesc = "DEBUG"; +const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); + +let crashRecoveryManager: CrashRecoveryManager; +let mockSession: SATPSession; +const sessionId = uuidv4(); +const createMockSession = () => { + const mockSession = new SATPSession({ + contextID: "MOCK_CONTEXT_ID", + server: false, + client: true, + }); + + const sessionData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + sessionData.id = sessionId; + sessionData.version = SATP_VERSION; + sessionData.clientGatewayPubkey = Buffer.from(keyPairs.publicKey).toString( + "hex", + ); + sessionData.serverGatewayPubkey = sessionData.clientGatewayPubkey; + sessionData.originatorPubkey = "MOCK_ORIGINATOR_PUBKEY"; + sessionData.beneficiaryPubkey = "MOCK_BENEFICIARY_PUBKEY"; + sessionData.digitalAssetId = "MOCK_DIGITAL_ASSET_ID"; + sessionData.assetProfileId = "MOCK_ASSET_PROFILE_ID"; + sessionData.receiverGatewayOwnerId = "MOCK_RECEIVER_GATEWAY_OWNER_ID"; + sessionData.recipientGatewayNetworkId = SupportedChain.FABRIC; + sessionData.senderGatewayOwnerId = "MOCK_SENDER_GATEWAY_OWNER_ID"; + sessionData.senderGatewayNetworkId = SupportedChain.BESU; + sessionData.signatureAlgorithm = SignatureAlgorithm.RSA; + sessionData.lockType = LockType.FAUCET; + sessionData.lockExpirationTime = BigInt(1000); + sessionData.credentialProfile = CredentialProfile.X509; + sessionData.loggingProfile = "MOCK_LOGGING_PROFILE"; + sessionData.accessControlProfile = "MOCK_ACCESS_CONTROL_PROFILE"; + sessionData.resourceUrl = "MOCK_RESOURCE_URL"; + sessionData.lockAssertionExpiration = BigInt(99999); + sessionData.receiverContractOntology = "MOCK_RECEIVER_CONTRACT_ONTOLOGY"; + sessionData.senderContractOntology = "MOCK_SENDER_CONTRACT_ONTOLOGY"; + sessionData.sourceLedgerAssetId = "MOCK_SOURCE_LEDGER_ASSET_ID"; + sessionData.senderAsset = new Asset(); + sessionData.senderAsset.tokenId = "MOCK_TOKEN_ID"; + sessionData.senderAsset.tokenType = TokenType.ERC20; + sessionData.senderAsset.amount = BigInt(0); + sessionData.senderAsset.owner = "MOCK_SENDER_ASSET_OWNER"; + sessionData.senderAsset.ontology = "MOCK_SENDER_ASSET_ONTOLOGY"; + sessionData.senderAsset.contractName = "MOCK_SENDER_ASSET_CONTRACT_NAME"; + sessionData.senderAsset.contractAddress = + "MOCK_SENDER_ASSET_CONTRACT_ADDRESS"; + sessionData.receiverAsset = new Asset(); + + sessionData.receiverAsset.tokenType = TokenType.ERC20; + sessionData.receiverAsset.amount = BigInt(0); + sessionData.receiverAsset.owner = "MOCK_RECEIVER_ASSET_OWNER"; + sessionData.receiverAsset.ontology = "MOCK_RECEIVER_ASSET_ONTOLOGY"; + sessionData.receiverAsset.contractName = "MOCK_RECEIVER_ASSET_CONTRACT_NAME"; + sessionData.receiverAsset.mspId = "MOCK_RECEIVER_ASSET_MSP_ID"; + sessionData.receiverAsset.channelName = "MOCK_CHANNEL_ID"; + + return mockSession; +}; + +beforeAll(async () => { + const knexInstance = knex(knexClientConnection); + await knexInstance.migrate.latest(); + const crashManagerOptions: ICrashRecoveryManagerOptions = { + instanceId: uuidv4(), + logLevel: logLevel, + knexConfig: knexClientConnection, + bridgeConfig: { + logLevel: logLevel, + networks: [], + supportedDLTs: [SupportedChain.BESU, SupportedChain.FABRIC], + }, + }; + + crashRecoveryManager = new CrashRecoveryManager(crashManagerOptions); +}); + +describe("Crash Recovery Services Testing", () => { + it("handle reover function test", async () => { + mockSession = createMockSession(); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const key = getSatpLogKey(sessionId, "type", "operation"); + const mockLogEntry = { + sessionID: sessionId, + type: "type", + key: key, + operation: "operation", + timestamp: new Date().toISOString(), + data: JSON.stringify(testData), + }; + + const mockLogRepository = crashRecoveryManager["logRepository"]; + + await mockLogRepository.create(mockLogEntry); + await crashRecoveryManager.recoverSessions(); + + const result = await crashRecoveryManager.handleRecovery(mockSession); + expect(result).toBe(true); + }); + + /*it("intitiate rollback function test", async () => { + mockSession = createMockSession(); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const key = getSatpLogKey(sessionId, "type1", "operation1"); + const mockLogEntry = { + sessionID: sessionId, + type: "type1", + key: key, + operation: "operation1", + timestamp: new Date().toISOString(), + data: JSON.stringify(testData), + }; + + const mockLogRepository = crashRecoveryManager["logRepository"]; + + await mockLogRepository.create(mockLogEntry); + await crashRecoveryManager.recoverSessions(); + + const result = await crashRecoveryManager.initiateRollback( + mockSession, + true, + ); + expect(result).toBe(true); + });*/ +}); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/start-gateway.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/start-gateway.test.ts new file mode 100644 index 00000000000..020670e50ff --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/start-gateway.test.ts @@ -0,0 +1,90 @@ +import "jest-extended"; +import { LoggerProvider, LogLevelDesc } from "@hyperledger/cactus-common"; +import { + SATPGateway, + SATPGatewayConfig, +} from "../../../../main/typescript/plugin-satp-hermes-gateway"; +import { PluginFactorySATPGateway } from "../../../../main/typescript/factory/plugin-factory-gateway-orchestrator"; +import { + IPluginFactoryOptions, + PluginImportType, +} from "@hyperledger/cactus-core-api"; +import { + SupportedChain, + Address, +} from "../../../../main/typescript/core/types"; + +const logLevel: LogLevelDesc = "DEBUG"; +const log = LoggerProvider.getOrCreate({ + level: logLevel, + label: "gateway-test", +}); + +describe("SATPGateway tests", () => { + it("should initialize two gateways and test their interaction", async () => { + const factoryOptions: IPluginFactoryOptions = { + pluginImportType: PluginImportType.Local, + }; + + const factory = new PluginFactorySATPGateway(factoryOptions); + const gatewayIdentity1 = { + id: "mockID-1", + name: "CustomGateway", + version: [ + { + Core: "v02", + Architecture: "v02", + Crash: "v02", + }, + ], + supportedDLTs: [SupportedChain.BESU], + proofID: "mockProofID10", + address: "http://localhost" as Address, + }; + + const gatewayIdentity2 = { + id: "mockID-2", + name: "CustomGateway", + version: [ + { + Core: "v02", + Architecture: "v02", + Crash: "v02", + }, + ], + supportedDLTs: [SupportedChain.FABRIC], + proofID: "mockProofID11", + address: "http://localhost" as Address, + gatewayServerPort: 3110, + gatewayClientPort: 3111, + gatewayOpenAPIPort: 4110, + }; + + const options1: SATPGatewayConfig = { + logLevel: "DEBUG", + gid: gatewayIdentity1, + counterPartyGateways: [gatewayIdentity2], + bridgesConfig: [], + }; + + const options2: SATPGatewayConfig = { + logLevel: "DEBUG", + gid: gatewayIdentity2, + counterPartyGateways: [gatewayIdentity1], + bridgesConfig: [], + }; + const gateway1 = await factory.create(options1); + expect(gateway1).toBeInstanceOf(SATPGateway); + await gateway1.startup(); + + const gateway2 = await factory.create(options2); + expect(gateway2).toBeInstanceOf(SATPGateway); + + await gateway2.startup(); + + log.info("gateway test!"); + + await gateway1.shutdown(); + await gateway2.shutdown(); + }); +}); diff --git a/yarn.lock b/yarn.lock index 4b2cda125cd..b6b0639d849 100644 --- a/yarn.lock +++ b/yarn.lock @@ -44339,6 +44339,13 @@ __metadata: languageName: node linkType: hard +"pg-connection-string@npm:2.6.1": + version: 2.6.1 + resolution: "pg-connection-string@npm:2.6.1" + checksum: 10/882344a47e1ecf3a91383e0809bf2ac48facea97fcec0358d6e060e1cbcb8737acde419b4c86f05da4ce4a16634ee50fff1d2bb787d73b52ccbfde697243ad8a + languageName: node + linkType: hard + "pg-connection-string@npm:^2.6.4": version: 2.6.4 resolution: "pg-connection-string@npm:2.6.4"