Skip to content

Commit

Permalink
feat(ingest): add stateful ingestion support for file source (#11804)
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Nov 8, 2024
1 parent 8ca8fd9 commit 84c6776
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 12 deletions.
5 changes: 4 additions & 1 deletion metadata-ingestion/src/datahub/cli/json_file.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import logging

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.file import GenericFileSource

logger = logging.getLogger(__name__)


def check_mce_file(filepath: str) -> str:
mce_source = GenericFileSource.create({"filename": filepath}, None)
mce_source = GenericFileSource.create(
{"filename": filepath}, PipelineContext(run_id="json-file")
)
for _ in mce_source.get_workunits():
pass
if len(mce_source.get_report().failures):
Expand Down
36 changes: 28 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pydantic import validator
from pydantic.fields import Field

from datahub.configuration.common import ConfigEnum, ConfigModel
from datahub.configuration.common import ConfigEnum
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand All @@ -26,14 +26,25 @@
from datahub.ingestion.api.source import (
CapabilityReport,
MetadataWorkUnitProcessor,
SourceReport,
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.source_helpers import (
auto_status_aspect,
auto_workunit_reporter,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.fs.fs_base import FileInfo, get_path_schema
from datahub.ingestion.fs.fs_registry import fs_registry
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
Expand All @@ -49,7 +60,7 @@ class FileReadMode(ConfigEnum):
AUTO = auto()


class FileSourceConfig(ConfigModel):
class FileSourceConfig(StatefulIngestionConfigBase):
_filename = pydantic_field_deprecated(
"filename",
message="filename is deprecated. Use path instead.",
Expand Down Expand Up @@ -88,6 +99,8 @@ class FileSourceConfig(ConfigModel):
"filename", "path", print_warning=False
)

stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

@validator("file_extension", always=True)
def add_leading_dot_to_extension(cls, v: str) -> str:
if v:
Expand All @@ -99,7 +112,7 @@ def add_leading_dot_to_extension(cls, v: str) -> str:


@dataclass
class FileSourceReport(SourceReport):
class FileSourceReport(StaleEntityRemovalSourceReport):
total_num_files: int = 0
num_files_completed: int = 0
files_completed: list = field(default_factory=list)
Expand Down Expand Up @@ -174,17 +187,18 @@ def compute_stats(self) -> None:
@platform_name("Metadata File")
@config_class(FileSourceConfig)
@support_status(SupportStatus.CERTIFIED)
class GenericFileSource(TestableSource):
class GenericFileSource(StatefulIngestionSourceBase, TestableSource):
"""
This plugin pulls metadata from a previously generated file.
The [metadata file sink](../../../../metadata-ingestion/sink_docs/metadata-file.md) can produce such files, and a number of
samples are included in the [examples/mce_files](../../../../metadata-ingestion/examples/mce_files) directory.
"""

def __init__(self, ctx: PipelineContext, config: FileSourceConfig):
    """Set up the file source.

    Args:
        ctx: Pipeline context; passed to the base-class initializer and
            kept on ``self.ctx``.
        config: Parsed source configuration; passed to the base-class
            initializer and kept on ``self.config``.
    """
    # The base initializer takes (config, ctx), the reverse of this
    # method's own parameter order.
    super().__init__(config, ctx)
    self.ctx = ctx
    self.config = config
    # Assigned exactly once; the annotation narrows the attribute to the
    # file-specific report type for type checkers.
    self.report: FileSourceReport = FileSourceReport()

@classmethod
def create(cls, config_dict, ctx):
Expand All @@ -204,7 +218,13 @@ def get_filenames(self) -> Iterable[FileInfo]:

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
    """Return the work-unit processors for this source, in order.

    The list always contains ``auto_workunit_reporter`` bound to this
    source's report, followed by the ``workunit_processor`` of a
    ``StaleEntityRemovalHandler`` created for this source. Between them,
    ``auto_status_aspect`` is included only when
    ``self.config.stateful_ingestion`` is set; otherwise that slot holds
    ``None``.

    Returns:
        A list of processors; entries may be ``None``.
    """
    # No super() call, as we don't want helpers that create / remove workunits
    stale_entity_handler = StaleEntityRemovalHandler.create(
        self, self.config, self.ctx
    )
    return [
        partial(auto_workunit_reporter, self.report),
        # NOTE(review): this tests whether the config section is present,
        # not its `enabled` flag — confirm that is the intended condition.
        auto_status_aspect if self.config.stateful_ingestion else None,
        stale_entity_handler.workunit_processor,
    ]

def get_workunits_internal(
self,
Expand Down
78 changes: 78 additions & 0 deletions metadata-ingestion/tests/integration/file/metadata_file.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"name": "TABLE_1",
"qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_1",
"description": "Comment for Table",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2023_12_18-10_16_09",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"name": "TABLE_2",
"qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_2",
"description": "Comment for Table",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2023_12_18-10_16_09",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"name": "TABLE_3",
"qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_3",
"description": "Comment for Table",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2023_12_18-10_16_09",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"name": "TABLE_4",
"qualifiedName": "TEST_DB.TEST_SCHEMA.TABLE_4",
"description": "Comment for Table",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2023_12_18-10_16_09",
"lastRunId": "no-run-id-provided"
}
}
]
26 changes: 26 additions & 0 deletions metadata-ingestion/tests/integration/file/state_golden.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,dummy_stateful,prod),default_stale_entity_removal)",
"changeType": "UPSERT",
"aspectName": "datahubIngestionCheckpoint",
"aspect": {
"json": {
"timestampMillis": 1730898142020,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"pipelineName": "dummy_stateful",
"platformInstanceId": "",
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)\"]}"
},
"runId": "test-run"
}
}
}
]
49 changes: 49 additions & 0 deletions metadata-ingestion/tests/integration/file/test_file_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pathlib
from unittest import mock

from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from tests.test_helpers import mce_helpers


def test_stateful_ingestion(tmp_path, pytestconfig):
    """Run the file source with stateful ingestion and check the saved state.

    A pipeline reads ``metadata_file.json`` through the ``file`` source,
    sends the output to a ``blackhole`` sink, and persists its checkpoint
    through a file-based state provider under ``tmp_path``. The written
    state file is then compared against ``state_golden.json``.
    """
    test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/file"
    state_file_name = "state.json"
    golden_state_file_name = "state_golden.json"

    # Checkpoint state goes to a file inside pytest's temporary directory.
    state_provider = {
        "type": "file",
        "config": {
            "filename": f"{tmp_path}/{state_file_name}",
        },
    }
    source_config = {
        "filename": str(test_resources_dir / "metadata_file.json"),
        "stateful_ingestion": {
            "enabled": True,
            "remove_stale_metadata": True,
            "state_provider": state_provider,
        },
    }
    pipeline_config = {
        "run_id": "test-run",
        "pipeline_name": "dummy_stateful",
        "source": {
            "type": "file",
            "config": source_config,
        },
        "sink": {
            "type": "blackhole",
            "config": {},
        },
    }

    # Make the handler build its checkpoint state with the "utf-8" serde,
    # which is the serde recorded in the golden state file.
    state_factory_target = (
        "datahub.ingestion.source.state.stale_entity_removal_handler"
        ".StaleEntityRemovalHandler._get_state_obj"
    )
    with mock.patch(state_factory_target) as mock_state:
        mock_state.return_value = GenericCheckpointState(serde="utf-8")
        Pipeline.create(pipeline_config).run()

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / state_file_name,
        golden_path=f"{test_resources_dir}/{golden_state_file_name}",
    )
4 changes: 1 addition & 3 deletions metadata-ingestion/tests/unit/serde/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ def test_serde_to_avro(
# In this test, we want to read in from JSON -> MCE object.
# Next we serialize from MCE to Avro and then deserialize back to MCE.
# Finally, we want to compare the two MCE objects.
with patch(
"datahub.ingestion.api.common.PipelineContext", autospec=True
) as mock_pipeline_context:
with patch("datahub.ingestion.api.common.PipelineContext") as mock_pipeline_context:
json_path = pytestconfig.rootpath / json_filename
source = GenericFileSource(
ctx=mock_pipeline_context, config=FileSourceConfig(path=str(json_path))
Expand Down

0 comments on commit 84c6776

Please sign in to comment.