This repository has been archived by the owner on Jun 13, 2024. It is now read-only.

Source id support #37

Open · wants to merge 1 commit into base: master
@@ -13,11 +13,7 @@
*/
package co.llective.presto.hyena;

import co.llective.hyena.api.BlockType;
import co.llective.hyena.api.ColumnValues;
import co.llective.hyena.api.ScanAndFilters;
import co.llective.hyena.api.ScanComparison;
import co.llective.hyena.api.ScanFilter;
import co.llective.hyena.api.ScanOrFilters;
import co.llective.hyena.api.ScanRequest;
import co.llective.hyena.api.ScanResult;
@@ -92,9 +88,6 @@ public HyenaRecordCursor(HyenaPredicatesUtil predicateHandler, HyenaSession hyen

log.info("Filters: " + StringUtils.join(req.getFilters(), ", "));

// TODO: Remove once hyena fully supports source_id
remapSourceIdFilter(req);

scanStart = System.currentTimeMillis();
slicedResult = hyenaSession.scan(req);
scanFinish = System.currentTimeMillis();
@@ -113,71 +106,14 @@ private void prepareSliceMappings()
}
}

private void remapSourceIdFilter(ScanRequest req)
{
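// Hyena does not fully support source_id yet (see the TODO above), so strip source_id
// predicates from the scan request before it is sent. When a source_id predicate is the
// only member of an AND group, a tautology filter (u64 timestamp > 0) is substituted so
// the group is not left empty; otherwise the source_id entry is simply dropped.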
Optional<Long> sourceIdPosition = columns.stream()
.filter(x -> x.getColumnName().equals("source_id"))
.findFirst()
.map(HyenaColumnHandle::getOrdinalPosition);

// if source_id is not part of this query context, it cannot appear in the filters either
if (!sourceIdPosition.isPresent()) {
return;
}

// if there are no OR filters at all
if (req.getFilters().isEmpty()) {
return;
}

// if there are OR filters but all of them are empty
boolean areAndFilters = false;
for (ScanAndFilters andFilters : req.getFilters()) {
if (!andFilters.isEmpty()) {
areAndFilters = true;
}
}
if (!areAndFilters) {
return;
}

// if there are filters but none of them refers to source_id
if (req.getFilters().stream().allMatch(andFilters ->
andFilters.stream().allMatch(filter ->
filter.getColumn() != sourceIdPosition.get()))) {
return;
}

for (ScanAndFilters andFilters : req.getFilters()) {
if (andFilters.size() == 1) {
andFilters.removeIf(x -> x.getColumn() == sourceIdPosition.get());
// replace it with a tautology filter (u64 timestamp > 0) so the AND group is not left empty
long timestampIndex = 0L;
if (!req.getProjection().contains(timestampIndex)) {
req.getProjection().add(timestampIndex);
}
andFilters.add(
new ScanFilter(0, ScanComparison.Gt, BlockType.U64Dense.mapToFilterType(), timestampIndex));
}
else {
andFilters.removeIf(x -> x.getColumn() == sourceIdPosition.get());
}
}
}

@VisibleForTesting
int getRowCount(ScanResult slicedResult)
{
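// The row count is the largest element count among the returned columns; the source_id
// column is skipped, since hyena does not fill it yet (see the TODO below).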
if (slicedResult.getColumnMap().isEmpty()) {
return 0;
}

// TODO: remove this source_id exclusion once hyena fully supports source_id
Optional<Long> sourceIdPosition = columns.stream().filter(x -> x.getColumnName().equals("source_id")).findFirst().map(
HyenaColumnHandle::getOrdinalPosition);

return slicedResult.getColumnMap().entrySet().stream()
.filter(entry -> sourceIdPosition.map(sourceIdCol -> !entry.getKey().equals(sourceIdCol)).orElse(true))
.map(x -> x.getValue().getElementsCount())
.max(Comparator.naturalOrder())
.orElse(0);
@@ -220,11 +156,6 @@ public boolean getBoolean(int field)
@Override
public long getLong(int field)
{
// TODO: temporary workaround for source_id not being filled by hyena (we only have packet_headers now)
if (columns.get(field).getColumnName().equals("source_id")) {
return 1L;
}

ColumnValues column = getColumn(field);

switch (column.getType()) {