Conditionally modify the context object instead of conditionally returning early.
Jeremy Phelps committed Jan 19, 2018
1 parent 13012b9 commit 9e64810
Showing 1 changed file with 39 additions and 168 deletions.
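The refactoring named in the commit message is visible in the ssl_context hunk near the end of the diff: instead of returning the default SSL context early when certificate errors are not ignored, the new version builds the context once and mutates it only when ignore_certificate_errors is set. A minimal before/after sketch of that shape, written as standalone functions (the function names and the plain boolean parameter are illustrative; in the client this is a method reading self.ignore_certificate_errors):

import ssl

# Before: return early on the common path, mutate only on the fall-through path.
def ssl_context_before(ignore_certificate_errors: bool) -> ssl.SSLContext:
    ctx = ssl.create_default_context()
    if not ignore_certificate_errors:
        return ctx
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx

# After: build the context once, conditionally modify it, single exit point.
def ssl_context_after(ignore_certificate_errors: bool) -> ssl.SSLContext:
    ctx = ssl.create_default_context()
    if ignore_certificate_errors:
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx

Both functions return the same context for the same input; the second avoids the duplicated return and keeps the conditional mutation in one place.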
207 changes: 39 additions & 168 deletions pydruid/client.py
@@ -53,7 +53,7 @@ def _prepare_url_headers_and_body(self, query):
username_password = \
b64encode(bytes('{}:{}'.format(self.username, self.password)))
headers['Authorization'] = 'Basic {}'.format(username_password)

return headers, querystr, url

def _post(self, query):
@@ -71,10 +71,8 @@ def _post(self, query):

def topn(self, **kwargs):
"""
A TopN query returns a set of the values in a given dimension,
sorted by a specified metric. Conceptually, a topN can be
thought of as an approximate GroupByQuery over a single
dimension with an Ordering spec. TopNs are
A TopN query returns a set of the values in a given dimension, sorted by a specified metric. Conceptually, a
topN can be thought of as an approximate GroupByQuery over a single dimension with an Ordering spec. TopNs are
faster and more resource efficient than GroupBy for this use case.
Required key/value pairs:
@@ -83,8 +81,7 @@ def topn(self, **kwargs):
:param str granularity: Aggregate data by hour, day, minute, etc.,
:param intervals: ISO-8601 intervals of data to query
:type intervals: str or list
:param dict aggregations: A map from aggregator name to one of
the pydruid.utils.aggregators e.g., doublesum
:param dict aggregations: A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum
:param str dimension: Dimension to run the query against
:param str metric: Metric over which to sort the specified dimension by
:param int threshold: How many of the top items to return
@@ -94,10 +91,8 @@ def topn(self, **kwargs):
Optional key/value pairs:
:param pydruid.utils.filters.Filter filter: Indicates which rows
of data to include in the query
:param post_aggregations: A dict with string key = 'post_aggregator_name',
and value pydruid.utils.PostAggregator
:param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
:param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator
:param dict context: A dict of query context options
Example:
@@ -117,35 +112,30 @@ def topn(self, **kwargs):
context={"timeout": 1000}
)
>>> print top
>>> [{'timestamp': '2013-06-14T00:00:00.000Z',
'result': [{'count': 22.0, 'user': "cool_user"}]}]
>>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': [{'count': 22.0, 'user': "cool_user"}]}]
"""
query = self.query_builder.topn(kwargs)
return self._post(query)

def timeseries(self, **kwargs):
"""
A timeseries query returns the values of the requested metrics (in aggregate)
for each timestamp.
A timeseries query returns the values of the requested metrics (in aggregate) for each timestamp.
Required key/value pairs:
:param str datasource: Data source to query
:param str granularity: Time bucket to aggregate data by hour, day, minute, etc.,
:param intervals: ISO-8601 intervals for which to run the query on
:type intervals: str or list
:param dict aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
:param dict aggregations: A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum
:return: The query result
:rtype: Query
Optional key/value pairs:
:param pydruid.utils.filters.Filter filter: Indicates which rows of
data to include in the query
:param post_aggregations: A dict with string key =
'post_aggregator_name', and value pydruid.utils.PostAggregator
:param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
:param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator
:param dict context: A dict of query context options
Example:
@@ -157,49 +147,39 @@ def timeseries(self, **kwargs):
datasource=twitterstream,
granularity='hour',
intervals='2013-06-14/pt1h',
aggregations=\
{"count": doublesum("count"), "rows": count("rows")},
post_aggregations=\
{'percent': (Field('count') / Field('rows')) * Const(100)},
aggregations={"count": doublesum("count"), "rows": count("rows")},
post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100)},
context={"timeout": 1000}
)
>>> print counts
>>> [{'timestamp': '2013-06-14T00:00:00.000Z',
'result': {'count': 9619.0, 'rows': 8007,
'percent': 120.13238416385663}}]
>>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'count': 9619.0, 'rows': 8007, 'percent': 120.13238416385663}}]
"""
query = self.query_builder.timeseries(kwargs)
return self._post(query)

def groupby(self, **kwargs):
"""
A group-by query groups a results set (the requested aggregate
metrics) by the specified dimension(s).
A group-by query groups a results set (the requested aggregate metrics) by the specified dimension(s).
Required key/value pairs:
:param str datasource: Data source to query
:param str granularity: Time bucket to aggregate data by hour, day, minute, etc.,
:param intervals: ISO-8601 intervals for which to run the query on
:type intervals: str or list
:param dict aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
:param dict aggregations: A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum
:param list dimensions: The dimensions to group by
:return: The query result
:rtype: Query
Optional key/value pairs:
:param pydruid.utils.filters.Filter filter: Indicates which rows of
data to include in the query
:param pydruid.utils.having.Having having: Indicates which groups
in results set of query to keep
:param post_aggregations: A dict with string key = 'post_aggregator_name',
and value pydruid.utils.PostAggregator
:param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
:param pydruid.utils.having.Having having: Indicates which groups in results set of query to keep
:param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator
:param dict context: A dict of query context options
:param dict limit_spec: A dict of parameters defining how to limit
the rows returned, as specified in the Druid api documentation
:param dict limit_spec: A dict of parameters defining how to limit the rows returned, as specified in the Druid api documentation
Example:
@@ -222,25 +202,8 @@ def groupby(self, **kwargs):
)
>>> for k in range(2):
... print group[k]
>>> {
'timestamp': '2013-10-04T00:00:00.000Z',
'version': 'v1',
'event': {
'count': 1.0,
'user_name': 'user_1',
'reply_to_name': 'user_2',
}
}
>>> {
'timestamp': '2013-10-04T00:00:00.000Z',
'version': 'v1',
'event': {
'count': 1.0,
'user_name': 'user_2',
'reply_to_name': 'user_3',
}
}
>>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_1', 'reply_to_name': 'user_2'}}
>>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_2', 'reply_to_name': 'user_3'}}
"""
query = self.query_builder.groupby(kwargs)
return self._post(query)
@@ -274,17 +237,11 @@ def segment_metadata(self, **kwargs):
.. code-block:: python
:linenos:
>>> meta = client.segment_metadata(
datasource='twitterstream', intervals = '2013-10-04/pt1h')
>>> meta = client.segment_metadata(datasource='twitterstream', intervals = '2013-10-04/pt1h')
>>> print meta[0].keys()
>>> ['intervals', 'id', 'columns', 'size']
>>> print meta[0]['columns']['tweet_length']
>>> {
'errorMessage': None,
'cardinality': None,
'type': 'FLOAT',
'size': 30908008,
}
>>> {'errorMessage': None, 'cardinality': None, 'type': 'FLOAT', 'size': 30908008}
"""
query = self.query_builder.segment_metadata(kwargs)
@@ -312,13 +269,7 @@ def time_boundary(self, **kwargs):
>>> bound = client.time_boundary(datasource='twitterstream')
>>> print bound
>>> [{
'timestamp': '2011-09-14T15:00:00.000Z',
'result': {
'minTime': '2011-09-14T15:00:00.000Z',
'maxTime': '2014-03-04T23:44:00.000Z',
}
}]
>>> [{'timestamp': '2011-09-14T15:00:00.000Z', 'result': {'minTime': '2011-09-14T15:00:00.000Z', 'maxTime': '2014-03-04T23:44:00.000Z'}}]
"""
query = self.query_builder.time_boundary(kwargs)
return self._post(query)
@@ -337,12 +288,9 @@ def select(self, **kwargs):
Optional key/value pairs:
:param pydruid.utils.filters.Filter filter: Indicates which rows of
data to include in the query
:param list dimensions: The list of dimensions to select. If left
empty, all dimensions are returned
:param list metrics: The list of metrics to select. If left empty,
all metrics are returned
:param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
:param list dimensions: The list of dimensions to select. If left empty, all dimensions are returned
:param list metrics: The list of metrics to select. If left empty, all metrics are returned
:param dict context: A dict of query context options
:return: The query result
@@ -360,22 +308,8 @@ def select(self, **kwargs):
paging_spec={'pagingIdentifies': {}, 'threshold': 1},
context={"timeout": 1000}
)
>>> print(raw_data)
>>> [{
'timestamp': '2013-06-14T00:00:00.000Z',
'result': {
'pagingIdentifiers': {
'twitterstream_...08:00:00.000Z_v1': 1,
},
'events': [{
'segmentId': 'twitterstr...000Z_v1',
'offset': 0,
'event': {
'timestamp': '2013-06-14T00:00:00.000Z',
'dim': 'value',
}
}]
}
}]
>>> print raw_data
>>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'pagingIdentifiers': {'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1': 1}, 'events': [{'segmentId': 'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1', 'offset': 0, 'event': {'timestamp': '2013-06-14T00:00:00.000Z', 'dim': 'value'}}]}}]
"""
query = self.query_builder.select(kwargs)
return self._post(query)
@@ -388,8 +322,7 @@ def export_tsv(self, dest_path):
Use Query.export_tsv() method instead.
"""
if self.query_builder.last_query is None:
raise AttributeError(
"There was no query executed by this client yet. Can't export!")
raise AttributeError("There was no query executed by this client yet. Can't export!")
else:
return self.query_builder.last_query.export_tsv(dest_path)

@@ -401,17 +334,15 @@ def export_pandas(self):
Use Query.export_pandas() method instead
"""
if self.query_builder.last_query is None:
raise AttributeError(
"There was no query executed by this client yet. Can't export!")
raise AttributeError("There was no query executed by this client yet. Can't export!")
else:
return self.query_builder.last_query.export_pandas()


class PyDruid(BaseDruidClient):
"""
PyDruid contains the functions for creating and executing Druid queries.
Returns Query objects that can be used for exporting query results
into TSV files or pandas.DataFrame objects for subsequent analysis.
PyDruid contains the functions for creating and executing Druid queries. Returns Query objects that can be used
for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis.
:param str url: URL of Broker node in the Druid cluster
:param str endpoint: Endpoint that Broker listens for queries on
@@ -460,18 +391,8 @@ class PyDruid(BaseDruidClient):
}
>>> print top.result
>>> [{
'timestamp': '2013-10-04T00:00:00.000Z',
'result': [
{
'count': 7.0,
'user_name': 'user_1',
},
{
'count': 6.0,
'user_name': 'user_2',
},
]}]
>>> [{'timestamp': '2013-10-04T00:00:00.000Z',
'result': [{'count': 7.0, 'user_name': 'user_1'}, {'count': 6.0, 'user_name': 'user_2'}]}]
>>> df = top.export_pandas()
>>> print df
@@ -484,10 +405,9 @@ def __init__(self, url, endpoint):

def ssl_context(self):
ctx = ssl.create_default_context()
if not self.ignore_certificate_errors:
return ctx
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
if self.ignore_certificate_errors:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx

def _post(self, query):
Expand All @@ -512,52 +432,3 @@ def _post(self, query):
else:
query.parse(data)
return query

def scan(self, **kwargs):
"""
A scan query returns raw Druid rows
Required key/value pairs:
:param str datasource: Data source to query
:param str granularity: Time bucket to aggregate data by hour, day, minute, etc.
:param int limit: The maximum number of rows to return
:param intervals: ISO-8601 intervals for which to run the query on
:type intervals: str or list
Optional key/value pairs:
:param pydruid.utils.filters.Filter filter: Indicates which rows of
data to include in the query
:param list dimensions: The list of dimensions to select. If left
empty, all dimensions are returned
:param list metrics: The list of metrics to select. If left empty,
all metrics are returned
:param dict context: A dict of query context options
:return: The query result
:rtype: Query
Example:
.. code-block:: python
:linenos:
>>> raw_data = client.scan(
datasource=twitterstream,
granularity='all',
intervals='2013-06-14/pt1h',
limit=1,
context={"timeout": 1000}
)
>>> print raw_data
>>> [{
u'segmentId': u'zzzz',
u'columns': [u'__time', 'status', 'region'],
'events': [{
u'status': u'ok', 'region': u'SF', u'__time': 1509494400000,
}]
}]
"""
query = self.query_builder.scan(kwargs)
return self._post(query)
