From feef340411d543738be632faf98ff82762f59cf3 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Thu, 6 Feb 2025 20:01:46 +0800 Subject: [PATCH] [server] Support addPartition and dropPartition for static partition table --- .../com/alibaba/fluss/client/admin/Admin.java | 49 ++ .../fluss/client/admin/FlussAdmin.java | 19 + .../client/table/getter/PartitionGetter.java | 23 +- .../client/utils/ClientRpcMessageUtils.java | 41 ++ .../fluss/client/admin/FlussAdminITCase.java | 193 ++++++++ ...e.java => AutoPartitionedTableITCase.java} | 59 ++- .../table/StaticPartitionedTableITCase.java | 232 +++++++++ .../PartitionAlreadyExistsException.java | 35 ++ .../PartitionSpecInvalidException.java | 38 ++ .../alibaba/fluss/metadata/PartitionSpec.java | 69 +++ .../fluss/metadata/PhysicalTablePath.java | 2 +- .../com/alibaba/fluss/metadata/TablePath.java | 14 +- .../fluss/utils/AutoPartitionUtils.java | 54 -- .../alibaba/fluss/utils/PartitionUtils.java | 148 ++++++ ...UtilsTest.java => PartitionUtilsTest.java} | 54 +- .../connector/flink/catalog/FlinkCatalog.java | 86 +++- .../flink/utils/CatalogExceptionUtils.java | 20 + .../flink/catalog/FlinkCatalogITCase.java | 122 ++++- .../fluss/rpc/gateway/AdminGateway.java | 20 + .../alibaba/fluss/rpc/protocol/ApiKeys.java | 4 +- .../alibaba/fluss/rpc/protocol/Errors.java | 8 +- fluss-rpc/src/main/proto/FlussApi.proto | 23 + .../coordinator/AutoPartitionManager.java | 292 ----------- .../CoordinatorEventProcessor.java | 32 +- .../server/coordinator/CoordinatorServer.java | 15 +- .../coordinator/CoordinatorService.java | 65 ++- .../server/coordinator/PartitionManager.java | 461 ++++++++++++++++++ .../coordinator/event/DropTableEvent.java | 17 +- .../event/watcher/TableChangeWatcher.java | 1 + .../fluss/server/utils/RpcMessageUtils.java | 10 + .../utils/TableDescriptorValidation.java | 86 ++-- .../coordinator/AutoPartitionManagerTest.java | 16 +- .../CoordinatorEventProcessorTest.java | 28 +- 
.../coordinator/TableManagerITCase.java | 30 +- .../coordinator/TestCoordinatorGateway.java | 14 + .../event/watcher/TableChangeWatcherTest.java | 4 +- .../TableBucketStateMachineTest.java | 14 +- .../testutils/FlussClusterExtension.java | 4 + 38 files changed, 1888 insertions(+), 514 deletions(-) rename fluss-client/src/test/java/com/alibaba/fluss/client/table/{FlussPartitionedTableITCase.java => AutoPartitionedTableITCase.java} (86%) create mode 100644 fluss-client/src/test/java/com/alibaba/fluss/client/table/StaticPartitionedTableITCase.java create mode 100644 fluss-common/src/main/java/com/alibaba/fluss/exception/PartitionAlreadyExistsException.java create mode 100644 fluss-common/src/main/java/com/alibaba/fluss/exception/PartitionSpecInvalidException.java create mode 100644 fluss-common/src/main/java/com/alibaba/fluss/metadata/PartitionSpec.java delete mode 100644 fluss-common/src/main/java/com/alibaba/fluss/utils/AutoPartitionUtils.java create mode 100644 fluss-common/src/main/java/com/alibaba/fluss/utils/PartitionUtils.java rename fluss-common/src/test/java/com/alibaba/fluss/utils/{AutoPartitionUtilsTest.java => PartitionUtilsTest.java} (58%) delete mode 100644 fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java create mode 100644 fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/PartitionManager.java diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java index f1d4b0396..f26774536 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java @@ -29,7 +29,9 @@ import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.exception.KvSnapshotNotExistException; import com.alibaba.fluss.exception.NonPrimaryKeyTableException; +import com.alibaba.fluss.exception.PartitionAlreadyExistsException; import 
com.alibaba.fluss.exception.PartitionNotExistException; +import com.alibaba.fluss.exception.PartitionSpecInvalidException; import com.alibaba.fluss.exception.SchemaNotExistException; import com.alibaba.fluss.exception.TableAlreadyExistException; import com.alibaba.fluss.exception.TableNotExistException; @@ -38,6 +40,7 @@ import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.DatabaseInfo; import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.PartitionSpec; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.SchemaInfo; import com.alibaba.fluss.metadata.TableBucket; @@ -328,4 +331,50 @@ ListOffsetsResult listOffsets( /** Describe the lake used for lakehouse storage. */ CompletableFuture describeLakeStorage(); + + /** + * Add a partition to a partitioned table. + * + *

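<p>The following exceptions can be anticipated when calling {@code get()} on the returned future. + * + * <ul>
+ * <li>{@link PartitionAlreadyExistsException} if the partition already exists and {@code ignoreIfExists} is false. + * <li>{@link PartitionSpecInvalidException} if the partition spec is invalid. + * </ul>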
+ * + * @param tablePath The table path of the table. + * @param partitionSpec The partition spec to add. + * @param ignoreIfExists Flag to specify behavior when a partition with the given name already + * exists: if set to false, throw a PartitionAlreadyExistsException, if set to true, do + * nothing. + */ + CompletableFuture addPartition( + TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists); + + /** + * Drop a partition from a partitioned table. + * + *

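<p>The following exceptions can be anticipated when calling {@code get()} on the returned future. + * + * <ul>
+ * <li>{@link PartitionNotExistException} if the partition does not exist and {@code ignoreIfNotExists} is false. + * </ul>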
+ * + * @param tablePath The table path of the table. + * @param partitionSpec The partition spec to drop. + * @param ignoreIfNotExists Flag to specify behavior when a partition with the given name does + * not exist: if set to false, throw a PartitionNotExistException, if set to true, do + * nothing. + */ + CompletableFuture dropPartition( + TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java index ffc1bbbc9..2ee5506e6 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java @@ -27,6 +27,7 @@ import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.DatabaseInfo; import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.PartitionSpec; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.SchemaInfo; @@ -73,6 +74,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeAddPartitionRequest; +import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest; import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest; import static com.alibaba.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster; import static com.alibaba.fluss.utils.Preconditions.checkNotNull; @@ -347,6 +350,22 @@ public CompletableFuture describeLakeStorage() { .thenApply(ClientRpcMessageUtils::toLakeStorageInfo); } + @Override + public CompletableFuture addPartition( + TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists) { + return gateway.addPartition( + 
makeAddPartitionRequest(tablePath, partitionSpec, ignoreIfExists)) + .thenApply(r -> null); + } + + @Override + public CompletableFuture dropPartition( + TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) { + return gateway.dropPartition( + makeDropPartitionRequest(tablePath, partitionSpec, ignoreIfNotExists)) + .thenApply(r -> null); + } + @Override public void close() { // nothing to do yet diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/getter/PartitionGetter.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/getter/PartitionGetter.java index 892e92dc1..686d32a78 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/getter/PartitionGetter.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/getter/PartitionGetter.java @@ -18,15 +18,19 @@ import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.types.DataType; -import com.alibaba.fluss.types.DataTypeRoot; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.Preconditions; +import java.util.Collections; import java.util.List; +import static com.alibaba.fluss.utils.PartitionUtils.getPartitionName; + /** A getter to get partition name from a row. */ public class PartitionGetter { + // TODO currently, only support one partition key. 
+ private final String partitionKeyName; private final InternalRow.FieldGetter partitionFieldGetter; public PartitionGetter(RowType rowType, List partitionKeys) { @@ -36,23 +40,19 @@ public PartitionGetter(RowType rowType, List partitionKeys) { "Currently, partitioned table only supports one partition key, but got partition keys %s.", partitionKeys)); } + // check the partition column List fieldNames = rowType.getFieldNames(); - String partitionColumnName = partitionKeys.get(0); - int partitionColumnIndex = fieldNames.indexOf(partitionColumnName); + this.partitionKeyName = partitionKeys.get(0); + int partitionColumnIndex = fieldNames.indexOf(partitionKeyName); Preconditions.checkArgument( partitionColumnIndex >= 0, "The partition column %s is not in the row %s.", - partitionColumnName, + partitionKeyName, rowType); // check the data type of the partition column DataType partitionColumnDataType = rowType.getTypeAt(partitionColumnIndex); - Preconditions.checkArgument( - partitionColumnDataType.getTypeRoot() == DataTypeRoot.STRING, - "Currently, partitioned table only supports STRING type partition key, but got partition key '%s' with data type %s.", - partitionColumnName, - partitionColumnDataType); this.partitionFieldGetter = InternalRow.createFieldGetter(partitionColumnDataType, partitionColumnIndex); } @@ -60,6 +60,9 @@ public PartitionGetter(RowType rowType, List partitionKeys) { public String getPartition(InternalRow row) { Object partitionValue = partitionFieldGetter.getFieldOrNull(row); Preconditions.checkNotNull(partitionValue, "Partition value shouldn't be null."); - return partitionValue.toString(); + return getPartitionName( + Collections.singletonList(partitionKeyName), + Collections.singletonList(partitionValue.toString()), + false); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java index e51973dcb..c73e38456 100644 
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java @@ -29,6 +29,7 @@ import com.alibaba.fluss.fs.token.ObtainedSecurityToken; import com.alibaba.fluss.lakehouse.LakeStorageInfo; import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.PartitionSpec; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TablePath; @@ -37,7 +38,9 @@ import com.alibaba.fluss.remote.RemoteLogFetchInfo; import com.alibaba.fluss.remote.RemoteLogSegment; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; +import com.alibaba.fluss.rpc.messages.AddPartitionRequest; import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse; +import com.alibaba.fluss.rpc.messages.DropPartitionRequest; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse; import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataResponse; import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsResponse; @@ -377,6 +380,44 @@ public static ListOffsetsRequest makeListOffsetsRequest( return listOffsetsRequest; } + public static AddPartitionRequest makeAddPartitionRequest( + TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) { + AddPartitionRequest addPartitionRequest = + new AddPartitionRequest().setIgnoreIfNotExists(ignoreIfNotExists); + addPartitionRequest + .setTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + partitionSpec + .getPartitionSpec() + .forEach( + (partitionKey, value) -> + addPartitionRequest + .addPartitionSpec() + .setPartitionKey(partitionKey) + .setValue(value)); + return addPartitionRequest; + } + + public static DropPartitionRequest makeDropPartitionRequest( + TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) { + 
DropPartitionRequest dropPartitionRequest = + new DropPartitionRequest().setIgnoreIfNotExists(ignoreIfNotExists); + dropPartitionRequest + .setTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + partitionSpec + .getPartitionSpec() + .forEach( + (partitionKey, value) -> + dropPartitionRequest + .addPartitionSpec() + .setPartitionKey(partitionKey) + .setValue(value)); + return dropPartitionRequest; + } + public static List toPartitionInfos(ListPartitionInfosResponse response) { return response.getPartitionsInfosList().stream() .map( diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java index 4692e43a5..42ddb0231 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java @@ -30,6 +30,9 @@ import com.alibaba.fluss.exception.InvalidDatabaseException; import com.alibaba.fluss.exception.InvalidReplicationFactorException; import com.alibaba.fluss.exception.InvalidTableException; +import com.alibaba.fluss.exception.PartitionAlreadyExistsException; +import com.alibaba.fluss.exception.PartitionNotExistException; +import com.alibaba.fluss.exception.PartitionSpecInvalidException; import com.alibaba.fluss.exception.SchemaNotExistException; import com.alibaba.fluss.exception.TableNotExistException; import com.alibaba.fluss.exception.TableNotPartitionedException; @@ -39,6 +42,7 @@ import com.alibaba.fluss.metadata.KvFormat; import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.PartitionSpec; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.SchemaInfo; import com.alibaba.fluss.metadata.TableBucket; @@ -55,7 +59,10 @@ import org.junit.jupiter.api.Test; import java.time.Duration; +import 
java.time.LocalDate; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -563,6 +570,177 @@ void testGetServerNodes() throws Exception { assertThat(serverNodes).containsExactlyInAnyOrderElementsOf(expectedNodes); } + @Test + void testCreateIllegalPartitionTable() { + // 1. key contains separator '$'. + String dbName = DEFAULT_TABLE_PATH.getDatabaseName(); + TableDescriptor partitionedTable = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.STRING()) + .column("name", DataTypes.STRING()) + .column("age$", DataTypes.INT()) + .build()) + .distributedBy(3, "id") + .partitionedBy("age$") + .build(); + TablePath tablePath = TablePath.of(dbName, "test_create_illegal_partitioned_table_1"); + assertThatThrownBy(() -> admin.createTable(tablePath, partitionedTable, true).get()) + .cause() + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining("Partition key should not contains separator: '$'"); + } + + @Test + void testAddAndDropPartitionsForStaticPartition() throws Exception { + String dbName = DEFAULT_TABLE_PATH.getDatabaseName(); + TableDescriptor partitionedTable = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.STRING()) + .column("name", DataTypes.STRING()) + .column("age", DataTypes.INT()) + .build()) + .distributedBy(3, "id") + .partitionedBy("age") + .build(); + TablePath tablePath = TablePath.of(dbName, "test_add_and_drop_partitioned_table"); + admin.createTable(tablePath, partitionedTable, true).get(); + assertPartitionInfo(admin.listPartitionInfos(tablePath).get(), Collections.emptyList()); + + // add two partitions. 
+ admin.addPartition( + tablePath, new PartitionSpec(Collections.singletonMap("age", "10")), false) + .get(); + admin.addPartition( + tablePath, new PartitionSpec(Collections.singletonMap("age", "11")), false) + .get(); + assertPartitionInfo(admin.listPartitionInfos(tablePath).get(), Arrays.asList("10", "11")); + + // drop one partition. + admin.dropPartition( + tablePath, new PartitionSpec(Collections.singletonMap("age", "10")), false) + .get(); + assertPartitionInfo(admin.listPartitionInfos(tablePath).get(), Arrays.asList("11")); + + // test add partition already exists with ignoreIfExists = false. + assertThatThrownBy( + () -> + admin.addPartition( + tablePath, + new PartitionSpec( + Collections.singletonMap("age", "11")), + false) + .get()) + .cause() + .isInstanceOf(PartitionAlreadyExistsException.class) + .hasMessageContaining( + "Partition '11' already exists for table test_db.test_add_and_drop_partitioned_table"); + + // test drop partition not-exists with ignoreIfNotExists = false. + assertThatThrownBy( + () -> + admin.dropPartition( + tablePath, + new PartitionSpec( + Collections.singletonMap("age", "13")), + false) + .get()) + .cause() + .isInstanceOf(PartitionNotExistException.class) + .hasMessageContaining( + "Partition '13' does not exist for table test_db.test_add_and_drop_partitioned_table"); + + // test add partition with illegal value.
+ assertThatThrownBy( + () -> + admin.addPartition( + tablePath, + new PartitionSpec( + Collections.singletonMap("age", "$10")), + false) + .get()) + .cause() + .isInstanceOf(PartitionSpecInvalidException.class) + .hasMessageContaining( + "The value of partition key should not contains separator: '$'"); + } + + @Test + void testAddAndDropPartitionsForAutoPartition() throws Exception { + String dbName = DEFAULT_TABLE_PATH.getDatabaseName(); + TableDescriptor partitionedTable = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.STRING()) + .column("name", DataTypes.STRING()) + .column("pt", DataTypes.STRING()) + .build()) + .distributedBy(3, "id") + .partitionedBy("pt") + .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, + AutoPartitionTimeUnit.YEAR) + .build(); + TablePath tablePath = TablePath.of(dbName, "test_add_and_drop_partitioned_table_1"); + admin.createTable(tablePath, partitionedTable, true).get(); + // wait all auto partitions created. + FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath); + + // there are four auto created partitions. + int currentYear = LocalDate.now().getYear(); + assertPartitionInfo( + admin.listPartitionInfos(tablePath).get(), + Arrays.asList( + String.valueOf(currentYear), + String.valueOf(currentYear + 1), + String.valueOf(currentYear + 2), + String.valueOf(currentYear + 3))); + + // add two older partitions (currentYear - 2, currentYear - 1). 
+ admin.addPartition( + tablePath, + new PartitionSpec( + Collections.singletonMap("pt", String.valueOf(currentYear - 2))), + false) + .get(); + admin.addPartition( + tablePath, + new PartitionSpec( + Collections.singletonMap("pt", String.valueOf(currentYear - 1))), + false) + .get(); + assertPartitionInfo( + admin.listPartitionInfos(tablePath).get(), + Arrays.asList( + String.valueOf(currentYear - 2), + String.valueOf(currentYear - 1), + String.valueOf(currentYear), + String.valueOf(currentYear + 1), + String.valueOf(currentYear + 2), + String.valueOf(currentYear + 3))); + + // drop one auto created partition. + admin.dropPartition( + tablePath, + new PartitionSpec( + Collections.singletonMap("pt", String.valueOf(currentYear + 1))), + false) + .get(); + assertPartitionInfo( + admin.listPartitionInfos(tablePath).get(), + Arrays.asList( + String.valueOf(currentYear - 2), + String.valueOf(currentYear - 1), + String.valueOf(currentYear), + String.valueOf(currentYear + 2), + String.valueOf(currentYear + 3))); + } + private void assertHasTabletServerNumber(int tabletServerNumber) { CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); retry( @@ -618,6 +796,21 @@ private void assertBucketSnapshot( assertThat(snapshotMetadata.getLogOffset()).isEqualTo(expectedSnapshot.getLogOffset()); } + private void assertPartitionInfo( + List partitionInfos, List expectedPartitionNames) { + if (expectedPartitionNames.isEmpty()) { + assertThat(partitionInfos.isEmpty()).isTrue(); + return; + } + + assertThat(partitionInfos.size()).isEqualTo(expectedPartitionNames.size()); + assertThat( + partitionInfos.stream() + .map(PartitionInfo::getPartitionName) + .collect(Collectors.toList())) + .containsExactlyInAnyOrderElementsOf(expectedPartitionNames); + } + private List toFsPathAndFileNames(KvSnapshotHandle kvSnapshotHandle) { return Stream.concat( kvSnapshotHandle.getSharedKvFileHandles().stream(), diff --git 
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/AutoPartitionedTableITCase.java similarity index 86% rename from fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java rename to fluss-client/src/test/java/com/alibaba/fluss/client/table/AutoPartitionedTableITCase.java index 74211d16e..031d0fc35 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/AutoPartitionedTableITCase.java @@ -27,6 +27,8 @@ import com.alibaba.fluss.config.AutoPartitionTimeUnit; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.exception.PartitionNotExistException; +import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.PartitionSpec; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; @@ -41,12 +43,15 @@ import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; @@ -57,8 +62,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** IT case for Fluss partitioned table. */ -class FlussPartitionedTableITCase extends ClientToServerITCaseBase { +/** IT case for Fluss auto partitioned table. 
*/ +class AutoPartitionedTableITCase extends ClientToServerITCaseBase { @Test void testPartitionedPrimaryKeyTable() throws Exception { @@ -343,6 +348,56 @@ void testOperateNotExistPartitionShouldThrowException() throws Exception { // exception and won't retry again and again } + @Test + void testAddPartitionForAutoPartitionedTable() throws Exception { + TablePath tablePath = + TablePath.of("test_db_1", "test_auto_partition_table_add_partition_1"); + Schema schema = createPartitionedTable(tablePath, false); + int currentYear = LocalDateTime.now().getYear(); + + // add one partition. + admin.addPartition( + tablePath, + new PartitionSpec( + Collections.singletonMap("c", String.valueOf(currentYear + 10))), + false) + .get(); + Map partitionIdByNames = + FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath, 5); + + List partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.size()).isEqualTo(5); + assertThat( + partitionInfos.stream() + .map(PartitionInfo::getPartitionName) + .collect(Collectors.toList())) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + String.valueOf(currentYear), + String.valueOf(currentYear + 1), + String.valueOf(currentYear + 2), + String.valueOf(currentYear + 3), + String.valueOf(currentYear + 10))); + + Table table = conn.getTable(tablePath); + AppendWriter appendWriter = table.newAppend().createWriter(); + int recordsPerPartition = 5; + Map> expectPartitionAppendRows = new HashMap<>(); + for (String partition : partitionIdByNames.keySet()) { + for (int i = 0; i < recordsPerPartition; i++) { + InternalRow row = row(schema.getRowType(), new Object[] {i, "a" + i, partition}); + appendWriter.append(row); + expectPartitionAppendRows + .computeIfAbsent(partitionIdByNames.get(partition), k -> new ArrayList<>()) + .add(row); + } + } + appendWriter.flush(); + + // then, let's verify the logs + verifyPartitionLogs(table, schema.getRowType(), expectPartitionAppendRows); + } + private Schema 
createPartitionedTable(TablePath tablePath, boolean isPrimaryTable) throws Exception { Schema.Builder schemaBuilder = diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/StaticPartitionedTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/StaticPartitionedTableITCase.java new file mode 100644 index 000000000..c088db198 --- /dev/null +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/StaticPartitionedTableITCase.java @@ -0,0 +1,232 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.client.table; + +import com.alibaba.fluss.client.admin.ClientToServerITCaseBase; +import com.alibaba.fluss.client.lookup.Lookuper; +import com.alibaba.fluss.client.table.writer.AppendWriter; +import com.alibaba.fluss.client.table.writer.UpsertWriter; +import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.PartitionSpec; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.utils.TypeUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; +import static com.alibaba.fluss.testutils.DataTestUtils.keyRow; +import static com.alibaba.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT case for Fluss static partitioned table. */ +class StaticPartitionedTableITCase extends ClientToServerITCaseBase { + + @Test + void testPartitionedPrimaryKeyTable() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_static_partitioned_pk_table_1"); + Schema schema = createPartitionedTable(tablePath, true); + + List partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.isEmpty()).isTrue(); + + // add three partitions. 
+ for (int i = 0; i < 3; i++) { + admin.addPartition( + tablePath, + new PartitionSpec(Collections.singletonMap("c", "c" + i)), + false) + .get(); + } + partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.size()).isEqualTo(3); + + Table table = conn.getTable(tablePath); + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + int recordsPerPartition = 5; + // now, put some data to the partitions + Map> expectPutRows = new HashMap<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + String partitionName = partitionInfo.getPartitionName(); + long partitionId = partitionInfo.getPartitionId(); + for (int j = 0; j < recordsPerPartition; j++) { + InternalRow row = + compactedRow(schema.getRowType(), new Object[] {j, "a" + j, partitionName}); + upsertWriter.upsert(row); + expectPutRows.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(row); + } + } + + upsertWriter.flush(); + + Lookuper lookuper = table.newLookup().createLookuper(); + // now, let's lookup the written data by look up. + for (PartitionInfo partitionInfo : partitionInfos) { + String partitionName = partitionInfo.getPartitionName(); + for (int j = 0; j < recordsPerPartition; j++) { + InternalRow actualRow = + compactedRow(schema.getRowType(), new Object[] {j, "a" + j, partitionName}); + InternalRow lookupRow = + lookuper.lookup(keyRow(schema, new Object[] {j, null, partitionName})) + .get() + .getSingletonRow(); + assertThat(lookupRow).isEqualTo(actualRow); + } + } + + // then, let's scan and check the cdc log + verifyPartitionLogs(table, schema.getRowType(), expectPutRows); + } + + @Test + void testPartitionedLogTable() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_static_partitioned_log_table_1"); + Schema schema = createPartitionedTable(tablePath, false); + + List partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.isEmpty()).isTrue(); + + // add three partitions. 
+ for (int i = 0; i < 3; i++) { + admin.addPartition( + tablePath, + new PartitionSpec(Collections.singletonMap("c", "c" + i)), + false) + .get(); + } + partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.size()).isEqualTo(3); + + Table table = conn.getTable(tablePath); + AppendWriter appendWriter = table.newAppend().createWriter(); + int recordsPerPartition = 5; + Map> expectPartitionAppendRows = new HashMap<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + String partitionName = partitionInfo.getPartitionName(); + long partitionId = partitionInfo.getPartitionId(); + for (int j = 0; j < recordsPerPartition; j++) { + InternalRow row = + row(schema.getRowType(), new Object[] {j, "a" + j, partitionName}); + appendWriter.append(row); + expectPartitionAppendRows + .computeIfAbsent(partitionId, k -> new ArrayList<>()) + .add(row); + } + } + appendWriter.flush(); + + // then, let's verify the logs + verifyPartitionLogs(table, schema.getRowType(), expectPartitionAppendRows); + } + + @ParameterizedTest + @MethodSource("keyTypeAndSpec") + void testPartitionKeyType(DataType dataType, Object keySpec) throws Exception { + TablePath tablePath = + TablePath.of( + "test_db_1", + "test_static_partitioned_log_table_" + dataType.getTypeRoot().name()); + Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", dataType); + Schema schema = schemaBuilder.build(); + TableDescriptor partitionTableDescriptor = + TableDescriptor.builder().schema(schema).partitionedBy("c").build(); + createTable(tablePath, partitionTableDescriptor, false); + + List partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.isEmpty()).isTrue(); + + // add one partitions. 
+ admin.addPartition( + tablePath, + new PartitionSpec(Collections.singletonMap("c", keySpec.toString())), + false) + .get(); + partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.size()).isEqualTo(1); + + Table table = conn.getTable(tablePath); + AppendWriter appendWriter = table.newAppend().createWriter(); + int recordSize = 5; + List expectResult = new ArrayList<>(); + for (int i = 0; i < recordSize; i++) { + InternalRow row = row(schema.getRowType(), new Object[] {i, "a" + i, keySpec}); + appendWriter.append(row); + expectResult.add(row); + } + appendWriter.flush(); + + // then, let's verify the logs + verifyPartitionLogs( + table, + schema.getRowType(), + Collections.singletonMap(partitionInfos.get(0).getPartitionId(), expectResult)); + } + + private Schema createPartitionedTable(TablePath tablePath, boolean isPrimaryTable) + throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is first column") + .column("b", DataTypes.STRING()) + .withComment("b is second column") + .column("c", DataTypes.STRING()) + .withComment("c is third column"); + + if (isPrimaryTable) { + schemaBuilder.primaryKey("a", "c"); + } + + Schema schema = schemaBuilder.build(); + + TableDescriptor partitionTableDescriptor = + TableDescriptor.builder().schema(schema).partitionedBy("c").build(); + createTable(tablePath, partitionTableDescriptor, false); + return schema; + } + + private static Collection keyTypeAndSpec() { + return Arrays.asList( + Arguments.arguments(DataTypes.INT(), 1), + Arguments.arguments(DataTypes.STRING(), "c0"), + Arguments.arguments(DataTypes.BOOLEAN(), true), + Arguments.arguments( + DataTypes.DATE(), TypeUtils.castFromString("2023-10-25", DataTypes.DATE())), + Arguments.arguments( + DataTypes.TIME(), + TypeUtils.castFromString("09:00:00.0", DataTypes.TIME()))); + } +} diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/exception/PartitionAlreadyExistsException.java b/fluss-common/src/main/java/com/alibaba/fluss/exception/PartitionAlreadyExistsException.java new file mode 100644 index 000000000..3b1864b73 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/exception/PartitionAlreadyExistsException.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.exception; + +import com.alibaba.fluss.annotation.PublicEvolving; + +/** + * Exception for trying to add a partition that already exists. + * + * @since 0.6 + */ +@PublicEvolving +public class PartitionAlreadyExistsException extends ApiException { + public PartitionAlreadyExistsException(String message) { + this(message, null); + } + + public PartitionAlreadyExistsException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/exception/PartitionSpecInvalidException.java b/fluss-common/src/main/java/com/alibaba/fluss/exception/PartitionSpecInvalidException.java new file mode 100644 index 000000000..c437d4062 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/exception/PartitionSpecInvalidException.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.exception; + +import com.alibaba.fluss.annotation.PublicEvolving; + +/** + * Exception for invalid PartitionSpec compared with partition key list of a partitioned Table. For + * example, it is thrown when the size of PartitionFieldSpec in request AddPartition exceeds the + * size of partition key list. + * + * @since 0.6 + */ +@PublicEvolving +public class PartitionSpecInvalidException extends ApiException { + + public PartitionSpecInvalidException(String message) { + this(message, null); + } + + public PartitionSpecInvalidException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/PartitionSpec.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/PartitionSpec.java new file mode 100644 index 000000000..7e4b4c469 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/PartitionSpec.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metadata; + +import com.alibaba.fluss.annotation.PublicEvolving; + +import java.util.Collections; +import java.util.Map; + +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; + +/** + * Represents a partition spec in fluss. Partition columns and values are NOT of strict order, and + * they need to be re-arranged to the correct order by comparing with a list of strictly ordered + * partition keys. + * + * @since 0.6 + */ +@PublicEvolving +public class PartitionSpec { + + // An unmodifiable map from partition key to partition value. + private final Map partitionSpec; + + public PartitionSpec(Map partitionSpec) { + checkNotNull(partitionSpec, "partitionSpec cannot be null"); + this.partitionSpec = Collections.unmodifiableMap(partitionSpec); + } + + public Map getPartitionSpec() { + return partitionSpec; + } + + @Override + public String toString() { + return "PartitionSpec{" + "partitionSpec=" + partitionSpec + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionSpec that = (PartitionSpec) o; + return partitionSpec.equals(that.partitionSpec); + } + + @Override + public int hashCode() { + return partitionSpec.hashCode(); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/PhysicalTablePath.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/PhysicalTablePath.java index a09840987..5842c014b 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/PhysicalTablePath.java +++
b/fluss-common/src/main/java/com/alibaba/fluss/metadata/PhysicalTablePath.java @@ -85,7 +85,7 @@ public String getPartitionName() { */ public boolean isValid() { return getTablePath().isValid() - && (partitionName == null || detectInvalidName(partitionName) == null); + && (partitionName == null || detectInvalidName(partitionName, true) == null); } @Override diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TablePath.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TablePath.java index 25e01f0fe..10840fa42 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TablePath.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TablePath.java @@ -141,6 +141,10 @@ public static void validateTableName(String tableName) throws InvalidTableExcept } static String detectInvalidName(String identifier) { + return detectInvalidName(identifier, false); + } + + static String detectInvalidName(String identifier, boolean isPartitionName) { if (identifier == null) { return "null string is not allowed"; } @@ -159,7 +163,7 @@ static String detectInvalidName(String identifier) { + "' is longer than the max allowed length " + MAX_NAME_LENGTH; } - if (containsInvalidPattern(identifier)) { + if (containsInvalidPattern(identifier, isPartitionName)) { return "'" + identifier + "' contains one or more characters other than " @@ -169,7 +173,7 @@ static String detectInvalidName(String identifier) { } /** Valid characters for Fluss table names are the ASCII alphanumerics, '_', and '-'. */ - private static boolean containsInvalidPattern(String identifier) { + private static boolean containsInvalidPattern(String identifier, boolean isPartitionName) { for (int i = 0; i < identifier.length(); ++i) { char c = identifier.charAt(i); @@ -180,6 +184,12 @@ private static boolean containsInvalidPattern(String identifier) { || (c >= 'A' && c <= 'Z') || c == '_' || c == '-'; + + // partition name can contain '$' as separator. 
+ if (isPartitionName && (c == '$')) { + validChar = true; + } + if (!validChar) { return true; } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/AutoPartitionUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/AutoPartitionUtils.java deleted file mode 100644 index 40379cf1b..000000000 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/AutoPartitionUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2024 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.fluss.utils; - -import com.alibaba.fluss.config.AutoPartitionTimeUnit; - -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - -/** Utils for auto partition. 
*/ -public class AutoPartitionUtils { - - private static final String YEAR_FORMAT = "yyyy"; - private static final String QUARTER_FORMAT = "yyyyQ"; - private static final String MONTH_FORMAT = "yyyyMM"; - private static final String DAY_FORMAT = "yyyyMMdd"; - private static final String HOUR_FORMAT = "yyyyMMddHH"; - - public static String getPartitionString( - ZonedDateTime current, int offset, AutoPartitionTimeUnit timeUnit) { - switch (timeUnit) { - case YEAR: - return getFormattedTime(current.plusYears(offset), YEAR_FORMAT); - case QUARTER: - return getFormattedTime(current.plusMonths(offset * 3L), QUARTER_FORMAT); - case MONTH: - return getFormattedTime(current.plusMonths(offset), MONTH_FORMAT); - case DAY: - return getFormattedTime(current.plusDays(offset), DAY_FORMAT); - case HOUR: - return getFormattedTime(current.plusHours(offset), HOUR_FORMAT); - default: - throw new IllegalArgumentException("Unsupported time unit: " + timeUnit); - } - } - - private static String getFormattedTime(ZonedDateTime zonedDateTime, String format) { - return DateTimeFormatter.ofPattern(format).format(zonedDateTime); - } -} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/PartitionUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/PartitionUtils.java new file mode 100644 index 000000000..a1c7d02a0 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/PartitionUtils.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.utils; + +import com.alibaba.fluss.config.AutoPartitionTimeUnit; +import com.alibaba.fluss.exception.PartitionSpecInvalidException; +import com.alibaba.fluss.types.DataTypeRoot; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static com.alibaba.fluss.utils.Preconditions.checkArgument; + +/** Utils for partition. */ +public class PartitionUtils { + public static final String PARTITION_SPEC_SEPARATOR = "$"; + + public static final List STATIC_PARTITION_KEY_SUPPORTED_TYPES = + Arrays.asList( + DataTypeRoot.STRING, + DataTypeRoot.INTEGER, + DataTypeRoot.BOOLEAN, + DataTypeRoot.DATE, + DataTypeRoot.TIME_WITHOUT_TIME_ZONE); + + public static final List AUTO_PARTITION_KEY_SUPPORTED_TYPES = + Collections.singletonList(DataTypeRoot.STRING); + + private static final String YEAR_FORMAT = "yyyy"; + private static final String QUARTER_FORMAT = "yyyyQ"; + private static final String MONTH_FORMAT = "yyyyMM"; + private static final String DAY_FORMAT = "yyyyMMdd"; + private static final String HOUR_FORMAT = "yyyyMMddHH"; + + /** + * Returns the partition name for a partition table of specify partition spec. + * + *

<p>The partition name is in the following format: + * + *

<pre>
+     * spec1$spec2$...$specN
+     * </pre>
+ * + *

<p>For example, if the partition keys are [a, b, c], and the partition spec is [1, 2, 3], the + * partition name is "1$2$3". + * + *

<p>Currently, we only support one partition key. So the partition name is in the following + * format: + * + *

<pre>
+     * spec
+     * </pre>
+ * + *

<p>For example, if the partition keys are [a], and the partition spec is [1], the partition + * name is "1". + * + * @param partitionKeys the partition keys + * @param partitionSpecs the partition specs + */ + public static String getPartitionName(List partitionKeys, List partitionSpecs) { + return getPartitionName(partitionKeys, partitionSpecs, true); + } + + public static String getPartitionName( + List partitionKeys, List partitionSpecs, boolean checkValid) { + checkArgument( + partitionKeys.size() == partitionSpecs.size(), + "The number of partition keys and partition specs should be the same."); + + for (String value : partitionSpecs) { + if (checkValid && value.contains(PARTITION_SPEC_SEPARATOR)) { + throw new PartitionSpecInvalidException( + "The value of partition key should not contains separator: '" + + PARTITION_SPEC_SEPARATOR + + "'"); + } + } + return String.join(PARTITION_SPEC_SEPARATOR, partitionSpecs); + } + + /** + * Generate auto partition name in server. When a partition is auto-created, we need to + * generate a partition name. As we only support one partition key now, the partition name is in + * the following format: + * + *

<pre>
+     * value
+     * </pre>
+ * + *

The value is the formatted time with the specified time unit. + * + * @param partitionKeys the partition keys + * @param current the current time + * @param offset the offset + * @param timeUnit the time unit + * @return the auto partition name + */ + public static String generateAutoPartitionName( + List partitionKeys, + ZonedDateTime current, + int offset, + AutoPartitionTimeUnit timeUnit) { + String autoPartitionFieldSpec; + switch (timeUnit) { + case YEAR: + autoPartitionFieldSpec = getFormattedTime(current.plusYears(offset), YEAR_FORMAT); + break; + case QUARTER: + autoPartitionFieldSpec = + getFormattedTime(current.plusMonths(offset * 3L), QUARTER_FORMAT); + break; + case MONTH: + autoPartitionFieldSpec = getFormattedTime(current.plusMonths(offset), MONTH_FORMAT); + break; + case DAY: + autoPartitionFieldSpec = getFormattedTime(current.plusDays(offset), DAY_FORMAT); + break; + case HOUR: + autoPartitionFieldSpec = getFormattedTime(current.plusHours(offset), HOUR_FORMAT); + break; + default: + throw new IllegalArgumentException("Unsupported time unit: " + timeUnit); + } + + return getPartitionName(partitionKeys, Collections.singletonList(autoPartitionFieldSpec)); + } + + private static String getFormattedTime(ZonedDateTime zonedDateTime, String format) { + return DateTimeFormatter.ofPattern(format).format(zonedDateTime); + } +} diff --git a/fluss-common/src/test/java/com/alibaba/fluss/utils/AutoPartitionUtilsTest.java b/fluss-common/src/test/java/com/alibaba/fluss/utils/PartitionUtilsTest.java similarity index 58% rename from fluss-common/src/test/java/com/alibaba/fluss/utils/AutoPartitionUtilsTest.java rename to fluss-common/src/test/java/com/alibaba/fluss/utils/PartitionUtilsTest.java index 1f6b59bf4..d363817e6 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/utils/AutoPartitionUtilsTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/utils/PartitionUtilsTest.java @@ -17,47 +17,76 @@ package com.alibaba.fluss.utils; import 
com.alibaba.fluss.config.AutoPartitionTimeUnit; +import com.alibaba.fluss.exception.PartitionSpecInvalidException; import org.junit.jupiter.api.Test; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collections; +import static com.alibaba.fluss.utils.PartitionUtils.generateAutoPartitionName; +import static com.alibaba.fluss.utils.PartitionUtils.getPartitionName; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link com.alibaba.fluss.utils.AutoPartitionUtils}. */ -class AutoPartitionUtilsTest { +/** Test for {@link PartitionUtils}. */ +class PartitionUtilsTest { @Test - void testGetPartitionString() { + void testGetPartitionName() { + assertThat(getPartitionName(Collections.singletonList("a"), Collections.singletonList("1"))) + .isEqualTo("1"); + + assertThat(getPartitionName(Arrays.asList("a", "b"), Arrays.asList("1", "2"))) + .isEqualTo("1$2"); + + assertThatThrownBy( + () -> + getPartitionName( + Arrays.asList("a", "b", "c"), Arrays.asList("1", "2"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "The number of partition keys and partition specs should be the same."); + + assertThatThrownBy( + () -> getPartitionName(Arrays.asList("a", "b"), Arrays.asList("$1", "2"))) + .isInstanceOf(PartitionSpecInvalidException.class) + .hasMessageContaining( + "The value of partition key should not contains separator: '$'"); + } + + @Test + void testGenerateAutoPartitionName() { LocalDateTime localDateTime = LocalDateTime.of(2024, 11, 11, 11, 11); ZoneId zoneId = ZoneId.of("UTC-8"); ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime, zoneId); // for year - testGetPartitionString( + testGenerateAutoPartitionName( zonedDateTime, AutoPartitionTimeUnit.YEAR, new int[] {-1, 0, 1, 2, 3}, new String[] {"2023", "2024", "2025", "2026", "2027"}); // for quarter - 
testGetPartitionString( + testGenerateAutoPartitionName( zonedDateTime, AutoPartitionTimeUnit.QUARTER, new int[] {-1, 0, 1, 2, 3}, new String[] {"20243", "20244", "20251", "20252", "20253"}); // for month - testGetPartitionString( + testGenerateAutoPartitionName( zonedDateTime, AutoPartitionTimeUnit.MONTH, new int[] {-1, 0, 1, 2, 3}, new String[] {"202410", "202411", "202412", "202501", "202502"}); // for day - testGetPartitionString( + testGenerateAutoPartitionName( zonedDateTime, AutoPartitionTimeUnit.DAY, new int[] {-1, 0, 1, 2, 3, 20}, @@ -66,7 +95,7 @@ void testGetPartitionString() { }); // for hour - testGetPartitionString( + testGenerateAutoPartitionName( zonedDateTime, AutoPartitionTimeUnit.HOUR, new int[] {-2, -1, 0, 1, 2, 3, 13}, @@ -81,15 +110,18 @@ void testGetPartitionString() { }); } - void testGetPartitionString( + void testGenerateAutoPartitionName( ZonedDateTime zonedDateTime, AutoPartitionTimeUnit autoPartitionTimeUnit, int[] offsets, String[] expected) { for (int i = 0; i < offsets.length; i++) { String partitionString = - AutoPartitionUtils.getPartitionString( - zonedDateTime, offsets[i], autoPartitionTimeUnit); + generateAutoPartitionName( + Collections.singletonList("ds"), + zonedDateTime, + offsets[i], + autoPartitionTimeUnit); assertThat(partitionString).isEqualTo(expected[i]); } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java index 0163dd021..560d53b64 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java @@ -26,6 +26,8 @@ import com.alibaba.fluss.connector.flink.utils.FlinkConversions; import com.alibaba.fluss.exception.FlussRuntimeException; import 
com.alibaba.fluss.metadata.DatabaseDescriptor; +import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.PartitionSpec; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; @@ -63,6 +65,7 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -70,6 +73,11 @@ import java.util.Optional; import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; +import static com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils.isPartitionAlreadyExists; +import static com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils.isPartitionNotExist; +import static com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils.isPartitionSpecInvalid; +import static com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils.isTableNotExist; +import static com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils.isTableNotPartitioned; import static com.alibaba.fluss.connector.flink.utils.FlinkConversions.toFlussDatabase; import static org.apache.flink.util.Preconditions.checkArgument; @@ -275,7 +283,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) return catalogTable.copy(newOptions); } catch (Exception e) { Throwable t = ExceptionUtils.stripExecutionException(e); - if (CatalogExceptionUtils.isTableNotExist(t)) { + if (isTableNotExist(t)) { throw new TableNotExistException(getName(), objectPath); } else { throw new CatalogException( @@ -319,7 +327,7 @@ public void dropTable(ObjectPath objectPath, boolean ignoreIfNotExists) admin.deleteTable(tablePath, ignoreIfNotExists).get(); } catch (Exception e) { Throwable t = ExceptionUtils.stripExecutionException(e); - if (CatalogExceptionUtils.isTableNotExist(t)) { + if (isTableNotExist(t)) { throw new TableNotExistException(getName(), objectPath); } else { throw new CatalogException( @@ -371,8 +379,37 
@@ public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, @Override public List listPartitions(ObjectPath objectPath) throws TableNotExistException, TableNotPartitionedException, CatalogException { - // TODO: use admin.listPartitionInfos() - return Collections.emptyList(); + try { + TablePath tablePath = toTablePath(objectPath); + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + List partitionKeys = tableInfo.getPartitionKeys(); + + List partitionInfos = admin.listPartitionInfos(tablePath).get(); + List catalogPartitionSpecs = new ArrayList<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + String[] partitionSpec = partitionInfo.getPartitionName().split("\\$"); + checkArgument(partitionKeys.size() == partitionSpec.length); + Map flinkPartitionSpec = new HashMap<>(); + for (int i = 0; i < partitionKeys.size(); i++) { + flinkPartitionSpec.put(partitionKeys.get(i), partitionSpec[i]); + } + catalogPartitionSpecs.add(new CatalogPartitionSpec(flinkPartitionSpec)); + } + return catalogPartitionSpecs; + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (isTableNotExist(t)) { + throw new TableNotExistException(getName(), objectPath); + } else if (isTableNotPartitioned(t)) { + throw new TableNotPartitionedException(getName(), objectPath); + } else { + throw new CatalogException( + String.format( + "Failed to list partitions of table %s in %s", + objectPath, getName()), + t); + } + } } @Override @@ -412,14 +449,51 @@ public void createPartition( throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { - throw new UnsupportedOperationException(); + PartitionSpec partitionSpec = new PartitionSpec(catalogPartitionSpec.getPartitionSpec()); + try { + admin.addPartition(toTablePath(objectPath), partitionSpec, b).get(); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if 
(isTableNotExist(t)) { + throw new TableNotExistException(getName(), objectPath); + } else if (isTableNotPartitioned(t)) { + throw new TableNotPartitionedException(getName(), objectPath); + } else if (isPartitionSpecInvalid(t)) { + throw new PartitionSpecInvalidException( + getName(), null, objectPath, catalogPartitionSpec, e); + } else if (isPartitionAlreadyExists(t)) { + throw new PartitionAlreadyExistsException( + getName(), objectPath, catalogPartitionSpec); + } else { + throw new CatalogException( + String.format( + "Failed to create partition with partition spec %s of table %s in %s", + catalogPartitionSpec, objectPath, getName()), + t); + } + } } @Override public void dropPartition( ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, boolean b) throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); + PartitionSpec partitionSpec = new PartitionSpec(catalogPartitionSpec.getPartitionSpec()); + try { + admin.dropPartition(toTablePath(objectPath), partitionSpec, b).get(); + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (isPartitionNotExist(t)) { + throw new PartitionNotExistException( + getName(), objectPath, catalogPartitionSpec, e); + } else { + throw new CatalogException( + String.format( + "Failed to drop partition with partition spec %s of table %s in %s", + catalogPartitionSpec, objectPath, getName()), + t); + } + } } @Override diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/CatalogExceptionUtils.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/CatalogExceptionUtils.java index f033ead9c..3413b42ae 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/CatalogExceptionUtils.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/CatalogExceptionUtils.java 
@@ -19,8 +19,12 @@ import com.alibaba.fluss.exception.DatabaseAlreadyExistException; import com.alibaba.fluss.exception.DatabaseNotEmptyException; import com.alibaba.fluss.exception.DatabaseNotExistException; +import com.alibaba.fluss.exception.PartitionAlreadyExistsException; +import com.alibaba.fluss.exception.PartitionNotExistException; +import com.alibaba.fluss.exception.PartitionSpecInvalidException; import com.alibaba.fluss.exception.TableAlreadyExistException; import com.alibaba.fluss.exception.TableNotExistException; +import com.alibaba.fluss.exception.TableNotPartitionedException; /** Utility class for catalog exceptions. */ public class CatalogExceptionUtils { @@ -46,4 +50,20 @@ public static boolean isTableNotExist(Throwable throwable) { public static boolean isTableAlreadyExist(Throwable throwable) { return throwable instanceof TableAlreadyExistException; } + + public static boolean isTableNotPartitioned(Throwable throwable) { + return throwable instanceof TableNotPartitionedException; + } + + public static boolean isPartitionAlreadyExists(Throwable throwable) { + return throwable instanceof PartitionAlreadyExistsException; + } + + public static boolean isPartitionNotExist(Throwable throwable) { + return throwable instanceof PartitionNotExistException; + } + + public static boolean isPartitionSpecInvalid(Throwable throwable) { + return throwable instanceof PartitionSpecInvalidException; + } } diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java index 0a38415e7..c2b6682a4 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java @@ -18,7 +18,8 @@ import 
com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.exception.InvalidConfigException; +import com.alibaba.fluss.exception.InvalidTableException; +import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.server.testutils.FlussClusterExtension; import org.apache.flink.table.api.DataTypes; @@ -28,6 +29,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; @@ -39,6 +41,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -154,17 +159,17 @@ void testCreateTable() throws Exception { @Test void testCreateUnSupportedTable() { - // test unsupported table, partitioned table without auto partitioned enabled + // test unsupported table, partitioned table with illegal partition key. 
assertThatThrownBy( () -> tEnv.executeSql( "create table test_table_unsupported" - + " (a int, b int) partitioned by (b)")) + + " (a int, $b int) partitioned by ($b)")) .cause() .isInstanceOf(CatalogException.class) .cause() - .isInstanceOf(InvalidConfigException.class) - .hasMessageContaining("Currently, partitioned table must enable auto partition"); + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining("Partition key should not contains separator: '$'"); // test invalid property assertThatThrownBy( @@ -195,19 +200,93 @@ void testCreateNoPkTable() throws Exception { } @Test - void testCreatePartitionedTable() throws Exception { + void testStaticPartitionedTable() throws Exception { + ObjectPath objectPath = new ObjectPath(DEFAULT_DB, "test_static_partitioned_table"); + + // 1. first create. tEnv.executeSql( - "create table test_partitioned_table (a int, b string) partitioned by (b) " + "create table test_static_partitioned_table (a int, b int) partitioned by (b)"); + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("a", DataTypes.INT()).column("b", DataTypes.INT()); + Schema expectedSchema = schemaBuilder.build(); + CatalogTable table = (CatalogTable) catalog.getTable(objectPath); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + List partitionKeys = table.getPartitionKeys(); + assertThat(partitionKeys).isEqualTo(Collections.singletonList("b")); + + // 2. add partitions. + tEnv.executeSql("alter table test_static_partitioned_table add partition (b = 1)"); + tEnv.executeSql("alter table test_static_partitioned_table add partition (b = 2)"); + tEnv.executeSql("alter table test_static_partitioned_table add partition (b = 3)"); + assertCatalogPartitionSpecsEquals( + catalog.listPartitions(objectPath), partitionKeys, Arrays.asList("1", "2", "3")); + + // 3. drop partitions. 
+ tEnv.executeSql("alter table test_static_partitioned_table drop partition (b = 1)"); + assertCatalogPartitionSpecsEquals( + catalog.listPartitions(objectPath), partitionKeys, Arrays.asList("2", "3")); + } + + @Test + void testAutoPartitionedTable() throws Exception { + ObjectPath objectPath = new ObjectPath(DEFAULT_DB, "test_auto_partitioned_table"); + + // 1. test add table. + tEnv.executeSql( + "create table test_auto_partitioned_table (a int, b string) partitioned by (b) " + "with ('table.auto-partition.enabled' = 'true'," - + " 'table.auto-partition.time-unit' = 'day')"); + + " 'table.auto-partition.time-unit' = 'year')"); Schema.Builder schemaBuilder = Schema.newBuilder(); schemaBuilder.column("a", DataTypes.INT()).column("b", DataTypes.STRING()); Schema expectedSchema = schemaBuilder.build(); - CatalogTable table = - (CatalogTable) - catalog.getTable(new ObjectPath(DEFAULT_DB, "test_partitioned_table")); + CatalogTable table = (CatalogTable) catalog.getTable(objectPath); assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); - assertThat(table.getPartitionKeys()).isEqualTo(Collections.singletonList("b")); + List partitionKeys = table.getPartitionKeys(); + assertThat(partitionKeys).isEqualTo(Collections.singletonList("b")); + + TablePath tablePath = new TablePath(DEFAULT_DB, "test_auto_partitioned_table"); + FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath); + int currentYear = LocalDate.now().getYear(); + assertCatalogPartitionSpecsEquals( + catalog.listPartitions(objectPath), + partitionKeys, + Arrays.asList( + String.valueOf(currentYear), + String.valueOf(currentYear + 1), + String.valueOf(currentYear + 2), + String.valueOf(currentYear + 3))); + + // 2. test add partitions. 
+ tEnv.executeSql( + String.format( + "alter table test_auto_partitioned_table add partition (b = '%s')", + currentYear + 10)); + assertCatalogPartitionSpecsEquals( + catalog.listPartitions(objectPath), + partitionKeys, + Arrays.asList( + String.valueOf(currentYear), + String.valueOf(currentYear + 1), + String.valueOf(currentYear + 2), + String.valueOf(currentYear + 3), + String.valueOf(currentYear + 10))); + + // 3. test drop partitions. + tEnv.executeSql( + String.format( + "alter table test_auto_partitioned_table drop partition (b = '%s')", + currentYear + 2)); + tEnv.executeSql( + String.format( + "alter table test_auto_partitioned_table drop partition (b = '%s')", + currentYear + 10)); + assertCatalogPartitionSpecsEquals( + catalog.listPartitions(objectPath), + partitionKeys, + Arrays.asList( + String.valueOf(currentYear), + String.valueOf(currentYear + 1), + String.valueOf(currentYear + 3))); } @Test @@ -319,4 +398,23 @@ private static void assertOptionsEqual( assertThat(actualOptions.size()).isEqualTo(expectedOptions.size()); assertThat(actualOptions).isEqualTo(expectedOptions); } + + private void assertCatalogPartitionSpecsEquals( + List catalogPartitionSpecs, + List partitionKeys, + List expectedPartitionNames) { + assertThat(catalogPartitionSpecs.size()).isEqualTo(expectedPartitionNames.size()); + List actualPartitionNames = new ArrayList<>(); + for (CatalogPartitionSpec catalogPartitionSpec : catalogPartitionSpecs) { + Map actualPartitionSpec = catalogPartitionSpec.getPartitionSpec(); + List orderedPartitionSpec = new ArrayList<>(); + for (String partitionKey : partitionKeys) { + assertThat(actualPartitionSpec.containsKey(partitionKey)).isTrue(); + orderedPartitionSpec.add(actualPartitionSpec.get(partitionKey)); + } + actualPartitionNames.add(String.join("$", orderedPartitionSpec)); + } + assertThat(actualPartitionNames) + .containsExactlyInAnyOrderElementsOf(expectedPartitionNames); + } } diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java index 3ab96912d..d5f200d17 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java @@ -16,12 +16,16 @@ package com.alibaba.fluss.rpc.gateway; +import com.alibaba.fluss.rpc.messages.AddPartitionRequest; +import com.alibaba.fluss.rpc.messages.AddPartitionResponse; import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest; import com.alibaba.fluss.rpc.messages.CreateDatabaseResponse; import com.alibaba.fluss.rpc.messages.CreateTableRequest; import com.alibaba.fluss.rpc.messages.CreateTableResponse; import com.alibaba.fluss.rpc.messages.DropDatabaseRequest; import com.alibaba.fluss.rpc.messages.DropDatabaseResponse; +import com.alibaba.fluss.rpc.messages.DropPartitionRequest; +import com.alibaba.fluss.rpc.messages.DropPartitionResponse; import com.alibaba.fluss.rpc.messages.DropTableRequest; import com.alibaba.fluss.rpc.messages.DropTableResponse; import com.alibaba.fluss.rpc.protocol.ApiKeys; @@ -63,5 +67,21 @@ public interface AdminGateway extends AdminReadOnlyGateway { @RPC(api = ApiKeys.DROP_TABLE) CompletableFuture dropTable(DropTableRequest request); + /** + * Add a partition to a partitioned table. + * + * @param request Add partition request + */ + @RPC(api = ApiKeys.ADD_PARTITION) + CompletableFuture addPartition(AddPartitionRequest request); + + /** + * Drop a partition from a partitioned table. 
+ * + * @param request Drop partition request + */ + @RPC(api = ApiKeys.DROP_PARTITION) + CompletableFuture dropPartition(DropPartitionRequest request); + // todo: rename table & alter table } diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java index 1d8035b51..2b5d76b41 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java @@ -63,7 +63,9 @@ public enum ApiKeys { GET_LATEST_LAKE_SNAPSHOT(1033, 0, 0, PUBLIC), LIMIT_SCAN(1034, 0, 0, PUBLIC), PREFIX_LOOKUP(1035, 0, 0, PUBLIC), - GET_DATABASE_INFO(1036, 0, 0, PUBLIC); + GET_DATABASE_INFO(1036, 0, 0, PUBLIC), + ADD_PARTITION(1037, 0, 0, PUBLIC), + DROP_PARTITION(1038, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java index e4714cddc..08cffc63c 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java @@ -46,7 +46,9 @@ import com.alibaba.fluss.exception.NotLeaderOrFollowerException; import com.alibaba.fluss.exception.OperationNotAttemptedException; import com.alibaba.fluss.exception.OutOfOrderSequenceException; +import com.alibaba.fluss.exception.PartitionAlreadyExistsException; import com.alibaba.fluss.exception.PartitionNotExistException; +import com.alibaba.fluss.exception.PartitionSpecInvalidException; import com.alibaba.fluss.exception.RecordTooLargeException; import com.alibaba.fluss.exception.SchemaNotExistException; import com.alibaba.fluss.exception.SecurityTokenException; @@ -176,7 +178,11 @@ public enum Errors { INVALID_CONFIG_EXCEPTION(39, "The config is invalid.", InvalidConfigException::new), LAKE_STORAGE_NOT_CONFIGURED_EXCEPTION( 40, 
"The lake storage is not configured.", LakeStorageNotConfiguredException::new), - KV_SNAPSHOT_NOT_EXIST(41, "The kv snapshot is not exist.", KvSnapshotNotExistException::new); + KV_SNAPSHOT_NOT_EXIST(41, "The kv snapshot is not exist.", KvSnapshotNotExistException::new), + PARTITION_ALREADY_EXISTS( + 42, "The partition already exists.", PartitionAlreadyExistsException::new), + PARTITION_SPEC_INVALID_EXCEPTION( + 43, "The partition spec is invalid.", PartitionSpecInvalidException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index e30d58046..2b980276f 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -382,6 +382,24 @@ message ListPartitionInfosResponse { repeated PbPartitionInfo partitions_info = 1; } +// add partition request and response +message AddPartitionRequest { + required PbTablePath table_path = 1; + repeated PbPartitionSpec partition_spec = 2; + required bool ignore_if_not_exists = 3; +} + +message AddPartitionResponse {} + +// drop partition request and response +message DropPartitionRequest { + required PbTablePath table_path = 1; + repeated PbPartitionSpec partition_spec = 2; + required bool ignore_if_not_exists = 3; +} + +message DropPartitionResponse {} + // commit remote log manifest request and response message CommitRemoteLogManifestRequest { required int64 table_id = 1; @@ -718,6 +736,11 @@ message PbPartitionInfo { required string partition_name = 2; } +message PbPartitionSpec { + required string partition_key = 1; + required string value = 2; +} + message PbLakeStorageInfo { required string lake_storage_type = 1; repeated PbKeyValue catalog_properties = 2; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java deleted file mode 
100644 index 652358c7e..000000000 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Copyright (c) 2024 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.fluss.server.coordinator; - -import com.alibaba.fluss.annotation.VisibleForTesting; -import com.alibaba.fluss.cluster.MetadataCache; -import com.alibaba.fluss.config.AutoPartitionTimeUnit; -import com.alibaba.fluss.config.ConfigOptions; -import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.metadata.TableInfo; -import com.alibaba.fluss.metadata.TablePath; -import com.alibaba.fluss.server.utils.TableAssignmentUtils; -import com.alibaba.fluss.server.zk.ZooKeeperClient; -import com.alibaba.fluss.server.zk.data.BucketAssignment; -import com.alibaba.fluss.server.zk.data.PartitionAssignment; -import com.alibaba.fluss.utils.AutoPartitionStrategy; -import com.alibaba.fluss.utils.clock.Clock; -import com.alibaba.fluss.utils.clock.SystemClock; -import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.concurrent.GuardedBy; - -import java.time.Instant; -import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import 
java.util.NavigableSet; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static com.alibaba.fluss.utils.AutoPartitionUtils.getPartitionString; -import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock; - -/** - * An auto partition manager which will trigger auto partition for the tables in cluster - * periodically. It'll use a {@link ScheduledExecutorService} to schedule the auto partition which - * will trigger auto partition for them. - */ -public class AutoPartitionManager implements AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(AutoPartitionManager.class); - - /** scheduled executor, periodically trigger auto partition. */ - private final ScheduledExecutorService periodicExecutor; - - private final ZooKeeperClient zooKeeperClient; - private final MetadataCache metadataCache; - private final Clock clock; - - private final long periodicInterval; - private final AtomicBoolean isClosed = new AtomicBoolean(false); - - @GuardedBy("lock") - private final Map autoPartitionTables = new HashMap<>(); - - @GuardedBy("lock") - private final Map> partitionsByTable = new HashMap<>(); - - private final Lock lock = new ReentrantLock(); - - public AutoPartitionManager( - MetadataCache metadataCache, ZooKeeperClient zooKeeperClient, Configuration conf) { - this( - metadataCache, - zooKeeperClient, - conf, - SystemClock.getInstance(), - Executors.newScheduledThreadPool( - 1, new ExecutorThreadFactory("periodic-auto-partition-manager"))); - } - - @VisibleForTesting - AutoPartitionManager( - MetadataCache metadataCache, - ZooKeeperClient zooKeeperClient, - Configuration conf, - Clock clock, - ScheduledExecutorService periodicExecutor) { - this.metadataCache = 
metadataCache; - this.zooKeeperClient = zooKeeperClient; - this.clock = clock; - this.periodicExecutor = periodicExecutor; - this.periodicInterval = conf.get(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL).toMillis(); - } - - public void initAutoPartitionTables(Map tableInfos) { - inLock(lock, () -> autoPartitionTables.putAll(tableInfos)); - } - - public void addAutoPartitionTable(TableInfo tableInfo) { - checkNotClosed(); - long tableId = tableInfo.getTableId(); - inLock(lock, () -> autoPartitionTables.put(tableId, tableInfo)); - // schedule auto partition for this table immediately - periodicExecutor.schedule(() -> doAutoPartition(tableId), 0, TimeUnit.MILLISECONDS); - } - - public void removeAutoPartitionTable(long tableId) { - inLock(lock, () -> autoPartitionTables.remove(tableId)); - } - - public void start() { - checkNotClosed(); - periodicExecutor.scheduleWithFixedDelay( - this::doAutoPartition, periodicInterval, periodicInterval, TimeUnit.MILLISECONDS); - LOG.info("Auto partitioning task is scheduled at fixed interval {}ms.", periodicInterval); - } - - private void checkNotClosed() { - if (isClosed.get()) { - throw new IllegalStateException("AutoPartitionManager is already closed."); - } - } - - private void doAutoPartition() { - Instant now = clock.instant(); - LOG.info("Start auto partitioning for all tables at {}.", now); - inLock(lock, () -> autoPartition(now, autoPartitionTables.keySet())); - } - - private void doAutoPartition(long tableId) { - Instant now = clock.instant(); - LOG.info("Start auto partitioning for table {} at {}.", tableId, now); - inLock(lock, () -> autoPartition(now, Collections.singleton(tableId))); - } - - @VisibleForTesting - protected void autoPartition(Instant now, Set tableIds) { - for (Long tableId : tableIds) { - TreeSet currentPartitions = - partitionsByTable.computeIfAbsent(tableId, k -> new TreeSet<>()); - TableInfo tableInfo = autoPartitionTables.get(tableId); - dropPartitions( - tableInfo.getTablePath(), - now, - 
tableInfo.getTableConfig().getAutoPartitionStrategy(), - currentPartitions); - createPartitions(tableInfo, now, currentPartitions); - } - } - - protected void createPartitions( - TableInfo tableInfo, Instant currentInstant, TreeSet currentPartitions) { - // get the partitions needed to create - List partitionsToPreCreate = - partitionNamesToPreCreate( - currentInstant, - tableInfo.getTableConfig().getAutoPartitionStrategy(), - currentPartitions); - if (partitionsToPreCreate.isEmpty()) { - return; - } - - TablePath tablePath = tableInfo.getTablePath(); - for (String partitionName : partitionsToPreCreate) { - try { - long tableId = tableInfo.getTableId(); - long partitionId = zooKeeperClient.getPartitionIdAndIncrement(); - // register partition assignments to zk first - registerPartitionAssignment(tableId, partitionId, tableInfo); - // then register the partition metadata to zk - zooKeeperClient.registerPartition(tablePath, tableId, partitionName, partitionId); - currentPartitions.add(partitionName); - LOG.info( - "Auto partitioning created partition {} for table [{}].", - partitionName, - tablePath); - } catch (Exception e) { - LOG.error( - "Auto partitioning failed to create partition {} for table [{}]", - partitionName, - tablePath, - e); - } - } - } - - private void registerPartitionAssignment(long tableId, long partitionId, TableInfo tableInfo) - throws Exception { - int replicaFactor = tableInfo.getTableConfig().getReplicationFactor(); - int[] servers = metadataCache.getLiveServerIds(); - // bucket count must exist for table has been created - int bucketCount = tableInfo.getNumBuckets(); - Map bucketAssignments = - TableAssignmentUtils.generateAssignment(bucketCount, replicaFactor, servers) - .getBucketAssignments(); - PartitionAssignment partitionAssignment = - new PartitionAssignment(tableId, bucketAssignments); - // register table assignment - zooKeeperClient.registerPartitionAssignment(partitionId, partitionAssignment); - } - - private static List 
partitionNamesToPreCreate( - Instant currentInstant, - AutoPartitionStrategy autoPartitionStrategy, - TreeSet currentPartitions) { - AutoPartitionTimeUnit autoPartitionTimeUnit = autoPartitionStrategy.timeUnit(); - ZonedDateTime currentZonedDateTime = - ZonedDateTime.ofInstant( - currentInstant, autoPartitionStrategy.timeZone().toZoneId()); - - int partitionToPreCreate = autoPartitionStrategy.numPreCreate(); - List partitionsToCreate = new ArrayList<>(); - for (int idx = 0; idx < partitionToPreCreate; idx++) { - String partition = getPartitionString(currentZonedDateTime, idx, autoPartitionTimeUnit); - // if the partition already exists, we don't need to create it, - // otherwise, create it - if (!currentPartitions.contains(partition)) { - partitionsToCreate.add(partition); - } - } - return partitionsToCreate; - } - - private void dropPartitions( - TablePath tablePath, - Instant currentInstant, - AutoPartitionStrategy autoPartitionStrategy, - NavigableSet currentPartitions) { - int numToRetain = autoPartitionStrategy.numToRetain(); - // negative value means not to drop partitions - if (numToRetain < 0) { - return; - } - - ZonedDateTime currentZonedDateTime = - ZonedDateTime.ofInstant( - currentInstant, autoPartitionStrategy.timeZone().toZoneId()); - - // get the earliest one partition that need to retain - String lastRetainPartitionName = - getPartitionString( - currentZonedDateTime, -numToRetain, autoPartitionStrategy.timeUnit()); - - Iterator partitionsToExpire = - currentPartitions.headSet(lastRetainPartitionName, false).iterator(); - - while (partitionsToExpire.hasNext()) { - String partitionName = partitionsToExpire.next(); - // drop the partition - try { - zooKeeperClient.deletePartition(tablePath, partitionName); - // only remove when zk success, this reflects to the partitionsByTable - partitionsToExpire.remove(); - LOG.info( - "Auto partitioning deleted partition {} for table [{}].", - partitionName, - tablePath); - } catch (Exception e) { - LOG.error( - 
"Auto partitioning failed to delete partition {} for table [{}]", - partitionName, - tablePath, - e); - } - } - } - - @Override - public void close() throws Exception { - if (isClosed.compareAndSet(false, true)) { - periodicExecutor.shutdownNow(); - } - } -} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java index 0a9b52b8c..900aa4acd 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -87,7 +87,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -117,7 +116,7 @@ public class CoordinatorEventProcessor implements EventProcessor { private final CoordinatorEventManager coordinatorEventManager; private final MetadataManager metadataManager; private final TableManager tableManager; - private final AutoPartitionManager autoPartitionManager; + private final PartitionManager partitionManager; private final TableChangeWatcher tableChangeWatcher; private final CoordinatorChannelManager coordinatorChannelManager; private final TabletServerChangeWatcher tabletServerChangeWatcher; @@ -143,7 +142,7 @@ public CoordinatorEventProcessor( ServerMetadataCache serverMetadataCache, CoordinatorChannelManager coordinatorChannelManager, CompletedSnapshotStoreManager completedSnapshotStoreManager, - AutoPartitionManager autoPartitionManager, + PartitionManager partitionManager, CoordinatorMetricGroup coordinatorMetricGroup) { this( zooKeeperClient, @@ -151,7 +150,7 @@ public CoordinatorEventProcessor( coordinatorChannelManager, new CoordinatorContext(), completedSnapshotStoreManager, - autoPartitionManager, + partitionManager, coordinatorMetricGroup); } @@ -161,7 +160,7 
@@ public CoordinatorEventProcessor( CoordinatorChannelManager coordinatorChannelManager, CoordinatorContext coordinatorContext, CompletedSnapshotStoreManager completedSnapshotStoreManager, - AutoPartitionManager autoPartitionManager, + PartitionManager partitionManager, CoordinatorMetricGroup coordinatorMetricGroup) { this.zooKeeperClient = zooKeeperClient; this.serverMetadataCache = serverMetadataCache; @@ -192,7 +191,7 @@ public CoordinatorEventProcessor( this.coordinatorRequestBatch = new CoordinatorRequestBatch(coordinatorChannelManager, coordinatorEventManager); this.completedSnapshotStoreManager = completedSnapshotStoreManager; - this.autoPartitionManager = autoPartitionManager; + this.partitionManager = partitionManager; this.coordinatorMetricGroup = coordinatorMetricGroup; registerMetric(); } @@ -291,7 +290,7 @@ private void initCoordinatorContext() throws Exception { coordinatorChannelManager.startup(tabletServers); // load all tables - Map autoPartitionTables = new HashMap<>(); + List partitionTables = new ArrayList<>(); for (String database : metadataManager.listDatabases()) { for (String tableName : metadataManager.listTables(database)) { TablePath tablePath = TablePath.of(database, tableName); @@ -306,17 +305,11 @@ private void initCoordinatorContext() throws Exception { // put partition info to coordinator context coordinatorContext.putPartition(partition.getValue(), partition.getKey()); } - // if the table is auto partition, put the partitions info - if (tableInfo - .getTableConfig() - .getAutoPartitionStrategy() - .isAutoPartitionEnabled()) { - autoPartitionTables.put(tableInfo.getTableId(), tableInfo); - } + partitionTables.add(tableInfo); } } } - autoPartitionManager.initAutoPartitionTables(autoPartitionTables); + partitionManager.initPartitionTables(partitionTables); // load all assignment loadTableAssignment(); @@ -484,8 +477,8 @@ private void processCreateTable(CreateTableEvent createTableEvent) { tableInfo.getTablePath(), 
tableInfo.getTableId(), createTableEvent.getTableAssignment()); - if (createTableEvent.isAutoPartitionTable()) { - autoPartitionManager.addAutoPartitionTable(tableInfo); + if (tableInfo.isPartitioned()) { + partitionManager.addPartitionTable(tableInfo); } } @@ -501,8 +494,9 @@ private void processCreatePartition(CreatePartitionEvent createPartitionEvent) { private void processDropTable(DropTableEvent dropTableEvent) { coordinatorContext.queueTableDeletion(Collections.singleton(dropTableEvent.getTableId())); tableManager.onDeleteTable(dropTableEvent.getTableId()); - if (dropTableEvent.isAutoPartitionTable()) { - autoPartitionManager.removeAutoPartitionTable(dropTableEvent.getTableId()); + if (dropTableEvent.isPartitionTable()) { + partitionManager.removePartitionTable( + dropTableEvent.getTableId(), dropTableEvent.isAutoPartitionTable()); } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java index 51730232a..f547ba2a4 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java @@ -107,7 +107,7 @@ public class CoordinatorServer extends ServerBase { private ZooKeeperClient zkClient; @GuardedBy("lock") - private AutoPartitionManager autoPartitionManager; + private PartitionManager partitionManager; public CoordinatorServer(Configuration conf) { super(conf); @@ -142,12 +142,16 @@ protected void startServices() throws Exception { this.metadataCache = new ServerMetadataCacheImpl(); + this.partitionManager = new PartitionManager(metadataCache, zkClient, conf); + partitionManager.start(); + this.coordinatorService = new CoordinatorService( conf, remoteFileSystem, zkClient, this::getCoordinatorEventManager, + partitionManager, metadataCache); this.rpcServer = @@ -173,9 +177,6 @@ protected void 
startServices() throws Exception { conf.getInt(ConfigOptions.COORDINATOR_IO_POOL_SIZE), zkClient); - this.autoPartitionManager = new AutoPartitionManager(metadataCache, zkClient, conf); - autoPartitionManager.start(); - // start coordinator event processor after we register coordinator leader to zk // so that the event processor can get the coordinator leader node from zk during start // up. @@ -187,7 +188,7 @@ protected void startServices() throws Exception { metadataCache, coordinatorChannelManager, bucketSnapshotManager, - autoPartitionManager, + partitionManager, serverMetricGroup); coordinatorEventProcessor.startup(); @@ -260,8 +261,8 @@ CompletableFuture stopServices() { } try { - if (autoPartitionManager != null) { - autoPartitionManager.close(); + if (partitionManager != null) { + partitionManager.close(); } } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index 7ce859932..58329e58a 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -21,11 +21,16 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.InvalidDatabaseException; import com.alibaba.fluss.exception.InvalidTableException; +import com.alibaba.fluss.exception.TableAlreadyExistException; +import com.alibaba.fluss.exception.TableNotPartitionedException; import com.alibaba.fluss.fs.FileSystem; import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.rpc.gateway.CoordinatorGateway; +import com.alibaba.fluss.rpc.messages.AddPartitionRequest; 
+import com.alibaba.fluss.rpc.messages.AddPartitionResponse; import com.alibaba.fluss.rpc.messages.AdjustIsrRequest; import com.alibaba.fluss.rpc.messages.AdjustIsrResponse; import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest; @@ -40,6 +45,8 @@ import com.alibaba.fluss.rpc.messages.CreateTableResponse; import com.alibaba.fluss.rpc.messages.DropDatabaseRequest; import com.alibaba.fluss.rpc.messages.DropDatabaseResponse; +import com.alibaba.fluss.rpc.messages.DropPartitionRequest; +import com.alibaba.fluss.rpc.messages.DropPartitionResponse; import com.alibaba.fluss.rpc.messages.DropTableRequest; import com.alibaba.fluss.rpc.messages.DropTableResponse; import com.alibaba.fluss.server.RpcServiceBase; @@ -58,36 +65,35 @@ import com.alibaba.fluss.server.zk.data.TableAssignment; import com.alibaba.fluss.utils.concurrent.FutureUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.UncheckedIOException; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import static com.alibaba.fluss.server.utils.RpcMessageUtils.getCommitLakeTableSnapshotData; +import static com.alibaba.fluss.server.utils.RpcMessageUtils.getPartitionSpec; import static com.alibaba.fluss.server.utils.RpcMessageUtils.toTablePath; /** An RPC Gateway service for coordinator server. 
*/ public final class CoordinatorService extends RpcServiceBase implements CoordinatorGateway { - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorService.class); - private final int defaultBucketNumber; private final int defaultReplicationFactor; private final Supplier eventManagerSupplier; + private final PartitionManager partitionManager; public CoordinatorService( Configuration conf, FileSystem remoteFileSystem, ZooKeeperClient zkClient, Supplier eventManagerSupplier, + PartitionManager partitionManager, ServerMetadataCache metadataCache) { super(conf, remoteFileSystem, ServerType.COORDINATOR, zkClient, metadataCache); this.defaultBucketNumber = conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER); this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR); this.eventManagerSupplier = eventManagerSupplier; + this.partitionManager = partitionManager; } @Override @@ -109,7 +115,7 @@ public CompletableFuture createDatabase(CreateDatabaseRe return FutureUtils.failedFuture(e); } - DatabaseDescriptor databaseDescriptor = null; + DatabaseDescriptor databaseDescriptor; if (request.getDatabaseJson() != null) { databaseDescriptor = DatabaseDescriptor.fromJsonBytes(request.getDatabaseJson()); } else { @@ -133,7 +139,7 @@ public CompletableFuture createTable(CreateTableRequest req TablePath tablePath = toTablePath(request.getTablePath()); tablePath.validate(); - TableDescriptor tableDescriptor = null; + TableDescriptor tableDescriptor; try { tableDescriptor = TableDescriptor.fromJsonBytes(request.getTableJson()); } catch (Exception e) { @@ -197,6 +203,62 @@ public CompletableFuture dropTable(DropTableRequest request) return CompletableFuture.completedFuture(response); } + /** + * Adds a partition to an existing partitioned table. + * + * <p>Fails when the table does not exist or is not a partitioned table. + */ + @Override + public CompletableFuture<AddPartitionResponse> addPartition(AddPartitionRequest request) { + AddPartitionResponse response = new AddPartitionResponse(); + TablePath tablePath = toTablePath(request.getTablePath()); + if (!metadataManager.tableExists(tablePath)) { + // fail fast on a missing table; the ignore flag is only forwarded for the partition check + throw new com.alibaba.fluss.exception.TableNotExistException( + "Table " + tablePath + " not exists."); + } +
+ TableInfo tableInfo = metadataManager.getTable(tablePath); + if (!tableInfo.isPartitioned()) { + throw new TableNotPartitionedException("Only partitioned table support add partition."); + } + + // NOTE(review): the flag below is consumed as ignoreIfExists - confirm the request field name + partitionManager.addPartition( + tableInfo, + getPartitionSpec(request.getPartitionSpecsList()), + request.isIgnoreIfNotExists()); + return CompletableFuture.completedFuture(response); + } + + /** + * Drops a partition from an existing partitioned table. + * + * <p>Fails when the table does not exist or is not a partitioned table. + */ + @Override + public CompletableFuture<DropPartitionResponse> dropPartition(DropPartitionRequest request) { + DropPartitionResponse response = new DropPartitionResponse(); + TablePath tablePath = toTablePath(request.getTablePath()); + if (!metadataManager.tableExists(tablePath)) { + // fail fast on a missing table; the ignore flag is only forwarded for the partition check + throw new com.alibaba.fluss.exception.TableNotExistException( + "Table " + tablePath + " not exists."); + } + + TableInfo tableInfo = metadataManager.getTable(tablePath); + if (!tableInfo.isPartitioned()) { + throw new TableNotPartitionedException( + "Only partitioned table support drop partition."); + } + + partitionManager.dropPartition( + tableInfo, + getPartitionSpec(request.getPartitionSpecsList()), + request.isIgnoreIfNotExists()); + return CompletableFuture.completedFuture(response); + } + public CompletableFuture adjustIsr(AdjustIsrRequest request) { CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/PartitionManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/PartitionManager.java new file mode 100644 index 000000000..eeb6ffef9 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/PartitionManager.java @@ -0,0 +1,461 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.coordinator; + +import com.alibaba.fluss.annotation.VisibleForTesting; +import com.alibaba.fluss.cluster.MetadataCache; +import com.alibaba.fluss.config.AutoPartitionTimeUnit; +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.exception.PartitionAlreadyExistsException; +import com.alibaba.fluss.exception.PartitionNotExistException; +import com.alibaba.fluss.exception.PartitionSpecInvalidException; +import com.alibaba.fluss.metadata.PartitionSpec; +import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.server.utils.TableAssignmentUtils; +import com.alibaba.fluss.server.zk.ZooKeeperClient; +import com.alibaba.fluss.server.zk.data.BucketAssignment; +import com.alibaba.fluss.server.zk.data.PartitionAssignment; +import com.alibaba.fluss.utils.AutoPartitionStrategy; +import com.alibaba.fluss.utils.PartitionUtils; +import com.alibaba.fluss.utils.clock.Clock; +import com.alibaba.fluss.utils.clock.SystemClock; +import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static com.alibaba.fluss.utils.PartitionUtils.generateAutoPartitionName; +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; +import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock; + +/** + * A partition manager which will manage static partition tables and auto partition tables. + * + *

<p>For static partition tables, it's responsible for adding partitions to or dropping partitions + * from the static partition table. + * + *

<p>For auto partition tables, it's responsible not only for adding partitions to or dropping + * partitions from the auto partition table, but also for periodically triggering auto partitioning + * for all auto partition tables in the cluster. It uses a {@link ScheduledExecutorService} to + * schedule the periodic auto partitioning task. + */ +public class PartitionManager implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class); + + private final ZooKeeperClient zooKeeperClient; + private final MetadataCache metadataCache; + private final Lock lock = new ReentrantLock(); + private final AtomicBoolean isClosed = new AtomicBoolean(false); + + private final Clock clock; + private final long periodicInterval; + + /** Scheduled executor which periodically triggers auto partitioning. */ + private final ScheduledExecutorService periodicExecutor; + + @GuardedBy("lock") + private final Map<Long, TableInfo> autoPartitionTables = new HashMap<>(); + + /** A map from tableId to the sorted set of partition names for auto partition tables. */ + @GuardedBy("lock") + private final Map<Long, TreeSet<String>> partitionsForAutoPartitionTable = new HashMap<>(); + + /** A map from tableId to the set of partition names for static partition tables. 
*/ + @GuardedBy("lock") + private final Map<Long, Set<String>> partitionsForStaticPartitionTable = new HashMap<>(); + + public PartitionManager( + MetadataCache metadataCache, ZooKeeperClient zooKeeperClient, Configuration conf) { + this( + zooKeeperClient, + metadataCache, + conf, + SystemClock.getInstance(), + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("periodic-auto-partition-manager"))); + } + + @VisibleForTesting + PartitionManager( + ZooKeeperClient zooKeeperClient, + MetadataCache metadataCache, + Configuration conf, + Clock clock, + ScheduledExecutorService periodicExecutor) { + this.metadataCache = metadataCache; + this.zooKeeperClient = zooKeeperClient; + this.clock = clock; + this.periodicExecutor = periodicExecutor; + this.periodicInterval = conf.get(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL).toMillis(); + } + + public void start() { + checkNotClosed(); + periodicExecutor.scheduleWithFixedDelay( + this::doAutoPartition, periodicInterval, periodicInterval, TimeUnit.MILLISECONDS); + LOG.info("Auto partitioning task is scheduled at fixed interval {}ms.", periodicInterval); + } + + public void initPartitionTables(List<TableInfo> tableInfos) { + tableInfos.forEach(this::addPartitionTable); + } + + public void addPartitionTable(TableInfo tableInfo) { + checkNotClosed(); + long tableId = tableInfo.getTableId(); + inLock( + lock, + () -> { + if (isAutoPartitionTable(tableInfo)) { + autoPartitionTables.put(tableId, tableInfo); + } + + Set<String> partitionSet = getOrCreatePartitionSet(tableInfo); + checkNotNull(partitionSet, "Partition set is null."); + try { + partitionSet.addAll( + zooKeeperClient.getPartitions(tableInfo.getTablePath())); + } catch (Exception e) { + LOG.error( + "Fail to get partitions from zookeeper for table {}.", + tableInfo.getTablePath(), + e); + } + }); + + if (isAutoPartitionTable(tableInfo)) { + // run one auto partition round for this table right away instead of waiting for the period + periodicExecutor.schedule(() -> doAutoPartition(tableId), 0, TimeUnit.MILLISECONDS); + } + } + + 
public void removePartitionTable(long tableId, boolean isAutoPartitionTable) { + checkNotClosed(); + inLock( + lock, + () -> { + if (isAutoPartitionTable) { + autoPartitionTables.remove(tableId); + partitionsForAutoPartitionTable.remove(tableId); + } else { + partitionsForStaticPartitionTable.remove(tableId); + } + }); + } + + /** Registers a new partition in zookeeper first and caches it only if that succeeded. */ + public void addPartition( + TableInfo tableInfo, PartitionSpec partitionSpec, boolean ignoreIfExists) { + checkNotClosed(); + + long tableId = tableInfo.getTableId(); + String partitionName = getPartitionName(tableInfo, partitionSpec); + inLock( + lock, + () -> { + Set<String> partitionSet = getOrCreatePartitionSet(tableInfo); + if (partitionSet.contains(partitionName)) { + if (ignoreIfExists) { + return; + } + throw new PartitionAlreadyExistsException( + "Partition '" + + partitionName + + "' already exists for table " + + tableInfo.getTablePath()); + } + + registerPartitionToZk( + tableInfo.getTablePath(), tableId, tableInfo, partitionName); + partitionSet.add(partitionName); + }); + } + + /** Deletes a partition from zookeeper and forgets it only if the deletion succeeded. */ + public void dropPartition( + TableInfo tableInfo, PartitionSpec partitionSpec, boolean ignoreIfNotExists) { + checkNotClosed(); + + String partitionName = getPartitionName(tableInfo, partitionSpec); + inLock( + lock, + () -> { + Set<String> partitionSet = getOrCreatePartitionSet(tableInfo); + if (!partitionSet.contains(partitionName)) { + if (ignoreIfNotExists) { + return; + } + throw new PartitionNotExistException( + "Partition '" + + partitionName + + "' does not exist for table " + + tableInfo.getTablePath()); + } + try { + zooKeeperClient.deletePartition(tableInfo.getTablePath(), partitionName); + } catch (Exception e) { + // keep the cached entry and report the failure: the partition is still in zookeeper + throw new IllegalStateException( + String.format( + "Fail to delete partition '%s' from zookeeper for table %s.", + partitionName, tableInfo.getTablePath()), + e); + } + partitionSet.remove(partitionName); + }); + } + + private void doAutoPartition() { + Instant now = clock.instant(); + LOG.info("Start auto partitioning for all tables at {}.", now); + inLock(lock, () -> 
doAutoPartition(now, autoPartitionTables.keySet())); + } + + private void doAutoPartition(long tableId) { + Instant now = clock.instant(); + LOG.info("Start auto partitioning for table {} at {}.", tableId, now); + inLock(lock, () -> doAutoPartition(now, Collections.singleton(tableId))); + } + + private boolean isAutoPartitionTable(TableInfo tableInfo) { + return tableInfo.isAutoPartitioned(); + } + + private Set<String> getOrCreatePartitionSet(TableInfo tableInfo) { + if (isAutoPartitionTable(tableInfo)) { + return partitionsForAutoPartitionTable.computeIfAbsent( + tableInfo.getTableId(), k -> new TreeSet<>()); + } else { + return partitionsForStaticPartitionTable.computeIfAbsent( + tableInfo.getTableId(), k -> new HashSet<>()); + } + } + + private String getPartitionName(TableInfo tableInfo, PartitionSpec partitionSpec) { + List<String> partitionKeys = tableInfo.getPartitionKeys(); + Map<String, String> partitionSpecMap = partitionSpec.getPartitionSpec(); + if (partitionKeys.size() != partitionSpecMap.size()) { + throw new PartitionSpecInvalidException( + String.format( + "Partition spec size is not equal to partition keys size for partitioned table %s.", + tableInfo.getTablePath())); + } + List<String> reOrderedPartitionValue = new ArrayList<>(partitionKeys.size()); + for (String partitionKey : partitionKeys) { + if (!partitionSpecMap.containsKey(partitionKey)) { + throw new PartitionSpecInvalidException( + String.format( + "Partition spec does not contain partition key %s for partitioned table %s.", + partitionKey, tableInfo.getTablePath())); + } + reOrderedPartitionValue.add(partitionSpecMap.get(partitionKey)); + } + return PartitionUtils.getPartitionName(partitionKeys, reOrderedPartitionValue); + } + + private void registerPartitionToZk( + TablePath tablePath, long tableId, TableInfo tableInfo, String partitionName) { + try { + long partitionId = zooKeeperClient.getPartitionIdAndIncrement(); + // register partition assignments to zk first + registerPartitionAssignment(tableId, 
partitionId, tableInfo); + // then register the partition metadata to zk + zooKeeperClient.registerPartition(tablePath, tableId, partitionName, partitionId); + LOG.info( + "Register partition {} to zookeeper for table [{}].", partitionName, tablePath); + } catch (Exception e) { + // rethrow so that no caller caches a partition which is not registered in zookeeper + throw new IllegalStateException( + String.format( + "Register partition to zookeeper failed to create partition %s for table [%s]", + partitionName, tablePath), + e); + } + } + + private void registerPartitionAssignment(long tableId, long partitionId, TableInfo tableInfo) + throws Exception { + int replicaFactor = tableInfo.getTableConfig().getReplicationFactor(); + int[] servers = metadataCache.getLiveServerIds(); + // bucket count must exist for table has been created + int bucketCount = tableInfo.getNumBuckets(); + Map<Integer, BucketAssignment> bucketAssignments = + TableAssignmentUtils.generateAssignment(bucketCount, replicaFactor, servers) + .getBucketAssignments(); + PartitionAssignment partitionAssignment = + new PartitionAssignment(tableId, bucketAssignments); + // register table assignment + zooKeeperClient.registerPartitionAssignment(partitionId, partitionAssignment); + } + + private void doAutoPartition(Instant now, Set<Long> tableIds) { + for (Long tableId : tableIds) { + TableInfo tableInfo = autoPartitionTables.get(tableId); + if (tableInfo == null) { + // the table is no longer tracked, e.g. it was removed before this run started + continue; + } + TreeSet<String> currentPartitions = + partitionsForAutoPartitionTable.computeIfAbsent(tableId, k -> new TreeSet<>()); + autoDropPartitions( + tableInfo.getPartitionKeys(), + tableInfo.getTablePath(), + now, + tableInfo.getTableConfig().getAutoPartitionStrategy(), + currentPartitions); + autoCreatePartitions(tableInfo, now, currentPartitions); + } + } + + private void autoCreatePartitions( + TableInfo tableInfo, Instant currentInstant, TreeSet<String> currentPartitions) { + // get the partitions needed to create + List<String> partitionsToPreCreate = + autoPartitionNamesToPreCreate( + tableInfo.getPartitionKeys(), + currentInstant, + tableInfo.getTableConfig().getAutoPartitionStrategy(), + currentPartitions); + TablePath tablePath = tableInfo.getTablePath(); + 
for (String partitionName : partitionsToPreCreate) { + try { + registerPartitionToZk(tablePath, tableInfo.getTableId(), tableInfo, partitionName); + currentPartitions.add(partitionName); + LOG.info( + "Auto partitioning created partition {} for table [{}].", + partitionName, + tablePath); + } catch (Exception e) { + LOG.error( + "Auto partitioning failed to create partition {} for table [{}]", + partitionName, + tablePath, + e); + } + } + } + + private List<String> autoPartitionNamesToPreCreate( + List<String> partitionKeys, + Instant currentInstant, + AutoPartitionStrategy autoPartitionStrategy, + TreeSet<String> currentPartitions) { + AutoPartitionTimeUnit autoPartitionTimeUnit = autoPartitionStrategy.timeUnit(); + ZonedDateTime currentZonedDateTime = + ZonedDateTime.ofInstant( + currentInstant, autoPartitionStrategy.timeZone().toZoneId()); + + int partitionToPreCreate = autoPartitionStrategy.numPreCreate(); + List<String> partitionsToCreate = new ArrayList<>(); + for (int idx = 0; idx < partitionToPreCreate; idx++) { + String partition = + generateAutoPartitionName( + partitionKeys, currentZonedDateTime, idx, autoPartitionTimeUnit); + // if the partition already exists, we don't need to create it, + // otherwise, create it + if (!currentPartitions.contains(partition)) { + partitionsToCreate.add(partition); + } + } + return partitionsToCreate; + } + + private void autoDropPartitions( + List<String> partitionKeys, + TablePath tablePath, + Instant currentInstant, + AutoPartitionStrategy autoPartitionStrategy, + NavigableSet<String> currentPartitions) { + int numToRetain = autoPartitionStrategy.numToRetain(); + // negative value means not to drop partitions + if (numToRetain < 0) { + return; + } + + ZonedDateTime currentZonedDateTime = + ZonedDateTime.ofInstant( + currentInstant, autoPartitionStrategy.timeZone().toZoneId()); + + // get the earliest one partition that need to retain + 
String lastRetainPartitionName = + generateAutoPartitionName( + partitionKeys, + currentZonedDateTime, + -numToRetain, + autoPartitionStrategy.timeUnit()); + + Iterator<String> partitionsToExpire = + currentPartitions.headSet(lastRetainPartitionName, false).iterator(); + + while (partitionsToExpire.hasNext()) { + String partitionName = partitionsToExpire.next(); + // drop the partition + try { + zooKeeperClient.deletePartition(tablePath, partitionName); + // only remove when zk success, this reflects to the partitionsByTable + partitionsToExpire.remove(); + LOG.info( + "Auto partitioning deleted partition {} for table [{}].", + partitionName, + tablePath); + } catch (Exception e) { + LOG.error( + "Auto partitioning failed to delete partition {} for table [{}]", + partitionName, + tablePath, + e); + } + } + } + + private void checkNotClosed() { + if (isClosed.get()) { + throw new IllegalStateException("PartitionManager is already closed."); + } + } + + @Override + public void close() throws Exception { + if (isClosed.compareAndSet(false, true)) { + periodicExecutor.shutdownNow(); + } + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/DropTableEvent.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/DropTableEvent.java index ea4496546..aacab5bfa 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/DropTableEvent.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/DropTableEvent.java @@ -23,11 +23,14 @@ public class DropTableEvent implements CoordinatorEvent { private final long tableId; + private final boolean isPartitionTable; + // true if the table is with auto partition enabled private final boolean isAutoPartitionTable; - public DropTableEvent(long tableId, boolean isAutoPartitionTable) { + public DropTableEvent(long tableId, boolean isPartitionTable, boolean isAutoPartitionTable) { this.tableId = tableId; + this.isPartitionTable = isPartitionTable; 
this.isAutoPartitionTable = isAutoPartitionTable; } @@ -35,6 +38,10 @@ public long getTableId() { return tableId; } + public boolean isPartitionTable() { + return isPartitionTable; + } + public boolean isAutoPartitionTable() { return isAutoPartitionTable; } @@ -48,12 +55,14 @@ public boolean equals(Object o) { return false; } DropTableEvent that = (DropTableEvent) o; - return tableId == that.tableId && isAutoPartitionTable == that.isAutoPartitionTable; + return tableId == that.tableId + && isPartitionTable == that.isPartitionTable + && isAutoPartitionTable == that.isAutoPartitionTable; } @Override public int hashCode() { - return Objects.hash(tableId, isAutoPartitionTable); + return Objects.hash(tableId, isPartitionTable, isAutoPartitionTable); } @Override @@ -61,6 +70,8 @@ public String toString() { return "DropTableEvent{" + "tableId=" + tableId + + ", isPartitionTable=" + + isPartitionTable + ", isAutoPartitionTable=" + isAutoPartitionTable + '}'; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcher.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcher.java index e0b13e3ec..a790f494e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcher.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcher.java @@ -141,6 +141,7 @@ public void event(Type type, ChildData oldData, ChildData newData) { eventManager.put( new DropTableEvent( table.tableId, + !table.partitionKeys.isEmpty(), AutoPartitionStrategy.from(table.properties) .isAutoPartitionEnabled())); } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java index 30a8946be..2d4a1e0c7 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java +++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java @@ -21,6 +21,7 @@ import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.fs.token.ObtainedSecurityToken; import com.alibaba.fluss.lakehouse.LakeStorageInfo; +import com.alibaba.fluss.metadata.PartitionSpec; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TablePath; @@ -85,6 +86,7 @@ import com.alibaba.fluss.rpc.messages.PbNotifyLakeTableOffsetReqForBucket; import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrReqForBucket; import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrRespForBucket; +import com.alibaba.fluss.rpc.messages.PbPartitionSpec; import com.alibaba.fluss.rpc.messages.PbPhysicalTablePath; import com.alibaba.fluss.rpc.messages.PbPrefixLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbPrefixLookupRespForBucket; @@ -1259,6 +1261,15 @@ public static GetLatestLakeSnapshotResponse makeGetLatestLakeSnapshotResponse( return getLakeTableSnapshotResponse; } + /** Collects the key/value pairs of the given protobuf specs into one {@link PartitionSpec}. */ + public static PartitionSpec getPartitionSpec(List<PbPartitionSpec> partitionSpecs) { + Map<String, String> partitionFieldSpec = new HashMap<>(); + for (PbPartitionSpec pbPartitionSpec : partitionSpecs) { + partitionFieldSpec.put(pbPartitionSpec.getPartitionKey(), pbPartitionSpec.getValue()); + } + return new PartitionSpec(partitionFieldSpec); + } + private static PbLakeStorageInfo toPbLakeStorageInfo(LakeStorageInfo lakeStorageInfo) { PbLakeStorageInfo pbLakeStorageInfo = new PbLakeStorageInfo(); pbLakeStorageInfo.setLakeStorageType(lakeStorageInfo.getLakeStorage()); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableDescriptorValidation.java index acc0b58ee..b5361af2e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableDescriptorValidation.java +++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableDescriptorValidation.java @@ -36,6 +36,9 @@ import java.util.Optional; import static com.alibaba.fluss.config.FlussConfigUtils.TABLE_OPTIONS; +import static com.alibaba.fluss.utils.PartitionUtils.AUTO_PARTITION_KEY_SUPPORTED_TYPES; +import static com.alibaba.fluss.utils.PartitionUtils.PARTITION_SPEC_SEPARATOR; +import static com.alibaba.fluss.utils.PartitionUtils.STATIC_PARTITION_KEY_SUPPORTED_TYPES; /** Validator of {@link TableDescriptor}. */ public class TableDescriptorValidation { @@ -69,7 +72,7 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor) { checkArrowCompression(tableConf); checkMergeEngine(tableConf, hasPrimaryKey, schema); checkTieredLog(tableConf); - checkAutoPartition(tableConf, tableDescriptor.getPartitionKeys(), schema); + checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema); } private static void checkDistribution(TableDescriptor tableDescriptor) { @@ -170,19 +173,23 @@ private static void checkTieredLog(Configuration tableConf) { } } - private static void checkAutoPartition( + private static void checkPartition( Configuration tableConf, List partitionKeys, RowType rowType) { boolean isPartitioned = !partitionKeys.isEmpty(); AutoPartitionStrategy autoPartition = AutoPartitionStrategy.from(tableConf); - - if (!isPartitioned && autoPartition.isAutoPartitionEnabled()) { - throw new InvalidConfigException( - String.format( - "Currently, auto partition is only supported for partitioned table, please set table property '%s' to false.", - ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key())); - } - if (isPartitioned) { + partitionKeys.forEach( + partitionKey -> { + if (partitionKey.contains(PARTITION_SPEC_SEPARATOR)) { + throw new InvalidTableException( + "Partition key should not contains separator: '" + + PARTITION_SPEC_SEPARATOR + + "'"); + } + }); + + // TODO Currently, we only support one partition key, multi-partition keys will be + // 
supported in next pr. if (partitionKeys.size() > 1) { throw new InvalidTableException( String.format( @@ -190,29 +197,44 @@ private static void checkAutoPartition( partitionKeys)); } - // TODO: support general partitioned table - if (!autoPartition.isAutoPartitionEnabled()) { - throw new InvalidConfigException( - String.format( - "Currently, partitioned table must enable auto partition, please set table property '%s' to true.", - ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key())); - } - - if (autoPartition.timeUnit() == null) { - throw new InvalidConfigException( - String.format( - "Currently, partitioned table must set auto partition time unit when auto partition is enabled, please set table property '%s'.", - ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT.key())); - } + if (autoPartition.isAutoPartitionEnabled()) { + if (autoPartition.timeUnit() == null) { + throw new InvalidTableException( + String.format( + "Currently, auto partitioned table must set auto partition time unit when auto " + + "partition is enabled, please set table property '%s'.", + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT.key())); + } - String partitionKey = partitionKeys.get(0); - int partitionIndex = rowType.getFieldIndex(partitionKey); - DataType partitionDataType = rowType.getTypeAt(partitionIndex); - if (partitionDataType.getTypeRoot() != DataTypeRoot.STRING) { - throw new InvalidTableException( - String.format( - "Currently, auto partition enabled table only supports STRING type partition key, but got partition key '%s' with data type %s.", - partitionKey, partitionDataType)); + for (String partitionKey : partitionKeys) { + int partitionIndex = rowType.getFieldIndex(partitionKey); + DataType partitionDataType = rowType.getTypeAt(partitionIndex); + if (!AUTO_PARTITION_KEY_SUPPORTED_TYPES.contains( + partitionDataType.getTypeRoot())) { + throw new InvalidTableException( + String.format( + "Currently, auto partitioned table supported partition key type are %s, " + + "but got partition key 
'%s' with data type %s.", + AUTO_PARTITION_KEY_SUPPORTED_TYPES, + partitionKey, + partitionDataType)); + } + } + } else { + for (String partitionKey : partitionKeys) { + int partitionIndex = rowType.getFieldIndex(partitionKey); + DataType partitionDataType = rowType.getTypeAt(partitionIndex); + if (!STATIC_PARTITION_KEY_SUPPORTED_TYPES.contains( + partitionDataType.getTypeRoot())) { + throw new InvalidTableException( + String.format( + "Currently, static partitioned table supported partition key type are %s, " + + "but got partition key '%s' with data type %s.", + STATIC_PARTITION_KEY_SUPPORTED_TYPES, + partitionKey, + partitionDataType)); + } + } } } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java index 2bad89be8..60f491e63 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java @@ -161,23 +161,23 @@ static Stream parameters() { @ParameterizedTest @MethodSource("parameters") - void testAddPartitionedTable(TestParams params) throws Exception { + void testAddAutoPartitionedTable(TestParams params) throws Exception { ManualClock clock = new ManualClock(params.startTimeMs); ManuallyTriggeredScheduledExecutorService periodicExecutor = new ManuallyTriggeredScheduledExecutorService(); - AutoPartitionManager autoPartitionManager = - new AutoPartitionManager( - new TestingMetadataCache(3), + PartitionManager partitionManager = + new PartitionManager( zookeeperClient, + new TestingMetadataCache(3), new Configuration(), clock, periodicExecutor); - autoPartitionManager.start(); + partitionManager.start(); - TableInfo table = createPartitionedTable(params.timeUnit); + TableInfo table = createAutoPartitionedTable(params.timeUnit); TablePath tablePath = table.getTablePath(); - 
autoPartitionManager.addAutoPartitionTable(table); + partitionManager.addPartitionTable(table); // the first auto-partition task is a non-periodic task periodicExecutor.triggerNonPeriodicScheduledTask(); @@ -303,7 +303,7 @@ public TestParams build() { // ------------------------------------------------------------------------------------------- - private TableInfo createPartitionedTable(AutoPartitionTimeUnit timeUnit) throws Exception { + private TableInfo createAutoPartitionedTable(AutoPartitionTimeUnit timeUnit) throws Exception { long tableId = 1; TablePath tablePath = TablePath.of("db", "test_partition_" + UUID.randomUUID()); TableDescriptor descriptor = diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 803f66af7..d2a0c42ab 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -116,7 +116,7 @@ class CoordinatorEventProcessorTest { private ServerMetadataCache serverMetadataCache; private TestCoordinatorChannelManager testCoordinatorChannelManager; private CompletedSnapshotStoreManager completedSnapshotStoreManager; - private AutoPartitionManager autoPartitionManager; + private PartitionManager partitionManager; @BeforeAll static void baseBeforeAll() throws Exception { @@ -138,15 +138,15 @@ void beforeEach() { // set a test channel manager for the context testCoordinatorChannelManager = new TestCoordinatorChannelManager(); completedSnapshotStoreManager = new CompletedSnapshotStoreManager(1, 1, zookeeperClient); - autoPartitionManager = - new AutoPartitionManager(serverMetadataCache, zookeeperClient, new Configuration()); + partitionManager = + new PartitionManager(serverMetadataCache, zookeeperClient, new Configuration()); 
eventProcessor = new CoordinatorEventProcessor( zookeeperClient, serverMetadataCache, testCoordinatorChannelManager, completedSnapshotStoreManager, - autoPartitionManager, + partitionManager, TestingMetricGroups.COORDINATOR_METRICS); eventProcessor.startup(); metadataManager.createDatabase( @@ -206,7 +206,7 @@ void testCreateAndDropTable() throws Exception { serverMetadataCache, testCoordinatorChannelManager, completedSnapshotStoreManager, - autoPartitionManager, + partitionManager, TestingMetricGroups.COORDINATOR_METRICS); CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess( testCoordinatorChannelManager, @@ -396,7 +396,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception { serverMetadataCache, testCoordinatorChannelManager, completedSnapshotStoreManager, - autoPartitionManager, + partitionManager, TestingMetricGroups.COORDINATOR_METRICS); CoordinatorContext newCoordinatorContext = eventProcessor.getCoordinatorContext(); @@ -444,7 +444,7 @@ void testRestartTriggerReplicaToOffline() throws Exception { serverMetadataCache, testCoordinatorChannelManager, completedSnapshotStoreManager, - autoPartitionManager, + partitionManager, TestingMetricGroups.COORDINATOR_METRICS); CoordinatorContext coordinatorContext = eventProcessor.getCoordinatorContext(); int failedServer = 0; @@ -630,7 +630,7 @@ void testCreateAndDropPartition() throws Exception { serverMetadataCache, testCoordinatorChannelManager, completedSnapshotStoreManager, - autoPartitionManager, + partitionManager, TestingMetricGroups.COORDINATOR_METRICS); CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess( testCoordinatorChannelManager, @@ -809,12 +809,12 @@ private void verifyReplicaOnlineOrOffline( // otherwise, should be online retry( Duration.ofMinutes(1), - () -> { - assertThat( - coordinatorContext.getReplicaState( - bucketReplica)) - .isEqualTo(OnlineReplica); - }); + () -> + assertThat( + coordinatorContext + .getReplicaState( + bucketReplica)) + 
.isEqualTo(OnlineReplica)); } } }); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java index 216e420c9..9d37f05a0 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java @@ -24,7 +24,6 @@ import com.alibaba.fluss.exception.DatabaseAlreadyExistException; import com.alibaba.fluss.exception.DatabaseNotEmptyException; import com.alibaba.fluss.exception.DatabaseNotExistException; -import com.alibaba.fluss.exception.InvalidConfigException; import com.alibaba.fluss.exception.InvalidDatabaseException; import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.exception.PartitionNotExistException; @@ -50,7 +49,6 @@ import com.alibaba.fluss.server.zk.data.BucketAssignment; import com.alibaba.fluss.server.zk.data.TableAssignment; import com.alibaba.fluss.types.DataTypes; -import com.alibaba.fluss.utils.AutoPartitionUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -90,6 +88,7 @@ import static com.alibaba.fluss.server.utils.RpcMessageUtils.toTablePath; import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry; import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue; +import static com.alibaba.fluss.utils.PartitionUtils.generateAutoPartitionName; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -370,7 +369,8 @@ void testPartitionedTableManagement(AutoPartitionTimeUnit timeUnit) throws Excep Duration.ofMinutes(1), "partition is not created"); // check the created partitions - List expectAddedPartitions = getExpectAddedPartitions(now, timeUnit, 1); + List expectAddedPartitions = + 
getExpectAddedPartitions(tableDescriptor.getPartitionKeys(), now, timeUnit, 1); assertThat(partitions).containsOnlyKeys(expectAddedPartitions); // let's drop the table @@ -392,21 +392,6 @@ void testCreateInvalidPartitionedTable() throws Exception { TablePath tablePath = TablePath.of(db1, tb1); // first create a database adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); - // then create a partitioned table and removes all options - TableDescriptor tableWithoutOptions = - newPartitionedTable().withProperties(Collections.emptyMap()); - assertThatThrownBy( - () -> - adminGateway - .createTable( - newCreateTableRequest( - tablePath, tableWithoutOptions, false)) - .get()) - .cause() - .isInstanceOf(InvalidConfigException.class) - .hasMessageContaining( - "Currently, partitioned table must enable auto partition, " - + "please set table property 'table.auto-partition.enabled' to true."); TableDescriptor tableWithMultiPartKey = newPartitionedTableBuilder(new Schema.Column("tttt", DataTypes.INT())) @@ -436,7 +421,7 @@ void testCreateInvalidPartitionedTable() throws Exception { .cause() .isInstanceOf(InvalidTableException.class) .hasMessageContaining( - "Currently, auto partition enabled table only supports STRING type partition key, but got partition key 'id' with data type INT NOT NULL."); + "Currently, auto partitioned table supported partition key type are [STRING], but got partition key 'id' with data type INT NOT NULL."); } @ParameterizedTest @@ -612,11 +597,14 @@ private AdminGateway getAdminGateway() { } public static List getExpectAddedPartitions( - Instant addInstant, AutoPartitionTimeUnit timeUnit, int newPartitions) { + List partitionKeys, + Instant addInstant, + AutoPartitionTimeUnit timeUnit, + int newPartitions) { ZonedDateTime addDateTime = ZonedDateTime.ofInstant(addInstant, ZoneId.systemDefault()); List partitions = new ArrayList<>(); for (int i = 0; i < newPartitions; i++) { - 
partitions.add(AutoPartitionUtils.getPartitionString(addDateTime, i, timeUnit)); + partitions.add(generateAutoPartitionName(partitionKeys, addDateTime, i, timeUnit)); } return partitions; } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java index ceb200d88..0d1aa0a95 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java @@ -18,6 +18,8 @@ import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.rpc.gateway.CoordinatorGateway; +import com.alibaba.fluss.rpc.messages.AddPartitionRequest; +import com.alibaba.fluss.rpc.messages.AddPartitionResponse; import com.alibaba.fluss.rpc.messages.AdjustIsrRequest; import com.alibaba.fluss.rpc.messages.AdjustIsrResponse; import com.alibaba.fluss.rpc.messages.ApiVersionsRequest; @@ -38,6 +40,8 @@ import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse; import com.alibaba.fluss.rpc.messages.DropDatabaseRequest; import com.alibaba.fluss.rpc.messages.DropDatabaseResponse; +import com.alibaba.fluss.rpc.messages.DropPartitionRequest; +import com.alibaba.fluss.rpc.messages.DropPartitionResponse; import com.alibaba.fluss.rpc.messages.DropTableRequest; import com.alibaba.fluss.rpc.messages.DropTableResponse; import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest; @@ -124,6 +128,16 @@ public CompletableFuture dropTable(DropTableRequest request) throw new UnsupportedOperationException(); } + @Override + public CompletableFuture addPartition(AddPartitionRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture dropPartition(DropPartitionRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture describeLakeStorage( 
DescribeLakeStorageRequest request) { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index 0952fc691..6077ccae0 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -132,7 +132,7 @@ void testTableChanges() { CreateTableEvent createTableEvent = (CreateTableEvent) coordinatorEvent; TableInfo tableInfo = createTableEvent.getTableInfo(); metadataManager.dropTable(tableInfo.getTablePath(), false); - expectedTableEvents.add(new DropTableEvent(tableInfo.getTableId(), false)); + expectedTableEvents.add(new DropTableEvent(tableInfo.getTableId(), false, false)); } // collect all events and check the all events @@ -208,7 +208,7 @@ void testPartitionedTable() throws Exception { expectedEvents.add(new DropPartitionEvent(tableId, 1L)); expectedEvents.add(new DropPartitionEvent(tableId, 2L)); // drop table event - expectedEvents.add(new DropTableEvent(tableId, true)); + expectedEvents.add(new DropTableEvent(tableId, true, true)); retry( Duration.ofMinutes(1), diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index c77b92efd..ed6603dc5 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -22,13 +22,13 @@ import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.rpc.RpcClient; import com.alibaba.fluss.rpc.metrics.TestingClientMetricGroup; -import 
com.alibaba.fluss.server.coordinator.AutoPartitionManager; import com.alibaba.fluss.server.coordinator.CompletedSnapshotStoreManager; import com.alibaba.fluss.server.coordinator.CoordinatorChannelManager; import com.alibaba.fluss.server.coordinator.CoordinatorContext; import com.alibaba.fluss.server.coordinator.CoordinatorEventProcessor; import com.alibaba.fluss.server.coordinator.CoordinatorRequestBatch; import com.alibaba.fluss.server.coordinator.CoordinatorTestUtils; +import com.alibaba.fluss.server.coordinator.PartitionManager; import com.alibaba.fluss.server.coordinator.TestCoordinatorChannelManager; import com.alibaba.fluss.server.coordinator.event.CoordinatorEventManager; import com.alibaba.fluss.server.metadata.ServerMetadataCache; @@ -45,8 +45,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; @@ -63,8 +61,6 @@ /** Test for {@link TableBucketStateMachine}. 
*/ class TableBucketStateMachineTest { - private static final Logger LOG = LoggerFactory.getLogger(TableBucketStateMachineTest.class); - @RegisterExtension public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = new AllCallbackWrapper<>(new ZooKeeperExtension()); @@ -75,7 +71,7 @@ class TableBucketStateMachineTest { private TestCoordinatorChannelManager testCoordinatorChannelManager; private CoordinatorRequestBatch coordinatorRequestBatch; private CompletedSnapshotStoreManager completedSnapshotStoreManager; - private AutoPartitionManager autoPartitionManager; + private PartitionManager partitionManager; @BeforeAll static void baseBeforeAll() { @@ -99,8 +95,8 @@ void beforeEach() { }); serverMetadataCache = new ServerMetadataCacheImpl(); completedSnapshotStoreManager = new CompletedSnapshotStoreManager(1, 1, zookeeperClient); - autoPartitionManager = - new AutoPartitionManager(serverMetadataCache, zookeeperClient, new Configuration()); + partitionManager = + new PartitionManager(serverMetadataCache, zookeeperClient, new Configuration()); } @Test @@ -234,7 +230,7 @@ void testStateChangeToOnline() throws Exception { TestingClientMetricGroup.newInstance())), coordinatorContext, completedSnapshotStoreManager, - autoPartitionManager, + partitionManager, TestingMetricGroups.COORDINATOR_METRICS); CoordinatorEventManager eventManager = new CoordinatorEventManager(coordinatorEventProcessor); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java index e319530be..562aa6585 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java @@ -551,6 +551,10 @@ public Map waitUtilPartitionAllReady(TablePath tablePath) { return waitUntilPartitionsCreated(tablePath, preCreatePartitions); } + public Map 
waitUtilPartitionAllReady(TablePath tablePath, int expectCount) { + return waitUntilPartitionsCreated(tablePath, expectCount); + } + public Map waitUntilPartitionsCreated(TablePath tablePath, int expectCount) { return waitValue( () -> {