Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
Milos Ljubinkovic committed Feb 6, 2018
2 parents 8c8f0c4 + 69441a9 commit 7f3fedb
Show file tree
Hide file tree
Showing 29 changed files with 71 additions and 328 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.rabix.backend.tes.model;

import java.util.List;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class TESExecutorLog {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private CWLCommandLinePart buildCommandLinePart(CWLJob job, CWLInputPort inputPo
String keyValue = inputPort != null ? inputPort.getId() : "";

Object valueFrom = CWLBindingHelper.getValueFrom(inputBinding);
if (valueFrom != null) {
if (valueFrom != null && value != null) {
try {
value = CWLExpressionResolver.resolve(valueFrom, job, value);
} catch (CWLExpressionException e) {
Expand All @@ -259,7 +259,7 @@ private CWLCommandLinePart buildCommandLinePart(CWLJob job, CWLInputPort inputPo
}

if (value instanceof Boolean) {
if (((Boolean) value)) {
if (((Boolean) value) && (inputBinding instanceof Map && !((Map) inputBinding).isEmpty())) {
if (prefix == null) {
throw new BindingException("Missing prefix for " + inputPort.getId() + " input.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package org.rabix.bindings.cwl;

import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -16,12 +13,9 @@
import org.rabix.bindings.cwl.bean.CWLJobApp;
import org.rabix.bindings.cwl.bean.CWLOutputPort;
import org.rabix.bindings.cwl.bean.CWLStep;
import org.rabix.bindings.cwl.bean.CWLStepInputs;
import org.rabix.bindings.cwl.bean.CWLWorkflow;
import org.rabix.bindings.cwl.bean.resource.CWLResource;
import org.rabix.bindings.cwl.helper.CWLBindingHelper;
import org.rabix.bindings.cwl.helper.CWLDirectoryValueHelper;
import org.rabix.bindings.cwl.helper.CWLFileValueHelper;
import org.rabix.bindings.cwl.helper.CWLSchemaHelper;
import org.rabix.bindings.model.ApplicationPort;
import org.rabix.bindings.model.LinkMerge;
Expand Down Expand Up @@ -84,10 +78,11 @@ private CWLJob process(CWLJob parentJob, CWLJob job) throws CWLException {
* Process hints in workflow
*/
public void processHints(CWLStep step, CWLJobApp parentJob, CWLJobApp childJob) {
for(CWLResource resource: parentJob.getHints()) {
for(CWLResource resource: step.getHints()) {
childJob.setHint(resource);
}
for(CWLResource resource: step.getHints()) {

for(CWLResource resource: parentJob.getHints()) {
childJob.setHint(resource);
}
}
Expand All @@ -99,10 +94,11 @@ public void processHints(CWLStep step, CWLJobApp parentJob, CWLJobApp childJob)
* Process requirements in workflow
*/
public void processRequirements(CWLStep step, CWLJobApp parentJob, CWLJobApp childJob) {
for(CWLResource resource: parentJob.getRequirements()) {
for(CWLResource resource: step.getRequirements()) {
childJob.setRequirement(resource);
}
for(CWLResource resource: step.getRequirements()) {

for(CWLResource resource: parentJob.getRequirements()) {
childJob.setRequirement(resource);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.rabix.bindings.BindingException;
import org.rabix.bindings.ProtocolProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
import org.rabix.bindings.cwl.json.CWLResourcesDeserializer;
import org.rabix.bindings.model.Application;
import org.rabix.bindings.model.ApplicationPort;
import org.rabix.common.json.BeanPropertyView;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonView;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

@JsonDeserialize(using=CWLJobAppDeserializer.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ private static String getNameext(String filename) {
return null;
}

public static void buildMissingInfo(Object value, HashAlgorithm alg, Path workDir) throws IOException, URISyntaxException {
public static void buildMissingInfo(Object value, HashAlgorithm alg, Path dir) throws IOException, URISyntaxException {
Path workDir = dir == null ? Paths.get("/") : dir;
String path = getPath(value);
String location = getLocation(value);
Path actual = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ private static JsonNode traverse(String appUrl, JsonNode root, Path file, JsonNo
getReplacements(appUrl).add(new CWLDocumentResolverReplacement(parentNode, currentNode, path));
return null;
}

namespace(currentNode);

if(!namespaces.isEmpty())
namespace(currentNode);

boolean isReference = currentNode.has(RESOLVER_REFERENCE_KEY);
boolean appReference = currentNode.has(APP_STEP_KEY) && currentNode.get(APP_STEP_KEY).isTextual();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.rabix.bindings.sb.service.SBMetadataService;
import org.rabix.bindings.sb.service.impl.SBGlobServiceImpl;
import org.rabix.bindings.sb.service.impl.SBMetadataServiceImpl;
import org.rabix.common.helper.ChecksumHelper;
import org.rabix.common.helper.ChecksumHelper.HashAlgorithm;
import org.rabix.common.helper.JSONHelper;
import org.rabix.common.json.BeanSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ public static Map<String, Object> createFileRaw(FileValue fileValue) {
return raw;
}

public static void buildMissingInfo(Object value, HashAlgorithm alg, Path workDir) throws IOException, URISyntaxException {
public static void buildMissingInfo(Object value, HashAlgorithm alg, Path dir) throws IOException, URISyntaxException {
Path workDir = dir == null ? Paths.get("/") : dir;
String path = getPath(value);
String location = getLocation(value);
Path actual = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package org.rabix.engine.store.model;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.rabix.bindings.model.LinkMerge;
import org.rabix.bindings.model.dag.DAGLinkPort.LinkPortType;
import org.rabix.bindings.model.dag.DAGNode;
import org.rabix.engine.store.model.scatter.ScatterStrategy;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class JobRecord extends TimestampedModel {

public static class JobIdRootIdPair {
final public String id;
final public UUID rootId;

public JobIdRootIdPair(String id, UUID rootId) {
this.id = id;
this.rootId = rootId;
Expand All @@ -33,18 +32,18 @@ public static enum JobState {
FAILED,
ABORTED
}

public final static String CACHE_NAME = "JOB_RECORD";

private String id;
private UUID externalId;
private UUID rootId;
private UUID parentId;
private Boolean master;
private Boolean blocking;

private JobState state;

private List<PortCounter> inputCounters;
private List<PortCounter> outputCounters;

Expand All @@ -54,15 +53,15 @@ public static enum JobState {

private int numberOfGlobalInputs = 0;
private int numberOfGlobalOutputs = 0;

private String dagHash;

private ScatterStrategy scatterStrategy;

public JobRecord() {
super(LocalDateTime.now(), LocalDateTime.now());
}

public JobRecord(UUID rootId, String id, UUID uniqueId, UUID parentId, JobState state, Boolean isContainer, Boolean isScattered, Boolean master, Boolean blocking, String dagCache) {
this(rootId, id, uniqueId, parentId, state, isContainer, isScattered, master, blocking, dagCache, LocalDateTime.now(), LocalDateTime.now());
}
Expand All @@ -82,11 +81,11 @@ public JobRecord(UUID rootId, String id, UUID uniqueId, UUID parentId, JobState
this.inputCounters = new ArrayList<>();
this.outputCounters = new ArrayList<>();
}

public Boolean isRoot() {
return externalId.equals(rootId);
}

public Boolean isBlocking() {
return blocking;
}
Expand Down Expand Up @@ -195,6 +194,10 @@ public void setDagHash(String dagHash) {
this.dagHash = dagHash;
}

public boolean isTopLevel() {
return isRoot() || (parentId != null && parentId.equals(rootId));
}

public Boolean isInputPortReady(String port) {
for (PortCounter pc : inputCounters) {
if (pc.port.equals(port)) {
Expand All @@ -205,7 +208,7 @@ public Boolean isInputPortReady(String port) {
}
return false;
}

public Boolean isOutputPortReady(String port) {
for (PortCounter pc : outputCounters) {
if (pc.port.equals(port)) {
Expand All @@ -216,7 +219,7 @@ public Boolean isOutputPortReady(String port) {
}
return false;
}

public PortCounter getInputCounter(String port) {
for (PortCounter portCounter : inputCounters) {
if (portCounter.port.equals(port)) {
Expand All @@ -225,7 +228,7 @@ public PortCounter getInputCounter(String port) {
}
return null;
}

public PortCounter getOutputCounter(String port) {
for (PortCounter portCounter : outputCounters) {
if (portCounter.port.equals(port)) {
Expand All @@ -243,7 +246,7 @@ public int getInputPortIncoming(String port) {
}
return 0;
}

public int getOutputPortIncoming(String port) {
for (PortCounter pc : outputCounters) {
if (pc.port.equals(port)) {
Expand All @@ -252,7 +255,7 @@ public int getOutputPortIncoming(String port) {
}
return 0;
}

public Boolean isReady() {
for (PortCounter portCounter : inputCounters) {
if (portCounter.counter > 0) {
Expand All @@ -270,7 +273,7 @@ public Boolean isCompleted() {
}
return true;
}

public Boolean isScatterPort(String port) {
for (PortCounter portCounter : inputCounters) {
if (portCounter.port.equals(port)) {
Expand All @@ -290,7 +293,7 @@ public List<String> getScatterPorts() {
}
return result;
}

public Boolean isInputPortBlocking(DAGNode node, String port) {
return getInputPortIncoming(port) > 1 && LinkMerge.isBlocking(node.getLinkMerge(port, LinkPortType.INPUT));
}
Expand All @@ -304,7 +307,7 @@ public static class PortCounter {
public Boolean scatter;
@JsonProperty("incoming")
public int incoming;

@JsonProperty("updatedAsSourceCounter")
public int updatedAsSourceCounter = 0;
@JsonProperty("globalCounter")
Expand Down Expand Up @@ -332,15 +335,15 @@ public PortCounter(String port, int counter, Boolean scatter) {
public void increaseIncoming() {
this.incoming++;
}

public void updatedAsSource(int value) {
this.updatedAsSourceCounter = updatedAsSourceCounter + value;
}

public void setGlobalCounter(int globalCounter) {
this.globalCounter = globalCounter;
}

public String getPort() {
return port;
}
Expand All @@ -352,7 +355,7 @@ public void setPort(String port) {
public int getGlobalCounter() {
return globalCounter;
}

public int getCounter() {
return counter;
}
Expand All @@ -374,7 +377,7 @@ public String toString() {
return "PortCounter [port=" + port + ", counter=" + counter + ", scatter=" + scatter + ", incoming=" + incoming
+ ", updatedAsSourceCounter=" + updatedAsSourceCounter + ", globalCounter=" + globalCounter + "]";
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package org.rabix.engine.store.repository;

import org.rabix.engine.store.model.BackendRecord;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.rabix.engine.store.model.BackendRecord;

public interface BackendRepository {

void insert(BackendRecord backend);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void handle(JobStatusEvent event, EventHandlingMode mode) throws EventHan
}

JobStatsRecord jobStatsRecord = null;
if (mode != EventHandlingMode.REPLAY && (jobRecord.getParentId() != null && jobRecord.getParentId().equals(jobRecord.getRootId())) || (jobRecord.isRoot())) {
if (mode != EventHandlingMode.REPLAY && jobRecord.isTopLevel()) {
jobStatsRecord = jobStatsRecordService.findOrCreate(jobRecord.getRootId());
}

Expand Down Expand Up @@ -149,6 +149,11 @@ public void handle(JobStatusEvent event, EventHandlingMode mode) throws EventHan
}
break;
case COMPLETED:
if (jobRecord.getState() == JobRecord.JobState.COMPLETED) {
logger.info("Job {} of {} is already completed.", jobRecord.getId(), jobRecord.getRootId());
break;
}

if (!jobRecord.isRoot()) {
jobService.delete(jobRecord.getRootId(), jobRecord.getExternalId());
if (jobRecord.isContainer() || jobRecord.isScatterWrapper()) {
Expand Down
Loading

0 comments on commit 7f3fedb

Please sign in to comment.