From 024d22bb1d4cbdc0ea06f23ce20560fc11b5adda Mon Sep 17 00:00:00 2001 From: Oscar SeungJun Park Date: Tue, 25 Feb 2025 23:57:24 +0900 Subject: [PATCH] feat(ingest/s3): add table filtering - add tables_pattern to path_spec - add table filtering to S3Source().s3_browser() --- .../source/data_lake_common/path_spec.py | 29 +- .../src/datahub/ingestion/source/s3/source.py | 10 + .../s3/golden_mces_allow_table.json | 641 +++++++++++++++++ .../s3/golden_mces_deny_table.json | 653 ++++++++++++++++++ .../s3/sources/s3/allow_table.json | 16 + .../integration/s3/sources/s3/deny_table.json | 16 + .../tests/unit/data_lake/test_path_spec.py | 81 +++ 7 files changed, 1445 insertions(+), 1 deletion(-) create mode 100644 metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_allow_table.json create mode 100644 metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_deny_table.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/s3/allow_table.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/s3/deny_table.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index 10dd9e9e7e029a..711c3681597900 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -11,7 +11,7 @@ from pydantic.fields import Field from wcmatch import pathlib -from datahub.configuration.common import ConfigModel +from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.ingestion.source.aws.s3_util import is_s3_uri from datahub.ingestion.source.azure.abs_utils import is_abs_uri from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri @@ -145,6 +145,11 @@ class Config: description="Include hidden folders in the traversal (folders starting with . or _", ) + tables_pattern: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns to filter tables for ingestion", + ) + def is_path_hidden(self, path: str) -> bool: # Split the path into directories and filename dirs, filename = os.path.split(path) @@ -177,6 +182,11 @@ def allowed(self, path: str, ignore_ext: bool = False) -> bool: ): return False logger.debug(f"{path} is not excluded") + + if not self.tables_pattern.allowed(self._get_table_name(path) or ""): + return False + logger.debug(f"{path} is passed table name check") + ext = os.path.splitext(path)[1].strip(".") if not ignore_ext: @@ -218,6 +228,11 @@ def dir_allowed(self, path: str) -> bool: exclude_path.rstrip("/"), flags=pathlib.GLOBSTAR ): return False + + if not self.tables_pattern.allowed(self._get_table_name(path) or ""): + return False + logger.debug(f"{path} is passed table name check") + return True @classmethod @@ -561,3 +576,15 @@ def extract_table_name_and_path(self, path: str) -> Tuple[str, str]: "/".join(path.split("/")[:depth]) + "/" + parsed_vars.named["table"] ) return self._extract_table_name(parsed_vars.named), table_path + + def _get_table_name(self, path: str) -> Optional[str]: + if "{table}" not in self.include: + return None + + table_idx = self.include.split("/").index("{table}") + path_items = path.rstrip("/").split("/") + + if table_idx >= len(path_items): + raise ValueError(f"Table not found in path: {path}") + + return path_items[table_idx] diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index b1554ad127b7ac..856be07efa4f10 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -966,6 +966,16 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePa for f in list_folders( bucket_name, f"{folder}", self.source_config.aws_config ): + table_name = f.split("/")[-1] + if not path_spec.tables_pattern.allowed(table_name): + logger.debug( + f"Table '{table_name}' not allowed and skipping" + ) + self.report.report_file_dropped( + self.create_s3_path(bucket_name, f) + ) + continue + dirs_to_process = [] logger.info(f"Processing folder: {f}") if path_spec.traversal_method == FolderTraversalMethod.ALL: diff --git a/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_allow_table.json b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_allow_table.json new file mode 100644 index 00000000000000..21b4f6d2622439 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_allow_table.json @@ -0,0 +1,641 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_csv,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "schema_inferred_from": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/food_csv/part3.csv" + }, + "name": "food_csv", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_csv,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "food_csv", + "platform": "urn:li:dataPlatform:s3", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "name", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "weight", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "height", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "color", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_csv,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1615443388097, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1586847820000 + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "env": "PROD", + "bucket_name": "my-test-bucket" + }, + "name": "my-test-bucket", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "S3 bucket" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "env": "PROD", + "folder_abs_path": "my-test-bucket/folder_a" + }, + "name": "folder_a", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "urn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "env": "PROD", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa" + }, + "name": "folder_aa", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "urn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + }, + { + "id": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "urn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "env": "PROD", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa/folder_aaa" + }, + "name": "folder_aaa", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "urn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + }, + { + "id": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "urn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c" + }, + { + "id": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "urn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_csv,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_csv,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_csv,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "urn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + }, + { + "id": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "urn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c" + }, + { + "id": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "urn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15" + }, + { + "id": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "urn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_csv,PROD)", + "changeType": "PATCH", + "aspectName": "datasetProperties", + "aspect": { + "json": [ + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1586847820000 + } + } + ] + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "allow_table.json", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_deny_table.json b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_deny_table.json new file mode 100644 index 00000000000000..3a052576837fb8 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_deny_table.json @@ -0,0 +1,653 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_parquet,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "schema_inferred_from": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/food_parquet/part2.parquet" + }, + "name": "food_parquet", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_parquet,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "food_parquet", + "platform": "urn:li:dataPlatform:s3", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "name", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "weight", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "int64", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "height", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "int64", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "color", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "healthy", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.BooleanType": {} + } + }, + "nativeDataType": "bool", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_parquet,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1615443388097, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1586847840000 + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "env": "PROD", + "bucket_name": "my-test-bucket" + }, + "name": "my-test-bucket", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "S3 bucket" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "env": "PROD", + "folder_abs_path": "my-test-bucket/folder_a" + }, + "name": "folder_a", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "urn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "env": "PROD", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa" + }, + "name": "folder_aa", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "urn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + }, + { + "id": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "urn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "env": "PROD", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa/folder_aaa" + }, + "name": "folder_aaa", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "urn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + }, + { + "id": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "urn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c" + }, + { + "id": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "urn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_parquet,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_parquet,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_parquet,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:2151647ff17bde0f948909d19fa91b9b", + "urn": "urn:li:container:2151647ff17bde0f948909d19fa91b9b" + }, + { + "id": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c", + "urn": "urn:li:container:a8aa32e8169b2ecc7ab4f3389c79124c" + }, + { + "id": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15", + "urn": "urn:li:container:4f62b9a3e6794ee2cd4160bc0bbd8e15" + }, + { + "id": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0", + "urn": "urn:li:container:5abb7acbb8783b9e2d266c15bf7cebc0" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,my-test-bucket/folder_a/folder_aa/folder_aaa/food_parquet,PROD)", + "changeType": "PATCH", + "aspectName": "datasetProperties", + "aspect": { + "json": [ + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1586847840000 + } + } + ] + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "deny_table.json", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/s3/allow_table.json b/metadata-ingestion/tests/integration/s3/sources/s3/allow_table.json new file mode 100644 index 00000000000000..1e05c4c568b913 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/s3/allow_table.json @@ -0,0 +1,16 @@ +{ + "type": "s3", + "config": { + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/*.*", + "tables_pattern": { + "allow": ["f.*csv"] + } + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + } + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/s3/deny_table.json b/metadata-ingestion/tests/integration/s3/sources/s3/deny_table.json new file mode 100644 index 00000000000000..2b724f8d910cf9 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/s3/deny_table.json @@ -0,0 +1,16 @@ +{ + "type": "s3", + "config": { + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/*.*", + "tables_pattern": { + "deny": ["f.*csv"] + } + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + } + } +} diff --git a/metadata-ingestion/tests/unit/data_lake/test_path_spec.py b/metadata-ingestion/tests/unit/data_lake/test_path_spec.py index 305f4b72f5329d..6e8b92c9f44ab6 100644 --- a/metadata-ingestion/tests/unit/data_lake/test_path_spec.py +++ b/metadata-ingestion/tests/unit/data_lake/test_path_spec.py @@ -1,5 +1,8 @@ +from typing import Optional + import pytest +from datahub.configuration.common import AllowDenyPattern from datahub.ingestion.source.data_lake_common.path_spec import PathSpec @@ -29,3 +32,81 @@ def test_allowed_ignores_depth_mismatch( # act, assert assert path_spec.allowed(s3_uri) == expected + + +@pytest.mark.parametrize( + "s3_uri, expected", + [ + ("s3://bucket/table-111/p1/test.csv", True), + ("s3://bucket/table-222/p1/test.csv", False), + ], +) +def test_allowed_tables_pattern_allow(s3_uri: str, expected: bool) -> None: + # arrange + path_spec = PathSpec( + include="s3://bucket/{table}/{partition0}/*.csv", + tables_pattern=AllowDenyPattern(allow=["t.*111"]), + ) + + # act, assert + assert path_spec.allowed(s3_uri) == expected + + +@pytest.mark.parametrize( + "s3_uri, expected", + [ + ("s3://bucket/table-111/p1/", True), + ("s3://bucket/table-222/p1/", False), + ], +) +def test_dir_allowed_tables_pattern_allow(s3_uri: str, expected: bool) -> None: + # arrange + path_spec = PathSpec( + include="s3://bucket/{table}/{partition0}/*.csv", + tables_pattern=AllowDenyPattern(allow=["t.*111"]), + ) + + # act, assert + assert path_spec.dir_allowed(s3_uri) == expected + + +@pytest.mark.parametrize( + "include, s3_uri, expected", + [ + ( + "s3://bucket/{table}/{partition0}/*.csv", + "s3://bucket/table/p1/test.csv", + "table", + ), + ( + "s3://bucket/data1/{partition0}/test.csv", + "s3://bucket/data1/p1/test.csv", + None, + ), + ], +) +def test_get_table_name(include: str, s3_uri: str, expected: Optional[str]) -> None: + # arrange + path_spec = PathSpec( + include=include, + ) + + # act, assert + assert path_spec._get_table_name(s3_uri) == expected + + +@pytest.mark.parametrize( + "s3_uri", + [ + "s3://bucket/dir/", + "s3://bucket/dir", + ], +) +def test_get_table_name_raises_error_table_not_found(s3_uri: str) -> None: + # arrange + path_spec = PathSpec(include="s3://bucket/dir1/{table}/{partition0}/*.csv") + + # act, assert + with pytest.raises(ValueError) as e: + path_spec._get_table_name(s3_uri) + assert str(e.value) == f"Table not found in path: {s3_uri}"