
Postgres CDC sync moves the confirmed flush LSN to a value > saved state LSN #55846

Open
1 task
vrishin-bolt opened this issue Mar 19, 2025 · 2 comments

@vrishin-bolt

Connector Name

source-postgres

Connector Version

v3.6.28

What step the error happened?

During the sync

Relevant information

Hi all,

Something super weird is happening:

  1. I am syncing PG -> BQ in CDC mode (I'm on the latest PG source and BQ destination versions, and Airbyte version 0.64.2).
  2. This is the first sync, so it loads everything.

Confirmed flush LSN (2 mins before sync start)
-- airbyte_slot_2 C1/20E83D08 829480779016

Confirmed flush LSN (2 mins after sync start)
-- airbyte_slot_2 C1/22539BC0 829504592832

State LSN saved in the Airbyte state file
829504592784
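The slot values above are Postgres `pg_lsn` positions shown in their `X/Y` text form (upper 32 bits before the slash, lower 32 bits after, both hexadecimal), while the Airbyte state stores a plain integer. A minimal Python sketch, assuming only that standard layout, converts between the two forms so the saved state can be read alongside the slot output. `lsn_to_int` and `int_to_lsn` are hypothetical helper names, not part of Airbyte or Postgres.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a Postgres pg_lsn string such as 'C1/20E83D08' to an integer."""
    high, low = lsn.split("/")
    # The part before the slash holds the upper 32 bits, the part after it the lower 32 bits.
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value: int) -> str:
    """Convert an integer LSN back to the 'X/Y' hexadecimal text form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


# Values copied from the report above.
before_sync = lsn_to_int("C1/20E83D08")  # confirmed flush LSN 2 mins before sync start
after_sync = lsn_to_int("C1/22539BC0")   # confirmed flush LSN 2 mins after sync start
saved_state = 829504592784               # LSN in the Airbyte state file

print(before_sync, after_sync, saved_state)
print(int_to_lsn(saved_state))   # C1/22539B90
print(after_sync - saved_state)  # 48 (bytes of WAL)
print(saved_state < after_sync)  # True
```

Read this way, the saved state (`C1/22539B90`) sits 48 bytes of WAL behind the confirmed flush LSN observed two minutes into the sync, while still being ahead of the position recorded before the sync started.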

⁉️ Clearly, state LSN < confirmed flush LSN.
Is Airbyte saving an LSN > the confirmed flush LSN at the time the sync starts? Or
is Airbyte confirming to Postgres that it has read and processed an LSN > the one it has saved? Why??

This is causing my sync to fail after some time with
"Saved offset is before replication slot's confirmed LSN"
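Judging by the error text, the connector appears to compare the saved offset with the slot's `confirmed_flush_lsn` before resuming and to reject an offset that is older. A minimal sketch of such a check, assuming that reading of the message; `is_saved_offset_valid` and `check_before_sync` are hypothetical names, and treating an equal LSN as valid is an assumption of this sketch, not confirmed connector behavior.

```python
def is_saved_offset_valid(saved_offset_lsn: int, confirmed_flush_lsn: int) -> bool:
    """Hypothetical check: a resumable offset must not be older than the slot's confirmed LSN.

    Accepting an offset equal to the confirmed LSN is an assumption of this sketch.
    """
    return saved_offset_lsn >= confirmed_flush_lsn


def check_before_sync(saved_offset_lsn: int, confirmed_flush_lsn: int) -> None:
    """Raise if the saved offset is behind the slot (wording mirrors the reported error)."""
    if not is_saved_offset_valid(saved_offset_lsn, confirmed_flush_lsn):
        raise RuntimeError("Saved offset is before replication slot's confirmed LSN")


saved_state = 829504592784
# Slot position recorded before the sync: the saved state is ahead of it.
print(is_saved_offset_valid(saved_state, 829480779016))  # True
# Slot position recorded during the sync: the slot has moved past the saved state.
print(is_saved_offset_valid(saved_state, 829504592832))  # False
```

With the numbers from this report, the saved state passes against the slot position recorded before the sync but would fail once the slot has advanced past it, which would be consistent with the error appearing on a later attempt.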

I have attached the logs (I stopped the sync the moment I noticed this happened)

Airbyte Logs.txt

Relevant log output

Contribute

  • Yes, I want to contribute
@vrishin-bolt
Author

vrishin-bolt commented Mar 28, 2025

Here is an update:
I'm doing PG -> BQ via CDC and noticing that the shared state LSN being saved is incorrect (on the very first sync in Incremental | Append + Deduped mode). Here are the relevant logs:

2025-03-28 01:11:44 source INFO main i.d.c.CommonConnectorConfig(getSourceInfoStructMaker):1649 Loading the custom source info struct maker plugin: io.debezium.connector.postgresql.PostgresSourceInfoStructMaker
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(extractLsn):191 Found previous partition offset PostgresPartition [sourcePartition={server=api_staging}]: {lsn=833635403600, txId=5209687748, ts_usec=1743104503698506}
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(parseSavedOffset):171 Closing offsetStorageReader and fileOffsetBackingStore
2025-03-28 01:11:44 source INFO main o.a.k.c.s.FileOffsetBackingStore(stop):71 Stopped FileOffsetBackingStore
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(isSavedOffsetAfterReplicationSlotLSN):69 Replication slot confirmed_flush_lsn : 833635145880 Saved offset LSN : 833635403600
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(getCtidInitialLoadGlobalStateManager):117 Streams to be synced via ctid (can include RFR streams) : 1
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(getCtidInitialLoadGlobalStateManager):118 Streams: public.risk_assessment_factors
2025-03-28 01:11:44 source INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."risk_assessment_factors" is 1270938
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(format):240 Initial Debezium state constructed: {"[\"api_staging\",{\"server\":\"api_staging\"}]":"{\"transaction_id\":null,\"lsn\":833635403792,\"txId\":5209687750,\"ts_usec\":1743104504606614}"}
2025-03-28 01:11:44 source INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: true
2025-03-28 01:11:44 source INFO main i.a.i.s.p.PostgresSource(getIncrementalIterators):508 Using ctid + CDC
2025-03-28 01:11:44 source INFO main i.a.i.s.p.PostgresUtils(getFirstRecordWaitTime):171 First record waiting time: 120 seconds
2025-03-28 01:11:44 source WARN main i.a.c.i.s.r.InitialLoadTimeoutUtil(getInitialLoadTimeout):36 Initial Load timeout is overridden to 24 hours, which is the max time allowed for safety.
2025-03-28 01:11:44 source INFO main i.a.c.i.s.r.InitialLoadTimeoutUtil(getInitialLoadTimeout):44 Initial Load timeout: 86400 seconds
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(cdcCtidIteratorsCombined):149 First record waiting time: 120 seconds
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(cdcCtidIteratorsCombined):150 Initial load timeout: 24 hours
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(cdcCtidIteratorsCombined):151 Queue size: 10000
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(format):240 Initial Debezium state constructed: {"[\"api_staging\",{\"server\":\"api_staging\"}]":"{\"transaction_id\":null,\"lsn\":833635403840,\"txId\":5209687751,\"ts_usec\":1743104504691364}"}
2025-03-28 01:11:44 source INFO main i.a.i.s.p.PostgresUtils(shouldFlushAfterSync):78 Should flush after sync: true
2025-03-28 01:11:44 source INFO main i.a.i.s.p.PostgresSource(toSslJdbcParamInternal):924 DISABLED toSslJdbcParam disable
2025-03-28 01:11:44 source INFO main o.a.k.c.c.AbstractConfig(logAll):370 JsonConverterConfig values: 
2025-03-28 01:11:44 source INFO main o.a.k.c.c.AbstractConfig(logAll):370 StandaloneConfig values: 
	access.control.allow.methods = 
	access.control.allow.origin = 
	admin.listeners = null
	auto.include.jmx.reporter = true
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = use_all_dns_ips
	config.providers = []
	connector.client.config.override.policy = All
	header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
	key.converter = class org.apache.kafka.connect.json.JsonConverter
	listeners = [http://:8083]
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	offset.flush.interval.ms = 1000
	offset.flush.timeout.ms = 5000
	offset.storage.file.filename = /tmp/cdc-state-offset13844630858619352820/offset.dat
	plugin.discovery = hybrid_warn
	plugin.path = null
	response.http.headers.config = 
	rest.advertised.host.name = null
	rest.advertised.listener = null
	rest.advertised.port = null
	rest.extension.classes = []
	ssl.cipher.suites = null
	ssl.client.auth = none
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	task.shutdown.graceful.timeout.ms = 5000
	topic.creation.enable = true
	topic.tracking.allow.reset = true
	topic.tracking.enable = true
	value.converter = class org.apache.kafka.connect.json.JsonConverter
2025-03-28 01:11:44 source INFO main o.a.k.c.s.FileOffsetBackingStore(start):63 Starting FileOffsetBackingStore with file /tmp/cdc-state-offset13844630858619352820/offset.dat
2025-03-28 01:11:44 source INFO main o.a.k.c.c.AbstractConfig(logAll):370 JsonConverterConfig values: 
	converter.type = key
	decimal.format = BASE64
	replace.null.with.default = true
	schemas.cache.size = 1000
	schemas.enable = false
2025-03-28 01:11:44 source INFO main o.a.k.c.c.AbstractConfig(logAll):370 JsonConverterConfig values: 
	converter.type = value
	decimal.format = BASE64
	replace.null.with.default = true
	schemas.cache.size = 1000
	schemas.enable = false
2025-03-28 01:11:44 source INFO main i.d.c.CommonConnectorConfig(getSourceInfoStructMaker):1649 Loading the custom source info struct maker plugin: io.debezium.connector.postgresql.PostgresSourceInfoStructMaker
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(extractLsn):191 Found previous partition offset PostgresPartition [sourcePartition={server=api_staging}]: {lsn=833635403840, txId=5209687751, ts_usec=1743104504691364}
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(parseSavedOffset):171 Closing offsetStorageReader and fileOffsetBackingStore
2025-03-28 01:11:44 source INFO main o.a.k.c.s.FileOffsetBackingStore(stop):71 Stopped FileOffsetBackingStore
2025-03-28 01:11:44 source INFO main i.a.i.s.p.PostgresUtils(shouldFlushAfterSync):78 Should flush after sync: true
2025-03-28 01:11:44 source INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(commitLSNToPostgresDatabase):104 Committing upto LSN: 833635403840

I notice these two lines in the logs:

Initial Debezium state constructed: {"[\"api_staging\",{\"server\":\"api_staging\"}]":"{\"transaction_id\":null,\"lsn\":833635403792,\"txId\":5209687750,\"ts_usec\":1743104504606614}"}

and

Initial Debezium state constructed: {"[\"api_staging\",{\"server\":\"api_staging\"}]":"{\"transaction_id\":null,\"lsn\":833635403840,\"txId\":5209687751,\"ts_usec\":1743104504691364}"}
  • Only the first one (incorrect and older) is saved, and the second one (correct and latest) is not. Why is that?
  • I'm on source-postgres v3.6.18 (not on the latest because issue #49802 reports CDC problems with the latest version)
  • I'm on the latest Airbyte, v1.5.0
  • I have also correctly configured and validated both the Debezium heartbeat query and the pg_cron heartbeat
  • I'm running on AWS Aurora PG (a standalone writer instance that receives data replicated via AWS CDC from another PG writer instance) with rds.logical_wal_cache=0, max_slot_wal_keep_size=-1
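To compare the two constructed states with the LSN later committed to Postgres ("Committing upto LSN: 833635403840"), it helps to decode the state payload. Judging from the log lines, it is a JSON object whose key and value are themselves JSON-encoded strings. A minimal Python sketch, assuming that layout; `extract_lsns` is a hypothetical helper, not an Airbyte or Debezium API.

```python
import json

# Payloads copied from the two "Initial Debezium state constructed" log lines.
FIRST_STATE = r'{"[\"api_staging\",{\"server\":\"api_staging\"}]":"{\"transaction_id\":null,\"lsn\":833635403792,\"txId\":5209687750,\"ts_usec\":1743104504606614}"}'
SECOND_STATE = r'{"[\"api_staging\",{\"server\":\"api_staging\"}]":"{\"transaction_id\":null,\"lsn\":833635403840,\"txId\":5209687751,\"ts_usec\":1743104504691364}"}'
COMMITTED_LSN = 833635403840  # from "Committing upto LSN: 833635403840"


def extract_lsns(state_json: str) -> list[int]:
    """Decode the outer map, then each JSON-encoded offset value, and collect its 'lsn'."""
    outer = json.loads(state_json)
    return [json.loads(offset)["lsn"] for offset in outer.values()]


first_lsn = extract_lsns(FIRST_STATE)[0]
second_lsn = extract_lsns(SECOND_STATE)[0]

print(first_lsn, second_lsn)        # 833635403792 833635403840
print(COMMITTED_LSN - first_lsn)    # 48: committed LSN is ahead of the first state
print(second_lsn == COMMITTED_LSN)  # True
```

The committed LSN matches the second state, while the first state is 48 bytes behind it, the same gap seen between the saved state and the confirmed flush LSN in the original report. This sketch only shows the arithmetic; it does not establish why the first state is the one persisted.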

edit1: btw, I also tried v3.6.28 (same issue)
edit2: I think I'm facing this issue

@marcosmarxm
Member

@rodireich do you mind taking a look at this issue?
