Schema evolution (#37)
s-vitaliy authored Feb 7, 2025
1 parent ad86888 commit b91d242
Showing 16 changed files with 856 additions and 123 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -18,7 +18,7 @@ lazy val plugin = (project in file("."))
name := "arcane-stream-microsoft-synapse-link",
idePackagePrefix := Some("com.sneaksanddata.arcane.microsoft_synapse_link"),

libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "0.2.0",
libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "0.2.1",
libraryDependencies += "com.azure" % "azure-core-http-okhttp" % "1.12.1",
libraryDependencies += "io.netty" % "netty-tcnative-boringssl-static" % "2.0.65.Final",

2 changes: 1 addition & 1 deletion integration-tests.env
@@ -1,5 +1,5 @@
STREAMCONTEXT__BACKFILL=false
STREAMCONTEXT__SPEC='{ "backfillJobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-large-job" }, "groupingIntervalSeconds": 1, "groupsPerFile": 1, "httpClientMaxRetries": 3, "httpClientRetryDelaySeconds": 1, "jobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-standard-job" }, "lookBackInterval": 7600, "partitionExpression": "", "rowsPerGroup": 10000, "schemaUpdateIntervalSeconds": 10, "sinkSettings": { "archiveTableName": "iceberg.test.archive_test", "optimizeSettings": { "batchThreshold": 60, "fileSizeThreshold": "512MB" }, "orphanFilesExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "snapshotExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "targetTableName": "iceberg.test.test" }, "sourceSettings": { "baseLocation": "abfss://[email protected]/", "changeCaptureIntervalSeconds": 5, "name": "synapsetable" }, "stagingDataSettings": { "catalog": { "catalogName": "iceberg", "catalogUri": "http://localhost:8181/api/catalog", "namespace": "test", "schemaName": "test", "warehouse": "polaris" }, "dataLocation": "s3://tmp/polaris/test", "tableNamePrefix": "staging_inventtrans" }}'
STREAMCONTEXT__SPEC='{ "backfillJobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-large-job" }, "groupingIntervalSeconds": 1, "groupsPerFile": 1, "httpClientMaxRetries": 3, "httpClientRetryDelaySeconds": 1, "jobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-standard-job" }, "lookBackInterval": 21000, "partitionExpression": "", "rowsPerGroup": 10000, "schemaUpdateIntervalSeconds": 10, "sinkSettings": { "archiveTableName": "iceberg.test.archive_test", "optimizeSettings": { "batchThreshold": 60, "fileSizeThreshold": "512MB" }, "orphanFilesExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "snapshotExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "targetTableName": "iceberg.test.test" }, "sourceSettings": { "baseLocation": "abfss://[email protected]/", "changeCaptureIntervalSeconds": 5, "name": "synapsetable" }, "stagingDataSettings": { "catalog": { "catalogName": "iceberg", "catalogUri": "http://localhost:8181/api/catalog", "namespace": "test", "schemaName": "test", "warehouse": "polaris" }, "dataLocation": "s3://tmp/polaris/test", "tableNamePrefix": "staging_inventtrans" }}'
STREAMCONTEXT__STREAM_ID=test
STREAMCONTEXT__STREAM_KIND=CdmChangeFeed
APPLICATION_VERSION=0.0.1
646 changes: 629 additions & 17 deletions populate-cdm-container.py

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions src/main/resources/logback.file.xml
@@ -0,0 +1,17 @@
<configuration>
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>tmp/synapse-link.log</file>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <customFields>
                {
                    "service":"arcane-stream-runner",
                    "ddsource":"java",
                    "host":"${HOSTNAME}"
                }
            </customFields>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="FILE" />
    </root>
</configuration>
19 changes: 7 additions & 12 deletions src/main/resources/logback.xml
@@ -1,17 +1,12 @@
<configuration>
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>tmp/synapse-link.log</file>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <customFields>
                {
                    "service":"arcane-stream-runner",
                    "ddsource":"java",
                    "host":"${HOSTNAME}"
                }
            </customFields>
        </encoder>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <layout>
            <Pattern>%d [%thread] %-5level %logger{35} - %msg%n</Pattern>
        </layout>
    </appender>

    <root level="INFO">
        <appender-ref ref="FILE" />
        <appender-ref ref="STDOUT" />
    </root>

</configuration>
13 changes: 13 additions & 0 deletions src/main/scala/extensions/ArcaneSchemaExtensions.scala
@@ -0,0 +1,13 @@
package com.sneaksanddata.arcane.microsoft_synapse_link
package extensions

import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, ArcaneSchemaField}

object ArcaneSchemaExtensions:

  extension (targetSchema: ArcaneSchema) def getMissingFields(batches: ArcaneSchema): Seq[ArcaneSchemaField] =
    batches.filter { batchField =>
      !targetSchema.exists(targetField => targetField.name.toLowerCase() == batchField.name.toLowerCase()
        && targetField.fieldType == batchField.fieldType)
    }

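A hedged usage sketch, not part of this commit: given the current target schema and the schema of an incoming batch, the extension returns the fields a migration still has to add. The two ArcaneSchema values are assumed to come from elsewhere (e.g. the table manager and a data row).

// Hypothetical usage sketch -- the schema values are assumed to be obtained elsewhere.
import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, ArcaneSchemaField}
import com.sneaksanddata.arcane.microsoft_synapse_link.extensions.ArcaneSchemaExtensions.getMissingFields

def columnsToAdd(targetSchema: ArcaneSchema, batchSchema: ArcaneSchema): Seq[ArcaneSchemaField] =
  // A batch field counts as missing when no target field matches both its lower-cased name
  // and its fieldType, so a type change also surfaces as a column to add.
  targetSchema.getMissingFields(batchSchema)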
17 changes: 17 additions & 0 deletions src/main/scala/extensions/DataRowExtensions.scala
@@ -0,0 +1,17 @@
package com.sneaksanddata.arcane.microsoft_synapse_link
package extensions

import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, DataRow, DatePartitionField, Field, MergeKeyField}

object DataRowExtensions:

  /**
   * Extension method to get the schema of a DataRow.
   */
  extension (row: DataRow) def schema: ArcaneSchema =
    row.foldLeft(ArcaneSchema.empty()) {
      case (schema, cell) if cell.name == MergeKeyField.name => schema ++ Seq(MergeKeyField)
      case (schema, cell) if cell.name == DatePartitionField.name => schema ++ Seq(DatePartitionField)
      case (schema, cell) => schema ++ Seq(Field(cell.name, cell.Type))
    }

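A minimal sketch of how the two extensions could combine, with hypothetical call sites that are not shown in this commit: a DataRow reports its own schema, which can then be compared against the target table's schema.

// Hypothetical illustration: decide whether a row introduces columns the target table lacks.
import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, DataRow}
import com.sneaksanddata.arcane.microsoft_synapse_link.extensions.DataRowExtensions.schema
import com.sneaksanddata.arcane.microsoft_synapse_link.extensions.ArcaneSchemaExtensions.getMissingFields

def needsMigration(row: DataRow, targetSchema: ArcaneSchema): Boolean =
  // MergeKeyField and DatePartitionField keep their well-known definitions;
  // every other cell becomes a plain Field(cell.name, cell.Type).
  targetSchema.getMissingFields(row.schema).nonEmpty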
40 changes: 34 additions & 6 deletions src/main/scala/services/app/TableManager.scala
@@ -1,19 +1,21 @@
package com.sneaksanddata.arcane.microsoft_synapse_link
package services.app

import extensions.ArcaneSchemaExtensions.getMissingFields
import models.app.{ArchiveTableSettings, MicrosoftSynapseLinkStreamContext, TargetTableSettings}
import services.app.JdbcTableManager.generateAlterTableSQL
import services.clients.BatchArchivationResult

import com.sneaksanddata.arcane.framework.utils.SqlUtils.readArcaneSchema
import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*
import com.sneaksanddata.arcane.framework.models.ArcaneSchema
import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, ArcaneSchemaField}
import com.sneaksanddata.arcane.framework.services.base.SchemaProvider
import com.sneaksanddata.arcane.framework.services.consumers.JdbcConsumerOptions
import com.sneaksanddata.arcane.framework.services.lakehouse.{SchemaConversions, given_Conversion_ArcaneSchema_Schema}
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.TypeID
import org.apache.iceberg.types.Types.TimestampType
import org.apache.iceberg.types.Types.{NestedField, TimestampType}
import zio.{Task, ZIO, ZLayer}

import java.sql.{Connection, DriverManager, ResultSet}
@@ -26,14 +28,24 @@ trait TableManager:
  def createTargetTable: Task[TableCreationResult]

  def createArchiveTable: Task[TableCreationResult]

  def getTargetSchema(tableName: String): Task[ArcaneSchema]

  def cleanupStagingTables: Task[Unit]

  def migrateSchema(batchSchema: ArcaneSchema, tableName: String): Task[Unit]


/**
 * The result of a table creation.
 */
type TableCreationResult = Boolean

/**
 * The result of a table modification.
 */
type TableModificationResult = Boolean

class JdbcTableManager(options: JdbcConsumerOptions,
                       targetTableSettings: TargetTableSettings,
                       archiveTableSettings: ArchiveTableSettings,
@@ -74,13 +86,29 @@ class JdbcTableManager(options: JdbcConsumerOptions,
        _ <- ZIO.foreach(strings)(dropTable)
      yield ()
    }

  def migrateSchema(batchSchema: ArcaneSchema, tableName: String): Task[Unit] =
    for targetSchema <- getTargetSchema(tableName)
        missingFields = targetSchema.getMissingFields(batchSchema)
        _ <- addColumns(tableName, missingFields)
    yield ()

  def getTargetSchema(tableName: String): Task[ArcaneSchema] =
    val query = s"SELECT * FROM $tableName where true and false"
    val ack = ZIO.attemptBlocking(sqlConnection.prepareStatement(query))
    ZIO.acquireReleaseWith(ack)(st => ZIO.succeed(st.close())) { statement =>
      for
        schemaResult <- ZIO.attemptBlocking(statement.executeQuery())
        fields <- ZIO.attemptBlocking(schemaResult.readArcaneSchema)
      yield fields.get
    }

  def addColumns(targetTableName: String, missingFields: ArcaneSchema): Task[Unit] =
    for _ <- ZIO.foreach(missingFields)(field => {
          val query = generateAlterTableSQL(targetTableName, field.name, SchemaConversions.toIcebergType(field.fieldType))
          zlog(s"Adding column to table $targetTableName: ${field.name} ${field.fieldType}, $query")
            *> ZIO.attemptBlocking(sqlConnection.prepareStatement(query).execute())
        })
    yield ()

  private def dropTable(tableName: String): Task[Unit] =
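A usage sketch for the new migration path, with the surrounding ZIO wiring assumed rather than taken from this commit; the target table name is the one used in the integration-test spec.

// Hypothetical sketch: align the target table with an incoming batch schema before applying it.
import com.sneaksanddata.arcane.framework.models.ArcaneSchema
import com.sneaksanddata.arcane.microsoft_synapse_link.services.app.TableManager
import zio.Task

def alignTarget(tableManager: TableManager, batchSchema: ArcaneSchema): Task[Unit] =
  // getTargetSchema reads column metadata via "SELECT * FROM <table> where true and false",
  // and addColumns then issues one ALTER TABLE statement per missing field.
  tableManager.migrateSchema(batchSchema, "iceberg.test.test")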
22 changes: 16 additions & 6 deletions src/main/scala/services/clients/JdbcConsumer.scala
@@ -6,7 +6,7 @@ import services.clients.{BatchArchivationResult, JdbcConsumer}

import com.sneaksanddata.arcane.framework.services.consumers.{JdbcConsumerOptions, StagedVersionedBatch}
import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*

import com.sneaksanddata.arcane.framework.models.ArcaneSchema
import zio.{Schedule, Task, ZIO, ZLayer}

import java.sql.{Connection, DriverManager, ResultSet}
@@ -50,7 +50,7 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](options: JdbcConsumerOptions,
        .map(values => partitionField -> values.toList)
      )).map(_.toMap)


  def applyBatch(batch: Batch): Task[BatchApplicationResult] =
    val ack = ZIO.attemptBlocking({ sqlConnection.prepareStatement(batch.batchQuery.query) })
    ZIO.acquireReleaseWith(ack)(st => ZIO.succeed(st.close())){ statement =>
@@ -59,8 +59,8 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](options: JdbcConsumerOptions,
      yield applicationResult
    }

  def archiveBatch(batch: Batch): Task[BatchArchivationResult] =
    for _ <- executeArchivationQuery(batch)
  def archiveBatch(batch: Batch, actualSchema: ArcaneSchema): Task[BatchArchivationResult] =
    for _ <- executeArchivationQuery(batch, actualSchema)
    yield new BatchArchivationResult

  def optimizeTarget(tableName: String, batchNumber: Long, optimizeThreshold: Long, fileSizeThreshold: String): Task[BatchApplicationResult] =
@@ -105,8 +105,18 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](options: JdbcConsumerOptions,
      else
        ZIO.succeed(false)

  private def executeArchivationQuery(batch: Batch): Task[BatchArchivationResult] =
    val expression = batch.archiveExpr(archiveTableSettings.archiveTableFullName)
  private def archiveExpr(archiveTableName: String, reduceExpr: String, schema: ArcaneSchema): String =
    val columns = schema.map(s => s.name).mkString(", ")
    s"INSERT INTO $archiveTableName ($columns) $reduceExpr"

  private def reduceExpr(batch: Batch): String =
    val name = batch.name
    s"""SELECT * FROM (
       | SELECT * FROM $name ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
       |)""".stripMargin

  private def executeArchivationQuery(batch: Batch, actualSchema: ArcaneSchema): Task[BatchArchivationResult] =
    val expression = archiveExpr(archiveTableSettings.archiveTableFullName, reduceExpr(batch), actualSchema)
    val ack = ZIO.blocking {
      ZIO.succeed(sqlConnection.prepareStatement(expression))
    }
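For a concrete feel of the new archivation query, this is roughly the SQL the two private helpers assemble. The staging table and column names below are hypothetical; the archive table name comes from the integration-test spec. The inner query keeps only the latest versionnumber per Id, and the INSERT now enumerates the actual schema columns instead of delegating to the batch's own archive expression.

// Illustrative only -- assumes a staging batch "staging_inventtrans_001" and an actual
// schema of (Id, versionnumber, quantity); not generated by the code above verbatim.
val archiveStatement =
  """INSERT INTO iceberg.test.archive_test (Id, versionnumber, quantity) SELECT * FROM (
    | SELECT * FROM staging_inventtrans_001 ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
    |)""".stripMargin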
@@ -94,6 +94,9 @@ final class AzureBlobStorageReaderZIO(accountName: String, endpoint: Option[Stri
    val publisher = client.listBlobsByHierarchy("/", listOptions, defaultTimeout).stream().toList.asScala.map(implicitly)
    ZStream.fromIterable(publisher)

  def blobExists(blobPath: AdlsStoragePath): Task[Boolean] =
    ZIO.attemptBlocking(getBlobClient(blobPath).exists())
      .flatMap(result => ZIO.logDebug(s"Blob ${blobPath.toHdfsPath} exists: $result") *> ZIO.succeed(result))

  def getFirstBlob(storagePath: AdlsStoragePath): Task[OffsetDateTime] =
    streamPrefixes(storagePath + "/").runFold(OffsetDateTime.now(ZoneOffset.UTC)){ (date, blob) =>
@@ -17,7 +17,7 @@ class CdmSchemaProvider(azureBlobStorageReader: AzureBlobStorageReader, tableLoc

  implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

  override def getSchema: Future[SchemaType] = getEntity.flatMap(toArcaneSchema)
  override lazy val getSchema: Future[SchemaType] = getEntity.flatMap(toArcaneSchema)

  def getEntity: Future[SimpleCdmEntity] =
    SimpleCdmModel(tableLocation, azureBlobStorageReader).flatMap(_.entities.find(_.name == tableName) match
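The only functional change in this file is def getSchema becoming lazy val getSchema, so the CDM entity lookup and schema conversion run once per provider instance instead of on every call. A hedged sketch of the effect, with constructor arguments assumed from the surrounding code rather than spelled out in this hunk:

// Hypothetical illustration: with lazy val, the first access builds the Future (reading the
// CDM model from storage); subsequent accesses return the same cached Future.
val provider = CdmSchemaProvider(azureBlobStorageReader, tableLocation, "synapsetable")
val first = provider.getSchema   // triggers the blob read and schema conversion
val second = provider.getSchema  // same Future instance; no second read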

