Skip to content

Commit

Permalink
Perf enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke authored and Denchick committed Sep 19, 2024
1 parent 2989f55 commit 8ccaa55
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 20 deletions.
1 change: 1 addition & 0 deletions pkg/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"

"github.com/pg-sharding/spqr/pkg/client"
"github.com/pg-sharding/spqr/pkg/conn"
"github.com/pg-sharding/spqr/pkg/spqrlog"
Expand Down
8 changes: 5 additions & 3 deletions pkg/datashard/datashard.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Conn struct {

tx_served int64

id string
idSelf uint

status txstatus.TXStatus

Expand Down Expand Up @@ -351,7 +351,7 @@ func (sh *Conn) SHKey() kr.ShardKey {
// Returns:
// - uint: The unique identifier of the Conn struct.
func (sh *Conn) ID() uint {
return spqrlog.GetPointer(sh)
return sh.idSelf
}

// Usr returns the username associated with the Conn struct.
Expand Down Expand Up @@ -420,6 +420,8 @@ func NewShard(
stmtDesc: map[uint64]*prepstatement.PreparedStatementDescriptor{},
}

dtSh.idSelf = spqrlog.GetPointer(dtSh)

dtSh.dedicated = pgi

if dtSh.dedicated.Status() == conn.NotInitialized {
Expand Down Expand Up @@ -520,7 +522,7 @@ func (sh *Conn) fire(q string) error {
return err
} else {
spqrlog.Zero.Debug().
Str("shard", sh.id).
Uint("id", sh.ID()).
Type("type", msg).
Msg("shard rollback response")

Expand Down
4 changes: 2 additions & 2 deletions pkg/pool/dbpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,14 @@ func (s *InstancePoolImpl) ForEach(cb func(sh shard.Shardinfo) error) error {
func (s *InstancePoolImpl) Put(sh shard.Shard) error {
if sh.Sync() != 0 {
spqrlog.Zero.Error().
Uint("shard", spqrlog.GetPointer(sh)).
Uint("shard", sh.ID()).
Int64("sync", sh.Sync()).
Msg("discarding unsync connection")
return s.pool.Discard(sh)
}
if sh.TxStatus() != txstatus.TXIDLE {
spqrlog.Zero.Error().
Uint("shard", spqrlog.GetPointer(sh)).
Uint("shard", sh.ID()).
Str("txstatus", sh.TxStatus().String()).
Msg("discarding non-idle connection")
return s.pool.Discard(sh)
Expand Down
6 changes: 5 additions & 1 deletion router/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type PsqlClient struct {
show_notice_messages bool
maintain_params bool

idSelf uint

/* protects server */
mu sync.RWMutex
server server.Server
Expand Down Expand Up @@ -241,6 +243,8 @@ func NewPsqlClient(pgconn conn.RawConn, pt port.RouterPortType, defaultRouteBeha
show_notice_messages: showNoticeMessages,
}

cl.idSelf = spqrlog.GetPointer(cl)

return cl
}

Expand Down Expand Up @@ -539,7 +543,7 @@ func (cl *PsqlClient) ReplyWarningf(fmtString string, args ...interface{}) error
}

func (cl *PsqlClient) ID() uint {
return spqrlog.GetPointer(cl)
return cl.idSelf
}

func (cl *PsqlClient) Shards() []shard.Shard {
Expand Down
9 changes: 5 additions & 4 deletions router/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/pkg/workloadlog"
"github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/parser"
"github.com/pg-sharding/spqr/router/poolmgr"
"github.com/pg-sharding/spqr/router/qrouter"
"github.com/pg-sharding/spqr/router/relay"

Check failure on line 15 in router/frontend/frontend.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/pg-sharding/spqr/router/relay (-: # github.com/pg-sharding/spqr/router/relay
Expand Down Expand Up @@ -92,7 +93,7 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel
}

spqrlog.Zero.Debug().
Uint("client", spqrlog.GetPointer(rst.Client())).
Uint("client", rst.Client().ID()).
Msg("client connection synced")
return nil
case *pgproto3.Parse:
Expand Down Expand Up @@ -150,17 +151,17 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel
}
}

func Frontend(qr qrouter.QueryRouter, cl client.RouterClient, cmngr poolmgr.PoolMgr, rcfg *config.Router, writer workloadlog.WorkloadLog) error {
func Frontend(qr qrouter.QueryRouter, cl client.RouterClient, cmngr poolmgr.PoolMgr, rcfg *config.Router, writer workloadlog.WorkloadLog, qp parser.Parser) error {
spqrlog.Zero.Info().
Str("user", cl.Usr()).
Str("db", cl.DB()).
Uint("client", spqrlog.GetPointer(cl)).
Uint("client", cl.ID()).
Msg("process frontend for route")

if rcfg.PgprotoDebug {
_ = cl.ReplyDebugNoticef("process frontend for route %s %s", cl.Usr(), cl.DB())
}
rst := relay.NewRelayState(qr, cl, cmngr, rcfg)
rst := relay.NewRelayState(qr, cl, cmngr, rcfg, qp)

defer rst.Close()

Expand Down
11 changes: 7 additions & 4 deletions router/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
mockcl "github.com/pg-sharding/spqr/router/mock/client"
mockqr "github.com/pg-sharding/spqr/router/mock/qrouter"
mocksrv "github.com/pg-sharding/spqr/router/mock/server"
"github.com/pg-sharding/spqr/router/parser"
"github.com/pg-sharding/spqr/router/route"
"github.com/pg-sharding/spqr/router/routingstate"

Expand All @@ -38,11 +39,13 @@ func TestFrontendSimpleEOF(t *testing.T) {
cl.EXPECT().DB().AnyTimes().Return("db1")
cl.EXPECT().Close().Times(1)

cl.EXPECT().ID().AnyTimes().Return(uint(322332))

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

cmngr.EXPECT().UnRouteCB(gomock.Any(), gomock.Any()).Times(1)

err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser())

assert.NoError(err, "")
}
Expand Down Expand Up @@ -151,7 +154,7 @@ func TestFrontendSimple(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser())

assert.NoError(err, "")
}
Expand Down Expand Up @@ -315,7 +318,7 @@ func TestFrontendXProto(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser())

assert.NoError(err, "")
}
Expand Down Expand Up @@ -448,7 +451,7 @@ func TestFrontendSimpleCopyIn(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser())

assert.NoError(err, "")
}
7 changes: 6 additions & 1 deletion router/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pg-sharding/spqr/qdb"
"github.com/pg-sharding/spqr/router/console"
"github.com/pg-sharding/spqr/router/frontend"
"github.com/pg-sharding/spqr/router/parser"
"github.com/pg-sharding/spqr/router/poolmgr"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/qrouter"
Expand All @@ -35,6 +36,7 @@ type RouterInstance interface {
type InstanceImpl struct {
RuleRouter rulerouter.RuleRouter
Qrouter qrouter.QueryRouter
Qp parser.Parser
AdmConsole console.Console
Mgr meta.EntityMgr
Writer workloadlog.WorkloadLog
Expand Down Expand Up @@ -141,9 +143,12 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool
return nil, err
}

qp := parser.NewSharedParser()

r := &InstanceImpl{
RuleRouter: rr,
Qrouter: qr,
Qp: qp,
AdmConsole: localConsole,
Mgr: lc,
stchan: stchan,
Expand Down Expand Up @@ -189,7 +194,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error {
_, _ = routerClient.Route().ReleaseClient(routerClient.ID())
}()

return frontend.Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer)
return frontend.Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer, r.Qp)
}

func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error {
Expand Down
File renamed without changes.
9 changes: 9 additions & 0 deletions router/parser/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package parser

import "github.com/pg-sharding/lyx/lyx"

type Parser interface {
Parse(query string) (ParseState, string, error)

Stmt() lyx.Node
}
6 changes: 6 additions & 0 deletions router/parser/qparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type ParseStateExplain struct {
Query lyx.Node
}

var _ Parser = &QParser{}

// TODO : unit tests
func (qp *QParser) Parse(query string) (ParseState, string, error) {
qp.query = query
Expand Down Expand Up @@ -247,3 +249,7 @@ func (qp *QParser) Parse(query string) (ParseState, string, error) {

return ParseStateQuery{}, comment, nil
}

func NewQParser() Parser {
return &QParser{}
}
54 changes: 54 additions & 0 deletions router/parser/shared_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package parser

import (
"sync"

"github.com/pg-sharding/lyx/lyx"
)

type CacheEntry struct {
ps ParseState
comm string
}

// Parser shared accross all relays.
// caches parse result.
type SharedParser struct {
parseCache sync.Map

mu sync.Mutex
underlying Parser
}

// Stmt implements Parser.
func (shp *SharedParser) Stmt() lyx.Node {
return shp.underlying.Stmt()
}

var _ Parser = &SharedParser{}

func (shp *SharedParser) Parse(query string) (ParseState, string, error) {
ce, ok := shp.parseCache.Load(query)
if ok {
return ce.(CacheEntry).ps, ce.(CacheEntry).comm, nil
}

p := NewQParser()
state, comm, err := p.Parse(query)

if err == nil {
shp.parseCache.Store(query, CacheEntry{
ps: state,
comm: comm,
})
}

return state, comm, err
}

func NewSharedParser() Parser {
return &SharedParser{
parseCache: sync.Map{},
underlying: NewQParser(),
}
}
5 changes: 3 additions & 2 deletions router/relay/phs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/pg-sharding/spqr/pkg/config"
"github.com/pg-sharding/spqr/router/parser"
"github.com/pg-sharding/spqr/router/relay"

Check failure on line 8 in router/relay/phs_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/pg-sharding/spqr/router/relay (-: # github.com/pg-sharding/spqr/router/relay
"github.com/stretchr/testify/assert"

Expand All @@ -23,7 +24,7 @@ func TestTxSimpleCommit(t *testing.T) {
cl := mockcl.NewMockRouterClient(ctrl)
qr := mockqr.NewMockQueryRouter(ctrl)

rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{})
rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{}, parser.NewQParser())

ph := relay.NewSimpleProtoStateHandler(cmngr)

Expand All @@ -45,7 +46,7 @@ func TestTxSimpleRollback(t *testing.T) {
cl := mockcl.NewMockRouterClient(ctrl)
qr := mockqr.NewMockQueryRouter(ctrl)

rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{})
rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{}, parser.NewQParser())

ph := relay.NewSimpleProtoStateHandler(cmngr)

Expand Down
2 changes: 1 addition & 1 deletion router/relay/qstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func deparseRouteHint(rst RelayStateMgr, params map[string]string) (routehint.Ro
func ProcQueryAdvanced(rst RelayStateMgr, query string, ph ProtoStateHandler, binderQ func() error, doCaching bool) error {
statistics.RecordStartTime(statistics.Router, time.Now(), rst.Client().ID())

spqrlog.Zero.Debug().Str("query", query).Uint("client", spqrlog.GetPointer(rst.Client())).Msgf("process relay state advanced")
spqrlog.Zero.Debug().Str("query", query).Uint("client", rst.Client().ID()).Msgf("process relay state advanced")
state, comment, err := rst.Parse(query, doCaching)
if err != nil {
return fmt.Errorf("error processing query '%v': %w", query, err)
Expand Down
5 changes: 3 additions & 2 deletions router/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type RelayStateImpl struct {
routingState routingstate.RoutingState

Qr qrouter.QueryRouter
qp parser.QParser
qp parser.Parser
plainQ string
Cl client.RouterClient
manager poolmgr.PoolMgr
Expand Down Expand Up @@ -176,13 +176,14 @@ func (rst *RelayStateImpl) RequestData() {
rst.Cl.Server().RequestData()
}

func NewRelayState(qr qrouter.QueryRouter, client client.RouterClient, manager poolmgr.PoolMgr, rcfg *config.Router) RelayStateMgr {
func NewRelayState(qr qrouter.QueryRouter, client client.RouterClient, manager poolmgr.PoolMgr, rcfg *config.Router, parser parser.Parser) RelayStateMgr {
return &RelayStateImpl{
activeShards: nil,
txStatus: txstatus.TXIDLE,
msgBuf: nil,
traceMsgs: false,
Qr: qr,
qp: parser,
Cl: client,
manager: manager,
WorldShardFallback: rcfg.WorldShardFallback,
Expand Down

0 comments on commit 8ccaa55

Please sign in to comment.