Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm: add a stand-alone load mode #11749

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ ErrConfigLoaderCfgConflict,[code=20016:class=config:scope=internal:level=medium]
ErrConfigSyncerCfgConflict,[code=20017:class=config:scope=internal:level=medium], "Message: syncer-config-name and syncer should only specify one, Workaround: Please check the `syncer-config-name` and `syncer` config in task configuration file."
ErrConfigReadCfgFromFile,[code=20018:class=config:scope=internal:level=medium], "Message: read config file %v"
ErrConfigNeedUniqueTaskName,[code=20019:class=config:scope=internal:level=medium], "Message: must specify a unique task name, Workaround: Please check the `name` config in task configuration file."
ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, Workaround: Please check the `task-mode` config in task configuration file."
ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`, Workaround: Please check the `task-mode` config in task configuration file."
ErrConfigNeedTargetDB,[code=20021:class=config:scope=internal:level=medium], "Message: must specify target-database, Workaround: Please check the `target-database` config in task configuration file."
ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%s) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file."
ErrConfigRouteRuleNotFound,[code=20023:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s route-rules %s not exist in routes, Workaround: Please check the `route-rules` config in task configuration file."
Expand Down
2 changes: 1 addition & 1 deletion dm/config/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func HasDump(taskMode string) bool {
// HasLoad returns true if taskMode contains load unit.
func HasLoad(taskMode string) bool {
switch taskMode {
case ModeAll, ModeFull, ModeLoadSync:
case ModeAll, ModeFull, ModeLoad, ModeLoadSync:
return true
default:
return false
Expand Down
4 changes: 4 additions & 0 deletions dm/config/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func TestTaskModeHasFunction(t *testing.T) {
require.False(t, HasLoad(ModeDump))
require.False(t, HasSync(ModeDump))

require.False(t, HasDump(ModeLoad))
require.True(t, HasLoad(ModeLoad))
require.False(t, HasSync(ModeLoad))

require.False(t, HasDump(ModeLoadSync))
require.True(t, HasLoad(ModeLoadSync))
require.True(t, HasSync(ModeLoadSync))
Expand Down
6 changes: 4 additions & 2 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
ModeFull = "full"
ModeIncrement = "incremental"
ModeDump = "dump"
ModeLoad = "load"
ModeLoadSync = "load&sync"

DefaultShadowTableRules = "^_(.+)_(?:new|gho)$"
Expand Down Expand Up @@ -347,8 +348,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
c.MetaSchema = defaultMetaSchema
}

// adjust dir, no need to do for load&sync mode because it needs its own s3 repository
if HasLoad(c.Mode) && c.Mode != ModeLoadSync {
// adjust dir. Do not do this for both load and load&sync mode, as they are standalone
// mode and should take LoaderConfig.Dir as is
if HasLoad(c.Mode) && c.Mode != ModeLoadSync && c.Mode != ModeLoad {
// check
isS3 := storage.IsS3Path(c.LoaderConfig.Dir)
if isS3 && c.ImportMode == LoadModeLoader {
Expand Down
6 changes: 3 additions & 3 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@
return terror.ErrConfigNeedUniqueTaskName.Generate()
}
switch c.TaskMode {
case ModeFull, ModeIncrement, ModeAll, ModeDump, ModeLoadSync:
case ModeFull, ModeIncrement, ModeAll, ModeDump, ModeLoad, ModeLoadSync:
default:
return terror.ErrConfigInvalidTaskMode.Generate()
}
Expand Down Expand Up @@ -774,9 +774,9 @@
instanceIDs[inst.SourceID] = i

switch c.TaskMode {
case ModeFull, ModeAll, ModeDump:
case ModeFull, ModeAll, ModeDump, ModeLoad:
if inst.Meta != nil {
log.L().Warn("metadata will not be used. for Full mode, incremental sync will never occur; for All mode, the meta dumped by MyDumper will be used", zap.Int("mysql instance", i), zap.String("task mode", c.TaskMode))
log.L().Warn("metadata will not be used. for Full/Dump/Load mode, incremental sync will never occur; for All mode, the meta dumped by MyDumper will be used", zap.Int("mysql instance", i), zap.String("task mode", c.TaskMode))

Check warning on line 779 in dm/config/task.go

View check run for this annotation

Codecov / codecov/patch

dm/config/task.go#L779

Added line #L779 was not covered by tests
}
case ModeIncrement:
if inst.Meta == nil {
Expand Down
2 changes: 1 addition & 1 deletion dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ workaround = "Please check the `name` config in task configuration file."
tags = ["internal", "medium"]

[error.DM-config-20020]
message = "please specify right task-mode, support `full`, `incremental`, `all`"
message = "please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`"
description = ""
workaround = "Please check the `task-mode` config in task configuration file."
tags = ["internal", "medium"]
Expand Down
102 changes: 51 additions & 51 deletions dm/openapi/gen.server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions dm/openapi/gen.types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dm/openapi/spec/dm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,7 @@ components:
- "incremental"
- "all"
- "dump"
- "load"
shard_mode:
type: string
description: the way to coordinate DDL
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ var (
ErrConfigSyncerCfgConflict = New(codeConfigSyncerCfgConflict, ClassConfig, ScopeInternal, LevelMedium, "syncer-config-name and syncer should only specify one", "Please check the `syncer-config-name` and `syncer` config in task configuration file.")
ErrConfigReadCfgFromFile = New(codeConfigReadCfgFromFile, ClassConfig, ScopeInternal, LevelMedium, "read config file %v", "")
ErrConfigNeedUniqueTaskName = New(codeConfigNeedUniqueTaskName, ClassConfig, ScopeInternal, LevelMedium, "must specify a unique task name", "Please check the `name` config in task configuration file.")
ErrConfigInvalidTaskMode = New(codeConfigInvalidTaskMode, ClassConfig, ScopeInternal, LevelMedium, "please specify right task-mode, support `full`, `incremental`, `all`", "Please check the `task-mode` config in task configuration file.")
ErrConfigInvalidTaskMode = New(codeConfigInvalidTaskMode, ClassConfig, ScopeInternal, LevelMedium, "please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`", "Please check the `task-mode` config in task configuration file.")
ErrConfigNeedTargetDB = New(codeConfigNeedTargetDB, ClassConfig, ScopeInternal, LevelMedium, "must specify target-database", "Please check the `target-database` config in task configuration file.")
ErrConfigMetadataNotSet = New(codeConfigMetadataNotSet, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%s) must set meta for task-mode %s", "Please check the `meta` config in task configuration file.")
ErrConfigRouteRuleNotFound = New(codeConfigRouteRuleNotFound, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%d)'s route-rules %s not exist in routes", "Please check the `route-rules` config in task configuration file.")
Expand Down
41 changes: 41 additions & 0 deletions dm/tests/openapi/client/openapi_task_check
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import sys
import requests

SHARD_TASK_NAME = "test-shard"
LOAD_TASK_NAME = "test-load"
ILLEGAL_CHAR_TASK_NAME = "t-Ë!s`t"
SOURCE1_NAME = "mysql-01"
SOURCE2_NAME = "mysql-02"
Expand Down Expand Up @@ -308,6 +309,45 @@ def create_dump_task_success():
print("create_dump_task_success resp=", resp.json())
assert resp.status_code == 201

def create_load_task_success():
task = {
"name": LOAD_TASK_NAME,
"task_mode": "load",
"meta_schema": "dm-meta",
"enhance_online_schema_change": True,
"on_duplicate": "error",
"target_config": {
"host": "127.0.0.1",
"port": 4000,
"user": "root",
"password": "",
},
"table_migrate_rule": [
{
"source": {
"source_name": SOURCE1_NAME,
"schema": "openapi",
"table": "*",
},
"target": {"schema": "openapi", "table": ""},
}
],
"source_config": {
"full_migrate_conf": {
"export_threads": 4,
"import_threads": 16,
"data_dir": "./exported_data",
"consistency": "auto",
},
"source_conf": [
{"source_name": SOURCE1_NAME}
],
},
}
resp = requests.post(url=API_ENDPOINT, json={"task": task})
print("create_load_task_success resp=", resp.json())
assert resp.status_code == 201

def start_task_success(task_name, source_name):
url = API_ENDPOINT + "/" + task_name + "/start"
req = {}
Expand Down Expand Up @@ -810,6 +850,7 @@ if __name__ == "__main__":
"create_noshard_task_success": create_noshard_task_success,
"create_shard_task_success": create_shard_task_success,
"create_dump_task_success": create_dump_task_success,
"create_load_task_success": create_load_task_success,
"create_incremental_task_with_gtid_success": create_incremental_task_with_gtid_success,
"delete_task_failed": delete_task_failed,
"delete_task_success": delete_task_success,
Expand Down
29 changes: 29 additions & 0 deletions dm/tests/openapi/conf/diff_config_no_shard_one_source.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/ticdc_dm_test/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["openapi.t1", "openapi.t2"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
password = "123456"
port = 3306
user = "root"

[data-sources.tidb0]
host = "127.0.0.1"
password = ""
port = 4000
user = "root"
Loading
Loading