Skip to content

Commit

Permalink
use fallback addresses only if all primary fail
Browse files Browse the repository at this point in the history
  • Loading branch information
sni committed Jan 20, 2025
1 parent f3df9ef commit 97b339d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 21 deletions.
3 changes: 3 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
This file documents the revision history for the Livestatus Multitool Daemon (LMD)

next:
- use fallback addresses only if all primary fail

2.3.0 Thu Jan 16 08:20:30 CET 2025
- use primary key index for all tables
- use output buffer for queries without fixed header to reduce locked time
Expand Down
15 changes: 11 additions & 4 deletions pkg/lmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Connection struct {
TLSServerName string `toml:"tlsservername"`
TLSCA string `toml:"tlsca"`
Source []string `toml:"source"`
Fallback []string `toml:"fallback"`
Flags []string `toml:"flags"`
TLSSkipVerify int `toml:"tlsskipverify"`
NoConfigTool int `toml:"noconfigtool"` // skip adding config tool to sites query
Expand All @@ -44,6 +45,7 @@ func (c *Connection) Equals(other *Connection) bool {
equal = equal && c.TLSSkipVerify == other.TLSSkipVerify
equal = equal && c.NoConfigTool == other.NoConfigTool
equal = equal && strings.Join(c.Source, ":") == strings.Join(other.Source, ":")
equal = equal && strings.Join(c.Fallback, ":") == strings.Join(other.Fallback, ":")
equal = equal && strings.Join(c.Flags, ":") == strings.Join(other.Flags, ":")

return equal
Expand Down Expand Up @@ -167,10 +169,15 @@ func NewConfig(files []string) *Config {
conf.Listen = allListeners
conf.Connections = allConnections

for i := range conf.Connections {
for j := range conf.Connections[i].Source {
if strings.HasPrefix(conf.Connections[i].Source[j], "http") {
conf.Connections[i].Source[j] = completePeerHTTPAddr(conf.Connections[i].Source[j])
for num := range conf.Connections {
for j := range conf.Connections[num].Source {
if strings.HasPrefix(conf.Connections[num].Source[j], "http") {
conf.Connections[num].Source[j] = completePeerHTTPAddr(conf.Connections[num].Source[j])
}
}
for j := range conf.Connections[num].Fallback {
if strings.HasPrefix(conf.Connections[num].Fallback[j], "http") {
conf.Connections[num].Fallback[j] = completePeerHTTPAddr(conf.Connections[num].Fallback[j])
}
}
}
Expand Down
77 changes: 60 additions & 17 deletions pkg/lmd/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type Peer struct {
Response []byte // reference to last response
}
Source []string // reference to all connection strings
Fallback []string // reference to all fallback connection strings
SubName []string
SubType []string
SubAddr []string
Expand Down Expand Up @@ -234,6 +235,7 @@ func NewPeer(lmd *Daemon, config *Connection) *Peer {
Name: config.Name,
ID: config.ID,
Source: config.Source,
Fallback: config.Fallback,
PeerAddr: config.Source[0],
PeerState: PeerStatusPending,
LastError: "connecting...",
Expand Down Expand Up @@ -302,6 +304,13 @@ func (p *Peer) SetHTTPClient() {
break
}
}
for _, addr := range p.Fallback {
if strings.HasPrefix(addr, "http") {
hasHTTP = true

break
}
}
if !hasHTTP {
return
}
Expand Down Expand Up @@ -1155,7 +1164,7 @@ func (p *Peer) getSocketQueryResponseWithTemporaryRetries(req *Request, query st
peerAddr, connType := extractConnType(p.statusGetLocked(PeerAddr).(string))
conn.Close()
var oErr error
conn, oErr = p.OpenConnection(peerAddr, connType)
conn, oErr = p.openConnection(peerAddr, connType)
if oErr != nil {
// return both errors
return nil, conn, fmt.Errorf("connection failed: %w, retry failed as well: %s", err, oErr.Error())
Expand Down Expand Up @@ -1314,7 +1323,7 @@ func (p *Peer) parseResponseFixedSize(req *Request, conn io.ReadCloser) ([]byte,
func (p *Peer) Query(ctx context.Context, req *Request) (result ResultSet, meta *ResultMetaData, err error) {
result, meta, err = p.query(ctx, req)
if err != nil {
p.setNextAddrFromErr(err, req)
p.setNextAddrFromErr(err, req, p.Source)
}

return
Expand Down Expand Up @@ -1406,7 +1415,27 @@ func (p *Peer) validateResponseHeader(resBytes []byte, req *Request, code int, e
// return anything.
// It returns the connection object and any error encountered.
func (p *Peer) GetConnection(req *Request) (conn net.Conn, connType ConnectionType, err error) {
for num := range len(p.Source) {
// try primary sources first
conn, connType, err = p.tryConnection(req, p.Source)
if err == nil {
return conn, connType, nil
}

// then fallback sources
if len(p.Fallback) > 0 {
p.CurPeerAddrNum = 0
p.statusSetLocked(PeerAddr, p.Fallback[0])
conn, connType, err = p.tryConnection(req, p.Fallback)
if err == nil {
return conn, connType, nil
}
}

return nil, ConnTypeUnix, &PeerError{msg: err.Error(), kind: ConnectionError, srcErr: err}
}

func (p *Peer) tryConnection(req *Request, source []string) (conn net.Conn, connType ConnectionType, err error) {
for num := range source {
var peerAddr string
peerAddr, connType = extractConnType(p.statusGetLocked(PeerAddr).(string))
if connType == ConnTypeHTTP {
Expand All @@ -1420,7 +1449,7 @@ func (p *Peer) GetConnection(req *Request) (conn net.Conn, connType ConnectionTy
return conn, connType, nil
}

conn, err = p.OpenConnection(peerAddr, connType)
conn, err = p.openConnection(peerAddr, connType)

// connection successful
if err == nil {
Expand All @@ -1434,13 +1463,13 @@ func (p *Peer) GetConnection(req *Request) (conn net.Conn, connType ConnectionTy
}

// connection error
p.setNextAddrFromErr(err, req)
p.setNextAddrFromErr(err, req, source)
}

return nil, ConnTypeUnix, &PeerError{msg: err.Error(), kind: ConnectionError, srcErr: err}
return nil, ConnTypeUnix, err
}

func (p *Peer) OpenConnection(peerAddr string, connType ConnectionType) (conn net.Conn, err error) {
func (p *Peer) openConnection(peerAddr string, connType ConnectionType) (conn net.Conn, err error) {
switch connType {
case ConnTypeTCP:
logWith(p).Tracef("doing tcp connection test: %s", peerAddr)
Expand Down Expand Up @@ -1526,7 +1555,7 @@ func extractConnType(rawAddr string) (string, ConnectionType) {
return rawAddr, connType
}

func (p *Peer) setNextAddrFromErr(err error, req *Request) {
func (p *Peer) setNextAddrFromErr(err error, req *Request, source []string) {
var peerCmdErr *PeerCommandError
if errors.As(err, &peerCmdErr) {
// client errors do not affect remote site status
Expand All @@ -1545,15 +1574,16 @@ func (p *Peer) setNextAddrFromErr(err error, req *Request) {
p.LastError = strings.TrimSpace(err.Error())
p.ErrorCount++

numSources := len(p.Source)
numSources := len(source)
numAllSources := len(p.Source) + len(p.Fallback)

// try next node if there are multiple
nextNum := p.CurPeerAddrNum + 1
if nextNum >= numSources {
nextNum = 0
}
p.CurPeerAddrNum = nextNum
p.PeerAddr = p.Source[nextNum]
p.PeerAddr = source[nextNum]

// invalidate connection cache
p.closeConnectionPool()
Expand All @@ -1568,7 +1598,7 @@ func (p *Peer) setNextAddrFromErr(err error, req *Request) {
now := currentUnixTime()
lastOnline := p.LastOnline
logWith(logContext...).Debugf("last online: %s", timeOrNever(lastOnline))
if lastOnline < now-float64(p.lmd.Config.StaleBackendTimeout) || (p.ErrorCount > numSources && lastOnline <= 0) {
if lastOnline < now-float64(p.lmd.Config.StaleBackendTimeout) || (p.ErrorCount > numAllSources && lastOnline <= 0) {
if p.PeerState != PeerStatusDown {
logWith(logContext...).Infof("site went offline: %s", err.Error())
}
Expand All @@ -1577,7 +1607,7 @@ func (p *Peer) setNextAddrFromErr(err error, req *Request) {
p.ClearData(false)
}

if numSources > 1 {
if numAllSources > 1 {
logWith(logContext...).Debugf("trying next one: %s", p.PeerAddr)
}
}
Expand Down Expand Up @@ -1693,7 +1723,7 @@ func (p *Peer) fetchThrukExtras(ctx context.Context) (conf, thrukextras map[stri
return
}
// try all http connections and return first config tool config
for _, addr := range p.Config.Source {
for _, addr := range p.buildCombinedAddressList() {
if strings.HasPrefix(addr, "http") {
configTool, thruk, extraErr := p.fetchThrukExtrasFromAddr(ctx, addr)
err = extraErr
Expand Down Expand Up @@ -1756,7 +1786,7 @@ func (p *Peer) extractThrukExtrasFromResult(output []interface{}) (configtool, t
if ts, ok2 := processinfo["localtime"]; ok2 {
err := p.CheckLocaltime(interface2float64(ts))
if err != nil {
p.setNextAddrFromErr(err, nil)
p.setNextAddrFromErr(err, nil, p.Source)

return nil, nil, err
}
Expand All @@ -1779,6 +1809,18 @@ func (p *Peer) extractThrukExtrasFromResult(output []interface{}) (configtool, t
return nil, nil, nil
}

// buildCombinedAddressList returns the configured primary source addresses
// followed by the configured fallback addresses.
// If no fallback addresses are configured, the primary source slice is
// returned as-is (no copy is made).
func (p *Peer) buildCombinedAddressList() (list []string) {
	primary, fallback := p.Config.Source, p.Config.Fallback
	if len(fallback) == 0 {
		return primary
	}

	// allocate once with room for both lists, primaries first
	list = make([]string, len(primary), len(primary)+len(fallback))
	copy(list, primary)

	return append(list, fallback...)
}

func (p *Peer) fetchRemotePeers(ctx context.Context, store *DataStoreSet) (sites []interface{}, err error) {
// no http client is a sure sign for no http connection
if p.cache.HTTPClient == nil {
Expand All @@ -1795,7 +1837,7 @@ func (p *Peer) fetchRemotePeers(ctx context.Context, store *DataStoreSet) (sites
return nil, nil
}
// try all http connections and use first working connection
for _, addr := range p.Config.Source {
for _, addr := range p.buildCombinedAddressList() {
if !strings.HasPrefix(addr, "http") {
continue
}
Expand Down Expand Up @@ -2257,7 +2299,7 @@ func (p *Peer) PassThroughQuery(ctx context.Context, res *Response, passthroughR
var peerErr *PeerError
if !errors.As(queryErr, &peerErr) || peerErr.kind != ResponseError {
// connection issue, need to reset current connection
p.setNextAddrFromErr(queryErr, passthroughRequest)
p.setNextAddrFromErr(queryErr, passthroughRequest, p.Source)
}
logWith(p, req).Tracef("passthrough req errored %s", queryErr.Error())
res.Lock.Lock()
Expand Down Expand Up @@ -2784,7 +2826,7 @@ func (p *Peer) requestLocaltime(ctx context.Context) (err error) {
unix := interface2float64(res[0][0])
err = p.CheckLocaltime(unix)
if err != nil {
p.setNextAddrFromErr(err, req)
p.setNextAddrFromErr(err, req, p.Source)
}

return
Expand Down Expand Up @@ -2886,6 +2928,7 @@ func (p *Peer) addSubPeer(ctx context.Context, subFlag OptionalFlags, key, subNa
ID: subID,
Name: subName,
Source: p.Source,
Fallback: p.Fallback,
RemoteName: subName,
TLSCertificate: p.Config.TLSCertificate,
TLSKey: p.Config.TLSKey,
Expand Down

0 comments on commit 97b339d

Please sign in to comment.