
[Feature][Question] CDC schema evolution in synchronizing databases? #3581

Open
1 of 2 tasks
CodyPin opened this issue Jun 24, 2024 · 2 comments
Labels
enhancement New feature or request

Comments

CodyPin commented Jun 24, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

I was using Flink CDC to sync some tables from a MySQL database when I noticed the job had failed. I didn't look into it much at first, since the job often fails due to OOM (I am testing with a large number of tables, around 90). When I restarted the job, it failed with the exception below:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Incompatible schema found.
Paimon table is: ...
MySQL table is: ...
If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301)
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
	at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
	at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

The table schema in MySQL had indeed changed: a column was added. I had originally assumed that database synchronization would support schema evolution just like table synchronization does, but on a closer reading of the documentation:

Suppose we have a MySQL table named tableA, it has three fields: field_1, field_2, field_3. When we want to load this MySQL table to Paimon, we can do this in Flink SQL, or use MySqlSyncTableAction.

Since it specifically mentions MySqlSyncTableAction, it seems schema evolution is not supported in database synchronization. Is this true? If so, I have two questions:

  1. What would be the recommended action if I want to continue the database synchronization with that table included and updated? Do I need to drop the table and load it again, or is there an easier way?
  2. Would schema evolution be supported in database synchronization in the future?
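For context, resubmitting the database sync action with the flag that the error message suggests might look roughly like this. This is a sketch only: apart from `--ignore-incompatible` (quoted verbatim from the error above), the action name, flag spellings, paths, and connection settings are assumptions based on the Paimon 0.8 documentation and should be checked against your setup.

```shell
# Sketch only: paths and connection settings below are placeholders,
# and flag names other than --ignore-incompatible are assumptions.
"$FLINK_HOME/bin/flink" run \
    /path/to/paimon-flink-action-0.8.1.jar \
    mysql_sync_database \
    --warehouse hdfs:///path/to/warehouse \
    --database my_paimon_db \
    --mysql_conf hostname=127.0.0.1 \
    --mysql_conf username=root \
    --mysql_conf password='***' \
    --mysql_conf database-name=my_mysql_db \
    --ignore-incompatible true
```

Note that, going by the error message, `--ignore-incompatible` only skips the incompatible tables rather than evolving their schemas, so the changed table would be left out of the sync.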

Some additional info for the builds I am using:

  • Flink 1.18.1
  • paimon-flink-action 0.8.1
  • paimon-flink 1.18-0.8.1
  • flink-sql-connector-mysql-cdc 3.0.1

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@CodyPin CodyPin added the enhancement New feature or request label Jun 24, 2024
MOBIN-F (Contributor) commented Jun 24, 2024

Hi @CodyPin, can you provide the complete information for this part?
Paimon table is: ...
MySQL table is: ...

I have encountered a similar problem. It occurs when a schema change happens while the Flink job is stopped and then restarted; I think that's what happened to you, too. Relevant PR: #3362

Would schema evolution be supported in database synchronization in the future?
Database synchronization does support schema evolution.

LinMingQiang (Contributor) commented:

You can add flink-cdc-common-3.0.1.jar to the FLINK_HOME/lib directory and try schema evolution again.
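The step suggested above might look like this in practice. The jar path is a placeholder, and the restart commands assume a standalone Flink cluster; adjust for YARN/Kubernetes deployments.

```shell
# Copy the Flink CDC common jar into Flink's lib directory so its classes
# are on the classpath of every submitted job (jar path is a placeholder).
cp /path/to/flink-cdc-common-3.0.1.jar "$FLINK_HOME/lib/"

# Restart the cluster so the new jar is picked up, then resubmit the sync job.
"$FLINK_HOME/bin/stop-cluster.sh"
"$FLINK_HOME/bin/start-cluster.sh"
```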
