diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java index 56042a8669..3a86a8a963 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageOnDemandInfo.java @@ -171,6 +171,8 @@ public static class LineageInfoOnDemand { boolean hasMoreOutputs; int inputRelationsCount; int outputRelationsCount; + int totalInputRelationsCount; + int totalOutputRelationsCount; boolean isInputRelationsReachedLimit; boolean isOutputRelationsReachedLimit; @JsonProperty @@ -188,13 +190,15 @@ public LineageInfoOnDemand(LineageOnDemandConstraints onDemandConstraints) { this.hasMoreOutputs = false; this.inputRelationsCount = 0; this.outputRelationsCount = 0; + this.totalInputRelationsCount = 0; + this.totalOutputRelationsCount = 0; this.isInputRelationsReachedLimit = false; this.isOutputRelationsReachedLimit = false; this.hasUpstream = false; this.hasDownstream = false; this.fromCounter = 0; } - + public boolean isInputRelationsReachedLimit() { return isInputRelationsReachedLimit; } @@ -243,10 +247,18 @@ public void setHasDownstream(boolean hasDownstream) { this.hasDownstream = hasDownstream; } - public int getFromCounter() { - return fromCounter; + public int getTotalInputRelationsCount() { + return totalInputRelationsCount; + } + + public void setTotalInputRelationsCount(int count) {this.totalInputRelationsCount = count;} + + public int getTotalOutputRelationsCount() { + return totalOutputRelationsCount; } + public void setTotalOutputRelationsCount(int count) {this.totalOutputRelationsCount = count;} + public void incrementFromCounter() { fromCounter++; } @@ -255,6 +267,10 @@ public int getInputRelationsCount() { return inputRelationsCount; } + public int getFromCounter() { + return fromCounter; + } + public void incrementInputRelationsCount() { this.inputRelationsCount++; if (inputRelationsCount == onDemandConstraints.getInputRelationsLimit()) { diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index df41d22328..9c4b07764c 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -650,24 +650,27 @@ else if (!isInput && ! isInVertexVisited) } private void setHasDownstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex inVertex, LineageInfoOnDemand inLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(inVertex, PROCESS_INPUTS_EDGE, atlasLineageOnDemandContext); - if (!filteredEdges.isEmpty()) + List filteredEdges = getFilteredAtlasEdges(inVertex, IN, PROCESS_INPUTS_EDGE, atlasLineageOnDemandContext); + if (!filteredEdges.isEmpty()) { inLineageInfo.setHasDownstream(true); + inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); + } } private void setHasUpstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex outVertex, LineageInfoOnDemand outLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(outVertex, PROCESS_OUTPUTS_EDGE, atlasLineageOnDemandContext); - if (!filteredEdges.isEmpty()) + List filteredEdges = getFilteredAtlasEdges(outVertex, IN, PROCESS_OUTPUTS_EDGE, atlasLineageOnDemandContext); + if (!filteredEdges.isEmpty()) { outLineageInfo.setHasUpstream(true); + outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); + } } - private List getFilteredAtlasEdges(AtlasVertex outVertex, String processEdgeLabel, AtlasLineageOnDemandContext atlasLineageOnDemandContext) { + private List getFilteredAtlasEdges(AtlasVertex outVertex, AtlasEdgeDirection direction, String processEdgeLabel, AtlasLineageOnDemandContext atlasLineageOnDemandContext) { List filteredEdges = new ArrayList<>(); - Iterable edges = outVertex.getEdges(IN, processEdgeLabel); + Iterable edges = outVertex.getEdges(direction, processEdgeLabel); for (AtlasEdge edge : edges) { if (edgeMatchesEvaluation(edge, atlasLineageOnDemandContext)) { filteredEdges.add(edge); - break; } } return filteredEdges;