Skip to content

Commit

Permalink
Add canned log handling for unittesting main, some cleanup related to it
Browse files Browse the repository at this point in the history
  • Loading branch information
fingon committed Jun 14, 2024
1 parent 1761d8e commit d4f76ec
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 183 deletions.
158 changes: 44 additions & 114 deletions data/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,17 @@ package data

import (
"bytes"
"cmp"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"slices"
"strconv"
"sync"

"github.com/sourcegraph/conc/iter"
)

type DatabaseConfig struct {
LokiServer string
LokiSelector string
// LogSource is the provider a Database pulls log entries from (it is held
// in Database.Source and invoked whenever the database refreshes its logs).
type LogSource interface {
// Load returns a slice of logs, or an error if they could not be
// obtained. The database prepends the returned slice to the logs it
// already holds, so implementations presumably return only entries not
// handed out by earlier calls - TODO confirm against the implementations.
Load() ([]*Log, error)
}

type LogRules struct {
Expand All @@ -45,33 +38,35 @@ type LogRules struct {
}

type Database struct {
// Config is assumed to be immutable; rules and logs are not
// This mutex guards log rules and logs; configuration is assumed to be static
sync.Mutex

config DatabaseConfig
// Following are essentially configuration

LogRules *LogRules
// Where is this file saved
Path string `json:"-"`
Source LogSource `json:"-"`

LogRules LogRules
logs []*Log

// next id to be added state for rules
nextID int

// Where is this file saved
path string
}

// Sentinel errors exposed by this package; callers can compare returned
// errors against them.
var (
// ErrHashNotFound reports that the hash given by the caller was not found.
ErrHashNotFound = errors.New("specified hash not found")
// ErrRuleNotFound reports that the rule given by the caller was not found
// (e.g. Delete with an id that does not exist).
ErrRuleNotFound = errors.New("specified rule not found")
// ErrNoSource is returned when logs are requested from a Database whose
// Source is nil.
ErrNoSource = errors.New("source has not been specified")
)

func NewLogRules(rules []*LogRule, version int) *LogRules {
func NewLogRules(rules []*LogRule, version int) LogRules {
count := len(rules)
reversed := make([]*LogRule, count)
for k, v := range rules {
reversed[count-k-1] = v
}
return &LogRules{Rules: rules, Reversed: reversed, Version: version}
return LogRules{Rules: rules, Reversed: reversed, Version: version}
}

func (self *Database) add(r LogRule) error {
Expand Down Expand Up @@ -133,94 +128,36 @@ func (self *Database) nextLogRuleID() int {
return id
}

func (self *Database) retrieveLogs(start int64) ([]*Log, error) {
logs := []*Log{}

base := self.config.LokiServer + "/loki/api/v1/query_range"
v := url.Values{}
v.Set("query", self.config.LokiSelector)
// v.Set("direction", "backward")
v.Set("limit", "5000")
if start > 0 {
v.Set("start", strconv.FormatInt(start, 10))
func (self *Database) updateLogs() error {
if self.Source == nil {
return ErrNoSource
}

resp, err := http.Get(base + "?" + v.Encode())
logs, err := self.Source.Load()
if err != nil {
return nil, err
}

if resp.StatusCode > 299 {
return nil, fmt.Errorf("Invalid result from Loki - status code %d", resp.StatusCode)
return err
}
self.addLogsToCounts(logs)
self.logs = append(logs, self.logs...)
return nil
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
resp.Body.Close()
func (self *Database) Logs() ([]*Log, error) {
self.Lock()
defer self.Unlock()

var result LokiQueryResult
err = json.Unmarshal(body, &result)
err := self.updateLogs()
if err != nil {
return nil, err
}

status := result.Status
if status != "success" {
return nil, fmt.Errorf("invalid status from Loki:%s", status)
}

rtype := result.Data.ResultType
if rtype != "streams" {
return nil, fmt.Errorf("invalid result type from Loki:%s", rtype)
}
for _, result := range result.Data.Result {
for _, value := range result.Values {
timestamp, err := strconv.ParseInt(value[0], 10, 64)
if err != nil {
return nil, err
}
logs = append(logs, NewLog(timestamp, result.Stream, value[1]))
}
}

// Loki output is by metric/stream and then by time; we don't
// really care, sort by timestamp desc. This may need to be
// rethought when we fetch more than just the latest
slices.SortFunc(logs, func(a, b *Log) int {
return -cmp.Compare(a.Timestamp, b.Timestamp)
})

return logs, nil
return self.logs, nil
}

func (self *Database) updateLogs() {
start := int64(0)
if len(self.logs) > 0 {
// TODO would be better to get same timestamp + eliminate if it is same entry
start = self.logs[0].Timestamp + 1
}
logs, err := self.retrieveLogs(start)
func (self *Database) LogCount() int {
logs, err := self.Logs()
if err != nil {
fmt.Printf("Error retrieving logs from Loki: %s\n", err.Error())
}
for i, log := range logs {
if log.Timestamp <= start {
logs = logs[:i]
break
}
return -1
}
self.addLogsToCounts(logs)
self.logs = append(logs, self.logs...)
}

func (self *Database) Logs() []*Log {
self.Lock()
defer self.Unlock()

self.updateLogs()
return self.logs
return len(logs)
}

func (self *Database) getLogByHashUnlocked(hash uint64) *Log {
Expand Down Expand Up @@ -263,14 +200,13 @@ func (self *Database) ClassifyHash(hash uint64, ham bool) error {
}

func (self *Database) save(rules []*LogRule) error {
lrules := NewLogRules(rules, self.LogRules.Version+1)
self.LogRules = lrules
self.LogRules = NewLogRules(rules, self.LogRules.Version+1)
b, err := json.Marshal(self)
if err != nil {
return err
}

temp := self.path + ".tmp"
temp := self.Path + ".tmp"
f, err := os.OpenFile(temp, os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return err
Expand All @@ -285,7 +221,7 @@ func (self *Database) save(rules []*LogRule) error {
if err != nil {
return err
}
err = os.Rename(temp, self.path)
err = os.Rename(temp, self.Path)
if err != nil {
return err
}
Expand All @@ -294,7 +230,7 @@ func (self *Database) save(rules []*LogRule) error {
}

func (self *Database) addLogsToCounts(logs []*Log) {
lrules := self.LogRules
lrules := &self.LogRules
r2c := lrules.rid2Count
if r2c == nil {
return
Expand All @@ -314,10 +250,13 @@ func (self *Database) RuleCount(rid int) int {

// Trigger logs refresh only if we have nothing in cache
if self.logs == nil {
self.updateLogs()
err := self.updateLogs()
if err != nil {
return -1
}
}

lrules := self.LogRules
lrules := &self.LogRules

if lrules.rid2Count == nil {
r2c := make(map[int]int, len(lrules.Rules))
Expand All @@ -330,22 +269,13 @@ func (self *Database) RuleCount(rid int) int {
return lrules.rid2Count[rid]
}

func NewDatabaseFromFile(config DatabaseConfig, path string) (db *Database, err error) {
db = &Database{config: config, path: path, LogRules: &LogRules{}}
f, err := os.Open(path)
if err != nil {
return
}
data, err := io.ReadAll(f)
func (self *Database) Load() error {
err := UnmarshalJSONFromPath(self, self.Path)
if err != nil {
return
return err
}

err = json.Unmarshal(data, db)
if err != nil {
return
}
// Recreate to have also reverse slice
db.LogRules = NewLogRules(db.LogRules.Rules, db.LogRules.Version)
return
self.LogRules = NewLogRules(self.LogRules.Rules, self.LogRules.Version)
return nil
}
40 changes: 33 additions & 7 deletions data/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
* Copyright (c) 2024 Markus Stenberg
*
* Created: Mon Jun 3 07:40:40 2024 mstenber
* Last modified: Sun Jun 9 20:45:11 2024 mstenber
* Edit time: 9 min
* Last modified: Fri Jun 14 12:30:40 2024 mstenber
* Edit time: 18 min
*
*/

Expand All @@ -19,13 +19,41 @@ import (
)

func TestDatabase(t *testing.T) {
// Data source - static one, with exactly two log entries
log := Log{}
log2 := Log{}
arr := ArraySource{Data: []*Log{&log, &log2}, Chunk: 1}

path := "test_db.json"
_ = os.Remove(path)
db, err := NewDatabaseFromFile(DatabaseConfig{}, path)

db := Database{Path: path, Source: &arr}
err := db.Load()
assert.Assert(t, err != nil)

// Add rule
err = db.Add(LogRule{})
assert.Equal(t, err, nil)
assert.Equal(t, db.nextLogRuleID(), 2)

// Ensure the logs are available
logs, err := db.Logs()
assert.Equal(t, len(logs), 1)
assert.Equal(t, err, nil)

// Ensure they also match (only one fetched now)
assert.Equal(t, db.RuleCount(1), 1)

// Fetch more
logs, err = db.Logs()
assert.Equal(t, len(logs), 2)
assert.Equal(t, err, nil)
assert.Equal(t, db.RuleCount(1), 2)

// Source is empty; ensure we're still ok
logs, err = db.Logs()
assert.Equal(t, len(logs), 2)
assert.Equal(t, err, nil)

// Add another rule (using add-or-update API)
err = db.AddOrUpdate(LogRule{})
Expand All @@ -40,7 +68,8 @@ func TestDatabase(t *testing.T) {
assert.Equal(t, len(db.LogRules.Rules), 2)

// Ensure save + load gave us something similar
db2, err := NewDatabaseFromFile(DatabaseConfig{}, path)
db2 := Database{Path: path}
err = db2.Load()
assert.Equal(t, err, nil)
assert.Equal(t, db2.LogRules.Rules[0].Ham, true)
assert.Equal(t, len(db2.LogRules.Rules), 2)
Expand All @@ -52,7 +81,4 @@ func TestDatabase(t *testing.T) {
assert.Equal(t, db.LogRules.Rules[0].Ham, false)

assert.Equal(t, db.Delete(1), ErrRuleNotFound)

// bit worthless - should really test having proper logs within
assert.Equal(t, db.RuleCount(0), 0)
}
29 changes: 29 additions & 0 deletions data/log_rule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Author: Markus Stenberg <[email protected]>
*
* Copyright (c) 2024 Markus Stenberg
*
* Created: Sun Jun 9 20:38:22 2024 mstenber
* Last modified: Sun Jun 9 20:42:07 2024 mstenber
* Edit time: 3 min
*
*/

package data

import (
"testing"

"gotest.tools/v3/assert"
)

// TestLogRule constructs a LogRule holding two matchers and checks what
// MatchesFTS and SourceString report for it.
func TestLogRule(t *testing.T) {
	rule := LogRule{
		Matchers: []LogFieldMatcher{
			{"source", "=", "dummysrc", nil},
			{"message", "=", "dummymessage", nil},
		},
	}

	// Text taken from either matcher must be found by MatchesFTS.
	assert.Assert(t, rule.MatchesFTS("dummysrc"))
	assert.Assert(t, rule.MatchesFTS("dummymessage"))

	// Text that appears in neither matcher must not be found.
	notFound := !rule.MatchesFTS("dummynonexistent")
	assert.Assert(t, notFound)

	// SourceString is expected to render the first matcher as "=dummysrc".
	rendered := rule.SourceString()
	assert.Equal(t, rendered, "=dummysrc")
}
24 changes: 0 additions & 24 deletions data/loki.go

This file was deleted.

Loading

0 comments on commit d4f76ec

Please sign in to comment.