pg: advanced query feats, table inspection, and stats #901

Merged 1 commit on Aug 12, 2024
19 changes: 19 additions & 0 deletions common/sql/sql.go
@@ -35,6 +35,25 @@ type Executor interface {
Execute(ctx context.Context, stmt string, args ...any) (*ResultSet, error)
}

// QueryScanner represents a type that provides the ability to execute an SQL
// statement, where for each row:
//
// 1. result values are scanned into the variables in the scans slice
// 2. the provided function is then called
//
// The function would typically capture the variables in the scans slice,
// allowing it to operate on the values. For instance, append the values to
// slices allocated by the caller, or perform reduction operations like
// sum/mean/min/etc.
//
// NOTE: This method may end up being included in the Tx interface alongside
// Executor since all of the concrete transaction implementations provided by
// this package implement this method.
type QueryScanner interface {
QueryScanFn(ctx context.Context, stmt string,
scans []any, fn func() error, args ...any) error
}
Collaborator

Overall seems fine, but what is the reason for not doing something more conventional, similar to the standard library's sql.Rows (containing Next(), Scan(), etc.)?

Member Author

Basically just no real reason to introduce a rows type. The for-each helpers in the pgx package were also a more intuitive building block for this method.
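
For reference, here is a rough sketch of how a pgx-backed transaction type could satisfy QueryScanner using pgx's for-each helper, plus a typical caller. The type and function names (queryTx, collectIDs) are illustrative assumptions, not the actual implementation in this PR:

package pg

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// queryTx is a hypothetical wrapper around a pgx transaction.
type queryTx struct {
	tx pgx.Tx
}

// QueryScanFn executes stmt and, for each row, scans the values into the
// caller-provided scans slice and then calls fn, per the QueryScanner docs.
func (q *queryTx) QueryScanFn(ctx context.Context, stmt string,
	scans []any, fn func() error, args ...any) error {
	rows, err := q.tx.Query(ctx, stmt, args...)
	if err != nil {
		return err
	}
	_, err = pgx.ForEachRow(rows, scans, fn) // closes rows when done
	return err
}

// collectIDs shows the typical caller pattern: the closure captures the scan
// target and accumulates the per-row values into a caller-owned slice.
func collectIDs(ctx context.Context, q *queryTx) ([]int64, error) {
	var id int64
	var ids []int64
	err := q.QueryScanFn(ctx, `SELECT id FROM t WHERE x > $1`, []any{&id},
		func() error {
			ids = append(ids, id)
			return nil
		}, 10)
	return ids, err
}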


// TxMaker is an interface that creates a new transaction. In the context of the
// recursive Tx interface, it creates a nested transaction.
type TxMaker interface {
73 changes: 73 additions & 0 deletions common/sql/statistics.go
@@ -0,0 +1,73 @@
package sql

// NOTE: this file is TRANSITIONAL! These types are lifted from the
// unmerged internal/engine/costs/datatypes package.

import (
"fmt"
"strings"
)

// Statistics contains statistics about a table or a Plan. A Statistics can be
// derived directly from the underlying table, or derived from the statistics of
// its children.
type Statistics struct {
RowCount int64

ColumnStatistics []ColumnStatistics

//Selectivity, for plan statistics
}

func (s *Statistics) String() string {
var st strings.Builder
fmt.Fprintf(&st, "RowCount: %d", s.RowCount)
if len(s.ColumnStatistics) > 0 {
fmt.Fprintln(&st, "")
}
for i, cs := range s.ColumnStatistics {
fmt.Fprintf(&st, " Column %d:\n", i)
fmt.Fprintf(&st, " - Min/Max = %v / %v\n", cs.Min, cs.Max)
fmt.Fprintf(&st, " - NULL count = %v\n", cs.NullCount)
}
return st.String()
}

type ValCount struct {
Val any
Count int
}

// ColumnStatistics contains statistics about a column.
type ColumnStatistics struct {
NullCount int64

Min any
MinCount int

Max any
MaxCount int

// MCVs are the most common values. They should be sorted by value, and the
// set should have limited capacity, which means scan order has to be
// deterministic since we have to throw out same-frequency observations.
// Possible solution: multi-pass scan, merging lists and continuing until no
// higher-frequency values are observed? Or, once capacity is reached, use a
// histogram: do not throw away the MCVs, just start putting additional
// observations into the histogram instead.
// MCVs []ValCount
// MCVs map[cmp.Ordered]

// MCVals []any
// MCFreqs []int

// DistinctCount is harder. For example, unless we sub-sample
// (deterministically), tracking distinct values could involve a data
// structure with the same number of elements as rows in the table.
DistinctCount int64

AvgSize int64 // maybe: length of text, length of array, otherwise not used for scalar?

// without a histogram, we can make a uniformity assumption to simplify the cost model
//Histogram []HistogramBucket
}
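
As a quick illustration of how these transitional types compose, here is a hand-built Statistics value and its String output. The numbers are made up, the import path is assumed, and real statistics would come from a table scan rather than being constructed by hand:

package main

import (
	"fmt"

	"github.com/kwilteam/kwil-db/common/sql"
)

func main() {
	stats := &sql.Statistics{
		RowCount: 1000,
		ColumnStatistics: []sql.ColumnStatistics{
			{Min: int64(1), Max: int64(987), NullCount: 12, DistinctCount: 950},
		},
	}
	fmt.Println(stats.String())
	// Prints, roughly:
	// RowCount: 1000
	//  Column 0:
	//   - Min/Max = 1 / 987
	//   - NULL count = 12
}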
65 changes: 43 additions & 22 deletions core/types/decimal/decimal.go
@@ -119,7 +119,7 @@ func inferPrecisionAndScale(s string) (precision, scale uint16) {
s = strings.TrimLeft(s, "-+")
parts := strings.Split(s, ".")

// remove 0s from the left part, siince 001.23 is the same as 1.23
// remove 0s from the left part, since 001.23 is the same as 1.23
parts[0] = strings.TrimLeft(parts[0], "0")

intPart := uint16(len(parts[0]))
@@ -279,17 +279,57 @@ func (d *Decimal) Sign() int {
return d.dec.Sign()
}

func (d Decimal) NaN() bool {
switch d.dec.Form {
case apd.NaN, apd.NaNSignaling:
return true
}
return false
}

func (d Decimal) Inf() bool {
return d.dec.Form == apd.Infinite
}

// Value implements the database/sql/driver.Valuer interface. It converts d to a
// string.
func (d Decimal) Value() (driver.Value, error) {
// NOTE: we're currently (ab)using the NaN case to handle scanning of NULL
// values. Match that here. We may want something different though.
if d.dec.Form == apd.NaN {
return nil, nil
}
return d.dec.Value()
}

var _ driver.Valuer = &Decimal{}

// Scan implements the database/sql.Scanner interface.
func (d *Decimal) Scan(src interface{}) error {
return d.dec.Scan(src)
if src == nil {
*d = Decimal{
dec: apd.Decimal{Form: apd.NaN},
}
return nil
}

s, ok := src.(string)
if !ok {
var dec apd.Decimal
err := dec.Scan(src)
if err != nil {
return err
}
s = dec.String()
}

// set scale and prec from the string
d2, err := NewFromString(s)
if err != nil {
return err
}
*d = *d2
return nil
}

var _ sql.Scanner = &Decimal{}
@@ -425,6 +465,7 @@ func Cmp(x, y *Decimal) (int64, error) {
}

return z.Int64()
// return x.dec.Cmp(&y.dec)
}

// CheckPrecisionAndScale checks if the precision and scale are valid.
@@ -459,23 +500,3 @@ func (da DecimalArray) Value() (driver.Value, error) {
}

var _ driver.Valuer = (*DecimalArray)(nil)

// Scan implements the sql.Scanner interface.
func (da *DecimalArray) Scan(src interface{}) error {
switch s := src.(type) {
case []string:
*da = make(DecimalArray, len(s))
for i, str := range s {
d, err := NewFromString(str)
if err != nil {
return err
}

(*da)[i] = d
}

return nil
}

return fmt.Errorf("cannot convert %T to DecimalArray", src)
}
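
To make the NULL handling above concrete, here is a rough round-trip sketch (the import path is assumed; Scan(nil) is what database/sql effectively does for a NULL column):

package main

import (
	"fmt"

	"github.com/kwilteam/kwil-db/core/types/decimal"
)

func main() {
	var d decimal.Decimal
	if err := d.Scan(nil); err != nil { // NULL from the database
		panic(err)
	}
	fmt.Println(d.NaN()) // true: NULL is represented with the NaN form

	v, err := d.Value()
	if err != nil {
		panic(err)
	}
	fmt.Println(v == nil) // true: the NaN form maps back to SQL NULL
}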
77 changes: 51 additions & 26 deletions core/types/uint256.go
@@ -1,6 +1,7 @@
package types

import (
"database/sql"
"database/sql/driver"
"fmt"
"math/big"
@@ -12,12 +13,13 @@ import (
// It is mostly a wrapper around github.com/holiman/uint256.Int, but includes
// extra methods for usage in Postgres.
type Uint256 struct {
uint256.Int
base uint256.Int // not embedded: avoids promoting the massive method set, which also has params and returns of holiman types
Null bool
}

// Uint256FromInt creates a new Uint256 from an int.
func Uint256FromInt(i uint64) *Uint256 {
return &Uint256{Int: *uint256.NewInt(i)}
return &Uint256{base: *uint256.NewInt(i)}
}

// Uint256FromString creates a new Uint256 from a string.
@@ -26,7 +28,7 @@ func Uint256FromString(s string) (*Uint256, error) {
if err != nil {
return nil, err
}
return &Uint256{Int: *i}, nil
return &Uint256{base: *i}, nil
}

// Uint256FromBig creates a new Uint256 from a big.Int.
@@ -40,8 +42,33 @@ func Uint256FromBytes(b []byte) (*Uint256, error) {
return Uint256FromBig(bigInt)
}

func (u Uint256) String() string {
return u.base.String()
}

func (u Uint256) Bytes() []byte {
return u.base.Bytes()
}

func (u Uint256) ToBig() *big.Int {
return u.base.ToBig()
}

func (u Uint256) MarshalJSON() ([]byte, error) {
return []byte(u.String()), nil
return []byte(u.base.String()), nil // ? json ?
}

func (u *Uint256) Clone() *Uint256 {
v := *u
return &v
}

func (u *Uint256) Cmp(v *Uint256) int {
return u.base.Cmp(&v.base)
}

func CmpUint256(u, v *Uint256) int {
return u.Cmp(v)
}

func (u *Uint256) UnmarshalJSON(b []byte) error {
@@ -50,16 +77,20 @@ func (u *Uint256) UnmarshalJSON(b []byte) error {
return err
}

u.Int = u2.Int
u.base = u2.base
return nil
}

// Value implements the driver.Valuer interface.
func (u Uint256) Value() (driver.Value, error) {
if u.Null {
return nil, nil
}
return u.String(), nil
}

var _ driver.Valuer = Uint256{}
var _ driver.Valuer = (*Uint256)(nil)

// Scan implements the sql.Scanner interface.
func (u *Uint256) Scan(src interface{}) error {
@@ -70,21 +101,28 @@ func (u *Uint256) Scan(src interface{}) error {
return err
}

u.Int = u2.Int
u.base = u2.base
u.Null = false
return nil

case nil:
u.Null = true
u.base.Clear()
return nil
}

return fmt.Errorf("cannot convert %T to Uint256", src)
}

var _ driver.Valuer = (*Uint256)(nil)
var _ driver.Valuer = (*Uint256)(nil)
var _ sql.Scanner = (*Uint256)(nil)

// Uint256Array is an array of Uint256s.
type Uint256Array []*Uint256

// Value implements the driver.Valuer interface.
func (ua Uint256Array) Value() (driver.Value, error) {
// Even when implementing pgtype.ArrayGetter we still need this, so that the
// pgx driver can use its wrapSliceEncodePlan.
strs := make([]string, len(ua))
for i, u := range ua {
strs[i] = u.String()
@@ -95,21 +133,8 @@ func (ua Uint256Array) Value() (driver.Value, error) {

var _ driver.Valuer = (*Uint256Array)(nil)

// Scan implements the sql.Scanner interface.
func (ua *Uint256Array) Scan(src interface{}) error {
switch s := src.(type) {
case []string:
*ua = make(Uint256Array, len(s))
for i, str := range s {
u, err := Uint256FromString(str)
if err != nil {
return err
}

(*ua)[i] = u
}
return nil
}

return fmt.Errorf("cannot convert %T to Uint256Array", src)
}
// Uint256Array is a slice of Scanners. pgx at least is smart enough to make
// this work automatically!
// Another approach is to implement pgx.ArraySetter and pgx.ArrayGetter, which
// would be similar in effect to:
// type Uint256Array pgtype.FlatArray[*Uint256]
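
A similar rough sketch for the new Null handling on Uint256 (import path assumed); the point is that a scanned SQL NULL survives a write-back through Value:

package main

import (
	"fmt"

	"github.com/kwilteam/kwil-db/core/types"
)

func main() {
	var u types.Uint256
	if err := u.Scan(nil); err != nil { // NULL from the database
		panic(err)
	}
	fmt.Println(u.Null) // true, and the value is cleared

	v, err := u.Value()
	if err != nil {
		panic(err)
	}
	fmt.Println(v == nil) // true: NULL round-trips

	u2, _ := types.Uint256FromString("123456789")
	fmt.Println(u2.String(), u2.Null) // 123456789 false
}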
2 changes: 1 addition & 1 deletion go.mod
@@ -16,7 +16,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/jackc/pgx/v5 v5.6.0
github.com/jpillora/backoff v1.0.0
github.com/kwilteam/kwil-db/core v0.2.0
github.com/kwilteam/kwil-db/parse v0.2.0-beta.1
4 changes: 2 additions & 2 deletions go.sum
@@ -245,8 +245,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
7 changes: 7 additions & 0 deletions internal/sql/pg/conn.go
@@ -292,6 +292,13 @@ func (p *Pool) Close() error {
// BeginTx starts a read-write transaction. It is an error to call this twice
// without first closing the initial transaction.
func (p *Pool) BeginTx(ctx context.Context) (sql.Tx, error) {
return p.begin(ctx)
}

// begin is the unexported version of BeginTx that returns a concrete type
// instead of an interface; the exported method must return an interface to
// satisfy the sql.TxMaker interface.
func (p *Pool) begin(ctx context.Context) (*nestedTx, error) {
tx, err := p.writer.BeginTx(ctx, pgx.TxOptions{
AccessMode: pgx.ReadWrite,
IsoLevel: pgx.ReadCommitted,
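
As an aside, a minimal sketch of what the exported/unexported split buys; this is not code from the PR, and the Commit/Rollback signatures are assumptions about the sql.Tx interface:

// Hypothetical package-internal helper: it can use the concrete *nestedTx
// from begin directly, while outside callers only see sql.Tx via BeginTx.
func useTx(ctx context.Context, p *Pool) error {
	ntx, err := p.begin(ctx) // *nestedTx; no type assertion needed internally
	if err != nil {
		return err
	}
	defer ntx.Rollback(ctx) // assumed sql.Tx method

	// ... run statements on ntx ...

	return ntx.Commit(ctx) // assumed sql.Tx method
}

// The exported BeginTx, returning the sql.Tx interface, is what lets *Pool
// satisfy sql.TxMaker.
var _ sql.TxMaker = (*Pool)(nil)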