From d8cfa79dfb66f6edd256739c2bd99997d5a04593 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Fri, 13 Oct 2023 15:21:40 +0200
Subject: [PATCH] Source Apify Dataset: add item_collection stream with
 dynamic schema (#31333)

Co-authored-by: Joe Reuter
---
 .../source-apify-dataset/Dockerfile           |  2 +-
 .../source-apify-dataset/metadata.yaml        |  8 ++++---
 .../source_apify_dataset/manifest.yaml        | 16 +++++++++++++
 .../schemas/item_collection.json              | 12 ++++++++++
 .../wrapping_dpath_extractor.py               | 24 +++++++++++++++++++
 .../sources/apify-dataset-migrations.md       |  6 ++++-
 docs/integrations/sources/apify-dataset.md    | 11 +++++++--
 7 files changed, 72 insertions(+), 7 deletions(-)
 create mode 100644 airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/schemas/item_collection.json
 create mode 100644 airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/wrapping_dpath_extractor.py

diff --git a/airbyte-integrations/connectors/source-apify-dataset/Dockerfile b/airbyte-integrations/connectors/source-apify-dataset/Dockerfile
index e83b72f6ecf0..c51f4d752c98 100644
--- a/airbyte-integrations/connectors/source-apify-dataset/Dockerfile
+++ b/airbyte-integrations/connectors/source-apify-dataset/Dockerfile
@@ -34,5 +34,5 @@ COPY source_apify_dataset ./source_apify_dataset
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=2.0.0
+LABEL io.airbyte.version=2.1.0
 LABEL io.airbyte.name=airbyte/source-apify-dataset
diff --git a/airbyte-integrations/connectors/source-apify-dataset/metadata.yaml b/airbyte-integrations/connectors/source-apify-dataset/metadata.yaml
index 3c165d9154d0..70a8c083a8b7 100644
--- a/airbyte-integrations/connectors/source-apify-dataset/metadata.yaml
+++ b/airbyte-integrations/connectors/source-apify-dataset/metadata.yaml
@@ -7,14 +7,13 @@ data:
       enabled: true
     cloud:
       enabled: true
-      dockerImageTag: 0.2.0 # https://github.com/airbytehq/airbyte/issues/30478
   connectorSubtype: api
   connectorType: source
   definitionId: 47f17145-fe20-4ef5-a548-e29b048adf84
-  dockerImageTag: 2.0.0
+  dockerImageTag: 2.1.0
   dockerRepository: airbyte/source-apify-dataset
   githubIssueLabel: source-apify-dataset
-  icon: apify-dataset.svg
+  icon: apify.svg
   license: MIT
   name: Apify Dataset
   releaseDate: 2023-08-25
@@ -27,6 +26,9 @@ data:
       2.0.0:
         upgradeDeadline: 2023-09-18
         message: "Fix broken stream, manifest refactor"
+      2.1.0:
+        upgradeDeadline: 2023-10-27
+        message: "Rename dataset streams"
   supportLevel: community
   documentationUrl: https://docs.airbyte.com/integrations/sources/apify-dataset
   tags:
diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/manifest.yaml b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/manifest.yaml
index 600dfa99356a..1d2bd898809d 100644
--- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/manifest.yaml
+++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/manifest.yaml
@@ -107,9 +107,25 @@ streams:
           type: DpathExtractor
           field_path: []
 
+  - type: DeclarativeStream
+    name: item_collection
+    $parameters:
+      path: "datasets/{{ config['dataset_id'] }}/items"
+    schema_loader:
+      type: JsonFileSchemaLoader
+      file_path: "./source_apify_dataset/schemas/item_collection.json"
+    retriever:
+      $ref: "#/definitions/retriever"
+      record_selector:
+        type: RecordSelector
+        extractor:
+          class_name: source_apify_dataset.wrapping_dpath_extractor.WrappingDpathExtractor
+          field_path: []
+
 check:
   type: CheckStream
   stream_names:
     - dataset_collection
     - dataset
     - item_collection_website_content_crawler
+    - item_collection
diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/schemas/item_collection.json b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/schemas/item_collection.json
new file mode 100644
index 000000000000..5ceff1848c55
--- /dev/null
+++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/schemas/item_collection.json
@@ -0,0 +1,12 @@
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "Item collection",
+  "type": ["null", "object"],
+  "additionalProperties": true,
+  "properties": {
+    "data": {
+      "additionalProperties": true,
+      "type": ["null", "object"]
+    }
+  }
+}
diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/wrapping_dpath_extractor.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/wrapping_dpath_extractor.py
new file mode 100644
index 000000000000..0fea713e975a
--- /dev/null
+++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/wrapping_dpath_extractor.py
@@ -0,0 +1,24 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from dataclasses import dataclass
+
+import requests
+from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
+from airbyte_cdk.sources.declarative.types import Record
+
+
+@dataclass
+class WrappingDpathExtractor(DpathExtractor):
+    """
+    Record extractor that wraps each extracted record into a dict stored under the key `data`.
+    This is done because the actual shape of the data is dynamic, so wrapping everything into a
+    `data` object lets the schema declare it as a single generic object.
+
+    Note that this will cause fields to not be normalized in the destination.
+    """
+
+    def extract_records(self, response: requests.Response) -> list[Record]:
+        records = super().extract_records(response)
+        return [{"data": record} for record in records]
diff --git a/docs/integrations/sources/apify-dataset-migrations.md b/docs/integrations/sources/apify-dataset-migrations.md
index e2c4a948a077..dbc7e63056d2 100644
--- a/docs/integrations/sources/apify-dataset-migrations.md
+++ b/docs/integrations/sources/apify-dataset-migrations.md
@@ -1,5 +1,9 @@
 # Apify Dataset Migration Guide
 
+## Upgrading to 2.1.0
+
+A minor update adding a new `item_collection` stream for arbitrary datasets. No changes to your current connector configuration are required.
+
 ## Upgrading to 2.0.0
 
 Major update: The old, broken Item Collection stream has been removed and replaced with a new Item Collection (WCC) stream specific to the datasets produced by the [Website Content Crawler](https://apify.com/apify/website-content-crawler) Actor. Please update your connector configuration. Note: The schema of an Apify Dataset depends on the Actor that produced it, so we cannot offer a general stream with a static schema for reading data from a Dataset.
@@ -7,4 +11,4 @@
 ## Upgrading to 1.0.0
 
 A major update fixing the data ingestion to properly retrieve data from Apify.
-Please update your connector configuration setup.
\ No newline at end of file
+Please update your connector configuration setup.
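The behavior of `WrappingDpathExtractor` above is easy to see in isolation. The following is a minimal sketch of the wrapping step, independent of the Airbyte CDK; it assumes the parent `DpathExtractor` has already extracted the raw dataset items, and the sample records are hypothetical.

```python
# Minimal sketch of the wrapping step performed by WrappingDpathExtractor.
# Assumption: the parent DpathExtractor has already returned the raw items;
# the sample records below are hypothetical.
raw_items = [
    {"url": "https://example.com", "title": "Example Domain"},
    {"url": "https://example.org", "text": "Another crawled page"},
]

# Nest each record under a "data" key so that the stream's static JSON schema
# only needs to declare one generic "data" object, whatever Actor-specific
# shape the items have.
wrapped = [{"data": record} for record in raw_items]

assert wrapped == [
    {"data": {"url": "https://example.com", "title": "Example Domain"}},
    {"data": {"url": "https://example.org", "text": "Another crawled page"}},
]
```

As the docstring notes, the trade-off is that destinations no longer see an item's top-level fields as individual columns; everything lands inside `data`.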
diff --git a/docs/integrations/sources/apify-dataset.md b/docs/integrations/sources/apify-dataset.md
index 69f060a2ab12..d672cced7c9b 100644
--- a/docs/integrations/sources/apify-dataset.md
+++ b/docs/integrations/sources/apify-dataset.md
@@ -46,19 +46,26 @@ The Apify dataset connector uses [Apify Python Client](https://docs.apify.com/ap
 - Apify Personal API token (you can find it [here](https://console.apify.com/account/integrations))
 - Dataset ID (check the [docs](https://docs.apify.com/platform/storage/dataset))
 
-### `item_collection_website_content_crawler`
+### `item_collection`
 
 - Calls `api.apify.com/v2/datasets/{datasetId}/items` ([docs](https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items))
 - Properties:
   - Apify Personal API token (you can find it [here](https://console.apify.com/account/integrations))
   - Dataset ID (check the [docs](https://docs.apify.com/platform/storage/dataset))
 - Limitations:
-  - Currently works only for the datasets produced by [Website Content Crawler](https://apify.com/apify/website-content-crawler).
+  - The stream uses a dynamic schema (all data is stored under the `"data"` key), so it should support any Apify Dataset, regardless of the Actor that produced it.
+
+### `item_collection_website_content_crawler`
+
+- Calls the same endpoint and uses the same properties as the `item_collection` stream.
+- Limitations:
+  - The stream uses a static schema matching the datasets produced by the [Website Content Crawler](https://apify.com/apify/website-content-crawler) Actor, so only datasets produced by this Actor are supported.
 
 ## Changelog
 
 | Version | Date       | Pull Request                                             | Subject                                                                      |
 | :------ | :--------- | :------------------------------------------------------- | :--------------------------------------------------------------------------- |
+| 2.1.0   | 2023-10-13 | [31333](https://github.com/airbytehq/airbyte/pull/31333) | Add stream for arbitrary datasets                                            |
 | 2.0.0   | 2023-09-18 | [30428](https://github.com/airbytehq/airbyte/pull/30428) | Fix broken stream, manifest refactor                                         |
 | 1.0.0   | 2023-08-25 | [29859](https://github.com/airbytehq/airbyte/pull/29859) | Migrate to lowcode                                                           |
 | 0.2.0   | 2022-06-20 | [28290](https://github.com/airbytehq/airbyte/pull/28290) | Make connector work with platform changes not syncing empty stream schemas. |
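To preview what the `item_collection` stream will emit before configuring the connector, you can call the same Apify endpoint the stream uses directly. This is an illustrative sketch only, not part of the connector; `DATASET_ID` and `API_TOKEN` are placeholders to replace with your own values.

```python
# Preview the items the item_collection stream will sync and how the
# connector will wrap them. DATASET_ID and API_TOKEN are placeholders.
import requests

DATASET_ID = "your-dataset-id"
API_TOKEN = "your-apify-token"

resp = requests.get(
    f"https://api.apify.com/v2/datasets/{DATASET_ID}/items",
    params={"token": API_TOKEN, "format": "json", "limit": 5},
)
resp.raise_for_status()

for item in resp.json():
    # The connector emits each item wrapped as {"data": item}.
    print({"data": item})
```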