diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala index 8a4d60f2b..cf45bc957 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala @@ -44,6 +44,7 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { private val subHandlers = new ConcurrentHashMap[String, ConcurrentSet[T]]() + @volatile private var subscriptionsList = List.empty[Subscription] @volatile private var queryIndex = QueryIndex.create[Subscription](Nil) @volatile private var queryListChanged = false @@ -57,7 +58,13 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { private[lwcapi] def regenerateQueryIndex(): Unit = { if (queryListChanged) { queryListChanged = false - val entries = subscriptions.map { sub => + subscriptionsList = registrations + .values() + .asScala + .flatMap(_.subscriptions) + .toList + .distinct + val entries = subscriptionsList.map { sub => QueryIndex.Entry(sub.query, sub) } queryIndex = QueryIndex.create(entries) @@ -219,12 +226,7 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { * Return the set of all current subscriptions across all streams. */ def subscriptions: List[Subscription] = { - registrations - .values() - .asScala - .flatMap(_.subscriptions) - .toList - .distinct + subscriptionsList } /** diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala index 723d3f88e..7d0db5bef 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala @@ -73,9 +73,11 @@ class SubscriptionManagerSuite extends FunSuite { val exp1 = sub("name,exp1,:eq") sm.subscribe(meta.streamId, exp1) + sm.regenerateQueryIndex() assertEquals(sm.subscriptions, List(exp1)) assert(!sm.register(meta, 1)) + sm.regenerateQueryIndex() assertEquals(sm.subscriptions, List(exp1)) } @@ -86,6 +88,7 @@ class SubscriptionManagerSuite extends FunSuite { val subs = List(sub("name,exp1,:eq"), sub("name,exp2,:eq")) sm.subscribe(meta.streamId, subs) + sm.regenerateQueryIndex() assertEquals(sm.subscriptions.toSet, subs.toSet) } @@ -98,6 +101,7 @@ class SubscriptionManagerSuite extends FunSuite { val s = sub("name,exp1,:eq") sm.subscribe(meta.streamId, s) sm.subscribe(meta.streamId, s) + sm.regenerateQueryIndex() assertEquals(sm.subscriptions, List(s)) } @@ -112,6 +116,7 @@ class SubscriptionManagerSuite extends FunSuite { val s = sub("name,exp1,:eq") sm.subscribe(a.streamId, s) sm.subscribe(b.streamId, s) + sm.regenerateQueryIndex() assertEquals(sm.subscriptions, List(s)) assertEquals(sm.subscriptionsForStream(a.streamId), List(s))