Skip to content

Commit

Permalink
Merge pull request #2982 from atlanhq/PLT-1300-search-cancellation-fr…
Browse files Browse the repository at this point in the history
…om-metastore-with-global-context-cache-using-redis

PLT-1300: search cancellation from metastore with global context cache using redis
  • Loading branch information
sumandas0 authored May 3, 2024
2 parents 5687784 + e129a39 commit 635b921
Show file tree
Hide file tree
Showing 13 changed files with 255 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface MetricsRegistry {

void collect(String requestId, String requestUri, AtlasPerfMetrics metrics);

void collectIndexsearch(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics);
void collectApplicationMetrics(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics);

void scrape(PrintWriter writer) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package org.apache.atlas.service.metrics;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.*;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.utils.AtlasMetricType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,35 +62,42 @@ public void collect(String requestId, String requestUri, AtlasPerfMetrics metric
}
}
//Use this if you want to publish Histograms
public void collectIndexsearch(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics){
public void collectApplicationMetrics(String requestId, String requestUri, List<AtlasPerfMetrics.Metric> applicationMetrics){
try {
for(AtlasPerfMetrics.Metric metric : applicationMetrics){
Timer.builder(APPLICATION_LEVEL_METRICS_SUMMARY)
.serviceLevelObjectives(
Duration.ofMillis(500),
Duration.ofMillis(750),
Duration.ofMillis(1000),
Duration.ofMillis(1200),
Duration.ofMillis(1500),
Duration.ofSeconds(2),
Duration.ofSeconds(3),
Duration.ofSeconds(4),
Duration.ofSeconds(5),
Duration.ofSeconds(7),
Duration.ofSeconds(10),
Duration.ofSeconds(15),
Duration.ofSeconds(20),
Duration.ofSeconds(25),
Duration.ofSeconds(30),
Duration.ofSeconds(40),
Duration.ofSeconds(60),
Duration.ofSeconds(90),
Duration.ofSeconds(120),
Duration.ofSeconds(180)
)
.publishPercentiles(PERCENTILES)
.tags(convertToMicrometerTags(metric.getTags()))
.register(getMeterRegistry()).record(metric.getTotalTimeMSecs(), TimeUnit.MILLISECONDS);
if (metric.getMetricType() == AtlasMetricType.COUNTER) {
Counter.builder(metric.getName())
.tags(convertToMicrometerTags(metric.getTags()))
.register(getMeterRegistry())
.increment(metric.getInvocations());
} else {
Timer.builder(APPLICATION_LEVEL_METRICS_SUMMARY)
.serviceLevelObjectives(
Duration.ofMillis(500),
Duration.ofMillis(750),
Duration.ofMillis(1000),
Duration.ofMillis(1200),
Duration.ofMillis(1500),
Duration.ofSeconds(2),
Duration.ofSeconds(3),
Duration.ofSeconds(4),
Duration.ofSeconds(5),
Duration.ofSeconds(7),
Duration.ofSeconds(10),
Duration.ofSeconds(15),
Duration.ofSeconds(20),
Duration.ofSeconds(25),
Duration.ofSeconds(30),
Duration.ofSeconds(40),
Duration.ofSeconds(60),
Duration.ofSeconds(90),
Duration.ofSeconds(120),
Duration.ofSeconds(180)
)
.publishPercentiles(PERCENTILES)
.tags(convertToMicrometerTags(metric.getTags()))
.register(getMeterRegistry()).record(metric.getTotalTimeMSecs(), TimeUnit.MILLISECONDS);
}
}
} catch (Exception e) {
LOG.error("Failed to collect metrics", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public abstract class AbstractRedisService implements RedisService {
private static final String ATLAS_METASTORE_SERVICE = "atlas-metastore-service";

RedissonClient redisClient;
RedissonClient redisCacheClient;
Map<String, RLock> keyLockMap;
Configuration atlasConfig;
long waitTimeInMS;
Expand Down Expand Up @@ -71,6 +72,25 @@ public void releaseDistributedLock(String key) {
}
}

@Override
public String getValue(String key) {
// If value doesn't exist, return null else return the value
return (String) redisCacheClient.getBucket(convertToNamespace(key)).get();
}

@Override
public String putValue(String key, String value) {
// Put the value in the redis cache with TTL
redisCacheClient.getBucket(convertToNamespace(key)).set(value, 30, TimeUnit.SECONDS);
return value;
}

@Override
public void removeValue(String key) {
// Remove the value from the redis cache
redisCacheClient.getBucket(convertToNamespace(key)).delete();
}

private String getHostAddress() throws UnknownHostException {
return InetAddress.getLocalHost().getHostAddress();
}
Expand All @@ -85,6 +105,11 @@ private Config initAtlasConfig() throws AtlasException {
return redisConfig;
}

private String convertToNamespace(String key){
// Append key with namespace :atlas
return "atlas:"+key;
}

Config getLocalConfig() throws AtlasException {
Config config = initAtlasConfig();
config.useSingleServer()
Expand Down Expand Up @@ -112,6 +137,26 @@ Config getProdConfig() throws AtlasException {
return config;
}

Config getCacheImplConfig() {
Config config = new Config();
config.useSentinelServers()
.setClientName(ATLAS_METASTORE_SERVICE+"-redisCache")
.setReadMode(ReadMode.MASTER_SLAVE)
.setCheckSentinelsList(false)
.setKeepAlive(true)
.setMasterConnectionMinimumIdleSize(10)
.setMasterConnectionPoolSize(20)
.setSlaveConnectionMinimumIdleSize(10)
.setSlaveConnectionPoolSize(20)
.setMasterName(atlasConfig.getString(ATLAS_REDIS_MASTER_NAME))
.addSentinelAddress(formatUrls(atlasConfig.getStringArray(ATLAS_REDIS_SENTINEL_URLS)))
.setUsername(atlasConfig.getString(ATLAS_REDIS_USERNAME))
.setPassword(atlasConfig.getString(ATLAS_REDIS_PASSWORD))
.setTimeout(50) //Setting UP timeout to 50ms
.setRetryAttempts(0);
return config;
}

private String[] formatUrls(String[] urls) throws IllegalArgumentException {
if (ArrayUtils.isEmpty(urls)) {
getLogger().error("Invalid redis cluster urls");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ public void releaseDistributedLock(String key) {
//do nothing
}

@Override
public String getValue(String key) {
return null;
}

@Override
public String putValue(String key, String value) {
return null;
}

@Override
public void removeValue(String key) {

}

@Override
public Logger getLogger() {
return LOG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ public interface RedisService {

void releaseDistributedLock(String key);

String getValue(String key);

String putValue(String key, String value);

void removeValue(String key);

Logger getLogger();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class RedisServiceImpl extends AbstractRedisService{
@PostConstruct
public void init() throws AtlasException {
redisClient = Redisson.create(getProdConfig());
redisCacheClient = Redisson.create(getCacheImplConfig());
LOG.info("Sentinel redis client created successfully.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,25 @@ public class RedisServiceLocalImpl extends AbstractRedisService {
@PostConstruct
public void init() throws AtlasException {
redisClient = Redisson.create(getLocalConfig());
redisCacheClient = Redisson.create(getLocalConfig());
LOG.info("Local redis client created successfully.");
}

@Override
public String getValue(String key) {
return null;
}

@Override
public String putValue(String key, String value) {
return null;
}

@Override
public void removeValue(String key) {

}

@Override
public Logger getLogger() {
return LOG;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.atlas.utils;

public enum AtlasMetricType {
COUNTER,
GAUGE,
HISTOGRAM,
METER,
TIMER
}
14 changes: 14 additions & 0 deletions common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ long getElapsedTime() {

public static class Metric {
private final String name;

private AtlasMetricType metricType;
private long invocations = 0;
private long totalTimeMSecs = 0;
HashMap<String, String> tags = new HashMap<>();
Expand All @@ -112,6 +114,14 @@ public Metric(String name) {
this.name = name;
}

public void setMetricType(AtlasMetricType metricType) {
this.metricType = metricType;
}

public AtlasMetricType getMetricType() {
return metricType;
}

public String getName() {
return name;
}
Expand All @@ -135,5 +145,9 @@ public HashMap<String, String> getTags() {
return tags;
}

public void incrementInvocations() {
invocations++;
}

}
}
4 changes: 4 additions & 0 deletions graphdb/janus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-server-api</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit 635b921

Please sign in to comment.