Skip to content

Commit

Permalink
Merge pull request #6 from fugle-dev/develop
Browse files Browse the repository at this point in the history
refactor: WebSocket originally used asyncio, now switched to synchronous
  • Loading branch information
bistin authored Jan 11, 2024
2 parents d943378 + 3384741 commit 3890568
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 30 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ print(stock.intraday.quote(symbol="2330"))
### WebSocket API

```py

from fugle_marketdata import WebSocketClient, RestClient
import asyncio


def handle_message(message):
print(f'message: {message}')
Expand All @@ -50,25 +49,26 @@ def handle_disconnect(code, message):
def handle_error(error):
print(f'error: {error}')

async def main():
client = WebSocketClient(api_key = 'YOUR_API_KEY')

def main():
client = WebSocketClient(api_key='YOUR_API_KEY')
stock = client.stock
stock.on("connect", handle_connect)
stock.on("message", handle_message)
stock.on("disconnect", handle_disconnect)
stock.on("error", handle_error)
await stock.connect()
stock.subscribe({
"channel": 'trades',
"symbol": '2330'
})
stock.connect()
stock.subscribe({
"channel": 'trades',
"symbol": '2330'
})


if __name__ == "__main__":
asyncio.run(main())
main()

```

## License

[MIT](LICENSE)

79 changes: 60 additions & 19 deletions fugle_marketdata/websocket/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import time
import json
import websocket
from pyee import EventEmitter
import sys
from threading import Thread, Timer
from typing import Generic, TypeVar

from ..constants import (
CONNECT_EVENT,
Expand All @@ -14,11 +15,41 @@
UNAUTHENTICATED_MESSAGE
)

T = TypeVar('T')
E = TypeVar('E')


class Ok(Generic[T]):
def __init__(self, value: T):
self.value = value

def is_ok(self):
return True

def is_err(self):
return False

def __repr__(self):
return f"Ok({repr(self.value)})"


class Err(Generic[E]):
def __init__(self, error: E):
self.error = error

def is_ok(self):
return False

def is_err(self):
return True

def __repr__(self):
return f"Err({repr(self.error)})"


class WebSocketClient():
def __init__(self, **config):
self.config = config
# config: base_url, api_key?, bearer_token?
self.ee = EventEmitter()
self.ee.on(CONNECT_EVENT, self.__authenticate)
self.__ws = websocket.WebSocketApp(
Expand All @@ -27,8 +58,17 @@ def __init__(self, **config):
on_close=self.__on_close,
on_error=self.__on_error,
on_message=self.__on_message)
self.loop = asyncio.get_event_loop()
self.future = self.loop.create_future()

self.auth_result = None

def ping(self, message):
message = {
"event": "ping",
"data": {
"state": message
}
}
self.__send(message)

def subscribe(self, params):
message = {
Expand All @@ -43,13 +83,6 @@ def unsubscribe(self, params):
"data": params
}
self.__send(message)

def ping(self, params):
message = {
"event": "ping",
"data": params
}
self.__send(message)

def __authenticate(self):
if self.config.get('api_key'):
Expand Down Expand Up @@ -85,26 +118,34 @@ def __on_message(self, ws, data):
self.ee.emit(MESSAGE_EVENT, data)
if message['event'] == AUTHENTICATED_EVENT:
self.ee.emit(AUTHENTICATED_EVENT, message)
self.loop.call_soon_threadsafe(self.future.set_result, 'success')
self.auth_result = Ok("success")
elif message['event'] == ERROR_EVENT:
if message['data'] and message['data']['message'] == UNAUTHENTICATED_MESSAGE:
self.ee.emit(UNAUTHENTICATED_EVENT, message)
self.loop.call_soon_threadsafe(
self.future.set_exception, 'login fail')
self.auth_result = Err(UNAUTHENTICATED_MESSAGE)

def __on_error(self, ws, error):
self.ee.emit(ERROR_EVENT, error)

def on(self, event, listener):
self.ee.on(event, listener)

def off(self, event, listener):
self.ee.off(event, listener)

async def connect(self):
self.loop.run_in_executor(None, self.__ws.run_forever)
await self.future
def connect(self):
Thread(target=self.__ws.run_forever).start()
while True:
time.sleep(50/1000)
if self.auth_result is not None:
break

if self.auth_result.is_ok():
return ""
elif self.auth_result.is_err():
raise Exception(self.auth_result.error)

def disconnect(self):
if self.__ws is not None:
self.__ws.close()
self.auth_result = None

0 comments on commit 3890568

Please sign in to comment.