Skip to content

Commit

Permalink
feat: add crash recovery and knex config for production
Browse files Browse the repository at this point in the history
Signed-off-by: Yogesh01000100 <[email protected]>

chore(satp-hermes): improve DB management

Signed-off-by: Rafael Belchior <[email protected]>

chore(satp-hermes): crash recovery architecture

Signed-off-by: Rafael Belchior <[email protected]>

fix(recovery): enhance crash recovery and rollback implementation

Signed-off-by: Yogesh01000100 <[email protected]>

refactor(recovery): consolidate logic and improve SATP message handling

Signed-off-by: Yogesh01000100 <[email protected]>

feat(recovery): add rollback implementations

Signed-off-by: Yogesh01000100 <[email protected]>

fix: correct return types and inits

Signed-off-by: Yogesh01000100 <[email protected]>
Co-authored-by: Rafael Belchior <[email protected]>

fix: add unit tests and resolve rollbackstate

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add function processing logs from g2

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add cron schedule for periodic crash checks

Signed-off-by: Yogesh01000100 <[email protected]>

fix: resolve rollback condition and add tests

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add orchestrator communication layer using connect-RPC

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add rollback protocol rpc

Signed-off-by: Yogesh01000100 <[email protected]>

fix: handle server log synchronization

Signed-off-by: Yogesh01000100 <[email protected]>

fix: resolve gol errors, add unit tests

Signed-off-by: Yogesh01000100 <[email protected]>

fix: handle server-side rollback

Signed-off-by: Yogesh01000100 <[email protected]>

fix: resolve networkId in rollback strategies

Signed-off-by: Yogesh01000100 <[email protected]>
  • Loading branch information
Yogesh01000100 committed Dec 9, 2024
1 parent 83b105b commit d6ffbca
Show file tree
Hide file tree
Showing 32 changed files with 3,724 additions and 77 deletions.
2 changes: 2 additions & 0 deletions packages/cactus-plugin-satp-hermes/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
"jsonc": "2.0.0",
"knex": "2.4.0",
"kubo-rpc-client": "3.0.1",
"node-cron": "3.0.2",
"npm-run-all": "4.1.5",
"openzeppelin-solidity": "3.4.2",
"pg": "8.13.1",
Expand Down Expand Up @@ -163,6 +164,7 @@
"@types/fs-extra": "11.0.4",
"@types/google-protobuf": "3.15.12",
"@types/node": "18.18.2",
"@types/node-cron": "3.0.11",
"@types/pg": "8.11.10",
"@types/swagger-ui-express": "4.1.6",
"@types/tape": "4.13.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Knex } from "knex";

export function up(knex: Knex): Knex.SchemaBuilder {
return knex.schema.createTable("logs", (table) => {
table.string("sessionID").notNullable();
table.string("sessionId").notNullable();
table.string("type").notNullable();
table.string("key").notNullable().primary();
table.string("operation").notNullable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ service CrashRecovery {

// step RPCs
rpc RecoverV2Message(RecoverMessage) returns (RecoverUpdateMessage);
rpc RecoverV2SuccessMessage(RecoverSuccessMessage) returns (google.protobuf.Empty);
rpc RecoverV2SuccessMessage(RecoverSuccessMessage) returns (RecoverSuccessMessageResponse);
rpc RollbackV2Message(RollbackMessage) returns (RollbackAckMessage);
}

Expand Down Expand Up @@ -41,6 +41,12 @@ message RecoverSuccessMessage {
string sender_signature = 6;
}

message RecoverSuccessMessageResponse {
string session_id = 1;
bool received = 2;
string sender_signature = 3;
}

message RollbackMessage {
string session_id = 1;
string message_type = 2;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import {
RecoverMessage,
RecoverMessageSchema,
RecoverSuccessMessage,
RecoverSuccessMessageSchema,
RollbackMessage,
RollbackMessageSchema,
RollbackState,
} from "../../../typescript/generated/proto/cacti/satp/v02/crash_recovery_pb";
import { Logger, LoggerProvider } from "@hyperledger/cactus-common";
import { SATPSession } from "../satp-session";
import { ILocalLogRepository } from "../../repository/interfaces/repository";
import { create } from "@bufbuild/protobuf";
import { SATPLogger } from "../../logging";

export class CrashRecoveryClientService {
private readonly log: Logger;

constructor(
private readonly logRepository: ILocalLogRepository,
private readonly sessions: Map<string, SATPSession>,
private readonly dbLogger: SATPLogger,
loggerLabel: string = "CrashRecoveryClientService",
) {
this.log = LoggerProvider.getOrCreate({ label: loggerLabel });
this.log.trace(`Initialized ${CrashRecoveryClientService.name}`);
}

public createRecoverMessage(session: SATPSession): RecoverMessage {
const fnTag = `${CrashRecoveryClientService.name}#createRecoverMessage`;
this.log.debug(
`${fnTag} - Creating RecoverMessage for sessionId: ${session.getSessionId()}`,
);

const sessionData = session.getClientSessionData();

const recoverMessage = create(RecoverMessageSchema, {
sessionId: session.getSessionId(),
messageType: "urn:ietf:SATP-2pc:msgtype:recover-msg",
satpPhase: "phase0",
sequenceNumber: Number(sessionData.lastSequenceNumber),
isBackup: false,
newIdentityPublicKey: "",
lastEntryTimestamp: BigInt(0),
senderSignature: "",
});
//await this.dbLogger.persistLogEntry({});
this.log.debug(`${fnTag} - RecoverMessage created:`, recoverMessage);

return recoverMessage;
}

public async createRecoverSuccessMessage(
session: SATPSession,
): Promise<RecoverSuccessMessage> {
const fnTag = `${CrashRecoveryClientService.name}#createRecoverSuccessMessage`;
this.log.debug(
`${fnTag} - Creating RecoverSuccessMessage for sessionId: ${session.getSessionId()}`,
);

const recoverSuccessMessage = create(RecoverSuccessMessageSchema, {
sessionId: session.getSessionId(),
messageType: "urn:ietf:SATP-2pc:msgtype:recover-success-msg",
hashRecoverUpdateMessage: "",
success: true,
entriesChanged: [],
senderSignature: "",
});
//await this.dbLogger.persistLogEntry({});
this.log.debug(
`${fnTag} - RecoverSuccessMessage created:`,
recoverSuccessMessage,
);

return recoverSuccessMessage;
}

public async createRollbackMessage(
session: SATPSession,
rollbackState: RollbackState,
): Promise<RollbackMessage> {
const fnTag = `${CrashRecoveryClientService.name}#createRollbackMessage`;
this.log.debug(
`${fnTag} - Creating RollbackMessage for sessionId: ${session.getSessionId()}`,
);

const rollbackMessage = create(RollbackMessageSchema, {
sessionId: session.getSessionId(),
messageType: "urn:ietf:SATP-2pc:msgtype:rollback-msg",
success: rollbackState.status === "completed",
actionsPerformed: [],
proofs: [],
senderSignature: "",
});
//await this.dbLogger.persistLogEntry({});
this.log.debug(`${fnTag} - RollbackMessage created:`, rollbackMessage);

return rollbackMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { ConnectRouter } from "@connectrpc/connect";
import { Logger, LoggerProvider } from "@hyperledger/cactus-common";
import {
CrashRecovery,
RecoverSuccessMessageResponse,
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { CrashRecoveryServerService } from "./server-service";
import { CrashRecoveryClientService } from "./client-service";
import { SATPSession } from "../satp-session";
import {
RecoverMessage,
RecoverUpdateMessage,
RecoverSuccessMessage,
RollbackMessage,
RollbackAckMessage,
RollbackState,
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { SATPHandler, SATPHandlerType } from "../../types/satp-protocol";

export class CrashRecoveryHandler implements SATPHandler {
private readonly log: Logger;

constructor(
private readonly serverService: CrashRecoveryServerService,
private readonly clientService: CrashRecoveryClientService,
loggerLabel: string = "CrashRecoveryHandler",
) {
this.log = LoggerProvider.getOrCreate({ label: loggerLabel });
this.log.trace(`Initialized ${CrashRecoveryHandler.name}`);
}

public getHandlerIdentifier(): SATPHandlerType {
return SATPHandlerType.CRASH;
}

public getHandlerSessions(): string[] {
return [];
}

public getStage(): string {
return "crash";
}

// Server-side

private async recoverV2MessageImplementation(
req: RecoverMessage,
): Promise<RecoverUpdateMessage> {
const fnTag = `${CrashRecoveryHandler.name}#recoverV2MessageImplementation`;
this.log.debug(`${fnTag} - Handling RecoverMessage: ${req}`);
try {
return await this.serverService.handleRecover(req);
} catch (error) {
this.log.error(`${fnTag} - Error:`, error);
throw error;
}
}

private async recoverV2SuccessMessageImplementation(
req: RecoverSuccessMessage,
): Promise<RecoverSuccessMessageResponse> {
const fnTag = `${CrashRecoveryHandler.name}#recoverV2SuccessMessageImplementation`;
this.log.debug(`${fnTag} - Handling RecoverSuccessMessage:${req}`);
try {
return await this.serverService.handleRecoverSuccess(req);
} catch (error) {
this.log.error(`${fnTag} - Error:`, error);
throw error;
}
}

private async rollbackV2MessageImplementation(
req: RollbackMessage,
): Promise<RollbackAckMessage> {
const fnTag = `${CrashRecoveryHandler.name}#rollbackV2MessageImplementation`;
this.log.debug(`${fnTag} - Handling RollbackMessage: ${req}`);
try {
return await this.serverService.handleRollback(req);
} catch (error) {
this.log.error(`${fnTag} - Error:`, error);
throw error;
}
}

public setupRouter(router: ConnectRouter): void {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const that = this;
router.service(CrashRecovery, {
async recoverV2Message(req) {
return await that.recoverV2MessageImplementation(req);
},
async recoverV2SuccessMessage(req) {
return await that.recoverV2SuccessMessageImplementation(req);
},
async rollbackV2Message(req) {
return await that.rollbackV2MessageImplementation(req);
},
});

this.log.info("Router setup completed for CrashRecoveryHandler");
}

// Client-side

public async createRecoverMessage(
session: SATPSession,
): Promise<RecoverMessage> {
const fnTag = `${this.constructor.name}#createRecoverMessage`;
try {
return this.clientService.createRecoverMessage(session);
} catch (error) {
this.log.error(`${fnTag} - Failed to create RecoverMessage: ${error}`);
throw new Error(`Error in createRecoverMessage: ${error}`);
}
}

public async createRecoverSuccessMessage(
session: SATPSession,
): Promise<RecoverSuccessMessage> {
const fnTag = `${this.constructor.name}#createRecoverSuccessMessage`;
try {
return await this.clientService.createRecoverSuccessMessage(session);
} catch (error) {
this.log.error(
`${fnTag} - Failed to create RecoverSuccessMessage: ${error}`,
);
throw new Error(`Error in createRecoverSuccessMessage: ${error}`);
}
}

public async createRollbackMessage(
session: SATPSession,
rollbackState: RollbackState,
): Promise<RollbackMessage> {
const fnTag = `${this.constructor.name}#createRollbackMessage`;
try {
return await this.clientService.createRollbackMessage(
session,
rollbackState,
);
} catch (error) {
this.log.error(`${fnTag} - Failed to create RollbackMessage: ${error}`);
throw new Error(`Error in createRollbackMessage: ${error}`);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { Logger, LoggerProvider } from "@hyperledger/cactus-common";
import { SATPSession } from "../../satp-session";
import { Stage0RollbackStrategy } from "./stage0-rollback-strategy";
import { Stage1RollbackStrategy } from "./stage1-rollback-strategy";
import { Stage2RollbackStrategy } from "./stage2-rollback-strategy";
import { Stage3RollbackStrategy } from "./stage3-rollback-strategy";
import { SATPBridgesManager } from "../../../gol/satp-bridges-manager";
import { RollbackState } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { ILocalLogRepository } from "../../../repository/interfaces/repository";
//import { SATPLogger } from "../../../logging";

export interface RollbackStrategy {
execute(session: SATPSession): Promise<RollbackState>;
// todo do we want to return any information?
cleanup(session: SATPSession, state: RollbackState): Promise<RollbackState>;
}

export class RollbackStrategyFactory {
private log: Logger;
private bridgesManager: SATPBridgesManager;
private logRepository: ILocalLogRepository;
//private dbLogger: SATPLogger;

constructor(
bridgesManager: SATPBridgesManager,
localLog: ILocalLogRepository,
) {
this.log = LoggerProvider.getOrCreate({ label: "RollbackStrategyFactory" });
this.bridgesManager = bridgesManager;
this.logRepository = localLog;
//this.dbLogger = dbLogger;
}

createStrategy(session: SATPSession): RollbackStrategy {
const fnTag = "RollbackStrategyFactory#createStrategy";
const sessionData = session.hasClientSessionData()
? session.getClientSessionData()
: session.getServerSessionData();

if (!sessionData.hashes) {
this.log.debug(`${fnTag} Creating Stage0RollbackStrategy`);
return new Stage0RollbackStrategy(this.logRepository);
} else if (
!sessionData.hashes.stage2 ||
Object.keys(sessionData.hashes.stage2).length === 0
) {
this.log.debug(`${fnTag} Creating Stage1RollbackStrategy`);
return new Stage1RollbackStrategy(
this.bridgesManager,
this.logRepository,
);
} else if (
!sessionData.hashes.stage3 ||
Object.keys(sessionData.hashes.stage3).length === 0
) {
this.log.debug(`${fnTag} Creating Stage2RollbackStrategy`);
return new Stage2RollbackStrategy(
this.bridgesManager,
this.logRepository,
);
} else {
this.log.debug(`${fnTag} Creating Stage3RollbackStrategy`);
return new Stage3RollbackStrategy(
this.bridgesManager,
this.logRepository,
);
}
}
}
Loading

0 comments on commit d6ffbca

Please sign in to comment.