Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix connection leak #31

Merged
merged 2 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 16 additions & 38 deletions pkg/activerecord/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"hash"
"hash/crc32"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -138,18 +137,8 @@ func (s *Shard) NextReplica() ShardInstance {
func (c *Shard) Instances() []ShardInstance {
instances := make([]ShardInstance, 0, len(c.Masters)+len(c.Replicas))
instances = append(instances, c.Masters...)
// сортировка в подсписках чтобы не зависет от порядка в котором инстансы добавлялись в конфигурацию
sort.Slice(instances, func(i, j int) bool {
return instances[i].ParamsID < instances[j].ParamsID
})

instances = append(instances, c.Replicas...)

replicas := instances[len(c.Masters):]
sort.Slice(replicas, func(i, j int) bool {
return replicas[i].ParamsID < replicas[j].ParamsID
})

return instances
}

Expand Down Expand Up @@ -421,41 +410,30 @@ func NewConfigCacher() *DefaultConfigCacher {
// Получение конфигурации. Если есть в кеше и он еще валидный, то конфигурация берётся из кешаб
// если в кеше нет, то достаём из конфига и кешируем.
func (cc *DefaultConfigCacher) Get(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) {
curConf := cc.container[path]
if cc.lock.TryLock() {
if cc.updateTime.Sub(Config().GetLastUpdateTime()) < 0 {
// Очищаем кеш если поменялся конфиг
cc.container = make(map[string]*Cluster)
cc.updateTime = time.Now()
}
cc.lock.Unlock()
}

cc.lock.RLock()
conf, ex := cc.container[path]
confUpdateTime := cc.updateTime
cc.lock.RUnlock()

if !ex {
return cc.loadClusterInfo(ctx, curConf, path, globs, optionCreator)
}

return conf, nil
}
// Если конфигурация не найдена в кеше или конфигурация была обновлена, то перегружаем конфигурацию
if !ex || confUpdateTime.Sub(Config().GetLastUpdateTime()) < 0 {
cc.lock.Lock()
newConf, err := GetClusterInfoFromCfg(ctx, path, globs, optionCreator)
if err != nil {
cc.lock.Unlock()

func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, curConf *Cluster, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) {
cc.lock.Lock()
defer cc.lock.Unlock()
return nil, fmt.Errorf("can't get config: %w", err)
}

conf, err := GetClusterInfoFromCfg(ctx, path, globs, optionCreator)
if err != nil {
return nil, fmt.Errorf("can't get config: %w", err)
}
// если конфигурация поменялась, то обновляем её в кеше
if !newConf.Equal(conf) {
conf = newConf
cc.container[path] = conf
cc.updateTime = time.Now()
}

if conf.Equal(curConf) {
conf = curConf
cc.lock.Unlock()
}

cc.container[path] = conf

return conf, nil
}
56 changes: 0 additions & 56 deletions pkg/activerecord/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,59 +184,3 @@ func TestGetClusterInfoFromCfg(t *testing.T) {
})
}
}

func TestShard_Instances(t *testing.T) {
tests := []struct {
name string
shard Shard
want []ShardInstance
}{
{
name: "unordered sequence",
shard: Shard{
Masters: []ShardInstance{
{
ParamsID: "Master2",
},
{
ParamsID: "Master1",
},
},
Replicas: []ShardInstance{
{
ParamsID: "Replica2",
},
{
ParamsID: "Replica3",
},
{
ParamsID: "Replica1",
},
},
},
want: []ShardInstance{
{
ParamsID: "Master1",
},
{
ParamsID: "Master2",
},
{
ParamsID: "Replica1",
},
{
ParamsID: "Replica2",
},
{
ParamsID: "Replica3",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.shard.Instances()
assert.Check(t, cmp.DeepEqual(tt.want, got))
})
}
}
26 changes: 13 additions & 13 deletions pkg/octopus/box.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,16 @@ func Box(ctx context.Context, shard int, instType activerecord.ShardInstanceType
}

func CheckShardInstance(ctx context.Context, instance activerecord.ShardInstance) (activerecord.OptionInterface, error) {
octopusOpt, ok := instance.Options.(*ConnectionOptions)
if !ok {
return nil, fmt.Errorf("invalit type of options %T, want Options", instance.Options)
}

var err error
c := activerecord.ConnectionCacher().Get(instance)
if c == nil {
c, err = GetConnection(ctx, octopusOpt)
if err != nil {
return nil, fmt.Errorf("error from connectionCacher: %w", err)
c, err := activerecord.ConnectionCacher().GetOrAdd(instance, func(options interface{}) (activerecord.ConnectionInterface, error) {
octopusOpt, ok := options.(*ConnectionOptions)
if !ok {
return nil, fmt.Errorf("invalit type of options %T, want Options", options)
}

return GetConnection(ctx, octopusOpt)
})
if err != nil {
return nil, fmt.Errorf("error from connectionCacher: %w", err)
}

conn, ok := c.(*Connection)
Expand All @@ -123,10 +121,12 @@ func CheckShardInstance(ctx context.Context, instance activerecord.ShardInstance
ret := td[0]
switch string(ret.Data[0]) {
case "primary":
return NewOptions(octopusOpt.server, ModeMaster)
instance.Config.Mode = activerecord.ServerModeType(ModeMaster)
default:
return NewOptions(octopusOpt.server, ModeReplica)
instance.Config.Mode = activerecord.ServerModeType(ModeReplica)
}

return DefaultOptionCreator(instance.Config)
}

return nil, fmt.Errorf("can't parse status: %w", err)
Expand Down
Loading