
[Bug]: Beam Jobs built using version 2.50 and above are not running with FlinkRunners v 1.16.x #32944

Open
3 of 17 tasks
siddhanta-rath opened this issue Oct 25, 2024 · 3 comments

Comments


siddhanta-rath commented Oct 25, 2024

What happened?

We have been running our Beam jobs on Google Cloud Dataflow for a while and are now evaluating a migration to Flink.
All of our jobs are built with Beam SDK version 2.56.0.

During this exercise we have hit an issue where our jobs do not come up on the FlinkRunner with Flink (v1.16.x) when we use the same pipelines built with SDK 2.56.0, but when we downgrade the Beam SDK version to 2.49 the jobs start running. However, by downgrading we lose some of the features offered in the newer Beam SDKs (RequestResponseIO among others, which is a critical component in one of our jobs).

We also tried to run the WordCount example pipeline with Beam versions from 2.50 through 2.56.0 (with the respective runner versions from 2.50 to 2.58.0) on Flink 1.16.x, but to no avail. It runs fine on version 2.49.0.

These are the failure logs:

caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available.
    at org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154) ~[?:?]
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
2024-07-25 11:45:12,956 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_372]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:287) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_372]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_372]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available.
    at org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154) ~[?:?]
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
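
For what it's worth, the innermost InvalidProgramException is raised from FlinkPipelineExecutionEnvironment.createAttachedPipelineResult, i.e. the FlinkRunner is trying to read an attached JobExecutionResult even though the Flink application dispatcher submitted the job in detached mode.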

I have gone through the Beam community issues (including #29660) and the release notes. Beyond the compatibility mismatch (which in our case is already addressed), I didn't find anything else that could directly be the cause of this issue.

The question is: has anyone in the community experienced such issues and found a workaround to run Beam pipelines built with Beam SDKs > 2.49.0 on Flink? In particular, is anybody successfully running Beam pipelines built with SDK version 2.56 or newer?

Any help is appreciated!

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@siddhanta-rath (Author)

From the changelog for Beam SDK 2.50.0, I learned that changes were made to support both x86 and ARM CPU architectures in the Beam images. Could this be the reason for the incompatibility?

@siddhanta-rath siddhanta-rath changed the title [Bug]: Beam Jobs built using ver 2.50 and above are not running with FlinkRunners v 1.16.x [Bug]: Beam Jobs built using version 2.50 and above are not running with FlinkRunners v 1.16.x Oct 25, 2024

lintong commented Jan 28, 2025

Hi @siddhanta-rath - I've managed to get it working with:

  • Flink Docker image=flink:1.19.0-java11
  • Beam SDK=2.62.0
  • Java Version=11/Corretto
  • Flink Operator=1.10.0

pom configuration:

    <properties>
        <beam.version>2.62.0</beam.version>

        <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
        <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-flink-1.19</artifactId>
            <version>${beam.version}</version>
        </dependency>
        
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>exec-maven-plugin</artifactId>
                    <version>${maven-exec-plugin.version}</version>
                    <configuration>
                        <cleanupDaemonThreads>false</cleanupDaemonThreads>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <profiles>
        <profile>
            <id>package</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <finalName>job</finalName>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-dependency-plugin</artifactId>
                        <executions>
                            <execution>
                                <phase>prepare-package</phase>
                                <goals>
                                    <goal>copy-dependencies</goal>
                                </goals>
                                <configuration>
                                    <outputDirectory>${project.basedir}/target/lib</outputDirectory>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <executions>
                            <!-- Run shade goal on package phase -->
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <finalName>job</finalName>
                                    <shadedArtifactAttached>false</shadedArtifactAttached>
                                    <filters>
                                        <filter>
                                            <!-- Do not copy the signatures in the META-INF folder.
                                            Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                            <artifact>*:*</artifact>
                                            <excludes>
                                                <exclude>META-INF/*.SF</exclude>
                                                <exclude>META-INF/*.DSA</exclude>
                                                <exclude>META-INF/*.RSA</exclude>
                                            </excludes>
                                        </filter>
                                    </filters>
                                    <transformers>
                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                            <mainClass>some.path.WordCount</mainClass>
                                        </transformer>
                                    </transformers>
                                    <createDependencyReducedPom>false</createDependencyReducedPom>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
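
With this profile, mvn clean package -Ppackage builds the shaded target/job.jar and copies the runtime dependencies into target/lib, which is the layout the Dockerfile below expects.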

Operator configuration:

---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  creationTimestamp: "..."
  finalizers:
  - flinkdeployments.flink.apache.org/finalizer
  generation: 26
  labels:
    argocd.argoproj.io/instance: some-instance-name
  name: some-name
  namespace: some-namespace
spec:
  flinkConfiguration:
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.metrics.window: 3m
    kubernetes.operator.job.autoscaler.stabilization.interval: 1m
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporters: prom
    pipeline.max-parallelism: "24"
    taskmanager.numberOfTaskSlots: "2"
  flinkVersion: v1_19
  image: ghcr.io/some/image:tag
  imagePullPolicy: Always
  job:
    args:
    - --runner=FlinkRunner
    - --attachedMode=false
    - --output=file://opt/output.txt
    entryClass: some-path.WordCount
    jarURI: local:///opt/flink/lib/job.jar
    parallelism: 2
    state: running
    upgradeMode: stateless
  jobManager:
    replicas: 1
    resource:
      cpu: 1
      memory: 2048m
  podTemplate:
    metadata:
      annotations:
        prometheus.io/path: /metrics
        prometheus.io/port: prom
        prometheus.io/scrape: "true"
      labels:
        app.kubernetes.io/instance: some-instance
        app.kubernetes.io/managed-by: Helm
        app.kubernetes.io/name: some-name
        app.kubernetes.io/version: 1.16.0
        helm.sh/chart: some-helm-chart
    spec:
      containers:
      - name: flink-main-container
        ports:
        - containerPort: 9249
          name: prom
          protocol: TCP
      imagePullSecrets:
      - name: regcred
  serviceAccount: some-service-account
  taskManager:
    resource:
      cpu: 1
      memory: 2048m
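
The --runner=FlinkRunner and --attachedMode=false args above are consumed by the entry class via Beam's FlinkPipelineOptions. As a rough sketch (the class name and transforms here are placeholders, not my actual WordCount, and it assumes the standard PipelineOptionsFactory pattern), the entry point looks something like this:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WordCount {
  public static void main(String[] args) {
    // Parses --runner=FlinkRunner, --attachedMode=false, --output=... from the
    // FlinkDeployment job args above.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);

    // attachedMode=false tells the FlinkRunner not to wait for (or read back) an
    // attached JobExecutionResult, which matches the detached submission done by the
    // Flink application dispatcher and avoids the InvalidProgramException shown above.
    Pipeline pipeline = Pipeline.create(options);
    // ... WordCount transforms go here ...
    pipeline.run();
  }
}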

Dockerfile:

FROM flink:1.19.0-java11

COPY target/lib /opt/flink/lib
COPY target/job.jar /opt/flink/lib
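
Because both the dependency jars (target/lib) and the shaded job.jar are copied into /opt/flink/lib, they end up on the Flink classpath, and jarURI: local:///opt/flink/lib/job.jar in the FlinkDeployment spec resolves inside the image.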

Reference code:


lintong commented Jan 28, 2025

I realise that the above could do with some refinement (e.g. the layering). However, I came across the same issue myself and didn't want to wait before mentioning it.
