Skip to content

Commit

Permalink
added option to insert to random shard (#20336)
Browse files Browse the repository at this point in the history
* added option to insert to random shard

* added changelog and updated cue file

* fix spelling

* fix fragment type on changelog

* changelog suggestion

* added authors

* fix missing new line (please work already)

* fix missing parameters for testing
  • Loading branch information
rguleryuz authored Apr 22, 2024
1 parent f391fe2 commit 69f985d
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `clickhouse` sink now has a new configuration option, `insert_random_shard`, to tell Clickhouse to insert into a random shard (by setting `insert_distributed_one_random_shard`). See the Clickhouse [Distributed Table Engine docs](https://clickhouse.com/docs/en/engines/table-engines/special/distributed) for details.

authors: rguleryuz
5 changes: 5 additions & 0 deletions src/sinks/clickhouse/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ pub struct ClickhouseConfig {
#[serde(default)]
pub date_time_best_effort: bool,

/// Sets `insert_distributed_one_random_shard`, allowing ClickHouse to insert data into a random shard when using Distributed Table Engine.
#[serde(default)]
pub insert_random_shard: bool,

#[configurable(derived)]
#[serde(default = "Compression::gzip_default")]
pub compression: Compression,
Expand Down Expand Up @@ -129,6 +133,7 @@ impl SinkConfig for ClickhouseConfig {
endpoint: endpoint.clone(),
skip_unknown_fields: self.skip_unknown_fields,
date_time_best_effort: self.date_time_best_effort,
insert_random_shard: self.insert_random_shard,
compression: self.compression,
};

Expand Down
10 changes: 10 additions & 0 deletions src/sinks/clickhouse/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub(super) struct ClickhouseServiceRequestBuilder {
pub(super) endpoint: Uri,
pub(super) skip_unknown_fields: bool,
pub(super) date_time_best_effort: bool,
pub(super) insert_random_shard: bool,
pub(super) compression: Compression,
}

Expand All @@ -82,6 +83,7 @@ impl HttpServiceRequestBuilder<PartitionKey> for ClickhouseServiceRequestBuilder
metadata.format,
self.skip_unknown_fields,
self.date_time_best_effort,
self.insert_random_shard,
)?;

let auth: Option<Auth> = self.auth.clone();
Expand Down Expand Up @@ -112,6 +114,7 @@ fn set_uri_query(
format: Format,
skip_unknown: bool,
date_time_best_effort: bool,
insert_random_shard: bool,
) -> crate::Result<Uri> {
let query = url::form_urlencoded::Serializer::new(String::new())
.append_pair(
Expand All @@ -138,6 +141,9 @@ fn set_uri_query(
if date_time_best_effort {
uri.push_str("date_time_input_format=best_effort&")
}
if insert_random_shard {
uri.push_str("insert_distributed_one_random_shard=1&")
}
uri.push_str(query.as_str());

uri.parse::<Uri>()
Expand All @@ -158,6 +164,7 @@ mod tests {
Format::JsonEachRow,
false,
true,
false,
)
.unwrap();
assert_eq!(uri.to_string(), "http://localhost:80/?\
Expand All @@ -172,6 +179,7 @@ mod tests {
Format::JsonEachRow,
false,
false,
false,
)
.unwrap();
assert_eq!(uri.to_string(), "http://localhost:80/?\
Expand All @@ -185,6 +193,7 @@ mod tests {
Format::JsonAsObject,
true,
true,
false,
)
.unwrap();
assert_eq!(uri.to_string(), "http://localhost:80/?\
Expand All @@ -203,6 +212,7 @@ mod tests {
Format::JsonEachRow,
false,
false,
false,
)
.unwrap_err();
}
Expand Down
5 changes: 5 additions & 0 deletions website/cue/reference/components/sinks/base/clickhouse.cue
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ base: components: sinks: clickhouse: configuration: {
}
}
}
insert_random_shard: {
description: "Sets `insert_distributed_one_random_shard`, allowing ClickHouse to insert data into a random shard when using Distributed Table Engine."
required: false
type: bool: default: false
}
request: {
description: """
Middleware settings for outbound requests.
Expand Down

0 comments on commit 69f985d

Please sign in to comment.