moving DLO strategy output table to a StrategyDAO for better testing #294

Open · wants to merge 1 commit into base: main
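This PR replaces the inline appendToDloStrategiesTable/createTableIfNotExists logic in the DLO strategy jobs with a new StrategiesDaoInternal, so the strategy output table is written and read through a DAO that can be exercised directly in tests. A minimal usage sketch, assuming an active SparkSession and a hypothetical output table name that is not part of this diff:

// Usage sketch only: "db.dlo_strategies" and the surrounding variables (spark, fqtn, strategies) are assumptions.
StrategiesDao tableLevelDao =
    StrategiesDaoInternal.builder()
        .spark(spark)                       // active SparkSession
        .outputFqtn("db.dlo_strategies")    // hypothetical output table
        .isPartitionScope(false)            // table-level schema (no partition_id / partition_columns)
        .build();
tableLevelDao.save(fqtn, strategies);       // creates the output table if missing, then inserts one row per strategy
List<DataLayoutStrategy> loaded = tableLevelDao.load("db.dlo_strategies");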
@@ -4,6 +4,7 @@
import com.linkedin.openhouse.datalayout.datasource.TablePartitionStats;
import com.linkedin.openhouse.datalayout.generator.OpenHouseDataLayoutStrategyGenerator;
import com.linkedin.openhouse.datalayout.persistence.StrategiesDao;
import com.linkedin.openhouse.datalayout.persistence.StrategiesDaoInternal;
import com.linkedin.openhouse.datalayout.persistence.StrategiesDaoTableProps;
import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
import com.linkedin.openhouse.jobs.spark.state.StateManager;
@@ -57,106 +58,27 @@ private void runInnerTableScope(
strategies.stream().map(Object::toString).collect(Collectors.joining(", ")));
StrategiesDao dao = StrategiesDaoTableProps.builder().spark(spark).build();
dao.save(fqtn, strategies);
appendToDloStrategiesTable(spark, outputFqtn, strategies, false);
StrategiesDao internalDao =
StrategiesDaoInternal.builder()
.spark(spark)
.outputFqtn(outputFqtn)
.isPartitionScope(false)
.build();
internalDao.save(fqtn, strategies);
}

private void runInnerPartitionScope(
SparkSession spark, OpenHouseDataLayoutStrategyGenerator strategiesGenerator) {
log.info("Generating partition-level strategies for table {}", fqtn);
List<DataLayoutStrategy> strategies = strategiesGenerator.generatePartitionLevelStrategies();
log.info("Generated {} strategies", strategies.size());
appendToDloStrategiesTable(spark, partitionLevelOutputFqtn, strategies, true);
}

private void appendToDloStrategiesTable(
SparkSession spark,
String outputFqtn,
List<DataLayoutStrategy> strategies,
boolean isPartitionScope) {
if (outputFqtn != null && !strategies.isEmpty()) {
createTableIfNotExists(spark, outputFqtn, isPartitionScope);
List<String> rows = new ArrayList<>();
for (DataLayoutStrategy strategy : strategies) {
if (isPartitionScope) {
rows.add(
String.format(
"('%s', '%s', '%s', current_timestamp(), %f, %f, %f, %d, %d, %d, %d, %d, %d)",
fqtn,
strategy.getPartitionId(),
strategy.getPartitionColumns(),
strategy.getCost(),
strategy.getGain(),
strategy.getEntropy(),
strategy.getPosDeleteFileCount(),
strategy.getEqDeleteFileCount(),
strategy.getPosDeleteFileBytes(),
strategy.getEqDeleteFileBytes(),
strategy.getPosDeleteRecordCount(),
strategy.getEqDeleteRecordCount()));
} else {
rows.add(
String.format(
"('%s', current_timestamp(), %f, %f, %f, %d, %d, %d, %d, %d, %d)",
fqtn,
strategy.getCost(),
strategy.getGain(),
strategy.getEntropy(),
strategy.getPosDeleteFileCount(),
strategy.getEqDeleteFileCount(),
strategy.getPosDeleteFileBytes(),
strategy.getEqDeleteFileBytes(),
strategy.getPosDeleteRecordCount(),
strategy.getEqDeleteRecordCount()));
}
}
String strategiesInsertStmt =
String.format("INSERT INTO %s VALUES %s", outputFqtn, String.join(", ", rows));
log.info("Running {}", strategiesInsertStmt);
spark.sql(strategiesInsertStmt);
}
}

private void createTableIfNotExists(
SparkSession spark, String outputFqtn, boolean isPartitionScope) {
if (isPartitionScope) {
spark.sql(
String.format(
"CREATE TABLE IF NOT EXISTS %s ("
+ "fqtn STRING, "
+ "partition_id STRING, "
+ "partition_columns STRING, "
+ "timestamp TIMESTAMP, "
+ "estimated_compute_cost DOUBLE, "
+ "estimated_file_count_reduction DOUBLE, "
+ "file_size_entropy DOUBLE, "
+ "pos_delete_file_count LONG, "
+ "eq_delete_file_count LONG, "
+ "pos_delete_file_bytes LONG, "
+ "eq_delete_file_bytes LONG,"
+ "pos_delete_record_count LONG, "
+ "eq_delete_record_count LONG"
+ ") "
+ "PARTITIONED BY (days(timestamp))",
outputFqtn));
} else {
spark.sql(
String.format(
"CREATE TABLE IF NOT EXISTS %s ("
+ "fqtn STRING, "
+ "timestamp TIMESTAMP, "
+ "estimated_compute_cost DOUBLE, "
+ "estimated_file_count_reduction DOUBLE, "
+ "file_size_entropy DOUBLE, "
+ "pos_delete_file_count LONG, "
+ "eq_delete_file_count LONG, "
+ "pos_delete_file_bytes LONG, "
+ "eq_delete_file_bytes LONG,"
+ "pos_delete_record_count LONG, "
+ "eq_delete_record_count LONG"
+ ") "
+ "PARTITIONED BY (days(timestamp))",
outputFqtn));
}
StrategiesDao internalDao =
StrategiesDaoInternal.builder()
.spark(spark)
.outputFqtn(partitionLevelOutputFqtn)
.isPartitionScope(true)
.build();
internalDao.save(fqtn, strategies);
}

public static void main(String[] args) {
@@ -0,0 +1,140 @@
package com.linkedin.openhouse.datalayout.persistence;

import com.linkedin.openhouse.datalayout.config.DataCompactionConfig;
import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * DAO implementation for persisting and loading data layout optimization strategies in a
 * dedicated output table.
*/
@Slf4j
@Builder
public class StrategiesDaoInternal implements StrategiesDao {
private final SparkSession spark;
private final boolean isPartitionScope;
private final String outputFqtn;

@Override
public void save(String targetFqtn, List<DataLayoutStrategy> strategies) {
if (outputFqtn != null && !strategies.isEmpty()) {
createTableIfNotExists(spark, outputFqtn, isPartitionScope);
List<String> rows = new ArrayList<>();
for (DataLayoutStrategy strategy : strategies) {
if (isPartitionScope) {
rows.add(
String.format(
"('%s', '%s', '%s', current_timestamp(), %f, %f, %f, %d, %d, %d, %d, %d, %d)",
targetFqtn,
strategy.getPartitionId(),
strategy.getPartitionColumns(),
strategy.getCost(),
strategy.getGain(),
strategy.getEntropy(),
strategy.getPosDeleteFileCount(),
strategy.getEqDeleteFileCount(),
strategy.getPosDeleteFileBytes(),
strategy.getEqDeleteFileBytes(),
strategy.getPosDeleteRecordCount(),
strategy.getEqDeleteRecordCount()));
} else {
rows.add(
String.format(
"('%s', current_timestamp(), %f, %f, %f, %d, %d, %d, %d, %d, %d)",
targetFqtn,
strategy.getCost(),
strategy.getGain(),
strategy.getEntropy(),
strategy.getPosDeleteFileCount(),
strategy.getEqDeleteFileCount(),
strategy.getPosDeleteFileBytes(),
strategy.getEqDeleteFileBytes(),
strategy.getPosDeleteRecordCount(),
strategy.getEqDeleteRecordCount()));
}
}
String strategiesInsertStmt =
String.format("INSERT INTO %s VALUES %s", outputFqtn, String.join(", ", rows));
log.info("Running {}", strategiesInsertStmt);
spark.sql(strategiesInsertStmt);
}
}

@Override
public List<DataLayoutStrategy> load(String fqtn) {
Dataset<Row> df = spark.table(fqtn);
return df.collectAsList().stream()
.map(
row -> {
DataLayoutStrategy.DataLayoutStrategyBuilder builder =
DataLayoutStrategy.builder()
.config(DataCompactionConfig.builder().build())
.entropy(row.getAs("file_size_entropy"))
.cost(row.getAs("estimated_compute_cost"))
.gain(row.getAs("estimated_file_count_reduction"))
.posDeleteFileCount(row.getAs("pos_delete_file_count"))
.eqDeleteFileCount(row.getAs("eq_delete_file_count"))
.posDeleteFileBytes(row.getAs("pos_delete_file_bytes"))
.eqDeleteFileBytes(row.getAs("eq_delete_file_bytes"))
.posDeleteRecordCount(row.getAs("pos_delete_record_count"))
.eqDeleteRecordCount(row.getAs("eq_delete_record_count"));
if (isPartitionScope) {
builder
.partitionId(row.getAs("partition_id"))
.partitionColumns(row.getAs("partition_columns"));
}
return builder.build();
})
.collect(Collectors.toList());
}

private void createTableIfNotExists(
SparkSession spark, String outputFqtn, boolean isPartitionScope) {
if (isPartitionScope) {
spark.sql(
String.format(
"CREATE TABLE IF NOT EXISTS %s ("
+ "fqtn STRING, "
+ "partition_id STRING, "
+ "partition_columns STRING, "
+ "timestamp TIMESTAMP, "
+ "estimated_compute_cost DOUBLE, "
+ "estimated_file_count_reduction DOUBLE, "
+ "file_size_entropy DOUBLE, "
+ "pos_delete_file_count LONG, "
+ "eq_delete_file_count LONG, "
+ "pos_delete_file_bytes LONG, "
+ "eq_delete_file_bytes LONG,"
+ "pos_delete_record_count LONG, "
+ "eq_delete_record_count LONG"
+ ") "
+ "PARTITIONED BY (days(timestamp))",
outputFqtn));
} else {
spark.sql(
String.format(
"CREATE TABLE IF NOT EXISTS %s ("
+ "fqtn STRING, "
+ "timestamp TIMESTAMP, "
+ "estimated_compute_cost DOUBLE, "
+ "estimated_file_count_reduction DOUBLE, "
+ "file_size_entropy DOUBLE, "
+ "pos_delete_file_count LONG, "
+ "eq_delete_file_count LONG, "
+ "pos_delete_file_bytes LONG, "
+ "eq_delete_file_bytes LONG,"
+ "pos_delete_record_count LONG, "
+ "eq_delete_record_count LONG"
+ ") "
+ "PARTITIONED BY (days(timestamp))",
outputFqtn));
}
}
}
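Note that load(fqtn) maps every row of the given table back into DataLayoutStrategy objects; it does not filter by the source table recorded in the fqtn column. If a caller needs strategies for a single source table, a plain Spark filter on the output table works. A sketch under the same assumptions as above (the filtering itself is not part of this PR):

// Assumption: narrowing to one source table is left to the caller; load() itself returns all rows.
Dataset<Row> allStrategies = spark.table("db.dlo_strategies");              // full output table
Dataset<Row> forOneTable = allStrategies.filter("fqtn = 'db.test_table'");  // narrow to one source table
long strategyCount = forOneTable.count();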
@@ -4,6 +4,7 @@
import com.linkedin.openhouse.datalayout.datasource.TablePartitionStats;
import com.linkedin.openhouse.datalayout.generator.OpenHouseDataLayoutStrategyGenerator;
import com.linkedin.openhouse.datalayout.persistence.StrategiesDao;
import com.linkedin.openhouse.datalayout.persistence.StrategiesDaoInternal;
import com.linkedin.openhouse.datalayout.persistence.StrategiesDaoTableProps;
import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
@@ -37,6 +38,20 @@ public void testCompactionStrategyGenerationWithPersistencePartitioned() throws
dao.save(testTable, strategies);
List<DataLayoutStrategy> retrievedStrategies = dao.load(testTable);
Assertions.assertEquals(strategies, retrievedStrategies);

StrategiesDao internalDao =
StrategiesDaoInternal.builder()
.spark(spark)
.outputFqtn("db.dlo_output")
.isPartitionScope(true)
.build();
internalDao.save(testTable, strategies);
List<DataLayoutStrategy> retrievedStrategiesInternal = internalDao.load("db.dlo_output");
for (int i = 0; i < strategies.size(); i++) {
Assertions.assertEquals(
strategies.get(i).getPosDeleteFileCount(),
retrievedStrategiesInternal.get(i).getPosDeleteFileCount());
}
}
}

@@ -61,6 +76,20 @@ public void testCompactionStrategyGenerationNonPartitioned() throws Exception {
dao.save(testTable, strategies);
List<DataLayoutStrategy> retrievedStrategies = dao.load(testTable);
Assertions.assertEquals(strategies, retrievedStrategies);

StrategiesDao internalDao =
StrategiesDaoInternal.builder()
.spark(spark)
.outputFqtn("db.dlo_output2")
.isPartitionScope(false)
.build();
internalDao.save(testTable, strategies);
List<DataLayoutStrategy> retrievedStrategiesInternal = internalDao.load("db.dlo_output2");
for (int i = 0; i < strategies.size(); i++) {
Assertions.assertEquals(
strategies.get(i).getPosDeleteFileCount(),
retrievedStrategiesInternal.get(i).getPosDeleteFileCount());
}
}
}
