Skip to content

Commit

Permalink
feat(table): support partition & upsert config
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx committed Jan 2, 2025
1 parent fc1ea5f commit e0e1d7b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,17 @@ public class TopicConfig {

public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable";
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " +
"You can not disable this config once it is enabled. It will be provided in future versions.";
"You can not disable this config once it is enabled. It will be provided in future versions.";

public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms";
public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " +
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";

public static final String LOCAL_LOG_RETENTION_BYTES_CONFIG = "local.retention.bytes";
public static final String LOCAL_LOG_RETENTION_BYTES_DOC = "The maximum size of local log segments that can grow for a partition before it " +
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";

public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain";
public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete";
Expand All @@ -103,16 +103,16 @@ public class TopicConfig {
"selected then all data in remote will be kept post-disablement and will only be deleted when it breaches expiration " +
"thresholds. If %s is selected then the data will be made inaccessible immediately by advancing the log start offset and will be " +
"deleted asynchronously.", REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE,
REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE);
REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE);

public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
public static final String MAX_MESSAGE_BYTES_DOC =
"The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";

public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " +
Expand Down Expand Up @@ -266,6 +266,15 @@ public class TopicConfig {
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables."
+ "ex. [region, name]";
public static final String TABLE_TOPIC_PARTITION_BY_CONFIG = "automq.table.topic.partition.by";
public static final String TABLE_TOPIC_PARTITION_BY_DOC = "The partition fields of the table. ex. [bucket(name), month(timestamp)]";
public static final String TABLE_TOPIC_UPSERT_ENABLE_CONFIG = "automq.table.topic.upsert.enable";
public static final String TABLE_TOPIC_UPSERT_ENABLE_DOC = "The configuration controls whether enable table topic upsert";
public static final String TABLE_TOPIC_CDC_FIELD_CONFIG = "automq.table.topic.cdc.field";
public static final String TABLE_TOPIC_CDC_FIELD_DOC = "The name of the field containing the CDC operation, I, U, or D";
// AutoMQ inject end

}
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,11 @@ public Optional<String> serverConfigName(String configName) {
.define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
.define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
.define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
// TODO: add validator
.define(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_ID_COLUMNS_DOC)
.define(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_PARTITION_BY_DOC)
.define(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_DOC)
.define(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_CDC_FIELD_DOC)
// AutoMQ inject end
.define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE),
Expand Down Expand Up @@ -393,6 +398,10 @@ public Optional<String> serverConfigName(String configName) {
public final long tableTopicCommitInterval;
public final String tableTopicNamespace;
public final TableTopicSchemaType tableTopicSchemaType;
public final String tableTopicIdColumns;
public final String tableTopicPartitionBy;
public final boolean tableTopicUpsertEnable;
public final String tableTopicCdcField;
// AutoMQ inject end

private final int maxMessageSize;
Expand Down Expand Up @@ -450,6 +459,10 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
this.tableTopicIdColumns = getString(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG);
this.tableTopicPartitionBy = getString(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG);
this.tableTopicUpsertEnable = getBoolean(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG);
this.tableTopicCdcField = getString(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG);
// AutoMQ inject end

remoteLogConfig = new RemoteLogConfig(this);
Expand Down

0 comments on commit e0e1d7b

Please sign in to comment.