Skip to content

Commit

Permalink
rebase with master
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Khare authored and Kartik Khare committed Dec 11, 2024
1 parent 25340aa commit e9dbd81
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,7 @@ public SegmentCompletionProtocol.Response extendBuildTime(final String instanceI
* the _winner.
*/
@Override
public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, CommittingSegmentDescriptor committingSegmentDescriptor) {
String instanceId = reqParams.getInstanceId();
StreamPartitionMsgOffset offset =
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
Expand All @@ -342,12 +341,8 @@ public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProt
_segmentName.getSegmentName(), _winner, _winningOffset);
return abortAndReturnFailed();
}
if (!success) {
_logger.error("Segment upload failed");
return abortAndReturnFailed();
}
SegmentCompletionProtocol.Response response =
commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor);
commitSegment(reqParams, committingSegmentDescriptor);
if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
return abortAndReturnFailed();
} else {
Expand Down Expand Up @@ -752,7 +747,7 @@ private SegmentCompletionProtocol.Response processConsumedAfterCommitStart(Strin
}

private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
CommittingSegmentDescriptor committingSegmentDescriptor) {
String instanceId = reqParams.getInstanceId();
StreamPartitionMsgOffset offset =
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
Expand All @@ -768,14 +763,12 @@ private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtoc
// so we need to move the segment file to its permanent location first before committing the metadata.
// The committingSegmentDescriptor is then updated with the permanent segment location to be saved in metadata
// store.
if (isSplitCommit) {
try {
_segmentManager.commitSegmentFile(_realtimeTableName, committingSegmentDescriptor);
} catch (Exception e) {
_logger.error("Caught exception while committing segment file for segment: {}", _segmentName.getSegmentName(),
e);
return SegmentCompletionProtocol.RESP_FAILED;
}
try {
_segmentManager.commitSegmentFile(_realtimeTableName, committingSegmentDescriptor);
} catch (Exception e) {
_logger.error("Caught exception while committing segment file for segment: {}", _segmentName.getSegmentName(),
e);
return SegmentCompletionProtocol.RESP_FAILED;
}
try {
// Convert to a controller uri if the segment location uses local file scheme.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,9 @@ SegmentCompletionProtocol.Response extendBuildTime(String instanceId, StreamPart
* successfully committed or the FSM transitions to an error state.
*
* @param reqParams The parameters of the commit request.
* @param success {@code true} if the commit was successful, {@code false} otherwise.
* @param isSplitCommit {@code true} if the commit uses split commit protocol, {@code false} otherwise.
* @param committingSegmentDescriptor Metadata about the segment being committed.
* @return A response indicating whether the commit was successful or failed.
*/
SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor);
CommittingSegmentDescriptor committingSegmentDescriptor);
}

0 comments on commit e9dbd81

Please sign in to comment.