Merge branch 'datahub-project:master' into master
llance authored Jan 16, 2025
2 parents fc49916 + bfe9758 commit 61c7f97
Showing 27 changed files with 522 additions and 206 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -193,7 +193,7 @@ project.ext.externalDependency = [
'junitJupiterEngine': "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion",
// avro-serde includes dependencies for `kafka-avro-serializer` `kafka-schema-registry-client` and `avro`
'kafkaAvroSerde': "io.confluent:kafka-streams-avro-serde:$kafkaVersion",
'kafkaAvroSerializer': 'io.confluent:kafka-avro-serializer:5.1.4',
'kafkaAvroSerializer': "io.confluent:kafka-avro-serializer:$kafkaVersion",
'kafkaClients': "org.apache.kafka:kafka-clients:$kafkaVersion-ccs",
'snappy': 'org.xerial.snappy:snappy-java:1.1.10.5',
'logbackClassic': "ch.qos.logback:logback-classic:$logbackClassic",
8 changes: 4 additions & 4 deletions docs/cli.md
@@ -115,17 +115,17 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml --dry-run
datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml -n
```

#### ingest --list-source-runs
#### ingest list-source-runs

The `--list-source-runs` option of the `ingest` command lists the previous runs, displaying their run ID, source name,
The `list-source-runs` option of the `ingest` command lists the previous runs, displaying their run ID, source name,
start time, status, and source URN. Results can be filtered with the `--urn` option (filter by source URN) or the
`--source` option (filter by source name; partial or complete matches are supported).

```shell
# List all ingestion runs
datahub ingest --list-source-runs
datahub ingest list-source-runs
# Filter runs by a source name containing "demo"
datahub ingest --list-source-runs --source "demo"
datahub ingest list-source-runs --source "demo"
```
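
A hedged example of the `--urn` filter described above (the URN value is illustrative):

```shell
# Filter runs by a specific ingestion source URN (illustrative value)
datahub ingest list-source-runs --urn "urn:li:dataHubIngestionSource:demo-source"
```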

#### ingest --preview
6 changes: 3 additions & 3 deletions metadata-ingestion/build.gradle
@@ -109,15 +109,15 @@ task lint(type: Exec, dependsOn: installDev) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"black --check --diff src/ tests/ examples/ && " +
"isort --check --diff src/ tests/ examples/ && " +
"flake8 --count --statistics src/ tests/ examples/ && " +
"ruff check src/ tests/ examples/ && " +
"mypy --show-traceback --show-error-codes src/ tests/ examples/"
}

task lintFix(type: Exec, dependsOn: installDev) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"black src/ tests/ examples/ && " +
"isort src/ tests/ examples/"
"ruff check --fix src/ tests/ examples/"
}

def pytest_default_env = "PYTHONDEVMODE=1"
5 changes: 2 additions & 3 deletions metadata-ingestion/developing.md
@@ -177,13 +177,12 @@ The architecture of this metadata ingestion framework is heavily inspired by [Ap

## Code style

We use black, isort, flake8, and mypy to ensure consistent code style and quality.
We use black, ruff, and mypy to ensure consistent code style and quality.

```shell
# Assumes: pip install -e '.[dev]' and venv is activated
black src/ tests/
isort src/ tests/
flake8 src/ tests/
ruff check src/ tests/
mypy src/ tests/
```

74 changes: 65 additions & 9 deletions metadata-ingestion/pyproject.toml
@@ -9,15 +9,24 @@ extend-exclude = '''
^/tmp
'''
include = '\.pyi?$'
target-version = ['py37', 'py38', 'py39', 'py310']
target-version = ['py38', 'py39', 'py310', 'py311']

[tool.isort]
combine_as_imports = true
indent = ' '
known_future_library = ['__future__', 'datahub.utilities._markupsafe_compat', 'datahub.sql_parsing._sqlglot_patch']
profile = 'black'
sections = 'FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER'
skip_glob = 'src/datahub/metadata'
[tool.ruff.lint.isort]
combine-as-imports = true
known-first-party = ["datahub"]
extra-standard-library = ["__future__", "datahub.utilities._markupsafe_compat", "datahub.sql_parsing._sqlglot_patch"]
section-order = ["future", "standard-library", "third-party", "first-party", "local-folder"]
force-sort-within-sections = false
force-wrap-aliases = false
split-on-trailing-comma = false
order-by-type = true
relative-imports-order = "closest-to-furthest"
force-single-line = false
single-line-exclusions = ["typing"]
length-sort = false
from-first = false
required-imports = []
classes = ["typing"]

[tool.pyright]
extraPaths = ['tests']
@@ -26,6 +35,53 @@ extraPaths = ['tests']
exclude = ["src/datahub/metadata/"]
ignore_decorators = ["@click.*", "@validator", "@root_validator", "@pydantic.validator", "@pydantic.root_validator", "@pytest.fixture"]
ignore_names = ["*Source", "*Sink", "*Report"]
# min_confidence = 80
paths = ["src"]
sort_by_size = true

[tool.ruff]
# Same as Black.
line-length = 88
# Exclude directories matching these patterns.
exclude = [
".git",
"src/datahub/metadata",
"venv",
".tox",
"__pycache__",
]

[tool.ruff.lint]
select = [
"B",
"C90",
"E",
"F",
"I", # For isort
"TID",
]
ignore = [
# Ignore line length violations (handled by Black)
"E501",
# Ignore whitespace before ':' (matches Black)
"E203",
# Allow usages of functools.lru_cache
"B019",
# Allow function call in argument defaults
"B008",
# TODO: Enable these later
"B006", # Mutable args
"B007", # Unused loop control variable
"B017", # Do not assert blind exception
"B904", # Checks for raise statements in exception handlers that lack a from clause
]

[tool.ruff.lint.mccabe]
max-complexity = 20

[tool.ruff.lint.flake8-tidy-imports]
# Disallow all relative imports.
ban-relative-imports = "all"


[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
36 changes: 0 additions & 36 deletions metadata-ingestion/setup.cfg
@@ -1,39 +1,3 @@
[flake8]
max-complexity = 20
ignore =
# Ignore: line length issues, since black's formatter will take care of them.
E501,
# Ignore compound statements, since they're used for ellipsis by black
# See https://github.com/psf/black/issues/3887
E704,
# Ignore: 1 blank line required before class docstring.
D203,
# See https://stackoverflow.com/a/57074416.
W503,
# See https://github.com/psf/black/issues/315.
E203,
# Allow usages of functools.lru_cache.
B019,
# This rule flags the use of function calls in argument defaults.
# There's some good reasons to do this, so we're ok with it.
B008,
# TODO: However, we should enable B006 to catch issues with mutable args.
B006,
# TODO: Enable B007 - unused loop control variable.
B007
# TODO: Enable B902 - require self/cls naming.
# TODO: Enable B904 - use raise from in except clauses.
exclude =
.git,
src/datahub/metadata,
venv,
.tox,
__pycache__
per-file-ignores =
# imported but unused
__init__.py: F401, I250
ban-relative-imports = true

[mypy]
plugins =
./tests/test_helpers/sqlalchemy_mypy_plugin.py,
5 changes: 1 addition & 4 deletions metadata-ingestion/setup.py
@@ -593,10 +593,7 @@
# This is pinned only to avoid spurious errors in CI.
# We should make an effort to keep it up to date.
"black==23.3.0",
"flake8>=6.0.0",
"flake8-tidy-imports>=4.3.0",
"flake8-bugbear==23.3.12",
"isort>=5.7.0",
"ruff==0.9.1",
"mypy==1.10.1",
}

40 changes: 25 additions & 15 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -507,15 +507,11 @@ def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) ->
click.echo("No response received from the server.")
return

# when urn or source filter does not match, exit gracefully
if (
not isinstance(data.get("data"), dict)
or "listIngestionSources" not in data["data"]
):
click.echo("No matching ingestion sources found. Please check your filters.")
return
# a lot of responses can be null if there are errors in the run
ingestion_sources = (
data.get("data", {}).get("listIngestionSources", {}).get("ingestionSources", [])
)

ingestion_sources = data["data"]["listIngestionSources"]["ingestionSources"]
if not ingestion_sources:
click.echo("No ingestion sources or executions found.")
return
@@ -526,18 +522,32 @@ def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) ->
name = ingestion_source.get("name", "N/A")

executions = ingestion_source.get("executions", {}).get("executionRequests", [])

for execution in executions:
if execution is None:
continue

execution_id = execution.get("id", "N/A")
start_time = execution.get("result", {}).get("startTimeMs", "N/A")
start_time = (
datetime.fromtimestamp(start_time / 1000).strftime("%Y-%m-%d %H:%M:%S")
if start_time != "N/A"
else "N/A"
)
status = execution.get("result", {}).get("status", "N/A")
result = execution.get("result") or {}
status = result.get("status", "N/A")

try:
start_time = (
datetime.fromtimestamp(
result.get("startTimeMs", 0) / 1000
).strftime("%Y-%m-%d %H:%M:%S")
if status != "DUPLICATE" and result.get("startTimeMs") is not None
else "N/A"
)
except (TypeError, ValueError):
start_time = "N/A"

rows.append([execution_id, name, start_time, status, urn])

if not rows:
click.echo("No execution data found.")
return

click.echo(
tabulate(
rows,
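
The rewritten `list_source_runs` above relies on chained `dict.get()` lookups with `or {}` fallbacks so that null execution results don't raise. A minimal standalone sketch of that pattern, assuming an illustrative response shape rather than the real GraphQL payload:

```python
from datetime import datetime

# Illustrative execution record; "result" can legitimately be None.
execution = {"id": "abc123", "result": None}

# "or {}" keeps the chained lookups safe when an intermediate field is null.
result = execution.get("result") or {}
status = result.get("status", "N/A")

try:
    start_time_ms = result.get("startTimeMs")
    start_time = (
        datetime.fromtimestamp(start_time_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
        if start_time_ms is not None
        else "N/A"
    )
except (TypeError, ValueError):
    start_time = "N/A"

print(execution.get("id", "N/A"), status, start_time)  # -> abc123 N/A N/A
```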
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/entrypoints.py
@@ -45,6 +45,12 @@

MAX_CONTENT_WIDTH = 120

if sys.version_info >= (3, 12):
click.secho(
"Python versions above 3.11 are not tested with. Please use Python 3.11.",
fg="red",
)


@click.group(
context_settings=dict(
@@ -25,6 +25,10 @@
"globalSettingsKey",
"globalSettingsInfo",
"testResults",
"dataHubExecutionRequestKey",
"dataHubExecutionRequestInput",
"dataHubExecutionRequestSignal",
"dataHubExecutionRequestResult",
}


@@ -292,6 +292,7 @@ def revoke_expired_tokens(self) -> None:
tokens = list_access_tokens.get("tokens", [])
total = list_access_tokens.get("total", 0)
if tokens == []:
# Due to a server bug we cannot rely on just total
break
for token in tokens:
self.report.expired_tokens_revoked += 1
@@ -99,6 +99,7 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel):

@dataclass
class SoftDeletedEntitiesReport(SourceReport):
num_calls_made: Dict[str, int] = field(default_factory=dict)
num_entities_found: Dict[str, int] = field(default_factory=dict)
num_soft_deleted_entity_processed: int = 0
num_soft_deleted_retained_due_to_age: int = 0
@@ -242,6 +243,11 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st

while True:
try:
if entity_type not in self.report.num_calls_made:
self.report.num_calls_made[entity_type] = 1
else:
self.report.num_calls_made[entity_type] += 1
self._print_report()
result = self.ctx.graph.execute_graphql(
graphql_query,
{
@@ -270,7 +276,13 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st
)
break
scroll_across_entities = result.get("scrollAcrossEntities")
if not scroll_across_entities or not scroll_across_entities.get("count"):
if not scroll_across_entities:
break
search_results = scroll_across_entities.get("searchResults")
count = scroll_across_entities.get("count")
if not count or not search_results:
# Due to a server bug we cannot rely on just count, as it was returning responses like this:
# {'count': 1, 'nextScrollId': None, 'searchResults': []}
break
if entity_type == "DATA_PROCESS_INSTANCE":
# Temp workaround. See note in beginning of the function
Expand All @@ -282,7 +294,7 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st
self.report.num_entities_found[entity_type] += scroll_across_entities.get(
"count"
)
for query in scroll_across_entities.get("searchResults"):
for query in search_results:
yield query["entity"]["urn"]

def _get_urns(self) -> Iterable[str]:
@@ -1,5 +1,3 @@
from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED

import collections
import concurrent.futures
import contextlib
@@ -12,6 +10,7 @@
import traceback
import unittest.mock
import uuid
from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED
from functools import lru_cache
from typing import (
TYPE_CHECKING,
@@ -267,7 +266,6 @@ def _is_single_row_query_method(query: Any) -> bool:
"get_column_max",
"get_column_mean",
"get_column_stdev",
"get_column_stdev",
"get_column_nonnull_count",
"get_column_unique_count",
}
10 changes: 5 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
@@ -893,11 +893,11 @@ def normalize_mode_query(self, query: str) -> str:
jinja_params[key] = parameters[key].get("default", "")

normalized_query = re.sub(
r"{% form %}(.*){% endform %}",
"",
query,
0,
re.MULTILINE | re.DOTALL,
pattern=r"{% form %}(.*){% endform %}",
repl="",
string=query,
count=0,
flags=re.MULTILINE | re.DOTALL,
)

# Wherever we don't resolve the jinja params, we replace it with NULL