Support Spark Connect #564

Open
aurokk opened this issue Dec 10, 2024 · 10 comments · May be fixed by #565

aurokk commented Dec 10, 2024

Hi!
What do you guys think about supporting Spark Connect?
I would like to contribute.
FYI @moskovetss

aurokk changed the title from "Support spark connect" to "Support Spark Connect" on Dec 10, 2024
nanne-aben (Collaborator) commented:

Hi @aurokk !

Sure, I'm open to that. I'm not super familiar with Spark Connect though. Just had a quick read on what it is, but what would really help me is if you could provide a small code example of how you'd ideally use Spark Connect with typedspark?

aurokk commented Dec 10, 2024

@nanne-aben

Actually, code written for Spark Connect looks the same as code written for regular Spark, and as long as you don't use certain APIs specific to regular Spark, the same code can run on both regular Spark and Spark Connect (see /docs/). So the API is the same, but in some cases it works differently under the hood. I would expect code like this example from the typedspark docs to work with Spark Connect:

from typedspark import Column, DataSet, Schema
from pyspark.sql.types import LongType, StringType

class Person(Schema):
    id: Column[LongType]
    name: Column[StringType]
    age: Column[LongType]

def foo(df: DataSet[Person]) -> DataSet[Person]:
    # do stuff
    return df

For now, we are getting errors like this:
(screenshot: recursion error traceback)

I haven't done any research yet, so I don't know the exact cause. But we use regular Spark and Spark Connect side by side and really want to use typedspark with both, so I'll investigate and prepare a PR.

nanne-aben (Collaborator) commented:

Thanks for the example!

The problem seems to be here

def __getattribute__(cls, name: str) -> Any:

This function catches any call to an attribute of the Schema. Then through here

dtype=cls._get_dtype(name), # type: ignore

And subsequently here:

column = get_type_hints(cls)[name]

It will call get_type_hints(cls), which apparently results in a call to cls.__no_type_check__ in Spark Connect, which in turn enters a recursion loop.

Over here

if (
    name.startswith("__")
    or name == "_attributes"
    or name in cls._attributes
    or name in dir(Protocol)
):
    return object.__getattribute__(cls, name)

We've defined the attributes for which this behaviour should be skipped. You could try adding __no_type_check__ to that list. Maybe there's also a better way, where we dynamically add all attributes of DataFrame (be it the standard Spark one or the Spark Connect one) to that set. Not sure.
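
A rough sketch of that second idea (untested; the attribute set and the import guard below are illustrative, not existing typedspark code):

from pyspark.sql import DataFrame

# collect the attribute names of whichever DataFrame classes are available,
# so lookups of DataFrame API members bypass the dtype machinery entirely
_DATAFRAME_ATTRIBUTES = set(dir(DataFrame))
try:
    from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
    _DATAFRAME_ATTRIBUTES |= set(dir(ConnectDataFrame))
except ImportError:
    # Spark Connect (and its extra dependencies) may not be installed
    pass

# the guard in __getattribute__ could then also check:
#     or name in _DATAFRAME_ATTRIBUTES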

It would also be nice if we could actually unit test this behaviour, so we can monitor for regressions. Not sure how easy that is with Spark Connect, though?

Hope this helps! Is this enough to get you started on your research?


aurokk commented Dec 11, 2024

@nanne-aben

Thank you. Actually, I did some research yesterday too, and now I know where the problem is.

First, here is a code sample. With a regular Spark session, the DataFrame type comes from pyspark.sql.dataframe; with Spark Connect, we get pyspark.sql.connect.dataframe.

from pyspark.sql import SparkSession


def test_regular_spark():
    # type lives in "pyspark.sql.session"
    session = SparkSession.Builder().master("local[*]").getOrCreate()

    # type lives in "pyspark.sql.dataframe"
    df = session.createDataFrame([(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
    assert df.count() == 3

    # notice it is a regular pyspark.sql.dataframe
    assert type(df).__module__ == "pyspark.sql.dataframe"


def test_spark_connect():
    # actually "pyspark.sql.connect.session", but type checkers and the IDE think it is "pyspark.sql.session"
    session = SparkSession.Builder().remote("sc://localhost:15002").getOrCreate()

    # actually "pyspark.sql.connect.dataframe", but type checkers and the IDE think it is "pyspark.sql.dataframe"
    df = session.createDataFrame([(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
    assert df.count() == 3

    # notice it isn't a regular pyspark.sql.dataframe, but the IDE thinks it is
    assert type(df).__module__ == "pyspark.sql.connect.dataframe"

The Spark devs made Spark Connect-specific classes with an API similar to regular Spark and just hid these details from users. It happens right here, at session creation: we actually get a remote session, and it works with pyspark.sql.connect (Connect) types instead of pyspark.sql (regular) types.

So the problem with typedspark is here and here too: we have a subclass of pyspark.sql.dataframe, but in the case of Spark Connect we actually need a subclass of pyspark.sql.connect.dataframe. We end up casting a pyspark.sql.connect.dataframe to pyspark.sql.dataframe, and it doesn't work because Python routes calls to the wrong class, and this behaviour is simply unexpected.

I changed the inherited type to pyspark.sql.connect.dataframe to test the idea, and typedspark works well. So I guess we need to check at runtime what type of DataFrame we got. I have no idea yet how to handle this correctly.
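
For example, such a runtime check could be as simple as looking at the module name, like in the tests above (just a sketch; the helper name is made up):

def is_spark_connect_dataframe(df) -> bool:
    # Spark Connect DataFrames live in the pyspark.sql.connect.dataframe module
    return type(df).__module__.startswith("pyspark.sql.connect")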

Now I'm trying to write some tests and start a Spark Connect server in a test; it's a little bit complicated :-)

nanne-aben (Collaborator) commented:

Good find! I'm surprised though that this leads to the recursion error you posted above. How does the subclassing of a different DataFrame class lead to a recursion error in Schema?


aurokk commented Dec 11, 2024

I have no idea; that screenshot was provided by my colleague. I was trying to reproduce it, but in my case the Python and debugger processes just hang, so I was unable to debug it in detail. I tried reducing the stack size and so on, but nothing worked. I decided to move on, because it isn't supposed to work anyway and all the code around it looks fine.

Do you have any suggestions on how to extend the DataSet type to support Spark Connect for now? :-)

aurokk linked a pull request on Dec 11, 2024 that will close this issue

aurokk commented Dec 11, 2024

I made a reproducible example, now working on a fix.


aurokk commented Dec 11, 2024

I made some changes to understand the problem more deeply. It looks like all types inherited from the pyspark.sql modules are problematic, especially in cases where they call their superclasses. Ughhh.

Currently I think the lib needs a complete redesign, replacing inheritance with composition, so that DataFrame and its hidden logic work correctly. So I guess it might be better to use a proxy pattern or something like that:

from pyspark.sql import DataFrame
from pyspark.sql.types import StructType


class DataSet:  # no inheritance
    def __init__(self, dataframe: DataFrame) -> None:  # but composition
        self._dataframe = dataframe

    @property
    def schema(self) -> StructType:
        return self._dataframe.schema

    ...
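
To avoid re-declaring every DataFrame member by hand, such a proxy could also delegate unknown attributes generically. A minimal sketch (the class name is hypothetical, not existing typedspark code):

from pyspark.sql import DataFrame


class DataSetProxy:
    """Minimal proxy that forwards anything it doesn't define to the wrapped DataFrame."""

    def __init__(self, dataframe: DataFrame) -> None:
        self._dataframe = dataframe

    def __getattr__(self, name: str):
        # __getattr__ is only called when normal lookup fails,
        # so explicitly defined properties (like schema above) still take precedence
        return getattr(self._dataframe, name)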

I'm not sure it's possible right now to make this functionality work with Spark Connect seamlessly, without the user noticing it and without changing the logic, because here we call the wrong class in the first place (DataSet inherits from regular Spark's DataFrame). And here we also use a type inherited from regular Spark's Column. :(

So currently I think supporting Spark Connect requires a lot of work, and it might be better to build a prototype from scratch first; either way, this change will introduce breaking changes.


nanne-aben (Collaborator) commented:

Thanks for the reproducible example!

I wonder if we could somehow use the SparkSession object to see whether we're in a regular Spark or a Spark Connect session. The active SparkSession object can be retrieved anywhere with SparkSession.builder.getOrCreate(), so we wouldn't have to pass it around.

And then we'd use the SparkSession object to see if it's a Spark Connect session, in which case we'd dynamically change the inheritance to pyspark.sql.connect.... Not 100% clean either, though; we'd have to think about the side effects there.
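
A rough sketch of that idea (untested; the helper name is made up, and the side effects mentioned above still apply):

from pyspark.sql import SparkSession


def _dataframe_base_class():
    # inspect the active session to decide which DataFrame class to inherit from
    session = SparkSession.builder.getOrCreate()
    if type(session).__module__.startswith("pyspark.sql.connect"):
        from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
        return ConnectDataFrame
    from pyspark.sql import DataFrame
    return DataFrame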

I can have a closer look tomorrow.

nanne-aben (Collaborator) commented:

Apologies, one day turned into a week. The last days of the year are always super busy!

I do have an idea on how we could approach this.

We could define something like:

import sys

def configure(spark_connect=False):
    global Column
    if spark_connect:
        from pyspark.sql.connect.column import Column
    else:
        from pyspark.sql import Column
    sys.modules[__name__].Column = Column

Which would allow for

import typedspark
typedspark.configure(spark_connect=True)

from typedspark import Column

And we could do the same for DataFrame too of course. It might be a fairly clean way to make this work.
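
Extending that sketch to DataFrame might look roughly like this (again untested, just to illustrate the idea):

import sys

def configure(spark_connect=False):
    global Column, DataFrame
    if spark_connect:
        from pyspark.sql.connect.column import Column
        from pyspark.sql.connect.dataframe import DataFrame
    else:
        from pyspark.sql import Column, DataFrame
    sys.modules[__name__].Column = Column
    sys.modules[__name__].DataFrame = DataFrame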

I wanted to write up a bit more of a prototype, but I'm gonna be packing up soon for the holidays. I won't be around a laptop until the 6th of January. I can respond on my phone to messages, if you're interested in working further on this. Otherwise, we'll pick it up again in the new year!
