Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sinks #386

Merged
merged 10 commits into from
Nov 24, 2020
Merged

Refactor sinks #386

merged 10 commits into from
Nov 24, 2020

Conversation

roveo
Copy link
Contributor

@roveo roveo commented Nov 13, 2020

Resolves #384

  • added Sink class
  • moved related code to sinks.py
  • moved related tests to test_sinks.py
  • didn't remove sink_to_file (which is not a sink, but still someone might be using it)
  • added sink_to_textfile (for symmetry) since there is from_textfile in core
  • sink_to_list is a special case since it returns a list. This can be done with a class, but involves some black magic with __new__, don't think it's worth it.

Copy link
Member

@martindurant martindurant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good refactor, I am in favour.

streamz/__init__.py Show resolved Hide resolved
streamz/sinks.py Show resolved Hide resolved
streamz/sinks.py Outdated
def __init__(self, upstream, func, *args, **kwargs):
self.func = func
# take the stream specific kwargs out
stream_name = kwargs.pop("stream_name", None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine, and I know it's only moved code, but would be good to not have to update these whenever the superclass might gain more keywords

Copy link
Contributor Author

@roveo roveo Nov 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how we can both allow mixing stream-specific and func arguments in **kwargs and avoid explicitly listing them.

I can see two solutions here:

  • disallow stream kwargs and just call func(*args, **kwargs)
  • make it a normal stream and have the user deal with constructing the correct single-arg function (with wrapping, closure, partials etc.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see here: 5c1b3a6#diff-eb662403bf433f3884021435c3f6cd010b387b4174a01367ae415b3e593cde5eR56-R60

Still looks ugly to me, but it does what you asked for :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think, @CJ-Wright , is this better or worse?

streamz/sinks.py Show resolved Hide resolved
streamz/sinks.py Outdated
1
"""
def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
self._fp = open(file, mode=mode, buffering=1) if isinstance(file, str) else file
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Integrate with fsspec.open?
Why set the buffering?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why set the buffering?

Used line buffering in development to see what's going on without delay, forgot to remove. Also some tests won't work as is without it, but I guess I'll just use wait_for.

Integrate with fsspec.open?

This is really a question about what filesystem functionality should be provided in core. If we want to integrate this sink with fsspec, then from_textfile should be updated too (or some other nodes added, like from_fs/sink_to_fs), which is beyond sink refactoring. And then, we should think about how users might have a way to install fsspec extras (s3, hdfs etc.) and what other fs-related nodes might be useful, like:

  • listening for new files in a folder/glob and "tailing" them
  • writing to same-sized file partitions (like split command)
  • writing to files partitioned by time (e.g. rotating every 5 minutes)

What I'm getting at is that it looks like a plugin to me. We could have streamz[fs] for nodes and fsspec built-ins and then streamz[s3,hdfs] for extras.

streamz/sinks.py Outdated Show resolved Hide resolved
streamz/tests/test_sinks.py Outdated Show resolved Hide resolved
streamz/tests/test_sinks.py Outdated Show resolved Hide resolved
streamz/tests/test_sinks.py Show resolved Hide resolved
@codecov
Copy link

codecov bot commented Nov 13, 2020

Codecov Report

Merging #386 (269d3ab) into master (a6e9111) will increase coverage by 0.02%.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #386      +/-   ##
==========================================
+ Coverage   95.74%   95.77%   +0.02%     
==========================================
  Files          17       18       +1     
  Lines        2559     2577      +18     
==========================================
+ Hits         2450     2468      +18     
  Misses        109      109              
Impacted Files Coverage Δ
streamz/__init__.py 83.33% <100.00%> (+1.51%) ⬆️
streamz/core.py 95.82% <100.00%> (-0.10%) ⬇️
streamz/graph.py 89.89% <100.00%> (-0.30%) ⬇️
streamz/sinks.py 100.00% <100.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a6e9111...269d3ab. Read the comment docs.

@roveo
Copy link
Contributor Author

roveo commented Nov 17, 2020

I took the liberty of making the default Stream.downstreams an empty list instead of [None] and then just using append/remove to manipulate, core tests are passing.

@martindurant
Copy link
Member

The line that's annoying codecov is this, which should now be removed, if we don't have Nones.

@roveo
Copy link
Contributor Author

roveo commented Nov 18, 2020

I also have a question. What is the point of returning [] here?

https://github.com/roveo/streamz/blob/269d3ab397103436c1599b013a67aa83324e28c0/streamz/sinks.py#L63-L68

I tried commenting it out and running tests, everything seems to work the same.

@CJ-Wright
Copy link
Member

Hmm I'm not certain. Although at this point I'd trust the tests, if we break something then we should have a test for that behavior

@martindurant
Copy link
Member

What is the point of returning [] here?

Looks like we're keeping it for now :) Feel free to make a new PR with the change.
This is being merged now!

@martindurant martindurant merged commit f6dd5aa into python-streamz:master Nov 24, 2020
@roveo
Copy link
Contributor Author

roveo commented Nov 24, 2020

Great, thanks!

@martindurant
Copy link
Member

Just noticed sink_to_file in streamz.sources - obviously in the wrong place, and sorry to say you had to repeat some of this. The former seems to be called just once in test_core, and could be deleted in favour of yours.

@roveo
Copy link
Contributor Author

roveo commented Nov 27, 2020

Just noticed sink_to_file in streamz.sources

Yeah, I mentioned it in PR description. Wasn't confident I should be deleting it, especially because it's not really a source/stream node, but more of a utility function. If you're sure it won't be a breaking change, I'll remove.

@martindurant
Copy link
Member

martindurant commented Nov 28, 2020 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Creating custom sink nodes
3 participants