diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
index 04242c8bf45d2..7c67349c74db1 100644
--- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py
+++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -13,6 +13,7 @@
 from datahub import nice_version_name
 from datahub.cli import config_utils
 from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url
+from datahub.cli.env_utils import get_boolean_env_variable
 from datahub.configuration.common import ConfigurationError, OperationalError
 from datahub.emitter.generic_emitter import Emitter
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -46,6 +47,8 @@
     os.getenv("DATAHUB_REST_EMITTER_DEFAULT_RETRY_MAX_TIMES", "4")
 )
 
+_DATAHUB_EMITTER_TRACE = get_boolean_env_variable("DATAHUB_EMITTER_TRACE", False)
+
 # The limit is 16mb. We will use a max of 15mb to have some space
 # for overhead like request headers.
 # This applies to pretty much all calls to GMS.
@@ -291,7 +294,8 @@ def emit_mcps(
         mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
         async_flag: Optional[bool] = None,
     ) -> int:
-        logger.debug("Attempting to emit batch mcps")
+        if _DATAHUB_EMITTER_TRACE:
+            logger.debug(f"Attempting to emit MCP batch of size {len(mcps)}")
         url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
         for mcp in mcps:
             ensure_has_system_metadata(mcp)
@@ -304,22 +308,25 @@ def emit_mcps(
         current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
         for mcp_obj in mcp_objs:
             mcp_obj_size = len(json.dumps(mcp_obj))
-            logger.debug(
-                f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}"
-            )
+            if _DATAHUB_EMITTER_TRACE:
+                logger.debug(
+                    f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}"
+                )
 
             if (
                 mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
                 or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
            ):
-                logger.debug("Decided to create new chunk")
+                if _DATAHUB_EMITTER_TRACE:
+                    logger.debug("Decided to create new chunk")
                 mcp_obj_chunks.append([])
                 current_chunk_size = 0
             mcp_obj_chunks[-1].append(mcp_obj)
             current_chunk_size += mcp_obj_size
-        logger.debug(
-            f"Decided to send {len(mcps)} mcps in {len(mcp_obj_chunks)} chunks"
-        )
+        if len(mcp_obj_chunks) > 0:
+            logger.debug(
+                f"Decided to send {len(mcps)} MCP batch in {len(mcp_obj_chunks)} chunks"
+            )
 
         for mcp_obj_chunk in mcp_obj_chunks:
             # TODO: We're calling json.dumps on each MCP object twice, once to estimate
diff --git a/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py b/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py
index 559f0b77f59df..b63c96b617ff0 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py
@@ -1,10 +1,9 @@
 import json
 import logging
-from typing import Iterable, List
+from typing import TYPE_CHECKING, Iterable, List
 
 from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES
 from datahub.emitter.serialization_helper import pre_json_transform
-from datahub.ingestion.api.source import SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.metadata.schema_classes import (
     DatasetProfileClass,
@@ -12,12 +11,15 @@
     SchemaMetadataClass,
 )
 
+if TYPE_CHECKING:
+    from datahub.ingestion.api.source import SourceReport
+
 logger = logging.getLogger(__name__)
 
 
 class EnsureAspectSizeProcessor:
     def __init__(
-        self, report: SourceReport, payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES
+        self, report: "SourceReport", payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES
     ):
         self.report = report
         self.payload_constraint = payload_constraint
diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py
index c3638635b19aa..75dc980e234ac 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -31,6 +31,9 @@
 from datahub.ingestion.api.auto_work_units.auto_dataset_properties_aspect import (
     auto_patch_last_modified,
 )
+from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
+    EnsureAspectSizeProcessor,
+)
 from datahub.ingestion.api.closeable import Closeable
 from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
 from datahub.ingestion.api.report import Report
@@ -450,6 +453,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
             browse_path_processor,
             partial(auto_workunit_reporter, self.get_report()),
             auto_patch_last_modified,
+            EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size,
         ]
 
     @staticmethod
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
index 7bfa7fdb28aaf..9d9a746580f93 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
@@ -26,9 +26,6 @@
     gen_containers,
 )
 from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
-from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
-    EnsureAspectSizeProcessor,
-)
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SupportStatus,
@@ -263,7 +260,6 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx
             ).workunit_processor,
-            EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size,
         ]
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
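
Note: the chunking in emit_mcps above bounds each request both by payload size (INGEST_MAX_PAYLOAD_BYTES) and by item count (BATCH_INGEST_MAX_PAYLOAD_LENGTH); seeding current_chunk_size with the byte limit is what forces the first iteration to open a new chunk before mcp_obj_chunks[-1] is ever indexed. The standalone sketch below mirrors that approach for illustration only; the chunk_mcp_objs name and the limit values are assumptions, not part of this change.

import json
from typing import Any, Dict, List

# Assumed stand-ins for INGEST_MAX_PAYLOAD_BYTES / BATCH_INGEST_MAX_PAYLOAD_LENGTH;
# the real constants are defined in rest_emitter.py.
MAX_PAYLOAD_BYTES = 15 * 1024 * 1024
MAX_ITEMS_PER_REQUEST = 200


def chunk_mcp_objs(mcp_objs: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    """Split serialized MCP objects into chunks that stay under both limits."""
    chunks: List[List[Dict[str, Any]]] = []
    # Seeding with the byte limit guarantees the first object exceeds the budget,
    # so the first chunk is created before chunks[-1] is accessed.
    current_chunk_size = MAX_PAYLOAD_BYTES
    for mcp_obj in mcp_objs:
        mcp_obj_size = len(json.dumps(mcp_obj))
        # Open a new chunk when adding this object would exceed either limit.
        if (
            current_chunk_size + mcp_obj_size > MAX_PAYLOAD_BYTES
            or len(chunks[-1]) >= MAX_ITEMS_PER_REQUEST
        ):
            chunks.append([])
            current_chunk_size = 0
        chunks[-1].append(mcp_obj)
        current_chunk_size += mcp_obj_size
    return chunks

With this shape, an empty input yields no chunks at all, which matches the new "if len(mcp_obj_chunks) > 0" guard around the summary debug log in the diff.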