Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FITS DatasourceV2 #89

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Dependencies._
import xerial.sbt.Sonatype._

lazy val root = (project in file(".")).
settings(
inThisBuild(List(
Expand Down Expand Up @@ -44,10 +41,12 @@ lazy val root = (project in file(".")).
// assemblyShadeRules in assembly := Seq(ShadeRule.rename("nom.**" -> "new_nom.@1").inAll),
// Put dependencies of the library
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.3" % "provided",
"org.apache.spark" %% "spark-sql" % "2.4.3" % "provided",
scalaTest % Test
)
"org.apache.spark" %% "spark-core" % "3.0.0-preview" % "provided",
"org.apache.spark" %% "spark-sql" % "3.0.0-preview" % "provided",
"org.scalatest" %% "scalatest" % "3.0.1" % Test
),

scalaVersion := "2.12.1"
)

// POM settings for Sonatype
Expand All @@ -67,7 +66,13 @@ developers := List(
"Julien Peloton",
"[email protected]",
url("https://github.com/JulienPeloton")
)
),
Developer(
"MayurBhosale",
"Mayur Bhosale",
"[email protected]",
url("https://github.com/mayurdb")
)
)

licenses := Seq("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
com.astrolabsoftware.sparkfits.v2.FitsDataSourceV2
com.astrolabsoftware.sparkfits.DefaultSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2019 AstroLab Software
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.astrolabsoftware.sparkfits.utils

import com.astrolabsoftware.sparkfits.FitsLib
import com.astrolabsoftware.sparkfits.FitsLib.Fits
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.PartitionedFile

import scala.util.Try

/**
  * Per-file metadata holder used by the V2 datasource to describe one FITS
  * partition: it opens the file, parses the targeted HDU header, validates
  * that the HDU is non-empty, computes the record length used for reads, and
  * positions the underlying stream at the start of the data block.
  *
  * @param partitionedFile The Spark [[PartitionedFile]] wrapping the FITS file path.
  * @param index Index of this file within the overall list of partitions.
  * @param conf Hadoop configuration; read for "hdu", "mode" and "recordlength".
  */
class FitsMetadata(partitionedFile: PartitionedFile, val index: Int, conf: Configuration) {

val log = LogManager.getRootLogger
val path = new Path(partitionedFile.filePath)
// Open the file and target the HDU requested in the configuration (-1 if unset).
private[sparkfits] val fits = new Fits(path, conf, conf.getInt("hdu", -1))
// Byte boundaries (start/stop) of the targeted HDU block inside the file.
private[sparkfits] val startStop = fits.blockBoundaries
private val header = fits.blockHeader
// Flipped to true when the HDU is empty and the mode is PERMISSIVE, so the
// caller can skip this file instead of failing.
private[sparkfits] var notValid = false
val keyValues = FitsLib.parseHeader(header)
// NAXIS == 0 means the HDU carries no data. PERMISSIVE discards it with a
// warning; FAILFAST only warns here (the actual failure is raised downstream);
// any other mode is silently ignored.
if (keyValues("NAXIS").toInt == 0) {
conf.get("mode") match {
case "PERMISSIVE" =>
log.warn(s"Empty HDU for ${path}")
notValid = true
case "FAILFAST" =>
log.warn(s"Empty HDU for ${path}")
log.warn(s"Use option('mode', 'PERMISSIVE') if you want to discard all empty HDUs.")
case _ =>
}
}

// Length in bytes of one read record; stays 0 for invalid (empty) HDUs.
private var recordLength: Long = 0L
// Size of a single row in bytes, kept both as Int and Long for convenience.
var rowSizeInt: Int = 0
var rowSizeLong: Long = 0L

if (!notValid) {

val nrowsLong = fits.hdu.getNRows(keyValues)
// NOTE(review): rowSizeInt is used as a divisor below; assumes the HDU
// reports a strictly positive row size — TODO confirm for all HDU types.
rowSizeInt = fits.hdu.getSizeRowBytes(keyValues)
rowSizeLong = rowSizeInt.toLong


// Get the record length in Bytes (get integer!). First look if the user
// specify a size for the recordLength. If not, set it to max(1 Ko, rowSize).
// If the HDU is an image, the recordLength is the row size (NAXIS1 * nbytes)
val recordLengthFromUser = Try {
conf.get("recordlength").toInt
}
.getOrElse {
if (fits.hduType == "IMAGE") {
rowSizeInt
} else {
// set it to max(1 Ko, rowSize)
math.max((1 * 1024 / rowSizeInt) * rowSizeInt, rowSizeInt)
}
}


// For Table, seek for a round number of lines for the record
// ToDo: Cases when the user has given the record length. Currently, this
// recordLength is not getting used.
recordLength = (recordLengthFromUser / rowSizeInt) * rowSizeInt

// Make sure that the recordLength is not bigger than the block size!
// This is a guard for small files.
recordLength = if ((recordLength / rowSizeInt) < nrowsLong.toInt) {
// OK less than the total number of lines
recordLength
} else {
// Small files, one record is the entire file.
// NOTE(review): nrowsLong.toInt and rowSizeLong.toInt can overflow for
// files with more than Int.MaxValue rows/bytes — verify upstream limits.
nrowsLong.toInt * rowSizeLong.toInt
}
// Move to the starting binary index
fits.data.seek(startStop.dataStart)
}
}
159 changes: 159 additions & 0 deletions src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2019 AstroLab Software
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.astrolabsoftware.sparkfits.utils

import com.astrolabsoftware.sparkfits.FitsLib.Fits
import com.astrolabsoftware.sparkfits.FitsSchema.getSchema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator}

import scala.util.Try

object FitsUtils {
  /**
    * Search for input FITS files. The input path can be either a single
    * FITS file, or a folder containing several FITS files with the
    * same HDU structure, or a globbing structure e.g. "toto/\*.fits",
    * or a comma-separated list of paths.
    * Raise a NullPointerException if no files found.
    *
    * @param fn : (String)
    *   Input path.
    * @param conf : (Configuration)
    *   Hadoop configuration used to resolve the file system.
    * @param verbosity : (Boolean)
    *   If true, print the list of files found. Default is false.
    * @return (List[String]) List with all files found.
    *
    */
  def searchFitsFile(fn: String, conf: Configuration, verbosity: Boolean = false): List[String] = {
    // Make it Hadoop readable
    val path = new Path(fn)
    val fs = path.getFileSystem(conf)

    // Check whether we are globbing (pattern matching more than one file).
    val isGlob: Boolean = Try {fs.globStatus(path).size > 1}.getOrElse(false)

    // Comma-separated list of explicit paths.
    val isCommaSep: Boolean = Try {fn.split(",").size > 1}.getOrElse(false)

    // Check whether we want to load a single FITS file or several
    val isDir: Boolean = fs.isDirectory(path)
    val isFile: Boolean = fs.isFile(path)

    // List all the files
    val listOfFitsFiles: List[String] = if (isGlob) {
      fs.globStatus(path).map(_.getPath.toString).toList
    } else if (isDir) {
      // Recurse into the directory; getListOfFiles keeps only ".fits" files
      // (its default extension filter).
      getListOfFiles(fs.listFiles(path, true))
    } else if (isCommaSep) {
      fn.split(",").toList
    } else if (isFile) {
      List(fn)
    } else {
      List[String]()
    }

    // Check that we have at least one file
    if (listOfFitsFiles.isEmpty) {
      throw new NullPointerException(s"""
          0 files detected! Is $fn a directory containing
          FITS files or a FITS file?
          """)
    }
    if (verbosity) {
      println("FitsRelation.searchFitsFile> Found " + listOfFitsFiles.size.toString + " file(s):")
      listOfFitsFiles.foreach(println)
    }

    listOfFitsFiles
  }

  /**
    * Load recursively all FITS files inside a directory.
    *
    * Implemented with an internal tail-recursive loop so that directories
    * containing many files cannot overflow the stack (the naive head-recursive
    * formulation grows one frame per file).
    *
    * @param it : (RemoteIterator[LocatedFileStatus])
    *   Iterator from a Hadoop Path containing information about files.
    * @param extensions : (List[String])
    *   List of accepted extensions. Currently only .fits is available.
    *   Default is List(".fits").
    * @return List of files as a list of String, in iterator order.
    *
    */
  def getListOfFiles(it: RemoteIterator[LocatedFileStatus],
      extensions: List[String] = List(".fits")): List[String] = {
    // Accumulate in reverse (constant-time prepend), restore order at the end.
    def loop(acc: List[String]): List[String] = {
      if (!it.hasNext) {
        acc.reverse
      } else {
        val name = it.next.getPath.toString
        // Keep only files matching one of the accepted extensions.
        if (extensions.exists(ext => name.endsWith(ext))) {
          loop(name :: acc)
        } else {
          loop(acc)
        }
      }
    }
    loop(Nil)
  }

  /**
    * Check that the schemas of different FITS HDU to be added are
    * the same. Throw an AssertionError otherwise.
    * The check is performed only for BINTABLE.
    *
    * NOTE: This operation is very long for many files! Do not use it for
    * hundreds of files!
    *
    * @param listOfFitsFiles : (List[String])
    *   List of files as a list of String. Must be non-empty.
    * @param conf : (Configuration)
    *   Hadoop configuration; must contain the "hdu" index. If "mode" is
    *   PERMISSIVE, the cross-file schema check is skipped.
    * @return (Boolean) true if the HDU type is implemented (data readable),
    *   false otherwise.
    *
    */
  def checkSchemaAndReturnType(listOfFitsFiles: List[String], conf: Configuration): Boolean = {
    // Targeted HDU
    val indexHDU = conf.get("hdu").toInt

    // Initialise with the first file as the reference schema.
    val pathInit = new Path(listOfFitsFiles(0))
    val fitsInit = new Fits(pathInit, conf, indexHDU)

    if (fitsInit.hdu.implemented) {
      // Do not perform checks if the mode is PERMISSIVE.
      if (conf.get("mode") != "PERMISSIVE") {
        val schemaInit = getSchema(fitsInit)
        fitsInit.data.close()
        for (file <- listOfFitsFiles.drop(1)) {
          val fits = new Fits(new Path(file), conf, indexHDU)
          val schema = getSchema(fits)
          // Close before the comparison so the handle is released even when
          // the schema mismatch aborts with an AssertionError.
          fits.data.close()
          if (schemaInit != schema) {
            throw new AssertionError(
              """
              You are trying to add HDU data with different structures!
              Check that the number of columns, names of columns and element
              types are the same. re-run with .option("verbose", true) to
              list all the files.
              """)
          }
        }
      } else {
        // Release the reference handle (previously leaked in PERMISSIVE mode).
        fitsInit.data.close()
      }
      true
    } else {
      println(s"""
          FITS type ${fitsInit.hduType} not supported yet.
          An empty DataFrame will be returned.""")
      false
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2019 AstroLab Software
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.astrolabsoftware.sparkfits.v2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
  * DataSource V2 entry point for FITS files. Registers the short name
  * "fitsv2" and builds a [[FitsTable]] either with a user-supplied schema
  * or with no schema (leaving inference to the table implementation).
  */
class FitsDataSourceV2 extends TableProvider with DataSourceRegister {

  // Bound lazily so the active session is resolved at first use, not at load.
  lazy val sparkSession: SparkSession = SparkSession.active

  // ToDo: Use the name "fits" and still resolve v1 vs v2 somehow
  override def shortName(): String = "fitsv2"

  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table =
    FitsTable(sparkSession, options, Some(schema))

  override def getTable(options: CaseInsensitiveStringMap): Table =
    FitsTable(sparkSession, options, None)
}
Loading