[FEA] Allow users to specify custom Dependency jars #1395

Merged (6 commits) on Nov 4, 2024
74 changes: 48 additions & 26 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -38,9 +38,11 @@
from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools import CspEnv
from spark_rapids_tools.enums import HashAlgorithm
from spark_rapids_tools.configuration.common import RuntimeDependency
from spark_rapids_tools.configuration.tools_config import ToolsConfig
from spark_rapids_tools.enums import DependencyType
from spark_rapids_tools.storagelib import LocalPath, CspFs
from spark_rapids_tools.storagelib.tools.fs_utils import untar_file, FileHashAlgorithm
from spark_rapids_tools.storagelib.tools.fs_utils import untar_file
from spark_rapids_tools.utils import Utilities
from spark_rapids_tools.utils.net_utils import DownloadTask

@@ -70,6 +72,13 @@ class RapidsTool(object):
logger: Logger = field(default=None, init=False)
spinner: ToolsSpinner = field(default=None, init=False)

def get_tools_config_obj(self) -> Optional['ToolsConfig']:
"""
Get the tools configuration object if provided in the CLI arguments.
:return: An object containing all the tools configuration or None if not provided.
"""
return self.wrapper_options.get('toolsConfig')

def pretty_name(self):
return self.name.capitalize()
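For context, a minimal sketch of how the new `get_tools_config_obj` accessor behaves; the `'toolsConfig'` key of `wrapper_options` comes straight from the diff, while the standalone function below is only an illustration of the lookup, not the repository's class.

```python
from typing import Any, Dict, Optional


def get_tools_config_obj(wrapper_options: Dict[str, Any]) -> Optional[Any]:
    """Mirror of the accessor added in this PR: the CLI stores the parsed
    ToolsConfig (or None, when no config file was supplied) under 'toolsConfig'."""
    return wrapper_options.get('toolsConfig')


# usage: None means the user did not pass a tools configuration file
config = get_tools_config_obj({'toolsConfig': None})
assert config is None
```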

Expand Down Expand Up @@ -136,7 +145,7 @@ def _check_environment(self) -> None:

def _process_output_args(self):
self.logger.debug('Processing Output Arguments')
# make sure that output_folder is being absolute
# make sure output_folder is absolute
if self.output_folder is None:
self.output_folder = Utils.get_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd())
try:
Expand Down Expand Up @@ -393,7 +402,8 @@ def _calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict:
return res

@classmethod
def get_rapids_tools_dependencies(cls, deploy_mode: str, json_props: AbstractPropertiesContainer) -> Optional[list]:
def get_rapids_tools_dependencies(cls, deploy_mode: str,
json_props: AbstractPropertiesContainer) -> Optional[list]:
"""
Get the tools dependencies from the platform configuration.
"""
@@ -403,7 +413,9 @@ def get_rapids_tools_dependencies(cls, deploy_mode: str, json_props: AbstractPro
depend_arr = json_props.get_value_silent('dependencies', 'deployMode', deploy_mode, active_buildver)
if depend_arr is None:
raise ValueError(f'Invalid SPARK dependency version [{active_buildver}]')
return depend_arr
# convert the json array to a list of RuntimeDependency objects
runtime_dep_arr = [RuntimeDependency(**dep) for dep in depend_arr]
return runtime_dep_arr


@dataclass
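To illustrate the conversion step above, here is a self-contained sketch; the real `RuntimeDependency` in `spark_rapids_tools.configuration.common` is a richer model, so the dataclass below is a simplified stand-in that only shows how the platform-config JSON entries unpack onto named fields.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RuntimeDependencySketch:  # simplified stand-in for RuntimeDependency
    name: str
    uri: str
    verification: Optional[dict] = None
    dependencyType: Optional[dict] = None


# entries as they appear in the platform configuration JSON
depend_arr = [
    {
        'name': 'Hadoop AWS',
        'uri': 'https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar',
        'verification': {
            'fileHash': {'algorithm': 'sha1',
                         'value': 'a65839fbf1869f81a1632e09f415e586922e4f80'},
            'size': 962685,
        },
    },
]

# same unpacking pattern as the diff: each JSON object maps onto the model's fields
runtime_dep_arr = [RuntimeDependencySketch(**dep) for dep in depend_arr]
print(runtime_dep_arr[0].name)  # Hadoop AWS
```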
@@ -532,47 +544,46 @@ def exception_handler(future):
if exception:
self.logger.error('Error while downloading dependency: %s', exception)

def cache_single_dependency(dep: dict) -> str:
def cache_single_dependency(dep: RuntimeDependency) -> str:
"""
Downloads the specified URL and saves it to disk
"""
self.logger.info('Checking dependency %s', dep['name'])
self.logger.info('Checking dependency %s', dep.name)
dest_folder = self.ctxt.get_cache_folder()
verify_opts = {}
dep_verification = dep.get('verification')
if dep_verification is not None:
if 'size' in dep_verification:
verify_opts['size'] = dep_verification['size']
hash_lib_alg = dep_verification.get('hashLib')
if hash_lib_alg:
verify_opts['file_hash'] = FileHashAlgorithm(HashAlgorithm(hash_lib_alg['type']),
hash_lib_alg['value'])
download_task = DownloadTask(src_url=dep['uri'], # pylint: disable=no-value-for-parameter)
if dep.verification is not None:
verify_opts = dict(dep.verification)
download_task = DownloadTask(src_url=dep.uri, # pylint: disable=no-value-for-parameter)
dest_folder=dest_folder,
verification=verify_opts)
download_result = download_task.run_task()
self.logger.info('Completed downloading of dependency [%s] => %s',
dep['name'],
dep.name,
f'{download_result.pretty_print()}')
if not download_result.success:
msg = f'Failed to download dependency {dep["name"]}, reason: {download_result.download_error}'
msg = f'Failed to download dependency {dep.name}, reason: {download_result.download_error}'
raise RuntimeError(f'Could not download all dependencies. Aborting Executions.\n\t{msg}')
destination_path = self.ctxt.get_local_work_dir()
destination_cspath = LocalPath(destination_path)
if dep['type'] == 'archive':
# set the default dependency type to jar
defined_dep_type = DependencyType.get_default()
if dep.dependency_type:
defined_dep_type = dep.dependency_type.dep_type
if defined_dep_type == DependencyType.ARCHIVE:
uncompressed_cspath = untar_file(download_result.resource, destination_cspath)
dep_item = uncompressed_cspath.no_scheme
relative_path = dep.get('relativePath')
if relative_path is not None:
dep_item = f'{dep_item}/{relative_path}'
else:
if dep.dependency_type.relative_path is not None:
dep_item = f'{dep_item}/{dep.dependency_type.relative_path}'
elif defined_dep_type == DependencyType.JAR:
# copy the jar into dependency folder
CspFs.copy_resources(download_result.resource, destination_cspath)
final_dep_csp = destination_cspath.create_sub_path(download_result.resource.base_name())
dep_item = final_dep_csp.no_scheme
else:
raise ValueError(f'Invalid dependency type [{defined_dep_type}]')
return dep_item

def cache_all_dependencies(dep_arr: List[dict]):
def cache_all_dependencies(dep_arr: List[RuntimeDependency]) -> List[str]:
"""
Create a thread pool and download specified urls
"""
@@ -593,8 +604,19 @@ def cache_all_dependencies(dep_arr: List[dict]):
raise ex
return results

deploy_mode = DeployMode.tostring(self.ctxt.get_deploy_mode())
depend_arr = self.get_rapids_tools_dependencies(deploy_mode, self.ctxt.platform.configs)
def populate_dependency_list() -> List[RuntimeDependency]:
# check if the dependencies are defined in a config file
config_obj = self.get_tools_config_obj()
if config_obj is not None:
if config_obj.runtime.dependencies:
return config_obj.runtime.dependencies
self.logger.info('The ToolsConfig did not specify the dependencies. '
'Falling back to the default dependencies.')
# load dependency list from the platform configuration
deploy_mode = DeployMode.tostring(self.ctxt.get_deploy_mode())
return self.get_rapids_tools_dependencies(deploy_mode, self.ctxt.platform.configs)

depend_arr = populate_dependency_list()
if depend_arr:
dep_list = cache_all_dependencies(depend_arr)
if any(dep_item is None for dep_item in dep_list):
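Condensed, self-contained sketch of the download-and-cache flow the hunks above implement. The repository's actual helpers (`DownloadTask`, `CspFs.copy_resources`, `untar_file`, `DependencyType.get_default()`) are replaced with standard-library stand-ins, and only the size check from the verification block is shown; it is an illustration of the control flow, not the PR's implementation.

```python
import shutil
import tarfile
import urllib.request
from enum import Enum
from pathlib import Path


class DependencyType(Enum):
    """Simplified stand-in for spark_rapids_tools.enums.DependencyType."""
    JAR = 'jar'
    ARCHIVE = 'archive'


def cache_single_dependency(dep: dict, cache_dir: Path, work_dir: Path) -> str:
    """Download one dependency, check its pinned size, then either copy the jar
    or extract the archive into the tool's work directory."""
    local_file = cache_dir / Path(dep['uri']).name
    if not local_file.exists():
        urllib.request.urlretrieve(dep['uri'], local_file)
    expected_size = (dep.get('verification') or {}).get('size')
    if expected_size is not None and local_file.stat().st_size != expected_size:
        raise RuntimeError(f'Failed to download dependency {dep["name"]}: size mismatch')
    dep_type_cfg = dep.get('dependencyType') or {}
    # default dependency type is jar, matching DependencyType.get_default() in the diff
    dep_type = DependencyType(dep_type_cfg.get('depType', 'jar'))
    if dep_type == DependencyType.ARCHIVE:
        with tarfile.open(local_file) as tar:
            tar.extractall(work_dir)
        dep_item = str(work_dir / local_file.name.removesuffix('.tgz'))
        if dep_type_cfg.get('relativePath'):
            dep_item = f"{dep_item}/{dep_type_cfg['relativePath']}"
    else:
        # plain jar: copy it into the dependency folder and return its new path
        shutil.copy(local_file, work_dir)
        dep_item = str(work_dir / local_file.name)
    return dep_item
```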
@@ -8,21 +8,23 @@
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "8883c67e0a138069e597f3e7d4edbbd5c3a565d50b28644aad02856a1ec1da7cb92b8f80454ca427118f69459ea326eaa073cf7b1a860c3b796f4b07c2101319"
},
"size": 400395283
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "Hadoop AWS",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "a65839fbf1869f81a1632e09f415e586922e4f80"
},
"size": 962685
@@ -33,8 +35,8 @@
"name": "AWS Java SDK Bundled",
"uri": "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "02deec3a0ad83d13d032b1812421b23d7a961eea"
},
"size": 280645251
@@ -47,38 +49,38 @@
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "ebf79c7861f3120d5ed9465fdd8d5302a734ff30713a0454b714bbded7ab9f218b3108dc46a5de4cc2102c86e7be53908f84d2c7a19e59bc75880766eeefeef9"
},
"size": 299426263
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "Hadoop AWS",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "a65839fbf1869f81a1632e09f415e586922e4f80"
},
"size": 962685
},
"type": "jar"
}
},
{
"name": "AWS Java SDK Bundled",
"uri": "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "02deec3a0ad83d13d032b1812421b23d7a961eea"
},
"size": 280645251
},
"type": "jar"
}
}
]
}
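The same entry schema is what a user-supplied dependency would carry under this feature. The snippet below (expressed as a Python literal) is purely illustrative: the field names mirror the platform configs above, but the URI, hash, and size are placeholders, and the surrounding layout of the user's tools-config file is not shown in this diff.

```python
# Hypothetical custom dependency entry for a user-provided jar; values are placeholders.
custom_dependency = {
    'name': 'My Custom Connector',
    'uri': 'https://example.com/artifacts/my-custom-connector-1.0.0.jar',
    'verification': {
        'fileHash': {
            'algorithm': 'sha1',
            'value': '<sha1-of-the-jar>',
        },
        'size': 123456,
    },
    'dependencyType': {
        'depType': 'jar',
    },
}
```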
@@ -8,53 +8,55 @@
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "8883c67e0a138069e597f3e7d4edbbd5c3a565d50b28644aad02856a1ec1da7cb92b8f80454ca427118f69459ea326eaa073cf7b1a860c3b796f4b07c2101319"
},
"size": 400395283
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "Hadoop Azure",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.4/hadoop-azure-3.3.4.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "a23f621bca9b2100554150f6b0b521f94b8b419e"
},
"size": 574116
},
"type": "jar"
}
}
],
"333": [
{
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "ebf79c7861f3120d5ed9465fdd8d5302a734ff30713a0454b714bbded7ab9f218b3108dc46a5de4cc2102c86e7be53908f84d2c7a19e59bc75880766eeefeef9"
},
"size": 299426263
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "Hadoop Azure",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.4/hadoop-azure-3.3.4.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "a23f621bca9b2100554150f6b0b521f94b8b419e"
},
"size": 574116
},
"type": "jar"
}
}
]
}
34 changes: 18 additions & 16 deletions user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json
@@ -8,53 +8,55 @@
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "8883c67e0a138069e597f3e7d4edbbd5c3a565d50b28644aad02856a1ec1da7cb92b8f80454ca427118f69459ea326eaa073cf7b1a860c3b796f4b07c2101319"
},
"size": 400395283
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "GCS Connector Hadoop3",
"uri": "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.19/gcs-connector-hadoop3-2.2.19-shaded.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "3bea6d5e62663a2a5c03d8ca44dff4921aeb3170"
},
"size": 39359477
},
"type": "jar"
}
}
],
"333": [
{
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "ebf79c7861f3120d5ed9465fdd8d5302a734ff30713a0454b714bbded7ab9f218b3108dc46a5de4cc2102c86e7be53908f84d2c7a19e59bc75880766eeefeef9"
},
"size": 299426263
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "GCS Connector Hadoop3",
"uri": "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.17/gcs-connector-hadoop3-2.2.17-shaded.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "06438f562692ff8fae5e8555eba2b9f95cb74f66"
},
"size": 38413466
},
"type": "jar"
}
}
]
}
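Every entry in these configs pins both a size and a file hash (`sha1` or `sha512`). A minimal sketch of such a check using `hashlib` follows; the repository's actual verification lives in `DownloadTask` and the `fs_utils` helpers, so this is only an illustration of the idea.

```python
import hashlib
from pathlib import Path


def verify_download(path: Path, algorithm: str, expected_hash: str, expected_size: int) -> bool:
    """Return True when the file matches both the pinned size and the pinned hash."""
    if path.stat().st_size != expected_size:
        return False
    digest = hashlib.new(algorithm)  # e.g. 'sha1' or 'sha512', as in the configs above
    with path.open('rb') as fh:
        for chunk in iter(lambda: fh.read(1 << 20), b''):
            digest.update(chunk)
    return digest.hexdigest() == expected_hash
```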