Source Slack Refactor: update Python stream state handling (#39343)
ChristoGrab authored Jun 13, 2024
1 parent 5f1f7fe commit ea0e818
Showing 6 changed files with 61 additions and 48 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-slack/metadata.yaml
@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
-  dockerImageTag: 1.1.6
+  dockerImageTag: 1.1.7
   dockerRepository: airbyte/source-slack
   documentationUrl: https://docs.airbyte.com/integrations/sources/slack
   githubIssueLabel: source-slack
6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-slack/poetry.lock

Some generated files are not rendered by default.

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"

 [tool.poetry]
-version = "1.1.6"
+version = "1.1.7"
 name = "source-slack"
 description = "Source implementation for Slack."
 authors = [ "Airbyte <[email protected]>",]
@@ -9,6 +9,7 @@
 import pendulum
 import requests
 from airbyte_cdk.models import SyncMode
+from airbyte_cdk.sources.streams.core import CheckpointMixin
 from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
 from pendulum import DateTime

@@ -148,15 +149,24 @@ def read_records(self, sync_mode: SyncMode, **kwargs) -> Iterable[Mapping[str, A


 # Incremental Streams
-class IncrementalMessageStream(ChanneledStream, ABC):
+class IncrementalMessageStream(CheckpointMixin, ChanneledStream, ABC):
     data_field = "messages"
     cursor_field = "float_ts"
     primary_key = ["channel_id", "ts"]

+    @property
+    def state(self) -> Mapping[str, Any]:
+        return self._state
+
+    @state.setter
+    def state(self, value: Mapping[str, Any]):
+        self._state = value
+
     def __init__(self, default_start_date: DateTime, end_date: Optional[DateTime] = None, **kwargs):
         self._start_ts = default_start_date.timestamp()
         self._end_ts = end_date and end_date.timestamp()
         self.set_sub_primary_key()
+        self._state = None
         super().__init__(**kwargs)

     def set_sub_primary_key(self):
@@ -177,7 +187,7 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str,
             record[self.cursor_field] = float(record[self.sub_primary_key_2])
             yield record

-    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
+    def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
         current_stream_state = current_stream_state or {}
         current_stream_state[self.cursor_field] = max(
             latest_record[self.cursor_field], float(current_stream_state.get(self.cursor_field, self._start_ts))
@@ -196,7 +206,9 @@ def read_records(
             # return an empty iterator
             # this is done to emit at least one state message when no slices are generated
             return iter([])
-        return super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
+        for record in super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state):
+            self.state = self._get_updated_state(self.state, record)
+            yield record


 class ChannelMessages(HttpSubStream, IncrementalMessageStream):
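
For readers without the Airbyte CDK at hand, the state handling introduced in this commit can be sketched in isolation. The class below is a hypothetical stand-in (`SketchMessageStream` and its simplified `read_records(records)` signature are assumptions, not part of the connector or the CDK); it only mirrors the pattern from the diff: a `state` property backed by `_state`, and a `read_records` generator that advances the cursor for each record before yielding it.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping, Optional


class SketchMessageStream:
    """Hypothetical stand-in for IncrementalMessageStream; it does not use the Airbyte CDK."""

    cursor_field = "float_ts"

    def __init__(self, start_ts: float) -> None:
        self._start_ts = start_ts
        self._state: Optional[Dict[str, Any]] = None

    @property
    def state(self) -> Optional[Mapping[str, Any]]:
        return self._state

    @state.setter
    def state(self, value: Optional[Dict[str, Any]]) -> None:
        self._state = value

    def _get_updated_state(
        self, current_stream_state: Optional[Mapping[str, Any]], latest_record: Mapping[str, Any]
    ) -> Dict[str, Any]:
        # Copy instead of mutating in place (a choice made for this sketch only).
        new_state = dict(current_stream_state or {})
        # Keep the largest cursor value seen so far, falling back to the start timestamp.
        new_state[self.cursor_field] = max(
            latest_record[self.cursor_field],
            float(new_state.get(self.cursor_field, self._start_ts)),
        )
        return new_state

    def read_records(self, records: Iterable[Mapping[str, Any]]) -> Iterator[Mapping[str, Any]]:
        # Simplified signature: the real method takes sync_mode, stream_slice, and so on.
        for record in records:
            self.state = self._get_updated_state(self.state, record)
            yield record


stream = SketchMessageStream(start_ts=100.0)
emitted = list(stream.read_records([{"float_ts": 150.0}, {"float_ts": 120.0}]))
final_state = stream.state
```

Because `read_records` is a generator, the state in this sketch only advances as records are consumed; nothing is updated until the caller starts iterating.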
@@ -96,7 +96,7 @@ def test_get_updated_state(authenticator, token_config, current_state, latest_re
         default_start_date=pendulum.parse(token_config["start_date"]),
         lookback_window=token_config["lookback_window"]
     )
-    assert stream.get_updated_state(current_stream_state=current_state, latest_record=latest_record) == expected_state
+    assert stream._get_updated_state(current_stream_state=current_state, latest_record=latest_record) == expected_state


 def test_threads_request_params(authenticator, token_config):
79 changes: 40 additions & 39 deletions docs/integrations/sources/slack.md
@@ -171,47 +171,48 @@ Slack has [rate limit restrictions](https://api.slack.com/docs/rate-limits).

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :------------------------------------------------------- |:--------------------------------------------------------------------------------------|
| 1.1.7 | 2024-06-14 | [39343](https://github.com/airbytehq/airbyte/pull/39343) | Update state handling for `threads` Python stream |
| 1.1.6 | 2024-06-12 | [39416](https://github.com/airbytehq/airbyte/pull/39416) | Respect `include_private_channels` option in `threads` stream |
| 1.1.5 | 2024-06-10 | [39132](https://github.com/airbytehq/airbyte/pull/39132) | Convert string state to float for `threads` stream |
| 1.1.4 | 2024-06-06 | [39271](https://github.com/airbytehq/airbyte/pull/39271) | [autopull] Upgrade base image to v1.2.2 |
| 1.1.3 | 2024-06-05 | [39121](https://github.com/airbytehq/airbyte/pull/39121) | Change cursor format for `channel_messages` stream to `%s_as_float` |
| 1.1.2 | 2024-05-23 | [38619](https://github.com/airbytehq/airbyte/pull/38619) | Fix cursor granularity for the `channel_messages` stream |
| 1.1.1 | 2024-05-02 | [36661](https://github.com/airbytehq/airbyte/pull/36661) | Schema descriptions |
| 1.1.0 | 2024-04-18 | [37332](https://github.com/airbytehq/airbyte/pull/37332) | Add the capability to sync from private channels |
| 1.0.0 | 2024-04-02 | [35477](https://github.com/airbytehq/airbyte/pull/35477) | Migration to low-code CDK |
| 0.4.1 | 2024-03-27 | [36579](https://github.com/airbytehq/airbyte/pull/36579) | Upgrade airbyte-cdk version to emit record counts as floats |
| 0.4.0 | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0` |
| 0.3.9 | 2024-02-12 | [35157](https://github.com/airbytehq/airbyte/pull/35157) | Manage dependencies with Poetry |
| 0.3.8 | 2024-02-09 | [35131](https://github.com/airbytehq/airbyte/pull/35131) | Fixed the issue when `schema discovery` fails with `502` due to the platform timeout |
| 0.3.7 | 2024-01-10 | [1234](https://github.com/airbytehq/airbyte/pull/1234) | Prepare for airbyte-lib |
| 0.3.6 | 2023-11-21 | [32707](https://github.com/airbytehq/airbyte/pull/32707) | Threads: do not use client-side record filtering |
| 0.3.5 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image |
| 0.3.4 | 2023-10-06 | [31134](https://github.com/airbytehq/airbyte/pull/31134) | Update CDK and remove non iterable return from records |
| 0.3.3 | 2023-09-28 | [30580](https://github.com/airbytehq/airbyte/pull/30580) | Add `bot_id` field to threads schema |
| 0.3.2 | 2023-09-20 | [30613](https://github.com/airbytehq/airbyte/pull/30613) | Set default value for channel_filters during discover |
| 0.3.1 | 2023-09-19 | [30570](https://github.com/airbytehq/airbyte/pull/30570) | Use default availability strategy |
| 0.3.0 | 2023-09-18 | [30521](https://github.com/airbytehq/airbyte/pull/30521) | Add unexpected fields to streams `channel_messages`, `channels`, `threads`, `users` |
| 0.2.0 | 2023-05-24 | [26497](https://github.com/airbytehq/airbyte/pull/26497) | Fixed `lookback window` value limitations |
| 0.1.26 | 2023-05-17 | [26186](https://github.com/airbytehq/airbyte/pull/26186) | Limited the `lookback window` range for input configuration |
| 0.1.25 | 2023-03-20 | [22889](https://github.com/airbytehq/airbyte/pull/22889) | Specified date formatting in specification |
| 0.1.24 | 2023-03-20 | [24126](https://github.com/airbytehq/airbyte/pull/24126) | Increase page size to 1000 |
| 0.1.23 | 2023-02-21 | [21907](https://github.com/airbytehq/airbyte/pull/21907) | Do not join channels that not gonna be synced |
| 0.1.22 | 2023-01-27 | [22022](https://github.com/airbytehq/airbyte/pull/22022) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.1.21 | 2023-01-12 | [21321](https://github.com/airbytehq/airbyte/pull/21321) | Retry Timeout error |
| 0.1.20 | 2022-12-21 | [20767](https://github.com/airbytehq/airbyte/pull/20767) | Update schema |
| 0.1.19 | 2022-12-01 | [19970](https://github.com/airbytehq/airbyte/pull/19970) | Remove OAuth2.0 broken `refresh_token` support |
| 0.1.18 | 2022-09-28 | [17315](https://github.com/airbytehq/airbyte/pull/17315) | Always install latest version of Airbyte CDK |
| 0.1.17 | 2022-08-28 | [16085](https://github.com/airbytehq/airbyte/pull/16085) | Increase unit test coverage |
| 0.1.16 | 2022-08-28 | [16050](https://github.com/airbytehq/airbyte/pull/16050) | Fix SATs |
| 0.1.15 | 2022-03-31 | [11613](https://github.com/airbytehq/airbyte/pull/11613) | Add 'channel_filter' config and improve performance |
| 0.1.14 | 2022-01-26 | [9575](https://github.com/airbytehq/airbyte/pull/9575) | Correct schema |
| 0.1.13 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies |
| 0.1.12 | 2021-10-07 | [6570](https://github.com/airbytehq/airbyte/pull/6570) | Implement OAuth support with OAuth authenticator |
| 0.1.11 | 2021-08-27 | [5830](https://github.com/airbytehq/airbyte/pull/5830) | Fix sync operations hang forever issue |
| 0.1.10 | 2021-08-27 | [5697](https://github.com/airbytehq/airbyte/pull/5697) | Fix max retries issue |
| 0.1.9 | 2021-07-20 | [4860](https://github.com/airbytehq/airbyte/pull/4860) | Fix reading threads issue |
| 0.1.8 | 2021-07-14 | [4683](https://github.com/airbytehq/airbyte/pull/4683) | Add float_ts primary key |
| 0.1.7 | 2021-06-25 | [3978](https://github.com/airbytehq/airbyte/pull/3978) | Release Slack CDK Connector |

</details>
