Skip to content

Commit

Permalink
chore(mysql): port to MySQLdb instead of pymysql
Browse files (browse the repository at this point in the history)
  • Loading branch information
cpcloud committed Sep 10, 2024
1 parent 019cae5 commit 4fcee76
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 134 deletions.
2 changes: 1 addition & 1 deletion .github/renovate.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"addLabels": ["druid"]
},
{
"matchPackagePatterns": ["pymysql", "mariadb"],
"matchPackagePatterns": ["mysqlclient", "mariadb"],
"addLabels": ["mysql"]
},
{
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/ibis-backends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ jobs:
- polars
sys-deps:
- libgeos-dev
- default-libmysqlclient-dev
- name: postgres
title: PostgreSQL
extras:
Expand Down Expand Up @@ -281,6 +282,7 @@ jobs:
- mysql
sys-deps:
- libgeos-dev
- default-libmysqlclient-dev
- os: windows-latest
backend:
name: clickhouse
Expand Down
2 changes: 1 addition & 1 deletion conda/environment-arm64-flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies:
- pyarrow-hotfix >=0.4
- pydata-google-auth
- pydruid >=0.6.5
- pymysql >=1
- mysqlclient >=2.2.4
- pyspark >=3
- python-dateutil >=2.8.2
- python-duckdb >=0.8.1
Expand Down
2 changes: 1 addition & 1 deletion conda/environment-arm64.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies:
- pyarrow-hotfix >=0.4
- pydata-google-auth
- pydruid >=0.6.5
- pymysql >=1
- mysqlclient >=2.2.4
- pyodbc >=4.0.39
- pyspark >=3
- python-dateutil >=2.8.2
Expand Down
2 changes: 1 addition & 1 deletion conda/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies:
- pyarrow-hotfix >=0.4
- pydata-google-auth
- pydruid >=0.6.5
- pymysql >=1
- mysqlclient >=2.2.4
- pyodbc >=4.0.39
- pyspark >=3
- python >=3.10
Expand Down
153 changes: 73 additions & 80 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
from __future__ import annotations

import contextlib
import re
import warnings
from functools import cached_property
from operator import itemgetter
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote_plus

import pymysql
import MySQLdb
import sqlglot as sg
import sqlglot.expressions as sge

Expand All @@ -22,7 +21,6 @@
import ibis.expr.types as ir
from ibis import util
from ibis.backends import CanCreateDatabase
from ibis.backends.mysql.datatypes import _type_from_cursor_info
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers.base import STAR, TRUE, C

Expand Down Expand Up @@ -87,16 +85,14 @@ def _from_url(self, url: ParseResult, **kwargs):

@cached_property
def version(self):
matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.server_version)
return ".".join(matched.groups())
return ".".join(map(str, self.con._server_version))

Check warning on line 88 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L88

Added line #L88 was not covered by tests

def do_connect(
self,
host: str = "localhost",
user: str | None = None,
password: str | None = None,
port: int = 3306,
database: str | None = None,
autocommit: bool = True,
**kwargs,
) -> None:
Expand All @@ -112,12 +108,10 @@ def do_connect(
Password
port
Port
database
Database to connect to
autocommit
Autocommit mode
kwargs
Additional keyword arguments passed to `pymysql.connect`
Additional keyword arguments passed to `MySQLdb.connect`
Examples
--------
Expand Down Expand Up @@ -150,22 +144,20 @@ def do_connect(
month : int32
"""
self.con = pymysql.connect(
self.con = MySQLdb.connect(

Check warning on line 147 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L147

Added line #L147 was not covered by tests
user=user,
host=host,
host="127.0.0.1" if host == "localhost" else host,
port=port,
password=password,
database=database,
autocommit=autocommit,
conv=pymysql.converters.conversions,
**kwargs,
)

self._post_connect()

@util.experimental
@classmethod
def from_connection(cls, con: pymysql.Connection) -> Backend:
def from_connection(cls, con) -> Backend:
"""Create an Ibis client from an existing connection to a MySQL database.
Parameters
Expand All @@ -180,7 +172,7 @@ def from_connection(cls, con: pymysql.Connection) -> Backend:
return new_backend

def _post_connect(self) -> None:
with contextlib.closing(self.con.cursor()) as cur:
with self.con.cursor() as cur:
try:
cur.execute("SET @@session.time_zone = 'UTC'")
except Exception as e: # noqa: BLE001
Expand All @@ -199,24 +191,34 @@ def list_databases(self, like: str | None = None) -> list[str]:
return self._filter_with_like(databases, like)

def _get_schema_using_query(self, query: str) -> sch.Schema:
with self.begin() as cur:
cur.execute(
sg.select(STAR)
.from_(
sg.parse_one(query, dialect=self.dialect).subquery(
sg.to_identifier("tmp", quoted=self.compiler.quoted)
)
from ibis.backends.mysql.datatypes import _type_from_cursor_info

Check warning on line 194 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L194

Added line #L194 was not covered by tests

sql = (

Check warning on line 196 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L196

Added line #L196 was not covered by tests
sg.select(STAR)
.from_(
sg.parse_one(query, dialect=self.dialect).subquery(
sg.to_identifier("tmp", quoted=self.compiler.quoted)
)
.limit(0)
.sql(self.dialect)
)

return sch.Schema(
{
field.name: _type_from_cursor_info(descr, field)
for descr, field in zip(cur.description, cur._result.fields)
}
.limit(0)
.sql(self.dialect)
)
with self.begin() as cur:
cur.execute(sql)
descr, flags = cur.description, cur.description_flags

Check warning on line 208 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L207-L208

Added lines #L207 - L208 were not covered by tests

items = {}

Check warning on line 210 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L210

Added line #L210 was not covered by tests
for (name, type_code, _, _, field_length, scale, _), raw_flags in zip(
descr, flags
):
item = _type_from_cursor_info(

Check warning on line 214 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L214

Added line #L214 was not covered by tests
flags=raw_flags,
type_code=type_code,
field_length=field_length,
scale=scale,
)
items[name] = item
return sch.Schema(items)

Check warning on line 221 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L220-L221

Added lines #L220 - L221 were not covered by tests

def get_schema(
self, name: str, *, catalog: str | None = None, database: str | None = None
Expand Down Expand Up @@ -255,38 +257,52 @@ def drop_database(self, name: str, force: bool = False) -> None:
def begin(self):
con = self.con
cur = con.cursor()
autocommit = con.get_autocommit()

Check warning on line 260 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L260

Added line #L260 was not covered by tests

if not autocommit:
con.begin()

Check warning on line 263 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L263

Added line #L263 was not covered by tests

try:
yield cur
except Exception:
con.rollback()
if not autocommit:
con.rollback()

Check warning on line 269 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L269

Added line #L269 was not covered by tests
raise
else:
con.commit()
if not autocommit:
con.commit()

Check warning on line 273 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L273

Added line #L273 was not covered by tests
finally:
cur.close()

# TODO(kszucs): should make it an abstract method or remove the use of it
# from .execute()
@contextlib.contextmanager
def _safe_raw_sql(self, *args, **kwargs):
with contextlib.closing(self.raw_sql(*args, **kwargs)) as result:
with self.raw_sql(*args, **kwargs) as result:
yield result

def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any:
with contextlib.suppress(AttributeError):
query = query.sql(dialect=self.name)

con = self.con
autocommit = con.get_autocommit()

Check warning on line 289 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L289

Added line #L289 was not covered by tests

cursor = con.cursor()

if not autocommit:
con.begin()

Check warning on line 294 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L294

Added line #L294 was not covered by tests

try:
cursor.execute(query, **kwargs)
except Exception:
con.rollback()
if not autocommit:
con.rollback()

Check warning on line 300 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L300

Added line #L300 was not covered by tests
cursor.close()
raise
else:
con.commit()
if not autocommit:
con.commit()

Check warning on line 305 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L305

Added line #L305 was not covered by tests
return cursor

# TODO: disable positional arguments
Expand Down Expand Up @@ -403,11 +419,9 @@ def create_table(
if temp:
properties.append(sge.TemporaryProperty())

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand All @@ -425,39 +439,33 @@ def create_table(
if not schema:
schema = table.schema()

table_expr = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
)
quoted = self.compiler.quoted
dialect = self.dialect

Check warning on line 443 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L442-L443

Added lines #L442 - L443 were not covered by tests

table_expr = sg.table(temp_name, catalog=database, quoted=quoted)
target = sge.Schema(this=table_expr, expressions=schema.to_sqlglot(dialect))

Check warning on line 446 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L445-L446

Added lines #L445 - L446 were not covered by tests

create_stmt = sge.Create(
kind="TABLE",
this=target,
properties=sge.Properties(expressions=properties),
kind="TABLE", this=target, properties=sge.Properties(expressions=properties)
)

this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
this = sg.table(name, catalog=database, quoted=quoted)

Check warning on line 452 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L452

Added line #L452 was not covered by tests
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=table_expr, expression=query).sql(
self.name
)
cur.execute(insert_stmt)
cur.execute(sge.Insert(this=table_expr, expression=query).sql(dialect))

Check warning on line 455 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L455

Added line #L455 was not covered by tests

if overwrite:
cur.execute(sge.Drop(kind="TABLE", this=this, exists=True).sql(dialect))

Check warning on line 458 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L458

Added line #L458 was not covered by tests
cur.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
cur.execute(
f"ALTER TABLE IF EXISTS {table_expr.sql(self.name)} RENAME TO {this.sql(self.name)}"
sge.Alter(
kind="TABLE",
this=table_expr,
exists=True,
actions=[sge.RenameTable(this=this)],
).sql(dialect)
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)

return self.table(name, database=database)

# preserve the input schema if it was provided
Expand All @@ -475,16 +483,17 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:

name = op.name
quoted = self.compiler.quoted
dialect = self.dialect

Check warning on line 486 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L486

Added line #L486 was not covered by tests

create_stmt = sg.exp.Create(
kind="TABLE",
this=sg.exp.Schema(
this=sg.to_identifier(name, quoted=quoted),
expressions=schema.to_sqlglot(self.dialect),
expressions=schema.to_sqlglot(dialect),
),
properties=sg.exp.Properties(expressions=[sge.TemporaryProperty()]),
)
create_stmt_sql = create_stmt.sql(self.name)
create_stmt_sql = create_stmt.sql(dialect)

Check warning on line 496 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L496

Added line #L496 was not covered by tests

df = op.data.to_frame()
# nan can not be used with MySQL
Expand Down Expand Up @@ -529,23 +538,7 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:

from ibis.backends.mysql.converter import MySQLPandasData

try:
df = pd.DataFrame.from_records(
cursor, columns=schema.names, coerce_float=True
)
except Exception:
# clean up the cursor if we fail to create the DataFrame
#
# in the sqlite case failing to close the cursor results in
# artificially locked tables
cursor.close()
raise
df = MySQLPandasData.convert_table(df, schema)
return df

def _finalize_memtable(self, name: str) -> None:
"""No-op.
Executing **any** SQL in a finalizer causes the underlying connection
socket to be set to `None`. It is unclear why this happens.
"""
df = pd.DataFrame.from_records(

Check warning on line 541 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L541

Added line #L541 was not covered by tests
cursor.fetchall(), columns=schema.names, coerce_float=True
)
return MySQLPandasData.convert_table(df, schema)

Check warning on line 544 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L544

Added line #L544 was not covered by tests
Loading

0 comments on commit 4fcee76

Please sign in to comment.