Skip to content

Commit

Permalink
unit tests (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL authored Oct 1, 2021
1 parent b5897c9 commit f622836
Show file tree
Hide file tree
Showing 19 changed files with 246 additions and 75 deletions.
1 change: 0 additions & 1 deletion .github/workflows/dockerimage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,4 @@ jobs:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
build_args: KEEP_SYMBOL=1
repository: projecteru2/agent
tags: ${{ github.sha }}-debug
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ binary:

unit-test:
go vet `go list ./... | grep -v '/vendor/'`
go test -race -count=1 -cover ./logs/... \
go test -race -count=1 -timeout 240s -cover ./logs/... \
./manager/node/... \
./manager/workload/... \
./selfmon/... \
Expand Down
2 changes: 2 additions & 0 deletions logs/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func NewWriter(ctx context.Context, addr string, stdout bool) (writer *Writer, e
case err == common.ErrInvalidScheme:
log.Infof("[writer] create an empty writer for %s success", addr)
writer.enc = NewStreamEncoder(discard{})
case err == errJournalDisabled:
return nil, err
case err != nil:
log.Errorf("[writer] failed to create writer encoder for %s, err: %v, will retry", addr, err)
writer.needReconnect = true
Expand Down
112 changes: 76 additions & 36 deletions logs/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net"
"testing"
"time"

"github.com/projecteru2/agent/types"

Expand All @@ -17,56 +18,95 @@ func TestNewWriterWithUDP(t *testing.T) {
addr := "udp://127.0.0.1:23456"
w, err := NewWriter(ctx, addr, true)
assert.NoError(t, err)

enc, err := w.createUDPEncoder()
assert.NoError(t, err)

w.withLock(func() {
w.enc = enc
})
assert.NoError(t, w.Write(&types.Log{}))
}

func TestNewWriterWithTCP(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// tcp writer
addr := "tcp://127.0.0.1:34567"
tcpL, err := net.Listen("tcp", ":34567")
assert.NoError(t, err)

tcpL, err := net.Listen("tcp", ":34567")
defer tcpL.Close()
addr := "tcp://127.0.0.1:34567"
w, err := NewWriter(ctx, addr, true)
assert.NoError(t, err)
assert.NoError(t, w.Write(&types.Log{}))
}

enc, err := w.createTCPEncoder()
func TestNewWriterWithJournal(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
addr := "journal://system"
enc, err := CreateJournalEncoder()
if err == errJournalDisabled {
return
}
assert.NoError(t, err)
defer enc.Close()

w.withLock(func() {
w.enc = enc
w, err := NewWriter(ctx, addr, true)
assert.NoError(t, err)

w.enc = enc
err = w.enc.Encode(&types.Log{
ID: "id",
Name: "name",
Type: "type",
EntryPoint: "entrypoint",
Ident: "ident",
Data: "data",
Datetime: "datetime",
Extra: map[string]string{"a": "1", "b": "2"},
})
assert.NoError(t, w.Write(&types.Log{}))
assert.NoError(t, err)
}

func TestNewWriters(t *testing.T) {
cases := map[string]error{
Discard: nil,
"udp://127.0.0.1:23456": nil,
"tcp://127.0.0.1:34567": nil,
"journal://system": errJournalDisabled,
"invalid://hhh": nil,
}
tcpL, err := net.Listen("tcp", ":34567")
assert.NoError(t, err)
defer tcpL.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for addr, expectedErr := range cases {
go func(addr string, expectedErr error) {
writer, err := NewWriter(ctx, addr, false)
assert.Equal(t, err, expectedErr)
if expectedErr != nil {
return
}
assert.NoError(t, err)
err = writer.Write(&types.Log{})
assert.NoError(t, err)
}(addr, expectedErr)
}
// wait for closing all writers
time.Sleep(CloseWaitInterval + 2*time.Second)
}

// func TestNewWriterWithJournal(t *testing.T) {
// addr := "journal://system"
// enc, err := CreateJournalEncoder()
// assert.NoError(t, err)
// defer enc.Close()

// w, err := NewWriter(addr, true)
// assert.NoError(t, err)

// w.enc = enc
// err = w.enc.Encode(&types.Log{
// ID: "id",
// Name: "name",
// Type: "type",
// EntryPoint: "entrypoint",
// Ident: "ident",
// Data: "data",
// Datetime: "datetime",
// Extra: map[string]string{"a": "1", "b": "2"},
// })
// assert.NoError(t, err)
// }
func TestReconnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

addr := "tcp://127.0.0.1:34567"
writer, err := NewWriter(ctx, addr, false)
assert.NoError(t, err)
assert.Nil(t, writer.enc)
assert.Equal(t, writer.needReconnect, true)

tcpL, err := net.Listen("tcp", ":34567")
assert.NoError(t, err)
defer tcpL.Close()

writer.reconnect()
assert.NoError(t, writer.Write(&types.Log{}))
}
22 changes: 21 additions & 1 deletion manager/node/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node
import (
"context"
"testing"
"time"

runtimemocks "github.com/projecteru2/agent/runtime/mocks"
storemocks "github.com/projecteru2/agent/store/mocks"
Expand All @@ -11,7 +12,8 @@ import (
)

func TestNodeStatusReport(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager := newMockNodeManager(t)
runtime := manager.runtimeClient.(*runtimemocks.Nerv)
store := manager.store.(*storemocks.MockStore)
Expand All @@ -28,3 +30,21 @@ func TestNodeStatusReport(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, status.Alive, true)
}

func TestHeartbeat(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager := newMockNodeManager(t)
store := manager.store.(*storemocks.MockStore)

status, err := store.GetNodeStatus(ctx, "fake")
assert.Nil(t, err)
assert.Equal(t, status.Alive, false)

go manager.heartbeat(ctx)

time.Sleep(time.Duration(manager.config.HeartbeatInterval+2) * time.Second)
status, err = store.GetNodeStatus(ctx, "fake")
assert.Nil(t, err)
assert.Equal(t, status.Alive, true)
}
26 changes: 26 additions & 0 deletions manager/node/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/projecteru2/agent/common"
storemocks "github.com/projecteru2/agent/store/mocks"
"github.com/projecteru2/agent/types"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -34,3 +35,28 @@ func newMockNodeManager(t *testing.T) *Manager {
assert.Nil(t, err)
return m
}

func TestRun(t *testing.T) {
manager := newMockNodeManager(t)
store := manager.store.(*storemocks.MockStore)

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(manager.config.HeartbeatInterval*3)*time.Second)
defer cancel()

status, err := store.GetNodeStatus(ctx, "fake")
assert.Nil(t, err)
assert.Equal(t, status.Alive, false)

go func() {
time.Sleep(time.Duration(manager.config.HeartbeatInterval*2) * time.Second)
status, err := store.GetNodeStatus(ctx, "fake")
assert.Nil(t, err)
assert.Equal(t, status.Alive, true)
}()

assert.Nil(t, manager.Run(ctx))

info, err := store.GetNode(ctx, "fake")
assert.Nil(t, err)
assert.Equal(t, info.Available, false)
}
1 change: 0 additions & 1 deletion manager/workload/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (m *Manager) attach(ctx context.Context, ID string) {
}
if err := writer.Write(l); err != nil && !(entryPoint == "agent" && utils.IsDockerized()) {
log.Errorf("[attach] %s workload %s_%s write failed %v", workloadName, entryPoint, ID, err)
log.Errorf("[attach] %s", data)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion manager/workload/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func (m *Manager) initWorkloadStatus(ctx context.Context) error {
go m.attach(ctx, ID)
}

// no health check here
if err := m.setWorkloadStatus(ctx, workloadStatus); err != nil {
log.Errorf("[initWorkloadStatus] update workload %v status failed %v", ID, err)
}
Expand Down
1 change: 0 additions & 1 deletion manager/workload/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func (l *logBroadcaster) broadcast(log *types.Log) {
sub.errChan <- err
return
}

sub.buf.Flush()
}(ID, sub)
}
Expand Down
47 changes: 40 additions & 7 deletions manager/workload/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,36 @@ func TestLogBroadcaster(t *testing.T) {
manager := newMockWorkloadManager(t)
logrus.SetLevel(logrus.DebugLevel)

logCtx, logCancel := context.WithCancel(context.Background())
defer logCancel()

handler := func(w http.ResponseWriter, req *http.Request) {
app := req.URL.Query().Get("app")
if app == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
// fuck httpie
w.WriteHeader(http.StatusOK)
if hijack, ok := w.(http.Hijacker); ok {
conn, buf, err := hijack.Hijack()
assert.Nil(t, err)
if err != nil {
return
}
defer conn.Close()
manager.PullLog(req.Context(), app, buf)
manager.PullLog(logCtx, app, buf)
}
}
server := &http.Server{Addr: ":12310"}

go func() {
restfulAPIServer := pat.New()
restfulAPIServer.Add("GET", "/log/", http.HandlerFunc(handler))
http.Handle("/", restfulAPIServer)
http.ListenAndServe(":12310", nil)
server.Handler = restfulAPIServer
assert.Equal(t, server.ListenAndServe(), http.ErrServerClosed)
}()

go func() {
// wait for subscribers
time.Sleep(3 * time.Second)
manager.logBroadcaster.logC <- &types.Log{
ID: "Rei",
Expand All @@ -63,14 +69,41 @@ func TestLogBroadcaster(t *testing.T) {
defer cancel()
go manager.logBroadcaster.run(ctx)

time.Sleep(2 * time.Second)
resp, err := http.Get("http://127.0.0.1:12310/log/?app=nerv")
// wait for http server to start
time.Sleep(time.Second)

reqCtx, reqCancel := context.WithTimeout(ctx, 5*time.Second)
defer reqCancel()

req, err := http.NewRequestWithContext(reqCtx, "GET", "http://127.0.0.1:12310/log/?app=nerv", nil)
assert.Nil(t, err)

resp, err := http.DefaultClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()

reader := bufio.NewReader(resp.Body)
for i := 0; i < 2; i++ {
line, err := reader.ReadBytes('\n')
assert.Nil(t, err)
t.Log(string(line))
}

logCancel()
// wait log subscriber to be removed
time.Sleep(time.Second)

manager.logBroadcaster.logC <- &types.Log{
ID: "Rei",
Name: "nerv",
Type: "stdout",
EntryPoint: "eva0",
Data: "data1",
}
count := 0
manager.logBroadcaster.subscribersMap.Range(func(key, value interface{}) bool {
count++
return true
})
assert.Equal(t, count, 0)
}
8 changes: 7 additions & 1 deletion manager/workload/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func TestRun(t *testing.T) {
runtime := manager.runtimeClient.(*mocks.Nerv)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
go runtime.StartEvents()
go func() {
runtime.StartEvents()
runtime.StartCustomEvent(&types.WorkloadEventMessage{
ID: "Kaworu",
Action: "start",
})
}()
assert.Nil(t, manager.Run(ctx))
}
6 changes: 6 additions & 0 deletions runtime/mocks/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func FromTemplate() runtime.Runtime {
n.withLock(func() {
v, ok := n.workloads.Load(ID)
if !ok {
status = &types.WorkloadStatus{ID: ID}
return
}
workload := v.(*eva)
Expand Down Expand Up @@ -179,6 +180,11 @@ func (n *Nerv) StartEvents() {
}
}

// StartCustomEvent .
func (n *Nerv) StartCustomEvent(event *types.WorkloadEventMessage) {
n.msgChan <- event
}

// SetDaemonRunning set `daemonRunning`
func (n *Nerv) SetDaemonRunning(status bool) {
n.daemonRunning = status
Expand Down
Loading

0 comments on commit f622836

Please sign in to comment.