diff --git a/.gitignore b/.gitignore
index 323546a..03f0492 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
+.env
+
 # Compiled Object files, Static and Dynamic libs (Shared Objects)
 *.o
 *.a
diff --git a/.travis.yml b/.travis.yml
index 6247ff3..3d6cdb0 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,8 +2,5 @@ language: go
 go:
   - 1.1
   - tip
-before_install:
-  - go get github.com/mattn/gom
-script:
-  - $HOME/gopath/bin/gom install
-  - $HOME/gopath/bin/gom test
+install: make deps
+script: go test -v -bench .
diff --git a/Gomfile b/Gomfile
deleted file mode 100644
index d173b89..0000000
--- a/Gomfile
+++ /dev/null
@@ -1 +0,0 @@
-gom 'github.com/stretchr/testify'
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..f57a138
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,12 @@
+test:
+	go test -v .
+
+bench:
+	go test -bench .
+
+deps:
+	go get github.com/stretchr/testify
+
+dev-deps:
+	go get github.com/nsf/gocode
+	go get code.google.com/p/rog-go/exp/cmd/godef
diff --git a/benchmarks/bufio_read.go b/benchmarks/bufio_read.go
new file mode 100644
index 0000000..e37a66d
--- /dev/null
+++ b/benchmarks/bufio_read.go
@@ -0,0 +1,23 @@
+// Example program that reads a big nginx log file from stdin line by line
+// and measures reading time. The file should be big enough, at least 500K lines.
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"os"
+	"time"
+)
+
+func main() {
+	scanner := bufio.NewScanner(os.Stdin)
+	var count int
+	start := time.Now()
+	for scanner.Scan() {
+		// A dummy action, just read line by line
+		scanner.Text()
+		count++
+	}
+	duration := time.Since(start)
+	fmt.Printf("%v lines read, it took %v\n", count, duration)
+}
diff --git a/benchmarks/bufio_read_entry.go b/benchmarks/bufio_read_entry.go
new file mode 100644
index 0000000..5fa64d3
--- /dev/null
+++ b/benchmarks/bufio_read_entry.go
@@ -0,0 +1,29 @@
+// Example program that reads a big nginx log file from stdin line by line
+// and measures reading time. The file should be big enough, at least 500K lines.
+package main
+
+import (
+	gonx ".."
+	"bufio"
+	"fmt"
+	"os"
+	"time"
+)
+
+func main() {
+	scanner := bufio.NewScanner(os.Stdin)
+	var count int
+	format := `$remote_addr - $remote_user [$time_local] "$request" $status ` +
+		`$body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" ` +
+		`"$cookie_uid" "$cookie_userid" "$request_time" "$http_host" "$is_ajax" ` +
+		`"$uid_got/$uid_set" "$msec" "$geoip_country_code"`
+	parser := gonx.NewParser(format)
+	start := time.Now()
+	for scanner.Scan() {
+		// A dummy action, just parse line by line
+		parser.ParseString(scanner.Text())
+		count++
+	}
+	duration := time.Since(start)
+	fmt.Printf("%v lines read, it took %v\n", count, duration)
+}
diff --git a/benchmarks/gonx_read_entry.go b/benchmarks/gonx_read_entry.go
new file mode 100644
index 0000000..e4b6fbf
--- /dev/null
+++ b/benchmarks/gonx_read_entry.go
@@ -0,0 +1,36 @@
+// Example program that reads a big nginx log file from stdin line by line
+// and measures reading time. The file should be big enough, at least 500K lines.
+package main
+
+import (
+	gonx ".."
+ "fmt" + "io" + "os" + "runtime" + "time" +) + +func init() { + numcpu := runtime.NumCPU() + runtime.GOMAXPROCS(numcpu + 1) +} + +func main() { + var count int + format := `$remote_addr - $remote_user [$time_local] "$request" $status ` + + `$body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" ` + + `"$cookie_uid" "$cookie_userid" "$request_time" "$http_host" "$is_ajax" ` + + `"$uid_got/$uid_set" "$msec" "$geoip_country_code"` + reader := gonx.NewReader(os.Stdin, format) + start := time.Now() + for { + _, err := reader.Read() + if err == io.EOF { + break + } + count++ + } + duration := time.Since(start) + fmt.Printf("%v lines readed, it takes %v\n", count, duration) +} diff --git a/entry.go b/entry.go new file mode 100644 index 0000000..7e43fe8 --- /dev/null +++ b/entry.go @@ -0,0 +1,19 @@ +package gonx + +import ( + "fmt" +) + +// Parsed log record. Use Get method to retrieve a value by name instead of +// threating this as a map, because inner representation is in design. +type Entry map[string]string + +// Return entry field value by name or empty string and error if it +// does not exist. +func (entry *Entry) Get(name string) (value string, err error) { + value, ok := (*entry)[name] + if !ok { + err = fmt.Errorf("Field '%v' does not found in record %+v", name, *entry) + } + return +} diff --git a/entry_test.go b/entry_test.go new file mode 100644 index 0000000..a5823c0 --- /dev/null +++ b/entry_test.go @@ -0,0 +1,20 @@ +package gonx + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestEntry(t *testing.T) { + entry := Entry{"foo": "1"} + + // Get existings field + val, err := entry.Get("foo") + assert.NoError(t, err) + assert.Equal(t, val, "1") + + // Get field that does not exist + val, err = entry.Get("bar") + assert.Error(t, err) + assert.Equal(t, val, "") +} diff --git a/example/common/common.go b/example/common/common.go index 58d028d..0ba930b 100644 --- a/example/common/common.go +++ b/example/common/common.go @@ -1,9 +1,9 @@ package main import ( + gonx "../.." "flag" "fmt" - "github.com/satyrius/gonx" "io" "os" "strings" diff --git a/example/nginx/nginx.go b/example/nginx/nginx.go index eeecd59..f606826 100644 --- a/example/nginx/nginx.go +++ b/example/nginx/nginx.go @@ -1,33 +1,59 @@ package main import ( + gonx "../.." "flag" "fmt" - "github.com/satyrius/gonx" "io" "os" + "strings" ) var conf string var format string +var logFile string func init() { - flag.StringVar(&conf, "conf", "/etc/nginx/nginx.conf", "Nginx config file") + flag.StringVar(&conf, "conf", "dummy", "Nginx config file (e.g. /etc/nginx/nginx.conf)") flag.StringVar(&format, "format", "main", "Nginx log_format name") + flag.StringVar(&logFile, "log", "dummy", "Log file name to read. 
Read from STDIN if file name is '-'") } func main() { flag.Parse() + // Read given file or from STDIN + var file io.Reader + if logFile == "dummy" { + file = strings.NewReader(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`) + } else if logFile == "-" { + file = os.Stdin + } else { + file, err := os.Open(logFile) + if err != nil { + panic(err) + } + defer file.Close() + } + // Use nginx config file to extract format by the name - nginxConfig, err := os.Open(conf) - if err != nil { - panic(err) + var nginxConfig io.Reader + if conf == "dummy" { + nginxConfig = strings.NewReader(` + http { + log_format main '$remote_addr [$time_local] "$request"'; + } + `) + } else { + nginxConfig, err := os.Open(conf) + if err != nil { + panic(err) + } + defer nginxConfig.Close() } - defer nginxConfig.Close() // Read from STDIN and use log_format to parse log records - reader, err := gonx.NewNginxReader(os.Stdin, nginxConfig, format) + reader, err := gonx.NewNginxReader(file, nginxConfig, format) if err != nil { panic(err) } diff --git a/mapreduce.go b/mapreduce.go new file mode 100644 index 0000000..b88c845 --- /dev/null +++ b/mapreduce.go @@ -0,0 +1,88 @@ +package gonx + +import ( + "bufio" + "io" + "sync" +) + +func handleError(err error) { + //fmt.Fprintln(os.Stderr, err) +} + +// Iterate over given file and map each it's line into Entry record using +// parser and apply reducer to the Entries channel. Execution terminates +// when result will be readed from reducer's output channel, but the mapper +// works and fills input Entries channel until all lines will be read from +// the fiven file. +func MapReduce(file io.Reader, parser *Parser, reducer Reducer) interface{} { + // Input file lines. This channel is unbuffered to publish + // next line to handle only when previous is taken by mapper. + var lines = make(chan string) + + // Host thread to spawn new mappers + var entries = make(chan Entry, 10) + go func(topLoad int) { + // Create semafore channel with capacity equal to the output channel + // capacity. Use it to control mapper goroutines spawn. + var sem = make(chan bool, topLoad) + for i := 0; i < topLoad; i++ { + // Ready to go! + sem <- true + } + + var wg sync.WaitGroup + for { + // Wait until semaphore becomes available and run a mapper + if !<-sem { + // Stop the host loop if false received from semaphore + break + } + wg.Add(1) + go func() { + defer wg.Done() + // Take next file line to map. Check is channel closed. + line, ok := <-lines + // Return immediately if lines channel is closed + if !ok { + // Send false to semaphore channel to indicate that job's done + sem <- false + return + } + entry, err := parser.ParseString(line) + if err == nil { + // Write result Entry to the output channel. This will + // block goroutine runtime until channel is free to + // accept new item. + entries <- entry + } else { + handleError(err) + } + // Increment semaphore to allow new mapper workers to spawn + sem <- true + }() + } + // Wait for all mappers to complete, then send a quit signal + wg.Wait() + close(entries) + }(cap(entries)) + + // Run reducer routine. + var output = make(chan interface{}) + go reducer.Reduce(entries, output) + + go func() { + scanner := bufio.NewScanner(file) + for scanner.Scan() { + // Read next line from the file and feed mapper routines. 
+			lines <- scanner.Text()
+		}
+		close(lines)
+
+		if err := scanner.Err(); err != nil {
+			handleError(err)
+		}
+	}()
+
+	return <-output
+}
diff --git a/parser.go b/parser.go
new file mode 100644
index 0000000..0c16e4e
--- /dev/null
+++ b/parser.go
@@ -0,0 +1,83 @@
+package gonx
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"regexp"
+)
+
+// Log record parser. Use specific constructors to initialize it.
+type Parser struct {
+	format string
+	regexp *regexp.Regexp
+}
+
+// Returns a new Parser, using the given log format to create its internal
+// line parsing regexp.
+func NewParser(format string) *Parser {
+	re := regexp.MustCompile(`\\\$([a-z_]+)(\\?(.))`).ReplaceAllString(
+		regexp.QuoteMeta(format), "(?P<$1>[^$3]+)$2")
+	return &Parser{format, regexp.MustCompile(fmt.Sprintf("^%v$", re))}
+}
+
+// Parse a log file line using the internal format regexp. If the line does not
+// match the given format, an error will be returned.
+func (parser *Parser) ParseString(line string) (entry Entry, err error) {
+	re := parser.regexp
+	fields := re.FindStringSubmatch(line)
+	if fields == nil {
+		err = fmt.Errorf("Access log line '%v' does not match given format '%v'", line, re)
+		return
+	}
+
+	// Iterate over the subexpressions found and fill the map record
+	entry = make(Entry)
+	for i, name := range re.SubexpNames() {
+		if i == 0 {
+			continue
+		}
+		entry[name] = fields[i]
+	}
+	return
+}
+
+// NewNginxParser parses the nginx conf file to find a log_format with the given
+// name and returns a parser for this format. It returns an error if the format cannot be found.
+func NewNginxParser(conf io.Reader, name string) (parser *Parser, err error) {
+	scanner := bufio.NewScanner(conf)
+	re := regexp.MustCompile(fmt.Sprintf(`^.*log_format\s+%v\s+(.+)\s*$`, name))
+	found := false
+	var format string
+	for scanner.Scan() {
+		var line string
+		if !found {
+			// Find a log_format definition
+			line = scanner.Text()
+			formatDef := re.FindStringSubmatch(line)
+			if formatDef == nil {
+				continue
+			}
+			found = true
+			line = formatDef[1]
+		} else {
+			line = scanner.Text()
+		}
+		// Look for the definition end
+		re = regexp.MustCompile(`^\s*(.*?)\s*(;|$)`)
+		lineSplit := re.FindStringSubmatch(line)
+		if l := len(lineSplit[1]); l > 2 {
+			format += lineSplit[1][1 : l-1]
+		}
+		if lineSplit[2] == ";" {
+			break
+		}
+	}
+	if !found {
+		err = fmt.Errorf("`log_format %v` not found in given config", name)
+	} else {
+		err = scanner.Err()
+	}
+	parser = NewParser(format)
+	return
+}
diff --git a/parser_bench_test.go b/parser_bench_test.go
new file mode 100644
index 0000000..4e3097d
--- /dev/null
+++ b/parser_bench_test.go
@@ -0,0 +1,39 @@
+package gonx
+
+import (
+	"testing"
+)
+
+func benchLogParsing(b *testing.B, format string, line string) {
+	parser := NewParser(format)
+
+	// Ensure the string is in a valid format
+	_, err := parser.ParseString(line)
+	if err != nil {
+		b.Error(err)
+		b.Fail()
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		parser.ParseString(line)
+	}
+}
+
+func BenchmarkParseSimpleLogRecord(b *testing.B) {
+	format := "$remote_addr [$time_local] \"$request\""
+	line := `89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`
+	benchLogParsing(b, format, line)
+}
+
+func BenchmarkParseLogRecord(b *testing.B) {
+	format := `$remote_addr - $remote_user [$time_local] "$request" $status ` +
+		`$body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" ` +
+		`"$cookie_uid" "$cookie_userid" "$request_time" "$http_host" "$is_ajax" ` +
+		`"$uid_got/$uid_set" "$msec" "$geoip_country_code"`
+	line := `**.***.**.*** - - [08/Nov/2013:13:39:18 +0000] ` +
+		`"GET /api/internal/v2/item/1?lang=en HTTP/1.1" 200 142 "http://example.com" ` +
+		`"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36" ` +
+		`"-" "-" "-" "0.084" "example.com" "ajax" "-/-" "1383917958.587" "-"`
+	benchLogParsing(b, format, line)
+}
diff --git a/parser_test.go b/parser_test.go
new file mode 100644
index 0000000..b44051a
--- /dev/null
+++ b/parser_test.go
@@ -0,0 +1,72 @@
+package gonx
+
+import (
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
+	"strings"
+	"testing"
+)
+
+type ParserTestSuite struct {
+	suite.Suite
+	format string
+	parser *Parser
+}
+
+func (suite *ParserTestSuite) SetupTest() {
+	suite.format = "$remote_addr [$time_local] \"$request\""
+	suite.parser = NewParser(suite.format)
+}
+
+func TestParserTestSuite(t *testing.T) {
+	suite.Run(t, new(ParserTestSuite))
+}
+
+func (suite *ParserTestSuite) TestFormatSaved() {
+	assert.Equal(suite.T(), suite.parser.format, suite.format)
+}
+
+func (suite *ParserTestSuite) TestRegexp() {
+	assert.Equal(suite.T(),
+		suite.parser.regexp.String(),
+		`^(?P<remote_addr>[^ ]+) \[(?P<time_local>[^]]+)\] "(?P<request>[^"]+)"$`)
+}
+
+func (suite *ParserTestSuite) TestParseString() {
+	line := `89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`
+	expected := Entry{
+		"remote_addr": "89.234.89.123",
+		"time_local":  "08/Nov/2013:13:39:18 +0000",
+		"request":     "GET /api/foo/bar HTTP/1.1",
+	}
+	entry, err := suite.parser.ParseString(line)
+	assert.NoError(suite.T(), err)
+	assert.Equal(suite.T(), entry, expected)
+}
+
+func (suite *ParserTestSuite) TestParseInvalidString() {
+	line := `GET /api/foo/bar HTTP/1.1`
+	_, err := suite.parser.ParseString(line)
+	assert.Error(suite.T(), err)
+	// TODO test empty value
+}
+
+func TestNginxParser(t *testing.T) {
+	expected := "$remote_addr - $remote_user [$time_local] \"$request\" $status \"$http_referer\" \"$http_user_agent\""
+	conf := strings.NewReader(`
+	http {
+		include conf/mime.types;
+		log_format minimal '$remote_addr [$time_local] "$request"';
+		log_format main '$remote_addr - $remote_user [$time_local] '
+			'"$request" $status '
+			'"$http_referer" "$http_user_agent"';
+		log_format download '$remote_addr - $remote_user [$time_local] '
+			'"$request" $status $bytes_sent '
+			'"$http_referer" "$http_user_agent" '
+			'"$http_range" "$sent_http_content_range"';
+	}
+	`)
+	parser, err := NewNginxParser(conf, "main")
+	assert.NoError(t, err)
+	assert.Equal(t, parser.format, expected)
+}
diff --git a/reader.go b/reader.go
index 54f9166..293482d 100644
--- a/reader.go
+++ b/reader.go
@@ -1,117 +1,47 @@
 package gonx
 
 import (
-	"bufio"
-	"fmt"
 	"io"
-	"regexp"
 )
 
+// Log file reader. Use specific constructors to create it.
 type Reader struct {
-	format  string
-	re      *regexp.Regexp
-	scanner *bufio.Scanner
+	file    io.Reader
+	parser  *Parser
+	entries chan Entry
 }
 
+// Creates a reader for a custom log format.
 func NewReader(logFile io.Reader, format string) *Reader {
 	return &Reader{
-		format:  format,
-		scanner: bufio.NewScanner(logFile),
+		file:   logFile,
+		parser: NewParser(format),
 	}
 }
 
+// Creates a reader for the nginx log format. The nginx config parser will be
+// used to get the particular format from the conf file.
 func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) (reader *Reader, err error) {
-	scanner := bufio.NewScanner(nginxConf)
-	re := regexp.MustCompile(fmt.Sprintf(`^.*log_format\s+%v\s+(.+)\s*$`, formatName))
-	found := false
-	var format string
-	for scanner.Scan() {
-		var line string
-		if !found {
-			// Find a log_format definition
-			line = scanner.Text()
-			formatDef := re.FindStringSubmatch(line)
-			if formatDef == nil {
-				continue
-			}
-			found = true
-			line = formatDef[1]
-		} else {
-			line = scanner.Text()
-		}
-		// Look for a definition end
-		re = regexp.MustCompile(`^\s*(.*?)\s*(;|$)`)
-		lineSplit := re.FindStringSubmatch(line)
-		if l := len(lineSplit[1]); l > 2 {
-			format += lineSplit[1][1 : l-1]
-		}
-		if lineSplit[2] == ";" {
-			break
-		}
+	parser, err := NewNginxParser(nginxConf, formatName)
+	if err != nil {
+		return nil, err
 	}
-	if !found {
-		err = fmt.Errorf("`log_format %v` not found in given config", formatName)
-	} else {
-		err = scanner.Err()
+	reader = &Reader{
+		file:   logFile,
+		parser: parser,
 	}
-	reader = NewReader(logFile, format)
 	return
 }
 
-func (r *Reader) GetFormat() string {
-	return r.format
-}
-
-func (r *Reader) GetFormatRegexp() *regexp.Regexp {
-	if r.re != nil {
-		return r.re
+// Get the next parsed Entry from the log file. Returns io.EOF if there are no more entries to read.
+func (r *Reader) Read() (entry Entry, err error) {
+	if r.entries == nil {
+		r.entries = MapReduce(r.file, r.parser, new(ReadAll)).(chan Entry)
 	}
-	format := regexp.MustCompile(`\\\$([a-z_]+)(\\?(.))`).ReplaceAllString(
-		regexp.QuoteMeta(r.format), "(?P<$1>[^$3]+)$2")
-	r.re = regexp.MustCompile(fmt.Sprintf("^%v$", format))
-	return r.re
-}
-
-type Entry map[string]string
-
-func (entry *Entry) Get(name string) (value string, err error) {
-	value, ok := (*entry)[name]
+	// TODO return Entry reference instead of instance
+	entry, ok := <-r.entries
 	if !ok {
-		err = fmt.Errorf("Field '%v' does not found in record %+v", name, *entry)
-	}
-	return
-}
-
-// Read next line from log file, and return parsed record. If all lines read
-// method return ni, io.EOF
-func (r *Reader) Read() (record Entry, err error) {
-	if r.scanner.Scan() {
-		record, err = r.parseRecord(r.scanner.Text())
-	} else {
-		err = r.scanner.Err()
-		if err == nil {
-			err = io.EOF
-		}
-	}
-	return
-}
-
-func (r *Reader) parseRecord(line string) (record Entry, err error) {
-	// Parse line to fill map record. Return error if a line does not match given format
-	re := r.GetFormatRegexp()
-	fields := re.FindStringSubmatch(line)
-	if fields == nil {
-		err = fmt.Errorf("Access log line '%v' does not match given format '%v'", line, re)
-		return
-	}
-
-	// Iterate over subexp foung and fill the map record
-	record = make(Entry)
-	for i, name := range re.SubexpNames() {
-		if i == 0 {
-			continue
-		}
-		record[name] = fields[i]
+		err = io.EOF
 	}
 	return
 }
diff --git a/reader_test.go b/reader_test.go
index 7b53467..fca5cef 100644
--- a/reader_test.go
+++ b/reader_test.go
@@ -3,85 +3,41 @@ package gonx
 import (
 	"github.com/stretchr/testify/assert"
 	"io"
-	"reflect"
 	"strings"
 	"testing"
 )
 
-func TestGetFormatRegexp(t *testing.T) {
-	format := "$remote_addr [$time_local] \"$request\""
-	reader := NewReader(strings.NewReader(""), format)
-	expected := `^(?P<remote_addr>[^ ]+) \[(?P<time_local>[^]]+)\] "(?P<request>[^"]+)"$`
-	if re := reader.GetFormatRegexp(); re.String() != expected {
-		t.Errorf("Wrong RE '%v'", re)
-	}
-}
-
-func TestGetRecord(t *testing.T) {
+func TestRead(t *testing.T) {
 	format := "$remote_addr [$time_local] \"$request\""
 	file := strings.NewReader(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`)
 	reader := NewReader(file, format)
+	assert.Nil(t, reader.entries)
+
 	expected := Entry{
 		"remote_addr": "89.234.89.123",
 		"time_local":  "08/Nov/2013:13:39:18 +0000",
 		"request":     "GET /api/foo/bar HTTP/1.1",
 	}
-	rec, err := reader.Read()
-	if err != nil {
-		t.Error(err)
-	}
-	if !reflect.DeepEqual(rec, expected) {
-		t.Errorf("Get invalid record %v", rec)
-	}
-	if _, err := reader.Read(); err != io.EOF {
-		t.Error("End of file expected")
-	}
+
+	// Read entry from the incoming channel
+	entry, err := reader.Read()
+	assert.NoError(t, err)
+	assert.Equal(t, entry, expected)
+
+	// There was only one line, nothing more to read
+	_, err = reader.Read()
+	assert.Equal(t, err, io.EOF)
 }
 
 func TestInvalidLineFormat(t *testing.T) {
+	t.Skip("Read method does not return errors anymore because of the asynchronous algorithm")
 	format := "$remote_addr [$time_local] \"$request\""
 	file := strings.NewReader(`89.234.89.123 - - [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`)
 	reader := NewReader(file, format)
-	if rec, err := reader.Read(); err == nil {
-		t.Errorf("Invalid record error expected, but get the record %+v", rec)
-	}
-}
 
-func TestReadLogFormatFromFile(t *testing.T) {
-	expected := "$remote_addr - $remote_user [$time_local] \"$request\" $status \"$http_referer\" \"$http_user_agent\""
-	conf := strings.NewReader(`
-	http {
-		include conf/mime.types;
-		log_format minimal '$remote_addr [$time_local] "$request"';
-		log_format main '$remote_addr - $remote_user [$time_local] '
-			'"$request" $status '
-			'"$http_referer" "$http_user_agent"';
-		log_format download '$remote_addr - $remote_user [$time_local] '
-			'"$request" $status $bytes_sent '
-			'"$http_referer" "$http_user_agent" '
-			'"$http_range" "$sent_http_content_range"';
-	}
-	`)
-	file := strings.NewReader("")
-	reader, err := NewNginxReader(file, conf, "main")
-	if err != nil {
-		t.Error(err)
-	}
-	if format := reader.GetFormat(); format != expected {
-		t.Errorf("Wrong format was read from conf file \n%v\nExpected\n%v", format, expected)
-	}
-}
-
-func TestEntry(t *testing.T) {
-	entry := Entry{"foo": "1"}
-
-	// Get existings field
-	val, err := entry.Get("foo")
-	assert.NoError(t, err)
-	assert.Equal(t, val, "1")
+	// Invalid entries do not go to the entries channel, so nothing to read
+	_, err := reader.Read()
+	assert.Equal(t, err, io.EOF)
 
-	// Get field that does not exist
-	val, err = entry.Get("bar")
-	assert.Error(t, err)
-	assert.Equal(t, val, "")
+	// TODO test Reader internal error handling
 }
diff --git a/reducer.go b/reducer.go
new file mode 100644
index 0000000..04e5689
--- /dev/null
+++ b/reducer.go
@@ -0,0 +1,24 @@
+package gonx
+
+// Reducer interface for reducing the Entries channel.
+//
+// Each Reduce method should accept an input channel of Entries, do its job and
+// write the result to the output channel.
+//
+// It does not return values because it usually runs in a separate
+// goroutine and it is handy to use a channel for reduced data retrieval.
+type Reducer interface {
+	Reduce(input chan Entry, output chan interface{})
+}
+
+// Implements the Reducer interface for simple redirection of input entries to
+// the output channel.
+type ReadAll struct {
+}
+
+// Redirect the input Entries channel directly to the output without any
+// modifications. It is useful when you just want to read a file fast
+// using the asynchronous mapper routines.
+func (r *ReadAll) Reduce(input chan Entry, output chan interface{}) {
+	output <- input
+}
diff --git a/reducer_test.go b/reducer_test.go
new file mode 100644
index 0000000..8abd33a
--- /dev/null
+++ b/reducer_test.go
@@ -0,0 +1,24 @@
+package gonx
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestReadAllReducer(t *testing.T) {
+	reducer := new(ReadAll)
+	assert.Implements(t, (*Reducer)(nil), reducer)
+
+	// Prepare the input channel
+	input := make(chan Entry, 1)
+	input <- Entry{}
+
+	output := make(chan interface{}, 1) // Make it buffered to avoid deadlock
+	reducer.Reduce(input, output)
+
+	// The ReadAll reducer writes the input channel to the output
+	result, opened := <-output
+	assert.True(t, opened)
+	_, ok := result.(chan Entry)
+	assert.True(t, ok)
+}