diff --git a/framework/arcane-framework/src/main/resources/get_select_all_query.sql b/framework/arcane-framework/src/main/resources/get_select_all_query.sql
index b9360cb..7949705 100644
--- a/framework/arcane-framework/src/main/resources/get_select_all_query.sql
+++ b/framework/arcane-framework/src/main/resources/get_select_all_query.sql
@@ -1,4 +1,4 @@
-declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()
+declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()
 
 SELECT
 {ChangeTrackingColumnsStatement},
diff --git a/framework/arcane-framework/src/main/resources/get_select_all_query_date_partitioned.sql b/framework/arcane-framework/src/main/resources/get_select_all_query_date_partitioned.sql
index 7e1fab2..d4244f8 100644
--- a/framework/arcane-framework/src/main/resources/get_select_all_query_date_partitioned.sql
+++ b/framework/arcane-framework/src/main/resources/get_select_all_query_date_partitioned.sql
@@ -1,4 +1,4 @@
-declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()
+declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION()
 
 SELECT
 {ChangeTrackingColumnsStatement},
diff --git a/framework/arcane-framework/src/main/scala/models/DataCell.scala b/framework/arcane-framework/src/main/scala/models/DataCell.scala
new file mode 100644
index 0000000..76fd6ad
--- /dev/null
+++ b/framework/arcane-framework/src/main/scala/models/DataCell.scala
@@ -0,0 +1,22 @@
+package com.sneaksanddata.arcane.framework
+package models
+
+/**
+ * Represents a row of data.
+ */
+type DataRow = List[DataCell]
+
+/**
+ * Represents a single cell of a data row.
+ *
+ * @param name The name of the cell.
+ * @param Type The Arcane type of the cell.
+ * @param value The value of the cell.
+ */
+case class DataCell(name: String, Type: ArcaneType, value: Any)
+
+/**
+ * Companion object for [[DataCell]].
+ */
+object DataCell:
+  def apply(name: String, Type: ArcaneType, value: Any): DataCell = new DataCell(name, Type, value)
\ No newline at end of file
diff --git a/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala b/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala
index f42f08b..0933cbd 100644
--- a/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala
+++ b/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala
@@ -3,7 +3,7 @@ package services.mssql
 
 import models.{ArcaneSchema, ArcaneType, Field}
 import services.base.{CanAdd, SchemaProvider}
-import services.mssql.MsSqlConnection.{DATE_PARTITION_KEY, UPSERT_MERGE_KEY}
+import services.mssql.MsSqlConnection.{DATE_PARTITION_KEY, UPSERT_MERGE_KEY, toArcaneType}
 
 import com.microsoft.sqlserver.jdbc.SQLServerDriver
 
@@ -12,7 +12,7 @@ import java.util.Properties
 import scala.annotation.tailrec
 import scala.concurrent.{Future, blocking}
 import scala.io.Source
-import scala.util.Using
+import scala.util.{Success, Failure, Try, Using}
 
 /**
  * Represents a summary of a column in a table.
@@ -100,7 +100,16 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos override def getSchema: Future[this.SchemaType] = for query <- QueryProvider.getSchemaQuery(this) sqlSchema <- getSqlSchema(query) - yield toSchema(sqlSchema, empty) + yield toSchema(sqlSchema, empty) match + case Success(schema) => schema + case Failure(exception) => throw exception + + + def backfill(arcaneSchema: ArcaneSchema): Future[QueryResult[LazyQueryResult.OutputType]] = + for query <- QueryProvider.getBackfillQuery(this) + runner = QueryRunner() + result <- runner.executeQuery(query, connection, LazyQueryResult.apply) + yield result private def getSqlSchema(query: String): Future[SqlSchema] = Future { val columns = Using.Manager { use => @@ -115,12 +124,14 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos } @tailrec - private def toSchema(sqlSchema: SqlSchema, schema: this.SchemaType): this.SchemaType = + private def toSchema(sqlSchema: SqlSchema, schema: this.SchemaType): Try[this.SchemaType] = sqlSchema match - case Nil => schema + case Nil => Success(schema) case x +: xs => val (name, fieldType) = x - toSchema(xs, schema.addField(name, toArcaneType(fieldType))) + toArcaneType(fieldType) match + case Success(arcaneType) => toSchema(xs, schema.addField(name, arcaneType)) + case Failure(exception) => Failure[this.SchemaType](exception) @tailrec private def readColumns(resultSet: ResultSet, result: List[ColumnSummary]): List[ColumnSummary] = @@ -130,23 +141,6 @@ class MsSqlConnection(val connectionOptions: ConnectionOptions) extends AutoClos return result readColumns(resultSet, result ++ List((resultSet.getString(1), resultSet.getInt(2) == 1))) - private def toArcaneType(sqlType: Int): ArcaneType = sqlType match - case java.sql.Types.BIGINT => ArcaneType.LongType - case java.sql.Types.BINARY => ArcaneType.ByteArrayType - case java.sql.Types.BIT => ArcaneType.BooleanType - case java.sql.Types.CHAR => ArcaneType.StringType - case java.sql.Types.DATE => ArcaneType.DateType - case java.sql.Types.TIMESTAMP => ArcaneType.TimestampType - case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => ArcaneType.DateTimeOffsetType - case java.sql.Types.DECIMAL => ArcaneType.BigDecimalType - case java.sql.Types.DOUBLE => ArcaneType.DoubleType - case java.sql.Types.INTEGER => ArcaneType.IntType - case java.sql.Types.FLOAT => ArcaneType.FloatType - case java.sql.Types.SMALLINT => ArcaneType.ShortType - case java.sql.Types.TIME => ArcaneType.TimeType - case java.sql.Types.NCHAR => ArcaneType.StringType - case java.sql.Types.NVARCHAR => ArcaneType.StringType - object MsSqlConnection: /** * The key used to merge rows in the output table. @@ -166,6 +160,32 @@ object MsSqlConnection: */ def apply(connectionOptions: ConnectionOptions): MsSqlConnection = new MsSqlConnection(connectionOptions) + /** + * Converts a SQL type to an Arcane type. + * + * @param sqlType The SQL type. + * @return The Arcane type. 
+ */ + def toArcaneType(sqlType: Int): Try[ArcaneType] = sqlType match + case java.sql.Types.BIGINT => Success(ArcaneType.LongType) + case java.sql.Types.BINARY => Success(ArcaneType.ByteArrayType) + case java.sql.Types.BIT => Success(ArcaneType.BooleanType) + case java.sql.Types.CHAR => Success(ArcaneType.StringType) + case java.sql.Types.DATE => Success(ArcaneType.DateType) + case java.sql.Types.TIMESTAMP => Success(ArcaneType.TimestampType) + case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => Success(ArcaneType.DateTimeOffsetType) + case java.sql.Types.DECIMAL => Success(ArcaneType.BigDecimalType) + case java.sql.Types.DOUBLE => Success(ArcaneType.DoubleType) + case java.sql.Types.INTEGER => Success(ArcaneType.IntType) + case java.sql.Types.FLOAT => Success(ArcaneType.FloatType) + case java.sql.Types.SMALLINT => Success(ArcaneType.ShortType) + case java.sql.Types.TIME => Success(ArcaneType.TimeType) + case java.sql.Types.NCHAR => Success(ArcaneType.StringType) + case java.sql.Types.NVARCHAR => Success(ArcaneType.StringType) + case java.sql.Types.VARCHAR => Success(ArcaneType.StringType) + case _ => Failure(new IllegalArgumentException(s"Unsupported SQL type: $sqlType")) + + object QueryProvider: private implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global @@ -209,7 +229,7 @@ object QueryProvider: msSqlConnection.getColumnSummaries .map(columnSummaries => { val mergeExpression = QueryProvider.getMergeExpression(columnSummaries, "tq") - val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "ct", "tq") + val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "tq") QueryProvider.getAllQuery( msSqlConnection.connectionOptions, mergeExpression, @@ -243,6 +263,15 @@ object QueryProvider: .map((name, _) => s"$tableAlias.[$name]") (primaryKeyColumns ++ additionalColumns ++ nonPrimaryKeyColumns).mkString(",\n") + private def getChangeTrackingColumns(tableColumns: List[ColumnSummary], tableAlias: String): String = + val primaryKeyColumns = tableColumns.filter((_, isPrimaryKey) => isPrimaryKey).map((name, _) => s"$tableAlias.[$name]") + val additionalColumns = List("0 as SYS_CHANGE_VERSION", "'I' as SYS_CHANGE_OPERATION") + val nonPrimaryKeyColumns = tableColumns + .filter((name, isPrimaryKey) => !isPrimaryKey && !Set("SYS_CHANGE_VERSION", "SYS_CHANGE_OPERATION").contains(name)) + .map((name, _) => s"$tableAlias.[$name]") + + (primaryKeyColumns ++ additionalColumns ++ nonPrimaryKeyColumns).mkString(",\n") + private def getChangesQuery(connectionOptions: ConnectionOptions, mergeExpression: String, columnStatement: String, @@ -282,4 +311,4 @@ object QueryProvider: .replace("{MERGE_KEY}", UPSERT_MERGE_KEY) .replace("{DATE_PARTITION_EXPRESSION}", connectionOptions.partitionExpression.getOrElse("")) .replace("{DATE_PARTITION_KEY}", DATE_PARTITION_KEY) - } \ No newline at end of file + } diff --git a/framework/arcane-framework/src/main/scala/services/mssql/QueryResult.scala b/framework/arcane-framework/src/main/scala/services/mssql/QueryResult.scala new file mode 100644 index 0000000..2013b01 --- /dev/null +++ b/framework/arcane-framework/src/main/scala/services/mssql/QueryResult.scala @@ -0,0 +1,88 @@ +package com.sneaksanddata.arcane.framework +package services.mssql + +import models.{DataCell, DataRow} +import services.mssql.MsSqlConnection.toArcaneType + +import java.sql.{ResultSet, Statement} +import scala.annotation.tailrec +import scala.util.{Failure, Success, Try} + +/** + * Represents the result of a query to a 
SQL database. + */ +trait QueryResult[Output] { + + type OutputType = Output + + /** + * Reads the result of the SQL query mapped to an output type. + * + * @return The result of the query. + */ + def read: OutputType + +} + +/** + * Lazy-list based implementation of [[QueryResult]]. + * + * @param statement The statement used to execute the query. + * @param resultSet The result set of the query. + */ +class LazyQueryResult(statement: Statement, resultSet: ResultSet) extends QueryResult[LazyList[DataRow]] with AutoCloseable { + + /** + * Reads the result of the query. + * + * @return The result of the query. + */ + override def read: this.OutputType = + val columns = resultSet.getMetaData.getColumnCount + LazyList.continually(resultSet) + .takeWhile(_.next()) + .map(row => { + toDataRow(row, columns, List.empty) match { + case Success(dataRow) => dataRow + case Failure(exception) => throw exception + } + }) + + + /** + * Closes the statement and the result set owned by this object. + * When a Statement object is closed, its current ResultSet object, if one exists, is also closed. + */ + override def close(): Unit = statement.close() + + @tailrec + private def toDataRow(row: ResultSet, columns: Int, acc: DataRow): Try[DataRow] = + if columns == 0 then Success(acc) + else + val name = row.getMetaData.getColumnName(columns) + val value = row.getObject(columns) + val dataType = row.getMetaData.getColumnType(columns) + toArcaneType(dataType) match + case Success(arcaneType) => toDataRow(row, columns - 1, DataCell(name, arcaneType, value) :: acc) + case Failure(exception) => Failure(exception) +} + +/** + * Companion object for [[LazyQueryResult]]. + */ +object LazyQueryResult { + + /** + * The output type of the query result. + */ + type OutputType = LazyList[DataRow] + + /** + * Creates a new [[LazyQueryResult]] object. + * + * @param statement The statement used to execute the query. + * @param resultSet The result set of the query. + * @return The new [[LazyQueryResult]] object. + */ + def apply(statement: Statement, resultSet: ResultSet): LazyQueryResult = new LazyQueryResult(statement, resultSet) +} diff --git a/framework/arcane-framework/src/main/scala/services/mssql/QueryRunner.scala b/framework/arcane-framework/src/main/scala/services/mssql/QueryRunner.scala new file mode 100644 index 0000000..8a26990 --- /dev/null +++ b/framework/arcane-framework/src/main/scala/services/mssql/QueryRunner.scala @@ -0,0 +1,44 @@ +package com.sneaksanddata.arcane.framework +package services.mssql + +import java.sql.{Connection, ResultSet, Statement} +import scala.concurrent.{Future, blocking} + +/** + * A class that runs a query on a SQL database. + * This class is intended to run a single query and traverse the results only once using the forward-only read-only + * cursor. + * + */ +class QueryRunner: + + /** + * A factory for creating a QueryResult object from a statement and a result set. + * + * @tparam Output The type of the output of the query. + */ + private type ResultFactory[Output] = (Statement, ResultSet) => QueryResult[Output] + + private implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + + /** + * Executes the given query on the given connection. + * + * The QueryResult object produced by this method must not outlive the connection object passed to it. + * + * @param query The query to execute. + * @param connection The connection to execute the query on. + * @return The result of the query. 
+ */ + def executeQuery[Result](query: MsSqlQuery, connection: Connection, resultFactory: ResultFactory[Result]): Future[QueryResult[Result]] = + Future { + val statement = connection.createStatement() + val resultSet = blocking { + statement.executeQuery(query) + } + resultFactory(statement, resultSet) + } + + +object QueryRunner: + def apply(): QueryRunner = new QueryRunner() \ No newline at end of file diff --git a/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala b/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala index bee733c..caf10e0 100644 --- a/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala +++ b/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala @@ -51,6 +51,12 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers: for i <- 1 to 10 do val insertCmd = s"use arcane; insert into dbo.MsSqlConnectorsTests values($i, ${i+1})" statement.execute(insertCmd) + statement.close() + + val updateStatement = con.createStatement() + for i <- 1 to 10 do + val insertCmd = s"use arcane; insert into dbo.MsSqlConnectorsTests values(${i * 1000}, ${i * 1000 + 1})" + updateStatement.execute(insertCmd) def removeDb(): Unit = @@ -86,7 +92,7 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers: val connector = MsSqlConnection(dbInfo.connectionOptions) QueryProvider.getBackfillQuery(connector) map { query => query should ( - include ("ct.SYS_CHANGE_VERSION") and include ("ARCANE_MERGE_KEY") and include("format(getdate(), 'yyyyMM')") + include ("SYS_CHANGE_VERSION") and include ("ARCANE_MERGE_KEY") and include("format(getdate(), 'yyyyMM')") ) } } @@ -107,3 +113,26 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers: fields should be(List(IntType, LongType, StringType, IntType, LongType, StringType, StringType)) } } + + + "MsSqlConnection" should "return correct number of rows on backfill" in withDatabase { dbInfo => + val connection = MsSqlConnection(dbInfo.connectionOptions) + for schema <- connection.getSchema + backfill <- connection.backfill(schema) + result = backfill.read.toList + yield { + result should have length 20 + } + } + + + "MsSqlConnection" should "return correct number of columns on backfill" in withDatabase { dbInfo => + val connection = MsSqlConnection(dbInfo.connectionOptions) + for schema <- connection.getSchema + backfill <- connection.backfill(schema) + result = backfill.read.toList + head = result.head + yield { + head should have length 7 + } + }
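For reference, a minimal end-to-end sketch of the backfill path added in MsSqlConnection, written against the API as it appears in this patch and in the tests above. The ConnectionOptions value is assumed to be built elsewhere, and the lazy result is forced before the connection is closed, because LazyQueryResult streams rows from a forward-only cursor.

import com.sneaksanddata.arcane.framework.services.mssql.{ConnectionOptions, MsSqlConnection}

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.*
import scala.util.Using

// Illustrative only: `options` is supplied by the caller; this helper is not part of the patch.
def printBackfill(options: ConnectionOptions): Unit =
  Using.resource(MsSqlConnection(options)) { connection =>
    val rowsFuture =
      for
        schema   <- connection.getSchema
        backfill <- connection.backfill(schema)
      yield backfill.read.toList // materialise while the connection is still open
    val rows = Await.result(rowsFuture, 5.minutes)
    rows.foreach(row => println(row.map(cell => s"${cell.name}=${cell.value}").mkString(", ")))
  }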
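QueryRunner.executeQuery is parameterised by a result factory, so callers are not tied to LazyQueryResult. Below is a hedged sketch of a custom wrapper; RowCountResult is an illustrative name rather than part of the framework, and the sketch assumes MsSqlQuery is this module's alias for a SQL string.

import com.sneaksanddata.arcane.framework.services.mssql.{QueryResult, QueryRunner}

import java.sql.{Connection, ResultSet, Statement}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

// Illustrative wrapper: counts rows instead of materialising DataRow values.
class RowCountResult(statement: Statement, resultSet: ResultSet) extends QueryResult[Int] with AutoCloseable:
  override def read: Int = Iterator.continually(resultSet).takeWhile(_.next()).size
  override def close(): Unit = statement.close() // closing the statement also closes its result set

// Assumes an already-open JDBC connection; its statements are released when the caller closes it.
def countRows(connection: Connection, query: String): Future[Int] =
  QueryRunner()
    .executeQuery(query, connection, (st, rs) => new RowCountResult(st, rs))
    .map(_.read)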