Skip to content

Commit

Permalink
Apply comments
Browse files Browse the repository at this point in the history
  • Loading branch information
e-n-0 committed Nov 20, 2024
1 parent e3bb1e5 commit db73a1f
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package envoy
package go_control_plane

import (
"context"
Expand Down Expand Up @@ -32,18 +32,18 @@ import (
envoytypes "github.com/envoyproxy/go-control-plane/envoy/type/v3"
)

const componentName = "envoyproxy/go-control-plane/envoy/service/ext_proc/envoycore"
const componentName = "envoyproxy/go-control-plane"

func init() {
telemetry.LoadIntegration(componentName)
tracer.MarkIntegrationImported("github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/envoycore")
tracer.MarkIntegrationImported("github.com/envoyproxy/go-control-plane")
}

type CurrentRequest struct {
type currentRequest struct {
span tracer.Span
afterHandle func()
ctx context.Context
fakeResponseWriter *FakeResponseWriter
fakeResponseWriter *fakeResponseWriter
wrappedResponseWriter http.ResponseWriter
}

Expand All @@ -58,7 +58,7 @@ func StreamServerInterceptor(opts ...grpctrace.Option) grpc.StreamServerIntercep
var (
ctx = ss.Context()
blocked bool
currentRequest *CurrentRequest
currentRequest *currentRequest
processingRequest envoyextproc.ProcessingRequest
processingResponse *envoyextproc.ProcessingResponse
)
Expand Down Expand Up @@ -106,7 +106,7 @@ func StreamServerInterceptor(opts ...grpctrace.Option) grpc.StreamServerIntercep
case *envoyextproc.ProcessingRequest_RequestHeaders:
processingResponse, currentRequest, blocked, err = ProcessRequestHeaders(ctx, v)
case *envoyextproc.ProcessingRequest_ResponseHeaders:
processingResponse, err = ProcessResponseHeaders(v, currentRequest)
processingResponse, err = processResponseHeaders(v, currentRequest)
currentRequest = nil // Request is done, reset the current request
}

Expand Down Expand Up @@ -181,16 +181,16 @@ func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingReques
}
}

func ProcessRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *CurrentRequest, bool, error) {
func ProcessRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *currentRequest, bool, error) {
log.Debug("external_processing: received request headers: %v\n", req.RequestHeaders)

request, err := NewRequestFromExtProc(ctx, req)
request, err := newRequest(ctx, req)
if err != nil {
return nil, nil, false, status.Errorf(codes.InvalidArgument, "Error processing request headers from ext_proc: %v", err)
}

var blocked bool
fakeResponseWriter := NewFakeResponseWriter()
fakeResponseWriter := newFakeResponseWriter()
wrappedResponseWriter, request, afterHandle, blocked := httptrace.BeforeHandle(&httptrace.ServeConfig{
SpanOpts: []ddtrace.StartSpanOption{
tracer.Tag(ext.SpanKind, ext.SpanKindServer),
Expand All @@ -214,7 +214,7 @@ func ProcessRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequ
return nil, nil, false, err
}

return processingResponse, &CurrentRequest{
return processingResponse, &currentRequest{
span: span,
ctx: request.Context(),
fakeResponseWriter: fakeResponseWriter,
Expand Down Expand Up @@ -257,10 +257,10 @@ func propagationRequestHeaderMutation(span ddtrace.Span) (*envoyextproc.Processi
}, nil
}

func ProcessResponseHeaders(res *envoyextproc.ProcessingRequest_ResponseHeaders, currentRequest *CurrentRequest) (*envoyextproc.ProcessingResponse, error) {
func processResponseHeaders(res *envoyextproc.ProcessingRequest_ResponseHeaders, currentRequest *currentRequest) (*envoyextproc.ProcessingResponse, error) {
log.Debug("external_processing: received response headers: %v\n", res.ResponseHeaders)

if err := NewFakeResponseWriterFromExtProc(currentRequest.wrappedResponseWriter, res); err != nil {
if err := createFakeResponseWriter(currentRequest.wrappedResponseWriter, res); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Error processing response headers from ext_proc: %v", err)
}

Expand Down Expand Up @@ -303,7 +303,7 @@ func ProcessResponseHeaders(res *envoyextproc.ProcessingRequest_ResponseHeaders,
}, nil
}

func doBlockResponse(writer *FakeResponseWriter) *envoyextproc.ProcessingResponse {
func doBlockResponse(writer *fakeResponseWriter) *envoyextproc.ProcessingResponse {
var headersMutation []*envoycore.HeaderValueOption
for k, v := range writer.headers {
headersMutation = append(headersMutation, &envoycore.HeaderValueOption{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

// TODO: Blocking and Redirect action to test

package envoy
package go_control_plane

import (
"context"
Expand All @@ -27,120 +27,6 @@ import (
"google.golang.org/grpc"
)

func end2EndStreamRequest(t *testing.T, stream envoyextproc.ExternalProcessor_ProcessClient, path string, method string, requestHeaders map[string]string, responseHeaders map[string]string, blockOnResponse bool) {
// First part: request
// 1- Send the headers
err := stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_RequestHeaders{
RequestHeaders: &envoyextproc.HttpHeaders{
Headers: makeRequestHeaders(requestHeaders, method, path),
},
},
})
require.NoError(t, err)

res, err := stream.Recv()
require.NoError(t, err)
require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestHeaders().GetResponse().GetStatus())

// 2- Send the body
err = stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_RequestBody{
RequestBody: &envoyextproc.HttpBody{
Body: []byte("body"),
},
},
})
require.NoError(t, err)

res, err = stream.Recv()
require.NoError(t, err)
require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestBody().GetResponse().GetStatus())

// 3- Send the trailers
err = stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_RequestTrailers{
RequestTrailers: &envoyextproc.HttpTrailers{
Trailers: &v3.HeaderMap{
Headers: []*v3.HeaderValue{
{Key: "key", Value: "value"},
},
},
},
},
})
require.NoError(t, err)

res, err = stream.Recv()
require.NoError(t, err)
require.NotNil(t, res.GetRequestTrailers())

// Second part: response
// 1- Send the response headers
err = stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_ResponseHeaders{
ResponseHeaders: &envoyextproc.HttpHeaders{
Headers: makeResponseHeaders(responseHeaders, "200"),
},
},
})
require.NoError(t, err)

if blockOnResponse {
// Should have received an immediate response for blocking
// Let the test handle the response
return
}

res, err = stream.Recv()
require.NoError(t, err)
require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetResponseHeaders().GetResponse().GetStatus())

// 2- Send the response body
err = stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_ResponseBody{
ResponseBody: &envoyextproc.HttpBody{
Body: []byte("body"),
EndOfStream: true,
},
},
})
require.NoError(t, err)

// The stream should now be closed
_, err = stream.Recv()
require.Equal(t, io.EOF, err)
}

func checkForAppsecEvent(t *testing.T, finished []mocktracer.Span, expectedRuleIDs map[string]int) {
// The request should have the attack attempts
event := finished[len(finished)-1].Tag("_dd.appsec.json")
require.NotNil(t, event, "the _dd.appsec.json tag was not found")

jsonText := event.(string)
type trigger struct {
Rule struct {
ID string `json:"id"`
} `json:"rule"`
}
var parsed struct {
Triggers []trigger `json:"triggers"`
}
err := json.Unmarshal([]byte(jsonText), &parsed)
require.NoError(t, err)

histogram := map[string]uint8{}
for _, tr := range parsed.Triggers {
histogram[tr.Rule.ID]++
}

for ruleID, count := range expectedRuleIDs {
require.Equal(t, count, int(histogram[ruleID]), "rule %s has been triggered %d times but expected %d")
}

require.Len(t, parsed.Triggers, len(expectedRuleIDs), "unexpected number of rules triggered")
}

func TestAppSec(t *testing.T) {
appsec.Start()
defer appsec.Stop()
Expand Down Expand Up @@ -540,6 +426,120 @@ type envoyFixtureServer struct {

// Helper functions

func end2EndStreamRequest(t *testing.T, stream envoyextproc.ExternalProcessor_ProcessClient, path string, method string, requestHeaders map[string]string, responseHeaders map[string]string, blockOnResponse bool) {
// First part: request
// 1- Send the headers
err := stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_RequestHeaders{
RequestHeaders: &envoyextproc.HttpHeaders{
Headers: makeRequestHeaders(requestHeaders, method, path),
},
},
})
require.NoError(t, err)

res, err := stream.Recv()
require.NoError(t, err)
require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestHeaders().GetResponse().GetStatus())

// 2- Send the body
err = stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_RequestBody{
RequestBody: &envoyextproc.HttpBody{
Body: []byte("body"),
},
},
})
require.NoError(t, err)

res, err = stream.Recv()
require.NoError(t, err)
require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestBody().GetResponse().GetStatus())

// 3- Send the trailers
err = stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_RequestTrailers{
RequestTrailers: &envoyextproc.HttpTrailers{
Trailers: &v3.HeaderMap{
Headers: []*v3.HeaderValue{
{Key: "key", Value: "value"},
},
},
},
},
})
require.NoError(t, err)

res, err = stream.Recv()
require.NoError(t, err)
require.NotNil(t, res.GetRequestTrailers())

// Second part: response
// 1- Send the response headers
err = stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_ResponseHeaders{
ResponseHeaders: &envoyextproc.HttpHeaders{
Headers: makeResponseHeaders(responseHeaders, "200"),
},
},
})
require.NoError(t, err)

if blockOnResponse {
// Should have received an immediate response for blocking
// Let the test handle the response
return
}

res, err = stream.Recv()
require.NoError(t, err)
require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetResponseHeaders().GetResponse().GetStatus())

// 2- Send the response body
err = stream.Send(&envoyextproc.ProcessingRequest{
Request: &envoyextproc.ProcessingRequest_ResponseBody{
ResponseBody: &envoyextproc.HttpBody{
Body: []byte("body"),
EndOfStream: true,
},
},
})
require.NoError(t, err)

// The stream should now be closed
_, err = stream.Recv()
require.Equal(t, io.EOF, err)
}

func checkForAppsecEvent(t *testing.T, finished []mocktracer.Span, expectedRuleIDs map[string]int) {
// The request should have the attack attempts
event := finished[len(finished)-1].Tag("_dd.appsec.json")
require.NotNil(t, event, "the _dd.appsec.json tag was not found")

jsonText := event.(string)
type trigger struct {
Rule struct {
ID string `json:"id"`
} `json:"rule"`
}
var parsed struct {
Triggers []trigger `json:"triggers"`
}
err := json.Unmarshal([]byte(jsonText), &parsed)
require.NoError(t, err)

histogram := map[string]uint8{}
for _, tr := range parsed.Triggers {
histogram[tr.Rule.ID]++
}

for ruleID, count := range expectedRuleIDs {
require.Equal(t, count, int(histogram[ruleID]), "rule %s has been triggered %d times but expected %d")
}

require.Len(t, parsed.Triggers, len(expectedRuleIDs), "unexpected number of rules triggered")
}

// Construct request headers
func makeRequestHeaders(headers map[string]string, method string, path string) *v3.HeaderMap {
h := &v3.HeaderMap{}
Expand Down
Loading

0 comments on commit db73a1f

Please sign in to comment.