Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

* Added table/options.WithQueryService() option for redirect table.Session.Execute call to query.Execute #1202

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added `table/options.WithQueryService()` option for redirect `/Ydb.Table.V1.TableService/ExecuteDataQuery` call to `/Ydb.Query.V1.QueryService/ExecuteQuery`

## v3.65.2
* Fixed data race using `log.WithNames`

Expand Down
20 changes: 10 additions & 10 deletions balancers/balancers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package balancers

import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"sort"
"strings"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
)

Expand All @@ -28,8 +28,8 @@ func SingleConn() *balancerConfig.Config {

type filterLocalDC struct{}

func (filterLocalDC) Allow(info balancerConfig.Info, c conn.Conn) bool {
return c.Endpoint().Location() == info.SelfLocation
func (filterLocalDC) Allow(info balancerConfig.Info, e endpoint.Info) bool {
return e.Location() == info.SelfLocation
}

func (filterLocalDC) String() string {
Expand Down Expand Up @@ -58,8 +58,8 @@ func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.

type filterLocations []string

func (locations filterLocations) Allow(_ balancerConfig.Info, c conn.Conn) bool {
location := strings.ToUpper(c.Endpoint().Location())
func (locations filterLocations) Allow(_ balancerConfig.Info, e endpoint.Info) bool {
location := strings.ToUpper(e.Location())
for _, l := range locations {
if location == l {
return true
Expand Down Expand Up @@ -122,10 +122,10 @@ type Endpoint interface {
LocalDC() bool
}

type filterFunc func(info balancerConfig.Info, c conn.Conn) bool
type filterFunc func(info balancerConfig.Info, e endpoint.Info) bool

func (p filterFunc) Allow(info balancerConfig.Info, c conn.Conn) bool {
return p(info, c)
func (p filterFunc) Allow(info balancerConfig.Info, e endpoint.Info) bool {
return p(info, e)
}

func (p filterFunc) String() string {
Expand All @@ -135,8 +135,8 @@ func (p filterFunc) String() string {
// Prefer creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer.Filter = filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
return filter(c.Endpoint())
balancer.Filter = filterFunc(func(_ balancerConfig.Info, e endpoint.Info) bool {
return filter(e)
})

return balancer
Expand Down
80 changes: 55 additions & 25 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
}
}

b.applyDiscoveredEndpoints(ctx, endpoints, localDC)
b.applyEndpoints(ctx, endpoints, localDC)

return nil
}
Expand All @@ -142,40 +142,40 @@ func endpointsDiff(newestEndpoints []endpoint.Endpoint, previousConns []conn.Con
added = make([]trace.EndpointInfo, 0, len(previousConns))
dropped = make([]trace.EndpointInfo, 0, len(previousConns))
var (
newestMap = make(map[string]struct{}, len(newestEndpoints))
newestMap = make(map[string]endpoint.Endpoint, len(newestEndpoints))
previousMap = make(map[string]struct{}, len(previousConns))
)
sort.Slice(newestEndpoints, func(i, j int) bool {
return newestEndpoints[i].Address() < newestEndpoints[j].Address()
})
sort.Slice(previousConns, func(i, j int) bool {
return previousConns[i].Endpoint().Address() < previousConns[j].Endpoint().Address()
return previousConns[i].Address() < previousConns[j].Address()
})
for _, e := range previousConns {
previousMap[e.Endpoint().Address()] = struct{}{}
previousMap[e.Address()] = struct{}{}
}
for _, e := range newestEndpoints {
nodes = append(nodes, e.Copy())
newestMap[e.Address()] = struct{}{}
nodes = append(nodes, e)
newestMap[e.Address()] = e
if _, has := previousMap[e.Address()]; !has {
added = append(added, e.Copy())
added = append(added, e)
}
}
for _, c := range previousConns {
if _, has := newestMap[c.Endpoint().Address()]; !has {
dropped = append(dropped, c.Endpoint().Copy())
if e, has := newestMap[c.Address()]; !has {
dropped = append(dropped, e)
}
}

return nodes, added, dropped
}

func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
func (b *Balancer) applyEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
var (
onDone = trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(), &ctx,
stack.FunctionID(
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"),
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).applyEndpoints"),
b.config.DetectLocalDC,
)
previousConns []conn.Conn
Expand All @@ -186,10 +186,9 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
}()

connections := endpointsToConnections(b.pool, endpoints)
for _, c := range connections {
connections.Each(func(c conn.Conn, e endpoint.Endpoint) {
b.pool.Allow(ctx, c)
c.Endpoint().Touch()
}
})

info := balancerConfig.Info{SelfLocation: localDC}
state := newConnectionsState(connections, b.config.Filter, info, b.config.AllowFallback)
Expand Down Expand Up @@ -260,7 +259,7 @@ func New(
localDCDetector: detectLocalDC,
}
d := internalDiscovery.New(ctx, pool.Get(
endpoint.New(driverConfig.Endpoint()),
driverConfig.Endpoint(),
), discoveryConfig)

b.discoveryClient = d
Expand All @@ -272,8 +271,8 @@ func New(
}

if b.config.SingleConn {
b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{
endpoint.New(driverConfig.Endpoint()),
b.applyEndpoints(ctx, []endpoint.Endpoint{
endpoint.New(0, driverConfig.Endpoint(), ""),
}, "")
} else {
// initialization of balancer state
Expand Down Expand Up @@ -348,8 +347,7 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
if conn.UseWrapping(ctx) {
if credentials.IsAccessError(err) {
err = credentials.AccessError("no access", err,
credentials.WithAddress(cc.Endpoint().String()),
credentials.WithNodeID(cc.Endpoint().NodeID()),
credentials.WithAddress(cc.Address()),
credentials.WithCredentials(b.driverConfig.Credentials()),
)
}
Expand Down Expand Up @@ -377,9 +375,9 @@ func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
)
defer func() {
if err == nil {
onDone(c.Endpoint(), nil)
onDone(c.Address(), nil)
} else {
onDone(nil, err)
onDone("", err)
}
}()

Expand Down Expand Up @@ -408,11 +406,43 @@ func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
return c, nil
}

func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) []conn.Conn {
conns := make([]conn.Conn, 0, len(endpoints))
for _, e := range endpoints {
conns = append(conns, p.Get(e))
type connByEndpoint struct {
conns map[string]conn.Conn
endpoints map[string]endpoint.Endpoint
}

func (m *connByEndpoint) Get(address string) (conn.Conn, endpoint.Endpoint) {
return m.conns[address], m.endpoints[address]
}

func (m *connByEndpoint) Each(visitor func(c conn.Conn, e endpoint.Endpoint)) {
for _, e := range m.endpoints {
visitor(m.conns[e.Address()], e)
}
}

func (m *connByEndpoint) Len() int {
return len(m.endpoints)
}

func (m *connByEndpoint) Conns() (conns []conn.Conn) {
conns = make([]conn.Conn, 0, len(m.conns))
for _, c := range m.conns {
conns = append(conns, c)
}
return conns
}

func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) *connByEndpoint {
m := &connByEndpoint{
conns: make(map[string]conn.Conn, len(endpoints)),
endpoints: make(map[string]endpoint.Endpoint, len(endpoints)),
}

for _, e := range endpoints {
m.conns[e.Address()] = p.Get(e.Address())
m.endpoints[e.Address()] = e
}

return m
}
4 changes: 2 additions & 2 deletions internal/balancer/config/routerconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package config

import (
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
)

Expand Down Expand Up @@ -47,6 +47,6 @@ type Info struct {
}

type Filter interface {
Allow(info Info, c conn.Conn) bool
Allow(info Info, e endpoint.Info) bool
String() string
}
31 changes: 16 additions & 15 deletions internal/balancer/connections_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package balancer

import (
"context"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
Expand All @@ -19,7 +20,7 @@ type connectionsState struct {
}

func newConnectionsState(
conns []conn.Conn,
conns *connByEndpoint,
filter balancerConfig.Filter,
info balancerConfig.Info,
allowFallback bool,
Expand All @@ -31,7 +32,7 @@ func newConnectionsState(

res.prefer, res.fallback = sortPreferConnections(conns, filter, info, allowFallback)
if allowFallback {
res.all = conns
res.all = conns.Conns()
} else {
res.all = res.prefer
}
Expand Down Expand Up @@ -115,40 +116,40 @@ func (s *connectionsState) selectRandomConnection(conns []conn.Conn, allowBanned
return nil, failedConns
}

func connsToNodeIDMap(conns []conn.Conn) (nodes map[uint32]conn.Conn) {
if len(conns) == 0 {
func connsToNodeIDMap(conns *connByEndpoint) (nodes map[uint32]conn.Conn) {
if conns.Len() == 0 {
return nil
}
nodes = make(map[uint32]conn.Conn, len(conns))
for _, c := range conns {
nodes[c.Endpoint().NodeID()] = c
}
nodes = make(map[uint32]conn.Conn, conns.Len())
conns.Each(func(c conn.Conn, e endpoint.Endpoint) {
nodes[e.NodeID()] = c
})

return nodes
}

func sortPreferConnections(
conns []conn.Conn,
conns *connByEndpoint,
filter balancerConfig.Filter,
info balancerConfig.Info,
allowFallback bool,
) (prefer, fallback []conn.Conn) {
if filter == nil {
return conns, nil
return conns.Conns(), nil
}

prefer = make([]conn.Conn, 0, len(conns))
prefer = make([]conn.Conn, 0, conns.Len())
if allowFallback {
fallback = make([]conn.Conn, 0, len(conns))
fallback = make([]conn.Conn, 0, conns.Len())
}

for _, c := range conns {
if filter.Allow(info, c) {
conns.Each(func(c conn.Conn, e endpoint.Endpoint) {
if filter.Allow(info, e) {
prefer = append(prefer, c)
} else if allowFallback {
fallback = append(fallback, c)
}
}
})

return prefer, fallback
}
Expand Down
Loading
Loading