Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry pick upstream fix to memoize tsdb clients #112

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call.
- [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0.
- [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers.
- [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer.
- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892).

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
Expand Down
82 changes: 57 additions & 25 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

tsdbClients []store.Client
exemplarClients map[string]*exemplars.TSDB

metricNameFilterEnabled bool
}

Expand Down Expand Up @@ -101,6 +104,8 @@ func NewMultiTSDB(
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tsdbClients: make([]store.Client, 0),
exemplarClients: map[string]*exemplars.TSDB{},
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
Expand All @@ -116,14 +121,57 @@ func NewMultiTSDB(

func (t *MultiTSDB) GetTenants() []string {
t.mtx.RLock()
defer t.mtx.RUnlock()
tenants := make([]string, 0, len(t.tenants))
for tname := range t.tenants {
tenants = append(tenants, tname)
}
defer t.mtx.RUnlock()
Copy link
Collaborator Author

@jnyi jnyi Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aha, found a potential bug cc @yuchen-db — you should `defer RUnlock()` immediately after acquiring the lock; otherwise, if a return statement is added between the lock and the deferred unlock, the lock will never be released.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

return tenants
}

// testGetTenant looks up the tenant registered under tenantID, guarding the
// tenants map with a read lock. It exists only for use in tests; returns nil
// when no such tenant is registered.
func (t *MultiTSDB) testGetTenant(tenantID string) *tenant {
	t.mtx.RLock()
	tn := t.tenants[tenantID]
	t.mtx.RUnlock()
	return tn
}

// updateTSDBClients rebuilds the memoized slice of per-tenant store clients
// from the current tenants map, reusing the existing backing array. Tenants
// whose client is not yet ready (nil) are skipped. Callers must hold t.mtx
// for writing.
func (t *MultiTSDB) updateTSDBClients() {
	clients := t.tsdbClients[:0]
	for _, tn := range t.tenants {
		c := tn.client()
		if c == nil {
			continue
		}
		clients = append(clients, c)
	}
	t.tsdbClients = clients
}

// addTenantUnlocked registers newTenant under tenantID and refreshes the
// memoized client lists (tsdbClients and, when available, exemplarClients).
// Callers must hold t.mtx for writing; use addTenantLocked otherwise.
func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) {
	t.tenants[tenantID] = newTenant
	t.updateTSDBClients()
	// Call exemplars() once instead of twice: it may return nil while the
	// tenant's TSDB is still starting up, in which case no exemplar client
	// is registered yet (it is added on the later addTenantLocked call).
	if e := newTenant.exemplars(); e != nil {
		t.exemplarClients[tenantID] = e
	}
}

// addTenantLocked is the mutex-acquiring wrapper around addTenantUnlocked:
// it takes the write lock, registers the tenant, and refreshes the memoized
// client lists. Safe to call when t.mtx is NOT already held.
func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.addTenantUnlocked(tenantID, newTenant)
}

// removeTenantUnlocked deletes the tenant registered under tenantID from both
// the tenants map and the exemplar-client map, then rebuilds the memoized
// store-client slice. Callers must hold t.mtx for writing.
func (t *MultiTSDB) removeTenantUnlocked(tenantID string) {
	// The two deletes are independent; drop the exemplar client first, then
	// the tenant itself, before recomputing the client list.
	delete(t.exemplarClients, tenantID)
	delete(t.tenants, tenantID)
	t.updateTSDBClients()
}

// removeTenantLocked is the mutex-acquiring wrapper around
// removeTenantUnlocked: it takes the write lock, deregisters the tenant, and
// refreshes the memoized client lists. Safe to call when t.mtx is NOT
// already held.
func (t *MultiTSDB) removeTenantLocked(tenantID string) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.removeTenantUnlocked(tenantID)
}

type localClient struct {
store *store.TSDBStore

Expand Down Expand Up @@ -426,7 +474,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
}

level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
t.removeTenantUnlocked(tenantID)
}

return merr.Err()
Expand Down Expand Up @@ -586,33 +634,18 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {
return merr.Err()
}

// TSDBLocalClients should be used as read-only.
func (t *MultiTSDB) TSDBLocalClients() []store.Client {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
client := tenant.client()
if client != nil {
res = append(res, client)
}
}

return res
return t.tsdbClients
}

// TSDBExemplars should be used as read-only.
func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make(map[string]*exemplars.TSDB, len(t.tenants))
for k, tenant := range t.tenants {
e := tenant.exemplars()
if e != nil {
res[k] = e
}
}
return res
return t.exemplarClients
}

func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats {
Expand Down Expand Up @@ -687,9 +720,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
nil,
)
if err != nil {
t.mtx.Lock()
delete(t.tenants, tenantID)
t.mtx.Unlock()
t.removeTenantLocked(tenantID)
return err
}
var ship *shipper.Shipper
Expand All @@ -712,6 +743,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
options = append(options, store.WithCuckooMetricNameStoreFilter())
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down Expand Up @@ -740,7 +772,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
}

tenant = newTenant()
t.tenants[tenantID] = tenant
t.addTenantUnlocked(tenantID, tenant)
t.mtx.Unlock()

logger := log.With(t.logger, "tenant", tenantID)
Expand Down
7 changes: 5 additions & 2 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestMultiTSDB(t *testing.T) {
testutil.Ok(t, m.Open())
testutil.Ok(t, appendSample(m, testTenant, time.Now()))

tenant := m.tenants[testTenant]
tenant := m.testGetTenant(testTenant)
db := tenant.readyStorage().Get()

testutil.Equals(t, 0, len(db.Blocks()))
Expand Down Expand Up @@ -843,7 +843,10 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim

func queryLabelValues(ctx context.Context, m *MultiTSDB) error {
proxy := store.NewProxyStore(nil, nil, func() []store.Client {
clients := m.TSDBLocalClients()
m.mtx.Lock()
defer m.mtx.Unlock()
clients := make([]store.Client, len(m.tsdbClients))
copy(clients, m.tsdbClients)
if len(clients) > 0 {
clients[0] = &slowClient{clients[0]}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) {

for _, c := range tc.cfg {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand All @@ -319,7 +319,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) {

for _, c := range changedConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) {

for _, c := range cfg {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down
71 changes: 0 additions & 71 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,74 +1215,3 @@ func TestReceiveCpnp(t *testing.T) {
}, v)

}

func TestNewTenant(t *testing.T) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this test introduced in #100 which didn't capture the bug anyway

e, err := e2e.NewDockerEnvironment("new-tenant")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

// Setup 3 ingestors.
i1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled().Init()
i2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled().Init()
i3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled().Init()

h := receive.HashringConfig{
Endpoints: []receive.Endpoint{
{Address: i1.InternalEndpoint("grpc")},
{Address: i2.InternalEndpoint("grpc")},
{Address: i3.InternalEndpoint("grpc")},
},
}

// Setup 1 distributor with double replication
r1 := e2ethanos.NewReceiveBuilder(e, "r1").WithRouting(2, h).Init()
testutil.Ok(t, e2e.StartAndWaitReady(i1, i2, i3, r1))

q := e2ethanos.NewQuerierBuilder(e, "1", i1.InternalEndpoint("grpc"), i2.InternalEndpoint("grpc"), i3.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))

rp1 := e2ethanos.NewReverseProxy(e, "1", "tenant-1", "http://"+r1.InternalEndpoint("remote-write"))
prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, "http://"+rp1.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, e2e.StartAndWaitReady(rp1, prom1))

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

expectedReplicationFactor := 2.0

queryAndAssert(t, ctx, q.Endpoint("http"), func() string { return "count(up) by (prometheus, tenant_id)" }, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, model.Vector{
&model.Sample{
Metric: model.Metric{
"prometheus": "prom1",
"tenant_id": "tenant-1",
},
Value: model.SampleValue(expectedReplicationFactor),
},
})

rp2 := e2ethanos.NewReverseProxy(e, "2", "tenant-2", "http://"+r1.InternalEndpoint("remote-write"))
prom2 := e2ethanos.NewPrometheus(e, "2", e2ethanos.DefaultPromConfig("prom2", 0, "http://"+rp2.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, e2e.StartAndWaitReady(rp2, prom2))

queryAndAssert(t, ctx, q.Endpoint("http"), func() string { return "count(up) by (prometheus, tenant_id)" }, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, model.Vector{
&model.Sample{
Metric: model.Metric{
"prometheus": "prom1",
"tenant_id": "tenant-1",
},
Value: model.SampleValue(expectedReplicationFactor),
},
&model.Sample{
Metric: model.Metric{
"prometheus": "prom2",
"tenant_id": "tenant-2",
},
Value: model.SampleValue(expectedReplicationFactor),
},
})
}
Loading