Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/jackc/pgx.v5: wrap previous tracer #2932

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions contrib/jackc/pgx.v5/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,33 @@ func init() {
tracer.MarkIntegrationImported("github.com/jackc/pgx.v5")
}

// Deprecated: this type is unused internally so it will be removed in a future release, please use pgx.Batch instead.
type Batch = pgx.Batch

// Connect is equivalent to pgx.Connect providing a connection augmented with tracing.
func Connect(ctx context.Context, connString string, opts ...Option) (*pgx.Conn, error) {
connConfig, err := pgx.ParseConfig(connString)
if err != nil {
return nil, err
}

return ConnectConfig(ctx, connConfig, opts...)
}

// ConnectConfig is equivalent to pgx.ConnectConfig providing a connection augmented with tracing.
func ConnectConfig(ctx context.Context, connConfig *pgx.ConnConfig, opts ...Option) (*pgx.Conn, error) {
// The tracer must be set in the config before calling connect
// as pgx takes ownership of the config. QueryTracer traces
// may work, but none of the others will, as they're set in
// unexported fields in the config in the pgx.connect function.
connConfig.Tracer = newPgxTracer(opts...)
connConfig.Tracer = wrapPgxTracer(connConfig.Tracer, opts...)
return pgx.ConnectConfig(ctx, connConfig)
}

// ConnectWithOptions is equivalent to pgx.ConnectWithOptions providing a connection augmented with tracing.
func ConnectWithOptions(ctx context.Context, connString string, options pgx.ParseConfigOptions, tracerOpts ...Option) (*pgx.Conn, error) {
connConfig, err := pgx.ParseConfigWithOptions(connString, options)
if err != nil {
return nil, err
}
return ConnectConfig(ctx, connConfig, tracerOpts...)
}
102 changes: 87 additions & 15 deletions contrib/jackc/pgx.v5/pgx_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,82 @@ func (tb *tracedBatchQuery) finish() {
tb.span.Finish(tracer.WithError(tb.data.Err))
}

type allPgxTracers interface {
pgx.QueryTracer
pgx.BatchTracer
pgx.ConnectTracer
pgx.PrepareTracer
pgx.CopyFromTracer
pgxpool.AcquireTracer
}

type wrappedPgxTracer struct {
query pgx.QueryTracer
batch pgx.BatchTracer
connect pgx.ConnectTracer
prepare pgx.PrepareTracer
copyFrom pgx.CopyFromTracer
poolAcquire pgxpool.AcquireTracer
}

type pgxTracer struct {
cfg *config
prevBatchQuery *tracedBatchQuery
wrapped wrappedPgxTracer
}

var (
_ pgx.QueryTracer = (*pgxTracer)(nil)
_ pgx.BatchTracer = (*pgxTracer)(nil)
_ pgx.ConnectTracer = (*pgxTracer)(nil)
_ pgx.PrepareTracer = (*pgxTracer)(nil)
_ pgx.CopyFromTracer = (*pgxTracer)(nil)
_ pgxpool.AcquireTracer = (*pgxTracer)(nil)
_ allPgxTracers = (*pgxTracer)(nil)
)

func newPgxTracer(opts ...Option) *pgxTracer {
func wrapPgxTracer(prev pgx.QueryTracer, opts ...Option) *pgxTracer {
cfg := defaultConfig()
for _, opt := range opts {
opt(cfg)
}
cfg.checkStatsdRequired()
return &pgxTracer{cfg: cfg}
tr := &pgxTracer{cfg: cfg}
if prev != nil {
tr.wrapped.query = prev
if batchTr, ok := prev.(pgx.BatchTracer); ok {
tr.wrapped.batch = batchTr
}
if connTr, ok := prev.(pgx.ConnectTracer); ok {
tr.wrapped.connect = connTr
}
if prepareTr, ok := prev.(pgx.PrepareTracer); ok {
tr.wrapped.prepare = prepareTr
}
if copyFromTr, ok := prev.(pgx.CopyFromTracer); ok {
tr.wrapped.copyFrom = copyFromTr
}
if poolAcquireTr, ok := prev.(pgxpool.AcquireTracer); ok {
tr.wrapped.poolAcquire = poolAcquireTr
}
}

return tr
}

func (t *pgxTracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
if !t.cfg.traceQuery {
return ctx
}
if t.wrapped.query != nil {
ctx = t.wrapped.query.TraceQueryStart(ctx, conn, data)
}
opts := t.spanOptions(conn.Config(), operationTypeQuery, data.SQL)
_, ctx = tracer.StartSpanFromContext(ctx, "pgx.query", opts...)
return ctx
}

func (t *pgxTracer) TraceQueryEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryEndData) {
func (t *pgxTracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
if !t.cfg.traceQuery {
return
}
if t.wrapped.query != nil {
t.wrapped.query.TraceQueryEnd(ctx, conn, data)
}
span, ok := tracer.SpanFromContext(ctx)
if ok {
span.SetTag(tagRowsAffected, data.CommandTag.RowsAffected())
Expand All @@ -91,6 +131,9 @@ func (t *pgxTracer) TraceBatchStart(ctx context.Context, conn *pgx.Conn, data pg
if !t.cfg.traceBatch {
return ctx
}
if t.wrapped.batch != nil {
ctx = t.wrapped.batch.TraceBatchStart(ctx, conn, data)
}
opts := t.spanOptions(conn.Config(), operationTypeBatch, "",
tracer.Tag(tagBatchNumQueries, data.Batch.Len()),
)
Expand All @@ -102,6 +145,9 @@ func (t *pgxTracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pg
if !t.cfg.traceBatch {
return
}
if t.wrapped.batch != nil {
t.wrapped.batch.TraceBatchQuery(ctx, conn, data)
}
// Finish the previous batch query span before starting the next one, since pgx doesn't provide hooks or timestamp
// information about when the actual operation started or finished.
// pgx.Batch* types don't support concurrency. This function doesn't support it either.
Expand All @@ -118,10 +164,13 @@ func (t *pgxTracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pg
}
}

func (t *pgxTracer) TraceBatchEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceBatchEndData) {
func (t *pgxTracer) TraceBatchEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceBatchEndData) {
if !t.cfg.traceBatch {
return
}
if t.wrapped.batch != nil {
t.wrapped.batch.TraceBatchEnd(ctx, conn, data)
}
if t.prevBatchQuery != nil {
t.prevBatchQuery.finish()
t.prevBatchQuery = nil
Expand All @@ -133,6 +182,9 @@ func (t *pgxTracer) TraceCopyFromStart(ctx context.Context, conn *pgx.Conn, data
if !t.cfg.traceCopyFrom {
return ctx
}
if t.wrapped.copyFrom != nil {
ctx = t.wrapped.copyFrom.TraceCopyFromStart(ctx, conn, data)
}
opts := t.spanOptions(conn.Config(), operationTypeCopyFrom, "",
tracer.Tag(tagCopyFromTables, data.TableName),
tracer.Tag(tagCopyFromColumns, data.ColumnNames),
Expand All @@ -141,33 +193,45 @@ func (t *pgxTracer) TraceCopyFromStart(ctx context.Context, conn *pgx.Conn, data
return ctx
}

func (t *pgxTracer) TraceCopyFromEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceCopyFromEndData) {
func (t *pgxTracer) TraceCopyFromEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceCopyFromEndData) {
if !t.cfg.traceCopyFrom {
return
}
if t.wrapped.copyFrom != nil {
t.wrapped.copyFrom.TraceCopyFromEnd(ctx, conn, data)
}
finishSpan(ctx, data.Err)
}

func (t *pgxTracer) TracePrepareStart(ctx context.Context, conn *pgx.Conn, data pgx.TracePrepareStartData) context.Context {
if !t.cfg.tracePrepare {
return ctx
}
if t.wrapped.prepare != nil {
ctx = t.wrapped.prepare.TracePrepareStart(ctx, conn, data)
}
opts := t.spanOptions(conn.Config(), operationTypePrepare, data.SQL)
_, ctx = tracer.StartSpanFromContext(ctx, "pgx.prepare", opts...)
return ctx
}

func (t *pgxTracer) TracePrepareEnd(ctx context.Context, _ *pgx.Conn, data pgx.TracePrepareEndData) {
func (t *pgxTracer) TracePrepareEnd(ctx context.Context, conn *pgx.Conn, data pgx.TracePrepareEndData) {
if !t.cfg.tracePrepare {
return
}
if t.wrapped.prepare != nil {
t.wrapped.prepare.TracePrepareEnd(ctx, conn, data)
}
finishSpan(ctx, data.Err)
}

func (t *pgxTracer) TraceConnectStart(ctx context.Context, data pgx.TraceConnectStartData) context.Context {
if !t.cfg.traceConnect {
return ctx
}
if t.wrapped.connect != nil {
ctx = t.wrapped.connect.TraceConnectStart(ctx, data)
}
opts := t.spanOptions(data.ConnConfig, operationTypeConnect, "")
_, ctx = tracer.StartSpanFromContext(ctx, "pgx.connect", opts...)
return ctx
Expand All @@ -177,23 +241,31 @@ func (t *pgxTracer) TraceConnectEnd(ctx context.Context, data pgx.TraceConnectEn
if !t.cfg.traceConnect {
return
}
if t.wrapped.connect != nil {
t.wrapped.connect.TraceConnectEnd(ctx, data)
}
finishSpan(ctx, data.Err)
}

func (t *pgxTracer) TraceAcquireStart(ctx context.Context, pool *pgxpool.Pool, _ pgxpool.TraceAcquireStartData) context.Context {
func (t *pgxTracer) TraceAcquireStart(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceAcquireStartData) context.Context {
if !t.cfg.traceAcquire {
return ctx
}
if t.wrapped.poolAcquire != nil {
ctx = t.wrapped.poolAcquire.TraceAcquireStart(ctx, pool, data)
}
opts := t.spanOptions(pool.Config().ConnConfig, operationTypeAcquire, "")
_, ctx = tracer.StartSpanFromContext(ctx, "pgx.pool.acquire", opts...)
return ctx
}

func (t *pgxTracer) TraceAcquireEnd(ctx context.Context, _ *pgxpool.Pool, data pgxpool.TraceAcquireEndData) {
func (t *pgxTracer) TraceAcquireEnd(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceAcquireEndData) {
if !t.cfg.traceAcquire {
return
}

if t.wrapped.poolAcquire != nil {
t.wrapped.poolAcquire.TraceAcquireEnd(ctx, pool, data)
}
finishSpan(ctx, data.Err)
}

Expand Down
Loading
Loading