Skip to content

Commit

Permalink
refactor proxy service so proxying doesn't require access to db (#1602)
Browse files Browse the repository at this point in the history
* refactor proxy service so proxying doesn't require access to db

We don't want runners to have direct access to the datbase.
The goal of this commit is to remove database dependency when calling
the proxy service, while still executing the http request directly
without going through our own API

* string equality without type conversion

Co-authored-by: Samuel Bodin <[email protected]>

* string equality without type conversion

Co-authored-by: Samuel Bodin <[email protected]>

* fix: await log function

* proxy.service: remove unecessary if(activityLogId)

* refactor proxy.service to remove db access from response handlers

* refactor linear post connection

* remove redundant attributes

---------

Co-authored-by: Samuel Bodin <[email protected]>
  • Loading branch information
TBonnin and bodinsamuel authored Feb 8, 2024
1 parent 958e5b0 commit af00599
Show file tree
Hide file tree
Showing 14 changed files with 579 additions and 661 deletions.
282 changes: 78 additions & 204 deletions packages/server/lib/controllers/proxy.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ import type { OutgoingHttpHeaders } from 'http';
import stream, { Readable, Transform, TransformCallback, PassThrough } from 'stream';
import url, { UrlWithParsedQuery } from 'url';
import querystring from 'querystring';
import axios, { AxiosError, AxiosResponse } from 'axios';
import axios, { AxiosError, AxiosRequestConfig, AxiosResponse } from 'axios';
import { backOff } from 'exponential-backoff';
import { ActivityLogMessage, NangoError } from '@nangohq/shared';
import { updateProvider as updateProviderActivityLog, updateEndpoint as updateEndpointActivityLog } from '@nangohq/shared';

import {
createActivityLog,
createActivityLogMessageAndEnd,
createActivityLogMessage,
updateSuccess as updateSuccessActivityLog,
HTTP_VERB,
LogLevel,
Expand All @@ -21,8 +24,9 @@ import {
InternalProxyConfiguration,
ApplicationConstructedProxyConfiguration,
ErrorSourceEnum,
ServiceResponse,
proxyService
proxyService,
connectionService,
configService
} from '@nangohq/shared';

interface ForwardedHeaders {
Expand All @@ -45,8 +49,8 @@ class ProxyController {
const retries = req.get('Retries') as string;
const baseUrlOverride = req.get('Base-Url-Override') as string;
const decompress = req.get('Decompress') as string;
const isSync = req.get('Nango-Is-Sync') as string;
const isDryRun = req.get('Nango-Is-Dry-Run') as string;
const isSync = (req.get('Nango-Is-Sync') as string) === 'true';
const isDryRun = (req.get('Nango-Is-Dry-Run') as string) === 'true';
const existingActivityLogId = req.get('Nango-Activity-Log-Id') as number | string;
const environment_id = getEnvironmentId(res);
const accountId = getAccount(res);
Expand Down Expand Up @@ -93,34 +97,61 @@ class ProxyController {
method: method.toUpperCase() as HTTP_VERB
};

const {
success: connSuccess,
error: connError,
response: connection
} = await connectionService.getConnectionCredentials(accountId, environment_id, connectionId, providerConfigKey, activityLogId, logAction, false);

if (!connSuccess || !connection) {
throw new Error(`Failed to get connection credentials: '${connError}'`);
}
const providerConfig = await configService.getProviderConfig(providerConfigKey, environment_id);

if (!providerConfig) {
await createActivityLogMessageAndEnd({
level: 'error',
environment_id,
activity_log_id: activityLogId as number,
timestamp: Date.now(),
content: 'Provider configuration not found'
});
throw new NangoError('unknown_provider_config');
}
await updateProviderActivityLog(activityLogId as number, providerConfig.provider);

const internalConfig: InternalProxyConfiguration = {
existingActivityLogId: activityLogId as number,
environmentId: environment_id,
accountId,
throwErrors: false,
isFlow: isSync === 'true',
isDryRun: isDryRun === 'true'
connection,
provider: providerConfig.provider
};

const {
success,
error,
response: configBody
} = (await proxyService.routeOrConfigure(externalConfig, internalConfig)) as ServiceResponse<ApplicationConstructedProxyConfiguration>;

if (!success || !configBody) {
const { success, error, response: proxyConfig, activityLogs } = proxyService.configure(externalConfig, internalConfig);
if (activityLogId) {
await updateEndpointActivityLog(activityLogId, externalConfig.endpoint);
for (const log of activityLogs) {
switch (log.level) {
case 'error':
await createActivityLogMessageAndEnd(log);
break;
default:
await createActivityLogMessage(log);
break;
}
}
}
if (!success || !proxyConfig || error) {
errorManager.errResFromNangoErr(res, error);

return;
}

await this.sendToHttpMethod(res, next, method as HTTP_VERB, configBody, activityLogId as number, environment_id, isSync, isDryRun);
await this.sendToHttpMethod(res, next, method as HTTP_VERB, proxyConfig, activityLogId as number, environment_id, isSync, isDryRun);
} catch (error) {
const environmentId = getEnvironmentId(res);
const connectionId = req.get('Connection-Id') as string;
const providerConfigKey = req.get('Provider-Config-Key') as string;

await errorManager.report(error, {
errorManager.report(error, {
source: ErrorSourceEnum.PLATFORM,
operation: LogActionEnum.PROXY,
environmentId,
Expand Down Expand Up @@ -149,8 +180,8 @@ class ProxyController {
configBody: ApplicationConstructedProxyConfiguration,
activityLogId: number,
environment_id: number,
isSync?: string,
isDryRun?: string
isSync?: boolean,
isDryRun?: boolean
) {
const url = proxyService.constructUrl(configBody);
let decompress = false;
Expand All @@ -159,17 +190,7 @@ class ProxyController {
decompress = true;
}

if (method === 'POST') {
return this.post(res, next, url, configBody, activityLogId, environment_id, decompress, isSync, isDryRun);
} else if (method === 'PATCH') {
return this.patch(res, next, url, configBody, activityLogId, environment_id, decompress, isSync, isDryRun);
} else if (method === 'PUT') {
return this.put(res, next, url, configBody, activityLogId, environment_id, decompress, isSync, isDryRun);
} else if (method === 'DELETE') {
return this.delete(res, next, url, configBody, activityLogId, environment_id, decompress, isSync, isDryRun);
} else {
return this.get(res, next, url, configBody, activityLogId, environment_id, decompress, isSync, isDryRun);
}
return this.request(res, next, method, url, configBody, activityLogId, environment_id, decompress, isSync, isDryRun, configBody.data);
}

private async handleResponse(
Expand All @@ -179,8 +200,8 @@ class ProxyController {
activityLogId: number,
environment_id: number,
url: string,
isSync?: string,
isDryRun?: string
isSync?: boolean,
isDryRun?: boolean
) {
if (!isSync) {
await updateSuccessActivityLog(activityLogId, true);
Expand Down Expand Up @@ -275,200 +296,53 @@ class ProxyController {
* Get
* @param {Response} res Express response object
* @param {NextFuncion} next callback function to pass control to the next middleware function in the pipeline.
* @param {HTTP_VERB} method
* @param {string} url
* @param {ApplicationConstructedProxyConfiguration} config
*/
private async get(
res: Response,
_next: NextFunction,
url: string,
config: ApplicationConstructedProxyConfiguration,
activityLogId: number,
environment_id: number,
decompress: boolean,
isSync?: string,
isDryRun?: string
) {
try {
const headers = proxyService.constructHeaders(config);

const responseStream: AxiosResponse = await backOff(
() => {
return axios({
method: 'get',
url,
responseType: 'stream',
headers,
decompress
});
},
{ numOfAttempts: Number(config.retries), retry: proxyService.retry.bind(this, activityLogId, environment_id, config) }
);

this.handleResponse(res, responseStream, config, activityLogId, environment_id, url, isSync, isDryRun);
} catch (e: unknown) {
this.handleErrorResponse(res, e, url, config, activityLogId, environment_id);
}
}

/**
* Post
* @param {Response} res Express response object
* @param {NextFuncion} next callback function to pass control to the next middleware function in the pipeline.
* @param {string} url
* @param {ApplicationConstructedProxyConfiguration} config
*/
private async post(
res: Response,
_next: NextFunction,
url: string,
config: ApplicationConstructedProxyConfiguration,
activityLogId: number,
environment_id: number,
decompress: boolean,
isSync?: string,
isDryRun?: string
) {
try {
const headers = proxyService.constructHeaders(config);
const responseStream: AxiosResponse = await backOff(
() => {
return axios({
method: 'post',
url,
data: config.data ?? {},
responseType: 'stream',
headers,
decompress
});
},
{ numOfAttempts: Number(config.retries), retry: proxyService.retry.bind(this, activityLogId, environment_id, config) }
);

this.handleResponse(res, responseStream, config, activityLogId, environment_id, url, isSync, isDryRun);
} catch (error) {
this.handleErrorResponse(res, error, url, config, activityLogId, environment_id);
}
}

/**
* Patch
* @param {Response} res Express response object
* @param {NextFuncion} next callback function to pass control to the next middleware function in the pipeline.
* @param {string} url
* @param {ApplicationConstructedProxyConfiguration} config
*/
private async patch(
res: Response,
_next: NextFunction,
url: string,
config: ApplicationConstructedProxyConfiguration,
activityLogId: number,
environment_id: number,
decompress: boolean,
isSync?: string,
isDryRun?: string
) {
try {
const headers = proxyService.constructHeaders(config);
const responseStream: AxiosResponse = await backOff(
() => {
return axios({
method: 'patch',
url,
data: config.data ?? {},
responseType: 'stream',
headers,
decompress
});
},
{ numOfAttempts: Number(config.retries), retry: proxyService.retry.bind(this, activityLogId, environment_id, config) }
);

this.handleResponse(res, responseStream, config, activityLogId, environment_id, url, isSync, isDryRun);
} catch (error) {
this.handleErrorResponse(res, error, url, config, activityLogId, environment_id);
}
}

/**
* Put
* @param {Response} res Express response object
* @param {NextFuncion} next callback function to pass control to the next middleware function in the pipeline.
* @param {string} url
* @param {ApplicationConstructedProxyConfiguration} config
*/
private async put(
private async request(
res: Response,
_next: NextFunction,
method: HTTP_VERB,
url: string,
config: ApplicationConstructedProxyConfiguration,
activityLogId: number,
environment_id: number,
decompress: boolean,
isSync?: string,
isDryRun?: string
isSync?: boolean,
isDryRun?: boolean,
data?: unknown
) {
try {
const activityLogs: ActivityLogMessage[] = [];
const headers = proxyService.constructHeaders(config);
const requestConfig: AxiosRequestConfig = {
method,
url,
responseType: 'stream',
headers,
decompress
};
if (['POST', 'PUT', 'PATCH'].includes(method)) {
requestConfig.data = data || {};
}
const responseStream: AxiosResponse = await backOff(
() => {
return axios({
method: 'put',
url,
data: config.data ?? {},
responseType: 'stream',
headers,
decompress
});
return axios(requestConfig);
},
{ numOfAttempts: Number(config.retries), retry: proxyService.retry.bind(this, activityLogId, environment_id, config) }
{ numOfAttempts: Number(config.retries), retry: proxyService.retry.bind(this, activityLogId, environment_id, config, activityLogs) }
);
activityLogs.forEach((activityLogMessage) => {
createActivityLogMessage(activityLogMessage);
});

this.handleResponse(res, responseStream, config, activityLogId, environment_id, url, isSync, isDryRun);
} catch (error) {
this.handleErrorResponse(res, error, url, config, activityLogId, environment_id);
}
}

/**
* Delete
* @param {Response} res Express response object
* @param {NextFuncion} next callback function to pass control to the next middleware function in the pipeline.
* @param {string} url
* @param {ApplicationConstructedProxyConfiguration} config
*/
private async delete(
res: Response,
_next: NextFunction,
url: string,
config: ApplicationConstructedProxyConfiguration,
activityLogId: number,
environment_id: number,
decompress: boolean,
isSync?: string,
isDryRun?: string
) {
try {
const headers = proxyService.constructHeaders(config);
const responseStream: AxiosResponse = await backOff(
() => {
return axios({
method: 'delete',
url,
responseType: 'stream',
headers,
decompress
});
},
{ numOfAttempts: Number(config.retries), retry: proxyService.retry.bind(this, activityLogId, environment_id, config) }
);
this.handleResponse(res, responseStream, config, activityLogId, environment_id, url, isSync, isDryRun);
} catch (e) {
this.handleErrorResponse(res, e, url, config, activityLogId, environment_id);
}
}

private async reportError(
error: AxiosError,
url: string,
Expand Down
Loading

0 comments on commit af00599

Please sign in to comment.