Skip to content

Commit

Permalink
perf(source-bing-ads): bump CDK (#46991)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <[email protected]>
  • Loading branch information
artem1205 authored Oct 23, 2024
1 parent b66b78f commit 246502e
Show file tree
Hide file tree
Showing 12 changed files with 329 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#


import io
import json
import logging
import os
Expand All @@ -20,6 +21,12 @@
from pydantic import ValidationError


def splitlines_generator(input_string: str):
    """Lazily yield the lines of ``input_string`` one at a time.

    The string is wrapped in an in-memory text stream and read line by
    line, so no list holding every line is built up front. Lines are
    delimited by ``"\\n"`` only; the trailing ``"\\n"`` is removed from each
    yielded line and any other character (e.g. ``"\\r"``) is left untouched.

    Args:
        input_string: The text to split into lines.

    Yields:
        Each line of ``input_string`` without its trailing ``"\\n"``.
    """
    buffer = io.StringIO(input_string)
    try:
        # readline() returns "" only once the stream is exhausted; an empty
        # line inside the text is returned as "\n", so it is still yielded.
        for raw_line in iter(buffer.readline, ""):
            yield raw_line.rstrip("\n")
    finally:
        buffer.close()


async def get_container_from_id(dagger_client: dagger.Client, container_id: str) -> dagger.Container:
"""Get a dagger container from its id.
Please remember that container IDs are not persistent and can change between Dagger sessions.
Expand Down Expand Up @@ -250,7 +257,7 @@ async def _run(
if catalog:
container = container.with_new_file(self.IN_CONTAINER_CATALOG_PATH, contents=catalog.json())
try:
output = await self._read_output_from_stdout(airbyte_command, container)
output = await self._read_output_from_file(airbyte_command, container)
except dagger.QueryError as e:
output_too_big = bool([error for error in e.errors if error.message.startswith("file size")])
if output_too_big:
Expand Down Expand Up @@ -281,7 +288,7 @@ async def _read_output_from_file(self, airbyte_command: list, container: dagger.

def parse_airbyte_messages_from_command_output(self, command_output: str) -> List[AirbyteMessage]:
airbyte_messages = []
for line in command_output.splitlines():
for line in splitlines_generator(command_output):
try:
airbyte_message = AirbyteMessage.parse_raw(line)
if airbyte_message.type is AirbyteMessageType.CONTROL and airbyte_message.control.type is OrchestratorType.CONNECTOR_CONFIG:
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 47f25999-dd5e-4636-8c39-e7cea2453331
dockerImageTag: 2.7.9
dockerImageTag: 2.8.0
dockerRepository: airbyte/source-bing-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/bing-ads
erdUrl: https://dbdocs.io/airbyteio/source-bing-ads?view=relationships
Expand Down
286 changes: 190 additions & 96 deletions airbyte-integrations/connectors/source-bing-ads/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.7.9"
version = "2.8.0"
name = "source-bing-ads"
description = "Source implementation for Bing Ads."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -18,9 +18,8 @@ include = "source_bing_ads"
[tool.poetry.dependencies]
python = "^3.10,<3.12"
bingads = "==13.0.18.1"
pandas = "==2.2.0"
urllib3 = "==1.26.18"
airbyte-cdk = "^4"
airbyte-cdk = "^5"
cached-property = "==1.5.2"

[tool.poetry.scripts]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

import _csv
import pendulum
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import package_name_from_class
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_protocol.models import SyncMode
from bingads import ServiceClient
from bingads.v13.internal.reporting.row_report import _RowReport
from bingads.v13.internal.reporting.row_report_iterator import _RowReportRecord
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
from unittest import TestCase
from unittest.mock import MagicMock, patch

from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteStateMessage, SyncMode
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_protocol.models import AirbyteStateMessage
from bingads.v13.bulk import BulkServiceManager
from bingads.v13.reporting.reporting_service_manager import ReportingServiceManager
from client_builder import build_request, response_with_status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_incremental_read_cursor_value_matches_value_from_most_recent_record(sel
self.auth_client(http_mocker)
output, _ = self.read_stream(self.stream_name, SyncMode.incremental, self._config, "app_install_ad_labels_with_cursor_value")
assert len(output.records) == 4
assert dict(output.most_recent_state.stream_state).get(self.account_id, {}) == {self.cursor_field: "2024-01-04T12:12:12.028+00:00"}
assert output.most_recent_state.stream_state.__dict__.get(self.account_id, {}) == {self.cursor_field: "2024-01-04T12:12:12.028+00:00"}

@HttpMocker()
@freeze_time("2024-02-26") # mock current time as stream data available for 30 days only
Expand All @@ -52,8 +52,8 @@ def test_incremental_read_with_state(self, http_mocker: HttpMocker):
"app_install_ad_labels_with_state",
state
)
assert dict(output.most_recent_state.stream_state).get(self.account_id, {}) == {self.cursor_field: "2024-01-29T12:55:12.028+00:00"}
assert output.most_recent_state.stream_state.__dict__.get(self.account_id, {}) == {self.cursor_field: "2024-01-29T12:55:12.028+00:00"}

previous_state = state[0].stream.stream_state.dict()
previous_state = state[0].stream.stream_state.__dict__
# gets DownloadParams object
assert service_call_mock.call_args.args[0].last_sync_time_in_utc == pendulum.parse(previous_state[self.account_id][self.cursor_field])
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ def test_incremental_read_cursor_value_matches_value_from_most_recent_record(sel
self.auth_client(http_mocker)
output, _ = self.read_stream(self.stream_name, SyncMode.incremental, self._config, "app_install_ads_with_cursor_value")
assert len(output.records) == 4
assert dict(output.most_recent_state.stream_state).get(self.account_id, {}) == {self.cursor_field: "2024-03-01T12:49:12.028+00:00"}
assert output.most_recent_state.stream_state.__dict__.get(self.account_id, {}) == {self.cursor_field: "2024-03-01T12:49:12.028+00:00"}

@HttpMocker()
@freeze_time("2023-12-29") # mock current time as stream data available for 30 days only
def test_incremental_read_with_state(self, http_mocker: HttpMocker):
state = self._state("app_install_ads_state", self.stream_name)
self.auth_client(http_mocker)
output, service_call_mock = self.read_stream(self.stream_name, SyncMode.incremental, self._config, "app_install_ads_with_state", state)
assert dict(output.most_recent_state.stream_state).get(self.account_id, {}) == {self.cursor_field: "2024-01-01T10:55:12.028+00:00"}
assert output.most_recent_state.stream_state.__dict__.get(self.account_id, {}) == {self.cursor_field: "2024-01-01T10:55:12.028+00:00"}

previous_state = state[0].stream.stream_state.dict()
previous_state = state[0].stream.stream_state.__dict__
# gets DownloadParams object
assert service_call_mock.call_args.args[0].last_sync_time_in_utc == pendulum.parse(previous_state[self.account_id][self.cursor_field])
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_incremental_read_cursor_value_matches_value_from_most_recent_record(sel
self.auth_client(http_mocker)
output, _ = self.read_stream(self.stream_name, SyncMode.incremental, self._config, "budget_with_cursor_value")
assert len(output.records) == 8
assert output.most_recent_state.stream_state.dict().get(self.account_id) == {self.cursor_field: "2024-01-01T12:54:12.028+00:00"}
assert output.most_recent_state.stream_state.__dict__.get(self.account_id) == {self.cursor_field: "2024-01-01T12:54:12.028+00:00"}

@HttpMocker()
@freeze_time("2024-02-26") # mock current time as stream data available for 30 days only
Expand All @@ -47,9 +47,9 @@ def test_incremental_read_with_state(self, http_mocker: HttpMocker):
self.auth_client(http_mocker)
output, service_call_mock = self.read_stream(self.stream_name, SyncMode.incremental, self._config, "budget_with_state", state)
assert len(output.records) == 8
assert output.most_recent_state.stream_state.dict().get(self.account_id) == {self.cursor_field: "2024-01-30T12:54:12.028+00:00"}
assert output.most_recent_state.stream_state.__dict__.get(self.account_id) == {self.cursor_field: "2024-01-30T12:54:12.028+00:00"}

previous_state = state[0].stream.stream_state.dict()
previous_state = state[0].stream.stream_state.__dict__
# gets DownloadParams object
assert service_call_mock.call_args.args[0].last_sync_time_in_utc == pendulum.parse(
previous_state[self.account_id][self.cursor_field]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_incremental_read_returns_records(self, http_mocker: HttpMocker):
self.auth_client(http_mocker)
output, _ = self.read_stream(self.stream_name, SyncMode.incremental, self._config, self.report_file)
assert len(output.records) == self.records_number
assert dict(output.most_recent_state.stream_state) == self.first_read_state
assert output.most_recent_state.stream_state.__dict__ == self.first_read_state

@HttpMocker()
def test_incremental_read_with_state_returns_records(self, http_mocker: HttpMocker):
Expand All @@ -84,11 +84,11 @@ def test_incremental_read_with_state_returns_records(self, http_mocker: HttpMock
else:
assert len(output.records) == self.second_read_records_number

actual_cursor = dict(output.most_recent_state.stream_state).get(self.account_id)
actual_cursor = output.most_recent_state.stream_state.__dict__.get(self.account_id)
expected_cursor = self.second_read_state.get(self.account_id)
assert actual_cursor == expected_cursor

provided_state = state[0].stream.stream_state.dict()[self.account_id][self.cursor_field]
provided_state = state[0].stream.stream_state.__dict__[self.account_id][self.cursor_field]
# gets ReportDownloadParams object
request_start_date = service_call_mock.call_args.args[0].report_request.Time.CustomDateRangeStart
year = request_start_date.Year
Expand Down
Loading

0 comments on commit 246502e

Please sign in to comment.