diff --git a/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/Interval.scala b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/Interval.scala new file mode 100644 index 000000000..0b84a3037 --- /dev/null +++ b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/Interval.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import java.time.Instant + +private[postgres] case class Interval(start: Instant, end: Instant) { + + def overlaps(interval: Interval): Boolean = { + val s1 = start.toEpochMilli + val e1 = end.toEpochMilli + val s2 = interval.start.toEpochMilli + val e2 = interval.end.toEpochMilli + + (s1 >= s2 && s1 <= e2) || (e1 >= s2 && e1 <= e2) || (s1 >= s2 && e1 <= e2) || (s2 >= s1 && e2 <= e1) + } +} + +private[postgres] object Interval { + + def apply(s: Long, e: Long): Interval = { + Interval(Instant.ofEpochMilli(s), Instant.ofEpochMilli(e)) + } +} diff --git a/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/PostgresDatabase.scala b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/PostgresDatabase.scala new file mode 100644 index 000000000..084ba3488 --- /dev/null +++ b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/PostgresDatabase.scala @@ -0,0 +1,130 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.netflix.atlas.core.db.Database +import com.netflix.atlas.core.db.TimeSeriesBuffer +import com.netflix.atlas.core.index.TagIndex +import com.netflix.atlas.core.model.ArrayBlock +import com.netflix.atlas.core.model.DataExpr +import com.netflix.atlas.core.model.EvalContext +import com.netflix.atlas.core.model.Query +import com.netflix.atlas.core.model.TaggedItem +import com.netflix.atlas.core.model.TimeSeries + +import java.sql.ResultSet +import java.sql.Statement +import java.time.Instant +import java.util +import scala.collection.mutable +import scala.util.Using + +/** + * Database implementation that delegates to PostgreSQL with the blocks stored as rows in the + * time based tables. This class only handles queries, the data loading would be managed separately + * and is more specific to the backing long term storage, e.g. S3. + */ +class PostgresDatabase(postgres: PostgresService) extends Database { + + private val blockSize: Int = postgres.config.getInt("atlas.postgres.block-size") + private val step: Long = postgres.config.getDuration("atlas.postgres.step").toMillis + + private val blockDuration = blockSize * step / 1000 + + override val index: TagIndex[? <: TaggedItem] = { + new PostgresTagIndex(postgres) + } + + private def overlappingTimes(stmt: Statement, ctxt: EvalContext): List[Instant] = { + val interval = Interval(ctxt.start, ctxt.end) + val ts = List.newBuilder[Instant] + val rs = stmt.executeQuery(SqlUtils.listTables) + while (rs.next()) { + SqlUtils.extractTime(rs.getString(1)).foreach { t => + if (interval.overlaps(Interval(t, t.plusSeconds(blockDuration)))) { + ts += t + } + } + } + ts.result().distinct + } + + private def extractTags(expr: DataExpr, rs: ResultSet): Map[String, String] = { + expr.finalGrouping.map { k => + k -> rs.getString(k) + }.toMap + } + + private def copyValues(rs: ResultSet, block: ArrayBlock): Unit = { + val sqlArray = rs.getArray("values") + if (sqlArray != null) { + val array = sqlArray.getArray.asInstanceOf[Array[java.lang.Double]] + var i = 0 + while (i < array.length) { + block.buffer(i) = array(i).doubleValue() + i += 1 + } + } else { + // Avoid carrying state over if block data is not overwritten + util.Arrays.fill(block.buffer, Double.NaN) + } + } + + @scala.annotation.tailrec + private def aggr(expr: DataExpr, buffer: TimeSeriesBuffer, block: ArrayBlock): Unit = { + expr match { + case _: DataExpr.Sum => buffer.add(block) + case _: DataExpr.Count => buffer.add(block) + case _: DataExpr.Min => buffer.min(block) + case _: DataExpr.Max => buffer.max(block) + case DataExpr.Consolidation(af, _) => aggr(af, buffer, block) + case DataExpr.GroupBy(af, _) => aggr(af, buffer, block) + case e => throw new MatchError(s"unsupported DataExpr: $e") + } + } + + override def execute(ctxt: EvalContext, expr: DataExpr): List[TimeSeries] = { + val exactTags = Query.tags(expr.query) + val data = mutable.AnyRefMap.empty[Map[String, String], TimeSeriesBuffer] + postgres.runQueries { stmt => + overlappingTimes(stmt, ctxt).foreach { t => + val block = ArrayBlock(t.toEpochMilli, blockSize) + val queries = SqlUtils.dataQueries(t, postgres.tables, expr) + val q = SqlUtils.unionAll(queries) + Using.resource(stmt.executeQuery(q)) { rs => + while (rs.next()) { + val tags = exactTags ++ extractTags(expr, rs) + // The buffer step size always matches the storage for now. This can be optimized once + // the aggegates pushed down to postgres support consolidation while performing the + // aggregation. + val buffer = data.getOrElseUpdate( + tags, + TimeSeriesBuffer(tags, step, ctxt.start, ctxt.end) + ) + copyValues(rs, block) + aggr(expr, buffer, block) + } + } + } + } + + // Consolidate the final result set if needed + if (ctxt.step == step) + data.values.toList + else + data.values.map(_.consolidate(ctxt.step, expr.cf)).toList + } +} diff --git a/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/PostgresService.scala b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/PostgresService.scala new file mode 100644 index 000000000..195281cc8 --- /dev/null +++ b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/PostgresService.scala @@ -0,0 +1,87 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.netflix.iep.service.AbstractService +import com.typesafe.config.Config +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import org.postgresql.copy.CopyManager +import org.postgresql.core.BaseConnection + +import java.sql.Connection +import java.sql.Statement +import scala.util.Using + +/** + * Manage connections to postgres database. + */ +class PostgresService(val config: Config) extends AbstractService { + + val tables: List[TableDefinition] = { + import scala.jdk.CollectionConverters.* + config + .getConfigList("atlas.postgres.tables") + .asScala + .toList + .map(TableDefinition.fromConfig) + } + + private var ds: HikariDataSource = _ + + override def startImpl(): Unit = { + val c = config.getConfig("atlas.postgres") + Class.forName(c.getString("driver")) + val hc = new HikariConfig() + hc.setJdbcUrl(c.getString("url")) + hc.setUsername(c.getString("user")) + hc.setPassword(c.getString("password")) + ds = new HikariDataSource(hc) + + Using.resource(ds.getConnection) { connection => + // Ensure DB is appropriately configured + Using.resource(connection.createStatement()) { stmt => + // Run init statements from config + c.getStringList("init-statements").forEach { sql => + stmt.executeUpdate(sql) + } + + // Setup helper functions + SqlUtils.customFunctions.foreach { sql => + stmt.executeUpdate(sql) + } + } + } + } + + override def stopImpl(): Unit = { + ds.close() + } + + def getConnection: Connection = { + ds.getConnection + } + + def getCopyManager: CopyManager = { + getConnection.unwrap(classOf[BaseConnection]).getCopyAPI + } + + def runQueries[T](f: Statement => T): T = { + Using.resource(getConnection) { connection => + Using.resource(connection.createStatement())(f) + } + } +} diff --git a/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/PostgresTagIndex.scala b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/PostgresTagIndex.scala new file mode 100644 index 000000000..a7657f36a --- /dev/null +++ b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/PostgresTagIndex.scala @@ -0,0 +1,91 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.netflix.atlas.core.index.TagIndex +import com.netflix.atlas.core.index.TagQuery +import com.netflix.atlas.core.model.Tag +import com.netflix.atlas.core.model.TaggedItem + +import java.sql.Statement +import java.time.Instant +import scala.util.Using + +/** Lookups are delegated to PostgreSQL. Some operations like `size` are not supported. */ +class PostgresTagIndex(postgres: PostgresService) extends TagIndex[TaggedItem] { + + override def findTags(query: TagQuery): List[Tag] = { + val k = query.key.get + findValues(query).map(v => Tag(k, v)) + } + + private def times(stmt: Statement): List[Instant] = { + val ts = List.newBuilder[Instant] + val rs = stmt.executeQuery(SqlUtils.listTables) + while (rs.next()) { + SqlUtils.extractTime(rs.getString(1)).foreach(ts.addOne) + } + ts.result().distinct + } + + override def findKeys(query: TagQuery): List[String] = { + Using.resource(postgres.getConnection) { connection => + Using.resource(connection.createStatement()) { stmt => + val queries = times(stmt).flatMap { t => + SqlUtils.keyQueries(t, postgres.tables, query) + } + if (queries.isEmpty) { + Nil + } else { + val q = SqlUtils.union(queries) + val vs = List.newBuilder[String] + val rs = stmt.executeQuery(q) + while (rs.next()) { + vs += rs.getString(1) + } + vs.result().sorted.take(query.limit) + } + } + } + } + + override def findValues(query: TagQuery): List[String] = { + Using.resource(postgres.getConnection) { connection => + Using.resource(connection.createStatement()) { stmt => + val queries = times(stmt).flatMap { t => + SqlUtils.valueQueries(t, postgres.tables, query) + } + if (queries.isEmpty) { + Nil + } else { + val q = SqlUtils.union(queries) + val vs = List.newBuilder[String] + val rs = stmt.executeQuery(q) + while (rs.next()) { + vs += rs.getString(1) + } + vs.result().sorted.take(query.limit) + } + } + } + } + + override def findItems(query: TagQuery): List[TaggedItem] = { + throw new UnsupportedOperationException("") + } + + override def size: Int = 0 +} diff --git a/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/SqlUtils.scala b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/SqlUtils.scala new file mode 100644 index 000000000..70e8d0561 --- /dev/null +++ b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/SqlUtils.scala @@ -0,0 +1,512 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.netflix.atlas.core.index.TagQuery +import com.netflix.atlas.core.model.DataExpr +import com.netflix.atlas.core.model.Query +import com.netflix.spectator.impl.PatternMatcher + +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter + +/** Utilities for working with tables. */ +object SqlUtils { + + private val schema = "atlas" + + private val TableNamePattern = "^.*_([0-9]{12})$".r + + private val suffixFormatter = DateTimeFormatter + .ofPattern("yyyyMMddHHmm") + .withZone(ZoneOffset.UTC) + + /** Convert the time to a suffix string that will be used on the table name. */ + def toSuffix(time: Instant): String = { + suffixFormatter.format(time) + } + + /** Extract the time based on the suffix for the table name. */ + def extractTime(table: String): Option[Instant] = { + table match { + case TableNamePattern(suffix) => Some(parseInstant(suffix)) + case _ => None + } + } + + private def parseInstant(suffix: String): Instant = { + LocalDateTime.parse(suffix, suffixFormatter).toInstant(ZoneOffset.UTC) + } + + def createSchema: String = { + s"create schema if not exists $schema" + } + + def listTables: String = { + s""" + select table_name + from information_schema.tables + where table_schema = '$schema' + and table_type = 'BASE TABLE' + """ + } + + /** + * Create a table if not already present. + * + * @param config + * Configuration for the table schema. + * @param time + * Time suffix to use for the table. Typically tables will be used for a range + * of time and then deleted entirely when past the retention window. + */ + def createTable(config: TableDefinition, time: Instant): String = { + val suffix = toSuffix(time) + val tableName = s"${config.tableName}_$suffix" + val columns = + "values float8[]" :: "tags hstore" :: config.columns.map(c => s"\"$c\" ${config.columnType}") + val columnsStr = columns.mkString(", ") + s"create table if not exists $schema.$tableName($columnsStr)" + } + + /** + * Return a set of key queries for an ASL query expression. + * + * @param time + * Timestamp of interest for the current context. + * @param tables + * Set of tables defined. + * @param tq + * Tag query to map to SQL. + * @return + * Set of SQL queries to lookup the keys. + */ + def keyQueries(time: Instant, tables: List[TableDefinition], tq: TagQuery): List[String] = { + val suffix = toSuffix(time) + val query = tq.query.getOrElse(Query.True) + + tables + .filter { table => + query.couldMatch(table.tags) + } + .map { table => + val cs = table.columns.map(c => s"('${escapeLiteral(c)}')").mkString(", ") + val limit = tq.limit + val offset = s"key > '${escapeLiteral(tq.offset)}'" + s""" + select distinct vs.key as key + from (( + select (each(tags)).key as key + from $schema.${table.tableName}_$suffix + where ${toWhere(table.columns, query)} + ) union (values + $cs + )) as vs + where $offset + order by key + limit $limit + """ + } + } + + /** + * Return a set of value queries for an ASL query expression. + * + * @param time + * Timestamp of interest for the current context. + * @param tables + * Set of tables defined. + * @param tq + * Tag query to map to SQL. + * @return + * Set of SQL queries to lookup the values. + */ + def valueQueries(time: Instant, tables: List[TableDefinition], tq: TagQuery): List[String] = { + require(tq.key.isDefined) + + val suffix = toSuffix(time) + val key = tq.key.get + val query = tq.query.getOrElse(Query.True) + + tables + .filter { table => + query.couldMatch(table.tags) + } + .map { table => + val column = formatColumn(table.columns, key) + val limit = tq.limit + val offset = s"and $column > '${escapeLiteral(tq.offset)}'" + s""" + select distinct $column as "${escapeLiteral(key)}" + from $schema.${table.tableName}_$suffix + where ${toWhere(table.columns, query)} $offset + order by $column + limit $limit + """ + } + } + + /** + * Return a set of data queries for an ASL expression. + * + * @param time + * Timestamp of interest for the current context. + * @param tables + * Set of tables defined. + * @param expr + * Tag query to map to SQL. + * @return + * Set of SQL queries to evaluate a data expression. + */ + def dataQueries(time: Instant, tables: List[TableDefinition], expr: DataExpr): List[String] = { + val suffix = toSuffix(time) + tables + .filter { table => + expr.query.couldMatch(table.tags) + } + .map { table => + if (expr.isGrouped) { + val cs = expr.finalGrouping + .map(c => formatColumn(table.columns, c)) + val selectColumns = cs + .zip(expr.finalGrouping) + .map { + case (column, label) => s"$column as \"${escapeLiteral(label)}\"" + } + .mkString(", ") + val groupByColumns = cs.mkString(", ") + s""" + select $selectColumns, ${toAggr(expr)} + from $schema.${table.tableName}_$suffix + where ${toWhere(table.columns, expr.query)} + group by $groupByColumns + """ + } else { + s""" + select ${toAggr(expr)} + from $schema.${table.tableName}_$suffix + where ${toWhere(table.columns, expr.query)} + """ + } + } + } + + @scala.annotation.tailrec + private def toAggr(expr: DataExpr): String = { + expr match { + case _: DataExpr.Sum => "atlas_aggr_sum(values) as values" + case _: DataExpr.Count => "atlas_aggr_count(values) as values" + case _: DataExpr.Max => "atlas_aggr_max(values) as values" + case _: DataExpr.Min => "atlas_aggr_min(values) as values" + case DataExpr.GroupBy(af, _) => toAggr(af) + case DataExpr.Consolidation(af, _) => toAggr(af) + case e: DataExpr.All => throw new MatchError(s"unsupported DataExpr: $e") + } + } + + private def toWhere(columns: List[String], query: Query): String = { + query match { + case Query.True => "TRUE" + case Query.False => "FALSE" + case Query.And(q1, q2) => s"(${toWhere(columns, q1)}) and (${toWhere(columns, q2)})" + case Query.Or(q1, q2) => s"(${toWhere(columns, q1)}) or (${toWhere(columns, q2)})" + case Query.Not(q) => s"not (${toWhere(columns, q)})" + case Query.HasKey(k) => s"${formatColumn(columns, k)} is not null" + case Query.Equal(k, v) => s"${formatColumn(columns, k)} = '${escapeLiteral(v)}'" + case Query.GreaterThan(k, v) => s"${formatColumn(columns, k)} > '${escapeLiteral(v)}'" + case Query.GreaterThanEqual(k, v) => s"${formatColumn(columns, k)} >= '${escapeLiteral(v)}'" + case Query.LessThan(k, v) => s"${formatColumn(columns, k)} < '${escapeLiteral(v)}'" + case Query.LessThanEqual(k, v) => s"${formatColumn(columns, k)} <= '${escapeLiteral(v)}'" + case r: Query.Regex => toRegexCondition(columns, r.k, r.pattern) + case r: Query.RegexIgnoreCase => toRegexIgnoreCaseCondition(columns, r.k, r.pattern) + case Query.In(k, vs) => toInCondition(columns, k, vs) + } + } + + private def toRegexCondition(columns: List[String], k: String, p: PatternMatcher): String = { + val likePattern = p.toSqlPattern + if (likePattern != null) + s"${formatColumn(columns, k)} like '${escapeLiteral(likePattern)}'" + else + s"${formatColumn(columns, k)} ~ '${escapeLiteral(p.toString)}'" + } + + private def toRegexIgnoreCaseCondition( + columns: List[String], + k: String, + p: PatternMatcher + ): String = { + s"${formatColumn(columns, k)} ~* '${escapeLiteral(p.toString)}'" + } + + private def toInCondition(columns: List[String], k: String, vs: List[String]): String = { + s"${formatColumn(columns, k)} in ('${vs.map(escapeLiteral).mkString("', '")}')" + } + + private def formatColumn(columns: List[String], k: String): String = { + if (columns.contains(k)) + s"\"${escapeLiteral(k)}\"" + else + s"tags -> '${escapeLiteral(k)}'" + } + + private[postgres] def escapeLiteral(str: String): String = { + val buf = new java.lang.StringBuilder(str.length) + org.postgresql.core.Utils.escapeLiteral(buf, str, false) + buf.toString + } + + /** Add operation to use as part of sum aggregation. */ + def addNaN: String = { + """ + create or replace function atlas_add(a float8, b float8) returns float8 as $$ + begin + -- We use null to stand in for NaN. Treats NaN as 0 if the other value is not NaN. + if a is null then + return b; + elsif b is null then + return a; + else + return a + b; + end if; + end; + $$ language plpgsql; + """ + } + + /** Max operation to use as part of aggregation. */ + def maxNaN: String = { + """ + create or replace function atlas_max(a float8, b float8) returns float8 as $$ + begin + -- We use null to stand in for NaN. Treats NaN as 0 if the other value is not NaN. + if a is null then + return b; + elsif b is null then + return a; + elsif a >= b then + return a; + else + return b; + end if; + end; + $$ language plpgsql; + """ + } + + /** Min operation to use as part of aggregation. */ + def minNaN: String = { + """ + create or replace function atlas_min(a float8, b float8) returns float8 as $$ + begin + -- We use null to stand in for NaN. Treats NaN as 0 if the other value is not NaN. + if a is null then + return b; + elsif b is null then + return a; + elsif a <= b then + return a; + else + return b; + end if; + end; + $$ language plpgsql; + """ + } + + /** Add corresponding elements of two arrays. */ + def arrayAdd: String = { + """ + create or replace function atlas_array_add(float8[], float8[]) returns float8[] as $$ + declare + i int; + begin + -- Verify arrays have the same length + if array_length($1, 1) != array_length($2, 1) then + raise exception 'arrays must have the same length'; + end if; + + -- First array is used as the accumulator + for i in 1..array_upper($2, 1) loop + $1[i] := atlas_add($1[i], $2[i]); + end loop; + return $1; + end; + $$ language plpgsql; + """ + } + + /** Count corresponding elements of two arrays. */ + def arrayCount: String = { + """ + create or replace function atlas_array_count(float8[], float8[]) returns float8[] as $$ + declare + acc float8[]; + i int; + begin + -- For first value, initialize the accumulator to 0.0 + acc := $1; + if array_length($1, 1) is null then + acc := array_fill(0.0, ARRAY[array_length($2, 1)]); + end if; + + -- Verify arrays have the same length + if array_length(acc, 1) != array_length($2, 1) then + raise exception 'arrays must have the same length'; + end if; + + -- First array is used as the accumulator + for i in 1..array_upper($2, 1) loop + if $2[i] is not null then + acc[i] := acc[i] + 1.0; + end if; + end loop; + return acc; + end; + $$ language plpgsql; + """ + } + + /** Compute max of corresponding elements of two arrays. */ + def arrayMax: String = { + """ + create or replace function atlas_array_max(float8[], float8[]) returns float8[] as $$ + declare + i int; + begin + -- Verify arrays have the same length + if array_length($1, 1) != array_length($2, 1) then + raise exception 'arrays must have the same length'; + end if; + + -- First array is used as the accumulator + for i in 1..array_upper($2, 1) loop + $1[i] := atlas_max($1[i], $2[i]); + end loop; + return $1; + end; + $$ language plpgsql; + """ + } + + /** Compute min of corresponding elements of two arrays. */ + def arrayMin: String = { + """ + create or replace function atlas_array_min(float8[], float8[]) returns float8[] as $$ + declare + i int; + begin + -- Verify arrays have the same length + if array_length($1, 1) != array_length($2, 1) then + raise exception 'arrays must have the same length'; + end if; + + -- First array is used as the accumulator + for i in 1..array_upper($2, 1) loop + $1[i] := atlas_min($1[i], $2[i]); + end loop; + return $1; + end; + $$ language plpgsql; + """ + } + + /** Convert zero values in an array to nulls. Used with count aggregation. */ + def arrayZeroToNull: String = { + """ + create or replace function atlas_array_ztn(float8[]) returns float8[] as $$ + declare + i int; + begin + for i in 1..array_upper($1, 1) loop + if $1[i] = 0.0 then + $1[i] := null; + end if; + end loop; + return $1; + end; + $$ language plpgsql; + """ + } + + /** Sum aggregation for an array of values. */ + def aggrSum: String = { + """ + create or replace aggregate atlas_aggr_sum(float8[]) ( + sfunc = atlas_array_add, + stype = float8[] + ) + """ + } + + /** Count aggregation for an array of values. */ + def aggrCount: String = { + """ + create or replace aggregate atlas_aggr_count(float8[]) ( + sfunc = atlas_array_count, + stype = float8[], + initcond = '{}', + finalfunc = atlas_array_ztn + ) + """ + } + + /** Max aggregation for an array of values. */ + def aggrMax: String = { + """ + create or replace aggregate atlas_aggr_max(float8[]) ( + sfunc = atlas_array_max, + stype = float8[] + ) + """ + } + + /** Min aggregation for an array of values. */ + def aggrMin: String = { + """ + create or replace aggregate atlas_aggr_min(float8[]) ( + sfunc = atlas_array_min, + stype = float8[] + ) + """ + } + + /** Set of custom functions to simplify usage. */ + def customFunctions: List[String] = List( + addNaN, + maxNaN, + minNaN, + arrayAdd, + arrayCount, + arrayMax, + arrayMin, + arrayZeroToNull, + aggrSum, + aggrCount, + aggrMax, + aggrMin + ) + + def union(queries: List[String]): String = { + queries.mkString("(", ") union (", ")") + } + + def unionAll(queries: List[String]): String = { + queries.mkString("(", ") union all (", ")") + } +} diff --git a/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/TableDefinition.scala b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/TableDefinition.scala new file mode 100644 index 000000000..c1e36de6a --- /dev/null +++ b/atlas-postgres/src/main/scala/com/netflix/atlas/postgres/TableDefinition.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.typesafe.config.Config + +/** + * Definition for a custom table. + * + * @param metricName + * Metric name for the table. A table can be for a specific metric or can use `*` to indicate it is + * a generic table for any data. + * @param tableName + * Base name to use for the table. + * @param columns + * Tag keys to split out as separate columns rather than include in tag map. + * @param columnType + * Type to use for the columns. + */ +case class TableDefinition( + metricName: String, + tableName: String, + columns: List[String], + columnType: String +) { + + def isNameSpecific: Boolean = { + metricName != "*" + } + + val tags: Map[String, String] = { + if (isNameSpecific) Map("name" -> metricName) else Map.empty + } +} + +object TableDefinition { + + def fromConfig(config: Config): TableDefinition = { + import scala.jdk.CollectionConverters.* + TableDefinition( + config.getString("metric-name"), + config.getString("table-name"), + config.getStringList("columns").asScala.toList, + if (config.hasPath("column-type")) config.getString("column-type") else "varchar(255)" + ) + } +} diff --git a/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/IntervalSuite.scala b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/IntervalSuite.scala new file mode 100644 index 000000000..14e4ef765 --- /dev/null +++ b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/IntervalSuite.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import munit.FunSuite + +class IntervalSuite extends FunSuite { + + test("overlaps: none") { + val i1 = Interval(0, 100) + val i2 = Interval(101, 102) + assert(!i1.overlaps(i2)) + } + + test("overlaps: i2 starts before end of i1") { + val i1 = Interval(0, 105) + val i2 = Interval(101, 120) + assert(i1.overlaps(i2)) + } + + test("overlaps: i1 starts before end of i2") { + val i1 = Interval(101, 120) + val i2 = Interval(0, 105) + assert(i1.overlaps(i2)) + } + + test("overlaps: i2 within i1") { + val i1 = Interval(0, 105) + val i2 = Interval(101, 102) + assert(i1.overlaps(i2)) + } + + test("overlaps: i1 within i2") { + val i1 = Interval(101, 102) + val i2 = Interval(10, 150) + assert(i1.overlaps(i2)) + } +} diff --git a/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/PostgresDatabaseSuite.scala b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/PostgresDatabaseSuite.scala new file mode 100644 index 000000000..f7ca46ad0 --- /dev/null +++ b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/PostgresDatabaseSuite.scala @@ -0,0 +1,242 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.netflix.atlas.core.model.ArrayTimeSeq +import com.netflix.atlas.core.model.DataExpr +import com.netflix.atlas.core.model.EvalContext +import com.netflix.atlas.core.model.Query +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +import java.time.Duration +import java.time.Instant + +class PostgresDatabaseSuite extends PostgresSuite { + + private val time = Instant.ofEpochMilli(1647892800000L) + private val step = 60_000 + + override def baseConfig: Config = ConfigFactory.parseString( + """ + |atlas.postgres { + | block-size = 2 + | step = 60s + | tables = [ + | { + | metric-name = "requests" + | table-name = "requests" + | columns = ["nf.app", "nf.node"] + | column-type = "varchar(120)" + | } + | { + | metric-name = "*" + | table-name = "others" + | columns = ["name"] + | column-type = "varchar(255)" + | } + | ] + |} + |""".stripMargin + ) + + private def populateRequestsTable(suffix: String): Unit = { + (0 until 10).foreach { i => + executeUpdate(s""" + |insert into atlas.requests_$suffix + |values (ARRAY[$i, $i], '"name"=>"requests"', 'www', 'i-$i') + |""".stripMargin) + val d = i * 10 + executeUpdate(s""" + |insert into atlas.requests_$suffix + |values (ARRAY[$d, $d], '"name"=>"requests"', 'db', 'i-${i + 10}') + |""".stripMargin) + } + } + + private def populateOthersTable(suffix: String): Unit = { + (0 until 10).foreach { i => + executeUpdate(s""" + |insert into atlas.others_$suffix + |values (ARRAY[$i, $i], '"nf.app"=>"www","nf.node"=>"i-$i"', 'cpu') + |""".stripMargin) + val d = i * 10 + executeUpdate(s""" + |insert into atlas.others_$suffix + |values (ARRAY[$d, $d], '"nf.app"=>"www","nf.node"=>"i-$i"', 'disk') + |""".stripMargin) + } + executeUpdate(s""" + |insert into atlas.others_$suffix + |values (ARRAY[100.0, 100.0], '"nf.app"=>"db","id"=>"postgres"', 'disk') + |""".stripMargin) + } + + private def populateSelectTables(): Unit = { + SqlUtils.customFunctions.foreach(executeUpdate) + dropTables() + val blockStep = Duration.ofMinutes(2) + var blockTime = time + (0 until 3).foreach { _ => + val suffix = SqlUtils.toSuffix(blockTime) + service.tables.foreach(t => executeUpdate(SqlUtils.createTable(t, blockTime))) + populateRequestsTable(suffix) + populateOthersTable(suffix) + blockTime = blockTime.plus(blockStep) + } + } + + private def checkData( + data: Array[Double], + size: Int, + sizeWithData: Int, + expected: Double + ): Unit = { + assertEquals(data.length, size) + var i = 0 + while (i < size) { + if (i < sizeWithData) + assertEquals(data(i), expected, s"position $i") + else + assert(data(i).isNaN, s"position $i") + i += 1 + } + } + + test("sum") { + populateSelectTables() + val db = new PostgresDatabase(service) + val context = EvalContext(time.toEpochMilli, time.plusSeconds(360).toEpochMilli, step) + val expr = DataExpr.Sum(Query.Equal("name", "cpu")) + val results = db.execute(context, expr) + + assertEquals(results.size, 1) + val ts = results.head + assertEquals(ts.tags, Map("name" -> "cpu")) + val seq = ts.data.asInstanceOf[ArrayTimeSeq] + assertEquals(seq.start, context.start) + checkData(seq.data, 7, 6, 45.0) + } + + test("sum, no matching data") { + populateSelectTables() + val db = new PostgresDatabase(service) + val context = EvalContext(time.toEpochMilli, time.plusSeconds(360).toEpochMilli, step) + val expr = DataExpr.Sum(Query.Equal("name", "foo")) + val results = db.execute(context, expr) + + assertEquals(results.size, 1) + val ts = results.head + assertEquals(ts.tags, Map("name" -> "foo")) + val seq = ts.data.asInstanceOf[ArrayTimeSeq] + assertEquals(seq.start, context.start) + checkData(seq.data, 7, 0, Double.NaN) + } + + test("sum, no tables for part of range") { + populateSelectTables() + val db = new PostgresDatabase(service) + val context = EvalContext(time.toEpochMilli, time.plusSeconds(600).toEpochMilli, step) + val expr = DataExpr.Sum(Query.Equal("name", "cpu")) + val results = db.execute(context, expr) + + assertEquals(results.size, 1) + val ts = results.head + assertEquals(ts.tags, Map("name" -> "cpu")) + val seq = ts.data.asInstanceOf[ArrayTimeSeq] + assertEquals(seq.start, context.start) + checkData(seq.data, 11, 6, 45.0) + } + + test("requests sum") { + populateSelectTables() + val db = new PostgresDatabase(service) + val context = EvalContext(time.toEpochMilli, time.plusSeconds(360).toEpochMilli, step) + val expr = DataExpr.Sum(Query.Equal("name", "requests")) + val results = db.execute(context, expr) + + assertEquals(results.size, 1) + val ts = results.head + assertEquals(ts.tags, Map("name" -> "requests")) + val seq = ts.data.asInstanceOf[ArrayTimeSeq] + assertEquals(seq.start, context.start) + checkData(seq.data, 7, 6, 495.0) + } + + test("requests sum db only") { + populateSelectTables() + val db = new PostgresDatabase(service) + val context = EvalContext(time.toEpochMilli, time.plusSeconds(360).toEpochMilli, step) + val expr = DataExpr.Sum(Query.Equal("name", "requests").and(Query.Equal("nf.app", "db"))) + val results = db.execute(context, expr) + + assertEquals(results.size, 1) + val ts = results.head + assertEquals(ts.tags, Map("name" -> "requests", "nf.app" -> "db")) + val seq = ts.data.asInstanceOf[ArrayTimeSeq] + assertEquals(seq.start, context.start) + checkData(seq.data, 7, 6, 450.0) + } + + test("requests sum group by app") { + populateSelectTables() + val db = new PostgresDatabase(service) + val context = EvalContext(time.toEpochMilli, time.plusSeconds(360).toEpochMilli, step) + val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("name", "requests")), List("nf.app")) + val results = db.execute(context, expr) + + assertEquals(results.size, 2) + results.foreach { ts => + val app = ts.tags("nf.app") + assertEquals(ts.tags, Map("name" -> "requests", "nf.app" -> app)) + val seq = ts.data.asInstanceOf[ArrayTimeSeq] + assertEquals(seq.start, context.start) + checkData(seq.data, 7, 6, if (app == "db") 450.0 else 45.0) + } + } + + test("requests sum with cpu") { + populateSelectTables() + val db = new PostgresDatabase(service) + val context = EvalContext(time.toEpochMilli, time.plusSeconds(360).toEpochMilli, step) + val expr = + DataExpr.Sum(Query.In("name", List("requests", "cpu")).and(Query.Equal("nf.app", "www"))) + val results = db.execute(context, expr) + + assertEquals(results.size, 1) + val ts = results.head + assertEquals(ts.tags, Map("nf.app" -> "www")) + val seq = ts.data.asInstanceOf[ArrayTimeSeq] + assertEquals(seq.start, context.start) + checkData(seq.data, 7, 6, 90.0) + } + + test("requests sum consolidated") { + populateSelectTables() + val db = new PostgresDatabase(service) + val context = EvalContext(time.toEpochMilli, time.plusSeconds(360).toEpochMilli, 2 * step) + val expr = + DataExpr.Sum(Query.In("name", List("requests", "cpu")).and(Query.Equal("nf.app", "www"))) + val results = db.execute(context, expr) + + assertEquals(results.size, 1) + val ts = results.head + assertEquals(ts.tags, Map("nf.app" -> "www")) + val seq = ts.data.bounded(context.start, context.end) + assertEquals(seq.start, context.start) + checkData(seq.data, 3, 3, 90.0) + } +} diff --git a/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/PostgresSuite.scala b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/PostgresSuite.scala new file mode 100644 index 000000000..7f5ca4f29 --- /dev/null +++ b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/PostgresSuite.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import io.zonky.test.db.postgres.embedded.EmbeddedPostgres +import munit.FunSuite + +abstract class PostgresSuite extends FunSuite { + + private var postgres: EmbeddedPostgres = _ + var service: PostgresService = _ + + def baseConfig: Config = { + ConfigFactory.empty() + } + + override def beforeAll(): Unit = { + postgres = EmbeddedPostgres + .builder() + .setCleanDataDirectory(true) + .setPort(54321) + .start() + + val dbConfig = ConfigFactory.parseString(""" + |atlas.postgres { + | driver = "org.postgresql.Driver" + | url = "jdbc:postgresql://localhost:54321/postgres" + | user = "postgres" + | password = "postgres" + | init-statements = ["create extension if not exists hstore"] + | tables = [ + | { + | metric-name = "*" + | table-name = "others" + | columns = ["name", "atlas.dstype"] + | } + | ] + |} + |""".stripMargin) + val config = baseConfig.withFallback(dbConfig).resolve() + service = new PostgresService(config) + service.start() + + executeUpdate(SqlUtils.createSchema) + } + + override def afterAll(): Unit = { + service.stop() + postgres.close() + } + + def executeUpdate(sql: String): Unit = { + service.runQueries { stmt => + try stmt.executeUpdate(sql) + catch { + case e: Exception => throw new IllegalStateException(s"failed update: $sql", e) + } + } + } + + def dropTables(): Unit = { + service.runQueries { stmt => + val ts = List.newBuilder[String] + val rs = stmt.executeQuery(SqlUtils.listTables) + while (rs.next()) { + ts += rs.getString(1) + } + ts.result().foreach { table => + executeUpdate(s"drop table atlas.$table") + } + } + } +} diff --git a/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/PostgresTagIndexSuite.scala b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/PostgresTagIndexSuite.scala new file mode 100644 index 000000000..339b207f3 --- /dev/null +++ b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/PostgresTagIndexSuite.scala @@ -0,0 +1,154 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.netflix.atlas.core.index.TagQuery +import com.netflix.atlas.core.model.Query +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +import java.time.Instant + +class PostgresTagIndexSuite extends PostgresSuite { + + private val time = Instant.ofEpochMilli(1647892800000L) + private val suffix = SqlUtils.toSuffix(time) + + override def baseConfig: Config = ConfigFactory.parseString( + """ + |atlas.postgres { + | tables = [ + | { + | metric-name = "requests" + | table-name = "requests" + | columns = ["nf.app", "nf.node"] + | column-type = "varchar(120)" + | } + | { + | metric-name = "*" + | table-name = "others" + | columns = ["name"] + | column-type = "varchar(255)" + | } + | ] + |} + |""".stripMargin + ) + + private def populateRequestsTable(): Unit = { + (0 until 10).foreach { i => + executeUpdate(s""" + |insert into atlas.requests_$suffix + |values (ARRAY[$i, $i], '"name"=>"requests"', 'www', 'i-$i') + |""".stripMargin) + val d = i * 10 + executeUpdate(s""" + |insert into atlas.requests_$suffix + |values (ARRAY[$d, $d], '"name"=>"requests"', 'db', 'i-${i + 10}') + |""".stripMargin) + } + } + + private def populateOthersTable(): Unit = { + (0 until 10).foreach { i => + executeUpdate(s""" + |insert into atlas.others_$suffix + |values (ARRAY[$i, $i], '"nf.app"=>"www","nf.node"=>"i-$i"', 'cpu') + |""".stripMargin) + val d = i * 10 + executeUpdate(s""" + |insert into atlas.others_$suffix + |values (ARRAY[$d, $d], '"nf.app"=>"www","nf.node"=>"i-$i"', 'disk') + |""".stripMargin) + } + executeUpdate(s""" + |insert into atlas.others_$suffix + |values (ARRAY[100.0, 100.0], '"nf.app"=>"db","id"=>"postgres"', 'disk') + |""".stripMargin) + } + + private def populateSelectTables(): Unit = { + SqlUtils.customFunctions.foreach(executeUpdate) + dropTables() + service.tables.foreach(t => executeUpdate(SqlUtils.createTable(t, time))) + populateRequestsTable() + populateOthersTable() + } + + test("findKeys") { + populateSelectTables() + val index = new PostgresTagIndex(service) + val keys = index.findKeys(TagQuery(None, None)) + val expected = List("id", "name", "nf.app", "nf.node") + assertEquals(keys, expected) + } + + test("findKeys: cpu") { + populateSelectTables() + val index = new PostgresTagIndex(service) + val keys = index.findKeys(TagQuery(Some(Query.Equal("name", "cpu")), None)) + val expected = List("name", "nf.app", "nf.node") + assertEquals(keys, expected) + } + + test("findKeys: disk") { + populateSelectTables() + val index = new PostgresTagIndex(service) + val keys = index.findKeys(TagQuery(Some(Query.Equal("name", "disk")), None)) + val expected = List("id", "name", "nf.app", "nf.node") + assertEquals(keys, expected) + } + + test("findKeys: requests") { + populateSelectTables() + val index = new PostgresTagIndex(service) + val keys = index.findKeys(TagQuery(Some(Query.Equal("name", "requests")), None)) + val expected = List("name", "nf.app", "nf.node") + assertEquals(keys, expected) + } + + test("findValues: name") { + populateSelectTables() + val index = new PostgresTagIndex(service) + val keys = index.findValues(TagQuery(None, Some("name"))) + val expected = List("cpu", "disk", "requests") + assertEquals(keys, expected) + } + + test("findValues: nf.app") { + populateSelectTables() + val index = new PostgresTagIndex(service) + val keys = index.findValues(TagQuery(None, Some("nf.app"))) + val expected = List("db", "www") + assertEquals(keys, expected) + } + + test("findValues: nf.node") { + populateSelectTables() + val index = new PostgresTagIndex(service) + val keys = index.findValues(TagQuery(None, Some("nf.node"))) + val expected = (0 until 20).map(i => s"i-$i").toList.sorted + assertEquals(keys, expected) + } + + test("findValues: nf.node for cpu") { + populateSelectTables() + val index = new PostgresTagIndex(service) + val keys = index.findValues(TagQuery(Some(Query.Equal("name", "cpu")), Some("nf.node"))) + val expected = (0 until 10).map(i => s"i-$i").toList.sorted + assertEquals(keys, expected) + } +} diff --git a/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/SqlUtilsSuite.scala b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/SqlUtilsSuite.scala new file mode 100644 index 000000000..fa3c87d54 --- /dev/null +++ b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/SqlUtilsSuite.scala @@ -0,0 +1,460 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.netflix.atlas.core.index.TagQuery +import com.netflix.atlas.core.model.DataExpr +import com.netflix.atlas.core.model.DataVocabulary +import com.netflix.atlas.core.model.ModelExtractors +import com.netflix.atlas.core.model.Query +import com.netflix.atlas.core.stacklang.Interpreter + +import java.time.Instant +import scala.util.Using + +class SqlUtilsSuite extends PostgresSuite { + + private val time = Instant.ofEpochMilli(1647892800000L) + private val suffix = "202203212000" + + private val interpreter = Interpreter(DataVocabulary.allWords) + + test("toSuffix") { + assertEquals(SqlUtils.toSuffix(time), suffix) + } + + test("extractTime") { + assertEquals(SqlUtils.extractTime(s"foo_$suffix"), Some(time)) + assertEquals(SqlUtils.extractTime(s"foo"), None) + } + + test("escapeLiteral") { + assertEquals(SqlUtils.escapeLiteral("foo'bar"), "foo''bar") + } + + test("listTables") { + dropTables() + service.runQueries { stmt => + executeUpdate("create table atlas.foo(a varchar(10))") + executeUpdate("create table atlas.bar(a varchar(10))") + val ts = List.newBuilder[String] + val rs = stmt.executeQuery(SqlUtils.listTables) + while (rs.next()) { + ts += rs.getString(1) + } + assertEquals(ts.result().sorted, List("bar", "foo")) + } + } + + test("createTable: for name") { + dropTables() + val config = TableDefinition("sys.cpu", "cpu", List("nf.app", "id"), "varchar(255)") + val sql = SqlUtils.createTable(config, time) + executeUpdate(sql) + executeUpdate(s""" + |insert into atlas.cpu_$suffix + |values (ARRAY[1, 2], '"nf.cluster"=>"www-main","nf.stack"=>"main"', 'www', 'system') + |""".stripMargin) + } + + test("createTable: for name, just tags column") { + dropTables() + val config = TableDefinition("sys.cpu", "cpu", Nil, "varchar(255)") + val sql = SqlUtils.createTable(config, time) + executeUpdate(sql) + executeUpdate(s""" + |insert into atlas.cpu_$suffix + |values (ARRAY[1, 2], '"nf.cluster"=>"www-main","nf.stack"=>"main"') + |""".stripMargin) + } + + private def populateSelectTable(): TableDefinition = { + SqlUtils.customFunctions.foreach(executeUpdate) + dropTables() + val table = TableDefinition("*", "others", List("name"), "varchar(255)") + executeUpdate(SqlUtils.createTable(table, time)) + (0 until 10).foreach { i => + executeUpdate(s""" + |insert into atlas.others_$suffix + |values (ARRAY[$i, $i], '"nf.app"=>"www","nf.node"=>"i-$i"', 'cpu') + |""".stripMargin) + val d = i * 10 + executeUpdate(s""" + |insert into atlas.others_$suffix + |values (ARRAY[$d, $d], '"nf.app"=>"www","nf.node"=>"i-$i"', 'disk') + |""".stripMargin) + } + executeUpdate(s""" + |insert into atlas.others_$suffix + |values (ARRAY[100.0, 100.0], '"nf.app"=>"db","id"=>"postgres"', 'disk') + |""".stripMargin) + table + } + + private def parseQuery(str: String): Query = { + interpreter.execute(str).stack match { + case (q: Query) :: Nil => q + case _ => throw new IllegalArgumentException(str) + } + } + + private def parseDataExpr(str: String): DataExpr = { + interpreter.execute(str).stack match { + case ModelExtractors.DataExprType(t) :: Nil => t + case _ => throw new IllegalArgumentException(str) + } + } + + private def stringList(sql: String): List[String] = { + service.runQueries { stmt => + val vs = List.newBuilder[String] + val rs = stmt.executeQuery(sql) + while (rs.next()) { + vs += rs.getString(1) + } + vs.result() + } + } + + private def arrayQuery(sql: String): List[java.lang.Double] = { + service.runQueries { stmt => + val rs = stmt.executeQuery(sql) + assert(rs.next()) + val array = rs.getArray(1) + if (array == null) { + Nil + } else { + val vs = array.getArray + .asInstanceOf[Array[java.lang.Double]] + .toList + assert(!rs.next()) + vs + } + } + } + + private def list(vs: java.lang.Double*): List[java.lang.Double] = vs.toList + + private def basicKeys(tq: TagQuery, expected: List[String]): Unit = { + val table = populateSelectTable() + val queries = SqlUtils.keyQueries(time, List(table), tq) + queries.foreach { sql => + assertEquals(stringList(sql), expected) + } + } + + test("keys: all") { + val tq = TagQuery(None) + basicKeys(tq, List("id", "name", "nf.app", "nf.node")) + } + + test("keys: none") { + val query = parseQuery(":false") + val tq = TagQuery(Some(query)) + basicKeys(tq, Nil) + } + + test("keys: cpu") { + val query = parseQuery("name,cpu,:eq") + val tq = TagQuery(Some(query)) + basicKeys(tq, List("name", "nf.app", "nf.node")) + } + + test("keys: db") { + val query = parseQuery("nf.app,db,:eq") + val tq = TagQuery(Some(query)) + basicKeys(tq, List("id", "name", "nf.app")) + } + + private def basicValues(tq: TagQuery, expected: List[String]): Unit = { + val table = populateSelectTable() + val queries = SqlUtils.valueQueries(time, List(table), tq) + queries.foreach { sql => + assertEquals(stringList(sql), expected) + } + } + + test("values: name") { + val tq = TagQuery(None, Some("name")) + basicValues(tq, List("cpu", "disk")) + } + + test("values: with query no matches, name") { + val query = parseQuery(":false") + val tq = TagQuery(Option(query), Some("name")) + basicValues(tq, Nil) + } + + test("values: with query, name") { + val query = parseQuery("name,cpu,:eq") + val tq = TagQuery(Option(query), Some("name")) + basicValues(tq, List("cpu")) + } + + test("values: with query, nf.node") { + val query = parseQuery("name,cpu,:eq") + val tq = TagQuery(Option(query), Some("nf.node")) + basicValues(tq, (0 until 10).map(i => s"i-$i").toList) + } + + test("values: with query and limit, nf.node") { + val query = parseQuery("name,cpu,:eq") + val tq = TagQuery(Option(query), Some("nf.node"), limit = 2) + basicValues(tq, (0 until 2).map(i => s"i-$i").toList) + } + + test("values: with query, offset, and limit, nf.node") { + val query = parseQuery("name,cpu,:eq") + val tq = TagQuery(Option(query), Some("nf.node"), offset = "i-4", limit = 2) + basicValues(tq, (5 until 7).map(i => s"i-$i").toList) + } + + private def basicSelect(exprStr: String, v: Double): Unit = { + val table = populateSelectTable() + val expr = parseDataExpr(exprStr) + val queries = SqlUtils.dataQueries(time, List(table), expr) + queries.foreach { sql => + val expected = if (v.isNaN) Nil else List(v, v) + assertEquals(arrayQuery(sql).map(_.doubleValue()), expected) + } + } + + test("select: sum") { + basicSelect("name,cpu,:eq,:sum", 45.0) + } + + test("select: sum, disk") { + basicSelect("name,disk,:eq,:sum", 550.0) + } + + test("select: sum, has") { + basicSelect("name,cpu,:eq,nf.node,:has,:and,:sum", 45.0) + } + + test("select: sum, has, no data") { + basicSelect("name,cpu,:eq,foo,:has,:and,:sum", Double.NaN) + } + + test("select: sum, in") { + basicSelect("name,cpu,:eq,nf.node,(,i-2,i-4,),:in,:and,:sum", 6.0) + } + + test("select: sum, greater than") { + basicSelect("name,cpu,:eq,nf.node,i-7,:gt,:and,:sum", 17.0) + } + + test("select: sum, greater than equal") { + basicSelect("name,cpu,:eq,nf.node,i-7,:ge,:and,:sum", 24.0) + } + + test("select: sum, less than") { + basicSelect("name,cpu,:eq,nf.node,i-3,:lt,:and,:sum", 3.0) + } + + test("select: sum, less than equal") { + basicSelect("name,cpu,:eq,nf.node,i-3,:le,:and,:sum", 6.0) + } + + test("select: sum, regex") { + basicSelect("name,cpu,:eq,nf.node,i-[3-6],:re,:and,:sum", 18.0) + } + + test("select: sum, regex mapped to like") { + basicSelect("name,cpu,:eq,nf.node,i-.*,:re,:and,:sum", 45.0) + } + + test("select: sum, regex ignore case") { + basicSelect("name,cpu,:eq,nf.node,I-[3-6],:reic,:and,:sum", 18.0) + } + + test("select: sum, or") { + basicSelect("name,cpu,:eq,nf.node,I-[3-6],:reic,nf.node,i-1,:eq,:or,:and,:sum", 19.0) + } + + test("select: sum, not") { + basicSelect("name,cpu,:eq,nf.node,I-[3-6],:reic,nf.node,i-4,:eq,:not,:and,:and,:sum", 14.0) + } + + test("select: count") { + basicSelect("name,cpu,:eq,:count", 10.0) + } + + test("select: max") { + basicSelect("name,cpu,:eq,:max", 9.0) + } + + test("select: min") { + basicSelect("name,cpu,:eq,:min", 0.0) + } + + private def basicSelectBy(exprStr: String, k: String, values: Map[String, Double]): Unit = { + val table = populateSelectTable() + val expr = parseDataExpr(exprStr) + val queries = SqlUtils.dataQueries(time, List(table), expr) + service.runQueries { stmt => + queries.foreach { sql => + val rs = stmt.executeQuery(sql) + while (rs.next()) { + val c = rs.getString(k) + val expected = values(c) + val vs = rs + .getArray("values") + .getArray + .asInstanceOf[Array[java.lang.Double]] + .toList + assertEquals(vs.map(_.doubleValue()), List(expected, expected)) + } + } + } + } + + test("select: sum by name") { + basicSelectBy( + "nf.app,www,:eq,:sum,(,name,),:by", + "name", + Map("cpu" -> 45.0, "disk" -> 450.0) + ) + } + + test("select: sum by nf.node") { + basicSelectBy( + "nf.app,www,:eq,name,cpu,:eq,:and,:sum,(,nf.node,),:by", + "nf.node", + (0 until 10).map(i => s"i-$i" -> i.toDouble).toMap + ) + } + + test("arrayAdd") { + executeUpdate(SqlUtils.addNaN) + executeUpdate(SqlUtils.arrayAdd) + val sql = + """ + select atlas_array_add(ARRAY[1.0, 2.0, null], ARRAY[3.0, null, null]) + """ + assertEquals(arrayQuery(sql), list(4.0, 2.0, null)) + } + + test("aggrSum") { + executeUpdate(SqlUtils.addNaN) + executeUpdate(SqlUtils.arrayAdd) + executeUpdate(SqlUtils.aggrSum) + val sql = + """ + select atlas_aggr_sum(vs) + from (values + (ARRAY[1.0, 2.0, null]), + (ARRAY[3.0, null, null]), + (ARRAY[4.0, 5.0, null]) + ) as t(vs) + """ + assertEquals(arrayQuery(sql), list(8.0, 7.0, null)) + } + + private def arrayValues(rs: java.sql.ResultSet, i: Int): List[Any] = { + rs.getArray(i) + .getArray + .asInstanceOf[Array[java.lang.Double]] + .toList + } + + test("aggrSum, state should not be preserved") { + executeUpdate(SqlUtils.addNaN) + executeUpdate(SqlUtils.arrayAdd) + executeUpdate(SqlUtils.aggrSum) + val sql = + """ + ( + select atlas_aggr_sum(vs) + from (values + (ARRAY[1.0, 2.0, null]), + (ARRAY[3.0, null, null]), + (ARRAY[4.0, 5.0, null]) + ) as a(vs) + ) union all ( + select atlas_aggr_sum(vs) + from (values + (ARRAY[null::float8, null, null]) + ) as b(vs) + ) + """ + service.runQueries { stmt => + val rs = stmt.executeQuery(sql) + assert(rs.next()) + val vs1 = rs + .getArray(1) + .getArray + .asInstanceOf[Array[java.lang.Double]] + .toList + assertEquals(vs1, list(8.0, 7.0, null)) + assert(rs.next()) + val vs2 = rs + .getArray(1) + .getArray + .asInstanceOf[Array[java.lang.Double]] + .toList + assertEquals(vs2, List(null, null, null)) + assert(!rs.next()) + } + } + + test("aggrCount") { + executeUpdate(SqlUtils.arrayCount) + executeUpdate(SqlUtils.arrayZeroToNull) + executeUpdate(SqlUtils.aggrCount) + val sql = + """ + select atlas_aggr_count(vs) + from (values + (ARRAY[1.0, 2.0, null]), + (ARRAY[3.0, null, null]), + (ARRAY[4.0, 5.0, null]) + ) as t(vs) + """ + assertEquals(arrayQuery(sql), list(3.0, 2.0, null)) + } + + test("aggrMax") { + executeUpdate(SqlUtils.maxNaN) + executeUpdate(SqlUtils.arrayMax) + executeUpdate(SqlUtils.aggrMax) + val sql = + """ + select atlas_aggr_max(vs) + from (values + (ARRAY[1.0, 2.0, null]), + (ARRAY[3.0, null, null]), + (ARRAY[4.0, 5.0, null]) + ) as t(vs) + """ + assertEquals(arrayQuery(sql), list(4.0, 5.0, null)) + } + + test("aggrMin") { + executeUpdate(SqlUtils.minNaN) + executeUpdate(SqlUtils.arrayMin) + executeUpdate(SqlUtils.aggrMin) + val sql = + """ + select atlas_aggr_min(vs) + from (values + (ARRAY[1.0, 2.0, null]), + (ARRAY[3.0, null, null]), + (ARRAY[4.0, 5.0, null]) + ) as t(vs) + """ + assertEquals(arrayQuery(sql), list(1.0, 2.0, null)) + } +} diff --git a/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/TableDefinitionSuite.scala b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/TableDefinitionSuite.scala new file mode 100644 index 000000000..807a13753 --- /dev/null +++ b/atlas-postgres/src/test/scala/com/netflix/atlas/postgres/TableDefinitionSuite.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.postgres + +import com.typesafe.config.ConfigFactory +import munit.FunSuite + +class TableDefinitionSuite extends FunSuite { + + test("load from config") { + val config = ConfigFactory.parseString(""" + |metric-name = "ipc.server.call" + |table-name = "ipc_server_call" + |columns = ["nf.app", "ipc.client.app"] + |column-type = "varchar(120)" + |""".stripMargin) + val table = TableDefinition.fromConfig(config) + val expected = TableDefinition( + "ipc.server.call", + "ipc_server_call", + List("nf.app", "ipc.client.app"), + "varchar(120)" + ) + assertEquals(table, expected) + } + + test("load from config, default column type") { + val config = ConfigFactory.parseString(""" + |metric-name = "ipc.server.call" + |table-name = "ipc_server_call" + |columns = ["nf.app", "ipc.client.app"] + |""".stripMargin) + val table = TableDefinition.fromConfig(config) + val expected = TableDefinition( + "ipc.server.call", + "ipc_server_call", + List("nf.app", "ipc.client.app"), + "varchar(255)" + ) + assertEquals(table, expected) + } +} diff --git a/build.sbt b/build.sbt index bdbcad599..26a4b7948 100644 --- a/build.sbt +++ b/build.sbt @@ -115,6 +115,8 @@ lazy val `atlas-postgres` = project .configure(BuildSettings.profile) .dependsOn(`atlas-core`) .settings(libraryDependencies ++= Seq( + Dependencies.hikariCP, + Dependencies.iepService, Dependencies.postgres, Dependencies.postgresEmbedded % "test" )) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index fc53341ea..ba3673b65 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -30,6 +30,7 @@ object Dependencies { val caffeine = "com.github.ben-manes.caffeine" % "caffeine" % "3.1.8" val datasketches = "org.apache.datasketches" % "datasketches-java" % "4.2.0" val equalsVerifier = "nl.jqno.equalsverifier" % "equalsverifier" % "3.15.5" + val hikariCP = "com.zaxxer" % "HikariCP" % "5.1.0" val iepLeaderApi = "com.netflix.iep" % "iep-leader-api" % iep val iepLeaderDynamoDb = "com.netflix.iep" % "iep-leader-dynamodb" % iep val iepDynConfig = "com.netflix.iep" % "iep-dynconfig" % iep