Skip to content
This repository has been archived by the owner on Jun 13, 2024. It is now read-only.

Commit

Permalink
Removed source_id hacks
Browse files Browse the repository at this point in the history
  • Loading branch information
IC3Q committed Jun 21, 2018
1 parent e650a7b commit ff1df12
Showing 1 changed file with 0 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@
*/
package co.llective.presto.hyena;

import co.llective.hyena.api.BlockType;
import co.llective.hyena.api.ColumnValues;
import co.llective.hyena.api.ScanAndFilters;
import co.llective.hyena.api.ScanComparison;
import co.llective.hyena.api.ScanFilter;
import co.llective.hyena.api.ScanOrFilters;
import co.llective.hyena.api.ScanRequest;
import co.llective.hyena.api.ScanResult;
Expand Down Expand Up @@ -92,9 +88,6 @@ public HyenaRecordCursor(HyenaPredicatesUtil predicateHandler, HyenaSession hyen

log.info("Filters: " + StringUtils.join(req.getFilters(), ", "));

//TODO: Remove when hyena will fully support source_id
remapSourceIdFilter(req);

scanStart = System.currentTimeMillis();
slicedResult = hyenaSession.scan(req);
scanFinish = System.currentTimeMillis();
Expand All @@ -113,71 +106,14 @@ private void prepareSliceMappings()
}
}

private void remapSourceIdFilter(ScanRequest req)
{
Optional<Long> sourceIdPosition = columns.stream()
.filter(x -> x.getColumnName().equals("source_id"))
.findFirst()
.map(HyenaColumnHandle::getOrdinalPosition);

// if there isn't source_id in this query context it won't be in filters
if (!sourceIdPosition.isPresent()) {
return;
}

// if there is no or filters
if (req.getFilters().isEmpty()) {
return;
}

// if there are or filters but all are empty
boolean areAndFilters = false;
for (ScanAndFilters andFilters : req.getFilters()) {
if (!andFilters.isEmpty()) {
areAndFilters = true;
}
}
if (!areAndFilters) {
return;
}

// if there are filters but none of them is source_id one
if (req.getFilters().stream().allMatch(andFilters ->
andFilters.stream().allMatch(filter ->
filter.getColumn() != sourceIdPosition.get()))) {
return;
}

for (ScanAndFilters andFilters : req.getFilters()) {
if (andFilters.size() == 1) {
andFilters.removeIf(x -> x.getColumn() == sourceIdPosition.get());
// add tautology filter, u64 > 0
long timestampIndex = 0L;
if (!req.getProjection().contains(timestampIndex)) {
req.getProjection().add(timestampIndex);
}
andFilters.add(
new ScanFilter(0, ScanComparison.Gt, BlockType.U64Dense.mapToFilterType(), timestampIndex));
}
else {
andFilters.removeIf(x -> x.getColumn() == sourceIdPosition.get());
}
}
}

@VisibleForTesting
int getRowCount(ScanResult slicedResult)
{
if (slicedResult.getColumnMap().isEmpty()) {
return 0;
}

//TODO: source_id removal
Optional<Long> sourceIdPosition = columns.stream().filter(x -> x.getColumnName().equals("source_id")).findFirst().map(
HyenaColumnHandle::getOrdinalPosition);

return slicedResult.getColumnMap().entrySet().stream()
.filter(entry -> sourceIdPosition.map(sourceIdCol -> !entry.getKey().equals(sourceIdCol)).orElse(true))
.map(x -> x.getValue().getElementsCount())
.max(Comparator.naturalOrder())
.orElse(0);
Expand Down Expand Up @@ -220,11 +156,6 @@ public boolean getBoolean(int field)
@Override
public long getLong(int field)
{
// TODO: temporal workaround for not filled source_id by hyena (we only have packet_headers now)
if (columns.get(field).getColumnName().equals("source_id")) {
return 1L;
}

ColumnValues column = getColumn(field);

switch (column.getType()) {
Expand Down

0 comments on commit ff1df12

Please sign in to comment.