-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 Source Slack migration to low code #35477
Changes from all commits
bd8a4c8
56daf02
8ad4061
2cab1ff
6aae53d
e45624c
c923aa9
be763e2
4883642
3e2c793
d948037
4736e76
b4ef214
c0376ab
ed7dbd7
703880f
a22e333
cb00fc2
854f87d
c68a752
d13e69f
9795a7d
8358894
0b8ee3b
7d55bad
9fa8a29
7cab07a
85e72ed
bdde638
0031711
cd2c611
8d23ce2
162b096
d94c4dd
d2083b9
602b92f
e84a1c2
53718a1
e7a034f
3788cc2
44ef205
21a2e82
175970d
c45a766
7348e7f
563daa1
7192ba2
af74589
3d48905
320a675
53e93d2
f622dcb
e874d06
5c36ec9
a5b26ca
42a499f
b567508
af8d4e4
3ad394f
3d571a3
5cebd2d
a898a70
0c24c5f
dfd3eac
d0f1b69
42f8309
bf4cb5d
b95bbd8
47e850c
2c94e4e
b89e7f4
53c3207
935e818
29979bf
a1ae4f4
87b1544
92c5f05
82e288f
a36d6cd
2537e03
6ce6404
f23f55e
37de0b8
0e92f78
8f967ac
9b48589
f1a317b
6fadec0
f67b342
47c6400
3163df5
e3ee23b
ca452f4
af613b2
deb7918
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
[run] | ||
omit = | ||
source_slack/run.py |
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] | |
build-backend = "poetry.core.masonry.api" | ||
|
||
[tool.poetry] | ||
version = "0.4.1" | ||
version = "1.0.0" | ||
name = "source-slack" | ||
description = "Source implementation for Slack." | ||
authors = [ "Airbyte <[email protected]>",] | ||
|
@@ -19,6 +19,7 @@ include = "source_slack" | |
python = "^3.9,<3.12" | ||
pendulum = "==2.1.2" | ||
airbyte-cdk = "^0" | ||
freezegun = "^1.4.0" | ||
|
||
[tool.poetry.scripts] | ||
source-slack = "source_slack.run:run" | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,21 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from dataclasses import dataclass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from typing import List | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import requests | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from airbyte_cdk.sources.declarative.extractors import DpathExtractor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from airbyte_cdk.sources.declarative.types import Record | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@dataclass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class ChannelMembersExtractor(DpathExtractor): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Transform response from list of strings to list dicts: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from: ['aa', 'bb'] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
to: [{'member_id': 'aa'}, {{'member_id': 'bb'}] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def extract_records(self, response: requests.Response) -> List[Record]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
records = super().extract_records(response) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return [{"member_id": record} for record in records] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
import logging | ||
from functools import partial | ||
from typing import Any, Iterable, List, Mapping, Optional | ||
|
||
import requests | ||
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter | ||
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever | ||
from airbyte_cdk.sources.declarative.types import Record, StreamSlice | ||
from airbyte_cdk.sources.streams.core import StreamData | ||
from airbyte_cdk.sources.streams.http import HttpStream | ||
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator | ||
|
||
LOGGER = logging.getLogger("airbyte_logger") | ||
|
||
|
||
class JoinChannelsStream(HttpStream): | ||
""" | ||
This class is a special stream which joins channels because the Slack API only returns messages from channels this bot is in. | ||
Its responses should only be logged for debugging reasons, not read as records. | ||
""" | ||
|
||
url_base = "https://slack.com/api/" | ||
http_method = "POST" | ||
primary_key = "id" | ||
|
||
def __init__(self, channel_filter: List[str] = None, **kwargs): | ||
self.channel_filter = channel_filter or [] | ||
super().__init__(**kwargs) | ||
|
||
def path(self, **kwargs) -> str: | ||
return "conversations.join" | ||
|
||
def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable: | ||
""" | ||
Override to simply indicate that the specific channel was joined successfully. | ||
This method should not return any data, but should return an empty iterable. | ||
""" | ||
is_ok = response.json().get("ok", False) | ||
if is_ok: | ||
self.logger.info(f"Successfully joined channel: {stream_slice['channel_name']}") | ||
else: | ||
self.logger.info(f"Unable to joined channel: {stream_slice['channel_name']}. Reason: {response.json()}") | ||
return [] | ||
|
||
def request_body_json(self, stream_slice: Mapping = None, **kwargs) -> Optional[Mapping]: | ||
if stream_slice: | ||
return {"channel": stream_slice.get("channel")} | ||
|
||
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: | ||
""" | ||
The pagination is not applicable to this Service Stream. | ||
""" | ||
return None | ||
|
||
|
||
class ChannelsRetriever(SimpleRetriever): | ||
def __post_init__(self, parameters: Mapping[str, Any]): | ||
super().__post_init__(parameters) | ||
self.stream_slicer = SinglePartitionRouter(parameters={}) | ||
self.record_selector.transformations = [] | ||
|
||
def should_join_to_channel(self, config: Mapping[str, Any], record: Record) -> bool: | ||
""" | ||
The `is_member` property indicates whether the API Bot is already assigned / joined to the channel. | ||
https://api.slack.com/types/conversation#booleans | ||
""" | ||
return config["join_channels"] and not record.get("is_member") | ||
|
||
def make_join_channel_slice(self, channel: Mapping[str, Any]) -> Mapping[str, Any]: | ||
channel_id: str = channel.get("id") | ||
channel_name: str = channel.get("name") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose channel_name is not needed for join channel ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, it used for logging |
||
LOGGER.info(f"Joining Slack Channel: `{channel_name}`") | ||
bazarnov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return {"channel": channel_id, "channel_name": channel_name} | ||
|
||
def join_channels_stream(self, config) -> JoinChannelsStream: | ||
token = config["credentials"].get("api_token") or config["credentials"].get("access_token") | ||
authenticator = TokenAuthenticator(token) | ||
channel_filter = config["channel_filter"] | ||
return JoinChannelsStream(authenticator=authenticator, channel_filter=channel_filter) | ||
|
||
def join_channel(self, config: Mapping[str, Any], record: Mapping[str, Any]): | ||
list( | ||
self.join_channels_stream(config).read_records( | ||
sync_mode=SyncMode.full_refresh, | ||
stream_slice=self.make_join_channel_slice(record), | ||
) | ||
) | ||
|
||
def read_records( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why "read_records" method was chosen for "join channel" injection? Main disadvantage here is that we had to copy full content of original "read_records" instead of just calling super().read_records(....) and then adding our customization. maybe better point of injection could be parse_response or transform functions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Joining to channel logic is not transformation or parse response. We should join to channel once we receive a channel that was not joined. As it was implemented in python code. We perform all api requests in a one place and have proper handling in case of fail. As this is not logic of parse response or transformation, in feature we can remove this custom component when similar logic appear in low-code cdk. |
||
self, | ||
records_schema: Mapping[str, Any], | ||
stream_slice: Optional[StreamSlice] = None, | ||
) -> Iterable[StreamData]: | ||
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check | ||
|
||
self._paginator.reset() | ||
|
||
most_recent_record_from_slice = None | ||
record_generator = partial( | ||
self._parse_records, | ||
stream_state=self.state or {}, | ||
stream_slice=_slice, | ||
records_schema=records_schema, | ||
) | ||
|
||
for stream_data in self._read_pages(record_generator, self.state, _slice): | ||
# joining channel logic | ||
if self.should_join_to_channel(self.config, stream_data): | ||
self.join_channel(self.config, stream_data) | ||
|
||
current_record = self._extract_record(stream_data, _slice) | ||
if self.cursor and current_record: | ||
self.cursor.observe(_slice, current_record) | ||
|
||
most_recent_record_from_slice = self._get_most_recent_record(most_recent_record_from_slice, current_record, _slice) | ||
yield stream_data | ||
|
||
if self.cursor: | ||
self.cursor.observe(_slice, most_recent_record_from_slice) | ||
return |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
import logging | ||
from typing import Any, List, Mapping | ||
|
||
from airbyte_cdk import AirbyteEntrypoint | ||
from airbyte_cdk.config_observation import create_connector_config_control_message | ||
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository | ||
from source_slack import SourceSlack | ||
|
||
logger = logging.getLogger("airbyte_logger") | ||
|
||
|
||
class MigrateLegacyConfig: | ||
message_repository: MessageRepository = InMemoryMessageRepository() | ||
|
||
@classmethod | ||
def _should_migrate(cls, config: Mapping[str, Any]) -> bool: | ||
""" | ||
legacy config: | ||
{ | ||
"start_date": "2021-07-22T20:00:00Z", | ||
"end_date": "2021-07-23T20:00:00Z", | ||
"lookback_window": 1, | ||
"join_channels": True, | ||
"channel_filter": ["airbyte-for-beginners", "good-reads"], | ||
"api_token": "api-token" | ||
} | ||
api token should be in the credentials object | ||
""" | ||
if config.get("api_token") and not config.get("credentials"): | ||
return True | ||
return False | ||
|
||
@classmethod | ||
def _move_token_to_credentials(cls, config: Mapping[str, Any]) -> Mapping[str, Any]: | ||
api_token = config["api_token"] | ||
config.update({"credentials": {"api_token": api_token, "option_title": "API Token Credentials"}}) | ||
config.pop("api_token") | ||
return config | ||
|
||
@classmethod | ||
def _modify_and_save(cls, config_path: str, source: SourceSlack, config: Mapping[str, Any]) -> Mapping[str, Any]: | ||
migrated_config = cls._move_token_to_credentials(config) | ||
# save the config | ||
source.write_config(migrated_config, config_path) | ||
return migrated_config | ||
|
||
@classmethod | ||
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None: | ||
# add the Airbyte Control Message to message repo | ||
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config)) | ||
# emit the Airbyte Control Message from message queue to stdout | ||
for message in cls.message_repository._message_queue: | ||
print(message.json(exclude_unset=True)) | ||
|
||
@classmethod | ||
def migrate(cls, args: List[str], source: SourceSlack) -> None: | ||
""" | ||
This method checks the input args, should the config be migrated, | ||
transform if necessary and emit the CONTROL message. | ||
""" | ||
# get config path | ||
config_path = AirbyteEntrypoint(source).extract_config(args) | ||
# proceed only if `--config` arg is provided | ||
if config_path: | ||
# read the existing config | ||
config = source.read_config(config_path) | ||
# migration check | ||
if cls._should_migrate(config): | ||
cls._emit_control_message( | ||
cls._modify_and_save(config_path, source, config), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should properly handle errors from slack. like it was before:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please give me an example that is not covered in this handling errors implementation? Now it works the same as in #35477 (comment), but instead of AirbyteTracedException it raises airbyte_cdk.sources.declarative.exceptions.ReadException.