Skip to content

Commit

Permalink
Merge branch 'master' into bburkholder/virt-threads
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb authored Oct 26, 2023
2 parents 0276e6e + c44937d commit 6a4392b
Show file tree
Hide file tree
Showing 25 changed files with 536 additions and 642 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void measureLogSearcherSearch() {
logIndexSearcher.search(
"*",
"",
0,
0L,
Long.MAX_VALUE,
500,
new DateHistogramAggBuilder(
Expand Down
4 changes: 2 additions & 2 deletions kaldb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<grpc.version>1.57.2</grpc.version>
<micrometer.version>1.11.0</micrometer.version>
<armeria.version>1.25.2</armeria.version>
<kafka.version>3.4.0</kafka.version>
<kafka.version>3.5.0</kafka.version>
<jackson.version>2.15.2</jackson.version>
<jackson.databind.version>2.15.1</jackson.databind.version>
<lucene.version>9.5.0</lucene.version>
Expand Down Expand Up @@ -487,7 +487,7 @@
<dependency>
<groupId>com.github.charithe</groupId>
<artifactId>kafka-junit</artifactId>
<version>4.2.4</version>
<version>4.2.7</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down
35 changes: 33 additions & 2 deletions kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,46 @@ public String id() {
@Override
public SearchResult<T> query(SearchQuery query) {
  if (logSearcher != null) {
    // Clamp the query window to this chunk's data range. A null bound tells the
    // searcher to leave that side of the time range unbounded, so the chunk is
    // not filtered more narrowly than the data it actually holds.
    Long searchStartTime =
        determineStartTime(query.startTimeEpochMs, chunkInfo.getDataStartTimeEpochMs());
    Long searchEndTime =
        determineEndTime(query.endTimeEpochMs, chunkInfo.getDataEndTimeEpochMs());

    return logSearcher.search(
        query.dataset,
        query.queryStr,
        searchStartTime,
        searchEndTime,
        query.howMany,
        query.aggBuilder);
  } else {
    // No searcher yet (e.g. chunk still loading) — return an empty result.
    return (SearchResult<T>) SearchResult.empty();
  }
}

/**
 * Computes the effective query start time for this chunk.
 *
 * @param queryStartTimeEpochMs the start time requested by the query (epoch ms)
 * @param chunkStartTimeEpochMs the earliest data time present in the chunk (epoch ms)
 * @return the query start time when it falls after the chunk's data start, otherwise
 *     {@code null} to indicate the lower bound should be left unbounded
 */
protected static Long determineStartTime(long queryStartTimeEpochMs, long chunkStartTimeEpochMs) {
  // Only constrain the lower bound when the query window starts inside the chunk;
  // the null branch yields an unbounded lower bound downstream.
  return queryStartTimeEpochMs > chunkStartTimeEpochMs ? queryStartTimeEpochMs : null;
}

/**
 * Computes the effective query end time for this chunk.
 *
 * @param queryEndTimeEpochMs the end time requested by the query (epoch ms)
 * @param chunkEndTimeEpochMs the latest data time present in the chunk (epoch ms)
 * @return the query end time when it falls before the chunk's data end, otherwise
 *     {@code null} to indicate the upper bound should be left unbounded
 */
protected static Long determineEndTime(long queryEndTimeEpochMs, long chunkEndTimeEpochMs) {
  // Only constrain the upper bound when the query window ends inside the chunk;
  // the null branch yields an unbounded upper bound downstream.
  return queryEndTimeEpochMs < chunkEndTimeEpochMs ? queryEndTimeEpochMs : null;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.slack.kaldb.clusterManager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.slack.kaldb.metadata.cache.CacheSlotMetadataStore;
import com.slack.kaldb.metadata.hpa.HpaMetricMetadata;
Expand Down Expand Up @@ -142,7 +143,8 @@ private void publishCacheHpaMetrics() {
}
}

private static double calculateDemandFactor(
@VisibleForTesting
protected static double calculateDemandFactor(
long totalCacheSlotCapacity, long totalReplicaDemand) {
if (totalCacheSlotCapacity == 0) {
// we have no provisioned capacity, so cannot determine a value
Expand All @@ -154,8 +156,8 @@ private static double calculateDemandFactor(
}
// demand factor will be < 1 indicating a scale-down demand, and > 1 indicating a scale-up
double rawDemandFactor = (double) (totalReplicaDemand) / (totalCacheSlotCapacity);
// round to 2 decimals
return (double) Math.round(rawDemandFactor * 100) / 100;
// round up to 2 decimals
return Math.ceil(rawDemandFactor * 100) / 100;
}

/** Updates or inserts an (ephemeral) HPA metric for the cache nodes. This is NOT threadsafe. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,25 @@ protected Map<String, Integer> assignReplicasToCacheSlots() {
.stream()
.flatMap(
(cacheSlotsPerHost) -> {
int currentlyAssigned =
int currentlyAssignedOrLoading =
cacheSlotsPerHost.stream()
.filter(
cacheSlotMetadata ->
cacheSlotMetadata.cacheSlotState.equals(
Metadata.CacheSlotMetadata.CacheSlotState.ASSIGNED))
Metadata.CacheSlotMetadata.CacheSlotState.ASSIGNED)
|| cacheSlotMetadata.cacheSlotState.equals(
Metadata.CacheSlotMetadata.CacheSlotState.LOADING))
.toList()
.size();

return cacheSlotsPerHost.stream()
.filter(
cacheSlotMetadata ->
cacheSlotMetadata.cacheSlotState.equals(
Metadata.CacheSlotMetadata.CacheSlotState.FREE))
.limit(Math.max(0, maxConcurrentAssignmentsPerNode - currentlyAssigned));
.limit(
Math.max(
0, maxConcurrentAssignmentsPerNode - currentlyAssignedOrLoading));
})
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ public OpenSearchAdapter(Map<String, LuceneFieldDef> chunkSchema) {
public Query buildQuery(
String dataset,
String queryStr,
long startTimeMsEpoch,
long endTimeMsEpoch,
Long startTimeMsEpoch,
Long endTimeMsEpoch,
IndexSearcher indexSearcher)
throws IOException {
LOG.trace("Query raw input string: '{}'", queryStr);
Expand All @@ -158,11 +158,22 @@ public Query buildQuery(
try {
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

RangeQueryBuilder rangeQueryBuilder =
new RangeQueryBuilder(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName)
.gte(startTimeMsEpoch)
.lte(endTimeMsEpoch);
boolQueryBuilder.filter(rangeQueryBuilder);
// only add a range filter if either start or end time is provided
if (startTimeMsEpoch != null || endTimeMsEpoch != null) {
RangeQueryBuilder rangeQueryBuilder =
new RangeQueryBuilder(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName);

// todo - consider supporting something other than GTE/LTE (ie GT/LT?)
if (startTimeMsEpoch != null) {
rangeQueryBuilder.gte(startTimeMsEpoch);
}

if (endTimeMsEpoch != null) {
rangeQueryBuilder.lte(endTimeMsEpoch);
}

boolQueryBuilder.filter(rangeQueryBuilder);
}

// todo - dataset?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private static void addTextField(
}

// TODO: Move this definition to the config file.
private static ImmutableMap<String, LuceneFieldDef> getDefaultLuceneFieldDefinitions(
public static ImmutableMap<String, LuceneFieldDef> getDefaultLuceneFieldDefinitions(
boolean enableFullTextSearch) {
ImmutableMap.Builder<String, LuceneFieldDef> fieldDefBuilder = ImmutableMap.builder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

public interface LogIndexSearcher<T> extends Closeable {
SearchResult<T> search(
String dataset, String query, long minTime, long maxTime, int howMany, AggBuilder aggBuilder);
String dataset, String query, Long minTime, Long maxTime, int howMany, AggBuilder aggBuilder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,19 @@ public void afterRefresh(boolean didRefresh) {
public SearchResult<LogMessage> search(
String dataset,
String queryStr,
long startTimeMsEpoch,
long endTimeMsEpoch,
Long startTimeMsEpoch,
Long endTimeMsEpoch,
int howMany,
AggBuilder aggBuilder) {

ensureNonEmptyString(dataset, "dataset should be a non-empty string");
ensureNonNullString(queryStr, "query should be a non-empty string");
ensureTrue(startTimeMsEpoch >= 0, "start time should be non-negative value");
ensureTrue(startTimeMsEpoch < endTimeMsEpoch, "end time should be greater than start time");
if (startTimeMsEpoch != null) {
ensureTrue(startTimeMsEpoch >= 0, "start time should be non-negative value");
}
if (startTimeMsEpoch != null && endTimeMsEpoch != null) {
ensureTrue(startTimeMsEpoch < endTimeMsEpoch, "end time should be greater than start time");
}
ensureTrue(howMany >= 0, "hits requested should not be negative.");
ensureTrue(howMany > 0 || aggBuilder != null, "Hits or aggregation should be requested.");

Expand Down
1 change: 1 addition & 0 deletions kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.slack.kaldb.proto.metadata.Metadata;
import com.slack.kaldb.recovery.RecoveryService;
import com.slack.kaldb.util.RuntimeHalterImpl;
import com.slack.kaldb.zipkinApi.ZipkinService;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.slack.kaldb.zipkinApi;

/**
 * Response object modeling a Zipkin endpoint (local or remote) for the Zipkin-compatible
 * HTTP API; serialized to JSON via its getter.
 *
 * @see <a href="https://zipkin.io/zipkin-api/#/">Zipkin API Spec</a>
 */
@SuppressWarnings("unused")
public class ZipkinEndpointResponse {

  /** Name of the service this endpoint belongs to; {@code null} until set. */
  private String serviceName;

  /** Creates an endpoint with no service name. */
  public ZipkinEndpointResponse() {}

  /** Returns the service name, or {@code null} if none has been set. */
  public String getServiceName() {
    return serviceName;
  }

  /** Sets the service name for this endpoint. */
  public void setServiceName(String serviceName) {
    this.serviceName = serviceName;
  }
}
Loading

0 comments on commit 6a4392b

Please sign in to comment.