Skip to content

Commit

Permalink
feat(satp-hermes): add crash recovery & rollback protocol
Browse files Browse the repository at this point in the history
Signed-off-by: Yogesh01000100 <[email protected]>

chore(satp-hermes): improve DB management

Signed-off-by: Rafael Belchior <[email protected]>

chore(satp-hermes): crash recovery architecture

Signed-off-by: Rafael Belchior <[email protected]>

fix(recovery): enhance crash recovery and rollback implementation

Signed-off-by: Yogesh01000100 <[email protected]>

refactor(recovery): consolidate logic and improve SATP message handling

Signed-off-by: Yogesh01000100 <[email protected]>

feat(recovery): add rollback implementations

Signed-off-by: Yogesh01000100 <[email protected]>

fix: correct return types and inits

Signed-off-by: Yogesh01000100 <[email protected]>
Co-authored-by: Rafael Belchior <[email protected]>

fix: add unit tests and resolve rollbackstate

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add function processing logs from g2

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add cron schedule for periodic crash checks

Signed-off-by: Yogesh01000100 <[email protected]>

fix: resolve rollback condition and add tests

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add orchestrator communication layer using connect-RPC

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add rollback protocol rpc

Signed-off-by: Yogesh01000100 <[email protected]>

fix: handle server log synchronization

Signed-off-by: Yogesh01000100 <[email protected]>

fix: resolve gol errors, add unit tests

Signed-off-by: Yogesh01000100 <[email protected]>

fix: handle server-side rollback

Signed-off-by: Yogesh01000100 <[email protected]>

fix: resolve networkId in rollback strategies

Signed-off-by: Yogesh01000100 <[email protected]>
  • Loading branch information
Yogesh01000100 committed Dec 13, 2024
1 parent 7f99648 commit d73a5eb
Show file tree
Hide file tree
Showing 32 changed files with 4,535 additions and 29 deletions.
2 changes: 2 additions & 0 deletions packages/cactus-plugin-satp-hermes/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
"jsonc": "2.0.0",
"knex": "2.4.0",
"kubo-rpc-client": "3.0.1",
"node-schedule": "2.1.1",
"npm-run-all": "4.1.5",
"openzeppelin-solidity": "3.4.2",
"pg": "8.13.1",
Expand Down Expand Up @@ -163,6 +164,7 @@
"@types/fs-extra": "11.0.4",
"@types/google-protobuf": "3.15.12",
"@types/node": "18.18.2",
"@types/node-schedule": "2.1.7",
"@types/pg": "8.11.10",
"@types/swagger-ui-express": "4.1.6",
"@types/tape": "4.13.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { Knex } from "knex";
const envPath = process.env.ENV_PATH;
dotenv.config({ path: envPath });

const config: { [key: string]: Knex.Config } = {
development: {
export const knexRemoteInstance: { [key: string]: Knex.Config } = {
default: {
client: "sqlite3",
connection: {
filename: path.resolve(__dirname, ".dev.remote-" + uuidv4() + ".sqlite3"),
Expand All @@ -31,5 +31,3 @@ const config: { [key: string]: Knex.Config } = {
},
},
};

export default config;
6 changes: 2 additions & 4 deletions packages/cactus-plugin-satp-hermes/src/knex/knexfile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { Knex } from "knex";
const envPath = process.env.ENV_PATH;
dotenv.config({ path: envPath });

const config: { [key: string]: Knex.Config } = {
development: {
export const knexLocalInstance: { [key: string]: Knex.Config } = {
default: {
client: "sqlite3",
connection: {
filename: path.resolve(__dirname, `.dev.local-${uuidv4()}.sqlite3`),
Expand All @@ -34,5 +34,3 @@ const config: { [key: string]: Knex.Config } = {
},
},
};

export default config;
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Knex } from "knex";

export function up(knex: Knex): Knex.SchemaBuilder {
return knex.schema.createTable("logs", (table) => {
table.string("sessionID").notNullable();
table.string("sessionId").notNullable();
table.string("type").notNullable();
table.string("key").notNullable().primary();
table.string("operation").notNullable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ service CrashRecovery {

// step RPCs
rpc RecoverV2Message(RecoverMessage) returns (RecoverUpdateMessage);
rpc RecoverV2SuccessMessage(RecoverSuccessMessage) returns (google.protobuf.Empty);
rpc RecoverV2SuccessMessage(RecoverSuccessMessage) returns (RecoverSuccessMessageResponse);
rpc RollbackV2Message(RollbackMessage) returns (RollbackAckMessage);
}

Expand Down Expand Up @@ -41,6 +41,12 @@ message RecoverSuccessMessage {
string sender_signature = 6;
}

message RecoverSuccessMessageResponse {
string session_id = 1;
bool received = 2;
string sender_signature = 3;
}

message RollbackMessage {
string session_id = 1;
string message_type = 2;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import {
RecoverMessage,
RecoverMessageSchema,
RecoverSuccessMessage,
RecoverSuccessMessageSchema,
RollbackMessage,
RollbackMessageSchema,
RollbackState,
} from "../../../typescript/generated/proto/cacti/satp/v02/crash_recovery_pb";
import { Logger } from "@hyperledger/cactus-common";
import { SATPSession } from "../satp-session";
import { create } from "@bufbuild/protobuf";
import { SATPLogger } from "../../logging";
import { stringify as safeStableStringify } from "safe-stable-stringify";

export class CrashRecoveryClientService {
private readonly log: Logger;

constructor(
private readonly dbLogger: SATPLogger,
log: Logger,
) {
this.log = log;
this.log.trace(`Initialized ${CrashRecoveryClientService.name}`);
}

public async createRecoverMessage(
session: SATPSession,
): Promise<RecoverMessage> {
const fnTag = `${CrashRecoveryClientService.name}#createRecoverMessage`;
this.log.debug(
`${fnTag} - Creating RecoverMessage for sessionId: ${session.getSessionId()}`,
);

const sessionData = session.getClientSessionData();

const recoverMessage = create(RecoverMessageSchema, {
sessionId: session.getSessionId(),
messageType: "urn:ietf:SATP-2pc:msgtype:recover-msg",
satpPhase: "",
sequenceNumber: Number(sessionData.lastSequenceNumber),
isBackup: false,
newIdentityPublicKey: "",
lastEntryTimestamp: BigInt(sessionData.lastMessageReceivedTimestamp),
senderSignature: "",
});

await this.dbLogger.persistLogEntry({
sessionID: recoverMessage.sessionId,
type: "urn:ietf:SATP-2pc:msgtype:recover-msg",
operation: "done",
data: safeStableStringify(sessionData),
sequenceNumber: Number(sessionData.lastSequenceNumber),
});
this.log.debug(`${fnTag} - RecoverMessage created:`, recoverMessage);

return recoverMessage;
}

public async createRecoverSuccessMessage(
session: SATPSession,
): Promise<RecoverSuccessMessage> {
const fnTag = `${CrashRecoveryClientService.name}#createRecoverSuccessMessage`;
this.log.debug(
`${fnTag} - Creating RecoverSuccessMessage for sessionId: ${session.getSessionId()}`,
);
const sessionData = session.getClientSessionData();
const recoverSuccessMessage = create(RecoverSuccessMessageSchema, {
sessionId: session.getSessionId(),
messageType: "urn:ietf:SATP-2pc:msgtype:recover-success-msg",
hashRecoverUpdateMessage: "",
success: true,
entriesChanged: [],
senderSignature: "",
});
await this.dbLogger.persistLogEntry({
sessionID: recoverSuccessMessage.sessionId,
type: "urn:ietf:SATP-2pc:msgtype:recover-success-msg",
operation: "done",
data: safeStableStringify(sessionData),
sequenceNumber: Number(sessionData.lastSequenceNumber),
});
this.log.debug(
`${fnTag} - RecoverSuccessMessage created:`,
recoverSuccessMessage,
);

return recoverSuccessMessage;
}

public async createRollbackMessage(
session: SATPSession,
rollbackState: RollbackState,
): Promise<RollbackMessage> {
const fnTag = `${CrashRecoveryClientService.name}#createRollbackMessage`;
this.log.debug(
`${fnTag} - Creating RollbackMessage for sessionId: ${session.getSessionId()}`,
);
const sessionData = session.getClientSessionData();
const rollbackMessage = create(RollbackMessageSchema, {
sessionId: session.getSessionId(),
messageType: "urn:ietf:SATP-2pc:msgtype:rollback-msg",
success: rollbackState.status === "completed",
actionsPerformed: [],
proofs: [],
senderSignature: "",
});
await this.dbLogger.persistLogEntry({
sessionID: rollbackMessage.sessionId,
type: "urn:ietf:SATP-2pc:msgtype:rollback-msg",
operation: "done",
data: safeStableStringify(sessionData),
sequenceNumber: Number(sessionData.lastSequenceNumber),
});
this.log.debug(`${fnTag} - RollbackMessage created:`, rollbackMessage);

return rollbackMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { ConnectRouter } from "@connectrpc/connect";
import { Logger } from "@hyperledger/cactus-common";
import {
CrashRecovery,
RecoverSuccessMessageResponse,
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { CrashRecoveryServerService } from "./server-service";
import { CrashRecoveryClientService } from "./client-service";
import { SATPSession } from "../satp-session";
import {
RecoverMessage,
RecoverUpdateMessage,
RecoverSuccessMessage,
RollbackMessage,
RollbackAckMessage,
RollbackState,
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { SATPHandler, SATPHandlerType } from "../../types/satp-protocol";

export class CrashRecoveryHandler implements SATPHandler {
private readonly log: Logger;

constructor(
private readonly serverService: CrashRecoveryServerService,
private readonly clientService: CrashRecoveryClientService,
log: Logger,
) {
this.log = log;
this.log.trace(`Initialized ${CrashRecoveryHandler.name}`);
}

public getHandlerIdentifier(): SATPHandlerType {
return SATPHandlerType.CRASH;
}

public getHandlerSessions(): string[] {
return [];
}

public getStage(): string {
return "crash";
}

// Server-side

private async recoverV2MessageImplementation(
req: RecoverMessage,
): Promise<RecoverUpdateMessage> {
const fnTag = `${CrashRecoveryHandler.name}#recoverV2MessageImplementation`;
this.log.debug(`${fnTag} - Handling RecoverMessage: ${req}`);
try {
return await this.serverService.handleRecover(req);
} catch (error) {
this.log.error(`${fnTag} - Error:`, error);
throw error;
}
}

private async recoverV2SuccessMessageImplementation(
req: RecoverSuccessMessage,
): Promise<RecoverSuccessMessageResponse> {
const fnTag = `${CrashRecoveryHandler.name}#recoverV2SuccessMessageImplementation`;
this.log.debug(`${fnTag} - Handling RecoverSuccessMessage:${req}`);
try {
return await this.serverService.handleRecoverSuccess(req);
} catch (error) {
this.log.error(`${fnTag} - Error:`, error);
throw error;
}
}

private async rollbackV2MessageImplementation(
req: RollbackMessage,
): Promise<RollbackAckMessage> {
const fnTag = `${CrashRecoveryHandler.name}#rollbackV2MessageImplementation`;
this.log.debug(`${fnTag} - Handling RollbackMessage: ${req}`);
try {
return await this.serverService.handleRollback(req);
} catch (error) {
this.log.error(`${fnTag} - Error:`, error);
throw error;
}
}

public setupRouter(router: ConnectRouter): void {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const that = this;
router.service(CrashRecovery, {
async recoverV2Message(req) {
return await that.recoverV2MessageImplementation(req);
},
async recoverV2SuccessMessage(req) {
return await that.recoverV2SuccessMessageImplementation(req);
},
async rollbackV2Message(req) {
return await that.rollbackV2MessageImplementation(req);
},
});

this.log.info("Router setup completed for CrashRecoveryHandler");
}

// Client-side

public async createRecoverMessage(
session: SATPSession,
): Promise<RecoverMessage> {
const fnTag = `${this.constructor.name}#createRecoverMessage`;
try {
return this.clientService.createRecoverMessage(session);
} catch (error) {
this.log.error(`${fnTag} - Failed to create RecoverMessage: ${error}`);
throw new Error(`Error in createRecoverMessage: ${error}`);
}
}

public async createRecoverSuccessMessage(
session: SATPSession,
): Promise<RecoverSuccessMessage> {
const fnTag = `${this.constructor.name}#createRecoverSuccessMessage`;
try {
return await this.clientService.createRecoverSuccessMessage(session);
} catch (error) {
this.log.error(
`${fnTag} - Failed to create RecoverSuccessMessage: ${error}`,
);
throw new Error(`Error in createRecoverSuccessMessage: ${error}`);
}
}

public async createRollbackMessage(
session: SATPSession,
rollbackState: RollbackState,
): Promise<RollbackMessage> {
const fnTag = `${this.constructor.name}#createRollbackMessage`;
try {
return await this.clientService.createRollbackMessage(
session,
rollbackState,
);
} catch (error) {
this.log.error(`${fnTag} - Failed to create RollbackMessage: ${error}`);
throw new Error(`Error in createRollbackMessage: ${error}`);
}
}
}
Loading

0 comments on commit d73a5eb

Please sign in to comment.