Skip to content

Commit

Permalink
Config 'privated'
Browse files Browse the repository at this point in the history
  • Loading branch information
noname0443 committed Sep 18, 2023
1 parent 2d846c1 commit 67d8654
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 51 deletions.
82 changes: 41 additions & 41 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

// Node represents API to query/manipulate single MySQL node
type Node struct {
Config *config.Config
config *config.Config
logger *log.Logger
host string
db *sqlx.DB
Expand Down Expand Up @@ -65,7 +65,7 @@ func NewNode(config *config.Config, logger *log.Logger, host string) (*Node, err
db.SetMaxOpenConns(3)
db.SetConnMaxLifetime(3 * config.TickInterval)
return &Node{
Config: config,
config: config,
logger: logger,
db: db,
host: host,
Expand Down Expand Up @@ -97,7 +97,7 @@ func (n *Node) Host() string {

// IsLocal returns true if MySQL Node running on the same host as calling mysync process
func (n *Node) IsLocal() bool {
return n.host == n.Config.Hostname
return n.host == n.config.Hostname
}

func (n *Node) String() string {
Expand All @@ -110,7 +110,7 @@ func (n *Node) Close() error {
}

func (n *Node) getCommand(name string) string {
command, ok := n.Config.Commands[name]
command, ok := n.config.Commands[name]
if !ok {
command, ok = defaultCommands[name]
}
Expand All @@ -134,7 +134,7 @@ func (n *Node) runCommand(name string) (int, error) {
}

func (n *Node) getQuery(name string) string {
query, ok := n.Config.Queries[name]
query, ok := n.config.Queries[name]
if !ok {
query, ok = DefaultQueries[name]
}
Expand All @@ -147,14 +147,14 @@ func (n *Node) getQuery(name string) string {
func (n *Node) traceQuery(query string, arg interface{}, result interface{}, err error) {
query = queryOnliner.ReplaceAllString(query, " ")
msg := fmt.Sprintf("node %s running query '%s' with args %#v, result: %#v, error: %v", n.host, query, arg, result, err)
msg = strings.Replace(msg, n.Config.MySQL.Password, "********", -1)
msg = strings.Replace(msg, n.Config.MySQL.ReplicationPassword, "********", -1)
msg = strings.Replace(msg, n.config.MySQL.Password, "********", -1)
msg = strings.Replace(msg, n.config.MySQL.ReplicationPassword, "********", -1)
n.logger.Debug(msg)
}

//nolint:unparam
func (n *Node) QueryRow(queryName string, arg interface{}, result interface{}) error {
return n.queryRowWithTimeout(queryName, arg, result, n.Config.DBTimeout)
return n.queryRowWithTimeout(queryName, arg, result, n.config.DBTimeout)
}

func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result interface{}, timeout time.Duration) error {
Expand Down Expand Up @@ -186,7 +186,7 @@ func (n *Node) QueryRows(queryName string, arg interface{}, scanner func(*sqlx.R
arg = struct{}{}
}
query := n.getQuery(queryName)
ctx, cancel := context.WithTimeout(context.Background(), n.Config.DBTimeout)
ctx, cancel := context.WithTimeout(context.Background(), n.config.DBTimeout)
defer cancel()
rows, err := n.db.NamedQueryContext(ctx, query, arg)
n.traceQuery(query, arg, rows, err)
Expand Down Expand Up @@ -225,7 +225,7 @@ func (n *Node) execWithTimeout(queryName string, arg map[string]interface{}, tim

// nolint: unparam
func (n *Node) Exec(queryName string, arg map[string]interface{}) error {
return n.execWithTimeout(queryName, arg, n.Config.DBTimeout)
return n.execWithTimeout(queryName, arg, n.config.DBTimeout)
}

func (n *Node) getRunningQueryIds(excludeUsers []string, timeout time.Duration) ([]int, error) {
Expand Down Expand Up @@ -304,7 +304,7 @@ func (n *Node) execMogrifyWithTimeout(queryName string, arg map[string]interface
}

func (n *Node) ExecMogrify(queryName string, arg map[string]interface{}) error {
return n.execMogrifyWithTimeout(queryName, arg, n.Config.DBTimeout)
return n.execMogrifyWithTimeout(queryName, arg, n.config.DBTimeout)
}

func (n *Node) queryRowMogrifyWithTimeout(queryName string, arg map[string]interface{}, result interface{}, timeout time.Duration) error {
Expand Down Expand Up @@ -352,15 +352,15 @@ func (n *Node) getTestDiskUsage(f string) (used uint64, total uint64, err error)

// GetDiskUsage returns datadir usage statistics
func (n *Node) GetDiskUsage() (used uint64, total uint64, err error) {
if n.Config.TestDiskUsageFile != "" {
return n.getTestDiskUsage(n.Config.TestDiskUsageFile)
if n.config.TestDiskUsageFile != "" {
return n.getTestDiskUsage(n.config.TestDiskUsageFile)
}
if !n.IsLocal() {
err = ErrNotLocalNode
return
}
var stat syscall.Statfs_t
err = syscall.Statfs(n.Config.MySQL.DataDir, &stat)
err = syscall.Statfs(n.config.MySQL.DataDir, &stat)
total = uint64(stat.Bsize) * stat.Blocks
// on FreeBSD stat.Bavail may be negative
bavail := stat.Bavail
Expand Down Expand Up @@ -406,8 +406,8 @@ func getFlagsFromProcMounts(file, filesystem string) (string, error) {
}

func (n *Node) IsFileSystemReadonly() (bool, error) {
if n.Config.TestFilesystemReadonlyFile != "" {
return n.isTestFileSystemReadonly(n.Config.TestFilesystemReadonlyFile)
if n.config.TestFilesystemReadonlyFile != "" {
return n.isTestFileSystemReadonly(n.config.TestFilesystemReadonlyFile)
}
if !n.IsLocal() {
return false, ErrNotLocalNode
Expand All @@ -419,7 +419,7 @@ func (n *Node) IsFileSystemReadonly() (bool, error) {
}
file := string(data)

flag, err := getFlagsFromProcMounts(file, n.Config.MySQL.DataDir)
flag, err := getFlagsFromProcMounts(file, n.config.MySQL.DataDir)
if err != nil {
return false, err
}
Expand All @@ -435,7 +435,7 @@ func (n *Node) GetDaemonStartTime() (time.Time, error) {
if !n.IsLocal() {
return time.Time{}, ErrNotLocalNode
}
pidB, err := os.ReadFile(n.Config.MySQL.PidFile)
pidB, err := os.ReadFile(n.config.MySQL.PidFile)
if err != nil {
if os.IsNotExist(err) {
err = nil
Expand All @@ -461,7 +461,7 @@ func (n *Node) GetCrashRecoveryTime() (time.Time, error) {
if !n.IsLocal() {
return time.Time{}, ErrNotLocalNode
}
fh, err := os.Open(n.Config.MySQL.ErrorLog)
fh, err := os.Open(n.config.MySQL.ErrorLog)
if err != nil {
return time.Time{}, err
}
Expand Down Expand Up @@ -594,7 +594,7 @@ func (n *Node) IsReadOnly() (bool, bool, error) {
// Setting server read-only may take a while
// as server waits all running commits (not transactions) to be finished
func (n *Node) SetReadOnly(superReadOnly bool) error {
return n.setReadonlyWithTimeout(superReadOnly, n.Config.DBSetRoTimeout)
return n.setReadonlyWithTimeout(superReadOnly, n.config.DBSetRoTimeout)
}

func (n *Node) setReadonlyWithTimeout(superReadOnly bool, timeout time.Duration) error {
Expand Down Expand Up @@ -661,7 +661,7 @@ func (n *Node) SetReadOnlyWithForce(excludeUsers []string, superReadOnly bool) e

defer func() { quit <- true }()

return n.setReadonlyWithTimeout(superReadOnly, n.Config.DBSetRoForceTimeout)
return n.setReadonlyWithTimeout(superReadOnly, n.config.DBSetRoForceTimeout)
}

// SetWritable sets MySQL Node to be writable, eg. disables read-only
Expand All @@ -672,28 +672,28 @@ func (n *Node) SetWritable() error {
// StopSlave stops replication (both IO and SQL threads)
func (n *Node) StopSlave() error {
return n.execMogrifyWithTimeout(queryStopSlave, map[string]interface{}{
"channel": n.Config.ReplicationChannel,
}, n.Config.DBStopSlaveSQLThreadTimeout)
"channel": n.config.ReplicationChannel,
}, n.config.DBStopSlaveSQLThreadTimeout)
}

// StartSlave starts replication (both IO and SQL threads)
func (n *Node) StartSlave() error {
return n.ExecMogrify(queryStartSlave, map[string]interface{}{
"channel": n.Config.ReplicationChannel,
"channel": n.config.ReplicationChannel,
})
}

// StopSlaveIOThread stops IO replication thread
func (n *Node) StopSlaveIOThread() error {
return n.ExecMogrify(queryStopSlaveIOThread, map[string]interface{}{
"channel": n.Config.ReplicationChannel,
"channel": n.config.ReplicationChannel,
})
}

// StartSlaveIOThread starts IO replication thread
func (n *Node) StartSlaveIOThread() error {
return n.ExecMogrify(queryStartSlaveIOThread, map[string]interface{}{
"channel": n.Config.ReplicationChannel,
"channel": n.config.ReplicationChannel,
})
}

Expand All @@ -709,21 +709,21 @@ func (n *Node) RestartSlaveIOThread() error {
// StopSlaveSQLThread stops SQL replication thread
func (n *Node) StopSlaveSQLThread() error {
return n.execMogrifyWithTimeout(queryStopSlaveSQLThread, map[string]interface{}{
"channel": n.Config.ReplicationChannel,
}, n.Config.DBStopSlaveSQLThreadTimeout)
"channel": n.config.ReplicationChannel,
}, n.config.DBStopSlaveSQLThreadTimeout)
}

// StartSlaveSQLThread starts SQL replication thread
func (n *Node) StartSlaveSQLThread() error {
return n.ExecMogrify(queryStartSlaveSQLThread, map[string]interface{}{
"channel": n.Config.ReplicationChannel,
"channel": n.config.ReplicationChannel,
})
}

// ResetSlaveAll promotes MySQL Node to be master
func (n *Node) ResetSlaveAll() error {
return n.ExecMogrify(queryResetSlaveAll, map[string]interface{}{
"channel": n.Config.ReplicationChannel,
"channel": n.config.ReplicationChannel,
})
}

Expand Down Expand Up @@ -784,20 +784,20 @@ func (n *Node) SetOnline() error {
// ChangeMaster changes master of MySQL Node, demoting it to slave
func (n *Node) ChangeMaster(host string) error {
useSsl := 0
if n.Config.MySQL.ReplicationSslCA != "" {
if n.config.MySQL.ReplicationSslCA != "" {
useSsl = 1
}
return n.ExecMogrify(queryChangeMaster, map[string]interface{}{
"host": host,
"port": n.Config.MySQL.ReplicationPort,
"user": n.Config.MySQL.ReplicationUser,
"password": n.Config.MySQL.ReplicationPassword,
"port": n.config.MySQL.ReplicationPort,
"user": n.config.MySQL.ReplicationUser,
"password": n.config.MySQL.ReplicationPassword,
"ssl": useSsl,
"sslCa": n.Config.MySQL.ReplicationSslCA,
"retryCount": n.Config.MySQL.ReplicationRetryCount,
"connectRetry": n.Config.MySQL.ReplicationConnectRetry,
"heartbeatPeriod": n.Config.MySQL.ReplicationHeartbeatPeriod,
"channel": n.Config.ReplicationChannel,
"sslCa": n.config.MySQL.ReplicationSslCA,
"retryCount": n.config.MySQL.ReplicationRetryCount,
"connectRetry": n.config.MySQL.ReplicationConnectRetry,
"heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod,
"channel": n.config.ReplicationChannel,
})
}

Expand Down Expand Up @@ -878,7 +878,7 @@ func (n *Node) GetStartupTime() (time.Time, error) {

// GetReplicaStatus returns slave/replica status or nil if node is master
func (n *Node) GetReplicaStatus() (ReplicaStatus, error) {
return n.ReplicaStatusWithTimeout(n.Config.DBTimeout, n.Config.ReplicationChannel)
return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ReplicationChannel)
}

func (n *Node) UpdateExternalCAFile() error {
Expand All @@ -888,7 +888,7 @@ func (n *Node) UpdateExternalCAFile() error {
return nil
}
data := replSettings.SourceSslCa
fileName := n.Config.MySQL.ExternalReplicationSslCA
fileName := n.config.MySQL.ExternalReplicationSslCA
if data != "" && fileName != "" {
err = util.TouchFile(fileName)
if err != nil {
Expand Down
21 changes: 11 additions & 10 deletions internal/mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ func NewExternalReplication(replicationType util.ExternalReplicationType, logger
return &ExternalReplication{
logger: logger,
}, nil
default:
return &UnimplementedExternalReplication{}, nil
}
return &UnimplementedExternalReplication{}, nil
}

// GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel
Expand All @@ -66,7 +67,7 @@ func (er *ExternalReplication) GetReplicaStatus(n *Node) (ReplicaStatus, error)
return nil, nil
}

return n.ReplicaStatusWithTimeout(n.Config.DBTimeout, n.Config.ExternalReplicationChannel)
return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel)
}

// StartExternalReplication starts external replication
Expand All @@ -77,7 +78,7 @@ func (er *ExternalReplication) Start(n *Node) error {
}
if checked {
err := n.ExecMogrify(queryStartReplica, map[string]interface{}{
"channel": n.Config.ExternalReplicationChannel,
"channel": n.config.ExternalReplicationChannel,
})
if err != nil {
return err
Expand All @@ -94,7 +95,7 @@ func (er *ExternalReplication) Stop(n *Node) error {
}
if checked {
err := n.ExecMogrify(queryStopReplica, map[string]interface{}{
"channel": n.Config.ExternalReplicationChannel,
"channel": n.config.ExternalReplicationChannel,
})
if err != nil && !IsErrorChannelDoesNotExists(err) {
return err
Expand All @@ -111,7 +112,7 @@ func (er *ExternalReplication) Reset(n *Node) error {
}
if checked {
err := n.ExecMogrify(queryResetReplicaAll, map[string]interface{}{
"channel": n.Config.ExternalReplicationChannel,
"channel": n.config.ExternalReplicationChannel,
})
if err != nil && !IsErrorChannelDoesNotExists(err) {
return err
Expand Down Expand Up @@ -145,9 +146,9 @@ func (er *ExternalReplication) Set(n *Node) error {
}
useSsl := 0
sslCa := ""
if replSettings.SourceSslCa != "" && n.Config.MySQL.ExternalReplicationSslCA != "" {
if replSettings.SourceSslCa != "" && n.config.MySQL.ExternalReplicationSslCA != "" {
useSsl = 1
sslCa = n.Config.MySQL.ExternalReplicationSslCA
sslCa = n.config.MySQL.ExternalReplicationSslCA
}
err = er.Stop(n)
if err != nil {
Expand All @@ -165,9 +166,9 @@ func (er *ExternalReplication) Set(n *Node) error {
"ssl": useSsl,
"sslCa": sslCa,
"sourceDelay": replSettings.SourceDelay,
"retryCount": n.Config.MySQL.ReplicationRetryCount,
"connectRetry": n.Config.MySQL.ReplicationConnectRetry,
"heartbeatPeriod": n.Config.MySQL.ReplicationHeartbeatPeriod,
"retryCount": n.config.MySQL.ReplicationRetryCount,
"connectRetry": n.config.MySQL.ReplicationConnectRetry,
"heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod,
"channel": "external",
})
if err != nil {
Expand Down

0 comments on commit 67d8654

Please sign in to comment.