Use shard split copy code for blocking shard moves
The new shard copy code that was created for shard splits has some
advantages over the old code: it uses binary copy when possible, which
makes the copy faster. Shard moves that use `block_writes` now go
through this better copy logic.
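
For example, a blocking shard move that now takes this copy path can be
triggered like this (shard ID, hosts, and ports are illustrative):

    SELECT master_move_shard_placement(201,
                                       'localhost', 9701,
                                       'localhost', 9702,
                                       'block_writes');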
JelteF committed Aug 1, 2022
1 parent 0a04b11 commit 1922ddf
Showing 19 changed files with 441 additions and 231 deletions.
64 changes: 57 additions & 7 deletions src/backend/distributed/operations/repair_shards.c
@@ -20,6 +20,7 @@
#include "access/htup_details.h"
#include "catalog/pg_class.h"
#include "catalog/pg_enum.h"
#include "distributed/adaptive_executor.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
@@ -38,6 +39,7 @@
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_split.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
@@ -129,6 +131,7 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval,
                                               int32 sourceNodePort);
static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval,
                                                 List *ddlCommandList);
static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode);


/* declarations for dynamic loading */
@@ -1180,6 +1183,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
                                                        ALLOCSET_DEFAULT_SIZES);
    MemoryContext oldContext = MemoryContextSwitchTo(localContext);

    WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
    WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);

    /* iterate through the colocated shards and copy each */
    ShardInterval *shardInterval = NULL;
    foreach_ptr(shardInterval, shardIntervalList)
@@ -1199,23 +1205,48 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
        char *tableOwner = TableOwner(shardInterval->relationId);
        SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                                                  tableOwner, ddlCommandList);
    }

    int taskId = 0;
    List *copyTaskList = NIL;
    foreach_ptr(shardInterval, shardIntervalList)
    {
        /*
         * Skip copying data for partitioned tables, because they contain no
         * data themselves. Their partitions do contain data, but those are
         * different colocated shards that will be copied separately.
         */
        if (!PartitionedTable(shardInterval->relationId))
        {
            char *copyCommand = CreateShardCopyCommand(
                shardInterval, targetNode);

            Task *copyTask = CreateBasicTask(
                INVALID_JOB_ID,
                taskId,
                READ_TASK,
                copyCommand);

            ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
            SetPlacementNodeMetadata(taskPlacement, sourceNode);

            copyTask->taskPlacementList = list_make1(taskPlacement);

            copyTaskList = lappend(copyTaskList, copyTask);
            taskId++;
        }
    }

    ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
                                      MaxAdaptiveExecutorPoolSize,
                                      NULL /* jobIdList (ignored by API implementation) */);

    foreach_ptr(shardInterval, shardIntervalList)
    {
        List *ddlCommandList =
            PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
                                             sourceNodePort);
        char *tableOwner = TableOwner(shardInterval->relationId);
        SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                                                  tableOwner, ddlCommandList);

@@ -1278,6 +1309,25 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
}


/*
 * CreateShardCopyCommand constructs the command to copy a shard to another
 * worker node. This command needs to be run on the node that you want to
 * copy the shard from.
 */
static char *
CreateShardCopyCommand(ShardInterval *shard,
                       WorkerNode *targetNode)
{
    char *shardName = ConstructQualifiedShardName(shard);
    StringInfo query = makeStringInfo();
    appendStringInfo(query,
                     "SELECT pg_catalog.worker_copy_table_to_node(%s::regclass, %u);",
                     quote_literal_cstr(shardName),
                     targetNode->nodeId);
    return query->data;
}
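
As an illustration, for a hypothetical shard table public.t_102008 being
copied to the node with ID 2 (both values are placeholders), the
constructed command would be:

    SELECT pg_catalog.worker_copy_table_to_node('public.t_102008'::regclass, 2);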


/*
 * CopyPartitionShardsCommandList gets a shardInterval which is a shard that
 * belongs to a partitioned table (this is asserted).
65 changes: 65 additions & 0 deletions src/backend/distributed/operations/worker_copy_table_to_node_udf.c
@@ -0,0 +1,65 @@
/*-------------------------------------------------------------------------
 *
 * worker_copy_table_to_node_udf.c
 *
 * This file implements the worker_copy_table_to_node UDF. This UDF can be
 * used to copy the data in a shard (or other table) from one worker node to
 * another.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_shard_copy.h"

PG_FUNCTION_INFO_V1(worker_copy_table_to_node);

/*
 * worker_copy_table_to_node copies a shard from this worker to another
 * worker.
 *
 * SQL signature:
 *
 * worker_copy_table_to_node(
 *     source_table regclass,
 *     target_node_id integer
 * ) RETURNS VOID
 */
Datum
worker_copy_table_to_node(PG_FUNCTION_ARGS)
{
    Oid relationId = PG_GETARG_OID(0);
    uint32_t targetNodeId = PG_GETARG_INT32(1);

    Oid schemaOid = get_rel_namespace(relationId);
    char *relationSchemaName = get_namespace_name(schemaOid);
    char *relationName = get_rel_name(relationId);
    char *relationQualifiedName = quote_qualified_identifier(
        relationSchemaName,
        relationName);

    EState *executor = CreateExecutorState();
    DestReceiver *destReceiver = CreateShardCopyDestReceiver(
        executor,
        list_make2(relationSchemaName, relationName),
        targetNodeId);

    StringInfo selectShardQueryForCopy = makeStringInfo();
    appendStringInfo(selectShardQueryForCopy,
                     "SELECT * FROM %s;", relationQualifiedName);

    ParamListInfo params = NULL;
    ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,
                                       destReceiver);

    FreeExecutorState(executor);

    PG_RETURN_VOID();
}
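
As a usage sketch with placeholder values, running the following on the
source worker copies all rows of my_table to the worker with node ID 3:

    SELECT pg_catalog.worker_copy_table_to_node('my_table'::regclass, 3);

Internally this executes SELECT * FROM my_table into the shard copy
DestReceiver, which forwards the rows to the target node as a COPY stream;
this is also why the failure tests below now intercept COPY on the target
node.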
1 change: 1 addition & 0 deletions src/backend/distributed/sql/citus--11.0-3--11.1-1.sql
@@ -69,3 +69,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
#include "udfs/get_all_active_transactions/11.1-1.sql"
#include "udfs/citus_split_shard_by_split_points/11.1-1.sql"
#include "udfs/worker_split_copy/11.1-1.sql"
#include "udfs/worker_copy_table_to_node/11.1-1.sql"
src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql
@@ -73,6 +73,10 @@ DROP FUNCTION pg_catalog.worker_split_copy(
splitCopyInfos pg_catalog.split_copy_info[]);
DROP TYPE pg_catalog.split_copy_info;

DROP FUNCTION pg_catalog.worker_copy_table_to_node(
source_table regclass,
target_node_id integer);

DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4,
OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz,
OUT global_pid int8);

src/backend/distributed/sql/udfs/worker_copy_table_to_node/11.1-1.sql (generated file)
@@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_copy_table_to_node(
    source_table regclass,
    target_node_id integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_copy_table_to_node$$;
COMMENT ON FUNCTION pg_catalog.worker_copy_table_to_node(regclass, integer)
    IS 'Perform copy of a shard';
9 changes: 9 additions & 0 deletions src/backend/distributed/utils/reference_table_utils.c
@@ -207,6 +207,15 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
        CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement,
                                            newWorkerNode,
                                            transferMode);

        /*
         * The placement copy command uses distributed execution to copy
         * the shard. This is only allowed when the backend indicates that
         * it is a rebalancer backend, which we do here by setting the
         * application_name before running the command.
         */
        ExecuteCriticalRemoteCommand(connection,
                                     "SET LOCAL application_name TO "
                                     CITUS_REBALANCER_NAME);
        ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data);
        RemoteTransactionCommit(connection);
    }
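
In terms of the commands sent over that connection, the remote transaction
now looks roughly like this (a sketch: the exact placement copy command is
whatever CopyShardPlacementToWorkerNodeQuery generates, and
CITUS_REBALANCER_NAME is the rebalancer's application_name constant):

    BEGIN;
    SET LOCAL application_name TO citus_rebalancer;
    SELECT citus_copy_shard_placement(...);
    COMMIT;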
28 changes: 5 additions & 23 deletions src/test/regress/expected/failure_offline_move_shard_placement.out
@@ -91,8 +91,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t").

SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: canceling statement due to user request
-- failure on blocking COPY operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
mitmproxy
---------------------------------------------------------------------

@@ -101,8 +101,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill(
SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
while executing command on localhost:xxxxx
-- cancellation on blocking COPY operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------

@@ -129,25 +130,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid ||

SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: canceling statement due to user request
-- failure on CopyData operation on source node
SELECT citus.mitmproxy('conn.onCopyData().kill()');
mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
ERROR: could not copy table "t_200" from "localhost:xxxxx"
CONTEXT: while executing command on localhost:xxxxx
-- cancellation on CopyData operation on source node
SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
ERROR: canceling statement due to user request
CALL citus_cleanup_orphaned_shards();
-- Verify that the shard is not moved and the number of rows are still 100k
SELECT citus.mitmproxy('conn.allow()');
3 changes: 2 additions & 1 deletion src/test/regress/expected/multi_extension.out
@@ -1097,10 +1097,11 @@ SELECT * FROM multi_extension.print_extension_changes();
table columnar.stripe |
| function citus_locks() SETOF record
| function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void
| function worker_copy_table_to_node(regclass,integer) void
| function worker_split_copy(bigint,split_copy_info[]) void
| type split_copy_info
| view citus_locks
(27 rows)

DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version