Skip to content

Commit

Permalink
Remove cloudgene-sdk, improve logging and implement reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
lukfor committed Sep 7, 2023
1 parent 85c0c99 commit 9c69209
Show file tree
Hide file tree
Showing 47 changed files with 807 additions and 391 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,6 @@

<dependencies>

<dependency>
<groupId>cloudgene</groupId>
<artifactId>cloudgene-java-sdk</artifactId>
<version>1.0.2</version>
</dependency>

<dependency>
<groupId>genepi</groupId>
<artifactId>genepi-io</artifactId>
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/cloudgene/mapred/jobs/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void run() {
return;
}

log.info("Setup job " + getId() + "...");
log.info("[Job {}] Setup job...", getId());
setState(AbstractJob.STATE_RUNNING);

setSetupStartTime(System.currentTimeMillis());
Expand All @@ -354,13 +354,13 @@ public void run() {
setSetupEndTime(System.currentTimeMillis());
setSetupRunning(false);
if (!isSetupComplete()) {
log.info("Setup failed for job " + getId() + ". Not added to Long Time Queue.");
log.info("[Job {}] Setup failed." , getId());
setFinishedOn(System.currentTimeMillis());
setComplete(true);
return;
}

log.info("Run job " + getId() + "...");
log.info("[Job {}] Running job...", getId());
setStartTime(System.currentTimeMillis());

try {
Expand Down Expand Up @@ -388,7 +388,7 @@ public void run() {

if (succesfull) {

log.info("Job " + getId() + ": executed successful.");
log.info("[Job {}] Execution successful.", getId());

writeLog("Job Execution successful.");
writeLog("Exporting Data...");
Expand All @@ -402,13 +402,13 @@ public void run() {
if (successfulAfter) {

setState(AbstractJob.STATE_SUCCESS);
log.info("Job " + getId() + ": data export successful.");
log.info("[Job {}] data export successful.", getId());
writeLog("Data Export successful.");

} else {

setState(AbstractJob.STATE_FAILED);
log.error("Job " + getId() + ": data export failed.");
log.error("[Job {}] data export failed.", getId());
writeLog("Data Export failed.");

}
Expand All @@ -421,15 +421,15 @@ public void run() {
String s = writer.toString();

setState(AbstractJob.STATE_FAILED);
log.error("Job " + getId() + ": data export failed.", e);
log.error("[Job {}] data export failed.", getId(), e);
writeLog("Data Export failed: " + e.getLocalizedMessage() + "\n" + s);

}

} else {

setState(AbstractJob.STATE_FAILED);
log.error("Job " + getId() + ": execution failed. " + getError());
log.error("[Job {}] Execution failed. {}", getId(), getError());
writeLog("Job Execution failed: " + getError());

}
Expand All @@ -438,13 +438,13 @@ public void run() {

writeLog("Cleaning up...");
onFailure();
log.info("Job " + getId() + ": cleanup successful.");
log.info("[Job {}]cleanup successful.", getId());
writeLog("Cleanup successful.");

} else {
writeLog("Cleaning up...");
cleanUp();
log.info("Job " + getId() + ": cleanup successful.");
log.info("[Job {}] cleanup successful.", getId());
writeLog("Cleanup successful.");

}
Expand All @@ -460,7 +460,7 @@ public void run() {
} catch (Exception | Error e) {

setState(AbstractJob.STATE_FAILED);
log.error("Job " + getId() + ": initialization failed.", e);
log.error("[Job {}]: initialization failed.", getId(), e);

Writer writer = new StringWriter();
PrintWriter printWriter = new PrintWriter(writer);
Expand All @@ -471,7 +471,7 @@ public void run() {

writeLog("Cleaning up...");
onFailure();
log.info("Job " + getId() + ": cleanup successful.");
log.info("[Job {}]: cleanup successful.", getId());
writeLog("Cleanup successful.");

closeStdOutFiles();
Expand All @@ -484,7 +484,7 @@ public void run() {
public void cancel() {

writeLog("Canceled by user.");
log.info("Job " + getId() + ": canceld by user.");
log.info("[Job {}]: canceld by user.", getId());

/*
* if (state == STATE_RUNNING) { closeStdOutFiles(); }
Expand Down
97 changes: 10 additions & 87 deletions src/main/java/cloudgene/mapred/jobs/CloudgeneContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import java.util.Vector;

import cloudgene.mapred.core.User;
import cloudgene.mapred.util.HashUtil;
import cloudgene.mapred.jobs.sdk.WorkflowContext;
import cloudgene.mapred.util.MailUtil;
import cloudgene.mapred.util.Settings;
import cloudgene.sdk.internal.WorkflowContext;
import genepi.io.FileUtil;

public class CloudgeneContext extends WorkflowContext {
Expand Down Expand Up @@ -42,12 +41,10 @@ public class CloudgeneContext extends WorkflowContext {

private Map<String, Object> data = new HashMap<String, Object>();

private Map<String, String> config;
private Map<String, Object> config;

private int chunks = 0;

private Map<String, List<Download>> customDownloads = new HashMap<String, List<Download>>();

public CloudgeneContext(CloudgeneJob job) {

this.workingDirectory = job.getWorkingDirectory();
Expand All @@ -67,10 +64,8 @@ public CloudgeneContext(CloudgeneJob job) {
}

outputParameters = new HashMap<String, CloudgeneParameterOutput>();
customDownloads = new HashMap<String, List<Download>>();
for (CloudgeneParameterOutput param : job.getOutputParams()) {
outputParameters.put(param.getName(), param);
customDownloads.put(param.getName(), new Vector<Download>());
}

settings = job.getSettings();
Expand All @@ -85,54 +80,6 @@ public CloudgeneStep getCurrentStep() {
return step;
}

/*public void setupOutputParameters() throws Exception {
FileUtil.deleteDirectory(getLocalTemp());
// create output directories
FileUtil.createDirectory(getLocalOutput());
FileUtil.createDirectory(getLocalTemp());
// create output directories
for (CloudgeneParameterOutput param : outputParameters.values()) {
switch (param.getType()) {
case HDFS_FILE:
case HDFS_FOLDER:
throw new Exception("HDFS support was removed in Cloudgene 3");
case LOCAL_FILE:
String parent = getLocalOutput();
if (!param.isDownload()) {
parent = getLocalTemp();
}
String folder = FileUtil.path(parent, param.getName());
String filename = FileUtil.path(folder, param.getName());
// delete and create (needed for restart)
FileUtil.deleteDirectory(folder);
FileUtil.createDirectory(folder);
param.setValue(filename);
break;
case LOCAL_FOLDER:
String parent2 = getLocalOutput();
if (!param.isDownload()) {
parent2 = getLocalTemp();
}
String folder2 = FileUtil.path(parent2, param.getName());
// delete and create (needed for restart)
FileUtil.deleteDirectory(folder2);
FileUtil.createDirectory(folder2);
param.setValue(folder2);
break;
}
}
}*/

public String getInput(String param) {

if (inputParameters.get(param) != null) {
Expand Down Expand Up @@ -170,28 +117,6 @@ public String get(String param) {
return result;
}
}

@Override
public void addDownload(String param, String name, String size, String path) {
List<Download> downloads = customDownloads.get(param);
if (downloads == null) {
new RuntimeException("Parameter " + param + " is unknown.");
}

String hash = HashUtil.getSha256(name + size + path + (Math.random() * 100000));
Download download = new Download();
download.setName(name);
download.setSize(size);
download.setPath(path);
download.setHash(hash);
download.setCount(CloudgeneJob.MAX_DOWNLOAD);

downloads.add(download);
}

public List<Download> getDownloads(String param){
return customDownloads.get(param);
}

public Settings getSettings() {
return settings;
Expand Down Expand Up @@ -257,10 +182,10 @@ public boolean sendMail(String to, String subject, String body) throws Exception
Settings settings = getSettings();

if (settings.getMail() != null) {
MailUtil.send(settings.getMail().get("smtp"), settings.getMail().get("port"), settings.getMail().get("user"),
settings.getMail().get("password"), settings.getMail().get("name"), to,
"[" + settings.getName() + "] " + subject, body);

MailUtil.send(settings.getMail().get("smtp"), settings.getMail().get("port"),
settings.getMail().get("user"), settings.getMail().get("password"), settings.getMail().get("name"),
to, "[" + settings.getName() + "] " + subject, body);

}
return true;
Expand All @@ -286,7 +211,7 @@ public void setInput(String input, String value) {
parameter.setValue(value);
}

public void setOutput(String input, String value) {
public void setOutput2(String input, String value) {

CloudgeneParameterOutput parameter = outputParameters.get(input);
parameter.setValue(value);
Expand Down Expand Up @@ -374,7 +299,6 @@ public void beginTask(String name) {
logs.add(status);
}


public Message createTask(String name) {
Message status = new Message(step, Message.RUNNING, name);

Expand All @@ -387,8 +311,6 @@ public Message createTask(String name) {
return status;
}



public void beginTask(String name, int totalWork) {
beginTask(name);
}
Expand Down Expand Up @@ -424,14 +346,15 @@ public void setData(String key, Object object) {
}

@Override
public void setConfig(Map<String, String> config) {
public void setConfig(Map<String, Object> config) {
this.config = config;
}

@Override
public String getConfig(String param) {
if (config != null) {
return config.get(param);
Object value = config.get(param);
return value != null ? value.toString() : null;
} else {
return null;
}
Expand Down
21 changes: 13 additions & 8 deletions src/main/java/cloudgene/mapred/jobs/CloudgeneJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import org.slf4j.LoggerFactory;

import cloudgene.mapred.core.User;
import cloudgene.mapred.jobs.engine.Executor;
import cloudgene.mapred.jobs.engine.ExecutableStep;
import cloudgene.mapred.jobs.engine.Executor;
import cloudgene.mapred.jobs.engine.Planner;
import cloudgene.mapred.jobs.workspace.WorkspaceWrapper;
import cloudgene.mapred.wdl.WdlApp;
import cloudgene.mapred.wdl.WdlParameterInput;
import cloudgene.mapred.wdl.WdlParameterOutput;
Expand Down Expand Up @@ -102,12 +101,13 @@ public boolean setup() {
FileUtil.createDirectory(context.getLocalTemp());

try {
log.info("[Job {}] Setup workspace {}'", getId(), workspace.getName());
context.log("Setup External Workspace on " + workspace.getName());
workspace.setup(this.getId());
context.setExternalWorkspace(new WorkspaceWrapper(workspace));
context.setWorkspace(workspace);
} catch (Exception e) {
writeLog(e.toString());
log.info("Error setup external workspace", e);
log.error("[Job {}] Error setup external workspace failed.", getId(), e);
setError(e.toString());
return false;
}
Expand All @@ -118,16 +118,18 @@ public boolean setup() {
switch (param.getType()) {
case HDFS_FILE:
case HDFS_FOLDER:

log.info("[Job {}] Setting output param '{}' failed. HDFS support was removed in Cloudgene 3'", getId(), param.getName());
throw new RuntimeException("HDFS support was removed in Cloudgene 3");

case LOCAL_FILE:
case LOCAL_FILE:
String filename = workspace.createFile(param.getName(), param.getName());
param.setValue(filename);
log.info("[Job {}] Set output file '{}' to '{}'", getId(), param.getName(), param.getValue());
break;

case LOCAL_FOLDER:
String folder = workspace.createFolder(param.getName());
log.info("[Job {}] Set output folder '{}' to '{}'", getId(), param.getName(), param.getValue());
param.setValue(folder);
break;
}
Expand All @@ -150,7 +152,7 @@ public boolean execute() {
// merge setup steps and normal steps
List<WdlStep> steps = new Vector<WdlStep>(app.getWorkflow().getSetups());
steps.addAll(app.getWorkflow().getSteps());
log.info("Job " + getId() + " execute " + steps.size() + " steps");
log.info("[Job {}] execute {} steps", getId(), steps.size());

// execute steps
executor = new Executor();
Expand Down Expand Up @@ -221,12 +223,15 @@ public boolean executeFailureStep(WdlStep failedStep) {
@Override
public boolean cleanUp() {

log.info("[Job {}] Cleaning up...", getId());

try {
workspace.cleanup(getId());
} catch (IOException e) {
writeLog("Cleanup failed.");
writeLog(e.getMessage());
setError(e.getMessage());
log.error("[Job {}] Clean up failed.", getId(), e);
return false;
}

Expand All @@ -236,7 +241,7 @@ public boolean cleanUp() {
@Override
public boolean after() {

log.info("Execute after and export params...");
log.info("[Job {}] Export parameters...", getId());

for (CloudgeneParameterOutput out : getOutputParams()) {
if (out.isDownload()) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/cloudgene/mapred/jobs/CloudgeneStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import java.io.InputStreamReader;
import java.util.List;

import cloudgene.mapred.jobs.sdk.WorkflowContext;
import cloudgene.mapred.wdl.WdlStep;
import cloudgene.sdk.internal.WorkflowContext;

public abstract class CloudgeneStep {

Expand Down
Loading

0 comments on commit 9c69209

Please sign in to comment.