Skip to content

Commit

Permalink
Bugfix: Disable support for FlowStatsRequest in pool client (#3840)
Browse files Browse the repository at this point in the history
The responses were not forwarded causing the server to constantly
query about the state of the flows.

Also added debug view into directly connected clients. Added tags to
timeouts so we can identify the cause of timeout cancellations better.
  • Loading branch information
scudette committed Oct 21, 2024
1 parent 0d75946 commit 2d1e270
Show file tree
Hide file tree
Showing 24 changed files with 238 additions and 149 deletions.
11 changes: 8 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
package api

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
Expand All @@ -32,7 +33,6 @@ import (
"github.com/Velocidex/ordereddict"
errors "github.com/go-errors/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
context "golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -713,6 +713,10 @@ func (self *ApiServer) GetArtifacts(

for _, name := range in.Names {
artifact, pres := repository.Get(ctx, org_config_obj, name)
if !pres {
continue
}

artifact_clone := proto.Clone(artifact).(*artifacts_proto.Artifact)
for _, s := range artifact_clone.Sources {
s.Queries = nil
Expand Down Expand Up @@ -1238,8 +1242,9 @@ func StartMonitoringService(
<-ctx.Done()

logger.Info("<red>Shutting down</> Prometheus monitoring service")
timeout_ctx, cancel := context.WithTimeout(
context.Background(), 10*time.Second)
timeout_ctx, cancel := context.WithTimeoutCause(
context.Background(), 10*time.Second,
errors.New("Monitoring Service deadline reached"))
defer cancel()

err := server.Shutdown(timeout_ctx)
Expand Down
20 changes: 12 additions & 8 deletions api/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,9 @@ func StartFrontendHttps(
server_obj.Info("<red>Shutting down</> frontend")
atomic.StoreInt32(&server_obj.Healthy, 0)

time_ctx, cancel := context.WithTimeout(
context.Background(), 10*time.Second)
time_ctx, cancel := context.WithTimeoutCause(
context.Background(), 10*time.Second,
errors.New("Deadline exceeded shuttin down frontend"))
defer cancel()

server.SetKeepAlivesEnabled(false)
Expand Down Expand Up @@ -580,8 +581,9 @@ func StartFrontendWithAutocert(
server_obj.Info("<red>Stopping Frontend Server")
atomic.StoreInt32(&server_obj.Healthy, 0)

timeout_ctx, cancel := context.WithTimeout(
context.Background(), 10*time.Second)
timeout_ctx, cancel := context.WithTimeoutCause(
context.Background(), 10*time.Second,
errors.New("Deadline exceeded shuttin down frontend"))
defer cancel()

server.SetKeepAlivesEnabled(false)
Expand Down Expand Up @@ -641,8 +643,9 @@ func StartHTTPGUI(
<-ctx.Done()

logger.Info("<red>Stopping GUI Server")
timeout_ctx, cancel := context.WithTimeout(
context.Background(), 10*time.Second)
timeout_ctx, cancel := context.WithTimeoutCause(
context.Background(), 10*time.Second,
errors.New("Deadline exceeded shuttin down GUI"))
defer cancel()

server.SetKeepAlivesEnabled(false)
Expand Down Expand Up @@ -720,8 +723,9 @@ func StartSelfSignedGUI(
<-ctx.Done()

logger.Info("<red>Stopping GUI Server")
timeout_ctx, cancel := context.WithTimeout(
context.Background(), 10*time.Second)
timeout_ctx, cancel := context.WithTimeoutCause(
context.Background(), 10*time.Second,
errors.New("Deadline exceeded shuttin down GUI"))
defer cancel()

server.SetKeepAlivesEnabled(false)
Expand Down
31 changes: 16 additions & 15 deletions api/query.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
/*
Velociraptor - Dig Deeper
Copyright (C) 2019-2024 Rapid7 Inc.
Velociraptor - Dig Deeper
Copyright (C) 2019-2024 Rapid7 Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package api

import (
"context"
"fmt"
"io"
"log"
Expand All @@ -30,7 +31,6 @@ import (
errors "github.com/go-errors/errors"

"github.com/sirupsen/logrus"
context "golang.org/x/net/context"
"www.velocidex.com/golang/velociraptor/actions"
actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
Expand Down Expand Up @@ -111,8 +111,9 @@ func streamQuery(
// Implement timeout
if arg.Timeout > 0 {
start := time.Now()
timed_ctx, timed_cancel := context.WithTimeout(subctx,
time.Second*time.Duration(arg.Timeout))
timed_ctx, timed_cancel := context.WithTimeoutCause(subctx,
time.Second*time.Duration(arg.Timeout),
errors.New("Query API timeout reached"))

wg.Add(1)
go func() {
Expand Down
6 changes: 4 additions & 2 deletions bin/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/Velocidex/ordereddict"
errors "github.com/go-errors/errors"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"www.velocidex.com/golang/velociraptor/actions"
actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"
Expand Down Expand Up @@ -390,8 +391,9 @@ func doQuery() error {

if *query_command_collect_timeout > 0 {
start := time.Now()
timed_ctx, timed_cancel := context.WithTimeout(ctx,
time.Second*time.Duration(*query_command_collect_timeout))
timed_ctx, timed_cancel := context.WithTimeoutCause(ctx,
time.Second*time.Duration(*query_command_collect_timeout),
errors.New("Query: deadline reached"))

go func() {
select {
Expand Down
8 changes: 8 additions & 0 deletions executor/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,14 @@ func (self *PoolClientExecutor) ProcessRequest(
ctx context.Context,
message *crypto_proto.VeloMessage) {

// Handle FlowStatsRequest specially - we just pretend this client
// does not support this feature.
if message.FlowStatsRequest != nil {
responder.MakeErrorResponse(self.Outbound, message.SessionId,
"Unsupported in Pool Client")
return
}

if message.UpdateEventTable != nil {
self.delegate.maybeUpdateEventTable(ctx, message)
return
Expand Down
41 changes: 35 additions & 6 deletions flows/client_flow_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,19 @@ func (self *ClientFlowRunner) removeInflightChecks(
Set("ClearFlows", true),
"Server.Internal.ClientScheduled")

return nil
// Update the client's in flight flow tracker on the local system
// as well. This helps to update this record ASAP before waiting
// for the minion message to arrive.
client_info_manager, err := services.GetClientInfoManager(self.config_obj)
if err != nil {
return err
}

return client_info_manager.Modify(ctx, client_id,
func(client_info *services.ClientInfo) (*services.ClientInfo, error) {
client_info.InFlightFlows = nil
return client_info, nil
})
}

func (self *ClientFlowRunner) ProcessSingleMessage(
Expand Down Expand Up @@ -493,6 +505,12 @@ func (self *ClientFlowRunner) FlowStats(
return err
}

// Update the client's in flight flow tracker.
client_info_manager, err := services.GetClientInfoManager(self.config_obj)
if err != nil {
return err
}

// If this is the final response, then we will notify a flow
// completion.
if msg.FlowComplete {
Expand All @@ -502,19 +520,30 @@ func (self *ClientFlowRunner) FlowStats(
Set("Flow", stats).
Set("FlowId", flow_id).
Set("ClientId", client_id))
}

// Update the client's in flight flow tracker.
client_info_manager, err := services.GetClientInfoManager(self.config_obj)
if err != nil {
return err
// Immediately remove this flow from the local InFlightFlows
// so we dont schedule it again.
return client_info_manager.Modify(ctx, client_id,
func(client_info *services.ClientInfo) (*services.ClientInfo, error) {
if client_info.InFlightFlows == nil {
client_info.InFlightFlows = make(map[string]int64)
}

// Update the timestamp that we last received a stats
// update from this flow.
delete(client_info.InFlightFlows, flow_id)
return client_info, nil
})
}

return client_info_manager.Modify(ctx, client_id,
func(client_info *services.ClientInfo) (*services.ClientInfo, error) {
if client_info.InFlightFlows == nil {
client_info.InFlightFlows = make(map[string]int64)
}

// Update the timestamp that we last received a stats
// update from this flow.
client_info.InFlightFlows[flow_id] = utils.GetTime().Now().Unix()
return client_info, nil
})
Expand Down
28 changes: 13 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/Velocidex/go-elasticsearch/v7 v7.3.1-0.20191001125819-fee0ef9cac6b
github.com/Velocidex/go-magic v0.0.0-20211018155418-c5dc48282f28
github.com/Velocidex/go-yara v1.1.10-0.20240309155455-3f491847cec9
github.com/Velocidex/grpc-go-pool v1.2.2-0.20211129003310-ece3b3fe13f4
github.com/Velocidex/grpc-go-pool v1.2.2-0.20241016164850-ff0cb80037a8
github.com/Velocidex/json v0.0.0-20220224052537-92f3c0326e5a
github.com/Velocidex/pkcs7 v0.0.0-20230220112103-d4ed02e1862a
github.com/Velocidex/sflags v0.3.1-0.20231011011525-620ab7ca8617
Expand All @@ -37,7 +37,7 @@ require (
github.com/google/btree v1.1.2
github.com/google/rpmpack v0.5.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.3.1
github.com/google/uuid v1.6.0
github.com/gorilla/csrf v1.6.2
github.com/gorilla/schema v1.4.1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0
Expand Down Expand Up @@ -66,24 +66,23 @@ require (
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/robertkrimen/otto v0.3.0
github.com/russross/blackfriday/v2 v2.1.0
github.com/sebdah/goldie v1.0.0
github.com/sebdah/goldie/v2 v2.5.3
github.com/sergi/go-diff v1.2.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.9.0
github.com/xor-gate/ar v0.0.0-20170530204233-5c72ae81e2b7 // indirect
github.com/xor-gate/debpkg v1.0.0
go.starlark.net v0.0.0-20230925163745-10651d5192ab
golang.org/x/crypto v0.26.0
golang.org/x/crypto v0.28.0
golang.org/x/mod v0.17.0
golang.org/x/net v0.27.0
golang.org/x/sys v0.25.0
golang.org/x/net v0.30.0
golang.org/x/sys v0.26.0
golang.org/x/text v0.19.0
golang.org/x/time v0.3.0
google.golang.org/api v0.146.0
google.golang.org/genproto v0.0.0-20231009173412-8bfb1ae86b6c // indirect
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.34.2
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
Expand Down Expand Up @@ -149,7 +148,7 @@ require (
github.com/vincent-petithory/dataurl v1.0.0
github.com/virtuald/go-paniclog v0.0.0-20190812204905-43a7fa316459
golang.org/x/oauth2 v0.22.0
google.golang.org/genproto/googleapis/api v0.0.0-20231009173412-8bfb1ae86b6c
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
Expand All @@ -159,7 +158,7 @@ require (

require (
cloud.google.com/go v0.110.8 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/iam v1.1.2 // indirect
github.com/360EntSecGroup-Skylar/excelize v1.4.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0 // indirect
Expand Down Expand Up @@ -187,17 +186,16 @@ require (
github.com/beevik/etree v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cavaliergopher/cpio v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/crewjam/httperr v0.2.0 // indirect
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/dustmop/soup v1.1.2-0.20190516214245-38228baa104e // indirect
github.com/geoffgarside/ber v1.1.0 // indirect
github.com/go-jose/go-jose/v4 v4.0.4 // indirect
github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f // indirect
github.com/golang/glog v1.1.0 // indirect
github.com/golang/glog v1.2.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.1 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
Expand Down Expand Up @@ -239,9 +237,9 @@ require (
go.uber.org/goleak v1.2.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
www.velocidex.com/golang/binparsergen v0.1.1-0.20240404114946-8f66c7cf586e // indirect
)

Expand Down
Loading

0 comments on commit 2d1e270

Please sign in to comment.