Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reverse merge #8

Merged
merged 102 commits into from
May 1, 2024
Merged
Changes from 1 commit
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
df38aa6
swagger-ui upgrade to 5.15.0 Fixes (#12908)
deepthi912 Apr 12, 2024
4c51472
Bump javax.servlet.jsp:javax.servlet.jsp-api from 2.2 to 2.3.3 (#12919)
dependabot[bot] Apr 12, 2024
2c88d09
Bump org.apache.maven.plugins:maven-gpg-plugin from 3.2.2 to 3.2.3 (#…
dependabot[bot] Apr 12, 2024
2127558
Bump io.github.hakky54:sslcontext-kickstart-for-netty (#12917)
dependabot[bot] Apr 12, 2024
2c22980
Bump it.unimi.dsi:fastutil from 8.2.3 to 8.5.13 (#12916)
dependabot[bot] Apr 12, 2024
94b2f3f
Bump aws.sdk.version from 2.25.29 to 2.25.30 (#12914)
dependabot[bot] Apr 12, 2024
f86928d
Bump com.mycila:license-maven-plugin from 4.2 to 4.3 (#12912)
dependabot[bot] Apr 12, 2024
159aca6
Bump org.codehaus.mojo:exec-maven-plugin from 3.1.0 to 3.2.0 (#12911)
dependabot[bot] Apr 12, 2024
4040a19
fix(build): update node version to 16 (#12924)
jayeshchoudhary Apr 15, 2024
848fe9c
Added PR compatability test against release 1.1.0 (#12921)
abhioncbr Apr 15, 2024
2c6a84b
Improved segment build time for Lucene text index realtime to offline…
itschrispeck Apr 15, 2024
c823430
Bump io.netty:netty-bom from 4.1.108.Final to 4.1.109.Final (#12929)
dependabot[bot] Apr 15, 2024
af8fd40
Using local copy of segment instead of downloading from remote (#12863)
swaminathanmanish Apr 15, 2024
2459cfc
Bump slf4j.version from 2.0.12 to 2.0.13 (#12928)
dependabot[bot] Apr 15, 2024
684cece
Bump org.apache.maven.plugins:maven-jar-plugin from 3.3.0 to 3.4.0 (#…
dependabot[bot] Apr 15, 2024
013435a
Bump aws.sdk.version from 2.25.30 to 2.25.31 (#12926)
dependabot[bot] Apr 15, 2024
edf9d53
Specify version for commons-validator (#12935)
Jackie-Jiang Apr 15, 2024
c08ba2c
Bump org.apache.maven.scm:maven-scm-provider-gitexe from 2.0.1 to 2.1…
dependabot[bot] Apr 16, 2024
ec452a4
Refine PeerServerSegmentFinder (#12933)
Jackie-Jiang Apr 16, 2024
3c45469
Update superset docker build script (#12385)
xiangfu0 Apr 16, 2024
1393750
Allow Server throttling just before executing queries on server to al…
deepthi912 Apr 16, 2024
9b4ec33
Bump org.scala-lang.modules:scala-xml_2.12 from 1.3.0 to 2.3.0 (#12939)
dependabot[bot] Apr 16, 2024
b264512
Bump org.codehaus.mojo:buildnumber-maven-plugin from 1.3 to 3.2.0 (#1…
xiangfu0 Apr 16, 2024
d4cb93d
Fix metric rule pattern regex (#12856)
shounakmk219 Apr 16, 2024
67cb52c
Refine SegmentFetcherFactory (#12936)
Jackie-Jiang Apr 16, 2024
1d807df
Add validation check for forward index disabled if it's a REALTIME ta…
jackjlli Apr 17, 2024
7dbc345
Move PinotRelExchangeType back to the original package to fix backwar…
Jackie-Jiang Apr 17, 2024
2f3db6e
TLS Port for Minion (#12943)
suddendust Apr 17, 2024
263f4f6
Fix a typo when calculating query freshness (#12947)
Jackie-Jiang Apr 17, 2024
af2fcb7
Enhance ProtoSerializationUtils to handle class move (#12946)
Jackie-Jiang Apr 17, 2024
3b46d2c
Bump org.apache.commons:commons-text from 1.11.0 to 1.12.0 (#12950)
dependabot[bot] Apr 17, 2024
d206f12
Allowing users to pass minionInstanceTag as a param in /tasks/schedul…
tibrewalpratik17 Apr 17, 2024
2a38d14
Bump org.apache:apache from 31 to 32 (#12952)
dependabot[bot] Apr 17, 2024
ece96d3
Bump aws.sdk.version from 2.25.31 to 2.25.32 (#12951)
dependabot[bot] Apr 17, 2024
07d50fb
Isolate bad server configs during broker startup phase (#12931)
lnbest0707-uber Apr 18, 2024
ca0d381
make reflection calls compatible with 0.9.11 (#12958)
jasperjiaguo Apr 18, 2024
02b1e3d
add some tests on jsonPathString (#12954)
gortiz Apr 18, 2024
e057129
Enable complexType handling in SegmentProcessFramework (#12942)
swaminathanmanish Apr 18, 2024
8d6bcec
Bump aws.sdk.version from 2.25.32 to 2.25.33 (#12962)
dependabot[bot] Apr 18, 2024
f34abb4
Bump commons-cli:commons-cli from 1.6.0 to 1.7.0 (#12963)
dependabot[bot] Apr 18, 2024
a843ad4
Upgrade ORC version to 1.9.3 (#12956)
abhioncbr Apr 18, 2024
022e0a0
Add ability to track filtered messages offset (#12602)
tibrewalpratik17 Apr 18, 2024
6a5739f
Enhance PulsarConsumerTest (#12948)
Jackie-Jiang Apr 18, 2024
7a4c0b8
upgrade maven-shade-plugin version to 3.5.2 (#12712)
xiangfu0 Apr 18, 2024
dbbf44c
Add splitPartWithLimit and splitPartFromEnd UDFs (#12437)
deemoliu Apr 18, 2024
ea60408
hash4j version upgrade to 0.17.0 (#12968)
abhioncbr Apr 19, 2024
5b90c65
Bump moment in /pinot-controller/src/main/resources (#9030)
dependabot[bot] Apr 19, 2024
5e8428c
Bump terser from 4.8.0 to 4.8.1 in /pinot-controller/src/main/resourc…
dependabot[bot] Apr 19, 2024
31d2ee8
Bump json5 from 1.0.1 to 1.0.2 in /pinot-controller/src/main/resource…
dependabot[bot] Apr 19, 2024
da68236
Bump net.openhft:posix from 2.23.2 to 2.25ea0 (#12828)
dependabot[bot] Apr 19, 2024
76eebc2
Bump net.openhft:chronicle-core from 2.25ea13 to 2.25ea14 (#12971)
dependabot[bot] Apr 19, 2024
fe63a02
Bump org.apache.maven.plugins:maven-gpg-plugin from 3.2.3 to 3.2.4 (#…
dependabot[bot] Apr 19, 2024
bebb491
Bump aws.sdk.version from 2.25.33 to 2.25.34 (#12975)
dependabot[bot] Apr 19, 2024
d840413
reduce logging for SpecialValueTransformer (#12970)
aishikbh Apr 19, 2024
e1b0e53
Refactor PinotTaskManager class (#12964)
tibrewalpratik17 Apr 19, 2024
f83e466
Bump org.roaringbitmap:RoaringBitmap from 1.0.5 to 1.0.6 (#12985)
dependabot[bot] Apr 22, 2024
7b68aa3
Bump aws.sdk.version from 2.25.34 to 2.25.35 (#12984)
dependabot[bot] Apr 22, 2024
c9d513a
Bump org.apache.maven.plugins:maven-jar-plugin from 3.4.0 to 3.4.1 (#…
dependabot[bot] Apr 22, 2024
a852c8a
Update ORC and Hive dependency versions in the license binary file (#…
yashmayya Apr 22, 2024
a5c728f
Add back profile for shade (#12979)
xiangfu0 Apr 22, 2024
8e10320
handle absent segments so that catchup checker doesn't get stuck on t…
klsince Apr 22, 2024
dd4f0ac
Bump org.jline:jline from 3.24.1 to 3.26.0 (#12991)
dependabot[bot] Apr 23, 2024
40cf5a7
Bump aws.sdk.version from 2.25.35 to 2.25.36 (#12990)
dependabot[bot] Apr 23, 2024
0caeccf
Bump org.webjars:swagger-ui from 5.15.0 to 5.17.0 (#12989)
dependabot[bot] Apr 23, 2024
36c4b9a
Add Prefix, Suffix and Ngram UDFs (#12392)
deemoliu Apr 23, 2024
bc9e8ee
Upgrade Pulsar to 3.2.2 (#12967)
Jackie-Jiang Apr 24, 2024
33b8c88
Bump org.apache.maven.plugins:maven-shade-plugin from 3.5.2 to 3.5.3 …
dependabot[bot] Apr 24, 2024
cb16cd7
Bump com.github.luben:zstd-jni from 1.5.6-2 to 1.5.6-3 (#12999)
dependabot[bot] Apr 24, 2024
d602ffd
Bump aws.sdk.version from 2.25.36 to 2.25.37 (#12994)
dependabot[bot] Apr 24, 2024
5adb02f
Bump com.azure:azure-storage-file-datalake from 12.18.3 to 12.18.4 (#…
dependabot[bot] Apr 24, 2024
73f1620
Bump org.jline:jline from 3.26.0 to 3.26.1 (#12997)
dependabot[bot] Apr 24, 2024
2ca6666
Pull pulsar version definitaion into root POM (#13002)
Jackie-Jiang Apr 24, 2024
099a86c
Add schema as input to the decoder. (#12981)
rseetham Apr 24, 2024
99a4180
avoid useless intermediate byte array allocation for VarChunkV4Reader…
wirybeaver Apr 25, 2024
49da798
Upgrade scala maven plugin to 4.9.0 (#13007)
abhioncbr Apr 25, 2024
3f0b748
Bump aws.sdk.version from 2.25.37 to 2.25.38 (#13006)
dependabot[bot] Apr 25, 2024
84a4c70
Re-enable the Spotless plugin for Java 21 (#12992)
yashmayya Apr 25, 2024
fc98ce1
Use ArrayList instead of LinkedList in SortOperator (#12783)
gortiz Apr 25, 2024
97a2e6d
fix TextMatchFilterOperator boolean grouping (#13009)
itschrispeck Apr 25, 2024
2fb30c0
Add some multi-stage metrics (#12982)
gortiz Apr 25, 2024
571214d
Metric for count of tables configured with various tier backends (#12…
shounakmk219 Apr 26, 2024
e9cba49
Bump aws.sdk.version from 2.25.38 to 2.25.39 (#13012)
dependabot[bot] Apr 26, 2024
cb68783
Bump circe.version from 0.14.6 to 0.14.7 (#13013)
dependabot[bot] Apr 26, 2024
5fc89ce
Support NOT in StarTree Index (#12988)
Jackie-Jiang Apr 26, 2024
0be51ca
Allow apply both environment variables and system properties to user …
xiangfu0 Apr 27, 2024
fc967d0
Bump org.testng:testng from 7.10.1 to 7.10.2 (#13021)
dependabot[bot] Apr 29, 2024
2a7f320
Bump aws.sdk.version from 2.25.39 to 2.25.40 (#13022)
dependabot[bot] Apr 29, 2024
e2cadfa
Bump com.google.errorprone:error_prone_annotations from 2.26.1 to 2.2…
dependabot[bot] Apr 29, 2024
bbf63c7
Bump org.apache.datasketches:datasketches-java from 5.0.2 to 6.0.0 (#…
dependabot[bot] Apr 29, 2024
bdfb34a
Bump commons-codec:commons-codec from 1.16.1 to 1.17.0 (#13025)
dependabot[bot] Apr 29, 2024
14651a2
Bump com.puppycrawl.tools:checkstyle from 10.15.0 to 10.16.0 (#13027)
dependabot[bot] Apr 29, 2024
7b06b9f
Issue #12367 (#12922)
aditya0811 Apr 29, 2024
475708f
Use try-with-resources to close file walk stream in LocalPinotFS (#13…
yashmayya Apr 30, 2024
7413e99
Upgrade s3mock to 2.17.0 (#13028)
Jackie-Jiang Apr 30, 2024
ea0c71b
Bump org.scala-lang:scala-library from 2.11.11 to 2.11.12 and from 2.…
dependabot[bot] Apr 30, 2024
f153011
Upgrade jna to version 5.14.0 for Mac M1/M2 local execution support (…
abhioncbr Apr 30, 2024
087fca3
Ensure all the lists used in PinotQuery are ArrayList (#13017)
Jackie-Jiang Apr 30, 2024
bf28a83
Use more efficient variants of URLEncoder::encode and URLDecoder::dec…
yashmayya May 1, 2024
ad70686
Enhancement: Sketch value aggregator performance (#13020)
davecromberge May 1, 2024
0f28a5c
fix merging null multi value in partial upsert (#13031)
rohityadav1993 May 1, 2024
c8b223f
Upgrade lucene to 9.10.0 and compatibility changes to code. (#12866)
abhioncbr May 1, 2024
5d1dc73
log the log rate limiter rate for dropped broker logs (#13041)
jadami10 May 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refine SegmentFetcherFactory (apache#12936)
  • Loading branch information
Jackie-Jiang authored Apr 16, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 67cb52c04b5a2e81bfe26f4878e6782422f8f1c1
Original file line number Diff line number Diff line change
@@ -18,61 +18,45 @@
*/
package org.apache.pinot.common.utils.fetcher;

import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.pinot.common.auth.AuthConfig;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.crypt.PinotCrypter;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class SegmentFetcherFactory {
private final static SegmentFetcherFactory INSTANCE = new SegmentFetcherFactory();

static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
private static final String PROTOCOLS_KEY = "protocols";
private static final String ENCODED_SUFFIX = ".enc";
private static final String AUTH_KEY = CommonConstants.KEY_OF_AUTH;

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentFetcherFactory.class);
private static final Random RANDOM = new Random();

private final Map<String, SegmentFetcher> _segmentFetcherMap = new HashMap<>();
private final SegmentFetcher _httpSegmentFetcher = new HttpSegmentFetcher();
private final SegmentFetcher _pinotFSSegmentFetcher = new PinotFSSegmentFetcher();

private SegmentFetcherFactory() {
// left blank
}

public static SegmentFetcherFactory getInstance() {
return INSTANCE;
}
public static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
public static final String PROTOCOLS_KEY = "protocols";
public static final String ENCODED_SUFFIX = ".enc";

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentFetcherFactory.class);
private static final Map<String, SegmentFetcher> SEGMENT_FETCHER_MAP = new HashMap<>();
private static final SegmentFetcher HTTP_SEGMENT_FETCHER = new HttpSegmentFetcher();
private static final SegmentFetcher PINOT_FS_SEGMENT_FETCHER = new PinotFSSegmentFetcher();

/**
* Initializes the segment fetcher factory. This method should only be called once.
*/
public static void init(PinotConfiguration config)
throws Exception {
getInstance().initInternal(config);
}

private void initInternal(PinotConfiguration config)
throws Exception {
_httpSegmentFetcher.init(config); // directly, without sub-namespace
_pinotFSSegmentFetcher.init(config); // directly, without sub-namespace
HTTP_SEGMENT_FETCHER.init(config); // directly, without sub-namespace
PINOT_FS_SEGMENT_FETCHER.init(config); // directly, without sub-namespace

List<String> protocols = config.getProperty(PROTOCOLS_KEY, Collections.emptyList());
for (String protocol : protocols) {
@@ -93,22 +77,22 @@ private void initInternal(PinotConfiguration config)
}
} else {
LOGGER.info("Creating segment fetcher for protocol: {} with class: {}", protocol, segmentFetcherClassName);
segmentFetcher = (SegmentFetcher) Class.forName(segmentFetcherClassName).newInstance();
segmentFetcher = (SegmentFetcher) Class.forName(segmentFetcherClassName).getConstructor().newInstance();
}

AuthConfig authConfig = AuthProviderUtils.extractAuthConfig(config, AUTH_KEY);

PinotConfiguration subConfig = config.subset(protocol);
AuthConfig subAuthConfig = AuthProviderUtils.extractAuthConfig(subConfig, AUTH_KEY);
Map<String, Object> subConfigMap = subConfig.toMap();

Map<String, Object> subConfigMap = config.subset(protocol).toMap();
// Put global auth properties into sub-config if sub-config does not have auth properties
AuthConfig authConfig = AuthProviderUtils.extractAuthConfig(config, CommonConstants.KEY_OF_AUTH);
AuthConfig subAuthConfig = AuthProviderUtils.extractAuthConfig(subConfig, CommonConstants.KEY_OF_AUTH);
if (subAuthConfig.getProperties().isEmpty() && !authConfig.getProperties().isEmpty()) {
authConfig.getProperties().forEach((key, value) -> subConfigMap.put(AUTH_KEY + "." + key, value));
authConfig.getProperties()
.forEach((key, value) -> subConfigMap.put(CommonConstants.KEY_OF_AUTH + "." + key, value));
}

segmentFetcher.init(new PinotConfiguration(subConfigMap));

_segmentFetcherMap.put(protocol, segmentFetcher);
SEGMENT_FETCHER_MAP.put(protocol, segmentFetcher);
}
}

@@ -117,21 +101,17 @@ private void initInternal(PinotConfiguration config)
* ({@link HttpSegmentFetcher} for "http" and "https", {@link PinotFSSegmentFetcher} for other protocols).
*/
public static SegmentFetcher getSegmentFetcher(String protocol) {
return getInstance().getSegmentFetcherInternal(protocol);
}

private SegmentFetcher getSegmentFetcherInternal(String protocol) {
SegmentFetcher segmentFetcher = _segmentFetcherMap.get(protocol);
SegmentFetcher segmentFetcher = SEGMENT_FETCHER_MAP.get(protocol);
if (segmentFetcher != null) {
return segmentFetcher;
} else {
LOGGER.info("Segment fetcher is not configured for protocol: {}, using default", protocol);
switch (protocol) {
case CommonConstants.HTTP_PROTOCOL:
case CommonConstants.HTTPS_PROTOCOL:
return _httpSegmentFetcher;
return HTTP_SEGMENT_FETCHER;
default:
return _pinotFSSegmentFetcher;
return PINOT_FS_SEGMENT_FETCHER;
}
}
}
@@ -141,21 +121,15 @@ private SegmentFetcher getSegmentFetcherInternal(String protocol) {
*/
public static void fetchSegmentToLocal(URI uri, File dest)
throws Exception {
getInstance().fetchSegmentToLocalInternal(uri, dest);
getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
}

/**
* Fetches a segment from URI location to local.
*/
public static void fetchSegmentToLocal(String uri, File dest)
throws Exception {
getInstance().fetchSegmentToLocalInternal(new URI(uri), dest);
}

private void fetchSegmentToLocalInternal(URI uri, File dest)
throws Exception {
// caller untars
getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
fetchSegmentToLocal(new URI(uri), dest);
}

/**
@@ -167,36 +141,25 @@ private void fetchSegmentToLocalInternal(URI uri, File dest)
* @return the untared directory
* @throws Exception
*/
public static File fetchAndStreamUntarToLocal(String uri, File tempRootDir,
long maxStreamRateInByte, AtomicInteger attempts)
public static File fetchAndStreamUntarToLocal(URI uri, File tempRootDir, long maxStreamRateInByte,
AtomicInteger attempts)
throws Exception {
return getInstance().fetchAndStreamUntarToLocalInternal(new URI(uri), tempRootDir, maxStreamRateInByte, attempts);
return getSegmentFetcher(uri.getScheme()).fetchUntarSegmentToLocalStreamed(uri, tempRootDir, maxStreamRateInByte,
attempts);
}

private File fetchAndStreamUntarToLocalInternal(URI uri, File tempRootDir,
long maxStreamRateInByte, AtomicInteger attempts)
public static File fetchAndStreamUntarToLocal(String uri, File tempRootDir, long maxStreamRateInByte,
AtomicInteger attempts)
throws Exception {
return getSegmentFetcher(uri.getScheme()).fetchUntarSegmentToLocalStreamed(uri, tempRootDir, maxStreamRateInByte,
attempts);
return fetchAndStreamUntarToLocal(new URI(uri), tempRootDir, maxStreamRateInByte, attempts);
}

/**
* Fetches a segment from a URI location to a local file and decrypts it if needed
* @param uri remote segment location
* @param dest local file
*/
public static void fetchAndDecryptSegmentToLocal(String uri, File dest, String crypterName)
throws Exception {
getInstance().fetchAndDecryptSegmentToLocalInternal(uri, dest, crypterName);
}

// uris have equal weight to be selected for segment download
public static void fetchAndDecryptSegmentToLocal(List<URI> uris, File dest, String crypterName)
throws Exception {
getInstance().fetchAndDecryptSegmentToLocalInternal(uris, dest, crypterName);
}

private void fetchAndDecryptSegmentToLocalInternal(String uri, File dest, String crypterName)
public static void fetchAndDecryptSegmentToLocal(String uri, File dest, @Nullable String crypterName)
throws Exception {
if (crypterName == null) {
fetchSegmentToLocal(uri, dest);
@@ -211,16 +174,16 @@ private void fetchAndDecryptSegmentToLocalInternal(String uri, File dest, String
}
}

private void fetchAndDecryptSegmentToLocalInternal(@NonNull List<URI> uris, File dest, String crypterName)
throws Exception {
Preconditions.checkArgument(!uris.isEmpty(), "empty uris passed into the fetchAndDecryptSegmentToLocalInternal");
URI uri = uris.get(RANDOM.nextInt(uris.size()));
public static void fetchAndDecryptSegmentToLocal(String segmentName, String scheme, Supplier<List<URI>> uriSupplier,
File dest, @Nullable String crypterName)
throws Exception {
SegmentFetcher segmentFetcher = getSegmentFetcher(scheme);
if (crypterName == null) {
fetchSegmentToLocal(uri, dest);
segmentFetcher.fetchSegmentToLocal(segmentName, uriSupplier, dest);
} else {
// download
File tempDownloadedFile = new File(dest.getPath() + ENCODED_SUFFIX);
fetchSegmentToLocal(uri, tempDownloadedFile);
segmentFetcher.fetchSegmentToLocal(segmentName, uriSupplier, tempDownloadedFile);

// decrypt
PinotCrypter crypter = PinotCrypterFactory.create(crypterName);
Original file line number Diff line number Diff line change
@@ -693,27 +693,22 @@ File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File t
}
}

// not thread safe. Caller should invoke it with safe concurrency control.
protected void downloadFromPeersWithoutStreaming(String segmentName, SegmentZKMetadata zkMetadata, File destTarFile)
throws Exception {
Preconditions.checkState(_peerDownloadScheme != null, "Download peers require non null peer download scheme");
List<URI> peerSegmentURIs =
PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName, _peerDownloadScheme);
if (peerSegmentURIs.isEmpty()) {
String msg = String.format("segment %s doesn't have any peers", segmentName);
LOGGER.warn(msg);
// HelixStateTransitionHandler would catch the runtime exception and mark the segment state as Error
throw new RuntimeException(msg);
}
Preconditions.checkState(_peerDownloadScheme != null, "Peer download is not enabled for table: %s",
_tableNameWithType);
try {
// Next download the segment from a randomly chosen server using configured scheme.
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(peerSegmentURIs, destTarFile, zkMetadata.getCrypterName());
LOGGER.info("Fetched segment {} from peers: {} to: {} of size: {}", segmentName, peerSegmentURIs, destTarFile,
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, _peerDownloadScheme, () -> {
List<URI> peerServerURIs =
PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName,
_peerDownloadScheme);
Collections.shuffle(peerServerURIs);
return peerServerURIs;
}, destTarFile, zkMetadata.getCrypterName());
_logger.info("Downloaded tarred segment: {} from peers to: {}, file length: {}", segmentName, destTarFile,
destTarFile.length());
} catch (AttemptsExceededException e) {
LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from peers {} to: {}", segmentName,
_tableNameWithType, peerSegmentURIs, destTarFile);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES, 1L);
} catch (Exception e) {
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES, 1);
throw e;
}
}
Original file line number Diff line number Diff line change
@@ -38,7 +38,6 @@
import org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -647,26 +646,6 @@ public void testDownloadAndDecryptPeerDownload()
verify(tmgr, times(1)).downloadFromPeersWithoutStreaming("seg01", zkmd, destFile);
}

// happy case: download from peers
@Test
public void testDownloadFromPeersWithoutStreaming()
throws Exception {
URI uri = mockRemoteCopy();
InstanceDataManagerConfig config = createDefaultInstanceDataManagerConfig();
when(config.getSegmentPeerDownloadScheme()).thenReturn("http");
HelixManager helixManager = mock(HelixManager.class);
BaseTableDataManager tmgr = createTableManager(config, helixManager);
File tempRootDir = tmgr.getTmpSegmentDataDir("test-download-peer-without-streaming");
File destFile = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
try (MockedStatic<PeerServerSegmentFinder> mockPeerSegFinder = mockStatic(PeerServerSegmentFinder.class)) {
mockPeerSegFinder.when(
() -> PeerServerSegmentFinder.getPeerServerURIs(helixManager, TABLE_NAME_WITH_TYPE, "seg01",
CommonConstants.HTTP_PROTOCOL)).thenReturn(List.of(uri));
tmgr.downloadFromPeersWithoutStreaming("seg01", mock(SegmentZKMetadata.class), destFile);
}
assertEquals(FileUtils.readFileToString(destFile), "this is from somewhere remote");
}

@Test
public void testUntarAndMoveSegment()
throws IOException {