AirbyteLib: Add Lazy Datasets and iterator syntax support for datasets, caches, and read results #34429

Merged
aaronsteers merged 17 commits into master from aj/lazy-datasets
Jan 23, 2024

@aaronsteers aaronsteers commented Jan 22, 2024

Given a source and cache declared something like this...

    source = ab.get_connector("source-test", config={"apiKey": "test"})
    result: ReadResult = source.read(ab.new_local_cache())

This adds support for all of these syntaxes:

    # Access by key:
    cached_dataset: CachedDataset = result[stream_name]
    cached_dataset: CachedDataset = result.streams[stream_name]
    cached_dataset: CachedDataset = result.cache[stream_name]
    cached_dataset: CachedDataset = result.cache.streams[stream_name]

    # Check if key exists:
    assert stream_name in result
    assert stream_name in result.streams
    assert stream_name in result.cache
    assert stream_name in result.cache.streams

In the examples above, the .streams property is a more explicit alternative to using iterator or key operations directly on the object.
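
For readers who want to see how these spellings can all resolve to the same dataset, here is a minimal sketch. It is not the airbyte-lib implementation: `ToyDataset`, `ToyCache`, and `ToyReadResult` are hypothetical stand-ins, and the sketch simply assumes each object acts as a read-only mapping from stream name to dataset.

```python
from collections.abc import Iterator, Mapping


class ToyDataset:
    """Hypothetical stand-in for CachedDataset; it only remembers its stream name."""

    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name


class ToyCache(Mapping[str, ToyDataset]):
    """Hypothetical cache acting as a read-only mapping of stream name to dataset."""

    def __init__(self, stream_names: list[str]) -> None:
        self._datasets = {name: ToyDataset(name) for name in stream_names}

    def __getitem__(self, stream_name: str) -> ToyDataset:
        return self._datasets[stream_name]

    def __iter__(self) -> Iterator[str]:
        return iter(self._datasets)

    def __len__(self) -> int:
        return len(self._datasets)

    @property
    def streams(self) -> Mapping[str, ToyDataset]:
        # Explicit spelling: the same mapping, with clearer intent at the call site.
        return self


class ToyReadResult(Mapping[str, ToyDataset]):
    """Hypothetical read result that delegates every lookup to its cache."""

    def __init__(self, cache: ToyCache) -> None:
        self.cache = cache

    def __getitem__(self, stream_name: str) -> ToyDataset:
        return self.cache[stream_name]

    def __iter__(self) -> Iterator[str]:
        return iter(self.cache)

    def __len__(self) -> int:
        return len(self.cache)

    @property
    def streams(self) -> Mapping[str, ToyDataset]:
        return self


result = ToyReadResult(ToyCache(["users", "orders"]))

# Mapping supplies `in`, .keys(), .values(), and .items() for free.
for stream_name, dataset in result.streams.items():
    print(stream_name, dataset.stream_name)
```

Subclassing `collections.abc.Mapping` means only `__getitem__`, `__iter__`, and `__len__` need to be written by hand; membership tests and `.items()` come from the base class.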

CachedDataset and SQLDataset now also have a with_filter() method that creates a new SQLDataset by filtering down the original dataset. This is done lazily: nothing runs on the connection until the dataset is iterated or its data is otherwise read.

These are all equivalent:

    filtered_dataset: SQLDataset = cached_dataset.with_filter("column2 == 1")
    filtered_dataset: SQLDataset = cached_dataset.with_filter(text("column2 == 1"))
    filtered_dataset: SQLDataset = cached_dataset.with_filter(column("column2") == 1)

Other points:

  • SQLDataset objects (the base class) do not support to_sql_table() but CachedDataset objects do.
  • Filters can also be chained together:
    • filtered_dataset: SQLDataset = cached_dataset.with_filter("column2 == 1").with_filter("column1 == 'value1'")
  • Internally, processors (which include caches and file writers) now track which streams they've seen in self._streams_with_data. This is a subset of the streams declared in the catalog, and it is also why a cache might contain different members than a read result, even if the underlying data is the same.
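
To illustrate the lazy, chainable filtering described above, here is a rough sketch using only the standard library. It is an assumption-laden toy, not how SQLDataset is built: `LazySQLDataset` is a hypothetical class, it uses `sqlite3` rather than SQLAlchemy, and it accepts only plain string clauses.

```python
import sqlite3
from collections.abc import Iterator


class LazySQLDataset:
    """Hypothetical lazy dataset: stores WHERE clauses and queries only on iteration."""

    def __init__(self, conn: sqlite3.Connection, table: str, filters: tuple = ()) -> None:
        self._conn = conn
        self._table = table
        self._filters = tuple(filters)

    def with_filter(self, clause: str) -> "LazySQLDataset":
        # Return a new dataset with one more clause; no SQL is executed here.
        return LazySQLDataset(self._conn, self._table, self._filters + (clause,))

    def __iter__(self) -> Iterator[dict]:
        # The query is built and run only when someone iterates.
        # String interpolation is acceptable for a toy but unsafe for untrusted input.
        sql = f"SELECT * FROM {self._table}"
        if self._filters:
            sql += " WHERE " + " AND ".join(f"({clause})" for clause in self._filters)
        cursor = self._conn.execute(sql)
        columns = [description[0] for description in cursor.description]
        for row in cursor:
            yield dict(zip(columns, row))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stream1 (column1 TEXT, column2 INTEGER)")

dataset = LazySQLDataset(conn, "stream1")
filtered = dataset.with_filter("column2 = 1").with_filter("column1 = 'value1'")

# Rows are inserted after the filters were chained. They still appear below,
# which shows that nothing touched the connection until iteration.
conn.executemany(
    "INSERT INTO stream1 VALUES (?, ?)",
    [("value1", 1), ("value1", 2), ("value2", 1)],
)

rows = list(filtered)
print(rows)
```

Because `with_filter()` returns a new object instead of mutating the original, the unfiltered `dataset` stays usable and each chained call narrows an independent copy.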

@aaronsteers aaronsteers requested a review from flash1293 January 22, 2024 22:07

Comment on lines +102 to +106
    @property
    def _streams_with_data(self) -> set[str]:
        """Return a list of known streams."""
        return self._pending_batches.keys() | self._finalized_batches.keys()

This is new. Basically, this tells us all the streams we've seen so far.
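
As a small illustration of that property, the sketch below shows that the union of two dict key views is a plain set, and that it can be smaller than the catalog. `ToyProcessor`, the batch values, and the catalog names are hypothetical; only the property body mirrors the snippet under review.

```python
class ToyProcessor:
    """Hypothetical processor that tracks batches per stream name."""

    def __init__(self) -> None:
        # Stream name -> list of batch identifiers (placeholder strings here).
        self._pending_batches: dict[str, list[str]] = {}
        self._finalized_batches: dict[str, list[str]] = {}

    @property
    def _streams_with_data(self) -> set[str]:
        # `|` on two dict key views returns a new set with the union of the keys.
        return self._pending_batches.keys() | self._finalized_batches.keys()


# Hypothetical catalog with three declared streams.
catalog_streams = {"users", "orders", "refunds"}

processor = ToyProcessor()
processor._pending_batches["users"] = ["batch-1"]
processor._finalized_batches["orders"] = ["batch-0"]
processor._finalized_batches["users"] = ["batch-0"]

seen = processor._streams_with_data
print(sorted(seen))
```

Here "refunds" is declared in the catalog but never produced a batch, so it is absent from the set of streams seen.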

aaronsteers commented Jan 23, 2024

Per Joe, does this work?:

    for stream_name, dataset in result.streams.items():
        ...

Update:

@flash1293 - Confirmed this syntax works as expected, and I've added tests that confirm this.

In the process, I noticed a look-alike Source.streams: str, which I've renamed to Source._selected_stream_names so it isn't conflated with Cache.streams and ReadResult.streams, which behave (basically) as Mapping[str, CachedDataset].

@aaronsteers aaronsteers merged commit 3b2dbe0 into master Jan 23, 2024
22 checks passed
@aaronsteers aaronsteers deleted the aj/lazy-datasets branch January 23, 2024 19:45
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024