Skip to content
This repository has been archived by the owner on Jun 13, 2024. It is now read-only.

Commit

Permalink
Merge pull request #63 from FCG-LLC/splitting_timerange
Browse files Browse the repository at this point in the history
Splitting timerange
  • Loading branch information
IC3Q authored Aug 23, 2018
2 parents f7701a7 + efedc26 commit e0f1b86
Show file tree
Hide file tree
Showing 11 changed files with 367 additions and 87 deletions.
2 changes: 0 additions & 2 deletions dockerization/files/etc/jvm.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,3 @@
-XX:G1HeapRegionSize=64M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ public class HyenaConfig
public static final String STREAMING_RECORDS_THRESHOLD = "streaming_records_threshold";
public static final String STREAMING_RECORDS_THRESHOLD_DESC = "Number of records which hyena can add/subtract to/from limit number";

private Boolean splittingEnabled = true;
public static final String SPLITTING_ENABLED = "splitting_enabled";
public static final String SPLITTING_ENABLED_DESC = "Should use multiple splits while scanning";

private Integer numberOfSplits = 5;
public static final String NUMBER_OF_SPLITS = "number_of_splits";
public static final String NUMBER_OF_SPLITS_DESC = "Defines how many parallel scans should be fired up";

private Long minDbTimestampNs = 1533074400000000L; //2018-08-01
public static final String MIN_DB_TIMESTAMP = "min_db_timestamp_ns";
public static final String MIN_DB_TIMESTAMP_DESC = "Lowest timestamp in database in nanoseconds (only used when someone gives no constraints on time in query)";

public String getHyenaHost()
{
return hyenaHost;
Expand All @@ -52,6 +64,21 @@ public long getStreamingRecordsThreshold()
return streamingRecordsThreshold;
}

public boolean getSplittingEnabled()
{
return splittingEnabled;
}

public int getNumberOfSplits()
{
return numberOfSplits;
}

public long getMinDbTimestampNs()
{
return minDbTimestampNs;
}

@Config("hyena.url")
@ConfigDescription("Hyena host address")
public HyenaConfig setHyenaHost(String hyenaHost)
Expand Down Expand Up @@ -83,4 +110,28 @@ public HyenaConfig setStreamingRecordsThreshold(Long threshold)
this.streamingRecordsThreshold = threshold;
return this;
}

@Config("hyena." + SPLITTING_ENABLED)
@ConfigDescription(SPLITTING_ENABLED_DESC)
public HyenaConfig setSplittingEnabled(Boolean splittingEnabled)
{
this.splittingEnabled = splittingEnabled;
return this;
}

@Config("hyena." + NUMBER_OF_SPLITS)
@ConfigDescription(NUMBER_OF_SPLITS_DESC)
public HyenaConfig setNumberOfSplits(Integer numberOfSplits)
{
this.numberOfSplits = numberOfSplits;
return this;
}

@Config("hyena." + MIN_DB_TIMESTAMP)
@ConfigDescription(MIN_DB_TIMESTAMP_DESC)
public HyenaConfig setMinDbTimestampNs(Long timestamp)
{
this.minDbTimestampNs = timestamp;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@

import java.util.List;

import static co.llective.presto.hyena.HyenaConfig.MIN_DB_TIMESTAMP;
import static co.llective.presto.hyena.HyenaConfig.MIN_DB_TIMESTAMP_DESC;
import static co.llective.presto.hyena.HyenaConfig.NUMBER_OF_SPLITS;
import static co.llective.presto.hyena.HyenaConfig.NUMBER_OF_SPLITS_DESC;
import static co.llective.presto.hyena.HyenaConfig.SPLITTING_ENABLED;
import static co.llective.presto.hyena.HyenaConfig.SPLITTING_ENABLED_DESC;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_ENABLED;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_ENABLED_DESC;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_RECORDS_LIMIT;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_RECORDS_LIMIT_DESC;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_RECORDS_THRESHOLD;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_RECORDS_THRESHOLD_DESC;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanSessionProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerSessionProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longSessionProperty;

public class HyenaConnectorSessionProperties
Expand All @@ -39,6 +46,21 @@ public HyenaConnectorSessionProperties(HyenaConfig hyenaConfig)
STREAMING_RECORDS_THRESHOLD,
STREAMING_RECORDS_THRESHOLD_DESC,
hyenaConfig.getStreamingRecordsThreshold(),
false),
booleanSessionProperty(
SPLITTING_ENABLED,
SPLITTING_ENABLED_DESC,
hyenaConfig.getSplittingEnabled(),
false),
integerSessionProperty(
NUMBER_OF_SPLITS,
NUMBER_OF_SPLITS_DESC,
hyenaConfig.getNumberOfSplits(),
false),
longSessionProperty(
MIN_DB_TIMESTAMP,
MIN_DB_TIMESTAMP_DESC,
hyenaConfig.getMinDbTimestampNs(),
false));
}

Expand All @@ -61,4 +83,19 @@ public static long getStreamingRecordsThreshold(ConnectorSession session)
{
return session.getProperty(STREAMING_RECORDS_THRESHOLD, Long.class);
}

public static boolean getSplittingEnabled(ConnectorSession session)
{
return session.getProperty(SPLITTING_ENABLED, Boolean.class);
}

public static int getNumberOfSplits(ConnectorSession session)
{
return session.getProperty(NUMBER_OF_SPLITS, Integer.class);
}

public static long getMinDbTimestampNs(ConnectorSession session)
{
return session.getProperty(MIN_DB_TIMESTAMP, Long.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ private ScanAndFilters scanAndFiltersFromRange(HyenaColumnHandle column, Range r

// handle = situation
if (range.isSingleValue()) {
//TODO: split string%string case
ScanFilter singleFilter = createSingleFilter(column, ScanComparison.Eq, range.getSingleValue());
andFilters.add(singleFilter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public class HyenaRecordCursor
private long constructorFinishMs;
private long iteratingStartNs;

public HyenaRecordCursor(HyenaSession hyenaSession, ConnectorSession connectorSession, List<HyenaColumnHandle> columns, TupleDomain<HyenaColumnHandle> predicate)
public HyenaRecordCursor(HyenaSession hyenaSession, ConnectorSession connectorSession, List<HyenaColumnHandle> columns, TupleDomain<HyenaColumnHandle> predicate, Optional<TimeBoundaries> timeBoundaries)
{
this(new HyenaPredicatesUtil(), hyenaSession, connectorSession, columns, predicate);
this(new HyenaPredicatesUtil(), hyenaSession, connectorSession, columns, predicate, timeBoundaries);
}

public HyenaRecordCursor(HyenaPredicatesUtil predicateHandler, HyenaSession hyenaSession, ConnectorSession connectorSession, List<HyenaColumnHandle> columns, TupleDomain<HyenaColumnHandle> predicate)
public HyenaRecordCursor(HyenaPredicatesUtil predicateHandler, HyenaSession hyenaSession, ConnectorSession connectorSession, List<HyenaColumnHandle> columns, TupleDomain<HyenaColumnHandle> predicate, Optional<TimeBoundaries> timeBoundaries)
{
constructorStartMs = System.currentTimeMillis();
this.hyenaSession = hyenaSession;
Expand All @@ -81,7 +81,7 @@ public HyenaRecordCursor(HyenaPredicatesUtil predicateHandler, HyenaSession hyen
this.streamingLimit = HyenaConnectorSessionProperties.getStreamingRecordsLimit(connectorSession);
this.streamingThreshold = HyenaConnectorSessionProperties.getStreamingRecordsThreshold(connectorSession);

this.scanRequest = buildScanRequest(predicateHandler, columns, predicate);
this.scanRequest = buildScanRequest(predicateHandler, columns, predicate, timeBoundaries);

log.info("Filters: " + StringUtils.join(this.scanRequest.getFilters(), ", "));

Expand All @@ -91,7 +91,7 @@ public HyenaRecordCursor(HyenaPredicatesUtil predicateHandler, HyenaSession hyen
constructorFinishMs = System.currentTimeMillis();
}

private ScanRequest buildScanRequest(HyenaPredicatesUtil predicateHandler, List<HyenaColumnHandle> columns, TupleDomain<HyenaColumnHandle> predicate)
private ScanRequest buildScanRequest(HyenaPredicatesUtil predicateHandler, List<HyenaColumnHandle> columns, TupleDomain<HyenaColumnHandle> predicate, Optional<TimeBoundaries> timeBoundaries)
{
ScanRequest req = new ScanRequest();
req.setProjection(new ArrayList<>());
Expand All @@ -101,17 +101,19 @@ private ScanRequest buildScanRequest(HyenaPredicatesUtil predicateHandler, List<
req.getProjection().add(col.getOrdinalPosition());
}

Optional<TimeBoundaries> tsBoundaries = predicateHandler.getTsConstraints(predicate);
if (tsBoundaries.isPresent()) {
req.setMinTs(tsBoundaries.get().getStart());
req.setMaxTs(tsBoundaries.get().getEnd());
if (timeBoundaries.isPresent()) {
req.setMinTs(timeBoundaries.get().getStart());
req.setMaxTs(timeBoundaries.get().getEnd());
}
else {
req.setMinTs(0L);
req.setMaxTs(Long.MAX_VALUE);
}

ScanOrFilters filters = predicateHandler.predicateToFilters(predicate);

applyBoundariesToTimestampFilters(timeBoundaries, filters);

req.getFilters().addAll(filters);

if (streamingEnabled) {
Expand All @@ -121,10 +123,30 @@ private ScanRequest buildScanRequest(HyenaPredicatesUtil predicateHandler, List<
else {
req.setScanConfig(Optional.empty());
}
log.info("Time constraints:\t" + req.getMinTs() + " -\t" + req.getMaxTs());
log.info("Filters: " + StringUtils.join(req.getFilters(), ", "));

return req;
}

/**
* Applies time boundaries to timestamp filters if those exist.
* @param timeBoundaries time boundaries for split
* @param filters filters built for current scan
*/
private void applyBoundariesToTimestampFilters(Optional<TimeBoundaries> timeBoundaries, ScanOrFilters filters)
{
timeBoundaries.ifPresent(timeBoundaries1 -> filters.forEach(x -> x.stream().filter(y -> y.getColumn() == 0).forEach(
y -> {
if (y.getOp().equals(ScanComparison.Gt) || y.getOp().equals(ScanComparison.GtEq) && (Long) y.getValue() < timeBoundaries1.getStart()) {
y.setValue(timeBoundaries1.getStart());
}
else if (y.getOp().equals(ScanComparison.Lt) || y.getOp().equals(ScanComparison.LtEq) && (Long) y.getValue() > timeBoundaries1.getEnd()) {
y.setValue(timeBoundaries1.getEnd());
}
})));
}

/**
* Fetches records from database.
* If there are 0 records in next chunk it tries until there will be results or it is the end of the scan.
Expand Down Expand Up @@ -277,7 +299,7 @@ public boolean getBoolean(int field)
@Override
public long getLong(int field)
{
// TODO: temporal workaround for not filled source_id by hyena (we only have packet_headers now)
// TODO: temporal workaround for not filled source_id by hyena (we only have one source now)
if (columns.get(field).getColumnName().equals("source_id")) {
return 1L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package co.llective.presto.hyena;

import co.llective.presto.hyena.util.TimeBoundaries;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
Expand All @@ -21,6 +22,7 @@
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -30,6 +32,7 @@ public class HyenaRecordSet
private final List<HyenaColumnHandle> columns;
private final List<Type> columnTypes;
private final TupleDomain<HyenaColumnHandle> effectivePredicate;
private final Optional<TimeBoundaries> timeBoundaries;
private final HyenaSession hyenaSession;
private final ConnectorSession connectorSession;

Expand All @@ -44,6 +47,7 @@ public HyenaRecordSet(HyenaSession hyenaSession, ConnectorSession connectorSessi
}
this.columnTypes = types.build();
this.effectivePredicate = split.getEffectivePredicate();
this.timeBoundaries = split.getTimeBoundaries();

this.hyenaSession = requireNonNull(hyenaSession, "hyenaSession is null");
this.connectorSession = requireNonNull(connectorSession, "ConnectorSession is null");
Expand All @@ -58,6 +62,6 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new HyenaRecordCursor(hyenaSession, connectorSession, columns, effectivePredicate);
return new HyenaRecordCursor(hyenaSession, connectorSession, columns, effectivePredicate, timeBoundaries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package co.llective.presto.hyena;

import co.llective.presto.hyena.util.TimeBoundaries;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.predicate.TupleDomain;
Expand All @@ -21,6 +22,7 @@
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
Expand All @@ -30,14 +32,17 @@ public class HyenaSplit
{
private final HostAddress address;
private final TupleDomain<HyenaColumnHandle> effectivePredicate;
private final Optional<TimeBoundaries> timeBoundaries;

@JsonCreator
public HyenaSplit(
@JsonProperty("address") HostAddress address,
@JsonProperty("effectivePredicate") TupleDomain<HyenaColumnHandle> effectivePredicate)
@JsonProperty("effectivePredicate") TupleDomain<HyenaColumnHandle> effectivePredicate,
@JsonProperty("timeBoundaries") Optional<TimeBoundaries> timeBoundaries)
{
this.address = requireNonNull(address, "address is null");
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
this.timeBoundaries = requireNonNull(timeBoundaries);
}

@JsonProperty
Expand All @@ -52,6 +57,12 @@ public TupleDomain<HyenaColumnHandle> getEffectivePredicate()
return effectivePredicate;
}

@JsonProperty
public Optional<TimeBoundaries> getTimeBoundaries()
{
return timeBoundaries;
}

@Override
public boolean isRemotelyAccessible()
{
Expand Down
Loading

0 comments on commit e0f1b86

Please sign in to comment.