Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.

Commit 6c108ab

Browse files
authoredApr 29, 2021
Hotfix again (#1641)
1 parent be5b54f commit 6c108ab

File tree

26 files changed

+54
-51
lines changed

26 files changed

+54
-51
lines changed
 

‎.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ relay_log/*
1111
vendor
1212
*/*.DS_Store
1313
tidb-slow.log
14+
.idea/

‎dm/config/subtask.go

+3
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ type SubTaskConfig struct {
181181

182182
ConfigFile string `toml:"-" json:"config-file"`
183183

184+
UpperSchema []string `toml:"upper-schema" json:"upper-schema"`
185+
UpperTable []string `toml:"upper-table" json:"upper-table"`
186+
184187
// still needed by Syncer / Loader bin
185188
printVersion bool
186189
}

‎dm/config/task.go

+6
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,9 @@ type TaskConfig struct {
263263
Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"`
264264
Loaders map[string]*LoaderConfig `yaml:"loaders"`
265265
Syncers map[string]*SyncerConfig `yaml:"syncers"`
266+
267+
UpperSchema []string `yaml:"upper-schema"`
268+
UpperTable []string `yaml:"upper-table"`
266269
}
267270

268271
// NewTaskConfig creates a TaskConfig
@@ -520,6 +523,9 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
520523
cfg.LoaderConfig = *inst.Loader
521524
cfg.SyncerConfig = *inst.Syncer
522525

526+
cfg.UpperSchema = c.UpperSchema
527+
cfg.UpperTable = c.UpperTable
528+
523529
err := cfg.Adjust(true)
524530
if err != nil {
525531
return nil, terror.Annotatef(err, "source %s", inst.SourceID)

‎syncer/ddl.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ func (s *Syncer) dropSchemaInSharding(tctx *tcontext.Context, sourceSchema strin
256256
targetSchema, targetTable := UnpackTableID(name)
257257
sourceIDs := make([]string, 0, len(tables))
258258
for _, table := range tables {
259-
sourceID, _ := GenTableID(table[0], table[1])
259+
sourceID, _ := GenTableID(table[0], table[1], s.cfg.UpperSchema, s.cfg.UpperTable)
260260
sourceIDs = append(sourceIDs, sourceID)
261261
}
262262
err := s.sgk.LeaveGroup(targetSchema, targetTable, sourceIDs)

‎syncer/sharding_group.go

+33-7
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ import (
7979
"github.com/pingcap/dm/dm/pb"
8080
"github.com/pingcap/dm/pkg/conn"
8181
tcontext "github.com/pingcap/dm/pkg/context"
82+
"github.com/pingcap/dm/pkg/log"
8283
"github.com/pingcap/dm/pkg/terror"
8384
shardmeta "github.com/pingcap/dm/syncer/sharding-meta"
8485

@@ -379,7 +380,32 @@ func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interfac
379380
}
380381

381382
// GenTableID generates table ID
382-
func GenTableID(schema, table string) (ID string, isSchemaOnly bool) {
383+
func GenTableID(schema, table string, upperSchema, upperTable []string) (ID string, isSchemaOnly bool) {
384+
for _, s := range upperSchema {
385+
if s == schema {
386+
oldSchema := schema
387+
schema = strings.ToLower(schema)
388+
if oldSchema != schema {
389+
log.L().Warn("hotfix, changing schema to lowercase",
390+
zap.String("old schema", oldSchema),
391+
zap.String("schema", schema))
392+
}
393+
break
394+
}
395+
}
396+
for _, t := range upperTable {
397+
if t == table {
398+
oldTable := table
399+
table = strings.ToLower(table)
400+
if oldTable != table {
401+
log.L().Warn("hotfix, changing table to lowercase",
402+
zap.String("old table", oldTable),
403+
zap.String("table", table))
404+
}
405+
break
406+
}
407+
}
408+
383409
if len(table) == 0 {
384410
return fmt.Sprintf("`%s`", schema), true
385411
}
@@ -425,8 +451,8 @@ func NewShardingGroupKeeper(tctx *tcontext.Context, cfg *config.SubTaskConfig) *
425451
func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, meta *shardmeta.ShardingMeta, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error) {
426452
// if need to support target table-level sharding DDL
427453
// we also need to support target schema-level sharding DDL
428-
schemaID, _ := GenTableID(targetSchema, "")
429-
targetTableID, _ := GenTableID(targetSchema, targetTable)
454+
schemaID, _ := GenTableID(targetSchema, "", k.cfg.UpperSchema, k.cfg.UpperTable)
455+
targetTableID, _ := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable)
430456

431457
k.Lock()
432458
defer k.Unlock()
@@ -486,8 +512,8 @@ func (k *ShardingGroupKeeper) ResetGroups() {
486512
// LeaveGroup leaves group according to target schema, table and source IDs
487513
// LeaveGroup doesn't affect in syncing process
488514
func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error {
489-
schemaID, _ := GenTableID(targetSchema, "")
490-
targetTableID, _ := GenTableID(targetSchema, targetTable)
515+
schemaID, _ := GenTableID(targetSchema, "", k.cfg.UpperSchema, k.cfg.UpperTable)
516+
targetTableID, _ := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable)
491517
k.Lock()
492518
defer k.Unlock()
493519
if group, ok := k.groups[targetTableID]; ok {
@@ -514,7 +540,7 @@ func (k *ShardingGroupKeeper) TrySync(
514540
targetSchema, targetTable, source string, pos, endPos mysql.Position, ddls []string) (
515541
needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, err error) {
516542

517-
targetTableID, schemaOnly := GenTableID(targetSchema, targetTable)
543+
targetTableID, schemaOnly := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable)
518544
if schemaOnly {
519545
// NOTE: now we don't support syncing for schema only sharding DDL
520546
return false, nil, true, false, 0, nil
@@ -563,7 +589,7 @@ func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, [][]string) {
563589

564590
// Group returns target table's group, nil if not exist
565591
func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup {
566-
targetTableID, _ := GenTableID(targetSchema, targetTable)
592+
targetTableID, _ := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable)
567593
k.RLock()
568594
defer k.RUnlock()
569595
return k.groups[targetTableID]

‎syncer/syncer.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ func (s *Syncer) initShardingGroups() error {
440440
if !ok {
441441
mSchema[targetTable] = make([]string, 0, len(tables))
442442
}
443-
ID, _ := GenTableID(schema, table)
443+
ID, _ := GenTableID(schema, table, s.cfg.UpperSchema, s.cfg.UpperTable)
444444
mSchema[targetTable] = append(mSchema[targetTable], ID)
445445
}
446446
}
@@ -453,7 +453,7 @@ func (s *Syncer) initShardingGroups() error {
453453
// add sharding group
454454
for targetSchema, mSchema := range mapper {
455455
for targetTable, sourceIDs := range mSchema {
456-
tableID, _ := GenTableID(targetSchema, targetTable)
456+
tableID, _ := GenTableID(targetSchema, targetTable, s.cfg.UpperSchema, s.cfg.UpperTable)
457457
_, _, _, _, err := s.sgk.AddGroup(targetSchema, targetTable, sourceIDs, loadMeta[tableID], false)
458458
if err != nil {
459459
return err
@@ -1447,7 +1447,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
14471447
}
14481448

14491449
if s.cfg.IsSharding {
1450-
source, _ := GenTableID(originSchema, originTable)
1450+
source, _ := GenTableID(originSchema, originTable, s.cfg.UpperSchema, s.cfg.UpperTable)
14511451
if s.sgk.InSyncing(schemaName, tableName, source, *ec.currentPos) {
14521452
// if in unsync stage and not before active DDL, ignore it
14531453
// if in sharding re-sync stage and not before active DDL (the next DDL to be synced), ignore it
@@ -1666,7 +1666,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
16661666
}
16671667
continue
16681668
case *ast.DropTableStmt:
1669-
sourceID, _ := GenTableID(tableNames[0][0].Schema, tableNames[0][0].Name)
1669+
sourceID, _ := GenTableID(tableNames[0][0].Schema, tableNames[0][0].Name, s.cfg.UpperSchema, s.cfg.UpperTable)
16701670
err = s.sgk.LeaveGroup(tableNames[1][0].Schema, tableNames[1][0].Name, []string{sourceID})
16711671
if err != nil {
16721672
return err
@@ -1772,7 +1772,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
17721772
Name: ec.currentPos.Name,
17731773
Pos: ec.currentPos.Pos - ec.header.EventSize,
17741774
}
1775-
source, _ = GenTableID(ddlInfo.tableNames[0][0].Schema, ddlInfo.tableNames[0][0].Name)
1775+
1776+
source, _ = GenTableID(ddlInfo.tableNames[0][0].Schema, ddlInfo.tableNames[0][0].Name, s.cfg.UpperSchema, s.cfg.UpperTable)
17761777

17771778
var annotate string
17781779
switch ddlInfo.stmt.(type) {
@@ -1801,7 +1802,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
18011802
s.tctx.L().Info(annotate, zap.String("event", "query"), zap.String("source", source), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Bool("in-sharding", needShardingHandle), zap.Stringer("start position", startPos), zap.Bool("is-synced", synced), zap.Int("unsynced", remain))
18021803

18031804
if needShardingHandle {
1804-
target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name)
1805+
target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name, s.cfg.UpperSchema, s.cfg.UpperTable)
18051806
unsyncedTableGauge.WithLabelValues(s.cfg.Name, target).Set(float64(remain))
18061807
err = ec.safeMode.IncrForTable(s.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group
18071808
if err != nil {

‎tests/all_mode/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/compatibility/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/dmctl_basic/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/dmctl_command/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/full_mode/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/http_apis/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/incremental_mode/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/initial_unit/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/load_interrupt/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/online_ddl/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/print_status/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
use-checkpoint = false

‎tests/relay_interrupt/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/retry_cancel/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/safe_mode/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/sequence_safe_mode/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/sequence_sharding/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/sequence_sharding/conf/dm-task.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ meta-schema: "dm_meta"
66
remove-meta: false
77
enable-heartbeat: true
88
timezone: "Asia/Shanghai"
9+
upper-table: ["T1"]
910

1011
target-database:
1112
host: "127.0.0.1"

‎tests/sequence_sharding/data/db1.increment.sql

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ insert into t1 (uid,name) values (100003,'NR');
33
update t1 set name = 'uxoKehvqWg' where `uid` = 100001;
44
update t1 set name = 'bapYymrtfT' where name = 'igvApUx';
55
insert into t2 (uid,name) values (200004,'CXDvoltoliUINgo'),(200005,'188689130');
6-
alter table t1 add column c int;
6+
alter table T1 add column c int;
7+
insert into t1 (uid,name,c) values (123123,'test',123);
78
alter table t1 add index c(c);
89
update t1 set c = 100;
910
alter table t1 add column d int;

‎tests/sharding/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

‎tests/start_task/conf/diff_config.toml

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ check-thread-count = 4
88

99
sample-percent = 100
1010

11-
use-rowid = false
12-
1311
use-checksum = true
1412

1513
fix-sql-file = "fix.sql"

0 commit comments

Comments
 (0)
This repository has been archived.