Skip to content

Commit

Permalink
chore(mysql): port to MySQLdb instead of pymysql
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Sep 14, 2024
1 parent d0856f3 commit bb1eecb
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 141 deletions.
2 changes: 1 addition & 1 deletion .github/renovate.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"addLabels": ["druid"]
},
{
"matchPackagePatterns": ["pymysql", "mariadb"],
"matchPackagePatterns": ["mysqlclient", "mariadb"],
"addLabels": ["mysql"]
},
{
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/ibis-backends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ jobs:
- polars
sys-deps:
- libgeos-dev
- default-libmysqlclient-dev
- name: postgres
title: PostgreSQL
extras:
Expand Down Expand Up @@ -270,6 +271,7 @@ jobs:
- mysql
sys-deps:
- libgeos-dev
- default-libmysqlclient-dev
- os: windows-latest
backend:
name: clickhouse
Expand Down
2 changes: 1 addition & 1 deletion conda/environment-arm64-flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies:
- pyarrow-hotfix >=0.4
- pydata-google-auth
- pydruid >=0.6.5
- pymysql >=1
- mysqlclient >=2.2.4
- pyspark >=3
- python-dateutil >=2.8.2
- python-duckdb >=0.8.1
Expand Down
2 changes: 1 addition & 1 deletion conda/environment-arm64.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies:
- pyarrow-hotfix >=0.4
- pydata-google-auth
- pydruid >=0.6.5
- pymysql >=1
- mysqlclient >=2.2.4
- pyodbc >=4.0.39
- pyspark >=3
- python-dateutil >=2.8.2
Expand Down
2 changes: 1 addition & 1 deletion conda/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies:
- pyarrow-hotfix >=0.4
- pydata-google-auth
- pydruid >=0.6.5
- pymysql >=1
- mysqlclient >=2.2.4
- pyodbc >=4.0.39
- pyspark >=3
- python >=3.10
Expand Down
157 changes: 75 additions & 82 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
from __future__ import annotations

import contextlib
import re
import warnings
from functools import cached_property
from operator import itemgetter
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote_plus

import pymysql
import MySQLdb
import sqlglot as sg
import sqlglot.expressions as sge
from pymysql.constants import ER
from MySQLdb.constants import ER

import ibis
import ibis.backends.sql.compilers as sc
Expand All @@ -23,7 +22,6 @@
import ibis.expr.types as ir
from ibis import util
from ibis.backends import CanCreateDatabase
from ibis.backends.mysql.datatypes import _type_from_cursor_info
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers.base import STAR, TRUE, C

Expand Down Expand Up @@ -88,16 +86,14 @@ def _from_url(self, url: ParseResult, **kwargs):

@cached_property
def version(self):
matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.server_version)
return ".".join(matched.groups())
return ".".join(map(str, self.con._server_version))

def do_connect(
self,
host: str = "localhost",
user: str | None = None,
password: str | None = None,
port: int = 3306,
database: str | None = None,
autocommit: bool = True,
**kwargs,
) -> None:
Expand All @@ -113,12 +109,10 @@ def do_connect(
Password
port
Port
database
Database to connect to
autocommit
Autocommit mode
kwargs
Additional keyword arguments passed to `pymysql.connect`
Additional keyword arguments passed to `MySQLdb.connect`
Examples
--------
Expand Down Expand Up @@ -148,22 +142,20 @@ def do_connect(
year int32
month int32
"""
self.con = pymysql.connect(
self.con = MySQLdb.connect(
user=user,
host=host,
host="127.0.0.1" if host == "localhost" else host,
port=port,
password=password,
database=database,
autocommit=autocommit,
conv=pymysql.converters.conversions,
**kwargs,
)

self._post_connect()

@util.experimental
@classmethod
def from_connection(cls, con: pymysql.Connection) -> Backend:
def from_connection(cls, con) -> Backend:
"""Create an Ibis client from an existing connection to a MySQL database.
Parameters
Expand All @@ -178,7 +170,7 @@ def from_connection(cls, con: pymysql.Connection) -> Backend:
return new_backend

def _post_connect(self) -> None:
with contextlib.closing(self.con.cursor()) as cur:
with self.con.cursor() as cur:
try:
cur.execute("SET @@session.time_zone = 'UTC'")
except Exception as e: # noqa: BLE001
Expand All @@ -197,24 +189,34 @@ def list_databases(self, like: str | None = None) -> list[str]:
return self._filter_with_like(databases, like)

def _get_schema_using_query(self, query: str) -> sch.Schema:
with self.begin() as cur:
cur.execute(
sg.select(STAR)
.from_(
sg.parse_one(query, dialect=self.dialect).subquery(
sg.to_identifier("tmp", quoted=self.compiler.quoted)
)
from ibis.backends.mysql.datatypes import _type_from_cursor_info

sql = (
sg.select(STAR)
.from_(
sg.parse_one(query, dialect=self.dialect).subquery(
sg.to_identifier("tmp", quoted=self.compiler.quoted)
)
.limit(0)
.sql(self.dialect)
)

return sch.Schema(
{
field.name: _type_from_cursor_info(descr, field)
for descr, field in zip(cur.description, cur._result.fields)
}
.limit(0)
.sql(self.dialect)
)
with self.begin() as cur:
cur.execute(sql)
descr, flags = cur.description, cur.description_flags

items = {}
for (name, type_code, _, _, field_length, scale, _), raw_flags in zip(
descr, flags
):
item = _type_from_cursor_info(
flags=raw_flags,
type_code=type_code,
field_length=field_length,
scale=scale,
)
items[name] = item
return sch.Schema(items)

def get_schema(
self, name: str, *, catalog: str | None = None, database: str | None = None
Expand Down Expand Up @@ -253,38 +255,52 @@ def drop_database(self, name: str, force: bool = False) -> None:
def begin(self):
con = self.con
cur = con.cursor()
autocommit = con.get_autocommit()

if not autocommit:
con.begin()

Check warning on line 261 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L261

Added line #L261 was not covered by tests

try:
yield cur
except Exception:
con.rollback()
if not autocommit:
con.rollback()

Check warning on line 267 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L267

Added line #L267 was not covered by tests
raise
else:
con.commit()
if not autocommit:
con.commit()

Check warning on line 271 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L271

Added line #L271 was not covered by tests
finally:
cur.close()

# TODO(kszucs): should make it an abstract method or remove the use of it
# from .execute()
@contextlib.contextmanager
def _safe_raw_sql(self, *args, **kwargs):
with contextlib.closing(self.raw_sql(*args, **kwargs)) as result:
with self.raw_sql(*args, **kwargs) as result:
yield result

def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any:
with contextlib.suppress(AttributeError):
query = query.sql(dialect=self.name)

con = self.con
autocommit = con.get_autocommit()

cursor = con.cursor()

if not autocommit:
con.begin()

Check warning on line 292 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L292

Added line #L292 was not covered by tests

try:
cursor.execute(query, **kwargs)
except Exception:
con.rollback()
if not autocommit:
con.rollback()

Check warning on line 298 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L298

Added line #L298 was not covered by tests
cursor.close()
raise
else:
con.commit()
if not autocommit:
con.commit()

Check warning on line 303 in ibis/backends/mysql/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/mysql/__init__.py#L303

Added line #L303 was not covered by tests
return cursor

# TODO: disable positional arguments
Expand Down Expand Up @@ -401,11 +417,9 @@ def create_table(
if temp:
properties.append(sge.TemporaryProperty())

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand All @@ -423,39 +437,33 @@ def create_table(
if not schema:
schema = table.schema()

table_expr = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
)
quoted = self.compiler.quoted
dialect = self.dialect

table_expr = sg.table(temp_name, catalog=database, quoted=quoted)
target = sge.Schema(this=table_expr, expressions=schema.to_sqlglot(dialect))

create_stmt = sge.Create(
kind="TABLE",
this=target,
properties=sge.Properties(expressions=properties),
kind="TABLE", this=target, properties=sge.Properties(expressions=properties)
)

this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
this = sg.table(name, catalog=database, quoted=quoted)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=table_expr, expression=query).sql(
self.name
)
cur.execute(insert_stmt)
cur.execute(sge.Insert(this=table_expr, expression=query).sql(dialect))

if overwrite:
cur.execute(sge.Drop(kind="TABLE", this=this, exists=True).sql(dialect))
cur.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
cur.execute(
f"ALTER TABLE IF EXISTS {table_expr.sql(self.name)} RENAME TO {this.sql(self.name)}"
sge.Alter(
kind="TABLE",
this=table_expr,
exists=True,
actions=[sge.RenameTable(this=this)],
).sql(dialect)
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)

return self.table(name, database=database)

# preserve the input schema if it was provided
Expand All @@ -472,7 +480,7 @@ def _in_memory_table_exists(self, name: str) -> bool:
with self.begin() as cur:
cur.execute(sql)
cur.fetchall()
except pymysql.err.ProgrammingError as e:
except MySQLdb.ProgrammingError as e:
err_code, _ = e.args
if err_code == ER.NO_SUCH_TABLE:
return False
Expand All @@ -490,16 +498,17 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:

name = op.name
quoted = self.compiler.quoted
dialect = self.dialect

create_stmt = sg.exp.Create(
kind="TABLE",
this=sg.exp.Schema(
this=sg.to_identifier(name, quoted=quoted),
expressions=schema.to_sqlglot(self.dialect),
expressions=schema.to_sqlglot(dialect),
),
properties=sg.exp.Properties(expressions=[sge.TemporaryProperty()]),
)
create_stmt_sql = create_stmt.sql(self.name)
create_stmt_sql = create_stmt.sql(dialect)

df = op.data.to_frame()
# nan can not be used with MySQL
Expand Down Expand Up @@ -544,23 +553,7 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:

from ibis.backends.mysql.converter import MySQLPandasData

try:
df = pd.DataFrame.from_records(
cursor, columns=schema.names, coerce_float=True
)
except Exception:
# clean up the cursor if we fail to create the DataFrame
#
# in the sqlite case failing to close the cursor results in
# artificially locked tables
cursor.close()
raise
df = MySQLPandasData.convert_table(df, schema)
return df

def _finalize_memtable(self, name: str) -> None:
"""No-op.
Executing **any** SQL in a finalizer causes the underlying connection
socket to be set to `None`. It is unclear why this happens.
"""
df = pd.DataFrame.from_records(
cursor.fetchall(), columns=schema.names, coerce_float=True
)
return MySQLPandasData.convert_table(df, schema)
Loading

0 comments on commit bb1eecb

Please sign in to comment.