Skip to content

Commit

Permalink
contrib/jackc/pgx.v5: wrap previous tracer (#2932)
Browse files Browse the repository at this point in the history
Co-authored-by: Dario Castañé <[email protected]>
  • Loading branch information
rarguelloF and darccio authored Oct 21, 2024
1 parent f65cab8 commit 1366f6b
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 49 deletions.
15 changes: 13 additions & 2 deletions contrib/jackc/pgx.v5/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,33 @@ func init() {
tracer.MarkIntegrationImported("github.com/jackc/pgx.v5")
}

// Deprecated: this type is unused internally so it will be removed in a future release, please use pgx.Batch instead.
type Batch = pgx.Batch

// Connect is equivalent to pgx.Connect providing a connection augmented with tracing.
func Connect(ctx context.Context, connString string, opts ...Option) (*pgx.Conn, error) {
connConfig, err := pgx.ParseConfig(connString)
if err != nil {
return nil, err
}

return ConnectConfig(ctx, connConfig, opts...)
}

// ConnectConfig is equivalent to pgx.ConnectConfig providing a connection augmented with tracing.
func ConnectConfig(ctx context.Context, connConfig *pgx.ConnConfig, opts ...Option) (*pgx.Conn, error) {
// The tracer must be set in the config before calling connect
// as pgx takes ownership of the config. QueryTracer traces
// may work, but none of the others will, as they're set in
// unexported fields in the config in the pgx.connect function.
connConfig.Tracer = newPgxTracer(opts...)
connConfig.Tracer = wrapPgxTracer(connConfig.Tracer, opts...)
return pgx.ConnectConfig(ctx, connConfig)
}

// ConnectWithOptions is equivalent to pgx.ConnectWithOptions providing a connection augmented with tracing.
func ConnectWithOptions(ctx context.Context, connString string, options pgx.ParseConfigOptions, tracerOpts ...Option) (*pgx.Conn, error) {
connConfig, err := pgx.ParseConfigWithOptions(connString, options)
if err != nil {
return nil, err
}
return ConnectConfig(ctx, connConfig, tracerOpts...)
}
102 changes: 87 additions & 15 deletions contrib/jackc/pgx.v5/pgx_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,82 @@ func (tb *tracedBatchQuery) finish() {
tb.span.Finish(tracer.WithError(tb.data.Err))
}

type allPgxTracers interface {
pgx.QueryTracer
pgx.BatchTracer
pgx.ConnectTracer
pgx.PrepareTracer
pgx.CopyFromTracer
pgxpool.AcquireTracer
}

type wrappedPgxTracer struct {
query pgx.QueryTracer
batch pgx.BatchTracer
connect pgx.ConnectTracer
prepare pgx.PrepareTracer
copyFrom pgx.CopyFromTracer
poolAcquire pgxpool.AcquireTracer
}

type pgxTracer struct {
cfg *config
prevBatchQuery *tracedBatchQuery
wrapped wrappedPgxTracer
}

var (
_ pgx.QueryTracer = (*pgxTracer)(nil)
_ pgx.BatchTracer = (*pgxTracer)(nil)
_ pgx.ConnectTracer = (*pgxTracer)(nil)
_ pgx.PrepareTracer = (*pgxTracer)(nil)
_ pgx.CopyFromTracer = (*pgxTracer)(nil)
_ pgxpool.AcquireTracer = (*pgxTracer)(nil)
_ allPgxTracers = (*pgxTracer)(nil)
)

func newPgxTracer(opts ...Option) *pgxTracer {
func wrapPgxTracer(prev pgx.QueryTracer, opts ...Option) *pgxTracer {
cfg := defaultConfig()
for _, opt := range opts {
opt(cfg)
}
cfg.checkStatsdRequired()
return &pgxTracer{cfg: cfg}
tr := &pgxTracer{cfg: cfg}
if prev != nil {
tr.wrapped.query = prev
if batchTr, ok := prev.(pgx.BatchTracer); ok {
tr.wrapped.batch = batchTr
}
if connTr, ok := prev.(pgx.ConnectTracer); ok {
tr.wrapped.connect = connTr
}
if prepareTr, ok := prev.(pgx.PrepareTracer); ok {
tr.wrapped.prepare = prepareTr
}
if copyFromTr, ok := prev.(pgx.CopyFromTracer); ok {
tr.wrapped.copyFrom = copyFromTr
}
if poolAcquireTr, ok := prev.(pgxpool.AcquireTracer); ok {
tr.wrapped.poolAcquire = poolAcquireTr
}
}

return tr
}

func (t *pgxTracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
if !t.cfg.traceQuery {
return ctx
}
if t.wrapped.query != nil {
ctx = t.wrapped.query.TraceQueryStart(ctx, conn, data)
}
opts := t.spanOptions(conn.Config(), operationTypeQuery, data.SQL)
_, ctx = tracer.StartSpanFromContext(ctx, "pgx.query", opts...)
return ctx
}

func (t *pgxTracer) TraceQueryEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryEndData) {
func (t *pgxTracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
if !t.cfg.traceQuery {
return
}
if t.wrapped.query != nil {
t.wrapped.query.TraceQueryEnd(ctx, conn, data)
}
span, ok := tracer.SpanFromContext(ctx)
if ok {
span.SetTag(tagRowsAffected, data.CommandTag.RowsAffected())
Expand All @@ -91,6 +131,9 @@ func (t *pgxTracer) TraceBatchStart(ctx context.Context, conn *pgx.Conn, data pg
if !t.cfg.traceBatch {
return ctx
}
if t.wrapped.batch != nil {
ctx = t.wrapped.batch.TraceBatchStart(ctx, conn, data)
}
opts := t.spanOptions(conn.Config(), operationTypeBatch, "",
tracer.Tag(tagBatchNumQueries, data.Batch.Len()),
)
Expand All @@ -102,6 +145,9 @@ func (t *pgxTracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pg
if !t.cfg.traceBatch {
return
}
if t.wrapped.batch != nil {
t.wrapped.batch.TraceBatchQuery(ctx, conn, data)
}
// Finish the previous batch query span before starting the next one, since pgx doesn't provide hooks or timestamp
// information about when the actual operation started or finished.
// pgx.Batch* types don't support concurrency. This function doesn't support it either.
Expand All @@ -118,10 +164,13 @@ func (t *pgxTracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pg
}
}

func (t *pgxTracer) TraceBatchEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceBatchEndData) {
func (t *pgxTracer) TraceBatchEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceBatchEndData) {
if !t.cfg.traceBatch {
return
}
if t.wrapped.batch != nil {
t.wrapped.batch.TraceBatchEnd(ctx, conn, data)
}
if t.prevBatchQuery != nil {
t.prevBatchQuery.finish()
t.prevBatchQuery = nil
Expand All @@ -133,6 +182,9 @@ func (t *pgxTracer) TraceCopyFromStart(ctx context.Context, conn *pgx.Conn, data
if !t.cfg.traceCopyFrom {
return ctx
}
if t.wrapped.copyFrom != nil {
ctx = t.wrapped.copyFrom.TraceCopyFromStart(ctx, conn, data)
}
opts := t.spanOptions(conn.Config(), operationTypeCopyFrom, "",
tracer.Tag(tagCopyFromTables, data.TableName),
tracer.Tag(tagCopyFromColumns, data.ColumnNames),
Expand All @@ -141,33 +193,45 @@ func (t *pgxTracer) TraceCopyFromStart(ctx context.Context, conn *pgx.Conn, data
return ctx
}

func (t *pgxTracer) TraceCopyFromEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceCopyFromEndData) {
func (t *pgxTracer) TraceCopyFromEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceCopyFromEndData) {
if !t.cfg.traceCopyFrom {
return
}
if t.wrapped.copyFrom != nil {
t.wrapped.copyFrom.TraceCopyFromEnd(ctx, conn, data)
}
finishSpan(ctx, data.Err)
}

func (t *pgxTracer) TracePrepareStart(ctx context.Context, conn *pgx.Conn, data pgx.TracePrepareStartData) context.Context {
if !t.cfg.tracePrepare {
return ctx
}
if t.wrapped.prepare != nil {
ctx = t.wrapped.prepare.TracePrepareStart(ctx, conn, data)
}
opts := t.spanOptions(conn.Config(), operationTypePrepare, data.SQL)
_, ctx = tracer.StartSpanFromContext(ctx, "pgx.prepare", opts...)
return ctx
}

func (t *pgxTracer) TracePrepareEnd(ctx context.Context, _ *pgx.Conn, data pgx.TracePrepareEndData) {
func (t *pgxTracer) TracePrepareEnd(ctx context.Context, conn *pgx.Conn, data pgx.TracePrepareEndData) {
if !t.cfg.tracePrepare {
return
}
if t.wrapped.prepare != nil {
t.wrapped.prepare.TracePrepareEnd(ctx, conn, data)
}
finishSpan(ctx, data.Err)
}

func (t *pgxTracer) TraceConnectStart(ctx context.Context, data pgx.TraceConnectStartData) context.Context {
if !t.cfg.traceConnect {
return ctx
}
if t.wrapped.connect != nil {
ctx = t.wrapped.connect.TraceConnectStart(ctx, data)
}
opts := t.spanOptions(data.ConnConfig, operationTypeConnect, "")
_, ctx = tracer.StartSpanFromContext(ctx, "pgx.connect", opts...)
return ctx
Expand All @@ -177,23 +241,31 @@ func (t *pgxTracer) TraceConnectEnd(ctx context.Context, data pgx.TraceConnectEn
if !t.cfg.traceConnect {
return
}
if t.wrapped.connect != nil {
t.wrapped.connect.TraceConnectEnd(ctx, data)
}
finishSpan(ctx, data.Err)
}

func (t *pgxTracer) TraceAcquireStart(ctx context.Context, pool *pgxpool.Pool, _ pgxpool.TraceAcquireStartData) context.Context {
func (t *pgxTracer) TraceAcquireStart(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceAcquireStartData) context.Context {
if !t.cfg.traceAcquire {
return ctx
}
if t.wrapped.poolAcquire != nil {
ctx = t.wrapped.poolAcquire.TraceAcquireStart(ctx, pool, data)
}
opts := t.spanOptions(pool.Config().ConnConfig, operationTypeAcquire, "")
_, ctx = tracer.StartSpanFromContext(ctx, "pgx.pool.acquire", opts...)
return ctx
}

func (t *pgxTracer) TraceAcquireEnd(ctx context.Context, _ *pgxpool.Pool, data pgxpool.TraceAcquireEndData) {
func (t *pgxTracer) TraceAcquireEnd(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceAcquireEndData) {
if !t.cfg.traceAcquire {
return
}

if t.wrapped.poolAcquire != nil {
t.wrapped.poolAcquire.TraceAcquireEnd(ctx, pool, data)
}
finishSpan(ctx, data.Err)
}

Expand Down
Loading

0 comments on commit 1366f6b

Please sign in to comment.