diff --git a/package.json b/package.json index 024e8a1..5b9679f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "chrono-forge", - "version": "0.6.0", + "version": "0.6.1", "description": "A comprehensive framework for building resilient Temporal workflows, advanced state management, and real-time streaming activities in TypeScript. Designed for a seamless developer experience with powerful abstractions, dynamic orchestration, and full control over distributed systems.", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/src/activities/StreamingChatActivity.test.ts b/src/activities/StreamingChatActivity.test.ts index e71e3ef..9779824 100644 --- a/src/activities/StreamingChatActivity.test.ts +++ b/src/activities/StreamingChatActivity.test.ts @@ -67,6 +67,8 @@ describe('StreamingChatActivity', () => { await worker?.shutdown(); await exporter.forceFlush(); workflowCoverage.mergeIntoGlobalCoverage(); + await testEnv.cancel(); + jest.clearAllTimers(); }, 20000); async function runActivityWithMessage(messageSent: string, expectedMessage: string) { diff --git a/src/tests/StatefulWorkflow.test.ts b/src/tests/StatefulWorkflow.test.ts index 7885ed1..4e5690d 100644 --- a/src/tests/StatefulWorkflow.test.ts +++ b/src/tests/StatefulWorkflow.test.ts @@ -16,9 +16,10 @@ import { OpenTelemetryActivityInboundInterceptor, makeWorkflowExporter } from '@ import { normalizeEntities } from '../utils/entities'; import { getExternalWorkflowHandle } from '@temporalio/workflow'; import { cloneDeep } from 'lodash'; -import { get, set } from 'dottie'; +import dottie, { get, set } from 'dottie'; import { getExporter, getResource, getTracer } from '../utils/instrumentation'; import { Photo } from './testSchemas'; +import { getCompositeKey } from './../utils'; const workflowCoverage = new WorkflowCoverage(); const tracer = getTracer('temporal_worker'); @@ -125,6 +126,66 @@ describe('StatefulWorkflow', () => { await handle.cancel(); }); + describe('getCompositeKey', () => { + it('should generate a composite key from single-level attributes', () => { + const entity = { + id: '123', + type: 'abc' + }; + const idAttributes = ['id', 'type']; + + const compositeKey = getCompositeKey(entity, idAttributes); + expect(compositeKey).toBe('123-abc'); + }); + + it('should generate a composite key from nested attributes', () => { + const entity = { + user: { + id: '123', + type: 'abc' + } + }; + const idAttributes = ['user.id', 'user.type']; + + const compositeKey = getCompositeKey(entity, idAttributes); + expect(compositeKey).toBe('123-abc'); + }); + + it('should generate a composite key from mixed attributes', () => { + const entity = { + id: '123', + details: { + type: 'abc' + } + }; + const idAttributes = ['id', 'details.type']; + + const compositeKey = getCompositeKey(entity, idAttributes); + expect(compositeKey).toBe('123-abc'); + }); + + it('should handle missing attributes gracefully', () => { + const entity = { + id: '123' + }; + const idAttributes = ['id', 'type']; + + const compositeKey = getCompositeKey(entity, idAttributes); + expect(compositeKey).toBe('123-'); + }); + + it('should handle empty idAttributes array', () => { + const entity = { + id: '123', + type: 'abc' + }; + const idAttributes: string[] = []; + + const compositeKey = getCompositeKey(entity, idAttributes); + expect(compositeKey).toBe(''); + }); + }); + it.skip('Should update state and child workflow and maintain state in parent and child correctly', async () => { const data = { id: uuid4(), listings: [{ id: uuid4(), name: 'Awesome test listing' }] }; const handle = await execute(workflows.ShouldExecuteStateful, { id: data.id, entityName: 'User', data }); diff --git a/src/utils/getCompositeKey.ts b/src/utils/getCompositeKey.ts new file mode 100644 index 0000000..eb469a1 --- /dev/null +++ b/src/utils/getCompositeKey.ts @@ -0,0 +1,5 @@ +import dottie from 'dottie'; + +export function getCompositeKey(entity: Record, idAttributes: string[]): string { + return idAttributes.map((attr) => dottie.get(entity, attr)).join('-'); +} diff --git a/src/utils/index.ts b/src/utils/index.ts index ae3a9f8..722b2fa 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -1,3 +1,4 @@ export * from './entities'; export * from './limitRecursion'; export * from './startChildPayload'; +export * from './getCompositeKey'; diff --git a/src/workflows/StatefulWorkflow.ts b/src/workflows/StatefulWorkflow.ts index 5f1f999..af4b225 100644 --- a/src/workflows/StatefulWorkflow.ts +++ b/src/workflows/StatefulWorkflow.ts @@ -21,12 +21,13 @@ import { Workflow, ChronoFlowOptions } from './Workflow'; import { Signal, Query, Hook, Before, After, Property, Condition, Step, ContinueAsNew } from '../decorators'; import { SchemaManager } from '../SchemaManager'; import { limitRecursion } from '../utils/limitRecursion'; +import { getCompositeKey } from '../utils/getCompositeKey'; export type ManagedPath = { entityName?: string; path?: string; workflowType?: string; - idAttribute?: string; + idAttribute?: string | string[]; autoStartChildren?: boolean; cancellationType?: workflow.ChildWorkflowCancellationType; parentClosePolicy?: workflow.ParentClosePolicy; @@ -525,7 +526,10 @@ export abstract class StatefulWorkflow extends Workflow { } for (const currentItem of currentItems) { - const itemId = currentItem[config.idAttribute as string]; + const itemId = Array.isArray(config.idAttribute) + ? getCompositeKey(currentItem, config.idAttribute) + : currentItem[config.idAttribute as string]; + if (itemId && get(differences, `deleted.${config.entityName as string}.${itemId}`)) { await this.processDeletion(itemId, config); } @@ -541,7 +545,8 @@ export abstract class StatefulWorkflow extends Workflow { ): Promise { this.log.debug(`[StatefulWorkflow]:${this.constructor.name}:processSingleItem`); - const itemId = item; // item is already the itemId + const itemId = Array.isArray(config.idAttribute) ? getCompositeKey(newState[config.entityName as string][item], config.idAttribute) : item; // just assign item if idAttribute is not an array + const existingHandle = this.handles[`${config.entityName}-${itemId}`]; const previousItem = get(previousState, `${config.entityName}.${itemId}`, {}); const newItem = get(newState, `${config.entityName}.${itemId}`, {}); @@ -576,8 +581,9 @@ export abstract class StatefulWorkflow extends Workflow { } const entitySchema = SchemaManager.getInstance().getSchema(entityName as string); - const { [idAttribute as string]: id } = state; - const workflowId = `${entityName}-${id}`; + const { [idAttribute as string]: id, ...rest } = state; + const compositeId = Array.isArray(idAttribute) ? getCompositeKey(state, idAttribute) : id; + const workflowId = `${entityName}-${compositeId}`; const rawData = limitRecursion(denormalize(state, entitySchema, newState), entitySchema); if (this.ancestorWorkflowIds.includes(workflowId)) { @@ -656,7 +662,9 @@ export abstract class StatefulWorkflow extends Workflow { const entitySchema = SchemaManager.getInstance().getSchema(entityName as string); const { [idAttribute as string]: id } = state; - const workflowId = `${entityName}-${id}`; + const compositeId = Array.isArray(config.idAttribute) ? getCompositeKey(state, config.idAttribute) : state[config.idAttribute as string]; + const workflowId = `${entityName}-${compositeId}`; + const rawData = limitRecursion(denormalize(state, entitySchema, newState), entitySchema); if (this.ancestorWorkflowIds.includes(workflowId)) {