Skip to content

Commit

Permalink
Add the backfill implementation for MSSQL client
Browse files Browse the repository at this point in the history
## Scope

Part of #61

This PR implements the backfill behavior for the MS SQL Server client: the client queries all data from the data source and returns it as a `LazyList`.

NOTE: the `LazyList` class is used because the `Stream` class is deprecated
  • Loading branch information
s-vitaliy committed Nov 4, 2024
1 parent 35a58e5 commit 1a4876c
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()

SELECT
{ChangeTrackingColumnsStatement},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()

SELECT
{ChangeTrackingColumnsStatement},
Expand Down
22 changes: 22 additions & 0 deletions framework/arcane-framework/src/main/scala/models/DataColumn.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.sneaksanddata.arcane.framework
package models

/**
* Represents a row of data as an ordered list of [[DataColumn]] values.
*/
type DataRow = List[DataColumn]

/**
* Represents a single column value within a row of data.
*
* @param name The name of the column.
* @param Type The Arcane type of the column.
* @param value The value of the column; it is untyped (`Any`).
*/
case class DataColumn(name: String, Type: ArcaneType, value: Any)

/**
* Companion object for [[DataColumn]].
*/
object DataColumn:
/**
* Creates a new [[DataColumn]].
*
* @param name The name of the column.
* @param Type The Arcane type of the column.
* @param value The value of the column.
* @return The new [[DataColumn]] object.
*/
def apply(name: String, Type: ArcaneType, value: Any): DataColumn = new DataColumn(name, Type, value)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package services.mssql

import models.{ArcaneSchema, ArcaneType, Field}
import services.base.{CanAdd, SchemaProvider}
import services.mssql.MsSqlConnection.{DATE_PARTITION_KEY, UPSERT_MERGE_KEY}
import services.mssql.MsSqlConnection.{DATE_PARTITION_KEY, UPSERT_MERGE_KEY, toArcaneType}

import com.microsoft.sqlserver.jdbc.SQLServerDriver

Expand Down Expand Up @@ -102,6 +102,13 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos
sqlSchema <- getSqlSchema(query)
yield toSchema(sqlSchema, empty)


/**
 * Runs the backfill query against this connection and exposes the rows as a lazily read result.
 *
 * The query text is obtained from [[QueryProvider.getBackfillQuery]] and executed by a fresh
 * [[QueryRunner]]; the rows are wrapped in a [[LazyQueryResult]].
 *
 * @param arcaneSchema The schema of the data being backfilled. NOTE(review): not used by this
 *                     method body - confirm whether it is reserved for later use.
 * @return A future holding the query result.
 */
def backfill(arcaneSchema: ArcaneSchema): Future[QueryResult[LazyQueryResult.OutputType]] =
  QueryProvider.getBackfillQuery(this).flatMap { backfillQuery =>
    QueryRunner().executeQuery(backfillQuery, connection, LazyQueryResult.apply)
  }

private def getSqlSchema(query: String): Future[SqlSchema] = Future {
val columns = Using.Manager { use =>
val statement = use(connection.createStatement())
Expand Down Expand Up @@ -130,23 +137,6 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos
return result
readColumns(resultSet, result ++ List((resultSet.getString(1), resultSet.getInt(2) == 1)))

// Maps a JDBC type code (a `java.sql.Types` constant) to the corresponding Arcane type.
// Codes that are not listed are not handled and fail the match with a `MatchError`.
private def toArcaneType(sqlType: Int): ArcaneType = sqlType match
  // Character data
  case java.sql.Types.CHAR | java.sql.Types.NCHAR | java.sql.Types.NVARCHAR => ArcaneType.StringType
  // Numeric data
  case java.sql.Types.BIGINT => ArcaneType.LongType
  case java.sql.Types.INTEGER => ArcaneType.IntType
  case java.sql.Types.SMALLINT => ArcaneType.ShortType
  case java.sql.Types.DECIMAL => ArcaneType.BigDecimalType
  case java.sql.Types.DOUBLE => ArcaneType.DoubleType
  case java.sql.Types.FLOAT => ArcaneType.FloatType
  // Date and time data
  case java.sql.Types.DATE => ArcaneType.DateType
  case java.sql.Types.TIME => ArcaneType.TimeType
  case java.sql.Types.TIMESTAMP => ArcaneType.TimestampType
  case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => ArcaneType.DateTimeOffsetType
  // Binary and boolean data
  case java.sql.Types.BINARY => ArcaneType.ByteArrayType
  case java.sql.Types.BIT => ArcaneType.BooleanType

object MsSqlConnection:
/**
* The key used to merge rows in the output table.
Expand All @@ -166,6 +156,31 @@ object MsSqlConnection:
*/
def apply(connectionOptions: ConnectionOptions): MsSqlConnection = new MsSqlConnection(connectionOptions)

/**
 * Maps a JDBC type code to the corresponding Arcane type.
 *
 * Type codes that are not listed in the match are not handled and fail with a `MatchError`.
 *
 * @param sqlType The JDBC type code, one of the `java.sql.Types` constants.
 * @return The Arcane type.
 */
def toArcaneType(sqlType: Int): ArcaneType =
  import java.sql.Types
  sqlType match
    // Character data
    case Types.CHAR | Types.NCHAR | Types.NVARCHAR | Types.VARCHAR => ArcaneType.StringType
    // Numeric data
    case Types.BIGINT => ArcaneType.LongType
    case Types.INTEGER => ArcaneType.IntType
    case Types.SMALLINT => ArcaneType.ShortType
    case Types.DECIMAL => ArcaneType.BigDecimalType
    case Types.DOUBLE => ArcaneType.DoubleType
    case Types.FLOAT => ArcaneType.FloatType
    // Date and time data
    case Types.DATE => ArcaneType.DateType
    case Types.TIME => ArcaneType.TimeType
    case Types.TIMESTAMP => ArcaneType.TimestampType
    case Types.TIMESTAMP_WITH_TIMEZONE => ArcaneType.DateTimeOffsetType
    // Binary and boolean data
    case Types.BINARY => ArcaneType.ByteArrayType
    case Types.BIT => ArcaneType.BooleanType


object QueryProvider:
private implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

Expand Down Expand Up @@ -209,7 +224,7 @@ object QueryProvider:
msSqlConnection.getColumnSummaries
.map(columnSummaries => {
val mergeExpression = QueryProvider.getMergeExpression(columnSummaries, "tq")
val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "ct", "tq")
val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "tq")
QueryProvider.getAllQuery(
msSqlConnection.connectionOptions,
mergeExpression,
Expand Down Expand Up @@ -243,6 +258,15 @@ object QueryProvider:
.map((name, _) => s"$tableAlias.[$name]")
(primaryKeyColumns ++ additionalColumns ++ nonPrimaryKeyColumns).mkString(",\n")

// Builds the column list for a query that has no change tracking table to join with.
// Primary key columns come first, then constant SYS_CHANGE_VERSION / SYS_CHANGE_OPERATION
// columns, then the remaining columns; entries are separated by ",\n".
private def getChangeTrackingColumns(tableColumns: List[ColumnSummary], tableAlias: String): String =
  val changeTrackingNames = Set("SYS_CHANGE_VERSION", "SYS_CHANGE_OPERATION")

  def qualified(columnName: String): String = s"$tableAlias.[$columnName]"

  val (keySummaries, otherSummaries) = tableColumns.partition((_, isPrimaryKey) => isPrimaryKey)

  val keyColumns = keySummaries.map((name, _) => qualified(name))
  // Constant values stand in for the change tracking columns
  val constantColumns = List("0 as SYS_CHANGE_VERSION", "'I' as SYS_CHANGE_OPERATION")
  // Source columns that share a change tracking name are dropped so they do not clash with the constants
  val dataColumns = otherSummaries.collect {
    case (name, _) if !changeTrackingNames.contains(name) => qualified(name)
  }

  (keyColumns ++ constantColumns ++ dataColumns).mkString(",\n")

private def getChangesQuery(connectionOptions: ConnectionOptions,
mergeExpression: String,
columnStatement: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.sneaksanddata.arcane.framework
package services.mssql

import models.{DataColumn, DataRow}

import java.sql.{ResultSet, Statement}
import scala.annotation.tailrec

/**
* Represents the result of a query to a SQL database.
*
* @tparam Output The type the result of the query is mapped to.
*/
trait QueryResult[Output] {

/**
* Alias for the `Output` type parameter, available as a member of the result.
*/
type OutputType = Output

/**
* Reads the result of the SQL query mapped to an output type.
*
* @return The result of the query.
*/
def read: OutputType

}

/**
 * Lazy-list based implementation of [[QueryResult]].
 *
 * @param statement The statement used to execute the query.
 * @param resultSet The result set of the query.
 */
class LazyQueryResult(statement: Statement, resultSet: ResultSet) extends QueryResult[LazyList[DataRow]] with AutoCloseable {

  /**
   * Reads the result of the query.
   *
   * Rows are pulled from the result set on demand, as the returned list is traversed. Every list
   * returned by this method advances the same underlying result set.
   *
   * @return The result of the query, one [[DataRow]] per result set row, with the columns in
   *         result set order.
   */
  override def read: this.OutputType =
    val columnCount = resultSet.getMetaData.getColumnCount

    // Column names and Arcane types are the same for every row, so they are resolved once instead of
    // once per value. The lookup stays lazy so that an unsupported column type still only surfaces
    // when the first row is actually read.
    lazy val columns =
      val metadata = resultSet.getMetaData
      List.tabulate(columnCount) { offset =>
        val index = offset + 1 // JDBC column indexes are 1-based
        (index, metadata.getColumnName(index), MsSqlConnection.toArcaneType(metadata.getColumnType(index)))
      }

    LazyList.continually(resultSet)
      .takeWhile(_.next())
      // Values are read left to right, the access order recommended for forward-only result sets
      .map(row => columns.map((index, name, arcaneType) => DataColumn(name, arcaneType, row.getObject(index))))

  /**
   * Closes the statement and the result set owned by this object.
   * When a Statement object is closed, its current ResultSet object, if one exists, is also closed.
   */
  override def close(): Unit = statement.close()

}

/**
* Companion object for [[LazyQueryResult]].
*/
object LazyQueryResult {

/**
* The output type of the query result: a lazy list of [[DataRow]] values.
*/
type OutputType = LazyList[DataRow]

/**
* Creates a new [[LazyQueryResult]] object.
*
* @param statement The statement used to execute the query.
* @param resultSet The result set of the query.
* @return The new [[LazyQueryResult]] object.
*/
def apply(statement: Statement, resultSet: ResultSet): LazyQueryResult = new LazyQueryResult(statement, resultSet)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.sneaksanddata.arcane.framework
package services.mssql

import java.sql.{Connection, ResultSet, Statement}
import scala.concurrent.{Future, blocking}

/**
 * A class that runs a query on a SQL database.
 * This class is intended to run a single query and traverse the results only once using the forward-only read-only
 * cursor.
 *
 */
class QueryRunner:

  /**
   * A factory for creating a QueryResult object from a statement and a result set.
   *
   * @tparam Output The type of the output of the query.
   */
  private type ResultFactory[Output] = (Statement, ResultSet) => QueryResult[Output]

  private implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

  /**
   * Executes the given query on the given connection.
   *
   * The QueryResult object produced by this method must not outlive the connection object passed to it.
   *
   * If running the query or building the result fails, the statement created for the query is closed
   * before the failure is propagated through the returned future. On success the statement is handed
   * to the result factory together with the result set.
   *
   * @param query The query to execute.
   * @param connection The connection to execute the query on.
   * @param resultFactory The factory that builds the query result from the statement and the result set.
   * @tparam Result The type of the output of the query.
   * @return The result of the query.
   */
  def executeQuery[Result](query: MsSqlQuery, connection: Connection, resultFactory: ResultFactory[Result]): Future[QueryResult[Result]] =
    Future {
      val statement = connection.createStatement()
      try
        val resultSet = blocking {
          statement.executeQuery(query)
        }
        resultFactory(statement, resultSet)
      catch
        case scala.util.control.NonFatal(error) =>
          // Nothing was handed out to the caller, so the statement has to be released here
          closeAfterFailure(statement, error)
          throw error
    }

  /**
   * Closes the statement after a failed query, recording a failure to close on the original error
   * instead of masking it.
   */
  private def closeAfterFailure(statement: Statement, cause: Throwable): Unit =
    try statement.close()
    catch
      case scala.util.control.NonFatal(closeError) => cause.addSuppressed(closeError)


/**
* Companion object for [[QueryRunner]].
*/
object QueryRunner:
/**
* Creates a new [[QueryRunner]] object.
*
* @return The new [[QueryRunner]] object.
*/
def apply(): QueryRunner = new QueryRunner()
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
for i <- 1 to 10 do
val insertCmd = s"use arcane; insert into dbo.MsSqlConnectorsTests values($i, ${i+1})"
statement.execute(insertCmd)
statement.close()

val updateStatement = con.createStatement()
for i <- 1 to 10 do
val insertCmd = s"use arcane; insert into dbo.MsSqlConnectorsTests values(${i * 1000}, ${i * 1000 + 1})"
updateStatement.execute(insertCmd)


def removeDb(): Unit =
Expand Down Expand Up @@ -107,3 +113,26 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
fields should be(List(IntType, LongType, StringType, IntType, LongType, StringType, StringType))
}
}


// The backfill is expected to return every row inserted by the database fixture.
"MsSqlConnection" should "return correct number of rows on backfill" in withDatabase { dbInfo =>
  val connection = MsSqlConnection(dbInfo.connectionOptions)
  connection.getSchema
    .flatMap(schema => connection.backfill(schema))
    .map(backfill => backfill.read.toList)
    .map { rows =>
      rows should have length 20
    }
}


// Every backfilled row is expected to carry the full set of columns; the first row is checked.
"MsSqlConnection" should "return correct number of columns on backfill" in withDatabase { dbInfo =>
  val connection = MsSqlConnection(dbInfo.connectionOptions)
  connection.getSchema
    .flatMap(schema => connection.backfill(schema))
    .map(backfill => backfill.read.toList)
    .map { rows =>
      val firstRow = rows.head
      firstRow should have length 7
    }
}

0 comments on commit 1a4876c

Please sign in to comment.