Commit 87577be

[HWORKS-881] Validate SparkJob and Python job application file exists before launching job (#1457)
robzor92 authored Jan 17, 2024
1 parent 8b89075 commit 87577be
Showing 3 changed files with 61 additions and 49 deletions.
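Summary of the change: inspectDependencies in the Spark controller now also verifies that the job's application file (appPath) exists, so starting a Spark or Python job whose application file has been deleted fails fast with FILE_NOT_FOUND (HTTP 400) before the job is launched. The Jupyter call site passes the new isJob flag as false, keeping only the existing spark.yarn.dist.* dependency checks for notebook servers.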
hopsworks-IT/src/test/ruby/spec/execution_spec.rb (93 changes: 49 additions & 44 deletions)
@@ -189,74 +189,79 @@ def run_job_and_get_logs(project, job_name)
       before :all do
         with_valid_tour_project("spark")
       end
+      before :each do
+        $job_name = "j_#{short_random_id}"
+      end
+      after :each do
+        clean_jobs(@project[:id])
+      end
+      it "should start a job and use default args" do
+        create_sparktour_job(@project, $job_name, type)
+        default_args = json_body[:config][:defaultArgs]
+        start_execution(@project[:id], $job_name)
+        execution_id = json_body[:id]
+        begin
+          get_execution(@project[:id], $job_name, execution_id)
+          expect(json_body[:args]).not_to be_nil
+          expect(default_args).not_to be_nil
+          expect(json_body[:args]).to eq default_args
+        ensure
+          wait_for_execution_completed(@project[:id], $job_name, execution_id, "FINISHED")
+        end
+      end
+      it "should start a job with args 123" do
+        create_sparktour_job(@project, $job_name, type)
+        args = "123"
+        start_execution(@project[:id], $job_name, args: args)
+        execution_id = json_body[:id]
+        begin
+          get_execution(@project[:id], $job_name, execution_id)
+          expect(json_body[:args]).to eq args
+        ensure
+          wait_for_execution_completed(@project[:id], $job_name, execution_id, "FINISHED")
+        end
+      end
+      it "should fail to start a spark job with missing appPath" do
+        create_sparktour_job(@project, $job_name, type)
+        config = json_body[:config]
+
+        create_sparktour_job(@project, $job_name, type, job_conf: config, expected_status: 200)
+        delete_dataset(@project, config[:appPath], datasetType: "?type=DATASET")
+        #start execution
+        start_execution(@project[:id], $job_name, expected_status: 400)
+      end
       it "should fail to start a spark job with missing spark.yarn.dist.files" do
-        $job_name_2 = "demo_job_2_" + type
-        create_sparktour_job(@project, $job_name_2, type)
+        create_sparktour_job(@project, $job_name, type)
         config = json_body[:config]
         config[:'spark.yarn.dist.files'] = "hdfs:///Projects/#{@project[:projectname]}/Resources/iamnothere.txt"
-        create_sparktour_job(@project, $job_name_2, type, job_conf: config, expected_status: 200)
+        create_sparktour_job(@project, $job_name, type, job_conf: config, expected_status: 200)
         #start execution
-        start_execution(@project[:id], $job_name_2, expected_status: 400)
+        start_execution(@project[:id], $job_name, expected_status: 400)
       end
       it "should fail to start a spark job with missing spark.yarn.dist.pyFiles" do
-        $job_name_2 = "demo_job_2_" + type
-        create_sparktour_job(@project, $job_name_2, type)
+        create_sparktour_job(@project, $job_name, type)
         config = json_body[:config]
         config[:'spark.yarn.dist.pyFiles'] = "hdfs:///Projects/#{@project[:projectname]}/Resources/iamnothere.py"
-        create_sparktour_job(@project, $job_name_2, type, job_conf: config, expected_status: 200)
+        create_sparktour_job(@project, $job_name, type, job_conf: config, expected_status: 200)
         #start execution
-        start_execution(@project[:id], $job_name_2, expected_status: 400)
+        start_execution(@project[:id], $job_name, expected_status: 400)
       end
       it "should fail to start a spark job with missing spark.yarn.dist.jars" do
-        $job_name_2 = "demo_job_2_" + type
-        create_sparktour_job(@project, $job_name_2, type)
+        create_sparktour_job(@project, $job_name, type)
         config = json_body[:config]
         config[:'spark.yarn.dist.jars'] = "hdfs:///Projects/#{@project[:projectname]}/Resources/iamnothere.jar"
-        create_sparktour_job(@project, $job_name_2, type, job_conf: config, expected_status: 200)
+        create_sparktour_job(@project, $job_name, type, job_conf: config, expected_status: 200)
         #start execution
-        start_execution(@project[:id], $job_name_2, expected_status: 400)
+        start_execution(@project[:id], $job_name, expected_status: 400)
       end
       it "should fail to start a spark job with missing spark.yarn.dist.archives" do
-        $job_name_2 = "demo_job_2_" + type
-        create_sparktour_job(@project, $job_name_2, type)
+        create_sparktour_job(@project, $job_name, type)
         config = json_body[:config]
         config[:'spark.yarn.dist.archives'] = "hdfs:///Projects/#{@project[:projectname]}/Resources/iamnothere.zip"
-        create_sparktour_job(@project, $job_name_2, type, job_conf: config, expected_status: 200)
-
+        create_sparktour_job(@project, $job_name, type, job_conf: config, expected_status: 200)
         #start execution
-        start_execution(@project[:id], $job_name_2, expected_status: 400)
-      end
-      it "should start a job and use default args" do
-        $job_name_3 = "demo_job_3_" + type
-        create_sparktour_job(@project, $job_name_3, type)
-        default_args = json_body[:config][:defaultArgs]
-        start_execution(@project[:id], $job_name_3)
-        execution_id = json_body[:id]
-        begin
-          get_execution(@project[:id], $job_name_3, execution_id)
-          expect(json_body[:args]).not_to be_nil
-          expect(default_args).not_to be_nil
-          expect(json_body[:args]).to eq default_args
-        ensure
-          wait_for_execution_completed(@project[:id], $job_name_3, execution_id, "FINISHED")
-        end
-      end
-      it "should start a job with args 123" do
-        $job_name_3 = "demo_job_3_" + type
-        create_sparktour_job(@project, $job_name_3, type)
-        args = "123"
-        start_execution(@project[:id], $job_name_3, args: args)
-        execution_id = json_body[:id]
-        begin
-          get_execution(@project[:id], $job_name_3, execution_id)
-          expect(json_body[:args]).to eq args
-        ensure
-          wait_for_execution_completed(@project[:id], $job_name_3, execution_id, "FINISHED")
-        end
+        start_execution(@project[:id], $job_name, expected_status: 400)
       end
     end
   end
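Note on the spec changes above: the shared demo_job_N_<type> globals are replaced by a per-example $job_name generated in before :each and cleaned up by clean_jobs in after :each, so each example creates and launches its own job. The new appPath example deletes the application file from the project dataset and then asserts that start_execution is rejected with HTTP 400.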
@@ -334,7 +334,7 @@ public Response startNotebookServer(JupyterSettings jupyterSettings, @Context Ht

       //Inspect dependencies
       sparkController
-          .inspectDependencies(project, hopsworksUser, (SparkJobConfiguration) jupyterSettings.getJobConfig());
+          .inspectDependencies(project, hopsworksUser, (SparkJobConfiguration) jupyterSettings.getJobConfig(), false);
       dto = jupyterManager.startJupyterServer(project, hopsworksUser, configSecret, jupyterSettings, allowOrigin);
       jupyterJWTManager.materializeJWT(hopsworksUser, project, jupyterSettings, dto.getCid(), dto.getPort(),
           JUPYTER_JWT_AUD);
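The Jupyter call site passes isJob = false: a notebook server has no job application file to validate, so only the spark.yarn.dist.* entries of its attached Spark configuration are checked, while the new appPath existence check applies to actual job launches (isJob = true in sanityCheck below).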
@@ -174,7 +174,7 @@ private void sanityCheck(Jobs job, Users user) throws GenericException, ProjectE
       throw new IllegalArgumentException("Path does not point to a .jar, .py or .ipynb file.");
     }

-    inspectDependencies(job.getProject(), user, (SparkJobConfiguration) job.getJobConfig());
+    inspectDependencies(job.getProject(), user, (SparkJobConfiguration) job.getJobConfig(), true);

   }

@@ -214,16 +214,23 @@ public SparkJobConfiguration inspectProgram(SparkJobConfiguration existingConfig
     return sparkConfig;
   }

-  public void inspectDependencies(Project project, Users user, SparkJobConfiguration jobConf)
+  public void inspectDependencies(Project project, Users user, SparkJobConfiguration jobConf, boolean isJob)
       throws ProjectException, GenericException {
     DistributedFileSystemOps udfso = null;
     try {
+      if (isJob) {
+        udfso = dfs.getDfsOps(hdfsUsersBean.getHdfsUserName(project, user));
+        if (!udfso.exists(jobConf.getAppPath())) {
+          throw new ProjectException(RESTCodes.ProjectErrorCode.FILE_NOT_FOUND, Level.FINEST,
+              "Job application file does not exist: " + jobConf.getAppPath());
+        }
+      }
+
       if (!Strings.isNullOrEmpty(jobConf.getArchives())
           || !Strings.isNullOrEmpty(jobConf.getFiles())
           || !Strings.isNullOrEmpty(jobConf.getJars())
           || !Strings.isNullOrEmpty(jobConf.getPyFiles())) {

         udfso = dfs.getDfsOps(hdfsUsersBean.getHdfsUserName(project, user));

         if (!Strings.isNullOrEmpty(jobConf.getArchives())) {
           for (String filePath : jobConf.getArchives().split(",")) {
             if (!Strings.isNullOrEmpty(filePath) && !udfso.exists(filePath)) {
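For readers skimming the diff, below is a minimal, self-contained sketch of the validation flow introduced above, using java.nio file checks in place of Hopsworks' DistributedFileSystemOps; the class and method names here are illustrative, not the actual Hopsworks API.

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    public class DependencyCheckSketch {

      // Mirrors the new guard: jobs must point at an existing application
      // file, while Jupyter sessions (isJob == false) skip that check.
      static void inspectDependencies(Path appPath, List<Path> distFiles, boolean isJob) {
        if (isJob && !Files.exists(appPath)) {
          throw new IllegalArgumentException(
              "Job application file does not exist: " + appPath);
        }
        // All distributed dependencies (files/jars/pyFiles/archives) must
        // exist as well, as in the pre-existing loop over the jobConf entries.
        for (Path p : distFiles) {
          if (!Files.exists(p)) {
            throw new IllegalArgumentException("Missing dependency: " + p);
          }
        }
      }

      public static void main(String[] args) {
        // Expected to throw: the application path was never created.
        inspectDependencies(Path.of("/tmp/iamnothere.py"), List.of(), true);
      }
    }

In the real controller the failure is a ProjectException with RESTCodes.ProjectErrorCode.FILE_NOT_FOUND, which surfaces as the HTTP 400 asserted by the Ruby spec above.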
