Skip to content

Commit

Permalink
Refactoring DataFetcher.fetch (#1979)
Browse files Browse the repository at this point in the history
  • Loading branch information
melikhov-dev authored Dec 26, 2024
1 parent 70eb150 commit daa0d81
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ type PromiseWithAbortController = [Promise<unknown>, AbortController];
type DataFetcherOptions = {
chartsEngine: ChartsEngine;
sources: Record<string, Source | string>;
req: Request;
ctx?: AppContext;
/**
* @deprecated will be removed
*/
req?: Request;
ctx: AppContext;
postprocess?:
| ((
data: Record<string, DataFetcherResult>,
Expand All @@ -73,6 +76,17 @@ type DataFetcherOptions = {
userLogin?: string | null;
iamToken?: string | null;
workbookId?: WorkbookId;
isEmbed?: boolean;
zitadelParams?: ZitadelParams | undefined;
originalReqHeaders?: DataFetcherOriginalReqHeaders;
adapterContext?: AdapterContext;
};

export type DataFetcherOriginalReqHeaders = {
xRealIP: IncomingHttpHeaders['x-real-ip'];
xForwardedFor: IncomingHttpHeaders['x-forwarded-for'];
xChartsFetcherVia: IncomingHttpHeaders['x-charts-fetcher-via'];
referer: IncomingHttpHeaders['referer'];
};

type DataFetcherRequestOptions = {
Expand Down Expand Up @@ -184,11 +198,11 @@ export class DataFetcher {
userLogin,
iamToken,
workbookId,
isEmbed = false,
zitadelParams,
originalReqHeaders,
adapterContext,
}: DataFetcherOptions): Promise<Record<string, DataFetcherResult>> {
// TODO: remove aftex extension will be migrated
if (ctx === undefined) {
ctx = req.ctx;
}
const fetchingTimeout = chartsEngine.config.fetchingTimeout || DEFAULT_FETCHING_TIMEOUT;

const fetchingStartTime = Date.now();
Expand All @@ -209,27 +223,34 @@ export class DataFetcher {
const queue = new PQueue({concurrency: CONCURRENT_REQUESTS_LIMIT});
const fetchPromisesList: (() => unknown)[] = [];

const isEmbed = req.headers[DL_EMBED_TOKEN_HEADER] !== undefined;

const zitadelParams = ctx.config.isZitadelEnabled
? {
accessToken: req.user?.accessToken,
serviceUserAccessToken: req.serviceUserAccessToken,
}
: undefined;
// TODO: will be removed after migrations
if (req) {
isEmbed = req.headers[DL_EMBED_TOKEN_HEADER] !== undefined;

zitadelParams = ctx.config.isZitadelEnabled
? {
accessToken: req.user?.accessToken,
serviceUserAccessToken: req.serviceUserAccessToken,
}
: undefined;

originalReqHeaders = {
xRealIP: req.headers['x-real-ip'],
xForwardedFor: req.headers['x-forwarded-for'],
xChartsFetcherVia: req.headers['x-charts-fetcher-via'],
referer: req.headers.referer,
};
adapterContext = {
headers: {
['x-forwarded-for']: req.headers['x-forwarded-for'],
cookie: req.headers.cookie,
},
};
}

const originalReqHeaders = {
xRealIP: req.headers['x-real-ip'],
xForwardedFor: req.headers['x-forwarded-for'],
xChartsFetcherVia: req.headers['x-charts-fetcher-via'],
referer: req.headers.referer,
};
const adapterContext: AdapterContext = {
headers: {
['x-forwarded-for']: req.headers['x-forwarded-for'],
cookie: req.headers.cookie,
},
};
if (!originalReqHeaders || !adapterContext) {
throw new Error('Missing original request headers or adapter context');
}

Object.keys(sources).forEach((sourceName) => {
const source = sources[sourceName];
Expand All @@ -251,8 +272,9 @@ export class DataFetcher {
workbookId,
isEmbed,
zitadelParams,
originalReqHeaders,
adapterContext,
originalReqHeaders:
originalReqHeaders as DataFetcherOriginalReqHeaders,
adapterContext: adapterContext as AdapterContext,
})
: {
sourceId: sourceName,
Expand Down Expand Up @@ -297,12 +319,12 @@ export class DataFetcher {

failed[result.sourceId] = filterObjectWhitelist(
entry,
chartsEngine.config.runResponseWhitelist,
ctx.config.runResponseWhitelist,
);
} else {
fetched[result.sourceId] = filterObjectWhitelist(
result,
chartsEngine.config.runResponseWhitelist,
ctx.config.runResponseWhitelist,
) as DataFetcherResult;
}
});
Expand Down Expand Up @@ -486,16 +508,11 @@ export class DataFetcher {
workbookId?: WorkbookId;
isEmbed: boolean;
zitadelParams: ZitadelParams | undefined;
originalReqHeaders: {
xRealIP: IncomingHttpHeaders['x-real-ip'];
xForwardedFor: IncomingHttpHeaders['x-forwarded-for'];
xChartsFetcherVia: IncomingHttpHeaders['x-charts-fetcher-via'];
referer: IncomingHttpHeaders['referer'];
};
originalReqHeaders: DataFetcherOriginalReqHeaders;
adapterContext: AdapterContext;
}) {
const singleFetchingTimeout =
chartsEngine.config.singleFetchingTimeout || DEFAULT_SINGLE_FETCHING_TIMEOUT;
ctx.config.singleFetchingTimeout || DEFAULT_SINGLE_FETCHING_TIMEOUT;

const onDataFetched = chartsEngine.telemetryCallbacks.onDataFetched || (() => {});
const onDataFetchingFailed =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type {AppContext} from '@gravity-ui/nodekit';
import {flow} from 'lodash';

import type {ChartsEngine} from '../../index';
import type {HooksContext} from '../../types';

export class HookError extends Error {
hookError: {
Expand Down Expand Up @@ -56,11 +57,16 @@ export class ProcessorHooks {
config,
isEditMode,
ctx,
hooksContext,
}: {
/**
* @deprecated will be removed
*/
req: Request;
config: Record<string, any>;
isEditMode: boolean;
ctx: AppContext;
hooksContext: HooksContext;
}) {
let hrStart;

Expand All @@ -75,6 +81,7 @@ export class ProcessorHooks {
config,
isEditMode,
ctx,
hooksContext,
});
ctx.log(`Hook ${hookName} process`);
await hook.process();
Expand Down
23 changes: 20 additions & 3 deletions src/server/components/charts-engine/components/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import {DL_CONTEXT_HEADER, Feature, isEnabledServerFeature} from '../../../../..
import {renderHTML} from '../../../../../shared/modules/markdown/markdown';
import {registry} from '../../../../registry';
import {config as configConstants} from '../../constants';
import type {Source} from '../../types';
import type {AdapterContext, HooksContext, Source} from '../../types';
import * as Storage from '../storage';
import type {ResolvedConfig} from '../storage/types';
import {getDuration, normalizeParams, resolveParams} from '../utils';

import type {CommentsFetcherPrepareCommentsParams} from './comments-fetcher';
import {CommentsFetcher} from './comments-fetcher';
import type {LogItem} from './console';
import type {DataFetcherResult} from './data-fetcher';
import type {DataFetcherOriginalReqHeaders, DataFetcherResult, ZitadelParams} from './data-fetcher';
import {DataFetcher} from './data-fetcher';
import {ProcessorHooks} from './hooks';
import {updateActionParams, updateParams} from './paramsUtils';
Expand Down Expand Up @@ -152,6 +152,9 @@ export type ProcessorParams = {
userLang: string | null;
userId: string | null;
iamToken: string | null;
/**
* @deprecated will be removed
*/
req: Request;
responseOptions?: Record<string, string | boolean>;
uiOnly?: boolean;
Expand All @@ -165,6 +168,11 @@ export type ProcessorParams = {
disableJSONFnByCookie: boolean;
configName: string;
configId: string;
isEmbed: boolean;
zitadelParams: ZitadelParams | undefined;
originalReqHeaders: DataFetcherOriginalReqHeaders;
adapterContext: AdapterContext;
hooksContext: HooksContext;
};

export class Processor {
Expand Down Expand Up @@ -192,6 +200,11 @@ export class Processor {
disableJSONFnByCookie,
configName,
configId,
isEmbed,
zitadelParams,
originalReqHeaders,
adapterContext,
hooksContext,
}: ProcessorParams): Promise<
ProcessorSuccessResponse | ProcessorErrorResponse | {error: string}
> {
Expand Down Expand Up @@ -324,6 +337,7 @@ export class Processor {
},
isEditMode,
ctx,
hooksContext,
});

if (resultHooksInit.status === ProcessorHooks.STATUS.FAILED) {
Expand Down Expand Up @@ -556,13 +570,16 @@ export class Processor {
resolvedSources = await DataFetcher.fetch({
chartsEngine,
sources,
req,
ctx,
iamToken,
subrequestHeaders,
userId,
userLogin,
workbookId,
isEmbed,
zitadelParams,
originalReqHeaders,
adapterContext,
});

if (Object.keys(resolvedSources).length) {
Expand Down
35 changes: 35 additions & 0 deletions src/server/components/charts-engine/runners/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {ControlType, EntryPublicAuthor, WorkbookId} from '../../../../share
import {
DISABLE,
DISABLE_JSONFN_SWITCH_MODE_COOKIE_NAME,
DL_EMBED_TOKEN_HEADER,
Feature,
isEnabledServerFeature,
} from '../../../../shared';
Expand Down Expand Up @@ -193,6 +194,35 @@ export function commonRunner({
const configId = req.body.id;
const disableJSONFnByCookie = req.cookies[DISABLE_JSONFN_SWITCH_MODE_COOKIE_NAME] === DISABLE;

const isEmbed = req.headers[DL_EMBED_TOKEN_HEADER] !== undefined;

const zitadelParams = ctx.config.isZitadelEnabled
? {
accessToken: req.user?.accessToken,
serviceUserAccessToken: req.serviceUserAccessToken,
}
: undefined;

const originalReqHeaders = {
xRealIP: req.headers['x-real-ip'],
xForwardedFor: req.headers['x-forwarded-for'],
xChartsFetcherVia: req.headers['x-charts-fetcher-via'],
referer: req.headers.referer,
};
const adapterContext = {
headers: {
['x-forwarded-for']: req.headers['x-forwarded-for'],
cookie: req.headers.cookie,
},
};

const hooksContext = {
headers: {
cookie: req.headers.cookie,
authorization: req.headers.authorization,
},
};

const processorParams: Omit<ProcessorParams, 'ctx'> = {
chartsEngine,
paramsOverride: params,
Expand All @@ -212,6 +242,11 @@ export function commonRunner({
configName,
configId,
disableJSONFnByCookie,
isEmbed,
zitadelParams,
originalReqHeaders,
adapterContext,
hooksContext,
};

if (req.body.unreleased === 1) {
Expand Down
7 changes: 7 additions & 0 deletions src/server/components/charts-engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ export type AdapterContext = {
};
};

export type HooksContext = {
headers: {
cookie: IncomingHttpHeaders['cookie'];
authorization: IncomingHttpHeaders['authorization'];
};
};

export type SourceConfig = {
description?: {
title: {
Expand Down

0 comments on commit daa0d81

Please sign in to comment.