Skip to content

Commit

Permalink
feat: Implement automatic refresh functionality for TableMetaCache (#739
Browse files Browse the repository at this point in the history
)

* support auto refresh table mate cache

* fix test

* Refactor the code to address the issue of duplicate definitions

* Refactor the code to address the issue of duplicate definitions

* Refactor the code to address the issue of duplicate definitions

* Refactor the code to address the issue of duplicate definitions

---------

Co-authored-by: root <[email protected]>
Co-authored-by: JayLiu <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2024
1 parent 0b45672 commit af53317
Show file tree
Hide file tree
Showing 10 changed files with 413 additions and 16 deletions.
26 changes: 22 additions & 4 deletions pkg/datasource/sql/datasource/base/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"github.com/go-sql-driver/mysql"

"seata.apache.org/seata-go/pkg/datasource/sql/types"
)

Expand All @@ -32,7 +34,7 @@ type (
trigger interface {
LoadOne(ctx context.Context, dbName string, table string, conn *sql.Conn) (*types.TableMeta, error)

LoadAll() ([]types.TableMeta, error)
LoadAll(ctx context.Context, dbName string, conn *sql.Conn, tables ...string) ([]types.TableMeta, error)
}

entry struct {
Expand All @@ -50,10 +52,12 @@ type BaseTableMetaCache struct {
cache map[string]*entry
cancel context.CancelFunc
trigger trigger
db *sql.DB
cfg *mysql.Config
}

// NewBaseCache
func NewBaseCache(capity int32, expireDuration time.Duration, trigger trigger) *BaseTableMetaCache {
func NewBaseCache(capity int32, expireDuration time.Duration, trigger trigger, db *sql.DB, cfg *mysql.Config) *BaseTableMetaCache {
ctx, cancel := context.WithCancel(context.Background())

c := &BaseTableMetaCache{
Expand All @@ -64,6 +68,8 @@ func NewBaseCache(capity int32, expireDuration time.Duration, trigger trigger) *
cache: map[string]*entry{},
cancel: cancel,
trigger: trigger,
cfg: cfg,
db: db,
}

c.Init(ctx)
Expand All @@ -82,7 +88,19 @@ func (c *BaseTableMetaCache) Init(ctx context.Context) error {
// refresh
func (c *BaseTableMetaCache) refresh(ctx context.Context) {
f := func() {
v, err := c.trigger.LoadAll()
if c.db == nil || c.cfg == nil || c.cache == nil || len(c.cache) == 0 {
return
}

tables := make([]string, 0, len(c.cache))
for table := range c.cache {
tables = append(tables, table)
}
conn, err := c.db.Conn(ctx)
if err != nil {
return
}
v, err := c.trigger.LoadAll(ctx, c.cfg.DBName, conn, tables...)
if err != nil {
return
}
Expand All @@ -92,7 +110,7 @@ func (c *BaseTableMetaCache) refresh(ctx context.Context) {

for i := range v {
tm := v[i]
if _, ok := c.cache[tm.TableName]; !ok {
if _, ok := c.cache[tm.TableName]; ok {
c.cache[tm.TableName] = &entry{
value: tm,
}
Expand Down
126 changes: 126 additions & 0 deletions pkg/datasource/sql/datasource/base/meta_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package base

import (
"context"
"database/sql"
"sync"
"testing"
"time"

"github.com/agiledragon/gomonkey/v2"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
"seata.apache.org/seata-go/pkg/datasource/sql/types"
"seata.apache.org/seata-go/testdata"
)

var (
capacity int32 = 1024
EexpireTime = 15 * time.Minute
tableMetaOnce sync.Once
)

type mockTrigger struct {
}

func (m *mockTrigger) LoadOne(ctx context.Context, dbName string, table string, conn *sql.Conn) (*types.TableMeta, error) {
return nil, nil
}

func (m *mockTrigger) LoadAll(ctx context.Context, dbName string, conn *sql.Conn, tables ...string) ([]types.TableMeta, error) {
return nil, nil
}

func TestBaseTableMetaCache_refresh(t *testing.T) {
type fields struct {
lock sync.RWMutex
expireDuration time.Duration
capity int32
size int32
cache map[string]*entry
cancel context.CancelFunc
trigger trigger
db *sql.DB
cfg *mysql.Config
}
type args struct {
ctx context.Context
}
ctx, cancel := context.WithCancel(context.Background())
tests := []struct {
name string
fields fields
args args
want types.TableMeta
}{
{name: "test-1",
fields: fields{
lock: sync.RWMutex{},
capity: capacity,
size: 0,
expireDuration: EexpireTime,
cache: map[string]*entry{
"test": {
value: types.TableMeta{},
lastAccess: time.Now(),
},
},
cancel: cancel,
trigger: &mockTrigger{},
cfg: &mysql.Config{},
db: &sql.DB{},
}, args: args{ctx: ctx},
want: testdata.MockWantTypesMeta("test")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

connStub := gomonkey.ApplyMethodFunc(tt.fields.db, "Conn",
func(_ context.Context) (*sql.Conn, error) {
return &sql.Conn{}, nil
})

defer connStub.Reset()

loadAllStub := gomonkey.ApplyMethodFunc(tt.fields.trigger, "LoadAll",
func(_ context.Context, _ string, _ *sql.Conn, _ ...string) ([]types.TableMeta, error) {
return []types.TableMeta{tt.want}, nil
})

defer loadAllStub.Reset()

c := &BaseTableMetaCache{
lock: tt.fields.lock,
expireDuration: tt.fields.expireDuration,
capity: tt.fields.capity,
size: tt.fields.size,
cache: tt.fields.cache,
cancel: tt.fields.cancel,
trigger: tt.fields.trigger,
db: tt.fields.db,
cfg: tt.fields.cfg,
}
go c.refresh(tt.args.ctx)
time.Sleep(time.Second * 3)

assert.Equal(t, c.cache["test"].value, tt.want)
})
}
}
6 changes: 4 additions & 2 deletions pkg/datasource/sql/datasource/mysql/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"github.com/go-sql-driver/mysql"

"seata.apache.org/seata-go/pkg/datasource/sql/datasource/base"
"seata.apache.org/seata-go/pkg/datasource/sql/types"
)
Expand All @@ -39,9 +41,9 @@ type TableMetaCache struct {
db *sql.DB
}

func NewTableMetaInstance(db *sql.DB) *TableMetaCache {
func NewTableMetaInstance(db *sql.DB, cfg *mysql.Config) *TableMetaCache {
tableMetaInstance := &TableMetaCache{
tableMetaCache: base.NewBaseCache(capacity, EexpireTime, NewMysqlTrigger()),
tableMetaCache: base.NewBaseCache(capacity, EexpireTime, NewMysqlTrigger(), db, cfg),
db: db,
}
return tableMetaInstance
Expand Down
12 changes: 10 additions & 2 deletions pkg/datasource/sql/datasource/mysql/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,16 @@ func (m *mysqlTrigger) LoadOne(ctx context.Context, dbName string, tableName str
}

// LoadAll
func (m *mysqlTrigger) LoadAll() ([]types.TableMeta, error) {
return []types.TableMeta{}, nil
func (m *mysqlTrigger) LoadAll(ctx context.Context, dbName string, conn *sql.Conn, tables ...string) ([]types.TableMeta, error) {
var tableMetas []types.TableMeta
for _, tableName := range tables {
tableMeta, err := m.LoadOne(ctx, dbName, tableName, conn)
if err != nil {
continue
}
tableMetas = append(tableMetas, *tableMeta)
}
return tableMetas, nil
}

// getColumnMetas get tableMeta column
Expand Down
Loading

0 comments on commit af53317

Please sign in to comment.