Merge branch 'linkedin:main' into rocksDBName
majisourav99 authored Feb 11, 2025
2 parents c153a83 + 4d5eb2f commit d82c642
Showing 180 changed files with 3,982 additions and 1,360 deletions.
9 changes: 1 addition & 8 deletions .github/dependabot.yml
@@ -1,17 +1,10 @@
version: 2
registries:
# Helps find updates for non-Maven-Central dependencies
duckdb-snapshots:
type: maven-repository
url: https://oss.sonatype.org/content/repositories/snapshots/

updates:
- package-ecosystem: "gradle"
directory: "/"
schedule:
interval: "daily"
registries:
- duckdb-snapshots
# Automatically update these dependencies
allow:
- dependency-name: "org.duckdb:duckdb_jdbc"
- dependency-name: "org.duckdb:duckdb_jdbc"
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
@@ -4,7 +4,7 @@ Add a list of affected components in the PR title in the following format:
Valid component tags are: [da-vinci] (or [dvc]), [server], [controller], [router], [samza],
[vpj], [fast-client] (or [fc]), [thin-client] (or [tc]), [changelog] (or [cc]),
[pulsar-sink], [producer], [admin-tool], [test], [build], [doc], [script], [compat]
[pulsar-sink], [producer], [admin-tool], [test], [build], [doc], [script], [compat], [protocol]
Example title: [server][da-vinci] Use dedicated thread to persist data to storage engine
108 changes: 108 additions & 0 deletions .github/workflows/limit-pr-files-changed.yml
@@ -0,0 +1,108 @@
name: Enforce Max Lines Changed Per File

on:
pull_request:
paths:
- '**/*.java' # Monitor changes to Java files
- '**/*.avsc' # Monitor changes to Avro schema files
- '**/*.proto' # Monitor changes to proto schema files

jobs:
enforce-lines-changed:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Fetch PR Title via GitHub API
id: pr_details
run: |
PR_NUMBER=${{ github.event.pull_request.number }}
PR_TITLE=$(curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/${{ github.repository }}/pulls/$PR_NUMBER" | jq -r '.title')
echo "pr_title=$PR_TITLE" >> $GITHUB_ENV
PR_DESCRIPTION=$(curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/${{ github.repository }}/pulls/$PR_NUMBER" | jq -r '.body')
{
echo "pr_description<<EOF"
echo "$PR_DESCRIPTION"
echo "EOF"
} >> "$GITHUB_ENV"
- name: Check for Override Keyword
id: check_override
run: |
# Define the keyword for override
OVERRIDE_KEYWORD="DIFFSIZEOVERRIDE"
# Check PR title and body for the keyword
if printf "%s%s" "$pr_title" "$pr_description" | grep -iq "$OVERRIDE_KEYWORD"; then
echo "Override keyword found. Skipping validation."
echo "override=true" >> $GITHUB_OUTPUT
else
echo "override=false" >> $GITHUB_OUTPUT
fi
- name: Calculate Lines Changed Per File
if: ${{ steps.check_override.outputs.override != 'true' }}
id: lines_changed_per_file
run: |
# Define the maximum allowed lines changed per file
MAX_LINES=500
MAX_TOTAL_LINES_ADDED=2000
TOTAL_LINES_ADDED=0
# Get the diff of the PR and process each file
EXCEEDED=false
JAVA_FILE=false
SCHEMA_FILE=false
while IFS=$'\t' read -r added removed file; do
# Skip files outside src/main (e.g., test files)
if [[ "$file" != *src/main* ]]; then
echo "Skipping file: $file"
continue
fi
if [[ "$file" == *.java ]]; then
JAVA_FILE=true
else
SCHEMA_FILE=true
fi
# Calculate total lines changed for the file
TOTAL=$((added + removed))
TOTAL_LINES_ADDED=$((TOTAL_LINES_ADDED + added))
echo "File: $file, Lines Changed: $TOTAL"
# Fail if there are both schema and Java file changes
if [[ "JAVA_FILE" == true && "SCHEMA_FILE" == true ]]; then
echo "The PR has both schema and Java code changes, please make a separate PR for schema changes."
echo "For schema file changes, please update build.gradle to update 'versionOverrides' inside compileAvro task to use fixed protocol version"
exit 1
fi
if [[ "$TOTAL" -gt "$MAX_LINES" && "$JAVA_FILE" == "true" ]]; then
echo "File $file exceeds the maximum allowed lines changed ($TOTAL > $MAX_LINES)"
EXCEEDED=true
fi
done < <(git diff --numstat origin/main | grep -E '\.(java|avsc|proto)$')
# Fail if the total number of added lines exceeds the max limit
if [ "$TOTAL_LINES_ADDED" -gt "$MAX_TOTAL_LINES_ADDED" ]; then
echo "Total added lines across all files exceed the maximum allowed ($TOTAL_LINES_ADDED > $MAX_TOTAL_LINES_ADDED)."
exit 1
fi
# Fail if any individual file exceeds the per-file limit
if [ "$EXCEEDED" = true ]; then
echo "The files listed above exceed the maximum allowed lines changed per file ($MAX_LINES)."
exit 1
fi
- name: Notify
if: failure()
run: |
echo "One or more files in the PR exceed the maximum allowed lines changed. Please review and adjust your changes to your files."
2 changes: 1 addition & 1 deletion .gitignore
@@ -24,4 +24,4 @@ Gemfile.lock
.bundles_cache
docs/vendor/
clients/da-vinci-client/classHash*.txt
integrations/venice-duckdb/classHash*.txt
integrations/venice-duckdb/classHash*.txt
13 changes: 3 additions & 10 deletions all-modules/build.gradle
@@ -2,16 +2,9 @@
// via transitive dependencies instead of enumerating all modules individually.
dependencies {
rootProject.subprojects.each { subproject ->
// Excluding venice-duckdb as it's experimental and causes build issues
if (subproject.path != project.path && subproject.subprojects.isEmpty() && subproject.path != ':integrations:venice-duckdb') {
implementation (project(subproject.path)) {
exclude group: 'org.duckdb'
exclude module: 'venice-duckdb'
}
if (subproject.path != project.path && subproject.subprojects.isEmpty()) {
implementation project(subproject.path)
}
}
implementation (project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils')) {
exclude group: 'org.duckdb'
exclude module: 'venice-duckdb'
}
implementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils')
}
6 changes: 4 additions & 2 deletions build.gradle
@@ -64,6 +64,7 @@ ext.libraries = [
apacheSparkAvro: "org.apache.spark:spark-avro_${scala}:${apacheSparkVersion}",
apacheSparkCore: "org.apache.spark:spark-core_${scala}:${apacheSparkVersion}",
apacheSparkSql: "org.apache.spark:spark-sql_${scala}:${apacheSparkVersion}",
asm: "org.ow2.asm:asm:9.7",
avro: "org.apache.avro:avro:${avroVersion}",
avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}",
avroMapred: "org.apache.avro:avro-mapred:${avroVersion}",
@@ -82,7 +83,7 @@
commonsLang: 'commons-lang:commons-lang:2.6',
conscrypt: 'org.conscrypt:conscrypt-openjdk-uber:2.5.2',
d2: "com.linkedin.pegasus:d2:${pegasusVersion}",
duckdbJdbc: "org.duckdb:duckdb_jdbc:1.2.0-20250130.011706-145", // TODO: Remove SNAPSHOT when the real release is published!
duckdbJdbc: "org.duckdb:duckdb_jdbc:1.2.0",
failsafe: 'net.jodah:failsafe:2.4.0',
fastUtil: 'it.unimi.dsi:fastutil:8.3.0',
grpcNettyShaded: "io.grpc:grpc-netty-shaded:${grpcVersion}",
@@ -291,7 +292,6 @@ subprojects {
implementation libraries.grpcProtobuf
implementation libraries.grpcServices
implementation libraries.grpcStub
implementation 'org.ow2.asm:asm:9.7'
compileOnly libraries.tomcatAnnotations
}

@@ -314,6 +314,8 @@ subprojects {
doFirst {
def versionOverrides = [
// project(':internal:venice-common').file('src/main/resources/avro/StoreVersionState/v5', PathValidation.DIRECTORY)
project(':internal:venice-common').file('src/main/resources/avro/PartitionState/v15', PathValidation.DIRECTORY),
project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v11', PathValidation.DIRECTORY)
]

def schemaDirs = [sourceDir]
3 changes: 2 additions & 1 deletion clients/da-vinci-client/build.gradle
@@ -33,6 +33,7 @@ dependencies {
implementation project(':clients:venice-thin-client')

implementation libraries.avroUtilFastserde
implementation libraries.asm
implementation libraries.caffeine
implementation libraries.fastUtil
implementation libraries.httpAsyncClient
@@ -55,4 +56,4 @@
checkers = ['org.checkerframework.checker.nullness.NullnessChecker']
skipCheckerFramework = true
excludeTests = true
}
}
@@ -126,6 +126,12 @@ public DaVinciBackend(
LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients);
try {
VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig();

if (backendConfig.isDatabaseChecksumVerificationEnabled() && recordTransformerConfig != null) {
// The checksum verification will fail because DVRT transforms the values
throw new VeniceException("DaVinciRecordTransformer cannot be used with database checksum verification.");
}

useDaVinciSpecificExecutionStatusForError = backendConfig.useDaVinciSpecificExecutionStatusForError();
writeBatchingPushStatus = backendConfig.getDaVinciPushStatusCheckIntervalInMs() >= 0;
this.configLoader = configLoader;
@@ -295,10 +301,6 @@ public DaVinciBackend(
}

if (backendConfig.isBlobTransferManagerEnabled()) {
if (recordTransformerConfig != null) {
throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer.");
}

aggVersionedBlobTransferStats =
new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig());

@@ -1,16 +1,12 @@
package com.linkedin.davinci.blobtransfer;

import static com.linkedin.venice.client.store.ClientFactory.getTransportClient;

import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.blobtransfer.DaVinciBlobFinder;
import com.linkedin.venice.blobtransfer.ServerBlobFinder;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
@@ -24,40 +20,14 @@ public class BlobTransferUtil {

/**
* Get a P2P blob transfer manager for DaVinci Client and start it.
* @param p2pTransferPort, the port used by the P2P transfer server and client
* @param p2pTransferServerPort, the port used by the P2P transfer server
* @param p2pTransferClientPort, the port used by the P2P transfer client
* @param baseDir, the base directory of the underlying storage
* @param clientConfig, the client config to start up a transport client
* @param storageMetadataService, the storage metadata service
* @return the blob transfer manager
* @throws Exception
*/
public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
int p2pTransferPort,
String baseDir,
ClientConfig clientConfig,
StorageMetadataService storageMetadataService,
ReadOnlyStoreRepository readOnlyStoreRepository,
StorageEngineRepository storageEngineRepository,
int maxConcurrentSnapshotUser,
int snapshotRetentionTimeInMin,
int blobTransferMaxTimeoutInMin,
AggVersionedBlobTransferStats aggVersionedBlobTransferStats,
BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat) {
return getP2PBlobTransferManagerForDVCAndStart(
p2pTransferPort,
p2pTransferPort,
baseDir,
clientConfig,
storageMetadataService,
readOnlyStoreRepository,
storageEngineRepository,
maxConcurrentSnapshotUser,
snapshotRetentionTimeInMin,
blobTransferMaxTimeoutInMin,
aggVersionedBlobTransferStats,
transferSnapshotTableFormat);
}

public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
int p2pTransferServerPort,
int p2pTransferClientPort,
@@ -79,12 +49,10 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
maxConcurrentSnapshotUser,
snapshotRetentionTimeInMin,
transferSnapshotTableFormat);
AbstractAvroStoreClient storeClient =
new AvroGenericStoreClientImpl<>(getTransportClient(clientConfig), false, clientConfig);
BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new DaVinciBlobFinder(storeClient),
new DaVinciBlobFinder(clientConfig),
baseDir,
aggVersionedBlobTransferStats);
manager.start();
@@ -69,6 +69,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -700,6 +701,11 @@ private VeniceConfigLoader buildVeniceConfig() {
kafkaBootstrapServers = backendConfig.getString(KAFKA_BOOTSTRAP_SERVERS);
}

String recordTransformerOutputValueSchema = "null";
if (daVinciConfig.isRecordTransformerEnabled()) {
recordTransformerOutputValueSchema = Objects.toString(recordTransformerConfig.getOutputValueSchema(), "null");
}

VeniceProperties config = new PropertyBuilder().put(KAFKA_ADMIN_CLASS, ApacheKafkaAdminAdapter.class.getName())
.put(ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER, 4) // RocksDB default config
.put(ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20) // RocksDB default config
@@ -712,11 +718,7 @@
.put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers)
.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK)
.put(INGESTION_USE_DA_VINCI_CLIENT, true)
.put(
RECORD_TRANSFORMER_VALUE_SCHEMA,
daVinciConfig.isRecordTransformerEnabled()
? recordTransformerConfig.getOutputValueSchema().toString()
: "null")
.put(RECORD_TRANSFORMER_VALUE_SCHEMA, recordTransformerOutputValueSchema)
.put(INGESTION_ISOLATION_CONFIG_PREFIX + "." + INGESTION_MEMORY_LIMIT, -1) // Explicitly disable memory limiter
// in Isolated Process
.put(backendConfig.toProperties())
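A note on the refactor above: java.util.Objects.toString(obj, fallback) is null-safe, returning the fallback when obj is null instead of throwing a NullPointerException, which lets the config builder receive a plain string and drops the inline ternary. A minimal sketch (outputValueSchema stands in for recordTransformerConfig.getOutputValueSchema()):

import java.util.Objects;

// Objects.toString(obj, fallback) is equivalent to (obj != null ? obj.toString() : fallback)
String absent = Objects.toString(null, "null"); // -> "null", no NullPointerException
String present = Objects.toString(outputValueSchema, "null"); // -> the Avro schema's JSON when non-null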
@@ -1,6 +1,10 @@
package com.linkedin.davinci.client;

import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.annotation.Experimental;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
@@ -25,8 +29,8 @@ public BlockingDaVinciRecordTransformer(
Schema keySchema,
Schema inputValueSchema,
Schema outputValueSchema,
boolean storeRecordsInDaVinci) {
super(recordTransformer.getStoreVersion(), keySchema, inputValueSchema, outputValueSchema, storeRecordsInDaVinci);
DaVinciRecordTransformerConfig recordTransformerConfig) {
super(recordTransformer.getStoreVersion(), keySchema, inputValueSchema, outputValueSchema, recordTransformerConfig);
this.recordTransformer = recordTransformer;
}

@@ -58,6 +62,19 @@ public void onEndVersionIngestion(int currentVersion) {
this.recordTransformer.onEndVersionIngestion(currentVersion);
}

public void internalOnRecovery(
AbstractStorageEngine storageEngine,
int partitionId,
InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
Lazy<VeniceCompressor> compressor) {
// Using a wrapper around onRecovery because the class hash calculation grabs the name of the class
// that invokes it. If we invoked onRecovery directly from this class, the class hash would be
// calculated from the contents of BlockingDaVinciRecordTransformer, not the user's implementation of DVRT.
// We also can't override onRecovery like the other methods, because that method is final and its
// implementation should never be overridden.
this.recordTransformer.onRecovery(storageEngine, partitionId, partitionStateSerializer, compressor);
}

@Override
public void close() throws IOException {
this.recordTransformer.close();
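To illustrate the caller sensitivity described in the comment above, here is a minimal, hypothetical sketch (not Venice's actual implementation) of a class hash derived from the invoking class; if a wrapper such as BlockingDaVinciRecordTransformer called the hashing code directly, the wrapper, rather than the user's DVRT implementation, would be hashed:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public final class CallerClassHash {
  // Hypothetical: resolves the *calling* class and hashes its name.
  public static String compute() throws Exception {
    Class<?> caller = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).getCallerClass();
    byte[] digest = MessageDigest.getInstance("MD5").digest(caller.getName().getBytes(StandardCharsets.UTF_8));
    StringBuilder hex = new StringBuilder();
    for (byte b : digest) {
      hex.append(String.format("%02x", b)); // two lowercase hex chars per byte
    }
    return hex.toString();
  }
}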