From 2d979ccb13e28b83456e57a662901f79d0ca036c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Mat=C4=9Bjka?= Date: Fri, 13 Dec 2024 18:49:43 +0100 Subject: [PATCH] feat(message-queue/golang): add PgMetrics to insert metrics into postgres refs #6 --- message-queue/golang/GNUmakefile | 4 +- message-queue/golang/cmd/main.go | 17 +- message-queue/golang/observer/postgres.go | 156 ++++++++++++++++ .../golang/observer/postgres_test.go | 175 ++++++++++++++++++ 4 files changed, 346 insertions(+), 6 deletions(-) create mode 100644 message-queue/golang/observer/postgres.go create mode 100644 message-queue/golang/observer/postgres_test.go diff --git a/message-queue/golang/GNUmakefile b/message-queue/golang/GNUmakefile index 8f6b2b69..dd76f189 100644 --- a/message-queue/golang/GNUmakefile +++ b/message-queue/golang/GNUmakefile @@ -14,8 +14,10 @@ format: ## Format code go fmt go fmt cmd/main.go + cd observer && go fmt .PHONY: check -check: +check: ## Run tests go test + cd observer && go test diff --git a/message-queue/golang/cmd/main.go b/message-queue/golang/cmd/main.go index 505e567e..cda6227a 100644 --- a/message-queue/golang/cmd/main.go +++ b/message-queue/golang/cmd/main.go @@ -7,6 +7,7 @@ import "time" import "sync" import "github.com/jan-matejka/code-golf/message-queue/golang" +import "github.com/jan-matejka/code-golf/message-queue/golang/observer" import "github.com/jackc/pgx/v4/pgxpool" @@ -104,13 +105,19 @@ func main() { } defer pool.Close() + pgm, err := postgres.NewPgMetrics("postgres://mq@localhost/mq?pool_max_conns=2048") + if err != nil { + die("Unable to connect to postgres metrics: %v\n", err) + } + sample := func(workers int) *golang.Results { r := sample_workers(app, workers, pool) - golang.PushMetrics( - app, - golang.SampleDesc{workers, "channels", "postgres"}, - r, - ) + sampleDesc := golang.SampleDesc{workers, "channels", "postgres"} + golang.PushMetrics(app, sampleDesc, r) + err := pgm.Push(context.Background(), app.Runtime, sampleDesc, r) + if err != nil { + die("%v", err) + } return r } diff --git a/message-queue/golang/observer/postgres.go b/message-queue/golang/observer/postgres.go new file mode 100644 index 00000000..ce79676a --- /dev/null +++ b/message-queue/golang/observer/postgres.go @@ -0,0 +1,156 @@ +package postgres + +import "context" + +import "github.com/jackc/pgx/v4/pgxpool" +import "github.com/jackc/pgx/v4" +import "github.com/jan-matejka/code-golf/message-queue/golang" + +type PgMetrics struct { + pool *pgxpool.Pool + runtime_id int +} + +func NewPgMetrics(dsn string) (*PgMetrics, error) { + pool, err := pgxpool.Connect(context.Background(), dsn) + if err != nil { + return nil, err + } + + pgm := &PgMetrics{pool, 0} + return pgm, nil +} + +func (p *PgMetrics) Push( + ctx context.Context, + runtime *golang.Runtime, + sample golang.SampleDesc, + rs *golang.Results, +) error { + tx, err := p.pool.Begin(ctx) + if err != nil { + return err + } + + if p.runtime_id == 0 { + runtime_id, err := runtimeId(ctx, tx, runtime) + if err != nil { + return err + } + p.runtime_id = runtime_id + } + + sample_id, err := sampleId(ctx, tx, p.runtime_id, sample) + if err != nil { + return err + } + + for _, r := range rs.Workers { + err = workerResult(ctx, tx, sample_id, r) + if err != nil { + return err + } + } + + err = tx.Commit(ctx) + if err != nil { + return err + } + return nil +} + +func runtimeId(ctx context.Context, tx pgx.Tx, r *golang.Runtime) (int, error) { + q := ` + insert into results.runtime ( + ctime, uuid, lang, lang_version, runtime, os, kernel, arch + ) values ( + $1, $2, $3, $4, $5, $6, $7, $8 + ) + returning id; + ` + rows, err := tx.Query( + ctx, + q, + r.Ctime, + r.Uuid, + r.Lang, + r.Lang_version, + r.Runtime, + r.Os, + r.Kernel, + r.Arch, + ) + if err != nil { + return 0, err + } + defer rows.Close() + + var id int + rows.Next() + err = rows.Scan(&id) + if err != nil { + return 0, err + } + return id, nil +} + +func sampleId(ctx context.Context, tx pgx.Tx, runtime_id int, sample golang.SampleDesc) (int, error) { + // TBD: pgxv5 supports named arguments + q := ` + with + sel as ( + select id from results.sample where + runtime_id = $1 + and n_workers = $2 + and algorithm = $3 + and mq_system = $4 + ), + ins as ( + insert into results.sample + (runtime_id, n_workers, algorithm, mq_system) + values + ($1, $2, $3, $4) + on conflict do nothing + returning id + ) + select * from ins + union + select * from sel + where id is not null; + ` + var id int + err := tx.QueryRow( + ctx, + q, + runtime_id, + sample.N_workers, + sample.Algorithm, + sample.Mq_system, + ).Scan(&id) + if err != nil { + return 0, err + } + return id, nil +} + +func workerResult(ctx context.Context, tx pgx.Tx, sample_id int, wr *golang.WorkerResult) error { + q := ` + insert into results.worker + (sample_id, worker_id, messages_total, duration_ns) + values + ($1, $2, $3, $4) + ` + _, err := tx.Exec( + ctx, + q, + sample_id, + wr.WorkerId, + wr.MessagesTotal, + wr.DurationNs, + ) + return err +} + +func (p *PgMetrics) Close() { + p.pool.Close() +} diff --git a/message-queue/golang/observer/postgres_test.go b/message-queue/golang/observer/postgres_test.go new file mode 100644 index 00000000..6c2d1624 --- /dev/null +++ b/message-queue/golang/observer/postgres_test.go @@ -0,0 +1,175 @@ +package postgres + +import ( + "context" + "reflect" + "testing" +) + +import "github.com/jackc/pgx/v4/pgxpool" +import "github.com/jan-matejka/code-golf/message-queue/golang" + +const TestRootDsn = "postgres://postgres@localhost:5433" +const TestMqDsn = "postgres://mq@localhost:5433/test" + +func mkTestDb() (*pgxpool.Pool, error) { + ctx := context.Background() + pool, err := pgxpool.Connect(ctx, TestRootDsn) + if err != nil { + return nil, err + } + defer pool.Close() + + _, err = pool.Exec(ctx, "drop database if exists test") + if err != nil { + return nil, err + } + _, err = pool.Exec(ctx, "create database test template mq") + if err != nil { + return nil, err + } + + pool, err = pgxpool.Connect(ctx, TestMqDsn) + if err != nil { + return nil, err + } + return pool, nil +} + +func TestPush(t *testing.T) { + pool, err := mkTestDb() + if err != nil { + t.Fatalf("%v", err) + } + defer pool.Close() + + pgm, err := NewPgMetrics(TestMqDsn) + if err != nil { + t.Fatalf("%v", err) + } + + r, err := golang.NewRuntime() + if err != nil { + t.Fatalf("%v", err) + } + + sdesc := golang.SampleDesc{2, "channels", "postgres"} + results := golang.NewResults() + results.Add(golang.NewWorkerResult(1, 10, 20)) + results.Add(golang.NewWorkerResult(2, 30, 40)) + + ctx := context.Background() + err = pgm.Push(ctx, r, sdesc, results) + if err != nil { + t.Fatalf("%v", err) + } + + q := ` + select + id,ctime,uuid,lang,lang_version,runtime,os,kernel,arch + from results.runtime + ` + rows, err := pool.Query(ctx, q) + defer rows.Close() + i := 0 + var runtime_id int + var r2 golang.Runtime + for rows.Next() { + err = rows.Scan( + &runtime_id, + &r2.Ctime, + &r2.Uuid, + &r2.Lang, + &r2.Lang_version, + &r2.Runtime, + &r2.Os, + &r2.Kernel, + &r2.Arch, + ) + if err != nil { + t.Fatalf("%v", err) + } + i += 1 + } + if i != 1 { + t.Fatalf("unexpcted rows, expected: 1, got: %v", i) + } + expected := r.Map() + got := r2.Map() + delete(expected, "ctime") // expected has tz, while got is tzless + delete(got, "ctime") + if !reflect.DeepEqual(expected, got) { + t.Fatalf("expected %v, got %v", expected, got) + } + + q = ` + select + id,runtime_id,n_workers,algorithm,mq_system + from results.sample + ` + rows, err = pool.Query(ctx, q) + if err != nil { + t.Fatalf("%v", err) + } + defer rows.Close() + i = 0 + var sample_id int + var r_id int + var s2 golang.SampleDesc + for rows.Next() { + i += 1 + err = rows.Scan(&sample_id, &r_id, &s2.N_workers, &s2.Algorithm, &s2.Mq_system) + if err != nil { + t.Fatalf("%v", err) + } + } + if i != 1 { + t.Fatalf("unexpected rows, expected: 1, got: %v", i) + } + if r_id != runtime_id { + t.Fatalf("Expected %v, got %v", runtime_id, r_id) + } + if !reflect.DeepEqual(sdesc, s2) { + t.Fatalf("expected %v, got %v", sdesc, s2) + } + + q = ` + select + id,sample_id,worker_id,messages_total,duration_ns + from results.worker + ` + rows, err = pool.Query(ctx, q) + if err != nil { + t.Fatalf("%v", err) + } + defer rows.Close() + var workers [][]interface{} + for rows.Next() { + vals, err := rows.Values() + if err != nil { + t.Fatalf("%v", err) + } + workers = append(workers, vals) + } + + var expected2 [][]interface{} + for i, wr := range results.Workers { + expected2 = append( + expected2, + []interface{}{ + int32(i + 1), + int32(sample_id), + int32(wr.WorkerId), + int32(wr.MessagesTotal), + int64(wr.DurationNs), + }, + ) + } + + if !reflect.DeepEqual(expected2, workers) { + // The DeepEqual might fail because of different integer types but it shows up the same under + // %v. If that happens, the values need to be examined individually under %v and %T. + // And this is why we cast the expected2 values above. + t.Fatalf("expected %v, got %v", expected2, workers) + } +}