Skip to content

Commit

Permalink
clean up a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
pudo committed Sep 1, 2024
1 parent b4c839c commit 3b0c4ec
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions nomenklatura/store/resolved.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import orjson
import logging
from redis.client import Redis
from typing import Generator, List, Optional, Tuple
from followthemoney.property import Property
Expand All @@ -7,12 +8,17 @@
from nomenklatura.kv import get_redis, close_redis, b
from nomenklatura.dataset import DS
from nomenklatura.entity import CE
from nomenklatura.resolver import Linker
from nomenklatura.resolver import Linker, StrIdent
from nomenklatura.statement import Statement
from nomenklatura.store.base import Store, View, Writer

log = logging.getLogger(__name__)


class ResolvedStore(Store[DS, CE]):
"""A store implementation which is built to store fully resolved entities. This
implementation is not designed to be updated, and cannot store individual statements."""

def __init__(
self,
dataset: DS,
Expand All @@ -34,7 +40,7 @@ def view(self, scope: DS, external: bool = False) -> View[DS, CE]:
raise NotImplementedError("External views not supported!")
return ResolvedView(self, scope, external=external)

def update(self, id: str) -> None:
def update(self, id: StrIdent) -> None:
raise NotImplementedError("Entity store cannot update entities")

def assemble(self, statements: List[Statement]) -> Optional[CE]:
Expand All @@ -44,7 +50,7 @@ def assemble(self, statements: List[Statement]) -> Optional[CE]:
return self.entity_class.from_statements(self.dataset, statements)

def drop(self, prefix: Optional[str]) -> None:
"""Delete all data associated with a specific version of a dataset."""
"""Delete all data associated with a prefix of the store."""
pipeline = self.db.pipeline()
prefix = f"xre:{prefix}" if prefix else self.prefix
cmds = 0
Expand All @@ -58,6 +64,16 @@ def drop(self, prefix: Optional[str]) -> None:
if cmds > 0:
pipeline.execute()

def derive(self, store: Store[DS, CE]) -> None:
    """Populate this store with every entity found in another store.

    A writer for this store is opened, the default view of ``store`` is
    iterated entity by entity, and each entity is handed to the writer.
    Progress is reported via the module logger once per 10,000 entities,
    and the writer is flushed a single time after the last entity.

    Args:
        store: The source store whose default view supplies the entities.
    """
    sink = self.writer()
    source = store.default_view()
    for count, entity in enumerate(source.entities()):
        # Skip the very first entity (count == 0) so that the progress
        # message only appears after a full batch has been copied.
        if count > 0 and count % 10_000 == 0:
            log.info(
                "Deriving resolved store %s: %s...",
                store.dataset.name,
                count,
            )
        sink.add_entity(entity)
    sink.flush()

def close(self) -> None:
    """Shut the store down by calling the ``close_redis`` helper imported
    from ``nomenklatura.kv``."""
    close_redis()

Expand Down Expand Up @@ -182,9 +198,9 @@ def get_inverted(self, id: str) -> Generator[Tuple[Property, CE], None, None]:
def entities(self) -> Generator[CE, None, None]:
prefix = f"{self.store.prefix}:e:*"
batch: List[bytes] = []
for id in self.store.db.scan_iter(prefix, count=100_000):
for id in self.store.db.scan_iter(prefix, count=50_000):
batch.append(id)
if len(batch) >= 1000:
if len(batch) >= 1_000:
datas = self.store.db.mget(batch)
for data in datas:
if data is None:
Expand Down

0 comments on commit 3b0c4ec

Please sign in to comment.