Skip to content

Commit

Permalink
Fix an uncaught exception during failure response decompression
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn committed Dec 6, 2024
1 parent abee859 commit ad0e374
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import type { ClickHouseClient } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'
import http from 'http'
import type Stream from 'stream'
import type { NodeClickHouseClientConfigOptions } from '../../src/config'

describe('[Node.js] Compression', () => {
const port = 18123

let client: ClickHouseClient<Stream.Readable>
let server: http.Server

describe('Malformed compression response', () => {
const logAndQuit = (err: Error | unknown, prefix: string) => {
console.error(prefix, err)
process.exit(1)
}
const uncaughtExceptionListener = (err: Error) =>
logAndQuit(err, 'uncaughtException:')
const unhandledRejectionListener = (err: unknown) =>
logAndQuit(err, 'unhandledRejection:')

beforeEach(async () => {
process.on('uncaughtException', uncaughtExceptionListener)
process.on('unhandledRejection', unhandledRejectionListener)
client = createTestClient({
url: `http://localhost:${port}`,
compression: {
response: true,
},
} as NodeClickHouseClientConfigOptions)
})
afterEach(async () => {
process.off('uncaughtException', uncaughtExceptionListener)
process.off('unhandledRejection', unhandledRejectionListener)
await client.close()
server.close()
})

it('should not propagate the exception to the global context if a failed response is malformed', async () => {
server = http.createServer(async (_req, res) => {
return makeResponse(res, 500)
})
server.listen(port)

// The request fails completely (and the error message cannot be decompressed)
await expectAsync(
client.query({
query: 'SELECT 1',
format: 'JSONEachRow',
}),
).toBeRejectedWith(
jasmine.objectContaining({
code: 'Z_DATA_ERROR',
}),
)
})

it('should not propagate the exception to the global context if a successful response is malformed', async () => {
server = http.createServer(async (_req, res) => {
return makeResponse(res, 200)
})
server.listen(port)

const rs = await client.query({
query: 'SELECT 1',
format: 'JSONEachRow',
})

// Fails during the response streaming
await expectAsync(rs.text()).toBeRejectedWithError()
})
})

function makeResponse(res: http.ServerResponse, status: 200 | 500) {
res.appendHeader('Content-Encoding', 'gzip')
res.statusCode = status
res.write('A malformed response without compression')
return res.end()
}
})
19 changes: 11 additions & 8 deletions packages/client-node/src/connection/compression.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import type { LogWriter } from '@clickhouse/client-common'
import type Http from 'http'
import Stream from 'stream'
import Zlib from 'zlib'

export function decompressResponse(response: Http.IncomingMessage):
| {
response: Stream.Readable
}
| { error: Error } {
type DecompressResponseResult = { response: Stream.Readable } | { error: Error }

export function decompressResponse(
response: Http.IncomingMessage,
logWriter: LogWriter,
): DecompressResponseResult {
const encoding = response.headers['content-encoding']

if (encoding === 'gzip') {
Expand All @@ -16,9 +18,10 @@ export function decompressResponse(response: Http.IncomingMessage):
Zlib.createGunzip(),
function pipelineCb(err) {
if (err) {
// FIXME: use logger instead
// eslint-disable-next-line no-console
console.error(err)
logWriter.error({
message: 'An error occurred while decompressing the response',
err,
})
}
},
),
Expand Down
10 changes: 8 additions & 2 deletions packages/client-node/src/connection/node_base_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ export abstract class NodeBaseConnection
// even if the stream decompression is disabled, we have to decompress it in case of an error
const isFailedResponse = !isSuccessfulResponse(_response.statusCode)
if (tryDecompressResponseStream || isFailedResponse) {
const decompressionResult = decompressResponse(_response)
const decompressionResult = decompressResponse(_response, this.logger)
if (isDecompressionError(decompressionResult)) {
return reject(decompressionResult.error)
}
Expand All @@ -474,7 +474,13 @@ export abstract class NodeBaseConnection
responseStream = _response
}
if (isFailedResponse) {
reject(parseError(await getAsText(responseStream)))
try {
const errorMessage = await getAsText(responseStream)
reject(parseError(errorMessage))
} catch (err) {
// If the ClickHouse response is malformed
reject(err)
}
} else {
return resolve({
stream: responseStream,
Expand Down

0 comments on commit ad0e374

Please sign in to comment.