HIVE-29116: Create a DDL for setting hive default partition name at the table level #6013
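
This patch threads each table's parameters and the active Configuration through every code path that previously hard-coded __HIVE_DEFAULT_PARTITION__ or consulted only hive.exec.default.partition.name, so the default partition name can be resolved per table (see the new MetaStoreUtils.getDefaultPartitionName(tableParams, conf) and PartitionUtils.getDefaultPartitionName(...) calls below). A minimal sketch of the intended resolution order, assuming a table-property key such as "default.partition.name" (the actual property key and DDL syntax are not shown in this diff):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;

// Sketch only: illustrates the table-first, conf-second lookup this PR enables;
// it is not the code added by the patch.
public final class DefaultPartitionNameSketch {
  private static final String CONF_KEY = "hive.exec.default.partition.name";
  private static final String TABLE_KEY = "default.partition.name";      // assumed property key
  private static final String FALLBACK = "__HIVE_DEFAULT_PARTITION__";   // historical default

  public static String getDefaultPartitionName(Map<String, String> tableParams, Configuration conf) {
    // 1. Table-level override set through the new DDL, if present.
    if (tableParams != null && tableParams.get(TABLE_KEY) != null) {
      return tableParams.get(TABLE_KEY);
    }
    // 2. Session/cluster-wide setting from hive.exec.default.partition.name.
    if (conf != null) {
      return conf.get(CONF_KEY, FALLBACK);
    }
    // 3. Historical hard-coded default.
    return FALLBACK;
  }
}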

Open: wants to merge 10 commits into master
33 changes: 18 additions & 15 deletions common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -64,6 +64,7 @@
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
@@ -161,9 +162,9 @@ private FileUtils() {
// prevent instantiation
}


public static String makePartName(List<String> partCols, List<String> vals) {
return makePartName(partCols, vals, null);
public static String makePartName(List<String> partCols, List<String> vals, Map<String, String> tableParams,
Configuration conf) {
return makePartName(partCols, vals, null, tableParams, conf);
}

/**
@@ -175,15 +176,15 @@ public static String makePartName(List<String> partCols, List<String> vals) {
* @return An escaped, valid partition name.
*/
public static String makePartName(List<String> partCols, List<String> vals,
String defaultStr) {
String defaultStr, Map<String, String> tableParams, Configuration conf) {
StringBuilder name = new StringBuilder();
for (int i = 0; i < partCols.size(); i++) {
if (i > 0) {
name.append(Path.SEPARATOR);
}
name.append(escapePathName((partCols.get(i)).toLowerCase(), defaultStr));
name.append(escapePathName((partCols.get(i)).toLowerCase(), defaultStr, tableParams, conf));
name.append('=');
name.append(escapePathName(vals.get(i), defaultStr));
name.append(escapePathName(vals.get(i), defaultStr, tableParams, conf));
}
return name.toString();
}
@@ -196,9 +197,9 @@ public static String makePartName(List<String> partCols, List<String> vals,
* @return
*/
public static String makeDefaultListBucketingDirName(List<String> skewedCols,
String name) {
String name, Map<String, String> tableParams, Configuration conf) {
String lbDirName;
String defaultDir = FileUtils.escapePathName(name);
String defaultDir = FileUtils.escapePathName(name, tableParams, conf);
StringBuilder defaultDirPath = new StringBuilder();
for (int i = 0; i < skewedCols.size(); i++) {
if (i > 0) {
@@ -216,15 +217,16 @@ public static String makeDefaultListBucketingDirName(List<String> skewedCols,
* @param vals The skewed values
* @return An escaped, valid list bucketing directory name.
*/
public static String makeListBucketingDirName(List<String> lbCols, List<String> vals) {
public static String makeListBucketingDirName(List<String> lbCols, List<String> vals, Map<String, String> tableParams,
Configuration conf) {
StringBuilder name = new StringBuilder();
for (int i = 0; i < lbCols.size(); i++) {
if (i > 0) {
name.append(Path.SEPARATOR);
}
name.append(escapePathName((lbCols.get(i)).toLowerCase()));
name.append(escapePathName((lbCols.get(i)).toLowerCase(), tableParams, conf));
name.append('=');
name.append(escapePathName(vals.get(i)));
name.append(escapePathName(vals.get(i), tableParams, conf));
}
return name.toString();
}
@@ -276,8 +278,8 @@ static boolean needsEscaping(char c) {
return c < charToEscape.size() && charToEscape.get(c);
}

public static String escapePathName(String path) {
return escapePathName(path, null);
public static String escapePathName(String path, Map<String, String> tableParams, Configuration conf) {
return escapePathName(path, null, tableParams, conf);
}

/**
@@ -287,15 +289,16 @@ public static String escapePathName(String path) {
* The default name for the path, if the given path is empty or null.
* @return An escaped path name.
*/
public static String escapePathName(String path, String defaultPath) {
public static String escapePathName(String path, String defaultPath, Map<String, String> tableParams,
Configuration conf) {

// __HIVE_DEFAULT_NULL__ is the system default value for null and empty string.
// TODO: we should allow user to specify default partition or HDFS file location.
if (path == null || path.length() == 0) {
if (defaultPath == null) {
//previously, when path is empty or null and no default path is specified,
// __HIVE_DEFAULT_PARTITION__ was the return value for escapePathName
return "__HIVE_DEFAULT_PARTITION__";
return MetaStoreUtils.getDefaultPartitionName(tableParams, conf);
} else {
return defaultPath;
}
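For reference, a hedged usage sketch of the widened FileUtils.makePartName signature above; the table parameters map and configuration here are stand-ins for illustration, not code from this patch:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.FileUtils;

public class MakePartNameExample {
  public static void main(String[] args) {
    List<String> partCols = Arrays.asList("country", "state");
    List<String> vals = Arrays.asList("IN", null);      // the null value falls back to the default partition name
    Map<String, String> tableParams = new HashMap<>();  // in real callers this comes from Table#getParameters()
    Configuration conf = new Configuration();

    // Callers now pass the table parameters and the Configuration so escapePathName
    // can resolve the default partition name per table instead of per session.
    String partName = FileUtils.makePartName(partCols, vals, tableParams, conf);
    System.out.println(partName);                       // e.g. country=IN/state=<resolved default name>
  }
}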
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5613,6 +5613,7 @@ public static enum ConfVars {
"hive.security.metastore.authorization.manager,hive.security.metastore.authenticator.manager," +
"hive.users.in.admin.role,hive.server2.xsrf.filter.enabled,hive.server2.csrf.filter.enabled,hive.security.authorization.enabled," +
"hive.distcp.privileged.doAs," +
"hive.exec.default.partition.name," +
"hive.server2.authentication.ldap.baseDN," +
"hive.server2.authentication.ldap.url," +
"hive.server2.authentication.ldap.Domain," +
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -499,6 +499,7 @@ public enum ErrorMsg {
CATALOG_ALREADY_EXISTS(10444, "Catalog {0} already exists", true),
CATALOG_NOT_EXISTS(10445, "Catalog {0} does not exists:", true),
INVALID_SCHEDULED_QUERY(10446, "Scheduled query {0} does not exist", true),
NON_PARTITIONED_TABLE(10447, "Table {0} is not a partitioned table."),

//========================== 20000 range starts here ========================//

@@ -309,7 +309,7 @@ public void testPathEscapeChars() {
StringBuilder sb = new StringBuilder();
FileUtils.charToEscape.stream().forEach(integer -> sb.append((char) integer));
String path = sb.toString();
assertEquals(path, FileUtils.unescapePathName(FileUtils.escapePathName(path)));
assertEquals(path, FileUtils.unescapePathName(FileUtils.escapePathName(path, null, new HiveConf())));
}

@Test
@@ -27,6 +27,8 @@

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.ddl.table.partition.PartitionUtils;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -72,7 +74,8 @@ class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContaine
*/
public DynamicPartitionFileRecordWriterContainer(
RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
TaskAttemptContext context, Table tbl)
throws IOException, InterruptedException {
super(baseWriter, context);
maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
dynamicPartCols = jobInfo.getPosOfDynPartCols();
@@ -88,7 +91,8 @@ public DynamicPartitionFileRecordWriterContainer(
this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>();
this.HIVE_DEFAULT_PARTITION_VALUE = HiveConf.getVar(context.getConfiguration(), HiveConf.ConfVars.DEFAULT_PARTITION_NAME);
this.HIVE_DEFAULT_PARTITION_VALUE = PartitionUtils.getDefaultPartitionName(tbl.getParameters(),
context.getConfiguration());
}

@Override
@@ -56,6 +56,7 @@
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.ddl.table.partition.PartitionUtils;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -395,7 +396,8 @@ private Partition constructPartition(
HdfsUtils.setFullFileStatus(conf, status, status.getFileStatus().getGroup(), fs,
partPath, false);
}
partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs, table.getParameters(),
conf);
}
}

@@ -406,7 +408,7 @@ private Partition constructPartition(

// Set the location in the StorageDescriptor
if (dynamicPartitioningUsed) {
String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs, jobInfo);
String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs, jobInfo, conf);
if (harProcessor.isEnabled()) {
harProcessor.exec(context, partition, partPath);
partition.getSd().setLocation(
@@ -421,13 +423,14 @@ private String getFinalDynamicPartitionDestination(Table table, Map<String, Stri
}

private String getFinalDynamicPartitionDestination(Table table, Map<String, String> partKVs,
OutputJobInfo jobInfo) {
OutputJobInfo jobInfo, Configuration conf) {
Path partPath = new Path(table.getTTable().getSd().getLocation());
if (!customDynamicLocationUsed) {
// file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA ->
// file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
for (FieldSchema partKey : table.getPartitionKeys()) {
partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs, table.getParameters(),
conf);
}

return partPath.toString();
@@ -437,7 +440,8 @@ private String getFinalDynamicPartitionDestination(Table table, Map<String, Stri
&& jobInfo.getCustomDynamicRoot().length() > 0) {
partPath = new Path(partPath, jobInfo.getCustomDynamicRoot());
}
return new Path(partPath, HCatFileUtil.resolveCustomPath(jobInfo, partKVs, false)).toString();
return new Path(partPath, HCatFileUtil.resolveCustomPath(jobInfo, partKVs, false,
table.getParameters(), conf)).toString();
}
}

@@ -453,11 +457,12 @@ private Map<String, String> getStorerParameterMap(StorerInfo storer) {
return params;
}

private Path constructPartialPartPath(Path partialPath, String partKey, Map<String, String> partKVs) {
private Path constructPartialPartPath(Path partialPath, String partKey, Map<String, String> partKVs,
Map<String, String> tableParams, Configuration conf) {

StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey));
StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey, tableParams, conf));
sb.append("=");
sb.append(FileUtils.escapePathName(partKVs.get(partKey)));
sb.append(FileUtils.escapePathName(partKVs.get(partKey), tableParams, conf));
return new Path(partialPath, sb.toString());
}

@@ -712,7 +717,8 @@ private void discoverPartitions(JobContext context) throws IOException {

// construct a path pattern (e.g., /*/*) to find all dynamically generated paths
String dynPathSpec = loadPath.toUri().getPath();
dynPathSpec = dynPathSpec.replace("__HIVE_DEFAULT_PARTITION__", "*");
dynPathSpec = dynPathSpec.replace(PartitionUtils.getDefaultPartitionName(
jobInfo.getTableInfo().getTable().getParameters(), HCatUtil.getHiveConf(context.getConfiguration())), "*");

// LOG.info("Searching for "+dynPathSpec);
Path pathPattern = new Path(dynPathSpec);
@@ -990,7 +996,7 @@ private void moveCustomLocationTaskOutputs(FileSystem fs, Table table, Configura
// final destination of each partition and move its output.
for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
Path src = new Path(entry.getKey());
Path destPath = new Path(getFinalDynamicPartitionDestination(table, entry.getValue(), jobInfo));
Path destPath = new Path(getFinalDynamicPartitionDestination(table, entry.getValue(), jobInfo, conf));
moveTaskOutputs(conf, src, src, destPath, true);
}
// delete the parent temp directory of all custom dynamic partitions
@@ -94,7 +94,7 @@ public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttem
// (That's because records can't be written until the values of the dynamic partitions are deduced.
// By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.)
rw = new DynamicPartitionFileRecordWriterContainer(
(org.apache.hadoop.mapred.RecordWriter)null, context);
(org.apache.hadoop.mapred.RecordWriter)null, context, jobInfo.getTableInfo().getTable());
} else {
Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir"));
String extension = HiveConf.getVar(context.getConfiguration(), HiveConf.ConfVars.OUTPUT_FILE_EXTENSION,"");
@@ -161,7 +161,9 @@ public void configureOutputJobProperties(TableDesc tableDesc,
&& jobInfo.getCustomDynamicPath().length() > 0) {
// dynamic partitioning with custom path; resolve the custom path
// using partition column values
outputLocation = HCatFileUtil.resolveCustomPath(jobInfo, null, true);
outputLocation = HCatFileUtil.resolveCustomPath(jobInfo, null, true,
(jobInfo.getTableInfo() != null && jobInfo.getTableInfo().getTable() != null) ?
jobInfo.getTableInfo().getTable().getParameters() : null, conf);
} else if ((dynHash == null)
&& Boolean.parseBoolean((String)tableDesc.getProperties().get("EXTERNAL"))
&& jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) {
@@ -182,7 +184,9 @@ public void configureOutputJobProperties(TableDesc tableDesc,
cols.add(name);
values.add(value);
}
outputLocation = FileUtils.makePartName(cols, values);
outputLocation = FileUtils.makePartName(cols, values, (jobInfo.getTableInfo() != null &&
jobInfo.getTableInfo().getTable() != null) ? jobInfo.getTableInfo().getTable().getParameters() : null,
conf);
}

if (outputLocation!= null && !outputLocation.isEmpty()){
@@ -25,7 +25,9 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.ddl.table.partition.PartitionUtils;

public class HCatFileUtil {

@@ -36,7 +38,7 @@ public class HCatFileUtil {
// This method parses the custom dynamic path and replaces each occurrence
// of column name within regex pattern with its corresponding value, if provided
public static String resolveCustomPath(OutputJobInfo jobInfo,
Map<String, String> dynPartKVs, boolean createRegexPath) {
Map<String, String> dynPartKVs, boolean createRegexPath, Map<String, String> tableParams, Configuration conf) {
// get custom path string
String customPath = jobInfo.getCustomDynamicPath();
// create matcher for custom path
@@ -68,7 +70,7 @@ public static String resolveCustomPath(OutputJobInfo jobInfo,
if (columnValue != null) {
sb.append(columnValue);
} else {
sb.append("__HIVE_DEFAULT_PARTITION__");
sb.append(PartitionUtils.getDefaultPartitionName(tableParams, conf));
}

if (createRegexPath) {
@@ -410,7 +410,8 @@ public PartitionFiles next() {
fileIterator = Collections.emptyIterator();
}
PartitionFiles partitionFiles =
new PartitionFiles(Warehouse.makePartName(t.getPartitionKeys(), p.getValues()), fileIterator);
new PartitionFiles(Warehouse.makePartName(t.getPartitionKeys(), p.getValues(), t.getParameters(), conf),
fileIterator);
return partitionFiles;
} catch (MetaException e) {
throw new RuntimeException(e);
@@ -138,7 +138,8 @@ Partition toHivePartition() throws HCatException {
if (sd.getLocation() == null) {
LOG.warn("Partition location is not set! Attempting to construct default partition location.");
try {
String partName = Warehouse.makePartName(HCatSchemaUtils.getFieldSchemas(hcatTable.getPartCols()), values);
String partName = Warehouse.makePartName(HCatSchemaUtils.getFieldSchemas(hcatTable.getPartCols()), values,
hcatTable.getTblProps(), hcatTable.getConf());
sd.setLocation(new Path(hcatTable.getSd().getLocation(), partName).toString());
}
catch(MetaException exception) {
@@ -156,7 +156,12 @@ public static String fixPath(String path) {
}

public static String makePartLocation(HCatTable table, Map<String, String> partitionSpec) throws MetaException {
return (new Path(table.getSd().getLocation(), Warehouse.makePartPath(partitionSpec))).toUri().toString();
try {
return (new Path(table.getSd().getLocation(), Warehouse.makePartPath(partitionSpec,
table.toHiveTable().getParameters(), getConf()))).toUri().toString();
} catch (HCatException e) {
throw new RuntimeException(e);
}
}

@Test
@@ -876,8 +876,7 @@ public DynamicPartitionCtx createDPContext(
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());

DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(Maps.newLinkedHashMap(),
hiveConf.getVar(ConfVars.DEFAULT_PARTITION_NAME),
hiveConf.getIntVar(ConfVars.DYNAMIC_PARTITION_MAX_PARTS_PER_NODE));
hiveConf.getIntVar(ConfVars.DYNAMIC_PARTITION_MAX_PARTS_PER_NODE), hmsTable.getParameters(), hiveConf);

if (table.spec().isPartitioned() &&
hiveConf.getIntVar(ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD) >= 0) {
@@ -2009,7 +2008,7 @@ public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec, RewritePolicy policy) throws SemanticException {
validatePartSpec(table, partitionSpec, policy);
try {
String partName = Warehouse.makePartName(partitionSpec, false);
String partName = Warehouse.makePartName(partitionSpec, false, table.getParameters(), conf);
return new DummyPartition(table, partName, partitionSpec);
} catch (MetaException e) {
throw new SemanticException("Unable to construct name for dummy partition due to: ", e);
@@ -124,7 +124,8 @@ public static void importFiles(String sourceLocation,
Partition partition = partitionIterator.next();
Callable<Void> task = () -> {
Path partitionPath = new Path(partition.getSd().getLocation());
String partitionName = Warehouse.makePartName(partitionKeys, partition.getValues());
String partitionName = Warehouse.makePartName(partitionKeys, partition.getValues(),
icebergTable.properties(), conf);
Map<String, String> partitionSpec = Warehouse.makeSpecFromName(partitionName);
RemoteIterator<LocatedFileStatus> iterator = getFilesIterator(partitionPath, conf);
List<DataFile> dataFiles =