Skip to content

Commit

Permalink
Change staging table name format (#41)
Browse files Browse the repository at this point in the history
* Change staging table name format

Part of #17

Add methods required to table modification to TableManager class

* Merge from main
  • Loading branch information
s-vitaliy authored Feb 6, 2025
1 parent ec7f952 commit ad86888
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions src/main/scala/services/app/TableManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package com.sneaksanddata.arcane.microsoft_synapse_link
package services.app

import models.app.{ArchiveTableSettings, MicrosoftSynapseLinkStreamContext, TargetTableSettings}
import services.app.JdbcTableManager.generateAlterTableSQL
import services.clients.BatchArchivationResult

import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*
import com.sneaksanddata.arcane.framework.models.ArcaneSchema
import com.sneaksanddata.arcane.framework.services.base.SchemaProvider
import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.*

import scala.jdk.CollectionConverters.*
import com.sneaksanddata.arcane.framework.services.consumers.JdbcConsumerOptions
import com.sneaksanddata.arcane.framework.services.lakehouse.{SchemaConversions, given_Conversion_ArcaneSchema_Schema}
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.TypeID
import org.apache.iceberg.types.Types.TimestampType
import zio.{Task, ZIO, ZLayer}

import java.sql.{Connection, DriverManager, ResultSet}
import scala.concurrent.Future
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.TypeID
import org.apache.iceberg.types.Types.{NestedField, TimestampType}
import com.sneaksanddata.arcane.framework.services.lakehouse.given_Conversion_ArcaneSchema_Schema
import scala.jdk.CollectionConverters.*


trait TableManager:
Expand Down Expand Up @@ -74,6 +75,14 @@ class JdbcTableManager(options: JdbcConsumerOptions,
yield ()
}

def addColumns(targetTableName: String, missingFields: ArcaneSchema): Task[Unit] =
for _ <- ZIO.foreach(missingFields)(field => {
val query = generateAlterTableSQL(targetTableName, field.name, SchemaConversions.toIcebergType(field.fieldType))
zlog(s"Adding column to table $targetTableName: ${field.name} ${field.fieldType}, $query")
*> ZIO.attemptBlocking(sqlConnection.prepareStatement(query).execute())
})
yield ()

private def dropTable(tableName: String): Task[Unit] =
val sql = s"DROP TABLE IF EXISTS $tableName"
val statement = ZIO.attemptBlocking {
Expand Down Expand Up @@ -126,6 +135,9 @@ object JdbcTableManager:
}
}

def generateAlterTableSQL(tableName: String, fieldName: String, fieldType: Type): String =
s"ALTER TABLE $tableName ADD COLUMN $fieldName ${fieldType.convertType}"

private def generateCreateTableSQL(tableName: String, schema: Schema): String =
val columns = schema.columns().asScala.map { field => s"${field.name()} ${field.`type`().convertType}" }.mkString(", ")
s"CREATE TABLE IF NOT EXISTS $tableName ($columns)"
Expand Down

0 comments on commit ad86888

Please sign in to comment.