From d32e7263ae7de7aedecdee11b62430d3b1eabbc9 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Mon, 9 Sep 2024 15:16:39 +0000 Subject: [PATCH 1/8] add freeze, start and end transaction --- .../operations/worker_shard_copy.c | 197 ++++++++++++------ 1 file changed, 131 insertions(+), 66 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index f99c9b537f7..24f50e1d0d4 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -83,6 +83,8 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); +static List *CreateCopyOptions(bool isBinaryCopy); +static StringInfo ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName); static bool @@ -108,12 +110,26 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - - RemoteTransactionBeginIfNecessary(copyDest->connection); + StartRemoteTransactionBegin(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection); + // Construct and send the TRUNCATE statement to the remote node + StringInfo truncateStatement = ConstructShardTruncateStatement(copyDest->destinationShardFullyQualifiedName); + + if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); + } + + PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); + if (PQresultStatus(truncateResult) != PGRES_COMMAND_OK) + { + ReportResultError(copyDest->connection, truncateResult, ERROR); + } + PQclear(truncateResult); + // Construct the COPY command and send it to the remote node StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary, @@ -329,7 +345,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) /* check whether there were any COPY errors */ PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */); - if (PQresultStatus(result) != PGRES_COMMAND_OK) + if (!IsResponseOK(result)) { ReportCopyError(copyDest->connection, result); } @@ -339,6 +355,21 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) ResetReplicationOriginRemoteSession(copyDest->connection); + /* End the transaction by sending a COMMIT command */ + if (!SendRemoteCommand(copyDest->connection, "COMMIT")) + { + HandleRemoteTransactionConnectionError(copyDest->connection, true); + } + + PGresult *commitResult = GetRemoteCommandResult(copyDest->connection, true); + if (!IsResponseOK(result)) + { + ereport(ERROR, (errcode(ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN), + errmsg("Failed to commit transaction"))); + } + + PQclear(commitResult); + CloseConnection(copyDest->connection); } } @@ -424,37 +455,54 @@ CopyableColumnNamesFromRelationName(const char *schemaName, const char *relation } +/* + * ConstructShardTruncateStatement constructs the text of a TRUNCATE statement + * for the destination shard. + */ +static StringInfo +ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName) +{ + StringInfo command = makeStringInfo(); + + appendStringInfo(command, "TRUNCATE %s.%s;", + quote_identifier(linitial(destinationShardFullyQualifiedName)), + quote_identifier(lsecond(destinationShardFullyQualifiedName))); + + return command; +} + + /* * ConstructShardCopyStatement constructs the text of a COPY statement * for copying into a result table */ static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat, - TupleDesc tupleDesc) + useBinaryFormat, + TupleDesc tupleDesc) { - char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); - + char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); - StringInfo command = makeStringInfo(); + StringInfo command = makeStringInfo(); - const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); + const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); - appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", - quote_identifier(destinationShardSchemaName), quote_identifier( - destinationShardRelationName), columnList); + appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", + quote_identifier(destinationShardSchemaName), + quote_identifier(destinationShardRelationName), + columnList); - if (useBinaryFormat) - { - appendStringInfo(command, " WITH (format binary);"); - } - else - { - appendStringInfo(command, ";"); - } + if (useBinaryFormat) + { + appendStringInfo(command, " WITH (format binary, FREEZE);"); + } + else + { + appendStringInfo(command, " WITH (FREEZE);"); + } - return command; + return command; } @@ -488,61 +536,78 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) } +/* + * CreateCopyOptions creates the options list for the COPY command. + */ +static List * +CreateCopyOptions(bool isBinaryCopy) +{ + List *options = NIL; + + // Add the FREEZE option + DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1); + options = lappend(options, freezeOption); + + // If binary format is used, add the binary format option + if (isBinaryCopy) + { + DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); + options = lappend(options, binaryFormatOption); + } + + return options; +} + + /* * LocalCopyToShard performs local copy for the given destination shard. */ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) { - bool isBinaryCopy = localCopyOutState->binary; - if (isBinaryCopy) - { - AppendCopyBinaryFooters(localCopyOutState); - } + bool isBinaryCopy = localCopyOutState->binary; - /* - * Set the buffer as a global variable to allow ReadFromLocalBufferCallback - * to read from it. We cannot pass additional arguments to - * ReadFromLocalBufferCallback. - */ - LocalCopyBuffer = localCopyOutState->fe_msgbuf; + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localCopyOutState); + } - char *destinationShardSchemaName = linitial( - copyDest->destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond( - copyDest->destinationShardFullyQualifiedName); + /* + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback + * to read from it. We cannot pass additional arguments to + * ReadFromLocalBufferCallback. + */ + LocalCopyBuffer = localCopyOutState->fe_msgbuf; - Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, - false /* missing_ok */); - Oid destinationShardOid = get_relname_relid(destinationShardRelationName, - destinationSchemaOid); + // Extract schema and relation names + char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); - DefElem *binaryFormatOption = NULL; - if (isBinaryCopy) - { - binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); - } + // Get OIDs for schema and shard + Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); + Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid); + + // Create options list for COPY command + List *options = CreateCopyOptions(isBinaryCopy); + + // Open the shard relation + Relation shard = table_open(destinationShardOid, RowExclusiveLock); + + // Create and configure parse state + ParseState *pState = make_parsestate(NULL); + (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, false); + + // Begin and execute the COPY FROM operation + CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, ReadFromLocalBufferCallback, NULL, options); + CopyFrom(cstate); + EndCopyFrom(cstate); + + // Reset the local copy buffer + resetStringInfo(localCopyOutState->fe_msgbuf); - Relation shard = table_open(destinationShardOid, RowExclusiveLock); - ParseState *pState = make_parsestate(NULL /* parentParseState */); - (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, - NULL /* alias */, false /* inh */, - false /* inFromCl */); - - List *options = (isBinaryCopy) ? list_make1(binaryFormatOption) : NULL; - CopyFromState cstate = BeginCopyFrom(pState, shard, - NULL /* whereClause */, - NULL /* fileName */, - false /* is_program */, - ReadFromLocalBufferCallback, - NULL /* attlist (NULL is all columns) */, - options); - CopyFrom(cstate); - EndCopyFrom(cstate); - resetStringInfo(localCopyOutState->fe_msgbuf); - - table_close(shard, NoLock); - free_parsestate(pState); + // Close the shard relation and free parse state + table_close(shard, NoLock); + free_parsestate(pState); } From d13ecedcfa98d1502eb4db3424926e8b42c280e5 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Mon, 9 Sep 2024 21:18:34 +0000 Subject: [PATCH 2/8] indent --- .../operations/worker_shard_copy.c | 162 +++++++++--------- 1 file changed, 85 insertions(+), 77 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 24f50e1d0d4..4c0d0ebd84b 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -83,8 +83,9 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); -static List *CreateCopyOptions(bool isBinaryCopy); -static StringInfo ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName); +static List * CreateCopyOptions(bool isBinaryCopy); +static StringInfo ConstructShardTruncateStatement( + List *destinationShardFullyQualifiedName); static bool @@ -114,8 +115,9 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) SetupReplicationOriginRemoteSession(copyDest->connection); - // Construct and send the TRUNCATE statement to the remote node - StringInfo truncateStatement = ConstructShardTruncateStatement(copyDest->destinationShardFullyQualifiedName); + /* Construct and send the TRUNCATE statement to the remote node */ + StringInfo truncateStatement = ConstructShardTruncateStatement( + copyDest->destinationShardFullyQualifiedName); if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) { @@ -127,9 +129,9 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) { ReportResultError(copyDest->connection, truncateResult, ERROR); } - PQclear(truncateResult); + PQclear(truncateResult); - // Construct the COPY command and send it to the remote node + /* Construct the COPY command and send it to the remote node */ StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary, @@ -478,31 +480,31 @@ ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName) */ static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat, - TupleDesc tupleDesc) + useBinaryFormat, + TupleDesc tupleDesc) { - char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); + char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); - StringInfo command = makeStringInfo(); + StringInfo command = makeStringInfo(); - const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); + const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); - appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", - quote_identifier(destinationShardSchemaName), - quote_identifier(destinationShardRelationName), - columnList); + appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", + quote_identifier(destinationShardSchemaName), + quote_identifier(destinationShardRelationName), + columnList); - if (useBinaryFormat) - { - appendStringInfo(command, " WITH (format binary, FREEZE);"); - } - else - { - appendStringInfo(command, " WITH (FREEZE);"); - } + if (useBinaryFormat) + { + appendStringInfo(command, " WITH (format binary, FREEZE);"); + } + else + { + appendStringInfo(command, " WITH (FREEZE);"); + } - return command; + return command; } @@ -542,20 +544,21 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) static List * CreateCopyOptions(bool isBinaryCopy) { - List *options = NIL; + List *options = NIL; - // Add the FREEZE option - DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1); - options = lappend(options, freezeOption); + /* Add the FREEZE option */ + DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1); + options = lappend(options, freezeOption); - // If binary format is used, add the binary format option - if (isBinaryCopy) - { - DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); - options = lappend(options, binaryFormatOption); - } + /* If binary format is used, add the binary format option */ + if (isBinaryCopy) + { + DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), + -1); + options = lappend(options, binaryFormatOption); + } - return options; + return options; } @@ -565,49 +568,54 @@ CreateCopyOptions(bool isBinaryCopy) static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) { - bool isBinaryCopy = localCopyOutState->binary; - - if (isBinaryCopy) - { - AppendCopyBinaryFooters(localCopyOutState); - } - - /* - * Set the buffer as a global variable to allow ReadFromLocalBufferCallback - * to read from it. We cannot pass additional arguments to - * ReadFromLocalBufferCallback. - */ - LocalCopyBuffer = localCopyOutState->fe_msgbuf; - - // Extract schema and relation names - char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); - - // Get OIDs for schema and shard - Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); - Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid); - - // Create options list for COPY command - List *options = CreateCopyOptions(isBinaryCopy); - - // Open the shard relation - Relation shard = table_open(destinationShardOid, RowExclusiveLock); - - // Create and configure parse state - ParseState *pState = make_parsestate(NULL); - (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, false); - - // Begin and execute the COPY FROM operation - CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, ReadFromLocalBufferCallback, NULL, options); - CopyFrom(cstate); - EndCopyFrom(cstate); + bool isBinaryCopy = localCopyOutState->binary; - // Reset the local copy buffer - resetStringInfo(localCopyOutState->fe_msgbuf); + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localCopyOutState); + } - // Close the shard relation and free parse state - table_close(shard, NoLock); - free_parsestate(pState); + /* + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback + * to read from it. We cannot pass additional arguments to + * ReadFromLocalBufferCallback. + */ + LocalCopyBuffer = localCopyOutState->fe_msgbuf; + + /* Extract schema and relation names */ + char *destinationShardSchemaName = linitial( + copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond( + copyDest->destinationShardFullyQualifiedName); + + /* Get OIDs for schema and shard */ + Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); + Oid destinationShardOid = get_relname_relid(destinationShardRelationName, + destinationSchemaOid); + + /* Create options list for COPY command */ + List *options = CreateCopyOptions(isBinaryCopy); + + /* Open the shard relation */ + Relation shard = table_open(destinationShardOid, RowExclusiveLock); + + /* Create and configure parse state */ + ParseState *pState = make_parsestate(NULL); + (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, + false); + + /* Begin and execute the COPY FROM operation */ + CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, + ReadFromLocalBufferCallback, NULL, options); + CopyFrom(cstate); + EndCopyFrom(cstate); + + /* Reset the local copy buffer */ + resetStringInfo(localCopyOutState->fe_msgbuf); + + /* Close the shard relation and free parse state */ + table_close(shard, NoLock); + free_parsestate(pState); } From 812f9b182f7e877f6fcb93d2ed93700a2a8fb780 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Tue, 10 Sep 2024 13:37:07 +0000 Subject: [PATCH 3/8] remote shard update --- .../operations/worker_shard_copy.c | 96 ++++++++++--------- 1 file changed, 53 insertions(+), 43 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 4c0d0ebd84b..5a22fc95b66 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -100,56 +100,66 @@ CanUseLocalCopy(uint32_t destinationNodeId) static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) { - int connectionFlags = OUTSIDE_TRANSACTION; - char *currentUser = CurrentUserName(); - WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, - false /* missingOk */); - copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, - currentUser, - NULL /* database (current) */); - ClaimConnectionExclusively(copyDest->connection); - - StartRemoteTransactionBegin(copyDest->connection); + int connectionFlags = OUTSIDE_TRANSACTION; + char *currentUser = CurrentUserName(); + WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, + false /* missingOk */); + copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL /* database (current) */); + ClaimConnectionExclusively(copyDest->connection); + + /* Begin the remote transaction */ + RemoteTransactionBegin(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection); - /* Construct and send the TRUNCATE statement to the remote node */ - StringInfo truncateStatement = ConstructShardTruncateStatement( - copyDest->destinationShardFullyQualifiedName); - - if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) - { - ReportConnectionError(copyDest->connection, ERROR); - } - - PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); - if (PQresultStatus(truncateResult) != PGRES_COMMAND_OK) - { - ReportResultError(copyDest->connection, truncateResult, ERROR); - } - PQclear(truncateResult); - - /* Construct the COPY command and send it to the remote node */ - StringInfo copyStatement = ConstructShardCopyStatement( - copyDest->destinationShardFullyQualifiedName, - copyDest->copyOutState->binary, - copyDest->tupleDescriptor); - - if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) - { - ReportConnectionError(copyDest->connection, ERROR); - } - - PGresult *result = GetRemoteCommandResult(copyDest->connection, + /* Handle TRUNCATE or any setup commands */ + StringInfo truncateStatement = ConstructShardTruncateStatement( + copyDest->destinationShardFullyQualifiedName); + + if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } + + PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); + if (!IsResponseOK(truncateResult)) + { + ReportResultError(copyDest->connection, truncateResult, ERROR); + PQclear(truncateResult); + ForgetResults(copyDest->connection); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } + PQclear(truncateResult); + ForgetResults(copyDest->connection); + + /* Construct and send the COPY statement with FREEZE */ + StringInfo copyStatement = ConstructShardCopyStatement( + copyDest->destinationShardFullyQualifiedName, + copyDest->copyOutState->binary, + copyDest->tupleDescriptor); + + if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } + + PGresult *copyResult = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */); - if (PQresultStatus(result) != PGRES_COPY_IN) + if (PQresultStatus(copyResult ) != PGRES_COPY_IN) { - ReportResultError(copyDest->connection, result, ERROR); + ReportResultError(copyDest->connection, copyResult, ERROR); } - PQclear(result); + PQclear(copyResult); } From 9b76b62bda72099f8b588a4cca08bcd4b63af979 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Tue, 10 Sep 2024 14:25:08 +0000 Subject: [PATCH 4/8] remove freeze option from local --- src/backend/distributed/operations/worker_shard_copy.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 5a22fc95b66..6ed81020b64 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -556,10 +556,6 @@ CreateCopyOptions(bool isBinaryCopy) { List *options = NIL; - /* Add the FREEZE option */ - DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1); - options = lappend(options, freezeOption); - /* If binary format is used, add the binary format option */ if (isBinaryCopy) { From 842f5e486fda0c7bf2ae62778695e262d458ed44 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Tue, 10 Sep 2024 17:55:39 +0000 Subject: [PATCH 5/8] indent --- .../operations/worker_shard_copy.c | 94 +++++++++---------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 6ed81020b64..3b45cb91365 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -100,61 +100,61 @@ CanUseLocalCopy(uint32_t destinationNodeId) static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) { - int connectionFlags = OUTSIDE_TRANSACTION; - char *currentUser = CurrentUserName(); - WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, - false /* missingOk */); - copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, - currentUser, - NULL /* database (current) */); - ClaimConnectionExclusively(copyDest->connection); - - /* Begin the remote transaction */ - RemoteTransactionBegin(copyDest->connection); + int connectionFlags = OUTSIDE_TRANSACTION; + char *currentUser = CurrentUserName(); + WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, + false /* missingOk */); + copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL /* database (current) */); + ClaimConnectionExclusively(copyDest->connection); + + /* Begin the remote transaction */ + RemoteTransactionBegin(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection); - /* Handle TRUNCATE or any setup commands */ - StringInfo truncateStatement = ConstructShardTruncateStatement( - copyDest->destinationShardFullyQualifiedName); + /* Handle TRUNCATE or any setup commands */ + StringInfo truncateStatement = ConstructShardTruncateStatement( + copyDest->destinationShardFullyQualifiedName); - if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) - { - ReportConnectionError(copyDest->connection, ERROR); + if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); RemoteTransactionAbort(copyDest->connection); - ResetRemoteTransaction(copyDest->connection); - } - - PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); - if (!IsResponseOK(truncateResult)) - { - ReportResultError(copyDest->connection, truncateResult, ERROR); - PQclear(truncateResult); - ForgetResults(copyDest->connection); - RemoteTransactionAbort(copyDest->connection); - ResetRemoteTransaction(copyDest->connection); - } - PQclear(truncateResult); - ForgetResults(copyDest->connection); - - /* Construct and send the COPY statement with FREEZE */ - StringInfo copyStatement = ConstructShardCopyStatement( - copyDest->destinationShardFullyQualifiedName, - copyDest->copyOutState->binary, - copyDest->tupleDescriptor); - - if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) - { - ReportConnectionError(copyDest->connection, ERROR); + ResetRemoteTransaction(copyDest->connection); + } + + PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); + if (!IsResponseOK(truncateResult)) + { + ReportResultError(copyDest->connection, truncateResult, ERROR); + PQclear(truncateResult); + ForgetResults(copyDest->connection); RemoteTransactionAbort(copyDest->connection); - ResetRemoteTransaction(copyDest->connection); - } + ResetRemoteTransaction(copyDest->connection); + } + PQclear(truncateResult); + ForgetResults(copyDest->connection); + + /* Construct and send the COPY statement with FREEZE */ + StringInfo copyStatement = ConstructShardCopyStatement( + copyDest->destinationShardFullyQualifiedName, + copyDest->copyOutState->binary, + copyDest->tupleDescriptor); + + if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } PGresult *copyResult = GetRemoteCommandResult(copyDest->connection, - true /* raiseInterrupts */); - if (PQresultStatus(copyResult ) != PGRES_COPY_IN) + true /* raiseInterrupts */); + if (PQresultStatus(copyResult) != PGRES_COPY_IN) { ReportResultError(copyDest->connection, copyResult, ERROR); } From 4db4a1ee9fd11bdd2b57ee2ea77bc6c8ba2d9281 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Tue, 10 Sep 2024 18:40:24 +0000 Subject: [PATCH 6/8] revert local shard changes --- .../operations/worker_shard_copy.c | 60 +++++++------------ 1 file changed, 20 insertions(+), 40 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 3b45cb91365..a891b2ee24d 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -83,7 +83,6 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); -static List * CreateCopyOptions(bool isBinaryCopy); static StringInfo ConstructShardTruncateStatement( List *destinationShardFullyQualifiedName); @@ -548,26 +547,6 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) } -/* - * CreateCopyOptions creates the options list for the COPY command. - */ -static List * -CreateCopyOptions(bool isBinaryCopy) -{ - List *options = NIL; - - /* If binary format is used, add the binary format option */ - if (isBinaryCopy) - { - DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), - -1); - options = lappend(options, binaryFormatOption); - } - - return options; -} - - /* * LocalCopyToShard performs local copy for the given destination shard. */ @@ -575,7 +554,6 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) { bool isBinaryCopy = localCopyOutState->binary; - if (isBinaryCopy) { AppendCopyBinaryFooters(localCopyOutState); @@ -588,38 +566,40 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState */ LocalCopyBuffer = localCopyOutState->fe_msgbuf; - /* Extract schema and relation names */ char *destinationShardSchemaName = linitial( copyDest->destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond( copyDest->destinationShardFullyQualifiedName); - /* Get OIDs for schema and shard */ - Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); + Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, + false /* missing_ok */); Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid); - /* Create options list for COPY command */ - List *options = CreateCopyOptions(isBinaryCopy); + DefElem *binaryFormatOption = NULL; + if (isBinaryCopy) + { + binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); + } - /* Open the shard relation */ Relation shard = table_open(destinationShardOid, RowExclusiveLock); - - /* Create and configure parse state */ - ParseState *pState = make_parsestate(NULL); - (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, - false); - - /* Begin and execute the COPY FROM operation */ - CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, - ReadFromLocalBufferCallback, NULL, options); + ParseState *pState = make_parsestate(NULL /* parentParseState */); + (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, + NULL /* alias */, false /* inh */, + false /* inFromCl */); + + List *options = (isBinaryCopy) ? list_make1(binaryFormatOption) : NULL; + CopyFromState cstate = BeginCopyFrom(pState, shard, + NULL /* whereClause */, + NULL /* fileName */, + false /* is_program */, + ReadFromLocalBufferCallback, + NULL /* attlist (NULL is all columns) */, + options); CopyFrom(cstate); EndCopyFrom(cstate); - - /* Reset the local copy buffer */ resetStringInfo(localCopyOutState->fe_msgbuf); - /* Close the shard relation and free parse state */ table_close(shard, NoLock); free_parsestate(pState); } From 77c9bf8d6e981088497e18de1ff54fcf02fa0622 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Tue, 17 Sep 2024 14:16:56 +0000 Subject: [PATCH 7/8] local copy --- .../operations/worker_shard_copy.c | 161 +++++++++++------- 1 file changed, 99 insertions(+), 62 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index a891b2ee24d..173d89c5559 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -17,6 +17,8 @@ #include "parser/parse_relation.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "executor/spi.h" +#include "miscadmin.h" #include "distributed/commands/multi_copy.h" #include "distributed/connection_management.h" @@ -85,6 +87,7 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static StringInfo ConstructShardTruncateStatement( List *destinationShardFullyQualifiedName); +static void TruncateShardForCopy(Oid shardOid); static bool @@ -366,20 +369,10 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) ResetReplicationOriginRemoteSession(copyDest->connection); - /* End the transaction by sending a COMMIT command */ - if (!SendRemoteCommand(copyDest->connection, "COMMIT")) - { - HandleRemoteTransactionConnectionError(copyDest->connection, true); - } - - PGresult *commitResult = GetRemoteCommandResult(copyDest->connection, true); - if (!IsResponseOK(result)) - { - ereport(ERROR, (errcode(ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN), - errmsg("Failed to commit transaction"))); - } - - PQclear(commitResult); + PQclear(result); + ForgetResults(copyDest->connection); + RemoteTransactionCommit(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); CloseConnection(copyDest->connection); } @@ -548,60 +541,104 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) /* - * LocalCopyToShard performs local copy for the given destination shard. + * Truncate the table before starting the COPY with FREEZE. */ static void -LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) +TruncateShardForCopy(Oid shardOid) { - bool isBinaryCopy = localCopyOutState->binary; - if (isBinaryCopy) - { - AppendCopyBinaryFooters(localCopyOutState); - } - - /* - * Set the buffer as a global variable to allow ReadFromLocalBufferCallback - * to read from it. We cannot pass additional arguments to - * ReadFromLocalBufferCallback. - */ - LocalCopyBuffer = localCopyOutState->fe_msgbuf; - - char *destinationShardSchemaName = linitial( - copyDest->destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond( - copyDest->destinationShardFullyQualifiedName); + Relation shard = table_open(shardOid, AccessExclusiveLock); + + /* Execute the TRUNCATE */ + char *shardRelationName = RelationGetRelationName(shard); + char *schemaName = get_namespace_name(RelationGetNamespace(shard)); + StringInfo truncateQuery = makeStringInfo(); + appendStringInfo(truncateQuery, "TRUNCATE %s.%s", quote_identifier(schemaName), quote_identifier(shardRelationName)); + + /* Initialize SPI */ + if (SPI_connect() != SPI_OK_CONNECT) + { + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("could not connect to SPI manager"))); + } + + /* Execute the TRUNCATE command */ + int spiResult = SPI_execute(truncateQuery->data, false, 0); + if (spiResult != SPI_OK_UTILITY) + { + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("TRUNCATE command failed"))); + } + + /* Finalize SPI */ + SPI_finish(); + + /* Release lock */ + table_close(shard, NoLock); +} - Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, - false /* missing_ok */); - Oid destinationShardOid = get_relname_relid(destinationShardRelationName, - destinationSchemaOid); - DefElem *binaryFormatOption = NULL; - if (isBinaryCopy) - { - binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); - } - Relation shard = table_open(destinationShardOid, RowExclusiveLock); - ParseState *pState = make_parsestate(NULL /* parentParseState */); - (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, - NULL /* alias */, false /* inh */, - false /* inFromCl */); - - List *options = (isBinaryCopy) ? list_make1(binaryFormatOption) : NULL; - CopyFromState cstate = BeginCopyFrom(pState, shard, - NULL /* whereClause */, - NULL /* fileName */, - false /* is_program */, - ReadFromLocalBufferCallback, - NULL /* attlist (NULL is all columns) */, - options); - CopyFrom(cstate); - EndCopyFrom(cstate); - resetStringInfo(localCopyOutState->fe_msgbuf); - - table_close(shard, NoLock); - free_parsestate(pState); +static void +LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) +{ + bool isBinaryCopy = localCopyOutState->binary; + + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localCopyOutState); + } + + LocalCopyBuffer = localCopyOutState->fe_msgbuf; + + char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); + + Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); + Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid); + + /* Truncate the destination shard before performing COPY FREEZE */ + set_config_option("citus.enable_manual_changes_to_shards", + "on", /* Always set to "on" */ + (superuser() ? PGC_SUSET : PGC_USERSET), /* Allow superusers to change the setting at SUSET level */ + PGC_S_SESSION, /* Session level scope */ + GUC_ACTION_LOCAL, /* Local action within the session */ + true, /* Change in the current transaction */ + 0, /* No GUC source specified */ + false /* Do not report errors if already set */ + ); + + TruncateShardForCopy(destinationShardOid); + + DefElem *binaryFormatOption = NULL; + if (isBinaryCopy) + { + binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); + } + + DefElem *freezeOption = makeDefElem("freeze", (Node *) makeString("true"), -1); + + Relation shard = table_open(destinationShardOid, RowExclusiveLock); + ParseState *pState = make_parsestate(NULL); + (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, false); + + List *options = NIL; + if (isBinaryCopy) + { + options = list_make2(binaryFormatOption, freezeOption); + } + else + { + options = list_make1(freezeOption); + } + + CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, ReadFromLocalBufferCallback, NULL, options); + CopyFrom(cstate); + EndCopyFrom(cstate); + + resetStringInfo(localCopyOutState->fe_msgbuf); + + table_close(shard, NoLock); + free_parsestate(pState); } From c958e8e0922a2d6b914b5aeb0d5109accc056240 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Mon, 23 Sep 2024 11:26:48 +0000 Subject: [PATCH 8/8] update --- .../worker_copy_table_to_node_udf.c | 9 + .../operations/worker_shard_copy.c | 161 +++++++----------- 2 files changed, 71 insertions(+), 99 deletions(-) diff --git a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c index c603de72af0..43a405f1c17 100644 --- a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c +++ b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c @@ -40,6 +40,15 @@ worker_copy_table_to_node(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); uint32_t targetNodeId = PG_GETARG_INT32(1); + if (IsCitusTable(relationId)) + { + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table %s is a Citus table, only copies of " + "shards or regular postgres tables are supported", + qualifiedRelationName))); + } + Oid schemaOid = get_rel_namespace(relationId); char *relationSchemaName = get_namespace_name(schemaOid); char *relationName = get_rel_name(relationId); diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 173d89c5559..a891b2ee24d 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -17,8 +17,6 @@ #include "parser/parse_relation.h" #include "utils/builtins.h" #include "utils/lsyscache.h" -#include "executor/spi.h" -#include "miscadmin.h" #include "distributed/commands/multi_copy.h" #include "distributed/connection_management.h" @@ -87,7 +85,6 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static StringInfo ConstructShardTruncateStatement( List *destinationShardFullyQualifiedName); -static void TruncateShardForCopy(Oid shardOid); static bool @@ -369,10 +366,20 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) ResetReplicationOriginRemoteSession(copyDest->connection); - PQclear(result); - ForgetResults(copyDest->connection); - RemoteTransactionCommit(copyDest->connection); - ResetRemoteTransaction(copyDest->connection); + /* End the transaction by sending a COMMIT command */ + if (!SendRemoteCommand(copyDest->connection, "COMMIT")) + { + HandleRemoteTransactionConnectionError(copyDest->connection, true); + } + + PGresult *commitResult = GetRemoteCommandResult(copyDest->connection, true); + if (!IsResponseOK(result)) + { + ereport(ERROR, (errcode(ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN), + errmsg("Failed to commit transaction"))); + } + + PQclear(commitResult); CloseConnection(copyDest->connection); } @@ -541,104 +548,60 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) /* - * Truncate the table before starting the COPY with FREEZE. + * LocalCopyToShard performs local copy for the given destination shard. */ static void -TruncateShardForCopy(Oid shardOid) +LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) { - Relation shard = table_open(shardOid, AccessExclusiveLock); - - /* Execute the TRUNCATE */ - char *shardRelationName = RelationGetRelationName(shard); - char *schemaName = get_namespace_name(RelationGetNamespace(shard)); - StringInfo truncateQuery = makeStringInfo(); - appendStringInfo(truncateQuery, "TRUNCATE %s.%s", quote_identifier(schemaName), quote_identifier(shardRelationName)); - - /* Initialize SPI */ - if (SPI_connect() != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("could not connect to SPI manager"))); - } - - /* Execute the TRUNCATE command */ - int spiResult = SPI_execute(truncateQuery->data, false, 0); - if (spiResult != SPI_OK_UTILITY) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("TRUNCATE command failed"))); - } - - /* Finalize SPI */ - SPI_finish(); - - /* Release lock */ - table_close(shard, NoLock); -} + bool isBinaryCopy = localCopyOutState->binary; + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localCopyOutState); + } + /* + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback + * to read from it. We cannot pass additional arguments to + * ReadFromLocalBufferCallback. + */ + LocalCopyBuffer = localCopyOutState->fe_msgbuf; + char *destinationShardSchemaName = linitial( + copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond( + copyDest->destinationShardFullyQualifiedName); -static void -LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) -{ - bool isBinaryCopy = localCopyOutState->binary; - - if (isBinaryCopy) - { - AppendCopyBinaryFooters(localCopyOutState); - } - - LocalCopyBuffer = localCopyOutState->fe_msgbuf; - - char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); - - Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); - Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid); - - /* Truncate the destination shard before performing COPY FREEZE */ - set_config_option("citus.enable_manual_changes_to_shards", - "on", /* Always set to "on" */ - (superuser() ? PGC_SUSET : PGC_USERSET), /* Allow superusers to change the setting at SUSET level */ - PGC_S_SESSION, /* Session level scope */ - GUC_ACTION_LOCAL, /* Local action within the session */ - true, /* Change in the current transaction */ - 0, /* No GUC source specified */ - false /* Do not report errors if already set */ - ); - - TruncateShardForCopy(destinationShardOid); - - DefElem *binaryFormatOption = NULL; - if (isBinaryCopy) - { - binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); - } - - DefElem *freezeOption = makeDefElem("freeze", (Node *) makeString("true"), -1); - - Relation shard = table_open(destinationShardOid, RowExclusiveLock); - ParseState *pState = make_parsestate(NULL); - (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, false); - - List *options = NIL; - if (isBinaryCopy) - { - options = list_make2(binaryFormatOption, freezeOption); - } - else - { - options = list_make1(freezeOption); - } - - CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, ReadFromLocalBufferCallback, NULL, options); - CopyFrom(cstate); - EndCopyFrom(cstate); - - resetStringInfo(localCopyOutState->fe_msgbuf); - - table_close(shard, NoLock); - free_parsestate(pState); + Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, + false /* missing_ok */); + Oid destinationShardOid = get_relname_relid(destinationShardRelationName, + destinationSchemaOid); + + DefElem *binaryFormatOption = NULL; + if (isBinaryCopy) + { + binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); + } + + Relation shard = table_open(destinationShardOid, RowExclusiveLock); + ParseState *pState = make_parsestate(NULL /* parentParseState */); + (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, + NULL /* alias */, false /* inh */, + false /* inFromCl */); + + List *options = (isBinaryCopy) ? list_make1(binaryFormatOption) : NULL; + CopyFromState cstate = BeginCopyFrom(pState, shard, + NULL /* whereClause */, + NULL /* fileName */, + false /* is_program */, + ReadFromLocalBufferCallback, + NULL /* attlist (NULL is all columns) */, + options); + CopyFrom(cstate); + EndCopyFrom(cstate); + resetStringInfo(localCopyOutState->fe_msgbuf); + + table_close(shard, NoLock); + free_parsestate(pState); }