Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ protected int checkPendingJob(Job job) {
try {
String queue = getQueue(tool);
logger.info("Queue job '{}' on queue '{}'", job.getId(), queue);
batchExecutor.execute(job.getId(), queue, authenticatedCommandLine, stdout, stderr);
batchExecutor.execute(job.getStudy().getId(), job.getId(), queue, authenticatedCommandLine, stdout, stderr);
} catch (Exception e) {
logger.error("Error executing job {}.", job.getId(), e);
return abortJob(job, "Error executing job. " + e.getMessage());
Expand Down Expand Up @@ -921,7 +921,7 @@ private Enums.ExecutionStatus getCurrentStatus(Job job) {
}
}

String status = batchExecutor.getStatus(job.getId());
String status = batchExecutor.getStatus(job.getStudy().getId(), job.getId());
if (!StringUtils.isEmpty(status) && !status.equals(Enums.ExecutionStatus.UNKNOWN)) {
return new Enums.ExecutionStatus(status);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ private BatchClient createBatchClient() {
}

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
public void execute(String studyId, String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
// submitAzureTask(job, token);
}

@Override
public String getStatus(String jobId) {
public String getStatus(String studyId, String jobId) {
return null;
// try {
// CloudTask cloudTask = batchClient.taskOperations().getTask(getAzureJobType(job.getType()), job.getId());
Expand All @@ -118,17 +118,17 @@ public String getStatus(String jobId) {
}

@Override
public boolean stop(String jobId) throws Exception {
public boolean stop(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean resume(String jobId) throws Exception {
public boolean resume(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean kill(String jobId) throws Exception {
public boolean kill(String studyId, String jobId) throws Exception {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public interface BatchExecutor {
String OUT_LOG_EXTENSION = ".out";
String ERR_LOG_EXTENSION = ".err";

void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception;
void execute(String studyId, String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception;

String getStatus(String jobId);
String getStatus(String studyId, String jobId);

boolean stop(String jobId) throws Exception;
boolean stop(String studyId, String jobId) throws Exception;

boolean resume(String jobId) throws Exception;
boolean resume(String studyId, String jobId) throws Exception;

boolean kill(String jobId) throws Exception;
boolean kill(String studyId, String jobId) throws Exception;

default boolean canBeQueued() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ public void onClose(KubernetesClientException e) {
}

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
String jobName = buildJobName(jobId);
public void execute(String studyId, String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
String jobName = buildJobName(studyId, jobId);
final io.fabric8.kubernetes.api.model.batch.Job k8sJob = new JobBuilder()
.withApiVersion("batch/v1")
.withKind("Job")
Expand Down Expand Up @@ -305,12 +305,20 @@ private boolean shouldAddDockerDaemon(String queue) {
* DNS-1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must
* start and end with an alphanumeric character (e.g. 'example.com', regex used for validation
* is '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*')
* @param jobIdInput job Is
*
* @param studyId Study id
* @param jobIdInput job Id
* @link https://github.com/kubernetes/kubernetes/blob/c560907/staging/src/k8s.io/apimachinery/pkg/util/validation/validation.go#L135
* @return valid name
*/
protected static String buildJobName(String jobIdInput) {
String jobId = jobIdInput.replace("_", "-");
protected static String buildJobName(String studyId, String jobIdInput) {
String jobId;
if (studyId != null) {
jobId = studyId.substring(studyId.indexOf(":") + 1) + "-" + jobIdInput;
jobIdInput = studyId + "-" + jobIdInput;
} else {
jobId = jobIdInput;
}
int[] invalidChars = jobId
.chars()
.filter(c -> c != '-' && !StringUtils.isAlphanumeric(String.valueOf((char) c)))
Expand All @@ -335,40 +343,47 @@ protected static String buildJobName(String jobIdInput) {
}

@Override
public String getStatus(String jobId) {
String k8sJobName = buildJobName(jobId);
String status = jobStatusCache.compute(k8sJobName, (k, v) -> {
if (v == null) {
logger.warn("Missing job " + k8sJobName + " in cache. Fetch JOB info");
return Pair.of(Instant.now(), getStatusForce(k));
} else if (v.getKey().until(Instant.now(), ChronoUnit.MINUTES) > 10) {
String newStatus = getStatusForce(k);
String oldStatus = v.getValue();
if (!oldStatus.equals(newStatus)) {
logger.warn("Update job " + k8sJobName + " from status cache. Change from " + oldStatus + " to " + newStatus);
} else {
logger.debug("Update job " + k8sJobName + " from status cache. Status unchanged");
}
return Pair.of(Instant.now(), newStatus);
}
return v;
}).getValue();
public String getStatus(String studyId, String jobId) {
String k8sJobName = buildJobName(studyId, jobId);
String status = jobStatusCache.compute(k8sJobName, this::updateStatus).getValue();
if (Objects.equals(status, Enums.ExecutionStatus.ABORTED)) {
// FIXME: Keep compatibility with running k8s jobs without studyId. This should be removed
// See #TASK-1384
status = jobStatusCache.compute(buildJobName(null, jobId), this::updateStatus).getValue();
}
logger.debug("Get status from job " + k8sJobName + ". Cache size: " + jobStatusCache.size() + " . Status: " + status);
return status;
}

private Pair<Instant, String> updateStatus(String k8sJobName, Pair<Instant, String> v) {
if (v == null) {
logger.warn("Missing job " + k8sJobName + " in cache. Fetch JOB info");
return Pair.of(Instant.now(), getStatusForce(k8sJobName));
} else if (v.getKey().until(Instant.now(), ChronoUnit.MINUTES) > 10) {
String newStatus = getStatusForce(k8sJobName);
String oldStatus = v.getValue();
if (!oldStatus.equals(newStatus)) {
logger.warn("Update job " + k8sJobName + " from status cache. Change from " + oldStatus + " to " + newStatus);
} else {
logger.debug("Update job " + k8sJobName + " from status cache. Status unchanged");
}
return Pair.of(Instant.now(), newStatus);
}
return v;
}

@Override
public boolean stop(String jobId) throws Exception {
public boolean stop(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean resume(String jobId) throws Exception {
public boolean resume(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean kill(String jobId) throws Exception {
public boolean kill(String studyId, String jobId) throws Exception {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
}

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
public void execute(String studyId, String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
jobStatus.put(jobId, Enums.ExecutionStatus.QUEUED);
Runnable runnable = () -> {
try {
Expand Down Expand Up @@ -119,22 +119,22 @@ private static synchronized int nextThreadNum() {
}

@Override
public String getStatus(String jobId) {
public String getStatus(String studyId, String jobId) {
return jobStatus.getOrDefault(jobId, Enums.ExecutionStatus.UNKNOWN);
}

@Override
public boolean stop(String jobId) throws Exception {
public boolean stop(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean resume(String jobId) throws Exception {
public boolean resume(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean kill(String jobId) throws Exception {
public boolean kill(String studyId, String jobId) throws Exception {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ public SGEExecutor(Execution execution) {
}

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
public void execute(String studyId, String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
sgeManager.queueJob(jobId, "", -1, getCommandLine(commandLine, stdout, stderr), null);
}

@Override
public String getStatus(String jobId) {
public String getStatus(String studyId, String jobId) {
return Enums.ExecutionStatus.UNKNOWN;
// String status;
// try {
Expand Down Expand Up @@ -70,17 +70,17 @@ public String getStatus(String jobId) {
}

@Override
public boolean stop(String jobId) throws Exception {
public boolean stop(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean resume(String jobId) throws Exception {
public boolean resume(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean kill(String jobId) throws Exception {
public boolean kill(String studyId, String jobId) throws Exception {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,13 +512,13 @@ private static class DummyBatchExecutor implements BatchExecutor {
public Map<String, String> jobStatus = new HashMap<>();

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
public void execute(String studyId, String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
System.out.println("Executing job " + jobId + " --- " + commandLine);
jobStatus.put(jobId, Enums.ExecutionStatus.QUEUED);
}

@Override
public String getStatus(String jobId) {
public String getStatus(String studyId, String jobId) {
return jobStatus.getOrDefault(jobId, Enums.ExecutionStatus.UNKNOWN);
}

Expand All @@ -528,17 +528,17 @@ public boolean isExecutorAlive() {
}

@Override
public boolean stop(String jobId) throws Exception {
public boolean stop(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean resume(String jobId) throws Exception {
public boolean resume(String studyId, String jobId) throws Exception {
return false;
}

@Override
public boolean kill(String jobId) throws Exception {
public boolean kill(String studyId, String jobId) throws Exception {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,40 @@ public class K8SExecutorTest {

@Test
public void testBuildJobName() {
assertEquals("opencga-job-really-complicated-j-b-2---id-e8b6f0",
K8SExecutor.buildJobName("really_Complicated J@b 2£$ ID"));
assertEquals("opencga-job-study-really-complicated-j-b-2---id-80a79e",
K8SExecutor.buildJobName("user@project:study", "really_Complicated J@b 2£$ ID"));

String jobName = K8SExecutor.buildJobName("really_Complicated and extra super duper large job name for a simple task 20210323122209");
String expected = "opencga-job-really-complica-1d9128--simple-task-20210323122209";
String jobName = K8SExecutor.buildJobName("user@project:study", "really_Complicated and extra super duper large job name for a simple task 20210323122209");
String expected = "opencga-job-study-really-co-89b6a8--simple-task-20210323122209";
assertEquals(expected, jobName);

assertEquals("opencga-job-my-job-1-92b223", K8SExecutor.buildJobName("my job 1"));
assertEquals("opencga-job-my-job-1-f3eb13", K8SExecutor.buildJobName("my_job_1"));
assertEquals("opencga-job-my-job-1-672a51", K8SExecutor.buildJobName("my:job:1"));
assertEquals("opencga-job-my-job-1-94caca", K8SExecutor.buildJobName("My-job-1"));
assertEquals("opencga-job-my-job-1", K8SExecutor.buildJobName("my-job-1"));

assertEquals("opencga-job-abcdefghijklmnopqrstuvwxyz12345678900987654321", K8SExecutor.buildJobName("abcdefghijklmnopqrstuvwxyz12345678900987654321"));
assertEquals("opencga-job-abcdefghijklmno-cb3af7-tuvwxyz12345678900987654321", K8SExecutor.buildJobName("ABCDEFGHIJKLMNOPQRSTUVWXYZ12345678900987654321"));
assertEquals("opencga-job-abcdefghijklmno-47a86e-stuvwxyz1234567890098765432", K8SExecutor.buildJobName("ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890098765432"));
assertEquals("opencga-job-abcdefghijklmno-9c2366-rstuvwxyz123456789009876543", K8SExecutor.buildJobName("ABCDEFGHIJKLMNOPQRSTUVWXYZ123456789009876543"));
assertEquals("opencga-job-abcdefghijklmnopqrstuvwxyz12345678900987654-890ca6", K8SExecutor.buildJobName("ABCDEFGHIJKLMNOPQRSTUVWXYZ12345678900987654"));
assertEquals("opencga-job-abcdefghijklmnopqrstuvwxyz1234567890-016fd6", K8SExecutor.buildJobName("ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"));
assertEquals("opencga-job-my-job-1-92b223", K8SExecutor.buildJobName(null, "my job 1"));
assertEquals("opencga-job-my-job-1-f3eb13", K8SExecutor.buildJobName(null, "my_job_1"));
assertEquals("opencga-job-my-job-1-672a51", K8SExecutor.buildJobName(null, "my:job:1"));
assertEquals("opencga-job-my-job-1-94caca", K8SExecutor.buildJobName(null, "My-job-1"));
assertEquals("opencga-job-my-job-1", K8SExecutor.buildJobName(null, "my-job-1"));

assertEquals("opencga-job-study-my-job-1-39dd81", K8SExecutor.buildJobName("user@project:study", "my job 1"));
assertEquals("opencga-job-study-my-job-1-0bb01d", K8SExecutor.buildJobName("user@project:study", "my_job_1"));
assertEquals("opencga-job-study-my-job-1-75954c", K8SExecutor.buildJobName("user@project:study", "my:job:1"));
assertEquals("opencga-job-study-my-job-1-d95244", K8SExecutor.buildJobName("user@project:study", "My-job-1"));
assertEquals("opencga-job-study-my-job-1-ecd290", K8SExecutor.buildJobName("user@project:study", "my-job-1"));
assertEquals("opencga-job-study-my-job-1-354a5a", K8SExecutor.buildJobName("user2@project:study", "my-job-1"));
assertEquals("opencga-job-study-my-job-1-da217f", K8SExecutor.buildJobName("user@project2:study", "my-job-1"));

assertEquals("opencga-job-abcdefghijklmnopqrstuvwxyz12345678900987654321", K8SExecutor.buildJobName(null, "abcdefghijklmnopqrstuvwxyz12345678900987654321"));
assertEquals("opencga-job-abcdefghijklmno-cb3af7-tuvwxyz12345678900987654321", K8SExecutor.buildJobName(null, "ABCDEFGHIJKLMNOPQRSTUVWXYZ12345678900987654321"));
assertEquals("opencga-job-abcdefghijklmno-47a86e-stuvwxyz1234567890098765432", K8SExecutor.buildJobName(null, "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890098765432"));
assertEquals("opencga-job-abcdefghijklmno-9c2366-rstuvwxyz123456789009876543", K8SExecutor.buildJobName(null, "ABCDEFGHIJKLMNOPQRSTUVWXYZ123456789009876543"));
assertEquals("opencga-job-abcdefghijklmnopqrstuvwxyz12345678900987654-890ca6", K8SExecutor.buildJobName(null, "ABCDEFGHIJKLMNOPQRSTUVWXYZ12345678900987654"));
assertEquals("opencga-job-abcdefghijklmnopqrstuvwxyz1234567890-016fd6", K8SExecutor.buildJobName(null, "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"));

assertEquals("opencga-job-s-abcdefghijklmnopqrstuvwxyz12345678900987654321", K8SExecutor.buildJobName("s", "abcdefghijklmnopqrstuvwxyz12345678900987654321"));
assertEquals("opencga-job-s-abcdefghijklm-adb5ea-tuvwxyz12345678900987654321", K8SExecutor.buildJobName("s", "ABCDEFGHIJKLMNOPQRSTUVWXYZ12345678900987654321"));
assertEquals("opencga-job-s-abcdefghijklm-2cc773-stuvwxyz1234567890098765432", K8SExecutor.buildJobName("s", "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890098765432"));
assertEquals("opencga-job-s-abcdefghijklm-6fb1b7-rstuvwxyz123456789009876543", K8SExecutor.buildJobName("s", "ABCDEFGHIJKLMNOPQRSTUVWXYZ123456789009876543"));
assertEquals("opencga-job-s-abcdefghijklmnopqrstuvwxyz123456789009876-a64d4a", K8SExecutor.buildJobName("s", "ABCDEFGHIJKLMNOPQRSTUVWXYZ123456789009876"));
assertEquals("opencga-job-s-abcdefghijklmnopqrstuvwxyz1234567890-72eaa2", K8SExecutor.buildJobName("s", "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ public void setUp() throws Exception {
@Test(timeout = 10000)
public void test() throws Exception {
System.out.println(rootDir.toAbsolutePath());
String study = "study";
for (int i = 0; i < 10; i++) {
localExecutor.execute("jobId-" + i, "default", "echo Hello World", rootDir.resolve("out_" + i + ".txt"), rootDir.resolve("err_" + i + ".txt"));
localExecutor.execute(study, "jobId-" + i, "default", "echo Hello World", rootDir.resolve("out_" + i + ".txt"), rootDir.resolve("err_" + i + ".txt"));
}

for (int i = 0; i < 10; i++) {
String jobId = "jobId-" + i;
while(!localExecutor.getStatus(jobId).equals("DONE")) {
while(!localExecutor.getStatus(study, jobId).equals("DONE")) {
Thread.sleep(1000);
}
Assert.assertTrue(Files.exists(rootDir.resolve("out_" + i + ".txt")));
Expand Down