From 97f542dfe3cd2aaf64d4b681f8b43839d6529eb9 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Wed, 4 Sep 2024 08:50:25 -0400 Subject: [PATCH 1/2] Add benchmarks for binning operations. --- .copier-answers.yml | 4 +- .github/ISSUE_TEMPLATE/1-bug_report.md | 4 +- .github/workflows/asv-main.yml | 101 ++++++++++++++++++++ .github/workflows/asv-nightly.yml | 93 ++++++++++++++++++ .github/workflows/asv-pr.yml | 86 +++++++++++++++++ .github/workflows/publish-benchmarks-pr.yml | 53 ++++++++++ .pre-commit-config.yaml | 2 +- benchmarks/__init__.py | 0 benchmarks/asv.conf.json | 80 ++++++++++++++++ benchmarks/benchmarks.py | 39 ++++++++ pyproject.toml | 47 +++++++++ src/hipscat_import/catalog/resume_plan.py | 20 +++- 12 files changed, 519 insertions(+), 10 deletions(-) create mode 100644 .github/workflows/asv-main.yml create mode 100644 .github/workflows/asv-nightly.yml create mode 100644 .github/workflows/asv-pr.yml create mode 100644 .github/workflows/publish-benchmarks-pr.yml create mode 100644 benchmarks/__init__.py create mode 100644 benchmarks/asv.conf.json create mode 100644 benchmarks/benchmarks.py diff --git a/.copier-answers.yml b/.copier-answers.yml index 54c2b6b2..44cede77 100644 --- a/.copier-answers.yml +++ b/.copier-answers.yml @@ -1,5 +1,5 @@ # Changes here will be overwritten by Copier -_commit: v2.0.1 +_commit: v2.0.2 _src_path: gh:lincc-frameworks/python-project-template author_email: lincc-frameworks-team@lists.lsst.org author_name: LINCC Frameworks @@ -11,7 +11,7 @@ enforce_style: - isort failure_notification: - slack -include_benchmarks: false +include_benchmarks: true include_docs: true include_notebooks: true mypy_type_checking: basic diff --git a/.github/ISSUE_TEMPLATE/1-bug_report.md b/.github/ISSUE_TEMPLATE/1-bug_report.md index 220a63d6..16b6b711 100644 --- a/.github/ISSUE_TEMPLATE/1-bug_report.md +++ b/.github/ISSUE_TEMPLATE/1-bug_report.md @@ -13,5 +13,5 @@ assignees: '' Please check the following: - [ ] I have described the situation in 
which the bug arose, including what code was executed, information about my environment, and any applicable data others will need to reproduce the problem. -- [ ] I have included available evidence of the unexpected behavior (including error messages, screenshots, and/or plots) as well as a descriprion of what I expected instead. -- [ ] If I have a solution in mind, I have provided an explanation and/or pseudocode and/or task list. \ No newline at end of file +- [ ] I have included available evidence of the unexpected behavior (including error messages, screenshots, and/or plots) as well as a description of what I expected instead. +- [ ] If I have a solution in mind, I have provided an explanation and/or pseudocode and/or task list. diff --git a/.github/workflows/asv-main.yml b/.github/workflows/asv-main.yml new file mode 100644 index 00000000..f6a6f297 --- /dev/null +++ b/.github/workflows/asv-main.yml @@ -0,0 +1,101 @@ +# This workflow will run benchmarks with airspeed velocity (asv), +# store the new results in the "benchmarks" branch and publish them +# to a dashboard on GH Pages. 
+ +name: Run ASV benchmarks for main + +on: + push: + branches: [ main ] + +env: + PYTHON_VERSION: "3.10" + WORKING_DIR: ${{ github.workspace }}/benchmarks + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + + setup-python: + runs-on: ubuntu-latest + + steps: + - name: Cache Python ${{ env.PYTHON_VERSION }} + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: python-${{ env.PYTHON_VERSION }} + + - name: Set up Python ${{ env.PYTHON_VERSION }} + uses: actions/setup-python@v5 + with: + python-version: "${{ env.PYTHON_VERSION }}" + + asv-main: + runs-on: ubuntu-latest + needs: setup-python + + permissions: + contents: write + + defaults: + run: + working-directory: ${{ env.WORKING_DIR }} + + steps: + - name: Checkout main branch of the repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Cache Python ${{ env.PYTHON_VERSION }} + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: python-${{ env.PYTHON_VERSION }} + + - name: Install dependencies + run: | + sudo apt-get update + python -m pip install --upgrade pip + pip install asv==0.6.1 virtualenv tabulate + + - name: Configure git + run: | + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + + - name: Create ASV machine config file + run: asv machine --machine gh-runner --yes + + - name: Fetch previous results from the "benchmarks" branch + run: | + if git ls-remote --exit-code origin benchmarks > /dev/null 2>&1; then + git merge origin/benchmarks \ + --allow-unrelated-histories \ + --no-commit + mv ../_results . 
+ fi + + - name: Run ASV for the main branch + run: asv run ALL --skip-existing --verbose || true + + - name: Submit new results to the "benchmarks" branch + uses: JamesIves/github-pages-deploy-action@v4 + with: + branch: benchmarks + folder: ${{ env.WORKING_DIR }}/_results + target-folder: _results + + - name: Generate dashboard HTML + run: | + asv show + asv publish + + - name: Deploy to Github pages + uses: JamesIves/github-pages-deploy-action@v4 + with: + branch: gh-pages + folder: ${{ env.WORKING_DIR }}/_html \ No newline at end of file diff --git a/.github/workflows/asv-nightly.yml b/.github/workflows/asv-nightly.yml new file mode 100644 index 00000000..80a2d785 --- /dev/null +++ b/.github/workflows/asv-nightly.yml @@ -0,0 +1,93 @@ +# This workflow will run daily at 06:45. +# It will run benchmarks with airspeed velocity (asv) +# and compare performance with the previous nightly build. + +name: Run benchmarks nightly job + +on: + schedule: + - cron: 45 6 * * * + workflow_dispatch: + +env: + PYTHON_VERSION: "3.10" + WORKING_DIR: ${{ github.workspace }}/benchmarks + NIGHTLY_HASH_FILE: nightly-hash + +jobs: + + asv-nightly: + runs-on: ubuntu-latest + + defaults: + run: + working-directory: ${{ env.WORKING_DIR }} + + steps: + - name: Checkout main branch of the repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Cache Python ${{ env.PYTHON_VERSION }} + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: python-${{ env.PYTHON_VERSION }} + + - name: Set up Python ${{ env.PYTHON_VERSION }} + uses: actions/setup-python@v5 + with: + python-version: "${{ env.PYTHON_VERSION }}" + + - name: Install dependencies + run: | + sudo apt-get update + python -m pip install --upgrade pip + pip install asv==0.6.1 virtualenv + + - name: Configure git + run: | + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + + - name: Create ASV machine config file + run: asv machine 
--machine gh-runner --yes + + - name: Fetch previous results from the "benchmarks" branch + run: | + if git ls-remote --exit-code origin benchmarks > /dev/null 2>&1; then + git merge origin/benchmarks \ + --allow-unrelated-histories \ + --no-commit + mv ../_results . + fi + + - name: Get nightly dates under comparison + id: nightly-dates + run: | + echo "yesterday=$(date -d yesterday +'%Y-%m-%d')" >> $GITHUB_OUTPUT + echo "today=$(date +'%Y-%m-%d')" >> $GITHUB_OUTPUT + + - name: Use last nightly commit hash from cache + uses: actions/cache@v4 + with: + path: ${{ env.WORKING_DIR }} + key: nightly-results-${{ steps.nightly-dates.outputs.yesterday }} + + - name: Run comparison of main against last nightly build + run: | + HASH_FILE=${{ env.NIGHTLY_HASH_FILE }} + CURRENT_HASH=${{ github.sha }} + if [ -f $HASH_FILE ]; then + PREV_HASH=$(cat $HASH_FILE) + asv continuous $PREV_HASH $CURRENT_HASH --verbose || true + asv compare $PREV_HASH $CURRENT_HASH --sort ratio --verbose + fi + echo $CURRENT_HASH > $HASH_FILE + + - name: Update last nightly hash in cache + uses: actions/cache@v4 + with: + path: ${{ env.WORKING_DIR }} + key: nightly-results-${{ steps.nightly-dates.outputs.today }} \ No newline at end of file diff --git a/.github/workflows/asv-pr.yml b/.github/workflows/asv-pr.yml new file mode 100644 index 00000000..bf5aed6d --- /dev/null +++ b/.github/workflows/asv-pr.yml @@ -0,0 +1,86 @@ +# This workflow will run benchmarks with airspeed velocity (asv) for pull requests. +# It will compare the performance of the main branch with the performance of the merge +# with the new changes. It then publishes a comment with this assessment by triggering +# the publish-benchmarks-pr workflow. +# Based on https://securitylab.github.com/research/github-actions-preventing-pwn-requests/. 
+name: Run benchmarks for PR + +on: + pull_request: + branches: [ main ] + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + PYTHON_VERSION: "3.10" + WORKING_DIR: ${{ github.workspace }}/benchmarks + ARTIFACTS_DIR: ${{ github.workspace }}/artifacts + +jobs: + setup-python: + runs-on: ubuntu-latest + steps: + - name: Cache Python ${{ env.PYTHON_VERSION }} + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: python-${{ env.PYTHON_VERSION }} + - name: Set up Python ${{ env.PYTHON_VERSION }} + uses: actions/setup-python@v5 + with: + python-version: ${{ env.PYTHON_VERSION }} + asv-pr: + runs-on: ubuntu-latest + needs: setup-python + defaults: + run: + working-directory: ${{ env.WORKING_DIR }} + steps: + - name: Checkout PR branch of the repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Display Workflow Run Information + run: | + echo "Workflow Run ID: ${{ github.run_id }}" + - name: Cache Python ${{ env.PYTHON_VERSION }} + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: python-${{ env.PYTHON_VERSION }} + - name: Install dependencies + run: | + sudo apt-get update + python -m pip install --upgrade pip + pip install asv==0.6.1 virtualenv tabulate lf-asv-formatter + - name: Make artifacts directory + run: mkdir -p ${{ env.ARTIFACTS_DIR }} + - name: Save pull request number + run: echo ${{ github.event.pull_request.number }} > ${{ env.ARTIFACTS_DIR }}/pr + - name: Get current job logs URL + uses: Tiryoh/gha-jobid-action@v1 + id: jobs + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + job_name: ${{ github.job }} + - name: Create ASV machine config file + run: asv machine --machine gh-runner --yes + - name: Save comparison of PR against main branch + run: | + git remote add upstream https://github.com/${{ github.repository }}.git + git fetch upstream + asv continuous upstream/main HEAD --verbose || true + asv compare upstream/main HEAD --sort ratio --verbose | 
tee output + python -m lf_asv_formatter --asv_version "$(echo asv --version)" + printf "\n\nClick [here]($STEP_URL) to view all benchmarks." >> output + mv output ${{ env.ARTIFACTS_DIR }} + env: + STEP_URL: "${{ steps.jobs.outputs.html_url }}#step:11:1" + - name: Upload artifacts (PR number and benchmarks output) + uses: actions/upload-artifact@v4 + with: + name: benchmark-artifacts + path: ${{ env.ARTIFACTS_DIR }} \ No newline at end of file diff --git a/.github/workflows/publish-benchmarks-pr.yml b/.github/workflows/publish-benchmarks-pr.yml new file mode 100644 index 00000000..45ed9280 --- /dev/null +++ b/.github/workflows/publish-benchmarks-pr.yml @@ -0,0 +1,53 @@ +# This workflow publishes a benchmarks comment on a pull request. It is triggered after the +# benchmarks are computed in the asv-pr workflow. This separation of concerns allows us to limit +# access to the target repository private tokens and secrets, increasing the level of security. +# Based on https://securitylab.github.com/research/github-actions-preventing-pwn-requests/.
+name: Publish benchmarks comment to PR + +on: + workflow_run: + workflows: ["Run benchmarks for PR"] + types: [completed] + +jobs: + upload-pr-comment: + runs-on: ubuntu-latest + if: > + github.event.workflow_run.event == 'pull_request' && + github.event.workflow_run.conclusion == 'success' + permissions: + issues: write + pull-requests: write + steps: + - name: Display Workflow Run Information + run: | + echo "Workflow Run ID: ${{ github.event.workflow_run.id }}" + echo "Head SHA: ${{ github.event.workflow_run.head_sha }}" + echo "Head Branch: ${{ github.event.workflow_run.head_branch }}" + echo "Conclusion: ${{ github.event.workflow_run.conclusion }}" + echo "Event: ${{ github.event.workflow_run.event }}" + - name: Download artifact + uses: dawidd6/action-download-artifact@v3 + with: + name: benchmark-artifacts + run_id: ${{ github.event.workflow_run.id }} + - name: Extract artifacts information + id: pr-info + run: | + printf "PR number: $(cat pr)\n" + printf "Output:\n$(cat output)" + printf "pr=$(cat pr)" >> $GITHUB_OUTPUT + - name: Find benchmarks comment + uses: peter-evans/find-comment@v3 + id: find-comment + with: + issue-number: ${{ steps.pr-info.outputs.pr }} + comment-author: 'github-actions[bot]' + body-includes: view all benchmarks + - name: Create or update benchmarks comment + uses: peter-evans/create-or-update-comment@v4 + with: + comment-id: ${{ steps.find-comment.outputs.comment-id }} + issue-number: ${{ steps.pr-info.outputs.pr }} + body-path: output + edit-mode: replace \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e7f7c3b2..d06c1c1a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,7 +3,7 @@ repos: # This hook should always pass. It will print a message if the local version # is out of date. 
- repo: https://github.com/lincc-frameworks/pre-commit-hooks - rev: v0.1.1 + rev: v0.1.2 hooks: - id: check-lincc-frameworks-template-version name: Check template version diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/benchmarks/asv.conf.json b/benchmarks/asv.conf.json new file mode 100644 index 00000000..fbe36f21 --- /dev/null +++ b/benchmarks/asv.conf.json @@ -0,0 +1,80 @@ +{ + // The version of the config file format. Do not change, unless + // you know what you are doing. + "version": 1, + // The name of the project being benchmarked. + "project": "hipscat-import", + // The project's homepage. + "project_url": "https://github.com/astronomy-commons/hipscat-import", + // The URL or local path of the source code repository for the + // project being benchmarked. + "repo": "..", + // List of branches to benchmark. If not provided, defaults to "master" + // (for git) or "tip" (for mercurial). + "branches": [ + "HEAD" + ], + "install_command": [ + "python -m pip install {wheel_file}" + ], + "build_command": [ + "python -m build --wheel -o {build_cache_dir} {build_dir}" + ], + // The DVCS being used. If not set, it will be automatically + // determined from "repo" by looking at the protocol in the URL + // (if remote), or by looking for special directories, such as + // ".git" (if local). + "dvcs": "git", + // The tool to use to create environments. May be "conda", + // "virtualenv" or other value depending on the plugins in use. + // If missing or the empty string, the tool will be automatically + // determined by looking for tools on the PATH environment + // variable. + "environment_type": "virtualenv", + // the base URL to show a commit for the project. + "show_commit_url": "https://github.com/astronomy-commons/hipscat-import/commit/", + // The Pythons you'd like to test against. If not provided, defaults + // to the current version of Python used to run `asv`. 
+ "pythons": [ + "3.10" + ], + // The matrix of dependencies to test. Each key is the name of a + // package (in PyPI) and the values are version numbers. An empty + // list indicates to just test against the default (latest) + // version. + "matrix": { + "Cython": [], + "build": [], + "packaging": [] + }, + // The directory (relative to the current directory) that benchmarks are + // stored in. If not provided, defaults to "benchmarks". + "benchmark_dir": ".", + // The directory (relative to the current directory) to cache the Python + // environments in. If not provided, defaults to "env". + "env_dir": "env", + // The directory (relative to the current directory) that raw benchmark + // results are stored in. If not provided, defaults to "results". + "results_dir": "_results", + // The directory (relative to the current directory) that the html tree + // should be written to. If not provided, defaults to "html". + "html_dir": "_html", + // The number of characters to retain in the commit hashes. + // "hash_length": 8, + // `asv` will cache wheels of the recent builds in each + // environment, making them faster to install next time. This is + // number of builds to keep, per environment. + "build_cache_size": 8 + // The commits after which the regression search in `asv publish` + // should start looking for regressions. Dictionary whose keys are + // regexps matching to benchmark names, and values corresponding to + // the commit (exclusive) after which to start looking for + // regressions. The default is to start from the first commit + // with results. If the commit is `null`, regression detection is + // skipped for the matching benchmark. 
+ // + // "regressions_first_commits": { + // "some_benchmark": "352cdf", // Consider regressions only after this commit + // "another_benchmark": null, // Skip regression detection altogether + // } +} \ No newline at end of file diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py new file mode 100644 index 00000000..f7e82398 --- /dev/null +++ b/benchmarks/benchmarks.py @@ -0,0 +1,39 @@ +import os +from pathlib import Path +import numpy as np +from hipscat_import.catalog.resume_plan import ResumePlan +from hipscat_import.catalog.sparse_histogram import SparseHistogram + + +class BinningSuite: + """Suite that generates sparse array histogram files and benchmarks the operations on them.""" + + def setup_cache(self): + root_dir = Path(os.getcwd()) + tmp_dir = root_dir / "intermediate" + binning_dir = tmp_dir / "histograms" + binning_dir.mkdir(parents=True, exist_ok=True) + max_value = 786_432 + + num_paths = 2_000 + for m in range(0, num_paths): + k = (m + 1) * 100 + pixels = np.arange(k, max_value, k) + counts = np.full(len(pixels), fill_value=k) + + histo = SparseHistogram.make_from_counts(pixels, counts, healpix_order=8) + + histo.to_file(binning_dir / f"map_{m}") + return (tmp_dir, num_paths) + + # def time_read_histogram(self, cache): + # input_paths = [f"foo{i}" for i in range(0, cache[1])] + # plan = ResumePlan(tmp_path=cache[0], progress_bar=False, input_paths=input_paths) + + # plan.read_histogram(8) + + def peakmem_read_histogram(self, cache): + input_paths = [f"foo{i}" for i in range(0, cache[1])] + plan = ResumePlan(tmp_path=cache[0], progress_bar=False, input_paths=input_paths) + + plan.read_histogram(8) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index e541082f..71377ad3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ # On a mac, install optional dependencies with `pip install '.[dev]'` (include the single quotes) [project.optional-dependencies] dev = [ + "asv==0.6.4", # Used 
to compute performance benchmarks "black", # Used for static linting of files "jupyter", # Clears output from Jupyter notebooks "mypy", # Used for static type checking of files @@ -79,6 +80,52 @@ target-version = ["py38"] profile = "black" line_length = 110 +[tool.ruff] +line-length = 110 +target-version = "py39" + +[tool.ruff.lint] +select = [ + # pycodestyle + "E", + "W", + # Pyflakes + "F", + # pep8-naming + "N", + # pyupgrade + "UP", + # flake8-bugbear + "B", + # flake8-simplify + "SIM", + # isort + "I", + # docstrings + "D101", + "D102", + "D103", + "D106", + "D206", + "D207", + "D208", + "D300", + "D417", + "D419", + # Numpy v2.0 compatibility + "NPY201", +] + +ignore = [ + "UP006", # Allow non standard library generics in type hints + "UP007", # Allow Union in type hints + "SIM114", # Allow if with same arms + "B028", # Allow default warning level + "SIM117", # Allow nested with + "UP015", # Allow redundant open parameters + "UP028", # Allow yield in for loop +] + [tool.coverage.run] omit = [ "src/hipscat_import/_version.py", # auto-generated diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index 0f8a6682..c0739bb1 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -11,7 +11,8 @@ from hipscat import pixel_math from hipscat.io import FilePointer, file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel - +import hipscat.pixel_math as hist +from numpy import frombuffer from hipscat_import.catalog.sparse_histogram import SparseHistogram from hipscat_import.pipeline_resume_plan import PipelineResumePlan @@ -175,18 +176,27 @@ def read_histogram(self, healpix_order): if len(remaining_map_files) > 0: raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.") histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz") - aggregate_histogram = SparseHistogram.make_empty(healpix_order) + 
aggregate_histogram = hist.empty_histogram(healpix_order) + # aggregate_histogram = SparseHistogram.make_empty(healpix_order) for partial_file_name in histogram_files: - aggregate_histogram.add(SparseHistogram.from_file(partial_file_name)) + partial = SparseHistogram.from_file(partial_file_name) + aggregate_histogram = np.add(aggregate_histogram, partial.to_array()) + + # aggregate_histogram.to_file(file_name) - aggregate_histogram.to_file(file_name) + file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE) + with open(file_name, "wb+") as file_handle: + file_handle.write(aggregate_histogram.data) if self.delete_resume_log_files: file_io.remove_directory( file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR), ignore_errors=True, ) - full_histogram = SparseHistogram.from_file(file_name).to_array() + # full_histogram = SparseHistogram.from_file(file_name).to_array() + + with open(file_name, "rb") as file_handle: + full_histogram = frombuffer(file_handle.read(), dtype=np.int64) if len(full_histogram) != hp.order2npix(healpix_order): raise ValueError( From 2cad92abfe0c2ad2b9ab78d8537d3854004148c3 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Wed, 4 Sep 2024 12:07:48 -0400 Subject: [PATCH 2/2] Fix up tests. 
--- benchmarks/benchmarks.py | 12 ++++-------- src/hipscat_import/catalog/resume_plan.py | 16 +++++++++------- src/hipscat_import/catalog/sparse_histogram.py | 5 +++++ tests/hipscat_import/catalog/test_run_import.py | 2 +- 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index f7e82398..061dc651 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -1,6 +1,8 @@ import os from pathlib import Path + import numpy as np + from hipscat_import.catalog.resume_plan import ResumePlan from hipscat_import.catalog.sparse_histogram import SparseHistogram @@ -26,14 +28,8 @@ def setup_cache(self): histo.to_file(binning_dir / f"map_{m}") return (tmp_dir, num_paths) - # def time_read_histogram(self, cache): - # input_paths = [f"foo{i}" for i in range(0, cache[1])] - # plan = ResumePlan(tmp_path=cache[0], progress_bar=False, input_paths=input_paths) - - # plan.read_histogram(8) - - def peakmem_read_histogram(self, cache): + def time_read_histogram(self, cache): input_paths = [f"foo{i}" for i in range(0, cache[1])] plan = ResumePlan(tmp_path=cache[0], progress_bar=False, input_paths=input_paths) - plan.read_histogram(8) \ No newline at end of file + plan.read_histogram(8) diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index c0739bb1..83de436f 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -6,13 +6,14 @@ from dataclasses import dataclass, field from typing import List, Optional, Tuple +import hipscat.pixel_math as hist import hipscat.pixel_math.healpix_shim as hp import numpy as np from hipscat import pixel_math from hipscat.io import FilePointer, file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel -import hipscat.pixel_math as hist from numpy import frombuffer + from hipscat_import.catalog.sparse_histogram import SparseHistogram from hipscat_import.pipeline_resume_plan import 
PipelineResumePlan @@ -177,12 +178,15 @@ def read_histogram(self, healpix_order): raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.") histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz") aggregate_histogram = hist.empty_histogram(healpix_order) - # aggregate_histogram = SparseHistogram.make_empty(healpix_order) for partial_file_name in histogram_files: partial = SparseHistogram.from_file(partial_file_name) - aggregate_histogram = np.add(aggregate_histogram, partial.to_array()) - - # aggregate_histogram.to_file(file_name) + partial_as_array = partial.to_array() + if aggregate_histogram.shape != partial_as_array.shape: + raise ValueError( + "The histogram partials have incompatible sizes due to different healpix orders. " + + "To start the pipeline from scratch with the current order set `resume` to False." + ) + aggregate_histogram = np.add(aggregate_histogram, partial_as_array) file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE) with open(file_name, "wb+") as file_handle: @@ -193,8 +197,6 @@ def read_histogram(self, healpix_order): ignore_errors=True, ) - # full_histogram = SparseHistogram.from_file(file_name).to_array() - with open(file_name, "rb") as file_handle: full_histogram = frombuffer(file_handle.read(), dtype=np.int64) diff --git a/src/hipscat_import/catalog/sparse_histogram.py b/src/hipscat_import/catalog/sparse_histogram.py index ff2a3ebc..ac1549ae 100644 --- a/src/hipscat_import/catalog/sparse_histogram.py +++ b/src/hipscat_import/catalog/sparse_histogram.py @@ -46,6 +46,11 @@ def to_file(self, file_name): """ save_npz(file_name, self.sparse_array) + def to_dense_file(self, file_name): + """Persist the DENSE array to disk as a numpy array.""" + with open(file_name, "wb+") as file_handle: + file_handle.write(self.to_array().data) + @classmethod def make_empty(cls, healpix_order=10): """Create an empty sparse array for a given healpix 
order. diff --git a/tests/hipscat_import/catalog/test_run_import.py b/tests/hipscat_import/catalog/test_run_import.py index adb5ab8a..96612615 100644 --- a/tests/hipscat_import/catalog/test_run_import.py +++ b/tests/hipscat_import/catalog/test_run_import.py @@ -139,7 +139,7 @@ def test_resume_dask_runner_diff_pixel_order( ## Now set up our resume files to match previous work. resume_tmp = tmp_path / "tmp" / "resume_catalog" ResumePlan(tmp_path=resume_tmp, progress_bar=False) - SparseHistogram.make_from_counts([11], [131], 0).to_file(resume_tmp / "mapping_histogram.npz") + SparseHistogram.make_from_counts([11], [131], 0).to_dense_file(resume_tmp / "mapping_histogram.npz") for file_index in range(0, 5): ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")