Skip to content

Commit

Permalink
lwcapi: support shard dimensions for cluster scoping (#1623)
Browse files Browse the repository at this point in the history
Update cluster matching logic to consider the nf.shard1 and
nf.shard2 dimensions. Helps ensure more precise matching with
clusters using these newer aspects.
  • Loading branch information
brharrington authored Mar 6, 2024
1 parent 3a7dcb1 commit 36024f3
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ExpressionSplitter(config: Config) {

import ExpressionSplitter.*

private val keepKeys = Set("nf.app", "nf.stack", "nf.cluster")
private val keepKeys = Set("nf.app", "nf.cluster", "nf.shard1", "nf.shard2", "nf.stack")

private val interpreter = Interpreter(new CustomVocabulary(config).allWords)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,23 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging {
def subscriptionsForCluster(cluster: String): List[Subscription] = {
val group = ServerGroup.parse(cluster)
val tags = Map.newBuilder[String, String]
tags += ("nf.cluster" -> group.cluster)
if (group.app != null)
tags += ("nf.app" -> group.app)
if (group.stack != null)
tags += ("nf.stack" -> group.stack)
addIfNotNull(tags, "nf.cluster", group.cluster)
addIfNotNull(tags, "nf.app", group.app)
addIfNotNull(tags, "nf.stack", group.stack)
addIfNotNull(tags, "nf.shard1", group.shard1)
addIfNotNull(tags, "nf.shard2", group.shard2)
queryIndex.matchingEntries(tags.result())
}

private def addIfNotNull(
builder: scala.collection.mutable.Builder[(String, String), Map[String, String]],
key: String,
value: String
): Unit = {
if (value != null)
builder += (key -> value)
}

/**
* Return all of the subscriptions that are in use for a given stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,29 @@ class SubscriptionManagerSuite extends FunSuite {
checkSubsForCluster("name,exp1,:eq,nf.stack,dev,:eq,:and,:sum", "www-dev")
}

test("subscriptions for cluster, stack no match") {
assertEquals(subsForCluster("name,exp1,:eq,nf.stack,dev2,:eq,:and,:sum", "www-dev"), Nil)
}

test("subscriptions for cluster, shard1") {
checkSubsForCluster("name,exp1,:eq,nf.shard1,foo,:eq,:and,:sum", "www-dev-x1foo")
}

test("subscriptions for cluster, shard1 no match") {
assertEquals(subsForCluster("name,exp1,:eq,nf.shard1,foo2,:eq,:and,:sum", "www-dev-x1foo"), Nil)
}

test("subscriptions for cluster, shard2") {
checkSubsForCluster("name,exp1,:eq,nf.shard2,bar,:eq,:and,:sum", "www-dev-x1foo-x2bar")
}

test("subscriptions for cluster, shard2 no match") {
assertEquals(
subsForCluster("name,exp1,:eq,nf.shard2,bar2,:eq,:and,:sum", "www-dev-x1foo-x2bar"),
Nil
)
}

test("subscribe to unknown stream") {
val sm = new SubscriptionManager[Integer](new NoopRegistry)
intercept[IllegalStateException] {
Expand Down

0 comments on commit 36024f3

Please sign in to comment.