postgres: db implementation (Netflix#1603)
Metric database implementation that delegates to postgres for
querying tables with rows of block data. Some optimizations for
aggregations are still pending: moving windows, consolidation,
and potentially compressed arrays.
brharrington authored and manolama committed May 22, 2024
1 parent e4cdb8b commit 8691c9b
Showing 14 changed files with 1,970 additions and 0 deletions.
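
For orientation before the individual file diffs, a minimal, hypothetical wiring sketch of how the pieces are intended to fit together; the config source is a placeholder and must supply the atlas.postgres.* settings referenced by the code below.

import com.netflix.atlas.postgres.PostgresDatabase
import com.netflix.atlas.postgres.PostgresService
import com.typesafe.config.ConfigFactory

// Hypothetical wiring: the loaded config must define the atlas.postgres.* settings.
val config = ConfigFactory.load()
val service = new PostgresService(config)
service.start() // opens the Hikari pool and runs the configured init statements

val db = new PostgresDatabase(service)
// Tag lookups go through db.index; data queries go through db.execute.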
@@ -0,0 +1,37 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.postgres

import java.time.Instant

private[postgres] case class Interval(start: Instant, end: Instant) {

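/** Returns true if the two intervals share any instant in time (endpoints inclusive). */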
def overlaps(interval: Interval): Boolean = {
val s1 = start.toEpochMilli
val e1 = end.toEpochMilli
val s2 = interval.start.toEpochMilli
val e2 = interval.end.toEpochMilli

(s1 >= s2 && s1 <= e2) || (e1 >= s2 && e1 <= e2) || (s1 >= s2 && e1 <= e2) || (s2 >= s1 && e2 <= e1)
}
}

private[postgres] object Interval {

def apply(s: Long, e: Long): Interval = {
Interval(Instant.ofEpochMilli(s), Instant.ofEpochMilli(e))
}
}
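
A small, hypothetical example of the overlap check; the millisecond values are illustrative, and since Interval is package-private this only compiles inside com.netflix.atlas.postgres.

val a = Interval(0L, 60000L)       // 00:00:00 - 00:01:00
val b = Interval(30000L, 90000L)   // 00:00:30 - 00:01:30
val c = Interval(120000L, 180000L) // 00:02:00 - 00:03:00
a.overlaps(b) // true: the ranges share [00:00:30, 00:01:00]
a.overlaps(c) // false: disjoint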
@@ -0,0 +1,130 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.postgres

import com.netflix.atlas.core.db.Database
import com.netflix.atlas.core.db.TimeSeriesBuffer
import com.netflix.atlas.core.index.TagIndex
import com.netflix.atlas.core.model.ArrayBlock
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.EvalContext
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.TaggedItem
import com.netflix.atlas.core.model.TimeSeries

import java.sql.ResultSet
import java.sql.Statement
import java.time.Instant
import java.util
import scala.collection.mutable
import scala.util.Using

/**
* Database implementation that delegates to PostgreSQL, with the blocks stored as rows in the
* time-based tables. This class only handles queries; the data loading would be managed separately
* and is more specific to the backing long-term storage, e.g. S3.
*/
class PostgresDatabase(postgres: PostgresService) extends Database {

private val blockSize: Int = postgres.config.getInt("atlas.postgres.block-size")
private val step: Long = postgres.config.getDuration("atlas.postgres.step").toMillis

private val blockDuration = blockSize * step / 1000

override val index: TagIndex[? <: TaggedItem] = {
new PostgresTagIndex(postgres)
}

private def overlappingTimes(stmt: Statement, ctxt: EvalContext): List[Instant] = {
val interval = Interval(ctxt.start, ctxt.end)
val ts = List.newBuilder[Instant]
val rs = stmt.executeQuery(SqlUtils.listTables)
while (rs.next()) {
SqlUtils.extractTime(rs.getString(1)).foreach { t =>
if (interval.overlaps(Interval(t, t.plusSeconds(blockDuration)))) {
ts += t
}
}
}
ts.result().distinct
}

private def extractTags(expr: DataExpr, rs: ResultSet): Map[String, String] = {
expr.finalGrouping.map { k =>
k -> rs.getString(k)
}.toMap
}

private def copyValues(rs: ResultSet, block: ArrayBlock): Unit = {
val sqlArray = rs.getArray("values")
if (sqlArray != null) {
val array = sqlArray.getArray.asInstanceOf[Array[java.lang.Double]]
var i = 0
while (i < array.length) {
block.buffer(i) = array(i).doubleValue()
i += 1
}
} else {
// Avoid carrying state over if block data is not overwritten
util.Arrays.fill(block.buffer, Double.NaN)
}
}

@scala.annotation.tailrec
private def aggr(expr: DataExpr, buffer: TimeSeriesBuffer, block: ArrayBlock): Unit = {
expr match {
case _: DataExpr.Sum => buffer.add(block)
case _: DataExpr.Count => buffer.add(block)
case _: DataExpr.Min => buffer.min(block)
case _: DataExpr.Max => buffer.max(block)
case DataExpr.Consolidation(af, _) => aggr(af, buffer, block)
case DataExpr.GroupBy(af, _) => aggr(af, buffer, block)
case e => throw new MatchError(s"unsupported DataExpr: $e")
}
}

override def execute(ctxt: EvalContext, expr: DataExpr): List[TimeSeries] = {
val exactTags = Query.tags(expr.query)
val data = mutable.AnyRefMap.empty[Map[String, String], TimeSeriesBuffer]
postgres.runQueries { stmt =>
overlappingTimes(stmt, ctxt).foreach { t =>
val block = ArrayBlock(t.toEpochMilli, blockSize)
val queries = SqlUtils.dataQueries(t, postgres.tables, expr)
val q = SqlUtils.unionAll(queries)
Using.resource(stmt.executeQuery(q)) { rs =>
while (rs.next()) {
val tags = exactTags ++ extractTags(expr, rs)
// The buffer step size always matches the storage step for now. This can be optimized once
// the aggregates pushed down to postgres support consolidation while performing the
// aggregation.
val buffer = data.getOrElseUpdate(
tags,
TimeSeriesBuffer(tags, step, ctxt.start, ctxt.end)
)
copyValues(rs, block)
aggr(expr, buffer, block)
}
}
}
}

// Consolidate the final result set if needed
if (ctxt.step == step)
data.values.toList
else
data.values.map(_.consolidate(ctxt.step, expr.cf)).toList
}
}
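
A hypothetical sketch of running a query against this database, reusing the db value from the wiring sketch near the top; the metric name, time range, and step are placeholders.

import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.EvalContext
import com.netflix.atlas.core.model.Query

// Sum of all series with name=requestsPerSecond over a one-hour window.
val expr = DataExpr.Sum(Query.Equal("name", "requestsPerSecond"))
val ctxt = EvalContext(1714608000000L, 1714611600000L, 60000L) // start, end, step in millis

// Each result is an aggregated TimeSeries. If the requested step differs from the
// storage step, the buffers are consolidated with the expression's consolidation function.
val results = db.execute(ctxt, expr)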
@@ -0,0 +1,87 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.postgres

import com.netflix.iep.service.AbstractService
import com.typesafe.config.Config
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

import java.sql.Connection
import java.sql.Statement
import scala.util.Using

/**
* Manages connections to the postgres database.
*/
class PostgresService(val config: Config) extends AbstractService {

val tables: List[TableDefinition] = {
import scala.jdk.CollectionConverters.*
config
.getConfigList("atlas.postgres.tables")
.asScala
.toList
.map(TableDefinition.fromConfig)
}

private var ds: HikariDataSource = _

override def startImpl(): Unit = {
val c = config.getConfig("atlas.postgres")
Class.forName(c.getString("driver"))
val hc = new HikariConfig()
hc.setJdbcUrl(c.getString("url"))
hc.setUsername(c.getString("user"))
hc.setPassword(c.getString("password"))
ds = new HikariDataSource(hc)

Using.resource(ds.getConnection) { connection =>
// Ensure DB is appropriately configured
Using.resource(connection.createStatement()) { stmt =>
// Run init statements from config
c.getStringList("init-statements").forEach { sql =>
stmt.executeUpdate(sql)
}

// Setup helper functions
SqlUtils.customFunctions.foreach { sql =>
stmt.executeUpdate(sql)
}
}
}
}

override def stopImpl(): Unit = {
ds.close()
}

def getConnection: Connection = {
ds.getConnection
}

def getCopyManager: CopyManager = {
getConnection.unwrap(classOf[BaseConnection]).getCopyAPI
}

def runQueries[T](f: Statement => T): T = {
Using.resource(getConnection) { connection =>
Using.resource(connection.createStatement())(f)
}
}
}
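
The service and database read their settings from the atlas.postgres config block. A hypothetical configuration sketch with placeholder values follows; the shape of each entry in tables comes from TableDefinition.fromConfig and is not part of this diff.

import com.typesafe.config.ConfigFactory

// Placeholder values only; block-size and step must match how the block data was written.
val config = ConfigFactory.parseString(
  """
  atlas.postgres {
    driver = "org.postgresql.Driver"
    url = "jdbc:postgresql://localhost:5432/atlas"
    user = "atlas"
    password = "changeme"
    init-statements = []
    block-size = 360
    step = 60s
    tables = []
  }
  """
)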
@@ -0,0 +1,91 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.postgres

import com.netflix.atlas.core.index.TagIndex
import com.netflix.atlas.core.index.TagQuery
import com.netflix.atlas.core.model.Tag
import com.netflix.atlas.core.model.TaggedItem

import java.sql.Statement
import java.time.Instant
import scala.util.Using

/** Lookups are delegated to PostgreSQL. Some operations like `size` are not supported. */
class PostgresTagIndex(postgres: PostgresService) extends TagIndex[TaggedItem] {

override def findTags(query: TagQuery): List[Tag] = {
val k = query.key.get
findValues(query).map(v => Tag(k, v))
}

private def times(stmt: Statement): List[Instant] = {
val ts = List.newBuilder[Instant]
val rs = stmt.executeQuery(SqlUtils.listTables)
while (rs.next()) {
SqlUtils.extractTime(rs.getString(1)).foreach(ts.addOne)
}
ts.result().distinct
}

override def findKeys(query: TagQuery): List[String] = {
Using.resource(postgres.getConnection) { connection =>
Using.resource(connection.createStatement()) { stmt =>
val queries = times(stmt).flatMap { t =>
SqlUtils.keyQueries(t, postgres.tables, query)
}
if (queries.isEmpty) {
Nil
} else {
val q = SqlUtils.union(queries)
val vs = List.newBuilder[String]
val rs = stmt.executeQuery(q)
while (rs.next()) {
vs += rs.getString(1)
}
vs.result().sorted.take(query.limit)
}
}
}
}

override def findValues(query: TagQuery): List[String] = {
Using.resource(postgres.getConnection) { connection =>
Using.resource(connection.createStatement()) { stmt =>
val queries = times(stmt).flatMap { t =>
SqlUtils.valueQueries(t, postgres.tables, query)
}
if (queries.isEmpty) {
Nil
} else {
val q = SqlUtils.union(queries)
val vs = List.newBuilder[String]
val rs = stmt.executeQuery(q)
while (rs.next()) {
vs += rs.getString(1)
}
vs.result().sorted.take(query.limit)
}
}
}
}

override def findItems(query: TagQuery): List[TaggedItem] = {
throw new UnsupportedOperationException("findItems")
}

override def size: Int = 0
}
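
A hypothetical lookup through the tag index, reusing the service value from the wiring sketch near the top; the TagQuery shape (query, optional key, default offset and limit) is assumed from the core index API.

import com.netflix.atlas.core.index.TagQuery
import com.netflix.atlas.core.model.Query

val index = new PostgresTagIndex(service)
val q = Query.Equal("app", "www")
val keys = index.findKeys(TagQuery(Some(q)))                   // tag keys present on matching series
val values = index.findValues(TagQuery(Some(q), Some("name"))) // values of the "name" key
// findItems throws UnsupportedOperationException and size always reports 0.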
