diff --git a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c index c603de72af0..43a405f1c17 100644 --- a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c +++ b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c @@ -40,6 +40,15 @@ worker_copy_table_to_node(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); uint32_t targetNodeId = PG_GETARG_INT32(1); + if (IsCitusTable(relationId)) + { + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table %s is a Citus table, only copies of " + "shards or regular postgres tables are supported", + qualifiedRelationName))); + } + Oid schemaOid = get_rel_namespace(relationId); char *relationSchemaName = get_namespace_name(schemaOid); char *relationName = get_rel_name(relationId); diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index f99c9b537f7..a891b2ee24d 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -83,6 +83,8 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); +static StringInfo ConstructShardTruncateStatement( + List *destinationShardFullyQualifiedName); static bool @@ -108,12 +110,35 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - - RemoteTransactionBeginIfNecessary(copyDest->connection); + /* Begin the remote transaction */ + RemoteTransactionBegin(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection); + /* Handle TRUNCATE or any setup commands */ + StringInfo truncateStatement = ConstructShardTruncateStatement( + copyDest->destinationShardFullyQualifiedName); + + if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } + PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); + if (!IsResponseOK(truncateResult)) + { + ReportResultError(copyDest->connection, truncateResult, ERROR); + PQclear(truncateResult); + ForgetResults(copyDest->connection); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } + PQclear(truncateResult); + ForgetResults(copyDest->connection); + + /* Construct and send the COPY statement with FREEZE */ StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary, @@ -122,16 +147,18 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) { ReportConnectionError(copyDest->connection, ERROR); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); } - PGresult *result = GetRemoteCommandResult(copyDest->connection, - true /* raiseInterrupts */); - if (PQresultStatus(result) != PGRES_COPY_IN) + PGresult *copyResult = GetRemoteCommandResult(copyDest->connection, + true /* raiseInterrupts */); + if (PQresultStatus(copyResult) != PGRES_COPY_IN) { - ReportResultError(copyDest->connection, result, ERROR); + ReportResultError(copyDest->connection, copyResult, ERROR); } - PQclear(result); + PQclear(copyResult); } @@ -329,7 +356,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) /* check whether there were any COPY errors */ PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */); - if (PQresultStatus(result) != PGRES_COMMAND_OK) + if (!IsResponseOK(result)) { ReportCopyError(copyDest->connection, result); } @@ -339,6 +366,21 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) ResetReplicationOriginRemoteSession(copyDest->connection); + /* End the transaction by sending a COMMIT command */ + if (!SendRemoteCommand(copyDest->connection, "COMMIT")) + { + HandleRemoteTransactionConnectionError(copyDest->connection, true); + } + + PGresult *commitResult = GetRemoteCommandResult(copyDest->connection, true); + if (!IsResponseOK(result)) + { + ereport(ERROR, (errcode(ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN), + errmsg("Failed to commit transaction"))); + } + + PQclear(commitResult); + CloseConnection(copyDest->connection); } } @@ -424,6 +466,23 @@ CopyableColumnNamesFromRelationName(const char *schemaName, const char *relation } +/* + * ConstructShardTruncateStatement constructs the text of a TRUNCATE statement + * for the destination shard. + */ +static StringInfo +ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName) +{ + StringInfo command = makeStringInfo(); + + appendStringInfo(command, "TRUNCATE %s.%s;", + quote_identifier(linitial(destinationShardFullyQualifiedName)), + quote_identifier(lsecond(destinationShardFullyQualifiedName))); + + return command; +} + + /* * ConstructShardCopyStatement constructs the text of a COPY statement * for copying into a result table @@ -436,22 +495,22 @@ ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); - StringInfo command = makeStringInfo(); const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", - quote_identifier(destinationShardSchemaName), quote_identifier( - destinationShardRelationName), columnList); + quote_identifier(destinationShardSchemaName), + quote_identifier(destinationShardRelationName), + columnList); if (useBinaryFormat) { - appendStringInfo(command, " WITH (format binary);"); + appendStringInfo(command, " WITH (format binary, FREEZE);"); } else { - appendStringInfo(command, ";"); + appendStringInfo(command, " WITH (FREEZE);"); } return command;