diff --git a/CHANGELOG.md b/CHANGELOG.md index 70383eeb..7c831d63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,18 @@ +## 0.2.9 (Common, Node.js, Web) + +### New features + +- It is now possible to set additional HTTP headers for outgoing ClickHouse requests. This might be useful if, for example, you use a reverse proxy with authorization. ([@teawithfruit](https://github.com/teawithfruit), [#224](https://github.com/ClickHouse/clickhouse-js/pull/224)) + +```ts +const client = createClient({ + additional_headers: { + 'X-ClickHouse-User': 'clickhouse_user', + 'X-ClickHouse-Key': 'clickhouse_password', + }, +}) +``` + ## 0.2.8 (Common, Node.js, Web) ### New features diff --git a/packages/client-common/src/client.ts b/packages/client-common/src/client.ts index ab0e5e45..ecfa3f90 100644 --- a/packages/client-common/src/client.ts +++ b/packages/client-common/src/client.ts @@ -92,7 +92,7 @@ export interface ClickHouseClientConfigOptions { level?: ClickHouseLogLevel } session_id?: string - additional_headers?: Record + additional_headers?: Record } export type BaseClickHouseClientConfigOptions = Omit< diff --git a/packages/client-common/src/connection.ts b/packages/client-common/src/connection.ts index d99df511..c52032d2 100644 --- a/packages/client-common/src/connection.ts +++ b/packages/client-common/src/connection.ts @@ -16,7 +16,7 @@ export interface ConnectionParams { clickhouse_settings: ClickHouseSettings logWriter: LogWriter application_id?: string - additional_headers?: Record + additional_headers?: Record } export interface ConnBaseQueryParams { diff --git a/packages/client-common/src/version.ts b/packages/client-common/src/version.ts index 746c2b11..e8618bef 100644 --- a/packages/client-common/src/version.ts +++ b/packages/client-common/src/version.ts @@ -1 +1 @@ -export default '0.2.8' +export default '0.2.9' diff --git a/packages/client-node/__tests__/integration/node_client.test.ts b/packages/client-node/__tests__/integration/node_client.test.ts new file mode 100644 index 00000000..4b0857e1 --- /dev/null +++ b/packages/client-node/__tests__/integration/node_client.test.ts @@ -0,0 +1,55 @@ +import Http from 'http' +import type Stream from 'stream' +import type { ClickHouseClient } from '../../src' +import { createClient } from '../../src' +import { emitResponseBody, stubClientRequest } from '../utils/http_stubs' + +describe('[Node.js] Client', () => { + let httpRequestStub: jasmine.Spy + let clientRequest: Http.ClientRequest + beforeEach(() => { + clientRequest = stubClientRequest() + httpRequestStub = spyOn(Http, 'request').and.returnValue(clientRequest) + }) + + describe('Additional headers', () => { + it('should be possible to set additional_headers', async () => { + const client = createClient({ + additional_headers: { + 'Test-Header': 'foobar', + }, + }) + await query(client) + + expect(httpRequestStub).toHaveBeenCalledTimes(1) + const calledWith = httpRequestStub.calls.mostRecent().args[1] + expect(calledWith.headers).toEqual({ + Authorization: 'Basic ZGVmYXVsdDo=', // default user with empty password + 'Accept-Encoding': 'gzip', + 'Test-Header': 'foobar', + 'User-Agent': jasmine.stringContaining('clickhouse-js'), + }) + }) + + it('should work without additional headers', async () => { + const client = createClient({}) + await query(client) + + expect(httpRequestStub).toHaveBeenCalledTimes(1) + const calledWith = httpRequestStub.calls.mostRecent().args[1] + expect(calledWith.headers).toEqual({ + Authorization: 'Basic ZGVmYXVsdDo=', // default user with empty password + 'Accept-Encoding': 'gzip', + 'User-Agent': jasmine.stringContaining('clickhouse-js'), + }) + }) + }) + + async function query(client: ClickHouseClient) { + const selectPromise = client.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + emitResponseBody(clientRequest, 'hi') + await selectPromise + } +}) diff --git a/packages/client-node/__tests__/integration/node_connection.test.ts b/packages/client-node/__tests__/integration/node_connection.test.ts deleted file mode 100644 index 385f568e..00000000 --- a/packages/client-node/__tests__/integration/node_connection.test.ts +++ /dev/null @@ -1,102 +0,0 @@ -import type { ConnectionParams } from '@clickhouse/client-common' -import { LogWriter } from '@clickhouse/client-common' -import { TestLogger } from '@test/utils' -import { randomUUID } from '@test/utils/guid' -import type { ClientRequest } from 'http' -import Http from 'http' -import Stream from 'stream' -import { NodeHttpConnection } from '../../src/connection' - -describe('[Node.js] Connection', () => { - it('should be possible to set additional_headers', async () => { - const request = stubClientRequest() - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) - const adapter = buildHttpAdapter({ - additional_headers: { - 'Test-Header': 'default', - }, - }) - - const selectPromise = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - - const responseBody = 'foobar' - request.emit( - 'response', - buildIncomingMessage({ - body: responseBody, - }) - ) - - await selectPromise - - expect(httpRequestStub).toHaveBeenCalledTimes(1) - const calledWith = httpRequestStub.calls.mostRecent().args[1] - expect(calledWith.headers?.['Test-Header']).toBe('default') - }) - - function buildIncomingMessage({ - body = '', - statusCode = 200, - headers = {}, - }: { - body?: string | Buffer - statusCode?: number - headers?: Http.IncomingHttpHeaders - }): Http.IncomingMessage { - const response = new Stream.Readable({ - read() { - this.push(body) - this.push(null) - }, - }) as Http.IncomingMessage - - response.statusCode = statusCode - response.headers = { - 'x-clickhouse-query-id': randomUUID(), - ...headers, - } - return response - } - - function stubClientRequest() { - const request = new Stream.Writable({ - write() { - /** stub */ - }, - }) as ClientRequest - request.getHeaders = () => ({}) - return request - } - - function buildHttpAdapter(config: Partial) { - return new NodeHttpConnection({ - ...{ - url: new URL('http://localhost:8123'), - - connect_timeout: 10_000, - request_timeout: 30_000, - compression: { - decompress_response: true, - compress_request: false, - }, - max_open_connections: Infinity, - - username: '', - password: '', - database: '', - clickhouse_settings: {}, - additional_headers: {}, - - logWriter: new LogWriter(new TestLogger()), - keep_alive: { - enabled: true, - socket_ttl: 2500, - retry_on_expired_socket: false, - }, - }, - ...config, - }) - } -}) diff --git a/packages/client-node/__tests__/unit/node_connection.test.ts b/packages/client-node/__tests__/unit/node_connection.test.ts index 8d4317c6..b156e708 100644 --- a/packages/client-node/__tests__/unit/node_connection.test.ts +++ b/packages/client-node/__tests__/unit/node_connection.test.ts @@ -1,41 +1,528 @@ -import { createConnection } from '../../src' +import type { + ConnectionParams, + ConnQueryResult, +} from '@clickhouse/client-common' +import { LogWriter } from '@clickhouse/client-common' +import { guid, sleep, TestLogger, validateUUID } from '@test/utils' +import type { ClientRequest } from 'http' +import Http from 'http' +import Stream from 'stream' +import Zlib from 'zlib' +import type { NodeConnectionParams } from '../../src/connection' +import { NodeBaseConnection, NodeHttpConnection } from '../../src/connection' +import { getAsText } from '../../src/utils' import { - type NodeConnectionParams, - NodeHttpConnection, - NodeHttpsConnection, -} from '../../src/connection' - -describe('[Node.js] connection', () => { - const baseParams = { - keep_alive: { - enabled: true, - retry_on_expired_socket: false, - socket_ttl: 2500, - }, - } as NodeConnectionParams - - it('should create HTTP adapter', async () => { - expect(adapter).toBeInstanceOf(NodeHttpConnection) - }) - const adapter = createConnection({ - ...baseParams, - url: new URL('http://localhost'), + buildIncomingMessage, + emitCompressedBody, + emitResponseBody, + stubClientRequest, +} from '../utils/http_stubs' + +describe('[Node.js] Connection', () => { + describe('compression', () => { + describe('response decompression', () => { + it('hints ClickHouse server to send a gzip compressed response if compress_request: true', async () => { + const request = stubClientRequest() + const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + + const adapter = buildHttpAdapter({ + compression: { + decompress_response: true, + compress_request: false, + }, + }) + + const selectPromise = adapter.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + + const responseBody = 'foobar' + await emitCompressedBody(request, responseBody) + + await selectPromise + + expect(httpRequestStub).toHaveBeenCalledTimes(1) + const calledWith = httpRequestStub.calls.mostRecent().args[1] + expect(calledWith.headers!['Accept-Encoding']).toBe('gzip') + }) + + it('does not send a compression algorithm hint if compress_request: false', async () => { + const request = stubClientRequest() + const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + const adapter = buildHttpAdapter({ + compression: { + decompress_response: false, + compress_request: false, + }, + }) + + const selectPromise = adapter.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + + const responseBody = 'foobar' + emitResponseBody(request, responseBody) + + const queryResult = await selectPromise + await assertQueryResult(queryResult, responseBody) + + expect(httpRequestStub).toHaveBeenCalledTimes(1) + const calledWith = httpRequestStub.calls.mostRecent().args[1] + expect(calledWith.headers!['Accept-Encoding']).toBeUndefined() + }) + + it('uses request-specific settings over config settings', async () => { + const request = stubClientRequest() + const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + const adapter = buildHttpAdapter({ + compression: { + decompress_response: false, + compress_request: false, + }, + }) + + const selectPromise = adapter.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + clickhouse_settings: { + enable_http_compression: 1, + }, + }) + + const responseBody = 'foobar' + await emitCompressedBody(request, responseBody) + + const queryResult = await selectPromise + await assertQueryResult(queryResult, responseBody) + + expect(httpRequestStub).toHaveBeenCalledTimes(1) + const calledWith = httpRequestStub.calls.mostRecent().args[1] + expect(calledWith.headers!['Accept-Encoding']).toBe('gzip') + }) + + it('decompresses a gzip response', async () => { + const request = stubClientRequest() + spyOn(Http, 'request').and.returnValue(request) + const adapter = buildHttpAdapter({ + compression: { + decompress_response: true, + compress_request: false, + }, + }) + + const selectPromise = adapter.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + + const responseBody = 'abc'.repeat(1_000) + await emitCompressedBody(request, responseBody) + + const queryResult = await selectPromise + await assertQueryResult(queryResult, responseBody) + }) + + it('throws on an unexpected encoding', async () => { + const request = stubClientRequest() + spyOn(Http, 'request').and.returnValue(request) + const adapter = buildHttpAdapter({ + compression: { + decompress_response: true, + compress_request: false, + }, + }) + + const selectPromise = adapter.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + + await emitCompressedBody(request, 'abc', 'br') + + await expectAsync(selectPromise).toBeRejectedWith( + jasmine.objectContaining({ + message: 'Unexpected encoding: br', + }) + ) + }) + + it('provides decompression error to a stream consumer', async () => { + const request = stubClientRequest() + spyOn(Http, 'request').and.returnValue(request) + const adapter = buildHttpAdapter({ + compression: { + decompress_response: true, + compress_request: false, + }, + }) + + const selectPromise = adapter.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + + // No GZIP encoding for the body here + request.emit( + 'response', + buildIncomingMessage({ + body: 'abc', + headers: { + 'content-encoding': 'gzip', + }, + }) + ) + + const readStream = async () => { + const { stream } = await selectPromise + for await (const chunk of stream) { + void chunk // stub + } + } + + await expectAsync(readStream()).toBeRejectedWith( + jasmine.objectContaining({ + message: 'incorrect header check', + code: 'Z_DATA_ERROR', + }) + ) + }) + }) + + describe('request compression', () => { + it('sends a compressed request if compress_request: true', async () => { + const adapter = buildHttpAdapter({ + compression: { + decompress_response: false, + compress_request: true, + }, + }) + + const values = 'abc'.repeat(1_000) + + let chunks = Buffer.alloc(0) + let finalResult: Buffer | undefined = undefined + const request = new Stream.Writable({ + write(chunk, encoding, next) { + chunks = Buffer.concat([chunks, chunk]) + next() + }, + final() { + Zlib.unzip(chunks, (err, result) => { + finalResult = result + }) + }, + }) as ClientRequest + + const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + + void adapter.insert({ + query: 'INSERT INTO insert_compression_table', + values, + }) + + // trigger stream pipeline + request.emit('socket', { + setTimeout: () => { + // + }, + }) + + await sleep(100) + expect(finalResult!.toString('utf8')).toEqual(values) + expect(httpRequestStub).toHaveBeenCalledTimes(1) + const calledWith = httpRequestStub.calls.mostRecent().args[1] + expect(calledWith.headers!['Content-Encoding']).toBe('gzip') + }) + }) }) - it('should create HTTPS adapter', async () => { - const adapter = createConnection({ - ...baseParams, - url: new URL('https://localhost'), + describe('User-Agent', () => { + it('should have proper user agent without app id', async () => { + const myHttpAdapter = new MyTestHttpAdapter() + const headers = myHttpAdapter.getDefaultHeaders() + expect(headers['User-Agent']).toMatch( + /^clickhouse-js\/[0-9\\.]+-?(?:(alpha|beta)\d*)? \(lv:nodejs\/v[0-9\\.]+?; os:(?:linux|darwin|win32)\)$/ + ) + }) + + it('should have proper user agent with app id', async () => { + const myHttpAdapter = new MyTestHttpAdapter('MyFancyApp') + const headers = myHttpAdapter.getDefaultHeaders() + expect(headers['User-Agent']).toMatch( + /^MyFancyApp clickhouse-js\/[0-9\\.]+-?(?:(alpha|beta)\d*)? \(lv:nodejs\/v[0-9\\.]+?; os:(?:linux|darwin|win32)\)$/ + ) }) - expect(adapter).toBeInstanceOf(NodeHttpsConnection) }) - it('should throw if the supplied protocol is unknown', async () => { - expect(() => - createConnection({ - ...baseParams, - url: new URL('tcp://localhost'), + it('should have proper auth header', async () => { + const myHttpAdapter = new MyTestHttpAdapter() + const headers = myHttpAdapter.getDefaultHeaders() + expect(headers['Authorization']).toMatch(/^Basic [A-Za-z0-9/+=]+$/) + }) + + describe('query_id', () => { + it('should generate random query_id for each query', async () => { + const adapter = buildHttpAdapter({ + compression: { + decompress_response: false, + compress_request: false, + }, + }) + + const httpRequestStub = spyOn(Http, 'request') + + const request1 = stubClientRequest() + httpRequestStub.and.returnValue(request1) + + const selectPromise1 = adapter.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + const responseBody1 = 'foobar' + emitResponseBody(request1, responseBody1) + const queryResult1 = await selectPromise1 + + const request2 = stubClientRequest() + httpRequestStub.and.returnValue(request2) + + const selectPromise2 = adapter.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + const responseBody2 = 'qaz' + emitResponseBody(request2, responseBody2) + const queryResult2 = await selectPromise2 + + await assertQueryResult(queryResult1, responseBody1) + await assertQueryResult(queryResult2, responseBody2) + expect(queryResult1.query_id).not.toEqual(queryResult2.query_id) + + const url1 = httpRequestStub.calls.all()[0].args[0] + expect(url1.search).toContain(`&query_id=${queryResult1.query_id}`) + + const url2 = httpRequestStub.calls.all()[1].args[0] + expect(url2.search).toContain(`&query_id=${queryResult2.query_id}`) + }) + + it('should use provided query_id for query', async () => { + const adapter = buildHttpAdapter({ + compression: { + decompress_response: false, + compress_request: false, + }, + }) + + const request = stubClientRequest() + const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + + const query_id = guid() + const selectPromise = adapter.query({ + query: 'SELECT * FROM system.numbers LIMIT 5', + query_id, + }) + const responseBody = 'foobar' + emitResponseBody(request, responseBody) + const { stream } = await selectPromise + expect(await getAsText(stream)).toBe(responseBody) + + expect(httpRequestStub).toHaveBeenCalledTimes(1) + const [url] = httpRequestStub.calls.mostRecent().args + expect(url.search).toContain(`&query_id=${query_id}`) + }) + + it('should generate random query_id for every exec request', async () => { + const adapter = buildHttpAdapter({ + compression: { + decompress_response: false, + compress_request: false, + }, + }) + + const httpRequestStub = spyOn(Http, 'request') + + const request1 = stubClientRequest() + httpRequestStub.and.returnValue(request1) + + const execPromise1 = adapter.exec({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + const responseBody1 = 'foobar' + emitResponseBody(request1, responseBody1) + const queryResult1 = await execPromise1 + + const request2 = stubClientRequest() + httpRequestStub.and.returnValue(request2) + + const execPromise2 = adapter.exec({ + query: 'SELECT * FROM system.numbers LIMIT 5', + }) + const responseBody2 = 'qaz' + emitResponseBody(request2, responseBody2) + const queryResult2 = await execPromise2 + + await assertQueryResult(queryResult1, responseBody1) + await assertQueryResult(queryResult2, responseBody2) + expect(queryResult1.query_id).not.toEqual(queryResult2.query_id) + + const [url1] = httpRequestStub.calls.all()[0].args + expect(url1.search).toContain(`&query_id=${queryResult1.query_id}`) + + const [url2] = httpRequestStub.calls.all()[1].args + expect(url2.search).toContain(`&query_id=${queryResult2.query_id}`) + }) + + it('should use provided query_id for exec', async () => { + const adapter = buildHttpAdapter({ + compression: { + decompress_response: false, + compress_request: false, + }, + }) + + const httpRequestStub = spyOn(Http, 'request') + const request = stubClientRequest() + httpRequestStub.and.returnValue(request) + + const query_id = guid() + const execPromise = adapter.exec({ + query: 'SELECT * FROM system.numbers LIMIT 5', + query_id, + }) + const responseBody = 'foobar' + emitResponseBody(request, responseBody) + const { stream } = await execPromise + expect(await getAsText(stream)).toBe(responseBody) + + expect(httpRequestStub).toHaveBeenCalledTimes(1) + const [url] = httpRequestStub.calls.mostRecent().args + expect(url.search).toContain(`&query_id=${query_id}`) + }) + + it('should generate random query_id for every insert request', async () => { + const adapter = buildHttpAdapter({ + compression: { + decompress_response: false, + compress_request: false, + }, + }) + + const httpRequestStub = spyOn(Http, 'request') + + const request1 = stubClientRequest() + httpRequestStub.and.returnValue(request1) + + const insertPromise1 = adapter.insert({ + query: 'INSERT INTO default.foo VALUES (42)', + values: 'foobar', }) - ).toThrowError('Only HTTP(s) adapters are supported') + const responseBody1 = 'foobar' + emitResponseBody(request1, responseBody1) + const { query_id: queryId1 } = await insertPromise1 + + const request2 = stubClientRequest() + httpRequestStub.and.returnValue(request2) + + const insertPromise2 = adapter.insert({ + query: 'INSERT INTO default.foo VALUES (42)', + values: 'foobar', + }) + const responseBody2 = 'qaz' + emitResponseBody(request2, responseBody2) + const { query_id: queryId2 } = await insertPromise2 + + assertQueryId(queryId1) + assertQueryId(queryId2) + expect(queryId1).not.toEqual(queryId2) + + const [url1] = httpRequestStub.calls.all()[0].args + expect(url1.search).toContain(`&query_id=${queryId1}`) + + const [url2] = httpRequestStub.calls.all()[1].args + expect(url2.search).toContain(`&query_id=${queryId2}`) + }) + + it('should use provided query_id for insert', async () => { + const adapter = buildHttpAdapter({ + compression: { + decompress_response: false, + compress_request: false, + }, + }) + + const request = stubClientRequest() + const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) + + const query_id = guid() + const insertPromise = adapter.insert({ + query: 'INSERT INTO default.foo VALUES (42)', + values: 'foobar', + query_id, + }) + const responseBody = 'foobar' + emitResponseBody(request, responseBody) + await insertPromise + + const [url] = httpRequestStub.calls.mostRecent().args + expect(url.search).toContain(`&query_id=${query_id}`) + }) }) + + function buildHttpAdapter(config: Partial) { + return new NodeHttpConnection({ + ...{ + url: new URL('http://localhost:8123'), + + connect_timeout: 10_000, + request_timeout: 30_000, + compression: { + decompress_response: true, + compress_request: false, + }, + max_open_connections: Infinity, + + username: '', + password: '', + database: '', + clickhouse_settings: {}, + + logWriter: new LogWriter(new TestLogger()), + keep_alive: { + enabled: true, + socket_ttl: 2500, + retry_on_expired_socket: false, + }, + }, + ...config, + }) + } + + async function assertQueryResult( + { stream, query_id }: ConnQueryResult, + expectedResponseBody: any + ) { + expect(await getAsText(stream)).toBe(expectedResponseBody) + assertQueryId(query_id) + } + + function assertQueryId(query_id: string) { + expect(typeof query_id).toBe('string') + expect(validateUUID(query_id)).toBeTruthy() + } }) + +class MyTestHttpAdapter extends NodeBaseConnection { + constructor(application_id?: string) { + super( + { + application_id, + logWriter: new LogWriter(new TestLogger()), + keep_alive: { + enabled: true, + socket_ttl: 2500, + retry_on_expired_socket: true, + }, + } as NodeConnectionParams, + {} as Http.Agent + ) + } + protected createClientRequest(): Http.ClientRequest { + return {} as any + } + public getDefaultHeaders() { + return this.headers + } +} diff --git a/packages/client-node/__tests__/unit/node_create_connection.test.ts b/packages/client-node/__tests__/unit/node_create_connection.test.ts new file mode 100644 index 00000000..954757a9 --- /dev/null +++ b/packages/client-node/__tests__/unit/node_create_connection.test.ts @@ -0,0 +1,41 @@ +import { createConnection } from '../../src' +import { + type NodeConnectionParams, + NodeHttpConnection, + NodeHttpsConnection, +} from '../../src/connection' + +describe('[Node.js] createConnection', () => { + const baseParams = { + keep_alive: { + enabled: true, + retry_on_expired_socket: false, + socket_ttl: 2500, + }, + } as NodeConnectionParams + + it('should create HTTP adapter', async () => { + expect(adapter).toBeInstanceOf(NodeHttpConnection) + }) + const adapter = createConnection({ + ...baseParams, + url: new URL('http://localhost'), + }) + + it('should create HTTPS adapter', async () => { + const adapter = createConnection({ + ...baseParams, + url: new URL('https://localhost'), + }) + expect(adapter).toBeInstanceOf(NodeHttpsConnection) + }) + + it('should throw if the supplied protocol is unknown', async () => { + expect(() => + createConnection({ + ...baseParams, + url: new URL('tcp://localhost'), + }) + ).toThrowError('Only HTTP(s) adapters are supported') + }) +}) diff --git a/packages/client-node/__tests__/unit/node_http_adapter.test.ts b/packages/client-node/__tests__/unit/node_http_adapter.test.ts deleted file mode 100644 index 28911c56..00000000 --- a/packages/client-node/__tests__/unit/node_http_adapter.test.ts +++ /dev/null @@ -1,627 +0,0 @@ -import type { - ConnectionParams, - ConnQueryResult, -} from '@clickhouse/client-common' -import { LogWriter } from '@clickhouse/client-common' -import { guid, sleep, TestLogger, validateUUID } from '@test/utils' -import { randomUUID } from '@test/utils/guid' -import type { ClientRequest } from 'http' -import Http from 'http' -import Stream from 'stream' -import Util from 'util' -import Zlib from 'zlib' -import type { NodeConnectionParams } from '../../src/connection' -import { NodeBaseConnection, NodeHttpConnection } from '../../src/connection' -import { getAsText } from '../../src/utils' - -describe('[Node.js] HttpAdapter', () => { - const gzip = Util.promisify(Zlib.gzip) - - describe('compression', () => { - describe('response decompression', () => { - it('hints ClickHouse server to send a gzip compressed response if compress_request: true', async () => { - const request = stubClientRequest() - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) - - const adapter = buildHttpAdapter({ - compression: { - decompress_response: true, - compress_request: false, - }, - }) - - const selectPromise = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - - const responseBody = 'foobar' - await emitCompressedBody(request, responseBody) - - await selectPromise - - expect(httpRequestStub).toHaveBeenCalledTimes(1) - const calledWith = httpRequestStub.calls.mostRecent().args[1] - expect(calledWith.headers!['Accept-Encoding']).toBe('gzip') - }) - - it('does not send a compression algorithm hint if compress_request: false', async () => { - const request = stubClientRequest() - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) - const adapter = buildHttpAdapter({ - compression: { - decompress_response: false, - compress_request: false, - }, - }) - - const selectPromise = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - - const responseBody = 'foobar' - request.emit( - 'response', - buildIncomingMessage({ - body: responseBody, - }) - ) - - const queryResult = await selectPromise - await assertQueryResult(queryResult, responseBody) - - expect(httpRequestStub).toHaveBeenCalledTimes(1) - const calledWith = httpRequestStub.calls.mostRecent().args[1] - expect(calledWith.headers!['Accept-Encoding']).toBeUndefined() - }) - - it('uses request-specific settings over config settings', async () => { - const request = stubClientRequest() - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) - const adapter = buildHttpAdapter({ - compression: { - decompress_response: false, - compress_request: false, - }, - }) - - const selectPromise = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - clickhouse_settings: { - enable_http_compression: 1, - }, - }) - - const responseBody = 'foobar' - await emitCompressedBody(request, responseBody) - - const queryResult = await selectPromise - await assertQueryResult(queryResult, responseBody) - - expect(httpRequestStub).toHaveBeenCalledTimes(1) - const calledWith = httpRequestStub.calls.mostRecent().args[1] - expect(calledWith.headers!['Accept-Encoding']).toBe('gzip') - }) - - it('decompresses a gzip response', async () => { - const request = stubClientRequest() - spyOn(Http, 'request').and.returnValue(request) - const adapter = buildHttpAdapter({ - compression: { - decompress_response: true, - compress_request: false, - }, - }) - - const selectPromise = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - - const responseBody = 'abc'.repeat(1_000) - await emitCompressedBody(request, responseBody) - - const queryResult = await selectPromise - await assertQueryResult(queryResult, responseBody) - }) - - it('throws on an unexpected encoding', async () => { - const request = stubClientRequest() - spyOn(Http, 'request').and.returnValue(request) - const adapter = buildHttpAdapter({ - compression: { - decompress_response: true, - compress_request: false, - }, - }) - - const selectPromise = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - - await emitCompressedBody(request, 'abc', 'br') - - await expectAsync(selectPromise).toBeRejectedWith( - jasmine.objectContaining({ - message: 'Unexpected encoding: br', - }) - ) - }) - - it('provides decompression error to a stream consumer', async () => { - const request = stubClientRequest() - spyOn(Http, 'request').and.returnValue(request) - const adapter = buildHttpAdapter({ - compression: { - decompress_response: true, - compress_request: false, - }, - }) - - const selectPromise = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - - // No GZIP encoding for the body here - request.emit( - 'response', - buildIncomingMessage({ - body: 'abc', - headers: { - 'content-encoding': 'gzip', - }, - }) - ) - - const readStream = async () => { - const { stream } = await selectPromise - for await (const chunk of stream) { - void chunk // stub - } - } - - await expectAsync(readStream()).toBeRejectedWith( - jasmine.objectContaining({ - message: 'incorrect header check', - code: 'Z_DATA_ERROR', - }) - ) - }) - }) - - describe('request compression', () => { - it('sends a compressed request if compress_request: true', async () => { - const adapter = buildHttpAdapter({ - compression: { - decompress_response: false, - compress_request: true, - }, - }) - - const values = 'abc'.repeat(1_000) - - let chunks = Buffer.alloc(0) - let finalResult: Buffer | undefined = undefined - const request = new Stream.Writable({ - write(chunk, encoding, next) { - chunks = Buffer.concat([chunks, chunk]) - next() - }, - final() { - Zlib.unzip(chunks, (err, result) => { - finalResult = result - }) - }, - }) as ClientRequest - - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) - - void adapter.insert({ - query: 'INSERT INTO insert_compression_table', - values, - }) - - // trigger stream pipeline - request.emit('socket', { - setTimeout: () => { - // - }, - }) - - await sleep(100) - expect(finalResult!.toString('utf8')).toEqual(values) - expect(httpRequestStub).toHaveBeenCalledTimes(1) - const calledWith = httpRequestStub.calls.mostRecent().args[1] - expect(calledWith.headers!['Content-Encoding']).toBe('gzip') - }) - }) - - async function emitCompressedBody( - request: ClientRequest, - body: string, - encoding = 'gzip' - ) { - const compressedBody = await gzip(body) - request.emit( - 'response', - buildIncomingMessage({ - body: compressedBody, - headers: { - 'content-encoding': encoding, - }, - }) - ) - } - }) - - describe('User-Agent', () => { - it('should have proper user agent without app id', async () => { - const myHttpAdapter = new MyTestHttpAdapter() - const headers = myHttpAdapter.getDefaultHeaders() - expect(headers['User-Agent']).toMatch( - /^clickhouse-js\/[0-9\\.]+-?(?:(alpha|beta)\d*)? \(lv:nodejs\/v[0-9\\.]+?; os:(?:linux|darwin|win32)\)$/ - ) - }) - - it('should have proper user agent with app id', async () => { - const myHttpAdapter = new MyTestHttpAdapter('MyFancyApp') - const headers = myHttpAdapter.getDefaultHeaders() - expect(headers['User-Agent']).toMatch( - /^MyFancyApp clickhouse-js\/[0-9\\.]+-?(?:(alpha|beta)\d*)? \(lv:nodejs\/v[0-9\\.]+?; os:(?:linux|darwin|win32)\)$/ - ) - }) - }) - - it('should have proper auth header', async () => { - const myHttpAdapter = new MyTestHttpAdapter() - const headers = myHttpAdapter.getDefaultHeaders() - expect(headers['Authorization']).toMatch(/^Basic [A-Za-z0-9/+=]+$/) - }) - - describe('query_id', () => { - it('should generate random query_id for each query', async () => { - const adapter = buildHttpAdapter({ - compression: { - decompress_response: false, - compress_request: false, - }, - }) - - const httpRequestStub = spyOn(Http, 'request') - - const request1 = stubClientRequest() - httpRequestStub.and.returnValue(request1) - - const selectPromise1 = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - const responseBody1 = 'foobar' - request1.emit( - 'response', - buildIncomingMessage({ - body: responseBody1, - }) - ) - const queryResult1 = await selectPromise1 - - const request2 = stubClientRequest() - httpRequestStub.and.returnValue(request2) - - const selectPromise2 = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - const responseBody2 = 'qaz' - request2.emit( - 'response', - buildIncomingMessage({ - body: responseBody2, - }) - ) - const queryResult2 = await selectPromise2 - - await assertQueryResult(queryResult1, responseBody1) - await assertQueryResult(queryResult2, responseBody2) - expect(queryResult1.query_id).not.toEqual(queryResult2.query_id) - - const url1 = httpRequestStub.calls.all()[0].args[0] - expect(url1.search).toContain(`&query_id=${queryResult1.query_id}`) - - const url2 = httpRequestStub.calls.all()[1].args[0] - expect(url2.search).toContain(`&query_id=${queryResult2.query_id}`) - }) - - it('should use provided query_id for query', async () => { - const adapter = buildHttpAdapter({ - compression: { - decompress_response: false, - compress_request: false, - }, - }) - - const request = stubClientRequest() - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) - - const query_id = guid() - const selectPromise = adapter.query({ - query: 'SELECT * FROM system.numbers LIMIT 5', - query_id, - }) - const responseBody = 'foobar' - request.emit( - 'response', - buildIncomingMessage({ - body: responseBody, - }) - ) - const { stream } = await selectPromise - expect(await getAsText(stream)).toBe(responseBody) - - expect(httpRequestStub).toHaveBeenCalledTimes(1) - const [url] = httpRequestStub.calls.mostRecent().args - expect(url.search).toContain(`&query_id=${query_id}`) - }) - - it('should generate random query_id for every exec request', async () => { - const adapter = buildHttpAdapter({ - compression: { - decompress_response: false, - compress_request: false, - }, - }) - - const httpRequestStub = spyOn(Http, 'request') - - const request1 = stubClientRequest() - httpRequestStub.and.returnValue(request1) - - const execPromise1 = adapter.exec({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - const responseBody1 = 'foobar' - request1.emit( - 'response', - buildIncomingMessage({ - body: responseBody1, - }) - ) - const queryResult1 = await execPromise1 - - const request2 = stubClientRequest() - httpRequestStub.and.returnValue(request2) - - const execPromise2 = adapter.exec({ - query: 'SELECT * FROM system.numbers LIMIT 5', - }) - const responseBody2 = 'qaz' - request2.emit( - 'response', - buildIncomingMessage({ - body: responseBody2, - }) - ) - const queryResult2 = await execPromise2 - - await assertQueryResult(queryResult1, responseBody1) - await assertQueryResult(queryResult2, responseBody2) - expect(queryResult1.query_id).not.toEqual(queryResult2.query_id) - - const [url1] = httpRequestStub.calls.all()[0].args - expect(url1.search).toContain(`&query_id=${queryResult1.query_id}`) - - const [url2] = httpRequestStub.calls.all()[1].args - expect(url2.search).toContain(`&query_id=${queryResult2.query_id}`) - }) - - it('should use provided query_id for exec', async () => { - const adapter = buildHttpAdapter({ - compression: { - decompress_response: false, - compress_request: false, - }, - }) - - const httpRequestStub = spyOn(Http, 'request') - const request = stubClientRequest() - httpRequestStub.and.returnValue(request) - - const query_id = guid() - const execPromise = adapter.exec({ - query: 'SELECT * FROM system.numbers LIMIT 5', - query_id, - }) - const responseBody = 'foobar' - request.emit( - 'response', - buildIncomingMessage({ - body: responseBody, - }) - ) - const { stream } = await execPromise - expect(await getAsText(stream)).toBe(responseBody) - - expect(httpRequestStub).toHaveBeenCalledTimes(1) - const [url] = httpRequestStub.calls.mostRecent().args - expect(url.search).toContain(`&query_id=${query_id}`) - }) - - it('should generate random query_id for every insert request', async () => { - const adapter = buildHttpAdapter({ - compression: { - decompress_response: false, - compress_request: false, - }, - }) - - const httpRequestStub = spyOn(Http, 'request') - - const request1 = stubClientRequest() - httpRequestStub.and.returnValue(request1) - - const insertPromise1 = adapter.insert({ - query: 'INSERT INTO default.foo VALUES (42)', - values: 'foobar', - }) - const responseBody1 = 'foobar' - request1.emit( - 'response', - buildIncomingMessage({ - body: responseBody1, - }) - ) - const { query_id: queryId1 } = await insertPromise1 - - const request2 = stubClientRequest() - httpRequestStub.and.returnValue(request2) - - const insertPromise2 = adapter.insert({ - query: 'INSERT INTO default.foo VALUES (42)', - values: 'foobar', - }) - const responseBody2 = 'qaz' - request2.emit( - 'response', - buildIncomingMessage({ - body: responseBody2, - }) - ) - const { query_id: queryId2 } = await insertPromise2 - - assertQueryId(queryId1) - assertQueryId(queryId2) - expect(queryId1).not.toEqual(queryId2) - - const [url1] = httpRequestStub.calls.all()[0].args - expect(url1.search).toContain(`&query_id=${queryId1}`) - - const [url2] = httpRequestStub.calls.all()[1].args - expect(url2.search).toContain(`&query_id=${queryId2}`) - }) - - it('should use provided query_id for insert', async () => { - const adapter = buildHttpAdapter({ - compression: { - decompress_response: false, - compress_request: false, - }, - }) - - const request = stubClientRequest() - const httpRequestStub = spyOn(Http, 'request').and.returnValue(request) - - const query_id = guid() - const insertPromise1 = adapter.insert({ - query: 'INSERT INTO default.foo VALUES (42)', - values: 'foobar', - query_id, - }) - const responseBody1 = 'foobar' - request.emit( - 'response', - buildIncomingMessage({ - body: responseBody1, - }) - ) - await insertPromise1 - - const [url] = httpRequestStub.calls.mostRecent().args - expect(url.search).toContain(`&query_id=${query_id}`) - }) - }) - - function buildIncomingMessage({ - body = '', - statusCode = 200, - headers = {}, - }: { - body?: string | Buffer - statusCode?: number - headers?: Http.IncomingHttpHeaders - }): Http.IncomingMessage { - const response = new Stream.Readable({ - read() { - this.push(body) - this.push(null) - }, - }) as Http.IncomingMessage - - response.statusCode = statusCode - response.headers = { - 'x-clickhouse-query-id': randomUUID(), - ...headers, - } - return response - } - - function stubClientRequest() { - const request = new Stream.Writable({ - write() { - /** stub */ - }, - }) as ClientRequest - request.getHeaders = () => ({}) - return request - } - - function buildHttpAdapter(config: Partial) { - return new NodeHttpConnection({ - ...{ - url: new URL('http://localhost:8123'), - - connect_timeout: 10_000, - request_timeout: 30_000, - compression: { - decompress_response: true, - compress_request: false, - }, - max_open_connections: Infinity, - - username: '', - password: '', - database: '', - clickhouse_settings: {}, - - logWriter: new LogWriter(new TestLogger()), - keep_alive: { - enabled: true, - socket_ttl: 2500, - retry_on_expired_socket: false, - }, - }, - ...config, - }) - } - - async function assertQueryResult( - { stream, query_id }: ConnQueryResult, - expectedResponseBody: any - ) { - expect(await getAsText(stream)).toBe(expectedResponseBody) - assertQueryId(query_id) - } - - function assertQueryId(query_id: string) { - expect(typeof query_id).toBe('string') - expect(validateUUID(query_id)).toBeTruthy() - } -}) - -class MyTestHttpAdapter extends NodeBaseConnection { - constructor(application_id?: string) { - super( - { - application_id, - logWriter: new LogWriter(new TestLogger()), - keep_alive: { - enabled: true, - socket_ttl: 2500, - retry_on_expired_socket: true, - }, - } as NodeConnectionParams, - {} as Http.Agent - ) - } - protected createClientRequest(): Http.ClientRequest { - return {} as any - } - public getDefaultHeaders() { - return this.headers - } -} diff --git a/packages/client-node/__tests__/utils/http_stubs.ts b/packages/client-node/__tests__/utils/http_stubs.ts new file mode 100644 index 00000000..fdce42ad --- /dev/null +++ b/packages/client-node/__tests__/utils/http_stubs.ts @@ -0,0 +1,71 @@ +import { randomUUID } from '@test/utils/guid' +import type { ClientRequest } from 'http' +import type Http from 'http' +import Stream from 'stream' +import Util from 'util' +import Zlib from 'zlib' + +const gzip = Util.promisify(Zlib.gzip) + +export function buildIncomingMessage({ + body = '', + statusCode = 200, + headers = {}, +}: { + body?: string | Buffer + statusCode?: number + headers?: Http.IncomingHttpHeaders +}): Http.IncomingMessage { + const response = new Stream.Readable({ + read() { + this.push(body) + this.push(null) + }, + }) as Http.IncomingMessage + + response.statusCode = statusCode + response.headers = { + 'x-clickhouse-query-id': randomUUID(), + ...headers, + } + return response +} + +export function stubClientRequest() { + const request = new Stream.Writable({ + write() { + /** stub */ + }, + }) as ClientRequest + request.getHeaders = () => ({}) + return request +} + +export function emitResponseBody( + request: Http.ClientRequest, + body: string | Buffer | undefined +) { + request.emit( + 'response', + buildIncomingMessage({ + body, + }) + ) +} + +export async function emitCompressedBody( + request: ClientRequest, + body: string | Buffer, + encoding = 'gzip' +) { + const compressedBody = await gzip(body) + request.emit( + 'response', + buildIncomingMessage({ + body: compressedBody, + headers: { + 'content-encoding': encoding, + }, + }) + ) +} diff --git a/packages/client-node/src/connection/node_base_connection.ts b/packages/client-node/src/connection/node_base_connection.ts index c24c51f8..28cce922 100644 --- a/packages/client-node/src/connection/node_base_connection.ts +++ b/packages/client-node/src/connection/node_base_connection.ts @@ -92,7 +92,7 @@ export abstract class NodeBaseConnection protected buildDefaultHeaders( username: string, password: string, - additional_headers?: Record + additional_headers?: Record ): Http.OutgoingHttpHeaders { return { Authorization: `Basic ${Buffer.from(`${username}:${password}`).toString( diff --git a/packages/client-node/src/connection/node_https_connection.ts b/packages/client-node/src/connection/node_https_connection.ts index 3b07c3a5..e30969f5 100644 --- a/packages/client-node/src/connection/node_https_connection.ts +++ b/packages/client-node/src/connection/node_https_connection.ts @@ -27,7 +27,7 @@ export class NodeHttpsConnection protected override buildDefaultHeaders( username: string, password: string, - additional_headers?: Record + additional_headers?: Record ): Http.OutgoingHttpHeaders { if (this.params.tls?.type === 'Mutual') { return { diff --git a/packages/client-node/src/version.ts b/packages/client-node/src/version.ts index 746c2b11..e8618bef 100644 --- a/packages/client-node/src/version.ts +++ b/packages/client-node/src/version.ts @@ -1 +1 @@ -export default '0.2.8' +export default '0.2.9' diff --git a/packages/client-web/__tests__/integration/web_connection.test.ts b/packages/client-web/__tests__/integration/web_client.test.ts similarity index 50% rename from packages/client-web/__tests__/integration/web_connection.test.ts rename to packages/client-web/__tests__/integration/web_client.test.ts index d01249fc..eb89b7da 100644 --- a/packages/client-web/__tests__/integration/web_connection.test.ts +++ b/packages/client-web/__tests__/integration/web_client.test.ts @@ -1,41 +1,40 @@ import { createClient } from '../../src' import type { WebClickHouseClient } from '../../src/client' -describe('[Web] Connection', () => { - describe('additional_headers', () => { - let fetchSpy: jasmine.Spy - beforeEach(() => { - fetchSpy = spyOn(window, 'fetch').and.returnValue( - Promise.resolve(new Response()) - ) - }) +describe('[Web] Client', () => { + let fetchSpy: jasmine.Spy + beforeEach(() => { + fetchSpy = spyOn(window, 'fetch').and.returnValue( + Promise.resolve(new Response()) + ) + }) + describe('Additional headers', () => { it('should be possible to set', async () => { const client = createClient({ additional_headers: { - 'Test-Header': 'default', + 'Test-Header': 'foobar', }, }) const fetchParams = await pingAndGetRequestInit(client) - expect(fetchParams!.headers?.['Test-Header']).toBe('default') + expect(fetchParams!.headers).toEqual({ + Authorization: 'Basic ZGVmYXVsdDo=', // default user with empty password + 'Accept-Encoding': 'gzip', + 'Test-Header': 'foobar', + }) + }) - async function pingAndGetRequestInit(client: WebClickHouseClient) { - await client.ping() - expect(fetchSpy).toHaveBeenCalledTimes(1) - const [, fetchParams] = fetchSpy.calls.mostRecent().args - return fetchParams! - } + it('should work with no additional headers provided', async () => { + const client = createClient({}) + const fetchParams = await pingAndGetRequestInit(client) + expect(fetchParams!.headers).toEqual({ + Authorization: 'Basic ZGVmYXVsdDo=', // default user with empty password + 'Accept-Encoding': 'gzip', + }) }) }) describe('KeepAlive setting', () => { - let fetchSpy: jasmine.Spy - beforeEach(() => { - fetchSpy = spyOn(window, 'fetch').and.returnValue( - Promise.resolve(new Response()) - ) - }) - it('should be enabled by default', async () => { const client = createClient() const fetchParams = await pingAndGetRequestInit(client) @@ -53,12 +52,12 @@ describe('[Web] Connection', () => { const fetchParams = await pingAndGetRequestInit(client) expect(fetchParams.keepalive).toBeTruthy() }) - - async function pingAndGetRequestInit(client: WebClickHouseClient) { - await client.ping() - expect(fetchSpy).toHaveBeenCalledTimes(1) - const [, fetchParams] = fetchSpy.calls.mostRecent().args - return fetchParams! - } }) + + async function pingAndGetRequestInit(client: WebClickHouseClient) { + await client.ping() + expect(fetchSpy).toHaveBeenCalledTimes(1) + const [, fetchParams] = fetchSpy.calls.mostRecent().args + return fetchParams! + } }) diff --git a/packages/client-web/src/version.ts b/packages/client-web/src/version.ts index 746c2b11..e8618bef 100644 --- a/packages/client-web/src/version.ts +++ b/packages/client-web/src/version.ts @@ -1 +1 @@ -export default '0.2.8' +export default '0.2.9'