Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#5201] feat(client-python): Implement expressions in python client #5646

Merged
merged 25 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bade12b
[5201]Implement expressions in python client
SophieTech88 Nov 22, 2024
3fc7400
Update the script for named_reference and function_expression
SophieTech88 Nov 23, 2024
00d31e8
Update the distribution.py
SophieTech88 Nov 24, 2024
c21bba5
Update literals.py
SophieTech88 Nov 24, 2024
361acbc
Update sorts.py
SophieTech88 Nov 24, 2024
07a8a87
Update the transforms.py
SophieTech88 Nov 25, 2024
2bc8983
Fix a f string bug for distributions.py
SophieTech88 Nov 25, 2024
4bac456
Update the comment for Partition class and so on
SophieTech88 Nov 25, 2024
40a14ee
improve model structure
xunliu Nov 25, 2024
2a25eb4
Update the script and fix a name bug
SophieTech88 Nov 26, 2024
3bc79ca
Update the Literal class to raise NotImplementedError
SophieTech88 Nov 26, 2024
9fe8d6c
remove Literals folder
xunliu Nov 26, 2024
cbb2a55
Add literals folder
xunliu Nov 26, 2024
624929e
Update licenses headers and format
SophieTech88 Nov 26, 2024
c3d8271
Remove distributions, sorts, and transforms to separate PRs
SophieTech88 Nov 26, 2024
6042765
Update the data type in literals.py
SophieTech88 Nov 27, 2024
45ae0c3
imporve type in the literal.py
xunliu Dec 2, 2024
88acd36
fix CI
xunliu Dec 2, 2024
2c0e316
delete Decimal
xunliu Dec 3, 2024
8a28e7d
fix CI
xunliu Dec 3, 2024
3750bbf
Update the typing for list and set
SophieTech88 Dec 4, 2024
82333a7
Fix CI: import List in literal.py
SophieTech88 Dec 4, 2024
89b56bf
remove pass
xunliu Dec 4, 2024
d3dc38a
remove List
xunliu Dec 4, 2024
c68c530
Revert "remove List"
xunliu Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from abc import abstractmethod
from typing import List

from gravitino.api.expressions.expression import Expression


class Literal(Expression):
"""
Represents a constant literal value in the public expression API.
"""

@abstractmethod
def value(self):
"""The literal value."""
pass

@abstractmethod
def data_type(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like that the above two methods are be implemented by subclasses anyway.
If that is true, I don't think we we do a pass here.
We may want to raise an exception instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Just updated the code to raise NotImplementedError() for those 2 functions.

"""The data type of the literal."""
pass

def children(self) -> List[Expression]:
return Expression.EMPTY_EXPRESSION
138 changes: 138 additions & 0 deletions clients/client-python/gravitino/api/expressions/Literals/literals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from decimal import Decimal
from typing import Union
from datetime import date, time, datetime

from gravitino.api.expressions.literals.literal import Literal


class LiteralImpl(Literal):
"""Creates a literal with the given type value."""

_value: Union[int, float, str, datetime, time, date, bool, Decimal, None]
_data_type: (
str # TODO: Need implement `api/src/main/java/org/apache/gravitino/rel/types`
)

def __init__(
self,
value: Union[int, float, str, datetime, time, date, bool, Decimal, None],
SophieTech88 marked this conversation as resolved.
Show resolved Hide resolved
data_type: str,
):
self._value = value
self._data_type = data_type

def value(self) -> Union[int, float, str, datetime, time, date, bool]:
return self._value

def data_type(self) -> str:
return self._data_type

def __eq__(self, other: object) -> bool:
if not isinstance(other, LiteralImpl):
return False
return (self._value == other._value) and (self._data_type == other._data_type)

def __hash__(self):
return hash((self._value, self._data_type))

def __str__(self):
return f"LiteralImpl(value={self._value}, data_type={self._data_type})"


class Literals:
"""The helper class to create literals to pass into Apache Gravitino."""

NULL = LiteralImpl(None, "NullType")

@staticmethod
def of(value, data_type) -> Literal:
return LiteralImpl(value, data_type)

@staticmethod
def boolean_literal(value: bool) -> Literal:
return LiteralImpl(value, "Boolean")

@staticmethod
def byte_literal(value: int) -> Literal:
return LiteralImpl(value, "Byte")

@staticmethod
def unsigned_byte_literal(value: int) -> Literal:
return LiteralImpl(value, "Unsigned Byte")

@staticmethod
def short_literal(value: int) -> Literal:
return LiteralImpl(value, "Short")

@staticmethod
def unsigned_short_literal(value: int) -> Literal:
return LiteralImpl(value, "Unsigned Short")

@staticmethod
def integer_literal(value: int) -> Literal:
return LiteralImpl(value, "Integer")

@staticmethod
def unsigned_integer_literal(value: int) -> Literal:
return LiteralImpl(value, "Unsigned Integer")

@staticmethod
def long_literal(value: int) -> Literal:
return LiteralImpl(value, "Long")

@staticmethod
def unsigned_long_literal(value: Decimal) -> Literal:
return LiteralImpl(value, "Unsigned Long")

@staticmethod
def float_literal(value: float) -> Literal:
return LiteralImpl(value, "Float")

@staticmethod
def double_literal(value: float) -> Literal:
return LiteralImpl(value, "Double")

@staticmethod
def decimal_literal(value: float) -> Literal:
return LiteralImpl(value, "Decimal")

@staticmethod
def date_literal(value: date) -> Literal:
return LiteralImpl(value, "Date")

@staticmethod
def time_literal(value: time) -> Literal:
return LiteralImpl(value, "Time")

@staticmethod
def timestamp_literal(value: datetime) -> Literal:
return LiteralImpl(value, "Timestamp")

@staticmethod
def timestamp_literal_from_string(value: str) -> Literal:
return Literals.timestamp_literal(datetime.fromisoformat(value))

@staticmethod
def string_literal(value: str) -> Literal:
return LiteralImpl(value, "String")

@staticmethod
def varchar_literal(length: int, value: str) -> Literal:
return LiteralImpl(value, f"Varchar({length})")
16 changes: 16 additions & 0 deletions clients/client-python/gravitino/api/expressions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List

from gravitino.api.expressions.distributions.strategy import Strategy
from gravitino.api.expressions.expression import Expression


class Distribution(Expression):
"""An interface that defines how data is distributed across partitions."""

def strategy(self) -> Strategy:
"""
Return the distribution strategy name.
"""
raise NotImplementedError

def number(self) -> int:
"""
Return the number of buckets/distribution.
For example, if the distribution strategy is HASH
* and the number is 10, then the data is distributed across 10 buckets.
"""
raise NotImplementedError

def expressions(self) -> List[Expression]:
"""Return the expressions passed to the distribution function."""
raise NotImplementedError

def children(self) -> List[Expression]:
return self.expressions()

def equals(self, distribution) -> bool:
"""
Indicates whether some other object is "equal to" this one.

Args:
distribution The reference distribution object with which to compare.

Returns:
Returns true if this object is the same as the obj argument; false otherwise.
"""
if distribution is None:
return False

return (
self.strategy() == distribution.strategy()
and self.number() == distribution.number()
and self.expressions() == distribution.expressions()
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import List, Tuple

from gravitino.api.expressions.distributions.distribution import Distribution
from gravitino.api.expressions.distributions.strategy import Strategy
from gravitino.api.expressions.expression import Expression
from gravitino.api.expressions.named_reference import NamedReference


class DistributionImpl(Distribution):
"""
Create a distribution on columns. Like distribute by (a) or (a, b), for complex like
distributing by (func(a), b) or (func(a), func(b))
"""

_strategy: Strategy
_number: int
_expressions: List[Expression]

def __init__(self, strategy: Strategy, number: int, expressions: List[Expression]):
self._strategy = strategy
self._number = number
self._expressions = expressions

def strategy(self) -> Strategy:
return self._strategy

def number(self) -> int:
return self._number

def expressions(self) -> List[Expression]:
return self._expressions

def __str__(self):
return f"DistributionImpl(strategy={self._strategy}, number={self._number}, expressions={self._expressions})"

def __eq__(self, other):
if self is other:
return True
if other is None or not isinstance(other, DistributionImpl):
return False
return (
self._strategy == other.strategy()
and self._number == other.number()
and self._expressions == other.expressions()
)

def __hash__(self):
return hash((self._strategy, self._number, tuple(self._expressions)))


# Helper methods to create distributions
class Distributions:
NONE = DistributionImpl(Strategy.NONE, 0, [])
"""NONE is used to indicate that there is no distribution."""
HASH = DistributionImpl(Strategy.HASH, 0, [])
"""List bucketing strategy hash, TODO: #1505 Separate the bucket number from the Distribution."""
RANGE = DistributionImpl(Strategy.RANGE, 0, [])
"""List bucketing strategy range, TODO: #1505 Separate the bucket number from the Distribution."""

@staticmethod
def even(number: int, *expressions) -> Distribution:
"""
Create a distribution by evenly distributing the data across the number of buckets.

:param number: The number of buckets
:param expressions: The expressions to distribute by
:return: The created even distribution
"""
return DistributionImpl(Strategy.EVEN, number, list(expressions))

@staticmethod
def hash(number: int, *expressions) -> Distribution:
"""
Create a distribution by hashing the data across the number of buckets.

:param number: The number of buckets
:param expressions: The expressions to distribute by
:return: The created hash distribution
"""
return DistributionImpl(Strategy.HASH, number, list(expressions))

@staticmethod
def of(strategy: Strategy, number: int, *expressions) -> Distribution:
"""
Create a distribution by the given strategy.

:param strategy: The strategy to use
:param number: The number of buckets
:param expressions: The expressions to distribute by
:return: The created distribution
"""
return DistributionImpl(strategy, number, list(expressions))

@staticmethod
def fields(
strategy: Strategy, number: int, *field_names: Tuple[str]
) -> Distribution:
"""
Create a distribution on columns. Like distribute by (a) or (a, b), for complex like
distributing by (func(a), b) or (func(a), func(b)), please use DistributionImpl.Builder to create.

NOTE: a, b, c are column names.

SQL syntax: distribute by hash(a, b) buckets 5
fields(Strategy.HASH, 5, "a", "b")

SQL syntax: distribute by hash(a, b, c) buckets 10
fields(Strategy.HASH, 10, "a", "b", "c")

SQL syntax: distribute by EVEN(a) buckets 128
fields(Strategy.EVEN, 128, "a")

:param strategy: The strategy to use.
:param number: The number of buckets.
:param field_names: The field names to distribute by.
:return: The created distribution.
"""
expressions = [NamedReference.field(field_name) for field_name in field_names]
return Distributions.of(strategy, number, *expressions)
Loading
Loading