Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Nov 10, 2023
1 parent ab7bf64 commit 78be00b
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 64 deletions.
12 changes: 6 additions & 6 deletions src/backend/distributed/commands/dependencies.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ static char * DropTableIfExistsCommand(Oid relationId);
* Note; only the actual objects are created via a separate session, the records to
* pg_dist_object are created in this session. As a side effect the objects could be
* created on the nodes without a catalog entry. Updates to the objects on local node
* are not propagated to the other nodes until the record is visible on local node.
* are not propagated to the remote nodes until the record is visible on local node.
*
* This is solved by creating the dependencies in an idempotent manner, either via
* postgres native CREATE IF NOT EXISTS, or citus helper functions.
Expand Down Expand Up @@ -95,7 +95,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* either get it now, or get it in citus_add_node after this transaction finishes and
* the pg_dist_object record becomes visible.
*/
List *otherNodes = ActivePrimaryOtherNodesList(RowShareLock);
List *remoteNodeList = ActivePrimaryRemoteNodeList(RowShareLock);

/*
* Lock dependent objects explicitly to make sure same DDL command won't be sent
Expand Down Expand Up @@ -127,12 +127,12 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
*/
if (HasAnyDependencyInPropagatedObjects(target))
{
SendCommandListToOtherNodesWithMetadata(ddlCommands);
SendCommandListToRemoteNodesWithMetadata(ddlCommands);
}
else
{
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, otherNodes)
foreach_ptr(workerNode, remoteNodeList)
{
const char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
Expand All @@ -144,8 +144,8 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
}

/*
* We do this after creating the objects on other nodes, we make sure
* that objects have been created on other nodes before marking them
* We do this after creating the objects on remote nodes, we make sure
* that objects have been created on remote nodes before marking them
* distributed, so MarkObjectDistributed wouldn't fail.
*/
foreach_ptr(dependency, dependenciesWithCommands)
Expand Down
12 changes: 6 additions & 6 deletions src/backend/distributed/commands/role.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString)
(void *) CreateAlterRoleIfExistsCommand(stmt),
ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(OTHER_NODES, commands);
return NodeDDLTaskList(REMOTE_NODES, commands);
}


Expand Down Expand Up @@ -240,7 +240,7 @@ PreprocessAlterRoleSetStmt(Node *node, const char *queryString,
(void *) sql,
ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(OTHER_NODES, commandList);
return NodeDDLTaskList(REMOTE_NODES, commandList);
}


Expand Down Expand Up @@ -946,7 +946,7 @@ PreprocessCreateRoleStmt(Node *node, const char *queryString,

commands = lappend(commands, ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(OTHER_NODES, commands);
return NodeDDLTaskList(REMOTE_NODES, commands);
}


Expand Down Expand Up @@ -1055,7 +1055,7 @@ PreprocessDropRoleStmt(Node *node, const char *queryString,
sql,
ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(OTHER_NODES, commands);
return NodeDDLTaskList(REMOTE_NODES, commands);
}


Expand Down Expand Up @@ -1172,7 +1172,7 @@ PreprocessGrantRoleStmt(Node *node, const char *queryString,
sql,
ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(OTHER_NODES, commands);
return NodeDDLTaskList(REMOTE_NODES, commands);
}


Expand Down Expand Up @@ -1345,7 +1345,7 @@ PreprocessAlterRoleRenameStmt(Node *node, const char *queryString,
(void *) sql,
ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(OTHER_NODES, commands);
return NodeDDLTaskList(REMOTE_NODES, commands);
}


Expand Down
14 changes: 7 additions & 7 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ IsDropSchemaOrDB(Node *parsetree)
* CoordinatedTransactionCallback function.
*
* The function errors out if the DDL is on a partitioned table which has replication
* factor > 1 or if the the coordinator is not added into metadata and we're on a
* factor > 1, or if the the coordinator is not added into metadata and we're on a
* worker node because we want to make sure that distributed DDL jobs are executed
* on the coordinator node too. See EnsurePropagationToCoordinator() for more details.
*/
Expand Down Expand Up @@ -1140,23 +1140,23 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
{
if (shouldSyncMetadata)
{
SendCommandToOtherNodesWithMetadata(DISABLE_DDL_PROPAGATION);
SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION);

char *currentSearchPath = CurrentSearchPath();

/*
* Given that we're relaying the query to the other nodes directly,
* Given that we're relaying the query to the remote nodes directly,
* we should set the search path exactly the same when necessary.
*/
if (currentSearchPath != NULL)
{
SendCommandToOtherNodesWithMetadata(
SendCommandToRemoteNodesWithMetadata(
psprintf("SET LOCAL search_path TO %s;", currentSearchPath));
}

if (ddlJob->metadataSyncCommand != NULL)
{
SendCommandToOtherNodesWithMetadata(
SendCommandToRemoteNodesWithMetadata(
(char *) ddlJob->metadataSyncCommand);
}
}
Expand Down Expand Up @@ -1236,7 +1236,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
char *currentSearchPath = CurrentSearchPath();

/*
* Given that we're relaying the query to the other nodes directly,
* Given that we're relaying the query to the remote nodes directly,
* we should set the search path exactly the same when necessary.
*/
if (currentSearchPath != NULL)
Expand All @@ -1248,7 +1248,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)

commandList = lappend(commandList, (char *) ddlJob->metadataSyncCommand);

SendBareCommandListToOtherMetadataNodes(commandList);
SendBareCommandListToRemoteMetadataNodes(commandList);
}
}
PG_CATCH();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void)
* seem to cause any problems as none of the placements that we are
* going to access would be on the new node.
*/
List *otherNodes = ActivePrimaryOtherNodesList(NoLock);
EnsureConnectionPossibilityForNodeList(otherNodes);
List *remoteNodeList = ActivePrimaryRemoteNodeList(NoLock);
EnsureConnectionPossibilityForNodeList(remoteNodeList);
}


Expand Down
8 changes: 4 additions & 4 deletions src/backend/distributed/metadata/distobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ ObjectExists(const ObjectAddress *address)
/*
* MarkObjectDistributed marks an object as a distributed object. Marking is done
* by adding appropriate entries to citus.pg_dist_object and also marking the object
* as distributed by opening a connection using current user to all other nodes
* as distributed by opening a connection using current user to all remote nodes
* with metadata if object propagation is on.
*
* This function should be used if the user creating the given object. If you want
Expand All @@ -164,15 +164,15 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
SendCommandToOtherNodesWithMetadata(workerPgDistObjectUpdateCommand);
SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand);
}
}


/*
* MarkObjectDistributedViaSuperUser marks an object as a distributed object. Marking
* is done by adding appropriate entries to citus.pg_dist_object and also marking the
* object as distributed by opening a connection using super user to all other nodes
* object as distributed by opening a connection using super user to all remote nodes
* with metadata if object propagation is on.
*
* This function should be used to mark dependent object as distributed. If you want
Expand All @@ -187,7 +187,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
SendCommandToOtherNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/backend/distributed/operations/worker_node_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,11 @@ ActivePrimaryNodeList(LOCKMODE lockMode)


/*
* ActivePrimaryOtherNodesList returns a list of all active primary nodes in
* ActivePrimaryRemoteNodeList returns a list of all active primary nodes in
* workerNodeHash except the local one.
*/
List *
ActivePrimaryOtherNodesList(LOCKMODE lockMode)
ActivePrimaryRemoteNodeList(LOCKMODE lockMode)
{
EnsureModificationsCanRun();
return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryAndRemote);
Expand Down
60 changes: 30 additions & 30 deletions src/backend/distributed/transaction/worker_transaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
#include "utils/memutils.h"
#include "utils/builtins.h"

static void SendCommandToOtherMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues);
static void SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues);
static void SendBareCommandListToMetadataNodesInternal(List *commandList,
TargetWorkerSet targetWorkerSet);
static void SendCommandToMetadataWorkersParams(const char *command,
Expand Down Expand Up @@ -157,20 +157,20 @@ SendCommandListToWorkersWithMetadata(List *commands)


/*
* SendCommandToOtherNodesWithMetadata sends a command to other nodes in
* SendCommandToRemoteNodesWithMetadata sends a command to remote nodes in
* parallel. Commands are committed on the nodes when the local transaction
* commits.
*/
void
SendCommandToOtherNodesWithMetadata(const char *command)
SendCommandToRemoteNodesWithMetadata(const char *command)
{
SendCommandToOtherMetadataNodesParams(command, CurrentUserName(),
0, NULL, NULL);
SendCommandToRemoteMetadataNodesParams(command, CurrentUserName(),
0, NULL, NULL);
}


/*
* SendCommandToOtherNodesWithMetadataViaSuperUser sends a command to other
* SendCommandToRemoteNodesWithMetadataViaSuperUser sends a command to remote
* nodes in parallel by opening a super user connection. Commands are committed
* on the nodes when the local transaction commits. The connection are made as
* the extension owner to ensure write access to the Citus metadata tables.
Expand All @@ -180,46 +180,46 @@ SendCommandToOtherNodesWithMetadata(const char *command)
* tuples for dependent objects.
*/
void
SendCommandToOtherNodesWithMetadataViaSuperUser(const char *command)
SendCommandToRemoteNodesWithMetadataViaSuperUser(const char *command)
{
SendCommandToOtherMetadataNodesParams(command, CitusExtensionOwnerName(),
0, NULL, NULL);
SendCommandToRemoteMetadataNodesParams(command, CitusExtensionOwnerName(),
0, NULL, NULL);
}


/*
* SendCommandListToOtherNodesWithMetadata sends all commands to other nodes
* with the current user. See `SendCommandToOtherNodesWithMetadata`for details.
* SendCommandListToRemoteNodesWithMetadata sends all commands to remote nodes
* with the current user. See `SendCommandToRemoteNodesWithMetadata`for details.
*/
void
SendCommandListToOtherNodesWithMetadata(List *commands)
SendCommandListToRemoteNodesWithMetadata(List *commands)
{
char *command = NULL;
foreach_ptr(command, commands)
{
SendCommandToOtherNodesWithMetadata(command);
SendCommandToRemoteNodesWithMetadata(command);
}
}


/*
* SendCommandToOtherMetadataNodesParams is a wrapper around
* SendCommandToRemoteMetadataNodesParams is a wrapper around
* SendCommandToWorkersParamsInternal() that can be used to send commands
* to other metadata nodes.
* to remote metadata nodes.
*/
static void
SendCommandToOtherMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues)
SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues)
{
/* use METADATA_NODES so that ErrorIfAnyMetadataNodeOutOfSync checks local node as well */
List *workerNodeList = TargetWorkerSetNodeList(METADATA_NODES,
RowShareLock);

ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);

SendCommandToWorkersParamsInternal(OTHER_METADATA_NODES, command, user,
SendCommandToWorkersParamsInternal(REMOTE_METADATA_NODES, command, user,
parameterCount, parameterTypes, parameterValues);
}

Expand All @@ -236,9 +236,9 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{
workerNodeList = ActivePrimaryNodeList(lockMode);
}
else if (targetWorkerSet == OTHER_NODES || targetWorkerSet == OTHER_METADATA_NODES)
else if (targetWorkerSet == REMOTE_NODES || targetWorkerSet == REMOTE_METADATA_NODES)
{
workerNodeList = ActivePrimaryOtherNodesList(lockMode);
workerNodeList = ActivePrimaryRemoteNodeList(lockMode);
}
else if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES ||
targetWorkerSet == NON_COORDINATOR_NODES)
Expand All @@ -257,7 +257,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
foreach_ptr(workerNode, workerNodeList)
{
if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES ||
targetWorkerSet == OTHER_METADATA_NODES ||
targetWorkerSet == REMOTE_METADATA_NODES ||
targetWorkerSet == METADATA_NODES) &&
!workerNode->hasMetadata)
{
Expand All @@ -272,15 +272,15 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)


/*
* SendBareCommandListToOtherMetadataNodes is a wrapper around
* SendBareCommandListToRemoteMetadataNodes is a wrapper around
* SendBareCommandListToMetadataNodesInternal() that can be used to send
* bare commands to other metadata nodes.
* bare commands to remote metadata nodes.
*/
void
SendBareCommandListToOtherMetadataNodes(List *commandList)
SendBareCommandListToRemoteMetadataNodes(List *commandList)
{
SendBareCommandListToMetadataNodesInternal(commandList,
OTHER_METADATA_NODES);
REMOTE_METADATA_NODES);
}


Expand Down
2 changes: 1 addition & 1 deletion src/include/distributed/worker_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ extern uint32 ActivePrimaryNonCoordinatorNodeCount(void);
extern uint32 ActiveReadableNodeCount(void);
extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode);
extern List * ActivePrimaryNodeList(LOCKMODE lockMode);
extern List * ActivePrimaryOtherNodesList(LOCKMODE lockMode);
extern List * ActivePrimaryRemoteNodeList(LOCKMODE lockMode);
extern bool CoordinatorAddedAsWorkerNode(void);
extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode);
extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void);
Expand Down
12 changes: 6 additions & 6 deletions src/include/distributed/worker_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ typedef enum TargetWorkerSet
* All the active primary nodes in the metadata which have metadata
* except the local node
*/
OTHER_METADATA_NODES,
REMOTE_METADATA_NODES,

/*
* All the active primary nodes in the metadata except the coordinator
Expand All @@ -43,7 +43,7 @@ typedef enum TargetWorkerSet
/*
* All the active primary nodes in the metadata except the local node
*/
OTHER_NODES,
REMOTE_NODES,

/*
* All active primary nodes in the metadata
Expand Down Expand Up @@ -85,10 +85,10 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons
extern void SendCommandToWorkersWithMetadata(const char *command);
extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command);
extern void SendCommandListToWorkersWithMetadata(List *commands);
extern void SendCommandToOtherNodesWithMetadata(const char *command);
extern void SendCommandToOtherNodesWithMetadataViaSuperUser(const char *command);
extern void SendCommandListToOtherNodesWithMetadata(List *commands);
extern void SendBareCommandListToOtherMetadataNodes(List *commandList);
extern void SendCommandToRemoteNodesWithMetadata(const char *command);
extern void SendCommandToRemoteNodesWithMetadataViaSuperUser(const char *command);
extern void SendCommandListToRemoteNodesWithMetadata(List *commands);
extern void SendBareCommandListToRemoteMetadataNodes(List *commandList);
extern void SendBareCommandListToMetadataWorkers(List *commandList);
extern void EnsureNoModificationsHaveBeenDone(void);
extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
Expand Down

0 comments on commit 78be00b

Please sign in to comment.