Skip to content

Commit

Permalink
fix: 判断是否检查作业重名
Browse files Browse the repository at this point in the history
  • Loading branch information
OYX-1 committed Aug 8, 2024
1 parent 41e501f commit 1689747
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 72 deletions.
63 changes: 31 additions & 32 deletions apps/portal-server/src/clusterops/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import { Status } from "@grpc/grpc-js/build/src/constants";
import { AppType } from "@scow/config/build/app";
import { getPlaceholderKeys } from "@scow/lib-config/build/parse";
import { formatTime } from "@scow/lib-scheduler-adapter";
import { checkSchedulerApiVersion, getAppConnectionInfoFromAdapter, getEnvVariables } from "@scow/lib-server";
import { compareSchedulerApiVersion, getAppConnectionInfoFromAdapter,
getEnvVariables, getSchedulerApiVersion } from "@scow/lib-server";
import { getUserHomedir,
sftpChmod, sftpExists, sftpReaddir, sftpReadFile, sftpRealPath, sftpWriteFile } from "@scow/lib-ssh";
import { DetailedError, ErrorInfo, parseErrorDetails } from "@scow/rich-error-model";
Expand Down Expand Up @@ -80,44 +81,42 @@ export const appOps = (cluster: string): AppOps => {
partition, qos, customAttributes, appJobName } = request;

const jobName = appJobName;
// 检查是否存在同名的作业
const existingJobName = await callOnOne(

// 检查作业重名的最低调度器接口版本
const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 };

const scheduleApiVersion = await callOnOne(
cluster,
logger,
async (client) => {

try {
// 当前接口要求的最低调度器接口版本
const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 };
// 检验调度器的API版本是否符合要求,不符合要求报错
await checkSchedulerApiVersion(client, minRequiredApiVersion);

return await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [], states: [], jobName,
},
});

} catch (error) {
logger.info("Check SchedulerApiVersion failed with %o", error);
// 适配器不支持时,返回空数组,跳过检查
return { jobs:[]};
}
},
).then((resp) => resp.jobs);
async (client) => await getSchedulerApiVersion(client),
);

// 适配器支持的话就检查是否存在同名的作业
if (compareSchedulerApiVersion(scheduleApiVersion,minRequiredApiVersion)) {

const existingJobName = await callOnOne(
cluster,
logger,
async (client) => await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [], states: [], jobName,
},
}),
).then((resp) => resp.jobs);

if (existingJobName.length) {
throw new DetailedError({
code: Status.ALREADY_EXISTS,
message: `appJobName ${appJobName} is already existed`,
details: [errorInfo("ALREADY EXISTS")],
});
}

if (existingJobName.length) {
throw new DetailedError({
code: Status.ALREADY_EXISTS,
message: `appJobName ${appJobName} is already existed`,
details: [errorInfo("ALREADY EXISTS")],
});
}

const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined;


const userSbatchOptions = customAttributes.sbatchOptions
? splitSbatchArgs(customAttributes.sbatchOptions)
: [];
Expand Down
60 changes: 29 additions & 31 deletions apps/portal-server/src/services/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { ServiceError } from "@ddadaal/tsgrpc-common";
import { plugin } from "@ddadaal/tsgrpc-server";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { jobInfoToPortalJobInfo, jobInfoToRunningjob } from "@scow/lib-scheduler-adapter";
import { checkSchedulerApiVersion } from "@scow/lib-server";
import { checkSchedulerApiVersion, compareSchedulerApiVersion, getSchedulerApiVersion } from "@scow/lib-server";
import { createDirectoriesRecursively, sftpReadFile, sftpStat, sftpWriteFile } from "@scow/lib-ssh";
import { AccountStatusFilter, JobServiceServer, JobServiceService, TimeUnit } from "@scow/protos/build/portal/job";
import { parseErrorDetails } from "@scow/rich-error-model";
Expand Down Expand Up @@ -233,39 +233,37 @@ export const jobServiceServer = plugin((server) => {
, errorOutput, memory, scriptOutput } = request;
await checkActivatedClusters({ clusterIds: cluster });

// 检查是否存在同名的作业
const existingJobName = await callOnOne(
// 检查作业重名的最低调度器接口版本
const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 };

const scheduleApiVersion = await callOnOne(
cluster,
logger,
async (client) => {
async (client) => await getSchedulerApiVersion(client),
);

// 适配器支持的话就检查是否存在同名的作业
if (compareSchedulerApiVersion(scheduleApiVersion,minRequiredApiVersion)) {

const existingJobName = await callOnOne(
cluster,
logger,
async (client) => await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [], states: [], jobName,
},
}),
).then((resp) => resp.jobs);

if (existingJobName.length) {
throw {
code: Status.ALREADY_EXISTS,
message: "already exists",
details: `jobName ${jobName} is already existed`,
} as ServiceError;
}

try {
// 当前接口要求的最低调度器接口版本
const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 };
// 检验调度器的API版本是否符合要求,不符合要求报错
await checkSchedulerApiVersion(client, minRequiredApiVersion);

return await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [], states: [], jobName,
},
});

} catch (error) {
logger.info("Check SchedulerApiVersion failed with %o", error);
// 适配器不支持时,返回空数组,跳过检查
return { jobs:[]};
}
},
).then((resp) => resp.jobs);

if (existingJobName.length) {
throw {
code: Status.ALREADY_EXISTS,
message: "already exists",
details: `jobName ${jobName} is already existed`,
} as ServiceError;
}

// make sure working directory exists
Expand Down
42 changes: 33 additions & 9 deletions libs/server/src/scheduleAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,7 @@ export async function checkSchedulerApiVersion(client: SchedulerAdapterClient,
if (scheduleApiVersion) {

// 检查调度器接口版本是否大于等于最低要求版本
let geMinVersion: boolean;
if (scheduleApiVersion.major !== minVersion.major) {
geMinVersion = (scheduleApiVersion.major > minVersion.major);
} else if (scheduleApiVersion.minor !== minVersion.minor) {
geMinVersion = (scheduleApiVersion.minor > minVersion.minor);
} else {
geMinVersion = true;
}
const geMinVersion = compareSchedulerApiVersion(scheduleApiVersion,minVersion);

if (!geMinVersion) {
throw {
Expand All @@ -74,5 +67,36 @@ export async function checkSchedulerApiVersion(client: SchedulerAdapterClient,
} as ServiceError;
}
}

};


export function compareSchedulerApiVersion(scheduleApiVersion: ApiVersion, minVersion: ApiVersion): boolean {
let geMinVersion: boolean;
if (scheduleApiVersion.major !== minVersion.major) {
geMinVersion = (scheduleApiVersion.major > minVersion.major);
} else if (scheduleApiVersion.minor !== minVersion.minor) {
geMinVersion = (scheduleApiVersion.minor > minVersion.minor);
} else {
geMinVersion = true;
}

return geMinVersion;
}

export async function getSchedulerApiVersion(client: SchedulerAdapterClient): Promise<ApiVersion> {
let scheduleApiVersion: ApiVersion;
try {
scheduleApiVersion = await asyncClientCall(client.version, "getVersion", {});
} catch (e) {
// 适配器请求连接失败的处理
if (((e as any).code === status.CANCELLED)) {
throw e;
}

// 如果找不到获取版本号的接口,指定版本为接口存在前的最新版1.0.0
scheduleApiVersion = { major: 1, minor: 0, patch: 0 };
console.log("The scheduler API version can not be confirmed");
}

return scheduleApiVersion;
}

0 comments on commit 1689747

Please sign in to comment.