Skip to content

Commit

Permalink
Support s3 in Nextflow tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
lukfor committed Sep 4, 2023
1 parent 1788135 commit 8e6bdc4
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions src/main/java/cloudgene/mapred/plugins/nextflow/NextflowTask.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
package cloudgene.mapred.plugins.nextflow;

import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import cloudgene.mapred.jobs.CloudgeneContext;
import cloudgene.sdk.internal.IExternalWorkspace;
import genepi.io.FileUtil;
import genepi.io.text.LineReader;

public class NextflowTask {

private static final String CLOUDGENE_LOG = "cloudgene.log";

private int id;

private Map<String, Object> trace;

private String log = null;

private CloudgeneContext context;
public NextflowTask( CloudgeneContext context, Map<String, Object> trace) {

public NextflowTask(CloudgeneContext context, Map<String, Object> trace) {
id = (Integer) trace.get("task_id");
this.trace = trace;
this.context = context;
Expand All @@ -33,21 +38,19 @@ public void update(Map<String, Object> trace) throws IOException {
String status = (String) trace.get("status");
if (status.equals("COMPLETED") || status.equals("FAILED")) {
String workDir = (String) trace.get("workdir");
String logFilename = FileUtil.path(workDir, "cloudgene.log");
// TODO: implement s3 support. How to handle other cloud providers?

if (new File(logFilename).exists()) {
log = FileUtil.readFileAsString(logFilename);
parseFile(logFilename);
}

String logFilename = FileUtil.path(workDir, CLOUDGENE_LOG);
IExternalWorkspace workspace = context.getExternalWorkspace();
InputStream stream = workspace.download(logFilename);
log = FileUtil.readFileAsString(stream);
parseFile(logFilename);
}

}

private void parseFile(String logFilename) throws IOException {
LineReader reader = new LineReader(logFilename);
while(reader.next()) {

LineReader reader = new LineReader(new DataInputStream(context.getExternalWorkspace().download(logFilename)));
while (reader.next()) {
String line = reader.get();
if (line.startsWith("[INC]")) {
String[] tiles = line.split(" ", 3);
Expand All @@ -59,9 +62,9 @@ private void parseFile(String logFilename) throws IOException {
String[] tiles = line.split(" ", 2);
String name = tiles[1];
context.submitCounter(name);
}
}
}
reader.close();
reader.close();
}

public int getId() {
Expand Down

0 comments on commit 8e6bdc4

Please sign in to comment.