diff --git a/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala b/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala
index 43d224cf..932216b5 100644
--- a/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala
+++ b/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala
@@ -20,6 +20,7 @@ package com.ibm.sparktc.sparkbench.utils
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import com.databricks.spark.avro._
+import org.apache.hadoop.fs.FileSystem
 
 object SparkFuncs {
 
@@ -62,9 +63,27 @@ object SparkFuncs {
     }
   }
 
+  private def extractHDFSuri(str: String): Option[String] = {
+    val regx = """hdfs://(hdfs:\d+)""".r.unanchored
+    str match {
+      case regx(uri) => Some(uri)
+      case _ => None
+    }
+  }
+
+  private def getHadoopFS(path: String, spark: SparkSession): FileSystem = {
+    val uriOpt = extractHDFSuri(path)
+    if(uriOpt.nonEmpty){
+      org.apache.hadoop.fs.FileSystem.get(new java.net.URI(s"hdfs://${uriOpt.get}"), new org.apache.hadoop.conf.Configuration())
+    }
+    else {
+      val conf = spark.sparkContext.hadoopConfiguration
+      org.apache.hadoop.fs.FileSystem.get(conf)
+    }
+  }
+
   def pathExists(path: String, spark: SparkSession): Boolean = {
-    val conf = spark.sparkContext.hadoopConfiguration
-    val fs = org.apache.hadoop.fs.FileSystem.get(conf)
+    val fs: FileSystem = getHadoopFS(path, spark)
     fs.exists(new org.apache.hadoop.fs.Path(path))
   }
 
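For reference, a minimal standalone sketch of what the new URI extraction matches. This is not part of the patch: `extractHDFSuri` is private, so the regex is duplicated here for illustration, and `ExtractUriSketch` and the sample paths are invented for the example.

```scala
object ExtractUriSketch {
  // Same pattern as extractHDFSuri in the patch: captures "hdfs:<port>"
  // only when the authority is literally the host "hdfs".
  private val regx = """hdfs://(hdfs:\d+)""".r.unanchored

  def extract(str: String): Option[String] = str match {
    case regx(uri) => Some(uri)
    case _         => None
  }

  def main(args: Array[String]): Unit = {
    // Matches: host named "hdfs" with an explicit port.
    println(extract("hdfs://hdfs:9000/tmp/data.parquet"))     // Some(hdfs:9000)
    // No match: any other host name, so getHadoopFS falls back to the
    // FileSystem built from the session's Hadoop configuration.
    println(extract("hdfs://namenode:9000/tmp/data.parquet")) // None
    // No match: local path, same fallback.
    println(extract("/tmp/data.parquet"))                     // None
  }
}
```

Note that the pattern only fires when the authority is literally `hdfs:<port>` (e.g. an HDFS service named `hdfs`); every other path goes through the pre-existing default `FileSystem.get(conf)` branch, so behavior for non-matching paths is unchanged.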