Skip to content

Commit

Permalink
Merge branch 'master' into lmossman/upload-all-script
Browse files Browse the repository at this point in the history
  • Loading branch information
lmossman committed Oct 6, 2023
2 parents 9e1e838 + 5e45f54 commit 6faed19
Show file tree
Hide file tree
Showing 94 changed files with 547 additions and 663 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ jobs:
> :warning: The publish slash command is now deprecated.<br>
The connector publication happens on merge to the master branch.<br>
Please use /legacy-publish if you need to publish normalization images.<br>
Please join the #publish-on-merge-updates slack channel to track ongoing publish pipelines.<br>
Please join the #connector-publish-updates slack channel to track ongoing publish pipelines.<br>
Please reach out to the @dev-connector-ops team if you need support in publishing a connector.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class GlobalMemoryManager {
// In cases where a queue is rapidly expanding, a larger block size allows fewer allocation calls. On
// the flip side, a smaller block size allows more granular memory management. Since this overhead
// is minimal for now, err on the side of smaller block sizes.
public static final long BLOCK_SIZE_BYTES = 30 * 1024 * 1024; // 30MB
public static final long BLOCK_SIZE_BYTES = 10 * 1024 * 1024; // 10MB
private final long maxMemoryBytes;

private final AtomicLong currentMemoryBytes = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ public class GlobalMemoryManagerTest {

@Test
void test() {
final GlobalMemoryManager mgr = new GlobalMemoryManager(35 * BYTES_MB);
final GlobalMemoryManager mgr = new GlobalMemoryManager(15 * BYTES_MB);

assertEquals(30 * BYTES_MB, mgr.requestMemory());
assertEquals(10 * BYTES_MB, mgr.requestMemory());
assertEquals(5 * BYTES_MB, mgr.requestMemory());
assertEquals(0, mgr.requestMemory());

mgr.free(10 * BYTES_MB);
assertEquals(10 * BYTES_MB, mgr.requestMemory());
mgr.free(31 * BYTES_MB);
assertEquals(30 * BYTES_MB, mgr.requestMemory());
mgr.free(16 * BYTES_MB);
assertEquals(10 * BYTES_MB, mgr.requestMemory());
}

}
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.51.26
current_version = 0.51.29
commit = False

[bumpversion:file:setup.py]
Expand Down
9 changes: 9 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## 0.51.29
Coerce read_records to iterable in http availability strategy

## 0.51.28
Add functionality enabling Page Number/Offset to be set on the first request

## 0.51.27
Fix parsing of UUID fields in avro files

## 0.51.26
Vector DB CDK: Fix OpenAI embedder batch size

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.51.26
RUN pip install --prefix=/install airbyte-cdk==0.51.29

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.51.26
LABEL io.airbyte.version=0.51.29
LABEL io.airbyte.name=airbyte/source-declarative-manifest
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,11 @@ definitions:
examples:
- 100
- "{{ config['page_size'] }}"
inject_on_first_request:
title: Inject Offset
description: Using the `offset` with value `0` during the first request
type: boolean
default: false
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -1654,6 +1659,11 @@ definitions:
examples:
- 0
- 1
inject_on_first_request:
title: Inject Page Number
description: Using the `page number` with value defined by `start_from_page` during the first request
type: boolean
default: false
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,11 @@ class OffsetIncrement(BaseModel):
examples=[100, "{{ config['page_size'] }}"],
title='Limit',
)
inject_on_first_request: Optional[bool] = Field(
False,
description='Using the `offset` with value `0` during the first request',
title='Inject Offset',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


Expand All @@ -568,6 +573,11 @@ class PageIncrement(BaseModel):
examples=[0, 1],
title='Start From Page',
)
inject_on_first_request: Optional[bool] = Field(
False,
description='Using the `page number` with value defined by `start_from_page` during the first request',
title='Inject Page Number',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional, Union

import requests
Expand Down Expand Up @@ -91,7 +91,6 @@ class DefaultPaginator(Paginator):
url_base: Union[InterpolatedString, str]
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = JsonDecoder(parameters={})
_token: Optional[Any] = field(init=False, repr=False, default=None)
page_size_option: Optional[RequestOption] = None
page_token_option: Optional[Union[RequestPath, RequestOption]] = None

Expand All @@ -100,6 +99,7 @@ def __post_init__(self, parameters: Mapping[str, Any]):
raise ValueError("page_size_option cannot be set if the pagination strategy does not have a page_size")
if isinstance(self.url_base, str):
self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
self._token = self.pagination_strategy.initial_token

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
self._token = self.pagination_strategy.next_page_token(response, last_records)
Expand Down Expand Up @@ -160,7 +160,7 @@ def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, A

if (
self.page_token_option
and self._token
and self._token is not None
and isinstance(self.page_token_option, RequestOption)
and self.page_token_option.inject_into == option_type
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def __post_init__(self, parameters: Mapping[str, Any]):
if isinstance(self.stop_condition, str):
self.stop_condition = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters)

@property
def initial_token(self) -> Optional[Any]:
    """
    Cursor-based pagination has no token to send before the first response
    is seen, so the initial token is always ``None``.
    """
    return None

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
decoded_response = self.decoder.decode(response)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class OffsetIncrement(PaginationStrategy):
page_size: Optional[Union[str, int]]
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = JsonDecoder(parameters={})
inject_on_first_request: bool = False

def __post_init__(self, parameters: Mapping[str, Any]):
self._offset = 0
Expand All @@ -49,6 +50,12 @@ def __post_init__(self, parameters: Mapping[str, Any]):
else:
self._page_size = None

@property
def initial_token(self) -> Optional[Any]:
    """
    Token to inject into the very first request: the current offset when
    ``inject_on_first_request`` is enabled, otherwise ``None``.
    """
    # Only surface the starting offset when the strategy is configured to
    # send pagination parameters on the initial request.
    return self._offset if self.inject_on_first_request else None

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
decoded_response = self.decoder.decode(response)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,17 @@ class PageIncrement(PaginationStrategy):
page_size: Optional[int]
parameters: InitVar[Mapping[str, Any]]
start_from_page: int = 0
inject_on_first_request: bool = False

def __post_init__(self, parameters: Mapping[str, Any]):
    # Start counting from the configured first page (APIs vary between
    # 0-based and 1-based page numbering, hence the configurable start).
    self._page = self.start_from_page

@property
def initial_token(self) -> Optional[Any]:
    """
    Token to inject into the very first request: the starting page number
    when ``inject_on_first_request`` is enabled, otherwise ``None``.
    """
    # ``self._page`` still equals ``start_from_page`` at this point because
    # no pages have been consumed yet.
    return self._page if self.inject_on_first_request else None

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
# Stop paginating when there are fewer records than the page size or the current page has no records
if (self.page_size and len(last_records) < self.page_size) or len(last_records) == 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ class PaginationStrategy:
Defines how to get the next page token
"""

@property
@abstractmethod
def initial_token(self) -> Optional[Any]:
    """
    Return the token that should be injected into the very first request,
    or ``None`` if no pagination token is sent before a response is seen.
    """

@abstractmethod
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import logging
import uuid
from typing import Any, Dict, Iterable, Mapping, Optional

import fastavro
Expand Down Expand Up @@ -159,9 +158,7 @@ def _to_output_value(avro_format: AvroFormat, record_type: Mapping[str, Any], re
if record_type == "double" and avro_format.double_as_string:
return str(record_value)
return record_value
if record_type.get("logicalType") == "uuid":
return uuid.UUID(bytes=record_value)
elif record_type.get("logicalType") == "decimal":
if record_type.get("logicalType") in ("decimal", "uuid"):
return str(record_value)
elif record_type.get("logicalType") == "date":
return record_value.isoformat()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> An

# Decode binary strings to utf-8
if ParquetParser._is_binary(parquet_value.type):
return parquet_value.as_py().decode("utf-8")
py_value = parquet_value.as_py()
if py_value is None:
return py_value
return py_value.decode("utf-8")
if pa.types.is_decimal(parquet_value.type):
if parquet_format.decimal_as_float:
return parquet_value.as_py()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ def get_first_record_for_slice(stream: Stream, stream_slice: Optional[Mapping[st
:raises StopIteration: if there is no first record to return (the read_records generator is empty)
:return: StreamData containing the first record in the slice
"""
records_for_slice = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
# We wrap the return output of read_records() because some implementations return types that are
# iterable but not iterators, such as lists or tuples
records_for_slice = iter(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
return next(records_for_slice)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
name="airbyte-cdk",
# The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be
# updated if our semver format changes such as using release candidate versions.
version="0.51.26",
version="0.51.29",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,20 @@ def test_reset():
assert request_parameters_for_second_request != request_parameters_after_reset


def test_initial_token_with_offset_pagination():
    """The very first request should already carry limit/offset when inject_on_first_request=True."""
    limit_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="limit", parameters={})
    offset_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="offset", parameters={})
    strategy = OffsetIncrement(config={}, page_size=2, parameters={}, inject_on_first_request=True)
    paginator = DefaultPaginator(
        strategy,
        {},
        "https://airbyte.io",
        parameters={},
        page_size_option=limit_option,
        page_token_option=offset_option,
    )

    assert paginator.get_request_params() == {"limit": 2, "offset": 0}


def test_limit_page_fetched():
maximum_number_of_pages = 5
number_of_next_performed = maximum_number_of_pages - 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import json
from typing import Any, Optional

import pytest
import requests
Expand Down Expand Up @@ -42,3 +43,16 @@ def test_offset_increment_paginator_strategy_rises():
with pytest.raises(Exception) as exc:
paginator_strategy.get_page_size()
assert str(exc.value) == "invalid value is of type <class 'str'>. Expected <class 'int'>"


@pytest.mark.parametrize(
    "inject_on_first_request, expected_initial_token",
    [
        pytest.param(True, 0, id="test_with_inject_offset"),
        pytest.param(False, None, id="test_without_inject_offset"),
    ],
)
def test_offset_increment_paginator_strategy_initial_token(inject_on_first_request: bool, expected_initial_token: Optional[Any]):
    # initial_token must surface the starting offset only when injection is enabled.
    strategy = OffsetIncrement(page_size=20, parameters={}, config={}, inject_on_first_request=inject_on_first_request)

    assert strategy.initial_token == expected_initial_token
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import json
from typing import Any, Optional

import pytest
import requests
Expand Down Expand Up @@ -35,3 +36,21 @@ def test_page_increment_paginator_strategy(page_size, start_from, last_records,

paginator_strategy.reset()
assert start_from == paginator_strategy._page


@pytest.mark.parametrize(
    "inject_on_first_request, start_from_page, expected_initial_token",
    [
        pytest.param(True, 0, 0, id="test_with_inject_offset_page_start_from_0"),
        pytest.param(True, 12, 12, id="test_with_inject_offset_page_start_from_12"),
        pytest.param(False, 2, None, id="test_without_inject_offset"),
    ],
)
def test_page_increment_paginator_strategy_initial_token(
    inject_on_first_request: bool, start_from_page: int, expected_initial_token: Optional[Any]
):
    # initial_token must echo start_from_page only when injection is enabled.
    strategy = PageIncrement(
        page_size=20, parameters={}, start_from_page=start_from_page, inject_on_first_request=inject_on_first_request
    )

    assert strategy.initial_token == expected_initial_token
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

_default_avro_format = AvroFormat()
_double_as_string_avro_format = AvroFormat(double_as_string=True)
_uuid_value = uuid.uuid4()


@pytest.mark.parametrize(
Expand Down Expand Up @@ -217,9 +218,7 @@ def test_convert_primitive_avro_type_to_json(avro_format, avro_type, expected_js
pytest.param(_default_avro_format, "bytes", b"hello world", b"hello world", id="test_bytes"),
pytest.param(_default_avro_format, "string", "hello world", "hello world", id="test_string"),
pytest.param(_default_avro_format, {"logicalType": "decimal"}, 3.1415, "3.1415", id="test_decimal"),
pytest.param(
_default_avro_format, {"logicalType": "uuid"}, b"abcdefghijklmnop", uuid.UUID(bytes=b"abcdefghijklmnop"), id="test_uuid"
),
pytest.param(_default_avro_format, {"logicalType": "uuid"}, _uuid_value, str(_uuid_value), id="test_uuid"),
pytest.param(_default_avro_format, {"logicalType": "date"}, datetime.date(2023, 8, 7), "2023-08-07", id="test_date"),
pytest.param(_default_avro_format, {"logicalType": "time-millis"}, 70267068, 70267068, id="test_time_millis"),
pytest.param(_default_avro_format, {"logicalType": "time-micros"}, 70267068, 70267068, id="test_time_micros"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ def test_value_dictionary() -> None:
assert py_value == {"indices": [0, 1, 2, 0, 1], "values": ["apple", "banana", "cherry"]}


def test_value_none_binary() -> None:
    """A null binary scalar must pass through as None instead of raising on ``.decode()``."""
    none_binary_scalar = pa.scalar(None, type=pa.binary())
    # Assert the returned value directly: this is stronger than the previous
    # try/except AttributeError + `assert False` pattern, and still fails
    # loudly if the parser regresses to calling .decode() on None.
    assert ParquetParser._to_output_value(none_binary_scalar, _default_parquet_format) is None


@pytest.mark.parametrize(
"file_format",
[
Expand Down
Loading

0 comments on commit 6faed19

Please sign in to comment.