Copy files for benchmarks #5

Closed
wants to merge 1 commit
6 changes: 6 additions & 0 deletions .github/workflows/asv-nightly.yml
@@ -63,6 +63,12 @@ jobs:
mv ../_results .
fi

- name: Download catalog data
run: python ${{ env.WORKING_DIR }}/copy_benchmark_data.py
env:
ABFS_LINCCDATA_ACCOUNT_NAME: ${{ secrets.LINCC_ABFS_ACCOUNT_NAME }}
ABFS_LINCCDATA_ACCOUNT_KEY: ${{ secrets.LINCC_ABFS_ACCOUNT_KEY }}

- name: Get nightly dates under comparison
id: nightly-dates
run: |
10 changes: 9 additions & 1 deletion README.md
@@ -44,6 +44,14 @@ Then to run the tests:
pytest --cloud abfs
```

Alternatively, you can add the environment variables to your conda environment and reactivate it:

```
(condaenv)$ conda env config vars set ABFS_LINCCDATA_ACCOUNT_NAME=lincc_account_name
(condaenv)$ conda env config vars set ABFS_LINCCDATA_ACCOUNT_KEY=lincc_account_key
(condaenv)$ conda activate condaenv
```

### How are we connecting to the cloud resources?

We have abstracted our entire i/o infrastructure to be read through the python
@@ -94,7 +102,7 @@ def example_cloud_storage_options(cloud):

3. Finally, you will need to copy several `/tests/data/` directories into your newly
created bucket. This can be accomplished by running the `copy_data_to_fs.py` script.
4. Before running the tests, you will need to export your `valid_storage_option_param` into the environment.
4. Before running the tests, you will need to export your `valid_storage_option_param`s into the environment.
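For the ABFS account used in these tests, that export could look something like the following (the variable names match the placeholders shown earlier; substitute your own credentials):

```
export ABFS_LINCCDATA_ACCOUNT_NAME=lincc_account_name
export ABFS_LINCCDATA_ACCOUNT_KEY=lincc_account_key
```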


## Adding tests to the github workflows
13 changes: 13 additions & 0 deletions benchmarks/copy_benchmark_data.py
@@ -0,0 +1,13 @@
import os

from hipscat_cloudtests.copy_cloud_directory import copy_tree_from_cloud

if __name__ == "__main__":
cloud_dir = "abfs://hipscat/benchmarks/"
local_dir = os.path.join(os.path.dirname(__file__), "_data")

storage_options = {
"account_name": os.environ.get("ABFS_LINCCDATA_ACCOUNT_NAME"),
"account_key": os.environ.get("ABFS_LINCCDATA_ACCOUNT_KEY"),
}
copy_tree_from_cloud(cloud_dir, local_dir, storage_options=storage_options, verbose=True)
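A quick usage sketch: with the same ABFS environment variables exported as described in the README, the script can be run directly and mirrors `abfs://hipscat/benchmarks/` into a local `_data` directory next to it:

```
python benchmarks/copy_benchmark_data.py
```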
80 changes: 3 additions & 77 deletions copy_data_to_fs.py
@@ -1,87 +1,13 @@
import os

from hipscat.io.file_io.file_io import get_fs


def copy_tree_fs_to_fs(
fs1_source: str,
fs2_destination: str,
storage_options1: dict = None,
storage_options2: dict = None,
verbose=False,
):
"""Recursive Copies directory from one filesystem to the other.

Args:
fs1_source: location of source directory to copy
fs2_destination: location of destination directory to for fs1 to be written two
storage_options1: dictionary that contains abstract filesystem1 credentials
storage_options2: dictionary that contains abstract filesystem2 credentials
"""

source_fs, source_fp = get_fs(fs1_source, storage_options=storage_options1)
destination_fs, desintation_fp = get_fs(fs2_destination, storage_options=storage_options2)
copy_dir(source_fs, source_fp, destination_fs, desintation_fp, verbose=verbose)


def copy_dir(
source_fs,
source_fp,
destination_fs,
desintation_fp,
verbose=False,
chunksize=1024 * 1024,
):
"""Recursive method to copy directories and their contents.

Args:
fs1: fsspec.filesystem for the source directory contents
fs1_pointer: source directory to copy content files
fs2: fsspec.filesytem for destination directory
fs2_pointer: destination directory for copied contents
"""
destination_folder = os.path.join(desintation_fp, source_fp.split("/")[-1])
if destination_folder[-1] != "/":
destination_folder += "/"
if not destination_fs.exists(destination_folder):
if verbose:
print(f"Creating destination folder: {destination_folder}")
destination_fs.makedirs(destination_folder, exist_ok=True)

dir_contents = source_fs.listdir(source_fp)
files = [x for x in source_fs.listdir(source_fp) if x["type"] == "file"]

for _file in files:
destination_fname = os.path.join(destination_folder, _file["name"].split("/")[-1])
if verbose:
print(f'Copying file {_file["name"]} to {destination_fname}')
with source_fs.open(_file["name"], "rb") as source_file:
with destination_fs.open(destination_fname, "wb") as destination_file:
while True:
chunk = source_file.read(chunksize)
if not chunk:
break
destination_file.write(chunk)

dirs = [x for x in dir_contents if x["type"] == "directory"]
for _dir in dirs:
copy_dir(
source_fs,
_dir["name"],
destination_fs,
destination_folder,
chunksize=chunksize,
verbose=verbose,
)

from hipscat_cloudtests.copy_cloud_directory import copy_tree_fs_to_fs

if __name__ == "__main__":

source_pw = f"{os.getcwd()}/../tests/data"
target_pw = "abfs://hipscat/pytests/lsdb"

target_so = {
"account_name": "linccdata",
"account_key": "ezBADSIGArKcI0JNHFdRfLF5S/64ZJcdrbXKbK5GJikF+YAC0hDAhMputN59HA4RS4N3HmjNZgdc+AStBFuQ6Q==",
"account_name": os.environ.get("ABFS_LINCCDATA_ACCOUNT_NAME"),
"account_key": os.environ.get("ABFS_LINCCDATA_ACCOUNT_KEY"),
}
copy_tree_fs_to_fs(source_pw, target_pw, {}, target_so, verbose=True)
88 changes: 88 additions & 0 deletions src/hipscat_cloudtests/copy_cloud_directory.py
@@ -0,0 +1,88 @@
import os

from hipscat.io.file_io.file_io import get_fs


def copy_tree_from_cloud(
cloud_path: str,
local_path: str,
storage_options: dict = None,
verbose=False,
):
"""Copy a directory from a cloud location to your local storage."""

source_fs, source_fp = get_fs(cloud_path, storage_options=storage_options)
destination_fs, desintation_fp = get_fs(local_path)
copy_dir(source_fs, source_fp, destination_fs, desintation_fp, verbose=verbose)


def copy_tree_fs_to_fs(
fs1_source: str,
fs2_destination: str,
storage_options1: dict = None,
storage_options2: dict = None,
verbose=False,
):
"""Recursive Copies directory from one filesystem to the other.

Args:
fs1_source: location of source directory to copy
fs2_destination: location of destination directory to for fs1 to be written two
storage_options1: dictionary that contains abstract filesystem1 credentials
storage_options2: dictionary that contains abstract filesystem2 credentials
"""

source_fs, source_fp = get_fs(fs1_source, storage_options=storage_options1)
destination_fs, destination_fp = get_fs(fs2_destination, storage_options=storage_options2)
copy_dir(source_fs, source_fp, destination_fs, destination_fp, verbose=verbose)


def copy_dir(
source_fs,
source_fp,
destination_fs,
destination_fp,
verbose=False,
chunksize=1024 * 1024,
):
"""Recursive method to copy directories and their contents.

Args:
fs1: fsspec.filesystem for the source directory contents
fs1_pointer: source directory to copy content files
fs2: fsspec.filesytem for destination directory
fs2_pointer: destination directory for copied contents
"""
destination_folder = os.path.join(destination_fp, source_fp.split("/")[-1])
if destination_folder[-1] != "/":
destination_folder += "/"
if not destination_fs.exists(destination_folder):
if verbose:
print(f"Creating destination folder: {destination_folder}")
destination_fs.makedirs(destination_folder, exist_ok=True)

dir_contents = source_fs.listdir(source_fp)
files = [x for x in dir_contents if x["type"] == "file"]

for _file in files:
destination_fname = os.path.join(destination_folder, _file["name"].split("/")[-1])
if verbose:
print(f'Copying file {_file["name"]} to {destination_fname}')
with source_fs.open(_file["name"], "rb") as source_file:
with destination_fs.open(destination_fname, "wb") as destination_file:
while True:
chunk = source_file.read(chunksize)
if not chunk:
break
destination_file.write(chunk)

dirs = [x for x in dir_contents if x["type"] == "directory"]
for _dir in dirs:
copy_dir(
source_fs,
_dir["name"],
destination_fs,
destination_folder,
chunksize=chunksize,
verbose=verbose,
)