
The Flink job using nebula-flink-connector-3.5.0 cannot run after being submitted to Yarn due to class name conflicts. #97

Open
adu-shzz opened this issue Jan 11, 2024 · 8 comments
Labels
type/question Type: question about the product

Comments

@adu-shzz

adu-shzz commented Jan 11, 2024

NebulaGraph environment

  • Version: v3.5.0
  • OS: CentOS 7.6
  • Hadoop version: v3.3.4
  • JDK version: v1.8

Bug description

  • Symptom: a Flink job that uses nebula-flink-connector-3.5.0 fails to run after being submitted to Yarn, with the following error:
Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "com/vesoft/nebula/client/graph/net/Session"
	at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_211]
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_211]
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_211]
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) ~[?:1.8.0_211]
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74) ~[?:1.8.0_211]
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369) ~[?:1.8.0_211]
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363) ~[?:1.8.0_211]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_211]
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362) ~[?:1.8.0_211]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_211]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[?:1.8.0_211]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_211]
	at java.lang.Class.getDeclaredFields0(Native Method) ~[?:1.8.0_211]
	at java.lang.Class.privateGetDeclaredFields(Class.java:2583) ~[?:1.8.0_211]
	at java.lang.Class.getDeclaredField(Class.java:2068) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1857) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494) ~[?:1.8.0_211]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:490) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391) ~[?:1.8.0_211]
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134) ~[?:1.8.0_211]
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_211]
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) ~[flink-core-1.16.2.jar:1.16.2]
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143) ~[flink-core-1.16.2.jar:1.16.2]
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) ~[flink-core-1.16.2.jar:1.16.2]
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) ~[flink-core-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2308) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1242) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at com.XXX.jobs.portraits.XXXJober.initSink(XXXJober.java:82) ~[xxx-23.1114.1746.jar:?]
	at com.XXX.jobs.portraits.XXXJober.arrange(XXXJober.java:42) ~[xxx-23.1114.1746.jar:?]
	at com.XXX.common.launcher.IBdJober$Builder.lambda$execute$8(IBdJober.java:143) ~[xxx-23.1114.1746.jar:?]
	at java.util.HashMap$EntrySet.forEach(HashMap.java:1044) ~[?:1.8.0_211]
	at com.XXX.common.launcher.IBdJober$Builder.execute(IBdJober.java:137) ~[xxx-23.1114.1746.jar:?]
	at com.XXX.BigDataMain.main(BigDataMain.java:31) ~[xxx-23.1114.1746.jar:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_211]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_211]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_211]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_211]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-clients-1.16.2.jar:1.16.2]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-clients-1.16.2.jar:1.16.2]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-clients-1.16.2.jar:1.16.2]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-clients-1.16.2.jar:1.16.2]
	... 12 more
  • Preliminary cause:
    The official Java class com.vesoft.nebula.client.graph.net.Session is defined both in com.vesoft:nebula-flink-connector:3.5.0.jar and in its dependency com.vesoft:client:3.5.0.jar.
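The trace shows the error surfacing inside `DataStream.addSink`, while Flink's `ClosureCleaner` Java-serializes the sink function via `InstantiationUtil.serializeObject` and `ObjectOutputStream.writeObject`. Building the `ObjectStreamClass` descriptor reflects over the class's declared fields (`getDeclaredFields0`), which is what forces `Session` to be loaded. A minimal sketch that repeats the same serialization step outside Flink, so a sink object can be checked before submission; `SinkSerializationCheck` is a hypothetical name and this is not Flink's own code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/**
 * Hypothetical helper: serializes an object the way the stack trace shows
 * Flink doing it (ObjectOutputStream.writeObject), so serialization and
 * class-loading problems surface before the job is submitted.
 */
public final class SinkSerializationCheck {

    private SinkSerializationCheck() {}

    /** Returns true if the object can be Java-serialized, false if serialization fails with an IOException. */
    public static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // writeObject builds ObjectStreamClass descriptors, which reflect over
            // declared fields and therefore force the field types to be loaded.
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is an IOException and ends up here.
            // A LinkageError (as in this issue) is an Error and is deliberately
            // not caught, so it propagates with its full stack trace.
            return false;
        }
    }

    public static void main(String[] args) {
        // Replace this placeholder with the SinkFunction passed to DataStream.addSink(...).
        Object sink = "placeholder";
        System.out.println("serializable: " + isJavaSerializable(sink));
    }
}
```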
QingZ11 added the type/question label Jan 11, 2024
QingZ11 changed the title "Flink job using nebula-flink-connector-3.5.0 fails to run after submission to Yarn and reports a class name conflict." to "The Flink job using nebula-flink-connector-3.5.0 cannot run after being submitted to Yarn due to class name conflicts." Jan 11, 2024
@QingZ11

QingZ11 commented Jan 11, 2024

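The Java class com.vesoft.nebula.client.graph.net.Session is not defined in com.vesoft:nebula-flink-connector:3.5.0.jar; it is defined only in nebula-java's com.vesoft:client:3.5.0.jar. The problem is probably the scope you use for com.vesoft:client in your pom.

The developers suggest that you check the scope and read up on how Maven dependency scopes work.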
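Following the scope suggestion, one way to see whether `Session` is visible more than once at runtime (for example, once from the job's uber-jar and once from another jar on the classpath) is to ask a class loader for every copy of its `.class` resource. A minimal sketch; `DuplicateClassFinder` is a hypothetical name, and the result only reflects the class loader passed in, so it is most informative when called from code running inside the job:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Hypothetical diagnostic: lists every classpath location that provides a given class file. */
public final class DuplicateClassFinder {

    private DuplicateClassFinder() {}

    /** Converts "a.b.C" into the resource path "a/b/C.class". */
    public static String resourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns one URL per jar or directory visible to the loader that contains the class file. */
    public static List<URL> findLocations(String className, ClassLoader loader) {
        try {
            return Collections.list(loader.getResources(resourceName(className)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String target = args.length > 0 ? args[0] : "com.vesoft.nebula.client.graph.net.Session";
        List<URL> locations = findLocations(target, Thread.currentThread().getContextClassLoader());
        System.out.println(target + " found in " + locations.size() + " location(s):");
        for (URL url : locations) {
            System.out.println("  " + url);
        }
        // More than one location means the same class file is shipped more than once.
    }
}
```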

@adu-shzz
Author

adu-shzz commented Jan 11, 2024

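About the Maven dependency scope: I found that the problem originates in the official jars.

I just downloaded the official jars and source code. Comparing them, I found that the official com.vesoft:nebula-flink-connector:3.5.0.jar and com.vesoft:client:3.5.0.jar contain the same source code.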

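For now we'll try excluding com.vesoft:client:3.5.0.jar directly in our build. If that still doesn't work, we'll have to edit the class files inside com.vesoft:nebula-flink-connector:3.5.0.jar by hand and delete the duplicated packages.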
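The jar comparison described above (which class files two jars have in common) can be automated with `java.util.jar.JarFile`. A minimal sketch; `JarOverlap` is a hypothetical name, and it compares entry names only, not bytecode, so two entries with the same name but different contents are still reported as overlapping:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Enumeration;
import java.util.Set;
import java.util.TreeSet;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical helper: lists .class entries that appear in both of two jar files. */
public final class JarOverlap {

    private JarOverlap() {}

    /** Collects the names of all .class entries in a jar, sorted for stable output. */
    static Set<String> classEntries(String jarPath) {
        Set<String> names = new TreeSet<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(".class")) {
                    names.add(name);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    /** Returns the .class entry names present in both jars. */
    public static Set<String> overlap(String firstJar, String secondJar) {
        Set<String> common = classEntries(firstJar);
        common.retainAll(classEntries(secondJar));
        return common;
    }

    public static void main(String[] args) {
        // Example: java JarOverlap nebula-flink-connector-3.5.0.jar client-3.5.0.jar
        if (args.length != 2) {
            System.err.println("usage: JarOverlap <jarA> <jarB>");
            return;
        }
        for (String name : overlap(args[0], args[1])) {
            System.out.println(name);
        }
    }
}
```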

@Nicole00
Contributor

This dependency runs fine for me, with no class-loading problems. Please check how it is referenced in your pom.xml:

<dependencies>
    <dependency>
        <groupId>com.vesoft</groupId>
        <artifactId>nebula-flink-connector</artifactId>
        <version>3.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.14.4</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

@adu-shzz
Author

adu-shzz commented Jan 12, 2024

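Thanks for the reply. Your approach still doesn't work for me. What finally worked was:
1. When adding nebula-flink-connector in Maven, set its scope to provided so that it is excluded from my Flink uber-jar, then put nebula-flink-connector-3.5.0.jar separately under ${FLINK_HOME}/lib/.
2. At line 82, where the error was reported, reformat the whole statement with different line breaks, then recompile and repackage it in IDEA. Yes, only the line breaks changed; you read that right.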

// This location
at com.XXX.jobs.portraits.XXXJober.initSink(XXXJober.java:82)

Before the line-break change:

kfkSource.addSink(SinkUtil.<MyVO>nebulaSinkOptions(context, cfgPrefix)
                    .vertexOpts(opts -> opts.setGraphSpace("my_space").setTag("test_tag")
                            .setBatchSize(2000)
                            .setBatchIntervalMs(3000)
                    )
                    .batchExecutor(new MyNebulaBatchExecutor())
                    .buildVertexSinkFunction()
        );

After the line-break change:

kfkSource.addSink(
                SinkUtil.<MyVO>nebulaSinkOptions(context, cfgPrefix)
                        .vertexOpts(opts -> opts.setGraphSpace("my_space").setTag("test_tag")
                                .setBatchSize(2000)
                                .setBatchIntervalMs(3000)
                        )
                        .batchExecutor(new MyNebulaBatchExecutor())
                        .buildVertexSinkFunction()
        );

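After these two steps, the job could be submitted through DataSphere to the Hadoop Yarn cluster and ran successfully.
I suspect this may be related to how IDEA compiles the code or to Flink's class-loading mechanism.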
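On the class-loading suspicion: Flink loads user code through its own class loader, and its documented default for `classloader.resolve-order` is `child-first`, meaning classes in the job jar are looked up before the parent class path, except for configured parent-first patterns. One plausible mechanism for the error is that the same class name is available both to the parent (here `sun.misc.Launcher$AppClassLoader`, per the trace) and to a child loader, so the JVM ends up with two distinct `Session` types; that is the kind of situation `LinkageError: loader constraint violation` reports. The simplified sketch below only illustrates child-first delegation; it is not Flink's `ChildFirstClassLoader`, and the short parent-first prefix list is an assumption:

```java
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Simplified illustration of child-first class loading.
 * NOT Flink's implementation; the parent-first prefixes are an assumption.
 */
public class ChildFirstLoader extends URLClassLoader {

    // Packages that always come from the parent, so core JDK types stay shared.
    private static final String[] ALWAYS_PARENT_FIRST = {"java.", "javax.", "sun."};

    public ChildFirstLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    // Standard parent-first delegation for core packages.
                    c = super.loadClass(name, resolve);
                } else {
                    try {
                        // Child-first: look in this loader's own URLs before the parent.
                        c = findClass(name);
                    } catch (ClassNotFoundException notInChild) {
                        // Not in the child's URLs: fall back to the parent.
                        c = super.loadClass(name, resolve);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    private static boolean isParentFirst(String name) {
        for (String prefix : ALWAYS_PARENT_FIRST) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

If a jar containing `Session` were passed in `urls` while another copy also sat on the parent's class path, this loader would define its own `Session` instead of reusing the parent's, which is how two incompatible types with the same name can coexist.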

@Nicole00
Contributor

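The second point is very strange. I tested locally with the code from https://github.com/vesoft-inc/nebula-flink-connector/blob/master/example/src/main/java/org/apache/flink/FlinkConnectorSinkExample.java. It has only four dependencies in total, and I deliberately scrambled the formatting; it still ran fine.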

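Regarding the first point: your pom doesn't also reference com.vesoft:client separately, right? It doesn't need to be added on its own, because the nebula-flink-connector package brings the client along. Also, you may want to check whether the Flink environment contains a client jar of a different version.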
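To check whether the Flink environment picks up a client jar from somewhere unexpected, the location a class was actually loaded from can be read through `Class.getProtectionDomain().getCodeSource()`. A minimal sketch with a hypothetical `ClassOrigin` helper; calling `locationOf(Session.class)` from inside the running job is more telling than running `main` on a developer machine, since only the job sees the cluster's class path:

```java
import java.security.CodeSource;

/** Hypothetical helper: reports the jar or directory a loaded class came from. */
public final class ClassOrigin {

    private ClassOrigin() {}

    /** Returns the code-source location, or "<bootstrap>" for JDK core classes, which have no CodeSource. */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap>";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String name = args.length > 0 ? args[0] : "com.vesoft.nebula.client.graph.net.Session";
        // initialize=false: load the class without running its static initializers.
        Class<?> type = Class.forName(name, false, Thread.currentThread().getContextClassLoader());
        System.out.println(name + " loaded from " + locationOf(type) + " by " + type.getClassLoader());
    }
}
```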

@adu-shzz
Author

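Regarding the first point: your pom doesn't also reference com.vesoft:client separately, right? It doesn't need to be added on its own, because the nebula-flink-connector package brings the client along. Also, you may want to check whether the Flink environment contains a client jar of a different version.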

I've already checked that and removed any that were there; that's not the problem. It just feels very strange.

@adu-shzz
Author

adu-shzz commented Jan 12, 2024

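Regarding the first point: your pom doesn't also reference com.vesoft:client separately, right? It doesn't need to be added on its own, because the nebula-flink-connector package brings the client along. Also, you may want to check whether the Flink environment contains a client jar of a different version.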

By the way, com.vesoft:client-3.5.0.jar contains a com.facebook.thrift package that is not included in com.vesoft:nebula-flink-connector:3.5.0.jar. I'm worried that referencing only nebula-flink-connector in Flink, without client, will cause problems. Any advice?

@Nicole00
Contributor

By the way, com.vesoft:client-3.5.0.jar contains a com.facebook.thrift package that is not included in com.vesoft:nebula-flink-connector:3.5.0.jar. I'm worried that referencing only nebula-flink-connector in Flink, without client, will cause problems. Any advice?

You don't need to reference client separately because you already reference nebula-flink-connector, and that package includes client.
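For the com.facebook.thrift concern, a quick way to confirm what the job can actually see is to try resolving class names against the job's class loader. A minimal sketch with a hypothetical `RuntimeDependencyCheck`; it does not assume any particular class name inside com.facebook.thrift, so the names to probe are passed in as arguments:

```java
/** Hypothetical helper: checks whether classes can be resolved by a class loader without initializing them. */
public final class RuntimeDependencyCheck {

    private RuntimeDependencyCheck() {}

    /** Returns true if the class resolves; false if it or a class it depends on is missing. */
    public static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: resolve the class without running static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Pass fully qualified names to probe, e.g. classes from the com.facebook.thrift package.
        String[] names = args.length > 0 ? args : new String[] {"com.vesoft.nebula.client.graph.net.Session"};
        for (String name : names) {
            System.out.println(name + ": " + (isLoadable(name, loader) ? "found" : "MISSING"));
        }
    }
}
```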

Projects
None yet
Development

No branches or pull requests

3 participants