Skip to content

Commit

Permalink
Bugfix: The delay() plugin could lose events (#3973)
Browse files Browse the repository at this point in the history
If the query was terminated the delay() plugin could lose events. Also
the previous code could crash with "send on close channel"
  • Loading branch information
scudette authored Dec 17, 2024
1 parent 2aafa18 commit 5a09fe8
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 22 deletions.
2 changes: 2 additions & 0 deletions artifacts/testdata/server/testcases/delay.in.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Queries:
- SELECT * FROM delay(query={ SELECT "Hello" FROM scope() }, delay=1)
5 changes: 5 additions & 0 deletions artifacts/testdata/server/testcases/delay.out.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
SELECT * FROM delay(query={ SELECT "Hello" FROM scope() }, delay=1)[
{
"\"Hello\"": "Hello"
}
]
9 changes: 9 additions & 0 deletions services/repository/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ func (self *ArtifactRepositoryPlugin) Call(
precondition = scope.Bool(precondition_any)
}

// Allow the args to specify a ** kw style args.
kwargs_any, pres := args.Get("**")
if pres {
kwargs, ok := kwargs_any.(*ordereddict.Dict)
if ok {
args = kwargs
}
}

acl_manager, ok := artifacts.GetACLManager(scope)
if !ok {
acl_manager = acl_managers.NullACLManager{}
Expand Down
3 changes: 2 additions & 1 deletion vql/sigma/logsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (self *LogSourceProviderAssociative) Associative(scope vfilter.Scope, a vfi
return vfilter.Null{}, false
}

return res, true
return vfilter.FormatToString(scope, res), true
}

type LogSourcesFunction struct{}
Expand Down Expand Up @@ -119,6 +119,7 @@ func (self LogSourcesFunction) Info(scope vfilter.Scope, type_map *vfilter.TypeM
Name: "sigma_log_sources",
Doc: "Constructs a Log sources object to be used in sigma rules. Call with args being category/product/service and values being stored queries. You may use a * as a placeholder for any of these fields.",
FreeFormArgs: true,
Version: 2,
}
}

Expand Down
51 changes: 51 additions & 0 deletions vql/sigma/logsource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package sigma

import (
"context"
"log"
"os"

"github.com/Velocidex/ordereddict"
"www.velocidex.com/golang/velociraptor/json"
vql_subsystem "www.velocidex.com/golang/velociraptor/vql"
"www.velocidex.com/golang/velociraptor/vtesting/assert"
"www.velocidex.com/golang/vfilter"

// For items plugin
_ "www.velocidex.com/golang/velociraptor/vql/common"
_ "www.velocidex.com/golang/velociraptor/vql/golang"
)

func (self *SigmaTestSuite) TestLogSourceIterator() {
ctx := context.Background()

scope := vql_subsystem.MakeScope()
scope.SetLogger(log.New(os.Stdout, "", 0))
defer scope.Close()

queries := []string{
"LET X <= sigma_log_sources(`*/windows/application`={SELECT * FROM info()})",
`
SELECT * FROM foreach(
row={
SELECT _value FROM items(item=X)
}, query={
SELECT typeof(a=_value), _value FROM scope()
})
`,
}

results := ordereddict.NewDict()
for _, query := range queries {
rows := []vfilter.Row{}
vql, err := vfilter.Parse(query)
assert.NoError(self.T(), err)

for row := range vql.Eval(ctx, scope) {
rows = append(rows, row)
}
results.Set(query, rows)
}

json.Dump(results)
}
65 changes: 44 additions & 21 deletions vql/tools/delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ package tools

import (
"context"
"sync"
"time"

"github.com/Velocidex/ordereddict"
Expand All @@ -56,13 +57,17 @@ func (self DelayPlugin) Call(ctx context.Context,
scope vfilter.Scope,
args *ordereddict.Dict) <-chan vfilter.Row {
output_chan := make(chan vfilter.Row)
sub_ctx, cancel := context.WithCancel(ctx)

go func() {
defer close(output_chan)
defer vql_subsystem.RegisterMonitor("delay", args)()
defer cancel()

wg := &sync.WaitGroup{}
defer wg.Wait()

arg := &DelayPluginArgs{}
err := arg_parser.ExtractArgsWithContext(ctx, scope, args, arg)
err := arg_parser.ExtractArgsWithContext(sub_ctx, scope, args, arg)
if err != nil {
scope.Log("delay: %v", err)
return
Expand All @@ -77,26 +82,32 @@ func (self DelayPlugin) Call(ctx context.Context,
}

buffer := make(chan *container, arg.Size)
defer close(buffer)

// This routine pumps data from the buffer to the output_chan
wg.Add(1)
go func() {
defer wg.Done()
defer close(output_chan)

for {
select {
case <-ctx.Done():
case <-sub_ctx.Done():
return

case row_container, ok := <-buffer:
if !ok {
return
}

now := time.Now()
now := utils.GetTime().Now()

if row_container.due.After(now) {
// Wait until it is time.
utils.SleepWithCtx(ctx, row_container.due.Sub(now))
utils.SleepWithCtx(sub_ctx, row_container.due.Sub(now))
}

select {
case <-ctx.Done():
case <-sub_ctx.Done():
return

case output_chan <- row_container.row:
Expand All @@ -105,25 +116,37 @@ func (self DelayPlugin) Call(ctx context.Context,
}
}()

row_chan := arg.Query.Eval(ctx, scope)
for {
select {
case row, ok := <-row_chan:
if !ok {
return
}
// This routine pumps data from the input query to the buffer.
wg.Add(1)
go func() {
defer wg.Done()
defer close(buffer)

event := &container{
row: row,
due: time.Now().Add(time.Second * time.Duration(arg.DelaySec)),
}
delay := time.Second * time.Duration(arg.DelaySec)

row_chan := arg.Query.Eval(sub_ctx, scope)
for {
select {
case <-ctx.Done():
case <-sub_ctx.Done():
return
case buffer <- event:

case row, ok := <-row_chan:
if !ok {
return
}

event := &container{
row: row,
due: utils.GetTime().Now().Add(delay),
}
select {
case <-sub_ctx.Done():
return
case buffer <- event:
}
}
}
}
}()
}()

return output_chan
Expand Down

0 comments on commit 5a09fe8

Please sign in to comment.