Skip to content

Commit f517c3b

Browse files
authored
🐛 Source Shopify: fix duplicates for Product Images, Metafield Product Images and Metafield Products streams for Incremental syncs (airbytehq#46095)
1 parent 4b3533a commit f517c3b

File tree

10 files changed

+189
-39
lines changed

10 files changed

+189
-39
lines changed

airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml

-4
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@ acceptance_tests:
2424
discovery:
2525
tests:
2626
- config_path: "secrets/config.json"
27-
backward_compatibility_tests_config:
28-
# specified the Type for `customer_journey_summary` field,
29-
# for `customer_journey_summary` stream.
30-
disable_for_version: 2.14.17
3127
basic_read:
3228
tests:
3329
- config_path: "secrets/config_transactions_with_user_id.json"

airbyte-integrations/connectors/source-shopify/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ data:
1111
connectorSubtype: api
1212
connectorType: source
1313
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
14-
dockerImageTag: 2.5.2
14+
dockerImageTag: 2.5.3
1515
dockerRepository: airbyte/source-shopify
1616
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
1717
erdUrl: https://dbdocs.io/airbyteio/source-shopify?view=relationships

airbyte-integrations/connectors/source-shopify/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.5.2"
6+
version = "2.5.3"
77
name = "source-shopify"
88
description = "Source CDK implementation for Shopify."
99
authors = [ "Airbyte <contact@airbyte.io>",]

airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py

+110-15
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,17 @@ def prepare(query: str) -> str:
8080
@dataclass
8181
class ShopifyBulkQuery:
8282
config: Mapping[str, Any]
83+
parent_stream_name: Optional[str] = None
84+
parent_stream_cursor: Optional[str] = None
85+
86+
@property
87+
def has_parent_stream(self) -> bool:
88+
return True if self.parent_stream_name and self.parent_stream_cursor else False
89+
90+
@property
91+
def parent_cursor_key(self) -> Optional[str]:
92+
if self.has_parent_stream:
93+
return f"{self.parent_stream_name}_{self.parent_stream_cursor}"
8394

8495
@property
8596
def shop_id(self) -> int:
@@ -132,6 +143,38 @@ def query_nodes(self) -> Optional[Union[List[Field], List[str]]]:
132143
"""
133144
return ["__typename", "id"]
134145

146+
def _inject_parent_cursor_field(self, nodes: List[Field], key: str = "updatedAt", index: int = 2) -> List[Field]:
147+
if self.has_parent_stream:
148+
# inject parent cursor key as alias to the `updatedAt` parent cursor field
149+
nodes.insert(index, Field(name="updatedAt", alias=self.parent_cursor_key))
150+
151+
return nodes
152+
153+
def _add_parent_record_state(self, record: MutableMapping[str, Any], items: List[dict], to_rfc3339: bool = False) -> List[dict]:
154+
"""
155+
Adds a parent cursor value to each item in the list.
156+
157+
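This method iterates over a list of dictionaries and adds a new key-value pair to each dictionary.
157+
The key is the value of `self.parent_stream_name`, and the value is another dictionary with a single key, the value of `self.parent_stream_cursor`,
158+
mapped to the parent cursor value read from `record` under `self.parent_cursor_key` (converted to RFC 3339 when `to_rfc3339` is set, otherwise `None`).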
160+
161+
Args:
162+
items (List[dict]): A list of dictionaries to which the parent cursor value will be added.
163+
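record (MutableMapping[str, Any]): The record from which the parent cursor value is read (under `self.parent_cursor_key`); `to_rfc3339` (bool) controls whether that value is converted to RFC 3339 and stored, otherwise `None` is stored.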
164+
165+
Returns:
166+
List[dict]: The modified list of dictionaries with the added parent cursor values.
167+
"""
168+
169+
if self.has_parent_stream:
170+
parent_cursor_value: Optional[str] = record.get(self.parent_cursor_key, None)
171+
parent_state = self.tools._datetime_str_to_rfc3339(parent_cursor_value) if to_rfc3339 and parent_cursor_value else None
172+
173+
for item in items:
174+
item[self.parent_stream_name] = {self.parent_stream_cursor: parent_state}
175+
176+
return items
177+
135178
def get(self, filter_field: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None) -> str:
136179
# define filter query string, if passed
137180
filter_query = f"{filter_field}:>='{start}' AND {filter_field}:<='{end}'" if filter_field else None
@@ -285,15 +328,22 @@ def query_nodes(self) -> List[Field]:
285328
List of available fields:
286329
https://shopify.dev/docs/api/admin-graphql/unstable/objects/Metafield
287330
"""
331+
332+
nodes = super().query_nodes
333+
288334
# define metafield node
289335
metafield_node = self.get_edge_node("metafields", self.metafield_fields)
290336

291337
if isinstance(self.type.value, list):
292-
return ["__typename", "id", self.get_edge_node(self.type.value[1], ["__typename", "id", metafield_node])]
338+
nodes = [*nodes, self.get_edge_node(self.type.value[1], [*nodes, metafield_node])]
293339
elif isinstance(self.type.value, str):
294-
return ["__typename", "id", metafield_node]
340+
nodes = [*nodes, metafield_node]
295341

296-
def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]:
342+
nodes = self._inject_parent_cursor_field(nodes)
343+
344+
return nodes
345+
346+
def _process_metafield(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
297347
# resolve parent id from `str` to `int`
298348
record["owner_id"] = self.tools.resolve_str_id(record.get(BULK_PARENT_KEY))
299349
# add `owner_resource` field
@@ -304,7 +354,28 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Iterabl
304354
record["createdAt"] = self.tools.from_iso8601_to_rfc3339(record, "createdAt")
305355
record["updatedAt"] = self.tools.from_iso8601_to_rfc3339(record, "updatedAt")
306356
record = self.tools.fields_names_to_snake_case(record)
307-
yield record
357+
return record
358+
359+
def _process_components(self, entity: List[dict]) -> Iterable[MutableMapping[str, Any]]:
360+
for item in entity:
361+
# resolve the id from string
362+
item["admin_graphql_api_id"] = item.get("id")
363+
item["id"] = self.tools.resolve_str_id(item.get("id"))
364+
yield self._process_metafield(item)
365+
366+
def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]:
367+
# get the joined record components collected for the record
368+
record_components = record.get("record_components", {})
369+
# process record components
370+
if not record_components:
371+
yield self._process_metafield(record)
372+
else:
373+
metafields = record_components.get("Metafield", [])
374+
if len(metafields) > 0:
375+
if self.has_parent_stream:
376+
# add parent state to each metafield
377+
metafields = self._add_parent_record_state(record, metafields, to_rfc3339=True)
378+
yield from self._process_components(metafields)
308379

309380

310381
class MetafieldCollection(Metafield):
@@ -343,7 +414,9 @@ class MetafieldCustomer(Metafield):
343414
customers(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) {
344415
edges {
345416
node {
417+
__typename
346418
id
419+
customer_updated_at: updatedAt
347420
metafields {
348421
edges {
349422
node {
@@ -366,6 +439,11 @@ class MetafieldCustomer(Metafield):
366439

367440
type = MetafieldType.CUSTOMERS
368441

442+
record_composition = {
443+
"new_record": "Customer",
444+
"record_components": ["Metafield"],
445+
}
446+
369447

370448
class MetafieldLocation(Metafield):
371449
"""
@@ -464,7 +542,9 @@ class MetafieldProduct(Metafield):
464542
products(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) {
465543
edges {
466544
node {
545+
__typename
467546
id
547+
product_updated_at: updatedAt
468548
metafields {
469549
edges {
470550
node {
@@ -487,6 +567,11 @@ class MetafieldProduct(Metafield):
487567

488568
type = MetafieldType.PRODUCTS
489569

570+
record_composition = {
571+
"new_record": "Product",
572+
"record_components": ["Metafield"],
573+
}
574+
490575

491576
class MetafieldProductImage(Metafield):
492577
"""
@@ -496,6 +581,7 @@ class MetafieldProductImage(Metafield):
496581
node {
497582
__typename
498583
id
584+
product_updated_at: updatedAt
499585
media {
500586
edges {
501587
node {
@@ -527,6 +613,13 @@ class MetafieldProductImage(Metafield):
527613
}
528614
"""
529615

616+
type = MetafieldType.PRODUCT_IMAGES
617+
618+
record_composition = {
619+
"new_record": "Product",
620+
"record_components": ["Metafield"],
621+
}
622+
530623
@property
531624
def query_nodes(self) -> List[Field]:
532625
"""
@@ -537,19 +630,16 @@ def query_nodes(self) -> List[Field]:
537630
More info here:
538631
https://shopify.dev/docs/api/release-notes/2024-04#productimage-value-removed
539632
"""
633+
540634
# define metafield node
541635
metafield_node = self.get_edge_node("metafields", self.metafield_fields)
542-
media_fields: List[Field] = [
543-
"__typename",
544-
"id",
545-
InlineFragment(type="MediaImage", fields=[metafield_node]),
546-
]
547-
# define media node
636+
media_fields: List[Field] = ["__typename", "id", InlineFragment(type="MediaImage", fields=[metafield_node])]
548637
media_node = self.get_edge_node("media", media_fields)
638+
549639
fields: List[Field] = ["__typename", "id", media_node]
550-
return fields
640+
fields = self._inject_parent_cursor_field(fields)
551641

552-
type = MetafieldType.PRODUCT_IMAGES
642+
return fields
553643

554644

555645
class MetafieldProductVariant(Metafield):
@@ -2238,6 +2328,7 @@ class ProductImage(ShopifyBulkQuery):
22382328
node {
22392329
__typename
22402330
id
2331+
products_updated_at: updatedAt
22412332
# THE MEDIA NODE IS NEEDED TO PROVIDE THE CURSORS
22422333
media {
22432334
edges {
@@ -2314,8 +2405,7 @@ class ProductImage(ShopifyBulkQuery):
23142405
# media property fields
23152406
media_fields: List[Field] = [Field(name="edges", fields=[Field(name="node", fields=media_fragment)])]
23162407

2317-
# main query
2318-
query_nodes: List[Field] = [
2408+
nodes: List[Field] = [
23192409
"__typename",
23202410
"id",
23212411
Field(name="media", fields=media_fields),
@@ -2330,6 +2420,10 @@ class ProductImage(ShopifyBulkQuery):
23302420
"record_components": ["MediaImage", "Image"],
23312421
}
23322422

2423+
@property
2424+
def query_nodes(self) -> List[Field]:
2425+
return self._inject_parent_cursor_field(self.nodes)
2426+
23332427
def _process_component(self, entity: List[dict]) -> List[dict]:
23342428
for item in entity:
23352429
# remove the `__parentId` from the object
@@ -2405,6 +2499,8 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Iterabl
24052499

24062500
# add the product_id to each `Image`
24072501
record["images"] = self._add_product_id(record.get("images", []), record.get("id"))
2502+
# add the product cursor to each `Image`
2503+
record["images"] = self._add_parent_record_state(record, record.get("images", []), to_rfc3339=True)
24082504
record["images"] = self._merge_with_media(record_components)
24092505
record.pop("record_components")
24102506

@@ -2413,7 +2509,6 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Iterabl
24132509
if len(images) > 0:
24142510
# convert dates from ISO-8601 to RFC-3339
24152511
record["images"] = self._convert_datetime_to_rfc3339(images)
2416-
24172512
yield from self._emit_complete_records(images)
24182513

24192514

airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/tools.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,11 @@ def shop_name_from_url(url: str) -> str:
6363
return url
6464

6565
@staticmethod
66-
def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[str, Any]:
66+
def _datetime_str_to_rfc3339(value: str) -> str:
67+
return pdm.parse(value).to_rfc3339_string()
68+
69+
@staticmethod
70+
def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Optional[str]:
6771
"""
6872
Converts date-time as follows:
6973
Input: "2023-01-01T15:00:00Z"
@@ -73,7 +77,7 @@ def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[st
7377
# some fields that are expected to be resolved as ids might not be populated for the particular `RECORD`,
7478
# we should return `None` to make the field `null` in the output as the result of the transformation.
7579
target_value = record.get(field)
76-
return pdm.parse(target_value).to_rfc3339_string() if target_value else record.get(field)
80+
return BulkTools._datetime_str_to_rfc3339(target_value) if target_value else record.get(field)
7781

7882
def fields_names_to_snake_case(self, dict_input: Optional[Mapping[str, Any]] = None) -> Optional[MutableMapping[str, Any]]:
7983
# transforming record field names from camel to snake case, leaving the `__parent_id` relation in place

airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py

+41-11
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ def __init__(self, config: Dict) -> None:
644644
self.job_manager: ShopifyBulkManager = ShopifyBulkManager(
645645
http_client=self.bulk_http_client,
646646
base_url=f"{self.url_base}{self.path()}",
647-
query=self.bulk_query(config),
647+
query=self.bulk_query(config, self.parent_stream_name, self.parent_stream_cursor),
648648
job_termination_threshold=float(config.get("job_termination_threshold", 3600)),
649649
# override the default job slice size, if provided (it's auto-adjusted, later on)
650650
job_size=config.get("bulk_window_in_days", 30.0),
@@ -670,6 +670,20 @@ def parent_stream(self) -> Union[ShopifyStream, IncrementalShopifyStream]:
670670
"""
671671
return self.parent_stream_class(self.config) if self.parent_stream_class else None
672672

673+
@property
674+
def parent_stream_name(self) -> Optional[str]:
675+
"""
676+
Returns the parent stream name, if the substream has a `parent_stream_class` dependency.
677+
"""
678+
return self.parent_stream.name if self.parent_stream_class else None
679+
680+
@property
681+
def parent_stream_cursor(self) -> Optional[str]:
682+
"""
683+
Returns the parent stream cursor, if the substream has a `parent_stream_class` dependency.
684+
"""
685+
return self.parent_stream.cursor_field if self.parent_stream_class else None
686+
673687
@property
674688
@abstractmethod
675689
def bulk_query(self) -> ShopifyBulkQuery:
@@ -716,21 +730,37 @@ def get_updated_state(
716730
"""
717731
updated_state = super().get_updated_state(current_stream_state, latest_record)
718732
if self.parent_stream_class:
733+
parent_state = latest_record.get(self.parent_stream.name, {})
734+
parent_state_value = (
735+
parent_state.get(self.parent_stream.cursor_field) if parent_state else latest_record.get(self.parent_stream.cursor_field)
736+
)
719737
# add parent_stream_state to `updated_state`
720-
updated_state[self.parent_stream.name] = {self.parent_stream.cursor_field: latest_record.get(self.parent_stream.cursor_field)}
738+
updated_state[self.parent_stream.name] = {self.parent_stream.cursor_field: parent_state_value}
721739
return updated_state
722740

723-
def get_stream_state_value(self, stream_state: Optional[Mapping[str, Any]]) -> str:
724-
if self.parent_stream_class:
725-
# get parent stream state from the stream_state object.
726-
parent_state = stream_state.get(self.parent_stream.name, {})
727-
if parent_state:
728-
return parent_state.get(self.parent_stream.cursor_field, self.default_state_comparison_value)
729-
else:
730-
# get the stream state, if no `parent_stream_class` was assigned.
741+
def _get_stream_cursor_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[str]:
742+
if stream_state:
731743
return stream_state.get(self.cursor_field, self.default_state_comparison_value)
744+
else:
745+
return self.config.get("start_date")
746+
747+
def get_stream_state_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[str]:
748+
if stream_state:
749+
if self.parent_stream_class:
750+
# get parent stream state from the stream_state object.
751+
parent_state = stream_state.get(self.parent_stream.name, {})
752+
if parent_state:
753+
return parent_state.get(self.parent_stream.cursor_field, self.default_state_comparison_value)
754+
else:
755+
# use the stream's cursor value, if no parent state available
756+
return self._get_stream_cursor_value(stream_state)
757+
else:
758+
# get the stream state, if no `parent_stream_class` was assigned.
759+
return self._get_stream_cursor_value(stream_state)
760+
else:
761+
return self.config.get("start_date")
732762

733-
def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Union[str, int]]:
763+
def get_state_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[Union[str, int]]:
734764
if stream_state:
735765
return self.get_stream_state_value(stream_state)
736766
else:

0 commit comments

Comments
 (0)