diff --git a/src/main/java/cloudgene/mapred/plugins/nextflow/NextflowTask.java b/src/main/java/cloudgene/mapred/plugins/nextflow/NextflowTask.java index 8bc6ba79..eae8049c 100644 --- a/src/main/java/cloudgene/mapred/plugins/nextflow/NextflowTask.java +++ b/src/main/java/cloudgene/mapred/plugins/nextflow/NextflowTask.java @@ -1,15 +1,20 @@ package cloudgene.mapred.plugins.nextflow; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.util.Map; import cloudgene.mapred.jobs.CloudgeneContext; +import cloudgene.sdk.internal.IExternalWorkspace; import genepi.io.FileUtil; import genepi.io.text.LineReader; public class NextflowTask { + private static final String CLOUDGENE_LOG = "cloudgene.log"; + private int id; private Map trace; @@ -17,8 +22,8 @@ public class NextflowTask { private String log = null; private CloudgeneContext context; - - public NextflowTask( CloudgeneContext context, Map trace) { + + public NextflowTask(CloudgeneContext context, Map trace) { id = (Integer) trace.get("task_id"); this.trace = trace; this.context = context; @@ -33,21 +38,19 @@ public void update(Map trace) throws IOException { String status = (String) trace.get("status"); if (status.equals("COMPLETED") || status.equals("FAILED")) { String workDir = (String) trace.get("workdir"); - String logFilename = FileUtil.path(workDir, "cloudgene.log"); - // TODO: implement s3 support. How to handle other cloud providers? - - if (new File(logFilename).exists()) { - log = FileUtil.readFileAsString(logFilename); - parseFile(logFilename); - } - + String logFilename = FileUtil.path(workDir, CLOUDGENE_LOG); + IExternalWorkspace workspace = context.getExternalWorkspace(); + InputStream stream = workspace.download(logFilename); + log = FileUtil.readFileAsString(stream); + parseFile(logFilename); } } private void parseFile(String logFilename) throws IOException { - LineReader reader = new LineReader(logFilename); - while(reader.next()) { + + LineReader reader = new LineReader(new DataInputStream(context.getExternalWorkspace().download(logFilename))); + while (reader.next()) { String line = reader.get(); if (line.startsWith("[INC]")) { String[] tiles = line.split(" ", 3); @@ -59,9 +62,9 @@ private void parseFile(String logFilename) throws IOException { String[] tiles = line.split(" ", 2); String name = tiles[1]; context.submitCounter(name); - } + } } - reader.close(); + reader.close(); } public int getId() {