diff --git a/metadata-ingestion/docs/sources/abs/README.md b/metadata-ingestion/docs/sources/abs/README.md new file mode 100644 index 00000000000000..46a234ed305e0b --- /dev/null +++ b/metadata-ingestion/docs/sources/abs/README.md @@ -0,0 +1,40 @@ +This connector ingests Azure Blob Storage (abbreviated to abs) datasets into DataHub. It allows mapping an individual +file or a folder of files to a dataset in DataHub. +To specify the group of files that form a dataset, use `path_specs` configuration in ingestion recipe. Refer +section [Path Specs](https://datahubproject.io/docs/generated/ingestion/sources/s3/#path-specs) for more details. + +### Concept Mapping + +This ingestion source maps the following Source System Concepts to DataHub Concepts: + +| Source Concept | DataHub Concept | Notes | +|----------------------------------------|--------------------------------------------------------------------------------------------|------------------| +| `"abs"` | [Data Platform](https://datahubproject.io/docs/generated/metamodel/entities/dataplatform/) | | +| abs blob / Folder containing abs blobs | [Dataset](https://datahubproject.io/docs/generated/metamodel/entities/dataset/) | | +| abs container | [Container](https://datahubproject.io/docs/generated/metamodel/entities/container/) | Subtype `Folder` | + +This connector supports both local files and those stored on Azure Blob Storage (which must be identified using the +prefix `http(s)://.blob.core.windows.net/` or `azure://`). + +### Supported file types + +Supported file types are as follows: + +- CSV (*.csv) +- TSV (*.tsv) +- JSONL (*.jsonl) +- JSON (*.json) +- Parquet (*.parquet) +- Apache Avro (*.avro) + +Schemas for Parquet and Avro files are extracted as provided. + +Schemas for schemaless formats (CSV, TSV, JSONL, JSON) are inferred. For CSV, TSV and JSONL files, we consider the first +100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details)) +JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few +objects of the file), which may impact performance. +We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object. + +### Profiling + +Profiling is not available in the current release. diff --git a/metadata-ingestion/docs/sources/abs/abs.md b/metadata-ingestion/docs/sources/abs/abs.md new file mode 100644 index 00000000000000..613ace280c8ba0 --- /dev/null +++ b/metadata-ingestion/docs/sources/abs/abs.md @@ -0,0 +1,204 @@ + +### Path Specs + +Path Specs (`path_specs`) is a list of Path Spec (`path_spec`) objects, where each individual `path_spec` represents one or more datasets. The include path (`path_spec.include`) represents a formatted path to the dataset. This path must end with `*.*` or `*.[ext]` to represent the leaf level. If `*.[ext]` is provided, then only files with the specified extension type will be scanned. "`.[ext]`" can be any of the [supported file types](#supported-file-types). Refer to [example 1](#example-1---individual-file-as-dataset) below for more details. + +All folder levels need to be specified in the include path. You can use `/*/` to represent a folder level and avoid specifying the exact folder name. To map a folder as a dataset, use the `{table}` placeholder to represent the folder level for which the dataset is to be created. For a partitioned dataset, you can use the placeholder `{partition_key[i]}` to represent the name of the `i`th partition and `{partition[i]}` to represent the value of the `i`th partition. During ingestion, `i` will be used to match the partition_key to the partition. Refer to [examples 2 and 3](#example-2---folder-of-files-as-dataset-without-partitions) below for more details. + +Exclude paths (`path_spec.exclude`) can be used to ignore paths that are not relevant to the current `path_spec`. This path cannot have named variables (`{}`). The exclude path can have `**` to represent multiple folder levels. Refer to [example 4](#example-4---folder-of-files-as-dataset-with-partitions-and-exclude-filter) below for more details. + +Refer to [example 5](#example-5---advanced---either-individual-file-or-folder-of-files-as-dataset) if your container has a more complex dataset representation. + +**Additional points to note** +- Folder names should not contain {, }, *, / in their names. +- Named variable {folder} is reserved for internal working. please do not use in named variables. + + +### Path Specs - Examples +#### Example 1 - Individual file as Dataset + +Container structure: + +``` +test-container +├── employees.csv +├── departments.json +└── food_items.csv +``` + +Path specs config to ingest `employees.csv` and `food_items.csv` as datasets: +``` +path_specs: + - include: https://storageaccountname.blob.core.windows.net/test-container/*.csv + +``` +This will automatically ignore `departments.json` file. To include it, use `*.*` instead of `*.csv`. + +#### Example 2 - Folder of files as Dataset (without Partitions) + +Container structure: +``` +test-container +└── offers + ├── 1.avro + └── 2.avro + +``` + +Path specs config to ingest folder `offers` as dataset: +``` +path_specs: + - include: https://storageaccountname.blob.core.windows.net/test-container/{table}/*.avro +``` + +`{table}` represents folder for which dataset will be created. + +#### Example 3 - Folder of files as Dataset (with Partitions) + +Container structure: +``` +test-container +├── orders +│ └── year=2022 +│ └── month=2 +│ ├── 1.parquet +│ └── 2.parquet +└── returns + └── year=2021 + └── month=2 + └── 1.parquet + +``` + +Path specs config to ingest folders `orders` and `returns` as datasets: +``` +path_specs: + - include: https://storageaccountname.blob.core.windows.net/test-container/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet +``` + +One can also use `include: https://storageaccountname.blob.core.windows.net/test-container/{table}/*/*/*.parquet` here however above format is preferred as it allows declaring partitions explicitly. + +#### Example 4 - Folder of files as Dataset (with Partitions), and Exclude Filter + +Container structure: +``` +test-container +├── orders +│ └── year=2022 +│ └── month=2 +│ ├── 1.parquet +│ └── 2.parquet +└── tmp_orders + └── year=2021 + └── month=2 + └── 1.parquet + + +``` + +Path specs config to ingest folder `orders` as dataset but not folder `tmp_orders`: +``` +path_specs: + - include: https://storageaccountname.blob.core.windows.net/test-container/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet + exclude: + - **/tmp_orders/** +``` + + +#### Example 5 - Advanced - Either Individual file OR Folder of files as Dataset + +Container structure: +``` +test-container +├── customers +│ ├── part1.json +│ ├── part2.json +│ ├── part3.json +│ └── part4.json +├── employees.csv +├── food_items.csv +├── tmp_10101000.csv +└── orders + └── year=2022 + └── month=2 + ├── 1.parquet + ├── 2.parquet + └── 3.parquet + +``` + +Path specs config: +``` +path_specs: + - include: https://storageaccountname.blob.core.windows.net/test-container/*.csv + exclude: + - **/tmp_10101000.csv + - include: https://storageaccountname.blob.core.windows.net/test-container/{table}/*.json + - include: https://storageaccountname.blob.core.windows.net/test-container/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet +``` + +Above config has 3 path_specs and will ingest following datasets +- `employees.csv` - Single File as Dataset +- `food_items.csv` - Single File as Dataset +- `customers` - Folder as Dataset +- `orders` - Folder as Dataset + and will ignore file `tmp_10101000.csv` + +**Valid path_specs.include** + +```python +https://storageaccountname.blob.core.windows.net/my-container/foo/tests/bar.avro # single file table +https://storageaccountname.blob.core.windows.net/my-container/foo/tests/*.* # mulitple file level tables +https://storageaccountname.blob.core.windows.net/my-container/foo/tests/{table}/*.avro #table without partition +https://storageaccountname.blob.core.windows.net/my-container/foo/tests/{table}/*/*.avro #table where partitions are not specified +https://storageaccountname.blob.core.windows.net/my-container/foo/tests/{table}/*.* # table where no partitions as well as data type specified +https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/*.avro # specifying keywords to be used in display name +https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro # specify partition key and value format +https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro # specify partition value only format +https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # for all extensions +https://storageaccountname.blob.core.windows.net/my-container/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 2 levels down in container +https://storageaccountname.blob.core.windows.net/my-container/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 3 levels down in container +``` + +**Valid path_specs.exclude** +- \**/tests/** +- https://storageaccountname.blob.core.windows.net/my-container/hr/** +- **/tests/*.csv +- https://storageaccountname.blob.core.windows.net/my-container/foo/*/my_table/** + + + +If you would like to write a more complicated function for resolving file names, then a {transformer} would be a good fit. + +:::caution + +Specify as long fixed prefix ( with out /*/ ) as possible in `path_specs.include`. This will reduce the scanning time and cost, specifically on AWS S3 + +::: + +:::caution + +Running profiling against many tables or over many rows can run up significant costs. +While we've done our best to limit the expensiveness of the queries the profiler runs, you +should be prudent about the set of tables profiling is enabled on or the frequency +of the profiling runs. + +::: + +:::caution + +If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs. + +::: + +### Compatibility + +Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` and `SPARK_VERSION` environment variables to be set. The Spark+Hadoop binary can be downloaded [here](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz). + +For an example guide on setting up PyDeequ on AWS, see [this guide](https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/). + +:::caution + +From Spark 3.2.0+, Avro reader fails on column names that don't start with a letter and contains other character than letters, number, and underscore. [https://github.com/apache/spark/blob/72c62b6596d21e975c5597f8fff84b1a9d070a02/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala#L158] +Avro files that contain such columns won't be profiled. +::: \ No newline at end of file diff --git a/metadata-ingestion/docs/sources/abs/abs_recipe.yml b/metadata-ingestion/docs/sources/abs/abs_recipe.yml new file mode 100644 index 00000000000000..4c4e5c678238f7 --- /dev/null +++ b/metadata-ingestion/docs/sources/abs/abs_recipe.yml @@ -0,0 +1,13 @@ +source: + type: abs + config: + path_specs: + - include: "https://storageaccountname.blob.core.windows.net/covid19-lake/covid_knowledge_graph/csv/nodes/*.*" + + azure_config: + account_name: "*****" + sas_token: "*****" + container_name: "covid_knowledge_graph" + env: "PROD" + +# sink configs diff --git a/metadata-ingestion/docs/sources/s3/README.md b/metadata-ingestion/docs/sources/s3/README.md index b0d354a9b3c2ac..5feda741070240 100644 --- a/metadata-ingestion/docs/sources/s3/README.md +++ b/metadata-ingestion/docs/sources/s3/README.md @@ -1,5 +1,5 @@ This connector ingests AWS S3 datasets into DataHub. It allows mapping an individual file or a folder of files to a dataset in DataHub. -To specify the group of files that form a dataset, use `path_specs` configuration in ingestion recipe. Refer section [Path Specs](https://datahubproject.io/docs/generated/ingestion/sources/s3/#path-specs) for more details. +Refer to the section [Path Specs](https://datahubproject.io/docs/generated/ingestion/sources/s3/#path-specs) for more details. :::tip This connector can also be used to ingest local files. diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 41c04ca4a433cf..e1a9e6a55909d4 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -258,6 +258,13 @@ *path_spec_common, } +abs_base = { + "azure-core==1.29.4", + "azure-identity>=1.14.0", + "azure-storage-blob>=12.19.0", + "azure-storage-file-datalake>=12.14.0", +} + data_lake_profiling = { "pydeequ~=1.1.0", "pyspark~=3.3.0", @@ -265,6 +272,7 @@ delta_lake = { *s3_base, + *abs_base, # Version 0.18.0 broken on ARM Macs: https://github.com/delta-io/delta-rs/issues/2577 "deltalake>=0.6.3, != 0.6.4, < 0.18.0; platform_system == 'Darwin' and platform_machine == 'arm64'", "deltalake>=0.6.3, != 0.6.4; platform_system != 'Darwin' or platform_machine != 'arm64'", @@ -407,6 +415,7 @@ | {"cachetools"}, "s3": {*s3_base, *data_lake_profiling}, "gcs": {*s3_base, *data_lake_profiling}, + "abs": {*abs_base}, "sagemaker": aws_common, "salesforce": {"simple-salesforce"}, "snowflake": snowflake_common | usage_common | sqlglot_lib, @@ -686,6 +695,7 @@ "demo-data = datahub.ingestion.source.demo_data.DemoDataSource", "unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource", "gcs = datahub.ingestion.source.gcs.gcs_source:GCSSource", + "abs = datahub.ingestion.source.abs.source:ABSSource", "sql-queries = datahub.ingestion.source.sql_queries:SqlQueriesSource", "fivetran = datahub.ingestion.source.fivetran.fivetran:FivetranSource", "qlik-sense = datahub.ingestion.source.qlik_sense.qlik_sense:QlikSenseSource", diff --git a/metadata-ingestion/src/datahub/ingestion/source/abs/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/abs/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source/abs/config.py b/metadata-ingestion/src/datahub/ingestion/source/abs/config.py new file mode 100644 index 00000000000000..c62239527a1200 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/abs/config.py @@ -0,0 +1,163 @@ +import logging +from typing import Any, Dict, List, Optional, Union + +import pydantic +from pydantic.fields import Field + +from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated +from datahub.configuration.validate_field_rename import pydantic_renamed_field +from datahub.ingestion.source.abs.datalake_profiler_config import DataLakeProfilerConfig +from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig +from datahub.ingestion.source.data_lake_common.config import PathSpecsConfigMixin +from datahub.ingestion.source.data_lake_common.path_spec import PathSpec +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StatefulStaleMetadataRemovalConfig, +) +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionConfigBase, +) +from datahub.ingestion.source_config.operation_config import is_profiling_enabled + +# hide annoying debug errors from py4j +logging.getLogger("py4j").setLevel(logging.ERROR) +logger: logging.Logger = logging.getLogger(__name__) + + +class DataLakeSourceConfig( + StatefulIngestionConfigBase, DatasetSourceConfigMixin, PathSpecsConfigMixin +): + platform: str = Field( + default="", + description="The platform that this source connects to (either 'abs' or 'file'). " + "If not specified, the platform will be inferred from the path_specs.", + ) + + azure_config: Optional[AzureConnectionConfig] = Field( + default=None, description="Azure configuration" + ) + + stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None + # Whether to create Datahub Azure Container properties + use_abs_container_properties: Optional[bool] = Field( + None, + description="Whether to create tags in datahub from the abs container properties", + ) + # Whether to create Datahub Azure blob tags + use_abs_blob_tags: Optional[bool] = Field( + None, + description="Whether to create tags in datahub from the abs blob tags", + ) + # Whether to create Datahub Azure blob properties + use_abs_blob_properties: Optional[bool] = Field( + None, + description="Whether to create tags in datahub from the abs blob properties", + ) + + # Whether to update the table schema when schema in files within the partitions are updated + _update_schema_on_partition_file_updates_deprecation = pydantic_field_deprecated( + "update_schema_on_partition_file_updates", + message="update_schema_on_partition_file_updates is deprecated. This behaviour is the default now.", + ) + + profile_patterns: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="regex patterns for tables to profile ", + ) + profiling: DataLakeProfilerConfig = Field( + default=DataLakeProfilerConfig(), description="Data profiling configuration" + ) + + spark_driver_memory: str = Field( + default="4g", description="Max amount of memory to grant Spark." + ) + + spark_config: Dict[str, Any] = Field( + description='Spark configuration properties to set on the SparkSession. Put config property names into quotes. For example: \'"spark.executor.memory": "2g"\'', + default={}, + ) + + max_rows: int = Field( + default=100, + description="Maximum number of rows to use when inferring schemas for TSV and CSV files.", + ) + add_partition_columns_to_schema: bool = Field( + default=False, + description="Whether to add partition fields to the schema.", + ) + verify_ssl: Union[bool, str] = Field( + default=True, + description="Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.", + ) + + number_of_files_to_sample: int = Field( + default=100, + description="Number of files to list to sample for schema inference. This will be ignored if sample_files is set to False in the pathspec.", + ) + + _rename_path_spec_to_plural = pydantic_renamed_field( + "path_spec", "path_specs", lambda path_spec: [path_spec] + ) + + def is_profiling_enabled(self) -> bool: + return self.profiling.enabled and is_profiling_enabled( + self.profiling.operation_config + ) + + @pydantic.validator("path_specs", always=True) + def check_path_specs_and_infer_platform( + cls, path_specs: List[PathSpec], values: Dict + ) -> List[PathSpec]: + if len(path_specs) == 0: + raise ValueError("path_specs must not be empty") + + # Check that all path specs have the same platform. + guessed_platforms = set( + "abs" if path_spec.is_abs else "file" for path_spec in path_specs + ) + if len(guessed_platforms) > 1: + raise ValueError( + f"Cannot have multiple platforms in path_specs: {guessed_platforms}" + ) + guessed_platform = guessed_platforms.pop() + + # Ensure abs configs aren't used for file sources. + if guessed_platform != "abs" and ( + values.get("use_abs_container_properties") + or values.get("use_abs_blob_tags") + or values.get("use_abs_blob_properties") + ): + raise ValueError( + "Cannot grab abs blob/container tags when platform is not abs. Remove the flag or use abs." + ) + + # Infer platform if not specified. + if values.get("platform") and values["platform"] != guessed_platform: + raise ValueError( + f"All path_specs belong to {guessed_platform} platform, but platform is set to {values['platform']}" + ) + else: + logger.debug(f'Setting config "platform": {guessed_platform}') + values["platform"] = guessed_platform + + return path_specs + + @pydantic.validator("platform", always=True) + def platform_not_empty(cls, platform: str, values: dict) -> str: + inferred_platform = values.get( + "platform", None + ) # we may have inferred it above + platform = platform or inferred_platform + if not platform: + raise ValueError("platform must not be empty") + return platform + + @pydantic.root_validator() + def ensure_profiling_pattern_is_passed_to_profiling( + cls, values: Dict[str, Any] + ) -> Dict[str, Any]: + profiling: Optional[DataLakeProfilerConfig] = values.get("profiling") + if profiling is not None and profiling.enabled: + profiling._allow_deny_patterns = values["profile_patterns"] + return values diff --git a/metadata-ingestion/src/datahub/ingestion/source/abs/datalake_profiler_config.py b/metadata-ingestion/src/datahub/ingestion/source/abs/datalake_profiler_config.py new file mode 100644 index 00000000000000..9f6d13a08b182e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/abs/datalake_profiler_config.py @@ -0,0 +1,92 @@ +from typing import Any, Dict, Optional + +import pydantic +from pydantic.fields import Field + +from datahub.configuration import ConfigModel +from datahub.configuration.common import AllowDenyPattern +from datahub.ingestion.source_config.operation_config import OperationConfig + + +class DataLakeProfilerConfig(ConfigModel): + enabled: bool = Field( + default=False, description="Whether profiling should be done." + ) + operation_config: OperationConfig = Field( + default_factory=OperationConfig, + description="Experimental feature. To specify operation configs.", + ) + + # These settings will override the ones below. + profile_table_level_only: bool = Field( + default=False, + description="Whether to perform profiling at table-level only or include column-level profiling as well.", + ) + + _allow_deny_patterns: AllowDenyPattern = pydantic.PrivateAttr( + default=AllowDenyPattern.allow_all(), + ) + + max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = Field( + default=None, + description="A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.", + ) + + include_field_null_count: bool = Field( + default=True, + description="Whether to profile for the number of nulls for each column.", + ) + include_field_min_value: bool = Field( + default=True, + description="Whether to profile for the min value of numeric columns.", + ) + include_field_max_value: bool = Field( + default=True, + description="Whether to profile for the max value of numeric columns.", + ) + include_field_mean_value: bool = Field( + default=True, + description="Whether to profile for the mean value of numeric columns.", + ) + include_field_median_value: bool = Field( + default=True, + description="Whether to profile for the median value of numeric columns.", + ) + include_field_stddev_value: bool = Field( + default=True, + description="Whether to profile for the standard deviation of numeric columns.", + ) + include_field_quantiles: bool = Field( + default=True, + description="Whether to profile for the quantiles of numeric columns.", + ) + include_field_distinct_value_frequencies: bool = Field( + default=True, description="Whether to profile for distinct value frequencies." + ) + include_field_histogram: bool = Field( + default=True, + description="Whether to profile for the histogram for numeric fields.", + ) + include_field_sample_values: bool = Field( + default=True, + description="Whether to profile for the sample values for all columns.", + ) + + @pydantic.root_validator() + def ensure_field_level_settings_are_normalized( + cls: "DataLakeProfilerConfig", values: Dict[str, Any] + ) -> Dict[str, Any]: + max_num_fields_to_profile_key = "max_number_of_fields_to_profile" + max_num_fields_to_profile = values.get(max_num_fields_to_profile_key) + + # Disable all field-level metrics. + if values.get("profile_table_level_only"): + for field_level_metric in cls.__fields__: + if field_level_metric.startswith("include_field_"): + values.setdefault(field_level_metric, False) + + assert ( + max_num_fields_to_profile is None + ), f"{max_num_fields_to_profile_key} should be set to None" + + return values diff --git a/metadata-ingestion/src/datahub/ingestion/source/abs/profiling.py b/metadata-ingestion/src/datahub/ingestion/source/abs/profiling.py new file mode 100644 index 00000000000000..c969b229989e84 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/abs/profiling.py @@ -0,0 +1,472 @@ +import dataclasses +from typing import Any, List, Optional + +from pandas import DataFrame +from pydeequ.analyzers import ( + AnalysisRunBuilder, + AnalysisRunner, + AnalyzerContext, + ApproxCountDistinct, + ApproxQuantile, + ApproxQuantiles, + Histogram, + Maximum, + Mean, + Minimum, + StandardDeviation, +) +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, count, isnan, when +from pyspark.sql.types import ( + DataType as SparkDataType, + DateType, + DecimalType, + DoubleType, + FloatType, + IntegerType, + LongType, + NullType, + ShortType, + StringType, + TimestampType, +) + +from datahub.emitter.mce_builder import get_sys_time +from datahub.ingestion.source.profiling.common import ( + Cardinality, + convert_to_cardinality, +) +from datahub.ingestion.source.s3.datalake_profiler_config import DataLakeProfilerConfig +from datahub.ingestion.source.s3.report import DataLakeSourceReport +from datahub.metadata.schema_classes import ( + DatasetFieldProfileClass, + DatasetProfileClass, + HistogramClass, + QuantileClass, + ValueFrequencyClass, +) +from datahub.telemetry import stats, telemetry + +NUM_SAMPLE_ROWS = 20 +QUANTILES = [0.05, 0.25, 0.5, 0.75, 0.95] +MAX_HIST_BINS = 25 + + +def null_str(value: Any) -> Optional[str]: + # str() with a passthrough for None. + return str(value) if value is not None else None + + +@dataclasses.dataclass +class _SingleColumnSpec: + column: str + column_profile: DatasetFieldProfileClass + + # if the histogram is a list of value frequencies (discrete data) or bins (continuous data) + histogram_distinct: Optional[bool] = None + + type_: SparkDataType = NullType # type:ignore + + unique_count: Optional[int] = None + non_null_count: Optional[int] = None + cardinality: Optional[Cardinality] = None + + +class _SingleTableProfiler: + spark: SparkSession + dataframe: DataFrame + analyzer: AnalysisRunBuilder + column_specs: List[_SingleColumnSpec] + row_count: int + profiling_config: DataLakeProfilerConfig + file_path: str + columns_to_profile: List[str] + ignored_columns: List[str] + profile: DatasetProfileClass + report: DataLakeSourceReport + + def __init__( + self, + dataframe: DataFrame, + spark: SparkSession, + profiling_config: DataLakeProfilerConfig, + report: DataLakeSourceReport, + file_path: str, + ): + self.spark = spark + self.dataframe = dataframe + self.analyzer = AnalysisRunner(spark).onData(dataframe) + self.column_specs = [] + self.row_count = dataframe.count() + self.profiling_config = profiling_config + self.file_path = file_path + self.columns_to_profile = [] + self.ignored_columns = [] + self.profile = DatasetProfileClass(timestampMillis=get_sys_time()) + self.report = report + + self.profile.rowCount = self.row_count + self.profile.columnCount = len(dataframe.columns) + + column_types = {x.name: x.dataType for x in dataframe.schema.fields} + + if self.profiling_config.profile_table_level_only: + return + + # get column distinct counts + for column in dataframe.columns: + if not self.profiling_config._allow_deny_patterns.allowed(column): + self.ignored_columns.append(column) + continue + + self.columns_to_profile.append(column) + # Normal CountDistinct is ridiculously slow + self.analyzer.addAnalyzer(ApproxCountDistinct(column)) + + if self.profiling_config.max_number_of_fields_to_profile is not None: + if ( + len(self.columns_to_profile) + > self.profiling_config.max_number_of_fields_to_profile + ): + columns_being_dropped = self.columns_to_profile[ + self.profiling_config.max_number_of_fields_to_profile : + ] + self.columns_to_profile = self.columns_to_profile[ + : self.profiling_config.max_number_of_fields_to_profile + ] + + self.report.report_file_dropped( + f"The max_number_of_fields_to_profile={self.profiling_config.max_number_of_fields_to_profile} reached. Profile of columns {self.file_path}({', '.join(sorted(columns_being_dropped))})" + ) + + analysis_result = self.analyzer.run() + analysis_metrics = AnalyzerContext.successMetricsAsJson( + self.spark, analysis_result + ) + + # reshape distinct counts into dictionary + column_distinct_counts = { + x["instance"]: int(x["value"]) + for x in analysis_metrics + if x["name"] == "ApproxCountDistinct" + } + + select_numeric_null_counts = [ + count( + when( + isnan(c) | col(c).isNull(), + c, + ) + ).alias(c) + for c in self.columns_to_profile + if column_types[column] in [DoubleType, FloatType] + ] + + # PySpark doesn't support isnan() on non-float/double columns + select_nonnumeric_null_counts = [ + count( + when( + col(c).isNull(), + c, + ) + ).alias(c) + for c in self.columns_to_profile + if column_types[column] not in [DoubleType, FloatType] + ] + + null_counts = dataframe.select( + select_numeric_null_counts + select_nonnumeric_null_counts + ) + column_null_counts = null_counts.toPandas().T[0].to_dict() + column_null_fractions = { + c: column_null_counts[c] / self.row_count if self.row_count != 0 else 0 + for c in self.columns_to_profile + } + column_nonnull_counts = { + c: self.row_count - column_null_counts[c] for c in self.columns_to_profile + } + + column_unique_proportions = { + c: ( + column_distinct_counts[c] / column_nonnull_counts[c] + if column_nonnull_counts[c] > 0 + else 0 + ) + for c in self.columns_to_profile + } + + if self.profiling_config.include_field_sample_values: + # take sample and convert to Pandas DataFrame + if self.row_count < NUM_SAMPLE_ROWS: + # if row count is less than number to sample, just take all rows + rdd_sample = dataframe.rdd.take(self.row_count) + else: + rdd_sample = dataframe.rdd.takeSample(False, NUM_SAMPLE_ROWS, seed=0) + + # init column specs with profiles + for column in self.columns_to_profile: + column_profile = DatasetFieldProfileClass(fieldPath=column) + + column_spec = _SingleColumnSpec(column, column_profile) + + column_profile.uniqueCount = column_distinct_counts.get(column) + column_profile.uniqueProportion = column_unique_proportions.get(column) + column_profile.nullCount = column_null_counts.get(column) + column_profile.nullProportion = column_null_fractions.get(column) + if self.profiling_config.include_field_sample_values: + column_profile.sampleValues = sorted( + [str(x[column]) for x in rdd_sample] + ) + + column_spec.type_ = column_types[column] + column_spec.cardinality = convert_to_cardinality( + column_distinct_counts[column], + column_null_fractions[column], + ) + + self.column_specs.append(column_spec) + + def prep_min_value(self, column: str) -> None: + if self.profiling_config.include_field_min_value: + self.analyzer.addAnalyzer(Minimum(column)) + + def prep_max_value(self, column: str) -> None: + if self.profiling_config.include_field_max_value: + self.analyzer.addAnalyzer(Maximum(column)) + + def prep_mean_value(self, column: str) -> None: + if self.profiling_config.include_field_mean_value: + self.analyzer.addAnalyzer(Mean(column)) + + def prep_median_value(self, column: str) -> None: + if self.profiling_config.include_field_median_value: + self.analyzer.addAnalyzer(ApproxQuantile(column, 0.5)) + + def prep_stdev_value(self, column: str) -> None: + if self.profiling_config.include_field_stddev_value: + self.analyzer.addAnalyzer(StandardDeviation(column)) + + def prep_quantiles(self, column: str) -> None: + if self.profiling_config.include_field_quantiles: + self.analyzer.addAnalyzer(ApproxQuantiles(column, QUANTILES)) + + def prep_distinct_value_frequencies(self, column: str) -> None: + if self.profiling_config.include_field_distinct_value_frequencies: + self.analyzer.addAnalyzer(Histogram(column)) + + def prep_field_histogram(self, column: str) -> None: + if self.profiling_config.include_field_histogram: + self.analyzer.addAnalyzer(Histogram(column, maxDetailBins=MAX_HIST_BINS)) + + def prepare_table_profiles(self) -> None: + row_count = self.row_count + + telemetry.telemetry_instance.ping( + "profile_data_lake_table", + {"rows_profiled": stats.discretize(row_count)}, + ) + + # loop through the columns and add the analyzers + for column_spec in self.column_specs: + column = column_spec.column + column_profile = column_spec.column_profile + type_ = column_spec.type_ + cardinality = column_spec.cardinality + + non_null_count = column_spec.non_null_count + unique_count = column_spec.unique_count + + if ( + self.profiling_config.include_field_null_count + and non_null_count is not None + ): + null_count = row_count - non_null_count + assert null_count >= 0 + column_profile.nullCount = null_count + if row_count > 0: + column_profile.nullProportion = null_count / row_count + + if unique_count is not None: + column_profile.uniqueCount = unique_count + if non_null_count is not None and non_null_count > 0: + column_profile.uniqueProportion = unique_count / non_null_count + + if isinstance( + type_, + ( + DecimalType, + DoubleType, + FloatType, + IntegerType, + LongType, + ShortType, + ), + ): + if cardinality == Cardinality.UNIQUE: + pass + elif cardinality in [ + Cardinality.ONE, + Cardinality.TWO, + Cardinality.VERY_FEW, + Cardinality.FEW, + ]: + column_spec.histogram_distinct = True + self.prep_distinct_value_frequencies(column) + elif cardinality in [ + Cardinality.MANY, + Cardinality.VERY_MANY, + Cardinality.UNIQUE, + ]: + column_spec.histogram_distinct = False + self.prep_min_value(column) + self.prep_max_value(column) + self.prep_mean_value(column) + self.prep_median_value(column) + self.prep_stdev_value(column) + self.prep_quantiles(column) + self.prep_field_histogram(column) + else: # unknown cardinality - skip + pass + + elif isinstance(type_, StringType): + if cardinality in [ + Cardinality.ONE, + Cardinality.TWO, + Cardinality.VERY_FEW, + Cardinality.FEW, + ]: + column_spec.histogram_distinct = True + self.prep_distinct_value_frequencies( + column, + ) + + elif isinstance(type_, (DateType, TimestampType)): + self.prep_min_value(column) + self.prep_max_value(column) + + # FIXME: Re-add histogram once kl_divergence has been modified to support datetimes + + if cardinality in [ + Cardinality.ONE, + Cardinality.TWO, + Cardinality.VERY_FEW, + Cardinality.FEW, + ]: + self.prep_distinct_value_frequencies( + column, + ) + + def extract_table_profiles( + self, + analysis_metrics: DataFrame, + ) -> None: + self.profile.fieldProfiles = [] + + analysis_metrics = analysis_metrics.toPandas() + # DataFrame with following columns: + # entity: "Column" for column profile, "Table" for table profile + # instance: name of column being profiled. "*" for table profiles + # name: name of metric. Histogram metrics are formatted as "Histogram.." + # value: value of metric + + column_metrics = analysis_metrics[analysis_metrics["entity"] == "Column"] + + # resolve histogram types for grouping + column_metrics["kind"] = column_metrics["name"].apply( + lambda x: "Histogram" if x.startswith("Histogram.") else x + ) + + column_histogram_metrics = column_metrics[column_metrics["kind"] == "Histogram"] + column_nonhistogram_metrics = column_metrics[ + column_metrics["kind"] != "Histogram" + ] + + histogram_columns = set() + + if len(column_histogram_metrics) > 0: + # we only want the absolute counts for each histogram for now + column_histogram_metrics = column_histogram_metrics[ + column_histogram_metrics["name"].apply( + lambda x: x.startswith("Histogram.abs.") + ) + ] + # get the histogram bins by chopping off the "Histogram.abs." prefix + column_histogram_metrics["bin"] = column_histogram_metrics["name"].apply( + lambda x: x[14:] + ) + + # reshape histogram counts for easier access + histogram_counts = column_histogram_metrics.set_index(["instance", "bin"])[ + "value" + ] + + histogram_columns = set(histogram_counts.index.get_level_values(0)) + + profiled_columns = set() + + if len(column_nonhistogram_metrics) > 0: + # reshape other metrics for easier access + nonhistogram_metrics = column_nonhistogram_metrics.set_index( + ["instance", "name"] + )["value"] + + profiled_columns = set(nonhistogram_metrics.index.get_level_values(0)) + # histogram_columns = set(histogram_counts.index.get_level_values(0)) + + for column_spec in self.column_specs: + column = column_spec.column + column_profile = column_spec.column_profile + + if column not in profiled_columns: + continue + + # convert to Dict so we can use .get + deequ_column_profile = nonhistogram_metrics.loc[column].to_dict() + + # uniqueCount, uniqueProportion, nullCount, nullProportion, sampleValues already set in TableWrapper + column_profile.min = null_str(deequ_column_profile.get("Minimum")) + column_profile.max = null_str(deequ_column_profile.get("Maximum")) + column_profile.mean = null_str(deequ_column_profile.get("Mean")) + column_profile.median = null_str( + deequ_column_profile.get("ApproxQuantiles-0.5") + ) + column_profile.stdev = null_str( + deequ_column_profile.get("StandardDeviation") + ) + if all( + deequ_column_profile.get(f"ApproxQuantiles-{quantile}") is not None + for quantile in QUANTILES + ): + column_profile.quantiles = [ + QuantileClass( + quantile=str(quantile), + value=str(deequ_column_profile[f"ApproxQuantiles-{quantile}"]), + ) + for quantile in QUANTILES + ] + + if column in histogram_columns: + column_histogram = histogram_counts.loc[column] + # sort so output is deterministic + column_histogram = column_histogram.sort_index() + + if column_spec.histogram_distinct: + column_profile.distinctValueFrequencies = [ + ValueFrequencyClass( + value=value, frequency=int(column_histogram.loc[value]) + ) + for value in column_histogram.index + ] + # sort so output is deterministic + column_profile.distinctValueFrequencies = sorted( + column_profile.distinctValueFrequencies, key=lambda x: x.value + ) + + else: + column_profile.histogram = HistogramClass( + [str(x) for x in column_histogram.index], + [float(x) for x in column_histogram], + ) + + # append the column profile to the dataset profile + self.profile.fieldProfiles.append(column_profile) diff --git a/metadata-ingestion/src/datahub/ingestion/source/abs/report.py b/metadata-ingestion/src/datahub/ingestion/source/abs/report.py new file mode 100644 index 00000000000000..c24e2f97060916 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/abs/report.py @@ -0,0 +1,19 @@ +import dataclasses +from dataclasses import field as dataclass_field +from typing import List + +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityRemovalSourceReport, +) + + +@dataclasses.dataclass +class DataLakeSourceReport(StaleEntityRemovalSourceReport): + files_scanned = 0 + filtered: List[str] = dataclass_field(default_factory=list) + + def report_file_scanned(self) -> None: + self.files_scanned += 1 + + def report_file_dropped(self, file: str) -> None: + self.filtered.append(file) diff --git a/metadata-ingestion/src/datahub/ingestion/source/abs/source.py b/metadata-ingestion/src/datahub/ingestion/source/abs/source.py new file mode 100644 index 00000000000000..07cc694e1b1628 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/abs/source.py @@ -0,0 +1,700 @@ +import dataclasses +import functools +import logging +import os +import pathlib +import re +import time +from collections import OrderedDict +from datetime import datetime +from pathlib import PurePath +from typing import Any, Dict, Iterable, List, Optional, Tuple + +import smart_open.compression as so_compression +from more_itertools import peekable +from pyspark.sql.types import ( + ArrayType, + BinaryType, + BooleanType, + ByteType, + DateType, + DecimalType, + DoubleType, + FloatType, + IntegerType, + LongType, + MapType, + NullType, + ShortType, + StringType, + StructField, + StructType, + TimestampType, +) +from smart_open import open as smart_open + +from datahub.emitter.mce_builder import ( + make_data_platform_urn, + make_dataplatform_instance_urn, + make_dataset_urn_with_platform_instance, +) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SourceCapability, + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec +from datahub.ingestion.source.abs.report import DataLakeSourceReport +from datahub.ingestion.source.azure.abs_util import ( + get_abs_properties, + get_abs_tags, + get_container_name, + get_container_relative_path, + get_key_prefix, + list_folders, + strip_abs_prefix, +) +from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator +from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityRemovalHandler, +) +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionSourceBase, +) +from datahub.metadata.com.linkedin.pegasus2avro.schema import ( + BooleanTypeClass, + BytesTypeClass, + DateTypeClass, + NullTypeClass, + NumberTypeClass, + RecordTypeClass, + SchemaField, + SchemaFieldDataType, + SchemaMetadata, + StringTypeClass, + TimeTypeClass, +) +from datahub.metadata.schema_classes import ( + DataPlatformInstanceClass, + DatasetPropertiesClass, + MapTypeClass, + OperationClass, + OperationTypeClass, + OtherSchemaClass, + _Aspect, +) +from datahub.telemetry import telemetry +from datahub.utilities.perf_timer import PerfTimer + +# hide annoying debug errors from py4j +logging.getLogger("py4j").setLevel(logging.ERROR) +logger: logging.Logger = logging.getLogger(__name__) + +# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html +_field_type_mapping = { + NullType: NullTypeClass, + StringType: StringTypeClass, + BinaryType: BytesTypeClass, + BooleanType: BooleanTypeClass, + DateType: DateTypeClass, + TimestampType: TimeTypeClass, + DecimalType: NumberTypeClass, + DoubleType: NumberTypeClass, + FloatType: NumberTypeClass, + ByteType: BytesTypeClass, + IntegerType: NumberTypeClass, + LongType: NumberTypeClass, + ShortType: NumberTypeClass, + ArrayType: NullTypeClass, + MapType: MapTypeClass, + StructField: RecordTypeClass, + StructType: RecordTypeClass, +} +PAGE_SIZE = 1000 + +# Hack to support the .gzip extension with smart_open. +so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"]) + + +def get_column_type( + report: SourceReport, dataset_name: str, column_type: str +) -> SchemaFieldDataType: + """ + Maps known Spark types to datahub types + """ + TypeClass: Any = None + + for field_type, type_class in _field_type_mapping.items(): + if isinstance(column_type, field_type): + TypeClass = type_class + break + + # if still not found, report the warning + if TypeClass is None: + report.report_warning( + dataset_name, f"unable to map type {column_type} to metadata schema" + ) + TypeClass = NullTypeClass + + return SchemaFieldDataType(type=TypeClass()) + + +# config flags to emit telemetry for +config_options_to_report = [ + "platform", + "use_relative_path", + "ignore_dotfiles", +] + + +def partitioned_folder_comparator(folder1: str, folder2: str) -> int: + # Try to convert to number and compare if the folder name is a number + try: + # Stripping = from the folder names as it most probably partition name part like year=2021 + if "=" in folder1 and "=" in folder2: + if folder1.rsplit("=", 1)[0] == folder2.rsplit("=", 1)[0]: + folder1 = folder1.rsplit("=", 1)[-1] + folder2 = folder2.rsplit("=", 1)[-1] + + num_folder1 = int(folder1) + num_folder2 = int(folder2) + if num_folder1 == num_folder2: + return 0 + else: + return 1 if num_folder1 > num_folder2 else -1 + except Exception: + # If folder name is not a number then do string comparison + if folder1 == folder2: + return 0 + else: + return 1 if folder1 > folder2 else -1 + + +@dataclasses.dataclass +class TableData: + display_name: str + is_abs: bool + full_path: str + rel_path: str + partitions: Optional[OrderedDict] + timestamp: datetime + table_path: str + size_in_bytes: int + number_of_files: int + + +@platform_name("ABS Data Lake", id="abs") +@config_class(DataLakeSourceConfig) +@support_status(SupportStatus.INCUBATING) +@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") +@capability(SourceCapability.TAGS, "Can extract ABS object/container tags if enabled") +@capability( + SourceCapability.DELETION_DETECTION, + "Optionally enabled via `stateful_ingestion.remove_stale_metadata`", + supported=True, +) +class ABSSource(StatefulIngestionSourceBase): + source_config: DataLakeSourceConfig + report: DataLakeSourceReport + profiling_times_taken: List[float] + container_WU_creator: ContainerWUCreator + + def __init__(self, config: DataLakeSourceConfig, ctx: PipelineContext): + super().__init__(config, ctx) + self.source_config = config + self.report = DataLakeSourceReport() + self.profiling_times_taken = [] + config_report = { + config_option: config.dict().get(config_option) + for config_option in config_options_to_report + } + config_report = { + **config_report, + "profiling_enabled": config.is_profiling_enabled(), + } + + telemetry.telemetry_instance.ping( + "data_lake_config", + config_report, + ) + + @classmethod + def create(cls, config_dict, ctx): + config = DataLakeSourceConfig.parse_obj(config_dict) + + return cls(config, ctx) + + def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: + if self.is_abs_platform(): + if self.source_config.azure_config is None: + raise ValueError("Azure config is required for ABS file sources") + + abs_client = self.source_config.azure_config.get_blob_service_client() + file = smart_open( + f"azure://{self.source_config.azure_config.container_name}/{table_data.rel_path}", + "rb", + transport_params={"client": abs_client}, + ) + else: + # We still use smart_open here to take advantage of the compression + # capabilities of smart_open. + file = smart_open(table_data.full_path, "rb") + + fields = [] + + extension = pathlib.Path(table_data.full_path).suffix + from datahub.ingestion.source.data_lake_common.path_spec import ( + SUPPORTED_COMPRESSIONS, + ) + + if path_spec.enable_compression and (extension[1:] in SUPPORTED_COMPRESSIONS): + # Removing the compression extension and using the one before that like .json.gz -> .json + extension = pathlib.Path(table_data.full_path).with_suffix("").suffix + if extension == "" and path_spec.default_extension: + extension = f".{path_spec.default_extension}" + + try: + if extension == ".parquet": + fields = parquet.ParquetInferrer().infer_schema(file) + elif extension == ".csv": + fields = csv_tsv.CsvInferrer( + max_rows=self.source_config.max_rows + ).infer_schema(file) + elif extension == ".tsv": + fields = csv_tsv.TsvInferrer( + max_rows=self.source_config.max_rows + ).infer_schema(file) + elif extension == ".json": + fields = json.JsonInferrer().infer_schema(file) + elif extension == ".avro": + fields = avro.AvroInferrer().infer_schema(file) + else: + self.report.report_warning( + table_data.full_path, + f"file {table_data.full_path} has unsupported extension", + ) + file.close() + except Exception as e: + self.report.report_warning( + table_data.full_path, + f"could not infer schema for file {table_data.full_path}: {e}", + ) + file.close() + logger.debug(f"Extracted fields in schema: {fields}") + fields = sorted(fields, key=lambda f: f.fieldPath) + + if self.source_config.add_partition_columns_to_schema: + self.add_partition_columns_to_schema( + fields=fields, path_spec=path_spec, full_path=table_data.full_path + ) + + return fields + + def add_partition_columns_to_schema( + self, path_spec: PathSpec, full_path: str, fields: List[SchemaField] + ) -> None: + vars = path_spec.get_named_vars(full_path) + if vars is not None and "partition" in vars: + for partition in vars["partition"].values(): + partition_arr = partition.split("=") + if len(partition_arr) != 2: + logger.debug( + f"Could not derive partition key from partition field {partition}" + ) + continue + partition_key = partition_arr[0] + fields.append( + SchemaField( + fieldPath=f"{partition_key}", + nativeDataType="string", + type=SchemaFieldDataType(StringTypeClass()), + isPartitioningKey=True, + nullable=True, + recursive=False, + ) + ) + + def _create_table_operation_aspect(self, table_data: TableData) -> OperationClass: + reported_time = int(time.time() * 1000) + + operation = OperationClass( + timestampMillis=reported_time, + lastUpdatedTimestamp=int(table_data.timestamp.timestamp() * 1000), + operationType=OperationTypeClass.UPDATE, + ) + + return operation + + def ingest_table( + self, table_data: TableData, path_spec: PathSpec + ) -> Iterable[MetadataWorkUnit]: + aspects: List[Optional[_Aspect]] = [] + + logger.info(f"Extracting table schema from file: {table_data.full_path}") + browse_path: str = ( + strip_abs_prefix(table_data.table_path) + if self.is_abs_platform() + else table_data.table_path.strip("/") + ) + + data_platform_urn = make_data_platform_urn(self.source_config.platform) + logger.info(f"Creating dataset urn with name: {browse_path}") + dataset_urn = make_dataset_urn_with_platform_instance( + self.source_config.platform, + browse_path, + self.source_config.platform_instance, + self.source_config.env, + ) + + if self.source_config.platform_instance: + data_platform_instance = DataPlatformInstanceClass( + platform=data_platform_urn, + instance=make_dataplatform_instance_urn( + self.source_config.platform, self.source_config.platform_instance + ), + ) + aspects.append(data_platform_instance) + + container = get_container_name(table_data.table_path) + key_prefix = ( + get_key_prefix(table_data.table_path) + if table_data.full_path == table_data.table_path + else None + ) + + custom_properties = get_abs_properties( + container, + key_prefix, + full_path=str(table_data.full_path), + number_of_files=table_data.number_of_files, + size_in_bytes=table_data.size_in_bytes, + sample_files=path_spec.sample_files, + azure_config=self.source_config.azure_config, + use_abs_container_properties=self.source_config.use_abs_container_properties, + use_abs_blob_properties=self.source_config.use_abs_blob_properties, + ) + + dataset_properties = DatasetPropertiesClass( + description="", + name=table_data.display_name, + customProperties=custom_properties, + ) + aspects.append(dataset_properties) + if table_data.size_in_bytes > 0: + try: + fields = self.get_fields(table_data, path_spec) + schema_metadata = SchemaMetadata( + schemaName=table_data.display_name, + platform=data_platform_urn, + version=0, + hash="", + fields=fields, + platformSchema=OtherSchemaClass(rawSchema=""), + ) + aspects.append(schema_metadata) + except Exception as e: + logger.error( + f"Failed to extract schema from file {table_data.full_path}. The error was:{e}" + ) + else: + logger.info( + f"Skipping schema extraction for empty file {table_data.full_path}" + ) + + if ( + self.source_config.use_abs_container_properties + or self.source_config.use_abs_blob_tags + ): + abs_tags = get_abs_tags( + container, + key_prefix, + dataset_urn, + self.source_config.azure_config, + self.ctx, + self.source_config.use_abs_blob_tags, + ) + if abs_tags: + aspects.append(abs_tags) + + operation = self._create_table_operation_aspect(table_data) + aspects.append(operation) + for mcp in MetadataChangeProposalWrapper.construct_many( + entityUrn=dataset_urn, + aspects=aspects, + ): + yield mcp.as_workunit() + + yield from self.container_WU_creator.create_container_hierarchy( + table_data.table_path, dataset_urn + ) + + def get_prefix(self, relative_path: str) -> str: + index = re.search(r"[\*|\{]", relative_path) + if index: + return relative_path[: index.start()] + else: + return relative_path + + def extract_table_name(self, path_spec: PathSpec, named_vars: dict) -> str: + if path_spec.table_name is None: + raise ValueError("path_spec.table_name is not set") + return path_spec.table_name.format_map(named_vars) + + def extract_table_data( + self, + path_spec: PathSpec, + path: str, + rel_path: str, + timestamp: datetime, + size: int, + ) -> TableData: + logger.debug(f"Getting table data for path: {path}") + table_name, table_path = path_spec.extract_table_name_and_path(path) + table_data = TableData( + display_name=table_name, + is_abs=self.is_abs_platform(), + full_path=path, + rel_path=rel_path, + partitions=None, + timestamp=timestamp, + table_path=table_path, + number_of_files=1, + size_in_bytes=size, + ) + return table_data + + def resolve_templated_folders( + self, container_name: str, prefix: str + ) -> Iterable[str]: + folder_split: List[str] = prefix.split("*", 1) + # If the len of split is 1 it means we don't have * in the prefix + if len(folder_split) == 1: + yield prefix + return + + folders: Iterable[str] = list_folders( + container_name, folder_split[0], self.source_config.azure_config + ) + for folder in folders: + yield from self.resolve_templated_folders( + container_name, f"{folder}{folder_split[1]}" + ) + + def get_dir_to_process( + self, + container_name: str, + folder: str, + path_spec: PathSpec, + protocol: str, + ) -> str: + iterator = list_folders( + container_name=container_name, + prefix=folder, + azure_config=self.source_config.azure_config, + ) + iterator = peekable(iterator) + if iterator: + sorted_dirs = sorted( + iterator, + key=functools.cmp_to_key(partitioned_folder_comparator), + reverse=True, + ) + for dir in sorted_dirs: + if path_spec.dir_allowed(f"{protocol}{container_name}/{dir}/"): + return self.get_dir_to_process( + container_name=container_name, + folder=dir + "/", + path_spec=path_spec, + protocol=protocol, + ) + return folder + else: + return folder + + def abs_browser( + self, path_spec: PathSpec, sample_size: int + ) -> Iterable[Tuple[str, str, datetime, int]]: + if self.source_config.azure_config is None: + raise ValueError("azure_config not set. Cannot browse Azure Blob Storage") + abs_blob_service_client = ( + self.source_config.azure_config.get_blob_service_client() + ) + container_client = abs_blob_service_client.get_container_client( + self.source_config.azure_config.container_name + ) + + container_name = self.source_config.azure_config.container_name + logger.debug(f"Scanning container: {container_name}") + + prefix = self.get_prefix(get_container_relative_path(path_spec.include)) + logger.debug(f"Scanning objects with prefix:{prefix}") + + matches = re.finditer(r"{\s*\w+\s*}", path_spec.include, re.MULTILINE) + matches_list = list(matches) + if matches_list and path_spec.sample_files: + max_start: int = -1 + include: str = path_spec.include + max_match: str = "" + for match in matches_list: + pos = include.find(match.group()) + if pos > max_start: + if max_match: + include = include.replace(max_match, "*") + max_start = match.start() + max_match = match.group() + + table_index = include.find(max_match) + + for folder in self.resolve_templated_folders( + container_name, + get_container_relative_path(include[:table_index]), + ): + try: + for f in list_folders( + container_name, f"{folder}", self.source_config.azure_config + ): + logger.info(f"Processing folder: {f}") + protocol = ContainerWUCreator.get_protocol(path_spec.include) + dir_to_process = self.get_dir_to_process( + container_name=container_name, + folder=f + "/", + path_spec=path_spec, + protocol=protocol, + ) + logger.info(f"Getting files from folder: {dir_to_process}") + dir_to_process = dir_to_process.rstrip("\\") + for obj in container_client.list_blobs( + name_starts_with=f"{dir_to_process}", + results_per_page=PAGE_SIZE, + ): + abs_path = self.create_abs_path(obj.name) + logger.debug(f"Sampling file: {abs_path}") + yield abs_path, obj.name, obj.last_modified, obj.size, + except Exception as e: + # This odd check if being done because boto does not have a proper exception to catch + # The exception that appears in stacktrace cannot actually be caught without a lot more work + # https://github.com/boto/boto3/issues/1195 + if "NoSuchBucket" in repr(e): + logger.debug( + f"Got NoSuchBucket exception for {container_name}", e + ) + self.get_report().report_warning( + "Missing bucket", f"No bucket found {container_name}" + ) + else: + raise e + else: + logger.debug( + "No template in the pathspec can't do sampling, fallbacking to do full scan" + ) + path_spec.sample_files = False + for obj in container_client.list_blobs( + prefix=f"{prefix}", results_per_page=PAGE_SIZE + ): + abs_path = self.create_abs_path(obj.name) + logger.debug(f"Path: {abs_path}") + # the following line if using the file_system_client + # yield abs_path, obj.last_modified, obj.content_length, + yield abs_path, obj.name, obj.last_modified, obj.size + + def create_abs_path(self, key: str) -> str: + if self.source_config.azure_config: + account_name = self.source_config.azure_config.account_name + container_name = self.source_config.azure_config.container_name + return ( + f"https://{account_name}.blob.core.windows.net/{container_name}/{key}" + ) + return "" + + def local_browser( + self, path_spec: PathSpec + ) -> Iterable[Tuple[str, str, datetime, int]]: + prefix = self.get_prefix(path_spec.include) + if os.path.isfile(prefix): + logger.debug(f"Scanning single local file: {prefix}") + file_name = prefix + yield prefix, file_name, datetime.utcfromtimestamp( + os.path.getmtime(prefix) + ), os.path.getsize(prefix) + else: + logger.debug(f"Scanning files under local folder: {prefix}") + for root, dirs, files in os.walk(prefix): + dirs.sort(key=functools.cmp_to_key(partitioned_folder_comparator)) + + for file in sorted(files): + # We need to make sure the path is in posix style which is not true on windows + full_path = PurePath( + os.path.normpath(os.path.join(root, file)) + ).as_posix() + yield full_path, file, datetime.utcfromtimestamp( + os.path.getmtime(full_path) + ), os.path.getsize(full_path) + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + self.container_WU_creator = ContainerWUCreator( + self.source_config.platform, + self.source_config.platform_instance, + self.source_config.env, + ) + with PerfTimer(): + assert self.source_config.path_specs + for path_spec in self.source_config.path_specs: + file_browser = ( + self.abs_browser( + path_spec, self.source_config.number_of_files_to_sample + ) + if self.is_abs_platform() + else self.local_browser(path_spec) + ) + table_dict: Dict[str, TableData] = {} + for file, name, timestamp, size in file_browser: + if not path_spec.allowed(file): + continue + table_data = self.extract_table_data( + path_spec, file, name, timestamp, size + ) + if table_data.table_path not in table_dict: + table_dict[table_data.table_path] = table_data + else: + table_dict[table_data.table_path].number_of_files = ( + table_dict[table_data.table_path].number_of_files + 1 + ) + table_dict[table_data.table_path].size_in_bytes = ( + table_dict[table_data.table_path].size_in_bytes + + table_data.size_in_bytes + ) + if ( + table_dict[table_data.table_path].timestamp + < table_data.timestamp + ) and (table_data.size_in_bytes > 0): + table_dict[ + table_data.table_path + ].full_path = table_data.full_path + table_dict[ + table_data.table_path + ].timestamp = table_data.timestamp + + for guid, table_data in table_dict.items(): + yield from self.ingest_table(table_data, path_spec) + + def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: + return [ + *super().get_workunit_processors(), + StaleEntityRemovalHandler.create( + self, self.source_config, self.ctx + ).workunit_processor, + ] + + def is_abs_platform(self): + return self.source_config.platform == "abs" + + def get_report(self): + return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/azure/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/azure/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source/azure/abs_util.py b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_util.py new file mode 100644 index 00000000000000..34faa0f0979eff --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_util.py @@ -0,0 +1,286 @@ +import logging +import os +import re +from typing import Dict, Iterable, List, Optional + +from azure.storage.blob import BlobProperties + +from datahub.emitter.mce_builder import make_tag_urn +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig +from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass + +ABS_PREFIXES_REGEX = re.compile( + r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)" +) + +logging.getLogger("py4j").setLevel(logging.ERROR) +logger: logging.Logger = logging.getLogger(__name__) + + +def is_abs_uri(uri: str) -> bool: + return bool(ABS_PREFIXES_REGEX.match(uri)) + + +def get_abs_prefix(abs_uri: str) -> Optional[str]: + result = re.search(ABS_PREFIXES_REGEX, abs_uri) + if result and result.groups(): + return result.group(1) + return None + + +def strip_abs_prefix(abs_uri: str) -> str: + # remove abs prefix https://.blob.core.windows.net + abs_prefix = get_abs_prefix(abs_uri) + if not abs_prefix: + raise ValueError( + f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" + ) + length_abs_prefix = len(abs_prefix) + return abs_uri[length_abs_prefix:] + + +def make_abs_urn(abs_uri: str, env: str) -> str: + abs_name = strip_abs_prefix(abs_uri) + + if abs_name.endswith("/"): + abs_name = abs_name[:-1] + + name, extension = os.path.splitext(abs_name) + + if extension != "": + extension = extension[1:] # remove the dot + return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})" + + return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})" + + +def get_container_name(abs_uri: str) -> str: + if not is_abs_uri(abs_uri): + raise ValueError( + f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" + ) + return strip_abs_prefix(abs_uri).split("/")[0] + + +def get_key_prefix(abs_uri: str) -> str: + if not is_abs_uri(abs_uri): + raise ValueError( + f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" + ) + return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1] + + +def get_abs_properties( + container_name: str, + blob_name: Optional[str], + full_path: str, + number_of_files: int, + size_in_bytes: int, + sample_files: bool, + azure_config: Optional[AzureConnectionConfig], + use_abs_container_properties: Optional[bool] = False, + use_abs_blob_properties: Optional[bool] = False, +) -> Dict[str, str]: + if azure_config is None: + raise ValueError( + "Azure configuration is not provided. Cannot retrieve container client." + ) + + blob_service_client = azure_config.get_blob_service_client() + container_client = blob_service_client.get_container_client( + container=container_name + ) + + custom_properties = {"schema_inferred_from": full_path} + if not sample_files: + custom_properties.update( + { + "number_of_files": str(number_of_files), + "size_in_bytes": str(size_in_bytes), + } + ) + + if use_abs_blob_properties and blob_name is not None: + blob_client = container_client.get_blob_client(blob=blob_name) + blob_properties = blob_client.get_blob_properties() + if blob_properties: + create_properties( + data=blob_properties, + prefix="blob", + custom_properties=custom_properties, + resource_name=blob_name, + json_properties=[ + "metadata", + "content_settings", + "lease", + "copy", + "immutability_policy", + ], + ) + else: + logger.warning( + f"No blob properties found for container={container_name}, blob={blob_name}." + ) + + if use_abs_container_properties: + container_properties = container_client.get_container_properties() + if container_properties: + create_properties( + data=container_properties, + prefix="container", + custom_properties=custom_properties, + resource_name=container_name, + json_properties=["metadata"], + ) + else: + logger.warning( + f"No container properties found for container={container_name}." + ) + + return custom_properties + + +def add_property( + key: str, value: str, custom_properties: Dict[str, str], resource_name: str +) -> Dict[str, str]: + if key in custom_properties: + key = f"{key}_{resource_name}" + if value is not None: + custom_properties[key] = str(value) + return custom_properties + + +def create_properties( + data: BlobProperties, + prefix: str, + custom_properties: Dict[str, str], + resource_name: str, + json_properties: List[str], +) -> None: + for item in data.items(): + key = item[0] + transformed_key = f"{prefix}_{key}" + value = item[1] + if value is None: + continue + try: + # These are known properties with a json value, we process these recursively... + if key in json_properties: + create_properties( + data=value, + prefix=f"{prefix}_{key}", + custom_properties=custom_properties, + resource_name=resource_name, + json_properties=json_properties, + ) + else: + custom_properties = add_property( + key=transformed_key, + value=value, + custom_properties=custom_properties, + resource_name=resource_name, + ) + except Exception as exception: + logger.debug( + f"Could not create property {key} value {value}, from resource {resource_name}: {exception}." + ) + + +def get_abs_tags( + container_name: str, + key_name: Optional[str], + dataset_urn: str, + azure_config: Optional[AzureConnectionConfig], + ctx: PipelineContext, + use_abs_blob_tags: Optional[bool] = False, +) -> Optional[GlobalTagsClass]: + # Todo add the service_client, when building out this get_abs_tags + if azure_config is None: + raise ValueError( + "Azure configuration is not provided. Cannot retrieve container client." + ) + + tags_to_add: List[str] = [] + blob_service_client = azure_config.get_blob_service_client() + container_client = blob_service_client.get_container_client(container_name) + blob_client = container_client.get_blob_client(blob=key_name) + + if use_abs_blob_tags and key_name is not None: + tag_set = blob_client.get_blob_tags() + if tag_set: + tags_to_add.extend( + make_tag_urn(f"""{key}:{value}""") for key, value in tag_set.items() + ) + else: + # Unlike container tags, if an object does not have tags, it will just return an empty array + # as opposed to an exception. + logger.info(f"No tags found for container={container_name} key={key_name}") + + if len(tags_to_add) == 0: + return None + + if ctx.graph is not None: + logger.debug("Connected to DatahubApi, grabbing current tags to maintain.") + current_tags: Optional[GlobalTagsClass] = ctx.graph.get_aspect( + entity_urn=dataset_urn, + aspect_type=GlobalTagsClass, + ) + if current_tags: + tags_to_add.extend([current_tag.tag for current_tag in current_tags.tags]) + else: + logger.warning("Could not connect to DatahubApi. No current tags to maintain") + + # Sort existing tags + tags_to_add = sorted(list(set(tags_to_add))) + # Remove duplicate tags + new_tags = GlobalTagsClass( + tags=[TagAssociationClass(tag_to_add) for tag_to_add in tags_to_add] + ) + return new_tags + + +def list_folders( + container_name: str, prefix: str, azure_config: Optional[AzureConnectionConfig] +) -> Iterable[str]: + if azure_config is None: + raise ValueError( + "Azure configuration is not provided. Cannot retrieve container client." + ) + + abs_blob_service_client = azure_config.get_blob_service_client() + container_client = abs_blob_service_client.get_container_client(container_name) + + current_level = prefix.count("/") + blob_list = container_client.list_blobs(name_starts_with=prefix) + + this_dict = {} + for blob in blob_list: + blob_name = blob.name[: blob.name.rfind("/") + 1] + folder_structure_arr = blob_name.split("/") + + folder_name = "" + if len(folder_structure_arr) > current_level: + folder_name = f"{folder_name}/{folder_structure_arr[current_level]}" + else: + continue + + folder_name = folder_name[1 : len(folder_name)] + + if folder_name.endswith("/"): + folder_name = folder_name[:-1] + + if folder_name == "": + continue + + folder_name = f"{prefix}{folder_name}" + if folder_name in this_dict: + continue + else: + this_dict[folder_name] = folder_name + + yield f"{folder_name}" + + +def get_container_relative_path(abs_uri: str) -> str: + return "/".join(strip_abs_prefix(abs_uri).split("/")[1:]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/azure/azure_common.py b/metadata-ingestion/src/datahub/ingestion/source/azure/azure_common.py new file mode 100644 index 00000000000000..46de4e09d7ee5b --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/azure/azure_common.py @@ -0,0 +1,98 @@ +from typing import Dict, Optional, Union + +from azure.identity import ClientSecretCredential +from azure.storage.blob import BlobServiceClient +from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient +from pydantic import Field, root_validator + +from datahub.configuration import ConfigModel +from datahub.configuration.common import ConfigurationError + + +class AzureConnectionConfig(ConfigModel): + """ + Common Azure credentials config. + + https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python + """ + + base_path: str = Field( + default="/", + description="Base folder in hierarchical namespaces to start from.", + ) + container_name: str = Field( + description="Azure storage account container name.", + ) + account_name: str = Field( + description="Name of the Azure storage account. See [Microsoft official documentation on how to create a storage account.](https://docs.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account)", + ) + account_key: Optional[str] = Field( + description="Azure storage account access key that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**", + default=None, + ) + sas_token: Optional[str] = Field( + description="Azure storage account Shared Access Signature (SAS) token that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**", + default=None, + ) + client_secret: Optional[str] = Field( + description="Azure client secret that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**", + default=None, + ) + client_id: Optional[str] = Field( + description="Azure client (Application) ID required when a `client_secret` is used as a credential.", + default=None, + ) + tenant_id: Optional[str] = Field( + description="Azure tenant (Directory) ID required when a `client_secret` is used as a credential.", + default=None, + ) + + def get_abfss_url(self, folder_path: str = "") -> str: + if not folder_path.startswith("/"): + folder_path = f"/{folder_path}" + return f"abfss://{self.container_name}@{self.account_name}.dfs.core.windows.net{folder_path}" + + # TODO DEX-1010 + def get_filesystem_client(self) -> FileSystemClient: + return self.get_data_lake_service_client().get_file_system_client( + file_system=self.container_name + ) + + def get_blob_service_client(self): + return BlobServiceClient( + account_url=f"https://{self.account_name}.blob.core.windows.net", + credential=f"{self.get_credentials()}", + ) + + def get_data_lake_service_client(self) -> DataLakeServiceClient: + return DataLakeServiceClient( + account_url=f"https://{self.account_name}.dfs.core.windows.net", + credential=f"{self.get_credentials()}", + ) + + def get_credentials( + self, + ) -> Union[Optional[str], ClientSecretCredential]: + if self.client_id and self.client_secret and self.tenant_id: + return ClientSecretCredential( + tenant_id=self.tenant_id, + client_id=self.client_id, + client_secret=self.client_secret, + ) + return self.sas_token if self.sas_token is not None else self.account_key + + @root_validator() + def _check_credential_values(cls, values: Dict) -> Dict: + if ( + values.get("account_key") + or values.get("sas_token") + or ( + values.get("client_id") + and values.get("client_secret") + and values.get("tenant_id") + ) + ): + return values + raise ConfigurationError( + "credentials missing, requires one combination of account_key or sas_token or (client_id and client_secret and tenant_id)" + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index 84547efe37a62e..0d9fc8225532c9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -35,6 +35,7 @@ class DatasetContainerSubTypes(str, Enum): FOLDER = "Folder" S3_BUCKET = "S3 bucket" GCS_BUCKET = "GCS bucket" + ABS_CONTAINER = "ABS container" class BIContainerSubTypes(str, Enum): diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py index 5393dd4835d8c1..2ebdd2b4126bbd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py @@ -16,6 +16,12 @@ get_s3_prefix, is_s3_uri, ) +from datahub.ingestion.source.azure.abs_util import ( + get_abs_prefix, + get_container_name, + get_container_relative_path, + is_abs_uri, +) from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes from datahub.ingestion.source.gcs.gcs_utils import ( get_gcs_bucket_name, @@ -29,6 +35,7 @@ PLATFORM_S3 = "s3" PLATFORM_GCS = "gcs" +PLATFORM_ABS = "abs" class ContainerWUCreator: @@ -85,6 +92,8 @@ def get_protocol(path: str) -> str: protocol = get_s3_prefix(path) elif is_gcs_uri(path): protocol = get_gcs_prefix(path) + elif is_abs_uri(path): + protocol = get_abs_prefix(path) if protocol: return protocol @@ -99,7 +108,25 @@ def get_bucket_name(path: str) -> str: return get_bucket_name(path) elif is_gcs_uri(path): return get_gcs_bucket_name(path) - raise ValueError(f"Unable to get get bucket name form path: {path}") + elif is_abs_uri(path): + return get_container_name(path) + raise ValueError(f"Unable to get bucket name from path: {path}") + + def get_sub_types(self) -> str: + if self.platform == PLATFORM_S3: + return DatasetContainerSubTypes.S3_BUCKET + elif self.platform == PLATFORM_GCS: + return DatasetContainerSubTypes.GCS_BUCKET + elif self.platform == PLATFORM_ABS: + return DatasetContainerSubTypes.ABS_CONTAINER + raise ValueError(f"Unable to sub type for platform: {self.platform}") + + def get_base_full_path(self, path: str) -> str: + if self.platform == "s3" or self.platform == "gcs": + return get_bucket_relative_path(path) + elif self.platform == "abs": + return get_container_relative_path(path) + raise ValueError(f"Unable to get base full path from path: {path}") def create_container_hierarchy( self, path: str, dataset_urn: str @@ -107,22 +134,18 @@ def create_container_hierarchy( logger.debug(f"Creating containers for {dataset_urn}") base_full_path = path parent_key = None - if self.platform in (PLATFORM_S3, PLATFORM_GCS): + if self.platform in (PLATFORM_S3, PLATFORM_GCS, PLATFORM_ABS): bucket_name = self.get_bucket_name(path) bucket_key = self.gen_bucket_key(bucket_name) yield from self.create_emit_containers( container_key=bucket_key, name=bucket_name, - sub_types=[ - DatasetContainerSubTypes.S3_BUCKET - if self.platform == "s3" - else DatasetContainerSubTypes.GCS_BUCKET - ], + sub_types=[self.get_sub_types()], parent_container_key=None, ) parent_key = bucket_key - base_full_path = get_bucket_relative_path(path) + base_full_path = self.get_base_full_path(path) parent_folder_path = ( base_full_path[: base_full_path.rfind("/")] diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index 7a807bde2ed0ae..e21cdac1edf754 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -11,6 +11,7 @@ from datahub.configuration.common import ConfigModel from datahub.ingestion.source.aws.s3_util import is_s3_uri +from datahub.ingestion.source.azure.abs_util import is_abs_uri from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri # hide annoying debug errors from py4j @@ -107,7 +108,7 @@ def dir_allowed(self, path: str) -> bool: # glob_include = self.glob_include.rsplit("/", 1)[0] glob_include = self.glob_include - for i in range(slash_to_remove_from_glob): + for _ in range(slash_to_remove_from_glob): glob_include = glob_include.rsplit("/", 1)[0] logger.debug(f"Checking dir to inclusion: {path}") @@ -169,7 +170,8 @@ def validate_default_extension(cls, v): def turn_off_sampling_for_non_s3(cls, v, values): is_s3 = is_s3_uri(values.get("include") or "") is_gcs = is_gcs_uri(values.get("include") or "") - if not is_s3 and not is_gcs: + is_abs = is_abs_uri(values.get("include") or "") + if not is_s3 and not is_gcs and not is_abs: # Sampling only makes sense on s3 and gcs currently v = False return v @@ -213,6 +215,10 @@ def is_s3(self): def is_gcs(self): return is_gcs_uri(self.include) + @cached_property + def is_abs(self): + return is_abs_uri(self.include) + @cached_property def compiled_include(self): parsable_include = PathSpec.get_parsable_include(self.include)