diff --git a/internal/mysql/external_replication.go b/internal/mysql/external_replication.go index cf8feb90..796b8e81 100644 --- a/internal/mysql/external_replication.go +++ b/internal/mysql/external_replication.go @@ -2,10 +2,12 @@ package mysql import ( "database/sql" + "time" ) type IExternalReplication interface { StartExternalReplication() error + ReplicaStatusWithTimeout(time.Duration, string) (ReplicaStatus, error) StopExternalReplication() error GetExternalReplicaStatus() (ReplicaStatus, error) SetExternalReplication() error @@ -17,6 +19,10 @@ type DummyExternalReplication struct { isExternalReplicationSupported bool } +func ReplicaStatusWithTimeout(time.Duration, string) (ReplicaStatus, error) { + return nil, nil +} + func (d *DummyExternalReplication) StartExternalReplication() error { return nil } @@ -79,6 +85,28 @@ func (n *ExternalReplication) StopExternalReplication() error { return nil } +func (n *ExternalReplication) GetVersionSlaveStatusQuery() (string, ReplicaStatus, error) { + version, err := n.GetVersion() + if err != nil { + return "", nil, err + } + return version.GetSlaveStatusQuery(), version.GetSlaveOrReplicaStruct(), nil +} + +func (n *ExternalReplication) ReplicaStatusWithTimeout(timeout time.Duration, channel string) (ReplicaStatus, error) { + query, status, err := n.GetVersionSlaveStatusQuery() + if err != nil { + return nil, err + } + err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{ + "channel": channel, + }, status, timeout) + if err == sql.ErrNoRows { + return nil, nil + } + return status, err +} + // GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel func (n *ExternalReplication) GetExternalReplicaStatus() (ReplicaStatus, error) { checked, err := n.IsExternalReplicationSupported() @@ -88,14 +116,8 @@ func (n *ExternalReplication) GetExternalReplicaStatus() (ReplicaStatus, error) if !(checked) { return nil, nil } - status := new(ReplicaStatusStruct) - err = n.queryRowMogrifyWithTimeout(queryReplicaStatus, map[string]interface{}{ - "channel": n.config.ExternalReplicationChannel, - }, status, n.config.DBTimeout) - if err == sql.ErrNoRows { - return nil, nil - } - return status, err + + return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel) } func (n *ExternalReplication) SetExternalReplication() error { diff --git a/internal/mysql/node.go b/internal/mysql/node.go index e3f66d5a..c42308b1 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -298,41 +298,6 @@ func (n *Node) GetReplicaStatus() (ReplicaStatus, error) { return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ReplicationChannel) } -// GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel -func (n *Node) GetExternalReplicaStatus() (ReplicaStatus, error) { - checked, err := n.IsExternalReplicationSupported() - if err != nil { - return nil, err - } - if !(checked) { - return nil, nil - } - - return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel) -} - -func (n *Node) ReplicaStatusWithTimeout(timeout time.Duration, channel string) (ReplicaStatus, error) { - query, status, err := n.GetVersionSlaveStatusQuery() - if err != nil { - return nil, err - } - err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{ - "channel": channel, - }, status, timeout) - if err == sql.ErrNoRows { - return nil, nil - } - return status, err -} - -func (n *Node) GetVersionSlaveStatusQuery() (string, ReplicaStatus, error) { - version, err := n.GetVersion() - if err != nil { - return "", nil, err - } - return version.GetSlaveStatusQuery(), version.GetSlaveOrReplicaStruct(), nil -} - // ReplicationLag returns slave replication lag in seconds // ReplicationLag may return nil without error if lag is unknown (replication not running) func (n *Node) ReplicationLag(sstatus ReplicaStatus) (*float64, error) { @@ -686,61 +651,6 @@ func (n *Node) GetStartupTime() (time.Time, error) { return time.Unix(int64(startupTime.LastStartup), 0), nil } -func (n *Node) SetExternalReplication() error { - var replSettings replicationSettings - err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings) - if err != nil { - // If no table in scheme then we consider external replication not existing so we do nothing - if IsErrorTableDoesNotExists(err) { - return nil - } - // If there is no rows in table for external replication - do nothing - if err == sql.ErrNoRows { - n.logger.Infof("no external replication records found in replication table on host %s", n.host) - return nil - } - return err - } - useSsl := 0 - sslCa := "" - if replSettings.SourceSslCa != "" && n.config.MySQL.ExternalReplicationSslCA != "" { - useSsl = 1 - sslCa = n.config.MySQL.ExternalReplicationSslCA - } - err = n.StopExternalReplication() - if err != nil { - return err - } - err = n.ResetExternalReplicationAll() - if err != nil { - return err - } - err = n.execMogrify(queryChangeSource, map[string]interface{}{ - "host": replSettings.SourceHost, - "port": replSettings.SourcePort, - "user": replSettings.SourceUser, - "password": replSettings.SourcePassword, - "ssl": useSsl, - "sslCa": sslCa, - "sourceDelay": replSettings.SourceDelay, - "retryCount": n.config.MySQL.ReplicationRetryCount, - "connectRetry": n.config.MySQL.ReplicationConnectRetry, - "heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod, - "channel": "external", - }) - if err != nil { - return err - } - err = n.execMogrify(queryIgnoreDB, map[string]interface{}{ - "ignoreList": schemaname("mysql"), - "channel": "external", - }) - if err != nil { - return err - } - return n.StartExternalReplication() -} - func (n *Node) UpdateExternalCAFile() error { var replSettings replicationSettings err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings)