feat(ingest): enable EnsureAspectSizeProcessor for all sources #12262

Merged · 2 commits · Jan 6, 2025
23 changes: 15 additions & 8 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -13,6 +13,7 @@
 from datahub import nice_version_name
 from datahub.cli import config_utils
 from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url
+from datahub.cli.env_utils import get_boolean_env_variable
 from datahub.configuration.common import ConfigurationError, OperationalError
 from datahub.emitter.generic_emitter import Emitter
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -46,6 +47,8 @@
     os.getenv("DATAHUB_REST_EMITTER_DEFAULT_RETRY_MAX_TIMES", "4")
 )

+_DATAHUB_EMITTER_TRACE = get_boolean_env_variable("DATAHUB_EMITTER_TRACE", False)
+
 # The limit is 16mb. We will use a max of 15mb to have some space
 # for overhead like request headers.
 # This applies to pretty much all calls to GMS.
@@ -291,7 +294,8 @@
         mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
         async_flag: Optional[bool] = None,
     ) -> int:
-        logger.debug("Attempting to emit batch mcps")
+        if _DATAHUB_EMITTER_TRACE:
+            logger.debug(f"Attempting to emit MCP batch of size {len(mcps)}")

Check warning on line 298 in metadata-ingestion/src/datahub/emitter/rest_emitter.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/emitter/rest_emitter.py#L298

Added line #L298 was not covered by tests
Comment on lines +297 to +298

Collaborator:
We could remove these repeated conditionals. How about simply adding a new method to logging, i.e. in logging_manager.py:

import logging
from typing import Any

from datahub.cli.env_utils import get_boolean_env_variable


def trace(self: logging.Logger, message: str, *args: Any, **kwargs: Any) -> None:
    # Gate per-logger trace output behind an env variable derived from the logger name.
    env = f'TRACE_{self.name.replace(".", "__").upper()}'
    if get_boolean_env_variable(env, False):
        self._log(logging.DEBUG, message, args, **kwargs)


logging.Logger.trace = trace

Then here we would only be calling logger.trace(...). The variable to export in this case would be:

export TRACE_DATAHUB__EMITTER__REST_EMITTER=true

It would be quite a universal piece of code.

Collaborator Author:

Unfortunately, mypy really doesn't like this type of patching.

I agree with you that the code duplication isn't ideal, but it's better than having type: ignore everywhere. If there's an alternative that doesn't require that, I'm all ears.
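(For reference, one type-safe middle ground would be a plain helper function rather than patching Logger — a sketch only, not part of this PR; the helper name trace_log is hypothetical:)

import logging
from typing import Any

from datahub.cli.env_utils import get_boolean_env_variable


def trace_log(logger: logging.Logger, message: str, *args: Any, **kwargs: Any) -> None:
    # Same env-gating as the suggestion above, but as a free function,
    # so mypy sees an ordinary call instead of a patched Logger method.
    env = f'TRACE_{logger.name.replace(".", "__").upper()}'
    if get_boolean_env_variable(env, False):
        logger.debug(message, *args, **kwargs)

Call sites would then read trace_log(logger, "Decided to create new chunk") while keeping the same per-module TRACE_* env variables.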

         url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
         for mcp in mcps:
             ensure_has_system_metadata(mcp)
@@ -304,22 +308,25 @@
         current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
         for mcp_obj in mcp_objs:
             mcp_obj_size = len(json.dumps(mcp_obj))
-            logger.debug(
-                f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}"
-            )
+            if _DATAHUB_EMITTER_TRACE:
+                logger.debug(
+                    f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')})"
+                )

             if (
                 mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
                 or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
             ):
-                logger.debug("Decided to create new chunk")
+                if _DATAHUB_EMITTER_TRACE:
+                    logger.debug("Decided to create new chunk")

Check warning on line 321 in metadata-ingestion/src/datahub/emitter/rest_emitter.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/emitter/rest_emitter.py#L321

Added line #L321 was not covered by tests
                 mcp_obj_chunks.append([])
                 current_chunk_size = 0
             mcp_obj_chunks[-1].append(mcp_obj)
             current_chunk_size += mcp_obj_size
-        logger.debug(
-            f"Decided to send {len(mcps)} mcps in {len(mcp_obj_chunks)} chunks"
-        )
+        if len(mcp_obj_chunks) > 0:
+            logger.debug(
+                f"Decided to send {len(mcps)} MCP batch in {len(mcp_obj_chunks)} chunks"
+            )

         for mcp_obj_chunk in mcp_obj_chunks:
             # TODO: We're calling json.dumps on each MCP object twice, once to estimate
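(For readers skimming the diff: the chunking logic above is greedy bin-packing of serialized MCP objects under two bounds. A minimal standalone sketch, with simplified names; BATCH_INGEST_MAX_PAYLOAD_LENGTH's real value is not shown in this diff, so the number below is illustrative only:)

import json
from typing import Any, Dict, List

INGEST_MAX_PAYLOAD_BYTES = 15 * 1024 * 1024  # 15mb, leaving headroom under the 16mb GMS cap
BATCH_INGEST_MAX_PAYLOAD_LENGTH = 200  # illustrative only


def chunk_mcp_objs(mcp_objs: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    # Greedily pack objects into chunks bounded by both total serialized
    # size and object count.
    chunks: List[List[Dict[str, Any]]] = [[]]
    current_chunk_size = 0
    for mcp_obj in mcp_objs:
        mcp_obj_size = len(json.dumps(mcp_obj))
        # Open a new chunk when adding this object would exceed either bound.
        if (
            mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
            or len(chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
        ):
            chunks.append([])
            current_chunk_size = 0
        chunks[-1].append(mcp_obj)
        current_chunk_size += mcp_obj_size
    return chunks

(The diff's version instead initializes current_chunk_size to INGEST_MAX_PAYLOAD_BYTES, which forces a fresh chunk on the first iteration; the sketch pre-creates one empty chunk and starts from zero, which is equivalent for this purpose.)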
metadata-ingestion/src/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py
@@ -1,23 +1,25 @@
 import json
 import logging
-from typing import Iterable, List
+from typing import TYPE_CHECKING, Iterable, List

 from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES
 from datahub.emitter.serialization_helper import pre_json_transform
-from datahub.ingestion.api.source import SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.metadata.schema_classes import (
     DatasetProfileClass,
     SchemaFieldClass,
     SchemaMetadataClass,
 )

+if TYPE_CHECKING:
+    from datahub.ingestion.api.source import SourceReport
+
 logger = logging.getLogger(__name__)


 class EnsureAspectSizeProcessor:
     def __init__(
-        self, report: SourceReport, payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES
+        self, report: "SourceReport", payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES
     ):
         self.report = report
         self.payload_constraint = payload_constraint
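(The TYPE_CHECKING guard above is the standard pattern for keeping a type annotation while breaking a runtime import cycle. A minimal self-contained sketch of the pattern, with hypothetical module names:)

# module_a.py (hypothetical)
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by static type checkers such as mypy; at runtime this
    # import never runs, so module_a and module_b may import each other.
    from module_b import Report


class Processor:
    def __init__(self, report: "Report") -> None:
        # Quoted ("forward") annotation: resolved by the type checker,
        # ignored at runtime where Report is never imported.
        self.report = report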
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -31,6 +31,9 @@
 from datahub.ingestion.api.auto_work_units.auto_dataset_properties_aspect import (
     auto_patch_last_modified,
 )
+from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
+    EnsureAspectSizeProcessor,
+)
 from datahub.ingestion.api.closeable import Closeable
 from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
 from datahub.ingestion.api.report import Report
@@ -450,6 +453,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
             browse_path_processor,
             partial(auto_workunit_reporter, self.get_report()),
             auto_patch_last_modified,
+            EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size,
         ]

     @staticmethod
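(Because source.py is the base class for all sources, every source now inherits the aspect-size processor. For context, workunit processors are applied in order over the workunit stream; a simplified sketch of that composition, using hypothetical stand-in types rather than datahub's actual plumbing:)

from typing import Callable, Iterable, List, Optional

# Hypothetical stand-ins for MetadataWorkUnit / MetadataWorkUnitProcessor.
WorkUnit = dict
WorkUnitProcessor = Callable[[Iterable[WorkUnit]], Iterable[WorkUnit]]


def apply_processors(
    stream: Iterable[WorkUnit], processors: List[Optional[WorkUnitProcessor]]
) -> Iterable[WorkUnit]:
    # Each processor wraps the output of the previous one; None entries are skipped.
    for processor in processors:
        if processor is not None:
            stream = processor(stream)
    return stream

(This also explains the second half of the PR: sources that previously registered EnsureAspectSizeProcessor themselves can now drop it, as in the next file.)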
@@ -26,9 +26,6 @@
     gen_containers,
 )
 from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
-from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
-    EnsureAspectSizeProcessor,
-)
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SupportStatus,
@@ -263,7 +260,6 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
             StaleEntityRemovalHandler.create(
                 self, self.config, self.ctx
             ).workunit_processor,
-            EnsureAspectSizeProcessor(self.get_report()).ensure_aspect_size,
         ]

     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: