diff --git a/internal/mysql/node.go b/internal/mysql/node.go index ed1966ad..4dcdd053 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -29,7 +29,7 @@ import ( // Node represents API to query/manipulate single MySQL node type Node struct { - Config *config.Config + config *config.Config logger *log.Logger host string db *sqlx.DB @@ -65,7 +65,7 @@ func NewNode(config *config.Config, logger *log.Logger, host string) (*Node, err db.SetMaxOpenConns(3) db.SetConnMaxLifetime(3 * config.TickInterval) return &Node{ - Config: config, + config: config, logger: logger, db: db, host: host, @@ -97,7 +97,7 @@ func (n *Node) Host() string { // IsLocal returns true if MySQL Node running on the same host as calling mysync process func (n *Node) IsLocal() bool { - return n.host == n.Config.Hostname + return n.host == n.config.Hostname } func (n *Node) String() string { @@ -110,7 +110,7 @@ func (n *Node) Close() error { } func (n *Node) getCommand(name string) string { - command, ok := n.Config.Commands[name] + command, ok := n.config.Commands[name] if !ok { command, ok = defaultCommands[name] } @@ -134,7 +134,7 @@ func (n *Node) runCommand(name string) (int, error) { } func (n *Node) getQuery(name string) string { - query, ok := n.Config.Queries[name] + query, ok := n.config.Queries[name] if !ok { query, ok = DefaultQueries[name] } @@ -147,14 +147,14 @@ func (n *Node) getQuery(name string) string { func (n *Node) traceQuery(query string, arg interface{}, result interface{}, err error) { query = queryOnliner.ReplaceAllString(query, " ") msg := fmt.Sprintf("node %s running query '%s' with args %#v, result: %#v, error: %v", n.host, query, arg, result, err) - msg = strings.Replace(msg, n.Config.MySQL.Password, "********", -1) - msg = strings.Replace(msg, n.Config.MySQL.ReplicationPassword, "********", -1) + msg = strings.Replace(msg, n.config.MySQL.Password, "********", -1) + msg = strings.Replace(msg, n.config.MySQL.ReplicationPassword, "********", -1) n.logger.Debug(msg) } //nolint:unparam func (n *Node) QueryRow(queryName string, arg interface{}, result interface{}) error { - return n.queryRowWithTimeout(queryName, arg, result, n.Config.DBTimeout) + return n.queryRowWithTimeout(queryName, arg, result, n.config.DBTimeout) } func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result interface{}, timeout time.Duration) error { @@ -186,7 +186,7 @@ func (n *Node) QueryRows(queryName string, arg interface{}, scanner func(*sqlx.R arg = struct{}{} } query := n.getQuery(queryName) - ctx, cancel := context.WithTimeout(context.Background(), n.Config.DBTimeout) + ctx, cancel := context.WithTimeout(context.Background(), n.config.DBTimeout) defer cancel() rows, err := n.db.NamedQueryContext(ctx, query, arg) n.traceQuery(query, arg, rows, err) @@ -225,7 +225,7 @@ func (n *Node) execWithTimeout(queryName string, arg map[string]interface{}, tim // nolint: unparam func (n *Node) Exec(queryName string, arg map[string]interface{}) error { - return n.execWithTimeout(queryName, arg, n.Config.DBTimeout) + return n.execWithTimeout(queryName, arg, n.config.DBTimeout) } func (n *Node) getRunningQueryIds(excludeUsers []string, timeout time.Duration) ([]int, error) { @@ -304,7 +304,7 @@ func (n *Node) execMogrifyWithTimeout(queryName string, arg map[string]interface } func (n *Node) ExecMogrify(queryName string, arg map[string]interface{}) error { - return n.execMogrifyWithTimeout(queryName, arg, n.Config.DBTimeout) + return n.execMogrifyWithTimeout(queryName, arg, n.config.DBTimeout) } func (n *Node) queryRowMogrifyWithTimeout(queryName string, arg map[string]interface{}, result interface{}, timeout time.Duration) error { @@ -352,15 +352,15 @@ func (n *Node) getTestDiskUsage(f string) (used uint64, total uint64, err error) // GetDiskUsage returns datadir usage statistics func (n *Node) GetDiskUsage() (used uint64, total uint64, err error) { - if n.Config.TestDiskUsageFile != "" { - return n.getTestDiskUsage(n.Config.TestDiskUsageFile) + if n.config.TestDiskUsageFile != "" { + return n.getTestDiskUsage(n.config.TestDiskUsageFile) } if !n.IsLocal() { err = ErrNotLocalNode return } var stat syscall.Statfs_t - err = syscall.Statfs(n.Config.MySQL.DataDir, &stat) + err = syscall.Statfs(n.config.MySQL.DataDir, &stat) total = uint64(stat.Bsize) * stat.Blocks // on FreeBSD stat.Bavail may be negative bavail := stat.Bavail @@ -406,8 +406,8 @@ func getFlagsFromProcMounts(file, filesystem string) (string, error) { } func (n *Node) IsFileSystemReadonly() (bool, error) { - if n.Config.TestFilesystemReadonlyFile != "" { - return n.isTestFileSystemReadonly(n.Config.TestFilesystemReadonlyFile) + if n.config.TestFilesystemReadonlyFile != "" { + return n.isTestFileSystemReadonly(n.config.TestFilesystemReadonlyFile) } if !n.IsLocal() { return false, ErrNotLocalNode @@ -419,7 +419,7 @@ func (n *Node) IsFileSystemReadonly() (bool, error) { } file := string(data) - flag, err := getFlagsFromProcMounts(file, n.Config.MySQL.DataDir) + flag, err := getFlagsFromProcMounts(file, n.config.MySQL.DataDir) if err != nil { return false, err } @@ -435,7 +435,7 @@ func (n *Node) GetDaemonStartTime() (time.Time, error) { if !n.IsLocal() { return time.Time{}, ErrNotLocalNode } - pidB, err := os.ReadFile(n.Config.MySQL.PidFile) + pidB, err := os.ReadFile(n.config.MySQL.PidFile) if err != nil { if os.IsNotExist(err) { err = nil @@ -461,7 +461,7 @@ func (n *Node) GetCrashRecoveryTime() (time.Time, error) { if !n.IsLocal() { return time.Time{}, ErrNotLocalNode } - fh, err := os.Open(n.Config.MySQL.ErrorLog) + fh, err := os.Open(n.config.MySQL.ErrorLog) if err != nil { return time.Time{}, err } @@ -594,7 +594,7 @@ func (n *Node) IsReadOnly() (bool, bool, error) { // Setting server read-only may take a while // as server waits all running commits (not transactions) to be finished func (n *Node) SetReadOnly(superReadOnly bool) error { - return n.setReadonlyWithTimeout(superReadOnly, n.Config.DBSetRoTimeout) + return n.setReadonlyWithTimeout(superReadOnly, n.config.DBSetRoTimeout) } func (n *Node) setReadonlyWithTimeout(superReadOnly bool, timeout time.Duration) error { @@ -661,7 +661,7 @@ func (n *Node) SetReadOnlyWithForce(excludeUsers []string, superReadOnly bool) e defer func() { quit <- true }() - return n.setReadonlyWithTimeout(superReadOnly, n.Config.DBSetRoForceTimeout) + return n.setReadonlyWithTimeout(superReadOnly, n.config.DBSetRoForceTimeout) } // SetWritable sets MySQL Node to be writable, eg. disables read-only @@ -672,28 +672,28 @@ func (n *Node) SetWritable() error { // StopSlave stops replication (both IO and SQL threads) func (n *Node) StopSlave() error { return n.execMogrifyWithTimeout(queryStopSlave, map[string]interface{}{ - "channel": n.Config.ReplicationChannel, - }, n.Config.DBStopSlaveSQLThreadTimeout) + "channel": n.config.ReplicationChannel, + }, n.config.DBStopSlaveSQLThreadTimeout) } // StartSlave starts replication (both IO and SQL threads) func (n *Node) StartSlave() error { return n.ExecMogrify(queryStartSlave, map[string]interface{}{ - "channel": n.Config.ReplicationChannel, + "channel": n.config.ReplicationChannel, }) } // StopSlaveIOThread stops IO replication thread func (n *Node) StopSlaveIOThread() error { return n.ExecMogrify(queryStopSlaveIOThread, map[string]interface{}{ - "channel": n.Config.ReplicationChannel, + "channel": n.config.ReplicationChannel, }) } // StartSlaveIOThread starts IO replication thread func (n *Node) StartSlaveIOThread() error { return n.ExecMogrify(queryStartSlaveIOThread, map[string]interface{}{ - "channel": n.Config.ReplicationChannel, + "channel": n.config.ReplicationChannel, }) } @@ -709,21 +709,21 @@ func (n *Node) RestartSlaveIOThread() error { // StopSlaveSQLThread stops SQL replication thread func (n *Node) StopSlaveSQLThread() error { return n.execMogrifyWithTimeout(queryStopSlaveSQLThread, map[string]interface{}{ - "channel": n.Config.ReplicationChannel, - }, n.Config.DBStopSlaveSQLThreadTimeout) + "channel": n.config.ReplicationChannel, + }, n.config.DBStopSlaveSQLThreadTimeout) } // StartSlaveSQLThread starts SQL replication thread func (n *Node) StartSlaveSQLThread() error { return n.ExecMogrify(queryStartSlaveSQLThread, map[string]interface{}{ - "channel": n.Config.ReplicationChannel, + "channel": n.config.ReplicationChannel, }) } // ResetSlaveAll promotes MySQL Node to be master func (n *Node) ResetSlaveAll() error { return n.ExecMogrify(queryResetSlaveAll, map[string]interface{}{ - "channel": n.Config.ReplicationChannel, + "channel": n.config.ReplicationChannel, }) } @@ -784,20 +784,20 @@ func (n *Node) SetOnline() error { // ChangeMaster changes master of MySQL Node, demoting it to slave func (n *Node) ChangeMaster(host string) error { useSsl := 0 - if n.Config.MySQL.ReplicationSslCA != "" { + if n.config.MySQL.ReplicationSslCA != "" { useSsl = 1 } return n.ExecMogrify(queryChangeMaster, map[string]interface{}{ "host": host, - "port": n.Config.MySQL.ReplicationPort, - "user": n.Config.MySQL.ReplicationUser, - "password": n.Config.MySQL.ReplicationPassword, + "port": n.config.MySQL.ReplicationPort, + "user": n.config.MySQL.ReplicationUser, + "password": n.config.MySQL.ReplicationPassword, "ssl": useSsl, - "sslCa": n.Config.MySQL.ReplicationSslCA, - "retryCount": n.Config.MySQL.ReplicationRetryCount, - "connectRetry": n.Config.MySQL.ReplicationConnectRetry, - "heartbeatPeriod": n.Config.MySQL.ReplicationHeartbeatPeriod, - "channel": n.Config.ReplicationChannel, + "sslCa": n.config.MySQL.ReplicationSslCA, + "retryCount": n.config.MySQL.ReplicationRetryCount, + "connectRetry": n.config.MySQL.ReplicationConnectRetry, + "heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod, + "channel": n.config.ReplicationChannel, }) } @@ -878,7 +878,7 @@ func (n *Node) GetStartupTime() (time.Time, error) { // GetReplicaStatus returns slave/replica status or nil if node is master func (n *Node) GetReplicaStatus() (ReplicaStatus, error) { - return n.ReplicaStatusWithTimeout(n.Config.DBTimeout, n.Config.ReplicationChannel) + return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ReplicationChannel) } func (n *Node) UpdateExternalCAFile() error { @@ -888,7 +888,7 @@ func (n *Node) UpdateExternalCAFile() error { return nil } data := replSettings.SourceSslCa - fileName := n.Config.MySQL.ExternalReplicationSslCA + fileName := n.config.MySQL.ExternalReplicationSslCA if data != "" && fileName != "" { err = util.TouchFile(fileName) if err != nil { diff --git a/internal/mysql/replication.go b/internal/mysql/replication.go index 52185a0c..c7fe48a7 100644 --- a/internal/mysql/replication.go +++ b/internal/mysql/replication.go @@ -52,8 +52,9 @@ func NewExternalReplication(replicationType util.ExternalReplicationType, logger return &ExternalReplication{ logger: logger, }, nil + default: + return &UnimplementedExternalReplication{}, nil } - return &UnimplementedExternalReplication{}, nil } // GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel @@ -66,7 +67,7 @@ func (er *ExternalReplication) GetReplicaStatus(n *Node) (ReplicaStatus, error) return nil, nil } - return n.ReplicaStatusWithTimeout(n.Config.DBTimeout, n.Config.ExternalReplicationChannel) + return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel) } // StartExternalReplication starts external replication @@ -77,7 +78,7 @@ func (er *ExternalReplication) Start(n *Node) error { } if checked { err := n.ExecMogrify(queryStartReplica, map[string]interface{}{ - "channel": n.Config.ExternalReplicationChannel, + "channel": n.config.ExternalReplicationChannel, }) if err != nil { return err @@ -94,7 +95,7 @@ func (er *ExternalReplication) Stop(n *Node) error { } if checked { err := n.ExecMogrify(queryStopReplica, map[string]interface{}{ - "channel": n.Config.ExternalReplicationChannel, + "channel": n.config.ExternalReplicationChannel, }) if err != nil && !IsErrorChannelDoesNotExists(err) { return err @@ -111,7 +112,7 @@ func (er *ExternalReplication) Reset(n *Node) error { } if checked { err := n.ExecMogrify(queryResetReplicaAll, map[string]interface{}{ - "channel": n.Config.ExternalReplicationChannel, + "channel": n.config.ExternalReplicationChannel, }) if err != nil && !IsErrorChannelDoesNotExists(err) { return err @@ -145,9 +146,9 @@ func (er *ExternalReplication) Set(n *Node) error { } useSsl := 0 sslCa := "" - if replSettings.SourceSslCa != "" && n.Config.MySQL.ExternalReplicationSslCA != "" { + if replSettings.SourceSslCa != "" && n.config.MySQL.ExternalReplicationSslCA != "" { useSsl = 1 - sslCa = n.Config.MySQL.ExternalReplicationSslCA + sslCa = n.config.MySQL.ExternalReplicationSslCA } err = er.Stop(n) if err != nil { @@ -165,9 +166,9 @@ func (er *ExternalReplication) Set(n *Node) error { "ssl": useSsl, "sslCa": sslCa, "sourceDelay": replSettings.SourceDelay, - "retryCount": n.Config.MySQL.ReplicationRetryCount, - "connectRetry": n.Config.MySQL.ReplicationConnectRetry, - "heartbeatPeriod": n.Config.MySQL.ReplicationHeartbeatPeriod, + "retryCount": n.config.MySQL.ReplicationRetryCount, + "connectRetry": n.config.MySQL.ReplicationConnectRetry, + "heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod, "channel": "external", }) if err != nil {