From f622836038031e0cf37411eff6e59dc60429a6a6 Mon Sep 17 00:00:00 2001
From: DuodenumL <qq2410088750@live.com>
Date: Fri, 1 Oct 2021 17:40:46 +0800
Subject: [PATCH] unit tests (#75)

---
 .github/workflows/dockerimage.yml |   1 -
 Makefile                          |   2 +-
 logs/writer.go                    |   2 +
 logs/writer_test.go               | 112 ++++++++++++++++++++----------
 manager/node/heartbeat_test.go    |  22 +++++-
 manager/node/manager_test.go      |  26 +++++++
 manager/workload/attach.go        |   1 -
 manager/workload/load.go          |   1 -
 manager/workload/log.go           |   1 -
 manager/workload/log_test.go      |  47 +++++++++++--
 manager/workload/manager_test.go  |   8 ++-
 runtime/mocks/template.go         |   6 ++
 selfmon/selfmon_test.go           |   4 ++
 store/mocks/template.go           |  16 ++++-
 types/config_test.go              |   2 +
 utils/check_test.go               |   5 +-
 utils/hash_test.go                |  19 +++++
 utils/retry_test.go               |  25 ++++++-
 utils/sync.go                     |  21 ------
 19 files changed, 246 insertions(+), 75 deletions(-)
 create mode 100644 utils/hash_test.go

diff --git a/.github/workflows/dockerimage.yml b/.github/workflows/dockerimage.yml
index bfb6253..d496013 100644
--- a/.github/workflows/dockerimage.yml
+++ b/.github/workflows/dockerimage.yml
@@ -48,5 +48,4 @@ jobs:
           username: ${{ github.actor }}
           password: ${{ secrets.GITHUB_TOKEN }}
           build_args: KEEP_SYMBOL=1
-          repository: projecteru2/agent
           tags: ${{ github.sha }}-debug
diff --git a/Makefile b/Makefile
index 92c672f..dded906 100644
--- a/Makefile
+++ b/Makefile
@@ -22,7 +22,7 @@ binary:
 
 unit-test:
 	go vet `go list ./... | grep -v '/vendor/'`
-	go test -race -count=1 -cover ./logs/... \
+	go test -race -count=1 -timeout 240s -cover ./logs/... \
 	./manager/node/... \
 	./manager/workload/... \
 	./selfmon/... \
diff --git a/logs/writer.go b/logs/writer.go
index 89bbb3d..e38b911 100644
--- a/logs/writer.go
+++ b/logs/writer.go
@@ -65,6 +65,8 @@ func NewWriter(ctx context.Context, addr string, stdout bool) (writer *Writer, e
 	case err == common.ErrInvalidScheme:
 		log.Infof("[writer] create an empty writer for %s success", addr)
 		writer.enc = NewStreamEncoder(discard{})
+	case err == errJournalDisabled:
+		return nil, err
 	case err != nil:
 		log.Errorf("[writer] failed to create writer encoder for %s, err: %v, will retry", addr, err)
 		writer.needReconnect = true
diff --git a/logs/writer_test.go b/logs/writer_test.go
index b9c19d6..302b9d7 100644
--- a/logs/writer_test.go
+++ b/logs/writer_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"net"
 	"testing"
+	"time"
 
 	"github.com/projecteru2/agent/types"
 
@@ -17,13 +18,6 @@ func TestNewWriterWithUDP(t *testing.T) {
 	addr := "udp://127.0.0.1:23456"
 	w, err := NewWriter(ctx, addr, true)
 	assert.NoError(t, err)
-
-	enc, err := w.createUDPEncoder()
-	assert.NoError(t, err)
-
-	w.withLock(func() {
-		w.enc = enc
-	})
 	assert.NoError(t, w.Write(&types.Log{}))
 }
 
@@ -31,42 +25,88 @@ func TestNewWriterWithTCP(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	// tcp writer
-	addr := "tcp://127.0.0.1:34567"
-	tcpL, err := net.Listen("tcp", ":34567")
-	assert.NoError(t, err)
 
+	tcpL, err := net.Listen("tcp", ":34567")
 	defer tcpL.Close()
+	addr := "tcp://127.0.0.1:34567"
 	w, err := NewWriter(ctx, addr, true)
 	assert.NoError(t, err)
+	assert.NoError(t, w.Write(&types.Log{}))
+}
 
-	enc, err := w.createTCPEncoder()
+func TestNewWriterWithJournal(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	addr := "journal://system"
+	enc, err := CreateJournalEncoder()
+	if err == errJournalDisabled {
+		return
+	}
 	assert.NoError(t, err)
+	defer enc.Close()
 
-	w.withLock(func() {
-		w.enc = enc
+	w, err := NewWriter(ctx, addr, true)
+	assert.NoError(t, err)
+
+	w.enc = enc
+	err = w.enc.Encode(&types.Log{
+		ID:         "id",
+		Name:       "name",
+		Type:       "type",
+		EntryPoint: "entrypoint",
+		Ident:      "ident",
+		Data:       "data",
+		Datetime:   "datetime",
+		Extra:      map[string]string{"a": "1", "b": "2"},
 	})
-	assert.NoError(t, w.Write(&types.Log{}))
+	assert.NoError(t, err)
+}
+
+func TestNewWriters(t *testing.T) {
+	cases := map[string]error{
+		Discard:                 nil,
+		"udp://127.0.0.1:23456": nil,
+		"tcp://127.0.0.1:34567": nil,
+		"journal://system":      errJournalDisabled,
+		"invalid://hhh":         nil,
+	}
+	tcpL, err := net.Listen("tcp", ":34567")
+	assert.NoError(t, err)
+	defer tcpL.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	for addr, expectedErr := range cases {
+		go func(addr string, expectedErr error) {
+			writer, err := NewWriter(ctx, addr, false)
+			assert.Equal(t, err, expectedErr)
+			if expectedErr != nil {
+				return
+			}
+			assert.NoError(t, err)
+			err = writer.Write(&types.Log{})
+			assert.NoError(t, err)
+		}(addr, expectedErr)
+	}
+	// wait for closing all writers
+	time.Sleep(CloseWaitInterval + 2*time.Second)
 }
 
-// func TestNewWriterWithJournal(t *testing.T) {
-// 	addr := "journal://system"
-// 	enc, err := CreateJournalEncoder()
-// 	assert.NoError(t, err)
-// 	defer enc.Close()
-
-// 	w, err := NewWriter(addr, true)
-// 	assert.NoError(t, err)
-
-// 	w.enc = enc
-// 	err = w.enc.Encode(&types.Log{
-// 		ID: "id",
-// 		Name: "name",
-// 		Type: "type",
-// 		EntryPoint: "entrypoint",
-// 		Ident: "ident",
-// 		Data: "data",
-// 		Datetime: "datetime",
-// 		Extra: map[string]string{"a": "1", "b": "2"},
-// 	})
-// 	assert.NoError(t, err)
-// }
+func TestReconnect(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	addr := "tcp://127.0.0.1:34567"
+	writer, err := NewWriter(ctx, addr, false)
+	assert.NoError(t, err)
+	assert.Nil(t, writer.enc)
+	assert.Equal(t, writer.needReconnect, true)
+
+	tcpL, err := net.Listen("tcp", ":34567")
+	assert.NoError(t, err)
+	defer tcpL.Close()
+
+	writer.reconnect()
+	assert.NoError(t, writer.Write(&types.Log{}))
+}
diff --git a/manager/node/heartbeat_test.go b/manager/node/heartbeat_test.go
index f1b2e8f..fa7abf9 100644
--- a/manager/node/heartbeat_test.go
+++ b/manager/node/heartbeat_test.go
@@ -3,6 +3,7 @@ package node
 import (
 	"context"
 	"testing"
+	"time"
 
 	runtimemocks "github.com/projecteru2/agent/runtime/mocks"
 	storemocks "github.com/projecteru2/agent/store/mocks"
@@ -11,7 +12,8 @@ import (
 )
 
 func TestNodeStatusReport(t *testing.T) {
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	manager := newMockNodeManager(t)
 	runtime := manager.runtimeClient.(*runtimemocks.Nerv)
 	store := manager.store.(*storemocks.MockStore)
@@ -28,3 +30,21 @@ func TestNodeStatusReport(t *testing.T) {
 	assert.Nil(t, err)
 	assert.Equal(t, status.Alive, true)
 }
+
+func TestHeartbeat(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	manager := newMockNodeManager(t)
+	store := manager.store.(*storemocks.MockStore)
+
+	status, err := store.GetNodeStatus(ctx, "fake")
+	assert.Nil(t, err)
+	assert.Equal(t, status.Alive, false)
+
+	go manager.heartbeat(ctx)
+
+	time.Sleep(time.Duration(manager.config.HeartbeatInterval+2) * time.Second)
+	status, err = store.GetNodeStatus(ctx, "fake")
+	assert.Nil(t, err)
+	assert.Equal(t, status.Alive, true)
+}
diff --git a/manager/node/manager_test.go b/manager/node/manager_test.go
index c126af8..92232ee 100644
--- a/manager/node/manager_test.go
+++ b/manager/node/manager_test.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/projecteru2/agent/common"
+	storemocks "github.com/projecteru2/agent/store/mocks"
 	"github.com/projecteru2/agent/types"
 
 	"github.com/stretchr/testify/assert"
@@ -34,3 +35,28 @@ func newMockNodeManager(t *testing.T) *Manager {
 	assert.Nil(t, err)
 	return m
 }
+
+func TestRun(t *testing.T) {
+	manager := newMockNodeManager(t)
+	store := manager.store.(*storemocks.MockStore)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(manager.config.HeartbeatInterval*3)*time.Second)
+	defer cancel()
+
+	status, err := store.GetNodeStatus(ctx, "fake")
+	assert.Nil(t, err)
+	assert.Equal(t, status.Alive, false)
+
+	go func() {
+		time.Sleep(time.Duration(manager.config.HeartbeatInterval*2) * time.Second)
+		status, err := store.GetNodeStatus(ctx, "fake")
+		assert.Nil(t, err)
+		assert.Equal(t, status.Alive, true)
+	}()
+
+	assert.Nil(t, manager.Run(ctx))
+
+	info, err := store.GetNode(ctx, "fake")
+	assert.Nil(t, err)
+	assert.Equal(t, info.Available, false)
+}
diff --git a/manager/workload/attach.go b/manager/workload/attach.go
index 37c65da..52213ad 100644
--- a/manager/workload/attach.go
+++ b/manager/workload/attach.go
@@ -96,7 +96,6 @@ func (m *Manager) attach(ctx context.Context, ID string) {
 			}
 			if err := writer.Write(l); err != nil && !(entryPoint == "agent" && utils.IsDockerized()) {
 				log.Errorf("[attach] %s workload %s_%s write failed %v", workloadName, entryPoint, ID, err)
-				log.Errorf("[attach] %s", data)
 			}
 		}
 	}
diff --git a/manager/workload/load.go b/manager/workload/load.go
index a462f90..d6450e2 100644
--- a/manager/workload/load.go
+++ b/manager/workload/load.go
@@ -31,7 +31,6 @@ func (m *Manager) initWorkloadStatus(ctx context.Context) error {
 				go m.attach(ctx, ID)
 			}
 
-			// no health check here
 			if err := m.setWorkloadStatus(ctx, workloadStatus); err != nil {
 				log.Errorf("[initWorkloadStatus] update workload %v status failed %v", ID, err)
 			}
diff --git a/manager/workload/log.go b/manager/workload/log.go
index dce8525..1e53680 100644
--- a/manager/workload/log.go
+++ b/manager/workload/log.go
@@ -128,7 +128,6 @@ func (l *logBroadcaster) broadcast(log *types.Log) {
 				sub.errChan <- err
 				return
 			}
-
 			sub.buf.Flush()
 		}(ID, sub)
 	}
diff --git a/manager/workload/log_test.go b/manager/workload/log_test.go
index 19902cf..6bd18f1 100644
--- a/manager/workload/log_test.go
+++ b/manager/workload/log_test.go
@@ -18,30 +18,36 @@ func TestLogBroadcaster(t *testing.T) {
 	manager := newMockWorkloadManager(t)
 	logrus.SetLevel(logrus.DebugLevel)
 
+	logCtx, logCancel := context.WithCancel(context.Background())
+	defer logCancel()
+
 	handler := func(w http.ResponseWriter, req *http.Request) {
 		app := req.URL.Query().Get("app")
 		if app == "" {
 			w.WriteHeader(http.StatusBadRequest)
 			return
 		}
-		// fuck httpie
 		w.WriteHeader(http.StatusOK)
 		if hijack, ok := w.(http.Hijacker); ok {
 			conn, buf, err := hijack.Hijack()
-			assert.Nil(t, err)
+			if err != nil {
+				return
+			}
 			defer conn.Close()
-			manager.PullLog(req.Context(), app, buf)
+			manager.PullLog(logCtx, app, buf)
 		}
 	}
+	server := &http.Server{Addr: ":12310"}
 
 	go func() {
 		restfulAPIServer := pat.New()
 		restfulAPIServer.Add("GET", "/log/", http.HandlerFunc(handler))
-		http.Handle("/", restfulAPIServer)
-		http.ListenAndServe(":12310", nil)
+		server.Handler = restfulAPIServer
+		assert.Equal(t, server.ListenAndServe(), http.ErrServerClosed)
 	}()
 
 	go func() {
+		// wait for subscribers
 		time.Sleep(3 * time.Second)
 		manager.logBroadcaster.logC <- &types.Log{
 			ID:         "Rei",
@@ -63,9 +69,18 @@ func TestLogBroadcaster(t *testing.T) {
 	defer cancel()
 	go manager.logBroadcaster.run(ctx)
 
-	time.Sleep(2 * time.Second)
-	resp, err := http.Get("http://127.0.0.1:12310/log/?app=nerv")
+	// wait for http server to start
+	time.Sleep(time.Second)
+
+	reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second)
+	defer reqCancel()
+
+	req, err := http.NewRequestWithContext(reqCtx, "GET", "http://127.0.0.1:12310/log/?app=nerv", nil)
+	assert.Nil(t, err)
+
+	resp, err := http.DefaultClient.Do(req)
 	assert.Nil(t, err)
+	defer resp.Body.Close()
 
 	reader := bufio.NewReader(resp.Body)
 	for i := 0; i < 2; i++ {
@@ -73,4 +88,22 @@ func TestLogBroadcaster(t *testing.T) {
 		assert.Nil(t, err)
 		t.Log(string(line))
 	}
+
+	logCancel()
+	// wait log subscriber to be removed
+	time.Sleep(time.Second)
+
+	manager.logBroadcaster.logC <- &types.Log{
+		ID:         "Rei",
+		Name:       "nerv",
+		Type:       "stdout",
+		EntryPoint: "eva0",
+		Data:       "data1",
+	}
+	count := 0
+	manager.logBroadcaster.subscribersMap.Range(func(key, value interface{}) bool {
+		count++
+		return true
+	})
+	assert.Equal(t, count, 0)
 }
diff --git a/manager/workload/manager_test.go b/manager/workload/manager_test.go
index 5362ebf..0527a16 100644
--- a/manager/workload/manager_test.go
+++ b/manager/workload/manager_test.go
@@ -41,6 +41,12 @@ func TestRun(t *testing.T) {
 	runtime := manager.runtimeClient.(*mocks.Nerv)
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
 	defer cancel()
-	go runtime.StartEvents()
+	go func() {
+		runtime.StartEvents()
+		runtime.StartCustomEvent(&types.WorkloadEventMessage{
+			ID:     "Kaworu",
+			Action: "start",
+		})
+	}()
 	assert.Nil(t, manager.Run(ctx))
 }
diff --git a/runtime/mocks/template.go b/runtime/mocks/template.go
index bf32238..24ff447 100644
--- a/runtime/mocks/template.go
+++ b/runtime/mocks/template.go
@@ -106,6 +106,7 @@ func FromTemplate() runtime.Runtime {
 		n.withLock(func() {
 			v, ok := n.workloads.Load(ID)
 			if !ok {
+				status = &types.WorkloadStatus{ID: ID}
 				return
 			}
 			workload := v.(*eva)
@@ -179,6 +180,11 @@ func (n *Nerv) StartEvents() {
 	}
 }
 
+// StartCustomEvent .
+func (n *Nerv) StartCustomEvent(event *types.WorkloadEventMessage) {
+	n.msgChan <- event
+}
+
 // SetDaemonRunning set `daemonRunning`
 func (n *Nerv) SetDaemonRunning(status bool) {
 	n.daemonRunning = status
diff --git a/selfmon/selfmon_test.go b/selfmon/selfmon_test.go
index 346fbaf..0adf92e 100644
--- a/selfmon/selfmon_test.go
+++ b/selfmon/selfmon_test.go
@@ -116,12 +116,14 @@ func TestRun(t *testing.T) {
 	store := m.store.(*storemocks.MockStore)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	exit := make(chan struct{})
 
 	// set node "fake" as alive
 	assert.Nil(t, store.SetNodeStatus(ctx, 0))
 
 	go func() {
 		assert.Nil(t, m.Run(ctx))
+		exit <- struct{}{}
 	}()
 	time.Sleep(2 * time.Second)
 
@@ -138,5 +140,7 @@ func TestRun(t *testing.T) {
 	node, _ = store.GetNode(ctx, "faker")
 	assert.Equal(t, node.Available, true)
 
+	store.StopNodeStatusStream()
 	m.Close()
+	<-exit
 }
diff --git a/store/mocks/template.go b/store/mocks/template.go
index 8d51fe6..050f984 100644
--- a/store/mocks/template.go
+++ b/store/mocks/template.go
@@ -2,6 +2,7 @@ package mocks
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 
@@ -72,8 +73,14 @@ func FromTemplate() store.Store {
 		return nil
 	})
 	m.On("GetNodeStatus", mock.Anything, mock.Anything).Return(func(ctx context.Context, nodename string) *types.NodeStatus {
-		if status, ok := m.nodeStatus.Load(nodename); ok {
-			return status.(*types.NodeStatus)
+		m.Lock()
+		defer m.Unlock()
+		if v, ok := m.nodeStatus.Load(nodename); ok {
+			status := v.(*types.NodeStatus)
+			return &types.NodeStatus{
+				Nodename: status.Nodename,
+				Alive:    status.Alive,
+			}
 		}
 		return &types.NodeStatus{
 			Nodename: nodename,
@@ -137,3 +144,8 @@ func (m *MockStore) StartNodeStatusStream() {
 		Alive:    false,
 	}
 }
+
+// StopNodeStatusStream send an err to errChan.
+func (m *MockStore) StopNodeStatusStream() {
+	m.errChan <- errors.New("closed")
+}
diff --git a/types/config_test.go b/types/config_test.go
index 0ef4bf9..fd80569 100644
--- a/types/config_test.go
+++ b/types/config_test.go
@@ -31,4 +31,6 @@ func TestLoadConfig(t *testing.T) {
 
 	assert.Equal(config.GlobalConnectionTimeout, time.Second*15)
 	assert.Equal(config.HAKeepaliveInterval, time.Second*16)
+
+	config.Print()
 }
diff --git a/utils/check_test.go b/utils/check_test.go
index 8bfab4e..f00920d 100644
--- a/utils/check_test.go
+++ b/utils/check_test.go
@@ -12,12 +12,15 @@ import (
 func TestCheck(t *testing.T) {
 	go http.ListenAndServe(":12306", http.NotFoundHandler())
 	time.Sleep(time.Second)
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
 	assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12306"}, 404, time.Second), true)
 	assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12306"}, 0, time.Second), true)
 	assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12306"}, 200, time.Second), false)
 	assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12307"}, 200, time.Second), false)
 
+	cancel()
+	assert.Equal(t, CheckHTTP(ctx, "", []string{"http://127.0.0.1:12306"}, 404, time.Second), false)
+
 	assert.Equal(t, CheckTCP("", []string{"127.0.0.1:12306"}, time.Second), true)
 	assert.Equal(t, CheckTCP("", []string{"127.0.0.1:12307"}, time.Second), false)
 }
diff --git a/utils/hash_test.go b/utils/hash_test.go
new file mode 100644
index 0000000..f0b4dfc
--- /dev/null
+++ b/utils/hash_test.go
@@ -0,0 +1,19 @@
+package utils
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestHashBackend(t *testing.T) {
+	data := []string{
+		"s1",
+		"s2",
+	}
+	backend := NewHashBackends(data)
+	assert.EqualValues(t, backend.Len(), 2)
+	// a certain string will always get a certain hash
+	assert.Equal(t, backend.Get("param1", 0), "s2")
+	assert.Equal(t, backend.Get("param2", 0), "s1")
+}
diff --git a/utils/retry_test.go b/utils/retry_test.go
index 2761b9c..4a000df 100644
--- a/utils/retry_test.go
+++ b/utils/retry_test.go
@@ -4,19 +4,42 @@ import (
 	"context"
 	"errors"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
 
 func TestBackoffRetry(t *testing.T) {
+	var errNotSuccess = errors.New("not success")
 	i := 0
 	f := func() error {
 		i++
 		if i < 4 {
-			return errors.New("xxx")
+			return errNotSuccess
 		}
 		return nil
 	}
 	assert.Nil(t, BackoffRetry(context.Background(), 10, f))
 	assert.Equal(t, 4, i)
+
+	i = 0
+	assert.Equal(t, errNotSuccess, BackoffRetry(context.Background(), 0, f))
+	assert.Equal(t, 1, i)
+}
+
+func TestBackoffRetryWithCancel(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+
+	var errNotSuccess = errors.New("not success")
+	i := 0
+	f := func() error {
+		i++
+		if i < 4 {
+			return errNotSuccess
+		}
+		return nil
+	}
+	assert.Equal(t, context.DeadlineExceeded, BackoffRetry(ctx, 10, f))
+	assert.NotEqual(t, 4, i)
 }
diff --git a/utils/sync.go b/utils/sync.go
index efe493b..3d909a3 100644
--- a/utils/sync.go
+++ b/utils/sync.go
@@ -2,29 +2,8 @@ package utils
 
 import (
 	"sync"
-	"sync/atomic"
 )
 
-// AtomicBool indicates an atomic boolean instance.
-type AtomicBool struct {
-	i32 int32
-}
-
-// Bool .
-func (a *AtomicBool) Bool() bool {
-	return atomic.LoadInt32(&a.i32) == 1
-}
-
-// Set to true.
-func (a *AtomicBool) Set() {
-	atomic.StoreInt32(&a.i32, 1)
-}
-
-// Unset to false.
-func (a *AtomicBool) Unset() {
-	atomic.StoreInt32(&a.i32, 0)
-}
-
 // GroupCAS indicates cas locks which are grouped by keys.
 type GroupCAS struct {
 	groups sync.Map