diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java index 4755f836869..4ac2cc5d942 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java @@ -153,15 +153,20 @@ public List getFlowStatusesAcrossGroup(String flowGroup, int countPe * @return true, if any jobs of the flow are RUNNING. */ public boolean isFlowRunning(String flowName, String flowGroup, long flowExecutionId) { - List flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1, null); + List flowStatusList = getLatestFlowStatus(flowName, flowGroup, 2, null); if (flowStatusList == null || flowStatusList.isEmpty()) { return false; + } + FlowStatus flowStatus = flowStatusList.get(0); + ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus(); + log.info("Comparing flow execution status with flowExecutionId: " + flowStatus.getFlowExecutionId() + " and flowStatus: " + flowExecutionStatus + " with incoming flowExecutionId: " + flowExecutionId); + // If the latest flow status is the current job about to get kicked off, we should ignore this check + if (flowStatus.getFlowExecutionId() == flowExecutionId) { + // Another host may have already emitted a flow status that skipped this flow execution, so compare against the previous flow status + FlowStatus previousFlowStatus = flowStatusList.size() > 1 ? flowStatusList.get(1) : null; + return previousFlowStatus != null && FINISHED_STATUSES.contains(previousFlowStatus.getFlowExecutionStatus().name()); } else { - FlowStatus flowStatus = flowStatusList.get(0); - ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus(); - log.info("Comparing flow execution status with flowExecutionId: " + flowStatus.getFlowExecutionId() + " and flowStatus: " + flowExecutionStatus + " with incoming flowExecutionId: " + flowExecutionId); - // If the latest flow status is the current job about to get kicked off, we should ignore this check - return flowStatus.getFlowExecutionId() != flowExecutionId && !FINISHED_STATUSES.contains(flowExecutionStatus.name()); + return !FINISHED_STATUSES.contains(flowExecutionStatus.name()); } } diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java index 109c71f4053..6b267a151e8 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java @@ -33,6 +33,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.anyInt; public class FlowStatusGeneratorTest { @@ -43,7 +45,7 @@ public void testIsFlowRunningFirstExecution() { String flowName = "testName"; String flowGroup = "testGroup"; long currFlowExecutionId = 1234L; - when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null); + when(jobStatusRetriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), anyInt())).thenReturn(null); FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever); Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, currFlowExecutionId)); @@ -55,7 +57,7 @@ public void testIsFlowRunningCompiledPastExecution() { String flowName = "testName"; String flowGroup = "testGroup"; long flowExecutionId = 1234L; - when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn( + when(jobStatusRetriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), Mockito.anyInt())).thenReturn( Lists.newArrayList(flowExecutionId)); JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId) .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build(); @@ -73,7 +75,7 @@ public void skipFlowConcurrentCheckSameFlowExecutionId() { String flowName = "testName"; String flowGroup = "testGroup"; long flowExecutionId = 1234L; - when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn( + when(jobStatusRetriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), Mockito.anyInt())).thenReturn( Lists.newArrayList(flowExecutionId)); JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId) .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build(); @@ -81,17 +83,41 @@ public void skipFlowConcurrentCheckSameFlowExecutionId() { when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn( jobStatusIterator); FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever); - // If the flow is compiled but the flow execution status is the same as the one about to be kicked off, do not consider it as running. + // If the flow is compiled but the flow execution id is the same as the one about to be kicked off, do not consider it as running. Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId)); } - @Test + @Test + public void skipCurrentFlowBasedOnPriorFlowExecutionId() { + JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class); + String flowName = "testName"; + String flowGroup = "testGroup"; + long flowExecutionId = 1234L; + long flowExecutionId2 = 1235L; + when(jobStatusRetriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), Mockito.anyInt())).thenReturn( + Lists.newArrayList(flowExecutionId, flowExecutionId2)); + JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId) + .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.RUNNING.name()).build(); + JobStatus jobStatus2 = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId2) + .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.FAILED.name()).build(); + Iterator jobStatusIterator = Lists.newArrayList(jobStatus).iterator(); + Iterator jobStatusIterator2 = Lists.newArrayList(jobStatus2).iterator(); + when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn( + jobStatusIterator); + when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId2)).thenReturn( + jobStatusIterator2); + FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever); + // If prior flow is still running but another host failed the current flow to skip it, use the prior flow execution ID as reference + Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId2)); + } + + @Test public void testIsFlowRunningJobExecutionIgnored() { String flowName = "testName"; String flowGroup = "testGroup"; long flowExecutionId = 1234L; JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class); - when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn( + when(jobStatusRetriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), Mockito.anyInt())).thenReturn( Lists.newArrayList(flowExecutionId)); //JobStatuses should be ignored, only the flow level status matters. String job1 = "job1";