Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update p8000 to 1.23.0 #586

Merged
merged 9 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ geocoder = "~=1.33"
geojson = "~=2.4"
geomet = "~=0.2"
gunicorn = "~=20.1"
pg8000 = "==1.16.5"
pg8000 = "==1.23.0"
pickle-mixin = "==1.0.2"
pydantic = "~=1.8"
pymongo = "~=3.4"
Expand Down
1,084 changes: 669 additions & 415 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
query responses (#584)
- Added instanceId for each entry received (#565)
- Support CrateDB authentication (#474)
- Updated PG8000 to 1.23.0 (#586)

### Bug fixes

Expand Down
23 changes: 13 additions & 10 deletions src/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ def do_clean_crate():
crate_db_password = os.environ.get('CRATE_DB_PASS', None)

from crate import client
conn = client.connect(["{}:{}".format(crate_host, crate_port)],
error_trace=True, username=crate_db_username, password=crate_db_password)
conn = client.connect(["{}:{}".format(crate_host,
crate_port)],
error_trace=True,
username=crate_db_username,
password=crate_db_password)
cursor = conn.cursor()

try:
Expand Down Expand Up @@ -139,18 +142,18 @@ def crate_translator(clean_crate):
class Translator(CrateTranslator):

def insert(self, entities,
fiware_service=None, fiware_servicepath=None):
fiware_service=None, fiware_servicepath=None):
r = CrateTranslator.insert(self, entities,
fiware_service, fiware_servicepath)
fiware_service, fiware_servicepath)
self._refresh(set([e['type'] for e in entities]),
fiware_service=fiware_service)
fiware_service=fiware_service)
return r

def delete_entity(self, entity_id, entity_type=None,
fiware_service=None, **kwargs):
fiware_service=None, **kwargs):
r = CrateTranslator.delete_entity(self, entity_id, entity_type,
fiware_service=fiware_service,
**kwargs)
fiware_service=fiware_service,
**kwargs)
try:
self._refresh([entity_type], fiware_service=fiware_service)
except exceptions.ProgrammingError:
Expand Down Expand Up @@ -183,8 +186,8 @@ def clean(self, fiware_service=None, **kwargs):
if types:
for t in types:
CrateTranslator.drop_table(self, t,
fiware_service=fiware_service,
**kwargs)
fiware_service=fiware_service,
**kwargs)
try:
self._refresh(types, fiware_service=fiware_service)
except exceptions.ProgrammingError:
Expand Down
6 changes: 4 additions & 2 deletions src/reporter/query_1T1E1A.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def query_1T1E1A(attr_name, # In Path
except NGSIUsageError as e:
msg = "Bad Request Error: {}".format(e)
logging.getLogger(__name__).error(msg, exc_info=True)
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return {
"error": "{}".format(type(e)),
"description": str(e)
Expand Down Expand Up @@ -107,5 +108,6 @@ def query_1T1E1A_value(*args, **kwargs):
res.pop('entityId', None)
res.pop('entityType', None)
res.pop('attrName', None)
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res
6 changes: 4 additions & 2 deletions src/reporter/query_1T1ENA.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ def query_1T1ENA(entity_id, # In Path
'attributes': attributes
}
logging.getLogger(__name__).info("Query processed successfully")
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res

r = {
Expand All @@ -123,5 +124,6 @@ def query_1T1ENA_value(*args, **kwargs):
if isinstance(res, dict):
res.pop('entityId', None)
res.pop('entityType', None)
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res
6 changes: 4 additions & 2 deletions src/reporter/query_1TNE1A.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ def query_1TNE1A(attr_name, # In Path
from_date,
to_date,)
logging.getLogger(__name__).info("Query processed successfully")
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res

r = {
Expand Down Expand Up @@ -169,5 +170,6 @@ def query_1TNE1A_value(*args, **kwargs):
res.pop('attrName', None)
res['values'] = res['entities']
res.pop('entities', None)
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res
6 changes: 4 additions & 2 deletions src/reporter/query_1TNENA.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def query_1TNENA(entity_type=None, # In Path
from_date,
to_date,)
logging.getLogger(__name__).info("Query processed successfully")
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res

r = {
Expand Down Expand Up @@ -160,5 +161,6 @@ def query_1TNENA_value(*args, **kwargs):
res.pop('entityType', None)
res['values'] = res['entities']
res.pop('entities', None)
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res
3 changes: 2 additions & 1 deletion src/reporter/query_NTNE.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def query_NTNE(limit=10000,
del entity['id']
del entity['type']
res.append(entity)
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
logging.getLogger(__name__).info("Query processed successfully")
return res

Expand Down
6 changes: 4 additions & 2 deletions src/reporter/query_NTNE1A.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ def query_NTNE1A(attr_name, # In Path
'types': entity_type
}
logging.getLogger(__name__).info("Query processed successfully")
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res
r = {
"error": "Not Found",
Expand All @@ -155,5 +156,6 @@ def query_NTNE1A_value(*args, **kwargs):
res['values'] = res['types']
res.pop('attrName', None)
res.pop('types', None)
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res
6 changes: 4 additions & 2 deletions src/reporter/query_NTNENA.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ def query_NTNENA(id_=None, # In Query
'attrs': attrs_values
}
logging.getLogger(__name__).info("Query processed successfully")
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res

if err == "AggrMethod cannot be applied":
Expand All @@ -169,5 +170,6 @@ def query_NTNENA_value(*args, **kwargs):
if isinstance(res, dict):
res['values'] = res['attrs']
res.pop('attrs', None)
logging.warn("usage of id and type rather than entityId and entityType from version 0.9")
logging.warning(
"usage of id and type rather than entityId and entityType from version 0.9")
return res
3 changes: 2 additions & 1 deletion src/reporter/tests/test_instanceId.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def reporter_dataset():
for service in services:
delete_test_data(service, [entity_type])


@pytest.mark.parametrize("service", services)
def test_instanceId(service, reporter_dataset):
h = {'Fiware-Service': service}
Expand All @@ -46,7 +47,7 @@ def test_instanceId(service, reporter_dataset):

if instanceIds:
unique_instanceIds = []

# traverse for all elements
for x in instanceIds:
# check if exists in unique_list or not
Expand Down
9 changes: 7 additions & 2 deletions src/translators/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
CRATE_DB_USER_ENV_VAR = 'CRATE_DB_USER'
CRATE_DB_PASS_ENV_VAR = 'CRATE_DB_PASS'


class CrateConnectionData:

def __init__(self, host='0.0.0.0', port=4200,
Expand All @@ -68,7 +69,7 @@ def read_env(self, env: dict = os.environ):
# Added backoff_factor for retry interval between attempt of
# consecutive retries
self.backoff_factor = r.read(FloatVar('CRATE_BACKOFF_FACTOR', 0.0))
self.active_shards = r.read(StrVar('CRATE_WAIT_ACTIVE_SHARDS', '1'))
self.active_shards = r.read(StrVar('CRATE_WAIT_ACTIVE_SHARDS', '1'))


class CrateTranslator(sql_translator.SQLTranslator):
Expand All @@ -94,7 +95,11 @@ def setup(self):
if self.connection is None:
try:
self.connection = client.connect(
[url], error_trace=True, backoff_factor=self.backoff_factor, username=self.username, password=self.password)
[url],
error_trace=True,
backoff_factor=self.backoff_factor,
username=self.username,
password=self.password)
self.ccm.set_connection('crate', self.connection)
except Exception as e:
self.logger.warning(str(e), exc_info=True)
Expand Down
19 changes: 9 additions & 10 deletions src/translators/sql_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def _preprocess_values(self, e, original_attrs, col_names,
elif cn == FIWARE_SERVICEPATH:
values.append(fiware_servicepath or '')
elif cn == 'instanceId':
values.append("urn:ngsi-ld:"+str(uuid4()))
values.append("urn:ngsi-ld:" + str(uuid4()))
else:
# Normal attributes
try:
Expand Down Expand Up @@ -1302,14 +1302,14 @@ def query_last_value(self,
return result

def query_instanceId(self,
entity_id=None,
entity_type=None,
from_date=None,
to_date=None,
limit=10000,
offset=0,
fiware_service=None,
fiware_servicepath=None):
entity_id=None,
entity_type=None,
from_date=None,
to_date=None,
limit=10000,
offset=0,
fiware_service=None,
fiware_servicepath=None):
if limit == 0:
return []

Expand All @@ -1335,7 +1335,6 @@ def query_instanceId(self,
to_date,
fiware_servicepath)


limit = min(10000, limit)
offset = max(0, offset)
result = []
Expand Down
9 changes: 6 additions & 3 deletions src/translators/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from six.moves.urllib.error import HTTPError
from six.moves.urllib.request import urlopen


def check_crate(docker_ip, public_port):
"""Check if a crate is reachable.

Expand Down Expand Up @@ -39,8 +39,11 @@ def docker_stack(docker_services):
# to test crate authentication
from time import sleep
sleep(5)
docker_services.execute('crate', "bash", "-c",
"crash -c \"CREATE USER quantumleap WITH (password = 'a_secret_password');\" && \
docker_services.execute(
'crate',
"bash",
"-c",
"crash -c \"CREATE USER quantumleap WITH (password = 'a_secret_password');\" && \
crash -c \"GRANT DML,DDL,DQL TO quantumleap;\"")


Expand Down
2 changes: 1 addition & 1 deletion src/translators/tests/test_timescale_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def select_entities(pg_cursor, full_table_name, entity_id):
stmt = f'select * from {full_table_name} where entity_id = ?'

rows = pg_cursor.execute(stmt, [entity_id])
keys = [k[0].decode('utf-8') for k in pg_cursor.description]
keys = [k[0] for k in pg_cursor.description]
chicco785 marked this conversation as resolved.
Show resolved Hide resolved
return [dict(zip(keys, row)) for row in rows]


Expand Down
19 changes: 11 additions & 8 deletions src/translators/timescale.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from contextlib import contextmanager
from datetime import datetime, timezone
import pg8000
import json
from typing import Any, Callable, Sequence
import os
from translators import sql_translator
Expand Down Expand Up @@ -71,6 +72,10 @@ def read_env(self, env: dict = os.environ):
mask_value=True))


def _encode_to_json_string(data: dict or list) -> str:
return json.dumps(data)


class PostgresTranslator(sql_translator.SQLTranslator):
NGSI_TO_SQL = NGSI_TO_SQL

Expand Down Expand Up @@ -124,7 +129,7 @@ def with_connection_guard(self, db_action: Callable):
db_action()
except Exception as e:
self.sql_error_handler(e)
logging.debug(str(e), exc_info=True)
logging.error(str(e), exc_info=True)

def get_health(self):
health = {}
Expand Down Expand Up @@ -209,7 +214,7 @@ def _ngsi_slf_to_db(attr):
def _ngsi_structured_to_db(attr):
attr_v = attr.get('value', None)
if isinstance(attr_v, dict):
return pg8000.PGJsonb(attr_v)
return _encode_to_json_string(attr_v)
logging.warning('{} cannot be cast to {} replaced with None'.format(
attr.get('value', None), attr.get('type', None)))
return None
Expand All @@ -218,7 +223,7 @@ def _ngsi_structured_to_db(attr):
def _ngsi_array_to_db(attr):
attr_v = attr.get('value', None)
if isinstance(attr_v, list):
return pg8000.PGJsonb(attr_v)
return _encode_to_json_string(attr_v)
logging.warning('{} cannot be cast to {} replaced with None'.format(
attr.get('value', None), attr.get('type', None)))
return None
Expand Down Expand Up @@ -253,11 +258,9 @@ def _db_value_to_ngsi(self, db_value: Any, ngsi_type: str) -> Any:
# 2. Basic types (int, float, boolean and text). They also get converted
# back to their corresponding Python types.

# TODO with the new pg8000 PGJsonb is removed...
# it simply replace with json dumps()
@staticmethod
def _to_db_ngsi_structured_value(data: dict) -> pg8000.PGJsonb:
return pg8000.PGJsonb(data)
def _to_db_ngsi_structured_value(data: dict) -> str:
return _encode_to_json_string(data)

def _should_insert_original_entities(self,
insert_error: Exception) -> bool:
Expand All @@ -278,7 +281,7 @@ def do_store():
" on conflict (table_name)" \
" do update set entity_attrs = ?"
stmt = stmt.format(METADATA_TABLE_NAME)
entity_attrs_value = pg8000.PGJsonb(persisted_metadata)
entity_attrs_value = _encode_to_json_string(persisted_metadata)
self.cursor.execute(stmt, (table_name, entity_attrs_value,
entity_attrs_value))

Expand Down