How PostgreSQL-CDC uses heartbeat.action.query to reduce WAL disk space consumption #8762

Open
huangyutongs opened this issue Feb 18, 2025 · 0 comments

Comments

@huangyutongs

What happened

SeaTunnel 2.3.9

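Without the heartbeat.action.query parameter the job works fine. With the parameter set, the job fails with the NullPointerException shown under "Error Exception" below. I can confirm that the table test_heartbeat_table exists and that seatunneluser has write permission on it.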

The SQL statement used to create the heartbeat table

CREATE TABLE test_heartbeat_table (
    text text
);
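
As a sanity check outside SeaTunnel, the heartbeat statement can be run by hand and the WAL retained by the replication slot can be inspected. This is a minimal sketch, not part of the failing job. It assumes PostgreSQL 10 or later (for pg_current_wal_lsn and pg_wal_lsn_diff), a session connected to the lifecycle database as seatunneluser, and that the slot test_seatunnel already exists.

```sql
-- Run as seatunneluser: the same insert that heartbeat.action.query is meant to issue.
INSERT INTO public.test_heartbeat_table (text) VALUES ('test_heartbeat');

-- Check the privilege explicitly; returns true if INSERT is allowed.
SELECT has_table_privilege('seatunneluser', 'public.test_heartbeat_table', 'INSERT');

-- WAL currently retained on behalf of the slot.
-- Assumption: if heartbeats work, this value should stop growing between checks.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_name = 'test_seatunnel';
```

If the insert succeeds and the privilege check returns true, the database side is likely fine, which would point to how the connector constructs the heartbeat rather than to permissions.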

SeaTunnel Configuration

{
    "env": {
        "parallelism": 1,
        "job.mode": "STREAMING",
        "checkpoint.interval": 10000
    },
    "source": [
        {
            "plugin_name": "Postgres-CDC",
            "base-url": "jdbc:postgresql://10.4.0.91:5432/lifecycle?loggerLevel=OFF",
            "username": "seatunneluser",
            "password": "aaaaa",
            "slot.name": "test_seatunnel",
            "debezium": {
                "heartbeat.interval.ms" : "5000",
                "heartbeat.action.query": "INSERT INTO lifecycle.public.test_heartbeat_table (text) VALUES ('test_heartbeat');"
            },
            "database-names": [
                "lifecycle"
            ],
            "schema-names": [
                "public"
            ],
            "table-names": [
                "lifecycle.public.test"
            ]
        }
    ],
    "sink": [
        {
            "plugin_name": "StarRocks",
            "nodeUrls": [
                "10.4.252.3:8030",
                "10.4.252.5:8030",
                "10.4.252.6:8030"
            ],
            "base-url": "jdbc:mysql://10.4.252.3:9030/",
            "username": "datahub",
            "password": "dpsxMPz2w6LGs3Mo",
            "database": "datahub",
            "batch_max_rows": 5000,
            "table": "ods_test"
        }
    ]
}

Error Exception

2025-02-18 08:26:04,583 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2025-02-18 08:26:04,587 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2025-02-18 08:26:04,587 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed SeaTunnel client......
2025-02-18 08:26:04,588 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2025-02-18 08:26:04,588 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 

===============================================================================


2025-02-18 08:26:04,588 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error, 

2025-02-18 08:26:04,588 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2025-02-18 08:26:04,588 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed 

2025-02-18 08:26:04,592 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
        ... 5 more
Caused by: java.lang.NullPointerException
        at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:79)
        at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
        at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94)
        at org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher.<init>(JdbcSourceEventDispatcher.java:67)
        at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:253)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.submitTask(IncrementalSourceScanFetcher.java:85)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
        ... 6 more

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
        ... 2 more
 
2025-02-18 08:26:04,592 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
===============================================================================



Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
        ... 5 more
Caused by: java.lang.NullPointerException
        at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:79)
        at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
        at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94)
        at org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher.<init>(JdbcSourceEventDispatcher.java:67)
        at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:253)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.submitTask(IncrementalSourceScanFetcher.java:85)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
        ... 6 more

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
        ... 2 more
2025-02-18 08:26:04,594 INFO  [s.c.s.s.c.ClientExecuteCommand] [SeaTunnel-CompletableFuture-Thread-0] - run shutdown hook because get close signal