Skip to content

Commit

Permalink
0.6.1 -- composite keys
Browse files Browse the repository at this point in the history
  • Loading branch information
pilsy committed Sep 12, 2024
1 parent 8f2413c commit b2a10ac
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 8 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "chrono-forge",
"version": "0.6.0",
"version": "0.6.1",
"description": "A comprehensive framework for building resilient Temporal workflows, advanced state management, and real-time streaming activities in TypeScript. Designed for a seamless developer experience with powerful abstractions, dynamic orchestration, and full control over distributed systems.",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
2 changes: 2 additions & 0 deletions src/activities/StreamingChatActivity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ describe('StreamingChatActivity', () => {
await worker?.shutdown();
await exporter.forceFlush();
workflowCoverage.mergeIntoGlobalCoverage();
await testEnv.cancel();
jest.clearAllTimers();
}, 20000);

async function runActivityWithMessage(messageSent: string, expectedMessage: string) {
Expand Down
63 changes: 62 additions & 1 deletion src/tests/StatefulWorkflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import { OpenTelemetryActivityInboundInterceptor, makeWorkflowExporter } from '@
import { normalizeEntities } from '../utils/entities';
import { getExternalWorkflowHandle } from '@temporalio/workflow';
import { cloneDeep } from 'lodash';
import { get, set } from 'dottie';
import dottie, { get, set } from 'dottie';
import { getExporter, getResource, getTracer } from '../utils/instrumentation';
import { Photo } from './testSchemas';
import { getCompositeKey } from './../utils';

const workflowCoverage = new WorkflowCoverage();
const tracer = getTracer('temporal_worker');
Expand Down Expand Up @@ -125,6 +126,66 @@ describe('StatefulWorkflow', () => {
await handle.cancel();
});

describe('getCompositeKey', () => {
it('should generate a composite key from single-level attributes', () => {
const entity = {
id: '123',
type: 'abc'
};
const idAttributes = ['id', 'type'];

const compositeKey = getCompositeKey(entity, idAttributes);
expect(compositeKey).toBe('123-abc');
});

it('should generate a composite key from nested attributes', () => {
const entity = {
user: {
id: '123',
type: 'abc'
}
};
const idAttributes = ['user.id', 'user.type'];

const compositeKey = getCompositeKey(entity, idAttributes);
expect(compositeKey).toBe('123-abc');
});

it('should generate a composite key from mixed attributes', () => {
const entity = {
id: '123',
details: {
type: 'abc'
}
};
const idAttributes = ['id', 'details.type'];

const compositeKey = getCompositeKey(entity, idAttributes);
expect(compositeKey).toBe('123-abc');
});

it('should handle missing attributes gracefully', () => {
const entity = {
id: '123'
};
const idAttributes = ['id', 'type'];

const compositeKey = getCompositeKey(entity, idAttributes);
expect(compositeKey).toBe('123-');
});

it('should handle empty idAttributes array', () => {
const entity = {
id: '123',
type: 'abc'
};
const idAttributes: string[] = [];

const compositeKey = getCompositeKey(entity, idAttributes);
expect(compositeKey).toBe('');
});
});

it.skip('Should update state and child workflow and maintain state in parent and child correctly', async () => {
const data = { id: uuid4(), listings: [{ id: uuid4(), name: 'Awesome test listing' }] };
const handle = await execute(workflows.ShouldExecuteStateful, { id: data.id, entityName: 'User', data });
Expand Down
5 changes: 5 additions & 0 deletions src/utils/getCompositeKey.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import dottie from 'dottie';

export function getCompositeKey(entity: Record<string, any>, idAttributes: string[]): string {
return idAttributes.map((attr) => dottie.get(entity, attr)).join('-');
}
1 change: 1 addition & 0 deletions src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './entities';
export * from './limitRecursion';
export * from './startChildPayload';
export * from './getCompositeKey';
20 changes: 14 additions & 6 deletions src/workflows/StatefulWorkflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import { Workflow, ChronoFlowOptions } from './Workflow';
import { Signal, Query, Hook, Before, After, Property, Condition, Step, ContinueAsNew } from '../decorators';
import { SchemaManager } from '../SchemaManager';
import { limitRecursion } from '../utils/limitRecursion';
import { getCompositeKey } from '../utils/getCompositeKey';

export type ManagedPath = {
entityName?: string;
path?: string;
workflowType?: string;
idAttribute?: string;
idAttribute?: string | string[];
autoStartChildren?: boolean;
cancellationType?: workflow.ChildWorkflowCancellationType;
parentClosePolicy?: workflow.ParentClosePolicy;
Expand Down Expand Up @@ -525,7 +526,10 @@ export abstract class StatefulWorkflow extends Workflow {
}

for (const currentItem of currentItems) {
const itemId = currentItem[config.idAttribute as string];
const itemId = Array.isArray(config.idAttribute)
? getCompositeKey(currentItem, config.idAttribute)
: currentItem[config.idAttribute as string];

if (itemId && get(differences, `deleted.${config.entityName as string}.${itemId}`)) {
await this.processDeletion(itemId, config);
}
Expand All @@ -541,7 +545,8 @@ export abstract class StatefulWorkflow extends Workflow {
): Promise<void> {
this.log.debug(`[StatefulWorkflow]:${this.constructor.name}:processSingleItem`);

const itemId = item; // item is already the itemId
const itemId = Array.isArray(config.idAttribute) ? getCompositeKey(newState[config.entityName as string][item], config.idAttribute) : item; // just assign item if idAttribute is not an array

const existingHandle = this.handles[`${config.entityName}-${itemId}`];
const previousItem = get(previousState, `${config.entityName}.${itemId}`, {});
const newItem = get(newState, `${config.entityName}.${itemId}`, {});
Expand Down Expand Up @@ -576,8 +581,9 @@ export abstract class StatefulWorkflow extends Workflow {
}

const entitySchema = SchemaManager.getInstance().getSchema(entityName as string);
const { [idAttribute as string]: id } = state;
const workflowId = `${entityName}-${id}`;
const { [idAttribute as string]: id, ...rest } = state;
const compositeId = Array.isArray(idAttribute) ? getCompositeKey(state, idAttribute) : id;
const workflowId = `${entityName}-${compositeId}`;
const rawData = limitRecursion(denormalize(state, entitySchema, newState), entitySchema);

if (this.ancestorWorkflowIds.includes(workflowId)) {
Expand Down Expand Up @@ -656,7 +662,9 @@ export abstract class StatefulWorkflow extends Workflow {

const entitySchema = SchemaManager.getInstance().getSchema(entityName as string);
const { [idAttribute as string]: id } = state;
const workflowId = `${entityName}-${id}`;
const compositeId = Array.isArray(config.idAttribute) ? getCompositeKey(state, config.idAttribute) : state[config.idAttribute as string];
const workflowId = `${entityName}-${compositeId}`;

const rawData = limitRecursion(denormalize(state, entitySchema, newState), entitySchema);

if (this.ancestorWorkflowIds.includes(workflowId)) {
Expand Down

0 comments on commit b2a10ac

Please sign in to comment.