Skip to content

Commit

Permalink
🐛 Source Convex: full_refresh stops after one page (#33431)
Browse files Browse the repository at this point in the history
Co-authored-by: Serhii Lazebnyi <[email protected]>
  • Loading branch information
ldanilek and lazebnyi authored Dec 18, 2023
1 parent 3a70f0c commit 63e96fb
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 12 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-convex/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_convex ./source_convex
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/source-convex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: c332628c-f55c-4017-8222-378cfafda9b2
dockerImageTag: 0.3.0
dockerImageTag: 0.4.0
dockerRepository: airbyte/source-convex
githubIssueLabel: source-convex
icon: convex.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
},
)

CONVEX_CLIENT_VERSION = "0.3.0"
CONVEX_CLIENT_VERSION = "0.4.0"


# Source
Expand Down Expand Up @@ -153,7 +153,8 @@ def next_page_token(self, response: requests.Response) -> Optional[ConvexState]:
else:
self._delta_cursor_value = resp_json["cursor"]
self._delta_has_more = resp_json["hasMore"]
return cast(ConvexState, self.state) if self._delta_has_more else None
has_more = self._snapshot_has_more or self._delta_has_more
return cast(ConvexState, self.state) if has_more else None

def path(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
from unittest.mock import MagicMock

import pytest
import requests
import responses
from airbyte_cdk.models import SyncMode
from source_convex.source import ConvexStream


@pytest.fixture
def patch_base_class(mocker):
# Mock abstract methods to enable instantiating abstract class
mocker.patch.object(ConvexStream, "path", "v0/example_endpoint")
mocker.patch.object(ConvexStream, "primary_key", "test_primary_key")
mocker.patch.object(ConvexStream, "__abstractmethods__", set())

Expand Down Expand Up @@ -62,6 +64,66 @@ def test_next_page_token(patch_base_class):
assert stream.state == {"snapshot_cursor": 1235, "snapshot_has_more": False, "delta_cursor": 7000}


@responses.activate
def test_read_records_full_refresh(patch_base_class):
stream = ConvexStream("http://mocked_base_url:8080", "accesskey", "json", "messages", None)
snapshot0_resp = {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 5000, "hasMore": True}
responses.add(
responses.GET,
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json",
json=snapshot0_resp,
)
snapshot1_resp = {"values": [{"_id": "an_id", "field": "b", "_ts": 100}], "cursor": 2345, "snapshot": 5000, "hasMore": True}
responses.add(
responses.GET,
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json&cursor=1234&snapshot=5000",
json=snapshot1_resp,
)
snapshot2_resp = {"values": [{"_id": "a_id", "field": "x", "_ts": 300}], "cursor": 3456, "snapshot": 5000, "hasMore": False}
responses.add(
responses.GET,
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json&cursor=2345&snapshot=5000",
json=snapshot2_resp,
)
records = list(stream.read_records(SyncMode.full_refresh))
assert len(records) == 3
assert [record["field"] for record in records] == ["f", "b", "x"]
assert stream.state == {"delta_cursor": 5000, "snapshot_cursor": 3456, "snapshot_has_more": False}


@responses.activate
def test_read_records_incremental(patch_base_class):
stream = ConvexStream("http://mocked_base_url:8080", "accesskey", "json", "messages", None)
snapshot0_resp = {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 5000, "hasMore": True}
responses.add(
responses.GET,
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json",
json=snapshot0_resp,
)
snapshot1_resp = {"values": [{"_id": "an_id", "field": "b", "_ts": 100}], "cursor": 2345, "snapshot": 5000, "hasMore": False}
responses.add(
responses.GET,
"http://mocked_base_url:8080/api/list_snapshot?tableName=messages&format=json&cursor=1234&snapshot=5000",
json=snapshot1_resp,
)
delta0_resp = {"values": [{"_id": "a_id", "field": "x", "_ts": 300}], "cursor": 6000, "hasMore": True}
responses.add(
responses.GET,
"http://mocked_base_url:8080/api/document_deltas?tableName=messages&format=json&cursor=5000",
json=delta0_resp,
)
delta1_resp = {"values": [{"_id": "a_id", "field": "x", "_ts": 400}], "cursor": 7000, "hasMore": False}
responses.add(
responses.GET,
"http://mocked_base_url:8080/api/document_deltas?tableName=messages&format=json&cursor=6000",
json=delta1_resp,
)
records = list(stream.read_records(SyncMode.incremental))
assert len(records) == 4
assert [record["field"] for record in records] == ["f", "b", "x", "x"]
assert stream.state == {"delta_cursor": 7000, "snapshot_cursor": 2345, "snapshot_has_more": False}


def test_parse_response(patch_base_class):
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
resp = MagicMock()
Expand All @@ -75,7 +137,7 @@ def test_parse_response(patch_base_class):
def test_request_headers(patch_base_class):
stream = ConvexStream("murky-swan-635", "accesskey", "json", "messages", None)
inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
assert stream.request_headers(**inputs) == {"Convex-Client": "airbyte-export-0.3.0"}
assert stream.request_headers(**inputs) == {"Convex-Client": "airbyte-export-0.4.0"}


def test_http_method(patch_base_class):
Expand Down
13 changes: 7 additions & 6 deletions docs/integrations/sources/convex.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ In the Data tab, you should see the tables and a sample of the data that will be

## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------- |
| 0.3.0 | 2023-09-28 | [30853](https://github.com/airbytehq/airbyte/pull/30853) | 🐛 Convex source switch to clean JSON format |
| 0.2.0 | 2023-06-21 | [27226](https://github.com/airbytehq/airbyte/pull/27226) | 🐛 Convex source fix skipped records |
| 0.1.1 | 2023-03-06 | [23797](https://github.com/airbytehq/airbyte/pull/23797) | 🐛 Convex source connector error messages |
| 0.1.0 | 2022-10-24 | [18403](https://github.com/airbytehq/airbyte/pull/18403) | 🎉 New Source: Convex |
| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :--------------------------------------------------------------- |
| 0.4.0 | 2023-12-13 | [33431](https://github.com/airbytehq/airbyte/pull/33431) | 🐛 Convex source fix bug where full_refresh stops after one page |
| 0.3.0 | 2023-09-28 | [30853](https://github.com/airbytehq/airbyte/pull/30853) | 🐛 Convex source switch to clean JSON format |
| 0.2.0 | 2023-06-21 | [27226](https://github.com/airbytehq/airbyte/pull/27226) | 🐛 Convex source fix skipped records |
| 0.1.1 | 2023-03-06 | [23797](https://github.com/airbytehq/airbyte/pull/23797) | 🐛 Convex source connector error messages |
| 0.1.0 | 2022-10-24 | [18403](https://github.com/airbytehq/airbyte/pull/18403) | 🎉 New Source: Convex |

0 comments on commit 63e96fb

Please sign in to comment.