From 70593b13d8d09ab3345239a70a47c2fa8671a966 Mon Sep 17 00:00:00 2001 From: pavel-raykov Date: Mon, 4 Nov 2024 18:07:53 +0100 Subject: [PATCH 1/5] Extract txdb into the common repository to make it reusable from other repositories. --- go.mod | 2 +- pkg/sqlutil/txdb.go | 471 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 472 insertions(+), 1 deletion(-) create mode 100644 pkg/sqlutil/txdb.go diff --git a/go.mod b/go.mod index 7e1981644..aa61037b9 100644 --- a/go.mod +++ b/go.mod @@ -100,7 +100,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect - go.uber.org/multierr v1.11.0 // indirect + go.uber.org/multierr v1.11.0 golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sync v0.8.0 // indirect diff --git a/pkg/sqlutil/txdb.go b/pkg/sqlutil/txdb.go new file mode 100644 index 000000000..c3a1cd1ef --- /dev/null +++ b/pkg/sqlutil/txdb.go @@ -0,0 +1,471 @@ +package sqlutil + +import ( + "context" + "database/sql" + "database/sql/driver" + "go.uber.org/multierr" + "io" + "net/url" + "sync" +) + +// txdb is a simplified version of https://github.com/DATA-DOG/go-txdb +// +// The original lib has various problems and is hard to understand because it +// tries to be more general. The version in this file is more tightly focused +// to our needs and should be easier to reason about and less likely to have +// subtle bugs/races. +// +// It doesn't currently support savepoints but could be made to if necessary. +// +// Transaction BEGIN/ROLLBACK effectively becomes a no-op, this should have no +// negative impact on normal test operation. +// +// If you MUST test BEGIN/ROLLBACK behaviour, you will have to configure your +// store to use the raw DialectPostgres dialect and setup a one-use database. +// See heavyweight.FullTestDB() as a convenience function to help you do this, +// but please use sparingly because as it's name implies, it is expensive. + +var _ driver.Conn = &conn{} + +var _ driver.Validator = &conn{} +var _ driver.SessionResetter = &conn{} + +// txDriver is an sql driver which runs on a single transaction. +// When `Close` is called, transaction is rolled back. +type txDriver struct { + sync.Mutex + db *sql.DB + conns map[string]*conn + + dbURL string +} + +func (d *txDriver) Open(dsn string) (driver.Conn, error) { + d.Lock() + defer d.Unlock() + // Open real db connection if its the first call + if d.db == nil { + parsedDB, err := url.Parse(d.dbURL) + if err != nil { + return nil, err + } + db, err := sql.Open(parsedDB.Scheme, d.dbURL) + if err != nil { + return nil, err + } + d.db = db + } + c, exists := d.conns[dsn] + if !exists || !c.tryOpen() { + tx, err := d.db.Begin() + if err != nil { + return nil, err + } + c = &conn{tx: tx, opened: 1, dsn: dsn} + c.removeSelf = func() error { + return d.deleteConn(c) + } + d.conns[dsn] = c + } + return c, nil +} + +// deleteConn is called by a connection when it is closed via the `close` method. +// It also auto-closes the DB when the last checked out connection is closed. +func (d *txDriver) deleteConn(c *conn) error { + // must lock here to avoid racing with Open + d.Lock() + defer d.Unlock() + + if d.conns[c.dsn] != c { + return nil // already been replaced + } + delete(d.conns, c.dsn) + if len(d.conns) == 0 && d.db != nil { + if err := d.db.Close(); err != nil { + return err + } + d.db = nil + } + return nil +} + +type conn struct { + sync.Mutex + dsn string + tx *sql.Tx // tx may be shared by many conns, definitive one lives in the map keyed by DSN on the txDriver. Do not modify from conn + closed bool + opened int + removeSelf func() error +} + +func (c *conn) Begin() (driver.Tx, error) { + c.Lock() + defer c.Unlock() + if c.closed { + panic("conn is closed") + } + // Begin is a noop because the transaction was already opened + return tx{c.tx}, nil +} + +// Implement the "ConnBeginTx" interface +func (c *conn) BeginTx(_ context.Context, opts driver.TxOptions) (driver.Tx, error) { + // Context is ignored, because single transaction is shared by all callers, thus caller should not be able to + // control it with local context + return c.Begin() +} + +// Prepare returns a prepared statement, bound to this connection. +func (c *conn) Prepare(query string) (driver.Stmt, error) { + return c.PrepareContext(context.Background(), query) +} + +// Implement the "ConnPrepareContext" interface +func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { + c.Lock() + defer c.Unlock() + if c.closed { + panic("conn is closed") + } + + // TODO: Fix context handling + // FIXME: It is not safe to give the passed in context to the tx directly + // because the tx is shared by many conns and cancelling the context will + // destroy the tx which can affect other conns + st, err := c.tx.PrepareContext(context.Background(), query) + if err != nil { + return nil, err + } + return &stmt{st, c}, nil +} + +// IsValid is called prior to placing the connection into the +// connection pool by database/sql. The connection will be discarded if false is returned. +func (c *conn) IsValid() bool { + c.Lock() + defer c.Unlock() + return !c.closed +} + +func (c *conn) ResetSession(ctx context.Context) error { + // Ensure bad connections are reported: From database/sql/driver: + // If a connection is never returned to the connection pool but immediately reused, then + // ResetSession is called prior to reuse but IsValid is not called. + c.Lock() + defer c.Unlock() + if c.closed { + return driver.ErrBadConn + } + + return nil +} + +// pgx returns nil +func (c *conn) CheckNamedValue(nv *driver.NamedValue) error { + return nil +} + +// Implement the "QueryerContext" interface +func (c *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { + c.Lock() + defer c.Unlock() + if c.closed { + panic("conn is closed") + } + + // TODO: Fix context handling + rs, err := c.tx.QueryContext(context.Background(), query, mapNamedArgs(args)...) + if err != nil { + return nil, err + } + defer rs.Close() + + return buildRows(rs) +} + +// Implement the "ExecerContext" interface +func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { + c.Lock() + defer c.Unlock() + if c.closed { + panic("conn is closed") + } + // TODO: Fix context handling + return c.tx.ExecContext(context.Background(), query, mapNamedArgs(args)...) +} + +// tryOpen attempts to increment the open count, but returns false if closed. +func (c *conn) tryOpen() bool { + c.Lock() + defer c.Unlock() + if c.closed { + return false + } + c.opened++ + return true +} + +// Close invalidates and potentially stops any current +// prepared statements and transactions, marking this +// connection as no longer in use. +// +// Because the sql package maintains a free pool of +// connections and only calls Close when there's a surplus of +// idle connections, it shouldn't be necessary for drivers to +// do their own connection caching. +// +// Drivers must ensure all network calls made by Close +// do not block indefinitely (e.g. apply a timeout). +func (c *conn) Close() (err error) { + if !c.close() { + return + } + // Wait to remove self to avoid nesting locks. + if err := c.removeSelf(); err != nil { + panic(err) + } + return +} + +func (c *conn) close() bool { + c.Lock() + defer c.Unlock() + if c.closed { + // Double close, should be a safe to make this a noop + // PGX allows double close + // See: https://github.com/jackc/pgx/blob/a457da8bffa4f90ad672fa093ee87f20cf06687b/conn.go#L249 + return false + } + + c.opened-- + if c.opened > 0 { + return false + } + if c.tx != nil { + if err := c.tx.Rollback(); err != nil { + panic(err) + } + c.tx = nil + } + c.closed = true + return true +} + +type tx struct { + tx *sql.Tx +} + +func (tx tx) Commit() error { + // Commit is a noop because the transaction will be rolled back at the end + return nil +} + +func (tx tx) Rollback() error { + // Rollback is a noop because the transaction will be rolled back at the end + return nil +} + +type stmt struct { + st *sql.Stmt + conn *conn +} + +func (s stmt) Exec(args []driver.Value) (driver.Result, error) { + s.conn.Lock() + defer s.conn.Unlock() + if s.conn.closed { + panic("conn is closed") + } + return s.st.Exec(mapArgs(args)...) +} + +// Implement the "StmtExecContext" interface +func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) { + s.conn.Lock() + defer s.conn.Unlock() + if s.conn.closed { + panic("conn is closed") + } + // TODO: Fix context handling + return s.st.ExecContext(context.Background(), mapNamedArgs(args)...) +} + +func mapArgs(args []driver.Value) (res []interface{}) { + res = make([]interface{}, len(args)) + for i := range args { + res[i] = args[i] + } + return +} + +func (s stmt) NumInput() int { + return -1 +} + +func (s stmt) Query(args []driver.Value) (driver.Rows, error) { + s.conn.Lock() + defer s.conn.Unlock() + if s.conn.closed { + panic("conn is closed") + } + rows, err := s.st.Query(mapArgs(args)...) + defer func() { + err = multierr.Combine(err, rows.Close()) + }() + if err != nil { + return nil, err + } + return buildRows(rows) +} + +// Implement the "StmtQueryContext" interface +func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) { + s.conn.Lock() + defer s.conn.Unlock() + if s.conn.closed { + panic("conn is closed") + } + // TODO: Fix context handling + rows, err := s.st.QueryContext(context.Background(), mapNamedArgs(args)...) + if err != nil { + return nil, err + } + return buildRows(rows) +} + +func (s stmt) Close() error { + s.conn.Lock() + defer s.conn.Unlock() + return s.st.Close() +} + +func buildRows(r *sql.Rows) (driver.Rows, error) { + set := &rowSets{} + rs := &rows{} + if err := rs.read(r); err != nil { + return set, err + } + set.sets = append(set.sets, rs) + for r.NextResultSet() { + rss := &rows{} + if err := rss.read(r); err != nil { + return set, err + } + set.sets = append(set.sets, rss) + } + return set, nil +} + +// Implement the "RowsNextResultSet" interface +func (rs *rowSets) HasNextResultSet() bool { + return rs.pos+1 < len(rs.sets) +} + +// Implement the "RowsNextResultSet" interface +func (rs *rowSets) NextResultSet() error { + if !rs.HasNextResultSet() { + return io.EOF + } + + rs.pos++ + return nil +} + +type rows struct { + rows [][]driver.Value + pos int + cols []string + colTypes []*sql.ColumnType +} + +func (r *rows) Columns() []string { + return r.cols +} + +func (r *rows) ColumnTypeDatabaseTypeName(index int) string { + return r.colTypes[index].DatabaseTypeName() +} + +func (r *rows) Next(dest []driver.Value) error { + r.pos++ + if r.pos > len(r.rows) { + return io.EOF + } + + for i, val := range r.rows[r.pos-1] { + dest[i] = *(val.(*interface{})) + } + + return nil +} + +func (r *rows) Close() error { + return nil +} + +func (r *rows) read(rs *sql.Rows) error { + var err error + r.cols, err = rs.Columns() + if err != nil { + return err + } + + r.colTypes, err = rs.ColumnTypes() + if err != nil { + return err + } + + for rs.Next() { + values := make([]interface{}, len(r.cols)) + for i := range values { + values[i] = new(interface{}) + } + if err := rs.Scan(values...); err != nil { + return err + } + row := make([]driver.Value, len(r.cols)) + for i, v := range values { + row[i] = driver.Value(v) + } + r.rows = append(r.rows, row) + } + return rs.Err() +} + +type rowSets struct { + sets []*rows + pos int +} + +func (rs *rowSets) Columns() []string { + return rs.sets[rs.pos].cols +} + +func (rs *rowSets) ColumnTypeDatabaseTypeName(index int) string { + return rs.sets[rs.pos].ColumnTypeDatabaseTypeName(index) +} + +func (rs *rowSets) Close() error { + return nil +} + +// advances to next row +func (rs *rowSets) Next(dest []driver.Value) error { + return rs.sets[rs.pos].Next(dest) +} + +func mapNamedArgs(args []driver.NamedValue) (res []interface{}) { + res = make([]interface{}, len(args)) + for i := range args { + name := args[i].Name + if name != "" { + res[i] = sql.Named(name, args[i].Value) + } else { + res[i] = args[i].Value + } + } + return +} From b0e6080e99e708058f98c1def4d1ba08bc4ee172 Mon Sep 17 00:00:00 2001 From: pavel-raykov Date: Mon, 4 Nov 2024 18:20:46 +0100 Subject: [PATCH 2/5] Minor --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index aa61037b9..7e1981644 100644 --- a/go.mod +++ b/go.mod @@ -100,7 +100,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect - go.uber.org/multierr v1.11.0 + go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sync v0.8.0 // indirect From 43fc5de85ca1b4e279f41907f925f3b6dfe5a897 Mon Sep 17 00:00:00 2001 From: pavel-raykov Date: Mon, 4 Nov 2024 18:23:15 +0100 Subject: [PATCH 3/5] Minor --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 7e1981644..aa61037b9 100644 --- a/go.mod +++ b/go.mod @@ -100,7 +100,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect - go.uber.org/multierr v1.11.0 // indirect + go.uber.org/multierr v1.11.0 golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sync v0.8.0 // indirect From 13d7e28351e5fbd033f2b04c2a3ca22b782f50ee Mon Sep 17 00:00:00 2001 From: pavel-raykov Date: Tue, 5 Nov 2024 11:54:34 +0100 Subject: [PATCH 4/5] Added tests --- go.mod | 13 ++++++++ go.sum | 23 +++++++++++++++ pkg/sqlutil/txdb_test.go | 64 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+) create mode 100644 pkg/sqlutil/txdb_test.go diff --git a/go.mod b/go.mod index aa61037b9..6f0faa398 100644 --- a/go.mod +++ b/go.mod @@ -27,10 +27,12 @@ require ( github.com/jpillora/backoff v1.0.0 github.com/lib/pq v1.10.9 github.com/linkedin/goavro/v2 v2.12.0 + github.com/marcboeker/go-duckdb v1.8.2 github.com/pelletier/go-toml/v2 v2.2.0 github.com/prometheus/client_golang v1.17.0 github.com/riferrei/srclient v0.5.4 github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 + github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 @@ -63,6 +65,17 @@ require ( sigs.k8s.io/yaml v1.4.0 ) +require ( + github.com/apache/arrow/go/v17 v17.0.0 // indirect + github.com/goccy/go-json v0.10.3 // indirect + github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect +) + require ( github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 10f49bb5d..cd8ed5503 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5 github.com/Microsoft/hcsshim v0.9.4/go.mod h1:7pLA8lDk46WKDWlVsENo92gC0XFa8rbKfyFRBqxEbCc= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54= +github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc= github.com/atombender/go-jsonschema v0.16.1-0.20240916205339-a74cd4e2851c h1:cxQVoh6kY+c4b0HUchHjGWBI8288VhH50qxKG3hdEg0= github.com/atombender/go-jsonschema v0.16.1-0.20240916205339-a74cd4e2851c/go.mod h1:3XzxudkrYVUvbduN/uI2fl4lSrMSzU0+3RCu2mpnfx8= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= @@ -78,6 +80,8 @@ github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpv github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-viper/mapstructure/v2 v2.1.0 h1:gHnMa2Y/pIxElCH2GlZZ1lZSsn6XMtufpGyP1XxdC/w= github.com/go-viper/mapstructure/v2 v2.1.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/goccy/go-yaml v1.12.0 h1:/1WHjnMsI1dlIBQutrvSMGZRQufVO3asrHfTwfACoPM= github.com/goccy/go-yaml v1.12.0/go.mod h1:wKnAMd44+9JAAnGQpWVEgBzGt3YuTaQ4uXoHvE4m7WU= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -103,6 +107,8 @@ github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= +github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -143,6 +149,10 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -158,6 +168,8 @@ github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamh github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/marcboeker/go-duckdb v1.8.2 h1:gHcFjt+HcPSpDVjPSzwof+He12RS+KZPwxcfoVP8Yx4= +github.com/marcboeker/go-duckdb v1.8.2/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -173,6 +185,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= github.com/moby/sys/mount v0.3.3/go.mod h1:PBaEorSNTLG5t/+4EgukEQVlAvVEc6ZjTySwKdqp5K0= github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= @@ -193,6 +207,8 @@ github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b github.com/opencontainers/runc v1.1.3/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg= github.com/pelletier/go-toml/v2 v2.2.0 h1:QLgLl2yMN7N+ruc31VynXs1vhMZa7CeHHejIeBAsoHo= github.com/pelletier/go-toml/v2 v2.2.0/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -217,6 +233,8 @@ github.com/sanity-io/litter v1.5.5/go.mod h1:9gzJgR2i4ZpjZHsKvUXIRQVk7P+yM3e+jAF github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 h1:WCcC4vZDS1tYNxjWlwRJZQy28r8CMoggKnxNzxsVDMQ= github.com/santhosh-tekuri/jsonschema/v5 v5.2.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= +github.com/scylladb/go-reflectx v1.0.1 h1:b917wZM7189pZdlND9PbIJ6NQxfDPfBvUaQ7cjj1iZQ= +github.com/scylladb/go-reflectx v1.0.1/go.mod h1:rWnOfDIRWBGN0miMLIcoPt/Dhi2doCMZqwMCJ3KupFc= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -249,6 +267,10 @@ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+x github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 h1:vS1Ao/R55RNV4O7TA2Qopok8yN+X0LIP6RVWLFkprck= @@ -347,6 +369,7 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/pkg/sqlutil/txdb_test.go b/pkg/sqlutil/txdb_test.go new file mode 100644 index 000000000..f4897cb6d --- /dev/null +++ b/pkg/sqlutil/txdb_test.go @@ -0,0 +1,64 @@ +package sqlutil + +import ( + "context" + "database/sql" + "github.com/google/uuid" + "github.com/jmoiron/sqlx" + _ "github.com/marcboeker/go-duckdb" + "github.com/scylladb/go-reflectx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestTxDBDriver(t *testing.T) { + duckdb, err := sql.Open("duckdb", "") + const driverName = "txdb_test" + sql.Register(driverName, &txDriver{ + db: duckdb, + conns: make(map[string]*conn), + }) + sqlx.BindDriver(driverName, sqlx.DOLLAR) + + db, err := sqlx.Open(driverName, uuid.New().String()) + require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, db.Close()) }) + db.MapperFunc(reflectx.CamelToSnakeASCII) + + dropTable := func() error { + _, err := db.Exec(`DROP TABLE IF EXISTS txdb_test`) + return err + } + // clean up, if previous tests failed + err = dropTable() + assert.NoError(t, err) + _, err = db.Exec(`CREATE TABLE txdb_test (id TEXT NOT NULL)`) + assert.NoError(t, err) + t.Cleanup(func() { + _ = dropTable() + }) + _, err = db.Exec(`INSERT INTO txdb_test VALUES ($1)`, uuid.New().String()) + assert.NoError(t, err) + ensureValuesPresent := func(t *testing.T, db *sqlx.DB) { + var ids []string + err = db.Select(&ids, `SELECT id from txdb_test`) + assert.NoError(t, err) + assert.Len(t, ids, 1) + } + + ensureValuesPresent(t, db) + t.Run("Cancel of tx's context does not trigger rollback of driver's tx", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + _, err := db.BeginTx(ctx, nil) + assert.NoError(t, err) + cancel() + + // BeginTx spawns separate goroutine that rollbacks the tx and tries to close underlying connection, unless + // db driver says that connection is still active. + // This approach is not ideal, but there is no better way to wait for independent goroutine to complete + time.Sleep(time.Second * 2) + ensureValuesPresent(t, db) + }) +} From 19475083c3fab1aaf2987fd71432b8d5b9850a00 Mon Sep 17 00:00:00 2001 From: pavel-raykov Date: Tue, 5 Nov 2024 14:29:09 +0100 Subject: [PATCH 5/5] Add test --- pkg/sqlutil/txdb_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pkg/sqlutil/txdb_test.go b/pkg/sqlutil/txdb_test.go index f4897cb6d..125c9c3b3 100644 --- a/pkg/sqlutil/txdb_test.go +++ b/pkg/sqlutil/txdb_test.go @@ -61,4 +61,24 @@ func TestTxDBDriver(t *testing.T) { time.Sleep(time.Second * 2) ensureValuesPresent(t, db) }) + + t.Run("Test statement", func(t *testing.T) { + stmt, err := db.Prepare("SELECT id FROM txdb_test") + defer stmt.Close() + assert.NoError(t, err) + rows, err := stmt.Query() + assert.True(t, rows.Next()) + var id string + rows.Scan(id) + assert.False(t, rows.Next()) + + _, err = stmt.Exec() + assert.NoError(t, err) + + _, err = stmt.ExecContext(context.Background()) + assert.NoError(t, err) + + _, err = stmt.QueryContext(context.Background()) + assert.NoError(t, err) + }) }