From 35a58e50104f42d1a9f5b2792b6d0bfb626adbe2 Mon Sep 17 00:00:00 2001 From: Vitalii Savitskii Date: Mon, 4 Nov 2024 11:04:34 +0100 Subject: [PATCH] Add backfill generation methods (#73) ## Scope Part of #61 This PR adds the method that generates backfill queries for the MS SQL source --- .../main/resources/get_select_all_query.sql | 7 +++++ .../get_select_all_query_date_partitioned.sql | 8 +++++ .../services/mssql/MsSqlConnection.scala | 31 +++++++++++++++++++ .../mssql/MsSqlConnectorsTests.scala | 9 ++++++ 4 files changed, 55 insertions(+) create mode 100644 framework/arcane-framework/src/main/resources/get_select_all_query.sql create mode 100644 framework/arcane-framework/src/main/resources/get_select_all_query_date_partitioned.sql diff --git a/framework/arcane-framework/src/main/resources/get_select_all_query.sql b/framework/arcane-framework/src/main/resources/get_select_all_query.sql new file mode 100644 index 0000000..b9360cb --- /dev/null +++ b/framework/arcane-framework/src/main/resources/get_select_all_query.sql @@ -0,0 +1,7 @@ +declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION() + +SELECT +{ChangeTrackingColumnsStatement}, +@currentVersion AS 'ChangeTrackingVersion', +lower(convert(nvarchar(128), HashBytes('SHA2_256', {MERGE_EXPRESSION}),2)) as [{MERGE_KEY}] +FROM [{dbName}].[{schema}].[{tableName}] tq diff --git a/framework/arcane-framework/src/main/resources/get_select_all_query_date_partitioned.sql b/framework/arcane-framework/src/main/resources/get_select_all_query_date_partitioned.sql new file mode 100644 index 0000000..7e1fab2 --- /dev/null +++ b/framework/arcane-framework/src/main/resources/get_select_all_query_date_partitioned.sql @@ -0,0 +1,8 @@ +declare @currentVersion bigint = CHANGE_TRACKING_CURRENT_VERSION() + +SELECT +{ChangeTrackingColumnsStatement}, +@currentVersion AS 'ChangeTrackingVersion', +lower(convert(nvarchar(128), HashBytes('SHA2_256', {MERGE_EXPRESSION}),2)) as [{MERGE_KEY}], +{DATE_PARTITION_EXPRESSION} as [{DATE_PARTITION_KEY}] +FROM [{dbName}].[{schema}].[{tableName}] tq diff --git a/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala b/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala index 35641a7..f42f08b 100644 --- a/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala +++ b/framework/arcane-framework/src/main/scala/services/mssql/MsSqlConnection.scala @@ -205,6 +205,17 @@ object QueryProvider: .replace("{schema}", schemaName) .replace("{table}", tableName) + def getBackfillQuery(msSqlConnection: MsSqlConnection): Future[MsSqlQuery] = + msSqlConnection.getColumnSummaries + .map(columnSummaries => { + val mergeExpression = QueryProvider.getMergeExpression(columnSummaries, "tq") + val columnExpression = QueryProvider.getChangeTrackingColumns(columnSummaries, "ct", "tq") + QueryProvider.getAllQuery( + msSqlConnection.connectionOptions, + mergeExpression, + columnExpression) + }) + private def getMergeExpression(cs: List[ColumnSummary], tableAlias: String): String = cs.filter((name, isPrimaryKey) => isPrimaryKey) .map((name, _) => s"cast($tableAlias.[$name] as nvarchar(128))") @@ -252,3 +263,23 @@ object QueryProvider: .replace("{DATE_PARTITION_EXPRESSION}", connectionOptions.partitionExpression.getOrElse("")) .replace("{DATE_PARTITION_KEY}", DATE_PARTITION_KEY) .replace("{lastId}", changeTrackingId.toString) + + private def getAllQuery(connectionOptions: ConnectionOptions, + mergeExpression: String, + columnExpression: String): String = { + + val baseQuery = connectionOptions.partitionExpression match { + case Some(_) => Source.fromResource("get_select_all_query_date_partitioned.sql").getLines.mkString("\n") + case None => Source.fromResource("get_select_all_query.sql").getLines.mkString("\n") + } + + baseQuery + .replace("{dbName}", connectionOptions.databaseName) + .replace("{schema}", connectionOptions.schemaName) + .replace("{tableName}", connectionOptions.tableName) + .replace("{ChangeTrackingColumnsStatement}", columnExpression) + .replace("{MERGE_EXPRESSION}", mergeExpression) + .replace("{MERGE_KEY}", UPSERT_MERGE_KEY) + .replace("{DATE_PARTITION_EXPRESSION}", connectionOptions.partitionExpression.getOrElse("")) + .replace("{DATE_PARTITION_KEY}", DATE_PARTITION_KEY) + } \ No newline at end of file diff --git a/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala b/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala index 9556160..bee733c 100644 --- a/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala +++ b/framework/arcane-framework/src/test/scala/services/connectors/mssql/MsSqlConnectorsTests.scala @@ -82,6 +82,15 @@ class MsSqlConnectorsTests extends flatspec.AsyncFlatSpec with Matchers: } } + "QueryProvider" should "generate backfill query" in withDatabase { dbInfo => + val connector = MsSqlConnection(dbInfo.connectionOptions) + QueryProvider.getBackfillQuery(connector) map { query => + query should ( + include ("ct.SYS_CHANGE_VERSION") and include ("ARCANE_MERGE_KEY") and include("format(getdate(), 'yyyyMM')") + ) + } + } + "MsSqlConnection" should "be able to extract schema column names from the database" in withDatabase { dbInfo => val connection = MsSqlConnection(dbInfo.connectionOptions) connection.getSchema map { schema =>