From 088d46113b0c5e88e8a3dc39e721b6adff676ddc Mon Sep 17 00:00:00 2001 From: Ivan Stankov Date: Tue, 30 May 2023 18:08:24 +0300 Subject: [PATCH 1/2] capture OnTimer --- input_raw_test.go | 64 +++++++++++++++++++++++++++++++++++-- internal/capture/capture.go | 12 +++++-- 2 files changed, 70 insertions(+), 6 deletions(-) diff --git a/input_raw_test.go b/input_raw_test.go index cf5e6673..a1f88f57 100644 --- a/input_raw_test.go +++ b/input_raw_test.go @@ -2,9 +2,6 @@ package goreplay import ( "bytes" - "github.com/buger/goreplay/internal/capture" - "github.com/buger/goreplay/internal/tcp" - "github.com/buger/goreplay/proto" "io/ioutil" "net" "net/http" @@ -16,6 +13,10 @@ import ( "sync/atomic" "testing" "time" + + "github.com/buger/goreplay/internal/capture" + "github.com/buger/goreplay/internal/tcp" + "github.com/buger/goreplay/proto" ) const testRawExpire = time.Millisecond * 200 @@ -355,3 +356,60 @@ func BenchmarkRAWInputWithReplay(b *testing.B) { b.ReportMetric(float64(replayCounter), "replayed") emitter.Close() } + +func TestRAWInputOnTimer(t *testing.T) { + listener, err := net.Listen("tcp4", "127.0.0.1:0") + if err != nil { + t.Error(err) + return + } + defer listener.Close() + + origin := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("ab")) + }), + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + go origin.Serve(listener) + + wg := new(sync.WaitGroup) + wg.Add(1) + called := false + onTimer := func() { + if !called { + called = true + } + wg.Done() + } + + conf := RAWInputConfig{ + OnTimer: onTimer, + } + input := NewRAWInput(listener.Addr().String(), conf) + output := NewTestOutput(func(_ *Message) {}) + plugins := &InOutPlugins{ + Inputs: []PluginReader{input}, + Outputs: []PluginWriter{output}, + } + plugins.All = append(plugins.All, input, output) + + emitter := NewEmitter() + defer emitter.Close() + go emitter.Start(plugins, Settings.Middleware) + + _, port, _ := net.SplitHostPort(listener.Addr().String()) + addr := "http://127.0.0.1:" + port + _, err = http.Get(addr) + if err != nil { + t.Error(err) + return + } + + wg.Wait() + + if !called { + t.Error("want call OnTimer") + } +} diff --git a/internal/capture/capture.go b/internal/capture/capture.go index b2a31f1d..6dcbac33 100644 --- a/internal/capture/capture.go +++ b/internal/capture/capture.go @@ -5,9 +5,6 @@ import ( "errors" "expvar" "fmt" - "github.com/buger/goreplay/internal/size" - "github.com/buger/goreplay/internal/tcp" - "github.com/buger/goreplay/proto" "io" "log" "net" @@ -18,6 +15,10 @@ import ( "syscall" "time" + "github.com/buger/goreplay/internal/size" + "github.com/buger/goreplay/internal/tcp" + "github.com/buger/goreplay/proto" + "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" @@ -68,6 +69,8 @@ type PcapOptions struct { AllowIncomplete bool `json:"input-raw-allow-incomplete"` IgnoreInterface []string `json:"input-raw-ignore-interface"` Transport string + + OnTimer func() `json:"-"` } // Listener handle traffic capture, this is its representation. @@ -552,6 +555,9 @@ func (l *Listener) readHandle(key string, hndl packetHandle) { case <-l.quit: return case <-timer.C: + if l.config.OnTimer != nil { + l.config.OnTimer() + } if h, ok := hndl.handler.(PcapStatProvider); ok { s, err := h.Stats() if err == nil { From 58a655ba5d175ea57074ac64addc1edc7609597b Mon Sep 17 00:00:00 2001 From: Ivan Stankov Date: Tue, 30 May 2023 18:55:48 +0300 Subject: [PATCH 2/2] fix test --- input_raw_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/input_raw_test.go b/input_raw_test.go index a1f88f57..577d79ab 100644 --- a/input_raw_test.go +++ b/input_raw_test.go @@ -380,8 +380,8 @@ func TestRAWInputOnTimer(t *testing.T) { onTimer := func() { if !called { called = true + wg.Done() } - wg.Done() } conf := RAWInputConfig{