Push the runtime filter from HashJoin down to SeqScan or AM.

+----------+  AttrFilter   +------+  ScanKey   +------------+
| HashJoin | ------------> | Hash | ---------> | SeqScan/AM |
+----------+               +------+            +------------+

If "gp_enable_runtime_filter_pushdown" is on, three steps will be run:

Step 1. In ExecInitHashJoin(), try to find the mapping between the vars in the
        hashclauses and the vars produced by the SeqScan. If found, save the
        mapping in an AttrFilter and push it down to the Hash node;

Step 2. While the hash table is being built, accumulate range/bloom filters in
        the AttrFilter; when the build finishes, convert these filters into a
        list of ScanKeys and push them down to the SeqScan;

Step 3. If the AM supports SCAN_SUPPORT_RUNTIME_FILTER, these ScanKeys are
        pushed further down into the AM module; otherwise they are used to
        filter slots in the SeqScan itself (see the sketch below).
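
To make the last step concrete, below is a minimal sketch (not part of this
commit) of how a scan could evaluate the pushed-down ScanKeys against a
candidate tuple. The helper name slot_passes_runtime_filters and its NULL
handling are assumptions made for illustration; the SK_BLOOM_FILTER flag, the
btree strategy numbers, and the int64 treatment of sk_argument mirror the keys
built in PushdownRuntimeFilter in the diff below.

#include "postgres.h"

#include "access/skey.h"
#include "access/stratnum.h"
#include "executor/tuptable.h"
#include "lib/bloomfilter.h"
#include "nodes/pg_list.h"

/* Hypothetical helper: true if the slot survives every runtime-filter key. */
static bool
slot_passes_runtime_filters(List *filters, TupleTableSlot *slot)
{
    ListCell   *lc;

    foreach (lc, filters)
    {
        ScanKey     sk = (ScanKey) lfirst(lc);
        bool        isnull;
        Datum       val = slot_getattr(slot, sk->sk_attno, &isnull);

        /* hash clauses never match NULLs, so a NULL key cannot survive */
        if (isnull)
            return false;

        if (sk->sk_flags & SK_BLOOM_FILTER)
        {
            /* sk_argument carries the bloom filter built by the Hash node */
            bloom_filter *bf = (bloom_filter *) DatumGetPointer(sk->sk_argument);

            if (bloom_lacks_element(bf, (unsigned char *) &val, sizeof(Datum)))
                return false;
        }
        else if (sk->sk_strategy == BTGreaterEqualStrategyNumber)
        {
            /* range filter: value must be >= the observed minimum */
            if ((int64) val < (int64) sk->sk_argument)
                return false;
        }
        else if (sk->sk_strategy == BTLessEqualStrategyNumber)
        {
            /* range filter: value must be <= the observed maximum */
            if ((int64) val > (int64) sk->sk_argument)
                return false;
        }
    }

    return true;
}
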
zhangyue-hashdata committed Nov 21, 2024
1 parent f92faf0 commit 980e2de
Showing 11 changed files with 627 additions and 7 deletions.
162 changes: 161 additions & 1 deletion src/backend/executor/nodeHash.c
@@ -102,7 +102,10 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
size_t size);
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);

static void BuildRuntimeFilter(HashState *node, TupleTableSlot *slot);
static void PushdownRuntimeFilter(HashState *node);
static void FreeRuntimeFilter(HashState *node);
static void ResetRuntimeFilter(HashState *node);

/* ----------------------------------------------------------------
* ExecHash
@@ -193,7 +196,15 @@ MultiExecPrivateHash(HashState *node)
{
slot = ExecProcNode(outerNode);
if (TupIsNull(slot))
{
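/* build input exhausted: hand the finished runtime filters to the target scans */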
if (gp_enable_runtime_filter_pushdown && node->filters)
PushdownRuntimeFilter(node);
break;
}

if (gp_enable_runtime_filter_pushdown && node->filters)
BuildRuntimeFilter(node, slot);

/* We have to compute the hash value */
econtext->ecxt_outertuple = slot;
bool hashkeys_null = false;
@@ -334,7 +345,15 @@ MultiExecParallelHash(HashState *node)

slot = ExecProcNode(outerNode);
if (TupIsNull(slot))
{
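/* build input exhausted: hand the finished runtime filters to the target scans */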
if (gp_enable_runtime_filter_pushdown && node->filters)
PushdownRuntimeFilter(node);
break;
}

if (gp_enable_runtime_filter_pushdown && node->filters)
BuildRuntimeFilter(node, slot);

econtext->ecxt_outertuple = slot;
if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys,
false, hashtable->keepNulls,
@@ -512,6 +531,9 @@ ExecEndHash(HashState *node)
*/
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);

if (gp_enable_runtime_filter_pushdown && node->filters)
FreeRuntimeFilter(node);
}


@@ -2520,6 +2542,9 @@ ExecReScanHash(HashState *node)
*/
if (node->ps.lefttree->chgParam == NULL)
ExecReScan(node->ps.lefttree);

if (gp_enable_runtime_filter_pushdown && node->filters)
ResetRuntimeFilter(node);
}


@@ -4126,3 +4151,138 @@ get_hash_mem(void)

return (int) mem_limit;
}

/*
 * Convert AttrFilter to ScanKeyData and send these runtime filters to the
 * target node (SeqScan).
 */
void
PushdownRuntimeFilter(HashState *node)
{
    ListCell   *lc;
    List       *scankeys;
    ScanKey     sk;
    AttrFilter *attr_filter;

    foreach (lc, node->filters)
    {
        scankeys = NIL;

        attr_filter = lfirst(lc);
        if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
            continue;

        /* bloom filter */
        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
        sk->sk_flags = SK_BLOOM_FILTER;
        sk->sk_attno = attr_filter->lattno;
        sk->sk_subtype = INT8OID;
        sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
        scankeys = lappend(scankeys, sk);

        /* range filter */
        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
        sk->sk_flags = 0;
        sk->sk_attno = attr_filter->lattno;
        sk->sk_strategy = BTGreaterEqualStrategyNumber;
        sk->sk_subtype = INT8OID;
        sk->sk_argument = attr_filter->min;
        scankeys = lappend(scankeys, sk);

        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
        sk->sk_flags = 0;
        sk->sk_attno = attr_filter->lattno;
        sk->sk_strategy = BTLessEqualStrategyNumber;
        sk->sk_subtype = INT8OID;
        sk->sk_argument = attr_filter->max;
        scankeys = lappend(scankeys, sk);

        /* append new runtime filters to target node */
        SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
        sss->filters = list_concat(sss->filters, scankeys);
    }
}

/*
 * Fold one build-side tuple into every AttrFilter: update the running
 * min/max for the range filter and add the join key to the bloom filter.
 */
static void
BuildRuntimeFilter(HashState *node, TupleTableSlot *slot)
{
    Datum       val;
    bool        isnull;
    ListCell   *lc;
    AttrFilter *attr_filter;

    foreach (lc, node->filters)
    {
        attr_filter = (AttrFilter *) lfirst(lc);

        val = slot_getattr(slot, attr_filter->rattno, &isnull);
        if (isnull)
            continue;

        attr_filter->empty = false;

        if ((int64_t)val < (int64_t)attr_filter->min)
            attr_filter->min = val;

        if ((int64_t)val > (int64_t)attr_filter->max)
            attr_filter->max = val;

        if (attr_filter->blm_filter)
            bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum));
    }
}

/*
 * Release the bloom filters and the AttrFilter list attached to this Hash node.
 */
void
FreeRuntimeFilter(HashState *node)
{
    ListCell   *lc;
    AttrFilter *attr_filter;

    if (!node->filters)
        return;

    foreach (lc, node->filters)
    {
        attr_filter = lfirst(lc);
        if (attr_filter->blm_filter)
            bloom_free(attr_filter->blm_filter);
    }

    list_free_deep(node->filters);
    node->filters = NIL;
}

/*
 * On rescan: drop the ScanKeys already pushed to the target SeqScan, rebuild
 * an empty bloom filter, and reset the range bounds so the filters can be
 * re-populated by the next hash build.
 */
void
ResetRuntimeFilter(HashState *node)
{
    ListCell   *lc;
    AttrFilter *attr_filter;
    SeqScanState *sss;

    if (!node->filters)
        return;

    foreach (lc, node->filters)
    {
        attr_filter = lfirst(lc);
        attr_filter->empty = true;

        if (IsA(attr_filter->target, SeqScanState))
        {
            sss = castNode(SeqScanState, attr_filter->target);
            if (sss->filters)
            {
                list_free_deep(sss->filters);
                sss->filters = NIL;
            }
        }

        if (attr_filter->blm_filter)
            bloom_free(attr_filter->blm_filter);

        attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
                                                         work_mem, random());
        attr_filter->min = LLONG_MAX;
        attr_filter->max = LLONG_MIN;
    }
}