diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/index/QueryIndex.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/index/QueryIndex.scala
deleted file mode 100644
index 22abb1fb4..000000000
--- a/atlas-core/src/main/scala/com/netflix/atlas/core/index/QueryIndex.scala
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Copyright 2014-2024 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.netflix.atlas.core.index
-
-import com.netflix.atlas.core.model.Query
-import com.netflix.atlas.core.util.SmallHashMap
-
-/**
- * Index for quickly matching a set of tags against many query expressions. The intended use-case
- * is for stream processing. If a stream of tagged data points is flowing through the system
- * and we have thousands of queries, then we need efficient ways to:
- *
- * 1. Check if a datapoint is a match to any of the queries. This can be used to quickly filter
- *    out data that isn't going to be needed.
- * 2. Figure out which queries or expressions match a given datapoint.
- *
- * @param indexes
- *     Map of :eq query to a sub-index. This is used to recursively search the set after finding
- *     the first match.
- * @param entries
- *     Entries that remain after checking all the simple :eq queries. This list will be searched
- *     using a linear scan to get final matching with regex or other more complicated query
- *     clauses.
- */
-case class QueryIndex[T](
-  indexes: SmallHashMap[Query.Equal, QueryIndex[T]],
-  entries: Array[QueryIndex.Entry[T]]
-) {
-
-  import QueryIndex.*
-
-  private def createQueryArray(tags: Map[String, String]): Array[Query.Equal] = {
-    val n = tags.size
-    var queries = queryArrays.get()
-    if (queries == null || queries.length < n) {
-      queries = new Array[Query.Equal](n)
-      queryArrays.set(queries)
-    }
-
-    var i = 0
-    tags.foreachEntry { (k, v) =>
-      queries(i) = Query.Equal(k, v)
-      i += 1
-    }
-    queries
-  }
-
-  /** Returns true if the tags match any of the queries in the index. */
-  def matches(tags: Map[String, String]): Boolean = {
-    val queries = createQueryArray(tags)
-    val result = matches(tags, queries, 0, tags.size)
-    // clear array to ensure entries are eligible for GC
-    java.util.Arrays.fill(queries.asInstanceOf[Array[AnyRef]], null)
-    result
-  }
-
-  private def matches(
-    tags: Map[String, String],
-    queries: Array[Query.Equal],
-    i: Int,
-    n: Int
-  ): Boolean = {
-    if (i < n) {
-      val q = queries(i)
-      val qt = indexes.getOrNull(q)
-      val children = if (qt != null) qt.matches(tags, queries, i + 1, n) else false
-      children || entriesExists(tags) || matches(tags, queries, i + 1, n)
-    } else {
-      entriesExists(tags)
-    }
-  }
-
-  /** Finds the set of items that match the provided tags. */
-  def matchingEntries(tags: Map[String, String]): List[T] = {
-    val queries = createQueryArray(tags)
-    val result = matchingEntries(tags, queries, 0, tags.size).distinct
-    // clear array to ensure entries are eligible for GC
-    java.util.Arrays.fill(queries.asInstanceOf[Array[AnyRef]], null)
-    result
-  }
-
-  private def matchingEntries(
-    tags: Map[String, String],
-    queries: Array[Query.Equal],
-    i: Int,
-    n: Int
-  ): List[T] = {
-    if (i < n) {
-      val q = queries(i)
-      val qt = indexes.getOrNull(q)
-      val children = if (qt != null) qt.matchingEntries(tags, queries, i + 1, n) else Nil
-      children ::: entriesFilter(tags) ::: matchingEntries(tags, queries, i + 1, n)
-    } else {
-      entriesFilter(tags)
-    }
-  }
-
-  /** Performance optimization for: entries.exists(_.query.matches(tags)) */
-  private def entriesExists(tags: Map[String, String]): Boolean = {
-    var i = 0
-    while (i < entries.length) {
-      if (entries(i).query.matches(tags)) {
-        return true
-      }
-      i += 1
-    }
-    false
-  }
-
-  /** Performance optimization for: entries.filter(_.query.matches(tags)).map(_.value) */
-  private def entriesFilter(tags: Map[String, String]): List[T] = {
-    var result = List.empty[T]
-    var i = 0
-    while (i < entries.length) {
-      if (entries(i).query.matches(tags)) {
-        result = entries(i).value :: result
-      }
-      i += 1
-    }
-    result
-  }
-
-  /**
-   * Creates a string representation of the index tree. Warning: this can be large if many queries
-   * are indexed.
-   */
-  override def toString: String = {
-    val buf = new java.lang.StringBuilder
-    append(buf, 0)
-    buf.toString
-  }
-
-  private def append(buf: java.lang.StringBuilder, indent: Int): Unit = {
-    val pad1 = " " * indent
-    val pad2 = " " * (indent + 1)
-    buf.append(pad1).append("children\n")
-    indexes.foreach {
-      case (k, child) =>
-        buf.append(pad2).append(k).append('\n')
-        child.append(buf, indent + 2)
-    }
-    buf.append(pad1).append("queries\n")
-    entries.foreach { e =>
-      buf.append(pad2).append(e.query).append('\n')
-    }
-  }
-}
-
-/**
- * Helper for building an index.
- */
-object QueryIndex {
-
-  type IndexMap[T <: Any] = scala.collection.mutable.AnyRefMap[AnyRef, QueryIndex[T]]
-
-  case class Entry[T](query: Query, value: T)
-
-  private case class AnnotatedEntry[T](entry: Entry[T], filters: Set[Query.Equal]) {
-
-    def toList: List[(Query.Equal, AnnotatedEntry[T])] = {
-      filters.toList.map(q => q -> AnnotatedEntry(entry, filters - q))
-    }
-  }
-
-  /**
-   * Create an index based on a list of queries. The value for the entry will be the raw input
-   * query.
-   */
-  def apply(queries: List[Query]): QueryIndex[Query] = {
-    create(queries.map(q => Entry(q, q)))
-  }
-
-  /**
-   * Create an index based on a list of entries.
-   */
-  def create[T](entries: List[Entry[T]]): QueryIndex[T] = {
-    val annotated = entries.flatMap { entry =>
-      val qs = Query.dnfList(entry.query).flatMap(q => Query.expandInClauses(q))
-      qs.map(q => annotate(Entry(q, entry.value)))
-    }
-    val idxMap = new IndexMap[T]
-    createImpl(idxMap, annotated)
-  }
-
-  /**
-   * Recursively build the index.
-   */
-  private def createImpl[T](
-    idxMap: IndexMap[T],
-    entries: List[AnnotatedEntry[T]]
-  ): QueryIndex[T] = {
-    idxMap.get(entries) match {
-      case Some(idx) => idx
-      case None =>
-        val (children, leaf) = entries.partition(_.filters.nonEmpty)
-        val trees = children.flatMap(_.toList).groupBy(_._1).map {
-          case (q, ts) =>
-            q -> createImpl(idxMap, ts.map(_._2))
-        }
-        val idx = QueryIndex(smallMap(trees), leaf.map(_.entry).toArray)
-        idxMap += entries -> idx
-        idx
-    }
-  }
-
-  /**
-   * Convert to a SmallHashMap to get a more compact memory representation.
-   */
-  private def smallMap[T](
-    m: Map[Query.Equal, QueryIndex[T]]
-  ): SmallHashMap[Query.Equal, QueryIndex[T]] = {
-
-    // Otherwise, convert to a SmallHashMap. Note that default apply will create a
-    // map with the exact size of the input to optimize for memory use. This results
-    // in terrible performance for lookups of items that are not in the map because
-    // the entire array has to be scanned.
-    //
-    // In this case we use the builder and give 2x the size of the input so there
-    // will be 50% unused entries. Since we expect many misses this gives us better
-    // performance and memory overhead isn't too bad. It is still much lower than
-    // default immutable map since we don't need entry objects.
-    val size = m.size * 2
-    val builder = new SmallHashMap.Builder[Query.Equal, QueryIndex[T]](size)
-    builder.addAll(m)
-    val sm = builder.result
-    sm
-  }
-
-  /**
-   * Convert a query into a list of query clauses that are ANDed together.
-   */
-  private def conjunctionList(query: Query): List[Query] = {
-    query match {
-      case Query.And(q1, q2) => conjunctionList(q1) ::: conjunctionList(q2)
-      case q                 => List(q)
-    }
-  }
-
-  /**
-   * Annotate an entry with a set of :eq queries that should filter in the input before checking
-   * against the final remaining query. Ideally if the query is only using :eq and :and the final
-   * remainder will be :true.
-   */
-  private def annotate[T](entry: Entry[T]): AnnotatedEntry[T] = {
-    val distinct = conjunctionList(entry.query).distinct
-    val filters = distinct.collect { case q: Query.Equal => q }
-    val remainder = distinct.collect { case q if !q.isInstanceOf[Query.Equal] => q }
-    val remainderQ =
-      if (remainder.isEmpty) Query.True
-      else
-        remainder.reduce { (a, b) =>
-          Query.And(a, b)
-        }
-    AnnotatedEntry(Entry(remainderQ, entry.value), filters.toSet)
-  }
-
-  /** Arrays for the set of queries that can be reused across calls to avoid allocation. */
-  private val queryArrays = new ThreadLocal[Array[Query.Equal]]
-}
diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/DataExpr.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/DataExpr.scala
index 7df7a085b..a090449fc 100644
--- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/DataExpr.scala
+++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/DataExpr.scala
@@ -16,10 +16,9 @@
 package com.netflix.atlas.core.model
 
 import java.time.Duration
-
 import com.netflix.atlas.core.model.ConsolidationFunction.SumOrAvgCf
 import com.netflix.atlas.core.util.Math
-import com.netflix.atlas.core.util.SmallHashMap
+import com.netflix.atlas.core.util.SortedTagMap
 import com.netflix.atlas.core.util.Strings
 
 sealed trait DataExpr extends TimeSeriesExpr with Product {
@@ -76,7 +75,7 @@ sealed trait DataExpr extends TimeSeriesExpr with Product {
 
 object DataExpr {
 
-  private val unknown = SmallHashMap("name" -> "unknown")
+  private val unknown = SortedTagMap("name" -> "unknown")
 
   private def defaultLabel(expr: DataExpr, ts: TimeSeries): String = {
     val label = expr match {
diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/ItemIdCalculator.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/ItemIdCalculator.scala
index fbdf7eb7f..d2cdc5da7 100644
--- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/ItemIdCalculator.scala
+++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/ItemIdCalculator.scala
@@ -17,7 +17,6 @@ package com.netflix.atlas.core.model
 
 import com.netflix.atlas.core.util.ArrayHelper
 import com.netflix.atlas.core.util.Hash
-import com.netflix.atlas.core.util.SmallHashMap
 import com.netflix.atlas.core.util.SortedTagMap
 
 import java.nio.ByteBuffer
@@ -61,16 +60,6 @@ class ItemIdCalculator {
     tags match {
       case ts: SortedTagMap =>
         ts.copyToArray(pairs)
-      case ts: SmallHashMap[String, String] =>
-        var pos = 0
-        val iter = ts.entriesIterator
-        while (iter.hasNext) {
-          pairs(pos) = iter.key
-          pairs(pos + 1) = iter.value
-          iter.nextEntry()
-          pos += 2
-        }
-        ArrayHelper.sortPairs(pairs, length)
       case _ =>
         var pos = 0
         tags.foreachEntry { (k, v) =>
diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/Query.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/Query.scala
index 6c6ed7cde..ffde56741 100644
--- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/Query.scala
+++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/Query.scala
@@ -16,7 +16,7 @@
 package com.netflix.atlas.core.model
 
 import com.netflix.atlas.core.stacklang.Interpreter
-import com.netflix.atlas.core.util.SmallHashMap
+import com.netflix.atlas.core.util.SortedTagMap
 import com.netflix.spectator.impl.PatternMatcher
 
 sealed trait Query extends Expr {
@@ -285,7 +285,7 @@
 
     def matches(tags: Map[String, String]): Boolean = {
       tags match {
-        case ts: SmallHashMap[String, String] =>
+        case ts: SortedTagMap =>
           val v = ts.getOrNull(k)
           v != null && check(v)
         case _ =>
@@ -294,18 +294,12 @@
     }
 
     def matchesAny(tags: Map[String, List[String]]): Boolean = {
-      tags match {
-        case ts: SmallHashMap[String, ?] =>
-          val vs = ts.getOrNull(k).asInstanceOf[List[String]]
-          vs != null && vs.exists(check)
-        case _ =>
-          tags.get(k).exists(_.exists(check))
-      }
+      tags.get(k).exists(_.exists(check))
     }
 
     def couldMatch(tags: Map[String, String]): Boolean = {
       tags match {
-        case ts: SmallHashMap[String, String] =>
+        case ts: SortedTagMap =>
           val v = ts.getOrNull(k)
           v == null || check(v)
         case _ =>
diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/TaggedItem.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/TaggedItem.scala
index 4789341c7..287d82e2f 100644
--- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/TaggedItem.scala
+++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/TaggedItem.scala
@@ -17,7 +17,7 @@ package com.netflix.atlas.core.model
 
 import com.netflix.atlas.core.util.InternMap
 import com.netflix.atlas.core.util.Interner
-import com.netflix.atlas.core.util.SmallHashMap
+import com.netflix.atlas.core.util.SortedTagMap
 
 /**
  * Helper functions for manipulating tagged items.
@@ -57,7 +57,7 @@
     val iter = tags.iterator.map { t =>
       strInterner.intern(t._1) -> strInterner.intern(t._2)
     }
-    val smallMap = SmallHashMap(tags.size, iter)
+    val smallMap = SortedTagMap(iter)
     tagsInterner.intern(smallMap)
   }
diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/util/IdMap.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/util/IdMap.scala
index 130185629..a5f5f60b6 100644
--- a/atlas-core/src/main/scala/com/netflix/atlas/core/util/IdMap.scala
+++ b/atlas-core/src/main/scala/com/netflix/atlas/core/util/IdMap.scala
@@ -25,11 +25,11 @@ import com.netflix.spectator.api.Utils
 final case class IdMap(id: Id) extends scala.collection.immutable.Map[String, String] {
 
   override def removed(key: String): Map[String, String] = {
-    SmallHashMap(this).removed(key)
+    SortedTagMap(this).removed(key)
   }
 
   override def updated[V1 >: String](key: String, value: V1): Map[String, V1] = {
-    SmallHashMap(this).updated(key, value)
+    SortedTagMap(this).updated(key, value)
   }
 
   override def get(key: String): Option[String] = {
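Several of the hunks above follow the same pattern: the type-test fast path that previously dispatched on SmallHashMap now dispatches on SortedTagMap, so the hot path can use getOrNull and skip the Option allocation of Map.get. A minimal sketch of the pattern, assuming only the getOrNull behavior shown in this diff:

```scala
import com.netflix.atlas.core.util.SortedTagMap

def hasMatchingValue(tags: Map[String, String], k: String, check: String => Boolean): Boolean = {
  tags match {
    case ts: SortedTagMap =>
      val v = ts.getOrNull(k) // no Option allocated on the hot path
      v != null && check(v)
    case _ =>
      tags.get(k).exists(check) // generic fallback allocates Some(v)
  }
}
```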
diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/util/SmallHashMap.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/util/SmallHashMap.scala
deleted file mode 100644
index 0cd1c6d3d..000000000
--- a/atlas-core/src/main/scala/com/netflix/atlas/core/util/SmallHashMap.scala
+++ /dev/null
@@ -1,507 +0,0 @@
-/*
- * Copyright 2014-2024 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.netflix.atlas.core.util
-
-object SmallHashMap {
-
-  def empty[K <: Any, V <: Any]: SmallHashMap[K, V] = new SmallHashMap[K, V](Array.empty, 0)
-
-  def apply[K <: Any, V <: Any](ts: (K, V)*): SmallHashMap[K, V] = {
-    apply(ts.size, ts.iterator)
-  }
-
-  def apply[K <: Any, V <: Any](ts: Iterable[(K, V)]): SmallHashMap[K, V] = {
-    val seq = ts.toSeq
-    apply(seq.size, seq.iterator)
-  }
-
-  def apply[K <: Any, V <: Any](length: Int, iter: Iterator[(K, V)]): SmallHashMap[K, V] = {
-    val b = new Builder[K, V](length)
-    while (iter.hasNext) {
-      val t = iter.next()
-      b.add(t._1, t._2)
-    }
-    b.result
-  }
-
-  class Builder[K <: Any, V <: Any](size: Int) {
-
-    private val buf = new Array[Any](size * 2)
-    private var actualSize = 0
-
-    def +=(pair: (K, V)): Unit = add(pair._1, pair._2)
-
-    @scala.annotation.nowarn
-    def add(k: K, v: V): Builder[K, V] = {
-      val pos = Hash.absOrZero(k.hashCode) % size
-      var i = pos
-      var ki = buf(i * 2)
-      var keq = k.equals(ki)
-      while (ki != null && !keq) {
-        i = (i + 1) % size
-        require(i != pos, "data array is full")
-        ki = buf(i * 2)
-        keq = k.equals(ki)
-      }
-
-      if (keq) {
-        buf(i * 2) = k
-        buf(i * 2 + 1) = v
-      } else {
-        require(buf(i * 2) == null, "position has already been filled")
-        buf(i * 2) = k
-        buf(i * 2 + 1) = v
-        actualSize += 1
-      }
-      this
-    }
-
-    def addAll(m: Map[K, V]): Builder[K, V] = {
-      m match {
-        case sm: SmallHashMap[?, ?] =>
-          var i = 0
-          while (i < sm.data.length) {
-            val k = sm.data(i).asInstanceOf[K]
-            if (k != null) add(k, sm.data(i + 1).asInstanceOf[V])
-            i += 2
-          }
-        case _ =>
-          m.foreach { t =>
-            add(t._1, t._2)
-          }
-      }
-      this
-    }
-
-    def result: SmallHashMap[K, V] = {
-      new SmallHashMap[K, V](buf, actualSize)
-    }
-
-    def compact: SmallHashMap[K, V] = {
-      if (actualSize == size) {
-        new SmallHashMap[K, V](buf, actualSize)
-      } else {
-        val b = new Builder[K, V](actualSize)
-        var i = 0
-        while (i < buf.length) {
-          if (buf(i) != null) b.add(buf(i).asInstanceOf[K], buf(i + 1).asInstanceOf[V])
-          i += 2
-        }
-        b.result
-      }
-    }
-  }
-
-  class EntryIterator[K <: Any, V <: Any](map: SmallHashMap[K, V]) extends Iterator[(K, V)] {
-
-    private final val len = map.data.length
-    var pos = 0
-    skipEmptyEntries()
-
-    def hasNext: Boolean = pos < len
-
-    def next(): (K, V) = {
-      val t = key -> value
-      nextEntry()
-      t
-    }
-
-    def nextEntry(): Unit = {
-      pos += 2
-      skipEmptyEntries()
-    }
-
-    private def skipEmptyEntries(): Unit = {
-      while (pos < len && map.data(pos) == null) {
-        pos += 2
-      }
-    }
-
-    @inline def key: K = map.data(pos).asInstanceOf[K]
-
-    @inline def value: V = map.data(pos + 1).asInstanceOf[V]
-  }
-}
-
-/**
- * Simple immutable hash map implementation intended for use-cases where the number of entries is
- * known to be small. This implementation is backed by a single array and uses open addressing with
- * linear probing to resolve conflicts. The underlying array is created to exactly fit the data
- * size so hash collisions tend to be around 50%, but have a fairly low number of probes to find
- * the actual entry. With a cheap equals function for the keys lookups should be fast and there
- * is low memory overhead.
- *
- * You probably don't want to use this implementation if you expect more than around 50 keys in the
- * map. If you have millions of small immutable maps, such as tag data associated with metrics,
- * it may be a good fit.
- *
- * @param data array with the items
- * @param dataLength number of pairs contained within the array starting at index 0.
- */
-final class SmallHashMap[K <: Any, V <: Any] private (val data: Array[Any], dataLength: Int)
-    extends scala.collection.immutable.Map[K, V] {
-
-  require(data.length % 2 == 0)
-
-  private[this] var cachedHashCode: Int = 0
-
-  @inline
-  private def hash(k: Any): Int = {
-    val capacity = data.length / 2
-    Hash.absOrZero(k.hashCode) % capacity
-  }
-
-  @scala.annotation.nowarn
-  def getOrNull(key: K): V = {
-    if (dataLength == 0) return null.asInstanceOf[V]
-    val capacity = data.length / 2
-    val pos = hash(key)
-    var i = pos
-    var ki = data(i * 2)
-    if (ki == null || ki.equals(key)) {
-      data(i * 2 + 1).asInstanceOf[V]
-    } else {
-      var found = false
-      i = (i + 1) % capacity
-      ki = data(i * 2)
-      found = ki == null || ki.equals(key)
-      while (!found && i != pos) {
-        i = (i + 1) % capacity
-        ki = data(i * 2)
-        found = ki == null || ki.equals(key)
-      }
-      val v = if (found) data(i * 2 + 1) else null
-      v.asInstanceOf[V]
-    }
-  }
-
-  override def get(key: K): Option[V] = Option(getOrNull(key))
-
-  override def contains(key: K): Boolean = {
-    getOrNull(key) != null
-  }
-
-  override def foreach[U](f: ((K, V)) => U): Unit = {
-    var i = 0
-    while (i < data.length) {
-      if (data(i) != null) f(data(i).asInstanceOf[K] -> data(i + 1).asInstanceOf[V])
-      i += 2
-    }
-  }
-
-  /**
-   * Call the function `f` for each tuple in the map without requiring a temporary object to be
-   * created.
-   */
-  override def foreachEntry[U](f: (K, V) => U): Unit = {
-    var i = 0
-    while (i < data.length) {
-      if (data(i) != null) f(data(i).asInstanceOf[K], data(i + 1).asInstanceOf[V])
-      i += 2
-    }
-  }
-
-  /**
-   * Call the function `f` for each tuple in the map without requiring a temporary object to be
-   * created.
-   */
-  @deprecated("Use `foreachEntry` instead.", "1.7.0")
-  def foreachItem(f: (K, V) => Unit): Unit = {
-    foreachEntry(f)
-  }
-
-  def find(f: (K, V) => Boolean): Option[(K, V)] = {
-    var i = 0
-    while (i < data.length) {
-      if (data(i) != null && f(data(i).asInstanceOf[K], data(i + 1).asInstanceOf[V])) {
-        return Some(data(i).asInstanceOf[K] -> data(i + 1).asInstanceOf[V])
-      }
-      i += 2
-    }
-    None
-  }
-
-  def entriesIterator: SmallHashMap.EntryIterator[K, V] = {
-    new SmallHashMap.EntryIterator[K, V](this)
-  }
-
-  def iterator: Iterator[(K, V)] = entriesIterator
-
-  override def keysIterator: Iterator[K] = new Iterator[K] {
-
-    private val iter = entriesIterator
-
-    def hasNext: Boolean = iter.hasNext
-
-    def next(): K = {
-      val k = iter.key
-      iter.nextEntry()
-      k
-    }
-  }
-
-  override def valuesIterator: Iterator[V] = new Iterator[V] {
-
-    private val iter = entriesIterator
-
-    def hasNext: Boolean = iter.hasNext
-
-    def next(): V = {
-      val v = iter.value
-      iter.nextEntry()
-      v
-    }
-  }
-
-  /**
-   * Returns the number of keys that are not in the correct position based on their hash code.
-   */
-  def numCollisions: Int = {
-    var count = 0
-    var i = 0
-    while (i < data.length) {
-      if (data(i) != null && hash(data(i)) != i / 2) count += 1
-      i += 2
-    }
-    count
-  }
-
-  /**
-   * Returns the average number of probes that are required for looking up keys in this map. In
-   * general we want this number to be less than N/4. If we naively did a linear scan of the
-   * full data it would be N/2.
-   */
-  def numProbesPerKey: Double = {
-    val capacity = data.length / 2
-    var total = 0
-    keys.foreach { k =>
-      var i = hash(k)
-      while (!areEqual(data(i * 2), k)) {
-        total += 1
-        i = (i + 1) % capacity
-      }
-    }
-    total.toDouble / dataLength
-  }
-
-  def updated[V1 >: V](k: K, v: V1): collection.immutable.Map[K, V1] = {
-    val b = new SmallHashMap.Builder[K, V1](size + 1)
-    var i = 0
-    while (i < data.length) {
-      if (data(i) != null) {
-        b.add(data(i).asInstanceOf[K], data(i + 1).asInstanceOf[V])
-      }
-      i += 2
-    }
-    b.add(k, v)
-    b.result
-  }
-
-  def removed(key: K): collection.immutable.Map[K, V] = {
-    if (contains(key)) {
-      val b = new SmallHashMap.Builder[K, V](size - 1)
-      var i = 0
-      while (i < data.length) {
-        val k = data(i).asInstanceOf[K]
-        if (k != null && !areEqual(key, k)) {
-          b.add(k, data(i + 1).asInstanceOf[V])
-        }
-        i += 2
-      }
-      b.result
-    } else {
-      this
-    }
-  }
-
-  def ++(m: Map[K, V]): collection.immutable.Map[K, V] = {
-    val b = new SmallHashMap.Builder[K, V](size + m.size)
-    b.addAll(this)
-    b.addAll(m)
-    b.result
-  }
-
-  /** Constant time operation to check if the map is empty */
-  override def isEmpty: Boolean = dataLength == 0
-
-  /** Constant time operation to get the number of pairs in the map. */
-  override def size: Int = dataLength
-
-  /**
-   * Overridden to get better performance. See SmallHashMapHashCode benchmark for a
-   * comparison with the default for various inputs.
-   */
-  override def hashCode: Int = {
-
-    // Pattern copied from String.java of jdk
-    if (cachedHashCode == 0) {
-      cachedHashCode = computeHashCode
-    }
-    cachedHashCode
-  }
-
-  /**
-   * Compute the hash code for the map. This method is based on the
-   * [[scala.util.hashing.MurmurHash3.unorderedHash()]] method. It is more efficient
-   * for our purposes because it avoids creating tons of [[scala.runtime.IntRef]]
-   * objects as well as tuples during iteration.
-   */
-  private[util] def computeHashCode: Int = {
-    var a, b = 0
-    var c = 1
-    var i = 0
-    while (i < data.length) {
-      if (data(i) != null) {
-        val h = data(i).hashCode
-        a += h
-        b ^= h
-        if (h != 0) c *= h
-      }
-      i += 1
-    }
-    var h = 0x3C074A61
-    h = scala.util.hashing.MurmurHash3.mix(h, a)
-    h = scala.util.hashing.MurmurHash3.mix(h, b)
-    h = scala.util.hashing.MurmurHash3.mixLast(h, c)
-    scala.util.hashing.MurmurHash3.finalizeHash(h, dataLength)
-  }
-
-  /**
-   * Overridden to get better performance. See SmallHashMapEquals benchmark for a
-   * comparison with the default for various inputs.
-   */
-  override def equals(obj: Any): Boolean = {
-    if (obj == null) return false
-    obj match {
-      case m: SmallHashMap[?, ?] =>
-        // The hashCode is cached for this class and will often be a cheaper way to
-        // exclude equality.
-        if (this eq m) return true
-        size == m.size && hashCode == m.hashCode && dataEquals(m.asInstanceOf[SmallHashMap[K, V]])
-      case _: Map[?, ?] =>
-        super.equals(obj)
-      case _ =>
-        false
-    }
-  }
-
-  /**
-   * Compares the data arrays of the two maps. It is assumed that cheaper checks such
-   * as the sizes of the arrays have already been resolved before this method is called.
-   * This method will loop through the array and compare corresponding entries. If the
-   * keys do not match, then it will do a lookup in the other map to check for corresponding
-   * values.
-   *
-   * In practice, the maps are often being created from the same input source and therefore
-   * have the same insertion order. In those cases the array equality will work fine and no
-   * lookups will be needed.
-   */
-  private[util] def dataEquals(m: SmallHashMap[K, V]): Boolean = {
-    var i = 0
-    while (i < data.length) {
-      val k1 = data(i).asInstanceOf[K]
-      val k2 = m.data(i).asInstanceOf[K]
-
-      if (areEqual(k1, k2)) {
-        val v1 = data(i + 1)
-        val v2 = m.data(i + 1)
-        if (!areEqual(v1, v2)) return false
-      } else {
-        if (!keyEquals(m, k1) || !keyEquals(m, k2)) return false
-      }
-
-      i += 2
-    }
-    true
-  }
-
-  private def keyEquals(m: SmallHashMap[K, V], k: K): Boolean = {
-    if (k == null) true
-    else {
-      val v1 = getOrNull(k)
-      val v2 = m.getOrNull(k)
-      v1 == v2
-    }
-  }
-
-  /**
-   * Helper function to avoid BoxesRunTime.equals. This function should be easy for
-   * hotspot to inline.
-   */
-  @scala.annotation.nowarn
-  private def areEqual[T](v1: T, v2: T): Boolean = {
-    (v1 == null && v2 == null) || (v1 != null && v1.equals(v2))
-  }
-
-  /** This is here to allow for testing and benchmarks. Should not be used otherwise. */
-  private[util] def superEquals(obj: Any): Boolean = super.equals(obj)
-
-  /**
-   * Returns a wrapper that adheres to the java Map interface. This wrapper helps to avoid
-   * unnecessary allocation of Option
-   */
-  def asJavaMap: java.util.Map[K, V] = {
-    val self = this
-    new java.util.AbstractMap[K, V] {
-
-      /** Overridden to use the `getOrNull` call and avoid allocating an Option. */
-      override def get(k: AnyRef): V = self.getOrNull(k.asInstanceOf[K])
-
-      /** Overridden to use the `getOrNull` call and avoid allocating an Option. */
-      override def containsKey(k: AnyRef): Boolean = self.getOrNull(k.asInstanceOf[K]) != null
-
-      /**
-       * Overridden to avoid allocating memory for each entry when iterating over the map.
-       */
-      override def entrySet(): java.util.Set[java.util.Map.Entry[K, V]] = {
-        new java.util.AbstractSet[java.util.Map.Entry[K, V]] {
-          override def size(): Int = self.size
-
-          override def iterator(): java.util.Iterator[java.util.Map.Entry[K, V]] = {
-            new java.util.Iterator[java.util.Map.Entry[K, V]] with java.util.Map.Entry[K, V] {
-              private[this] val it = entriesIterator
-              private[this] var key: K = _
-              private[this] var value: V = _
-
-              override def hasNext: Boolean = it.hasNext
-
-              override def next(): java.util.Map.Entry[K, V] = {
-                key = it.key
-                value = it.value
-                it.nextEntry()
-                this
-              }
-
-              override def getKey: K = key
-
-              override def getValue: V = value
-
-              override def setValue(value: V): V = {
-                throw new UnsupportedOperationException("setValue")
-              }
-            }
-          }
-        }
-      }
-
-      /**
-       * Overridden for efficiency to avoid need to allocate entry set iterator with default
-       * implementation.
-       */
-      override def size(): Int = self.size
-    }
-  }
-}
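With SmallHashMap gone, lookups go through SortedTagMap, which keeps the pairs in a flat array sorted by key and locates entries with a binary search rather than open addressing. A self-contained sketch of that lookup strategy (an illustration of the approach, not the actual SortedTagMap source):

```scala
// data = [k0, v0, k1, v1, ...] with keys in ascending order
final class MiniSortedTags(data: Array[String]) {

  def getOrNull(key: String): String = {
    var lo = 0
    var hi = data.length / 2 - 1
    while (lo <= hi) {
      val mid = (lo + hi) >>> 1 // midpoint without overflow
      val cmp = data(mid * 2).compareTo(key)
      if (cmp == 0) return data(mid * 2 + 1)
      if (cmp < 0) lo = mid + 1 else hi = mid - 1
    }
    null // key not present
  }
}
```

This trades the near-constant probe count of the hash layout for O(log n) compares, but needs no empty slots, gives a deterministic iteration order, and avoids the pathological full-array scans on misses described in the deleted smallMap comment.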
*/ diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/validation/TagRule.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/validation/TagRule.scala index 4072e357c..3c572fba5 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/validation/TagRule.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/validation/TagRule.scala @@ -16,7 +16,6 @@ package com.netflix.atlas.core.validation import com.netflix.atlas.core.util.IdMap -import com.netflix.atlas.core.util.SmallHashMap import com.netflix.atlas.core.util.SortedTagMap import com.netflix.spectator.api.Id @@ -25,16 +24,6 @@ import com.netflix.spectator.api.Id */ trait TagRule extends Rule { - override def validate(tags: SmallHashMap[String, String]): ValidationResult = { - val iter = tags.entriesIterator - while (iter.hasNext) { - val result = validate(iter.key, iter.value) - if (result != TagRule.Pass) return failure(result, tags) - iter.nextEntry() - } - ValidationResult.Pass - } - override def validate(tags: SortedTagMap): ValidationResult = { val size = tags.size var i = 0 diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/index/QueryIndexSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/index/QueryIndexSuite.scala deleted file mode 100644 index 6a711cf87..000000000 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/index/QueryIndexSuite.scala +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Copyright 2014-2024 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/index/QueryIndexSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/index/QueryIndexSuite.scala
deleted file mode 100644
index 6a711cf87..000000000
--- a/atlas-core/src/test/scala/com/netflix/atlas/core/index/QueryIndexSuite.scala
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Copyright 2014-2024 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.netflix.atlas.core.index
-
-import java.net.URI
-import com.netflix.atlas.core.model.DataExpr
-import com.netflix.atlas.core.model.MathExpr
-import com.netflix.atlas.core.model.ModelExtractors
-import com.netflix.atlas.core.model.Query
-import com.netflix.atlas.core.model.StyleVocabulary
-import com.netflix.atlas.core.model.TimeSeriesExpr
-import com.netflix.atlas.core.stacklang.Interpreter
-import com.netflix.atlas.core.util.SmallHashMap
-import com.netflix.atlas.core.util.Streams
-import org.openjdk.jol.info.GraphLayout
-import munit.FunSuite
-
-import scala.util.Using
-
-class QueryIndexSuite extends FunSuite {
-
-  private def matches[T](index: QueryIndex[T], tags: Map[String, String]): Boolean = {
-    val r1 = index.matches(tags)
-    val r2 = index.matches(SmallHashMap(tags))
-    assertEquals(r1, r2)
-    r1
-  }
-
-  private def matchingEntries[T](index: QueryIndex[T], tags: Map[String, String]): List[T] = {
-    val r1 = index.matchingEntries(tags).sortWith(_.toString < _.toString)
-    val r2 = index.matchingEntries(SmallHashMap(tags)).sortWith(_.toString < _.toString)
-    assertEquals(r1, r2)
-    r1
-  }
-
-  test("empty") {
-    val index = QueryIndex(Nil)
-    assert(!matches(index, Map.empty))
-    assert(!matches(index, Map("a" -> "1")))
-  }
-
-  test("matchingEntries empty") {
-    val index = QueryIndex(Nil)
-    assert(matchingEntries(index, Map.empty).isEmpty)
-    assert(matchingEntries(index, Map("a" -> "1")).isEmpty)
-  }
-
-  test("single query: simple") {
-    val q = Query.And(Query.Equal("a", "1"), Query.Equal("b", "2"))
-    val index = QueryIndex(List(q))
-
-    // Not all tags are present
-    assert(!matches(index, Map.empty))
-    assert(!matches(index, Map("a" -> "1")))
-
-    // matches
-    assert(matches(index, Map("a" -> "1", "b" -> "2")))
-    assert(matches(index, Map("a" -> "1", "b" -> "2", "c" -> "3")))
-
-    // a doesn't match
-    assert(!matches(index, Map("a" -> "2", "b" -> "2", "c" -> "3")))
-
-    // b doesn't match
-    assert(!matches(index, Map("a" -> "1", "b" -> "3", "c" -> "3")))
-  }
-
-  test("matchingEntries single query: simple") {
-    val q = Query.And(Query.Equal("a", "1"), Query.Equal("b", "2"))
-    val index = QueryIndex(List(q))
-
-    // Not all tags are present
-    assert(matchingEntries(index, Map.empty).isEmpty)
-    assert(matchingEntries(index, Map("a" -> "1")).isEmpty)
-
-    // matches
-    assertEquals(matchingEntries(index, Map("a" -> "1", "b" -> "2")), List(q))
-    assertEquals(matchingEntries(index, Map("a" -> "1", "b" -> "2", "c" -> "3")), List(q))
-
-    // a doesn't match
-    assert(matchingEntries(index, Map("a" -> "2", "b" -> "2", "c" -> "3")).isEmpty)
-
-    // b doesn't match
-    assert(matchingEntries(index, Map("a" -> "1", "b" -> "3", "c" -> "3")).isEmpty)
-  }
-
-  test("single query: complex") {
-    val q = Query.And(Query.And(Query.Equal("a", "1"), Query.Equal("b", "2")), Query.HasKey("c"))
-    val index = QueryIndex(List(q))
-
-    // Not all tags are present
-    assert(!matches(index, Map.empty))
-    assert(!matches(index, Map("a" -> "1")))
-    assert(!matches(index, Map("a" -> "1", "b" -> "2")))
-
-    // matches
-    assert(matches(index, Map("a" -> "1", "b" -> "2", "c" -> "3")))
-
-    // a doesn't match
-    assert(!matches(index, Map("a" -> "2", "b" -> "2", "c" -> "3")))
-
-    // b doesn't match
-    assert(!matches(index, Map("a" -> "1", "b" -> "3", "c" -> "3")))
-  }
-
-  test("single query: in expansion is limited") {
-    // If the :in clauses are fully expanded, then this will cause an OOM error because
-    // of the combinatorial explosion of simple queries (10k * 10k * 10k).
-    val q1 = Query.In("a", (0 until 10000).map(_.toString).toList)
-    val q2 = Query.In("b", (0 until 10000).map(_.toString).toList)
-    val q3 = Query.In("c", (0 until 10000).map(_.toString).toList)
-    val q = Query.And(Query.And(q1, q2), q3)
-    val index = QueryIndex(List(q))
-
-    assert(matches(index, Map("a" -> "1", "b" -> "9999", "c" -> "727")))
-    assert(!matches(index, Map("a" -> "1", "b" -> "10000", "c" -> "727")))
-  }
-
-  test("matchingEntries single query: complex") {
-    val q = Query.And(Query.And(Query.Equal("a", "1"), Query.Equal("b", "2")), Query.HasKey("c"))
-    val index = QueryIndex(List(q))
-
-    // Not all tags are present
-    assert(matchingEntries(index, Map.empty).isEmpty)
-    assert(matchingEntries(index, Map("a" -> "1")).isEmpty)
-    assert(matchingEntries(index, Map("a" -> "1", "b" -> "2")).isEmpty)
-
-    // matchingEntries
-    assertEquals(matchingEntries(index, Map("a" -> "1", "b" -> "2", "c" -> "3")), List(q))
-
-    // a doesn't match
-    assert(matchingEntries(index, Map("a" -> "2", "b" -> "2", "c" -> "3")).isEmpty)
-
-    // b doesn't match
-    assert(matchingEntries(index, Map("a" -> "1", "b" -> "3", "c" -> "3")).isEmpty)
-  }
-
-  test("many queries") {
-    // CpuUsage for all instances
-    val cpuUsage = Query.Equal("name", "cpuUsage")
-
-    // DiskUsage query per node
-    val diskUsage = Query.Equal("name", "diskUsage")
-    val diskUsagePerNode = (0 until 100).toList.map { i =>
-      val node = f"i-$i%05d"
-      Query.And(Query.Equal("nf.node", node), diskUsage)
-    }
-
-    val index = QueryIndex(cpuUsage :: diskUsagePerNode)
-
-    // Not all tags are present
-    assert(!matches(index, Map.empty))
-    assert(!matches(index, Map("a" -> "1")))
-
-    // matches
-    assert(matches(index, Map("name" -> "cpuUsage", "nf.node" -> "unknown")))
-    assert(matches(index, Map("name" -> "cpuUsage", "nf.node" -> "i-00099")))
-    assert(matches(index, Map("name" -> "diskUsage", "nf.node" -> "i-00099")))
-
-    // shouldn't match
-    assert(!matches(index, Map("name" -> "diskUsage", "nf.node" -> "unknown")))
-    assert(!matches(index, Map("name" -> "memoryUsage", "nf.node" -> "i-00099")))
-  }
-
-  test("matchingEntries many queries") {
-    // CpuUsage for all instances
-    val cpuUsage = Query.Equal("name", "cpuUsage")
-
-    // DiskUsage query per node
-    val diskUsage = Query.Equal("name", "diskUsage")
-    val diskUsagePerNode = (0 until 100).toList.map { i =>
-      val node = f"i-$i%05d"
-      Query.And(Query.Equal("nf.node", node), diskUsage)
-    }
-
-    val index = QueryIndex(cpuUsage :: diskUsage :: diskUsagePerNode)
-
-    // Not all tags are present
-    assert(matchingEntries(index, Map.empty).isEmpty)
-    assert(matchingEntries(index, Map("a" -> "1")).isEmpty)
-
-    // matchingEntries
-    assertEquals(
-      matchingEntries(index, Map("name" -> "cpuUsage", "nf.node" -> "unknown")),
-      List(cpuUsage)
-    )
-    assertEquals(
-      matchingEntries(index, Map("name" -> "cpuUsage", "nf.node" -> "i-00099")),
-      List(cpuUsage)
-    )
-    assertEquals(
-      matchingEntries(index, Map("name" -> "diskUsage", "nf.node" -> "i-00099")),
-      List(
-        diskUsage,
-        diskUsagePerNode.last
-      )
-    )
-    assertEquals(
-      matchingEntries(index, Map("name" -> "diskUsage", "nf.node" -> "unknown")),
-      List(diskUsage)
-    )
-
-    // shouldn't match
-    assert(matchingEntries(index, Map("name" -> "memoryUsage", "nf.node" -> "i-00099")).isEmpty)
-  }
-
-  test("from list of exprs") {
-    val expr1: TimeSeriesExpr = DataExpr.Sum(Query.Equal("name", "cpuUsage"))
-    val expr2: TimeSeriesExpr =
-      MathExpr.Divide(expr1, DataExpr.Sum(Query.Equal("name", "numCores")))
-    val entries = List(expr1, expr2).flatMap { expr =>
-      expr.dataExprs.map { d =>
-        QueryIndex.Entry(d.query, expr)
-      }
-    }
-    val index = QueryIndex.create(entries)
-
-    assertEquals(matchingEntries(index, Map("name" -> "cpuUsage")).toSet, Set(expr1, expr2))
-    assertEquals(matchingEntries(index, Map("name" -> "numCores")).toSet, Set(expr2))
-  }
-
-  test("queries for both nf.app and nf.cluster") {
-    val appQuery = Query.Equal("nf.app", "testapp")
-    val clusterQuery = Query.Equal("nf.cluster", "testapp-test")
-    val queries = List(appQuery, clusterQuery)
-    val index = QueryIndex(queries)
-
-    val tags = Map("nf.app" -> "testapp", "nf.cluster" -> "testapp-test")
-    assert(matches(index, tags))
-    assertEquals(matchingEntries(index, tags), queries)
-  }
-
-  test("queries for both nf.app w/ nf.cluster miss and nf.cluster") {
-    val appQuery =
-      Query.And(Query.Equal("nf.app", "testapp"), Query.Equal("nf.cluster", "testapp-miss"))
-    val clusterQuery = Query.Equal("nf.cluster", "testapp-test")
-    val queries = List(appQuery, clusterQuery)
-    val index = QueryIndex(queries)
-
-    val tags = Map("nf.app" -> "testapp", "nf.cluster" -> "testapp-test")
-    assert(matches(index, tags))
-    assertEquals(matchingEntries(index, tags), List(clusterQuery))
-  }
-
-  type QueryInterner = scala.collection.mutable.AnyRefMap[Query, Query]
-
-  private def intern(interner: QueryInterner, query: Query): Query = {
-    query match {
-      case Query.True =>
-        query
-      case Query.False =>
-        query
-      case q: Query.Equal =>
-        interner.getOrElseUpdate(q, Query.Equal(q.k.intern(), q.v.intern()))
-      case q: Query.LessThan =>
-        interner.getOrElseUpdate(q, Query.LessThan(q.k.intern(), q.v.intern()))
-      case q: Query.LessThanEqual =>
-        interner.getOrElseUpdate(q, Query.LessThanEqual(q.k.intern(), q.v.intern()))
-      case q: Query.GreaterThan =>
-        interner.getOrElseUpdate(q, Query.GreaterThan(q.k.intern(), q.v.intern()))
-      case q: Query.GreaterThanEqual =>
-        interner.getOrElseUpdate(q, Query.GreaterThanEqual(q.k.intern(), q.v.intern()))
-      case q: Query.Regex =>
-        interner.getOrElseUpdate(q, Query.Regex(q.k.intern(), q.v.intern()))
-      case q: Query.RegexIgnoreCase =>
-        interner.getOrElseUpdate(q, Query.RegexIgnoreCase(q.k.intern(), q.v.intern()))
-      case q: Query.In =>
-        interner.getOrElseUpdate(q, Query.In(q.k.intern(), q.vs.map(_.intern())))
-      case q: Query.HasKey =>
-        interner.getOrElseUpdate(q, Query.HasKey(q.k.intern()))
-      case q: Query.And =>
-        interner.getOrElseUpdate(q, Query.And(intern(interner, q.q1), intern(interner, q.q2)))
-      case q: Query.Or =>
-        interner.getOrElseUpdate(q, Query.Or(intern(interner, q.q1), intern(interner, q.q2)))
-      case q: Query.Not =>
-        interner.getOrElseUpdate(q, Query.Not(intern(interner, q.q)))
-    }
-  }
-
-  private def parse(interner: QueryInterner, s: String): List[Query] = {
-    try {
-      val interpreter = Interpreter(StyleVocabulary.allWords)
-      val queries = interpreter.execute(s).stack.collect {
-        case ModelExtractors.PresentationType(t) =>
-          t.expr.dataExprs.map(e => intern(interner, e.query))
-      }
-      queries.flatten.distinct
-    } catch {
-      case _: Exception => Nil
-    }
-  }
-
-  test("memory".ignore) {
-    val interner = new QueryInterner
-    val queries = Using.resource(Streams.resource("queries.txt")) { in =>
-      Streams.lines(in).toList.flatMap { u =>
-        val uri = URI.create(u.replace("|", "%7C").replace("^", "%5E"))
-        val qstring = uri.getRawQuery
-        if (qstring == null) Nil
-        else {
-          qstring
-            .split("&")
-            .filter(_.startsWith("q="))
-            .map(s => parse(interner, s.substring(2)))
-        }
-      }
-    }
-
-    val inputLayout = GraphLayout.parseInstance(queries)
-    println("INPUT:")
-    println(inputLayout.toFootprint)
-
-    val index = QueryIndex(queries.flatten)
-    val idxLayout = GraphLayout.parseInstance(index)
-    println("INDEX:")
-    println(idxLayout.toFootprint)
-  }
-}
diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/model/QuerySuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/model/QuerySuite.scala
index ac6d2d2a2..3097a9002 100644
--- a/atlas-core/src/test/scala/com/netflix/atlas/core/model/QuerySuite.scala
+++ b/atlas-core/src/test/scala/com/netflix/atlas/core/model/QuerySuite.scala
@@ -15,7 +15,7 @@
  */
 package com.netflix.atlas.core.model
 
-import com.netflix.atlas.core.util.SmallHashMap
+import com.netflix.atlas.core.util.SortedTagMap
 import munit.FunSuite
 
 class QuerySuite extends FunSuite {
@@ -24,19 +24,19 @@ class QuerySuite extends FunSuite {
 
   def matches(q: Query, tags: Map[String, String]): Boolean = {
     val result = q.matches(tags)
-    assertEquals(result, q.matches(SmallHashMap(tags)))
+    assertEquals(result, q.matches(SortedTagMap(tags)))
     result
   }
 
   def matchesAny(q: Query, tags: Map[String, List[String]]): Boolean = {
     val result = q.matchesAny(tags)
-    assertEquals(result, q.matchesAny(SmallHashMap(tags)))
+    assertEquals(result, q.matchesAny(tags))
     result
   }
 
   def couldMatch(q: Query, tags: Map[String, String]): Boolean = {
     val result = q.couldMatch(tags)
-    assertEquals(result, q.couldMatch(SmallHashMap(tags)))
+    assertEquals(result, q.couldMatch(SortedTagMap(tags)))
     result
   }
diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/model/TaggedItemSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/model/TaggedItemSuite.scala
index b515c67bc..672bee056 100644
--- a/atlas-core/src/test/scala/com/netflix/atlas/core/model/TaggedItemSuite.scala
+++ b/atlas-core/src/test/scala/com/netflix/atlas/core/model/TaggedItemSuite.scala
@@ -16,7 +16,6 @@
 package com.netflix.atlas.core.model
 
 import com.netflix.atlas.core.util.Hash
-import com.netflix.atlas.core.util.SmallHashMap
 import com.netflix.atlas.core.util.SortedTagMap
 import munit.FunSuite
 
@@ -54,14 +53,6 @@
     assert(TaggedItem.computeId(t1) != TaggedItem.computeId(t2))
   }
 
-  test("computeId, small hash map") {
-    val t1 = SmallHashMap("name" -> "foo", "cluster" -> "abc", "app" -> "a", "zone" -> "1")
-    val t2 = SmallHashMap("name" -> "foo", "cluster" -> "abc", "app" -> "a", "zone" -> "2")
-    assertEquals(TaggedItem.computeId(t1), expectedId(t1))
-    assertEquals(TaggedItem.computeId(t2), expectedId(t2))
-    assert(TaggedItem.computeId(t1) != TaggedItem.computeId(t2))
-  }
-
   test("computeId, sorted tag map") {
     val t1 = SortedTagMap("name" -> "foo", "cluster" -> "abc", "app" -> "a", "zone" -> "1")
     val t2 = SortedTagMap("name" -> "foo", "cluster" -> "abc", "app" -> "a", "zone" -> "2")
@@ -74,7 +65,7 @@
     // verify buffers grow as expected
     (10 until 10_000 by 1000).foreach { size =>
       val tags = (0 until size).map(i => i.toString -> UUID.randomUUID().toString).toMap
-      val smallTags = SmallHashMap(tags)
+      val smallTags = SortedTagMap(tags)
       assertEquals(TaggedItem.computeId(smallTags), expectedId(tags))
     }
   }
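The updated suites keep one invariant from the old SmallHashMap tests: results must not depend on the concrete map type carrying the tags. A compact parity check in the same style (illustrative):

```scala
import com.netflix.atlas.core.model.TaggedItem
import com.netflix.atlas.core.util.SortedTagMap

val tags = Map("name" -> "foo", "cluster" -> "abc", "app" -> "a", "zone" -> "1")
// The id must be identical whether computed from a default Map or a SortedTagMap.
assert(TaggedItem.computeId(tags) == TaggedItem.computeId(SortedTagMap(tags)))
```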
diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/util/SmallHashMapSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/util/SmallHashMapSuite.scala
deleted file mode 100644
index 095483e95..000000000
--- a/atlas-core/src/test/scala/com/netflix/atlas/core/util/SmallHashMapSuite.scala
+++ /dev/null
@@ -1,486 +0,0 @@
-/*
- * Copyright 2014-2024 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.netflix.atlas.core.util
-
-import java.util.UUID
-
-import munit.FunSuite
-
-import scala.util.Random
-
-class SmallHashMapSuite extends FunSuite {
-
-  // Set of keys taken from prod.us-east-1. This tends to be our biggest region and these are the
-  // actual keys we see in the data.
-  private val keys = List(
-    "action",
-    "app",
-    "asn",
-    "atlas.dstype",
-    "atlas.legacy",
-    "aws.dbname",
-    "aws.elb",
-    "aws.namespace",
-    "aws.queue",
-    "aws.statistic",
-    "aws.topic",
-    "backend",
-    "bitrate",
-    "blobtype",
-    "blocked",
-    "bucket",
-    "cache",
-    "cacheId",
-    "cacheName",
-    "cacheid",
-    "callback",
-    "caller",
-    "category",
-    "cdn",
-    "cdn.org",
-    "cdn.partnerCdn",
-    "cdn.routingCluster",
-    "cdn.site",
-    "cdnId",
-    "cdnid",
-    "cdnreq",
-    "class",
-    "clientApp",
-    "clientMovieId",
-    "clientipver",
-    "cluster",
-    "codePath",
-    "collector",
-    "columnfamily",
-    "command",
-    "contentRegion",
-    "contractId",
-    "controller",
-    "controller.operationalName",
-    "controller.rollup",
-    "country",
-    "currentZone",
-    "custom",
-    "dao",
-    "decision",
-    "decisionType",
-    "def",
-    "detected",
-    "device",
-    "device.operationalName",
-    "device.rollup",
-    "device.rollup.2",
-    "dialableService",
-    "diff",
-    "dltype",
-    "drmtype",
-    "error",
-    "errorType",
-    "esp",
-    "exception",
-    "failureLevel",
-    "findDeviceKeysNoError",
-    "findDeviceKeysPSK",
-    "flavor",
-    "geoequiv_changed_result",
-    "geoequiv_used",
-    "id",
-    "includeType",
-    "ip",
-    "keyId",
-    "keyMovieid",
-    "keySet",
-    "keyVersion",
-    "keyid",
-    "keyspace",
-    "languageTag",
-    "last7",
-    "level",
-    "manifestClusterName",
-    "manufacturer",
-    "maxBitRate",
-    "memtype",
-    "method",
-    "missing",
-    "mode",
-    "model",
-    "module",
-    "movieId",
-    "name",
-    "nas_site",
-    "nccprt",
-    "newService",
-    "nf.ami",
-    "nf.app",
-    "nf.asg",
-    "nf.cluster",
-    "nf.country",
-    "nf.country.rollup",
-    "nf.node",
-    "nf.region",
-    "nf.vmtype",
-    "nf.zone",
-    "niwsClientErrorCode",
-    "nrdp",
-    "op",
-    "operation",
-    "org",
-    "permitted",
-    "primary",
-    "primary.org",
-    "primary.partnerCdn",
-    "primary.routingCluster",
-    "primary.site",
-    "processor",
-    "profileId",
-    "proto",
-    "protocol",
-    "provider",
-    "quality",
-    "reason",
-    "recordset",
-    "redirectFrom",
-    "redirectTo",
-    "redirecthn",
-    "region",
-    "reqhn",
-    "request",
-    "restClient",
-    "result",
-    "routeCluster",
-    "routeClusterId",
-    "selected",
-    "selected.isPrimary",
-    "selected.org",
-    "selected.partnerCdn",
-    "selected.routingCluster",
-    "selected.site",
-    "sequenceCheck",
-    "sequenceCheck.deltaValue",
-    "service",
-    "shard",
-    "shouldRedirect",
-    "signupCountry",
-    "site",
-    "source",
-    "stat",
-    "statistic",
-    "status",
-    "streamState",
-    "streamType",
-    "subcounter",
-    "success",
-    "target",
-    "target.operationalName",
-    "target.rollup",
-    "targetApp",
-    "targetCountry",
-    "targetZone",
-    "testreason",
-    "tracks",
-    "type",
-    "uiver",
-    "unit",
-    "updateType",
-    "viewedText"
-  )
-
-  test("basic operations") {
-    val empty = SmallHashMap.empty[String, String]
-    val m1 = SmallHashMap("k1" -> "v1")
-    val m2 = SmallHashMap("k1" -> "v1", "k2" -> "v2")
-
-    assertEquals(empty.size, 0)
-    assertEquals(m1.size, 1)
-    assertEquals(m2.size, 2)
-
-    assertEquals(m1("k1"), "v1")
-    assert(m1.contains("k1"))
-    intercept[NoSuchElementException] { m1("k2") }
-    assert(!m1.contains("k2"))
-    assertEquals(m2("k1"), "v1")
-    assertEquals(m2("k2"), "v2")
-
-    assertEquals(m1.get("k1"), Some("v1"))
-    assertEquals(m1.get("k2"), None)
-    assertEquals(m2.get("k1"), Some("v1"))
-    assertEquals(m2.get("k2"), Some("v2"))
-
-    val s = m2.toSet
-    m2.foreach { t =>
-      assert(s.contains(t))
-    }
-
-    assertEquals(m2 - "k2", m1)
-  }
-
-  test("retains type after removal of key") {
-    val m1 = SmallHashMap("k1" -> "v1")
-    val m2 = SmallHashMap("k1" -> "v1", "k2" -> "v2")
-    assertEquals(m2 - "k2", m1)
-    assert((m2 - "k2").isInstanceOf[SmallHashMap[?, ?]])
-  }
-
-  test("remove key not in map") {
-    val m1 = SmallHashMap("k1" -> "v1", "k2" -> "v2")
-    assertEquals(m1 - "k3", m1)
-    assert((m1 - "k3").isInstanceOf[SmallHashMap[?, ?]])
-  }
-
-  test("retains type after adding pair") {
-    val m1 = SmallHashMap("k1" -> "v1")
-    val m2 = SmallHashMap("k1" -> "v1", "k2" -> "v2")
-    assertEquals(m1 + ("k2" -> "v2"), m2)
-    assert((m1 + ("k2" -> "v2")).isInstanceOf[SmallHashMap[?, ?]])
-  }
-
-  test("empty map") {
-    val m = SmallHashMap.empty[String, String]
-    assertEquals(m.keySet, Set.empty[String])
-    assertEquals(m.get("k1"), None)
-    assertEquals(m.size, 0)
-  }
-
-  test("map with 1 pair") {
-    val m = SmallHashMap("k1" -> "v1")
-    assertEquals(m.keySet, Set("k1"))
-    assertEquals(m.get("k1"), Some("v1"))
-    assertEquals(m.get("k2"), None)
-    assertEquals(m.size, 1)
-  }
-
-  test("keySet") {
-    val m = SmallHashMap("k1" -> "v1", "k2" -> "v2")
-    assertEquals(m.keySet, Set("k1", "k2"))
-  }
-
-  test("values") {
-    val m = SmallHashMap("k1" -> "v1", "k2" -> "v2")
-    assertEquals(m.values.toSet, Set("v1", "v2"))
-  }
-
-  test("toSet") {
-    val m = SmallHashMap("k1" -> "v1", "k2" -> "v2")
-    assertEquals(m.toSet, Set("k1" -> "v1", "k2" -> "v2"))
-  }
-
-  test("using builder") {
-    val expected = SmallHashMap("k1" -> "v1", "k2" -> "v2")
-    val actual = new SmallHashMap.Builder[String, String](4)
-      .add("k1", "v1")
-      .addAll(Map("k2" -> "v2"))
-      .result
-    assertEquals(expected, actual)
-  }
-
-  private def testNumCollisions(m: SmallHashMap[String, String]): Unit = {
-
-    // printf("%d: %d collisions, %.2f probes%n", m.size, m.numCollisions, m.numProbesPerKey)
-    assert(m.numProbesPerKey < m.size / 4)
-  }
-
-  // Search for strings that have the desired hash value and will result in a
-  // collision.
-  @scala.annotation.tailrec
-  private def findStringWithHash(v: Int, n: Int): String = {
-    val s = UUID.randomUUID().toString
-    val h = Hash.absOrZero(s.hashCode) % n
-    if (h == v) s else findStringWithHash(v, n)
-  }
-
-  test("numProbesPerKey with collision that is positioned > data length") {
-    // empty map, size 0 capacity 42
-    val builder = new SmallHashMap.Builder[String, String](42)
-    builder.add(findStringWithHash(12, 42), "1")
-    builder.add(findStringWithHash(12, 42), "2")
-    val m = builder.result
-    assertEquals(m.numCollisions, 1)
-    assertEquals(m.numProbesPerKey, 0.5)
-  }
-
-  test("numCollisions 25") {
-    val rkeys = Random.shuffle(keys)
-    val m = SmallHashMap(rkeys.take(25).map(v => v -> v)*)
-    testNumCollisions(m)
-    rkeys.take(25).foreach { k =>
-      assertEquals(m.get(k), Some(k))
-    }
-  }
-
-  test("numCollisions 50") {
-    val rkeys = Random.shuffle(keys)
-    val m = SmallHashMap(rkeys.take(50).map(v => v -> v)*)
-    testNumCollisions(m)
-    rkeys.take(50).foreach { k =>
-      assertEquals(m.get(k), Some(k))
-    }
-  }
-
-  test("numCollisions all") {
-    val rkeys = Random.shuffle(keys)
-    val m = SmallHashMap(rkeys.map(v => v -> v)*)
-    testNumCollisions(m)
-    rkeys.foreach { k =>
-      assertEquals(m.get(k), Some(k))
-    }
-  }
-
-  test("equals and hashCode, different orders with gaps") {
-    (0 until 1000).foreach { _ =>
-      val n = Random.nextInt(50)
-      val data = (0 until n).map { _ =>
-        val v = Random.nextInt()
-        v.toString -> v.toString
-      }
-      val m1 = SmallHashMap(100, data.iterator)
-      val m2 = SmallHashMap(100, Random.shuffle(data).iterator)
-      assertEquals(m1.hashCode, m2.hashCode)
-      assertEquals(m1, m2)
-    }
-  }
-
-  test("equals and hashCode, different orders") {
-    (0 until 1000).foreach { _ =>
-      val n = Random.nextInt(50)
-      val data = (0 until n).map { _ =>
-        val v = Random.nextInt()
-        v.toString -> v.toString
-      }
-      val m1 = SmallHashMap(data)
-      val m2 = SmallHashMap(Random.shuffle(data))
-      assertEquals(m1.hashCode, m2.hashCode)
-      assertEquals(m1, m2)
-    }
-  }
-
-  test("equals and hashCode, collisions on random data") {
-    val size = 10000
-    val naive = new IntHashSet(0)
-    val ref = new IntHashSet(0)
-    (0 until size).foreach { _ =>
-      val n = Random.nextInt(50)
-      val data = (0 until n).map { _ =>
-        val v = Random.nextInt()
-        v.toString -> v.toString
-      }
-      val m = SmallHashMap(Random.shuffle(data))
-      naive.add(m.hashCode)
-      ref.add(scala.util.hashing.MurmurHash3.mapHash(m))
-    }
-    check(size, naive.size, ref.size)
-  }
-
-  test("equals and hashCode, collisions on realistic data") {
-    val size = 10000
-    val naive = new IntHashSet(0)
-    val ref = new IntHashSet(0)
-    (0 until size).foreach { i =>
-      val m = SmallHashMap(
-        "nf.app"     -> "atlas_backend",
-        "nf.cluster" -> "atlas_backend-dev",
-        "nf.asg"     -> "atlas_backend-dev-v001",
-        "nf.stack"   -> "dev",
-        "nf.region"  -> "us-east-1",
-        "nf.zone"    -> "us-east-1e",
-        "nf.node"    -> f"i-$i%017x",
-        "nf.ami"     -> "ami-987654321",
-        "nf.vmtype"  -> "r3.2xlarge",
-        "name"       -> "jvm.gc.pause",
-        "cause"      -> "Allocation_Failure",
-        "action"     -> "end_of_major_GC",
-        "statistic"  -> "totalTime"
-      )
-      naive.add(m.hashCode)
-      ref.add(scala.util.hashing.MurmurHash3.mapHash(m))
-    }
-    check(size, naive.size, ref.size)
-  }
-
-  // This map seems to do poorly with the naive hash
-  test("equals and hashCode, collisions on perf test data") {
-    var size = 0
-    val naive = new IntHashSet(0)
-    val ref = new IntHashSet(0)
-    for (i <- 0 until 150; j <- 0 until 1000) {
-      size += 1
-      val m = SmallHashMap(
-        "nf.app"       -> "foo",
-        "nf.cluster"   -> "foo-bar",
-        "nf.asg"       -> "foo-bar-v000",
-        "nf.stack"     -> "bar",
-        "nf.region"    -> "us-east-1",
-        "nf.zone"      -> "us-east-1a",
-        "nf.vmtype"    -> "r3.2xlarge",
-        "name"         -> f"test.metric.$j%08x",
-        "nf.node"      -> f"$i%017x",
-        "atlas.dstype" -> "gauge"
-      )
-      naive.add(m.hashCode)
-      ref.add(scala.util.hashing.MurmurHash3.mapHash(m))
-    }
-    check(size, naive.size, ref.size)
-  }
-
-  private def check(size: Int, naive: Int, ref: Int): Unit = {
-    assert(100.0 * ref / size >= 97.0)
-    assert(100.0 * naive / size >= 97.0)
-  }
-
-  test("dataEquals") {
-    val m1 = SmallHashMap("a" -> "1")
-    val m2 = SmallHashMap("b" -> "2")
-    assert(!m1.dataEquals(m2))
-  }
-
-  test("dataEquals with different sizes") {
-    // dataEquals is internal and expects the sizes to match before being
-    // called. For this test case we are verifying the case where the first
-    // item in the two maps are different, but a lookup for the item from
-    // the first map will work. However, the lookup of the item from the
-    // second map will not.
-    val m1 = SmallHashMap("a" -> "1")
-    val m2 = SmallHashMap("a" -> "1", "c" -> "3")
-    assert(!m1.dataEquals(m2))
-  }
-
-  test("javaMap: get") {
-    val m = SmallHashMap("a" -> "1", "b" -> "2").asJavaMap
-    assertEquals(m.get("a"), "1")
-    assertEquals(m.get("b"), "2")
-    assertEquals(m.get("c"), null)
-  }
-
-  test("javaMap: containsKey") {
-    val m = SmallHashMap("a" -> "1", "b" -> "2").asJavaMap
-    assert(m.containsKey("a"))
-    assert(m.containsKey("b"))
-    assert(!m.containsKey("c"))
-  }
-
-  test("javaMap: entrySet") {
-    val entries = SmallHashMap("a" -> "1", "b" -> "2").asJavaMap.entrySet()
-    assertEquals(entries.size(), 2)
-
-    val it = entries.iterator()
-    while (it.hasNext) {
-      val entry = it.next()
-      entry.getKey match {
-        case "a" => assertEquals(entry.getValue, "1")
-        case "b" => assertEquals(entry.getValue, "2")
-      }
-    }
-  }
-}
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.atlas.eval.util - -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.databind.DeserializationContext -import com.fasterxml.jackson.databind.JsonDeserializer -import com.netflix.atlas.core.util.SmallHashMap - -/** - * Custom deserializer for tag maps to go directly to `SmallHashMap` type. It is assumed - * that each tag map should have a relatively small number of entries. - */ -class SmallHashMapDeserializer extends JsonDeserializer[SmallHashMap[String, String]] { - - override def deserialize( - p: JsonParser, - ctxt: DeserializationContext - ): SmallHashMap[String, String] = { - val builder = new SmallHashMap.Builder[String, String](5) - var k = p.nextFieldName() - while (k != null) { - val v = p.nextTextValue() - builder.add(k, v) - k = p.nextFieldName() - } - builder.result - } -} diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/QueryIndexMatching.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/QueryIndexMatching.scala deleted file mode 100644 index 2da5a5935..000000000 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/QueryIndexMatching.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright 2014-2024 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.atlas.core.index - -import com.netflix.atlas.core.model.Query -import com.netflix.atlas.core.util.SmallHashMap -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.Threads -import org.openjdk.jmh.infra.Blackhole - -/** - * Check to see how query index performs with simple queries based on index size. With similar test - * with real data using 17k alert expressions that decomposed into over 33k query expressions, the - * index was around 1000x faster for processing a metrics payload of 5000 datapoints. The loop took - * around 6 seconds and the index took around 6ms. The real dataset is slower mostly due to more - * regex being used in real queries and not being used in this synthetic data. 
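[Editor's note: for context on where this matching workload goes after the deletion — later in this diff, SubscriptionManager switches from the Atlas QueryIndex to the Spectator one. A minimal sketch of that replacement API, using only the calls that appear in the SubscriptionManager hunk (newInstance, add, forEachMatch); the registry, query string, and tag values here are illustrative, not taken from the patch:]

```scala
import com.netflix.spectator.api.DefaultRegistry
import com.netflix.spectator.atlas.impl.{Parser, QueryIndex}

// Index values can be any type; a plain String stands in here for the
// Subscription objects used by SubscriptionManager.
val registry = new DefaultRegistry
val idx = QueryIndex.newInstance[String](registry)
idx.add(Parser.parseQuery("nf.cluster,skan-test,:eq"), "subscription-1")

// Matching pulls tag values through a lookup function and invokes the
// consumer once per matching value, mirroring the forEachMatch usage
// in the SubscriptionManager changes below.
val tags = java.util.Map.of("nf.cluster", "skan-test", "name", "memUsed")
idx.forEachMatch((k: String) => tags.get(k), (v: String) => println(v))
```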
- * - * ``` - * > jmh:run -prof stack -prof gc -wi 10 -i 10 -f1 -t1 .*QueryIndexMatching.* - * ``` - * - * Initial results: - * - * ``` - * Benchmark Mode Cnt Score Error Units - * QueryIndexMatching.index_100 thrpt 10 1427970.545 ± 42632.895 ops/s - * QueryIndexMatching.index_1000 thrpt 10 1337580.661 ± 113418.137 ops/s - * QueryIndexMatching.index_10000 thrpt 10 1341069.994 ± 104992.441 ops/s - * QueryIndexMatching.index_100000 thrpt 10 1290159.738 ± 76488.013 ops/s - * QueryIndexMatching.loop_100 thrpt 10 714393.977 ± 26067.308 ops/s - * QueryIndexMatching.loop_1000 thrpt 10 68317.877 ± 6006.013 ops/s - * QueryIndexMatching.loop_10000 thrpt 10 3831.356 ± 454.029 ops/s - * QueryIndexMatching.loop_100000 thrpt 10 375.074 ± 30.352 ops/s - * ``` - */ -@State(Scope.Thread) -class QueryIndexMatching { - - // CpuUsage for all instances - private val cpuUsage = Query.Equal("name", "cpuUsage") - - // DiskUsage query per node - private val diskUsage = Query.Equal("name", "diskUsage") - - private def diskUsagePerNode(n: Int): List[Query] = { - (0 until n).toList.map { i => - val node = f"i-$i%05d" - Query.And(Query.Equal("nf.node", node), diskUsage) - } - } - - private val queries_100 = cpuUsage :: diskUsagePerNode(100) - private val index_100 = QueryIndex(queries_100) - - private val queries_1000 = cpuUsage :: diskUsagePerNode(1000) - private val index_1000 = QueryIndex(queries_1000) - - private val queries_10000 = cpuUsage :: diskUsagePerNode(10000) - private val index_10000 = QueryIndex(queries_10000) - - private val queries_100000 = cpuUsage :: diskUsagePerNode(100000) - private val index_100000 = QueryIndex(queries_100000) - - // Sample tag map that doesn't match the above queries. Not matching is often the most expensive - // because it will not be able to short circuit - private val id = Map( - "nf.app" -> "atlas_backend", - "nf.cluster" -> "atlas_backend-dev", - "nf.asg" -> "atlas_backend-dev-v001", - "nf.stack" -> "dev", - "nf.region" -> "us-east-1", - "nf.zone" -> "us-east-1e", - "nf.node" -> "i-123456789", - "nf.ami" -> "ami-987654321", - "nf.vmtype" -> "r3.2xlarge", - "name" -> "jvm.gc.pause", - "cause" -> "Allocation_Failure", - "action" -> "end_of_major_GC", - "statistic" -> "totalTime" - ) - - private val smallId = SmallHashMap(id) - - @Threads(1) - @Benchmark - def loop_100(bh: Blackhole): Unit = { - bh.consume(queries_100.exists(_.matches(id))) - } - - @Threads(1) - @Benchmark - def loop_1000(bh: Blackhole): Unit = { - bh.consume(queries_1000.exists(_.matches(id))) - } - - @Threads(1) - @Benchmark - def loop_10000(bh: Blackhole): Unit = { - bh.consume(queries_10000.exists(_.matches(id))) - } - - @Threads(1) - @Benchmark - def loop_100000(bh: Blackhole): Unit = { - bh.consume(queries_100000.exists(_.matches(id))) - } - - @Threads(1) - @Benchmark - def index_100(bh: Blackhole): Unit = { - bh.consume(index_100.matches(id)) - } - - @Threads(1) - @Benchmark - def index_1000(bh: Blackhole): Unit = { - bh.consume(index_1000.matches(id)) - } - - @Threads(1) - @Benchmark - def index_10000(bh: Blackhole): Unit = { - bh.consume(index_10000.matches(id)) - } - - @Threads(1) - @Benchmark - def index_100000(bh: Blackhole): Unit = { - bh.consume(index_100000.matches(id)) - } - - @Threads(1) - @Benchmark - def index_smallmap_100000(bh: Blackhole): Unit = { - bh.consume(index_100000.matches(smallId)) - } - -} diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/RoaringTagIndexBench.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/RoaringTagIndexBench.scala index 
867a3287a..fe16e161b 100644 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/RoaringTagIndexBench.scala +++ b/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/RoaringTagIndexBench.scala @@ -16,10 +16,9 @@ package com.netflix.atlas.core.index import java.util.UUID - import com.netflix.atlas.core.model.BasicTaggedItem import com.netflix.atlas.core.model.Query -import com.netflix.atlas.core.util.SmallHashMap +import com.netflix.atlas.core.util.SortedTagMap import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope import org.openjdk.jmh.annotations.State @@ -68,7 +67,7 @@ class RoaringTagIndexBench { private val items = (0 until 10000).map { _ => val id = UUID.randomUUID().toString - BasicTaggedItem(SmallHashMap(baseId ++ Map("nf.node" -> id))) // , i.toString -> id)) + BasicTaggedItem(SortedTagMap(baseId ++ Map("nf.node" -> id))) // , i.toString -> id)) } private val index = RoaringTagIndex[BasicTaggedItem](items.toArray, new IndexStats()) diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/model/ComputeId.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/model/ComputeId.scala index 0c347972a..ad782362f 100644 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/model/ComputeId.scala +++ b/atlas-jmh/src/main/scala/com/netflix/atlas/core/model/ComputeId.scala @@ -16,7 +16,6 @@ package com.netflix.atlas.core.model import com.netflix.atlas.core.util.Hash -import com.netflix.atlas.core.util.SmallHashMap import com.netflix.atlas.core.util.SortedTagMap import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope @@ -67,8 +66,6 @@ class ComputeId { "statistic" -> "totalTime" ) - private val smallTagMap = SmallHashMap(tagMap) - private val sortedTagMap = SortedTagMap(tagMap) @Threads(1) @@ -85,12 +82,6 @@ class ComputeId { bh.consume(TaggedItem.computeId(tagMap)) } - @Threads(1) - @Benchmark - def computeIdSmallTagMap(bh: Blackhole): Unit = { - bh.consume(TaggedItem.computeId(smallTagMap)) - } - @Threads(1) @Benchmark def computeIdSortedTagMap(bh: Blackhole): Unit = { diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/model/KeyValueQuery.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/model/KeyValueQuery.scala index 39a669452..c1b5f3e0c 100644 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/model/KeyValueQuery.scala +++ b/atlas-jmh/src/main/scala/com/netflix/atlas/core/model/KeyValueQuery.scala @@ -15,7 +15,7 @@ */ package com.netflix.atlas.core.model -import com.netflix.atlas.core.util.SmallHashMap +import com.netflix.atlas.core.util.SortedTagMap import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope import org.openjdk.jmh.annotations.State @@ -51,7 +51,7 @@ class KeyValueQuery { "statistic" -> "totalTime" ) - private val smallTagMap = SmallHashMap(tagMap) + private val smallTagMap = SortedTagMap(tagMap) private val query = Query.And( Query.Equal("nf.app", "atlas_backend"), diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapEntrySet.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapEntrySet.scala deleted file mode 100644 index 49e9cc27e..000000000 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapEntrySet.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2014-2024 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.atlas.core.util - -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.infra.Blackhole - -/** - * Check the overhead of java wrapper for use-cases that iterate over the entry set. - * - * ``` - * > jmh:run -prof gc -wi 10 -i 10 -f1 -t1 .*SmallHashMapEntrySet.* - * ... - * Benchmark Mode Cnt Score Error Units - * customEntrySet thrpt 10 13480190.254 ± 351370.866 ops/s - * scalaEntrySet thrpt 10 7782130.178 ± 514660.491 ops/s - * - * customEntrySet gc.alloc.rate.norm 10 24.000 ± 0.001 B/op - * scalaEntrySet gc.alloc.rate.norm 10 272.000 ± 0.001 B/op - * ``` - */ -@State(Scope.Thread) -class SmallHashMapEntrySet { - - import scala.jdk.CollectionConverters.* - - private val tagMap = Map( - "nf.app" -> "atlas_backend", - "nf.cluster" -> "atlas_backend-dev", - "nf.asg" -> "atlas_backend-dev-v001", - "nf.stack" -> "dev", - "nf.region" -> "us-east-1", - "nf.zone" -> "us-east-1e", - "nf.node" -> "i-123456789", - "nf.ami" -> "ami-987654321", - "nf.vmtype" -> "r3.2xlarge", - "name" -> "jvm.gc.pause", - "cause" -> "Allocation_Failure", - "action" -> "end_of_major_GC", - "statistic" -> "totalTime" - ) - - private val smallTagMap = SmallHashMap(tagMap) - private val scalaWrapper = smallTagMap.asJava - private val customWrapper = smallTagMap.asJavaMap - - private def traverseMap(bh: Blackhole, m: java.util.Map[String, String]): Unit = { - val it = m.entrySet().iterator() - while (it.hasNext) { - val entry = it.next() - bh.consume(entry.getKey) - bh.consume(entry.getValue) - } - } - - @Benchmark - def scalaEntrySet(bh: Blackhole): Unit = { - traverseMap(bh, scalaWrapper) - } - - @Benchmark - def customEntrySet(bh: Blackhole): Unit = { - traverseMap(bh, customWrapper) - } -} diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapEquals.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapEquals.scala deleted file mode 100644 index b5b95098a..000000000 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapEquals.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright 2014-2024 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.atlas.core.util - -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.Threads -import org.openjdk.jmh.infra.Blackhole - -import scala.util.Random - -/** - * Check performance of comparing small hash maps for equality. 
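[Editor's note: a sketch of the fast path these benchmarks exercise. The strategy is inferred from the benchmark names, the dataEquals tests earlier in this diff, and the hash code caching described in SmallHashMapHashCode below; fastEquals is a hypothetical helper, not the removed implementation verbatim.]

```scala
// Hypothetical illustration (would need to live in the same package as
// SmallHashMap, since dataEquals is internal): short-circuit on reference
// equality, then on size and the cached hash codes, and only then scan the
// backing data. superEquals is the generic element-by-element Map equality
// inherited from the Scala collections, hence the large gap in the results
// that follow.
def fastEquals(a: SmallHashMap[String, String], b: SmallHashMap[String, String]): Boolean =
  (a eq b) || (a.size == b.size && a.hashCode == b.hashCode && a.dataEquals(b))
```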
- * - * ``` - * > jmh:run -prof jmh.extras.JFR -wi 10 -i 10 -f1 -t1 .*SmallHashMapEquals.* - * ... - * [info] Benchmark Mode Cnt Score Error Units - * - * [info] currentEquals thrpt 10 47004215.059 ± 2291847.719 ops/s - * [info] inheritedEquals thrpt 10 2134457.383 ± 111821.520 ops/s - * [info] dataEquals thrpt 10 51326103.645 ± 758297.787 ops/s - * [info] selfEquals thrpt 10 351279563.043 ± 18578115.816 ops/s - * - * [info] currentEqualsNot thrpt 10 273893522.221 ± 7980051.600 ops/s - * [info] inheritedEqualsNot thrpt 10 2341207.187 ± 206356.584 ops/s - * [info] dataEqualsNot thrpt 10 32392263.165 ± 1289262.059 ops/s - * - * [info] currentEqualHashCodes thrpt 10 14601802.119 ± 360902.793 ops/s - * [info] inheritedEqualHashCodes thrpt 10 483515.784 ± 10044.781 ops/s - * ``` - */ -@State(Scope.Thread) -class SmallHashMapEquals { - - private val tagMap = Map( - "nf.app" -> "atlas_backend", - "nf.cluster" -> "atlas_backend-dev", - "nf.asg" -> "atlas_backend-dev-v001", - "nf.stack" -> "dev", - "nf.region" -> "us-east-1", - "nf.zone" -> "us-east-1e", - "nf.node" -> "i-123456789", - "nf.ami" -> "ami-987654321", - "nf.vmtype" -> "r3.2xlarge", - "name" -> "jvm.gc.pause", - "cause" -> "Allocation_Failure", - "action" -> "end_of_major_GC", - "statistic" -> "totalTime" - ) - - private val smallTagMap1 = SmallHashMap(tagMap) - private val smallTagMap2 = SmallHashMap(tagMap) - - private val smallTagMap3 = SmallHashMap(tagMap + ("nf.node" -> "i-987654321")) - - private val (randomMap1, randomMap2) = { - val n = Random.nextInt(50) - val data = (0 until n).map { _ => - val v = Random.nextInt() - v.toString -> v.toString - } - val m1 = SmallHashMap(data) - val m2 = SmallHashMap(Random.shuffle(data)) - m1 -> m2 - } - - @Threads(1) - @Benchmark - def selfEquals(bh: Blackhole): Unit = { - bh.consume(smallTagMap1.equals(smallTagMap1)) - } - - @Threads(1) - @Benchmark - def currentEquals(bh: Blackhole): Unit = { - bh.consume(smallTagMap1.equals(smallTagMap2)) - } - - @Threads(1) - @Benchmark - def inheritedEquals(bh: Blackhole): Unit = { - bh.consume(smallTagMap1.superEquals(smallTagMap2)) - } - - @Threads(1) - @Benchmark - def dataEquals(bh: Blackhole): Unit = { - bh.consume(smallTagMap1.dataEquals(smallTagMap2)) - } - - @Threads(1) - @Benchmark - def currentEqualsNot(bh: Blackhole): Unit = { - bh.consume(smallTagMap1.equals(smallTagMap3)) - } - - @Threads(1) - @Benchmark - def inheritedEqualsNot(bh: Blackhole): Unit = { - bh.consume(smallTagMap1.superEquals(smallTagMap3)) - } - - @Threads(1) - @Benchmark - def dataEqualsNot(bh: Blackhole): Unit = { - bh.consume(smallTagMap1.dataEquals(smallTagMap3)) - } - - @Threads(1) - @Benchmark - def currentEqualHashCodes(bh: Blackhole): Unit = { - bh.consume(randomMap1.equals(randomMap2)) - } - - @Threads(1) - @Benchmark - def inheritedEqualHashCodes(bh: Blackhole): Unit = { - bh.consume(randomMap1.superEquals(randomMap2)) - } -} diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapHashCode.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapHashCode.scala deleted file mode 100644 index c329f305a..000000000 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapHashCode.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2014-2024 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.atlas.core.util - -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.Threads -import org.openjdk.jmh.infra.Blackhole - -/** - * Check performance of computing the hash code for a small hash map. Note it will cache after - * the first run, see test with caching. However, the murmur3* tests are more interesting as - * they indicate the performance for the initial computation which is often in the critical - * path for new tag maps. - * - * ``` - * > jmh:run -prof jmh.extras.JFR -wi 10 -i 10 -f1 -t1 .*SmallHashMapHashCode.* - * ... - * [info] Benchmark Mode Cnt Score Error Units - * [info] computeHashCode thrpt 10 33962020.269 ± 883664.164 ops/s - * [info] currentHashCode thrpt 10 360180789.347 ± 10164654.707 ops/s - * [info] murmur3arrayHash thrpt 10 13861013.249 ± 3160191.522 ops/s - * [info] murmur3mapHash thrpt 10 4067194.458 ± 78185.171 ops/s - * ``` - */ -@State(Scope.Thread) -class SmallHashMapHashCode { - - private val tagMap = Map( - "nf.app" -> "atlas_backend", - "nf.cluster" -> "atlas_backend-dev", - "nf.asg" -> "atlas_backend-dev-v001", - "nf.stack" -> "dev", - "nf.region" -> "us-east-1", - "nf.zone" -> "us-east-1e", - "nf.node" -> "i-123456789", - "nf.ami" -> "ami-987654321", - "nf.vmtype" -> "r3.2xlarge", - "name" -> "jvm.gc.pause", - "cause" -> "Allocation_Failure", - "action" -> "end_of_major_GC", - "statistic" -> "totalTime" - ) - - private val smallTagMap = SmallHashMap(tagMap) - - @Threads(1) - @Benchmark - def currentHashCode(bh: Blackhole): Unit = { - bh.consume(smallTagMap.hashCode) - } - - @Threads(1) - @Benchmark - def computeHashCode(bh: Blackhole): Unit = { - bh.consume(smallTagMap.computeHashCode) - } - - @Threads(1) - @Benchmark - def murmur3mapHash(bh: Blackhole): Unit = { - bh.consume(scala.util.hashing.MurmurHash3.mapHash(smallTagMap)) - } - - @Threads(1) - @Benchmark - def murmur3arrayHash(bh: Blackhole): Unit = { - bh.consume(scala.util.hashing.MurmurHash3.arrayHash(smallTagMap.data)) - } -} diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapJavaGet.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapJavaGet.scala deleted file mode 100644 index 032c737ef..000000000 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapJavaGet.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2014-2024 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.netflix.atlas.core.util - -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.infra.Blackhole - -/** - * Check the overhead of java wrapper for use-cases that perform a lot of get - * calls. - * - * ``` - * > jmh:run -prof gc -wi 10 -i 10 -f1 -t1 .*SmallHashMapJavaGet.* - * ... - * [info] Benchmark Mode Cnt Score Error Units - * [info] customGetFound thrpt 10 94270900.203 ± 5825997.538 ops/s - * [info] customGetNotFound thrpt 10 6799704.339 ± 462769.738 ops/s - * [info] scalaGetFound thrpt 10 85325015.251 ± 4367808.653 ops/s - * [info] scalaGetNotFound thrpt 10 6385962.734 ± 318520.923 ops/s - * ``` - */ -@State(Scope.Thread) -class SmallHashMapJavaGet { - - import scala.jdk.CollectionConverters.* - - private val tagMap = Map( - "nf.app" -> "atlas_backend", - "nf.cluster" -> "atlas_backend-dev", - "nf.asg" -> "atlas_backend-dev-v001", - "nf.stack" -> "dev", - "nf.region" -> "us-east-1", - "nf.zone" -> "us-east-1e", - "nf.node" -> "i-123456789", - "nf.ami" -> "ami-987654321", - "nf.vmtype" -> "r3.2xlarge", - "name" -> "jvm.gc.pause", - "cause" -> "Allocation_Failure", - "action" -> "end_of_major_GC", - "statistic" -> "totalTime" - ) - - private val smallTagMap = SmallHashMap(tagMap) - private val scalaWrapper = smallTagMap.asJava - private val customWrapper = smallTagMap.asJavaMap - - @Benchmark - def scalaGetFound(bh: Blackhole): Unit = { - bh.consume(scalaWrapper.get("name")) - } - - @Benchmark - def customGetFound(bh: Blackhole): Unit = { - bh.consume(customWrapper.get("name")) - } - - @Benchmark - def scalaGetNotFound(bh: Blackhole): Unit = { - bh.consume(scalaWrapper.get("foo")) - } - - @Benchmark - def customGetNotFound(bh: Blackhole): Unit = { - bh.consume(customWrapper.get("foo")) - } -} diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapModify.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapModify.scala deleted file mode 100644 index 0a0aaa7c1..000000000 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/SmallHashMapModify.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2014-2024 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.atlas.core.util - -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.Threads -import org.openjdk.jmh.infra.Blackhole - -/** - * Check performance of creating a copy of the map when adding/removing a single pair. - * - * ``` - * > jmh:run -prof gc -wi 10 -i 10 -f1 -t1 .*SmallHashMapModify.* - * ... 
- * [info] Benchmark Mode Cnt Score Error Units - * [info] SmallHashMapModify.addPair thrpt 5 2541571.202 ± 239240.513 ops/s - * [info] SmallHashMapModify.removePair thrpt 5 3840622.669 ± 439259.261 ops/s - * ``` - */ -@State(Scope.Thread) -class SmallHashMapModify { - - private val tagMap = Map( - "nf.app" -> "atlas_backend", - "nf.cluster" -> "atlas_backend-dev", - "nf.asg" -> "atlas_backend-dev-v001", - "nf.stack" -> "dev", - "nf.region" -> "us-east-1", - "nf.zone" -> "us-east-1e", - "nf.node" -> "i-123456789", - "nf.ami" -> "ami-987654321", - "nf.vmtype" -> "r3.2xlarge", - "name" -> "jvm.gc.pause", - "cause" -> "Allocation_Failure", - "action" -> "end_of_major_GC", - "statistic" -> "totalTime" - ) - - private val smallTagMap = SmallHashMap(tagMap) - - @Threads(1) - @Benchmark - def addPair(bh: Blackhole): Unit = { - bh.consume(smallTagMap + ("foo" -> "bar")) - } - - @Threads(1) - @Benchmark - def removePair(bh: Blackhole): Unit = { - bh.consume(smallTagMap - "nf.ami") - } -} diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/StringSub.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/StringSub.scala index 4678c6d14..37ee4aa14 100644 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/StringSub.scala +++ b/atlas-jmh/src/main/scala/com/netflix/atlas/core/util/StringSub.scala @@ -61,7 +61,7 @@ class StringSub { private val legend = "$(nf.cluster), $nf.asg, $nf.zone, $(name)" - private val tags = SmallHashMap( + private val tags = SortedTagMap( "nf.app" -> "foo-main", "nf.cluster" -> "foo-main", "nf.asg" -> "foo-main-v042", diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/validation/TagRules.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/validation/TagRules.scala index 53fea335d..ddefc4d5c 100644 --- a/atlas-jmh/src/main/scala/com/netflix/atlas/core/validation/TagRules.scala +++ b/atlas-jmh/src/main/scala/com/netflix/atlas/core/validation/TagRules.scala @@ -15,7 +15,6 @@ */ package com.netflix.atlas.core.validation -import com.netflix.atlas.core.util.SmallHashMap import com.netflix.atlas.core.util.SortedTagMap import com.netflix.spectator.impl.AsciiSet import org.openjdk.jmh.annotations.Benchmark @@ -44,7 +43,7 @@ import org.openjdk.jmh.infra.Blackhole @State(Scope.Thread) class TagRules { - private val tags = SmallHashMap( + private val tags = SortedTagMap( "nf.app" -> "atlas_backend", "nf.cluster" -> "atlas_backend-dev", "nf.asg" -> "atlas_backend-dev-v001", @@ -60,8 +59,6 @@ class TagRules { "statistic" -> "totalTime" ) - private val sortedTags = SortedTagMap(tags) - private val rules = List( KeyLengthRule(2, 80), NameValueLengthRule(ValueLengthRule(2, 255), ValueLengthRule(2, 120)), @@ -93,18 +90,8 @@ class TagRules { bh.consume(Rule.validate(tags, rules)) } - @Benchmark - def separateSorted(bh: Blackhole): Unit = { - bh.consume(Rule.validate(sortedTags, rules)) - } - @Benchmark def composite(bh: Blackhole): Unit = { bh.consume(composite.validate(tags)) } - - @Benchmark - def compositeSorted(bh: Blackhole): Unit = { - bh.consume(composite.validate(sortedTags)) - } } diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala index 3e2dad0f1..515b8fb66 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala @@ -24,6 +24,7 @@ import com.netflix.atlas.core.model.StyleExpr import 
com.netflix.atlas.core.model.TraceQuery import com.netflix.atlas.eval.model.ExprType import com.netflix.atlas.eval.stream.ExprInterpreter +import com.netflix.spectator.atlas.impl.Parser import com.netflix.spectator.ipc.ServerGroup import com.typesafe.config.Config @@ -58,61 +59,8 @@ class ExpressionSplitter(config: Config) { .expireAfterAccess(10, TimeUnit.MINUTES) .build[String, Try[List[DataExprMeta]]]() - /** - * Cache used to reduce the memory overhead of the query objects. - */ - private val interner = Caffeine - .newBuilder() - .expireAfterAccess(12, TimeUnit.HOURS) - .build[Query, Query]() - - /** - * On instance types with a lot of cores, the loading cache causes a lot of thread - * contention and most threads are blocked. This just does and get/put which potentially - * recomputes some values, but for this case that is preferable. - */ - private def internQuery(q: Query, newQuery: => Query): Query = { - val cached = interner.getIfPresent(q) - if (cached == null) { - val tmp = newQuery - interner.put(tmp, tmp) - tmp - } else { - cached - } - } - - private[lwcapi] def intern(query: Query): Query = { - query match { - case Query.True => - query - case Query.False => - query - case q: Query.Equal => - internQuery(q, Query.Equal(q.k.intern(), q.v.intern())) - case q: Query.LessThan => - internQuery(q, Query.LessThan(q.k.intern(), q.v.intern())) - case q: Query.LessThanEqual => - internQuery(q, Query.LessThanEqual(q.k.intern(), q.v.intern())) - case q: Query.GreaterThan => - internQuery(q, Query.GreaterThan(q.k.intern(), q.v.intern())) - case q: Query.GreaterThanEqual => - internQuery(q, Query.GreaterThanEqual(q.k.intern(), q.v.intern())) - case q: Query.Regex => - internQuery(q, Query.Regex(q.k.intern(), q.v.intern())) - case q: Query.RegexIgnoreCase => - internQuery(q, Query.RegexIgnoreCase(q.k.intern(), q.v.intern())) - case q: Query.In => - internQuery(q, Query.In(q.k.intern(), q.vs.map(_.intern()))) - case q: Query.HasKey => - internQuery(q, Query.HasKey(q.k.intern())) - case q: Query.And => - internQuery(q, Query.And(intern(q.q1), intern(q.q2))) - case q: Query.Or => - internQuery(q, Query.Or(intern(q.q1), intern(q.q2))) - case q: Query.Not => - internQuery(q, Query.Not(intern(q.q))) - } + private def toSpectatorQuery(query: Query): SpectatorQuery = { + Parser.parseQuery(query.toString) } private def parse(expression: String, exprType: ExprType): Try[List[DataExprMeta]] = Try { @@ -121,7 +69,7 @@ class ExpressionSplitter(config: Config) { case ExprType.EVENTS => parsedExpressions.collect { case e: EventExpr => - val q = intern(compress(e.query)) + val q = toSpectatorQuery(compress(e.query)) DataExprMeta(e.toString, q) } case ExprType.TIME_SERIES => @@ -132,13 +80,13 @@ class ExpressionSplitter(config: Config) { .flatten .distinct .map { e => - val q = intern(compress(e.query)) + val q = toSpectatorQuery(compress(e.query)) DataExprMeta(e.toString, q) } case ExprType.TRACE_EVENTS => parsedExpressions.map { e => // Tracing cannot be scoped to specific infrastructure, always use True - DataExprMeta(e.toString, Query.True) + DataExprMeta(e.toString, MatchesAll) } case ExprType.TRACE_TIME_SERIES => parsedExpressions @@ -150,7 +98,7 @@ class ExpressionSplitter(config: Config) { .distinct .map { e => // Tracing cannot be scoped to specific infrastructure, always use True - DataExprMeta(e.toString, Query.True) + DataExprMeta(e.toString, MatchesAll) } } } @@ -226,5 +174,5 @@ class ExpressionSplitter(config: Config) { } object ExpressionSplitter { - private case class 
DataExprMeta(exprString: String, compressedQuery: Query) + private case class DataExprMeta(exprString: String, compressedQuery: SpectatorQuery) } diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/Subscription.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/Subscription.scala index 7716cbef8..39fca2bdb 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/Subscription.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/Subscription.scala @@ -15,6 +15,4 @@ */ package com.netflix.atlas.lwcapi -import com.netflix.atlas.core.model.Query - -case class Subscription(query: Query, metadata: ExpressionMetadata) +case class Subscription(query: SpectatorQuery, metadata: ExpressionMetadata) diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala index 281bdc787..648a35f5a 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala @@ -18,10 +18,10 @@ package com.netflix.atlas.lwcapi import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.TimeUnit -import com.netflix.atlas.core.index.QueryIndex import com.netflix.atlas.pekko.ThreadPools import com.netflix.spectator.api.Id import com.netflix.spectator.api.Registry +import com.netflix.spectator.atlas.impl.QueryIndex import com.netflix.spectator.ipc.ServerGroup import com.typesafe.scalalogging.StrictLogging @@ -46,7 +46,7 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { private val subHandlers = new ConcurrentHashMap[String, ConcurrentSet[T]]() @volatile private var subscriptionsList = List.empty[Subscription] - @volatile private var queryIndex = QueryIndex.create[Subscription](Nil) + @volatile private var queryIndex = newIndex(Nil) @volatile private var queryListChanged = false // Background process for updating the query index. It is not done inline because rebuilding @@ -56,6 +56,14 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { ex.scheduleWithFixedDelay(() => regenerateQueryIndex(), 1, 1, TimeUnit.SECONDS) ex.scheduleAtFixedRate(() => updateGauges(), 1, 1, TimeUnit.MINUTES) + private def newIndex(subs: List[Subscription]): QueryIndex[Subscription] = { + val idx = QueryIndex.newInstance[Subscription](registry) + subs.foreach { sub => + idx.add(sub.query, sub) + } + idx + } + /** Rebuild the query index if there have been changes since it was last created. 
*/ private[lwcapi] def regenerateQueryIndex(): Unit = { if (queryListChanged) { @@ -66,10 +74,7 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { .flatMap(_.subscriptions) .toList .distinct - val entries = subscriptionsList.map { sub => - QueryIndex.Entry(sub.query, sub) - } - queryIndex = QueryIndex.create(entries) + queryIndex = newIndex(subscriptionsList) } } @@ -238,22 +243,25 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { */ def subscriptionsForCluster(cluster: String): List[Subscription] = { val group = ServerGroup.parse(cluster) - val tags = Map.newBuilder[String, String] + val tags = new java.util.HashMap[String, String] addIfNotNull(tags, "nf.cluster", group.cluster) addIfNotNull(tags, "nf.app", group.app) addIfNotNull(tags, "nf.stack", group.stack) addIfNotNull(tags, "nf.shard1", group.shard1) addIfNotNull(tags, "nf.shard2", group.shard2) - queryIndex.matchingEntries(tags.result()) + + val builder = List.newBuilder[Subscription] + queryIndex.forEachMatch(k => tags.get(k), sub => builder.addOne(sub)) + builder.result() } private def addIfNotNull( - builder: scala.collection.mutable.Builder[(String, String), Map[String, String]], + builder: java.util.HashMap[String, String], key: String, value: String ): Unit = { if (value != null) - builder += (key -> value) + builder.put(key, value) } /** diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/package.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/package.scala new file mode 100644 index 000000000..c7d16f91e --- /dev/null +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/package.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.atlas + +import com.netflix.spectator.atlas.impl.Query + +package object lwcapi { + + type SpectatorQuery = Query + + val MatchesAll: Query = Query.TRUE + + val MatchesNone: Query = Query.FALSE +} diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala index d38fc28e1..c491cfb0a 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala @@ -17,6 +17,7 @@ package com.netflix.atlas.lwcapi import com.netflix.atlas.core.model.Query import com.netflix.atlas.eval.model.ExprType +import com.netflix.spectator.atlas.impl.Parser import com.typesafe.config.ConfigFactory import munit.FunSuite @@ -27,7 +28,7 @@ class ExpressionSplitterSuite extends FunSuite { private val frequency1 = 60000 private val ds1a = "nf.cluster,skan-test,:eq,name,memUsed,:eq,:and,:count,(,nf.node,),:by" private val ds1b = "nf.cluster,skan-test,:eq,name,memUsed,:eq,:and,:sum,(,nf.node,),:by" - private val matchList1 = Query.Equal("nf.cluster", "skan-test") + private val matchList1 = Parser.parseQuery("nf.cluster,skan-test,:eq") private val splitter = new ExpressionSplitter(ConfigFactory.load()) @@ -58,11 +59,11 @@ class ExpressionSplitterSuite extends FunSuite { val actual = splitter.split(expr, ExprType.TRACE_TIME_SERIES, frequency1) val expected = List( Subscription( - Query.True, + MatchesAll, ExpressionMetadata(childExpr(ds1a), ExprType.TRACE_TIME_SERIES, frequency1) ), Subscription( - Query.True, + MatchesAll, ExpressionMetadata(childExpr(ds1b), ExprType.TRACE_TIME_SERIES, frequency1) ) ).reverse @@ -232,27 +233,4 @@ class ExpressionSplitterSuite extends FunSuite { assertEquals(ret, query) } - // - // Interner exerciser - // - test("interner exerciser") { - val tests = List( - Query.True, - Query.False, - Query.Equal("a", "b"), - Query.LessThan("a", "123"), - Query.LessThanEqual("a", "123"), - Query.GreaterThan("a", "123"), - Query.GreaterThanEqual("a", "123"), - Query.Regex("a", "b"), - Query.RegexIgnoreCase("a", "b"), - Query.In("a", List("b", "c")), - Query.HasKey("a"), - Query.And(Query.True, Query.True), - Query.Or(Query.True, Query.True), - Query.Not(Query.True) - ) - tests.foreach(query => assert(splitter.intern(query) == query)) - } - } diff --git a/build.sbt b/build.sbt index 1b7df2295..c39dc4d3c 100644 --- a/build.sbt +++ b/build.sbt @@ -98,6 +98,7 @@ lazy val `atlas-lwcapi` = project .dependsOn(`atlas-pekko`, `atlas-pekko-testkit` % "test", `atlas-core`, `atlas-eval`, `atlas-json`) .settings(libraryDependencies ++= Seq( Dependencies.iepDynConfig, + Dependencies.spectatorAtlas, Dependencies.pekkoTestkit % "test", Dependencies.pekkoHttpTestkit % "test", Dependencies.pekkoStreamTestkit % "test"
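[Editor's note: to close out, a minimal sketch of the tag-map constructions that replace SmallHashMap throughout this diff — varargs and map-based as in the benchmark updates, builder-based as in LwcMessages.parseTags. The tag values are illustrative.]

```scala
import com.netflix.atlas.core.util.SortedTagMap

// Varargs construction, as in the updated TagRules/KeyValueQuery benchmarks:
val tags = SortedTagMap("nf.app" -> "atlas_backend", "name" -> "jvm.gc.pause")

// Construction from an existing Map, as in RoaringTagIndexBench:
val fromMap = SortedTagMap(Map("nf.node" -> "i-12345", "nf.region" -> "us-east-1"))

// Builder-based construction, as in LwcMessages.parseTags:
val builder = new SortedTagMap.Builder(30)
builder.add("nf.app", "atlas_backend")
builder.add("name", "jvm.gc.pause")
val built = builder.result()
```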