diff --git a/contrib/envoyproxy/envoy/envoy.go b/contrib/envoyproxy/go-control-plane/envoy.go similarity index 93% rename from contrib/envoyproxy/envoy/envoy.go rename to contrib/envoyproxy/go-control-plane/envoy.go index 6805c1b73a..2df4d7a51d 100644 --- a/contrib/envoyproxy/envoy/envoy.go +++ b/contrib/envoyproxy/go-control-plane/envoy.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016 Datadog, Inc. -package envoy +package go_control_plane import ( "context" @@ -32,18 +32,18 @@ import ( envoytypes "github.com/envoyproxy/go-control-plane/envoy/type/v3" ) -const componentName = "envoyproxy/go-control-plane/envoy/service/ext_proc/envoycore" +const componentName = "envoyproxy/go-control-plane" func init() { telemetry.LoadIntegration(componentName) - tracer.MarkIntegrationImported("github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/envoycore") + tracer.MarkIntegrationImported("github.com/envoyproxy/go-control-plane") } -type CurrentRequest struct { +type currentRequest struct { span tracer.Span afterHandle func() ctx context.Context - fakeResponseWriter *FakeResponseWriter + fakeResponseWriter *fakeResponseWriter wrappedResponseWriter http.ResponseWriter } @@ -58,7 +58,7 @@ func StreamServerInterceptor(opts ...grpctrace.Option) grpc.StreamServerIntercep var ( ctx = ss.Context() blocked bool - currentRequest *CurrentRequest + currentRequest *currentRequest processingRequest envoyextproc.ProcessingRequest processingResponse *envoyextproc.ProcessingResponse ) @@ -106,7 +106,7 @@ func StreamServerInterceptor(opts ...grpctrace.Option) grpc.StreamServerIntercep case *envoyextproc.ProcessingRequest_RequestHeaders: processingResponse, currentRequest, blocked, err = ProcessRequestHeaders(ctx, v) case *envoyextproc.ProcessingRequest_ResponseHeaders: - processingResponse, err = ProcessResponseHeaders(v, currentRequest) + processingResponse, err = processResponseHeaders(v, currentRequest) currentRequest = nil // Request is done, reset the current request } @@ -181,16 +181,16 @@ func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingReques } } -func ProcessRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *CurrentRequest, bool, error) { +func ProcessRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *currentRequest, bool, error) { log.Debug("external_processing: received request headers: %v\n", req.RequestHeaders) - request, err := NewRequestFromExtProc(ctx, req) + request, err := newRequest(ctx, req) if err != nil { return nil, nil, false, status.Errorf(codes.InvalidArgument, "Error processing request headers from ext_proc: %v", err) } var blocked bool - fakeResponseWriter := NewFakeResponseWriter() + fakeResponseWriter := newFakeResponseWriter() wrappedResponseWriter, request, afterHandle, blocked := httptrace.BeforeHandle(&httptrace.ServeConfig{ SpanOpts: []ddtrace.StartSpanOption{ tracer.Tag(ext.SpanKind, ext.SpanKindServer), @@ -214,7 +214,7 @@ func ProcessRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequ return nil, nil, false, err } - return processingResponse, &CurrentRequest{ + return processingResponse, ¤tRequest{ span: span, ctx: request.Context(), fakeResponseWriter: fakeResponseWriter, @@ -257,10 +257,10 @@ func propagationRequestHeaderMutation(span ddtrace.Span) (*envoyextproc.Processi }, nil } -func ProcessResponseHeaders(res *envoyextproc.ProcessingRequest_ResponseHeaders, currentRequest *CurrentRequest) (*envoyextproc.ProcessingResponse, error) { +func processResponseHeaders(res *envoyextproc.ProcessingRequest_ResponseHeaders, currentRequest *currentRequest) (*envoyextproc.ProcessingResponse, error) { log.Debug("external_processing: received response headers: %v\n", res.ResponseHeaders) - if err := NewFakeResponseWriterFromExtProc(currentRequest.wrappedResponseWriter, res); err != nil { + if err := createFakeResponseWriter(currentRequest.wrappedResponseWriter, res); err != nil { return nil, status.Errorf(codes.InvalidArgument, "Error processing response headers from ext_proc: %v", err) } @@ -303,7 +303,7 @@ func ProcessResponseHeaders(res *envoyextproc.ProcessingRequest_ResponseHeaders, }, nil } -func doBlockResponse(writer *FakeResponseWriter) *envoyextproc.ProcessingResponse { +func doBlockResponse(writer *fakeResponseWriter) *envoyextproc.ProcessingResponse { var headersMutation []*envoycore.HeaderValueOption for k, v := range writer.headers { headersMutation = append(headersMutation, &envoycore.HeaderValueOption{ diff --git a/contrib/envoyproxy/envoy/envoy_test.go b/contrib/envoyproxy/go-control-plane/envoy_test.go similarity index 99% rename from contrib/envoyproxy/envoy/envoy_test.go rename to contrib/envoyproxy/go-control-plane/envoy_test.go index d8860670df..45e8ec740a 100644 --- a/contrib/envoyproxy/envoy/envoy_test.go +++ b/contrib/envoyproxy/go-control-plane/envoy_test.go @@ -5,7 +5,7 @@ // TODO: Blocking and Redirect action to test -package envoy +package go_control_plane import ( "context" @@ -27,120 +27,6 @@ import ( "google.golang.org/grpc" ) -func end2EndStreamRequest(t *testing.T, stream envoyextproc.ExternalProcessor_ProcessClient, path string, method string, requestHeaders map[string]string, responseHeaders map[string]string, blockOnResponse bool) { - // First part: request - // 1- Send the headers - err := stream.Send(&envoyextproc.ProcessingRequest{ - Request: &envoyextproc.ProcessingRequest_RequestHeaders{ - RequestHeaders: &envoyextproc.HttpHeaders{ - Headers: makeRequestHeaders(requestHeaders, method, path), - }, - }, - }) - require.NoError(t, err) - - res, err := stream.Recv() - require.NoError(t, err) - require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestHeaders().GetResponse().GetStatus()) - - // 2- Send the body - err = stream.Send(&envoyextproc.ProcessingRequest{ - Request: &envoyextproc.ProcessingRequest_RequestBody{ - RequestBody: &envoyextproc.HttpBody{ - Body: []byte("body"), - }, - }, - }) - require.NoError(t, err) - - res, err = stream.Recv() - require.NoError(t, err) - require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestBody().GetResponse().GetStatus()) - - // 3- Send the trailers - err = stream.Send(&envoyextproc.ProcessingRequest{ - Request: &envoyextproc.ProcessingRequest_RequestTrailers{ - RequestTrailers: &envoyextproc.HttpTrailers{ - Trailers: &v3.HeaderMap{ - Headers: []*v3.HeaderValue{ - {Key: "key", Value: "value"}, - }, - }, - }, - }, - }) - require.NoError(t, err) - - res, err = stream.Recv() - require.NoError(t, err) - require.NotNil(t, res.GetRequestTrailers()) - - // Second part: response - // 1- Send the response headers - err = stream.Send(&envoyextproc.ProcessingRequest{ - Request: &envoyextproc.ProcessingRequest_ResponseHeaders{ - ResponseHeaders: &envoyextproc.HttpHeaders{ - Headers: makeResponseHeaders(responseHeaders, "200"), - }, - }, - }) - require.NoError(t, err) - - if blockOnResponse { - // Should have received an immediate response for blocking - // Let the test handle the response - return - } - - res, err = stream.Recv() - require.NoError(t, err) - require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetResponseHeaders().GetResponse().GetStatus()) - - // 2- Send the response body - err = stream.Send(&envoyextproc.ProcessingRequest{ - Request: &envoyextproc.ProcessingRequest_ResponseBody{ - ResponseBody: &envoyextproc.HttpBody{ - Body: []byte("body"), - EndOfStream: true, - }, - }, - }) - require.NoError(t, err) - - // The stream should now be closed - _, err = stream.Recv() - require.Equal(t, io.EOF, err) -} - -func checkForAppsecEvent(t *testing.T, finished []mocktracer.Span, expectedRuleIDs map[string]int) { - // The request should have the attack attempts - event := finished[len(finished)-1].Tag("_dd.appsec.json") - require.NotNil(t, event, "the _dd.appsec.json tag was not found") - - jsonText := event.(string) - type trigger struct { - Rule struct { - ID string `json:"id"` - } `json:"rule"` - } - var parsed struct { - Triggers []trigger `json:"triggers"` - } - err := json.Unmarshal([]byte(jsonText), &parsed) - require.NoError(t, err) - - histogram := map[string]uint8{} - for _, tr := range parsed.Triggers { - histogram[tr.Rule.ID]++ - } - - for ruleID, count := range expectedRuleIDs { - require.Equal(t, count, int(histogram[ruleID]), "rule %s has been triggered %d times but expected %d") - } - - require.Len(t, parsed.Triggers, len(expectedRuleIDs), "unexpected number of rules triggered") -} - func TestAppSec(t *testing.T) { appsec.Start() defer appsec.Stop() @@ -540,6 +426,120 @@ type envoyFixtureServer struct { // Helper functions +func end2EndStreamRequest(t *testing.T, stream envoyextproc.ExternalProcessor_ProcessClient, path string, method string, requestHeaders map[string]string, responseHeaders map[string]string, blockOnResponse bool) { + // First part: request + // 1- Send the headers + err := stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(requestHeaders, method, path), + }, + }, + }) + require.NoError(t, err) + + res, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestHeaders().GetResponse().GetStatus()) + + // 2- Send the body + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestBody{ + RequestBody: &envoyextproc.HttpBody{ + Body: []byte("body"), + }, + }, + }) + require.NoError(t, err) + + res, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestBody().GetResponse().GetStatus()) + + // 3- Send the trailers + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestTrailers{ + RequestTrailers: &envoyextproc.HttpTrailers{ + Trailers: &v3.HeaderMap{ + Headers: []*v3.HeaderValue{ + {Key: "key", Value: "value"}, + }, + }, + }, + }, + }) + require.NoError(t, err) + + res, err = stream.Recv() + require.NoError(t, err) + require.NotNil(t, res.GetRequestTrailers()) + + // Second part: response + // 1- Send the response headers + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_ResponseHeaders{ + ResponseHeaders: &envoyextproc.HttpHeaders{ + Headers: makeResponseHeaders(responseHeaders, "200"), + }, + }, + }) + require.NoError(t, err) + + if blockOnResponse { + // Should have received an immediate response for blocking + // Let the test handle the response + return + } + + res, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetResponseHeaders().GetResponse().GetStatus()) + + // 2- Send the response body + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_ResponseBody{ + ResponseBody: &envoyextproc.HttpBody{ + Body: []byte("body"), + EndOfStream: true, + }, + }, + }) + require.NoError(t, err) + + // The stream should now be closed + _, err = stream.Recv() + require.Equal(t, io.EOF, err) +} + +func checkForAppsecEvent(t *testing.T, finished []mocktracer.Span, expectedRuleIDs map[string]int) { + // The request should have the attack attempts + event := finished[len(finished)-1].Tag("_dd.appsec.json") + require.NotNil(t, event, "the _dd.appsec.json tag was not found") + + jsonText := event.(string) + type trigger struct { + Rule struct { + ID string `json:"id"` + } `json:"rule"` + } + var parsed struct { + Triggers []trigger `json:"triggers"` + } + err := json.Unmarshal([]byte(jsonText), &parsed) + require.NoError(t, err) + + histogram := map[string]uint8{} + for _, tr := range parsed.Triggers { + histogram[tr.Rule.ID]++ + } + + for ruleID, count := range expectedRuleIDs { + require.Equal(t, count, int(histogram[ruleID]), "rule %s has been triggered %d times but expected %d") + } + + require.Len(t, parsed.Triggers, len(expectedRuleIDs), "unexpected number of rules triggered") +} + // Construct request headers func makeRequestHeaders(headers map[string]string, method string, path string) *v3.HeaderMap { h := &v3.HeaderMap{} diff --git a/contrib/envoyproxy/envoy/fakehttp.go b/contrib/envoyproxy/go-control-plane/fakehttp.go similarity index 73% rename from contrib/envoyproxy/envoy/fakehttp.go rename to contrib/envoyproxy/go-control-plane/fakehttp.go index 2d1a4652b1..3f20725e1b 100644 --- a/contrib/envoyproxy/envoy/fakehttp.go +++ b/contrib/envoyproxy/go-control-plane/fakehttp.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2024 Datadog, Inc. -package envoy +package go_control_plane import ( "context" @@ -32,7 +32,7 @@ func checkPseudoRequestHeaders(headers map[string]string) error { return nil } -// checkPseudoResponseHeaders Verify the required HTTP2 headers are present +// checkPseudoResponseHeaders verifies the required HTTP2 headers are present // Some mandatory headers need to be set. It can happen when it wasn't a real HTTP2 request sent by Envoy, func checkPseudoResponseHeaders(headers map[string]string) error { if _, ok := headers[":status"]; !ok { @@ -55,12 +55,12 @@ func getRemoteAddr(md metadata.MD) string { return xfwd[length-1] } -// partitionPeusdoHeaders Separate normal headers of the initial request made by the client and the pseudo headers of HTTP/2 +// splitPseudoHeaders splits normal headers of the initial request made by the client and the pseudo headers of HTTP/2 // - Format the headers to be used by the tracer as a map[string][]string // - Set headers keys to be canonical -func partitionPeusdoHeaders(receivedHeaders []*corev3.HeaderValue) (map[string][]string, map[string]string) { - headers := make(map[string][]string, len(receivedHeaders)-4) - pseudoHeaders := make(map[string]string, 4) +func splitPseudoHeaders(receivedHeaders []*corev3.HeaderValue) (headers map[string][]string, pseudoHeaders map[string]string) { + headers = make(map[string][]string, len(receivedHeaders)-4) + pseudoHeaders = make(map[string]string, 4) for _, v := range receivedHeaders { key := v.GetKey() if key == "" { @@ -76,8 +76,8 @@ func partitionPeusdoHeaders(receivedHeaders []*corev3.HeaderValue) (map[string][ return headers, pseudoHeaders } -func NewFakeResponseWriterFromExtProc(w http.ResponseWriter, res *extproc.ProcessingRequest_ResponseHeaders) error { - headers, pseudoHeaders := partitionPeusdoHeaders(res.ResponseHeaders.GetHeaders().GetHeaders()) +func createFakeResponseWriter(w http.ResponseWriter, res *extproc.ProcessingRequest_ResponseHeaders) error { + headers, pseudoHeaders := splitPseudoHeaders(res.ResponseHeaders.GetHeaders().GetHeaders()) if err := checkPseudoResponseHeaders(pseudoHeaders); err != nil { return err @@ -96,9 +96,9 @@ func NewFakeResponseWriterFromExtProc(w http.ResponseWriter, res *extproc.Proces return nil } -// NewRequestFromExtProc creates a new http.Request from an ext_proc RequestHeaders message -func NewRequestFromExtProc(ctx context.Context, req *extproc.ProcessingRequest_RequestHeaders) (*http.Request, error) { - headers, pseudoHeaders := partitionPeusdoHeaders(req.RequestHeaders.GetHeaders().GetHeaders()) +// newRequest creates a new http.Request from an ext_proc RequestHeaders message +func newRequest(ctx context.Context, req *extproc.ProcessingRequest_RequestHeaders) (*http.Request, error) { + headers, pseudoHeaders := splitPseudoHeaders(req.RequestHeaders.GetHeaders().GetHeaders()) if err := checkPseudoRequestHeaders(pseudoHeaders); err != nil { return nil, err } @@ -137,15 +137,15 @@ func NewRequestFromExtProc(ctx context.Context, req *extproc.ProcessingRequest_R }).WithContext(ctx), nil } -type FakeResponseWriter struct { +type fakeResponseWriter struct { mu sync.Mutex status int body []byte headers http.Header } -// Reset resets the FakeResponseWriter to its initial state -func (w *FakeResponseWriter) Reset() { +// Reset resets the fakeResponseWriter to its initial state +func (w *fakeResponseWriter) Reset() { w.mu.Lock() defer w.mu.Unlock() w.status = 0 @@ -154,36 +154,36 @@ func (w *FakeResponseWriter) Reset() { } // Status is not in the [http.ResponseWriter] interface, but it is cast into it by the tracing code -func (w *FakeResponseWriter) Status() int { +func (w *fakeResponseWriter) Status() int { w.mu.Lock() defer w.mu.Unlock() return w.status } -func (w *FakeResponseWriter) WriteHeader(status int) { +func (w *fakeResponseWriter) WriteHeader(status int) { w.mu.Lock() defer w.mu.Unlock() w.status = status } -func (w *FakeResponseWriter) Header() http.Header { +func (w *fakeResponseWriter) Header() http.Header { w.mu.Lock() defer w.mu.Unlock() return w.headers } -func (w *FakeResponseWriter) Write(b []byte) (int, error) { +func (w *fakeResponseWriter) Write(b []byte) (int, error) { w.mu.Lock() defer w.mu.Unlock() w.body = append(w.body, b...) return len(b), nil } -var _ http.ResponseWriter = &FakeResponseWriter{} +var _ http.ResponseWriter = &fakeResponseWriter{} -// NewFakeResponseWriter creates a new FakeResponseWriter that can be used to store the response a [http.Handler] made -func NewFakeResponseWriter() *FakeResponseWriter { - return &FakeResponseWriter{ +// newFakeResponseWriter creates a new fakeResponseWriter that can be used to store the response a [http.Handler] made +func newFakeResponseWriter() *fakeResponseWriter { + return &fakeResponseWriter{ headers: make(http.Header), } }