From 3b8d20f27e4789f67fc0be8d8cd06abc88cd21c5 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Tue, 1 Oct 2019 11:38:30 +0100 Subject: [PATCH 1/4] [fix] fixes APPLY / SORTBY / GROUPBY / REDUCE order on FT.AGGREGATE support. --- .circleci/config.yml | 8 +++- redisearch/aggregation.py | 84 ++++++++++++++++++++++++++------------- test/test_builder.py | 28 +++++++++---- 3 files changed, 83 insertions(+), 37 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 321d947..6bae071 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -77,7 +77,13 @@ jobs: name: run tests command: | . venv/bin/activate - REDIS_PORT=6379 python test/test.py + REDIS_PORT=6379 python test/test.py + + - run: + name: run query builder tests + command: | + . venv/bin/activate + python test/test.py # no need for store_artifacts on nightly builds diff --git a/redisearch/aggregation.py b/redisearch/aggregation.py index b767317..efbdee5 100644 --- a/redisearch/aggregation.py +++ b/redisearch/aggregation.py @@ -99,7 +99,7 @@ def __init__(self, fields, reducers): self.limit = Limit() def build_args(self): - ret = [str(len(self.fields))] + ret = ['GROUPBY', str(len(self.fields))] ret.extend(self.fields) for reducer in self.reducers: ret += ['REDUCE', reducer.NAME, str(len(reducer.args))] @@ -108,6 +108,49 @@ def build_args(self): ret += ['AS', reducer._alias] return ret +class Projection(object): + """ + This object automatically created in the `AggregateRequest.apply()` + """ + + def __init__(self, alias, projector): + + self.alias = alias + self.projector = projector + + def build_args(self): + ret = ['APPLY', self.projector] + if self.alias: + ret += ['AS', self.alias] + + return ret + +class SortBy(object): + """ + This object automatically created in the `AggregateRequest.sort_by()` + """ + + def __init__(self, fields, max): + self.fields = fields + self.max = max + + + + def build_args(self): + fields_args = [] + for f in self.fields: + if isinstance(f, 
SortDirection): + fields_args += [f.field, f.DIRSTRING] + else: + fields_args += [f] + + ret = ['SORTBY', str(len(fields_args))] + ret.extend(fields_args) + if self.max > 0: + ret += ['MAX', str(self.max)] + + return ret + class AggregateRequest(object): """ @@ -127,11 +170,9 @@ def __init__(self, query='*'): return the object itself, making them useful for chaining. """ self._query = query - self._groups = [] - self._projections = [] + self._aggregateplan = [] self._loadfields = [] self._limit = Limit() - self._sortby = [] self._max = 0 self._with_schema = False self._verbatim = False @@ -162,7 +203,7 @@ def group_by(self, fields, *reducers): `aggregation` module. """ group = Group(fields, reducers) - self._groups.append(group) + self._aggregateplan.extend(group.build_args()) return self @@ -177,7 +218,8 @@ def apply(self, **kwexpr): expression itself, for example `apply(square_root="sqrt(@foo)")` """ for alias, expr in kwexpr.items(): - self._projections.append([alias, expr]) + projection = Projection(alias, expr) + self._aggregateplan.extend(projection.build_args()) return self @@ -224,10 +266,7 @@ def limit(self, offset, num): """ limit = Limit(offset, num) - if self._groups: - self._groups[-1].limit = limit - else: - self._limit = limit + self._limit = limit return self def sort_by(self, *fields, **kwargs): @@ -258,14 +297,13 @@ def sort_by(self, *fields, **kwargs): .sort_by(Desc('@paid'), max=10) ``` """ - self._max = kwargs.get('max', 0) if isinstance(fields, (string_types, SortDirection)): fields = [fields] - for f in fields: - if isinstance(f, SortDirection): - self._sortby += [f.field, f.DIRSTRING] - else: - self._sortby.append(f) + + max = kwargs.get('max', 0) + sortby = SortBy(fields, max) + + self._aggregateplan.extend(sortby.build_args()) return self def with_schema(self): @@ -312,18 +350,8 @@ def build_args(self): ret.append('LOAD') ret.append(str(len(self._loadfields))) ret.extend(self._loadfields) - for group in self._groups: - ret += ['GROUPBY'] + 
group.build_args() + group.limit.build_args() - for alias, projector in self._projections: - ret += ['APPLY', projector] - if alias: - ret += ['AS', alias] - - if self._sortby: - ret += ['SORTBY', str(len(self._sortby))] - ret += self._sortby - if self._max: - ret += ['MAX', str(self._max)] + + ret.extend(self._aggregateplan) ret += self._limit.build_args() diff --git a/test/test_builder.py b/test/test_builder.py index b621890..75badc0 100644 --- a/test/test_builder.py +++ b/test/test_builder.py @@ -1,9 +1,9 @@ -from unittest import TestCase +import unittest import redisearch.aggregation as a import redisearch.querystring as q import redisearch.reducers as r -class QueryBuilderTest(TestCase): +class QueryBuilderTest(unittest.TestCase): def testBetween(self): b = q.between(1, 10) self.assertEqual('[1 10]', str(b)) @@ -42,16 +42,16 @@ def testGroup(self): # Single field, single reducer g = a.Group('foo', r.count()) ret = g.build_args() - self.assertEqual(['1', 'foo', 'REDUCE', 'COUNT', '0'], ret) + self.assertEqual(['GROUPBY', '1', 'foo', 'REDUCE', 'COUNT', '0'], ret) # Multiple fields, single reducer g = a.Group(['foo', 'bar'], r.count()) - self.assertEqual(['2', 'foo', 'bar', 'REDUCE', 'COUNT', '0'], + self.assertEqual(['GROUPBY', '2', 'foo', 'bar', 'REDUCE', 'COUNT', '0'], g.build_args()) # Multiple fields, multiple reducers g = a.Group(['foo', 'bar'], [r.count(), r.count_distinct('@fld1')]) - self.assertEqual(['2', 'foo', 'bar', 'REDUCE', 'COUNT', '0', 'REDUCE', 'COUNT_DISTINCT', '1', '@fld1'], + self.assertEqual(['GROUPBY', '2', 'foo', 'bar', 'REDUCE', 'COUNT', '0', 'REDUCE', 'COUNT_DISTINCT', '1', '@fld1'], g.build_args()) def testAggRequest(self): @@ -63,12 +63,20 @@ def testAggRequest(self): self.assertEqual(['*', 'GROUPBY', '1', '@foo', 'REDUCE', 'COUNT', '0'], req.build_args()) # Test with limit - req = a.AggregateRequest().\ - group_by('@foo', r.count()).\ + req = a.AggregateRequest(). \ + group_by('@foo', r.count()). 
\ sort_by('@foo') self.assertEqual(['*', 'GROUPBY', '1', '@foo', 'REDUCE', 'COUNT', '0', 'SORTBY', '1', '@foo'], req.build_args()) + # Test with apply + req = a.AggregateRequest(). \ + apply(foo="@bar / 2"). \ + group_by('@foo', r.count()) + + self.assertEqual(['*', 'APPLY', '@bar / 2', 'AS', 'foo', 'GROUPBY', '1', '@foo', 'REDUCE', 'COUNT', '0'], + req.build_args()) + # Test with sort_by req = a.AggregateRequest().group_by('@foo', r.count()).sort_by('@date') # print req.build_args() @@ -105,4 +113,8 @@ def test_reducers(self): self.assertEqual(('f1', 'BY', 'f2', 'ASC'), r.first_value('f1', a.Asc('f2')).args) self.assertEqual(('f1', 'BY', 'f1', 'ASC'), r.first_value('f1', a.Asc).args) - self.assertEqual(('f1', '50'), r.random_sample('f1', 50).args) \ No newline at end of file + self.assertEqual(('f1', '50'), r.random_sample('f1', 50).args) + +if __name__ == '__main__': + + unittest.main() \ No newline at end of file From 955100788593f5c0fe0efc7eadfdb3edcbf184b8 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 3 Oct 2019 15:10:08 +0100 Subject: [PATCH 2/4] [add] set sortby max default to 0. 
[add] set apply alias as not mandatory --- redisearch/aggregation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/redisearch/aggregation.py b/redisearch/aggregation.py index efbdee5..1b4bf35 100644 --- a/redisearch/aggregation.py +++ b/redisearch/aggregation.py @@ -113,14 +113,14 @@ class Projection(object): This object automatically created in the `AggregateRequest.apply()` """ - def __init__(self, alias, projector): + def __init__(self, projector, alias=None ): self.alias = alias self.projector = projector def build_args(self): ret = ['APPLY', self.projector] - if self.alias: + if self.alias is not None: ret += ['AS', self.alias] return ret @@ -130,7 +130,7 @@ class SortBy(object): This object automatically created in the `AggregateRequest.sort_by()` """ - def __init__(self, fields, max): + def __init__(self, fields, max=0): self.fields = fields self.max = max From 7041f4e4f84c70efcaefdc91a802bdd6f59ed89a Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 3 Oct 2019 15:41:40 +0100 Subject: [PATCH 3/4] [add] added support for filter expressions on aggregations. 
include more examples on test_builder.py --- redisearch/aggregation.py | 25 +++++++++++++++++++++++-- test/test_builder.py | 8 ++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/redisearch/aggregation.py b/redisearch/aggregation.py index 1b4bf35..faec3e1 100644 --- a/redisearch/aggregation.py +++ b/redisearch/aggregation.py @@ -104,7 +104,7 @@ def build_args(self): for reducer in self.reducers: ret += ['REDUCE', reducer.NAME, str(len(reducer.args))] ret.extend(reducer.args) - if reducer._alias: + if reducer._alias is not None: ret += ['AS', reducer._alias] return ret @@ -172,6 +172,7 @@ def __init__(self, query='*'): self._query = query self._aggregateplan = [] self._loadfields = [] + self._filters = [] self._limit = Limit() self._max = 0 self._with_schema = False @@ -218,7 +219,7 @@ def apply(self, **kwexpr): expression itself, for example `apply(square_root="sqrt(@foo)")` """ for alias, expr in kwexpr.items(): - projection = Projection(alias, expr) + projection = Projection(expr, alias ) self._aggregateplan.extend(projection.build_args()) return self @@ -306,6 +307,22 @@ def sort_by(self, *fields, **kwargs): self._aggregateplan.extend(sortby.build_args()) return self + def filter(self, expressions): + """ + Specify filter for post-query results using predicates relating to values in the result set. + + ### Parameters + + - **fields**: Fields to group by. This can either be a single string, + or a list of strings. 
+ """ + if isinstance(expressions, (string_types)): + expressions = [expressions] + + self._filters.extend(expressions) + + return self + def with_schema(self): """ If set, the `schema` property will contain a list of `[field, type]` @@ -353,6 +370,10 @@ def build_args(self): ret.extend(self._aggregateplan) + for expr in self._filters: + ret.append('FILTER') + ret.append(expr) + ret += self._limit.build_args() return ret diff --git a/test/test_builder.py b/test/test_builder.py index 75badc0..5253d99 100644 --- a/test/test_builder.py +++ b/test/test_builder.py @@ -62,6 +62,10 @@ def testAggRequest(self): req = a.AggregateRequest().group_by('@foo', r.count()) self.assertEqual(['*', 'GROUPBY', '1', '@foo', 'REDUCE', 'COUNT', '0'], req.build_args()) + # Test with group_by and alias on reducer + req = a.AggregateRequest().group_by('@foo', r.count().alias('foo_count')) + self.assertEqual(['*', 'GROUPBY', '1', '@foo', 'REDUCE', 'COUNT', '0', 'AS', 'foo_count'], req.build_args()) + # Test with limit req = a.AggregateRequest(). \ group_by('@foo', r.count()). 
\ @@ -77,6 +81,10 @@ def testAggRequest(self): self.assertEqual(['*', 'APPLY', '@bar / 2', 'AS', 'foo', 'GROUPBY', '1', '@foo', 'REDUCE', 'COUNT', '0'], req.build_args()) + # Test with filter + req = a.AggregateRequest().group_by('@foo', r.count()).filter( "@foo=='bar'") + self.assertEqual(['*', 'GROUPBY', '1', '@foo', 'REDUCE', 'COUNT', '0', 'FILTER', "@foo=='bar'" ], req.build_args()) + # Test with sort_by req = a.AggregateRequest().group_by('@foo', r.count()).sort_by('@date') # print req.build_args() From da4fa4c71591f1451e0be52125a1e3c845504a1a Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Sun, 27 Oct 2019 18:33:02 +0000 Subject: [PATCH 4/4] [fix] corrected FT.AGGREGATE filter expressions to relate to the current state of the pipeline --- redisearch/aggregation.py | 10 ++++------ test/test_builder.py | 9 +++++++++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/redisearch/aggregation.py b/redisearch/aggregation.py index faec3e1..16e5eab 100644 --- a/redisearch/aggregation.py +++ b/redisearch/aggregation.py @@ -172,7 +172,6 @@ def __init__(self, query='*'): self._query = query self._aggregateplan = [] self._loadfields = [] - self._filters = [] self._limit = Limit() self._max = 0 self._with_schema = False @@ -319,10 +318,13 @@ def filter(self, expressions): if isinstance(expressions, (string_types)): expressions = [expressions] - self._filters.extend(expressions) + for expression in expressions: + self._aggregateplan.extend(['FILTER', expression]) return self + + def with_schema(self): """ If set, the `schema` property will contain a list of `[field, type]` @@ -370,10 +372,6 @@ def build_args(self): ret.extend(self._aggregateplan) - for expr in self._filters: - ret.append('FILTER') - ret.append(expr) - ret += self._limit.build_args() return ret diff --git a/test/test_builder.py b/test/test_builder.py index 5253d99..1936a81 100644 --- a/test/test_builder.py +++ b/test/test_builder.py @@ -85,6 +85,15 @@ def testAggRequest(self): req = 
a.AggregateRequest().group_by('@foo', r.count()).filter( "@foo=='bar'") self.assertEqual(['*', 'GROUPBY', '1', '@foo', 'REDUCE', 'COUNT', '0', 'FILTER', "@foo=='bar'" ], req.build_args()) + # Test with filter on different state of the pipeline + req = a.AggregateRequest().filter("@foo=='bar'").group_by('@foo', r.count()) + self.assertEqual(['*', 'FILTER', "@foo=='bar'", 'GROUPBY', '1', '@foo','REDUCE', 'COUNT', '0' ], req.build_args()) + + # Test with filter on different state of the pipeline + req = a.AggregateRequest().filter(["@foo=='bar'","@foo2=='bar2'"]).group_by('@foo', r.count()) + self.assertEqual(['*', 'FILTER', "@foo=='bar'", 'FILTER', "@foo2=='bar2'", 'GROUPBY', '1', '@foo', 'REDUCE', 'COUNT', '0'], + req.build_args()) + # Test with sort_by req = a.AggregateRequest().group_by('@foo', r.count()).sort_by('@date') # print req.build_args()