diff --git a/pkg/auth/auth.go b/pkg/auth/auth.go index 87cec7b28..8bba3c5cc 100644 --- a/pkg/auth/auth.go +++ b/pkg/auth/auth.go @@ -7,6 +7,7 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "github.com/pg-sharding/spqr/pkg/client" "github.com/pg-sharding/spqr/pkg/conn" "github.com/pg-sharding/spqr/pkg/spqrlog" diff --git a/pkg/datashard/datashard.go b/pkg/datashard/datashard.go index bff7ff7b8..7bd6ceeed 100644 --- a/pkg/datashard/datashard.go +++ b/pkg/datashard/datashard.go @@ -62,7 +62,7 @@ type Conn struct { tx_served int64 - id string + idSelf uint status txstatus.TXStatus @@ -351,7 +351,7 @@ func (sh *Conn) SHKey() kr.ShardKey { // Returns: // - uint: The unique identifier of the Conn struct. func (sh *Conn) ID() uint { - return spqrlog.GetPointer(sh) + return sh.idSelf } // Usr returns the username associated with the Conn struct. @@ -420,6 +420,8 @@ func NewShard( stmtDesc: map[uint64]*prepstatement.PreparedStatementDescriptor{}, } + dtSh.idSelf = spqrlog.GetPointer(dtSh) + dtSh.dedicated = pgi if dtSh.dedicated.Status() == conn.NotInitialized { @@ -520,7 +522,7 @@ func (sh *Conn) fire(q string) error { return err } else { spqrlog.Zero.Debug(). - Str("shard", sh.id). + Uint("id", sh.ID()). Type("type", msg). Msg("shard rollback response") diff --git a/pkg/pool/dbpool.go b/pkg/pool/dbpool.go index f754cf700..eb2b9fbe2 100644 --- a/pkg/pool/dbpool.go +++ b/pkg/pool/dbpool.go @@ -355,14 +355,14 @@ func (s *InstancePoolImpl) ForEach(cb func(sh shard.Shardinfo) error) error { func (s *InstancePoolImpl) Put(sh shard.Shard) error { if sh.Sync() != 0 { spqrlog.Zero.Error(). - Uint("shard", spqrlog.GetPointer(sh)). + Uint("shard", sh.ID()). Int64("sync", sh.Sync()). Msg("discarding unsync connection") return s.pool.Discard(sh) } if sh.TxStatus() != txstatus.TXIDLE { spqrlog.Zero.Error(). - Uint("shard", spqrlog.GetPointer(sh)). + Uint("shard", sh.ID()). Str("txstatus", sh.TxStatus().String()). Msg("discarding non-idle connection") return s.pool.Discard(sh) diff --git a/router/client/client.go b/router/client/client.go index 61ee03db3..5d48fc3c7 100644 --- a/router/client/client.go +++ b/router/client/client.go @@ -117,6 +117,8 @@ type PsqlClient struct { show_notice_messages bool maintain_params bool + idSelf uint + /* protects server */ mu sync.RWMutex server server.Server @@ -241,6 +243,8 @@ func NewPsqlClient(pgconn conn.RawConn, pt port.RouterPortType, defaultRouteBeha show_notice_messages: showNoticeMessages, } + cl.idSelf = spqrlog.GetPointer(cl) + return cl } @@ -539,7 +543,7 @@ func (cl *PsqlClient) ReplyWarningf(fmtString string, args ...interface{}) error } func (cl *PsqlClient) ID() uint { - return spqrlog.GetPointer(cl) + return cl.idSelf } func (cl *PsqlClient) Shards() []shard.Shard { diff --git a/router/frontend/frontend.go b/router/frontend/frontend.go index 603ceed72..1f3afe345 100644 --- a/router/frontend/frontend.go +++ b/router/frontend/frontend.go @@ -9,6 +9,7 @@ import ( "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/pkg/workloadlog" "github.com/pg-sharding/spqr/router/client" + "github.com/pg-sharding/spqr/router/parser" "github.com/pg-sharding/spqr/router/poolmgr" "github.com/pg-sharding/spqr/router/qrouter" "github.com/pg-sharding/spqr/router/relay" @@ -92,7 +93,7 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel } spqrlog.Zero.Debug(). - Uint("client", spqrlog.GetPointer(rst.Client())). + Uint("client", rst.Client().ID()). Msg("client connection synced") return nil case *pgproto3.Parse: @@ -150,17 +151,17 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel } } -func Frontend(qr qrouter.QueryRouter, cl client.RouterClient, cmngr poolmgr.PoolMgr, rcfg *config.Router, writer workloadlog.WorkloadLog) error { +func Frontend(qr qrouter.QueryRouter, cl client.RouterClient, cmngr poolmgr.PoolMgr, rcfg *config.Router, writer workloadlog.WorkloadLog, qp parser.Parser) error { spqrlog.Zero.Info(). Str("user", cl.Usr()). Str("db", cl.DB()). - Uint("client", spqrlog.GetPointer(cl)). + Uint("client", cl.ID()). Msg("process frontend for route") if rcfg.PgprotoDebug { _ = cl.ReplyDebugNoticef("process frontend for route %s %s", cl.Usr(), cl.DB()) } - rst := relay.NewRelayState(qr, cl, cmngr, rcfg) + rst := relay.NewRelayState(qr, cl, cmngr, rcfg, qp) defer rst.Close() diff --git a/router/frontend/frontend_test.go b/router/frontend/frontend_test.go index 4b2de149b..f782aec2c 100644 --- a/router/frontend/frontend_test.go +++ b/router/frontend/frontend_test.go @@ -16,6 +16,7 @@ import ( mockcl "github.com/pg-sharding/spqr/router/mock/client" mockqr "github.com/pg-sharding/spqr/router/mock/qrouter" mocksrv "github.com/pg-sharding/spqr/router/mock/server" + "github.com/pg-sharding/spqr/router/parser" "github.com/pg-sharding/spqr/router/route" "github.com/pg-sharding/spqr/router/routingstate" @@ -38,11 +39,13 @@ func TestFrontendSimpleEOF(t *testing.T) { cl.EXPECT().DB().AnyTimes().Return("db1") cl.EXPECT().Close().Times(1) + cl.EXPECT().ID().AnyTimes().Return(uint(322332)) + cl.EXPECT().Receive().Times(1).Return(nil, io.EOF) cmngr.EXPECT().UnRouteCB(gomock.Any(), gomock.Any()).Times(1) - err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil) + err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser()) assert.NoError(err, "") } @@ -151,7 +154,7 @@ func TestFrontendSimple(t *testing.T) { cl.EXPECT().Receive().Times(1).Return(nil, io.EOF) - err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil) + err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser()) assert.NoError(err, "") } @@ -315,7 +318,7 @@ func TestFrontendXProto(t *testing.T) { cl.EXPECT().Receive().Times(1).Return(nil, io.EOF) - err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil) + err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser()) assert.NoError(err, "") } @@ -448,7 +451,7 @@ func TestFrontendSimpleCopyIn(t *testing.T) { cl.EXPECT().Receive().Times(1).Return(nil, io.EOF) - err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil) + err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil, parser.NewQParser()) assert.NoError(err, "") } diff --git a/router/instance/instance.go b/router/instance/instance.go index 466a17d90..7b15ac3d2 100644 --- a/router/instance/instance.go +++ b/router/instance/instance.go @@ -15,6 +15,7 @@ import ( "github.com/pg-sharding/spqr/qdb" "github.com/pg-sharding/spqr/router/console" "github.com/pg-sharding/spqr/router/frontend" + "github.com/pg-sharding/spqr/router/parser" "github.com/pg-sharding/spqr/router/poolmgr" "github.com/pg-sharding/spqr/router/port" "github.com/pg-sharding/spqr/router/qrouter" @@ -35,6 +36,7 @@ type RouterInstance interface { type InstanceImpl struct { RuleRouter rulerouter.RuleRouter Qrouter qrouter.QueryRouter + Qp parser.Parser AdmConsole console.Console Mgr meta.EntityMgr Writer workloadlog.WorkloadLog @@ -141,9 +143,12 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool return nil, err } + qp := parser.NewSharedParser() + r := &InstanceImpl{ RuleRouter: rr, Qrouter: qr, + Qp: qp, AdmConsole: localConsole, Mgr: lc, stchan: stchan, @@ -189,7 +194,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error { _, _ = routerClient.Route().ReleaseClient(routerClient.ID()) }() - return frontend.Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer) + return frontend.Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer, r.Qp) } func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error { diff --git a/router/parser/cooment_test.go b/router/parser/comment_test.go similarity index 100% rename from router/parser/cooment_test.go rename to router/parser/comment_test.go diff --git a/router/parser/parser.go b/router/parser/parser.go new file mode 100644 index 000000000..85bca4f6f --- /dev/null +++ b/router/parser/parser.go @@ -0,0 +1,9 @@ +package parser + +import "github.com/pg-sharding/lyx/lyx" + +type Parser interface { + Parse(query string) (ParseState, string, error) + + Stmt() lyx.Node +} diff --git a/router/parser/qparser.go b/router/parser/qparser.go index 0737181db..21cc8dea3 100644 --- a/router/parser/qparser.go +++ b/router/parser/qparser.go @@ -100,6 +100,8 @@ type ParseStateExplain struct { Query lyx.Node } +var _ Parser = &QParser{} + // TODO : unit tests func (qp *QParser) Parse(query string) (ParseState, string, error) { qp.query = query @@ -247,3 +249,7 @@ func (qp *QParser) Parse(query string) (ParseState, string, error) { return ParseStateQuery{}, comment, nil } + +func NewQParser() Parser { + return &QParser{} +} diff --git a/router/parser/shared_parser.go b/router/parser/shared_parser.go new file mode 100644 index 000000000..457a5dd1a --- /dev/null +++ b/router/parser/shared_parser.go @@ -0,0 +1,54 @@ +package parser + +import ( + "sync" + + "github.com/pg-sharding/lyx/lyx" +) + +type CacheEntry struct { + ps ParseState + comm string +} + +// Parser shared accross all relays. +// caches parse result. +type SharedParser struct { + parseCache sync.Map + + mu sync.Mutex + underlying Parser +} + +// Stmt implements Parser. +func (shp *SharedParser) Stmt() lyx.Node { + return shp.underlying.Stmt() +} + +var _ Parser = &SharedParser{} + +func (shp *SharedParser) Parse(query string) (ParseState, string, error) { + ce, ok := shp.parseCache.Load(query) + if ok { + return ce.(CacheEntry).ps, ce.(CacheEntry).comm, nil + } + + p := NewQParser() + state, comm, err := p.Parse(query) + + if err == nil { + shp.parseCache.Store(query, CacheEntry{ + ps: state, + comm: comm, + }) + } + + return state, comm, err +} + +func NewSharedParser() Parser { + return &SharedParser{ + parseCache: sync.Map{}, + underlying: NewQParser(), + } +} diff --git a/router/relay/phs_test.go b/router/relay/phs_test.go index 021a34b73..8bb6cf6ab 100644 --- a/router/relay/phs_test.go +++ b/router/relay/phs_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/pg-sharding/spqr/pkg/config" + "github.com/pg-sharding/spqr/router/parser" "github.com/pg-sharding/spqr/router/relay" "github.com/stretchr/testify/assert" @@ -23,7 +24,7 @@ func TestTxSimpleCommit(t *testing.T) { cl := mockcl.NewMockRouterClient(ctrl) qr := mockqr.NewMockQueryRouter(ctrl) - rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{}) + rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{}, parser.NewQParser()) ph := relay.NewSimpleProtoStateHandler(cmngr) @@ -45,7 +46,7 @@ func TestTxSimpleRollback(t *testing.T) { cl := mockcl.NewMockRouterClient(ctrl) qr := mockqr.NewMockQueryRouter(ctrl) - rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{}) + rst := relay.NewRelayState(qr, cl, cmngr, &config.Router{}, parser.NewQParser()) ph := relay.NewSimpleProtoStateHandler(cmngr) diff --git a/router/relay/qstate.go b/router/relay/qstate.go index 3787dc9ff..0c8e7426a 100644 --- a/router/relay/qstate.go +++ b/router/relay/qstate.go @@ -78,7 +78,7 @@ func deparseRouteHint(rst RelayStateMgr, params map[string]string) (routehint.Ro func ProcQueryAdvanced(rst RelayStateMgr, query string, ph ProtoStateHandler, binderQ func() error, doCaching bool) error { statistics.RecordStartTime(statistics.Router, time.Now(), rst.Client().ID()) - spqrlog.Zero.Debug().Str("query", query).Uint("client", spqrlog.GetPointer(rst.Client())).Msgf("process relay state advanced") + spqrlog.Zero.Debug().Str("query", query).Uint("client", rst.Client().ID()).Msgf("process relay state advanced") state, comment, err := rst.Parse(query, doCaching) if err != nil { return fmt.Errorf("error processing query '%v': %w", query, err) diff --git a/router/relay/relay.go b/router/relay/relay.go index 396991a3c..5c475d166 100644 --- a/router/relay/relay.go +++ b/router/relay/relay.go @@ -136,7 +136,7 @@ type RelayStateImpl struct { routingState routingstate.RoutingState Qr qrouter.QueryRouter - qp parser.QParser + qp parser.Parser plainQ string Cl client.RouterClient manager poolmgr.PoolMgr @@ -176,13 +176,14 @@ func (rst *RelayStateImpl) RequestData() { rst.Cl.Server().RequestData() } -func NewRelayState(qr qrouter.QueryRouter, client client.RouterClient, manager poolmgr.PoolMgr, rcfg *config.Router) RelayStateMgr { +func NewRelayState(qr qrouter.QueryRouter, client client.RouterClient, manager poolmgr.PoolMgr, rcfg *config.Router, parser parser.Parser) RelayStateMgr { return &RelayStateImpl{ activeShards: nil, txStatus: txstatus.TXIDLE, msgBuf: nil, traceMsgs: false, Qr: qr, + qp: parser, Cl: client, manager: manager, WorldShardFallback: rcfg.WorldShardFallback,