diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala index be578cb0a..448500915 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala @@ -40,7 +40,7 @@ class ExpressionSplitter(config: Config) { import ExpressionSplitter.* - private val keepKeys = Set("nf.app", "nf.stack", "nf.cluster") + private val keepKeys = Set("nf.app", "nf.cluster", "nf.shard1", "nf.shard2", "nf.stack") private val interpreter = Interpreter(new CustomVocabulary(config).allWords) diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala index 6d0c8b764..281bdc787 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala @@ -239,14 +239,23 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { def subscriptionsForCluster(cluster: String): List[Subscription] = { val group = ServerGroup.parse(cluster) val tags = Map.newBuilder[String, String] - tags += ("nf.cluster" -> group.cluster) - if (group.app != null) - tags += ("nf.app" -> group.app) - if (group.stack != null) - tags += ("nf.stack" -> group.stack) + addIfNotNull(tags, "nf.cluster", group.cluster) + addIfNotNull(tags, "nf.app", group.app) + addIfNotNull(tags, "nf.stack", group.stack) + addIfNotNull(tags, "nf.shard1", group.shard1) + addIfNotNull(tags, "nf.shard2", group.shard2) queryIndex.matchingEntries(tags.result()) } + private def addIfNotNull( + builder: scala.collection.mutable.Builder[(String, String), Map[String, String]], + key: String, + value: String + ): Unit = { + if (value != null) + builder += (key -> value) + } + /** * Return all of the subscriptions that are in use for a given stream. */ diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala index 24bb3b6e8..a2ec42e53 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala @@ -169,6 +169,26 @@ class SubscriptionManagerSuite extends FunSuite { checkSubsForCluster("name,exp1,:eq,nf.stack,dev,:eq,:and,:sum", "www-dev") } + test("subscriptions for cluster, stack no match") { + assertEquals(subsForCluster("name,exp1,:eq,nf.stack,dev2,:eq,:and,:sum", "www-dev"), Nil) + } + + test("subscriptions for cluster, shard1") { + checkSubsForCluster("name,exp1,:eq,nf.shard1,foo,:eq,:and,:sum", "www-dev-x1foo") + } + + test("subscriptions for cluster, shard1 no match") { + assertEquals(subsForCluster("name,exp1,:eq,nf.shard1,foo2,:eq,:and,:sum", "www-dev-x1foo"), Nil) + } + + test("subscriptions for cluster, shard2") { + checkSubsForCluster("name,exp1,:eq,nf.shard2,bar,:eq,:and,:sum", "www-dev-x1foo-x2bar") + } + + test("subscriptions for cluster, shard2 no match") { + assertEquals(subsForCluster("name,exp1,:eq,nf.shard2,bar2,:eq,:and,:sum", "www-dev-x1foo-x2bar"), Nil) + } + test("subscribe to unknown stream") { val sm = new SubscriptionManager[Integer](new NoopRegistry) intercept[IllegalStateException] {