Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/s3): add table filtering #12661

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pydantic.fields import Field
from wcmatch import pathlib

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.azure.abs_utils import is_abs_uri
from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri
Expand Down Expand Up @@ -145,6 +145,11 @@ class Config:
description="Include hidden folders in the traversal (folders starting with . or _",
)

tables_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns to filter tables for ingestion",
)

def is_path_hidden(self, path: str) -> bool:
# Split the path into directories and filename
dirs, filename = os.path.split(path)
Expand Down Expand Up @@ -177,6 +182,11 @@ def allowed(self, path: str, ignore_ext: bool = False) -> bool:
):
return False
logger.debug(f"{path} is not excluded")

if not self.tables_pattern.allowed(self._get_table_name(path) or ""):
return False
logger.debug(f"{path} is passed table name check")

ext = os.path.splitext(path)[1].strip(".")

if not ignore_ext:
Expand Down Expand Up @@ -218,6 +228,11 @@ def dir_allowed(self, path: str) -> bool:
exclude_path.rstrip("/"), flags=pathlib.GLOBSTAR
):
return False

if not self.tables_pattern.allowed(self._get_table_name(path) or ""):
return False
logger.debug(f"{path} is passed table name check")

return True

@classmethod
Expand Down Expand Up @@ -561,3 +576,15 @@ def extract_table_name_and_path(self, path: str) -> Tuple[str, str]:
"/".join(path.split("/")[:depth]) + "/" + parsed_vars.named["table"]
)
return self._extract_table_name(parsed_vars.named), table_path

def _get_table_name(self, path: str) -> Optional[str]:
if "{table}" not in self.include:
return None

table_idx = self.include.split("/").index("{table}")
path_items = path.rstrip("/").split("/")

if table_idx >= len(path_items):
raise ValueError(f"Table not found in path: {path}")

return path_items[table_idx]
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,16 @@
for f in list_folders(
bucket_name, f"{folder}", self.source_config.aws_config
):
table_name = f.split("/")[-1]
if not path_spec.tables_pattern.allowed(table_name):
logger.debug(

Check warning on line 971 in metadata-ingestion/src/datahub/ingestion/source/s3/source.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/s3/source.py#L969-L971

Added lines #L969 - L971 were not covered by tests
f"Table '{table_name}' not allowed and skipping"
)
self.report.report_file_dropped(

Check warning on line 974 in metadata-ingestion/src/datahub/ingestion/source/s3/source.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/s3/source.py#L974

Added line #L974 was not covered by tests
self.create_s3_path(bucket_name, f)
)
continue

Check warning on line 977 in metadata-ingestion/src/datahub/ingestion/source/s3/source.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/s3/source.py#L977

Added line #L977 was not covered by tests

dirs_to_process = []
logger.info(f"Processing folder: {f}")
if path_spec.traversal_method == FolderTraversalMethod.ALL:
Expand Down
Loading
Loading