From d4f76ecf95609483d2885d2b6c4fc1de630a5350 Mon Sep 17 00:00:00 2001 From: Markus Stenberg Date: Fri, 14 Jun 2024 12:32:25 +0300 Subject: [PATCH] Add canned log handling for unittesting main, some cleanup related to it --- data/db.go | 158 ++++++++++++------------------------------ data/db_test.go | 40 +++++++++-- data/log_rule_test.go | 29 ++++++++ data/loki.go | 24 ------- data/source_array.go | 27 ++++++++ data/source_loki.go | 126 +++++++++++++++++++++++++++++++++ data/util.go | 20 ++++++ log_list.go | 17 +++-- log_rule.go | 5 +- main.go | 30 ++++---- main.templ | 2 +- main_templ.go | 4 +- main_test.go | 18 ++--- testdata/db.json | 25 +++++++ testdata/logs.json | 4 ++ 15 files changed, 346 insertions(+), 183 deletions(-) create mode 100644 data/log_rule_test.go delete mode 100644 data/loki.go create mode 100644 data/source_array.go create mode 100644 data/source_loki.go create mode 100644 testdata/db.json create mode 100644 testdata/logs.json diff --git a/data/db.go b/data/db.go index 85af899..87f0ea1 100644 --- a/data/db.go +++ b/data/db.go @@ -13,24 +13,17 @@ package data import ( "bytes" - "cmp" "encoding/json" "errors" - "fmt" - "io" - "net/http" - "net/url" "os" "slices" - "strconv" "sync" "github.com/sourcegraph/conc/iter" ) -type DatabaseConfig struct { - LokiServer string - LokiSelector string +type LogSource interface { + Load() ([]*Log, error) } type LogRules struct { @@ -45,33 +38,35 @@ type LogRules struct { } type Database struct { - // Config is assumed to be immutable; rules and logs are not + // This mutex guards log rules and logs; configuration is assumed to be static sync.Mutex - config DatabaseConfig + // Following are essentially configuration - LogRules *LogRules + // Where is this file saved + Path string `json:"-"` + Source LogSource `json:"-"` + + LogRules LogRules logs []*Log // next id to be added state for rules nextID int - - // Where is this file saved - path string } var ( ErrHashNotFound = errors.New("specified hash not found") ErrRuleNotFound = errors.New("specified rule not found") + ErrNoSource = errors.New("source has not been specified") ) -func NewLogRules(rules []*LogRule, version int) *LogRules { +func NewLogRules(rules []*LogRule, version int) LogRules { count := len(rules) reversed := make([]*LogRule, count) for k, v := range rules { reversed[count-k-1] = v } - return &LogRules{Rules: rules, Reversed: reversed, Version: version} + return LogRules{Rules: rules, Reversed: reversed, Version: version} } func (self *Database) add(r LogRule) error { @@ -133,94 +128,36 @@ func (self *Database) nextLogRuleID() int { return id } -func (self *Database) retrieveLogs(start int64) ([]*Log, error) { - logs := []*Log{} - - base := self.config.LokiServer + "/loki/api/v1/query_range" - v := url.Values{} - v.Set("query", self.config.LokiSelector) - // v.Set("direction", "backward") - v.Set("limit", "5000") - if start > 0 { - v.Set("start", strconv.FormatInt(start, 10)) +func (self *Database) updateLogs() error { + if self.Source == nil { + return ErrNoSource } - - resp, err := http.Get(base + "?" + v.Encode()) + logs, err := self.Source.Load() if err != nil { - return nil, err - } - - if resp.StatusCode > 299 { - return nil, fmt.Errorf("Invalid result from Loki - status code %d", resp.StatusCode) + return err } + self.addLogsToCounts(logs) + self.logs = append(logs, self.logs...) + return nil +} - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - resp.Body.Close() +func (self *Database) Logs() ([]*Log, error) { + self.Lock() + defer self.Unlock() - var result LokiQueryResult - err = json.Unmarshal(body, &result) + err := self.updateLogs() if err != nil { return nil, err } - - status := result.Status - if status != "success" { - return nil, fmt.Errorf("invalid status from Loki:%s", status) - } - - rtype := result.Data.ResultType - if rtype != "streams" { - return nil, fmt.Errorf("invalid result type from Loki:%s", rtype) - } - for _, result := range result.Data.Result { - for _, value := range result.Values { - timestamp, err := strconv.ParseInt(value[0], 10, 64) - if err != nil { - return nil, err - } - logs = append(logs, NewLog(timestamp, result.Stream, value[1])) - } - } - - // Loki output is by metric/stream and then by time; we don't - // really care, sort by timestamp desc. This may need to be - // rethought when we fetch more than just the latest - slices.SortFunc(logs, func(a, b *Log) int { - return -cmp.Compare(a.Timestamp, b.Timestamp) - }) - - return logs, nil + return self.logs, nil } -func (self *Database) updateLogs() { - start := int64(0) - if len(self.logs) > 0 { - // TODO would be better to get same timestamp + eliminate if it is same entry - start = self.logs[0].Timestamp + 1 - } - logs, err := self.retrieveLogs(start) +func (self *Database) LogCount() int { + logs, err := self.Logs() if err != nil { - fmt.Printf("Error retrieving logs from Loki: %s\n", err.Error()) - } - for i, log := range logs { - if log.Timestamp <= start { - logs = logs[:i] - break - } + return -1 } - self.addLogsToCounts(logs) - self.logs = append(logs, self.logs...) -} - -func (self *Database) Logs() []*Log { - self.Lock() - defer self.Unlock() - - self.updateLogs() - return self.logs + return len(logs) } func (self *Database) getLogByHashUnlocked(hash uint64) *Log { @@ -263,14 +200,13 @@ func (self *Database) ClassifyHash(hash uint64, ham bool) error { } func (self *Database) save(rules []*LogRule) error { - lrules := NewLogRules(rules, self.LogRules.Version+1) - self.LogRules = lrules + self.LogRules = NewLogRules(rules, self.LogRules.Version+1) b, err := json.Marshal(self) if err != nil { return err } - temp := self.path + ".tmp" + temp := self.Path + ".tmp" f, err := os.OpenFile(temp, os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return err @@ -285,7 +221,7 @@ func (self *Database) save(rules []*LogRule) error { if err != nil { return err } - err = os.Rename(temp, self.path) + err = os.Rename(temp, self.Path) if err != nil { return err } @@ -294,7 +230,7 @@ func (self *Database) save(rules []*LogRule) error { } func (self *Database) addLogsToCounts(logs []*Log) { - lrules := self.LogRules + lrules := &self.LogRules r2c := lrules.rid2Count if r2c == nil { return @@ -314,10 +250,13 @@ func (self *Database) RuleCount(rid int) int { // Trigger logs refresh only if we have nothing in cache if self.logs == nil { - self.updateLogs() + err := self.updateLogs() + if err != nil { + return -1 + } } - lrules := self.LogRules + lrules := &self.LogRules if lrules.rid2Count == nil { r2c := make(map[int]int, len(lrules.Rules)) @@ -330,22 +269,13 @@ func (self *Database) RuleCount(rid int) int { return lrules.rid2Count[rid] } -func NewDatabaseFromFile(config DatabaseConfig, path string) (db *Database, err error) { - db = &Database{config: config, path: path, LogRules: &LogRules{}} - f, err := os.Open(path) - if err != nil { - return - } - data, err := io.ReadAll(f) +func (self *Database) Load() error { + err := UnmarshalJSONFromPath(self, self.Path) if err != nil { - return + return err } - err = json.Unmarshal(data, db) - if err != nil { - return - } // Recreate to have also reverse slice - db.LogRules = NewLogRules(db.LogRules.Rules, db.LogRules.Version) - return + self.LogRules = NewLogRules(self.LogRules.Rules, self.LogRules.Version) + return nil } diff --git a/data/db_test.go b/data/db_test.go index b2935fb..46cfe4c 100644 --- a/data/db_test.go +++ b/data/db_test.go @@ -4,8 +4,8 @@ * Copyright (c) 2024 Markus Stenberg * * Created: Mon Jun 3 07:40:40 2024 mstenber - * Last modified: Sun Jun 9 20:45:11 2024 mstenber - * Edit time: 9 min + * Last modified: Fri Jun 14 12:30:40 2024 mstenber + * Edit time: 18 min * */ @@ -19,13 +19,41 @@ import ( ) func TestDatabase(t *testing.T) { + // Data source - static one, with exactly two log entries + log := Log{} + log2 := Log{} + arr := ArraySource{Data: []*Log{&log, &log2}, Chunk: 1} + path := "test_db.json" _ = os.Remove(path) - db, err := NewDatabaseFromFile(DatabaseConfig{}, path) + + db := Database{Path: path, Source: &arr} + err := db.Load() assert.Assert(t, err != nil) + // Add rule err = db.Add(LogRule{}) assert.Equal(t, err, nil) + assert.Equal(t, db.nextLogRuleID(), 2) + + // Ensure the logs are available + logs, err := db.Logs() + assert.Equal(t, len(logs), 1) + assert.Equal(t, err, nil) + + // Ensure they also match (only one fetched now) + assert.Equal(t, db.RuleCount(1), 1) + + // Fetch more + logs, err = db.Logs() + assert.Equal(t, len(logs), 2) + assert.Equal(t, err, nil) + assert.Equal(t, db.RuleCount(1), 2) + + // Source is empty; ensure we're still ok + logs, err = db.Logs() + assert.Equal(t, len(logs), 2) + assert.Equal(t, err, nil) // Add another rule (using add-or-update API) err = db.AddOrUpdate(LogRule{}) @@ -40,7 +68,8 @@ func TestDatabase(t *testing.T) { assert.Equal(t, len(db.LogRules.Rules), 2) // Ensure save + load gave us something similar - db2, err := NewDatabaseFromFile(DatabaseConfig{}, path) + db2 := Database{Path: path} + err = db2.Load() assert.Equal(t, err, nil) assert.Equal(t, db2.LogRules.Rules[0].Ham, true) assert.Equal(t, len(db2.LogRules.Rules), 2) @@ -52,7 +81,4 @@ func TestDatabase(t *testing.T) { assert.Equal(t, db.LogRules.Rules[0].Ham, false) assert.Equal(t, db.Delete(1), ErrRuleNotFound) - - // bit worthless - should really test having proper logs within - assert.Equal(t, db.RuleCount(0), 0) } diff --git a/data/log_rule_test.go b/data/log_rule_test.go new file mode 100644 index 0000000..6641eeb --- /dev/null +++ b/data/log_rule_test.go @@ -0,0 +1,29 @@ +/* + * Author: Markus Stenberg + * + * Copyright (c) 2024 Markus Stenberg + * + * Created: Sun Jun 9 20:38:22 2024 mstenber + * Last modified: Sun Jun 9 20:42:07 2024 mstenber + * Edit time: 3 min + * + */ + +package data + +import ( + "testing" + + "gotest.tools/v3/assert" +) + +func TestLogRule(t *testing.T) { + lr := LogRule{Matchers: []LogFieldMatcher{ + {"source", "=", "dummysrc", nil}, + {"message", "=", "dummymessage", nil}, + }} + assert.Assert(t, lr.MatchesFTS("dummysrc")) + assert.Assert(t, lr.MatchesFTS("dummymessage")) + assert.Assert(t, !lr.MatchesFTS("dummynonexistent")) + assert.Equal(t, lr.SourceString(), "=dummysrc") +} diff --git a/data/loki.go b/data/loki.go deleted file mode 100644 index 795f838..0000000 --- a/data/loki.go +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Author: Markus Stenberg - * - * Copyright (c) 2024 Markus Stenberg - * - */ - -package data - -type LokiQueryResultDataResult struct { - Metric map[string]string `json:"metric"` - Stream map[string]string `json:"stream"` - Values [][]string `json:"values"` -} - -type LokiQueryResultData struct { - ResultType string `json:"resultType"` - Result []LokiQueryResultDataResult `json:"result"` -} - -type LokiQueryResult struct { - Status string `json:"status"` - Data *LokiQueryResultData `json:"data"` -} diff --git a/data/source_array.go b/data/source_array.go new file mode 100644 index 0000000..7c3cb87 --- /dev/null +++ b/data/source_array.go @@ -0,0 +1,27 @@ +/* + * Author: Markus Stenberg + * + * Copyright (c) 2024 Markus Stenberg + * + */ + +/* Static data source for testing */ + +package data + +type ArraySource struct { + Data []*Log + Chunk int + offset int +} + +func (self *ArraySource) Load() ([]*Log, error) { + ofs := self.offset + got := len(self.Data) + if self.offset >= got { + return []*Log{}, nil + } + end := min(got, ofs+self.Chunk) + self.offset = end + return self.Data[ofs:end], nil +} diff --git a/data/source_loki.go b/data/source_loki.go new file mode 100644 index 0000000..8914312 --- /dev/null +++ b/data/source_loki.go @@ -0,0 +1,126 @@ +/* + * Author: Markus Stenberg + * + * Copyright (c) 2024 Markus Stenberg + * + */ + +package data + +import ( + "cmp" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "slices" + "strconv" +) + +type LokiQueryResultDataResult struct { + Metric map[string]string `json:"metric"` + Stream map[string]string `json:"stream"` + Values [][]string `json:"values"` +} + +type LokiQueryResultData struct { + ResultType string `json:"resultType"` + Result []LokiQueryResultDataResult `json:"result"` +} + +type LokiQueryResult struct { + Status string `json:"status"` + Data *LokiQueryResultData `json:"data"` +} + +type LokiSource struct { + Server string + Selector string + + last *Log +} + +func (self *LokiSource) loadAfter(start int64) ([]*Log, error) { + logs := []*Log{} + + base := self.Server + "/loki/api/v1/query_range" + v := url.Values{} + v.Set("query", self.Selector) + // v.Set("direction", "backward") + v.Set("limit", "5000") + if start > 0 { + v.Set("start", strconv.FormatInt(start, 10)) + } + + resp, err := http.Get(base + "?" + v.Encode()) + if err != nil { + return nil, err + } + + if resp.StatusCode > 299 { + return nil, fmt.Errorf("Invalid result from Loki - status code %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + resp.Body.Close() + + var result LokiQueryResult + err = json.Unmarshal(body, &result) + if err != nil { + return nil, err + } + + status := result.Status + if status != "success" { + return nil, fmt.Errorf("invalid status from Loki:%s", status) + } + + rtype := result.Data.ResultType + if rtype != "streams" { + return nil, fmt.Errorf("invalid result type from Loki:%s", rtype) + } + for _, result := range result.Data.Result { + for _, value := range result.Values { + timestamp, err := strconv.ParseInt(value[0], 10, 64) + if err != nil { + return nil, err + } + logs = append(logs, NewLog(timestamp, result.Stream, value[1])) + } + } + + // Loki output is by metric/stream and then by time; we don't + // really care, sort by timestamp desc. This may need to be + // rethought when we fetch more than just the latest + slices.SortFunc(logs, func(a, b *Log) int { + return -cmp.Compare(a.Timestamp, b.Timestamp) + }) + + return logs, nil +} + +func (self *LokiSource) Load() ([]*Log, error) { + start := int64(0) + if self.last != nil { + // TODO would be better to get same timestamp + eliminate if it is same entry + start = self.last.Timestamp + 1 + } + logs, err := self.loadAfter(start) + if err != nil { + return nil, err + } + for i, log := range logs { + if log.Timestamp <= start { + logs = logs[:i] + break + } + } + if len(logs) > 0 { + self.last = logs[0] + } + return logs, nil +} diff --git a/data/util.go b/data/util.go index c075bf1..2fb65f5 100644 --- a/data/util.go +++ b/data/util.go @@ -9,6 +9,9 @@ package data import ( "cmp" + "encoding/json" + "io" + "os" "slices" ) @@ -24,3 +27,20 @@ func SortedKeysWithFunc[K comparable, V any](m map[K]V, cmp func(a, b K) int) [] func SortedKeys[K cmp.Ordered, V any](m map[string]V) []string { return SortedKeysWithFunc(m, cmp.Compare) } + +func UnmarshalJSONFromPath(target any, path string) error { + f, err := os.Open(path) + if err != nil { + return err + } + data, err := io.ReadAll(f) + if err != nil { + return err + } + + err = json.Unmarshal(data, target) + if err != nil { + return err + } + return nil +} diff --git a/log_list.go b/log_list.go index b645065..32733bf 100644 --- a/log_list.go +++ b/log_list.go @@ -131,7 +131,7 @@ func (self *LogListModel) LogToRule(log *data.Log) *data.LogRule { if self.LogRules != nil { return data.LogToRule(log, self.LogRules) } - return log.ToRule(self.DB.LogRules) + return log.ToRule(&self.DB.LogRules) } func (self *LogListModel) LogVerdict(log *data.Log) int { @@ -139,12 +139,16 @@ func (self *LogListModel) LogVerdict(log *data.Log) int { return data.LogRuleToVerdict(rule) } -func (self *LogListModel) Filter() { +func (self *LogListModel) Filter() error { // Some spare capacity but who really cares logs := make([]*data.Log, 0, self.Limit) active := self.Config.BeforeHash == 0 count := 0 - allLogs := self.DB.Logs() + allLogs, err := self.DB.Logs() + if err != nil { + return err + } + allLogs = filterFTS(allLogs, self.Config.Global.Search, len(allLogs)) self.TotalCount = len(allLogs) for _, log := range allLogs { @@ -169,6 +173,7 @@ func (self *LogListModel) Filter() { } self.Logs = logs self.FilteredCount = count + return nil } func logListHandler(st State) http.Handler { @@ -186,7 +191,11 @@ func logListHandler(st State) http.Handler { return } model := LogListModel{Config: config, DB: st.DB, Limit: 20, Post: r.Method == "POST"} - model.Filter() + err = model.Filter() + if err != nil { + http.Error(w, err.Error(), 400) + return + } err = LogList(st, model).Render(r.Context(), w) if err != nil { http.Error(w, err.Error(), 400) diff --git a/log_rule.go b/log_rule.go index dd3895f..cd2cadc 100644 --- a/log_rule.go +++ b/log_rule.go @@ -98,7 +98,7 @@ func findMatchingLogs(db *data.Database, rule *data.LogRule) *LogListModel { EnableAccurateCounting: true, Limit: 5, } - m.Filter() + _ = m.Filter() return &m } @@ -107,9 +107,6 @@ func findMatchingOtherRules(db *data.Database, logs []*data.Log, skipRule *data. // matching is tricky). So we just show rules that out of the // box seem to overlap as they match the same rules. lrules := db.LogRules - if lrules == nil { - return nil - } rules := iter.Map(lrules.Rules, func(rulep **data.LogRule) *data.LogRule { rule := *rulep if rule == skipRule { diff --git a/main.go b/main.go index d797879..55b2d49 100644 --- a/main.go +++ b/main.go @@ -35,14 +35,6 @@ var boot = time.Now() // //go:embed all:static var embedContent embed.FS -func setupDatabase(config data.DatabaseConfig, path string) *data.Database { - db, err := data.NewDatabaseFromFile(config, path) - if err != nil { - fmt.Printf("Unable to read %s: %s", path, err.Error()) - } - return db -} - type mainConfig struct { RSSort int `cm:"rss"` } @@ -114,6 +106,7 @@ func run( address := flags.String("address", "127.0.0.1", "Address to listen at") lokiServer := flags.String("loki-server", "https://fw.fingon.iki.fi:3100", "Address of the Loki server") lokiSelector := flags.String("loki-selector", "{host=~\".+\"}", "Selector to use when querying logs from Loki") + arrayFile := flags.String("log-source-file", "", "Log file source") dbPath := flags.String("db", "db.json", "Database to use") dev := flags.Bool("dev", false, "Enable development mode") @@ -122,13 +115,24 @@ func run( return err } - config := data.DatabaseConfig{ - LokiServer: *lokiServer, - LokiSelector: *lokiSelector, + var source data.LogSource + if *arrayFile != "" { + arr := data.ArraySource{} + err := data.UnmarshalJSONFromPath(&arr, *arrayFile) + if err != nil { + return err + } + source = &arr + } else { + source = &data.LokiSource{Server: *lokiServer, Selector: *lokiSelector} + } + db := data.Database{Source: source, Path: *dbPath} + err := db.Load() + if err != nil { + return err } - db := setupDatabase(config, *dbPath) - state := State{DB: db, BuildTimestamp: ldBuildTimestamp} + state := State{DB: &db, BuildTimestamp: ldBuildTimestamp} if *dev { state.RefreshIntervalMs = 1000 } diff --git a/main.templ b/main.templ index 8116621..073a6c4 100644 --- a/main.templ +++ b/main.templ @@ -27,7 +27,7 @@ templ MainPage(st State, c mainConfig) { Log lines - { strconv.Itoa(len(st.DB.Logs())) } + { strconv.Itoa(st.DB.LogCount()) } diff --git a/main_templ.go b/main_templ.go index 539a442..1f52656 100644 --- a/main_templ.go +++ b/main_templ.go @@ -113,9 +113,9 @@ func MainPage(st State, c mainConfig) templ.Component { return templ_7745c5c3_Err } var templ_7745c5c3_Var9 string - templ_7745c5c3_Var9, templ_7745c5c3_Err = templ.JoinStringErrs(strconv.Itoa(len(st.DB.Logs()))) + templ_7745c5c3_Var9, templ_7745c5c3_Err = templ.JoinStringErrs(strconv.Itoa(st.DB.LogCount())) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `main.templ`, Line: 30, Col: 44} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `main.templ`, Line: 30, Col: 43} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var9)) if templ_7745c5c3_Err != nil { diff --git a/main_test.go b/main_test.go index 588c54a..882e525 100644 --- a/main_test.go +++ b/main_test.go @@ -4,8 +4,8 @@ * Copyright (c) 2024 Markus Stenberg * * Created: Thu May 16 07:24:25 2024 mstenber - * Last modified: Thu May 16 08:43:58 2024 mstenber - * Edit time: 25 min + * Last modified: Fri Jun 14 12:12:29 2024 mstenber + * Edit time: 32 min * */ @@ -17,7 +17,6 @@ import ( "fmt" "log" "net/http" - "os" "strconv" "testing" "time" @@ -62,19 +61,10 @@ func TestMain(t *testing.T) { ctx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) - f, err := os.CreateTemp("", "lixie-test-db-*.json") - if err != nil { - log.Fatal(err) - } - // TODO: Produce test data? - f.Close() - defer os.Remove(f.Name()) port := 18080 - // TODO: Produce some sort of Loki fake (or other way to - // ingest precanned input?) go func() { - err := run(ctx, []string{"lixie", "-port", strconv.Itoa(port), "-db", f.Name(), "-loki-server", "http://localhost:3100"}) + err := run(ctx, []string{"lixie", "-port", strconv.Itoa(port), "-db", "testdata/db.json", "-log-source-file", "testdata/logs.json"}) if err != nil { log.Panic(err) } @@ -83,7 +73,7 @@ func TestMain(t *testing.T) { ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second) t.Cleanup(cancel2) baseURL := fmt.Sprintf("http://localhost:%d", port) - err = waitForURL(ctx2, baseURL) + err := waitForURL(ctx2, baseURL) if err != nil { log.Panic(err) } diff --git a/testdata/db.json b/testdata/db.json new file mode 100644 index 0000000..f0bf7df --- /dev/null +++ b/testdata/db.json @@ -0,0 +1,25 @@ +{ + "LogRules": { + "Rules": [ + { + "ID": 4, + "Disabled": false, + "Ham": false, + "Matchers": [ + { + "Field": "message", + "Op": "=", + "Value": "Finished sysstat-collect.service - system activity accounting tool." + }, + { + "Field": "source", + "Op": "=", + "Value": "systemd" + } + ], + "Comment": "", + "Version": 0 + } + ] + } +} diff --git a/testdata/logs.json b/testdata/logs.json new file mode 100644 index 0000000..aea3dd6 --- /dev/null +++ b/testdata/logs.json @@ -0,0 +1,4 @@ +{ + "Data": [{}], + "Chunk": 1 +}