diff --git a/Makefile b/Makefile index dded906..51e4546 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,6 @@ unit-test: go test -race -count=1 -timeout 240s -cover ./logs/... \ ./manager/node/... \ ./manager/workload/... \ - ./selfmon/... \ ./types/... \ ./utils/... diff --git a/README.md b/README.md index 14cfe70..c3b35e8 100644 --- a/README.md +++ b/README.md @@ -68,4 +68,4 @@ Make sure you can clone code by ssh protocol because libgit2 ask for it. So you container deploy -pod --entry agent --network --deploy-method fill --image | --count 1 --file :/etc/eru/agent.yaml [--cpu 0.3 | --mem 1024000000] http://bit.ly/EruAgent ``` -Now you will find agent was started in each node, and monitor containers status include itself. +Now you will find agent was started in each node, and monitor containers status include itself. \ No newline at end of file diff --git a/agent.go b/agent.go index b0ef994..a842b41 100644 --- a/agent.go +++ b/agent.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "math/rand" "os" @@ -12,7 +13,6 @@ import ( "github.com/projecteru2/agent/api" "github.com/projecteru2/agent/manager/node" "github.com/projecteru2/agent/manager/workload" - "github.com/projecteru2/agent/selfmon" "github.com/projecteru2/agent/types" "github.com/projecteru2/agent/utils" "github.com/projecteru2/agent/version" @@ -56,17 +56,12 @@ func serve(c *cli.Context) error { utils.WritePid(config.PidFile) defer os.Remove(config.PidFile) - if c.Bool("selfmon") { - mon, err := selfmon.New(c.Context, config) - if err != nil { - return err - } - return mon.Run(c.Context) - } - - ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + ctx, cancel := context.WithCancel(c.Context) defer cancel() + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1) + errChan := make(chan error, 2) defer close(errChan) @@ -103,10 +98,18 @@ func serve(c *cli.Context) error { go 
func() { select { case <-ctx.Done(): - log.Info("[agent] Agent caught system signal, exiting") + log.Info("[agent] Agent exiting") case <-errChan: log.Info("[agent] got err, exiting") cancel() + case sig := <-signalChan: + log.Infof("[agent] Agent caught system signal %v", sig) + if sig != syscall.SIGUSR1 { + if err := nodeManager.Exit(); err != nil { + log.Errorf("[agent] node manager exits with err: %v", err) + } + } + cancel() } }() @@ -234,17 +237,6 @@ func main() { Usage: "change hostname", EnvVars: []string{"ERU_HOSTNAME"}, }, - &cli.BoolFlag{ - Name: "selfmon", - Value: false, - Usage: "run this agent as a selfmon daemon", - }, - &cli.StringFlag{ - Name: "kv", - Value: "", - Usage: "kv type", - EnvVars: []string{"ERU_AGENT_KV"}, - }, &cli.BoolFlag{ Name: "check-only-mine", Value: false, diff --git a/agent.yaml.sample b/agent.yaml.sample index 8b0bb49..a8abe01 100644 --- a/agent.yaml.sample +++ b/agent.yaml.sample @@ -12,10 +12,6 @@ store: grpc # This option is not required as the default value is "docker". runtime: docker -# kv defines the type of kv store. -# This option is not required as the default value is "etcd". -kv: etcd - # core defines the address of eru-core component. # This option is not required as the default value is "127.0.0.1:5001". core: @@ -24,10 +20,8 @@ core: # heartbeat_interval defines the interval for eru-agent to # report health status of the node to eru-core. -# This option is not required, and is only useful when enabling -# selfmon mode. # If you don't want eru-agent to report this status, set it to 0. -# The default value of this option is 0. +# The default value of this option is 60. heartbeat_interval: 120 # auth defines the authentication values for eru-core. @@ -117,41 +111,13 @@ log: # # healthcheck.cache_ttl defines how long will eru-agent cache an unchanged status locally. # This is only used when selfmon mode is switched on. The default value is 300 (in seconds). 
-# -# healthcheck.enable_selfmon defines whether selfmon is switched on. -# This should be true if there's at least one eru-agent is in mode selfmon. -# When this is true, healthcheck.enable_selfmon and heartbeat_interval is meaningless. healthcheck: interval: 120 timeout: 10 cache_ttl: 300 - enable_selfmon: false # global_connection_timeout defines the timeout for eru-agent other than healthcheck. # E.g. the timeout for reporting action of eru-agent, or the timeout for eru-agent to # connect to docker. # The default value is "5s", note that "s" in the end. -global_connection_timeout: 15s - -# ha_keepalive_interval defines the time interval for sending heartbeat -# when selfmon maintains its own active state. -# The default value is "16s", note that "s" in the end. -ha_keepalive_interval: 16s - -# etcd defines the etcd configuration. -# This option is required and has no default value. -# If you don't plan to run this eru-agent in selfmon mode, -# you can give a mocked value e.g. 127.0.0.1:1111, -# this value won't be used to connect, it's only to pass -# the validation of this option (it's tricky). -# Will plan to improve this in next release. -# -# etcd.machines defines the addresses of etcd machines. -# -# etcd.prefix defines the prefix for eru-agents in selfmon mode. -# This prefix should be the same for all eru-agents in selfmon mode, -# and also distinguished for different ERU clusters. 
-etcd: - machines: - - 127.0.0.1:2379 - prefix: /agent-selfmon +global_connection_timeout: 15s \ No newline at end of file diff --git a/go.mod b/go.mod index 8ece5d1..acbb7c1 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/go-ole/go-ole v0.0.0-20180213002836-a1ec82a652eb // indirect github.com/jinzhu/configor v1.2.1 github.com/patrickmn/go-cache v2.1.0+incompatible - github.com/pkg/errors v0.9.1 + github.com/pkg/errors v0.9.1 // indirect github.com/projecteru2/core v0.0.0-20211021040158-0be8dbadbc55 github.com/projecteru2/libyavirt v0.0.0-20211014062234-66e6f24ab6d1 github.com/prometheus/client_golang v1.11.0 diff --git a/manager/node/manager.go b/manager/node/manager.go index e77d8f4..a4a2f78 100644 --- a/manager/node/manager.go +++ b/manager/node/manager.go @@ -78,22 +78,24 @@ func (m *Manager) Run(ctx context.Context) error { log.Info("[NodeManager] start node status heartbeat") go m.heartbeat(ctx) - // wait for signal <-ctx.Done() - return m.exit() + log.Info("[NodeManager] exiting") + return nil } -func (m *Manager) exit() error { +// Exit . +func (m *Manager) Exit() error { log.Info("[NodeManager] exiting") - log.Infof("[NodeManager] mark node %s as down", m.config.HostName) + log.Infof("[NodeManager] remove node status of %s", m.config.HostName) // ctx is now canceled. use a new context. 
var err error utils.WithTimeout(context.TODO(), m.config.GlobalConnectionTimeout, func(ctx context.Context) { - err = m.store.SetNode(ctx, m.config.HostName, false) + // remove node status + err = m.store.SetNodeStatus(ctx, -1) }) if err != nil { - log.Errorf("[NodeManager] failed to mark the node %s as down, err: %s", m.config.HostName, err) + log.Errorf("[NodeManager] failed to remove node status of %v, err: %s", m.config.HostName, err) return err } return nil diff --git a/manager/node/manager_test.go b/manager/node/manager_test.go index 92232ee..692f64d 100644 --- a/manager/node/manager_test.go +++ b/manager/node/manager_test.go @@ -23,10 +23,9 @@ func newMockNodeManager(t *testing.T) *Manager { Stdout: true, }, HealthCheck: types.HealthCheckConfig{ - Interval: 10, - Timeout: 5, - CacheTTL: 300, - EnableSelfmon: true, + Interval: 10, + Timeout: 5, + CacheTTL: 300, }, GlobalConnectionTimeout: 5 * time.Second, } diff --git a/manager/workload/manager_test.go b/manager/workload/manager_test.go index 0527a16..2ec03b3 100644 --- a/manager/workload/manager_test.go +++ b/manager/workload/manager_test.go @@ -23,10 +23,9 @@ func newMockWorkloadManager(t *testing.T) *Manager { Stdout: true, }, HealthCheck: types.HealthCheckConfig{ - Interval: 10, - Timeout: 5, - CacheTTL: 300, - EnableSelfmon: true, + Interval: 10, + Timeout: 5, + CacheTTL: 300, }, GlobalConnectionTimeout: 5 * time.Second, } diff --git a/runtime/mocks/Runtime.go b/runtime/mocks/Runtime.go index f56f6ea..59f426d 100644 --- a/runtime/mocks/Runtime.go +++ b/runtime/mocks/Runtime.go @@ -1,4 +1,4 @@ -// Code generated by mockery 2.9.0. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. 
package mocks diff --git a/selfmon/node.go b/selfmon/node.go deleted file mode 100644 index a3b9ded..0000000 --- a/selfmon/node.go +++ /dev/null @@ -1,104 +0,0 @@ -package selfmon - -import ( - "context" - "io" - "time" - - "github.com/projecteru2/agent/types" - "github.com/projecteru2/agent/utils" - - log "github.com/sirupsen/logrus" -) - -func (m *Selfmon) initNodeStatus(ctx context.Context) { - log.Debug("[selfmon] init node status started") - nodes := make(chan *types.Node) - - go func() { - defer close(nodes) - // Get all nodes which are active status, and regardless of pod. - var podNodes []*types.Node - var err error - utils.WithTimeout(ctx, m.config.GlobalConnectionTimeout, func(ctx context.Context) { - podNodes, err = m.store.ListPodNodes(ctx, true, "", nil) - }) - if err != nil { - log.Errorf("[selfmon] get pod nodes failed %v", err) - return - } - - for _, n := range podNodes { - log.Debugf("[selfmon] watched %s/%s", n.Name, n.Endpoint) - nodes <- n - } - }() - - for n := range nodes { - status, err := m.store.GetNodeStatus(ctx, n.Name) - if err != nil { - status = &types.NodeStatus{ - Nodename: n.Name, - Podname: n.Podname, - Alive: false, - } - } - m.dealNodeStatusMessage(ctx, status) - } -} - -func (m *Selfmon) watchNodeStatus(ctx context.Context) { - for { - select { - case <-ctx.Done(): - log.Infof("[selfmon] %v stop watching node status", m.id) - return - default: - time.Sleep(time.Second) - go m.initNodeStatus(ctx) - if m.watch(ctx) != nil { - log.Debug("[selfmon] retry to watch node status") - time.Sleep(m.config.GlobalConnectionTimeout) - } - } - } -} - -func (m *Selfmon) watch(ctx context.Context) error { - messageChan, errChan := m.store.NodeStatusStream(ctx) - log.Debug("[selfmon] watch node status started") - defer log.Debug("[selfmon] stop watching node status") - - for { - select { - case message := <-messageChan: - go m.dealNodeStatusMessage(ctx, message) - case err := <-errChan: - if err == io.EOF { - log.Debug("[selfmon] server closed 
the stream") - return err - } - log.Debugf("[selfmon] read node status failed, err: %s", err) - return err - } - } -} - -func (m *Selfmon) dealNodeStatusMessage(ctx context.Context, message *types.NodeStatus) { - if message.Error != nil { - log.Errorf("[selfmon] deal with node status stream message failed %+v", message) - return - } - - // TODO maybe we need a distributed lock to control concurrency - var err error - utils.WithTimeout(ctx, m.config.GlobalConnectionTimeout, func(ctx context.Context) { - err = m.store.SetNode(ctx, message.Nodename, message.Alive) - }) - - if err != nil { - log.Errorf("[selfmon] set node %s failed %v", message.Nodename, err) - return - } - log.Debugf("[selfmon] set node %s as alive: %v", message.Nodename, message.Alive) -} diff --git a/selfmon/register.go b/selfmon/register.go deleted file mode 100644 index 2df4bf6..0000000 --- a/selfmon/register.go +++ /dev/null @@ -1,80 +0,0 @@ -package selfmon - -import ( - "context" - "time" - - coretypes "github.com/projecteru2/core/types" - - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" -) - -// WithActiveLock acquires the active lock synchronously -func (m *Selfmon) WithActiveLock(parentCtx context.Context, f func(ctx context.Context)) { - ctx, cancel := context.WithCancel(parentCtx) - defer cancel() - - var expiry <-chan struct{} - var unregister func() - defer func() { - if unregister != nil { - log.Infof("[Register] %v unregisters", m.id) - unregister() - } - }() - - for { - select { - case <-ctx.Done(): - log.Info("[Register] context canceled") - return - case <-m.Exit(): - log.Infof("[Register] selfmon %v closed", m.id) - return - default: - } - - // try to get the lock - if ne, un, err := m.register(ctx); err != nil { - if errors.Is(err, context.Canceled) { - log.Info("[Register] context canceled") - return - } else if !errors.Is(err, coretypes.ErrKeyExists) { - log.Errorf("[Register] failed to re-register: %v", err) - time.Sleep(time.Second) - continue - } - 
log.Infof("[Register] %v there has been another active selfmon", m.id) - time.Sleep(time.Second) - } else { - log.Infof("[Register] the agent %v has been active", m.id) - expiry = ne - unregister = un - break - } - } - - // cancel the ctx when: 1. selfmon closed 2. lost the active lock - go func() { - defer cancel() - - select { - case <-ctx.Done(): - log.Info("[Register] context canceled") - return - case <-m.Exit(): - log.Infof("[Register] selfmon %v closed", m.id) - return - case <-expiry: - log.Info("[Register] lock expired") - return - } - }() - - f(ctx) -} - -func (m *Selfmon) register(ctx context.Context) (<-chan struct{}, func(), error) { - return m.kv.StartEphemeral(ctx, ActiveKey, m.config.HAKeepaliveInterval) -} diff --git a/selfmon/selfmon.go b/selfmon/selfmon.go deleted file mode 100644 index 29816ac..0000000 --- a/selfmon/selfmon.go +++ /dev/null @@ -1,147 +0,0 @@ -package selfmon - -import ( - "context" - "os/signal" - "sync" - "syscall" - "time" - - "github.com/projecteru2/agent/common" - "github.com/projecteru2/agent/store" - corestore "github.com/projecteru2/agent/store/core" - storemocks "github.com/projecteru2/agent/store/mocks" - "github.com/projecteru2/agent/types" - coremeta "github.com/projecteru2/core/store/etcdv3/meta" - - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" -) - -// ActiveKey . -const ActiveKey = "/selfmon/active" - -// Selfmon . -type Selfmon struct { - config *types.Config - store store.Store - kv coremeta.KV - id int64 - - exit struct { - sync.Once - C chan struct{} - } -} - -// New . 
-func New(ctx context.Context, config *types.Config) (mon *Selfmon, err error) { - mon = &Selfmon{} - mon.config = config - mon.exit.C = make(chan struct{}, 1) - mon.id = time.Now().UnixNano() / 1000 % 10000 - - switch config.KV { - case common.ETCDKV: - if mon.kv, err = coremeta.NewETCD(config.Etcd, nil); err != nil { - log.Errorf("[selfmon] failed to get etcd client, err: %s", err) - return nil, err - } - case common.MocksKV: - log.Debug("[selfmon] use embedded ETCD") - mon.kv = nil - default: - return nil, errors.New("unknown kv type") - } - - switch config.Store { - case common.GRPCStore: - corestore.Init(ctx, config) - mon.store = corestore.Get() - if mon.store == nil { - log.Error("[selfmon] failed to get core store") - return nil, errors.New("failed to get core store") - } - case common.MocksStore: - mon.store = storemocks.FromTemplate() - default: - return nil, errors.New("unknown store type") - } - - return mon, nil -} - -// Monitor . -func (m *Selfmon) Monitor(ctx context.Context) { - go m.watchNodeStatus(ctx) - log.Infof("[selfmon] selfmon %v is running", m.id) - <-ctx.Done() - log.Warnf("[selfmon] m %v monitor stops", m.id) -} - -// Run . -func (m *Selfmon) Run(ctx context.Context) error { - go m.handleSignals(ctx) - - for { - select { - case <-ctx.Done(): - return nil - case <-m.Exit(): - return nil - default: - m.WithActiveLock(ctx, func(ctx context.Context) { - m.Monitor(ctx) - }) - } - } -} - -// Exit . -func (m *Selfmon) Exit() <-chan struct{} { - return m.exit.C -} - -// Close . -func (m *Selfmon) Close() { - m.exit.Do(func() { - close(m.exit.C) - }) -} - -// Reload . -func (m *Selfmon) Reload() error { - return nil -} - -// handleSignals . 
-func (m *Selfmon) handleSignals(ctx context.Context) { - var reloadCtx context.Context - var cancel1 context.CancelFunc - defer func() { - log.Warnf("[selfmon] %v signals handler exit", m.id) - cancel1() - m.Close() - }() - - reloadCtx, cancel1 = signal.NotifyContext(ctx, syscall.SIGHUP, syscall.SIGUSR2) - exitCtx, cancel2 := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - defer cancel2() - - for { - select { - case <-m.Exit(): - log.Warnf("[selfmon] recv from m %v exit ch", m.id) - return - case <-exitCtx.Done(): - log.Warn("[selfmon] recv signal to exit") - return - case <-reloadCtx.Done(): - log.Warn("[selfmon] recv signal to reload") - if err := m.Reload(); err != nil { - log.Errorf("[selfmon] reload %v failed %v", m.id, err) - } - reloadCtx, cancel1 = signal.NotifyContext(ctx, syscall.SIGHUP, syscall.SIGUSR2) - } - } -} diff --git a/selfmon/selfmon_test.go b/selfmon/selfmon_test.go deleted file mode 100644 index 0adf92e..0000000 --- a/selfmon/selfmon_test.go +++ /dev/null @@ -1,146 +0,0 @@ -package selfmon - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/projecteru2/agent/common" - storemocks "github.com/projecteru2/agent/store/mocks" - "github.com/projecteru2/agent/types" - "github.com/projecteru2/core/store/etcdv3/meta" - coretypes "github.com/projecteru2/core/types" - - "github.com/stretchr/testify/assert" -) - -func newMockSelfmon(t *testing.T, withETCD bool) *Selfmon { - ctx := context.Background() - config := &types.Config{ - HostName: "fake", - Store: common.MocksStore, - Runtime: common.MocksRuntime, - KV: common.MocksKV, - Log: types.LogConfig{ - Stdout: true, - }, - Etcd: coretypes.EtcdConfig{ - Machines: []string{"127.0.0.1:2379"}, - Prefix: "/selfmon-agent", - LockPrefix: "__lock__/selfmon-agent", - }, - GlobalConnectionTimeout: 5 * time.Second, - HAKeepaliveInterval: 16 * time.Second, - } - - m, err := New(ctx, config) - assert.Nil(t, err) - - if withETCD { - etcd, err := 
meta.NewETCD(config.Etcd, t) - assert.Nil(t, err) - m.kv = etcd - } - - return m -} - -func TestCloseTwice(t *testing.T) { - m := newMockSelfmon(t, false) - defer m.Close() - m.Close() - m.Close() - <-m.Exit() -} - -func TestEmbeddedETCD(t *testing.T) { - etcd, err := meta.NewETCD(coretypes.EtcdConfig{ - Machines: []string{"127.0.0.1:2379"}, - Prefix: "/selfmon-agent", - LockPrefix: "__lock__/selfmon-agent", - }, t) - assert.Nil(t, err) - - ctx := context.Background() - - _, un, err := etcd.StartEphemeral(ctx, "/test/key", 1*time.Second) - assert.Nil(t, err) - time.Sleep(5 * time.Second) - un() - - _, _, err = etcd.StartEphemeral(ctx, "/test/key", 1*time.Second) - assert.Nil(t, err) -} - -func TestRegisterTwice(t *testing.T) { - m1 := newMockSelfmon(t, false) - m2 := newMockSelfmon(t, false) - defer m1.Close() - defer m2.Close() - - // make sure m1 and m2 are using the same embedded ETCD - etcd, err := meta.NewETCD(coretypes.EtcdConfig{ - Machines: []string{"127.0.0.1:2379"}, - Prefix: "/selfmon-agent", - LockPrefix: "__lock__/selfmon-agent", - }, t) - assert.Nil(t, err) - - m1.kv = etcd - m2.kv = etcd - - ctx := context.Background() - - i := "" - - go m1.WithActiveLock(ctx, func(ctx context.Context) { - i += "a" - time.Sleep(5 * time.Second) // hold the lock for 3s - }) - time.Sleep(time.Second) - wg := &sync.WaitGroup{} - wg.Add(1) - go m2.WithActiveLock(ctx, func(ctx context.Context) { - defer wg.Done() - i += "b" - }) - wg.Wait() - assert.Equal(t, i, "ab") -} - -func TestRun(t *testing.T) { - m := newMockSelfmon(t, true) - defer m.Close() - - store := m.store.(*storemocks.MockStore) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - exit := make(chan struct{}) - - // set node "fake" as alive - assert.Nil(t, store.SetNodeStatus(ctx, 0)) - - go func() { - assert.Nil(t, m.Run(ctx)) - exit <- struct{}{} - }() - time.Sleep(2 * time.Second) - - node, _ := store.GetNode(ctx, "fake") - assert.Equal(t, node.Available, true) - node, _ = 
store.GetNode(ctx, "faker") - assert.Equal(t, node.Available, false) - - go store.StartNodeStatusStream() - time.Sleep(2 * time.Second) - - node, _ = store.GetNode(ctx, "fake") - assert.Equal(t, node.Available, false) - node, _ = store.GetNode(ctx, "faker") - assert.Equal(t, node.Available, true) - - store.StopNodeStatusStream() - m.Close() - <-exit -} diff --git a/store/core/node.go b/store/core/node.go index 8929ef5..c547a68 100644 --- a/store/core/node.go +++ b/store/core/node.go @@ -7,7 +7,6 @@ import ( "github.com/projecteru2/agent/types" "github.com/projecteru2/agent/utils" pb "github.com/projecteru2/core/rpc/gen" - coretypes "github.com/projecteru2/core/types" ) // GetNode return a node by core @@ -32,24 +31,6 @@ func (c *Store) GetNode(ctx context.Context, nodename string) (*types.Node, erro return node, nil } -// UpdateNode update node status -func (c *Store) UpdateNode(ctx context.Context, node *types.Node) error { - opts := &pb.SetNodeOptions{ - Nodename: node.Name, - StatusOpt: coretypes.TriFalse, - } - if node.Available { - opts.StatusOpt = coretypes.TriTrue - } - - var err error - utils.WithTimeout(ctx, c.config.GlobalConnectionTimeout, func(ctx context.Context) { - _, err = c.GetClient().SetNode(ctx, opts) - }) - - return err -} - // SetNodeStatus reports the status of node // SetNodeStatus always reports alive status, // when not alive, TTL will cause expiration of node @@ -66,25 +47,6 @@ func (c *Store) SetNodeStatus(ctx context.Context, ttl int64) error { return err } -// SetNode sets node -func (c *Store) SetNode(ctx context.Context, node string, status bool) error { - statusOpt := pb.TriOpt_TRUE - if !status { - statusOpt = pb.TriOpt_FALSE - } - - var err error - utils.WithTimeout(ctx, c.config.GlobalConnectionTimeout, func(ctx context.Context) { - _, err = c.GetClient().SetNode(ctx, &pb.SetNodeOptions{ - Nodename: node, - StatusOpt: statusOpt, - WorkloadsDown: !status, - }) - }) - - return err -} - // GetNodeStatus gets the status of node func 
(c *Store) GetNodeStatus(ctx context.Context, nodename string) (*types.NodeStatus, error) { var resp *pb.NodeStatusStreamMessage diff --git a/store/mocks/Store.go b/store/mocks/Store.go index 41131fe..add7304 100644 --- a/store/mocks/Store.go +++ b/store/mocks/Store.go @@ -1,4 +1,4 @@ -// Code generated by mockery 2.9.0. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks @@ -123,20 +123,6 @@ func (_m *Store) NodeStatusStream(ctx context.Context) (<-chan *types.NodeStatus return r0, r1 } -// SetNode provides a mock function with given fields: ctx, node, status -func (_m *Store) SetNode(ctx context.Context, node string, status bool) error { - ret := _m.Called(ctx, node, status) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, bool) error); ok { - r0 = rf(ctx, node, status) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // SetNodeStatus provides a mock function with given fields: ctx, ttl func (_m *Store) SetNodeStatus(ctx context.Context, ttl int64) error { ret := _m.Called(ctx, ttl) @@ -164,17 +150,3 @@ func (_m *Store) SetWorkloadStatus(ctx context.Context, status *types.WorkloadSt return r0 } - -// UpdateNode provides a mock function with given fields: ctx, node -func (_m *Store) UpdateNode(ctx context.Context, node *types.Node) error { - ret := _m.Called(ctx, node) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *types.Node) error); ok { - r0 = rf(ctx, node) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/store/mocks/template.go b/store/mocks/template.go index 050f984..521f5b7 100644 --- a/store/mocks/template.go +++ b/store/mocks/template.go @@ -6,9 +6,10 @@ import ( "fmt" "sync" + "github.com/stretchr/testify/mock" + "github.com/projecteru2/agent/store" "github.com/projecteru2/agent/types" - "github.com/stretchr/testify/mock" ) // MockStore . 
@@ -93,20 +94,6 @@ func FromTemplate() store.Store { return nil }) m.On("GetIdentifier", mock.Anything).Return("fake-identifier") - m.On("SetNode", mock.Anything, mock.Anything, mock.Anything).Return(func(ctx context.Context, node string, status bool) error { - fmt.Printf("[MockStore] set node %s as status: %v\n", node, status) - m.Lock() - defer m.Unlock() - if nodeInfo, ok := m.nodeInfo.Load(node); ok { - nodeInfo.(*types.Node).Available = status - } else { - m.nodeInfo.Store(node, &types.Node{ - Name: node, - Available: status, - }) - } - return nil - }) m.On("ListPodNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{ { Name: "fake", diff --git a/store/store.go b/store/store.go index 782c456..5efdd63 100644 --- a/store/store.go +++ b/store/store.go @@ -9,11 +9,9 @@ import ( // Store wrapper of remote calls type Store interface { GetNode(ctx context.Context, nodename string) (*types.Node, error) - UpdateNode(ctx context.Context, node *types.Node) error SetNodeStatus(ctx context.Context, ttl int64) error GetNodeStatus(ctx context.Context, nodename string) (*types.NodeStatus, error) SetWorkloadStatus(ctx context.Context, status *types.WorkloadStatus, ttl int64) error - SetNode(ctx context.Context, node string, status bool) error GetIdentifier(ctx context.Context) string NodeStatusStream(ctx context.Context) (<-chan *types.NodeStatus, <-chan error) ListPodNodes(ctx context.Context, all bool, podname string, labels map[string]string) ([]*types.Node, error) diff --git a/systemd/README.md b/systemd/README.md new file mode 100644 index 0000000..870b286 --- /dev/null +++ b/systemd/README.md @@ -0,0 +1,11 @@ +Deploy Eru-Agent With Systemd +===== + +You can use the file `eru-agent.service` to create systemd service. + +Restart with Systemd +----- + +If your systemd support `RestartKillSignal=`, simply use `systemctl restart eru-agent`. 
+ +If not, make sure to send the `SIGUSR1` signal to stop eru-agent first, then run `systemctl start eru-agent`. \ No newline at end of file diff --git a/eru-agent.service b/systemd/eru-agent.service similarity index 92% rename from eru-agent.service rename to systemd/eru-agent.service index 9497659..7d5f7d2 100644 --- a/eru-agent.service +++ b/systemd/eru-agent.service @@ -11,6 +11,7 @@ LimitNOFILE=10485760 LimitNPROC=10485760 LimitCORE=infinity MountFlags=slave +RestartKillSignal=SIGUSR1 [Install] WantedBy=multi-user.target diff --git a/types/config.go b/types/config.go index c97c13a..77a120c 100644 --- a/types/config.go +++ b/types/config.go @@ -43,10 +43,9 @@ type LogConfig struct { // HealthCheckConfig contain healthcheck config type HealthCheckConfig struct { - Interval int `yaml:"interval" default:"60"` - Timeout int `yaml:"timeout" default:"10"` - CacheTTL int `yaml:"cache_ttl" default:"300"` - EnableSelfmon bool `yaml:"enable_selfmon" default:"false"` + Interval int `yaml:"interval" default:"60"` + Timeout int `yaml:"timeout" default:"10"` + CacheTTL int `yaml:"cache_ttl" default:"300"` } // Config contain all configs @@ -54,13 +53,12 @@ type Config struct { PidFile string `yaml:"pid" default:"/tmp/agent.pid"` Core []string `yaml:"core" required:"true"` HostName string `yaml:"-"` - HeartbeatInterval int `yaml:"heartbeat_interval" default:"0"` + HeartbeatInterval int `yaml:"heartbeat_interval" default:"60"` CheckOnlyMine bool `yaml:"check_only_mine" default:"false"` Store string `yaml:"store" default:"grpc"` Runtime string `yaml:"runtime" default:"docker"` - KV string `yaml:"kv" default:"etcd"` Auth coretypes.AuthConfig `yaml:"auth"` Docker DockerConfig @@ -69,21 +67,15 @@ type Config struct { Metrics MetricsConfig API APIConfig `yaml:"api"` Log LogConfig - HealthCheck HealthCheckConfig `yaml:"healthcheck"` - Etcd coretypes.EtcdConfig `yaml:"etcd"` + HealthCheck HealthCheckConfig `yaml:"healthcheck"` GlobalConnectionTimeout time.Duration 
`yaml:"global_connection_timeout" default:"5s"` - HAKeepaliveInterval time.Duration `yaml:"ha_keepalive_interval" default:"16s"` } // GetHealthCheckStatusTTL returns the TTL for health check status. -// If selfmon is enabled, will return 0. -// Otherwise will use 2.5 * interval. +// Selfmon is now integrated into eru-core, so this always returns 0. func (config *Config) GetHealthCheckStatusTTL() int64 { - if config.HealthCheck.EnableSelfmon { - return 0 - } - return int64(2*config.HealthCheck.Interval + config.HealthCheck.Interval/2) + return 0 } // Prepare 从cli覆写并做准备 @@ -149,9 +141,6 @@ func (config *Config) Prepare(c *cli.Context) { if c.String("store") != "" { config.Store = c.String("store") } - if c.String("kv") != "" { - config.KV = c.String("kv") - } // validate if config.PidFile == "" { log.Fatal("need to set pidfile") diff --git a/types/config_test.go b/types/config_test.go index fd80569..4490d2b 100644 --- a/types/config_test.go +++ b/types/config_test.go @@ -22,15 +22,12 @@ func TestLoadConfig(t *testing.T) { assert.Equal(config.HealthCheck.Interval, 120) assert.Equal(config.HealthCheck.Timeout, 10) assert.Equal(config.HealthCheck.CacheTTL, 300) - assert.False(config.HealthCheck.EnableSelfmon) - assert.Equal(config.GetHealthCheckStatusTTL(), int64(300)) + assert.Equal(config.GetHealthCheckStatusTTL(), int64(0)) assert.Equal(config.Store, "grpc") assert.Equal(config.Runtime, "docker") - assert.Equal(config.KV, "etcd") assert.Equal(config.GlobalConnectionTimeout, time.Second*15) - assert.Equal(config.HAKeepaliveInterval, time.Second*16) config.Print() }