Skip to content

Commit

Permalink
feat(datasource): Checkbox for always filtering main dttm in datasour…
Browse files Browse the repository at this point in the history
…ce (apache#25204)

Co-authored-by: Elizabeth Thompson <[email protected]>
  • Loading branch information
Always-prog and eschutho authored Sep 14, 2023
1 parent 467e062 commit 14c3249
Show file tree
Hide file tree
Showing 26 changed files with 180 additions and 48 deletions.
10 changes: 10 additions & 0 deletions superset-frontend/src/components/Datasource/DatasourceEditor.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ class DatasourceEditor extends React.PureComponent {
? encodeURIComponent(datasource.table_name)
: datasource.table_name,
normalize_columns: datasource.normalize_columns,
always_filter_main_dttm: datasource.always_filter_main_dttm,
};
Object.entries(params).forEach(([key, value]) => {
// rison can't encode the undefined value
Expand Down Expand Up @@ -1003,6 +1004,15 @@ class DatasourceEditor extends React.PureComponent {
)}
control={<CheckboxControl controlId="normalize_columns" />}
/>
<Field
inline
fieldKey="always_filter_main_dttm"
label={t('Always filter main datetime column')}
description={t(
`When the secondary temporal columns are filtered, apply the same filter to the main datetime column.`,
)}
control={<CheckboxControl controlId="always_filter_main_dttm" />}
/>
</Fieldset>
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ const DatasourceModal: FunctionComponent<DatasourceModalProps> = ({
description: currentDatasource.description,
main_dttm_col: currentDatasource.main_dttm_col,
normalize_columns: currentDatasource.normalize_columns,
always_filter_main_dttm: currentDatasource.always_filter_main_dttm,
offset: currentDatasource.offset,
default_endpoint: currentDatasource.default_endpoint,
cache_timeout:
Expand Down
1 change: 1 addition & 0 deletions superset-frontend/src/features/datasets/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,5 @@ export type DatasetObject = {
extra?: string;
is_managed_externally: boolean;
normalize_columns: boolean;
always_filter_main_dttm: boolean;
};
4 changes: 4 additions & 0 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ class SqlaTable(
template_params = Column(Text)
extra = Column(Text)
normalize_columns = Column(Boolean, default=False)
always_filter_main_dttm = Column(Boolean, default=False)

baselink = "tablemodelview"

Expand All @@ -564,6 +565,7 @@ class SqlaTable(
"fetch_values_predicate",
"extra",
"normalize_columns",
"always_filter_main_dttm",
]
update_from_object_fields = [f for f in export_fields if f != "database_id"]
export_parent = "database"
Expand Down Expand Up @@ -761,6 +763,8 @@ def data(self) -> dict[str, Any]:
data_["health_check_message"] = self.health_check_message
data_["extra"] = self.extra
data_["owners"] = self.owners_data
data_["always_filter_main_dttm"] = self.always_filter_main_dttm
data_["normalize_columns"] = self.normalize_columns
return data_

@property
Expand Down
7 changes: 7 additions & 0 deletions superset/connectors/sqla/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ class TableModelView( # pylint: disable=too-many-ancestors
"template_params",
"extra",
"normalize_columns",
"always_filter_main_dttm",
]
base_filters = [["id", DatasourceFilter, lambda: []]]
show_columns = edit_columns + ["perm", "slices"]
Expand Down Expand Up @@ -384,6 +385,12 @@ class TableModelView( # pylint: disable=too-many-ancestors
"Allow column names to be changed to case insensitive format, "
"if supported (e.g. Oracle, Snowflake)."
),
"always_filter_main_dttm": _(
"Datasets can have a main temporal column (main_dttm_col), "
"but can also have secondary time columns. "
"When this attribute is true, whenever the secondary columns are filtered, "
"the same filter is applied to the main datetime column."
),
}
label_columns = {
"slices": _("Associated Charts"),
Expand Down
1 change: 1 addition & 0 deletions superset/dashboards/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class DashboardDatasetSchema(Schema):
time_grain_sqla = fields.List(fields.List(fields.Str()))
granularity_sqla = fields.List(fields.List(fields.Str()))
normalize_columns = fields.Bool()
always_filter_main_dttm = fields.Bool()


class BaseDashboardSchema(Schema):
Expand Down
2 changes: 2 additions & 0 deletions superset/datasets/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class DatasetRestApi(BaseSupersetModelRestApi):
"description",
"main_dttm_col",
"normalize_columns",
"always_filter_main_dttm",
"offset",
"default_endpoint",
"cache_timeout",
Expand Down Expand Up @@ -221,6 +222,7 @@ class DatasetRestApi(BaseSupersetModelRestApi):
"description",
"main_dttm_col",
"normalize_columns",
"always_filter_main_dttm",
"offset",
"default_endpoint",
"cache_timeout",
Expand Down
1 change: 1 addition & 0 deletions superset/datasets/commands/duplicate.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def run(self) -> Model:
table.schema = self._base_model.schema
table.template_params = self._base_model.template_params
table.normalize_columns = self._base_model.normalize_columns
table.always_filter_main_dttm = self._base_model.always_filter_main_dttm
table.is_sqllab_view = True
table.sql = ParsedQuery(self._base_model.sql).stripped()
db.session.add(table)
Expand Down
4 changes: 4 additions & 0 deletions superset/datasets/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class DatasetPostSchema(Schema):
is_managed_externally = fields.Boolean(allow_none=True, dump_default=False)
external_url = fields.String(allow_none=True)
normalize_columns = fields.Boolean(load_default=False)
always_filter_main_dttm = fields.Boolean(load_default=False)


class DatasetPutSchema(Schema):
Expand All @@ -111,6 +112,7 @@ class DatasetPutSchema(Schema):
description = fields.String(allow_none=True)
main_dttm_col = fields.String(allow_none=True)
normalize_columns = fields.Boolean(allow_none=True, dump_default=False)
always_filter_main_dttm = fields.Boolean(load_default=False)
offset = fields.Integer(allow_none=True)
default_endpoint = fields.String(allow_none=True)
cache_timeout = fields.Integer(allow_none=True)
Expand Down Expand Up @@ -250,6 +252,7 @@ def fix_extra(self, data: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
is_managed_externally = fields.Boolean(allow_none=True, dump_default=False)
external_url = fields.String(allow_none=True)
normalize_columns = fields.Boolean(load_default=False)
always_filter_main_dttm = fields.Boolean(load_default=False)


class GetOrCreateDatasetSchema(Schema):
Expand All @@ -266,6 +269,7 @@ class GetOrCreateDatasetSchema(Schema):
metadata={"description": "Template params for the table"}
)
normalize_columns = fields.Boolean(load_default=False)
always_filter_main_dttm = fields.Boolean(load_default=False)


class DatasetSchema(SQLAlchemyAutoSchema):
Expand Down
38 changes: 0 additions & 38 deletions superset/db_engine_specs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,44 +156,6 @@ FROM
GROUP BY
UPPER(country_of_origin)
```

### `time_secondary_columns = False`

Datasets can have a main datetime column (`main_dttm_col`), but can also have secondary time columns. When this attribute is true, wheneve the secondary columns are filtered, the same filter is applied to the main datetime column.

This might be useful if you have a table partitioned on a daily `ds` column in Hive (which doesn't support indexes), and a secondary column with the timestamp of the events, ie:

| ds | event | ... |
| ---------- | ------------------- | --- |
| 2023-01-01 | 2023-01-01 23:58:41 | ... |
| 2023-01-02 | 2023-01-02 00:03:17 | ... |
| 2023-01-02 | 2023-01-02 00:14:02 | ... |

With the table above, filtering only on `event` can be very innefective. For example, this query:

```sql
SELECT
*
FROM
some_table
WHERE
event BETWEEN '2023-01-02 00:00:00' AND '2023-01-02 01:00:00'
```

Would scan all the `ds` partitions, even though only one is needed! By setting the attribute to true, if `ds` is set as the main datetime column then the query would be generated as:

```sql
SELECT
*
FROM
some_table
WHERE
event BETWEEN '2023-01-02 00:00:00' AND '2023-01-02 01:00:00' AND
ds BETWEEN '2023-01-02 00:00:00' AND '2023-01-02 01:00:00'
```

Which reads data from a single partition instead.

### `time_groupby_inline = False`

In theory this attribute should be used to ommit time filters from the self-joins. When the attribute is false the time attribute will be present in the subquery used to compute limited series, eg:
Expand Down
1 change: 0 additions & 1 deletion superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
# Does database support join-free timeslot grouping
time_groupby_inline = False
limit_method = LimitMethod.FORCE_LIMIT
time_secondary_columns = False
allows_joins = True
allows_subqueries = True
allows_alias_in_select = True
Expand Down
1 change: 0 additions & 1 deletion superset/db_engine_specs/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
class ClickHouseBaseEngineSpec(BaseEngineSpec):
"""Shared engine spec for ClickHouse."""

time_secondary_columns = True
time_groupby_inline = True

_time_grain_expressions = {
Expand Down
2 changes: 0 additions & 2 deletions superset/db_engine_specs/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho
engine = "elasticsearch"
engine_name = "ElasticSearch (SQL API)"
time_groupby_inline = True
time_secondary_columns = True
allows_joins = False
allows_subqueries = True
allows_sql_comments = False
Expand Down Expand Up @@ -98,7 +97,6 @@ def convert_dttm(

class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
time_groupby_inline = True
time_secondary_columns = True
allows_joins = False
allows_subqueries = True
allows_sql_comments = False
Expand Down
2 changes: 0 additions & 2 deletions superset/db_engine_specs/kusto.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class KustoSqlEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine = "kustosql"
engine_name = "KustoSQL"
time_groupby_inline = True
time_secondary_columns = True
allows_joins = True
allows_subqueries = True
allows_sql_comments = False
Expand Down Expand Up @@ -116,7 +115,6 @@ class KustoKqlEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine = "kustokql"
engine_name = "KustoKQL"
time_groupby_inline = True
time_secondary_columns = True
allows_joins = True
allows_subqueries = True
allows_sql_comments = False
Expand Down
1 change: 0 additions & 1 deletion superset/db_engine_specs/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ def diagnose(spec: type[BaseEngineSpec]) -> dict[str, Any]:
"subqueries": spec.allows_subqueries,
"alias_in_select": spec.allows_alias_in_select,
"alias_in_orderby": spec.allows_alias_in_orderby,
"secondary_time_columns": spec.time_secondary_columns,
"time_groupby_inline": spec.time_groupby_inline,
"alias_to_source_column": not spec.allows_alias_to_source_column,
"order_by_not_in_select": spec.allows_hidden_orderby_agg,
Expand Down
1 change: 0 additions & 1 deletion superset/db_engine_specs/solr.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class SolrEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine_name = "Apache Solr"

time_groupby_inline = False
time_secondary_columns = False
allows_joins = False
allows_subqueries = False

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Added always_filter_main_dttm to datasource
Revision ID: 317970b4400c
Revises: ec54aca4c8a2
Create Date: 2023-09-06 13:18:59.597259
"""

# revision identifiers, used by Alembic.
revision = "317970b4400c"
down_revision = "ec54aca4c8a2"

import sqlalchemy as sa
from alembic import op
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session

from superset import db
from superset.migrations.shared.utils import paginated_update

Base = declarative_base()


class SqlaTable(Base):
__tablename__ = "tables"

id = sa.Column(sa.Integer, primary_key=True)
always_filter_main_dttm = sa.Column(sa.Boolean())


def upgrade():
op.add_column(
"tables",
sa.Column(
"always_filter_main_dttm",
sa.Boolean(),
nullable=True,
default=False,
server_default=sa.false(),
),
)

bind = op.get_bind()
session = db.Session(bind=bind)

for table in paginated_update(session.query(SqlaTable)):
table.always_filter_main_dttm = False


def downgrade():
op.drop_column("tables", "always_filter_main_dttm")
6 changes: 5 additions & 1 deletion superset/models/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,10 @@ def offset(self) -> int:
def main_dttm_col(self) -> Optional[str]:
raise NotImplementedError()

@property
def always_filter_main_dttm(self) -> Optional[bool]:
return False

@property
def dttm_cols(self) -> list[str]:
raise NotImplementedError()
Expand Down Expand Up @@ -1676,7 +1680,7 @@ def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-ma

# Use main dttm column to support index with secondary dttm columns.
if (
db_engine_spec.time_secondary_columns
self.always_filter_main_dttm
and self.main_dttm_col in self.dttm_cols
and self.main_dttm_col != dttm_col.column_name
):
Expand Down
4 changes: 4 additions & 0 deletions superset/views/datasource/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ExternalMetadataParams(TypedDict):
schema_name: str
table_name: str
normalize_columns: Optional[bool]
always_filter_main_dttm: Optional[bool]


get_external_metadata_schema = {
Expand All @@ -37,6 +38,7 @@ class ExternalMetadataParams(TypedDict):
"schema_name": "string",
"table_name": "string",
"normalize_columns": "boolean",
"always_filter_main_dttm": "boolean",
}


Expand All @@ -46,6 +48,7 @@ class ExternalMetadataSchema(Schema):
schema_name = fields.Str(allow_none=True)
table_name = fields.Str(required=True)
normalize_columns = fields.Bool(allow_none=True)
always_filter_main_dttm = fields.Bool(allow_none=True)

# pylint: disable=unused-argument
@post_load
Expand All @@ -60,6 +63,7 @@ def normalize(
schema_name=data.get("schema_name", ""),
table_name=data["table_name"],
normalize_columns=data["normalize_columns"],
always_filter_main_dttm=data["always_filter_main_dttm"],
)


Expand Down
2 changes: 2 additions & 0 deletions superset/views/datasource/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ def save(self) -> FlaskResponse:

datasource_dict = json.loads(data)
normalize_columns = datasource_dict.get("normalize_columns", False)
always_filter_main_dttm = datasource_dict.get("always_filter_main_dttm", False)
datasource_dict["normalize_columns"] = normalize_columns
datasource_dict["always_filter_main_dttm"] = always_filter_main_dttm
datasource_id = datasource_dict.get("id")
datasource_type = datasource_dict.get("type")
database_id = datasource_dict["database"].get("id")
Expand Down
Loading

0 comments on commit 14c3249

Please sign in to comment.