Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ai和hpc在提交作业和应用前检查一下是否重名 #1375

Merged
merged 26 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/tasty-files-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@scow/portal-server": patch
"@scow/portal-web": patch
"@scow/ai": patch
---

ai 和 hpc 在提交作业和应用前检查一下是否重名
22 changes: 20 additions & 2 deletions apps/ai/src/server/trpc/route/jobs/apps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,24 @@ export const createAppSession = procedure
model, mountPoints = [], account, partition, coreCount, nodeCount, gpuCount, memory,
maxTime, workingDirectory, customAttributes, gpuType } = input;

const userId = user.identityId;
const client = getAdapterClient(clusterId);

// 检查是否存在同名的作业
const existingJobName = await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [],states: [],jobName:appJobName,
OYX-1 marked this conversation as resolved.
Show resolved Hide resolved
},
}).then((resp) => resp.jobs);

if (existingJobName.length) {
throw new TRPCError({
code: "CONFLICT",
message: `appJobName ${appJobName} is already existed`,
});
}

const apps = getClusterAppConfigs(clusterId);
const app = checkAppExist(apps, appId);

Expand Down Expand Up @@ -381,7 +399,7 @@ export const createAppSession = procedure
throw clusterNotFound(clusterId);
}

const userId = user.identityId;

return await sshConnect(host, userId, logger, async (ssh) => {
const homeDir = await getUserHomedir(ssh, userId, logger);

Expand Down Expand Up @@ -470,7 +488,7 @@ export const createAppSession = procedure

// 将entry.sh写入后将路径传给适配器后启动容器
await sftpWriteFile(sftp)(remoteEntryPath, entryScript);
const client = getAdapterClient(clusterId);

const reply = await asyncClientCall(client.job, "submitJob", {
userId,
jobName: appJobName,
Expand Down
18 changes: 17 additions & 1 deletion apps/ai/src/server/trpc/route/jobs/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,23 @@ procedure
throw clusterNotFound(clusterId);
}

const client = getAdapterClient(clusterId);

// 检查是否存在同名的作业
const existingJobName = await asyncClientCall(client.job, "getJobs", {
fields: ["job_id"],
filter: {
users: [userId], accounts: [],states: [],jobName:trainJobName,
},
}).then((resp) => resp.jobs);

if (existingJobName.length) {
throw new TRPCError({
code: "CONFLICT",
message: `trainJobName ${trainJobName} is already existed`,
});
}
OYX-1 marked this conversation as resolved.
Show resolved Hide resolved

const em = await forkEntityManager();
const {
datasetVersion,
Expand Down Expand Up @@ -189,7 +206,6 @@ procedure
const entryScript = command;
await sftpWriteFile(sftp)(remoteEntryPath, entryScript);

const client = getAdapterClient(clusterId);
const reply = await asyncClientCall(client.job, "submitJob", {
userId,
jobName: trainJobName,
Expand Down
39 changes: 31 additions & 8 deletions apps/portal-server/src/clusterops/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import { Status } from "@grpc/grpc-js/build/src/constants";
import { AppType } from "@scow/config/build/app";
import { getPlaceholderKeys } from "@scow/lib-config/build/parse";
import { formatTime } from "@scow/lib-scheduler-adapter";
import { getAppConnectionInfoFromAdapter, getEnvVariables } from "@scow/lib-server";
import { checkJobNameExisting, errorInfo, getAppConnectionInfoFromAdapter,getEnvVariables } from "@scow/lib-server";
import { getUserHomedir,
sftpChmod, sftpExists, sftpReaddir, sftpReadFile, sftpRealPath, sftpWriteFile } from "@scow/lib-ssh";
import { DetailedError, ErrorInfo, parseErrorDetails } from "@scow/rich-error-model";
import { DetailedError, parseErrorDetails } from "@scow/rich-error-model";
import { JobInfo, SubmitJobRequest } from "@scow/scheduler-adapter-protos/build/protos/job";
import fs from "fs";
import { join } from "path";
Expand Down Expand Up @@ -63,9 +63,6 @@ const VNC_SESSION_INFO = "VNC_SESSION_INFO";
const APP_LAST_SUBMISSION_INFO = "last_submission.json";
const BIN_BASH_SCRIPT_HEADER = "#!/bin/bash -l\n";

const errorInfo = (reason: string) =>
ErrorInfo.create({ domain: "", reason: reason, metadata: {} });

export const appOps = (cluster: string): AppOps => {

const host = getClusterLoginNode(cluster);
Expand All @@ -79,8 +76,36 @@ export const appOps = (cluster: string): AppOps => {
const { appId, userId, account, coreCount, nodeCount, gpuCount, memory, maxTime, proxyBasePath,
partition, qos, customAttributes, appJobName } = request;

const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined;
const jobName = appJobName;

// 检查作业名是否重复
await callOnOne(
cluster,
logger,
async (client) => {
await checkJobNameExisting(client,userId,jobName,logger);
},
).catch((e) => {
const ex = e as ServiceError;
const errors = parseErrorDetails(ex.metadata);
if (errors[0] && errors[0].$type === "google.rpc.ErrorInfo"
&& errors[0].reason === "ALREADY_EXISTS") {
throw new DetailedError({
code: Status.ALREADY_EXISTS,
message: ex.details,
details: [errorInfo("ALREADY_EXISTS")],
});
}
else {
throw new DetailedError({
code: ex.code,
message: ex.details,
details: [errorInfo("SBATCH_FAILED")],
});
}
});

const memoryMb = memory ? Number(memory.slice(0, -2)) : undefined;

const userSbatchOptions = customAttributes.sbatchOptions
? splitSbatchArgs(customAttributes.sbatchOptions)
Expand All @@ -97,8 +122,6 @@ export const appOps = (cluster: string): AppOps => {
});
}

const jobName = appJobName;

const workingDirectory = join(portalConfig.appJobsDir, jobName);

const lastSubmissionDirectory = join(portalConfig.appLastSubmissionDir, appId);
Expand Down
11 changes: 10 additions & 1 deletion apps/portal-server/src/services/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { ServiceError } from "@ddadaal/tsgrpc-common";
import { plugin } from "@ddadaal/tsgrpc-server";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { jobInfoToPortalJobInfo, jobInfoToRunningjob } from "@scow/lib-scheduler-adapter";
import { checkSchedulerApiVersion } from "@scow/lib-server";
import { checkJobNameExisting, checkSchedulerApiVersion } from "@scow/lib-server";
import { createDirectoriesRecursively, sftpReadFile, sftpStat, sftpWriteFile } from "@scow/lib-ssh";
import { AccountStatusFilter, JobServiceServer, JobServiceService, TimeUnit } from "@scow/protos/build/portal/job";
import { parseErrorDetails } from "@scow/rich-error-model";
Expand Down Expand Up @@ -233,6 +233,15 @@ export const jobServiceServer = plugin((server) => {
, errorOutput, memory, scriptOutput } = request;
await checkActivatedClusters({ clusterIds: cluster });

// 检查作业名是否重复
await callOnOne(
cluster,
logger,
async (client) => {
await checkJobNameExisting(client,userId,jobName,logger);
},
);

// make sure working directory exists
const host = getClusterLoginNode(cluster);
if (!host) { throw clusterNotFound(cluster); }
Expand Down
9 changes: 8 additions & 1 deletion apps/portal-web/src/pageComponents/app/LaunchAppForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export const LaunchAppForm: React.FC<Props> = ({ clusterId, appId, attributes, a
maxTime,
customAttributes: customFormKeyValue,
} })
.httpError(409, (e) => {
.httpError(500, (e) => {
if (e.code === "SBATCH_FAILED") {
createErrorModal(e.message);
} else {
Expand All @@ -137,6 +137,13 @@ export const LaunchAppForm: React.FC<Props> = ({ clusterId, appId, attributes, a
throw e;
}
})
.httpError(409, (e) => {
if (e.code === "ALREADY_EXISTS") {
createErrorModal(e.message);
} else {
throw e;
}
})
.then(() => {
message.success(t(p("successMessage")));
Router.push(`/apps/${clusterId}/sessions`);
Expand Down
10 changes: 10 additions & 0 deletions apps/portal-web/src/pageComponents/job/SubmitJobForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ export const SubmitJobForm: React.FC<Props> = ({ initial = initialValues, submit
throw e;
}
})
.httpError(409, (e) => {
if (e.code === "ALREADY_EXISTS") {
modal.error({
title: t(p("errorMessage")),
content: e.message,
});
} else {
throw e;
}
})
.then(({ jobId }) => {
message.success(t(p("successMessage")) + jobId);
Router.push("/jobs/runningJobs");
Expand Down
11 changes: 9 additions & 2 deletions apps/portal-web/src/pages/api/app/createAppSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ export const CreateAppSessionSchema = typeboxRouteSchema({
message: Type.String(),
}),

409: Type.Object({
code: Type.Literal("ALREADY_EXISTS"),
message: Type.String(),
}),

404: Type.Object({
code: Type.Literal("APP_NOT_FOUND"),
message: Type.String(),
}),

409: Type.Object({
500: Type.Object({
code: Type.Literal("SBATCH_FAILED"),
message: Type.String(),
}),
Expand Down Expand Up @@ -126,11 +131,13 @@ export default /* #__PURE__*/route(CreateAppSessionSchema, async (req, res) => {
if (errors[0] && errors[0].$type === "google.rpc.ErrorInfo") {
// Map the ErrorInfo reason attached by portal-server onto the HTTP responses
// declared in CreateAppSessionSchema.
switch (errors[0].reason) {
  case "SBATCH_FAILED":
    return { 500: { code: "SBATCH_FAILED" as const, message: ex.details } };
  case "NOT FOUND":
    return { 404: { code: "APP_NOT_FOUND" as const, message: ex.details } };
  case "INVALID ARGUMENT":
    return { 400: { code: "INVALID_INPUT" as const, message: ex.details } };
  // The duplicate-job-name check reports the reason as "ALREADY_EXISTS"
  // (underscore). The spaced spelling is kept as well so that the 409 branch
  // is reached whichever form the server sends.
  case "ALREADY_EXISTS":
  case "ALREADY EXISTS":
    return { 409: { code: "ALREADY_EXISTS" as const, message: ex.details } };
  default:
    return e;
}
Expand Down
6 changes: 6 additions & 0 deletions apps/portal-web/src/pages/api/job/submitJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ export const SubmitJobSchema = typeboxRouteSchema({
message: Type.String(),
}),

409: Type.Object({
code: Type.Literal("ALREADY_EXISTS"),
message: Type.String(),
}),

500: Type.Object({
code: Type.Literal("SCHEDULER_FAILED"),
message: Type.String(),
Expand Down Expand Up @@ -137,6 +142,7 @@ export default route(SubmitJobSchema, async (req, res) => {
.catch(handlegRPCError({
[status.INTERNAL]: (err) => ({ 500: { code: "SCHEDULER_FAILED", message: err.details } } as const),
[status.NOT_FOUND]: (err) => ({ 404: { code: "NOT_FOUND", message: err.details } } as const),
[status.ALREADY_EXISTS]: (err) => ({ 409: { code: "ALREADY_EXISTS", message: err.details } } as const),
},
async () => await callLog(
{ ...logInfo,
Expand Down
1 change: 1 addition & 0 deletions libs/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
export * from "./apiAuthPlugin";
export * from "./app";
export * from "./date";
export * from "./job";
export * from "./misCommon/clustersActivation";
export * from "./scheduleAdapter";
export * from "./systemLanguage";
Expand Down
59 changes: 59 additions & 0 deletions libs/server/src/job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright (c) 2022 Peking University and Peking University Institute for Computing and Digital Economy
* SCOW is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/

import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { SchedulerAdapterClient } from "@scow/lib-scheduler-adapter";
import { DetailedError, ErrorInfo } from "@scow/rich-error-model";
import { ApiVersion } from "@scow/utils/build/version";
import { Logger } from "ts-log";

import { compareSchedulerApiVersion, getSchedulerApiVersion } from "./scheduleAdapter";

/**
 * Builds an ErrorInfo detail carrying the given reason, with an empty domain
 * and no metadata.
 *
 * @param reason machine-readable reason string stored on the ErrorInfo
 * @returns the value produced by `ErrorInfo.create`
 */
export const errorInfo = (reason: string) => {
  return ErrorInfo.create({ domain: "", reason, metadata: {} });
};

/**
 * Checks, before an HPC job or app is submitted, whether the user already has
 * a job with the same name.
 *
 * The lookup only runs when the scheduler adapter's API version passes the
 * comparison against 1.6.0; otherwise the check is skipped and an info
 * message is logged.
 *
 * @param client scheduler adapter client used to read the API version and query jobs
 * @param userId user whose jobs are searched
 * @param jobName job (or app job) name to look for
 * @param logger receives the info message when the check is skipped
 * @throws DetailedError with code `ALREADY_EXISTS` and an "ALREADY_EXISTS"
 *   ErrorInfo detail when `getJobs` returns at least one job
 */
export const checkJobNameExisting = async (client: SchedulerAdapterClient,userId: string,jobName: string,
  logger: Logger) => {
  // Minimum scheduler adapter API version required for the duplicate-name check
  const minRequiredApiVersion: ApiVersion = { major: 1, minor: 6, patch: 0 };

  const adapterApiVersion = await getSchedulerApiVersion(client, logger);
  const canCheckJobName = compareSchedulerApiVersion(adapterApiVersion, minRequiredApiVersion);

  if (!canCheckJobName) {
    logger.info("Adapter version lower than 1.6.0, do not perform check for duplicate job names");
    return;
  }

  // Only the job id is requested; the result is used purely as an existence test.
  const { jobs: sameNameJobs } = await asyncClientCall(client.job, "getJobs", {
    fields: ["job_id"],
    filter: {
      users: [userId],
      accounts: [],
      states: [],
      jobName,
    },
  });

  if (sameNameJobs.length > 0) {
    throw new DetailedError({
      code: Status.ALREADY_EXISTS,
      message: `jobName ${jobName} is already existed`,
      details: [errorInfo("ALREADY_EXISTS")],
    });
  }
};
Loading
Loading