Merge pull request #16 from njgheorghita/start-block
Intelligently find start block
njgheorghita authored May 16, 2019
2 parents 0f5d5dc + eb46852 commit 6b8e84c
Showing 4 changed files with 133 additions and 43 deletions.
8 changes: 0 additions & 8 deletions ethpm_cli/exceptions.py
@@ -30,14 +30,6 @@ class BlockNotFoundError(BaseEthpmCliError):
    pass


-class BlockAlreadyScrapedError(BaseEthpmCliError):
-    """
-    Raised when a block has already been scraped for VersionRelease events.
-    """
-
-    pass
-
-
class ValidationError(BaseEthpmCliError):
    """
    Raised when a command line args is not valid.
6 changes: 6 additions & 0 deletions ethpm_cli/main.py
@@ -3,6 +3,7 @@
from pathlib import Path
import sys

+from eth_utils import humanize_hash
import pkg_resources
from web3 import Web3
from web3.middleware import local_filter_middleware
@@ -41,6 +42,11 @@ def scraper(args: argparse.Namespace) -> None:
    last_scraped_block_hash = w3.eth.getBlock(last_scraped_block)["hash"]
    logger = setup_cli_logger()
+    logger.info(
+        "All blocks scraped up to # %d: %s.",
+        last_scraped_block,
+        humanize_hash(last_scraped_block_hash),
+    )
    logger.debug(
        "All blocks scraped up to # %d: %s.",
        last_scraped_block,
        last_scraped_block_hash,
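
The change to ethpm_cli/main.py splits the scraper's status line across log levels: a shortened hash via eth_utils' humanize_hash at info level, with the full hash kept at debug level. A minimal sketch of the difference (the hash below is made up):

    from eth_utils import humanize_hash

    block_hash = bytes.fromhex("ab" * 32)  # hypothetical 32-byte block hash
    print(humanize_hash(block_hash))  # shortened form along the lines of 'abab..abab'
    print(block_hash.hex())  # full hex digest, as the debug log emits
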
132 changes: 102 additions & 30 deletions ethpm_cli/scraper.py
@@ -1,7 +1,10 @@
+from datetime import datetime
import itertools
import json
import logging
from pathlib import Path
+import shutil
+import tempfile
from typing import Any, Dict, Iterable, List, Set, Tuple # noqa: F401

from eth_utils import to_dict, to_list
@@ -14,18 +17,23 @@

from ethpm_cli._utils.various import flatten
from ethpm_cli.constants import VERSION_RELEASE_ABI
-from ethpm_cli.exceptions import BlockAlreadyScrapedError, BlockNotFoundError
+from ethpm_cli.exceptions import BlockNotFoundError
from ethpm_cli.validation import validate_chain_data_store

logger = logging.getLogger("ethpm_cli.scraper.Scraper")

+# https://github.com/ethereum/EIPs/commit/123b7267b6270914a822001c119d11607e695517
+VERSION_RELEASE_TIMESTAMP = 1_552_564_800  # March 14, 2019
+
+BATCH_SIZE = 5000
+
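
A quick sanity check of the new VERSION_RELEASE_TIMESTAMP constant, using only the standard library (the date matches the March 14, 2019 EIP commit referenced in the comment above it):

    from datetime import datetime, timezone

    print(datetime.fromtimestamp(1_552_564_800, tz=timezone.utc))
    # -> 2019-03-14 12:00:00+00:00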

def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int:
    """
    Scrapes VersionRelease event data starting from start_block.
-    If start_block is 0 (default), scraping begins from the
-    max block found in ethpm_dir/chain_data.json.
+    If start_block is not 0, scraping begins from start_block.
+    Otherwise the scraping begins from the ethpm birth block.
    """
    initialize_ethpm_dir(ethpm_dir, w3)
    chain_data_path = ethpm_dir / "chain_data.json"
@@ -37,24 +45,65 @@ def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int:
        f"instance with latest block number of {latest_block}."
    )

-    ethpmdir_block = max(get_scraped_blocks(chain_data_path))
-    active_block = start_block if start_block else ethpmdir_block
-    for block in range(active_block, latest_block):
-        try:
-            validate_unscraped_block(block, chain_data_path)
-        except BlockAlreadyScrapedError:
-            pass
+    if start_block == 0:
+        active_block = get_ethpm_birth_block(
+            w3, 0, latest_block, VERSION_RELEASE_TIMESTAMP
+        )
+    else:
+        active_block = start_block
+
+    logger.info("Scraping from block %d.", active_block)
+    for from_block in range(active_block, latest_block, BATCH_SIZE):
+        if (from_block + BATCH_SIZE) > latest_block:
+            to_block = latest_block
        else:
-            scraped_manifests = scrape_block_for_manifests(w3, block)
-            update_chain_data(chain_data_path, block, scraped_manifests)
+            to_block = from_block + BATCH_SIZE
+
+        if block_range_needs_scraping(from_block, to_block, chain_data_path):
+            scraped_manifests = scrape_block_range_for_manifests(
+                w3, from_block, to_block
+            )
+            update_chain_data(chain_data_path, from_block, to_block, scraped_manifests)
            write_ipfs_uris_to_disk(ethpm_dir, scraped_manifests)
+        else:
+            logger.info("Block range: %d - %d already scraped.", from_block, to_block)
+
    return latest_block


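The rewritten loop above walks the chain in half-open BATCH_SIZE windows, clamping the final window at the chain head; because range(from_block, to_block) excludes to_block, consecutive windows tile without gaps or overlap. A minimal sketch of the window arithmetic, assuming the same BATCH_SIZE of 5000:

    BATCH_SIZE = 5000

    def batch_windows(active_block: int, latest_block: int):
        # Yields half-open (from_block, to_block) windows covering the span.
        for from_block in range(active_block, latest_block, BATCH_SIZE):
            yield from_block, min(from_block + BATCH_SIZE, latest_block)

    print(list(batch_windows(0, 12_000)))
    # -> [(0, 5000), (5000, 10000), (10000, 12000)]
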
-def validate_unscraped_block(block: int, chain_data_path: Path) -> None:
+def get_ethpm_birth_block(
+    w3: Web3, from_block: int, to_block: int, target_timestamp: int
+) -> int:
+    """
+    Returns the closest block found before the target_timestamp
+    """
+    version_release_date = datetime.fromtimestamp(target_timestamp)
+    from_date = datetime.fromtimestamp(w3.eth.getBlock(from_block)["timestamp"])
+    delta = version_release_date - from_date
+
+    if delta.days <= 0 and from_date < version_release_date:
+        while (
+            w3.eth.getBlock(from_block)["timestamp"] < version_release_date.timestamp()
+        ):
+            from_block += 1
+        return from_block - 1
+
+    elif from_date < version_release_date:
+        updated_block = int((from_block + to_block) / 2)
+        return get_ethpm_birth_block(w3, updated_block, to_block, target_timestamp)
+
+    else:
+        updated_block = from_block - int(to_block - from_block)
+        return get_ethpm_birth_block(w3, updated_block, from_block, target_timestamp)


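get_ethpm_birth_block above homes in on the target timestamp with a bisection-style recursion (the elif jumps to the midpoint, the else backtracks below an overshot candidate), then switches to a linear block-by-block walk once the candidate block is within a day of the target, returning the last block mined before target_timestamp. The core idea, illustrated standalone over a plain list of block timestamps (a hypothetical helper, not part of this PR):

    from bisect import bisect_left

    def birth_block(timestamps, target):
        # timestamps[i] is block i's timestamp, sorted ascending.
        # Like get_ethpm_birth_block, returns the last block mined
        # strictly before the target timestamp.
        return bisect_left(timestamps, target) - 1

    print(birth_block([100, 200, 300, 400], 250))  # -> 1
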
+def block_range_needs_scraping(
+    from_block: int, to_block: int, chain_data_path: Path
+) -> bool:
    all_scraped_blocks = get_scraped_blocks(chain_data_path)
-    if block in all_scraped_blocks:
-        raise BlockAlreadyScrapedError(f"Skipping block #{block}. Already processed.")
+    if any(block not in all_scraped_blocks for block in range(from_block, to_block)):
+        return True
+    return False


def initialize_ethpm_dir(ethpm_dir: Path, w3: Web3) -> None:
@@ -69,19 +118,32 @@ def initialize_ethpm_dir(ethpm_dir: Path, w3: Web3) -> None:
        "chain_id": w3.eth.chainId,
        "scraped_blocks": [{"min": "0", "max": "0"}],
    }
-    chain_data_path.write_text(f"{json.dumps(init_json, indent=4)}\n")
+    write_updated_chain_data(chain_data_path, init_json)


def update_chain_data(
-    chain_data_path: Path, block_number: int, manifests: Dict[Address, Dict[str, str]]
+    chain_data_path: Path,
+    from_block: int,
+    to_block: int,
+    manifests: Dict[Address, Dict[str, str]],
) -> None:
    chain_data = json.loads(chain_data_path.read_text())

-    all_scraped_blocks = get_scraped_blocks(chain_data_path)
-    updated_blocks = blocks_to_ranges(set(all_scraped_blocks + [block_number]))
+    old_scraped_blocks = get_scraped_blocks(chain_data_path)
+    new_scraped_blocks = list(range(from_block, to_block))
+    updated_blocks = blocks_to_ranges(set(old_scraped_blocks + new_scraped_blocks))

+    chain_data_with_updated_blocks = assoc(chain_data, "scraped_blocks", updated_blocks)
+    write_updated_chain_data(chain_data_path, chain_data_with_updated_blocks)
+
-    chain_data_updated_blocks = assoc(chain_data, "scraped_blocks", updated_blocks)
-    chain_data_path.write_text(f"{json.dumps(chain_data_updated_blocks, indent=4)}\n")

+def write_updated_chain_data(
+    chain_data_path: Path, updated_data: Dict[str, Any]
+) -> None:
+    tmp_pkg_dir = Path(tempfile.mkdtemp())
+    tmp_data = tmp_pkg_dir / "chain_data.json"
+    tmp_data.write_text(f"{json.dumps(updated_data, indent=4)}\n")
+    shutil.copyfile(tmp_data, chain_data_path)
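
The new write_updated_chain_data helper stages the JSON in a temporary directory and then copies it over chain_data.json, so an interrupted run is less likely to leave a truncated file behind. Note that shutil.copyfile itself is not atomic; an os.replace-based variant (a sketch of an alternative, not what this PR does) would close that remaining window:

    import json
    import os
    import tempfile
    from pathlib import Path
    from typing import Any, Dict

    def write_chain_data_atomic(chain_data_path: Path, updated_data: Dict[str, Any]) -> None:
        # Stage next to the destination so os.replace stays on one filesystem.
        fd, tmp_name = tempfile.mkstemp(dir=chain_data_path.parent)
        with os.fdopen(fd, "w") as f:
            f.write(f"{json.dumps(updated_data, indent=4)}\n")
        os.replace(tmp_name, chain_data_path)  # atomic rename on POSIX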


@to_list
@@ -92,8 +154,8 @@ def blocks_to_ranges(blocks_list: List[int]) -> Iterable[Dict[str, str]]:
    -> [{"min": "0", "max": "2"}, {"min": "4", "max": "4"}, {"min": "6", "max": "9"}]
    """
    for a, b in itertools.groupby(enumerate(blocks_list), lambda x: x[0] - x[1]):
-        b = list(b)  # type: ignore
-        yield {"min": str(b[0][1]), "max": str(b[-1][1])}  # type: ignore
+        c = list(b)
+        yield {"min": str(c[0][1]), "max": str(c[-1][1])}


def get_scraped_blocks(chain_data_path: Path) -> List[int]:
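
blocks_to_ranges relies on the classic itertools.groupby trick: in a sorted run of consecutive integers, index minus value is constant, so each group is exactly one contiguous range. For example:

    import itertools

    blocks = [0, 1, 2, 4, 6, 7, 8, 9]
    for _, group in itertools.groupby(enumerate(blocks), lambda x: x[0] - x[1]):
        items = list(group)
        print({"min": str(items[0][1]), "max": str(items[-1][1])})
    # -> {'min': '0', 'max': '2'}, {'min': '4', 'max': '4'}, {'min': '6', 'max': '9'}
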
@@ -138,13 +200,14 @@ def write_ipfs_uris_to_disk(
        logger.info("%s written to\n %s.\n", uri, asset_dest_path)


-def scrape_block_for_manifests(
-    w3: Web3, block_number: int
+def scrape_block_range_for_manifests(
+    w3: Web3, from_block: int, to_block: int
) -> Dict[Address, Dict[str, str]]:
-    version_release_logs = get_block_version_release_logs(w3, block_number)
+    version_release_logs = get_block_version_release_logs(w3, from_block, to_block)
    logger.info(
-        "Block # %d scraped. %d VersionRelease events found in block.",
-        block_number,
+        "Blocks %d-%d scraped. %d VersionRelease events found.",
+        from_block,
+        to_block,
        len(version_release_logs),
    )
    if version_release_logs:
@@ -198,14 +261,23 @@ def process_entries(
) -> Iterable[Tuple[str, str]]:
    for entry in all_entries:
        if entry["address"] == address:
+            logger.info(
+                "<Package %s==%s> released on registry @ %s.\n" "Manifest URI: %s\n",
+                entry["args"]["packageName"],
+                entry["args"]["version"],
+                address,
+                entry["args"]["manifestURI"],
+            )
            yield "manifestURI", entry["args"]["manifestURI"]
            yield "packageName", entry["args"]["packageName"]
            yield "version", entry["args"]["version"]


-def get_block_version_release_logs(w3: Web3, block_number: int) -> Dict[str, Any]:
+def get_block_version_release_logs(
+    w3: Web3, from_block: int, to_block: int
+) -> Dict[str, Any]:
    log_contract = w3.eth.contract(abi=VERSION_RELEASE_ABI)
    log_filter = log_contract.events.VersionRelease.createFilter(
-        fromBlock=block_number, toBlock=block_number
+        fromBlock=from_block, toBlock=to_block
    )
    return log_filter.get_all_entries()
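
Widening get_block_version_release_logs to a fromBlock/toBlock span is what makes the batching pay off: one filter query per 5000-block window rather than one per block. A usage sketch, assuming a reachable JSON-RPC endpoint and the VERSION_RELEASE_ABI shipped with ethpm_cli:

    from web3 import Web3

    from ethpm_cli.constants import VERSION_RELEASE_ABI

    w3 = Web3(Web3.HTTPProvider("http://localhost:8545"))  # assumed endpoint
    contract = w3.eth.contract(abi=VERSION_RELEASE_ABI)
    log_filter = contract.events.VersionRelease.createFilter(fromBlock=0, toBlock=5000)
    for event in log_filter.get_all_entries():
        print(event["args"]["packageName"], event["args"]["manifestURI"])
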
30 changes: 25 additions & 5 deletions tests/core/test_scraper.py
@@ -1,3 +1,4 @@
+from datetime import datetime, timedelta
import json
import os
from pathlib import Path
@@ -9,7 +10,7 @@

from ethpm_cli import CLI_ASSETS_DIR
from ethpm_cli._utils.testing import check_dir_trees_equal
-from ethpm_cli.scraper import scrape
+from ethpm_cli.scraper import get_ethpm_birth_block, scrape


@pytest.fixture
@@ -45,7 +46,7 @@ def test_scraper_logs_scraped_block_ranges(log, w3):

    # Initial scrape
    w3.testing.mine(6)
-    scrape(w3, ethpmcli_dir)
+    scrape(w3, ethpmcli_dir, 1)
    expected_1 = {"chain_id": "0x3d", "scraped_blocks": [{"min": "0", "max": "6"}]}
    actual_1 = json.loads((ethpmcli_dir / "chain_data.json").read_text())
    assert actual_1 == expected_1
@@ -119,7 +120,7 @@ def test_scraper_writes_to_disk(log, log_2, test_assets_dir, w3):

    w3.testing.mine(3)
    ethpmcli_dir = Path(os.environ["XDG_ETHPMCLI_ROOT"])
-    scrape(w3, ethpmcli_dir)
+    scrape(w3, ethpmcli_dir, 1)
    assert check_dir_trees_equal(ethpmcli_dir, (test_assets_dir.parent / "ipfs"))


@@ -143,7 +144,7 @@ def test_scraper_imports_existing_ethpmcli_dir(log, log_2, test_assets_dir, w3):

    ethpmcli_dir = Path(os.environ["XDG_ETHPMCLI_ROOT"])
    # First scrape
-    scrape(w3, ethpmcli_dir)
+    scrape(w3, ethpmcli_dir, 1)
    w3.testing.mine(3)
    release(
        log,
@@ -154,5 +155,24 @@ def test_scraper_imports_existing_ethpmcli_dir(log, log_2, test_assets_dir, w3):
    )
    w3.testing.mine(3)
    # Second scrape
-    scrape(w3, ethpmcli_dir)
+    scrape(w3, ethpmcli_dir, 1)
    assert check_dir_trees_equal(ethpmcli_dir, (test_assets_dir.parent / "ipfs"))
+
+
+@pytest.mark.parametrize("interval", (40, 400, 4000))
+def test_get_ethpm_birth_block(w3, interval):
+    time_travel(w3, interval)
+    latest_block = w3.eth.getBlock("latest")
+    time_travel(w3, interval)
+    actual = get_ethpm_birth_block(w3, 0, w3.eth.blockNumber, latest_block.timestamp)
+    assert actual == latest_block.number - 1
+
+
+def time_travel(w3, hours):
+    for hour in range(1, hours):
+        current_time = w3.eth.getBlock("latest").timestamp
+        dest_time = int(
+            (datetime.fromtimestamp(current_time) + timedelta(hours=1)).strftime("%s")
+        )
+        w3.provider.ethereum_tester.time_travel(dest_time)
+        w3.testing.mine(1)

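One portability note on the time_travel helper: strftime("%s") is a glibc extension rather than standard Python, so the timestamp round-trip only works on platforms whose C library supports it. The portable spelling of the same computation (a sketch, not part of this PR) is:

    from datetime import datetime, timedelta

    current_time = 1_552_564_800  # hypothetical latest-block timestamp
    dest_time = int((datetime.fromtimestamp(current_time) + timedelta(hours=1)).timestamp())
    print(dest_time - current_time)  # 3600, DST edge cases aside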