From 05d95cdf2d5ffad58986b5cb2863f4043edd43a2 Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Tue, 8 Oct 2024 17:00:55 -0700 Subject: [PATCH] fix: restart if pktmon server closes (#798) # Description The gRPC Windows client is pretty noisy with random gRPC connection errors: ![image](https://github.com/user-attachments/assets/9ac3f1b1-40b0-481a-bf15-b668ae5176d9) We currently log them and emit them, since (during testing) they don't impact the full functionality of the stream. However, this specific error ![image](https://github.com/user-attachments/assets/df1cc421-4304-4674-866c-fbd63d3b774b) indicates that pktmon-server.exe has died for some reason. Currently, if the goroutine that kicked off the server exits, the main thread will keep trying to dial the server. This change makes the main thread return if that goroutine exits, by writing to the error channel; in that case Retina on Windows will restart and set up all of the pktmon bits again. ## Related Issue If this pull request is related to any issue, please mention it here. Additionally, make sure that the issue is assigned to you before submitting this pull request. ## Checklist - [ ] I have read the [contributing documentation](https://retina.sh/docs/contributing). - [ ] I signed and signed-off the commits (`git commit -S -s ...`). See [this documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification) on signing commits. - [ ] I have correctly attributed the author(s) of the code. - [ ] I have tested the changes locally. - [ ] I have followed the project's style guidelines. - [ ] I have updated the documentation, if necessary. - [ ] I have added tests, if applicable. ## Screenshots (if applicable) or Testing Completed Please add any relevant screenshots or GIFs to showcase the changes made. ## Additional Notes Add any additional notes or context about the pull request here. 
--- Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more information on how to contribute to this project. --- .../windows/pktmon/pktmon_plugin_windows.go | 61 +++++++++++-------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go b/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go index cb1daa0d8c..21e9b6f3df 100644 --- a/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go +++ b/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go @@ -3,11 +3,12 @@ package pktmon import ( "context" "encoding/json" - "errors" "fmt" "os" "os/exec" + "github.com/pkg/errors" + observerv1 "github.com/cilium/cilium/api/v1/observer" v1 "github.com/cilium/cilium/pkg/hubble/api/v1" kcfg "github.com/microsoft/retina/pkg/config" @@ -18,6 +19,7 @@ import ( "github.com/microsoft/retina/pkg/utils" "go.uber.org/zap" "go.uber.org/zap/zapio" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" @@ -79,14 +81,14 @@ func newGRPCClient() (*GRPCClient, error) { bytes, err := json.Marshal(retryPolicy) if err != nil { - return nil, fmt.Errorf("failed to marshal retry policy: %w", err) + return nil, errors.Wrapf(err, "failed to marshal retry policy") } retryPolicyStr := string(bytes) conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr)) if err != nil { - return nil, fmt.Errorf("failed to dial pktmon server: %w", err) + return nil, errors.Wrapf(err, "failed to dial pktmon server:") } return &GRPCClient{observerv1.NewObserverClient(conn)}, nil @@ -100,7 +102,7 @@ func (p *Plugin) RunPktMonServer(ctx context.Context) error { pwd, err := os.Getwd() if err != nil { - return fmt.Errorf("failed to get current working directory for pktmon: %w", err) + return errors.Wrapf(err, "failed to get current working directory for pktmon") } cmd := pwd + 
"\\" + "controller-pktmon.exe" @@ -117,11 +119,11 @@ func (p *Plugin) RunPktMonServer(ctx context.Context) error { // block this thread, and should it ever return, it's a problem err = p.pktmonCmd.Run() if err != nil { - return fmt.Errorf("pktmon server exited when it should not have: %w", err) + return errors.Wrapf(err, "pktmon server exited when it should not have") } // we never want to return happy from this - return fmt.Errorf("pktmon server exited unexpectedly: %w", ErrUnexpectedExit) + return errors.Wrapf(ErrUnexpectedExit, "pktmon server exited unexpectedly") } func (p *Plugin) Start(ctx context.Context) error { @@ -130,27 +132,34 @@ func (p *Plugin) Start(ctx context.Context) error { return ErrNilEnricher } - go func() { + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { err := p.RunPktMonServer(ctx) if err != nil { - p.l.Error("pktmon server exited", zap.Error(err)) + return errors.Wrapf(err, "pktmon server exited") } - }() + return nil + }) err := p.SetupStream() if err != nil { - return fmt.Errorf("failed to setup initial pktmon stream: %w", err) + return errors.Wrapf(err, "failed to setup initial pktmon stream") } // run the getflows loop - for { - err := p.GetFlow(ctx) - if _, ok := status.FromError(err); ok { - p.l.Error("failed to get flow, retriable:", zap.Error(err)) - continue + g.Go(func() error { + for { + err := p.GetFlow(ctx) + if _, ok := status.FromError(err); ok { + p.l.Error("failed to get flow, retriable:", zap.Error(err)) + continue + } + return errors.Wrapf(err, "failed to get flow, unrecoverable") } - return fmt.Errorf("failed to get flow, unrecoverable: %w", err) - } + }) + + return g.Wait() } func (p *Plugin) SetupStream() error { @@ -159,14 +168,14 @@ func (p *Plugin) SetupStream() error { p.l.Info("creating pktmon client") p.grpcClient, err = newGRPCClient() if err != nil { - return fmt.Errorf("failed to create pktmon client before getting flows: %w", err) + return errors.Wrapf(err, "failed to create pktmon client 
before getting flows") } return nil } err = utils.Retry(fn, connectionRetryAttempts) if err != nil { - return fmt.Errorf("failed to create pktmon client: %w", err) + return errors.Wrapf(err, "failed to create pktmon client") } return nil @@ -174,20 +183,20 @@ func (p *Plugin) SetupStream() error { func (p *Plugin) StartStream(ctx context.Context) error { if p.grpcClient == nil { - return fmt.Errorf("unable to start stream: %w", ErrNilGrpcClient) + return errors.Wrapf(ErrNilGrpcClient, "unable to start stream") } var err error fn := func() error { p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{}) if err != nil { - return fmt.Errorf("failed to open pktmon stream: %w", err) + return errors.Wrapf(err, "failed to open pktmon stream") } return nil } err = utils.Retry(fn, connectionRetryAttempts) if err != nil { - return fmt.Errorf("failed to create pktmon client: %w", err) + return errors.Wrapf(err, "failed to create pktmon client") } return nil @@ -199,17 +208,17 @@ func (p *Plugin) GetFlow(ctx context.Context) error { err := p.StartStream(ctx) if err != nil { - return fmt.Errorf("failed to setup pktmon stream: %w", err) + return errors.Wrapf(err, "failed to setup pktmon stream") } for { select { case <-ctx.Done(): - return fmt.Errorf("pktmon plugin context done: %w", ctx.Err()) + return errors.Wrapf(ctx.Err(), "pktmon plugin context done") default: event, err := p.stream.Recv() if err != nil { - return fmt.Errorf("failed to receive pktmon event: %w", err) + return errors.Wrapf(err, "failed to receive pktmon event") } fl := event.GetFlow() @@ -258,7 +267,7 @@ func (p *Plugin) Stop() error { if p.pktmonCmd != nil { err := p.pktmonCmd.Process.Kill() if err != nil { - return fmt.Errorf("failed to kill pktmon server during stop: %w", err) + return errors.Wrapf(err, "failed to kill pktmon server during stop") } }