
Commit

Do not hard code resource class in BaseClusterIntegrationTest (apache…
Jackie-Jiang authored Jun 15, 2024
1 parent 0ff43b5 commit b151554
Showing 2 changed files with 22 additions and 25 deletions.
@@ -152,8 +152,8 @@ public void testPinotTaskManagerSchedulerWithUpdate()
         "MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"))));
     updateTableConfig(tableConfig);
     waitForJobGroupNames(_controllerStarter.getTaskManager(),
-        jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn
-            .contains(MinionConstants.MergeRollupTask.TASK_TYPE),
+        jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn.contains(
+            MinionConstants.MergeRollupTask.TASK_TYPE),
         "JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask");
     validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */30 * ? * * *");
     validateJob(MinionConstants.MergeRollupTask.TASK_TYPE, "0 */10 * ? * * *");
@@ -203,8 +203,10 @@ public void testPinotTaskManagerSchedulerWithRestart()
         "JobGroupNames should have SegmentGenerationAndPushTask only");
     validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *");

-    // Restart controller
+    // Restart controller. We need to set the port after stopping the controller because we are reusing the same config.
+    int controllerPort = _controllerPort;
     stopController();
+    _controllerPort = controllerPort;
     startController(properties);
     // wait for controller to start correctly.
     TestUtils.waitForCondition((aVoid) -> {
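A note on the hunk above: the test reuses the same controller config across the restart, so the port in use is captured before stopController() and written back before startController(properties). An annotated restatement of those diff lines — assuming, per the commit's own comment, that the port must be re-set after the stop because the same config is reused:

    // Sketch of the save/restore pattern from the diff above; the assumption
    // (stated in the new comment) is that restarting with the reused config
    // requires the port to be restored after stopController().
    int controllerPort = _controllerPort; // remember the port currently bound
    stopController();                     // tear the controller down
    _controllerPort = controllerPort;     // restore the port for the reused config
    startController(properties);          // controller comes back on the same port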
@@ -232,8 +234,8 @@ public void testPinotTaskManagerSchedulerWithRestart()
     // The new MergeRollup task wouldn't be scheduled if not eagerly checking table configs
     // after setting up subscriber on ChildChanges zk event when controller gets restarted.
     waitForJobGroupNames(_controllerStarter.getTaskManager(),
-        jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn
-            .contains(MinionConstants.MergeRollupTask.TASK_TYPE),
+        jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn.contains(
+            MinionConstants.MergeRollupTask.TASK_TYPE),
         "JobGroupNames should have SegmentGenerationAndPushTask and MergeRollupTask");

     dropOfflineTable(RAW_TABLE_NAME);
BaseClusterIntegrationTest.java

@@ -270,7 +270,7 @@ protected Schema createSchema()

   protected Schema createSchema(String schemaFileName)
       throws IOException {
-    InputStream inputStream = BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName);
+    InputStream inputStream = getClass().getClassLoader().getResourceAsStream(schemaFileName);
     Assert.assertNotNull(inputStream);
     return Schema.fromInputStream(inputStream);
   }
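This hunk (together with the unpackTarData hunk below) is the point of the commit: resolving test resources through getClass().getClassLoader() instead of the hard-coded BaseClusterIntegrationTest.class means the lookup uses the classloader of the concrete test subclass, so a subclass living in another module can ship its own schema and data files. A minimal sketch, where MyClusterIntegrationTest and my_custom_schema.json are hypothetical names for illustration:

    import java.io.IOException;
    import org.apache.pinot.spi.data.Schema;

    // Hypothetical subclass in a different module from BaseClusterIntegrationTest,
    // with its own src/test/resources.
    public class MyClusterIntegrationTest extends BaseClusterIntegrationTest {

      @Override
      protected String getSchemaFileName() {
        // Shipped in this module's test resources, not the base module's.
        return "my_custom_schema.json";
      }

      protected Schema loadSchema()
          throws IOException {
        // createSchema(String) now calls getClass().getClassLoader(), i.e. the
        // loader of this concrete subclass, so the resource above is found even
        // if BaseClusterIntegrationTest was loaded by a different classloader.
        return createSchema(getSchemaFileName());
      }
    }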
@@ -291,13 +291,13 @@ protected TableConfig createTableConfig(File tableConfigFile)
    * Creates a new OFFLINE table config.
    */
   protected TableConfig createOfflineTableConfig() {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName())
-        .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-        .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-        .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-        .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
-        .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setTimeColumnName(getTimeColumnName())
+        .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+        .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+        .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+        .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+        .setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+        .setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
         .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(getSegmentPartitionConfig())
         .build();
   }
@@ -399,9 +399,8 @@ protected Map<String, String> getCSVDecoderProperties(@Nullable String delimiter
   /**
    * Creates a new Upsert enabled table config.
    */
-  protected TableConfig createCSVUpsertTableConfig(String tableName, @Nullable String kafkaTopicName,
-      int numPartitions, Map<String, String> streamDecoderProperties, UpsertConfig upsertConfig,
-      String primaryKeyColumn) {
+  protected TableConfig createCSVUpsertTableConfig(String tableName, @Nullable String kafkaTopicName, int numPartitions,
+      Map<String, String> streamDecoderProperties, UpsertConfig upsertConfig, String primaryKeyColumn) {
     Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
     columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));

@@ -469,18 +468,14 @@ protected org.apache.pinot.client.Connection getPinotConnection() {
         Properties properties = getPinotConnectionProperties();
         properties.put("useMultistageEngine", "true");
         _pinotConnectionV2 = ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(),
-            new JsonAsyncHttpPinotClientTransportFactory()
-                .withConnectionProperties(properties)
-                .buildTransport());
+            new JsonAsyncHttpPinotClientTransportFactory().withConnectionProperties(properties).buildTransport());
       }
       return _pinotConnectionV2;
     }
     if (_pinotConnection == null) {
-      _pinotConnection =
-          ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(),
-              new JsonAsyncHttpPinotClientTransportFactory()
-                  .withConnectionProperties(getPinotConnectionProperties())
-                  .buildTransport());
+      _pinotConnection = ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(),
+          new JsonAsyncHttpPinotClientTransportFactory().withConnectionProperties(getPinotConnectionProperties())
+              .buildTransport());
     }
     return _pinotConnection;
   }
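For reference, the connection assembled in this hunk is the regular pinot-java-client entry point; a short usage sketch, where the query and table name are placeholders:

    // Hedged usage sketch; "mytable" is a placeholder table name.
    org.apache.pinot.client.Connection connection = getPinotConnection();
    org.apache.pinot.client.ResultSetGroup result = connection.execute("SELECT COUNT(*) FROM mytable");
    long count = result.getResultSet(0).getLong(0);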
@@ -554,7 +549,7 @@ protected List<File> unpackAvroData(File outputDir)
    */
   protected List<File> unpackTarData(String tarFileName, File outputDir)
       throws Exception {
-    InputStream inputStream = BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName);
+    InputStream inputStream = getClass().getClassLoader().getResourceAsStream(tarFileName);
     Assert.assertNotNull(inputStream);
     return TarGzCompressionUtils.untar(inputStream, outputDir);
   }