Skip to content

!!! DO NOT MERGE !!! New ka mining service logic #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ require('dotenv').config();

const express = require('express');
// const { createAssetJob } = require('./queue');
require('./sync-assets-queue');
//require('./sync-assets-queue');
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentional?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can leave a comment suggesting that the line be uncommented if the user needs that feature, or we can just leave it enabled by default.

const cors = require('cors');
const cookieParser = require('cookie-parser');
const app = express();
Expand Down
47 changes: 30 additions & 17 deletions services/kMiningService.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@ const FormData = require('form-data');
const fs = require('fs');
const datasetService = require('./datasetService.js');

exports.defineProcessingPipelineId = async (req) => {
exports.defineProcessingPipelineId = async req => {
const kmining_json_pipeline_id = req.user.config.find(
(item) => item.option === 'kmining_json_pipeline_id'
item => item.option === 'kmining_json_pipeline_id'
)?.value;
const kmining_pdf_pipeline_id = req.user.config.find(
(item) => item.option === 'kmining_pdf_pipeline_id'
item => item.option === 'kmining_pdf_pipeline_id'
)?.value;
const kmining_csv_pipeline_id = req.user.config.find(
(item) => item.option === 'kmining_csv_pipeline_id'
item => item.option === 'kmining_csv_pipeline_id'
)?.value;


if (req.file.mimetype === 'application/ld+json') {
return "simple_json_to_jsonld";
return 'simple_json_to_jsonld';
}
if (req.file.mimetype === 'application/json') {
return kmining_json_pipeline_id;
Expand Down Expand Up @@ -44,16 +43,16 @@ exports.triggerPipeline = async (
formData.append('pipelineId', kMiningPipelineId);
formData.append(
'fileFormat',
file.mimetype === 'application/json'
|| file.mimetype === 'application/ld+json'
file.mimetype === 'application/json' ||
file.mimetype === 'application/ld+json'
? 'json'
: file.mimetype === 'application/pdf'
? 'pdf'
: 'csv'
);

let result = await axios.post(
`${kMiningEndpoint}/trigger_pipeline`,
`${kMiningEndpoint}/trigger-pipeline`,
formData,
{
withCredentials: true,
Expand All @@ -64,9 +63,10 @@ exports.triggerPipeline = async (
}
);

if (result.data.message === 'DAG triggered') {
const pipelineId = result.data.pipeline_id;
const runId = result.data.run_id;
console.log('Trigger pipeline result', result.status, result.data);
if (result.data.success) {
const pipelineId = result.data.pipelineId;
const runId = result.data.runId;
if (inputDatasetDBRecord) {
await datasetService.storePipelineInfo(
inputDatasetDBRecord,
Expand All @@ -77,18 +77,26 @@ exports.triggerPipeline = async (
while (true) {
await wait(1000);

console.log('Checking pipeline status...');
let pipelineResp = await axios.get(
`${kMiningEndpoint}/check-pipeline-status`,
{
params: {
pipeline_id: pipelineId,
run_id: runId
pipelineId,
runId
},
withCredentials: true,
headers: {
Cookie: sessionCookie,
...formData.getHeaders() // Include multipart/form-data headers
}
}
);

console.log('Pipeline status result', pipelineResp.data);

if (pipelineResp.data.status === 'success') {
return pipelineResp.data.xcom_value;
return pipelineResp.data.result;
} else if (
pipelineResp.data.status === 'failed' ||
pipelineResp.data.status === 'not_found'
Expand All @@ -104,7 +112,12 @@ exports.triggerPipeline = async (
throw error;
}
};

function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
console.log(`Waiting for ${ms}ms`);
return new Promise(resolve => {
setTimeout(() => {
console.log('Finished waiting');
resolve();
}, 1000);
});
}