Skip to content

Commit

Permalink
nit
Browse files Browse the repository at this point in the history
  • Loading branch information
Harnoor7 committed Dec 13, 2024
1 parent 546f27e commit fae4aaa
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
* {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor}
* before uploading offline segment(s) to the offline table.
*/
public class ExpectedRealtimeOfflineTaskResultInfo {
public class ExpectedRealtimeToOfflineTaskResultInfo {
private final List<String> _segmentsFrom;
private final List<String> _segmentsTo;
private final String _id;
private final String _taskID;

public ExpectedRealtimeOfflineTaskResultInfo(List<String> segmentsFrom, List<String> segmentsTo, String taskID) {
public ExpectedRealtimeToOfflineTaskResultInfo(List<String> segmentsFrom, List<String> segmentsTo, String taskID) {
_segmentsFrom = segmentsFrom;
_segmentsTo = segmentsTo;
_taskID = taskID;
_id = UUID.randomUUID().toString();
}

public ExpectedRealtimeOfflineTaskResultInfo(List<String> segmentsFrom, List<String> segmentsTo,
public ExpectedRealtimeToOfflineTaskResultInfo(List<String> segmentsFrom, List<String> segmentsTo,
String realtimeToOfflineSegmentsMapId, String taskID) {
_segmentsFrom = segmentsFrom;
_segmentsTo = segmentsTo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {

private final String _tableNameWithType;
private long _watermarkMs;
private final List<ExpectedRealtimeOfflineTaskResultInfo> _expectedRealtimeToOfflineSegmentsTaskResultList;
private final List<ExpectedRealtimeToOfflineTaskResultInfo> _expectedRealtimeToOfflineSegmentsTaskResultList;

public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs) {
_watermarkMs = watermarkMs;
Expand All @@ -65,7 +65,7 @@ public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long wate
}

public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs,
List<ExpectedRealtimeOfflineTaskResultInfo> expectedRealtimeToOfflineSegmentsMapList) {
List<ExpectedRealtimeToOfflineTaskResultInfo> expectedRealtimeToOfflineSegmentsMapList) {
_tableNameWithType = tableNameWithType;
_watermarkMs = watermarkMs;
_expectedRealtimeToOfflineSegmentsTaskResultList = expectedRealtimeToOfflineSegmentsMapList;
Expand All @@ -75,7 +75,7 @@ public String getTableNameWithType() {
return _tableNameWithType;
}

public List<ExpectedRealtimeOfflineTaskResultInfo> getExpectedRealtimeToOfflineSegmentsTaskResultList() {
public List<ExpectedRealtimeToOfflineTaskResultInfo> getExpectedRealtimeToOfflineSegmentsTaskResultList() {
return _expectedRealtimeToOfflineSegmentsTaskResultList;
}

Expand All @@ -92,7 +92,7 @@ public long getWatermarkMs() {

public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znRecord) {
long watermark = znRecord.getLongField(WATERMARK_KEY, 0);
List<ExpectedRealtimeOfflineTaskResultInfo> expectedRealtimeToOfflineSegmentsMapList = new ArrayList<>();
List<ExpectedRealtimeToOfflineTaskResultInfo> expectedRealtimeToOfflineSegmentsMapList = new ArrayList<>();
Map<String, List<String>> listFields = znRecord.getListFields();
for (Map.Entry<String, List<String>> listField : listFields.entrySet()) {
String realtimeToOfflineSegmentsMapId = listField.getKey();
Expand All @@ -102,7 +102,8 @@ public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znReco
List<String> segmentsTo = Arrays.asList(StringUtils.split(value.get(1), COMMA_SEPARATOR));
String taskID = value.get(2);
expectedRealtimeToOfflineSegmentsMapList.add(
new ExpectedRealtimeOfflineTaskResultInfo(segmentsFrom, segmentsTo, realtimeToOfflineSegmentsMapId, taskID));
new ExpectedRealtimeToOfflineTaskResultInfo(segmentsFrom, segmentsTo, realtimeToOfflineSegmentsMapId, taskID)
);
}
return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), watermark,
expectedRealtimeToOfflineSegmentsMapList);
Expand All @@ -111,7 +112,7 @@ public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znReco
public ZNRecord toZNRecord() {
ZNRecord znRecord = new ZNRecord(_tableNameWithType);
znRecord.setLongField(WATERMARK_KEY, _watermarkMs);
for (ExpectedRealtimeOfflineTaskResultInfo realtimeToOfflineSegmentsMap
for (ExpectedRealtimeToOfflineTaskResultInfo realtimeToOfflineSegmentsMap
: _expectedRealtimeToOfflineSegmentsTaskResultList) {
String segmentsFrom = String.join(COMMA_SEPARATOR, realtimeToOfflineSegmentsMap.getSegmentsFrom());
String segmentsTo = String.join(COMMA_SEPARATOR, realtimeToOfflineSegmentsMap.getSegmentsTo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.minion.ExpectedRealtimeOfflineTaskResultInfo;
import org.apache.pinot.common.minion.ExpectedRealtimeToOfflineTaskResultInfo;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
Expand Down Expand Up @@ -219,7 +219,7 @@ protected void preUploadSegments(SegmentUploadContext context)
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);

List<ExpectedRealtimeOfflineTaskResultInfo>
List<ExpectedRealtimeToOfflineTaskResultInfo>
expectedRealtimeToOfflineSegmentsMapList =
realtimeToOfflineSegmentsTaskMetadata.getExpectedRealtimeToOfflineSegmentsTaskResultList();

Expand All @@ -233,8 +233,8 @@ protected void preUploadSegments(SegmentUploadContext context)

PinotTaskConfig pinotTaskConfig = context.getPinotTaskConfig();

ExpectedRealtimeOfflineTaskResultInfo realtimeToOfflineSegmentsMap =
new ExpectedRealtimeOfflineTaskResultInfo(segmentsFrom, segmentsTo,
ExpectedRealtimeToOfflineTaskResultInfo realtimeToOfflineSegmentsMap =
new ExpectedRealtimeToOfflineTaskResultInfo(segmentsFrom, segmentsTo,
pinotTaskConfig.getTaskId());

expectedRealtimeToOfflineSegmentsMapList.add(realtimeToOfflineSegmentsMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.ExpectedRealtimeOfflineTaskResultInfo;
import org.apache.pinot.common.minion.ExpectedRealtimeToOfflineTaskResultInfo;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
Expand Down Expand Up @@ -190,7 +190,7 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {

// get past minion task runs expected results. This list can have both successful and
// failed task's expected results.
List<ExpectedRealtimeOfflineTaskResultInfo> expectedRealtimeToOfflineSegmentsMapList =
List<ExpectedRealtimeToOfflineTaskResultInfo> expectedRealtimeToOfflineSegmentsMapList =
realtimeToOfflineSegmentsTaskMetadata.getExpectedRealtimeToOfflineSegmentsTaskResultList();
Map<String, List<String>> realtimeSegmentNameVsCorrespondingOfflineSegmentNamesOfPrevTask =
getRealtimeVsCorrespondingOfflineSegmentNames(expectedRealtimeToOfflineSegmentsMapList);
Expand Down Expand Up @@ -339,12 +339,12 @@ private boolean checkIfSegmentNeedsToBeReProcessed(List<String> expectedCorrespo
}

private Map<String, List<String>> getRealtimeVsCorrespondingOfflineSegmentNames(
List<ExpectedRealtimeOfflineTaskResultInfo>
List<ExpectedRealtimeToOfflineTaskResultInfo>
expectedRealtimeToOfflineSegmentsMapList) {
Map<String, List<String>> realtimeSegmentNameVsCorrespondingOfflineSegmentNames
= new HashMap<>();

for (ExpectedRealtimeOfflineTaskResultInfo realtimeToOfflineSegmentsMap
for (ExpectedRealtimeToOfflineTaskResultInfo realtimeToOfflineSegmentsMap
: expectedRealtimeToOfflineSegmentsMapList) {
List<String> segmentsFrom = realtimeToOfflineSegmentsMap.getSegmentsFrom();
List<String> segmentsTo = realtimeToOfflineSegmentsMap.getSegmentsTo();
Expand Down

0 comments on commit fae4aaa

Please sign in to comment.