Skip to content

Commit

Permalink
Added IExternalReplication interface
Browse files Browse the repository at this point in the history
  • Loading branch information
noname0443 committed Sep 19, 2023
1 parent f7d5bbb commit 05149a4
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 226 deletions.
56 changes: 31 additions & 25 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@ import (

// App is main application structure
type App struct {
state appState
logger *log.Logger
config *config.Config
dcs dcs.DCS
cluster *mysql.Cluster
filelock *flock.Flock
nodeFailedAt map[string]time.Time
streamFromFailedAt map[string]time.Time
daemonState *DaemonState
daemonMutex sync.Mutex
replRepairState map[string]*ReplicationRepairState
state appState
logger *log.Logger
config *config.Config
dcs dcs.DCS
cluster *mysql.Cluster
filelock *flock.Flock
nodeFailedAt map[string]time.Time
streamFromFailedAt map[string]time.Time
daemonState *DaemonState
daemonMutex sync.Mutex
replRepairState map[string]*ReplicationRepairState
externalReplication mysql.IExternalReplication
}

// NewApp returns new App. Suddenly.
Expand All @@ -57,13 +58,18 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
if logPath != "" {
logger.ReOpenOnSignal(syscall.SIGUSR2)
}
externalReplication, err := mysql.NewExternalReplication(config.ExternalReplicationType, logger)
if err != nil {
return nil, err
}
app := &App{
state: stateFirstRun,
config: config,
logger: logger,
nodeFailedAt: make(map[string]time.Time),
streamFromFailedAt: make(map[string]time.Time),
replRepairState: make(map[string]*ReplicationRepairState),
state: stateFirstRun,
config: config,
logger: logger,
nodeFailedAt: make(map[string]time.Time),
streamFromFailedAt: make(map[string]time.Time),
replRepairState: make(map[string]*ReplicationRepairState),
externalReplication: externalReplication,
}
return app, nil
}
Expand Down Expand Up @@ -229,7 +235,7 @@ func (app *App) externalCAFileChecker(ctx context.Context) {
select {
case <-ticker.C:
localNode := app.cluster.Local()
replicaStatus, err := localNode.GetExternalReplicaStatus()
replicaStatus, err := app.externalReplication.GetReplicaStatus(localNode)
if err != nil {
if !mysql.IsErrorChannelDoesNotExists(err) {
app.logger.Errorf("external CA file checker: host %s failed to get external replica status %v", localNode.Host(), err)
Expand Down Expand Up @@ -1145,7 +1151,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode

oldMasterNode := app.cluster.Get(oldMaster)
if clusterState[oldMaster].PingOk {
err := oldMasterNode.StopExternalReplication()
err := app.externalReplication.Stop(oldMasterNode)
if err != nil {
return fmt.Errorf("got error: %s while stopping external replication on old master: %s", err, oldMaster)
}
Expand Down Expand Up @@ -1290,7 +1296,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
}
} else {
app.logger.Infof("switchover: old master %s does not need recovery", oldMaster)
err = oldMasterNode.ResetExternalReplicationAll()
err = app.externalReplication.Reset(oldMasterNode)
if err != nil {
return fmt.Errorf("got error: %s while reseting external replication on old master: %s", err, oldMaster)
}
Expand Down Expand Up @@ -1332,7 +1338,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
}

// enable external replication
err = newMasterNode.SetExternalReplication()
err = app.externalReplication.Set(newMasterNode)
if err != nil {
app.logger.Errorf("failed to set external replication on new master")
}
Expand Down Expand Up @@ -1615,11 +1621,11 @@ func (app *App) repairSlaveNode(node *mysql.Node, clusterState map[string]*NodeS
if err != nil {
app.logger.Errorf("repair: %s", err)
}
err = node.StopExternalReplication()
err = app.externalReplication.Stop(node)
if err != nil {
app.logger.Errorf("repair: %s", err)
}
err = node.ResetExternalReplicationAll()
err = app.externalReplication.Reset(node)
if err != nil {
app.logger.Errorf("repair: %s", err)
}
Expand Down Expand Up @@ -1790,7 +1796,7 @@ func (app *App) repairCascadeNode(node *mysql.Node, clusterState map[string]*Nod
}

func (app *App) repairExternalReplication(masterNode *mysql.Node) {
extReplStatus, err := masterNode.GetExternalReplicaStatus()
extReplStatus, err := app.externalReplication.GetReplicaStatus(masterNode)
if err != nil {
if !mysql.IsErrorChannelDoesNotExists(err) {
app.logger.Errorf("repair (external): host %s failed to get external replica status %v", masterNode.Host(), err)
Expand All @@ -1802,7 +1808,7 @@ func (app *App) repairExternalReplication(masterNode *mysql.Node) {
return
}

if masterNode.IsExternalReplicationRunningByUser() && !extReplStatus.ReplicationRunning() {
if app.externalReplication.IsRunningByUser(masterNode) && !extReplStatus.ReplicationRunning() {
// TODO: remove "". Master is not needed for external replication now
app.TryRepairReplication(masterNode, "", app.config.ExternalReplicationChannel)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (app *App) makeReplStateKey(node *mysql.Node, channel string) string {
func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string, channel string) error {
app.logger.Infof("repair: trying to repair replication using StartSlaveAlgorithm...")
if channel == app.config.ExternalReplicationChannel {
return node.StartExternalReplication()
return app.externalReplication.Start(node)
}
return node.StartSlave()
}
Expand Down
108 changes: 55 additions & 53 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,59 +34,60 @@ type MySQLConfig struct {

// Config contains all mysync configuration
type Config struct {
DevMode bool `config:"dev_mode" yaml:"dev_mode"`
SemiSync bool `config:"semi_sync" yaml:"semi_sync"`
SemiSyncEnableLag int64 `config:"semi_sync_enable_lag" yaml:"semi_sync_enable_lag"`
Failover bool `config:"failover" yaml:"failover"`
FailoverCooldown time.Duration `config:"failover_cooldown" yaml:"failover_cooldown"`
FailoverDelay time.Duration `config:"failover_delay" yaml:"failover_delay"`
InactivationDelay time.Duration `config:"inactivation_delay" yaml:"inactivation_delay"`
CriticalDiskUsage float64 `config:"critical_disk_usage" yaml:"critical_disk_usage"`
NotCriticalDiskUsage float64 `config:"not_critical_disk_usage" yaml:"not_critical_disk_usage"`
LogLevel string `config:"loglevel"`
Log string `config:"log"`
Hostname string `config:"hostname"`
Lockfile string `config:"lockfile"`
InfoFile string `config:"info_file" yaml:"info_file"`
Emergefile string `config:"emergefile"`
Resetupfile string `config:"resetupfile"`
Maintenancefile string `config:"maintenancefile"`
MySQL MySQLConfig `config:"mysql"`
Queries map[string]string `config:"queries"`
Commands map[string]string `config:"commands"`
Zookeeper dcs.ZookeeperConfig `config:"zookeeper"`
DcsWaitTimeout time.Duration `config:"dcs_wait_timeout" yaml:"dcs_wait_timeout"`
DBTimeout time.Duration `config:"db_timeout" yaml:"db_timeout"`
DBLostCheckTimeout time.Duration `config:"db_lost_check_timeout" yaml:"db_lost_check_timeout"`
DBSetRoTimeout time.Duration `config:"db_set_ro_timeout" yaml:"db_set_ro_timeout"`
DBSetRoForceTimeout time.Duration `config:"db_set_ro_force_timeout" yaml:"db_set_ro_force_timeout"`
DBStopSlaveSQLThreadTimeout time.Duration `config:"db_stop_slave_sql_thread_timeout" yaml:"db_stop_slave_sql_thread_timeout"`
TickInterval time.Duration `config:"tick_interval" yaml:"tick_interval"`
HealthCheckInterval time.Duration `config:"healthcheck_interval" yaml:"healthcheck_interval"`
InfoFileHandlerInterval time.Duration `config:"info_file_handler_interval" yaml:"info_file_handler_interval"`
RecoveryCheckInterval time.Duration `config:"recoverycheck_interval" yaml:"recoverycheck_interval"`
ExternalCAFileCheckInterval time.Duration `config:"external_ca_file_check_interval" yaml:"external_ca_file_check_interval"`
MaxAcceptableLag float64 `config:"max_acceptable_lag" yaml:"max_acceptable_lag"`
SlaveCatchUpTimeout time.Duration `config:"slave_catch_up_timeout" yaml:"slave_catch_up_timeout"`
DisableSemiSyncReplicationOnMaintenance bool `config:"disable_semi_sync_replication_on_maintenance" yaml:"disable_semi_sync_replication_on_maintenance"`
KeepSuperWritableOnCriticalDiskUsage bool `config:"keep_super_writable_on_critical_disk_usage" yaml:"keep_super_writable_on_critical_disk_usage"`
ExcludeUsers []string `config:"exclude_users" yaml:"exclude_users"`
OfflineModeEnableInterval time.Duration `config:"offline_mode_enable_interval" yaml:"offline_mode_enable_interval"`
OfflineModeEnableLag time.Duration `config:"offline_mode_enable_lag" yaml:"offline_mode_enable_lag"`
OfflineModeDisableLag time.Duration `config:"offline_mode_disable_lag" yaml:"offline_mode_disable_lag"`
DisableSetReadonlyOnLost bool `config:"disable_set_readonly_on_lost" yaml:"disable_set_readonly_on_lost"`
ResetupCrashedHosts bool `config:"resetup_crashed_hosts" yaml:"resetup_crashed_hosts"`
StreamFromReasonableLag time.Duration `config:"stream_from_reasonable_lag" yaml:"stream_from_reasonable_lag"`
PriorityChoiceMaxLag time.Duration `config:"priority_choice_max_lag" yaml:"priority_choice_max_lag"`
TestDiskUsageFile string `config:"test_disk_usage_file" yaml:"test_disk_usage_file"`
RplSemiSyncMasterWaitForSlaveCount int `config:"rpl_semi_sync_master_wait_for_slave_count" yaml:"rpl_semi_sync_master_wait_for_slave_count"`
WaitReplicationStartTimeout time.Duration `config:"wait_start_replication_timeout" yaml:"wait_start_replication_timeout"`
ReplicationRepairAggressiveMode bool `config:"replication_repair_aggressive_mode" yaml:"replication_repair_aggressive_mode"`
ReplicationRepairCooldown time.Duration `config:"replication_repair_cooldown" yaml:"replication_repair_cooldown"`
ReplicationRepairMaxAttempts int `config:"replication_repair_max_attempts" yaml:"replication_repair_max_attempts"`
TestFilesystemReadonlyFile string `config:"test_filesystem_readonly_file" yaml:"test_filesystem_readonly_file"`
ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"`
ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"`
DevMode bool `config:"dev_mode" yaml:"dev_mode"`
SemiSync bool `config:"semi_sync" yaml:"semi_sync"`
SemiSyncEnableLag int64 `config:"semi_sync_enable_lag" yaml:"semi_sync_enable_lag"`
Failover bool `config:"failover" yaml:"failover"`
FailoverCooldown time.Duration `config:"failover_cooldown" yaml:"failover_cooldown"`
FailoverDelay time.Duration `config:"failover_delay" yaml:"failover_delay"`
InactivationDelay time.Duration `config:"inactivation_delay" yaml:"inactivation_delay"`
CriticalDiskUsage float64 `config:"critical_disk_usage" yaml:"critical_disk_usage"`
NotCriticalDiskUsage float64 `config:"not_critical_disk_usage" yaml:"not_critical_disk_usage"`
LogLevel string `config:"loglevel"`
Log string `config:"log"`
Hostname string `config:"hostname"`
Lockfile string `config:"lockfile"`
InfoFile string `config:"info_file" yaml:"info_file"`
Emergefile string `config:"emergefile"`
Resetupfile string `config:"resetupfile"`
Maintenancefile string `config:"maintenancefile"`
MySQL MySQLConfig `config:"mysql"`
Queries map[string]string `config:"queries"`
Commands map[string]string `config:"commands"`
Zookeeper dcs.ZookeeperConfig `config:"zookeeper"`
DcsWaitTimeout time.Duration `config:"dcs_wait_timeout" yaml:"dcs_wait_timeout"`
DBTimeout time.Duration `config:"db_timeout" yaml:"db_timeout"`
DBLostCheckTimeout time.Duration `config:"db_lost_check_timeout" yaml:"db_lost_check_timeout"`
DBSetRoTimeout time.Duration `config:"db_set_ro_timeout" yaml:"db_set_ro_timeout"`
DBSetRoForceTimeout time.Duration `config:"db_set_ro_force_timeout" yaml:"db_set_ro_force_timeout"`
DBStopSlaveSQLThreadTimeout time.Duration `config:"db_stop_slave_sql_thread_timeout" yaml:"db_stop_slave_sql_thread_timeout"`
TickInterval time.Duration `config:"tick_interval" yaml:"tick_interval"`
HealthCheckInterval time.Duration `config:"healthcheck_interval" yaml:"healthcheck_interval"`
InfoFileHandlerInterval time.Duration `config:"info_file_handler_interval" yaml:"info_file_handler_interval"`
RecoveryCheckInterval time.Duration `config:"recoverycheck_interval" yaml:"recoverycheck_interval"`
ExternalCAFileCheckInterval time.Duration `config:"external_ca_file_check_interval" yaml:"external_ca_file_check_interval"`
MaxAcceptableLag float64 `config:"max_acceptable_lag" yaml:"max_acceptable_lag"`
SlaveCatchUpTimeout time.Duration `config:"slave_catch_up_timeout" yaml:"slave_catch_up_timeout"`
DisableSemiSyncReplicationOnMaintenance bool `config:"disable_semi_sync_replication_on_maintenance" yaml:"disable_semi_sync_replication_on_maintenance"`
KeepSuperWritableOnCriticalDiskUsage bool `config:"keep_super_writable_on_critical_disk_usage" yaml:"keep_super_writable_on_critical_disk_usage"`
ExcludeUsers []string `config:"exclude_users" yaml:"exclude_users"`
OfflineModeEnableInterval time.Duration `config:"offline_mode_enable_interval" yaml:"offline_mode_enable_interval"`
OfflineModeEnableLag time.Duration `config:"offline_mode_enable_lag" yaml:"offline_mode_enable_lag"`
OfflineModeDisableLag time.Duration `config:"offline_mode_disable_lag" yaml:"offline_mode_disable_lag"`
DisableSetReadonlyOnLost bool `config:"disable_set_readonly_on_lost" yaml:"disable_set_readonly_on_lost"`
ResetupCrashedHosts bool `config:"resetup_crashed_hosts" yaml:"resetup_crashed_hosts"`
StreamFromReasonableLag time.Duration `config:"stream_from_reasonable_lag" yaml:"stream_from_reasonable_lag"`
PriorityChoiceMaxLag time.Duration `config:"priority_choice_max_lag" yaml:"priority_choice_max_lag"`
TestDiskUsageFile string `config:"test_disk_usage_file" yaml:"test_disk_usage_file"`
RplSemiSyncMasterWaitForSlaveCount int `config:"rpl_semi_sync_master_wait_for_slave_count" yaml:"rpl_semi_sync_master_wait_for_slave_count"`
WaitReplicationStartTimeout time.Duration `config:"wait_start_replication_timeout" yaml:"wait_start_replication_timeout"`
ReplicationRepairAggressiveMode bool `config:"replication_repair_aggressive_mode" yaml:"replication_repair_aggressive_mode"`
ReplicationRepairCooldown time.Duration `config:"replication_repair_cooldown" yaml:"replication_repair_cooldown"`
ReplicationRepairMaxAttempts int `config:"replication_repair_max_attempts" yaml:"replication_repair_max_attempts"`
TestFilesystemReadonlyFile string `config:"test_filesystem_readonly_file" yaml:"test_filesystem_readonly_file"`
ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"`
ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"`
ExternalReplicationType util.ExternalReplicationType `config:"external_replication_type" yaml:"external_replication_type"`
}

// DefaultConfig returns default configuration for MySync
Expand Down Expand Up @@ -162,6 +163,7 @@ func DefaultConfig() (Config, error) {
TestFilesystemReadonlyFile: "", // fake readonly status, only for docker tests
ReplicationChannel: "",
ExternalReplicationChannel: "external",
ExternalReplicationType: util.Disabled,
}
return config, nil
}
Expand Down
Loading

0 comments on commit 05149a4

Please sign in to comment.