diff --git a/.changeset/tasty-files-end.md b/.changeset/tasty-files-end.md new file mode 100644 index 0000000000..2fe6468182 --- /dev/null +++ b/.changeset/tasty-files-end.md @@ -0,0 +1,7 @@ +--- +"@scow/portal-server": patch +"@scow/portal-web": patch +"@scow/ai": patch +--- + +ai 和 hpc 在提交作业和应用前检查一下是否重名 diff --git a/apps/ai/src/server/trpc/route/jobs/apps.ts b/apps/ai/src/server/trpc/route/jobs/apps.ts index f670f297a8..4023f58449 100644 --- a/apps/ai/src/server/trpc/route/jobs/apps.ts +++ b/apps/ai/src/server/trpc/route/jobs/apps.ts @@ -309,6 +309,24 @@ export const createAppSession = procedure model, mountPoints = [], account, partition, coreCount, nodeCount, gpuCount, memory, maxTime, workingDirectory, customAttributes, gpuType } = input; + const userId = user.identityId; + const client = getAdapterClient(clusterId); + + // 检查是否存在同名的作业 + const existingJobName = await asyncClientCall(client.job, "getJobs", { + fields: ["job_id"], + filter: { + users: [userId], accounts: [],states: [],jobName:appJobName, + }, + }).then((resp) => resp.jobs); + + if (existingJobName.length) { + throw new TRPCError({ + code: "CONFLICT", + message: `appJobName ${appJobName} is already existed`, + }); + } + const apps = getClusterAppConfigs(clusterId); const app = checkAppExist(apps, appId); @@ -381,7 +399,7 @@ export const createAppSession = procedure throw clusterNotFound(clusterId); } - const userId = user.identityId; + return await sshConnect(host, userId, logger, async (ssh) => { const homeDir = await getUserHomedir(ssh, userId, logger); @@ -470,7 +488,7 @@ export const createAppSession = procedure // 将entry.sh写入后将路径传给适配器后启动容器 await sftpWriteFile(sftp)(remoteEntryPath, entryScript); - const client = getAdapterClient(clusterId); + const reply = await asyncClientCall(client.job, "submitJob", { userId, jobName: appJobName, diff --git a/apps/ai/src/server/trpc/route/jobs/jobs.ts b/apps/ai/src/server/trpc/route/jobs/jobs.ts index feb5cea0c2..7ad22db527 100644 --- a/apps/ai/src/server/trpc/route/jobs/jobs.ts +++ b/apps/ai/src/server/trpc/route/jobs/jobs.ts @@ -142,6 +142,23 @@ procedure throw clusterNotFound(clusterId); } + const client = getAdapterClient(clusterId); + + // 检查是否存在同名的作业 + const existingJobName = await asyncClientCall(client.job, "getJobs", { + fields: ["job_id"], + filter: { + users: [userId], accounts: [],states: [],jobName:trainJobName, + }, + }).then((resp) => resp.jobs); + + if (existingJobName.length) { + throw new TRPCError({ + code: "CONFLICT", + message: `trainJobName ${trainJobName} is already existed`, + }); + } + const em = await forkEntityManager(); const { datasetVersion, @@ -189,7 +206,6 @@ procedure const entryScript = command; await sftpWriteFile(sftp)(remoteEntryPath, entryScript); - const client = getAdapterClient(clusterId); const reply = await asyncClientCall(client.job, "submitJob", { userId, jobName: trainJobName, diff --git a/apps/portal-server/src/clusterops/app.ts b/apps/portal-server/src/clusterops/app.ts index fb9b3395fa..23f788f572 100644 --- a/apps/portal-server/src/clusterops/app.ts +++ b/apps/portal-server/src/clusterops/app.ts @@ -16,10 +16,10 @@ import { Status } from "@grpc/grpc-js/build/src/constants"; import { AppType } from "@scow/config/build/app"; import { getPlaceholderKeys } from "@scow/lib-config/build/parse"; import { formatTime } from "@scow/lib-scheduler-adapter"; -import { getAppConnectionInfoFromAdapter, getEnvVariables } from "@scow/lib-server"; +import { checkJobNameExisting, errorInfo, getAppConnectionInfoFromAdapter,getEnvVariables } from "@scow/lib-server"; import { getUserHomedir, sftpChmod, sftpExists, sftpReaddir, sftpReadFile, sftpRealPath, sftpWriteFile } from "@scow/lib-ssh"; -import { DetailedError, ErrorInfo, parseErrorDetails } from "@scow/rich-error-model"; +import { DetailedError, parseErrorDetails } from "@scow/rich-error-model"; import { JobInfo, SubmitJobRequest } from "@scow/scheduler-adapter-protos/build/protos/job"; import fs from "fs"; import { join } from "path"; @@ -63,9 +63,6 @@ const VNC_SESSION_INFO = "VNC_SESSION_INFO"; const APP_LAST_SUBMISSION_INFO = "last_submission.json"; const BIN_BASH_SCRIPT_HEADER = "#!/bin/bash -l\n"; -const errorInfo = (reason: string) => - ErrorInfo.create({ domain: "", reason: reason, metadata: {} }); - export const appOps = (cluster: string): AppOps => { const host = getClusterLoginNode(cluster); @@ -79,8 +76,36 @@ export const appOps = (cluster: string): AppOps => { const { appId, userId, account, coreCount, nodeCount, gpuCount, memory, maxTime, proxyBasePath, partition, qos, customAttributes, appJobName } = request; - const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined; + const jobName = appJobName; + + // 检查作业名是否重复 + await callOnOne( + cluster, + logger, + async (client) => { + await checkJobNameExisting(client,userId,jobName,logger); + }, + ).catch((e) => { + const ex = e as ServiceError; + const errors = parseErrorDetails(ex.metadata); + if (errors[0] && errors[0].$type === "google.rpc.ErrorInfo" + && errors[0].reason === "ALREADY_EXISTS") { + throw new DetailedError({ + code: Status.ALREADY_EXISTS, + message: ex.details, + details: [errorInfo("ALREADY_EXISTS")], + }); + } + else { + throw new DetailedError({ + code: ex.code, + message: ex.details, + details: [errorInfo("SBATCH_FAILED")], + }); + } + }); + const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined; const userSbatchOptions = customAttributes.sbatchOptions ? splitSbatchArgs(customAttributes.sbatchOptions) @@ -97,8 +122,6 @@ export const appOps = (cluster: string): AppOps => { }); } - const jobName = appJobName; - const workingDirectory = join(portalConfig.appJobsDir, jobName); const lastSubmissionDirectory = join(portalConfig.appLastSubmissionDir, appId); diff --git a/apps/portal-server/src/services/job.ts b/apps/portal-server/src/services/job.ts index e30c15df75..45521aaf5e 100644 --- a/apps/portal-server/src/services/job.ts +++ b/apps/portal-server/src/services/job.ts @@ -15,7 +15,7 @@ import { ServiceError } from "@ddadaal/tsgrpc-common"; import { plugin } from "@ddadaal/tsgrpc-server"; import { Status } from "@grpc/grpc-js/build/src/constants"; import { jobInfoToPortalJobInfo, jobInfoToRunningjob } from "@scow/lib-scheduler-adapter"; -import { checkSchedulerApiVersion } from "@scow/lib-server"; +import { checkJobNameExisting, checkSchedulerApiVersion } from "@scow/lib-server"; import { createDirectoriesRecursively, sftpReadFile, sftpStat, sftpWriteFile } from "@scow/lib-ssh"; import { AccountStatusFilter, JobServiceServer, JobServiceService, TimeUnit } from "@scow/protos/build/portal/job"; import { parseErrorDetails } from "@scow/rich-error-model"; @@ -233,6 +233,15 @@ export const jobServiceServer = plugin((server) => { , errorOutput, memory, scriptOutput } = request; await checkActivatedClusters({ clusterIds: cluster }); + // 检查作业名是否重复 + await callOnOne( + cluster, + logger, + async (client) => { + await checkJobNameExisting(client,userId,jobName,logger); + }, + ); + // make sure working directory exists const host = getClusterLoginNode(cluster); if (!host) { throw clusterNotFound(cluster); } diff --git a/apps/portal-web/src/pageComponents/app/LaunchAppForm.tsx b/apps/portal-web/src/pageComponents/app/LaunchAppForm.tsx index ec4b10b89c..217b6680a9 100644 --- a/apps/portal-web/src/pageComponents/app/LaunchAppForm.tsx +++ b/apps/portal-web/src/pageComponents/app/LaunchAppForm.tsx @@ -116,7 +116,7 @@ export const LaunchAppForm: React.FC = ({ clusterId, appId, attributes, a maxTime, customAttributes: customFormKeyValue, } }) - .httpError(409, (e) => { + .httpError(500, (e) => { if (e.code === "SBATCH_FAILED") { createErrorModal(e.message); } else { @@ -137,6 +137,13 @@ export const LaunchAppForm: React.FC = ({ clusterId, appId, attributes, a throw e; } }) + .httpError(409, (e) => { + if (e.code === "ALREADY_EXISTS") { + createErrorModal(e.message); + } else { + throw e; + } + }) .then(() => { message.success(t(p("successMessage"))); Router.push(`/apps/${clusterId}/sessions`); diff --git a/apps/portal-web/src/pageComponents/job/SubmitJobForm.tsx b/apps/portal-web/src/pageComponents/job/SubmitJobForm.tsx index e6afa31068..aa12bc7e6e 100644 --- a/apps/portal-web/src/pageComponents/job/SubmitJobForm.tsx +++ b/apps/portal-web/src/pageComponents/job/SubmitJobForm.tsx @@ -132,6 +132,16 @@ export const SubmitJobForm: React.FC = ({ initial = initialValues, submit throw e; } }) + .httpError(409, (e) => { + if (e.code === "ALREADY_EXISTS") { + modal.error({ + title: t(p("errorMessage")), + content: e.message, + }); + } else { + throw e; + } + }) .then(({ jobId }) => { message.success(t(p("successMessage")) + jobId); Router.push("/jobs/runningJobs"); diff --git a/apps/portal-web/src/pages/api/app/createAppSession.ts b/apps/portal-web/src/pages/api/app/createAppSession.ts index cd3a262b15..df2e43b480 100644 --- a/apps/portal-web/src/pages/api/app/createAppSession.ts +++ b/apps/portal-web/src/pages/api/app/createAppSession.ts @@ -55,12 +55,17 @@ export const CreateAppSessionSchema = typeboxRouteSchema({ message: Type.String(), }), + 409: Type.Object({ + code: Type.Literal("ALREADY_EXISTS"), + message: Type.String(), + }), + 404: Type.Object({ code: Type.Literal("APP_NOT_FOUND"), message: Type.String(), }), - 409: Type.Object({ + 500: Type.Object({ code: Type.Literal("SBATCH_FAILED"), message: Type.String(), }), @@ -126,11 +131,13 @@ export default /* #__PURE__*/route(CreateAppSessionSchema, async (req, res) => { if (errors[0] && errors[0].$type === "google.rpc.ErrorInfo") { switch (errors[0].reason) { case "SBATCH_FAILED": - return { 409: { code: "SBATCH_FAILED" as const, message: ex.details } }; + return { 500: { code: "SBATCH_FAILED" as const, message: ex.details } }; case "NOT FOUND": return { 404: { code: "APP_NOT_FOUND" as const, message: ex.details } }; case "INVALID ARGUMENT": return { 400: { code: "INVALID_INPUT" as const, message: ex.details } }; + case "ALREADY EXISTS": + return { 409: { code: "ALREADY_EXISTS" as const, message: ex.details } }; default: return e; } diff --git a/apps/portal-web/src/pages/api/job/submitJob.ts b/apps/portal-web/src/pages/api/job/submitJob.ts index a3a05bafed..e383e6a7c2 100644 --- a/apps/portal-web/src/pages/api/job/submitJob.ts +++ b/apps/portal-web/src/pages/api/job/submitJob.ts @@ -65,6 +65,11 @@ export const SubmitJobSchema = typeboxRouteSchema({ message: Type.String(), }), + 409: Type.Object({ + code: Type.Literal("ALREADY_EXISTS"), + message: Type.String(), + }), + 500: Type.Object({ code: Type.Literal("SCHEDULER_FAILED"), message: Type.String(), @@ -137,6 +142,7 @@ export default route(SubmitJobSchema, async (req, res) => { .catch(handlegRPCError({ [status.INTERNAL]: (err) => ({ 500: { code: "SCHEDULER_FAILED", message: err.details } } as const), [status.NOT_FOUND]: (err) => ({ 404: { code: "NOT_FOUND", message: err.details } } as const), + [status.ALREADY_EXISTS]: (err) => ({ 409: { code: "ALREADY_EXISTS", message: err.details } } as const), }, async () => await callLog( { ...logInfo, diff --git a/libs/server/src/index.ts b/libs/server/src/index.ts index 1edcf6f84f..8784526c68 100644 --- a/libs/server/src/index.ts +++ b/libs/server/src/index.ts @@ -13,6 +13,7 @@ export * from "./apiAuthPlugin"; export * from "./app"; export * from "./date"; +export * from "./job"; export * from "./misCommon/clustersActivation"; export * from "./scheduleAdapter"; export * from "./systemLanguage"; diff --git a/libs/server/src/job.ts b/libs/server/src/job.ts new file mode 100644 index 0000000000..9c9aa24a5c --- /dev/null +++ b/libs/server/src/job.ts @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2022 Peking University and Peking University Institute for Computing and Digital Economy + * SCOW is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +import { asyncClientCall } from "@ddadaal/tsgrpc-client"; +import { Status } from "@grpc/grpc-js/build/src/constants"; +import { SchedulerAdapterClient } from "@scow/lib-scheduler-adapter"; +import { DetailedError, ErrorInfo } from "@scow/rich-error-model"; +import { ApiVersion } from "@scow/utils/build/version"; +import { Logger } from "ts-log"; + +import { compareSchedulerApiVersion, getSchedulerApiVersion } from "./scheduleAdapter"; + +export const errorInfo = (reason: string) => + ErrorInfo.create({ domain: "", reason: reason, metadata: {} }); + +/** + * HPC提交作业前检查作业名和应用名是否重复 + * @param client + * @param userId + * @param jobName + * @param logger + */ +export const checkJobNameExisting = async (client: SchedulerAdapterClient,userId: string,jobName: string, + logger: Logger) => { + // 检查作业重名的最低调度器接口版本 + const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 }; + + const scheduleApiVersion = await getSchedulerApiVersion(client, logger); + + if (compareSchedulerApiVersion(scheduleApiVersion,minRequiredApiVersion)) { + const existingJobName = await asyncClientCall(client.job, "getJobs", { + fields: ["job_id"], + filter: { + users: [userId], accounts: [], states: [], jobName, + }, + }).then((resp) => resp.jobs); + + if (existingJobName.length) { + + throw new DetailedError({ + code: Status.ALREADY_EXISTS, + message: `jobName ${jobName} is already existed`, + details: [errorInfo("ALREADY_EXISTS")], + }); + } + } else { + logger.info("Adapter version lower than 1.6.0, do not perform check for duplicate job names"); + } + +}; diff --git a/libs/server/src/scheduleAdapter.ts b/libs/server/src/scheduleAdapter.ts index 40d1c9bf8f..ce69c3d9ef 100644 --- a/libs/server/src/scheduleAdapter.ts +++ b/libs/server/src/scheduleAdapter.ts @@ -16,6 +16,7 @@ import { Status } from "@grpc/grpc-js/build/src/constants"; import { SchedulerAdapterClient } from "@scow/lib-scheduler-adapter"; import { parseErrorDetails } from "@scow/rich-error-model"; import { ApiVersion } from "@scow/utils/build/version"; +import { Logger } from "ts-log"; /** @@ -54,14 +55,7 @@ export async function checkSchedulerApiVersion(client: SchedulerAdapterClient, if (scheduleApiVersion) { // 检查调度器接口版本是否大于等于最低要求版本 - let geMinVersion: boolean; - if (scheduleApiVersion.major !== minVersion.major) { - geMinVersion = (scheduleApiVersion.major > minVersion.major); - } else if (scheduleApiVersion.minor !== minVersion.minor) { - geMinVersion = (scheduleApiVersion.minor > minVersion.minor); - } else { - geMinVersion = true; - } + const geMinVersion = compareSchedulerApiVersion(scheduleApiVersion,minVersion); if (!geMinVersion) { throw { @@ -74,5 +68,38 @@ export async function checkSchedulerApiVersion(client: SchedulerAdapterClient, } as ServiceError; } } - }; + + +export function compareSchedulerApiVersion(scheduleApiVersion: ApiVersion, minVersion: ApiVersion): boolean { + let geMinVersion: boolean; + if (scheduleApiVersion.major !== minVersion.major) { + geMinVersion = (scheduleApiVersion.major > minVersion.major); + } else if (scheduleApiVersion.minor !== minVersion.minor) { + geMinVersion = (scheduleApiVersion.minor > minVersion.minor); + } else { + geMinVersion = true; + } + + return geMinVersion; +} + +export async function getSchedulerApiVersion(client: SchedulerAdapterClient, logger: Logger): Promise { + let scheduleApiVersion: ApiVersion; + try { + scheduleApiVersion = await asyncClientCall(client.version, "getVersion", {}); + } catch (e) { + // 适配器请求连接失败的处理 + if (((e as any).code === status.CANCELLED)) { + throw e; + } + + // 如果找不到获取版本号的接口,指定版本为接口存在前的最新版1.0.0 + scheduleApiVersion = { major: 1, minor: 0, patch: 0 }; + logger.info("The scheduler API version can not be confirmed"); + } + + return scheduleApiVersion; +} + + diff --git a/libs/web/src/layouts/base/header/BigScreenMenu.tsx b/libs/web/src/layouts/base/header/BigScreenMenu.tsx index d0b2f0ac45..bdff5e45ce 100644 --- a/libs/web/src/layouts/base/header/BigScreenMenu.tsx +++ b/libs/web/src/layouts/base/header/BigScreenMenu.tsx @@ -45,7 +45,7 @@ interface Props { } export const BigScreenMenu: React.FC = ({ - routes, className, activeKeys, + routes, className, activeKeys, pathname, }) => { const router = useRouter();