From 7cf606e187b3ff2bac3cda7547c190caa4dd4d92 Mon Sep 17 00:00:00 2001 From: Your GitHub Username Date: Tue, 3 Sep 2024 08:40:06 +0100 Subject: [PATCH 01/34] lakefs python code for branch creation, committing and merging --- src/lake_fs.py | 103 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 src/lake_fs.py diff --git a/src/lake_fs.py b/src/lake_fs.py new file mode 100644 index 0000000..0760b4b --- /dev/null +++ b/src/lake_fs.py @@ -0,0 +1,103 @@ +import os +import glob +import config +from lakefs.client import Client +import lakefs +import subprocess +from datetime import datetime + +class LakeFSManager: + def __init__(self, host, username, password, repo_name): + self.client = Client(host=host, username=username, password=password) + self.repo_name = repo_name + self.repo = lakefs.Repository(repo_name, client=self.client) + + def execute_command(self, command): + try: + result = subprocess.run(command, check=True, capture_output=True, text=True) + print("Output:", result.stdout) + return result + except subprocess.CalledProcessError as e: + print("An error occurred while executing the command.") + print("Error message:", e.stderr) + return e + + def create_branch(self): + # Generate a unique branch name based on the current date and time + print("Creating branch for ingestion...") + branch_name = datetime.now().strftime("branch-%Y%m%d-%H%M%S") + branch = self.repo.branch(branch_name).create(source_reference="main") + return branch + + def upload_files_to_branch(self, branch_name, local_folder="data/local"): + print("Uploading files from data directory to branch...") + command = [ + "lakectl", "fs", "upload", + f"lakefs://{self.repo_name}/{branch_name}/", + "-s", local_folder, + "-r" + ] + self.execute_command(command) + + def commit_branch(self, branch_name, message): + print("Committing changes to branch...") + command = [ + "lakectl", "commit", + f"lakefs://{self.repo_name}/{branch_name}/", + "-m", message + ] + result = self.execute_command(command) + + if result.returncode != 0: + print(f"Commit failed for branch '{branch_name}'. Deleting branch and exiting.") + self.delete_branch(branch_name) + exit() + + def show_diff(self, branch): + main_branch = self.repo.branch("main") + changes = list(main_branch.diff(other_ref=branch)) + print(f"Number of changes made to main: {len(changes)}") + + def merge_branch(self, branch): + print("Merging branch to main...") + main_branch = self.repo.branch("main") + res = branch.merge_into(main_branch) + return res + + def delete_branch(self, branch_name): + print("Deleting branch.") + command = [ + "lakectl", "branch", "delete", + f"lakefs://{self.repo_name}/{branch_name}", + "--yes" + ] + self.execute_command(command) + +if __name__ == "__main__": + manager = LakeFSManager( + host="http://127.0.0.1:8000/", + username=config.username, + password=config.password, + repo_name="example-repo" + ) + + # Create a new branch with a unique name + branch = manager.create_branch() + + # Upload a local file to the new branch + manager.upload_files_to_branch( + branch_name=branch.id, + local_folder="data" + ) + + # Commit the uploaded files to the branch + manager.commit_branch( + branch_name=branch.id, + message="Uploaded new files." + ) + + # Show differences between the main branch and the new branch + manager.show_diff(branch) + + # Merge the new branch into the main branch + manager.merge_branch(branch) \ No newline at end of file From e0b0fe4aa360e8bcc6f9f85d338c98951ed50fd0 Mon Sep 17 00:00:00 2001 From: Your GitHub Username Date: Tue, 3 Sep 2024 08:40:27 +0100 Subject: [PATCH 02/34] initial test of lakefs workflow --- src/workflow.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/workflow.py b/src/workflow.py index 1d7e8bb..79be20f 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -77,6 +77,40 @@ def __call__(self, shot: int): url = self.upload_config.url + f"{shot}.{self.file_format}" if self.force or not self.fs.exists(url): create() + + # Do LakeFS stuff here, after the files have been created. This try block already checks whether the + # file already exists on S3, and either skips or forces it. My LakeFS code skips if there is no diff + # so should catch everything. + from lake_fs import LakeFSManager + import config + manager = LakeFSManager( + host="http://127.0.0.1:8000/", + username=config.username, + password=config.password, + repo_name="example-repo" + ) + + # Create a new branch with a unique name + branch = manager.create_branch() + + # Upload a local file to the new branch + manager.upload_files_to_branch( + branch_name=branch.id, + local_folder="data" + ) + + # Commit the uploaded files to the branch + manager.commit_branch( + branch_name=branch.id, + message="Uploaded new files." + ) + + # Show differences between the main branch and the new branch + manager.show_diff(branch) + + # Merge the new branch into the main branch + manager.merge_branch(branch) + upload() else: logging.info(f"Skipping shot {shot} as it already exists") From eb69a9d7b8bd05ab4af3dc3c1e6163ea93a54efc Mon Sep 17 00:00:00 2001 From: James Hodson Date: Thu, 5 Sep 2024 11:19:00 +0100 Subject: [PATCH 03/34] added crude versioning to local ingestion --- src/workflow.py | 38 ++++++-------------------------------- 1 file changed, 6 insertions(+), 32 deletions(-) diff --git a/src/workflow.py b/src/workflow.py index 79be20f..c4e1396 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -81,42 +81,12 @@ def __call__(self, shot: int): # Do LakeFS stuff here, after the files have been created. This try block already checks whether the # file already exists on S3, and either skips or forces it. My LakeFS code skips if there is no diff # so should catch everything. - from lake_fs import LakeFSManager - import config - manager = LakeFSManager( - host="http://127.0.0.1:8000/", - username=config.username, - password=config.password, - repo_name="example-repo" - ) - - # Create a new branch with a unique name - branch = manager.create_branch() - - # Upload a local file to the new branch - manager.upload_files_to_branch( - branch_name=branch.id, - local_folder="data" - ) - - # Commit the uploaded files to the branch - manager.commit_branch( - branch_name=branch.id, - message="Uploaded new files." - ) - - # Show differences between the main branch and the new branch - manager.show_diff(branch) - - # Merge the new branch into the main branch - manager.merge_branch(branch) - upload() + else: logging.info(f"Skipping shot {shot} as it already exists") except Exception as e: logging.error(f"Failed to run workflow with error {type(e)}: {e}") - cleanup() class LocalIngestionWorkflow: @@ -154,6 +124,10 @@ def __call__(self, shot: int): try: create() + from src.task import VersionDataTask + repo_name = "example-repo" + version = VersionDataTask("data/local", repo_name) + version() except Exception as e: import traceback trace = traceback.format_exc() @@ -168,7 +142,7 @@ class WorkflowManager: def __init__(self, workflow): self.workflow = workflow - def run_workflows(self, shot_list: list[int], parallel=True): + def run_workflows(self, shot_list: list[int], parallel=False): if parallel: self._run_workflows_parallel(shot_list) else: From 06889bf4c8e12b6509634f0e040328a17ee16b2e Mon Sep 17 00:00:00 2001 From: James Hodson Date: Thu, 5 Sep 2024 11:20:22 +0100 Subject: [PATCH 04/34] data versioning task added --- src/task.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/task.py b/src/task.py index e9bbe6e..27f7c68 100644 --- a/src/task.py +++ b/src/task.py @@ -12,9 +12,38 @@ from src.reader import DatasetReader, SignalMetadataReader, SourceMetadataReader from src.writer import DatasetWriter from src.uploader import UploadConfig +from src.lake_fs import LakeFSManager logging.basicConfig(level=logging.INFO) +class VersionDataTask: + def __init__(self, local_path, repo_name): + self.local_path = local_path + self.lakefs_manager = LakeFSManager(repo_name) + self.branch = None + + def __call__(self): + # 1. Create a new branch + self.branch = self.lakefs_manager.create_branch() + + # 2. Upload the files from the local path to the newly created branch + self.lakefs_manager.upload_files_to_branch( + branch_name=self.branch.id, + local_folder=self.local_path + ) + + # 3. Commit the changes to the branch + self.lakefs_manager.commit_branch( + branch_name=self.branch.id, + message="Uploaded new files." + ) + + # 4. Show differences between the main branch and the new branch + self.lakefs_manager.show_diff(self.branch) + + # 5. Merge the new branch into the main branch + self.lakefs_manager.merge_branch(self.branch) + class CleanupDatasetTask: From 1ae88df572250a3251dde7d71ffcc6cf5413586c Mon Sep 17 00:00:00 2001 From: James Hodson Date: Thu, 5 Sep 2024 11:20:53 +0100 Subject: [PATCH 05/34] removed config requirements, reads from lakectl config now --- src/lake_fs.py | 39 ++++----------------------------------- 1 file changed, 4 insertions(+), 35 deletions(-) diff --git a/src/lake_fs.py b/src/lake_fs.py index 0760b4b..afd2c21 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -1,14 +1,13 @@ import os import glob -import config from lakefs.client import Client import lakefs import subprocess from datetime import datetime class LakeFSManager: - def __init__(self, host, username, password, repo_name): - self.client = Client(host=host, username=username, password=password) + def __init__(self, repo_name): + self.client = Client() self.repo_name = repo_name self.repo = lakefs.Repository(repo_name, client=self.client) @@ -23,13 +22,12 @@ def execute_command(self, command): return e def create_branch(self): - # Generate a unique branch name based on the current date and time print("Creating branch for ingestion...") branch_name = datetime.now().strftime("branch-%Y%m%d-%H%M%S") branch = self.repo.branch(branch_name).create(source_reference="main") return branch - def upload_files_to_branch(self, branch_name, local_folder="data/local"): + def upload_files_to_branch(self, branch_name, local_folder): print("Uploading files from data directory to branch...") command = [ "lakectl", "fs", "upload", @@ -71,33 +69,4 @@ def delete_branch(self, branch_name): f"lakefs://{self.repo_name}/{branch_name}", "--yes" ] - self.execute_command(command) - -if __name__ == "__main__": - manager = LakeFSManager( - host="http://127.0.0.1:8000/", - username=config.username, - password=config.password, - repo_name="example-repo" - ) - - # Create a new branch with a unique name - branch = manager.create_branch() - - # Upload a local file to the new branch - manager.upload_files_to_branch( - branch_name=branch.id, - local_folder="data" - ) - - # Commit the uploaded files to the branch - manager.commit_branch( - branch_name=branch.id, - message="Uploaded new files." - ) - - # Show differences between the main branch and the new branch - manager.show_diff(branch) - - # Merge the new branch into the main branch - manager.merge_branch(branch) \ No newline at end of file + self.execute_command(command) \ No newline at end of file From aa87eddd799eb9ad47e2b58924c9b2c7d2e8fdbd Mon Sep 17 00:00:00 2001 From: James Hodson Date: Fri, 6 Sep 2024 09:19:22 +0100 Subject: [PATCH 06/34] removed cleanup function and versioning from workflow file --- src/workflow.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/workflow.py b/src/workflow.py index c4e1396..c770073 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -71,23 +71,21 @@ def __call__(self, shot: int): ) upload = UploadDatasetTask(local_path, self.upload_config) - cleanup = CleanupDatasetTask(local_path) + #cleanup = CleanupDatasetTask(local_path) try: url = self.upload_config.url + f"{shot}.{self.file_format}" if self.force or not self.fs.exists(url): create() - # Do LakeFS stuff here, after the files have been created. This try block already checks whether the - # file already exists on S3, and either skips or forces it. My LakeFS code skips if there is no diff - # so should catch everything. + upload() else: logging.info(f"Skipping shot {shot} as it already exists") except Exception as e: logging.error(f"Failed to run workflow with error {type(e)}: {e}") - cleanup() + #cleanup() class LocalIngestionWorkflow: @@ -124,10 +122,7 @@ def __call__(self, shot: int): try: create() - from src.task import VersionDataTask - repo_name = "example-repo" - version = VersionDataTask("data/local", repo_name) - version() + except Exception as e: import traceback trace = traceback.format_exc() @@ -142,7 +137,7 @@ class WorkflowManager: def __init__(self, workflow): self.workflow = workflow - def run_workflows(self, shot_list: list[int], parallel=False): + def run_workflows(self, shot_list: list[int], parallel=True): if parallel: self._run_workflows_parallel(shot_list) else: From 27ffe0b17a539394098dc84212d81ce43f60b408 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Fri, 6 Sep 2024 09:19:58 +0100 Subject: [PATCH 07/34] improved versioning to get past mpi processes overlapping --- src/main.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main.py b/src/main.py index a390782..64cacc9 100644 --- a/src/main.py +++ b/src/main.py @@ -68,6 +68,16 @@ def main(): workflow_manager.run_workflows(shot_list, parallel = not args.serial) logging.info(f"Finished source {source}") + from mpi4py import MPI + # Ensure only the process with rank 0 versions the data + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + + if rank == 1: + from src.task import VersionDataTask + repo_name = "example-repo" + version = VersionDataTask(args.dataset_path, repo_name) + version() if __name__ == "__main__": main() From bb327df57cd318df3d71c4521744ed456ea87684 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Fri, 6 Sep 2024 10:57:42 +0100 Subject: [PATCH 08/34] seperate lakefs versioning task, run external to ingestion --- src/lake_fs.py | 39 ++++++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/src/lake_fs.py b/src/lake_fs.py index afd2c21..2916ff7 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -1,5 +1,5 @@ import os -import glob +import argparse from lakefs.client import Client import lakefs import subprocess @@ -30,11 +30,11 @@ def create_branch(self): def upload_files_to_branch(self, branch_name, local_folder): print("Uploading files from data directory to branch...") command = [ - "lakectl", "fs", "upload", - f"lakefs://{self.repo_name}/{branch_name}/", - "-s", local_folder, - "-r" - ] + "lakectl", "fs", "upload", + f"lakefs://{self.repo_name}/{branch_name}/", + "-s", local_folder, + "-r" + ] self.execute_command(command) def commit_branch(self, branch_name, message): @@ -69,4 +69,29 @@ def delete_branch(self, branch_name): f"lakefs://{self.repo_name}/{branch_name}", "--yes" ] - self.execute_command(command) \ No newline at end of file + self.execute_command(command) + + +def main(): + # Parse command-line arguments + parser = argparse.ArgumentParser(description="LakeFS Ingestion Script") + parser.add_argument("--repo", required=True, help="Name of the LakeFS repository") + parser.add_argument("--data-dir", required=True, help="Location of the data files to upload") + parser.add_argument("--commit-message", default="Import data from CSD3", help="Commit message for the ingestion") + args = parser.parse_args() + + # Create the LakeFS manager instance + lakefs_manager = LakeFSManager(repo_name=args.repo) + + branch = lakefs_manager.create_branch() + lakefs_manager.upload_files_to_branch(branch.id, args.data_dir) + lakefs_manager.commit_branch(branch.id, args.commit_message) + lakefs_manager.show_diff(branch) + + # Merge the branch into main + lakefs_manager.merge_branch(branch) + lakefs_manager.delete_branch(branch.id) + + +if __name__ == "__main__": + main() \ No newline at end of file From 190fa4922ce731a75addec0ba73edf1e77e4841e Mon Sep 17 00:00:00 2001 From: James Hodson Date: Mon, 9 Sep 2024 09:22:49 +0100 Subject: [PATCH 09/34] removed mpi versioning, now doing seperate versioning process --- src/main.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/main.py b/src/main.py index 64cacc9..42392df 100644 --- a/src/main.py +++ b/src/main.py @@ -68,16 +68,5 @@ def main(): workflow_manager.run_workflows(shot_list, parallel = not args.serial) logging.info(f"Finished source {source}") - from mpi4py import MPI - # Ensure only the process with rank 0 versions the data - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - - if rank == 1: - from src.task import VersionDataTask - repo_name = "example-repo" - version = VersionDataTask(args.dataset_path, repo_name) - version() - if __name__ == "__main__": main() From 0e0437ff3ebb2e1e5f46f267b45b556840322509 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Mon, 9 Sep 2024 10:58:37 +0100 Subject: [PATCH 10/34] added extra validation for data dir, lakectl install --- src/lake_fs.py | 70 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/src/lake_fs.py b/src/lake_fs.py index 2916ff7..831a307 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -1,9 +1,12 @@ import os import argparse +import logging from lakefs.client import Client import lakefs import subprocess from datetime import datetime +import uuid +import sys class LakeFSManager: def __init__(self, repo_name): @@ -11,24 +14,28 @@ def __init__(self, repo_name): self.repo_name = repo_name self.repo = lakefs.Repository(repo_name, client=self.client) + def execute_command(self, command): try: result = subprocess.run(command, check=True, capture_output=True, text=True) - print("Output:", result.stdout) + logging.info("Command output: %s", result.stdout) return result except subprocess.CalledProcessError as e: - print("An error occurred while executing the command.") - print("Error message:", e.stderr) + logging.error("Error executing command: %s", e.stderr) return e def create_branch(self): - print("Creating branch for ingestion...") - branch_name = datetime.now().strftime("branch-%Y%m%d-%H%M%S") - branch = self.repo.branch(branch_name).create(source_reference="main") - return branch + logging.info("Creating branch for ingestion...") + branch_name = f"branch-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}" + try: + branch = self.repo.branch(branch_name).create(source_reference="main") + return branch + except lakefs.exceptions.LakeFSException as e: + logging.error(f"Failed to create branch: {e}") + sys.exit(1) def upload_files_to_branch(self, branch_name, local_folder): - print("Uploading files from data directory to branch...") + logging.info("Uploading files from data directory to branch...") command = [ "lakectl", "fs", "upload", f"lakefs://{self.repo_name}/{branch_name}/", @@ -38,7 +45,7 @@ def upload_files_to_branch(self, branch_name, local_folder): self.execute_command(command) def commit_branch(self, branch_name, message): - print("Committing changes to branch...") + logging.info("Committing changes to branch...") command = [ "lakectl", "commit", f"lakefs://{self.repo_name}/{branch_name}/", @@ -47,23 +54,27 @@ def commit_branch(self, branch_name, message): result = self.execute_command(command) if result.returncode != 0: - print(f"Commit failed for branch '{branch_name}'. Deleting branch and exiting.") + logging.error(f"Commit failed for branch '{branch_name}'. Deleting branch and exiting.") self.delete_branch(branch_name) - exit() + sys.exit(1) def show_diff(self, branch): main_branch = self.repo.branch("main") changes = list(main_branch.diff(other_ref=branch)) - print(f"Number of changes made to main: {len(changes)}") + logging.info(f"Number of changes made to main: {len(changes)}") def merge_branch(self, branch): - print("Merging branch to main...") + logging.info("Merging branch to main...") main_branch = self.repo.branch("main") - res = branch.merge_into(main_branch) - return res + try: + res = branch.merge_into(main_branch) + return res + except lakefs.exceptions.LakeFSException as e: + logging.error(f"Failed to merge branch: {e}") + sys.exit(1) def delete_branch(self, branch_name): - print("Deleting branch.") + logging.info("Deleting branch.") command = [ "lakectl", "branch", "delete", f"lakefs://{self.repo_name}/{branch_name}", @@ -72,15 +83,37 @@ def delete_branch(self, branch_name): self.execute_command(command) +def check_lakectl_installed(): + try: + subprocess.run(["lakectl", "--version"], check=True, capture_output=True, text=True) + logging.info("lakectl is installed.") + except subprocess.CalledProcessError: + logging.error("lakectl is not installed or not found in PATH.") + sys.exit(1) + + +def validate_data_directory(local_folder): + if not os.path.exists(local_folder): + logging.error(f"Data directory '{local_folder}' does not exist.") + sys.exit(1) + elif not os.listdir(local_folder): + logging.error(f"Data directory '{local_folder}' is empty.") + sys.exit(1) + logging.info(f"Data directory '{local_folder}' is valid.") + + def main(): - # Parse command-line arguments + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + parser = argparse.ArgumentParser(description="LakeFS Ingestion Script") parser.add_argument("--repo", required=True, help="Name of the LakeFS repository") parser.add_argument("--data-dir", required=True, help="Location of the data files to upload") parser.add_argument("--commit-message", default="Import data from CSD3", help="Commit message for the ingestion") args = parser.parse_args() - # Create the LakeFS manager instance + check_lakectl_installed() + validate_data_directory(args.data_dir) + lakefs_manager = LakeFSManager(repo_name=args.repo) branch = lakefs_manager.create_branch() @@ -88,7 +121,6 @@ def main(): lakefs_manager.commit_branch(branch.id, args.commit_message) lakefs_manager.show_diff(branch) - # Merge the branch into main lakefs_manager.merge_branch(branch) lakefs_manager.delete_branch(branch.id) From 781df1ebf9c03bfe7207b2b935d46462bf60e4e7 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Mon, 9 Sep 2024 14:54:15 +0100 Subject: [PATCH 11/34] added cleanup functionality --- src/lake_fs.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/lake_fs.py b/src/lake_fs.py index 831a307..b5ee70a 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -2,6 +2,7 @@ import argparse import logging from lakefs.client import Client +from src.task import CleanupDatasetTask import lakefs import subprocess from datetime import datetime @@ -101,6 +102,16 @@ def validate_data_directory(local_folder): sys.exit(1) logging.info(f"Data directory '{local_folder}' is valid.") +def perform_cleanup(data_dir): + logging.info("Starting dataset cleanup process...") + try: + cleanup = CleanupDatasetTask(data_dir) + cleanup() + logging.info(f"Cleanup successful for directory: {data_dir}") + except Exception as e: + logging.error(f"Error during cleanup: {e}") + raise + def main(): logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") @@ -124,6 +135,8 @@ def main(): lakefs_manager.merge_branch(branch) lakefs_manager.delete_branch(branch.id) + perform_cleanup(args.data_dir) + if __name__ == "__main__": main() \ No newline at end of file From 30831e8734c5a29d205f9781d55bfc6d17b61220 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Mon, 9 Sep 2024 14:55:50 +0100 Subject: [PATCH 12/34] removed cleanup from workflow, moved to lake_fs.py --- src/workflow.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/workflow.py b/src/workflow.py index c770073..4909447 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -5,7 +5,6 @@ from src.task import ( CreateDatasetTask, UploadDatasetTask, - CleanupDatasetTask, CreateSignalMetadataTask, CreateSourceMetadataTask, ) @@ -71,21 +70,17 @@ def __call__(self, shot: int): ) upload = UploadDatasetTask(local_path, self.upload_config) - #cleanup = CleanupDatasetTask(local_path) try: url = self.upload_config.url + f"{shot}.{self.file_format}" if self.force or not self.fs.exists(url): create() - - upload() else: logging.info(f"Skipping shot {shot} as it already exists") except Exception as e: logging.error(f"Failed to run workflow with error {type(e)}: {e}") - #cleanup() class LocalIngestionWorkflow: From d866f06e1b6cb00f77eb30815bf9d8b61bdc3eb2 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Mon, 9 Sep 2024 14:56:05 +0100 Subject: [PATCH 13/34] removed unused versioning data task --- src/task.py | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/src/task.py b/src/task.py index 27f7c68..e9bbe6e 100644 --- a/src/task.py +++ b/src/task.py @@ -12,38 +12,9 @@ from src.reader import DatasetReader, SignalMetadataReader, SourceMetadataReader from src.writer import DatasetWriter from src.uploader import UploadConfig -from src.lake_fs import LakeFSManager logging.basicConfig(level=logging.INFO) -class VersionDataTask: - def __init__(self, local_path, repo_name): - self.local_path = local_path - self.lakefs_manager = LakeFSManager(repo_name) - self.branch = None - - def __call__(self): - # 1. Create a new branch - self.branch = self.lakefs_manager.create_branch() - - # 2. Upload the files from the local path to the newly created branch - self.lakefs_manager.upload_files_to_branch( - branch_name=self.branch.id, - local_folder=self.local_path - ) - - # 3. Commit the changes to the branch - self.lakefs_manager.commit_branch( - branch_name=self.branch.id, - message="Uploaded new files." - ) - - # 4. Show differences between the main branch and the new branch - self.lakefs_manager.show_diff(self.branch) - - # 5. Merge the new branch into the main branch - self.lakefs_manager.merge_branch(self.branch) - class CleanupDatasetTask: From fd80c08343508c4ac8176992b63185b4979f460d Mon Sep 17 00:00:00 2001 From: James Hodson Date: Thu, 12 Sep 2024 14:36:29 +0100 Subject: [PATCH 14/34] create branch for versioning at beginning --- src/main.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index 42392df..63bf7b5 100644 --- a/src/main.py +++ b/src/main.py @@ -2,12 +2,27 @@ import logging from functools import partial from dask_mpi import initialize +from mpi4py import MPI from src.uploader import UploadConfig from src.workflow import S3IngestionWorkflow, LocalIngestionWorkflow, WorkflowManager from src.utils import read_shot_file - +from src.lake_fs import LakeFSManager def main(): + + # Initialize the MPI communicator + comm = MPI.COMM_WORLD + # Get the rank of the current process + rank = comm.Get_rank() + from lakefs.client import Client + import lakefs + client = Client() + repo = lakefs.Repository("example-repo", client=client) + # Run the function only on process with rank 0 + if rank == 0: + branch = lakefs.repository("example-repo").branch("ingestion").create(source_reference="main") + + initialize() logging.basicConfig(level=logging.INFO) From a3b586f5b9b3c6e229c7550ae522df002491c133 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Thu, 12 Sep 2024 14:36:58 +0100 Subject: [PATCH 15/34] commit shot after it is creating to branch ingestion --- src/workflow.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/workflow.py b/src/workflow.py index 4909447..271a013 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -9,6 +9,7 @@ CreateSourceMetadataTask, ) from src.uploader import UploadConfig +import subprocess logging.basicConfig(level=logging.INFO) @@ -117,6 +118,22 @@ def __call__(self, shot: int): try: create() + # want to commit the shot here + print(f"{self.data_dir}/{shot}.{self.file_format}") + command = [ + "lakectl", "fs", "upload", + f"lakefs://example-repo/ingestion/{shot}.{self.file_format}", + "-s", f"{self.data_dir}/{shot}.{self.file_format}", "--recursive" + ] + + try: + result = subprocess.run(command, check=True, capture_output=True, text=True) + print("Command executed successfully.") + print("Output:", result.stdout) + except subprocess.CalledProcessError as e: + print("An error occurred while executing the command.") + print("Error message:", e.stderr) + # then delete it except Exception as e: import traceback From 207074e80afedecc5c11a415277963d656a0a942 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Thu, 12 Sep 2024 15:39:20 +0100 Subject: [PATCH 16/34] removed versioning from workflow --- src/workflow.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/workflow.py b/src/workflow.py index 271a013..8553b0e 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -118,22 +118,6 @@ def __call__(self, shot: int): try: create() - # want to commit the shot here - print(f"{self.data_dir}/{shot}.{self.file_format}") - command = [ - "lakectl", "fs", "upload", - f"lakefs://example-repo/ingestion/{shot}.{self.file_format}", - "-s", f"{self.data_dir}/{shot}.{self.file_format}", "--recursive" - ] - - try: - result = subprocess.run(command, check=True, capture_output=True, text=True) - print("Command executed successfully.") - print("Output:", result.stdout) - except subprocess.CalledProcessError as e: - print("An error occurred while executing the command.") - print("Error message:", e.stderr) - # then delete it except Exception as e: import traceback From 5cdaed696e3dde798b9c00a0bb8ca202d5854b4b Mon Sep 17 00:00:00 2001 From: James Hodson Date: Thu, 12 Sep 2024 15:39:42 +0100 Subject: [PATCH 17/34] working example of versioning, committing each shot at the end of their creation to the branch seperately --- src/main.py | 44 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/src/main.py b/src/main.py index 63bf7b5..4901801 100644 --- a/src/main.py +++ b/src/main.py @@ -3,24 +3,19 @@ from functools import partial from dask_mpi import initialize from mpi4py import MPI +import lakefs from src.uploader import UploadConfig from src.workflow import S3IngestionWorkflow, LocalIngestionWorkflow, WorkflowManager from src.utils import read_shot_file -from src.lake_fs import LakeFSManager +import subprocess def main(): # Initialize the MPI communicator comm = MPI.COMM_WORLD - # Get the rank of the current process rank = comm.Get_rank() - from lakefs.client import Client - import lakefs - client = Client() - repo = lakefs.Repository("example-repo", client=client) - # Run the function only on process with rank 0 if rank == 0: - branch = lakefs.repository("example-repo").branch("ingestion").create(source_reference="main") + lakefs.repository("example-repo").branch("ingestion").create(source_reference="main") initialize() @@ -83,5 +78,36 @@ def main(): workflow_manager.run_workflows(shot_list, parallel = not args.serial) logging.info(f"Finished source {source}") + for shot in shot_list: + file_path = args.dataset_path + f"/{shot}.{args.file_format}" + command = [ + "lakectl", "fs", "upload", + f"lakefs://example-repo/ingestion/{shot}.{args.file_format}", + "-s", str(file_path), "--recursive" + ] + + try: + result = subprocess.run(command, check=True, capture_output=True, text=True) + print("Command executed successfully.") + print("Output:", result.stdout) + except subprocess.CalledProcessError as e: + print("An error occurred while executing the command.") + print("Error message:", e.stderr) + + command = [ + "lakectl", "commit", + f"lakefs://example-repo/ingestion/", + "-m", f"Commit shot {shot}" + ] + + try: + result = subprocess.run(command, check=True, capture_output=True, text=True) + print("Command executed successfully.") + print("Output:", result.stdout) + except subprocess.CalledProcessError as e: + print("An error occurred while executing the command.") + print("Error message:", e.stderr) + + if __name__ == "__main__": - main() + main() \ No newline at end of file From 8613524de2f0a8a8a420ebc140a29e6984c03d07 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Thu, 12 Sep 2024 16:06:36 +0100 Subject: [PATCH 18/34] organised versioning into functions --- src/main.py | 84 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 37 deletions(-) diff --git a/src/main.py b/src/main.py index 4901801..1305104 100644 --- a/src/main.py +++ b/src/main.py @@ -8,16 +8,55 @@ from src.workflow import S3IngestionWorkflow, LocalIngestionWorkflow, WorkflowManager from src.utils import read_shot_file import subprocess +from pathlib import Path -def main(): - # Initialize the MPI communicator +def initialize_lakefs_branch(): + """Initialize the lakeFS ingestion branch if on rank 0.""" comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank == 0: lakefs.repository("example-repo").branch("ingestion").create(source_reference="main") +def execute_lakectl_command(command, error_message): + """Helper function to execute lakectl commands with error handling.""" + try: + result = subprocess.run(command, check=True, capture_output=True, text=True) + logging.info("Command executed successfully.") + logging.info("Output: %s", result.stdout) + return True + except subprocess.CalledProcessError as e: + logging.error(error_message) + logging.error("Error message: %s", e.stderr) + return False + + +def upload_shot_to_lakefs(shot, dataset_path, file_format): + """Upload a specific shot to lakeFS.""" + file_path = f"{dataset_path}/{shot}.{file_format}" + upload_command = [ + "lakectl", "fs", "upload", + f"lakefs://example-repo/ingestion/{shot}.{file_format}", + "-s", str(file_path), "--recursive" + ] + if execute_lakectl_command(upload_command, f"Failed to upload shot {shot} to lakeFS"): + logging.info(f"Uploaded shot {shot} to lakeFS.") + + +def commit_shot_to_lakefs(shot): + """Commit the uploaded shot to lakeFS.""" + commit_command = [ + "lakectl", "commit", + "lakefs://example-repo/ingestion/", + "-m", f"Commit shot {shot}" + ] + if execute_lakectl_command(commit_command, f"Failed to commit shot {shot} to lakeFS"): + logging.info(f"Committed shot {shot} to lakeFS.") + +def main(): + + initialize_lakefs_branch() initialize() logging.basicConfig(level=logging.INFO) @@ -43,10 +82,7 @@ def main(): args = parser.parse_args() if args.upload: - bucket_path = args.bucket_path - # Bucket path must have trailing slash - bucket_path = bucket_path + '/' if not bucket_path.endswith('/') else bucket_path - + bucket_path = args.bucket_path.rstrip('/') + '/' config = UploadConfig( credentials_file=args.credentials_file, endpoint_url=args.endpoint_url, @@ -57,7 +93,6 @@ def main(): config = None workflow_cls = LocalIngestionWorkflow - shot_list = read_shot_file(args.shot_file) for source in args.source_names: @@ -75,39 +110,14 @@ def main(): ) workflow_manager = WorkflowManager(workflow) - workflow_manager.run_workflows(shot_list, parallel = not args.serial) + workflow_manager.run_workflows(shot_list, parallel=not args.serial) logging.info(f"Finished source {source}") + # Upload and commit each shot to lakeFS for shot in shot_list: - file_path = args.dataset_path + f"/{shot}.{args.file_format}" - command = [ - "lakectl", "fs", "upload", - f"lakefs://example-repo/ingestion/{shot}.{args.file_format}", - "-s", str(file_path), "--recursive" - ] - - try: - result = subprocess.run(command, check=True, capture_output=True, text=True) - print("Command executed successfully.") - print("Output:", result.stdout) - except subprocess.CalledProcessError as e: - print("An error occurred while executing the command.") - print("Error message:", e.stderr) - - command = [ - "lakectl", "commit", - f"lakefs://example-repo/ingestion/", - "-m", f"Commit shot {shot}" - ] - - try: - result = subprocess.run(command, check=True, capture_output=True, text=True) - print("Command executed successfully.") - print("Output:", result.stdout) - except subprocess.CalledProcessError as e: - print("An error occurred while executing the command.") - print("Error message:", e.stderr) + upload_shot_to_lakefs(shot, args.dataset_path, args.file_format) + commit_shot_to_lakefs(shot) if __name__ == "__main__": - main() \ No newline at end of file + main() From a425809275460a96294c25af15f1eb1d5cf52881 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Mon, 16 Sep 2024 14:51:11 +0100 Subject: [PATCH 19/34] added option to turn on or off versioning --- src/main.py | 53 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/src/main.py b/src/main.py index 1305104..e5f993b 100644 --- a/src/main.py +++ b/src/main.py @@ -9,14 +9,16 @@ from src.utils import read_shot_file import subprocess from pathlib import Path +import shutil +import sys -def initialize_lakefs_branch(): +def initialize_lakefs_branch(repo_name): """Initialize the lakeFS ingestion branch if on rank 0.""" comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank == 0: - lakefs.repository("example-repo").branch("ingestion").create(source_reference="main") + lakefs.repository(repo_name).branch("ingestion").create(source_reference="main") def execute_lakectl_command(command, error_message): @@ -32,31 +34,43 @@ def execute_lakectl_command(command, error_message): return False -def upload_shot_to_lakefs(shot, dataset_path, file_format): +def upload_shot_to_lakefs(shot, dataset_path, file_format, repo_name): """Upload a specific shot to lakeFS.""" file_path = f"{dataset_path}/{shot}.{file_format}" upload_command = [ "lakectl", "fs", "upload", - f"lakefs://example-repo/ingestion/{shot}.{file_format}", + f"lakefs://{repo_name}/ingestion/{shot}.{file_format}", "-s", str(file_path), "--recursive" ] if execute_lakectl_command(upload_command, f"Failed to upload shot {shot} to lakeFS"): logging.info(f"Uploaded shot {shot} to lakeFS.") -def commit_shot_to_lakefs(shot): +def commit_shot_to_lakefs(shot, repo_name): """Commit the uploaded shot to lakeFS.""" commit_command = [ "lakectl", "commit", - "lakefs://example-repo/ingestion/", + f"lakefs://{repo_name}/ingestion/", "-m", f"Commit shot {shot}" ] if execute_lakectl_command(commit_command, f"Failed to commit shot {shot} to lakeFS"): logging.info(f"Committed shot {shot} to lakeFS.") + + +def remove_shot_from_local(shot, dataset_path, file_format): + """Remove the shot file from the local filesystem.""" + file_path = Path(f"{dataset_path}/{shot}.{file_format}") + try: + if file_path.exists(): + shutil.rmtree(file_path) + logging.info(f"Removed local file: {file_path}") + else: + logging.warning(f"File {file_path} not found.") + except Exception as e: + logging.error(f"Failed to remove file {file_path}: {e}") -def main(): - initialize_lakefs_branch() +def main(): initialize() logging.basicConfig(level=logging.INFO) @@ -78,9 +92,18 @@ def main(): parser.add_argument("--source_names", nargs="*", default=[]) parser.add_argument("--file_format", choices=['zarr', 'nc', 'h5'], default='zarr') parser.add_argument("--facility", choices=['MAST', 'MASTU'], default='MAST') + parser.add_argument("--version", default=False, action="store_true", help="Version the data in lakeFS") + parser.add_argument("--repo_name", help="Specify the lakeFS repository for versioning") args = parser.parse_args() + if args.version and not args.repo_name: + logging.error("--repo_name is required when --version is enabled. Aborting") + comm = MPI.COMM_WORLD + comm.Abort() + else: + initialize_lakefs_branch(args.repo_name) + if args.upload: bucket_path = args.bucket_path.rstrip('/') + '/' config = UploadConfig( @@ -113,11 +136,15 @@ def main(): workflow_manager.run_workflows(shot_list, parallel=not args.serial) logging.info(f"Finished source {source}") - # Upload and commit each shot to lakeFS - for shot in shot_list: - upload_shot_to_lakefs(shot, args.dataset_path, args.file_format) - commit_shot_to_lakefs(shot) - + # Upload, version, and commit each shot to lakeFS if versioning is enabled + if args.version: + for shot in shot_list: + upload_shot_to_lakefs(shot, args.dataset_path, args.file_format, args.repo_name) + commit_shot_to_lakefs(shot, args.repo_name) + remove_shot_from_local(shot, args.dataset_path, args.file_format) + else: + logging.info("Versioning is disabled, no data will be uploaded or versioned.") if __name__ == "__main__": main() + From 23139595388fccce9b94f4bc40458356f373bc6c Mon Sep 17 00:00:00 2001 From: James Hodson Date: Mon, 16 Sep 2024 15:21:06 +0100 Subject: [PATCH 20/34] removed args for versioning for now --- src/main.py | 39 +++++++++++++-------------------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/src/main.py b/src/main.py index e5f993b..e01ec63 100644 --- a/src/main.py +++ b/src/main.py @@ -10,15 +10,14 @@ import subprocess from pathlib import Path import shutil -import sys -def initialize_lakefs_branch(repo_name): +def initialize_lakefs_branch(): """Initialize the lakeFS ingestion branch if on rank 0.""" comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank == 0: - lakefs.repository(repo_name).branch("ingestion").create(source_reference="main") + lakefs.repository("example-repo").branch("ingestion").create(source_reference="main") def execute_lakectl_command(command, error_message): @@ -34,23 +33,23 @@ def execute_lakectl_command(command, error_message): return False -def upload_shot_to_lakefs(shot, dataset_path, file_format, repo_name): +def upload_shot_to_lakefs(shot, dataset_path, file_format): """Upload a specific shot to lakeFS.""" file_path = f"{dataset_path}/{shot}.{file_format}" upload_command = [ "lakectl", "fs", "upload", - f"lakefs://{repo_name}/ingestion/{shot}.{file_format}", + f"lakefs://example-repo/ingestion/{shot}.{file_format}", "-s", str(file_path), "--recursive" ] if execute_lakectl_command(upload_command, f"Failed to upload shot {shot} to lakeFS"): logging.info(f"Uploaded shot {shot} to lakeFS.") -def commit_shot_to_lakefs(shot, repo_name): +def commit_shot_to_lakefs(shot): """Commit the uploaded shot to lakeFS.""" commit_command = [ "lakectl", "commit", - f"lakefs://{repo_name}/ingestion/", + "lakefs://example-repo/ingestion/", "-m", f"Commit shot {shot}" ] if execute_lakectl_command(commit_command, f"Failed to commit shot {shot} to lakeFS"): @@ -69,8 +68,9 @@ def remove_shot_from_local(shot, dataset_path, file_format): except Exception as e: logging.error(f"Failed to remove file {file_path}: {e}") - def main(): + + initialize_lakefs_branch() initialize() logging.basicConfig(level=logging.INFO) @@ -92,18 +92,9 @@ def main(): parser.add_argument("--source_names", nargs="*", default=[]) parser.add_argument("--file_format", choices=['zarr', 'nc', 'h5'], default='zarr') parser.add_argument("--facility", choices=['MAST', 'MASTU'], default='MAST') - parser.add_argument("--version", default=False, action="store_true", help="Version the data in lakeFS") - parser.add_argument("--repo_name", help="Specify the lakeFS repository for versioning") args = parser.parse_args() - if args.version and not args.repo_name: - logging.error("--repo_name is required when --version is enabled. Aborting") - comm = MPI.COMM_WORLD - comm.Abort() - else: - initialize_lakefs_branch(args.repo_name) - if args.upload: bucket_path = args.bucket_path.rstrip('/') + '/' config = UploadConfig( @@ -136,15 +127,11 @@ def main(): workflow_manager.run_workflows(shot_list, parallel=not args.serial) logging.info(f"Finished source {source}") - # Upload, version, and commit each shot to lakeFS if versioning is enabled - if args.version: - for shot in shot_list: - upload_shot_to_lakefs(shot, args.dataset_path, args.file_format, args.repo_name) - commit_shot_to_lakefs(shot, args.repo_name) - remove_shot_from_local(shot, args.dataset_path, args.file_format) - else: - logging.info("Versioning is disabled, no data will be uploaded or versioned.") + # Upload and commit each shot to lakeFS + for shot in shot_list: + upload_shot_to_lakefs(shot, args.dataset_path, args.file_format) + commit_shot_to_lakefs(shot) + remove_shot_from_local(shot, args.dataset_path, args.file_format) if __name__ == "__main__": main() - From 42c32f8f94429675d679d99c4a86b601a74338df Mon Sep 17 00:00:00 2001 From: James Hodson Date: Tue, 17 Sep 2024 10:32:15 +0100 Subject: [PATCH 21/34] added merge and branch delete --- src/main.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/main.py b/src/main.py index e01ec63..e3fff1f 100644 --- a/src/main.py +++ b/src/main.py @@ -68,6 +68,25 @@ def remove_shot_from_local(shot, dataset_path, file_format): except Exception as e: logging.error(f"Failed to remove file {file_path}: {e}") +def merge_branch(): + logging.info("Merging branch to main...") + main_branch = lakefs.repository("example-repo").branch("main") + ingestion_branch = lakefs.repository("example-repo").branch("ingestion") + try: + res = ingestion_branch.merge_into(main_branch) + except lakefs.exceptions.LakeFSException as e: + logging.error(f"Failed to merge branch: {e}") + +def delete_branch(): + logging.info("Deleting branch.") + command = [ + "lakectl", "branch", "delete", + f"lakefs://example-repo/ingestion", + "--yes" + ] + if execute_lakectl_command(command, f"Failed to delete branch."): + logging.info(f"Branch deleted.") + def main(): initialize_lakefs_branch() @@ -132,6 +151,8 @@ def main(): upload_shot_to_lakefs(shot, args.dataset_path, args.file_format) commit_shot_to_lakefs(shot) remove_shot_from_local(shot, args.dataset_path, args.file_format) + merge_branch() + delete_branch() if __name__ == "__main__": main() From 306f86895eba4980ad13e653c94a811c4aea70ce Mon Sep 17 00:00:00 2001 From: James Hodson Date: Tue, 17 Sep 2024 14:30:42 +0100 Subject: [PATCH 22/34] rework of ingestion workflow to use lakefs instead, does not work fully due to network problems with the lakefs instance. WIP --- src/main.py | 100 ++---------------------------------------------- src/task.py | 41 +++++++++----------- src/uploader.py | 5 +++ src/workflow.py | 16 ++------ 4 files changed, 32 insertions(+), 130 deletions(-) diff --git a/src/main.py b/src/main.py index e3fff1f..ce5dafb 100644 --- a/src/main.py +++ b/src/main.py @@ -2,94 +2,13 @@ import logging from functools import partial from dask_mpi import initialize -from mpi4py import MPI import lakefs -from src.uploader import UploadConfig +from src.uploader import UploadConfig, LakeFSUploadConfig from src.workflow import S3IngestionWorkflow, LocalIngestionWorkflow, WorkflowManager from src.utils import read_shot_file -import subprocess -from pathlib import Path -import shutil - - -def initialize_lakefs_branch(): - """Initialize the lakeFS ingestion branch if on rank 0.""" - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - if rank == 0: - lakefs.repository("example-repo").branch("ingestion").create(source_reference="main") - - -def execute_lakectl_command(command, error_message): - """Helper function to execute lakectl commands with error handling.""" - try: - result = subprocess.run(command, check=True, capture_output=True, text=True) - logging.info("Command executed successfully.") - logging.info("Output: %s", result.stdout) - return True - except subprocess.CalledProcessError as e: - logging.error(error_message) - logging.error("Error message: %s", e.stderr) - return False - - -def upload_shot_to_lakefs(shot, dataset_path, file_format): - """Upload a specific shot to lakeFS.""" - file_path = f"{dataset_path}/{shot}.{file_format}" - upload_command = [ - "lakectl", "fs", "upload", - f"lakefs://example-repo/ingestion/{shot}.{file_format}", - "-s", str(file_path), "--recursive" - ] - if execute_lakectl_command(upload_command, f"Failed to upload shot {shot} to lakeFS"): - logging.info(f"Uploaded shot {shot} to lakeFS.") - - -def commit_shot_to_lakefs(shot): - """Commit the uploaded shot to lakeFS.""" - commit_command = [ - "lakectl", "commit", - "lakefs://example-repo/ingestion/", - "-m", f"Commit shot {shot}" - ] - if execute_lakectl_command(commit_command, f"Failed to commit shot {shot} to lakeFS"): - logging.info(f"Committed shot {shot} to lakeFS.") - - -def remove_shot_from_local(shot, dataset_path, file_format): - """Remove the shot file from the local filesystem.""" - file_path = Path(f"{dataset_path}/{shot}.{file_format}") - try: - if file_path.exists(): - shutil.rmtree(file_path) - logging.info(f"Removed local file: {file_path}") - else: - logging.warning(f"File {file_path} not found.") - except Exception as e: - logging.error(f"Failed to remove file {file_path}: {e}") - -def merge_branch(): - logging.info("Merging branch to main...") - main_branch = lakefs.repository("example-repo").branch("main") - ingestion_branch = lakefs.repository("example-repo").branch("ingestion") - try: - res = ingestion_branch.merge_into(main_branch) - except lakefs.exceptions.LakeFSException as e: - logging.error(f"Failed to merge branch: {e}") - -def delete_branch(): - logging.info("Deleting branch.") - command = [ - "lakectl", "branch", "delete", - f"lakefs://example-repo/ingestion", - "--yes" - ] - if execute_lakectl_command(command, f"Failed to delete branch."): - logging.info(f"Branch deleted.") def main(): - initialize_lakefs_branch() initialize() logging.basicConfig(level=logging.INFO) @@ -100,10 +19,9 @@ def main(): parser.add_argument("dataset_path") parser.add_argument("shot_file") - parser.add_argument("--bucket_path") - parser.add_argument("--credentials_file", default=".s5cfg.stfc") + parser.add_argument("--credentials_file", default="lakectl.cfg") parser.add_argument("--serial", default=False, action='store_true') - parser.add_argument("--endpoint_url", default="https://s3.echo.stfc.ac.uk") + parser.add_argument("--endpoint_url", default="http://localhost:8000") parser.add_argument("--upload", default=False, action="store_true") parser.add_argument("--metadata_dir", default="data/uda") parser.add_argument("--force", action="store_true") @@ -113,13 +31,10 @@ def main(): parser.add_argument("--facility", choices=['MAST', 'MASTU'], default='MAST') args = parser.parse_args() - if args.upload: - bucket_path = args.bucket_path.rstrip('/') + '/' - config = UploadConfig( + config = LakeFSUploadConfig( credentials_file=args.credentials_file, endpoint_url=args.endpoint_url, - url=bucket_path, ) workflow_cls = partial(S3IngestionWorkflow, upload_config=config) else: @@ -146,13 +61,6 @@ def main(): workflow_manager.run_workflows(shot_list, parallel=not args.serial) logging.info(f"Finished source {source}") - # Upload and commit each shot to lakeFS - for shot in shot_list: - upload_shot_to_lakefs(shot, args.dataset_path, args.file_format) - commit_shot_to_lakefs(shot) - remove_shot_from_local(shot, args.dataset_path, args.file_format) - merge_branch() - delete_branch() if __name__ == "__main__": main() diff --git a/src/task.py b/src/task.py index e9bbe6e..0002bf6 100644 --- a/src/task.py +++ b/src/task.py @@ -11,11 +11,11 @@ from src.mast import MASTClient from src.reader import DatasetReader, SignalMetadataReader, SourceMetadataReader from src.writer import DatasetWriter -from src.uploader import UploadConfig - +from src.uploader import UploadConfig, LakeFSUploadConfig logging.basicConfig(level=logging.INFO) + class CleanupDatasetTask: def __init__(self, path: str) -> None: @@ -30,40 +30,37 @@ def __call__(self): class UploadDatasetTask: - def __init__(self, local_file: Path, config: UploadConfig): + def __init__(self, local_file: Path, config: LakeFSUploadConfig): self.config = config self.local_file = local_file def __call__(self): - logging.info(f"Uploading {self.local_file} to {self.config.url}") if not Path(self.config.credentials_file).exists(): raise RuntimeError(f"Credentials file {self.config.credentials_file} does not exist!") env = os.environ.copy() - + logging.info(f"Uploading {self.local_file} to LakeFS.") args = [ - "s5cmd", - "--credentials-file", - self.config.credentials_file, - "--endpoint-url", - self.config.endpoint_url, - "cp", - "--acl", - "public-read", - str(self.local_file), - self.config.url, + "lakectl", "fs", "upload", + f"lakefs://example-repo/ingestion/{self.local_file}", + "-s", str(self.local_file), "--recursive" ] logging.debug(' ' .join(args)) - subprocess.run( - args, - stdout=subprocess.DEVNULL, - stderr=subprocess.STDOUT, - env=env, - check=True, - ) + try: + result = subprocess.run( + args, + capture_output=True, + env=env, + check=True + ) + logging.info(f"Successfully uploaded {self.local_file} to LakeFS.") + logging.debug(f"Command output: {result.stdout.decode()}") + except subprocess.CalledProcessError as e: + logging.error(f"Failed to upload {self.local_file}: {e.stderr.decode()}") + raise class CreateDatasetTask: diff --git a/src/uploader.py b/src/uploader.py index 335a776..b9a8547 100644 --- a/src/uploader.py +++ b/src/uploader.py @@ -6,3 +6,8 @@ class UploadConfig: url: str endpoint_url: str credentials_file: str + +@dataclass +class LakeFSUploadConfig: + endpoint_url: str + credentials_file: str diff --git a/src/workflow.py b/src/workflow.py index 8553b0e..82e3c93 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -8,7 +8,7 @@ CreateSignalMetadataTask, CreateSourceMetadataTask, ) -from src.uploader import UploadConfig +from src.uploader import UploadConfig, LakeFSUploadConfig import subprocess logging.basicConfig(level=logging.INFO) @@ -39,7 +39,7 @@ def __init__( self, metadata_dir: str, data_dir: str, - upload_config: UploadConfig, + upload_config: LakeFSUploadConfig, force: bool = True, signal_names: list[str] = [], source_names: list[str] = [], @@ -52,9 +52,6 @@ def __init__( self.force = force self.signal_names = signal_names self.source_names = source_names - self.fs = s3fs.S3FileSystem( - anon=True, client_kwargs={"endpoint_url": self.upload_config.endpoint_url} - ) self.file_format = file_format self.facility = facility @@ -73,13 +70,8 @@ def __call__(self, shot: int): upload = UploadDatasetTask(local_path, self.upload_config) try: - url = self.upload_config.url + f"{shot}.{self.file_format}" - if self.force or not self.fs.exists(url): - create() - upload() - - else: - logging.info(f"Skipping shot {shot} as it already exists") + create() + upload() except Exception as e: logging.error(f"Failed to run workflow with error {type(e)}: {e}") From 7629c8051d01b84f7d79191dcefe15bfacfc5eaa Mon Sep 17 00:00:00 2001 From: James Hodson Date: Wed, 18 Sep 2024 10:59:56 +0100 Subject: [PATCH 23/34] added committing to the branch and cleanup to remove local files after the fact --- src/task.py | 35 ++++++++++++++++++++++++++++++++++- src/workflow.py | 10 ++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/task.py b/src/task.py index 0002bf6..832e25d 100644 --- a/src/task.py +++ b/src/task.py @@ -28,7 +28,7 @@ def __call__(self): logging.warning(f"Cannot remove path: {self.path}") -class UploadDatasetTask: +class LakeFSUploadDatasetTask: def __init__(self, local_file: Path, config: LakeFSUploadConfig): self.config = config @@ -62,6 +62,39 @@ def __call__(self): logging.error(f"Failed to upload {self.local_file}: {e.stderr.decode()}") raise +class LakeFSCommitDatasetTask: + + def __init__(self, local_file: Path, config: LakeFSUploadConfig): + self.config = config + self.local_file = local_file + + def __call__(self): + + if not Path(self.config.credentials_file).exists(): + raise RuntimeError(f"Credentials file {self.config.credentials_file} does not exist!") + + env = os.environ.copy() + logging.info(f"Commit {self.local_file} to branch.") + args = [ + "lakectl", "commit", + f"lakefs://example-repo/ingestion/", + "-m", f"Commit file {self.local_file}" + ] + + logging.debug(' ' .join(args)) + + try: + result = subprocess.run( + args, + capture_output=True, + env=env, + check=True + ) + logging.info(f"Successfully committed {self.local_file} to branch.") + logging.debug(f"Command output: {result.stdout.decode()}") + except subprocess.CalledProcessError as e: + logging.error(f"Failed to commit {self.local_file}: {e.stderr.decode()}") + raise class CreateDatasetTask: diff --git a/src/workflow.py b/src/workflow.py index 82e3c93..c684499 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -4,9 +4,11 @@ from dask.distributed import Client, as_completed from src.task import ( CreateDatasetTask, - UploadDatasetTask, + LakeFSUploadDatasetTask, + LakeFSCommitDatasetTask, CreateSignalMetadataTask, CreateSourceMetadataTask, + CleanupDatasetTask ) from src.uploader import UploadConfig, LakeFSUploadConfig import subprocess @@ -67,11 +69,15 @@ def __call__(self, shot: int): self.facility ) - upload = UploadDatasetTask(local_path, self.upload_config) + upload = LakeFSUploadDatasetTask(local_path, self.upload_config) + commit = LakeFSCommitDatasetTask(local_path, self.upload_config) + cleanup = CleanupDatasetTask(local_path) try: create() upload() + commit() + cleanup() except Exception as e: logging.error(f"Failed to run workflow with error {type(e)}: {e}") From 5190edd100a50500e8a72085d6dd6a5e96ce21b7 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Wed, 18 Sep 2024 11:35:27 +0100 Subject: [PATCH 24/34] included merge at the end of versioning run --- src/main.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main.py b/src/main.py index ce5dafb..2332429 100644 --- a/src/main.py +++ b/src/main.py @@ -3,8 +3,9 @@ from functools import partial from dask_mpi import initialize import lakefs +from src.lake_fs import lakefs_merge_into_main from src.uploader import UploadConfig, LakeFSUploadConfig -from src.workflow import S3IngestionWorkflow, LocalIngestionWorkflow, WorkflowManager +from src.workflow import LakeFSIngestionWorkflow, LocalIngestionWorkflow, WorkflowManager from src.utils import read_shot_file def main(): @@ -36,7 +37,7 @@ def main(): credentials_file=args.credentials_file, endpoint_url=args.endpoint_url, ) - workflow_cls = partial(S3IngestionWorkflow, upload_config=config) + workflow_cls = partial(LakeFSIngestionWorkflow, upload_config=config) else: config = None workflow_cls = LocalIngestionWorkflow @@ -61,6 +62,8 @@ def main(): workflow_manager.run_workflows(shot_list, parallel=not args.serial) logging.info(f"Finished source {source}") + if args.upload: + lakefs_merge_into_main() if __name__ == "__main__": main() From 885508c914aa75f7fb1a6dbb48fe41c326e85152 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Wed, 18 Sep 2024 11:36:10 +0100 Subject: [PATCH 25/34] upload and commit the shot name instead of the local_path prefix --- src/task.py | 5 +++-- src/workflow.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/task.py b/src/task.py index 832e25d..af59841 100644 --- a/src/task.py +++ b/src/task.py @@ -30,9 +30,10 @@ def __call__(self): class LakeFSUploadDatasetTask: - def __init__(self, local_file: Path, config: LakeFSUploadConfig): + def __init__(self, local_file: Path, shot_name: Path, config: LakeFSUploadConfig): self.config = config self.local_file = local_file + self.shot_name = shot_name def __call__(self): @@ -43,7 +44,7 @@ def __call__(self): logging.info(f"Uploading {self.local_file} to LakeFS.") args = [ "lakectl", "fs", "upload", - f"lakefs://example-repo/ingestion/{self.local_file}", + f"lakefs://example-repo/ingestion/{self.shot_name}", "-s", str(self.local_file), "--recursive" ] diff --git a/src/workflow.py b/src/workflow.py index c684499..9809c2f 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -35,7 +35,7 @@ def __call__(self, shot: int): logging.error(f"Could not parse source metadata for shot {shot}: {e}") -class S3IngestionWorkflow: +class LakeFSIngestionWorkflow: def __init__( self, @@ -59,6 +59,7 @@ def __init__( def __call__(self, shot: int): local_path = self.data_dir / f"{shot}.{self.file_format}" + shot_name = f"{shot}.{self.file_format}" create = CreateDatasetTask( self.metadata_dir, self.data_dir, @@ -69,7 +70,7 @@ def __call__(self, shot: int): self.facility ) - upload = LakeFSUploadDatasetTask(local_path, self.upload_config) + upload = LakeFSUploadDatasetTask(local_path, shot_name, self.upload_config) commit = LakeFSCommitDatasetTask(local_path, self.upload_config) cleanup = CleanupDatasetTask(local_path) From ade3c382113903e03ce25bd7e25b7d47a21ebb34 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Wed, 18 Sep 2024 11:36:36 +0100 Subject: [PATCH 26/34] WIP file. Location of merge and execute functions used in main. --- src/lake_fs.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/lake_fs.py b/src/lake_fs.py index b5ee70a..1b3dcf1 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -9,6 +9,25 @@ import uuid import sys +def execute_command(command): + try: + result = subprocess.run(command, check=True, capture_output=True, text=True) + logging.info("Command output: %s", result.stdout) + return result + except subprocess.CalledProcessError as e: + logging.error("Error executing command: %s", e.stderr) + return e + +def lakefs_merge_into_main(): + logging.info("Uploading files from data directory to branch...") + command = [ + "lakectl", "merge", + f"lakefs://example-repo/ingestion/", + f"lakefs://example-repo/main/", + "-m", "Merge ingestion branch into main" + ] + execute_command(command) + class LakeFSManager: def __init__(self, repo_name): self.client = Client() From a270e280205f09c142548444dc12d970cdc57942 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Wed, 18 Sep 2024 12:36:12 +0100 Subject: [PATCH 27/34] now able to use --upload arg as an option to enable lakefs versioning, requires a repo name --- src/lake_fs.py | 6 +++--- src/main.py | 5 +++-- src/task.py | 4 ++-- src/uploader.py | 1 + 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/lake_fs.py b/src/lake_fs.py index 1b3dcf1..515cb6f 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -18,12 +18,12 @@ def execute_command(command): logging.error("Error executing command: %s", e.stderr) return e -def lakefs_merge_into_main(): +def lakefs_merge_into_main(repo): logging.info("Uploading files from data directory to branch...") command = [ "lakectl", "merge", - f"lakefs://example-repo/ingestion/", - f"lakefs://example-repo/main/", + f"lakefs://{repo}/ingestion/", + f"lakefs://{repo}/main/", "-m", "Merge ingestion branch into main" ] execute_command(command) diff --git a/src/main.py b/src/main.py index 2332429..467f030 100644 --- a/src/main.py +++ b/src/main.py @@ -23,7 +23,7 @@ def main(): parser.add_argument("--credentials_file", default="lakectl.cfg") parser.add_argument("--serial", default=False, action='store_true') parser.add_argument("--endpoint_url", default="http://localhost:8000") - parser.add_argument("--upload", default=False, action="store_true") + parser.add_argument("--upload", nargs='?', const=False, default=False) parser.add_argument("--metadata_dir", default="data/uda") parser.add_argument("--force", action="store_true") parser.add_argument("--signal_names", nargs="*", default=[]) @@ -36,6 +36,7 @@ def main(): config = LakeFSUploadConfig( credentials_file=args.credentials_file, endpoint_url=args.endpoint_url, + repository=args.upload ) workflow_cls = partial(LakeFSIngestionWorkflow, upload_config=config) else: @@ -63,7 +64,7 @@ def main(): logging.info(f"Finished source {source}") if args.upload: - lakefs_merge_into_main() + lakefs_merge_into_main(args.upload) if __name__ == "__main__": main() diff --git a/src/task.py b/src/task.py index af59841..18b57f3 100644 --- a/src/task.py +++ b/src/task.py @@ -44,7 +44,7 @@ def __call__(self): logging.info(f"Uploading {self.local_file} to LakeFS.") args = [ "lakectl", "fs", "upload", - f"lakefs://example-repo/ingestion/{self.shot_name}", + f"lakefs://{self.config.repository}/ingestion/{self.shot_name}", "-s", str(self.local_file), "--recursive" ] @@ -78,7 +78,7 @@ def __call__(self): logging.info(f"Commit {self.local_file} to branch.") args = [ "lakectl", "commit", - f"lakefs://example-repo/ingestion/", + f"lakefs://{self.config.repository}/ingestion/", "-m", f"Commit file {self.local_file}" ] diff --git a/src/uploader.py b/src/uploader.py index b9a8547..337ae5e 100644 --- a/src/uploader.py +++ b/src/uploader.py @@ -11,3 +11,4 @@ class UploadConfig: class LakeFSUploadConfig: endpoint_url: str credentials_file: str + repository: str From 950f186066635825ed141491691d7729e42f026f Mon Sep 17 00:00:00 2001 From: James Hodson Date: Wed, 18 Sep 2024 14:32:24 +0100 Subject: [PATCH 28/34] added branch to config to enable merging of ingestion branch into main at the end --- src/main.py | 8 +++++--- src/task.py | 4 ++-- src/uploader.py | 1 + 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main.py b/src/main.py index 467f030..827468a 100644 --- a/src/main.py +++ b/src/main.py @@ -3,7 +3,7 @@ from functools import partial from dask_mpi import initialize import lakefs -from src.lake_fs import lakefs_merge_into_main +from src.lake_fs import lakefs_merge_into_main, create_branch from src.uploader import UploadConfig, LakeFSUploadConfig from src.workflow import LakeFSIngestionWorkflow, LocalIngestionWorkflow, WorkflowManager from src.utils import read_shot_file @@ -33,10 +33,12 @@ def main(): args = parser.parse_args() if args.upload: + new_branch = create_branch(args.upload) config = LakeFSUploadConfig( credentials_file=args.credentials_file, endpoint_url=args.endpoint_url, - repository=args.upload + repository=args.upload, + branch=new_branch ) workflow_cls = partial(LakeFSIngestionWorkflow, upload_config=config) else: @@ -64,7 +66,7 @@ def main(): logging.info(f"Finished source {source}") if args.upload: - lakefs_merge_into_main(args.upload) + lakefs_merge_into_main(args.upload, new_branch) if __name__ == "__main__": main() diff --git a/src/task.py b/src/task.py index 18b57f3..96a5a6f 100644 --- a/src/task.py +++ b/src/task.py @@ -44,7 +44,7 @@ def __call__(self): logging.info(f"Uploading {self.local_file} to LakeFS.") args = [ "lakectl", "fs", "upload", - f"lakefs://{self.config.repository}/ingestion/{self.shot_name}", + f"lakefs://{self.config.repository}/{self.config.branch}/{self.shot_name}", "-s", str(self.local_file), "--recursive" ] @@ -78,7 +78,7 @@ def __call__(self): logging.info(f"Commit {self.local_file} to branch.") args = [ "lakectl", "commit", - f"lakefs://{self.config.repository}/ingestion/", + f"lakefs://{self.config.repository}/{self.config.branch}/", "-m", f"Commit file {self.local_file}" ] diff --git a/src/uploader.py b/src/uploader.py index 337ae5e..619ab80 100644 --- a/src/uploader.py +++ b/src/uploader.py @@ -12,3 +12,4 @@ class LakeFSUploadConfig: endpoint_url: str credentials_file: str repository: str + branch: str From d8136069cdf630054d7d855f44071c7fceb50015 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Wed, 18 Sep 2024 14:32:43 +0100 Subject: [PATCH 29/34] removed excess class --- src/lake_fs.py | 149 +++++-------------------------------------------- 1 file changed, 14 insertions(+), 135 deletions(-) diff --git a/src/lake_fs.py b/src/lake_fs.py index 515cb6f..c9f28b5 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -17,145 +17,24 @@ def execute_command(command): except subprocess.CalledProcessError as e: logging.error("Error executing command: %s", e.stderr) return e - -def lakefs_merge_into_main(repo): + +def create_branch(repo): logging.info("Uploading files from data directory to branch...") + branch_name = f"branch-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}" command = [ - "lakectl", "merge", - f"lakefs://{repo}/ingestion/", - f"lakefs://{repo}/main/", - "-m", "Merge ingestion branch into main" + "lakectl", "branch", "create", + f"lakefs://{repo}/{branch_name}/", + "-s", f"lakefs://{repo}/main/" ] execute_command(command) - -class LakeFSManager: - def __init__(self, repo_name): - self.client = Client() - self.repo_name = repo_name - self.repo = lakefs.Repository(repo_name, client=self.client) - - - def execute_command(self, command): - try: - result = subprocess.run(command, check=True, capture_output=True, text=True) - logging.info("Command output: %s", result.stdout) - return result - except subprocess.CalledProcessError as e: - logging.error("Error executing command: %s", e.stderr) - return e - - def create_branch(self): - logging.info("Creating branch for ingestion...") - branch_name = f"branch-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}" - try: - branch = self.repo.branch(branch_name).create(source_reference="main") - return branch - except lakefs.exceptions.LakeFSException as e: - logging.error(f"Failed to create branch: {e}") - sys.exit(1) - - def upload_files_to_branch(self, branch_name, local_folder): + return branch_name + +def lakefs_merge_into_main(repo, branch): logging.info("Uploading files from data directory to branch...") command = [ - "lakectl", "fs", "upload", - f"lakefs://{self.repo_name}/{branch_name}/", - "-s", local_folder, - "-r" - ] - self.execute_command(command) - - def commit_branch(self, branch_name, message): - logging.info("Committing changes to branch...") - command = [ - "lakectl", "commit", - f"lakefs://{self.repo_name}/{branch_name}/", - "-m", message - ] - result = self.execute_command(command) - - if result.returncode != 0: - logging.error(f"Commit failed for branch '{branch_name}'. Deleting branch and exiting.") - self.delete_branch(branch_name) - sys.exit(1) - - def show_diff(self, branch): - main_branch = self.repo.branch("main") - changes = list(main_branch.diff(other_ref=branch)) - logging.info(f"Number of changes made to main: {len(changes)}") - - def merge_branch(self, branch): - logging.info("Merging branch to main...") - main_branch = self.repo.branch("main") - try: - res = branch.merge_into(main_branch) - return res - except lakefs.exceptions.LakeFSException as e: - logging.error(f"Failed to merge branch: {e}") - sys.exit(1) - - def delete_branch(self, branch_name): - logging.info("Deleting branch.") - command = [ - "lakectl", "branch", "delete", - f"lakefs://{self.repo_name}/{branch_name}", - "--yes" + "lakectl", "merge", + f"lakefs://{repo}/{branch}/", + f"lakefs://{repo}/main/", + "-m", f"Merge {branch} branch into main" ] - self.execute_command(command) - - -def check_lakectl_installed(): - try: - subprocess.run(["lakectl", "--version"], check=True, capture_output=True, text=True) - logging.info("lakectl is installed.") - except subprocess.CalledProcessError: - logging.error("lakectl is not installed or not found in PATH.") - sys.exit(1) - - -def validate_data_directory(local_folder): - if not os.path.exists(local_folder): - logging.error(f"Data directory '{local_folder}' does not exist.") - sys.exit(1) - elif not os.listdir(local_folder): - logging.error(f"Data directory '{local_folder}' is empty.") - sys.exit(1) - logging.info(f"Data directory '{local_folder}' is valid.") - -def perform_cleanup(data_dir): - logging.info("Starting dataset cleanup process...") - try: - cleanup = CleanupDatasetTask(data_dir) - cleanup() - logging.info(f"Cleanup successful for directory: {data_dir}") - except Exception as e: - logging.error(f"Error during cleanup: {e}") - raise - - -def main(): - logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") - - parser = argparse.ArgumentParser(description="LakeFS Ingestion Script") - parser.add_argument("--repo", required=True, help="Name of the LakeFS repository") - parser.add_argument("--data-dir", required=True, help="Location of the data files to upload") - parser.add_argument("--commit-message", default="Import data from CSD3", help="Commit message for the ingestion") - args = parser.parse_args() - - check_lakectl_installed() - validate_data_directory(args.data_dir) - - lakefs_manager = LakeFSManager(repo_name=args.repo) - - branch = lakefs_manager.create_branch() - lakefs_manager.upload_files_to_branch(branch.id, args.data_dir) - lakefs_manager.commit_branch(branch.id, args.commit_message) - lakefs_manager.show_diff(branch) - - lakefs_manager.merge_branch(branch) - lakefs_manager.delete_branch(branch.id) - - perform_cleanup(args.data_dir) - - -if __name__ == "__main__": - main() \ No newline at end of file + execute_command(command) \ No newline at end of file From 5a8919e0b93ad4ae44d4a0e11c8290f327e8e68f Mon Sep 17 00:00:00 2001 From: James Hodson Date: Fri, 20 Sep 2024 09:15:14 +0100 Subject: [PATCH 30/34] better info messages --- src/task.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/task.py b/src/task.py index 96a5a6f..525c9c4 100644 --- a/src/task.py +++ b/src/task.py @@ -41,7 +41,7 @@ def __call__(self): raise RuntimeError(f"Credentials file {self.config.credentials_file} does not exist!") env = os.environ.copy() - logging.info(f"Uploading {self.local_file} to LakeFS.") + logging.info(f"Attempting to upload {self.local_file} to repository: {self.config.repository}...") args = [ "lakectl", "fs", "upload", f"lakefs://{self.config.repository}/{self.config.branch}/{self.shot_name}", @@ -57,7 +57,7 @@ def __call__(self): env=env, check=True ) - logging.info(f"Successfully uploaded {self.local_file} to LakeFS.") + logging.info(f"Successfully uploaded {self.local_file} to repository: {self.config.repository}.") logging.debug(f"Command output: {result.stdout.decode()}") except subprocess.CalledProcessError as e: logging.error(f"Failed to upload {self.local_file}: {e.stderr.decode()}") @@ -75,7 +75,7 @@ def __call__(self): raise RuntimeError(f"Credentials file {self.config.credentials_file} does not exist!") env = os.environ.copy() - logging.info(f"Commit {self.local_file} to branch.") + logging.info(f"Attempting to commit {self.local_file} to branch: {self.config.branch}...") args = [ "lakectl", "commit", f"lakefs://{self.config.repository}/{self.config.branch}/", @@ -91,7 +91,7 @@ def __call__(self): env=env, check=True ) - logging.info(f"Successfully committed {self.local_file} to branch.") + logging.info(f"Successfully committed {self.local_file} to branch: {self.config.branch}...") logging.debug(f"Command output: {result.stdout.decode()}") except subprocess.CalledProcessError as e: logging.error(f"Failed to commit {self.local_file}: {e.stderr.decode()}") From 239e971deb2e37caec30367d8c47881babf05928 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Fri, 20 Sep 2024 09:16:08 +0100 Subject: [PATCH 31/34] removed logging info --- src/lake_fs.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/lake_fs.py b/src/lake_fs.py index c9f28b5..17e4e6b 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -19,7 +19,6 @@ def execute_command(command): return e def create_branch(repo): - logging.info("Uploading files from data directory to branch...") branch_name = f"branch-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}" command = [ "lakectl", "branch", "create", @@ -30,7 +29,6 @@ def create_branch(repo): return branch_name def lakefs_merge_into_main(repo, branch): - logging.info("Uploading files from data directory to branch...") command = [ "lakectl", "merge", f"lakefs://{repo}/{branch}/", From 426e6f968959433b211a71d6a39baa0173969da6 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Fri, 20 Sep 2024 10:08:09 +0100 Subject: [PATCH 32/34] ruff fixes --- src/lake_fs.py | 6 ------ src/main.py | 3 +-- src/workflow.py | 4 +--- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/src/lake_fs.py b/src/lake_fs.py index 17e4e6b..f624e16 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -1,13 +1,7 @@ -import os -import argparse import logging -from lakefs.client import Client -from src.task import CleanupDatasetTask -import lakefs import subprocess from datetime import datetime import uuid -import sys def execute_command(command): try: diff --git a/src/main.py b/src/main.py index 827468a..384c458 100644 --- a/src/main.py +++ b/src/main.py @@ -2,9 +2,8 @@ import logging from functools import partial from dask_mpi import initialize -import lakefs from src.lake_fs import lakefs_merge_into_main, create_branch -from src.uploader import UploadConfig, LakeFSUploadConfig +from src.uploader import LakeFSUploadConfig from src.workflow import LakeFSIngestionWorkflow, LocalIngestionWorkflow, WorkflowManager from src.utils import read_shot_file diff --git a/src/workflow.py b/src/workflow.py index 9809c2f..8786349 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -1,4 +1,3 @@ -import s3fs import logging from pathlib import Path from dask.distributed import Client, as_completed @@ -10,8 +9,7 @@ CreateSourceMetadataTask, CleanupDatasetTask ) -from src.uploader import UploadConfig, LakeFSUploadConfig -import subprocess +from src.uploader import LakeFSUploadConfig logging.basicConfig(level=logging.INFO) From 5ea3f84ada5a14b64e2805ef233f28f0753aefa8 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Fri, 20 Sep 2024 10:08:27 +0100 Subject: [PATCH 33/34] ruff fixes --- src/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/task.py b/src/task.py index 525c9c4..da1082a 100644 --- a/src/task.py +++ b/src/task.py @@ -11,7 +11,7 @@ from src.mast import MASTClient from src.reader import DatasetReader, SignalMetadataReader, SourceMetadataReader from src.writer import DatasetWriter -from src.uploader import UploadConfig, LakeFSUploadConfig +from src.uploader import LakeFSUploadConfig logging.basicConfig(level=logging.INFO) From fe93dce11619a2c6ca18be51db7e6c048a2e4c40 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Mon, 7 Oct 2024 09:41:29 +0100 Subject: [PATCH 34/34] changed job to work with lakefs command --- jobs/ingest.csd3.slurm.sh | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/jobs/ingest.csd3.slurm.sh b/jobs/ingest.csd3.slurm.sh index dd05075..4f3a40b 100644 --- a/jobs/ingest.csd3.slurm.sh +++ b/jobs/ingest.csd3.slurm.sh @@ -2,22 +2,21 @@ #SBATCH -A UKAEA-AP002-CPU #SBATCH -p icelake #SBATCH --job-name=fair-mast-ingest -#SBATCH --output=fair-mast-ingest_%A.out +#SBATCH --output=outputs/fair-mast-ingest_%A.out #SBATCH --time=5:00:00 -#SBATCH --mem=250G -#SBATCH --ntasks=128 +#SBATCH --mem=40G +#SBATCH --ntasks=8 #SBATCH -N 2 summary_file=$1 -bucket_path=$2 num_workers=$SLURM_NTASKS random_string=$(head /dev/urandom | tr -dc A-Za-z0-9 | head -c 16) -temp_dir="/rds/project/rds-sPGbyCAPsJI/local_cache/$random_string" +temp_dir="data/local" metadata_dir="/rds/project/rds-sPGbyCAPsJI/data/uda" source /rds/project/rds-sPGbyCAPsJI/uda-ssl.sh mpirun -np $num_workers \ - python3 -m src.main $temp_dir $summary_file --metadata_dir $metadata_dir --bucket_path $bucket_path --upload --force --source_names ${@:3} + python3 -m src.main $temp_dir $summary_file --metadata_dir $metadata_dir --upload example-repo --force --source_names ${@:3}