pg: colinfo, scan vals, and QueryScan funcs
This adds the ability to scan query results into provided variables
instead of relying on pgx Row.Values() to choose the type. This provides
some foundational components for table statistics collection.

The sql.QueryScanner interface is an advanced alternative to Execute that
uses caller-provided scan values and runs a function for each scanned
row:

// QueryScanner represents a type that provides the ability to execute an SQL
// statement, where for each row:
//
//  1. result values are scanned into the variables in the scans slice
//  2. the provided function is then called
//
// The function would typically capture the variables in the scans slice,
// allowing it to operate on the values. For instance, it might append the
// values to slices allocated by the caller, or perform reduction operations
// like sum/mean/min/etc.
//
// NOTE: This method may end up being included in the Tx interface alongside
// Executor since all of the concrete transaction implementations provided by
// this package implement this method.
type QueryScanner interface {
	QueryScanFn(ctx context.Context, stmt string,
		scans []any, fn func() error, args ...any) error
}

Each transaction type in the pg package satisfies the sql.QueryScanner
interface.

The pg.QueryRowFunc function executes an SQL statement, handling the rows
and returned values as described by the sql.QueryScanner interface.

The pg.QueryRowFuncAny is similar to pg.QueryRowFunc, except that no scan
values slice is provided. The provided function is called for each row
of the result. The caller does not determine the types of the Go variables
in the values slice. In this way it behaves similarly to Execute, but with
"for each row" semantics so that the entire result set does not need to be
loaded into memory.

Table statistics collection: beginning with a simplified sql.Statistics
struct based on the types proposed in the initial unmerged query cost
branch, the pg package provides the following new methods aimed at the
(relatively expensive) collection of ground truth table statistics:

  - RowCount provides an exact row count
  - colStats computes column-wise statistics
  - TableStats uses the above functions to build a *sql.Statistics for a table.

These methods will not be used routinely. We will have incremental
updates, but there are cases where a full scan may be needed to obtain
the ground truth statistics.

pg: decimal and uint256 use pgNumericToDecimal helper

Use the pgNumericToDecimal helper to share the logic that converts a
pgtypes.Numeric into either our decimal.Decimal or types.Uint256. This
logic is used by the recent pgtype decoding added to the query helper,
which interprets the values returned by row.Values() in pgx.CollectRows.

types,decimal: sql Scan/Value for uint256 and decimal, and their array types

Support NULL values with uint256 and decimal.

deps: update pgx module from 5.5.5 to 5.6.0
jchappelow committed Aug 12, 2024
1 parent a499074 commit 3aecf89
Showing 17 changed files with 1,792 additions and 145 deletions.
19 changes: 19 additions & 0 deletions common/sql/sql.go
@@ -35,6 +35,25 @@ type Executor interface {
Execute(ctx context.Context, stmt string, args ...any) (*ResultSet, error)
}

// QueryScanner represents a type that provides the ability to execute an SQL
// statement, where for each row:
//
// 1. result values are scanned into the variables in the scans slice
// 2. the provided function is then called
//
// The function would typically capture the variables in the scans slice,
// allowing it to operate on the values. For instance, it might append the
// values to slices allocated by the caller, or perform reduction operations
// like sum/mean/min/etc.
//
// NOTE: This method may end up being included in the Tx interface alongside
// Executor since all of the concrete transaction implementations provided by
// this package implement this method.
type QueryScanner interface {
QueryScanFn(ctx context.Context, stmt string,
scans []any, fn func() error, args ...any) error
}

// TxMaker is an interface that creates a new transaction. In the context of the
// recursive Tx interface, it creates a nested transaction.
type TxMaker interface {
73 changes: 73 additions & 0 deletions common/sql/statistics.go
@@ -0,0 +1,73 @@
package sql

// NOTE: this file is TRANSITIONAL! These types are lifted from the
// unmerged internal/engine/costs/datatypes package.

import (
"fmt"
"strings"
)

// Statistics contains statistics about a table or a Plan. A Statistics can be
// derived directly from the underlying table, or derived from the statistics of
// its children.
type Statistics struct {
RowCount int64

ColumnStatistics []ColumnStatistics

//Selectivity, for plan statistics
}

func (s *Statistics) String() string {
var st strings.Builder
fmt.Fprintf(&st, "RowCount: %d", s.RowCount)
if len(s.ColumnStatistics) > 0 {
fmt.Fprintln(&st, "")
}
for i, cs := range s.ColumnStatistics {
fmt.Fprintf(&st, " Column %d:\n", i)
fmt.Fprintf(&st, " - Min/Max = %v / %v\n", cs.Min, cs.Max)
fmt.Fprintf(&st, " - NULL count = %v\n", cs.NullCount)
}
return st.String()
}

type ValCount struct {
Val any
Count int
}

// ColumnStatistics contains statistics about a column.
type ColumnStatistics struct {
NullCount int64

Min any
MinCount int

Max any
MaxCount int

// MCVs are the most common values. It should be sorted by the value. It
// should also be limited capacity, which means scan order has to be
// deterministic since we have to throw out same-frequency observations.
// (crap) Solution: multi-pass scan, merge lists, continue until no higher
// freq values observed? OR when capacity reached, use a histogram? Do not
// throw away MCVs, just start putting additional observations in to the
// histogram instead.
// MCVs []ValCount
// MCVs map[cmp.Ordered]

// MCVals []any
// MCFreqs []int

// DistinctCount is harder. For example, unless we sub-sample
// (deterministically), tracking distinct values could involve a data
// structure with the same number of elements as rows in the table.
DistinctCount int64

AvgSize int64 // maybe: length of text, length of array, otherwise not used for scalar?

// without histogram, we can make uniformity assumption to simplify the cost model
//Histogram []HistogramBucket
}
65 changes: 43 additions & 22 deletions core/types/decimal/decimal.go
@@ -119,7 +119,7 @@ func inferPrecisionAndScale(s string) (precision, scale uint16) {
s = strings.TrimLeft(s, "-+")
parts := strings.Split(s, ".")

// remove 0s from the left part, siince 001.23 is the same as 1.23
// remove 0s from the left part, since 001.23 is the same as 1.23
parts[0] = strings.TrimLeft(parts[0], "0")

intPart := uint16(len(parts[0]))
@@ -279,17 +279,57 @@ func (d *Decimal) Sign() int {
return d.dec.Sign()
}

func (d Decimal) NaN() bool {
switch d.dec.Form {
case apd.NaN, apd.NaNSignaling:
return true
}
return false
}

func (d Decimal) Inf() bool {
return d.dec.Form == apd.Infinite
}

// Value implements the database/sql/driver.Valuer interface. It converts d to a
// string.
func (d Decimal) Value() (driver.Value, error) {
// NOTE: we're currently (ab)using the NaN case to handle scanning of NULL
// values. Match that here. We may want something different though.
if d.dec.Form == apd.NaN {
return nil, nil
}
return d.dec.Value()
}

var _ driver.Valuer = &Decimal{}

// Scan implements the database/sql.Scanner interface.
func (d *Decimal) Scan(src interface{}) error {
return d.dec.Scan(src)
if src == nil {
*d = Decimal{
dec: apd.Decimal{Form: apd.NaN},
}
return nil
}

s, ok := src.(string)
if !ok {
var dec apd.Decimal
err := dec.Scan(src)
if err != nil {
return err
}
s = dec.String()
}

// set scale and prec from the string
d2, err := NewFromString(s)
if err != nil {
return err
}
*d = *d2
return nil
}

var _ sql.Scanner = &Decimal{}
@@ -425,6 +465,7 @@ func Cmp(x, y *Decimal) (int64, error) {
}

return z.Int64()
// return x.dec.Cmp(&y.dec)
}

// CheckPrecisionAndScale checks if the precision and scale are valid.
@@ -459,23 +500,3 @@ func (da DecimalArray) Value() (driver.Value, error) {
}

var _ driver.Valuer = (*DecimalArray)(nil)

// Scan implements the sql.Scanner interface.
func (da *DecimalArray) Scan(src interface{}) error {
switch s := src.(type) {
case []string:
*da = make(DecimalArray, len(s))
for i, str := range s {
d, err := NewFromString(str)
if err != nil {
return err
}

(*da)[i] = d
}

return nil
}

return fmt.Errorf("cannot convert %T to DecimalArray", src)
}
77 changes: 51 additions & 26 deletions core/types/uint256.go
@@ -1,6 +1,7 @@
package types

import (
"database/sql"
"database/sql/driver"
"fmt"
"math/big"
@@ -12,12 +13,13 @@ import (
// It is mostly a wrapper around github.com/holiman/uint256.Int, but includes
// extra methods for usage in Postgres.
type Uint256 struct {
uint256.Int
base uint256.Int // not exporting massive method set, which also has params and returns of holiman types
Null bool
}

// Uint256FromInt creates a new Uint256 from an int.
func Uint256FromInt(i uint64) *Uint256 {
return &Uint256{Int: *uint256.NewInt(i)}
return &Uint256{base: *uint256.NewInt(i)}
}

// Uint256FromString creates a new Uint256 from a string.
@@ -26,7 +28,7 @@ func Uint256FromString(s string) (*Uint256, error) {
if err != nil {
return nil, err
}
return &Uint256{Int: *i}, nil
return &Uint256{base: *i}, nil
}

// Uint256FromBig creates a new Uint256 from a big.Int.
@@ -40,8 +42,33 @@ func Uint256FromBytes(b []byte) (*Uint256, error) {
return Uint256FromBig(bigInt)
}

func (u Uint256) String() string {
return u.base.String()
}

func (u Uint256) Bytes() []byte {
return u.base.Bytes()
}

func (u Uint256) ToBig() *big.Int {
return u.base.ToBig()
}

func (u Uint256) MarshalJSON() ([]byte, error) {
return []byte(u.String()), nil
return []byte(u.base.String()), nil // ? json ?
}

func (u *Uint256) Clone() *Uint256 {
v := *u
return &v
}

func (u *Uint256) Cmp(v *Uint256) int {
return u.base.Cmp(&v.base)
}

func CmpUint256(u, v *Uint256) int {
return u.Cmp(v)
}

func (u *Uint256) UnmarshalJSON(b []byte) error {
@@ -50,16 +77,20 @@ func (u *Uint256) UnmarshalJSON(b []byte) error {
return err
}

u.Int = u2.Int
u.base = u2.base
return nil
}

// Value implements the driver.Valuer interface.
func (u Uint256) Value() (driver.Value, error) {
if u.Null {
return nil, nil
}
return u.String(), nil
}

var _ driver.Valuer = Uint256{}
var _ driver.Valuer = (*Uint256)(nil)

// Scan implements the sql.Scanner interface.
func (u *Uint256) Scan(src interface{}) error {
@@ -70,21 +101,28 @@ func (u *Uint256) Scan(src interface{}) error {
return err
}

u.Int = u2.Int
u.base = u2.base
u.Null = false
return nil

case nil:
u.Null = true
u.base.Clear()
return nil
}

return fmt.Errorf("cannot convert %T to Uint256", src)
}

var _ driver.Valuer = (*Uint256)(nil)
var _ sql.Scanner = (*Uint256)(nil)

// Uint256Array is an array of Uint256s.
type Uint256Array []*Uint256

// Value implements the driver.Valuer interface.
func (ua Uint256Array) Value() (driver.Value, error) {
// Even when implementing pgtype.ArrayGetter we still need this, so that the
// pgx driver can use its wrapSliceEncodePlan.
strs := make([]string, len(ua))
for i, u := range ua {
strs[i] = u.String()
@@ -95,21 +133,8 @@ func (ua Uint256Array) Value() (driver.Value, error) {

var _ driver.Valuer = (*Uint256Array)(nil)

// Scan implements the sql.Scanner interface.
func (ua *Uint256Array) Scan(src interface{}) error {
switch s := src.(type) {
case []string:
*ua = make(Uint256Array, len(s))
for i, str := range s {
u, err := Uint256FromString(str)
if err != nil {
return err
}

(*ua)[i] = u
}
return nil
}

return fmt.Errorf("cannot convert %T to Uint256Array", src)
}
// Uint256Array is a slice of Scanners. pgx at least is smart enough to make
// this work automatically!
// Another approach is to implement pgx.ArraySetter and pgx.ArrayGetter,
// similar in effect to:
// type Uint256Array pgtype.FlatArray[*Uint256]
2 changes: 1 addition & 1 deletion go.mod
@@ -16,7 +16,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/jackc/pgx/v5 v5.6.0
github.com/jpillora/backoff v1.0.0
github.com/kwilteam/kwil-db/core v0.2.0
github.com/kwilteam/kwil-db/parse v0.2.0-beta.1
4 changes: 2 additions & 2 deletions go.sum
@@ -245,8 +245,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
7 changes: 7 additions & 0 deletions internal/sql/pg/conn.go
Original file line number Diff line number Diff line change
@@ -292,6 +292,13 @@ func (p *Pool) Close() error {
// BeginTx starts a read-write transaction. It is an error to call this twice
// without first closing the initial transaction.
func (p *Pool) BeginTx(ctx context.Context) (sql.Tx, error) {
return p.begin(ctx)
}

// begin is the unexported version of BeginTx that returns a concrete type
// instead of an interface. The exported method must return an interface to
// satisfy the sql.TxMaker interface.
func (p *Pool) begin(ctx context.Context) (*nestedTx, error) {
tx, err := p.writer.BeginTx(ctx, pgx.TxOptions{
AccessMode: pgx.ReadWrite,
IsoLevel: pgx.ReadCommitted,
