diff --git a/CHANGELOG.md b/CHANGELOG.md index 2084ac76..882b950f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## [15.1.1] - 2020-02-06 +### Fixed +* When replicating tables with large numbers of partitions, `Replica.updateMetadata` now calls add/alter partition in batches of 1000. See [#166](https://github.com/HotelsDotCom/circus-train/issues/166). + ## [15.1.0] - 2020-01-28 ### Changed * AVRO Schema Copier now re-uses the normal 'data' copier instead of its own. See [#162](https://github.com/HotelsDotCom/circus-train/issues/162). diff --git a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java index 2614391b..64380652 100644 --- a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java +++ b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2020 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,9 +42,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Enums; import com.google.common.base.Optional; import com.google.common.base.Supplier; +import com.google.common.collect.Lists; import com.hotels.bdp.circustrain.api.CircusTrainException; import com.hotels.bdp.circustrain.api.ReplicaLocationManager; @@ -71,6 +73,7 @@ public class Replica extends HiveEndpoint { private final ReplicaCatalogListener replicaCatalogListener; private final ReplicationMode replicationMode; private final TableReplication tableReplication; + private int partitionBatchSize = 1000; /** * Use {@link ReplicaFactory} @@ -91,6 +94,28 @@ public class Replica extends HiveEndpoint { this.tableReplication = tableReplication; } + /** + * Use {@link ReplicaFactory} + */ + @VisibleForTesting + Replica( + ReplicaCatalog replicaCatalog, + HiveConf replicaHiveConf, + Supplier replicaMetaStoreClientSupplier, + ReplicaTableFactory replicaTableFactory, + HousekeepingListener housekeepingListener, + ReplicaCatalogListener replicaCatalogListener, + TableReplication tableReplication, + int partitionBatchSize) { + super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier); + this.replicaCatalogListener = replicaCatalogListener; + tableFactory = replicaTableFactory; + this.housekeepingListener = housekeepingListener; + replicationMode = tableReplication.getReplicationMode(); + this.tableReplication = tableReplication; + this.partitionBatchSize = partitionBatchSize; + } + public void updateMetadata( String eventId, TableAndStatistics sourceTable, @@ -175,7 +200,13 @@ public void updateMetadata( if (!partitionsToCreate.isEmpty()) { LOG.info("Creating {} new partitions.", partitionsToCreate.size()); try { - client.add_partitions(partitionsToCreate); + int counter = 0; + for (List sublist : Lists.partition(partitionsToCreate, partitionBatchSize)) { + int start = counter * partitionBatchSize; + LOG.info("Creating partitions {} through {}", start, start + sublist.size() - 1); + client.add_partitions(sublist); + counter++; + } } catch (TException e) { throw new MetaStoreClientException("Unable to add partitions '" + partitionsToCreate @@ -189,7 +220,13 @@ public void updateMetadata( if (!partitionsToAlter.isEmpty()) { LOG.info("Altering {} existing partitions.", partitionsToAlter.size()); try { - client.alter_partitions(replicaDatabaseName, replicaTableName, partitionsToAlter); + int counter = 0; + for (List sublist : Lists.partition(partitionsToAlter, partitionBatchSize)) { + int start = counter * partitionBatchSize; + LOG.info("Altering partitions {} through {}", start, start + sublist.size() - 1); + client.alter_partitions(replicaDatabaseName, replicaTableName, sublist); + counter++; + } } catch (TException e) { throw new MetaStoreClientException("Unable to alter partitions '" + partitionsToAlter @@ -203,7 +240,13 @@ public void updateMetadata( if (!statisticsToSet.isEmpty()) { LOG.info("Setting column statistics for {} partitions.", statisticsToSet.size()); try { - client.setPartitionColumnStatistics(new SetPartitionsStatsRequest(statisticsToSet)); + int counter = 0; + for (List sublist : Lists.partition(statisticsToSet, partitionBatchSize)) { + int start = counter * partitionBatchSize; + LOG.info("Setting column statistics for partitions {} through {}", start, start + sublist.size() - 1); + client.setPartitionColumnStatistics(new SetPartitionsStatsRequest(sublist)); + counter++; + } } catch (TException e) { throw new MetaStoreClientException( "Unable to set column statistics of replica table '" + replicaDatabaseName + "." + replicaTableName + "'", diff --git a/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/replica/ReplicaTest.java b/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/replica/ReplicaTest.java index ad03aa8b..9acb8468 100644 --- a/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/replica/ReplicaTest.java +++ b/circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/replica/ReplicaTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2020 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -38,6 +39,7 @@ import java.util.List; import java.util.Map; +import org.apache.commons.collections.ListUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; @@ -103,6 +105,7 @@ public class ReplicaTest { private static final FieldSchema FIELD_D = new FieldSchema(COLUMN_D, "string", null); private static final List FIELDS = Arrays.asList(FIELD_A, FIELD_B); private static final List PARTITIONS = Arrays.asList(FIELD_C, FIELD_D); + private static final int TEST_PARTITION_BATCH_SIZE = 17; public @Rule TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -175,7 +178,7 @@ private void convertExistingReplicaTableToView() { private Replica newReplica(TableReplication tableReplication) { return new Replica(replicaCatalog, hiveConf, metaStoreClientSupplier, tableFactory, houseKeepingListener, - replicaCatalogListener, tableReplication); + replicaCatalogListener, tableReplication, TEST_PARTITION_BATCH_SIZE); } @Test @@ -391,29 +394,69 @@ public void getName() throws Exception { assertThat(replica.getName(), is(NAME)); } - @Test - public void alteringExistingPartitionedReplicaTableWithPartitionsSucceeds() throws TException, IOException { - Partition newPartition = newPartition("three", "four"); - ColumnStatistics newPartitionStatistics = newPartitionStatistics("three", "four"); - Partition modifiedPartition = new Partition(existingPartition); - ColumnStatistics modifiedPartitionStatistics = newPartitionStatistics("one", "two"); - when(mockMetaStoreClient - .getPartitionsByNames(DB_NAME, TABLE_NAME, Lists.newArrayList("c=one/d=two", "c=three/d=four"))) - .thenReturn(Arrays.asList(existingPartition)); + private void alterExistingPartitionedReplicaTableWithNewPartitionsInBatches( + int numTestAlterPartitions, int numTestAddPartitions) throws TException, IOException { + List existingPartitions = new ArrayList<>(); + List newPartitions = new ArrayList<>(); + List modifiedPartitionStatisticsList = new ArrayList<>(); + List newPartitionStatisticsList = new ArrayList<>(); + + for (int i = 0; i < numTestAlterPartitions; i++) { + existingPartitions.add(newPartition(String.format("exist_%s", i), String.format("exist_%s_sub", i))); + modifiedPartitionStatisticsList.add( + newPartitionStatistics(String.format("exist_%s", i), String.format("exist_%s_sub", i))); + } + + for (int i = 0; i < numTestAddPartitions; i++) { + newPartitions.add(newPartition(String.format("new_%s", i), String.format("new_%s_sub", i))); + newPartitionStatisticsList.add( + newPartitionStatistics(String.format("new_%s", i), String.format("new_%s_sub", i))); + } + + List> alterSublists = Lists.partition(existingPartitions, TEST_PARTITION_BATCH_SIZE); + int numAlterBatches = alterSublists.size(); + int lastAlterBatchSize = numAlterBatches == 0 ? 0 : alterSublists.get(alterSublists.size()-1).size(); + + List> addSublists = Lists.partition(newPartitions, TEST_PARTITION_BATCH_SIZE); + int numAddBatches = addSublists.size(); + int lastAddBatchSize = numAddBatches == 0 ? 0 : addSublists.get(addSublists.size()-1).size(); + + List> statsSublists = Lists.partition( + ListUtils.union(modifiedPartitionStatisticsList, newPartitionStatisticsList), TEST_PARTITION_BATCH_SIZE); + int numStatisticsBatches = statsSublists.size(); + int lastStatisticsBatchSize = numStatisticsBatches == 0 ? 0 : statsSublists.get(statsSublists.size()-1).size(); + + + List testPartitionNames = new ArrayList<>(); + for (Partition p : (List) ListUtils.union(existingPartitions, newPartitions)) { + testPartitionNames.add(partitionName((String[]) p.getValues().toArray())); + } + + when(mockMetaStoreClient.getPartitionsByNames(DB_NAME, TABLE_NAME, testPartitionNames)) + .thenReturn(existingPartitions); Map> partitionStatsMap = new HashMap<>(); - partitionStatsMap - .put(Warehouse.makePartName(PARTITIONS, newPartition.getValues()), newPartitionStatistics.getStatsObj()); - partitionStatsMap - .put(Warehouse.makePartName(PARTITIONS, modifiedPartition.getValues()), - modifiedPartitionStatistics.getStatsObj()); + for (int i = 0; i < numTestAddPartitions; i++) { + partitionStatsMap + .put(Warehouse.makePartName(PARTITIONS, newPartitions.get(i).getValues()), + newPartitionStatisticsList.get(i).getStatsObj()); + } + for (int i = 0; i < numTestAlterPartitions; i++) { + partitionStatsMap + .put(Warehouse.makePartName(PARTITIONS, existingPartitions.get(i).getValues()), + modifiedPartitionStatisticsList.get(i).getStatsObj()); + } PartitionsAndStatistics partitionsAndStatistics = new PartitionsAndStatistics(sourceTable.getPartitionKeys(), - Arrays.asList(modifiedPartition, newPartition), partitionStatsMap); - when(mockReplicaLocationManager.getPartitionLocation(existingPartition)) - .thenReturn(new Path(tableLocation, "c=one/d=two")); - when(mockReplicaLocationManager.getPartitionLocation(newPartition)) - .thenReturn(new Path(tableLocation, "c=three/d=four")); + ListUtils.union(existingPartitions, newPartitions), partitionStatsMap); + for (int i = 0; i < numTestAddPartitions; i++) { + when(mockReplicaLocationManager.getPartitionLocation(newPartitions.get(i))) + .thenReturn(new Path(tableLocation, String.format("c=new_%s/d=new_%s_sub", i, i))); + } + for (int i = 0; i < numTestAlterPartitions; i++) { + when(mockReplicaLocationManager.getPartitionLocation(existingPartitions.get(i))) + .thenReturn(new Path(tableLocation, String.format("c=exist_%s/d=exist_%s_sub", i, i))); + } existingReplicaTable.getParameters().put(REPLICATION_EVENT.parameterName(), "previousEventId"); @@ -423,41 +466,106 @@ public void alteringExistingPartitionedReplicaTableWithPartitionsSucceeds() thro verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class)); verify(mockMetaStoreClient).updateTableColumnStatistics(columnStatistics); - verify(mockReplicaLocationManager).addCleanUpLocation(anyString(), any(Path.class)); - verify(mockMetaStoreClient).alter_partitions(eq(DB_NAME), eq(TABLE_NAME), alterPartitionCaptor.capture()); - verify(mockMetaStoreClient).add_partitions(addPartitionCaptor.capture()); + verify(mockReplicaLocationManager, times(numTestAlterPartitions)).addCleanUpLocation(anyString(), any(Path.class)); + verify(mockMetaStoreClient, times(numAlterBatches)).alter_partitions(eq(DB_NAME), eq(TABLE_NAME), alterPartitionCaptor.capture()); + verify(mockMetaStoreClient, times(numAddBatches)).add_partitions(addPartitionCaptor.capture()); + + // Validate that the args were expected number of batches , and expected batch sizes + List> addCaptorValues = addPartitionCaptor.getAllValues(); + assertThat(addCaptorValues.size(), is(numAddBatches)); + + for (int batchNdx = 0; batchNdx < numAddBatches; batchNdx++) { + int thisBatchSize = batchNdx < (numAddBatches - 1) ? TEST_PARTITION_BATCH_SIZE : lastAddBatchSize; + List addBatch = addCaptorValues.get(batchNdx); + assertThat(addBatch.size(), is(thisBatchSize)); + for (int entryInBatchNdx = 0; entryInBatchNdx < addBatch.size(); entryInBatchNdx++) { + assertThat(addBatch.get(entryInBatchNdx).getValues(), + is(Arrays.asList(String.format("new_%s", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx), + String.format("new_%s_sub", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx)))); + } + } - assertThat(alterPartitionCaptor.getValue().size(), is(1)); - assertThat(addPartitionCaptor.getValue().size(), is(1)); + List> alterCaptorValues = alterPartitionCaptor.getAllValues(); + assertThat(alterCaptorValues.size(), is(numAlterBatches)); + for (int batchNdx = 0; batchNdx < numAlterBatches; batchNdx++) { + int thisBatchSize = batchNdx < (numAlterBatches - 1) ? TEST_PARTITION_BATCH_SIZE : lastAlterBatchSize; + List alterBatch = alterCaptorValues.get(batchNdx); + assertThat(alterBatch.size(), is(thisBatchSize)); + for (int entryInBatchNdx = 0; entryInBatchNdx < alterBatch.size(); entryInBatchNdx++) { + assertThat(alterBatch.get(entryInBatchNdx).getValues(), + is(Arrays.asList(String.format("exist_%s", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx), + String.format("exist_%s_sub", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx)))); + } + } - Partition altered = alterPartitionCaptor.getValue().get(0); - assertThat(altered.getValues(), is(Arrays.asList("one", "two"))); + verify(mockMetaStoreClient, times(numStatisticsBatches)).setPartitionColumnStatistics(setStatsRequestCaptor.capture()); + List statsRequestList = setStatsRequestCaptor.getAllValues(); + assertThat(statsRequestList.size(), is(numStatisticsBatches)); - Partition added = addPartitionCaptor.getValue().get(0); - assertThat(added.getValues(), is(Arrays.asList("three", "four"))); + List columnStats = new ArrayList<>(); + for (int colStatNdx = 0; colStatNdx < numStatisticsBatches; colStatNdx++) { + int thisBatchSize = colStatNdx < (numStatisticsBatches - 1) ? TEST_PARTITION_BATCH_SIZE : lastStatisticsBatchSize; + assertThat(statsRequestList.get(colStatNdx).getColStats().size(), is(thisBatchSize)); + columnStats.addAll(statsRequestList.get(colStatNdx).getColStats()); + } - verify(mockMetaStoreClient).setPartitionColumnStatistics(setStatsRequestCaptor.capture()); - SetPartitionsStatsRequest statsRequest = setStatsRequestCaptor.getValue(); + assertThat(columnStats.size(), is(numTestAlterPartitions + numTestAddPartitions)); - List columnStats = new ArrayList<>(statsRequest.getColStats()); - Collections.sort(columnStats, new Comparator() { - @Override - public int compare(ColumnStatistics o1, ColumnStatistics o2) { - return o1.getStatsDesc().getPartName().compareTo(o2.getStatsDesc().getPartName()); - } - }); - assertThat(columnStats.size(), is(2)); + for (int colStatNdx = 0; colStatNdx < numTestAlterPartitions; colStatNdx++) { + assertThat(columnStats.get(colStatNdx).getStatsDesc().isIsTblLevel(), is(false)); + assertThat(columnStats.get(colStatNdx).getStatsDesc().getDbName(), is(DB_NAME)); + assertThat(columnStats.get(colStatNdx).getStatsDesc().getTableName(), is(TABLE_NAME)); + assertThat(columnStats.get(colStatNdx).getStatsDesc().getPartName(), + is(String.format("c=exist_%s/d=exist_%s_sub", colStatNdx, colStatNdx))); + assertThat(columnStats.get(colStatNdx).getStatsObj().size(), is(2)); + } - assertThat(columnStats.get(0).getStatsDesc().isIsTblLevel(), is(false)); - assertThat(columnStats.get(0).getStatsDesc().getDbName(), is(DB_NAME)); - assertThat(columnStats.get(0).getStatsDesc().getTableName(), is(TABLE_NAME)); - assertThat(columnStats.get(0).getStatsDesc().getPartName(), is("c=one/d=two")); - assertThat(columnStats.get(0).getStatsObj().size(), is(2)); - assertThat(columnStats.get(1).getStatsDesc().isIsTblLevel(), is(false)); - assertThat(columnStats.get(1).getStatsDesc().getDbName(), is(DB_NAME)); - assertThat(columnStats.get(1).getStatsDesc().getTableName(), is(TABLE_NAME)); - assertThat(columnStats.get(1).getStatsDesc().getPartName(), is("c=three/d=four")); - assertThat(columnStats.get(1).getStatsObj().size(), is(2)); + for (int colStatNdx = numTestAlterPartitions, addPartColStatNdx = 0; + colStatNdx < numTestAlterPartitions + numTestAddPartitions; + colStatNdx++, addPartColStatNdx++) { + assertThat(columnStats.get(colStatNdx).getStatsDesc().isIsTblLevel(), is(false)); + assertThat(columnStats.get(colStatNdx).getStatsDesc().getDbName(), is(DB_NAME)); + assertThat(columnStats.get(colStatNdx).getStatsDesc().getTableName(), is(TABLE_NAME)); + assertThat(columnStats.get(colStatNdx).getStatsDesc().getPartName(), + is(String.format("c=new_%s/d=new_%s_sub", addPartColStatNdx, addPartColStatNdx))); + assertThat(columnStats.get(colStatNdx).getStatsObj().size(), is(2)); + } + } + + + @Test + public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_0_0() throws TException, IOException { + alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(0,0); + } + + @Test + public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_0_1() throws TException, IOException { + alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(0,1); + } + + @Test + public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_1_0() throws TException, IOException { + alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(1,0); + } + + @Test + public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_1_1() throws TException, IOException { + alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(1,1); + } + + @Test + public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_boundaries() throws TException, IOException { + alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(TEST_PARTITION_BATCH_SIZE,TEST_PARTITION_BATCH_SIZE); + } + + @Test + public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_many() throws TException, IOException { + alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(17,28); + } + + @Test + public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_lots() throws TException, IOException { + alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(172,333); } @Test