
feat(pyspark): add official support and ci testing with spark connect #10187

Merged: 21 commits merged into ibis-project:main on Sep 23, 2024

Conversation

@cpcloud cpcloud (Member) commented Sep 20, 2024

Description of changes

This PR adds testing for using the pyspark Ibis backend with Spark Connect.

This is done by running a Spark Connect instance as a Docker Compose service, similar to our other client-server backends.

The primary bit of functionality that isn't tested is UDFs (which means JSON unwrapping is also not tested, because that's implemented as a UDF). UDFs effectively require a clone of the Python environment on the server, and that seems out of scope for initial Spark Connect support.

WIP for now.

I wanted to get some feedback on the testing approach, which is basically to set up fixtures differently depending on the value of the SPARK_REMOTE environment variable.
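For concreteness, here is a minimal sketch of what that could look like. Only the SPARK_REMOTE / IS_SPARK_REMOTE names come from this PR; the fixture name, scope, and connection URL are illustrative assumptions, not the actual conftest code.

```python
# Hypothetical conftest-style fixture: pick a local session or a Spark Connect
# session depending on whether SPARK_REMOTE is set.
import os

import pytest
from pyspark.sql import SparkSession

IS_SPARK_REMOTE = bool(os.environ.get("SPARK_REMOTE"))


@pytest.fixture(scope="session")
def spark_session():
    if IS_SPARK_REMOTE:
        # e.g. SPARK_REMOTE="sc://localhost:15002", pointing at the
        # docker-compose Spark Connect service
        session = SparkSession.builder.remote(os.environ["SPARK_REMOTE"]).getOrCreate()
    else:
        session = SparkSession.builder.master("local[*]").getOrCreate()
    yield session
    session.stop()
```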

Issues closed

@cpcloud cpcloud added this to the 10.0 milestone Sep 20, 2024
@cpcloud cpcloud added the feature (Features or general enhancements) label Sep 20, 2024
@github-actions github-actions bot added the tests (Issues or PRs related to tests), ci (Continuous Integration issues or PRs), and pyspark (The Apache PySpark backend) labels Sep 20, 2024
cpcloud (Member, Author) commented:

I deleted this file, because all of the tests are redundant with other array tests that we have in the main backend test suite.

@@ -116,7 +119,22 @@ def test_alias_after_select(t, df):


def test_interval_columns_invalid(con):
msg = r"DayTimeIntervalType\(0, 1\) couldn't be converted to Interval"
df_interval_invalid = con._session.createDataFrame(
cpcloud (Member, Author) commented:

I moved the setup here, because this is the only place this table is used.

@@ -761,7 +765,6 @@ def _create_cached_table(self, name, expr):
def _drop_cached_table(self, name):
self._session.catalog.dropTempView(name)
t = self._cached_dataframes.pop(name)
assert t.is_cached
cpcloud (Member, Author) commented:

I'm not sure why this assert fails with Spark Connect. Identical objects (identical according to id()) report different values for this property over time. That said, I haven't looked into how this property is implemented yet.

["pyspark"],
condition=IS_SPARK_REMOTE,
raises=AssertionError,
reason="somehow, transformed results are different types",
cpcloud (Member, Author) commented:

I believe these are all the result of Spark Connect unconditionally using Arrow for transport, which is great long term, but not compatible with a bunch of our existing array tests that expect Python Nones and not numpy.nans.
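For illustration (not part of the PR), the kind of mismatch being described can be seen with pyarrow alone: a nullable float column round-tripped through Arrow comes back from pandas with nan where a plain Python list would carry None.

```python
# Illustrative only: nulls survive as None in Arrow's Python view, but become
# numpy.nan once the data is materialized as a pandas float column.
import pyarrow as pa

arr = pa.array([1.0, None, 3.0])
print(arr.to_pylist())           # [1.0, None, 3.0]
print(arr.to_pandas().tolist())  # [1.0, nan, 3.0]
```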

["pyspark"],
condition=IS_SPARK_REMOTE,
raises=PySparkConnectGrpcException,
reason="arrow conversion breaks",
cpcloud (Member, Author) commented:

Non-duration intervals in Arrow are extremely hard to use; I'm surprised only these two test cases are failing here.

@cpcloud cpcloud marked this pull request as ready for review September 20, 2024 19:00
@gforsyth gforsyth (Member) left a comment

I'm of two minds on this one. We shouldn't add another backend for what is effectively just a change in the connection method, and the test suite isn't really designed around changes in connection method this way.

On the other hand, I'm not hugely thrilled about needing to add condition kwargs to some of our xfail markers. My worry there is specifically having to pull pyspark into a separate mark definition because we need a condition to xfail only one mode of pyspark failures, which seems cluttered and annoying. But that hasn't happened here, and if anything I would expect the two connection modes to tend toward feature parity, not away from it.
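For readers skimming the thread, the pattern under discussion is roughly the following. The marker name and test body here are assumptions for illustration; only the keyword arguments mirror the diff fragments quoted above.

```python
# Sketch of a condition-gated xfail-style marker, assuming an ibis-style
# custom "notyet" mark; IS_SPARK_REMOTE mirrors the PR's fixtures.
import os

import pytest

IS_SPARK_REMOTE = bool(os.environ.get("SPARK_REMOTE"))


@pytest.mark.notyet(
    ["pyspark"],
    condition=IS_SPARK_REMOTE,  # only expected to fail when running against Spark Connect
    raises=AssertionError,
    reason="somehow, transformed results are different types",
)
def test_array_transform_roundtrip(con):  # hypothetical test name
    ...
```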

I think this can go in -- it is certainly running the test suite against both connection methods. If I think of something better, we can always revisit and refactor.

Thanks for slogging through all of this!

@cpcloud cpcloud force-pushed the spark-connect branch 3 times, most recently from 260d1b1 to b02fb34, on September 23, 2024 09:56
@cpcloud cpcloud (Member, Author) commented Sep 23, 2024

Lucky for us there's a deadlock happening during an invocation of _finalize_memtable 🙄

Traceback (most recent call last):
  File "/nix/store/h3i0acpmr8mrjx07519xxmidv8mpax4y-python3-3.12.5/lib/python3.12/weakref.py", line 666, in _exitfunc
    f()
  File "/nix/store/h3i0acpmr8mrjx07519xxmidv8mpax4y-python3-3.12.5/lib/python3.12/weakref.py", line 590, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cloud/src/ibis/ibis/backends/__init__.py", line 1095, in _finalize_in_memory_table
    self._finalize_memtable(name)
  File "/home/cloud/src/ibis/ibis/backends/pyspark/__init__.py", line 459, in _finalize_memtable
    self._session.catalog.dropTempView(name)
  File "/nix/store/0gr6r5154zyrww5cjj4kybd50vsqp8qn-python3-3.12.5-env/lib/python3.12/site-packages/pyspark/sql/connect/catalog.py", line 266, in dropTempView
    pdf = self._execute_and_fetch(plan.DropTempView(view_name=viewName))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/nix/store/0gr6r5154zyrww5cjj4kybd50vsqp8qn-python3-3.12.5-env/lib/python3.12/site-packages/pyspark/sql/connect/catalog.py", line 49, in _execute_and_fetch
    pdf = DataFrame.withPlan(catalog, session=self._sparkSession).toPandas()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/nix/store/0gr6r5154zyrww5cjj4kybd50vsqp8qn-python3-3.12.5-env/lib/python3.12/site-packages/pyspark/sql/connect/dataframe.py", line 1663, in toPandas
    return self._session.client.to_pandas(query)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/nix/store/0gr6r5154zyrww5cjj4kybd50vsqp8qn-python3-3.12.5-env/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 869, in to_pandas
    (self_destruct_conf,) = self.get_config_with_defaults(
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/nix/store/0gr6r5154zyrww5cjj4kybd50vsqp8qn-python3-3.12.5-env/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 1349, in get_config_with_defaults
    configs = dict(self.config(op).pairs)
                   ^^^^^^^^^^^^^^^
  File "/nix/store/0gr6r5154zyrww5cjj4kybd50vsqp8qn-python3-3.12.5-env/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 1370, in config
    resp = self._stub.Config(req, metadata=self._builder.metadata())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/nix/store/0gr6r5154zyrww5cjj4kybd50vsqp8qn-python3-3.12.5-env/lib/python3.12/site-packages/grpc/_channel.py", line 1178, in __call__
    ) = self._blocking(
        ^^^^^^^^^^^^^^^
  File "/nix/store/0gr6r5154zyrww5cjj4kybd50vsqp8qn-python3-3.12.5-env/lib/python3.12/site-packages/grpc/_channel.py", line 1146, in _blocking
    call = self._channel.segregated_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 547, in grpc._cython.cygrpc.Channel.segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 403, in grpc._cython.cygrpc._segregated_call
  File "/nix/store/h3i0acpmr8mrjx07519xxmidv8mpax4y-python3-3.12.5/lib/python3.12/threading.py", line 300, in __enter__
    return self._lock.__enter__()

@cpcloud cpcloud (Member, Author) commented Sep 23, 2024

@gforsyth I know you already approved, but I wanted to get your thoughts/questions on having to remove memtable finalization for PySpark, to avoid what is apparently a deadlock when trying to invoke dropTempView during memtable finalization.

@gforsyth gforsyth (Member) commented:

Hmm, this seems like a variation on our usual "don't design to the lowest common denominator", but again, having an entirely separate backend just for a different connection method is gross.

In the interest of not kneecapping non-spark-connect-spark, do we want to add in a hacky check that we're running on Spark Connect and use that to set something on the backend instance? And then define our finalizer accordingly?

@cpcloud cpcloud (Member, Author) commented Sep 23, 2024

Oh, I guess we can check the SparkSession type 😬
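Something like the following is presumably what's meant. This is a minimal sketch, not the PR's actual implementation; only dropTempView and _finalize_memtable appear in the diff and traceback above, and the helper name is made up.

```python
# Hypothetical sketch: gate the temp-view cleanup on whether the session is a
# Spark Connect session, since dropTempView during finalization appears to
# deadlock there (see the traceback above).
def _is_spark_connect(session) -> bool:
    try:
        from pyspark.sql.connect.session import SparkSession as ConnectSession
    except ImportError:  # pyspark builds without Spark Connect support
        return False
    return isinstance(session, ConnectSession)


def _finalize_memtable(self, name: str) -> None:
    if _is_spark_connect(self._session):
        # no-op: the temp views get cleaned up on process termination anyway
        return
    self._session.catalog.dropTempView(name)
```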

@cpcloud cpcloud (Member, Author) commented Sep 23, 2024

Worth a shot if it works, until we can get some user feedback on how people are deploying it.

@cpcloud cpcloud (Member, Author) commented Sep 23, 2024

I guess one argument (slightly) in favor of no-op is that these views all get cleaned up on process termination.

@gforsyth gforsyth (Member) commented:

> I guess one argument (slightly) in favor of no-op is that these views all get cleaned up on process termination.

Yeah, I don't know whether this is more an issue of "purity" (we should clean up when we can) vs. people having long-running Spark sessions that they want to keep "clean".

@gforsyth gforsyth merged commit abb5593 into ibis-project:main Sep 23, 2024
78 checks passed
@cpcloud cpcloud deleted the spark-connect branch September 23, 2024 21:08
ncclementi pushed a commit to ncclementi/ibis that referenced this pull request Sep 24, 2024