Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

postgres: db implementation #1603

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.postgres

import java.time.Instant

private[postgres] case class Interval(start: Instant, end: Instant) {

def overlaps(interval: Interval): Boolean = {
val s1 = start.toEpochMilli
val e1 = end.toEpochMilli
val s2 = interval.start.toEpochMilli
val e2 = interval.end.toEpochMilli

(s1 >= s2 && s1 <= e2) || (e1 >= s2 && e1 <= e2) || (s1 >= s2 && e1 <= e2) || (s2 >= s1 && e2 <= e1)
}
}

private[postgres] object Interval {

def apply(s: Long, e: Long): Interval = {
Interval(Instant.ofEpochMilli(s), Instant.ofEpochMilli(e))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.postgres

import com.netflix.atlas.core.db.Database
import com.netflix.atlas.core.db.TimeSeriesBuffer
import com.netflix.atlas.core.index.TagIndex
import com.netflix.atlas.core.model.ArrayBlock
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.EvalContext
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.TaggedItem
import com.netflix.atlas.core.model.TimeSeries

import java.sql.ResultSet
import java.sql.Statement
import java.time.Instant
import java.util
import scala.collection.mutable
import scala.util.Using

/**
* Database implementation that delegates to PostgreSQL with the blocks stored as rows in the
* time based tables. This class only handles queries, the data loading would be managed separately
* and is more specific to the backing long term storage, e.g. S3.
*/
class PostgresDatabase(postgres: PostgresService) extends Database {

private val blockSize: Int = postgres.config.getInt("atlas.postgres.block-size")
private val step: Long = postgres.config.getDuration("atlas.postgres.step").toMillis

private val blockDuration = blockSize * step / 1000

override val index: TagIndex[? <: TaggedItem] = {
new PostgresTagIndex(postgres)
}

private def overlappingTimes(stmt: Statement, ctxt: EvalContext): List[Instant] = {
val interval = Interval(ctxt.start, ctxt.end)
val ts = List.newBuilder[Instant]
val rs = stmt.executeQuery(SqlUtils.listTables)
while (rs.next()) {
SqlUtils.extractTime(rs.getString(1)).foreach { t =>
if (interval.overlaps(Interval(t, t.plusSeconds(blockDuration)))) {
ts += t
}
}
}
ts.result().distinct
}

private def extractTags(expr: DataExpr, rs: ResultSet): Map[String, String] = {
expr.finalGrouping.map { k =>
k -> rs.getString(k)
}.toMap
}

private def copyValues(rs: ResultSet, block: ArrayBlock): Unit = {
val sqlArray = rs.getArray("values")
if (sqlArray != null) {
val array = sqlArray.getArray.asInstanceOf[Array[java.lang.Double]]
var i = 0
while (i < array.length) {
block.buffer(i) = array(i).doubleValue()
i += 1
}
} else {
// Avoid carrying state over if block data is not overwritten
util.Arrays.fill(block.buffer, Double.NaN)
}
}

@scala.annotation.tailrec
private def aggr(expr: DataExpr, buffer: TimeSeriesBuffer, block: ArrayBlock): Unit = {
expr match {
case _: DataExpr.Sum => buffer.add(block)
case _: DataExpr.Count => buffer.add(block)
case _: DataExpr.Min => buffer.min(block)
case _: DataExpr.Max => buffer.max(block)
case DataExpr.Consolidation(af, _) => aggr(af, buffer, block)
case DataExpr.GroupBy(af, _) => aggr(af, buffer, block)
case e => throw new MatchError(s"unsupported DataExpr: $e")
}
}

override def execute(ctxt: EvalContext, expr: DataExpr): List[TimeSeries] = {
val exactTags = Query.tags(expr.query)
val data = mutable.AnyRefMap.empty[Map[String, String], TimeSeriesBuffer]
postgres.runQueries { stmt =>
overlappingTimes(stmt, ctxt).foreach { t =>
val block = ArrayBlock(t.toEpochMilli, blockSize)
val queries = SqlUtils.dataQueries(t, postgres.tables, expr)
val q = SqlUtils.unionAll(queries)
Using.resource(stmt.executeQuery(q)) { rs =>
while (rs.next()) {
val tags = exactTags ++ extractTags(expr, rs)
// The buffer step size always matches the storage for now. This can be optimized once
// the aggegates pushed down to postgres support consolidation while performing the
// aggregation.
val buffer = data.getOrElseUpdate(
tags,
TimeSeriesBuffer(tags, step, ctxt.start, ctxt.end)
)
copyValues(rs, block)
aggr(expr, buffer, block)
}
}
}
}

// Consolidate the final result set if needed
if (ctxt.step == step)
data.values.toList
else
data.values.map(_.consolidate(ctxt.step, expr.cf)).toList
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.postgres

import com.netflix.iep.service.AbstractService
import com.typesafe.config.Config
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

import java.sql.Connection
import java.sql.Statement
import scala.util.Using

/**
* Manage connections to postgres database.
*/
class PostgresService(val config: Config) extends AbstractService {

val tables: List[TableDefinition] = {
import scala.jdk.CollectionConverters.*
config
.getConfigList("atlas.postgres.tables")
.asScala
.toList
.map(TableDefinition.fromConfig)
}

private var ds: HikariDataSource = _

override def startImpl(): Unit = {
val c = config.getConfig("atlas.postgres")
Class.forName(c.getString("driver"))
val hc = new HikariConfig()
hc.setJdbcUrl(c.getString("url"))
hc.setUsername(c.getString("user"))
hc.setPassword(c.getString("password"))
ds = new HikariDataSource(hc)

Using.resource(ds.getConnection) { connection =>
// Ensure DB is appropriately configured
Using.resource(connection.createStatement()) { stmt =>
// Run init statements from config
c.getStringList("init-statements").forEach { sql =>
stmt.executeUpdate(sql)
}

// Setup helper functions
SqlUtils.customFunctions.foreach { sql =>
stmt.executeUpdate(sql)
}
}
}
}

override def stopImpl(): Unit = {
ds.close()
}

def getConnection: Connection = {
ds.getConnection
}

def getCopyManager: CopyManager = {
getConnection.unwrap(classOf[BaseConnection]).getCopyAPI
}

def runQueries[T](f: Statement => T): T = {
Using.resource(getConnection) { connection =>
Using.resource(connection.createStatement())(f)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.postgres

import com.netflix.atlas.core.index.TagIndex
import com.netflix.atlas.core.index.TagQuery
import com.netflix.atlas.core.model.Tag
import com.netflix.atlas.core.model.TaggedItem

import java.sql.Statement
import java.time.Instant
import scala.util.Using

/** Lookups are delegated to PostgreSQL. Some operations like `size` are not supported. */
class PostgresTagIndex(postgres: PostgresService) extends TagIndex[TaggedItem] {

override def findTags(query: TagQuery): List[Tag] = {
val k = query.key.get
findValues(query).map(v => Tag(k, v))
}

private def times(stmt: Statement): List[Instant] = {
val ts = List.newBuilder[Instant]
val rs = stmt.executeQuery(SqlUtils.listTables)
while (rs.next()) {
SqlUtils.extractTime(rs.getString(1)).foreach(ts.addOne)
}
ts.result().distinct
}

override def findKeys(query: TagQuery): List[String] = {
Using.resource(postgres.getConnection) { connection =>
Using.resource(connection.createStatement()) { stmt =>
val queries = times(stmt).flatMap { t =>
SqlUtils.keyQueries(t, postgres.tables, query)
}
if (queries.isEmpty) {
Nil
} else {
val q = SqlUtils.union(queries)
val vs = List.newBuilder[String]
val rs = stmt.executeQuery(q)
while (rs.next()) {
vs += rs.getString(1)
}
vs.result().sorted.take(query.limit)
}
}
}
}

override def findValues(query: TagQuery): List[String] = {
Using.resource(postgres.getConnection) { connection =>
Using.resource(connection.createStatement()) { stmt =>
val queries = times(stmt).flatMap { t =>
SqlUtils.valueQueries(t, postgres.tables, query)
}
if (queries.isEmpty) {
Nil
} else {
val q = SqlUtils.union(queries)
val vs = List.newBuilder[String]
val rs = stmt.executeQuery(q)
while (rs.next()) {
vs += rs.getString(1)
}
vs.result().sorted.take(query.limit)
}
}
}
}

override def findItems(query: TagQuery): List[TaggedItem] = {
throw new UnsupportedOperationException("")
}

override def size: Int = 0
}
Loading
Loading