HIVE-20032: Don't serialize hashCode for repartitionAndSortWithinPartitions (Sahil Takiar, reviewed by Rui Li)
sahilTakiar authored and Sahil Takiar committed Jul 27, 2018
1 parent 94ec368 commit 1e437e2
Showing 14 changed files with 208 additions and 42 deletions.
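In short: the diff below flips the default of hive.spark.optimize.shuffle.serde to true and, for Hive-on-Spark shuffles implemented via repartitionAndSortWithinPartitions, installs a Kryo serializer (NoHashCodeKryoSerializer) that omits the HiveKey hash code, so less data is written during the shuffle.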
2 changes: 1 addition & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4239,7 +4239,7 @@ public static enum ConfVars {
"If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n" +
"TableScan operators at the root of operator tree, instead of parent ReduceSink\n" +
"operators of the Join operator."),
SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", false,
SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", true,
"If this is set to true, Hive on Spark will register custom serializers for data types\n" +
"in shuffle. This should result in less shuffled data."),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
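A minimal sketch (not part of the commit) of toggling the flag programmatically; it assumes a standard HiveConf on the classpath and uses only the enum constant shown in the hunk above:

import org.apache.hadoop.hive.conf.HiveConf;

public class ShuffleSerdeToggle {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // As of this commit the flag defaults to true; a session that hits problems with the
    // custom shuffle serializer could presumably opt out like this.
    conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
    System.out.println("shuffle serde optimization enabled: "
        + conf.getBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE));
  }
}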
@@ -33,6 +33,9 @@
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
@@ -41,13 +44,10 @@
public class TestSparkStatistics {

@Test
public void testSparkStatistics() {
public void testSparkStatistics() throws MalformedURLException {
String confDir = "../../data/conf/spark/standalone/hive-site.xml";
HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
SQLStdHiveAuthorizerFactory.class.getName());
conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
conf.set("spark.master", "local-cluster[1,2,1024]");
conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
"TestSparkStatistics-local-dir").toString());

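Note: this test (and the other test changes below) now builds its HiveConf from the shared data/conf/spark/standalone/hive-site.xml via HiveConf.setHiveSiteLocation, which is presumably why the inline hive.execution.engine, spark.serializer, and related settings are dropped from the individual tests.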
@@ -22,6 +22,8 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -66,12 +68,10 @@ public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLExcepti
private Connection hs2Conn = null;
private Statement stmt;

private static HiveConf createHiveConf() {
private static HiveConf createHiveConf() throws MalformedURLException {
String confDir = "../../data/conf/spark/standalone/hive-site.xml";
HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf conf = new HiveConf();
conf.set("hive.execution.engine", "spark");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.master", "local-cluster[2,2,1024]");
conf.set("hive.spark.client.connect.timeout", "30000ms");
// FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout
// while spark2 is still using Hadoop2.
// Spark requires Hive to support Hadoop3 first then Spark can start
@@ -18,9 +18,11 @@

package org.apache.hive.jdbc;

import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -32,7 +34,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.processors.ErasureProcessor;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim;
@@ -63,19 +64,17 @@ public class TestJdbcWithMiniHS2ErasureCoding {
private static HiveConf conf;
private Connection hs2Conn = null;

private static HiveConf createHiveOnSparkConf() {
private static HiveConf createHiveOnSparkConf() throws MalformedURLException {
String confDir = "../../data/conf/spark/standalone/hive-site.xml";
HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf hiveConf = new HiveConf();
// Tell dfs not to consider load when choosing a datanode as this can cause failure as
// in a test we do not have spare datanode capacity.
hiveConf.setBoolean("dfs.namenode.redundancy.considerLoad", false);
hiveConf.set("hive.execution.engine", "spark");
hiveConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
hiveConf.set("spark.master", "local-cluster[2,2,1024]");
hiveConf.set("hive.spark.client.connect.timeout", "30000ms");
hiveConf.set("spark.local.dir",
Paths.get(System.getProperty("test.tmp.dir"), "TestJdbcWithMiniHS2ErasureCoding-local-dir")
.toString());
hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); // avoid ZK errors
return hiveConf;
}

@@ -18,6 +18,8 @@

package org.apache.hive.jdbc;

import java.io.File;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -73,14 +75,12 @@ public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLExcepti
private ExecutorService pool = null;


private static HiveConf createHiveConf() {
private static HiveConf createHiveConf() throws MalformedURLException {
String confDir = "../../data/conf/spark/standalone/hive-site.xml";
HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf conf = new HiveConf();
conf.set("hive.exec.parallel", "true");
conf.set("hive.execution.engine", "spark");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.master", "local-cluster[2,2,1024]");
conf.set("spark.deploy.defaultCores", "2");
conf.set("hive.spark.client.connect.timeout", "30000ms");
// FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout
// while spark2 is still using Hadoop2.
// Spark requires Hive to support Hadoop3 first then Spark can start
@@ -54,7 +54,7 @@ public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
}
}

private static class BytesWritableSerializer extends Serializer<BytesWritable> {
static class BytesWritableSerializer extends Serializer<BytesWritable> {

public void write(Kryo kryo, Output output, BytesWritable object) {
output.writeVarInt(object.getLength(), true);
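Note: BytesWritableSerializer is widened here from private to package-private, presumably so the new NoHashCodeKryoSerializer below, which sits in the same org.apache.hive.spark package, can reuse it in its newKryo() registration.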
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.spark;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;


/**
* A {@link KryoSerializer} that does not serialize hash codes while serializing a
* {@link HiveKey}. This decreases the amount of data to be shuffled during a Spark shuffle.
*/
public class NoHashCodeKryoSerializer extends KryoSerializer {

private static final long serialVersionUID = 3350910170041648022L;

public NoHashCodeKryoSerializer(SparkConf conf) {
super(conf);
}

@Override
public Kryo newKryo() {
Kryo kryo = super.newKryo();
kryo.register(HiveKey.class, new HiveKeySerializer());
kryo.register(BytesWritable.class, new HiveKryoRegistrator.BytesWritableSerializer());
return kryo;
}

private static class HiveKeySerializer extends Serializer<HiveKey> {

public void write(Kryo kryo, Output output, HiveKey object) {
output.writeVarInt(object.getLength(), true);
output.write(object.getBytes(), 0, object.getLength());
}

public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
int len = input.readVarInt(true);
byte[] bytes = new byte[len];
input.readBytes(bytes);
return new HiveKey(bytes);
}
}
}
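A rough sketch (not part of the commit) of what the new serializer leaves out on the wire; it assumes Kryo and Spark are on the classpath and uses only classes and constructors visible in this diff, with an illustrative key value:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hive.spark.NoHashCodeKryoSerializer;
import org.apache.spark.SparkConf;

public class HiveKeyWireSizeSketch {
  public static void main(String[] args) {
    byte[] rowKey = "some-row-key".getBytes();  // 12 bytes
    HiveKey key = new HiveKey(rowKey, 42);      // 42 is an arbitrary hash code; this serializer simply never writes it

    // Kryo configured by NoHashCodeKryoSerializer writes only a varint length plus the raw bytes.
    Kryo kryo = new NoHashCodeKryoSerializer(new SparkConf()).newKryo();
    Output out = new Output(64, -1);
    kryo.writeObject(out, key);
    out.flush();

    // Roughly 13 bytes here (1-byte varint + 12 key bytes); the hash-code-aware serializer in
    // HiveKryoRegistrator would additionally write the int hash code for every key.
    System.out.println("serialized HiveKey size without hash code: " + out.total());
  }
}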
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.FileNotFoundException;
import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;


final class ShuffleKryoSerializer {

private static final String HIVE_SHUFFLE_KRYO_SERIALIZER = "org.apache.hive.spark.NoHashCodeKryoSerializer";

private static org.apache.spark.serializer.KryoSerializer INSTANCE;

private ShuffleKryoSerializer() {
// Don't create me
}

static org.apache.spark.serializer.KryoSerializer getInstance(JavaSparkContext sc,
Configuration conf) {
if (INSTANCE == null) {
synchronized (ShuffleKryoSerializer.class) {
if (INSTANCE == null) {
try {
INSTANCE = (org.apache.spark.serializer.KryoSerializer) Thread.currentThread().getContextClassLoader().loadClass(
HIVE_SHUFFLE_KRYO_SERIALIZER).getConstructor(SparkConf.class).newInstance(
sc.getConf());
return INSTANCE;
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
throw new IllegalStateException(
"Unable to create kryo serializer for shuffle RDDs using " +
"class " + HIVE_SHUFFLE_KRYO_SERIALIZER, e);
}
} else {
return INSTANCE;
}
}
}
return INSTANCE;
}
}
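The serializer class is loaded reflectively by name, presumably because NoHashCodeKryoSerializer lives in a separate module that is only guaranteed to be present on the runtime classpath. A hypothetical usage sketch (not part of the commit; it must sit in the same package because getInstance is package-private, and it assumes the org.apache.hive.spark classes are on the classpath):

package org.apache.hadoop.hive.ql.exec.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;

public class ShuffleSerializerDemo {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setMaster("local[1]").setAppName("shuffle-serde-demo"));
    Configuration conf = new Configuration();

    // First call lazily creates the process-wide singleton; later calls return the same object.
    KryoSerializer first = ShuffleKryoSerializer.getInstance(sc, conf);
    KryoSerializer second = ShuffleKryoSerializer.getInstance(sc, conf);
    System.out.println("same instance: " + (first == second));

    sc.stop();
  }
}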
@@ -23,19 +23,24 @@
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.rdd.ShuffledRDD;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;


public class SortByShuffler implements SparkShuffler<BytesWritable> {

private final boolean totalOrder;
private final SparkPlan sparkPlan;
private final KryoSerializer shuffleSerializer;

/**
* @param totalOrder whether this shuffler provides total order shuffle.
*/
public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan, KryoSerializer shuffleSerializer) {
this.totalOrder = totalOrder;
this.sparkPlan = sparkPlan;
this.shuffleSerializer = shuffleSerializer;
}

@Override
@@ -56,6 +61,11 @@ public JavaPairRDD<HiveKey, BytesWritable> shuffle(
Partitioner partitioner = new HashPartitioner(numPartitions);
rdd = input.repartitionAndSortWithinPartitions(partitioner);
}
if (shuffleSerializer != null) {
if (rdd.rdd() instanceof ShuffledRDD) {
((ShuffledRDD) rdd.rdd()).setSerializer(shuffleSerializer);
}
}
return rdd;
}

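Note: the serializer is only swapped in when repartitionAndSortWithinPartitions actually yields a ShuffledRDD. By that point the partitioner has already decided each record's target partition on the map side, which is presumably why the HiveKey hash code no longer needs to travel with the serialized bytes.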
@@ -18,13 +18,19 @@

package org.apache.hadoop.hive.ql.exec.spark;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.spark.SparkConf;
import org.apache.spark.util.CallSite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,10 +74,11 @@

@SuppressWarnings("rawtypes")
public class SparkPlanGenerator {

private static final String CLASS_NAME = SparkPlanGenerator.class.getName();
private final PerfLogger perfLogger = SessionState.getPerfLogger();
private static final Logger LOG = LoggerFactory.getLogger(SparkPlanGenerator.class);

private final PerfLogger perfLogger = SessionState.getPerfLogger();
private final JavaSparkContext sc;
private final JobConf jobConf;
private final Context context;
@@ -82,6 +89,7 @@ public class SparkPlanGenerator {
private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
// a map from each BaseWork to its cloned JobConf
private final Map<BaseWork, JobConf> workToJobConf;
private final org.apache.spark.serializer.KryoSerializer shuffleSerializer;

public SparkPlanGenerator(
JavaSparkContext sc,
@@ -98,6 +106,11 @@ public SparkPlanGenerator(
this.workToParentWorkTranMap = new HashMap<BaseWork, SparkTran>();
this.sparkReporter = sparkReporter;
this.workToJobConf = new HashMap<BaseWork, JobConf>();
if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE)) {
this.shuffleSerializer = ShuffleKryoSerializer.getInstance(sc, jobConf);
} else {
this.shuffleSerializer = null;
}
}

public SparkPlan generate(SparkWork sparkWork) throws Exception {
@@ -251,9 +264,9 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea
"AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
SparkShuffler shuffler;
if (edge.isMRShuffle()) {
shuffler = new SortByShuffler(false, sparkPlan);
shuffler = new SortByShuffler(false, sparkPlan, shuffleSerializer);
} else if (edge.isShuffleSort()) {
shuffler = new SortByShuffler(true, sparkPlan);
shuffler = new SortByShuffler(true, sparkPlan, shuffleSerializer);
} else {
shuffler = new GroupByShuffler();
}
@@ -398,5 +411,4 @@ private void initStatsPublisher(BaseWork work) throws HiveException {
}
}
}

}
4 changes: 4 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
@@ -38,6 +38,10 @@ public HiveKey() {
hashCodeValid = false;
}

public HiveKey(byte[] bytes) {
super(bytes);
}

public HiveKey(byte[] bytes, int hashcode) {
super(bytes);
hashCode = hashcode;