Skip to content

Commit

Permalink
feat(webui): Support viewing search results in context for JSON logs …
Browse files Browse the repository at this point in the history
…(clp-json). (y-scope#596)

Co-authored-by: Junhao Liao <[email protected]>
  • Loading branch information
haiqi96 and junhaoliao authored Nov 21, 2024
1 parent ee7e493 commit 7d85ad3
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,7 @@ def start_log_viewer_webui(
"MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name,
"ClientDir": str(container_log_viewer_webui_dir / "client"),
"StreamFilesDir": str(container_clp_config.stream_output.directory),
"StreamTargetUncompressedSize": container_clp_config.stream_output.target_uncompressed_size,
"LogViewerDir": str(container_log_viewer_webui_dir / "yscope-log-viewer"),
}
settings_json = read_and_update_settings_json(settings_json_path, settings_json_updates)
Expand Down
2 changes: 1 addition & 1 deletion components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def dump_to_primitive_dict(self):


class StreamOutput(BaseModel):
directory: pathlib.Path = pathlib.Path("var") / "data" / "stream"
directory: pathlib.Path = pathlib.Path("var") / "data" / "streams"
target_uncompressed_size: int = 128 * 1024 * 1024

@validator("directory")
Expand Down
29 changes: 20 additions & 9 deletions components/log-viewer-webui/client/src/api/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,31 @@ import axios from "axios";
*/

/**
* Submits a job to extract the split of an original file that contains a given log event. The file
* is extracted as a CLP IR file.
* @typedef {object} ExtractJsonResp
* @property {number} begin_msg_ix
* @property {number} end_msg_ix
* @property {boolean} is_last_ir_chunk
* @property {string} orig_file_id
* @property {string} path
* @property {string} _id
*/

/**
* Submits a job to extract the stream that contains a given log event. The stream is extracted
* either as a CLP IR or a JSON Lines file.
*
* @param {number|string} origFileId The ID of the original file
* @param {number} logEventIdx The index of the log event
* @param {QUERY_JOB_TYPE} extractJobType
* @param {string} streamId
* @param {number} logEventIdx
* @param {Function} onUploadProgress Callback to handle upload progress events.
* @return {Promise<axios.AxiosResponse<ExtractIrResp>>}
* @return {Promise<axios.AxiosResponse<ExtractIrResp|ExtractJsonResp>>}
*/
const submitExtractIrJob = async (origFileId, logEventIdx, onUploadProgress) => {
const submitExtractStreamJob = async (extractJobType, streamId, logEventIdx, onUploadProgress) => {
return await axios.post(
"/query/extract-ir",
{logEventIdx, origFileId},
"/query/extract-stream",
{extractJobType, streamId, logEventIdx},
{onUploadProgress}
);
};

export {submitExtractIrJob};
export {submitExtractStreamJob};
50 changes: 43 additions & 7 deletions components/log-viewer-webui/client/src/ui/QueryStatus.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,37 @@ import {

import {AxiosError} from "axios";

import {submitExtractIrJob} from "../api/query.js";
import {submitExtractStreamJob} from "../api/query.js";
import {QUERY_LOADING_STATES} from "../typings/query.js";
import Loading from "./Loading.jsx";


let enumQueryType;
/* eslint-disable sort-keys */
/**
* Note: This enum is duplicated from server, as it is non-trivial to include server enums from the
* client.
*
* Enum of job types, matching the `QueryJobType` class in
* `job_orchestration.query_scheduler.constants`.
*
* @enum {number}
*/
const QUERY_JOB_TYPE = Object.freeze({
SEARCH_OR_AGGREGATION: (enumQueryType = 0),
EXTRACT_IR: ++enumQueryType,
EXTRACT_JSON: ++enumQueryType,
});
/* eslint-enable sort-keys */

/**
* Mapping between job type enums and stream type
*/
const EXTRACT_JOB_TYPE = Object.freeze({
ir: QUERY_JOB_TYPE.EXTRACT_IR,
json: QUERY_JOB_TYPE.EXTRACT_JSON,
});

/**
* Submits queries and renders the query states.
*
Expand All @@ -28,20 +54,30 @@ const QueryStatus = () => {
isFirstRun.current = false;

const searchParams = new URLSearchParams(window.location.search);
const origFileId = searchParams.get("origFileId");
const streamType = searchParams.get("type");
const streamId = searchParams.get("streamId");
const logEventIdx = searchParams.get("logEventIdx");
if (null === origFileId || null === logEventIdx) {
const error = "Either `origFileId` or `logEventIdx` are missing from the URL " +
"parameters. Note that non-IR-extraction queries are not supported at the moment.";

if (null === streamType || null === streamId || null === logEventIdx) {
const error = "Queries parameters are missing from the URL parameters.";
console.error(error);
setErrorMsg(error);

return;
}

const extractJobType = EXTRACT_JOB_TYPE[streamType];
if ("undefined" === typeof extractJobType) {
const error = `Unsupported Stream type: ${streamType}`;
console.error(error);
setErrorMsg(error);

return;
}

submitExtractIrJob(
origFileId,
submitExtractStreamJob(
extractJobType,
streamId,
Number(logEventIdx),
() => {
setQueryState(QUERY_LOADING_STATES.WAITING);
Expand Down
1 change: 1 addition & 0 deletions components/log-viewer-webui/server/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@

"ClientDir": "../client/dist",
"StreamFilesDir": "../../../build/clp-package/var/data/streams",
"StreamTargetUncompressedSize": 134217728,
"LogViewerDir": "../yscope-log-viewer/dist"
}
56 changes: 46 additions & 10 deletions components/log-viewer-webui/server/src/DbManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,18 @@ let enumQueryType;
const QUERY_JOB_TYPE = Object.freeze({
SEARCH_OR_AGGREGATION: (enumQueryType = 0),
EXTRACT_IR: ++enumQueryType,
EXTRACT_JSON: ++enumQueryType,
});
/* eslint-enable sort-keys */

/**
* List of valid extract job types.
*/
const EXTRACT_JOB_TYPES = Object.freeze([
QUERY_JOB_TYPE.EXTRACT_IR,
QUERY_JOB_TYPE.EXTRACT_JSON,
]);

/**
* Class to manage connections to the jobs database (MySQL) and results cache (MongoDB).
*/
Expand Down Expand Up @@ -101,20 +110,43 @@ class DbManager {
}

/**
* Submits an IR extraction job to the scheduler and waits for it to finish.
* Submits a stream extraction job to the scheduler and waits for it to finish.
*
* @param {object} jobConfig
* @param {number} jobType
* @param {number} logEventIdx
* @param {string} streamId
* @param {number} targetUncompressedSize
* @return {Promise<number|null>} The ID of the job or null if an error occurred.
*/
async submitAndWaitForExtractIrJob (jobConfig) {
async submitAndWaitForExtractStreamJob ({
jobType,
logEventIdx,
streamId,
targetUncompressedSize,
}) {
let jobConfig;
if (QUERY_JOB_TYPE.EXTRACT_IR === jobType) {
jobConfig = {
file_split_id: null,
msg_ix: logEventIdx,
orig_file_id: streamId,
target_uncompressed_size: targetUncompressedSize,
};
} else if (QUERY_JOB_TYPE.EXTRACT_JSON === jobType) {
jobConfig = {
archive_id: streamId,
target_chunk_size: targetUncompressedSize,
};
}

let jobId;
try {
const [result] = await this.#mysqlConnectionPool.query(
`INSERT INTO ${this.#queryJobsTableName} (job_config, type)
VALUES (?, ?)`,
[
Buffer.from(msgpackEncode(jobConfig)),
QUERY_JOB_TYPE.EXTRACT_IR,
jobType,
]
);

Expand All @@ -130,16 +162,16 @@ class DbManager {
}

/**
* Gets the metadata for an IR file extracted from part of an original file, where the original
* file has the given ID and the extracted part contains the given log event index.
* Gets the metadata for the extracted stream that has the given streamId and contains the
* given logEventIdx.
*
* @param {string} origFileId
* @param {string} streamId
* @param {number} logEventIdx
* @return {Promise<object>} A promise that resolves to the extracted IR file's metadata.
* @return {Promise<object>} A promise that resolves to the extracted stream's metadata.
*/
async getExtractedIrFileMetadata (origFileId, logEventIdx) {
async getExtractedStreamFileMetadata (streamId, logEventIdx) {
return await this.#streamFilesCollection.findOne({
orig_file_id: origFileId,
orig_file_id: streamId,
begin_msg_ix: {$lte: logEventIdx},
end_msg_ix: {$gt: logEventIdx},
});
Expand Down Expand Up @@ -236,6 +268,10 @@ class DbManager {
}
}

export {
EXTRACT_JOB_TYPES,
QUERY_JOB_TYPE,
};
export default fastifyPlugin(async (app, options) => {
await app.decorate("dbManager", new DbManager(app, options));
});
59 changes: 38 additions & 21 deletions components/log-viewer-webui/server/src/routes/query.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// eslint-disable-next-line no-magic-numbers
const EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE = 128 * 1024 * 1024;
import {StatusCodes} from "http-status-codes";

import settings from "../../settings.json" with {type: "json"};
import {EXTRACT_JOB_TYPES} from "../DbManager.js";


/**
* Creates query routes.
Expand All @@ -9,37 +12,51 @@ const EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE = 128 * 1024 * 1024;
* @return {Promise<void>}
*/
const routes = async (fastify, options) => {
fastify.post("/query/extract-ir", async (req, resp) => {
const {origFileId, logEventIdx} = req.body;
const sanitizedLogEventIdx = Number(logEventIdx);
fastify.post("/query/extract-stream", async (req, resp) => {
const {extractJobType, logEventIdx, streamId} = req.body;
if (false === EXTRACT_JOB_TYPES.includes(extractJobType)) {
resp.code(StatusCodes.BAD_REQUEST);
throw new Error(`Invalid extractJobType="${extractJobType}".`);
}

if ("string" !== typeof streamId || 0 === streamId.trim().length) {
resp.code(StatusCodes.BAD_REQUEST);
throw new Error("\"streamId\" must be a non-empty string.");
}

let irMetadata = await fastify.dbManager.getExtractedIrFileMetadata(
origFileId,
const sanitizedLogEventIdx = Number(logEventIdx);
let streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata(
streamId,
sanitizedLogEventIdx
);

if (null === irMetadata) {
const extractResult = await fastify.dbManager.submitAndWaitForExtractIrJob({
file_split_id: null,
msg_ix: sanitizedLogEventIdx,
orig_file_id: origFileId,
target_uncompressed_size: EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE,
if (null === streamMetadata) {
const extractResult = await fastify.dbManager.submitAndWaitForExtractStreamJob({
jobType: extractJobType,
logEventIdx: sanitizedLogEventIdx,
streamId: streamId,
targetUncompressedSize: settings.StreamTargetUncompressedSize,
});

if (null === extractResult) {
const err = new Error("Unable to extract IR for file with " +
`origFileId=${origFileId} at logEventIdx=${sanitizedLogEventIdx}`);

err.statusCode = 400;
throw err;
resp.code(StatusCodes.BAD_REQUEST);
throw new Error("Unable to extract stream with " +
`streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`);
}
irMetadata = await fastify.dbManager.getExtractedIrFileMetadata(
origFileId,

streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata(
streamId,
sanitizedLogEventIdx
);

if (null === streamMetadata) {
resp.code(StatusCodes.BAD_REQUEST);
throw new Error("Unable to find the metadata of extracted stream with " +
`streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`);
}
}

return irMetadata;
return streamMetadata;
});
};

Expand Down
2 changes: 1 addition & 1 deletion components/package-template/src/etc/clp-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
#stream_output:
# directory: "var/data/streams"
#
# # How large each IR file should be before being split into a new IR file
# # How large each stream file should be before being split into a new stream file
# target_uncompressed_size: 134217728 # 128 MB
#
## Location where other data (besides archives) are stored. It will be created if
Expand Down
Loading

0 comments on commit 7d85ad3

Please sign in to comment.