Skip to content

Commit

Permalink
feat: correlation feature (#394)
Browse files Browse the repository at this point in the history
  • Loading branch information
praveen5959 authored Jan 13, 2025
1 parent e204794 commit 5728b53
Show file tree
Hide file tree
Showing 26 changed files with 3,455 additions and 6 deletions.
23 changes: 22 additions & 1 deletion src/api/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Axios } from './axios';
import { LOG_QUERY_URL } from './constants';
import { Log, LogsQuery, LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import timeRangeUtils from '@/utils/timeRangeUtils';
import { QueryBuilder } from '@/utils/queryBuilder';
import { CorrelationQueryBuilder, QueryBuilder } from '@/utils/queryBuilder';

const { formatDateAsCastType } = timeRangeUtils;
type QueryLogs = {
Expand All @@ -13,6 +13,15 @@ type QueryLogs = {
pageOffset: number;
};

type CorrelationLogs = {
streamNames: string[];
startTime: Date;
endTime: Date;
limit: number;
correlationCondition?: string;
selectedFields?: string[];
};

// to optimize query performace, it has been decided to round off the time at the given level
const optimizeTime = (date: Date) => {
const tempDate = new Date(date);
Expand Down Expand Up @@ -53,6 +62,18 @@ export const getQueryLogsWithHeaders = (logsQuery: QueryLogs) => {
return Axios().post<LogsResponseWithHeaders>(endPoint, formQueryOpts(logsQuery), {});
};

export const getCorrelationQueryLogsWithHeaders = (logsQuery: CorrelationLogs) => {
const queryBuilder = new CorrelationQueryBuilder(logsQuery);
const endPoint = LOG_QUERY_URL({ fields: true }, queryBuilder.getResourcePath());
return Axios().post<LogsResponseWithHeaders>(endPoint, queryBuilder.getCorrelationQuery(), {});
};

export const getStreamDataWithHeaders = (logsQuery: CorrelationLogs) => {
const queryBuilder = new CorrelationQueryBuilder(logsQuery);
const endPoint = LOG_QUERY_URL({ fields: true }, queryBuilder.getResourcePath());
return Axios().post<LogsResponseWithHeaders>(endPoint, queryBuilder.getQuery(), {});
};

// ------ Custom sql query

const makeCustomQueryRequestData = (logsQuery: LogsQuery, query: string) => {
Expand Down
21 changes: 21 additions & 0 deletions src/assets/images/correlation_placeholder.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
28 changes: 28 additions & 0 deletions src/components/Navbar/components/CorrelationIcon.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { forwardRef } from 'react';

export const CorrelationIcon = forwardRef<
SVGSVGElement,
{
stroke?: string;
strokeWidth?: number;
}
>(({ stroke, strokeWidth }, ref) => (
<svg ref={ref} height="1.2rem" width="1.2rem" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
<path
d="M13.3333 17.3333L14.6667 18.6667C15.0203 19.0203 15.4999 19.219 16 19.219C16.5001 19.219 16.9797 19.0203 17.3333 18.6667L22.6667 13.3333C23.0203 12.9797 23.219 12.5001 23.219 12C23.219 11.4999 23.0203 11.0203 22.6667 10.6667L17.3333 5.33333C16.9797 4.97971 16.5001 4.78105 16 4.78105C15.4999 4.78105 15.0203 4.97971 14.6667 5.33333L9.33333 10.6667C8.97971 11.0203 8.78105 11.4999 8.78105 12C8.78105 12.5001 8.97971 12.9797 9.33333 13.3333L10.6667 14.6667"
stroke={stroke}
strokeWidth={strokeWidth}
strokeLinecap="round"
strokeLinejoin="round"
/>
<path
d="M10.6667 6.66667L9.33333 5.33333C8.97971 4.97971 8.5001 4.78105 8 4.78105C7.4999 4.78105 7.02029 4.97971 6.66667 5.33333L1.33333 10.6667C0.979711 11.0203 0.781049 11.4999 0.781049 12C0.781049 12.5001 0.979711 12.9797 1.33333 13.3333L6.66667 18.6667C7.02029 19.0203 7.4999 19.219 8 19.219C8.5001 19.219 8.97971 19.0203 9.33333 18.6667L14.6667 13.3333C15.0203 12.9797 15.219 12.5001 15.219 12C15.219 11.4999 15.0203 11.0203 14.6667 10.6667L13.3333 9.33333"
stroke={stroke}
strokeWidth={strokeWidth}
strokeLinecap="round"
strokeLinejoin="round"
/>
</svg>
));

CorrelationIcon.displayName = 'CorrelationIcon';
25 changes: 23 additions & 2 deletions src/components/Navbar/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ import { FC, useCallback, useEffect } from 'react';
import { useLocation, useParams } from 'react-router-dom';
import { useNavigate } from 'react-router-dom';
import { useDisclosure } from '@mantine/hooks';
import { HOME_ROUTE, CLUSTER_ROUTE, USERS_MANAGEMENT_ROUTE, STREAM_ROUTE, DASHBOARDS_ROUTE } from '@/constants/routes';
import {
HOME_ROUTE,
CLUSTER_ROUTE,
USERS_MANAGEMENT_ROUTE,
STREAM_ROUTE,
DASHBOARDS_ROUTE,
CORRELATION_ROUTE,
} from '@/constants/routes';
import InfoModal from './infoModal';
import { getStreamsSepcificAccess, getUserSepcificStreams } from './rolesHandler';
import Cookies from 'js-cookie';
Expand All @@ -26,6 +33,7 @@ import UserModal from './UserModal';
import { signOutHandler } from '@/utils';
import { appStoreReducers, useAppStore } from '@/layouts/MainLayout/providers/AppProvider';
import _ from 'lodash';
import { CorrelationIcon } from './components/CorrelationIcon';

const { setUserRoles, setUserSpecificStreams, setUserAccessMap, changeStream, setStreamSpecificUserAccess } =
appStoreReducers;
Expand All @@ -49,6 +57,12 @@ const navItems = [
path: '/explore',
route: STREAM_ROUTE,
},
{
icon: CorrelationIcon,
label: 'Correlation',
path: '/correlation',
route: CORRELATION_ROUTE,
},
];

const previlagedActions = [
Expand Down Expand Up @@ -167,7 +181,14 @@ const Navbar: FC = () => {
onClick={() => navigateToPage(navItem.route)}
key={index}>
<Tooltip label={navItem.label} position="right">
<navItem.icon stroke={isActiveItem ? 1.4 : 1.2} size={'1.2rem'} />
{navItem.label === 'Correlation' ? (
<navItem.icon
stroke={isActiveItem ? '#000000' : '#858e96'}
strokeWidth={isActiveItem ? 1.4 : 1.2}
/>
) : (
<navItem.icon stroke={isActiveItem ? '1.4' : '1.2'} size="1.2rem" />
)}
</Tooltip>
</Stack>
);
Expand Down
2 changes: 2 additions & 0 deletions src/constants/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const OIDC_NOT_CONFIGURED_ROUTE = '/oidc-not-configured';
export const CLUSTER_ROUTE = '/cluster';
export const STREAM_ROUTE = '/:streamName/:view?';
export const DASHBOARDS_ROUTE = '/dashboards';
export const CORRELATION_ROUTE = '/correlation';

export const STREAM_VIEWS = ['explore', 'manage', 'live-tail'];

Expand All @@ -27,4 +28,5 @@ export const PATHS = {
cluster: '/cluster',
manage: '/:streamName/:view?',
dashboards: '/dashboards',
correlation: '/correlation',
} as { [key: string]: string };
85 changes: 85 additions & 0 deletions src/hooks/useCorrelationQueryLogs.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { getCorrelationQueryLogsWithHeaders } from '@/api/query';
import { StatusCodes } from 'http-status-codes';
import useMountedState from './useMountedState';
import { useAppStore } from '@/layouts/MainLayout/providers/AppProvider';
import _ from 'lodash';
import { AxiosError } from 'axios';
import { useStreamStore } from '@/pages/Stream/providers/StreamProvider';
import {
CORRELATION_LOAD_LIMIT,
correlationStoreReducers,
useCorrelationStore,
} from '@/pages/Correlation/providers/CorrelationProvider';
import { notifyError } from '@/utils/notification';
import { useQuery } from 'react-query';
import { LogsResponseWithHeaders } from '@/@types/parseable/api/query';

const { setStreamData } = correlationStoreReducers;

export const useCorrelationQueryLogs = () => {
const [error, setError] = useMountedState<string | null>(null);
const [{ selectedFields, correlationCondition, fields }, setCorrelationStore] = useCorrelationStore((store) => store);
const [streamInfo] = useStreamStore((store) => store.info);
const [currentStream] = useAppStore((store) => store.currentStream);
const timePartitionColumn = _.get(streamInfo, 'time_partition', 'p_timestamp');
const [timeRange] = useAppStore((store) => store.timeRange);
const [
{
tableOpts: { currentOffset },
},
] = useCorrelationStore((store) => store);
const streamNames = Object.keys(fields);

const defaultQueryOpts = {
startTime: timeRange.startTime,
endTime: timeRange.endTime,
limit: CORRELATION_LOAD_LIMIT,
pageOffset: currentOffset,
timePartitionColumn,
selectedFields: _.flatMap(selectedFields, (values, key) => _.map(values, (value) => `${key}.${value}`)) || [],
correlationCondition: correlationCondition,
};

const {
isLoading: logsLoading,
isRefetching: logsRefetching,
refetch: getCorrelationData,
} = useQuery(
['fetch-logs', defaultQueryOpts],
async () => {
const queryOpts = { ...defaultQueryOpts, streamNames };
const response = await getCorrelationQueryLogsWithHeaders(queryOpts);
return [response];
},
{
enabled: false,
refetchOnWindowFocus: false,
onSuccess: async (responses) => {
responses.map((data: { data: LogsResponseWithHeaders; status: StatusCodes }) => {
const logs = data.data;
const isInvalidResponse = _.isEmpty(logs) || _.isNil(logs) || data.status !== StatusCodes.OK;
if (isInvalidResponse) return setError('Failed to query logs');

const { records, fields } = logs;
if (fields.length > 0 && !correlationCondition) {
return setCorrelationStore((store) => setStreamData(store, currentStream || '', records));
} else if (fields.length > 0 && correlationCondition) {
return setCorrelationStore((store) => setStreamData(store, 'correlatedStream', records));
} else {
notifyError({ message: `${currentStream} doesn't have any fields` });
}
});
},
onError: (data: AxiosError) => {
const errorMessage = data.response?.data as string;
setError(_.isString(errorMessage) && !_.isEmpty(errorMessage) ? errorMessage : 'Failed to query logs');
},
},
);

return {
error,
loading: logsLoading || logsRefetching,
getCorrelationData,
};
};
104 changes: 104 additions & 0 deletions src/hooks/useFetchStreamData.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { getStreamDataWithHeaders } from '@/api/query';
import { StatusCodes } from 'http-status-codes';
import useMountedState from './useMountedState';
import { useAppStore } from '@/layouts/MainLayout/providers/AppProvider';
import _ from 'lodash';
import { AxiosError } from 'axios';
import { useStreamStore } from '@/pages/Stream/providers/StreamProvider';
import {
correlationStoreReducers,
CORRELATION_LOAD_LIMIT,
useCorrelationStore,
} from '@/pages/Correlation/providers/CorrelationProvider';
import { notifyError } from '@/utils/notification';
import { useQuery } from 'react-query';
import { LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import { useRef, useEffect } from 'react';

const { setStreamData } = correlationStoreReducers;

export const useFetchStreamData = () => {
const [error, setError] = useMountedState<string | null>(null);
const [{ selectedFields, correlationCondition, fields, streamData }, setCorrelationStore] = useCorrelationStore(
(store) => store,
);
const [streamInfo] = useStreamStore((store) => store.info);
const [currentStream] = useAppStore((store) => store.currentStream);
const timePartitionColumn = _.get(streamInfo, 'time_partition', 'p_timestamp');
const [timeRange] = useAppStore((store) => store.timeRange);
const [
{
tableOpts: { currentOffset },
},
] = useCorrelationStore((store) => store);
const streamNames = Object.keys(fields);

const prevTimeRangeRef = useRef({ startTime: timeRange.startTime, endTime: timeRange.endTime });

const hasTimeRangeChanged =
prevTimeRangeRef.current.startTime !== timeRange.startTime ||
prevTimeRangeRef.current.endTime !== timeRange.endTime;

useEffect(() => {
prevTimeRangeRef.current = { startTime: timeRange.startTime, endTime: timeRange.endTime };
}, [timeRange.startTime, timeRange.endTime]);

const defaultQueryOpts = {
startTime: timeRange.startTime,
endTime: timeRange.endTime,
limit: CORRELATION_LOAD_LIMIT,
pageOffset: currentOffset,
timePartitionColumn,
selectedFields: _.flatMap(selectedFields, (values, key) => _.map(values, (value) => `${key}.${value}`)) || [],
correlationCondition: correlationCondition,
};

const {
isLoading: logsLoading,
isRefetching: logsRefetching,
refetch: getFetchStreamData,
} = useQuery(
['fetch-logs', defaultQueryOpts],
async () => {
const streamsToFetch = hasTimeRangeChanged
? streamNames
: streamNames.filter((streamName) => !Object.keys(streamData).includes(streamName));

const fetchPromises = streamsToFetch.map((streamName) => {
const queryOpts = { ...defaultQueryOpts, streamNames: [streamName] };
return getStreamDataWithHeaders(queryOpts);
});
return Promise.all(fetchPromises);
},
{
enabled: false,
refetchOnWindowFocus: false,
onSuccess: async (responses) => {
responses.map((data: { data: LogsResponseWithHeaders; status: StatusCodes }) => {
const logs = data.data;
const isInvalidResponse = _.isEmpty(logs) || _.isNil(logs) || data.status !== StatusCodes.OK;
if (isInvalidResponse) return setError('Failed to query logs');

const { records, fields } = logs;
if (fields.length > 0 && !correlationCondition) {
return setCorrelationStore((store) => setStreamData(store, currentStream || '', records));
} else if (fields.length > 0 && correlationCondition) {
return setCorrelationStore((store) => setStreamData(store, 'correlatedStream', records));
} else {
notifyError({ message: `${currentStream} doesn't have any fields` });
}
});
},
onError: (data: AxiosError) => {
const errorMessage = data.response?.data as string;
setError(_.isString(errorMessage) && !_.isEmpty(errorMessage) ? errorMessage : 'Failed to query logs');
},
},
);

return {
error,
loading: logsLoading || logsRefetching,
getFetchStreamData,
};
};
45 changes: 45 additions & 0 deletions src/hooks/useGetCorrelationStreamSchema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { getLogStreamSchema } from '@/api/logStream';
import { AxiosError, isAxiosError } from 'axios';
import _ from 'lodash';
import { useQuery } from 'react-query';
import { useState } from 'react';
import { correlationStoreReducers, useCorrelationStore } from '@/pages/Correlation/providers/CorrelationProvider';

const { setStreamSchema } = correlationStoreReducers;

export const useGetStreamSchema = (opts: { streamName: string }) => {
const { streamName } = opts;
const [, setCorrelationStore] = useCorrelationStore((_store) => null);

const [errorMessage, setErrorMesssage] = useState<string | null>(null);

const { isError, isSuccess, isLoading, isRefetching } = useQuery(
['stream-schema', streamName],
() => getLogStreamSchema(streamName),
{
retry: false,
enabled: streamName !== '' && streamName !== 'correlatedStream',
refetchOnWindowFocus: false,
onSuccess: (data) => {
setErrorMesssage(null);
setCorrelationStore((store) => setStreamSchema(store, data.data, streamName));
},
onError: (data: AxiosError) => {
if (isAxiosError(data) && data.response) {
const error = data.response?.data as string;
typeof error === 'string' && setErrorMesssage(error);
} else if (data.message && typeof data.message === 'string') {
setErrorMesssage(data.message);
}
},
},
);

return {
isSuccess,
isError,
isLoading,
errorMessage,
isRefetching,
};
};
Loading

0 comments on commit 5728b53

Please sign in to comment.