Skip to content

Commit

Permalink
Merge pull request #11 from SiriDB/conn
Browse files Browse the repository at this point in the history
conn
  • Loading branch information
joente authored Feb 10, 2023
2 parents 9e8b155 + 8510122 commit 55179f5
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ test/
build/
dist/
siridb_connector.egg-info/
sample.py
2 changes: 1 addition & 1 deletion siridb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version_info__ = (2, 0, 9)
__version_info__ = (2, 1, 0)
__version__ = '.'.join(map(str, __version_info__))
__maintainer__ = 'Jeroen van der Heijden'
__email__ = '[email protected]'
2 changes: 1 addition & 1 deletion siridb/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .lib.protocol import _SiriDBInfoProtocol
from .lib.connection import SiriDBConnection
from .lib.defaults import DEFAULT_CLIENT_PORT
from .lib.client import SiriDBClient
from .lib.client import SiriDBClient, SiriDBConn
from .lib.constants import SECOND
from .lib.constants import MICROSECOND
from .lib.constants import MILLISECOND
Expand Down
157 changes: 153 additions & 4 deletions siridb/connector/lib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@
import asyncio
import functools
import random
from .protocol import _SiriDBProtocol
from .protocol import _SiriDBProtocol, _SiriDBConnProtocol
from .connection import SiriDBAsyncConnection
from .exceptions import AuthenticationError
from .exceptions import ServerError
from .exceptions import PoolError
from .constants import SECOND
from .constants import MICROSECOND
from .constants import MILLISECOND
from .constants import NANOSECOND
from .protomap import CPROTO_REQ_QUERY
from .protomap import CPROTO_REQ_INSERT
from .logging import logger as logging


Expand Down Expand Up @@ -173,7 +179,8 @@ def connected(self):
def _log_connect_result(result):
for r in result:
if r:
logging.error(r)
msg = str(r) or type(r).__name__
logging.error(msg)

async def connect(self, timeout=None):
self._retry_connect = True
Expand Down Expand Up @@ -206,7 +213,8 @@ async def insert(self, data, timeout=300):
except PoolError as e:
if self._loop.time() > end:
raise
logging.debug(e)
msg = str(e) or type(e).__name__
logging.debug(msg)
await asyncio.sleep(2)
else:
return result
Expand Down Expand Up @@ -234,7 +242,8 @@ async def query(self, query, time_precision=None, timeout=60):
except PoolError as e:
if self._loop.time() > end:
raise
logging.debug(e)
msg = str(e) or type(e).__name__
logging.debug(msg)
await asyncio.sleep(2)
else:
return result
Expand Down Expand Up @@ -315,3 +324,143 @@ def _get_random_connection(self, try_unavailable=False):
return random.choice(connections)

raise PoolError('No available connections found')


class SiriDBConn:

MAX_RECONNECT_WAIT_TIME = 60
MAX_RECONNECT_TIMEOUT = 10
MAX_WRITE_RETRY = 600
RECONNECT_ATTEMPT = 3

def __init__(self,
username,
password,
dbname,
server,
port=9000,
loop=None):
self._username = username
self._password = password
self._dbname = dbname
self._server = server
self._port = port
self._loop = loop or asyncio.get_event_loop()
self._reconnecting = False
self._protocol = None

async def _connect(self, timeout):
client = self._loop.create_connection(
lambda: _SiriDBConnProtocol(
self._username,
self._password,
self._dbname),
host=self._server,
port=self._port)
_transport, self._protocol = \
await asyncio.wait_for(client, timeout=timeout)

try:
res = await asyncio.wait_for(
self._protocol.auth_future,
timeout=timeout)
except Exception as exc:
_transport.close()
raise exc

async def _reconnect_loop(self):
try:
wait_time = 1
timeout = 2
protocol = self._protocol
while True:
host, port = self._server, self._port
try:
await self._connect(timeout=timeout)
except Exception as e:
logging.error(
f'Connecting to {host}:{port} failed: '
f'{e}({e.__class__.__name__}), '
f'Try next connect in {wait_time} seconds'
)
else:
if protocol and protocol._connected:
# make sure the `old` connection will be dropped
self._loop.call_later(10.0, protocol.transport.close)
break

await asyncio.sleep(wait_time)
wait_time *= 2
wait_time = min(wait_time, self.MAX_RECONNECT_WAIT_TIME)
timeout = min(timeout+1, self.MAX_RECONNECT_TIMEOUT)
finally:
self._reconnecting = False

def _reconnect(self):
if self._reconnecting:
return asyncio.sleep(1)
self._reconnecting = True
return self._reconnect_loop()

def is_connected(self):
return self._protocol and self._protocol._connected

async def insert(self, data, timeout=300):
result = await self._ensure_write(
CPROTO_REQ_INSERT,
data=data,
timeout=timeout)
return result

def close(self):
if self.is_connected():
if not hasattr(self._protocol, 'close_future'):
self._protocol.close_future = self._loop.create_future()
self._protocol.transport.close()

async def wait_closed(self):
if self._protocol and hasattr(self._protocol, 'close_future'):
await self._protocol.close_future

async def query(self, query, time_precision=None, timeout=60):
assert isinstance(query, (str, bytes)), \
'query should be of type str, unicode or bytes'
assert time_precision in (
None,
SECOND,
MICROSECOND,
MILLISECOND,
NANOSECOND), 'time_precision must be either None, 0, 1, 2, 3'
result = await self._ensure_write(
CPROTO_REQ_QUERY,
data=(query, time_precision),
timeout=timeout)
return result

async def _ensure_write(
self,
tipe, data=None, is_binary=False, timeout=None):
retry = 0
while True:
retry += 1

if not self.is_connected():
if retry > self.MAX_WRITE_RETRY:
raise ConnectionError("Failed to create a connection")
if retry == 1:
logging.info('Wait for a connection')
await self._reconnect() # ensure the re-connect loop
continue

try:
res = await self._protocol.send_package(
tipe, data, is_binary, timeout)
except Exception as e:
if retry > self.MAX_WRITE_RETRY:
raise e
if retry % self.RECONNECT_ATTEMPT == 0:
self._reconnect()
await asyncio.sleep(1.0)
continue

return res
8 changes: 8 additions & 0 deletions siridb/connector/lib/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,11 @@ def connection_made(self, transport):
protomap.CPROTO_REQ_INFO,
data=None,
timeout=10)


class _SiriDBConnProtocol(_SiriDBProtocol):

def on_connection_lost(self, exc):
if hasattr(self, 'close_future'):
self.close_future.set_result(None)
delattr(self, 'close_future')

0 comments on commit 55179f5

Please sign in to comment.