Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: RowBinary for SELECT operations #257

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
Floats, adjust columns decoding.
  • Loading branch information
slvrtrn committed Mar 6, 2024
commit 26115d2bc2e9f126279b53cec8b020610e8b9388
2 changes: 1 addition & 1 deletion .docker/clickhouse/single_node_tls/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM clickhouse/clickhouse-server:23.11-alpine
FROM clickhouse/clickhouse-server:24.2-alpine
COPY .docker/clickhouse/single_node_tls/certificates /etc/clickhouse-server/certs
RUN chown clickhouse:clickhouse -R /etc/clickhouse-server/certs \
&& chmod 600 /etc/clickhouse-server/certs/* \
63 changes: 43 additions & 20 deletions benchmarks/leaks/row_binary.ts
Original file line number Diff line number Diff line change
@@ -33,36 +33,58 @@ LIMIT 5000000

*/

const query = `SELECT * FROM fluff ORDER BY id ASC LIMIT 1000000`
const limit = 50000
const query = `SELECT * FROM fluff ORDER BY id ASC LIMIT 5`
// const query = `SELECT * FROM large_strings ORDER BY id ASC LIMIT ${limit}`
// const query = `SELECT * EXCEPT (i128, i256, u128, u256) FROM fluff ORDER BY id ASC LIMIT ${limit}`

void (async () => {
const client = createClient({
url: 'http://localhost:8123',
})

async function benchmarkJSONEachRow() {
async function benchmarkJSON(format: 'JSONEachRow' | 'JSONCompactEachRow') {
const start = +new Date()
const rs = await client.query({
query,
format: 'JSONCompactEachRow',
format,
})
const values = []
let total = 0
await new Promise((resolve, reject) => {
rs.stream()
.on('data', (rows: Row[]) => {
rows.forEach((row) => {
values.push(row.json())
console.log(row.json())
total++
})
})
.on('end', resolve)
.on('error', reject)
})
console.log(
`JSONCompactEachRow elapsed: ${+new Date() - start} ms, total: ${
values.length
}`
)
return values.length
console.log(`${format} elapsed: ${+new Date() - start} ms, total: ${total}`)
return total
}

async function benchmarkCSV() {
const start = +new Date()
const rs = await client.query({
query,
format: 'CSV',
})
let total = 0
await new Promise((resolve, reject) => {
rs.stream()
.on('data', (rows: Row[]) => {
rows.forEach((row) => {
row.text.split(',')
total++
})
})
.on('end', resolve)
.on('error', reject)
})
console.log(`CSV elapsed: ${+new Date() - start} ms, total: ${total}`)
return total
}

async function benchmarkRowBinary() {
@@ -71,29 +93,30 @@ void (async () => {
query,
format: 'RowBinary',
})
const values: unknown[][] = []
let total = 0
await new Promise((resolve, reject) => {
;(rs as RowBinaryResultSet)
.stream()
.on('data', (rows: unknown[][]) => {
rows.forEach((row) => {
values.push(row)
total++
// if (total === limit) {
console.log(`Last row`, row)
// }
})
})
.on('end', resolve)
.on('error', reject)
})
console.log(
`RowBinary elapsed: ${+new Date() - start} ms, total: ${values.length}`
)
return values.length
console.log(`RowBinary elapsed: ${+new Date() - start} ms, total: ${total}`)
return total
}

attachExceptionHandlers()
for (let i = 0; i < 10; i++) {
await benchmarkJSONEachRow()
for (let i = 0; i < 3; i++) {
await benchmarkJSON('JSONCompactEachRow')
// await benchmarkCSV()
await benchmarkRowBinary()
}

process.exit(0)
})()
4 changes: 2 additions & 2 deletions docker-compose.cluster.yml
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ version: '2.3'

services:
clickhouse1:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.11-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-24.2-alpine}'
ulimits:
nofile:
soft: 262144
@@ -19,7 +19,7 @@ services:
- './.docker/clickhouse/users.xml:/etc/clickhouse-server/users.xml'

clickhouse2:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.11-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-24.2-alpine}'
ulimits:
nofile:
soft: 262144
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3.8'
services:
clickhouse:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.11-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-24.2-alpine}'
container_name: 'clickhouse-js-clickhouse-server'
ports:
- '8123:8123'
29 changes: 13 additions & 16 deletions packages/client-common/__tests__/unit/row_binary_decoders.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import {
removeLowCardinality,
RowBinaryTypesDecoder,
} from '../../src/data_formatter'
import { RowBinaryTypesDecoder } from '../../src/data_formatter'

fdescribe('RowBinary decoders', () => {
it('should decode Date', () => {
@@ -23,16 +20,16 @@ fdescribe('RowBinary decoders', () => {
})
})

it('should remove low cardinality', async () => {
const args: [string, string][] = [
['LowCardinality(String)', 'String'],
['LowCardinality(Nullable(String))', 'Nullable(String)'],
['LowCardinality(Array(String))', 'Array(String)'],
['Nullable(String)', 'Nullable(String)'],
['String', 'String'],
]
args.forEach(([src, expected]) => {
expect(removeLowCardinality(src)).toEqual(expected)
})
})
// it('should remove low cardinality', async () => {
// const args: [string, string][] = [
// ['LowCardinality(String)', 'String'],
// ['LowCardinality(Nullable(String))', 'Nullable(String)'],
// ['LowCardinality(Array(String))', 'Array(String)'],
// ['Nullable(String)', 'Nullable(String)'],
// ['String', 'String'],
// ]
// args.forEach(([src, expected]) => {
//     expect(removeLowCardinality(src)).toEqual(expected)
// })
// })
})
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
readBytesAsFloat32,
readBytesAsUnsignedBigInt,
readBytesAsUnsignedInt,
} from '../../src/data_formatter'
@@ -90,6 +91,27 @@ fdescribe('RowBinary read bytes', () => {
})
})

fdescribe('Floats', () => {
it('should decode Float32', async () => {
const args: [Uint8Array, number][] = [
[new Uint8Array([0x00, 0x00, 0x00, 0x00]), 0],
// some reference values from a random dataset (not 100% matching the CH output, because floats)
[new Uint8Array([151, 136, 46, 6]), 3.2826113095459874e-35],
[new Uint8Array([176, 183, 118, 153]), -1.2754997313209913e-23],
[new Uint8Array([114, 233, 40, 161]), -5.72295763540352e-19],
[new Uint8Array([112, 205, 62, 233]), -1.4416628555694005e25],
[new Uint8Array([43, 253, 113, 82]), 259833643008],
[new Uint8Array([165, 173, 250, 112]), 6.206494065007942e29],
[new Uint8Array([175, 228, 124, 108]), 1.2229169371247749e27],
]
args.forEach(([src, expected]) => {
expect(readBytesAsFloat32(src, 0))
.withContext(ctx(src, expected))
.toBe(expected)
})
})
})

/** Builds a human-readable failure message for a byte-decoding expectation. */
function ctx(src: Uint8Array, expected: number | bigint) {
  const bytes = src.toString()
  return 'Expected ' + bytes + ' to be decoded as ' + expected
}
123 changes: 123 additions & 0 deletions packages/client-common/src/data_formatter/row_binary/columns.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import type { DecodeResult } from './read_bytes'
import { readBytesAsUnsignedLEB128 } from './read_bytes'
import type {
ColumnType,
DecodedColumnType,
DecodeError,
TypeDecoder,
} from './types'
import { RowBinaryColumnTypeToDecoder, RowBinaryTypesDecoder } from './types'

// Result of decoding the columns header: per-column names, parsed types,
// and the value decoder selected for each column. The DecodeResult wrapper
// also carries the next index to scan from (see read_bytes.ts).
export type DecodedColumns = DecodeResult<{
  names: string[]
  types: DecodedColumnType[]
  decoders: TypeDecoder[]
}>

export const RowBinaryColumns = {
  /**
   * Decodes the columns header from a RowBinary stream: a LEB128-encoded
   * column count, followed by all column names, then all column types
   * (names and types are each length-prefixed strings).
   *
   * @param src raw bytes received from ClickHouse
   * @returns the decoded header plus the offset where the data rows begin,
   *          or a DecodeError when the data is truncated or a type has no decoder
   */
  decode: (src: Uint8Array): DecodedColumns | DecodeError => {
    const res = readBytesAsUnsignedLEB128(src, 0)
    if (res === null) {
      return { error: 'Not enough data to decode the number of columns' }
    }
    const numColumns = res[0]
    let nextLoc = res[1]
    const names = new Array<string>(numColumns)
    const types = new Array<DecodedColumnType>(numColumns)
    const decoders: TypeDecoder[] = new Array<TypeDecoder>(numColumns)
    // All names come first on the wire, then all types.
    for (let i = 0; i < numColumns; i++) {
      const res = RowBinaryTypesDecoder.string(src, nextLoc)
      if (res === null) {
        return { error: `Not enough data to decode column ${i} name` }
      }
      nextLoc = res[1]
      names[i] = res[0]
    }
    for (let i = 0; i < numColumns; i++) {
      const res = RowBinaryTypesDecoder.string(src, nextLoc)
      if (res === null) {
        return { error: `Not enough data to decode column ${i} type` }
      }
      nextLoc = res[1]
      const decodedColumn = decodeColumnType(res[0])
      if (!(decodedColumn.columnType in RowBinaryColumnTypeToDecoder)) {
        return {
          // interpolate the type string itself; the previous `${decodedColumn}`
          // stringified the object as "[object Object]"
          error: `No matching type decoder for client type in ${decodedColumn.dbType}`,
        }
      }
      const columnType = decodedColumn.columnType as ColumnType
      const typeDecoder = RowBinaryColumnTypeToDecoder[columnType]
      // Nullable columns get a wrapper that first reads the null flag byte.
      decoders[i] = decodedColumn.isNullable
        ? RowBinaryTypesDecoder.nullable(typeDecoder)
        : typeDecoder
      types[i] = {
        ...decodedColumn,
        columnType,
      }
    }
    return [{ names, types, decoders }, nextLoc]
  },
}

// A non-container column type (e.g. String, Int32); LowCardinality and
// Nullable wrappers are stripped into boolean flags rather than nested types.
type DecodeColumnSimpleType = {
  type: 'Simple'
  // from ClickHouse as is
  dbType: string
  // without LowCardinality and Nullable
  columnType: string
  isNullable: boolean
  isLowCardinality: boolean
}
// Array(T): the element type may itself be a simple, array, or map type.
type DecodeColumnArrayType = {
  type: 'Array'
  innerType:
    | DecodeColumnSimpleType
    | DecodeColumnArrayType
    | DecodeColumnMapType
}
// Map(K, V): the key is modeled as a simple type only; values may nest.
type DecodeColumnMapType = {
  type: 'Map'
  keyType: DecodeColumnSimpleType
  valueType:
    | DecodeColumnSimpleType
    | DecodeColumnArrayType
    | DecodeColumnMapType
}
// Discriminated union over the `type` tag; see decodeColumnType below.
// NOTE(review): only the 'Simple' variant is produced so far.
type DecodeColumnTypeResult =
  | DecodeColumnSimpleType
  | DecodeColumnArrayType
  | DecodeColumnMapType

/**
 * Parses a ClickHouse column type string, unwrapping the optional
 * LowCardinality and Nullable modifiers into boolean flags.
 *
 * Examples:
 *   'String'                           -> columnType 'String'
 *   'Nullable(Int32)'                  -> columnType 'Int32',  isNullable
 *   'LowCardinality(Nullable(String))' -> columnType 'String', both flags
 *
 * @param dbType the type string exactly as reported by ClickHouse
 * @returns the original string, the unwrapped base type, and modifier flags
 */
export function decodeColumnType(dbType: string): {
  // from ClickHouse as is
  dbType: string
  // without LowCardinality and Nullable
  columnType: string
  isNullable: boolean
  isLowCardinality: boolean
  type: 'Simple'
} {
  // TODO: container types such as Map(...) and Array(...) are not handled yet.
  let columnType = dbType
  let isNullable = false
  let isLowCardinality = false
  // Match the full wrapper prefix including '(' and derive the slice offset
  // from it, instead of hard-coded 15/9 offsets paired with a bare-word
  // startsWith check (which would mangle an unexpected type name).
  const lowCardinalityPrefix = 'LowCardinality('
  if (columnType.startsWith(lowCardinalityPrefix) && columnType.endsWith(')')) {
    columnType = columnType.slice(lowCardinalityPrefix.length, -1)
    isLowCardinality = true
  }
  // Nullable may appear on its own or inside LowCardinality (stripped above).
  const nullablePrefix = 'Nullable('
  if (columnType.startsWith(nullablePrefix) && columnType.endsWith(')')) {
    columnType = columnType.slice(nullablePrefix.length, -1)
    isNullable = true
  }
  return {
    dbType,
    columnType,
    isNullable,
    isLowCardinality,
    type: 'Simple',
  }
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './decoder'
export * from './columns'
export * from './read_bytes'
export * from './types'
13 changes: 13 additions & 0 deletions packages/client-common/src/data_formatter/row_binary/read_bytes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Decoded value + the next index to scan from
export type DecodeResult<T> = [T, number]

// May return null since we cannot determine how many bytes we need to read in advance
export function readBytesAsUnsignedLEB128(
src: Uint8Array,
loc: number
@@ -23,6 +24,8 @@ export function readBytesAsUnsignedLEB128(
}
}

// FIXME: use DecodeResult | null for all methods and do the validation here
// instead of relying on the caller
export function readBytesAsUnsignedInt(
src: Uint8Array,
loc: number,
@@ -46,3 +49,13 @@ export function readBytesAsUnsignedBigInt(
}
return result
}

/**
 * Reads 4 bytes at `loc` as a little-endian IEEE-754 32-bit float.
 *
 * Offsets are taken relative to `src.byteOffset` so the result is correct
 * even when `src` is a subarray view into a larger ArrayBuffer; the previous
 * `src.buffer.slice(loc, loc + 4)` read from the wrong region in that case
 * (and also made a needless copy of the bytes).
 */
export function readBytesAsFloat32(src: Uint8Array, loc: number) {
  return new DataView(src.buffer, src.byteOffset + loc, 4).getFloat32(0, true)
}

/**
 * Reads 8 bytes at `loc` as a little-endian IEEE-754 64-bit float.
 *
 * Offsets are taken relative to `src.byteOffset` so the result is correct
 * even when `src` is a subarray view into a larger ArrayBuffer; the previous
 * `src.buffer.slice(loc, loc + 8)` read from the wrong region in that case
 * (and also made a needless copy of the bytes).
 */
export function readBytesAsFloat64(src: Uint8Array, loc: number) {
  return new DataView(src.buffer, src.byteOffset + loc, 8).getFloat64(0, true)
}
Loading