Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cancellation to RPC handlers #846

Draft
wants to merge 2 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 74 additions & 52 deletions src/git/http.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ContextCancellable } from '@matrixai/contexts';
import type {
CapabilityList,
Reference,
Expand Down Expand Up @@ -110,15 +111,18 @@ import * as utils from '../utils';
*
* `referenceList` is called for generating the `ref_list` stage.
*/
async function* advertiseRefGenerator({
efs,
dir,
gitDir,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
}): AsyncGenerator<Buffer, void, void> {
async function* advertiseRefGenerator(
{
efs,
dir,
gitDir,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
},
ctx: ContextCancellable,
): AsyncGenerator<Buffer, void, void> {
// Providing side-band-64, symref for the HEAD and agent name capabilities
const capabilityList = [
gitUtils.SIDE_BAND_64_CAPABILITY,
Expand All @@ -130,18 +134,21 @@ async function* advertiseRefGenerator({
}),
gitUtils.AGENT_CAPABILITY,
];
const objectGenerator = gitUtils.listReferencesGenerator({
efs,
dir,
gitDir,
});
const objectGenerator = gitUtils.listReferencesGenerator(
{
efs,
dir,
gitDir,
},
ctx,
);

// PKT-LINE("# service=$servicename" LF)
yield packetLineBuffer(gitUtils.REFERENCE_DISCOVERY_HEADER);
// "0000"
yield gitUtils.FLUSH_PACKET_BUFFER;
// Ref_list
yield* referenceListGenerator(objectGenerator, capabilityList);
yield* referenceListGenerator(objectGenerator, capabilityList, ctx);
// "0000"
yield gitUtils.FLUSH_PACKET_BUFFER;
}
Expand All @@ -165,6 +172,7 @@ async function* advertiseRefGenerator({
async function* referenceListGenerator(
objectGenerator: AsyncGenerator<[Reference, ObjectId], void, void>,
capabilities: CapabilityList,
ctx: ContextCancellable,
): AsyncGenerator<Buffer, void, void> {
// Cap-list = capability *(SP capability)
const capabilitiesListBuffer = Buffer.from(
Expand All @@ -175,6 +183,7 @@ async function* referenceListGenerator(
// *ref_record
let first = true;
for await (const [name, objectId] of objectGenerator) {
ctx.signal.throwIfAborted();
if (first) {
// PKT-LINE(obj-id SP name NUL cap_list LF)
yield packetLineBuffer(
Expand Down Expand Up @@ -341,34 +350,43 @@ async function parsePackRequest(
* It will respond with the `PKT-LINE(NAK_BUFFER)` and then the `packFile` data chunked into lines for the stream.
*
*/
async function* generatePackRequest({
efs,
dir,
gitDir,
body,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
body: Array<Buffer>;
}): AsyncGenerator<Buffer, void, void> {
const [wants, haves, _capabilities] = await parsePackRequest(body);
const objectIds = await gitUtils.listObjects({
efs: efs,
async function* generatePackRequest(
{
efs,
dir,
gitDir: gitDir,
wants,
haves,
});
gitDir,
body,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
body: Array<Buffer>;
},
ctx: ContextCancellable,
): AsyncGenerator<Buffer, void, void> {
const [wants, haves, _capabilities] = await parsePackRequest(body);
const objectIds = await gitUtils.listObjects(
{
efs: efs,
dir,
gitDir: gitDir,
wants,
haves,
},
// ctx,
);
// Reply that we have no common history and that we need to send everything
yield packetLineBuffer(gitUtils.NAK_BUFFER);
// Send everything over in pack format
yield* generatePackData({
efs: efs,
dir,
gitDir,
objectIds,
});
yield* generatePackData(
{
efs: efs,
dir,
gitDir,
objectIds,
},
ctx,
);
// Send dummy progress data
yield packetLineBuffer(
gitUtils.DUMMY_PROGRESS_BUFFER,
Expand All @@ -384,19 +402,22 @@ async function* generatePackRequest({
* The `packFile` is chunked into the `packetLineBuffer` with the size defined by `chunkSize`.
*
*/
async function* generatePackData({
efs,
dir,
gitDir,
objectIds,
chunkSize = gitUtils.PACK_CHUNK_SIZE,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
objectIds: Array<ObjectId>;
chunkSize?: number;
}): AsyncGenerator<Buffer, void, void> {
async function* generatePackData(
{
efs,
dir,
gitDir,
objectIds,
chunkSize = gitUtils.PACK_CHUNK_SIZE,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
objectIds: Array<ObjectId>;
chunkSize?: number;
},
ctx: ContextCancellable,
): AsyncGenerator<Buffer, void, void> {
let packFile: PackObjectsResult;
// In case of errors we don't want to throw them. This will result in the error being thrown into `isometric-git`
// when it consumes the response. It handles this by logging out the error which we don't want to happen.
Expand All @@ -423,6 +444,7 @@ async function* generatePackData({
// Streaming the packFile as chunks of the length specified by the `chunkSize`.
// Each line is formatted as a `PKT-LINE`
do {
ctx.signal.throwIfAborted();
const subBuffer = packFileBuffer.subarray(0, chunkSize);
packFileBuffer = packFileBuffer.subarray(chunkSize);
yield packetLineBuffer(subBuffer, gitUtils.CHANNEL_DATA);
Expand Down
11 changes: 8 additions & 3 deletions src/git/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import git from 'isomorphic-git';
import { requestTypes } from './types';
import * as utils from '../utils';
import * as validationErrors from '../validation/errors';
import {ContextCancellable} from "@matrixai/contexts";

// Constants
// Total number of bytes per pack line minus the 4 size bytes and 1 channel byte
Expand Down Expand Up @@ -75,7 +76,7 @@ async function* listReferencesGenerator({
efs: EncryptedFS;
dir: string;
gitDir: string;
}): AsyncGenerator<[Reference, ObjectId], void, void> {
}, ctx: ContextCancellable): AsyncGenerator<[Reference, ObjectId], void, void> {
const refs: Array<[string, Promise<string>]> = await git
.listBranches({
fs: efs,
Expand All @@ -84,6 +85,7 @@ async function* listReferencesGenerator({
})
.then((refs) => {
return refs.map((ref) => {
ctx.signal.throwIfAborted();
return [
`${REFERENCES_STRING}${ref}`,
git.resolveRef({ fs: efs, dir, gitdir: gitDir, ref: ref }),
Expand All @@ -99,6 +101,7 @@ async function* listReferencesGenerator({
});
yield [HEAD_REFERENCE, resolvedHead];
for (const [key, refP] of refs) {
ctx.signal.throwIfAborted();
yield [key, await refP];
}
}
Expand Down Expand Up @@ -155,14 +158,16 @@ async function listObjects({
gitDir: string;
wants: ObjectIdList;
haves: ObjectIdList;
}): Promise<ObjectIdList> {
}/*, ctx: ContextCancellable*/): Promise<ObjectIdList> {
// TODO: add support for ctx
const commits = new Set<string>();
const trees = new Set<string>();
const blobs = new Set<string>();
const tags = new Set<string>();
const havesSet: Set<string> = new Set(haves);

async function walk(objectId: ObjectId, type: ObjectType): Promise<void> {
// ctx.signal.throwIfAborted();
// If object was listed as a have then we don't need to walk over it
if (havesSet.has(objectId)) return;
switch (type) {
Copy link
Contributor Author

@aryanjassal aryanjassal Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is used in VaultInternal in garbageCollectGitObjects, which is called when initialising VaultInternal. As such, we don't have access to a context. Should I manually create a context and pass it in, or make context an optional parameter here?

@tegefaulkes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll need to pass the ctx to whatever needs it. For public methods in a class you should use the @timedCancellable decorator. Look at other cancellable methods for reference. For internal methods you can just pass along the ctx.

Expand Down Expand Up @@ -243,7 +248,7 @@ async function listObjectsAll({
}: {
fs: EncryptedFS;
gitDir: string;
}) {
}): Promise<Array<string>> {
const objectsDirPath = path.join(gitDir, objectsDirName);
const objectSet: Set<string> = new Set();
const objectDirs = await fs.promises.readdir(objectsDirPath);
Expand Down
9 changes: 6 additions & 3 deletions src/nodes/agent/handlers/VaultsGitInfoGet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ class VaultsGitInfoGet extends RawHandler<{
}> {
public handle = async (
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
_cancel,
_cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
_ctx: ContextTimed, // TODO: use
ctx: ContextTimed,
): Promise<[JSONObject, ReadableStream<Uint8Array>]> => {
const { db, vaultManager, acl } = this.container;
const [headerMessage, inputStream] = input;
Expand Down Expand Up @@ -91,7 +91,10 @@ class VaultsGitInfoGet extends RawHandler<{
let handleInfoRequestGen: AsyncGenerator<Buffer>;
const stream = new ReadableStream({
start: async () => {
handleInfoRequestGen = vaultManager.handleInfoRequest(data.vaultId);
handleInfoRequestGen = vaultManager.handleInfoRequest(
data.vaultId,
ctx,
);
},
pull: async (controller) => {
const result = await handleInfoRequestGen.next();
Expand Down
10 changes: 6 additions & 4 deletions src/nodes/agent/handlers/VaultsGitPackGet.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { DB } from '@matrixai/db';
import type { JSONObject, JSONRPCRequest } from '@matrixai/rpc';
import type { JSONObject, JSONRPCRequest, JSONValue } from '@matrixai/rpc';
import type { ContextTimed } from '@matrixai/contexts';
import type { VaultName } from '../../../vaults/types';
import type ACL from '../../../acl/ACL';
import type VaultManager from '../../../vaults/VaultManager';
Expand All @@ -22,8 +23,9 @@ class VaultsGitPackGet extends RawHandler<{
}> {
public handle = async (
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
_cancel,
meta,
_cancel: (reason: any) => void,
meta: Record<string, JSONValue>,
ctx: ContextTimed,
): Promise<[JSONObject, ReadableStream<Uint8Array>]> => {
const { vaultManager, acl, db } = this.container;
const [headerMessage, inputStream] = input;
Expand Down Expand Up @@ -77,7 +79,7 @@ class VaultsGitPackGet extends RawHandler<{
for await (const message of inputStream) {
body.push(Buffer.from(message));
}
packRequestGen = vaultManager.handlePackRequest(vaultId, body);
packRequestGen = vaultManager.handlePackRequest(vaultId, body, ctx);
},
pull: async (controller) => {
const next = await packRequestGen.next();
Expand Down
14 changes: 10 additions & 4 deletions src/nodes/agent/handlers/VaultsScan.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { DB } from '@matrixai/db';
import type { ContextTimed } from '@matrixai/contexts';
import type {
AgentRPCRequestParams,
AgentRPCResponseResult,
VaultsScanMessage,
} from '../types';
import type VaultManager from '../../../vaults/VaultManager';
import type { JSONValue } from '@matrixai/rpc';
import { ServerHandler } from '@matrixai/rpc';
import * as agentErrors from '../errors';
import * as agentUtils from '../utils';
Expand All @@ -22,11 +24,13 @@ class VaultsScan extends ServerHandler<
AgentRPCResponseResult<VaultsScanMessage>
> {
public handle = async function* (
input: AgentRPCRequestParams,
_cancel,
meta,
_input: AgentRPCRequestParams,
_cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncGenerator<AgentRPCResponseResult<VaultsScanMessage>> {
const { vaultManager, db } = this.container;
const { vaultManager, db }: { vaultManager: VaultManager; db: DB } =
this.container;
const requestingNodeId = agentUtils.nodeIdFromMeta(meta);
if (requestingNodeId == null) {
throw new agentErrors.ErrorAgentNodeIdMissing();
Expand All @@ -36,13 +40,15 @@ class VaultsScan extends ServerHandler<
> {
const listResponse = vaultManager.handleScanVaults(
requestingNodeId,
ctx,
tran,
);
for await (const {
vaultId,
vaultName,
vaultPermissions,
} of listResponse) {
ctx.signal.throwIfAborted();
yield {
vaultIdEncoded: vaultsUtils.encodeVaultId(vaultId),
vaultName,
Expand Down
Loading
Loading