Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf enhancements #710

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"

"github.com/pg-sharding/spqr/pkg/client"
"github.com/pg-sharding/spqr/pkg/conn"
"github.com/pg-sharding/spqr/pkg/spqrlog"
Expand Down
8 changes: 5 additions & 3 deletions pkg/datashard/datashard.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Conn struct {

tx_served int64

id string
idSelf uint

status txstatus.TXStatus

Expand Down Expand Up @@ -351,7 +351,7 @@ func (sh *Conn) SHKey() kr.ShardKey {
// Returns:
// - uint: The unique identifier of the Conn struct.
func (sh *Conn) ID() uint {
return spqrlog.GetPointer(sh)
return sh.idSelf
}

// Usr returns the username associated with the Conn struct.
Expand Down Expand Up @@ -420,6 +420,8 @@ func NewShard(
stmtDesc: map[uint64]*prepstatement.PreparedStatementDescriptor{},
}

dtSh.idSelf = spqrlog.GetPointer(dtSh)

dtSh.dedicated = pgi

if dtSh.dedicated.Status() == conn.NotInitialized {
Expand Down Expand Up @@ -520,7 +522,7 @@ func (sh *Conn) fire(q string) error {
return err
} else {
spqrlog.Zero.Debug().
Str("shard", sh.id).
Uint("id", sh.ID()).
Type("type", msg).
Msg("shard rollback response")

Expand Down
4 changes: 2 additions & 2 deletions pkg/pool/dbpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,14 @@ func (s *InstancePoolImpl) ForEach(cb func(sh shard.Shardinfo) error) error {
func (s *InstancePoolImpl) Put(sh shard.Shard) error {
if sh.Sync() != 0 {
spqrlog.Zero.Error().
Uint("shard", spqrlog.GetPointer(sh)).
Uint("shard", sh.ID()).
Int64("sync", sh.Sync()).
Msg("discarding unsync connection")
return s.pool.Discard(sh)
}
if sh.TxStatus() != txstatus.TXIDLE {
spqrlog.Zero.Error().
Uint("shard", spqrlog.GetPointer(sh)).
Uint("shard", sh.ID()).
Str("txstatus", sh.TxStatus().String()).
Msg("discarding non-idle connection")
return s.pool.Discard(sh)
Expand Down
6 changes: 5 additions & 1 deletion router/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type PsqlClient struct {
show_notice_messages bool
maintain_params bool

idSelf uint

/* protects server */
mu sync.RWMutex
server server.Server
Expand Down Expand Up @@ -241,6 +243,8 @@ func NewPsqlClient(pgconn conn.RawConn, pt port.RouterPortType, defaultRouteBeha
show_notice_messages: showNoticeMessages,
}

cl.idSelf = spqrlog.GetPointer(cl)

return cl
}

Expand Down Expand Up @@ -539,7 +543,7 @@ func (cl *PsqlClient) ReplyWarningf(fmtString string, args ...interface{}) error
}

func (cl *PsqlClient) ID() uint {
return spqrlog.GetPointer(cl)
return cl.idSelf
}

func (cl *PsqlClient) Shards() []shard.Shard {
Expand Down
9 changes: 5 additions & 4 deletions router/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/pkg/workloadlog"
"github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/parser"
"github.com/pg-sharding/spqr/router/poolmgr"
"github.com/pg-sharding/spqr/router/qrouter"
"github.com/pg-sharding/spqr/router/relay"

Check failure on line 15 in router/frontend/frontend.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/pg-sharding/spqr/router/relay (-: # github.com/pg-sharding/spqr/router/relay
)

type Qinteractor interface{}
Expand Down Expand Up @@ -92,7 +93,7 @@
}

spqrlog.Zero.Debug().
Uint("client", spqrlog.GetPointer(rst.Client())).
Uint("client", rst.Client().ID()).
Msg("client connection synced")
return nil
case *pgproto3.Parse:
Expand Down Expand Up @@ -150,17 +151,17 @@
}
}

func Frontend(qr qrouter.QueryRouter, cl client.RouterClient, cmngr poolmgr.PoolMgr, rcfg *config.Router, writer workloadlog.WorkloadLog) error {
func Frontend(qr qrouter.QueryRouter, cl client.RouterClient, cmngr poolmgr.PoolMgr, rcfg *config.Router, writer workloadlog.WorkloadLog, qp parser.Parser) error {
spqrlog.Zero.Info().
Str("user", cl.Usr()).
Str("db", cl.DB()).
Uint("client", spqrlog.GetPointer(cl)).
Uint("client", cl.ID()).
Msg("process frontend for route")

if rcfg.PgprotoDebug {
_ = cl.ReplyDebugNoticef("process frontend for route %s %s", cl.Usr(), cl.DB())
}
rst := relay.NewRelayState(qr, cl, cmngr, rcfg)
rst := relay.NewRelayState(qr, cl, cmngr, rcfg, qp)

defer rst.Close()

Expand Down
11 changes: 7 additions & 4 deletions router/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
mockcl "github.com/pg-sharding/spqr/router/mock/client"
mockqr "github.com/pg-sharding/spqr/router/mock/qrouter"
mocksrv "github.com/pg-sharding/spqr/router/mock/server"
"github.com/pg-sharding/spqr/router/parser"
"github.com/pg-sharding/spqr/router/route"
"github.com/pg-sharding/spqr/router/routingstate"

Expand All @@ -38,11 +39,13 @@ func TestFrontendSimpleEOF(t *testing.T) {
cl.EXPECT().DB().AnyTimes().Return("db1")
cl.EXPECT().Close().Times(1)

cl.EXPECT().ID().AnyTimes().Return(uint(322332))

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

cmngr.EXPECT().UnRouteCB(gomock.Any(), gomock.Any()).Times(1)

err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser())

assert.NoError(err, "")
}
Expand Down Expand Up @@ -151,7 +154,7 @@ func TestFrontendSimple(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser())

assert.NoError(err, "")
}
Expand Down Expand Up @@ -315,7 +318,7 @@ func TestFrontendXProto(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser())

assert.NoError(err, "")
}
Expand Down Expand Up @@ -448,7 +451,7 @@ func TestFrontendSimpleCopyIn(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser())

assert.NoError(err, "")
}
7 changes: 6 additions & 1 deletion router/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pg-sharding/spqr/qdb"
"github.com/pg-sharding/spqr/router/console"
"github.com/pg-sharding/spqr/router/frontend"
"github.com/pg-sharding/spqr/router/parser"
"github.com/pg-sharding/spqr/router/poolmgr"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/qrouter"
Expand All @@ -35,6 +36,7 @@ type RouterInstance interface {
type InstanceImpl struct {
RuleRouter rulerouter.RuleRouter
Qrouter qrouter.QueryRouter
Qp parser.Parser
AdmConsole console.Console
Mgr meta.EntityMgr
Writer workloadlog.WorkloadLog
Expand Down Expand Up @@ -141,9 +143,12 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool
return nil, err
}

qp := parser.NewSharedParser()

r := &InstanceImpl{
RuleRouter: rr,
Qrouter: qr,
Qp: qp,
AdmConsole: localConsole,
Mgr: lc,
stchan: stchan,
Expand Down Expand Up @@ -189,7 +194,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error {
_, _ = routerClient.Route().ReleaseClient(routerClient.ID())
}()

return frontend.Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer)
return frontend.Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer, r.Qp)
}

func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error {
Expand Down
File renamed without changes.
9 changes: 9 additions & 0 deletions router/parser/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package parser

import "github.com/pg-sharding/lyx/lyx"

type Parser interface {
Parse(query string) (ParseState, string, error)

Stmt() lyx.Node
}
6 changes: 6 additions & 0 deletions router/parser/qparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type ParseStateExplain struct {
Query lyx.Node
}

var _ Parser = &QParser{}

// TODO : unit tests
func (qp *QParser) Parse(query string) (ParseState, string, error) {
qp.query = query
Expand Down Expand Up @@ -247,3 +249,7 @@ func (qp *QParser) Parse(query string) (ParseState, string, error) {

return ParseStateQuery{}, comment, nil
}

func NewQParser() Parser {
return &QParser{}
}
54 changes: 54 additions & 0 deletions router/parser/shared_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package parser

import (
"sync"

"github.com/pg-sharding/lyx/lyx"
)

type CacheEntry struct {
ps ParseState
comm string
}

// Parser shared accross all relays.
// caches parse result.
type SharedParser struct {
parseCache sync.Map

mu sync.Mutex
underlying Parser
}

// Stmt implements Parser.
func (shp *SharedParser) Stmt() lyx.Node {
return shp.underlying.Stmt()
}

var _ Parser = &SharedParser{}

func (shp *SharedParser) Parse(query string) (ParseState, string, error) {
ce, ok := shp.parseCache.Load(query)
if ok {
return ce.(CacheEntry).ps, ce.(CacheEntry).comm, nil
}

p := NewQParser()
state, comm, err := p.Parse(query)

if err == nil {
shp.parseCache.Store(query, CacheEntry{
ps: state,
comm: comm,
})
}

return state, comm, err
}

func NewSharedParser() Parser {
return &SharedParser{
parseCache: sync.Map{},
underlying: NewQParser(),
}
}
5 changes: 3 additions & 2 deletions router/relay/phs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"testing"

"github.com/pg-sharding/spqr/pkg/config"
"github.com/pg-sharding/spqr/router/parser"
"github.com/pg-sharding/spqr/router/relay"

Check failure on line 8 in router/relay/phs_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/pg-sharding/spqr/router/relay (-: # github.com/pg-sharding/spqr/router/relay
"github.com/stretchr/testify/assert"

"github.com/golang/mock/gomock"
Expand All @@ -23,7 +24,7 @@
cl := mockcl.NewMockRouterClient(ctrl)
qr := mockqr.NewMockQueryRouter(ctrl)

rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{})
rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{}, parser.NewQParser())

ph := relay.NewSimpleProtoStateHandler(cmngr)

Expand All @@ -45,7 +46,7 @@
cl := mockcl.NewMockRouterClient(ctrl)
qr := mockqr.NewMockQueryRouter(ctrl)

rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{})
rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{}, parser.NewQParser())

ph := relay.NewSimpleProtoStateHandler(cmngr)

Expand Down
2 changes: 1 addition & 1 deletion router/relay/qstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func deparseRouteHint(rst RelayStateMgr, params map[string]string) (routehint.Ro
func ProcQueryAdvanced(rst RelayStateMgr, query string, ph ProtoStateHandler, binderQ func() error, doCaching bool) error {
statistics.RecordStartTime(statistics.Router, time.Now(), rst.Client().ID())

spqrlog.Zero.Debug().Str("query", query).Uint("client", spqrlog.GetPointer(rst.Client())).Msgf("process relay state advanced")
spqrlog.Zero.Debug().Str("query", query).Uint("client", rst.Client().ID()).Msgf("process relay state advanced")
state, comment, err := rst.Parse(query, doCaching)
if err != nil {
return fmt.Errorf("error processing query '%v': %w", query, err)
Expand Down
5 changes: 3 additions & 2 deletions router/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
routingState routingstate.RoutingState

Qr qrouter.QueryRouter
qp parser.QParser
qp parser.Parser
plainQ string
Cl client.RouterClient
manager poolmgr.PoolMgr
Expand Down Expand Up @@ -176,13 +176,14 @@
rst.Cl.Server().RequestData()
}

func NewRelayState(qr qrouter.QueryRouter, client client.RouterClient, manager poolmgr.PoolMgr, rcfg *config.Router) RelayStateMgr {
func NewRelayState(qr qrouter.QueryRouter, client client.RouterClient, manager poolmgr.PoolMgr, rcfg *config.Router, parser parser.Parser) RelayStateMgr {
return &RelayStateImpl{
activeShards: nil,
txStatus: txstatus.TXIDLE,
msgBuf: nil,
traceMsgs: false,
Qr: qr,
qp: parser,
Cl: client,
manager: manager,
WorldShardFallback: rcfg.WorldShardFallback,
Expand Down Expand Up @@ -1469,7 +1470,7 @@
// TODO : unit tests
func (rst *RelayStateImpl) Parse(query string, doCaching bool) (parser.ParseState, string, error) {
if cache, ok := rst.parseCache[query]; ok {
rst.qp.SetStmt(cache.stmt)

Check failure on line 1473 in router/relay/relay.go

View workflow job for this annotation

GitHub Actions / analyze

rst.qp.SetStmt undefined (type parser.Parser has no field or method SetStmt)

Check failure on line 1473 in router/relay/relay.go

View workflow job for this annotation

GitHub Actions / build

rst.qp.SetStmt undefined (type parser.Parser has no field or method SetStmt)

Check failure on line 1473 in router/relay/relay.go

View workflow job for this annotation

GitHub Actions / golangci-lint

rst.qp.SetStmt undefined (type parser.Parser has no field or method SetStmt) (typecheck)

Check failure on line 1473 in router/relay/relay.go

View workflow job for this annotation

GitHub Actions / golangci-lint

rst.qp.SetStmt undefined (type parser.Parser has no field or method SetStmt)) (typecheck)

Check failure on line 1473 in router/relay/relay.go

View workflow job for this annotation

GitHub Actions / golangci-lint

rst.qp.SetStmt undefined (type parser.Parser has no field or method SetStmt)) (typecheck)
return cache.ps, cache.comm, nil
}

Expand Down
Loading