Skip to content

Commit

Permalink
Read schema history with a clean instance of FileSchemaHistory to pre… (
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Dec 2, 2024
1 parent 77e7c0d commit 552b952
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,8 @@ class DebeziumStateFilesAccessor : AutoCloseable {
fileOffsetBackingStore.configure(StandaloneConfig(fileOffsetConfig))
}

private val fileSchemaHistory = FileSchemaHistory()

init {
fileSchemaHistory.configure(
Configuration.create()
.with(FileSchemaHistory.FILE_PATH, schemaFilePath.toString())
.build(),
HistoryRecordComparator.INSTANCE,
SchemaHistoryListener.NOOP,
false
)
}

override fun close() {
fileOffsetBackingStore.stop()
fileSchemaHistory.stop()
FileUtils.deleteDirectory(workingDir.toFile())
}

Expand All @@ -90,21 +76,20 @@ class DebeziumStateFilesAccessor : AutoCloseable {
}

fun readSchema(): DebeziumSchemaHistory {
fileSchemaHistory.start()
val schema: List<HistoryRecord> = buildList {
recoverRecords(fileSchemaHistory, Consumer(this::add))
var schema: List<HistoryRecord> = listOf()
doWithFileSchemaHistory(schemaFilePath) { fileSchemaHistory ->
schema = buildList { recoverRecords(fileSchemaHistory, Consumer(this::add)) }
}
fileSchemaHistory.stop()

return DebeziumSchemaHistory(schema)
}

fun writeSchema(schema: DebeziumSchemaHistory) {
fileSchemaHistory.initializeStorage()
fileSchemaHistory.start()
for (r in schema.wrapped) {
storeRecord(fileSchemaHistory, r)
doWithFileSchemaHistory(schemaFilePath) { fileSchemaHistory ->
for (r in schema.wrapped) {
storeRecord(fileSchemaHistory, r)
}
}
fileSchemaHistory.stop()
}

private fun toJson(byteBuffer: ByteBuffer): JsonNode {
Expand Down Expand Up @@ -137,5 +122,23 @@ class DebeziumStateFilesAccessor : AutoCloseable {
.java
.getDeclaredMethod("recoverRecords", Consumer::class.java)
.apply { isAccessible = true }

private fun doWithFileSchemaHistory(
schemaFilePath: Path,
block: (FileSchemaHistory) -> Unit
) {
val fileSchemaHistory = FileSchemaHistory()
fileSchemaHistory.configure(
Configuration.create()
.with(FileSchemaHistory.FILE_PATH, schemaFilePath.toString())
.build(),
HistoryRecordComparator.INSTANCE,
SchemaHistoryListener.NOOP,
false
)
fileSchemaHistory.start()
block(fileSchemaHistory)
fileSchemaHistory.stop()
}
}
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.0-rc.21
dockerImageTag: 3.9.0-rc.22
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.integrations.source.mysql.MysqlContainerFactory.execAsRoot
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.CatalogHelpers
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
Expand Down Expand Up @@ -77,15 +78,17 @@ class MysqlCdcIntegrationTest {

@Test
fun test() {
CliRunner.source("read", config(), configuredCatalog).run()
// TODO: add assertions on run1 messages.
val state1 = CliRunner.source("read", config(), configuredCatalog).run().states().last()

connectionFactory.get().use { connection: Connection ->
connection.isReadOnly = false
connection.createStatement().use { stmt: Statement ->
stmt.execute("INSERT INTO test.tbl (k, v) VALUES (3, 'baz')")
}
}

val run2InputState: List<AirbyteStateMessage> = listOf(state1)
CliRunner.source("read", config(), configuredCatalog, run2InputState).run().records()
}

@Test
Expand Down

0 comments on commit 552b952

Please sign in to comment.