From 3b0c4ec4fe8f620d1eba284e491e3fefa5adaa15 Mon Sep 17 00:00:00 2001 From: Friedrich Lindenberg Date: Sun, 1 Sep 2024 17:16:42 +0200 Subject: [PATCH] clean up a bit --- nomenklatura/store/resolved.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/nomenklatura/store/resolved.py b/nomenklatura/store/resolved.py index bca4b8e4..b4752174 100644 --- a/nomenklatura/store/resolved.py +++ b/nomenklatura/store/resolved.py @@ -1,4 +1,5 @@ import orjson +import logging from redis.client import Redis from typing import Generator, List, Optional, Tuple from followthemoney.property import Property @@ -7,12 +8,17 @@ from nomenklatura.kv import get_redis, close_redis, b from nomenklatura.dataset import DS from nomenklatura.entity import CE -from nomenklatura.resolver import Linker +from nomenklatura.resolver import Linker, StrIdent from nomenklatura.statement import Statement from nomenklatura.store.base import Store, View, Writer +log = logging.getLogger(__name__) + class ResolvedStore(Store[DS, CE]): + """A store implementation which is built to store fully resolved entities. This + implementation is not designed to be updated, and cannot store individual statements.""" + def __init__( self, dataset: DS, @@ -34,7 +40,7 @@ def view(self, scope: DS, external: bool = False) -> View[DS, CE]: raise NotImplementedError("External views not supported!") return ResolvedView(self, scope, external=external) - def update(self, id: str) -> None: + def update(self, id: StrIdent) -> None: raise NotImplementedError("Entity store cannot update entities") def assemble(self, statements: List[Statement]) -> Optional[CE]: @@ -44,7 +50,7 @@ def assemble(self, statements: List[Statement]) -> Optional[CE]: return self.entity_class.from_statements(self.dataset, statements) def drop(self, prefix: Optional[str]) -> None: - """Delete all data associated with a specific version of a dataset.""" + """Delete all data associated with a prefix of the store.""" pipeline = self.db.pipeline() prefix = f"xre:{prefix}" if prefix else self.prefix cmds = 0 @@ -58,6 +64,16 @@ def drop(self, prefix: Optional[str]) -> None: if cmds > 0: pipeline.execute() + def derive(self, store: Store[DS, CE]) -> None: + """Copy all data from another store into this one.""" + writer = self.writer() + view = store.default_view() + for idx, entity in enumerate(view.entities()): + if idx > 0 and idx % 10_000 == 0: + log.info("Deriving resolved store %s: %s...", store.dataset.name, idx) + writer.add_entity(entity) + writer.flush() + def close(self) -> None: close_redis() @@ -182,9 +198,9 @@ def get_inverted(self, id: str) -> Generator[Tuple[Property, CE], None, None]: def entities(self) -> Generator[CE, None, None]: prefix = f"{self.store.prefix}:e:*" batch: List[bytes] = [] - for id in self.store.db.scan_iter(prefix, count=100_000): + for id in self.store.db.scan_iter(prefix, count=50_000): batch.append(id) - if len(batch) >= 1000: + if len(batch) >= 1_000: datas = self.store.db.mget(batch) for data in datas: if data is None: