Merge pull request #117 from opensanctions/statement-store

Statement store

pudo authored Jun 26, 2023 · 2 parents 82faddf + 5ac1db8 · commit 98c426a

Showing 44 changed files with 1,069 additions and 625 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -37,7 +37,7 @@ $ cat merged.ijson | wc -l
 The command-line use of `nomenklatura` is targeted at small datasets which need to be de-duplicated. For more involved scenarios, the package also offers a Python API which can be used to control the semantics of de-duplication.

 * `nomenklatura.Dataset` - implements a basic dataset for describing a set of entities.
-* `nomenklatura.Loader` - a general purpose access mechanism for entities. By default, a `nomenklatura.FileLoader` is used to access entity data stored in files, but the loader can be subclassed to work with entities from a database system.
+* `nomenklatura.Store` - a general purpose access mechanism for entities. By default, a store is used to access entity data stored in files as an in-memory cache, but the store can be subclassed to work with entities from a database system.
 * `nomenklatura.Index` - a full-text in-memory search index for FtM entities. In the application, this is used to block de-duplication candidates, but the index can also be used to drive an API etc.
 * `nomenklatura.Resolver` - the core of the de-duplication process, the resolver is essentially a graph with edges made out of entity judgements. The resolver can be used to store judgements or get the canonical ID for a given entity.
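The README bullet now points at the `Store` API in place of `FileLoader`. A minimal usage sketch of the new entry point, based on the `load_entity_file_store` helper this commit introduces; the `default_view()` and `entities()` accessors are assumptions about the `View` interface, which is not shown in this diff:

```python
# Sketch only: load_entity_file_store and the resolver argument appear in
# this commit; default_view() and entities() are assumed View accessors.
from pathlib import Path

from nomenklatura.resolver import Resolver
from nomenklatura.store import load_entity_file_store

resolver = Resolver()
store = load_entity_file_store(Path("merged.ijson"), resolver=resolver)
view = store.default_view()      # assumed accessor returning a View
for entity in view.entities():   # assumed iterator over canonical entities
    print(entity.id, entity.caption)
```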
7 changes: 3 additions & 4 deletions nomenklatura/__init__.py
@@ -1,7 +1,7 @@
 from nomenklatura.dataset import Dataset
 from nomenklatura.entity import CompositeEntity
 from nomenklatura.resolver import Resolver
-from nomenklatura.loader import Loader, FileLoader, MemoryLoader
+from nomenklatura.store import Store, View
 from nomenklatura.index import Index

 __version__ = "2.14.1"
@@ -10,7 +10,6 @@
     "CompositeEntity",
     "Resolver",
     "Index",
-    "Loader",
-    "FileLoader",
-    "MemoryLoader",
+    "Store",
+    "View",
 ]
13 changes: 6 additions & 7 deletions nomenklatura/cli.py
@@ -10,7 +10,7 @@

 from nomenklatura.cache import Cache
 from nomenklatura.matching import train_v2_matcher, train_v1_matcher
-from nomenklatura.loader import FileLoader
+from nomenklatura.store import load_entity_file_store
 from nomenklatura.resolver import Resolver
 from nomenklatura.dataset import Dataset
 from nomenklatura.entity import CompositeEntity as Entity
@@ -69,13 +69,12 @@ def xref_file(
     scored: bool = True,
 ) -> None:
     resolver_ = _get_resolver(path, resolver)
-    loader = FileLoader(path, resolver=resolver_)
+    store = load_entity_file_store(path, resolver=resolver_)
     algorithm_type = get_algorithm(algorithm)
     if algorithm_type is None:
         raise click.Abort(f"Unknown algorithm: {algorithm}")
     run_xref(
-        loader,
-        resolver_,
+        store,
         auto_threshold=auto_threshold,
         algorithm=algorithm_type,
         scored=scored,
@@ -138,11 +137,11 @@ def make_sortable(path: Path, outpath: Path) -> None:
 @click.option("-r", "--resolver", type=ResPath)
 def dedupe(path: Path, xref: bool = False, resolver: Optional[Path] = None) -> None:
     resolver_ = _get_resolver(path, resolver)
-    loader = FileLoader(path, resolver=resolver_)
+    store = load_entity_file_store(path, resolver=resolver_)
     if xref:
-        run_xref(loader, resolver_)
+        run_xref(store)

-    dedupe_ui(resolver_, loader)
+    dedupe_ui(store)
     resolver_.save()
8 changes: 8 additions & 0 deletions nomenklatura/dataset/dataset.py
@@ -89,6 +89,14 @@ def parents(self: DS) -> Set[DS]:
     def dataset_names(self) -> List[str]:
         return [d.name for d in self.datasets]

+    @cached_property
+    def scope_names(self) -> Set[str]:
+        """This is based on the premise that collections (ie. datasets which have children)
+        never contain entities themselves that need to be queried."""
+        if len(self.children):
+            return {d.name for d in self.children}
+        return {self.name}
+
     def __repr__(self) -> str:
         return f"<Dataset({self.name})>"  # pragma: no cover
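A short illustration of the `scope_names` premise stated in the docstring above. This is a sketch only: the `Dataset.make` factory and the payload shape are assumptions, not part of this diff — only `scope_names` itself is.

```python
# Sketch of the scope_names semantics, assuming a Dataset.make factory
# that accepts a name/title mapping (not shown in this commit).
from nomenklatura.dataset import Dataset

leaf = Dataset.make({"name": "us_ofac", "title": "US OFAC SDN List"})

# A dataset with no children is queried under its own name:
assert leaf.scope_names == {"us_ofac"}

# A collection (a dataset that has children) would instead yield the set of
# its children's names, e.g. {"us_ofac", "eu_fsf"}: collections are assumed
# never to contain queryable entities themselves.
```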
73 changes: 45 additions & 28 deletions nomenklatura/entity.py
@@ -15,11 +15,13 @@
 from nomenklatura.dataset import DS
 from nomenklatura.publish.names import pick_name
 from nomenklatura.statement.statement import Statement
+from nomenklatura.util import BASE_ID

 if TYPE_CHECKING:
-    from nomenklatura.loader import Loader
+    from nomenklatura.store import View

 CE = TypeVar("CE", bound="CompositeEntity")
+DEFAULT_DATASET = "default"


 class CompositeEntity(EntityProxy):
@@ -29,10 +31,7 @@ class CompositeEntity(EntityProxy):
         "schema",
         "id",
         "_caption",
-        "target",
-        "external",
-        "referents",
-        "datasets",
+        "extra_referents",
         "default_dataset",
         "statement_type",
         "_statements",
@@ -43,7 +42,7 @@ def __init__(
         model: "Model",
         data: Dict[str, Any],
         cleaned: bool = True,
-        default_dataset: str = "default",
+        default_dataset: str = DEFAULT_DATASET,
     ):
         data = dict(data or {})
         schema = model.get(data.pop("schema", None))
@@ -54,19 +53,15 @@
         self._caption: Optional[str] = None
         """A pre-computed label for this entity."""

-        self.target: Optional[bool] = data.pop("target", None)
-        self.external: Optional[bool] = data.pop("external", None)
-        self.referents: Set[str] = set(data.pop("referents", []))
+        self.extra_referents: Set[str] = set(data.pop("referents", []))
         """The IDs of all entities which are included in this canonical entity."""

-        self.datasets: Set[str] = set(data.pop("datasets", []))
-        """The set of datasets from which information in this entity is derived."""
-
         self.default_dataset = default_dataset
         self.id: Optional[str] = data.pop("id", None)
         self._statements: Dict[str, Set[Statement]] = {}

         properties = data.pop("properties", None)
+        # external = data.pop("external", None)
         if isinstance(properties, Mapping):
             for key, value in properties.items():
                 self.add(key, value, cleaned=cleaned, quiet=True)
@@ -95,8 +90,8 @@ def statements(self) -> Generator[Statement, None, None]:
            yield Statement(
                canonical_id=self.id,
                entity_id=self.id,
-               prop=Statement.BASE,
-               prop_type=Statement.BASE,
+               prop=BASE_ID,
+               prop_type=BASE_ID,
                schema=self.schema.name,
                value=self.checksum(),
                dataset=self.default_dataset,
@@ -113,6 +108,29 @@ def last_seen(self) -> Optional[str]:
         seen = (s.last_seen for s in self._iter_stmt() if s.last_seen is not None)
         return max(seen, default=None)

+    @property
+    def target(self) -> Optional[bool]:
+        target: Optional[bool] = None
+        for stmt in self._iter_stmt():
+            if stmt.target is not None:
+                target = target or stmt.target
+        return target
+
+    @property
+    def datasets(self) -> Set[str]:
+        datasets: Set[str] = set()
+        for stmt in self._iter_stmt():
+            datasets.add(stmt.dataset)
+        return datasets
+
+    @property
+    def referents(self) -> Set[str]:
+        referents: Set[str] = set(self.extra_referents)
+        for stmt in self._iter_stmt():
+            if stmt.entity_id is not None and stmt.entity_id != self.id:
+                referents.add(stmt.entity_id)
+        return referents
+
     @property
     def key_prefix(self) -> Optional[str]:
         return self.default_dataset
@@ -149,12 +167,7 @@ def add_statement(self, stmt: Statement) -> None:
             self.schema = model.common_schema(self.schema, stmt.schema)
         except InvalidData as exc:
             raise InvalidData(f"{self.id}: {exc}") from exc
-        if stmt.target is not None:
-            self.target = self.target or stmt.target
-        self.datasets.add(stmt.dataset)
-        if stmt.entity_id != self.id and stmt.entity_id is not None:
-            self.referents.add(stmt.entity_id)
-        if stmt.prop != Statement.BASE:
+        if stmt.prop != BASE_ID:
             self._statements.setdefault(stmt.prop, set())
             self._statements[stmt.prop].add(stmt)
@@ -414,7 +427,7 @@ def __len__(self) -> int:
         return len(list(self._iter_stmt())) + 1

     def _to_nested_dict(
-        self: CE, loader: "Loader[DS, CE]", depth: int, path: List[str]
+        self: CE, view: "View[DS, CE]", depth: int, path: List[str]
     ) -> Dict[str, Any]:
         next_depth = depth if self.schema.edge else depth - 1
         next_path = list(path)
@@ -423,27 +436,31 @@
         data = self.to_dict()
         if next_depth < 0:
             return data
-        nested: Dict[str, Any] = {}
-        for prop, adjacent in loader.get_adjacent(self):
+        nested: Dict[str, List[Any]] = {}
+        for prop, adjacent in view.get_adjacent(self):
             if adjacent.id in next_path:
                 continue
-            value = adjacent._to_nested_dict(loader, next_depth, next_path)
+            value = adjacent._to_nested_dict(view, next_depth, next_path)
             if prop.name not in nested:
                 nested[prop.name] = []
             nested[prop.name].append(value)
         data["properties"].update(nested)
         return data

     def to_nested_dict(
-        self: CE, loader: "Loader[DS, CE]", depth: int = 1
+        self: CE, view: "View[DS, CE]", depth: int = 1
     ) -> Dict[str, Any]:
-        return self._to_nested_dict(loader, depth=depth, path=[])
+        return self._to_nested_dict(view, depth=depth, path=[])

     @classmethod
     def from_dict(
-        cls: Type[CE], model: Model, data: Dict[str, Any], cleaned: bool = True
+        cls: Type[CE],
+        model: Model,
+        data: Dict[str, Any],
+        cleaned: bool = True,
+        default_dataset: str = DEFAULT_DATASET,
     ) -> CE:
-        return super().from_dict(model, data, cleaned=cleaned)
+        return cls(model, data, cleaned=cleaned, default_dataset=default_dataset)

     @classmethod
     def from_statements(cls: Type[CE], statements: Iterable[Statement]) -> CE:
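Taken together, these changes derive `target`, `datasets` and `referents` from an entity's statements rather than tracking them as separate instance fields. A hedged sketch of the resulting behaviour; the `Statement` keyword arguments are inferred from the fields used in this diff and may not match the actual constructor signature:

```python
# Sketch: properties derived from statements. The Statement kwargs below
# mirror fields referenced in this commit; defaults and required arguments
# are assumptions.
from nomenklatura.entity import CompositeEntity
from nomenklatura.statement.statement import Statement

stmt = Statement(
    canonical_id="Q7747",      # canonical (de-duplicated) entity ID
    entity_id="ofac-12345",    # source entity ID, differs from the canonical ID
    prop="name",
    prop_type="name",
    schema="Person",
    value="Example Person",
    dataset="us_ofac",
    target=True,
)
entity = CompositeEntity.from_statements([stmt])

assert entity.datasets == {"us_ofac"}      # collected from stmt.dataset
assert entity.referents == {"ofac-12345"}  # entity_id != canonical ID
assert entity.target is True               # OR-ed across all statements
```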