diff --git a/build.sbt b/build.sbt
index f619e9c..c328161 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,7 +18,7 @@ lazy val plugin = (project in file("."))
name := "arcane-stream-microsoft-synapse-link",
idePackagePrefix := Some("com.sneaksanddata.arcane.microsoft_synapse_link"),
- libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "0.2.0",
+ libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "0.2.1",
libraryDependencies += "com.azure" % "azure-core-http-okhttp" % "1.12.1",
libraryDependencies += "io.netty" % "netty-tcnative-boringssl-static" % "2.0.65.Final",
diff --git a/integration-tests.env b/integration-tests.env
index b0905de..201181c 100644
--- a/integration-tests.env
+++ b/integration-tests.env
@@ -1,5 +1,5 @@
STREAMCONTEXT__BACKFILL=false
-STREAMCONTEXT__SPEC='{ "backfillJobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-large-job" }, "groupingIntervalSeconds": 1, "groupsPerFile": 1, "httpClientMaxRetries": 3, "httpClientRetryDelaySeconds": 1, "jobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-standard-job" }, "lookBackInterval": 7600, "partitionExpression": "", "rowsPerGroup": 10000, "schemaUpdateIntervalSeconds": 10, "sinkSettings": { "archiveTableName": "iceberg.test.archive_test", "optimizeSettings": { "batchThreshold": 60, "fileSizeThreshold": "512MB" }, "orphanFilesExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "snapshotExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "targetTableName": "iceberg.test.test" }, "sourceSettings": { "baseLocation": "abfss://cdm-e2e@devstoreaccount1.dfs.core.windows.net/", "changeCaptureIntervalSeconds": 5, "name": "synapsetable" }, "stagingDataSettings": { "catalog": { "catalogName": "iceberg", "catalogUri": "http://localhost:8181/api/catalog", "namespace": "test", "schemaName": "test", "warehouse": "polaris" }, "dataLocation": "s3://tmp/polaris/test", "tableNamePrefix": "staging_inventtrans" }}'
+STREAMCONTEXT__SPEC='{ "backfillJobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-large-job" }, "groupingIntervalSeconds": 1, "groupsPerFile": 1, "httpClientMaxRetries": 3, "httpClientRetryDelaySeconds": 1, "jobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-standard-job" }, "lookBackInterval": 21000, "partitionExpression": "", "rowsPerGroup": 10000, "schemaUpdateIntervalSeconds": 10, "sinkSettings": { "archiveTableName": "iceberg.test.archive_test", "optimizeSettings": { "batchThreshold": 60, "fileSizeThreshold": "512MB" }, "orphanFilesExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "snapshotExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "targetTableName": "iceberg.test.test" }, "sourceSettings": { "baseLocation": "abfss://cdm-e2e@devstoreaccount1.dfs.core.windows.net/", "changeCaptureIntervalSeconds": 5, "name": "synapsetable" }, "stagingDataSettings": { "catalog": { "catalogName": "iceberg", "catalogUri": "http://localhost:8181/api/catalog", "namespace": "test", "schemaName": "test", "warehouse": "polaris" }, "dataLocation": "s3://tmp/polaris/test", "tableNamePrefix": "staging_inventtrans" }}'
STREAMCONTEXT__STREAM_ID=test
STREAMCONTEXT__STREAM_KIND=CdmChangeFeed
APPLICATION_VERSION=0.0.1
diff --git a/populate-cdm-container.py b/populate-cdm-container.py
index 946e37f..7bb93ad 100644
--- a/populate-cdm-container.py
+++ b/populate-cdm-container.py
@@ -22,11 +22,18 @@
import os
-CONTENT = """50bff458-d47a-4924-804b-31c0a83108e6,"1/1/2020 0:00:00 PM","1/1/2020 0:00:00 PM",0,1111000000,1111000010,"F1234567",1,,"2020-01-01T00:15:00.0000000Z","acc1",111111110,"2020-01-01T00:15:00.0000000Z","acc1",0,"dat",1,1111000001,2111000001,1111000001,21111,2111000001,"2020-01-01T00:15:00.0000000+00:00","2020-01-01T00:15:00.0000000Z",
-5b4bc74e-2132-4d8e-8572-48ce4260f182,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000001,1111000011,"F1234568",1,,"2020-01-01T00:16:00.0000000Z","acc2",111111111,"2020-01-01T00:16:00.0000000Z","acc2",0,"dat",1,1111000002,2111000002,1111000001,21111,2111000001,"2020-01-01T00:16:00.0000000+00:00","2020-01-01T00:16:00.0000000Z",
-aae2094d-cd17-42b4-891e-3b268e2b6713,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000002,1111000012,"F1234569",1,,"2020-01-01T00:17:00.0000000Z","acc2",111111112,"2020-01-01T00:17:00.0000000Z","acc2",0,"dat",1,1111000003,2111000003,1111000001,21111,2111000001,"2020-01-01T00:17:00.0000000+00:00","2020-01-01T00:17:00.0000000Z",
-9633be9a-c485-4afa-8bb7-4ba380eaa206,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000003,1111000013,"F1234578",1,,"2020-01-01T00:18:00.0000000Z","acc1",111111113,"2020-01-01T00:18:00.0000000Z","acc1",0,"dat",1,1111000004,2111000004,1111000001,21111,2111000001,"2020-01-01T00:18:00.0000000+00:00","2020-01-01T00:18:00.0000000Z",
-b62c7b67-b8f8-4635-8cef-1c23591d5c4c,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000004,1111000014,"F1234511",1,,"2020-01-01T00:19:00.0000000Z","acc2",111111114,"2020-01-01T00:19:00.0000000Z","acc2",0,"dat",1,1111000005,2111000005,1111000001,21111,2111000001,"2020-01-01T00:19:00.0000000+00:00","2020-01-01T00:19:00.0000000Z",
+CONTENT = """10bff458-d47a-4924-804b-31c0a83108e6,"1/1/2020 0:00:00 PM","1/1/2020 0:00:00 PM",0,1111000000,1111000010,"F1234567",1,,"2020-01-01T00:15:00.0000000Z","acc1",111111110,"2020-01-01T00:15:00.0000000Z","acc1",0,"dat",1,1111000001,2111000001,1111000001,21111,2111000001,"2020-01-01T00:15:00.0000000+00:00","2020-01-01T00:15:00.0000000Z",
+2b4bc74e-2132-4d8e-8572-48ce4260f182,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000001,1111000011,"F1234568",1,,"2020-01-01T00:16:00.0000000Z","acc2",111111111,"2020-01-01T00:16:00.0000000Z","acc2",0,"dat",1,1111000002,2111000002,1111000001,21111,2111000001,"2020-01-01T00:16:00.0000000+00:00","2020-01-01T00:16:00.0000000Z",
+3ae2094d-cd17-42b4-891e-3b268e2b6713,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000002,1111000012,"F1234569",1,,"2020-01-01T00:17:00.0000000Z","acc2",111111112,"2020-01-01T00:17:00.0000000Z","acc2",0,"dat",1,1111000003,2111000003,1111000001,21111,2111000001,"2020-01-01T00:17:00.0000000+00:00","2020-01-01T00:17:00.0000000Z",
+4633be9a-c485-4afa-8bb7-4ba380eaa206,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000003,1111000013,"F1234578",1,,"2020-01-01T00:18:00.0000000Z","acc1",111111113,"2020-01-01T00:18:00.0000000Z","acc1",0,"dat",1,1111000004,2111000004,1111000001,21111,2111000001,"2020-01-01T00:18:00.0000000+00:00","2020-01-01T00:18:00.0000000Z",
+562c7b67-b8f8-4635-8cef-1c23591d5c4c,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000004,1111000014,"F1234511",1,,"2020-01-01T00:19:00.0000000Z","acc2",111111114,"2020-01-01T00:19:00.0000000Z","acc2",0,"dat",1,1111000005,2111000005,1111000001,21111,2111000001,"2020-01-01T00:19:00.0000000+00:00","2020-01-01T00:19:00.0000000Z",
+"""
+
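+# Variant of CONTENT with different row Ids and one fewer numeric column per row.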
+CONTENT_MODIFIED = """60bff458-d47a-4924-804b-31c0a83108e6,"1/1/2020 0:00:00 PM","1/1/2020 0:00:00 PM",0,1111000010,"F1234567",1,,"2020-01-01T00:15:00.0000000Z","acc1",111111110,"2020-01-01T00:15:00.0000000Z","acc1",0,"dat",1,1111000001,2111000001,1111000001,21111,2111000001,"2020-01-01T00:15:00.0000000+00:00","2020-01-01T00:15:00.0000000Z",
+7b4bc74e-2132-4d8e-8572-48ce4260f182,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000001,"F1234568",1,,"2020-01-01T00:16:00.0000000Z","acc2",111111111,"2020-01-01T00:16:00.0000000Z","acc2",0,"dat",1,1111000002,2111000002,1111000001,21111,2111000001,"2020-01-01T00:16:00.0000000+00:00","2020-01-01T00:16:00.0000000Z",
+8ae2094d-cd17-42b4-891e-3b268e2b6713,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000002,"F1234569",1,,"2020-01-01T00:17:00.0000000Z","acc2",111111112,"2020-01-01T00:17:00.0000000Z","acc2",0,"dat",1,1111000003,2111000003,1111000001,21111,2111000001,"2020-01-01T00:17:00.0000000+00:00","2020-01-01T00:17:00.0000000Z",
+9633be9a-c485-4afa-8bb7-4ba380eaa206,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000003,"F1234578",1,,"2020-01-01T00:18:00.0000000Z","acc1",111111113,"2020-01-01T00:18:00.0000000Z","acc1",0,"dat",1,1111000004,2111000004,1111000001,21111,2111000001,"2020-01-01T00:18:00.0000000+00:00","2020-01-01T00:18:00.0000000Z",
+102c7b67-b8f8-4635-8cef-1c23591d5c4c,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000004,"F1234511",1,,"2020-01-01T00:19:00.0000000Z","acc2",111111114,"2020-01-01T00:19:00.0000000Z","acc2",0,"dat",1,1111000005,2111000005,1111000001,21111,2111000001,"2020-01-01T00:19:00.0000000+00:00","2020-01-01T00:19:00.0000000Z",
"""
MODEL_JSON = """{
@@ -467,11 +474,610 @@
"dataType": "int64",
"maxLength": -1
},
- {
- "name": "dimensionattributevalue",
- "dataType": "int64",
- "maxLength": -1
- },
+ {
+ "name": "dimensionattributevalue",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "dimensionattributevaluegroup",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "displayvalue",
+ "dataType": "string",
+ "maxLength": 30,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 30
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "ordinal",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "backingrecorddataareaid",
+ "dataType": "string",
+ "maxLength": 4,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 4
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "modifieddatetime",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "modifiedby",
+ "dataType": "string",
+ "maxLength": 20,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 20
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "modifiedtransactionid",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "createddatetime",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "createdby",
+ "dataType": "string",
+ "maxLength": 20,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 20
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "createdtransactionid",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "dataareaid",
+ "dataType": "string",
+ "maxLength": 4,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 4
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "recversion",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "partition",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "sysrowversion",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "recid",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "tableid",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "versionnumber",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "createdon",
+ "dataType": "dateTimeOffset",
+ "maxLength": -1
+ },
+ {
+ "name": "modifiedon",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "IsDelete",
+ "dataType": "boolean",
+ "maxLength": -1
+ }
+ ],
+ "partitions": []
+ }
+ ]
+ }"""
+
+MODEL_JSON_MODIFIED = """{
+ "name": "cdm",
+ "description": "cdm",
+ "version": "1.0",
+ "entities": [
+ {
+ "$type": "LocalEntity",
+ "name": "currency",
+ "description": "currency",
+ "annotations": [
+ {
+ "name": "Athena:PartitionGranularity",
+ "value": "Year"
+ },
+ {
+ "name": "Athena:InitialSyncState",
+ "value": "Completed"
+ },
+ {
+ "name": "Athena:InitialSyncDataCompletedTime",
+ "value": "1/1/2020 0:00:00 PM"
+ }
+ ],
+ "attributes": [
+ {
+ "name": "Id",
+ "dataType": "guid",
+ "maxLength": -1
+ },
+ {
+ "name": "SinkCreatedOn",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "SinkModifiedOn",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "iseuro",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "roundofftypeassetdep_jp",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "roundofftypeprice",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "roundofftypepurch",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "roundofftypesales",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "ltmroundofftypelineamount",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "sysdatastatecode",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "currencycode",
+ "dataType": "string",
+ "maxLength": 3,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 3
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "currencycodeiso",
+ "dataType": "string",
+ "maxLength": 3,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 3
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "roundingprecision",
+ "dataType": "decimal",
+ "maxLength": -1,
+ "cdm:traits": [
+ {
+ "traitReference": "is.dataFormat.numeric.shaped",
+ "arguments": [
+ {
+ "name": "precision",
+ "value": 38
+ },
+ {
+ "name": "scale",
+ "value": 6
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "roundoffassetdep_jp",
+ "dataType": "decimal",
+ "maxLength": -1,
+ "cdm:traits": [
+ {
+ "traitReference": "is.dataFormat.numeric.shaped",
+ "arguments": [
+ {
+ "name": "precision",
+ "value": 38
+ },
+ {
+ "name": "scale",
+ "value": 6
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "roundoffprice",
+ "dataType": "decimal",
+ "maxLength": -1,
+ "cdm:traits": [
+ {
+ "traitReference": "is.dataFormat.numeric.shaped",
+ "arguments": [
+ {
+ "name": "precision",
+ "value": 38
+ },
+ {
+ "name": "scale",
+ "value": 6
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "roundoffpurch",
+ "dataType": "decimal",
+ "maxLength": -1,
+ "cdm:traits": [
+ {
+ "traitReference": "is.dataFormat.numeric.shaped",
+ "arguments": [
+ {
+ "name": "precision",
+ "value": 38
+ },
+ {
+ "name": "scale",
+ "value": 6
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "roundoffsales",
+ "dataType": "decimal",
+ "maxLength": -1,
+ "cdm:traits": [
+ {
+ "traitReference": "is.dataFormat.numeric.shaped",
+ "arguments": [
+ {
+ "name": "precision",
+ "value": 38
+ },
+ {
+ "name": "scale",
+ "value": 6
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "symbol",
+ "dataType": "string",
+ "maxLength": 5,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 5
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "txt",
+ "dataType": "string",
+ "maxLength": 120,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 120
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "exchratemaxvariationpercent_mx",
+ "dataType": "decimal",
+ "maxLength": -1,
+ "cdm:traits": [
+ {
+ "traitReference": "is.dataFormat.numeric.shaped",
+ "arguments": [
+ {
+ "name": "precision",
+ "value": 38
+ },
+ {
+ "name": "scale",
+ "value": 6
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "decimalscount_mx",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "ltmroundofflineamount",
+ "dataType": "decimal",
+ "maxLength": -1,
+ "cdm:traits": [
+ {
+ "traitReference": "is.dataFormat.numeric.shaped",
+ "arguments": [
+ {
+ "name": "precision",
+ "value": 38
+ },
+ {
+ "name": "scale",
+ "value": 6
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "modifieddatetime",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "modifiedby",
+ "dataType": "string",
+ "maxLength": 20,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 20
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "modifiedtransactionid",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "createddatetime",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "createdby",
+ "dataType": "string",
+ "maxLength": 20,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 20
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "createdtransactionid",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "dataareaid",
+ "dataType": "string",
+ "maxLength": 4,
+ "cdm:traits": [
+ {
+ "traitReference": "is.constrained",
+ "arguments": [
+ {
+ "name": "maximumLength",
+ "value": 4
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "recversion",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "partition",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "sysrowversion",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "recid",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "tableid",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "versionnumber",
+ "dataType": "int64",
+ "maxLength": -1
+ },
+ {
+ "name": "createdon",
+ "dataType": "dateTimeOffset",
+ "maxLength": -1
+ },
+ {
+ "name": "modifiedon",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "IsDelete",
+ "dataType": "boolean",
+ "maxLength": -1
+ }
+ ],
+ "partitions": []
+ },
+ {
+ "$type": "LocalEntity",
+ "name": "synapsetable",
+ "description": "synapsetable",
+ "annotations": [
+ {
+ "name": "Athena:PartitionGranularity",
+ "value": "Year"
+ },
+ {
+ "name": "Athena:InitialSyncState",
+ "value": "Completed"
+ },
+ {
+ "name": "Athena:InitialSyncDataCompletedTime",
+ "value": "1/1/2020 0:00:00 PM"
+ }
+ ],
+ "attributes": [
+ {
+ "name": "Id",
+ "dataType": "guid",
+ "maxLength": -1
+ },
+ {
+ "name": "SinkCreatedOn",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "SinkModifiedOn",
+ "dataType": "dateTime",
+ "maxLength": -1
+ },
+ {
+ "name": "sysdatastatecode",
+ "dataType": "int64",
+ "maxLength": -1
+ },
{
"name": "dimensionattributevaluegroup",
"dataType": "int64",
@@ -655,13 +1261,19 @@ def create_container():
except Exception as e:
print(e)
-def create_blobs():
+def create_blobs(model_json, content, folders):
blob_service_client = BlobServiceClient.from_connection_string(AZURITE_CONNECTION_STRING)
- for folder in FOLDERS:
- upload_blob_file(blob_service_client, CONTAINER, f"{folder}/synapsetable/2020.csv", CONTENT)
- upload_blob_file(blob_service_client, CONTAINER, f"{folder}/synapsetable_other/2020.csv", CONTENT)
-
- upload_blob_file(blob_service_client, CONTAINER, "model.json", MODEL_JSON)
+ for folder in folders:
+ upload_blob_file(blob_service_client, CONTAINER, f"{folder}/model.json", model_json)
+ upload_blob_file(blob_service_client, CONTAINER, f"{folder}/synapsetable/2020.csv", content)
+ upload_blob_file(blob_service_client, CONTAINER, f"{folder}/synapsetable/2021.csv", content)
+ upload_blob_file(blob_service_client, CONTAINER, f"{folder}/synapsetable/2022.csv", content)
+ upload_blob_file(blob_service_client, CONTAINER, f"{folder}/synapsetable_other/2020.csv", content)
create_container()
-create_blobs()
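+# The first four folders receive the modified model and rows; the remaining folders receive the originals.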
+create_blobs(MODEL_JSON_MODIFIED, CONTENT_MODIFIED, FOLDERS[:4])
+create_blobs(MODEL_JSON, CONTENT, FOLDERS[4:])
+
+blob_service_client = BlobServiceClient.from_connection_string(AZURITE_CONNECTION_STRING)
+upload_blob_file(blob_service_client, CONTAINER, "model.json", MODEL_JSON_MODIFIED)
+
diff --git a/src/main/resources/logback.file.xml b/src/main/resources/logback.file.xml
new file mode 100644
index 0000000..3b9e0df
--- /dev/null
+++ b/src/main/resources/logback.file.xml
@@ -0,0 +1,17 @@
+<configuration>
+    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+        <file>tmp/synapse-link.log</file>
+        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
+            <customFields>
+                {
+                "service":"arcane-stream-runner",
+                "ddsource":"java",
+                "host":"${HOSTNAME}"
+                }
+            </customFields>
+        </encoder>
+    </appender>
+    <root level="INFO">
+        <appender-ref ref="FILE"/>
+    </root>
+</configuration>
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 3b9e0df..eff1a71 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -1,17 +1,12 @@
-<configuration>
-    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
-        <file>tmp/synapse-link.log</file>
-        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
-            <customFields>
-                {
-                "service":"arcane-stream-runner",
-                "ddsource":"java",
-                "host":"${HOSTNAME}"
-                }
-            </customFields>
-        </encoder>
-    </appender>
-    <root level="INFO">
-        <appender-ref ref="FILE"/>
-    </root>
-</configuration>
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>
+                %d [%thread] %-5level %logger{35} - %msg%n
+            </pattern>
+        </encoder>
+    </appender>
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
diff --git a/src/main/scala/extensions/ArcaneSchemaExtensions.scala b/src/main/scala/extensions/ArcaneSchemaExtensions.scala
new file mode 100644
index 0000000..e051ec3
--- /dev/null
+++ b/src/main/scala/extensions/ArcaneSchemaExtensions.scala
@@ -0,0 +1,13 @@
+package com.sneaksanddata.arcane.microsoft_synapse_link
+package extensions
+
+import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, ArcaneSchemaField}
+
+object ArcaneSchemaExtensions:
+
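+  /** Returns the fields of the batch schema that are missing from the target schema (matched by case-insensitive name and field type). */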
+ extension (targetSchema: ArcaneSchema) def getMissingFields(batches: ArcaneSchema): Seq[ArcaneSchemaField] =
+ batches.filter { batchField =>
+ !targetSchema.exists(targetField => targetField.name.toLowerCase() == batchField.name.toLowerCase()
+ && targetField.fieldType == batchField.fieldType)
+ }
+
diff --git a/src/main/scala/extensions/DataRowExtensions.scala b/src/main/scala/extensions/DataRowExtensions.scala
new file mode 100644
index 0000000..7cf4e31
--- /dev/null
+++ b/src/main/scala/extensions/DataRowExtensions.scala
@@ -0,0 +1,17 @@
+package com.sneaksanddata.arcane.microsoft_synapse_link
+package extensions
+
+import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, DataRow, DatePartitionField, Field, MergeKeyField}
+
+object DataRowExtensions:
+
+ /**
+ * Extension method to get the schema of a DataRow.
+ */
+ extension (row: DataRow) def schema: ArcaneSchema =
+ row.foldLeft(ArcaneSchema.empty()) {
+ case (schema, cell) if cell.name == MergeKeyField.name => schema ++ Seq(MergeKeyField)
+ case (schema, cell) if cell.name == DatePartitionField.name => schema ++ Seq(DatePartitionField)
+ case (schema, cell) => schema ++ Seq(Field(cell.name, cell.Type))
+ }
+
diff --git a/src/main/scala/services/app/TableManager.scala b/src/main/scala/services/app/TableManager.scala
index 718ce3a..4be96fa 100644
--- a/src/main/scala/services/app/TableManager.scala
+++ b/src/main/scala/services/app/TableManager.scala
@@ -1,19 +1,21 @@
package com.sneaksanddata.arcane.microsoft_synapse_link
package services.app
+import extensions.ArcaneSchemaExtensions.getMissingFields
import models.app.{ArchiveTableSettings, MicrosoftSynapseLinkStreamContext, TargetTableSettings}
import services.app.JdbcTableManager.generateAlterTableSQL
import services.clients.BatchArchivationResult
+import com.sneaksanddata.arcane.framework.utils.SqlUtils.readArcaneSchema
import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*
-import com.sneaksanddata.arcane.framework.models.ArcaneSchema
+import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, ArcaneSchemaField}
import com.sneaksanddata.arcane.framework.services.base.SchemaProvider
import com.sneaksanddata.arcane.framework.services.consumers.JdbcConsumerOptions
import com.sneaksanddata.arcane.framework.services.lakehouse.{SchemaConversions, given_Conversion_ArcaneSchema_Schema}
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.TypeID
-import org.apache.iceberg.types.Types.TimestampType
+import org.apache.iceberg.types.Types.{NestedField, TimestampType}
import zio.{Task, ZIO, ZLayer}
import java.sql.{Connection, DriverManager, ResultSet}
@@ -26,14 +28,24 @@ trait TableManager:
def createTargetTable: Task[TableCreationResult]
def createArchiveTable: Task[TableCreationResult]
+
+ def getTargetSchema(tableName: String): Task[ArcaneSchema]
def cleanupStagingTables: Task[Unit]
+
+ def migrateSchema(batchSchema: ArcaneSchema, tableName: String): Task[Unit]
+
/**
* The result of applying a batch.
*/
type TableCreationResult = Boolean
+/**
+ * The result of a table schema modification.
+ */
+type TableModificationResult = Boolean
+
class JdbcTableManager(options: JdbcConsumerOptions,
targetTableSettings: TargetTableSettings,
archiveTableSettings: ArchiveTableSettings,
@@ -74,13 +86,29 @@ class JdbcTableManager(options: JdbcConsumerOptions,
_ <- ZIO.foreach(strings)(dropTable)
yield ()
}
+
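+  /** Adds to tableName any columns present in batchSchema but missing from its current schema. */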
+ def migrateSchema(batchSchema: ArcaneSchema, tableName: String): Task[Unit] =
+ for targetSchema <- getTargetSchema(tableName)
+ missingFields = targetSchema.getMissingFields(batchSchema)
+ _ <- addColumns(tableName, missingFields)
+ yield ()
+
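+  /** Reads the current schema of tableName by running a query that matches no rows and reading the schema from its result set. */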
+ def getTargetSchema(tableName: String): Task[ArcaneSchema] =
+ val query = s"SELECT * FROM $tableName where true and false"
+ val ack = ZIO.attemptBlocking(sqlConnection.prepareStatement(query))
+ ZIO.acquireReleaseWith(ack)(st => ZIO.succeed(st.close())) { statement =>
+ for
+ schemaResult <- ZIO.attemptBlocking(statement.executeQuery())
+ fields <- ZIO.attemptBlocking(schemaResult.readArcaneSchema)
+ yield fields.get
+ }
def addColumns(targetTableName: String, missingFields: ArcaneSchema): Task[Unit] =
for _ <- ZIO.foreach(missingFields)(field => {
- val query = generateAlterTableSQL(targetTableName, field.name, SchemaConversions.toIcebergType(field.fieldType))
- zlog(s"Adding column to table $targetTableName: ${field.name} ${field.fieldType}, $query")
- *> ZIO.attemptBlocking(sqlConnection.prepareStatement(query).execute())
- })
+ val query = generateAlterTableSQL(targetTableName, field.name, SchemaConversions.toIcebergType(field.fieldType))
+ zlog(s"Adding column to table $targetTableName: ${field.name} ${field.fieldType}, $query")
+ *> ZIO.attemptBlocking(sqlConnection.prepareStatement(query).execute())
+ })
yield ()
private def dropTable(tableName: String): Task[Unit] =
diff --git a/src/main/scala/services/clients/JdbcConsumer.scala b/src/main/scala/services/clients/JdbcConsumer.scala
index 80d4f78..8df50ef 100644
--- a/src/main/scala/services/clients/JdbcConsumer.scala
+++ b/src/main/scala/services/clients/JdbcConsumer.scala
@@ -6,7 +6,7 @@ import services.clients.{BatchArchivationResult, JdbcConsumer}
import com.sneaksanddata.arcane.framework.services.consumers.{JdbcConsumerOptions, StagedVersionedBatch}
import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*
-
+import com.sneaksanddata.arcane.framework.models.ArcaneSchema
import zio.{Schedule, Task, ZIO, ZLayer}
import java.sql.{Connection, DriverManager, ResultSet}
@@ -50,7 +50,7 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](options: JdbcConsumerOptions,
.map(values => partitionField -> values.toList)
)).map(_.toMap)
-
+
def applyBatch(batch: Batch): Task[BatchApplicationResult] =
val ack = ZIO.attemptBlocking({ sqlConnection.prepareStatement(batch.batchQuery.query) })
ZIO.acquireReleaseWith(ack)(st => ZIO.succeed(st.close())){ statement =>
@@ -59,8 +59,8 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](options: JdbcConsumerOptions,
yield applicationResult
}
- def archiveBatch(batch: Batch): Task[BatchArchivationResult] =
- for _ <- executeArchivationQuery(batch)
+ def archiveBatch(batch: Batch, actualSchema: ArcaneSchema): Task[BatchArchivationResult] =
+ for _ <- executeArchivationQuery(batch, actualSchema)
yield new BatchArchivationResult
def optimizeTarget(tableName: String, batchNumber: Long, optimizeThreshold: Long, fileSizeThreshold: String): Task[BatchApplicationResult] =
@@ -105,8 +105,18 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](options: JdbcConsumerOptions,
else
ZIO.succeed(false)
- private def executeArchivationQuery(batch: Batch): Task[BatchArchivationResult] =
- val expression = batch.archiveExpr(archiveTableSettings.archiveTableFullName)
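+  // Builds the INSERT INTO <archive table> (<columns>) <select> statement from the batch's actual schema.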
+ private def archiveExpr(archiveTableName: String, reduceExpr: String, schema: ArcaneSchema): String =
+ val columns = schema.map(s => s.name).mkString(", ")
+ s"INSERT INTO $archiveTableName ($columns) $reduceExpr"
+
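+  // Deduplicates the staged batch: keeps the latest row per Id (highest versionnumber) via ROW_NUMBER ... FETCH FIRST 1 ROWS WITH TIES.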
+ private def reduceExpr(batch: Batch): String =
+ val name = batch.name
+ s"""SELECT * FROM (
+ | SELECT * FROM $name ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
+ |)""".stripMargin
+
+ private def executeArchivationQuery(batch: Batch, actualSchema: ArcaneSchema): Task[BatchArchivationResult] =
+ val expression = archiveExpr(archiveTableSettings.archiveTableFullName, reduceExpr(batch), actualSchema)
val ack = ZIO.blocking {
ZIO.succeed(sqlConnection.prepareStatement(expression))
}
diff --git a/src/main/scala/services/data_providers/microsoft_synapse_link/AzureBlobStorageReaderZIO.scala b/src/main/scala/services/data_providers/microsoft_synapse_link/AzureBlobStorageReaderZIO.scala
index 45d0dd6..133993a 100644
--- a/src/main/scala/services/data_providers/microsoft_synapse_link/AzureBlobStorageReaderZIO.scala
+++ b/src/main/scala/services/data_providers/microsoft_synapse_link/AzureBlobStorageReaderZIO.scala
@@ -94,6 +94,9 @@ final class AzureBlobStorageReaderZIO(accountName: String, endpoint: Option[Stri
val publisher = client.listBlobsByHierarchy("/", listOptions, defaultTimeout).stream().toList.asScala.map(implicitly)
ZStream.fromIterable(publisher)
+ def blobExists(blobPath: AdlsStoragePath): Task[Boolean] =
+ ZIO.attemptBlocking(getBlobClient(blobPath).exists())
+ .flatMap(result => ZIO.logDebug(s"Blob ${blobPath.toHdfsPath} exists: $result") *> ZIO.succeed(result))
def getFirstBlob(storagePath: AdlsStoragePath): Task[OffsetDateTime] =
streamPrefixes(storagePath + "/").runFold(OffsetDateTime.now(ZoneOffset.UTC)){ (date, blob) =>
diff --git a/src/main/scala/services/data_providers/microsoft_synapse_link/CdmSchemaProvider.scala b/src/main/scala/services/data_providers/microsoft_synapse_link/CdmSchemaProvider.scala
index fc9e089..07efe8b 100644
--- a/src/main/scala/services/data_providers/microsoft_synapse_link/CdmSchemaProvider.scala
+++ b/src/main/scala/services/data_providers/microsoft_synapse_link/CdmSchemaProvider.scala
@@ -17,7 +17,7 @@ class CdmSchemaProvider(azureBlobStorageReader: AzureBlobStorageReader, tableLoc
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
- override def getSchema: Future[SchemaType] = getEntity.flatMap(toArcaneSchema)
+ override lazy val getSchema: Future[SchemaType] = getEntity.flatMap(toArcaneSchema)
def getEntity: Future[SimpleCdmEntity] =
SimpleCdmModel(tableLocation, azureBlobStorageReader).flatMap(_.entities.find(_.name == tableName) match
diff --git a/src/main/scala/services/data_providers/microsoft_synapse_link/CdmTableStream.scala b/src/main/scala/services/data_providers/microsoft_synapse_link/CdmTableStream.scala
index 54d81db..175b411 100644
--- a/src/main/scala/services/data_providers/microsoft_synapse_link/CdmTableStream.scala
+++ b/src/main/scala/services/data_providers/microsoft_synapse_link/CdmTableStream.scala
@@ -3,14 +3,15 @@ package services.data_providers.microsoft_synapse_link
import models.app.streaming.SourceCleanupRequest
import models.app.{AzureConnectionSettings, ParallelismSettings}
-import services.data_providers.microsoft_synapse_link.CdmTableStream.getListPrefixes
-import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*
+import services.data_providers.microsoft_synapse_link.CdmTableStream.withSchema
+import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*
import com.sneaksanddata.arcane.framework.models.app.StreamContext
-import com.sneaksanddata.arcane.framework.models.cdm.{SimpleCdmEntity, given_Conversion_SimpleCdmEntity_ArcaneSchema, given_Conversion_String_ArcaneSchema_DataRow}
+import com.sneaksanddata.arcane.framework.models.cdm.given_Conversion_String_ArcaneSchema_DataRow
import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, DataRow}
+import com.sneaksanddata.arcane.framework.services.base.SchemaProvider
import com.sneaksanddata.arcane.framework.services.cdm.CdmTableSettings
-import com.sneaksanddata.arcane.framework.services.storage.models.azure.AdlsStoragePath
+import com.sneaksanddata.arcane.framework.services.storage.models.azure.{AdlsStoragePath, AzureBlobStorageReader}
import com.sneaksanddata.arcane.framework.services.storage.models.base.StoredBlob
import zio.stream.ZStream
import zio.{Schedule, ZIO, ZLayer}
@@ -22,15 +23,22 @@ import scala.util.matching.Regex
type DataStreamElement = DataRow | SourceCleanupRequest
-class CdmTableStream(
- name: String,
+type BlobStream = ZStream[Any, Throwable, StoredBlob]
+
+type SchemaEnrichedBlobStream = ZStream[Any, Throwable, SchemaEnrichedBlob]
+
+case class SchemaEnrichedBlob(blob: StoredBlob, schemaProvider: SchemaProvider[ArcaneSchema])
+
+case class MetadataEnrichedReader(javaStream: BufferedReader, filePath: AdlsStoragePath, schemaProvider: SchemaProvider[ArcaneSchema])
+
+case class SchemaEnrichedContent[TContent](content: TContent, schema: ArcaneSchema)
+
+class CdmTableStream(name: String,
storagePath: AdlsStoragePath,
- entityModel: SimpleCdmEntity,
- reader: AzureBlobStorageReaderZIO,
+ zioReader: AzureBlobStorageReaderZIO,
+ reader: AzureBlobStorageReader,
parallelismSettings: ParallelismSettings,
streamContext: StreamContext):
- implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
- private val schema: ArcaneSchema = implicitly(entityModel)
/**
* Read a table snapshot, taking optional start time. Lowest precision available is 1 hour
@@ -39,27 +47,30 @@ class CdmTableStream(
* @param changeCaptureInterval Interval to capture changes
* @return A stream of rows for this table
*/
- def snapshotPrefixes(lookBackInterval: Duration, changeCaptureInterval: Duration): ZStream[Any, Throwable, StoredBlob] =
- val backfillStream = ZStream.fromZIO(reader.getFirstBlob(storagePath + "/"))
- .flatMap(startDate => {
- ZStream.fromIterable(getListPrefixes(Some(startDate)))
- .flatMap(prefix => reader.streamPrefixes(storagePath + prefix))
- .flatMap(prefix => reader.streamPrefixes(storagePath + prefix.name + name + "/"))
- .filter(blob => blob.name.endsWith(".csv"))
- })
-
- val repeatStream = reader.getRootPrefixes(storagePath, lookBackInterval)
- .flatMap(prefix => reader.streamPrefixes(storagePath + prefix.name + name))
- .filter(blob => blob.name.endsWith(s"/$name/"))
- .flatMap(prefix => reader.streamPrefixes(storagePath + prefix.name))
- .filter(blob => blob.name.endsWith(".csv"))
+ def snapshotPrefixes(lookBackInterval: Duration, changeCaptureInterval: Duration): ZStream[Any, Throwable, SchemaEnrichedBlob] =
+ ZStream.fromZIO(dropLast(getRootDropPrefixes(storagePath, lookBackInterval)))
+ .flatMap(x => ZStream.fromIterable(x))
+ .flatMap(seb => zioReader.streamPrefixes(storagePath + seb.blob.name).withSchema(seb.schemaProvider))
+ .filter(seb => seb.blob.name.endsWith(s"/$name/"))
+ .flatMap(seb => zioReader.streamPrefixes(storagePath + seb.blob.name).withSchema(seb.schemaProvider))
+ .filter(seb => seb.blob.name.endsWith(".csv"))
.repeat(Schedule.spaced(changeCaptureInterval))
- if streamContext.IsBackfilling then backfillStream else repeatStream
- def getStream(blob: StoredBlob): ZIO[Any, IOException, (BufferedReader, AdlsStoragePath)] =
- reader.getBlobContent(storagePath + blob.name)
- .map(javaReader => (javaReader, storagePath + blob.name))
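+  // Collects the prefix stream and drops its last element.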
+ private def dropLast(stream: SchemaEnrichedBlobStream): ZIO[Any, Throwable, Seq[SchemaEnrichedBlob]] =
+ for blobs <- stream.runCollect
+ _ <- ZIO.log(s"Dropping last element from from the blobs stream: ${if blobs.nonEmpty then blobs.last.blob.name else "empty"}")
+ yield if blobs.nonEmpty then blobs.dropRight(1) else blobs
+
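+  // Lists root prefixes within the look-back window that already contain a model.json, pairing each with a schema provider for that prefix.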
+ private def getRootDropPrefixes(storageRoot: AdlsStoragePath, lookBackInterval: Duration): SchemaEnrichedBlobStream =
+ for prefix <- zioReader.getRootPrefixes(storagePath, lookBackInterval).filterZIO(prefix => zioReader.blobExists(storagePath + prefix.name + "model.json"))
+ schemaProvider = CdmSchemaProvider(reader, (storagePath + prefix.name).toHdfsPath, name)
+ yield SchemaEnrichedBlob(prefix, schemaProvider)
+
+
+ def getStream(seb: SchemaEnrichedBlob): ZIO[Any, IOException, MetadataEnrichedReader] =
+ zioReader.getBlobContent(storagePath + seb.blob.name)
+ .map(javaReader => MetadataEnrichedReader(javaReader, storagePath + seb.blob.name, seb.schemaProvider))
.mapError(e => new IOException(s"Failed to get blob content: ${e.getMessage}", e))
def tryGetContinuation(stream: BufferedReader, quotes: Int, accum: StringBuilder): ZIO[Any, Throwable, String] =
@@ -89,43 +100,45 @@ class CdmTableStream(
regex.replaceSomeIn(csvLine, m => Some(Matcher.quoteReplacement(m.matched.replace("\n", "")))).replace("\r", "")
}
- def getData(streamData: (BufferedReader, AdlsStoragePath)): ZStream[Any, IOException, DataStreamElement] = streamData match
- case (javaStream, fileName) =>
- ZStream.acquireReleaseWith(ZIO.attempt(javaStream))(stream => ZIO.succeed(stream.close()))
- .tap(_ => zlog(s"Getting data from directory: $fileName"))
+ def getData(streamData: MetadataEnrichedReader): ZStream[Any, IOException, DataStreamElement] =
+ ZStream.acquireReleaseWith(ZIO.attempt(streamData.javaStream))(stream => ZIO.succeed(stream.close()))
.flatMap(javaReader => ZStream.repeatZIO(getLine(javaReader)))
.takeWhile(_.isDefined)
.map(_.get)
.mapZIO(content => ZIO.attempt(replaceQuotedNewlines(content)))
- .mapZIO(content => ZIO.attempt(implicitly[DataRow](content, schema)))
- .mapError(e => new IOException(s"Failed to parse CSV content: ${e.getMessage} from file: $fileName", e))
- .concat(ZStream.succeed(SourceCleanupRequest(fileName)))
+ .mapZIO(content => ZIO.fromFuture(sc => streamData.schemaProvider.getSchema).map(schema => SchemaEnrichedContent(content, schema)))
+ .mapZIO(sec => ZIO.attempt(implicitly[DataRow](sec.content, sec.schema)))
+      .mapError(e => new IOException(s"Failed to parse CSV content: ${e.getMessage} from file: ${streamData.filePath}", e))
+ .concat(ZStream.succeed(SourceCleanupRequest(streamData.filePath)))
.zipWithIndex
.flatMap({
- case (e: SourceCleanupRequest, index: Long) => ZStream.log(s"Received $index lines frm $fileName, completed processing") *> ZStream.succeed(e)
+        case (e: SourceCleanupRequest, index: Long) => ZStream.log(s"Received $index lines from ${streamData.filePath}, completed processing") *> ZStream.succeed(e)
case (r: DataRow, _) => ZStream.succeed(r)
})
object CdmTableStream:
+
+ extension (stream: ZStream[Any, Throwable, StoredBlob]) def withSchema(schemaProvider: SchemaProvider[ArcaneSchema]): SchemaEnrichedBlobStream =
+ stream.map(blob => SchemaEnrichedBlob(blob, schemaProvider))
+
type Environment = AzureConnectionSettings
& CdmTableSettings
& AzureBlobStorageReaderZIO
- & CdmSchemaProvider
+ & AzureBlobStorageReader
& ParallelismSettings
& StreamContext
def apply(settings: CdmTableSettings,
- entityModel: SimpleCdmEntity,
- reader: AzureBlobStorageReaderZIO,
+ zioReader: AzureBlobStorageReaderZIO,
+ reader: AzureBlobStorageReader,
parallelismSettings: ParallelismSettings,
streamContext: StreamContext): CdmTableStream = new CdmTableStream(
name = settings.name,
storagePath = AdlsStoragePath(settings.rootPath).get,
- entityModel = entityModel,
+ zioReader = zioReader,
reader = reader,
parallelismSettings = parallelismSettings,
- streamContext = streamContext
- )
+ streamContext = streamContext)
/**
* The ZLayer that creates the CdmDataProvider.
@@ -136,12 +149,11 @@ object CdmTableStream:
_ <- zlog("Creating the CDM data provider")
connectionSettings <- ZIO.service[AzureConnectionSettings]
tableSettings <- ZIO.service[CdmTableSettings]
- reader <- ZIO.service[AzureBlobStorageReaderZIO]
- schemaProvider <- ZIO.service[CdmSchemaProvider]
+ readerZIO <- ZIO.service[AzureBlobStorageReaderZIO]
+ reader <- ZIO.service[AzureBlobStorageReader]
parSettings <- ZIO.service[ParallelismSettings]
- l <- ZIO.fromFuture(_ => schemaProvider.getEntity)
sc <- ZIO.service[StreamContext]
- } yield CdmTableStream(tableSettings, l, reader, parSettings, sc)
+ } yield CdmTableStream(tableSettings, readerZIO, reader, parSettings, sc)
}
diff --git a/src/main/scala/services/streaming/consumers/IcebergSynapseConsumer.scala b/src/main/scala/services/streaming/consumers/IcebergSynapseConsumer.scala
index c5f82e6..076a983 100644
--- a/src/main/scala/services/streaming/consumers/IcebergSynapseConsumer.scala
+++ b/src/main/scala/services/streaming/consumers/IcebergSynapseConsumer.scala
@@ -6,9 +6,10 @@ import models.app.{MicrosoftSynapseLinkStreamContext, TargetTableSettings}
import services.clients.BatchArchivationResult
import services.streaming.consumers.IcebergSynapseConsumer.{getTableName, toStagedBatch}
import services.data_providers.microsoft_synapse_link.DataStreamElement
+import com.sneaksanddata.arcane.microsoft_synapse_link.extensions.DataRowExtensions.schema
import com.sneaksanddata.arcane.framework.models.app.StreamContext
-import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, DataRow}
+import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, DataRow, MergeKeyField}
import com.sneaksanddata.arcane.framework.services.base.SchemaProvider
import com.sneaksanddata.arcane.framework.services.consumers.{StagedVersionedBatch, SynapseLinkMergeBatch}
import com.sneaksanddata.arcane.framework.services.lakehouse.base.IcebergCatalogSettings
@@ -16,8 +17,6 @@ import com.sneaksanddata.arcane.framework.services.lakehouse.{CatalogWriter, giv
import com.sneaksanddata.arcane.framework.services.streaming.base.{BatchConsumer, BatchProcessor}
import com.sneaksanddata.arcane.framework.services.streaming.consumers.{IcebergStreamingConsumer, StreamingConsumer}
import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*
-
-
import org.apache.iceberg.rest.RESTCatalog
import org.apache.iceberg.{Schema, Table}
import org.apache.zookeeper.proto.DeleteRequest
@@ -28,9 +27,9 @@ import java.time.format.DateTimeFormatter
import java.time.{Duration, ZoneOffset, ZonedDateTime}
import java.util.UUID
-type InFlightBatch = ((StagedVersionedBatch, Seq[SourceCleanupRequest]), Long)
-type CompletedBatch = (BatchArchivationResult, Seq[SourceCleanupRequest])
-type PipelineResult = (BatchArchivationResult, Seq[SourceCleanupResult])
+type InFlightBatch = ((Iterable[StagedVersionedBatch], Seq[SourceCleanupRequest]), Long)
+type CompletedBatch = (Iterable[BatchArchivationResult], Seq[SourceCleanupRequest])
+type PipelineResult = (Iterable[BatchArchivationResult], Seq[SourceCleanupResult])
class IcebergSynapseConsumer(streamContext: MicrosoftSynapseLinkStreamContext,
icebergCatalogSettings: IcebergCatalogSettings,
@@ -60,18 +59,22 @@ class IcebergSynapseConsumer(streamContext: MicrosoftSynapseLinkStreamContext,
}
private def writeStagingTable = ZPipeline[Chunk[DataStreamElement]]()
- .mapZIO(elements => writeDataRows(elements, streamContext.stagingTableNamePrefix.getTableName))
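+    // Group data rows by their per-row schema so each staged batch is written with a consistent schema; source cleanup requests are carried through alongside the batch results.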
+ .mapZIO(elements =>
+ val groupedBySchema = elements.withFilter(e => e.isInstanceOf[DataRow]).map(e => e.asInstanceOf[DataRow]).groupBy(_.schema)
+ val deleteRequests = elements.withFilter(e => e.isInstanceOf[SourceCleanupRequest]).map(e => e.asInstanceOf[SourceCleanupRequest])
+ val batchResults = ZIO.foreach(groupedBySchema){
+ case (schema, rows) => writeDataRows(rows, schema)
+ }
+ batchResults.map(b => (b.values, deleteRequests))
+ )
.zipWithIndex
- private def writeDataRows(elements: Chunk[DataStreamElement], name: String): Task[(StagedVersionedBatch, Seq[SourceCleanupRequest])] =
+ private def writeDataRows(rows: Chunk[DataRow], arcaneSchema: ArcaneSchema): Task[(ArcaneSchema, StagedVersionedBatch)] =
for
- arcaneSchema <- ZIO.fromFuture(implicit ec => schemaProvider.getSchema)
- rows = elements.withFilter(e => e.isInstanceOf[DataRow]).map(e => e.asInstanceOf[DataRow])
- deleteRequests = elements.withFilter(e => e.isInstanceOf[SourceCleanupRequest]).map(e => e.asInstanceOf[SourceCleanupRequest])
- table <- ZIO.fromFuture(implicit ec => catalogWriter.write(rows, name, arcaneSchema)) retry retryPolicy
- batch = table.toStagedBatch( icebergCatalogSettings.namespace, icebergCatalogSettings.warehouse, arcaneSchema, sinkSettings.targetTableFullName, Map())
- yield (batch, deleteRequests)
+ table <- ZIO.fromFuture(implicit ec => catalogWriter.write(rows, streamContext.stagingTableNamePrefix.getTableName, arcaneSchema))
+ batch = table.toStagedBatch(icebergCatalogSettings.namespace, icebergCatalogSettings.warehouse, arcaneSchema, sinkSettings.targetTableFullName, Map())
+ yield (arcaneSchema, batch)
object IcebergSynapseConsumer:
diff --git a/src/main/scala/services/streaming/processors/ArchivationProcessor.scala b/src/main/scala/services/streaming/processors/ArchivationProcessor.scala
index 264bef3..aa9cd88 100644
--- a/src/main/scala/services/streaming/processors/ArchivationProcessor.scala
+++ b/src/main/scala/services/streaming/processors/ArchivationProcessor.scala
@@ -6,8 +6,13 @@ import services.streaming.consumers.{CompletedBatch, InFlightBatch}
import models.app.streaming.SourceCleanupRequest
import models.app.{ArchiveTableSettings, ParallelismSettings}
+import com.sneaksanddata.arcane.framework.models.ArcaneSchema
import com.sneaksanddata.arcane.framework.services.consumers.StagedVersionedBatch
import com.sneaksanddata.arcane.framework.services.streaming.base.BatchProcessor
+import com.sneaksanddata.arcane.microsoft_synapse_link.models.app.streaming.SourceCleanupRequest
+import com.sneaksanddata.arcane.microsoft_synapse_link.models.app.{ArchiveTableSettings, ParallelismSettings}
+import com.sneaksanddata.arcane.microsoft_synapse_link.services.app.TableManager
+import com.sneaksanddata.arcane.microsoft_synapse_link.services.streaming.consumers.{CompletedBatch, InFlightBatch}
import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*
import zio.stream.ZPipeline
@@ -15,15 +20,18 @@ import zio.{ZIO, ZLayer}
class ArchivationProcessor(jdbcConsumer: JdbcConsumer[StagedVersionedBatch],
archiveTableSettings: ArchiveTableSettings,
- parallelismSettings: ParallelismSettings)
+ parallelismSettings: ParallelismSettings, tableManager: TableManager)
extends BatchProcessor[InFlightBatch, CompletedBatch]:
override def process: ZPipeline[Any, Throwable, InFlightBatch, CompletedBatch] =
ZPipeline.mapZIO({
- case ((batch, other), batchNumber) =>
+ case ((batches, other), batchNumber) =>
for _ <- zlog(s"Archiving batch $batchNumber")
- _ <- jdbcConsumer.archiveBatch(batch)
- result <- jdbcConsumer.dropTempTable(batch)
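+        // For each staged batch, align the archive table schema with the batch schema, then archive using the staging table's current schema.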
+ _ <- ZIO.foreach(batches){
+ batch => tableManager.migrateSchema(batch.schema, archiveTableSettings.archiveTableFullName) *>
+ tableManager.getTargetSchema(batch.name).flatMap(schema => jdbcConsumer.archiveBatch(batch, schema))
+ }
+ results <- ZIO.foreach(batches)(batch => jdbcConsumer.dropTempTable(batch))
_ <- jdbcConsumer.optimizeTarget(archiveTableSettings.archiveTableFullName, batchNumber,
archiveTableSettings.archiveOptimizeSettings.batchThreshold,
archiveTableSettings.archiveOptimizeSettings.fileSizeThreshold)
@@ -33,7 +41,7 @@ class ArchivationProcessor(jdbcConsumer: JdbcConsumer[StagedVersionedBatch],
_ <- jdbcConsumer.expireOrphanFiles(archiveTableSettings.archiveTableFullName, batchNumber,
archiveTableSettings.archiveOrphanFilesExpirationSettings.batchThreshold,
archiveTableSettings.archiveOrphanFilesExpirationSettings.retentionThreshold)
- yield (result, other)
+ yield (results, other)
})
object ArchivationProcessor:
@@ -41,10 +49,11 @@ object ArchivationProcessor:
type Environment = JdbcConsumer[StagedVersionedBatch]
& ArchiveTableSettings
& ParallelismSettings
-
+ & TableManager
+
def apply(jdbcConsumer: JdbcConsumer[StagedVersionedBatch], archiveTableSettings: ArchiveTableSettings,
- parallelismSettings: ParallelismSettings): ArchivationProcessor =
- new ArchivationProcessor(jdbcConsumer, archiveTableSettings, parallelismSettings)
+ parallelismSettings: ParallelismSettings, tableManager: TableManager): ArchivationProcessor =
+ new ArchivationProcessor(jdbcConsumer, archiveTableSettings, parallelismSettings, tableManager)
val layer: ZLayer[Environment, Nothing, ArchivationProcessor] =
ZLayer {
@@ -52,5 +61,6 @@ object ArchivationProcessor:
jdbcConsumer <- ZIO.service[JdbcConsumer[StagedVersionedBatch]]
archiveTableSettings <- ZIO.service[ArchiveTableSettings]
parallelismSettings <- ZIO.service[ParallelismSettings]
- yield ArchivationProcessor(jdbcConsumer, archiveTableSettings, parallelismSettings)
+ tableManager <- ZIO.service[TableManager]
+ yield ArchivationProcessor(jdbcConsumer, archiveTableSettings, parallelismSettings, tableManager)
}
diff --git a/src/main/scala/services/streaming/processors/MergeBatchProcessor.scala b/src/main/scala/services/streaming/processors/MergeBatchProcessor.scala
index b14b213..7b57cf3 100644
--- a/src/main/scala/services/streaming/processors/MergeBatchProcessor.scala
+++ b/src/main/scala/services/streaming/processors/MergeBatchProcessor.scala
@@ -5,10 +5,12 @@ import models.app.{OptimizeSettings, ParallelismSettings, TargetTableSettings}
import services.clients.{BatchApplicationResult, JdbcConsumer}
import services.streaming.consumers.InFlightBatch
+import com.sneaksanddata.arcane.framework.models.ArcaneSchema
import com.sneaksanddata.arcane.framework.services.consumers.StagedVersionedBatch
import com.sneaksanddata.arcane.framework.services.streaming.base.BatchProcessor
import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*
+import com.sneaksanddata.arcane.microsoft_synapse_link.services.app.TableManager
import zio.stream.ZPipeline
import zio.{Task, ZIO, ZLayer}
@@ -19,7 +21,8 @@ import zio.{Task, ZIO, ZLayer}
*/
class MergeBatchProcessor(jdbcConsumer: JdbcConsumer[StagedVersionedBatch],
parallelismSettings: ParallelismSettings,
- targetTableSettings: TargetTableSettings)
+ targetTableSettings: TargetTableSettings,
+ tableManager: TableManager)
extends BatchProcessor[InFlightBatch, InFlightBatch]:
/**
@@ -29,19 +32,25 @@ class MergeBatchProcessor(jdbcConsumer: JdbcConsumer[StagedVersionedBatch],
*/
override def process: ZPipeline[Any, Throwable, InFlightBatch, InFlightBatch] =
ZPipeline.mapZIO({
- case ((batch, other), batchNumber) =>
+ case ((batches, other), batchNumber) =>
for _ <- zlog(s"Applying batch $batchNumber")
- _ <- jdbcConsumer.applyBatch(batch)
+
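+          // Evolve the target table schema to include any new batch columns before applying each batch.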
+ _ <- ZIO.foreach(batches)(batch => tableManager.migrateSchema(batch.schema, targetTableSettings.targetTableFullName))
+ _ <- ZIO.foreach(batches)(batch => jdbcConsumer.applyBatch(batch))
+
_ <- jdbcConsumer.optimizeTarget(targetTableSettings.targetTableFullName, batchNumber,
targetTableSettings.targetOptimizeSettings.batchThreshold,
targetTableSettings.targetOptimizeSettings.fileSizeThreshold)
+
_ <- jdbcConsumer.expireSnapshots(targetTableSettings.targetTableFullName, batchNumber,
targetTableSettings.targetSnapshotExpirationSettings.batchThreshold,
targetTableSettings.targetSnapshotExpirationSettings.retentionThreshold)
+
_ <- jdbcConsumer.expireOrphanFiles(targetTableSettings.targetTableFullName, batchNumber,
- targetTableSettings.targetOrphanFilesExpirationSettings.batchThreshold,
- targetTableSettings.targetOrphanFilesExpirationSettings.retentionThreshold)
- yield ((batch, other), batchNumber)
+ targetTableSettings.targetOrphanFilesExpirationSettings.batchThreshold,
+ targetTableSettings.targetOrphanFilesExpirationSettings.retentionThreshold)
+
+ yield ((batches, other), batchNumber)
})
object MergeBatchProcessor:
@@ -51,8 +60,8 @@ object MergeBatchProcessor:
* @param jdbcConsumer The JDBC consumer.
* @return The initialized MergeProcessor instance
*/
- def apply(jdbcConsumer: JdbcConsumer[StagedVersionedBatch], parallelismSettings: ParallelismSettings, targetTableSettings: TargetTableSettings): MergeBatchProcessor =
- new MergeBatchProcessor(jdbcConsumer, parallelismSettings, targetTableSettings)
+ def apply(jdbcConsumer: JdbcConsumer[StagedVersionedBatch], parallelismSettings: ParallelismSettings, targetTableSettings: TargetTableSettings, tableManager: TableManager): MergeBatchProcessor =
+ new MergeBatchProcessor(jdbcConsumer, parallelismSettings, targetTableSettings, tableManager)
/**
* The required environment for the MergeProcessor.
@@ -60,6 +69,7 @@ object MergeBatchProcessor:
type Environment = JdbcConsumer[StagedVersionedBatch]
& ParallelismSettings
& TargetTableSettings
+ & TableManager
/**
* The ZLayer that creates the MergeProcessor.
@@ -70,5 +80,6 @@ object MergeBatchProcessor:
jdbcConsumer <- ZIO.service[JdbcConsumer[StagedVersionedBatch]]
parallelismSettings <- ZIO.service[ParallelismSettings]
targetTableSettings <- ZIO.service[TargetTableSettings]
- yield MergeBatchProcessor(jdbcConsumer, parallelismSettings, targetTableSettings)
+ tableManager <- ZIO.service[TableManager]
+ yield MergeBatchProcessor(jdbcConsumer, parallelismSettings, targetTableSettings, tableManager)
}
diff --git a/src/main/scala/services/streaming/processors/SourceDeleteProcessor.scala b/src/main/scala/services/streaming/processors/SourceDeleteProcessor.scala
index c7384b0..ba5a971 100644
--- a/src/main/scala/services/streaming/processors/SourceDeleteProcessor.scala
+++ b/src/main/scala/services/streaming/processors/SourceDeleteProcessor.scala
@@ -8,6 +8,8 @@ import com.sneaksanddata.arcane.framework.services.streaming.base.BatchProcessor
import zio.stream.ZPipeline
import zio.{Task, ZIO, ZLayer}
+import scala.annotation.tailrec
+
class SourceDeleteProcessor(azureBlobStorageReaderZIO: AzureBlobStorageReaderZIO)
extends BatchProcessor[CompletedBatch, PipelineResult]:
@@ -19,7 +21,7 @@ class SourceDeleteProcessor(azureBlobStorageReaderZIO: AzureBlobStorageReaderZIO
})
def processEffects[A, B](effects: List[ZIO[Any, Throwable, A]], process: A => Task[B]): Task[List[B]] = {
- @scala.annotation.tailrec
+ @tailrec
def loop(remaining: List[ZIO[Any, Throwable, A]], acc: Task[List[B]]): Task[List[B]] = remaining match {
case Nil => acc
case head :: tail =>