From 168974770e9466ceb637511c329cce0fbac822b8 Mon Sep 17 00:00:00 2001 From: OYX-1 <13121812323@163.com> Date: Thu, 8 Aug 2024 06:21:33 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=88=A4=E6=96=AD=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E6=A3=80=E6=9F=A5=E4=BD=9C=E4=B8=9A=E9=87=8D=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/portal-server/src/clusterops/app.ts | 63 ++++++++++++------------ apps/portal-server/src/services/job.ts | 60 +++++++++++----------- libs/server/src/scheduleAdapter.ts | 42 ++++++++++++---- 3 files changed, 93 insertions(+), 72 deletions(-) diff --git a/apps/portal-server/src/clusterops/app.ts b/apps/portal-server/src/clusterops/app.ts index 24cac61acf..fd4126cc34 100644 --- a/apps/portal-server/src/clusterops/app.ts +++ b/apps/portal-server/src/clusterops/app.ts @@ -16,7 +16,8 @@ import { Status } from "@grpc/grpc-js/build/src/constants"; import { AppType } from "@scow/config/build/app"; import { getPlaceholderKeys } from "@scow/lib-config/build/parse"; import { formatTime } from "@scow/lib-scheduler-adapter"; -import { checkSchedulerApiVersion, getAppConnectionInfoFromAdapter, getEnvVariables } from "@scow/lib-server"; +import { compareSchedulerApiVersion, getAppConnectionInfoFromAdapter, + getEnvVariables, getSchedulerApiVersion } from "@scow/lib-server"; import { getUserHomedir, sftpChmod, sftpExists, sftpReaddir, sftpReadFile, sftpRealPath, sftpWriteFile } from "@scow/lib-ssh"; import { DetailedError, ErrorInfo, parseErrorDetails } from "@scow/rich-error-model"; @@ -80,44 +81,42 @@ export const appOps = (cluster: string): AppOps => { partition, qos, customAttributes, appJobName } = request; const jobName = appJobName; - // 检查是否存在同名的作业 - const existingJobName = await callOnOne( + + // 检查作业重名的最低调度器接口版本 + const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 }; + + const scheduleApiVersion = await callOnOne( cluster, logger, - async (client) => { - - try { - // 当前接口要求的最低调度器接口版本 - const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 }; - // 检验调度器的API版本是否符合要求,不符合要求报错 - await checkSchedulerApiVersion(client, minRequiredApiVersion); - - return await asyncClientCall(client.job, "getJobs", { - fields: ["job_id"], - filter: { - users: [userId], accounts: [], states: [], jobName, - }, - }); - - } catch (error) { - logger.info("Check SchedulerApiVersion failed with %o", error); - // 适配器不支持时,返回空数组,跳过检查 - return { jobs:[]}; - } - }, - ).then((resp) => resp.jobs); + async (client) => await getSchedulerApiVersion(client), + ); + + // 适配器支持的话就检查是否存在同名的作业 + if (compareSchedulerApiVersion(scheduleApiVersion,minRequiredApiVersion)) { + + const existingJobName = await callOnOne( + cluster, + logger, + async (client) => await asyncClientCall(client.job, "getJobs", { + fields: ["job_id"], + filter: { + users: [userId], accounts: [], states: [], jobName, + }, + }), + ).then((resp) => resp.jobs); + + if (existingJobName.length) { + throw new DetailedError({ + code: Status.ALREADY_EXISTS, + message: `appJobName ${appJobName} is already existed`, + details: [errorInfo("ALREADY EXISTS")], + }); + } - if (existingJobName.length) { - throw new DetailedError({ - code: Status.ALREADY_EXISTS, - message: `appJobName ${appJobName} is already existed`, - details: [errorInfo("ALREADY EXISTS")], - }); } const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined; - const userSbatchOptions = customAttributes.sbatchOptions ? splitSbatchArgs(customAttributes.sbatchOptions) : []; diff --git a/apps/portal-server/src/services/job.ts b/apps/portal-server/src/services/job.ts index 2e2cf00a76..4de779aadc 100644 --- a/apps/portal-server/src/services/job.ts +++ b/apps/portal-server/src/services/job.ts @@ -15,7 +15,7 @@ import { ServiceError } from "@ddadaal/tsgrpc-common"; import { plugin } from "@ddadaal/tsgrpc-server"; import { Status } from "@grpc/grpc-js/build/src/constants"; import { jobInfoToPortalJobInfo, jobInfoToRunningjob } from "@scow/lib-scheduler-adapter"; -import { checkSchedulerApiVersion } from "@scow/lib-server"; +import { checkSchedulerApiVersion, compareSchedulerApiVersion, getSchedulerApiVersion } from "@scow/lib-server"; import { createDirectoriesRecursively, sftpReadFile, sftpStat, sftpWriteFile } from "@scow/lib-ssh"; import { AccountStatusFilter, JobServiceServer, JobServiceService, TimeUnit } from "@scow/protos/build/portal/job"; import { parseErrorDetails } from "@scow/rich-error-model"; @@ -233,39 +233,37 @@ export const jobServiceServer = plugin((server) => { , errorOutput, memory, scriptOutput } = request; await checkActivatedClusters({ clusterIds: cluster }); - // 检查是否存在同名的作业 - const existingJobName = await callOnOne( + // 检查作业重名的最低调度器接口版本 + const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 }; + + const scheduleApiVersion = await callOnOne( cluster, logger, - async (client) => { + async (client) => await getSchedulerApiVersion(client), + ); + + // 适配器支持的话就检查是否存在同名的作业 + if (compareSchedulerApiVersion(scheduleApiVersion,minRequiredApiVersion)) { + + const existingJobName = await callOnOne( + cluster, + logger, + async (client) => await asyncClientCall(client.job, "getJobs", { + fields: ["job_id"], + filter: { + users: [userId], accounts: [], states: [], jobName, + }, + }), + ).then((resp) => resp.jobs); + + if (existingJobName.length) { + throw { + code: Status.ALREADY_EXISTS, + message: "already exists", + details: `jobName ${jobName} is already existed`, + } as ServiceError; + } - try { - // 当前接口要求的最低调度器接口版本 - const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 }; - // 检验调度器的API版本是否符合要求,不符合要求报错 - await checkSchedulerApiVersion(client, minRequiredApiVersion); - - return await asyncClientCall(client.job, "getJobs", { - fields: ["job_id"], - filter: { - users: [userId], accounts: [], states: [], jobName, - }, - }); - - } catch (error) { - logger.info("Check SchedulerApiVersion failed with %o", error); - // 适配器不支持时,返回空数组,跳过检查 - return { jobs:[]}; - } - }, - ).then((resp) => resp.jobs); - - if (existingJobName.length) { - throw { - code: Status.ALREADY_EXISTS, - message: "already exists", - details: `jobName ${jobName} is already existed`, - } as ServiceError; } // make sure working directory exists diff --git a/libs/server/src/scheduleAdapter.ts b/libs/server/src/scheduleAdapter.ts index 40d1c9bf8f..157b8198ac 100644 --- a/libs/server/src/scheduleAdapter.ts +++ b/libs/server/src/scheduleAdapter.ts @@ -54,14 +54,7 @@ export async function checkSchedulerApiVersion(client: SchedulerAdapterClient, if (scheduleApiVersion) { // 检查调度器接口版本是否大于等于最低要求版本 - let geMinVersion: boolean; - if (scheduleApiVersion.major !== minVersion.major) { - geMinVersion = (scheduleApiVersion.major > minVersion.major); - } else if (scheduleApiVersion.minor !== minVersion.minor) { - geMinVersion = (scheduleApiVersion.minor > minVersion.minor); - } else { - geMinVersion = true; - } + const geMinVersion = compareSchedulerApiVersion(scheduleApiVersion,minVersion); if (!geMinVersion) { throw { @@ -74,5 +67,36 @@ export async function checkSchedulerApiVersion(client: SchedulerAdapterClient, } as ServiceError; } } - }; + + +export function compareSchedulerApiVersion(scheduleApiVersion: ApiVersion, minVersion: ApiVersion): boolean { + let geMinVersion: boolean; + if (scheduleApiVersion.major !== minVersion.major) { + geMinVersion = (scheduleApiVersion.major > minVersion.major); + } else if (scheduleApiVersion.minor !== minVersion.minor) { + geMinVersion = (scheduleApiVersion.minor > minVersion.minor); + } else { + geMinVersion = true; + } + + return geMinVersion; +} + +export async function getSchedulerApiVersion(client: SchedulerAdapterClient): Promise { + let scheduleApiVersion: ApiVersion; + try { + scheduleApiVersion = await asyncClientCall(client.version, "getVersion", {}); + } catch (e) { + // 适配器请求连接失败的处理 + if (((e as any).code === status.CANCELLED)) { + throw e; + } + + // 如果找不到获取版本号的接口,指定版本为接口存在前的最新版1.0.0 + scheduleApiVersion = { major: 1, minor: 0, patch: 0 }; + console.log("The scheduler API version can not be confirmed"); + } + + return scheduleApiVersion; +}