Skip to content

Commit

Permalink
Added extra_headers parameter; Added compressed data support
Browse files Browse the repository at this point in the history
  • Loading branch information
wjdecorte committed Aug 13, 2019
1 parent 80cf6fe commit e9974db
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 70 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,20 @@ TABLES
GoodBye!
```


# Compressed Data

Consider using the new "extra_headers" optional parameter to send
"Accept-Encoding: gzip" and have Druid return the results compressed,
increasing the performance of the query especially for large data sets.

```python
from pydruid.client import PyDruid

query = PyDruid(druid_url_goes_here, 'druid/v2', extra_headers={"Accept-Encoding": "gzip"})
```


# Contributing

Contributions are welcomed of course. We like to use `black` and `flake8`.
Expand Down
35 changes: 31 additions & 4 deletions pydruid/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import division
from __future__ import absolute_import

import sys
import json
import re

Expand All @@ -24,19 +25,35 @@
from pydruid.query import QueryBuilder
from base64 import b64encode

if sys.version_info.major == 2 and sys.version_info.minor == 7:
import StringIO
from gzip import GzipFile

def decompress(data):
infile = StringIO.StringIO()
infile.write(data)
with GzipFile(fileobj=infile, mode="r") as f:
f.rewind()
ud = f.read()
return ud


else:
from gzip import decompress

# extract error from the <PRE> tag inside the HTML response
HTML_ERROR = re.compile("<pre>\\s*(.*?)\\s*</pre>", re.IGNORECASE)


class BaseDruidClient(object):
def __init__(self, url, endpoint):
def __init__(self, url, endpoint, extra_headers=None):
self.url = url
self.endpoint = endpoint
self.query_builder = QueryBuilder()
self.username = None
self.password = None
self.proxies = None
self.extra_headers = extra_headers

def set_basic_auth_credentials(self, username, password):
self.username = username
Expand All @@ -55,6 +72,8 @@ def _prepare_url_headers_and_body(self, query):
else:
url = self.url + "/" + self.endpoint
headers = {"Content-Type": "application/json"}
if self.extra_headers and isinstance(self.extra_headers, dict):
headers.update(self.extra_headers)
if (self.username is not None) and (self.password is not None):
authstring = "{}:{}".format(self.username, self.password)
b64string = b64encode(authstring.encode()).decode()
Expand Down Expand Up @@ -542,15 +561,23 @@ class PyDruid(BaseDruidClient):
1 6 2013-10-04T00:00:00.000Z user_2
"""

def __init__(self, url, endpoint):
super(PyDruid, self).__init__(url, endpoint)
def __init__(self, url, endpoint, extra_headers=None):
super(PyDruid, self).__init__(url, endpoint, extra_headers)

def _post(self, query):
try:
headers, querystr, url = self._prepare_url_headers_and_body(query)
req = urllib.request.Request(url, querystr, headers)
res = urllib.request.urlopen(req)
data = res.read().decode("utf-8")
content_encoding = res.info().get("Content-Encoding")
if content_encoding == "gzip":
data = decompress(res.read()).decode("utf-8")
elif content_encoding:
raise ValueError(
"Invalid content encoding: {}".format(content_encoding)
)
else:
data = res.read().decode("utf-8")
res.close()
except urllib.error.HTTPError as e:
err = e.read()
Expand Down
204 changes: 138 additions & 66 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: UTF-8 -*-
import textwrap

import sys
import pytest
from mock import patch, Mock
from six.moves import urllib
Expand All @@ -11,29 +11,38 @@
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension

if sys.version_info.major == 2 and sys.version_info.minor == 7:
from gzip import GzipFile

def compress(data):
out = StringIO()
with GzipFile(fileobj=out, mode="w") as f:
f.write(data)
return out.getvalue()


else:
from gzip import compress

def create_client():
return PyDruid("http://localhost:8083", "druid/v2/")

def create_client(headers=None):
return PyDruid("http://localhost:8083", "druid/v2/", headers)


def create_blank_query():
return Query({}, 'none')
return Query({}, "none")


def _http_error(code, msg, data = ''):
def _http_error(code, msg, data=""):
# Need a file-like object for the response data
fp = StringIO(data)
return urllib.error.HTTPError(
url='http://fakeurl:8080/druid/v2/',
hdrs={},
code=code,
msg=msg,
fp=fp,
url="http://fakeurl:8080/druid/v2/", hdrs={}, code=code, msg=msg, fp=fp
)


class TestPyDruid:
@patch('pydruid.client.urllib.request.urlopen')
@patch("pydruid.client.urllib.request.urlopen")
def test_druid_returns_error(self, mock_urlopen):
# given
mock_urlopen.side_effect = _http_error(500, "Druid error")
Expand All @@ -42,20 +51,22 @@ def test_druid_returns_error(self, mock_urlopen):
# when / then
with pytest.raises(IOError):
client.topn(
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000})

@patch('pydruid.client.urllib.request.urlopen')
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000},
)

@patch("pydruid.client.urllib.request.urlopen")
def test_druid_returns_html_error(self, mock_urlopen):
# given
message = textwrap.dedent("""
message = textwrap.dedent(
"""
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
Expand All @@ -68,26 +79,31 @@ def test_druid_returns_html_error(self, mock_urlopen):
<hr /><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.3.19.v20170502</a><hr/>
</body>
</html>
""").strip()
mock_urlopen.side_effect = _http_error(500, 'Internal Server Error', message)
"""
).strip()
mock_urlopen.side_effect = _http_error(500, "Internal Server Error", message)
client = create_client()

# when / then
with pytest.raises(IOError) as e:
client.topn(
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000})

assert str(e.value) == textwrap.dedent("""
HTTP Error 500: Internal Server Error
Druid Error: javax.servlet.ServletException: java.lang.OutOfMemoryError: GC overhead limit exceeded
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000},
)

assert (
str(e.value)
== textwrap.dedent(
"""
HTTP Error 500: Internal Server Error
Druid Error: javax.servlet.ServletException: java.lang.OutOfMemoryError: GC overhead limit exceeded
Query is: {
"aggregations": [
{
Expand All @@ -112,9 +128,11 @@ def test_druid_returns_html_error(self, mock_urlopen):
"queryType": "topN",
"threshold": 1
}
""").strip()
"""
).strip()
)

@patch('pydruid.client.urllib.request.urlopen')
@patch("pydruid.client.urllib.request.urlopen")
def test_druid_returns_results(self, mock_urlopen):
# given
response = Mock()
Expand All @@ -126,28 +144,32 @@ def test_druid_returns_results(self, mock_urlopen):
"metric" : 100
} ]
} ]
""".encode("utf-8")
""".encode(
"utf-8"
)
response.info.return_value = {}
mock_urlopen.return_value = response
client = create_client()

# when
top = client.topn(
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000})
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000},
)

# then
assert top is not None
assert len(top.result) == 1
assert len(top.result[0]['result']) == 1
assert len(top.result[0]["result"]) == 1

@patch('pydruid.client.urllib.request.urlopen')
@patch("pydruid.client.urllib.request.urlopen")
def test_client_allows_to_export_last_query(self, mock_urlopen):
# given
response = Mock()
Expand All @@ -159,29 +181,79 @@ def test_client_allows_to_export_last_query(self, mock_urlopen):
"metric" : 100
} ]
} ]
""".encode("utf-8")
""".encode(
"utf-8"
)
response.info.return_value = {}
mock_urlopen.return_value = response
client = create_client()
client.topn(
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000})
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000},
)

# when / then
# assert that last_query.export_tsv method was called (it should throw an exception, given empty path)
# assert that last_query.export_tsv method was called (it should throw an
# exception, given empty path)
with pytest.raises(TypeError):
client.export_tsv(None)

@patch('pydruid.client.urllib.request.urlopen')
@patch("pydruid.client.urllib.request.urlopen")
def test_client_auth_creds(self, mock_urlopen):
client = create_client()
query = create_blank_query()
client.set_basic_auth_credentials('myUsername', 'myPassword')
client.set_basic_auth_credentials("myUsername", "myPassword")
headers, _, _ = client._prepare_url_headers_and_body(query)
assert headers["Authorization"] == "Basic bXlVc2VybmFtZTpteVBhc3N3b3Jk"

def test_client_allows_extra_headers(self):
client = create_client(headers={"Accept-Encoding": "gzip"})
query = create_blank_query()
headers, _, _ = client._prepare_url_headers_and_body(query)
assert headers['Authorization'] == "Basic bXlVc2VybmFtZTpteVBhc3N3b3Jk"
assert headers["Accept-Encoding"] == "gzip"

@patch("pydruid.client.urllib.request.urlopen")
def test_return_compressed_data(self, mock_urlopen):
# given
response = Mock()
response.read.return_value = compress(
"""
[ {
"timestamp" : "2015-12-30T14:14:49.000Z",
"result" : [ {
"dimension" : "aaaa",
"metric" : 100
} ]
} ]
""".encode(
"utf-8"
)
)
response.info.return_value = {"Content-Encoding": "gzip"}
mock_urlopen.return_value = response
client = create_client(headers={"Accept-Encoding": "gzip"})

# when
top = client.topn(
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000},
)

# then
assert top is not None
assert len(top.result) == 1
assert len(top.result[0]["result"]) == 1

0 comments on commit e9974db

Please sign in to comment.