Skip to content

Commit

Permalink
Add the backfill implementation for MSSQL client (#74)
Browse files Browse the repository at this point in the history
* Add the backfill implementation for MSSQL client

## Scope

Part of #61

This PR implements the backfill behavior for the MSSQL Server client, where the client queries all data from the data source and returns this data as a LazyList.

NOTE: the `LazyList` class is used because the `Stream` class is deprecated

* Fix unit tests

* Update framework/arcane-framework/src/main/scala/models/DataColumn.scala

Co-authored-by: George Zubrienko <[email protected]>

* Review fixes

---------

Co-authored-by: George Zubrienko <[email protected]>
  • Loading branch information
s-vitaliy and george-zubrienko authored Nov 4, 2024
1 parent 35a58e5 commit 666cfd4
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()

SELECT
{ChangeTrackingColumnsStatement},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()
declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()

SELECT
{ChangeTrackingColumnsStatement},
Expand Down
22 changes: 22 additions & 0 deletions framework/arcane-framework/src/main/scala/models/DataCell.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.sneaksanddata.arcane.framework
package models

/**
 * Represents a single row of data: an ordered list of named, typed cells.
 */
type DataRow = List[DataCell]

/**
 * Represents a single cell (one column value) within a [[DataRow]].
 *
 * @param name The column name of the cell.
 * @param Type The Arcane data type describing the cell's value.
 * @param value The value held by the cell; stored untyped (`Any`), with `Type` recording its intended Arcane type.
 */
case class DataCell(name: String, Type: ArcaneType, value: Any)

/**
 * Companion object for [[DataCell]].
 */
object DataCell:
  /**
   * Creates a new [[DataCell]].
   *
   * @param name The column name of the cell.
   * @param Type The Arcane data type describing the cell's value.
   * @param value The value held by the cell.
   * @return The new [[DataCell]] instance.
   */
  def apply(name: String, Type: ArcaneType, value: Any): DataCell = new DataCell(name, Type, value)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package services.mssql

import models.{ArcaneSchema, ArcaneType, Field}
import services.base.{CanAdd, SchemaProvider}
import services.mssql.MsSqlConnection.{DATE_PARTITION_KEY, UPSERT_MERGE_KEY}
import services.mssql.MsSqlConnection.{DATE_PARTITION_KEY, UPSERT_MERGE_KEY, toArcaneType}

import com.microsoft.sqlserver.jdbc.SQLServerDriver

Expand All @@ -12,7 +12,7 @@ import java.util.Properties
import scala.annotation.tailrec
import scala.concurrent.{Future, blocking}
import scala.io.Source
import scala.util.Using
import scala.util.{Success, Failure, Try, Using}

/**
* Represents a summary of a column in a table.
Expand Down Expand Up @@ -100,7 +100,16 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos
override def getSchema: Future[this.SchemaType] =
for query <- QueryProvider.getSchemaQuery(this)
sqlSchema <- getSqlSchema(query)
yield toSchema(sqlSchema, empty)
yield toSchema(sqlSchema, empty) match
case Success(schema) => schema
case Failure(exception) => throw exception


/**
 * Runs a backfill of the table: executes the backfill query against the source
 * and exposes the entire result set as a lazily evaluated list of rows.
 *
 * @param arcaneSchema The schema of the table (not referenced by the query construction itself).
 * @return A [[QueryResult]] that lazily yields all rows returned by the backfill query.
 */
def backfill(arcaneSchema: ArcaneSchema): Future[QueryResult[LazyQueryResult.OutputType]] =
  QueryProvider.getBackfillQuery(this)
    .flatMap(query => QueryRunner().executeQuery(query, connection, LazyQueryResult.apply))

private def getSqlSchema(query: String): Future[SqlSchema] = Future {
val columns = Using.Manager { use =>
Expand All @@ -115,12 +124,14 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos
}

@tailrec
private def toSchema(sqlSchema: SqlSchema, schema: this.SchemaType): this.SchemaType =
private def toSchema(sqlSchema: SqlSchema, schema: this.SchemaType): Try[this.SchemaType] =
sqlSchema match
case Nil => schema
case Nil => Success(schema)
case x +: xs =>
val (name, fieldType) = x
toSchema(xs, schema.addField(name, toArcaneType(fieldType)))
toArcaneType(fieldType) match
case Success(arcaneType) => toSchema(xs, schema.addField(name, arcaneType))
case Failure(exception) => Failure[this.SchemaType](exception)

@tailrec
private def readColumns(resultSet: ResultSet, result: List[ColumnSummary]): List[ColumnSummary] =
Expand All @@ -130,23 +141,6 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos
return result
readColumns(resultSet, result ++ List((resultSet.getString(1), resultSet.getInt(2) == 1)))

private def toArcaneType(sqlType: Int): ArcaneType = sqlType match
case java.sql.Types.BIGINT => ArcaneType.LongType
case java.sql.Types.BINARY => ArcaneType.ByteArrayType
case java.sql.Types.BIT => ArcaneType.BooleanType
case java.sql.Types.CHAR => ArcaneType.StringType
case java.sql.Types.DATE => ArcaneType.DateType
case java.sql.Types.TIMESTAMP => ArcaneType.TimestampType
case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => ArcaneType.DateTimeOffsetType
case java.sql.Types.DECIMAL => ArcaneType.BigDecimalType
case java.sql.Types.DOUBLE => ArcaneType.DoubleType
case java.sql.Types.INTEGER => ArcaneType.IntType
case java.sql.Types.FLOAT => ArcaneType.FloatType
case java.sql.Types.SMALLINT => ArcaneType.ShortType
case java.sql.Types.TIME => ArcaneType.TimeType
case java.sql.Types.NCHAR => ArcaneType.StringType
case java.sql.Types.NVARCHAR => ArcaneType.StringType

object MsSqlConnection:
/**
* The key used to merge rows in the output table.
Expand All @@ -166,6 +160,32 @@ object MsSqlConnection:
*/
def apply(connectionOptions: ConnectionOptions): MsSqlConnection = new MsSqlConnection(connectionOptions)

// Lookup table from java.sql.Types codes to Arcane types.
// All character types (CHAR, NCHAR, NVARCHAR, VARCHAR) collapse to StringType.
private val sqlTypeToArcaneType: Map[Int, ArcaneType] = Map(
  java.sql.Types.BIGINT -> ArcaneType.LongType,
  java.sql.Types.BINARY -> ArcaneType.ByteArrayType,
  java.sql.Types.BIT -> ArcaneType.BooleanType,
  java.sql.Types.CHAR -> ArcaneType.StringType,
  java.sql.Types.DATE -> ArcaneType.DateType,
  java.sql.Types.TIMESTAMP -> ArcaneType.TimestampType,
  java.sql.Types.TIMESTAMP_WITH_TIMEZONE -> ArcaneType.DateTimeOffsetType,
  java.sql.Types.DECIMAL -> ArcaneType.BigDecimalType,
  java.sql.Types.DOUBLE -> ArcaneType.DoubleType,
  java.sql.Types.INTEGER -> ArcaneType.IntType,
  java.sql.Types.FLOAT -> ArcaneType.FloatType,
  java.sql.Types.SMALLINT -> ArcaneType.ShortType,
  java.sql.Types.TIME -> ArcaneType.TimeType,
  java.sql.Types.NCHAR -> ArcaneType.StringType,
  java.sql.Types.NVARCHAR -> ArcaneType.StringType,
  java.sql.Types.VARCHAR -> ArcaneType.StringType
)

/**
 * Converts a SQL type to an Arcane type.
 *
 * @param sqlType The SQL type (a [[java.sql.Types]] constant).
 * @return The Arcane type wrapped in `Success`, or a `Failure` for unsupported SQL types.
 */
def toArcaneType(sqlType: Int): Try[ArcaneType] =
  sqlTypeToArcaneType.get(sqlType) match
    case Some(arcaneType) => Success(arcaneType)
    case None => Failure(new IllegalArgumentException(s"Unsupported SQL type: $sqlType"))


object QueryProvider:
private implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

Expand Down Expand Up @@ -209,7 +229,7 @@ object QueryProvider:
msSqlConnection.getColumnSummaries
.map(columnSummaries => {
val mergeExpression = QueryProvider.getMergeExpression(columnSummaries, "tq")
val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "ct", "tq")
val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "tq")
QueryProvider.getAllQuery(
msSqlConnection.connectionOptions,
mergeExpression,
Expand Down Expand Up @@ -243,6 +263,15 @@ object QueryProvider:
.map((name, _) => s"$tableAlias.[$name]")
(primaryKeyColumns ++ additionalColumns ++ nonPrimaryKeyColumns).mkString(",\n")

private def getChangeTrackingColumns(tableColumns: List[ColumnSummary], tableAlias: String): String =
  // Backfill rows carry synthetic change-tracking metadata: emitted as inserts at version 0.
  val syntheticColumns = List("0 as SYS_CHANGE_VERSION", "'I' as SYS_CHANGE_OPERATION")
  val reservedNames = Set("SYS_CHANGE_VERSION", "SYS_CHANGE_OPERATION")
  val (primaryKeys, remaining) = tableColumns.partition((_, isPrimaryKey) => isPrimaryKey)
  val primaryKeyColumns = primaryKeys.map((name, _) => s"$tableAlias.[$name]")
  // Drop source columns that would collide with the synthetic change-tracking columns.
  val otherColumns = remaining.collect {
    case (name, _) if !reservedNames.contains(name) => s"$tableAlias.[$name]"
  }

  (primaryKeyColumns ++ syntheticColumns ++ otherColumns).mkString(",\n")

private def getChangesQuery(connectionOptions: ConnectionOptions,
mergeExpression: String,
columnStatement: String,
Expand Down Expand Up @@ -282,4 +311,4 @@ object QueryProvider:
.replace("{MERGE_KEY}", UPSERT_MERGE_KEY)
.replace("{DATE_PARTITION_EXPRESSION}", connectionOptions.partitionExpression.getOrElse(""))
.replace("{DATE_PARTITION_KEY}", DATE_PARTITION_KEY)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.sneaksanddata.arcane.framework
package services.mssql

import models.{DataCell, DataRow}
import services.mssql.MsSqlConnection.toArcaneType

import java.sql.{ResultSet, Statement}
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

/**
 * Represents the result of a query to a SQL database.
 *
 * NOTE(review): implementations may hold live database resources (statement/cursor),
 * so the result should presumably be consumed before those resources or the owning
 * connection are closed — confirm per implementation.
 */
trait QueryResult[Output] {

  // Concrete representation of the query output (e.g. a LazyList of rows).
  type OutputType = Output

  /**
   * Reads the result of the SQL query mapped to an output type.
   *
   * @return The result of the query.
   */
  def read: OutputType

}

/**
 * Lazy-list based implementation of [[QueryResult]].
 *
 * Rows are produced on demand from the underlying JDBC cursor: nothing is read
 * from the database until the returned LazyList is forced, and the list must be
 * consumed before this object (or the connection that produced the result set) is closed.
 *
 * @param statement The statement used to execute the query.
 * @param resultSet The result set of the query.
 */
class LazyQueryResult(statement: Statement, resultSet: ResultSet) extends QueryResult[LazyList[DataRow]] with AutoCloseable {

  /**
   * Reads the result of the query.
   *
   * Evaluation is lazy: forcing each element advances the cursor once via `next()`,
   * so the underlying result set is traversed a single time. An unsupported SQL
   * column type causes the corresponding element to throw when it is forced.
   *
   * @return The result of the query.
   */
  override def read: this.OutputType =
    val columns = resultSet.getMetaData.getColumnCount
    // Keep yielding the same cursor while next() finds another row.
    LazyList.continually(resultSet)
      .takeWhile(_.next())
      .map(row => {
        toDataRow(row, columns, List.empty) match {
          case Success(dataRow) => dataRow
          // Surface type-conversion failures at the point the element is forced.
          case Failure(exception) => throw exception
        }
      })


  /**
   * Closes the statement and the result set owned by this object.
   * When a Statement object is closed, its current ResultSet object, if one exists, is also closed.
   */
  override def close(): Unit = statement.close()

  // Converts the current cursor row into a DataRow. Columns are walked from last
  // to first so that prepending to the accumulator keeps the final list in column
  // order; fails fast on the first unsupported SQL type.
  @tailrec
  private def toDataRow(row: ResultSet, columns: Int, acc: DataRow): Try[DataRow] =
    if columns == 0 then Success(acc)
    else
      val name = row.getMetaData.getColumnName(columns)
      val value = row.getObject(columns)
      val dataType = row.getMetaData.getColumnType(columns)
      toArcaneType(dataType) match
        case Success(arcaneType) => toDataRow(row, columns - 1, DataCell(name, arcaneType, value) :: acc)
        case Failure(exception) => Failure(exception)
}

/**
 * Companion object for [[LazyQueryResult]].
 */
object LazyQueryResult:

  /**
   * The output type of the query result: a lazily evaluated list of data rows.
   */
  type OutputType = LazyList[DataRow]

  /**
   * Creates a new [[LazyQueryResult]] wrapping the given statement and its result set.
   *
   * @param statement The statement used to execute the query.
   * @param resultSet The result set of the query.
   * @return The new [[LazyQueryResult]] object.
   */
  def apply(statement: Statement, resultSet: ResultSet): LazyQueryResult =
    new LazyQueryResult(statement, resultSet)
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.sneaksanddata.arcane.framework
package services.mssql

import java.sql.{Connection, ResultSet, Statement}

import scala.concurrent.{Future, blocking}
import scala.util.control.NonFatal

/**
 * A class that runs a query on a SQL database.
 * This class is intended to run a single query and traverse the results only once using the forward-only read-only
 * cursor.
 *
 */
class QueryRunner:

  /**
   * A factory for creating a QueryResult object from a statement and a result set.
   *
   * @tparam Output The type of the output of the query.
   */
  private type ResultFactory[Output] = (Statement, ResultSet) => QueryResult[Output]

  // NOTE(review): uses the global pool internally; consider accepting an ExecutionContext
  // from the caller if this ever runs on a constrained scheduler.
  private implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

  /**
   * Executes the given query on the given connection.
   *
   * The QueryResult object produced by this method must not outlive the connection object passed to it.
   * On success the returned QueryResult takes ownership of the created statement; on failure the
   * statement is closed here before the error is propagated, so it cannot leak.
   *
   * @param query The query to execute.
   * @param connection The connection to execute the query on.
   * @param resultFactory Factory that wraps the statement and result set into a QueryResult.
   * @return The result of the query.
   */
  def executeQuery[Result](query: MsSqlQuery, connection: Connection, resultFactory: ResultFactory[Result]): Future[QueryResult[Result]] =
    Future {
      // Statement creation may involve a driver round-trip, so treat it as blocking as well.
      val statement = blocking {
        connection.createStatement()
      }
      try
        val resultSet = blocking {
          statement.executeQuery(query)
        }
        resultFactory(statement, resultSet)
      catch
        // If execution or wrapping fails, nothing will ever own the statement — close it to avoid a leak.
        case NonFatal(e) =>
          statement.close()
          throw e
    }


/**
 * Companion object for [[QueryRunner]].
 */
object QueryRunner:
  /**
   * Creates a new [[QueryRunner]] instance.
   *
   * @return The new [[QueryRunner]].
   */
  def apply(): QueryRunner = new QueryRunner
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
for i <- 1 to 10 do
val insertCmd = s"use arcane; insert into dbo.MsSqlConnectorsTests values($i, ${i+1})"
statement.execute(insertCmd)
statement.close()

val updateStatement = con.createStatement()
for i <- 1 to 10 do
val insertCmd = s"use arcane; insert into dbo.MsSqlConnectorsTests values(${i * 1000}, ${i * 1000 + 1})"
updateStatement.execute(insertCmd)


def removeDb(): Unit =
Expand Down Expand Up @@ -86,7 +92,7 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
val connector = MsSqlConnection(dbInfo.connectionOptions)
QueryProvider.getBackfillQuery(connector) map { query =>
query should (
include ("ct.SYS_CHANGE_VERSION") and include ("ARCANE_MERGE_KEY") and include("format(getdate(), 'yyyyMM')")
include ("SYS_CHANGE_VERSION") and include ("ARCANE_MERGE_KEY") and include("format(getdate(), 'yyyyMM')")
)
}
}
Expand All @@ -107,3 +113,26 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers:
fields should be(List(IntType, LongType, StringType, IntType, LongType, StringType, StringType))
}
}


"MsSqlConnection" should "return correct number of rows on backfill" in withDatabase { dbInfo =>
val connection = MsSqlConnection(dbInfo.connectionOptions)
for schema <- connection.getSchema
backfill <- connection.backfill(schema)
result = backfill.read.toList
yield {
result should have length 20
}
}


"MsSqlConnection" should "return correct number of columns on backfill" in withDatabase { dbInfo =>
val connection = MsSqlConnection(dbInfo.connectionOptions)
for schema <- connection.getSchema
backfill <- connection.backfill(schema)
result = backfill.read.toList
head = result.head
yield {
head should have length 7
}
}

0 comments on commit 666cfd4

Please sign in to comment.