Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Parquet format support for streaming #208

Merged
merged 2 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## 0.2.6 (Common, Node.js)

### New features

- Added [Parquet format](https://clickhouse.com/docs/en/integrations/data-formats/parquet) streaming support.
See the new examples:
[insert from a file](./examples/node/insert_file_stream_parquet.ts),
[select into a file](./examples/node/select_parquet_as_file.ts).

## 0.2.5 (Common, Node.js, Web)

### Bug fixes
Expand Down
3 changes: 2 additions & 1 deletion examples/node/insert_file_stream_csv.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createClient } from '@clickhouse/client'
import type { Row } from '@clickhouse/client-common'
import Fs from 'fs'
import { cwd } from 'node:process'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀

import Path from 'path'

void (async () => {
Expand All @@ -19,7 +20,7 @@ void (async () => {
})

// contains data as 1,"foo","[1,2]"\n2,"bar","[3,4]"\n...
const filename = Path.resolve(process.cwd(), './node/resources/data.csv')
const filename = Path.resolve(cwd(), './node/resources/data.csv')

await client.insert({
table: tableName,
Expand Down
3 changes: 2 additions & 1 deletion examples/node/insert_file_stream_ndjson.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Row } from '@clickhouse/client'
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'
import split from 'split2'

Expand All @@ -20,7 +21,7 @@ void (async () => {

// contains id as numbers in JSONCompactEachRow format ["0"]\n["0"]\n...
// see also: NDJSON format
const filename = Path.resolve(process.cwd(), './node/resources/data.ndjson')
const filename = Path.resolve(cwd(), './node/resources/data.ndjson')

await client.insert({
table: tableName,
Expand Down
58 changes: 58 additions & 0 deletions examples/node/insert_file_stream_parquet.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { createClient } from '@clickhouse/client'
import type { Row } from '@clickhouse/client-common'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'

void (async () => {
const client = createClient()
const tableName = 'insert_file_stream_parquet'
await client.command({
query: `DROP TABLE IF EXISTS ${tableName}`,
})
await client.command({
query: `
CREATE TABLE ${tableName}
(id UInt64, name String, sku Array(UInt8))
ENGINE MergeTree()
ORDER BY (id)
`,
})

const filename = Path.resolve(cwd(), './node/resources/data.parquet')

/*

(examples) $ pqrs cat node/resources/data.parquet

############################
File: node/resources/data.parquet
############################

{id: 0, name: [97], sku: [1, 2]}
{id: 1, name: [98], sku: [3, 4]}
{id: 2, name: [99], sku: [5, 6]}

*/

await client.insert({
table: tableName,
values: Fs.createReadStream(filename),
format: 'Parquet',
})

const rs = await client.query({
query: `SELECT * from ${tableName}`,
format: 'JSONEachRow',
})

for await (const rows of rs.stream()) {
// or just `rows.json()`
// to consume the entire response at once
rows.forEach((row: Row) => {
console.log(row.json())
})
}

await client.close()
})()
Binary file added examples/node/resources/data.parquet
Binary file not shown.
42 changes: 42 additions & 0 deletions examples/node/select_parquet_as_file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'

void (async () => {
const client = createClient()

const { stream } = await client.exec({
query: `SELECT * from system.numbers LIMIT 10 FORMAT Parquet`,
})

const filename = Path.resolve(cwd(), './node/out.parquet')
const writeStream = Fs.createWriteStream(filename)
stream.pipe(writeStream)
await new Promise((resolve) => {
stream.on('end', resolve)
})

/*

(examples) $ pqrs cat node/out.parquet

#################
File: node/out.parquet
#################

{number: 0}
{number: 1}
{number: 2}
{number: 3}
{number: 4}
{number: 5}
{number: 6}
{number: 7}
{number: 8}
{number: 9}

*/

await client.close()
})()
Binary file not shown.
1 change: 1 addition & 0 deletions packages/client-common/src/data_formatter/formatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const supportedRawFormats = [
'CustomSeparated',
'CustomSeparatedWithNames',
'CustomSeparatedWithNamesAndTypes',
'Parquet',
] as const

export type JSONDataFormat = (typeof supportedJSONFormats)[number]
Expand Down
2 changes: 1 addition & 1 deletion packages/client-common/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '0.2.5'
export default '0.2.6'
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { fakerRU } from '@faker-js/faker'
import { createSimpleTable } from '@test/fixtures/simple_table'
import { createTableWithFields } from '@test/fixtures/table_with_fields'
import { createTestClient, guid } from '@test/utils'
import { Buffer } from 'buffer'
import Fs from 'fs'
import split from 'split2'
import Stream from 'stream'
Expand All @@ -28,7 +29,7 @@ describe('[Node.js] streaming e2e', () => {
['2', 'c', [5, 6]],
]

it('should stream a file', async () => {
it('should stream an NDJSON file', async () => {
// contains id as numbers in JSONCompactEachRow format ["0"]\n["1"]\n...
const filename =
'packages/client-common/__tests__/fixtures/streaming_e2e_data.ndjson'
Expand All @@ -55,6 +56,43 @@ describe('[Node.js] streaming e2e', () => {
expect(actual).toEqual(expected)
})

it('should stream a Parquet file', async () => {
const filename =
'packages/client-common/__tests__/fixtures/streaming_e2e_data.parquet'
await client.insert({
table: tableName,
values: Fs.createReadStream(filename),
format: 'Parquet',
})

// check that the data was inserted correctly
const rs = await client.query({
query: `SELECT * from ${tableName}`,
format: 'JSONCompactEachRow',
})

const actual: unknown[] = []
for await (const rows of rs.stream()) {
rows.forEach((row: Row) => {
actual.push(row.json())
})
}
expect(actual).toEqual(expected)

// check if we can stream it back and get the output matching the input file
const stream = await client
.exec({
query: `SELECT * from ${tableName} FORMAT Parquet`,
})
.then((r) => r.stream)

const parquetChunks: Buffer[] = []
for await (const chunk of stream) {
parquetChunks.push(chunk)
}
expect(Buffer.concat(parquetChunks)).toEqual(Fs.readFileSync(filename))
})

it('should stream a stream created in-place', async () => {
await client.insert({
table: tableName,
Expand Down
2 changes: 1 addition & 1 deletion packages/client-node/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '0.2.5'
export default '0.2.6'
2 changes: 1 addition & 1 deletion packages/client-web/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '0.2.5'
export default '0.2.6'
Loading