From 67d5684620518bf6bf30f12aa325622eced89b64 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 6 Mar 2024 13:16:46 +0100 Subject: [PATCH 01/75] Streamlit improvements --- dlt/cli/pipeline_command.py | 13 +- dlt/helpers/streamlit_app/__init__.py | 1 + dlt/helpers/streamlit_app/dashboard.py | 176 +++++++++++++++++++ dlt/helpers/streamlit_app/menu.py | 93 ++++++++++ dlt/helpers/streamlit_app/pages/__init__.py | 0 dlt/helpers/streamlit_app/pages/load_info.py | 134 ++++++++++++++ dlt/helpers/streamlit_app/utils.py | 4 + dlt/helpers/streamlit_app/widgets.py | 15 ++ 8 files changed, 434 insertions(+), 2 deletions(-) create mode 100644 dlt/helpers/streamlit_app/__init__.py create mode 100644 dlt/helpers/streamlit_app/dashboard.py create mode 100644 dlt/helpers/streamlit_app/menu.py create mode 100644 dlt/helpers/streamlit_app/pages/__init__.py create mode 100644 dlt/helpers/streamlit_app/pages/load_info.py create mode 100644 dlt/helpers/streamlit_app/utils.py create mode 100644 dlt/helpers/streamlit_app/widgets.py diff --git a/dlt/cli/pipeline_command.py b/dlt/cli/pipeline_command.py index 9981fa8493..fe25496c7d 100644 --- a/dlt/cli/pipeline_command.py +++ b/dlt/cli/pipeline_command.py @@ -101,12 +101,21 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]: if operation == "show": from dlt.common.runtime import signals - from dlt.helpers import streamlit_helper + from dlt.helpers.streamlit_app import dashboard with signals.delayed_signals(): venv = Venv.restore_current() for line in iter_stdout( - venv, "streamlit", "run", streamlit_helper.__file__, pipeline_name + venv, + "streamlit", + "run", + dashboard.__file__, + pipeline_name, + "--client.showSidebarNavigation", + "false", + # TODO: once done remove the option below + "--server.runOnSave", + "true", ): fmt.echo(line) diff --git a/dlt/helpers/streamlit_app/__init__.py b/dlt/helpers/streamlit_app/__init__.py new file mode 100644 index 0000000000..b8a5b8c073 --- /dev/null +++ b/dlt/helpers/streamlit_app/__init__.py @@ -0,0 +1 @@ +# import dlt.cli.helpers.dependencies # noqa diff --git a/dlt/helpers/streamlit_app/dashboard.py b/dlt/helpers/streamlit_app/dashboard.py new file mode 100644 index 0000000000..45663cacb5 --- /dev/null +++ b/dlt/helpers/streamlit_app/dashboard.py @@ -0,0 +1,176 @@ +import sys + +from typing import List, Iterator + +import streamlit as st +from streamlit_extras.tags import tagger_component + +from dlt.common.exceptions import MissingDependencyException +from dlt.common.utils import flatten_list_or_items +from dlt.common.libs.pandas import pandas as pd +from dlt.helpers.streamlit_app.menu import menu +from dlt.pipeline import Pipeline +from dlt.pipeline.exceptions import SqlClientNotAvailable + + +# use right caching function to disable deprecation message +if hasattr(st, "cache_data"): + cache_data = st.cache_data +else: + cache_data = st.experimental_memo + + +def write_data_explorer_page( + pipeline: Pipeline, + schema_name: str = None, + show_dlt_tables: bool = False, + example_query: str = "", + show_charts: bool = True, +) -> None: + """Writes Streamlit app page with a schema and live data preview. + + #### Args: + pipeline (Pipeline): Pipeline instance to use. + schema_name (str, optional): Name of the schema to display. If None, default schema is used. + show_dlt_tables (bool, optional): Should show dlt internal tables. Defaults to False. + example_query (str, optional): Example query to be displayed in the SQL Query box. + show_charts (bool, optional): Should automatically show charts for the queries from SQL Query box. Defaults to True. + + Raises: + MissingDependencyException: Raised when a particular python dependency is not installed + """ + + @cache_data(ttl=60) + def _query_data(query: str, chunk_size: int = None) -> pd.DataFrame: + try: + with pipeline.sql_client(schema_name) as client: + with client.execute_query(query) as curr: + return curr.df(chunk_size=chunk_size) + except SqlClientNotAvailable: + st.error("Cannot load data - SqlClient not available") + + st.subheader("Schemas and tables") + + num_schemas = len(pipeline.schema_names) + if num_schemas == 1: + schema_name = pipeline.schema_names[0] + selected_schema = pipeline.schemas.get(schema_name) + st.text(f"Schema: {schema_name}") + elif num_schemas > 1: + st.subheader("Schema:") + text = "Pick a schema name to see all its tables below" + selected_schema_name = st.selectbox(text, sorted(pipeline.schema_names)) + selected_schema = pipeline.schemas.get(selected_schema_name) + + for table in sorted(selected_schema.data_tables(), key=lambda table: table["name"]): + table_name = table["name"] + + tagger_component("Table", [table_name]) + if "description" in table: + st.text(table["description"]) + table_hints: List[str] = [] + if "parent" in table: + table_hints.append("parent: **%s**" % table["parent"]) + if "resource" in table: + table_hints.append("resource: **%s**" % table["resource"]) + if "write_disposition" in table: + table_hints.append("write disposition: **%s**" % table["write_disposition"]) + columns = table["columns"] + primary_keys: Iterator[str] = flatten_list_or_items( + [ + col_name + for col_name in columns.keys() + if not col_name.startswith("_") and columns[col_name].get("primary_key") is not None + ] + ) + table_hints.append("primary key(s): **%s**" % ", ".join(primary_keys)) + merge_keys = flatten_list_or_items( + [ + col_name + for col_name in columns.keys() + if not col_name.startswith("_") + and not columns[col_name].get("merge_key") is None # noqa: E714 + ] + ) + table_hints.append("merge key(s): **%s**" % ", ".join(merge_keys)) + + st.markdown(" | ".join(table_hints)) + + # table schema contains various hints (like clustering or partition options) that we do not want to show in basic view + def essentials_f(c): + return {k: v for k, v in c.items() if k in ["name", "data_type", "nullable"]} + + st.table(map(essentials_f, table["columns"].values())) + # add a button that when pressed will show the full content of a table + if st.button("SHOW DATA", key=table_name): + df = _query_data(f"SELECT * FROM {table_name}", chunk_size=2048) + if df is None: + st.text("No rows returned") + else: + rows_count = df.shape[0] + if df.shape[0] < 2048: + st.text(f"All {rows_count} row(s)") + else: + st.text(f"Top {rows_count} row(s)") + st.dataframe(df) + + st.header("Run your query") + sql_query = st.text_area("Enter your SQL query", value=example_query) + if st.button("Run Query"): + if sql_query: + try: + # run the query from the text area + df = _query_data(sql_query) + if df is None: + st.text("No rows returned") + else: + rows_count = df.shape[0] + st.text(f"{rows_count} row(s) returned") + st.dataframe(df) + try: + # now if the dataset has supported shape try to display the bar or altair chart + if df.dtypes.shape[0] == 1 and show_charts: + # try barchart + st.bar_chart(df) + if df.dtypes.shape[0] == 2 and show_charts: + # try to import altair charts + try: + import altair as alt + except ModuleNotFoundError: + raise MissingDependencyException( + "DLT Streamlit Helpers", + ["altair"], + "DLT Helpers for Streamlit should be run within a streamlit" + " app.", + ) + + # try altair + bar_chart = ( + alt.Chart(df) + .mark_bar() + .encode( + x=f"{df.columns[1]}:Q", y=alt.Y(f"{df.columns[0]}:N", sort="-x") + ) + ) + st.altair_chart(bar_chart, use_container_width=True) + except Exception as ex: + st.error(f"Chart failed due to: {ex}") + except Exception as ex: + st.text("Exception when running query") + st.exception(ex) + + +def display(pipeline_name: str) -> None: + import dlt + + pipeline = dlt.attach(pipeline_name) + st.session_state["pipeline_name"] = pipeline_name + st.subheader(f"{pipeline_name}", divider="rainbow") + with st.sidebar: + menu() + + write_data_explorer_page(pipeline) + + +if __name__ == "__main__": + display(sys.argv[1]) diff --git a/dlt/helpers/streamlit_app/menu.py b/dlt/helpers/streamlit_app/menu.py new file mode 100644 index 0000000000..1003c86725 --- /dev/null +++ b/dlt/helpers/streamlit_app/menu.py @@ -0,0 +1,93 @@ +import base64 +import dlt +import streamlit as st + +from PIL import Image +from dlt.common.destination.reference import WithStateSync +from dlt.helpers.streamlit_app.utils import HERE +from dlt.pipeline.state_sync import load_state_from_destination +from streamlit_extras.metric_cards import style_metric_cards + + +def logo(): + logo_text = """ + + """ + styles = """ + + """ + + st.markdown(logo_text + styles, unsafe_allow_html=True) + + +def menu() -> None: + logo() + st.page_link(f"{HERE}/dashboard.py", label="Explore data", icon="🕹ī¸") + st.page_link(f"{HERE}/pages/load_info.py", label="Load info", icon="💾") + + +def pipeline_state_info(pipeline: dlt.Pipeline): + st.divider() + st.subheader("Pipeline state info") + remote_state = None + with pipeline.destination_client() as client: + if isinstance(client, WithStateSync): + remote_state = load_state_from_destination(pipeline.pipeline_name, client) + + local_state = pipeline.state + + if remote_state: + remote_state_version = remote_state["_state_version"] + else: + remote_state_version = "---" # type: ignore + + col1, col2 = st.columns(2) + + col1.metric(label="Local version", value=local_state["_state_version"]) + col2.metric(label="Remote version", value=remote_state_version) + style_metric_cards( + background_color="#0e1111", + border_size_px=1, + border_color="#272736", + border_left_color="#007b05", + border_radius_px=4, + ) + + if remote_state_version != local_state["_state_version"]: + st.warning( + "Looks like that local state is not yet synchronized or synchronization is disabled", + icon="⚠ī¸", + ) diff --git a/dlt/helpers/streamlit_app/pages/__init__.py b/dlt/helpers/streamlit_app/pages/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/helpers/streamlit_app/pages/load_info.py b/dlt/helpers/streamlit_app/pages/load_info.py new file mode 100644 index 0000000000..6b452d1c5e --- /dev/null +++ b/dlt/helpers/streamlit_app/pages/load_info.py @@ -0,0 +1,134 @@ +import streamlit as st + +import humanize + +from dlt.common import pendulum +from dlt.common.configuration.exceptions import ConfigFieldMissingException +from dlt.common.destination.reference import WithStateSync +from dlt.common.libs.pandas import pandas as pd +from dlt.helpers.streamlit_app.menu import menu, pipeline_state_info +from dlt.helpers.streamlit_app.widgets import pipeline_summary + +from dlt.pipeline import Pipeline +from dlt.pipeline.exceptions import CannotRestorePipelineException, SqlClientNotAvailable +from dlt.pipeline.state_sync import load_state_from_destination + +# use right caching function to disable deprecation message +if hasattr(st, "cache_data"): + cache_data = st.cache_data +else: + cache_data = st.experimental_memo + + +def write_load_status_page(pipeline: Pipeline) -> None: + """Display pipeline loading information. Will be moved to dlt package once tested""" + + @cache_data(ttl=600) + def _query_data(query: str, schema_name: str = None) -> pd.DataFrame: + try: + with pipeline.sql_client(schema_name) as client: + with client.execute_query(query) as curr: + return curr.df() + except SqlClientNotAvailable: + st.error("Cannot load data - SqlClient not available") + + @cache_data(ttl=5) + def _query_data_live(query: str, schema_name: str = None) -> pd.DataFrame: + try: + with pipeline.sql_client(schema_name) as client: + with client.execute_query(query) as curr: + return curr.df() + except SqlClientNotAvailable: + st.error("Cannot load data - SqlClient not available") + + try: + pipeline_summary(pipeline) + + st.subheader("Last load info") + col1, col2, col3 = st.columns(3) + loads_df = _query_data_live( + f"SELECT load_id, inserted_at FROM {pipeline.default_schema.loads_table_name} WHERE" + " status = 0 ORDER BY inserted_at DESC LIMIT 101 " + ) + loads_no = loads_df.shape[0] + if loads_df.shape[0] > 0: + rel_time = ( + humanize.naturaldelta( + pendulum.now() - pendulum.from_timestamp(loads_df.iloc[0, 1].timestamp()) + ) + + " ago" + ) + last_load_id = loads_df.iloc[0, 0] + if loads_no > 100: + loads_no = "> " + str(loads_no) + else: + rel_time = "---" + last_load_id = "---" + + col1.metric("Last load time", rel_time) + col2.metric("Last load id", last_load_id) + col3.metric("Total number of loads", loads_no) + + st.markdown("**Number of loaded rows:**") + selected_load_id = st.selectbox("Select load id", loads_df) + schema = pipeline.default_schema + + # construct a union query + query_parts = [] + for table in schema.data_tables(): + if "parent" in table: + continue + table_name = table["name"] + query_parts.append( + f"SELECT '{table_name}' as table_name, COUNT(1) As rows_count FROM" + f" {table_name} WHERE _dlt_load_id = '{selected_load_id}'" + ) + query_parts.append("UNION ALL") + query_parts.pop() + rows_counts_df = _query_data("\n".join(query_parts)) + + st.markdown(f"Rows loaded in **{selected_load_id}**") + st.dataframe(rows_counts_df) + + st.markdown("**Last 100 loads**") + st.dataframe(loads_df) + + st.subheader("Schema updates") + schemas_df = _query_data_live( + "SELECT schema_name, inserted_at, version, version_hash FROM" + f" {pipeline.default_schema.version_table_name} ORDER BY inserted_at DESC LIMIT 101 " + ) + st.markdown("**100 recent schema updates**") + st.dataframe(schemas_df) + except CannotRestorePipelineException as restore_ex: + st.error("Seems like the pipeline does not exist. Did you run it at least once?") + st.exception(restore_ex) + + except ConfigFieldMissingException as cf_ex: + st.error( + "Pipeline credentials/configuration is missing. This most often happen when you run the" + " streamlit app from different folder than the `.dlt` with `toml` files resides." + ) + st.text(str(cf_ex)) + + except Exception as ex: + st.error("Pipeline info could not be prepared. Did you load the data at least once?") + st.exception(ex) + + +def show(): + import dlt + + if not st.session_state.get("pipeline_name"): + st.switch_page("dashboard.py") + + pipeline = dlt.attach(st.session_state["pipeline_name"]) + st.subheader("Load info", divider="rainbow") + write_load_status_page(pipeline) + with st.sidebar: + menu() + pipeline_state_info(pipeline) + + +if __name__ == "__main__": + show() diff --git a/dlt/helpers/streamlit_app/utils.py b/dlt/helpers/streamlit_app/utils.py new file mode 100644 index 0000000000..566e847d3e --- /dev/null +++ b/dlt/helpers/streamlit_app/utils.py @@ -0,0 +1,4 @@ +from pathlib import Path + + +HERE = Path(__file__).absolute().parent diff --git a/dlt/helpers/streamlit_app/widgets.py b/dlt/helpers/streamlit_app/widgets.py new file mode 100644 index 0000000000..3a386d7282 --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets.py @@ -0,0 +1,15 @@ +import dlt +import streamlit as st + + +def pipeline_summary(pipeline: dlt.Pipeline): + credentials = pipeline.sql_client().credentials + schema_names = ", ".join(sorted(pipeline.schema_names)) + expander = st.expander("Pipeline info") + expander.markdown(f""" + * pipeline name: **{pipeline.pipeline_name}** + * destination: **{str(credentials)}** in **{pipeline.destination.destination_description}** + * dataset name: **{pipeline.dataset_name}** + * default schema name: **{pipeline.default_schema_name}** + * all schema names: **{schema_names}** + """) From dbfd63ca984fd0c04a2103224de003b0712d7cb8 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 6 Mar 2024 14:26:10 +0100 Subject: [PATCH 02/75] Refactor menu and create custom widgets --- dlt/helpers/streamlit_app/dashboard.py | 2 +- dlt/helpers/streamlit_app/last_load_info.py | 36 +++++++++ dlt/helpers/streamlit_app/menu.py | 79 +++++-------------- dlt/helpers/streamlit_app/pages/load_info.py | 28 +------ dlt/helpers/streamlit_app/utils.py | 36 +++++++++ dlt/helpers/streamlit_app/widgets/__init__.py | 3 + dlt/helpers/streamlit_app/widgets/logo.py | 45 +++++++++++ dlt/helpers/streamlit_app/widgets/stats.py | 52 ++++++++++++ .../{widgets.py => widgets/summary.py} | 0 9 files changed, 195 insertions(+), 86 deletions(-) create mode 100644 dlt/helpers/streamlit_app/last_load_info.py create mode 100644 dlt/helpers/streamlit_app/widgets/__init__.py create mode 100644 dlt/helpers/streamlit_app/widgets/logo.py create mode 100644 dlt/helpers/streamlit_app/widgets/stats.py rename dlt/helpers/streamlit_app/{widgets.py => widgets/summary.py} (100%) diff --git a/dlt/helpers/streamlit_app/dashboard.py b/dlt/helpers/streamlit_app/dashboard.py index 45663cacb5..93c5fdf048 100644 --- a/dlt/helpers/streamlit_app/dashboard.py +++ b/dlt/helpers/streamlit_app/dashboard.py @@ -167,7 +167,7 @@ def display(pipeline_name: str) -> None: st.session_state["pipeline_name"] = pipeline_name st.subheader(f"{pipeline_name}", divider="rainbow") with st.sidebar: - menu() + menu(pipeline) write_data_explorer_page(pipeline) diff --git a/dlt/helpers/streamlit_app/last_load_info.py b/dlt/helpers/streamlit_app/last_load_info.py new file mode 100644 index 0000000000..7f7fa4e3f5 --- /dev/null +++ b/dlt/helpers/streamlit_app/last_load_info.py @@ -0,0 +1,36 @@ +import dlt +import humanize +import streamlit as st + +from dlt.common import pendulum +from dlt.helpers.streamlit_app.utils import query_data_live +from dlt.helpers.streamlit_app.widgets.stats import stat + + +def last_load_info(pipeline: dlt.Pipeline): + loads_df = query_data_live( + pipeline, + f"SELECT load_id, inserted_at FROM {pipeline.default_schema.loads_table_name} WHERE" + " status = 0 ORDER BY inserted_at DESC LIMIT 101 ", + ) + + loads_no = loads_df.shape[0] + if loads_df.shape[0] > 0: + rel_time = ( + humanize.naturaldelta( + pendulum.now() - pendulum.from_timestamp(loads_df.iloc[0, 1].timestamp()) + ) + + " ago" + ) + last_load_id = loads_df.iloc[0, 0] + if loads_no > 100: + loads_no = "> " + str(loads_no) + else: + rel_time = "---" + last_load_id = "---" + + st.divider() + st.subheader("Last load info", divider="rainbow") + stat("Last load time", rel_time, border_left_width=4) + stat("Last load id", last_load_id) + stat("Total number of loads", loads_no) diff --git a/dlt/helpers/streamlit_app/menu.py b/dlt/helpers/streamlit_app/menu.py index 1003c86725..3c21c7f1aa 100644 --- a/dlt/helpers/streamlit_app/menu.py +++ b/dlt/helpers/streamlit_app/menu.py @@ -1,67 +1,25 @@ -import base64 import dlt import streamlit as st -from PIL import Image from dlt.common.destination.reference import WithStateSync from dlt.helpers.streamlit_app.utils import HERE +from dlt.helpers.streamlit_app.widgets import logo, stat +from dlt.helpers.streamlit_app.last_load_info import last_load_info from dlt.pipeline.state_sync import load_state_from_destination -from streamlit_extras.metric_cards import style_metric_cards -def logo(): - logo_text = """ - - """ - styles = """ - - """ - - st.markdown(logo_text + styles, unsafe_allow_html=True) - - -def menu() -> None: +def menu(pipeline: dlt.Pipeline) -> None: logo() st.page_link(f"{HERE}/dashboard.py", label="Explore data", icon="🕹ī¸") st.page_link(f"{HERE}/pages/load_info.py", label="Load info", icon="💾") + pipeline_state_info(pipeline) + last_load_info(pipeline) def pipeline_state_info(pipeline: dlt.Pipeline): st.divider() - st.subheader("Pipeline state info") + st.subheader(f"Pipeline {pipeline.pipeline_name}", divider="rainbow") + st.subheader("State info") remote_state = None with pipeline.destination_client() as client: if isinstance(client, WithStateSync): @@ -76,15 +34,20 @@ def pipeline_state_info(pipeline: dlt.Pipeline): col1, col2 = st.columns(2) - col1.metric(label="Local version", value=local_state["_state_version"]) - col2.metric(label="Remote version", value=remote_state_version) - style_metric_cards( - background_color="#0e1111", - border_size_px=1, - border_color="#272736", - border_left_color="#007b05", - border_radius_px=4, - ) + with col1: + stat( + label="Local version", + value=local_state["_state_version"], + display="block", + border_left_width=4, + ) + with col2: + stat( + label="Remote version", + value=remote_state_version, + display="block", + border_left_width=4, + ) if remote_state_version != local_state["_state_version"]: st.warning( diff --git a/dlt/helpers/streamlit_app/pages/load_info.py b/dlt/helpers/streamlit_app/pages/load_info.py index 6b452d1c5e..0c322eb16c 100644 --- a/dlt/helpers/streamlit_app/pages/load_info.py +++ b/dlt/helpers/streamlit_app/pages/load_info.py @@ -1,17 +1,12 @@ import streamlit as st -import humanize -from dlt.common import pendulum from dlt.common.configuration.exceptions import ConfigFieldMissingException -from dlt.common.destination.reference import WithStateSync from dlt.common.libs.pandas import pandas as pd from dlt.helpers.streamlit_app.menu import menu, pipeline_state_info from dlt.helpers.streamlit_app.widgets import pipeline_summary - from dlt.pipeline import Pipeline from dlt.pipeline.exceptions import CannotRestorePipelineException, SqlClientNotAvailable -from dlt.pipeline.state_sync import load_state_from_destination # use right caching function to disable deprecation message if hasattr(st, "cache_data"): @@ -44,30 +39,10 @@ def _query_data_live(query: str, schema_name: str = None) -> pd.DataFrame: try: pipeline_summary(pipeline) - st.subheader("Last load info") - col1, col2, col3 = st.columns(3) loads_df = _query_data_live( f"SELECT load_id, inserted_at FROM {pipeline.default_schema.loads_table_name} WHERE" " status = 0 ORDER BY inserted_at DESC LIMIT 101 " ) - loads_no = loads_df.shape[0] - if loads_df.shape[0] > 0: - rel_time = ( - humanize.naturaldelta( - pendulum.now() - pendulum.from_timestamp(loads_df.iloc[0, 1].timestamp()) - ) - + " ago" - ) - last_load_id = loads_df.iloc[0, 0] - if loads_no > 100: - loads_no = "> " + str(loads_no) - else: - rel_time = "---" - last_load_id = "---" - - col1.metric("Last load time", rel_time) - col2.metric("Last load id", last_load_id) - col3.metric("Total number of loads", loads_no) st.markdown("**Number of loaded rows:**") selected_load_id = st.selectbox("Select load id", loads_df) @@ -126,8 +101,7 @@ def show(): st.subheader("Load info", divider="rainbow") write_load_status_page(pipeline) with st.sidebar: - menu() - pipeline_state_info(pipeline) + menu(pipeline) if __name__ == "__main__": diff --git a/dlt/helpers/streamlit_app/utils.py b/dlt/helpers/streamlit_app/utils.py index 566e847d3e..157457da4b 100644 --- a/dlt/helpers/streamlit_app/utils.py +++ b/dlt/helpers/streamlit_app/utils.py @@ -1,4 +1,40 @@ from pathlib import Path +import dlt +import pandas as pd +import streamlit as st + +from dlt.pipeline.exceptions import SqlClientNotAvailable HERE = Path(__file__).absolute().parent + +if hasattr(st, "cache_data"): + cache_data = st.cache_data +else: + cache_data = st.experimental_memo + + +def query_data(pipeline: dlt.Pipeline, query: str, schema_name: str = None) -> pd.DataFrame: + @cache_data(ttl=600) + def query_data(query: str, schema_name: str = None): + try: + with pipeline.sql_client(schema_name) as client: + with client.execute_query(query) as curr: + return curr.df() + except SqlClientNotAvailable: + st.error("Cannot load data - SqlClient not available") + + return query_data(query, schema_name) + + +def query_data_live(pipeline: dlt.Pipeline, query: str, schema_name: str = None) -> pd.DataFrame: + @cache_data(ttl=5) + def query_data(query: str, schema_name: str = None): + try: + with pipeline.sql_client(schema_name) as client: + with client.execute_query(query) as curr: + return curr.df() + except SqlClientNotAvailable: + st.error("Cannot load data - SqlClient not available") + + return query_data(query, schema_name) diff --git a/dlt/helpers/streamlit_app/widgets/__init__.py b/dlt/helpers/streamlit_app/widgets/__init__.py new file mode 100644 index 0000000000..6cd6888903 --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/__init__.py @@ -0,0 +1,3 @@ +from dlt.helpers.streamlit_app.widgets.logo import logo +from dlt.helpers.streamlit_app.widgets.stats import stat +from dlt.helpers.streamlit_app.widgets.summary import pipeline_summary diff --git a/dlt/helpers/streamlit_app/widgets/logo.py b/dlt/helpers/streamlit_app/widgets/logo.py new file mode 100644 index 0000000000..9eeb7d1854 --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/logo.py @@ -0,0 +1,45 @@ +import streamlit as st + + +def logo(): + logo_text = """ + + """ + styles = """ + + """ + + st.markdown(logo_text + styles, unsafe_allow_html=True) diff --git a/dlt/helpers/streamlit_app/widgets/stats.py b/dlt/helpers/streamlit_app/widgets/stats.py new file mode 100644 index 0000000000..a9c677898f --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/stats.py @@ -0,0 +1,52 @@ +from typing import Any, Optional +import streamlit as st + + +def stat( + label: str, + value: Any, + width: Optional[str] = "100%", + display: Optional[str] = "inline-block", + background_color: Optional[str] = "#0e1111", + border_radius: Optional[int] = 4, + border_color: Optional[str] = "#272736", + border_left_color: Optional[str] = "#007b05", + border_left_width: Optional[int] = 0, +): + stat_html = f""" +
+

{label}

+

{value}

+
+ """ + styles = """ + .stat { + display: %s; + width: %s; + border-radius: %dpx; + border: 1px solid %s; + background-color: %s; + padding: 2%% 2%% 1%% 5%%; + margin-bottom: 2%%; + } + .stat-label { + font-size: 14px; + margin-bottom: 5px; + } + .stat-value { + font-size: 32px; + margin-bottom: 0; + } + """ % (display, width, border_radius, border_color, background_color) + + if border_left_width > 1: + styles += """ + .stat { + border-left: %dpx solid %s !important; + } + """ % (border_left_width, border_left_color) + + st.markdown( + stat_html + f"", + unsafe_allow_html=True, + ) diff --git a/dlt/helpers/streamlit_app/widgets.py b/dlt/helpers/streamlit_app/widgets/summary.py similarity index 100% rename from dlt/helpers/streamlit_app/widgets.py rename to dlt/helpers/streamlit_app/widgets/summary.py From 476acdd34a12365b1fed13e881ea7113b6deed52 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 6 Mar 2024 14:53:21 +0100 Subject: [PATCH 03/75] Add tag component --- dlt/helpers/streamlit_app/dashboard.py | 5 +-- dlt/helpers/streamlit_app/widgets/__init__.py | 1 + dlt/helpers/streamlit_app/widgets/tags.py | 37 +++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) create mode 100644 dlt/helpers/streamlit_app/widgets/tags.py diff --git a/dlt/helpers/streamlit_app/dashboard.py b/dlt/helpers/streamlit_app/dashboard.py index 93c5fdf048..02e373dbd9 100644 --- a/dlt/helpers/streamlit_app/dashboard.py +++ b/dlt/helpers/streamlit_app/dashboard.py @@ -3,12 +3,12 @@ from typing import List, Iterator import streamlit as st -from streamlit_extras.tags import tagger_component from dlt.common.exceptions import MissingDependencyException from dlt.common.utils import flatten_list_or_items from dlt.common.libs.pandas import pandas as pd from dlt.helpers.streamlit_app.menu import menu +from dlt.helpers.streamlit_app.widgets import tag from dlt.pipeline import Pipeline from dlt.pipeline.exceptions import SqlClientNotAvailable @@ -64,8 +64,7 @@ def _query_data(query: str, chunk_size: int = None) -> pd.DataFrame: for table in sorted(selected_schema.data_tables(), key=lambda table: table["name"]): table_name = table["name"] - - tagger_component("Table", [table_name]) + tag(table_name, label="Table") if "description" in table: st.text(table["description"]) table_hints: List[str] = [] diff --git a/dlt/helpers/streamlit_app/widgets/__init__.py b/dlt/helpers/streamlit_app/widgets/__init__.py index 6cd6888903..b14c44d106 100644 --- a/dlt/helpers/streamlit_app/widgets/__init__.py +++ b/dlt/helpers/streamlit_app/widgets/__init__.py @@ -1,3 +1,4 @@ from dlt.helpers.streamlit_app.widgets.logo import logo from dlt.helpers.streamlit_app.widgets.stats import stat from dlt.helpers.streamlit_app.widgets.summary import pipeline_summary +from dlt.helpers.streamlit_app.widgets.tags import tag diff --git a/dlt/helpers/streamlit_app/widgets/tags.py b/dlt/helpers/streamlit_app/widgets/tags.py new file mode 100644 index 0000000000..30ae99a28e --- /dev/null +++ b/dlt/helpers/streamlit_app/widgets/tags.py @@ -0,0 +1,37 @@ +from typing import Optional, Literal + +import streamlit as st + +TagType = Literal["info", "success", "warning", "error"] + + +def tag( + tag_name: str, + label: Optional[str] = None, + border_radius: Optional[int] = 4, + tag_type: Optional[TagType] = "info", +) -> None: + tag_html = f""" + {label+": " if label else ""}{tag_name} + """ + kinds = { + "info": {"text_color": "#1864ab", "bg_color": "#4dabf7"}, + "success": {"text_color": "#2b8a3e", "bg_color": "#8ce99a"}, + "warning": {"text_color": "#d9480f", "bg_color": "#ffa94d"}, + "error": {"text_color": "#c92a2a", "bg_color": "#ffe3e3"}, + } + kind = kinds[tag_type] + bg_color = kind["bg_color"] + text_color = kind["text_color"] + + styles = """ + + """ % (border_radius, bg_color, text_color) + st.markdown(tag_html + styles, unsafe_allow_html=True) From cf59ea493ea4d5cea4e1abb051c39254839feba7 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 6 Mar 2024 14:59:55 +0100 Subject: [PATCH 04/75] Add destination name to sidebar --- dlt/helpers/streamlit_app/menu.py | 7 ++++++- dlt/helpers/streamlit_app/widgets/tags.py | 5 +++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/dlt/helpers/streamlit_app/menu.py b/dlt/helpers/streamlit_app/menu.py index 3c21c7f1aa..8a0ed176a0 100644 --- a/dlt/helpers/streamlit_app/menu.py +++ b/dlt/helpers/streamlit_app/menu.py @@ -3,7 +3,7 @@ from dlt.common.destination.reference import WithStateSync from dlt.helpers.streamlit_app.utils import HERE -from dlt.helpers.streamlit_app.widgets import logo, stat +from dlt.helpers.streamlit_app.widgets import logo, stat, tag from dlt.helpers.streamlit_app.last_load_info import last_load_info from dlt.pipeline.state_sync import load_state_from_destination @@ -18,6 +18,11 @@ def menu(pipeline: dlt.Pipeline) -> None: def pipeline_state_info(pipeline: dlt.Pipeline): st.divider() + tag( + pipeline.destination.destination_name, + label="Destination", + tag_type="error", + ) st.subheader(f"Pipeline {pipeline.pipeline_name}", divider="rainbow") st.subheader("State info") remote_state = None diff --git a/dlt/helpers/streamlit_app/widgets/tags.py b/dlt/helpers/streamlit_app/widgets/tags.py index 30ae99a28e..1aad548aa1 100644 --- a/dlt/helpers/streamlit_app/widgets/tags.py +++ b/dlt/helpers/streamlit_app/widgets/tags.py @@ -2,19 +2,20 @@ import streamlit as st -TagType = Literal["info", "success", "warning", "error"] +TagType = Literal["info", "success", "warning", "error", "mute"] def tag( tag_name: str, label: Optional[str] = None, border_radius: Optional[int] = 4, - tag_type: Optional[TagType] = "info", + tag_type: Optional[TagType] = "mute", ) -> None: tag_html = f""" {label+": " if label else ""}{tag_name} """ kinds = { + "mute": {"text_color": "#495057", "bg_color": "#e9ecef"}, "info": {"text_color": "#1864ab", "bg_color": "#4dabf7"}, "success": {"text_color": "#2b8a3e", "bg_color": "#8ce99a"}, "warning": {"text_color": "#d9480f", "bg_color": "#ffa94d"}, From 3ededb41f121ca87295e42559ff888465437a386 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 6 Mar 2024 15:01:34 +0100 Subject: [PATCH 05/75] Adjust headers --- dlt/helpers/streamlit_app/dashboard.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dlt/helpers/streamlit_app/dashboard.py b/dlt/helpers/streamlit_app/dashboard.py index 02e373dbd9..7bb50d9c18 100644 --- a/dlt/helpers/streamlit_app/dashboard.py +++ b/dlt/helpers/streamlit_app/dashboard.py @@ -49,7 +49,7 @@ def _query_data(query: str, chunk_size: int = None) -> pd.DataFrame: except SqlClientNotAvailable: st.error("Cannot load data - SqlClient not available") - st.subheader("Schemas and tables") + st.subheader("Schemas and tables", divider="rainbow") num_schemas = len(pipeline.schema_names) if num_schemas == 1: @@ -164,7 +164,6 @@ def display(pipeline_name: str) -> None: pipeline = dlt.attach(pipeline_name) st.session_state["pipeline_name"] = pipeline_name - st.subheader(f"{pipeline_name}", divider="rainbow") with st.sidebar: menu(pipeline) From 50fe82bdc49a7f2c471972fd3c02874b87150eaa Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 6 Mar 2024 16:04:40 +0100 Subject: [PATCH 06/75] Set color scheme --- dlt/helpers/streamlit_app/dashboard.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/dlt/helpers/streamlit_app/dashboard.py b/dlt/helpers/streamlit_app/dashboard.py index 7bb50d9c18..2e70165f3f 100644 --- a/dlt/helpers/streamlit_app/dashboard.py +++ b/dlt/helpers/streamlit_app/dashboard.py @@ -2,6 +2,7 @@ from typing import List, Iterator +import dlt import streamlit as st from dlt.common.exceptions import MissingDependencyException @@ -11,7 +12,7 @@ from dlt.helpers.streamlit_app.widgets import tag from dlt.pipeline import Pipeline from dlt.pipeline.exceptions import SqlClientNotAvailable - +from streamlit import config # use right caching function to disable deprecation message if hasattr(st, "cache_data"): @@ -160,8 +161,16 @@ def essentials_f(c): def display(pipeline_name: str) -> None: - import dlt + config.set_option("theme.primaryColor", "#191937") + + # Main background + config.set_option("theme.backgroundColor", "#4C4898") + + # Sidebar + config.set_option("theme.secondaryBackgroundColor", "#191937") + # Text + config.set_option("theme.textColor", "#FFFFFF") pipeline = dlt.attach(pipeline_name) st.session_state["pipeline_name"] = pipeline_name with st.sidebar: From c6965bad713dd33b3f16e5abe5ac98e1f505cc5b Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 6 Mar 2024 16:10:26 +0100 Subject: [PATCH 07/75] Fix linting issues --- dlt/helpers/streamlit_app/dashboard.py | 4 ++-- dlt/helpers/streamlit_app/last_load_info.py | 2 +- dlt/helpers/streamlit_app/menu.py | 2 +- dlt/helpers/streamlit_app/pages/load_info.py | 2 +- dlt/helpers/streamlit_app/utils.py | 6 ++++-- dlt/helpers/streamlit_app/widgets/logo.py | 2 +- dlt/helpers/streamlit_app/widgets/stats.py | 2 +- dlt/helpers/streamlit_app/widgets/summary.py | 2 +- 8 files changed, 12 insertions(+), 10 deletions(-) diff --git a/dlt/helpers/streamlit_app/dashboard.py b/dlt/helpers/streamlit_app/dashboard.py index 2e70165f3f..75bc7dac52 100644 --- a/dlt/helpers/streamlit_app/dashboard.py +++ b/dlt/helpers/streamlit_app/dashboard.py @@ -1,6 +1,6 @@ import sys -from typing import List, Iterator +from typing import Any, Dict, List, Iterator import dlt import streamlit as st @@ -97,7 +97,7 @@ def _query_data(query: str, chunk_size: int = None) -> pd.DataFrame: st.markdown(" | ".join(table_hints)) # table schema contains various hints (like clustering or partition options) that we do not want to show in basic view - def essentials_f(c): + def essentials_f(c) -> Dict[str, Any]: return {k: v for k, v in c.items() if k in ["name", "data_type", "nullable"]} st.table(map(essentials_f, table["columns"].values())) diff --git a/dlt/helpers/streamlit_app/last_load_info.py b/dlt/helpers/streamlit_app/last_load_info.py index 7f7fa4e3f5..565673ac95 100644 --- a/dlt/helpers/streamlit_app/last_load_info.py +++ b/dlt/helpers/streamlit_app/last_load_info.py @@ -7,7 +7,7 @@ from dlt.helpers.streamlit_app.widgets.stats import stat -def last_load_info(pipeline: dlt.Pipeline): +def last_load_info(pipeline: dlt.Pipeline) -> None: loads_df = query_data_live( pipeline, f"SELECT load_id, inserted_at FROM {pipeline.default_schema.loads_table_name} WHERE" diff --git a/dlt/helpers/streamlit_app/menu.py b/dlt/helpers/streamlit_app/menu.py index 8a0ed176a0..2d9507dc66 100644 --- a/dlt/helpers/streamlit_app/menu.py +++ b/dlt/helpers/streamlit_app/menu.py @@ -16,7 +16,7 @@ def menu(pipeline: dlt.Pipeline) -> None: last_load_info(pipeline) -def pipeline_state_info(pipeline: dlt.Pipeline): +def pipeline_state_info(pipeline: dlt.Pipeline) -> None: st.divider() tag( pipeline.destination.destination_name, diff --git a/dlt/helpers/streamlit_app/pages/load_info.py b/dlt/helpers/streamlit_app/pages/load_info.py index 0c322eb16c..f799cb1078 100644 --- a/dlt/helpers/streamlit_app/pages/load_info.py +++ b/dlt/helpers/streamlit_app/pages/load_info.py @@ -91,7 +91,7 @@ def _query_data_live(query: str, schema_name: str = None) -> pd.DataFrame: st.exception(ex) -def show(): +def show() -> None: import dlt if not st.session_state.get("pipeline_name"): diff --git a/dlt/helpers/streamlit_app/utils.py b/dlt/helpers/streamlit_app/utils.py index 157457da4b..66ec652d5a 100644 --- a/dlt/helpers/streamlit_app/utils.py +++ b/dlt/helpers/streamlit_app/utils.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Optional import dlt import pandas as pd @@ -14,9 +15,10 @@ cache_data = st.experimental_memo +# FIXME: make something to DRY the code def query_data(pipeline: dlt.Pipeline, query: str, schema_name: str = None) -> pd.DataFrame: @cache_data(ttl=600) - def query_data(query: str, schema_name: str = None): + def query_data(query: str, schema_name: str = None) -> Optional[pd.DataFrame]: try: with pipeline.sql_client(schema_name) as client: with client.execute_query(query) as curr: @@ -29,7 +31,7 @@ def query_data(query: str, schema_name: str = None): def query_data_live(pipeline: dlt.Pipeline, query: str, schema_name: str = None) -> pd.DataFrame: @cache_data(ttl=5) - def query_data(query: str, schema_name: str = None): + def query_data(query: str, schema_name: str = None) -> Optional[pd.DataFrame]: try: with pipeline.sql_client(schema_name) as client: with client.execute_query(query) as curr: diff --git a/dlt/helpers/streamlit_app/widgets/logo.py b/dlt/helpers/streamlit_app/widgets/logo.py index 9eeb7d1854..dfbe07deac 100644 --- a/dlt/helpers/streamlit_app/widgets/logo.py +++ b/dlt/helpers/streamlit_app/widgets/logo.py @@ -1,7 +1,7 @@ import streamlit as st -def logo(): +def logo() -> None: logo_text = """