diff --git a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java index 47515a7d15..1bc97b4d12 100644 --- a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java +++ b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java @@ -211,21 +211,30 @@ private void recoveryNodeListener(RecoveryNodeMetadata recoveryNodeMetadata) { * fails. To break this cycle add a enqueue_count value to recovery task so we can stop recovering * it if the task fails a certain number of times. */ - private void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetadata) { + protected void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetadata) { try { setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.RECOVERING); RecoveryTaskMetadata recoveryTaskMetadata = recoveryTaskMetadataStore.getSync(recoveryNodeMetadata.recoveryTaskName); - boolean success = handleRecoveryTask(recoveryTaskMetadata); - if (success) { - // delete the completed recovery task on success + if (!isValidRecoveryTask(recoveryTaskMetadata)) { + LOG.error( + "Invalid recovery task detected, skipping and deleting invalid task {}", + recoveryTaskMetadata); recoveryTaskMetadataStore.deleteSync(recoveryTaskMetadata.name); - setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE); - recoveryNodeAssignmentSuccess.increment(); - } else { setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE); recoveryNodeAssignmentFailed.increment(); + } else { + boolean success = handleRecoveryTask(recoveryTaskMetadata); + if (success) { + // delete the completed recovery task on success + recoveryTaskMetadataStore.deleteSync(recoveryTaskMetadata.name); + setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE); + recoveryNodeAssignmentSuccess.increment(); + } else { + setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE); + recoveryNodeAssignmentFailed.increment(); + } } } catch (Exception e) { setRecoveryNodeMetadataState(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE); @@ -234,6 +243,20 @@ private void handleRecoveryTaskAssignment(RecoveryNodeMetadata recoveryNodeMetad } } + /** + * Attempts a final sanity-check on the recovery task to prevent a bad task from halting the + * recovery pipeline. Bad state should be ideally prevented at the creation, as well as prior to + * assignment, but this can be considered a final fail-safe if invalid recovery tasks somehow made + * it this far. + */ + private boolean isValidRecoveryTask(RecoveryTaskMetadata recoveryTaskMetadata) { + // todo - consider adding further invalid recovery task detections + if (recoveryTaskMetadata.endOffset <= recoveryTaskMetadata.startOffset) { + return false; + } + return true; + } + /** * This method does the recovery work from a recovery task. A recovery task indicates the start * and end offset of a kafka partition to index. To do the recovery work, we create a recovery diff --git a/kaldb/src/main/java/com/slack/kaldb/server/RecoveryTaskCreator.java b/kaldb/src/main/java/com/slack/kaldb/server/RecoveryTaskCreator.java index 8bfda6112e..17b5b63f9a 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/RecoveryTaskCreator.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/RecoveryTaskCreator.java @@ -218,6 +218,8 @@ public long determineStartingOffset( } else if (indexerConfig.getCreateRecoveryTasksOnStart() && indexerConfig.getReadFromLocationOnStart() == KaldbConfigs.KafkaOffsetLocation.LATEST) { + // todo - this appears to be able to create recovery tasks that have a start and end + // position of 0, which is invalid LOG.info( "CreateRecoveryTasksOnStart is set and ReadLocationOnStart is set to current. Reading from current and" + " spinning up recovery tasks"); diff --git a/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java index 138dd53c62..d3e23fb654 100644 --- a/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java @@ -655,6 +655,61 @@ public void testValidateOffsetsWhenRecoveryTaskOverlapsWithEndOfKafkaRange() { assertThat(offsets.endOffset).isEqualTo(kafkaEndOffset); } + @Test + public void shouldHandleInvalidRecoveryTasks() throws Exception { + KaldbConfigs.KaldbConfig kaldbCfg = makeKaldbConfig(TEST_S3_BUCKET); + curatorFramework = + CuratorBuilder.build(meterRegistry, kaldbCfg.getMetadataStoreConfig().getZookeeperConfig()); + + // Start recovery service + recoveryService = new RecoveryService(kaldbCfg, curatorFramework, meterRegistry, blobFs); + recoveryService.startAsync(); + recoveryService.awaitRunning(DEFAULT_START_STOP_DURATION); + + // Create a recovery task + RecoveryTaskMetadataStore recoveryTaskMetadataStore = + new RecoveryTaskMetadataStore(curatorFramework, false); + assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size()).isZero(); + RecoveryTaskMetadata recoveryTask = + new RecoveryTaskMetadata("testRecoveryTask", "0", 0, 0, Instant.now().toEpochMilli()); + recoveryTaskMetadataStore.createSync(recoveryTask); + assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size()) + .isEqualTo(1); + assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).get(0)) + .isEqualTo(recoveryTask); + + // Assign the recovery task to node. + RecoveryNodeMetadataStore recoveryNodeMetadataStore = + new RecoveryNodeMetadataStore(curatorFramework, false); + List recoveryNodes = + KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore); + assertThat(recoveryNodes.size()).isEqualTo(1); + RecoveryNodeMetadata recoveryNodeMetadata = recoveryNodes.get(0); + assertThat(recoveryNodeMetadata.recoveryNodeState) + .isEqualTo(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE); + recoveryNodeMetadataStore.updateSync( + new RecoveryNodeMetadata( + recoveryNodeMetadata.getName(), + Metadata.RecoveryNodeMetadata.RecoveryNodeState.ASSIGNED, + recoveryTask.getName(), + Instant.now().toEpochMilli())); + assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size()) + .isEqualTo(1); + + await().until(() -> getCount(RECOVERY_NODE_ASSIGNMENT_FAILED, meterRegistry) == 1); + assertThat(getCount(RECOVERY_NODE_ASSIGNMENT_SUCCESS, meterRegistry)).isZero(); + assertThat(getCount(RECOVERY_NODE_ASSIGNMENT_RECEIVED, meterRegistry)).isEqualTo(1); + + // Post recovery checks + assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore).size()) + .isEqualTo(1); + assertThat( + KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore) + .get(0) + .recoveryNodeState) + .isEqualTo(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE); + } + // returns startOffset or endOffset based on the supplied OffsetSpec private static AdminClient getAdminClient(long startOffset, long endOffset) { AdminClient adminClient = mock(AdminClient.class);