Commit

Don't filter integer columns with non-integer values (float/double, decimal)

johanl-db committed Nov 27, 2023
1 parent a392ef5 commit 1f88c4f
Showing 2 changed files with 17 additions and 14 deletions.

ParquetFilters.scala

@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.{Date, Timestamp}

@@ -614,7 +614,10 @@ class ParquetFilters(
       case ParquetBooleanType => value.isInstanceOf[JBoolean]
       case ParquetIntegerType if value.isInstanceOf[Period] => true
       case ParquetByteType | ParquetShortType | ParquetIntegerType => value match {
-        case v: Number => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
+        // Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type Int.
+        // We don't create a filter if the value would overflow.
+        case _: JByte | _: JShort | _: Integer => true
+        case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
         case _ => false
       }
       case ParquetLongType => value.isInstanceOf[JLong] || value.isInstanceOf[Duration]
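For context, the check tightened here decides whether a pushdown value may produce a filter on a byte/short/int column, all of which are stored as INT32 in Parquet. The previous `case v: Number` branch accepted any Number, including Double or BigDecimal values that the filter builders then narrow to an Int, so a predicate such as cint < 1.5 could end up pushed down as cint < 1 and skip data it should not. The new match only accepts genuinely integral boxed types, plus a Long that fits in the Int range. A minimal standalone sketch of that logic (the object and method names below are illustrative, not Spark API):

import java.lang.{Byte => JByte, Long => JLong, Short => JShort}

object IntColumnPushdownSketch {
  // Illustrative re-statement of the new check: only integral boxed values that
  // fit in an Int may produce a filter on an INT32-backed Parquet column.
  def canBuildIntFilter(value: Any): Boolean = value match {
    case _: JByte | _: JShort | _: Integer => true
    case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
    case _ => false // Float, Double, BigDecimal, String, ... build no filter
  }

  def main(args: Array[String]): Unit = {
    println(canBuildIntFilter(Integer.valueOf(1)))              // true
    println(canBuildIntFilter(JLong.valueOf(1L)))               // true
    println(canBuildIntFilter(JLong.valueOf(JLong.MAX_VALUE)))  // false: overflows Int
    println(canBuildIntFilter(java.lang.Double.valueOf(1.5)))   // false: non-integer type
    println(canBuildIntFilter(new java.math.BigDecimal("1.5"))) // false: non-integer type
  }
}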

ParquetFilterSuite.scala

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
-import java.lang.{Long => JLong}
+import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}

@@ -919,7 +919,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared

     for {
       column <- Seq("cbyte", "cshort", "cint")
-      value <- Seq(JLong.MAX_VALUE, JLong.MIN_VALUE): Seq[JLong]
+      value <- Seq(JLong.MAX_VALUE, JLong.MIN_VALUE).map(JLong.valueOf)
     } {
       val filters = Seq(
         sources.LessThan(column, value),
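The generators above feed boxed java.lang.Long bounds to the cbyte/cshort/cint columns; the check in ParquetFilters matches on the value's runtime class and rejects both bounds because they fall outside the Int range, so no filter should be created. A small standalone sketch of the values the test produces (illustrative object name, not Spark API):

import java.lang.{Long => JLong}

object OverflowTestValuesSketch {
  def main(args: Array[String]): Unit = {
    // Box the Long bounds explicitly, as the updated test does.
    val values: Seq[JLong] = Seq(JLong.MAX_VALUE, JLong.MIN_VALUE).map(JLong.valueOf)
    values.foreach { v =>
      val fitsInInt = v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
      // Prints java.lang.Long for the class and false for the range check on both values.
      println(s"${v.getClass.getName} $v fits in Int: $fitsInInt")
    }
  }
}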

@@ -958,18 +958,18 @@
     val parquetFilters = createParquetFilters(parquetSchema)
 
     val filters = Seq(
-      sources.LessThan("cbyte", "1"),
-      sources.LessThan("cshort", "1"),
-      sources.LessThan("cint", "1"),
-      sources.LessThan("clong", "1"),
-      sources.LessThan("cfloat", 1.0D),
-      sources.LessThan("cdouble", 1.0F),
-      sources.LessThan("cboolean", "true"),
-      sources.LessThan("cstring", 1),
+      sources.LessThan("cbyte", String.valueOf("1")),
+      sources.LessThan("cshort", JBigDecimal.valueOf(1)),
+      sources.LessThan("cint", JFloat.valueOf(JFloat.NaN)),
+      sources.LessThan("clong", String.valueOf("1")),
+      sources.LessThan("cfloat", JDouble.valueOf(1.0D)),
+      sources.LessThan("cdouble", JFloat.valueOf(1.0F)),
+      sources.LessThan("cboolean", String.valueOf("true")),
+      sources.LessThan("cstring", Integer.valueOf(1)),
       sources.LessThan("cdate", Timestamp.valueOf("2018-01-01 00:00:00")),
       sources.LessThan("ctimestamp", Date.valueOf("2018-01-01")),
-      sources.LessThan("cbinary", 1),
-      sources.LessThan("cdecimal", 1234)
+      sources.LessThan("cbinary", Integer.valueOf(1)),
+      sources.LessThan("cdecimal", Integer.valueOf(1234))
     )
     for (filter <- filters) {
       assert(parquetFilters.createFilter(filter).isEmpty,
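For reference, the filters in this test are built with the public org.apache.spark.sql.sources filter API; each one pairs a column with a value whose runtime type does not match the column's Parquet type, and after this change ParquetFilters.createFilter is expected to return None for every one of them (the predicate is simply not pushed to Parquet; Spark still evaluates it, so query results are unaffected). A construction-only sketch of a few such filters, assuming spark-sql is on the classpath; it does not touch the internal ParquetFilters class:

import java.math.{BigDecimal => JBigDecimal}
import org.apache.spark.sql.sources

object MismatchedFilterExamples {
  def main(args: Array[String]): Unit = {
    // A few of the mismatched (column, value) pairs from the test above:
    // the value's runtime class is not the integral type the column stores.
    val filters = Seq(
      sources.LessThan("cint", java.lang.Float.valueOf(java.lang.Float.NaN)),
      sources.LessThan("cshort", JBigDecimal.valueOf(1)),
      sources.LessThan("cbyte", String.valueOf("1"))
    )
    // sources.Filter instances are plain case classes, so they print readably.
    filters.foreach(println)
  }
}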
