Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

support csv logs #18

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,23 @@ ALTER SYSTEM SET log_line_prefix='%m|%u|%d|%c|';
ALTER SYSTEM SET log_min_error_statement='log';
ALTER SYSTEM SET log_min_messages='error';
ALTER SYSTEM SET log_statement='all';
ALTER SYSTEM SET log_min_duration_statement=0;
ALTER SYSTEM SET log_min_duration_statement=0;
SELECT pg_reload_conf();
```

Or, if you need to capture logs for an RDS instance, you can use these parameters in your
instances parameter group:

```
log_destination = csvlog
log_connections = 1
log_disconnections = 1
log_min_error_statement = log
log_min_messages = error
log_statement = all
log_min_duration_statement = 0
```

### 2. Take snapshot

Now we're emitting logs we need to snapshot the database so that we can later
Expand Down Expand Up @@ -91,7 +104,7 @@ the production cluster.

Now create a copy of the original production cluster using the snapshot from
(2). The aim is to have a cluster that exactly replicates production, providing
a reliable control for our experiment.
a reliable control for our experiment.

The goal of this run will be to output Postgres logs that can be parsed by
[pgBadger](https://github.com/darold/pgbadger) to provide an analysis of the
Expand Down
37 changes: 25 additions & 12 deletions cmd/pgreplay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"context"
"fmt"
stdlog "log"
"net/http"
Expand All @@ -16,7 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/jackc/pgx"
"github.com/jackc/pgx/v5"
)

var logger kitlog.Logger
Expand All @@ -34,6 +35,7 @@ var (
filter = app.Command("filter", "Process an errlog file into a pgreplay preprocessed JSON log")
filterJsonInput = filter.Flag("json-input", "JSON input file").ExistingFile()
filterErrlogInput = filter.Flag("errlog-input", "Postgres errlog input file").ExistingFile()
filterCsvLogInput = filter.Flag("csvlog-input", "Postgres CSV log input file").ExistingFile()
filterOutput = filter.Flag("output", "JSON output file").String()
filterNullOutput = filter.Flag("null-output", "Don't output anything, for testing parsing only").Bool()

Expand All @@ -42,8 +44,10 @@ var (
runPort = run.Flag("port", "PostgreSQL database port").Default("5432").Uint16()
runDatname = run.Flag("database", "PostgreSQL root database").Default("postgres").String()
runUser = run.Flag("user", "PostgreSQL root user").Default("postgres").String()
runPassword = run.Flag("password", "PostgreSQL root password").String()
runReplayRate = run.Flag("replay-rate", "Rate of playback, will execute queries at Nx speed").Default("1").Float()
runErrlogInput = run.Flag("errlog-input", "Path to PostgreSQL errlog").ExistingFile()
runCsvLogInput = run.Flag("csvlog-input", "Path to PostgreSQL CSV log").ExistingFile()
runJsonInput = run.Flag("json-input", "Path to preprocessed pgreplay JSON log file").ExistingFile()
)

Expand Down Expand Up @@ -81,11 +85,13 @@ func main() {
case filter.FullCommand():
var items chan pgreplay.Item

switch checkSingleFormat(filterJsonInput, filterErrlogInput) {
switch checkSingleFormat(filterJsonInput, filterErrlogInput, filterCsvLogInput) {
case filterJsonInput:
items = parseLog(*filterJsonInput, pgreplay.ParseJSON)
case filterErrlogInput:
items = parseLog(*filterErrlogInput, pgreplay.ParseErrlog)
case filterCsvLogInput:
items = parseLog(*filterCsvLogInput, pgreplay.ParseCsvLog)
}

// Apply the start and end filters
Expand Down Expand Up @@ -127,35 +133,42 @@ func main() {
outputFile.Close()

case run.FullCommand():
database, err := pgreplay.NewDatabase(
pgx.ConnConfig{
Host: *runHost,
Port: *runPort,
Database: *runDatname,
User: *runUser,
},
)
ctx := context.Background()
config, err := pgx.ParseConfig(fmt.Sprintf(
"host=%s port=%d dbname=%s user=%s password=%s",
*runHost,
*runPort,
*runDatname,
*runUser,
*runPassword,
))
if err != nil {
kingpin.Fatalf("failed to parse postgres connection config: %v", err)
}

database, err := pgreplay.NewDatabase(ctx, config)
if err != nil {
logger.Log("event", "postgres.error", "error", err)
os.Exit(255)
}

var items chan pgreplay.Item

switch checkSingleFormat(runJsonInput, runErrlogInput) {
switch checkSingleFormat(runJsonInput, runErrlogInput, runCsvLogInput) {
case runJsonInput:
items = parseLog(*runJsonInput, pgreplay.ParseJSON)
case runErrlogInput:
items = parseLog(*runErrlogInput, pgreplay.ParseErrlog)
case runCsvLogInput:
items = parseLog(*runCsvLogInput, pgreplay.ParseCsvLog)
}

stream, err := pgreplay.NewStreamer(start, finish).Stream(items, *runReplayRate)
if err != nil {
kingpin.Fatalf("failed to start streamer: %s", err)
}

errs, consumeDone := database.Consume(stream)
errs, consumeDone := database.Consume(ctx, stream)

var status int

Expand Down
39 changes: 17 additions & 22 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,27 @@ go 1.13

require (
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973
github.com/cockroachdb/apd v1.1.0 // indirect
github.com/eapache/channels v1.1.0
github.com/eapache/queue v1.1.0
github.com/fsnotify/fsnotify v1.4.7
github.com/eapache/queue v1.1.0 // indirect
github.com/go-kit/kit v0.8.0
github.com/go-logfmt/logfmt v0.4.0
github.com/golang/protobuf v1.2.0
github.com/hpcloud/tail v1.0.0
github.com/jackc/pgx v3.3.0+incompatible
github.com/go-logfmt/logfmt v0.4.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgx/v5 v5.3.1
github.com/json-iterator/go v1.1.5
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742
github.com/lib/pq v1.10.9 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
github.com/onsi/ginkgo v1.7.0
github.com/onsi/gomega v1.4.3
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/prometheus/common v0.2.0
github.com/prometheus/procfs v0.0.0-20190219184716-e4d4a2206da0
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd
golang.org/x/sys v0.0.0-20200107162124-548cf772de50
golang.org/x/text v0.3.0
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
gopkg.in/yaml.v2 v2.2.2
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect
github.com/prometheus/common v0.2.0 // indirect
github.com/prometheus/procfs v0.0.0-20190219184716-e4d4a2206da0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
golang.org/x/crypto v0.9.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)
Loading