Skip to content

Commit

Permalink
Bugfix for remote HDFS systems
Browse files Browse the repository at this point in the history
Bugfix to allow users to provide the HDFS hostname in their URLs.
  • Loading branch information
ecurtin authored and Emily Curtin committed Feb 8, 2018
1 parent 992a278 commit cace8ff
Showing 1 changed file with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package com.ibm.sparktc.sparkbench.utils
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.databricks.spark.avro._
import org.apache.hadoop.fs.FileSystem

object SparkFuncs {

Expand Down Expand Up @@ -62,9 +63,27 @@ object SparkFuncs {
}
}

/** Extracts the HDFS authority (host:port) from a path string, if present.
  *
  * @param str a path that may carry an `hdfs://` scheme (e.g. "hdfs://namenode:9000/data")
  * @return Some("host:port") when the path targets a remote HDFS, None otherwise
  */
private def extractHDFSuri(str: String): Option[String] = {
  // Match any hostname, not only the literal "hdfs" — the original pattern
  // `hdfs://(hdfs:\d+)` defeated the point of letting users supply their own
  // HDFS hostname in the URL.
  val regx = """hdfs://([^/:\s]+:\d+)""".r.unanchored
  str match {
    case regx(uri) => Some(uri)
    case _ => None
  }
}

/** Resolves the Hadoop FileSystem appropriate for the given path.
  *
  * Paths embedding a remote HDFS authority (hdfs://host:port/...) get a
  * FileSystem bound to that exact URI; all other paths fall back to the
  * FileSystem described by the active SparkSession's Hadoop configuration.
  *
  * @param path  input/output location, possibly with an hdfs:// scheme
  * @param spark the active session, used for the default Hadoop configuration
  */
private def getHadoopFS(path: String, spark: SparkSession): FileSystem = {
  // Pattern match on the Option instead of nonEmpty/.get — same behavior,
  // no partial accessor.
  extractHDFSuri(path) match {
    case Some(uri) =>
      // Remote cluster: bind directly to the user-supplied authority.
      FileSystem.get(new java.net.URI(s"hdfs://$uri"), new org.apache.hadoop.conf.Configuration())
    case None =>
      // Default cluster: reuse Spark's own Hadoop configuration.
      FileSystem.get(spark.sparkContext.hadoopConfiguration)
  }
}

/** Checks whether the given path exists, on the default FileSystem or a
  * remote HDFS cluster named in the path itself.
  *
  * @param path  location to probe (local, default HDFS, or hdfs://host:port/...)
  * @param spark the active session, used to resolve the FileSystem
  * @return true when the path exists on the resolved FileSystem
  */
def pathExists(path: String, spark: SparkSession): Boolean = {
  // Delegate FileSystem selection so remote-HDFS paths are probed on the
  // cluster they name, not on the session default.
  val fs: FileSystem = getHadoopFS(path, spark)
  fs.exists(new org.apache.hadoop.fs.Path(path))
}

Expand Down

0 comments on commit cace8ff

Please sign in to comment.