From cace8ffb9583de6ec397bb190c8c6da4af1b8526 Mon Sep 17 00:00:00 2001
From: Emily Curtin
Date: Thu, 8 Feb 2018 11:00:32 -0500
Subject: [PATCH] Bugfix for remote HDFS systems

Bugfix to allow users to provide hdfs hostname in their urls.
---
 .../sparktc/sparkbench/utils/SparkFuncs.scala | 23 +++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala b/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala
index 43d224cf..932216b5 100644
--- a/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala
+++ b/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala
@@ -20,6 +20,7 @@ package com.ibm.sparktc.sparkbench.utils
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import com.databricks.spark.avro._
+import org.apache.hadoop.fs.FileSystem
 
 object SparkFuncs {
 
@@ -62,9 +63,27 @@ object SparkFuncs {
     }
   }
 
+  // Extracts the authority (host or host:port) from an hdfs:// URL, or None
+  // for non-HDFS paths. "[^/]+" matches any hostname up to the first path
+  // separator, so hdfs://namenode.prod:8020/data and hdfs://namenode/data
+  // both resolve; local/relative paths fall through to the default FS.
+  private def extractHDFSuri(str: String): Option[String] = {
+    val regx = """hdfs://([^/]+)""".r.unanchored
+    str match {
+      case regx(uri) => Some(uri)
+      case _ => None
+    }
+  }
+
+  private def getHadoopFS(path: String, spark: SparkSession): FileSystem = {
+    val uriOpt = extractHDFSuri(path)
+    if(uriOpt.nonEmpty){
+      org.apache.hadoop.fs.FileSystem.get(new java.net.URI(s"hdfs://${uriOpt.get}"), new org.apache.hadoop.conf.Configuration())
+    }
+    else {
+      val conf = spark.sparkContext.hadoopConfiguration
+      org.apache.hadoop.fs.FileSystem.get(conf)
+    }
+  }
+
   def pathExists(path: String, spark: SparkSession): Boolean = {
-    val conf = spark.sparkContext.hadoopConfiguration
-    val fs = org.apache.hadoop.fs.FileSystem.get(conf)
+    val fs: FileSystem = getHadoopFS(path, spark)
     fs.exists(new org.apache.hadoop.fs.Path(path))