From b0b6f3ea87a72da86ff1e5620c32ab2e948fbf2b Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Thu, 14 Sep 2023 16:32:54 +0300 Subject: [PATCH] Adds 2PC distributed commands to pools --- src/backend/distributed/commands/function.c | 1 + .../distributed/commands/utility_hook.c | 57 ++++++++ .../connection/connection_configuration.c | 14 +- src/backend/distributed/metadata/distobject.c | 48 ++++++- .../distributed/metadata/metadata_cache.c | 5 + .../distributed/metadata/metadata_sync.c | 23 +++- src/backend/distributed/shared_library_init.c | 3 + .../distributed/sql/citus--12.0-1--12.1-1.sql | 8 ++ .../sql/downgrades/citus--12.1-1--12.0-1.sql | 17 +++ .../12.1-1.sql | 7 + .../latest.sql | 7 + .../citus_mark_object_distributed/12.1-1.sql | 7 + .../citus_mark_object_distributed/latest.sql | 7 + .../commit_management_command_2pc/12.1-1.sql | 7 + .../commit_management_command_2pc/latest.sql | 7 + .../execute_command_on_other_nodes/12.1-1.sql | 7 + .../execute_command_on_other_nodes/latest.sql | 7 + .../transaction/remote_transaction.c | 127 +++++++++++++++++- .../transaction/transaction_management.c | 18 ++- .../transaction/transaction_recovery.c | 44 +++++- .../transaction/worker_transaction.c | 9 +- src/include/distributed/metadata/distobject.h | 1 + src/include/distributed/metadata_sync.h | 1 + src/include/distributed/pg_dist_transaction.h | 3 +- src/include/distributed/remote_commands.h | 2 + src/include/distributed/remote_transaction.h | 8 ++ .../distributed/transaction_recovery.h | 3 +- src/include/distributed/worker_transaction.h | 7 +- src/test/regress/expected/multi_extension.out | 6 +- .../multi_mx_transaction_recovery.out | 2 +- src/test/regress/expected/other_databases.out | 57 ++++++++ .../expected/upgrade_list_citus_objects.out | 6 +- src/test/regress/failure_schedule | 1 + src/test/regress/multi_schedule | 1 + src/test/regress/pg_regress_multi.pl | 1 + .../regress/sql/failure_non_main_db_2pc.sql | 15 +++ .../sql/multi_mx_transaction_recovery.sql | 2 +- src/test/regress/sql/other_databases.sql | 36 +++++ 38 files changed, 563 insertions(+), 19 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_internal_start_management_transaction/12.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_start_management_transaction/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_mark_object_distributed/12.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_mark_object_distributed/latest.sql create mode 100644 src/backend/distributed/sql/udfs/commit_management_command_2pc/12.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql create mode 100644 src/backend/distributed/sql/udfs/execute_command_on_other_nodes/12.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/execute_command_on_other_nodes/latest.sql create mode 100644 src/test/regress/expected/other_databases.out create mode 100644 src/test/regress/sql/failure_non_main_db_2pc.sql create mode 100644 src/test/regress/sql/other_databases.sql diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 7010416734e..319c6ddf9ac 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -883,6 +883,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, char *workerPgDistObjectUpdateCommand = MarkObjectsDistributedCreateCommand(objectAddressList, + NIL, distArgumentIndexList, colocationIdList, forceDelegationList); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index afc8fa9fd7b..ce6e80abac7 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -35,6 +35,7 @@ #include "access/htup_details.h" #include "catalog/catalog.h" #include "catalog/dependency.h" +#include "catalog/pg_authid.h" #include "citus_version.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -63,6 +64,7 @@ #include "distributed/multi_explain.h" #include "distributed/multi_physical_planner.h" #include "distributed/reference_table_utils.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/string_utils.h" #include "distributed/transaction_management.h" @@ -74,6 +76,7 @@ #include "nodes/parsenodes.h" #include "nodes/pg_list.h" #include "nodes/makefuncs.h" +#include "postmaster/postmaster.h" #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -109,6 +112,8 @@ static void PostStandardProcessUtility(Node *parsetree); static void DecrementUtilityHookCountersIfNecessary(Node *parsetree); static bool IsDropSchemaOrDB(Node *parsetree); static bool ShouldCheckUndistributeCitusLocalTables(void); +static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString); +static void RunPostprocessMainDBCommand(Node *parsetree); /* * ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of @@ -240,6 +245,11 @@ citus_ProcessUtility(PlannedStmt *pstmt, if (!CitusHasBeenLoaded()) { + if (!IsCitusMainDB()) + { + RunPreprocessMainDBCommand(parsetree, queryString); + } + /* * Ensure that utility commands do not behave any differently until CREATE * EXTENSION is invoked. @@ -247,6 +257,11 @@ citus_ProcessUtility(PlannedStmt *pstmt, PrevProcessUtility(pstmt, queryString, false, context, params, queryEnv, dest, completionTag); + if (!IsCitusMainDB()) + { + RunPostprocessMainDBCommand(parsetree); + } + return; } else if (IsA(parsetree, CallStmt)) @@ -1553,3 +1568,45 @@ DropSchemaOrDBInProgress(void) { return activeDropSchemaOrDBs > 0; } + + +/* + * RunPreprocessMainDBCommand runs the necessary commands for a query, in main + * database before query is run on the local node with PrevProcessUtility + */ +static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString) +{ + if (IsA(parsetree, CreateRoleStmt)) + { + StringInfo mainDBQuery = makeStringInfo(); + appendStringInfo(mainDBQuery, + "SELECT citus_internal_start_management_transaction('%lu')", + GetCurrentFullTransactionId().value); + RunCitusMainDBQuery(mainDBQuery->data); + mainDBQuery = makeStringInfo(); + appendStringInfo(mainDBQuery, + "SELECT execute_command_on_other_nodes(%s)", + quote_literal_cstr(queryString)); + RunCitusMainDBQuery(mainDBQuery->data); + } +} + +/* + * RunPostprocessMainDBCommand runs the necessary commands for a query, in main + * database after query is run on the local node with PrevProcessUtility + */ +static void RunPostprocessMainDBCommand(Node *parsetree) +{ + if (IsA(parsetree, CreateRoleStmt)) + { + StringInfo mainDBQuery = makeStringInfo(); + CreateRoleStmt *createRoleStmt = castNode(CreateRoleStmt, parsetree); + Oid roleOid = get_role_oid(createRoleStmt->role, false); + appendStringInfo(mainDBQuery, + "SELECT citus_mark_object_distributed(%d, %s, %d)", + AuthIdRelationId, + quote_literal_cstr(createRoleStmt->role), + roleOid); + RunCitusMainDBQuery(mainDBQuery->data); + } +} diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index bf61f7fac37..997d35e5912 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -444,7 +444,19 @@ GetEffectiveConnKey(ConnectionHashKey *key) return key; } - WorkerNode *worker = FindWorkerNode(key->hostname, key->port); + WorkerNode *worker = NULL; + + if (!CitusHasBeenLoaded()) + { + /* + * This happens when we connect to main database over localhost + * from some non Citus database. + */ + return key; + } + + worker = FindWorkerNode(key->hostname, key->port); + if (worker == NULL) { /* this can be hit when the key references an unknown node */ diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index fa9da8b7531..c1659d49f9c 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -37,6 +37,7 @@ #include "distributed/metadata/pg_dist_object.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/remote_commands.h" #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" #include "executor/spi.h" @@ -50,15 +51,35 @@ #include "utils/rel.h" -static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); +static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, + char *objectName); static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); static bool IsObjectDistributed(const ObjectAddress *address); +PG_FUNCTION_INFO_V1(citus_mark_object_distributed); PG_FUNCTION_INFO_V1(citus_unmark_object_distributed); PG_FUNCTION_INFO_V1(master_unmark_object_distributed); +/* + * citus_mark_object_distributed adds an object to pg_dist_object + * in all of the nodes. + */ +Datum +citus_mark_object_distributed(PG_FUNCTION_ARGS) +{ + Oid classId = PG_GETARG_OID(0); + text *objectNameText = PG_GETARG_TEXT_P(1); + char *objectName = text_to_cstring(objectNameText); + Oid objectId = PG_GETARG_OID(2); + ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*objectAddress, classId, objectId); + MarkObjectDistributedWithName(objectAddress, objectName); + PG_RETURN_VOID(); +} + + /* * citus_unmark_object_distributed(classid oid, objid oid, objsubid int) * @@ -158,12 +179,28 @@ ObjectExists(const ObjectAddress *address) void MarkObjectDistributed(const ObjectAddress *distAddress) { + MarkObjectDistributedWithName(distAddress, ""); +} + + +/* + * MarkObjectDistributedWithName marks an object as a distributed object. + * Same as MarkObjectDistributed but this function also allows passing an objectName + * that is used in case the object does not exists for the current transaction. + */ +void +MarkObjectDistributedWithName(const ObjectAddress *distAddress, char* objectName) +{ + if (!CitusHasBeenLoaded()) + { + return; + } MarkObjectDistributedLocally(distAddress); if (EnableMetadataSync) { char *workerPgDistObjectUpdateCommand = - CreatePgDistObjectEntryCommand(distAddress); + CreatePgDistObjectEntryCommand(distAddress, objectName); SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand); } } @@ -186,7 +223,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress) if (EnableMetadataSync) { char *workerPgDistObjectUpdateCommand = - CreatePgDistObjectEntryCommand(distAddress); + CreatePgDistObjectEntryCommand(distAddress, ""); SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand); } } @@ -277,17 +314,20 @@ ShouldMarkRelationDistributed(Oid relationId) * for the given object address. */ static char * -CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress) +CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, char *objectName) { /* create a list by adding the address of value to not to have warning */ List *objectAddressList = list_make1((ObjectAddress *) objectAddress); + /* names also require a list so we create a nested list here */ + List *objectNameList = list_make1(list_make1((char *)objectName)); List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX); List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID); List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN); char *workerPgDistObjectUpdateCommand = MarkObjectsDistributedCreateCommand(objectAddressList, + objectNameList, distArgumetIndexList, colocationIdList, forceDelegationList); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 44179cffbdf..11edc6a3830 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -55,6 +55,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_placement.h" +#include "distributed/remote_commands.h" #include "distributed/shared_library_init.h" #include "distributed/shardinterval_utils.h" #include "distributed/utils/array_type.h" @@ -5722,6 +5723,10 @@ GetPoolinfoViaCatalog(int32 nodeId) char * GetAuthinfoViaCatalog(const char *roleName, int64 nodeId) { + if (!CitusHasBeenLoaded()) + { + return ""; + } char *authinfo = ""; Datum nodeIdDatumArray[2] = { Int32GetDatum(nodeId), diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 53dc7e74763..0138a1551f6 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -64,6 +64,7 @@ #include "distributed/pg_dist_schema.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" #include "distributed/utils/array_type.h" #include "distributed/utils/function.h" @@ -895,6 +896,7 @@ NodeListIdempotentInsertCommand(List *workerNodeList) */ char * MarkObjectsDistributedCreateCommand(List *addresses, + List *namesArg, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations) @@ -919,9 +921,25 @@ MarkObjectsDistributedCreateCommand(List *addresses, int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter); List *names = NIL; List *args = NIL; + char *objectType = NULL; - char *objectType = getObjectTypeDescription(address, false); - getObjectIdentityParts(address, &names, &args, false); + if (IsMainDBCommand) + { + /* + * When we try to distribute an object that's being created in a non Citus + * main database, we cannot find the name, since the object is not visible + * in Citus main database. + * Because of that we need to pass the name to this function. + */ + names = list_nth(namesArg, currentObjectCounter); + bool missingOk = false; + objectType = getObjectTypeDescription(address, missingOk); + } + else + { + objectType = getObjectTypeDescription(address, false); + getObjectIdentityParts(address, &names, &args, IsMainDBCommand); + } if (!isFirstObject) { @@ -4988,6 +5006,7 @@ SendDistObjectCommands(MetadataSyncContext *context) char *workerMetadataUpdateCommand = MarkObjectsDistributedCreateCommand(list_make1(address), + NIL, list_make1_int(distributionArgumentIndex), list_make1_int(colocationId), list_make1_int(forceDelegation)); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 22037c82bd4..7b6fd4264de 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -85,6 +85,7 @@ #include "distributed/time_constants.h" #include "distributed/query_stats.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/shard_rebalancer.h" #include "distributed/shared_library_init.h" #include "distributed/statistics_collection.h" @@ -3139,6 +3140,8 @@ CitusAuthHook(Port *port, int status) */ InitializeBackendData(port->application_name); + IsMainDB = (strncmp(MainDb, "", NAMEDATALEN) == 0 || + strncmp(MainDb, port->database_name, NAMEDATALEN) == 0); /* let other authentication hooks to kick in first */ if (original_client_auth_hook) diff --git a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql index 4e2a515a357..0df4d9f74bd 100644 --- a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql +++ b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql @@ -10,3 +10,11 @@ #include "udfs/citus_internal_delete_placement_metadata/12.1-1.sql" #include "udfs/citus_schema_move/12.1-1.sql" + + +#include "udfs/citus_internal_start_management_transaction/12.1-1.sql" +#include "udfs/execute_command_on_other_nodes/12.1-1.sql" +#include "udfs/citus_mark_object_distributed/12.1-1.sql" +#include "udfs/commit_management_command_2pc/12.1-1.sql" + +ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8; diff --git a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql index 6f58b2f54b1..c87e23f9d9d 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql @@ -22,3 +22,20 @@ DROP FUNCTION pg_catalog.citus_schema_move( schema_id regnamespace, target_node_id integer, shard_transfer_mode citus.shard_transfer_mode ); + + +DROP FUNCTION pg_catalog.citus_internal_start_management_transaction( + outer_xid xid8 +); + +DROP FUNCTION pg_catalog.execute_command_on_other_nodes( + query text +); + +DROP FUNCTION pg_catalog.citus_mark_object_distributed( + classId Oid, objectName text, objectId Oid +); + +DROP FUNCTION pg_catalog.commit_management_command_2pc(); + +ALTER TABLE pg_catalog.pg_dist_transaction DROP COLUMN outer_xid; diff --git a/src/backend/distributed/sql/udfs/citus_internal_start_management_transaction/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_internal_start_management_transaction/12.1-1.sql new file mode 100644 index 00000000000..0b4ab2b4ff4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_start_management_transaction/12.1-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_management_transaction(outer_xid xid8) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$citus_internal_start_management_transaction$$; + +COMMENT ON FUNCTION pg_catalog.citus_internal_start_management_transaction(outer_xid xid8) + IS 'internal Citus function that starts a management transaction in the main database'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_start_management_transaction/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_start_management_transaction/latest.sql new file mode 100644 index 00000000000..0b4ab2b4ff4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_start_management_transaction/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_management_transaction(outer_xid xid8) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$citus_internal_start_management_transaction$$; + +COMMENT ON FUNCTION pg_catalog.citus_internal_start_management_transaction(outer_xid xid8) + IS 'internal Citus function that starts a management transaction in the main database'; diff --git a/src/backend/distributed/sql/udfs/citus_mark_object_distributed/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_mark_object_distributed/12.1-1.sql new file mode 100644 index 00000000000..70c7bb91d88 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_mark_object_distributed/12.1-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_mark_object_distributed(classId Oid, objectName text, objectId Oid) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$citus_mark_object_distributed$$; + +COMMENT ON FUNCTION pg_catalog.citus_mark_object_distributed(classId Oid, objectName text, objectId Oid) + IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/citus_mark_object_distributed/latest.sql b/src/backend/distributed/sql/udfs/citus_mark_object_distributed/latest.sql new file mode 100644 index 00000000000..70c7bb91d88 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_mark_object_distributed/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_mark_object_distributed(classId Oid, objectName text, objectId Oid) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$citus_mark_object_distributed$$; + +COMMENT ON FUNCTION pg_catalog.citus_mark_object_distributed(classId Oid, objectName text, objectId Oid) + IS 'adds an object to pg_dist_object on all nodes'; diff --git a/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.1-1.sql b/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.1-1.sql new file mode 100644 index 00000000000..2677c8301ca --- /dev/null +++ b/src/backend/distributed/sql/udfs/commit_management_command_2pc/12.1-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.commit_management_command_2pc() + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$; + +COMMENT ON FUNCTION pg_catalog.commit_management_command_2pc() + IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit'; diff --git a/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql b/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql new file mode 100644 index 00000000000..2677c8301ca --- /dev/null +++ b/src/backend/distributed/sql/udfs/commit_management_command_2pc/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.commit_management_command_2pc() + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$commit_management_command_2pc$$; + +COMMENT ON FUNCTION pg_catalog.commit_management_command_2pc() + IS 'commits the coordinated remote transactions, is a wrapper function for CoordinatedRemoteTransactionsCommit'; diff --git a/src/backend/distributed/sql/udfs/execute_command_on_other_nodes/12.1-1.sql b/src/backend/distributed/sql/udfs/execute_command_on_other_nodes/12.1-1.sql new file mode 100644 index 00000000000..1401ac0cd45 --- /dev/null +++ b/src/backend/distributed/sql/udfs/execute_command_on_other_nodes/12.1-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.execute_command_on_other_nodes(query text) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$execute_command_on_other_nodes$$; + +COMMENT ON FUNCTION pg_catalog.execute_command_on_other_nodes(query text) + IS 'executes a query on the nodes other than the current one'; diff --git a/src/backend/distributed/sql/udfs/execute_command_on_other_nodes/latest.sql b/src/backend/distributed/sql/udfs/execute_command_on_other_nodes/latest.sql new file mode 100644 index 00000000000..1401ac0cd45 --- /dev/null +++ b/src/backend/distributed/sql/udfs/execute_command_on_other_nodes/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.execute_command_on_other_nodes(query text) + RETURNS VOID + LANGUAGE C +AS 'MODULE_PATHNAME', $$execute_command_on_other_nodes$$; + +COMMENT ON FUNCTION pg_catalog.execute_command_on_other_nodes(query text) + IS 'executes a query on the nodes other than the current one'; diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 0f62417931f..b9c351a3b0e 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -23,8 +23,10 @@ #include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" #include "distributed/connection_management.h" +#include "distributed/commands/utility_hook.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata/distobject.h" #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" @@ -32,8 +34,10 @@ #include "distributed/transaction_management.h" #include "distributed/transaction_recovery.h" #include "distributed/worker_manager.h" +#include "postmaster/postmaster.h" #include "utils/builtins.h" #include "utils/hsearch.h" +#include "utils/xid8.h" #define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u" @@ -56,6 +60,9 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection static void Assign2PCIdentifier(MultiConnection *connection); +PG_FUNCTION_INFO_V1(citus_internal_start_management_transaction); +PG_FUNCTION_INFO_V1(execute_command_on_other_nodes); +PG_FUNCTION_INFO_V1(commit_management_command_2pc); static char *IsolationLevelName[] = { "READ UNCOMMITTED", @@ -64,6 +71,124 @@ static char *IsolationLevelName[] = { "SERIALIZABLE" }; +/* + * These variables are necessary for running queries from a database that is not + * the Citus main database. Some of these queries need to be propagated to the + * workers and Citus main database will be used for these queries, such as + * CREATE ROLE. For that we create a connection to the Citus main database and + * run queries from there. + * + * MainDb is the name of the Citus main database. + * MainDBConnection is the MultiConnection used for connecting Citus main database. + * IsMainDBCommand is true if this is a query in the Citus main database that is started + * by a query from a different database. + * OuterXid is the transaction id of the query from the other dtabase that started the + * main database query. + */ +char *MainDb = ""; +MultiConnection *MainDBConnection = NULL; +bool IsMainDBCommand = false; +FullTransactionId OuterXid; +bool IsMainDB = true; + + +/* + * citus_internal_start_management_transaction starts a management transaction + * in the main database by recording the outer transaction's transaction id and setting + * IsMainDBCommand to true. + */ +Datum +citus_internal_start_management_transaction(PG_FUNCTION_ARGS) +{ + OuterXid = PG_GETARG_FULLTRANSACTIONID(0); + IsMainDBCommand = true; + + PG_RETURN_VOID(); +} + + +/* + * execute_command_on_other_nodes executes the query on the nodes + * other than the current node. + */ +Datum +execute_command_on_other_nodes(PG_FUNCTION_ARGS) +{ + text *queryText = PG_GETARG_TEXT_P(0); + char *query = text_to_cstring(queryText); + + List *queryList = NIL; + queryList = lappend(queryList, "SET citus.enable_metadata_sync TO OFF;"); + queryList = lappend(queryList, query); + queryList = lappend(queryList, "SET citus.enable_metadata_sync TO ON;"); + List *taskList = NodeDDLTaskList(OTHER_NODES, queryList); + DDLJob *ddlJob = NULL; + foreach_ptr(ddlJob, taskList) + { + ExecuteDistributedDDLJob(ddlJob); + } + PG_RETURN_VOID(); +} + + +/* + * commit_management_command_2pc is a wrapper UDF for + * CoordinatedRemoteTransactionsCommit + */ +Datum +commit_management_command_2pc(PG_FUNCTION_ARGS) +{ + CoordinatedRemoteTransactionsCommit(); + + PG_RETURN_VOID(); +} + + +/* + * IsCitusMainDB returns true if this is the Citus main database. + */ +bool +IsCitusMainDB(void) +{ + return IsMainDB; +} + + +/* + * RunCitusMainDBQuery creates a connection to Citus main database if necessary + * and runs the query over the connection in the main database. + */ +void +RunCitusMainDBQuery(char *query) +{ + if (MainDBConnection == NULL) + { + int flags = 0; + MainDBConnection = GetNodeUserDatabaseConnection(flags, LocalHostName, + PostPortNumber, + NULL, MainDb); + RemoteTransactionBegin(MainDBConnection); + } + + SendRemoteCommand(MainDBConnection, query); + ForgetResults(MainDBConnection); +} + + +/* + * CleanCitusMainDBConnection closes and removes the connection to Citus main database. + */ +void +CleanCitusMainDBConnection(void) +{ + if (MainDBConnection == NULL) + { + return; + } + CloseConnection(MainDBConnection); + MainDBConnection = NULL; +} + /* * StartRemoteTransactionBegin initiates beginning the remote transaction in @@ -616,7 +741,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port); if (workerNode != NULL) { - LogTransactionRecord(workerNode->groupId, transaction->preparedName); + LogTransactionRecord(workerNode->groupId, transaction->preparedName, OuterXid); } /* diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 9a7bd908918..c31cab9eaab 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -41,6 +41,7 @@ #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" +#include "distributed/remote_commands.h" #include "distributed/shared_connection_stats.h" #include "distributed/shard_cleaner.h" #include "distributed/subplan_execution.h" @@ -48,6 +49,7 @@ #include "distributed/worker_log_messages.h" #include "distributed/commands.h" #include "distributed/metadata_cache.h" +#include "postmaster/postmaster.h" #include "utils/hsearch.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -317,12 +319,19 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) MemoryContext previousContext = MemoryContextSwitchTo(CitusXactCallbackContext); - if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) + if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED && + !IsMainDBCommand) { /* handles both already prepared and open transactions */ CoordinatedRemoteTransactionsCommit(); } + if (!IsCitusMainDB() && MainDBConnection != NULL) + { + RunCitusMainDBQuery("SELECT commit_management_command_2pc()"); + CleanCitusMainDBConnection(); + } + /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -378,6 +387,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) RemoveIntermediateResultsDirectories(); + CleanCitusMainDBConnection(); + /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) { @@ -537,7 +548,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * us to mark failed placements as invalid. Better don't use * this for anything important (i.e. DDL/metadata). */ - CoordinatedRemoteTransactionsCommit(); + if (IsCitusMainDB()) + { + CoordinatedRemoteTransactionsCommit(); + } CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; } diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index a833f5a4679..82fa63def8f 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -41,10 +41,12 @@ #include "lib/stringinfo.h" #include "storage/lmgr.h" #include "storage/lock.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/xid8.h" /* exports for SQL callable functions */ @@ -81,7 +83,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS) * prepared transaction should be committed. */ void -LogTransactionRecord(int32 groupId, char *transactionName) +LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId outerXid) { Datum values[Natts_pg_dist_transaction]; bool isNulls[Natts_pg_dist_transaction]; @@ -92,6 +94,7 @@ LogTransactionRecord(int32 groupId, char *transactionName) values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId); values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName); + values[Anum_pg_dist_transaction_outerxid - 1] = FullTransactionIdGetDatum(outerXid); /* open transaction relation and insert new tuple */ Relation pgDistTransaction = table_open(DistTransactionRelationId(), @@ -257,6 +260,45 @@ RecoverWorkerTransactions(WorkerNode *workerNode) continue; } + /* Check if the transaction is created by an outer transaction from another database */ + bool outerXidIsNull = false; + Datum outerXidDatum = heap_getattr(heapTuple, + Anum_pg_dist_transaction_outerxid, + tupleDescriptor, &outerXidIsNull); + + if (!outerXidIsNull) + { + FullTransactionId outerFullXid = DatumGetFullTransactionId(outerXidDatum); + TransactionId outerXid = XidFromFullTransactionId(outerFullXid); + bool outerXactIsInProgress = TransactionIdIsInProgress(outerXid); + bool outerXactDidCommit = TransactionIdDidCommit(outerXid); + if (outerXactIsInProgress && !outerXactDidCommit) + { + /* + * The transaction is initiated from an outer transaction and the outer + * transaction is not yet committed, so we should not commit either + */ + hash_search(pendingTransactionSet, transactionName, HASH_REMOVE, + &foundPreparedTransactionBeforeCommit); + continue; + } + else if (!outerXactIsInProgress && !outerXactDidCommit) + { + /* + * Since outer transaction isn't in progress and did not commit we need to + * abort the prepared transaction too. + */ + continue; + } + else + { + /* + * Outer transaction did commit, so we can try to commit the prepared + * transaction too. + */ + } + } + /* * Remove the transaction from the pending list such that only transactions * that need to be aborted remain at the end. diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 3399365aa63..a0cee16df00 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -232,7 +232,9 @@ List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { List *workerNodeList = NIL; - if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES) + if (targetWorkerSet == ALL_SHARD_NODES || + targetWorkerSet == METADATA_NODES || + targetWorkerSet == OTHER_NODES) { workerNodeList = ActivePrimaryNodeList(lockMode); } @@ -264,6 +266,11 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) continue; } + if (targetWorkerSet == OTHER_NODES && workerNode->nodeId == GetLocalNodeId()) + { + continue; + } + result = lappend(result, workerNode); } diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index de56c0e1fb6..67ae08f8542 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress); extern bool IsAnyObjectDistributed(const List *addresses); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); +extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 237df363a13..8a325a5fce1 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -88,6 +88,7 @@ extern List * NodeMetadataCreateCommands(void); extern List * CitusTableMetadataCreateCommandList(Oid relationId); extern List * NodeMetadataDropCommands(void); extern char * MarkObjectsDistributedCreateCommand(List *addresses, + List *names, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations); diff --git a/src/include/distributed/pg_dist_transaction.h b/src/include/distributed/pg_dist_transaction.h index 815633b7030..95658f782ca 100644 --- a/src/include/distributed/pg_dist_transaction.h +++ b/src/include/distributed/pg_dist_transaction.h @@ -35,9 +35,10 @@ typedef FormData_pg_dist_transaction *Form_pg_dist_transaction; * compiler constants for pg_dist_transaction * ---------------- */ -#define Natts_pg_dist_transaction 2 +#define Natts_pg_dist_transaction 3 #define Anum_pg_dist_transaction_groupid 1 #define Anum_pg_dist_transaction_gid 2 +#define Anum_pg_dist_transaction_outerxid 3 #endif /* PG_DIST_TRANSACTION_H */ diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 71cb9dad27f..5b8bff5ceae 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -76,5 +76,7 @@ extern void StoreErrorMessage(MultiConnection *connection, StringInfo queryResul extern bool IsSettingSafeToPropagate(const char *name); +extern bool IsCitusMainDB(void); + #endif /* REMOTE_COMMAND_H */ diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index 6136f25c9f8..d9bf13f5793 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -143,4 +143,12 @@ extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId); +extern void RunCitusMainDBQuery(char *query); +extern void CleanCitusMainDBConnection(void); + +extern bool IsMainDBCommand; +extern bool IsMainDB; +extern char *MainDb; +extern struct MultiConnection *MainDBConnection; + #endif /* REMOTE_TRANSACTION_H */ diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h index 811dbb949f0..a4073875aa2 100644 --- a/src/include/distributed/transaction_recovery.h +++ b/src/include/distributed/transaction_recovery.h @@ -17,7 +17,8 @@ extern int Recover2PCInterval; /* Functions declarations for worker transactions */ -extern void LogTransactionRecord(int32 groupId, char *transactionName); +extern void LogTransactionRecord(int32 groupId, char *transactionName, + FullTransactionId outerXid); extern int RecoverTwoPhaseCommits(void); extern void DeleteWorkerTransactions(WorkerNode *workerNode); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index d622fe366cd..ec49d21e002 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -54,7 +54,12 @@ typedef enum TargetWorkerSet * All the active primary nodes in the metadata which have metadata * (includes the coodinator if it is added) */ - METADATA_NODES + METADATA_NODES, + + /* + * All the active primary nodes in the metadata except the current node + */ + OTHER_NODES } TargetWorkerSet; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 295b10c76e0..321feb352be 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1402,11 +1402,15 @@ SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- | function citus_internal_delete_placement_metadata(bigint) void + | function citus_internal_start_management_transaction(xid8) void | function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void + | function citus_mark_object_distributed(oid,text,oid) void | function citus_pause_node_within_txn(integer,boolean,integer) void | function citus_schema_move(regnamespace,integer,citus.shard_transfer_mode) void | function citus_schema_move(regnamespace,text,integer,citus.shard_transfer_mode) void -(5 rows) + | function commit_management_command_2pc() void + | function execute_command_on_other_nodes(text) void +(9 rows) -- Test downgrade to 12.1-1 from 12.2-1 ALTER EXTENSION citus UPDATE TO '12.2-1'; diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 20cec75783a..0a29a22af65 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -64,7 +64,7 @@ SELECT recover_prepared_transactions(); (1 row) -- delete the citus_122_should_do_nothing transaction -DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *; +DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid; groupid | gid --------------------------------------------------------------------- 122 | citus_122_should_do_nothing diff --git a/src/test/regress/expected/other_databases.out b/src/test/regress/expected/other_databases.out new file mode 100644 index 00000000000..bb91b130002 --- /dev/null +++ b/src/test/regress/expected/other_databases.out @@ -0,0 +1,57 @@ +CREATE SCHEMA other_databases; +SET search_path TO other_databases; +SET citus.next_shard_id TO 10231023; +CREATE DATABASE other_db1; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c other_db1 +SET citus.is_main_db TO false; +SHOW citus.is_main_db; + citus.is_main_db +--------------------------------------------------------------------- + off +(1 row) + +SHOW citus.main_db; + citus.main_db +--------------------------------------------------------------------- + regression +(1 row) + +CREATE USER other_db_user1; +CREATE USER other_db_user2; +BEGIN; +CREATE USER other_db_user3; +COMMIT; +BEGIN; +CREATE USER other_db_user4; +ROLLBACK; +BEGIN; +CREATE USER other_db_user5; +SELECT 1/0; +ERROR: division by zero +COMMIT; +CREATE USER other_db_user6; +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user1 + other_db_user2 + other_db_user3 + other_db_user6 +(4 rows) + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + usename +--------------------------------------------------------------------- + other_db_user1 + other_db_user2 + other_db_user3 + other_db_user6 +(4 rows) + +\c - - - :master_port +DROP SCHEMA other_databases; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 36bd504e88d..1adc29a6e70 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -80,6 +80,7 @@ ORDER BY 1; function citus_internal_is_replication_origin_tracking_active() function citus_internal_local_blocked_processes() function citus_internal_mark_node_not_synced(integer,integer) + function citus_internal_start_management_transaction(xid8) function citus_internal_start_replication_origin_tracking() function citus_internal_stop_replication_origin_tracking() function citus_internal_unregister_tenant_schema_globally(oid,text) @@ -99,6 +100,7 @@ ORDER BY 1; function citus_jsonb_concatenate_final(jsonb) function citus_local_disk_space_stats() function citus_locks() + function citus_mark_object_distributed(oid,text,oid) function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) function citus_node_capacity_1(integer) @@ -166,6 +168,7 @@ ORDER BY 1; function cluster_clock_send(cluster_clock) function column_name_to_column(regclass,text) function column_to_column_name(regclass,text) + function commit_management_command_2pc() function coord_combine_agg(oid,cstring,anyelement) function coord_combine_agg_ffunc(internal,oid,cstring,anyelement) function coord_combine_agg_sfunc(internal,oid,cstring,anyelement) @@ -179,6 +182,7 @@ ORDER BY 1; function drop_old_time_partitions(regclass,timestamp with time zone) function dump_global_wait_edges() function dump_local_wait_edges() + function execute_command_on_other_nodes(text) function fetch_intermediate_results(text[],text,integer) function fix_all_partition_shard_index_names() function fix_partition_shard_index_names(regclass) @@ -343,5 +347,5 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(333 rows) +(337 rows) diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index afc4780bf2b..a04affb9fa4 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -4,6 +4,7 @@ test: failure_test_helpers # this should only be run by pg_regress_multi, you don't need it test: failure_setup test: multi_test_helpers multi_test_helpers_superuser +test: failure_non_main_db_2pc test: failure_parallel_connection test: failure_replicated_partitions test: multi_test_catalog_views diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 2c8f7b085b1..2daefc51154 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -108,6 +108,7 @@ test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes test: background_task_queue_monitor +test: other_databases # Causal clock test test: clock diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 3acde4c3c62..fe8334fca71 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -490,6 +490,7 @@ sub generate_hba push(@pgOptions, "citus.enable_change_data_capture=on"); push(@pgOptions, "citus.stat_tenants_limit = 2"); push(@pgOptions, "citus.stat_tenants_track = 'ALL'"); +push(@pgOptions, "citus.main_db = 'regression'"); # Some tests look at shards in pg_class, make sure we can usually see them: push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); diff --git a/src/test/regress/sql/failure_non_main_db_2pc.sql b/src/test/regress/sql/failure_non_main_db_2pc.sql new file mode 100644 index 00000000000..35978319827 --- /dev/null +++ b/src/test/regress/sql/failure_non_main_db_2pc.sql @@ -0,0 +1,15 @@ +SELECT citus.mitmproxy('conn.allow()'); + +CREATE SCHEMA failure_non_main_db_2pc; +SET SEARCH_PATH TO 'failure_non_main_db_2pc'; + +SET citus.shard_count TO 4; + +CREATE TABLE dist (a INT, b INT); +SELECT create_distributed_table('dist', 'a'); + +SELECT citus.mitmproxy('conn.onQuery(query="commit_management_command_2pc").kill()'); + +CREATE USER user_1; + +SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/sql/multi_mx_transaction_recovery.sql b/src/test/regress/sql/multi_mx_transaction_recovery.sql index 2a6b4991bd2..e46917f3542 100644 --- a/src/test/regress/sql/multi_mx_transaction_recovery.sql +++ b/src/test/regress/sql/multi_mx_transaction_recovery.sql @@ -47,7 +47,7 @@ INSERT INTO pg_dist_transaction VALUES (122, 'citus_122_should_do_nothing'); SELECT recover_prepared_transactions(); -- delete the citus_122_should_do_nothing transaction -DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING *; +DELETE FROM pg_dist_transaction WHERE gid = 'citus_122_should_do_nothing' RETURNING groupid, gid; ROLLBACK PREPARED 'citus_122_should_do_nothing'; SELECT count(*) FROM pg_dist_transaction; diff --git a/src/test/regress/sql/other_databases.sql b/src/test/regress/sql/other_databases.sql new file mode 100644 index 00000000000..492bfe1df95 --- /dev/null +++ b/src/test/regress/sql/other_databases.sql @@ -0,0 +1,36 @@ +CREATE SCHEMA other_databases; +SET search_path TO other_databases; + +SET citus.next_shard_id TO 10231023; + +CREATE DATABASE other_db1; + +\c other_db1 +SHOW citus.main_db; + +CREATE USER other_db_user1; +CREATE USER other_db_user2; + +BEGIN; +CREATE USER other_db_user3; +COMMIT; + +BEGIN; +CREATE USER other_db_user4; +ROLLBACK; + +BEGIN; +CREATE USER other_db_user5; +SELECT 1/0; +COMMIT; + +CREATE USER other_db_user6; + +\c regression +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :worker_1_port +SELECT usename FROM pg_user WHERE usename LIKE 'other\_db\_user%' ORDER BY 1; + +\c - - - :master_port +DROP SCHEMA other_databases;