Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement getChanges method for Arcane SQL server #75

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,20 @@ package services.mssql

import models.{DataCell, DataRow}
import services.mssql.MsSqlConnection.toArcaneType
import services.mssql.base.{QueryResult, ResultSetOwner}

import java.sql.{ResultSet, Statement}
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

/**
* Represents the result of a query to a SQL database.
*
* @tparam Output The type the query result is mapped to.
*/
trait QueryResult[Output] {

/**
* Alias for the `Output` type parameter.
*/
type OutputType = Output

/**
* Reads the result of the SQL query mapped to an output type.
*
* @return The result of the query.
*/
def read: OutputType

}

/**
* Lazy-list based implementation of [[QueryResult]].
*
* @param statement The statement used to execute the query.
* @param resultSet The result set of the query.
*/
class LazyQueryResult(statement: Statement, resultSet: ResultSet) extends QueryResult[LazyList[DataRow]] with AutoCloseable {
class LazyQueryResult(protected val statement: Statement, resultSet: ResultSet) extends QueryResult[LazyList[DataRow]] with ResultSetOwner:

/**
* Reads the result of the query.
Expand All @@ -48,13 +34,6 @@ class LazyQueryResult(statement: Statement, resultSet: ResultSet) extends QueryR
}
})


/**
* Closes the statement and the result set owned by this object.
* When a Statement object is closed, its current ResultSet object, if one exists, is also closed.
*/
override def close(): Unit = statement.close()

@tailrec
private def toDataRow(row: ResultSet, columns: Int, acc: DataRow): Try[DataRow] =
if columns == 0 then Success(acc)
Expand All @@ -65,7 +44,6 @@ class LazyQueryResult(statement: Statement, resultSet: ResultSet) extends QueryR
toArcaneType(dataType) match
case Success(arcaneType) => toDataRow(row, columns - 1, DataCell(name, arcaneType, value) :: acc)
case Failure(exception) => Failure(exception)
}

/**
* Companion object for [[LazyQueryResult]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ package services.mssql
import models.{ArcaneSchema, ArcaneType, Field}
import services.base.{CanAdd, SchemaProvider}
import services.mssql.MsSqlConnection.{DATE_PARTITION_KEY, UPSERT_MERGE_KEY, toArcaneType}
import services.mssql.base.QueryResult

import com.microsoft.sqlserver.jdbc.SQLServerDriver

import java.sql.ResultSet
import java.time.format.DateTimeFormatter
import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}
import java.util.Properties
import scala.annotation.tailrec
import scala.concurrent.{Future, blocking}
import scala.io.Source
import scala.util.{Success, Failure, Try, Using}
import scala.util.{Failure, Success, Try, Using}

/**
* Represents a summary of a column in a table.
Expand Down Expand Up @@ -60,7 +63,7 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos
private val driver = new SQLServerDriver()
private val connection = driver.connect(connectionOptions.connectionUrl, new Properties())
private implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

private implicit val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

/**
* Gets the column summaries for the table in the database.
Expand All @@ -80,6 +83,37 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos
result.get
}

/**
 * Runs the backfill query against the database.
 *
 * The query text is produced by `QueryProvider.getBackfillQuery` and then executed on this
 * connection through the supplied query runner. `arcaneSchema` is not consulted by this method.
 *
 * @param arcaneSchema The schema for the data produced by Arcane.
 * @param queryRunner  The runner used to execute the query.
 * @return A future containing the result of the backfill.
 */
def backfill(arcaneSchema: ArcaneSchema)(using queryRunner: QueryRunner): Future[QueryResult[LazyQueryResult.OutputType]] =
  QueryProvider
    .getBackfillQuery(this)
    .flatMap(backfillQuery => queryRunner.executeQuery(backfillQuery, connection, LazyQueryResult.apply))

/**
 * Gets the changes in the database since the given version.
 *
 * First resolves a change tracking version with the query built by
 * `QueryProvider.getChangeTrackingVersionQuery`, then runs the changes query for that version.
 * The intermediate scalar result is closed as soon as the version has been read, so its
 * statement is not left open on the connection. The returned change result is not closed here.
 *
 * @param latestVersion    The version to start from.
 * @param lookBackInterval The look back interval for the query.
 * @param queryRunner      The runner used to execute both queries.
 * @return A future containing the changes in the database since the given version and the latest
 *         observed version (0 when the version query produced no value).
 */
def getChanges(latestVersion: Long, lookBackInterval: Duration)(using queryRunner: QueryRunner): Future[(QueryResult[LazyQueryResult.OutputType], Long)] =
  val query = QueryProvider.getChangeTrackingVersionQuery(connectionOptions.databaseName, latestVersion, lookBackInterval)

  // Reads the scalar version and always releases the statement behind the result:
  // only the numeric value is needed past this point.
  def readVersionAndClose(versionResult: QueryResult[Option[Long]]): Long =
    try versionResult.read.getOrElse(0L)
    finally
      versionResult match
        case owner: AutoCloseable => owner.close()
        case _ => ()

  for versionResult <- queryRunner.executeQuery(query, connection, (st, rs) => ScalarQueryResult.apply(st, rs, readChangeTrackingVersion))
      version = readVersionAndClose(versionResult)
      changesQuery <- QueryProvider.getChangesQuery(this, version)
      result <- queryRunner.executeQuery(changesQuery, connection, LazyQueryResult.apply)
  yield (result, version)

/**
 * Reads the change tracking version from the first column of the current row.
 *
 * @param resultSet The result set positioned on the row to read.
 * @return The value of the first column as a Long.
 * @throws IllegalArgumentException if the first column is not of type BIGINT.
 */
private def readChangeTrackingVersion(resultSet: ResultSet): Long =
  val firstColumnIsBigInt = resultSet.getMetaData.getColumnType(1) == java.sql.Types.BIGINT
  if firstColumnIsBigInt then resultSet.getLong(1)
  else
    throw new IllegalArgumentException(s"Invalid column type for change tracking version: ${resultSet.getMetaData.getColumnType(1)}, expected BIGINT")

/**
* Closes the connection to the database.
*/
Expand All @@ -104,13 +138,6 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos
case Success(schema) => schema
case Failure(exception) => throw exception


/**
 * Runs the backfill query against the database using a query runner created for this call.
 *
 * @param arcaneSchema The schema for the data produced by Arcane; not consulted by this method.
 * @return A future containing the result of the backfill.
 */
def backfill(arcaneSchema: ArcaneSchema): Future[QueryResult[LazyQueryResult.OutputType]] =
  QueryProvider.getBackfillQuery(this).flatMap { backfillQuery =>
    // The runner is created only after the query text is available.
    val runner = QueryRunner()
    runner.executeQuery(backfillQuery, connection, LazyQueryResult.apply)
  }

private def getSqlSchema(query: String): Future[SqlSchema] = Future {
val columns = Using.Manager { use =>
val statement = use(connection.createStatement())
Expand Down Expand Up @@ -209,6 +236,26 @@ object QueryProvider:
Long.MaxValue)
})

/**
* Gets the changes query for the Microsoft SQL Server database.
*
* The query is assembled from the connection's column summaries: a merge expression,
* the change tracking column list and a match statement are derived from them and passed,
* together with the connection options and `fromVersion`, to the overloaded `getChangesQuery`.
*
* @param msSqlConnection The connection to the database.
* @param fromVersion The version to start from.
* @return A future containing the changes query for the Microsoft SQL Server database.
*/
def getChangesQuery(msSqlConnection: MsSqlConnection, fromVersion: Long): Future[MsSqlQuery] =
msSqlConnection.getColumnSummaries
s-vitaliy marked this conversation as resolved.
Show resolved Hide resolved
.map(columnSummaries => {
// NOTE(review): "tq" and "ct" are table aliases handed to the expression builders; presumably
// "ct" is the change tracking source and "tq" the tracked table - confirm against the query template.
val mergeExpression = QueryProvider.getMergeExpression(columnSummaries, "tq")
val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "ct", "tq")
val matchStatement = QueryProvider.getMatchStatement(columnSummaries, "ct", "tq", None)
QueryProvider.getChangesQuery(
msSqlConnection.connectionOptions,
mergeExpression,
columnExpression,
matchStatement,
fromVersion)
})

/**
* Gets the column summaries query for the Microsoft SQL Server database.
*
Expand All @@ -225,6 +272,11 @@ object QueryProvider:
.replace("{schema}", schemaName)
.replace("{table}", tableName)

/**
* Gets the backfill query for the Microsoft SQL Server database.
* @param msSqlConnection The connection to the database.
* @return A future containing the backfill query for the Microsoft SQL Server database.
*/
def getBackfillQuery(msSqlConnection: MsSqlConnection): Future[MsSqlQuery] =
msSqlConnection.getColumnSummaries
.map(columnSummaries => {
Expand All @@ -236,6 +288,23 @@ object QueryProvider:
columnExpression)
})

/**
 * Gets the query that retrieves the change tracking version for the Microsoft SQL Server database.
 *
 * When `version` is 0 the query selects the minimum `commit_ts` among commits whose `commit_time`
 * is later than the current time minus `lookBackRange` (rendered in UTC with `formatter`).
 * Otherwise it selects the minimum `commit_ts` greater than `version`.
 * Both forms read `sys.dm_tran_commit_table` qualified with `databaseName`.
 *
 * @param databaseName  The name of the database.
 * @param version       The version to start from; 0 selects the time-based look back.
 * @param lookBackRange The look back range for the query.
 * @param formatter     The formatter used to render the look back timestamp in the query text.
 * @return The change tracking version query for the Microsoft SQL Server database.
 */
def getChangeTrackingVersionQuery(databaseName: String, version: Long, lookBackRange: Duration)(using formatter: DateTimeFormatter): MsSqlQuery =
  // Qualify the commit table in both branches so the query always targets `databaseName`,
  // not whichever database happens to be current on the connection.
  val commitTable = s"$databaseName.sys.dm_tran_commit_table"
  version match
    case 0L =>
      // Subtract the whole duration so a sub-second look back range is not truncated.
      val lookBackTime = Instant.now().minus(lookBackRange)
      val formattedTime = formatter.format(LocalDateTime.ofInstant(lookBackTime, ZoneOffset.UTC))
      s"SELECT MIN(commit_ts) FROM $commitTable WHERE commit_time > '$formattedTime'"
    case _ => s"SELECT MIN(commit_ts) FROM $commitTable WHERE commit_ts > $version"

private def getMergeExpression(cs: List[ColumnSummary], tableAlias: String): String =
cs.filter((name, isPrimaryKey) => isPrimaryKey)
.map((name, _) => s"cast($tableAlias.[$name] as nvarchar(128))")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.sneaksanddata.arcane.framework
package services.mssql

import services.mssql.base.QueryResult

import java.sql.{Connection, ResultSet, Statement}
import scala.concurrent.{Future, blocking}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.sneaksanddata.arcane.framework
package services.mssql

import models.{DataCell, DataRow}
import services.mssql.MsSqlConnection.toArcaneType
import services.mssql.base.{QueryResult, ResultSetOwner}

import java.sql.{ResultSet, Statement}
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

/**
* Callback function that converts a result set to a result.
*
* The callback receives the [[java.sql.ResultSet]] itself and returns the value it extracts from it.
*
* @tparam Result The type of the result.
*/
type ResultConverter[Result] = ResultSet => Result

/**
 * [[QueryResult]] implementation that reads the scalar result of a query.
 *
 * @param statement       The statement used to execute the query.
 * @param resultSet       The result set of the query.
 * @param resultConverter The callback that converts the result set to the result value.
 * @tparam Result The type of the scalar value.
 */
class ScalarQueryResult[Result](val statement: Statement, resultSet: ResultSet, resultConverter: ResultConverter[Result])
  extends QueryResult[Option[Result]] with ResultSetOwner:

  /**
   * Reads the result of the query.
   *
   * @return The converted value when the result set has exactly one column and at least one row,
   *         `None` otherwise.
   */
  override def read: this.OutputType =
    val hasSingleColumn = resultSet.getMetaData.getColumnCount == 1
    // `next()` is only invoked for single-column result sets, and the converter only after it succeeds.
    Option.when(hasSingleColumn && resultSet.next())(resultConverter(resultSet))


/**
 * Companion object for [[ScalarQueryResult]].
 */
object ScalarQueryResult:
  /**
   * Creates a new [[ScalarQueryResult]] object.
   *
   * @param statement       The statement used to execute the query.
   * @param resultSet       The result set of the query.
   * @param resultConverter The callback that converts the result set to the result value.
   * @tparam Result The type of the result.
   * @return The new [[ScalarQueryResult]] object.
   */
  def apply[Result](statement: Statement, resultSet: ResultSet, resultConverter: ResultConverter[Result]): ScalarQueryResult[Result] =
    // `new` is required here: a bare `ScalarQueryResult(...)` would call this `apply` again.
    new ScalarQueryResult(statement, resultSet, resultConverter)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.sneaksanddata.arcane.framework
package services.mssql.base

/**
 * Contract for the result of a query to a SQL database.
 *
 * @tparam Output The type the query result is mapped to.
 */
trait QueryResult[Output]:

  /**
   * Member alias for the `Output` type parameter.
   */
  type OutputType = Output

  /**
   * Reads the result of the SQL query mapped to the output type.
   *
   * @return The result of the query.
   */
  def read: OutputType
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.sneaksanddata.arcane.framework
package services.mssql.base

import java.sql.Statement

/**
 * AutoCloseable mixin for classes that own a result set.
 */
trait ResultSetOwner extends AutoCloseable:
  /** The statement closed by `close()`; supplied by the implementing class. */
  protected val statement: Statement

  /**
   * Closes the statement and, with it, the result set owned by this object.
   * When a Statement object is closed, its current ResultSet object, if one exists, is also closed.
   */
  override def close(): Unit =
    statement.close()
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package services.connectors.mssql

import models.ArcaneType.{IntType, LongType, StringType}
import models.Field
import services.mssql.{ConnectionOptions, MsSqlConnection, QueryProvider}
import services.mssql.{ConnectionOptions, MsSqlConnection, QueryProvider, QueryRunner}

import com.microsoft.sqlserver.jdbc.SQLServerDriver
import org.scalatest.*
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers.*

import java.sql.Connection
import java.time.Duration
import java.util.Properties
import scala.List
import scala.concurrent.Future
Expand All @@ -20,6 +21,7 @@ case class TestConnectionInfo(connectionOptions: ConnectionOptions, connection:

class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
private implicit val queryRunner: QueryRunner = QueryRunner()
val connectionUrl = "jdbc:sqlserver://localhost;encrypt=true;trustServerCertificate=true;username=sa;password=tMIxN11yGZgMC"

def createDb(): TestConnectionInfo =
Expand Down Expand Up @@ -114,7 +116,6 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
}
}


"MsSqlConnection" should "return correct number of rows on backfill" in withDatabase { dbInfo =>
val connection = MsSqlConnection(dbInfo.connectionOptions)
for schema <- connection.getSchema
Expand All @@ -125,7 +126,6 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
}
}


"MsSqlConnection" should "return correct number of columns on backfill" in withDatabase { dbInfo =>
val connection = MsSqlConnection(dbInfo.connectionOptions)
for schema <- connection.getSchema
Expand All @@ -136,3 +136,24 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
head should have length 7
}
}

"MsSqlConnection" should "return correct number of rows on getChanges" in withDatabase { dbInfo =>
  val connection = MsSqlConnection(dbInfo.connectionOptions)
  // The schema is fetched before asking for changes; its value is not used by the assertion.
  connection.getSchema
    .flatMap(_ => connection.getChanges(0, Duration.ofDays(1)))
    .map { case (changes, _) =>
      val changedRows = changes.read.toList
      changedRows should have length 20
    }
}

"MsSqlConnection" should "update latest version when changes received" in withDatabase { dbInfo =>
  val connection = MsSqlConnection(dbInfo.connectionOptions)
  // The schema is fetched before asking for changes; only the returned version is checked.
  connection.getSchema
    .flatMap(_ => connection.getChanges(0, Duration.ofDays(1)))
    .map { case (_, latestVersion) => latestVersion should be > 0L }
}
Loading