Skip to content

Commit

Permalink
Improve the unit test dataset used by CLPMutableForwardIndexV2Test an…
Browse files Browse the repository at this point in the history
…d CLPForwardIndexCreatorV2Test (#14632)

* Improve the unit test dataset used by CLPMutableForwardIndexV2Test and CLPForwardIndexCreatorV2Test

* Add compressed log data.

* Fix linting issue.

* Improved unit test code quality.
  • Loading branch information
jackluo923 authored Dec 13, 2024
1 parent 306be42 commit e645a50
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,75 +18,150 @@
*/
package org.apache.pinot.segment.local.segment.index.creator;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
import org.apache.pinot.segment.local.segment.index.forward.mutable.VarByteSVMutableForwardIndexTest;
import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV2;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class CLPForwardIndexCreatorV2Test {
private static final String COLUMN_NAME = "column1";
private static final File TEMP_DIR =
new File(FileUtils.getTempDirectory(), CLPForwardIndexCreatorV2Test.class.getSimpleName());
private PinotDataBufferMemoryManager _memoryManager;
private List<String> _logMessages = new ArrayList<>();

@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
_memoryManager = new DirectMemoryManager(VarByteSVMutableForwardIndexTest.class.getName());

ObjectMapper objectMapper = new ObjectMapper();
try (GzipCompressorInputStream gzipInputStream = new GzipCompressorInputStream(
getClass().getClassLoader().getResourceAsStream("data/log.jsonl.gz"));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gzipInputStream))) {
String line;
while ((line = bufferedReader.readLine()) != null) {
JsonNode jsonNode = objectMapper.readTree(line);
_logMessages.add(jsonNode.get("message").asText());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@AfterClass
public void tearDown()
throws IOException {
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
}

@Test
public void testCLPWriter()
throws IOException {
List<String> logLines = new ArrayList<>();
logLines.add("INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+ "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective: true");
logLines.add("INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+ "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective: true");
logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 0.0"
+ ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, content-type null status code 200 OK");
logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-6] Handled request from 0.0"
+ ".0.0 GET https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, content-type "
+ "application/json status code 200 OK");
logLines.add("null");

// Create and ingest into a clp mutable forward indexes
CLPMutableForwardIndexV2 clpMutableForwardIndexV2 = new CLPMutableForwardIndexV2("column1", _memoryManager);
for (int i = 0; i < logLines.size(); i++) {
clpMutableForwardIndexV2.setString(i, logLines.get(i));
CLPMutableForwardIndexV2 clpMutableForwardIndexV2 = new CLPMutableForwardIndexV2(COLUMN_NAME, _memoryManager);
int rawSizeBytes = 0;
int maxLength = 0;
for (int i = 0; i < _logMessages.size(); i++) {
String logMessage = _logMessages.get(i);
clpMutableForwardIndexV2.setString(i, logMessage);
rawSizeBytes += logMessage.length();
maxLength = Math.max(maxLength, logMessage.length());
}

// Create a immutable forward index from mutable forward index
CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, ChunkCompressionType.ZSTANDARD);
for (int i = 0; i < logLines.size(); i++) {
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
// LZ4 compression type
long rawStringFwdIndexSizeLZ4 = createStringRawForwardIndex(ChunkCompressionType.LZ4, maxLength);
long clpFwdIndexSizeLZ4 =
createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2, ChunkCompressionType.LZ4);
// For LZ4 compression:
// 1. CLP raw forward index should achieve at least 40x compression
// 2. at least 25% smaller file size compared to standard raw forward index with LZ4 compression
Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeLZ4 >= 40);
Assert.assertTrue((float) rawStringFwdIndexSizeLZ4 / clpFwdIndexSizeLZ4 >= 0.25);

// ZSTD compression type
long rawStringFwdIndexSizeZSTD = createStringRawForwardIndex(ChunkCompressionType.ZSTANDARD, maxLength);
long clpFwdIndexSizeZSTD =
createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2, ChunkCompressionType.ZSTANDARD);
// For ZSTD compression
// 1. CLP raw forward index should achieve at least 66x compression
// 2. at least 19% smaller file size compared to standard raw forward index with ZSTD compression
Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeZSTD >= 66);
Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD >= 0.19);
}

private long createStringRawForwardIndex(ChunkCompressionType compressionType, int maxLength)
throws IOException {
// Create a raw string immutable forward index
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
SingleValueVarByteRawIndexCreator index =
new SingleValueVarByteRawIndexCreator(TEMP_DIR, compressionType, COLUMN_NAME, _logMessages.size(),
FieldSpec.DataType.STRING, maxLength);
for (String logMessage : _logMessages) {
index.putString(logMessage);
}
clpForwardIndexCreatorV2.seal();
clpForwardIndexCreatorV2.close();
index.seal();
index.close();

File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
return indexFile.length();
}

private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2,
ChunkCompressionType compressionType)
throws IOException {
long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, compressionType);

// Read from immutable forward index and validate the content
File indexFile = new File(TEMP_DIR, "column1" + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
PinotDataBuffer pinotDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
CLPForwardIndexReaderV2 clpForwardIndexReaderV2 = new CLPForwardIndexReaderV2(pinotDataBuffer, logLines.size());
CLPForwardIndexReaderV2 clpForwardIndexReaderV2 = new CLPForwardIndexReaderV2(pinotDataBuffer, _logMessages.size());
CLPForwardIndexReaderV2.CLPReaderContext clpForwardIndexReaderV2Context = clpForwardIndexReaderV2.createContext();
for (int i = 0; i < logLines.size(); i++) {
Assert.assertEquals(clpForwardIndexReaderV2.getString(i, clpForwardIndexReaderV2Context), logLines.get(i));
for (int i = 0; i < _logMessages.size(); i++) {
Assert.assertEquals(clpForwardIndexReaderV2.getString(i, clpForwardIndexReaderV2Context), _logMessages.get(i));
}

return indexSize;
}

private long createClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2,
ChunkCompressionType compressionType)
throws IOException {
// Create a CLP immutable forward index from mutable forward index
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, compressionType);
for (int i = 0; i < _logMessages.size(); i++) {
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
}
clpForwardIndexCreatorV2.seal();
clpForwardIndexCreatorV2.close();

File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
return indexFile.length();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
*/
package org.apache.pinot.segment.local.segment.index.forward.mutable;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
Expand All @@ -33,10 +38,24 @@

public class CLPMutableForwardIndexV2Test {
private PinotDataBufferMemoryManager _memoryManager;
private List<String> _logMessages = new ArrayList<>();

@BeforeClass
public void setUp() {
_memoryManager = new DirectMemoryManager(VarByteSVMutableForwardIndexTest.class.getName());

ObjectMapper objectMapper = new ObjectMapper();
try (GzipCompressorInputStream gzipInputStream = new GzipCompressorInputStream(
getClass().getClassLoader().getResourceAsStream("data/log.jsonl.gz"));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gzipInputStream))) {
String line;
while ((line = bufferedReader.readLine()) != null) {
JsonNode jsonNode = objectMapper.readTree(line);
_logMessages.add(jsonNode.get("message").asText());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@AfterClass
Expand All @@ -52,33 +71,17 @@ public void tearDown()
public void testReadWriteOnLogMessages()
throws IOException {
try (CLPMutableForwardIndexV2 readerWriter = new CLPMutableForwardIndexV2("col1", _memoryManager)) {
List<String> logLines = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
logLines.add("INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+ "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective:"
+ " true");
logLines.add("INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+ "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective:"
+ " true");
logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 0.0"
+ ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, content-type null status code 200 OK");
logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-6] Handled request from 0.0"
+ ".0.0 GET https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, content-type "
+ "application/json status code 200 OK");
logLines.add("null");
}

// Typically, log messages should be clp encoded due to low logtype and dictionary variable cardinality
Assert.assertTrue(readerWriter.isClpEncoded());

// Write
for (int i = 0; i < logLines.size(); i++) {
readerWriter.setString(i, logLines.get(i));
for (int i = 0; i < _logMessages.size(); i++) {
readerWriter.setString(i, _logMessages.get(i));
}

// Read
for (int i = 0; i < logLines.size(); i++) {
Assert.assertEquals(readerWriter.getString(i), logLines.get(i));
for (int i = 0; i < _logMessages.size(); i++) {
Assert.assertEquals(readerWriter.getString(i), _logMessages.get(i));
}
}
}
Expand Down
Binary file not shown.

0 comments on commit e645a50

Please sign in to comment.