Skip to content

Commit

Permalink
Keep original ccnt into query structure (#1064)
Browse files Browse the repository at this point in the history
Problem:
Previous approach to identify a query that is currently being executed was based
on the global gp_command_count variable. Query identification is required by the
monitoring extensions to track the execution process. But gp_command_count is
incremented on each new query. So, in case of a complex query with initplans,
monitoring extensions were not able to track the execution of the query after
the initplan execution had been done, because gp_command_count was already
changed comparing to its value at the query start. To address this issue,
monitoring extensions hacked the gp_command_count value, which is dispatched
from the QD to the QEs (replaced it with the value stored before the init plan
execution). But it led to other problems, for ex. hanging of query execution if
IC proxy is enabled or if the plan had Shared Input scans, as these features
also relied on the gp_command_count value.

Fix:
This patch adds a support of a query monitoring into the GPDB core.
The approach uses already existing (but not previously used in the GPDB core)
field queryCommandId of PGPROC for command identification. The handling for
queryCommandId is reworked:

queryCommandId is set to gp_command_count every time gp_command_count is
incremented on the dispatcher (that’s old behavior, it is not changed).
QueryDesc now contains 1 new field (filled in CreateQueryDesc()) command_id -
the updated value of MyProc->queryCommandId after its increment.
At the Dispatcher side:
Executor switches MyProc->queryCommandId to the command_id of the QueryDesc at
the beginning of ExecutorStart(), ExecutorRun(), ExecutorFinish(),
ExecutorEnd(), and back to the preceding value at their end.
For utility commands, increment_command_count() is added in the
ProcessUtility(). At the end of ProcessUtility(), the value of
MyProc->queryCommandId is restored to preceding value.
MyProc->queryCommandId is added to the query message dispatched to the
executors (instead of gp_command_count). gp_command_count is not sent to
the QEs.
At the Executor side, backend acquires proper queryCommandId from the message
dispatched from the query dispatcher. On the QE the gp_command_count is set to
be equal to the queryCommandId.
Most parts of the code, that used gp_command_count, now use queryCommandId
(some places still use gp_command_count, like for ex. RunawayCleaner. They
require additional analysis and possibly would be updated later).
After this change, monitoring extension should use MyProc->queryCommandId as the
ccnt for the query identification instead of the gp_command_count.

gp_command_count still exists as a global counter at the QD, that is used to
assign a new value to the queryCommandId at the start of query execution.

Ticket: ADBDEV-6375
(cherry picked from commit 7ee713d)

Changes comparing to the original commit:
1. ABI check ignore file is moved to a folder with actual base version;
2. expected files for the test are updated because of GPDB 7 differences in some
queries execution;
3. matchsubs are updated in the test because of GPDB 7 differences;
4. gpperfmon is removed in GPDB 7, so changes in gpperfmon are omitted;
5. external_set_env_vars_ext() is now located in a different file, so the
respective change is done there;
6. sisc_lockname() doesn't exist anymore, so this change is omitted;
7. in get_shareinput_reference() gp_command_count is replaced with
MyProc->queryCommandId;
8. changes from CheckForResetSession() are now done in GpResetSessionIfNeeded(),
as CheckForResetSession() through the times was transormed into
GpDropTempTables(), which in turn was transformed into GpResetSessionIfNeeded().
  • Loading branch information
whitehawk committed Oct 24, 2024
1 parent d15eade commit 03a9bb3
Show file tree
Hide file tree
Showing 30 changed files with 2,091 additions and 59 deletions.
1 change: 1 addition & 0 deletions .abi-check/7.2.0_arenadata5/postgres.types.ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
struct QueryDesc
5 changes: 3 additions & 2 deletions src/backend/access/external/url.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "libpq/libpq-be.h"
#include "miscadmin.h"
#include "postmaster/postmaster.h" /* postmaster port */
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
Expand Down Expand Up @@ -109,9 +110,9 @@ external_set_env_vars_ext(extvar_t *extvar, char *uri, bool csv, char *escape, c
* "session id"-"command id" to identify the transaction.
*/
if (!getDistributedTransactionIdentifier(extvar->GP_XID))
sprintf(extvar->GP_XID, "%u-%.10u", gp_session_id, gp_command_count);
sprintf(extvar->GP_XID, "%u-%.10u", gp_session_id, MyProc->queryCommandId);

sprintf(extvar->GP_CID, "%x", gp_command_count);
sprintf(extvar->GP_CID, "%x", MyProc->queryCommandId);
sprintf(extvar->GP_SN, "%x", scancounter);
sprintf(extvar->GP_SEGMENT_ID, "%d", GpIdentity.segindex);
sprintf(extvar->GP_SEG_PORT, "%d", PostPortNumber);
Expand Down
3 changes: 2 additions & 1 deletion src/backend/cdb/cdbdtxcontextinfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "cdb/cdbvars.h"
#include "cdb/cdbtm.h"
#include "access/xact.h"
#include "storage/proc.h"
#include "utils/guc.h"
#include "utils/session_state.h"

Expand Down Expand Up @@ -62,7 +63,7 @@ DtxContextInfo_CreateOnCoordinator(DtxContextInfo *dtxContextInfo, bool inCursor

AssertImply(inCursor,
dtxContextInfo->distributedXid != InvalidDistributedTransactionId &&
gp_command_count == MySessionState->latestCursorCommandId);
MyProc->queryCommandId == MySessionState->latestCursorCommandId);

dtxContextInfo->cursorContext = inCursor;
dtxContextInfo->nestingLevel = GetCurrentTransactionNestLevel();
Expand Down
4 changes: 3 additions & 1 deletion src/backend/cdb/cdbthreadlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "miscadmin.h"
#include "pgtime.h"
#include "postmaster/syslogger.h"
#include "storage/proc.h"


#ifndef _WIN32
Expand Down Expand Up @@ -166,7 +167,8 @@ write_log(const char *fmt,...)
sprintf(tempbuf, "%d", MyProcPid);
strcat(logprefix, tempbuf); /* pid */
strcat(logprefix, "|");
sprintf(tempbuf, "con%d cmd%d", gp_session_id, gp_command_count);
sprintf(tempbuf, "con%d cmd%d",
gp_session_id, MyProc != NULL ? MyProc->queryCommandId : 0);
strcat(logprefix, tempbuf);

strcat(logprefix, "|");
Expand Down
6 changes: 1 addition & 5 deletions src/backend/cdb/cdbvars.c
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ gpvars_check_rg_query_fixed_mem(int *newval, void **extra, GucSource source)
/*
* increment_command_count
* Increment gp_command_count. If the new command count is 0 or a negative number, reset it to 1.
* And keep MyProc->queryCommandId synced with gp_command_count.
* And update MyProc->queryCommandId.
*/
void
increment_command_count()
Expand All @@ -662,10 +662,6 @@ increment_command_count()
if (gp_command_count <= 0)
gp_command_count = 1;

/*
* No need to maintain MyProc->queryCommandId elsewhere, we guarantee
* they are always synced here.
*/
MyProc->queryCommandId = gp_command_count;
}

Expand Down
16 changes: 11 additions & 5 deletions src/backend/cdb/dispatcher/cdbdisp_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "catalog/namespace.h" /* for GetTempNamespaceState() */
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/datum.h"
#include "utils/guc.h"
Expand Down Expand Up @@ -82,6 +83,7 @@ typedef struct DispatchCommandQueryParms
*/
const char *strCommand;
int strCommandlen;
int queryCommandId;
char *serializedPlantree;
int serializedPlantreelen;
char *serializedQueryDispatchDesc;
Expand Down Expand Up @@ -259,7 +261,7 @@ CdbDispatchPlan(struct QueryDesc *queryDesc,
*/
if (queryDesc->extended_query)
{
verify_shared_snapshot_ready(gp_command_count);
verify_shared_snapshot_ready(MyProc->queryCommandId);
}

cdbdisp_dispatchX(queryDesc, planRequiresTxn, cancelOnError);
Expand Down Expand Up @@ -560,6 +562,7 @@ cdbdisp_buildCommandQueryParms(const char *strCommand, int flags)

pQueryParms = palloc0(sizeof(*pQueryParms));
pQueryParms->strCommand = strCommand;
pQueryParms->queryCommandId = MyProc->queryCommandId;
pQueryParms->serializedQueryDispatchDesc = NULL;
pQueryParms->serializedQueryDispatchDesclen = 0;

Expand Down Expand Up @@ -631,6 +634,7 @@ cdbdisp_buildUtilityQueryParms(struct Node *stmt,

pQueryParms = palloc0(sizeof(*pQueryParms));
pQueryParms->strCommand = PointerIsValid(debug_query_string) ? debug_query_string : "";
pQueryParms->queryCommandId = MyProc->queryCommandId;
pQueryParms->serializedPlantree = serializedPlantree;
pQueryParms->serializedPlantreelen = serializedPlantree_len;
pQueryParms->serializedQueryDispatchDesc = serializedQueryDispatchDesc;
Expand Down Expand Up @@ -690,6 +694,7 @@ cdbdisp_buildPlanQueryParms(struct QueryDesc *queryDesc,
sddesc = serializeNode((Node *) queryDesc->ddesc, &sddesc_len, NULL /* uncompressed_size */ );

pQueryParms->strCommand = queryDesc->sourceText;
pQueryParms->queryCommandId = MyProc->queryCommandId;
pQueryParms->serializedPlantree = splan;
pQueryParms->serializedPlantreelen = splan_len;
pQueryParms->serializedQueryDispatchDesc = sddesc;
Expand Down Expand Up @@ -892,6 +897,7 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms,
const char *dtxContextInfo = pQueryParms->serializedDtxContextInfo;
int dtxContextInfo_len = pQueryParms->serializedDtxContextInfolen;
int64 currentStatementStartTimestamp = GetCurrentStatementStartTimestamp();
int queryCommandId = pQueryParms->queryCommandId;
Oid sessionUserId = GetSessionUserId();
Oid outerUserId = GetOuterUserId();
Oid currentUserId = GetUserId();
Expand Down Expand Up @@ -934,7 +940,7 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms,

total_query_len = 1 /* 'M' */ +
sizeof(len) /* message length */ +
sizeof(gp_command_count) +
sizeof(queryCommandId) +
sizeof(sessionUserId) /* sessionUserIsSuper */ +
sizeof(outerUserId) /* outerUserIsSuper */ +
sizeof(currentUserId) +
Expand Down Expand Up @@ -963,9 +969,9 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms,

pos += 4; /* placeholder for message length */

tmp = htonl(gp_command_count);
memcpy(pos, &tmp, sizeof(gp_command_count));
pos += sizeof(gp_command_count);
tmp = htonl(queryCommandId);
memcpy(pos, &tmp, sizeof(queryCommandId));
pos += sizeof(queryCommandId);

tmp = htonl(sessionUserId);
memcpy(pos, &tmp, sizeof(sessionUserId));
Expand Down
1 change: 1 addition & 0 deletions src/backend/cdb/dispatcher/cdbgang.c
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ GpResetSessionIfNeeded(void)

gp_session_id = newSessionId;
gp_command_count = 0;
MyProc->queryCommandId = 0;
pgstat_report_sessionid(newSessionId);

/* Update the slotid for our singleton reader. */
Expand Down
4 changes: 2 additions & 2 deletions src/backend/cdb/endpoint/cdbendpointutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -574,14 +574,14 @@ generate_endpoint_name(char *name, const char *cursorName)
len += ENDPOINT_NAME_SESSIONID_LEN;

/*
* part3: gp_command_count In theory cursor name + gp_session_id is
* part3: MyProc->queryCommandId. In theory cursor name + gp_session_id is
* enough, but we'd keep this part to avoid confusion or potential issues
* for the scenario that in the same session (thus same gp_session_id),
* two endpoints with same cursor names (happens the cursor is
* dropped/rollbacked and then recreated) and retrieve the endpoints would
* be confusing for users that in the same retrieve connection.
*/
snprintf(name + len, ENDPOINT_NAME_COMMANDID_LEN + 1, "%08x", gp_command_count);
snprintf(name + len, ENDPOINT_NAME_COMMANDID_LEN + 1, "%08x", MyProc->queryCommandId);
len += ENDPOINT_NAME_COMMANDID_LEN;

name[len] = '\0';
Expand Down
3 changes: 2 additions & 1 deletion src/backend/cdb/motion/ic_proxy_backend.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "cdb/cdbvars.h"
#include "cdb/ml_ipc.h"
#include "executor/execdesc.h"
#include "storage/proc.h"
#include "storage/shmem.h"

#include "ic_proxy.h"
Expand Down Expand Up @@ -444,7 +445,7 @@ ic_proxy_backend_connect(ICProxyBackendContext *context, ChunkTransportStateEntr
/* message key for a HELLO message */
ic_proxy_key_init(&backend->key, /* key itself */
gp_session_id, /* sessionId */
gp_command_count, /* commandId */
MyProc->queryCommandId, /* commandId */
pEntry->sendSlice->sliceIndex, /* sendSliceIndex */
pEntry->recvSlice->sliceIndex, /* recvSliceIndex */
GpIdentity.segindex, /* localContentId */
Expand Down
3 changes: 2 additions & 1 deletion src/backend/cdb/motion/ic_udpifc.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
Expand Down Expand Up @@ -3087,7 +3088,7 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
}
}

addCursorIcEntry(ich_table, sliceTable->ic_instance_id, gp_command_count);
addCursorIcEntry(ich_table, sliceTable->ic_instance_id, MyProc->queryCommandId);
/* save the latest transaction id */
rx_control_info.lastDXatId = distTransId;
}
Expand Down
116 changes: 104 additions & 12 deletions src/backend/executor/execMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@
queryDesc->ddesc->parallelCursorName && \
strlen(queryDesc->ddesc->parallelCursorName) > 0)

#define UPDATE_COMMAND_ID_AT_START(queryDesc, prevCommandId) UpdateCommandId(queryDesc, prevCommandId, __FUNCTION__, true)
#define UPDATE_COMMAND_ID(queryDesc, prevCommandId) UpdateCommandId(queryDesc, prevCommandId, __FUNCTION__, false)
#define RESTORE_COMMAND_ID(queryDesc, prevCommandId) RestoreCommandId(queryDesc, prevCommandId, __FUNCTION__)

/* Hooks for plugins to get control in ExecutorStart/Run/Finish/End */
ExecutorStart_hook_type ExecutorStart_hook = NULL;
ExecutorRun_hook_type ExecutorRun_hook = NULL;
Expand Down Expand Up @@ -183,9 +187,48 @@ static char *ExecBuildSlotValueDescription(Oid reloid,
int maxfieldlen);
static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);

static inline void UpdateCommandId(QueryDesc *queryDesc, int *prevCommandId, const char *functionName, bool trackStart);
static inline void RestoreCommandId(QueryDesc *queryDesc, int prevCommandId, const char *functionName);

/* end of local decls */


static inline void
UpdateCommandId(QueryDesc *queryDesc, int *prevCommandId, const char *functionName, bool trackStart)
{
if (Gp_role != GP_ROLE_EXECUTE)
{
*prevCommandId = MyProc->queryCommandId;
MyProc->queryCommandId = queryDesc->command_id;
}

#ifdef FAULT_INJECTOR
if (SIMPLE_FAULT_INJECTOR("track_query_command_id") == FaultInjectorTypeSkip ||
(trackStart && SIMPLE_FAULT_INJECTOR("track_query_command_id_at_start") == FaultInjectorTypeSkip))
elog(NOTICE,
"START %s | Q: %s | QUERY ID: %d",
functionName,
queryDesc->sourceText,
MyProc->queryCommandId);
#endif
}

static inline void
RestoreCommandId(QueryDesc *queryDesc, int prevCommandId, const char *functionName)
{
#ifdef FAULT_INJECTOR
if (SIMPLE_FAULT_INJECTOR("track_query_command_id") == FaultInjectorTypeSkip)
elog(NOTICE,
"END %s | Q: %s | QUERY ID: %d",
functionName,
queryDesc->sourceText,
MyProc->queryCommandId);
#endif

if (Gp_role != GP_ROLE_EXECUTE)
MyProc->queryCommandId = prevCommandId;
}

/* ----------------------------------------------------------------
* ExecutorStart
*
Expand Down Expand Up @@ -216,10 +259,24 @@ static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);
void
ExecutorStart(QueryDesc *queryDesc, int eflags)
{
if (ExecutorStart_hook)
(*ExecutorStart_hook) (queryDesc, eflags);
else
standard_ExecutorStart(queryDesc, eflags);
int prevCommandId = 0;
UPDATE_COMMAND_ID_AT_START(queryDesc, &prevCommandId);

PG_TRY();
{
if (ExecutorStart_hook)
(*ExecutorStart_hook) (queryDesc, eflags);
else
standard_ExecutorStart(queryDesc, eflags);
}
PG_CATCH();
{
RESTORE_COMMAND_ID(queryDesc, prevCommandId);
PG_RE_THROW();
}
PG_END_TRY();

RESTORE_COMMAND_ID(queryDesc, prevCommandId);
}

void
Expand Down Expand Up @@ -784,6 +841,10 @@ ExecutorRun(QueryDesc *queryDesc,
* at the definition of the static variable executor_run_nesting_level.
*/
executor_run_nesting_level++;

int prevCommandId = 0;
UPDATE_COMMAND_ID(queryDesc, &prevCommandId);

PG_TRY();
{
if (ExecutorRun_hook)
Expand All @@ -795,9 +856,12 @@ ExecutorRun(QueryDesc *queryDesc,
PG_CATCH();
{
executor_run_nesting_level--;
RESTORE_COMMAND_ID(queryDesc, prevCommandId);
PG_RE_THROW();
}
PG_END_TRY();

RESTORE_COMMAND_ID(queryDesc, prevCommandId);
}

void
Expand Down Expand Up @@ -1058,10 +1122,24 @@ standard_ExecutorRun(QueryDesc *queryDesc,
void
ExecutorFinish(QueryDesc *queryDesc)
{
if (ExecutorFinish_hook)
(*ExecutorFinish_hook) (queryDesc);
else
standard_ExecutorFinish(queryDesc);
int prevCommandId = 0;
UPDATE_COMMAND_ID(queryDesc, &prevCommandId);

PG_TRY();
{
if (ExecutorFinish_hook)
(*ExecutorFinish_hook) (queryDesc);
else
standard_ExecutorFinish(queryDesc);
}
PG_CATCH();
{
RESTORE_COMMAND_ID(queryDesc, prevCommandId);
PG_RE_THROW();
}
PG_END_TRY();

RESTORE_COMMAND_ID(queryDesc, prevCommandId);
}

void
Expand Down Expand Up @@ -1118,10 +1196,24 @@ standard_ExecutorFinish(QueryDesc *queryDesc)
void
ExecutorEnd(QueryDesc *queryDesc)
{
if (ExecutorEnd_hook)
(*ExecutorEnd_hook) (queryDesc);
else
standard_ExecutorEnd(queryDesc);
int prevCommandId = 0;
UPDATE_COMMAND_ID(queryDesc, &prevCommandId);

PG_TRY();
{
if (ExecutorEnd_hook)
(*ExecutorEnd_hook) (queryDesc);
else
standard_ExecutorEnd(queryDesc);
}
PG_CATCH();
{
RESTORE_COMMAND_ID(queryDesc, prevCommandId);
PG_RE_THROW();
}
PG_END_TRY();

RESTORE_COMMAND_ID(queryDesc, prevCommandId);
}

void
Expand Down
Loading

0 comments on commit 03a9bb3

Please sign in to comment.