Skip to content

Commit

Permalink
Implement DataHub Catalogue client for dp-catalogue library (#2902)
Browse files Browse the repository at this point in the history
Generalise CatalogueClient and add datahub implementation

- Convert CatalogueClient class into ABC base, so OMD and DataHub client classes can be inherited
- implement DataHubCatalogueClient using DataHub gms
- rename all `create_or_update_x` methods to `upsert_x`
- add `create_domain` and `create_or_update_data_product` methods to DataHubCatalogueClient class
- update `DataHubCatalogueClient.create_or_update_table` method to create domain and data product if they don't exist but are passed as `data_product_metadata`
- associate tables with data products when created in DataHub
  • Loading branch information
tom-webber authored Jan 19, 2024
1 parent 3c88978 commit 8bb6e68
Show file tree
Hide file tree
Showing 22 changed files with 2,673 additions and 146 deletions.
4 changes: 4 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ trim_trailing_whitespace = true

[**/*.drawio]
insert_final_newline = unset

[**/tests/snapshots/*.json]
end_of_line = unset
insert_final_newline = unset
18 changes: 18 additions & 0 deletions python-libraries/data-platform-catalogue/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Breaking changes

- Changed `database_fqn`, `schema_fqn`, etc to a more generic
`location: DataLocation` argument on all methods. This captures information
about where a node in the metadata graph should be located, and what kind
of database it comes from.

- Extracted `BaseCatalogueClient` base class from `CatalogueClient`. Use this
as a type annotation to avoid coupling to the OpenMetadata implementation.

- Renamed the existing `CatalogueClient` implementation to
`OpenMetadataCatalogueClient`.

### Added

- Added `DataHubCatalogueClient` to support DataHub's GMS as the catalogue
implementation.

## [0.3.1] 2023-11-13

- Updated to OpenMetadata 1.2
72 changes: 39 additions & 33 deletions python-libraries/data-platform-catalogue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

This library is part of the Ministry of Justice data platform.

It provides functionality to publish object metadata to the OpenMetadata data catalogue
so that data products are discoverable.
It publishes object metadata to a data catalogue, so that the
metadata can be made discoverable by consumers.

Broadly speaking, a catalogue stores a _metadata graph_, consisting of
_data assets_. Data assets could be **tables**, **schemas** or **databases**.

## How to install

Expand All @@ -13,30 +16,27 @@ To install the package using `pip`, run:
pip install ministryofjustice-data-platform-catalogue
```

## Topology

- Each moj data product is mapped to a database in the OpenMetadata catalogue
- We populate the schema level in openmetdata with a generic entry of `Tables`
- Each table is mapped to a table in openmetadata
## Terminology

![Topology diagram](./diagram.png)
- **Data assets** - Any databases, tables, or schemas within the metadata graph
- **Data products** - Groupings of data assets that are published for
reuse across MOJ. In the data platform, the concepts of database and data
product are similar, but they may be represented as different entities in the
catalogue.
- **Domains** - allow metadata to be grouped into different service areas that have
their own governance, like HMCTS, HMPPS, OPG, etc.

## Example usage

```python
from data_platform_catalogue import (
CatalogueClient, CatalogueMetadata,
DataHubCatalogueClient,
BaseCatalogueClient, DataLocation, CatalogueMetadata,
DataProductMetadata, TableMetadata,
CatalogueError
)

client = CatalogueClient(
jwt_token="***",
api_uri="https://catalogue.apps-tools.development.data-platform.service.justice.gov.uk/api"
)

assert client.is_healthy()

client: BaseCatalogueClient = DataHubCatalogueClient(jwt_token=jwt_token, api_url=api_url)

data_product = DataProductMetadata(
name = "my_data_product",
Expand All @@ -45,18 +45,7 @@ data_product = DataProductMetadata(
owner = "7804c127-d677-4900-82f9-83517e51bb94",
email = "[email protected]",
retention_period_in_days = 365,
domain = "legal-aid",
dpia_required = False
)

data_product_schema = DataProductMetadata(
name = "Tables",
description = "All the tables contained within my_data_product",
version = "v1.0.0",
owner = "7804c127-d677-4900-82f9-83517e51bb94",
email = "[email protected]",
retention_period_in_days = 365,
domain = "legal-aid",
domain = "HMCTS",
dpia_required = False
)

Expand All @@ -67,14 +56,31 @@ table = TableMetadata(
{"name": "foo", "type": "string", "description": "a"},
{"name": "bar", "type": "int", "description": "b"},
],
retention_period_in_days = 365
retention_period_in_days = 365,
major_version = 1
)

try:
service_fqn = client.create_or_update_database_service(name="data_platform")
database_fqn = client.create_or_update_database(metadata=data_product, service_fqn=service_fqn)
schema_fqn = client.create_or_update_schema(metadata=data_product_schema, database_fqn=database_fqn)
table_fqn = client.create_or_update_table(metadata=table, schema_fqn=schema_fqn)
table_fqn = client.upsert_table(
metadata=table,
data_product_metadata=data_product,
location=DataLocation("test_data_product_v1"),
)
except CatalogueError:
print("oh no")
```

## Catalogue Implementations

### DataHub

- Each data product within the MOJ data platform is created as a data product entity
- Each table is created as a dataset in DataHub
- Tables that reside in the same athena database (data_product_v1) should
be placed within the same DataHub container.

## OpenMetadata

- Each MOJ data product is mapped to a database in the OpenMetadata catalogue
- We populate the schema level in openmetdata with a generic entry of `Tables`
- Each table is mapped to a table in openmetadata
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .client import CatalogueClient # noqa: F401
from .client import DataHubCatalogueClient # noqa: F401
from .client import OpenMetadataCatalogueClient # noqa: F401
from .client import CatalogueError, ReferencedEntityMissing # noqa: F401
from .entities import CatalogueMetadata # noqa: F401
from .entities import DataProductMetadata # noqa: F401
from .entities import TableMetadata # noqa: F401
from .entities import CatalogueMetadata, DataLocation, TableMetadata # noqa: F401
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .base import BaseCatalogueClient # noqa: F401
from .base import CatalogueError # noqa: F401
from .base import ReferencedEntityMissing # noqa: F401
from .datahub import DataHubCatalogueClient # noqa: F401
from .openmetadata import OpenMetadataCatalogueClient # noqa: F401
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import logging
from abc import ABC, abstractmethod

from ..entities import (
CatalogueMetadata,
DataLocation,
DataProductMetadata,
TableMetadata,
)

logger = logging.getLogger(__name__)


class CatalogueError(Exception):
"""
Base class for all errors.
"""


class ReferencedEntityMissing(CatalogueError):
"""
A referenced entity (such as a user or tag) does not yet exist when
attempting to create a new metadata resource in the catalogue.
"""


class BaseCatalogueClient(ABC):
@abstractmethod
def upsert_database_service(
self, platform: str = "glue", display_name: str = "Data platform"
) -> str:
pass

@abstractmethod
def upsert_database(
self,
metadata: CatalogueMetadata | DataProductMetadata,
location: DataLocation,
) -> str:
pass

@abstractmethod
def upsert_schema(
self, metadata: DataProductMetadata, location: DataLocation
) -> str:
pass

@abstractmethod
def upsert_table(
self,
metadata: TableMetadata,
location: DataLocation,
data_product_metadata: DataProductMetadata | None = None,
) -> str:
pass
Loading

0 comments on commit 8bb6e68

Please sign in to comment.