feat(satp-hermes): add crash recovery & rollback protocol
1. Implemented recovery & rollback using RPC-based message handlers.
2. Added rollback strategies for all SATP stages.
3. Integrated database log management for recovery and rollback.
4. Added cron jobs for scheduled crash detection and recovery initiation.

Co-authored-by: Rafael Belchior <[email protected]>
Co-authored-by: Carlos Amaro <[email protected]>
Signed-off-by: Yogesh01000100 <[email protected]>

chore(satp-hermes): improve DB management

Signed-off-by: Rafael Belchior <[email protected]>

chore(satp-hermes): crash recovery architecture

Signed-off-by: Rafael Belchior <[email protected]>

fix(recovery): enhance crash recovery and rollback implementation

Signed-off-by: Yogesh01000100 <[email protected]>

refactor(recovery): consolidate logic and improve SATP message handling

Signed-off-by: Yogesh01000100 <[email protected]>

feat(recovery): add rollback implementations

Signed-off-by: Yogesh01000100 <[email protected]>

fix: correct return types and inits

Signed-off-by: Yogesh01000100 <[email protected]>

fix: add unit tests and resolve rollbackstate

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add function processing logs from g2

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add cron schedule for periodic crash checks

Signed-off-by: Yogesh01000100 <[email protected]>

fix: resolve rollback condition and add tests

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add orchestrator communication layer using connect-RPC

Signed-off-by: Yogesh01000100 <[email protected]>

feat: add rollback protocol rpc

Signed-off-by: Yogesh01000100 <[email protected]>

fix: handle server log synchronization

Signed-off-by: Yogesh01000100 <[email protected]>

fix: resolve gol errors, add unit tests

Signed-off-by: Yogesh01000100 <[email protected]>

fix: handle server-side rollback

Signed-off-by: Yogesh01000100 <[email protected]>

fix: resolve networkId in rollback strategies

Signed-off-by: Yogesh01000100 <[email protected]>
Yogesh01000100 committed Dec 17, 2024
1 parent 15eee79 commit 503658c
Showing 38 changed files with 6,566 additions and 37 deletions.
42 changes: 36 additions & 6 deletions packages/cactus-plugin-satp-hermes/README.md
@@ -57,6 +57,17 @@ The sequence diagram of SATP is pictured below.

![satp-sequence-diagram](https://i.imgur.com/SOdXFEt.png)

### Crash Recovery Integration
The crash recovery protocol ensures session consistency across all stages of SATP. Each session's state, logs, hashes, timestamps, and signatures are stored and recovered using the following mechanisms:

1. **Session Logs**: A persistent log storage mechanism ensures crash-resilient state recovery.
2. **Stage Recovery**: Recovers interrupted sessions by validating logs, hashes, timestamps, and signatures to maintain protocol integrity.
3. **Consistency Checks**: Ensures all messages and actions are consistent across both gateways and the connected ledgers.
4. **Rollback Operations**: In the event of a timeout or irrecoverable failure, rollback messages ensure that the state changes made in the current stage are reverted.
5. **Logging & Proofs**: The SQLite3 database is leveraged for state consistency and proof accountability across gateways.

Refer to the [Crash Recovery Sequence](https://datatracker.ietf.org/doc/html/draft-belchior-satp-gateway-recovery) for more details.
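
The log validation described above can be illustrated with a minimal sketch. It is not the plugin's implementation: the `LogEntry` shape, the `hash` field, and the helper names are assumptions made for illustration, and signature checks are left out. It assumes each entry stores a SHA-256 hash of its payload and that recovery resumes from the last entry whose hash and timestamp order are still valid.

```typescript
import { createHash } from "node:crypto";

// Hypothetical log entry shape, not the plugin's actual log schema.
interface LogEntry {
  sessionId: string;
  sequenceNumber: number;
  timestamp: number; // milliseconds since epoch
  data: string;
  hash: string; // hex SHA-256 of `data`, stored when the entry was written
}

function sha256Hex(data: string): string {
  return createHash("sha256").update(data).digest("hex");
}

// Builds an entry and records the hash of its payload.
function makeEntry(
  sessionId: string,
  sequenceNumber: number,
  timestamp: number,
  data: string,
): LogEntry {
  return { sessionId, sequenceNumber, timestamp, data, hash: sha256Hex(data) };
}

// Walks the entries in sequence order and returns the last one before the
// first hash mismatch or timestamp regression (undefined if none is valid).
function lastValidEntry(entries: LogEntry[]): LogEntry | undefined {
  const sorted = [...entries].sort(
    (a, b) => a.sequenceNumber - b.sequenceNumber,
  );
  let last: LogEntry | undefined;
  for (const entry of sorted) {
    const hashOk = sha256Hex(entry.data) === entry.hash;
    const timeOk = last === undefined || entry.timestamp >= last.timestamp;
    if (!hashOk || !timeOk) break;
    last = entry;
  }
  return last;
}
```

A recovering gateway could use the returned entry's sequence number as the point from which to request missing log entries from its counterparty.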

### Application-to-Gateway API (API Type 1)
We

@@ -76,17 +87,28 @@ There are Client and Server Endpoints for each type of message detailed in the S
- CommitFinalV1Response
- TransferCompleteV1Request
- ClientV1Request
### Crash Recovery Endpoints
There are Client and Server gRPC Endpoints for the recovery and rollback messages:

The endpoints for the crash recovery procedure are also defined (the endpoint to receive the Rollback message is still missing):
- RecoverV1Message
- RecoverUpdateV1Message
- RecoverUpdateAckV1Message
- RecoverSuccessV1Message
- RollbackV1Message
- **Recovery Messages:**
- `RecoverV2Message`
- `RecoverV2SuccessMessage`
- `RecoverUpdateMessage`
- **Rollback Messages:**
- `RollbackV2Message`
- `RollbackAckMessage`
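
To make the request/response pairing concrete, here is a hedged sketch of the `RecoverV2SuccessMessage` exchange. The field names follow the `RecoverSuccessMessage` and `RecoverSuccessMessageResponse` messages in this commit; the `InMemoryRecoveryServer` class, the `string[]` type of `entriesChanged`, and the meaning given to `received` are assumptions for illustration. The sketch does not use the generated protobuf or connect-RPC code.

```typescript
// Field names follow the proto messages in this commit; the element type of
// entriesChanged is an assumption.
interface RecoverSuccessMessage {
  sessionId: string;
  messageType: string;
  hashRecoverUpdateMessage: string;
  success: boolean;
  entriesChanged: string[];
  senderSignature: string;
}

interface RecoverSuccessMessageResponse {
  sessionId: string;
  received: boolean;
  senderSignature: string;
}

// Hypothetical in-memory stand-in for the server side of the RPC.
class InMemoryRecoveryServer {
  private readonly knownSessions: Set<string>;
  private readonly recovered = new Set<string>();

  constructor(knownSessions: Iterable<string>) {
    this.knownSessions = new Set(knownSessions);
  }

  // Acknowledges a success message only for sessions this gateway knows.
  // Treating `received` as "accepted" is an assumption of this sketch.
  recoverV2SuccessMessage(
    req: RecoverSuccessMessage,
  ): RecoverSuccessMessageResponse {
    const accepted = req.success && this.knownSessions.has(req.sessionId);
    if (accepted) this.recovered.add(req.sessionId);
    // A real gateway would sign the response; left empty here.
    return { sessionId: req.sessionId, received: accepted, senderSignature: "" };
  }

  isRecovered(sessionId: string): boolean {
    return this.recovered.has(sessionId);
  }
}
```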

## Use case
Alice and Bob, in blockchains A and B, respectively, want to make a transfer of an asset from one to the other. Gateway A represents the gateway connected to Alice's blockchain. Gateway B represents the gateway connected to Bob's blockchain. Alice and Bob will run SATP, which will execute the transfer of the asset from blockchain A to blockchain B. The above endpoints will be called in sequence. Notice that the asset will first be locked on blockchain A and a proof is sent to the server-side. Afterward, the asset on the original blockchain is extinguished, followed by its regeneration on blockchain B.

### Role of Crash Recovery in SATP
In SATP, crash recovery ensures that asset transfers remain consistent and fault-tolerant across distributed ledgers. Key features include:
- **Session Recovery**: Gateways synchronize state using recovery messages, ensuring continuity after failures.
- **Rollback**: For irrecoverable errors, rollback procedures ensure safe reversion to previous states.
- **Fault Resilience**: Enables recovery from crashes while maintaining the integrity of ongoing transfers.

These features enhance reliability in scenarios where network or gateway disruptions occur during asset transfers.
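
As an illustration of how a periodic crash check might choose between recovery and rollback, the sketch below classifies sessions by how long they have been idle. The `SessionSnapshot` shape, the two timeout parameters, and the function names are hypothetical. In the plugin this kind of check would presumably run from a scheduled job, but the scheduling itself is not shown.

```typescript
type RecoveryAction = "none" | "recover" | "rollback";

// Hypothetical view of a session; only lastMessageReceivedTimestamp mirrors
// a field that appears in this commit's session data.
interface SessionSnapshot {
  sessionId: string;
  lastMessageReceivedTimestamp: number; // milliseconds since epoch
  completed: boolean;
}

// Idle past the crash timeout: attempt recovery. Idle past the (longer)
// rollback timeout: give up and roll back. Both thresholds are illustrative.
function classifySession(
  session: SessionSnapshot,
  nowMs: number,
  crashTimeoutMs: number,
  rollbackTimeoutMs: number,
): RecoveryAction {
  if (session.completed) return "none";
  const idleMs = nowMs - session.lastMessageReceivedTimestamp;
  if (idleMs >= rollbackTimeoutMs) return "rollback";
  if (idleMs >= crashTimeoutMs) return "recover";
  return "none";
}

// Maps every session ID to the action a periodic check would take.
function planRecovery(
  sessions: SessionSnapshot[],
  nowMs: number,
  crashTimeoutMs: number,
  rollbackTimeoutMs: number,
): Map<string, RecoveryAction> {
  const plan = new Map<string, RecoveryAction>();
  for (const s of sessions) {
    plan.set(
      s.sessionId,
      classifySession(s, nowMs, crashTimeoutMs, rollbackTimeoutMs),
    );
  }
  return plan;
}
```

Keeping the decision in a pure function makes it easy to unit-test independently of whatever scheduler triggers it.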

## Running the tests

[A test of the entire protocol with manual calls to the methods, i.e. without ledger connectors and Open API.](https://github.com/hyperledger/cactus/blob/2e94ef8d3b34449c7b4d48e37d81245851477a3e/packages/cactus-plugin-satp-hermes/src/test/typescript/integration/satp.test.ts)
@@ -109,6 +131,14 @@ Alice and Bob, in blockchains A and B, respectively, want to make a transfer of

[A test with a backup gateway resuming the protocol after the client gateway crashed.](https://github.com/hyperledger/cactus/tree/main/packages/cactus-plugin-satp-hermes/src/test/typescript/integration/backup-gateway-after-client-crash.test.ts)


### Crash Recovery Tests
- [Stage 1 Recovery Test](src/test/typescript/integration/recovery/recovery-stage-1.test.ts)
- [Stage 2 Recovery Test](src/test/typescript/integration/recovery/recovery-stage-2.test.ts)
- [Stage 3 Recovery Test](src/test/typescript/integration/recovery/recovery-stage-3.test.ts)
- [Rollback Test](src/test/typescript/integration/rollback/rollback.test.ts)
- [Rollback Timeout Test](src/test/typescript/integration/rollback/rollback-timeout.test.ts)

For developers that want to test separate steps/phases of the SATP protocol, please refer to [these](https://github.com/hyperledger/cactus/blob/2e94ef8d3b34449c7b4d48e37d81245851477a3e/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/) test files (client and server side along with the recovery procedure).

## Usage
2 changes: 2 additions & 0 deletions packages/cactus-plugin-satp-hermes/package.json
@@ -136,6 +136,7 @@
"jsonc": "2.0.0",
"knex": "2.4.0",
"kubo-rpc-client": "3.0.1",
"node-schedule": "2.1.1",
"npm-run-all": "4.1.5",
"openzeppelin-solidity": "3.4.2",
"pg": "8.13.1",
@@ -163,6 +164,7 @@
"@types/fs-extra": "11.0.4",
"@types/google-protobuf": "3.15.12",
"@types/node": "18.18.2",
"@types/node-schedule": "2.1.7",
"@types/pg": "8.11.10",
"@types/swagger-ui-express": "4.1.6",
"@types/tape": "4.13.4",
@@ -6,11 +6,11 @@ import { Knex } from "knex";
const envPath = process.env.ENV_PATH;
dotenv.config({ path: envPath });

const config: { [key: string]: Knex.Config } = {
development: {
export const knexRemoteInstance: { [key: string]: Knex.Config } = {
default: {
client: "sqlite3",
connection: {
filename: path.resolve(__dirname, ".dev.remote-" + uuidv4() + ".sqlite3"),
filename: path.resolve(__dirname, `.dev.remote-${uuidv4()}.sqlite3`),
},
migrations: {
directory: path.resolve(__dirname, "migrations"),
@@ -31,5 +31,3 @@ const config: { [key: string]: Knex.Config } = {
},
},
};

export default config;
6 changes: 2 additions & 4 deletions packages/cactus-plugin-satp-hermes/src/knex/knexfile.ts
@@ -6,8 +6,8 @@ import { Knex } from "knex";
const envPath = process.env.ENV_PATH;
dotenv.config({ path: envPath });

const config: { [key: string]: Knex.Config } = {
development: {
export const knexLocalInstance: { [key: string]: Knex.Config } = {
default: {
client: "sqlite3",
connection: {
filename: path.resolve(__dirname, `.dev.local-${uuidv4()}.sqlite3`),
@@ -34,5 +34,3 @@ const config: { [key: string]: Knex.Config } = {
},
},
};

export default config;
@@ -2,7 +2,7 @@ import { Knex } from "knex";

export function up(knex: Knex): Knex.SchemaBuilder {
return knex.schema.createTable("logs", (table) => {
table.string("sessionID").notNullable();
table.string("sessionId").notNullable();
table.string("type").notNullable();
table.string("key").notNullable().primary();
table.string("operation").notNullable();
@@ -9,7 +9,7 @@ service CrashRecovery {

// step RPCs
rpc RecoverV2Message(RecoverMessage) returns (RecoverUpdateMessage);
rpc RecoverV2SuccessMessage(RecoverSuccessMessage) returns (google.protobuf.Empty);
rpc RecoverV2SuccessMessage(RecoverSuccessMessage) returns (RecoverSuccessMessageResponse);
rpc RollbackV2Message(RollbackMessage) returns (RollbackAckMessage);
}

@@ -41,6 +41,12 @@ message RecoverSuccessMessage {
string sender_signature = 6;
}

message RecoverSuccessMessageResponse {
string session_id = 1;
bool received = 2;
string sender_signature = 3;
}

message RollbackMessage {
string session_id = 1;
string message_type = 2;
@@ -0,0 +1,141 @@
import {
  RecoverMessage,
  RecoverMessageSchema,
  RecoverSuccessMessage,
  RecoverSuccessMessageSchema,
  RollbackMessage,
  RollbackMessageSchema,
  RollbackState,
} from "../../../typescript/generated/proto/cacti/satp/v02/crash_recovery_pb";
import { JsObjectSigner, Logger } from "@hyperledger/cactus-common";
import { SATPSession } from "../satp-session";
import { create } from "@bufbuild/protobuf";
import { SATPLogger } from "../../logging";
import { stringify as safeStableStringify } from "safe-stable-stringify";
import { bufArray2HexStr, sign } from "../../gateway-utils";

export class CrashRecoveryClientService {
  constructor(
    private readonly dbLogger: SATPLogger,
    private readonly log: Logger,
    private readonly signer: JsObjectSigner,
  ) {
    this.log = log;
    this.log.trace(`Initialized ${CrashRecoveryClientService.name}`);
  }

  public async createRecoverMessage(
    session: SATPSession,
  ): Promise<RecoverMessage> {
    const fnTag = `${CrashRecoveryClientService.name}#createRecoverMessage`;
    this.log.debug(
      `${fnTag} - Creating RecoverMessage for sessionId: ${session.getSessionId()}`,
    );

    const sessionData = session.getClientSessionData();

    const recoverMessage = create(RecoverMessageSchema, {
      sessionId: session.getSessionId(),
      messageType: "urn:ietf:SATP-2pc:msgtype:recover-msg",
      satpPhase: "",
      sequenceNumber: Number(sessionData.lastSequenceNumber),
      isBackup: false,
      newIdentityPublicKey: "",
      lastEntryTimestamp: BigInt(sessionData.lastMessageReceivedTimestamp),
      senderSignature: "",
    });

    const signature = bufArray2HexStr(
      sign(this.signer, safeStableStringify(recoverMessage)),
    );

    recoverMessage.senderSignature = signature;

    await this.dbLogger.persistLogEntry({
      sessionID: recoverMessage.sessionId,
      type: "urn:ietf:SATP-2pc:msgtype:recover-msg",
      operation: "done",
      data: safeStableStringify(sessionData),
      sequenceNumber: Number(sessionData.lastSequenceNumber),
    });
    this.log.debug(`${fnTag} - RecoverMessage created:`, recoverMessage);

    return recoverMessage;
  }

  public async createRecoverSuccessMessage(
    session: SATPSession,
  ): Promise<RecoverSuccessMessage> {
    const fnTag = `${CrashRecoveryClientService.name}#createRecoverSuccessMessage`;
    this.log.debug(
      `${fnTag} - Creating RecoverSuccessMessage for sessionId: ${session.getSessionId()}`,
    );
    const sessionData = session.getClientSessionData();
    const recoverSuccessMessage = create(RecoverSuccessMessageSchema, {
      sessionId: session.getSessionId(),
      messageType: "urn:ietf:SATP-2pc:msgtype:recover-success-msg",
      hashRecoverUpdateMessage: "",
      success: true,
      entriesChanged: [],
      senderSignature: "",
    });

    const signature = bufArray2HexStr(
      sign(this.signer, safeStableStringify(recoverSuccessMessage)),
    );

    recoverSuccessMessage.senderSignature = signature;

    await this.dbLogger.persistLogEntry({
      sessionID: recoverSuccessMessage.sessionId,
      type: "urn:ietf:SATP-2pc:msgtype:recover-success-msg",
      operation: "done",
      data: safeStableStringify(sessionData),
      sequenceNumber: Number(sessionData.lastSequenceNumber),
    });
    this.log.debug(
      `${fnTag} - RecoverSuccessMessage created:`,
      recoverSuccessMessage,
    );

    return recoverSuccessMessage;
  }

  public async createRollbackMessage(
    session: SATPSession,
    rollbackState: RollbackState,
  ): Promise<RollbackMessage> {
    const fnTag = `${CrashRecoveryClientService.name}#createRollbackMessage`;
    this.log.debug(
      `${fnTag} - Creating RollbackMessage for sessionId: ${session.getSessionId()}`,
    );
    const sessionData = session.getClientSessionData();
    const rollbackMessage = create(RollbackMessageSchema, {
      sessionId: session.getSessionId(),
      messageType: "urn:ietf:SATP-2pc:msgtype:rollback-msg",
      success: rollbackState.status === "COMPLETED",
      actionsPerformed: rollbackState.rollbackLogEntries.map(
        (entry) => entry.action,
      ),
      proofs: [],
      senderSignature: "",
    });

    const signature = bufArray2HexStr(
      sign(this.signer, safeStableStringify(rollbackMessage)),
    );

    rollbackMessage.senderSignature = signature;

    await this.dbLogger.persistLogEntry({
      sessionID: rollbackMessage.sessionId,
      type: "urn:ietf:SATP-2pc:msgtype:rollback-msg",
      operation: "done",
      data: safeStableStringify(sessionData),
      sequenceNumber: Number(sessionData.lastSequenceNumber),
    });
    this.log.debug(`${fnTag} - RollbackMessage created:`, rollbackMessage);

    return rollbackMessage;
  }
}
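
Each `create*Message` method above signs the message while `senderSignature` is still empty and then fills the field in. The sketch below shows how a receiver could check such a signature by blanking the field again before verifying. It is an illustration only: it uses Ed25519 from `node:crypto` rather than the plugin's `JsObjectSigner`, and `stableStringify`, `signMessage`, `verifyMessage`, and the `SignedMessage` shape are hypothetical stand-ins.

```typescript
import { generateKeyPairSync, sign, verify, KeyObject } from "node:crypto";

// Minimal message shape for this sketch; real messages carry more fields.
interface SignedMessage {
  sessionId: string;
  messageType: string;
  senderSignature: string;
}

// Stand-in for safe-stable-stringify: serializes a flat object with its
// keys sorted so both sides produce the same bytes.
function stableStringify(obj: object): string {
  return JSON.stringify(obj, Object.keys(obj).sort());
}

// Signs the message with senderSignature blanked, then stores the hex signature.
function signMessage<T extends SignedMessage>(msg: T, privateKey: KeyObject): T {
  const unsigned = { ...msg, senderSignature: "" };
  const signature = sign(
    null, // Ed25519 takes no separate digest algorithm
    Buffer.from(stableStringify(unsigned)),
    privateKey,
  );
  return { ...msg, senderSignature: signature.toString("hex") };
}

// Re-creates the signed payload by blanking senderSignature, then verifies.
function verifyMessage<T extends SignedMessage>(msg: T, publicKey: KeyObject): boolean {
  const unsigned = { ...msg, senderSignature: "" };
  return verify(
    null,
    Buffer.from(stableStringify(unsigned)),
    publicKey,
    Buffer.from(msg.senderSignature, "hex"),
  );
}

// Throwaway key pair for the example.
const { publicKey, privateKey } = generateKeyPairSync("ed25519");
```

The point of the pattern is that signer and verifier must serialize exactly the same bytes, which is why the signature field is reset and a stable key order is used on both sides.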