Skip to content

Commit

Permalink
Add ActionIfExists.MERGE_REFS_OR_APPEND (#619)
Browse files Browse the repository at this point in the history
* Add ActionIfExists.MERGE_REFS_OR_APPEND (#607) (#618)

* Lint files

---------

Co-authored-by: Tiago Lubiana <[email protected]>
  • Loading branch information
LeMyst and lubianat authored Jan 7, 2024
1 parent a568cc8 commit 928f602
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
52 changes: 51 additions & 1 deletion wikibaseintegrator/models/claims.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import copy
import warnings
from abc import abstractmethod
from typing import Any, Callable

Expand Down Expand Up @@ -97,7 +98,26 @@ def add(self, claims: Claims | list[Claim] | Claim, action_if_exists: ActionIfEx
elif action_if_exists == ActionIfExists.REPLACE_ALL:
if claim not in self.claims[property]:
self.claims[property].append(claim)

elif action_if_exists == ActionIfExists.MERGE_REFS_OR_APPEND:
claim_exists = False
for existing_claim in self.claims[property]:
existing_claim_json = existing_claim.get_json()
claim_to_add_json = claim.get_json()

# Check if the values match, including qualifiers
if (claim_to_add_json["mainsnak"]["datavalue"]["value"] == existing_claim_json["mainsnak"]["datavalue"]["value"]) and claim.quals_equal(claim, existing_claim):
claim_exists = True

# Check if current reference block is present on references
if not Claim.ref_present(newitem=claim, olditem=existing_claim):
for ref_to_add in claim.references:
if ref_to_add not in existing_claim.references:
existing_claim.references.add(ref_to_add)
break

# If the claim value does not exist, append it
if not claim_exists:
self.claims[property].append(claim)
return self

def from_json(self, json_data: dict[str, Any]) -> Claims:
Expand Down Expand Up @@ -363,6 +383,17 @@ def equals(self, that: Claim, include_ref: bool = False, fref: Callable | None =

return fref(self, that)

@staticmethod
def quals_equal(olditem: Claim, newitem: Claim) -> bool:
"""
Tests for exactly identical qualifiers.
"""

oldqual = olditem.qualifiers
newqual = newitem.qualifiers

return (len(oldqual) == len(newqual)) and all(x in oldqual for x in newqual)

@staticmethod
def refs_equal(olditem: Claim, newitem: Claim) -> bool:
"""
Expand All @@ -377,6 +408,25 @@ def ref_equal(oldref: References, newref: References) -> bool:

return len(oldrefs) == len(newrefs) and all(any(ref_equal(oldref, newref) for oldref in oldrefs) for newref in newrefs)

@staticmethod
def ref_present(olditem: Claim, newitem: Claim) -> bool:
"""
Tests if (1) there is a single ref in the new item and
(2) if this single ref is present among the claims of the old item.
"""

oldrefs = olditem.references
newrefs = newitem.references

if len(newrefs) != 1:
warnings.warn("New item has more or less than 1 reference block.")
return False

def ref_equal(oldref: References, newref: References) -> bool:
return (len(oldref) == len(newref)) and all(x in oldref for x in newref)

return any(any(ref_equal(oldref, newref) for oldref in oldrefs) for newref in newrefs)

@abstractmethod
def get_sparql_value(self) -> str:
pass
2 changes: 2 additions & 0 deletions wikibaseintegrator/wbi_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ class ActionIfExists(Enum):
FORCE_APPEND: Forces the addition of the new element to the property, even if it already exists.
KEEP: Does nothing if the property already has elements stated.
REPLACE_ALL: Replace all elements with the same property number.
MERGE_REFS_OR_APPEND: Add the new element to the property if it does not exist, otherwise merge the references, adding the references for the new claim as a new reference block.
"""
APPEND_OR_REPLACE = auto()
FORCE_APPEND = auto()
KEEP = auto()
REPLACE_ALL = auto()
MERGE_REFS_OR_APPEND = auto()


class WikibaseDatatype(Enum):
Expand Down

0 comments on commit 928f602

Please sign in to comment.