Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic asyncio support #43944

Open
wants to merge 12 commits into
base: main
Choose a base branch
from

Conversation

dstandish
Copy link
Contributor

@dstandish dstandish commented Nov 12, 2024

This is meant to be sort of a hello world for asyncio support in airflow. It will be refined and extended in the future. E.g. probably we would add more config flexibility re the libraries, connect args etc. But it's good to start simple.

Anyway, ultimately, I think Airflow really needs to go in this direction: in the new REST API, in the new AIP-72 internal API server, in triggers, and ultimately, in the scheduler.

@tirkarthi
Copy link
Contributor

@@ -221,6 +221,7 @@ def test_get_documentation_package_path():
"""
"apache-airflow-providers-common-sql>=1.17.0",
"apache-airflow>=2.8.0",
"asyncpg>=0.30.0",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

psycopg 3 also has async support but might conflict with psycopg 2 already installed.

https://www.psycopg.org/psycopg3/docs/advanced/async.html

https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg

True, yeah. We could in future look at allowing diff libs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark, so take with a pinch of salt, but https://pypi.org/project/asyncpg/

In our testing asyncpg is, on average, 5x faster than psycopg3.

That's def a big enough of a change that we should see if we notice any difference

@dstandish
Copy link
Contributor Author

dstandish commented Nov 13, 2024

Is this related to AIP-70?

https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-70+Migrating+to+asynchronous+programming

I think you prob know the answer. The AIP deals with asyncio, so yes strictly speaking related in that sense. But not "part of". That AIP has been in draft a long time not sure actively worked on. Anyway, I don't think what I'm doing here really requires an AIP.

@tirkarthi
Copy link
Contributor

There is a similar draft PR open as POC for it though not active. So just wanted to confirm. Thanks for the details.

#36504

Copy link
Member

@pierrejeambrun pierrejeambrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really exciting.

Do we want an @provide_async_session as a follow up ? (I think Ash wanted to kill that decorator at some point, even if I find it kind of useful actually)

@ashb
Copy link
Member

ashb commented Nov 13, 2024

@pierrejeambrun A FastAPI dep to give an async session would be good, but I think anything in the models etc should be passed an explicit session.

@@ -199,13 +209,25 @@ def load_policy_plugins(pm: pluggy.PluginManager):
pm.load_setuptools_entrypoints("airflow.policy")


def _get_async_conn_uri_from_sync(sync_uri):
scheme, rest = sync_uri.split(":", maxsplit=1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQLA has stuff built in to parse URLs/schemas. Maybe we should use that instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -441,6 +463,9 @@ def configure_orm(disable_connection_pool=False, pool_class=None):

global Session
global engine
global async_engine
global create_async_session
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
global create_async_session

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why @ashb ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's necessary ... it doesn't get set until configure_orm is called

@potiuk
Copy link
Member

potiuk commented Nov 13, 2024

NICE! Yeah. I really like to see it working with mysql compatibility and benchmarks 📦

(because of course MySQL is our beloved database)

@dstandish
Copy link
Contributor Author

Really exciting.

Do we want an @provide_async_session as a follow up ? (I think Ash wanted to kill that decorator at some point, even if I find it kind of useful actually)

i would say let's wait and see how / where / when we need it

@omkar-foss
Copy link
Collaborator

Lovely! Thank you for adding this, will try to start using this soon :)

dstandish and others added 10 commits November 13, 2024 19:14
This is meant to be sort of a hello world for asyncio support in airflow.  It will be refined and extended in the future.  But I think airflow ultimately really needs to go in this direction: in the new REST API, in the new AIP-72 internal API server, in triggers, and ultimately, in the scheduler.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants