diff --git a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/SBProcessor.java b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/SBProcessor.java index 7a480e586..cedf93c38 100644 --- a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/SBProcessor.java +++ b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/SBProcessor.java @@ -3,6 +3,8 @@ import java.io.File; import java.io.IOException; import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; @@ -154,8 +156,8 @@ private Map collectOutputs(SBJob job, File workingDir, HashAlgor if (resultFile.exists()) { String resultStr = FileUtils.readFileToString(resultFile); Map result = JSONHelper.readMap(resultStr); - postprocessToolCreatedResults(result, hashAlgorithm); - return JSONHelper.readMap(resultStr); + postprocessToolCreatedResults(result, hashAlgorithm, workingDir.toPath()); + return result; } Map result = new HashMap<>(); @@ -172,37 +174,30 @@ private Map collectOutputs(SBJob job, File workingDir, HashAlgor return result; } - private void postprocessToolCreatedResults(Object value, HashAlgorithm hashAlgorithm) { + private void postprocessToolCreatedResults(Object value, HashAlgorithm hashAlgorithm, Path workDir) { if (value == null) { return; } if ((SBSchemaHelper.isFileFromValue(value))) { - File file = new File(SBFileValueHelper.getPath(value)); - if (!file.exists()) { - return; - } - SBFileValueHelper.setSize(file.length(), value); - - if (hashAlgorithm != null) { - String checksum = ChecksumHelper.checksum(file, hashAlgorithm); - if (checksum != null) { - SBFileValueHelper.setChecksum(checksum, value); - } + try { + SBFileValueHelper.buildMissingInfo(value, hashAlgorithm, workDir); + } catch (IOException | URISyntaxException e) { + logger.error("Couldn't postprocess file: " + value + " : " + e.getMessage()); } List> secondaryFiles = SBFileValueHelper.getSecondaryFiles(value); if (secondaryFiles != null) { for (Object secondaryFile : secondaryFiles) { - postprocessToolCreatedResults(secondaryFile, hashAlgorithm); + postprocessToolCreatedResults(secondaryFile, hashAlgorithm, workDir); } } } else if (value instanceof List) { for (Object subvalue : (List) value) { - postprocessToolCreatedResults(subvalue, hashAlgorithm); + postprocessToolCreatedResults(subvalue, hashAlgorithm, workDir); } } else if (value instanceof Map) { for (Object subvalue : ((Map) value).values()) { - postprocessToolCreatedResults(subvalue, hashAlgorithm); + postprocessToolCreatedResults(subvalue, hashAlgorithm, workDir); } } } @@ -323,7 +318,7 @@ private List> globFiles(final SBJob job, final File workingD SBFileValueHelper.setName(file.getName(), fileData); SBFileValueHelper.setPath(file.getAbsolutePath(), fileData); - List secondaryFiles = getSecondaryFiles(job, hashAlgorithm, fileData, file.getAbsolutePath(), outputBinding); + List secondaryFiles = getSecondaryFiles(job, hashAlgorithm, fileData, file.getAbsolutePath(), outputBinding, true); if (secondaryFiles != null) { SBFileValueHelper.setSecondaryFiles(secondaryFiles, fileData); } @@ -357,7 +352,7 @@ private List> globFiles(final SBJob job, final File workingD * Gets secondary files (absolute paths) */ public static List> getSecondaryFiles(SBJob job, HashAlgorithm hashAlgorithm, Map fileValue, String fileName, - Object binding) throws SBExpressionException { + Object binding, boolean onlyExisting) throws SBExpressionException { List secondaryFileSufixes = SBBindingHelper.getSecondaryFiles(binding); if (secondaryFileSufixes == null) { @@ -383,8 +378,11 @@ public static List> getSecondaryFiles(SBJob job, HashAlgorit secondaryFilePath += suffix.startsWith(".") ? suffix : "." + suffix; } try { - Map file = SBFileValueHelper.pathToRawFile(Paths.get(secondaryFilePath), hashAlgorithm, Paths.get(SBFileValueHelper.getPath(fileValue))); - secondaryFileMaps.add(file); + Path pathToSec = Paths.get(secondaryFilePath); + if (Files.exists(pathToSec) || !onlyExisting) { + Map file = SBFileValueHelper.pathToRawFile(pathToSec, hashAlgorithm, Paths.get(SBFileValueHelper.getPath(fileValue))); + secondaryFileMaps.add(file); + } } catch (IOException | URISyntaxException e) { logger.error("Couldn't collect secondary file: " + secondaryFilePath); } @@ -393,6 +391,14 @@ public static List> getSecondaryFiles(SBJob job, HashAlgorit return secondaryFileMaps; } + /** + * Gets secondary files (absolute paths) + */ + public static List> getSecondaryFiles(SBJob job, HashAlgorithm hashAlgorithm, Map fileValue, String fileName, + Object binding) throws SBExpressionException { + return getSecondaryFiles(job, hashAlgorithm, fileValue, fileName, binding, true); + } + @Override public Object transformInputs(Object value, Job job, Object transform) throws BindingException { return value; diff --git a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/resolver/SBDocumentResolver.java b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/resolver/SBDocumentResolver.java index 637cec549..63811cd02 100644 --- a/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/resolver/SBDocumentResolver.java +++ b/rabix-bindings-sb/src/main/java/org/rabix/bindings/sb/resolver/SBDocumentResolver.java @@ -31,28 +31,28 @@ public class SBDocumentResolver { public static final String CWL_VERSION_KEY = "cwlVersion"; - + public static final String RESOLVER_JSON_POINTER_KEY = "$job"; - + public static final String DOCUMENT_FRAGMENT_SEPARATOR = "#"; - + private static final Map> fragmentsCache = new HashMap<>(); private static final Map> referenceCache = new HashMap<>(); private static final Map> replacements = new HashMap<>(); - + public static String resolve(String appUrl) throws BindingException { String appUrlBase = appUrl; - URI uri = URI.create(appUrl); - if (uri.getScheme().equals(URIHelper.DATA_URI_SCHEME)) { + + if (appUrlBase.startsWith(URIHelper.DATA_URI_SCHEME)) { appUrlBase = URIHelper.extractBase(appUrl); } - Path file = null; JsonNode root = null; try { boolean isFile = URIHelper.isFile(appUrlBase); if (isFile) { + URI uri = URI.create(appUrl); file = Paths.get(uri.getPath()); } else { file = Paths.get("."); @@ -63,13 +63,13 @@ public static String resolve(String appUrl) throws BindingException { } JsonNode cwlVersion = root.get(CWL_VERSION_KEY); - if (cwlVersion == null || !(cwlVersion.asText().equals(ProtocolType.SB.appVersion))){ + if (cwlVersion == null || !(cwlVersion.asText().equals(ProtocolType.SB.appVersion))) { clearReplacements(appUrl); clearReferenceCache(appUrl); clearFragmentCache(appUrl); throw new BindingWrongVersionException("Document version is not " + ProtocolType.SB.appVersion); } - + if (root.isArray()) { Map fragmentsCachePerUrl = getFragmentsCache(appUrl); for (JsonNode child : root) { @@ -78,7 +78,7 @@ public static String resolve(String appUrl) throws BindingException { String fragment = URIHelper.extractFragment(appUrl); root = fragmentsCachePerUrl.get(fragment); } - + traverse(appUrl, root, file, null, root); for (SBDocumentResolverReplacement replacement : getReplacements(appUrl)) { @@ -88,8 +88,8 @@ public static String resolve(String appUrl) throws BindingException { replaceObjectItem(appUrl, root, replacement); } } - - if(!(root.get(CWL_VERSION_KEY).asText().equals(ProtocolType.SB.appVersion))) { + + if (!(root.get(CWL_VERSION_KEY).asText().equals(ProtocolType.SB.appVersion))) { clearReplacements(appUrl); clearReferenceCache(appUrl); clearFragmentCache(appUrl); @@ -101,12 +101,17 @@ public static String resolve(String appUrl) throws BindingException { clearFragmentCache(appUrl); return JSONHelper.writeObject(root); } - + private static JsonNode traverse(String appUrl, JsonNode root, Path file, JsonNode parentNode, JsonNode currentNode) throws BindingException { Preconditions.checkNotNull(currentNode, "current node id is null"); - boolean isJsonPointer = currentNode.has(RESOLVER_JSON_POINTER_KEY) && parentNode != null; // we skip the first level $job + boolean isJsonPointer = currentNode.has(RESOLVER_JSON_POINTER_KEY) && parentNode != null; // we + // skip + // the + // first + // level + // $job if (isJsonPointer) { String referencePath = currentNode.get(RESOLVER_JSON_POINTER_KEY).textValue(); @@ -122,7 +127,7 @@ private static JsonNode traverse(String appUrl, JsonNode root, Path file, JsonNo getReferenceCache(appUrl).put(referencePath, reference); Map fragmentsCachePerUrl = getFragmentsCache(appUrl); - + ParentChild parentChild = null; JsonNode referenceDocumentRoot = null; if (fragmentsCachePerUrl != null && fragmentsCachePerUrl.containsKey(referencePath)) { @@ -214,7 +219,7 @@ private static JsonNode findDocumentRoot(JsonNode root, Path file, String refere } } } - + private static String loadContents(Path file, String path) throws BindingException { if (path.startsWith("ftp")) { try { @@ -274,7 +279,7 @@ private static ParentChild findReferencedNode(JsonNode rootNode, String absolute } return new ParentChild(parent, child); } - + private synchronized static Set getReplacements(String url) { LinkedHashSet replacementsPerUrl = replacements.get(url); if (replacementsPerUrl == null) { @@ -283,11 +288,11 @@ private synchronized static Set getReplacements(S } return replacementsPerUrl; } - + private synchronized static void clearReplacements(String url) { replacements.remove(url); } - + private synchronized static Map getReferenceCache(String url) { Map referenceCachePerUrl = referenceCache.get(url); if (referenceCachePerUrl == null) { @@ -296,7 +301,7 @@ private synchronized static Map getReferenc } return referenceCachePerUrl; } - + private synchronized static Map getFragmentsCache(String url) { Map fragmentsCachePerUrl = fragmentsCache.get(url); if (fragmentsCachePerUrl == null) { @@ -305,15 +310,15 @@ private synchronized static Map getFragmentsCache(String url) } return fragmentsCachePerUrl; } - + private synchronized static void clearReferenceCache(String url) { referenceCache.remove(url); } - + private synchronized static void clearFragmentCache(String url) { fragmentsCache.remove(url); } - + private static class ParentChild { JsonNode parent; JsonNode child; diff --git a/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/JobStatusEventHandler.java b/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/JobStatusEventHandler.java index be8c03ceb..c80be1c75 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/JobStatusEventHandler.java +++ b/rabix-engine/src/main/java/org/rabix/engine/processor/handler/impl/JobStatusEventHandler.java @@ -18,6 +18,7 @@ import org.rabix.common.helper.CloneHelper; import org.rabix.common.helper.InternalSchemaHelper; import org.rabix.common.helper.JSONHelper; +import org.rabix.common.logging.VerboseLogger; import org.rabix.engine.JobHelper; import org.rabix.engine.event.Event; import org.rabix.engine.event.impl.ContextStatusEvent; @@ -150,6 +151,9 @@ public void handle(JobStatusEvent event, EventHandlingMode mode) throws EventHan case COMPLETED: if (!jobRecord.isRoot()) { jobService.delete(jobRecord.getRootId(), jobRecord.getExternalId()); + if (jobRecord.isContainer() || jobRecord.isScatterWrapper()) { + VerboseLogger.log(String.format("Job %s has completed", jobRecord.getId())); + } } updateJobStats(jobRecord, jobStatsRecord); diff --git a/rabix-engine/src/main/java/org/rabix/engine/processor/impl/EventProcessorImpl.java b/rabix-engine/src/main/java/org/rabix/engine/processor/impl/EventProcessorImpl.java index a0709d87f..96065f59f 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/processor/impl/EventProcessorImpl.java +++ b/rabix-engine/src/main/java/org/rabix/engine/processor/impl/EventProcessorImpl.java @@ -126,6 +126,7 @@ private void doProcessEvent(final ExternalEvent externalEvent) { invalidateContext(event.getContextId()); } catch (Exception ex) { logger.error("Failed to call jobFailed handler for job after event {} failed.", e, ex); + jobService.handleJobRootFailed(event.getContextId(), ex.getMessage()); } } finally { triggerGC(event); diff --git a/rabix-engine/src/main/java/org/rabix/engine/service/JobService.java b/rabix-engine/src/main/java/org/rabix/engine/service/JobService.java index 0cb822cf1..554190698 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/service/JobService.java +++ b/rabix-engine/src/main/java/org/rabix/engine/service/JobService.java @@ -40,6 +40,8 @@ public interface JobService { void handleJobRootFailed(Job job); + void handleJobRootFailed(UUID job, String message); + void handleJobRootCompleted(Job job); void handleJobFailed(Job failedJob); diff --git a/rabix-engine/src/main/java/org/rabix/engine/service/impl/JobServiceImpl.java b/rabix-engine/src/main/java/org/rabix/engine/service/impl/JobServiceImpl.java index 5a64dc969..da2cecd0f 100644 --- a/rabix-engine/src/main/java/org/rabix/engine/service/impl/JobServiceImpl.java +++ b/rabix-engine/src/main/java/org/rabix/engine/service/impl/JobServiceImpl.java @@ -322,12 +322,17 @@ public void handleJobRootFailed(Job job){ jobRepository.update(job); stoppingRootIds.remove(job.getId()); } + handleJobRootFailed(job.getRootId(), job.getMessage()); + } + + @Override + public void handleJobRootFailed(UUID job, String message){ try { - engineStatusCallback.onJobRootFailed(job.getRootId(), job.getMessage()); + engineStatusCallback.onJobRootFailed(job, message); } catch (EngineStatusCallbackException e) { logger.error("Engine status callback failed", e); } finally { - garbageCollectionService.forceGc(job.getRootId()); + garbageCollectionService.forceGc(job); } } diff --git a/rabix-executor/src/main/java/org/rabix/executor/container/impl/DockerContainerHandler.java b/rabix-executor/src/main/java/org/rabix/executor/container/impl/DockerContainerHandler.java index b6e5edd98..84aa533d9 100644 --- a/rabix-executor/src/main/java/org/rabix/executor/container/impl/DockerContainerHandler.java +++ b/rabix-executor/src/main/java/org/rabix/executor/container/impl/DockerContainerHandler.java @@ -10,14 +10,16 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -38,6 +40,7 @@ import org.rabix.bindings.model.requirement.DockerContainerRequirement; import org.rabix.bindings.model.requirement.EnvironmentVariableRequirement; import org.rabix.bindings.model.requirement.Requirement; +import org.rabix.common.helper.InternalSchemaHelper; import org.rabix.common.logging.VerboseLogger; import org.rabix.common.retry.Retry; import org.rabix.executor.config.DockerConfigation; @@ -109,6 +112,8 @@ public class DockerContainerHandler implements ContainerHandler { private FilePathMapper mapper; + private Path rootWorkingDir; + public DockerContainerHandler(Job job, Configuration configuration, DockerContainerRequirement dockerResource, StorageConfiguration storageConfig, DockerConfigation dockerConfig, WorkerStatusCallback statusCallback, DockerClientLockDecorator dockerClient) throws ContainerException { this.job = job; @@ -116,6 +121,7 @@ public DockerContainerHandler(Job job, Configuration configuration, DockerContai this.dockerResource = dockerResource; this.statusCallback = statusCallback; this.workingDir = storageConfig.getWorkingDir(job); + this.rootWorkingDir = findRoot(workingDir.toPath()); this.isConfigAuthEnabled = dockerConfig.isDockerConfigAuthEnabled(); this.removeContainers = dockerConfig.removeContainers(); @@ -139,6 +145,14 @@ public DockerContainerHandler(Job job, Configuration configuration, DockerContai } } + private Path findRoot(Path start) { + if (start.getFileName().toString().equals(InternalSchemaHelper.ROOT_NAME)) + return start; + if (start.getParent() == null || start.getParent().getFileName() == null) + return null; + return findRoot(start.getParent()); + } + private void pull(String image) throws ContainerException { logger.debug("Pulling docker image"); VerboseLogger.log(String.format("Pulling docker image %s", image)); @@ -202,29 +216,42 @@ private String getId(String param) throws IOException { List readLines = IOUtils.readLines(stream); return readLines.get(0); } - + @Override public void start() throws ContainerException { - try { HostConfig.Builder hostConfigBuilder = HostConfig.builder(); - + Set flat = new HashSet<>(); + Set binds = new HashSet<>(); for (FileValue f : FileValueHelper.getInputFiles(job)) { - hostConfigBuilder.appendBinds(createBind(f)); + flat.add(f); for (FileValue sec : f.getSecondaryFiles()) - hostConfigBuilder.appendBinds(createBind(sec)); - + flat.add(sec); + } + for (FileValue f : flat) { + Path location = Paths.get(URI.create(f.getLocation())); + if (location.startsWith(rootWorkingDir)) { + continue; + } + String path = f.getPath(); + if (path.equals(location.toString())) { + binds.add(location.getParent().toString() + ":" + location.getParent().toString()); + } else { + binds.add(location.toString() + ":" + path); + } } + hostConfigBuilder.appendBinds(binds); + if (dockerResource.getDockerOutputDirectory() != null) { hostConfigBuilder.binds(workingDir.getAbsolutePath() + ":" + dockerResource.getDockerOutputDirectory()); - } else { - hostConfigBuilder.appendBinds(workingDir.getAbsolutePath() + ":" + workingDir.getAbsolutePath()); } if (SystemUtils.IS_OS_WINDOWS) { hostConfigBuilder.binds(hostConfigBuilder.binds().stream().map(s -> s.replace("\\", "/")).collect(Collectors.toList())); + } else { + hostConfigBuilder.appendBinds(rootWorkingDir.toString() + ":" + rootWorkingDir.toString()); } - + String dockerPull = checkTagOrAddLatest(dockerResource.getDockerPull()); pull(dockerPull); @@ -255,7 +282,7 @@ public void start() throws ContainerException { List entrypoint = image.containerConfig().entrypoint(); commandLine = addEntrypoint(entrypoint, commandLine); - + if (StringUtils.isEmpty(commandLine.trim())) { overrideResultStatus = 0; // default is success return; @@ -296,15 +323,6 @@ public void start() throws ContainerException { } } - private String createBind(FileValue f) throws IOException { - URI uri = URI.create(f.getLocation()); - String path = uri.getPath(); - if (!Files.exists(Paths.get(uri))) { - throw new IOException("File " + path + " doesn't exist"); - } - return path + ":" + f.getPath(); - } - private String normalizeCommandLine(String commandLine) { commandLine = commandLine.trim(); if (commandLine.startsWith("\"") && commandLine.endsWith("\"")) { @@ -537,7 +555,7 @@ public synchronized void startContainer(String containerId) throws DockerExcepti dockerClient.startContainer(containerId); } catch (DockerException e) { ContainerInfo inspect = dockerClient.inspectContainer(containerId); - if (inspect == null || inspect.state().startedAt() == null) { + if (inspect == null || inspect.state().status().equals("created")) { throw e; } logger.warn("Start container method timed-out but still started the container, recovering."); @@ -677,10 +695,12 @@ public void dumpCommandLine() throws ContainerException { @Override public String getProcessExitMessage() throws ContainerException { try { - return this.dockerClient.logs(containerId, LogsParam.stderr()).readFully(); + LogStream log = this.dockerClient.logs(containerId, LogsParam.stderr()); + String message = log.readFully(); + log.close(); + return message; } catch (DockerException | InterruptedException e) { throw new ContainerException(e); } } - } diff --git a/rabix-executor/src/main/java/org/rabix/executor/container/impl/TemplateScope.java b/rabix-executor/src/main/java/org/rabix/executor/container/impl/TemplateScope.java index ddfea49fe..e0b4077cd 100644 --- a/rabix-executor/src/main/java/org/rabix/executor/container/impl/TemplateScope.java +++ b/rabix-executor/src/main/java/org/rabix/executor/container/impl/TemplateScope.java @@ -1,5 +1,6 @@ package org.rabix.executor.container.impl; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -61,7 +62,7 @@ public void setCommand(String command) { } public Set> getEnv() { - return env.entrySet(); + return env == null ? Collections.emptySet() : env.entrySet(); } public void setEnv(Map env) {