Skip to content

Commit

Permalink
fix bug that canceled meta job cannot be removed from queue. change h…
Browse files Browse the repository at this point in the history
…ow job api list job (#67)
  • Loading branch information
EverettSummer authored Oct 28, 2023
1 parent b5cc6e6 commit 33344d9
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 18 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mira-video-manager",
"version": "1.5.3",
"version": "1.5.4",
"description": "Video Process for mira project",
"main": "index.js",
"scripts": {
Expand Down
43 changes: 26 additions & 17 deletions src/JobScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,16 @@ import {
DOWNLOAD_MESSAGE_QUEUE,
DownloadMQMessage,
JOB_EXCHANGE,
JOB_QUEUE,
JOB_QUEUE, MQMessage,
RabbitMQService,
Sentry,
TYPES,
VIDEO_MANAGER_COMMAND,
VIDEO_MANAGER_EXCHANGE,
VIDEO_MANAGER_GENERAL
VIDEO_MANAGER_EXCHANGE
} from '@irohalab/mira-shared';
import { randomUUID } from 'crypto';
import { getStdLogger } from './utils/Logger';
import { META_JOB_KEY, NORMAL_JOB_KEY } from './TYPES';
import { META_JOB_KEY, META_JOB_QUEUE, NORMAL_JOB_KEY } from './TYPES';
import { JobType } from './domains/JobType';
import { ValidateAction } from './domains/ValidateAction';

Expand All @@ -56,6 +55,7 @@ export class JobScheduler implements JobApplication {
private _downloadMessageConsumeTag: string;
private _commandMessageConsumeTag: string;
private _jobMessageConsumeTag: string;
private _metaJobMessageConsumeTag: string;
private _jobStatusCheckerTimerId: NodeJS.Timeout;

constructor(@inject(TYPES.ConfigManager) private _configManager: ConfigManager,
Expand Down Expand Up @@ -92,27 +92,36 @@ export class JobScheduler implements JobApplication {
}
});
this._jobMessageConsumeTag = await this._rabbitmqService.consume(JOB_QUEUE, async (msg) => {
try {
const jobMessage = msg as JobMessage;
const job = await this._databaseService.getJobRepository().findOne({ id: jobMessage.jobId });
if (job.status === JobStatus.Canceled) {
logger.info('remove canceled job (' + job.id +') from message queue');
// remove from Message Queue
return true;
}
} catch (ex) {
logger.error(ex);
this._sentry.capture(ex);
}
return false;
return await this.removeJobMessageFromQueue(msg);
});

this._metaJobMessageConsumeTag = await this._rabbitmqService.consume(META_JOB_QUEUE, async (msg) => {
return await this.removeJobMessageFromQueue(msg);
});

this.checkJobStatus();
}

public async stop(): Promise<void> {
clearTimeout(this._jobStatusCheckerTimerId);
}

private async removeJobMessageFromQueue(msg: MQMessage): Promise<boolean> {
try {
const jobMessage = msg as JobMessage;
const job = await this._databaseService.getJobRepository().findOne({ id: jobMessage.jobId });
if (job.status === JobStatus.Canceled) {
logger.info('remove canceled job (' + job.id +') from message queue');
// remove from Message Queue
return true;
}
} catch (ex) {
logger.error(ex);
this._sentry.capture(ex);
}
return false;
}

private async onDownloadMessage(msg: DownloadMQMessage): Promise<void> {
let appliedRule: VideoProcessRule;
const rules = await this._databaseService.getVideoProcessRuleRepository().findByBangumiId(msg.bangumiId);
Expand Down
2 changes: 2 additions & 0 deletions src/api-service/controller/JobController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ export class JobController extends BaseHttpController implements interfaces.Cont
try {
if (status === 'all') {
jobs = await this._databaseService.getJobRepository(true).getRecentJobs();
} else if (status === 'Running') {
jobs = await this._databaseService.getJobRepository(true).getRunningJobs();
} else {
jobs = await this._databaseService.getJobRepository(true).getJobsByStatus(status);
}
Expand Down
8 changes: 8 additions & 0 deletions src/repository/JobRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ export class JobRepository extends BaseEntityRepository<Job> {
})
}

public async getRunningJobs(): Promise<Job[]> {
return await this.find({ $and: [{status: JobStatus.Running}, {status: JobStatus.MetaData}]}, {
orderBy: {
createTime: 'DESC'
}
})
}

public async getRecentJobs(): Promise<Job[]> {
return await this.find({}, {
orderBy: {
Expand Down

0 comments on commit 33344d9

Please sign in to comment.