Skip to content

Commit

Permalink
filtermanager: init nonblockingly
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander committed Apr 3, 2024
1 parent 4153110 commit 050d98b
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 26 deletions.
2 changes: 1 addition & 1 deletion api/internal/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *Consumer) InitConfigs() error {
return fmt.Errorf("plugin %s not found", name)
}

conf, err := p.ConfigParser.Parse(data.Config, nil)
conf, err := p.ConfigParser.Parse(data.Config)
if err != nil {
return fmt.Errorf("%w during parsing plugin %s in consumer", err, name)
}
Expand Down
24 changes: 21 additions & 3 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type filterManagerConfig struct {

parsed []*model.ParsedFilterConfig
pool *sync.Pool

initOnceNonblockingly func()
}

func initFilterManagerConfig(namespace string) *filterManagerConfig {
Expand Down Expand Up @@ -121,8 +123,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
for _, proto := range plugins {
name := proto.Name
if plugin := pkgPlugins.LoadHttpFilterFactoryAndParser(name); plugin != nil {
// For now, we have nothing to provide as config callbacks
config, err := plugin.ConfigParser.Parse(proto.Config, nil)
config, err := plugin.ConfigParser.Parse(proto.Config)
if err != nil {
api.LogErrorf("%s during parsing plugin %s in filtermanager", err, name)

Expand All @@ -133,7 +134,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
// indicates something is wrong.
conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{
Name: proto.Name,
Factory: InternalErrorFactory,
Factory: NewInternalErrorFactory(proto.Name, err),

Check warning on line 137 in api/pkg/filtermanager/filtermanager.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/filtermanager/filtermanager.go#L137

Added line #L137 was not covered by tests
})
} else {
conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{
Expand All @@ -155,6 +156,19 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
}
conf.consumerFiltersEndAt = consumerFiltersEndAt

conf.initOnceNonblockingly = sync.OnceFunc(func() {
for i, fc := range conf.parsed {
config := fc.ParsedConfig
if initer, ok := config.(pkgPlugins.Initer); ok {
// For now, we have nothing to provide as config callbacks
err := initer.Init(nil)
if err != nil {
conf.parsed[i].Factory = NewInternalErrorFactory(fc.Name, err)

Check warning on line 166 in api/pkg/filtermanager/filtermanager.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/filtermanager/filtermanager.go#L166

Added line #L166 was not covered by tests
}
}
}
})

return conf, nil
}

Expand Down Expand Up @@ -400,11 +414,15 @@ func FilterManagerFactory(c interface{}) capi.StreamFilterFactory {
}
}()

conf.initOnceNonblockingly()

fm := conf.pool.Get().(*filterManager)
fm.callbacks.FilterCallbackHandler = cb

canSkipMethod := fm.canSkipMethod
if canSkipMethod == nil {
// the `canSkipMethod` can't be initialized in initOnceNonblockingly,
// as it depends on the filter which is created per request.
canSkipMethod = newSkipMethodsMap()
}

Expand Down
13 changes: 11 additions & 2 deletions api/pkg/filtermanager/internal_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,25 @@ import (

type internalErrorFilter struct {
api.PassThroughFilter

plugin string
err error
}

func (f *internalErrorFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction {
api.LogErrorf("error in plugin %s: %s", f.plugin, f.err)

Check warning on line 31 in api/pkg/filtermanager/internal_error.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/filtermanager/internal_error.go#L31

Added line #L31 was not covered by tests
return &api.LocalResponse{
Code: 500,
}
}

func InternalErrorFactory(interface{}, api.FilterCallbackHandler) api.Filter {
return &internalErrorFilter{}
func NewInternalErrorFactory(plugin string, err error) api.FilterFactory {
return func(interface{}, api.FilterCallbackHandler) api.Filter {
return &internalErrorFilter{
plugin: plugin,
err: err,

Check warning on line 41 in api/pkg/filtermanager/internal_error.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/filtermanager/internal_error.go#L37-L41

Added lines #L37 - L41 were not covered by tests
}
}
}

type internalErrorFilterForCAPI struct {
Expand Down
10 changes: 2 additions & 8 deletions api/pkg/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (

// Here we introduce extra struct to avoid cyclic import between pkg/filtermanager and pkg/plugins
type FilterConfigParser interface {
Parse(input interface{}, callbacks api.ConfigCallbackHandler) (interface{}, error)
Parse(input interface{}) (interface{}, error)
Merge(parentConfig interface{}, childConfig interface{}) interface{}
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func NewPluginConfigParser(parser GoPlugin) *PluginConfigParser {
}
}

func (cp *PluginConfigParser) Parse(any interface{}, callbacks api.ConfigCallbackHandler) (res interface{}, err error) {
func (cp *PluginConfigParser) Parse(any interface{}) (res interface{}, err error) {
defer func() {
if p := recover(); p != nil {
api.LogErrorf("panic: %v\n%s", p, debug.Stack())
Expand All @@ -166,12 +166,6 @@ func (cp *PluginConfigParser) Parse(any interface{}, callbacks api.ConfigCallbac
return nil, err
}

if initer, ok := conf.(Initer); ok {
err = initer.Init(callbacks)
if err != nil {
return nil, err
}
}
return conf, nil
}

Expand Down
2 changes: 1 addition & 1 deletion api/pkg/plugins/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestParse(t *testing.T) {
defer cln()
}

res, err := cp.Parse(c.input, nil)
res, err := cp.Parse(c.input)
if c.wantErr {
assert.NotNil(t, err)
} else {
Expand Down
31 changes: 21 additions & 10 deletions api/plugins/tests/integration/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/plugins/tests/integration/config.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/plugins/tests/integration/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ message Config {
message BadPluginConfig {
bool panic_in_factory = 1;
bool panic_in_parse = 2;
bool error_in_init = 3;
}
34 changes: 34 additions & 0 deletions api/plugins/tests/integration/filtermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,9 @@ func TestFilterManagerIgnoreUnknownFields(t *testing.T) {
func TestFilterManagerPluginReturnsErrorInParse(t *testing.T) {
dp, err := data_plane.StartDataPlane(t, &data_plane.Option{
NoErrorLogCheck: true,
ExpectLogPattern: []string{
`error in plugin buffer: `,
},
})
if err != nil {
t.Fatalf("failed to start data plane: %v", err)
Expand All @@ -929,6 +932,37 @@ func TestFilterManagerPluginReturnsErrorInParse(t *testing.T) {
assert.Equal(t, 500, resp.StatusCode, resp)
}

func TestFilterManagerPluginReturnsErrorInInit(t *testing.T) {
dp, err := data_plane.StartDataPlane(t, &data_plane.Option{
NoErrorLogCheck: true,
ExpectLogPattern: []string{
`error in plugin bad: ouch`,
},
})
if err != nil {
t.Fatalf("failed to start data plane: %v", err)
return
}
defer dp.Stop()

config := &filtermanager.FilterManagerConfig{
Plugins: []*model.FilterConfig{
{
Name: "bad",
Config: &badPluginConfig{
BadPluginConfig: BadPluginConfig{
ErrorInInit: true,
},
},
},
},
}
controlPlane.UseGoPluginConfig(t, config, dp)
resp, err := dp.Get("/echo", nil)
require.Nil(t, err)
assert.Equal(t, 500, resp.StatusCode, resp)
}

func TestFilterManagerPluginPanic(t *testing.T) {
dp, err := data_plane.StartDataPlane(t, &data_plane.Option{
NoErrorLogCheck: true,
Expand Down
10 changes: 9 additions & 1 deletion api/plugins/tests/integration/test_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package integration

import (
"errors"
"net/http"
"runtime/debug"
"strings"
Expand Down Expand Up @@ -262,13 +263,20 @@ type badPluginConfig struct {
BadPluginConfig
}

func (c *badPluginConfig) Init(cb api.ConfigCallbackHandler) error {
func (c *badPluginConfig) Validate() error {
if c.PanicInParse {
panic("panic in parse")
}
return nil
}

func (c *badPluginConfig) Init(cb api.ConfigCallbackHandler) error {
if c.ErrorInInit {
return errors.New("ouch")
}
return nil
}

func (p *badPlugin) Config() api.PluginConfig {
return &badPluginConfig{}
}
Expand Down

0 comments on commit 050d98b

Please sign in to comment.