From ea0e818898c51794b17627da3b86cbddc0b73d4a Mon Sep 17 00:00:00 2001 From: Christo Grabowski <108154848+ChristoGrab@users.noreply.github.com> Date: Thu, 13 Jun 2024 18:25:28 -0400 Subject: [PATCH] Source Slack Refactor: update Python stream state handling (#39343) --- .../connectors/source-slack/metadata.yaml | 2 +- .../connectors/source-slack/poetry.lock | 6 +- .../connectors/source-slack/pyproject.toml | 2 +- .../source-slack/source_slack/streams.py | 18 ++++- .../source-slack/unit_tests/test_streams.py | 2 +- docs/integrations/sources/slack.md | 79 ++++++++++--------- 6 files changed, 61 insertions(+), 48 deletions(-) diff --git a/airbyte-integrations/connectors/source-slack/metadata.yaml b/airbyte-integrations/connectors/source-slack/metadata.yaml index d25e69e2cebf..75ae8d4515bd 100644 --- a/airbyte-integrations/connectors/source-slack/metadata.yaml +++ b/airbyte-integrations/connectors/source-slack/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd - dockerImageTag: 1.1.6 + dockerImageTag: 1.1.7 dockerRepository: airbyte/source-slack documentationUrl: https://docs.airbyte.com/integrations/sources/slack githubIssueLabel: source-slack diff --git a/airbyte-integrations/connectors/source-slack/poetry.lock b/airbyte-integrations/connectors/source-slack/poetry.lock index cec23b6ae152..7feaaacfb234 100644 --- a/airbyte-integrations/connectors/source-slack/poetry.lock +++ b/airbyte-integrations/connectors/source-slack/poetry.lock @@ -399,13 +399,13 @@ dev = ["PyTest", "PyTest-Cov", "bump2version (<1)", "sphinx (<2)", "tox"] [[package]] name = "dpath" -version = "2.1.6" +version = "2.2.0" description = "Filesystem-like pathing and searching for dictionaries" optional = false python-versions = ">=3.7" files = [ - {file = "dpath-2.1.6-py3-none-any.whl", hash = "sha256:31407395b177ab63ef72e2f6ae268c15e938f2990a8ecf6510f5686c02b6db73"}, - {file = "dpath-2.1.6.tar.gz", hash = "sha256:f1e07c72e8605c6a9e80b64bc8f42714de08a789c7de417e49c3f87a19692e47"}, + {file = "dpath-2.2.0-py3-none-any.whl", hash = "sha256:b330a375ded0a0d2ed404440f6c6a715deae5313af40bbb01c8a41d891900576"}, + {file = "dpath-2.2.0.tar.gz", hash = "sha256:34f7e630dc55ea3f219e555726f5da4b4b25f2200319c8e6902c394258dd6a3e"}, ] [[package]] diff --git a/airbyte-integrations/connectors/source-slack/pyproject.toml b/airbyte-integrations/connectors/source-slack/pyproject.toml index edf51f6325f8..fbf7765f8ead 100644 --- a/airbyte-integrations/connectors/source-slack/pyproject.toml +++ b/airbyte-integrations/connectors/source-slack/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "1.1.6" +version = "1.1.7" name = "source-slack" description = "Source implementation for Slack." authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-slack/source_slack/streams.py b/airbyte-integrations/connectors/source-slack/source_slack/streams.py index 276ea1d2b203..6dfffe7a7436 100644 --- a/airbyte-integrations/connectors/source-slack/source_slack/streams.py +++ b/airbyte-integrations/connectors/source-slack/source_slack/streams.py @@ -9,6 +9,7 @@ import pendulum import requests from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.core import CheckpointMixin from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream from pendulum import DateTime @@ -148,15 +149,24 @@ def read_records(self, sync_mode: SyncMode, **kwargs) -> Iterable[Mapping[str, A # Incremental Streams -class IncrementalMessageStream(ChanneledStream, ABC): +class IncrementalMessageStream(CheckpointMixin, ChanneledStream, ABC): data_field = "messages" cursor_field = "float_ts" primary_key = ["channel_id", "ts"] + @property + def state(self) -> Mapping[str, Any]: + return self._state + + @state.setter + def state(self, value: Mapping[str, Any]): + self._state = value + def __init__(self, default_start_date: DateTime, end_date: Optional[DateTime] = None, **kwargs): self._start_ts = default_start_date.timestamp() self._end_ts = end_date and end_date.timestamp() self.set_sub_primary_key() + self._state = None super().__init__(**kwargs) def set_sub_primary_key(self): @@ -177,7 +187,7 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str, record[self.cursor_field] = float(record[self.sub_primary_key_2]) yield record - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: current_stream_state = current_stream_state or {} current_stream_state[self.cursor_field] = max( latest_record[self.cursor_field], float(current_stream_state.get(self.cursor_field, self._start_ts)) @@ -196,7 +206,9 @@ def read_records( # return an empty iterator # this is done to emit at least one state message when no slices are generated return iter([]) - return super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) + for record in super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state): + self.state = self._get_updated_state(self.state, record) + yield record class ChannelMessages(HttpSubStream, IncrementalMessageStream): diff --git a/airbyte-integrations/connectors/source-slack/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-slack/unit_tests/test_streams.py index 25cea71ab6b0..75b04c4a18aa 100644 --- a/airbyte-integrations/connectors/source-slack/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-slack/unit_tests/test_streams.py @@ -96,7 +96,7 @@ def test_get_updated_state(authenticator, token_config, current_state, latest_re default_start_date=pendulum.parse(token_config["start_date"]), lookback_window=token_config["lookback_window"] ) - assert stream.get_updated_state(current_stream_state=current_state, latest_record=latest_record) == expected_state + assert stream._get_updated_state(current_stream_state=current_state, latest_record=latest_record) == expected_state def test_threads_request_params(authenticator, token_config): diff --git a/docs/integrations/sources/slack.md b/docs/integrations/sources/slack.md index 3d1957b7d900..b3d1192247a4 100644 --- a/docs/integrations/sources/slack.md +++ b/docs/integrations/sources/slack.md @@ -171,47 +171,48 @@ Slack has [rate limit restrictions](https://api.slack.com/docs/rate-limits). | Version | Date | Pull Request | Subject | |:--------|:-----------| :------------------------------------------------------- |:--------------------------------------------------------------------------------------| +| 1.1.7 | 2025-06-14 | [39343](https://github.com/airbytehq/airbyte/pull/39343) | Update state handling for `threads` Python stream | | 1.1.6 | 2024-06-12 | [39132](https://github.com/airbytehq/airbyte/pull/39416) | Respect `include_private_channels` option in `threads` stream | | 1.1.5 | 2024-06-10 | [39132](https://github.com/airbytehq/airbyte/pull/39132) | Convert string state to float for `threads` stream | -| 1.1.4 | 2024-06-06 | [39271](https://github.com/airbytehq/airbyte/pull/39271) | [autopull] Upgrade base image to v1.2.2 | -| 1.1.3 | 2024-06-05 | [39121](https://github.com/airbytehq/airbyte/pull/39121) | Change cursor format for `channel_messages` stream to `%s_as_float` | -| 1.1.2 | 2024-05-23 | [38619](https://github.com/airbytehq/airbyte/pull/38619) | Fix cursor granularity for the `channel_messages` stream | -| 1.1.1 | 2024-05-02 | [36661](https://github.com/airbytehq/airbyte/pull/36661) | Schema descriptions | -| 1.1.0 | 2024-04-18 | [37332](https://github.com/airbytehq/airbyte/pull/37332) | Add the capability to sync from private channels | -| 1.0.0 | 2024-04-02 | [35477](https://github.com/airbytehq/airbyte/pull/35477) | Migration to low-code CDK | -| 0.4.1 | 2024-03-27 | [36579](https://github.com/airbytehq/airbyte/pull/36579) | Upgrade airbyte-cdk version to emit record counts as floats | -| 0.4.0 | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0` | -| 0.3.9 | 2024-02-12 | [35157](https://github.com/airbytehq/airbyte/pull/35157) | Manage dependencies with Poetry | -| 0.3.8 | 2024-02-09 | [35131](https://github.com/airbytehq/airbyte/pull/35131) | Fixed the issue when `schema discovery` fails with `502` due to the platform timeout | -| 0.3.7 | 2024-01-10 | [1234](https://github.com/airbytehq/airbyte/pull/1234) | Prepare for airbyte-lib | -| 0.3.6 | 2023-11-21 | [32707](https://github.com/airbytehq/airbyte/pull/32707) | Threads: do not use client-side record filtering | -| 0.3.5 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image | -| 0.3.4 | 2023-10-06 | [31134](https://github.com/airbytehq/airbyte/pull/31134) | Update CDK and remove non iterable return from records | -| 0.3.3 | 2023-09-28 | [30580](https://github.com/airbytehq/airbyte/pull/30580) | Add `bot_id` field to threads schema | -| 0.3.2 | 2023-09-20 | [30613](https://github.com/airbytehq/airbyte/pull/30613) | Set default value for channel_filters during discover | -| 0.3.1 | 2023-09-19 | [30570](https://github.com/airbytehq/airbyte/pull/30570) | Use default availability strategy | -| 0.3.0 | 2023-09-18 | [30521](https://github.com/airbytehq/airbyte/pull/30521) | Add unexpected fields to streams `channel_messages`, `channels`, `threads`, `users` | -| 0.2.0 | 2023-05-24 | [26497](https://github.com/airbytehq/airbyte/pull/26497) | Fixed `lookback window` value limitations | -| 0.1.26 | 2023-05-17 | [26186](https://github.com/airbytehq/airbyte/pull/26186) | Limited the `lookback window` range for input configuration | -| 0.1.25 | 2023-03-20 | [22889](https://github.com/airbytehq/airbyte/pull/22889) | Specified date formatting in specification | -| 0.1.24 | 2023-03-20 | [24126](https://github.com/airbytehq/airbyte/pull/24126) | Increase page size to 1000 | -| 0.1.23 | 2023-02-21 | [21907](https://github.com/airbytehq/airbyte/pull/21907) | Do not join channels that not gonna be synced | -| 0.1.22 | 2023-01-27 | [22022](https://github.com/airbytehq/airbyte/pull/22022) | Set `AvailabilityStrategy` for streams explicitly to `None` | -| 0.1.21 | 2023-01-12 | [21321](https://github.com/airbytehq/airbyte/pull/21321) | Retry Timeout error | -| 0.1.20 | 2022-12-21 | [20767](https://github.com/airbytehq/airbyte/pull/20767) | Update schema | -| 0.1.19 | 2022-12-01 | [19970](https://github.com/airbytehq/airbyte/pull/19970) | Remove OAuth2.0 broken `refresh_token` support | -| 0.1.18 | 2022-09-28 | [17315](https://github.com/airbytehq/airbyte/pull/17315) | Always install latest version of Airbyte CDK | -| 0.1.17 | 2022-08-28 | [16085](https://github.com/airbytehq/airbyte/pull/16085) | Increase unit test coverage | -| 0.1.16 | 2022-08-28 | [16050](https://github.com/airbytehq/airbyte/pull/16050) | Fix SATs | -| 0.1.15 | 2022-03-31 | [11613](https://github.com/airbytehq/airbyte/pull/11613) | Add 'channel_filter' config and improve performance | -| 0.1.14 | 2022-01-26 | [9575](https://github.com/airbytehq/airbyte/pull/9575) | Correct schema | -| 0.1.13 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | -| 0.1.12 | 2021-10-07 | [6570](https://github.com/airbytehq/airbyte/pull/6570) | Implement OAuth support with OAuth authenticator | -| 0.1.11 | 2021-08-27 | [5830](https://github.com/airbytehq/airbyte/pull/5830) | Fix sync operations hang forever issue | -| 0.1.10 | 2021-08-27 | [5697](https://github.com/airbytehq/airbyte/pull/5697) | Fix max retries issue | -| 0.1.9 | 2021-07-20 | [4860](https://github.com/airbytehq/airbyte/pull/4860) | Fix reading threads issue | -| 0.1.8 | 2021-07-14 | [4683](https://github.com/airbytehq/airbyte/pull/4683) | Add float_ts primary key | -| 0.1.7 | 2021-06-25 | [3978](https://github.com/airbytehq/airbyte/pull/3978) | Release Slack CDK Connector | +| 1.1.4 | 2024-06-06 | [39271](https://github.com/airbytehq/airbyte/pull/39271) | [autopull] Upgrade base image to v1.2.2 | +| 1.1.3 | 2024-06-05 | [39121](https://github.com/airbytehq/airbyte/pull/39121) | Change cursor format for `channel_messages` stream to `%s_as_float` | +| 1.1.2 | 2024-05-23 | [38619](https://github.com/airbytehq/airbyte/pull/38619) | Fix cursor granularity for the `channel_messages` stream | +| 1.1.1 | 2024-05-02 | [36661](https://github.com/airbytehq/airbyte/pull/36661) | Schema descriptions | +| 1.1.0 | 2024-04-18 | [37332](https://github.com/airbytehq/airbyte/pull/37332) | Add the capability to sync from private channels | +| 1.0.0 | 2024-04-02 | [35477](https://github.com/airbytehq/airbyte/pull/35477) | Migration to low-code CDK | +| 0.4.1 | 2024-03-27 | [36579](https://github.com/airbytehq/airbyte/pull/36579) | Upgrade airbyte-cdk version to emit record counts as floats | +| 0.4.0 | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0` | +| 0.3.9 | 2024-02-12 | [35157](https://github.com/airbytehq/airbyte/pull/35157) | Manage dependencies with Poetry | +| 0.3.8 | 2024-02-09 | [35131](https://github.com/airbytehq/airbyte/pull/35131) | Fixed the issue when `schema discovery` fails with `502` due to the platform timeout | +| 0.3.7 | 2024-01-10 | [1234](https://github.com/airbytehq/airbyte/pull/1234) | Prepare for airbyte-lib | +| 0.3.6 | 2023-11-21 | [32707](https://github.com/airbytehq/airbyte/pull/32707) | Threads: do not use client-side record filtering | +| 0.3.5 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image | +| 0.3.4 | 2023-10-06 | [31134](https://github.com/airbytehq/airbyte/pull/31134) | Update CDK and remove non iterable return from records | +| 0.3.3 | 2023-09-28 | [30580](https://github.com/airbytehq/airbyte/pull/30580) | Add `bot_id` field to threads schema | +| 0.3.2 | 2023-09-20 | [30613](https://github.com/airbytehq/airbyte/pull/30613) | Set default value for channel_filters during discover | +| 0.3.1 | 2023-09-19 | [30570](https://github.com/airbytehq/airbyte/pull/30570) | Use default availability strategy | +| 0.3.0 | 2023-09-18 | [30521](https://github.com/airbytehq/airbyte/pull/30521) | Add unexpected fields to streams `channel_messages`, `channels`, `threads`, `users` | +| 0.2.0 | 2023-05-24 | [26497](https://github.com/airbytehq/airbyte/pull/26497) | Fixed `lookback window` value limitations | +| 0.1.26 | 2023-05-17 | [26186](https://github.com/airbytehq/airbyte/pull/26186) | Limited the `lookback window` range for input configuration | +| 0.1.25 | 2023-03-20 | [22889](https://github.com/airbytehq/airbyte/pull/22889) | Specified date formatting in specification | +| 0.1.24 | 2023-03-20 | [24126](https://github.com/airbytehq/airbyte/pull/24126) | Increase page size to 1000 | +| 0.1.23 | 2023-02-21 | [21907](https://github.com/airbytehq/airbyte/pull/21907) | Do not join channels that not gonna be synced | +| 0.1.22 | 2023-01-27 | [22022](https://github.com/airbytehq/airbyte/pull/22022) | Set `AvailabilityStrategy` for streams explicitly to `None` | +| 0.1.21 | 2023-01-12 | [21321](https://github.com/airbytehq/airbyte/pull/21321) | Retry Timeout error | +| 0.1.20 | 2022-12-21 | [20767](https://github.com/airbytehq/airbyte/pull/20767) | Update schema | +| 0.1.19 | 2022-12-01 | [19970](https://github.com/airbytehq/airbyte/pull/19970) | Remove OAuth2.0 broken `refresh_token` support | +| 0.1.18 | 2022-09-28 | [17315](https://github.com/airbytehq/airbyte/pull/17315) | Always install latest version of Airbyte CDK | +| 0.1.17 | 2022-08-28 | [16085](https://github.com/airbytehq/airbyte/pull/16085) | Increase unit test coverage | +| 0.1.16 | 2022-08-28 | [16050](https://github.com/airbytehq/airbyte/pull/16050) | Fix SATs | +| 0.1.15 | 2022-03-31 | [11613](https://github.com/airbytehq/airbyte/pull/11613) | Add 'channel_filter' config and improve performance | +| 0.1.14 | 2022-01-26 | [9575](https://github.com/airbytehq/airbyte/pull/9575) | Correct schema | +| 0.1.13 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | +| 0.1.12 | 2021-10-07 | [6570](https://github.com/airbytehq/airbyte/pull/6570) | Implement OAuth support with OAuth authenticator | +| 0.1.11 | 2021-08-27 | [5830](https://github.com/airbytehq/airbyte/pull/5830) | Fix sync operations hang forever issue | +| 0.1.10 | 2021-08-27 | [5697](https://github.com/airbytehq/airbyte/pull/5697) | Fix max retries issue | +| 0.1.9 | 2021-07-20 | [4860](https://github.com/airbytehq/airbyte/pull/4860) | Fix reading threads issue | +| 0.1.8 | 2021-07-14 | [4683](https://github.com/airbytehq/airbyte/pull/4683) | Add float_ts primary key | +| 0.1.7 | 2021-06-25 | [3978](https://github.com/airbytehq/airbyte/pull/3978) | Release Slack CDK Connector |