Skip to content

Commit

Permalink
feat(message-queue/golang): add PgMetrics to insert metrics into post…
Browse files Browse the repository at this point in the history
…gres refs #6
  • Loading branch information
jan-matejka committed Dec 13, 2024
1 parent e68e7b0 commit 2d979cc
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 6 deletions.
4 changes: 3 additions & 1 deletion message-queue/golang/GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ format: ## Format code

go fmt
go fmt cmd/main.go
cd observer && go fmt

.PHONY: check
check:
check: ## Run tests

go test
cd observer && go test
17 changes: 12 additions & 5 deletions message-queue/golang/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "time"
import "sync"

import "github.com/jan-matejka/code-golf/message-queue/golang"
import "github.com/jan-matejka/code-golf/message-queue/golang/observer"

import "github.com/jackc/pgx/v4/pgxpool"

Expand Down Expand Up @@ -104,13 +105,19 @@ func main() {
}
defer pool.Close()

pgm, err := postgres.NewPgMetrics("postgres://mq@localhost/mq?pool_max_conns=2048")
if err != nil {
die("Unable to connect to postgres metrics: %v\n", err)
}

sample := func(workers int) *golang.Results {
r := sample_workers(app, workers, pool)
golang.PushMetrics(
app,
golang.SampleDesc{workers, "channels", "postgres"},
r,
)
sampleDesc := golang.SampleDesc{workers, "channels", "postgres"}
golang.PushMetrics(app, sampleDesc, r)
err := pgm.Push(context.Background(), app.Runtime, sampleDesc, r)
if err != nil {
die("%v", err)
}
return r
}

Expand Down
156 changes: 156 additions & 0 deletions message-queue/golang/observer/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package postgres

import "context"

import "github.com/jackc/pgx/v4/pgxpool"
import "github.com/jackc/pgx/v4"
import "github.com/jan-matejka/code-golf/message-queue/golang"

type PgMetrics struct {
pool *pgxpool.Pool
runtime_id int
}

func NewPgMetrics(dsn string) (*PgMetrics, error) {
pool, err := pgxpool.Connect(context.Background(), dsn)
if err != nil {
return nil, err
}

pgm := &PgMetrics{pool, 0}
return pgm, nil
}

func (p *PgMetrics) Push(
ctx context.Context,
runtime *golang.Runtime,
sample golang.SampleDesc,
rs *golang.Results,
) error {
tx, err := p.pool.Begin(ctx)
if err != nil {
return err
}

if p.runtime_id == 0 {
runtime_id, err := runtimeId(ctx, tx, runtime)
if err != nil {
return err
}
p.runtime_id = runtime_id
}

sample_id, err := sampleId(ctx, tx, p.runtime_id, sample)
if err != nil {
return err
}

for _, r := range rs.Workers {
err = workerResult(ctx, tx, sample_id, r)
if err != nil {
return err
}
}

err = tx.Commit(ctx)
if err != nil {
return err
}
return nil
}

func runtimeId(ctx context.Context, tx pgx.Tx, r *golang.Runtime) (int, error) {
q := `
insert into results.runtime (
ctime, uuid, lang, lang_version, runtime, os, kernel, arch
) values (
$1, $2, $3, $4, $5, $6, $7, $8
)
returning id;
`
rows, err := tx.Query(
ctx,
q,
r.Ctime,
r.Uuid,
r.Lang,
r.Lang_version,
r.Runtime,
r.Os,
r.Kernel,
r.Arch,
)
if err != nil {
return 0, err
}
defer rows.Close()

var id int
rows.Next()
err = rows.Scan(&id)
if err != nil {
return 0, err
}
return id, nil
}

func sampleId(ctx context.Context, tx pgx.Tx, runtime_id int, sample golang.SampleDesc) (int, error) {
// TBD: pgxv5 supports named arguments
q := `
with
sel as (
select id from results.sample where
runtime_id = $1
and n_workers = $2
and algorithm = $3
and mq_system = $4
),
ins as (
insert into results.sample
(runtime_id, n_workers, algorithm, mq_system)
values
($1, $2, $3, $4)
on conflict do nothing
returning id
)
select * from ins
union
select * from sel
where id is not null;
`
var id int
err := tx.QueryRow(
ctx,
q,
runtime_id,
sample.N_workers,
sample.Algorithm,
sample.Mq_system,
).Scan(&id)
if err != nil {
return 0, err
}
return id, nil
}

func workerResult(ctx context.Context, tx pgx.Tx, sample_id int, wr *golang.WorkerResult) error {
q := `
insert into results.worker
(sample_id, worker_id, messages_total, duration_ns)
values
($1, $2, $3, $4)
`
_, err := tx.Exec(
ctx,
q,
sample_id,
wr.WorkerId,
wr.MessagesTotal,
wr.DurationNs,
)
return err
}

func (p *PgMetrics) Close() {
p.pool.Close()
}
175 changes: 175 additions & 0 deletions message-queue/golang/observer/postgres_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package postgres

import (
"context"
"reflect"
"testing"
)

import "github.com/jackc/pgx/v4/pgxpool"
import "github.com/jan-matejka/code-golf/message-queue/golang"

const TestRootDsn = "postgres://postgres@localhost:5433"
const TestMqDsn = "postgres://mq@localhost:5433/test"

func mkTestDb() (*pgxpool.Pool, error) {
ctx := context.Background()
pool, err := pgxpool.Connect(ctx, TestRootDsn)
if err != nil {
return nil, err
}
defer pool.Close()

_, err = pool.Exec(ctx, "drop database if exists test")
if err != nil {
return nil, err
}
_, err = pool.Exec(ctx, "create database test template mq")
if err != nil {
return nil, err
}

pool, err = pgxpool.Connect(ctx, TestMqDsn)
if err != nil {
return nil, err
}
return pool, nil
}

func TestPush(t *testing.T) {
pool, err := mkTestDb()
if err != nil {
t.Fatalf("%v", err)
}
defer pool.Close()

pgm, err := NewPgMetrics(TestMqDsn)
if err != nil {
t.Fatalf("%v", err)
}

r, err := golang.NewRuntime()
if err != nil {
t.Fatalf("%v", err)
}

sdesc := golang.SampleDesc{2, "channels", "postgres"}
results := golang.NewResults()
results.Add(golang.NewWorkerResult(1, 10, 20))
results.Add(golang.NewWorkerResult(2, 30, 40))

ctx := context.Background()
err = pgm.Push(ctx, r, sdesc, results)
if err != nil {
t.Fatalf("%v", err)
}

q := `
select
id,ctime,uuid,lang,lang_version,runtime,os,kernel,arch
from results.runtime
`
rows, err := pool.Query(ctx, q)
defer rows.Close()
i := 0
var runtime_id int
var r2 golang.Runtime
for rows.Next() {
err = rows.Scan(
&runtime_id,
&r2.Ctime,
&r2.Uuid,
&r2.Lang,
&r2.Lang_version,
&r2.Runtime,
&r2.Os,
&r2.Kernel,
&r2.Arch,
)
if err != nil {
t.Fatalf("%v", err)
}
i += 1
}
if i != 1 {
t.Fatalf("unexpcted rows, expected: 1, got: %v", i)
}
expected := r.Map()
got := r2.Map()
delete(expected, "ctime") // expected has tz, while got is tzless
delete(got, "ctime")
if !reflect.DeepEqual(expected, got) {
t.Fatalf("expected %v, got %v", expected, got)
}

q = `
select
id,runtime_id,n_workers,algorithm,mq_system
from results.sample
`
rows, err = pool.Query(ctx, q)
if err != nil {
t.Fatalf("%v", err)
}
defer rows.Close()
i = 0
var sample_id int
var r_id int
var s2 golang.SampleDesc
for rows.Next() {
i += 1
err = rows.Scan(&sample_id, &r_id, &s2.N_workers, &s2.Algorithm, &s2.Mq_system)
if err != nil {
t.Fatalf("%v", err)
}
}
if i != 1 {
t.Fatalf("unexpected rows, expected: 1, got: %v", i)
}
if r_id != runtime_id {
t.Fatalf("Expected %v, got %v", runtime_id, r_id)
}
if !reflect.DeepEqual(sdesc, s2) {
t.Fatalf("expected %v, got %v", sdesc, s2)
}

q = `
select
id,sample_id,worker_id,messages_total,duration_ns
from results.worker
`
rows, err = pool.Query(ctx, q)
if err != nil {
t.Fatalf("%v", err)
}
defer rows.Close()
var workers [][]interface{}
for rows.Next() {
vals, err := rows.Values()
if err != nil {
t.Fatalf("%v", err)
}
workers = append(workers, vals)
}

var expected2 [][]interface{}
for i, wr := range results.Workers {
expected2 = append(
expected2,
[]interface{}{
int32(i + 1),
int32(sample_id),
int32(wr.WorkerId),
int32(wr.MessagesTotal),
int64(wr.DurationNs),
},
)
}

if !reflect.DeepEqual(expected2, workers) {
// The DeepEqual might fail because of different integer types but it shows up the same under
// %v. If that happens, the values need to be examined individually under %v and %T.
// And this is why we cast the expected2 values above.
t.Fatalf("expected %v, got %v", expected2, workers)
}
}

0 comments on commit 2d979cc

Please sign in to comment.