diff --git a/Cargo.lock b/Cargo.lock index 2470f44..3897f1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1159,7 +1159,7 @@ dependencies = [ [[package]] name = "tiktoklive" -version = "0.0.13" +version = "0.0.14" dependencies = [ "bytes", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index 91e3fd6..9043da6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ protobuf = { version = "3.4.0", features = ["bytes"] } url = "2.5.0" tungstenite = "0.16.0" +futures-util = "0.3.30" [build-dependencies] protobuf-codegen = "3.4.0" diff --git a/src/core/live_client_websocket.rs b/src/core/live_client_websocket.rs index ff85982..ee07538 100644 --- a/src/core/live_client_websocket.rs +++ b/src/core/live_client_websocket.rs @@ -1,9 +1,12 @@ +use futures_util::{SinkExt, StreamExt}; +use log::info; use protobuf::Message; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread; -use tokio_tungstenite::tungstenite::connect; +use tokio::sync::Mutex; +use tokio::time::{interval, timeout, Duration}; use tokio_tungstenite::tungstenite::handshake::client::Request; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage}; use crate::core::live_client::TikTokLiveClient; use crate::core::live_client_mapper::TikTokLiveMessageMapper; @@ -49,13 +52,17 @@ impl TikTokLiveWebsocketClient { .header("Referer", "https://www.tiktok.com/") .header("Origin", "https://www.tiktok.com") .header("Accept-Language", "en-US,en;q=0.9") - .header("Accept-Encoding", "gzip, deflater") + .header("Accept-Encoding", "gzip, deflate") .header("Cookie", response.web_socket_cookies) .header("Sec-Websocket-Version", "13") .body(()) .map_err(|_| LibError::ParamsError)?; - let (mut socket, _) = connect(request).map_err(|_| LibError::WebSocketConnectFailed)?; + let (ws_stream, _) = connect_async(request) + .await + .map_err(|_| LibError::WebSocketConnectFailed)?; + let (write, mut read) = ws_stream.split(); + let write = Arc::new(Mutex::new(write)); client.set_connection_state(CONNECTED); client.publish_event(TikTokLiveEvent::OnConnected(TikTokConnectedEvent {})); @@ -64,49 +71,82 @@ impl TikTokLiveWebsocketClient { running.store(true, Ordering::SeqCst); let message_mapper = self.message_mapper.clone(); + let client_clone = client.clone(); + let write_clone = write.clone(); + let running_clone = running.clone(); + + tokio::spawn(async move { + info!("Websocket connected"); + while running_clone.load(Ordering::SeqCst) { + if let Some(Ok(message)) = read.next().await { + if let WsMessage::Binary(buffer) = message { + let mut push_frame = match WebcastPushFrame::parse_from_bytes(&buffer) { + Ok(frame) => frame, + Err(_) => continue, + }; - thread::spawn(move || { - while running.load(Ordering::SeqCst) { - let optional_message = socket.read_message(); - - if let Ok(message) = optional_message { - let buffer = message.into_data(); - - let mut push_frame = match WebcastPushFrame::parse_from_bytes(buffer.as_slice()) - { - Ok(frame) => frame, - Err(_) => continue, - }; - - let webcast_response = match WebcastResponse::parse_from_bytes( - push_frame.Payload.as_mut_slice(), - ) { - Ok(response) => response, - Err(_) => continue, - }; - - if webcast_response.needsAck { - let mut push_frame_ack = WebcastPushFrame::new(); - push_frame_ack.PayloadType = "ack".to_string(); - push_frame_ack.LogId = push_frame.LogId; - push_frame_ack.Payload = webcast_response.internalExt.clone().into_bytes(); - - let binary = match push_frame_ack.write_to_bytes() { - Ok(bytes) => bytes, + let webcast_response = match WebcastResponse::parse_from_bytes( + push_frame.Payload.as_mut_slice(), + ) { + Ok(response) => response, Err(_) => continue, }; - let message = tungstenite::protocol::Message::binary(binary); - if socket.write_message(message).is_err() { - continue; + if webcast_response.needsAck { + let mut push_frame_ack = WebcastPushFrame::new(); + push_frame_ack.PayloadType = "ack".to_string(); + push_frame_ack.LogId = push_frame.LogId; + push_frame_ack.Payload = + webcast_response.internalExt.clone().into_bytes(); + + let binary = match push_frame_ack.write_to_bytes() { + Ok(bytes) => bytes, + Err(_) => continue, + }; + + let message = WsMessage::Binary(binary); + if write_clone.lock().await.send(message).await.is_err() { + continue; + } } - } - message_mapper.handle_webcast_response(webcast_response, client.as_ref()); + message_mapper + .handle_webcast_response(webcast_response, client_clone.as_ref()); + } } } }); + let write_clone = write.clone(); + let running_clone = running.clone(); + tokio::spawn(async move { + let mut interval = interval(Duration::from_secs(9)); + while running_clone.load(Ordering::SeqCst) { + interval.tick().await; + + let heartbeat_message = WsMessage::Binary(vec![0x3a, 0x02, 0x68, 0x62]); + + match timeout( + Duration::from_secs(5), + write_clone.lock().await.send(heartbeat_message), + ) + .await + { + Ok(Ok(_)) => { + log::info!("Heartbeat sent"); + } + Ok(Err(e)) => { + log::error!("Failed to send heartbeat: {:?}", e); + break; + } + Err(e) => { + log::error!("Heartbeat send timed out: {:?}", e); + break; + } + } + } + log::info!("Heartbeat task stopped"); + }); Ok(()) }