Skip to content

Commit

Permalink
[server] Support addPartition and dropPartition for static partition …
Browse files Browse the repository at this point in the history
…table
  • Loading branch information
swuferhong committed Feb 10, 2025
1 parent 70229b8 commit 20a217d
Show file tree
Hide file tree
Showing 38 changed files with 1,893 additions and 527 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import com.alibaba.fluss.exception.InvalidTableException;
import com.alibaba.fluss.exception.KvSnapshotNotExistException;
import com.alibaba.fluss.exception.NonPrimaryKeyTableException;
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
import com.alibaba.fluss.exception.PartitionNotExistException;
import com.alibaba.fluss.exception.PartitionSpecInvalidException;
import com.alibaba.fluss.exception.SchemaNotExistException;
import com.alibaba.fluss.exception.TableAlreadyExistException;
import com.alibaba.fluss.exception.TableNotExistException;
Expand All @@ -38,6 +40,7 @@
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.PartitionSpec;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.SchemaInfo;
import com.alibaba.fluss.metadata.TableBucket;
Expand Down Expand Up @@ -328,4 +331,50 @@ ListOffsetsResult listOffsets(

/** Describe the lake used for lakehouse storage. */
CompletableFuture<LakeStorageInfo> describeLakeStorage();

/**
* Add a partition to a partitioned table.
*
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
*
* <ul>
* <li>{@link TableNotExistException} if the table does not exist and {@code ignoreIfExists}
* is false.
* <li>{@link TableNotPartitionedException} if the table is not partitioned.
* <li>{@link PartitionAlreadyExistsException} if the partition already exists and {@code
* ignoreIfExists} is false.
* <li>{@link PartitionSpecInvalidException} if the input partition spec is invalid.
* </ul>
*
* @param tablePath The table path of the table.
* @param partitionSpec The partition spec to add.
* @param ignoreIfExists Flag to specify behavior when a partition with the given name already
* exists: if set to false, throw a PartitionAlreadyExistsException, if set to true, do
* nothing.
*/
CompletableFuture<Void> addPartition(
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists);

/**
* Drop a partition from a partitioned table.
*
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
*
* <ul>
* <li>{@link TableNotExistException} if the table does not exist and {@code ignoreIfExists}
* is false.
* <li>{@link TableNotPartitionedException} if the table is not partitioned.
* <li>{@link PartitionNotExistException} if the partition not exists and {@code
* ignoreIfExists} is false.
* <li>{@link PartitionSpecInvalidException} if the input partition spec is invalid.
* </ul>
*
* @param tablePath The table path of the table.
* @param partitionSpec The partition spec to drop.
* @param ignoreIfNotExists Flag to specify behavior when a partition with the given name does
* not exist: if set to false, throw a PartitionNotExistException, if set to true, do
* nothing.
*/
CompletableFuture<Void> dropPartition(
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.PartitionSpec;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.SchemaInfo;
Expand Down Expand Up @@ -73,6 +74,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeAddPartitionRequest;
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
import static com.alibaba.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -347,6 +350,22 @@ public CompletableFuture<LakeStorageInfo> describeLakeStorage() {
.thenApply(ClientRpcMessageUtils::toLakeStorageInfo);
}

@Override
public CompletableFuture<Void> addPartition(
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists) {
return gateway.addPartition(
makeAddPartitionRequest(tablePath, partitionSpec, ignoreIfExists))
.thenApply(r -> null);
}

@Override
public CompletableFuture<Void> dropPartition(
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) {
return gateway.dropPartition(
makeDropPartitionRequest(tablePath, partitionSpec, ignoreIfNotExists))
.thenApply(r -> null);
}

@Override
public void close() {
// nothing to do yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.DataTypeRoot;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.Preconditions;

import java.util.Collections;
import java.util.List;

import static com.alibaba.fluss.utils.PartitionUtils.getPartitionName;

/** A getter to get partition name from a row. */
public class PartitionGetter {

// TODO currently, only support one partition key.
private final String partitionKeyName;
private final InternalRow.FieldGetter partitionFieldGetter;

public PartitionGetter(RowType rowType, List<String> partitionKeys) {
Expand All @@ -36,30 +40,29 @@ public PartitionGetter(RowType rowType, List<String> partitionKeys) {
"Currently, partitioned table only supports one partition key, but got partition keys %s.",
partitionKeys));
}

// check the partition column
List<String> fieldNames = rowType.getFieldNames();
String partitionColumnName = partitionKeys.get(0);
int partitionColumnIndex = fieldNames.indexOf(partitionColumnName);
this.partitionKeyName = partitionKeys.get(0);
int partitionColumnIndex = fieldNames.indexOf(partitionKeyName);
Preconditions.checkArgument(
partitionColumnIndex >= 0,
"The partition column %s is not in the row %s.",
partitionColumnName,
partitionKeyName,
rowType);

// check the data type of the partition column
DataType partitionColumnDataType = rowType.getTypeAt(partitionColumnIndex);
Preconditions.checkArgument(
partitionColumnDataType.getTypeRoot() == DataTypeRoot.STRING,
"Currently, partitioned table only supports STRING type partition key, but got partition key '%s' with data type %s.",
partitionColumnName,
partitionColumnDataType);
this.partitionFieldGetter =
InternalRow.createFieldGetter(partitionColumnDataType, partitionColumnIndex);
}

public String getPartition(InternalRow row) {
Object partitionValue = partitionFieldGetter.getFieldOrNull(row);
Preconditions.checkNotNull(partitionValue, "Partition value shouldn't be null.");
return partitionValue.toString();
return getPartitionName(
Collections.singletonList(partitionKeyName),
Collections.singletonList(partitionValue.toString()),
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.alibaba.fluss.fs.token.ObtainedSecurityToken;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.PartitionSpec;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TablePath;
Expand All @@ -37,7 +38,9 @@
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
import com.alibaba.fluss.remote.RemoteLogSegment;
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
import com.alibaba.fluss.rpc.messages.AddPartitionRequest;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.DropPartitionRequest;
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
Expand Down Expand Up @@ -377,6 +380,44 @@ public static ListOffsetsRequest makeListOffsetsRequest(
return listOffsetsRequest;
}

public static AddPartitionRequest makeAddPartitionRequest(
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) {
AddPartitionRequest addPartitionRequest =
new AddPartitionRequest().setIgnoreIfNotExists(ignoreIfNotExists);
addPartitionRequest
.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
partitionSpec
.getPartitionSpec()
.forEach(
(partitionKey, value) ->
addPartitionRequest
.addPartitionSpec()
.setPartitionKey(partitionKey)
.setValue(value));
return addPartitionRequest;
}

public static DropPartitionRequest makeDropPartitionRequest(
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) {
DropPartitionRequest dropPartitionRequest =
new DropPartitionRequest().setIgnoreIfNotExists(ignoreIfNotExists);
dropPartitionRequest
.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
partitionSpec
.getPartitionSpec()
.forEach(
(partitionKey, value) ->
dropPartitionRequest
.addPartitionSpec()
.setPartitionKey(partitionKey)
.setValue(value));
return dropPartitionRequest;
}

public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
return response.getPartitionsInfosList().stream()
.map(
Expand Down
Loading

0 comments on commit 20a217d

Please sign in to comment.