From 8805edcea6deca3fc52212d013c7da720386b51e Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 18 Sep 2023 08:58:56 -0700 Subject: [PATCH] DV2: Better errors in the UI (#30491) Co-authored-by: edgao --- .../models/declarative_component_schema.py | 1016 +++++++++-------- .../SerializedBufferingStrategy.java | 8 +- .../util/ConnectorExceptionUtil.java | 28 +- .../typing_deduping/FutureUtils.java | 17 +- .../destination-bigquery/Dockerfile | 2 +- .../destination-bigquery/metadata.yaml | 2 +- .../bigquery/BigQueryRecordConsumer.java | 7 +- .../BigQueryDestinationHandler.java | 24 +- .../destination-snowflake/Dockerfile | 2 +- .../destination-snowflake/metadata.yaml | 2 +- .../SnowflakeDestinationHandler.java | 17 +- .../SnowflakeSqlGeneratorIntegrationTest.java | 2 +- docs/integrations/destinations/bigquery.md | 1 + docs/integrations/destinations/snowflake.md | 1 + 14 files changed, 601 insertions(+), 528 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 24e3a907b115..c790291e9fc0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -11,12 +11,12 @@ class AddedFieldDefinition(BaseModel): - type: Literal["AddedFieldDefinition"] + type: Literal['AddedFieldDefinition'] path: List[str] = Field( ..., - description="List of strings defining the path where to add the value on the record.", - examples=[["segment_id"], ["metadata", "segment_id"]], - title="Path", + description='List of strings defining the path where to add the value on the record.', + examples=[['segment_id'], ['metadata', 'segment_id']], + title='Path', ) value: str = Field( ..., @@ -26,601 +26,607 @@ class AddedFieldDefinition(BaseModel): "{{ record['MetaData']['LastUpdatedTime'] }}", "{{ stream_partition['segment_id'] }}", ], - title="Value", + title='Value', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class AddFields(BaseModel): - type: Literal["AddFields"] + type: Literal['AddFields'] fields: List[AddedFieldDefinition] = Field( ..., - description="List of transformations (path and corresponding value) that will be added to the record.", - title="Fields", + description='List of transformations (path and corresponding value) that will be added to the record.', + title='Fields', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class AuthFlowType(Enum): - oauth2_0 = "oauth2.0" - oauth1_0 = "oauth1.0" + oauth2_0 = 'oauth2.0' + oauth1_0 = 'oauth1.0' class BasicHttpAuthenticator(BaseModel): - type: Literal["BasicHttpAuthenticator"] + type: Literal['BasicHttpAuthenticator'] username: str = Field( ..., - description="The username that will be combined with the password, base64 encoded and used to make requests. Fill it in the user inputs.", + description='The username that will be combined with the password, base64 encoded and used to make requests. Fill it in the user inputs.', examples=["{{ config['username'] }}", "{{ config['api_key'] }}"], - title="Username", + title='Username', ) password: Optional[str] = Field( - "", - description="The password that will be combined with the username, base64 encoded and used to make requests. Fill it in the user inputs.", - examples=["{{ config['password'] }}", ""], - title="Password", + '', + description='The password that will be combined with the username, base64 encoded and used to make requests. Fill it in the user inputs.', + examples=["{{ config['password'] }}", ''], + title='Password', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class BearerAuthenticator(BaseModel): - type: Literal["BearerAuthenticator"] + type: Literal['BearerAuthenticator'] api_token: str = Field( ..., - description="Token to inject as request header for authenticating with the API.", + description='Token to inject as request header for authenticating with the API.', examples=["{{ config['api_key'] }}", "{{ config['token'] }}"], - title="Bearer Token", + title='Bearer Token', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CheckStream(BaseModel): - type: Literal["CheckStream"] + type: Literal['CheckStream'] stream_names: List[str] = Field( ..., - description="Names of the streams to try reading from when running a check operation.", - examples=[["users"], ["users", "contacts"]], - title="Stream Names", + description='Names of the streams to try reading from when running a check operation.', + examples=[['users'], ['users', 'contacts']], + title='Stream Names', ) class ConstantBackoffStrategy(BaseModel): - type: Literal["ConstantBackoffStrategy"] + type: Literal['ConstantBackoffStrategy'] backoff_time_in_seconds: Union[float, str] = Field( ..., - description="Backoff time in seconds.", + description='Backoff time in seconds.', examples=[30, 30.5, "{{ config['backoff_time'] }}"], - title="Backoff Time", + title='Backoff Time', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomAuthenticator(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomAuthenticator"] + type: Literal['CustomAuthenticator'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom authentication strategy. Has to be a sub class of DeclarativeAuthenticator. The format is `source_..`.", - examples=["source_railz.components.ShortLivedTokenAuthenticator"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom authentication strategy. Has to be a sub class of DeclarativeAuthenticator. The format is `source_..`.', + examples=['source_railz.components.ShortLivedTokenAuthenticator'], + title='Class Name', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomBackoffStrategy(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomBackoffStrategy"] + type: Literal['CustomBackoffStrategy'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom backoff strategy. The format is `source_..`.", - examples=["source_railz.components.MyCustomBackoffStrategy"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom backoff strategy. The format is `source_..`.', + examples=['source_railz.components.MyCustomBackoffStrategy'], + title='Class Name', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomErrorHandler(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomErrorHandler"] + type: Literal['CustomErrorHandler'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom error handler. The format is `source_..`.", - examples=["source_railz.components.MyCustomErrorHandler"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom error handler. The format is `source_..`.', + examples=['source_railz.components.MyCustomErrorHandler'], + title='Class Name', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomIncrementalSync(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomIncrementalSync"] + type: Literal['CustomIncrementalSync'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom incremental sync. The format is `source_..`.", - examples=["source_railz.components.MyCustomIncrementalSync"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom incremental sync. The format is `source_..`.', + examples=['source_railz.components.MyCustomIncrementalSync'], + title='Class Name', ) cursor_field: str = Field( ..., - description="The location of the value on a record that will be used as a bookmark during sync.", + description='The location of the value on a record that will be used as a bookmark during sync.', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomPaginationStrategy(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomPaginationStrategy"] + type: Literal['CustomPaginationStrategy'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom pagination strategy. The format is `source_..`.", - examples=["source_railz.components.MyCustomPaginationStrategy"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom pagination strategy. The format is `source_..`.', + examples=['source_railz.components.MyCustomPaginationStrategy'], + title='Class Name', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomRecordExtractor(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomRecordExtractor"] + type: Literal['CustomRecordExtractor'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom record extraction strategy. The format is `source_..`.", - examples=["source_railz.components.MyCustomRecordExtractor"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom record extraction strategy. The format is `source_..`.', + examples=['source_railz.components.MyCustomRecordExtractor'], + title='Class Name', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomRequester(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomRequester"] + type: Literal['CustomRequester'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom requester strategy. The format is `source_..`.", - examples=["source_railz.components.MyCustomRecordExtractor"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom requester strategy. The format is `source_..`.', + examples=['source_railz.components.MyCustomRecordExtractor'], + title='Class Name', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomRetriever(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomRetriever"] + type: Literal['CustomRetriever'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom retriever strategy. The format is `source_..`.", - examples=["source_railz.components.MyCustomRetriever"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom retriever strategy. The format is `source_..`.', + examples=['source_railz.components.MyCustomRetriever'], + title='Class Name', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomPartitionRouter(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomPartitionRouter"] + type: Literal['CustomPartitionRouter'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom partition router. The format is `source_..`.", - examples=["source_railz.components.MyCustomPartitionRouter"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom partition router. The format is `source_..`.', + examples=['source_railz.components.MyCustomPartitionRouter'], + title='Class Name', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class CustomTransformation(BaseModel): class Config: extra = Extra.allow - type: Literal["CustomTransformation"] + type: Literal['CustomTransformation'] class_name: str = Field( ..., - description="Fully-qualified name of the class that will be implementing the custom transformation. The format is `source_..`.", - examples=["source_railz.components.MyCustomTransformation"], - title="Class Name", + description='Fully-qualified name of the class that will be implementing the custom transformation. The format is `source_..`.', + examples=['source_railz.components.MyCustomTransformation'], + title='Class Name', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class RefreshTokenUpdater(BaseModel): refresh_token_name: Optional[str] = Field( - "refresh_token", - description="The name of the property which contains the updated refresh token in the response from the token refresh endpoint.", - examples=["refresh_token"], - title="Refresh Token Property Name", + 'refresh_token', + description='The name of the property which contains the updated refresh token in the response from the token refresh endpoint.', + examples=['refresh_token'], + title='Refresh Token Property Name', ) access_token_config_path: Optional[List[str]] = Field( - ["credentials", "access_token"], - description="Config path to the access token. Make sure the field actually exists in the config.", - examples=[["credentials", "access_token"], ["access_token"]], - title="Config Path To Access Token", + ['credentials', 'access_token'], + description='Config path to the access token. Make sure the field actually exists in the config.', + examples=[['credentials', 'access_token'], ['access_token']], + title='Config Path To Access Token', ) refresh_token_config_path: Optional[List[str]] = Field( - ["credentials", "refresh_token"], - description="Config path to the access token. Make sure the field actually exists in the config.", - examples=[["credentials", "refresh_token"], ["refresh_token"]], - title="Config Path To Refresh Token", + ['credentials', 'refresh_token'], + description='Config path to the access token. Make sure the field actually exists in the config.', + examples=[['credentials', 'refresh_token'], ['refresh_token']], + title='Config Path To Refresh Token', ) token_expiry_date_config_path: Optional[List[str]] = Field( - ["credentials", "token_expiry_date"], - description="Config path to the expiry date. Make sure actually exists in the config.", - examples=[["credentials", "token_expiry_date"]], - title="Config Path To Expiry Date", + ['credentials', 'token_expiry_date'], + description='Config path to the expiry date. Make sure actually exists in the config.', + examples=[['credentials', 'token_expiry_date']], + title='Config Path To Expiry Date', ) class OAuthAuthenticator(BaseModel): - type: Literal["OAuthAuthenticator"] + type: Literal['OAuthAuthenticator'] client_id: str = Field( ..., - description="The OAuth client ID. Fill it in the user inputs.", + description='The OAuth client ID. Fill it in the user inputs.', examples=["{{ config['client_id }}", "{{ config['credentials']['client_id }}"], - title="Client ID", + title='Client ID', ) client_secret: str = Field( ..., - description="The OAuth client secret. Fill it in the user inputs.", + description='The OAuth client secret. Fill it in the user inputs.', examples=[ "{{ config['client_secret }}", "{{ config['credentials']['client_secret }}", ], - title="Client Secret", + title='Client Secret', ) refresh_token: Optional[str] = Field( None, - description="Credential artifact used to get a new access token.", + description='Credential artifact used to get a new access token.', examples=[ "{{ config['refresh_token'] }}", "{{ config['credentials]['refresh_token'] }}", ], - title="Refresh Token", + title='Refresh Token', ) token_refresh_endpoint: str = Field( ..., - description="The full URL to call to obtain a new access token.", - examples=["https://connect.squareup.com/oauth2/token"], - title="Token Refresh Endpoint", + description='The full URL to call to obtain a new access token.', + examples=['https://connect.squareup.com/oauth2/token'], + title='Token Refresh Endpoint', ) access_token_name: Optional[str] = Field( - "access_token", - description="The name of the property which contains the access token in the response from the token refresh endpoint.", - examples=["access_token"], - title="Access Token Property Name", + 'access_token', + description='The name of the property which contains the access token in the response from the token refresh endpoint.', + examples=['access_token'], + title='Access Token Property Name', ) expires_in_name: Optional[str] = Field( - "expires_in", - description="The name of the property which contains the expiry date in the response from the token refresh endpoint.", - examples=["expires_in"], - title="Token Expiry Property Name", + 'expires_in', + description='The name of the property which contains the expiry date in the response from the token refresh endpoint.', + examples=['expires_in'], + title='Token Expiry Property Name', ) grant_type: Optional[str] = Field( - "refresh_token", - description="Specifies the OAuth2 grant type. If set to refresh_token, the refresh_token needs to be provided as well. For client_credentials, only client id and secret are required. Other grant types are not officially supported.", - examples=["refresh_token", "client_credentials"], - title="Grant Type", + 'refresh_token', + description='Specifies the OAuth2 grant type. If set to refresh_token, the refresh_token needs to be provided as well. For client_credentials, only client id and secret are required. Other grant types are not officially supported.', + examples=['refresh_token', 'client_credentials'], + title='Grant Type', ) refresh_request_body: Optional[Dict[str, Any]] = Field( None, - description="Body of the request sent to get a new access token.", + description='Body of the request sent to get a new access token.', examples=[ { - "applicationId": "{{ config['application_id'] }}", - "applicationSecret": "{{ config['application_secret'] }}", - "token": "{{ config['token'] }}", + 'applicationId': "{{ config['application_id'] }}", + 'applicationSecret': "{{ config['application_secret'] }}", + 'token': "{{ config['token'] }}", } ], - title="Refresh Request Body", + title='Refresh Request Body', ) scopes: Optional[List[str]] = Field( None, - description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], - title="Scopes", + description='List of scopes that should be granted to the access token.', + examples=[ + ['crm.list.read', 'crm.objects.contacts.read', 'crm.schema.contacts.read'] + ], + title='Scopes', ) token_expiry_date: Optional[str] = Field( None, - description="The access token expiry date.", - examples=["2023-04-06T07:12:10.421833+00:00", 1680842386], - title="Token Expiry Date", + description='The access token expiry date.', + examples=['2023-04-06T07:12:10.421833+00:00', 1680842386], + title='Token Expiry Date', ) token_expiry_date_format: Optional[str] = Field( None, - description="The format of the time to expiration datetime. Provide it if the time is returned as a date-time string instead of seconds.", - examples=["%Y-%m-%d %H:%M:%S.%f+00:00"], - title="Token Expiry Date Format", + description='The format of the time to expiration datetime. Provide it if the time is returned as a date-time string instead of seconds.', + examples=['%Y-%m-%d %H:%M:%S.%f+00:00'], + title='Token Expiry Date Format', ) refresh_token_updater: Optional[RefreshTokenUpdater] = Field( None, - description="When the token updater is defined, new refresh tokens, access tokens and the access token expiry date are written back from the authentication response to the config object. This is important if the refresh token can only used once.", - title="Token Updater", + description='When the token updater is defined, new refresh tokens, access tokens and the access token expiry date are written back from the authentication response to the config object. This is important if the refresh token can only used once.', + title='Token Updater', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class ExponentialBackoffStrategy(BaseModel): - type: Literal["ExponentialBackoffStrategy"] + type: Literal['ExponentialBackoffStrategy'] factor: Optional[Union[float, str]] = Field( 5, - description="Multiplicative constant applied on each retry.", - examples=[5, 5.5, "10"], - title="Factor", + description='Multiplicative constant applied on each retry.', + examples=[5, 5.5, '10'], + title='Factor', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class SessionTokenRequestBearerAuthenticator(BaseModel): - type: Literal["Bearer"] + type: Literal['Bearer'] class HttpMethodEnum(Enum): - GET = "GET" - POST = "POST" + GET = 'GET' + POST = 'POST' class Action(Enum): - SUCCESS = "SUCCESS" - FAIL = "FAIL" - RETRY = "RETRY" - IGNORE = "IGNORE" + SUCCESS = 'SUCCESS' + FAIL = 'FAIL' + RETRY = 'RETRY' + IGNORE = 'IGNORE' class HttpResponseFilter(BaseModel): - type: Literal["HttpResponseFilter"] + type: Literal['HttpResponseFilter'] action: Action = Field( ..., - description="Action to execute if a response matches the filter.", - examples=["SUCCESS", "FAIL", "RETRY", "IGNORE"], - title="Action", + description='Action to execute if a response matches the filter.', + examples=['SUCCESS', 'FAIL', 'RETRY', 'IGNORE'], + title='Action', ) error_message: Optional[str] = Field( None, - description="Error Message to display if the response matches the filter.", - title="Error Message", + description='Error Message to display if the response matches the filter.', + title='Error Message', ) error_message_contains: Optional[str] = Field( None, - description="Match the response if its error message contains the substring.", - example=["This API operation is not enabled for this site"], - title="Error Message Substring", + description='Match the response if its error message contains the substring.', + example=['This API operation is not enabled for this site'], + title='Error Message Substring', ) http_codes: Optional[List[int]] = Field( None, - description="Match the response if its HTTP code is included in this list.", + description='Match the response if its HTTP code is included in this list.', examples=[[420, 429], [500]], - title="HTTP Codes", + title='HTTP Codes', ) predicate: Optional[str] = Field( None, - description="Match the response if the predicate evaluates to true.", + description='Match the response if the predicate evaluates to true.', examples=[ "{{ 'Too much requests' in response }}", "{{ 'error_code' in response and response['error_code'] == 'ComplexityException' }}", ], - title="Predicate", + title='Predicate', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class InlineSchemaLoader(BaseModel): - type: Literal["InlineSchemaLoader"] + type: Literal['InlineSchemaLoader'] schema_: Optional[Dict[str, Any]] = Field( None, - alias="schema", + alias='schema', description='Describes a streams\' schema. Refer to the Data Types documentation for more details on which types are valid.', - title="Schema", + title='Schema', ) class JsonFileSchemaLoader(BaseModel): - type: Literal["JsonFileSchemaLoader"] + type: Literal['JsonFileSchemaLoader'] file_path: Optional[str] = Field( None, description="Path to the JSON file defining the schema. The path is relative to the connector module's root.", - example=["./schemas/users.json"], - title="File Path", + example=['./schemas/users.json'], + title='File Path', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class JsonDecoder(BaseModel): - type: Literal["JsonDecoder"] + type: Literal['JsonDecoder'] class MinMaxDatetime(BaseModel): - type: Literal["MinMaxDatetime"] + type: Literal['MinMaxDatetime'] datetime: str = Field( ..., - description="Datetime value.", - examples=["2021-01-01", "2021-01-01T00:00:00Z", "{{ config['start_time'] }}"], - title="Datetime", + description='Datetime value.', + examples=['2021-01-01', '2021-01-01T00:00:00Z', "{{ config['start_time'] }}"], + title='Datetime', ) datetime_format: Optional[str] = Field( - "", + '', description='Format of the datetime value. Defaults to "%Y-%m-%dT%H:%M:%S.%f%z" if left empty. Use placeholders starting with "%" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%ms**: Epoch unix timestamp - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`, `000001`, ..., `999999`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (Sunday as first day) - `00`, `01`, ..., `53`\n * **%W**: Week number of the year (Monday as first day) - `00`, `01`, ..., `53`\n * **%c**: Date and time representation - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date representation - `08/16/1988`\n * **%X**: Time representation - `21:30:00`\n * **%%**: Literal \'%\' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n', - examples=["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%d", "%s"], - title="Datetime Format", + examples=['%Y-%m-%dT%H:%M:%S.%f%z', '%Y-%m-%d', '%s'], + title='Datetime Format', ) max_datetime: Optional[str] = Field( None, - description="Ceiling applied on the datetime value. Must be formatted with the datetime_format field.", - examples=["2021-01-01T00:00:00Z", "2021-01-01"], - title="Max Datetime", + description='Ceiling applied on the datetime value. Must be formatted with the datetime_format field.', + examples=['2021-01-01T00:00:00Z', '2021-01-01'], + title='Max Datetime', ) min_datetime: Optional[str] = Field( None, - description="Floor applied on the datetime value. Must be formatted with the datetime_format field.", - examples=["2010-01-01T00:00:00Z", "2010-01-01"], - title="Min Datetime", + description='Floor applied on the datetime value. Must be formatted with the datetime_format field.', + examples=['2010-01-01T00:00:00Z', '2010-01-01'], + title='Min Datetime', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class NoAuth(BaseModel): - type: Literal["NoAuth"] - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + type: Literal['NoAuth'] + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class NoPagination(BaseModel): - type: Literal["NoPagination"] + type: Literal['NoPagination'] class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + oauth_user_input_from_connector_config_specification: Optional[ + Dict[str, Any] + ] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + {'app_id': {'type': 'string', 'path_in_connector_config': ['app_id']}}, { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], + 'app_id': { + 'type': 'string', + 'path_in_connector_config': ['info', 'app_id'], } }, ], - title="OAuth user input", + title='OAuth user input', ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations produced by the OAuth flows as they are\nreturned by the distant OAuth APIs.\nMust be a valid JSON describing the fields to merge back to `ConnectorSpecification.connectionSpecification`.\nFor each field, a special annotation `path_in_connector_config` can be specified to determine where to merge it,\nExamples:\n complete_oauth_output_specification={\n refresh_token: {\n type: string,\n path_in_connector_config: ['credentials', 'refresh_token']\n }\n }", examples=[ { - "refresh_token": { - "type": "string,", - "path_in_connector_config": ["credentials", "refresh_token"], + 'refresh_token': { + 'type': 'string,', + 'path_in_connector_config': ['credentials', 'refresh_token'], } } ], - title="OAuth output specification", + title='OAuth output specification', ) complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], - title="OAuth input specification", + description='OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }', + examples=[ + {'client_id': {'type': 'string'}, 'client_secret': {'type': 'string'}} + ], + title='OAuth input specification', ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations that\nalso need to be merged back into the connector configuration at runtime.\nThis is a subset configuration of `complete_oauth_server_input_specification` that filters fields out to retain only the ones that\nare necessary for the connector to function with OAuth. (some fields could be used during oauth flows but not needed afterwards, therefore\nthey would be listed in the `complete_oauth_server_input_specification` but not `complete_oauth_server_output_specification`)\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nconnector when using OAuth flow APIs.\nThese fields are to be merged back to `ConnectorSpecification.connectionSpecification`.\nFor each field, a special annotation `path_in_connector_config` can be specified to determine where to merge it,\nExamples:\n complete_oauth_server_output_specification={\n client_id: {\n type: string,\n path_in_connector_config: ['credentials', 'client_id']\n },\n client_secret: {\n type: string,\n path_in_connector_config: ['credentials', 'client_secret']\n }\n }", examples=[ { - "client_id": { - "type": "string,", - "path_in_connector_config": ["credentials", "client_id"], + 'client_id': { + 'type': 'string,', + 'path_in_connector_config': ['credentials', 'client_id'], }, - "client_secret": { - "type": "string,", - "path_in_connector_config": ["credentials", "client_secret"], + 'client_secret': { + 'type': 'string,', + 'path_in_connector_config': ['credentials', 'client_secret'], }, } ], - title="OAuth server output specification", + title='OAuth server output specification', ) class OffsetIncrement(BaseModel): - type: Literal["OffsetIncrement"] + type: Literal['OffsetIncrement'] page_size: Optional[Union[int, str]] = Field( None, - description="The number of records to include in each pages.", + description='The number of records to include in each pages.', examples=[100, "{{ config['page_size'] }}"], - title="Limit", + title='Limit', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class PageIncrement(BaseModel): - type: Literal["PageIncrement"] + type: Literal['PageIncrement'] page_size: Optional[int] = Field( None, - description="The number of records to include in each pages.", - examples=[100, "100"], - title="Page Size", + description='The number of records to include in each pages.', + examples=[100, '100'], + title='Page Size', ) start_from_page: Optional[int] = Field( 0, - description="Index of the first page to request.", + description='Index of the first page to request.', examples=[0, 1], - title="Start From Page", + title='Start From Page', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class PrimaryKey(BaseModel): __root__: Union[str, List[str], List[List[str]]] = Field( ..., - description="The stream field to be used to distinguish unique records. Can either be a single field, an array of fields representing a composite key, or an array of arrays representing a composite key where the fields are nested fields.", - examples=["id", ["code", "type"]], - title="Primary Key", + description='The stream field to be used to distinguish unique records. Can either be a single field, an array of fields representing a composite key, or an array of arrays representing a composite key where the fields are nested fields.', + examples=['id', ['code', 'type']], + title='Primary Key', ) class RecordFilter(BaseModel): - type: Literal["RecordFilter"] + type: Literal['RecordFilter'] condition: Optional[str] = Field( - "", - description="The predicate to filter a record. Records will be removed if evaluated to False.", + '', + description='The predicate to filter a record. Records will be removed if evaluated to False.', examples=[ "{{ record['created_at'] >= stream_interval['start_time'] }}", "{{ record.status in ['active', 'expired'] }}", ], ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class RemoveFields(BaseModel): - type: Literal["RemoveFields"] + type: Literal['RemoveFields'] field_pointers: List[List[str]] = Field( ..., - description="Array of paths defining the field to remove. Each item is an array whose field describe the path of a field to remove.", - examples=[["tags"], [["content", "html"], ["content", "plain_text"]]], - title="Field Paths", + description='Array of paths defining the field to remove. Each item is an array whose field describe the path of a field to remove.', + examples=[['tags'], [['content', 'html'], ['content', 'plain_text']]], + title='Field Paths', ) class RequestPath(BaseModel): - type: Literal["RequestPath"] + type: Literal['RequestPath'] class InjectInto(Enum): - request_parameter = "request_parameter" - header = "header" - body_data = "body_data" - body_json = "body_json" + request_parameter = 'request_parameter' + header = 'header' + body_data = 'body_data' + body_json = 'body_json' class RequestOption(BaseModel): - type: Literal["RequestOption"] + type: Literal['RequestOption'] field_name: str = Field( ..., - description="Configures which key should be used in the location that the descriptor is being injected into", - examples=["segment_id"], - title="Request Option", + description='Configures which key should be used in the location that the descriptor is being injected into', + examples=['segment_id'], + title='Request Option', ) inject_into: InjectInto = Field( ..., - description="Configures where the descriptor should be set on the HTTP requests. Note that request parameters that are already encoded in the URL path will not be duplicated.", - examples=["request_parameter", "header", "body_data", "body_json"], - title="Inject Into", + description='Configures where the descriptor should be set on the HTTP requests. Note that request parameters that are already encoded in the URL path will not be duplicated.', + examples=['request_parameter', 'header', 'body_data', 'body_json'], + title='Inject Into', ) @@ -632,251 +638,253 @@ class Config: class LegacySessionTokenAuthenticator(BaseModel): - type: Literal["LegacySessionTokenAuthenticator"] + type: Literal['LegacySessionTokenAuthenticator'] header: str = Field( ..., - description="The name of the session token header that will be injected in the request", - examples=["X-Session"], - title="Session Request Header", + description='The name of the session token header that will be injected in the request', + examples=['X-Session'], + title='Session Request Header', ) login_url: str = Field( ..., - description="Path of the login URL (do not include the base URL)", - examples=["session"], - title="Login Path", + description='Path of the login URL (do not include the base URL)', + examples=['session'], + title='Login Path', ) session_token: Optional[str] = Field( None, - description="Session token to use if using a pre-defined token. Not needed if authenticating with username + password pair", + description='Session token to use if using a pre-defined token. Not needed if authenticating with username + password pair', example=["{{ config['session_token'] }}"], - title="Session Token", + title='Session Token', ) session_token_response_key: str = Field( ..., - description="Name of the key of the session token to be extracted from the response", - examples=["id"], - title="Response Token Response Key", + description='Name of the key of the session token to be extracted from the response', + examples=['id'], + title='Response Token Response Key', ) username: Optional[str] = Field( None, - description="Username used to authenticate and obtain a session token", + description='Username used to authenticate and obtain a session token', examples=[" {{ config['username'] }}"], - title="Username", + title='Username', ) password: Optional[str] = Field( - "", - description="Password used to authenticate and obtain a session token", - examples=["{{ config['password'] }}", ""], - title="Password", + '', + description='Password used to authenticate and obtain a session token', + examples=["{{ config['password'] }}", ''], + title='Password', ) validate_session_url: str = Field( ..., - description="Path of the URL to use to validate that the session token is valid (do not include the base URL)", - examples=["user/current"], - title="Validate Session Path", + description='Path of the URL to use to validate that the session token is valid (do not include the base URL)', + examples=['user/current'], + title='Validate Session Path', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class WaitTimeFromHeader(BaseModel): - type: Literal["WaitTimeFromHeader"] + type: Literal['WaitTimeFromHeader'] header: str = Field( ..., - description="The name of the response header defining how long to wait before retrying.", - examples=["Retry-After"], - title="Response Header Name", + description='The name of the response header defining how long to wait before retrying.', + examples=['Retry-After'], + title='Response Header Name', ) regex: Optional[str] = Field( None, - description="Optional regex to apply on the header to extract its value. The regex should define a capture group defining the wait time.", - examples=["([-+]?\\d+)"], - title="Extraction Regex", + description='Optional regex to apply on the header to extract its value. The regex should define a capture group defining the wait time.', + examples=['([-+]?\\d+)'], + title='Extraction Regex', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class WaitUntilTimeFromHeader(BaseModel): - type: Literal["WaitUntilTimeFromHeader"] + type: Literal['WaitUntilTimeFromHeader'] header: str = Field( ..., - description="The name of the response header defining how long to wait before retrying.", - examples=["wait_time"], - title="Response Header", + description='The name of the response header defining how long to wait before retrying.', + examples=['wait_time'], + title='Response Header', ) min_wait: Optional[Union[float, str]] = Field( None, - description="Minimum time to wait before retrying.", - examples=[10, "60"], - title="Minimum Wait Time", + description='Minimum time to wait before retrying.', + examples=[10, '60'], + title='Minimum Wait Time', ) regex: Optional[str] = Field( None, - description="Optional regex to apply on the header to extract its value. The regex should define a capture group defining the wait time.", - examples=["([-+]?\\d+)"], - title="Extraction Regex", + description='Optional regex to apply on the header to extract its value. The regex should define a capture group defining the wait time.', + examples=['([-+]?\\d+)'], + title='Extraction Regex', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class ApiKeyAuthenticator(BaseModel): - type: Literal["ApiKeyAuthenticator"] + type: Literal['ApiKeyAuthenticator'] api_token: Optional[str] = Field( None, - description="The API key to inject in the request. Fill it in the user inputs.", + description='The API key to inject in the request. Fill it in the user inputs.', examples=["{{ config['api_key'] }}", "Token token={{ config['api_key'] }}"], - title="API Key", + title='API Key', ) header: Optional[str] = Field( None, - description="The name of the HTTP header that will be set to the API key. This setting is deprecated, use inject_into instead. Header and inject_into can not be defined at the same time.", - examples=["Authorization", "Api-Token", "X-Auth-Token"], - title="Header Name", + description='The name of the HTTP header that will be set to the API key. This setting is deprecated, use inject_into instead. Header and inject_into can not be defined at the same time.', + examples=['Authorization', 'Api-Token', 'X-Auth-Token'], + title='Header Name', ) inject_into: Optional[RequestOption] = Field( None, - description="Configure how the API Key will be sent in requests to the source API. Either inject_into or header has to be defined.", + description='Configure how the API Key will be sent in requests to the source API. Either inject_into or header has to be defined.', examples=[ - {"inject_into": "header", "field_name": "Authorization"}, - {"inject_into": "request_parameter", "field_name": "authKey"}, + {'inject_into': 'header', 'field_name': 'Authorization'}, + {'inject_into': 'request_parameter', 'field_name': 'authKey'}, ], - title="Inject API Key Into Outgoing HTTP Request", + title='Inject API Key Into Outgoing HTTP Request', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class AuthFlow(BaseModel): - auth_flow_type: Optional[AuthFlowType] = Field(None, description="The type of auth to use", title="Auth flow type") + auth_flow_type: Optional[AuthFlowType] = Field( + None, description='The type of auth to use', title='Auth flow type' + ) predicate_key: Optional[List[str]] = Field( None, - description="JSON path to a field in the connectorSpecification that should exist for the advanced auth to be applicable.", - examples=[["credentials", "auth_type"]], - title="Predicate key", + description='JSON path to a field in the connectorSpecification that should exist for the advanced auth to be applicable.', + examples=[['credentials', 'auth_type']], + title='Predicate key', ) predicate_value: Optional[str] = Field( None, - description="Value of the predicate_key fields for the advanced auth to be applicable.", - examples=["Oauth"], - title="Predicate value", + description='Value of the predicate_key fields for the advanced auth to be applicable.', + examples=['Oauth'], + title='Predicate value', ) oauth_config_specification: Optional[OAuthConfigSpecification] = None class CursorPagination(BaseModel): - type: Literal["CursorPagination"] + type: Literal['CursorPagination'] cursor_value: str = Field( ..., - description="Value of the cursor defining the next page to fetch.", + description='Value of the cursor defining the next page to fetch.', examples=[ - "{{ headers.link.next.cursor }}", + '{{ headers.link.next.cursor }}', "{{ last_records[-1]['key'] }}", "{{ response['nextPage'] }}", ], - title="Cursor Value", + title='Cursor Value', ) page_size: Optional[int] = Field( None, - description="The number of records to include in each pages.", + description='The number of records to include in each pages.', examples=[100], - title="Page Size", + title='Page Size', ) stop_condition: Optional[str] = Field( None, - description="Template string evaluating when to stop paginating.", + description='Template string evaluating when to stop paginating.', examples=[ - "{{ response.data.has_more is false }}", + '{{ response.data.has_more is false }}', "{{ 'next' not in headers['link'] }}", ], - title="Stop Condition", + title='Stop Condition', ) decoder: Optional[JsonDecoder] = Field( None, - description="Component decoding the response so records can be extracted.", - title="Decoder", + description='Component decoding the response so records can be extracted.', + title='Decoder', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class DatetimeBasedCursor(BaseModel): - type: Literal["DatetimeBasedCursor"] + type: Literal['DatetimeBasedCursor'] cursor_field: str = Field( ..., - description="The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.", - examples=["created_at", "{{ config['record_cursor'] }}"], - title="Cursor Field", + description='The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.', + examples=['created_at', "{{ config['record_cursor'] }}"], + title='Cursor Field', ) datetime_format: str = Field( ..., - description="The datetime format used to format the datetime values that are sent in outgoing requests to the API. Use placeholders starting with \"%\" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%ms**: Epoch unix timestamp (milliseconds) - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (starting Sunday) - `00`, ..., `53`\n * **%W**: Week number of the year (starting Monday) - `00`, ..., `53`\n * **%c**: Date and time - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date standard format - `08/16/1988`\n * **%X**: Time standard format - `21:30:00`\n * **%%**: Literal '%' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n", - examples=["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%d", "%s", "%ms"], - title="Outgoing Datetime Format", + description='The datetime format used to format the datetime values that are sent in outgoing requests to the API. Use placeholders starting with "%" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%ms**: Epoch unix timestamp (milliseconds) - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (starting Sunday) - `00`, ..., `53`\n * **%W**: Week number of the year (starting Monday) - `00`, ..., `53`\n * **%c**: Date and time - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date standard format - `08/16/1988`\n * **%X**: Time standard format - `21:30:00`\n * **%%**: Literal \'%\' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n', + examples=['%Y-%m-%dT%H:%M:%S.%f%z', '%Y-%m-%d', '%s', '%ms'], + title='Outgoing Datetime Format', ) start_datetime: Union[str, MinMaxDatetime] = Field( ..., - description="The datetime that determines the earliest record that should be synced.", - examples=["2020-01-1T00:00:00Z", "{{ config['start_time'] }}"], - title="Start Datetime", + description='The datetime that determines the earliest record that should be synced.', + examples=['2020-01-1T00:00:00Z', "{{ config['start_time'] }}"], + title='Start Datetime', ) cursor_datetime_formats: Optional[List[str]] = Field( None, - description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the `datetime_format` will be used.", - title="Cursor Datetime Formats", + description='The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the `datetime_format` will be used.', + title='Cursor Datetime Formats', ) cursor_granularity: Optional[str] = Field( None, - description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.", - examples=["PT1S"], - title="Cursor Granularity", + description='Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.', + examples=['PT1S'], + title='Cursor Granularity', ) end_datetime: Optional[Union[str, MinMaxDatetime]] = Field( None, - description="The datetime that determines the last record that should be synced. If not provided, `{{ now_utc() }}` will be used.", - examples=["2021-01-1T00:00:00Z", "{{ now_utc() }}", "{{ day_delta(-1) }}"], - title="End Datetime", + description='The datetime that determines the last record that should be synced. If not provided, `{{ now_utc() }}` will be used.', + examples=['2021-01-1T00:00:00Z', '{{ now_utc() }}', '{{ day_delta(-1) }}'], + title='End Datetime', ) end_time_option: Optional[RequestOption] = Field( None, - description="Optionally configures how the end datetime will be sent in requests to the source API.", - title="Inject End Time Into Outgoing HTTP Request", + description='Optionally configures how the end datetime will be sent in requests to the source API.', + title='Inject End Time Into Outgoing HTTP Request', ) is_data_feed: Optional[bool] = Field( None, - description="A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.", - title="Whether the target API is formatted as a data feed", + description='A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.', + title='Whether the target API is formatted as a data feed', ) lookback_window: Optional[str] = Field( None, - description="Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.", - examples=["P1D", "P{{ config['lookback_days'] }}D"], - title="Lookback Window", + description='Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.', + examples=['P1D', "P{{ config['lookback_days'] }}D"], + title='Lookback Window', ) partition_field_end: Optional[str] = Field( None, - description="Name of the partition start time field.", - examples=["ending_time"], - title="Partition Field End", + description='Name of the partition start time field.', + examples=['ending_time'], + title='Partition Field End', ) partition_field_start: Optional[str] = Field( None, - description="Name of the partition end time field.", - examples=["starting_time"], - title="Partition Field Start", + description='Name of the partition end time field.', + examples=['starting_time'], + title='Partition Field Start', ) start_time_option: Optional[RequestOption] = Field( None, - description="Optionally configures how the start datetime will be sent in requests to the source API.", - title="Inject Start Time Into Outgoing HTTP Request", + description='Optionally configures how the start datetime will be sent in requests to the source API.', + title='Inject Start Time Into Outgoing HTTP Request', ) step: Optional[str] = Field( None, - description="The size of the time window (ISO8601 duration). Given this field is provided, `cursor_granularity` needs to be provided as well.", - examples=["P1W", "{{ config['step_increment'] }}"], - title="Step", + description='The size of the time window (ISO8601 duration). Given this field is provided, `cursor_granularity` needs to be provided as well.', + examples=['P1W', "{{ config['step_increment'] }}"], + title='Step', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class DefaultErrorHandler(BaseModel): - type: Literal["DefaultErrorHandler"] + type: Literal['DefaultErrorHandler'] backoff_strategies: Optional[ List[ Union[ @@ -889,142 +897,144 @@ class DefaultErrorHandler(BaseModel): ] ] = Field( None, - description="List of backoff strategies to use to determine how long to wait before retrying a retryable request.", - title="Backoff Strategies", + description='List of backoff strategies to use to determine how long to wait before retrying a retryable request.', + title='Backoff Strategies', ) max_retries: Optional[int] = Field( 5, - description="The maximum number of time to retry a retryable request before giving up and failing.", + description='The maximum number of time to retry a retryable request before giving up and failing.', examples=[5, 0, 10], - title="Max Retry Count", + title='Max Retry Count', ) response_filters: Optional[List[HttpResponseFilter]] = Field( None, description="List of response filters to iterate on when deciding how to handle an error. When using an array of multiple filters, the filters will be applied sequentially and the response will be selected if it matches any of the filter's predicate.", - title="Response Filters", + title='Response Filters', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class DefaultPaginator(BaseModel): - type: Literal["DefaultPaginator"] - pagination_strategy: Union[CursorPagination, CustomPaginationStrategy, OffsetIncrement, PageIncrement] = Field( + type: Literal['DefaultPaginator'] + pagination_strategy: Union[ + CursorPagination, CustomPaginationStrategy, OffsetIncrement, PageIncrement + ] = Field( ..., - description="Strategy defining how records are paginated.", - title="Pagination Strategy", + description='Strategy defining how records are paginated.', + title='Pagination Strategy', ) decoder: Optional[JsonDecoder] = Field( None, - description="Component decoding the response so records can be extracted.", - title="Decoder", + description='Component decoding the response so records can be extracted.', + title='Decoder', ) page_size_option: Optional[RequestOption] = None page_token_option: Optional[Union[RequestOption, RequestPath]] = None - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] + type: Literal['DpathExtractor'] field_path: List[str] = Field( ..., description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], + ['data'], + ['data', 'records'], + ['data', '{{ parameters.name }}'], + ['data', '*', 'record'], ], - title="Field Path", + title='Field Path', ) decoder: Optional[JsonDecoder] = Field( None, - description="Component decoding the response so records can be extracted.", - title="Decoder", + description='Component decoding the response so records can be extracted.', + title='Decoder', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class SessionTokenRequestApiKeyAuthenticator(BaseModel): - type: Literal["ApiKey"] + type: Literal['ApiKey'] inject_into: RequestOption = Field( ..., - description="Configure how the API Key will be sent in requests to the source API.", + description='Configure how the API Key will be sent in requests to the source API.', examples=[ - {"inject_into": "header", "field_name": "Authorization"}, - {"inject_into": "request_parameter", "field_name": "authKey"}, + {'inject_into': 'header', 'field_name': 'Authorization'}, + {'inject_into': 'request_parameter', 'field_name': 'authKey'}, ], - title="Inject API Key Into Outgoing HTTP Request", + title='Inject API Key Into Outgoing HTTP Request', ) class ListPartitionRouter(BaseModel): - type: Literal["ListPartitionRouter"] + type: Literal['ListPartitionRouter'] cursor_field: str = Field( ..., description='While iterating over list values, the name of field used to reference a list value. The partition value can be accessed with string interpolation. e.g. "{{ stream_partition[\'my_key\'] }}" where "my_key" is the value of the cursor_field.', - examples=["section", "{{ config['section_key'] }}"], - title="Current Partition Value Identifier", + examples=['section', "{{ config['section_key'] }}"], + title='Current Partition Value Identifier', ) values: Union[str, List[str]] = Field( ..., - description="The list of attributes being iterated over and used as input for the requests made to the source API.", - examples=[["section_a", "section_b", "section_c"], "{{ config['sections'] }}"], - title="Partition Values", + description='The list of attributes being iterated over and used as input for the requests made to the source API.', + examples=[['section_a', 'section_b', 'section_c'], "{{ config['sections'] }}"], + title='Partition Values', ) request_option: Optional[RequestOption] = Field( None, - description="A request option describing where the list value should be injected into and under what field name if applicable.", - title="Inject Partition Value Into Outgoing HTTP Request", + description='A request option describing where the list value should be injected into and under what field name if applicable.', + title='Inject Partition Value Into Outgoing HTTP Request', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class RecordSelector(BaseModel): - type: Literal["RecordSelector"] + type: Literal['RecordSelector'] extractor: Union[CustomRecordExtractor, DpathExtractor] record_filter: Optional[RecordFilter] = Field( None, - description="Responsible for filtering records to be emitted by the Source.", - title="Record Filter", + description='Responsible for filtering records to be emitted by the Source.', + title='Record Filter', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class Spec(BaseModel): - type: Literal["Spec"] + type: Literal['Spec'] connection_specification: Dict[str, Any] = Field( ..., - description="A connection specification describing how a the connector can be configured.", - title="Connection Specification", + description='A connection specification describing how a the connector can be configured.', + title='Connection Specification', ) documentation_url: Optional[str] = Field( None, description="URL of the connector's documentation page.", - examples=["https://docs.airbyte.com/integrations/sources/dremio"], - title="Documentation URL", + examples=['https://docs.airbyte.com/integrations/sources/dremio'], + title='Documentation URL', ) advanced_auth: Optional[AuthFlow] = Field( None, - description="Advanced specification for configuring the authentication flow.", - title="Advanced Auth", + description='Advanced specification for configuring the authentication flow.', + title='Advanced Auth', ) class CompositeErrorHandler(BaseModel): - type: Literal["CompositeErrorHandler"] + type: Literal['CompositeErrorHandler'] error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler]] = Field( ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", + description='List of error handlers to iterate on to determine how to handle a failed response.', + title='Error Handlers', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class DeclarativeSource(BaseModel): class Config: extra = Extra.forbid - type: Literal["DeclarativeSource"] + type: Literal['DeclarativeSource'] check: CheckStream streams: List[DeclarativeStream] version: str @@ -1033,7 +1043,7 @@ class Config: spec: Optional[Spec] = None metadata: Optional[Dict[str, Any]] = Field( None, - description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", + description='For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.', ) @@ -1041,91 +1051,101 @@ class DeclarativeStream(BaseModel): class Config: extra = Extra.allow - type: Literal["DeclarativeStream"] + type: Literal['DeclarativeStream'] retriever: Union[CustomRetriever, SimpleRetriever] = Field( ..., - description="Component used to coordinate how records are extracted across stream slices and request pages.", - title="Retriever", + description='Component used to coordinate how records are extracted across stream slices and request pages.', + title='Retriever', ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + incremental_sync: Optional[ + Union[CustomIncrementalSync, DatetimeBasedCursor] + ] = Field( None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + description='Component used to fetch data incrementally based on a time field in the data.', + title='Incremental Sync', + ) + name: Optional[str] = Field( + '', description='The stream name.', example=['Users'], title='Name' + ) + primary_key: Optional[PrimaryKey] = Field( + '', description='The primary key of the stream.', title='Primary Key' ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") - primary_key: Optional[PrimaryKey] = Field("", description="The primary key of the stream.", title="Primary Key") schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader]] = Field( None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", + description='Component used to retrieve the schema for the current stream.', + title='Schema Loader', ) - transformations: Optional[List[Union[AddFields, CustomTransformation, RemoveFields]]] = Field( + transformations: Optional[ + List[Union[AddFields, CustomTransformation, RemoveFields]] + ] = Field( None, - description="A list of transformations to be applied to each output record.", - title="Transformations", + description='A list of transformations to be applied to each output record.', + title='Transformations', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class SessionTokenAuthenticator(BaseModel): - type: Literal["SessionTokenAuthenticator"] + type: Literal['SessionTokenAuthenticator'] login_requester: HttpRequester = Field( ..., - description="Description of the request to perform to obtain a session token to perform data requests. The response body is expected to be a JSON object with a session token property.", + description='Description of the request to perform to obtain a session token to perform data requests. The response body is expected to be a JSON object with a session token property.', examples=[ { - "type": "HttpRequester", - "url_base": "https://my_api.com", - "path": "/login", - "authenticator": { - "type": "BasicHttpAuthenticator", - "username": "{{ config.username }}", - "password": "{{ config.password }}", + 'type': 'HttpRequester', + 'url_base': 'https://my_api.com', + 'path': '/login', + 'authenticator': { + 'type': 'BasicHttpAuthenticator', + 'username': '{{ config.username }}', + 'password': '{{ config.password }}', }, } ], - title="Login Requester", + title='Login Requester', ) session_token_path: List[str] = Field( ..., - description="The path in the response body returned from the login requester to the session token.", - examples=[["access_token"], ["result", "token"]], - title="Session Token Path", + description='The path in the response body returned from the login requester to the session token.', + examples=[['access_token'], ['result', 'token']], + title='Session Token Path', ) expiration_duration: Optional[str] = Field( None, - description="The duration in ISO 8601 duration notation after which the session token expires, starting from the time it was obtained. Omitting it will result in the session token being refreshed for every request.", - examples=["PT1H", "P1D"], - title="Expiration Duration", + description='The duration in ISO 8601 duration notation after which the session token expires, starting from the time it was obtained. Omitting it will result in the session token being refreshed for every request.', + examples=['PT1H', 'P1D'], + title='Expiration Duration', ) - request_authentication: Union[SessionTokenRequestApiKeyAuthenticator, SessionTokenRequestBearerAuthenticator] = Field( + request_authentication: Union[ + SessionTokenRequestApiKeyAuthenticator, SessionTokenRequestBearerAuthenticator + ] = Field( ..., - description="Authentication method to use for requests sent to the API, specifying how to inject the session token.", - title="Data Request Authentication", + description='Authentication method to use for requests sent to the API, specifying how to inject the session token.', + title='Data Request Authentication', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class HttpRequester(BaseModel): - type: Literal["HttpRequester"] + type: Literal['HttpRequester'] url_base: str = Field( ..., - description="Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + description='Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.', examples=[ - "https://connect.squareup.com/v2", + 'https://connect.squareup.com/v2', "{{ config['base_url'] or 'https://app.posthog.com'}}/api/", ], - title="API Base URL", + title='API Base URL', ) path: str = Field( ..., - description="Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + description='Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.', examples=[ - "/products", + '/products', "/quotes/{{ stream_partition['id'] }}/quote_line_groups", "/trades/{{ config['symbol_id'] }}/history", ], - title="URL Path", + title='URL Path', ) authenticator: Optional[ Union[ @@ -1140,92 +1160,96 @@ class HttpRequester(BaseModel): ] ] = Field( None, - description="Authentication method to use for requests sent to the API.", - title="Authenticator", + description='Authentication method to use for requests sent to the API.', + title='Authenticator', ) - error_handler: Optional[Union[DefaultErrorHandler, CustomErrorHandler, CompositeErrorHandler]] = Field( + error_handler: Optional[ + Union[DefaultErrorHandler, CustomErrorHandler, CompositeErrorHandler] + ] = Field( None, - description="Error handler component that defines how to handle errors.", - title="Error Handler", + description='Error handler component that defines how to handle errors.', + title='Error Handler', ) http_method: Optional[Union[str, HttpMethodEnum]] = Field( - "GET", - description="The HTTP method used to fetch data from the source (can be GET or POST).", - examples=["GET", "POST"], - title="HTTP Method", + 'GET', + description='The HTTP method used to fetch data from the source (can be GET or POST).', + examples=['GET', 'POST'], + title='HTTP Method', ) request_body_data: Optional[Union[str, Dict[str, str]]] = Field( None, - description="Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.", + description='Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.', examples=[ '[{"clause": {"type": "timestamp", "operator": 10, "parameters":\n [{"value": {{ stream_interval[\'start_time\'] | int * 1000 }} }]\n }, "orderBy": 1, "columnName": "Timestamp"}]/\n' ], - title="Request Body Payload (Non-JSON)", + title='Request Body Payload (Non-JSON)', ) request_body_json: Optional[Union[str, Dict[str, Any]]] = Field( None, - description="Specifies how to populate the body of the request with a JSON payload. Can contain nested objects.", + description='Specifies how to populate the body of the request with a JSON payload. Can contain nested objects.', examples=[ - {"sort_order": "ASC", "sort_field": "CREATED_AT"}, - {"key": "{{ config['value'] }}"}, - {"sort": {"field": "updated_at", "order": "ascending"}}, + {'sort_order': 'ASC', 'sort_field': 'CREATED_AT'}, + {'key': "{{ config['value'] }}"}, + {'sort': {'field': 'updated_at', 'order': 'ascending'}}, ], - title="Request Body JSON Payload", + title='Request Body JSON Payload', ) request_headers: Optional[Union[str, Dict[str, str]]] = Field( None, - description="Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.", - examples=[{"Output-Format": "JSON"}, {"Version": "{{ config['version'] }}"}], - title="Request Headers", + description='Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.', + examples=[{'Output-Format': 'JSON'}, {'Version': "{{ config['version'] }}"}], + title='Request Headers', ) request_parameters: Optional[Union[str, Dict[str, str]]] = Field( None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + description='Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.', examples=[ - {"unit": "day"}, + {'unit': 'day'}, { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + 'query': 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, + {'searchIn': "{{ ','.join(config.get('search_in', [])) }}"}, + {'sort_by[asc]': 'updated_at'}, ], - title="Query Parameters", + title='Query Parameters', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class ParentStreamConfig(BaseModel): - type: Literal["ParentStreamConfig"] + type: Literal['ParentStreamConfig'] parent_key: str = Field( ..., - description="The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. This parent identifier field is typically a characteristic of the child records being extracted from the source API.", - examples=["id", "{{ config['parent_record_id'] }}"], - title="Parent Key", + description='The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. This parent identifier field is typically a characteristic of the child records being extracted from the source API.', + examples=['id', "{{ config['parent_record_id'] }}"], + title='Parent Key', + ) + stream: DeclarativeStream = Field( + ..., description='Reference to the parent stream.', title='Parent Stream' ) - stream: DeclarativeStream = Field(..., description="Reference to the parent stream.", title="Parent Stream") partition_field: str = Field( ..., - description="While iterating over parent records during a sync, the parent_key value can be referenced by using this field.", - examples=["parent_id", "{{ config['parent_partition_field'] }}"], - title="Current Parent Key Value Identifier", + description='While iterating over parent records during a sync, the parent_key value can be referenced by using this field.', + examples=['parent_id', "{{ config['parent_partition_field'] }}"], + title='Current Parent Key Value Identifier', ) request_option: Optional[RequestOption] = Field( None, - description="A request option describing where the parent key value should be injected into and under what field name if applicable.", - title="Request Option", + description='A request option describing where the parent key value should be injected into and under what field name if applicable.', + title='Request Option', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class SimpleRetriever(BaseModel): - type: Literal["SimpleRetriever"] + type: Literal['SimpleRetriever'] record_selector: RecordSelector = Field( ..., - description="Component that describes how to extract records from a HTTP response.", + description='Component that describes how to extract records from a HTTP response.', ) requester: Union[CustomRequester, HttpRequester] = Field( ..., - description="Requester component that describes how to prepare HTTP requests to send to the source API.", + description='Requester component that describes how to prepare HTTP requests to send to the source API.', ) paginator: Optional[Union[DefaultPaginator, NoPagination]] = Field( None, @@ -1236,24 +1260,28 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], - description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.", - title="Partition Router", + description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.', + title='Partition Router', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') class SubstreamPartitionRouter(BaseModel): - type: Literal["SubstreamPartitionRouter"] + type: Literal['SubstreamPartitionRouter'] parent_stream_configs: List[ParentStreamConfig] = Field( ..., - description="Specifies which parent streams are being iterated over and how parent records should be used to partition the child stream data set.", - title="Parent Stream Configs", + description='Specifies which parent streams are being iterated over and how parent records should be used to partition the child stream data set.', + title='Parent Stream Configs', ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') CompositeErrorHandler.update_forward_refs() diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java index d69451440e03..1557ab51beac 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.destination.record_buffer; -import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.util.ConnectorExceptionUtil; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; @@ -60,7 +60,6 @@ public SerializedBufferingStrategy(final BufferCreateFunction onCreateBuffer, * @param stream stream associated with record * @param message {@link AirbyteMessage} to buffer * @return Optional which contains a {@link BufferFlushType} if a flush occurred, otherwise empty) - * @throws Exception */ @Override public Optional addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception { @@ -163,9 +162,8 @@ public void close() throws Exception { LOGGER.error("Exception while closing stream buffer", e); } } - if (!exceptionsThrown.isEmpty()) { - throw new RuntimeException(String.format("Exceptions thrown while closing buffers: %s", Strings.join(exceptionsThrown, "\n"))); - } + + ConnectorExceptionUtil.logAllAndThrowFirst("Exceptions thrown while closing buffers: ", exceptionsThrown); } } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/ConnectorExceptionUtil.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/ConnectorExceptionUtil.java index 65d6428fcdc3..888d5dc4677c 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/ConnectorExceptionUtil.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/ConnectorExceptionUtil.java @@ -4,21 +4,29 @@ package io.airbyte.integrations.util; +import static java.util.stream.Collectors.joining; + import com.google.common.collect.ImmutableList; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.exceptions.ConnectionErrorException; import io.airbyte.integrations.base.errors.messages.ErrorMessage; import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; +import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.function.Predicate; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utility class defining methods for handling configuration exceptions in connectors. */ public class ConnectorExceptionUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorExceptionUtil.class); + public static final String COMMON_EXCEPTION_MESSAGE_TEMPLATE = "Could not connect with provided configuration. Error: %s"; static final String RECOVERY_CONNECTION_ERROR_MESSAGE = "We're having issues syncing from a Postgres replica that is configured as a hot standby server. " + @@ -36,8 +44,7 @@ public static boolean isConfigError(final Throwable e) { public static String getDisplayMessage(final Throwable e) { if (e instanceof ConfigErrorException) { return ((ConfigErrorException) e).getDisplayMessage(); - } else if (e instanceof ConnectionErrorException) { - final ConnectionErrorException connEx = (ConnectionErrorException) e; + } else if (e instanceof final ConnectionErrorException connEx) { return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx); } else if (isRecoveryConnectionExceptionPredicate().test(e)) { return RECOVERY_CONNECTION_ERROR_MESSAGE; @@ -64,6 +71,23 @@ public static Throwable getRootConfigError(final Exception e) { return e; } + /** + * Log all the exceptions, and rethrow the first. This is useful for e.g. running multiple futures + * and waiting for them to complete/fail. Rather than combining them into a single mega-exception + * (which works poorly in the UI), we just log all of them, and throw the first exception. + *

+ * In most cases, all the exceptions will look very similar, so the user only needs to see the first + * exception anyway. This mimics e.g. a for-loop over multiple tasks, where the loop would break on + * the first exception. + */ + public static void logAllAndThrowFirst(final String initialMessage, final Collection throwables) throws T { + if (!throwables.isEmpty()) { + final String stacktraces = throwables.stream().map(ExceptionUtils::getStackTrace).collect(joining("\n")); + LOGGER.error(initialMessage + stacktraces + "\nRethrowing first exception."); + throw throwables.iterator().next(); + } + } + private static Predicate getConfigErrorPredicate() { return e -> e instanceof ConfigErrorException; } diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.java index 81a8a899927d..5804345bc7ed 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.java @@ -4,11 +4,11 @@ package io.airbyte.integrations.base.destination.typing_deduping; +import io.airbyte.integrations.util.ConnectorExceptionUtil; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; public class FutureUtils { @@ -24,17 +24,18 @@ public static int countOfTypingDedupingThreads(final int defaultThreads) { .orElse(defaultThreads); } + /** + * Log all exceptions from a list of futures, and rethrow the first exception if there is one. This + * mimics the behavior of running the futures in serial, where the first failure + */ public static void reduceExceptions(final Collection>> potentialExceptions, final String initialMessage) throws Exception { - final var exceptionMessages = potentialExceptions.stream() + final List exceptions = potentialExceptions.stream() .map(CompletableFuture::join) .filter(Optional::isPresent) .map(Optional::get) - .map(Exception::getMessage) - .collect(Collectors.joining("\n")); - if (StringUtils.isNotBlank(exceptionMessages)) { - throw new Exception(initialMessage + exceptionMessages); - } + .toList(); + ConnectorExceptionUtil.logAllAndThrowFirst(initialMessage, exceptions); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index b0eefc66db37..d4f74bfe3e38 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.9 +LABEL io.airbyte.version=2.0.10 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index a01a47122987..eca40d520ff4 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.0.9 + dockerImageTag: 2.0.10 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index ebc72057c068..3bfcef96f1b9 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -6,7 +6,6 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.TableId; -import io.airbyte.commons.string.Strings; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; @@ -15,6 +14,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter; import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader; +import io.airbyte.integrations.util.ConnectorExceptionUtil; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; @@ -132,9 +132,8 @@ public void close(final boolean hasFailed) throws Exception { }); typerDeduper.commitFinalTables(); typerDeduper.cleanup(); - if (!exceptionsThrown.isEmpty()) { - throw new RuntimeException(String.format("Exceptions thrown while closing consumer: %s", Strings.join(exceptionsThrown, "\n"))); - } + + ConnectorExceptionUtil.logAllAndThrowFirst("Exceptions thrown while closing consumer: ", exceptionsThrown); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index bf9d96b5620c..7f27d5e83d6d 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.bigquery.typing_deduping; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobConfiguration; import com.google.cloud.bigquery.JobId; @@ -15,12 +16,14 @@ import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; +import com.google.common.collect.Streams; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import java.math.BigInteger; import java.util.Comparator; import java.util.Optional; import java.util.UUID; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,19 +35,19 @@ public class BigQueryDestinationHandler implements DestinationHandler findExistingTable(StreamId id) { + public Optional findExistingTable(final StreamId id) { final Table table = bq.getTable(id.finalNamespace(), id.finalName()); return Optional.ofNullable(table).map(Table::getDefinition); } @Override - public boolean isFinalTableEmpty(StreamId id) { + public boolean isFinalTableEmpty(final StreamId id) { return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows()); } @@ -67,10 +70,13 @@ public void execute(final String sql) throws InterruptedException { job = job.reload(); } if (job.getStatus().getError() != null) { - throw new RuntimeException(job.getStatus().getError().toString()); + throw new BigQueryException(Streams.concat( + Stream.of(job.getStatus().getError()), + job.getStatus().getExecutionErrors().stream() + ).toList()); } - JobStatistics.QueryStatistics statistics = job.getStatistics(); + final JobStatistics.QueryStatistics statistics = job.getStatistics(); LOGGER.info("Root-level job {} completed in {} ms; processed {} bytes; billed for {} bytes", queryId, statistics.getEndTime() - statistics.getStartTime(), @@ -83,9 +89,9 @@ public void execute(final String sql) throws InterruptedException { bq.listJobs(BigQuery.JobListOption.parentJobId(job.getJobId().getJob())).streamAll() .sorted(Comparator.comparing(childJob -> childJob.getStatistics().getEndTime())) .forEach(childJob -> { - JobConfiguration configuration = childJob.getConfiguration(); - if (configuration instanceof QueryJobConfiguration qc) { - JobStatistics.QueryStatistics childQueryStats = childJob.getStatistics(); + final JobConfiguration configuration = childJob.getConfiguration(); + if (configuration instanceof final QueryJobConfiguration qc) { + final JobStatistics.QueryStatistics childQueryStats = childJob.getStatistics(); String truncatedQuery = qc.getQuery() .replaceAll("\n", " ") .replaceAll(" +", " ") @@ -101,7 +107,7 @@ public void execute(final String sql) throws InterruptedException { } else { // other job types are extract/copy/load // we're probably not using them, but handle just in case? - JobStatistics childJobStats = childJob.getStatistics(); + final JobStatistics childJobStats = childJob.getStatistics(); LOGGER.info("Non-query child job ({}) completed in {} ms", configuration.getType(), childJobStats.getEndTime() - childJobStats.getStartTime()); diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 29b8c2a88fa6..4d12b9d1b757 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=3.1.6 +LABEL io.airbyte.version=3.1.7 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index 83fc6dd78fe7..8d6c07817ea4 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.1.6 + dockerImageTag: 3.1.7 dockerRepository: airbyte/destination-snowflake githubIssueLabel: destination-snowflake icon: snowflake.svg diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 20539b4e1280..644f2c54e857 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -11,12 +11,14 @@ import java.util.LinkedHashMap; import java.util.Optional; import java.util.UUID; +import net.snowflake.client.jdbc.SnowflakeSQLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SnowflakeDestinationHandler implements DestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestinationHandler.class); + public static final String EXCEPTION_COMMON_PREFIX = "JavaScript execution error: Uncaught Execution of multiple statements failed on statement"; private final String databaseName; private final JdbcDatabase database; @@ -79,7 +81,20 @@ public void execute(final String sql) throws Exception { LOGGER.info("Executing sql {}: {}", queryId, sql); final long startTime = System.currentTimeMillis(); - database.execute(sql); + try { + database.execute(sql); + } catch (final SnowflakeSQLException e) { + LOGGER.error("Sql {} failed", queryId, e); + // Snowflake SQL exceptions by default may not be super helpful, so we try to extract the relevant part of the message. + final String trimmedMessage; + if (e.getMessage().startsWith(EXCEPTION_COMMON_PREFIX)) { + // The first line is a pretty generic message, so just remove it + trimmedMessage = e.getMessage().substring(e.getMessage().indexOf("\n") + 1); + } else { + trimmedMessage = e.getMessage(); + } + throw new RuntimeException(trimmedMessage, e); + } LOGGER.info("Sql {} completed in {} ms", queryId, System.currentTimeMillis() - startTime); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index 0f3da71d3ce7..26c28944495a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -327,7 +327,7 @@ public void incrementalDedupInvalidPrimaryKey() throws Exception { final String sql = generator.updateTable(incrementalDedupStream, ""); final Exception exception = assertThrows( - SnowflakeSQLException.class, + RuntimeException.class, () -> destinationHandler.execute(sql)); assertTrue(exception.getMessage().contains("_AB_MISSING_PRIMARY_KEY")); diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 0ae4a55bfd0a..501db0294ef4 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.0.10 | 2023-09-15 | [\#30491](https://github.com/airbytehq/airbyte/pull/30491) | Improve error message display | | 2.0.9 | 2023-09-14 | [\#30439](https://github.com/airbytehq/airbyte/pull/30439) | Fix a transient error | | 2.0.8 | 2023-09-12 | [\#30364](https://github.com/airbytehq/airbyte/pull/30364) | Add log message | | 2.0.7 | 2023-08-29 | [29878](https://github.com/airbytehq/airbyte/pull/29878) | Internal code changes | diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 0d8ea8e8ab6b..af1b4e6a9283 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | | :-------------- | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 3.1.7 | 2023-09-15 | [\#30491](https://github.com/airbytehq/airbyte/pull/30491) | Improve error message display | | 3.1.6 | 2023-09-14 | [\#30439](https://github.com/airbytehq/airbyte/pull/30439) | Fix a transient error | | 3.1.5 | 2023-09-13 | [\#30416](https://github.com/airbytehq/airbyte/pull/30416) | Support `${` in stream name/namespace, and in column names | | 3.1.4 | 2023-09-12 | [\#30364](https://github.com/airbytehq/airbyte/pull/30364) | Add log message |