From 83a1f007a125c273f174fd1c785cc40c51af3aa9 Mon Sep 17 00:00:00 2001 From: 0xfMissingNo <0xfmissingno@protonmail.com> Date: Wed, 29 Sep 2021 13:27:11 -0700 Subject: [PATCH] adds self.run_forever_flag to streams --- alpaca_trade_api/__init__.py | 2 +- alpaca_trade_api/stream.py | 26 ++++++++++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/alpaca_trade_api/__init__.py b/alpaca_trade_api/__init__.py index 3c0bd89a..a0dcc816 100644 --- a/alpaca_trade_api/__init__.py +++ b/alpaca_trade_api/__init__.py @@ -3,4 +3,4 @@ from .stream import Stream # noqa from .stream2 import StreamConn # noqa -__version__ = '1.4.0' +__version__ = '1.4.1' diff --git a/alpaca_trade_api/stream.py b/alpaca_trade_api/stream.py index 49befd9c..9de05af1 100644 --- a/alpaca_trade_api/stream.py +++ b/alpaca_trade_api/stream.py @@ -45,6 +45,7 @@ def __init__(self, 'dailyBars': {}, } self._name = 'data' + self._run_forever_flag = False async def _connect(self): self._ws = await websockets.connect( @@ -79,6 +80,7 @@ async def close(self): await self._ws.close() self._ws = None self._running = False + self._run_forever_flag = False async def stop_ws(self): self._stop_stream_queue.put_nowait({"should_stop": True}) @@ -191,7 +193,8 @@ async def _run_forever(self): await asyncio.sleep(0.1) log.info(f'started {self._name} stream') self._running = False - while True: + self._run_forever_flag = True + while self._run_forever_flag: try: if not self._running: log.info("starting websocket connection") @@ -376,6 +379,7 @@ def __init__(self, self._ws = None self._running = False self._stop_stream_queue = queue.Queue() + self._run_forever_flag = False async def _connect(self): self._ws = await websockets.connect(self._endpoint) @@ -443,7 +447,8 @@ async def _run_forever(self): await asyncio.sleep(0.1) log.info('started trading stream') self._running = False - while True: + self._run_forever_flag = True + while self._run_forever_flag: try: if not self._running: log.info("starting websocket connection") @@ -466,6 +471,7 @@ async def close(self): await self._ws.close() self._ws = None self._running = False + self._run_forever_flag = False async def stop_ws(self): self._stop_stream_queue.put_nowait({"should_stop": True}) @@ -648,6 +654,22 @@ def run(self): print('keyboard interrupt, bye') pass + async def _close(self): + await asyncio.gather( + self.stop_ws(), + self._trading_ws.close(), + self._data_ws.close(), + self._crypto_ws.close() + ) + + def close(self): + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(self._close()) + except KeyboardInterrupt: + print('keyboard interrupt, bye') + pass + async def stop_ws(self): """ Signal the ws connections to stop listenning to api stream.