From 83e6c80264da434688beefb2a70d7682ca898a1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 27 Nov 2024 11:05:10 +0000 Subject: [PATCH] [DOP-21732] Fix Oracle reading with partitioning_mode=hash --- docs/changelog/next_release/319.bugfix.rst | 1 + onetl/connection/db_connection/clickhouse/dialect.py | 2 +- onetl/connection/db_connection/mssql/dialect.py | 2 +- onetl/connection/db_connection/oracle/dialect.py | 4 +++- 4 files changed, 6 insertions(+), 3 deletions(-) create mode 100644 docs/changelog/next_release/319.bugfix.rst diff --git a/docs/changelog/next_release/319.bugfix.rst b/docs/changelog/next_release/319.bugfix.rst new file mode 100644 index 000000000..82a6eebc4 --- /dev/null +++ b/docs/changelog/next_release/319.bugfix.rst @@ -0,0 +1 @@ +Fix ``DBReader(conn=oracle, options={"partitioning_mode": "hash"})`` lead to data skew in last partition due to wrong ``ora_hash`` usage. diff --git a/onetl/connection/db_connection/clickhouse/dialect.py b/onetl/connection/db_connection/clickhouse/dialect.py index 394843b80..1ee213d0e 100644 --- a/onetl/connection/db_connection/clickhouse/dialect.py +++ b/onetl/connection/db_connection/clickhouse/dialect.py @@ -10,7 +10,7 @@ class ClickhouseDialect(JDBCDialect): def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str: - return f"modulo(halfMD5({partition_column}), {num_partitions})" + return f"halfMD5({partition_column}) % {num_partitions}" def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str: return f"{partition_column} % {num_partitions}" diff --git a/onetl/connection/db_connection/mssql/dialect.py b/onetl/connection/db_connection/mssql/dialect.py index 6be43c802..3cb809ad2 100644 --- a/onetl/connection/db_connection/mssql/dialect.py +++ b/onetl/connection/db_connection/mssql/dialect.py @@ -10,7 +10,7 @@ class MSSQLDialect(JDBCDialect): # https://docs.microsoft.com/ru-ru/sql/t-sql/functions/hashbytes-transact-sql?view=sql-server-ver16 def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str: - return f"CONVERT(BIGINT, HASHBYTES ( 'SHA' , {partition_column} )) % {num_partitions}" + return f"CONVERT(BIGINT, HASHBYTES ('SHA', {partition_column})) % {num_partitions}" def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str: return f"{partition_column} % {num_partitions}" diff --git a/onetl/connection/db_connection/oracle/dialect.py b/onetl/connection/db_connection/oracle/dialect.py index 2f1218716..c7a739039 100644 --- a/onetl/connection/db_connection/oracle/dialect.py +++ b/onetl/connection/db_connection/oracle/dialect.py @@ -43,7 +43,9 @@ def get_sql_query( ) def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str: - return f"ora_hash({partition_column}, {num_partitions})" + # ora_hash returns values from 0 to N including N. + # Balancing N+1 splits to N partitions leads to data skew in last partition. + return f"ora_hash({partition_column}, {num_partitions - 1})" def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str: return f"MOD({partition_column}, {num_partitions})"