Commit 05f666a

Author: Dmitriy Fingerman (committed)

HIVE-29028: Iceberg: Implement auto compaction

1 parent: 981e765

File tree: 10 files changed (+858, -377 lines)

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 14 additions & 0 deletions

@@ -62,13 +62,15 @@
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.Context.Operation;
@@ -180,6 +182,7 @@
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
+import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
 import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
 import org.apache.iceberg.puffin.Blob;
 import org.apache.iceberg.puffin.BlobMetadata;
@@ -2381,4 +2384,15 @@ private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
   private static List<FieldSchema> orderBy(VirtualColumn... exprs) {
     return schema(Arrays.asList(exprs));
   }
+
+  @Override
+  public CompactionType determineCompactionType(org.apache.hadoop.hive.metastore.api.Table table, CompactionInfo ci)
+      throws IOException {
+    org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, table);
+    CompactionEvaluator compactionEvaluator = new CompactionEvaluator(icebergTable, ci, table.getParameters());
+    if (compactionEvaluator.isEligibleForCompaction()) {
+      return compactionEvaluator.determineCompactionType();
+    }
+    return null;
+  }
 }
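
The override above is the Iceberg side of the new auto-compaction hook: it loads the Iceberg table, hands it to a CompactionEvaluator together with the CompactionInfo and the table properties, and returns either the suggested CompactionType or null when nothing needs compacting. As a rough illustration of how a caller on the initiator side might consume the hook (the CompactionTypeProbe class and its probe method below are hypothetical, not part of this commit), a null result and an UnsupportedOperationException from a handler that keeps the default implementation both mean "skip this table":

// Hypothetical caller-side sketch; only HiveStorageHandler.determineCompactionType comes from this commit.
import java.io.IOException;

import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;

public final class CompactionTypeProbe {

  private CompactionTypeProbe() {
  }

  /** Returns the handler's suggested compaction type, or null if nothing needs to be done. */
  public static CompactionType probe(HiveStorageHandler handler, Table table, CompactionInfo ci)
      throws IOException {
    try {
      return handler.determineCompactionType(table, ci);
    } catch (UnsupportedOperationException e) {
      // Handlers that keep the default implementation do not support the hook; treat as "no compaction".
      return null;
    }
  }
}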

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java

Lines changed: 25 additions & 0 deletions

@@ -52,6 +52,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

@@ -69,6 +70,7 @@ public abstract class CompactorOnTezTest {
   protected IMetaStoreClient msClient;
   protected IDriver driver;
   protected boolean mmCompaction = false;
+  private final AtomicBoolean stop = new AtomicBoolean();

   @ClassRule
   public static TemporaryFolder folder = new TemporaryFolder();
@@ -541,4 +543,27 @@ protected void dropTable(String tblName) throws Exception {
       executeStatementOnDriver("drop table " + tblName, driver);
     }
   }
+
+  protected void startInitiator() throws Exception {
+    runOneLoopOfCompactorThread(CompactorTestUtilities.CompactorThreadType.INITIATOR);
+  }
+
+  protected InitiatorBase getInitiator() {
+    return new Initiator();
+  }
+
+  private void runOneLoopOfCompactorThread(CompactorTestUtilities.CompactorThreadType type) throws Exception {
+    TestTxnDbUtil.setConfValues(conf);
+    CompactorThread t;
+    switch (type) {
+      case INITIATOR: t = getInitiator(); break;
+      case WORKER: t = new Worker(); break;
+      case CLEANER: t = new Cleaner(); break;
+      default: throw new RuntimeException("Huh? Unknown thread type.");
+    }
+    t.setConf(conf);
+    stop.set(true);
+    t.init(stop);
+    t.run();
+  }
 }
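
These helpers let a test drive a single, synchronous compactor cycle: the shared stop flag is set before init(), so the thread's run() performs one pass and returns instead of looping. getInitiator() is a protected factory on purpose, giving subclasses a seam to plug in a storage-handler-specific initiator (TestIcebergCompactorOnTez below returns an IcebergInitiator). A minimal sketch of that override pattern follows; the subclass name and ExampleInitiator are hypothetical, used only for illustration:

// Hypothetical subclass sketch; assumes ExampleInitiator extends InitiatorBase.
public class ExampleCompactorOnTezTest extends CompactorOnTezTest {

  @Override
  protected InitiatorBase getInitiator() {
    // Substitute a storage-handler-specific initiator for the default Initiator.
    return new ExampleInitiator();
  }

  @org.junit.Test
  public void testAutoCompactionIsScheduled() throws Exception {
    // ... create a table and write enough files to cross compactor.threshold.min.input.files ...
    startInitiator();  // one synchronous Initiator pass, thanks to the pre-set stop flag
    org.apache.hadoop.hive.metastore.api.ShowCompactResponse rsp = msClient.showCompactions();
    org.junit.Assert.assertTrue(rsp.getCompactsSize() > 0);
  }
}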

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java

Lines changed: 100 additions & 22 deletions

@@ -21,66 +21,144 @@
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionState;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;

 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;

 import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;

 public class TestIcebergCompactorOnTez extends CompactorOnTezTest {
+
+  private static final String DB_NAME = "default";
+  private static final String TABLE_NAME = "ice_orc";
+  private static final String QUALIFIED_TABLE_NAME = TxnUtils.getFullTableName(DB_NAME, TABLE_NAME);
+
+  @Override
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    executeStatementOnDriver("drop table if exists " + QUALIFIED_TABLE_NAME, driver);
+  }

   @Test
   public void testIcebergCompactorWithAllPartitionFieldTypes() throws Exception{
     conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, CUSTOM_COMPACTION_QUEUE);
     msClient = new HiveMetaStoreClient(conf);

-    String dbName = "default";
-    String tableName = "ice_orc";
-    String qualifiedTableName = dbName + "." + tableName;
-
-    executeStatementOnDriver("drop table if exists " + qualifiedTableName, driver);
     executeStatementOnDriver(String.format("create table %s " +
         "(id int, a string, b int, c bigint, d float, e double, f decimal(4, 2), g boolean, h date, i date, j date, k timestamp) " +
         "partitioned by spec(a, truncate(3, a), bucket(4, a), b, c, d, e, f, g, h, year(h), month(i), day(j), k, hour(k)) stored by iceberg stored as orc " +
-        "tblproperties ('compactor.threshold.min.input.files'='1')", qualifiedTableName), driver);
+        "tblproperties ('compactor.threshold.min.input.files'='1')", QUALIFIED_TABLE_NAME), driver);

     // 6 records, one records per file --> 3 partitions, 2 files per partition
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null, null, null, null, null, null, null, null, null, null, null)", qualifiedTableName), driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null, null, null, null, null, null, null, null, null, null, null)", qualifiedTableName), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null, null, null, null, null, null, null, null, null, null, null)", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null, null, null, null, null, null, null, null, null, null, null)", QUALIFIED_TABLE_NAME), driver);

-    Assert.assertEquals(6, getFilesCount(qualifiedTableName));
-    List<String> recordsBefore = getAllRecords(qualifiedTableName);
+    Assert.assertEquals(6, getFilesCount());
+    List<String> recordsBefore = getAllRecords();

-    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, false,
+    CompactorTestUtil.runCompaction(conf, DB_NAME, TABLE_NAME, CompactionType.MINOR, false,
         "a=aaa111/a_trunc=aaa/a_bucket=0/b=1/c=100/d=1.0/e=2.0/f=4.00/g=true/h=2024-05-01/h_year=2024/i_month=2024-05/j_day=2024-05-01/k=2024-05-02T10%3A00%3A00/k_hour=2024-05-02-10",
         "a=bbb222/a_trunc=bbb/a_bucket=3/b=2/c=200/d=2.0/e=3.0/f=8.00/g=false/h=2024-05-03/h_year=2024/i_month=2024-05/j_day=2024-05-03/k=2024-05-04T13%3A00%3A00/k_hour=2024-05-04-13",
         "a=null/a_trunc=null/a_bucket=null/b=null/c=null/d=null/e=null/f=null/g=null/h=null/h_year=null/i_month=null/j_day=null/k=null/k_hour=null"
     );

-    Assert.assertEquals(3, getFilesCount(qualifiedTableName));
+    Assert.assertEquals(3, getFilesCount());
     verifySuccessfulCompaction(3);
-    List<String> recordsAfter = getAllRecords(qualifiedTableName);
+    List<String> recordsAfter = getAllRecords();

     Assert.assertEquals(recordsBefore, recordsAfter);
   }
-
-  private int getFilesCount(String qualifiedTableName) throws Exception {
-    driver.run(String.format("select count(*) from %s.files", qualifiedTableName));
+
+  @Test
+  public void testIcebergAutoCompactionPartitionEvolution() throws Exception {
+    executeStatementOnDriver(String.format("create table %s " +
+        "(id int, a string) " +
+        "partitioned by spec(id) stored by iceberg stored as orc " +
+        "tblproperties ('compactor.threshold.min.input.files'='1')", QUALIFIED_TABLE_NAME), driver);
+
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'a')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 'b')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 'c')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 'd')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, 'e')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, 'd')", QUALIFIED_TABLE_NAME), driver);
+
+    executeStatementOnDriver(String.format("alter table %s set partition spec(truncate(3, a))", QUALIFIED_TABLE_NAME), driver);
+
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7, 'aaa111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8, 'aaa111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9, 'bbb222')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10, 'bbb222')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)", QUALIFIED_TABLE_NAME), driver);
+
+    startInitiator();
+    ShowCompactResponse rsp = msClient.showCompactions();
+    Assert.assertEquals(4, rsp.getCompactsSize());
+
+    // Compaction should be initiated for each partition from the latest spec
+    Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=aaa", CompactionType.MINOR, CompactionState.INITIATED));
+    Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=bbb", CompactionType.MINOR, CompactionState.INITIATED));
+    Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=null", CompactionType.MINOR, CompactionState.INITIATED));
+
+    // Additional compaction should be initiated for all partitions from past partition specs
+    Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED));
+  }
+
+  @Test
+  public void testIcebergAutoCompactionUnpartitioned() throws Exception {
+    executeStatementOnDriver(String.format("create table %s " +
+        "(id int, a string) " +
+        "stored by iceberg stored as orc " +
+        "tblproperties ('compactor.threshold.min.input.files'='1')", QUALIFIED_TABLE_NAME), driver);
+
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7, 'aaa111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8, 'aaa111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9, 'bbb222')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10, 'bbb222')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)", QUALIFIED_TABLE_NAME), driver);
+
+    startInitiator();
+    ShowCompactResponse rsp = msClient.showCompactions();
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED));
+  }
+
+  private int getFilesCount() throws Exception {
+    driver.run(String.format("select count(*) from %s.files", QUALIFIED_TABLE_NAME));
     List<String> res = new ArrayList<>();
     driver.getFetchTask().fetch(res);
     return Integer.parseInt(res.get(0));
   }

-  private List<String> getAllRecords(String qualifiedTableName) throws Exception {
-    driver.run(String.format("select * from %s order by id", qualifiedTableName));
+  private List<String> getAllRecords() throws Exception {
+    driver.run(String.format("select * from %s order by id", QUALIFIED_TABLE_NAME));
     List<String> res = new ArrayList<>();
     driver.getFetchTask().fetch(res);
     return res;
   }
+
+  private boolean isCompactExist(ShowCompactResponse rsp, String partName, CompactionType type, CompactionState state) {
+    return rsp.getCompacts().stream().anyMatch(c ->
+        c.getDbname().equals(DB_NAME) && c.getTablename().equals(TABLE_NAME) &&
+        Objects.equals(c.getPartitionname(), partName) && c.getType().equals(type) &&
+        c.getState().equals(state.name().toLowerCase()));
+  }
+
+  @Override
+  protected InitiatorBase getInitiator() {
+    return new IcebergInitiator();
+  }
 }

ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java

Lines changed: 14 additions & 1 deletion

@@ -37,11 +37,13 @@
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
@@ -76,7 +78,6 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.TaskAttemptContext;

 import java.util.HashMap;
 import java.util.List;
@@ -980,4 +981,16 @@ default MergeTaskProperties getMergeTaskProperties(Properties properties) {
   default void setMergeTaskDeleteProperties(TableDesc tableDesc) {
     throw new UnsupportedOperationException("Storage handler does not support getting custom delete merge schema.");
   }
+
+  /**
+   * Determines the appropriate {@link CompactionType} for a given table or partition and compaction information.
+   * @param table {@link org.apache.hadoop.hive.metastore.api.Table} metadata stored in Hive Metastore.
+   * @param ci {@link org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo} details about the compaction request.
+   * @return The determined {@link CompactionType} or null if table or partition does not need compaction.
+   * @throws IOException If an I/O error occurs during the compaction evaluation process.
+   * @throws UnsupportedOperationException if the implementing storage handler does not support this operation.
+   */
+  default CompactionType determineCompactionType(Table table, CompactionInfo ci) throws IOException {
+    throw new UnsupportedOperationException("Storage handler does not support determining compaction type");
+  }
 }
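
Making this a default method keeps the interface backward compatible: existing handlers inherit the throwing implementation, and only handlers that opt in override it, as the Iceberg handler does in the first file of this commit. For orientation, here is a sketch of what another handler's override could look like; the ExampleStorageHandler class and the "example.compaction.enabled" property are invented for illustration and do not appear in the commit:

// Hypothetical implementer-side sketch of the new hook.
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;

public abstract class ExampleStorageHandler implements HiveStorageHandler {

  @Override
  public CompactionType determineCompactionType(Table table, CompactionInfo ci) throws IOException {
    Map<String, String> params = table.getParameters();
    // Illustrative eligibility gate; a real handler would inspect its own file and metadata layout.
    if (params == null || !Boolean.parseBoolean(params.get("example.compaction.enabled"))) {
      return null;  // null tells the caller that no compaction is needed
    }
    // This sketch always proposes a MINOR compaction for opted-in tables.
    return CompactionType.MINOR;
  }
}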
