diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py index 731c9141c4cde..de0d4f8711f53 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py @@ -262,7 +262,7 @@ def extract(self) -> Optional[TaskMetadata]: ) operator: "BigQueryInsertJobOperator" = self.operator - sql = operator.configuration.get("query") + sql = operator.configuration.get("query", {}).get("query") if not sql: self.log.warning("No query found in BigQueryInsertJobOperator") return None diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 15f76a8b1e1d0..ed155a35a925c 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -247,10 +247,11 @@ def _extract_lineage( SQL_PARSING_RESULT_KEY, None ) if sql_parsing_result: - if sql_parsing_result.debug_info.error: - datajob.properties["datahub_sql_parser_error"] = str( - sql_parsing_result.debug_info.error - ) + if error := sql_parsing_result.debug_info.error: + logger.info(f"SQL parsing error: {error}", exc_info=error) + datajob.properties[ + "datahub_sql_parser_error" + ] = f"{type(error).__name__}: {error}" if not sql_parsing_result.debug_info.table_error: input_urns.extend(sql_parsing_result.in_tables) output_urns.extend(sql_parsing_result.out_tables) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index e85a07b194e4f..693e9b6120a1c 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -38,6 +38,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -76,7 +77,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -166,6 +167,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -212,7 +214,7 @@ "name": 
"sqlite_operator_create_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1707253651059, + "time": 1716506459310, "actor": "urn:li:corpuser:datahub" } } @@ -261,7 +263,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253651059, + "timestampMillis": 1716506459310, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -278,14 +280,14 @@ "aspectName": "operation", "aspect": { "json": { - "timestampMillis": 1714672059338, + "timestampMillis": 1716506459665, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, "actor": "urn:li:corpuser:airflow", "operationType": "CREATE", - "lastUpdatedTimestamp": 1714672059338 + "lastUpdatedTimestamp": 1716506459665 } } }, @@ -309,7 +311,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -435,6 +437,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -460,7 +463,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253651425, + "timestampMillis": 1716506459692, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -493,7 +496,9 @@ "downstream_task_ids": "['transform_cost_table']", "inlets": "[]", "outlets": "[]", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n INSERT INTO costs (id, month, total_cost, area)\\n VALUES\\n (1, '2021-01', 100, 10),\\n (2, '2021-02', 200, 20),\\n (3, '2021-03', 300, 30)\\n \"}" + "datahub_sql_parser_error": "SqlUnderstandingError: Failed to build scope for statement - scope was empty: INSERT INTO public.costs (id, month, total_cost, area) VALUES (1, '2021-01', 100, 10), (2, '2021-02', 200, 20), (3, '2021-03', 300, 30)", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n INSERT INTO costs (id, month, total_cost, area)\\n VALUES\\n (1, '2021-01', 100, 10),\\n (2, '2021-02', 200, 20),\\n (3, '2021-03', 300, 30)\\n \"}", + "openlineage_run_facet_extractionError": "{\"_producer\": 
\"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Failed to build scope for statement - scope was empty: INSERT INTO public.costs (id, month, total_cost, area) VALUES (1, '2021-01', 100, 10), (2, '2021-02', 200, 20), (3, '2021-03', 300, 30)\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=populate_cost_table", "name": "populate_cost_table", @@ -548,6 +553,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -594,7 +600,7 @@ "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1707253655698, + "time": 1716506463946, "actor": "urn:li:corpuser:datahub" } } @@ -643,7 +649,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253655698, + "timestampMillis": 1716506463946, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -660,14 +666,14 @@ "aspectName": "operation", "aspect": { "json": { - "timestampMillis": 1714672062927, + "timestampMillis": 1716506464455, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, "actor": "urn:li:corpuser:airflow", "operationType": "CREATE", - "lastUpdatedTimestamp": 1714672062927 + "lastUpdatedTimestamp": 1716506464455 } } }, @@ -691,7 +697,9 @@ "downstream_task_ids": "['transform_cost_table']", "inlets": "[]", "outlets": "[]", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n INSERT INTO costs (id, month, total_cost, area)\\n VALUES\\n (1, '2021-01', 100, 10),\\n (2, '2021-02', 200, 20),\\n (3, '2021-03', 300, 30)\\n \"}" + "datahub_sql_parser_error": "SqlUnderstandingError: Failed to build scope for statement - scope was empty: INSERT INTO public.costs (id, month, total_cost, area) VALUES (1, '2021-01', 100, 10), (2, '2021-02', 200, 20), (3, '2021-03', 300, 30)", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n INSERT INTO costs (id, month, total_cost, area)\\n VALUES\\n (1, '2021-01', 100, 10),\\n (2, '2021-02', 200, 20),\\n (3, '2021-03', 300, 30)\\n \"}", + "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": 
\"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Failed to build scope for statement - scope was empty: INSERT INTO public.costs (id, month, total_cost, area) VALUES (1, '2021-01', 100, 10), (2, '2021-02', 200, 20), (3, '2021-03', 300, 30)\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=populate_cost_table", "name": "populate_cost_table", @@ -746,6 +754,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -771,7 +780,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253656320, + "timestampMillis": 1716506464494, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -804,7 +813,7 @@ "downstream_task_ids": "['cleanup_costs', 'cleanup_processed_costs']", "inlets": "[]", "outlets": "[]", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=transform_cost_table", "name": "transform_cost_table", @@ -929,6 +938,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -975,7 +985,7 @@ "name": "sqlite_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1707253660584, + "time": 1716506468706, "actor": "urn:li:corpuser:datahub" } } @@ -1048,7 +1058,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253660584, + "timestampMillis": 1716506468706, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1065,14 +1075,14 @@ "aspectName": "operation", "aspect": { "json": { - "timestampMillis": 1714672066747, + "timestampMillis": 1716506469563, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, "actor": "urn:li:corpuser:airflow", "operationType": "CREATE", - "lastUpdatedTimestamp": 1714672066747 + "lastUpdatedTimestamp": 1716506469563 } } }, @@ -1096,7 +1106,7 @@ "downstream_task_ids": "['cleanup_costs', 'cleanup_processed_costs']", "inlets": "[]", "outlets": "[]", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM 
costs\\n \"}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=transform_cost_table", "name": "transform_cost_table", @@ -1277,6 +1287,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -1302,7 +1313,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253661682, + "timestampMillis": 1716506469626, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1335,9 +1346,9 @@ "downstream_task_ids": "[]", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE costs\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE costs\\n \"}", + "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_costs", "name": "cleanup_costs", 
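Note: the `datahub_sql_parser_error` values in these goldens now carry the exception class name as a prefix, matching the `_extract_lineage` change in `datahub_listener.py` above. A minimal sketch of that formatting, with a local stand-in for the real exception class:

    class UnsupportedStatementTypeError(Exception):  # stand-in, not the real datahub import
        pass

    error = UnsupportedStatementTypeError(
        "Can only generate column-level lineage for select-like inner statements"
    )
    # The same f-string the listener uses to build the property value:
    prop = f"{type(error).__name__}: {error}"
    assert prop.startswith("UnsupportedStatementTypeError: ")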
@@ -1392,6 +1403,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -1438,7 +1450,7 @@ "name": "sqlite_operator_cleanup_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1707253669241, + "time": 1716506477141, "actor": "urn:li:corpuser:datahub" } } @@ -1487,7 +1499,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253669241, + "timestampMillis": 1716506477141, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1517,9 +1529,9 @@ "downstream_task_ids": "[]", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE costs\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE costs\\n \"}", + "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_costs", "name": "cleanup_costs", @@ -1574,6 +1586,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -1599,7 +1612,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253670409, + "timestampMillis": 1716506478016, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1632,9 +1645,9 @@ "downstream_task_ids": "[]", "inlets": 
"[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE processed_costs\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE processed_costs\\n \"}", + "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_processed_costs", "name": "cleanup_processed_costs", @@ -1689,6 +1702,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -1735,7 +1749,7 @@ "name": "sqlite_operator_cleanup_processed_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1707253675107, + "time": 1716506482495, "actor": "urn:li:corpuser:datahub" } } @@ -1784,7 +1798,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253675107, + "timestampMillis": 1716506482495, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1814,9 +1828,9 @@ "downstream_task_ids": "[]", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": 
\"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE processed_costs\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE processed_costs\\n \"}", + "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_processed_costs", "name": "cleanup_processed_costs", @@ -1871,6 +1885,7 @@ } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:airflow" @@ -1896,7 +1911,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1707253676482, + "timestampMillis": 1716506483469, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index 54ef69215ea4c..005969aeba732 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -373,6 +373,7 @@ def test_airflow_plugin( golden_path=golden_path, ignore_paths=[ # TODO: If we switched to Git urls, maybe we could get this to work consistently. 
+ r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]", r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]", ], ) diff --git a/metadata-ingestion-modules/airflow-plugin/tox.ini b/metadata-ingestion-modules/airflow-plugin/tox.ini index 5fbce2add9e88..b154f92fe553f 100644 --- a/metadata-ingestion-modules/airflow-plugin/tox.ini +++ b/metadata-ingestion-modules/airflow-plugin/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28 +envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py310-airflow29 [testenv] use_develop = true @@ -19,6 +19,7 @@ deps = airflow26: apache-airflow~=2.6.0 airflow27: apache-airflow~=2.7.0 airflow28: apache-airflow~=2.8.0 + airflow29: apache-airflow~=2.9.0 # Respect the Airflow constraints files. # We can't make ourselves work with the constraints of Airflow < 2.3. @@ -26,6 +27,7 @@ deps = py310-airflow26: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt py310-airflow27: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt py310-airflow28: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt + py310-airflow29: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.1/constraints-3.10.txt # Before pinning to the constraint files, we previously left the dependencies # more open. There were a number of packages for which this caused issues. @@ -53,6 +55,6 @@ commands = [testenv:py310-airflow24] extras = dev,integration-tests,plugin-v2,test-airflow24 -[testenv:py310-airflow{26,27,28}] +[testenv:py310-airflow{26,27,28,29}] extras = dev,integration-tests,plugin-v2 diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 60f1812ec4e69..2bfc94d13aa14 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -98,8 +98,8 @@ sqlglot_lib = { # Using an Acryl fork of sqlglot. 
- # https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1 - "acryl-sqlglot[rs]==23.17.1.dev10", + # https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:main?expand=1 + "acryl-sqlglot[rs]==24.0.1.dev7", } classification_lib = { diff --git a/metadata-ingestion/src/datahub/ingestion/api/report.py b/metadata-ingestion/src/datahub/ingestion/api/report.py index 4a74d6cbc6268..ade2832f1b669 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/report.py +++ b/metadata-ingestion/src/datahub/ingestion/api/report.py @@ -41,7 +41,7 @@ def to_pure_python_obj(some_val: Any) -> Any: if isinstance(some_val, SupportsAsObj): return some_val.as_obj() elif isinstance(some_val, pydantic.BaseModel): - return some_val.dict() + return Report.to_pure_python_obj(some_val.dict()) elif dataclasses.is_dataclass(some_val): return dataclasses.asdict(some_val) elif isinstance(some_val, list): diff --git a/metadata-ingestion/src/datahub/sql_parsing/_models.py b/metadata-ingestion/src/datahub/sql_parsing/_models.py index bbd3a9c9d11f8..23594534b5fbb 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/_models.py +++ b/metadata-ingestion/src/datahub/sql_parsing/_models.py @@ -29,7 +29,13 @@ def __lt__(self, other: "_FrozenModel") -> bool: for field in self.__fields__: self_v = getattr(self, field) other_v = getattr(other, field) - if self_v != other_v: + + # Handle None values by pushing them to the end of the ordering. + if self_v is None and other_v is not None: + return False + elif self_v is not None and other_v is None: + return True + elif self_v != other_v: return self_v < other_v return False diff --git a/metadata-ingestion/src/datahub/sql_parsing/query_types.py b/metadata-ingestion/src/datahub/sql_parsing/query_types.py new file mode 100644 index 0000000000000..2acad19418c11 --- /dev/null +++ b/metadata-ingestion/src/datahub/sql_parsing/query_types.py @@ -0,0 +1,75 @@ +from typing import Optional, Tuple + +import sqlglot + +from datahub.sql_parsing.sql_parsing_common import QueryType, QueryTypeProps +from datahub.sql_parsing.sqlglot_utils import ( + DialectOrStr, + get_dialect, + is_dialect_instance, +) + + +def _is_temp_table(table: sqlglot.exp.Table, dialect: sqlglot.Dialect) -> bool: + identifier: sqlglot.exp.Identifier = table.this + + return identifier.args.get("temporary") or ( + is_dialect_instance(dialect, "redshift") and identifier.name.startswith("#") + ) + + +def _get_create_type_from_kind(kind: Optional[str]) -> QueryType: + if kind and "TABLE" in kind: + return QueryType.CREATE_TABLE_AS_SELECT + elif kind and "VIEW" in kind: + return QueryType.CREATE_VIEW + else: + return QueryType.CREATE_OTHER + + +def get_query_type_of_sql( + expression: sqlglot.exp.Expression, dialect: DialectOrStr +) -> Tuple[QueryType, QueryTypeProps]: + dialect = get_dialect(dialect) + query_type_props: QueryTypeProps = {} + + # For creates, we need to look at the inner expression. 
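+    # e.g. "CREATE TABLE t AS SELECT ..." is typed by its kind ("TABLE" -> CTAS),
+    # while a plain column-list "CREATE TABLE t (id INT)" is pure DDL with no inner
+    # select to inspect.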
+ if isinstance(expression, sqlglot.exp.Create): + if is_create_table_ddl(expression): + return QueryType.CREATE_DDL, query_type_props + + kind = expression.args.get("kind") + if kind: + kind = kind.upper() + query_type_props["kind"] = kind + + target = expression.this + if any( + isinstance(prop, sqlglot.exp.TemporaryProperty) + for prop in (expression.args.get("properties") or []) + ) or _is_temp_table(target, dialect=dialect): + query_type_props["temporary"] = True + + query_type = _get_create_type_from_kind(kind) + return query_type, query_type_props + + # UPGRADE: Once we use Python 3.10, replace this with a match expression. + mapping = { + sqlglot.exp.Select: QueryType.SELECT, + sqlglot.exp.Insert: QueryType.INSERT, + sqlglot.exp.Update: QueryType.UPDATE, + sqlglot.exp.Delete: QueryType.DELETE, + sqlglot.exp.Merge: QueryType.MERGE, + sqlglot.exp.Query: QueryType.SELECT, # unions, etc. are also selects + } + + for cls, query_type in mapping.items(): + if isinstance(expression, cls): + return query_type, query_type_props + return QueryType.UNKNOWN, {} + + +def is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool: + return isinstance(statement, sqlglot.exp.Create) and isinstance( + statement.this, sqlglot.exp.Schema + ) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index ae152a6c58b15..9c2a588a577cc 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1,3 +1,4 @@ +import dataclasses import functools import itertools import logging @@ -28,6 +29,7 @@ TimeTypeClass, ) from datahub.sql_parsing._models import _FrozenModel, _ParserBaseModel, _TableName +from datahub.sql_parsing.query_types import get_query_type_of_sql, is_create_table_ddl from datahub.sql_parsing.schema_resolver import ( SchemaInfo, SchemaResolver, @@ -40,7 +42,6 @@ QueryTypeProps, ) from datahub.sql_parsing.sqlglot_utils import ( - DialectOrStr, get_dialect, get_query_fingerprint_debug, is_dialect_instance, @@ -82,65 +83,6 @@ assert 0 < len(RULES_BEFORE_TYPE_ANNOTATION) < len(sqlglot.optimizer.optimizer.RULES) -def _is_temp_table(table: sqlglot.exp.Table, dialect: sqlglot.Dialect) -> bool: - identifier: sqlglot.exp.Identifier = table.this - - return identifier.args.get("temporary") or ( - is_dialect_instance(dialect, "redshift") and identifier.name.startswith("#") - ) - - -def _get_create_type_from_kind(kind: Optional[str]) -> QueryType: - if kind and "TABLE" in kind: - return QueryType.CREATE_TABLE_AS_SELECT - elif kind and "VIEW" in kind: - return QueryType.CREATE_VIEW - else: - return QueryType.CREATE_OTHER - - -def get_query_type_of_sql( - expression: sqlglot.exp.Expression, dialect: DialectOrStr -) -> Tuple[QueryType, QueryTypeProps]: - dialect = get_dialect(dialect) - query_type_props: QueryTypeProps = {} - - # For creates, we need to look at the inner expression. 
- if isinstance(expression, sqlglot.exp.Create): - if _is_create_table_ddl(expression): - return QueryType.CREATE_DDL, query_type_props - - kind = expression.args.get("kind") - if kind: - kind = kind.upper() - query_type_props["kind"] = kind - - target = expression.this - if any( - isinstance(prop, sqlglot.exp.TemporaryProperty) - for prop in (expression.args.get("properties") or []) - ) or _is_temp_table(target, dialect=dialect): - query_type_props["temporary"] = True - - query_type = _get_create_type_from_kind(kind) - return query_type, query_type_props - - # UPGRADE: Once we use Python 3.10, replace this with a match expression. - mapping = { - sqlglot.exp.Select: QueryType.SELECT, - sqlglot.exp.Insert: QueryType.INSERT, - sqlglot.exp.Update: QueryType.UPDATE, - sqlglot.exp.Delete: QueryType.DELETE, - sqlglot.exp.Merge: QueryType.MERGE, - sqlglot.exp.Query: QueryType.SELECT, # unions, etc. are also selects - } - - for cls, query_type in mapping.items(): - if isinstance(expression, cls): - return query_type, query_type_props - return QueryType.UNKNOWN, {} - - class _ColumnRef(_FrozenModel): table: _TableName column: str @@ -314,16 +256,34 @@ class SqlUnderstandingError(Exception): pass -# TODO: Break this up into smaller functions. -def _column_level_lineage( # noqa: C901 +@dataclasses.dataclass +class _ColumnResolver: + sqlglot_db_schema: sqlglot.MappingSchema + table_schema_normalized_mapping: Dict[_TableName, Dict[str, str]] + use_case_insensitive_cols: bool + + def schema_aware_fuzzy_column_resolve( + self, table: Optional[_TableName], sqlglot_column: str + ) -> str: + default_col_name = ( + sqlglot_column.lower() if self.use_case_insensitive_cols else sqlglot_column + ) + if table: + return self.table_schema_normalized_mapping[table].get( + sqlglot_column, default_col_name + ) + else: + return default_col_name + + +def _prepare_query_columns( statement: sqlglot.exp.Expression, dialect: sqlglot.Dialect, table_schemas: Dict[_TableName, SchemaInfo], - output_table: Optional[_TableName], default_db: Optional[str], default_schema: Optional[str], -) -> List[_ColumnLineageInfo]: - is_create_ddl = _is_create_table_ddl(statement) +) -> Tuple[sqlglot.exp.Expression, "_ColumnResolver"]: + is_create_ddl = is_create_table_ddl(statement) if ( not isinstance( statement, @@ -335,8 +295,6 @@ def _column_level_lineage( # noqa: C901 f"Can only generate column-level lineage for select-like inner statements, not {type(statement)}" ) - column_lineage: List[_ColumnLineageInfo] = [] - use_case_insensitive_cols = is_dialect_instance( dialect, DIALECTS_WITH_CASE_INSENSITIVE_COLS ) @@ -392,112 +350,118 @@ def _sqlglot_force_column_normalizer( # statement.sql(pretty=True, dialect=dialect), # ) - def _schema_aware_fuzzy_column_resolve( - table: Optional[_TableName], sqlglot_column: str - ) -> str: - default_col_name = ( - sqlglot_column.lower() if use_case_insensitive_cols else sqlglot_column - ) - if table: - return table_schema_normalized_mapping[table].get( - sqlglot_column, default_col_name + if not is_create_ddl: + # Optimize the statement + qualify column references. 
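+        # ("qualify" here means rewriting each bare column reference as table.column
+        #  using the schema info, which is what lets output columns be attributed to
+        #  their source tables.)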
+ if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Prior to column qualification sql %s", + statement.sql(pretty=True, dialect=dialect), + ) + try: + # Second time running qualify, this time with: + # - the select instead of the full outer statement + # - schema info + # - column qualification enabled + # - running the full pre-type annotation optimizer + + # logger.debug("Schema: %s", sqlglot_db_schema.mapping) + statement = sqlglot.optimizer.optimizer.optimize( + statement, + dialect=dialect, + schema=sqlglot_db_schema, + qualify_columns=True, + validate_qualify_columns=False, + identify=True, + # sqlglot calls the db -> schema -> table hierarchy "catalog", "db", "table". + catalog=default_db, + db=default_schema, + rules=RULES_BEFORE_TYPE_ANNOTATION, + ) + except (sqlglot.errors.OptimizeError, ValueError) as e: + raise SqlUnderstandingError( + f"sqlglot failed to map columns to their source tables; likely missing/outdated table schema info: {e}" + ) from e + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Qualified sql %s", statement.sql(pretty=True, dialect=dialect) ) - else: - return default_col_name - - # Optimize the statement + qualify column references. - if logger.isEnabledFor(logging.DEBUG): - logger.debug( - "Prior to column qualification sql %s", - statement.sql(pretty=True, dialect=dialect), - ) - try: - # Second time running qualify, this time with: - # - the select instead of the full outer statement - # - schema info - # - column qualification enabled - # - running the full pre-type annotation optimizer - - # logger.debug("Schema: %s", sqlglot_db_schema.mapping) - statement = sqlglot.optimizer.optimizer.optimize( - statement, - dialect=dialect, - schema=sqlglot_db_schema, - qualify_columns=True, - validate_qualify_columns=False, - identify=True, - # sqlglot calls the db -> schema -> table hierarchy "catalog", "db", "table". - catalog=default_db, - db=default_schema, - rules=RULES_BEFORE_TYPE_ANNOTATION, - ) - except (sqlglot.errors.OptimizeError, ValueError) as e: - raise SqlUnderstandingError( - f"sqlglot failed to map columns to their source tables; likely missing/outdated table schema info: {e}" - ) from e - if logger.isEnabledFor(logging.DEBUG): - logger.debug("Qualified sql %s", statement.sql(pretty=True, dialect=dialect)) - # Handle the create DDL case. - if is_create_ddl: - assert ( - output_table is not None - ), "output_table must be set for create DDL statements" + # Try to figure out the types of the output columns. + try: + statement = sqlglot.optimizer.annotate_types.annotate_types( + statement, schema=sqlglot_db_schema + ) + except (sqlglot.errors.OptimizeError, sqlglot.errors.ParseError) as e: + # This is not a fatal error, so we can continue. + logger.debug("sqlglot failed to annotate or parse types: %s", e) + + return statement, _ColumnResolver( + sqlglot_db_schema=sqlglot_db_schema, + table_schema_normalized_mapping=table_schema_normalized_mapping, + use_case_insensitive_cols=use_case_insensitive_cols, + ) - create_schema: sqlglot.exp.Schema = statement.this - sqlglot_columns = create_schema.expressions - for column_def in sqlglot_columns: - if not isinstance(column_def, sqlglot.exp.ColumnDef): - # Ignore things like constraints. 
- continue +def _create_table_ddl_cll( + statement: sqlglot.exp.Expression, + dialect: sqlglot.Dialect, + column_resolver: _ColumnResolver, + output_table: Optional[_TableName], +) -> List[_ColumnLineageInfo]: + column_lineage: List[_ColumnLineageInfo] = [] - output_col = _schema_aware_fuzzy_column_resolve( - output_table, column_def.name - ) - output_col_type = column_def.args.get("kind") + assert ( + output_table is not None + ), "output_table must be set for create DDL statements" - column_lineage.append( - _ColumnLineageInfo( - downstream=_DownstreamColumnRef( - table=output_table, - column=output_col, - column_type=output_col_type, - ), - upstreams=[], - ) - ) + create_schema: sqlglot.exp.Schema = statement.this + sqlglot_columns = create_schema.expressions - return column_lineage + for column_def in sqlglot_columns: + if not isinstance(column_def, sqlglot.exp.ColumnDef): + # Ignore things like constraints. + continue - # Try to figure out the types of the output columns. - try: - statement = sqlglot.optimizer.annotate_types.annotate_types( - statement, schema=sqlglot_db_schema + output_col = column_resolver.schema_aware_fuzzy_column_resolve( + output_table, column_def.name + ) + output_col_type = column_def.args.get("kind") + + column_lineage.append( + _ColumnLineageInfo( + downstream=_DownstreamColumnRef( + table=output_table, + column=output_col, + column_type=output_col_type, + ), + upstreams=[], + ) ) - except (sqlglot.errors.OptimizeError, sqlglot.errors.ParseError) as e: - # This is not a fatal error, so we can continue. - logger.debug("sqlglot failed to annotate or parse types: %s", e) - try: - assert isinstance(statement, _SupportedColumnLineageTypesTuple) + return column_lineage - cached_scope = sqlglot.optimizer.build_scope(statement) +def _select_statement_cll( # noqa: C901 + statement: _SupportedColumnLineageTypes, + dialect: sqlglot.Dialect, + root_scope: sqlglot.optimizer.Scope, + column_resolver: _ColumnResolver, + output_table: Optional[_TableName], +) -> List[_ColumnLineageInfo]: + column_lineage: List[_ColumnLineageInfo] = [] + + try: # List output columns. output_columns = [ (select_col.alias_or_name, select_col) for select_col in statement.selects ] logger.debug("output columns: %s", [col[0] for col in output_columns]) for output_col, original_col_expression in output_columns: - if output_col == "*": + if not output_col or output_col == "*": # If schema information is available, the * will be expanded to the actual columns. # Otherwise, we can't process it. continue - if output_col == "": - continue - if is_dialect_instance(dialect, "bigquery") and output_col.lower() in { "_partitiontime", "_partitiondate", @@ -512,54 +476,31 @@ def _schema_aware_fuzzy_column_resolve( output_col, statement, dialect=dialect, - schema=sqlglot_db_schema, - scope=cached_scope, + scope=root_scope, trim_selects=False, + # We don't need to pass the schema in here, since we've already qualified the columns. ) + # import pathlib # pathlib.Path("sqlglot.html").write_text( # str(lineage_node.to_html(dialect=dialect)) # ) # Generate SELECT lineage. - # Using a set here to deduplicate upstreams. - direct_raw_col_upstreams: Set[_ColumnRef] = set() - for node in lineage_node.walk(): - if node.downstream: - # We only want the leaf nodes. - pass - - elif isinstance(node.expression, sqlglot.exp.Table): - table_ref = _TableName.from_sqlglot_table(node.expression) - - if node.name == "*": - # This will happen if we couldn't expand the * to actual columns e.g. 
if - # we don't have schema info for the table. In this case, we can't generate - # column-level lineage, so we skip it. - continue - - # Parse the column name out of the node name. - # Sqlglot calls .sql(), so we have to do the inverse. - normalized_col = sqlglot.parse_one(node.name).this.name - if node.subfield: - normalized_col = f"{normalized_col}.{node.subfield}" - - direct_raw_col_upstreams.add( - _ColumnRef(table=table_ref, column=normalized_col) - ) - else: - # This branch doesn't matter. For example, a count(*) column would go here, and - # we don't get any column-level lineage for that. - pass + direct_raw_col_upstreams = _get_direct_raw_col_upstreams(lineage_node) # column_logic = lineage_node.source + # Fuzzy resolve the output column. + original_col_expression = lineage_node.expression if output_col.startswith("_col_"): # This is the format sqlglot uses for unnamed columns e.g. 'count(id)' -> 'count(id) AS _col_0' # This is a bit jank since we're relying on sqlglot internals, but it seems to be # the best way to do it. output_col = original_col_expression.this.sql(dialect=dialect) - output_col = _schema_aware_fuzzy_column_resolve(output_table, output_col) + output_col = column_resolver.schema_aware_fuzzy_column_resolve( + output_table, output_col + ) # Guess the output column type. output_col_type = None @@ -570,7 +511,9 @@ def _schema_aware_fuzzy_column_resolve( direct_resolved_col_upstreams = { _ColumnRef( table=edge.table, - column=_schema_aware_fuzzy_column_resolve(edge.table, edge.column), + column=column_resolver.schema_aware_fuzzy_column_resolve( + edge.table, edge.column + ), ) for edge in direct_raw_col_upstreams } @@ -598,6 +541,121 @@ def _schema_aware_fuzzy_column_resolve( return column_lineage +class _ColumnLineageWithDebugInfo(_ParserBaseModel): + column_lineage: List[_ColumnLineageInfo] + + select_statement: Optional[sqlglot.exp.Expression] = None + # TODO: Add column exceptions here. + + +def _column_level_lineage( + statement: sqlglot.exp.Expression, + dialect: sqlglot.Dialect, + downstream_table: Optional[_TableName], + table_name_schema_mapping: Dict[_TableName, SchemaInfo], + default_db: Optional[str], + default_schema: Optional[str], +) -> _ColumnLineageWithDebugInfo: + # Simplify the input statement for column-level lineage generation. + try: + select_statement = _try_extract_select(statement) + except Exception as e: + raise SqlUnderstandingError( + f"Failed to extract select from statement: {e}" + ) from e + + try: + assert select_statement is not None + (select_statement, column_resolver) = _prepare_query_columns( + select_statement, + dialect=dialect, + table_schemas=table_name_schema_mapping, + default_db=default_db, + default_schema=default_schema, + ) + except UnsupportedStatementTypeError as e: + # Inject details about the outer statement type too. + e.args = (f"{e.args[0]} (outer statement type: {type(statement)})",) + logger.debug(e) + raise e + + # Handle the create table DDL case separately. 
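+    # (a column-list CREATE has no inner SELECT to trace, so its lineage comes from
+    # the column definitions themselves, with no upstreams)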
+ if is_create_table_ddl(select_statement): + column_lineage = _create_table_ddl_cll( + select_statement, + dialect=dialect, + column_resolver=column_resolver, + output_table=downstream_table, + ) + return _ColumnLineageWithDebugInfo( + column_lineage=column_lineage, + select_statement=select_statement, + ) + + assert isinstance(select_statement, _SupportedColumnLineageTypesTuple) + try: + root_scope = sqlglot.optimizer.build_scope(select_statement) + if root_scope is None: + raise SqlUnderstandingError( + f"Failed to build scope for statement - scope was empty: {statement}" + ) + except (sqlglot.errors.OptimizeError, ValueError, IndexError) as e: + raise SqlUnderstandingError( + f"sqlglot failed to preprocess statement: {e}" + ) from e + + # Generate column-level lineage. + column_lineage = _select_statement_cll( + select_statement, + dialect=dialect, + root_scope=root_scope, + column_resolver=column_resolver, + output_table=downstream_table, + ) + + return _ColumnLineageWithDebugInfo( + column_lineage=column_lineage, + select_statement=select_statement, + ) + + +def _get_direct_raw_col_upstreams( + lineage_node: sqlglot.lineage.Node, +) -> Set[_ColumnRef]: + # Using a set here to deduplicate upstreams. + direct_raw_col_upstreams: Set[_ColumnRef] = set() + + for node in lineage_node.walk(): + if node.downstream: + # We only want the leaf nodes. + pass + + elif isinstance(node.expression, sqlglot.exp.Table): + table_ref = _TableName.from_sqlglot_table(node.expression) + + if node.name == "*": + # This will happen if we couldn't expand the * to actual columns e.g. if + # we don't have schema info for the table. In this case, we can't generate + # column-level lineage, so we skip it. + continue + + # Parse the column name out of the node name. + # Sqlglot calls .sql(), so we have to do the inverse. + normalized_col = sqlglot.parse_one(node.name).this.name + if node.subfield: + normalized_col = f"{normalized_col}.{node.subfield}" + + direct_raw_col_upstreams.add( + _ColumnRef(table=table_ref, column=normalized_col) + ) + else: + # This branch doesn't matter. For example, a count(*) column would go here, and + # we don't get any column-level lineage for that. + pass + + return direct_raw_col_upstreams + + def _extract_select_from_create( statement: sqlglot.exp.Create, ) -> sqlglot.exp.Expression: @@ -641,7 +699,7 @@ def _extract_select_from_update( new_expressions.append(expr) # Special translation for the `from` clause. - extra_args = {} + extra_args: dict = {} original_from = statement.args.get("from") if original_from and isinstance(original_from.this, sqlglot.exp.Table): # Move joins, laterals, and pivots from the Update->From->Table->field @@ -650,7 +708,8 @@ def _extract_select_from_update( for k in _UPDATE_FROM_TABLE_ARGS_TO_MOVE: if k in original_from.this.args: # Mutate the from table clause in-place. 
- extra_args[k] = original_from.this.args.pop(k) + extra_args[k] = original_from.this.args.get(k) + original_from.this.set(k, None) select_statement = sqlglot.exp.Select( **{ @@ -676,12 +735,6 @@ def _extract_select_from_update( return select_statement -def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool: - return isinstance(statement, sqlglot.exp.Create) and isinstance( - statement.this, sqlglot.exp.Schema - ) - - def _try_extract_select( statement: sqlglot.exp.Expression, ) -> sqlglot.exp.Expression: @@ -878,39 +931,28 @@ def _sqlglot_lineage_inner( f"Resolved {total_schemas_resolved} of {total_tables_discovered} table schemas" ) - # Simplify the input statement for column-level lineage generation. - try: - select_statement = _try_extract_select(statement) - except Exception as e: - logger.debug(f"Failed to extract select from statement: {e}", exc_info=True) - debug_info.column_error = e - select_statement = None - - # Generate column-level lineage. column_lineage: Optional[List[_ColumnLineageInfo]] = None try: - if select_statement is not None: - with cooperative_timeout( - timeout=( - SQL_LINEAGE_TIMEOUT_SECONDS if SQL_LINEAGE_TIMEOUT_ENABLED else None - ) - ): - column_lineage = _column_level_lineage( - select_statement, - dialect=dialect, - table_schemas=table_name_schema_mapping, - output_table=downstream_table, - default_db=default_db, - default_schema=default_schema, - ) - except UnsupportedStatementTypeError as e: - # Inject details about the outer statement type too. - e.args = (f"{e.args[0]} (outer statement type: {type(statement)})",) - debug_info.column_error = e - logger.debug(debug_info.column_error) + with cooperative_timeout( + timeout=( + SQL_LINEAGE_TIMEOUT_SECONDS if SQL_LINEAGE_TIMEOUT_ENABLED else None + ) + ): + column_lineage_debug_info = _column_level_lineage( + statement, + dialect=dialect, + downstream_table=downstream_table, + table_name_schema_mapping=table_name_schema_mapping, + default_db=default_db, + default_schema=default_schema, + ) + column_lineage = column_lineage_debug_info.column_lineage except CooperativeTimeoutError as e: logger.debug(f"Timed out while generating column-level lineage: {e}") debug_info.column_error = e + except UnsupportedStatementTypeError as e: + # For this known exception type, we assume the error is logged at the point of failure. 
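+        # (_column_level_lineage logs it at debug level before re-raising)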
+ debug_info.column_error = e except Exception as e: logger.debug(f"Failed to generate column-level lineage: {e}", exc_info=True) debug_info.column_error = e diff --git a/metadata-ingestion/src/datahub/utilities/logging_manager.py b/metadata-ingestion/src/datahub/utilities/logging_manager.py index 6b69fda15fc70..6bfbb9d373396 100644 --- a/metadata-ingestion/src/datahub/utilities/logging_manager.py +++ b/metadata-ingestion/src/datahub/utilities/logging_manager.py @@ -13,6 +13,7 @@ import collections import contextlib +import itertools import logging import os import pathlib @@ -50,6 +51,9 @@ def extract_name_from_filename(filename: str, fallback_name: str) -> str: >>> extract_name_from_filename("/home/user/datahub/metadata-ingestion/src/datahub/telemetry/telemetry.py", "bad") 'datahub.telemetry.telemetry' + >>> extract_name_from_filename("/home/user/datahub/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py", "bad") + 'datahub_airflow_plugin.datahub_listener' + >>> extract_name_from_filename("/this/is/not/a/normal/path.py", "fallback.package") 'fallback.package' @@ -76,14 +80,25 @@ def extract_name_from_filename(filename: str, fallback_name: str) -> str: # Join the parts from 'site-packages' onwards with '.' return ".".join(path_parts[site_packages_index + 1 :]) - # We're probably in a development environment, so take everything after 'metadata-ingestion' - metadata_ingestion_index = next( - (i for i, part in enumerate(path_parts) if "metadata-ingestion" in part), - None, + # We're probably in a development environment, so take everything after 'src' as the module. + src_dir_index = next( + itertools.chain( + ( + i + 2 + for i, part in enumerate(path_parts) + if "metadata-ingestion-modules" in part + ), + ( + i + 1 + for i, part in enumerate(path_parts) + if "metadata-ingestion" in part + ), + [None], + ) ) - if metadata_ingestion_index is not None: - # Join the parts from 'metadata-ingestion/src' onwards with '.' - return ".".join(path_parts[metadata_ingestion_index + 2 :]) + if src_dir_index is not None: + # Join the parts after 'src' with '.' 
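+            # e.g. [..., "src", "datahub_airflow_plugin", "datahub_listener"]
+            #      -> "datahub_airflow_plugin.datahub_listener"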
+ return ".".join(path_parts[src_dir_index + 1 :]) return fallback_name diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_multiple_select_subqueries.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_multiple_select_subqueries.json new file mode 100644 index 0000000000000..e01e7253847ed --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_multiple_select_subqueries.json @@ -0,0 +1,37 @@ +{ + "query_type": "SELECT", + "query_type_props": {}, + "query_fingerprint": "6550b4146336b0b15b972685bd4b5b30ba9573c9a1f2e5ddd2fc2fb64eca1b51", + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:mysql,x,PROD)" + ], + "out_tables": [], + "column_lineage": [ + { + "downstream": { + "table": null, + "column": "y", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:mysql,x,PROD)", + "column": "a" + }, + { + "table": "urn:li:dataset:(urn:li:dataPlatform:mysql,x,PROD)", + "column": "b" + }, + { + "table": "urn:li:dataset:(urn:li:dataPlatform:mysql,x,PROD)", + "column": "c" + } + ] + } + ], + "debug_info": { + "confidence": 0.2, + "generalized_statement": "SELECT SUM((SELECT MAX(a) AS a FROM x) + (SELECT MIN(b) AS b FROM x) + c) AS y FROM x" + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_complex_ctes.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_complex_ctes.json new file mode 100644 index 0000000000000..0fb0860e2c070 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_complex_ctes.json @@ -0,0 +1,44 @@ +{ + "query_type": "SELECT", + "query_type_props": {}, + "query_fingerprint": "1934ea53767f5be081ab4e0bcf9930ecc6d8e46d178187abe335a5de18f0d743", + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:oracle,table1,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:oracle,table2,PROD)" + ], + "out_tables": [], + "column_lineage": [ + { + "downstream": { + "table": null, + "column": "COL1", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:oracle,table1,PROD)", + "column": "COL1" + } + ] + }, + { + "downstream": { + "table": null, + "column": "COL3", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:oracle,table2,PROD)", + "column": "COL3" + } + ] + } + ], + "debug_info": { + "confidence": 0.2, + "generalized_statement": "WITH cte1 AS (SELECT col1, col2 FROM table1 WHERE col1 = ? GROUP BY ?, ?), cte2 AS (SELECT col3, col4 FROM table2 WHERE col2 = ? 
GROUP BY col3, col4) SELECT cte1.col1, cte2.col3 FROM cte1 JOIN cte2 ON cte1.col2 = cte2.col4" + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_insert_into_values.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_insert_into_values.json new file mode 100644 index 0000000000000..b562f493dc89f --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_insert_into_values.json @@ -0,0 +1,14 @@ +{ + "query_type": "INSERT", + "query_type_props": {}, + "query_fingerprint": "8ec1beb7974bddae4c16b256055853bcbf08e46d4a401fd3a114c39c63479e61", + "in_tables": [], + "out_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,my_table,PROD)" + ], + "column_lineage": null, + "debug_info": { + "confidence": 0.2, + "generalized_statement": "INSERT INTO my_table (id, month, total_cost, area) VALUES (?), (?), (?)" + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py index 826016d07317b..5c6abf4c9371d 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py @@ -11,7 +11,11 @@ SqlParsingAggregator, ) from datahub.sql_parsing.sql_parsing_common import QueryType -from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo, ColumnRef +from datahub.sql_parsing.sqlglot_lineage import ( + ColumnLineageInfo, + ColumnRef, + DownstreamColumnRef, +) from tests.test_helpers import mce_helpers from tests.test_helpers.click_helpers import run_datahub_cmd @@ -432,15 +436,15 @@ def test_add_known_query_lineage(pytestconfig: pytest.Config) -> None: upstreams=[upstream_urn], column_lineage=[ ColumnLineageInfo( - downstream=ColumnRef(table=downstream_urn, column="a"), + downstream=DownstreamColumnRef(table=downstream_urn, column="a"), upstreams=[ColumnRef(table=upstream_urn, column="a")], ), ColumnLineageInfo( - downstream=ColumnRef(table=downstream_urn, column="b"), + downstream=DownstreamColumnRef(table=downstream_urn, column="b"), upstreams=[ColumnRef(table=upstream_urn, column="b")], ), ColumnLineageInfo( - downstream=ColumnRef(table=downstream_urn, column="c"), + downstream=DownstreamColumnRef(table=downstream_urn, column="c"), upstreams=[ColumnRef(table=upstream_urn, column="c")], ), ], diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index a786070eeb101..ed767f48cf685 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -84,6 +84,40 @@ def test_select_with_ctes(): ) +def test_select_with_complex_ctes(): + # This one has group bys in the CTEs, which means they can't be collapsed into the main query. 
+ assert_sql_result( + """ +WITH cte1 AS ( + SELECT col1, col2 + FROM table1 + WHERE col1 = 'value1' + GROUP BY 1, 2 +), cte2 AS ( + SELECT col3, col4 + FROM table2 + WHERE col2 = 'value2' + GROUP BY col3, col4 +) +SELECT cte1.col1, cte2.col3 +FROM cte1 +JOIN cte2 ON cte1.col2 = cte2.col4 +""", + dialect="oracle", + expected_file=RESOURCE_DIR / "test_select_with_complex_ctes.json", + ) + + +def test_multiple_select_subqueries(): + assert_sql_result( + """ +SELECT SUM((SELECT max(a) a from x) + (SELECT min(b) b from x) + c) AS y FROM x +""", + dialect="mysql", + expected_file=RESOURCE_DIR / "test_multiple_select_subqueries.json", + ) + + def test_create_view_as_select(): assert_sql_result( """ @@ -1123,3 +1157,17 @@ def test_snowflake_with_unnamed_column_from_udf_call() -> None: default_db="my_db", expected_file=RESOURCE_DIR / "test_snowflake_unnamed_column_udf.json", ) + + +def test_sqlite_insert_into_values() -> None: + assert_sql_result( + """\ +INSERT INTO my_table (id, month, total_cost, area) + VALUES + (1, '2021-01', 100, 10), + (2, '2021-02', 200, 20), + (3, '2021-03', 300, 30) +""", + dialect="sqlite", + expected_file=RESOURCE_DIR / "test_sqlite_insert_into_values.json", + )
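A usage sketch for the new `datahub.sql_parsing.query_types` module, assuming only the imports and module path shown in the new file above:

    import sqlglot

    from datahub.sql_parsing.query_types import get_query_type_of_sql
    from datahub.sql_parsing.sql_parsing_common import QueryType

    expression = sqlglot.parse_one(
        "INSERT INTO my_table (id, month) VALUES (1, '2021-01')", dialect="sqlite"
    )
    query_type, props = get_query_type_of_sql(expression, dialect="sqlite")
    assert query_type == QueryType.INSERT  # per the mapping in query_types.py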