diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 26928fd3af2..6f5443ba304 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -20,6 +20,7 @@ #include "access/htup_details.h" #include "catalog/pg_class.h" #include "catalog/pg_enum.h" +#include "distributed/adaptive_executor.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" @@ -38,6 +39,7 @@ #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/shard_rebalancer.h" +#include "distributed/shard_split.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" @@ -129,6 +131,7 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval, int32 sourceNodePort); static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList); +static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode); /* declarations for dynamic loading */ @@ -1180,6 +1183,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + /* iterate through the colocated shards and copy each */ ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, shardIntervalList) @@ -1199,9 +1205,12 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, char *tableOwner = TableOwner(shardInterval->relationId); SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); + } - ddlCommandList = NIL; - + int taskId = 0; + List *copyTaskList = NIL; + foreach_ptr(shardInterval, shardIntervalList) + { /* * Skip copying data for partitioned tables, because they contain no * data themselves. 
Their partitions do contain data, but those are @@ -1209,13 +1218,35 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, */ if (!PartitionedTable(shardInterval->relationId)) { - ddlCommandList = CopyShardContentsCommandList(shardInterval, sourceNodeName, - sourceNodePort); + char *copyCommand = CreateShardCopyCommand( + shardInterval, targetNode); + + Task *copyTask = CreateBasicTask( + INVALID_JOB_ID, + taskId, + READ_TASK, + copyCommand); + + ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(taskPlacement, sourceNode); + + copyTask->taskPlacementList = list_make1(taskPlacement); + + copyTaskList = lappend(copyTaskList, copyTask); + taskId++; } - ddlCommandList = list_concat( - ddlCommandList, + } + + ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList, + MaxAdaptiveExecutorPoolSize, + NULL /* jobIdList (ignored by API implementation) */); + + foreach_ptr(shardInterval, shardIntervalList) + { + List *ddlCommandList = PostLoadShardCreationCommandList(shardInterval, sourceNodeName, - sourceNodePort)); + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); @@ -1278,6 +1309,25 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, } +/* + * CreateShardCopyCommand constructs the command to copy a shard to another + * worker node. This command needs to be run on the node where you want to copy + * the shard from. + */ +static char * +CreateShardCopyCommand(ShardInterval *shard, + WorkerNode *targetNode) +{ + char *shardName = ConstructQualifiedShardName(shard); + StringInfo query = makeStringInfo(); + appendStringInfo(query, + "SELECT pg_catalog.worker_copy_table_to_node(%s::regclass, %u);", + quote_literal_cstr(shardName), + targetNode->nodeId); + return query->data; +} + + /* * CopyPartitionShardsCommandList gets a shardInterval which is a shard that * belongs to partitioned table (this is asserted). diff --git a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c new file mode 100644 index 00000000000..46391160c57 --- /dev/null +++ b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c @@ -0,0 +1,65 @@ +/*------------------------------------------------------------------------- + * + * worker_copy_table_to_node_udf.c + * + * This file implements the worker_copy_table_to_node UDF. This UDF can be + * used to copy the data in a shard (or other table) from one worker node to + * another. + * + * Copyright (c) Citus Data, Inc.
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" +#include "distributed/worker_shard_copy.h" + +PG_FUNCTION_INFO_V1(worker_copy_table_to_node); + +/* + * worker_copy_table_to_node copies a shard from this worker to another worker + * + * SQL signature: + * + * worker_copy_table_to_node( + * source_table regclass, + * target_node_id integer + * ) RETURNS VOID + */ +Datum +worker_copy_table_to_node(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + uint32_t targetNodeId = PG_GETARG_INT32(1); + + Oid schemaOid = get_rel_namespace(relationId); + char *relationSchemaName = get_namespace_name(schemaOid); + char *relationName = get_rel_name(relationId); + char *relationQualifiedName = quote_qualified_identifier( + relationSchemaName, + relationName); + + EState *executor = CreateExecutorState(); + DestReceiver *destReceiver = CreateShardCopyDestReceiver( + executor, + list_make2(relationSchemaName, relationName), + targetNodeId); + + StringInfo selectShardQueryForCopy = makeStringInfo(); + appendStringInfo(selectShardQueryForCopy, + "SELECT * FROM %s;", relationQualifiedName); + + ParamListInfo params = NULL; + ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, + destReceiver); + + FreeExecutorState(executor); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql index f8b9563788b..e71f9362bbb 100644 --- a/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql @@ -69,3 +69,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ #include "udfs/get_all_active_transactions/11.1-1.sql" #include "udfs/citus_split_shard_by_split_points/11.1-1.sql" #include "udfs/worker_split_copy/11.1-1.sql" +#include "udfs/worker_copy_table_to_node/11.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql index 26430a9f683..7261a31db2b 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql @@ -73,6 +73,10 @@ DROP FUNCTION pg_catalog.worker_split_copy( splitCopyInfos pg_catalog.split_copy_info[]); DROP TYPE pg_catalog.split_copy_info; +DROP FUNCTION pg_catalog.worker_copy_table_to_node( + source_table regclass, + target_node_id integer); + DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8); diff --git a/src/backend/distributed/sql/udfs/worker_copy_table_to_node/11.1-1.sql b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/11.1-1.sql new file mode 100644 index 00000000000..ebe093dee33 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/11.1-1.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.worker_copy_table_to_node( + source_table regclass, + target_node_id integer) +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_copy_table_to_node$$; +COMMENT ON FUNCTION pg_catalog.worker_copy_table_to_node(regclass, integer) + IS 'Perform copy of a 
shard'; diff --git a/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql new file mode 100644 index 00000000000..ebe093dee33 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.worker_copy_table_to_node( + source_table regclass, + target_node_id integer) +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_copy_table_to_node$$; +COMMENT ON FUNCTION pg_catalog.worker_copy_table_to_node(regclass, integer) + IS 'Perform copy of a shard'; diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index e271d0ceba1..e0cab96d69d 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -207,6 +207,15 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement, newWorkerNode, transferMode); + + /* + * The placement copy command uses distributed execution to copy + * the shard. This is allowed when indicating that the backend is a + * rebalancer backend. + */ + ExecuteCriticalRemoteCommand(connection, + "SET LOCAL application_name TO " + CITUS_REBALANCER_NAME); ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data); RemoteTransactionCommit(connection); } diff --git a/src/test/regress/expected/failure_offline_move_shard_placement.out b/src/test/regress/expected/failure_offline_move_shard_placement.out index a6ecee18e59..bdd45449bdc 100644 --- a/src/test/regress/expected/failure_offline_move_shard_placement.out +++ b/src/test/regress/expected/failure_offline_move_shard_placement.out @@ -91,8 +91,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t"). 
SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); ERROR: canceling statement due to user request --- failure on blocking append_table_to_shard operation on target node -SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill()'); +-- failure on blocking COPY operation on target node +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); mitmproxy --------------------------------------------------------------------- @@ -101,8 +101,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill( SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx --- cancellation on blocking append_table_to_shard operation on target node -SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").cancel(' || :pid || ')'); +while executing command on localhost:xxxxx +-- cancellation on blocking COPY operation on target node +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -129,25 +130,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid || SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); ERROR: canceling statement due to user request --- failure on CopyData operation on source node -SELECT citus.mitmproxy('conn.onCopyData().kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes'); -ERROR: could not copy table "t_200" from "localhost:xxxxx" -CONTEXT: while executing command on localhost:xxxxx --- cancellation on CopyData operation on source node -SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes'); -ERROR: canceling statement due to user request CALL citus_cleanup_orphaned_shards(); -- Verify that the shard is not moved and the number of rows are still 100k SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index c2624894cf0..0067bdbadad 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1097,10 +1097,11 @@ SELECT * FROM multi_extension.print_extension_changes(); table columnar.stripe | | function citus_locks() SETOF record | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void + | function worker_copy_table_to_node(regclass,integer) void | function worker_split_copy(bigint,split_copy_info[]) void | type split_copy_info | view citus_locks -(26 rows) +(27 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index b9d928acbc2..aab94dad101 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -416,18 +416,31 
@@ SELECT unnest(shard_placement_replication_array( 2 )); ERROR: could not find a target for shard xxxxx +SET client_min_messages TO WARNING; +set citus.shard_count = 4; +-- Create a distributed table with all shards on a single node, so that we can +-- use this as an under-replicated table +SET citus.shard_replication_factor TO 1; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); + master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE replication_test_table(int_column int); +SELECT create_distributed_table('replication_test_table', 'int_column'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'replication_test_table'::regclass; +INSERT INTO replication_test_table SELECT * FROM generate_series(1, 100); -- Ensure that shard_replication_factor is 2 during replicate_table_shards -- and rebalance_table_shards tests SET citus.shard_replication_factor TO 2; --- Turn off NOTICE messages -SET client_min_messages TO WARNING; --- Create a single-row test data for shard rebalancer test shards -CREATE TABLE shard_rebalancer_test_data AS SELECT 1::int as int_column; --- Test replicate_table_shards, which will in turn test update_shard_placement --- in copy mode. -CREATE TABLE replication_test_table(int_column int); -SELECT master_create_distributed_table('replication_test_table', 'int_column', 'append'); - master_create_distributed_table +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); + master_set_node_property --------------------------------------------------------------------- (1 row) @@ -438,37 +451,14 @@ CREATE VIEW replication_test_table_placements_per_node AS SELECT count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard AND shardstate != 4 GROUP BY nodename, nodeport ORDER BY nodename, nodeport; -WARNING: "view replication_test_table_placements_per_node" has dependency to "table replication_test_table" that is not in Citus' metadata -DETAIL: "view replication_test_table_placements_per_node" will be created only locally -HINT: Distribute "table replication_test_table" first to distribute "view replication_test_table_placements_per_node" --- Create four shards with replication factor 2, and delete the placements --- with smaller port number to simulate under-replicated shards.
-SELECT count(master_create_empty_shard('replication_test_table')) - FROM generate_series(1, 4); - count ---------------------------------------------------------------------- - 4 -(1 row) - -DELETE FROM pg_dist_shard_placement WHERE placementid in ( - SELECT pg_dist_shard_placement.placementid - FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard - WHERE logicalrelid = 'replication_test_table'::regclass - AND (nodename, nodeport) = (SELECT nodename, nodeport FROM pg_dist_shard_placement - ORDER BY nodename, nodeport limit 1) -); --- Upload the test data to the shards -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) --- Verify that there is one node with all placements SELECT * FROM replication_test_table_placements_per_node; count --------------------------------------------------------------------- 4 (1 row) +-- Test replicate_table_shards, which will in turn test update_shard_placement +-- in copy mode. -- Check excluded_shard_list by excluding three shards with smaller ids SELECT replicate_table_shards('replication_test_table', excluded_shard_list := excluded_shard_list, @@ -540,19 +530,22 @@ SELECT * FROM replication_test_table_placements_per_node; SELECT count(*) FROM replication_test_table; count --------------------------------------------------------------------- - 4 + 100 (1 row) DROP TABLE public.replication_test_table CASCADE; -- Test rebalance_table_shards, which will in turn test update_shard_placement -- in move mode. 
+SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 6; CREATE TABLE rebalance_test_table(int_column int); -SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'append'); - master_create_distributed_table +SELECT create_distributed_table('rebalance_test_table', 'int_column'); + create_distributed_table --------------------------------------------------------------------- (1 row) +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'rebalance_test_table'::regclass; CREATE VIEW table_placements_per_node AS SELECT nodeport, logicalrelid::regclass, count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard @@ -566,9 +559,6 @@ LANGUAGE SQL AS $$ SET citus.shard_replication_factor TO 1; - SELECT count(master_create_empty_shard(rel)) - FROM generate_series(1, 6); - SELECT count(master_move_shard_placement(shardid, src.nodename, src.nodeport::int, dst.nodename, dst.nodeport::int, @@ -582,12 +572,7 @@ $$; CALL create_unbalanced_shards('rebalance_test_table'); SET citus.shard_replication_factor TO 2; -- Upload the test data to the shards -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx) +INSERT INTO rebalance_test_table SELECT * FROM generate_series(1, 100); -- Verify that there is one node with all placements SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count @@ -772,7 +757,7 @@ SELECT * FROM table_placements_per_node; SELECT count(*) FROM rebalance_test_table; count --------------------------------------------------------------------- - 6 + 100 (1 row) DROP TABLE rebalance_test_table; @@ -863,21 +848,39 @@ INSERT INTO test_schema_support.imbalanced_table_local VALUES(4); CREATE TABLE test_schema_support.imbalanced_table ( id integer not null ); -SELECT master_create_distributed_table('test_schema_support.imbalanced_table', 'id', 'append'); - master_create_distributed_table +SET citus.shard_count = 3; +SET citus.shard_replication_factor TO 1; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); + master_set_node_property --------------------------------------------------------------------- (1 row) -SET citus.shard_replication_factor TO 1; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); +SELECT create_distributed_table('test_schema_support.imbalanced_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test_schema_support.imbalanced_table SELECT * FROM generate_series(1, 100); +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'test_schema_support.imbalanced_table'::regclass; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); + master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SET citus.shard_count = 4; +-- copy one of the shards to 
the other node, this is to test that the +-- rebalancer takes into account all copies of a placement SET citus.shard_replication_factor TO 2; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); +SELECT replicate_table_shards('test_schema_support.imbalanced_table', max_shard_copies := 1, shard_transfer_mode := 'block_writes'); + replicate_table_shards +--------------------------------------------------------------------- + +(1 row) + SET citus.shard_replication_factor TO 1; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); -- imbalanced_table is now imbalanced -- Shard counts in each node before rebalance SELECT * FROM public.table_placements_per_node; @@ -891,7 +894,7 @@ SELECT * FROM public.table_placements_per_node; SELECT COUNT(*) FROM imbalanced_table; count --------------------------------------------------------------------- - 12 + 100 (1 row) -- Test rebalance operation @@ -915,13 +918,13 @@ SELECT * FROM public.table_placements_per_node; SELECT COUNT(*) FROM imbalanced_table; count --------------------------------------------------------------------- - 12 + 100 (1 row) -DROP TABLE public.shard_rebalancer_test_data; DROP TABLE test_schema_support.imbalanced_table; DROP TABLE test_schema_support.imbalanced_table_local; SET citus.shard_replication_factor TO 1; +SET citus.shard_count = 4; CREATE TABLE colocated_rebalance_test(id integer); CREATE TABLE colocated_rebalance_test2(id integer); SELECT create_distributed_table('colocated_rebalance_test', 'id'); @@ -1073,14 +1076,14 @@ CALL citus_cleanup_orphaned_shards(); select * from pg_dist_placement ORDER BY placementid; placementid | shardid | shardstate | shardlength | groupid --------------------------------------------------------------------- - 150 | 123023 | 1 | 0 | 14 - 153 | 123024 | 1 | 0 | 14 - 156 | 123027 | 1 | 0 | 14 - 157 | 123028 | 1 | 0 | 14 - 158 | 123021 | 1 | 0 | 16 - 159 | 123025 | 1 | 0 | 16 - 160 | 123022 | 1 | 0 | 16 - 161 | 123026 | 1 | 0 | 16 + 146 | 123023 | 1 | 0 | 14 + 149 | 123024 | 1 | 0 | 14 + 152 | 123027 | 1 | 0 | 14 + 153 | 123028 | 1 | 0 | 14 + 154 | 123021 | 1 | 0 | 16 + 155 | 123025 | 1 | 0 | 16 + 156 | 123022 | 1 | 0 | 16 + 157 | 123026 | 1 | 0 | 16 (8 rows) -- Move all shards to worker1 again @@ -2123,8 +2126,7 @@ SET citus.shard_replication_factor TO 2; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); ERROR: Table 'dist_table_test_3' is streaming replicated. 
Shards of streaming replicated tables cannot be copied -- Mark table as coordinator replicated in order to be able to test replicate_table_shards -UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN - ('dist_table_test_3'::regclass); +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'dist_table_test_3'::regclass; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); replicate_table_shards --------------------------------------------------------------------- diff --git a/src/test/regress/expected/tableam.out b/src/test/regress/expected/tableam.out index e211e2bf17f..242cb53100a 100644 --- a/src/test/regress/expected/tableam.out +++ b/src/test/regress/expected/tableam.out @@ -5,7 +5,7 @@ SET citus.shard_count TO 4; create schema test_tableam; set search_path to test_tableam; SELECT public.run_command_on_coordinator_and_workers($Q$ - SET citus.enable_ddl_propagation TO off; + SET citus.enable_ddl_propagation TO off; CREATE FUNCTION fake_am_handler(internal) RETURNS table_am_handler AS 'citus' @@ -26,7 +26,7 @@ ALTER EXTENSION citus ADD ACCESS METHOD fake_am; create table test_hash_dist(id int, val int) using fake_am; insert into test_hash_dist values (1, 1); WARNING: fake_tuple_insert -select create_distributed_table('test_hash_dist','id'); +select create_distributed_table('test_hash_dist','id', colocate_with := 'none'); WARNING: fake_scan_getnextslot CONTEXT: SQL statement "SELECT TRUE FROM test_tableam.test_hash_dist LIMIT 1" WARNING: fake_scan_getnextslot @@ -168,16 +168,20 @@ SELECT * FROM master_get_table_ddl_events('test_range_dist'); -- select a.shardid, a.nodeport FROM pg_dist_shard b, pg_dist_shard_placement a -WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid ORDER BY a.shardid, nodeport; shardid | nodeport --------------------------------------------------------------------- - 60005 | 57637 - 60006 | 57638 -(2 rows) + 60000 | 57637 + 60001 | 57638 + 60002 | 57637 + 60003 | 57638 +(4 rows) +-- Change repmodel to allow master_copy_shard_placement +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'test_hash_dist'::regclass; SELECT master_copy_shard_placement( - get_shard_id_for_distribution_column('test_range_dist', '1'), + get_shard_id_for_distribution_column('test_hash_dist', '1'), 'localhost', :worker_1_port, 'localhost', :worker_2_port, do_repair := false, @@ -189,55 +193,42 @@ SELECT master_copy_shard_placement( select a.shardid, a.nodeport FROM pg_dist_shard b, pg_dist_shard_placement a -WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid ORDER BY a.shardid, nodeport; shardid | nodeport --------------------------------------------------------------------- - 60005 | 57637 - 60005 | 57638 - 60006 | 57638 -(3 rows) + 60000 | 57637 + 60000 | 57638 + 60001 | 57638 + 60002 | 57637 + 60003 | 57638 +(5 rows) -- verify that data was copied correctly \c - - - :worker_1_port -select * from test_tableam.test_range_dist_60005 ORDER BY id; -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot +select * from test_tableam.test_hash_dist_60000 ORDER BY id; WARNING: fake_scan_getnextslot WARNING: fake_scan_getnextslot WARNING: fake_scan_getnextslot id | val 
--------------------------------------------------------------------- - 0 | 0 1 | 1 - 1 | -1 - 2 | 4 - 3 | 9 - 7 | 9 -(6 rows) + 1 | 1 +(2 rows) \c - - - :worker_2_port -select * from test_tableam.test_range_dist_60005 ORDER BY id; -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot -WARNING: fake_scan_getnextslot +select * from test_tableam.test_hash_dist_60000 ORDER BY id; WARNING: fake_scan_getnextslot WARNING: fake_scan_getnextslot WARNING: fake_scan_getnextslot id | val --------------------------------------------------------------------- - 0 | 0 1 | 1 - 1 | -1 - 2 | 4 - 3 | 9 - 7 | 9 -(6 rows) + 1 | 1 +(2 rows) \c - - - :master_port +set search_path to test_tableam; -- -- Test that partitioned tables work correctly with a fake_am table -- @@ -254,15 +245,15 @@ SELECT create_distributed_table('test_partitioned', 'id'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p1$$) +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_partitioned_p1$$) WARNING: fake_scan_getnextslot -CONTEXT: SQL statement "SELECT TRUE FROM public.test_partitioned_p2 LIMIT 1" +CONTEXT: SQL statement "SELECT TRUE FROM test_tableam.test_partitioned_p2 LIMIT 1" WARNING: fake_scan_getnextslot NOTICE: Copying data from local table... WARNING: fake_scan_getnextslot NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p2$$) +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_partitioned_p2$$) create_distributed_table --------------------------------------------------------------------- diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 0271d4a7743..ecf0e0e4da0 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -206,6 +206,7 @@ ORDER BY 1; function worker_apply_shard_ddl_command(bigint,text) function worker_apply_shard_ddl_command(bigint,text,text) function worker_change_sequence_dependency(regclass,regclass,regclass) + function worker_copy_table_to_node(regclass,integer) function worker_create_or_alter_role(text,text,text) function worker_create_or_replace_object(text) function worker_create_or_replace_object(text[]) @@ -263,5 +264,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(255 rows) +(256 rows) diff --git a/src/test/regress/expected/worker_copy_table_to_node.out b/src/test/regress/expected/worker_copy_table_to_node.out new file mode 100644 index 00000000000..76f440189d0 --- /dev/null +++ b/src/test/regress/expected/worker_copy_table_to_node.out @@ -0,0 +1,81 @@ +CREATE SCHEMA worker_copy_table_to_node; +SET search_path TO worker_copy_table_to_node; +SET citus.shard_count TO 1; -- single shard table for ease of testing +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 62629600; +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE 
nodeport=:worker_2_port \gset +CREATE TABLE t(a int); +INSERT INTO t SELECT generate_series(1, 100); +CREATE TABLE ref(a int); +INSERT INTO ref SELECT generate_series(1, 100); +select create_distributed_table('t', 'a'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$worker_copy_table_to_node.t$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_reference_table('ref'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$worker_copy_table_to_node.ref$$) + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_2_port +SET search_path TO worker_copy_table_to_node; +-- Create empty shard on worker 2 too +CREATE TABLE t_62629600(a int); +\c - - - :worker_1_port +SET search_path TO worker_copy_table_to_node; +-- Make sure that the UDF doesn't work on Citus tables +SELECT worker_copy_table_to_node('t', :worker_1_node); +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +SELECT worker_copy_table_to_node('ref', :worker_1_node); +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. 
+-- It should work on shards +SELECT worker_copy_table_to_node('t_62629600', :worker_1_node); + worker_copy_table_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM t; + count +--------------------------------------------------------------------- + 200 +(1 row) + +SELECT count(*) FROM t_62629600; + count +--------------------------------------------------------------------- + 200 +(1 row) + +SELECT worker_copy_table_to_node('t_62629600', :worker_2_node); + worker_copy_table_to_node +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_2_port +SET search_path TO worker_copy_table_to_node; +SELECT count(*) FROM t_62629600; + count +--------------------------------------------------------------------- + 200 +(1 row) + +\c - - - :master_port +SET search_path TO worker_copy_table_to_node; +SET client_min_messages TO WARNING; +DROP SCHEMA worker_copy_table_to_node CASCADE; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index f9136008f2e..58cfc87c835 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -58,7 +58,8 @@ test: cte_inline recursive_view_local_table values sequences_with_different_type test: pg13 pg12 # run pg14 sequentially as it syncs metadata test: pg14 -test: tableam drop_column_partitioned_table +test: drop_column_partitioned_table +test: tableam # ---------- # Miscellaneous tests to check our query planning behavior diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule index 2692f212fc2..353eabfcdb4 100644 --- a/src/test/regress/operations_schedule +++ b/src/test/regress/operations_schedule @@ -3,6 +3,7 @@ test: multi_cluster_management test: multi_test_catalog_views test: shard_rebalancer_unit test: shard_rebalancer +test: worker_copy_table_to_node test: foreign_key_to_reference_shard_rebalance test: multi_move_mx test: shard_move_deferred_delete diff --git a/src/test/regress/sql/failure_offline_move_shard_placement.sql b/src/test/regress/sql/failure_offline_move_shard_placement.sql index 81683398b9f..1b02da1e9f4 100644 --- a/src/test/regress/sql/failure_offline_move_shard_placement.sql +++ b/src/test/regress/sql/failure_offline_move_shard_placement.sql @@ -57,12 +57,12 @@ SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t").cancel(' || :pid || ')'); SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); --- failure on blocking append_table_to_shard operation on target node -SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill()'); +-- failure on blocking COPY operation on target node +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); --- cancellation on blocking append_table_to_shard operation on target node -SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").cancel(' || :pid || ')'); +-- cancellation on blocking COPY operation on target node +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')'); SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); -- failure on adding constraints on target node @@ -73,14 +73,6 @@ SELECT 
master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid || ')'); SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes'); --- failure on CopyData operation on source node -SELECT citus.mitmproxy('conn.onCopyData().kill()'); -SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes'); - --- cancellation on CopyData operation on source node -SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')'); -SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes'); - CALL citus_cleanup_orphaned_shards(); -- Verify that the shard is not moved and the number of rows are still 100k diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index b16356a4a96..0d482998bac 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -291,24 +291,22 @@ SELECT unnest(shard_placement_replication_array( 2 )); --- Ensure that shard_replication_factor is 2 during replicate_table_shards --- and rebalance_table_shards tests - -SET citus.shard_replication_factor TO 2; - --- Turn off NOTICE messages - SET client_min_messages TO WARNING; --- Create a single-row test data for shard rebalancer test shards - -CREATE TABLE shard_rebalancer_test_data AS SELECT 1::int as int_column; - --- Test replicate_table_shards, which will in turn test update_shard_placement --- in copy mode. - +set citus.shard_count = 4; +-- Create a distributed table with all shards on a single node, so that we can +-- use this as an under-replicated table +SET citus.shard_replication_factor TO 1; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); CREATE TABLE replication_test_table(int_column int); -SELECT master_create_distributed_table('replication_test_table', 'int_column', 'append'); +SELECT create_distributed_table('replication_test_table', 'int_column'); +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'replication_test_table'::regclass; +INSERT INTO replication_test_table SELECT * FROM generate_series(1, 100); + +-- Ensure that shard_replication_factor is 2 during replicate_table_shards +-- and rebalance_table_shards tests +SET citus.shard_replication_factor TO 2; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); CREATE VIEW replication_test_table_placements_per_node AS SELECT count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard @@ -317,31 +315,12 @@ CREATE VIEW replication_test_table_placements_per_node AS GROUP BY nodename, nodeport ORDER BY nodename, nodeport; --- Create four shards with replication factor 2, and delete the placements --- with smaller port number to simulate under-replicated shards.
- -SELECT count(master_create_empty_shard('replication_test_table')) - FROM generate_series(1, 4); - -DELETE FROM pg_dist_shard_placement WHERE placementid in ( - SELECT pg_dist_shard_placement.placementid - FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard - WHERE logicalrelid = 'replication_test_table'::regclass - AND (nodename, nodeport) = (SELECT nodename, nodeport FROM pg_dist_shard_placement - ORDER BY nodename, nodeport limit 1) -); - --- Upload the test data to the shards - -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123000) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123001) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123002) -\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123003) - --- Verify that there is one node with all placements SELECT * FROM replication_test_table_placements_per_node; +-- Test replicate_table_shards, which will in turn test update_shard_placement +-- in copy mode. + -- Check excluded_shard_list by excluding three shards with smaller ids SELECT replicate_table_shards('replication_test_table', @@ -386,8 +365,11 @@ DROP TABLE public.replication_test_table CASCADE; -- Test rebalance_table_shards, which will in turn test update_shard_placement -- in move mode. +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 6; CREATE TABLE rebalance_test_table(int_column int); -SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'append'); +SELECT create_distributed_table('rebalance_test_table', 'int_column'); +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'rebalance_test_table'::regclass; CREATE VIEW table_placements_per_node AS SELECT nodeport, logicalrelid::regclass, count(*) @@ -404,9 +386,6 @@ LANGUAGE SQL AS $$ SET citus.shard_replication_factor TO 1; - SELECT count(master_create_empty_shard(rel)) - FROM generate_series(1, 6); - SELECT count(master_move_shard_placement(shardid, src.nodename, src.nodeport::int, dst.nodename, dst.nodeport::int, @@ -424,12 +403,7 @@ SET citus.shard_replication_factor TO 2; -- Upload the test data to the shards -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123004) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123005) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123006) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123007) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123008) -\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123009) +INSERT INTO rebalance_test_table SELECT * FROM generate_series(1, 100); -- Verify that there is one node with all placements @@ -604,34 +578,20 @@ CREATE TABLE test_schema_support.imbalanced_table ( id integer not null ); -SELECT master_create_distributed_table('test_schema_support.imbalanced_table', 'id', 'append'); - +SET citus.shard_count = 3; SET citus.shard_replication_factor TO 1; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); -1 -2 -3 -4 -\. 
+SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); +SELECT create_distributed_table('test_schema_support.imbalanced_table', 'id'); +INSERT INTO test_schema_support.imbalanced_table SELECT * FROM generate_series(1, 100); +UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'test_schema_support.imbalanced_table'::regclass; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); +SET citus.shard_count = 4; +-- copy one of the shards to the other node, this is to test that the +-- rebalancer takes into account all copies of a placement SET citus.shard_replication_factor TO 2; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); -1 -2 -3 -4 -\. - +SELECT replicate_table_shards('test_schema_support.imbalanced_table', max_shard_copies := 1, shard_transfer_mode := 'block_writes'); SET citus.shard_replication_factor TO 1; -SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset -COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid); -1 -2 -3 -4 -\. -- imbalanced_table is now imbalanced @@ -652,11 +612,11 @@ SELECT * FROM public.table_placements_per_node; -- Row count in imbalanced table after rebalance SELECT COUNT(*) FROM imbalanced_table; -DROP TABLE public.shard_rebalancer_test_data; DROP TABLE test_schema_support.imbalanced_table; DROP TABLE test_schema_support.imbalanced_table_local; SET citus.shard_replication_factor TO 1; +SET citus.shard_count = 4; CREATE TABLE colocated_rebalance_test(id integer); CREATE TABLE colocated_rebalance_test2(id integer); @@ -1276,8 +1236,7 @@ SET citus.shard_replication_factor TO 2; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); -- Mark table as coordinator replicated in order to be able to test replicate_table_shards -UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN - ('dist_table_test_3'::regclass); +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'dist_table_test_3'::regclass; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); diff --git a/src/test/regress/sql/tableam.sql b/src/test/regress/sql/tableam.sql index 47845492a17..f0ed5cfca0f 100644 --- a/src/test/regress/sql/tableam.sql +++ b/src/test/regress/sql/tableam.sql @@ -26,7 +26,7 @@ ALTER EXTENSION citus ADD ACCESS METHOD fake_am; create table test_hash_dist(id int, val int) using fake_am; insert into test_hash_dist values (1, 1); -select create_distributed_table('test_hash_dist','id'); +select create_distributed_table('test_hash_dist','id', colocate_with := 'none'); select * from test_hash_dist; insert into test_hash_dist values (1, 1); @@ -86,11 +86,14 @@ SELECT * FROM master_get_table_ddl_events('test_range_dist'); select a.shardid, a.nodeport FROM pg_dist_shard b, pg_dist_shard_placement a -WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid ORDER BY a.shardid, nodeport; +-- Change repmodel to allow master_copy_shard_placement +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'test_hash_dist'::regclass; + SELECT master_copy_shard_placement( - get_shard_id_for_distribution_column('test_range_dist', '1'), + 
get_shard_id_for_distribution_column('test_hash_dist', '1'), 'localhost', :worker_1_port, 'localhost', :worker_2_port, do_repair := false, @@ -98,19 +101,21 @@ SELECT master_copy_shard_placement( select a.shardid, a.nodeport FROM pg_dist_shard b, pg_dist_shard_placement a -WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +WHERE a.shardid=b.shardid AND logicalrelid = 'test_hash_dist'::regclass::oid ORDER BY a.shardid, nodeport; -- verify that data was copied correctly \c - - - :worker_1_port -select * from test_tableam.test_range_dist_60005 ORDER BY id; +select * from test_tableam.test_hash_dist_60000 ORDER BY id; \c - - - :worker_2_port -select * from test_tableam.test_range_dist_60005 ORDER BY id; +select * from test_tableam.test_hash_dist_60000 ORDER BY id; \c - - - :master_port +set search_path to test_tableam; + -- -- Test that partitioned tables work correctly with a fake_am table -- diff --git a/src/test/regress/sql/worker_copy_table_to_node.sql b/src/test/regress/sql/worker_copy_table_to_node.sql new file mode 100644 index 00000000000..fa0703a257b --- /dev/null +++ b/src/test/regress/sql/worker_copy_table_to_node.sql @@ -0,0 +1,49 @@ +CREATE SCHEMA worker_copy_table_to_node; +SET search_path TO worker_copy_table_to_node; +SET citus.shard_count TO 1; -- single shard table for ease of testing +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 62629600; + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +CREATE TABLE t(a int); +INSERT INTO t SELECT generate_series(1, 100); + +CREATE TABLE ref(a int); +INSERT INTO ref SELECT generate_series(1, 100); + +select create_distributed_table('t', 'a'); +select create_reference_table('ref'); + +\c - - - :worker_2_port +SET search_path TO worker_copy_table_to_node; + +-- Create empty shard on worker 2 too +CREATE TABLE t_62629600(a int); + +\c - - - :worker_1_port +SET search_path TO worker_copy_table_to_node; + +-- Make sure that the UDF doesn't work on Citus tables +SELECT worker_copy_table_to_node('t', :worker_1_node); +SELECT worker_copy_table_to_node('ref', :worker_1_node); + +-- It should work on shards +SELECT worker_copy_table_to_node('t_62629600', :worker_1_node); + +SELECT count(*) FROM t; +SELECT count(*) FROM t_62629600; + +SELECT worker_copy_table_to_node('t_62629600', :worker_2_node); + +\c - - - :worker_2_port +SET search_path TO worker_copy_table_to_node; + +SELECT count(*) FROM t_62629600; + +\c - - - :master_port +SET search_path TO worker_copy_table_to_node; + +SET client_min_messages TO WARNING; +DROP SCHEMA worker_copy_table_to_node CASCADE;
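
Usage sketch, following the worker_copy_table_to_node regression test added above (the schema, the shard table name t_62629600, and the worker_2_node psql variable are the names that test uses, not fixed API names): the UDF is run on the worker that currently holds the source table, and it is given the target's nodeid from pg_dist_node.

    SET search_path TO worker_copy_table_to_node;
    SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
    -- copy all rows of the local shard table to the node with that nodeid
    SELECT worker_copy_table_to_node('t_62629600', :worker_2_node);

During a blocking shard move, CopyShardTablesViaBlockWrites builds the equivalent worker_copy_table_to_node() call per shard in CreateShardCopyCommand and executes it on the source node through the adaptive executor.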