Skip to content

Commit

Permalink
Merge branch 'master' into ykurochkin/source-stripe/implement-partition
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda authored Oct 23, 2023
2 parents f568b03 + dcfa331 commit 3686904
Show file tree
Hide file tree
Showing 188 changed files with 4,734 additions and 1,730 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.50.31
current_version = 0.50.32
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
8 changes: 8 additions & 0 deletions .github/actions/run-dagger-pipeline/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ inputs:
ci_job_key:
description: "CI job key"
required: false
s3_build_cache_access_key_id:
description: "Gradle S3 Build Cache AWS access key ID"
required: false
s3_build_cache_secret_key:
description: "Gradle S3 Build Cache AWS secret key"
required: false
runs:
using: "composite"
steps:
Expand Down Expand Up @@ -120,4 +126,6 @@ runs:
SPEC_CACHE_GCS_CREDENTIALS: ${{ inputs.spec_cache_gcs_credentials }}
DOCKER_HUB_USERNAME: ${{ inputs.docker_hub_username }}
DOCKER_HUB_PASSWORD: ${{ inputs.docker_hub_password }}
S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ inputs.s3_build_cache_access_key_id }}
S3_BUILD_CACHE_SECRET_KEY: ${{ inputs.s3_build_cache_secret_key }}
CI: "True"
2 changes: 2 additions & 0 deletions .github/workflows/connectors_nightly_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
git_branch: ${{ steps.extract_branch.outputs.branch }}
github_token: ${{ secrets.GITHUB_TOKEN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: "connectors ${{ inputs.test-connectors-options || '--concurrency=8 --support-level=certified' }} test"
4 changes: 4 additions & 0 deletions .github/workflows/connectors_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ jobs:
git_branch: ${{ steps.extract_branch.outputs.branch }}
git_revision: ${{ steps.fetch_last_commit_id_pr.outputs.commit_id }}
github_token: ${{ env.PAT }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: "connectors ${{ github.event.inputs.test-connectors-options }} test"
- name: Test connectors [PULL REQUESTS]
if: github.event_name == 'pull_request'
Expand All @@ -76,4 +78,6 @@ jobs:
git_branch: ${{ github.head_ref }}
git_revision: ${{ steps.fetch_last_commit_id_pr.outputs.commit_id }}
github_token: ${{ env.PAT }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: "connectors --modified test"
4 changes: 4 additions & 0 deletions .github/workflows/publish_connectors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
slack_webhook_url: ${{ secrets.PUBLISH_ON_MERGE_SLACK_WEBHOOK }}
spec_cache_gcs_credentials: ${{ secrets.SPEC_CACHE_SERVICE_ACCOUNT_KEY_PUBLISH }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: "connectors --concurrency=1 --execute-timeout=3600 --metadata-changes-only publish --main-release"

- name: Publish connectors [manual]
Expand All @@ -57,6 +59,8 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
slack_webhook_url: ${{ secrets.PUBLISH_ON_MERGE_SLACK_WEBHOOK }}
spec_cache_gcs_credentials: ${{ secrets.SPEC_CACHE_SERVICE_ACCOUNT_KEY_PUBLISH }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: "connectors ${{ github.event.inputs.connectors-options }} publish ${{ github.event.inputs.publish-options }}"

set-instatus-incident-on-failure:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.51.41
current_version = 0.51.44
commit = False

[bumpversion:file:setup.py]
Expand Down
9 changes: 9 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## 0.51.44
Low-code CDK: Allow connector developers to specify the type of an added field

## 0.51.43
Concurrent CDK: fail fast if a partition raises an exception

## 0.51.42
File CDK: Avoid listing all files for check command

## 0.51.41
Vector DB CDK: Expose stream identifier logic, add field remapping to processing | File CDK: Emit analytics message for used streams

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.51.41
RUN pip install --prefix=/install airbyte-cdk==0.51.44

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.51.41
LABEL io.airbyte.version=0.51.44
LABEL io.airbyte.name=airbyte/source-declarative-manifest
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ definitions:
- "{{ record['updates'] }}"
- "{{ record['MetaData']['LastUpdatedTime'] }}"
- "{{ stream_partition['segment_id'] }}"
value_type:
title: Value Type
description: Type of the value. If not specified, the type will be inferred from the value.
"$ref": "#/definitions/ValueType"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -1987,6 +1991,15 @@ definitions:
$parameters:
type: object
additionalProperties: true
ValueType:
title: Value Type
description: A schema type.
type: string
enum:
- string
- number
- integer
- boolean
WaitTimeFromHeader:
title: Wait Time Extracted From Response Header
description: Extract wait time from a HTTP header in the response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,6 @@
from typing_extensions import Literal


class AddedFieldDefinition(BaseModel):
type: Literal['AddedFieldDefinition']
path: List[str] = Field(
...,
description='List of strings defining the path where to add the value on the record.',
examples=[['segment_id'], ['metadata', 'segment_id']],
title='Path',
)
value: str = Field(
...,
description="Value of the new field. Use {{ record['existing_field'] }} syntax to refer to other fields in the record.",
examples=[
"{{ record['updates'] }}",
"{{ record['MetaData']['LastUpdatedTime'] }}",
"{{ stream_partition['segment_id'] }}",
],
title='Value',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AddFields(BaseModel):
type: Literal['AddFields']
fields: List[AddedFieldDefinition] = Field(
...,
description='List of transformations (path and corresponding value) that will be added to the record.',
title='Fields',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AuthFlowType(Enum):
oauth2_0 = 'oauth2.0'
oauth1_0 = 'oauth1.0'
Expand Down Expand Up @@ -694,6 +663,13 @@ class LegacySessionTokenAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class ValueType(Enum):
string = 'string'
number = 'number'
integer = 'integer'
boolean = 'boolean'


class WaitTimeFromHeader(BaseModel):
type: Literal['WaitTimeFromHeader']
header: str = Field(
Expand Down Expand Up @@ -734,6 +710,42 @@ class WaitUntilTimeFromHeader(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AddedFieldDefinition(BaseModel):
type: Literal['AddedFieldDefinition']
path: List[str] = Field(
...,
description='List of strings defining the path where to add the value on the record.',
examples=[['segment_id'], ['metadata', 'segment_id']],
title='Path',
)
value: str = Field(
...,
description="Value of the new field. Use {{ record['existing_field'] }} syntax to refer to other fields in the record.",
examples=[
"{{ record['updates'] }}",
"{{ record['MetaData']['LastUpdatedTime'] }}",
"{{ stream_partition['segment_id'] }}",
],
title='Value',
)
value_type: Optional[ValueType] = Field(
None,
description='Type of the value. If not specified, the type will be inferred from the value.',
title='Value Type',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AddFields(BaseModel):
type: Literal['AddFields']
fields: List[AddedFieldDefinition] = Field(
...,
description='List of transformations (path and corresponding value) that will be added to the record.',
title='Fields',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class ApiKeyAuthenticator(BaseModel):
type: Literal['ApiKeyAuthenticator']
api_token: Optional[str] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SimpleRetriever as SimpleRetrieverModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SubstreamPartitionRouter as SubstreamPartitionRouterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitTimeFromHeader as WaitTimeFromHeaderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitUntilTimeFromHeader as WaitUntilTimeFromHeaderModel
from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter, SinglePartitionRouter, SubstreamPartitionRouter
Expand Down Expand Up @@ -232,15 +233,36 @@ def _create_component_from_model(self, model: BaseModel, config: Config, **kwarg
@staticmethod
def create_added_field_definition(model: AddedFieldDefinitionModel, config: Config, **kwargs: Any) -> AddedFieldDefinition:
interpolated_value = InterpolatedString.create(model.value, parameters=model.parameters or {})
return AddedFieldDefinition(path=model.path, value=interpolated_value, parameters=model.parameters or {})
return AddedFieldDefinition(
path=model.path,
value=interpolated_value,
value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
parameters=model.parameters or {},
)

def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any) -> AddFields:
added_field_definitions = [
self._create_component_from_model(model=added_field_definition_model, config=config)
self._create_component_from_model(
model=added_field_definition_model,
value_type=ModelToComponentFactory._json_schema_type_name_to_type(added_field_definition_model.value_type),
config=config,
)
for added_field_definition_model in model.fields
]
return AddFields(fields=added_field_definitions, parameters=model.parameters or {})

@staticmethod
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
if not value_type:
return None
names_to_types = {
ValueType.string: str,
ValueType.number: float,
ValueType.integer: int,
ValueType.boolean: bool,
}
return names_to_types[value_type]

@staticmethod
def create_api_key_authenticator(
model: ApiKeyAuthenticatorModel, config: Config, token_provider: Optional[TokenProvider] = None, **kwargs: Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional
from typing import Any, List, Optional

import requests
from airbyte_cdk.sources.declarative.types import Record


@dataclass
Expand All @@ -23,7 +24,7 @@ def initial_token(self) -> Optional[Any]:
"""

@abstractmethod
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
"""
:param response: response to process
:param last_records: records extracted from the response
Expand All @@ -32,7 +33,7 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin
pass

@abstractmethod
def reset(self):
def reset(self) -> None:
"""
Reset the pagination's inner state
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ def next_page_token(self, response: requests.Response, last_records: List[Record
return None
return self._delegate.next_page_token(response, last_records)

def reset(self):
def reset(self) -> None:
self._delegate.reset()

def get_page_size(self) -> Optional[int]:
return self._delegate.get_page_size()

@property
def initial_token(self) -> Optional[Any]:
return self._delegate.initial_token
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Union
from typing import Any, List, Mapping, Optional, Type, Union

import dpath.util
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
Expand All @@ -17,6 +17,7 @@ class AddedFieldDefinition:

path: FieldPointer
value: Union[InterpolatedString, str]
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]


Expand All @@ -26,6 +27,7 @@ class ParsedAddFieldDefinition:

path: FieldPointer
value: InterpolatedString
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]


Expand Down Expand Up @@ -85,22 +87,27 @@ class AddFields(RecordTransformation):
parameters: InitVar[Mapping[str, Any]]
_parsed_fields: List[ParsedAddFieldDefinition] = field(init=False, repr=False, default_factory=list)

def __post_init__(self, parameters: Mapping[str, Any]):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
for add_field in self.fields:
if len(add_field.path) < 1:
raise f"Expected a non-zero-length path for the AddFields transformation {add_field}"
raise ValueError(f"Expected a non-zero-length path for the AddFields transformation {add_field}")

if not isinstance(add_field.value, InterpolatedString):
if not isinstance(add_field.value, str):
raise f"Expected a string value for the AddFields transformation: {add_field}"
else:
self._parsed_fields.append(
ParsedAddFieldDefinition(
add_field.path, InterpolatedString.create(add_field.value, parameters=parameters), parameters=parameters
add_field.path,
InterpolatedString.create(add_field.value, parameters=parameters),
value_type=add_field.value_type,
parameters=parameters,
)
)
else:
self._parsed_fields.append(ParsedAddFieldDefinition(add_field.path, add_field.value, parameters={}))
self._parsed_fields.append(
ParsedAddFieldDefinition(add_field.path, add_field.value, value_type=add_field.value_type, parameters={})
)

def transform(
self,
Expand All @@ -109,12 +116,15 @@ def transform(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> Record:
if config is None:
config = {}
kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
for parsed_field in self._parsed_fields:
value = parsed_field.value.eval(config, **kwargs)
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
dpath.util.new(record, parsed_field.path, value)

return record

def __eq__(self, other):
return self.__dict__ == other.__dict__
def __eq__(self, other: Any) -> bool:
return bool(self.__dict__ == other.__dict__)
Loading

0 comments on commit 3686904

Please sign in to comment.