diff --git a/.github/workflows/dockerimage.yml b/.github/workflows/dockerimage.yml index bfb6253..d496013 100644 --- a/.github/workflows/dockerimage.yml +++ b/.github/workflows/dockerimage.yml @@ -48,5 +48,4 @@ jobs: username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} build_args: KEEP_SYMBOL=1 - repository: projecteru2/agent tags: ${{ github.sha }}-debug diff --git a/Makefile b/Makefile index 92c672f..dded906 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ binary: unit-test: go vet `go list ./... | grep -v '/vendor/'` - go test -race -count=1 -cover ./logs/... \ + go test -race -count=1 -timeout 240s -cover ./logs/... \ ./manager/node/... \ ./manager/workload/... \ ./selfmon/... \ diff --git a/logs/writer.go b/logs/writer.go index 89bbb3d..e38b911 100644 --- a/logs/writer.go +++ b/logs/writer.go @@ -65,6 +65,8 @@ func NewWriter(ctx context.Context, addr string, stdout bool) (writer *Writer, e case err == common.ErrInvalidScheme: log.Infof("[writer] create an empty writer for %s success", addr) writer.enc = NewStreamEncoder(discard{}) + case err == errJournalDisabled: + return nil, err case err != nil: log.Errorf("[writer] failed to create writer encoder for %s, err: %v, will retry", addr, err) writer.needReconnect = true diff --git a/logs/writer_test.go b/logs/writer_test.go index b9c19d6..302b9d7 100644 --- a/logs/writer_test.go +++ b/logs/writer_test.go @@ -4,6 +4,7 @@ import ( "context" "net" "testing" + "time" "github.com/projecteru2/agent/types" @@ -17,13 +18,6 @@ func TestNewWriterWithUDP(t *testing.T) { addr := "udp://127.0.0.1:23456" w, err := NewWriter(ctx, addr, true) assert.NoError(t, err) - - enc, err := w.createUDPEncoder() - assert.NoError(t, err) - - w.withLock(func() { - w.enc = enc - }) assert.NoError(t, w.Write(&types.Log{})) } @@ -31,42 +25,88 @@ func TestNewWriterWithTCP(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // tcp writer - addr := "tcp://127.0.0.1:34567" - tcpL, err := net.Listen("tcp", ":34567") - assert.NoError(t, err) + tcpL, err := net.Listen("tcp", ":34567") defer tcpL.Close() + addr := "tcp://127.0.0.1:34567" w, err := NewWriter(ctx, addr, true) assert.NoError(t, err) + assert.NoError(t, w.Write(&types.Log{})) +} - enc, err := w.createTCPEncoder() +func TestNewWriterWithJournal(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + addr := "journal://system" + enc, err := CreateJournalEncoder() + if err == errJournalDisabled { + return + } assert.NoError(t, err) + defer enc.Close() - w.withLock(func() { - w.enc = enc + w, err := NewWriter(ctx, addr, true) + assert.NoError(t, err) + + w.enc = enc + err = w.enc.Encode(&types.Log{ + ID: "id", + Name: "name", + Type: "type", + EntryPoint: "entrypoint", + Ident: "ident", + Data: "data", + Datetime: "datetime", + Extra: map[string]string{"a": "1", "b": "2"}, }) - assert.NoError(t, w.Write(&types.Log{})) + assert.NoError(t, err) +} + +func TestNewWriters(t *testing.T) { + cases := map[string]error{ + Discard: nil, + "udp://127.0.0.1:23456": nil, + "tcp://127.0.0.1:34567": nil, + "journal://system": errJournalDisabled, + "invalid://hhh": nil, + } + tcpL, err := net.Listen("tcp", ":34567") + assert.NoError(t, err) + defer tcpL.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for addr, expectedErr := range cases { + go func(addr string, expectedErr error) { + writer, err := NewWriter(ctx, addr, false) + assert.Equal(t, err, expectedErr) + if expectedErr != nil { + return + } + assert.NoError(t, err) + err = writer.Write(&types.Log{}) + assert.NoError(t, err) + }(addr, expectedErr) + } + // wait for closing all writers + time.Sleep(CloseWaitInterval + 2*time.Second) } -// func TestNewWriterWithJournal(t *testing.T) { -// addr := "journal://system" -// enc, err := CreateJournalEncoder() -// assert.NoError(t, err) -// defer enc.Close() - -// w, err := NewWriter(addr, true) -// assert.NoError(t, err) - -// w.enc = enc -// err = w.enc.Encode(&types.Log{ -// ID: "id", -// Name: "name", -// Type: "type", -// EntryPoint: "entrypoint", -// Ident: "ident", -// Data: "data", -// Datetime: "datetime", -// Extra: map[string]string{"a": "1", "b": "2"}, -// }) -// assert.NoError(t, err) -// } +func TestReconnect(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addr := "tcp://127.0.0.1:34567" + writer, err := NewWriter(ctx, addr, false) + assert.NoError(t, err) + assert.Nil(t, writer.enc) + assert.Equal(t, writer.needReconnect, true) + + tcpL, err := net.Listen("tcp", ":34567") + assert.NoError(t, err) + defer tcpL.Close() + + writer.reconnect() + assert.NoError(t, writer.Write(&types.Log{})) +} diff --git a/manager/node/heartbeat_test.go b/manager/node/heartbeat_test.go index f1b2e8f..fa7abf9 100644 --- a/manager/node/heartbeat_test.go +++ b/manager/node/heartbeat_test.go @@ -3,6 +3,7 @@ package node import ( "context" "testing" + "time" runtimemocks "github.com/projecteru2/agent/runtime/mocks" storemocks "github.com/projecteru2/agent/store/mocks" @@ -11,7 +12,8 @@ import ( ) func TestNodeStatusReport(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() manager := newMockNodeManager(t) runtime := manager.runtimeClient.(*runtimemocks.Nerv) store := manager.store.(*storemocks.MockStore) @@ -28,3 +30,21 @@ func TestNodeStatusReport(t *testing.T) { assert.Nil(t, err) assert.Equal(t, status.Alive, true) } + +func TestHeartbeat(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + manager := newMockNodeManager(t) + store := manager.store.(*storemocks.MockStore) + + status, err := store.GetNodeStatus(ctx, "fake") + assert.Nil(t, err) + assert.Equal(t, status.Alive, false) + + go manager.heartbeat(ctx) + + time.Sleep(time.Duration(manager.config.HeartbeatInterval+2) * time.Second) + status, err = store.GetNodeStatus(ctx, "fake") + assert.Nil(t, err) + assert.Equal(t, status.Alive, true) +} diff --git a/manager/node/manager_test.go b/manager/node/manager_test.go index c126af8..92232ee 100644 --- a/manager/node/manager_test.go +++ b/manager/node/manager_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/projecteru2/agent/common" + storemocks "github.com/projecteru2/agent/store/mocks" "github.com/projecteru2/agent/types" "github.com/stretchr/testify/assert" @@ -34,3 +35,28 @@ func newMockNodeManager(t *testing.T) *Manager { assert.Nil(t, err) return m } + +func TestRun(t *testing.T) { + manager := newMockNodeManager(t) + store := manager.store.(*storemocks.MockStore) + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(manager.config.HeartbeatInterval*3)*time.Second) + defer cancel() + + status, err := store.GetNodeStatus(ctx, "fake") + assert.Nil(t, err) + assert.Equal(t, status.Alive, false) + + go func() { + time.Sleep(time.Duration(manager.config.HeartbeatInterval*2) * time.Second) + status, err := store.GetNodeStatus(ctx, "fake") + assert.Nil(t, err) + assert.Equal(t, status.Alive, true) + }() + + assert.Nil(t, manager.Run(ctx)) + + info, err := store.GetNode(ctx, "fake") + assert.Nil(t, err) + assert.Equal(t, info.Available, false) +} diff --git a/manager/workload/attach.go b/manager/workload/attach.go index 37c65da..52213ad 100644 --- a/manager/workload/attach.go +++ b/manager/workload/attach.go @@ -96,7 +96,6 @@ func (m *Manager) attach(ctx context.Context, ID string) { } if err := writer.Write(l); err != nil && !(entryPoint == "agent" && utils.IsDockerized()) { log.Errorf("[attach] %s workload %s_%s write failed %v", workloadName, entryPoint, ID, err) - log.Errorf("[attach] %s", data) } } } diff --git a/manager/workload/load.go b/manager/workload/load.go index a462f90..d6450e2 100644 --- a/manager/workload/load.go +++ b/manager/workload/load.go @@ -31,7 +31,6 @@ func (m *Manager) initWorkloadStatus(ctx context.Context) error { go m.attach(ctx, ID) } - // no health check here if err := m.setWorkloadStatus(ctx, workloadStatus); err != nil { log.Errorf("[initWorkloadStatus] update workload %v status failed %v", ID, err) } diff --git a/manager/workload/log.go b/manager/workload/log.go index dce8525..1e53680 100644 --- a/manager/workload/log.go +++ b/manager/workload/log.go @@ -128,7 +128,6 @@ func (l *logBroadcaster) broadcast(log *types.Log) { sub.errChan <- err return } - sub.buf.Flush() }(ID, sub) } diff --git a/manager/workload/log_test.go b/manager/workload/log_test.go index 19902cf..6bd18f1 100644 --- a/manager/workload/log_test.go +++ b/manager/workload/log_test.go @@ -18,30 +18,36 @@ func TestLogBroadcaster(t *testing.T) { manager := newMockWorkloadManager(t) logrus.SetLevel(logrus.DebugLevel) + logCtx, logCancel := context.WithCancel(context.Background()) + defer logCancel() + handler := func(w http.ResponseWriter, req *http.Request) { app := req.URL.Query().Get("app") if app == "" { w.WriteHeader(http.StatusBadRequest) return } - // fuck httpie w.WriteHeader(http.StatusOK) if hijack, ok := w.(http.Hijacker); ok { conn, buf, err := hijack.Hijack() - assert.Nil(t, err) + if err != nil { + return + } defer conn.Close() - manager.PullLog(req.Context(), app, buf) + manager.PullLog(logCtx, app, buf) } } + server := &http.Server{Addr: ":12310"} go func() { restfulAPIServer := pat.New() restfulAPIServer.Add("GET", "/log/", http.HandlerFunc(handler)) - http.Handle("/", restfulAPIServer) - http.ListenAndServe(":12310", nil) + server.Handler = restfulAPIServer + assert.Equal(t, server.ListenAndServe(), http.ErrServerClosed) }() go func() { + // wait for subscribers time.Sleep(3 * time.Second) manager.logBroadcaster.logC <- &types.Log{ ID: "Rei", @@ -63,9 +69,18 @@ func TestLogBroadcaster(t *testing.T) { defer cancel() go manager.logBroadcaster.run(ctx) - time.Sleep(2 * time.Second) - resp, err := http.Get("http://127.0.0.1:12310/log/?app=nerv") + // wait for http server to start + time.Sleep(time.Second) + + reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second) + defer reqCancel() + + req, err := http.NewRequestWithContext(reqCtx, "GET", "http://127.0.0.1:12310/log/?app=nerv", nil) + assert.Nil(t, err) + + resp, err := http.DefaultClient.Do(req) assert.Nil(t, err) + defer resp.Body.Close() reader := bufio.NewReader(resp.Body) for i := 0; i < 2; i++ { @@ -73,4 +88,22 @@ func TestLogBroadcaster(t *testing.T) { assert.Nil(t, err) t.Log(string(line)) } + + logCancel() + // wait log subscriber to be removed + time.Sleep(time.Second) + + manager.logBroadcaster.logC <- &types.Log{ + ID: "Rei", + Name: "nerv", + Type: "stdout", + EntryPoint: "eva0", + Data: "data1", + } + count := 0 + manager.logBroadcaster.subscribersMap.Range(func(key, value interface{}) bool { + count++ + return true + }) + assert.Equal(t, count, 0) } diff --git a/manager/workload/manager_test.go b/manager/workload/manager_test.go index 5362ebf..0527a16 100644 --- a/manager/workload/manager_test.go +++ b/manager/workload/manager_test.go @@ -41,6 +41,12 @@ func TestRun(t *testing.T) { runtime := manager.runtimeClient.(*mocks.Nerv) ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() - go runtime.StartEvents() + go func() { + runtime.StartEvents() + runtime.StartCustomEvent(&types.WorkloadEventMessage{ + ID: "Kaworu", + Action: "start", + }) + }() assert.Nil(t, manager.Run(ctx)) } diff --git a/runtime/mocks/template.go b/runtime/mocks/template.go index bf32238..24ff447 100644 --- a/runtime/mocks/template.go +++ b/runtime/mocks/template.go @@ -106,6 +106,7 @@ func FromTemplate() runtime.Runtime { n.withLock(func() { v, ok := n.workloads.Load(ID) if !ok { + status = &types.WorkloadStatus{ID: ID} return } workload := v.(*eva) @@ -179,6 +180,11 @@ func (n *Nerv) StartEvents() { } } +// StartCustomEvent . +func (n *Nerv) StartCustomEvent(event *types.WorkloadEventMessage) { + n.msgChan <- event +} + // SetDaemonRunning set `daemonRunning` func (n *Nerv) SetDaemonRunning(status bool) { n.daemonRunning = status diff --git a/selfmon/selfmon_test.go b/selfmon/selfmon_test.go index 346fbaf..0adf92e 100644 --- a/selfmon/selfmon_test.go +++ b/selfmon/selfmon_test.go @@ -116,12 +116,14 @@ func TestRun(t *testing.T) { store := m.store.(*storemocks.MockStore) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + exit := make(chan struct{}) // set node "fake" as alive assert.Nil(t, store.SetNodeStatus(ctx, 0)) go func() { assert.Nil(t, m.Run(ctx)) + exit <- struct{}{} }() time.Sleep(2 * time.Second) @@ -138,5 +140,7 @@ func TestRun(t *testing.T) { node, _ = store.GetNode(ctx, "faker") assert.Equal(t, node.Available, true) + store.StopNodeStatusStream() m.Close() + <-exit } diff --git a/store/mocks/template.go b/store/mocks/template.go index 8d51fe6..050f984 100644 --- a/store/mocks/template.go +++ b/store/mocks/template.go @@ -2,6 +2,7 @@ package mocks import ( "context" + "errors" "fmt" "sync" @@ -72,8 +73,14 @@ func FromTemplate() store.Store { return nil }) m.On("GetNodeStatus", mock.Anything, mock.Anything).Return(func(ctx context.Context, nodename string) *types.NodeStatus { - if status, ok := m.nodeStatus.Load(nodename); ok { - return status.(*types.NodeStatus) + m.Lock() + defer m.Unlock() + if v, ok := m.nodeStatus.Load(nodename); ok { + status := v.(*types.NodeStatus) + return &types.NodeStatus{ + Nodename: status.Nodename, + Alive: status.Alive, + } } return &types.NodeStatus{ Nodename: nodename, @@ -137,3 +144,8 @@ func (m *MockStore) StartNodeStatusStream() { Alive: false, } } + +// StopNodeStatusStream send an err to errChan. +func (m *MockStore) StopNodeStatusStream() { + m.errChan <- errors.New("closed") +} diff --git a/types/config_test.go b/types/config_test.go index 0ef4bf9..fd80569 100644 --- a/types/config_test.go +++ b/types/config_test.go @@ -31,4 +31,6 @@ func TestLoadConfig(t *testing.T) { assert.Equal(config.GlobalConnectionTimeout, time.Second*15) assert.Equal(config.HAKeepaliveInterval, time.Second*16) + + config.Print() } diff --git a/utils/check_test.go b/utils/check_test.go index 8bfab4e..f00920d 100644 --- a/utils/check_test.go +++ b/utils/check_test.go @@ -12,12 +12,15 @@ import ( func TestCheck(t *testing.T) { go http.ListenAndServe(":12306", http.NotFoundHandler()) time.Sleep(time.Second) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12306"}, 404, time.Second), true) assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12306"}, 0, time.Second), true) assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12306"}, 200, time.Second), false) assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12307"}, 200, time.Second), false) + cancel() + assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12306"}, 404, time.Second), false) + assert.Equal(t, CheckTCP("", []string{"127.0.0.1:12306"}, time.Second), true) assert.Equal(t, CheckTCP("", []string{"127.0.0.1:12307"}, time.Second), false) } diff --git a/utils/hash_test.go b/utils/hash_test.go new file mode 100644 index 0000000..f0b4dfc --- /dev/null +++ b/utils/hash_test.go @@ -0,0 +1,19 @@ +package utils + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHashBackend(t *testing.T) { + data := []string{ + "s1", + "s2", + } + backend := NewHashBackends(data) + assert.EqualValues(t, backend.Len(), 2) + // a certain string will always get a certain hash + assert.Equal(t, backend.Get("param1", 0), "s2") + assert.Equal(t, backend.Get("param2", 0), "s1") +} diff --git a/utils/retry_test.go b/utils/retry_test.go index 2761b9c..4a000df 100644 --- a/utils/retry_test.go +++ b/utils/retry_test.go @@ -4,19 +4,42 @@ import ( "context" "errors" "testing" + "time" "github.com/stretchr/testify/assert" ) func TestBackoffRetry(t *testing.T) { + var errNotSuccess = errors.New("not success") i := 0 f := func() error { i++ if i < 4 { - return errors.New("xxx") + return errNotSuccess } return nil } assert.Nil(t, BackoffRetry(context.Background(), 10, f)) assert.Equal(t, 4, i) + + i = 0 + assert.Equal(t, errNotSuccess, BackoffRetry(context.Background(), 0, f)) + assert.Equal(t, 1, i) +} + +func TestBackoffRetryWithCancel(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var errNotSuccess = errors.New("not success") + i := 0 + f := func() error { + i++ + if i < 4 { + return errNotSuccess + } + return nil + } + assert.Equal(t, context.DeadlineExceeded, BackoffRetry(ctx, 10, f)) + assert.NotEqual(t, 4, i) } diff --git a/utils/sync.go b/utils/sync.go index efe493b..3d909a3 100644 --- a/utils/sync.go +++ b/utils/sync.go @@ -2,29 +2,8 @@ package utils import ( "sync" - "sync/atomic" ) -// AtomicBool indicates an atomic boolean instance. -type AtomicBool struct { - i32 int32 -} - -// Bool . -func (a *AtomicBool) Bool() bool { - return atomic.LoadInt32(&a.i32) == 1 -} - -// Set to true. -func (a *AtomicBool) Set() { - atomic.StoreInt32(&a.i32, 1) -} - -// Unset to false. -func (a *AtomicBool) Unset() { - atomic.StoreInt32(&a.i32, 0) -} - // GroupCAS indicates cas locks which are grouped by keys. type GroupCAS struct { groups sync.Map