
[client-v2] Inserting compressed data (using compressed input stream) #2010

Open
dizider opened this issue Dec 10, 2024 · 3 comments

Comments

@dizider
Contributor

dizider commented Dec 10, 2024

Describe your feedback

The version 2 client allows inserting data from an input byte stream. It also supports inserting data in compressed format (the decompress option). I would like to be able to insert data from an already compressed data stream. This change would make it possible to create compressed in-memory batches and then send them directly.

I have already looked at the code and found one major incompatibility with the current API: the insert statement is part of the request body and is prepended to each request. This is a problem because the body must be either entirely compressed or entirely uncompressed. So I think the options are to send the statement as an HTTP query parameter or to allow user-defined insert statements.
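For context, ClickHouse's HTTP interface already accepts the statement as the `query` URL parameter, which leaves the request body free to carry only (compressed) data. A minimal sketch of building such a URL (the class and the `insertUrl` helper are hypothetical names; host and port match the examples below):

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class QueryParamUrl {
    // Builds a ClickHouse HTTP endpoint URL carrying the INSERT statement as
    // the `query` URL parameter, so the request body can contain only data.
    static String insertUrl(String host, int port, String table, String format) {
        String sql = "INSERT INTO " + table + " FORMAT " + format;
        return "http://" + host + ":" + port + "/?query="
                + URLEncoder.encode(sql, StandardCharsets.UTF_8)
                + "&decompress=1"; // tells the server the body is compressed
    }

    public static void main(String[] args) {
        System.out.println(insertUrl("localhost", 8123, "test_table", "CSV"));
        // → http://localhost:8123/?query=INSERT+INTO+test_table+FORMAT+CSV&decompress=1
    }
}
```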

Code example

Here are two examples of how I would like to use the client. The second example is my actual use case: creating compressed batches in memory and then sending them. Creating these compressed batches should be more memory efficient, which is useful in memory-heavy applications.

DISCLAIMER: The following examples do not work with the current implementation! They assume that the insert query will be sent as an HTTP query parameter.

Compressed stream

void compressedStream() throws IOException {
    final String table = "test_table";
    final ClickHouseFormat format = ClickHouseFormat.CSV;
    try (Client client = new Client.Builder()
            .compressClientRequest(false)
            .compressServerResponse(false)
            .addEndpoint(Protocol.HTTP, "localhost", clickhouse.getMappedPort(8123), false)
            .setUsername(clickhouse.getUsername())
            .setPassword(clickhouse.getPassword())
            .useAsyncRequests(true)
            .build()) {

        final var pipedOutputStream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(new ClickHouseConfig());
        final OutputStream compressedOutputStream = new ClickHouseLZ4OutputStream(pipedOutputStream, LZ4Factory.fastestInstance().fastCompressor(), 8192);

        final var futureResponse = client.insert(table, pipedOutputStream.getInputStream(), format, new InsertSettings().serverSetting("decompress", "1"));

        // write the data to insert to compressedOutputStream
        final int numberOfRows = 2;
        compressedOutputStream.write("1,foo\n".getBytes());
        compressedOutputStream.write("2,bar\n".getBytes());
        compressedOutputStream.close();
        pipedOutputStream.close();

        // the insert setting tells ClickHouse that the data is compressed,
        // but the insert doesn't work, because the `insert` method prepends the insert statement (and it isn't compressed)
        try (var response = futureResponse.join()) {
            final var writtenRows = response.getWrittenRows();
            System.out.println("Written rows to ClickHouse: " + writtenRows);
            if (writtenRows != numberOfRows) {
                System.err.println("Written only " + writtenRows + " of " + numberOfRows + " expected.");
            }
        }
    }
}

Compressed batch

private static class ByteBufferBackedOutputStream extends OutputStream {  
    private final ByteBuffer buffer;  
  
    public ByteBufferBackedOutputStream(ByteBuffer buffer) {  
        this.buffer = buffer;
    }  
  
    @Override  
    public void write(int b) {  
        buffer.put((byte) b);  
    }  
  
    @Override  
    public void write(byte[] bytes, int off, int len) {  
        buffer.put(bytes, off, len);  
    }  
}

void compressedBatch() throws IOException {
    final String table = "test_table";
    final ClickHouseFormat format = ClickHouseFormat.CSV;
    final ByteBuffer batch = ByteBuffer.allocate(1024);

    try (Client client = new Client.Builder()
            .compressClientRequest(false)
            .compressServerResponse(false)
            .addEndpoint(Protocol.HTTP, "localhost", clickhouse.getMappedPort(8123), false)
            .setUsername(clickhouse.getUsername())
            .setPassword(clickhouse.getPassword())
            .useAsyncRequests(true)
            .build()) {

        final var sinkStream = new ByteBufferBackedOutputStream(batch);
        final OutputStream compressedOutputStream = new ClickHouseLZ4OutputStream(sinkStream, LZ4Factory.fastestInstance().fastCompressor(), 8192);

        // write data into the batch
        final int numberOfRows = 2;
        compressedOutputStream.write("1,foo\n".getBytes());
        compressedOutputStream.write("2,bar\n".getBytes());
        compressedOutputStream.flush();
        // now the batch contains 2 rows in compressed form

        final var pipedOutputStream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(new ClickHouseConfig());

        // open the request
        final var insertResponseFuture = client.insert(table, pipedOutputStream.getInputStream(), format, new InsertSettings().serverSetting("decompress", "1"));

        // stream the buffer
        pipedOutputStream.writeBytes(batch.array(), 0, batch.position());
        pipedOutputStream.close();

        // await the response
        try (var response = insertResponseFuture.join()) {
            final var writtenRows = response.getWrittenRows();
            System.out.println("Written rows to ClickHouse: " + writtenRows);
            if (writtenRows != numberOfRows) {
                System.err.println("Written only " + writtenRows + " of " + numberOfRows + " expected.");
            }
        }
    }
}
@chernser
Contributor

Good day, @dizider!
Thank you for the feedback!

I think the option of sending the SQL as a query parameter is good. I will experiment with this. So in the case where you are reading compressed data from an external source, it would be great to be able to forward it to the server.

What compression algorithm do you use?
Do you plan to insert by small batches?

Thanks!

@chernser chernser added this to the Priority Backlog milestone Dec 10, 2024
@chernser chernser changed the title Inserting compressed data (using compressed input stream) [client-v2] Inserting compressed data (using compressed input stream) Dec 10, 2024
@den-crane
Collaborator

It was supported in the old JDBC extended API.
ClickHouse's HTTP API supports 'auto', 'none', 'gzip', 'deflate', 'br', 'xz', 'zstd', 'lz4', 'bz2', and 'snappy'.
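Since gzip is among the codecs the HTTP API accepts (via the `Content-Encoding` header), an in-memory compressed batch can also be built with the JDK's own `GZIPOutputStream`, with no extra dependency. A minimal sketch, with hypothetical class and helper names, showing the round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipBatch {
    // Compresses CSV rows into an in-memory gzip batch; such a buffer could be
    // posted with "Content-Encoding: gzip", one of the codecs listed above.
    static byte[] compress(String... rows) throws IOException {
        ByteArrayOutputStream batch = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(batch)) {
            for (String row : rows) {
                gz.write(row.getBytes(StandardCharsets.UTF_8));
            }
        }
        return batch.toByteArray();
    }

    // Decompresses the batch back to text, to verify the round trip.
    static String decompress(byte[] data) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] batch = compress("1,foo\n", "2,bar\n");
        System.out.print(decompress(batch)); // round-trips to the original rows
    }
}
```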

@dizider
Contributor Author

dizider commented Dec 11, 2024

I think the option of sending the SQL as a query parameter is good. I will experiment with this. So in the case where you are reading compressed data from an external source, it would be great to be able to forward it to the server.

I have already modified the client, with a four-line change, to suit my needs (SQL in a query parameter). Everything works fine, but the usage is not convenient, as there are more configuration settings for compression, etc.

What compression algorithm do you use?
Do you plan to insert by small batches?

As the example shows, I read uncompressed data from the source, perform some transformations, and compress the result with LZ4 (ClickHouseLZ4OutputStream). The batches are quite large (~1 GB of uncompressed data), so I would like to compress the data before sending. Streaming the data without buffering does not work for me, because I need to retry in case of an error, and the source does not provide any way to read the same data twice.
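The buffering-for-retry point above can be sketched as follows: because the compressed batch lives in memory, the same bytes can be re-sent after a transient failure, which a forward-only source stream would not allow. The `Sender` interface and helper names here are hypothetical, standing in for the actual HTTP send:

```java
import java.io.IOException;

public class RetryingInsert {
    // Hypothetical stand-in for whatever actually posts the batch to the server.
    interface Sender {
        void send(byte[] batch) throws IOException;
    }

    // Re-sends the same in-memory batch until it succeeds or attempts run out;
    // returns the number of attempts actually used.
    static int sendWithRetry(byte[] batch, Sender sender, int maxAttempts) throws IOException {
        for (int attempt = 1; ; attempt++) {
            try {
                sender.send(batch);
                return attempt;
            } catch (IOException e) {
                if (attempt == maxAttempts) throw e; // give up, propagate the last error
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        // fake sender: fails on the first attempt, succeeds on the second
        Sender flaky = b -> { if (++calls[0] < 2) throw new IOException("transient"); };
        System.out.println(sendWithRetry(new byte[]{1, 2, 3}, flaky, 3)); // → 2
    }
}
```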

@chernser chernser modified the milestones: Priority Backlog, 0.7.2 Dec 20, 2024