From b583d4375acd0ad6da652e8c80850f0a0f3b80a3 Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Thu, 20 Feb 2025 14:09:59 -0600 Subject: [PATCH] search: cap groupsize workers by NumCPU / GOMAXPROCS This should result in more stable (less of a long tail) in search performance. --- cmd/server/config.go | 2 ++ cmd/server/download_test.go | 3 ++- cmd/server/main.go | 2 +- configs/config.default.yml | 8 +++++++ internal/search/api_search_test.go | 3 ++- internal/search/config.go | 35 ++++++++++++++++++++++++++++++ internal/search/service.go | 10 ++++----- internal/search/service_test.go | 3 ++- 8 files changed, 56 insertions(+), 10 deletions(-) create mode 100644 internal/search/config.go diff --git a/cmd/server/config.go b/cmd/server/config.go index 1498789c..f45df6b4 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -8,6 +8,7 @@ import ( watchman "github.com/moov-io/watchman" "github.com/moov-io/watchman/internal/download" "github.com/moov-io/watchman/internal/postalpool" + "github.com/moov-io/watchman/internal/search" "github.com/moov-io/base/config" "github.com/moov-io/base/log" @@ -23,6 +24,7 @@ type Config struct { Telemetry telemetry.Config Download download.Config + Search search.Config PostalPool postalpool.Config } diff --git a/cmd/server/download_test.go b/cmd/server/download_test.go index c8511bfd..117b936c 100644 --- a/cmd/server/download_test.go +++ b/cmd/server/download_test.go @@ -41,7 +41,8 @@ func TestDownloader_setupPeriodicRefreshing(t *testing.T) { dl, err := download.NewDownloader(logger, conf) require.NoError(t, err) - searchService, err := search.NewService(logger) + searchConfig := search.DefaultConfig() + searchService, err := search.NewService(logger, searchConfig) require.NoError(t, err) go func() { diff --git a/cmd/server/main.go b/cmd/server/main.go index fb182ea9..1239491a 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -69,7 +69,7 @@ func main() { errs := make(chan error, 1) // Setup search service and endpoints 
-	searchService, err := search.NewService(logger)
+	searchService, err := search.NewService(logger, config.Search)
 	if err != nil {
 		logger.Fatal().LogErrorf("problem setting up search service: %v", err)
 		os.Exit(1)
diff --git a/configs/config.default.yml b/configs/config.default.yml
index f0149cbd..3e213224 100644
--- a/configs/config.default.yml
+++ b/configs/config.default.yml
@@ -13,6 +13,14 @@ Watchman:
       - "us_csl"
       - "us_ofac"
 
+  Search:
+    # Tune these settings based on your available resources (CPUs, etc).
+    # Usually a multiple (e.g. 2x, 4x) of GOMAXPROCS is optimal.
+    SearchGroups:
+      Default: 10
+      Min: 1
+      Max: 25
+
   PostalPool:
     Enabled: false
     Instances: 2
diff --git a/internal/search/api_search_test.go b/internal/search/api_search_test.go
index 6a13d820..2aad62c0 100644
--- a/internal/search/api_search_test.go
+++ b/internal/search/api_search_test.go
@@ -45,7 +45,8 @@ func testAPI(tb testing.TB) testSetup {
 
 	logger := log.NewTestLogger()
 
-	service, err := NewService(logger)
+	searchConfig := DefaultConfig()
+	service, err := NewService(logger, searchConfig)
 	require.NoError(tb, err)
 
 	dl := ofactest.GetDownloader(tb)
diff --git a/internal/search/config.go b/internal/search/config.go
new file mode 100644
index 00000000..931d4340
--- /dev/null
+++ b/internal/search/config.go
@@ -0,0 +1,38 @@
+package search
+
+import (
+	"runtime"
+)
+
+// Config holds the tunable settings of the search service.
+type Config struct {
+	SearchGroups SearchGroups
+}
+
+// SearchGroups carries the default, minimum, and maximum group sizes that
+// NewService passes, in that order, to groupsize.NewConcurrencyManager.
+type SearchGroups struct {
+	Default int
+	Min     int
+	Max     int
+}
+
+// DefaultConfig returns a Config whose group sizes scale with the Go
+// runtime's parallelism limit: Min is that limit, Default is twice it, and
+// Max is four times it.
+//
+// runtime.GOMAXPROCS(0) reports the current limit without changing it. The
+// runtime initializes it from the GOMAXPROCS environment variable when set,
+// otherwise from the CPUs available to the process, so the variable does not
+// need to be parsed here.
+func DefaultConfig() Config {
+	cpus := runtime.GOMAXPROCS(0)
+
+	return Config{
+		SearchGroups: SearchGroups{
+			Default: cpus * 2,
+			Min:     cpus,
+			Max:     cpus * 4,
+		},
+	}
+}
diff --git a/internal/search/service.go b/internal/search/service.go
index ac7f5ef8..925cfb76 100644
--- a/internal/search/service.go
+++ b/internal/search/service.go
@@ -32,19 +32,21 @@ type Service 
interface { Search(ctx context.Context, query search.Entity[search.Value], opts SearchOpts) ([]search.SearchedEntity[search.Value], error) } -func NewService(logger log.Logger) (Service, error) { - cm, err := groupsize.NewConcurrencyManager(defaultGroupSize, 1, 100) +func NewService(logger log.Logger, config Config) (Service, error) { + cm, err := groupsize.NewConcurrencyManager(config.SearchGroups.Default, config.SearchGroups.Min, config.SearchGroups.Max) if err != nil { return nil, fmt.Errorf("creating search service: %w", err) } return &service{ logger: logger, + config: config, cm: cm, }, nil } type service struct { logger log.Logger + config Config latestStats download.Stats sync.RWMutex // protects latestStats (which has entities and list hashes) @@ -185,10 +187,6 @@ func (s *service) performSearch(ctx context.Context, query search.Entity[search. return out, nil } -const ( - defaultGroupSize = 20 // rough estimate from local testing -) - func getGroupSize(cm *groupsize.ConcurrencyManager) (int, error) { // After local benchmarking this is a tradeoff between the fastest / most efficient group size picking // and offering configurability to users. diff --git a/internal/search/service_test.go b/internal/search/service_test.go index c59a625d..454b6b43 100644 --- a/internal/search/service_test.go +++ b/internal/search/service_test.go @@ -74,7 +74,8 @@ func testService(tb testing.TB) Service { logger := log.NewTestLogger() - svc, err := NewService(logger) + searchConfig := DefaultConfig() + svc, err := NewService(logger, searchConfig) require.NoError(tb, err) svc.UpdateEntities(download.Stats{