Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds self.run_forever_flag to streams #501

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

0xfMissingNo
Copy link

@0xfMissingNo 0xfMissingNo commented Sep 29, 2021

wanted to handle async loops from a separate thread if need be; pushing my patch upstream:

In [1]: import os

In [2]: import time

In [3]: import threading

In [4]: import asyncio

In [5]: from alpaca_trade_api.stream import Stream

In [6]: asyncio.set_event_loop(asyncio.new_event_loop())

In [7]: loop = asyncio.get_event_loop()

In [8]: async def print_quote(*args):
   ...:     print(*args)
   ...: 

In [9]: stream = Stream(os.getenv('PAPER_ALPACA_API_KEY_ID'), os.getenv('PAPER_ALPACA_API_SECRET_KEY'), 'https://paper-api.alpaca.markets')

In [10]: stream.subscribe_quotes(print_quote, 'AAPL')

In [11]: def run():
    ...:     loop.run_until_complete(stream._run_forever())
    ...: 

In [12]: thread = threading.Thread(target=run, args=(), daemon=True)

In [13]: thread.start()

In [14]: thread.is_alive()
Out[14]: True

In [15]: stream._data_ws._run_forever_flag
Out[15]: True

In [16]: asyncio.run_coroutine_threadsafe(stream.stop_ws(), loop)
Out[16]: <Future at 0x7f9fab23ff40 state=pending>

In [17]: stream._data_ws._run_forever_flag
Out[17]: True

In [18]: thread.is_alive()
Out[18]: True

In [19]: asyncio.run_coroutine_threadsafe(stream._close(), loop)
Out[19]: <Future at 0x7f9fa8757670 state=pending>

data websocket error, restarting connection: code = 1000 (OK), no reason
In [20]: thread.is_alive()
Out[20]: False

In [21]: stream._data_ws._run_forever_flag
Out[21]: False

In [22]: exit

@@ -3,4 +3,4 @@
from .stream import Stream # noqa
from .stream2 import StreamConn # noqa

__version__ = '1.4.0'
__version__ = '1.4.1'
Copy link
Contributor

@haxdds haxdds Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi 0xfMissingNo! Version changes are necessary in your PR. They are made when we release and deploy a new version.

@@ -376,6 +379,7 @@ def __init__(self,
self._ws = None
self._running = False
self._stop_stream_queue = queue.Queue()
self._run_forever_flag = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this flag? What functionality does it add over while True?

@@ -648,6 +654,22 @@ def run(self):
print('keyboard interrupt, bye')
pass

async def _close(self):
Copy link
Contributor

@haxdds haxdds Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking the time to contribute! To help us out, could you please elaborate a bit further on the goal of your PR?

@0xfMissingNo 0xfMissingNo requested a review from drew887 as a code owner April 7, 2022 14:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants