[WIP][SQL] Encapsulate type operations #51467


Draft: wants to merge 5 commits into base: master
@@ -22,9 +22,9 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, CalendarIntervalEncoder, NullEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, SparkDecimalEncoder, VariantEncoder}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType, PhyTypeOps}
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, UserDefinedType, VariantType, YearMonthIntervalType}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}

/**
@@ -99,10 +99,10 @@ object EncoderUtils {

def dataTypeJavaClass(dt: DataType): Class[_] = {
dt match {
case _ if PhyTypeOps.supports(dt) => PhyTypeOps(dt).getJavaClass
@MaxGekk (Member, Author) commented on Jul 14, 2025:

In the future, we will get an Ops object here and match on it instead of calling PhyTypeOps.supports. The current implementation is just a workaround to avoid passing a TypeOps instead of a DataType.
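A minimal sketch of that future shape (the TypeOps.lift helper used here is hypothetical, not part of this PR):

    // Hypothetical: resolve the Ops object once, then match on it
    // instead of guarding with PhyTypeOps.supports(dt).
    def dataTypeJavaClass(dt: DataType): Class[_] =
      TypeOps.lift(dt) match {
        case Some(ops: PhyTypeOps) => ops.getJavaClass
        case _ => dt match {
          case _: DecimalType => classOf[Decimal]
          case _: StringType => classOf[UTF8String]
          case _ => classOf[Object] // remaining cases elided in this sketch
        }
      }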

case _: DecimalType => classOf[Decimal]
case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType]
case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType]
case _: TimeType => classOf[PhysicalLongType.InternalType]
case _: StringType => classOf[UTF8String]
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.types.PhyTypeOps
import org.apache.spark.sql.types._

/**
@@ -196,10 +197,11 @@ final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGen

@tailrec
private[this] def dataTypeToMutableValue(dataType: DataType): MutableValue = dataType match {
case _ if PhyTypeOps.supports(dataType) => PhyTypeOps(dataType).getMutableValue
// We use INT for DATE and YearMonthIntervalType internally
case IntegerType | DateType | _: YearMonthIntervalType => new MutableInt
// We use Long for Timestamp, Timestamp without time zone and DayTimeInterval internally
case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType | _: TimeType =>
case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType =>
new MutableLong
case FloatType => new MutableFloat
case DoubleType => new MutableDouble
@@ -1985,11 +1985,12 @@ object CodeGenerator extends Logging {

@tailrec
def javaClass(dt: DataType): Class[_] = dt match {
case _ if PhyTypeOps.supports(dt) => PhyTypeOps(dt).getJavaClass
case BooleanType => java.lang.Boolean.TYPE
case ByteType => java.lang.Byte.TYPE
case ShortType => java.lang.Short.TYPE
case IntegerType | DateType | _: YearMonthIntervalType => java.lang.Integer.TYPE
case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType | _: TimeType =>
case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType =>
java.lang.Long.TYPE
case FloatType => java.lang.Float.TYPE
case DoubleType => java.lang.Double.TYPE
@@ -188,6 +188,7 @@ object Literal {
* Create a literal with default value for given DataType
*/
def default(dataType: DataType): Literal = dataType match {
case _ if LiteralTypeOps.supports(dataType) => LiteralTypeOps(dataType).getDefaultLiteral
case NullType => create(null, NullType)
case BooleanType => Literal(false)
case ByteType => Literal(0.toByte)
@@ -200,7 +201,6 @@
case DateType => create(0, DateType)
case TimestampType => create(0L, TimestampType)
case TimestampNTZType => create(0L, TimestampNTZType)
case t: TimeType => create(0L, t)
case it: DayTimeIntervalType => create(0L, it)
case it: YearMonthIntervalType => create(0, it)
case CharType(length) =>
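The removed TimeType case and the new guard above are intended to produce the same default literal; a minimal illustration (assuming the TimeTypeOps defined later in this diff, and that TimeType() builds the default-precision type):

    // Before this change: case t: TimeType => create(0L, t)
    // After: the guard at the top of the match delegates to the Ops object.
    Literal.default(TimeType())
    // == LiteralTypeOps(TimeType()).getDefaultLiteral
    // == Literal.create(0L, TimeType())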
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.types

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{DataType, TimeType}

// Literal operations over Catalyst's types
trait LiteralTypeOps {
// Gets a literal with default value of the type
def getDefaultLiteral: Literal
}

object LiteralTypeOps {
def supports(dt: DataType): Boolean = dt match {
case _: TimeType => true
case _ => false
}
Comment on lines +30 to +33
Contributor commented:

This looks like duplicate code compared to PhyTypeOps, i.e. something we wanted to avoid in the first place (adding TimeType to a bunch of matches).
def apply(dt: DataType): LiteralTypeOps = TypeOps(dt).asInstanceOf[LiteralTypeOps]
}
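One way to avoid the duplication pointed out above is to derive supports from a single central match rather than repeating the TimeType case per trait; a sketch, using a hypothetical TypeOps.lift spelled out after PhyTypeOps below:

    object LiteralTypeOps {
      // Hypothetical: defer the type match to one place in TypeOps.
      def supports(dt: DataType): Boolean =
        TypeOps.lift(dt).exists(_.isInstanceOf[LiteralTypeOps])

      def apply(dt: DataType): LiteralTypeOps = TypeOps(dt).asInstanceOf[LiteralTypeOps]
    }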
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.types

import org.apache.spark.sql.catalyst.expressions.MutableValue
import org.apache.spark.sql.types.{DataType, TimeType}

// Physical operations over Catalyst's types.
trait PhyTypeOps extends TypeOps {
// Gets the underlying physical type
def getPhysicalType: PhysicalDataType

// Gets the Java class of the physical type
def getJavaClass: Class[_]

// Gets a mutable container for the physical type
def getMutableValue: MutableValue
}

object PhyTypeOps {
def supports(dt: DataType): Boolean = dt match {
case _: TimeType => true
case _ => false
}
Comment on lines +36 to +39
Contributor commented:

Even this is somewhat of a duplicate, given that we already match TimeType as part of TypeOps. The ideal scenario would be to do this logic only once, without adding TimeType to several matches.
def apply(dt: DataType): PhyTypeOps = TypeOps(dt).asInstanceOf[PhyTypeOps]
}
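Along the same lines, the whole DataType-to-Ops match could live in exactly one place, with an Option-returning factory next to the throwing one; a sketch only (lift is hypothetical, not part of this PR):

    object TypeOps {
      // The single match that knows which types have Ops objects.
      def lift(dt: DataType): Option[TypeOps] = dt match {
        case tt: TimeType => Some(TimeTypeOps(tt))
        case _ => None
      }
    }

    object PhyTypeOps {
      // Membership is derived, so TimeType is matched in only one place.
      def supports(dt: DataType): Boolean =
        TypeOps.lift(dt).exists(_.isInstanceOf[PhyTypeOps])

      def apply(dt: DataType): PhyTypeOps = TypeOps(dt).asInstanceOf[PhyTypeOps]
    }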
@@ -23,7 +23,7 @@ import scala.reflect.runtime.universe.typeTag
import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, InterpretedOrdering, SortOrder}
import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory, MapData, SQLOrderingUtil}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, TimeType, VarcharType, VariantType, YearMonthIntervalType}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, VarcharType, VariantType, YearMonthIntervalType}
import org.apache.spark.unsafe.types.{ByteArray, UTF8String, VariantVal}
import org.apache.spark.util.ArrayImplicits._

@@ -35,6 +35,7 @@ sealed abstract class PhysicalDataType {

object PhysicalDataType {
def apply(dt: DataType): PhysicalDataType = dt match {
case _ if PhyTypeOps.supports(dt) => PhyTypeOps(dt).getPhysicalType
case NullType => PhysicalNullType
case ByteType => PhysicalByteType
case ShortType => PhysicalShortType
@@ -54,7 +55,6 @@
case DayTimeIntervalType(_, _) => PhysicalLongType
case YearMonthIntervalType(_, _) => PhysicalIntegerType
case DateType => PhysicalIntegerType
case _: TimeType => PhysicalLongType
case ArrayType(elementType, containsNull) => PhysicalArrayType(elementType, containsNull)
case StructType(fields) => PhysicalStructType(fields)
case MapType(keyType, valueType, valueContainsNull) =>
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.types

import org.apache.spark.sql.catalyst.expressions.{Literal, MutableLong, MutableValue}
import org.apache.spark.sql.types.TimeType

case class TimeTypeOps(t: TimeType)
extends TypeOps
with PhyTypeOps
with LiteralTypeOps {

override def getPhysicalType: PhysicalDataType = PhysicalLongType
override def getJavaClass: Class[_] = classOf[PhysicalLongType.InternalType]
override def getMutableValue: MutableValue = new MutableLong

override def getDefaultLiteral: Literal = Literal.create(0L, t)
}
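An illustrative walk through the new Ops object via the entry points this diff touches (results follow from the definitions above; assumes TimeType() builds the default-precision type):

    val ops = PhyTypeOps(TimeType()) // resolves to TimeTypeOps via the TypeOps factory
    ops.getPhysicalType              // PhysicalLongType
    ops.getJavaClass                 // classOf[PhysicalLongType.InternalType]
    ops.getMutableValue              // a fresh MutableLong
    LiteralTypeOps(TimeType()).getDefaultLiteral // Literal.create(0L, TimeType())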
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.types

import org.apache.spark.SparkException
import org.apache.spark.sql.types.{DataType, TimeType}

// Operations over a data type
trait TypeOps

// The factory of type operation objects.
object TypeOps {
def apply(dt: DataType): TypeOps = dt match {
case tt: TimeType => TimeTypeOps(tt)
case other => throw SparkException.internalError(
s"Cannot create an operation object of the type ${other.sql}")
}
}
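For a type without an Ops object the factory fails fast, so call sites are expected to guard with supports first (illustrative):

    TypeOps(TimeType()) // ok: a TimeTypeOps wrapping the given TimeType
    // TypeOps(IntegerType) would throw
    //   SparkException.internalError("Cannot create an operation object of the type INT")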