Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ dist/
.tidbcloud-cli.toml
.CHANGELOG.md
node_modules/
.gomodcache/
.gocache/
43 changes: 23 additions & 20 deletions internal/cli/serverless/migration/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,28 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
}
definitionStr := string(definitionBytes)

sources, target, mode, err := parseMigrationDefinition(definitionStr)
sources, target, mode, binlogFilterRule, err := parseMigrationDefinition(definitionStr)
if err != nil {
return err
}

if dryRun {
precheckBody := &pkgmigration.MigrationServicePrecheckBody{
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
BinlogFilterRule: binlogFilterRule,
}
return runMigrationPrecheck(ctx, d, clusterID, precheckBody, h)
}

createBody := &pkgmigration.MigrationServiceCreateMigrationBody{
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
BinlogFilterRule: binlogFilterRule,
}

resp, err := d.CreateMigration(ctx, clusterID, createBody)
Expand Down Expand Up @@ -247,34 +249,35 @@ func shouldPrintPrecheckItem(status *pkgmigration.PrecheckItemStatus) bool {
}
}

func parseMigrationDefinition(value string) ([]pkgmigration.Source, pkgmigration.Target, pkgmigration.TaskMode, error) {
func parseMigrationDefinition(value string) ([]pkgmigration.Source, pkgmigration.Target, pkgmigration.TaskMode, *pkgmigration.BinlogFilterRule, error) {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return nil, pkgmigration.Target{}, "", errors.New("migration config is required; use --config-file")
return nil, pkgmigration.Target{}, "", nil, errors.New("migration config is required; use --config-file")
}
var payload struct {
Sources []pkgmigration.Source `json:"sources"`
Target *pkgmigration.Target `json:"target"`
Mode string `json:"mode"`
Sources []pkgmigration.Source `json:"sources"`
Target *pkgmigration.Target `json:"target"`
Mode string `json:"mode"`
BinlogFilterRule *pkgmigration.BinlogFilterRule `json:"binlogFilterRule"`
}
stdJson, err := standardizeJSON([]byte(trimmed))
if err != nil {
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
return nil, pkgmigration.Target{}, "", nil, errors.Annotate(err, "invalid migration definition JSON")
}
if err := json.Unmarshal(stdJson, &payload); err != nil {
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
return nil, pkgmigration.Target{}, "", nil, errors.Annotate(err, "invalid migration definition JSON")
}
if len(payload.Sources) == 0 {
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include at least one source")
return nil, pkgmigration.Target{}, "", nil, errors.New("migration definition must include at least one source")
}
if payload.Target == nil {
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include the target block")
return nil, pkgmigration.Target{}, "", nil, errors.New("migration definition must include the target block")
}
mode, err := parseMigrationMode(payload.Mode)
if err != nil {
return nil, pkgmigration.Target{}, "", err
return nil, pkgmigration.Target{}, "", nil, err
}
return payload.Sources, *payload.Target, mode, nil
return payload.Sources, *payload.Target, mode, payload.BinlogFilterRule, nil
}

func parseMigrationMode(value string) (pkgmigration.TaskMode, error) {
Expand Down
17 changes: 17 additions & 0 deletions internal/cli/serverless/migration/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func (suite *CreateMigrationSuite) writeTempConfig(content string) string {
func validMigrationConfig() string {
return `{
"mode": "ALL",
"binlogFilterRule": {
"ignoreEvent": ["truncate table", "drop database"],
"ignoreSql": ["^DROP\\s+TABLE.*", "^TRUNCATE\\s+TABLE.*"]
},
"target": {
"user": "migration_user",
"password": "Passw0rd!"
Expand All @@ -171,6 +175,19 @@ func validMigrationConfig() string {
}`
}

func TestParseMigrationDefinition_BinlogFilterRule(t *testing.T) {
assert := require.New(t)

sources, target, mode, binlogFilterRule, err := parseMigrationDefinition(validMigrationConfig())
assert.NoError(err)
assert.Equal(pkgmigration.TASKMODE_ALL, mode)
assert.NotNil(binlogFilterRule)
assert.Equal([]string{"truncate table", "drop database"}, binlogFilterRule.IgnoreEvent)
assert.Equal([]string{`^DROP\s+TABLE.*`, `^TRUNCATE\s+TABLE.*`}, binlogFilterRule.IgnoreSql)
assert.NotEmpty(sources)
assert.Equal("migration_user", target.User)
}

func TestCreateMigrationSuite(t *testing.T) {
suite.Run(t, new(CreateMigrationSuite))
}
14 changes: 14 additions & 0 deletions internal/cli/serverless/migration/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const (
"user": "migration_user",
"password": "Passw0rd!"
},
// Optional global binlog filter rules applied during incremental replication.
"binlogFilterRule": {
// Event types to ignore, see https://docs.pingcap.com/tidb/stable/dm-binlog-event-filter/#parameter-descriptions .
"ignoreEvent": ["truncate table", "drop database"],
// SQL patterns to ignore.
"ignoreSql": ["^DROP\\s+TABLE.*", "^TRUNCATE\\s+TABLE.*"]
},
// List at least one migration source
"sources": [
{
Expand Down Expand Up @@ -111,6 +118,13 @@ const (
"user": "migration_user",
"password": "Passw0rd!"
},
// Optional global binlog filter rules applied during incremental replication.
"binlogFilterRule": {
// Event types to ignore, see https://docs.pingcap.com/tidb/stable/dm-binlog-event-filter/#parameter-descriptions .
"ignoreEvent": ["truncate table", "drop database"],
// SQL patterns to ignore.
"ignoreSql": ["^DROP\\s+TABLE.*", "^TRUNCATE\\s+TABLE.*"]
},
"sources": [
{
// Required: source database type. Supported values: MYSQL, ALICLOUD_RDS_MYSQL, AWS_RDS_MYSQL
Expand Down
136 changes: 121 additions & 15 deletions pkg/tidbcloud/v1beta1/serverless/dm.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,26 @@
},
"additionalProperties": {}
},
"BinlogFilterRule": {
"type": "object",
"properties": {
"ignoreEvent": {
"type": "array",
"items": {
"type": "string"
},
"description": "Event types to ignore (e.g., \"truncate table\", \"drop database\")."
},
"ignoreSql": {
"type": "array",
"items": {
"type": "string"
},
"description": "SQL patterns to ignore."
}
},
"description": "Binlog filter rules applied during incremental replication."
},
"ConnProfile": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -518,11 +538,19 @@
]
}
},
"required": ["connType", "port", "user", "password"]
"required": [
"connType",
"port",
"user",
"password"
]
},
"ConnType": {
"type": "string",
"enum": ["PUBLIC", "PRIVATE_LINK"],
"enum": [
"PUBLIC",
"PRIVATE_LINK"
],
"description": "The connection type used to connect to the source database.\n\n - PUBLIC: Connect over the public internet.\n - PRIVATE_LINK: Connect via Private Link/Private Endpoint."
},
"CreateMigrationPrecheckResp": {
Expand Down Expand Up @@ -680,12 +708,27 @@
"$ref": "#/definitions/Migration.State"
}
]
},
"binlogFilterRule": {
"description": "Global binlog filter rule applied to the migration.",
"readOnly": true,
"allOf": [
{
"$ref": "#/definitions/BinlogFilterRule"
}
]
}
}
},
"Migration.State": {
"type": "string",
"enum": ["CREATING", "RUNNING", "PAUSED", "FAILED", "DELETING"],
"enum": [
"CREATING",
"RUNNING",
"PAUSED",
"FAILED",
"DELETING"
],
"description": "Overall state of a migration.\n\n - CREATING: Task is being created.\n - RUNNING: Task is actively running.\n - PAUSED: Task is paused.\n - FAILED: Task failed with error.\n - DELETING: Task is being deleted."
},
"MigrationPrecheck": {
Expand Down Expand Up @@ -742,7 +785,13 @@
},
"MigrationPrecheck.Status": {
"type": "string",
"enum": ["RUNNING", "FINISHED", "PENDING", "FAILED", "CANCELED"],
"enum": [
"RUNNING",
"FINISHED",
"PENDING",
"FAILED",
"CANCELED"
],
"description": " - RUNNING: Precheck is in progress.\n - FINISHED: Precheck finished successfully.\n - PENDING: Precheck is pending.\n - FAILED: Precheck failed.\n - CANCELED: Precheck is canceled."
},
"MigrationRule": {
Expand Down Expand Up @@ -778,7 +827,9 @@
"description": "Table pattern of the source, supports wildcards."
}
},
"required": ["schemaPattern"]
"required": [
"schemaPattern"
]
},
"MigrationRule.Table": {
"type": "object",
Expand All @@ -792,7 +843,9 @@
"description": "Table name. Wildcards are not supported. Set empty to use the source table name."
}
},
"required": ["schema"]
"required": [
"schema"
]
},
"MigrationService.CreateMigrationBody": {
"type": "object",
Expand Down Expand Up @@ -824,9 +877,22 @@
"$ref": "#/definitions/TaskMode"
}
]
},
"binlogFilterRule": {
"description": "Global binlog filter rule applied to all migrated tables.",
"allOf": [
{
"$ref": "#/definitions/BinlogFilterRule"
}
]
}
},
"required": ["displayName", "sources", "target", "mode"]
"required": [
"displayName",
"sources",
"target",
"mode"
]
},
"MigrationService.PauseMigrationBody": {
"type": "object",
Expand Down Expand Up @@ -862,9 +928,22 @@
"$ref": "#/definitions/TaskMode"
}
]
},
"binlogFilterRule": {
"description": "Global binlog filter rule applied to all migrated tables.",
"allOf": [
{
"$ref": "#/definitions/BinlogFilterRule"
}
]
}
},
"required": ["displayName", "sources", "target", "mode"]
"required": [
"displayName",
"sources",
"target",
"mode"
]
},
"MigrationService.ResumeMigrationBody": {
"type": "object",
Expand Down Expand Up @@ -915,7 +994,11 @@
},
"PrecheckItem.Status": {
"type": "string",
"enum": ["SUCCESS", "WARNING", "FAILED"],
"enum": [
"SUCCESS",
"WARNING",
"FAILED"
],
"description": " - SUCCESS: Check passed successfully.\n - WARNING: Check resulted in a warning.\n - FAILED: Check failed."
},
"PrecheckItemType": {
Expand Down Expand Up @@ -1009,11 +1092,18 @@
]
}
},
"required": ["connProfile", "sourceType"]
"required": [
"connProfile",
"sourceType"
]
},
"Source.SourceType": {
"type": "string",
"enum": ["MYSQL", "ALICLOUD_RDS_MYSQL", "AWS_RDS_MYSQL"],
"enum": [
"MYSQL",
"ALICLOUD_RDS_MYSQL",
"AWS_RDS_MYSQL"
],
"description": "The source database type.\n\n - MYSQL: Self-managed MySQL.\n - ALICLOUD_RDS_MYSQL: Alibaba Cloud RDS for MySQL.\n - AWS_RDS_MYSQL: Amazon RDS for MySQL."
},
"Status": {
Expand Down Expand Up @@ -1102,12 +1192,22 @@
},
"SubTask.Stage": {
"type": "string",
"enum": ["RUNNING", "PAUSED", "FAILED", "FINISHED", "UNKNOWN"],
"enum": [
"RUNNING",
"PAUSED",
"FAILED",
"FINISHED",
"UNKNOWN"
],
"description": "The high-level lifecycle stage of a subtask.\n\n - RUNNING: Subtask is running.\n - PAUSED: Subtask is paused.\n - FAILED: Subtask failed.\n - FINISHED: Subtask finished successfully.\n - UNKNOWN: Subtask stage is unknown."
},
"SubTask.Step": {
"type": "string",
"enum": ["DUMP", "LOAD", "SYNC"],
"enum": [
"DUMP",
"LOAD",
"SYNC"
],
"description": "The current step within a subtask.\n\n - DUMP: Dump/export data from source.\n - LOAD: Load/import data into target.\n - SYNC: Sync/replicate binlog changes."
},
"SyncDetail": {
Expand Down Expand Up @@ -1144,11 +1244,17 @@
"description": "Target database password."
}
},
"required": ["user", "password"]
"required": [
"user",
"password"
]
},
"TaskMode": {
"type": "string",
"enum": ["ALL", "INCREMENTAL"],
"enum": [
"ALL",
"INCREMENTAL"
],
"description": "Migration task mode.\n\n - ALL: Full + incremental migration (all phases).\n - INCREMENTAL: Incremental-only migration (replication)."
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ client.go
configuration.go
git_push.sh
model_any.go
model_binlog_filter_rule.go
model_conn_profile.go
model_conn_type.go
model_create_migration_precheck_resp.go
Expand Down
Loading
Loading