Commit

feat(ingest): enable EnsureAspectSizeProcessor for all sources (#12262)
hsheth2 authored Jan 6, 2025
1 parent fb471f1 commit ba8bf53
Showing 4 changed files with 24 additions and 15 deletions.
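
At a high level, this commit moves aspect-size enforcement out of individual connectors and into the base Source class, so every ingestion source now applies it by default. As a hedged illustration only, a workunit processor in this API is essentially a callable over the stream of work units; the pass-through body below is an assumption standing in for the real truncation logic in EnsureAspectSizeProcessor.ensure_aspect_size.

from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit


def ensure_aspect_size_sketch(
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    # Sketch of the MetadataWorkUnitProcessor shape: the real processor trims
    # oversized aspects (e.g. very large schemaMetadata or datasetProfile
    # payloads) so each serialized aspect stays under the GMS payload cap.
    # This sketch only passes the stream through.
    for wu in stream:
        yield wu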
23 changes: 15 additions & 8 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -13,6 +13,7 @@
from datahub import nice_version_name
from datahub.cli import config_utils
from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url
+ from datahub.cli.env_utils import get_boolean_env_variable
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -46,6 +47,8 @@
os.getenv("DATAHUB_REST_EMITTER_DEFAULT_RETRY_MAX_TIMES", "4")
)

+ _DATAHUB_EMITTER_TRACE = get_boolean_env_variable("DATAHUB_EMITTER_TRACE", False)
+
# The limit is 16mb. We will use a max of 15mb to have some space
# for overhead like request headers.
# This applies to pretty much all calls to GMS.
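
For orientation, a hedged sketch of the two knobs this hunk touches. The flag is read once at module level in the hunk above, so it must be set before the emitter module is imported; the exact 15 * 1024 * 1024 value and the string "true" being accepted by get_boolean_env_variable are assumptions, not verified against the file.

import os

# Assumption: setting the environment variable before the emitter module is
# imported is enough to turn on the extra trace logging added in this commit.
os.environ["DATAHUB_EMITTER_TRACE"] = "true"

# Assumption based on the comment above: cap payloads at 15 MB to leave roughly
# 1 MB of headroom under the 16 MB GMS request limit.
INGEST_MAX_PAYLOAD_BYTES = 15 * 1024 * 1024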
@@ -291,7 +294,8 @@ def emit_mcps(
mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
async_flag: Optional[bool] = None,
) -> int:
logger.debug("Attempting to emit batch mcps")
if _DATAHUB_EMITTER_TRACE:
logger.debug(f"Attempting to emit MCP batch of size {len(mcps)}")
url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
for mcp in mcps:
ensure_has_system_metadata(mcp)
@@ -304,22 +308,25 @@ def emit_mcps(
current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
for mcp_obj in mcp_objs:
mcp_obj_size = len(json.dumps(mcp_obj))
- logger.debug(
- f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}"
- )
+ if _DATAHUB_EMITTER_TRACE:
+ logger.debug(
+ f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}"
+ )

if (
mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
):
logger.debug("Decided to create new chunk")
if _DATAHUB_EMITTER_TRACE:
logger.debug("Decided to create new chunk")
mcp_obj_chunks.append([])
current_chunk_size = 0
mcp_obj_chunks[-1].append(mcp_obj)
current_chunk_size += mcp_obj_size
- logger.debug(
- f"Decided to send {len(mcps)} mcps in {len(mcp_obj_chunks)} chunks"
- )
+ if len(mcp_obj_chunks) > 0:
+ logger.debug(
+ f"Decided to send {len(mcps)} MCP batch in {len(mcp_obj_chunks)} chunks"
+ )

for mcp_obj_chunk in mcp_obj_chunks:
# TODO: We're calling json.dumps on each MCP object twice, once to estimate
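
The hunk above batches MCPs with two bounds: a per-chunk byte budget and a per-chunk item count. A standalone sketch of the same greedy packing idea, with simplified names and assumed constant values (the real constants are INGEST_MAX_PAYLOAD_BYTES and BATCH_INGEST_MAX_PAYLOAD_LENGTH):

import json
from typing import Any, Dict, List

MAX_CHUNK_BYTES = 15 * 1024 * 1024  # assumed byte budget per request
MAX_CHUNK_ITEMS = 100               # hypothetical item cap per request


def chunk_payloads(objs: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    """Greedily pack serialized objects into chunks bounded by size and count."""
    chunks: List[List[Dict[str, Any]]] = [[]]
    current_size = 0
    for obj in objs:
        obj_size = len(json.dumps(obj))
        # Start a new chunk if this object would overflow the byte budget or
        # the current chunk already holds the maximum number of items.
        if current_size + obj_size > MAX_CHUNK_BYTES or len(chunks[-1]) >= MAX_CHUNK_ITEMS:
            chunks.append([])
            current_size = 0
        chunks[-1].append(obj)
        current_size += obj_size
    return [chunk for chunk in chunks if chunk]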
8 changes: 5 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py
@@ -1,23 +1,25 @@
import json
import logging
- from typing import Iterable, List
+ from typing import TYPE_CHECKING, Iterable, List

from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES
from datahub.emitter.serialization_helper import pre_json_transform
- from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
DatasetProfileClass,
SchemaFieldClass,
SchemaMetadataClass,
)

+ if TYPE_CHECKING:
+ from datahub.ingestion.api.source import SourceReport

logger = logging.getLogger(__name__)


class EnsureAspectSizeProcessor:
def __init__(
- self, report: SourceReport, payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES
+ self, report: "SourceReport", payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES
):
self.report = report
self.payload_constraint = payload_constraint
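
The hunk above applies the standard TYPE_CHECKING pattern: SourceReport is only needed for annotations, so importing it lazily and quoting the annotation avoids a runtime import cycle with source.py. A generic sketch of the pattern, with a hypothetical module name:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only by type checkers; never executed at runtime, so it cannot
    # create an import cycle.
    from mypackage.reporting import Report  # hypothetical module


class Processor:
    def __init__(self, report: "Report") -> None:
        # The quoted annotation is resolved lazily, so Report does not need to
        # be importable when this module loads.
        self.report = report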
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -31,6 +31,9 @@
from datahub.ingestion.api.auto_work_units.auto_dataset_properties_aspect import (
auto_patch_last_modified,
)
+ from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
+ EnsureAspectSizeProcessor,
+ )
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.report import Report
@@ -450,6 +453,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
browse_path_processor,
partial(auto_workunit_reporter, self.get_report()),
auto_patch_last_modified,
+ EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size,
]

@staticmethod
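
With the base Source registering the processor by default (hunk above), connectors no longer need to add it themselves, which is why the connector diff below drops both the import and the processor entry. A hedged sketch of the usual override pattern, assuming connectors extend the defaults via super() (import locations and the super() convention are assumptions; abstract methods omitted):

from typing import List, Optional

from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source


class MyConnectorSource(Source):  # hypothetical connector, for illustration only
    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        # Start from the base defaults (status, browse paths, reporting, and
        # now aspect-size enforcement) and append connector-specific steps.
        return [
            *super().get_workunit_processors(),
            # e.g. StaleEntityRemovalHandler.create(...).workunit_processor
        ]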
@@ -26,9 +26,6 @@
gen_containers,
)
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
- from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
- EnsureAspectSizeProcessor,
- )
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
@@ -263,7 +260,6 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
- EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size,
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
