Skip to content

Commit

Permalink
add ci
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jun 21, 2024
1 parent 4a25f01 commit 98b6a19
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 70 deletions.
36 changes: 33 additions & 3 deletions ci/scripts/e2e-clickhouse-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ sleep 1
echo "--- create clickhouse table"
curl https://clickhouse.com/ | sh
sleep 2
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2), v5 decimal64(3))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test_append_only(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2), v5 decimal64(3))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test_upsert1(v1 Int32,v2 Int64,v3 String,ver DateTime64,del UInt8)ENGINE = ReplacingMergeTree(ver, del) PRIMARY KEY (v1);"
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test_upsert2(v1 Int32,v2 Int64,v3 String,del Int8)ENGINE = CollapsingMergeTree(del) PRIMARY KEY (v1);"

echo "--- testing sinks"
echo "--- testing sinks append_only"
sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt'
sleep 5
./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test FORMAT CSV;" > ./query_result.csv
./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test_append_only FORMAT CSV;" > ./query_result.csv


# check sink destination using shell
Expand All @@ -56,5 +58,33 @@ else
exit 1
fi

echo "--- testing sinks upsert1"
./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test_upsert1 FORMAT CSV final;" > ./query_result2.csv

if cat ./query_result2.csv | sort | awk -F "," '{
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "2013-01-02 01:01:02+01:00" && $4 == 0) c2++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "2013-01-03 01:01:02+01:00" && $4 == 0) c3++;
END { exit !(c2 == 1 && c3 == 1); }'; then
echo "Clickhouse sink check passed"
else
echo "The output is not as expected."
cat ./query_result2.csv
exit 1
fi

echo "--- testing sinks upsert2"
./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test_upsert2 FORMAT CSV final;" > ./query_result3.csv

if cat ./query_result2.csv | sort | awk -F "," '{
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $3 == 1) c2++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $3 == 1) c3++; }
END { exit !(c2 == 1 && c3 == 1); }'; then
echo "Clickhouse sink check passed"
else
echo "The output is not as expected."
cat ./query_result3.csv
exit 1
fi

echo "--- Kill cluster"
risedev ci-kill
64 changes: 58 additions & 6 deletions e2e_test/sink/clickhouse_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,83 @@ statement ok
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint, v5 decimal);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
CREATE TABLE t7 (v1 int primary key, v2 bigint, v3 varchar,ver timestamptz);

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4, mv6.v5 as v5 from mv6 WITH (
CREATE TABLE t8 (v1 int primary key, v2 bigint, v3 varchar);


statement ok
CREATE SINK s6 from t6 WITH (
connector = 'clickhouse',
type = 'append-only',
force_append_only='true',
clickhouse.url = 'http://clickhouse-server:8123',
clickhouse.url = 'http://127.0.0.1:8123',
clickhouse.user = 'default',
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test',
clickhouse.table='demo_test_append_only',
);

statement ok
INSERT INTO t6 VALUES (1, 50, '1-50', 1, 1.1), (2, 2, '2-2', 2, 2.2), (3, 2, '3-2', 1, 3.3), (5, 2, '5-2', 2, 4.4), (8, 2, '8-2', 1, 'inf'), (13, 2, '13-2', 2, '-inf'), (21, 2, '21-2', 1, 'nan');


statement ok
CREATE SINK s7 from t7 WITH (
connector = 'clickhouse',
type = 'upsert',
primary_key = 'v1',
clickhouse.url = 'http://127.0.0.1:8123',
clickhouse.user = 'default',
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test_upsert1',
clickhouse.delete.column = 'del'
);

statement ok
INSERT INTO t7 VALUES (1, 50, '1-50', '2013-01-01 01:01:02+01:00'), (2, 2, '2-2', '2013-01-02 01:01:02+01:00'), (3, 2, '3-2','2013-01-03 01:01:02+01:00');

statement ok
delete from t7 where v1 = 1;

statement ok
CREATE SINK s8 from t8 WITH (
connector = 'clickhouse',
type = 'upsert',
primary_key = 'v1',
clickhouse.url = 'http://127.0.0.1:8123',
clickhouse.user = 'default',
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test_upsert2',
clickhouse.delete.column = 'del'
);

statement ok
INSERT INTO t8 VALUES (1, 50, '1-50'), (2, 2, '2-2'), (3, 2, '3-2');

statement ok
delete from t8 where v1 = 1;

statement ok
FLUSH;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;
DROP TABLE t6;

statement ok
DROP SINK s7;

statement ok
DROP TABLE t7;

statement ok
DROP SINK s8;

statement ok
DROP TABLE t6;
DROP TABLE t8;
133 changes: 72 additions & 61 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,6 @@ impl ClickHouseEngine {
}
}

pub fn get_delete_col(&self) -> Option<String> {
match self {
ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
Some(delete_col.to_string())
}
_ => None,
}
}

pub fn get_sign_name(&self) -> Option<String> {
match self {
ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
Expand All @@ -126,6 +116,10 @@ impl ClickHouseEngine {
ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
Some(sign_name.to_string())
}
ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
Some(delete_col.to_string())
}
_ => None,
}
}
Expand All @@ -144,30 +138,42 @@ impl ClickHouseEngine {
"AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
// VersionedCollapsingMergeTree(sign_name,"a")
"VersionedCollapsingMergeTree" => {
let sign_name = engine_name
.create_table_query
.split("VersionedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(',')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.trim()
.to_string();
let sign_name = match config.common.delete_column.clone() {
Some(sign) => sign,
None => {
warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems");
engine_name
.create_table_query
.split("VersionedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(',')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.trim()
.to_string()
}
};
Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
}
// CollapsingMergeTree(sign_name)
"CollapsingMergeTree" => {
let sign_name = engine_name
.create_table_query
.split("CollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.trim()
.to_string();
let sign_name = match config.common.delete_column.clone() {
Some(sign) => sign,
None => {
warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems");
engine_name
.create_table_query
.split("CollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.trim()
.to_string()
}
};
Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
}
"GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
Expand All @@ -184,34 +190,46 @@ impl ClickHouseEngine {
}
// ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c")
"ReplicatedVersionedCollapsingMergeTree" => {
let sign_name = engine_name
.create_table_query
.split("ReplicatedVersionedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(',')
.rev()
.nth(1)
.ok_or_else(|| SinkError::ClickHouse("must have index 1".to_string()))?
.trim()
.to_string();
let sign_name = match config.common.delete_column.clone() {
Some(sign) => sign,
None => {
warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems");
engine_name
.create_table_query
.split("ReplicatedVersionedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(',')
.rev()
.nth(1)
.ok_or_else(|| SinkError::ClickHouse("must have index 1".to_string()))?
.trim()
.to_string()
}
};
Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
}
// ReplicatedCollapsingMergeTree("a","b",sign_name)
"ReplicatedCollapsingMergeTree" => {
let sign_name = engine_name
.create_table_query
.split("ReplicatedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.split(',')
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.trim()
.to_string();
let sign_name = match config.common.delete_column.clone() {
Some(sign) => sign,
None => {
warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems");
engine_name
.create_table_query
.split("ReplicatedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.split(',')
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.trim()
.to_string()
}
};
Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
}
"ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
Expand Down Expand Up @@ -508,9 +526,6 @@ impl ClickHouseSinkWriter {
if let Some(sign) = clickhouse_engine.get_sign_name() {
rw_fields_name_after_calibration.push(sign);
}
if let Some(delete_col) = clickhouse_engine.get_delete_col() {
rw_fields_name_after_calibration.push(delete_col);
}
Ok(Self {
config,
schema,
Expand Down Expand Up @@ -715,10 +730,6 @@ async fn query_column_engine_from_ck(
clickhouse_column.retain(|a| sign.ne(&a.name))
}

if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
clickhouse_column.retain(|a| delete_col.ne(&a.name))
}

Ok((clickhouse_column, clickhouse_engine))
}

Expand Down

0 comments on commit 98b6a19

Please sign in to comment.