From 97b339d4256666cac7771e9d87e308eb176d41fa Mon Sep 17 00:00:00 2001 From: Sven Nierlein Date: Mon, 20 Jan 2025 17:10:30 +0100 Subject: [PATCH] use fallback addresses only if all primary fail --- Changes | 3 ++ pkg/lmd/config.go | 15 ++++++--- pkg/lmd/peer.go | 77 ++++++++++++++++++++++++++++++++++++----------- 3 files changed, 74 insertions(+), 21 deletions(-) diff --git a/Changes b/Changes index 6e0e5db..67b7460 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,8 @@ This file documents the revision history for the Livestatus Multitool Daemon (LMD) +next: + - use fallback addresses only if all primary fail + 2.3.0 Thu Jan 16 08:20:30 CET 2025 - use primary key index for all tables - use output buffer for queries without fixed header to reduce locked time diff --git a/pkg/lmd/config.go b/pkg/lmd/config.go index 6a6f410..696428b 100644 --- a/pkg/lmd/config.go +++ b/pkg/lmd/config.go @@ -25,6 +25,7 @@ type Connection struct { TLSServerName string `toml:"tlsservername"` TLSCA string `toml:"tlsca"` Source []string `toml:"source"` + Fallback []string `toml:"fallback"` Flags []string `toml:"flags"` TLSSkipVerify int `toml:"tlsskipverify"` NoConfigTool int `toml:"noconfigtool"` // skip adding config tool to sites query @@ -44,6 +45,7 @@ func (c *Connection) Equals(other *Connection) bool { equal = equal && c.TLSSkipVerify == other.TLSSkipVerify equal = equal && c.NoConfigTool == other.NoConfigTool equal = equal && strings.Join(c.Source, ":") == strings.Join(other.Source, ":") + equal = equal && strings.Join(c.Fallback, ":") == strings.Join(other.Fallback, ":") equal = equal && strings.Join(c.Flags, ":") == strings.Join(other.Flags, ":") return equal @@ -167,10 +169,15 @@ func NewConfig(files []string) *Config { conf.Listen = allListeners conf.Connections = allConnections - for i := range conf.Connections { - for j := range conf.Connections[i].Source { - if strings.HasPrefix(conf.Connections[i].Source[j], "http") { - conf.Connections[i].Source[j] = completePeerHTTPAddr(conf.Connections[i].Source[j]) + for num := range conf.Connections { + for j := range conf.Connections[num].Source { + if strings.HasPrefix(conf.Connections[num].Source[j], "http") { + conf.Connections[num].Source[j] = completePeerHTTPAddr(conf.Connections[num].Source[j]) + } + } + for j := range conf.Connections[num].Fallback { + if strings.HasPrefix(conf.Connections[num].Fallback[j], "http") { + conf.Connections[num].Fallback[j] = completePeerHTTPAddr(conf.Connections[num].Fallback[j]) } } } diff --git a/pkg/lmd/peer.go b/pkg/lmd/peer.go index d3f6f9f..903f464 100644 --- a/pkg/lmd/peer.go +++ b/pkg/lmd/peer.go @@ -94,6 +94,7 @@ type Peer struct { Response []byte // reference to last response } Source []string // reference to all connection strings + Fallback []string // reference to all fallback connection strings SubName []string SubType []string SubAddr []string @@ -234,6 +235,7 @@ func NewPeer(lmd *Daemon, config *Connection) *Peer { Name: config.Name, ID: config.ID, Source: config.Source, + Fallback: config.Fallback, PeerAddr: config.Source[0], PeerState: PeerStatusPending, LastError: "connecting...", @@ -302,6 +304,13 @@ func (p *Peer) SetHTTPClient() { break } } + for _, addr := range p.Fallback { + if strings.HasPrefix(addr, "http") { + hasHTTP = true + + break + } + } if !hasHTTP { return } @@ -1155,7 +1164,7 @@ func (p *Peer) getSocketQueryResponseWithTemporaryRetries(req *Request, query st peerAddr, connType := extractConnType(p.statusGetLocked(PeerAddr).(string)) conn.Close() var oErr error - conn, oErr = p.OpenConnection(peerAddr, connType) + conn, oErr = p.openConnection(peerAddr, connType) if oErr != nil { // return both errors return nil, conn, fmt.Errorf("connection failed: %w, retry failed as well: %s", err, oErr.Error()) @@ -1314,7 +1323,7 @@ func (p *Peer) parseResponseFixedSize(req *Request, conn io.ReadCloser) ([]byte, func (p *Peer) Query(ctx context.Context, req *Request) (result ResultSet, meta *ResultMetaData, err error) { result, meta, err = p.query(ctx, req) if err != nil { - p.setNextAddrFromErr(err, req) + p.setNextAddrFromErr(err, req, p.Source) } return @@ -1406,7 +1415,27 @@ func (p *Peer) validateResponseHeader(resBytes []byte, req *Request, code int, e // return anything. // It returns the connection object and any error encountered. func (p *Peer) GetConnection(req *Request) (conn net.Conn, connType ConnectionType, err error) { - for num := range len(p.Source) { + // try primary sources first + conn, connType, err = p.tryConnection(req, p.Source) + if err == nil { + return conn, connType, nil + } + + // then fallback sources + if len(p.Fallback) > 0 { + p.CurPeerAddrNum = 0 + p.statusSetLocked(PeerAddr, p.Fallback[0]) + conn, connType, err = p.tryConnection(req, p.Fallback) + if err == nil { + return conn, connType, nil + } + } + + return nil, ConnTypeUnix, &PeerError{msg: err.Error(), kind: ConnectionError, srcErr: err} +} + +func (p *Peer) tryConnection(req *Request, source []string) (conn net.Conn, connType ConnectionType, err error) { + for num := range source { var peerAddr string peerAddr, connType = extractConnType(p.statusGetLocked(PeerAddr).(string)) if connType == ConnTypeHTTP { @@ -1420,7 +1449,7 @@ func (p *Peer) GetConnection(req *Request) (conn net.Conn, connType ConnectionTy return conn, connType, nil } - conn, err = p.OpenConnection(peerAddr, connType) + conn, err = p.openConnection(peerAddr, connType) // connection successful if err == nil { @@ -1434,13 +1463,13 @@ func (p *Peer) GetConnection(req *Request) (conn net.Conn, connType ConnectionTy } // connection error - p.setNextAddrFromErr(err, req) + p.setNextAddrFromErr(err, req, source) } - return nil, ConnTypeUnix, &PeerError{msg: err.Error(), kind: ConnectionError, srcErr: err} + return nil, ConnTypeUnix, err } -func (p *Peer) OpenConnection(peerAddr string, connType ConnectionType) (conn net.Conn, err error) { +func (p *Peer) openConnection(peerAddr string, connType ConnectionType) (conn net.Conn, err error) { switch connType { case ConnTypeTCP: logWith(p).Tracef("doing tcp connection test: %s", peerAddr) @@ -1526,7 +1555,7 @@ func extractConnType(rawAddr string) (string, ConnectionType) { return rawAddr, connType } -func (p *Peer) setNextAddrFromErr(err error, req *Request) { +func (p *Peer) setNextAddrFromErr(err error, req *Request, source []string) { var peerCmdErr *PeerCommandError if errors.As(err, &peerCmdErr) { // client errors do not affect remote site status @@ -1545,7 +1574,8 @@ func (p *Peer) setNextAddrFromErr(err error, req *Request) { p.LastError = strings.TrimSpace(err.Error()) p.ErrorCount++ - numSources := len(p.Source) + numSources := len(source) + numAllSources := len(p.Source) + len(p.Fallback) // try next node if there are multiple nextNum := p.CurPeerAddrNum + 1 @@ -1553,7 +1583,7 @@ func (p *Peer) setNextAddrFromErr(err error, req *Request) { nextNum = 0 } p.CurPeerAddrNum = nextNum - p.PeerAddr = p.Source[nextNum] + p.PeerAddr = source[nextNum] // invalidate connection cache p.closeConnectionPool() @@ -1568,7 +1598,7 @@ func (p *Peer) setNextAddrFromErr(err error, req *Request) { now := currentUnixTime() lastOnline := p.LastOnline logWith(logContext...).Debugf("last online: %s", timeOrNever(lastOnline)) - if lastOnline < now-float64(p.lmd.Config.StaleBackendTimeout) || (p.ErrorCount > numSources && lastOnline <= 0) { + if lastOnline < now-float64(p.lmd.Config.StaleBackendTimeout) || (p.ErrorCount > numAllSources && lastOnline <= 0) { if p.PeerState != PeerStatusDown { logWith(logContext...).Infof("site went offline: %s", err.Error()) } @@ -1577,7 +1607,7 @@ func (p *Peer) setNextAddrFromErr(err error, req *Request) { p.ClearData(false) } - if numSources > 1 { + if numAllSources > 1 { logWith(logContext...).Debugf("trying next one: %s", p.PeerAddr) } } @@ -1693,7 +1723,7 @@ func (p *Peer) fetchThrukExtras(ctx context.Context) (conf, thrukextras map[stri return } // try all http connections and return first config tool config - for _, addr := range p.Config.Source { + for _, addr := range p.buildCombinedAddressList() { if strings.HasPrefix(addr, "http") { configTool, thruk, extraErr := p.fetchThrukExtrasFromAddr(ctx, addr) err = extraErr @@ -1756,7 +1786,7 @@ func (p *Peer) extractThrukExtrasFromResult(output []interface{}) (configtool, t if ts, ok2 := processinfo["localtime"]; ok2 { err := p.CheckLocaltime(interface2float64(ts)) if err != nil { - p.setNextAddrFromErr(err, nil) + p.setNextAddrFromErr(err, nil, p.Source) return nil, nil, err } @@ -1779,6 +1809,18 @@ func (p *Peer) extractThrukExtrasFromResult(output []interface{}) (configtool, t return nil, nil, nil } +func (p *Peer) buildCombinedAddressList() (list []string) { + if len(p.Config.Fallback) == 0 { + return p.Config.Source + } + + list = make([]string, 0, len(p.Config.Source)+len(p.Config.Fallback)) + list = append(list, p.Config.Source...) + list = append(list, p.Config.Fallback...) + + return list +} + func (p *Peer) fetchRemotePeers(ctx context.Context, store *DataStoreSet) (sites []interface{}, err error) { // no http client is a sure sign for no http connection if p.cache.HTTPClient == nil { @@ -1795,7 +1837,7 @@ func (p *Peer) fetchRemotePeers(ctx context.Context, store *DataStoreSet) (sites return nil, nil } // try all http connections and use first working connection - for _, addr := range p.Config.Source { + for _, addr := range p.buildCombinedAddressList() { if !strings.HasPrefix(addr, "http") { continue } @@ -2257,7 +2299,7 @@ func (p *Peer) PassThroughQuery(ctx context.Context, res *Response, passthroughR var peerErr *PeerError if !errors.As(queryErr, &peerErr) || peerErr.kind != ResponseError { // connection issue, need to reset current connection - p.setNextAddrFromErr(queryErr, passthroughRequest) + p.setNextAddrFromErr(queryErr, passthroughRequest, p.Source) } logWith(p, req).Tracef("passthrough req errored %s", queryErr.Error()) res.Lock.Lock() @@ -2784,7 +2826,7 @@ func (p *Peer) requestLocaltime(ctx context.Context) (err error) { unix := interface2float64(res[0][0]) err = p.CheckLocaltime(unix) if err != nil { - p.setNextAddrFromErr(err, req) + p.setNextAddrFromErr(err, req, p.Source) } return @@ -2886,6 +2928,7 @@ func (p *Peer) addSubPeer(ctx context.Context, subFlag OptionalFlags, key, subNa ID: subID, Name: subName, Source: p.Source, + Fallback: p.Fallback, RemoteName: subName, TLSCertificate: p.Config.TLSCertificate, TLSKey: p.Config.TLSKey,