From 37775a0588b4746734d697acd094af238740cd1f Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Wed, 15 May 2019 11:16:51 +0200 Subject: [PATCH 1/4] update to scrape block ranges rather than individual blocks --- ethpm_cli/scraper.py | 48 ++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/ethpm_cli/scraper.py b/ethpm_cli/scraper.py index 59f0c55..6cf461e 100644 --- a/ethpm_cli/scraper.py +++ b/ethpm_cli/scraper.py @@ -20,6 +20,8 @@ logger = logging.getLogger("ethpm_cli.scraper.Scraper") +BLOCK_INTERVAL = 50 + def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int: """ Scrapes VersionRelease event data starting from start_block. @@ -39,22 +41,27 @@ def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int: ethpmdir_block = max(get_scraped_blocks(chain_data_path)) active_block = start_block if start_block else ethpmdir_block - for block in range(active_block, latest_block): - try: - validate_unscraped_block(block, chain_data_path) - except BlockAlreadyScrapedError: - pass + + for from_block in range(active_block, latest_block, BLOCK_INTERVAL): + if (from_block + BLOCK_INTERVAL) > latest_block: + to_block = latest_block else: - scraped_manifests = scrape_block_for_manifests(w3, block) - update_chain_data(chain_data_path, block, scraped_manifests) + to_block = from_block + BLOCK_INTERVAL + + if block_range_needs_scraping(from_block, to_block, chain_data_path): + scraped_manifests = scrape_block_range_for_manifests(w3, from_block, to_block) + update_chain_data(chain_data_path, from_block, to_block, scraped_manifests) write_ipfs_uris_to_disk(ethpm_dir, scraped_manifests) + return latest_block -def validate_unscraped_block(block: int, chain_data_path: Path) -> None: +def block_range_needs_scraping(from_block: int, to_block: int, chain_data_path: Path) -> bool: all_scraped_blocks = get_scraped_blocks(chain_data_path) - if block in all_scraped_blocks: - raise BlockAlreadyScrapedError(f"Skipping block #{block}. Already processed.") + for block in range(from_block, to_block): + if block not in all_scraped_blocks: + return True + return False def initialize_ethpm_dir(ethpm_dir: Path, w3: Web3) -> None: @@ -73,12 +80,13 @@ def initialize_ethpm_dir(ethpm_dir: Path, w3: Web3) -> None: def update_chain_data( - chain_data_path: Path, block_number: int, manifests: Dict[Address, Dict[str, str]] + chain_data_path: Path, from_block: int, to_block: int, manifests: Dict[Address, Dict[str, str]] ) -> None: chain_data = json.loads(chain_data_path.read_text()) - all_scraped_blocks = get_scraped_blocks(chain_data_path) - updated_blocks = blocks_to_ranges(set(all_scraped_blocks + [block_number])) + old_scraped_blocks = get_scraped_blocks(chain_data_path) + new_scraped_blocks = list(range(from_block, to_block)) + updated_blocks = blocks_to_ranges(set(old_scraped_blocks + new_scraped_blocks)) chain_data_updated_blocks = assoc(chain_data, "scraped_blocks", updated_blocks) chain_data_path.write_text(f"{json.dumps(chain_data_updated_blocks, indent=4)}\n") @@ -138,13 +146,13 @@ def write_ipfs_uris_to_disk( logger.info("%s written to\n %s.\n", uri, asset_dest_path) -def scrape_block_for_manifests( - w3: Web3, block_number: int +def scrape_block_range_for_manifests( + w3: Web3, from_block: int, to_block: int ) -> Dict[Address, Dict[str, str]]: - version_release_logs = get_block_version_release_logs(w3, block_number) + version_release_logs = get_block_version_release_logs(w3, from_block, to_block) logger.info( - "Block # %d scraped. %d VersionRelease events found in block.", - block_number, + "Blocks %d-%d scraped. %d VersionRelease events found.", + from_block, to_block, len(version_release_logs), ) if version_release_logs: @@ -203,9 +211,9 @@ def process_entries( yield "version", entry["args"]["version"] -def get_block_version_release_logs(w3: Web3, block_number: int) -> Dict[str, Any]: +def get_block_version_release_logs(w3: Web3, from_block: int, to_block: int) -> Dict[str, Any]: log_contract = w3.eth.contract(abi=VERSION_RELEASE_ABI) log_filter = log_contract.events.VersionRelease.createFilter( - fromBlock=block_number, toBlock=block_number + fromBlock=from_block, toBlock=to_block ) return log_filter.get_all_entries() From 1d17f53d399d1fd2609b200e6eee13886ca1f332 Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Wed, 15 May 2019 11:28:18 +0200 Subject: [PATCH 2/4] Improve logging message for VersionRelease events --- ethpm_cli/scraper.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ethpm_cli/scraper.py b/ethpm_cli/scraper.py index 6cf461e..0221d00 100644 --- a/ethpm_cli/scraper.py +++ b/ethpm_cli/scraper.py @@ -206,6 +206,10 @@ def process_entries( ) -> Iterable[Tuple[str, str]]: for entry in all_entries: if entry["address"] == address: + logger.info( + f" released on registry @ {address}.\n" + f"Manifest URI: {entry['args']['manifestURI']}" + ) yield "manifestURI", entry["args"]["manifestURI"] yield "packageName", entry["args"]["packageName"] yield "version", entry["args"]["version"] From c9e4e4c3f6487e087d29610758bb79770ca82aa6 Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Wed, 15 May 2019 13:51:38 +0200 Subject: [PATCH 3/4] Intelligent start block --- ethpm_cli/exceptions.py | 8 ----- ethpm_cli/main.py | 3 +- ethpm_cli/scraper.py | 73 +++++++++++++++++++++++++++++--------- tests/core/test_scraper.py | 8 ++--- 4 files changed, 63 insertions(+), 29 deletions(-) diff --git a/ethpm_cli/exceptions.py b/ethpm_cli/exceptions.py index 74113ac..f713d09 100644 --- a/ethpm_cli/exceptions.py +++ b/ethpm_cli/exceptions.py @@ -30,14 +30,6 @@ class BlockNotFoundError(BaseEthpmCliError): pass -class BlockAlreadyScrapedError(BaseEthpmCliError): - """ - Raised when a block has already been scraped for VersionRelease events. - """ - - pass - - class ValidationError(BaseEthpmCliError): """ Raised when a command line args is not valid. diff --git a/ethpm_cli/main.py b/ethpm_cli/main.py index 16d1ed0..b7e7499 100644 --- a/ethpm_cli/main.py +++ b/ethpm_cli/main.py @@ -3,6 +3,7 @@ from pathlib import Path import sys +from eth_utils import to_hex import pkg_resources from web3 import Web3 from web3.middleware import local_filter_middleware @@ -43,7 +44,7 @@ def scraper(args: argparse.Namespace) -> None: logger.info( "All blocks scraped up to # %d: %s.", last_scraped_block, - last_scraped_block_hash, + to_hex(last_scraped_block_hash), ) diff --git a/ethpm_cli/scraper.py b/ethpm_cli/scraper.py index 0221d00..3cf30a5 100644 --- a/ethpm_cli/scraper.py +++ b/ethpm_cli/scraper.py @@ -1,3 +1,4 @@ +from datetime import datetime import itertools import json import logging @@ -14,20 +15,23 @@ from ethpm_cli._utils.various import flatten from ethpm_cli.constants import VERSION_RELEASE_ABI -from ethpm_cli.exceptions import BlockAlreadyScrapedError, BlockNotFoundError +from ethpm_cli.exceptions import BlockNotFoundError from ethpm_cli.validation import validate_chain_data_store logger = logging.getLogger("ethpm_cli.scraper.Scraper") +# https://github.com/ethereum/EIPs/commit/123b7267b6270914a822001c119d11607e695517 +VERSION_RELEASE_TIMESTAMP = 1_552_564_800 # March 14, 2019 + +BLOCK_INTERVAL = 5000 -BLOCK_INTERVAL = 50 def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int: """ Scrapes VersionRelease event data starting from start_block. - If start_block is 0 (default), scraping begins from the - max block found in ethpm_dir/chain_data.json. + If start_block is not 0, scraping begins from start_block. + Otherwise the scraping begins from the ethpm birth block. """ initialize_ethpm_dir(ethpm_dir, w3) chain_data_path = ethpm_dir / "chain_data.json" @@ -39,9 +43,12 @@ def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int: f"instance with latest block number of {latest_block}." ) - ethpmdir_block = max(get_scraped_blocks(chain_data_path)) - active_block = start_block if start_block else ethpmdir_block + if start_block == 0: + active_block = get_ethpm_birth_block(w3, 0, latest_block) + else: + active_block = start_block + logger.info("Scraping from block %d.", active_block) for from_block in range(active_block, latest_block, BLOCK_INTERVAL): if (from_block + BLOCK_INTERVAL) > latest_block: to_block = latest_block @@ -49,18 +56,43 @@ def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int: to_block = from_block + BLOCK_INTERVAL if block_range_needs_scraping(from_block, to_block, chain_data_path): - scraped_manifests = scrape_block_range_for_manifests(w3, from_block, to_block) + scraped_manifests = scrape_block_range_for_manifests( + w3, from_block, to_block + ) update_chain_data(chain_data_path, from_block, to_block, scraped_manifests) write_ipfs_uris_to_disk(ethpm_dir, scraped_manifests) + else: + logger.info("Block range: %d - %d already scraped.", from_block, to_block) return latest_block -def block_range_needs_scraping(from_block: int, to_block: int, chain_data_path: Path) -> bool: +def get_ethpm_birth_block(w3: Web3, from_block: int, to_block: int) -> int: + """ + Returns the closest block found before the EthPM VersionRelease event birthday. + """ + version_release_date = datetime.fromtimestamp(VERSION_RELEASE_TIMESTAMP) + from_date = datetime.fromtimestamp(w3.eth.getBlock(from_block)["timestamp"]) + delta = version_release_date - from_date + + if delta.days == 0 and from_date < version_release_date: + return from_block + + elif from_date < version_release_date: + updated_block = int((from_block + to_block) / 2) + return get_ethpm_birth_block(w3, updated_block, to_block) + + else: + updated_block = from_block - int(to_block - from_block) + return get_ethpm_birth_block(w3, updated_block, from_block) + + +def block_range_needs_scraping( + from_block: int, to_block: int, chain_data_path: Path +) -> bool: all_scraped_blocks = get_scraped_blocks(chain_data_path) - for block in range(from_block, to_block): - if block not in all_scraped_blocks: - return True + if any(block not in all_scraped_blocks for block in range(from_block, to_block)): + return True return False @@ -80,7 +112,10 @@ def initialize_ethpm_dir(ethpm_dir: Path, w3: Web3) -> None: def update_chain_data( - chain_data_path: Path, from_block: int, to_block: int, manifests: Dict[Address, Dict[str, str]] + chain_data_path: Path, + from_block: int, + to_block: int, + manifests: Dict[Address, Dict[str, str]], ) -> None: chain_data = json.loads(chain_data_path.read_text()) @@ -152,7 +187,8 @@ def scrape_block_range_for_manifests( version_release_logs = get_block_version_release_logs(w3, from_block, to_block) logger.info( "Blocks %d-%d scraped. %d VersionRelease events found.", - from_block, to_block, + from_block, + to_block, len(version_release_logs), ) if version_release_logs: @@ -207,15 +243,20 @@ def process_entries( for entry in all_entries: if entry["address"] == address: logger.info( - f" released on registry @ {address}.\n" - f"Manifest URI: {entry['args']['manifestURI']}" + " released on registry @ %s.\n" "Manifest URI: %s\n", + entry["args"]["packageName"], + entry["args"]["version"], + address, + entry["args"]["manifestURI"], ) yield "manifestURI", entry["args"]["manifestURI"] yield "packageName", entry["args"]["packageName"] yield "version", entry["args"]["version"] -def get_block_version_release_logs(w3: Web3, from_block: int, to_block: int) -> Dict[str, Any]: +def get_block_version_release_logs( + w3: Web3, from_block: int, to_block: int +) -> Dict[str, Any]: log_contract = w3.eth.contract(abi=VERSION_RELEASE_ABI) log_filter = log_contract.events.VersionRelease.createFilter( fromBlock=from_block, toBlock=to_block diff --git a/tests/core/test_scraper.py b/tests/core/test_scraper.py index 5aacb24..bd13d29 100644 --- a/tests/core/test_scraper.py +++ b/tests/core/test_scraper.py @@ -45,7 +45,7 @@ def test_scraper_logs_scraped_block_ranges(log, w3): # Initial scrape w3.testing.mine(6) - scrape(w3, ethpmcli_dir) + scrape(w3, ethpmcli_dir, 1) expected_1 = {"chain_id": "0x3d", "scraped_blocks": [{"min": "0", "max": "6"}]} actual_1 = json.loads((ethpmcli_dir / "chain_data.json").read_text()) assert actual_1 == expected_1 @@ -119,7 +119,7 @@ def test_scraper_writes_to_disk(log, log_2, test_assets_dir, w3): w3.testing.mine(3) ethpmcli_dir = Path(os.environ["XDG_ETHPMCLI_ROOT"]) - scrape(w3, ethpmcli_dir) + scrape(w3, ethpmcli_dir, 1) assert check_dir_trees_equal(ethpmcli_dir, (test_assets_dir.parent / "ipfs")) @@ -143,7 +143,7 @@ def test_scraper_imports_existing_ethpmcli_dir(log, log_2, test_assets_dir, w3): ethpmcli_dir = Path(os.environ["XDG_ETHPMCLI_ROOT"]) # First scrape - scrape(w3, ethpmcli_dir) + scrape(w3, ethpmcli_dir, 1) w3.testing.mine(3) release( log, @@ -154,5 +154,5 @@ def test_scraper_imports_existing_ethpmcli_dir(log, log_2, test_assets_dir, w3): ) w3.testing.mine(3) # Second scrape - scrape(w3, ethpmcli_dir) + scrape(w3, ethpmcli_dir, 1) assert check_dir_trees_equal(ethpmcli_dir, (test_assets_dir.parent / "ipfs")) From eb46852a15ed76ad7c6d358973a317419af353c6 Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Thu, 16 May 2019 12:39:07 +0200 Subject: [PATCH 4/4] Write tests for get_ethpm_birth_block --- ethpm_cli/main.py | 9 +++++-- ethpm_cli/scraper.py | 53 ++++++++++++++++++++++++++------------ tests/core/test_scraper.py | 22 +++++++++++++++- 3 files changed, 64 insertions(+), 20 deletions(-) diff --git a/ethpm_cli/main.py b/ethpm_cli/main.py index b7e7499..e7822fb 100644 --- a/ethpm_cli/main.py +++ b/ethpm_cli/main.py @@ -3,7 +3,7 @@ from pathlib import Path import sys -from eth_utils import to_hex +from eth_utils import humanize_hash import pkg_resources from web3 import Web3 from web3.middleware import local_filter_middleware @@ -44,7 +44,12 @@ def scraper(args: argparse.Namespace) -> None: logger.info( "All blocks scraped up to # %d: %s.", last_scraped_block, - to_hex(last_scraped_block_hash), + humanize_hash(last_scraped_block_hash), + ) + logger.debug( + "All blocks scraped up to # %d: %s.", + last_scraped_block, + last_scraped_block_hash, ) diff --git a/ethpm_cli/scraper.py b/ethpm_cli/scraper.py index 3cf30a5..d30bd8e 100644 --- a/ethpm_cli/scraper.py +++ b/ethpm_cli/scraper.py @@ -3,6 +3,8 @@ import json import logging from pathlib import Path +import shutil +import tempfile from typing import Any, Dict, Iterable, List, Set, Tuple # noqa: F401 from eth_utils import to_dict, to_list @@ -23,7 +25,7 @@ # https://github.com/ethereum/EIPs/commit/123b7267b6270914a822001c119d11607e695517 VERSION_RELEASE_TIMESTAMP = 1_552_564_800 # March 14, 2019 -BLOCK_INTERVAL = 5000 +BATCH_SIZE = 5000 def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int: @@ -44,16 +46,18 @@ def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int: ) if start_block == 0: - active_block = get_ethpm_birth_block(w3, 0, latest_block) + active_block = get_ethpm_birth_block( + w3, 0, latest_block, VERSION_RELEASE_TIMESTAMP + ) else: active_block = start_block logger.info("Scraping from block %d.", active_block) - for from_block in range(active_block, latest_block, BLOCK_INTERVAL): - if (from_block + BLOCK_INTERVAL) > latest_block: + for from_block in range(active_block, latest_block, BATCH_SIZE): + if (from_block + BATCH_SIZE) > latest_block: to_block = latest_block else: - to_block = from_block + BLOCK_INTERVAL + to_block = from_block + BATCH_SIZE if block_range_needs_scraping(from_block, to_block, chain_data_path): scraped_manifests = scrape_block_range_for_manifests( @@ -67,24 +71,30 @@ def scrape(w3: Web3, ethpm_dir: Path, start_block: int = 0) -> int: return latest_block -def get_ethpm_birth_block(w3: Web3, from_block: int, to_block: int) -> int: +def get_ethpm_birth_block( + w3: Web3, from_block: int, to_block: int, target_timestamp: int +) -> int: """ - Returns the closest block found before the EthPM VersionRelease event birthday. + Returns the closest block found before the target_timestamp """ - version_release_date = datetime.fromtimestamp(VERSION_RELEASE_TIMESTAMP) + version_release_date = datetime.fromtimestamp(target_timestamp) from_date = datetime.fromtimestamp(w3.eth.getBlock(from_block)["timestamp"]) delta = version_release_date - from_date - if delta.days == 0 and from_date < version_release_date: - return from_block + if delta.days <= 0 and from_date < version_release_date: + while ( + w3.eth.getBlock(from_block)["timestamp"] < version_release_date.timestamp() + ): + from_block += 1 + return from_block - 1 elif from_date < version_release_date: updated_block = int((from_block + to_block) / 2) - return get_ethpm_birth_block(w3, updated_block, to_block) + return get_ethpm_birth_block(w3, updated_block, to_block, target_timestamp) else: updated_block = from_block - int(to_block - from_block) - return get_ethpm_birth_block(w3, updated_block, from_block) + return get_ethpm_birth_block(w3, updated_block, from_block, target_timestamp) def block_range_needs_scraping( @@ -108,7 +118,7 @@ def initialize_ethpm_dir(ethpm_dir: Path, w3: Web3) -> None: "chain_id": w3.eth.chainId, "scraped_blocks": [{"min": "0", "max": "0"}], } - chain_data_path.write_text(f"{json.dumps(init_json, indent=4)}\n") + write_updated_chain_data(chain_data_path, init_json) def update_chain_data( @@ -123,8 +133,17 @@ def update_chain_data( new_scraped_blocks = list(range(from_block, to_block)) updated_blocks = blocks_to_ranges(set(old_scraped_blocks + new_scraped_blocks)) - chain_data_updated_blocks = assoc(chain_data, "scraped_blocks", updated_blocks) - chain_data_path.write_text(f"{json.dumps(chain_data_updated_blocks, indent=4)}\n") + chain_data_with_updated_blocks = assoc(chain_data, "scraped_blocks", updated_blocks) + write_updated_chain_data(chain_data_path, chain_data_with_updated_blocks) + + +def write_updated_chain_data( + chain_data_path: Path, updated_data: Dict[str, Any] +) -> None: + tmp_pkg_dir = Path(tempfile.mkdtemp()) + tmp_data = tmp_pkg_dir / "chain_data.json" + tmp_data.write_text(f"{json.dumps(updated_data, indent=4)}\n") + shutil.copyfile(tmp_data, chain_data_path) @to_list @@ -135,8 +154,8 @@ def blocks_to_ranges(blocks_list: List[int]) -> Iterable[Dict[str, str]]: -> [{"min": "0", "max": "2"}, {"min": "4", "max": "4"}, {"min": "6", "max": "9"}] """ for a, b in itertools.groupby(enumerate(blocks_list), lambda x: x[0] - x[1]): - b = list(b) # type: ignore - yield {"min": str(b[0][1]), "max": str(b[-1][1])} # type: ignore + c = list(b) + yield {"min": str(c[0][1]), "max": str(c[-1][1])} def get_scraped_blocks(chain_data_path: Path) -> List[int]: diff --git a/tests/core/test_scraper.py b/tests/core/test_scraper.py index bd13d29..14e9d83 100644 --- a/tests/core/test_scraper.py +++ b/tests/core/test_scraper.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta import json import os from pathlib import Path @@ -9,7 +10,7 @@ from ethpm_cli import CLI_ASSETS_DIR from ethpm_cli._utils.testing import check_dir_trees_equal -from ethpm_cli.scraper import scrape +from ethpm_cli.scraper import get_ethpm_birth_block, scrape @pytest.fixture @@ -156,3 +157,22 @@ def test_scraper_imports_existing_ethpmcli_dir(log, log_2, test_assets_dir, w3): # Second scrape scrape(w3, ethpmcli_dir, 1) assert check_dir_trees_equal(ethpmcli_dir, (test_assets_dir.parent / "ipfs")) + + +@pytest.mark.parametrize("interval", (40, 400, 4000)) +def test_get_ethpm_birth_block(w3, interval): + time_travel(w3, interval) + latest_block = w3.eth.getBlock("latest") + time_travel(w3, interval) + actual = get_ethpm_birth_block(w3, 0, w3.eth.blockNumber, latest_block.timestamp) + assert actual == latest_block.number - 1 + + +def time_travel(w3, hours): + for hour in range(1, hours): + current_time = w3.eth.getBlock("latest").timestamp + dest_time = int( + (datetime.fromtimestamp(current_time) + timedelta(hours=1)).strftime("%s") + ) + w3.provider.ethereum_tester.time_travel(dest_time) + w3.testing.mine(1)