allows async iterators to be evaluated in extract step #835

Closed
rudolfix opened this issue Dec 17, 2023 · 0 comments · Fixed by #905
rudolfix (Collaborator) commented Dec 17, 2023

Background
Allow resources and transformers to be async generators, and allow DltResource to accept an async iterator as data.
Async iterators should be evaluated in PipeIterator like regular iterators, but via the futures pool.
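How a synchronous PipeIterator would wait on an async iterator is left open above. A minimal sketch, assuming a single event loop running in a background thread stands in for the futures pool: each step of the async iterator is submitted with `asyncio.run_coroutine_threadsafe`, and the synchronous caller blocks on the returned `concurrent.futures.Future`. The helpers `iterate_async_in_pool` and `start_loop_thread` are hypothetical names, not dlt's API.

```python
import asyncio
import threading
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")


async def _next_item(agen: AsyncIterator[T]) -> T:
    # a real coroutine wrapping exactly one step of the async iterator
    return await agen.__anext__()


def iterate_async_in_pool(agen: AsyncIterator[T], loop: asyncio.AbstractEventLoop) -> Iterator[T]:
    """Hypothetical helper: drive an async iterator from synchronous code."""
    while True:
        # schedule one step on the loop thread; returns a concurrent.futures.Future
        future = asyncio.run_coroutine_threadsafe(_next_item(agen), loop)
        try:
            item = future.result()  # block the calling (sync) thread until the step is done
        except StopAsyncIteration:
            return  # async iterator is exhausted
        yield item


def start_loop_thread() -> asyncio.AbstractEventLoop:
    # hypothetical "pool": one event loop running forever in a daemon thread
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop
```

For example, `list(iterate_async_in_pool(gen, start_loop_thread()))` drains `gen` item by item from ordinary synchronous code. Several synchronous consumers can share the same loop, and their steps then interleave on it.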
Please note:

  1. Async iterators are still iterated item by item, so a single async iterator does not help to run extraction in parallel. However, several such iterators may be run in parallel in the futures pool.
  2. DltResource implements a regular iterator interface, and this ticket does not change that.
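Point 1 can be checked with plain `asyncio`, independent of dlt. In this sketch (`async_gen`, `drain`, `run_concurrently` and `timed` are illustrative names), one generator with three 0.1 s awaits takes about 0.3 s to drain, and two such generators gathered together still take about 0.3 s rather than 0.6 s.

```python
import asyncio
import time


async def async_gen(idx: int, delay: float = 0.1):
    # items of ONE async generator are produced strictly one after another
    for letter in ["a", "b", "c"]:
        await asyncio.sleep(delay)
        yield {"async_gen": idx, "letter": letter}


async def drain(agen) -> list:
    # consume a single async iterator item by item
    return [item async for item in agen]


async def run_concurrently(*indices: int) -> list:
    # several iterators can make progress at the same time
    return await asyncio.gather(*(drain(async_gen(i)) for i in indices))


def timed(coro):
    # run a coroutine to completion and report wall-clock time
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start
```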

Implementation
One approach could be to wrap the async iterator in a coroutine that awaits the next element and then returns this element together with the async generator, which is immediately rescheduled if it is not exhausted. In addition, several code paths currently prevent async generators from being used. See the tests below.
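A minimal sketch of that wrapping approach, using `asyncio.wait(..., return_when=FIRST_COMPLETED)` in place of dlt's futures pool (an assumption; `next_with_gen` and `evaluate` are hypothetical names). Each pending task returns one element plus its generator; a generator that raises `StopAsyncIteration` is dropped, otherwise its next step is scheduled right away.

```python
import asyncio
from typing import Any, AsyncIterator, List, Tuple


async def next_with_gen(gen: AsyncIterator[Any]) -> Tuple[Any, AsyncIterator[Any]]:
    # await exactly one element and hand the generator back with it
    item = await gen.__anext__()
    return item, gen


async def evaluate(gens: List[AsyncIterator[Any]]) -> List[Any]:
    """Hypothetical scheduler: evaluate several async generators side by side."""
    pending = {asyncio.ensure_future(next_with_gen(g)) for g in gens}
    results: List[Any] = []
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            try:
                item, gen = task.result()
            except StopAsyncIteration:
                continue  # generator exhausted: do not reschedule it
            results.append(item)
            # not exhausted yet: immediately reschedule its next step
            pending.add(asyncio.ensure_future(next_with_gen(gen)))
    return results
```

Because each generator has at most one step in flight, its own items keep their order, while items from different generators interleave in completion order.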

We also need to improve how the resource name (used to look up resource state) is made available in executed awaitables and callables. Most probably we will wrap all of them and set the resource name just before calling them, already inside the pool. If the resource state is then accessed as the first operation, everything should work correctly.
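One way to make the resource name visible inside pooled work is a context variable set by a wrapper. The sketch below assumes state lookup reads the name from such a variable; `current_resource_name`, `wrap_callable`, `wrap_awaitable` and `resource_state_key` are hypothetical. Worker threads of a `ThreadPoolExecutor` do not inherit the submitter's context, so the callable wrapper sets the name inside the pool thread and resets it afterwards. Each asyncio task runs in its own copy of the context, so the awaitable wrapper can simply set it.

```python
import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

# hypothetical holder for "the resource currently being evaluated"
current_resource_name = contextvars.ContextVar("current_resource_name")


def wrap_callable(resource_name: str, fn: Callable[[], T]) -> Callable[[], T]:
    def _wrapped() -> T:
        # runs inside the pool thread: set the name just before calling
        token = current_resource_name.set(resource_name)
        try:
            return fn()
        finally:
            # pool threads are reused, so do not leak the name to the next job
            current_resource_name.reset(token)
    return _wrapped


def wrap_awaitable(resource_name: str, aw: Awaitable[T]) -> Awaitable[T]:
    async def _wrapped() -> T:
        # the task running this coroutine has its own context copy
        current_resource_name.set(resource_name)
        return await aw
    return _wrapped()


def resource_state_key() -> str:
    # what a state accessor would read as its first operation
    return current_resource_name.get()
```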

Tests
The following cases should pass. You will find more to test during the implementation.

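import asyncio

import dlt


@dlt.resource
async def async_gen_resource(idx):
    for letter in ["a", "b", "c"]:
        await asyncio.sleep(1)
        yield {"async_gen": idx, "letter": letter}


# hypothetical: async_gen_table is assumed to be a second async resource shaped like async_gen_resource
@dlt.resource
async def async_gen_table(idx):
    for letter in ["a", "b", "c"]:
        await asyncio.sleep(1)
        yield {"async_gen": idx, "letter": letter}


pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True)
pipeline_1.run(async_gen_resource(10))
pipeline_1.run(async_gen_table(11))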
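When adding more tests, one likely source of the blocking code paths mentioned above is type dispatch written only for sync generators: `inspect.isgeneratorfunction` returns `False` for an `async def` function containing `yield`. A hypothetical classifier (`classify_data` is not part of dlt) that covers both cases could look like this:

```python
import inspect
from collections.abc import AsyncIterator, Iterator
from typing import Any


def classify_data(data: Any) -> str:
    """Hypothetical: decide how data passed to a resource should be evaluated."""
    if inspect.isasyncgenfunction(data):
        return "async generator function"  # call it, then iterate asynchronously
    if inspect.isgeneratorfunction(data):
        return "generator function"
    if isinstance(data, AsyncIterator):
        # async generator objects and any class implementing __aiter__/__anext__
        return "async iterator"
    if isinstance(data, Iterator):
        return "iterator"
    return "other"
```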

rudolfix self-assigned this Jan 19, 2024
rudolfix moved this from Todo to Planned in dlt core library Jan 19, 2024
sh-rp moved this from Planned to In Progress in dlt core library Jan 23, 2024
sh-rp linked a pull request Jan 23, 2024 that will close this issue
github-project-automation bot moved this from In Progress to Done in dlt core library Jan 30, 2024