HIVE-28586 Support write order for Iceberg tables at CREATE TABLE #5541
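This patch wires Hive's `WRITE ORDERED BY` clause into the Iceberg table's sort order at CREATE TABLE time. The qtest added below exercises the new DDL; its create statement is reformatted here for readability:

```sql
-- Write order: id descending (NULLs first), then text ascending (NULLs last).
create table ice_orc_sorted (id int, text string)
write ordered by id desc nulls first, text asc nulls last
stored by iceberg stored as orc;
```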

Open · wants to merge 1 commit into master
@@ -30,13 +30,16 @@
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
* Class for catalog resolution and accessing the common functions for {@link Catalog} API.
@@ -140,15 +143,23 @@ public static Table createTable(Configuration conf, Properties props) {
Map<String, String> map = filterIcebergTableProperties(props);

Optional<Catalog> catalog = loadCatalog(conf, catalogName);

SortOrder sortOrder = getSortOrder(props, schema);
if (catalog.isPresent()) {
String name = props.getProperty(NAME);
Preconditions.checkNotNull(name, "Table identifier not set");
return catalog.get().createTable(TableIdentifier.parse(name), schema, spec, location, map);
return catalog.get().buildTable(TableIdentifier.parse(name), schema).withPartitionSpec(spec)
.withLocation(location).withProperties(map).withSortOrder(sortOrder).create();
}

Preconditions.checkNotNull(location, "Table location not set");
return new HadoopTables(conf).create(schema, spec, map, location);
return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
}

private static SortOrder getSortOrder(Properties props, Schema schema) {
String sortOrderJson = props.getProperty(InputFormatConfig.INSERT_WRITE_ORDER);
return Strings.isNullOrEmpty(sortOrderJson) ?
SortOrder.unsorted() : SortOrderParser.fromJson(schema, sortOrderJson);
}

/**
@@ -215,9 +226,9 @@ public static Table registerTable(Configuration conf, Properties props, String m
Preconditions.checkNotNull(name, "Table identifier not set");
return catalog.get().registerTable(TableIdentifier.parse(name), metadataLocation);
}

Preconditions.checkNotNull(location, "Table location not set");
return new HadoopTables(conf).create(schema, spec, map, location);
SortOrder sortOrder = getSortOrder(props, schema);
return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
}

public static void renameTable(Configuration conf, Properties props, TableIdentifier to) {
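For reference, the sort order travels from the metahook to `Catalogs` as Iceberg's JSON serialization under the new `iceberg.write-order` property, and `getSortOrder()` rebuilds it with `SortOrderParser.fromJson`. A minimal round-trip sketch, using a hypothetical one-column schema that is not part of the patch:

```java
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.types.Types;

public class SortOrderRoundTrip {
  public static void main(String[] args) {
    // Hypothetical schema standing in for the one Hive derives from the DDL.
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()));
    // Build the order the way addSortOrder() does on the write side.
    SortOrder order = SortOrder.builderFor(schema)
        .desc("id", NullOrder.NULLS_FIRST)
        .build();
    String json = SortOrderParser.toJson(order);
    // getSortOrder() performs this parse on the create/register side.
    SortOrder parsed = SortOrderParser.fromJson(schema, json);
    System.out.println(json);
    System.out.println(parsed.equals(order)); // expected: true, lossless round trip
  }
}
```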
@@ -88,6 +88,7 @@ private InputFormatConfig() {
public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
public static final String QUERY_FILTERS = "iceberg.query.filters";
public static final String INSERT_WRITE_ORDER = "iceberg.write-order";

public enum InMemoryDataModel {
PIG,
@@ -47,6 +47,7 @@
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NullOrderingType;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -82,13 +83,16 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
@@ -271,6 +275,23 @@ public void preCreateTable(CreateTableRequest request) {
setOrcOnlyFilesParam(hmsTable);
// Remove Hive primary key columns from the table request, as Iceberg doesn't support Hive primary keys.
request.setPrimaryKeys(null);
addSortOrder(hmsTable, schema, catalogProperties);
}

private void addSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
Properties properties) {
SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
hmsTable.getSd().getSortCols().forEach(item -> {
NullOrder nullOrder = item.getNullOrdering() == NullOrderingType.NULLS_FIRST ?
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
// HMS Order semantics: 1 means ascending, 0 means descending.
if (item.getOrder() == 0) {
sortOrderBuilder.desc(item.getCol(), nullOrder);
} else {
sortOrderBuilder.asc(item.getCol(), nullOrder);
}
});
properties.put(InputFormatConfig.INSERT_WRITE_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
}

@Override
@@ -781,7 +802,7 @@ private void setCommonHmsTablePropertiesForIceberg(org.apache.hadoop.hive.metast
* @param hmsTable Table for which we are calculating the properties
* @return The properties we can provide for Iceberg functions, like {@link Catalogs}
*/
private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
private Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
Properties properties = new Properties();

hmsTable.getParameters().entrySet().stream().filter(e -> e.getKey() != null && e.getValue() != null).forEach(e -> {
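Given the mapping in `addSortOrder` above (Hive's thrift `Order` uses 1 for ascending and 0 for descending), the sorted qtest table below should serialize under `iceberg.write-order` to something like the following. This is a sketch of Iceberg's sort-order JSON for illustration, not output captured from the patch:

```json
{"order-id": 1, "fields": [
  {"transform": "identity", "source-id": 1, "direction": "desc", "null-order": "nulls-first"},
  {"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-last"}
]}
```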
@@ -1025,6 +1025,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
expectedIcebergProperties.put(TableProperties.DELETE_MODE, HiveIcebergStorageHandler.MERGE_ON_READ);
expectedIcebergProperties.put(TableProperties.UPDATE_MODE, HiveIcebergStorageHandler.MERGE_ON_READ);
expectedIcebergProperties.put(TableProperties.MERGE_MODE, HiveIcebergStorageHandler.MERGE_ON_READ);
expectedIcebergProperties.put(InputFormatConfig.INSERT_WRITE_ORDER, "{\"order-id\":0,\"fields\":[]}");
Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());

if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
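Note that unsorted tables now also carry the property: `addSortOrder` is called unconditionally in `preCreateTable`, so an empty order serializes as `{"order-id":0,"fields":[]}`. That is why the assertion above and every q.out file below gains the same `iceberg.write-order` line.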
@@ -0,0 +1,33 @@
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask the totalSize value as it can have slight variability, causing test flakiness
--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask iceberg version
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;


create table ice_orc_sorted (id int, text string) write ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc;

insert into ice_orc_sorted values (3, "3"),(2, "2"),(4, "4"),(5, "5"),(1, "1"),(2, "3"),(3,null),(2,null),(null,"a");

describe formatted ice_orc_sorted;
describe extended ice_orc_sorted;

select * from ice_orc_sorted;
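With the patch applied, `describe formatted` on this table should report `iceberg.write-order` populated with the two sort fields rather than the empty default shown for the unsorted tables below, and inserted rows should be sorted accordingly within each written file.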

@@ -515,6 +515,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"a\",\"required\":false,\"type\":\"int\"}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -559,6 +560,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"b\",\"required\":false,\"type\":\"int\"}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -31,6 +31,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"i\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"s\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":4,\"name\":\"d\",\"required\":false,\"type\":\"date\"}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -37,6 +37,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"i\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"s\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":4,\"name\":\"d\",\"required\":false,\"type\":\"date\"}]}
format-version 2
iceberg.orc.files.only true
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -109,6 +110,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"i\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"s\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":4,\"name\":\"d\",\"required\":false,\"type\":\"date\"}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -181,6 +183,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"i\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"s\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":4,\"name\":\"d\",\"required\":false,\"type\":\"date\"}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -253,6 +256,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"i\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"s\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":4,\"name\":\"d\",\"required\":false,\"type\":\"date\"}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -320,6 +324,7 @@ Table Parameters:
dummy dummy_value
format-version 2
iceberg.orc.files.only true
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -31,6 +31,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"i\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"s\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":4,\"name\":\"d\",\"required\":false,\"type\":\"date\"}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -31,6 +31,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"i\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"s\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":4,\"name\":\"d\",\"required\":false,\"type\":\"date\"}]}
format-version 2
iceberg.orc.files.only true
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -302,6 +302,7 @@ Table Parameters:
default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"a_bucket\",\"transform\":\"bucket[16]\",\"source-id\":1,\"field-id\":1000},{\"name\":\"b_trunc\",\"transform\":\"truncate[3]\",\"source-id\":2,\"field-id\":1001}]}
format-version 2
iceberg.orc.files.only true
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 2
numRows 2
@@ -44,6 +44,7 @@ TBLPROPERTIES (
'current-schema'='{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"a","required":false,"type":"int"}]}',
'format-version'='2',
'iceberg.orc.files.only'='false',
'iceberg.write-order'='{"order-id":0,"fields":[]}',
'metadata_location'='hdfs://### HDFS PATH ###',
'parquet.compression'='zstd',
'snapshot-count'='0',
@@ -129,6 +130,7 @@ TBLPROPERTIES (
'default-partition-spec'='{"spec-id":0,"fields":[{"name":"company","transform":"identity","source-id":2,"field-id":1000}]}',
'format-version'='2',
'iceberg.orc.files.only'='false',
'iceberg.write-order'='{"order-id":0,"fields":[]}',
'metadata_location'='hdfs://### HDFS PATH ###',
'parquet.compression'='zstd',
'serialization.format'='1',
@@ -173,6 +175,7 @@ TBLPROPERTIES (
'default-partition-spec'='{"spec-id":0,"fields":[{"name":"company","transform":"identity","source-id":2,"field-id":1000}]}',
'format-version'='2',
'iceberg.orc.files.only'='false',
'iceberg.write-order'='{"order-id":0,"fields":[]}',
'metadata_location'='hdfs://### HDFS PATH ###',
'parquet.compression'='zstd',
'snapshot-count'='0',
@@ -245,6 +248,7 @@ TBLPROPERTIES (
'default-partition-spec'='{"spec-id":0,"fields":[{"name":"company","transform":"identity","source-id":2,"field-id":1000}]}',
'format-version'='2',
'iceberg.orc.files.only'='false',
'iceberg.write-order'='{"order-id":0,"fields":[]}',
'metadata_location'='hdfs://### HDFS PATH ###',
'parquet.compression'='zstd',
'snapshot-count'='0',
@@ -118,6 +118,7 @@ Table Parameters:
format-version 2
iceberg.delete.skiprowdata false
iceberg.orc.files.only true
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -269,6 +270,7 @@ Table Parameters:
current-snapshot-timestamp-ms #Masked#
format-version 2
iceberg.orc.files.only true
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -420,6 +422,7 @@ Table Parameters:
current-snapshot-timestamp-ms #Masked#
format-version 2
iceberg.orc.files.only true
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -79,6 +79,7 @@ Table Parameters:
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"i\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"s\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":4,\"name\":\"d\",\"required\":false,\"type\":\"date\"}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -140,6 +141,7 @@ Table Parameters:
default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"year_field_year\",\"transform\":\"year\",\"source-id\":1,\"field-id\":1000},{\"name\":\"month_field_month\",\"transform\":\"month\",\"source-id\":2,\"field-id\":1001},{\"name\":\"day_field_day\",\"transform\":\"day\",\"source-id\":3,\"field-id\":1002},{\"name\":\"hour_field_hour\",\"transform\":\"hour\",\"source-id\":4,\"field-id\":1003},{\"name\":\"truncate_field_trunc\",\"transform\":\"truncate[2]\",\"source-id\":5,\"field-id\":1004},{\"name\":\"bucket_field_bucket\",\"transform\":\"bucket[2]\",\"source-id\":6,\"field-id\":1005},{\"name\":\"identity_field\",\"transform\":\"identity\",\"source-id\":7,\"field-id\":1006}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -202,6 +204,7 @@ Table Parameters:
default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"year_field_year\",\"transform\":\"year\",\"source-id\":2,\"field-id\":1000},{\"name\":\"month_field_month\",\"transform\":\"month\",\"source-id\":3,\"field-id\":1001},{\"name\":\"day_field_day\",\"transform\":\"day\",\"source-id\":4,\"field-id\":1002},{\"name\":\"hour_field_hour\",\"transform\":\"hour\",\"source-id\":5,\"field-id\":1003},{\"name\":\"truncate_field_trunc\",\"transform\":\"truncate[2]\",\"source-id\":6,\"field-id\":1004},{\"name\":\"bucket_field_bucket\",\"transform\":\"bucket[2]\",\"source-id\":7,\"field-id\":1005},{\"name\":\"identity_field\",\"transform\":\"identity\",\"source-id\":8,\"field-id\":1006}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0
@@ -252,6 +255,7 @@ Table Parameters:
default-partition-spec {\"spec-id\":0,\"fields\":[{\"name\":\"b\",\"transform\":\"identity\",\"source-id\":2,\"field-id\":1000}]}
format-version 2
iceberg.orc.files.only false
iceberg.write-order {\"order-id\":0,\"fields\":[]}
metadata_location hdfs://### HDFS PATH ###
numFiles 0
numRows 0