Skip to content

Commit

Permalink
rename cat in cost attribution package to t or tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Dec 19, 2024
1 parent cc0e939 commit c020be0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 55 deletions.
30 changes: 15 additions & 15 deletions pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,46 +123,46 @@ func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) error {
}

invalidKeys := m.inactiveObservationsForUser(userID, deadline)
cat := m.Tracker(userID)
t := m.Tracker(userID)
for _, key := range invalidKeys {
cat.cleanupTrackerAttribution(key)
t.cleanupTrackerAttribution(key)
}

if cat != nil && cat.cooldownUntil != nil && cat.cooldownUntil.Load() < deadline {
if len(cat.observed) <= cat.MaxCardinality() {
cat.state = OverflowComplete
if t != nil && t.cooldownUntil != nil && t.cooldownUntil.Load() < deadline {
if len(t.observed) <= t.MaxCardinality() {
t.state = OverflowComplete
m.deleteTracker(userID)
} else {
cat.cooldownUntil.Store(deadline + cat.cooldownDuration)
t.cooldownUntil.Store(deadline + t.cooldownDuration)
}
}
}
return nil
}

func (m *Manager) inactiveObservationsForUser(userID string, deadline int64) []string {
cat := m.Tracker(userID)
t := m.Tracker(userID)
newTrackedLabels := m.limits.CostAttributionLabels(userID)
sort.Slice(newTrackedLabels, func(i, j int) bool {
return newTrackedLabels[i] < newTrackedLabels[j]
})

if !cat.CompareCALabels(newTrackedLabels) {
if !t.CompareCALabels(newTrackedLabels) {
m.mtx.Lock()
cat = newTracker(userID, newTrackedLabels, m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger)
m.trackersByUserID[userID] = cat
t = newTracker(userID, newTrackedLabels, m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger)
m.trackersByUserID[userID] = t
m.mtx.Unlock()
return nil
}
maxCardinality := m.limits.MaxCostAttributionCardinalityPerUser(userID)
if cat.MaxCardinality() != maxCardinality {
cat.UpdateMaxCardinality(maxCardinality)
if t.MaxCardinality() != maxCardinality {
t.UpdateMaxCardinality(maxCardinality)
}

cooldown := int64(m.limits.CostAttributionCooldown(userID).Seconds())
if cooldown != cat.CooldownDuration() {
cat.UpdateCooldownDuration(cooldown)
if cooldown != t.CooldownDuration() {
t.UpdateCooldownDuration(cooldown)
}

return cat.InactiveObservations(deadline)
return t.InactiveObservations(deadline)
}
80 changes: 40 additions & 40 deletions pkg/costattribution/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@ import (
)

func Test_GetCALabels(t *testing.T) {
cat := newTestManager().Tracker("user1")
assert.True(t, cat.CompareCALabels([]string{"team"}), "Expected cost attribution labels mismatch")
tracker := newTestManager().Tracker("user1")
assert.True(t, tracker.CompareCALabels([]string{"team"}), "Expected cost attribution labels mismatch")
}

func Test_GetMaxCardinality(t *testing.T) {
cat := newTestManager().Tracker("user1")
assert.Equal(t, 5, cat.MaxCardinality(), "Expected max cardinality mismatch")
tracker := newTestManager().Tracker("user1")
assert.Equal(t, 5, tracker.MaxCardinality(), "Expected max cardinality mismatch")
}

func Test_CreateCleanupTracker(t *testing.T) {
tManager := newTestManager()
cat := tManager.Tracker("user4")
tracker := tManager.Tracker("user4")

reg := prometheus.NewRegistry()
err := reg.Register(tManager)
require.NoError(t, err)

cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), time.Unix(1, 0))
cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "2"), time.Unix(2, 0))
cat.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3"))
cat.IncrementReceivedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 5, time.Unix(4, 0))
cat.IncrementDiscardedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 2, "sample-out-of-order", time.Unix(4, 0))
cat.IncrementActiveSeries(labels.FromStrings("platform", "bar", "tenant", "user4", "team", "2"), time.Unix(6, 0))
cat.IncrementActiveSeriesFailure(1)
tracker.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), time.Unix(1, 0))
tracker.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "2"), time.Unix(2, 0))
tracker.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3"))
tracker.IncrementReceivedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 5, time.Unix(4, 0))
tracker.IncrementDiscardedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 2, "sample-out-of-order", time.Unix(4, 0))
tracker.IncrementActiveSeries(labels.FromStrings("platform", "bar", "tenant", "user4", "team", "2"), time.Unix(6, 0))
tracker.IncrementActiveSeriesFailure(1)

expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
Expand All @@ -64,7 +64,7 @@ func Test_CreateCleanupTracker(t *testing.T) {
"cortex_ingester_attributed_active_series_failure",
}
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
assert.Equal(t, []string{"foo"}, cat.InactiveObservations(5))
assert.Equal(t, []string{"foo"}, tracker.InactiveObservations(5))
assert.NoError(t, tManager.purgeInactiveAttributionsUntil(5))

expectedMetrics = `
Expand All @@ -81,29 +81,29 @@ func Test_CreateCleanupTracker(t *testing.T) {
}

func Test_UpdateCounters(t *testing.T) {
cat := newTestManager().Tracker("user3")
tracker := newTestManager().Tracker("user3")
lbls1 := labels.FromStrings("department", "foo", "service", "bar")
lbls2 := labels.FromStrings("department", "bar", "service", "baz")
lbls3 := labels.FromStrings("department", "baz", "service", "foo")

cat.updateCounters(lbls1, 1, 1, 0, 0, nil)
assert.Equal(t, Normal, cat.state, "First observation, should not overflow")
tracker.updateCounters(lbls1, 1, 1, 0, 0, nil)
assert.Equal(t, Normal, tracker.state, "First observation, should not overflow")

cat.updateCounters(lbls2, 2, 1, 0, 0, nil)
assert.Equal(t, Normal, cat.state, "Second observation, should not overflow")
tracker.updateCounters(lbls2, 2, 1, 0, 0, nil)
assert.Equal(t, Normal, tracker.state, "Second observation, should not overflow")

cat.updateCounters(lbls3, 3, 1, 0, 0, nil)
assert.Equal(t, Overflow, cat.state, "Third observation, should overflow")
tracker.updateCounters(lbls3, 3, 1, 0, 0, nil)
assert.Equal(t, Overflow, tracker.state, "Third observation, should overflow")

cat.updateCounters(lbls3, 4, 1, 0, 0, nil)
assert.Equal(t, Overflow, cat.state, "Fourth observation, should stay overflow")
tracker.updateCounters(lbls3, 4, 1, 0, 0, nil)
assert.Equal(t, Overflow, tracker.state, "Fourth observation, should stay overflow")

assert.Equal(t, int64(3+cat.cooldownDuration), cat.cooldownUntil.Load(), "CooldownUntil should be updated correctly")
assert.Equal(t, int64(3+tracker.cooldownDuration), tracker.cooldownUntil.Load(), "CooldownUntil should be updated correctly")
}

func Test_GetInactiveObservations(t *testing.T) {
// Setup the test environment: create a tracker for user1 with a "team" label and max cardinality of 5.
cat := newTestManager().Tracker("user1")
tracker := newTestManager().Tracker("user1")

// Create two observations with different last update timestamps.
observations := []labels.Labels{
Expand All @@ -112,54 +112,54 @@ func Test_GetInactiveObservations(t *testing.T) {
labels.FromStrings("team", "baz"),
}
// Simulate samples discarded with different timestamps.
cat.IncrementDiscardedSamples(observations[0], 1, "invalid-metrics-name", time.Unix(1, 0))
cat.IncrementDiscardedSamples(observations[1], 2, "out-of-window-sample", time.Unix(12, 0))
cat.IncrementDiscardedSamples(observations[2], 3, "invalid-metrics-name", time.Unix(20, 0))
tracker.IncrementDiscardedSamples(observations[0], 1, "invalid-metrics-name", time.Unix(1, 0))
tracker.IncrementDiscardedSamples(observations[1], 2, "out-of-window-sample", time.Unix(12, 0))
tracker.IncrementDiscardedSamples(observations[2], 3, "invalid-metrics-name", time.Unix(20, 0))

// Ensure that two observations were successfully added to the tracker.
require.Len(t, cat.observed, 3)
require.Len(t, tracker.observed, 3)

// Purge observations that haven't been updated in the last 10 seconds.
purged := cat.InactiveObservations(0)
purged := tracker.InactiveObservations(0)
require.Len(t, purged, 0)

purged = cat.InactiveObservations(10)
purged = tracker.InactiveObservations(10)
assert.ElementsMatch(t, []string{"foo"}, purged)

purged = cat.InactiveObservations(15)
purged = tracker.InactiveObservations(15)
assert.ElementsMatch(t, []string{"foo", "bar"}, purged)

// Check that the purged observation matches the expected details.
purged = cat.InactiveObservations(25)
purged = tracker.InactiveObservations(25)
assert.ElementsMatch(t, []string{"foo", "bar", "baz"}, purged)
}

func Test_UpdateMaxCardinality(t *testing.T) {
// user1 original max cardinality is 5
cat := newTestManager().Tracker("user1")
cat.UpdateMaxCardinality(2)
assert.Equal(t, 2, cat.MaxCardinality(), "Expected max cardinality update to 2")
tracker := newTestManager().Tracker("user1")
tracker.UpdateMaxCardinality(2)
assert.Equal(t, 2, tracker.MaxCardinality(), "Expected max cardinality update to 2")
}

func Test_Concurrency(t *testing.T) {
m := newTestManager()
cat := m.Tracker("user1")
tracker := m.Tracker("user1")

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
lbls := labels.FromStrings("team", string(rune('A'+(i%26))))
cat.updateCounters(lbls, int64(i), 1, 0, 0, nil)
tracker.updateCounters(lbls, int64(i), 1, 0, 0, nil)
}(i)
}
wg.Wait()

// Verify no data races or inconsistencies
assert.True(t, len(cat.observed) > 0, "Observed set should not be empty after concurrent updates")
assert.LessOrEqual(t, len(cat.observed), 2*cat.MaxCardinality(), "Observed count should not exceed 2 times of max cardinality")
assert.Equal(t, Overflow, cat.state, "Expected state to be Overflow")
assert.True(t, len(tracker.observed) > 0, "Observed set should not be empty after concurrent updates")
assert.LessOrEqual(t, len(tracker.observed), 2*tracker.MaxCardinality(), "Observed count should not exceed 2 times of max cardinality")
assert.Equal(t, Overflow, tracker.state, "Expected state to be Overflow")

expectedMetrics := `
# HELP cortex_ingester_attributed_active_series The total number of active series per user and attribution.
Expand Down

0 comments on commit c020be0

Please sign in to comment.