From da0af6ca9781520884df7c3e7ecb3d5b0e20a2dc Mon Sep 17 00:00:00 2001 From: Anthonin Bonnefoy Date: Thu, 21 Mar 2024 08:37:50 +0100 Subject: [PATCH] Add tracing of parallel queries On parallel query, save the tracecontext in a shared memory. Parallel worker will be able to pull the tracecontext from it and generate spans on their own. --- Makefile | 3 +- doc/pg_tracing.md | 32 ++++++++--- expected/parallel.out | 62 ++++++++++++++++++++++ sql/parallel.sql | 30 +++++++++++ src/pg_tracing.c | 68 ++++++++++++++++++++++++ src/pg_tracing.h | 26 +++++++++ src/pg_tracing_parallel.c | 109 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 321 insertions(+), 9 deletions(-) create mode 100644 expected/parallel.out create mode 100644 sql/parallel.sql create mode 100644 src/pg_tracing_parallel.c diff --git a/Makefile b/Makefile index 8a47e80..fd00397 100644 --- a/Makefile +++ b/Makefile @@ -13,10 +13,11 @@ OBJS = \ $(WIN32RES) \ src/pg_tracing.o \ src/pg_tracing_query_process.o \ + src/pg_tracing_parallel.o \ src/pg_tracing_span.o REGRESSCHECKS = utility select extended insert trigger sample \ - subxact full_buffer nested wal cleanup + parallel subxact full_buffer nested wal cleanup REGRESSCHECKS_OPTS = --no-locale --encoding=UTF8 --temp-config pg_tracing.conf --temp-instance=./tmp_check PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/doc/pg_tracing.md b/doc/pg_tracing.md index 245753c..7257265 100644 --- a/doc/pg_tracing.md +++ b/doc/pg_tracing.md @@ -112,6 +112,26 @@ The `pg_tracing_consume_spans` and `pg_tracing_peek_spans` views are defined in ## Parameters +### pg_tracing.buffer_mode (enum) + +Controls span buffer's behaviour when `pg_tracing.max_span` spans is reached. If `keep_on_full`, the existing buffer is kept while new spans are dropped. If `drop_on_full`, the existing buffer is dropped and new spans are added. The default value is `keep_on_full`. + +### pg_tracing.caller_sample_rate (real) + +Controls the fraction of statements with SQLCommenter tracecontext and an enabled sampled flag that will generate spans. The default value is 1. + +### pg_tracing.export_parameters (boolean) + +Controls whether the query's parameters should be exported in spans metadata. The default value is `on`. + +### pg_tracing.filter_query_ids (string) + +Restrict sampling to the provided queryIds. An empty value won't filter any queries. The default value is empty. + +### pg_tracing.max_parameter_size (integer) + +Controls the maximum size of the parameter string. The default value is 1024. + ### pg_tracing.max_span (integer) Specifies the maximum number of spans stored by the extension. If more spans are generated, the span buffer will be emptied if `pg_tracing.buffer_mode` is set to `drop_on_full`. If `pg_tracing.buffer_mode` is set to `keep_on_full`, the new spans will be dropped and tracing will be aborted. The default value is 5000. This parameter can only be set at server start. @@ -129,13 +149,13 @@ FROM pg_shmem_allocations WHERE name ='PgTracing Spans'; ``` -### pg_tracing.buffer_mode (enum) +### pg_tracing.sample_rate (real) -Controls span buffer's behaviour when `pg_tracing.max_span` spans is reached. If `keep_on_full`, the existing buffer is kept while new spans are dropped. If `drop_on_full`, the existing buffer is dropped and new spans are added. The default value is `keep_on_full`. +Controls the fraction of statements that generate spans. Statements with tracecontext propagated with SQLCommenter and sampled flag enabled are not impacted by this parameter. For traces with nested statements, either all will be explained or none. The default value is 0. -### pg_tracing.max_parameter_size (integer) +### pg_tracing.trace_parallel_workers (boolean) -Controls the maximum size of the parameter string. The default value is 1024. +Controls whether spans should be generated for workers created by parallel queries. ### pg_tracing.track (enum) @@ -153,10 +173,6 @@ Controls the fraction of statements that generate spans. Statements with traceco Controls the fraction of statements with SQLCommenter tracecontext and an enabled sampled flag that will generate spans. The default value is 1. -### pg_tracing.filter_query_ids (string) - -Restrict sampling to the provided queryIds. An empty value won't filter any queries. The default value is empty. - ### pg_tracing.export_parameters (boolean) Controls whether the query's parameters should be exported in spans metadata. The default value is `on`. diff --git a/expected/parallel.out b/expected/parallel.out new file mode 100644 index 0000000..d5b8b49 --- /dev/null +++ b/expected/parallel.out @@ -0,0 +1,62 @@ +begin; +-- encourage use of parallel plans +set local parallel_setup_cost=0; +set local parallel_tuple_cost=0; +set local min_parallel_table_scan_size=0; +set local max_parallel_workers_per_gather=2; +-- Trace parallel queries +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/ select 1 from pg_class limit 1; + ?column? +---------- + 1 +(1 row) + +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000002-0000000000000002-01'*/ select 2 from pg_class limit 1; + ?column? +---------- + 2 +(1 row) + +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000003-0000000000000003-00'*/ select 3 from pg_class limit 1; + ?column? +---------- + 3 +(1 row) + +-- Try with parallel tracing disabled +set local pg_tracing.trace_parallel_workers = false; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000004-0000000000000004-01'*/ select 4 from pg_class limit 1; + ?column? +---------- + 4 +(1 row) + +commit; +-- Get root top span id +SELECT span_id as root_span_id from pg_tracing_peek_spans where span_type='Select query' and trace_id='00000000000000000000000000000001' and parent_id='0000000000000001' \gset +-- Get executor top span id +SELECT span_id as executor_span_id from pg_tracing_peek_spans where span_operation='ExecutorRun' and trace_id='00000000000000000000000000000001' and parent_id=:'root_span_id' \gset +-- Check the select spans that are attached to the root top span +SELECT trace_id, span_type, span_operation from pg_tracing_peek_spans where span_type='Select query' and parent_id=:'executor_span_id' order by span_operation; + trace_id | span_type | span_operation +----------------------------------+--------------+---------------- + 00000000000000000000000000000001 | Select query | Worker 0 + 00000000000000000000000000000001 | Select query | Worker 1 +(2 rows) + +-- Check generated trace_id +SELECT trace_id from pg_tracing_peek_spans group by trace_id; + trace_id +---------------------------------- + 00000000000000000000000000000001 + 00000000000000000000000000000004 + 00000000000000000000000000000002 +(3 rows) + +-- Check number of executor spans +SELECT count(*) from pg_tracing_consume_spans where span_type='Executor'; + count +------- + 7 +(1 row) + diff --git a/sql/parallel.sql b/sql/parallel.sql new file mode 100644 index 0000000..3d982a1 --- /dev/null +++ b/sql/parallel.sql @@ -0,0 +1,30 @@ +begin; +-- encourage use of parallel plans +set local parallel_setup_cost=0; +set local parallel_tuple_cost=0; +set local min_parallel_table_scan_size=0; +set local max_parallel_workers_per_gather=2; + +-- Trace parallel queries +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/ select 1 from pg_class limit 1; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000002-0000000000000002-01'*/ select 2 from pg_class limit 1; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000003-0000000000000003-00'*/ select 3 from pg_class limit 1; + +-- Try with parallel tracing disabled +set local pg_tracing.trace_parallel_workers = false; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000004-0000000000000004-01'*/ select 4 from pg_class limit 1; +commit; + +-- Get root top span id +SELECT span_id as root_span_id from pg_tracing_peek_spans where span_type='Select query' and trace_id='00000000000000000000000000000001' and parent_id='0000000000000001' \gset +-- Get executor top span id +SELECT span_id as executor_span_id from pg_tracing_peek_spans where span_operation='ExecutorRun' and trace_id='00000000000000000000000000000001' and parent_id=:'root_span_id' \gset + +-- Check the select spans that are attached to the root top span +SELECT trace_id, span_type, span_operation from pg_tracing_peek_spans where span_type='Select query' and parent_id=:'executor_span_id' order by span_operation; + +-- Check generated trace_id +SELECT trace_id from pg_tracing_peek_spans group by trace_id; + +-- Check number of executor spans +SELECT count(*) from pg_tracing_consume_spans where span_type='Executor'; diff --git a/src/pg_tracing.c b/src/pg_tracing.c index 3a92d8f..2c5421e 100644 --- a/src/pg_tracing.c +++ b/src/pg_tracing.c @@ -108,6 +108,9 @@ typedef struct pgTracingQueryIdFilter static int pg_tracing_max_span; /* Maximum number of spans to store */ static int pg_tracing_max_parameter_str; /* Maximum number of spans to * store */ +static bool pg_tracing_trace_parallel_workers = true; /* True to generate + * spans from parallel + * workers */ static double pg_tracing_sample_rate = 0; /* Sample rate applied to queries * without SQLCommenter */ static double pg_tracing_caller_sample_rate = 1; /* Sample rate applied to @@ -287,6 +290,17 @@ _PG_init(void) NULL, NULL); + DefineCustomBoolVariable("pg_tracing.trace_parallel_workers", + "Whether to generate samples from parallel workers.", + NULL, + &pg_tracing_trace_parallel_workers, + true, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); + DefineCustomEnumVariable("pg_tracing.track", "Selects which statements are tracked by pg_tracing.", NULL, @@ -415,6 +429,8 @@ pg_tracing_memsize(void) size = add_size(size, sizeof(pgTracingSpans)); /* the span variable array */ size = add_size(size, mul_size(pg_tracing_max_span, sizeof(Span))); + /* and the parallel workers context */ + size = add_size(size, mul_size(max_parallel_workers, sizeof(pgTracingParallelContext))); return size; } @@ -522,6 +538,23 @@ add_str_to_trace_buffer(const char *str, int str_len) return position; } + +/* + * Add the worker name to the provided stringinfo + */ +static int +add_worker_name_to_trace_buffer(StringInfo str_info, int parallel_worker_number) +{ + int position = str_info->cursor; + + Assert(str_len > 0); + + appendStringInfo(str_info, "Worker %d", parallel_worker_number); + appendStringInfoChar(str_info, '\0'); + str_info->cursor = str_info->len; + return position; +} + /* * Store a span in the current_trace_spans buffer */ @@ -767,6 +800,9 @@ pg_tracing_shmem_startup(void) "pg_tracing memory context", ALLOCSET_DEFAULT_SIZES); + /* Initialize shmem for trace propagation to parallel workers */ + pg_tracing_shmem_parallel_startup(); + /* First time, let's init shared state */ if (!found_pg_tracing) { @@ -928,6 +964,17 @@ extract_trace_context(struct pgTracingTraceContext *trace_context, ParseState *p if (pg_tracing_sample_rate == 0 && pg_tracing_caller_sample_rate == 0) return; + /* + * In a parallel worker, check the parallel context shared buffer to see + * if the leader left a trace context + */ + if (IsParallelWorker()) + { + if (pg_tracing_trace_parallel_workers) + fetch_parallel_context(trace_context); + return; + } + Assert(trace_context->root_span.span_id == 0); Assert(traceid_zero(trace_context->traceparent.trace_id)); @@ -975,6 +1022,8 @@ cleanup_tracing(void) !current_trace_context.traceparent.sampled) /* No need for cleaning */ return; + if (pg_tracing_trace_parallel_workers) + remove_parallel_context(); MemoryContextReset(pg_tracing_mem_ctx); reset_trace_context(&root_trace_context); reset_trace_context(¤t_trace_context); @@ -1199,6 +1248,16 @@ begin_top_span(pgTracingTraceContext * trace_context, Span * top_span, NULL, parent_id, per_level_buffers[exec_nested_level].query_id, &start_time); + if (IsParallelWorker()) + { + /* + * In a parallel worker, we use the worker name as the span's + * operation + */ + top_span->operation_name_offset = add_worker_name_to_trace_buffer(current_trace_text, ParallelWorkerNumber); + return; + } + if (jstate && jstate->clocations_count > 0 && query != NULL) { /* jstate is available, normalise query and extract parameters' values */ @@ -1572,6 +1631,7 @@ pg_tracing_ExecutorStart(QueryDesc *queryDesc, int eflags) * ExecutorRun hook: track nesting depth and create ExecutorRun span. * ExecutorRun can create nested queries so we need to create ExecutorRun span * as a top span. + * If the plan needs to create parallel workers, push the trace context in the parallel shared buffer. */ static void pg_tracing_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, @@ -1592,6 +1652,14 @@ pg_tracing_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 cou begin_span(trace_context->traceparent.trace_id, executor_run_span, SPAN_EXECUTOR_RUN, NULL, parent_id, per_level_buffers[exec_nested_level].query_id, &span_start_time); + + /* + * If this query starts parallel worker, push the trace context for + * the child processes + */ + if (queryDesc->plannedstmt->parallelModeNeeded && pg_tracing_trace_parallel_workers) + add_parallel_context(trace_context, executor_run_span->span_id, + per_level_buffers[exec_nested_level].query_id); } exec_nested_level++; diff --git a/src/pg_tracing.h b/src/pg_tracing.h index 7a871c1..719029a 100644 --- a/src/pg_tracing.h +++ b/src/pg_tracing.h @@ -171,6 +171,32 @@ typedef struct pgTracingTraceContext Span root_span; /* Top span for exec_nested_level 0 */ } pgTracingTraceContext; +/* + * A trace context for a specific parallel context + */ +typedef struct pgTracingParallelContext +{ + BackendId leader_backend_id; /* Backend id of the leader, set to + * InvalidBackendId if unused */ + pgTracingTraceContext trace_context; +} pgTracingParallelContext; + +/* + * Store context for parallel workers + */ +typedef struct pgTracingParallelWorkers +{ + slock_t mutex; + pgTracingParallelContext trace_contexts[FLEXIBLE_ARRAY_MEMBER]; +} pgTracingParallelWorkers; + +/* pg_tracing_parallel.c */ +extern void pg_tracing_shmem_parallel_startup(void); +extern void add_parallel_context(const struct pgTracingTraceContext *trace_context, + uint64 parent_id, uint64 query_id); +extern void remove_parallel_context(void); +extern void fetch_parallel_context(pgTracingTraceContext * trace_context); + /* pg_tracing_query_process.c */ extern const char *normalise_query_parameters(const JumbleState *jstate, const char *query, int query_loc, int *query_len_p, char **param_str, diff --git a/src/pg_tracing_parallel.c b/src/pg_tracing_parallel.c new file mode 100644 index 0000000..6239208 --- /dev/null +++ b/src/pg_tracing_parallel.c @@ -0,0 +1,109 @@ +/*------------------------------------------------------------------------- + * + * pg_tracing_parallel.c + * Store, retrieve and remove trace context for parallel workers. + * + * IDENTIFICATION + * contrib/pg_tracing/pg_tracing_parallel.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "storage/shmem.h" +#include "storage/spin.h" +#include "pg_tracing.h" + +/* Shared buffer storing trace context for parallel workers. */ +static pgTracingParallelWorkers * pg_tracing_parallel = NULL; + +/* Index of the parallel worker context shared buffer if any */ +static int parallel_context_index = -1; + +/* + * Allocate share memory for propagation of trace context to parallel workers + */ +void +pg_tracing_shmem_parallel_startup(void) +{ + bool found_parallel; + + /* We won't have more than max_parallel_workers workers */ + pg_tracing_parallel = ShmemInitStruct("PgTracing Parallel Workers Context", + sizeof(pgTracingParallelWorkers) + max_parallel_workers * sizeof(pgTracingParallelContext), + &found_parallel); + if (!found_parallel) + { + SpinLockInit(&pg_tracing_parallel->mutex); + for (int i = 0; i < max_parallel_workers; i++) + pg_tracing_parallel->trace_contexts[i].leader_backend_id = InvalidBackendId; + } +} + +/* + * Push trace context to the shared parallel worker buffer + */ +void +add_parallel_context(const struct pgTracingTraceContext *trace_context, + uint64 parent_id, uint64 query_id) +{ + pgTracingParallelContext *ctx; + + Assert(parallel_context_index == -1); + SpinLockAcquire(&pg_tracing_parallel->mutex); + for (int i = 0; i < max_parallel_workers; i++) + { + ctx = pg_tracing_parallel->trace_contexts + i; + Assert(ctx->leader_backend_id != MyBackendId); + if (ctx->leader_backend_id != InvalidBackendId) + continue; + /* Slot is available */ + parallel_context_index = i; + ctx->leader_backend_id = MyBackendId; + /* We can do the rest outside the lock */ + break; + } + SpinLockRelease(&pg_tracing_parallel->mutex); + + if (parallel_context_index > -1) + { + ctx->trace_context = *trace_context; + /* We don't need to propagate root span index to parallel workers */ + ctx->trace_context.root_span.span_id = 0; + ctx->trace_context.traceparent.parent_id = parent_id; + } +} + +/* + * Remove parallel context for the current leader from the shared memory. + */ +void +remove_parallel_context(void) +{ + if (parallel_context_index < 0) + /* No tracing of parallel workers */ + return; + + SpinLockAcquire(&pg_tracing_parallel->mutex); + pg_tracing_parallel->trace_contexts[parallel_context_index].leader_backend_id = InvalidBackendId; + SpinLockRelease(&pg_tracing_parallel->mutex); + parallel_context_index = -1; +} + +/* + * If we're inside a parallel worker, check if the trace context is stored in shared memory. + * If a trace context exists, it means that the query is sampled and worker tracing is enabled. + */ +void +fetch_parallel_context(pgTracingTraceContext * trace_context) +{ + SpinLockAcquire(&pg_tracing_parallel->mutex); + for (int i = 0; i < max_parallel_workers; i++) + { + if (pg_tracing_parallel->trace_contexts[i].leader_backend_id != ParallelLeaderBackendId) + continue; + /* Found a matching a trace context, fetch it */ + *trace_context = pg_tracing_parallel->trace_contexts[i].trace_context; + } + SpinLockRelease(&pg_tracing_parallel->mutex); +}