
Parallelize decorator #965

Merged: 28 commits merged into devel from sthor/parallelize-decorator, Mar 1, 2024

Conversation

steinitzu
Collaborator

@steinitzu steinitzu commented Feb 13, 2024

Description

Implements #934

Draft for now, may need to test more + add docs

Added a parallelized argument to resource; to me that seems like a good interface.
Under the hood it's basically the same decorator as in the test implementation (maybe it makes sense to use it directly in some cases?), or we could integrate this more deeply into resource.

There was a lot of overhead coming from all the polling in PipeIterator when resources constantly yield None, so I tried to implement more intelligent waiting for futures: basically, when the pool is full, block until some future completes instead of blindly polling and looping over the resources. That seems to make a big difference.
Plus a pretty big refactor of PipeIterator: the futures worker pool is now a separate class.

Benchmark before and after optimizing: https://gist.github.com/steinitzu/671b47cdb7cbf61b2fab46cf1faefd86
The original iterator had a lot of overhead, especially when the number of resources is <= the number of workers.
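For reference, a minimal usage sketch of the new flag, mirroring this PR's tests (the exact API may still change while this is a draft):

import dlt

@dlt.resource(parallelized=True)
def pos_data():
    for i in range(1, 6):
        yield i

@dlt.resource(parallelized=True)
def neg_data():
    for i in range(1, 6):
        yield -i

@dlt.source
def some_source():
    return pos_data, neg_data

# both resources are evaluated in the extract thread pool, so their
# items interleave instead of one resource running to exhaustion first
assert set(some_source()) == {1, 2, 3, 4, 5, -1, -2, -3, -4, -5}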

@steinitzu steinitzu requested a review from sh-rp February 13, 2024 23:40

netlify bot commented Feb 13, 2024

Deploy Preview for dlt-hub-docs ready!

🔨 Latest commit: d376cb5
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/65e1048b812d2000083b9cc7
😎 Deploy Preview: https://deploy-preview-965--dlt-hub-docs.netlify.app

@steinitzu steinitzu changed the title from "Sthor/parallelize decorator" to "Parallelize decorator" on Feb 14, 2024
Collaborator

@rudolfix rudolfix left a comment

I'm OK with the code refactor. The code duplication you mentioned worries me; we didn't have it before!

Could you also add a parallelize method to DltResource that will wrap the existing gen, so regular resources (i.e. from verified sources) can be converted to parallel ones? A rough sketch of the idea is below.

Also, you must test parallel transformers!
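Usage would presumably look something like this (the method name and semantics are the ask here, not existing API):

import dlt

@dlt.resource
def slow_rows():
    for i in range(100):
        yield i  # pretend each item is expensive to produce

# convert an already-declared resource (e.g. from a verified source)
# into a parallel one without touching its source code
parallel_rows = slow_rows.parallelize()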

if self.free_slots == 0:
    return None

if self.free_slots < 0:  # TODO: Sanity check during dev, should never happen
Collaborator

Use assert for that!
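E.g., a one-line sketch of that suggestion:

assert self.free_slots >= 0, f"free_slots went negative: {self.free_slots}"
if self.free_slots == 0:
    return None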

dlt/extract/concurrency.py (resolved)
dlt/extract/pipe.py (outdated, resolved)
        return ResolvablePipeItem(item, step, pipe, meta)
    pipe_item = ResolvablePipeItem(pipe_item, step, pipe, meta)

if isinstance(pipe_item.item, Awaitable) or callable(pipe_item.item):
Collaborator

This duplication looks really suspicious. AFAIK we were returning the Awaitable/Callable item to the next function and resolving the item there. I do not see a reason for this code to be here. If this is a result of the code refactor, then IMO something is still not right there.

Collaborator Author

It's an optimization, but I'm not totally happy with how this looks either; I want to give it a little more love.
The basic idea is that we start multiple futures on each __next__ iteration, instead of adding just one future and doing the whole "poll -> iterate all sources -> repeat" dance each time. It seems to make a big difference.
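A self-contained toy of the pattern (plain concurrent.futures, not the PR code): keep submitting until the pool is saturated, then block until any future completes instead of sleeping and re-polling.

import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def work(n: int) -> int:
    time.sleep(0.1)
    return n

with ThreadPoolExecutor(max_workers=5) as pool:
    pending = set()
    for item in range(20):
        pending.add(pool.submit(work, item))
        if len(pending) >= 5:
            # pool is full: block until at least one result is ready
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                print(fut.result())
    for fut in pending:
        print(fut.result())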

Collaborator Author

My benchmark with and without this block:

Done in 10.724883079528809 seconds
Extracted 5000 items total from 5 resources

Done in 20.511144161224365 seconds
Extracted 5000 items total from 5 resources

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:  # TODO: Type correctly
    gen = f(*args, **kwargs)
    if inspect.isfunction(gen):
Collaborator

If gen is a function, we'll call it when the pipe iterator is constructed. Do not start evaluating gen here! Context:

  • pipes are cloned before evaluation, so if gen is a generator function you'll be able to evaluate it multiple times
  • maybe someone will pass the source to a different thread for evaluation; we support parallel pipelines now (and this decorator is made to support that)

Instead, check whether this is a generator function and, if not, raise a nice exception. Please look into exceptions.py and use the proper base class (one where you can pass the function name and a few other details so the user knows what is going on).
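A minimal sketch of that check (the exception name and base class here are hypothetical; the real one should derive from the proper base in exceptions.py):

import inspect
from functools import wraps
from typing import Any

class InvalidParallelResourceDataType(ValueError):  # hypothetical name/base
    def __init__(self, name: str, item: Any) -> None:
        super().__init__(
            f"Resource {name} returned {type(item).__name__}: only"
            " generator functions and generators can be parallelized"
        )

def parallelize(f: Any) -> Any:
    @wraps(f)
    def _wrap(*args: Any, **kwargs: Any) -> Any:
        gen = f(*args, **kwargs)
        if inspect.isfunction(gen) and not inspect.isgeneratorfunction(gen):
            # do not call gen here: pipes are cloned before evaluation, so
            # the pipe iterator must evaluate it (possibly in another thread)
            raise InvalidParallelResourceDataType(f.__name__, gen)
        return gen
    return _wrap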

gen = f(*args, **kwargs)
if inspect.isfunction(gen):
    gen = gen()
if inspect.isasyncgen(gen):
Collaborator

see above.

@rudolfix
Collaborator

One more comment on the refactor: please split Pipe from PipeIterator. Also, I think you can move items.py to typing.py, but that's up to you.

@steinitzu steinitzu force-pushed the sthor/parallelize-decorator branch 2 times, most recently from b9f0ddf to 57df1d9, on February 14, 2024 23:33
@sh-rp
Collaborator

sh-rp commented Feb 19, 2024

I did not review the PR, but I want to point out that returning None from a resource to skip to the next one in round-robin mode should still work for backwards compatibility; at least Alex is using this.

@rudolfix
Collaborator

I did not review the PR, but I want to point out that returning None from a resource to skip to the next one in round-robin mode should still work for backwards compatibility; at least Alex is using this.

💯 We have a test that makes sure round robin really works (test_rotation_on_none), and all other tests must pass.

Collaborator

@rudolfix rudolfix left a comment

Besides all the comments:

  • the modification to add_limit worries me. Why does it work right now? We are still yielding awaitables from the resource itself, no?
  • we need a test of add_limit for parallelize (both resource and transformer)
  • we also need docs, but let's make this work first

dlt/extract/concurrency.py (outdated, resolved)
if not self.futures:
    return None

if (item := self.resolve_next_future_no_wait()) is not None:
Collaborator

Why is this needed? Wouldn't we get the same result using the for .. as_completed loop below?

Collaborator Author

It's to guarantee ordered results from each resource, i.e. if there are 2 or more futures from the same pipe already done when this is called, we get the results in FIFO order: the self._futures dict is in insertion order, while as_completed yields in undefined order.
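Roughly like this (a toy class with assumed names, to illustrate the ordering guarantee):

from concurrent.futures import Future
from typing import Any, Dict, Optional

class FuturesPool:
    def __init__(self) -> None:
        # dicts preserve insertion order, i.e. submission order
        self._futures: Dict[int, Future] = {}

    def resolve_next_future_no_wait(self) -> Optional[Any]:
        # scan in submission order: if several futures from the same pipe
        # are already done, the oldest result is returned first (FIFO),
        # unlike as_completed(), which yields in completion order
        for key, fut in list(self._futures.items()):
            if fut.done():
                del self._futures[key]
                return fut.result()
        return None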

dlt/extract/concurrency.py (resolved)
dlt/extract/concurrency.py (resolved)
dlt/extract/decorators.py (outdated, resolved)
dlt/extract/source.py (resolved)
dlt/extract/utils.py (resolved)
tests/extract/test_decorators.py (resolved)

if parallelized:
    assert len(threads) > 1
    assert execution_order == ["one", "two", "one", "two", "one", "two"]
Collaborator

I hope it sleeps long enough so the test is really deterministic :)

Collaborator Author

Probably better not to count on it. I don't think we need to check the exact order.

# assert len(threads) == 1
@pytest.mark.parametrize("parallelized", [True, False])
def test_parallelized_resource(parallelized: bool) -> None:
    os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo"
Collaborator

What about round robin? Test it for both; I want to see what the difference is!

Collaborator Author

Yeah, I can parametrize it. There shouldn't be much difference: it goes round-robin implicitly, but resets at some points depending on timings.

@steinitzu
Collaborator Author

I did not review the PR, but I want to point out that returning None from a resource to skip to the next one in round-robin mode should still work for backwards compatibility; at least Alex is using this.

This should still work the same, assuming the tests covering it are good. The only difference now is that returning callables moves to the next source as well in some cases.

@rudolfix
Collaborator

@steinitzu One more ask: when this is implemented, could you check whether our sql_database can be executed in parallel? It looks like the engine is thread safe, and we open the connection when we're already (possibly) in a thread.

@steinitzu
Collaborator Author

@steinitzu One more ask: when this is implemented, could you check whether our sql_database can be executed in parallel? It looks like the engine is thread safe, and we open the connection when we're already (possibly) in a thread.

Good call; I wonder if that will work with streaming results. The conn pool is thread safe, but we're streaming chunks on one connection to load a table.
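For context, a hedged sketch of what per-thread usage looks like with SQLAlchemy (assuming SQLAlchemy 1.4+; each worker checks out its own pooled connection, since one streaming connection can't be shared across threads):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://...")  # the Engine itself is thread safe

def read_table(table_name: str):
    # each thread opens its own connection from the pool and streams
    # server-side in chunks; sharing one streaming connection would break
    with engine.connect() as conn:
        result = conn.execution_options(yield_per=1000).execute(
            text(f"SELECT * FROM {table_name}")
        )
        for chunk in result.partitions():
            yield [dict(row._mapping) for row in chunk]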

@steinitzu steinitzu force-pushed the sthor/parallelize-decorator branch from 6f05f2e to 6fbd859 on February 20, 2024 22:37
Collaborator

@rudolfix rudolfix left a comment

I have a feeling that this PR got more chaotic, but now I think I understand the various edge cases. Please read my comments; there's not that much to fix.

Btw, I realized that generators can be read from multiple threads; not sure where I got the idea that this isn't the case :)


result = list(some_source())

assert set(result) == {1, 2, 3, 4, 5, -1, -2, -3, -4, -5}
Collaborator

The result does not look parallelized: first the positive data is evaluated and then the negative data. I suspect we do not really run it in the thread.

Collaborator Author

I'm not checking the order here since it's not really guaranteed, just that all items are there. But I updated all the tests to check thread idents (>1 thread, and not the main thread).
It would be nice to check that the execution order is roughly interleaved, but that's tricky without making the tests flaky.

self._futures_pool.submit(pipe_item, block=True)
pipe_item = None
if len(self._futures_pool) >= sources_count:
    # Return here so we're not collecting done futures forever
Collaborator

submitting, not collecting?

else:
    self._current_source_index = (self._current_source_index - 1) % sources_count
while True:
    # get next item from the current source
Collaborator

Why is this gone?

# if we have checked all sources once and all returned None, then we can sleep a bit
if self._current_source_index == first_evaluated_index:
    sleep(self.futures_poll_interval)

We must keep it: if all generators return None, we cannot loop forever. Instead of sleeping, we can return None here though, and let __next__ handle the sleep part.

Collaborator Author

I figured the future submitting would return eventually. But yes, we also need to handle the all-None case. I'm returning None from this now, and __next__ does the polling.

Collaborator

Yeah! You do optimized polling when waiting for a future, but if there are no futures there's no sleep, and we max out the CPU.

dlt/extract/concurrency.py (outdated, resolved)
# do we need new item?
if pipe_item is None:
    # if none then take element from the newest source
    pipe_item = self._get_source_item()
Collaborator

Wait,

  1. we were taking items from the futures pool without blocking before going for the next item in the sources
  2. if None, we tried to get items from the source
  3. if None, we checked whether the iterator was maybe exhausted and possibly exited
  4. if not, we slept a little bit and went back to 1

This pattern must remain. We cannot have any blocking operations anywhere, because we'll start starving our pools.

We added additional complications by adding generators that return None. We prevent sources from starving futures by exiting when all sources returned None and taking items from the futures pool.

There is of course a problem, because we sleep when there's no data anywhere, and this sleep is unconditional. We can improve it a little by waiting on the futures pool instead of sleeping (if there are any elements in it), and by making our parallel and async generators trigger a semaphore on which we sleep (but it must be in thread storage; let's not do that).

You can decrease CPU usage by moving done futures to a special done dict and improving the _next_done_future method. A toy version of the loop follows below.
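A self-contained toy of that loop (not the PR code; simplified to plain iterators and a thread pool):

import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Iterator, List

POLL_INTERVAL = 0.01  # stands in for futures_poll_interval

def iterate(sources: List[Iterator[Any]], pool: ThreadPoolExecutor) -> Iterator[Any]:
    futures: List[Future] = []
    while sources or futures:
        progressed = False
        # 1. drain done futures without blocking
        for fut in [f for f in futures if f.done()]:
            futures.remove(fut)
            progressed = True
            yield fut.result()
        # 2. round robin the sources; yielded callables go to the pool
        for src in list(sources):
            try:
                item = next(src)
            except StopIteration:
                sources.remove(src)
                continue
            if item is None:
                continue  # source has nothing right now, rotate on
            progressed = True
            if callable(item):
                futures.append(pool.submit(item))
            else:
                yield item
        # 3. exhausted sources and a drained pool end the while loop;
        # 4. otherwise, if nothing was ready anywhere, sleep briefly
        if not progressed:
            time.sleep(POLL_INTERVAL)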

Collaborator

Btw, we always try to resolve items in the pool before getting more stuff from the sources.

Collaborator Author

So now this is pretty much it: "get source item" -> "wait for a resolved future (with timeout)" -> repeat. Just the order is reversed, which I don't think matters, other than that we don't waste 10ms polling at the beginning of extract, right?
https://github.com/dlt-hub/dlt/blob/sthor%2Fparallelize-decorator/dlt/extract/pipe_iterator.py#L151-L169

Collaborator Author

I initially considered doing something with locks or an async "Event" yielded from the resource, to skip pending sources and wait only exactly as needed, but it got complicated. I don't think we have too much overhead anyway.

Collaborator Author

You can decrease CPU usage by moving done futures to a special done dict and improving the _next_done_future method.

This is good, will do that.

    # jobs to the pool, we need to change this whole method to be inside a `threading.Lock`
    self._wait_for_free_slot()
else:
    return None
Collaborator

This is dangerous: now you call submit without checking this. IMO we do not need a submit that doesn't wait; or raise an exception.

Collaborator Author

Done; submit always blocks now.

else:
    pipe_item = ResolvablePipeItem(pipe_item, step, pipe, meta)

if isinstance(pipe_item.item, Awaitable) or callable(pipe_item.item):
Collaborator

There's really no reason to have it; as I mentioned, __next__ will handle this. There is a small overhead in the main loop from checking the pool for done futures and coming back here just to get a new item, so if you want, submit without blocking, and if that fails, return the item so the blocking version in __next__ handles it.
if len(self._futures_pool) >= sources_count: is super arbitrary.


if pipe_item is None:
    # Block until a future is resolved
    pipe_item = self._futures_pool.resolve_next_future()
Collaborator

We should never block when resolving the next future.

Collaborator

We could do it as an optimization of the polling sleep when there's anything in the pool, but with a timeout of futures_poll_interval.

Collaborator Author

This makes sense, and I don't see any performance impact. That is, using the poll interval as the timeout when waiting for a done future; doing that now.
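I.e., something along these lines (a method sketch; attribute and helper names are assumptions):

from concurrent.futures import FIRST_COMPLETED, wait

def resolve_next_future(self):
    # block at most one poll interval; if nothing completes in time,
    # return None so the caller goes back to checking the sources
    done, _ = wait(
        list(self._futures.values()),
        timeout=self.futures_poll_interval,
        return_when=FIRST_COMPLETED,
    )
    if not done:
        return None
    return self._resolve_future(done.pop())  # assumed helper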

@steinitzu
Collaborator Author

@rudolfix A few more adjustments and tests.

The duplicate "submit futures" block in _get_source_item is gone. Originally I was getting huge overhead (2x runtime in some cases) without it, but after all the changes I see no real difference, so there's no need for it.

@steinitzu steinitzu force-pushed the sthor/parallelize-decorator branch from c469541 to b7acf93 on February 23, 2024 19:44
@steinitzu steinitzu force-pushed the sthor/parallelize-decorator branch from 2a96b74 to acb3b45 on February 23, 2024 20:34
@rudolfix rudolfix marked this pull request as ready for review February 27, 2024 21:07
Collaborator

@rudolfix rudolfix left a comment

Now LGTM!
Check this commit: 6af5c1a. WDYT?

Please amend the docs:

  1. In resource.md please add a chapter

### Declare parallel and async resources

where you just mention that async generators or the parallelized flag can be used to allow many resources to be executed at once; then link to performance.md.
  2. In performance.md, kick out our @dlt.defer example and use parallelize instead. Explain how it works with generators and regular functions. Provide code snippets; mind that code snippets are tested.

There are two examples in the examples section where defer on transformers should be replaced with parallelize. Remember to change the source of the example in the relevant test! In chess_production:

# this resource takes data from players and returns profiles
# it uses `defer` decorator to enable parallel run in thread pool.
# defer requires return at the end so we convert yield into return (we return one item anyway)
# you can still have yielding transformers, look for the test named `test_evolve_schema`
@dlt.transformer(data_from=players, write_disposition="replace")
@dlt.defer

and in transformers:

# a special case where just one item is retrieved in transformer
# a whole transformer may be marked for parallel execution
@dlt.transformer
@dlt.defer
def species(pokemon_details):
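Presumably the replacement looks something like this (a hedged sketch: it assumes the transformer decorator accepts the same parallelized flag as resources, and fetch_species is a hypothetical helper):

# the whole transformer is marked for parallel execution via the new flag;
# it can keep yielding instead of converting yield into return for @dlt.defer
@dlt.transformer(parallelized=True)
def species(pokemon_details):
    yield fetch_species(pokemon_details["species"]["url"])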

else:
    self._current_source_index = (self._current_source_index - 1) % sources_count
while True:
    # get next item from the current source
Collaborator

Yeah! You do optimized polling when waiting for a future, but if there are no futures there's no sleep, and we max out the CPU.

@steinitzu
Collaborator Author

Updated the docs. A couple of concerns:

  1. I left one instance of @defer in the pokemon example since it's a different use case. We're not planning on removing it, right?
  2. The "parallel pipelines asyncio" example I'm not sure works. For me it hangs sometimes, and the generators don't look like they're even executing (print debugging), though the test passes: https://github.com/dlt-hub/dlt/blob/sthor%2Fdestination-docs/docs/website/docs/reference/performance_snippets/performance-snippets.py#L123. It's the same on the devel branch; was this working at some point?

@rudolfix
Collaborator

Updated the docs. A couple of concerns:

  1. I left one instance of @defer in the pokemon example since it's a different use case. We're not planning on removing it, right?

No; it still works well when you want to parallelize a loop.

  2. The "parallel pipelines asyncio" example I'm not sure works. For me it hangs sometimes, and the generators don't look like they're even executing (print debugging), though the test passes: https://github.com/dlt-hub/dlt/blob/sthor%2Fdestination-docs/docs/website/docs/reference/performance_snippets/performance-snippets.py#L123. It's the same on the devel branch; was this working at some point?

It runs. But console output gets disabled when two pipelines are gathered at the same time. Something evil must be happening there... It does not matter whether we run the pools in the extract step, or whether we run the extract step at all. If I serialize all the calls by locking in _run_pipeline, the problem is gone. Any ideas what could be happening?
Also, it does indeed lock up from time to time; probably connected to the above.

@rudolfix rudolfix merged commit 4a3d6ab into devel Mar 1, 2024
58 of 66 checks passed
@rudolfix rudolfix deleted the sthor/parallelize-decorator branch March 1, 2024 10:09