Skip to content

Commit

Permalink
Small refactoring (#43)
Browse files Browse the repository at this point in the history
* Changed 'Disabled' const

* Small refactoring

* A couple of methods transformed to functions
  • Loading branch information
noname0443 authored Oct 11, 2023
1 parent 5bb7474 commit c6874ee
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 47 deletions.
4 changes: 2 additions & 2 deletions internal/mysql/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ type Cluster struct {
sync.Mutex
config *config.Config
logger *log.Logger
haNodes map[string]*Node
cascadeNodes map[string]*Node
local *Node
dcs dcs.DCS
haNodes map[string]*Node
cascadeNodes map[string]*Node
}

func (c *Cluster) IsHAHost(hostname string) bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/mysql/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type NodeConfiguration struct {
}

type ResetupStatus struct {
Status bool
UpdateTime time.Time
Status bool
}

type replicationSettings struct {
Expand Down
89 changes: 48 additions & 41 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
type Node struct {
config *config.Config
logger *log.Logger
host string
db *sqlx.DB
version *Version
host string
}

var (
Expand Down Expand Up @@ -147,8 +147,8 @@ func (n *Node) getQuery(name string) string {
func (n *Node) traceQuery(query string, arg interface{}, result interface{}, err error) {
query = queryOnliner.ReplaceAllString(query, " ")
msg := fmt.Sprintf("node %s running query '%s' with args %#v, result: %#v, error: %v", n.host, query, arg, result, err)
msg = strings.Replace(msg, n.config.MySQL.Password, "********", -1)
msg = strings.Replace(msg, n.config.MySQL.ReplicationPassword, "********", -1)
msg = strings.ReplaceAll(msg, n.config.MySQL.Password, "********")
msg = strings.ReplaceAll(msg, n.config.MySQL.ReplicationPassword, "********")
n.logger.Debug(msg)
}

Expand All @@ -158,15 +158,9 @@ func (n *Node) queryRow(queryName string, arg interface{}, result interface{}) e
}

func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result interface{}, timeout time.Duration) error {
if arg == nil {
arg = struct{}{}
}
query := n.getQuery(queryName)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
rows, err := n.db.NamedQueryContext(ctx, query, arg)
if err == nil {
defer func() { _ = rows.Close() }()
return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error {
var err error

if rows.Next() {
err = rows.StructScan(result)
} else {
Expand All @@ -175,32 +169,45 @@ func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result int
err = sql.ErrNoRows
}
}
}
n.traceQuery(query, arg, result, err)
return err

return err
}, timeout)
}

// nolint: unparam
func (n *Node) queryRows(queryName string, arg interface{}, scanner func(*sqlx.Rows) error) error {
return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error {
var err error

for rows.Next() {
err = scanner(rows)
if err != nil {
break
}
}

return err
}, n.config.DBTimeout)
}

func (n *Node) processQuery(queryName string, arg interface{}, rowsProcessor func(*sqlx.Rows) error, timeout time.Duration) error {
if arg == nil {
arg = struct{}{}
}
query := n.getQuery(queryName)
ctx, cancel := context.WithTimeout(context.Background(), n.config.DBTimeout)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

query := n.getQuery(queryName)
rows, err := n.db.NamedQueryContext(ctx, query, arg)
n.traceQuery(query, arg, rows, err)
if err != nil {
return err
}

defer func() { _ = rows.Close() }()
for rows.Next() {
err = scanner(rows)
if err != nil {
break
}
}
return err

return rowsProcessor(rows)
}

// nolint: unparam
Expand Down Expand Up @@ -265,8 +272,8 @@ func (n *Node) getRunningQueryIds(excludeUsers []string, timeout time.Duration)
type schemaname string

func escape(s string) string {
s = strings.Replace(s, `\`, `\\`, -1)
s = strings.Replace(s, `'`, `\'`, -1)
s = strings.ReplaceAll(s, `\`, `\\`)
s = strings.ReplaceAll(s, `'`, `\'`)
return s
}

Expand Down Expand Up @@ -372,18 +379,6 @@ func (n *Node) GetDiskUsage() (used uint64, total uint64, err error) {
return
}

func (n *Node) isTestFileSystemReadonly(f string) (bool, error) {
data, err := os.ReadFile(f)
if err != nil {
return false, err
}
value, err := strconv.ParseBool(strings.TrimSpace(string(data)))
if err != nil {
return false, fmt.Errorf("error while parce test file: %s", err)
}
return value, nil
}

func getFlagsFromProcMounts(file, filesystem string) (string, error) {
for _, line := range strings.Split(file, "\n") {
components := strings.Split(line, " ")
Expand All @@ -407,7 +402,7 @@ func getFlagsFromProcMounts(file, filesystem string) (string, error) {

func (n *Node) IsFileSystemReadonly() (bool, error) {
if n.config.TestFilesystemReadonlyFile != "" {
return n.isTestFileSystemReadonly(n.config.TestFilesystemReadonlyFile)
return isTestFileSystemReadonly(n.config.TestFilesystemReadonlyFile)
}
if !n.IsLocal() {
return false, ErrNotLocalNode
Expand All @@ -431,6 +426,18 @@ func (n *Node) IsFileSystemReadonly() (bool, error) {
}
}

func isTestFileSystemReadonly(f string) (bool, error) {
data, err := os.ReadFile(f)
if err != nil {
return false, err
}
value, err := strconv.ParseBool(strings.TrimSpace(string(data)))
if err != nil {
return false, fmt.Errorf("error while parce test file: %s", err)
}
return value, nil
}

func (n *Node) GetDaemonStartTime() (time.Time, error) {
if !n.IsLocal() {
return time.Time{}, ErrNotLocalNode
Expand Down Expand Up @@ -900,7 +907,7 @@ func (n *Node) UpdateExternalCAFile() error {
}
if data != string(oldDataByte) {
n.logger.Infof("saving new CA file to %s", fileName)
err := n.SaveCAFile(data, fileName)
err := SaveCAFile(data, fileName)
if err != nil {
return err
}
Expand All @@ -920,7 +927,7 @@ func (n *Node) UpdateExternalCAFile() error {
return nil
}

func (n *Node) SaveCAFile(data string, path string) error {
func SaveCAFile(data string, path string) error {
rootCertPool := x509.NewCertPool()
byteData := []byte(data)
if ok := rootCertPool.AppendCertsFromPEM(byteData); !ok {
Expand Down
6 changes: 4 additions & 2 deletions internal/mysql/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ var dubiousErrorNumbers = []uint16{
1698, // Symbol: ER_ACCESS_DENIED_NO_PASSWORD_ERROR; SQLSTATE: 28000
}

const channelDoesNotExists = 3074 // Symbol: ER_REPLICA_CHANNEL_DOES_NOT_EXIST; SQLSTATE: HY000
const tableDoesNotExists = 1146 // Symbol: ER_NO_SUCH_TABLE; SQLSTATE: 42S02
const (
channelDoesNotExists = 3074 // Symbol: ER_REPLICA_CHANNEL_DOES_NOT_EXIST; SQLSTATE: HY000
tableDoesNotExists = 1146 // Symbol: ER_NO_SUCH_TABLE; SQLSTATE: 42S02
)

// IsErrorDubious check that error may be caused by misconfiguration, mysync/scripts bugs
// and not related to MySQL/network failure
Expand Down
2 changes: 1 addition & 1 deletion internal/util/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ package util
type ExternalReplicationType string

const (
Disabled ExternalReplicationType = ""
Disabled ExternalReplicationType = "off"
MyExternalReplication ExternalReplicationType = "external"
)

0 comments on commit c6874ee

Please sign in to comment.