From e9f1ff439b25425b9435eee1605383da9f879773 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann <mhoffm@posteo.de> Date: Thu, 7 Nov 2024 14:46:34 +0100 Subject: [PATCH] query, rule: make endpoint discovery dynamically reloadable * Removed previously deprecated and hidden flags to configure endpoints (--rule, --target, ...) * Removed --store.sd-files and --store.sd-interval flags * Added new flags --endpoint.sd-config, --endpoint.sd-config-reload-interval to configure a dynamic SD file * Moved endpoint set construction into cmd/thanos/endpointset.go for a little cleanup The new config makes it possible to also set "strict" and "group" flags on the endpoint instead of only their addresses, making it possible to have file based service discovery for endpoint groups too. Signed-off-by: Michael Hoffmann <mhoffm@posteo.de> --- cmd/thanos/config.go | 38 +++ cmd/thanos/endpointset.go | 298 ++++++++++++++++++++++ cmd/thanos/query.go | 454 +++++---------------------------- cmd/thanos/rule.go | 43 +--- docs/components/query.md | 48 +--- pkg/discovery/dns/grpc.go | 2 +- pkg/query/endpointset.go | 28 +- pkg/query/endpointset_test.go | 30 +-- test/e2e/e2ethanos/services.go | 27 +- 9 files changed, 455 insertions(+), 513 deletions(-) create mode 100644 cmd/thanos/endpointset.go diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index f72d19fd79f..f16f8857379 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -14,11 +14,17 @@ import ( "github.com/KimMachineGun/automemlimit/memlimit" extflag "github.com/efficientgo/tools/extkingpin" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "google.golang.org/grpc" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/extgrpc/snappy" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/shipper" ) 
@@ -58,6 +64,38 @@ func (gc *grpcConfig) registerFlag(cmd extkingpin.FlagClause) *grpcConfig { return gc } +type grpcClientConfig struct { + secure bool + skipVerify bool + cert, key, caCert string + serverName string + compression string +} + +func (gc *grpcClientConfig) registerFlag(cmd extkingpin.FlagClause) *grpcClientConfig { + cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").BoolVar(&gc.secure) + cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").BoolVar(&gc.skipVerify) + cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").StringVar(&gc.cert) + cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").StringVar(&gc.key) + cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").StringVar(&gc.caCert) + cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&gc.serverName) + compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ") + cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. 
Must be one of: "+compressionOptions).Default(compressionNone).EnumVar(&gc.compression, snappy.Name, compressionNone) + + return gc +} + +func (gc *grpcClientConfig) dialOptions(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer) ([]grpc.DialOption, error) { + dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, gc.secure, gc.skipVerify, gc.cert, gc.key, gc.caCert, gc.serverName) + if err != nil { + return nil, errors.Wrapf(err, "building gRPC client") + } + if gc.compression != compressionNone { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gc.compression))) + } + return dialOpts, nil +} + type httpConfig struct { bindAddress string tlsConfig string diff --git a/cmd/thanos/endpointset.go b/cmd/thanos/endpointset.go new file mode 100644 index 00000000000..f17dc106b8a --- /dev/null +++ b/cmd/thanos/endpointset.go @@ -0,0 +1,298 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package main + +import ( + "context" + "fmt" + "sync" + "time" + + extflag "github.com/efficientgo/tools/extkingpin" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "google.golang.org/grpc" + "gopkg.in/yaml.v3" + + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/errors" + "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/extkingpin" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/runutil" +) + +type EndpointSpec struct { + Strict bool + Group bool + Address string +} + +type EndpointConfig struct { + Endpoints []EndpointSpec +} + +type endpointConfigProvider struct { + mu sync.Mutex + cfg EndpointConfig + + // statically defined endpoints from flags for backwards compatibility + endpoints []string + endpointGroups 
[]string + strictEndpoints []string + strictEndpointGroups []string +} + +func (er *endpointConfigProvider) config() EndpointConfig { + er.mu.Lock() + defer er.mu.Unlock() + + res := EndpointConfig{Endpoints: make([]EndpointSpec, len(er.cfg.Endpoints))} + copy(res.Endpoints, er.cfg.Endpoints) + return res +} + +func (er *endpointConfigProvider) parse(configFile *extflag.PathOrContent) (EndpointConfig, error) { + content, err := configFile.Content() + if err != nil { + return EndpointConfig{}, errors.Wrapf(err, "unable to load config content: %s", configFile.Path()) + } + var cfg EndpointConfig + if err := yaml.Unmarshal(content, &cfg); err != nil { + return EndpointConfig{}, errors.Wrapf(err, "unable to unmarshal config content: %s", configFile.Path()) + } + return cfg, nil +} + +func (er *endpointConfigProvider) addStaticEndpoints(cfg *EndpointConfig) { + for _, e := range er.endpoints { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + }) + } + for _, e := range er.endpointGroups { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Group: true, + }) + } + for _, e := range er.strictEndpoints { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Strict: true, + }) + } + for _, e := range er.strictEndpointGroups { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Group: true, + Strict: true, + }) + } +} + +func validateEndpointConfig(cfg EndpointConfig) error { + for _, ecfg := range cfg.Endpoints { + if dns.IsDynamicNode(ecfg.Address) && ecfg.Strict { + return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode.", ecfg.Address) + } + if dns.IsDynamicNode(ecfg.Address) && ecfg.Group { + return errors.Newf("%s is a dynamically specified endpoint i.e. 
it uses SD and that is not permitted under group mode.", ecfg.Address) + } + } + return nil +} + +func newEndpointConfigProvider( + g *run.Group, + logger log.Logger, + configFile *extflag.PathOrContent, + configReloadInterval time.Duration, + endpoints []string, + endpointGroups []string, + strictEndpoints []string, + strictEndpointGroups []string, +) (*endpointConfigProvider, error) { + res := &endpointConfigProvider{ + endpoints: endpoints, + endpointGroups: endpointGroups, + strictEndpoints: strictEndpoints, + strictEndpointGroups: strictEndpointGroups, + } + + // only static endpoints + if configFile == nil { + cfg := EndpointConfig{} + res.addStaticEndpoints(&cfg) + if err := validateEndpointConfig(cfg); err != nil { + return nil, err + } + res.cfg = cfg + return res, nil + } + + // dynamically reload config file and merge with static endpoints + cfg, err := res.parse(configFile) + if err != nil { + return nil, errors.Wrapf(err, "unable to load config file") + } + res.cfg = cfg + + reloadCtx, reloadCancel := context.WithCancel(context.Background()) + g.Add(func() error { + return extkingpin.PathContentReloader(reloadCtx, configFile, logger, func() { + res.mu.Lock() + defer res.mu.Unlock() + + level.Info(logger).Log("msg", "reloading endpoint config") + cfg, err := res.parse(configFile) + if err != nil { + level.Error(logger).Log("msg", "failed to reload endpoint config", "err", err) + return + } + res.addStaticEndpoints(&cfg) + if err := validateEndpointConfig(cfg); err != nil { + level.Error(logger).Log("msg", "failed to validate endpoint config", "err", err) + return + } + res.cfg = cfg + }, configReloadInterval) + }, func(err error) { + reloadCancel() + }) + return res, nil +} + +func setupEndpointset( + g *run.Group, + reg prometheus.Registerer, + logger log.Logger, + configFile *extflag.PathOrContent, + configReloadInterval time.Duration, + endpoints []string, + endpointGroups []string, + strictEndpoints []string, + strictEndpointGroups []string, + 
dnsSDResolver string, + dnsSDInterval time.Duration, + unhealthyTimeout time.Duration, + endpointTimeout time.Duration, + dialOpts []grpc.DialOption, + queryConnMetricLabels ...string, +) (*query.EndpointSet, error) { + configProvider, err := newEndpointConfigProvider( + g, + logger, + configFile, + configReloadInterval, + endpoints, + endpointGroups, + strictEndpoints, + strictEndpointGroups, + ) + if err != nil { + return nil, errors.Wrapf(err, "unable to load config initially") + } + // Register resolver for the "thanos:///" scheme for endpoint-groups + dns.RegisterGRPCResolver( + logger, + dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg), + dns.ResolverType(dnsSDResolver), + ), + dnsSDInterval, + ) + dnsEndpointProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), + dns.ResolverType(dnsSDResolver), + ) + duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_query_duplicated_store_addresses_total", + Help: "The number of times a duplicated store addresses is detected from the different configs in query", + }) + + removeDuplicateEndpointSpecs := func(specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec { + set := make(map[string]*query.GRPCEndpointSpec) + for _, spec := range specs { + addr := spec.Addr() + if _, ok := set[addr]; ok { + level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) + duplicatedStores.Inc() + } + set[addr] = spec + } + deduplicated := make([]*query.GRPCEndpointSpec, 0, len(set)) + for _, value := range set { + deduplicated = append(deduplicated, value) + } + return deduplicated + } + + ctxDNSUpdate, cancelDNSUpdate := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(dnsSDInterval, ctxDNSUpdate.Done(), func() error { + ctxUpdateIter, cancelUpdateIter := context.WithTimeout(ctxDNSUpdate, dnsSDInterval) + defer cancelUpdateIter() + 
+ endpointConfig := configProvider.config() + + addresses := make([]string, 0, len(endpointConfig.Endpoints)) + for _, ecfg := range endpointConfig.Endpoints { + if addr := ecfg.Address; dns.IsDynamicNode(addr) { + addresses = append(addresses, addr) + } + } + if err := dnsEndpointProvider.Resolve(ctxUpdateIter, addresses, true); err != nil { + level.Error(logger).Log("msg", "failed to resolve addresses for endpoints", "err", err) + } + return nil + }) + }, func(error) { + cancelDNSUpdate() + }) + + endpointset := query.NewEndpointSet(time.Now, logger, reg, func() []*query.GRPCEndpointSpec { + endpointConfig := configProvider.config() + + specs := make([]*query.GRPCEndpointSpec, 0) + // add static nodes + for _, ecfg := range endpointConfig.Endpoints { + strict, group, addr := ecfg.Strict, ecfg.Group, ecfg.Address + if dns.IsDynamicNode(addr) { + continue + } + if group { + specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, append(dialOpts, extgrpc.EndpointGroupGRPCOpts()...)...)) + } else { + specs = append(specs, query.NewGRPCEndpointSpec(addr, strict, dialOpts...)) + } + + } + // add dynamic nodes + for _, addr := range dnsEndpointProvider.Addresses() { + specs = append(specs, query.NewGRPCEndpointSpec(addr, false, dialOpts...)) + } + return removeDuplicateEndpointSpecs(specs) + }, unhealthyTimeout, endpointTimeout, queryConnMetricLabels...) 
+ + ctxEndpointUpdate, cancelEndpointUpdate := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(5*time.Second, ctxEndpointUpdate.Done(), func() error { + ctxIter, cancelIter := context.WithTimeout(ctxEndpointUpdate, 5*time.Second) + defer cancelIter() + endpointset.Update(ctxIter) + return nil + }) + }, func(error) { + cancelEndpointUpdate() + }) + + return endpointset, nil +} diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 69ffb8ea329..679f9584683 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -4,7 +4,6 @@ package main import ( - "context" "fmt" "math" "net/http" @@ -12,7 +11,6 @@ import ( "time" extflag "github.com/efficientgo/tools/extkingpin" - "google.golang.org/grpc" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -21,11 +19,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/discovery/file" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/thanos-io/promql-engine/api" @@ -35,11 +29,8 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/discovery/cache" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/exemplars" - "github.com/thanos-io/thanos/pkg/extgrpc" - "github.com/thanos-io/thanos/pkg/extgrpc/snappy" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" @@ -51,7 +42,6 @@ import ( "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" 
"github.com/thanos-io/thanos/pkg/rules" - "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" @@ -86,14 +76,8 @@ func registerQuery(app *extkingpin.App) { var grpcServerConfig grpcConfig grpcServerConfig.registerFlag(cmd) - secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() - skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool() - cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String() - key := cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").String() - caCert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").String() - serverName := cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String() - compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ") - grpcCompression := cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. Must be one of: "+compressionOptions).Default(compressionNone).Enum(snappy.Name, compressionNone) + var grpcClientConfig grpcClientConfig + grpcClientConfig.registerFlag(cmd) webRoutePrefix := cmd.Flag("web.route-prefix", "Prefix for API and UI endpoints. This allows thanos UI to be served on a sub-path. Defaults to the value of --web.external-prefix. This option is analogous to --web.route-prefix of Prometheus.").Default("").String() webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the UI query web interface. 
Actual endpoints are still served on / or the web.route-prefix. This allows thanos UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() @@ -134,55 +118,6 @@ func registerQuery(app *extkingpin.App) { selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated)."). PlaceHolder("<name>=\"<value>\"").Strings() - endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups."). - PlaceHolder("<endpoint>")) - - endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "Experimental: DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components."). - PlaceHolder("<endpoint-group>")) - - stores := extkingpin.Addrs(cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). - PlaceHolder("<store>")) - - // TODO(bwplotka): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. - ruleEndpoints := extkingpin.Addrs(cmd.Flag("rule", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups."). 
- Hidden().PlaceHolder("<rule>")) - - metadataEndpoints := extkingpin.Addrs(cmd.Flag("metadata", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups."). - Hidden().PlaceHolder("<metadata>")) - - exemplarEndpoints := extkingpin.Addrs(cmd.Flag("exemplar", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups."). - Hidden().PlaceHolder("<exemplar>")) - - // TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. - targetEndpoints := extkingpin.Addrs(cmd.Flag("target", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups."). - Hidden().PlaceHolder("<target>")) - - strictStores := cmd.Flag("store-strict", "Deprecation Warning - This flag is deprecated and replaced with `endpoint-strict`. Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). - PlaceHolder("<staticstore>").Strings() - - strictEndpoints := cmd.Flag("endpoint-strict", "Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). 
- PlaceHolder("<staticendpoint>").Strings() - - strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "Experimental: DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails."). - PlaceHolder("<endpoint-group-strict>")) - - fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable)."). - PlaceHolder("<path>").Strings() - - fileSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-interval", "Refresh interval to re-read file SD files. It is used as a resync fallback."). - Default("5m")) - - // TODO(bwplotka): Grab this from TTL at some point. - dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions."). - Default("30s")) - - dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). - Default(string(dns.MiekgdnsResolverType)).Hidden().String() - - unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) - - endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) - enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified."). Default("false").Bool() @@ -233,6 +168,29 @@ func registerQuery(app *extkingpin.App) { tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". 
This setting will cause the query.tenant-header flag value to be ignored.").Default("").Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool() tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String() + // TODO(bwplotka): Grab this from TTL at some point. + dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions."). + Default("30s")) + + dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). + Default(string(dns.MiekgdnsResolverType)).Hidden().String() + + unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) + + endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) + + endpointSetConfig := extflag.RegisterPathOrContent(cmd, "endpoint.sd-config", "Config File with endpoint definitions") + + endpointSetConfigReloadInterval := extkingpin.ModelDuration(cmd.Flag("endpoint.sd-config-reload-interval", "Interval between endpoint config refreshes").Default("1m")) + + endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "(Deprecated): Addresses of statically configured Thanos API servers (repeatable). 
The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").Hidden().PlaceHolder("<endpoint>")) + + endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "(Deprecated, Experimental): DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components.").Hidden().PlaceHolder("<endpoint-group>")) + + strictEndpoints := cmd.Flag("endpoint-strict", "(Deprecated): Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). + PlaceHolder("<staticendpoint>").Hidden().Strings() + + strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "(Deprecated, Experimental): DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails.").Hidden().PlaceHolder("<endpoint-group-strict>")) var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) @@ -266,18 +224,6 @@ func registerQuery(app *extkingpin.App) { return errors.Wrap(err, "error while parsing config for request logging") } - var fileSD *file.Discovery - if len(*fileSDFiles) > 0 { - conf := &file.SDConfig{ - Files: *fileSDFiles, - RefreshInterval: *fileSDInterval, - } - var err error - if fileSD, err = file.NewDiscovery(conf, logger, conf.NewDiscovererMetrics(reg, discovery.NewRefreshMetrics(reg))); err != nil { - return err - } - } - if *webRoutePrefix == "" { *webRoutePrefix = *webExternalPrefix } @@ -295,23 +241,43 @@ func registerQuery(app *extkingpin.App) { return err } + dialOpts, err := grpcClientConfig.dialOptions(logger, reg, tracer) + if err != nil { + return err + } + + endpointSet, err := setupEndpointset( + g, + reg, + logger, + endpointSetConfig, + 
time.Duration(*endpointSetConfigReloadInterval), + *endpoints, + *endpointGroups, + *strictEndpoints, + *strictEndpointGroups, + *dnsSDResolver, + time.Duration(*dnsSDInterval), + time.Duration(*unhealthyStoreTimeout), + time.Duration(*endpointInfoTimeout), + dialOpts, + *queryConnMetricLabels..., + ) + if err != nil { + return err + } + return runQuery( g, logger, debugLogging, + endpointSet, reg, tracer, httpLogOpts, grpcLogOpts, logFilterMethods, grpcServerConfig, - *grpcCompression, - *secure, - *skipVerify, - *cert, - *key, - *caCert, - *serverName, *httpBindAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), @@ -326,18 +292,10 @@ func registerQuery(app *extkingpin.App) { *dynamicLookbackDelta, time.Duration(*defaultEvaluationInterval), time.Duration(*storeResponseTimeout), - *queryConnMetricLabels, *queryReplicaLabels, *queryPartitionLabels, selectorLset, getFlagsMap(cmd.Flags()), - *endpoints, - *endpointGroups, - *stores, - *ruleEndpoints, - *targetEndpoints, - *metadataEndpoints, - *exemplarEndpoints, *enableAutodownsampling, *enableQueryPartialResponse, *enableRulePartialResponse, @@ -345,16 +303,8 @@ func registerQuery(app *extkingpin.App) { *enableMetricMetadataPartialResponse, *enableExemplarPartialResponse, *activeQueryDir, - fileSD, - time.Duration(*dnsSDInterval), - *dnsSDResolver, - time.Duration(*unhealthyStoreTimeout), - time.Duration(*endpointInfoTimeout), time.Duration(*instantDefaultMaxSourceResolution), *defaultMetadataTimeRange, - *strictStores, - *strictEndpoints, - *strictEndpointGroups, *webDisableCORS, *alertQueryURL, *grpcProxyStrategy, @@ -381,19 +331,13 @@ func runQuery( g *run.Group, logger log.Logger, debugLogging bool, + endpointSet *query.EndpointSet, reg *prometheus.Registry, tracer opentracing.Tracer, httpLogOpts []logging.Option, grpcLogOpts []grpc_logging.Option, logFilterMethods []string, grpcServerConfig grpcConfig, - grpcCompression string, - secure bool, - skipVerify bool, - cert string, - key string, - caCert string, - 
serverName string, httpBindAddr string, httpTLSConfig string, httpGracePeriod time.Duration, @@ -408,18 +352,10 @@ func runQuery( dynamicLookbackDelta bool, defaultEvaluationInterval time.Duration, storeResponseTimeout time.Duration, - queryConnMetricLabels []string, queryReplicaLabels []string, queryPartitionLabels []string, selectorLset labels.Labels, flagsMap map[string]string, - endpointAddrs []string, - endpointGroupAddrs []string, - storeAddrs []string, - ruleAddrs []string, - targetAddrs []string, - metadataAddrs []string, - exemplarAddrs []string, enableAutodownsampling bool, enableQueryPartialResponse bool, enableRulePartialResponse bool, @@ -427,16 +363,8 @@ func runQuery( enableMetricMetadataPartialResponse bool, enableExemplarPartialResponse bool, activeQueryDir string, - fileSD *file.Discovery, - dnsSDInterval time.Duration, - dnsSDResolver string, - unhealthyStoreTimeout time.Duration, - endpointInfoTimeout time.Duration, instantDefaultMaxSourceResolution time.Duration, defaultMetadataTimeRange time.Duration, - strictStores []string, - strictEndpoints []string, - strictEndpointGroups []string, disableCORS bool, alertQueryURL string, grpcProxyStrategy string, @@ -462,79 +390,6 @@ func runQuery( } // NOTE(GiedriusS): default is set in config.ts. } - // TODO(bplotka in PR #513 review): Move arguments into struct. 
- duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_query_duplicated_store_addresses_total", - Help: "The number of times a duplicated store addresses is detected from the different configs in query", - }) - - dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName) - if err != nil { - return errors.Wrap(err, "building gRPC client") - } - if grpcCompression != compressionNone { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(grpcCompression))) - } - - fileSDCache := cache.New() - dnsStoreProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_store_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - for _, store := range strictStores { - if dns.IsDynamicNode(store) { - return errors.Errorf("%s is a dynamically specified store i.e. it uses SD and that is not permitted under strict mode. Use --store for this", store) - } - } - - for _, endpoint := range strictEndpoints { - if dns.IsDynamicNode(endpoint) { - return errors.Errorf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode. 
Use --endpoint for this", endpoint) - } - } - - // Register resolver for the "thanos:///" scheme for endpoint-groups - dns.RegisterGRPCResolver( - dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg), - dns.ResolverType(dnsSDResolver), - ), - dnsSDInterval, - logger, - ) - - dnsEndpointProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsRuleProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_rule_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsTargetProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsMetadataProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsExemplarProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_exemplar_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) options := []store.ProxyStoreOption{ store.WithTSDBSelector(tsdbSelector), @@ -545,35 +400,12 @@ func runQuery( queryReplicaLabels = strutil.ParseFlagLabels(queryReplicaLabels) var ( - endpoints = prepareEndpointSet( - g, - logger, - reg, - []*dns.Provider{ - dnsStoreProvider, - dnsRuleProvider, - dnsExemplarProvider, - dnsMetadataProvider, - dnsTargetProvider, - dnsEndpointProvider, - }, - duplicatedStores, - strictStores, - strictEndpoints, - endpointGroupAddrs, - strictEndpointGroups, - dialOpts, - unhealthyStoreTimeout, - endpointInfoTimeout, - queryConnMetricLabels..., - ) - - proxyStore = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) 
+ proxyStore = store.NewProxyStore(logger, reg, endpointSet.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) seriesProxy = store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxyStore), reg, storeRateLimits) - rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients) - targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients) - metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients) - exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset) + rulesProxy = rules.NewProxy(logger, endpointSet.GetRulesClients) + targetsProxy = targets.NewProxy(logger, endpointSet.GetTargetsClients) + metadataProxy = metadata.NewProxy(logger, endpointSet.GetMetricMetadataClients) + exemplarsProxy = exemplars.NewProxy(logger, endpointSet.GetExemplarsStores, selectorLset) queryableCreator = query.NewQueryableCreator( logger, extprom.WrapRegistererWithPrefix("thanos_query_", reg), @@ -583,78 +415,6 @@ func runQuery( ) ) - // Run File Service Discovery and update the store set when the files are modified. - if fileSD != nil { - var fileSDUpdates chan []*targetgroup.Group - ctxRun, cancelRun := context.WithCancel(context.Background()) - - fileSDUpdates = make(chan []*targetgroup.Group) - - g.Add(func() error { - fileSD.Run(ctxRun, fileSDUpdates) - return nil - }, func(error) { - cancelRun() - }) - - ctxUpdate, cancelUpdate := context.WithCancel(context.Background()) - g.Add(func() error { - for { - select { - case update := <-fileSDUpdates: - // Discoverers sometimes send nil updates so need to check for it to avoid panics. 
- if update == nil { - continue - } - fileSDCache.Update(update) - endpoints.Update(ctxUpdate) - - if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - - // Rules apis do not support file service discovery as of now. - case <-ctxUpdate.Done(): - return nil - } - } - }, func(error) { - cancelUpdate() - }) - } - // Periodically update the addresses from static flags and file SD by resolving them using DNS SD if necessary. - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error { - resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval) - defer resolveCancel() - if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) - } - if err := dnsTargetProvider.Resolve(ctx, targetAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err) - } - if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err) - } - if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err) - } - if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err) - - } - return nil - }) - }, func(error) { - cancel() - }) 
- } - grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() statusProber := prober.Combine( @@ -687,7 +447,7 @@ func runQuery( if queryMode != queryModeLocal { level.Info(logger).Log("msg", "Distributed query mode enabled, using Thanos as the default query engine.") defaultEngine = string(apiv1.PromqlEngineThanos) - remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{ + remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpointSet.GetQueryAPIClients, query.Opts{ AutoDownsample: enableAutodownsampling, ReplicaLabels: queryReplicaLabels, PartitionLabels: queryPartitionLabels, @@ -727,11 +487,11 @@ func runQuery( ins := extpromhttp.NewTenantInstrumentationMiddleware(tenantHeader, defaultTenant, reg, nil) // TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting. - ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL, tenantHeader, defaultTenant, enforceTenancy).Register(router, ins) + ui.NewQueryUI(logger, endpointSet, webExternalPrefix, webPrefixHeaderName, alertQueryURL, tenantHeader, defaultTenant, enforceTenancy).Register(router, ins) api := apiv1.NewQueryAPI( logger, - endpoints.GetEndpointStatus, + endpointSet.GetEndpointStatus, engineFactory, apiv1.PromqlEngineType(defaultEngine), lookbackDeltaCreator, @@ -844,7 +604,7 @@ func runQuery( }, func(error) { statusProber.NotReady(err) s.Shutdown(err) - endpoints.Close() + endpointSet.Close() }) } @@ -852,96 +612,6 @@ func runQuery( return nil } -func removeDuplicateEndpointSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec { - set := make(map[string]*query.GRPCEndpointSpec) - for _, spec := range specs { - addr := spec.Addr() - if _, ok := set[addr]; ok { - level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) - duplicatedStores.Inc() - } - set[addr] = spec - } - deduplicated := 
make([]*query.GRPCEndpointSpec, 0, len(set)) - for _, value := range set { - deduplicated = append(deduplicated, value) - } - return deduplicated -} - -func prepareEndpointSet( - g *run.Group, - logger log.Logger, - reg *prometheus.Registry, - dnsProviders []*dns.Provider, - duplicatedStores prometheus.Counter, - strictStores []string, - strictEndpoints []string, - endpointGroupAddrs []string, - strictEndpointGroups []string, - dialOpts []grpc.DialOption, - unhealthyStoreTimeout time.Duration, - endpointInfoTimeout time.Duration, - queryConnMetricLabels ...string, -) *query.EndpointSet { - endpointSet := query.NewEndpointSet( - time.Now, - logger, - reg, - func() (specs []*query.GRPCEndpointSpec) { - // Add strict & static nodes. - for _, addr := range strictStores { - specs = append(specs, query.NewGRPCEndpointSpec(addr, true)) - } - - for _, addr := range strictEndpoints { - specs = append(specs, query.NewGRPCEndpointSpec(addr, true)) - } - - for _, dnsProvider := range dnsProviders { - var tmpSpecs []*query.GRPCEndpointSpec - - for _, addr := range dnsProvider.Addresses() { - tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false)) - } - tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs) - specs = append(specs, tmpSpecs...) - } - - for _, eg := range endpointGroupAddrs { - spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), false, extgrpc.EndpointGroupGRPCOpts()...) - specs = append(specs, spec) - } - - for _, eg := range strictEndpointGroups { - spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), true, extgrpc.EndpointGroupGRPCOpts()...) - specs = append(specs, spec) - } - - return specs - }, - dialOpts, - unhealthyStoreTimeout, - endpointInfoTimeout, - queryConnMetricLabels..., - ) - - // Periodically update the store set with the addresses we see in our cluster. 
- { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { - endpointSet.Update(ctx) - return nil - }) - }, func(error) { - cancel() - }) - } - - return endpointSet -} - // LookbackDeltaFactory creates from 1 to 3 lookback deltas depending on // dynamicLookbackDelta and eo.LookbackDelta and returns a function // that returns appropriate lookback delta for given maxSourceResolutionMillis. diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index e0780452fd9..72e161b8628 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -79,8 +79,6 @@ import ( "github.com/thanos-io/thanos/pkg/ui" ) -const dnsSDResolver = "miekgdns" - type ruleConfig struct { http httpConfig grpc grpcConfig @@ -404,17 +402,6 @@ func runRule( } if len(grpcEndpoints) > 0 { - duplicatedGRPCEndpoints := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_rule_grpc_endpoints_duplicated_total", - Help: "The number of times a duplicated grpc endpoint is detected from the different configs in rule", - }) - - dnsEndpointProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_rule_grpc_endpoints_", reg), - dnsSDResolver, - ) - dialOpts, err := extgrpc.StoreClientGRPCOpts( logger, reg, @@ -430,36 +417,24 @@ func runRule( return err } - grpcEndpointSet = prepareEndpointSet( + grpcEndpointSet, err = setupEndpointset( g, - logger, reg, - []*dns.Provider{dnsEndpointProvider}, - duplicatedGRPCEndpoints, + logger, nil, + 1*time.Minute, + grpcEndpoints, nil, nil, nil, - dialOpts, + conf.query.dnsSDResolver, + conf.query.dnsSDInterval, 5*time.Minute, 5*time.Second, + dialOpts, ) - - // Periodically update the GRPC addresses from query config by resolving them using DNS SD if necessary. 
- { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { - resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second) - defer resolveCancel() - if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err) - } - return nil - }) - }, func(error) { - cancel() - }) + if err != nil { + return err } } diff --git a/docs/components/query.md b/docs/components/query.md index 10975138b60..a97bef900cd 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -299,27 +299,14 @@ Flags: detected maximum container or system memory. --enable-auto-gomemlimit Enable go runtime to automatically limit memory consumption. - --endpoint=<endpoint> ... Addresses of statically configured Thanos - API servers (repeatable). The scheme may be - prefixed with 'dns+' or 'dnssrv+' to detect - Thanos API servers through respective DNS - lookups. - --endpoint-group=<endpoint-group> ... - Experimental: DNS name of statically configured - Thanos API server groups (repeatable). Targets - resolved from the DNS name will be queried in - a round-robin, instead of a fanout manner. - This flag should be used when connecting a - Thanos Query to HA groups of Thanos components. - --endpoint-group-strict=<endpoint-group-strict> ... - Experimental: DNS name of statically configured - Thanos API server groups (repeatable) that are - always used, even if the health check fails. - --endpoint-strict=<staticendpoint> ... - Addresses of only statically configured Thanos - API servers that are always used, even if - the health check fails. Useful if you have a - caching layer on top. + --endpoint.sd-config=<content> + Alternative to 'endpoint.sd-config-file' flag + (mutually exclusive).
Content of Config File + with endpoint definitions + --endpoint.sd-config-file=<file-path> + Path to Config File with endpoint definitions + --endpoint.sd-config-reload-interval=1m + Interval between endpoint config refreshes --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable @@ -500,19 +487,6 @@ Flags: It follows the Thanos sharding relabel-config syntax. For format details see: https://thanos.io/tip/thanos/sharding.md/#relabelling - --store=<store> ... Deprecation Warning - This flag is deprecated - and replaced with `endpoint`. Addresses of - statically configured store API servers - (repeatable). The scheme may be prefixed with - 'dns+' or 'dnssrv+' to detect store API servers - through respective DNS lookups. - --store-strict=<staticstore> ... - Deprecation Warning - This flag is deprecated - and replaced with `endpoint-strict`. Addresses - of only statically configured store API servers - that are always used, even if the health check - fails. Useful if you have a caching layer on - top. --store.limits.request-samples=0 The maximum samples allowed for a single Series request, The Series call fails if @@ -531,12 +505,6 @@ Flags: enabled. 0 disables timeout. --store.sd-dns-interval=30s Interval between DNS resolutions. - --store.sd-files=<path> ... - Path to files that contain addresses of store - API servers. The path can be a glob pattern - (repeatable). - --store.sd-interval=5m Refresh interval to re-read file SD files. - It is used as a resync fallback. --store.unhealthy-timeout=5m Timeout before an unhealthy store is cleaned from the store UI page. 
diff --git a/pkg/discovery/dns/grpc.go b/pkg/discovery/dns/grpc.go index 79e832b6529..7971e7991cb 100644 --- a/pkg/discovery/dns/grpc.go +++ b/pkg/discovery/dns/grpc.go @@ -23,7 +23,7 @@ type builder struct { logger log.Logger } -func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) { +func RegisterGRPCResolver(logger log.Logger, provider *Provider, interval time.Duration) { grpcresolver.Register(&builder{ resolveInterval: interval, provider: provider, diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 4c519bf925f..071e04a8465 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -211,8 +211,7 @@ type EndpointSet struct { // Endpoint specifications can change dynamically. If some component is missing from the list, we assume it is no longer // accessible and we close gRPC client for it, unless it is strict. - endpointSpec func() map[string]*GRPCEndpointSpec - dialOpts []grpc.DialOption + endpointSpecs func() map[string]*GRPCEndpointSpec endpointInfoTimeout time.Duration unhealthyEndpointTimeout time.Duration @@ -235,7 +234,6 @@ func NewEndpointSet( logger log.Logger, reg prometheus.Registerer, endpointSpecs func() []*GRPCEndpointSpec, - dialOpts []grpc.DialOption, unhealthyEndpointTimeout time.Duration, endpointInfoTimeout time.Duration, endpointMetricLabels ...string, @@ -254,19 +252,17 @@ func NewEndpointSet( } return &EndpointSet{ - now: now, - logger: log.With(logger, "component", "endpointset"), - endpointsMetric: endpointsMetric, - - dialOpts: dialOpts, + now: now, + logger: log.With(logger, "component", "endpointset"), + endpointsMetric: endpointsMetric, endpointInfoTimeout: endpointInfoTimeout, unhealthyEndpointTimeout: unhealthyEndpointTimeout, - endpointSpec: func() map[string]*GRPCEndpointSpec { - specs := make(map[string]*GRPCEndpointSpec) + endpointSpecs: func() map[string]*GRPCEndpointSpec { + res := make(map[string]*GRPCEndpointSpec) for _, s := range endpointSpecs() { - 
specs[s.addr] = s + res[s.addr] = s } - return specs + return res }, endpoints: make(map[string]*endpointRef), } @@ -288,7 +284,7 @@ func (e *EndpointSet) Update(ctx context.Context) { mu sync.Mutex ) - for _, spec := range e.endpointSpec() { + for _, spec := range e.endpointSpecs() { spec := spec if er, existingRef := e.endpoints[spec.Addr()]; existingRef { @@ -571,11 +567,7 @@ type endpointRef struct { // newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) { - var dialOpts []grpc.DialOption - - dialOpts = append(dialOpts, e.dialOpts...) - dialOpts = append(dialOpts, spec.dialOpts...) - conn, err := grpc.NewClient(spec.Addr(), dialOpts...) + conn, err := grpc.NewClient(spec.Addr(), spec.dialOpts...) if err != nil { return nil, errors.Wrap(err, "dialing connection") } diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 6f061211ab5..68b00aff5e1 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -675,11 +675,11 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) { endpointSet := NewEndpointSet(nowFunc, nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range discoveredEndpointAddr { - specs = append(specs, NewGRPCEndpointSpec(addr, false)) + specs = append(specs, NewGRPCEndpointSpec(addr, false, testGRPCOpts...)) } return specs }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() // Initial update. @@ -1052,7 +1052,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) { } return specs }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() // Should not matter how many of these we run. 
@@ -1159,11 +1159,11 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) { slowStaticEndpointAddr := discoveredEndpointAddr[2] endpointSet := NewEndpointSet(time.Now, nil, nil, func() (specs []*GRPCEndpointSpec) { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(discoveredEndpointAddr[0], true), - NewGRPCEndpointSpec(discoveredEndpointAddr[1], false), - NewGRPCEndpointSpec(discoveredEndpointAddr[2], true), + NewGRPCEndpointSpec(discoveredEndpointAddr[0], true, testGRPCOpts...), + NewGRPCEndpointSpec(discoveredEndpointAddr[1], false, testGRPCOpts...), + NewGRPCEndpointSpec(discoveredEndpointAddr[2], true, testGRPCOpts...), } - }, testGRPCOpts, time.Minute, 1*time.Second) + }, time.Minute, 1*time.Second) defer endpointSet.Close() // Initial update. @@ -1273,7 +1273,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { endpointSpec: func() []*GRPCEndpointSpec { endpointSpec := make([]*GRPCEndpointSpec, 0, len(endpoints.orderAddrs)) for _, addr := range endpoints.orderAddrs { - endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false)) + endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false, testGRPCOpts...)) } return endpointSpec }, @@ -1297,7 +1297,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Sidecar discovered, no Ruler discovered", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[0], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...), } }, expectedStores: 1, // sidecar @@ -1310,8 +1310,8 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Ruler discovered", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[0], false), - NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...), + NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...), } }, expectedStores: 2, // sidecar + 
ruler @@ -1324,7 +1324,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Sidecar removed", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...), } }, expectedStores: 1, // ruler @@ -1344,7 +1344,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { return tc.states[currentState].endpointSpec() }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() @@ -1532,11 +1532,11 @@ func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc, endpointSet := NewEndpointSet(now, nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range discoveredEndpointAddr { - specs = append(specs, NewGRPCEndpointSpec(addr, strict)) + specs = append(specs, NewGRPCEndpointSpec(addr, strict, testGRPCOpts...)) } return specs }, - testGRPCOpts, time.Minute, time.Second, metricLabels...) + time.Minute, time.Second, metricLabels...) 
return endpointSet } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c8a9e7fc62d..08841fe3f52 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -17,9 +17,7 @@ import ( e2edb "github.com/efficientgo/e2e/db" e2eobs "github.com/efficientgo/e2e/observable" "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/relabel" "gopkg.in/yaml.v2" @@ -429,26 +427,25 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { "--store.sd-dns-interval": "5s", "--log.level": infoLogLevel, "--query.max-concurrent": "1", - "--store.sd-interval": "5s", }) for _, repl := range q.replicaLabels { args = append(args, "--query.replica-label="+repl) } for _, addr := range q.storeAddresses { - args = append(args, "--store="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.ruleAddresses { - args = append(args, "--rule="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.targetAddresses { - args = append(args, "--target="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.metadataAddresses { - args = append(args, "--metadata="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.exemplarAddresses { - args = append(args, "--exemplar="+addr) + args = append(args, "--endpoint="+addr) } for _, feature := range q.enableFeatures { args = append(args, "--enable-feature="+feature) @@ -470,21 +467,25 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { return nil, errors.Wrap(err, "create query dir failed") } - fileSD := []*targetgroup.Group{{}} + type EndpointSpec struct{ Address string } + + endpoints := make([]EndpointSpec, 0) for _, a := range q.fileSDStoreAddresses { - fileSD[0].Targets = append(fileSD[0].Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)}) + endpoints = 
append(endpoints, EndpointSpec{Address: a}) } - b, err := yaml.Marshal(fileSD) + endpointSDConfig := struct{ Endpoints []EndpointSpec }{Endpoints: endpoints} + b, err := yaml.Marshal(endpointSDConfig) if err != nil { return nil, err } - if err := os.WriteFile(q.Dir()+"/filesd.yaml", b, 0600); err != nil { + if err := os.WriteFile(q.Dir()+"/endpoint-sd-config.yaml", b, 0600); err != nil { return nil, errors.Wrap(err, "creating query SD config failed") } - args = append(args, "--store.sd-files="+filepath.Join(q.InternalDir(), "filesd.yaml")) + args = append(args, "--endpoint.sd-config-file="+filepath.Join(q.InternalDir(), "endpoint-sd-config.yaml")) + args = append(args, "--endpoint.sd-config-reload-interval=5s") } if q.routePrefix != "" { args = append(args, "--web.route-prefix="+q.routePrefix)