Skip to content
This repository has been archived by the owner on Jun 13, 2024. It is now read-only.

Commit

Permalink
Merge pull request #56 from FCG-LLC/streaming
Browse files Browse the repository at this point in the history
Streaming
  • Loading branch information
IC3Q authored Aug 9, 2018
2 parents f71d952 + 0341a59 commit f7701a7
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 34 deletions.
4 changes: 4 additions & 0 deletions dockerization/files/etc/catalog/hyena.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
connector.name=hyena
hyena.url=ipc:///tmp/hyena/hyena.ipc

hyena.streaming_enabled=true
hyena.streaming_records_limit=2000000
hyena.streaming_records_threshold=2000000
2 changes: 1 addition & 1 deletion presto-hyena/hyena-java
Submodule hyena-java updated from 9b7d72 to 4dd3b0
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,67 @@ public class HyenaConfig
{
// Address of the hyena endpoint; overridden by the "hyena.url" property.
private String hyenaHost = "localhost";

// The streaming settings are stored as primitives: their getters return primitives,
// so boxed fields would only add boxing and a latent NullPointerException on unboxing.
// Each setting comes with its property key and description, which are reused when
// the matching session property is declared.

// Whether records are streamed from hyena in chunks.
private boolean streamingEnabled = true;
public static final String STREAMING_ENABLED = "streaming_enabled";
public static final String STREAMING_ENABLED_DESC = "Should records streaming be used";

// Number of records requested per subscan.
private long streamingRecordsLimit = 200000L;
public static final String STREAMING_RECORDS_LIMIT = "streaming_records_limit";
public static final String STREAMING_RECORDS_LIMIT_DESC = "Number of records fetched per subscan from hyena";

// Tolerance hyena may apply around the limit when sizing a chunk.
private long streamingRecordsThreshold = 200000L;
public static final String STREAMING_RECORDS_THRESHOLD = "streaming_records_threshold";
public static final String STREAMING_RECORDS_THRESHOLD_DESC = "Number of records which hyena can add/subtract to/from limit number";

/**
 * Returns the hyena address used by this connector.
 *
 * @return the configured address, {@code "localhost"} unless overridden
 */
public String getHyenaHost()
{
    return this.hyenaHost;
}

/**
 * Tells whether records streaming is switched on.
 *
 * @return the configured flag, {@code true} unless overridden
 */
public boolean getStreamingEnabled()
{
    return this.streamingEnabled;
}

/**
 * Returns the number of records requested per subscan.
 *
 * @return the configured records limit
 */
public long getStreamingRecordsLimit()
{
    return this.streamingRecordsLimit;
}

/**
 * Returns the number of records hyena may add to or subtract from the limit.
 *
 * @return the configured records threshold
 */
public long getStreamingRecordsThreshold()
{
    return this.streamingRecordsThreshold;
}

/**
 * Sets the hyena address; bound to the {@code hyena.url} configuration property.
 *
 * @param hyenaHost address of the hyena endpoint
 * @return this config, so that calls can be chained
 */
@Config("hyena.url")
@ConfigDescription("Hyena host address")
public HyenaConfig setHyenaHost(String hyenaHost)
{
    this.hyenaHost = hyenaHost;
    return this;
}

/**
 * Switches records streaming on or off; bound to {@code hyena.streaming_enabled}.
 *
 * @param streamingEnabled whether streaming should be used
 * @return this config, so that calls can be chained
 */
@Config("hyena." + STREAMING_ENABLED)
@ConfigDescription(STREAMING_ENABLED_DESC)
public HyenaConfig setStreamingEnabled(Boolean streamingEnabled)
{
    this.streamingEnabled = streamingEnabled;
    return this;
}

/**
 * Sets the number of records requested per subscan; bound to
 * {@code hyena.streaming_records_limit}.
 *
 * @param limit records limit
 * @return this config, so that calls can be chained
 */
@Config("hyena." + STREAMING_RECORDS_LIMIT)
@ConfigDescription(STREAMING_RECORDS_LIMIT_DESC)
public HyenaConfig setStreamingRecordsLimit(Long limit)
{
    this.streamingRecordsLimit = limit;
    return this;
}

/**
 * Sets how many records hyena may add to or subtract from the limit; bound to
 * {@code hyena.streaming_records_threshold}.
 *
 * @param threshold records threshold
 * @return this config, so that calls can be chained
 */
@Config("hyena." + STREAMING_RECORDS_THRESHOLD)
@ConfigDescription(STREAMING_RECORDS_THRESHOLD_DESC)
public HyenaConfig setStreamingRecordsThreshold(Long threshold)
{
    this.streamingRecordsThreshold = threshold;
    return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,64 @@
package co.llective.presto.hyena;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;

import java.util.List;

import static co.llective.presto.hyena.HyenaConfig.STREAMING_ENABLED;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_ENABLED_DESC;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_RECORDS_LIMIT;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_RECORDS_LIMIT_DESC;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_RECORDS_THRESHOLD;
import static co.llective.presto.hyena.HyenaConfig.STREAMING_RECORDS_THRESHOLD_DESC;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanSessionProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longSessionProperty;

/**
 * Declares the session properties of the hyena connector and provides typed
 * accessors for reading them from a {@link ConnectorSession}.
 *
 * <p>Every property takes its default from {@link HyenaConfig}, so the catalog
 * configuration applies unless a session overrides it.
 */
public class HyenaConnectorSessionProperties
{
    private final List<PropertyMetadata<?>> sessionProperties;

    /**
     * Builds the property list using the values of {@code hyenaConfig} as defaults.
     *
     * @param hyenaConfig connector configuration supplying the default values
     */
    @Inject
    public HyenaConnectorSessionProperties(HyenaConfig hyenaConfig)
    {
        // The field is final, so it is assigned exactly once, with the full list.
        // The trailing "false" is the "hidden" flag of the Presto property factories.
        sessionProperties = ImmutableList.of(
                booleanSessionProperty(
                        STREAMING_ENABLED,
                        STREAMING_ENABLED_DESC,
                        hyenaConfig.getStreamingEnabled(),
                        false),
                longSessionProperty(
                        STREAMING_RECORDS_LIMIT,
                        STREAMING_RECORDS_LIMIT_DESC,
                        hyenaConfig.getStreamingRecordsLimit(),
                        false),
                longSessionProperty(
                        STREAMING_RECORDS_THRESHOLD,
                        STREAMING_RECORDS_THRESHOLD_DESC,
                        hyenaConfig.getStreamingRecordsThreshold(),
                        false));
    }

    /**
     * @return the immutable list of session properties declared by this connector
     */
    public List<PropertyMetadata<?>> getSessionProperties()
    {
        return sessionProperties;
    }

    /**
     * @param session session to read from
     * @return value of the {@code streaming_enabled} property for {@code session}
     */
    public static boolean getStreamingEnabled(ConnectorSession session)
    {
        return session.getProperty(STREAMING_ENABLED, Boolean.class);
    }

    /**
     * @param session session to read from
     * @return value of the {@code streaming_records_limit} property for {@code session}
     */
    public static long getStreamingRecordsLimit(ConnectorSession session)
    {
        return session.getProperty(STREAMING_RECORDS_LIMIT, Long.class);
    }

    /**
     * @param session session to read from
     * @return value of the {@code streaming_records_threshold} property for {@code session}
     */
    public static long getStreamingRecordsThreshold(ConnectorSession session)
    {
        return session.getProperty(STREAMING_RECORDS_THRESHOLD, Long.class);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import co.llective.hyena.api.ScanOrFilters;
import co.llective.hyena.api.ScanRequest;
import co.llective.hyena.api.ScanResult;
import co.llective.hyena.api.StreamConfig;
import co.llective.presto.hyena.util.TimeBoundaries;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.RecordCursor;
Expand All @@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
Expand All @@ -47,17 +49,22 @@ public class HyenaRecordCursor
{
private static final Logger log = Logger.get(HyenaRecordCursor.class);

// Streaming settings, read from the connector session in the constructor.
private final Long streamingLimit;
private final Long streamingThreshold;
private final Boolean streamingEnabled;

private final List<HyenaColumnHandle> columns;
// Most recently fetched chunk; reassigned on every fetch, so it cannot be final.
private ScanResult slicedResult;
private final HyenaSession hyenaSession;
private ScanRequest scanRequest;
// Set once a scan result arrives without a stream state.
@VisibleForTesting AtomicBoolean endOfScan = new AtomicBoolean(false);
@VisibleForTesting int rowPosition = -1; // presto first advances next row and then fetch data
// Number of rows in the current chunk; 0 until the first fetch.
@VisibleForTesting int rowCount;

private Map<Integer, ColumnValues> fieldsToColumns = new HashMap<>();

// Timing bookkeeping used only for logging.
private long constructorStartMs;
private long constructorFinishMs;
private long scanStart;
private long scanFinish;
private long iteratingStartNs;

public HyenaRecordCursor(HyenaSession hyenaSession, ConnectorSession connectorSession, List<HyenaColumnHandle> columns, TupleDomain<HyenaColumnHandle> predicate)
Expand All @@ -68,8 +75,24 @@ public HyenaRecordCursor(HyenaSession hyenaSession, ConnectorSession connectorSe
public HyenaRecordCursor(HyenaPredicatesUtil predicateHandler, HyenaSession hyenaSession, ConnectorSession connectorSession, List<HyenaColumnHandle> columns, TupleDomain<HyenaColumnHandle> predicate)
{
constructorStartMs = System.currentTimeMillis();
this.hyenaSession = hyenaSession;
this.columns = requireNonNull(columns, "columns is null");
this.streamingEnabled = HyenaConnectorSessionProperties.getStreamingEnabled(connectorSession);
this.streamingLimit = HyenaConnectorSessionProperties.getStreamingRecordsLimit(connectorSession);
this.streamingThreshold = HyenaConnectorSessionProperties.getStreamingRecordsThreshold(connectorSession);

this.scanRequest = buildScanRequest(predicateHandler, columns, predicate);

log.info("Filters: " + StringUtils.join(this.scanRequest.getFilters(), ", "));

//TODO: Remove when hyena will fully support source_id
remapSourceIdFilter(this.scanRequest);

constructorFinishMs = System.currentTimeMillis();
}

private ScanRequest buildScanRequest(HyenaPredicatesUtil predicateHandler, List<HyenaColumnHandle> columns, TupleDomain<HyenaColumnHandle> predicate)
{
ScanRequest req = new ScanRequest();
req.setProjection(new ArrayList<>());
req.setFilters(new ScanOrFilters());
Expand All @@ -91,22 +114,39 @@ public HyenaRecordCursor(HyenaPredicatesUtil predicateHandler, HyenaSession hyen
ScanOrFilters filters = predicateHandler.predicateToFilters(predicate);
req.getFilters().addAll(filters);

log.info("Filters: " + StringUtils.join(req.getFilters(), ", "));
if (streamingEnabled) {
StreamConfig scanConfig = new StreamConfig(streamingLimit, streamingThreshold, Optional.empty());
req.setScanConfig(Optional.of(scanConfig));
}
else {
req.setScanConfig(Optional.empty());
}

//TODO: Remove when hyena will fully support source_id
remapSourceIdFilter(req);
return req;
}

scanStart = System.currentTimeMillis();
slicedResult = hyenaSession.scan(req);
scanFinish = System.currentTimeMillis();
rowCount = getRowCount(slicedResult);
/**
 * Fetches the next chunk of records from the database into {@code slicedResult}
 * and refreshes {@code rowCount}, {@code endOfScan} and the slice mappings.
 *
 * <p>An empty chunk is not handed to the caller: the scan is repeated until a
 * chunk contains rows or the result carries no stream state, which marks the end
 * of the scan. While a stream state is present it is copied into the request's
 * scan config before the next scan is issued.
 */
@VisibleForTesting void fetchRecordsFromDb()
{
    do {
        long scanStart = System.currentTimeMillis();
        slicedResult = hyenaSession.scan(scanRequest);
        long scanFinish = System.currentTimeMillis();
        log.debug("Scan + deserialization time: " + (scanFinish - scanStart) + "ms");
        rowCount = getRowCount(slicedResult);
        log.debug("Received " + rowCount + " records");
        // No stream state in the result means there is nothing more to fetch.
        endOfScan.set(!slicedResult.getStreamState().isPresent());
        if (scanRequest.getScanConfig().isPresent() && slicedResult.getStreamState().isPresent()) {
            scanRequest.getScanConfig().get().setStreamState(slicedResult.getStreamState());
        }
    } while (rowCount == 0 && !endOfScan.get());
    // constructorFinishMs is deliberately left alone here: it belongs to the
    // constructor timing and must not be overwritten by a fetch.
    prepareSliceMappings();
}

private void prepareSliceMappings()
@VisibleForTesting void prepareSliceMappings()
{
for (int field = 0; field < columns.size(); field++) {
long columnId = columns.get(field).getOrdinalPosition();
Expand Down Expand Up @@ -206,10 +246,26 @@ public Type getType(int field)
@Override
public boolean advanceNextPosition()
{
if (rowPosition == -1) {
iteratingStartNs = System.nanoTime();
if (++rowPosition < rowCount) {
return true;
}
else {
if (rowCount != 0) {
long iteratingEndNs = System.nanoTime();
log.debug("Iterated through " + rowPosition + " rows, " + (rowPosition == 0 ? 0 : ((iteratingEndNs - iteratingStartNs) / rowPosition)) + "ns per row");
}
if (endOfScan.get()) {
log.debug("No more records in db. Finishing.");
return false;
}
else {
log.debug("Cursor needs more data. Scanning again.");
fetchRecordsFromDb();
rowPosition = 0;
iteratingStartNs = System.nanoTime();
return rowCount != 0;
}
}
return ++rowPosition < rowCount;
}

@Override
Expand Down Expand Up @@ -275,12 +331,8 @@ private ColumnValues getColumn(int field)
// Logs the cursor's timing summary at debug level.
// Scan and iteration timings are not repeated here: they are logged per chunk
// where they are measured, and the fields that used to hold them are no longer
// maintained for the whole cursor lifetime.
public void close()
{
    long closeTimeMs = System.currentTimeMillis();
    log.debug("Constructor time: " + (constructorFinishMs - constructorStartMs) + "ms");
    log.debug("Whole cursor job: " + (closeTimeMs - constructorStartMs) + "ms");
    //TODO: cancel query in hyenaAPI (send abort request with requestID)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public class NativeHyenaSession

/**
 * Creates a session whose {@code HyenaApi} is built for the address configured
 * in {@code config}.
 *
 * @param config connector configuration providing the hyena address
 */
public NativeHyenaSession(HyenaConfig config)
{
    // The client is created once, through the builder; no throwaway instance is
    // constructed first.
    hyenaApi = new HyenaApi.Builder()
            .address(config.getHyenaHost())
            .build();
}

@Override
Expand Down
Loading

0 comments on commit f7701a7

Please sign in to comment.