Skip to content

Commit

Permalink
[controller][h2v][common][router] Improve resiliency in ZSTD dictiona…
Browse files Browse the repository at this point in the history
…ry retrieval (linkedin#852)

* [controller][h2v][common][router] Improve resiliency in ZSTD dictionary retrieval

1. Catch throwable in router dictionary fetching thread so the thread doesn't die when encountering
issues fetching dictionary for one store.

2. Replaced the static final field for the empty push dictionary in ZstdWithDictCompressor with Lazy-wrapped
final fields in the corresponding classes. This way it doesn't get initialized if it's not used. This
is the case for routers since it was initialized but never used. Failure to initialize the field
will render the entire ZstdWithDictCompressor class unable to initialize.
xunyin8 authored Feb 12, 2024
1 parent db39ef3 commit 2e5dc16
Showing 5 changed files with 47 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -203,6 +203,7 @@ public class VenicePushJob implements AutoCloseable {

// Lazy state
private final Lazy<Properties> sslProperties;
private final Lazy<ByteBuffer> emptyPushZSTDDictionary;
private VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter;
/** TODO: refactor to use {@link Lazy} */

@@ -289,6 +290,8 @@ public VenicePushJob(String jobId, Properties vanillaProps) {
LOGGER.info("Push job heartbeat is NOT enabled.");
this.pushJobHeartbeatSenderFactory = new NoOpPushJobHeartbeatSenderFactory();
}
emptyPushZSTDDictionary =
Lazy.of(() -> ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData()));
}

// This is a part of the public API. There is value in exposing this to users of VenicePushJob for reporting purposes
@@ -1530,7 +1533,7 @@ private Optional<ByteBuffer> getCompressionDictionary() throws VeniceException {
LOGGER.info(
"compression strategy is {} with no input records: Generating dictionary from synthetic data",
pushJobSetting.storeCompressionStrategy);
compressionDictionary = ZstdWithDictCompressor.EMPTY_PUSH_ZSTD_DICTIONARY;
compressionDictionary = emptyPushZSTDDictionary.get();
return Optional.of(compressionDictionary);
}

Original file line number Diff line number Diff line change
@@ -23,19 +23,19 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class ZstdWithDictCompressor extends VeniceCompressor {
private static final Logger LOGGER = LogManager.getLogger(ZstdWithDictCompressor.class);
private final CloseableThreadLocal<ZstdCompressCtx> compressor;
private final CloseableThreadLocal<ZstdDecompressCtx> decompressor;
private final ZstdDictCompress dictCompress;
private final ZstdDictDecompress dictDecompress;
private final byte[] dictionary;
private final int level;

public static final ByteBuffer EMPTY_PUSH_ZSTD_DICTIONARY =
ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData());

public ZstdWithDictCompressor(final byte[] dictionary, int level) {
super(CompressionStrategy.ZSTD_WITH_DICT);
this.dictionary = dictionary;
@@ -173,25 +173,30 @@ private void validateActualDecompressedSize(int actual, int expected) {
* @return a zstd compression dictionary trained on small amount of avro data
*/
public static byte[] buildDictionaryOnSyntheticAvroData() {
AvroSerializer<Object> serializer = new AvroSerializer<>(FakeCompressingSchema.getClassSchema());
// Insert fake records. We need to generate at least some data for the
// dictionary as failing to do so will result in the library throwing
// an exception (it's only able to generate a dictionary with a minimum threshold of test data).
// So we train on a small amount of basic avro data to
// at least gain some partial effectiveness.
List<byte[]> values = new ArrayList<>(50);
for (int i = 0; i < 50; ++i) {
GenericRecord value = new GenericData.Record(FakeCompressingSchema.getClassSchema());
value.put("id", i);
String name = i + "_name";
value.put("name", name);
values.add(i, serializer.serialize(value));
}
ZstdDictTrainer trainer = new ZstdDictTrainer(200 * BYTES_PER_MB, 100 * BYTES_PER_KB);
for (byte[] value: values) {
trainer.addSample(value);
try {
AvroSerializer<Object> serializer = new AvroSerializer<>(FakeCompressingSchema.getClassSchema());
// Insert fake records. We need to generate at least some data for the
// dictionary as failing to do so will result in the library throwing
// an exception (it's only able to generate a dictionary with a minimum threshold of test data).
// So we train on a small amount of basic avro data to
// at least gain some partial effectiveness.
List<byte[]> values = new ArrayList<>(50);
for (int i = 0; i < 50; ++i) {
GenericRecord value = new GenericData.Record(FakeCompressingSchema.getClassSchema());
value.put("id", i);
String name = i + "_name";
value.put("name", name);
values.add(i, serializer.serialize(value));
}
ZstdDictTrainer trainer = new ZstdDictTrainer(200 * BYTES_PER_MB, 100 * BYTES_PER_KB);
for (byte[] value: values) {
trainer.addSample(value);
}
return trainer.trainSamples();
} catch (Throwable throwable) {
LOGGER.error("Caught throwable while trying to build dictionary on synthetic data", throwable);
throw throwable;
}
return trainer.trainSamples();
}

@Override
Original file line number Diff line number Diff line change
@@ -472,7 +472,8 @@ private Properties createAndPushStore(String clusterName, String storeName) thro
UpdateStoreQueryParams updateStoreQueryParams =
new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
.setHybridRewindSeconds(TEST_TIMEOUT)
.setHybridOffsetLagThreshold(2L);
.setHybridOffsetLagThreshold(2L)
.setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT);
IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, updateStoreQueryParams)
.close();

Original file line number Diff line number Diff line change
@@ -415,6 +415,8 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
private final Object LIVENESS_HEARTBEAT_CLIENT_LOCK = new Object();
private AvroSpecificStoreClient<BatchJobHeartbeatKey, BatchJobHeartbeatValue> livenessHeartbeatStoreClient = null;

private final Lazy<ByteBuffer> emptyPushZSTDDictionary;

public VeniceHelixAdmin(
VeniceControllerMultiClusterConfig multiClusterConfigs,
MetricsRepository metricsRepository,
@@ -683,6 +685,8 @@ public void handleDeletedInstances(Set<Instance> deletedInstances) {
}
});
}
emptyPushZSTDDictionary =
Lazy.of(() -> ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData()));
}

private VeniceProperties getPubSubSSLPropertiesFromControllerConfig(String pubSubBootstrapServers) {
@@ -2613,7 +2617,7 @@ private Pair<Boolean, Version> addVersion(
// so we generate a dictionary based on synthetic data. This is done in vpj driver
// as well, but this code will be triggered in cases like Samza batch push job
// which is independent of the vpj flow.
compressionDictionaryBuffer = ZstdWithDictCompressor.EMPTY_PUSH_ZSTD_DICTIONARY;
compressionDictionaryBuffer = emptyPushZSTDDictionary.get();
}

final Version finalVersion = version;
Original file line number Diff line number Diff line change
@@ -28,8 +28,8 @@
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -209,7 +209,15 @@ public DictionaryRetrievalService(
continue;
}

downloadDictionaries(Arrays.asList(kafkaTopic));
try {
downloadDictionaries(Collections.singletonList(kafkaTopic));
} catch (Throwable throwable) {
// Catch throwable so the thread doesn't die when encountering issues with one store/dictionary.
LOGGER.error(
"Caught a throwable while trying to fetch dictionary for store version: {}. Will not retry",
kafkaTopic,
throwable);
}
}
};

0 comments on commit 2e5dc16

Please sign in to comment.