From 16ae194951b90d297f71568b4e28094ae9ec1b47 Mon Sep 17 00:00:00 2001 From: DuodenumL Date: Wed, 8 Sep 2021 15:47:58 +0800 Subject: [PATCH] Add "HAKeepaliveInterval" in config --- agent.go | 6 +++--- agent.yaml.sample | 5 +++++ api/http.go | 2 +- manager/workload/log.go | 4 ++-- manager/workload/log_test.go | 2 +- manager/workload/manager.go | 4 ++-- selfmon/register.go | 2 +- selfmon/selfmon_test.go | 1 + types/config.go | 1 + types/config_test.go | 4 ++++ 10 files changed, 21 insertions(+), 10 deletions(-) diff --git a/agent.go b/agent.go index 40a4138..208461c 100644 --- a/agent.go +++ b/agent.go @@ -121,7 +121,7 @@ func main() { }, &cli.StringFlag{ Name: "store", - Value: "grpc", + Value: "", Usage: "store type", EnvVars: []string{"ERU_AGENT_STORE"}, }, @@ -145,7 +145,7 @@ func main() { }, &cli.StringFlag{ Name: "runtime", - Value: "docker", + Value: "", Usage: "runtime type", EnvVars: []string{"ERU_AGENT_RUNTIME"}, }, @@ -224,7 +224,7 @@ func main() { }, &cli.StringFlag{ Name: "kv", - Value: "etcd", + Value: "", Usage: "kv type", EnvVars: []string{"ERU_AGENT_KV"}, }, diff --git a/agent.yaml.sample b/agent.yaml.sample index 8eb92e5..140e548 100644 --- a/agent.yaml.sample +++ b/agent.yaml.sample @@ -123,6 +123,11 @@ healthcheck: # The default value is "5s", note that "s" in the end. global_connection_timeout: 15s +# ha_keepalive_interval defines the time interval for sending heartbeat +# when selfmon maintains its own active state. +# The default value is "16s", note that "s" in the end. +ha_keepalive_interval: 16s + # etcd defines the etcd configuration. # This option is required and has no default value. # If you don't plan to run this eru-agent in selfmon mode, diff --git a/api/http.go b/api/http.go index c0d04ee..f791433 100644 --- a/api/http.go +++ b/api/http.go @@ -59,7 +59,7 @@ func (h *Handler) log(w http.ResponseWriter, req *http.Request) { return } defer conn.Close() - h.workloadManager.Subscribe(app, buf) + h.workloadManager.Subscribe(req.Context(), app, buf) } } diff --git a/manager/workload/log.go b/manager/workload/log.go index cb1ee80..96b26ca 100644 --- a/manager/workload/log.go +++ b/manager/workload/log.go @@ -31,13 +31,13 @@ func newLogBroadcaster() *logBroadcaster { } // subscribe subscribes logs of the specific app. -func (l *logBroadcaster) subscribe(app string, buf *bufio.ReadWriter) { +func (l *logBroadcaster) subscribe(ctx context.Context, app string, buf *bufio.ReadWriter) { if _, ok := l.subscribers[app]; !ok { l.subscribers[app] = map[string]*subscriber{} } ID := coreutils.RandomString(8) - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(ctx) defer cancel() l.subscribers[app][ID] = &subscriber{buf, cancel} diff --git a/manager/workload/log_test.go b/manager/workload/log_test.go index 5a77f6e..7f68d4a 100644 --- a/manager/workload/log_test.go +++ b/manager/workload/log_test.go @@ -32,7 +32,7 @@ func TestLogBroadcaster(t *testing.T) { return } defer conn.Close() - l.subscribe(app, buf) + l.subscribe(context.TODO(), app, buf) } } diff --git a/manager/workload/manager.go b/manager/workload/manager.go index 2813ee4..96e5706 100644 --- a/manager/workload/manager.go +++ b/manager/workload/manager.go @@ -119,6 +119,6 @@ func (m *Manager) Run(ctx context.Context) error { } // Subscribe subscribes logs -func (m *Manager) Subscribe(app string, buf *bufio.ReadWriter) { - m.logBroadcaster.subscribe(app, buf) +func (m *Manager) Subscribe(ctx context.Context, app string, buf *bufio.ReadWriter) { + m.logBroadcaster.subscribe(ctx, app, buf) } diff --git a/selfmon/register.go b/selfmon/register.go index 44c8011..48c31a6 100644 --- a/selfmon/register.go +++ b/selfmon/register.go @@ -76,5 +76,5 @@ func (m *Selfmon) WithActiveLock(parentCtx context.Context, f func(ctx context.C } func (m *Selfmon) register(ctx context.Context) (<-chan struct{}, func(), error) { - return m.kv.StartEphemeral(ctx, ActiveKey, time.Second*16) + return m.kv.StartEphemeral(ctx, ActiveKey, m.config.HAKeepaliveInterval) } diff --git a/selfmon/selfmon_test.go b/selfmon/selfmon_test.go index 986f76f..b2ba16a 100644 --- a/selfmon/selfmon_test.go +++ b/selfmon/selfmon_test.go @@ -30,6 +30,7 @@ func newMockSelfmon(t *testing.T, withETCD bool) *Selfmon { LockPrefix: "__lock__/selfmon-agent", }, GlobalConnectionTimeout: 5 * time.Second, + HAKeepaliveInterval: 16 * time.Second, } m, err := New(ctx, config) diff --git a/types/config.go b/types/config.go index 42ed778..2fd8f36 100644 --- a/types/config.go +++ b/types/config.go @@ -65,6 +65,7 @@ type Config struct { Etcd coretypes.EtcdConfig `yaml:"etcd"` GlobalConnectionTimeout time.Duration `yaml:"global_connection_timeout" default:"5s"` + HAKeepaliveInterval time.Duration `yaml:"ha_keepalive_interval" default:"16s"` } // GetHealthCheckStatusTTL returns the TTL for health check status. diff --git a/types/config_test.go b/types/config_test.go index dfe909d..51d062e 100644 --- a/types/config_test.go +++ b/types/config_test.go @@ -2,6 +2,7 @@ package types import ( "testing" + "time" "github.com/jinzhu/configor" "github.com/stretchr/testify/assert" @@ -27,4 +28,7 @@ func TestLoadConfig(t *testing.T) { assert.Equal(config.Store, "grpc") assert.Equal(config.Runtime, "docker") assert.Equal(config.KV, "etcd") + + assert.Equal(config.GlobalConnectionTimeout, time.Second * 15) + assert.Equal(config.HAKeepaliveInterval, time.Second * 16) }