From 3f2bed9b9201068eebe8c82517d47cf8d8c5d35b Mon Sep 17 00:00:00 2001 From: Utsab Chowdhury Date: Thu, 21 Nov 2024 23:23:58 +0530 Subject: [PATCH] chore: claen depricated files --- src/controllers/bulkUpload.ts | 166 ---- src/controllers/obs.delivery.js | 130 --- src/legacy/delivery.js | 27 - src/legacy/router.js | 1272 --------------------------- src/routes/bulkUpload.ts | 17 - src/routes/index.ts | 2 - test/__tests__/legacyRouter.test.ts | 113 --- 7 files changed, 1727 deletions(-) delete mode 100644 src/controllers/bulkUpload.ts delete mode 100644 src/controllers/obs.delivery.js delete mode 100644 src/legacy/delivery.js delete mode 100644 src/legacy/router.js delete mode 100644 src/routes/bulkUpload.ts delete mode 100644 test/__tests__/legacyRouter.test.ts diff --git a/src/controllers/bulkUpload.ts b/src/controllers/bulkUpload.ts deleted file mode 100644 index cb0bcfed3c..0000000000 --- a/src/controllers/bulkUpload.ts +++ /dev/null @@ -1,166 +0,0 @@ -/* eslint-disable global-require, import/no-dynamic-require, @typescript-eslint/no-unused-vars */ -import { client as errNotificationClient } from '../util/errorNotifier'; -import { - getDestFileUploadHandler, - getJobStatusHandler, - getPollStatusHandler, -} from '../util/fetchDestinationHandlers'; -import { CatchErr, ContextBodySimple } from '../util/types'; -import logger from '../logger'; -// TODO: To be refactored and redisgned - -const ERROR_MESSAGE_PROCESSOR_STRING = 'Error occurred while processing payload.'; - -const getCommonMetadata = (ctx) => - // TODO: Parse information such as - // cluster, namespace, etc information - // from the request - ({ - namespace: 'Unknown', - cluster: 'Unknown', - }); - -const getReqMetadata = (ctx) => { - try { - const reqBody = ctx.request.body; - return { destType: reqBody?.destType, importId: reqBody?.importId }; - } catch (error) { - // Do nothing - } - return {}; -}; - -export const fileUpload = async (ctx) => { - logger.debug('Native(Bulk-Upload): Request to transformer:: /fileUpload route', ctx.request.body); - const getReqMetadataFileUpload = () => { - try { - const reqBody = ctx.request.body; - return { destType: reqBody?.destType }; - } catch (error) { - // Do nothing - } - return {}; - }; - - const { destType }: ContextBodySimple = ctx.request.body; - const destFileUploadHandler = getDestFileUploadHandler('v0', destType.toLowerCase()); - - if (!destFileUploadHandler || !destFileUploadHandler.processFileData) { - ctx.status = 404; - ctx.body = `${destType} doesn't support bulk upload`; - return null; - } - let response; - try { - response = await destFileUploadHandler.processFileData(ctx.request.body); - } catch (error: CatchErr) { - response = { - statusCode: error?.response?.status || error?.status || 400, - error: error.message || ERROR_MESSAGE_PROCESSOR_STRING, - metadata: error.response ? error.response.metadata : null, - }; - errNotificationClient.notify(error, 'File Upload', { - ...response, - ...getCommonMetadata(ctx), - ...getReqMetadata(ctx), - }); - } - ctx.body = response; - logger.debug('Native(Bulk-Upload): Response from transformer:: /fileUpload route', ctx.body); - return ctx.body; -}; - -export const pollStatus = async (ctx) => { - logger.debug('Native(Bulk-Upload): Request to transformer:: /pollStatus route', ctx.request.body); - - const { destType }: ContextBodySimple = ctx.request.body; - const destFileUploadHandler = getPollStatusHandler('v0', destType.toLowerCase()); - let response; - if (!destFileUploadHandler || !destFileUploadHandler.processPolling) { - ctx.status = 404; - ctx.body = `${destType} doesn't support bulk upload`; - return null; - } - try { - response = await destFileUploadHandler.processPolling(ctx.request.body); - } catch (error: CatchErr) { - response = { - statusCode: error.response?.status || 400, - error: error.message || ERROR_MESSAGE_PROCESSOR_STRING, - }; - errNotificationClient.notify(error, 'Poll Status', { - ...response, - ...getCommonMetadata(ctx), - ...getReqMetadata(ctx), - }); - } - ctx.body = response; - logger.debug('Native(Bulk-Upload): Request from transformer:: /pollStatus route', ctx.body); - return ctx.body; -}; - -export const getWarnJobStatus = async (ctx) => { - logger.debug( - 'Native(Bulk-Upload): Request to transformer:: /getWarningJobs route', - ctx.request.body, - ); - - const { destType }: ContextBodySimple = ctx.request.body; - const destFileUploadHandler = getJobStatusHandler('v0', destType.toLowerCase()); - - if (!destFileUploadHandler || !destFileUploadHandler.processJobStatus) { - ctx.status = 404; - ctx.body = `${destType} doesn't support bulk upload`; - return null; - } - let response; - try { - response = await destFileUploadHandler.processJobStatus(ctx.request.body, 'warn'); - } catch (error: CatchErr) { - response = { - statusCode: error.response ? error.response.status : 400, - error: error.message || ERROR_MESSAGE_PROCESSOR_STRING, - }; - errNotificationClient.notify(error, 'Job Status', { - ...response, - ...getCommonMetadata(ctx), - ...getReqMetadata(ctx), - }); - } - ctx.body = response; - logger.debug('Native(Bulk-Upload): Request from transformer:: /getWarningJobs route', ctx.body); - return ctx.body; -}; - -export const getFailedJobStatus = async (ctx) => { - logger.debug( - 'Native(Bulk-Upload): Request to transformer:: /getFailedJobs route', - ctx.request.body, - ); - - const { destType }: ContextBodySimple = ctx.request.body; - const destFileUploadHandler = getJobStatusHandler('v0', destType.toLowerCase()); - - if (!destFileUploadHandler || !destFileUploadHandler.processJobStatus) { - ctx.status = 404; - ctx.body = `${destType} doesn't support bulk upload`; - return null; - } - let response; - try { - response = await destFileUploadHandler.processJobStatus(ctx.request.body, 'fail'); - } catch (error: CatchErr) { - response = { - statusCode: error.response ? error.response.status : 400, - error: error.message || ERROR_MESSAGE_PROCESSOR_STRING, - }; - errNotificationClient.notify(error, 'Job Status', { - ...response, - ...getCommonMetadata(ctx), - ...getReqMetadata(ctx), - }); - } - ctx.body = response; - logger.debug('Native(Bulk-Upload): Request from transformer:: /getFailedJobs route', ctx.body); - return ctx.body; -}; diff --git a/src/controllers/obs.delivery.js b/src/controllers/obs.delivery.js deleted file mode 100644 index 8e99650af6..0000000000 --- a/src/controllers/obs.delivery.js +++ /dev/null @@ -1,130 +0,0 @@ -/** - * -------------------------------------- - * -------------------------------------- - * ---------TO BE DEPRECATED------------- - * -------------------------------------- - * -------------------------------------- - */ - -const match = require('match-json'); -const jsonDiff = require('json-diff'); -const networkHandlerFactory = require('../adapters/networkHandlerFactory'); -const { getPayloadData } = require('../adapters/network'); -const { generateErrorObject } = require('../v0/util'); -const logger = require('../logger'); -const tags = require('../v0/util/tags'); -const stats = require('../util/stats'); - -const DestProxyController = { - /** - * Handler for testing the destination proxy - * @param {*} destination Destination name - * @param {*} ctx - * @returns - */ - async handleProxyTestRequest(destination, ctx) { - const { - deliveryPayload: routerDeliveryPayload, - destinationRequestPayload: routerDestReqPayload, - } = ctx.request.body; - let response; - try { - const destNetworkHandler = networkHandlerFactory.getNetworkHandler(destination); - - const proxyDestReqPayload = destNetworkHandler.prepareProxy(routerDeliveryPayload); - response = { - destinationRequestPayload: proxyDestReqPayload, - }; - - // Special handling required as Go and JavaScript encodes - // URL parameters differently - const { payloadFormat } = getPayloadData(routerDeliveryPayload.body); - if (payloadFormat === 'FORM') { - // This is to make sure we encode `~` in the data coming from the router. - // The data coming from the router is already a query parameter string - const routerDataVal = new URLSearchParams(routerDestReqPayload.data); - routerDestReqPayload.data = routerDataVal; - - const proxyDataVal = new URLSearchParams(); - proxyDestReqPayload.data.forEach((value, key) => { - const encodeAsterisk = (x) => x.replace(/\*/g, '%2A'); - // Router encodes `*` as well - proxyDataVal.append(encodeAsterisk(key), encodeAsterisk(value)); - }); - proxyDestReqPayload.data = proxyDataVal; - } - - // Compare the destination request payloads from router and proxy - if (!match(routerDestReqPayload, proxyDestReqPayload)) { - stats.counter('proxy_test_payload_mismatch', 1, { - destination, - }); - - logger.error(`[TransformerProxyTest] Destination request payload mismatch!`); - logger.error( - `[TransformerProxyTest] Delivery payload (router): ${JSON.stringify( - routerDeliveryPayload, - )}`, - ); - logger.error( - `[TransformerProxyTest] Destination request payload (router): ${JSON.stringify( - routerDestReqPayload, - )}`, - ); - logger.error( - `[TransformerProxyTest] Destination request payload (proxy): ${JSON.stringify( - proxyDestReqPayload, - )} `, - ); - - // Compute output difference - const outputDiff = jsonDiff.diffString(routerDestReqPayload, proxyDestReqPayload); - logger.error( - `[TransformerProxyTest] Destination request payload difference: ${outputDiff}`, - ); - response = { - outputDiff, - ...response, - }; - } else { - stats.counter('proxy_test_payload_match', 1, { - destination, - }); - } - } catch (err) { - stats.counter('proxy_test_error', 1, { - destination, - }); - - response = generateErrorObject( - err, - { - [tags.TAG_NAMES.DEST_TYPE]: destination.toUpperCase(), - [tags.TAG_NAMES.MODULE]: tags.MODULES.DESTINATION, - [tags.TAG_NAMES.FEATURE]: tags.FEATURES.DATA_DELIVERY, - }, - false, - ); - response.message = `[TransformerProxyTest] Error occurred while testing proxy for destination ("${destination}"): "${err.message}"`; - logger.error(response.message); - logger.error(err); - logger.error( - `[TransformerProxyTest] Delivery payload (router): ${JSON.stringify( - routerDeliveryPayload, - )}`, - ); - logger.error( - `[TransformerProxyTest] Destination request payload (router): ${JSON.stringify( - routerDestReqPayload, - )}`, - ); - } - - // Always return success as router doesn't care - ctx.status = 200; - ctx.body = { output: response }; - return ctx.body; - }, -}; - -module.exports = { DestProxyController }; diff --git a/src/legacy/delivery.js b/src/legacy/delivery.js deleted file mode 100644 index 8f7b092815..0000000000 --- a/src/legacy/delivery.js +++ /dev/null @@ -1,27 +0,0 @@ -/** - * -------------------------------------- - * -------------------------------------- - * ---------TO BE DEPRICIATED------------ - * -------------------------------------- - * -------------------------------------- - */ - -const path = require('path'); -const KoaRouter = require('@koa/router'); -const { DestProxyController } = require('../controllers/obs.delivery'); -const { getIntegrations } = require('../routes/utils'); -const { SUPPORTED_VERSIONS, API_VERSION } = require('../routes/utils/constants'); - -const router = new KoaRouter(); - -SUPPORTED_VERSIONS.forEach((version) => { - const destinations = getIntegrations(path.resolve(__dirname, `../${version}/destinations`)); - destinations.forEach((destination) => { - router.post(`/${version}/destinations/${destination}/proxyTest`, async (ctx) => { - ctx.set('apiVersion', API_VERSION); - await DestProxyController.handleProxyTestRequest(destination, ctx); - }); - }); -}); - -module.exports = router.routes(); diff --git a/src/legacy/router.js b/src/legacy/router.js deleted file mode 100644 index 60f786e225..0000000000 --- a/src/legacy/router.js +++ /dev/null @@ -1,1272 +0,0 @@ -// ============================================================================= -// DEPRECATION NOTICE: THIS FILE IS GETTING DEPRECATED AND WILL BE REMOVED IN FUTURE RELEASE -// ============================================================================= -/* eslint-disable import/no-dynamic-require */ -/* eslint-disable global-require */ -const Router = require('@koa/router'); -const lodash = require('lodash'); -const fs = require('fs'); -const path = require('path'); -const { PlatformError, getErrorRespEvents } = require('@rudderstack/integrations-lib'); -const logger = require('../logger'); -const stats = require('../util/stats'); -const { SUPPORTED_VERSIONS, API_VERSION } = require('../routes/utils/constants'); -const { client: errNotificationClient } = require('../util/errorNotifier'); -const tags = require('../v0/util/tags'); - -const { - isNonFuncObject, - getMetadata, - generateErrorObject, - checkAndCorrectUserId, -} = require('../v0/util'); -const { processDynamicConfig } = require('../util/dynamicConfig'); -const { DestHandlerMap } = require('../constants/destinationCanonicalNames'); -const { userTransformHandler } = require('../routerUtils'); -const networkHandlerFactory = require('../adapters/networkHandlerFactory'); -const destProxyRoutes = require('./delivery'); -const eventValidator = require('../util/eventValidation'); -const { getIntegrations } = require('../routes/utils'); -const { setupUserTransformHandler, validateCode } = require('../util/customTransformer'); -const { - RespStatusError, - RetryRequestError, - sendViolationMetrics, - constructValidationErrors, -} = require('../util/utils'); -const { extractLibraries } = require('../util/customTransformer'); -const { getCompatibleStatusCode } = require('../adapters/utils/networkUtils'); - -const transformerMode = process.env.TRANSFORMER_MODE; - -const startDestTransformer = transformerMode === 'destination' || !transformerMode; -const startSourceTransformer = transformerMode === 'source' || !transformerMode; -const transformerProxy = process.env.TRANSFORMER_PROXY || true; -const proxyTestModeEnabled = - process.env.TRANSFORMER_PROXY_TEST_ENABLED?.toLowerCase() === 'true' || false; -const transformerTestModeEnabled = process.env.TRANSFORMER_TEST_MODE - ? process.env.TRANSFORMER_TEST_MODE.toLowerCase() === 'true' - : false; - -const router = new Router(); - -const PAYLOAD_PROC_ERR_MSG = 'Error occurred while processing payload'; - -/** - * @deprecated this function is deprecated and will be removed in future release - */ -const getDestHandler = (version, dest) => { - if (Object.prototype.hasOwnProperty.call(DestHandlerMap, dest)) { - return require(`../${version}/destinations/${DestHandlerMap[dest]}/transform`); - } - return require(`../${version}/destinations/${dest}/transform`); -}; - -const getDestFileUploadHandler = (version, dest) => - require(`../${version}/destinations/${dest}/fileUpload`); - -const getPollStatusHandler = (version, dest) => require(`../${version}/destinations/${dest}/poll`); - -const getJobStatusHandler = (version, dest) => - require(`../${version}/destinations/${dest}/fetchJobStatus`); - -const getDeletionUserHandler = (version, dest) => - require(`../${version}/destinations/${dest}/deleteUsers`); - -const getSourceHandler = (version, source) => require(`../${version}/sources/${source}/transform`); - -let areFunctionsEnabled = -1; -const functionsEnabled = () => { - if (areFunctionsEnabled === -1) { - areFunctionsEnabled = process.env.ENABLE_FUNCTIONS === 'false' ? 0 : 1; - } - return areFunctionsEnabled === 1; -}; - -// eslint-disable-next-line @typescript-eslint/no-unused-vars -function getCommonMetadata(ctx) { - // TODO: Parse information such as - // cluster, namespace, etc information - // from the request - return { - namespace: 'Unknown', - cluster: 'Unknown', - }; -} - -/** - * Enriches the transformed event with more information - * - userId stringification - * - * @param {Object} transformedEvent - single transformed event - * @returns transformedEvent after enrichment - */ -const enrichTransformedEvent = (transformedEvent) => ({ - ...transformedEvent, - userId: checkAndCorrectUserId(transformedEvent.statusCode, transformedEvent?.userId), -}); - -/** - * @deprecated this function is deprecated and will be removed in future release - */ -function handleV0Destination(destHandler, inputArr) { - return destHandler(...inputArr); -} -/** - * @deprecated this function is deprecated and will be removed in future release - */ -async function handleDest(ctx, version, destination) { - const getReqMetadata = (event) => { - try { - return { - destType: destination, - destinationId: event?.destination?.ID, - destName: event?.destination?.Name, - metadata: event?.metadata, - }; - } catch (error) { - // Do nothing - } - return {}; - }; - - const events = ctx.request.body; - if (!Array.isArray(events) || events.length === 0) { - throw new PlatformError('Event is missing or in inappropriate format'); - } - const reqParams = ctx.request.query; - logger.debug(`[DT] Input events: ${JSON.stringify(events)}`); - - const metaTags = - events && events.length > 0 && events[0].metadata ? getMetadata(events[0].metadata) : {}; - stats.histogram('dest_transform_input_events', events.length, { - destination, - version, - ...metaTags, - }); - const executeStartTime = new Date(); - let destHandler = null; - const respList = await Promise.all( - events.map(async (event) => { - try { - let parsedEvent = event; - parsedEvent.request = { query: reqParams }; - parsedEvent = processDynamicConfig(parsedEvent); - let respEvents; - if (destHandler === null) { - destHandler = getDestHandler(version, destination); - } - respEvents = await handleV0Destination(destHandler.process, [parsedEvent]); - - if (respEvents) { - if (!Array.isArray(respEvents)) { - respEvents = [respEvents]; - } - return respEvents.map((ev) => ({ - output: enrichTransformedEvent(ev), - metadata: destHandler?.processMetadata - ? destHandler.processMetadata({ - metadata: event.metadata, - inputEvent: parsedEvent, - outputEvent: ev, - }) - : event.metadata, - statusCode: 200, - })); - } - return undefined; - } catch (error) { - logger.error(error); - - const implementation = tags.IMPLEMENTATIONS.NATIVE; - const errCtx = 'Processor Transformation'; - - const errObj = generateErrorObject(error, { - [tags.TAG_NAMES.DEST_TYPE]: destination.toUpperCase(), - [tags.TAG_NAMES.MODULE]: tags.MODULES.DESTINATION, - [tags.TAG_NAMES.IMPLEMENTATION]: implementation, - [tags.TAG_NAMES.FEATURE]: tags.FEATURES.PROCESSOR, - [tags.TAG_NAMES.DESTINATION_ID]: event.metadata?.destinationId, - [tags.TAG_NAMES.WORKSPACE_ID]: event.metadata?.workspaceId, - }); - - const resp = { - metadata: event.metadata, - destination: event.destination, - statusCode: errObj.status, - error: errObj.message, - statTags: errObj.statTags, - }; - - errNotificationClient.notify(error, errCtx, { - ...resp, - ...getCommonMetadata(ctx), - ...getReqMetadata(event), - }); - return resp; - } - }), - ); - stats.timing('cdk_events_latency', executeStartTime, { - destination, - ...metaTags, - }); - logger.debug(`[DT] Output events: ${JSON.stringify(respList)}`); - stats.histogram('dest_transform_output_events', respList.length, { - destination, - version, - ...metaTags, - }); - ctx.body = respList.flat(); - return ctx.body; -} - -async function handleValidation(ctx) { - const requestStartTime = new Date(); - const events = ctx.request.body; - const requestSize = Number(ctx.request.get('content-length')); - const reqParams = ctx.request.query; - const respList = []; - const metaTags = events[0].metadata ? getMetadata(events[0].metadata) : {}; - let ctxStatusCode = 200; - // eslint-disable-next-line no-restricted-syntax - for (const event of events) { - const eventStartTime = new Date(); - try { - const parsedEvent = event; - parsedEvent.request = { query: reqParams }; - // eslint-disable-next-line no-await-in-loop - const hv = await eventValidator.handleValidation(parsedEvent); - sendViolationMetrics(hv.validationErrors, hv.dropEvent, metaTags); - if (hv.dropEvent) { - respList.push({ - output: event.message, - metadata: event.metadata, - statusCode: 400, - validationErrors: hv.validationErrors, - error: JSON.stringify(constructValidationErrors(hv.validationErrors)), - }); - stats.counter('hv_violation_type', 1, { - violationType: hv.violationType, - ...metaTags, - }); - } else { - respList.push({ - output: event.message, - metadata: event.metadata, - statusCode: 200, - validationErrors: hv.validationErrors, - error: JSON.stringify(constructValidationErrors(hv.validationErrors)), - }); - stats.counter('hv_propagated_events', 1, { - ...metaTags, - }); - } - } catch (error) { - const errMessage = `Error occurred while validating : ${error}`; - logger.error(errMessage); - let status = 200; - if (error instanceof RetryRequestError) { - ctxStatusCode = error.statusCode; - } - if (error instanceof RespStatusError) { - status = error.statusCode; - } - respList.push({ - output: event.message, - metadata: event.metadata, - statusCode: status, - validationErrors: [], - error: errMessage, - }); - stats.counter('hv_errors', 1, { - ...metaTags, - }); - } finally { - stats.timing('hv_event_latency', eventStartTime, { - ...metaTags, - }); - } - } - ctx.body = respList; - ctx.status = ctxStatusCode; - ctx.set('apiVersion', API_VERSION); - - stats.counter('hv_events_count', events.length, { - ...metaTags, - }); - stats.histogram('hv_request_size', requestSize, { - ...metaTags, - }); - stats.timing('hv_request_latency', requestStartTime, { - ...metaTags, - }); -} - -async function isValidRouterDest(event, destType) { - try { - const routerDestHandler = getDestHandler('v0', destType); - return routerDestHandler?.processRouterDest !== undefined; - } catch (error) { - return false; - } -} -/** - * @deprecated this function is deprecated and will be removed in future release - */ -async function routerHandleDest(ctx) { - const getReqMetadata = () => { - try { - return { - destType: ctx.request?.body?.destType, - }; - } catch (error) { - // Do nothing - } - return {}; - }; - - const respEvents = []; - let destType; - let defTags; - try { - const { input } = ctx.request.body; - destType = ctx.request.body.destType; - defTags = { - [tags.TAG_NAMES.DEST_TYPE]: destType.toUpperCase(), - [tags.TAG_NAMES.MODULE]: tags.MODULES.DESTINATION, - [tags.TAG_NAMES.FEATURE]: tags.FEATURES.ROUTER, - [tags.TAG_NAMES.IMPLEMENTATION]: tags.IMPLEMENTATIONS.NATIVE, - }; - - const routerDestHandler = getDestHandler('v0', destType); - const isValidRTDest = await isValidRouterDest(input[0], destType); - if (!isValidRTDest) { - ctx.status = 404; - ctx.body = `${destType} doesn't support router transform`; - return null; - } - const allDestEvents = lodash.groupBy(input, (event) => event.destination.ID); - await Promise.all( - Object.values(allDestEvents).map(async (destInputArray) => { - const newDestInputArray = processDynamicConfig(destInputArray, 'router'); - const listOutput = await handleV0Destination(routerDestHandler.processRouterDest, [ - newDestInputArray, - { ...getCommonMetadata(ctx), ...getReqMetadata() }, - ]); - const hasProcMetadataForRouter = routerDestHandler.processMetadataForRouter; - // enriching transformed event - listOutput.forEach((listOut) => { - const { batchedRequest } = listOut; - if (Array.isArray(batchedRequest)) { - // eslint-disable-next-line no-param-reassign - listOut.batchedRequest = batchedRequest.map((batReq) => enrichTransformedEvent(batReq)); - } else if (batchedRequest && typeof batchedRequest === 'object') { - // eslint-disable-next-line no-param-reassign - listOut.batchedRequest = enrichTransformedEvent(batchedRequest); - } - - if (hasProcMetadataForRouter) { - // eslint-disable-next-line no-param-reassign - listOut.metadata = routerDestHandler.processMetadataForRouter(listOut); - } - }); - respEvents.push(...listOutput); - }), - ); - - // Add default stat tags - respEvents - .filter( - (resp) => - 'error' in resp && lodash.isObject(resp.statTags) && !lodash.isEmpty(resp.statTags), - ) - .forEach((resp) => { - // eslint-disable-next-line no-param-reassign - resp.statTags = { - ...resp.statTags, - ...defTags, - [tags.TAG_NAMES.DESTINATION_ID]: resp.metadata[0]?.destinationId, - [tags.TAG_NAMES.WORKSPACE_ID]: resp.metadata[0]?.workspaceId, - }; - }); - } catch (error) { - logger.error(error); - - const errObj = generateErrorObject(error, defTags); - - const resp = { - statusCode: errObj.status, - error: errObj.message, - statTags: errObj.statTags, - }; - - // Add support to perform refreshToken action for OAuth destinations - if (error?.authErrorCategory) { - resp.authErrorCategory = error.authErrorCategory; - } - - errNotificationClient.notify(error, 'Router Transformation', { - ...resp, - ...getCommonMetadata(ctx), - ...getReqMetadata(), - }); - - respEvents.push(resp); - } - ctx.body = { output: respEvents }; - return ctx.body; -} - -if (startDestTransformer) { - SUPPORTED_VERSIONS.forEach((version) => { - const destinations = getIntegrations(path.resolve(__dirname, `../${version}/destinations`)); - destinations.forEach((destination) => { - // eg. v0/destinations/ga - router.post(`/${version}/destinations/${destination}`, async (ctx) => { - const startTime = new Date(); - await handleDest(ctx, version, destination); - ctx.set('apiVersion', API_VERSION); - // Assuming that events are from one single source - - const metaTags = - ctx.request.body && ctx.request.body.length > 0 && ctx.request.body[0].metadata - ? getMetadata(ctx.request.body[0].metadata) - : {}; - - stats.timing('dest_transform_request_latency', startTime, { - destination, - version, - ...metaTags, - }); - stats.increment('dest_transform_requests', { - destination, - version, - ...metaTags, - }); - }); - // eg. v0/ga. will be deprecated in favor of v0/destinations/ga format - router.post(`/${version}/${destination}`, async (ctx) => { - const startTime = new Date(); - await handleDest(ctx, version, destination); - ctx.set('apiVersion', API_VERSION); - // Assuming that events are from one single source - - const metaTags = - ctx.request.body && ctx.request.body.length > 0 && ctx.request.body[0].metadata - ? getMetadata(ctx.request.body[0].metadata) - : {}; - - stats.timing('dest_transform_request_latency', startTime, { - destination, - ...metaTags, - }); - stats.increment('dest_transform_requests', { - destination, - version, - ...metaTags, - }); - }); - router.post('/routerTransform', async (ctx) => { - ctx.set('apiVersion', API_VERSION); - await routerHandleDest(ctx); - }); - }); - }); - - if (functionsEnabled()) { - router.post('/extractLibs', async (ctx) => { - try { - const { - code, - versionId, - validateImports = false, - additionalLibraries = [], - language = 'javascript', - testMode = false, - } = ctx.request.body; - - if (!code) { - throw new Error('Invalid request. Code is missing'); - } - - const obj = await extractLibraries( - code, - versionId, - validateImports, - additionalLibraries, - language, - testMode || versionId === 'testVersionId', - ); - ctx.body = obj; - } catch (err) { - ctx.status = 400; - ctx.body = { error: err.error || err.message }; - } - }); - - // eslint-disable-next-line sonarjs/cognitive-complexity - router.post('/customTransform', async (ctx) => { - const startTime = new Date(); - const events = ctx.request.body; - const { processSessions } = ctx.query; - logger.debug(`[CT] Input events: ${JSON.stringify(events)}`); - stats.histogram('user_transform_input_events', events.length, { - processSessions, - }); - let groupedEvents; - if (processSessions) { - groupedEvents = lodash.groupBy(events, (event) => { - // to have the backward-compatibility and being extra careful. We need to remove this (message.anonymousId) in next release. - const rudderId = event.metadata.rudderId || event.message.anonymousId; - return `${event.destination.ID}_${event.metadata.sourceId}_${rudderId}`; - }); - } else { - groupedEvents = lodash.groupBy( - events, - (event) => `${event.metadata.destinationId}_${event.metadata.sourceId}`, - ); - } - stats.counter('user_transform_function_group_size', Object.entries(groupedEvents).length, {}); - - let ctxStatusCode = 200; - const transformedEvents = []; - let librariesVersionIDs = []; - if (events[0].libraries) { - librariesVersionIDs = events[0].libraries.map((library) => library.VersionID); - } - await Promise.all( - Object.entries(groupedEvents).map(async ([dest, destEvents]) => { - logger.debug(`dest: ${dest}`); - const transformationVersionId = - destEvents[0] && - destEvents[0].destination && - destEvents[0].destination.Transformations && - destEvents[0].destination.Transformations[0] && - destEvents[0].destination.Transformations[0].VersionID; - const messageIds = destEvents.map((ev) => ev.metadata && ev.metadata.messageId); - const commonMetadata = { - sourceId: destEvents[0].metadata && destEvents[0].metadata.sourceId, - destinationId: destEvents[0].metadata && destEvents[0].metadata.destinationId, - destinationType: destEvents[0].metadata && destEvents[0].metadata.destinationType, - messageIds, - }; - - const metaTags = - destEvents.length > 0 && destEvents[0].metadata - ? getMetadata(destEvents[0].metadata) - : {}; - if (transformationVersionId) { - let destTransformedEvents; - try { - destTransformedEvents = await userTransformHandler()( - destEvents, - transformationVersionId, - librariesVersionIDs, - ); - transformedEvents.push( - ...destTransformedEvents.map((ev) => { - if (ev.error) { - return { - statusCode: 400, - error: ev.error, - metadata: lodash.isEmpty(ev.metadata) ? commonMetadata : ev.metadata, - }; - } - if (!isNonFuncObject(ev.transformedEvent)) { - return { - statusCode: 400, - error: `returned event in events from user transformation is not an object. transformationVersionId:${transformationVersionId} and returned event: ${JSON.stringify( - ev.transformedEvent, - )}`, - metadata: lodash.isEmpty(ev.metadata) ? commonMetadata : ev.metadata, - }; - } - return { - output: ev.transformedEvent, - metadata: lodash.isEmpty(ev.metadata) ? commonMetadata : ev.metadata, - statusCode: 200, - }; - }), - ); - } catch (error) { - logger.error(error); - let status = 400; - const errorString = error.toString(); - if (error instanceof RetryRequestError) { - ctxStatusCode = error.statusCode; - } - if (error instanceof RespStatusError) { - status = error.statusCode; - } - destTransformedEvents = destEvents.map((e) => ({ - statusCode: status, - metadata: e.metadata, - error: errorString, - })); - transformedEvents.push(...destTransformedEvents); - stats.counter('user_transform_errors', destEvents.length, { - transformationVersionId, - processSessions, - ...metaTags, - }); - } - } else { - const errorMessage = 'Transformation VersionID not found'; - logger.error(`[CT] ${errorMessage}`); - transformedEvents.push({ - statusCode: 400, - error: errorMessage, - metadata: commonMetadata, - }); - stats.counter('user_transform_errors', destEvents.length, { - transformationVersionId, - processSessions, - ...metaTags, - }); - } - }), - ); - logger.debug(`[CT] Output events: ${JSON.stringify(transformedEvents)}`); - ctx.body = transformedEvents; - ctx.status = ctxStatusCode; - ctx.set('apiVersion', API_VERSION); - - stats.timingSummary('user_transform_request_latency_summary', startTime, {}); - stats.increment('user_transform_requests', {}); - stats.histogram('user_transform_output_events', transformedEvents.length, {}); - }); - } -} - -if (transformerTestModeEnabled) { - router.post('/transformation/test', async (ctx) => { - try { - const { events, trRevCode, libraryVersionIDs = [] } = ctx.request.body; - if (!trRevCode || !trRevCode.code || !trRevCode.codeVersion) { - throw new Error('Invalid Request. Missing parameters in transformation code block'); - } - if (!events || events.length === 0) { - throw new Error('Invalid request. Missing events'); - } - - logger.debug(`[CT] Test Input Events: ${JSON.stringify(events)}`); - trRevCode.versionId = 'testVersionId'; - const res = await userTransformHandler()( - events, - trRevCode.versionId, - libraryVersionIDs, - trRevCode, - true, - ); - logger.debug(`[CT] Test Output Events: ${JSON.stringify(res.transformedEvents)}`); - ctx.body = res; - } catch (error) { - ctx.status = error.statusCode || 400; - ctx.body = { error: error.message }; - } - }); - - router.post('/transformationLibrary/test', async (ctx) => { - try { - const { code, language = 'javascript' } = ctx.request.body; - - if (!code) { - throw new Error('Invalid request. Missing code'); - } - - const res = await validateCode(code, language); - ctx.body = res; - } catch (error) { - ctx.body = { error: error.message }; - ctx.status = 400; - } - }); - /* *params - * code: transfromation code - * language - * name - */ - router.post('/transformation/sethandle', async (ctx) => { - try { - const { trRevCode, libraryVersionIDs = [] } = ctx.request.body; - const { code, versionId, language, testName } = trRevCode || {}; - if (!code || !language || !testName || (language === 'pythonfaas' && !versionId)) { - throw new Error('Invalid Request. Missing parameters in transformation code block'); - } - - logger.debug(`[CT] Setting up a transformation ${testName}`); - if (!trRevCode.versionId) { - trRevCode.versionId = 'testVersionId'; - } - if (!trRevCode.workspaceId) { - trRevCode.workspaceId = 'workspaceId'; - } - const res = await setupUserTransformHandler(libraryVersionIDs, trRevCode); - logger.debug(`[CT] Finished setting up transformation: ${testName}`); - ctx.body = res; - } catch (error) { - ctx.status = 400; - ctx.body = { error: error.message }; - } - }); -} - -async function handleSource(ctx, version, source) { - const getReqMetadata = () => { - try { - return { srcType: source }; - } catch (error) { - // Do nothing - } - return {}; - }; - - const sourceHandler = getSourceHandler(version, source); - const events = ctx.request.body; - logger.debug(`[ST] Input source events: ${JSON.stringify(events)}`); - stats.counter('source_transform_input_events', events.length, { - source, - version, - }); - const respList = []; - await Promise.all( - events.map(async (event) => { - try { - const respEvents = await sourceHandler.process(event); - - // We send response back to the source - // through outputToSource. This is not sent to gateway - if (Object.prototype.hasOwnProperty.call(respEvents, 'outputToSource')) { - respList.push(respEvents); - return; - } - - if (Array.isArray(respEvents)) { - respList.push({ output: { batch: respEvents } }); - } else { - respList.push({ output: { batch: [respEvents] } }); - } - } catch (error) { - logger.error(error); - - // TODO: Update the data contact for source transformation - // and then send the following additional information - // const errObj = generateErrorObject(error, { - // [tags.TAG_NAMES.SRC_TYPE]: source.toUpperCase(), - // [tags.TAG_NAMES.MODULE]: tags.MODULES.SOURCE, - // [tags.TAG_NAMES.IMPLEMENTATION]: tags.IMPLEMENTATIONS.NATIVE, - // [tags.TAG_NAMES.FEATURE]: tags.FEATURES.PROCESSOR - // [tags.TAG_NAMES.SOURCE_ID]: TBD - // }); - - // const resp = { - // statusCode: errObj.status, - // error: errObj.message, - // statTags: errObj.statTags - // }; - - const resp = { - statusCode: 400, - error: error.message || PAYLOAD_PROC_ERR_MSG, - }; - - respList.push(resp); - - stats.counter('source_transform_errors', events.length, { - source, - version, - }); - errNotificationClient.notify(error, 'Source Transformation', { - ...resp, - ...getCommonMetadata(ctx), - ...getReqMetadata(), - }); - } - }), - ); - logger.debug(`[ST] Output source events: ${JSON.stringify(respList)}`); - stats.increment('source_transform_output_events', respList.length, { - source, - version, - }); - ctx.body = respList; - ctx.set('apiVersion', API_VERSION); -} - -if (startSourceTransformer) { - SUPPORTED_VERSIONS.forEach((version) => { - const sources = getIntegrations(path.resolve(__dirname, `../${version}/sources`)); - sources.forEach((source) => { - // eg. v0/sources/customerio - router.post(`/${version}/sources/${source}`, async (ctx) => { - const startTime = new Date(); - await handleSource(ctx, version, source); - - stats.timing('source_transform_request_latency', startTime, { - source, - version, - }); - stats.increment('source_transform_requests', { source, version }); - }); - }); - }); -} -/** - * @deprecated this function is deprecated and will be removed in future release - */ -async function handleProxyRequest(destination, ctx) { - const getReqMetadata = () => { - try { - return { destType: destination }; - } catch (error) { - // Do nothing - } - return {}; - }; - - const { metadata, ...destinationRequest } = ctx.request.body; - const destNetworkHandler = networkHandlerFactory.getNetworkHandler(destination); - let response; - try { - stats.counter('tf_proxy_dest_req_count', 1, { - destination, - }); - const startTime = new Date(); - const rawProxyResponse = await destNetworkHandler.proxy(destinationRequest); - - stats.timing('transformer_proxy_time', startTime, { - destination, - }); - stats.counter('tf_proxy_dest_resp_count', 1, { - destination, - success: rawProxyResponse.success, - }); - - const processedProxyResponse = destNetworkHandler.processAxiosResponse(rawProxyResponse); - stats.counter('tf_proxy_proc_ax_response_count', 1, { - destination, - }); - response = destNetworkHandler.responseHandler( - { ...processedProxyResponse, rudderJobMetadata: metadata }, - destination, - ); - stats.counter('tf_proxy_resp_handler_count', 1, { - destination, - }); - } catch (err) { - logger.error('Error occurred while completing proxy request:'); - logger.error(err); - - const errObj = generateErrorObject( - err, - { - [tags.TAG_NAMES.DEST_TYPE]: destination.toUpperCase(), - [tags.TAG_NAMES.MODULE]: tags.MODULES.DESTINATION, - [tags.TAG_NAMES.IMPLEMENTATION]: tags.IMPLEMENTATIONS.NATIVE, - [tags.TAG_NAMES.FEATURE]: tags.FEATURES.DATA_DELIVERY, - [tags.TAG_NAMES.DESTINATION_ID]: metadata?.destinationId, - [tags.TAG_NAMES.WORKSPACE_ID]: metadata?.workspaceId, - }, - false, - ); - - response = { - status: errObj.status, - ...(errObj.authErrorCategory && { - authErrorCategory: errObj.authErrorCategory, - }), - destinationResponse: errObj.destinationResponse, - message: errObj.message, - statTags: errObj.statTags, - }; - - stats.counter('tf_proxy_err_count', 1, { - destination, - }); - - errNotificationClient.notify(err, 'Data Delivery', { - ...response, - ...getCommonMetadata(ctx), - ...getReqMetadata(), - }); - } - ctx.body = { output: response }; - // Sending `204` status(obtained from destination) is not working as expected - // Since this is success scenario, we'll be forcefully sending `200` status-code to server - ctx.status = getCompatibleStatusCode(response.status); - return ctx.body; -} - -if (transformerProxy) { - SUPPORTED_VERSIONS.forEach((version) => { - const destinations = getIntegrations(path.resolve(__dirname, `../${version}/destinations`)); - destinations.forEach((destination) => { - router.post(`/${version}/destinations/${destination}/proxy`, async (ctx) => { - const startTime = new Date(); - ctx.set('apiVersion', API_VERSION); - await handleProxyRequest(destination, ctx); - - stats.timing('transformer_total_proxy_latency', startTime, { - destination, - version, - }); - }); - }); - }); -} - -if (proxyTestModeEnabled) { - router.use(destProxyRoutes); -} - -router.get('/version', (ctx) => { - ctx.body = process.env.npm_package_version || 'Version Info not found'; -}); - -router.get('/transformerBuildVersion', (ctx) => { - ctx.body = process.env.transformer_build_version || 'Version Info not found'; -}); - -router.get('/health', (ctx) => { - const { git_commit_sha: gitCommitSha, transformer_build_version: imageVersion } = process.env; - ctx.body = { - service: 'UP', - ...(imageVersion && { version: imageVersion }), - ...(gitCommitSha && { gitCommitSha }), - }; -}); - -router.get('/features', (ctx) => { - const obj = JSON.parse(fs.readFileSync(path.resolve(__dirname, 'features.json'), 'utf8')); - ctx.body = JSON.stringify(obj); -}); -/** - * @deprecated this function is deprecated and will be removed in future release - */ -const batchHandler = (ctx) => { - const getReqMetadata = (destEvents) => { - try { - const reqBody = ctx.request.body; - const firstEvent = destEvents[0]; - return { - destType: reqBody?.destType, - destinationId: firstEvent?.destination?.ID, - destName: firstEvent?.destination?.Name, - metadata: firstEvent?.metadata, - }; - } catch (error) { - // Do nothing - } - return {}; - }; - - const { destType, input } = ctx.request.body; - const destHandler = getDestHandler('v0', destType); - if (!destHandler || !destHandler.batch) { - ctx.status = 404; - ctx.body = `${destType} doesn't support batching`; - return null; - } - const allDestEvents = lodash.groupBy(input, (event) => event.destination.ID); - - const response = { batchedRequests: [], errors: [] }; - Object.entries(allDestEvents).forEach(([, destEvents]) => { - try { - // eslint-disable-next-line no-param-reassign - destEvents = processDynamicConfig(destEvents, 'batch'); - const destBatchedRequests = destHandler.batch(destEvents); - response.batchedRequests.push(...destBatchedRequests); - } catch (error) { - const errorObj = generateErrorObject(error, { - [tags.TAG_NAMES.DEST_TYPE]: destType.toUpperCase(), - [tags.TAG_NAMES.MODULE]: tags.MODULES.DESTINATION, - [tags.TAG_NAMES.IMPLEMENTATION]: tags.IMPLEMENTATIONS.NATIVE, - [tags.TAG_NAMES.FEATURE]: tags.FEATURES.BATCH, - [tags.TAG_NAMES.DESTINATION_ID]: destEvents[0].metadata?.destinationId, - [tags.TAG_NAMES.WORKSPACE_ID]: destEvents[0].metadata?.workspaceId, - }); - const errResp = getErrorRespEvents( - destEvents.map((d) => d.metadata), - 500, // not using errorObj.status - errorObj.message, - errorObj.statTags, - ); - response.errors.push({ - ...errResp, - destination: destEvents[0].destination, - }); - errNotificationClient.notify(error, 'Batch Transformation', { - ...errResp, - ...getCommonMetadata(ctx), - ...getReqMetadata(destEvents), - }); - } - }); - if (response.errors.length > 0) { - ctx.status = 500; - ctx.body = response.errors; - return null; - } - ctx.body = response.batchedRequests; - return ctx.body; -}; -router.post('/batch', (ctx) => { - ctx.set('apiVersion', API_VERSION); - batchHandler(ctx); -}); - -/** - * @deprecated this function is deprecated and will be removed in future release - */ -const fileUpload = async (ctx) => { - const getReqMetadata = () => { - try { - const reqBody = ctx.request.body; - return { destType: reqBody?.destType }; - } catch (error) { - // Do nothing - } - return {}; - }; - - const { destType } = ctx.request.body; - const destFileUploadHandler = getDestFileUploadHandler('v0', destType.toLowerCase()); - - if (!destFileUploadHandler || !destFileUploadHandler.processFileData) { - ctx.status = 404; - ctx.body = `${destType} doesn't support bulk upload`; - return null; - } - let response; - try { - response = await destFileUploadHandler.processFileData(ctx.request.body); - } catch (error) { - response = { - statusCode: error.response ? error.response.status : 400, - error: error.message || PAYLOAD_PROC_ERR_MSG, - metadata: error.response ? error.response.metadata : null, - }; - errNotificationClient.notify(error, 'File Upload', { - ...response, - ...getCommonMetadata(ctx), - ...getReqMetadata(), - }); - } - ctx.body = response; - return ctx.body; -}; - -const jobAndPollStatusReqMetadata = (ctx) => { - try { - const reqBody = ctx.request.body; - return { destType: reqBody?.destType, importId: reqBody?.importId }; - } catch (error) { - // Do nothing - } - return {}; -}; - -/** - * @deprecated this function is deprecated and will be removed in future release - */ -const pollStatus = async (ctx) => { - const { destType } = ctx.request.body; - const destFileUploadHandler = getPollStatusHandler('v0', destType.toLowerCase()); - let response; - if (!destFileUploadHandler || !destFileUploadHandler.processPolling) { - ctx.status = 404; - ctx.body = `${destType} doesn't support bulk upload`; - return null; - } - try { - response = await destFileUploadHandler.processPolling(ctx.request.body); - } catch (error) { - response = { - statusCode: error.response ? error.response.status : 400, - error: error.message || PAYLOAD_PROC_ERR_MSG, - }; - errNotificationClient.notify(error, 'Poll Status', { - ...response, - ...getCommonMetadata(ctx), - ...jobAndPollStatusReqMetadata(ctx), - }); - } - ctx.body = response; - return ctx.body; -}; - -/** - * @deprecated this function is deprecated and will be removed in future release - */ -const getJobStatus = async (ctx, type) => { - const { destType } = ctx.request.body; - const destFileUploadHandler = getJobStatusHandler('v0', destType.toLowerCase()); - - if (!destFileUploadHandler || !destFileUploadHandler.processJobStatus) { - ctx.status = 404; - ctx.body = `${destType} doesn't support bulk upload`; - return null; - } - let response; - try { - response = await destFileUploadHandler.processJobStatus(ctx.request.body, type); - } catch (error) { - response = { - statusCode: error.response ? error.response.status : 400, - error: error.message || PAYLOAD_PROC_ERR_MSG, - }; - errNotificationClient.notify(error, 'Job Status', { - ...response, - ...getCommonMetadata(ctx), - ...jobAndPollStatusReqMetadata(ctx), - }); - } - ctx.body = response; - return ctx.body; -}; - -/** - * @deprecated this function is deprecated and will be removed in future release - */ -const handleDeletionOfUsers = async (ctx) => { - const getReqMetadata = () => { - try { - const reqBody = ctx.request.body; - return { - destType: reqBody[0]?.destType, - jobs: reqBody.map((req) => req.jobId), - }; - } catch (error) { - // Do nothing - } - return {}; - }; - - const getRudderDestInfo = () => { - try { - const rudderDestInfoHeader = ctx.get('x-rudder-dest-info'); - const destInfoHeader = JSON.parse(rudderDestInfoHeader); - if (!Array.isArray(destInfoHeader)) { - return destInfoHeader; - } - } catch (error) { - logger.error(`Error while getting rudderDestInfo header value: ${error}`); - } - return {}; - }; - - const { body } = ctx.request; - const respList = []; - const rudderDestInfo = getRudderDestInfo(); - let response; - await Promise.all( - body.map(async (reqBody) => { - const { destType } = reqBody; - const destUserDeletionHandler = getDeletionUserHandler('v0', destType.toLowerCase()); - if (!destUserDeletionHandler || !destUserDeletionHandler.processDeleteUsers) { - ctx.status = 404; - ctx.body = "Doesn't support deletion of users"; - return null; - } - - try { - response = await destUserDeletionHandler.processDeleteUsers({ - ...reqBody, - rudderDestInfo, - }); - if (response) { - respList.push(response); - } - } catch (error) { - const errObj = generateErrorObject( - error, - { - [tags.TAG_NAMES.DEST_TYPE]: destType.toUpperCase(), - [tags.TAG_NAMES.MODULE]: tags.MODULES.DESTINATION, - [tags.TAG_NAMES.IMPLEMENTATION]: tags.IMPLEMENTATIONS.NATIVE, - [tags.TAG_NAMES.FEATURE]: tags.FEATURES.USER_DELETION, - }, - false, - ); - - // adding the status to the request - ctx.status = errObj.status; - const resp = { - statusCode: errObj.status, - error: errObj.message, - ...(errObj.authErrorCategory && { - authErrorCategory: errObj.authErrorCategory, - }), - }; - - respList.push(resp); - logger.error(`Error Response List: ${JSON.stringify(respList, null, 2)}`); - - errNotificationClient.notify(error, 'User Deletion', { - ...resp, - ...getCommonMetadata(ctx), - ...getReqMetadata(), - }); - } - return undefined; - }), - ); - ctx.body = respList; - return ctx.body; - // const { destType } = ctx.request.body; -}; - -router.post('/fileUpload', async (ctx) => { - await fileUpload(ctx); -}); - -router.post('/pollStatus', async (ctx) => { - await pollStatus(ctx); -}); - -router.post('/getFailedJobs', async (ctx) => { - await getJobStatus(ctx, 'fail'); -}); - -router.post('/getWarningJobs', async (ctx) => { - await getJobStatus(ctx, 'warn'); -}); -// eg. v0/validate. will validate events as per respective tracking plans -router.post(`/v0/validate`, async (ctx) => { - await handleValidation(ctx); -}); - -// Api to handle deletion of users for data regulation -// { -// "destType": "dest name", -// "userAttributes": [ -// { -// "userId": "user_1" -// }, -// { -// "userId": "user_2" -// } -// ], -// "config": { -// "apiKey": "", -// "apiSecret": "" -// } -// } -router.post(`/deleteUsers`, async (ctx) => { - await handleDeletionOfUsers(ctx); -}); - -module.exports = { - router, - handleDest, - routerHandleDest, - batchHandler, - handleProxyRequest, - handleDeletionOfUsers, - fileUpload, - pollStatus, - getJobStatus, - handleV0Destination, - getDestHandler, -}; diff --git a/src/routes/bulkUpload.ts b/src/routes/bulkUpload.ts deleted file mode 100644 index efbd81c34e..0000000000 --- a/src/routes/bulkUpload.ts +++ /dev/null @@ -1,17 +0,0 @@ -import Router from '@koa/router'; -import { - fileUpload, - pollStatus, - getFailedJobStatus, - getWarnJobStatus, -} from '../controllers/bulkUpload'; - -const router = new Router(); - -router.post('/fileUpload', fileUpload); -router.post('/pollStatus', pollStatus); -router.post('/getFailedJobs', getFailedJobStatus); -router.post('/getWarningJobs', getWarnJobStatus); -const bulkUploadRoutes = router.routes(); - -export default bulkUploadRoutes; diff --git a/src/routes/index.ts b/src/routes/index.ts index d77584bea3..890d5752f6 100644 --- a/src/routes/index.ts +++ b/src/routes/index.ts @@ -5,7 +5,6 @@ import dotenv from 'dotenv'; import { koaSwagger } from 'koa2-swagger-ui'; import path from 'path'; import userTransformRoutes from './userTransform'; -import bulkUploadRoutes from './bulkUpload'; import proxyRoutes from './delivery'; import destinationRoutes from './destination'; import miscRoutes from './misc'; @@ -21,7 +20,6 @@ dotenv.config(); const enableSwagger = process.env.ENABLE_SWAGGER === 'true'; export function applicationRoutes(app: Koa) { - app.use(bulkUploadRoutes); app.use(proxyRoutes); app.use(destinationRoutes); app.use(miscRoutes); diff --git a/test/__tests__/legacyRouter.test.ts b/test/__tests__/legacyRouter.test.ts deleted file mode 100644 index 926f6e76d4..0000000000 --- a/test/__tests__/legacyRouter.test.ts +++ /dev/null @@ -1,113 +0,0 @@ -import fs from 'fs'; -import path from 'path'; -import { DestinationController } from '../../src/controllers/destination'; -const destArg = process.argv.filter((x) => x.startsWith('--destName='))[0]; // send arguments on which destination -const typeArg = process.argv.filter((x) => x.startsWith('--type='))[0]; // send argument on which function - -// To invoke CDK live compare: -// router: CDK_LIVE_TEST=1 npx jest versionedRouter --destName=algolia --type=router -// processor: CDK_LIVE_TEST=1 npx jest versionedRouter --destName=algolia --type=processor - -let destination; -if (typeArg) { - destination = destArg ? destArg.split('=')[1] : 'heap'; // default - const type = typeArg.split('=')[1]; - let reqBody; - let respBody; - if (type !== 'all') { - try { - reqBody = JSON.parse( - fs - .readFileSync( - path.resolve(__dirname, `./data/versioned_${type}_${destination}_input.json`), - ) - .toString(), - ); - respBody = JSON.parse( - fs - .readFileSync( - path.resolve(__dirname, `./data/versioned_${type}_${destination}_output.json`), - ) - .toString(), - ); - } catch (error) { - throw new Error('destination/type not valid' + error); - } - } - if (type === 'router') { - it(`Testing: routerHandleDest`, async () => { - const output = await DestinationController.destinationTransformAtRouter(reqBody); - expect(output).toEqual(respBody); - }); - } else if (type === 'processor') { - it(`Testing: handleDest`, async () => { - const output = await DestinationController.destinationTransformAtProcessor(reqBody); - expect(output).toEqual(respBody); - }); - } else if (type === 'batch') { - it(`Testing: batchHandler`, async () => { - const output = await DestinationController.batchProcess(reqBody); - expect(output).toEqual(respBody); - }); - } else if (type === 'all') { - it(`Testing: routerHandleDest`, async () => { - const reqBody = JSON.parse( - fs - .readFileSync( - path.resolve(__dirname, `./data/versioned_router_${destination}_input.json`), - ) - .toString(), - ); - const respBody = JSON.parse( - fs - .readFileSync( - path.resolve(__dirname, `./data/versioned_router_${destination}_output.json`), - ) - .toString(), - ); - const output = await DestinationController.destinationTransformAtRouter(reqBody); - expect(output).toEqual(respBody); - }); - it(`Testing: handleDest`, async () => { - const reqBody = JSON.parse( - fs - .readFileSync( - path.resolve(__dirname, `./data/versioned_processor_${destination}_input.json`), - ) - .toString(), - ); - const respBody = JSON.parse( - fs - .readFileSync( - path.resolve(__dirname, `./data/versioned_processor_${destination}_output.json`), - ) - .toString(), - ); - destination = destination || 'heap'; // default - const output = await DestinationController.destinationTransformAtProcessor(reqBody); - expect(output).toEqual(respBody); - }); - it(`Testing: batchHandler`, async () => { - const reqBody = JSON.parse( - fs - .readFileSync(path.resolve(__dirname, `./data/versioned_batch_braze_input.json`)) - .toString(), - ); - const respBody = JSON.parse( - fs - .readFileSync(path.resolve(__dirname, `./data/versioned_batch_braze_output.json`)) - .toString(), - ); - }); - } else { - it(`Type is not all/router/batch/processor`, () => { - expect('Type is not all/router/batch/processor').toEqual( - 'Type is not all/router/batch/processor', - ); - }); - } -} else { - it(`No type and destination mentioned for testing versionedRouter`, () => { - expect('no command line argument provided').toEqual('no command line argument provided'); - }); -}