Skip to content

Commit

Permalink
Field cleanup should be skippable
Browse files Browse the repository at this point in the history
  • Loading branch information
Paultagoras committed Nov 27, 2024
1 parent ccad169 commit 6dc6699
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 2 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# 1.2.6
* Detect if table schema has changed and refresh the schema
* Allow bypassing field cleanup

# 1.2.5
* Remove Redis state provider since we are using KeeperMap for state storage
* Remove unused avro property from `build.gradle.kts`
* Trim schemaless data to only pass the fields that are in the table
* Allow bypassing the schema validation
* Detect if table schema has changed and refresh the schema

# 1.2.4
* Adjusting underlying client version to 0.7.0
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.2.5
v1.2.6
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class ClickHouseSinkConfig {
public static final String DATE_TIME_FORMAT = "dateTimeFormats";
public static final String TOLERATE_STATE_MISMATCH = "tolerateStateMismatch";
public static final String BYPASS_SCHEMA_VALIDATION = "bypassSchemaValidation";
public static final String BYPASS_FIELD_CLEANUP = "bypassFieldCleanup";

public static final int MILLI_IN_A_SEC = 1000;
private static final String databaseDefault = "default";
Expand Down Expand Up @@ -90,6 +91,7 @@ public class ClickHouseSinkConfig {
private final String clientVersion;
private final boolean tolerateStateMismatch;
private final boolean bypassSchemaValidation;
private final boolean bypassFieldCleanup;

public enum InsertFormats {
NONE,
Expand Down Expand Up @@ -263,6 +265,7 @@ public ClickHouseSinkConfig(Map<String, String> props) {
this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V1");
this.tolerateStateMismatch = Boolean.parseBoolean(props.getOrDefault(TOLERATE_STATE_MISMATCH, "false"));
this.bypassSchemaValidation = Boolean.parseBoolean(props.getOrDefault(BYPASS_SCHEMA_VALIDATION, "false"));
this.bypassFieldCleanup = Boolean.parseBoolean(props.getOrDefault(BYPASS_FIELD_CLEANUP, "false"));

LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);
Expand All @@ -285,6 +288,7 @@ public void addClickHouseSetting(String key, String value, boolean override) {
private static ConfigDef createConfigDef() {
ConfigDef configDef = new ConfigDef();

//TODO: At some point we should group these more clearly
String group = "Connection";
int orderInGroup = 0;
configDef.define(HOSTNAME,
Expand Down Expand Up @@ -568,6 +572,26 @@ private static ConfigDef createConfigDef() {
ConfigDef.Width.SHORT,
"Tolerate state mismatch."
);
configDef.define(BYPASS_SCHEMA_VALIDATION,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"Bypass schema validation. default: false",
group,
++orderInGroup,
ConfigDef.Width.SHORT,
"Bypass schema validation."
);
configDef.define(BYPASS_FIELD_CLEANUP,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"Bypass field cleanup. default: false",
group,
++orderInGroup,
ConfigDef.Width.SHORT,
"Bypass field cleanup."
);
return configDef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,10 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
}

protected Map<String, Object> cleanupExtraFields(Map<String, Object> m, Table t) {
if (csc.isBypassFieldCleanup()) {
return m;
}

Map<String, Object> cleaned = new HashMap<>();
for (Column c : t.getRootColumnsList()) {
if (m.containsKey(c.getName())) {
Expand Down

0 comments on commit 6dc6699

Please sign in to comment.