Skip to content

Commit

Permalink
Update experimental JSON support (ClickHouse#438)
Browse files Browse the repository at this point in the history
* Update JSON type to handle new serialization version

* Fix lint

* Maintain JSON compatibility across ClickHouse 24.8 and 24.10
  • Loading branch information
genzgd authored Dec 14, 2024
1 parent acc5b1d commit dd9c9db
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 34 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/on_push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ jobs:
- '3.13'
clickhouse-version:
- '24.3'
- '24.7'
- '24.8'
- '24.9'
- '24.10'
- '24.11'
- latest

name: Local Tests Py=${{ matrix.python-version }} CH=${{ matrix.clickhouse-version }}
Expand Down
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,28 @@
Python 3.8 was EOL on 2024-10-07. It is no longer tested, and versions after 2025-04-07 will not include Python
3.8 wheel distributions.

### WARNING -- JSON Incompatibility between versions 24.8 and 24.10
The internal serialization format for experimental JSON was updated in ClickHouse version 24.10. `clickhouse-connect`
will set the compatibility level on a global basis based on the last client created, so multiple clients using the
library with mixed versions 24.8/24.9 and 24.10 and later versions will break. If you need JSON support for mixed
versions you must use different Python interpreters for each version.

### WARNING -- Impending Breaking Change - Server Settings in DSN
When creating a DBAPI Connection method using the Connection constructor or a SQLAlchemy DSN, the library currently
converts any unrecognized keyword argument/query parameter to a ClickHouse server setting. Starting in the next minor
release (0.9.0), unrecognized arguments/keywords for these methods of creating a DBAPI connection will raise an exception
instead of being passed as ClickHouse server settings. This is in conjunction with some refactoring in Client construction.
The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with `ch_`.

## 0.8.10, 2024-12-14
### Bug Fixes
- The experimental JSON type would break in some circumstances with ClickHouse server version 24.10 and later. This has
been fixed. The fix is incompatible with ClickHouse version 24.8 and 24.9 however, so see the above WARNING about
mixing JSON types
- Experimental JSON types within a Tuple was broken. This has been fixed; however, the fix fails on ClickHouse server
versions 24.8 and 24.9. If you need Tuple(JSON) support, you must use ClickHouse server version 24.10 or later.
Closes https://github.com/ClickHouse/clickhouse-connect/issues/436.

## 0.8.9, 2024-12-02
### Bug Fix
- Roll back some timezone changes that caused incorrect usage of "local time" objects for some ClickHouse queries. Note that
Expand Down
2 changes: 1 addition & 1 deletion clickhouse_connect/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '0.8.9'
version = '0.8.10'
30 changes: 22 additions & 8 deletions clickhouse_connect/datatypes/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
from clickhouse_connect.datatypes.registry import get_from_name
from clickhouse_connect.driver.common import unescape_identifier, first_value
from clickhouse_connect.driver.common import unescape_identifier, first_value, write_uint64
from clickhouse_connect.driver.ctypes import data_conv
from clickhouse_connect.driver.errors import handle_error
from clickhouse_connect.driver.exceptions import DataError
Expand All @@ -14,6 +14,8 @@
SHARED_DATA_TYPE: ClickHouseType
STRING_DATA_TYPE: ClickHouseType

json_serialization_format = 0x1

class Variant(ClickHouseType):
_slots = 'element_types'
python_type = object
Expand Down Expand Up @@ -86,9 +88,11 @@ def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContex


def read_dynamic_prefix(source: ByteSource) -> List[ClickHouseType]:
if source.read_uint64() != 1: # dynamic structure serialization version, currently only 1 is recognized
serialize_version = source.read_uint64()
if serialize_version == 1:
source.read_leb128() # max dynamic types, we ignore this value
elif serialize_version != 2:
raise DataError('Unrecognized dynamic structure version')
source.read_leb128() # max dynamic types, we ignore this value
num_variants = source.read_leb128()
variant_types = [get_from_name(source.read_leb128_str()) for _ in range(num_variants)]
variant_types.append(STRING_DATA_TYPE)
Expand Down Expand Up @@ -188,13 +192,23 @@ def __init__(self, type_def:TypeDef):

@property
def insert_name(self):
return 'String'
if json_serialization_format == 0:
return 'String'
return super().insert_name

    def write_column_prefix(self, dest: bytearray):
        # Emit the JSON column's serialization-version header. When the
        # module-level json_serialization_format is 0 (compatibility mode for
        # ClickHouse 24.8/24.9, where JSON is inserted as String) no prefix is
        # written at all; otherwise the format value itself is the header.
        if json_serialization_format > 0:
            write_uint64(json_serialization_format, dest)

    def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
        # Read and validate the serialization-version header of a JSON column.
        # Version 0 is the legacy (pre-24.10) object format; version 2 is the
        # 24.10+ format. Anything else cannot be parsed.
        serialize_version = source.read_uint64()
        if serialize_version == 0:
            # Legacy header is followed by a LEB128 "max dynamic types" value,
            # which we read and discard.
            source.read_leb128() # max dynamic types, we ignore this value
        elif serialize_version != 2:
            raise DataError(f'Unrecognized dynamic structure version: {serialize_version} column: `{ctx.column_name}`')

# pylint: disable=too-many-locals
def read_column(self, source: ByteSource, num_rows: int, ctx: QueryContext):
if source.read_uint64() != 0: # object serialization version, currently only 0 is recognized
raise DataError(f'unrecognized object serialization version, column `{ctx.column_name}`')
source.read_leb128() # the max number of dynamic paths. Used to preallocate storage in ClickHouse; we ignore it
def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
dynamic_path_cnt = source.read_leb128()
dynamic_paths = [source.read_leb128_str() for _ in range(dynamic_path_cnt)]
for typed in self.typed_types:
Expand Down
6 changes: 2 additions & 4 deletions clickhouse_connect/driver/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,8 @@ def read_array(self, array_type: str, num_rows: int) -> Iterable[Any]:
return column

@property
def last_message(self):
if len(self.buffer) == 0:
return None
return self.buffer.decode()
def last_message(self) -> bytes:
return self.buffer

def close(self):
if self.source:
Expand Down
5 changes: 4 additions & 1 deletion clickhouse_connect/driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from clickhouse_connect.common import version
from clickhouse_connect.datatypes.registry import get_from_name
from clickhouse_connect.datatypes.base import ClickHouseType
from clickhouse_connect.datatypes import dynamic as dynamic_module
from clickhouse_connect.driver import tzutil
from clickhouse_connect.driver.common import dict_copy, StreamContext, coerce_int, coerce_bool
from clickhouse_connect.driver.constants import CH_VERSION_WITH_PROTOCOL, PROTOCOL_VERSION_WITH_LOW_CARD
Expand Down Expand Up @@ -90,7 +91,7 @@ def _init_common_settings(self, apply_server_timezone:Optional[Union[str, bool]]
server_settings = self.query(f'SELECT name, value, {readonly} as readonly FROM system.settings LIMIT 10000')
self.server_settings = {row['name']: SettingDef(**row) for row in server_settings.named_results()}

if self.min_version(CH_VERSION_WITH_PROTOCOL):
if self.min_version(CH_VERSION_WITH_PROTOCOL) and common.get_setting('use_protocol_version'):
# Unfortunately we have to validate that the client protocol version is actually used by ClickHouse
# since the query parameter could be stripped off (in particular, by CHProxy)
test_data = self.raw_query('SELECT 1 AS check', fmt='Native', settings={
Expand All @@ -103,6 +104,8 @@ def _init_common_settings(self, apply_server_timezone:Optional[Union[str, bool]]
if self._setting_status('allow_experimental_json_type').is_set and \
self._setting_status('cast_string_to_dynamic_user_inference').is_writable:
self.set_client_setting('cast_string_to_dynamic_use_inference', '1')
if self.min_version('24.8') and not self.min_version('24.10'):
dynamic_module.json_serialization_format = 0


def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, str]:
Expand Down
21 changes: 14 additions & 7 deletions clickhouse_connect/driver/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,7 @@ def get_block():
# We ran out of data before it was expected, this could be ClickHouse reporting an error
# in the response
if source.last_message:
message = source.last_message
if len(message) > 1024:
message = message[-1024:]
error_start = message.find('Code: ')
if error_start != -1:
message = message[error_start:]
raise StreamFailureError(message) from None
raise StreamFailureError(extract_error_message(source.last_message)) from None
raise
block_num += 1
return result_block
Expand Down Expand Up @@ -119,3 +113,16 @@ def chunk_gen():
yield footer

return chunk_gen()


def extract_error_message(message: bytes) -> str:
    """Extract a readable error string from raw bytes left in a response stream.

    Keeps at most the trailing 1024 bytes (ClickHouse appends errors to the
    end of the stream), trims everything before a 'Code: ' error marker when
    one is present, and falls back to a truncated hex dump when the bytes are
    not valid UTF-8.
    """
    if len(message) > 1024:
        message = message[-1024:]
    error_start = message.find(b'Code: ')
    if error_start != -1:
        message = message[error_start:]
    try:
        message_str = message.decode()
    except UnicodeError:
        # Fix: truncate the hex preview to its first 128 characters ([:128]).
        # The original [128:] dropped the leading 128 hex chars instead,
        # yielding an empty preview for any message of 64 bytes or fewer.
        message_str = f'unrecognized data found in stream: `{message.hex()[:128]}`'
    return message_str
2 changes: 1 addition & 1 deletion clickhouse_connect/driver/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def close(self):


class ByteSource(Closable):
last_message = None
last_message:bytes = None

@abstractmethod
def read_leb128(self) -> int:
Expand Down
6 changes: 3 additions & 3 deletions clickhouse_connect/driverc/buffer.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ cdef class ResponseBuffer:

@cython.boundscheck(False)
@cython.wraparound(False)
def read_byte(self) -> int:
def read_byte(self) -> int:
if self.buf_loc < self.buf_sz:
b = self.buffer[self.buf_loc]
self.buf_loc += 1
Expand Down Expand Up @@ -299,8 +299,8 @@ cdef class ResponseBuffer:
@property
def last_message(self):
if self.buffer == NULL:
return None
return self.buffer[self.buf_sz:].decode()
return self.slice[0:]
return self.buffer[self.buf_sz:]

def __dealloc__(self):
self.close()
Expand Down
32 changes: 24 additions & 8 deletions tests/integration_tests/test_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
from clickhouse_connect.driver import Client


def type_available(test_client: Client, data_type: str):
    """Skip the current test when the experimental *data_type* is not enabled on the server."""
    setting_name = f'allow_experimental_{data_type}_type'
    if test_client.get_client_setting(setting_name):
        return
    pytest.skip(f'New {data_type.upper()} type not available in this version: {test_client.server_version}')


def test_variant(test_client: Client, table_context: Callable):
if not test_client.get_client_setting('allow_experimental_variant_type'):
pytest.skip(f'New Variant type not available in this version: {test_client.server_version}')
type_available(test_client, 'variant')
with table_context('basic_variants', [
'key Int32',
'v1 Variant(UInt64, String, Array(UInt64), UUID)',
Expand All @@ -30,8 +34,7 @@ def test_variant(test_client: Client, table_context: Callable):


def test_dynamic(test_client: Client, table_context: Callable):
if not test_client.get_client_setting('allow_experimental_dynamic_type'):
pytest.skip(f'New Dynamic type not available in this version: {test_client.server_version}')
type_available(test_client, 'dynamic')
with table_context('basic_dynamic', [
'key UInt64',
'v1 Dynamic',
Expand All @@ -49,8 +52,7 @@ def test_dynamic(test_client: Client, table_context: Callable):


def test_basic_json(test_client: Client, table_context: Callable):
if not test_client.get_client_setting('allow_experimental_json_type'):
pytest.skip(f'New JSON type not available in this version: {test_client.server_version}')
type_available(test_client, 'json')
with table_context('new_json_basic', [
'key Int32',
'value JSON',
Expand Down Expand Up @@ -89,8 +91,7 @@ def test_basic_json(test_client: Client, table_context: Callable):


def test_typed_json(test_client: Client, table_context: Callable):
if not test_client.get_client_setting('allow_experimental_json_type'):
pytest.skip(f'New JSON type not available in this version: {test_client.server_version}')
type_available(test_client, 'json')
with table_context('new_json_typed', [
'key Int32',
'value JSON(max_dynamic_paths=150, `a.b` DateTime64(3), SKIP a.c)'
Expand All @@ -100,3 +101,18 @@ def test_typed_json(test_client: Client, table_context: Callable):
result = test_client.query('SELECT * FROM new_json_typed ORDER BY key')
json1 = result.result_set[0][1]
assert json1['a']['b'] == datetime.datetime(2020, 10, 15, 10, 15, 44, 877000)


def test_complex_json(test_client: Client, table_context: Callable):
    """Verify that a JSON column nested inside a Tuple round-trips through insert/query."""
    type_available(test_client, 'json')
    if not test_client.min_version('24.10'):
        pytest.skip('Complex JSON broken before 24.10')
    columns = [
        'key Int32',
        'value Tuple(t JSON)'
    ]
    with table_context('new_json_complex', columns):
        row = [100, ({'a': 'qwe123', 'b': 'main', 'c': None},)]
        test_client.insert('new_json_complex', [row])
        result = test_client.query('SELECT * FROM new_json_complex ORDER BY key')
        tuple_value = result.result_set[0][1]
        assert tuple_value['t']['a'] == 'qwe123'

0 comments on commit dd9c9db

Please sign in to comment.