Skip to content

Commit

Permalink
validate config (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
tonicmuroq authored Jun 5, 2021
1 parent 2cc872a commit 0063b65
Show file tree
Hide file tree
Showing 17 changed files with 95 additions and 56 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ ifneq ($(KEEP_SYMBOL), 1)
GO_LDFLAGS += -s
endif

all: build

deps:
env GO111MODULE=on go mod download
env GO111MODULE=on go mod vendor
Expand Down
29 changes: 18 additions & 11 deletions agent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"bufio"
"bytes"
"fmt"
"os"

Expand All @@ -17,6 +19,7 @@ import (
"github.com/sethvargo/go-signalcontext"
log "github.com/sirupsen/logrus"
cli "github.com/urfave/cli/v2"
"gopkg.in/yaml.v3"
)

func setupLogLevel(l string) error {
Expand All @@ -37,16 +40,30 @@ func initConfig(c *cli.Context) *types.Config {
}

config.PrepareConfig(c)
printConfig(config)
return config
}

// printConfig logs the given configuration as YAML at info level, one log
// entry per line, framed by a header and a footer marker.
//
// A marshalling failure is reported through log.Fatalf. A failure while
// splitting the YAML into lines is reported at error level so that a
// truncated dump is never mistaken for the complete configuration.
func printConfig(c *types.Config) {
	bs, err := yaml.Marshal(c)
	if err != nil {
		log.Fatalf("[main] print config failed %v", err)
	}

	log.Info("---- current config ----")
	scanner := bufio.NewScanner(bytes.NewReader(bs))
	for scanner.Scan() {
		log.Info(scanner.Text())
	}
	// Scan returns false on error as well as on EOF; without this check an
	// over-long line would silently cut the output short.
	if err := scanner.Err(); err != nil {
		log.Errorf("[main] print config incomplete %v", err)
	}
	log.Info("------------------------")
}

func serve(c *cli.Context) error {
if err := setupLogLevel(c.String("log-level")); err != nil {
log.Fatal(err)
}

config := initConfig(c)
log.Debugf("[config] %v", config)
utils.WritePid(config.PidFile)
defer os.Remove(config.PidFile)

Expand Down Expand Up @@ -153,31 +170,21 @@ func main() {
},
&cli.IntFlag{
Name: "health-check-interval",
Value: 0,
Usage: "interval for agent to check container's health status",
EnvVars: []string{"ERU_AGENT_HEALTH_CHECK_INTERVAL"},
},
&cli.IntFlag{
Name: "health-check-status-ttl",
Value: 0,
Usage: "ttl for container's health status in remote store",
EnvVars: []string{"ERU_AGENT_HEALTH_CHECK_STATUS_TTL"},
},
&cli.IntFlag{
Name: "health-check-timeout",
Value: 0,
Usage: "timeout for agent to check container's health status",
EnvVars: []string{"ERU_AGENT_HEALTH_CHECK_TIMEOUT"},
},
&cli.IntFlag{
Name: "health-check-cache-ttl",
Value: 0,
Usage: "ttl for container's health status in local memory",
EnvVars: []string{"ERU_AGENT_HEALTH_CHECK_CACHE_TTL"},
},
&cli.IntFlag{
Name: "heartbeat-interval",
Value: 0,
Usage: "interval for agent to send heartbeat to core",
EnvVars: []string{"ERU_AGENT_HEARTBEAT_INTERVAL"},
},
Expand Down
9 changes: 6 additions & 3 deletions agent.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,24 @@ auth:

docker:
endpoint: unix:///var/run/docker.sock # required

metrics:
step: 30 # required
transfers:
- 127.0.0.1:8125
api:
addr: 127.0.0.1:12345

log:
forwards:
- tcp://127.0.0.1:5144
stdout: False
stdout: false

healthcheck:
interval: 15 # required
status_ttl: 0 # 0 forever
interval: 120 # required
timeout: 10
cache_ttl: 300
enable_selfmon: false

global_connection_timeout: 15s

Expand Down
2 changes: 1 addition & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (e *Engine) crash() error {
container.Healthy = false
ctx, cancel := context.WithTimeout(context.Background(), e.config.GlobalConnectionTimeout)
defer cancel()
if err := e.store.SetContainerStatus(ctx, container, e.node); err != nil {
if err := e.store.SetContainerStatus(ctx, container, e.node, e.config.GetHealthCheckStatusTTL()); err != nil {
return err
}
log.Infof("[crash] mark %s unhealthy", coreutils.ShortID(container.ID))
Expand Down
2 changes: 1 addition & 1 deletion engine/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (e *Engine) checkOneContainer(container *types.Container) {

cctx, cancel := context.WithTimeout(context.Background(), e.config.GlobalConnectionTimeout)
defer cancel()
if err := e.store.SetContainerStatus(cctx, container, e.node); err != nil {
if err := e.store.SetContainerStatus(cctx, container, e.node, e.config.GetHealthCheckStatusTTL()); err != nil {
log.Errorf("[checkOneContainer] update deploy status failed %v", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion engine/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestCheckAllContainers(t *testing.T) {

e := mockNewEngine()
mockStore := e.store.(*mocks.Store)
mockStore.On("SetContainerStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockStore.On("SetContainerStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
e.checkAllContainers()

time.Sleep(1 * time.Second)
Expand Down
2 changes: 1 addition & 1 deletion engine/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (e *Engine) load() error {

ctx, cancel := context.WithTimeout(context.Background(), e.config.GlobalConnectionTimeout)
defer cancel()
if err := e.store.SetContainerStatus(ctx, c, e.node); err != nil {
if err := e.store.SetContainerStatus(ctx, c, e.node, e.config.GetHealthCheckStatusTTL()); err != nil {
log.Errorf("[load] update deploy status failed %v", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion engine/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestLoad(t *testing.T) {
n := new(coretypes.Node)
mockStore.On("GetNode", mock.AnythingOfType("string")).Return(n, nil)
mockStore.On("UpdateNode", mock.Anything).Return(nil)
mockStore.On("SetContainerStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockStore.On("SetContainerStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

err := e.load()
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions engine/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (e *Engine) handleContainerStart(event eventtypes.Message) {

// 发现需要 health check 立刻执行
if container.Healthy {
if err := e.store.SetContainerStatus(context.Background(), container, e.node); err != nil {
if err := e.store.SetContainerStatus(context.Background(), container, e.node, e.config.GetHealthCheckStatusTTL()); err != nil {
log.Errorf("[handleContainerStart] update deploy status failed %v", err)
}
} else {
Expand All @@ -58,7 +58,7 @@ func (e *Engine) handleContainerDie(event eventtypes.Message) {
container, err := e.detectContainer(event.ID)
if err != nil {
log.Errorf("[handleContainerDie] detect container failed %v", err)
} else if err := e.store.SetContainerStatus(context.Background(), container, e.node); err != nil {
} else if err := e.store.SetContainerStatus(context.Background(), container, e.node, e.config.GetHealthCheckStatusTTL()); err != nil {
log.Errorf("[handleContainerDie] update deploy status failed %v", err)
}
}
2 changes: 1 addition & 1 deletion engine/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestMonitor(t *testing.T) {
n := new(coretypes.Node)
mockStore.On("GetNode", mock.AnythingOfType("string")).Return(n, nil)
mockStore.On("UpdateNode", mock.Anything).Return(nil)
mockStore.On("SetContainerStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockStore.On("SetContainerStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

go e.monitor(eventChan)
time.Sleep(3 * time.Second)
Expand Down
8 changes: 6 additions & 2 deletions engine/status_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (
log "github.com/sirupsen/logrus"
)

// heartbeat creates a new goroutine to report status every NodeStatusInterval seconds
// by default it will be 180s
// heartbeat creates a new goroutine to report status every HeartbeatInterval seconds
// HeartbeatInterval defaults to 0, which disables the heartbeat.
func (e *Engine) heartbeat(ctx context.Context) {
if e.config.HeartbeatInterval <= 0 {
return
}

tick := time.NewTicker(time.Duration(e.config.HeartbeatInterval) * time.Second)
defer tick.Stop()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ require (
go.etcd.io/etcd/v3 v3.3.0-rc.0.0.20200925060232-add86bbd1a7a
go.uber.org/automaxprocs v1.3.0
google.golang.org/grpc v1.29.1
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c
gotest.tools/v3 v3.0.3 // indirect
)
6 changes: 3 additions & 3 deletions store/core/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
)

// SetContainerStatus deploy containers
func (c *CoreStore) SetContainerStatus(ctx context.Context, container *types.Container, node *coretypes.Node) error {
if c.config.HealthCheck.StatusTTL == 0 {
func (c *CoreStore) SetContainerStatus(ctx context.Context, container *types.Container, node *coretypes.Node, ttl int64) error {
if ttl == 0 {
status := fmt.Sprintf("%s|%v|%v", container.ID, container.Running, container.Healthy)
cached, ok := c.cache.Get(container.ID)
c.cache.Set(container.ID, status, time.Duration(c.config.HealthCheck.CacheTTL)*time.Second)
Expand All @@ -36,7 +36,7 @@ func (c *CoreStore) SetContainerStatus(ctx context.Context, container *types.Con
Healthy: container.Healthy,
Networks: container.Networks,
Extension: bytes,
Ttl: int64(2*c.config.HealthCheck.StatusTTL + c.config.HealthCheck.StatusTTL/2),
Ttl: ttl,
}

opts := &pb.SetWorkloadsStatusOptions{
Expand Down
12 changes: 6 additions & 6 deletions store/mocks/Store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Store interface {
UpdateNode(node *coretypes.Node) error

SetNodeStatus(context.Context, int64) error
SetContainerStatus(context.Context, *types.Container, *coretypes.Node) error
SetContainerStatus(context.Context, *types.Container, *coretypes.Node, int64) error

GetCoreIdentifier() string
}
39 changes: 18 additions & 21 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type DockerConfig struct {

// MetricsConfig contain metrics config
type MetricsConfig struct {
Step int64 `yaml:"step" required:"true" default:"10"`
Step int64 `yaml:"step" default:"10"`
Transfers []string `yaml:"transfers"`
}

Expand All @@ -33,18 +33,18 @@ type LogConfig struct {

// HealthCheckConfig contain healthcheck config
type HealthCheckConfig struct {
Interval int `yaml:"interval" required:"true" default:"15"`
StatusTTL int `yaml:"status_ttl"`
Timeout int `yaml:"timeout" default:"10"`
CacheTTL int `yaml:"cache_ttl" default:"300"`
Interval int `yaml:"interval" default:"60"`
Timeout int `yaml:"timeout" default:"10"`
CacheTTL int `yaml:"cache_ttl" default:"300"`
EnableSelfmon bool `yaml:"enable_selfmon" default:"false"`
}

// Config contain all configs
type Config struct {
PidFile string `yaml:"pid" required:"true" default:"/tmp/agent.pid"`
PidFile string `yaml:"pid" default:"/tmp/agent.pid"`
Core string `yaml:"core" required:"true"`
HostName string `yaml:"-"`
HeartbeatInterval int `yaml:"heartbeat_interval" default:"180"`
HeartbeatInterval int `yaml:"heartbeat_interval" default:"0"`

CheckOnlyMine bool `yaml:"check_only_mine" default:"false"`

Expand All @@ -59,6 +59,16 @@ type Config struct {
GlobalConnectionTimeout time.Duration `yaml:"global_connection_timeout" default:"5s"`
}

// GetHealthCheckStatusTTL reports the TTL to attach to a health check status.
// With selfmon enabled the TTL is 0; in every other case it is 2.5 times the
// health check interval, computed with integer arithmetic (the half is
// truncated).
func (config *Config) GetHealthCheckStatusTTL() int64 {
	hc := config.HealthCheck
	if !hc.EnableSelfmon {
		interval := hc.Interval
		return int64(interval*2 + interval/2)
	}
	return 0
}

// PrepareConfig overrides config values from the CLI and prepares the config for use
func (config *Config) PrepareConfig(c *cli.Context) {
if c.String("hostname") != "" {
Expand Down Expand Up @@ -89,11 +99,6 @@ func (config *Config) PrepareConfig(c *cli.Context) {
if c.Int("health-check-interval") > 0 {
config.HealthCheck.Interval = c.Int("health-check-interval")
}
// status ttl can be 0
// but we need to check if it's set
if c.IsSet("health-check-status-ttl") {
config.HealthCheck.StatusTTL = c.Int("health-check-status-ttl")
}
if c.Int("health-check-timeout") > 0 {
config.HealthCheck.Timeout = c.Int("health-check-timeout")
}
Expand Down Expand Up @@ -125,21 +130,13 @@ func (config *Config) PrepareConfig(c *cli.Context) {
if config.PidFile == "" {
log.Fatal("need to set pidfile")
}
if config.HeartbeatInterval == 0 {
config.HeartbeatInterval = 180
}
if config.HealthCheck.Interval == 0 {
config.HealthCheck.Interval = 15
log.Fatal("healthcheck.interval == 0, this is not allowed")
}
if config.HealthCheck.Timeout == 0 {
config.HealthCheck.Timeout = 10
}
if config.HealthCheck.CacheTTL == 0 {
config.HealthCheck.CacheTTL = 300
}
// status ttl cannot be less than health check interval
// unless it's intended to be 0.
if config.HealthCheck.StatusTTL > 0 && config.HealthCheck.StatusTTL < config.HealthCheck.Interval {
config.HealthCheck.StatusTTL = config.HealthCheck.Interval
}
}
26 changes: 26 additions & 0 deletions types/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package types

import (
"testing"

"github.com/jinzhu/configor"
"github.com/stretchr/testify/assert"
)

// TestLoadConfig loads ../agent.yaml.sample through configor and checks the
// resulting Config fields as well as the status TTL derived from them.
func TestLoadConfig(t *testing.T) {
	assert := assert.New(t)

	config := &Config{}
	err := configor.Load(config, "../agent.yaml.sample")
	assert.NoError(err)

	// testify's Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	assert.Equal("/tmp/agent.pid", config.PidFile)
	assert.Equal("127.0.0.1:5001", config.Core)
	assert.Equal("", config.HostName)
	assert.Equal(120, config.HeartbeatInterval)

	assert.Equal(120, config.HealthCheck.Interval)
	assert.Equal(10, config.HealthCheck.Timeout)
	assert.Equal(300, config.HealthCheck.CacheTTL)
	assert.False(config.HealthCheck.EnableSelfmon)
	assert.Equal(int64(300), config.GetHealthCheckStatusTTL())
}

0 comments on commit 0063b65

Please sign in to comment.