From 1e437e2b1e00aa1942781f0a9b5b2f0868644b81 Mon Sep 17 00:00:00 2001 From: Sahil Takiar Date: Thu, 28 Jun 2018 17:50:41 -0700 Subject: [PATCH] HIVE-20032: Don't serialize hashCode for repartitionAndSortWithinPartitions (Sahil Takiar, reviewed by Rui Li) --- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +- .../ql/exec/spark/TestSparkStatistics.java | 12 ++-- .../jdbc/TestJdbcWithLocalClusterSpark.java | 10 +-- .../TestJdbcWithMiniHS2ErasureCoding.java | 11 ++-- ...MultiSessionsHS2WithLocalClusterSpark.java | 10 +-- .../hive/spark/HiveKryoRegistrator.java | 2 +- .../hive/spark/NoHashCodeKryoSerializer.java | 65 +++++++++++++++++++ .../ql/exec/spark/ShuffleKryoSerializer.java | 62 ++++++++++++++++++ .../hive/ql/exec/spark/SortByShuffler.java | 12 +++- .../ql/exec/spark/SparkPlanGenerator.java | 20 ++++-- .../org/apache/hadoop/hive/ql/io/HiveKey.java | 4 ++ .../ql/exec/spark/TestHiveSparkClient.java | 17 ++--- .../hive/ql/exec/spark/TestSparkPlan.java | 14 ++-- .../hive/spark/client/TestSparkClient.java | 9 ++- 14 files changed, 208 insertions(+), 42 deletions(-) create mode 100644 kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9f1da60dcf0d..39c77b3fe52c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4239,7 +4239,7 @@ public static enum ConfVars { "If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n" + "TableScan operators at the root of operator tree, instead of parent ReduceSink\n" + "operators of the Join operator."), - SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", false, + SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", true, "If this is set to true, Hive on Spark will register custom serializers for data types\n" + "in shuffle. 
This should result in less shuffled data."), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java index d383873acf53..191d5f5a8a1b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java @@ -33,6 +33,9 @@ import org.junit.Assert; import org.junit.Test; +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; import java.nio.file.Paths; import java.util.List; import java.util.Map; @@ -41,13 +44,10 @@ public class TestSparkStatistics { @Test - public void testSparkStatistics() { + public void testSparkStatistics() throws MalformedURLException { + String confDir = "../../data/conf/spark/standalone/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); HiveConf conf = new HiveConf(); - conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - SQLStdHiveAuthorizerFactory.class.getName()); - conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark"); - conf.set("spark.master", "local-cluster[1,2,1024]"); conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), "TestSparkStatistics-local-dir").toString()); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java index fe8a32f80168..341da33f1a4c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.File; +import java.net.MalformedURLException; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; @@ -66,12 +68,10 @@ public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLExcepti private Connection hs2Conn = null; private Statement stmt; - private static HiveConf createHiveConf() { + private static HiveConf createHiveConf() throws MalformedURLException { + String confDir = "../../data/conf/spark/standalone/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); HiveConf conf = new HiveConf(); - conf.set("hive.execution.engine", "spark"); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.set("spark.master", "local-cluster[2,2,1024]"); - conf.set("hive.spark.client.connect.timeout", "30000ms"); // FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout // while spark2 is still using Hadoop2. 
// Spark requires Hive to support Hadoop3 first then Spark can start diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java index efb37590e6dd..b2ddff7a2e29 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java @@ -18,9 +18,11 @@ package org.apache.hive.jdbc; +import java.io.File; import java.io.IOException; import java.io.StringWriter; import java.io.Writer; +import java.net.MalformedURLException; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; @@ -32,7 +34,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.processors.ErasureProcessor; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim; @@ -63,19 +64,17 @@ public class TestJdbcWithMiniHS2ErasureCoding { private static HiveConf conf; private Connection hs2Conn = null; - private static HiveConf createHiveOnSparkConf() { + private static HiveConf createHiveOnSparkConf() throws MalformedURLException { + String confDir = "../../data/conf/spark/standalone/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); HiveConf hiveConf = new HiveConf(); // Tell dfs not to consider load when choosing a datanode as this can cause failure as // in a test we do not have spare datanode capacity. hiveConf.setBoolean("dfs.namenode.redundancy.considerLoad", false); - hiveConf.set("hive.execution.engine", "spark"); - hiveConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - hiveConf.set("spark.master", "local-cluster[2,2,1024]"); hiveConf.set("hive.spark.client.connect.timeout", "30000ms"); hiveConf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), "TestJdbcWithMiniHS2ErasureCoding-local-dir") .toString()); - hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); // avoid ZK errors return hiveConf; } diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java index 79d56f5633ce..f7586c108d95 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java @@ -18,6 +18,8 @@ package org.apache.hive.jdbc; +import java.io.File; +import java.net.MalformedURLException; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; @@ -73,14 +75,12 @@ public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLExcepti private ExecutorService pool = null; - private static HiveConf createHiveConf() { + private static HiveConf createHiveConf() throws MalformedURLException { + String confDir = "../../data/conf/spark/standalone/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); HiveConf conf = new HiveConf(); conf.set("hive.exec.parallel", "true"); - conf.set("hive.execution.engine", "spark"); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - 
conf.set("spark.master", "local-cluster[2,2,1024]"); conf.set("spark.deploy.defaultCores", "2"); - conf.set("hive.spark.client.connect.timeout", "30000ms"); // FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout // while spark2 is still using Hadoop2. // Spark requires Hive to support Hadoop3 first then Spark can start diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java index 838ad9998204..001ab8e08635 100644 --- a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java +++ b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java @@ -54,7 +54,7 @@ public HiveKey read(Kryo kryo, Input input, Class type) { } } - private static class BytesWritableSerializer extends Serializer { + static class BytesWritableSerializer extends Serializer { public void write(Kryo kryo, Output output, BytesWritable object) { output.writeVarInt(object.getLength(), true); diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java new file mode 100644 index 000000000000..d4bcc5bdc813 --- /dev/null +++ b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.spark; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.io.BytesWritable; +import org.apache.spark.SparkConf; +import org.apache.spark.serializer.KryoSerializer; + + +/** + * A {@link KryoSerializer} that does not serialize hash codes while serializing a + * {@link HiveKey}. This decreases the amount of data to be shuffled during a Spark shuffle. + */ +public class NoHashCodeKryoSerializer extends KryoSerializer { + + private static final long serialVersionUID = 3350910170041648022L; + + public NoHashCodeKryoSerializer(SparkConf conf) { + super(conf); + } + + @Override + public Kryo newKryo() { + Kryo kryo = super.newKryo(); + kryo.register(HiveKey.class, new HiveKeySerializer()); + kryo.register(BytesWritable.class, new HiveKryoRegistrator.BytesWritableSerializer()); + return kryo; + } + + private static class HiveKeySerializer extends Serializer { + + public void write(Kryo kryo, Output output, HiveKey object) { + output.writeVarInt(object.getLength(), true); + output.write(object.getBytes(), 0, object.getLength()); + } + + public HiveKey read(Kryo kryo, Input input, Class type) { + int len = input.readVarInt(true); + byte[] bytes = new byte[len]; + input.readBytes(bytes); + return new HiveKey(bytes); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java new file mode 100644 index 000000000000..47a0f77c98b4 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.spark.client.SparkClientUtilities; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.FileNotFoundException; +import java.lang.reflect.InvocationTargetException; +import java.net.MalformedURLException; + + +final class ShuffleKryoSerializer { + + private static final String HIVE_SHUFFLE_KRYO_SERIALIZER = "org.apache.hive.spark.NoHashCodeKryoSerializer"; + + private static org.apache.spark.serializer.KryoSerializer INSTANCE; + + private ShuffleKryoSerializer() { + // Don't create me + } + + static org.apache.spark.serializer.KryoSerializer getInstance(JavaSparkContext sc, + Configuration conf) { + if (INSTANCE == null) { + synchronized (ShuffleKryoSerializer.class) { + if (INSTANCE == null) { + try { + INSTANCE = (org.apache.spark.serializer.KryoSerializer) Thread.currentThread().getContextClassLoader().loadClass( + HIVE_SHUFFLE_KRYO_SERIALIZER).getConstructor(SparkConf.class).newInstance( + sc.getConf()); + return INSTANCE; + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) { + throw new IllegalStateException( + "Unable to create kryo serializer for shuffle RDDs using " + + "class " + HIVE_SHUFFLE_KRYO_SERIALIZER, e); + } + } else { + return INSTANCE; + } + } + } + return INSTANCE; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java index 22b598f0b45f..1bf5a562c441 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java @@ -23,19 +23,24 @@ import org.apache.spark.HashPartitioner; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.rdd.ShuffledRDD; +import org.apache.spark.serializer.KryoSerializer; import org.apache.spark.storage.StorageLevel; + public class SortByShuffler implements SparkShuffler { private final boolean totalOrder; private final SparkPlan sparkPlan; + private final KryoSerializer shuffleSerializer; /** * @param totalOrder whether this shuffler provides total order shuffle. 
*/ - public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) { + public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan, KryoSerializer shuffleSerializer) { this.totalOrder = totalOrder; this.sparkPlan = sparkPlan; + this.shuffleSerializer = shuffleSerializer; } @Override @@ -56,6 +61,11 @@ public JavaPairRDD shuffle( Partitioner partitioner = new HashPartitioner(numPartitions); rdd = input.repartitionAndSortWithinPartitions(partitioner); } + if (shuffleSerializer != null) { + if (rdd.rdd() instanceof ShuffledRDD) { + ((ShuffledRDD) rdd.rdd()).setSerializer(shuffleSerializer); + } + } return rdd; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 001d0b0518cc..806deb5f3193 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -18,13 +18,19 @@ package org.apache.hadoop.hive.ql.exec.spark; +import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.MalformedURLException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.hive.spark.client.SparkClientUtilities; +import org.apache.spark.SparkConf; import org.apache.spark.util.CallSite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,10 +74,11 @@ @SuppressWarnings("rawtypes") public class SparkPlanGenerator { + private static final String CLASS_NAME = SparkPlanGenerator.class.getName(); - private final PerfLogger perfLogger = SessionState.getPerfLogger(); private static final Logger LOG = LoggerFactory.getLogger(SparkPlanGenerator.class); + private final PerfLogger perfLogger = SessionState.getPerfLogger(); private final JavaSparkContext sc; private final JobConf jobConf; private final Context context; @@ -82,6 +89,7 @@ public class SparkPlanGenerator { private final Map workToParentWorkTranMap; // a map from each BaseWork to its cloned JobConf private final Map workToJobConf; + private final org.apache.spark.serializer.KryoSerializer shuffleSerializer; public SparkPlanGenerator( JavaSparkContext sc, @@ -98,6 +106,11 @@ public SparkPlanGenerator( this.workToParentWorkTranMap = new HashMap(); this.sparkReporter = sparkReporter; this.workToJobConf = new HashMap(); + if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE)) { + this.shuffleSerializer = ShuffleKryoSerializer.getInstance(sc, jobConf); + } else { + this.shuffleSerializer = null; + } } public SparkPlan generate(SparkWork sparkWork) throws Exception { @@ -251,9 +264,9 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea "AssertionError: SHUFFLE_NONE should only be used for UnionWork."); SparkShuffler shuffler; if (edge.isMRShuffle()) { - shuffler = new SortByShuffler(false, sparkPlan); + shuffler = new SortByShuffler(false, sparkPlan, shuffleSerializer); } else if (edge.isShuffleSort()) { - shuffler = new SortByShuffler(true, sparkPlan); + shuffler = new SortByShuffler(true, sparkPlan, shuffleSerializer); } else { shuffler = new GroupByShuffler(); } @@ -398,5 +411,4 @@ private void initStatsPublisher(BaseWork work) throws HiveException { } } } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java index 
a1f944621b24..b6f78d8abc4a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java @@ -38,6 +38,10 @@ public HiveKey() { hashCodeValid = false; } + public HiveKey(byte[] bytes) { + super(bytes); + } + public HiveKey(byte[] bytes, int hashcode) { super(bytes); hashCode = hashcode; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java index b960508fc674..f42cffd8b2cf 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java @@ -37,6 +37,7 @@ import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.nio.file.Paths; import java.util.List; @@ -48,19 +49,19 @@ public class TestHiveSparkClient { @Test public void testSetJobGroupAndDescription() throws Exception { - + String confDir = "../data/conf/spark/local/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); HiveConf conf = new HiveConf(); - conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - SQLStdHiveAuthorizerFactory.class.getName()); - conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark"); - conf.set("spark.master", "local"); + + // Set to false because we don't launch a job using LocalHiveSparkClient so the + // hive-kryo-registrator jar is never added to the classpath + conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false); conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), "TestHiveSparkClient-local-dir").toString()); SessionState.start(conf); FileSystem fs = FileSystem.getLocal(conf); - Path tmpDir = new Path("TestSparkPlan-tmp"); + Path tmpDir = new Path("TestHiveSparkClient-tmp"); IDriver driver = null; JavaSparkContext sc = null; @@ -81,7 +82,7 @@ public void testSetJobGroupAndDescription() throws Exception { SparkConf sparkConf = new SparkConf(); sparkConf.setMaster("local"); - sparkConf.setAppName("TestSparkPlan-app"); + sparkConf.setAppName("TestHiveSparkClient-app"); sc = new JavaSparkContext(sparkConf); byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java index 5f47bb4276ab..ef02a292b622 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java @@ -46,6 +46,8 @@ import scala.Tuple2; import scala.collection.JavaConversions; +import java.io.File; +import java.nio.file.Paths; import java.util.List; @@ -53,11 +55,15 @@ public class TestSparkPlan { @Test public void testSetRDDCallSite() throws Exception { + String confDir = "../data/conf/spark/local/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); HiveConf conf = new HiveConf(); - conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - SQLStdHiveAuthorizerFactory.class.getName()); - conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark"); + + // Set to false because we don't launch a job using LocalHiveSparkClient so the + // hive-kryo-registrator jar is never added to the classpath + conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false); + 
conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), + "TestSparkPlan-local-dir").toString()); FileSystem fs = FileSystem.getLocal(conf); Path tmpDir = new Path("TestSparkPlan-tmp"); diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java index 681463e4056d..d7380038fa4a 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java @@ -35,6 +35,7 @@ import java.io.FileOutputStream; import java.io.InputStream; import java.io.Serializable; +import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Paths; import java.util.Arrays; @@ -70,7 +71,13 @@ public class TestSparkClient { private static final HiveConf HIVECONF = new HiveConf(); static { - HIVECONF.set("hive.spark.client.connect.timeout", "30000ms"); + String confDir = "../data/conf/spark/standalone/hive-site.xml"; + try { + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + HIVECONF.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true); HIVECONF.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT); }
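
A note on the mechanics of the change: HiveKey caches a hash code so the partitioner can place each record without recomputing it, but once repartitionAndSortWithinPartitions has chosen a partition on the map side, that hash code is dead weight in the shuffle files. The snippet below is a minimal, self-contained sketch and not part of the patch: it assumes only Kryo's Output class on the classpath, the class name HiveKeySizeSketch is illustrative, and the trailing-varint encoding of the hash code in the existing HiveKryoRegistrator serializer is an assumption. It mimics the two write paths to show the per-record saving that NoHashCodeKryoSerializer provides.

import com.esotericsoftware.kryo.io.Output;

public class HiveKeySizeSketch {
  public static void main(String[] args) {
    byte[] key = "a-typical-shuffle-key".getBytes();
    int hashCode = 123456789;                  // stand-in for HiveKey's cached hash code

    // Old shuffle path: key length, key bytes, then the hash code.
    Output withHash = new Output(128);
    withHash.writeVarInt(key.length, true);
    withHash.write(key, 0, key.length);
    withHash.writeVarInt(hashCode, false);     // assumed encoding of the trailing hash code

    // New shuffle path (NoHashCodeKryoSerializer.HiveKeySerializer): length and bytes only.
    Output withoutHash = new Output(128);
    withoutHash.writeVarInt(key.length, true);
    withoutHash.write(key, 0, key.length);

    System.out.println("with hash code:    " + withHash.position() + " bytes");
    System.out.println("without hash code: " + withoutHash.position() + " bytes");
  }
}

The read side does not miss the hash code either: the new HiveKey(byte[]) constructor added by this patch leaves the cached hash code unset, which matches the patch's premise that nothing on the reduce side needs to re-hash a deserialized key once it has landed in its target partition.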