Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
midavadim committed Feb 21, 2024
1 parent e7a034f commit 3788cc2
Showing 1 changed file with 17 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
import dpath.util
from dataclasses import dataclass
from typing import Optional
from typing import List, Mapping, Any, Iterable
import requests
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.declarative.transformations import RecordTransformation, AddFields
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import StreamData


@dataclass
class ChannelMembersExtractor(DpathExtractor):
"""
Record extractor that extracts record of the form from activity logs stream:
Transform the response from a list of strings to a list of dicts:
from: ['aa', 'bb']
to: [{'member_id': 'aa'}, {'member_id': 'bb'}]
"""
def extract_records(self, response: requests.Response) -> List[Record]:
records = super().extract_records(response)
Expand All @@ -28,7 +24,8 @@ def extract_records(self, response: requests.Response) -> List[Record]:
@dataclass
class JoinChannels(RecordTransformation):
"""
Implementations of this class define transformations that can be applied to records of a stream.
Make a 'conversations.join' POST request for every channel id found,
if we are not yet a member of that channel
"""

def transform(
Expand All @@ -38,8 +35,6 @@ def transform(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> Record:
""" sdf """
print(f"++++++++++CHECK {record['id']} ++++++++++++++++++++++++++++++++++++++++")
# The `is_member` property indicates whether or not the API Bot is already assigned / joined to the channel.
# https://api.slack.com/types/conversation#booleans
channel_id = record.get('id')
Expand All @@ -54,12 +49,15 @@ def transform(
)
print(response.json())

# self.logger.info(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Successfully joined channel: {channel_id}")
# WHAT TO DO IF IT FAILS ????????????????????????
# self.logger.info(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Successfully joined channel: {channel_id}")


@dataclass
class ThreadsPartitionRouter(SubstreamPartitionRouter):

"""Override SubstreamPartitionRouter to be able to pass more than one value
from the parent stream to stream_slices
"""
def get_request_params(
self,
stream_state: Optional[StreamState] = None,
Expand All @@ -73,18 +71,8 @@ def get_request_params(

def stream_slices(self) -> Iterable[StreamSlice]:
"""
Iterate over each parent stream's record and create a StreamSlice for each record.
For each stream, iterate over its stream_slices.
For each stream slice, iterate over each record.
yield a stream slice for each such records.
If a parent slice contains no record, emit a slice with parent_record=None.
The template string can interpolate the following values:
- parent_stream_slice: mapping representing the parent's stream slice
- parent_record: mapping representing the parent record
- parent_stream_name: string representing the parent stream name
Change the behaviour of the main stream_slices by adding two values (channel_id, ts) from the parent stream
(previously it was possible to add only one value)
"""
if not self.parent_stream_configs:
yield from []
Expand All @@ -96,17 +84,14 @@ def stream_slices(self) -> Iterable[StreamSlice]:
for parent_stream_slice in parent_stream.stream_slices(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None
):
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
print("parent_stream_slice")
print(parent_stream_slice)

empty_parent_slice = True
parent_slice = parent_stream_slice

for parent_record in parent_stream.read_records(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
):
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
print("parent_record")

print(parent_record)
# Skip non-records (eg AirbyteLogMessage)
if isinstance(parent_record, AirbyteMessage):
Expand All @@ -116,17 +101,9 @@ def stream_slices(self) -> Iterable[StreamSlice]:
continue
elif isinstance(parent_record, Record):
parent_record = parent_record.data
# try:
# stream_state_value = dpath.util.get(parent_record, parent_field)
# except KeyError:
# pass
# else:

empty_parent_slice = False
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
print(parent_record)
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")

yield {
'channel_id': parent_record['channel_id'],
'ts': parent_record['ts'],
Expand Down

0 comments on commit 3788cc2

Please sign in to comment.