diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 5a48f8b7918dce..d5dbb98d3cb17b 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -461,7 +461,7 @@ "mssql-odbc": sql_common | mssql_common | {"pyodbc"}, "mysql": mysql, # mariadb should have same dependency as mysql - "mariadb": sql_common | {"pymysql>=1.0.2"}, + "mariadb": sql_common | mysql, "okta": {"okta~=1.7.0", "nest-asyncio"}, "oracle": sql_common | {"oracledb"}, "postgres": sql_common | postgres_common, diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py index cd3c2146e6d848..09f38913f11b19 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py @@ -1,6 +1,7 @@ import os from typing import Optional, Set +import pydantic from pydantic import Field, root_validator from datahub.configuration.common import AllowDenyPattern @@ -119,3 +120,12 @@ def check_ingesting_data(cls, values): " Please specify at least one of `database_connection` or `kafka_connection`, ideally both." 
) return values + + @pydantic.validator("database_connection") + def validate_mysql_scheme( + cls, v: Optional[SQLAlchemyConnectionConfig] + ) -> Optional[SQLAlchemyConnectionConfig]: + if v is not None and "mysql" in v.scheme: + if v.scheme != "mysql+pymysql": + raise ValueError("For MySQL, the scheme must be mysql+pymysql.") + return v diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py index 80906ca63115f5..ee105f4862caba 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py @@ -151,8 +151,10 @@ def execute_server_cursor( self, query: str, params: Dict[str, Any] ) -> Iterable[Dict[str, Any]]: with self.engine.connect() as conn: - if self.engine.dialect.name == "postgresql": + if self.engine.dialect.name in ["postgresql", "mysql", "mariadb"]: with conn.begin(): # Transaction required for PostgreSQL server-side cursor 
+ # https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection.execution_options.params.stream_results conn = conn.execution_options( stream_results=True, yield_per=self.config.database_query_batch_size, @@ -160,22 +162,6 @@ def execute_server_cursor( result = conn.execute(query, params) for row in result: yield dict(row) - elif self.engine.dialect.name == "mysql": # MySQL - import MySQLdb - - with contextlib.closing( - conn.connection.cursor(MySQLdb.cursors.SSCursor) - ) as cursor: - logger.debug(f"Using Cursor type: {cursor.__class__.__name__}") - cursor.execute(query, params) - - columns = [desc[0] for desc in cursor.description] - while True: - rows = cursor.fetchmany(self.config.database_query_batch_size) - if not rows: - break # Use break instead of return in generator - for row in rows: - yield dict(zip(columns, row)) else: raise ValueError(f"Unsupported dialect: {self.engine.dialect.name}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py index cb72441344088c..12daba298a2014 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py @@ -130,7 +130,7 @@ def _get_database_workunits( self._commit_progress(i) def _get_kafka_workunits( - self, from_offsets: Dict[int, int], soft_deleted_urns: List[str] = [] + self, from_offsets: Dict[int, int], soft_deleted_urns: List[str] ) -> Iterable[MetadataWorkUnit]: if self.config.kafka_connection is None: return