Visualize Dataset statistics in metadata panel #1472

Merged
merged 40 commits into main from feature/viz-size-datasets on Aug 14, 2023
40 commits
f86ac6f
initial draft with hooks
ravi-kumar-pilla Jul 31, 2023
0370d56
modify test for formatFileSize
ravi-kumar-pilla Jul 31, 2023
e6e3898
add file size to the api response using fsspec
ravi-kumar-pilla Aug 1, 2023
6812664
update unit test for metadata panel
ravi-kumar-pilla Aug 2, 2023
ccafa98
remove print statements and update stats file
ravi-kumar-pilla Aug 2, 2023
a37459c
update get file size to not consider empty dir
ravi-kumar-pilla Aug 2, 2023
8c6965f
fixing linting and format errors
ravi-kumar-pilla Aug 2, 2023
83bf822
Merge branch 'main' of https://github.com/kedro-org/kedro-viz into fe…
ravi-kumar-pilla Aug 2, 2023
9466ea6
fix format and lint errors
ravi-kumar-pilla Aug 2, 2023
cf90243
fix pytest errors
ravi-kumar-pilla Aug 2, 2023
5a08448
add test cases and add fix for circle ci builds
ravi-kumar-pilla Aug 3, 2023
bb5a342
resolve PR comments
ravi-kumar-pilla Aug 4, 2023
9d66c92
fixing PR comments and add additional support for MemoryDataset
ravi-kumar-pilla Aug 4, 2023
3103894
update stats and modify file_size extraction
ravi-kumar-pilla Aug 4, 2023
bbbeb7d
fix lint and format errors
ravi-kumar-pilla Aug 4, 2023
72b8c74
fix lint errors
ravi-kumar-pilla Aug 4, 2023
36760b4
fix lint errors
ravi-kumar-pilla Aug 4, 2023
dd65977
fix lint errors
ravi-kumar-pilla Aug 4, 2023
86dd8ac
fix lint errors
ravi-kumar-pilla Aug 4, 2023
37f7059
fix lint errors
ravi-kumar-pilla Aug 4, 2023
3c231a0
Merge branch 'main' into feature/viz-size-datasets
tynandebold Aug 7, 2023
c3ff3e1
fix for PR comments
ravi-kumar-pilla Aug 8, 2023
5b1f7e4
add test coverage for transcoded data node
ravi-kumar-pilla Aug 8, 2023
f7a4dc1
address PR comments
ravi-kumar-pilla Aug 8, 2023
d84f01f
fix lint errors
ravi-kumar-pilla Aug 8, 2023
3b55684
modify test cases for hooks and utils
ravi-kumar-pilla Aug 9, 2023
18d9974
add matplotlib in requirements file for e2e tests
ravi-kumar-pilla Aug 9, 2023
beb1e5e
Merge branch 'main' into feature/viz-size-datasets
ravi-kumar-pilla Aug 9, 2023
4f3e77f
add design change for overflow
ravi-kumar-pilla Aug 10, 2023
b332dc2
Merge branch 'feature/viz-size-datasets' of https://github.com/kedro-…
ravi-kumar-pilla Aug 10, 2023
8ddfffe
add design change for overflow
ravi-kumar-pilla Aug 10, 2023
cf21083
remove matplotlib from requirements and fix metadata suggestions
ravi-kumar-pilla Aug 10, 2023
4213cf7
add release notes for visualizing dataset stats
ravi-kumar-pilla Aug 10, 2023
57c139b
add release notes for displaying dataset stats
ravi-kumar-pilla Aug 10, 2023
f917c22
hooks update based on Nok's comments
ravi-kumar-pilla Aug 11, 2023
7b88fc9
fix lint and format checks
ravi-kumar-pilla Aug 11, 2023
2d823da
modify stats based on Nok's comments
ravi-kumar-pilla Aug 11, 2023
381dfa4
fix lint and format
ravi-kumar-pilla Aug 11, 2023
6dd02ff
fixed failing unit test
ravi-kumar-pilla Aug 12, 2023
ac65d0d
update code based on Nok's suggestion
ravi-kumar-pilla Aug 14, 2023
44 changes: 33 additions & 11 deletions demo-project/stats.json
@@ -1,17 +1,39 @@
{
"companies": { "rows": 77096, "columns": 5 },
"reviews": { "rows": 77096, "columns": 10 },
"shuttles": { "rows": 77096, "columns": 13 },
"ingestion.int_typed_companies": { "rows": 77096, "columns": 5 },
"ingestion.int_typed_shuttles": { "rows": 77096, "columns": 13 },
"companies": { "rows": 77096, "columns": 5, "file_size": 1810602 },
"ingestion.int_typed_companies": {
"rows": 77096,
"columns": 5,
"file_size": 550616
},
"reviews": { "rows": 77096, "columns": 10, "file_size": 2937144 },
"ingestion.int_typed_reviews": {
"rows": 55790,
"columns": 11,
"file_size": 1335600
},
"shuttles": { "rows": 77096, "columns": 13, "file_size": 4195290 },
"ingestion.int_typed_shuttles": {
"rows": 77096,
"columns": 13,
"file_size": 1235685
},
"ingestion.prm_agg_companies": { "rows": 50098, "columns": 5 },
"ingestion.int_typed_reviews": { "rows": 55790, "columns": 11 },
"prm_spine_table": { "rows": 29768, "columns": 3 },
"prm_shuttle_company_reviews": { "rows": 29768, "columns": 27 },
"feature_engineering.feat_static_features": { "rows": 29768, "columns": 12 },
"prm_shuttle_company_reviews": {
"rows": 29768,
"columns": 27,
"file_size": 1020356
},
"prm_spine_table": { "rows": 29768, "columns": 3, "file_size": 655994 },
"feature_engineering.feat_derived_features": { "rows": 29768, "columns": 3 },
"feature_importance_output": { "rows": 15, "columns": 2 },
"model_input_table": { "rows": 29768, "columns": 12 },
"feature_importance_output": { "rows": 15, "columns": 2, "file_size": 460 },
"feature_engineering.feat_static_features": { "rows": 29768, "columns": 12 },
"ingestion.prm_spine_table_clone": { "rows": 29768, "columns": 3 },
"reporting.cancellation_policy_breakdown": {
"rows": 21,
"columns": 3,
"file_size": 8744
},
"model_input_table": { "rows": 29768, "columns": 12, "file_size": 787351 },
"X_train": { "rows": 23814, "columns": 11 },
"X_test": { "rows": 5954, "columns": 11 }
}
136 changes: 121 additions & 15 deletions package/kedro_viz/integrations/kedro/hooks.py
@@ -1,16 +1,16 @@
# pylint: disable=broad-exception-caught
# pylint: disable=broad-exception-caught, protected-access
"""`kedro_viz.integrations.kedro.hooks` defines hooks to add additional
functionalities for a kedro run."""

import json
import logging
from collections import defaultdict
from typing import Any
from typing import Any, Union

import pandas as pd
from kedro.framework.hooks import hook_impl

from kedro_viz.integrations.kedro.utils import get_stats_dataset_name, stats_order
from kedro.io import DataCatalog
from kedro.io.core import get_filepath_str
from kedro.pipeline.pipeline import TRANSCODING_SEPARATOR, _strip_transcoding

logger = logging.getLogger(__name__)

@@ -23,26 +23,41 @@ class DatasetStatsHook:
def __init__(self):
self._stats = defaultdict(dict)

@hook_impl
def after_catalog_created(self, catalog: DataCatalog):
"""Hooks to be invoked after a data catalog is created.

Args:
catalog: The catalog that was created.
"""

self.datasets = catalog._data_sets

@hook_impl
def after_dataset_loaded(self, dataset_name: str, data: Any):
"""Hook to be invoked after a dataset is loaded from the catalog.
Once the dataset is loaded, extract the required dataset statistics.
The hook currently supports (pd.DataFrame) dataset instances

Args:
dataset_name: name of the dataset that was loaded from the catalog.
data: the actual data that was loaded from the catalog.
"""

self.create_dataset_stats(dataset_name, data)

@hook_impl
def after_dataset_saved(self, dataset_name: str, data: Any):
"""Hook to be invoked after a dataset is saved to the catalog.
Once the dataset is saved, extract the required dataset statistics.
The hook currently supports (pd.DataFrame) dataset instances

Args:
dataset_name: name of the dataset that was saved to the catalog.
data: the actual data that was saved to the catalog.
"""
try:
stats_dataset_name = get_stats_dataset_name(dataset_name)
if isinstance(data, pd.DataFrame):
self._stats[stats_dataset_name]["rows"] = int(data.shape[0])
self._stats[stats_dataset_name]["columns"] = int(data.shape[1])

except Exception as exc: # pragma: no cover
logger.warning(
"Unable to create statistics for the dataset %s : %s", dataset_name, exc
)
self.create_dataset_stats(dataset_name, data)

@hook_impl
def after_pipeline_run(self):
@@ -54,7 +69,7 @@ def after_pipeline_run(self):
try:
with open("stats.json", "w", encoding="utf8") as file:
sorted_stats_data = {
dataset_name: stats_order(stats)
dataset_name: self.format_stats(stats)
for dataset_name, stats in self._stats.items()
}
json.dump(sorted_stats_data, file)
@@ -64,5 +79,96 @@
"Unable to write dataset statistics for the pipeline: %s", exc
)

def create_dataset_stats(self, dataset_name: str, data: Any):
"""Helper method to create dataset statistics.
Currently supports (pd.DataFrame) dataset instances.

Args:
dataset_name: The dataset name for which we need the statistics
data: Actual data that is loaded/saved to the catalog

"""
try:
import pandas as pd # pylint: disable=import-outside-toplevel

stats_dataset_name = self.get_stats_dataset_name(dataset_name)

if isinstance(data, pd.DataFrame):
self._stats[stats_dataset_name]["rows"] = int(data.shape[0])
self._stats[stats_dataset_name]["columns"] = int(data.shape[1])

current_dataset = self.datasets.get(dataset_name, None)

if current_dataset:
self._stats[stats_dataset_name]["file_size"] = self.get_file_size(
current_dataset
)
Comment on lines +96 to +105

Good start! However, it only works with DataFrames.

There are two additional cases that could be implemented fairly easily:

  • a PartitionedDataSet that loads pd.DataFrames
  • an Excel file loaded as multiple sheets (load_args: sheet_name: None)

I think it's not too difficult to account for those cases by adding additional logic on data, and it would avoid just showing N/A in the UI. Do you see how to implement that?

Contributor Author:

Thank you @oulianov. We will try to include more use cases for visualizing dataset statistics in future releases. Created #1511 for tracking. Please feel free to comment any additional cases here.

Nice, thank you!

(An illustrative sketch of these dict-of-DataFrames cases follows the hooks.py diff below.)


except ImportError as exc: # pragma: no cover
logger.warning(
"Unable to import dependencies to extract dataset statistics for %s : %s",
dataset_name,
exc,
)
except Exception as exc: # pragma: no cover
logger.warning(
"[hook: after_dataset_saved] Unable to create statistics for the dataset %s : %s",
dataset_name,
exc,
)

def get_file_size(self, dataset: Any) -> Union[int, None]:
"""Helper method to return the file size of a dataset

Args:
dataset: A dataset instance for which we need the file size

Returns: file size for the dataset if file_path is valid, if not returns None
"""

try:
if not (hasattr(dataset, "_filepath") and dataset._filepath):
return None

file_path = get_filepath_str(dataset._filepath, dataset._protocol)
return dataset._fs.size(file_path)

except Exception as exc:
logger.warning(
"Unable to get file size for the dataset %s: %s", dataset, exc
)
return None

def format_stats(self, stats: dict) -> dict:
"""Sort the stats extracted from the datasets using the sort order

Args:
stats: A dictionary of statistics for a dataset

Returns: A sorted dictionary based on the sort_order
"""
# Custom sort order
sort_order = ["rows", "columns", "file_size"]
return {stat: stats.get(stat) for stat in sort_order if stat in stats}

def get_stats_dataset_name(self, dataset_name: str) -> str:
"""Get the dataset name for assigning stat values in the dictionary.
If the dataset name contains transcoded information, strip the transcoding.

Args:
dataset_name: name of the dataset

Returns: Dataset name without any transcoding information
"""

stats_dataset_name = dataset_name

# Strip transcoding
is_transcoded_dataset = TRANSCODING_SEPARATOR in dataset_name
if is_transcoded_dataset:
stats_dataset_name = _strip_transcoding(dataset_name)

return stats_dataset_name


dataset_stats_hook = DatasetStatsHook()
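
For the dict-of-DataFrames cases raised in the review thread above, a minimal sketch of the aggregation logic is shown below. The helper name aggregate_dataframe_dict_stats and its aggregation rules are assumptions for illustration only; this logic is not part of the merged hook (the follow-up is tracked in #1511), and a PartitionedDataSet load result maps partition ids to load callables, which would have to be materialised before counting rows.

    from typing import Any, Dict

    import pandas as pd


    def aggregate_dataframe_dict_stats(data: Dict[str, Any]) -> Dict[str, int]:
        """Aggregate row/column counts over a dict of DataFrames, e.g. an Excel
        file loaded with `load_args: sheet_name: None` (one DataFrame per sheet)."""
        frames = [value for value in data.values() if isinstance(value, pd.DataFrame)]
        if not frames:
            return {}
        stats = {"rows": int(sum(frame.shape[0] for frame in frames))}
        # A single column count is only meaningful if every frame shares a schema.
        if len({frame.shape[1] for frame in frames}) == 1:
            stats["columns"] = int(frames[0].shape[1])
        return stats
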
37 changes: 0 additions & 37 deletions package/kedro_viz/integrations/kedro/utils.py

This file was deleted.

4 changes: 1 addition & 3 deletions package/kedro_viz/models/flowchart.py
@@ -13,7 +13,7 @@
from kedro.pipeline.node import Node as KedroNode
from kedro.pipeline.pipeline import TRANSCODING_SEPARATOR, _strip_transcoding

from kedro_viz.models.utils import get_dataset_type, get_file_size
from kedro_viz.models.utils import get_dataset_type

try:
# kedro 0.18.11 onwards
@@ -567,7 +567,6 @@ def __post_init__(self, data_node: DataNode, dataset_stats: Dict):
dataset_description = dataset._describe()
self.filepath = _parse_filepath(dataset_description)
self.stats = dataset_stats
self.stats["file_size"] = get_file_size(self.filepath)

# Run command is only available if a node is an output, i.e. not a free input
if not data_node.is_free_input:
@@ -640,7 +639,6 @@ def __post_init__(
dataset_description = original_version._describe()
self.filepath = _parse_filepath(dataset_description)
self.stats = dataset_stats
self.stats["file_size"] = get_file_size(self.filepath)

if not transcoded_data_node.is_free_input:
self.run_command = (
46 changes: 1 addition & 45 deletions package/kedro_viz/models/utils.py
@@ -1,8 +1,6 @@
"""`kedro_viz.models.utils` contains utility functions used in the `kedro_viz.models` package"""
import logging
from typing import TYPE_CHECKING, Union

import fsspec
from typing import TYPE_CHECKING

logger = logging.getLogger(__name__)

@@ -45,45 +43,3 @@ def get_dataset_type(dataset: "AbstractDataset") -> str:
abbreviated_module_name = ".".join(dataset.__class__.__module__.split(".")[-2:])
class_name = f"{dataset.__class__.__qualname__}"
return f"{abbreviated_module_name}.{class_name}"


def get_file_size(file_path: Union[str, None]) -> Union[int, None]:
"""Get the dataset file size using fsspec. If the file_path is a directory,
get the latest file created (this corresponds to the latest run)

Args:
file_path: The file path for the dataset
"""
try:
if not file_path:
return None

resolved_file_path = file_path
file_system, _, paths = fsspec.get_fs_token_paths(file_path)

# Get information about the file
file_info = file_system.info(paths[0])

if file_info["type"] == "directory":
files = file_system.ls(paths[0])
# Filter only directories from the list
directories = [
file
for file in files
if file_system.isdir(file) and len(file_system.ls(file)) > 0
]
resolved_file_path = file_system.ls(
max(directories, key=lambda f: file_system.info(f)["created"])
)[0]

with file_system.open(resolved_file_path) as file:
file_size_in_bytes = file.size
return file_size_in_bytes

except FileNotFoundError as exc:
logger.warning("File not found for %s : %s", file_path, exc)
return None

except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning("Error getting file size for %s : %s", file_path, exc)
return None
33 changes: 9 additions & 24 deletions package/tests/conftest.py
@@ -1,10 +1,8 @@
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict
from unittest import mock

import fsspec
import pandas as pd
import pytest
from fastapi.testclient import TestClient
@@ -289,29 +287,16 @@ def example_data_frame():
yield pd.DataFrame(data)


@pytest.fixture
def example_text_file(tmp_path):
# Get the current timestamp
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

# Create the directory path using tmp_path and timestamp
directory_path = tmp_path / timestamp

# Create the directory using fsspec
fs = fsspec.filesystem("file")
fs.mkdir(directory_path)

# Create the file path inside the directory
file_path = directory_path / "file.txt"

# Create the mock file using fsspec
with fs.open(file_path, "w") as file:
file.write("Mock content")

yield file_path


@pytest.fixture
def example_dataset_stats_hook_obj():
# Create an instance of DatasetStatsHook
yield DatasetStatsHook()


@pytest.fixture
def example_csv_dataset(tmp_path, example_data_frame):
new_csv_dataset = pandas.CSVDataSet(
filepath=Path(tmp_path / "model_inputs.csv").as_posix(),
)
new_csv_dataset.save(example_data_frame)
yield new_csv_dataset
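
A hypothetical test built on the two fixtures above could exercise the new file-size path end to end. This sketch is illustrative and not part of the PR's test suite; the test name is an assumption.

    def test_get_file_size_for_csv_dataset(
        example_dataset_stats_hook_obj, example_csv_dataset
    ):
        # The fixture saves a small DataFrame to a real CSV file, so the hook
        # should resolve its on-disk size via the dataset's filesystem.
        file_size = example_dataset_stats_hook_obj.get_file_size(example_csv_dataset)
        assert isinstance(file_size, int)
        assert file_size > 0
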