diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index 115cad4248..4bd66285bd 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -33,9 +33,19 @@ jobs: with: token: ${{ secrets.GITHUB_TOKEN }} + - name: Filter JS/TS Files + run: | + echo "${{ steps.files.outputs.added_modified }}" | tr ' ' '\n' | grep -E '\.(js|ts|jsx|tsx)$' > changed_files.txt + if [ ! -s changed_files.txt ]; then + echo "No JS/TS files to format or lint." + exit 0 + fi + - name: Run format Checks run: | - npx prettier ${{steps.files.outputs.added_modified}} --write + if [ -s changed_files.txt ]; then + npx prettier --write $(cat changed_files.txt) + fi - run: git diff --exit-code diff --git a/.gitignore b/.gitignore index 09c536ebb8..84421f49d9 100644 --- a/.gitignore +++ b/.gitignore @@ -122,6 +122,7 @@ dist # Stores VSCode versions used for testing VSCode extensions .vscode-test +.vscode # yarn v2 .yarn/cache @@ -133,9 +134,9 @@ dist # Others **/.DS_Store .dccache - +.python-version .idea # component test report test_reports/ -temp/ +temp/ \ No newline at end of file diff --git a/src/controllers/__tests__/source.test.ts b/src/controllers/__tests__/source.test.ts index 565f39d559..72bee83282 100644 --- a/src/controllers/__tests__/source.test.ts +++ b/src/controllers/__tests__/source.test.ts @@ -6,6 +6,7 @@ import { applicationRoutes } from '../../routes'; import { NativeIntegrationSourceService } from '../../services/source/nativeIntegration'; import { ServiceSelector } from '../../helpers/serviceSelector'; import { ControllerUtility } from '../util/index'; +import { SourceInputConversionResult } from '../../types'; let server: any; const OLD_ENV = process.env; @@ -38,6 +39,19 @@ const getData = () => { return [{ event: { a: 'b1' } }, { event: { a: 'b2' } }]; }; +const getV2Data = () => { + return [ + { request: { body: '{"a": "b"}' }, source: { id: 1 } }, + { request: { body: '{"a": "b"}' }, source: { id: 1 } }, + ]; +}; + +const getConvertedData = () => { + return getData().map((eventInstance) => { + return { output: eventInstance } as SourceInputConversionResult; + }); +}; + describe('Source controller tests', () => { describe('V0 Source transform tests', () => { test('successful source transform', async () => { @@ -49,7 +63,7 @@ describe('Source controller tests', () => { mockSourceService.sourceTransformRoutine = jest .fn() .mockImplementation((i, s, v, requestMetadata) => { - expect(i).toEqual(getData()); + expect(i).toEqual(getConvertedData()); expect(s).toEqual(sourceType); expect(v).toEqual(version); return testOutput; @@ -66,7 +80,7 @@ describe('Source controller tests', () => { expect(s).toEqual(sourceType); expect(v).toEqual(version); expect(e).toEqual(getData()); - return { implementationVersion: version, input: e }; + return { implementationVersion: version, input: getConvertedData() }; }); const response = await request(server) @@ -139,7 +153,7 @@ describe('Source controller tests', () => { mockSourceService.sourceTransformRoutine = jest .fn() .mockImplementation((i, s, v, requestMetadata) => { - expect(i).toEqual(getData()); + expect(i).toEqual(getConvertedData()); expect(s).toEqual(sourceType); expect(v).toEqual(version); return testOutput; @@ -156,7 +170,7 @@ describe('Source controller tests', () => { expect(s).toEqual(sourceType); expect(v).toEqual(version); expect(e).toEqual(getData()); - return { implementationVersion: version, input: e }; + return { implementationVersion: version, input: getConvertedData() }; }); const response = await request(server) @@ -217,4 +231,93 @@ describe('Source controller tests', () => { expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1); }); }); + + describe('V2 Source transform tests', () => { + test('successful source transform', async () => { + const sourceType = '__rudder_test__'; + const version = 'v2'; + const testOutput = [{ event: { a: 'b' }, source: { id: 'id' } }]; + + const mockSourceService = new NativeIntegrationSourceService(); + mockSourceService.sourceTransformRoutine = jest + .fn() + .mockImplementation((i, s, v, requestMetadata) => { + expect(i).toEqual(getConvertedData()); + expect(s).toEqual(sourceType); + expect(v).toEqual(version); + return testOutput; + }); + const getNativeSourceServiceSpy = jest + .spyOn(ServiceSelector, 'getNativeSourceService') + .mockImplementation(() => { + return mockSourceService; + }); + + const adaptInputToVersionSpy = jest + .spyOn(ControllerUtility, 'adaptInputToVersion') + .mockImplementation((s, v, e) => { + expect(s).toEqual(sourceType); + expect(v).toEqual(version); + expect(e).toEqual(getV2Data()); + return { implementationVersion: version, input: getConvertedData() }; + }); + + const response = await request(server) + .post('/v2/sources/__rudder_test__') + .set('Accept', 'application/json') + .send(getV2Data()); + + expect(response.status).toEqual(200); + expect(response.body).toEqual(testOutput); + + expect(response.header['apiversion']).toEqual('2'); + + expect(getNativeSourceServiceSpy).toHaveBeenCalledTimes(1); + expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1); + expect(mockSourceService.sourceTransformRoutine).toHaveBeenCalledTimes(1); + }); + + test('failing source transform', async () => { + const sourceType = '__rudder_test__'; + const version = 'v2'; + const mockSourceService = new NativeIntegrationSourceService(); + const getNativeSourceServiceSpy = jest + .spyOn(ServiceSelector, 'getNativeSourceService') + .mockImplementation(() => { + return mockSourceService; + }); + + const adaptInputToVersionSpy = jest + .spyOn(ControllerUtility, 'adaptInputToVersion') + .mockImplementation((s, v, e) => { + expect(s).toEqual(sourceType); + expect(v).toEqual(version); + expect(e).toEqual(getV2Data()); + throw new Error('test error'); + }); + + const response = await request(server) + .post('/v2/sources/__rudder_test__') + .set('Accept', 'application/json') + .send(getV2Data()); + + const expectedResp = [ + { + error: 'test error', + statTags: { + errorCategory: 'transformation', + }, + statusCode: 500, + }, + ]; + + expect(response.status).toEqual(200); + expect(response.body).toEqual(expectedResp); + + expect(response.header['apiversion']).toEqual('2'); + + expect(getNativeSourceServiceSpy).toHaveBeenCalledTimes(1); + expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1); + }); + }); }); diff --git a/src/controllers/source.ts b/src/controllers/source.ts index 230636f193..8b6d2d70f8 100644 --- a/src/controllers/source.ts +++ b/src/controllers/source.ts @@ -12,6 +12,7 @@ export class SourceController { const events = ctx.request.body as object[]; const { version, source }: { version: string; source: string } = ctx.params; const integrationService = ServiceSelector.getNativeSourceService(); + try { const { implementationVersion, input } = ControllerUtility.adaptInputToVersion( source, diff --git a/src/controllers/util/conversionStrategies/abstractions.ts b/src/controllers/util/conversionStrategies/abstractions.ts new file mode 100644 index 0000000000..f25bc374a2 --- /dev/null +++ b/src/controllers/util/conversionStrategies/abstractions.ts @@ -0,0 +1,5 @@ +import { SourceInputConversionResult } from '../../../types'; + +export abstract class VersionConversionStrategy { + abstract convert(sourceEvents: I[]): SourceInputConversionResult[]; +} diff --git a/src/controllers/util/conversionStrategies/strategyDefault.ts b/src/controllers/util/conversionStrategies/strategyDefault.ts new file mode 100644 index 0000000000..44b9fbf312 --- /dev/null +++ b/src/controllers/util/conversionStrategies/strategyDefault.ts @@ -0,0 +1,15 @@ +import { SourceInputConversionResult } from '../../../types'; +import { VersionConversionStrategy } from './abstractions'; + +export class StrategyDefault extends VersionConversionStrategy< + NonNullable, + NonNullable +> { + convert( + sourceEvents: NonNullable[], + ): SourceInputConversionResult>[] { + return sourceEvents.map((sourceEvent) => ({ + output: sourceEvent, + })); + } +} diff --git a/src/controllers/util/conversionStrategies/strategyV0ToV1.ts b/src/controllers/util/conversionStrategies/strategyV0ToV1.ts new file mode 100644 index 0000000000..28f170c4dd --- /dev/null +++ b/src/controllers/util/conversionStrategies/strategyV0ToV1.ts @@ -0,0 +1,11 @@ +import { SourceInput, SourceInputConversionResult } from '../../../types'; +import { VersionConversionStrategy } from './abstractions'; + +export class StrategyV0ToV1 extends VersionConversionStrategy, SourceInput> { + convert(sourceEvents: NonNullable[]): SourceInputConversionResult[] { + // This should be deprecated along with v0-webhook-rudder-server deprecation + return sourceEvents.map((sourceEvent) => ({ + output: { event: sourceEvent, source: undefined } as SourceInput, + })); + } +} diff --git a/src/controllers/util/conversionStrategies/strategyV1ToV0.ts b/src/controllers/util/conversionStrategies/strategyV1ToV0.ts new file mode 100644 index 0000000000..d0894099a5 --- /dev/null +++ b/src/controllers/util/conversionStrategies/strategyV1ToV0.ts @@ -0,0 +1,10 @@ +import { SourceInput, SourceInputConversionResult } from '../../../types'; +import { VersionConversionStrategy } from './abstractions'; + +export class StrategyV1ToV0 extends VersionConversionStrategy> { + convert(sourceEvents: SourceInput[]): SourceInputConversionResult>[] { + return sourceEvents.map((sourceEvent) => ({ + output: sourceEvent.event as NonNullable, + })); + } +} diff --git a/src/controllers/util/conversionStrategies/strategyV1ToV2.ts b/src/controllers/util/conversionStrategies/strategyV1ToV2.ts new file mode 100644 index 0000000000..7cf4e77808 --- /dev/null +++ b/src/controllers/util/conversionStrategies/strategyV1ToV2.ts @@ -0,0 +1,42 @@ +import { + SourceInput, + SourceInputConversionResult, + SourceInputV2, + SourceRequestV2, +} from '../../../types'; +import { VersionConversionStrategy } from './abstractions'; + +export class StrategyV1ToV2 extends VersionConversionStrategy { + convert(sourceEvents: SourceInput[]): SourceInputConversionResult[] { + return sourceEvents.map((sourceEvent) => { + try { + const sourceEventParam = { ...sourceEvent }; + + let queryParameters: Record | undefined; + if (sourceEventParam.event && sourceEventParam.event.query_parameters) { + queryParameters = sourceEventParam.event.query_parameters; + delete sourceEventParam.event.query_parameters; + } + + const sourceRequest: SourceRequestV2 = { + body: JSON.stringify(sourceEventParam.event), + }; + if (queryParameters) { + sourceRequest.query_parameters = queryParameters; + } + + const sourceInputV2: SourceInputV2 = { + request: sourceRequest, + source: sourceEventParam.source, + }; + return { + output: sourceInputV2, + }; + } catch (err) { + const conversionError = + err instanceof Error ? err : new Error('error converting v1 to v2 spec'); + return { conversionError }; + } + }); + } +} diff --git a/src/controllers/util/conversionStrategies/strategyV2ToV0.ts b/src/controllers/util/conversionStrategies/strategyV2ToV0.ts new file mode 100644 index 0000000000..1145cf9763 --- /dev/null +++ b/src/controllers/util/conversionStrategies/strategyV2ToV0.ts @@ -0,0 +1,17 @@ +import { SourceInputConversionResult, SourceInputV2 } from '../../../types'; +import { VersionConversionStrategy } from './abstractions'; + +export class StrategyV2ToV0 extends VersionConversionStrategy> { + convert(sourceEvents: SourceInputV2[]): SourceInputConversionResult>[] { + return sourceEvents.map((sourceEvent) => { + try { + const v0Event = JSON.parse(sourceEvent.request.body); + return { output: v0Event }; + } catch (err) { + const conversionError = + err instanceof Error ? err : new Error('error converting v2 to v0 spec'); + return { conversionError }; + } + }); + } +} diff --git a/src/controllers/util/conversionStrategies/strategyV2ToV1.ts b/src/controllers/util/conversionStrategies/strategyV2ToV1.ts new file mode 100644 index 0000000000..52cade0d9d --- /dev/null +++ b/src/controllers/util/conversionStrategies/strategyV2ToV1.ts @@ -0,0 +1,17 @@ +import { SourceInput, SourceInputConversionResult, SourceInputV2 } from '../../../types'; +import { VersionConversionStrategy } from './abstractions'; + +export class StrategyV2ToV1 extends VersionConversionStrategy { + convert(sourceEvents: SourceInputV2[]): SourceInputConversionResult[] { + return sourceEvents.map((sourceEvent) => { + try { + const v1Event = { event: JSON.parse(sourceEvent.request.body), source: sourceEvent.source }; + return { output: v1Event }; + } catch (err) { + const conversionError = + err instanceof Error ? err : new Error('error converting v2 to v1 spec'); + return { conversionError }; + } + }); + } +} diff --git a/src/controllers/util/index.test.ts b/src/controllers/util/index.test.ts index 6065920846..4559bccc52 100644 --- a/src/controllers/util/index.test.ts +++ b/src/controllers/util/index.test.ts @@ -19,9 +19,9 @@ describe('adaptInputToVersion', () => { const expected = { implementationVersion: undefined, input: [ - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, ], }; @@ -40,9 +40,9 @@ describe('adaptInputToVersion', () => { const expected = { implementationVersion: 'v0', input: [ - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, ], }; @@ -71,16 +71,22 @@ describe('adaptInputToVersion', () => { implementationVersion: 'v1', input: [ { - event: { key1: 'val1', key2: 'val2' }, - source: { id: 'source_id', config: { configField1: 'configVal1' } }, + output: { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, }, { - event: { key1: 'val1', key2: 'val2' }, - source: { id: 'source_id', config: { configField1: 'configVal1' } }, + output: { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, }, { - event: { key1: 'val1', key2: 'val2' }, - source: { id: 'source_id', config: { configField1: 'configVal1' } }, + output: { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, }, ], }; @@ -100,9 +106,9 @@ describe('adaptInputToVersion', () => { const expected = { implementationVersion: 'v1', input: [ - { event: { key1: 'val1', key2: 'val2' }, source: undefined }, - { event: { key1: 'val1', key2: 'val2' }, source: undefined }, - { event: { key1: 'val1', key2: 'val2' }, source: undefined }, + { output: { event: { key1: 'val1', key2: 'val2' }, source: undefined } }, + { output: { event: { key1: 'val1', key2: 'val2' }, source: undefined } }, + { output: { event: { key1: 'val1', key2: 'val2' }, source: undefined } }, ], }; @@ -131,9 +137,192 @@ describe('adaptInputToVersion', () => { const expected = { implementationVersion: 'v0', input: [ - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + + it('should convert input from v2 to v0 format when the request version is v2 and the implementation version is v0', () => { + const sourceType = 'pipedream'; + const requestVersion = 'v2'; + + const input = [ + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + const expected = { + implementationVersion: 'v0', + input: [ + { output: { key: 'value' } }, + { output: { key: 'value' } }, + { output: { key: 'value' } }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + + it('should fail trying to convert input from v2 to v0 format when the request version is v2 and the implementation version is v0', () => { + const sourceType = 'pipedream'; + const requestVersion = 'v2'; + + const input = [ + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + const expected = { + implementationVersion: 'v0', + input: [ + { + conversionError: new SyntaxError('Unexpected end of JSON input'), + }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + + it('should convert input from v2 to v1 format when the request version is v2 and the implementation version is v1', () => { + const sourceType = 'webhook'; + const requestVersion = 'v2'; + + const input = [ + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + const expected = { + implementationVersion: 'v1', + input: [ + { + output: { + event: { key: 'value' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + }, + { + output: { + event: { key: 'value' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + }, + { + output: { + event: { key: 'value' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + + it('should fail trying to convert input from v2 to v1 format when the request version is v2 and the implementation version is v1', () => { + const sourceType = 'webhook'; + const requestVersion = 'v2'; + + const input = [ + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + const expected = { + implementationVersion: 'v1', + input: [ + { + conversionError: new SyntaxError('Unexpected end of JSON input'), + }, ], }; @@ -153,6 +342,107 @@ describe('adaptInputToVersion', () => { expect(result).toEqual(expected); }); + + it('should convert input from v1 to v2 format when the request version is v1 and the implementation version is v2', () => { + const sourceType = 'someSourceType'; + const requestVersion = 'v1'; + + // Mock return value for getSourceVersionsMap + jest + .spyOn(ControllerUtility as any, 'getSourceVersionsMap') + .mockReturnValue(new Map([['someSourceType', 'v2']])); + + const input = [ + { + event: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + event: { key: 'value' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + event: {}, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + + const expected = { + implementationVersion: 'v2', + input: [ + { + output: { + request: { + body: '{"key":"value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + }, + { + output: { + request: { + body: '{"key":"value"}', + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + }, + { + output: { + request: { + body: '{}', + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + + it('should fail trying to convert input from v1 to v2 format when the request version is v1 and the implementation version is v2', () => { + const sourceType = 'someSourceType'; + const requestVersion = 'v1'; + + // Mock return value for getSourceVersionsMap + jest + .spyOn(ControllerUtility as any, 'getSourceVersionsMap') + .mockReturnValue(new Map([['someSourceType', 'v2']])); + + const input = [ + { + event: { + key: 'value', + query_parameters: { paramkey: ['paramvalue'] }, + largeNumber: BigInt(12345678901234567890n), + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + event: { key: 'value', largeNumber: BigInt(12345678901234567890n) }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + + const expected = { + implementationVersion: 'v2', + input: [ + { + conversionError: new TypeError('Do not know how to serialize a BigInt'), + }, + { + conversionError: new TypeError('Do not know how to serialize a BigInt'), + }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); }); type timestampTestCases = { diff --git a/src/controllers/util/index.ts b/src/controllers/util/index.ts index c5bf7ab358..ab2a0f5dc3 100644 --- a/src/controllers/util/index.ts +++ b/src/controllers/util/index.ts @@ -9,11 +9,12 @@ import { ProcessorTransformationRequest, RouterTransformationRequestData, RudderMessage, - SourceInput, + SourceInputConversionResult, } from '../../types'; import { getValueFromMessage } from '../../v0/util'; import genericFieldMap from '../../v0/util/data/GenericFieldMapping.json'; import { EventType, MappedToDestinationKey } from '../../constants'; +import { versionConversionFactory } from './versionConversion'; export class ControllerUtility { private static sourceVersionMap: Map = new Map(); @@ -45,30 +46,19 @@ export class ControllerUtility { return this.sourceVersionMap; } - private static convertSourceInputv1Tov0(sourceEvents: SourceInput[]): NonNullable[] { - return sourceEvents.map((sourceEvent) => sourceEvent.event); - } - - private static convertSourceInputv0Tov1(sourceEvents: unknown[]): SourceInput[] { - return sourceEvents.map( - (sourceEvent) => ({ event: sourceEvent, source: undefined }) as SourceInput, - ); - } - public static adaptInputToVersion( sourceType: string, requestVersion: string, input: NonNullable[], - ): { implementationVersion: string; input: NonNullable[] } { + ): { implementationVersion: string; input: SourceInputConversionResult>[] } { const sourceToVersionMap = this.getSourceVersionsMap(); const implementationVersion = sourceToVersionMap.get(sourceType); - let updatedInput: NonNullable[] = input; - if (requestVersion === 'v0' && implementationVersion === 'v1') { - updatedInput = this.convertSourceInputv0Tov1(input); - } else if (requestVersion === 'v1' && implementationVersion === 'v0') { - updatedInput = this.convertSourceInputv1Tov0(input as SourceInput[]); - } - return { implementationVersion, input: updatedInput }; + + const conversionStrategy = versionConversionFactory.getStrategy( + requestVersion, + implementationVersion, + ); + return { implementationVersion, input: conversionStrategy.convert(input) }; } private static getCompatibleStatusCode(status: number): number { diff --git a/src/controllers/util/versionConversion.ts b/src/controllers/util/versionConversion.ts new file mode 100644 index 0000000000..3058531f57 --- /dev/null +++ b/src/controllers/util/versionConversion.ts @@ -0,0 +1,65 @@ +import { VersionConversionStrategy } from './conversionStrategies/abstractions'; +import { StrategyDefault } from './conversionStrategies/strategyDefault'; +import { StrategyV0ToV1 } from './conversionStrategies/strategyV0ToV1'; +import { StrategyV1ToV0 } from './conversionStrategies/strategyV1ToV0'; +import { StrategyV1ToV2 } from './conversionStrategies/strategyV1ToV2'; +import { StrategyV2ToV0 } from './conversionStrategies/strategyV2ToV0'; +import { StrategyV2ToV1 } from './conversionStrategies/strategyV2ToV1'; + +export class VersionConversionFactory { + private strategyCache: Map> = new Map(); + + private getCase(requestVersion: string, implementationVersion: string) { + return `${String(requestVersion)}-to-${String(implementationVersion)}`; + } + + public getStrategy( + requestVersion: string, + implementationVersion: string, + ): VersionConversionStrategy { + const versionCase = this.getCase(requestVersion, implementationVersion); + + if (this.strategyCache.has(versionCase)) { + const cachedStrategy = this.strategyCache.get(versionCase); + if (cachedStrategy) { + return cachedStrategy; + } + } + + let strategy: VersionConversionStrategy; + + switch (versionCase) { + case 'v0-to-v1': + strategy = new StrategyV0ToV1(); + break; + + case 'v1-to-v0': + strategy = new StrategyV1ToV0(); + break; + + case 'v1-to-v2': + strategy = new StrategyV1ToV2(); + break; + + case 'v2-to-v0': + strategy = new StrategyV2ToV0(); + break; + + case 'v2-to-v1': + strategy = new StrategyV2ToV1(); + break; + + default: + strategy = new StrategyDefault(); + break; + } + + if (strategy) { + this.strategyCache[versionCase] = strategy; + } + + return strategy; + } +} + +export const versionConversionFactory = new VersionConversionFactory(); diff --git a/src/interfaces/SourceService.ts b/src/interfaces/SourceService.ts index c7de8cfe8b..32a7125e7a 100644 --- a/src/interfaces/SourceService.ts +++ b/src/interfaces/SourceService.ts @@ -1,10 +1,14 @@ -import { MetaTransferObject, SourceTransformationResponse } from '../types/index'; +import { + MetaTransferObject, + SourceInputConversionResult, + SourceTransformationResponse, +} from '../types/index'; export interface SourceService { getTags(): MetaTransferObject; sourceTransformRoutine( - sourceEvents: NonNullable[], + sourceEvents: SourceInputConversionResult>[], sourceType: string, version: string, requestMetadata: NonNullable, diff --git a/src/services/source/__tests__/nativeIntegration.test.ts b/src/services/source/__tests__/nativeIntegration.test.ts index 2ef8129cdc..51bb37f5f1 100644 --- a/src/services/source/__tests__/nativeIntegration.test.ts +++ b/src/services/source/__tests__/nativeIntegration.test.ts @@ -44,7 +44,15 @@ describe('NativeIntegration Source Service', () => { }); const service = new NativeIntegrationSourceService(); - const resp = await service.sourceTransformRoutine(events, sourceType, version, requestMetadata); + const adapterConvertedEvents = events.map((eventInstance) => { + return { output: eventInstance }; + }); + const resp = await service.sourceTransformRoutine( + adapterConvertedEvents, + sourceType, + version, + requestMetadata, + ); expect(resp).toEqual(tresponse); @@ -81,7 +89,15 @@ describe('NativeIntegration Source Service', () => { jest.spyOn(stats, 'increment').mockImplementation(() => {}); const service = new NativeIntegrationSourceService(); - const resp = await service.sourceTransformRoutine(events, sourceType, version, requestMetadata); + const adapterConvertedEvents = events.map((eventInstance) => { + return { output: eventInstance }; + }); + const resp = await service.sourceTransformRoutine( + adapterConvertedEvents, + sourceType, + version, + requestMetadata, + ); expect(resp).toEqual(tresponse); diff --git a/src/services/source/nativeIntegration.ts b/src/services/source/nativeIntegration.ts index 5c89de7b92..078716df96 100644 --- a/src/services/source/nativeIntegration.ts +++ b/src/services/source/nativeIntegration.ts @@ -4,6 +4,7 @@ import { ErrorDetailer, MetaTransferObject, RudderMessage, + SourceInputConversionResult, SourceTransformationEvent, SourceTransformationResponse, } from '../../types/index'; @@ -28,7 +29,7 @@ export class NativeIntegrationSourceService implements SourceService { } public async sourceTransformRoutine( - sourceEvents: NonNullable[], + sourceEvents: SourceInputConversionResult>[], sourceType: string, version: string, // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -39,12 +40,38 @@ export class NativeIntegrationSourceService implements SourceService { const respList: SourceTransformationResponse[] = await Promise.all( sourceEvents.map(async (sourceEvent) => { try { - const newSourceEvent = sourceEvent; - const { headers } = newSourceEvent; - delete newSourceEvent.headers; - const respEvents: RudderMessage | RudderMessage[] | SourceTransformationResponse = - await sourceHandler.process(newSourceEvent); - return SourcePostTransformationService.handleSuccessEventsSource(respEvents, { headers }); + if (sourceEvent.conversionError) { + stats.increment('source_transform_errors', { + source: sourceType, + version, + }); + logger.debug(`Error during source Transform: ${sourceEvent.conversionError}`, { + ...logger.getLogMetadata(metaTO.errorDetails), + }); + return SourcePostTransformationService.handleFailureEventsSource( + sourceEvent.conversionError, + metaTO, + ); + } + + if (sourceEvent.output) { + const newSourceEvent = sourceEvent.output; + + const { headers } = newSourceEvent; + if (headers) { + delete newSourceEvent.headers; + } + + const respEvents: RudderMessage | RudderMessage[] | SourceTransformationResponse = + await sourceHandler.process(newSourceEvent); + return SourcePostTransformationService.handleSuccessEventsSource(respEvents, { + headers, + }); + } + return SourcePostTransformationService.handleFailureEventsSource( + new Error('Error post version converstion, converstion output is undefined'), + metaTO, + ); } catch (error: FixMe) { stats.increment('source_transform_errors', { source: sourceType, diff --git a/src/types/index.ts b/src/types/index.ts index 45ec7445c3..7c07f659df 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -352,9 +352,32 @@ type Source = { }; type SourceInput = { - event: NonNullable[]; + event: { + query_parameters?: any; + [key: string]: any; + }; + source?: Source; +}; + +type SourceRequestV2 = { + method?: string; + url?: string; + proto?: string; + body: string; + headers?: Record; + query_parameters?: Record; +}; + +type SourceInputV2 = { + request: SourceRequestV2; source?: Source; }; + +type SourceInputConversionResult = { + output?: T; + conversionError?: Error; +}; + export { ComparatorInput, DeliveryJobState, @@ -382,7 +405,10 @@ export { UserDeletionRequest, UserDeletionResponse, SourceInput, + SourceInputV2, + SourceRequestV2, Source, + SourceInputConversionResult, UserTransformationLibrary, UserTransformationResponse, UserTransformationServiceResponse, diff --git a/test/apitests/service.api.test.ts b/test/apitests/service.api.test.ts index 9c1d96e7fe..2ad1f323ac 100644 --- a/test/apitests/service.api.test.ts +++ b/test/apitests/service.api.test.ts @@ -78,6 +78,13 @@ describe('features tests', () => { const supportTransformerProxyV1 = JSON.parse(response.text).supportTransformerProxyV1; expect(typeof supportTransformerProxyV1).toBe('boolean'); }); + + test('features upgradedToSourceTransformV2 to be boolean', async () => { + const response = await request(server).get('/features'); + expect(response.status).toEqual(200); + const upgradedToSourceTransformV2 = JSON.parse(response.text).upgradedToSourceTransformV2; + expect(typeof upgradedToSourceTransformV2).toBe('boolean'); + }); }); describe('Api tests with a mock source/destination', () => {