Skip to content

Commit

Permalink
Allow flows to be deleted asyncronously
Browse files Browse the repository at this point in the history
This supports the mass-deletion patten with indexes rebuild
periodically but not very frequently.
  • Loading branch information
scudette committed Jan 20, 2025
1 parent e76910c commit d754d0b
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 36 deletions.
6 changes: 5 additions & 1 deletion artifacts/definitions/Server/Utils/DeleteFlow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ parameters:
- name: ReallyDoIt
description: If you really want to delete the collection, check this.
type: bool
- name: Sync
description: If specified we ensure delete happens immediately
type: bool

sources:
- query: |
SELECT Type, Data.VFSPath AS VFSPath, Error
FROM delete_flow(flow_id=FlowId, client_id=ClientId, really_do_it=ReallyDoIt)
FROM delete_flow(flow_id=FlowId,
client_id=ClientId, really_do_it=ReallyDoIt, sync=Sync)
1 change: 1 addition & 0 deletions gui/velociraptor/src/components/flows/flows-list.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export class DeleteFlowDialog extends React.PureComponent {
"Server.Utils.DeleteFlow",
{FlowId: flow_id,
ClientId: client_id,
Sync: "Y",
ReallyDoIt: "Y"}, ()=>{
this.props.onClose();
this.setState({loading: false});
Expand Down
21 changes: 18 additions & 3 deletions services/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ const (
// When the principal is set to this below we avoid audit logging
// the call.
NoAuditLogging = ""
DryRunOnly = false
)

var (
FlowNotFoundError = utils.Wrap(os.ErrNotExist, "Flow not found")
DryRunOnly = DeleteFlowOptions{
ReallyDoIt: false,
}
)

type DeleteFlowResponse struct {
Expand All @@ -93,6 +95,19 @@ type GetFlowOptions struct {
Downloads bool
}

type DeleteFlowOptions struct {
// If this is not set, we do a dry run to indicate which files
// will be deleted within the flow but do not actually delete the
// files.
ReallyDoIt bool

// If this is set the delete will be synchronous and index updated
// immediately. This is much slower but it is necessary when
// results need to be available immediately. When False, we delete
// asynchronously and update the index at a later time.
Sync bool
}

type CompilerOptions struct {
// Should names be obfuscated in the resulting VQL?
ObfuscateNames bool
Expand Down Expand Up @@ -136,7 +151,7 @@ type FlowStorer interface {
ctx context.Context,
config_obj *config_proto.Config,
client_id string, flow_id string, principal string,
really_do_it bool) ([]*DeleteFlowResponse, error)
options DeleteFlowOptions) ([]*DeleteFlowResponse, error)

LoadCollectionContext(
ctx context.Context,
Expand Down Expand Up @@ -246,5 +261,5 @@ type Launcher interface {
config_obj *config_proto.Config,
principal, artifact, client_id string,
start_time, end_time time.Time,
really_do_it bool) ([]*DeleteFlowResponse, error)
options DeleteFlowOptions) ([]*DeleteFlowResponse, error)
}
28 changes: 19 additions & 9 deletions services/launcher/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func (self *FlowStorageManager) DeleteFlow(
ctx context.Context,
config_obj *config_proto.Config,
client_id string, flow_id string, principal string,
really_do_it bool) ([]*services.DeleteFlowResponse, error) {
options services.DeleteFlowOptions) (
[]*services.DeleteFlowResponse, error) {

launcher, err := services.GetLauncher(config_obj)
if err != nil {
Expand All @@ -46,7 +47,7 @@ func (self *FlowStorageManager) DeleteFlow(
return nil, nil
}

if really_do_it && principal != "" {
if options.ReallyDoIt && principal != "" {
services.LogAudit(ctx,
config_obj, principal, "delete_flow",
ordereddict.NewDict().
Expand All @@ -60,7 +61,7 @@ func (self *FlowStorageManager) DeleteFlow(
upload_metadata_path := flow_path_manager.UploadMetadata()

r := &reporter{
really_do_it: really_do_it,
really_do_it: options.ReallyDoIt,
ctx: ctx,
config_obj: config_obj,
seen: make(map[string]bool),
Expand Down Expand Up @@ -151,8 +152,16 @@ func (self *FlowStorageManager) DeleteFlow(
r.emit_fs("NotebookItem", path)
return nil
})
if really_do_it {
err = self.removeFlowFromIndex(ctx, config_obj, client_id, flow_id)
if options.ReallyDoIt {
// User specified the flow must be removed immediately.
if options.Sync {
err = self.removeFlowFromIndex(ctx, config_obj, client_id, flow_id)
} else {
// Otherwise we just mark the index as pending a rebuild and move on.
self.mu.Lock()
self.pendingIndexes = append(self.pendingIndexes, client_id)
self.mu.Unlock()
}
}
r.pool.StopAndWait()

Expand Down Expand Up @@ -265,7 +274,8 @@ func (self *Launcher) DeleteEvents(
config_obj *config_proto.Config,
principal, artifact, client_id string,
start_time, end_time time.Time,
really_do_it bool) ([]*services.DeleteFlowResponse, error) {
options services.DeleteFlowOptions) (
[]*services.DeleteFlowResponse, error) {

path_manager, err := artifacts.NewArtifactPathManager(ctx,
config_obj, client_id, "", artifact)
Expand All @@ -284,7 +294,7 @@ func (self *Launcher) DeleteEvents(
f.StartTime.Before(end_time) {
var error_message string

if really_do_it {
if options.ReallyDoIt {
err := file_store_factory.Delete(f.Path)
if err != nil {
error_message = fmt.Sprintf(
Expand Down Expand Up @@ -320,7 +330,7 @@ func (self *Launcher) DeleteEvents(
f.StartTime.Before(end_time) {
var error_message string

if really_do_it {
if options.ReallyDoIt {
err := file_store_factory.Delete(f.Path)
if err != nil {
error_message = fmt.Sprintf(
Expand All @@ -347,7 +357,7 @@ func (self *Launcher) DeleteEvents(
}

// Log into the audit log
if really_do_it {
if options.ReallyDoIt {
return result, services.LogAudit(ctx, config_obj, principal, "DeleteEvents",
ordereddict.NewDict().
Set("artifact", artifact).
Expand Down
7 changes: 4 additions & 3 deletions services/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,8 @@ func NewLauncherService(
wg *sync.WaitGroup,
config_obj *config_proto.Config) (services.Launcher, error) {

return &Launcher{
Storage_: &FlowStorageManager{},
}, nil
res := &Launcher{
Storage_: NewFlowStorageManager(ctx, config_obj, wg),
}
return res, nil
}
109 changes: 109 additions & 0 deletions services/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (

"github.com/Velocidex/ordereddict"
"github.com/go-errors/errors"
"www.velocidex.com/golang/velociraptor/constants"
"www.velocidex.com/golang/velociraptor/file_store"
"www.velocidex.com/golang/velociraptor/result_sets"
"www.velocidex.com/golang/velociraptor/utils"
"www.velocidex.com/golang/velociraptor/vtesting/goldie"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -66,6 +70,13 @@ sources:
SELECT * FROM info()
`

func (self *LauncherTestSuite) SetupTest() {
self.ConfigObj = self.TestSuite.LoadConfig()
self.ConfigObj.Services.ServerArtifacts = true

self.TestSuite.SetupTest()
}

func (self *LauncherTestSuite) TestCompilingWithTools() {
// Our tool binary and its hash.
message := []byte("Hello world")
Expand Down Expand Up @@ -1318,6 +1329,104 @@ func getReqName(in *actions_proto.VQLCollectorArgs) string {
return ""
}

func (self *LauncherTestSuite) TestDelete() {
launcher, err := services.GetLauncher(self.ConfigObj)
assert.NoError(self.T(), err)

flow_id := "F.FlowId123"
user := "admin"

manager, _ := services.GetRepositoryManager(self.ConfigObj)
repository, _ := manager.GetGlobalRepository(self.ConfigObj)
acl_manager := acl_managers.NullACLManager{}

defer utils.SetFlowIdForTests(flow_id)()

// Schedule a job for the server runner.
flow_id, err = launcher.ScheduleArtifactCollection(
self.Ctx, self.ConfigObj, acl_manager,
repository, &flows_proto.ArtifactCollectorArgs{
Creator: user,
ClientId: "server",
Artifacts: []string{"Generic.Client.Info"},
}, utils.SyncCompleter)

assert.NoError(self.T(), err)

res, err := launcher.GetFlows(self.Ctx, self.ConfigObj, "server",
result_sets.ResultSetOptions{}, 0, 10)
assert.NoError(self.T(), err)
assert.Equal(self.T(), len(res.Items), 1)
assert.Equal(self.T(), res.Items[0].SessionId, flow_id)

// Now delete the flow asyncronously
_, err = launcher.Storage().DeleteFlow(
self.Ctx, self.ConfigObj, "server",
flow_id, constants.PinnedServerName,
services.DeleteFlowOptions{
ReallyDoIt: true,
Sync: false,
})
assert.NoError(self.T(), err)

// Index is not updated yet
idx := self.getIndex("server")
assert.Equal(self.T(), len(idx), 1)
idx_flow_id, _ := idx[0].GetString("FlowId")
assert.Equal(self.T(), flow_id, idx_flow_id)

// However GetFlows omits the deleted flow immediately because it
// can not find it (The actual flow object is removed but the
// index is out of step).
res, err = launcher.GetFlows(self.Ctx, self.ConfigObj, "server",
result_sets.ResultSetOptions{}, 0, 10)
assert.NoError(self.T(), err)
assert.Equal(self.T(), len(res.Items), 0)

// Create the flow again
new_flow_id, err := launcher.ScheduleArtifactCollection(
self.Ctx, self.ConfigObj, acl_manager,
repository, &flows_proto.ArtifactCollectorArgs{
Creator: user,
ClientId: "server",
Artifacts: []string{"Generic.Client.Info"},
}, utils.SyncCompleter)
assert.NoError(self.T(), err)
assert.Equal(self.T(), new_flow_id, flow_id)

// Now delete the flow syncronously
_, err = launcher.Storage().DeleteFlow(
self.Ctx, self.ConfigObj, "server",
flow_id, constants.PinnedServerName,
services.DeleteFlowOptions{
ReallyDoIt: true,
Sync: true,
})
assert.NoError(self.T(), err)

// This time the index is reset immediately.
idx = self.getIndex("server")
assert.Equal(self.T(), len(idx), 0)
}

func (self *LauncherTestSuite) getIndex(client_id string) (
res []*ordereddict.Dict) {

client_path_manager := paths.NewClientPathManager(client_id)
file_store_factory := file_store.GetFileStore(self.ConfigObj)
rs_reader, err := result_sets.NewResultSetReader(file_store_factory,
client_path_manager.FlowIndex())
if err != nil {
return nil
}
defer rs_reader.Close()

for r := range rs_reader.Rows(self.Ctx) {
res = append(res, r)
}
return res
}

func TestLauncher(t *testing.T) {
suite.Run(t, &LauncherTestSuite{})
}
Loading

0 comments on commit d754d0b

Please sign in to comment.