Skip to content

Commit

Permalink
Update flow index when importing a flow (#3240)
Browse files Browse the repository at this point in the history
Fixes: #3192
  • Loading branch information
scudette authored Jan 24, 2024
1 parent 5db9bc4 commit 19a0638
Show file tree
Hide file tree
Showing 15 changed files with 139 additions and 57 deletions.
4 changes: 2 additions & 2 deletions artifacts/definitions/Linux/Sys/LastUserLogin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ parameters:
- name: recent_x_days
default: 14
default: 100000
type: int
description: |
show all logs within the last X days (default 14 days)
Expand All @@ -46,7 +46,7 @@ export: |
["records", 0, "Array", {
"type": "utmp",
"count": "x=>MaxCount",
"max_count": "x=>MaxCount",
"max_count": 100000,
}],
]],
["utmp", 384, [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Queries:
# Client IDs are random so we sanitize them in the results below.
- LET Sanitize(X) = regex_replace(re="[CF]\\.[0-9a-z]+", replace="C.ID", source=X)
- LET Sanitize(X) = regex_replace(re="[C]\\.[0-9a-z]+", replace="C.ID", source=X)

# Import a collection once
- LET X <= SELECT * FROM Artifact.Server.Utils.ImportCollection(
Expand Down
50 changes: 40 additions & 10 deletions artifacts/testdata/server/testcases/import_collection.out.yaml
Original file line number Diff line number Diff line change
@@ -1,33 +1,63 @@
LET Sanitize(X) = regex_replace(re="[CF]\\.[0-9a-z]+", replace="C.ID", source=X)[]LET X <= SELECT * FROM Artifact.Server.Utils.ImportCollection( Hostname="FooBarHost", Path=srcDir+"/vql/tools/collector/fixtures/import.zip")[]LET Y <= SELECT * FROM Artifact.Server.Utils.ImportCollection( Hostname="FooBarHost", Path=srcDir+"/vql/tools/collector/fixtures/import.zip")[]SELECT X[0].ClientId = Y[0].ClientId AS ClientIdEqual FROM scope()[
LET Sanitize(X) = regex_replace(re="[C]\\.[0-9a-z]+", replace="C.ID", source=X)[]LET X <= SELECT * FROM Artifact.Server.Utils.ImportCollection( Hostname="FooBarHost", Path=srcDir+"/vql/tools/collector/fixtures/import.zip")[]LET Y <= SELECT * FROM Artifact.Server.Utils.ImportCollection( Hostname="FooBarHost", Path=srcDir+"/vql/tools/collector/fixtures/import.zip")[]SELECT X[0].ClientId = Y[0].ClientId AS ClientIdEqual FROM scope()[
{
"ClientIdEqual": true
}
]SELECT Sanitize(X=vfs_path) AS vfs_path FROM Artifact.Server.Utils.DeleteClient( ClientIdList=[X[0].ClientId, Y[0].ClientId], ReallyDoIt=TRUE) WHERE NOT vfs_path =~ "monitoring" Order by vfs_path[
{
"vfs_path": "/clients/C.ID/artifacts/Linux.Search.FileFinder/C.ID.json"
"vfs_path": "/clients/C.ID/artifacts/Linux.Search.FileFinder/F.1.json"
},
{
"vfs_path": "/clients/C.ID/artifacts/Linux.Search.FileFinder/C.ID.json.index"
"vfs_path": "/clients/C.ID/artifacts/Linux.Search.FileFinder/F.1.json.index"
},
{
"vfs_path": "/clients/C.ID/collections/C.ID.json.db"
"vfs_path": "/clients/C.ID/artifacts/Linux.Search.FileFinder/F.CD62UJJO259A4.json"
},
{
"vfs_path": "/clients/C.ID/collections/C.ID/logs.json"
"vfs_path": "/clients/C.ID/artifacts/Linux.Search.FileFinder/F.CD62UJJO259A4.json.index"
},
{
"vfs_path": "/clients/C.ID/collections/C.ID/logs.json.index"
"vfs_path": "/clients/C.ID/collections/F.1.json.db"
},
{
"vfs_path": "/clients/C.ID/collections/C.ID/task.db"
"vfs_path": "/clients/C.ID/collections/F.1/logs.json"
},
{
"vfs_path": "/clients/C.ID/collections/C.ID/uploads.json"
"vfs_path": "/clients/C.ID/collections/F.1/logs.json.index"
},
{
"vfs_path": "/clients/C.ID/collections/C.ID/uploads.json.index"
"vfs_path": "/clients/C.ID/collections/F.1/uploads.json"
},
{
"vfs_path": "/clients/C.ID/collections/C.ID/uploads/file/tmp/\"ls\\with\\back:slash\""
"vfs_path": "/clients/C.ID/collections/F.1/uploads.json.index"
},
{
"vfs_path": "/clients/C.ID/collections/F.1/uploads/file/tmp/\"ls\\with\\back:slash\""
},
{
"vfs_path": "/clients/C.ID/collections/F.CD62UJJO259A4.json.db"
},
{
"vfs_path": "/clients/C.ID/collections/F.CD62UJJO259A4/logs.json"
},
{
"vfs_path": "/clients/C.ID/collections/F.CD62UJJO259A4/logs.json.index"
},
{
"vfs_path": "/clients/C.ID/collections/F.CD62UJJO259A4/task.db"
},
{
"vfs_path": "/clients/C.ID/collections/F.CD62UJJO259A4/uploads.json"
},
{
"vfs_path": "/clients/C.ID/collections/F.CD62UJJO259A4/uploads.json.index"
},
{
"vfs_path": "/clients/C.ID/collections/F.CD62UJJO259A4/uploads/file/tmp/\"ls\\with\\back:slash\""
},
{
"vfs_path": "/clients/C.ID/flow_index.json"
},
{
"vfs_path": "/clients/C.ID/flow_index.json.index"
}
]
3 changes: 3 additions & 0 deletions bin/golden.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ func makeCtxWithTimeout(
func runTest(fixture *testFixture, sm *services.Service,
config_obj *config_proto.Config) (string, error) {

gen := utils.IncrementalFlowIdGenerator(0)
utils.SetFlowIdGenerator(&gen)

// Freeze the time for consistent golden tests Monday, May 31, 2020 3:28:05 PM
closer := utils.MockTime(utils.NewMockClock(time.Unix(1590938885, 10)))
defer closer()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ require (
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0
gopkg.in/yaml.v3 v3.0.1
software.sslmate.com/src/go-pkcs12 v0.2.0
www.velocidex.com/golang/vtypes v0.0.0-20231115033415-7856327477d2
www.velocidex.com/golang/vtypes v0.0.0-20240123105603-069d4a7f435c
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1299,5 +1299,5 @@ www.velocidex.com/golang/regparser v0.0.0-20221020153526-bbc758cbd18b h1:NrnjFXw
www.velocidex.com/golang/regparser v0.0.0-20221020153526-bbc758cbd18b/go.mod h1:pxSECT5mWM3goJ4sxB4HCJNKnKqiAlpyT8XnvBwkLGU=
www.velocidex.com/golang/vfilter v0.0.0-20231014062339-d62b5a5877d2 h1:lYCetJ0TDKrYSqDhLTfZN3LyBRupx0B1XVcVy3rIw5E=
www.velocidex.com/golang/vfilter v0.0.0-20231014062339-d62b5a5877d2/go.mod h1:4mDQuvnVu6oPvDu/rZm8eYXh0h8mM7j9CJpj1nRfu8g=
www.velocidex.com/golang/vtypes v0.0.0-20231115033415-7856327477d2 h1:1hP5bpeUxXW2J2+Xlipqyb653xaal7dlOXxFU4+mnNs=
www.velocidex.com/golang/vtypes v0.0.0-20231115033415-7856327477d2/go.mod h1:tjaJNlBWbvH4cEMrEu678CFR2hrtcdyPINIpRxrOh4U=
www.velocidex.com/golang/vtypes v0.0.0-20240123105603-069d4a7f435c h1:rL/It+Ig+mvIhmy9vl5gg5b6CX2J12x0v2SXIT2RoWE=
www.velocidex.com/golang/vtypes v0.0.0-20240123105603-069d4a7f435c/go.mod h1:tjaJNlBWbvH4cEMrEu678CFR2hrtcdyPINIpRxrOh4U=
28 changes: 2 additions & 26 deletions services/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,8 @@ package launcher

import (
"context"
"crypto/rand"
"encoding/base32"
"encoding/binary"
"fmt"
"sync"
"time"

"github.com/go-errors/errors"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -610,7 +606,7 @@ func (self *Launcher) WriteArtifactCollectionRecord(

session_id := collector_request.FlowId
if session_id == "" {
session_id = NewFlowId(client_id)
session_id = utils.NewFlowId(client_id)
}

// How long to batch log messages for on the client.
Expand Down Expand Up @@ -759,27 +755,7 @@ func addOrReplaceParameter(
}

func (self *Launcher) SetFlowIdForTests(id string) {
NextFlowIdForTests = id
}

var (
NextFlowIdForTests string
)

func NewFlowId(client_id string) string {
if NextFlowIdForTests != "" {
result := NextFlowIdForTests
NextFlowIdForTests = ""
return result
}

buf := make([]byte, 8)
_, _ = rand.Read(buf)

binary.BigEndian.PutUint32(buf, uint32(time.Now().Unix()))
result := base32.HexEncoding.EncodeToString(buf)[:13]

return constants.FLOW_PREFIX + result
utils.SetFlowIdGenerator(utils.ConstantFlowIdGenerator(id))
}

func NewLauncherService(
Expand Down
53 changes: 53 additions & 0 deletions utils/flows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package utils

import (
"crypto/rand"
"encoding/base32"
"encoding/binary"
"fmt"
"time"

"www.velocidex.com/golang/velociraptor/constants"
)

var (
generator FlowIdGenerator = RandomFlowIdGenerator{}
)

type FlowIdGenerator interface {
Next(client_id string) string
}

type RandomFlowIdGenerator struct{}

func (self RandomFlowIdGenerator) Next(client_id string) string {
buf := make([]byte, 8)
_, _ = rand.Read(buf)

binary.BigEndian.PutUint32(buf, uint32(time.Now().Unix()))
result := base32.HexEncoding.EncodeToString(buf)[:13]

return constants.FLOW_PREFIX + result

}

type ConstantFlowIdGenerator string

func (self ConstantFlowIdGenerator) Next(client_id string) string {
return string(self)
}

type IncrementalFlowIdGenerator int

func (self *IncrementalFlowIdGenerator) Next(client_id string) string {
*self = *self + 1
return fmt.Sprintf("F.%d", *self)
}

func NewFlowId(client_id string) string {
return generator.Next(client_id)
}

func SetFlowIdGenerator(gen FlowIdGenerator) {
generator = gen
}
2 changes: 1 addition & 1 deletion vql/tools/collector/collector_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (self *collectionManager) MakeContainer(filename, password string, level in

self.scope.Log("Will create container at %s", filename)

self.collection_context.SessionId = launcher.NewFlowId("")
self.collection_context.SessionId = utils.NewFlowId("")
self.log_file, err = reporting.NewResultSetWriter(
self.container, "log.json")
return err
Expand Down
34 changes: 27 additions & 7 deletions vql/tools/collector/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"www.velocidex.com/golang/velociraptor/acls"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/datastore"
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/file_store"
"www.velocidex.com/golang/velociraptor/file_store/api"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
Expand Down Expand Up @@ -251,15 +251,28 @@ func (self ImportCollectionFunction) importFlow(

collection_context.ClientId = client_id

flow_path_manager := paths.NewFlowPathManager(
client_id, collection_context.SessionId)
launcher, err := services.GetLauncher(config_obj)
if err != nil {
return nil, err
}

// Check if this flow is already in this client. If it is then we
// make a new flow id so the new import is kept separated.
_, err = launcher.GetFlowDetails(ctx, config_obj, client_id,
collection_context.SessionId)
if err == nil {
collection_context.SessionId = utils.NewFlowId(client_id)
}

db, err := datastore.GetDB(config_obj)
// Write the flow and update indexes
err = launcher.Storage().WriteFlow(ctx, config_obj,
collection_context, utils.BackgroundWriter)
if err != nil {
return nil, err
}

err = db.SetSubject(config_obj, flow_path_manager.Path(), collection_context)
err = launcher.Storage().WriteFlowIndex(ctx, config_obj,
collection_context)
if err != nil {
return nil, err
}
Expand All @@ -268,13 +281,20 @@ func (self ImportCollectionFunction) importFlow(
tasks := &api_proto.ApiFlowRequestDetails{}

// If there is no requests.json, just write an empty one
_ = self.getFile(accessor, root.Append("requests.json"), tasks)
err = db.SetSubject(config_obj, flow_path_manager.Task(), tasks)
err = self.getFile(accessor, root.Append("requests.json"), tasks)
if err != nil || len(tasks.Items) == 0 {
tasks.Items = append(tasks.Items, &crypto_proto.VeloMessage{})
}

err = launcher.Storage().WriteTask(ctx, config_obj, client_id,
tasks.Items[0])
if err != nil {
return nil, err
}

// Copy the logs results set over.
flow_path_manager := paths.NewFlowPathManager(client_id,
collection_context.SessionId)
err = self.copyResultSet(ctx, config_obj, scope,
accessor, root.Append("log.json"), flow_path_manager.Log())
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions vql/tools/collector/import_hunt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ sources:
}
)

func (self *TestSuite) TestImportDynamicHunt() {
func (self *TestSuite) TestCreateAndImportHunt() {
closer := utils.MockTime(utils.NewMockClock(time.Unix(10, 10)))
defer closer()

Expand Down Expand Up @@ -176,7 +176,7 @@ func (self *TestSuite) TestImportDynamicHunt() {
// test_utils.GetMemoryFileStore(self.T(), self.ConfigObj).Debug()
golden.Set("Imported Flow", self.snapshotHuntFlow())

goldie.Assert(self.T(), "TestImportDynamicHunt", json.MustMarshalIndent(golden))
goldie.Assert(self.T(), "TestCreateAndImportHunt", json.MustMarshalIndent(golden))
}

func (self *TestSuite) snapshotHuntFlow() *ordereddict.Dict {
Expand All @@ -194,7 +194,7 @@ func (self *TestSuite) snapshotHuntFlow() *ordereddict.Dict {
})
}

func (self *TestSuite) TestImportStaticHunt() {
func (self *TestSuite) TestImportHuntFromFixture() {
launcher, err := services.GetLauncher(self.ConfigObj)
assert.NoError(self.T(), err)
launcher.SetFlowIdForTests("F.1234XX")
Expand Down Expand Up @@ -257,7 +257,7 @@ func (self *TestSuite) TestImportStaticHunt() {
return len(value) > 0
})

goldie.Assert(self.T(), "TestImportStaticHunt",
goldie.Assert(self.T(), "TestImportHuntFromFixture",
json.MustMarshalIndent(self.snapshotStaticHuntFlow()))
}

Expand Down
6 changes: 3 additions & 3 deletions vql/tools/collector/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
_ "www.velocidex.com/golang/velociraptor/vql/protocols"
)

func (self *TestSuite) TestImportDynamicCollection() {
func (self *TestSuite) TestCreateAndImportCollection() {
closer := utils.MockTime(utils.NewMockClock(time.Unix(10, 10)))
defer closer()

Expand Down Expand Up @@ -112,10 +112,10 @@ func (self *TestSuite) TestImportDynamicCollection() {
// test_utils.GetMemoryFileStore(self.T(), self.ConfigObj).Debug()
golden.Set("Imported Flow", self.snapshotHuntFlow())

goldie.Assert(self.T(), "TestImportDynamicCollection", json.MustMarshalIndent(golden))
goldie.Assert(self.T(), "TestCreateAndImportCollection", json.MustMarshalIndent(golden))
}

func (self *TestSuite) TestImportStaticCollection() {
func (self *TestSuite) TestImportCollectionFromFixture() {
manager, _ := services.GetRepositoryManager(self.ConfigObj)
repository, _ := manager.GetGlobalRepository(self.ConfigObj)
_, err := repository.LoadYaml(CustomTestArtifactDependent,
Expand Down

0 comments on commit 19a0638

Please sign in to comment.