Skip to content

Commit

Permalink
Added ReplicaStatusWithTimeout method to interface
Browse files Browse the repository at this point in the history
  • Loading branch information
noname0443 committed Sep 13, 2023
1 parent 37d03ac commit e5f774a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 98 deletions.
38 changes: 30 additions & 8 deletions internal/mysql/external_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package mysql

import (
"database/sql"
"time"
)

type IExternalReplication interface {
StartExternalReplication() error
ReplicaStatusWithTimeout(time.Duration, string) (ReplicaStatus, error)
StopExternalReplication() error
GetExternalReplicaStatus() (ReplicaStatus, error)
SetExternalReplication() error
Expand All @@ -17,6 +19,10 @@ type DummyExternalReplication struct {
isExternalReplicationSupported bool
}

func ReplicaStatusWithTimeout(time.Duration, string) (ReplicaStatus, error) {
return nil, nil
}

func (d *DummyExternalReplication) StartExternalReplication() error {
return nil
}
Expand Down Expand Up @@ -79,6 +85,28 @@ func (n *ExternalReplication) StopExternalReplication() error {
return nil
}

func (n *ExternalReplication) GetVersionSlaveStatusQuery() (string, ReplicaStatus, error) {
version, err := n.GetVersion()
if err != nil {
return "", nil, err
}
return version.GetSlaveStatusQuery(), version.GetSlaveOrReplicaStruct(), nil
}

func (n *ExternalReplication) ReplicaStatusWithTimeout(timeout time.Duration, channel string) (ReplicaStatus, error) {
query, status, err := n.GetVersionSlaveStatusQuery()
if err != nil {
return nil, err
}
err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{
"channel": channel,
}, status, timeout)
if err == sql.ErrNoRows {
return nil, nil
}
return status, err
}

// GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel
func (n *ExternalReplication) GetExternalReplicaStatus() (ReplicaStatus, error) {
checked, err := n.IsExternalReplicationSupported()
Expand All @@ -88,14 +116,8 @@ func (n *ExternalReplication) GetExternalReplicaStatus() (ReplicaStatus, error)
if !(checked) {
return nil, nil
}
status := new(ReplicaStatusStruct)
err = n.queryRowMogrifyWithTimeout(queryReplicaStatus, map[string]interface{}{
"channel": n.config.ExternalReplicationChannel,
}, status, n.config.DBTimeout)
if err == sql.ErrNoRows {
return nil, nil
}
return status, err

return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel)
}

func (n *ExternalReplication) SetExternalReplication() error {
Expand Down
90 changes: 0 additions & 90 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,41 +298,6 @@ func (n *Node) GetReplicaStatus() (ReplicaStatus, error) {
return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ReplicationChannel)
}

// GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel
func (n *Node) GetExternalReplicaStatus() (ReplicaStatus, error) {
checked, err := n.IsExternalReplicationSupported()
if err != nil {
return nil, err
}
if !(checked) {
return nil, nil
}

return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel)
}

func (n *Node) ReplicaStatusWithTimeout(timeout time.Duration, channel string) (ReplicaStatus, error) {
query, status, err := n.GetVersionSlaveStatusQuery()
if err != nil {
return nil, err
}
err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{
"channel": channel,
}, status, timeout)
if err == sql.ErrNoRows {
return nil, nil
}
return status, err
}

func (n *Node) GetVersionSlaveStatusQuery() (string, ReplicaStatus, error) {
version, err := n.GetVersion()
if err != nil {
return "", nil, err
}
return version.GetSlaveStatusQuery(), version.GetSlaveOrReplicaStruct(), nil
}

// ReplicationLag returns slave replication lag in seconds
// ReplicationLag may return nil without error if lag is unknown (replication not running)
func (n *Node) ReplicationLag(sstatus ReplicaStatus) (*float64, error) {
Expand Down Expand Up @@ -686,61 +651,6 @@ func (n *Node) GetStartupTime() (time.Time, error) {
return time.Unix(int64(startupTime.LastStartup), 0), nil
}

func (n *Node) SetExternalReplication() error {
var replSettings replicationSettings
err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings)
if err != nil {
// If no table in scheme then we consider external replication not existing so we do nothing
if IsErrorTableDoesNotExists(err) {
return nil
}
// If there is no rows in table for external replication - do nothing
if err == sql.ErrNoRows {
n.logger.Infof("no external replication records found in replication table on host %s", n.host)
return nil
}
return err
}
useSsl := 0
sslCa := ""
if replSettings.SourceSslCa != "" && n.config.MySQL.ExternalReplicationSslCA != "" {
useSsl = 1
sslCa = n.config.MySQL.ExternalReplicationSslCA
}
err = n.StopExternalReplication()
if err != nil {
return err
}
err = n.ResetExternalReplicationAll()
if err != nil {
return err
}
err = n.execMogrify(queryChangeSource, map[string]interface{}{
"host": replSettings.SourceHost,
"port": replSettings.SourcePort,
"user": replSettings.SourceUser,
"password": replSettings.SourcePassword,
"ssl": useSsl,
"sslCa": sslCa,
"sourceDelay": replSettings.SourceDelay,
"retryCount": n.config.MySQL.ReplicationRetryCount,
"connectRetry": n.config.MySQL.ReplicationConnectRetry,
"heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod,
"channel": "external",
})
if err != nil {
return err
}
err = n.execMogrify(queryIgnoreDB, map[string]interface{}{
"ignoreList": schemaname("mysql"),
"channel": "external",
})
if err != nil {
return err
}
return n.StartExternalReplication()
}

func (n *Node) UpdateExternalCAFile() error {
var replSettings replicationSettings
err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings)
Expand Down

0 comments on commit e5f774a

Please sign in to comment.