Skip to content

Commit

Permalink
Issue #166 - Replica.updateMetadata calls metastore with batched par…
Browse files Browse the repository at this point in the history
…tition lists. (#167)

* Issue 166 - call metastore with batched partition lists
  • Loading branch information
barnharts4 authored Feb 6, 2020
1 parent 9e1da5e commit 3ce18cc
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 54 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## [15.1.1] - 2020-02-06
### Fixed
* When replicating tables with large numbers of partitions, `Replica.updateMetadata` now calls the metastore's add partitions, alter partitions and set partition column statistics operations in batches of 1000. See [#166](https://github.com/HotelsDotCom/circus-train/issues/166).

## [15.1.0] - 2020-01-28
### Changed
* AVRO Schema Copier now re-uses the normal 'data' copier instead of its own. See [#162](https://github.com/HotelsDotCom/circus-train/issues/162).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,9 +42,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;

import com.hotels.bdp.circustrain.api.CircusTrainException;
import com.hotels.bdp.circustrain.api.ReplicaLocationManager;
Expand All @@ -71,6 +73,7 @@ public class Replica extends HiveEndpoint {
private final ReplicaCatalogListener replicaCatalogListener;
private final ReplicationMode replicationMode;
private final TableReplication tableReplication;
private int partitionBatchSize = 1000;

/**
* Use {@link ReplicaFactory}
Expand All @@ -91,6 +94,28 @@ public class Replica extends HiveEndpoint {
this.tableReplication = tableReplication;
}

/**
 * Creates a {@code Replica} with an explicit partition batch size. This overload exists so that tests can exercise
 * the batching of metastore partition calls with a small batch size; other callers should use
 * {@link ReplicaFactory}.
 *
 * @param replicaCatalog catalog whose name is passed to the superclass constructor
 * @param replicaHiveConf Hive configuration passed to the superclass constructor
 * @param replicaMetaStoreClientSupplier supplier of metastore clients passed to the superclass constructor
 * @param replicaTableFactory factory stored as this replica's table factory
 * @param housekeepingListener listener stored for housekeeping notifications
 * @param replicaCatalogListener listener stored for replica catalog notifications
 * @param tableReplication table replication settings; also supplies the replication mode
 * @param partitionBatchSize maximum number of entries sent to the metastore in a single add partitions, alter
 *          partitions or set partition column statistics call; must be greater than zero
 * @throws IllegalArgumentException if {@code partitionBatchSize} is not greater than zero
 */
@VisibleForTesting
Replica(
    ReplicaCatalog replicaCatalog,
    HiveConf replicaHiveConf,
    Supplier<CloseableMetaStoreClient> replicaMetaStoreClientSupplier,
    ReplicaTableFactory replicaTableFactory,
    HousekeepingListener housekeepingListener,
    ReplicaCatalogListener replicaCatalogListener,
    TableReplication tableReplication,
    int partitionBatchSize) {
  super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier);
  // Fail fast: Lists.partition rejects a non-positive size, but it would only do so once updateMetadata has
  // partitions to send, which is long after construction and much harder to trace back to the bad argument.
  if (partitionBatchSize <= 0) {
    throw new IllegalArgumentException(
        "partitionBatchSize must be greater than zero but was " + partitionBatchSize);
  }
  this.replicaCatalogListener = replicaCatalogListener;
  tableFactory = replicaTableFactory;
  this.housekeepingListener = housekeepingListener;
  replicationMode = tableReplication.getReplicationMode();
  this.tableReplication = tableReplication;
  this.partitionBatchSize = partitionBatchSize;
}

public void updateMetadata(
String eventId,
TableAndStatistics sourceTable,
Expand Down Expand Up @@ -175,7 +200,13 @@ public void updateMetadata(
if (!partitionsToCreate.isEmpty()) {
LOG.info("Creating {} new partitions.", partitionsToCreate.size());
try {
client.add_partitions(partitionsToCreate);
int counter = 0;
for (List<Partition> sublist : Lists.partition(partitionsToCreate, partitionBatchSize)) {
int start = counter * partitionBatchSize;
LOG.info("Creating partitions {} through {}", start, start + sublist.size() - 1);
client.add_partitions(sublist);
counter++;
}
} catch (TException e) {
throw new MetaStoreClientException("Unable to add partitions '"
+ partitionsToCreate
Expand All @@ -189,7 +220,13 @@ public void updateMetadata(
if (!partitionsToAlter.isEmpty()) {
LOG.info("Altering {} existing partitions.", partitionsToAlter.size());
try {
client.alter_partitions(replicaDatabaseName, replicaTableName, partitionsToAlter);
int counter = 0;
for (List<Partition> sublist : Lists.partition(partitionsToAlter, partitionBatchSize)) {
int start = counter * partitionBatchSize;
LOG.info("Altering partitions {} through {}", start, start + sublist.size() - 1);
client.alter_partitions(replicaDatabaseName, replicaTableName, sublist);
counter++;
}
} catch (TException e) {
throw new MetaStoreClientException("Unable to alter partitions '"
+ partitionsToAlter
Expand All @@ -203,7 +240,13 @@ public void updateMetadata(
if (!statisticsToSet.isEmpty()) {
LOG.info("Setting column statistics for {} partitions.", statisticsToSet.size());
try {
client.setPartitionColumnStatistics(new SetPartitionsStatsRequest(statisticsToSet));
int counter = 0;
for (List<ColumnStatistics> sublist : Lists.partition(statisticsToSet, partitionBatchSize)) {
int start = counter * partitionBatchSize;
LOG.info("Setting column statistics for partitions {} through {}", start, start + sublist.size() - 1);
client.setPartitionColumnStatistics(new SetPartitionsStatsRequest(sublist));
counter++;
}
} catch (TException e) {
throw new MetaStoreClientException(
"Unable to set column statistics of replica table '" + replicaDatabaseName + "." + replicaTableName + "'",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand All @@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.ListUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
Expand Down Expand Up @@ -103,6 +105,7 @@ public class ReplicaTest {
private static final FieldSchema FIELD_D = new FieldSchema(COLUMN_D, "string", null);
private static final List<FieldSchema> FIELDS = Arrays.asList(FIELD_A, FIELD_B);
private static final List<FieldSchema> PARTITIONS = Arrays.asList(FIELD_C, FIELD_D);
private static final int TEST_PARTITION_BATCH_SIZE = 17;

public @Rule TemporaryFolder temporaryFolder = new TemporaryFolder();

Expand Down Expand Up @@ -175,7 +178,7 @@ private void convertExistingReplicaTableToView() {

private Replica newReplica(TableReplication tableReplication) {
return new Replica(replicaCatalog, hiveConf, metaStoreClientSupplier, tableFactory, houseKeepingListener,
replicaCatalogListener, tableReplication);
replicaCatalogListener, tableReplication, TEST_PARTITION_BATCH_SIZE);
}

@Test
Expand Down Expand Up @@ -391,29 +394,69 @@ public void getName() throws Exception {
assertThat(replica.getName(), is(NAME));
}

@Test
public void alteringExistingPartitionedReplicaTableWithPartitionsSucceeds() throws TException, IOException {
Partition newPartition = newPartition("three", "four");
ColumnStatistics newPartitionStatistics = newPartitionStatistics("three", "four");
Partition modifiedPartition = new Partition(existingPartition);
ColumnStatistics modifiedPartitionStatistics = newPartitionStatistics("one", "two");
when(mockMetaStoreClient
.getPartitionsByNames(DB_NAME, TABLE_NAME, Lists.newArrayList("c=one/d=two", "c=three/d=four")))
.thenReturn(Arrays.asList(existingPartition));
private void alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(
int numTestAlterPartitions, int numTestAddPartitions) throws TException, IOException {
List<Partition> existingPartitions = new ArrayList<>();
List<Partition> newPartitions = new ArrayList<>();
List<ColumnStatistics> modifiedPartitionStatisticsList = new ArrayList<>();
List<ColumnStatistics> newPartitionStatisticsList = new ArrayList<>();

for (int i = 0; i < numTestAlterPartitions; i++) {
existingPartitions.add(newPartition(String.format("exist_%s", i), String.format("exist_%s_sub", i)));
modifiedPartitionStatisticsList.add(
newPartitionStatistics(String.format("exist_%s", i), String.format("exist_%s_sub", i)));
}

for (int i = 0; i < numTestAddPartitions; i++) {
newPartitions.add(newPartition(String.format("new_%s", i), String.format("new_%s_sub", i)));
newPartitionStatisticsList.add(
newPartitionStatistics(String.format("new_%s", i), String.format("new_%s_sub", i)));
}

List<List<Partition>> alterSublists = Lists.partition(existingPartitions, TEST_PARTITION_BATCH_SIZE);
int numAlterBatches = alterSublists.size();
int lastAlterBatchSize = numAlterBatches == 0 ? 0 : alterSublists.get(alterSublists.size()-1).size();

List<List<Partition>> addSublists = Lists.partition(newPartitions, TEST_PARTITION_BATCH_SIZE);
int numAddBatches = addSublists.size();
int lastAddBatchSize = numAddBatches == 0 ? 0 : addSublists.get(addSublists.size()-1).size();

List<List<ColumnStatistics>> statsSublists = Lists.partition(
ListUtils.union(modifiedPartitionStatisticsList, newPartitionStatisticsList), TEST_PARTITION_BATCH_SIZE);
int numStatisticsBatches = statsSublists.size();
int lastStatisticsBatchSize = numStatisticsBatches == 0 ? 0 : statsSublists.get(statsSublists.size()-1).size();


// Names of all partitions in the request: the existing partitions first, followed by the new ones.
List<String> testPartitionNames = new ArrayList<>();
for (Partition p : (List<Partition>) ListUtils.union(existingPartitions, newPartitions)) {
  // The no-arg toArray() is declared to return Object[]; casting that to String[] throws ClassCastException
  // unless the backing list happens to hand back a String[]. Ask for a String[] explicitly instead.
  testPartitionNames.add(partitionName(p.getValues().toArray(new String[0])));
}

when(mockMetaStoreClient.getPartitionsByNames(DB_NAME, TABLE_NAME, testPartitionNames))
.thenReturn(existingPartitions);

Map<String, List<ColumnStatisticsObj>> partitionStatsMap = new HashMap<>();
partitionStatsMap
.put(Warehouse.makePartName(PARTITIONS, newPartition.getValues()), newPartitionStatistics.getStatsObj());
partitionStatsMap
.put(Warehouse.makePartName(PARTITIONS, modifiedPartition.getValues()),
modifiedPartitionStatistics.getStatsObj());
for (int i = 0; i < numTestAddPartitions; i++) {
partitionStatsMap
.put(Warehouse.makePartName(PARTITIONS, newPartitions.get(i).getValues()),
newPartitionStatisticsList.get(i).getStatsObj());
}
for (int i = 0; i < numTestAlterPartitions; i++) {
partitionStatsMap
.put(Warehouse.makePartName(PARTITIONS, existingPartitions.get(i).getValues()),
modifiedPartitionStatisticsList.get(i).getStatsObj());
}

PartitionsAndStatistics partitionsAndStatistics = new PartitionsAndStatistics(sourceTable.getPartitionKeys(),
Arrays.asList(modifiedPartition, newPartition), partitionStatsMap);
when(mockReplicaLocationManager.getPartitionLocation(existingPartition))
.thenReturn(new Path(tableLocation, "c=one/d=two"));
when(mockReplicaLocationManager.getPartitionLocation(newPartition))
.thenReturn(new Path(tableLocation, "c=three/d=four"));
ListUtils.union(existingPartitions, newPartitions), partitionStatsMap);
for (int i = 0; i < numTestAddPartitions; i++) {
when(mockReplicaLocationManager.getPartitionLocation(newPartitions.get(i)))
.thenReturn(new Path(tableLocation, String.format("c=new_%s/d=new_%s_sub", i, i)));
}
for (int i = 0; i < numTestAlterPartitions; i++) {
when(mockReplicaLocationManager.getPartitionLocation(existingPartitions.get(i)))
.thenReturn(new Path(tableLocation, String.format("c=exist_%s/d=exist_%s_sub", i, i)));
}

existingReplicaTable.getParameters().put(REPLICATION_EVENT.parameterName(), "previousEventId");

Expand All @@ -423,41 +466,106 @@ public void alteringExistingPartitionedReplicaTableWithPartitionsSucceeds() thro

verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
verify(mockMetaStoreClient).updateTableColumnStatistics(columnStatistics);
verify(mockReplicaLocationManager).addCleanUpLocation(anyString(), any(Path.class));
verify(mockMetaStoreClient).alter_partitions(eq(DB_NAME), eq(TABLE_NAME), alterPartitionCaptor.capture());
verify(mockMetaStoreClient).add_partitions(addPartitionCaptor.capture());
verify(mockReplicaLocationManager, times(numTestAlterPartitions)).addCleanUpLocation(anyString(), any(Path.class));
verify(mockMetaStoreClient, times(numAlterBatches)).alter_partitions(eq(DB_NAME), eq(TABLE_NAME), alterPartitionCaptor.capture());
verify(mockMetaStoreClient, times(numAddBatches)).add_partitions(addPartitionCaptor.capture());

// Validate that the captured arguments contain the expected number of batches and the expected batch sizes
List<List<Partition>> addCaptorValues = addPartitionCaptor.getAllValues();
assertThat(addCaptorValues.size(), is(numAddBatches));

for (int batchNdx = 0; batchNdx < numAddBatches; batchNdx++) {
int thisBatchSize = batchNdx < (numAddBatches - 1) ? TEST_PARTITION_BATCH_SIZE : lastAddBatchSize;
List<Partition> addBatch = addCaptorValues.get(batchNdx);
assertThat(addBatch.size(), is(thisBatchSize));
for (int entryInBatchNdx = 0; entryInBatchNdx < addBatch.size(); entryInBatchNdx++) {
assertThat(addBatch.get(entryInBatchNdx).getValues(),
is(Arrays.asList(String.format("new_%s", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx),
String.format("new_%s_sub", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx))));
}
}

assertThat(alterPartitionCaptor.getValue().size(), is(1));
assertThat(addPartitionCaptor.getValue().size(), is(1));
List<List<Partition>> alterCaptorValues = alterPartitionCaptor.getAllValues();
assertThat(alterCaptorValues.size(), is(numAlterBatches));
for (int batchNdx = 0; batchNdx < numAlterBatches; batchNdx++) {
int thisBatchSize = batchNdx < (numAlterBatches - 1) ? TEST_PARTITION_BATCH_SIZE : lastAlterBatchSize;
List<Partition> alterBatch = alterCaptorValues.get(batchNdx);
assertThat(alterBatch.size(), is(thisBatchSize));
for (int entryInBatchNdx = 0; entryInBatchNdx < alterBatch.size(); entryInBatchNdx++) {
assertThat(alterBatch.get(entryInBatchNdx).getValues(),
is(Arrays.asList(String.format("exist_%s", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx),
String.format("exist_%s_sub", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx))));
}
}

Partition altered = alterPartitionCaptor.getValue().get(0);
assertThat(altered.getValues(), is(Arrays.asList("one", "two")));
verify(mockMetaStoreClient, times(numStatisticsBatches)).setPartitionColumnStatistics(setStatsRequestCaptor.capture());
List<SetPartitionsStatsRequest> statsRequestList = setStatsRequestCaptor.getAllValues();
assertThat(statsRequestList.size(), is(numStatisticsBatches));

Partition added = addPartitionCaptor.getValue().get(0);
assertThat(added.getValues(), is(Arrays.asList("three", "four")));
List<ColumnStatistics> columnStats = new ArrayList<>();
for (int colStatNdx = 0; colStatNdx < numStatisticsBatches; colStatNdx++) {
int thisBatchSize = colStatNdx < (numStatisticsBatches - 1) ? TEST_PARTITION_BATCH_SIZE : lastStatisticsBatchSize;
assertThat(statsRequestList.get(colStatNdx).getColStats().size(), is(thisBatchSize));
columnStats.addAll(statsRequestList.get(colStatNdx).getColStats());
}

verify(mockMetaStoreClient).setPartitionColumnStatistics(setStatsRequestCaptor.capture());
SetPartitionsStatsRequest statsRequest = setStatsRequestCaptor.getValue();
assertThat(columnStats.size(), is(numTestAlterPartitions + numTestAddPartitions));

List<ColumnStatistics> columnStats = new ArrayList<>(statsRequest.getColStats());
Collections.sort(columnStats, new Comparator<ColumnStatistics>() {
@Override
public int compare(ColumnStatistics o1, ColumnStatistics o2) {
return o1.getStatsDesc().getPartName().compareTo(o2.getStatsDesc().getPartName());
}
});
assertThat(columnStats.size(), is(2));
for (int colStatNdx = 0; colStatNdx < numTestAlterPartitions; colStatNdx++) {
assertThat(columnStats.get(colStatNdx).getStatsDesc().isIsTblLevel(), is(false));
assertThat(columnStats.get(colStatNdx).getStatsDesc().getDbName(), is(DB_NAME));
assertThat(columnStats.get(colStatNdx).getStatsDesc().getTableName(), is(TABLE_NAME));
assertThat(columnStats.get(colStatNdx).getStatsDesc().getPartName(),
is(String.format("c=exist_%s/d=exist_%s_sub", colStatNdx, colStatNdx)));
assertThat(columnStats.get(colStatNdx).getStatsObj().size(), is(2));
}

assertThat(columnStats.get(0).getStatsDesc().isIsTblLevel(), is(false));
assertThat(columnStats.get(0).getStatsDesc().getDbName(), is(DB_NAME));
assertThat(columnStats.get(0).getStatsDesc().getTableName(), is(TABLE_NAME));
assertThat(columnStats.get(0).getStatsDesc().getPartName(), is("c=one/d=two"));
assertThat(columnStats.get(0).getStatsObj().size(), is(2));
assertThat(columnStats.get(1).getStatsDesc().isIsTblLevel(), is(false));
assertThat(columnStats.get(1).getStatsDesc().getDbName(), is(DB_NAME));
assertThat(columnStats.get(1).getStatsDesc().getTableName(), is(TABLE_NAME));
assertThat(columnStats.get(1).getStatsDesc().getPartName(), is("c=three/d=four"));
assertThat(columnStats.get(1).getStatsObj().size(), is(2));
for (int colStatNdx = numTestAlterPartitions, addPartColStatNdx = 0;
colStatNdx < numTestAlterPartitions + numTestAddPartitions;
colStatNdx++, addPartColStatNdx++) {
assertThat(columnStats.get(colStatNdx).getStatsDesc().isIsTblLevel(), is(false));
assertThat(columnStats.get(colStatNdx).getStatsDesc().getDbName(), is(DB_NAME));
assertThat(columnStats.get(colStatNdx).getStatsDesc().getTableName(), is(TABLE_NAME));
assertThat(columnStats.get(colStatNdx).getStatsDesc().getPartName(),
is(String.format("c=new_%s/d=new_%s_sub", addPartColStatNdx, addPartColStatNdx)));
assertThat(columnStats.get(colStatNdx).getStatsObj().size(), is(2));
}
}


/** Runs the batched alter/add scenario with no partitions to alter and no partitions to add. */
@Test
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_0_0()
  throws TException, IOException {
  final int partitionsToAlter = 0;
  final int partitionsToAdd = 0;
  alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(partitionsToAlter, partitionsToAdd);
}

/** Runs the batched alter/add scenario with no partitions to alter and a single partition to add. */
@Test
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_0_1()
  throws TException, IOException {
  final int partitionsToAlter = 0;
  final int partitionsToAdd = 1;
  alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(partitionsToAlter, partitionsToAdd);
}

/** Runs the batched alter/add scenario with a single partition to alter and no partitions to add. */
@Test
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_1_0()
  throws TException, IOException {
  final int partitionsToAlter = 1;
  final int partitionsToAdd = 0;
  alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(partitionsToAlter, partitionsToAdd);
}

/** Runs the batched alter/add scenario with a single partition to alter and a single partition to add. */
@Test
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_1_1()
  throws TException, IOException {
  final int partitionsToAlter = 1;
  final int partitionsToAdd = 1;
  alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(partitionsToAlter, partitionsToAdd);
}

/**
 * Runs the batched alter/add scenario where both the number of partitions to alter and the number of partitions to
 * add equal the test batch size.
 */
@Test
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_boundaries()
  throws TException, IOException {
  final int partitionsToAlter = TEST_PARTITION_BATCH_SIZE;
  final int partitionsToAdd = TEST_PARTITION_BATCH_SIZE;
  alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(partitionsToAlter, partitionsToAdd);
}

/** Runs the batched alter/add scenario with 17 partitions to alter and 28 partitions to add. */
@Test
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_many()
  throws TException, IOException {
  final int partitionsToAlter = 17;
  final int partitionsToAdd = 28;
  alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(partitionsToAlter, partitionsToAdd);
}

/** Runs the batched alter/add scenario with 172 partitions to alter and 333 partitions to add. */
@Test
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_lots()
  throws TException, IOException {
  final int partitionsToAlter = 172;
  final int partitionsToAdd = 333;
  alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(partitionsToAlter, partitionsToAdd);
}

@Test
Expand Down

0 comments on commit 3ce18cc

Please sign in to comment.