Skip to content

Commit

Permalink
Working on SiriDB connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeroen van der Heijden committed Oct 4, 2016
1 parent 0abb512 commit 1e054d9
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ __pycache__/
.pydevproject
.idea
checklist.txt
test/
36 changes: 29 additions & 7 deletions siridb/connector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import os
import sys
import asyncio
from .lib.protocol import _SiriDBProtocol
from .lib.protocol import _SiriDBInfoProtocol
from .lib.connection import SiriDBConnection
from .lib.defaults import DEFAULT_CLIENT_PORT
from .lib.client import SiriDBClient


class SiriDBProtocol(_SiriDBProtocol):
Expand All @@ -12,18 +16,18 @@ def on_connection_made(self):
def on_authenticated(self):
pass

def on_connection_lost(self):
def on_connection_lost(self, exc):
pass


def connect(username,
password,
dbname,
host='127.0.0.1',
port=c.DEFAULT_CLIENT_PORT,
port=DEFAULT_CLIENT_PORT,
loop=None,
timeout=10,
protocol=SiriClientProtocol):
protocol=SiriDBProtocol):

return SiriDBConnection(
username,
Expand All @@ -40,11 +44,11 @@ async def async_connect(username,
password,
dbname,
host='127.0.0.1',
port=c.DEFAULT_CLIENT_PORT,
port=DEFAULT_CLIENT_PORT,
loop=None,
timeout=10,
keepalive=False,
protocol=SiriClientProtocol):
protocol=SiriDBProtocol):

connection = SiriDBAsyncConnection()
await connection.connect(
Expand All @@ -61,8 +65,26 @@ async def async_connect(username,
return connection


async def async_server_info(host='127.0.0.1',
port=DEFAULT_CLIENT_PORT,
loop=None,
timeout=10):
loop = loop or asyncio.get_event_loop()
client = loop.create_connection(
lambda: _SiriDBInfoProtocol(None, None, None),
host=host,
port=port)
transport, protocol = \
await asyncio.wait_for(client, timeout=timeout)
await protocol.future
transport.close()
return protocol._info


__all__ = [
'connect',
'async_connect',
'SiriDBProtocol']
'async_server_info',
'connect',
'SiriDBClient',
'SiriDBProtocol',
]
18 changes: 14 additions & 4 deletions siridb/connector/lib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@
:copyright: 2016, Jeroen van der Heijden (Transceptor Technology)
'''
import asyncio
import functools
import logging
import random
from .protocol import _SiriDBProtocol
from .connection import SiriDBAsyncConnection
from .exceptions import AuthenticationError
from .exceptions import ServerError
from .exceptions import PoolError


class SiriDBClientProtocol(_SiriDBProtocol):
class _SiriDBClientProtocol(_SiriDBProtocol):

_is_available = False

Expand All @@ -19,7 +27,7 @@ def __init__(self, *args, trigger_connect, inactive_time):
def on_authenticated(self):
self._is_available = True

def on_connection_lost(self):
def on_connection_lost(self, exc):
self._is_available = False
self._trigger_connect()

Expand Down Expand Up @@ -71,6 +79,8 @@ class SiriDBClient:
- OverflowError (can only be raised when using the insert() method)
Raise when integer values cannot not be packed due to an overflow
error. (integer values should be signed and not more than 63 bits)
- UserAuthError
The user as no rights to perform the insert or query.
'''

def __init__(self,
Expand Down Expand Up @@ -130,7 +140,7 @@ def __init__(self,
self._keepalive = keepalive
for host, port, *config in hostlist:
config = config.pop() if config else {}
client = AsyncSiriClient()
client = SiriDBAsyncConnection()
client.host = host
client.port = port
client.is_backup = config.get('backup', False)
Expand All @@ -145,7 +155,7 @@ def __init__(self,
self._connect_task = None
self._max_wait_retry = max_wait_retry
self._protocol = \
functools.partial(SiriClusterProtocol,
functools.partial(_SiriDBClientProtocol,
trigger_connect=self._trigger_connect,
inactive_time=inactive_time)

Expand Down
33 changes: 21 additions & 12 deletions siridb/connector/lib/connection.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
import asyncio
import time
from .defaults import DEFAULT_CLIENT_PORT
from .protocol import _SiriDBProtocol
from .protomap import CPROTO_REQ_QUERY
from .protomap import CPROTO_REQ_INSERT
from .protomap import CPROTO_REQ_REGISTER_SERVER
from .protomap import CPROTO_REQ_PING
from .protomap import FILE_MAP

class SiriDBConnection():

Expand All @@ -6,10 +15,10 @@ def __init__(self,
password,
dbname,
host='127.0.0.1',
port=c.DEFAULT_CLIENT_PORT,
port=DEFAULT_CLIENT_PORT,
loop=None,
timeout=10,
protocol=SiriClientProtocol):
protocol=_SiriDBProtocol):
self._loop = loop or asyncio.get_event_loop()
client = self._loop.create_connection(
lambda: protocol(username, password, dbname),
Expand All @@ -25,14 +34,14 @@ def close(self):

def query(self, query, time_precision=None, timeout=30):
result = self._loop.run_until_complete(
self._protocol.send_package(npt.CPROTO_REQ_QUERY,
self._protocol.send_package(CPROTO_REQ_QUERY,
data=(query, time_precision),
timeout=timeout))
return result

def insert(self, data, timeout=600):
result = self._loop.run_until_complete(
self._protocol.send_package(npt.CPROTO_REQ_INSERT,
self._protocol.send_package(CPROTO_REQ_INSERT,
data=data,
timeout=timeout))
return result
Expand All @@ -44,7 +53,7 @@ def _register_server(self, server, timeout=30):
otherwise. Full access rights are required for this request.
'''
result = self._loop.run_until_complete(
self._protocol.send_package(npt.CPROTO_REQ_REGISTER_SERVER,
self._protocol.send_package(CPROTO_REQ_REGISTER_SERVER,
data=server,
timeout=timeout))
return result
Expand All @@ -55,11 +64,11 @@ def _get_file(self, fn, timeout=30):
This method is used by the SiriDB manage tool and should not be used
otherwise. Full access rights are required for this request.
'''
msg = npt.FILE_MAP.get(fn, None)
msg = FILE_MAP.get(fn, None)
if msg is None:
raise FileNotFoundError('Cannot get file {!r}. Available file '
'requests are: {}'
.format(fn, ', '.join(npt.FILE_MAP.keys())))
.format(fn, ', '.join(FILE_MAP.keys())))
result = self._loop.run_until_complete(
self._protocol.send_package(msg, timeout=timeout))
return result
Expand All @@ -81,7 +90,7 @@ async def keepalive_loop(self, interval=45):
if sleep == interval:
logging.debug('Send keep-alive package...')
try:
await self._protocol.send_package(npt.CPROTO_REQ_PING,
await self._protocol.send_package(CPROTO_REQ_PING,
timeout=15)
except asyncio.CancelledError:
break
Expand All @@ -95,11 +104,11 @@ async def connect(self,
password,
dbname,
host='127.0.0.1',
port=c.DEFAULT_CLIENT_PORT,
port=DEFAULT_CLIENT_PORT,
loop=None,
timeout=10,
keepalive=False,
protocol=SiriClientProtocol):
protocol=_SiriDBProtocol):
loop = loop or asyncio.get_event_loop()
client = loop.create_connection(
lambda: protocol(username, password, dbname),
Expand All @@ -123,15 +132,15 @@ def close(self):

async def query(self, query, time_precision=None, timeout=3600):
result = await self._protocol.send_package(
npt.CPROTO_REQ_QUERY,
CPROTO_REQ_QUERY,
data=(query, time_precision),
timeout=timeout)
self._last_resp = time.time()
return result

async def insert(self, data, timeout=3600):
result = await self._protocol.send_package(
npt.CPROTO_REQ_INSERT,
CPROTO_REQ_INSERT,
data=data,
timeout=timeout)
self._last_resp = time.time()
Expand Down
4 changes: 4 additions & 0 deletions siridb/connector/lib/defaults.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
'''SiriDB Default values
'''
DEFAULT_CLIENT_PORT = 9000
29 changes: 27 additions & 2 deletions siridb/connector/lib/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@

_MAP = (
lambda data: b'',
lambda data: _qpack_safe(data),
lambda data: qpack.packb(data),
lambda data: data
)


def _packdata(tipe, data=None):
assert tipe in protomap.MAP_REQ_DTYPE, \
'No data type found for message type: {}'.format(tipe)
Expand Down Expand Up @@ -230,3 +229,29 @@ def _on_package_received(self):
future,
self._data_package.data)


class _SiriDBInfoProtocol(_SiriDBProtocol):

_info = []

def connection_made(self, transport):
'''
override _SiriDBProtocol
'''
def finished(future):
if not future.exception():
self._info = future.result()

self.transport = transport
self.remote_ip, self.port = transport.get_extra_info('peername')[:2]

logging.info(
'Connection made (address: {} port: {})'
.format(self.remote_ip, self.port))

self.future = self.send_package(
protomap.CPROTO_REQ_INFO,
data=None,
timeout=10)

self.future.add_done_callback(finished)

0 comments on commit 1e054d9

Please sign in to comment.