diff --git a/airbyte-integrations/connectors/source-slack/source_slack/components.py b/airbyte-integrations/connectors/source-slack/source_slack/components.py index 0e2de9f632a3..d3581bba71ea 100644 --- a/airbyte-integrations/connectors/source-slack/source_slack/components.py +++ b/airbyte-integrations/connectors/source-slack/source_slack/components.py @@ -1,24 +1,20 @@ -import dpath.util from dataclasses import dataclass from typing import Optional from typing import List, Mapping, Any, Iterable import requests +from airbyte_cdk.models import AirbyteMessage, SyncMode, Type from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState from airbyte_cdk.sources.declarative.extractors import DpathExtractor -from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever -from airbyte_cdk.sources.declarative.types import Record from airbyte_cdk.sources.declarative.transformations import RecordTransformation, AddFields from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter -from airbyte_cdk.sources.streams.core import Stream -from airbyte_cdk.models import AirbyteMessage, SyncMode, Type -from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState -from airbyte_cdk.sources.streams.core import StreamData @dataclass class ChannelMembersExtractor(DpathExtractor): """ - Record extractor that extracts record of the form from activity logs stream: + Transform response from list of strings to list dicts: + from: ['aa', 'bb'] + to: [{'member_id': 'aa'}, {{'member_id': 'bb'}] """ def extract_records(self, response: requests.Response) -> List[Record]: records = super().extract_records(response) @@ -28,7 +24,8 @@ def extract_records(self, response: requests.Response) -> List[Record]: @dataclass class JoinChannels(RecordTransformation): """ - Implementations of this class define transformations that can be applied to records of a stream. + Make 'conversations.join' POST request for every found channel id + if we are not still a member of such channel """ def transform( @@ -38,8 +35,6 @@ def transform( stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, ) -> Record: - """ sdf """ - print(f"++++++++++CHECK {record['id']} ++++++++++++++++++++++++++++++++++++++++") # The `is_member` property indicates whether or not the API Bot is already assigned / joined to the channel. # https://api.slack.com/types/conversation#booleans channel_id = record.get('id') @@ -54,12 +49,15 @@ def transform( ) print(response.json()) - # self.logger.info(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Successfully joined channel: {channel_id}") + # WHAT TO DO IF IT FAILS ???????????????????????? + # self.logger.info(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Successfully joined channel: {channel_id}") @dataclass class ThreadsPartitionRouter(SubstreamPartitionRouter): - + """Overwrite SubstreamPartitionRouter to be able to pass more than one value + from parent stream to stream_slices + """ def get_request_params( self, stream_state: Optional[StreamState] = None, @@ -73,18 +71,8 @@ def get_request_params( def stream_slices(self) -> Iterable[StreamSlice]: """ - Iterate over each parent stream's record and create a StreamSlice for each record. - - For each stream, iterate over its stream_slices. - For each stream slice, iterate over each record. - yield a stream slice for each such records. - - If a parent slice contains no record, emit a slice with parent_record=None. - - The template string can interpolate the following values: - - parent_stream_slice: mapping representing the parent's stream slice - - parent_record: mapping representing the parent record - - parent_stream_name: string representing the parent stream name + Change behaviour of main stream_slices by adding two values (for channel_id, ts) from parent stream + (previously it was possible to add only one value) """ if not self.parent_stream_configs: yield from [] @@ -96,17 +84,14 @@ def stream_slices(self) -> Iterable[StreamSlice]: for parent_stream_slice in parent_stream.stream_slices( sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None ): - print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") - print("parent_stream_slice") - print(parent_stream_slice) + empty_parent_slice = True parent_slice = parent_stream_slice for parent_record in parent_stream.read_records( sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None ): - print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") - print("parent_record") + print(parent_record) # Skip non-records (eg AirbyteLogMessage) if isinstance(parent_record, AirbyteMessage): @@ -116,17 +101,9 @@ def stream_slices(self) -> Iterable[StreamSlice]: continue elif isinstance(parent_record, Record): parent_record = parent_record.data - # try: - # stream_state_value = dpath.util.get(parent_record, parent_field) - # except KeyError: - # pass - # else: + empty_parent_slice = False - print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") - print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") - print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") - print(parent_record) - print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") + yield { 'channel_id': parent_record['channel_id'], 'ts': parent_record['ts'],