From f07cb031e279cd2caa1a2d2390eadb02e5c83fb8 Mon Sep 17 00:00:00 2001
From: Michael Gregory <>
Date: Sat, 18 Mar 2023 12:49:35 +0100
Subject: [PATCH 1/4] fixes #100

---
 .../operators/great_expectations.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/great_expectations_provider/operators/great_expectations.py b/great_expectations_provider/operators/great_expectations.py
index c2911d3..499ea19 100644
--- a/great_expectations_provider/operators/great_expectations.py
+++ b/great_expectations_provider/operators/great_expectations.py
@@ -269,7 +269,7 @@ def make_connection_configuration(self) -> Dict[str, str]:
             self.conn.extra_dejson.get("account") or self.conn.extra_dejson["extra__snowflake__account"]
         )
         snowflake_region = (
-            self.conn.extra_dejson.get("region") or self.conn.extra_dejson["extra__snowflake__region"]
+            self.conn.extra_dejson.get("region") or self.conn.extra_dejson.get("extra__snowflake__region")  #Snowflake region can be None for us-west-2
         )
         snowflake_database = (
             self.conn.extra_dejson.get("database") or self.conn.extra_dejson["extra__snowflake__database"]
@@ -279,7 +279,10 @@
         )
         snowflake_role = self.conn.extra_dejson.get("role") or self.conn.extra_dejson["extra__snowflake__role"]
 
-        uri_string = f"snowflake://{self.conn.login}:{self.conn.password}@{snowflake_account}.{snowflake_region}/{snowflake_database}/{self.schema}?warehouse={snowflake_warehouse}&role={snowflake_role}"  # noqa
+        if snowflake_region:
+            uri_string = f"snowflake://{self.conn.login}:{self.conn.password}@{snowflake_account}.{snowflake_region}/{snowflake_database}/{self.schema}?warehouse={snowflake_warehouse}&role={snowflake_role}"  # noqa
+        else:
+            uri_string = f"snowflake://{self.conn.login}:{self.conn.password}@{snowflake_account}/{snowflake_database}/{self.schema}?warehouse={snowflake_warehouse}&role={snowflake_role}"  # noqa
         elif conn_type == "gcpbigquery":
             uri_string = f"{self.conn.host}{self.schema}"

From 371960474b91fadb84545dfd80f7143740c220b7 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 20 Mar 2023 14:00:57 +0000
Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 great_expectations_provider/operators/great_expectations.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/great_expectations_provider/operators/great_expectations.py b/great_expectations_provider/operators/great_expectations.py
index 499ea19..3f172c2 100644
--- a/great_expectations_provider/operators/great_expectations.py
+++ b/great_expectations_provider/operators/great_expectations.py
@@ -268,9 +268,9 @@ def make_connection_configuration(self) -> Dict[str, str]:
         snowflake_account = (
             self.conn.extra_dejson.get("account") or self.conn.extra_dejson["extra__snowflake__account"]
         )
-        snowflake_region = (
-            self.conn.extra_dejson.get("region") or self.conn.extra_dejson.get("extra__snowflake__region")  #Snowflake region can be None for us-west-2
-        )
+        snowflake_region = self.conn.extra_dejson.get("region") or self.conn.extra_dejson.get(
+            "extra__snowflake__region"
+        )  # Snowflake region can be None for us-west-2
         snowflake_database = (
             self.conn.extra_dejson.get("database") or self.conn.extra_dejson["extra__snowflake__database"]
         )
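A note on the region handling that PATCH 1/4 introduces and PATCH 2/4 reformats: Snowflake accounts in the default us-west-2 region carry no region suffix in their locator, so the `region` extra can legitimately be absent and the URI must omit the `.{region}` segment. The standalone sketch below is illustration only, with placeholder values rather than provider code, and shows the two URI shapes the patched branch produces:

    # Minimal sketch of the region-optional URI logic from PATCH 1/4.
    # All values are placeholders, not taken from a real Airflow connection.
    def build_snowflake_uri(login, password, account, region, database, schema, warehouse, role):
        # Accounts in us-west-2 use the bare account locator as the host
        host = f"{account}.{region}" if region else account
        return f"snowflake://{login}:{password}@{host}/{database}/{schema}?warehouse={warehouse}&role={role}"

    print(build_snowflake_uri("user", "pw", "ab12345", None, "ANALYTICS", "PUBLIC", "COMPUTE_WH", "ANALYST"))
    # snowflake://user:pw@ab12345/ANALYTICS/PUBLIC?warehouse=COMPUTE_WH&role=ANALYST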
From 87a42e275705d413cd4482134fc0d94fa1a68e6f Mon Sep 17 00:00:00 2001
From: Michael Gregory <>
Date: Sat, 29 Apr 2023 09:33:54 +0200
Subject: [PATCH 3/4] fixes #110 and #107

---
 .../operators/great_expectations.py | 51 +++++++++++++------
 1 file changed, 35 insertions(+), 16 deletions(-)

diff --git a/great_expectations_provider/operators/great_expectations.py b/great_expectations_provider/operators/great_expectations.py
index eecb6bf..88aa571 100644
--- a/great_expectations_provider/operators/great_expectations.py
+++ b/great_expectations_provider/operators/great_expectations.py
@@ -112,6 +112,8 @@ class GreatExpectationsOperator(BaseOperator):
     :type return_json_dict: bool
     :param use_open_lineage: If True (default), creates an OpenLineage action if an OpenLineage environment is found
     :type use_open_lineage: bool
+    :param database: If provided, overwrites the default database provided by the connection
+    :type database: Optional[str]
     :param schema: If provided, overwrites the default schema provided by the connection
     :type schema: Optional[str]
     """
@@ -148,6 +150,7 @@ def __init__(
         return_json_dict: bool = False,
         use_open_lineage: bool = True,
         schema: Optional[str] = None,
+        database: Optional[str] = None,
         *args,
         **kwargs,
     ) -> None:
@@ -175,6 +178,7 @@ def __init__(
         self.datasource: Optional[Datasource] = None
         self.batch_request: Optional[BatchRequestBase] = None
         self.schema = schema
+        self.database = database
         self.kwargs = kwargs
 
         if self.is_dataframe and self.query_to_validate:
@@ -227,14 +231,29 @@ def __init__(
         if isinstance(self.checkpoint_config, CheckpointConfig):
             self.checkpoint_config = deep_filter_properties_iterable(properties=self.checkpoint_config.to_dict())
 
-        # If a schema is passed as part of the data_asset_name, use that schema
-        if self.data_asset_name and "." in self.data_asset_name:
-            # Assume data_asset_name is in the form "SCHEMA.TABLE"
-            # Schema parameter always takes priority
+        # If a schema and db are passed as part of the data_asset_name, use that schema/db
+        if self.data_asset_name:
+            # Check if data_asset_name is in the form "SCHEMA.TABLE" or "DATABASE.SCHEMA.TABLE"
+            # Database and Schema parameters always take priority
             asset_list = self.data_asset_name.split(".")
-            self.schema = self.schema or asset_list[0]
+
             # Update data_asset_name to be only the table
-            self.data_asset_name = asset_list[1]
+            self.data_asset_name = asset_list.pop(-1)
+
+            if asset_list:
+                schema_name = asset_list.pop(-1)
+                if self.schema and schema_name:
+                    print("Using Operator argument for 'schema' instead of schema in fully-qualified table name.")
+                self.schema = self.schema or schema_name
+
+            if asset_list:
+                database_name = asset_list.pop(-1)
+                if self.database and database_name:
+                    print("Using Operator argument for 'database' instead of database in fully-qualified table name.")
+                self.database = self.database or database_name
+
+            if asset_list:
+                raise AirflowException('Parameter data_asset_name must be specified as TABLE, SCHEMA.TABLE or DATABASE.SCHEMA.TABLE.')
 
     def make_connection_configuration(self) -> Dict[str, str]:
         """Builds connection strings based off existing Airflow connections. Only supports necessary extras."""
@@ -243,15 +262,14 @@ def make_connection_configuration(self) -> Dict[str, str]:
             raise ValueError(f"Connections does not exist in Airflow for conn_id: {self.conn_id}")
         self.schema = self.schema or self.conn.schema
         conn_type = self.conn.conn_type
-        if conn_type in ("redshift", "postgres", "mysql", "mssql"):
-            odbc_connector = ""
-            if conn_type in ("redshift", "postgres"):
-                odbc_connector = "postgresql+psycopg2"
-            elif conn_type == "mysql":
-                odbc_connector = "mysql"
-            else:
-                odbc_connector = "mssql+pyodbc"
-            uri_string = f"{odbc_connector}://{self.conn.login}:{self.conn.password}@{self.conn.host}:{self.conn.port}/{self.schema}"  # noqa
+
+        #Postgres uses database instead of schema. User must pass database and schema via Operator args.
+        if conn_type in ("redshift", "postgres"):
+            uri_string = f"postgresql+psycopg2://{self.conn.login}:{self.conn.password}@{self.conn.host}:{self.conn.port}/{self.database}?options=-csearch_path%3D{self.schema}"  # noqa
+        elif conn_type == "mysql":
+            uri_string = f"mysql://{self.conn.login}:{self.conn.password}@{self.conn.host}:{self.conn.port}/{self.schema}"  # noqa
+        elif conn_type == "mssql":
+            uri_string = f"mssql+pyodbc://{self.conn.login}:{self.conn.password}@{self.conn.host}:{self.conn.port}/{self.schema}"  # noqa
         elif conn_type == "snowflake":
             try:
                 return self.build_snowflake_connection_config_from_hook()
@@ -316,9 +334,10 @@ def build_snowflake_connection_config_from_hook(self) -> Dict[str, str]:
 
         hook = SnowflakeHook(snowflake_conn_id=self.conn_id)
 
-        # Support the operator overriding the schema
+        # Support the operator overriding the schema and database
         # which is necessary for temp tables.
         hook.schema = self.schema or hook.schema
+        hook.database = self.database or hook.database
 
         conn = hook.get_connection(self.conn_id)
         engine = hook.get_sqlalchemy_engine()
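To make the parsing rule added in PATCH 3/4 concrete: the table is always the last dot-separated part of data_asset_name, schema and database are peeled off right to left, and explicit operator arguments win over the parsed parts. A standalone sketch of that rule using a hypothetical helper, for illustration only rather than the operator's actual method:

    # Minimal sketch of the DATABASE.SCHEMA.TABLE parsing from PATCH 3/4.
    def parse_asset_name(data_asset_name, schema=None, database=None):
        parts = data_asset_name.split(".")
        table = parts.pop(-1)                      # TABLE is always the last part
        if parts:
            schema = schema or parts.pop(-1)       # then SCHEMA; operator argument wins
        if parts:
            database = database or parts.pop(-1)   # then DATABASE; operator argument wins
        if parts:                                  # anything left over is an error
            raise ValueError("Expected TABLE, SCHEMA.TABLE or DATABASE.SCHEMA.TABLE.")
        return database, schema, table

    print(parse_asset_name("SALES.PUBLIC.ORDERS"))           # ('SALES', 'PUBLIC', 'ORDERS')
    print(parse_asset_name("PUBLIC.ORDERS", schema="TEMP"))  # (None, 'TEMP', 'ORDERS')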
From a5a79556c677723edd50f58f1570c9118a52ec38 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Sat, 29 Apr 2023 07:39:22 +0000
Subject: [PATCH 4/4] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 .../operators/great_expectations.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/great_expectations_provider/operators/great_expectations.py b/great_expectations_provider/operators/great_expectations.py
index 88aa571..b2d1989 100644
--- a/great_expectations_provider/operators/great_expectations.py
+++ b/great_expectations_provider/operators/great_expectations.py
@@ -236,24 +236,26 @@ def __init__(
             # Check if data_asset_name is in the form "SCHEMA.TABLE" or "DATABASE.SCHEMA.TABLE"
             # Database and Schema parameters always take priority
             asset_list = self.data_asset_name.split(".")
-
+
             # Update data_asset_name to be only the table
             self.data_asset_name = asset_list.pop(-1)
 
-            if asset_list:
+            if asset_list:
                 schema_name = asset_list.pop(-1)
                 if self.schema and schema_name:
                     print("Using Operator argument for 'schema' instead of schema in fully-qualified table name.")
                 self.schema = self.schema or schema_name
-
-            if asset_list:
+
+            if asset_list:
                 database_name = asset_list.pop(-1)
                 if self.database and database_name:
                     print("Using Operator argument for 'database' instead of database in fully-qualified table name.")
                 self.database = self.database or database_name
 
-            if asset_list:
-                raise AirflowException('Parameter data_asset_name must be specified as TABLE, SCHEMA.TABLE or DATABASE.SCHEMA.TABLE.')
+            if asset_list:
+                raise AirflowException(
+                    "Parameter data_asset_name must be specified as TABLE, SCHEMA.TABLE or DATABASE.SCHEMA.TABLE."
+                )
 
     def make_connection_configuration(self) -> Dict[str, str]:
         """Builds connection strings based off existing Airflow connections. Only supports necessary extras."""
@@ -262,8 +264,8 @@ def make_connection_configuration(self) -> Dict[str, str]:
             raise ValueError(f"Connections does not exist in Airflow for conn_id: {self.conn_id}")
         self.schema = self.schema or self.conn.schema
         conn_type = self.conn.conn_type
-
-        #Postgres uses database instead of schema. User must pass database and schema via Operator args.
+
+        # Postgres uses database instead of schema. User must pass database and schema via Operator args.
         if conn_type in ("redshift", "postgres"):
             uri_string = f"postgresql+psycopg2://{self.conn.login}:{self.conn.password}@{self.conn.host}:{self.conn.port}/{self.database}?options=-csearch_path%3D{self.schema}"  # noqa
         elif conn_type == "mysql":
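For reference on the Postgres/Redshift branch above: the database is selected in the URI path, while the schema is pinned through libpq's options parameter, with %3D as the URL-encoded "=" so that -csearch_path=<schema> reaches the server intact. A sketch of the resulting connection string with placeholder values rather than a real Airflow connection:

    # Minimal sketch of the postgres URI shape built in PATCH 3/4 and 4/4.
    login, password, host, port = "user", "secret", "db.example.com", 5432
    database, schema = "warehouse", "reporting"

    # %3D is the URL-encoded "=": libpq receives -csearch_path=reporting
    uri = (
        f"postgresql+psycopg2://{login}:{password}@{host}:{port}/{database}"
        f"?options=-csearch_path%3D{schema}"
    )
    print(uri)
    # postgresql+psycopg2://user:secret@db.example.com:5432/warehouse?options=-csearch_path%3Dreporting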