Commit 8aa0449

Merge branch 'master' into btkcodedev/awsCloudTrailLowCode
lazebnyi authored Jun 3, 2024
2 parents d15f4c0 + f4c4c34
Showing 308 changed files with 17,839 additions and 6,626 deletions.
1 change: 1 addition & 0 deletions .github/workflows/connectors_version_increment_check.yml
@@ -22,6 +22,7 @@ jobs:
connectors_ci:
name: Connectors Version Increment Check
runs-on: connector-test-large
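    # Skip PRs opened from forks: they can't use the repository's secrets or its
    # self-hosted runners, so the job runs only for same-repo branches.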
if: github.event.pull_request.head.repo.fork != true
timeout-minutes: 10
steps:
- name: Checkout Airbyte
3 changes: 0 additions & 3 deletions .github/workflows/format-fix-command.yml
@@ -72,9 +72,6 @@ jobs:
continue-on-error: true
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -174,8 +174,10 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.36.4  | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Changed a parameter in JdbcDestinationHandler from non-null to nullable to fix an NPE |
| 0.36.2  | 2024-05-29 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Exit connector when encountering a config error |
| 0.36.0  | 2024-05-29 | [\#38358](https://github.com/airbytehq/airbyte/pull/38358) | Plumb generation_id / sync_id to destinations code |
| 0.35.15 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Changed a parameter in JdbcDestinationHandler from non-null to nullable to fix an NPE |
| 0.35.14 | 2024-05-28 | [\#38738](https://github.com/airbytehq/airbyte/pull/38738) | Made the ThreadCreationInfo cast nullable |
| 0.35.13 | 2024-05-28 | [\#38632](https://github.com/airbytehq/airbyte/pull/38632) | Minor changes to allow conversion of Snowflake tests to Kotlin |
| 0.35.12 | 2024-05-23 | [\#38638](https://github.com/airbytehq/airbyte/pull/38638) | Minor change to support Snowflake conversion to Kotlin |
@@ -1 +1 @@
-version=0.36.3
+version=0.36.4
@@ -119,7 +119,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
// Filter for nonNull values in case the query returned NULL (i.e. no unloaded
// records).
val minUnloadedTimestamp: Optional<Timestamp> =
-            timestampStream.filter { obj: Timestamp -> Objects.nonNull(obj) }.findFirst()
+            timestampStream.filter { obj: Timestamp? -> Objects.nonNull(obj) }.findFirst()
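            // The lambda parameter must be nullable: with a non-null `Timestamp`, Kotlin
            // inserts an intrinsic null check that throws an NPE before Objects.nonNull
            // ever sees the null element; `Timestamp?` lets the filter drop it instead.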
if (minUnloadedTimestamp.isPresent) {
// Decrement by 1 second since timestamp precision varies between databases.
val ts =
@@ -145,7 +145,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
// Filter for nonNull values in case the query returned NULL (i.e. no raw records at
// all).
val minUnloadedTimestamp: Optional<Timestamp> =
-            timestampStream.filter { obj: Timestamp -> Objects.nonNull(obj) }.findFirst()
+            timestampStream.filter { obj: Timestamp? -> Objects.nonNull(obj) }.findFirst()
return InitialRawTableStatus(
true,
false,
@@ -47,6 +47,7 @@ import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.functional.CheckedConsumer
import io.airbyte.commons.functional.CheckedFunction
import io.airbyte.commons.json.Jsons
@@ -251,6 +252,45 @@ abstract class AbstractJdbcSource<Datatype>(
)
}

    /**
     * Checks that the current user can SELECT from the tables in the configured schemas.
     * Connectors may override this function if the check takes too long for a particular
     * database source.
     */
@Throws(Exception::class)
protected open fun checkUserHasPrivileges(config: JsonNode?, database: JdbcDatabase) {
        val schemas = ArrayList<String>()
if (config!!.has(JdbcUtils.SCHEMAS_KEY) && config[JdbcUtils.SCHEMAS_KEY].isArray) {
for (schema in config[JdbcUtils.SCHEMAS_KEY]) {
schemas.add(schema.asText())
}
}
// if UI has schemas specified, check if the user has select access to any table
if (schemas.isNotEmpty()) {
for (schema in schemas) {
                LOGGER.info {
                    "Checking if the user can SELECT from any table in schema: $schema"
                }
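                // Any table visible in this schema: catalog = null, tableNamePattern = "%",
                // all table types.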
val tablesOfSchema = database.metaData.getTables(null, schema, "%", null)
if (tablesOfSchema.next()) {
                    val privileges =
                        getPrivilegesTableForCurrentUser<JdbcPrivilegeDto>(database, schema)
if (privileges.isEmpty()) {
LOGGER.info { "No table from schema $schema is accessible for the user." }
throw ConfigErrorException(
"User lacks privileges to SELECT from any of the tables in schema $schema"
)
}
} else {
LOGGER.info { "Schema $schema does not contain any table." }
}
}
} else {
            LOGGER.info {
                "No schemas were provided; skipping the table permission check."
            }
}
}

/**
* Configures a list of operations that can be used to check the connection to the source.
*
@@ -270,9 +310,10 @@
CheckedFunction { connection: Connection -> connection.metaData.catalogs },
CheckedFunction { queryResult: ResultSet ->
sourceOperations.rowToJson(queryResult)
-                    }
+                    },
)
-            }
+            },
+            CheckedConsumer { database: JdbcDatabase -> checkUserHasPrivileges(config, database) },
)
}

3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 1.2.0
Add client-side incremental sync

## 1.1.3
Removed experimental suffix for unstructured file type

@@ -801,6 +801,10 @@ definitions:
title: Whether the target API is formatted as a data feed
description: A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.
type: boolean
is_client_side_incremental:
title: Whether the target API does not support filtering and returns all data (the cursor filters records in the client instead of the API side)
description: If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally, and only emit new records from the last sync, hence incremental. This means that all records would be read from the API, but only new records will be emitted to the destination.
type: boolean
lookback_window:
title: Lookback Window
description: Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.
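To illustrate how the new flag is meant to be used, here is a hypothetical manifest
fragment (the stream's field and config names are invented for illustration and are not
part of this commit):

incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: updated_at
  datetime_format: "%Y-%m-%dT%H:%M:%SZ"
  start_datetime: "{{ config['start_date'] }}"
  # The API ignores date filters and returns everything; the connector filters
  # locally and emits only records newer than the saved cursor.
  is_client_side_incremental: true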
@@ -1,10 +1,11 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, PerPartitionCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

@@ -36,3 +37,80 @@ def filter_records(
for record in records:
if self._filter_interpolator.eval(self.config, record=record, **kwargs):
yield record


class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
    """
    Applies a filter to exclude records that are older than the stream_state / start_date.

    :param DatetimeBasedCursor date_time_based_cursor: Cursor used to extract datetime values
    :param PerPartitionCursor per_partition_cursor: Optional cursor used to look up the cursor value in nested (per-partition) stream_state
    """

def __init__(
self, date_time_based_cursor: DatetimeBasedCursor, per_partition_cursor: Optional[PerPartitionCursor] = None, **kwargs: Any
):
super().__init__(**kwargs)
self._date_time_based_cursor = date_time_based_cursor
self._per_partition_cursor = per_partition_cursor

@property
def _cursor_field(self) -> str:
return self._date_time_based_cursor.cursor_field.eval(self._date_time_based_cursor.config) # type: ignore # eval returns a string in this context

@property
def _start_date_from_config(self) -> datetime.datetime:
return self._date_time_based_cursor._start_datetime.get_datetime(self._date_time_based_cursor.config)

@property
def _end_datetime(self) -> datetime.datetime:
return (
self._date_time_based_cursor._end_datetime.get_datetime(self._date_time_based_cursor.config)
if self._date_time_based_cursor._end_datetime
else datetime.datetime.max
)

def filter_records(
self,
records: Iterable[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any]]:
state_value = self._get_state_value(stream_state, stream_slice or StreamSlice(partition={}, cursor_slice={}))
filter_date: datetime.datetime = self._get_filter_date(state_value)
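        # Keep only records strictly between the effective lower bound
        # (max of start_date and saved state) and the configured end datetime.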
records = (
record
for record in records
if self._end_datetime > self._date_time_based_cursor.parse_date(record[self._cursor_field]) > filter_date
)
if self.condition:
records = super().filter_records(
records=records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)
yield from records

def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice) -> Optional[str]:
"""
Return cursor_value or None in case it was not found.
Cursor_value may be empty if:
1. It is an initial sync => no stream_state exist at all.
2. In Parent-child stream, and we already make initial sync, so stream_state is present.
During the second read, we receive one extra record from parent and therefore no stream_state for this record will be found.
:param StreamState stream_state: State
:param StreamSlice stream_slice: Current Stream slice
:return Optional[str]: cursor_value in case it was found, otherwise None.
"""
if self._per_partition_cursor:
# self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state
partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
return partition_state.get(self._cursor_field) if partition_state else None
return stream_state.get(self._cursor_field)

def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime:
start_date_parsed = self._start_date_from_config
if state_value:
return max(start_date_parsed, self._date_time_based_cursor.parse_date(state_value))
else:
return start_date_parsed
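To make the behavior concrete, here is a minimal self-contained sketch of the same
client-side filtering idea, independent of the CDK classes above (the field names,
dates, and helper name are invented for illustration):

import datetime

def client_side_filter(records, cursor_field, start_date, state_value=None,
                       end_date=datetime.datetime.max):
    # Mirror of the decorator's logic: the API returns everything, so drop
    # records at or below max(start_date, saved state) on the client.
    parse = datetime.datetime.fromisoformat
    filter_date = max(start_date, parse(state_value)) if state_value else start_date
    for record in records:
        if end_date > parse(record[cursor_field]) > filter_date:
            yield record

records = [
    {"id": 1, "updated_at": "2024-05-01T00:00:00"},
    {"id": 2, "updated_at": "2024-05-20T00:00:00"},
]
# With state saved by a previous sync, only record 2 survives the filter.
print(list(client_side_filter(records, "updated_at",
                              start_date=datetime.datetime(2024, 1, 1),
                              state_value="2024-05-10T00:00:00")))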
@@ -87,7 +87,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
else datetime.timedelta.max
)
self._cursor_granularity = self._parse_timedelta(self.cursor_granularity)
-        self._cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
+        self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
self._lookback_window = InterpolatedString.create(self.lookback_window, parameters=parameters) if self.lookback_window else None
self._partition_field_start = InterpolatedString.create(self.partition_field_start or "start_time", parameters=parameters)
self._partition_field_end = InterpolatedString.create(self.partition_field_end or "end_time", parameters=parameters)
@@ -103,7 +103,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.cursor_datetime_formats = [self.datetime_format]

def get_stream_state(self) -> StreamState:
-        return {self._cursor_field.eval(self.config): self._cursor} if self._cursor else {}
+        return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__

def set_initial_state(self, stream_state: StreamState) -> None:
"""
@@ -112,7 +112,7 @@ def set_initial_state(self, stream_state: StreamState) -> None:
:param stream_state: The state of the stream as returned by get_stream_state
"""
-        self._cursor = stream_state.get(self._cursor_field.eval(self.config)) if stream_state else None
+        self._cursor = stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
@@ -122,7 +122,7 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
:param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
"""
-        record_cursor_value = record.get(self._cursor_field.eval(self.config))
+        record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
# if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
if not record_cursor_value:
return
@@ -186,8 +186,8 @@ def _select_best_end_datetime(self) -> datetime.datetime:
return min(self._end_datetime.get_datetime(self.config), now)

def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime:
-        if self._cursor_field.eval(self.config, stream_state=stream_state) in stream_state:
-            return self.parse_date(stream_state[self._cursor_field.eval(self.config)])
+        if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
+            return self.parse_date(stream_state[self.cursor_field.eval(self.config)])  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime) -> str:
@@ -300,7 +300,7 @@ def _get_request_options(self, option_type: RequestOptionType, stream_slice: Opt
return options

def should_be_synced(self, record: Record) -> bool:
-        cursor_field = self._cursor_field.eval(self.config)
+        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
record_cursor_value = record.get(cursor_field)
if not record_cursor_value:
self._send_log(
@@ -315,7 +315,7 @@ def should_be_synced(self, record: Record) -> bool:
def _is_within_daterange_boundaries(
self, record: Record, start_datetime_boundary: Union[datetime.datetime, str], end_datetime_boundary: Union[datetime.datetime, str]
) -> bool:
-        cursor_field = self._cursor_field.eval(self.config)
+        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
record_cursor_value = record.get(cursor_field)
if not record_cursor_value:
self._send_log(
@@ -339,7 +339,7 @@ def _send_log(self, level: Level, message: str) -> None:
)

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
-        cursor_field = self._cursor_field.eval(self.config)
+        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
first_cursor_value = first.get(cursor_field)
second_cursor_value = second.get(cursor_field)
if first_cursor_value and second_cursor_value:
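The repeated rename above makes cursor_field public as an InterpolatedString (rather
than the private self._cursor_field), so collaborators such as
ClientSideIncrementalRecordFilterDecorator can resolve the cursor field themselves.
A small sketch of that interpolation API (the template and config values here are
invented for illustration):

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString

# cursor_field may be templated from config and is resolved lazily via eval().
cursor_field = InterpolatedString.create("{{ config['cursor_field'] }}", parameters={})
print(cursor_field.eval({"cursor_field": "updated_at"}))  # -> updated_at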
@@ -937,6 +937,11 @@ class DatetimeBasedCursor(BaseModel):
description='A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.',
title='Whether the target API is formatted as a data feed',
)
is_client_side_incremental: Optional[bool] = Field(
None,
description='If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally, and only emit new records from the last sync, hence incremental. This means that all records would be read from the API, but only new records will be emitted to the destination.',
title='Whether the target API does not support filtering and returns all data (the cursor filters records in the client instead of the API side)',
)
lookback_window: Optional[str] = Field(
None,
description='Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.',