Skip to content

Commit

Permalink
fix: restart if pktmon server closes (#798)
Browse files Browse the repository at this point in the history
# Description

The gRPC Windows client is pretty noisy with random gRPC connection
errors:
 

![image](https://github.com/user-attachments/assets/9ac3f1b1-40b0-481a-bf15-b668ae5176d9)

We currently log and emit them, since (during testing) they don't
impact the full functionality of the stream. However, this specific
error
 


![image](https://github.com/user-attachments/assets/df1cc421-4304-4674-866c-fbd63d3b774b)

indicates that pktmon-server.exe has died for some reason.
Currently, if the goroutine that kicked off the server exits, the main
thread will keep trying to dial the server. This change makes the main
thread return if the goroutine exits, by writing to the error channel.
 
In that case, retina windows will restart and set up all of the pktmon
bits again.

## Related Issue

If this pull request is related to any issue, please mention it here.
Additionally, make sure that the issue is assigned to you before
submitting this pull request.

## Checklist

- [ ] I have read the [contributing
documentation](https://retina.sh/docs/contributing).
- [ ] I signed and signed-off the commits (`git commit -S -s ...`). See
[this
documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification)
on signing commits.
- [ ] I have correctly attributed the author(s) of the code.
- [ ] I have tested the changes locally.
- [ ] I have followed the project's style guidelines.
- [ ] I have updated the documentation, if necessary.
- [ ] I have added tests, if applicable.

## Screenshots (if applicable) or Testing Completed

Please add any relevant screenshots or GIFs to showcase the changes
made.

## Additional Notes

Add any additional notes or context about the pull request here.

---

Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more
information on how to contribute to this project.
  • Loading branch information
matmerr authored Oct 9, 2024
1 parent 25963b4 commit 05d95cd
Showing 1 changed file with 35 additions and 26 deletions.
61 changes: 35 additions & 26 deletions pkg/plugin/windows/pktmon/pktmon_plugin_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package pktmon
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"

"github.com/pkg/errors"

observerv1 "github.com/cilium/cilium/api/v1/observer"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
kcfg "github.com/microsoft/retina/pkg/config"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/microsoft/retina/pkg/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapio"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -79,14 +81,14 @@ func newGRPCClient() (*GRPCClient, error) {

bytes, err := json.Marshal(retryPolicy)
if err != nil {
return nil, fmt.Errorf("failed to marshal retry policy: %w", err)
return nil, errors.Wrapf(err, "failed to marshal retry policy")
}

retryPolicyStr := string(bytes)

conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr))
if err != nil {
return nil, fmt.Errorf("failed to dial pktmon server: %w", err)
return nil, errors.Wrapf(err, "failed to dial pktmon server:")
}

return &GRPCClient{observerv1.NewObserverClient(conn)}, nil
Expand All @@ -100,7 +102,7 @@ func (p *Plugin) RunPktMonServer(ctx context.Context) error {

pwd, err := os.Getwd()
if err != nil {
return fmt.Errorf("failed to get current working directory for pktmon: %w", err)
return errors.Wrapf(err, "failed to get current working directory for pktmon")
}

cmd := pwd + "\\" + "controller-pktmon.exe"
Expand All @@ -117,11 +119,11 @@ func (p *Plugin) RunPktMonServer(ctx context.Context) error {
// block this thread, and should it ever return, it's a problem
err = p.pktmonCmd.Run()
if err != nil {
return fmt.Errorf("pktmon server exited when it should not have: %w", err)
return errors.Wrapf(err, "pktmon server exited when it should not have")
}

// we never want to return happy from this
return fmt.Errorf("pktmon server exited unexpectedly: %w", ErrUnexpectedExit)
return errors.Wrapf(ErrUnexpectedExit, "pktmon server exited unexpectedly")
}

func (p *Plugin) Start(ctx context.Context) error {
Expand All @@ -130,27 +132,34 @@ func (p *Plugin) Start(ctx context.Context) error {
return ErrNilEnricher
}

go func() {
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
err := p.RunPktMonServer(ctx)
if err != nil {
p.l.Error("pktmon server exited", zap.Error(err))
return errors.Wrapf(err, "pktmon server exited")
}
}()
return nil
})

err := p.SetupStream()
if err != nil {
return fmt.Errorf("failed to setup initial pktmon stream: %w", err)
return errors.Wrapf(err, "failed to setup initial pktmon stream")
}

// run the getflows loop
for {
err := p.GetFlow(ctx)
if _, ok := status.FromError(err); ok {
p.l.Error("failed to get flow, retriable:", zap.Error(err))
continue
g.Go(func() error {
for {
err := p.GetFlow(ctx)
if _, ok := status.FromError(err); ok {
p.l.Error("failed to get flow, retriable:", zap.Error(err))
continue
}
return errors.Wrapf(err, "failed to get flow, unrecoverable")
}
return fmt.Errorf("failed to get flow, unrecoverable: %w", err)
}
})

return g.Wait()
}

func (p *Plugin) SetupStream() error {
Expand All @@ -159,35 +168,35 @@ func (p *Plugin) SetupStream() error {
p.l.Info("creating pktmon client")
p.grpcClient, err = newGRPCClient()
if err != nil {
return fmt.Errorf("failed to create pktmon client before getting flows: %w", err)
return errors.Wrapf(err, "failed to create pktmon client before getting flows")
}

return nil
}
err = utils.Retry(fn, connectionRetryAttempts)
if err != nil {
return fmt.Errorf("failed to create pktmon client: %w", err)
return errors.Wrapf(err, "failed to create pktmon client")
}

return nil
}

func (p *Plugin) StartStream(ctx context.Context) error {
if p.grpcClient == nil {
return fmt.Errorf("unable to start stream: %w", ErrNilGrpcClient)
return errors.Wrapf(ErrNilGrpcClient, "unable to start stream")
}

var err error
fn := func() error {
p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{})
if err != nil {
return fmt.Errorf("failed to open pktmon stream: %w", err)
return errors.Wrapf(err, "failed to open pktmon stream")
}
return nil
}
err = utils.Retry(fn, connectionRetryAttempts)
if err != nil {
return fmt.Errorf("failed to create pktmon client: %w", err)
return errors.Wrapf(err, "failed to create pktmon client")
}

return nil
Expand All @@ -199,17 +208,17 @@ func (p *Plugin) GetFlow(ctx context.Context) error {

err := p.StartStream(ctx)
if err != nil {
return fmt.Errorf("failed to setup pktmon stream: %w", err)
return errors.Wrapf(err, "failed to setup pktmon stream")
}

for {
select {
case <-ctx.Done():
return fmt.Errorf("pktmon plugin context done: %w", ctx.Err())
return errors.Wrapf(ctx.Err(), "pktmon plugin context done")
default:
event, err := p.stream.Recv()
if err != nil {
return fmt.Errorf("failed to receive pktmon event: %w", err)
return errors.Wrapf(err, "failed to receive pktmon event")
}

fl := event.GetFlow()
Expand Down Expand Up @@ -258,7 +267,7 @@ func (p *Plugin) Stop() error {
if p.pktmonCmd != nil {
err := p.pktmonCmd.Process.Kill()
if err != nil {
return fmt.Errorf("failed to kill pktmon server during stop: %w", err)
return errors.Wrapf(err, "failed to kill pktmon server during stop")
}
}

Expand Down

0 comments on commit 05d95cd

Please sign in to comment.