Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-28668: Hive should emit fewer events for truncate table operation #5582

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,49 @@ public void alterPartitions() throws Exception {
testEventCounts(defaultDbName, firstEventId, null, null, 3);
}

/**
 * Checks how many notification events are visible after truncating first an
 * unpartitioned table and then a partitioned table holding five partitions.
 * Event counts are cumulative because both lookups start from {@code firstEventId}.
 */
@Test
public void testTruncatePartitionedTable() throws Exception {
  final String dbName = "default";

  // Phase 1: unpartitioned table -- create it, then truncate it.
  final String plainTblName = "unPartitionedTable";
  new TableBuilder()
      .setDbName(dbName)
      .setTableName(plainTblName)
      .addCol("col1", "int")
      .setLocation(testTempDir)
      .create(msClient, new HiveConf());

  Table tbl = msClient.getTable(new GetTableRequest(dbName, plainTblName));
  msClient.truncateTable(dbName, plainTblName, null);
  NotificationEventResponse events = msClient.getNextNotification(firstEventId, 0, null);
  // Expected so far: create unpartitioned table + alter table events.
  assertEquals(2, events.getEventsSize());

  // Phase 2: partitioned table -- create it, add five partitions in one call, then truncate it.
  final String partTblName = "partitionedTbl";
  new TableBuilder()
      .setDbName(dbName)
      .setTableName(partTblName)
      .addCol("col1", "int")
      .addPartCol("col2", "int")
      .addPartCol("col3", "string")
      .setLocation(testTempDir)
      .create(msClient, new HiveConf());
  tbl = msClient.getTable(new GetTableRequest(dbName, partTblName));

  final int partitionCount = 5;
  List<Partition> newParts = new ArrayList<>();
  for (int idx = 0; idx < partitionCount; idx++) {
    // Partition values follow the (col2, col3) partition columns: ("<idx>", "part<idx>").
    newParts.add(new Partition(Arrays.asList(idx + "", "part" + idx), dbName, partTblName,
        0, 0, tbl.getSd(), emptyParameters));
  }
  msClient.add_partitions(newParts);
  msClient.truncateTable(dbName, partTblName, null);

  events = msClient.getNextNotification(firstEventId, 0, null);
  // Expected in total (5 events):
  //   create unpartitioned table, alter table,
  //   create partitioned table, add partition, alter table.
  assertEquals(5, events.getEventsSize());
}

@Test
public void dropPartition() throws Exception {
String defaultDbName = "default";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.table.misc.truncate.TruncateTableDesc;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -35,17 +39,39 @@
public class TruncatePartitionHandler extends AbstractMessageHandler {
@Override
public List<Task<?>> handle(Context context) throws SemanticException {
AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
final TableName tName = TableName.fromString(msg.getTable(), null,
context.isDbNameEmpty() ? msg.getDB() : context.dbName);

Map<String, String> partSpec = new LinkedHashMap<>();
final TableName tName;
org.apache.hadoop.hive.metastore.api.Table tblObj;
try {
tblObj = msg.getTableObj();
Iterator<String> afterIterator = msg.getPtnObjAfter().getValuesIterator();
for (FieldSchema fs : tblObj.getPartitionKeys()) {
partSpec.put(fs.getName(), afterIterator.next());
if (MetastoreConf.getBoolVar(context.hiveConf,
MetastoreConf.ConfVars.NOTIFICATION_ALTER_PARTITIONS_V2_ENABLED)) {
AlterPartitionsMessage singleMsg = deserializer.getAlterPartitionsMessage(
context.dmd.getPayload());
tName = TableName.fromString(singleMsg.getTable(), null,
context.isDbNameEmpty() ? singleMsg.getDB() : context.dbName);
tblObj = singleMsg.getTableObj();
dengzhhu653 marked this conversation as resolved.
Show resolved Hide resolved
List<Map<String, String>> afterPartitionsList = singleMsg.getPartitions();
List<Task<?>> childTaskList = new ArrayList<>();
for(Map<String, String> afterIteratorMap : afterPartitionsList) {
Iterator<String> afterIterator = afterIteratorMap.values().iterator();
Map<String, String> partSpec = new LinkedHashMap<>();
for (FieldSchema fs : tblObj.getPartitionKeys()) {
partSpec.put(fs.getName(), afterIterator.next());
dengzhhu653 marked this conversation as resolved.
Show resolved Hide resolved
}
childTaskList.addAll(handleSingleAlterPartition(context, tName, partSpec,
singleMsg.getWriteId()));
}
return childTaskList;
} else {
AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
tName = TableName.fromString(msg.getTable(), null,
context.isDbNameEmpty() ? msg.getDB() : context.dbName);
tblObj = msg.getTableObj();
Iterator<String> afterIterator = msg.getPtnObjAfter().getValuesIterator();
Map<String, String> partSpec = new LinkedHashMap<>();
for (FieldSchema fs : tblObj.getPartitionKeys()) {
partSpec.put(fs.getName(), afterIterator.next());
}
return handleSingleAlterPartition(context, tName, partSpec, msg.getWriteId());
}
} catch (Exception e) {
if (!(e instanceof SemanticException)) {
Expand All @@ -54,18 +80,19 @@ public List<Task<?>> handle(Context context) throws SemanticException {
throw (SemanticException) e;
}
}
}

private List<Task<?>> handleSingleAlterPartition(Context context, TableName tName,
Map<String, String> partSpec, Long writeId) throws SemanticException {
TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
tName, partSpec,
context.eventOnlyReplicationSpec());
truncateTableDesc.setWriteId(msg.getWriteId());
tName, partSpec, context.eventOnlyReplicationSpec());
truncateTableDesc.setWriteId(writeId);
Task<DDLWork> truncatePtnTask = TaskFactory.get(
new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc, true,
context.getDumpDirectory(), context.getMetricCollector()), context.hiveConf);
context.getDumpDirectory(), context.getMetricCollector()), context.hiveConf);
context.log.debug("Added truncate ptn task : {}:{}:{}", truncatePtnTask.getId(),
truncateTableDesc.getTableName(), truncateTableDesc.getWriteId());
updatedMetadata.set(context.dmd.getEventTo().toString(), tName.getDb(), tName.getTable(), partSpec);

try {
return ReplUtils.addChildTask(truncatePtnTask);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface AlterHandler extends Configurable {

/**
* @deprecated As of release 2.2.0. Replaced by {@link #alterTable(RawStore, Warehouse, String,
* String, String, Table, EnvironmentContext, IHMSHandler, String)}
* String, String, Table, EnvironmentContext, IHMSHandler, String, boolean)}
*
* handles alter table, the changes could be cascaded to partitions if applicable
*
Expand All @@ -53,6 +53,8 @@ public interface AlterHandler extends Configurable {
* <i>newTable.tableName</i> if alter op is not a rename.
* @param newTable
* new table object
* @param envContext
* environment context variable
* @throws InvalidOperationException
* thrown if the newTable object is invalid
* @throws MetaException
Expand All @@ -62,7 +64,7 @@ public interface AlterHandler extends Configurable {
default void alterTable(RawStore msdb, Warehouse wh, String catName, String dbname,
String name, Table newTable, EnvironmentContext envContext)
throws InvalidOperationException, MetaException {
alterTable(msdb, wh, catName, dbname, name, newTable, envContext, null, null);
alterTable(msdb, wh, catName, dbname, name, newTable, envContext, null, null, false);
}

/**
Expand All @@ -82,25 +84,29 @@ default void alterTable(RawStore msdb, Warehouse wh, String catName, String dbna
* new table object
* @param handler
* HMSHandler object (required to log event notification)
* @param writeIdList write id list for the table
* @param isTruncateOp boolean flag to specify if this is truncate operation
* @param envContext environment context variable
* @throws InvalidOperationException
* thrown if the newTable object is invalid
* @throws MetaException
* thrown if there is any other error
*/
void alterTable(RawStore msdb, Warehouse wh, String catName, String dbname,
String name, Table newTable, EnvironmentContext envContext,
IHMSHandler handler, String writeIdList)
IHMSHandler handler, String writeIdList, boolean isTruncateOp)
throws InvalidOperationException, MetaException;

/**
* @deprecated As of release 2.2.0. Replaced by {@link #alterPartitions(RawStore, Warehouse, String,
* String, String, List, EnvironmentContext, String, long, IHMSHandler)}
* String, String, List, EnvironmentContext, String, long, IHMSHandler, boolean)}
*
* handles alter partition
*
* @param msdb
* object to get metadata
* @param wh
* physical warehouse class
* @param dbname
* database of the partition being altered
* @param name
Expand All @@ -110,10 +116,11 @@ void alterTable(RawStore msdb, Warehouse wh, String catName, String dbname,
* @param new_part
* new partition object
* @return the altered partition
* @throws InvalidOperationException
* @throws InvalidObjectException
* @throws AlreadyExistsException
* @throws MetaException
* @throws InvalidOperationException thrown if the operation is invalid
* @throws InvalidObjectException thrown if the new_part object is invalid
* @throws AlreadyExistsException thrown if the new_part object already exists
* @throws MetaException thrown if there is any other error
* @throws NoSuchObjectException thrown if there is no such object
*/
@Deprecated
Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
Expand All @@ -136,13 +143,16 @@ Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
* original values of the partition being altered
* @param new_part
* new partition object
* @param environmentContext environment context variable
* @param handler
* HMSHandler object (required to log event notification)
* @param validWriteIds valid write id list for the table
* @return the altered partition
* @throws InvalidOperationException
* @throws InvalidObjectException
* @throws AlreadyExistsException
* @throws MetaException
* @throws InvalidOperationException thrown if the operation is invalid
* @throws InvalidObjectException thrown if the new_part object is invalid
* @throws AlreadyExistsException thrown if the new_part object already exists
* @throws MetaException thrown if there is any other error
* @throws NoSuchObjectException thrown if there is no such object
*/
Partition alterPartition(final RawStore msdb, Warehouse wh, final String catName,
final String dbname, final String name, final List<String> part_vals,
Expand All @@ -152,24 +162,25 @@ Partition alterPartition(final RawStore msdb, Warehouse wh, final String catName

/**
* @deprecated As of release 3.0.0. Replaced by {@link #alterPartitions(RawStore, Warehouse, String,
* String, String, List, EnvironmentContext, String, long, IHMSHandler)}
* String, String, List, EnvironmentContext, String, long, IHMSHandler, boolean)}
*
* handles alter partitions
*
* @param msdb
* object to get metadata
* @param wh
* @param wh physical warehouse class
* @param dbname
* database of the partition being altered
* @param name
* table of the partition being altered
* @param new_parts
* new partition list
* @param environmentContext environment context variable
* @return the altered partition list
* @throws InvalidOperationException
* @throws InvalidObjectException
* @throws AlreadyExistsException
* @throws MetaException
* @throws InvalidOperationException thrown if the operation is invalid
* @throws InvalidObjectException thrown if the new_parts object is invalid
* @throws AlreadyExistsException thrown if a partition in new_parts already exists
* @throws MetaException thrown if there is any other error
*/
@Deprecated
List<Partition> alterPartitions(final RawStore msdb, Warehouse wh,
Expand All @@ -179,27 +190,26 @@ List<Partition> alterPartitions(final RawStore msdb, Warehouse wh,

/**
* handles alter partitions
*
* @param msdb
* object to get metadata
* @param wh
* @param dbname
* database of the partition being altered
* @param name
* table of the partition being altered
* @param new_parts
* new partition list
* @param handler
* HMSHandle object (required to log event notification)
* @param msdb object to get metadata
* @param wh physical warehouse class
* @param catName catalog name of the partition being altered
* @param dbname database of the partition being altered
* @param name table of the partition being altered
* @param new_parts new partition list
* @param environmentContext environment context variable
* @param writeIdList write id list for the table
* @param writeId writeId for the table
* @param handler HMSHandler object (required to log event notification)
* @param isTruncateOp whether the operation is truncate
* @return the altered partition list
* @throws InvalidOperationException
* @throws InvalidObjectException
* @throws AlreadyExistsException
* @throws MetaException
* @throws InvalidOperationException thrown if the operation is invalid
* @throws InvalidObjectException thrown if the new_parts object is invalid
* @throws AlreadyExistsException thrown if a partition in new_parts already exists
* @throws MetaException thrown if there is any other error
*/
List<Partition> alterPartitions(final RawStore msdb, Warehouse wh, final String catName,
final String dbname, final String name, final List<Partition> new_parts,
EnvironmentContext environmentContext, String writeIdList, long writeId,
IHMSHandler handler)
IHMSHandler handler, boolean isTruncateOp)
throws InvalidOperationException, InvalidObjectException, AlreadyExistsException, MetaException;
}
Loading
Loading