Skip to content

Commit

Permalink
chore(structured-properties): add cli validation for entity types (da…
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka authored Nov 16, 2024
1 parent 17c9fcf commit 3e128f4
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
entity_types:
- dataset # or urn:li:entityType:datahub.dataset
- dataFlow
description: "Retention Time is used to figure out how long to retain records in a dataset"
description: 'Retention Time is used to figure out how long to retain records in a dataset'
allowed_values:
- value: 30
description: 30 days, usually reserved for datasets that are ephemeral and contain pii
Expand All @@ -18,7 +18,7 @@
- id: io.acryl.dataManagement.replicationSLA
type: number
display_name: Replication SLA
description: "SLA for how long data can be delayed before replicating to the destination cluster"
description: 'SLA for how long data can be delayed before replicating to the destination cluster'
entity_types:
- dataset
- id: io.acryl.dataManagement.deprecationDate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
from contextlib import contextmanager
from enum import Enum
from pathlib import Path
from typing import List, Optional
from typing import Generator, List, Optional

import yaml
from pydantic import validator
Expand All @@ -20,6 +21,28 @@
logger = logging.getLogger(__name__)


class StructuredPropertiesConfig:
    """Class-level holder for the graph client that callers can override temporarily."""

    # Graph installed by the innermost active use_graph() block; None when no
    # override is in effect.
    _graph: Optional[DataHubGraph] = None

    @classmethod
    @contextmanager
    def use_graph(cls, graph: DataHubGraph) -> Generator[None, None, None]:
        """Install ``graph`` as the current graph for the duration of a ``with`` block.

        The value that was set before entering is put back on exit, including
        when the body of the ``with`` block raises.
        """
        saved = cls._graph
        cls._graph = graph
        try:
            yield
        finally:
            cls._graph = saved

    @classmethod
    def get_graph(cls) -> DataHubGraph:
        """Return the graph set via ``use_graph``, else ``get_default_graph()``."""
        current = cls._graph
        if current is None:
            return get_default_graph()
        return current


class AllowedTypes(Enum):
STRING = "string"
RICH_TEXT = "rich_text"
Expand All @@ -41,25 +64,28 @@ class AllowedValue(ConfigModel):
description: Optional[str] = None


# Comma-separated sample of well-formed entity type urns, embedded in error messages.
VALID_ENTITY_TYPES_PREFIX_STRING = ", ".join(
    f"urn:li:entityType:datahub.{entity_name}"
    for entity_name in ("dataset", "dashboard", "dataFlow", "schemaField")
)
# Hint appended to the ValueError raised when an entity type urn fails validation.
VALID_ENTITY_TYPES_STRING = (
    f"Valid entity type urns are {VALID_ENTITY_TYPES_PREFIX_STRING}, etc... "
    "Ensure that the entity type is valid."
)


class TypeQualifierAllowedTypes(ConfigModel):
    """Holds the ``allowed_types`` list; each entry is validated as an entity type urn."""

    allowed_types: List[str]

    @validator("allowed_types", each_item=True)
    def validate_allowed_types(cls, v):
        """Normalize one ``allowed_types`` entry to an entity type urn string.

        Registered with ``each_item=True``, so ``v`` is a single list entry
        rather than the whole list. Falsy entries are returned unchanged.

        Raises:
            ValueError: if the resulting urn does not exist in the graph
                returned by ``StructuredPropertiesConfig.get_graph()``.
        """
        if v:
            graph = StructuredPropertiesConfig.get_graph()
            validated_urn = Urn.make_entity_type_urn(v)
            if not graph.exists(validated_urn):
                # Fail loudly instead of silently dropping the entry.
                raise ValueError(
                    f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
                )
            v = str(validated_urn)
        return v


Expand All @@ -77,6 +103,18 @@ class StructuredProperties(ConfigModel):
type_qualifier: Optional[TypeQualifierAllowedTypes] = None
immutable: Optional[bool] = False

@validator("entity_types", each_item=True)
def validate_entity_types(cls, v):
    """Check one ``entity_types`` entry and return it as an entity type urn string.

    Falsy entries pass through untouched. Any other entry is converted with
    ``Urn.make_entity_type_urn`` and must exist in the current graph,
    otherwise a ``ValueError`` is raised.
    """
    if not v:
        return v
    graph = StructuredPropertiesConfig.get_graph()
    entity_type_urn = Urn.make_entity_type_urn(v)
    if graph.exists(entity_type_urn):
        return str(entity_type_urn)
    raise ValueError(
        f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
    )

@property
def fqn(self) -> str:
assert self.urn is not None
Expand All @@ -97,93 +135,99 @@ def urn_must_be_present(cls, v, values):
@staticmethod
def create(file: str, graph: Optional[DataHubGraph] = None) -> None:
    """Read structured property definitions from a YAML file and emit them.

    Args:
        file: Path to a YAML file containing a list of structured property
            definitions.
        graph: Graph client used both for validation and for emitting. When
            omitted (or falsy), ``get_default_graph()`` is used.

    Raises:
        ValueError: if a property's type is not accepted by
            ``AllowedTypes.check_allowed_type``. Validation errors raised
            while parsing an entry also propagate.
    """
    emitter: DataHubGraph = graph if graph else get_default_graph()
    # Validators look the graph up through StructuredPropertiesConfig, so the
    # same client is used for validation and emission.
    with StructuredPropertiesConfig.use_graph(emitter):
        # Load everything up front so the file is closed before emitting.
        with open(file) as fp:
            # safe_load returns None for an empty document; treat it as "no entries".
            structuredproperties: List[dict] = yaml.safe_load(fp) or []
        for structuredproperty_raw in structuredproperties:
            structuredproperty = StructuredProperties.parse_obj(
                structuredproperty_raw
            )
            if not structuredproperty.type.islower():
                structuredproperty.type = structuredproperty.type.lower()
                logger.warning(
                    f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
                )
            if not AllowedTypes.check_allowed_type(structuredproperty.type):
                raise ValueError(
                    f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
                )
            mcp = MetadataChangeProposalWrapper(
                entityUrn=structuredproperty.urn,
                aspect=StructuredPropertyDefinitionClass(
                    qualifiedName=structuredproperty.fqn,
                    valueType=Urn.make_data_type_urn(structuredproperty.type),
                    displayName=structuredproperty.display_name,
                    description=structuredproperty.description,
                    entityTypes=[
                        Urn.make_entity_type_urn(entity_type)
                        for entity_type in structuredproperty.entity_types or []
                    ],
                    cardinality=structuredproperty.cardinality,
                    immutable=structuredproperty.immutable,
                    allowedValues=(
                        [
                            PropertyValueClass(
                                value=v.value, description=v.description
                            )
                            for v in structuredproperty.allowed_values
                        ]
                        if structuredproperty.allowed_values
                        else None
                    ),
                    typeQualifier=(
                        {
                            "allowedTypes": structuredproperty.type_qualifier.allowed_types
                        }
                        if structuredproperty.type_qualifier
                        else None
                    ),
                ),
            )
            emitter.emit_mcp(mcp)

            logger.info(f"Created structured property {structuredproperty.urn}")

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
    """Build a ``StructuredProperties`` from the definition aspect stored for ``urn``.

    Args:
        graph: Graph client used to fetch the aspect; also installed as the
            current graph so field validators run against it.
        urn: Urn of the structured property to load.

    Raises:
        Exception: if no ``StructuredPropertyDefinitionClass`` aspect is
            returned for ``urn``.
    """
    with StructuredPropertiesConfig.use_graph(graph):
        structured_property: Optional[
            StructuredPropertyDefinitionClass
        ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
        if structured_property is None:
            raise Exception(
                "StructuredPropertyDefinition aspect is None. Unable to create structured property."
            )
        return StructuredProperties(
            urn=urn,
            qualified_name=structured_property.qualifiedName,
            display_name=structured_property.displayName,
            type=structured_property.valueType,
            description=structured_property.description,
            entity_types=structured_property.entityTypes,
            cardinality=structured_property.cardinality,
            # Keep None (as opposed to an empty list) when the aspect has no
            # allowedValues at all.
            allowed_values=(
                [
                    AllowedValue(
                        value=av.value,
                        description=av.description,
                    )
                    for av in structured_property.allowedValues or []
                ]
                if structured_property.allowedValues is not None
                else None
            ),
            type_qualifier=(
                {
                    "allowed_types": structured_property.typeQualifier.get(
                        "allowedTypes"
                    )
                }
                if structured_property.typeQualifier
                else None
            ),
        )

def to_yaml(
self,
Expand Down
6 changes: 6 additions & 0 deletions smoke-test/tests/structured_properties/bad_entity_type.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- id: clusterTypeBad
type: STRING
display_name: Cluster's type
description: 'Test Cluster Type Property'
entity_types:
- urn:li:entityType:dataset # should fail because this is not a valid entity type
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,24 @@ def test_structured_property_schema_field(ingest_cleanup_data, graph_client):
raise e


def test_structured_properties_yaml_load_with_bad_entity_type(
    ingest_cleanup_data, graph_client
):
    """Creating properties from a YAML with an invalid entity type urn must fail."""
    try:
        StructuredProperties.create(
            "tests/structured_properties/bad_entity_type.yaml",
            graph=graph_client,
        )
    except Exception as e:
        # Only the expected validation failure is acceptable; anything else
        # propagates with its original traceback.
        if "urn:li:entityType:dataset is not a valid entity type urn" not in str(e):
            raise
    else:
        # Raised outside the try so the broad handler above cannot intercept it.
        raise AssertionError(
            "Should not be able to create structured properties with bad entity type"
        )


def test_dataset_yaml_loader(ingest_cleanup_data, graph_client):
StructuredProperties.create(
"tests/structured_properties/test_structured_properties.yaml",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
- id: clusterType
type: STRING
display_name: Cluster's type
description: "Test Cluster Type Property"
description: 'Test Cluster Type Property'
entity_types:
- dataset
- id: clusterName
type: STRING
display_name: Cluster's name
description: "Test Cluster Name Property"
description: 'Test Cluster Name Property'
entity_types:
- dataset
- id: projectNames
type: STRING
cardinality: MULTIPLE
display_name: Project Name
entity_types:
- dataset # or urn:li:logicalEntity:metamodel.datahub.dataset
- dataflow
description: "Test property for project name"
- dataset # or urn:li:entityType:datahub.dataset
- dataFlow
description: 'Test property for project name'
allowed_values:
- value: Tracking
description: test value 1 for project
Expand Down

0 comments on commit 3e128f4

Please sign in to comment.