Skip to content

Commit

Permalink
Added paging and filtering to hunt's client table (#3263)
Browse files Browse the repository at this point in the history
This makes it easier to see the clients that participated in the hunt
without resorting to VQL notebooks.
  • Loading branch information
scudette authored Feb 1, 2024
1 parent d43adca commit 06eb7c4
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 73 deletions.
24 changes: 11 additions & 13 deletions api/hunts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
errors "github.com/go-errors/errors"

context "golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"www.velocidex.com/golang/velociraptor/acls"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
Expand Down Expand Up @@ -41,31 +39,31 @@ func (self *ApiServer) GetHuntFlows(
"User is not allowed to view hunt results.")
}

hunt_dispatcher, err := services.GetHuntDispatcher(org_config_obj)
options, err := tables.GetTableOptions(in)
if err != nil {
return nil, Status(self.verbose, err)
}

hunt, pres := hunt_dispatcher.GetHunt(ctx, in.HuntId)
if !pres {
return nil, status.Error(codes.InvalidArgument, "No hunt known")
hunt_dispatcher, err := services.GetHuntDispatcher(org_config_obj)
if err != nil {
return nil, Status(self.verbose, err)
}

total_scheduled := int64(-1)
if hunt.Stats != nil {
total_scheduled = int64(hunt.Stats.TotalClientsScheduled)
scope := vql_subsystem.MakeScope()
flow_chan, total_rows, err := hunt_dispatcher.GetFlows(
ctx, org_config_obj, options, scope, in.HuntId, int(in.StartRow))
if err != nil {
return nil, Status(self.verbose, err)
}

result := &api_proto.GetTableResponse{
TotalRows: total_scheduled,
TotalRows: total_rows,
Columns: []string{
"ClientId", "Hostname", "FlowId", "StartedTime", "State", "Duration",
"TotalBytes", "TotalRows",
}}

scope := vql_subsystem.MakeScope()
for flow := range hunt_dispatcher.GetFlows(ctx, org_config_obj, scope,
in.HuntId, int(in.StartRow)) {
for flow := range flow_chan {
if flow.Context == nil {
continue
}
Expand Down
11 changes: 2 additions & 9 deletions gui/velociraptor/src/components/core/paged-table.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -450,20 +450,13 @@ class VeloPagedTable extends Component {
this.source.cancel();
this.source = CancelToken.source();

this.setState({loading: true});

api.get(url, params, this.source.token).then((response) => {
if (response.cancel) {
return;
}

/*
// Ignore updates that return the same data - this
// prevents the table from redrawing when no data has
// changed.
if (_.isEqual(this.state.last_data, response.data)) {
return;
}
this.setState({last_data: response.data});
*/
let pageData = PrepareData(response.data);
let toggles = Object.assign({}, this.state.toggles);
let columns = pageData.columns;
Expand Down
1 change: 0 additions & 1 deletion gui/velociraptor/src/components/hunts/hunt-clients.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ export default class HuntClients extends React.Component {
renderers={renderers}
params={params}
translate_column_headers={true}
no_transformations={true}
no_toolbar={true}
/>
);
Expand Down
6 changes: 6 additions & 0 deletions paths/hunt_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func (self HuntPathManager) Clients() api.FSPathSpec {
return HUNTS_ROOT.AddChild(self.hunt_id).AsFilestorePath()
}

// EnrichedClients returns the path to a frequently refreshed table
// that mirrors the Clients() table above but includes
// filterable/searchable fields. It is stored as a sibling of the
// plain clients table, under an "enriched" child component.
func (self HuntPathManager) EnrichedClients() api.FSPathSpec {
	return HUNTS_ROOT.AddChild(self.hunt_id, "enriched").AsFilestorePath()
}

// Where to store client errors.
func (self HuntPathManager) ClientErrors() api.FSPathSpec {
return HUNTS_ROOT.AddChild(self.hunt_id + "_errors").
Expand Down
2 changes: 2 additions & 0 deletions result_sets/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ type ResultSetReader interface {
// to parse the data from storage.
JSON(ctx context.Context) (<-chan []byte, error)
Close()

TotalRows() int64
MTime() time.Time
}

type TimedResultSetReader interface {
Expand Down
18 changes: 14 additions & 4 deletions result_sets/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/Velocidex/json"
"github.com/Velocidex/ordereddict"
Expand Down Expand Up @@ -237,16 +238,22 @@ func (self ResultSetFactory) NewResultSetWriter(
// A ResultSetReader can produce rows from a result set.
type ResultSetReaderImpl struct {
total_rows int64
fd api.FileReader
idx_fd api.FileReader
log_path api.FSPathSpec
idx int64
mtime time.Time

fd api.FileReader
idx_fd api.FileReader
log_path api.FSPathSpec
idx int64
}

// TotalRows returns the number of rows in the result set, as captured
// when the reader was constructed, or -1 when the total is unknown
// (e.g. the index file could not be read).
func (self *ResultSetReaderImpl) TotalRows() int64 {
	return self.total_rows
}

// MTime returns the modification time of the result set's index file,
// captured when the reader was constructed. Returns the zero
// time.Time if the index file could not be stat'ed.
func (self *ResultSetReaderImpl) MTime() time.Time {
	return self.mtime
}

// Seeks the fd to the starting location. If successful then fd is
// ready to be read from row at a time.
func (self *ResultSetReaderImpl) SeekToRow(start int64) error {
Expand Down Expand Up @@ -477,12 +484,14 @@ func (self ResultSetFactory) NewResultSetReader(

// -1 indicates we dont know how many rows there are
total_rows := int64(-1)
var mtime time.Time
idx_fd, err := file_store_factory.ReadFile(log_path.
SetType(api.PATH_TYPE_FILESTORE_JSON_INDEX))
if err == nil {
stat, err := idx_fd.Stat()
if err == nil {
total_rows = stat.Size() / 8
mtime = stat.ModTime()
}
}

Expand All @@ -495,6 +504,7 @@ func (self ResultSetFactory) NewResultSetReader(

return &ResultSetReaderImpl{
total_rows: total_rows,
mtime: mtime,
fd: fd,
idx_fd: idx_fd,
log_path: log_path,
Expand Down
5 changes: 3 additions & 2 deletions services/hunt_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ type IHuntDispatcher interface {

// Paged view into the flows in the hunt
GetFlows(ctx context.Context, config_obj *config_proto.Config,
scope vfilter.Scope,
hunt_id string, start int) chan *api_proto.FlowDetails
options result_sets.ResultSetOptions, scope vfilter.Scope,
hunt_id string, start int) (
output chan *api_proto.FlowDetails, total_rows int64, err error)

CreateHunt(ctx context.Context,
config_obj *config_proto.Config,
Expand Down
171 changes: 141 additions & 30 deletions services/hunt_dispatcher/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,172 @@ package hunt_dispatcher

import (
"context"
"time"

api_proto "www.velocidex.com/golang/velociraptor/api/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/file_store"
"www.velocidex.com/golang/velociraptor/json"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/result_sets"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/services/hunt_manager"
"www.velocidex.com/golang/velociraptor/utils"
vql_subsystem "www.velocidex.com/golang/velociraptor/vql"
"www.velocidex.com/golang/vfilter"
"www.velocidex.com/golang/vfilter/arg_parser"
)

func (self *HuntDispatcher) GetFlows(
ctx context.Context,
config_obj *config_proto.Config,
scope vfilter.Scope,
hunt_id string, start int) chan *api_proto.FlowDetails {
output_chan := make(chan *api_proto.FlowDetails)
func (self *HuntDispatcher) syncFlowTables(
ctx context.Context, config_obj *config_proto.Config,
hunt_id string) error {

go func() {
defer close(output_chan)
count := 0

hunt_path_manager := paths.NewHuntPathManager(hunt_id).Clients()
file_store_factory := file_store.GetFileStore(config_obj)
rs_reader, err := result_sets.NewResultSetReader(
file_store_factory, hunt_path_manager)
if err != nil {
scope.Log("hunt_flows: %v\n", err)
return
now := utils.GetTime().Now()

options := result_sets.ResultSetOptions{}
scope := vql_subsystem.MakeScope()
hunt_path_manager := paths.NewHuntPathManager(hunt_id)
file_store_factory := file_store.GetFileStore(config_obj)
rs_reader, err := result_sets.NewResultSetReaderWithOptions(
ctx, self.config_obj, file_store_factory,
hunt_path_manager.Clients(), options)
if err != nil {
return err
}
defer rs_reader.Close()

enriched_reader, err := result_sets.NewResultSetReaderWithOptions(
ctx, self.config_obj, file_store_factory,
hunt_path_manager.EnrichedClients(), options)
if err == nil {
enriched_reader.Close()

// Skip refreshing the enriched table if it is newer than 5 min
// old - this helps to reduce unnecessary updates.
if now.Sub(enriched_reader.MTime()) < 5*time.Minute {
return nil
}
defer rs_reader.Close()
}

// Report how long it took to refresh the table
defer func() {
logger := logging.GetLogger(config_obj, &logging.FrontendComponent)
logger.Info("<green>HuntDispatcher:</> Mirrored client table in %v (%v records)",
utils.GetTime().Now().Sub(now), count)
}()

rs_writer, err := result_sets.NewResultSetWriter(file_store_factory,
hunt_path_manager.EnrichedClients(), json.DefaultEncOpts(),
utils.SyncCompleter, result_sets.TruncateMode)
if err != nil {
return err
}
defer rs_writer.Close()

// Seek to the row we need.
err = rs_reader.SeekToRow(int64(start))
launcher, err := services.GetLauncher(config_obj)
if err != nil {
return err
}

for row := range rs_reader.Rows(ctx) {
participation_row := &hunt_manager.ParticipationRecord{}
err := arg_parser.ExtractArgsWithContext(ctx, scope, row, participation_row)
if err != nil {
scope.Log("hunt_flows: %v\n", err)
return
return err
}

launcher, err := services.GetLauncher(config_obj)
flow, err := launcher.GetFlowDetails(
ctx, config_obj, participation_row.ClientId,
participation_row.FlowId)
if err != nil {
scope.Log("hunt_flows: %v\n", err)
return
continue
}

count++
rs_writer.WriteJSONL([]byte(
json.Format(`{"ClientId": %q, "Hostname": %q, "FlowId": %q, "StartedTime": %q, "State": %q, "Duration": %q, "TotalBytes": %q, "TotalRows": %q}
`,
participation_row.ClientId,
services.GetHostname(ctx, config_obj, participation_row.ClientId),
participation_row.FlowId,
flow.Context.StartTime/1000,
flow.Context.State.String(),
flow.Context.ExecutionDuration/1000000000,
flow.Context.TotalUploadedBytes,
flow.Context.TotalCollectedRows)), 1)
}
return nil
}

func (self *HuntDispatcher) GetFlows(
ctx context.Context,
config_obj *config_proto.Config,
options result_sets.ResultSetOptions, scope vfilter.Scope,
hunt_id string, start int) (chan *api_proto.FlowDetails, int64, error) {

output_chan := make(chan *api_proto.FlowDetails)

hunt_path_manager := paths.NewHuntPathManager(hunt_id)
table_to_query := hunt_path_manager.Clients()

// We only need to sync the tables if the options need to use
// anything other than the default table, otherwise we just query
// the original table.
if options.SortColumn != "" || options.FilterColumn != "" {
self.syncFlowTables(ctx, config_obj, hunt_id)
table_to_query = hunt_path_manager.EnrichedClients()
}

file_store_factory := file_store.GetFileStore(config_obj)
rs_reader, err := result_sets.NewResultSetReaderWithOptions(
ctx, self.config_obj, file_store_factory,
table_to_query, options)
if err != nil {
close(output_chan)
return output_chan, 0, err
}

// Seek to the row we need.
err = rs_reader.SeekToRow(int64(start))
if err != nil {
close(output_chan)
rs_reader.Close()
return output_chan, 0, err
}

launcher, err := services.GetLauncher(config_obj)
if err != nil {
close(output_chan)
rs_reader.Close()
return output_chan, 0, err
}

go func() {
defer close(output_chan)
defer rs_reader.Close()

for row := range rs_reader.Rows(ctx) {
participation_row := &hunt_manager.ParticipationRecord{}
err := arg_parser.ExtractArgsWithContext(ctx, scope, row, participation_row)
if err != nil {
return
client_id, pres := row.GetString("ClientId")
if !pres {
client_id, pres = row.GetString("client_id")
if !pres {
continue
}
}

flow_id, pres := row.GetString("FlowId")
if !pres {
flow_id, pres = row.GetString("flow_id")
if !pres {
continue
}
}

collection_context, err := launcher.GetFlowDetails(
ctx, config_obj, participation_row.ClientId,
participation_row.FlowId)
ctx, config_obj, client_id, flow_id)
if err != nil {
continue
}
Expand All @@ -69,5 +180,5 @@ func (self *HuntDispatcher) GetFlows(
}
}()

return output_chan
return output_chan, rs_reader.TotalRows(), nil
}
Loading

0 comments on commit 06eb7c4

Please sign in to comment.