[opt](hudi) use spark bundle to read hudi data apache#21260
AshinGau authored and morningman committed Jul 3, 2023
1 parent 08159e5 commit 67319fc
Showing 26 changed files with 1,500 additions and 516 deletions.
6 changes: 4 additions & 2 deletions be/src/vec/exec/scan/hudi_jni_reader.cpp
@@ -21,6 +21,7 @@
 #include <ostream>
 
 #include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "vec/core/types.h"
 
@@ -35,7 +36,7 @@ class Block;
 
 namespace doris::vectorized {
 
-const std::string HudiJniReader::HADOOP_FS_PREFIX = "hadoop_fs.";
+const std::string HudiJniReader::HADOOP_CONF_PREFIX = "hadoop_conf.";
 
 HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
                              const THudiFileDesc& hudi_params,
@@ -52,6 +53,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
     }
 
     std::map<String, String> params = {
+            {"query_id", print_id(_state->query_id())},
             {"base_path", _hudi_params.base_path},
             {"data_file_path", _hudi_params.data_file_path},
             {"data_file_length", std::to_string(_hudi_params.data_file_length)},
@@ -65,7 +67,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
 
     // Use compatible hadoop client to read data
     for (auto& kv : _scan_params.properties) {
-        params[HADOOP_FS_PREFIX + kv.first] = kv.second;
+        params[HADOOP_CONF_PREFIX + kv.first] = kv.second;
    }
 
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner", params,
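
Note: the renamed constant reflects a behavior change, not just a rename. Every catalog property the BE receives is now forwarded to the JNI scanner as a generic hadoop_conf.-prefixed entry instead of a filesystem-specific hadoop_fs. one. Below is a minimal sketch of how the Java side of the scanner might strip that prefix back off and rebuild a Hadoop Configuration; the helper class is hypothetical and shown only for illustration, and only the hadoop_conf. prefix itself comes from this commit.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class HadoopConfUnpacker {
    // Must stay in sync with HudiJniReader::HADOOP_CONF_PREFIX on the BE side.
    private static final String HADOOP_CONF_PREFIX = "hadoop_conf.";

    // Hypothetical helper: copy prefixed scan params into a Hadoop Configuration.
    public static Configuration fromParams(Map<String, String> params) {
        Configuration conf = new Configuration(false);
        for (Map.Entry<String, String> kv : params.entrySet()) {
            if (kv.getKey().startsWith(HADOOP_CONF_PREFIX)) {
                conf.set(kv.getKey().substring(HADOOP_CONF_PREFIX.length()),
                        kv.getValue());
            }
        }
        return conf;
    }
}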
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/hudi_jni_reader.h
@@ -46,7 +46,7 @@ class HudiJniReader : public GenericReader {
     ENABLE_FACTORY_CREATOR(HudiJniReader);
 
 public:
-    static const std::string HADOOP_FS_PREFIX;
+    static const std::string HADOOP_CONF_PREFIX;
 
     HudiJniReader(const TFileScanRangeParams& scan_params, const THudiFileDesc& hudi_params,
                   const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
12 changes: 6 additions & 6 deletions bin/start_be.sh
@@ -82,23 +82,23 @@ for f in "${DORIS_HOME}/lib/java_extensions"/*.jar; do
     if [[ -z "${DORIS_CLASSPATH}" ]]; then
         export DORIS_CLASSPATH="${f}"
     else
-        export DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        export DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     fi
 done
 
 if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then
     # add hadoop libs
     for f in "${DORIS_HOME}/lib/hadoop_hdfs/common"/*.jar; do
-        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     done
     for f in "${DORIS_HOME}/lib/hadoop_hdfs/common/lib"/*.jar; do
-        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     done
     for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs"/*.jar; do
-        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     done
     for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs/lib"/*.jar; do
-        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     done
 fi
 
@@ -107,7 +107,7 @@ fi
 if command -v hadoop >/dev/null 2>&1; then
     HADOOP_SYSTEM_CLASSPATH="$(hadoop classpath --glob)"
 fi
-export CLASSPATH="${HADOOP_SYSTEM_CLASSPATH}:${DORIS_HOME}/conf/:${DORIS_CLASSPATH}"
+export CLASSPATH="${DORIS_CLASSPATH}:${HADOOP_SYSTEM_CLASSPATH}:${DORIS_HOME}/conf/"
 # DORIS_CLASSPATH is for self-managed jni
 export DORIS_CLASSPATH="-Djava.class.path=${DORIS_CLASSPATH}"
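
Note: all of these edits reverse the jar order. DORIS_CLASSPATH now accumulates jars left to right, and the final CLASSPATH puts Doris' own jars ahead of the system Hadoop classpath instead of behind it, so classes bundled with Doris win when both provide them. When diagnosing ordering problems like this, a small probe such as the sketch below (plain JDK APIs, illustrative only, not part of this commit) shows which jar actually served a class at runtime.

// Prints the location a class was loaded from; bootstrap classes have no code source.
public class WhichJar {
    public static void main(String[] args) throws Exception {
        String name = args.length > 0 ? args[0] : "org.apache.hadoop.conf.Configuration";
        Class<?> clazz = Class.forName(name);
        java.security.CodeSource src = clazz.getProtectionDomain().getCodeSource();
        System.out.println(name + " <- " + (src == null ? "bootstrap" : src.getLocation()));
    }
}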
(file header not rendered)
@@ -105,6 +105,11 @@ public String getString() {
         return inspectObject().toString();
     }
 
+    @Override
+    public byte[] getStringAsBytes() {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public LocalDate getDate() {
         // avro has no date type
(file header not rendered)
@@ -192,6 +192,7 @@ protected int getNext() throws IOException {
         return numRows;
     }
 
+    @Override
     protected TableSchema parseTableSchema() throws UnsupportedOperationException {
         Schema schema = avroReader.getSchema();
         List<Field> schemaFields = schema.getFields();
202 changes: 168 additions & 34 deletions fe/be-java-extensions/hudi-scanner/pom.xml
@@ -30,83 +30,217 @@ under the License.
     <properties>
         <doris.home>${basedir}/../../</doris.home>
         <fe_ut_parallel>1</fe_ut_parallel>
+        <scala.version>2.12.15</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.version>3.2.0</spark.version>
+        <sparkbundle.version>3.2</sparkbundle.version>
+        <hudi.version>0.13.0</hudi.version>
+        <janino.version>3.0.16</janino.version>
+        <fasterxml.jackson.version>2.14.3</fasterxml.jackson.version>
     </properties>
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>java-common</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-spark-common_${scala.binary.version}</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-spark-client</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-spark3-common</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-spark3.2.x_${scala.binary.version}</artifactId>
+            <version>${hudi.version}</version>
             <exclusions>
                 <exclusion>
-                    <artifactId>fe-common</artifactId>
-                    <groupId>org.apache.doris</groupId>
+                    <artifactId>json4s-ast_2.11</artifactId>
+                    <groupId>org.json4s</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>json4s-core_2.11</artifactId>
+                    <groupId>org.json4s</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>json4s-jackson_2.11</artifactId>
+                    <groupId>org.json4s</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>json4s-scalap_2.11</artifactId>
+                    <groupId>org.json4s</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-hadoop-mr-bundle</artifactId>
-            <version>${hudi.version}</version>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>1.10.1</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <exclusions>
                 <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>*</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>commons-lang</groupId>
-                    <artifactId>commons-lang</artifactId>
+                    <artifactId>jackson-module-scala_2.12</artifactId>
+                    <groupId>com.fasterxml.jackson.module</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.hudi</groupId>
-                    <artifactId>hudi-common</artifactId>
+                    <artifactId>hadoop-client-api</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.parquet</groupId>
-                    <artifactId>parquet-avro</artifactId>
+                    <artifactId>hadoop-client-runtime</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
                 </exclusion>
             </exclusions>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
             <exclusions>
                 <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>janino</artifactId>
                 </exclusion>
                 <exclusion>
-                    <artifactId>hudi-hadoop-mr</artifactId>
-                    <groupId>org.apache.hudi</groupId>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>commons-compiler</artifactId>
                 </exclusion>
             </exclusions>
+        </dependency>
+        <dependency>
-            <groupId>com.facebook.presto.hive</groupId>
-            <artifactId>hive-apache</artifactId>
-            <version>${presto.hive.version}</version>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-launcher_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <!-- version of spark's janino is error -->
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+            <version>${janino.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>commons-compiler</artifactId>
                 </exclusion>
             </exclusions>
+        </dependency>
+        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>commons-compiler</artifactId>
+            <version>${janino.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
+            <!-- version of spark's jackson module is error -->
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+            <version>${fasterxml.jackson.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.guava</groupId>
                     <artifactId>guava</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>java-common</artifactId>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
         </dependency>
     </dependencies>
     <build>
         <finalName>hudi-scanner</finalName>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
         <resources>
             <resource>
                 <directory>src/main/resources</directory>
             </resource>
         </resources>
         <testResources>
             <testResource>
                 <directory>src/test/resources</directory>
             </testResource>
         </testResources>
 
         <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>4.7.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                    </args>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>default-compile</id>
+                        <phase>none</phase>
+                    </execution>
+                    <execution>
+                        <id>default-testCompile</id>
+                        <phase>none</phase>
+                    </execution>
+                    <execution>
+                        <id>java-compile</id>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                </executions>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-assembly-plugin</artifactId>
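
Note: two of the new pom comments flag the same pitfall. Spark's transitive janino and jackson-module-scala versions are wrong for this setup, so the pom excludes them and pins ${janino.version} and ${fasterxml.jackson.version} explicitly, keeping janino and commons-compiler on a single version. A tiny smoke test like the sketch below (standard Janino ScriptEvaluator API; the test itself is illustrative and not part of the commit) tends to fail with linkage errors when the two artifacts drift apart.

import org.codehaus.janino.ScriptEvaluator;

// Compiles and runs a one-line script; mismatched janino/commons-compiler
// versions typically surface here as NoSuchMethodError or similar.
public class JaninoSmokeTest {
    public static void main(String[] args) throws Exception {
        ScriptEvaluator se = new ScriptEvaluator();
        se.setReturnType(int.class);
        se.setParameters(new String[] {"a", "b"}, new Class[] {int.class, int.class});
        se.cook("return a + b;");
        System.out.println(se.evaluate(new Object[] {1, 2})); // prints 3
    }
}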