From d088edcb8699c1d37f266c3c9e00d85aa5349e6a Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Sun, 1 Dec 2019 22:20:46 +0530 Subject: [PATCH 01/16] Initial commit --- build.sbt | 8 ++-- ...pache.spark.sql.sources.DataSourceRegister | 2 +- .../sparkfits/v2/FitsDataSourceV2.scala | 27 ++++++++++++ .../sparkfits/v2/FitsTable.scala | 43 +++++++++++++++++++ 4 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala create mode 100644 src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala diff --git a/build.sbt b/build.sbt index d58c781..79ad47c 100644 --- a/build.sbt +++ b/build.sbt @@ -44,10 +44,12 @@ lazy val root = (project in file(".")). // assemblyShadeRules in assembly := Seq(ShadeRule.rename("nom.**" -> "new_nom.@1").inAll), // Put dependencies of the library libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-core" % "2.4.3" % "provided", - "org.apache.spark" %% "spark-sql" % "2.4.3" % "provided", + "org.apache.spark" %% "spark-core" % "3.0.0-preview" % "provided", + "org.apache.spark" %% "spark-sql" % "3.0.0-preview" % "provided", scalaTest % Test - ) + ), + + scalaVersion := "2.12.1" ) // POM settings for Sonatype diff --git a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index f2436f0..d6e7b29 100644 --- a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1 +1 @@ -com.astrolabsoftware.sparkfits.DefaultSource +com.astrolabsoftware.sparkfits.v2.FitsDataSourceV2 \ No newline at end of file diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala new file mode 100644 index 0000000..5ea4b8c --- /dev/null +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala @@ -0,0 +1,27 @@ +package com.astrolabsoftware.sparkfits.v2 + +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class FitsDataSourceV2 extends FileDataSourceV2 { + + override def shortName() = "fits" + + override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { + val paths = getPaths(options) + val tableName = getTableName(paths) + FitsTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat) + } + + // Still have to figure this out + override def fallbackFileFormat: Class[_ <: FileFormat] = ??? 
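
The DataSourceRegister change above is what makes the new reader reachable by its short name. A minimal usage sketch (not part of the patch; the master, path and HDU index below are placeholders):

    import org.apache.spark.sql.SparkSession

    // Assumes a local SparkSession; format("fits") resolves to FitsDataSourceV2 through the
    // META-INF/services/org.apache.spark.sql.sources.DataSourceRegister entry changed above.
    val spark = SparkSession.builder().master("local[*]").appName("fits-v2-smoke").getOrCreate()
    val df = spark.read
      .format("fits")
      .option("hdu", 1)            // HDU index, same option name as the existing v1 reader
      .load("/path/to/file.fits")  // placeholder path
    df.printSchema()
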
+ + override def getTable(options: CaseInsensitiveStringMap): Table = { + val paths = getPaths(options) + val tableName = getTableName(paths) + FitsTable(tableName, sparkSession, options, paths, None, fallbackFileFormat) + } +} \ No newline at end of file diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala new file mode 100644 index 0000000..992e8a6 --- /dev/null +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -0,0 +1,43 @@ +package com.astrolabsoftware.sparkfits.v2 + +import com.astrolabsoftware.sparkfits.FitsLib.Fits +import com.astrolabsoftware.sparkfits.FitsSchema.getSchema +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.v2.FileTable +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class FitsTable( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat]) + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ??? + + override lazy val schema: StructType = userSpecifiedSchema.getOrElse { + val conf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) + val pathFS = new Path(paths(0)) + val fits = new Fits(pathFS, conf, options.get("hdu").toInt) + // Register header and block boundaries + // in the Hadoop configuration for later re-use + fits.registerHeader + fits.blockBoundaries.register(pathFS, conf) + getSchema(fits) + } + + override def formatName: String = "FITS" + + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = ??? + + // override def name(): String = ??? + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = ??? +} \ No newline at end of file From ab61755be4f3a749571644a7ee27217b37800dc1 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 16 Dec 2019 09:55:15 +0530 Subject: [PATCH 02/16] Added the V2 skeleton. Still a long way to go!!! --- build.sbt | 2 +- .../sparkfits/v2/FitsDataSourceV2.scala | 2 +- .../sparkfits/v2/FitsPartitionReader.scala | 136 ++++++++++++++++++ .../v2/FitsPartitionReaderFactory.scala | 21 +++ .../sparkfits/v2/FitsScan.scala | 23 +++ .../sparkfits/v2/FitsScanBuilder.scala | 17 +++ .../sparkfits/v2/FitsTable.scala | 3 +- 7 files changed, 201 insertions(+), 3 deletions(-) create mode 100644 src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala create mode 100644 src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala create mode 100644 src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala create mode 100644 src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala diff --git a/build.sbt b/build.sbt index 79ad47c..996d0cd 100644 --- a/build.sbt +++ b/build.sbt @@ -46,7 +46,7 @@ lazy val root = (project in file(".")). 
libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "3.0.0-preview" % "provided", "org.apache.spark" %% "spark-sql" % "3.0.0-preview" % "provided", - scalaTest % Test + "org.scalatest" %% "scalatest" % "3.0.1" % Test ), scalaVersion := "2.12.1" diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala index 5ea4b8c..0f78bac 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala @@ -17,7 +17,7 @@ class FitsDataSourceV2 extends FileDataSourceV2 { } // Still have to figure this out - override def fallbackFileFormat: Class[_ <: FileFormat] = ??? + override def fallbackFileFormat: Class[_ <: FileFormat] = null override def getTable(options: CaseInsensitiveStringMap): Table = { val paths = getPaths(options) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala new file mode 100644 index 0000000..e32e61e --- /dev/null +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -0,0 +1,136 @@ +package com.astrolabsoftware.sparkfits.v2 + +import com.astrolabsoftware.sparkfits.FitsLib +import com.astrolabsoftware.sparkfits.FitsLib.Fits +import org.apache.hadoop.io.LongWritable +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.PartitionReader + +class FitsPartitionReader[T] extends PartitionReader[T] { + + // Initialise mutable variables to be used by the executors + // Handle the HDFS block boundaries + private var splitStart: Long = 0L + private var splitEnd: Long = 0L + + // Cursor position when reading the file + private var currentPosition: Long = 0L + + // Size of the records to read from the file + private var recordLength: Int = 0 + + // Object to manipulate the fits file + private var fits: Fits = null + private var header: Array[String] = null + private var nrowsLong : Long = 0L + private var rowSizeInt : Int = 0 + private var rowSizeLong : Long = 0L + private var startstop: FitsLib.FitsBlockBoundaries = FitsLib.FitsBlockBoundaries() + private var notValid : Boolean = false + + // The (key, value) used to create the RDD + private var recordKey: LongWritable = null + private var recordValue: InternalRow = null + + // Intermediate variable to store binary data + private var recordValueBytes: Array[Byte] = null + + override def next(): Boolean = { + // Close the file if mapper is outside the HDU + if (notValid) { + fits.data.close() + return false + } + + // Close the file if we went outside the block! + // This means we sent all our records. + if (fits.data.getPos >= startstop.dataStop) { + fits.data.close() + return false + } + + // Initialise the key of the HDFS block + if (recordKey == null) { + recordKey = new LongWritable() + } + + // the key is a linear index of the record, given by the + // position the record starts divided by the record length + recordKey.set(currentPosition / recordLength) + + // The last record might not be of the same size as the other. + // So if recordLength goes above the end of the data block, cut it. 
+ + // If (getPos + recordLength) goes above splitEnd + recordLength = if ((startstop.dataStop - fits.data.getPos) < recordLength.toLong) { + (startstop.dataStop - fits.data.getPos).toInt + } else { + recordLength + } + + // If (currentPosition + recordLength) goes above splitEnd + recordLength = if ((splitEnd - currentPosition) < recordLength.toLong) { + (splitEnd - currentPosition).toInt + } else { + recordLength + } + + // Last record may not end at the end of a row. + // If record length is not a multiple of the row size + // This can only happen if one of the two ifs below have been triggered + // (by default recordLength is a multiple of the row size). + recordLength = if (recordLength % rowSizeLong != 0) { + + // Decrement recordLength until we reach the end of the row n-1. + do { + recordLength = recordLength - 1 + } while (recordLength % rowSizeLong != 0) + + // Return + recordLength + } else recordLength + + // If recordLength is below the size of a row + // skip and leave this row for the next block + if (recordLength < rowSizeLong) { + fits.data.close() + return false + } + + // The array to place the binary data into + recordValueBytes = new Array[Byte](recordLength) + + // read a record if the currentPosition is less than the split end + if (currentPosition < splitEnd) { + // Read a record of length `0 to recordLength - 1` + fits.data.readFully(recordValueBytes, 0, recordLength) + + // Convert each row + // 1 task: 32 MB @ 2s + val tmp = Seq.newBuilder[InternalRow] + for (i <- 0 to recordLength / rowSizeLong.toInt - 1) { + tmp += InternalRow.fromSeq(fits.getRow( + recordValueBytes.slice( + rowSizeInt*i, rowSizeInt*(i+1)))) + } + recordValue = tmp.result + + // update our current position + currentPosition = currentPosition + recordLength + + // we did not reach the end of the split, and we need to send more records + return true + } + + // We reached the end of the split. + // We will now go to another split (if more available) + fits.data.close() + false + } + + override def get(): InternalRow = recordValue + + override def close(): Unit = ??? 
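
The boundary handling above amounts to: cap the record at the HDU data end and at the split end, round down to a whole number of rows, and leave anything smaller than one row for the next split. A standalone sketch of that arithmetic (names are illustrative, not taken from the patch):

    // Illustrative helper mirroring the trimming logic in next().
    // Returns None when fewer than one full row remains in this split.
    def trimRecordLength(requested: Int, pos: Long, dataStop: Long,
                         currentPosition: Long, splitEnd: Long, rowSize: Long): Option[Int] = {
      var len = requested
      if (dataStop - pos < len.toLong) len = (dataStop - pos).toInt                           // clip to the HDU data end
      if (splitEnd - currentPosition < len.toLong) len = (splitEnd - currentPosition).toInt   // clip to the split end
      len -= (len % rowSize).toInt                                                            // round down to a row boundary
      if (len < rowSize) None else Some(len)
    }
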
+ +} diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala new file mode 100644 index 0000000..4ead0bc --- /dev/null +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala @@ -0,0 +1,21 @@ +package com.astrolabsoftware.sparkfits.v2 + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class FitsPartitionReaderFactory( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + schema: StructType + ) extends FilePartitionReaderFactory { + + override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = + new FitsPartitionReader[InternalRow] + +} diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala new file mode 100644 index 0000000..0685296 --- /dev/null +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -0,0 +1,23 @@ +package com.astrolabsoftware.sparkfits.v2 + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class FitsScan( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + schema: StructType + ) extends Scan with Batch { + + override def toBatch: Batch = super.toBatch + + override def readSchema(): StructType = ??? 
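
For orientation, the contract these skeleton classes plug into: for each input partition Spark asks the factory for a PartitionReader and drains it with a next()/get() loop before closing it. A simplified sketch of that consumer side (not the actual Spark internals, only the expected behaviour):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}

    // Simplified stand-in for the loop Spark runs on executors (DataSourceRDD);
    // shown only to make the next()/get()/close() contract explicit.
    def drainPartition(factory: PartitionReaderFactory, partition: InputPartition): Seq[InternalRow] = {
      val reader: PartitionReader[InternalRow] = factory.createReader(partition)
      val rows = Seq.newBuilder[InternalRow]
      try {
        while (reader.next()) {        // advance; false means the partition is exhausted
          rows += reader.get().copy()  // row for the current position (copied, since readers may reuse rows)
        }
      } finally {
        reader.close()                 // release file handles
      }
      rows.result()
    }
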
+ + override def planInputPartitions(): Array[InputPartition] = Array.empty + + override def createReaderFactory(): PartitionReaderFactory = + new FitsPartitionReaderFactory(name, sparkSession, options, schema) +} diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala new file mode 100644 index 0000000..3ed943e --- /dev/null +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala @@ -0,0 +1,17 @@ +package com.astrolabsoftware.sparkfits.v2 + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class FitsScanBuilder( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + schema: StructType + ) extends ScanBuilder { + override def build(): Scan = new FitsScan(name, sparkSession, options, schema) + + +} diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala index 992e8a6..46e7181 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -21,7 +21,8 @@ case class FitsTable( fallbackFileFormat: Class[_ <: FileFormat]) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { - override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ??? + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = + new FitsScanBuilder(name, sparkSession, options, schema) override lazy val schema: StructType = userSpecifiedSchema.getOrElse { val conf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) From 9477838516faf1241742c3d17bdc88ff2a1741eb Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Thu, 19 Dec 2019 23:33:32 +0530 Subject: [PATCH 03/16] Baby steps towards Fits V2 --- .../astrolabsoftware/sparkfits/ReadFitsJ.java | 2 +- .../sparkfits/utils/FiteUtils.scala | 144 ++++++++++++++++++ .../sparkfits/v2/FitsDataSourceV2.scala | 8 +- .../v2/FitsPartitionReaderFactory.scala | 1 - .../sparkfits/v2/FitsScan.scala | 3 +- .../sparkfits/v2/FitsScanBuilder.scala | 4 +- .../sparkfits/v2/FitsTable.scala | 65 ++++++-- 7 files changed, 198 insertions(+), 29 deletions(-) create mode 100644 src/main/scala/com/astrolabsoftware/sparkfits/utils/FiteUtils.scala diff --git a/src/main/java/com/astrolabsoftware/sparkfits/ReadFitsJ.java b/src/main/java/com/astrolabsoftware/sparkfits/ReadFitsJ.java index 47f8144..55f34de 100644 --- a/src/main/java/com/astrolabsoftware/sparkfits/ReadFitsJ.java +++ b/src/main/java/com/astrolabsoftware/sparkfits/ReadFitsJ.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.astrolabsoftware.sparkfits; +package com.astrolabsoftware.sparkfits.examples; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions.*; diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FiteUtils.scala b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FiteUtils.scala new file mode 100644 index 0000000..34d8659 --- /dev/null +++ b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FiteUtils.scala @@ -0,0 +1,144 @@ +package com.astrolabsoftware.sparkfits.utils + +import com.astrolabsoftware.sparkfits.FitsLib.Fits +import com.astrolabsoftware.sparkfits.FitsSchema.getSchema +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator} + +import scala.util.Try + +object FiteUtils { + /** + * Search for input FITS files. The input path can be either a single + * FITS file, or a folder containing several FITS files with the + * same HDU structure, or a globbing structure e.g. "toto/\*.fits". + * Raise a NullPointerException if no files found. + * + * @param fn : (String) + * Input path. + * @return (List[String]) List with all files found. + * + */ + def searchFitsFile(fn: String, conf: Configuration, verbosity: Boolean=false): List[String] = { + // Make it Hadoop readable + val path = new Path(fn) + val fs = path.getFileSystem(conf) + + // Check whether we are globbing + val isGlob : Boolean = Try{fs.globStatus(path).size > 1}.getOrElse(false) + + val isCommaSep : Boolean = Try{fn.split(",").size > 1}.getOrElse(false) + + // Check whether we want to load a single FITS file or several + val isDir : Boolean = fs.isDirectory(path) + val isFile : Boolean = fs.isFile(path) + + // println(s"isDir=$isDir isFile=$isFile path=$path") + + // List all the files + val listOfFitsFiles : List[String] = if (isGlob) { + val arr = fs.globStatus(path) + arr.map(x => x.getPath.toString).toList + } else if (isDir) { + val it = fs.listFiles(path, true) + getListOfFiles(it).filter{file => file.endsWith(".fits")} + } else if (isCommaSep) { + fn.split(",").toList + } else if (isFile){ + List(fn) + } else { + List[String]() + } + + // Check that we have at least one file + listOfFitsFiles.size match { + case x if x > 0 => if (verbosity) { + println("FitsRelation.searchFitsFile> Found " + listOfFitsFiles.size.toString + " file(s):") + listOfFitsFiles.foreach(println) + } + case x if x <= 0 => throw new NullPointerException(s""" + 0 files detected! Is $fn a directory containing + FITS files or a FITS file? + """) + } + + listOfFitsFiles + } + + /** + * Load recursively all FITS file inside a directory. + * + * @param it : (RemoteIterator[LocatedFileStatus]) + * Iterator from a Hadoop Path containing informations about files. + * @param extensions : (List[String) + * List of accepted extensions. Currently only .fits is available. + * Default is List("*.fits"). + * @return List of files as a list of String. + * + */ + def getListOfFiles(it: RemoteIterator[LocatedFileStatus], + extensions: List[String] = List(".fits")): List[String] = { + if (!it.hasNext) { + Nil + } else { + it.next.getPath.toString :: getListOfFiles(it, extensions) + } + } + + /** + * Check that the schemas of different FITS HDU to be added are + * the same. Throw an AssertionError otherwise. + * The check is performed only for BINTABLE. + * + * NOTE: This operation is very long for many files! Do not use it for + * hundreds of files! + * + * @param listOfFitsFiles : (List[String]) + * List of files as a list of String. 
+ * @return (String) the type of HDU: BINTABLE, IMAGE, EMPTY, or + * NOT UNDERSTOOD if not registered. + * + */ + def checkSchemaAndReturnType(listOfFitsFiles : List[String], conf: Configuration): Boolean = { + // Targeted HDU + val indexHDU = conf.get("hdu").toInt + + // Initialise + val path_init = new Path(listOfFitsFiles(0)) + + val fits_init = new Fits(path_init, conf, indexHDU) + + if (fits_init.hdu.implemented) { + // Do not perform checks if the mode is PERMISSIVE. + if (conf.get("mode") != "PERMISSIVE") { + val schema_init = getSchema(fits_init) + fits_init.data.close() + for (file <- listOfFitsFiles.slice(1, listOfFitsFiles.size)) { + var path = new Path(file) + val fits = new Fits(path, conf, indexHDU) + val schema = getSchema(fits) + val isOk = schema_init == schema + isOk match { + case true => isOk + case false => { + throw new AssertionError( + """ + You are trying to add HDU data with different structures! + Check that the number of columns, names of columns and element + types are the same. re-run with .option("verbose", true) to + list all the files. + """) + } + } + fits.data.close() + } + } + true + } else { + println(s""" + FITS type ${fits_init.hduType} not supported yet. + An empty DataFrame will be returned.""") + false + } + } +} diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala index 0f78bac..e358d9a 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala @@ -11,17 +11,13 @@ class FitsDataSourceV2 extends FileDataSourceV2 { override def shortName() = "fits" override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { - val paths = getPaths(options) - val tableName = getTableName(paths) - FitsTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat) + FitsTable(sparkSession, options, Some(schema), fallbackFileFormat) } // Still have to figure this out override def fallbackFileFormat: Class[_ <: FileFormat] = null override def getTable(options: CaseInsensitiveStringMap): Table = { - val paths = getPaths(options) - val tableName = getTableName(paths) - FitsTable(tableName, sparkSession, options, paths, None, fallbackFileFormat) + FitsTable(sparkSession, options, None, fallbackFileFormat) } } \ No newline at end of file diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala index 4ead0bc..b5f4b0c 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala @@ -9,7 +9,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap class FitsPartitionReaderFactory( - name: String, sparkSession: SparkSession, options: CaseInsensitiveStringMap, schema: StructType diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala index 0685296..a19a746 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -6,7 +6,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap class FitsScan( - name: String, 
sparkSession: SparkSession, options: CaseInsensitiveStringMap, schema: StructType @@ -19,5 +18,5 @@ class FitsScan( override def planInputPartitions(): Array[InputPartition] = Array.empty override def createReaderFactory(): PartitionReaderFactory = - new FitsPartitionReaderFactory(name, sparkSession, options, schema) + new FitsPartitionReaderFactory(sparkSession, options, schema) } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala index 3ed943e..5ed351e 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala @@ -6,12 +6,10 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap class FitsScanBuilder( - name: String, sparkSession: SparkSession, options: CaseInsensitiveStringMap, schema: StructType ) extends ScanBuilder { - override def build(): Scan = new FitsScan(name, sparkSession, options, schema) - + override def build(): Scan = new FitsScan(sparkSession, options, schema) } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala index 46e7181..10913e5 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -1,32 +1,67 @@ package com.astrolabsoftware.sparkfits.v2 +import scala.collection.JavaConverters._ import com.astrolabsoftware.sparkfits.FitsLib.Fits +import com.astrolabsoftware.sparkfits.utils.FiteUtils._ import com.astrolabsoftware.sparkfits.FitsSchema.getSchema import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder -import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.execution.datasources.FileFormat -import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap +import scala.util.Try + case class FitsTable( - name: String, sparkSession: SparkSession, options: CaseInsensitiveStringMap, - paths: Seq[String], userSpecifiedSchema: Option[StructType], fallbackFileFormat: Class[_ <: FileFormat]) - extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + extends Table with SupportsRead { override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = - new FitsScanBuilder(name, sparkSession, options, schema) + new FitsScanBuilder(sparkSession, options, schema) + + // Initialise Hadoop configuration + val conf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) + + // This will contain all options use to load the data + private val extraOptions = new scala.collection.mutable.HashMap[String, String] + private val optionsAsScala = options.asScala.toMap + private val listOfFitsFiles = searchFitsFile(optionsAsScala("paths"), conf, verbosity) + + def registerConfigurations: Unit = { + for (keyAndVal <- optionsAsScala) { + conf.set(keyAndVal._1, keyAndVal._2) + extraOptions += (keyAndVal._1 -> keyAndVal._2) + } + if (conf.get("mode") == null) { + 
conf.set("mode", "PERMISSIVE") + extraOptions += ("mode" -> "PERMISSIVE") + } + } + registerConfigurations + val verbosity = Try{extraOptions("verbose")}.getOrElse("false").toBoolean + + override lazy final val schema: StructType = userSpecifiedSchema.getOrElse { - override lazy val schema: StructType = userSpecifiedSchema.getOrElse { - val conf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) - val pathFS = new Path(paths(0)) + // Check that all the files have the same Schema + // in order to perform the union. Return the HDU type. + // NOTE: This operation is very long for hundreds of files! + // NOTE: Limit that to the first 10 files. + // NOTE: Need to be fixed! + val implemented = if (listOfFitsFiles.size < 10) { + checkSchemaAndReturnType(listOfFitsFiles, conf) + } else{ + checkSchemaAndReturnType(listOfFitsFiles.slice(0, 10), conf) + } + +// val conf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) + val pathFS = new Path(listOfFitsFiles(0)) val fits = new Fits(pathFS, conf, options.get("hdu").toInt) // Register header and block boundaries // in the Hadoop configuration for later re-use @@ -35,10 +70,8 @@ case class FitsTable( getSchema(fits) } - override def formatName: String = "FITS" - - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = ??? + // We don't really have the notion of table name FITS. So just returning the location + override def name(): String = options.get("paths") - // override def name(): String = ??? - override def inferSchema(files: Seq[FileStatus]): Option[StructType] = ??? -} \ No newline at end of file + override def capabilities: java.util.Set[TableCapability] = Set(BATCH_READ).asJava +} From 103308342e4f5972e26c955e2c95c7e1edbb0fcf Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Sat, 21 Dec 2019 19:59:18 +0530 Subject: [PATCH 04/16] Added per job level setup; ToDo: Implement per record level code --- .../sparkfits/utils/FitsMetadata.scala | 38 ++++++++++ .../{FiteUtils.scala => FitsUtils.scala} | 2 +- .../sparkfits/v2/FitsPartitionReader.scala | 73 ++++++++++++++++--- .../v2/FitsPartitionReaderFactory.scala | 18 ++--- .../sparkfits/v2/FitsScan.scala | 57 +++++++++++++-- .../sparkfits/v2/FitsScanBuilder.scala | 7 +- .../sparkfits/v2/FitsTable.scala | 4 +- 7 files changed, 168 insertions(+), 31 deletions(-) create mode 100644 src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala rename src/main/scala/com/astrolabsoftware/sparkfits/utils/{FiteUtils.scala => FitsUtils.scala} (99%) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala new file mode 100644 index 0000000..162a857 --- /dev/null +++ b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala @@ -0,0 +1,38 @@ +package com.astrolabsoftware.sparkfits.utils + +import com.astrolabsoftware.sparkfits.FitsLib +import com.astrolabsoftware.sparkfits.FitsLib.Fits +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.log4j.LogManager +import org.apache.spark.sql.execution.datasources.PartitionedFile + +class FitsMetadata(partitionedFile: PartitionedFile, conf: Configuration) { + + val log = LogManager.getRootLogger + val path = new Path(partitionedFile.filePath) + private val fits = new Fits(path, conf, conf.getInt("hdu", -1)) + private val header = fits.blockHeader + private var notValid = false + val keyValues = FitsLib.parseHeader(header) + if 
(keyValues("NAXIS").toInt == 0){ + conf.get("mode") match { + case "PERMISSIVE" => + log.warn(s"Empty HDU for ${path}") + notValid = true + case "FAILFAST" => + log.warn(s"Empty HDU for ${path}") + log.warn(s"Use option('mode', 'PERMISSIVE') if you want to discard all empty HDUs.") + case _ => + } + } + + val nrowsLong = fits.hdu.getNRows(keyValues) + val rowSizeInt = fits.hdu.getSizeRowBytes(keyValues) + val rowSizeLong = rowSizeInt.toLong + + + + + +} diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FiteUtils.scala b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsUtils.scala similarity index 99% rename from src/main/scala/com/astrolabsoftware/sparkfits/utils/FiteUtils.scala rename to src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsUtils.scala index 34d8659..400b130 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FiteUtils.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsUtils.scala @@ -7,7 +7,7 @@ import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator} import scala.util.Try -object FiteUtils { +object FitsUtils { /** * Search for input FITS files. The input path can be either a single * FITS file, or a folder containing several FITS files with the diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index e32e61e..8d79741 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -2,12 +2,26 @@ package com.astrolabsoftware.sparkfits.v2 import com.astrolabsoftware.sparkfits.FitsLib import com.astrolabsoftware.sparkfits.FitsLib.Fits +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.io.LongWritable -import org.apache.spark.sql.Row +import org.apache.log4j.LogManager +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.types.StructType -class FitsPartitionReader[T] extends PartitionReader[T] { +class FitsPartitionReader[T]( + partition: FilePartition, + sparkSession: SparkSession, + conf: Configuration, + schema: StructType + ) extends PartitionReader[T] { + + // partition will have how many files are in this logical partition. 
There can be one or more + // It is ensured that the one file will not be split across multiple partitions, so + // we don;'t have to worry about padding, split in middle of row etc etc // Initialise mutable variables to be used by the executors // Handle the HDFS block boundaries @@ -25,7 +39,7 @@ class FitsPartitionReader[T] extends PartitionReader[T] { private var header: Array[String] = null private var nrowsLong : Long = 0L private var rowSizeInt : Int = 0 - private var rowSizeLong : Long = 0L + private var rowSizeLong : Long = 0Lf private var startstop: FitsLib.FitsBlockBoundaries = FitsLib.FitsBlockBoundaries() private var notValid : Boolean = false @@ -36,18 +50,57 @@ class FitsPartitionReader[T] extends PartitionReader[T] { // Intermediate variable to store binary data private var recordValueBytes: Array[Byte] = null + private var currentFileIndex = 0 + private var currentPartitionedFile: Option[PartitionedFile] = None + private var currentFileDataStop: Long = _ + private var currentFitsFile: Fits = _ + private var currentHeader: Array[String] = _ + val log = LogManager.getRootLogger + + private def setCurrentFileParams(index: Int): Unit = { + if (currentFileIndex != index || !currentPartitionedFile.isDefined) { + currentFileIndex = index + currentPartitionedFile = Option(partition.files(currentFileIndex)) + currentFileDataStop = currentPartitionedFile.get.start + currentPartitionedFile.get.length + val path = new Path(currentPartitionedFile.get.filePath) + currentFitsFile = new Fits(path, conf, conf.getInt("hdu", -1)) + currentHeader = fits.blockHeader + val keyValues = FitsLib.parseHeader(header) + if (keyValues("NAXIS").toInt == 0 & conf.get("mode") == "PERMISSIVE") { + log.warn(s"Empty HDU for ${path}") + notValid = true + } + if (keyValues("NAXIS").toInt == 0 & conf.get("mode") == "FAILFAST") { + log.warn(s"Empty HDU for ${file}") + log.warn(s"Use option('mode', 'PERMISSIVE') if you want to discard all empty HDUs.") + } + nrowsLong = fits.hdu.getNRows(keyValues) + rowSizeInt = fits.hdu.getSizeRowBytes(keyValues) + rowSizeLong = rowSizeInt.toLong + + } + } override def next(): Boolean = { - // Close the file if mapper is outside the HDU - if (notValid) { - fits.data.close() + // We are done reading all the files in the partition + if (currentFileIndex >= partition.index) { return false } + setCurrentFileParams(currentFileIndex) + + // Close the file if mapper is outside the HDU +// if (notValid) { +// fits.data.close() +// return false +// } + // Close the file if we went outside the block! // This means we sent all our records. 
- if (fits.data.getPos >= startstop.dataStop) { - fits.data.close() - return false + if (currentFitsFile.data.getPos >= currentFileDataStop) { + currentFitsFile.data.close() + // Done reading this file, try with the next file in this block + currentFileIndex += 1 + next() } // Initialise the key of the HDFS block @@ -114,7 +167,7 @@ class FitsPartitionReader[T] extends PartitionReader[T] { recordValueBytes.slice( rowSizeInt*i, rowSizeInt*(i+1)))) } - recordValue = tmp.result +// recordValue = tmp.result // update our current position currentPosition = currentPosition + recordLength diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala index b5f4b0c..8e22dc1 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala @@ -3,18 +3,18 @@ package com.astrolabsoftware.sparkfits.v2 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} -import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.execution.datasources.FilePartition class FitsPartitionReaderFactory( sparkSession: SparkSession, - options: CaseInsensitiveStringMap, + conf: Configuration, schema: StructType - ) extends FilePartitionReaderFactory { - - override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = - new FitsPartitionReader[InternalRow] - + ) extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + assert(partition.isInstanceOf[FilePartition]) + val filePartition = partition.asInstanceOf[FilePartition] + new FitsPartitionReader[InternalRow](filePartition, sparkSession, conf, schema) + } } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala index a19a746..186c9b5 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -1,22 +1,67 @@ package com.astrolabsoftware.sparkfits.v2 +import com.astrolabsoftware.sparkfits.FitsLib.Fits +import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory, PartitionedFile} import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import com.astrolabsoftware.sparkfits.utils.FitsUtils._ +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.InternalRow class FitsScan( sparkSession: SparkSession, - options: CaseInsensitiveStringMap, + conf: Configuration, schema: StructType ) extends Scan with Batch { - override def toBatch: Batch = super.toBatch + override def toBatch: Batch = this - override def readSchema(): StructType = ??? + // FITS does not support column pruning or other optimizations at the file level. 
+ // So schema won't change at run-time + override def readSchema(): StructType = schema - override def planInputPartitions(): Array[InputPartition] = Array.empty + override def planInputPartitions(): Array[InputPartition] = { + partitions.toArray + } override def createReaderFactory(): PartitionReaderFactory = - new FitsPartitionReaderFactory(sparkSession, options, schema) + new FitsPartitionReaderFactory(sparkSession, conf, schema) + + protected def partitions: Seq[FilePartition] = { + val partitionedFiles = getPartitionedFiles() + // Sort by length so that bigger blocks are scheduled first + val sortedPartitionedFiles = partitionedFiles.sortBy(_.length) + val maxSplitBytes = maxSplitBytes(sparkSession, partitionedFiles) + // Handle the case when there is just one file and its size is less than then maxSplitBytes + FilePartition.getFilePartitions(sparkSession, sortedPartitionedFiles, maxSplitBytes) + } + + private def getPartitionedFiles(): Seq[PartitionedFile] = { + val files = searchFitsFile(conf.get("paths"), conf, conf.getBoolean("verbosity", false)) + files.map { + file => + val path = new Path(file) + val fits = new Fits(path, conf, conf.getInt("hdu", 0)) + val boundaries = fits.getBlockBoundaries + // Broadcast the boundaries, to avoid computing again + // ToDO: Check this once - InternalRow.empty + PartitionedFile(InternalRow.empty, file, boundaries.dataStart, boundaries.blockStop - boundaries.dataStart) + } + } + + /** + * Borrowed from [[org.apache.spark.sql.execution.datasources.FilePartition$#maxSplitBytes]] + */ + def maxSplitBytes(sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile]): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + val defaultParallelism = sparkSession.sparkContext.defaultParallelism + val totalBytes = partitionedFiles.map(_.length + openCostInBytes).sum + val bytesPerCore = totalBytes / defaultParallelism + + Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + } } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala index 5ed351e..cb6a164 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala @@ -3,13 +3,14 @@ package com.astrolabsoftware.sparkfits.v2 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder} import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.hadoop.conf.Configuration class FitsScanBuilder( sparkSession: SparkSession, - options: CaseInsensitiveStringMap, + conf: Configuration, schema: StructType ) extends ScanBuilder { - override def build(): Scan = new FitsScan(sparkSession, options, schema) + + override def build(): Scan = new FitsScan(sparkSession, conf, schema) } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala index 10913e5..f11a57c 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -2,7 +2,7 @@ package com.astrolabsoftware.sparkfits.v2 import scala.collection.JavaConverters._ import com.astrolabsoftware.sparkfits.FitsLib.Fits -import 
com.astrolabsoftware.sparkfits.utils.FiteUtils._ +import com.astrolabsoftware.sparkfits.utils.FitsUtils._ import com.astrolabsoftware.sparkfits.FitsSchema.getSchema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -24,7 +24,7 @@ case class FitsTable( extends Table with SupportsRead { override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = - new FitsScanBuilder(sparkSession, options, schema) + new FitsScanBuilder(sparkSession, conf, schema) // Initialise Hadoop configuration val conf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) From d407a536a44a4f78f90e9710fd312c98c17af250 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Sun, 22 Dec 2019 10:57:11 +0530 Subject: [PATCH 05/16] Have deviated quite a bit from V1 way of reading; Testing time now --- .../sparkfits/utils/FitsMetadata.scala | 41 ++++- .../sparkfits/v2/FitsPartitionReader.scala | 174 ++++-------------- .../v2/FitsPartitionReaderFactory.scala | 7 +- .../sparkfits/v2/FitsScan.scala | 10 +- 4 files changed, 79 insertions(+), 153 deletions(-) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala index 162a857..f0d5cad 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala @@ -7,15 +7,18 @@ import org.apache.hadoop.fs.Path import org.apache.log4j.LogManager import org.apache.spark.sql.execution.datasources.PartitionedFile -class FitsMetadata(partitionedFile: PartitionedFile, conf: Configuration) { +import scala.util.Try + +class FitsMetadata(partitionedFile: PartitionedFile, val index: Int, conf: Configuration) { val log = LogManager.getRootLogger val path = new Path(partitionedFile.filePath) - private val fits = new Fits(path, conf, conf.getInt("hdu", -1)) + private[sparkfits] val fits = new Fits(path, conf, conf.getInt("hdu", -1)) + private[sparkfits] val startStop = fits.blockBoundaries private val header = fits.blockHeader - private var notValid = false + private[sparkfits] var notValid = false val keyValues = FitsLib.parseHeader(header) - if (keyValues("NAXIS").toInt == 0){ + if (keyValues("NAXIS").toInt == 0) { conf.get("mode") match { case "PERMISSIVE" => log.warn(s"Empty HDU for ${path}") @@ -27,12 +30,38 @@ class FitsMetadata(partitionedFile: PartitionedFile, conf: Configuration) { } } + // Get the record length in Bytes (get integer!). First look if the user + // specify a size for the recordLength. If not, set it to max(1 Ko, rowSize). + // If the HDU is an image, the recordLength is the row size (NAXIS1 * nbytes) + val recordLengthFromUser = Try{conf.get("recordlength").toInt} + .getOrElse{ + if (fits.hduType == "IMAGE") { + rowSizeInt + } else { + // set it to max(1 Ko, rowSize) + math.max((1 * 1024 / rowSizeInt) * rowSizeInt, rowSizeInt) + } + } + val nrowsLong = fits.hdu.getNRows(keyValues) val rowSizeInt = fits.hdu.getSizeRowBytes(keyValues) val rowSizeLong = rowSizeInt.toLong + // For Table, seek for a round number of lines for the record + private var recordLength = (recordLengthFromUser / rowSizeInt) * rowSizeInt + // Make sure that the recordLength is not bigger than the block size! + // This is a guard for small files. + recordLength = if ((recordLength / rowSizeInt) < nrowsLong.toInt) { + // OK less than the total number of lines + recordLength + } else { + // Small files, one record is the entire file. 
+ nrowsLong.toInt * rowSizeLong.toInt + } + // Move to the starting binary index + fits.data.seek(startStop.dataStart) - - + // Set our starting block position + var currentPosition = startStop.dataStart } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index 8d79741..b8d5a37 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -1,83 +1,42 @@ package com.astrolabsoftware.sparkfits.v2 -import com.astrolabsoftware.sparkfits.FitsLib import com.astrolabsoftware.sparkfits.FitsLib.Fits -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.LongWritable +import com.astrolabsoftware.sparkfits.utils.FitsMetadata import org.apache.log4j.LogManager +import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration class FitsPartitionReader[T]( partition: FilePartition, sparkSession: SparkSession, - conf: Configuration, + broadCastedConf: Broadcast[SerializableConfiguration], schema: StructType ) extends PartitionReader[T] { // partition will have how many files are in this logical partition. There can be one or more // It is ensured that the one file will not be split across multiple partitions, so - // we don;'t have to worry about padding, split in middle of row etc etc + // we don't have to worry about padding, split in middle of row etc etc - // Initialise mutable variables to be used by the executors - // Handle the HDFS block boundaries - private var splitStart: Long = 0L - private var splitEnd: Long = 0L + assert(partition.index >= 1, "There are no files in this partition, seems incorrect") - // Cursor position when reading the file - private var currentPosition: Long = 0L - - // Size of the records to read from the file - private var recordLength: Int = 0 - - // Object to manipulate the fits file - private var fits: Fits = null - private var header: Array[String] = null - private var nrowsLong : Long = 0L - private var rowSizeInt : Int = 0 - private var rowSizeLong : Long = 0Lf - private var startstop: FitsLib.FitsBlockBoundaries = FitsLib.FitsBlockBoundaries() - private var notValid : Boolean = false - - // The (key, value) used to create the RDD - private var recordKey: LongWritable = null - private var recordValue: InternalRow = null - - // Intermediate variable to store binary data + private val conf = broadCastedConf.value.value + private var currentFitsMetadata: Option[FitsMetadata] = None + private var currentFileIndex = 0 + private var fits: Fits = _ private var recordValueBytes: Array[Byte] = null + private var currentRow: InternalRow = null - private var currentFileIndex = 0 - private var currentPartitionedFile: Option[PartitionedFile] = None - private var currentFileDataStop: Long = _ - private var currentFitsFile: Fits = _ - private var currentHeader: Array[String] = _ val log = LogManager.getRootLogger - private def setCurrentFileParams(index: Int): Unit = { - if (currentFileIndex != index || !currentPartitionedFile.isDefined) { - currentFileIndex = index - currentPartitionedFile = 
Option(partition.files(currentFileIndex)) - currentFileDataStop = currentPartitionedFile.get.start + currentPartitionedFile.get.length - val path = new Path(currentPartitionedFile.get.filePath) - currentFitsFile = new Fits(path, conf, conf.getInt("hdu", -1)) - currentHeader = fits.blockHeader - val keyValues = FitsLib.parseHeader(header) - if (keyValues("NAXIS").toInt == 0 & conf.get("mode") == "PERMISSIVE") { - log.warn(s"Empty HDU for ${path}") - notValid = true - } - if (keyValues("NAXIS").toInt == 0 & conf.get("mode") == "FAILFAST") { - log.warn(s"Empty HDU for ${file}") - log.warn(s"Use option('mode', 'PERMISSIVE') if you want to discard all empty HDUs.") - } - nrowsLong = fits.hdu.getNRows(keyValues) - rowSizeInt = fits.hdu.getSizeRowBytes(keyValues) - rowSizeLong = rowSizeInt.toLong - + private def setCurrentFileParams(): Unit = { + if (!currentFitsMetadata.isDefined || currentFitsMetadata.get.index != currentFileIndex) { + currentFitsMetadata = Option(new FitsMetadata(partition.files(currentFileIndex), currentFileIndex, conf)) + fits = currentFitsMetadata.get.fits } } override def next(): Boolean = { @@ -86,104 +45,35 @@ class FitsPartitionReader[T]( return false } - setCurrentFileParams(currentFileIndex) + setCurrentFileParams() // Close the file if mapper is outside the HDU -// if (notValid) { -// fits.data.close() -// return false -// } + if (currentFitsMetadata.get.notValid) { + fits.data.close() + // Try next file + currentFileIndex += 1 + } // Close the file if we went outside the block! // This means we sent all our records. - if (currentFitsFile.data.getPos >= currentFileDataStop) { - currentFitsFile.data.close() + if (fits.data.getPos >= currentFitsMetadata.get.startStop.dataStop) { + fits.data.close() // Done reading this file, try with the next file in this block currentFileIndex += 1 - next() - } - - // Initialise the key of the HDFS block - if (recordKey == null) { - recordKey = new LongWritable() - } - - // the key is a linear index of the record, given by the - // position the record starts divided by the record length - recordKey.set(currentPosition / recordLength) - - // The last record might not be of the same size as the other. - // So if recordLength goes above the end of the data block, cut it. - - // If (getPos + recordLength) goes above splitEnd - recordLength = if ((startstop.dataStop - fits.data.getPos) < recordLength.toLong) { - (startstop.dataStop - fits.data.getPos).toInt - } else { - recordLength - } - - // If (currentPosition + recordLength) goes above splitEnd - recordLength = if ((splitEnd - currentPosition) < recordLength.toLong) { - (splitEnd - currentPosition).toInt - } else { - recordLength + return next() } - // Last record may not end at the end of a row. - // If record length is not a multiple of the row size - // This can only happen if one of the two ifs below have been triggered - // (by default recordLength is a multiple of the row size). - recordLength = if (recordLength % rowSizeLong != 0) { - - // Decrement recordLength until we reach the end of the row n-1. 
- do { - recordLength = recordLength - 1 - } while (recordLength % rowSizeLong != 0) + recordValueBytes = new Array[Byte](currentFitsMetadata.get.rowSizeInt) + fits.data.readFully(recordValueBytes, 0, currentFitsMetadata.get.rowSizeInt) + currentRow = InternalRow.fromSeq(recordValueBytes) + true + } - // Return - recordLength - } else recordLength + override def get(): InternalRow = currentRow - // If recordLength is below the size of a row - // skip and leave this row for the next block - if (recordLength < rowSizeLong) { + override def close(): Unit = { + if (fits.data.getPos >= currentFitsMetadata.get.startStop.dataStop) { fits.data.close() - return false - } - - // The array to place the binary data into - recordValueBytes = new Array[Byte](recordLength) - - // read a record if the currentPosition is less than the split end - if (currentPosition < splitEnd) { - // Read a record of length `0 to recordLength - 1` - fits.data.readFully(recordValueBytes, 0, recordLength) - - // Convert each row - // 1 task: 32 MB @ 2s - val tmp = Seq.newBuilder[InternalRow] - for (i <- 0 to recordLength / rowSizeLong.toInt - 1) { - tmp += InternalRow.fromSeq(fits.getRow( - recordValueBytes.slice( - rowSizeInt*i, rowSizeInt*(i+1)))) - } -// recordValue = tmp.result - - // update our current position - currentPosition = currentPosition + recordLength - - // we did not reach the end of the split, and we need to send more records - return true } - - // We reached the end of the split. - // We will now go to another split (if more available) - fits.data.close() - false } - - override def get(): InternalRow = recordValue - - override def close(): Unit = ??? - } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala index 8e22dc1..0b41284 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala @@ -4,17 +4,18 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.types.StructType -import org.apache.hadoop.conf.Configuration +import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.util.SerializableConfiguration class FitsPartitionReaderFactory( sparkSession: SparkSession, - conf: Configuration, + broadCastedConf: Broadcast[SerializableConfiguration], schema: StructType ) extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { assert(partition.isInstanceOf[FilePartition]) val filePartition = partition.asInstanceOf[FilePartition] - new FitsPartitionReader[InternalRow](filePartition, sparkSession, conf, schema) + new FitsPartitionReader[InternalRow](filePartition, sparkSession, broadCastedConf, schema) } } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala index 186c9b5..3246621 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -9,6 +9,7 @@ import org.apache.spark.sql.types.StructType import com.astrolabsoftware.sparkfits.utils.FitsUtils._ import org.apache.hadoop.fs.Path import 
org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.util.SerializableConfiguration class FitsScan( sparkSession: SparkSession, @@ -26,8 +27,11 @@ class FitsScan( partitions.toArray } - override def createReaderFactory(): PartitionReaderFactory = - new FitsPartitionReaderFactory(sparkSession, conf, schema) + override def createReaderFactory(): PartitionReaderFactory = { + val broadCastedConf = sparkSession.sparkContext.broadcast( + new SerializableConfiguration(conf)) + new FitsPartitionReaderFactory(sparkSession, broadCastedConf, schema) + } protected def partitions: Seq[FilePartition] = { val partitionedFiles = getPartitionedFiles() @@ -45,6 +49,8 @@ class FitsScan( val path = new Path(file) val fits = new Fits(path, conf, conf.getInt("hdu", 0)) val boundaries = fits.getBlockBoundaries + fits.registerHeader + fits.blockBoundaries.register(path, conf) // Broadcast the boundaries, to avoid computing again // ToDO: Check this once - InternalRow.empty PartitionedFile(InternalRow.empty, file, boundaries.dataStart, boundaries.blockStop - boundaries.dataStart) From 8337df57796b13d56a48309d490ea121ab327e1b Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Sun, 22 Dec 2019 11:10:29 +0530 Subject: [PATCH 06/16] Resolved compilation failures --- .../sparkfits/v2/FitsPartitionReader.scala | 2 +- .../scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index b8d5a37..0b976af 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -16,7 +16,7 @@ class FitsPartitionReader[T]( sparkSession: SparkSession, broadCastedConf: Broadcast[SerializableConfiguration], schema: StructType - ) extends PartitionReader[T] { + ) extends PartitionReader[InternalRow] { // partition will have how many files are in this logical partition. 
There can be one or more // It is ensured that the one file will not be split across multiple partitions, so diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala index 3246621..064d662 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -37,9 +37,9 @@ class FitsScan( val partitionedFiles = getPartitionedFiles() // Sort by length so that bigger blocks are scheduled first val sortedPartitionedFiles = partitionedFiles.sortBy(_.length) - val maxSplitBytes = maxSplitBytes(sparkSession, partitionedFiles) - // Handle the case when there is just one file and its size is less than then maxSplitBytes - FilePartition.getFilePartitions(sparkSession, sortedPartitionedFiles, maxSplitBytes) + val splitBytes = maxSplitBytes(sparkSession, partitionedFiles) + // Handle the case when there is just one file and its size is less than then splitBytes + FilePartition.getFilePartitions(sparkSession, sortedPartitionedFiles, splitBytes) } private def getPartitionedFiles(): Seq[PartitionedFile] = { @@ -67,7 +67,6 @@ class FitsScan( val defaultParallelism = sparkSession.sparkContext.defaultParallelism val totalBytes = partitionedFiles.map(_.length + openCostInBytes).sum val bytesPerCore = totalBytes / defaultParallelism - Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) } } From fc20760494e091ac2ce9a2f342914519461b5667 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Sun, 22 Dec 2019 11:11:26 +0530 Subject: [PATCH 07/16] Lets not hard code --- .../com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index 0b976af..8ea55d2 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -11,7 +11,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFil import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration -class FitsPartitionReader[T]( +class FitsPartitionReader[T <: InternalRow]( partition: FilePartition, sparkSession: SparkSession, broadCastedConf: Broadcast[SerializableConfiguration], From 4c7bf140661c3e51e4e3828d9ecfc60a9dc76a8e Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 23 Dec 2019 23:03:39 +0530 Subject: [PATCH 08/16] Moved the step closer to first V2 read; There seems to be an issue when the Catalyst type is not same as the scala type, String type for example is converted internally to UTF8String; Lets seee --- .../sparkfits/v2/FitsPartitionReader.scala | 18 +++++++++++++----- .../sparkfits/v2/FitsScan.scala | 3 ++- .../sparkfits/v2/FitsTable.scala | 10 +++++++--- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index 8ea55d2..4915cfd 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -6,6 +6,8 @@ import org.apache.log4j.LogManager import org.apache.spark.broadcast.Broadcast import 
org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.types.StructType @@ -22,7 +24,7 @@ class FitsPartitionReader[T <: InternalRow]( // It is ensured that the one file will not be split across multiple partitions, so // we don't have to worry about padding, split in middle of row etc etc - assert(partition.index >= 1, "There are no files in this partition, seems incorrect") + assert(partition.files.size >= 1, "There are no files in this partition, seems incorrect") private val conf = broadCastedConf.value.value private var currentFitsMetadata: Option[FitsMetadata] = None @@ -30,6 +32,8 @@ class FitsPartitionReader[T <: InternalRow]( private var fits: Fits = _ private var recordValueBytes: Array[Byte] = null private var currentRow: InternalRow = null + private final val attributedSchema = schema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) + private val unsafeProjection = GenerateUnsafeProjection.generate(attributedSchema, attributedSchema) val log = LogManager.getRootLogger @@ -41,7 +45,7 @@ class FitsPartitionReader[T <: InternalRow]( } override def next(): Boolean = { // We are done reading all the files in the partition - if (currentFileIndex >= partition.index) { + if (currentFileIndex > partition.index) { return false } @@ -65,14 +69,18 @@ class FitsPartitionReader[T <: InternalRow]( recordValueBytes = new Array[Byte](currentFitsMetadata.get.rowSizeInt) fits.data.readFully(recordValueBytes, 0, currentFitsMetadata.get.rowSizeInt) - currentRow = InternalRow.fromSeq(recordValueBytes) + currentRow = InternalRow.fromSeq(fits.getRow(recordValueBytes)) true } - override def get(): InternalRow = currentRow + private val rowConverter = { + () => unsafeProjection(currentRow) + } + + override def get(): InternalRow = rowConverter() override def close(): Unit = { - if (fits.data.getPos >= currentFitsMetadata.get.startStop.dataStop) { + if (fits.data != null) { fits.data.close() } } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala index 064d662..91c4758 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -43,7 +43,8 @@ class FitsScan( } private def getPartitionedFiles(): Seq[PartitionedFile] = { - val files = searchFitsFile(conf.get("paths"), conf, conf.getBoolean("verbosity", false)) + val files = conf.get("listOfFitsFiles").split(",") + // val files = searchFitsFile(conf.get("path"), conf, conf.getBoolean("verbosity", false)) files.map { file => val path = new Path(file) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala index f11a57c..09c0035 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -14,6 +14,7 @@ import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap +import scala.collection.mutable import scala.util.Try case class 
FitsTable( @@ -31,8 +32,11 @@ case class FitsTable( // This will contain all options use to load the data private val extraOptions = new scala.collection.mutable.HashMap[String, String] - private val optionsAsScala = options.asScala.toMap - private val listOfFitsFiles = searchFitsFile(optionsAsScala("paths"), conf, verbosity) + private val optionsAsScala: mutable.Map[String, String] = mutable.Map.empty + optionsAsScala ++= options.asScala + private final val listOfFitsFiles = searchFitsFile(optionsAsScala("path"), conf, verbosity) + // Add list of Fits files for a use later + optionsAsScala += ("listOfFitsFiles" -> listOfFitsFiles.mkString(",")) def registerConfigurations: Unit = { for (keyAndVal <- optionsAsScala) { @@ -71,7 +75,7 @@ case class FitsTable( } // We don't really have the notion of table name FITS. So just returning the location - override def name(): String = options.get("paths") + override def name(): String = options.get("path") override def capabilities: java.util.Set[TableCapability] = Set(BATCH_READ).asJava } From 38aeb28bdd421ac9166aa02de15b682459e31955 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 23 Dec 2019 23:56:20 +0530 Subject: [PATCH 09/16] Was able to hack this to work, got the first output (o_o) --- .../sparkfits/v2/FitsPartitionReader.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index 4915cfd..822c388 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -5,6 +5,7 @@ import com.astrolabsoftware.sparkfits.utils.FitsMetadata import org.apache.log4j.LogManager import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection @@ -69,15 +70,17 @@ class FitsPartitionReader[T <: InternalRow]( recordValueBytes = new Array[Byte](currentFitsMetadata.get.rowSizeInt) fits.data.readFully(recordValueBytes, 0, currentFitsMetadata.get.rowSizeInt) - currentRow = InternalRow.fromSeq(fits.getRow(recordValueBytes)) +// currentRow = InternalRow.fromSeq(fits.getRow(recordValueBytes)) + currentRow = InternalRow.fromSeq(fits.getRow(recordValueBytes).map(CatalystTypeConverters.convertToCatalyst(_))) true } - private val rowConverter = { - () => unsafeProjection(currentRow) - } +// private val rowConverter = { +// () => unsafeProjection(currentRow) +// } - override def get(): InternalRow = rowConverter() +// override def get(): InternalRow = rowConverter() + override def get(): InternalRow = currentRow override def close(): Unit = { if (fits.data != null) { From 2fd825ecb2b73faa3a3971b388a244089ae3c0df Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Tue, 24 Dec 2019 23:45:08 +0530 Subject: [PATCH 10/16] Changed v2 source name, Code clean-up --- ...pache.spark.sql.sources.DataSourceRegister | 3 ++- .../sparkfits/utils/FitsMetadata.scala | 4 +--- .../sparkfits/v2/FitsDataSourceV2.scala | 20 +++++++++---------- .../sparkfits/v2/FitsPartitionReader.scala | 19 +++++------------- .../sparkfits/v2/FitsScan.scala | 18 ++++++++++------- .../sparkfits/v2/FitsTable.scala | 12 +++++------ 6 files changed, 
34 insertions(+), 42 deletions(-) diff --git a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index d6e7b29..8ab1956 100644 --- a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1 +1,2 @@ -com.astrolabsoftware.sparkfits.v2.FitsDataSourceV2 \ No newline at end of file +com.astrolabsoftware.sparkfits.v2.FitsDataSourceV2 +com.astrolabsoftware.sparkfits.DefaultSource \ No newline at end of file diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala index f0d5cad..2ae8298 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala @@ -48,6 +48,7 @@ class FitsMetadata(partitionedFile: PartitionedFile, val index: Int, conf: Confi val rowSizeLong = rowSizeInt.toLong // For Table, seek for a round number of lines for the record + // ToDo: Cases when the user has given the record length private var recordLength = (recordLengthFromUser / rowSizeInt) * rowSizeInt // Make sure that the recordLength is not bigger than the block size! @@ -61,7 +62,4 @@ class FitsMetadata(partitionedFile: PartitionedFile, val index: Int, conf: Confi } // Move to the starting binary index fits.data.seek(startStop.dataStart) - - // Set our starting block position - var currentPosition = startStop.dataStart } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala index e358d9a..ea2c5ff 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala @@ -1,23 +1,23 @@ package com.astrolabsoftware.sparkfits.v2 -import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.execution.datasources.FileFormat -import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.{Table, TableProvider} +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -class FitsDataSourceV2 extends FileDataSourceV2 { +class FitsDataSourceV2 extends TableProvider with DataSourceRegister { - override def shortName() = "fits" + // ToDo: Use the name "fits" and still resolve v1 vs v2 somehow + override def shortName() = "fitsv2" + + lazy val sparkSession = SparkSession.active override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { - FitsTable(sparkSession, options, Some(schema), fallbackFileFormat) + FitsTable(sparkSession, options, Some(schema)) } - // Still have to figure this out - override def fallbackFileFormat: Class[_ <: FileFormat] = null - override def getTable(options: CaseInsensitiveStringMap): Table = { - FitsTable(sparkSession, options, None, fallbackFileFormat) + FitsTable(sparkSession, options, None) } } \ No newline at end of file diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index 822c388..3c82651 100644 --- 
a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -5,12 +5,9 @@ import com.astrolabsoftware.sparkfits.utils.FitsMetadata import org.apache.log4j.LogManager import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.connector.read.PartitionReader -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -33,8 +30,6 @@ class FitsPartitionReader[T <: InternalRow]( private var fits: Fits = _ private var recordValueBytes: Array[Byte] = null private var currentRow: InternalRow = null - private final val attributedSchema = schema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) - private val unsafeProjection = GenerateUnsafeProjection.generate(attributedSchema, attributedSchema) val log = LogManager.getRootLogger @@ -60,7 +55,7 @@ class FitsPartitionReader[T <: InternalRow]( } // Close the file if we went outside the block! - // This means we sent all our records. + // This means we read all the records. if (fits.data.getPos >= currentFitsMetadata.get.startStop.dataStop) { fits.data.close() // Done reading this file, try with the next file in this block @@ -70,16 +65,12 @@ class FitsPartitionReader[T <: InternalRow]( recordValueBytes = new Array[Byte](currentFitsMetadata.get.rowSizeInt) fits.data.readFully(recordValueBytes, 0, currentFitsMetadata.get.rowSizeInt) -// currentRow = InternalRow.fromSeq(fits.getRow(recordValueBytes)) + // FixMe: We can just directly read the rows as UnsafeRow to avoid unnecessary conversion + // back and forth currentRow = InternalRow.fromSeq(fits.getRow(recordValueBytes).map(CatalystTypeConverters.convertToCatalyst(_))) true } -// private val rowConverter = { -// () => unsafeProjection(currentRow) -// } - -// override def get(): InternalRow = rowConverter() override def get(): InternalRow = currentRow override def close(): Unit = { diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala index 91c4758..a0d5ea1 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -6,7 +6,6 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory, PartitionedFile} import org.apache.spark.sql.types.StructType -import com.astrolabsoftware.sparkfits.utils.FitsUtils._ import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.util.SerializableConfiguration @@ -34,12 +33,16 @@ class FitsScan( } protected def partitions: Seq[FilePartition] = { - val partitionedFiles = getPartitionedFiles() - // Sort by length so that bigger blocks are scheduled first - val sortedPartitionedFiles = 
partitionedFiles.sortBy(_.length) - val splitBytes = maxSplitBytes(sparkSession, partitionedFiles) - // Handle the case when there is just one file and its size is less than then splitBytes - FilePartition.getFilePartitions(sparkSession, sortedPartitionedFiles, splitBytes) + if (conf.getBoolean("implemented", true)) { + val partitionedFiles = getPartitionedFiles() + // Sort by length so that bigger blocks are scheduled first + val sortedPartitionedFiles = partitionedFiles.sortBy(_.length) + val splitBytes = maxSplitBytes(sparkSession, partitionedFiles) + // Handle the case when there is just one file and its size is less than then splitBytes + FilePartition.getFilePartitions(sparkSession, sortedPartitionedFiles, splitBytes) + } else { + Seq.empty + } } private def getPartitionedFiles(): Seq[PartitionedFile] = { @@ -50,6 +53,7 @@ class FitsScan( val path = new Path(file) val fits = new Fits(path, conf, conf.getInt("hdu", 0)) val boundaries = fits.getBlockBoundaries + // Register the header and block boundaries for re-use later fits.registerHeader fits.blockBoundaries.register(path, conf) // Broadcast the boundaries, to avoid computing again diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala index 09c0035..d3ce116 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -1,27 +1,25 @@ package com.astrolabsoftware.sparkfits.v2 -import scala.collection.JavaConverters._ import com.astrolabsoftware.sparkfits.FitsLib.Fits -import com.astrolabsoftware.sparkfits.utils.FitsUtils._ import com.astrolabsoftware.sparkfits.FitsSchema.getSchema +import com.astrolabsoftware.sparkfits.utils.FitsUtils._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder -import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Try case class FitsTable( sparkSession: SparkSession, options: CaseInsensitiveStringMap, - userSpecifiedSchema: Option[StructType], - fallbackFileFormat: Class[_ <: FileFormat]) + userSpecifiedSchema: Option[StructType]) extends Table with SupportsRead { override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = @@ -63,8 +61,8 @@ case class FitsTable( } else{ checkSchemaAndReturnType(listOfFitsFiles.slice(0, 10), conf) } + conf.setBoolean("implemented", implemented) -// val conf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) val pathFS = new Path(listOfFitsFiles(0)) val fits = new Fits(pathFS, conf, options.get("hdu").toInt) // Register header and block boundaries @@ -75,7 +73,7 @@ case class FitsTable( } // We don't really have the notion of table name FITS. 
So just returning the location - override def name(): String = options.get("path") + override def name(): String = s"FITS Table: ${options.get("path")}" override def capabilities: java.util.Set[TableCapability] = Set(BATCH_READ).asJava } From 6b7002e64a4aa690670b260c3e4dcdb5b4fbdbd3 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Tue, 24 Dec 2019 23:48:04 +0530 Subject: [PATCH 11/16] Added license headers --- build.sbt | 10 +++++++--- .../sparkfits/utils/FitsMetadata.scala | 15 +++++++++++++++ .../sparkfits/utils/FitsUtils.scala | 15 +++++++++++++++ .../sparkfits/v2/FitsDataSourceV2.scala | 15 +++++++++++++++ .../sparkfits/v2/FitsPartitionReader.scala | 15 +++++++++++++++ .../v2/FitsPartitionReaderFactory.scala | 15 +++++++++++++++ .../sparkfits/v2/FitsScan.scala | 15 +++++++++++++++ .../sparkfits/v2/FitsScanBuilder.scala | 19 +++++++++++++++++-- .../sparkfits/v2/FitsTable.scala | 15 +++++++++++++++ 9 files changed, 129 insertions(+), 5 deletions(-) diff --git a/build.sbt b/build.sbt index 996d0cd..ffdefa9 100644 --- a/build.sbt +++ b/build.sbt @@ -13,8 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import Dependencies._ -import xerial.sbt.Sonatype._ lazy val root = (project in file(".")). settings( @@ -69,7 +67,13 @@ developers := List( "Julien Peloton", "peloton@lal.in2p3.fr", url("https://github.com/JulienPeloton") - ) + ), + Developer( + "Mayur Bhosale", + "Mayur Bhosale", + "mayurdb31@gmail.com", + url("https://github.com/mayurdb") + ) ) licenses := Seq("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala index 2ae8298..fb77bb3 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala @@ -1,3 +1,18 @@ +/* + * Copyright 2019 AstroLab Software + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.astrolabsoftware.sparkfits.utils import com.astrolabsoftware.sparkfits.FitsLib diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsUtils.scala b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsUtils.scala index 400b130..dff67bd 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsUtils.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsUtils.scala @@ -1,3 +1,18 @@ +/* + * Copyright 2019 AstroLab Software + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.astrolabsoftware.sparkfits.utils import com.astrolabsoftware.sparkfits.FitsLib.Fits diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala index ea2c5ff..384fe61 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala @@ -1,3 +1,18 @@ +/* + * Copyright 2019 AstroLab Software + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.astrolabsoftware.sparkfits.v2 import org.apache.spark.sql.SparkSession diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index 3c82651..370448c 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -1,3 +1,18 @@ +/* + * Copyright 2019 AstroLab Software + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.astrolabsoftware.sparkfits.v2 import com.astrolabsoftware.sparkfits.FitsLib.Fits diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala index 0b41284..1fabedc 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReaderFactory.scala @@ -1,3 +1,18 @@ +/* + * Copyright 2019 AstroLab Software + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package com.astrolabsoftware.sparkfits.v2 import org.apache.spark.sql.SparkSession diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala index a0d5ea1..50d709d 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -1,3 +1,18 @@ +/* + * Copyright 2019 AstroLab Software + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.astrolabsoftware.sparkfits.v2 import com.astrolabsoftware.sparkfits.FitsLib.Fits diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala index cb6a164..fb506de 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScanBuilder.scala @@ -1,9 +1,24 @@ +/* + * Copyright 2019 AstroLab Software + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.astrolabsoftware.sparkfits.v2 +import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder} +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder} import org.apache.spark.sql.types.StructType -import org.apache.hadoop.conf.Configuration class FitsScanBuilder( sparkSession: SparkSession, diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala index d3ce116..3d01e6b 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -1,3 +1,18 @@ +/* + * Copyright 2019 AstroLab Software + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package com.astrolabsoftware.sparkfits.v2 import com.astrolabsoftware.sparkfits.FitsLib.Fits From ad2efe57f19a39f5bee8ad507cce91287e0f3b12 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Tue, 24 Dec 2019 23:53:06 +0530 Subject: [PATCH 12/16] Further cleanup --- build.sbt | 3 +-- src/main/java/com/astrolabsoftware/sparkfits/ReadFitsJ.java | 2 +- .../services/org.apache.spark.sql.sources.DataSourceRegister | 2 +- .../com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/build.sbt b/build.sbt index ffdefa9..48d86fc 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - lazy val root = (project in file(".")). settings( inThisBuild(List( @@ -69,7 +68,7 @@ developers := List( url("https://github.com/JulienPeloton") ), Developer( - "Mayur Bhosale", + "MayurBhosale", "Mayur Bhosale", "mayurdb31@gmail.com", url("https://github.com/mayurdb") diff --git a/src/main/java/com/astrolabsoftware/sparkfits/ReadFitsJ.java b/src/main/java/com/astrolabsoftware/sparkfits/ReadFitsJ.java index 55f34de..47f8144 100644 --- a/src/main/java/com/astrolabsoftware/sparkfits/ReadFitsJ.java +++ b/src/main/java/com/astrolabsoftware/sparkfits/ReadFitsJ.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.astrolabsoftware.sparkfits.examples; +package com.astrolabsoftware.sparkfits; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions.*; diff --git a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 8ab1956..79b98db 100644 --- a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1,2 +1,2 @@ com.astrolabsoftware.sparkfits.v2.FitsDataSourceV2 -com.astrolabsoftware.sparkfits.DefaultSource \ No newline at end of file +com.astrolabsoftware.sparkfits.DefaultSource diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala index 384fe61..bf0882a 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsDataSourceV2.scala @@ -35,4 +35,4 @@ class FitsDataSourceV2 extends TableProvider with DataSourceRegister { override def getTable(options: CaseInsensitiveStringMap): Table = { FitsTable(sparkSession, options, None) } -} \ No newline at end of file +} From db7a93a56d4a229e7f211bbe144a8ef40026a5c2 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Wed, 25 Dec 2019 00:19:11 +0530 Subject: [PATCH 13/16] Code cleanup --- .../sparkfits/v2/FitsPartitionReader.scala | 10 ++-------- .../com/astrolabsoftware/sparkfits/v2/FitsScan.scala | 4 ++-- .../astrolabsoftware/sparkfits/v2/FitsTable.scala | 12 ++++-------- 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index 370448c..61297c3 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala 
@@ -62,13 +62,6 @@ class FitsPartitionReader[T <: InternalRow]( setCurrentFileParams() - // Close the file if mapper is outside the HDU - if (currentFitsMetadata.get.notValid) { - fits.data.close() - // Try next file - currentFileIndex += 1 - } - // Close the file if we went outside the block! // This means we read all the records. if (fits.data.getPos >= currentFitsMetadata.get.startStop.dataStop) { @@ -82,7 +75,8 @@ class FitsPartitionReader[T <: InternalRow]( fits.data.readFully(recordValueBytes, 0, currentFitsMetadata.get.rowSizeInt) // FixMe: We can just directly read the rows as UnsafeRow to avoid unnecessary conversion // back and forth - currentRow = InternalRow.fromSeq(fits.getRow(recordValueBytes).map(CatalystTypeConverters.convertToCatalyst(_))) + currentRow = InternalRow.fromSeq( + fits.getRow(recordValueBytes).map(CatalystTypeConverters.convertToCatalyst(_))) true } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala index 50d709d..2407ac6 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -42,6 +42,8 @@ class FitsScan( } override def createReaderFactory(): PartitionReaderFactory = { + // Broadcast the confs which will have the file boundaries and headers + // for the re-use on the executor side val broadCastedConf = sparkSession.sparkContext.broadcast( new SerializableConfiguration(conf)) new FitsPartitionReaderFactory(sparkSession, broadCastedConf, schema) @@ -62,7 +64,6 @@ class FitsScan( private def getPartitionedFiles(): Seq[PartitionedFile] = { val files = conf.get("listOfFitsFiles").split(",") - // val files = searchFitsFile(conf.get("path"), conf, conf.getBoolean("verbosity", false)) files.map { file => val path = new Path(file) @@ -71,7 +72,6 @@ class FitsScan( // Register the header and block boundaries for re-use later fits.registerHeader fits.blockBoundaries.register(path, conf) - // Broadcast the boundaries, to avoid computing again // ToDO: Check this once - InternalRow.empty PartitionedFile(InternalRow.empty, file, boundaries.dataStart, boundaries.blockStop - boundaries.dataStart) } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala index 3d01e6b..c12b759 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.util.Try case class FitsTable( sparkSession: SparkSession, @@ -42,27 +41,23 @@ case class FitsTable( // Initialise Hadoop configuration val conf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) - - // This will contain all options use to load the data - private val extraOptions = new scala.collection.mutable.HashMap[String, String] private val optionsAsScala: mutable.Map[String, String] = mutable.Map.empty optionsAsScala ++= options.asScala private final val listOfFitsFiles = searchFitsFile(optionsAsScala("path"), conf, verbosity) - // Add list of Fits files for a use later + // Add list of Fits files to conf for a re-use later optionsAsScala += ("listOfFitsFiles" -> listOfFitsFiles.mkString(",")) def registerConfigurations: Unit = { for (keyAndVal <- optionsAsScala) { conf.set(keyAndVal._1, keyAndVal._2) - 
extraOptions += (keyAndVal._1 -> keyAndVal._2) } if (conf.get("mode") == null) { conf.set("mode", "PERMISSIVE") - extraOptions += ("mode" -> "PERMISSIVE") } } registerConfigurations - val verbosity = Try{extraOptions("verbose")}.getOrElse("false").toBoolean + + val verbosity = conf.getBoolean("verbose", false) override lazy final val schema: StructType = userSpecifiedSchema.getOrElse { @@ -90,5 +85,6 @@ case class FitsTable( // We don't really have the notion of table name FITS. So just returning the location override def name(): String = s"FITS Table: ${options.get("path")}" + // Here we define, functionality supported by FITS datasource override def capabilities: java.util.Set[TableCapability] = Set(BATCH_READ).asJava } From deadd76beaff6c4aed50426fa5c54eb05ce8ec8e Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 30 Dec 2019 18:28:38 +0530 Subject: [PATCH 14/16] Added UTs and fixes along the way --- .../sparkfits/utils/FitsMetadata.scala | 65 +-- .../sparkfits/v2/FitsPartitionReader.scala | 33 +- .../sparkfits/v2/FitsScan.scala | 7 + .../sparkfits/v2/FitsTable.scala | 7 +- .../sparkfits/ReadFitsTest.scala | 389 +++++++++++------- .../sparkfits/packageTest.scala | 311 ++++++++------ 6 files changed, 491 insertions(+), 321 deletions(-) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala index fb77bb3..2bc3c6b 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala @@ -45,36 +45,47 @@ class FitsMetadata(partitionedFile: PartitionedFile, val index: Int, conf: Confi } } - // Get the record length in Bytes (get integer!). First look if the user - // specify a size for the recordLength. If not, set it to max(1 Ko, rowSize). - // If the HDU is an image, the recordLength is the row size (NAXIS1 * nbytes) - val recordLengthFromUser = Try{conf.get("recordlength").toInt} - .getOrElse{ - if (fits.hduType == "IMAGE") { - rowSizeInt - } else { - // set it to max(1 Ko, rowSize) - math.max((1 * 1024 / rowSizeInt) * rowSizeInt, rowSizeInt) - } + private var recordLength: Long = 0L + var rowSizeInt: Int = 0 + var rowSizeLong: Long = 0L + + if (!notValid) { + + val nrowsLong = fits.hdu.getNRows(keyValues) + rowSizeInt = fits.hdu.getSizeRowBytes(keyValues) + rowSizeLong = rowSizeInt.toLong + + + // Get the record length in Bytes (get integer!). First look if the user + // specify a size for the recordLength. If not, set it to max(1 Ko, rowSize). + // If the HDU is an image, the recordLength is the row size (NAXIS1 * nbytes) + val recordLengthFromUser = Try { + conf.get("recordlength").toInt } + .getOrElse { + if (fits.hduType == "IMAGE") { + rowSizeInt + } else { + // set it to max(1 Ko, rowSize) + math.max((1 * 1024 / rowSizeInt) * rowSizeInt, rowSizeInt) + } + } - val nrowsLong = fits.hdu.getNRows(keyValues) - val rowSizeInt = fits.hdu.getSizeRowBytes(keyValues) - val rowSizeLong = rowSizeInt.toLong - // For Table, seek for a round number of lines for the record - // ToDo: Cases when the user has given the record length - private var recordLength = (recordLengthFromUser / rowSizeInt) * rowSizeInt + // For Table, seek for a round number of lines for the record + // ToDo: Cases when the user has given the record length + recordLength = (recordLengthFromUser / rowSizeInt) * rowSizeInt - // Make sure that the recordLength is not bigger than the block size! - // This is a guard for small files. 
- recordLength = if ((recordLength / rowSizeInt) < nrowsLong.toInt) { - // OK less than the total number of lines - recordLength - } else { - // Small files, one record is the entire file. - nrowsLong.toInt * rowSizeLong.toInt + // Make sure that the recordLength is not bigger than the block size! + // This is a guard for small files. + recordLength = if ((recordLength / rowSizeInt) < nrowsLong.toInt) { + // OK less than the total number of lines + recordLength + } else { + // Small files, one record is the entire file. + nrowsLong.toInt * rowSizeLong.toInt + } + // Move to the starting binary index + fits.data.seek(startStop.dataStart) } - // Move to the starting binary index - fits.data.seek(startStop.dataStart) } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index 61297c3..57aa3a1 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -19,8 +19,9 @@ import com.astrolabsoftware.sparkfits.FitsLib.Fits import com.astrolabsoftware.sparkfits.utils.FitsMetadata import org.apache.log4j.LogManager import org.apache.spark.broadcast.Broadcast -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.types.StructType @@ -45,23 +46,37 @@ class FitsPartitionReader[T <: InternalRow]( private var fits: Fits = _ private var recordValueBytes: Array[Byte] = null private var currentRow: InternalRow = null + val converters = RowEncoder(schema) val log = LogManager.getRootLogger private def setCurrentFileParams(): Unit = { - if (!currentFitsMetadata.isDefined || currentFitsMetadata.get.index != currentFileIndex) { + if (!currentFitsMetadata.isDefined || currentFitsMetadata.get.index != currentFileIndex) { + println( + s""" + | Info: + | Number of files: ${partition.files.size} + | Index: ${partition.index} + |""".stripMargin) currentFitsMetadata = Option(new FitsMetadata(partition.files(currentFileIndex), currentFileIndex, conf)) fits = currentFitsMetadata.get.fits } } override def next(): Boolean = { + println("Getting next record") // We are done reading all the files in the partition - if (currentFileIndex > partition.index) { + if (currentFileIndex > partition.files.size-1) { return false } setCurrentFileParams() + if (currentFitsMetadata.get.notValid) { + // Non Valid FITS file, try with the next file in this block + currentFileIndex += 1 + return next() + } + // Close the file if we went outside the block! // This means we read all the records. 
if (fits.data.getPos >= currentFitsMetadata.get.startStop.dataStop) { @@ -75,8 +90,14 @@ class FitsPartitionReader[T <: InternalRow]( fits.data.readFully(recordValueBytes, 0, currentFitsMetadata.get.rowSizeInt) // FixMe: We can just directly read the rows as UnsafeRow to avoid unnecessary conversion // back and forth - currentRow = InternalRow.fromSeq( - fits.getRow(recordValueBytes).map(CatalystTypeConverters.convertToCatalyst(_))) +// currentRow = InternalRow.fromSeq( +// fits.getRow(recordValueBytes).map(CatalystTypeConverters.convertToCatalyst(_))) + +// currentRow = InternalRow.fromSeq( +// fits.getRow(recordValueBytes).map(converters.toRow(_))) + currentRow = converters.toRow(Row.fromSeq(recordValueBytes)) + + println("Got record") true } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala index 2407ac6..a92adfa 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsScan.scala @@ -31,6 +31,8 @@ class FitsScan( schema: StructType ) extends Scan with Batch { + println("Using the V2 for read") + override def toBatch: Batch = this // FITS does not support column pruning or other optimizations at the file level. @@ -63,6 +65,11 @@ class FitsScan( } private def getPartitionedFiles(): Seq[PartitionedFile] = { + // FixMe: For a really large number of files, the Driver will get stuck while listing + // the files itself for block storage like S3. If not that, driver definitely + // get stuck while iterating over each file, and computing the boundaries + // We can launch a spark job to list the files and compute the boundaries distributively + // :Just_A_Thought: val files = conf.get("listOfFitsFiles").split(",") files.map { file => diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala index c12b759..6f48c36 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -60,7 +60,12 @@ case class FitsTable( val verbosity = conf.getBoolean("verbose", false) override lazy final val schema: StructType = userSpecifiedSchema.getOrElse { - + // Ensure HDU index was specified + if (conf.get("hdu") == null) { + throw new NoSuchElementException(""" + You need to specify the HDU to be read! + spark.readfits.option("hdu", )""") + } // Check that all the files have the same Schema // in order to perform the union. Return the HDU type. // NOTE: This operation is very long for hundreds of files! 
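
For reference, a minimal end-to-end sketch of reading through the V2 path that the tests below exercise. This is illustrative only: the session setup, the FITS file path, and the HDU index are placeholders and not part of this patch series; the "fitsv2" short name and the "hdu" option are the ones registered and required by the code above.

import org.apache.spark.sql.SparkSession

object ReadFitsV2Example {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .appName("ReadFitsV2Example")
      .master("local[*]")
      .getOrCreate()

    // "fitsv2" is the short name registered by FitsDataSourceV2;
    // "hdu" selects which HDU to read and must be set (see FitsTable.schema).
    val df = spark.read
      .format("fitsv2")
      .option("hdu", 1)
      .load("/path/to/some.fits") // placeholder path

    df.printSchema()
    println(s"rows: ${df.count()}")

    spark.stop()
  }
}
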
diff --git a/src/test/scala/com/astrolabsoftware/sparkfits/ReadFitsTest.scala b/src/test/scala/com/astrolabsoftware/sparkfits/ReadFitsTest.scala index 62e070b..cd580cb 100644 --- a/src/test/scala/com/astrolabsoftware/sparkfits/ReadFitsTest.scala +++ b/src/test/scala/com/astrolabsoftware/sparkfits/ReadFitsTest.scala @@ -36,6 +36,8 @@ class ReadFitsTest extends FunSuite with BeforeAndAfterAll { private var spark : SparkSession = _ + private val fileFormats = List("com.astrolabsoftware.sparkfits", "fitsv2") + override protected def beforeAll() : Unit = { super.beforeAll() spark = SparkSession @@ -63,256 +65,327 @@ class ReadFitsTest extends FunSuite with BeforeAndAfterAll { // Test if the user provides a correct recordLength test("recordLength test: Can you catch a too small user-defined recordLength?") { - val results = spark.read - .format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .option("recordLength", 1024) - val exception = intercept[AssertionError] { - results.load(fn_long) + fileFormats.foreach { + fileFormat => + val results = spark.read + .format(fileFormat) + .option("hdu", 1) + .option("recordLength", 1024) + val exception = intercept[AssertionError] { + results.load(fn_long) + } + assert(exception.getMessage.contains("recordLength option too small")) } - assert(exception.getMessage.contains("recordLength option too small")) } // Test if the code can adapt recordlength test("recordLength test: Can you adapt the size of recordLength if needed?") { - val results = spark.read - .format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn_long) - assert(results.count() == 100) + fileFormats.foreach { + fileFormat => + val results = spark.read + .format(fileFormat) + .option("hdu", 1) + .load(fn_long) + assert(results.count() == 100) + } } // Test if the user provides the HDU index to be read test("HDU test: Is there a HDU number?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - val exception = intercept[NoSuchElementException] { - results.load(fn) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + val exception = intercept[NoSuchElementException] { + results.load(fn) + } + assert(exception.getMessage.contains("HDU")) } - assert(exception.getMessage.contains("HDU")) } // Test if the user provides the HDU index to be read test("HDU test: Is HDU index above the max HDU index?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - val exception = intercept[AssertionError] { - results.option("hdu", 30).load(fn) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + val exception = intercept[AssertionError] { + results.option("hdu", 30).load(fn) + } + assert(exception.getMessage.contains("HDU")) } - assert(exception.getMessage.contains("HDU")) } test("HDU type test: Return an empty DataFrame if HDU is empty?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits").option("hdu", 0).load(fn) - assert(results.collect().size == 0) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat).option("hdu", 0).load(fn) + assert(results.collect().size == 0) + } } test("HDU type test: Return the proper record count if HDU is an image?") { - val fn_image = "src/test/resources/toTest/tst0009.fits" - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 2) - .load(fn_image) - val count = results.count() - assert(count == 155) + fileFormats.foreach { + fileFormat => + val fn_image = 
"src/test/resources/toTest/tst0009.fits" + val results = spark.read.format(fileFormat) + .option("hdu", 2) + .load(fn_image) + val count = results.count() + assert(count == 155) + } } // Test if the user provides the data type in the HDU test("HDU type test: Return an empty DF if the HDU is a Table? (not implemented yet)") { - val fn_table = "src/test/resources/toTest/tst0009.fits" - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn_table) - val count = results.count() - assert(count == 0) + fileFormats.foreach { + fileFormat => + val fn_table = "src/test/resources/toTest/tst0009.fits" + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn_table) + val count = results.count() + assert(count == 0) + } } // Test if one accesses column as expected for HDU 1 test("Count test: Do you count all elements in a column in HDU 1?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - assert(results.select("RA").count() == 20000) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + assert(results.select("RA").count() == 20000) + } } // Test if one accesses column as expected for HDU 1 test("Count test: Do you count all elements in a column in HDU 2?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 2) - .load(fn) - assert(results.select("Index").count() == 20000) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 2) + .load(fn) + assert(results.select("Index").count() == 20000) + } } // Test if one accesses column as expected for HDU 1 test("Column test: Can you select only one column?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .option("columns", "target") - .load(fn) - assert(results.first.size == 1) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .option("columns", "target") + .load(fn) + assert(results.first.size == 1) + } } // Test if one accesses column as expected for HDU 1 test("Column test: Can you select only some columns?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .option("columns", "target,RA") - .load(fn) - assert(results.first.size == 2) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .option("columns", "target,RA") + .load(fn) + assert(results.first.size == 2) + } } // Test if type cast is done correctly test("Type test: Do you see a Boolean?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 2) - .load(fn) - // Elements of a column are arrays of 1 element - assert(results.select("Discovery").first()(0).isInstanceOf[Boolean]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 2) + .load(fn) + // Elements of a column are arrays of 1 element + assert(results.select("Discovery").first()(0).isInstanceOf[Boolean]) + } } // Test if type cast is done correctly test("Type test: Do you see a Long?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - // Elements of a column are arrays of 1 element - assert(results.select("Index").first()(0).isInstanceOf[Long]) - - // Test also that vector with one element gets converted to scalar - val resultsAlt = 
spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fnAlt) - // Elements of a column are arrays of 1 element - assert(resultsAlt.select("Index").first()(0).isInstanceOf[Long]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + // Elements of a column are arrays of 1 element + assert(results.select("Index").first()(0).isInstanceOf[Long]) + + // Test also that vector with one element gets converted to scalar + val resultsAlt = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fnAlt) + // Elements of a column are arrays of 1 element + assert(resultsAlt.select("Index").first()(0).isInstanceOf[Long]) + } } // Test if type cast is done correctly test("Type test: Do you see a Int?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - // Elements of a column are arrays of 1 element - assert(results.select("RunId").first()(0).isInstanceOf[Int]) - - // Test also that vector with one element gets converted to scalar - val resultsAlt = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 2) - .load(fnAlt) - // Elements of a column are arrays of 1 element - assert(resultsAlt.select("Index").first()(0).isInstanceOf[Int]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + // Elements of a column are arrays of 1 element + assert(results.select("RunId").first()(0).isInstanceOf[Int]) + + // Test also that vector with one element gets converted to scalar + val resultsAlt = spark.read.format(fileFormat) + .option("hdu", 2) + .load(fnAlt) + // Elements of a column are arrays of 1 element + assert(resultsAlt.select("Index").first()(0).isInstanceOf[Int]) + } } // Test if type cast is done correctly test("Type test: Do you see a Short?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn_array) - // Elements of a column are arrays of 1 element - assert(results.select("RunId").first()(0).isInstanceOf[Short]) - - // Test also that vector with one element gets converted to scalar - val resultsAlt = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fnAlt) - // Elements of a column are arrays of 1 element - assert(resultsAlt.select("RunId").first()(0).isInstanceOf[Short]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn_array) + // Elements of a column are arrays of 1 element + assert(results.select("RunId").first()(0).isInstanceOf[Short]) + + // Test also that vector with one element gets converted to scalar + val resultsAlt = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fnAlt) + // Elements of a column are arrays of 1 element + assert(resultsAlt.select("RunId").first()(0).isInstanceOf[Short]) + } } // Test if type cast is done correctly test("Type test: Do you see a Float?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - // Elements of a column are arrays of 1 element - assert(results.select("RA").first()(0).isInstanceOf[Float]) - - // Test also that vector with one element gets converted to scalar - val resultsAlt = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fnAlt) - // Elements of a column are arrays of 1 element - assert(resultsAlt.select("RA").first()(0).isInstanceOf[Float]) + fileFormats.foreach { + fileFormat => + val 
results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + // Elements of a column are arrays of 1 element + assert(results.select("RA").first()(0).isInstanceOf[Float]) + + // Test also that vector with one element gets converted to scalar + val resultsAlt = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fnAlt) + // Elements of a column are arrays of 1 element + assert(resultsAlt.select("RA").first()(0).isInstanceOf[Float]) + } } // Test if type cast is done correctly test("Type test: Do you see a Double?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - // Elements of a column are arrays of 1 element - assert(results.select("Dec").first()(0).isInstanceOf[Double]) - - // Test also that vector with one element gets converted to scalar - val resultsAlt = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fnAlt) - // Elements of a column are arrays of 1 element - assert(resultsAlt.select("Dec").first()(0).isInstanceOf[Double]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + // Elements of a column are arrays of 1 element + assert(results.select("Dec").first()(0).isInstanceOf[Double]) + + // Test also that vector with one element gets converted to scalar + val resultsAlt = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fnAlt) + // Elements of a column are arrays of 1 element + assert(resultsAlt.select("Dec").first()(0).isInstanceOf[Double]) + } } // Test if type cast is done correctly test("Type test: Do you see an Array(Long)?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn_array) - // Elements of a column are arrays of 1 element - assert(results.select("Index").schema(0).dataType.simpleString == "array") - assert(results.select("Index").take(1)(0)(0).asInstanceOf[Seq[Long]].size == 7) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn_array) + // Elements of a column are arrays of 1 element + assert(results.select("Index").schema(0).dataType.simpleString == "array") + assert(results.select("Index").take(1)(0)(0).asInstanceOf[Seq[Long]].size == 7) + } } // Test if type cast is done correctly test("Type test: Do you see an Array(Float)?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn_array) - // Elements of a column are arrays of 1 element - assert(results.select("RA").schema(0).dataType.simpleString == "array") - assert(results.select("RA").take(1)(0)(0).asInstanceOf[Seq[Float]].size == 2) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn_array) + // Elements of a column are arrays of 1 element + assert(results.select("RA").schema(0).dataType.simpleString == "array") + assert(results.select("RA").take(1)(0)(0).asInstanceOf[Seq[Float]].size == 2) + } } // Test if type cast is done correctly test("Type test: Do you see an Array(Double)?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn_array) - // Elements of a column are arrays of 1 element - assert(results.select("Dec").schema(0).dataType.simpleString == "array") - assert(results.select("Dec").take(1)(0)(0).asInstanceOf[Seq[Double]].size == 3) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + 
.load(fn_array) + // Elements of a column are arrays of 1 element + assert(results.select("Dec").schema(0).dataType.simpleString == "array") + assert(results.select("Dec").take(1)(0)(0).asInstanceOf[Seq[Double]].size == 3) + } } // Test if type cast is done correctly test("Type test: Do you see an Array(Int)?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 2) - .load(fn_array) - // Elements of a column are arrays of 1 element - assert(results.select("Index").schema(0).dataType.simpleString == "array") - assert(results.select("Index").take(1)(0)(0).asInstanceOf[Seq[Int]].size == 2) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 2) + .load(fn_array) + // Elements of a column are arrays of 1 element + assert(results.select("Index").schema(0).dataType.simpleString == "array") + assert(results.select("Index").take(1)(0)(0).asInstanceOf[Seq[Int]].size == 2) + } } // Test if type cast is done correctly test("Type test: Do you see an Array(Short)?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn_array) - // Elements of a column are arrays of 1 element - assert(results.select("RunIdArray").schema(0).dataType.simpleString == "array") - assert(results.select("RunIdArray").take(1)(0)(0).asInstanceOf[Seq[Int]].size == 3) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn_array) + // Elements of a column are arrays of 1 element + assert(results.select("RunIdArray").schema(0).dataType.simpleString == "array") + assert(results.select("RunIdArray").take(1)(0)(0).asInstanceOf[Seq[Int]].size == 3) + } } // Test if type cast is done correctly test("Type test: Do you see a String?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - // Elements of a column are arrays of 1 element - assert(results.select("target").first()(0).isInstanceOf[String]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + // Elements of a column are arrays of 1 element + assert(results.select("target").first()(0).isInstanceOf[String]) + } } // Test if type cast is done correctly test("Type test: Do you see a Byte?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fnUb) - // Elements of a column are arrays of 1 element - assert(results.select("unsigned bytes").first()(0).isInstanceOf[Byte]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fnUb) + // Elements of a column are arrays of 1 element + assert(results.select("unsigned bytes").first()(0).isInstanceOf[Byte]) + } } - } diff --git a/src/test/scala/com/astrolabsoftware/sparkfits/packageTest.scala b/src/test/scala/com/astrolabsoftware/sparkfits/packageTest.scala index e819ba4..05a8733 100644 --- a/src/test/scala/com/astrolabsoftware/sparkfits/packageTest.scala +++ b/src/test/scala/com/astrolabsoftware/sparkfits/packageTest.scala @@ -40,6 +40,8 @@ class packageTest extends FunSuite with BeforeAndAfterAll { private var spark : SparkSession = _ + private val fileFormats = List("com.astrolabsoftware.sparkfits", "fitsv2") + override protected def beforeAll() : Unit = { super.beforeAll() spark = SparkSession @@ -63,192 +65,243 @@ class packageTest extends FunSuite with BeforeAndAfterAll { // Test if readfits does nothing :D test("Readfits test: 
Do you send back a DataFrameReader?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - assert(results.isInstanceOf[DataFrameReader]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + assert(results.isInstanceOf[DataFrameReader]) + } } // Test if readfits does nothing :D test("Readfits test: Do you yout nickname?") { - val results = spark.read.format("fits") - assert(results.isInstanceOf[DataFrameReader]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + assert(results.isInstanceOf[DataFrameReader]) + } } // Test DataFrame test("DataFrame test: can you really make a DF from the hdu?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - assert(results.isInstanceOf[DataFrame]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + assert(results.isInstanceOf[DataFrame]) + } } // Test DataFrame test("User schema test: can you really take an external header?") { - // Specify manually the header - val schema = StructType( - List( - StructField("toto", StringType, true), - StructField("tutu", FloatType, true), - StructField("tata", DoubleType, true), - StructField("titi", LongType, true), - StructField("tete", IntegerType, true) - ) - ) - - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .schema(schema) - .load(fn) - assert(results.columns.deep == Array("toto", "tutu", "tata", "titi", "tete").deep) + fileFormats.foreach { + fileFormat => + // Specify manually the header + val schema = StructType( + List( + StructField("toto", StringType, true), + StructField("tutu", FloatType, true), + StructField("tata", DoubleType, true), + StructField("titi", LongType, true), + StructField("tete", IntegerType, true) + ) + ) + + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .schema(schema) + .load(fn) + assert(results.columns.deep == Array("toto", "tutu", "tata", "titi", "tete").deep) + } } // Test Data distribution test("Data distribution test: Can you count all elements?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - assert(results.select(col("Index")).count().toInt == 20000) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + assert(results.select(col("Index")).count().toInt == 20000) + } } test("Data distribution test: Can you sum up all elements?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - assert( - results.select( - col("Index")).rdd - .map(_(0).asInstanceOf[Long]) - .reduce(_+_) == 199990000) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + assert( + results.select( + col("Index")).rdd + .map(_ (0).asInstanceOf[Long]) + .reduce(_ + _) == 199990000) + } } test("Data distribution test: Do you pass over all blocks?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .option("recordlength", 16 * 1024) - .load(fn) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .option("recordlength", 16 * 1024) + .load(fn) - val count = results.select(col("Index")).count().toInt - val count_unique = results.select(col("Index")).distinct().count().toInt + val count = 
results.select(col("Index")).count().toInt + val count_unique = results.select(col("Index")).distinct().count().toInt - assert(count == count_unique) + assert(count == count_unique) + } } test("Header printing test") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .option("verbose", true) - .option("recordlength", 16 * 1024) - - // Finally print the header and exit. - assert(results.load(fn).isInstanceOf[DataFrame]) + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .option("verbose", true) + .option("recordlength", 16 * 1024) + + // Finally print the header and exit. + assert(results.load(fn).isInstanceOf[DataFrame]) + } } test("Multi files test: Can you read several FITS file?") { - val fn = "src/test/resources/dir" - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .option("verbose", true) - .option("recordlength", 16 * 1024) - - assert(results.load(fn).isInstanceOf[DataFrame]) - assert(results.load(fn).count() == 27000) + fileFormats.foreach { + fileFormat => + val fn = "src/test/resources/dir" + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .option("verbose", true) + .option("recordlength", 16 * 1024) + + assert(results.load(fn).isInstanceOf[DataFrame]) + assert(results.load(fn).count() == 27000) + } } test("Multi files test: Can you read several FITS file (glob)?") { - val fn = "src/test/resources/dir/*.fits" - val df = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .option("verbose", true) - .option("recordlength", 16 * 1024) - .load(fn) - - assert(df.isInstanceOf[DataFrame]) - assert(df.count() == 27000) + fileFormats.foreach { + fileFormat => + val fn = "src/test/resources/dir/*.fits" + val df = spark.read.format(fileFormat) + .option("hdu", 1) + .option("verbose", true) + .option("recordlength", 16 * 1024) + .load(fn) + + assert(df.isInstanceOf[DataFrame]) + assert(df.count() == 27000) + } } test("Multi files test: Can you read several FITS file (comma-separated)?") { - val fn = "src/test/resources/dir/test_file.fits,src/test/resources/dir/test_file2.fits" - val df = spark.read.format("fits") - .option("hdu", 1) - .load(fn) - - assert(df.isInstanceOf[DataFrame]) - assert(df.count() == 27000) + (fileFormats ++ List("fits")).foreach { + fileFormat => + val fn = "src/test/resources/dir/test_file.fits,src/test/resources/dir/test_file2.fits" + val df = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + + assert(df.isInstanceOf[DataFrame]) + assert(df.count() == 27000) + } } test("Multi files test: Can you detect an error in reading different FITS file [FAILFAST]?") { - val fn = "src/test/resources/dirNotOk" - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .option("verbose", true) - .option("mode", "FAILFAST") - .option("recordlength", 16 * 1024) - - val exception = intercept[AssertionError] { - results.load(fn).count - } - - assert(exception.getMessage.contains("different structures")) + fileFormats.foreach { + fileFormat => + val fn = "src/test/resources/dirNotOk" + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .option("verbose", true) + .option("mode", "FAILFAST") + .option("recordlength", 16 * 1024) + + val exception = intercept[AssertionError] { + results.load(fn).count + } + + assert(exception.getMessage.contains("different structures")) + } } test("Multi files test: Can you read several FITS file (image) discarding empty 
ones?") { - val fn = "src/test/resources/dirIm" - val df = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 2) - .option("verbose", true) - .load(fn) + fileFormats.foreach { + fileFormat => + val fn = "src/test/resources/dirIm" + val df = spark.read.format(fileFormat) + .option("hdu", 2) + .option("verbose", true) + .load(fn) - df.count() + df.count() - assert(df.isInstanceOf[DataFrame]) + assert(df.isInstanceOf[DataFrame]) + } } test("Multi files test: Can you read several FITS file (image) discarding empty ones + set recordLength?") { - val fn = "src/test/resources/dirIm" - val df = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 2) - .option("verbose", true) - .option("recordlength", 2 * 1024) - .load(fn) - - df.count() - - assert(df.isInstanceOf[DataFrame]) + fileFormats.foreach { + fileFormat => + val fn = "src/test/resources/dirIm" + val df = spark.read.format(fileFormat) + .option("hdu", 2) + .option("verbose", true) + .option("recordlength", 2 * 1024) + .load(fn) + + df.count() + + assert(df.isInstanceOf[DataFrame]) + } } test("Multi files test: Can you read several FITS file (image), and fail if there are empty ones??") { - val fn = "src/test/resources/dirIm/*.fits" - val df = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 2) - .option("verbose", true) - .option("mode", "FAILFAST") - .load(fn) - - val exception = intercept[AssertionError] { - df.count() + fileFormats.foreach { + fileFormat => + val fn = "src/test/resources/dirIm/*.fits" + val df = spark.read.format(fileFormat) + .option("hdu", 2) + .option("verbose", true) + .option("mode", "FAILFAST") + + // in V2 schema is loaded while the data frame is created + val exception = intercept[AssertionError] { + df.load(fn).count + } + + assert(exception.getMessage.contains("You are trying to add HDU data with different structures!")) } - - assert(exception.getMessage.contains("You are trying to add HDU data with different structures!")) } test("No file test: Can you detect an error if there is no input FITS file found?") { - val fn = "src/test/resources/dirfjsdhf" - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .option("verbose", true) - .option("recordlength", 16 * 1024) - - val exception = intercept[NullPointerException] { - results.load(fn) - } - - assert(exception.getMessage.contains("0 files detected")) + fileFormats.foreach { + fileFormat => + val fn = "src/test/resources/dirfjsdhf" + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .option("verbose", true) + .option("recordlength", 16 * 1024) + + val exception = intercept[NullPointerException] { + results.load(fn) + } + + assert(exception.getMessage.contains("0 files detected")) + } } // Test ordering of elements in the DF test("Ordering test: Is the first element of the DF correct?") { - val results = spark.read.format("com.astrolabsoftware.sparkfits") - .option("hdu", 1) - .load(fn) - assert(results.select(col("target")).first.getString(0) == "NGC0000000") + fileFormats.foreach { + fileFormat => + val results = spark.read.format(fileFormat) + .option("hdu", 1) + .load(fn) + assert(results.select(col("target")).first.getString(0) == "NGC0000000") + } } } From 10ab44487dde333208930b24e7f5a07288c5bb15 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 30 Dec 2019 18:37:03 +0530 Subject: [PATCH 15/16] Code cleanup --- .../sparkfits/v2/FitsPartitionReader.scala | 17 ++--------------- .../sparkfits/v2/FitsTable.scala | 4 ++-- 2 files changed, 4 
insertions(+), 17 deletions(-) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala index 57aa3a1..15be2ed 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsPartitionReader.scala @@ -52,18 +52,12 @@ class FitsPartitionReader[T <: InternalRow]( private def setCurrentFileParams(): Unit = { if (!currentFitsMetadata.isDefined || currentFitsMetadata.get.index != currentFileIndex) { - println( - s""" - | Info: - | Number of files: ${partition.files.size} - | Index: ${partition.index} - |""".stripMargin) currentFitsMetadata = Option(new FitsMetadata(partition.files(currentFileIndex), currentFileIndex, conf)) fits = currentFitsMetadata.get.fits } } + override def next(): Boolean = { - println("Getting next record") // We are done reading all the files in the partition if (currentFileIndex > partition.files.size-1) { return false @@ -90,14 +84,7 @@ class FitsPartitionReader[T <: InternalRow]( fits.data.readFully(recordValueBytes, 0, currentFitsMetadata.get.rowSizeInt) // FixMe: We can just directly read the rows as UnsafeRow to avoid unnecessary conversion // back and forth -// currentRow = InternalRow.fromSeq( -// fits.getRow(recordValueBytes).map(CatalystTypeConverters.convertToCatalyst(_))) - -// currentRow = InternalRow.fromSeq( -// fits.getRow(recordValueBytes).map(converters.toRow(_))) - currentRow = converters.toRow(Row.fromSeq(recordValueBytes)) - - println("Got record") + currentRow = converters.toRow(Row.fromSeq(fits.getRow(recordValueBytes))) true } diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala index 6f48c36..228918c 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/v2/FitsTable.scala @@ -87,8 +87,8 @@ case class FitsTable( getSchema(fits) } - // We don't really have the notion of table name FITS. So just returning the location - override def name(): String = s"FITS Table: ${options.get("path")}" + // We don't really have the notion of table name in FITS. So just returning the location + override def name(): String = s"${options.get("path")}" // Here we define, functionality supported by FITS datasource override def capabilities: java.util.Set[TableCapability] = Set(BATCH_READ).asJava From 6bfbd706de83c877be48013d4a324ee4e2c8f30e Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 13 Jan 2020 13:24:40 +0530 Subject: [PATCH 16/16] Dummy commit --- .../com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala index 2bc3c6b..0a2130b 100644 --- a/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala +++ b/src/main/scala/com/astrolabsoftware/sparkfits/utils/FitsMetadata.scala @@ -73,7 +73,8 @@ class FitsMetadata(partitionedFile: PartitionedFile, val index: Int, conf: Confi // For Table, seek for a round number of lines for the record - // ToDo: Cases when the user has given the record length + // ToDo: Cases when the user has given the record length. Currenty, this + // recordLength is not getting used. 
recordLength = (recordLengthFromUser / rowSizeInt) * rowSizeInt // Make sure that the recordLength is not bigger than the block size!
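
For reference, the parametrized tests above drive the same read path through both implementations: the original data source ("com.astrolabsoftware.sparkfits") and the new V2 one registered as "fitsv2". A minimal, self-contained sketch of that usage follows; the input path is one of the test resources referenced in this patch, the hdu and recordlength values mirror the options used in the tests, and it assumes the spark-fits jar is on the classpath.

    import org.apache.spark.sql.SparkSession

    object ReadFitsSketch {
      def main(args: Array[String]): Unit = {
        // Local session only for illustration; any existing session works as well.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("spark-fits-v2-sketch")
          .getOrCreate()

        // Test resource used elsewhere in this patch; substitute any FITS file
        // whose HDU 1 contains a binary table.
        val path = "src/test/resources/dir/test_file.fits"

        // Same read issued against the V1 and the V2 data source, as in the tests.
        Seq("com.astrolabsoftware.sparkfits", "fitsv2").foreach { fmt =>
          val df = spark.read.format(fmt)
            .option("hdu", 1)                  // index of the HDU to expose as a DataFrame
            .option("recordlength", 16 * 1024) // optional read chunk size in bytes
            .load(path)
          println(s"$fmt -> ${df.count()} rows")
        }

        spark.stop()
      }
    }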
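
The record-length handling touched by the last hunk rounds the requested length down to a whole number of table rows, so a record never cuts a row in half (and, as the added ToDo notes, this recordLength is currently not used). A small worked example of that rounding, with a purely hypothetical row size:

    // recordLengthFromUser matches the "recordlength" option used in the tests;
    // rowSizeInt is hypothetical here (in the real code it comes from the HDU header).
    val recordLengthFromUser = 16 * 1024 // 16384 bytes requested
    val rowSizeInt = 52                  // hypothetical bytes per row

    // Truncate to a whole number of rows: 16384 / 52 = 315 rows
    val recordLength = (recordLengthFromUser / rowSizeInt) * rowSizeInt
    // recordLength == 315 * 52 == 16380 bytes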