
Pauseless Consumption #3: Disaster Recovery with Reingestion #14920

Open · wants to merge 51 commits into master

Conversation

@KKcorps (Contributor) commented Jan 27, 2025

This PR adds disaster recovery support for pauseless ingestion through re-ingestion. It addresses the scenario where real-time segments permanently fail to transition out of the ERROR state, leading to data gaps. With re-ingestion, Pinot can recover such segments, ensuring the availability and correctness of real-time data.

During pauseless ingestion, an ONLINE segment can end up in the ERROR state if its commit fails due to a server restart and there are no other replicas. Currently Pinot has no way to recover from such failures.

Reingestion Flow

Segments that fail to commit or end up in ERROR state can now be re-ingested by calling a new endpoint (/reingestSegment) on the server.

The ReIngestionResource reconstructs the segment from the stream, builds it, and commits it, ensuring that offline peers and the deep store get updated properly.

If successful, the re-ingested segment transitions from ERROR to ONLINE.

New APIs introduced:

Get Running Re-ingestion Jobs

GET /reingestSegment/jobs

Returns all currently running re-ingestion jobs with their status information.

Response

  • Type: JSON
  • Contains an array of jobs, each with:
    • jobId: Unique identifier
    • tableNameWithType: Table being processed
    • segmentName: Segment being re-ingested
    • startTimeMs: Job start timestamp

Responses

  • 200: Success - List of running jobs
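
For illustration, here is a minimal client sketch that lists the running re-ingestion jobs. Only the endpoint path and response fields come from this PR; the host/port and the use of Java's built-in HttpClient are assumptions.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListReingestionJobsExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical server address; substitute the actual Pinot server host/port.
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8097/reingestSegment/jobs"))
        .GET()
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    // Expect 200 with a JSON array of jobs, each carrying
    // jobId, tableNameWithType, segmentName and startTimeMs.
    System.out.println(response.statusCode() + " " + response.body());
  }
}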

Re-ingest Segment

POST /reingestSegment

Asynchronously re-ingests a segment with updated configurations.

Request Body

  • Type: JSON
  • Required fields:
    • tableNameWithType: Table name with type (e.g. "myTable_REALTIME")
    • segmentName: Name of segment to re-ingest

Response

  • Type: JSON
  • Contains:
    • jobId: Unique identifier for tracking progress
    • message: Success confirmation

Responses

  • 200: Success - Job started successfully
  • 429: Too Many Requests - Parallel job limit reached
  • 404: Not Found - Table/segment not found
  • 409: Conflict - Segment already being re-ingested
  • 500: Internal Server Error - Server initialization issues
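
As a usage illustration, a minimal client sketch that triggers re-ingestion of one segment. The endpoint path, request fields, and status codes come from this PR; the host/port, the segment name, and the use of Java's built-in HttpClient are assumptions.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ReingestSegmentExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical server address and segment name.
    String body = "{\"tableNameWithType\": \"myTable_REALTIME\", "
        + "\"segmentName\": \"myTable__0__10__20250101T0000Z\"}";
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8097/reingestSegment"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    // 200 returns {jobId, message}; 409/429 indicate a duplicate job or too many parallel jobs.
    System.out.println(response.statusCode() + " " + response.body());
  }
}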

Reingestion data flow

sequenceDiagram
    participant Controller
    participant Server
    participant ReIngestionResource
    participant SimpleRealtimeSegmentDataManager

    Controller->>Controller: Finds ERROR segment in validation task
    Controller->>Controller: Pick one alive server from IS for segment
    Controller->>Server: POST /reIngestSegment (tableName, segmentName)
    Server->>ReIngestionResource: ReIngestionRequest
    ReIngestionResource->>SimpleRealtimeSegmentDataManager: startConsumption()
    SimpleRealtimeSegmentDataManager-->>SimpleRealtimeSegmentDataManager: Consume data & Build Segment
    SimpleRealtimeSegmentDataManager->>ReIngestionResource: Segment Tar File
    ReIngestionResource->>Controller: Upload segment metadata
    Controller->>Controller: Update ZK segment status to UPLOADED
    Server->>Server: Wait for segment to get uploaded
    Server->>Controller: Reset segment to ONLINE
    Server-->>Controller: 200 OK (Reingestion complete)
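
To make the controller side of the diagram concrete, here is a minimal sketch of the detect-and-trigger step performed by the validation task. This is not the PR's actual code; ErrorSegmentFinder and ReingestClient are hypothetical stand-ins for the controller's Helix and HTTP plumbing.

import java.util.List;

public class ReingestionTriggerSketch {

  interface ErrorSegmentFinder {
    List<String> errorSegmentsMissingDownloadUrl(String tableNameWithType);
    List<String> aliveServersFor(String tableNameWithType, String segmentName);
  }

  interface ReingestClient {
    // Issues POST /reingestSegment with {tableNameWithType, segmentName} to the given server.
    void reingestSegment(String serverInstance, String tableNameWithType, String segmentName);
  }

  void run(String tableNameWithType, ErrorSegmentFinder finder, ReingestClient client) {
    for (String segmentName : finder.errorSegmentsMissingDownloadUrl(tableNameWithType)) {
      List<String> servers = finder.aliveServersFor(tableNameWithType, segmentName);
      if (servers.isEmpty()) {
        continue; // no live replica to run the re-ingestion on; retry on the next run
      }
      // Pick one alive server from the ideal state and hand it the re-ingestion request.
      client.reingestSegment(servers.get(0), tableNameWithType, segmentName);
    }
  }
}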

Reingestion design diagram

flowchart TD
    A[Start reingestion request] --> B[Check concurrency <br> & segment ingestion map]
    B --> C{Already ingesting?}
    C -- Yes --> D[Return 409 conflict]
    C -- No --> E[Acquire semaphore, set ingesting]
    E --> F[Create & start SimpleRealtimeSegmentDataManager]
    F --> G{Consume from<br>startOffset to endOffset}
    G --> H[Build & tar segment]
    H --> I[Push metadata to controller]
    I --> J[Wait for segment to be uploaded]
    J --> K[Trigger segment reset]
    K --> L[Return success]
    L --> M[Release semaphore, mark not ingesting]
    M --> N[Done]
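
A minimal sketch of the concurrency guard in the flowchart above, assuming a bounded semaphore plus a per-segment set. This is illustrative only and not the PR's actual implementation.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

public class ReingestionGuardSketch {
  private final Semaphore _permits;                                   // caps parallel re-ingestions
  private final Set<String> _ingestingSegments = ConcurrentHashMap.newKeySet();

  public ReingestionGuardSketch(int maxParallelReingestions) {
    _permits = new Semaphore(maxParallelReingestions);
  }

  /** Returns false when the job cannot start now (maps to 409/429 in the resource). */
  public boolean tryStart(String segmentName) {
    if (!_ingestingSegments.add(segmentName)) {
      return false;                                                   // already being re-ingested
    }
    if (!_permits.tryAcquire()) {
      _ingestingSegments.remove(segmentName);
      return false;                                                   // parallel job limit reached
    }
    return true;
  }

  public void finish(String segmentName) {
    _ingestingSegments.remove(segmentName);
    _permits.release();
  }
}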

9aman and others added 30 commits January 2, 2025 16:57
1. Changing FSM
2. Changing the 3 steps performed during the commit protocol to update ZK and Ideal state
1. Changes in the commit protocol to start segment commit before the build
2. Changes in the BaseTableDataManager to ensure that the locally built segment is replaced by a downloaded one
   only when the CRC is present in the ZK Metadata
3. Changes in the download segment method to allow waited download in case of pauseless consumption
…segment commit end metadata call

Refactoring code for readability
… ingestion by moving it out of streamConfigMap
…auseless ingestion in RealtimeSegmentValidationManager
…d by RealtimeSegmentValidationManager to fix commit protocol failures
@noob-se7en (Contributor) commented Jan 27, 2025

So this check is only performed when the RealtimeSegmentValidationManager job runs, right? If so, should this logic be part of a separate dedicated job that runs at a higher frequency?

@@ -2195,6 +2198,139 @@ URI createSegmentPath(String rawTableName, String segmentName) {
return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
}

/**
* Re-ingests segments that are in DONE status with a missing download URL, but also
Contributor:

Shouldn't this be:
Re-ingests segments that are in ONLINE status with a missing download URL, but also

LOGGER.info(
"Segment {} in table {} is in ERROR state with download URL present. Resetting segment to ONLINE state.",
segmentName, tableNameWithType);
_helixResourceManager.resetSegment(tableNameWithType, segmentName, null);
Contributor:

Reset segment does not work when the SegmentDataManager is missing on the server. Consider the following scenario:
A segment has a missing download URL. The server hosting the segment restarts and the segment goes into ERROR state in the EV.
The re-ingestion updates the ZK metadata and the reset segment message is sent.
The server does not have any SegmentDataManager instance for the segment, and hence the reset does not work.

  protected void doReplaceSegment(String segmentName)
      throws Exception {
    SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
    if (segmentDataManager != null) {
      SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
      IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
      indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
      replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig);
    } else {
      _logger.warn("Failed to find segment: {}, skipping replacing it", segmentName);
    }
  }


I ran the above code and found the following error.
[upsertMeetupRsvp_with_dr_2_REALTIME-RealtimeTableDataManager] [HelixTaskExecutor-message_handle_thread_40] Failed to find segment: upsertMeetupRsvp_with_dr_2__0__57__20250127T0745Z, skipping replacing it

@KKcorps KKcorps force-pushed the pauseless-reingestion branch from 45f6f29 to 609942d on January 27, 2025 18:26
@KKcorps KKcorps changed the title Pauseless reingestion #3: Disaster Recovery with Reingestion Pauseless Consumption #3: Disaster Recovery with Reingestion Jan 29, 2025
@KKcorps KKcorps changed the base branch from resolve-failures-pauseless-ingestion to master January 29, 2025 09:39
@KKcorps KKcorps requested a review from Jackie-Jiang January 29, 2025 09:59
@codecov-commenter commented Jan 29, 2025

Codecov Report

Attention: Patch coverage is 1.43369% with 550 lines in your changes missing coverage. Please review.

Project coverage is 63.49%. Comparing base (59551e4) to head (e84788a).
Report is 1644 commits behind head on master.

Files with missing lines Patch % Lines
...estion/utils/SimpleRealtimeSegmentDataManager.java 0.00% 250 Missing ⚠️
...inot/server/api/resources/ReIngestionResource.java 0.00% 189 Missing ⚠️
.../core/realtime/PinotLLCRealtimeSegmentManager.java 0.00% 70 Missing ⚠️
...e/pinot/common/utils/FileUploadDownloadClient.java 0.00% 20 Missing ⚠️
.../api/resources/reingestion/ReIngestionRequest.java 0.00% 7 Missing ⚠️
...api/resources/reingestion/ReIngestionResponse.java 0.00% 6 Missing ⚠️
...r/validation/RealtimeSegmentValidationManager.java 0.00% 3 Missing ⚠️
...ata/manager/realtime/RealtimeTableDataManager.java 0.00% 2 Missing ⚠️
...altime/ServerSegmentCompletionProtocolHandler.java 0.00% 2 Missing ⚠️
.../pinot/core/data/manager/BaseTableDataManager.java 88.88% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14920      +/-   ##
============================================
+ Coverage     61.75%   63.49%   +1.74%     
- Complexity      207     1472    +1265     
============================================
  Files          2436     2713     +277     
  Lines        133233   152427   +19194     
  Branches      20636    23510    +2874     
============================================
+ Hits          82274    96781   +14507     
- Misses        44911    48374    +3463     
- Partials       6048     7272    +1224     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.47% <1.43%> (+1.76%) ⬆️
java-21 63.36% <1.43%> (+1.74%) ⬆️
skip-bytebuffers-false 63.48% <1.43%> (+1.73%) ⬆️
skip-bytebuffers-true 63.35% <1.43%> (+35.62%) ⬆️
temurin 63.49% <1.43%> (+1.74%) ⬆️
unittests 63.48% <1.43%> (+1.74%) ⬆️
unittests1 56.23% <24.24%> (+9.34%) ⬆️
unittests2 33.91% <0.00%> (+6.18%) ⬆️


@noob-se7en (Contributor) left a comment

partial review

// Grab start/end offsets
String startOffsetStr = segmentZKMetadata.getStartOffset();
String endOffsetStr = segmentZKMetadata.getEndOffset();
if (startOffsetStr == null || endOffsetStr == null) {
Contributor:

(minor) maybe let's break up all these similar validations into a separate method (will be useful for unit testing as well).
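
For illustration, the kind of extraction this comment suggests might look like the small sketch below (names are hypothetical): pull the offset checks out of the resource method so they can be unit-tested in isolation.

final class ReingestionValidations {
  private ReingestionValidations() {
  }

  static void validateOffsets(String segmentName, String startOffsetStr, String endOffsetStr) {
    if (startOffsetStr == null || endOffsetStr == null) {
      throw new IllegalStateException(
          "Segment " + segmentName + " is missing start/end offset in ZK metadata, cannot re-ingest");
    }
  }
}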

} catch (Exception e) {
LOGGER.error("Error during async re-ingestion for job {} (segment={})", jobId, segmentName, e);
} finally {
isIngesting.set(false);
Contributor:

Won't there be a race condition here when the same segment is scheduled to be re-ingested again?

// Generate a jobId for tracking
String jobId = UUID.randomUUID().toString();
ReIngestionJob job = new ReIngestionJob(jobId, tableNameWithType, segmentName);
RUNNING_JOBS.put(jobId, job);
Contributor:

(minor) should add to RUNNING_JOBS after submitting to executor.

String tableNameWithType = request.getTableNameWithType();
String segmentName = request.getSegmentName();

if (RUNNING_JOBS.size() >= MAX_PARALLEL_REINGESTIONS) {
Contributor:

These states are in-memory, hence I am curious about the case when the controller sends a re-ingest request to another server for the same segment.

Contributor Author:

That's a good point. Currently it will get executed. However, that scenario is unlikely, as re-ingestion runs extremely infrequently (once every few hours).

One possible solution could be ZK

Another would be to keep track on controller side instead of server side.

Will check

Comment on lines +79 to +83
/**
* Simplified Segment Data Manager for ingesting data from a start offset to an end offset.
*/
public class SimpleRealtimeSegmentDataManager extends SegmentDataManager {

Contributor:

Won't this create code debt in the future, having to maintain both SimpleRealtimeSegmentDataManager and RealtimeSegmentDataManager?

Contributor:

Maybe we should leverage RealtimeSegmentDataManager methods in this class

Contributor Author:

Unfortunately that is not possible, as these methods diverge a lot from the main ones.

throws Exception {
try {
manager.startConsumption();
waitForCondition((Void) -> manager.isDoneConsuming(), 1000, CONSUMPTION_END_TIMEOUT_MS, 0);
Contributor:

The intervals and timeouts seem quite small.
