Skip to content

Commit

Permalink
Add "HAKeepaliveInterval" in config
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL authored and CMGS committed Sep 8, 2021
1 parent 214a9e8 commit 16ae194
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 10 deletions.
6 changes: 3 additions & 3 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func main() {
},
&cli.StringFlag{
Name: "store",
Value: "grpc",
Value: "",
Usage: "store type",
EnvVars: []string{"ERU_AGENT_STORE"},
},
Expand All @@ -145,7 +145,7 @@ func main() {
},
&cli.StringFlag{
Name: "runtime",
Value: "docker",
Value: "",
Usage: "runtime type",
EnvVars: []string{"ERU_AGENT_RUNTIME"},
},
Expand Down Expand Up @@ -224,7 +224,7 @@ func main() {
},
&cli.StringFlag{
Name: "kv",
Value: "etcd",
Value: "",
Usage: "kv type",
EnvVars: []string{"ERU_AGENT_KV"},
},
Expand Down
5 changes: 5 additions & 0 deletions agent.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ healthcheck:
# The default value is "5s", note that "s" in the end.
global_connection_timeout: 15s

# ha_keepalive_interval defines the time interval for sending heartbeat
# when selfmon maintains its own active state.
# The default value is "16s", note that "s" in the end.
ha_keepalive_interval: 16s

# etcd defines the etcd configuration.
# This option is required and has no default value.
# If you don't plan to run this eru-agent in selfmon mode,
Expand Down
2 changes: 1 addition & 1 deletion api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (h *Handler) log(w http.ResponseWriter, req *http.Request) {
return
}
defer conn.Close()
h.workloadManager.Subscribe(app, buf)
h.workloadManager.Subscribe(req.Context(), app, buf)
}
}

Expand Down
4 changes: 2 additions & 2 deletions manager/workload/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ func newLogBroadcaster() *logBroadcaster {
}

// subscribe subscribes logs of the specific app.
func (l *logBroadcaster) subscribe(app string, buf *bufio.ReadWriter) {
func (l *logBroadcaster) subscribe(ctx context.Context, app string, buf *bufio.ReadWriter) {
if _, ok := l.subscribers[app]; !ok {
l.subscribers[app] = map[string]*subscriber{}
}

ID := coreutils.RandomString(8)
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(ctx)
defer cancel()

l.subscribers[app][ID] = &subscriber{buf, cancel}
Expand Down
2 changes: 1 addition & 1 deletion manager/workload/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestLogBroadcaster(t *testing.T) {
return
}
defer conn.Close()
l.subscribe(app, buf)
l.subscribe(context.TODO(), app, buf)
}
}

Expand Down
4 changes: 2 additions & 2 deletions manager/workload/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,6 @@ func (m *Manager) Run(ctx context.Context) error {
}

// Subscribe subscribes logs
func (m *Manager) Subscribe(app string, buf *bufio.ReadWriter) {
m.logBroadcaster.subscribe(app, buf)
func (m *Manager) Subscribe(ctx context.Context, app string, buf *bufio.ReadWriter) {
m.logBroadcaster.subscribe(ctx, app, buf)
}
2 changes: 1 addition & 1 deletion selfmon/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,5 @@ func (m *Selfmon) WithActiveLock(parentCtx context.Context, f func(ctx context.C
}

func (m *Selfmon) register(ctx context.Context) (<-chan struct{}, func(), error) {
return m.kv.StartEphemeral(ctx, ActiveKey, time.Second*16)
return m.kv.StartEphemeral(ctx, ActiveKey, m.config.HAKeepaliveInterval)
}
1 change: 1 addition & 0 deletions selfmon/selfmon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func newMockSelfmon(t *testing.T, withETCD bool) *Selfmon {
LockPrefix: "__lock__/selfmon-agent",
},
GlobalConnectionTimeout: 5 * time.Second,
HAKeepaliveInterval: 16 * time.Second,
}

m, err := New(ctx, config)
Expand Down
1 change: 1 addition & 0 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Config struct {
Etcd coretypes.EtcdConfig `yaml:"etcd"`

GlobalConnectionTimeout time.Duration `yaml:"global_connection_timeout" default:"5s"`
HAKeepaliveInterval time.Duration `yaml:"ha_keepalive_interval" default:"16s"`
}

// GetHealthCheckStatusTTL returns the TTL for health check status.
Expand Down
4 changes: 4 additions & 0 deletions types/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"testing"
"time"

"github.com/jinzhu/configor"
"github.com/stretchr/testify/assert"
Expand All @@ -27,4 +28,7 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(config.Store, "grpc")
assert.Equal(config.Runtime, "docker")
assert.Equal(config.KV, "etcd")

assert.Equal(config.GlobalConnectionTimeout, time.Second * 15)
assert.Equal(config.HAKeepaliveInterval, time.Second * 16)
}

0 comments on commit 16ae194

Please sign in to comment.