Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
noob-se7en committed Dec 16, 2024
1 parent 1baf68e commit 1754dbc
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,27 @@
*/
public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {

private static final String WATERMARK_KEY = "watermarkMs";
private static final String WINDOW_START_KEY = "windowStartMs";
private static final String WINDOW_END_KEY = "windowEndMs";
private static final String COMMA_SEPARATOR = ",";

private final String _tableNameWithType;
private long _watermarkMs;
private long _windowStartMs;
private final List<ExpectedRealtimeToOfflineTaskResultInfo> _expectedRealtimeToOfflineSegmentsTaskResultList;
private long _windowEndMs;

public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs) {
_watermarkMs = watermarkMs;
_windowStartMs = watermarkMs;
_tableNameWithType = tableNameWithType;
_expectedRealtimeToOfflineSegmentsTaskResultList = new ArrayList<>();
}

public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs,
List<ExpectedRealtimeToOfflineTaskResultInfo> expectedRealtimeToOfflineSegmentsMapList) {
public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long windowStartMs,
long windowEndMs, List<ExpectedRealtimeToOfflineTaskResultInfo> expectedRealtimeToOfflineSegmentsMapList) {
_tableNameWithType = tableNameWithType;
_watermarkMs = watermarkMs;
_windowStartMs = windowStartMs;
_expectedRealtimeToOfflineSegmentsTaskResultList = expectedRealtimeToOfflineSegmentsMapList;
_windowEndMs = windowEndMs;
}

public String getTableNameWithType() {
Expand All @@ -79,19 +82,28 @@ public List<ExpectedRealtimeToOfflineTaskResultInfo> getExpectedRealtimeToOfflin
return _expectedRealtimeToOfflineSegmentsTaskResultList;
}

public void setWatermarkMs(long watermarkMs) {
_watermarkMs = watermarkMs;
public void setWindowStartMs(long windowStartMs) {
_windowStartMs = windowStartMs;
}

/**
* Get the watermark in millis
*/
public long getWatermarkMs() {
return _watermarkMs;
public long getWindowStartMs() {
return _windowStartMs;
}

public long getWindowEndMs() {
return _windowEndMs;
}

public void setWindowEndMs(long windowEndMs) {
_windowEndMs = windowEndMs;
}

public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znRecord) {
long watermark = znRecord.getLongField(WATERMARK_KEY, 0);
long windowStartMs = znRecord.getLongField(WINDOW_START_KEY, 0);
long windowEndMs = znRecord.getLongField(WINDOW_END_KEY, 0);
List<ExpectedRealtimeToOfflineTaskResultInfo> expectedRealtimeToOfflineSegmentsMapList = new ArrayList<>();
Map<String, List<String>> listFields = znRecord.getListFields();
for (Map.Entry<String, List<String>> listField : listFields.entrySet()) {
Expand All @@ -105,13 +117,14 @@ public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znReco
new ExpectedRealtimeToOfflineTaskResultInfo(segmentsFrom, segmentsTo, realtimeToOfflineSegmentsMapId, taskID)
);
}
return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), watermark,
return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), windowStartMs, windowEndMs,
expectedRealtimeToOfflineSegmentsMapList);
}

public ZNRecord toZNRecord() {
ZNRecord znRecord = new ZNRecord(_tableNameWithType);
znRecord.setLongField(WATERMARK_KEY, _watermarkMs);
znRecord.setLongField(WINDOW_START_KEY, _windowStartMs);
znRecord.setLongField(WINDOW_END_KEY, _windowEndMs);
for (ExpectedRealtimeToOfflineTaskResultInfo realtimeToOfflineSegmentsMap
: _expectedRealtimeToOfflineSegmentsTaskResultList) {
String segmentsFrom = String.join(COMMA_SEPARATOR, realtimeToOfflineSegmentsMap.getSegmentsFrom());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public void testToFromZNRecord() {
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord);
assertEquals(realtimeToOfflineSegmentsTaskMetadata.getTableNameWithType(), "testTable_REALTIME");
assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), 1000);
assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(), 1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ private void waitForTaskToComplete(long expectedWatermark, String realtimeTableN
RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
assertNotNull(minionTaskMetadata);
assertEquals(minionTaskMetadata.getWatermarkMs(), expectedWatermark);
assertEquals(minionTaskMetadata.getWindowStartMs(), expectedWatermark);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
long windowStartMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs() <= windowStartMs,
Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs() <= windowStartMs,
"watermarkMs in RealtimeToOfflineSegmentsTask metadata: %s shouldn't be larger than windowStartMs: %d in task"
+ " configs for table: %s. ZNode may have been modified by another task",
realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), windowStartMs, realtimeTableName);
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(), windowStartMs, realtimeTableName);
}

@Override
Expand Down
Loading

0 comments on commit 1754dbc

Please sign in to comment.