Skip to content

Commit

Permalink
Merge pull request ClickHouse#70075 from ClickHouse/fix_dddl_test
Browse files Browse the repository at this point in the history
Fix `test_distributed_ddl`
  • Loading branch information
alexey-milovidov authored Sep 28, 2024
2 parents 6126310 + cb9bf8a commit b2aa78e
Showing 1 changed file with 35 additions and 29 deletions.
64 changes: 35 additions & 29 deletions tests/integration/test_distributed_ddl/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ def test_cluster(request):
yield cluster

instance = cluster.instances["ch1"]
cluster.ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
cluster.ddl_check_query(
instance, "DROP DATABASE IF EXISTS test ON CLUSTER 'cluster'"
)
cluster.ddl_check_query(
instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'"
)
Expand All @@ -49,7 +51,7 @@ def test_default_database(test_cluster):
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null",
"CREATE TABLE IF NOT EXISTS null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null",
settings={"distributed_ddl_entry_format_version": 2},
)

Expand All @@ -75,14 +77,15 @@ def test_create_view(test_cluster):
)
test_cluster.ddl_check_query(
instance,
"CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV",
"CREATE VIEW IF NOT EXISTS test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV",
)
test_cluster.ddl_check_query(
instance,
"CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER 'cluster' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV",
)
test_cluster.ddl_check_query(
instance, "DROP TABLE test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV"
instance,
"DROP TABLE IF EXISTS test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV",
)
test_cluster.ddl_check_query(
instance,
Expand All @@ -91,14 +94,14 @@ def test_create_view(test_cluster):

test_cluster.ddl_check_query(
instance,
"CREATE TABLE test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory",
"CREATE TABLE IF NOT EXISTS test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory",
)
test_cluster.ddl_check_query(
instance,
"RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER 'cluster' FORMAT TSV",
)
test_cluster.ddl_check_query(
instance, "DROP TABLE test.super_simple2 ON CLUSTER 'cluster'"
instance, "DROP TABLE IF EXISTS test.super_simple2 ON CLUSTER 'cluster'"
)


Expand Down Expand Up @@ -131,7 +134,7 @@ def test_on_server_fail(test_cluster):
assert TSV(contents) == TSV("ch1\nch2\nch3\nch4\n")

test_cluster.ddl_check_query(
instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'"
instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'"
)


Expand Down Expand Up @@ -211,20 +214,22 @@ def test_simple_alters(test_cluster):
"".join(["{}\t{}\n".format(x, x) for x in range(4)])
)

test_cluster.ddl_check_query(instance, "DROP TABLE merge ON CLUSTER '{cluster}'")
test_cluster.ddl_check_query(
instance, "DROP TABLE all_merge_32 ON CLUSTER '{cluster}'"
instance, "DROP TABLE IF EXISTS merge ON CLUSTER '{cluster}'"
)
test_cluster.ddl_check_query(
instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER '{cluster}'"
)
test_cluster.ddl_check_query(
instance, "DROP TABLE all_merge_64 ON CLUSTER '{cluster}'"
instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER '{cluster}'"
)


def test_macro(test_cluster):
instance = test_cluster.instances["ch2"]
test_cluster.ddl_check_query(
instance,
"CREATE TABLE tab ON CLUSTER '{cluster}' (value UInt8) ENGINE = Memory",
"CREATE TABLE IF NOT EXISTS tab ON CLUSTER '{cluster}' (value UInt8) ENGINE = Memory",
)

for i in range(4):
Expand All @@ -235,7 +240,7 @@ def test_macro(test_cluster):

test_cluster.ddl_check_query(
instance,
"CREATE TABLE distr ON CLUSTER '{cluster}' (value UInt8) ENGINE = Distributed('{cluster}', 'default', 'tab', value % 4)",
"CREATE TABLE IF NOT EXISTS distr ON CLUSTER '{cluster}' (value UInt8) ENGINE = Distributed('{cluster}', 'default', 'tab', value % 4)",
)

assert TSV(instance.query("SELECT value FROM distr ORDER BY value")) == TSV(
Expand Down Expand Up @@ -351,7 +356,7 @@ def test_optimize_query(test_cluster):
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_optimize ON CLUSTER cluster (p Date, i Int32) ENGINE = MergeTree(p, p, 8192)",
"CREATE TABLE IF NOT EXISTS test_optimize ON CLUSTER cluster (p Date, i Int32) ENGINE = MergeTree(p, p, 8192)",
)
test_cluster.ddl_check_query(
instance, "OPTIMIZE TABLE test_optimize ON CLUSTER cluster FORMAT TSV"
Expand All @@ -362,7 +367,7 @@ def test_create_as_select(test_cluster):
instance = test_cluster.instances["ch2"]
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_as_select ON CLUSTER cluster ENGINE = Memory AS (SELECT 1 AS x UNION ALL SELECT 2 AS x)",
"CREATE TABLE IF NOT EXISTS test_as_select ON CLUSTER cluster ENGINE = Memory AS (SELECT 1 AS x UNION ALL SELECT 2 AS x)",
)
assert TSV(instance.query("SELECT x FROM test_as_select ORDER BY x")) == TSV(
"1\n2\n"
Expand All @@ -376,11 +381,11 @@ def test_create_reserved(test_cluster):
instance = test_cluster.instances["ch2"]
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_reserved ON CLUSTER cluster (`p` Date, `image` Nullable(String), `index` Nullable(Float64), `invalidate` Nullable(Int64)) ENGINE = MergeTree(`p`, `p`, 8192)",
"CREATE TABLE IF NOT EXISTS test_reserved ON CLUSTER cluster (`p` Date, `image` Nullable(String), `index` Nullable(Float64), `invalidate` Nullable(Int64)) ENGINE = MergeTree(`p`, `p`, 8192)",
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_as_reserved ON CLUSTER cluster ENGINE = Memory AS (SELECT * from test_reserved)",
"CREATE TABLE IF NOT EXISTS test_as_reserved ON CLUSTER cluster ENGINE = Memory AS (SELECT * from test_reserved)",
)
test_cluster.ddl_check_query(
instance, "DROP TABLE IF EXISTS test_reserved ON CLUSTER cluster"
Expand Down Expand Up @@ -408,11 +413,11 @@ def test_rename(test_cluster):

test_cluster.ddl_check_query(
instance,
"CREATE TABLE rename_shard ON CLUSTER cluster (id Int64, sid String DEFAULT concat('old', toString(id))) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/staging/test_shard', '{replica}') ORDER BY (id)",
"CREATE TABLE IF NOT EXISTS rename_shard ON CLUSTER cluster (id Int64, sid String DEFAULT concat('old', toString(id))) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/staging/test_shard', '{replica}') ORDER BY (id)",
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE rename_new ON CLUSTER cluster AS rename_shard ENGINE = Distributed(cluster, default, rename_shard, id % 2)",
"CREATE TABLE IF NOT EXISTS rename_new ON CLUSTER cluster AS rename_shard ENGINE = Distributed(cluster, default, rename_shard, id % 2)",
)
test_cluster.ddl_check_query(
instance, "RENAME TABLE rename_new TO rename ON CLUSTER cluster;"
Expand All @@ -434,7 +439,7 @@ def test_rename(test_cluster):

test_cluster.ddl_check_query(
instance,
"CREATE TABLE rename_new ON CLUSTER cluster AS rename_shard ENGINE = Distributed(cluster, default, rename_shard, id % 2)",
"CREATE TABLE IF NOT EXISTS rename_new ON CLUSTER cluster AS rename_shard ENGINE = Distributed(cluster, default, rename_shard, id % 2)",
)

instance.query("system stop distributed sends rename")
Expand Down Expand Up @@ -494,7 +499,8 @@ def test_replicated_without_arguments(test_cluster):
)

test_cluster.ddl_check_query(
instance, "CREATE DATABASE test_atomic ON CLUSTER cluster ENGINE=Atomic"
instance,
"CREATE DATABASE IF NOT EXISTS test_atomic ON CLUSTER cluster ENGINE=Atomic",
)
assert (
"are supported only for ON CLUSTER queries with Atomic database engine"
Expand All @@ -504,14 +510,14 @@ def test_replicated_without_arguments(test_cluster):
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree() ORDER BY n",
"CREATE TABLE IF NOT EXISTS test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree() ORDER BY n",
)
test_cluster.ddl_check_query(
instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster SYNC"
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_atomic.rmt UUID '12345678-0000-4000-8000-000000000001' ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n",
"CREATE TABLE IF NOT EXISTS test_atomic.rmt UUID '12345678-0000-4000-8000-000000000001' ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n",
)
assert (
instance.query("SHOW CREATE test_atomic.rmt FORMAT TSVRaw")
Expand All @@ -523,7 +529,7 @@ def test_replicated_without_arguments(test_cluster):
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') ORDER BY n",
"CREATE TABLE IF NOT EXISTS test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') ORDER BY n",
)
test_cluster.ddl_check_query(
instance,
Expand All @@ -543,23 +549,23 @@ def test_replicated_without_arguments(test_cluster):
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_atomic.rrmt ON CLUSTER cluster (n UInt64, m UInt64) ENGINE=ReplicatedReplacingMergeTree(m) ORDER BY n",
"CREATE TABLE IF NOT EXISTS test_atomic.rrmt ON CLUSTER cluster (n UInt64, m UInt64) ENGINE=ReplicatedReplacingMergeTree(m) ORDER BY n",
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_atomic.rsmt ON CLUSTER cluster (n UInt64, m UInt64, k UInt64) ENGINE=ReplicatedSummingMergeTree((m, k)) ORDER BY n",
"CREATE TABLE IF NOT EXISTS test_atomic.rsmt ON CLUSTER cluster (n UInt64, m UInt64, k UInt64) ENGINE=ReplicatedSummingMergeTree((m, k)) ORDER BY n",
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_atomic.rvcmt ON CLUSTER cluster (n UInt64, m Int8, k UInt64) ENGINE=ReplicatedVersionedCollapsingMergeTree(m, k) ORDER BY n",
"CREATE TABLE IF NOT EXISTS test_atomic.rvcmt ON CLUSTER cluster (n UInt64, m Int8, k UInt64) ENGINE=ReplicatedVersionedCollapsingMergeTree(m, k) ORDER BY n",
)
test_cluster.ddl_check_query(
instance, "DROP DATABASE test_atomic ON CLUSTER cluster SYNC"
)

test_cluster.ddl_check_query(
instance,
"CREATE DATABASE test_ordinary ON CLUSTER cluster ENGINE=Ordinary",
"CREATE DATABASE IF NOT EXISTS test_ordinary ON CLUSTER cluster ENGINE=Ordinary",
settings={"allow_deprecated_database_ordinary": 1},
)
assert (
Expand All @@ -576,14 +582,14 @@ def test_replicated_without_arguments(test_cluster):
)
test_cluster.ddl_check_query(
instance,
"CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{table}/', '{replica}') ORDER BY n",
"CREATE TABLE IF NOT EXISTS test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{table}/', '{replica}') ORDER BY n",
)
assert (
instance.query("SHOW CREATE test_ordinary.rmt FORMAT TSVRaw")
== "CREATE TABLE test_ordinary.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree('/{shard}/rmt/', '{replica}')\nORDER BY n\nSETTINGS index_granularity = 8192\n"
)
test_cluster.ddl_check_query(
instance, "DROP DATABASE test_ordinary ON CLUSTER cluster SYNC"
instance, "DROP DATABASE IF EXISTS test_ordinary ON CLUSTER cluster SYNC"
)
test_cluster.pm_random_drops.push_rules(rules)

Expand Down

0 comments on commit b2aa78e

Please sign in to comment.