Skip to content

Commit

Permalink
Merge pull request #1408 from grid-js/fix-multiple-reqs
Browse files Browse the repository at this point in the history
Fix the multiple server-side requests bug
  • Loading branch information
afshinm authored Jan 9, 2024
2 parents 6275e01 + da64c76 commit 4219ea6
Show file tree
Hide file tree
Showing 15 changed files with 259 additions and 113 deletions.
3 changes: 3 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export interface Config {
/** to parse a HTML table and load the data */
from: HTMLElement;
storage: Storage<any>;

Check warning on line 38 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type

Check warning on line 38 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type

Check warning on line 38 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 38 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type
/** Pipeline process throttle timeout in milliseconds */
processingThrottleMs: number;
pipeline: Pipeline<Tabular>;
/** to automatically calculate the columns width */
autoWidth: boolean;
Expand Down Expand Up @@ -128,6 +130,7 @@ export class Config {
tableRef: createRef(),
width: '100%',
height: 'auto',
processingThrottleMs: 100,
autoWidth: true,
style: {},
className: {},
Expand Down
2 changes: 1 addition & 1 deletion src/hooks/useSelector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export default function useSelector<T>(selector: (state) => T) {
});

return unsubscribe;
}, []);
}, [store, current]);

return current;
}
2 changes: 1 addition & 1 deletion src/pipeline/extractor/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface StorageExtractorProps extends PipelineProcessorProps {
}

class StorageExtractor extends PipelineProcessor<
Promise<StorageResponse>,
StorageResponse,
StorageExtractorProps
> {
get type(): ProcessorType {
Expand Down
77 changes: 54 additions & 23 deletions src/pipeline/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { ID } from '../util/id';
import log from '../util/log';
import { EventEmitter } from '../util/eventEmitter';

interface PipelineEvents<T> {
interface PipelineEvents<R> {
/**
* Generic updated event. Triggers the callback function when the pipeline
* is updated, including when a new processor is registered, a processor's props
* get updated, etc.
*/
updated: (processor: PipelineProcessor<any, any>) => void;
updated: <T, P>(processor: PipelineProcessor<T, P>) => void;
/**
* Triggers the callback function when a new
* processor is registered successfully
Expand All @@ -27,27 +27,29 @@ interface PipelineEvents<T> {
* afterProcess will not be called if there is an
* error in the pipeline (i.e a step throw an Error)
*/
afterProcess: (prev: T) => void;
afterProcess: (prev: R) => void;
/**
* Triggers the callback function when the pipeline
* fails to process all steps or at least one step
* throws an Error
*/
error: (prev: T) => void;
error: <T>(prev: T) => void;
}

class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
class Pipeline<R> extends EventEmitter<PipelineEvents<R>> {
// available steps for this pipeline
private readonly _steps: Map<ProcessorType, PipelineProcessor<T, P>[]> =
new Map<ProcessorType, PipelineProcessor<T, P>[]>();
private readonly _steps: Map<
ProcessorType,
PipelineProcessor<unknown, unknown>[]
> = new Map<ProcessorType, PipelineProcessor<unknown, unknown>[]>();
// used to cache the results of processors using their id field
private cache: Map<string, any> = new Map<string, any>();
private cache: Map<string, unknown> = new Map<string, unknown>();
// keeps the index of the last updated processor in the registered
// processors list and will be used to invalidate the cache
// -1 means all new processors should be processed
private lastProcessorIndexUpdated = -1;

constructor(steps?: PipelineProcessor<any, any>[]) {
constructor(steps?: PipelineProcessor<unknown, unknown>[]) {
super();

if (steps) {
Expand All @@ -59,7 +61,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* Clears the `cache` array
*/
clearCache(): void {
this.cache = new Map<string, any>();
this.cache = new Map<string, object>();
this.lastProcessorIndexUpdated = -1;
}

Expand All @@ -69,30 +71,57 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* @param processor
* @param priority
*/
register(
processor: PipelineProcessor<any, any>,
register<T, P>(
processor: PipelineProcessor<T, P>,
priority: number = null,
): void {
if (!processor) return;
): PipelineProcessor<T, P> {
if (!processor) {
throw Error('Processor is not defined');
}

if (processor.type === null) {
throw Error('Processor type is not defined');
}

if (this.findProcessorIndexByID(processor.id) > -1) {
throw Error(`Processor ID ${processor.id} is already defined`);
}

// binding the propsUpdated callback to the Pipeline
processor.on('propsUpdated', this.processorPropsUpdated.bind(this));

this.addProcessorByPriority(processor, priority);
this.afterRegistered(processor);

return processor;
}

/**
* Tries to register a new processor
* @param processor
* @param priority
*/
tryRegister<T, P>(
processor: PipelineProcessor<T, P>,
priority: number = null,
): PipelineProcessor<T, P> | undefined {
try {
return this.register(processor, priority);
} catch (_) {
// noop
}

return undefined;
}

/**
* Removes a processor from the list
*
* @param processor
*/
unregister(processor: PipelineProcessor<any, any>): void {
unregister<T, P>(processor: PipelineProcessor<T, P>): void {
if (!processor) return;
if (this.findProcessorIndexByID(processor.id) === -1) return;

const subSteps = this._steps.get(processor.type);

Expand All @@ -111,7 +140,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* @param processor
* @param priority
*/
private addProcessorByPriority(
private addProcessorByPriority<T, P>(
processor: PipelineProcessor<T, P>,
priority: number,
): void {
Expand Down Expand Up @@ -142,8 +171,8 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
/**
* Flattens the _steps Map and returns a list of steps with their correct priorities
*/
get steps(): PipelineProcessor<T, P>[] {
let steps: PipelineProcessor<T, P>[] = [];
get steps(): PipelineProcessor<unknown, unknown>[] {
let steps: PipelineProcessor<unknown, unknown>[] = [];

for (const type of this.getSortedProcessorTypes()) {
const subSteps = this._steps.get(type);
Expand All @@ -163,7 +192,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
*
* @param type
*/
getStepsByType(type: ProcessorType): PipelineProcessor<T, P>[] {
getStepsByType(type: ProcessorType): PipelineProcessor<unknown, unknown>[] {
return this.steps.filter((process) => process.type === type);
}

Expand All @@ -182,7 +211,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
*
* @param data
*/
async process(data?: T): Promise<T> {
async process(data?: R): Promise<R> {
const lastProcessorIndexUpdated = this.lastProcessorIndexUpdated;
const steps = this.steps;

Expand All @@ -197,11 +226,11 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
// updated processor was before "processor".
// This is to ensure that we always have correct and up to date
// data from processors and also to skip them when necessary
prev = await processor.process(prev);
prev = (await processor.process(prev)) as R;
this.cache.set(processor.id, prev);
} else {
// cached results already exist
prev = this.cache.get(processor.id);
prev = this.cache.get(processor.id) as R;
}
}
} catch (e) {
Expand Down Expand Up @@ -236,7 +265,9 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* This is used to invalid or skip a processor in
* the process() method
*/
private setLastProcessorIndex(processor: PipelineProcessor<T, P>): void {
private setLastProcessorIndex<T, P>(
processor: PipelineProcessor<T, P>,
): void {
const processorIndex = this.findProcessorIndexByID(processor.id);

if (this.lastProcessorIndexUpdated > processorIndex) {
Expand Down
21 changes: 15 additions & 6 deletions src/pipeline/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// e.g. Extractor = 0 will be processed before Transformer = 1
import { generateUUID, ID } from '../util/id';
import { EventEmitter } from '../util/eventEmitter';
import { deepEqual } from '../util/deepEqual';

export enum ProcessorType {
Initiator,
Expand All @@ -15,8 +16,8 @@ export enum ProcessorType {
Limit,
}

interface PipelineProcessorEvents<T, P> {
propsUpdated: (processor: PipelineProcessor<T, P>) => void;
interface PipelineProcessorEvents {
propsUpdated: <T, P>(processor: PipelineProcessor<T, P>) => void;
beforeProcess: (...args) => void;
afterProcess: (...args) => void;
}
Expand All @@ -27,9 +28,9 @@ export interface PipelineProcessorProps {}
export abstract class PipelineProcessor<
T,
P extends Partial<PipelineProcessorProps>,
> extends EventEmitter<PipelineProcessorEvents<T, P>> {
> extends EventEmitter<PipelineProcessorEvents> {
public readonly id: ID;
private readonly _props: P;
private _props: P;

abstract get type(): ProcessorType;
protected abstract _process(...args): T | Promise<T>;
Expand Down Expand Up @@ -62,8 +63,16 @@ export abstract class PipelineProcessor<
}

setProps(props: Partial<P>): this {
Object.assign(this._props, props);
this.emit('propsUpdated', this);
const updatedProps = {
...this._props,
...props,
};

if (!deepEqual(updatedProps, this._props)) {
this._props = updatedProps;
this.emit('propsUpdated', this);
}

return this;
}

Expand Down
9 changes: 9 additions & 0 deletions src/util/deepEqual.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* Returns true if both objects are equal
* @param a left object
* @param b right object
* @returns
*/
export function deepEqual<A, B>(a: A, b: B) {
return JSON.stringify(a) === JSON.stringify(b);
}
42 changes: 28 additions & 14 deletions src/util/throttle.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
/**
* Throttle a given function
* @param fn Function to be called
* @param wait Throttle timeout in milliseconds
* @returns Throttled function
*/
export const throttle = (fn: (...args) => void, wait = 100) => {
let inThrottle: boolean;
let lastFn: ReturnType<typeof setTimeout>;
let lastTime: number;
let timeoutId: ReturnType<typeof setTimeout>;
let lastTime = Date.now();

const execute = (...args) => {
lastTime = Date.now();
fn(...args);
};

return (...args) => {
if (!inThrottle) {
fn(...args);
lastTime = Date.now();
inThrottle = true;
const currentTime = Date.now();
const elapsed = currentTime - lastTime;

if (elapsed >= wait) {
// If enough time has passed since the last call, execute the function immediately
execute(args);
} else {
clearTimeout(lastFn);
lastFn = setTimeout(() => {
if (Date.now() - lastTime >= wait) {
fn(...args);
lastTime = Date.now();
}
}, Math.max(wait - (Date.now() - lastTime), 0));
// If not enough time has passed, schedule the function call after the remaining delay
if (timeoutId) {
clearTimeout(timeoutId);
}

timeoutId = setTimeout(() => {
execute(args);
timeoutId = null;
}, wait - elapsed);
}
};
};
39 changes: 20 additions & 19 deletions src/view/container.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import log from '../util/log';
import { useEffect } from 'preact/hooks';
import * as actions from './actions';
import { useStore } from '../hooks/useStore';
import useSelector from '../../src/hooks/useSelector';
import { useConfig } from '../../src/hooks/useConfig';
import useSelector from '../hooks/useSelector';
import { useConfig } from '../hooks/useConfig';
import { throttle } from '../util/throttle';

export function Container() {
const config = useConfig();
Expand All @@ -19,6 +20,23 @@ export function Container() {
const tableRef = useSelector((state) => state.tableRef);
const tempRef = createRef();

const processPipeline = throttle(async () => {
dispatch(actions.SetLoadingData());

try {
const data = await config.pipeline.process();
dispatch(actions.SetData(data));

// TODO: do we need this setTimemout?
setTimeout(() => {
dispatch(actions.SetStatusToRendered());
}, 0);
} catch (e) {
log.error(e);
dispatch(actions.SetDataErrored());
}
}, config.processingThrottleMs);

useEffect(() => {
// set the initial header object
// we update the header width later when "data"
Expand All @@ -41,23 +59,6 @@ export function Container() {
}
}, [data, config, tempRef]);

const processPipeline = async () => {
dispatch(actions.SetLoadingData());

try {
const data = await config.pipeline.process();
dispatch(actions.SetData(data));

// TODO: do we need this setTimemout?
setTimeout(() => {
dispatch(actions.SetStatusToRendered());
}, 0);
} catch (e) {
log.error(e);
dispatch(actions.SetDataErrored());
}
};

return (
<div
role="complementary"
Expand Down
Loading

0 comments on commit 4219ea6

Please sign in to comment.