From abac3636a2bf82cc5ead6bd1291626ed05e0edc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?= Date: Thu, 8 Aug 2024 11:54:56 +0800 Subject: [PATCH] feat: record plugin name which sends local reply (#674) Signed-off-by: spacewander --- api/pkg/filtermanager/filtermanager.go | 47 ++++++++++++------- .../tests/integration/dataplane/bootstrap.go | 21 ++++++++- api/plugins/tests/pkg/envoy/capi.go | 8 +++- api/tests/integration/filtermanager_test.go | 33 +++++++++++++ 4 files changed, 88 insertions(+), 21 deletions(-) diff --git a/api/pkg/filtermanager/filtermanager.go b/api/pkg/filtermanager/filtermanager.go index ada52738..593aff3b 100644 --- a/api/pkg/filtermanager/filtermanager.go +++ b/api/pkg/filtermanager/filtermanager.go @@ -227,7 +227,16 @@ func FilterManagerFactory(c interface{}) capi.StreamFilterFactory { } } -func (m *filterManager) handleAction(res api.ResultAction, phase phase) (needReturn bool) { +func (m *filterManager) recordLocalReplyPluginName(name string) { + // We can get the plugin name which returns the local response from the dynamic metadata. + // For example, use %DYNAMIC_METADATA(htnn:local_reply_plugin_name)% in the access log format. + m.callbacks.StreamInfo().DynamicMetadata().Set("htnn", "local_reply_plugin_name", name) + // For now, we don't record when the local reply is caused by panic. As we can't always get + // the name of plugin which is the root of the panic correctly. For example, consider a plugin kicks + // off a goroutine and the goroutine panics. +} + +func (m *filterManager) handleAction(res api.ResultAction, phase phase, filter *model.FilterWrapper) (needReturn bool) { if res == api.Continue { return false } @@ -244,6 +253,7 @@ func (m *filterManager) handleAction(res api.ResultAction, phase phase) (needRet switch v := res.(type) { case *api.LocalResponse: + m.recordLocalReplyPluginName(filter.Name) m.localReply(v) return true default: @@ -328,6 +338,7 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b m.config.InitOnce() if m.config.initFailed { api.LogErrorf("error in plugin %s: %s", m.config.initFailedPluginName, m.config.initFailure) + m.recordLocalReplyPluginName(m.config.initFailedPluginName) m.localReply(&api.LocalResponse{ Code: 500, }) @@ -346,7 +357,7 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b f := m.filters[i] // We don't support DecodeRequest for now res = f.DecodeHeaders(m.reqHdr, endStream) - if m.handleAction(res, phaseDecodeHeaders) { + if m.handleAction(res, phaseDecodeHeaders, f) { return } } @@ -463,7 +474,7 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b for i := m.config.consumerFiltersEndAt; i < len(m.filters); i++ { f := m.filters[i] res = f.DecodeHeaders(m.reqHdr, endStream) - if m.handleAction(res, phaseDecodeHeaders) { + if m.handleAction(res, phaseDecodeHeaders, f) { return } @@ -479,7 +490,7 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b // no body res = f.DecodeRequest(m.reqHdr, nil, nil) - if m.handleAction(res, phaseDecodeRequest) { + if m.handleAction(res, phaseDecodeRequest, f) { return } } @@ -522,7 +533,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi for i := 0; i < n; i++ { f := m.filters[i] res = f.DecodeData(buf, endStream) - if m.handleAction(res, phaseDecodeData) { + if m.handleAction(res, phaseDecodeData, f) { return } } @@ -532,14 +543,14 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi for i := 0; i < m.decodeIdx; i++ { f := m.filters[i] res = f.DecodeData(buf, endStream) - if m.handleAction(res, phaseDecodeData) { + if m.handleAction(res, phaseDecodeData, f) { return } } f := m.filters[m.decodeIdx] res = f.DecodeRequest(m.reqHdr, buf, nil) - if m.handleAction(res, phaseDecodeRequest) { + if m.handleAction(res, phaseDecodeRequest, f) { return } @@ -550,7 +561,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi // The endStream in DecodeHeaders indicates whether there is a body. // The body always exists when we hit this path. res = f.DecodeHeaders(m.reqHdr, false) - if m.handleAction(res, phaseDecodeHeaders) { + if m.handleAction(res, phaseDecodeHeaders, f) { return } if m.decodeRequestNeeded { @@ -564,7 +575,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi for j := m.decodeIdx + 1; j < i; j++ { f := m.filters[j] res = f.DecodeData(buf, endStream) - if m.handleAction(res, phaseDecodeData) { + if m.handleAction(res, phaseDecodeData, f) { return } } @@ -574,7 +585,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi m.decodeIdx = i f := m.filters[m.decodeIdx] res = f.DecodeRequest(m.reqHdr, buf, nil) - if m.handleAction(res, phaseDecodeRequest) { + if m.handleAction(res, phaseDecodeRequest, f) { return } i++ @@ -614,7 +625,7 @@ func (m *filterManager) EncodeHeaders(headers capi.ResponseHeaderMap, endStream for i := n - 1; i >= 0; i-- { f := m.filters[i] res = f.EncodeHeaders(headers, endStream) - if m.handleAction(res, phaseEncodeHeaders) { + if m.handleAction(res, phaseEncodeHeaders, f) { return } @@ -628,7 +639,7 @@ func (m *filterManager) EncodeHeaders(headers capi.ResponseHeaderMap, endStream // no body res = f.EncodeResponse(headers, nil, nil) - if m.handleAction(res, phaseEncodeResponse) { + if m.handleAction(res, phaseEncodeResponse, f) { return } } @@ -658,7 +669,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi for i := n - 1; i >= 0; i-- { f := m.filters[i] res = f.EncodeData(buf, endStream) - if m.handleAction(res, phaseEncodeData) { + if m.handleAction(res, phaseEncodeData, f) { return } } @@ -668,14 +679,14 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi for i := n - 1; i > m.encodeIdx; i-- { f := m.filters[i] res = f.EncodeData(buf, endStream) - if m.handleAction(res, phaseEncodeData) { + if m.handleAction(res, phaseEncodeData, f) { return } } f := m.filters[m.encodeIdx] res = f.EncodeResponse(m.rspHdr, buf, nil) - if m.handleAction(res, phaseEncodeResponse) { + if m.handleAction(res, phaseEncodeResponse, f) { return } @@ -684,7 +695,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi for ; i >= 0; i-- { f := m.filters[i] res = f.EncodeHeaders(m.rspHdr, false) - if m.handleAction(res, phaseEncodeHeaders) { + if m.handleAction(res, phaseEncodeHeaders, f) { return } if m.encodeResponseNeeded { @@ -696,7 +707,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi for j := m.encodeIdx - 1; j > i; j-- { f := m.filters[j] res = f.EncodeData(buf, endStream) - if m.handleAction(res, phaseEncodeData) { + if m.handleAction(res, phaseEncodeData, f) { return } } @@ -706,7 +717,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi m.encodeIdx = i f := m.filters[m.encodeIdx] res = f.EncodeResponse(m.rspHdr, buf, nil) - if m.handleAction(res, phaseEncodeResponse) { + if m.handleAction(res, phaseEncodeResponse, f) { return } i-- diff --git a/api/plugins/tests/integration/dataplane/bootstrap.go b/api/plugins/tests/integration/dataplane/bootstrap.go index a05a351c..54d742d9 100644 --- a/api/plugins/tests/integration/dataplane/bootstrap.go +++ b/api/plugins/tests/integration/dataplane/bootstrap.go @@ -32,6 +32,7 @@ type bootstrap struct { backendRoutes []map[string]interface{} consumers map[string]map[string]interface{} httpFilterGolang map[string]interface{} + accessLogFormat string } func Bootstrap() *bootstrap { @@ -75,6 +76,11 @@ func (b *bootstrap) SetFilterGolang(cfg map[string]interface{}) *bootstrap { return b } +func (b *bootstrap) SetAccessLogFormat(fmt string) *bootstrap { + b.accessLogFormat = fmt + return b +} + func (b *bootstrap) WriteTo(cfgFile *os.File) error { var root map[string]interface{} // check if the input is valid yaml @@ -84,14 +90,16 @@ func (b *bootstrap) WriteTo(cfgFile *os.File) error { } // TODO: simplify it with some third party lib if possible - vh := root["static_resources"].(map[string]interface{})["listeners"].([]interface{})[1].(map[string]interface{})["filter_chains"].([]interface{})[0].(map[string]interface{})["filters"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{})["route_config"].(map[string]interface{})["virtual_hosts"].([]interface{})[0].(map[string]interface{}) + backendHCM := root["static_resources"].(map[string]interface{})["listeners"].([]interface{})[1].(map[string]interface{})["filter_chains"].([]interface{})[0].(map[string]interface{})["filters"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{}) + vh := backendHCM["route_config"].(map[string]interface{})["virtual_hosts"].([]interface{})[0].(map[string]interface{}) routes := vh["routes"].([]interface{}) for _, backendRoute := range b.backendRoutes { routes = append(routes, backendRoute) } vh["routes"] = routes - httpFilters := root["static_resources"].(map[string]interface{})["listeners"].([]interface{})[0].(map[string]interface{})["filter_chains"].([]interface{})[0].(map[string]interface{})["filters"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{})["http_filters"].([]interface{}) + hcm := root["static_resources"].(map[string]interface{})["listeners"].([]interface{})[0].(map[string]interface{})["filter_chains"].([]interface{})[0].(map[string]interface{})["filters"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{}) + httpFilters := hcm["http_filters"].([]interface{}) var cf map[string]interface{} for _, hf := range httpFilters { @@ -118,6 +126,15 @@ func (b *bootstrap) WriteTo(cfgFile *os.File) error { } } + if b.accessLogFormat != "" { + accessLog := hcm["access_log"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{}) + accessLog["log_format"] = map[string]interface{}{ + "text_format_source": map[string]interface{}{ + "inline_string": b.accessLogFormat + "\n", + }, + } + } + res, err := yaml.Marshal(&root) if err != nil { return err diff --git a/api/plugins/tests/pkg/envoy/capi.go b/api/plugins/tests/pkg/envoy/capi.go index 21837519..c51664fd 100644 --- a/api/plugins/tests/pkg/envoy/capi.go +++ b/api/plugins/tests/pkg/envoy/capi.go @@ -303,7 +303,7 @@ func (i *DynamicMetadata) Get(filterName string) map[string]interface{} { func (i *DynamicMetadata) Set(filterName string, key string, value interface{}) { dm, ok := i.store[filterName] if !ok { - dm := map[string]interface{}{} + dm = map[string]interface{}{} i.store[filterName] = dm } @@ -360,6 +360,9 @@ func (i *StreamInfo) AttemptCount() uint32 { } func (i *StreamInfo) DynamicMetadata() api.DynamicMetadata { + if i.dynamicMetadata == nil { + i.dynamicMetadata = NewDynamicMetadata(map[string]map[string]interface{}{}) + } return i.dynamicMetadata } @@ -388,6 +391,9 @@ func (i *StreamInfo) UpstreamClusterName() (string, bool) { } func (i *StreamInfo) FilterState() api.FilterState { + if i.filterState == nil { + i.filterState = NewFilterState(map[string]string{}) + } return i.filterState } diff --git a/api/tests/integration/filtermanager_test.go b/api/tests/integration/filtermanager_test.go index 6fb81d96..75d7fb92 100644 --- a/api/tests/integration/filtermanager_test.go +++ b/api/tests/integration/filtermanager_test.go @@ -912,9 +912,13 @@ func TestFilterManagerIgnoreUnknownFields(t *testing.T) { func TestFilterManagerPluginReturnsErrorInParse(t *testing.T) { dp, err := dataplane.StartDataPlane(t, &dataplane.Option{ + Bootstrap: dataplane.Bootstrap().SetAccessLogFormat( + `access_log: %RESPONSE_CODE% plugin: %DYNAMIC_METADATA(htnn:local_reply_plugin_name)%`, + ), NoErrorLogCheck: true, ExpectLogPattern: []string{ `error in plugin buffer: `, + `access_log: 500 plugin: buffer`, }, }) if err != nil { @@ -934,9 +938,13 @@ func TestFilterManagerPluginReturnsErrorInParse(t *testing.T) { func TestFilterManagerPluginReturnsErrorInInit(t *testing.T) { dp, err := dataplane.StartDataPlane(t, &dataplane.Option{ + Bootstrap: dataplane.Bootstrap().SetAccessLogFormat( + `access_log: %RESPONSE_CODE% plugin: %DYNAMIC_METADATA(htnn:local_reply_plugin_name)%`, + ), NoErrorLogCheck: true, ExpectLogPattern: []string{ `error in plugin bad: ouch`, + `access_log: 500 plugin: bad`, }, }) if err != nil { @@ -1069,3 +1077,28 @@ func TestFilterManagerPluginIncorrectMethodDefinition(t *testing.T) { require.Nil(t, err) assert.Equal(t, 200, resp.StatusCode, resp) } + +func TestFilterManagerRecordLocalReplyPlugin(t *testing.T) { + dp, err := dataplane.StartDataPlane(t, &dataplane.Option{ + Bootstrap: dataplane.Bootstrap().SetAccessLogFormat( + `access_log: %RESPONSE_CODE% plugin: %DYNAMIC_METADATA(htnn:local_reply_plugin_name)%`, + ), + ExpectLogPattern: []string{ + `access_log: 206 plugin: localReply`, + }, + }) + if err != nil { + t.Fatalf("failed to start data plane: %v", err) + return + } + defer dp.Stop() + + config := controlplane.NewSinglePluinConfig("localReply", map[string]interface{}{ + "decode": true, + "headers": true, + }) + controlPlane.UseGoPluginConfig(t, config, dp) + resp, err := dp.Get("/echo", nil) + require.Nil(t, err) + assert.Equal(t, 206, resp.StatusCode, resp) +}