Skip to content

Commit

Permalink
feat: sources v2 spec support along with adapters (#3810)
Browse files Browse the repository at this point in the history
  • Loading branch information
vinayteki95 authored Nov 13, 2024
2 parents 51bbc02 + e90d2ad commit c51cfbb
Show file tree
Hide file tree
Showing 19 changed files with 713 additions and 56 deletions.
12 changes: 11 additions & 1 deletion .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,19 @@ jobs:
with:
token: ${{ secrets.GITHUB_TOKEN }}

- name: Filter JS/TS Files
run: |
echo "${{ steps.files.outputs.added_modified }}" | tr ' ' '\n' | grep -E '\.(js|ts|jsx|tsx)$' > changed_files.txt
if [ ! -s changed_files.txt ]; then
echo "No JS/TS files to format or lint."
exit 0
fi
- name: Run format Checks
run: |
npx prettier ${{steps.files.outputs.added_modified}} --write
if [ -s changed_files.txt ]; then
npx prettier --write $(cat changed_files.txt)
fi
- run: git diff --exit-code

Expand Down
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ dist

# Stores VSCode versions used for testing VSCode extensions
.vscode-test
.vscode

# yarn v2
.yarn/cache
Expand All @@ -133,9 +134,9 @@ dist
# Others
**/.DS_Store
.dccache

.python-version
.idea

# component test report
test_reports/
temp/
temp/
111 changes: 107 additions & 4 deletions src/controllers/__tests__/source.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { applicationRoutes } from '../../routes';
import { NativeIntegrationSourceService } from '../../services/source/nativeIntegration';
import { ServiceSelector } from '../../helpers/serviceSelector';
import { ControllerUtility } from '../util/index';
import { SourceInputConversionResult } from '../../types';

let server: any;
const OLD_ENV = process.env;
Expand Down Expand Up @@ -38,6 +39,19 @@ const getData = () => {
return [{ event: { a: 'b1' } }, { event: { a: 'b2' } }];
};

const getV2Data = () => {
return [
{ request: { body: '{"a": "b"}' }, source: { id: 1 } },
{ request: { body: '{"a": "b"}' }, source: { id: 1 } },
];
};

const getConvertedData = () => {
return getData().map((eventInstance) => {
return { output: eventInstance } as SourceInputConversionResult<typeof eventInstance>;
});
};

describe('Source controller tests', () => {
describe('V0 Source transform tests', () => {
test('successful source transform', async () => {
Expand All @@ -49,7 +63,7 @@ describe('Source controller tests', () => {
mockSourceService.sourceTransformRoutine = jest
.fn()
.mockImplementation((i, s, v, requestMetadata) => {
expect(i).toEqual(getData());
expect(i).toEqual(getConvertedData());
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
return testOutput;
Expand All @@ -66,7 +80,7 @@ describe('Source controller tests', () => {
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
expect(e).toEqual(getData());
return { implementationVersion: version, input: e };
return { implementationVersion: version, input: getConvertedData() };
});

const response = await request(server)
Expand Down Expand Up @@ -139,7 +153,7 @@ describe('Source controller tests', () => {
mockSourceService.sourceTransformRoutine = jest
.fn()
.mockImplementation((i, s, v, requestMetadata) => {
expect(i).toEqual(getData());
expect(i).toEqual(getConvertedData());
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
return testOutput;
Expand All @@ -156,7 +170,7 @@ describe('Source controller tests', () => {
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
expect(e).toEqual(getData());
return { implementationVersion: version, input: e };
return { implementationVersion: version, input: getConvertedData() };
});

const response = await request(server)
Expand Down Expand Up @@ -217,4 +231,93 @@ describe('Source controller tests', () => {
expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1);
});
});

describe('V2 Source transform tests', () => {
test('successful source transform', async () => {
const sourceType = '__rudder_test__';
const version = 'v2';
const testOutput = [{ event: { a: 'b' }, source: { id: 'id' } }];

const mockSourceService = new NativeIntegrationSourceService();
mockSourceService.sourceTransformRoutine = jest
.fn()
.mockImplementation((i, s, v, requestMetadata) => {
expect(i).toEqual(getConvertedData());
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
return testOutput;
});
const getNativeSourceServiceSpy = jest
.spyOn(ServiceSelector, 'getNativeSourceService')
.mockImplementation(() => {
return mockSourceService;
});

const adaptInputToVersionSpy = jest
.spyOn(ControllerUtility, 'adaptInputToVersion')
.mockImplementation((s, v, e) => {
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
expect(e).toEqual(getV2Data());
return { implementationVersion: version, input: getConvertedData() };
});

const response = await request(server)
.post('/v2/sources/__rudder_test__')
.set('Accept', 'application/json')
.send(getV2Data());

expect(response.status).toEqual(200);
expect(response.body).toEqual(testOutput);

expect(response.header['apiversion']).toEqual('2');

expect(getNativeSourceServiceSpy).toHaveBeenCalledTimes(1);
expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1);
expect(mockSourceService.sourceTransformRoutine).toHaveBeenCalledTimes(1);
});

test('failing source transform', async () => {
const sourceType = '__rudder_test__';
const version = 'v2';
const mockSourceService = new NativeIntegrationSourceService();
const getNativeSourceServiceSpy = jest
.spyOn(ServiceSelector, 'getNativeSourceService')
.mockImplementation(() => {
return mockSourceService;
});

const adaptInputToVersionSpy = jest
.spyOn(ControllerUtility, 'adaptInputToVersion')
.mockImplementation((s, v, e) => {
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
expect(e).toEqual(getV2Data());
throw new Error('test error');
});

const response = await request(server)
.post('/v2/sources/__rudder_test__')
.set('Accept', 'application/json')
.send(getV2Data());

const expectedResp = [
{
error: 'test error',
statTags: {
errorCategory: 'transformation',
},
statusCode: 500,
},
];

expect(response.status).toEqual(200);
expect(response.body).toEqual(expectedResp);

expect(response.header['apiversion']).toEqual('2');

expect(getNativeSourceServiceSpy).toHaveBeenCalledTimes(1);
expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1);
});
});
});
1 change: 1 addition & 0 deletions src/controllers/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export class SourceController {
const events = ctx.request.body as object[];
const { version, source }: { version: string; source: string } = ctx.params;
const integrationService = ServiceSelector.getNativeSourceService();

try {
const { implementationVersion, input } = ControllerUtility.adaptInputToVersion(
source,
Expand Down
5 changes: 5 additions & 0 deletions src/controllers/util/conversionStrategies/abstractions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { SourceInputConversionResult } from '../../../types';

export abstract class VersionConversionStrategy<I, O> {
abstract convert(sourceEvents: I[]): SourceInputConversionResult<O>[];
}
15 changes: 15 additions & 0 deletions src/controllers/util/conversionStrategies/strategyDefault.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { SourceInputConversionResult } from '../../../types';
import { VersionConversionStrategy } from './abstractions';

export class StrategyDefault extends VersionConversionStrategy<
NonNullable<unknown>,
NonNullable<unknown>
> {
convert(
sourceEvents: NonNullable<unknown>[],
): SourceInputConversionResult<NonNullable<unknown>>[] {
return sourceEvents.map((sourceEvent) => ({
output: sourceEvent,
}));
}
}
11 changes: 11 additions & 0 deletions src/controllers/util/conversionStrategies/strategyV0ToV1.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { SourceInput, SourceInputConversionResult } from '../../../types';
import { VersionConversionStrategy } from './abstractions';

export class StrategyV0ToV1 extends VersionConversionStrategy<NonNullable<unknown>, SourceInput> {
convert(sourceEvents: NonNullable<unknown>[]): SourceInputConversionResult<SourceInput>[] {
// This should be deprecated along with v0-webhook-rudder-server deprecation
return sourceEvents.map((sourceEvent) => ({
output: { event: sourceEvent, source: undefined } as SourceInput,
}));
}
}
10 changes: 10 additions & 0 deletions src/controllers/util/conversionStrategies/strategyV1ToV0.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { SourceInput, SourceInputConversionResult } from '../../../types';
import { VersionConversionStrategy } from './abstractions';

export class StrategyV1ToV0 extends VersionConversionStrategy<SourceInput, NonNullable<unknown>> {
convert(sourceEvents: SourceInput[]): SourceInputConversionResult<NonNullable<unknown>>[] {
return sourceEvents.map((sourceEvent) => ({
output: sourceEvent.event as NonNullable<unknown>,
}));
}
}
42 changes: 42 additions & 0 deletions src/controllers/util/conversionStrategies/strategyV1ToV2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import {
SourceInput,
SourceInputConversionResult,
SourceInputV2,
SourceRequestV2,
} from '../../../types';
import { VersionConversionStrategy } from './abstractions';

export class StrategyV1ToV2 extends VersionConversionStrategy<SourceInput, SourceInputV2> {
convert(sourceEvents: SourceInput[]): SourceInputConversionResult<SourceInputV2>[] {
return sourceEvents.map((sourceEvent) => {
try {
const sourceEventParam = { ...sourceEvent };

let queryParameters: Record<string, unknown> | undefined;
if (sourceEventParam.event && sourceEventParam.event.query_parameters) {
queryParameters = sourceEventParam.event.query_parameters;
delete sourceEventParam.event.query_parameters;
}

const sourceRequest: SourceRequestV2 = {
body: JSON.stringify(sourceEventParam.event),
};
if (queryParameters) {
sourceRequest.query_parameters = queryParameters;
}

const sourceInputV2: SourceInputV2 = {
request: sourceRequest,
source: sourceEventParam.source,
};
return {
output: sourceInputV2,
};
} catch (err) {
const conversionError =
err instanceof Error ? err : new Error('error converting v1 to v2 spec');
return { conversionError };
}
});
}
}
17 changes: 17 additions & 0 deletions src/controllers/util/conversionStrategies/strategyV2ToV0.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { SourceInputConversionResult, SourceInputV2 } from '../../../types';
import { VersionConversionStrategy } from './abstractions';

export class StrategyV2ToV0 extends VersionConversionStrategy<SourceInputV2, NonNullable<unknown>> {
convert(sourceEvents: SourceInputV2[]): SourceInputConversionResult<NonNullable<unknown>>[] {
return sourceEvents.map((sourceEvent) => {
try {
const v0Event = JSON.parse(sourceEvent.request.body);
return { output: v0Event };
} catch (err) {
const conversionError =
err instanceof Error ? err : new Error('error converting v2 to v0 spec');
return { conversionError };
}
});
}
}
17 changes: 17 additions & 0 deletions src/controllers/util/conversionStrategies/strategyV2ToV1.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { SourceInput, SourceInputConversionResult, SourceInputV2 } from '../../../types';
import { VersionConversionStrategy } from './abstractions';

export class StrategyV2ToV1 extends VersionConversionStrategy<SourceInputV2, SourceInput> {
convert(sourceEvents: SourceInputV2[]): SourceInputConversionResult<SourceInput>[] {
return sourceEvents.map((sourceEvent) => {
try {
const v1Event = { event: JSON.parse(sourceEvent.request.body), source: sourceEvent.source };
return { output: v1Event };
} catch (err) {
const conversionError =
err instanceof Error ? err : new Error('error converting v2 to v1 spec');
return { conversionError };
}
});
}
}
Loading

0 comments on commit c51cfbb

Please sign in to comment.