diff --git a/CHANGELOG.md b/CHANGELOG.md index f89c47c855..966c5f2411 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call. - [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0. - [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers. +- [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer +- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892). ### Added - [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics. diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 5fd8352b4b..e7cd940228 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -62,6 +62,9 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig + tsdbClients []store.Client + exemplarClients map[string]*exemplars.TSDB + metricNameFilterEnabled bool } @@ -101,6 +104,8 @@ func NewMultiTSDB( mtx: &sync.RWMutex{}, tenants: map[string]*tenant{}, labels: labels, + tsdbClients: make([]store.Client, 0), + exemplarClients: map[string]*exemplars.TSDB{}, tenantLabelName: tenantLabelName, bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, @@ -116,14 +121,57 @@ func NewMultiTSDB( func (t *MultiTSDB) GetTenants() []string { t.mtx.RLock() + defer t.mtx.RUnlock() tenants := make([]string, 0, len(t.tenants)) for tname := range t.tenants { tenants = append(tenants, tname) } - defer t.mtx.RUnlock() return tenants } +// testGetTenant returns the tenant with the given tenantID for testing purposes. +func (t *MultiTSDB) testGetTenant(tenantID string) *tenant { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.tenants[tenantID] +} + +func (t *MultiTSDB) updateTSDBClients() { + t.tsdbClients = t.tsdbClients[:0] + for _, tenant := range t.tenants { + client := tenant.client() + if client != nil { + t.tsdbClients = append(t.tsdbClients, client) + } + } +} + +func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) { + t.tenants[tenantID] = newTenant + t.updateTSDBClients() + if newTenant.exemplars() != nil { + t.exemplarClients[tenantID] = newTenant.exemplars() + } +} + +func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) { + t.mtx.Lock() + defer t.mtx.Unlock() + t.addTenantUnlocked(tenantID, newTenant) +} + +func (t *MultiTSDB) removeTenantUnlocked(tenantID string) { + delete(t.tenants, tenantID) + delete(t.exemplarClients, tenantID) + t.updateTSDBClients() +} + +func (t *MultiTSDB) removeTenantLocked(tenantID string) { + t.mtx.Lock() + defer t.mtx.Unlock() + t.removeTenantUnlocked(tenantID) +} + type localClient struct { store *store.TSDBStore @@ -426,7 +474,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error { } level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID) - delete(t.tenants, tenantID) + t.removeTenantUnlocked(tenantID) } return merr.Err() @@ -586,33 +634,18 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error { return merr.Err() } +// TSDBLocalClients should be used as read-only. func (t *MultiTSDB) TSDBLocalClients() []store.Client { t.mtx.RLock() defer t.mtx.RUnlock() - - res := make([]store.Client, 0, len(t.tenants)) - for _, tenant := range t.tenants { - client := tenant.client() - if client != nil { - res = append(res, client) - } - } - - return res + return t.tsdbClients } +// TSDBExemplars should be used as read-only. func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB { t.mtx.RLock() defer t.mtx.RUnlock() - - res := make(map[string]*exemplars.TSDB, len(t.tenants)) - for k, tenant := range t.tenants { - e := tenant.exemplars() - if e != nil { - res[k] = e - } - } - return res + return t.exemplarClients } func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats { @@ -687,9 +720,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant nil, ) if err != nil { - t.mtx.Lock() - delete(t.tenants, tenantID) - t.mtx.Unlock() + t.removeTenantLocked(tenantID) return err } var ship *shipper.Shipper @@ -712,6 +743,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant options = append(options, store.WithCuckooMetricNameStoreFilter()) } tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) + t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil level.Info(logger).Log("msg", "TSDB is now ready") return nil } @@ -740,7 +772,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan } tenant = newTenant() - t.tenants[tenantID] = tenant + t.addTenantUnlocked(tenantID, tenant) t.mtx.Unlock() logger := log.With(t.logger, "tenant", tenantID) diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index a846dfbbdd..a36db4b402 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -194,7 +194,7 @@ func TestMultiTSDB(t *testing.T) { testutil.Ok(t, m.Open()) testutil.Ok(t, appendSample(m, testTenant, time.Now())) - tenant := m.tenants[testTenant] + tenant := m.testGetTenant(testTenant) db := tenant.readyStorage().Get() testutil.Equals(t, 0, len(db.Blocks())) @@ -843,7 +843,10 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim func queryLabelValues(ctx context.Context, m *MultiTSDB) error { proxy := store.NewProxyStore(nil, nil, func() []store.Client { - clients := m.TSDBLocalClients() + m.mtx.Lock() + defer m.mtx.Unlock() + clients := make([]store.Client, len(m.tsdbClients)) + copy(clients, m.tsdbClients) if len(clients) > 0 { clients[0] = &slowClient{clients[0]} } diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index 7e90c3c204..bf38cb06ed 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -210,7 +210,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { for _, c := range tc.cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -294,7 +294,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -319,7 +319,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range changedConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -534,7 +534,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -704,7 +704,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -778,7 +778,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { for _, c := range cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 9e715bc3a8..c938a4f040 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -1215,74 +1215,3 @@ func TestReceiveCpnp(t *testing.T) { }, v) } - -func TestNewTenant(t *testing.T) { - e, err := e2e.NewDockerEnvironment("new-tenant") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - // Setup 3 ingestors. - i1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled().Init() - i2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled().Init() - i3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled().Init() - - h := receive.HashringConfig{ - Endpoints: []receive.Endpoint{ - {Address: i1.InternalEndpoint("grpc")}, - {Address: i2.InternalEndpoint("grpc")}, - {Address: i3.InternalEndpoint("grpc")}, - }, - } - - // Setup 1 distributor with double replication - r1 := e2ethanos.NewReceiveBuilder(e, "r1").WithRouting(2, h).Init() - testutil.Ok(t, e2e.StartAndWaitReady(i1, i2, i3, r1)) - - q := e2ethanos.NewQuerierBuilder(e, "1", i1.InternalEndpoint("grpc"), i2.InternalEndpoint("grpc"), i3.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(q)) - testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - - rp1 := e2ethanos.NewReverseProxy(e, "1", "tenant-1", "http://"+r1.InternalEndpoint("remote-write")) - prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, "http://"+rp1.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, e2e.StartAndWaitReady(rp1, prom1)) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - expectedReplicationFactor := 2.0 - - queryAndAssert(t, ctx, q.Endpoint("http"), func() string { return "count(up) by (prometheus, tenant_id)" }, time.Now, promclient.QueryOptions{ - Deduplicate: false, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{ - "prometheus": "prom1", - "tenant_id": "tenant-1", - }, - Value: model.SampleValue(expectedReplicationFactor), - }, - }) - - rp2 := e2ethanos.NewReverseProxy(e, "2", "tenant-2", "http://"+r1.InternalEndpoint("remote-write")) - prom2 := e2ethanos.NewPrometheus(e, "2", e2ethanos.DefaultPromConfig("prom2", 0, "http://"+rp2.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, e2e.StartAndWaitReady(rp2, prom2)) - - queryAndAssert(t, ctx, q.Endpoint("http"), func() string { return "count(up) by (prometheus, tenant_id)" }, time.Now, promclient.QueryOptions{ - Deduplicate: false, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{ - "prometheus": "prom1", - "tenant_id": "tenant-1", - }, - Value: model.SampleValue(expectedReplicationFactor), - }, - &model.Sample{ - Metric: model.Metric{ - "prometheus": "prom2", - "tenant_id": "tenant-2", - }, - Value: model.SampleValue(expectedReplicationFactor), - }, - }) -}