Skip to content

Commit

Permalink
feat: record plugin name which sends local reply (#674)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Aug 8, 2024
1 parent 78c9bf5 commit abac363
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 21 deletions.
47 changes: 29 additions & 18 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,16 @@ func FilterManagerFactory(c interface{}) capi.StreamFilterFactory {
}
}

func (m *filterManager) handleAction(res api.ResultAction, phase phase) (needReturn bool) {
func (m *filterManager) recordLocalReplyPluginName(name string) {
// We can get the plugin name which returns the local response from the dynamic metadata.
// For example, use %DYNAMIC_METADATA(htnn:local_reply_plugin_name)% in the access log format.
m.callbacks.StreamInfo().DynamicMetadata().Set("htnn", "local_reply_plugin_name", name)
// For now, we don't record when the local reply is caused by panic. As we can't always get
// the name of plugin which is the root of the panic correctly. For example, consider a plugin kicks
// off a goroutine and the goroutine panics.
}

func (m *filterManager) handleAction(res api.ResultAction, phase phase, filter *model.FilterWrapper) (needReturn bool) {
if res == api.Continue {
return false
}
Expand All @@ -244,6 +253,7 @@ func (m *filterManager) handleAction(res api.ResultAction, phase phase) (needRet

switch v := res.(type) {
case *api.LocalResponse:
m.recordLocalReplyPluginName(filter.Name)
m.localReply(v)
return true
default:
Expand Down Expand Up @@ -328,6 +338,7 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b
m.config.InitOnce()
if m.config.initFailed {
api.LogErrorf("error in plugin %s: %s", m.config.initFailedPluginName, m.config.initFailure)
m.recordLocalReplyPluginName(m.config.initFailedPluginName)
m.localReply(&api.LocalResponse{
Code: 500,
})
Expand All @@ -346,7 +357,7 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b
f := m.filters[i]
// We don't support DecodeRequest for now
res = f.DecodeHeaders(m.reqHdr, endStream)
if m.handleAction(res, phaseDecodeHeaders) {
if m.handleAction(res, phaseDecodeHeaders, f) {
return
}
}
Expand Down Expand Up @@ -463,7 +474,7 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b
for i := m.config.consumerFiltersEndAt; i < len(m.filters); i++ {
f := m.filters[i]
res = f.DecodeHeaders(m.reqHdr, endStream)
if m.handleAction(res, phaseDecodeHeaders) {
if m.handleAction(res, phaseDecodeHeaders, f) {
return
}

Expand All @@ -479,7 +490,7 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b

// no body
res = f.DecodeRequest(m.reqHdr, nil, nil)
if m.handleAction(res, phaseDecodeRequest) {
if m.handleAction(res, phaseDecodeRequest, f) {
return
}
}
Expand Down Expand Up @@ -522,7 +533,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi
for i := 0; i < n; i++ {
f := m.filters[i]
res = f.DecodeData(buf, endStream)
if m.handleAction(res, phaseDecodeData) {
if m.handleAction(res, phaseDecodeData, f) {
return
}
}
Expand All @@ -532,14 +543,14 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi
for i := 0; i < m.decodeIdx; i++ {
f := m.filters[i]
res = f.DecodeData(buf, endStream)
if m.handleAction(res, phaseDecodeData) {
if m.handleAction(res, phaseDecodeData, f) {
return
}
}

f := m.filters[m.decodeIdx]
res = f.DecodeRequest(m.reqHdr, buf, nil)
if m.handleAction(res, phaseDecodeRequest) {
if m.handleAction(res, phaseDecodeRequest, f) {
return
}

Expand All @@ -550,7 +561,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi
// The endStream in DecodeHeaders indicates whether there is a body.
// The body always exists when we hit this path.
res = f.DecodeHeaders(m.reqHdr, false)
if m.handleAction(res, phaseDecodeHeaders) {
if m.handleAction(res, phaseDecodeHeaders, f) {
return
}
if m.decodeRequestNeeded {
Expand All @@ -564,7 +575,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi
for j := m.decodeIdx + 1; j < i; j++ {
f := m.filters[j]
res = f.DecodeData(buf, endStream)
if m.handleAction(res, phaseDecodeData) {
if m.handleAction(res, phaseDecodeData, f) {
return
}
}
Expand All @@ -574,7 +585,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi
m.decodeIdx = i
f := m.filters[m.decodeIdx]
res = f.DecodeRequest(m.reqHdr, buf, nil)
if m.handleAction(res, phaseDecodeRequest) {
if m.handleAction(res, phaseDecodeRequest, f) {
return
}
i++
Expand Down Expand Up @@ -614,7 +625,7 @@ func (m *filterManager) EncodeHeaders(headers capi.ResponseHeaderMap, endStream
for i := n - 1; i >= 0; i-- {
f := m.filters[i]
res = f.EncodeHeaders(headers, endStream)
if m.handleAction(res, phaseEncodeHeaders) {
if m.handleAction(res, phaseEncodeHeaders, f) {
return
}

Expand All @@ -628,7 +639,7 @@ func (m *filterManager) EncodeHeaders(headers capi.ResponseHeaderMap, endStream

// no body
res = f.EncodeResponse(headers, nil, nil)
if m.handleAction(res, phaseEncodeResponse) {
if m.handleAction(res, phaseEncodeResponse, f) {
return
}
}
Expand Down Expand Up @@ -658,7 +669,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi
for i := n - 1; i >= 0; i-- {
f := m.filters[i]
res = f.EncodeData(buf, endStream)
if m.handleAction(res, phaseEncodeData) {
if m.handleAction(res, phaseEncodeData, f) {
return
}
}
Expand All @@ -668,14 +679,14 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi
for i := n - 1; i > m.encodeIdx; i-- {
f := m.filters[i]
res = f.EncodeData(buf, endStream)
if m.handleAction(res, phaseEncodeData) {
if m.handleAction(res, phaseEncodeData, f) {
return
}
}

f := m.filters[m.encodeIdx]
res = f.EncodeResponse(m.rspHdr, buf, nil)
if m.handleAction(res, phaseEncodeResponse) {
if m.handleAction(res, phaseEncodeResponse, f) {
return
}

Expand All @@ -684,7 +695,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi
for ; i >= 0; i-- {
f := m.filters[i]
res = f.EncodeHeaders(m.rspHdr, false)
if m.handleAction(res, phaseEncodeHeaders) {
if m.handleAction(res, phaseEncodeHeaders, f) {
return
}
if m.encodeResponseNeeded {
Expand All @@ -696,7 +707,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi
for j := m.encodeIdx - 1; j > i; j-- {
f := m.filters[j]
res = f.EncodeData(buf, endStream)
if m.handleAction(res, phaseEncodeData) {
if m.handleAction(res, phaseEncodeData, f) {
return
}
}
Expand All @@ -706,7 +717,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi
m.encodeIdx = i
f := m.filters[m.encodeIdx]
res = f.EncodeResponse(m.rspHdr, buf, nil)
if m.handleAction(res, phaseEncodeResponse) {
if m.handleAction(res, phaseEncodeResponse, f) {
return
}
i--
Expand Down
21 changes: 19 additions & 2 deletions api/plugins/tests/integration/dataplane/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type bootstrap struct {
backendRoutes []map[string]interface{}
consumers map[string]map[string]interface{}
httpFilterGolang map[string]interface{}
accessLogFormat string
}

func Bootstrap() *bootstrap {
Expand Down Expand Up @@ -75,6 +76,11 @@ func (b *bootstrap) SetFilterGolang(cfg map[string]interface{}) *bootstrap {
return b
}

func (b *bootstrap) SetAccessLogFormat(fmt string) *bootstrap {
b.accessLogFormat = fmt
return b
}

func (b *bootstrap) WriteTo(cfgFile *os.File) error {
var root map[string]interface{}
// check if the input is valid yaml
Expand All @@ -84,14 +90,16 @@ func (b *bootstrap) WriteTo(cfgFile *os.File) error {
}

// TODO: simplify it with some third party lib if possible
vh := root["static_resources"].(map[string]interface{})["listeners"].([]interface{})[1].(map[string]interface{})["filter_chains"].([]interface{})[0].(map[string]interface{})["filters"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{})["route_config"].(map[string]interface{})["virtual_hosts"].([]interface{})[0].(map[string]interface{})
backendHCM := root["static_resources"].(map[string]interface{})["listeners"].([]interface{})[1].(map[string]interface{})["filter_chains"].([]interface{})[0].(map[string]interface{})["filters"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{})
vh := backendHCM["route_config"].(map[string]interface{})["virtual_hosts"].([]interface{})[0].(map[string]interface{})
routes := vh["routes"].([]interface{})
for _, backendRoute := range b.backendRoutes {
routes = append(routes, backendRoute)
}
vh["routes"] = routes

httpFilters := root["static_resources"].(map[string]interface{})["listeners"].([]interface{})[0].(map[string]interface{})["filter_chains"].([]interface{})[0].(map[string]interface{})["filters"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{})["http_filters"].([]interface{})
hcm := root["static_resources"].(map[string]interface{})["listeners"].([]interface{})[0].(map[string]interface{})["filter_chains"].([]interface{})[0].(map[string]interface{})["filters"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{})
httpFilters := hcm["http_filters"].([]interface{})

var cf map[string]interface{}
for _, hf := range httpFilters {
Expand All @@ -118,6 +126,15 @@ func (b *bootstrap) WriteTo(cfgFile *os.File) error {
}
}

if b.accessLogFormat != "" {
accessLog := hcm["access_log"].([]interface{})[0].(map[string]interface{})["typed_config"].(map[string]interface{})
accessLog["log_format"] = map[string]interface{}{
"text_format_source": map[string]interface{}{
"inline_string": b.accessLogFormat + "\n",
},
}
}

res, err := yaml.Marshal(&root)
if err != nil {
return err
Expand Down
8 changes: 7 additions & 1 deletion api/plugins/tests/pkg/envoy/capi.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (i *DynamicMetadata) Get(filterName string) map[string]interface{} {
func (i *DynamicMetadata) Set(filterName string, key string, value interface{}) {
dm, ok := i.store[filterName]
if !ok {
dm := map[string]interface{}{}
dm = map[string]interface{}{}
i.store[filterName] = dm
}

Expand Down Expand Up @@ -360,6 +360,9 @@ func (i *StreamInfo) AttemptCount() uint32 {
}

func (i *StreamInfo) DynamicMetadata() api.DynamicMetadata {
if i.dynamicMetadata == nil {
i.dynamicMetadata = NewDynamicMetadata(map[string]map[string]interface{}{})
}
return i.dynamicMetadata
}

Expand Down Expand Up @@ -388,6 +391,9 @@ func (i *StreamInfo) UpstreamClusterName() (string, bool) {
}

func (i *StreamInfo) FilterState() api.FilterState {
if i.filterState == nil {
i.filterState = NewFilterState(map[string]string{})
}
return i.filterState
}

Expand Down
33 changes: 33 additions & 0 deletions api/tests/integration/filtermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,9 +912,13 @@ func TestFilterManagerIgnoreUnknownFields(t *testing.T) {

func TestFilterManagerPluginReturnsErrorInParse(t *testing.T) {
dp, err := dataplane.StartDataPlane(t, &dataplane.Option{
Bootstrap: dataplane.Bootstrap().SetAccessLogFormat(
`access_log: %RESPONSE_CODE% plugin: %DYNAMIC_METADATA(htnn:local_reply_plugin_name)%`,
),
NoErrorLogCheck: true,
ExpectLogPattern: []string{
`error in plugin buffer: `,
`access_log: 500 plugin: buffer`,
},
})
if err != nil {
Expand All @@ -934,9 +938,13 @@ func TestFilterManagerPluginReturnsErrorInParse(t *testing.T) {

func TestFilterManagerPluginReturnsErrorInInit(t *testing.T) {
dp, err := dataplane.StartDataPlane(t, &dataplane.Option{
Bootstrap: dataplane.Bootstrap().SetAccessLogFormat(
`access_log: %RESPONSE_CODE% plugin: %DYNAMIC_METADATA(htnn:local_reply_plugin_name)%`,
),
NoErrorLogCheck: true,
ExpectLogPattern: []string{
`error in plugin bad: ouch`,
`access_log: 500 plugin: bad`,
},
})
if err != nil {
Expand Down Expand Up @@ -1069,3 +1077,28 @@ func TestFilterManagerPluginIncorrectMethodDefinition(t *testing.T) {
require.Nil(t, err)
assert.Equal(t, 200, resp.StatusCode, resp)
}

func TestFilterManagerRecordLocalReplyPlugin(t *testing.T) {
dp, err := dataplane.StartDataPlane(t, &dataplane.Option{
Bootstrap: dataplane.Bootstrap().SetAccessLogFormat(
`access_log: %RESPONSE_CODE% plugin: %DYNAMIC_METADATA(htnn:local_reply_plugin_name)%`,
),
ExpectLogPattern: []string{
`access_log: 206 plugin: localReply`,
},
})
if err != nil {
t.Fatalf("failed to start data plane: %v", err)
return
}
defer dp.Stop()

config := controlplane.NewSinglePluinConfig("localReply", map[string]interface{}{
"decode": true,
"headers": true,
})
controlPlane.UseGoPluginConfig(t, config, dp)
resp, err := dp.Get("/echo", nil)
require.Nil(t, err)
assert.Equal(t, 206, resp.StatusCode, resp)
}

0 comments on commit abac363

Please sign in to comment.