Skip to content

Commit

Permalink
changes plugins reader and writer method
Browse files Browse the repository at this point in the history
// PluginReader is an interface for input plugins
type PluginReader interface {
	PluginRead() (msg *Message, err error)
}

// PluginWriter is an interface for output plugins
type PluginWriter interface {
	PluginWrite(msg *Message) (n int, err error)
}
  • Loading branch information
Urban Ishimwe committed Nov 2, 2020
1 parent 20435af commit 6d812ce
Show file tree
Hide file tree
Showing 39 changed files with 716 additions and 781 deletions.
2 changes: 1 addition & 1 deletion elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,14 @@ func (p *ESPlugin) RttDurationToMs(d time.Duration) int64 {
return int64(fl)
}

// ResponseAnalyze send req and resp to ES
func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time) {
if len(resp) == 0 {
// nil http response - skipped elasticsearch export for this request
return
}
t := time.Now()
rtt := p.RttDurationToMs(stop.Sub(start))
req = payloadBody(req)

esResp := ESRequestResponse{
ReqURL: string(proto.Path(req)),
Expand Down
176 changes: 60 additions & 116 deletions emitter.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package main

import (
"bytes"
"fmt"
"hash/fnv"
"io"
"log"
"sync"
"time"

"github.com/buger/goreplay/byteutils"
)

type emitter struct {
// Emitter represents an abject to manage plugins communication
type Emitter struct {
sync.Mutex
sync.WaitGroup
quit chan int
plugins *InOutPlugins
}

// NewEmitter creates and initializes new `emitter` object.
func NewEmitter(quit chan int) *emitter {
return &emitter{
quit: quit,
}
// NewEmitter creates and initializes new Emitter object.
func NewEmitter() *Emitter {
return &Emitter{}
}

// Start initialize loop for sending data from inputs to outputs
func (e *emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
func (e *Emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
defer e.Wait()
if Settings.CopyBufferSize < 1 {
Settings.CopyBufferSize = 5 << 20
Expand All @@ -38,147 +38,92 @@ func (e *emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
middleware.ReadFrom(in)
}

// We are going only to read responses, so using same ReadFrom method
for _, out := range plugins.Outputs {
if r, ok := out.(io.Reader); ok {
middleware.ReadFrom(r)
}
}
e.plugins.Inputs = append(e.plugins.Inputs, middleware)
e.plugins.All = append(e.plugins.All, middleware)
e.Add(1)
go func() {
defer e.Done()
if err := CopyMulty(e.quit, middleware, plugins.Outputs...); err != nil {
Debug(2, "Error during copy: ", err)
e.Close()
}
}()
go func() {
for {
select {
case <-e.quit:
middleware.Close()
return
}
if err := CopyMulty(middleware, plugins.Outputs...); err != nil {
Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
}
}()
} else {
for _, in := range plugins.Inputs {
e.Add(1)
go func(in io.Reader) {
go func(in PluginReader) {
defer e.Done()
if err := CopyMulty(e.quit, in, plugins.Outputs...); err != nil {
Debug(2, "Error during copy: ", err)
e.Close()
if err := CopyMulty(in, plugins.Outputs...); err != nil {
Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
}
}(in)
}

for _, out := range plugins.Outputs {
if r, ok := out.(io.Reader); ok {
e.Add(1)
go func(r io.Reader) {
defer e.Done()
if err := CopyMulty(e.quit, r, plugins.Outputs...); err != nil {
Debug(2, "Error during copy: ", err)
e.Close()
}
}(r)
}
}
}
}

func (e *emitter) close() {
select {
case <-e.quit:
default:
close(e.quit)
}
}

// Close closes all the goroutine and waits for it to finish.
func (e *emitter) Close() {
e.close()
for _, p := range e.plugins.Inputs {
func (e *Emitter) Close() {
for _, p := range e.plugins.All {
if cp, ok := p.(io.Closer); ok {
cp.Close()
}
}
for _, p := range e.plugins.Outputs {
if cp, ok := p.(io.Closer); ok {
cp.Close()
}
}
e.plugins = nil // avoid further accidental usage
e.plugins.All = nil // avoid further accidental close
}

// CopyMulty copies from 1 reader to multiple writers
func CopyMulty(stop chan int, src io.Reader, writers ...io.Writer) error {
buf := make([]byte, Settings.CopyBufferSize)
func CopyMulty(src PluginReader, writers ...PluginWriter) error {
wIndex := 0
modifier := NewHTTPModifier(&Settings.ModifierConfig)
filteredRequests := make(map[string]time.Time)
filteredRequestsLastCleanTime := time.Now()
filteredRequests := make(map[string]int64)
filteredRequestsLastCleanTime := time.Now().UnixNano()
filteredCount := 0

i := 0
for {
var nr int
nr, err := src.Read(buf)

select {
case <-stop:
return nil
default:
}
msg, err := src.PluginRead()
if err != nil {
if err == ErrorStopped || err == io.EOF {
return nil
}
return err
}

_maxN := nr
if nr > 500 {
_maxN = 500
}
if nr > 0 {
payload := buf[:nr]
meta := payloadMeta(payload)
if msg != nil && len(msg.Data) > 0 {
if len(msg.Data) > int(Settings.CopyBufferSize) {
msg.Data = msg.Data[:Settings.CopyBufferSize]
}
meta := payloadMeta(msg.Meta)
if len(meta) < 3 {
Debug(2, "[EMITTER] Found malformed record", string(payload[0:_maxN]), nr, "from:", src)
Debug(2, fmt.Sprintf("[EMITTER] Found malformed record %q from %q", msg.Meta, src))
continue
}
requestID := string(meta[1])

Debug(3, "[EMITTER] input:", string(payload[0:_maxN]), nr, "from:", src)

requestID := byteutils.SliceToString(meta[1])
// start a subroutine only when necessary
if Settings.Verbose >= 3 {
Debug(3, "[EMITTER] input: ", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), " from: ", src)
}
if modifier != nil {
if isRequestPayload(payload) {
headSize := bytes.IndexByte(payload, '\n') + 1
body := payload[headSize:]
originalBodyLen := len(body)
body = modifier.Rewrite(body)

Debug(3, "[EMITTER] modifier:", requestID, "from:", src)
if isRequestPayload(msg.Meta) {
msg.Data = modifier.Rewrite(msg.Data)
// If modifier tells to skip request
if len(body) == 0 {
filteredRequests[requestID] = time.Now()
if len(msg.Data) == 0 {
filteredRequests[requestID] = time.Now().UnixNano()
filteredCount++
continue
}

if originalBodyLen != len(body) {
payload = append(payload[:headSize], body...)
}

Debug(3, "[EMITTER] Rewritten input:", len(payload), "First %d bytes:", _maxN, string(payload[0:_maxN]))
Debug(3, "[EMITTER] Rewritten input:", requestID, "from:", src)

} else {
if _, ok := filteredRequests[requestID]; ok {
delete(filteredRequests, requestID)
filteredCount--
continue
}
}
}

if Settings.PrettifyHTTP {
payload = prettifyHTTP(payload)
if len(payload) == 0 {
msg.Data = prettifyHTTP(msg.Data)
if len(msg.Data) == 0 {
continue
}
}
Expand All @@ -189,15 +134,15 @@ func CopyMulty(stop chan int, src io.Reader, writers ...io.Writer) error {
log.Fatal("Detailed TCP sessions work only with PRO license")
}
hasher := fnv.New32a()
// First 20 bytes contain tcp session
id := payloadID(payload)
hasher.Write(id)
hasher.Write(meta[1])

wIndex = int(hasher.Sum32()) % len(writers)
writers[wIndex].Write(payload)
if _, err := writers[wIndex].PluginWrite(msg); err != nil {
return err
}
} else {
// Simple round robin
if _, err := writers[wIndex].Write(payload); err != nil {
if _, err := writers[wIndex].PluginWrite(msg); err != nil {
return err
}

Expand All @@ -209,27 +154,26 @@ func CopyMulty(stop chan int, src io.Reader, writers ...io.Writer) error {
}
} else {
for _, dst := range writers {
if _, err := dst.Write(payload); err != nil {
if _, err := dst.PluginWrite(msg); err != nil {
return err
}
}
}
}

// Run GC on each 1000 request
if i%1000 == 0 {
if filteredCount > 0 && filteredCount%1000 == 0 {
// Clean up filtered requests for which we didn't get a response to filter
now := time.Now()
if now.Sub(filteredRequestsLastCleanTime) > 60*time.Second {
now := time.Now().UnixNano()
if now-filteredRequestsLastCleanTime > int64(60*time.Second) {
for k, v := range filteredRequests {
if now.Sub(v) > 60*time.Second {
if now-v > int64(60*time.Second) {
delete(filteredRequests, k)
filteredCount--
}
}
filteredRequestsLastCleanTime = time.Now()
filteredRequestsLastCleanTime = time.Now().UnixNano()
}
}

i++
}
}
Loading

0 comments on commit 6d812ce

Please sign in to comment.