Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External replication refactoring #39

Merged
merged 3 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 34 additions & 26 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@ import (

// App is main application structure
type App struct {
state appState
logger *log.Logger
config *config.Config
dcs dcs.DCS
cluster *mysql.Cluster
filelock *flock.Flock
nodeFailedAt map[string]time.Time
streamFromFailedAt map[string]time.Time
daemonState *DaemonState
daemonMutex sync.Mutex
replRepairState map[string]*ReplicationRepairState
state appState
logger *log.Logger
config *config.Config
dcs dcs.DCS
cluster *mysql.Cluster
filelock *flock.Flock
nodeFailedAt map[string]time.Time
streamFromFailedAt map[string]time.Time
daemonState *DaemonState
daemonMutex sync.Mutex
replRepairState map[string]*ReplicationRepairState
externalReplication mysql.IExternalReplication
}

// NewApp returns new App. Suddenly.
Expand All @@ -57,13 +58,18 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
if logPath != "" {
logger.ReOpenOnSignal(syscall.SIGUSR2)
}
externalReplication, err := mysql.NewExternalReplication(config.ExternalReplicationType, logger)
if err != nil {
return nil, err
}
app := &App{
state: stateFirstRun,
config: config,
logger: logger,
nodeFailedAt: make(map[string]time.Time),
streamFromFailedAt: make(map[string]time.Time),
replRepairState: make(map[string]*ReplicationRepairState),
state: stateFirstRun,
config: config,
logger: logger,
nodeFailedAt: make(map[string]time.Time),
streamFromFailedAt: make(map[string]time.Time),
replRepairState: make(map[string]*ReplicationRepairState),
externalReplication: externalReplication,
}
return app, nil
}
Expand Down Expand Up @@ -229,7 +235,7 @@ func (app *App) externalCAFileChecker(ctx context.Context) {
select {
case <-ticker.C:
localNode := app.cluster.Local()
replicaStatus, err := localNode.GetExternalReplicaStatus()
replicaStatus, err := app.externalReplication.GetReplicaStatus(localNode)
noname0443 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
if !mysql.IsErrorChannelDoesNotExists(err) {
app.logger.Errorf("external CA file checker: host %s failed to get external replica status %v", localNode.Host(), err)
Expand Down Expand Up @@ -1145,7 +1151,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode

oldMasterNode := app.cluster.Get(oldMaster)
if clusterState[oldMaster].PingOk {
err := oldMasterNode.StopExternalReplication()
err := app.externalReplication.Stop(oldMasterNode)
if err != nil {
return fmt.Errorf("got error: %s while stopping external replication on old master: %s", err, oldMaster)
}
Expand Down Expand Up @@ -1290,7 +1296,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
}
} else {
app.logger.Infof("switchover: old master %s does not need recovery", oldMaster)
err = oldMasterNode.ResetExternalReplicationAll()
err = app.externalReplication.Reset(oldMasterNode)
if err != nil {
return fmt.Errorf("got error: %s while reseting external replication on old master: %s", err, oldMaster)
}
Expand Down Expand Up @@ -1332,7 +1338,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
}

// enable external replication
err = newMasterNode.SetExternalReplication()
err = app.externalReplication.Set(newMasterNode)
if err != nil {
app.logger.Errorf("failed to set external replication on new master")
}
Expand Down Expand Up @@ -1615,11 +1621,11 @@ func (app *App) repairSlaveNode(node *mysql.Node, clusterState map[string]*NodeS
if err != nil {
app.logger.Errorf("repair: %s", err)
}
err = node.StopExternalReplication()
err = app.externalReplication.Stop(node)
if err != nil {
app.logger.Errorf("repair: %s", err)
}
err = node.ResetExternalReplicationAll()
err = app.externalReplication.Reset(node)
if err != nil {
app.logger.Errorf("repair: %s", err)
}
Expand Down Expand Up @@ -1790,7 +1796,7 @@ func (app *App) repairCascadeNode(node *mysql.Node, clusterState map[string]*Nod
}

func (app *App) repairExternalReplication(masterNode *mysql.Node) {
extReplStatus, err := masterNode.GetExternalReplicaStatus()
extReplStatus, err := app.externalReplication.GetReplicaStatus(masterNode)
if err != nil {
if !mysql.IsErrorChannelDoesNotExists(err) {
app.logger.Errorf("repair (external): host %s failed to get external replica status %v", masterNode.Host(), err)
Expand All @@ -1802,7 +1808,7 @@ func (app *App) repairExternalReplication(masterNode *mysql.Node) {
return
}

if masterNode.IsExternalReplicationRunningByUser() && !extReplStatus.ReplicationRunning() {
if app.externalReplication.IsRunningByUser(masterNode) && !extReplStatus.ReplicationRunning() {
// TODO: remove "". Master is not needed for external replication now
app.TryRepairReplication(masterNode, "", app.config.ExternalReplicationChannel)
}
Expand Down Expand Up @@ -2252,7 +2258,9 @@ func (app *App) Run() int {
go app.healthChecker(ctx)
go app.recoveryChecker(ctx)
go app.stateFileHandler(ctx)
go app.externalCAFileChecker(ctx)
if app.config.ExternalReplicationType != util.Disabled {
go app.externalCAFileChecker(ctx)
}

handlers := map[appState](func() appState){
stateFirstRun: app.stateFirstRun,
Expand Down
2 changes: 1 addition & 1 deletion internal/app/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (app *App) makeReplStateKey(node *mysql.Node, channel string) string {
func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string, channel string) error {
app.logger.Infof("repair: trying to repair replication using StartSlaveAlgorithm...")
if channel == app.config.ExternalReplicationChannel {
return node.StartExternalReplication()
return app.externalReplication.Start(node)
}
return node.StartSlave()
}
Expand Down
108 changes: 55 additions & 53 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,59 +34,60 @@ type MySQLConfig struct {

// Config contains all mysync configuration
type Config struct {
DevMode bool `config:"dev_mode" yaml:"dev_mode"`
SemiSync bool `config:"semi_sync" yaml:"semi_sync"`
SemiSyncEnableLag int64 `config:"semi_sync_enable_lag" yaml:"semi_sync_enable_lag"`
Failover bool `config:"failover" yaml:"failover"`
FailoverCooldown time.Duration `config:"failover_cooldown" yaml:"failover_cooldown"`
FailoverDelay time.Duration `config:"failover_delay" yaml:"failover_delay"`
InactivationDelay time.Duration `config:"inactivation_delay" yaml:"inactivation_delay"`
CriticalDiskUsage float64 `config:"critical_disk_usage" yaml:"critical_disk_usage"`
NotCriticalDiskUsage float64 `config:"not_critical_disk_usage" yaml:"not_critical_disk_usage"`
LogLevel string `config:"loglevel"`
Log string `config:"log"`
Hostname string `config:"hostname"`
Lockfile string `config:"lockfile"`
InfoFile string `config:"info_file" yaml:"info_file"`
Emergefile string `config:"emergefile"`
Resetupfile string `config:"resetupfile"`
Maintenancefile string `config:"maintenancefile"`
MySQL MySQLConfig `config:"mysql"`
Queries map[string]string `config:"queries"`
Commands map[string]string `config:"commands"`
Zookeeper dcs.ZookeeperConfig `config:"zookeeper"`
DcsWaitTimeout time.Duration `config:"dcs_wait_timeout" yaml:"dcs_wait_timeout"`
DBTimeout time.Duration `config:"db_timeout" yaml:"db_timeout"`
DBLostCheckTimeout time.Duration `config:"db_lost_check_timeout" yaml:"db_lost_check_timeout"`
DBSetRoTimeout time.Duration `config:"db_set_ro_timeout" yaml:"db_set_ro_timeout"`
DBSetRoForceTimeout time.Duration `config:"db_set_ro_force_timeout" yaml:"db_set_ro_force_timeout"`
DBStopSlaveSQLThreadTimeout time.Duration `config:"db_stop_slave_sql_thread_timeout" yaml:"db_stop_slave_sql_thread_timeout"`
TickInterval time.Duration `config:"tick_interval" yaml:"tick_interval"`
HealthCheckInterval time.Duration `config:"healthcheck_interval" yaml:"healthcheck_interval"`
InfoFileHandlerInterval time.Duration `config:"info_file_handler_interval" yaml:"info_file_handler_interval"`
RecoveryCheckInterval time.Duration `config:"recoverycheck_interval" yaml:"recoverycheck_interval"`
ExternalCAFileCheckInterval time.Duration `config:"external_ca_file_check_interval" yaml:"external_ca_file_check_interval"`
MaxAcceptableLag float64 `config:"max_acceptable_lag" yaml:"max_acceptable_lag"`
SlaveCatchUpTimeout time.Duration `config:"slave_catch_up_timeout" yaml:"slave_catch_up_timeout"`
DisableSemiSyncReplicationOnMaintenance bool `config:"disable_semi_sync_replication_on_maintenance" yaml:"disable_semi_sync_replication_on_maintenance"`
KeepSuperWritableOnCriticalDiskUsage bool `config:"keep_super_writable_on_critical_disk_usage" yaml:"keep_super_writable_on_critical_disk_usage"`
ExcludeUsers []string `config:"exclude_users" yaml:"exclude_users"`
OfflineModeEnableInterval time.Duration `config:"offline_mode_enable_interval" yaml:"offline_mode_enable_interval"`
OfflineModeEnableLag time.Duration `config:"offline_mode_enable_lag" yaml:"offline_mode_enable_lag"`
OfflineModeDisableLag time.Duration `config:"offline_mode_disable_lag" yaml:"offline_mode_disable_lag"`
DisableSetReadonlyOnLost bool `config:"disable_set_readonly_on_lost" yaml:"disable_set_readonly_on_lost"`
ResetupCrashedHosts bool `config:"resetup_crashed_hosts" yaml:"resetup_crashed_hosts"`
StreamFromReasonableLag time.Duration `config:"stream_from_reasonable_lag" yaml:"stream_from_reasonable_lag"`
PriorityChoiceMaxLag time.Duration `config:"priority_choice_max_lag" yaml:"priority_choice_max_lag"`
TestDiskUsageFile string `config:"test_disk_usage_file" yaml:"test_disk_usage_file"`
RplSemiSyncMasterWaitForSlaveCount int `config:"rpl_semi_sync_master_wait_for_slave_count" yaml:"rpl_semi_sync_master_wait_for_slave_count"`
WaitReplicationStartTimeout time.Duration `config:"wait_start_replication_timeout" yaml:"wait_start_replication_timeout"`
ReplicationRepairAggressiveMode bool `config:"replication_repair_aggressive_mode" yaml:"replication_repair_aggressive_mode"`
ReplicationRepairCooldown time.Duration `config:"replication_repair_cooldown" yaml:"replication_repair_cooldown"`
ReplicationRepairMaxAttempts int `config:"replication_repair_max_attempts" yaml:"replication_repair_max_attempts"`
TestFilesystemReadonlyFile string `config:"test_filesystem_readonly_file" yaml:"test_filesystem_readonly_file"`
ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"`
ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"`
DevMode bool `config:"dev_mode" yaml:"dev_mode"`
SemiSync bool `config:"semi_sync" yaml:"semi_sync"`
SemiSyncEnableLag int64 `config:"semi_sync_enable_lag" yaml:"semi_sync_enable_lag"`
Failover bool `config:"failover" yaml:"failover"`
FailoverCooldown time.Duration `config:"failover_cooldown" yaml:"failover_cooldown"`
FailoverDelay time.Duration `config:"failover_delay" yaml:"failover_delay"`
InactivationDelay time.Duration `config:"inactivation_delay" yaml:"inactivation_delay"`
CriticalDiskUsage float64 `config:"critical_disk_usage" yaml:"critical_disk_usage"`
NotCriticalDiskUsage float64 `config:"not_critical_disk_usage" yaml:"not_critical_disk_usage"`
LogLevel string `config:"loglevel"`
Log string `config:"log"`
Hostname string `config:"hostname"`
Lockfile string `config:"lockfile"`
InfoFile string `config:"info_file" yaml:"info_file"`
Emergefile string `config:"emergefile"`
Resetupfile string `config:"resetupfile"`
Maintenancefile string `config:"maintenancefile"`
MySQL MySQLConfig `config:"mysql"`
Queries map[string]string `config:"queries"`
Commands map[string]string `config:"commands"`
Zookeeper dcs.ZookeeperConfig `config:"zookeeper"`
DcsWaitTimeout time.Duration `config:"dcs_wait_timeout" yaml:"dcs_wait_timeout"`
DBTimeout time.Duration `config:"db_timeout" yaml:"db_timeout"`
DBLostCheckTimeout time.Duration `config:"db_lost_check_timeout" yaml:"db_lost_check_timeout"`
DBSetRoTimeout time.Duration `config:"db_set_ro_timeout" yaml:"db_set_ro_timeout"`
DBSetRoForceTimeout time.Duration `config:"db_set_ro_force_timeout" yaml:"db_set_ro_force_timeout"`
DBStopSlaveSQLThreadTimeout time.Duration `config:"db_stop_slave_sql_thread_timeout" yaml:"db_stop_slave_sql_thread_timeout"`
TickInterval time.Duration `config:"tick_interval" yaml:"tick_interval"`
HealthCheckInterval time.Duration `config:"healthcheck_interval" yaml:"healthcheck_interval"`
InfoFileHandlerInterval time.Duration `config:"info_file_handler_interval" yaml:"info_file_handler_interval"`
RecoveryCheckInterval time.Duration `config:"recoverycheck_interval" yaml:"recoverycheck_interval"`
ExternalCAFileCheckInterval time.Duration `config:"external_ca_file_check_interval" yaml:"external_ca_file_check_interval"`
MaxAcceptableLag float64 `config:"max_acceptable_lag" yaml:"max_acceptable_lag"`
SlaveCatchUpTimeout time.Duration `config:"slave_catch_up_timeout" yaml:"slave_catch_up_timeout"`
DisableSemiSyncReplicationOnMaintenance bool `config:"disable_semi_sync_replication_on_maintenance" yaml:"disable_semi_sync_replication_on_maintenance"`
KeepSuperWritableOnCriticalDiskUsage bool `config:"keep_super_writable_on_critical_disk_usage" yaml:"keep_super_writable_on_critical_disk_usage"`
ExcludeUsers []string `config:"exclude_users" yaml:"exclude_users"`
OfflineModeEnableInterval time.Duration `config:"offline_mode_enable_interval" yaml:"offline_mode_enable_interval"`
OfflineModeEnableLag time.Duration `config:"offline_mode_enable_lag" yaml:"offline_mode_enable_lag"`
OfflineModeDisableLag time.Duration `config:"offline_mode_disable_lag" yaml:"offline_mode_disable_lag"`
DisableSetReadonlyOnLost bool `config:"disable_set_readonly_on_lost" yaml:"disable_set_readonly_on_lost"`
ResetupCrashedHosts bool `config:"resetup_crashed_hosts" yaml:"resetup_crashed_hosts"`
StreamFromReasonableLag time.Duration `config:"stream_from_reasonable_lag" yaml:"stream_from_reasonable_lag"`
PriorityChoiceMaxLag time.Duration `config:"priority_choice_max_lag" yaml:"priority_choice_max_lag"`
TestDiskUsageFile string `config:"test_disk_usage_file" yaml:"test_disk_usage_file"`
RplSemiSyncMasterWaitForSlaveCount int `config:"rpl_semi_sync_master_wait_for_slave_count" yaml:"rpl_semi_sync_master_wait_for_slave_count"`
WaitReplicationStartTimeout time.Duration `config:"wait_start_replication_timeout" yaml:"wait_start_replication_timeout"`
ReplicationRepairAggressiveMode bool `config:"replication_repair_aggressive_mode" yaml:"replication_repair_aggressive_mode"`
ReplicationRepairCooldown time.Duration `config:"replication_repair_cooldown" yaml:"replication_repair_cooldown"`
ReplicationRepairMaxAttempts int `config:"replication_repair_max_attempts" yaml:"replication_repair_max_attempts"`
TestFilesystemReadonlyFile string `config:"test_filesystem_readonly_file" yaml:"test_filesystem_readonly_file"`
ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"`
ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"`
ExternalReplicationType util.ExternalReplicationType `config:"external_replication_type" yaml:"external_replication_type"`
}

// DefaultConfig returns default configuration for MySync
Expand Down Expand Up @@ -162,6 +163,7 @@ func DefaultConfig() (Config, error) {
TestFilesystemReadonlyFile: "", // fake readonly status, only for docker tests
ReplicationChannel: "",
ExternalReplicationChannel: "external",
ExternalReplicationType: util.Disabled,
}
return config, nil
}
Expand Down
Loading