Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli/delete): add delete by urn file option #12200

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 42 additions & 32 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ def undo_by_filter(
type=str,
help="Urn of the entity to delete, for single entity deletion",
)
@click.option(
"--urn-file",
required=True,
help="Absolute path of file with urns (one per line) to be deleted",
)
@click.option(
"-a",
"--aspect",
Expand Down Expand Up @@ -353,6 +358,7 @@ def undo_by_filter(
@telemetry.with_telemetry()
def by_filter(
urn: Optional[str],
urn_file: Optional[str],
aspect: Optional[str],
force: bool,
soft: bool,
Expand All @@ -373,11 +379,11 @@ def by_filter(
# Validate the cli arguments.
_validate_user_urn_and_filters(
urn=urn,
urn_file=urn_file,
entity_type=entity_type,
platform=platform,
env=env,
query=query,
recursive=recursive,
)
soft_delete_filter = _validate_user_soft_delete_flags(
soft=soft, aspect=aspect, only_soft_deleted=only_soft_deleted
Expand Down Expand Up @@ -407,11 +413,28 @@ def by_filter(
logger.info(f"Using {graph}")

# Determine which urns to delete.
delete_by_urn = bool(urn) and not recursive
if urn:
urns = [urn]

if recursive:
elif urn_file:
with open(urn_file, "r") as r:
urns = []
for line in r.readlines():
urn = line.strip().strip('"')
urns.append(urn)
else:
urns = list(
graph.get_urns_by_filter(
entity_types=[entity_type] if entity_type else None,
platform=platform,
env=env,
query=query,
status=soft_delete_filter,
batch_size=batch_size,
)
)
if recursive:
_validate_recursive_delete(urns)
for urn in urns:
# Add children urns to the list.
if guess_entity_type(urn) == "dataPlatformInstance":
urns.extend(
Expand All @@ -429,23 +452,13 @@ def by_filter(
batch_size=batch_size,
)
)
else:
urns = list(
graph.get_urns_by_filter(
entity_types=[entity_type] if entity_type else None,
platform=platform,
env=env,
query=query,
status=soft_delete_filter,
batch_size=batch_size,
)
if len(urns) == 0:
click.echo(
"Found no urns to delete. Maybe you want to change your filters to be something different?"
)
if len(urns) == 0:
click.echo(
"Found no urns to delete. Maybe you want to change your filters to be something different?"
)
return
return

delete_by_urn = len(urns) == 1
# Print out a summary of the urns to be deleted and confirm with the user.
if not delete_by_urn:
urns_by_type: Dict[str, List[str]] = {}
Expand Down Expand Up @@ -537,21 +550,21 @@ def process_urn(urn):

def _validate_user_urn_and_filters(
urn: Optional[str],
urn_file: Optional[str],
entity_type: Optional[str],
platform: Optional[str],
env: Optional[str],
query: Optional[str],
recursive: bool,
) -> None:
# Check urn / filters options.
if urn:
if entity_type or platform or env or query:
raise click.UsageError(
"You cannot provide both an urn and a filter rule (entity-type / platform / env / query)."
)
elif not urn and not (entity_type or platform or env or query):
elif not urn and not urn_file and not (entity_type or platform or env or query):
raise click.UsageError(
"You must provide either an urn or at least one filter (entity-type / platform / env / query) in order to delete entities."
"You must provide either an urn or urn file path or at least one filter (entity-type / platform / env / query) in order to delete entities."
)
elif query:
logger.warning(
Expand All @@ -562,20 +575,17 @@ def _validate_user_urn_and_filters(
f"Using --env without other filters will delete all metadata in the {env} environment. Please use with caution."
)

# Check recursive flag.
if recursive:
if not urn:
raise click.UsageError(
"The --recursive flag can only be used with a single urn."

def _validate_recursive_delete(urns: List[str]) -> None:
for urn in urns:
if guess_entity_type(urn) in _RECURSIVE_DELETE_TYPES:
logger.warning(
f"This will only delete {urn}. Use --recursive to delete all contained entities."
)
elif guess_entity_type(urn) not in _RECURSIVE_DELETE_TYPES:
else:
raise click.UsageError(
f"The --recursive flag can only be used with these entity types: {_RECURSIVE_DELETE_TYPES}."
)
elif urn and guess_entity_type(urn) in _RECURSIVE_DELETE_TYPES:
logger.warning(
f"This will only delete {urn}. Use --recursive to delete all contained entities."
)


def _validate_user_soft_delete_flags(
Expand Down
Loading