feat: fine-grain chart data telemetry (apache#31273)
betodealmeida authored Dec 10, 2024
1 parent 232e205 commit d6a82f7
Showing 4 changed files with 59 additions and 29 deletions.
24 changes: 14 additions & 10 deletions superset/charts/data/api.py
@@ -399,17 +399,19 @@ def _process_data(query_data: Any) -> Any:
             for query in queries:
                 with contextlib.suppress(KeyError):
                     del query["query"]
-            response_data = json.dumps(
-                {"result": queries},
-                default=json.json_int_dttm_ser,
-                ignore_nan=True,
-            )
+            with event_logger.log_context(f"{self.__class__.__name__}.json_dumps"):
+                response_data = json.dumps(
+                    {"result": queries},
+                    default=json.json_int_dttm_ser,
+                    ignore_nan=True,
+                )
             resp = make_response(response_data, 200)
             resp.headers["Content-Type"] = "application/json; charset=utf-8"
             return resp

         return self.response_400(message=f"Unsupported result_format: {result_format}")

+    @event_logger.log_this
     def _get_data_response(
         self,
         command: ChartDataCommand,
@@ -435,11 +437,13 @@ def _map_form_data_datasource_to_dataset_id(
     ) -> dict[str, Any]:
         return {
             "dashboard_id": form_data.get("form_data", {}).get("dashboardId"),
-            "dataset_id": form_data.get("datasource", {}).get("id")
-            if isinstance(form_data.get("datasource"), dict)
-            and form_data.get("datasource", {}).get("type")
-            == DatasourceType.TABLE.value
-            else None,
+            "dataset_id": (
+                form_data.get("datasource", {}).get("id")
+                if isinstance(form_data.get("datasource"), dict)
+                and form_data.get("datasource", {}).get("type")
+                == DatasourceType.TABLE.value
+                else None
+            ),
             "slice_id": form_data.get("form_data", {}).get("slice_id"),
         }
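The wrapped `json.dumps` call and the `@event_logger.log_this` decorator above are the two telemetry primitives this commit leans on: `log_context` times an explicit block under a named action, while `log_this` times a whole function. A minimal sketch of both, assuming a configured Superset application (the function names and action string are illustrative, not part of the codebase):

# Sketch only: actually recording events requires an initialized Superset app.
import json

from superset.extensions import event_logger

@event_logger.log_this  # one timed event covering the whole function call
def build_chart_payload() -> dict:
    # hypothetical stand-in for the command that assembles chart data
    return {"result": []}

def respond() -> str:
    payload = build_chart_payload()
    # one timed event covering only the serialization step
    with event_logger.log_context("ChartDataRestApi.json_dumps"):
        return json.dumps(payload)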
18 changes: 12 additions & 6 deletions superset/charts/post_processing.py
@@ -34,6 +34,7 @@
 from flask_babel import gettext as __

 from superset.common.chart_data import ChartDataResultFormat
+from superset.extensions import event_logger
 from superset.utils.core import (
     extract_dataframe_dtypes,
     get_column_names,
@@ -296,6 +297,7 @@ def table(
 }


+@event_logger.log_this
 def apply_post_process(
     result: dict[Any, Any],
     form_data: Optional[dict[str, Any]] = None,
@@ -344,15 +346,19 @@ def apply_post_process(
     # `Tuple[str]`. Otherwise encoding to JSON later will fail because
     # maps cannot have tuples as their keys in JSON.
     processed_df.columns = [
-        " ".join(str(name) for name in column).strip()
-        if isinstance(column, tuple)
-        else column
+        (
+            " ".join(str(name) for name in column).strip()
+            if isinstance(column, tuple)
+            else column
+        )
         for column in processed_df.columns
     ]
     processed_df.index = [
-        " ".join(str(name) for name in index).strip()
-        if isinstance(index, tuple)
-        else index
+        (
+            " ".join(str(name) for name in index).strip()
+            if isinstance(index, tuple)
+            else index
+        )
         for index in processed_df.index
     ]
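As the inline comment explains, pivoted results can carry tuple labels, and JSON object keys must be strings. A standalone illustration of the flattening these comprehensions perform (the example frame is invented for the demo):

import pandas as pd

df = pd.DataFrame(
    [[1, 2]],
    columns=pd.MultiIndex.from_tuples([("sum", "sales"), ("avg", "price")]),
)
# Tuple labels become single space-joined strings, as in the diff above.
df.columns = [
    " ".join(str(name) for name in col).strip() if isinstance(col, tuple) else col
    for col in df.columns
]
print(list(df.columns))  # ['sum sales', 'avg price']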
44 changes: 32 additions & 12 deletions superset/models/core.py
@@ -75,7 +75,11 @@
 from superset.models.helpers import AuditMixinNullable, ImportExportMixin
 from superset.result_set import SupersetResultSet
 from superset.sql_parse import Table
-from superset.superset_typing import OAuth2ClientConfig, ResultSetColumnType
+from superset.superset_typing import (
+    DbapiDescription,
+    OAuth2ClientConfig,
+    ResultSetColumnType,
+)
 from superset.utils import cache as cache_util, core as utils, json
 from superset.utils.backports import StrEnum
 from superset.utils.core import DatasourceName, get_username
@@ -667,7 +671,7 @@ def mutate_sql_based_on_config(self, sql_: str, is_split: bool = False) -> str:
         )
         return sql_

-    def get_df(  # pylint: disable=too-many-locals
+    def get_df(
         self,
         sql: str,
         catalog: str | None = None,
@@ -700,21 +704,37 @@ def _log_query(sql: str) -> None:
                     object_ref=__name__,
                 ):
                     self.db_engine_spec.execute(cursor, sql_, self)
-                    if i < len(sqls) - 1:
-                        # If it's not the last, we don't keep the results
-                        cursor.fetchall()
-                    else:
-                        # Last query, fetch and process the results
-                        data = self.db_engine_spec.fetch_data(cursor)
-                        result_set = SupersetResultSet(
-                            data, cursor.description, self.db_engine_spec
-                        )
-                        df = result_set.to_pandas_df()
+
+                rows = self.fetch_rows(cursor, i == len(sqls) - 1)
+                if rows is not None:
+                    df = self.load_into_dataframe(cursor.description, rows)

             if mutator:
                 df = mutator(df)

         return self.post_process_df(df)

+    @event_logger.log_this
+    def fetch_rows(self, cursor: Any, last: bool) -> list[tuple[Any, ...]] | None:
+        if not last:
+            cursor.fetchall()
+            return None
+
+        return self.db_engine_spec.fetch_data(cursor)
+
+    @event_logger.log_this
+    def load_into_dataframe(
+        self,
+        description: DbapiDescription,
+        data: list[tuple[Any, ...]],
+    ) -> pd.DataFrame:
+        result_set = SupersetResultSet(
+            data,
+            description,
+            self.db_engine_spec,
+        )
+        return result_set.to_pandas_df()
+
     def compile_sqla_query(
         self,
         qry: Select,
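Splitting the tail of `get_df` into `fetch_rows` and `load_into_dataframe`, each behind `@event_logger.log_this`, lets the telemetry separate time spent fetching rows from time spent building the DataFrame. A simplified, runnable model of the multi-statement flow, using sqlite3 and plain tuples in place of `db_engine_spec` and `SupersetResultSet`:

from __future__ import annotations

import sqlite3
from typing import Any

def fetch_rows(cursor: Any, last: bool) -> list[tuple[Any, ...]] | None:
    if not last:
        cursor.fetchall()  # drain intermediate statements, discard the rows
        return None
    return cursor.fetchall()  # keep only the final statement's rows

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
sqls = ["SELECT 1", "SELECT 2 AS answer"]
rows = None
for i, sql in enumerate(sqls):
    cursor.execute(sql)
    fetched = fetch_rows(cursor, i == len(sqls) - 1)
    if fetched is not None:
        rows = fetched
print(rows)  # [(2,)]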
2 changes: 1 addition & 1 deletion tests/integration_tests/reports/commands_tests.py
@@ -1710,7 +1710,7 @@ def test_alert_limit_is_applied(
     with patch.object(
         create_alert_email_chart.database.db_engine_spec,
         "fetch_data",
-        return_value=None,
+        return_value=[],
     ):  # noqa: F841
         AsyncExecuteReportScheduleCommand(
             TEST_ID, create_alert_email_chart.id, datetime.utcnow()
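The mock update follows from the `get_df` refactor: `fetch_rows` now returns `fetch_data`'s result unchanged, and a `None` would make the new `rows is not None` check skip building the DataFrame entirely, so the stub must return an empty list of rows instead.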
