Skip to content

Commit

Permalink
Clean up event handling & document TikTokLiveClient class
Browse files Browse the repository at this point in the history
  • Loading branch information
isaackogan committed Feb 23, 2024
1 parent aeec4e7 commit b3eb322
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 36 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ Both belong to the TikTokLive `Event` type and can be listened to. The following
- `ConnectEvent` - Triggered when the Webcast connection is initiated
- `DisconnectEvent` - Triggered when the Webcast connection closes (including the livestream ending)
- `LiveEndEvent` - Triggered when the livestream ends
- `LivePausedEvent` - Triggered when the livestream is paused
- `LiveUnpausedEvent` - Triggered when the livestream is unpaused
- `LivePauseEvent` - Triggered when the livestream is paused
- `LiveUnpauseEvent` - Triggered when the livestream is unpaused
- `FollowEvent` - Triggered when a user in the livestream follows the streamer
- `ShareEvent` - Triggered when a user shares the livestream
- `UnknownEvent` - Events not currently tracked by TikTokLive as they have not been reverse-engineered

- `WebsocketResponseEvent` - Triggered when any event is received (contains the event)
- `UnknownEvent` - An instance of `WebsocketResponseEvent` thrown whenever an event does not have an existing definition, useful for debugging

### Proto Events

Expand Down
186 changes: 160 additions & 26 deletions TikTokLive/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
from TikTokLive.client.web.web_settings import WebDefaults
from TikTokLive.client.ws.ws_client import WebcastWSClient
from TikTokLive.events import Event, EventHandler
from TikTokLive.events.custom_events import UnknownEvent, ConnectEvent, FollowEvent, ShareEvent, LiveEndEvent, \
DisconnectEvent, LivePausedEvent, LiveUnpausedEvent
from TikTokLive.events.custom_events import WebsocketResponseEvent, ConnectEvent, FollowEvent, ShareEvent, LiveEndEvent, \
DisconnectEvent, LivePauseEvent, LiveUnpauseEvent, UnknownEvent, CustomEvent
from TikTokLive.events.proto_events import EVENT_MAPPINGS, ProtoEvent, ControlEvent
from TikTokLive.proto import WebcastResponse, WebcastResponseMessage, ControlAction


class TikTokLiveClient(AsyncIOEventEmitter):
"""
A client to connect to & read from TikTok LIVE streams
"""

def __init__(
self,
Expand All @@ -32,6 +36,17 @@ def __init__(
web_kwargs: dict = {},
ws_kwargs: dict = {}
):
"""
Instantiate the TikTokLiveClient client
:param unique_id: The username of the creator to connect to
:param web_proxy: An optional proxy used for HTTP requests
:param ws_proxy: An optional proxy used for the WebSocket connection
:param web_kwargs: Optional arguments used by the HTTP client
:param ws_kwargs: Optional arguments used by the WebSocket client
"""

super().__init__()
unique_id = self.parse_unique_id(unique_id)

Expand Down Expand Up @@ -176,7 +191,13 @@ async def disconnect(self) -> None:
self._event_loop_task = None

async def _client_loop(self, initial_response: WebcastResponse) -> None:
"""Run the main client loop to handle events"""
"""
Run the main client loop to handle events
:param initial_response: The WebcastResponse retrieved from the sign server with connection info
:return: None
"""

async for event in self._ws_loop(initial_response):

Expand All @@ -191,7 +212,13 @@ async def _client_loop(self, initial_response: WebcastResponse) -> None:
self.emit(ev.type, ev)

async def _ws_loop(self, initial_response: WebcastResponse) -> AsyncIterator[Optional[Event]]:
"""Run the websocket loop to handle incoming WS messages"""
"""
Run the websocket loop to handle incoming WS messages
:param initial_response: The WebcastResponse retrieved from the sign server with connection info
:return: None
"""

first_event: bool = True

Expand All @@ -213,7 +240,13 @@ async def _ws_loop(self, initial_response: WebcastResponse) -> AsyncIterator[Opt
yield event

def _build_connect_info(self, initial_response: WebcastResponse) -> Tuple[str, dict]:
"""Create connection info for starting the connection"""
"""
Create connection info for starting the connection
:param initial_response: The WebcastResponse retrieved from the sign server with connection info
:return: None
"""

connect_uri: str = (
initial_response.push_server
Expand All @@ -228,77 +261,178 @@ def _build_connect_info(self, initial_response: WebcastResponse) -> Tuple[str, d
return connect_uri, connect_headers

def on(self, event: Type[Event], f: Optional[EventHandler] = None) -> Union[Handler, Callable[[Handler], Handler]]:
"""
Decorator that can be used to register a Python function as an event listener
:param event: The event to listen to
:param f: The function to handle the event
:return: The wrapped function as a generated `pyee.Handler` object
"""

return super(TikTokLiveClient, self).on(event.get_type(), f)

def add_listener(self, event: Type[Event], f: EventHandler) -> Handler:
"""
Method that can be used to register a Python function as an event listener
:param event: The event to listen to
:param f: The function to handle the event
:return: The generated `pyee.Handler` object
"""

return super().add_listener(event=event.get_type(), f=f)

def has_listener(self, *events: Type[Event]) -> bool:
return any(event.__name__ in self._events for event in events)
def has_listener(self, event: Type[Event]) -> bool:
"""
Check whether the client is listening to a given event
:param event: The event to check listening for
:return: Whether it is being listened to
"""

return event.__name__ in self._events

def _parse_webcast_response(self, response: Optional[WebcastResponseMessage]) -> List[Event]:
"""Parse incoming webcast responses"""
"""
Parse incoming webcast responses into events that can be emitted
:param response: The WebcastResponseMessage protobuf message
:return: A list of events that can be gleamed from this event
"""

# Invalid response handler
if response is None:
self._logger.warning("Received a null response from the Webcast server.")
self._logger.warning("Received a null WebcastResponseMessage from the Webcast server.")
return []

# Get the proto mapping for proto-events
event_type: Optional[Type[ProtoEvent]] = EVENT_MAPPINGS.get(response.method)
response_event: Event = WebsocketResponseEvent().from_pydict(response.to_dict())

if not event_type:
return [UnknownEvent().from_pydict(response.to_dict())]
# If the event is not tracked, return
if event_type is None:
return [response_event, UnknownEvent().from_pydict(response.to_dict())]

event: Event = event_type().parse(response.payload)
# Get the underlying events
proto_event: ProtoEvent = event_type().parse(response.payload)
parsed_events: List[Event] = [response_event, proto_event]
custom_event: Optional[Event] = self._parse_custom_event(response, proto_event)

# Handle stream control events
if isinstance(event, ControlEvent):
return_events: List[Event] = [event]
# Add the custom event IF not null
return [custom_event, *parsed_events] if custom_event else parsed_events

@classmethod
def _parse_custom_event(cls, response: WebcastResponseMessage, event: ProtoEvent) -> Optional[CustomEvent]:
"""
Extract CustomEvent events from existing ProtoEvent events
:param response: The WebcastResponseMessage to parse for the custom event
:param event: The ProtoEvent to parse for the custom event
:return: The event, if one exists
"""

# LiveEndEvent, LivePauseEvent, LiveUnpauseEvent
if isinstance(event, ControlEvent):
if event.action == ControlAction.STREAM_ENDED:
return_events.append(LiveEndEvent().parse(response.payload))
return LiveEndEvent().parse(response.payload)
elif event.action == ControlAction.STREAM_PAUSED:
return_events.append(LivePausedEvent().parse(response.payload))
return LivePauseEvent().parse(response.payload)
elif event.action == ControlAction.STREAM_PAUSED:
return_events.append(LiveUnpausedEvent().parse(response.payload))
return LiveUnpauseEvent().parse(response.payload)
return None

return return_events
# FollowEvent
if "follow" in event.common.display_text.key:
return FollowEvent().parse(response.payload)

# Handle follow & share events
if self.has_listener(FollowEvent, ShareEvent):
if "follow" in event.common.display_text.key:
return [FollowEvent().parse(response.payload), event]
if "share" in event.common.display_text.key:
return [ShareEvent().parse(response.payload), event]
# ShareEvent
if "share" in event.common.display_text.key:
return ShareEvent().parse(response.payload)

return [event]
# Not a custom event
return None

@property
def room_id(self) -> Optional[str]:
"""
The room ID the user is currently connected to
:return: Room ID or None
"""

return self._room_id

@property
def web(self) -> TikTokWebClient:
"""
The HTTP client that this client uses for requests
:return: A copy of the TikTokWebClient
"""

return self._web

@property
def _asyncio_loop(self) -> AbstractEventLoop:
"""
Property to return the existing or generate a new asyncio event loop
:return: An asyncio event loop
"""

try:
return asyncio.get_running_loop()
except RuntimeError:
return asyncio.new_event_loop()

@property
def connected(self) -> bool:
"""
Whether the WebSocket client is currently connected to TikTok
:return: Connection status
"""

return self._ws.connected

@property
def logger(self) -> logging.Logger:
"""
The internal logger used by TikTokLive
:return: An instance of a `logging.Logger`
"""

return self._logger

@property
def gift_info(self) -> Optional[dict]:
"""
Information about the stream's gifts *if* fetch_gift_info=True when starting the client e.g. with `client.run`)
:return: The stream gift info
"""

return self._gift_info

@property
def room_info(self) -> Optional[dict]:
"""
Information about the room *if* fetch_room_info=True when starting the client (e.g. with `client.run`)
:return: Dictionary of room info
"""

return self._room_info
23 changes: 17 additions & 6 deletions TikTokLive/events/custom_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@
from TikTokLive.proto import WebcastResponseMessage


class UnknownEvent(WebcastResponseMessage, BaseEvent):
class WebsocketResponseEvent(WebcastResponseMessage, BaseEvent):
"""
Thrown when a Webcast message is received that is NOT tracked by TikTokLive yet.
Triggered when any event is received from the WebSocket
"""


class UnknownEvent(WebsocketResponseEvent):
"""
Triggered when a Webcast message is received that is NOT tracked by TikTokLive yet.
"""

Expand Down Expand Up @@ -40,14 +47,14 @@ class LiveEndEvent(ControlEvent):
"""


class LivePausedEvent(ControlEvent):
class LivePauseEvent(ControlEvent):
"""
Thrown when the stream is paused
"""


class LiveUnpausedEvent(ControlEvent):
class LiveUnpauseEvent(ControlEvent):
"""
Thrown when a paused stream is unpaused
Expand Down Expand Up @@ -84,22 +91,26 @@ def users_joined(self) -> Optional[int]:


CustomEvent: Type = Union[
WebsocketResponseEvent,
UnknownEvent,
ConnectEvent,
FollowEvent,
ShareEvent,
LiveEndEvent,
LivePauseEvent,
LiveUnpauseEvent,
DisconnectEvent
]

__all__ = [
"WebsocketResponseEvent",
"UnknownEvent",
"ConnectEvent",
"FollowEvent",
"ShareEvent",
"LiveEndEvent",
"LivePausedEvent",
"LiveUnpausedEvent",
"LivePauseEvent",
"LiveUnpauseEvent",
"CustomEvent",
"DisconnectEvent"
]

0 comments on commit b3eb322

Please sign in to comment.