diff --git a/README.md b/README.md index 707b2e8..981a20f 100644 --- a/README.md +++ b/README.md @@ -35,10 +35,23 @@ ALTER SYSTEM SET log_line_prefix='%m|%u|%d|%c|'; ALTER SYSTEM SET log_min_error_statement='log'; ALTER SYSTEM SET log_min_messages='error'; ALTER SYSTEM SET log_statement='all'; -ALTER SYSTEM SET log_min_duration_statement=0; +ALTER SYSTEM SET log_min_duration_statement=0; SELECT pg_reload_conf(); ``` +Or, if you need to capture logs for an RDS instance, you can use these parameters in your +instances parameter group: + +``` +log_destination = csvlog +log_connections = 1 +log_disconnections = 1 +log_min_error_statement = log +log_min_messages = error +log_statement = all +log_min_duration_statement = 0 +``` + ### 2. Take snapshot Now we're emitting logs we need to snapshot the database so that we can later @@ -91,7 +104,7 @@ the production cluster. Now create a copy of the original production cluster using the snapshot from (2). The aim is to have a cluster that exactly replicates production, providing -a reliable control for our experiment. +a reliable control for our experiment. The goal of this run will be to output Postgres logs that can be parsed by [pgBadger](https://github.com/darold/pgbadger) to provide an analysis of the diff --git a/cmd/pgreplay/main.go b/cmd/pgreplay/main.go index 63d71d1..432ae22 100644 --- a/cmd/pgreplay/main.go +++ b/cmd/pgreplay/main.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "context" "fmt" stdlog "log" "net/http" @@ -16,7 +17,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/jackc/pgx" + "github.com/jackc/pgx/v5" ) var logger kitlog.Logger @@ -34,6 +35,7 @@ var ( filter = app.Command("filter", "Process an errlog file into a pgreplay preprocessed JSON log") filterJsonInput = filter.Flag("json-input", "JSON input file").ExistingFile() filterErrlogInput = filter.Flag("errlog-input", "Postgres errlog input file").ExistingFile() + filterCsvLogInput = filter.Flag("csvlog-input", "Postgres CSV log input file").ExistingFile() filterOutput = filter.Flag("output", "JSON output file").String() filterNullOutput = filter.Flag("null-output", "Don't output anything, for testing parsing only").Bool() @@ -42,8 +44,10 @@ var ( runPort = run.Flag("port", "PostgreSQL database port").Default("5432").Uint16() runDatname = run.Flag("database", "PostgreSQL root database").Default("postgres").String() runUser = run.Flag("user", "PostgreSQL root user").Default("postgres").String() + runPassword = run.Flag("password", "PostgreSQL root password").String() runReplayRate = run.Flag("replay-rate", "Rate of playback, will execute queries at Nx speed").Default("1").Float() runErrlogInput = run.Flag("errlog-input", "Path to PostgreSQL errlog").ExistingFile() + runCsvLogInput = run.Flag("csvlog-input", "Path to PostgreSQL CSV log").ExistingFile() runJsonInput = run.Flag("json-input", "Path to preprocessed pgreplay JSON log file").ExistingFile() ) @@ -81,11 +85,13 @@ func main() { case filter.FullCommand(): var items chan pgreplay.Item - switch checkSingleFormat(filterJsonInput, filterErrlogInput) { + switch checkSingleFormat(filterJsonInput, filterErrlogInput, filterCsvLogInput) { case filterJsonInput: items = parseLog(*filterJsonInput, pgreplay.ParseJSON) case filterErrlogInput: items = parseLog(*filterErrlogInput, pgreplay.ParseErrlog) + case filterCsvLogInput: + items = parseLog(*filterCsvLogInput, pgreplay.ParseCsvLog) } // Apply the start and end filters @@ -127,15 +133,20 @@ func main() { outputFile.Close() case run.FullCommand(): - database, err := pgreplay.NewDatabase( - pgx.ConnConfig{ - Host: *runHost, - Port: *runPort, - Database: *runDatname, - User: *runUser, - }, - ) + ctx := context.Background() + config, err := pgx.ParseConfig(fmt.Sprintf( + "host=%s port=%d dbname=%s user=%s password=%s", + *runHost, + *runPort, + *runDatname, + *runUser, + *runPassword, + )) + if err != nil { + kingpin.Fatalf("failed to parse postgres connection config: %v", err) + } + database, err := pgreplay.NewDatabase(ctx, config) if err != nil { logger.Log("event", "postgres.error", "error", err) os.Exit(255) @@ -143,11 +154,13 @@ func main() { var items chan pgreplay.Item - switch checkSingleFormat(runJsonInput, runErrlogInput) { + switch checkSingleFormat(runJsonInput, runErrlogInput, runCsvLogInput) { case runJsonInput: items = parseLog(*runJsonInput, pgreplay.ParseJSON) case runErrlogInput: items = parseLog(*runErrlogInput, pgreplay.ParseErrlog) + case runCsvLogInput: + items = parseLog(*runCsvLogInput, pgreplay.ParseCsvLog) } stream, err := pgreplay.NewStreamer(start, finish).Stream(items, *runReplayRate) @@ -155,7 +168,7 @@ func main() { kingpin.Fatalf("failed to start streamer: %s", err) } - errs, consumeDone := database.Consume(stream) + errs, consumeDone := database.Consume(ctx, stream) var status int diff --git a/go.mod b/go.mod index e543239..a7ad6c4 100644 --- a/go.mod +++ b/go.mod @@ -4,32 +4,27 @@ go 1.13 require ( github.com/alecthomas/kingpin v2.2.6+incompatible - github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc - github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf - github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 + github.com/cockroachdb/apd v1.1.0 // indirect github.com/eapache/channels v1.1.0 - github.com/eapache/queue v1.1.0 - github.com/fsnotify/fsnotify v1.4.7 + github.com/eapache/queue v1.1.0 // indirect github.com/go-kit/kit v0.8.0 - github.com/go-logfmt/logfmt v0.4.0 - github.com/golang/protobuf v1.2.0 - github.com/hpcloud/tail v1.0.0 - github.com/jackc/pgx v3.3.0+incompatible + github.com/go-logfmt/logfmt v0.4.0 // indirect + github.com/gofrs/uuid v4.4.0+incompatible // indirect + github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect + github.com/jackc/pgx v3.6.2+incompatible + github.com/jackc/pgx/v5 v5.3.1 github.com/json-iterator/go v1.1.5 - github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 - github.com/matttproud/golang_protobuf_extensions v1.0.1 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd - github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 + github.com/lib/pq v1.10.9 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect github.com/onsi/ginkgo v1.7.0 github.com/onsi/gomega v1.4.3 - github.com/pkg/errors v0.8.1 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v0.9.2 - github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 - github.com/prometheus/common v0.2.0 - github.com/prometheus/procfs v0.0.0-20190219184716-e4d4a2206da0 - golang.org/x/net v0.0.0-20190213061140-3a22650c66bd - golang.org/x/sys v0.0.0-20200107162124-548cf772de50 - golang.org/x/text v0.3.0 - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 - gopkg.in/yaml.v2 v2.2.2 + github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect + github.com/prometheus/common v0.2.0 // indirect + github.com/prometheus/procfs v0.0.0-20190219184716-e4d4a2206da0 // indirect + github.com/shopspring/decimal v1.3.1 // indirect + golang.org/x/crypto v0.9.0 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect ) diff --git a/go.sum b/go.sum index 41960f1..45211b0 100644 --- a/go.sum +++ b/go.sum @@ -6,30 +6,59 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZq github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4k= github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= +github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/jackc/pgx v3.3.0+incompatible h1:Wa90/+qsITBAPkAZjiByeIGHFcj3Ztu+VzrrIpHjL90= -github.com/jackc/pgx v3.3.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= +github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc= +github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= +github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= +github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU= +github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= +github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE= github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= @@ -38,11 +67,14 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLD github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= @@ -57,29 +89,86 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190219184716-e4d4a2206da0 h1:4+Tdy73otddqWxwK30bAMLH9ymeHQ1Y5+fmSoCF1XtU= github.com/prometheus/procfs v0.0.0-20190219184716-e4d4a2206da0/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= +golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190222171317-cd391775e71e h1:oF7qaQxUH6KzFdKN4ww7NpPdo53SZi4UlcksLrb2y/o= -golang.org/x/sys v0.0.0-20190222171317-cd391775e71e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20200107162124-548cf772de50 h1:YvQ10rzcqWXLlJZ3XCUoO25savxmscf4+SC+ZqiCHhA= -golang.org/x/sys v0.0.0-20200107162124-548cf772de50/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/pgreplay/database.go b/pkg/pgreplay/database.go index ce384de..7d62d8f 100644 --- a/pkg/pgreplay/database.go +++ b/pkg/pgreplay/database.go @@ -1,11 +1,11 @@ package pgreplay import ( + "context" "sync" "github.com/eapache/channels" - "github.com/jackc/pgx" - "github.com/jackc/pgx/pgtype" + "github.com/jackc/pgx/v5" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -37,18 +37,17 @@ var ( ) ) -func NewDatabase(cfg pgx.ConnConfig) (*Database, error) { - conn, err := pgx.Connect(cfg) +func NewDatabase(ctx context.Context, cfg *pgx.ConnConfig) (*Database, error) { + conn, err := pgx.ConnectConfig(ctx, cfg) if err != nil { return nil, err } - return &Database{cfg, conn.ConnInfo, map[SessionID]*Conn{}}, conn.Close() + return &Database{cfg, map[SessionID]*Conn{}}, conn.Close(ctx) } type Database struct { - pgx.ConnConfig - *pgtype.ConnInfo + cfg *pgx.ConnConfig conns map[SessionID]*Conn } @@ -58,7 +57,7 @@ type Database struct { // indicate unrecoverable failures. // // Once all items have finished processing, both channels will be closed. -func (d *Database) Consume(items chan Item) (chan error, chan error) { +func (d *Database) Consume(ctx context.Context, items chan Item) (chan error, chan error) { var wg sync.WaitGroup errs, done := make(chan error, 10), make(chan error) @@ -70,7 +69,7 @@ func (d *Database) Consume(items chan Item) (chan error, chan error) { // Connection did not exist, so create a new one if !ok { - if conn, err = d.Connect(item); err != nil { + if conn, err = d.Connect(ctx, item); err != nil { errs <- err continue } @@ -85,7 +84,7 @@ func (d *Database) Consume(items chan Item) (chan error, chan error) { defer wg.Done() defer connectionsActive.Dec() - if err := conn.Start(); err != nil { + if err := conn.Start(ctx); err != nil { errs <- err } }(conn) @@ -111,14 +110,13 @@ func (d *Database) Consume(items chan Item) (chan error, chan error) { // Connect establishes a new connection to the database, reusing the ConnInfo that was // generated when the Database was constructed. The wg is incremented whenever we // establish a new connection and decremented when we disconnect. -func (d *Database) Connect(item Item) (*Conn, error) { - cfg := d.ConnConfig +func (d *Database) Connect(ctx context.Context, item Item) (*Conn, error) { + cfg := d.cfg cfg.Database, cfg.User = item.GetDatabase(), item.GetUser() - cfg.CustomConnInfo = func(_ *pgx.Conn) (*pgtype.ConnInfo, error) { - return d.ConnInfo.DeepCopy(), nil - } - - conn, err := pgx.Connect(cfg) + // cfg.CustomConnInfo = func(_ *pgx.Conn) (*pgtype.ConnInfo, error) { + // return d.ConnInfo.DeepCopy(), nil + // } + conn, err := pgx.ConnectConfig(ctx, cfg) if err != nil { return nil, err } @@ -137,9 +135,13 @@ func (c *Conn) Close() { c.Once.Do(c.Channel.Close) } +func (c *Conn) IsAlive(ctx context.Context) bool { + return c.Ping(ctx) == nil +} + // Start begins to process the items that are placed into the Conn's channel. We'll finish // once the connection has died or we run out of items to process. -func (c *Conn) Start() error { +func (c *Conn) Start(ctx context.Context) error { items := make(chan Item) channels.Unwrap(c.Channel, items) defer c.Close() @@ -152,10 +154,10 @@ func (c *Conn) Start() error { itemsProcessedTotal.Inc() itemsMostRecentTimestamp.Set(float64(item.GetTimestamp().Unix())) - err := item.Handle(c.Conn) + err := item.Handle(ctx, c.Conn) // If we're no longer alive, then we know we can no longer process items - if !c.IsAlive() { + if !c.IsAlive(ctx) { return err } } @@ -164,8 +166,8 @@ func (c *Conn) Start() error { // processing our logs before we saw this connection be disconnected. We should // terminate ourselves by handling our own disconnect, so we can know when all our // connection are done. - if c.IsAlive() { - Disconnect{}.Handle(c.Conn) + if c.IsAlive(ctx) { + Disconnect{}.Handle(ctx, c.Conn) } return nil diff --git a/pkg/pgreplay/integration/integration_test.go b/pkg/pgreplay/integration/integration_test.go index 51d972b..7738440 100644 --- a/pkg/pgreplay/integration/integration_test.go +++ b/pkg/pgreplay/integration/integration_test.go @@ -1,12 +1,14 @@ package integration import ( + "context" + "fmt" "os" "strconv" kitlog "github.com/go-kit/kit/log" "github.com/gocardless/pgreplay-go/pkg/pgreplay" - "github.com/jackc/pgx" + "github.com/jackc/pgx/v5" "github.com/onsi/gomega/types" . "github.com/onsi/ginkgo" @@ -19,28 +21,32 @@ var _ = Describe("pgreplay", func() { var ( conn *pgx.Conn logger = kitlog.NewLogfmtLogger(GinkgoWriter) - err error - - // We expect a Postgres database to be running for integration tests, and that - // environment variables are appropriately configured to permit access. - cfg = pgx.ConnConfig{ - Database: tryEnviron("PGDATABASE", "pgreplay_test"), - Host: tryEnviron("PGHOST", "127.0.0.1"), - User: tryEnviron("PGUSER", "pgreplay_test_users"), - Password: tryEnviron("PGPASSWORD", ""), - Port: uint16(mustAtoi(tryEnviron("PGPORT", "5432"))), - } ) DescribeTable("Replaying logfiles", func(parser pgreplay.ParserFunc, fixture string, matchLogs []types.GomegaMatcher) { - conn, err = pgx.Connect(cfg) + // We expect a Postgres database to be running for integration tests, and that + // environment variables are appropriately configured to permit access. + ctx := context.Background() + cfg, err := pgx.ParseConfig(fmt.Sprintf( + "host=%s port=%d dbname=%s user=%s password=%s", + tryEnviron("PGHOST", "127.0.0.1"), + uint16(mustAtoi(tryEnviron("PGPORT", "5432"))), + tryEnviron("PGDATABASE", "pgreplay_test"), + tryEnviron("PGUSER", "pgreplay_test_users"), + tryEnviron("PGPASSWORD", ""), + )) + if err != nil { + Fail(fmt.Sprintf("failed to parse postgres connection config: %v", err)) + } + + conn, err = pgx.ConnectConfig(ctx, cfg) Expect(err).NotTo(HaveOccurred(), "failed to connect to postgres") - _, err = conn.Exec(`TRUNCATE logs;`) + _, err = conn.Exec(ctx, `TRUNCATE logs;`) Expect(err).NotTo(HaveOccurred(), "failed to truncate logs table") - database, err := pgreplay.NewDatabase(cfg) + database, err := pgreplay.NewDatabase(ctx, cfg) Expect(err).NotTo(HaveOccurred()) log, err := os.Open(fixture) @@ -57,7 +63,7 @@ var _ = Describe("pgreplay", func() { stream, err := pgreplay.NewStreamer(nil, nil).Stream(items, 1.0) Expect(err).NotTo(HaveOccurred()) - errs, consumeDone := database.Consume(stream) + errs, consumeDone := database.Consume(ctx, stream) // Expect that we finish with no errors Eventually(consumeDone).Should(BeClosed()) @@ -109,7 +115,8 @@ func mustAtoi(numstr string) int { } func getLogs(conn *pgx.Conn) ([]interface{}, error) { - rows, err := conn.Query(`SELECT id::text, author, message FROM logs ORDER BY id;`) + ctx := context.Background() + rows, err := conn.Query(ctx, `SELECT id::text, author, message FROM logs ORDER BY id;`) if err != nil { return nil, err } diff --git a/pkg/pgreplay/parse.go b/pkg/pgreplay/parse.go index f4934c5..d21d821 100644 --- a/pkg/pgreplay/parse.go +++ b/pkg/pgreplay/parse.go @@ -3,6 +3,7 @@ package pgreplay import ( "bufio" "bytes" + "encoding/csv" "fmt" "io" "regexp" @@ -76,6 +77,45 @@ func ParseJSON(jsonlog io.Reader) (items chan Item, errs chan error, done chan e return } +func ParseCsvLog(csvlog io.Reader) (items chan Item, errs chan error, done chan error) { + reader := csv.NewReader(csvlog) + items, errs, done = make(chan Item, ItemBufferSize), make(chan error), make(chan error) + + go func() { + for { + logline, err := reader.Read() + if err == io.EOF { + break + } + if err != nil { + logLinesErrorTotal.Inc() + errs <- err + } + item, err := ParseCsvItem(logline) + if err != nil { + logLinesErrorTotal.Inc() + errs <- err + } + + if item != nil { + logLinesParsedTotal.Inc() + items <- item + } + } + + // Flush the item channel by pushing nil values up-to capacity + for i := 0; i < ItemBufferSize; i++ { + items <- nil + } + + close(items) + close(errs) + close(done) + }() + + return +} + // ParseErrlog generates a stream of Items from the given PostgreSQL errlog. Log line // parsing errors are returned down the errs channel, and we signal having finished our // parsing by sending a value down the done channel. @@ -128,6 +168,59 @@ const ( LogError = "ERROR: " ) +const ( + CsvLogDuration = "duration: " + CsvLogStatement = "statement:" + CsvLogConnectionReceived = "connection received:" + CsvLogConnectionAuthorized = "connection authorized:" +) + +// ParseCsvItem constructs a Item from a CSV log line. The format we accept is +// log_line_prefix='%t:%r:%u@%d:[%p]:' and log_destination='csvlog'. +func ParseCsvItem(logline []string) (Item, error) { + if len(logline) < 12 { + return nil, fmt.Errorf("failed to parse log line: '%s'", logline) + } + + ts, err := time.Parse(PostgresTimestampFormat, logline[0]) + if err != nil { + return nil, fmt.Errorf("failed to parse log timestamp: '%s': %v", logline[0], err) + } + + // 2023-06-09 01:50:01.825 UTC,"postgres","postgres",,,64828549.7698,,,,,,,, + user, database, session, msg := logline[1], logline[2], logline[5], logline[13] + + details := Details{ + Timestamp: ts, + SessionID: SessionID(session), + User: user, + Database: database, + } + + // statement:\nselect pg_reload_conf(); + if strings.Contains(msg, CsvLogStatement) { + return Statement{details, msg[len(CsvLogStatement):]}, nil + } + + // duration: 0.000 ms + if strings.Contains(msg, CsvLogDuration) { + return nil, nil + } + + // connection received: host=192.168.99.1 port=52188 + // We use connection authorized for replay, and can safely ignore connection received + if strings.HasPrefix(msg, CsvLogConnectionReceived) { + return nil, nil + } + + // connection authorized: user=postgres database=postgres + if strings.HasPrefix(msg, CsvLogConnectionAuthorized) { + return Connect{details}, nil + } + + return nil, fmt.Errorf("no parser matches line: %s", msg) +} + // ParseItem constructs a Item from Postgres errlogs. The format we accept is // log_line_prefix='%m|%u|%d|%c|', so we can split by | to discover each component. // @@ -265,7 +358,7 @@ func ParseItem(logline string, unbounds map[SessionID]*Execute, buffer []byte) ( // ParseBindParameters constructs an interface slice from the suffix of a DETAIL parameter // Postgres errlog. An example input to this function would be: // -// $1 = '', $2 = '30', $3 = '2018-05-03 10:26:27.905086+00' +// $1 = ”, $2 = '30', $3 = '2018-05-03 10:26:27.905086+00' // // ...and this would be parsed into []interface{"", "30", "2018-05-03 10:26:27.905086+00"} func ParseBindParameters(input string, buffer []byte) ([]interface{}, error) { @@ -345,8 +438,9 @@ func findClosingTag(input, marker, escapeSequence string) (idx int) { // from Postgres logs. Postgres errlog format looks like this: // // 2018-05-03|gc|LOG: duration: 0.096 ms parse : -// DELETE FROM que_jobs -// WHERE queue = $1::text +// +// DELETE FROM que_jobs +// WHERE queue = $1::text // // ...where a log line can spill over multiple lines, with trailing lines marked with a // preceding \t. diff --git a/pkg/pgreplay/parse_test.go b/pkg/pgreplay/parse_test.go index 7c27701..eb45968 100644 --- a/pkg/pgreplay/parse_test.go +++ b/pkg/pgreplay/parse_test.go @@ -11,6 +11,85 @@ import ( var time20190225, _ = time.Parse(PostgresTimestampFormat, "2019-02-25 15:08:27.222 GMT") +var _ = Describe("ParseCsvLog", func() { + DescribeTable("Parses", + func(input string, expected []Item) { + var items = []Item{} + itemsChan, errs, done := ParseCsvLog(strings.NewReader(input)) + go func() { + for _ = range errs { + // no-op, just drain the channel + } + }() + + for item := range itemsChan { + if item != nil { + items = append(items, item) + } + } + + Eventually(done).Should(BeClosed()) + Expect(len(items)).To(Equal(len(expected))) + + for idx, item := range items { + Expect(item).To(BeEquivalentTo(expected[idx])) + } + }, + Entry( + "queries and duration logs", + ` +2019-02-25 15:08:27.222 GMT,"postgres","postgres",7283,"199.167.158.43:57426",6480e39e.1c73,6374,"SELECT",2019-02-25 15:08:27.222 GMT,4/286618,0,LOG,00000,"connection received: host=127.0.0.1 port=59103",,,,,,,,,"","client backend" +2019-02-25 15:08:27.222 GMT,"postgres","postgres",7283,"199.167.158.43:57426",6480e39e.1c73,6374,"SELECT",2019-02-25 15:08:27.222 GMT,4/286618,0,LOG,00000,"connection authorized: user=alice database=pgreplay_test",,,,,,,,,"","client backend" +2019-02-25 15:08:27.222 GMT,"postgres","postgres",7283,"199.167.158.43:57426",6480e39e.1c73,6374,"SELECT",2019-02-25 15:08:27.222 GMT,4/286618,0,LOG,00000,"duration: 71.963 ms",,,,,,,,,"","client backend" +2019-02-25 15:08:27.222 GMT,"postgres","postgres",7283,"199.167.158.43:57426",6480e39e.1c73,6374,"SELECT",2019-02-25 15:08:27.222 GMT,4/286618,0,LOG,00000,"execute : select t.oid",,,,,,,,,"","client backend" +2019-02-25 15:08:27.222 GMT,"postgres","postgres",7283,"199.167.158.43:57426",6480e39e.1c73,6375,"idle in transaction",2019-02-25 15:08:27.222 GMT,4/286618,0,LOG,00000,"statement: + SELECT p.name, r.rating + FROM products p + JOIN reviews r ON p.id = r.product_id + WHERE r.rating IN ( + SELECT MIN(rating) FROM reviews + UNION + SELECT MAX(rating) FROM reviews + ); + ",,,,,,,,,"","client backend" +2019-02-25 15:08:27.222 GMT,"postgres","postgres",7283,"199.167.158.43:57426",6480e39e.1c73,6376,"SELECT",2019-02-25 15:08:27.222 GMT,4/286618,0,LOG,00000,"duration: 53.774 ms",,,,,,,,,"","client backend" +2019-02-25 15:08:27.222 GMT,"postgres","postgres",7283,"199.167.158.43:57426",6480e39e.1c73,6377,"idle in transaction",2019-02-25 15:08:27.222 GMT,4/286618,0,LOG,00000,"statement: + SELECT name, email + FROM users + WHERE email LIKE '@gmail.com'; + ",,,,,,,,,"","client backend"`, + []Item{ + Connect{ + Details{ + Timestamp: time20190225, + SessionID: "6480e39e.1c73", + User: "postgres", + Database: "postgres", + }, + }, + Statement{ + Details: Details{ + Timestamp: time20190225, + SessionID: "6480e39e.1c73", + User: "postgres", + Database: "postgres", + }, + Query: "\n\t\t\t\t\t\tSELECT p.name, r.rating\n\t\t\t\t\t\tFROM products p\n\t\t\t\t\t\tJOIN reviews r ON p.id = r.product_id\n\t\t\t\t\t\tWHERE r.rating IN (\n\t\t\t\t\t\tSELECT MIN(rating) FROM reviews\n\t\t\t\t\t\tUNION\n\t\t\t\t\t\tSELECT MAX(rating) FROM reviews\n\t\t\t\t\t\t);\n\t\t\t\t", + }, + Statement{ + Details: Details{ + Timestamp: time20190225, + SessionID: "6480e39e.1c73", + User: "postgres", + Database: "postgres", + }, + Query: "\n\t\t\t\t\t\tSELECT name, email\n\t\t\t\t\t\tFROM users\n\t\t\t\t\t\tWHERE email LIKE '@gmail.com';\n\t\t\t\t", + }, + }, + ), + ) +}) + var _ = Describe("ParseErrlog", func() { DescribeTable("Parses", func(input string, expected []Item) { diff --git a/pkg/pgreplay/types.go b/pkg/pgreplay/types.go index b1475de..498348b 100644 --- a/pkg/pgreplay/types.go +++ b/pkg/pgreplay/types.go @@ -1,12 +1,13 @@ package pgreplay import ( + "context" stdjson "encoding/json" "fmt" "time" - "github.com/jackc/pgx" - "github.com/json-iterator/go" + "github.com/jackc/pgx/v5" + jsoniter "github.com/json-iterator/go" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary @@ -82,7 +83,7 @@ type Item interface { GetSessionID() SessionID GetUser() string GetDatabase() string - Handle(*pgx.Conn) error + Handle(context.Context, *pgx.Conn) error } type Details struct { @@ -99,14 +100,14 @@ func (e Details) GetDatabase() string { return e.Database } type Connect struct{ Details } -func (_ Connect) Handle(_ *pgx.Conn) error { +func (_ Connect) Handle(ctx context.Context, _ *pgx.Conn) error { return nil // Database will manage opening connections } type Disconnect struct{ Details } -func (_ Disconnect) Handle(conn *pgx.Conn) error { - return conn.Close() +func (_ Disconnect) Handle(ctx context.Context, conn *pgx.Conn) error { + return conn.Close(ctx) } type Statement struct { @@ -114,8 +115,8 @@ type Statement struct { Query string `json:"query"` } -func (s Statement) Handle(conn *pgx.Conn) error { - _, err := conn.Exec(s.Query) +func (s Statement) Handle(ctx context.Context, conn *pgx.Conn) error { + _, err := conn.Exec(ctx, s.Query) return err } @@ -141,7 +142,7 @@ type BoundExecute struct { Parameters []interface{} `json:"parameters"` } -func (e BoundExecute) Handle(conn *pgx.Conn) error { - _, err := conn.Exec(e.Query, e.Parameters...) +func (e BoundExecute) Handle(ctx context.Context, conn *pgx.Conn) error { + _, err := conn.Exec(ctx, e.Query, e.Parameters...) return err }