Skip to content

Commit

Permalink
added heartbeat to websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
ZmoleCristian authored Jun 21, 2024
2 parents 3591e46 + 9f6308a commit 4d3c2da
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ protobuf = { version = "3.4.0", features = ["bytes"] }

url = "2.5.0"
tungstenite = "0.16.0"
futures-util = "0.3.30"

[build-dependencies]
protobuf-codegen = "3.4.0"
Expand Down
114 changes: 77 additions & 37 deletions src/core/live_client_websocket.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use futures_util::{SinkExt, StreamExt};
use log::info;
use protobuf::Message;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use tokio_tungstenite::tungstenite::connect;
use tokio::sync::Mutex;
use tokio::time::{interval, timeout, Duration};
use tokio_tungstenite::tungstenite::handshake::client::Request;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};

use crate::core::live_client::TikTokLiveClient;
use crate::core::live_client_mapper::TikTokLiveMessageMapper;
Expand Down Expand Up @@ -49,13 +52,17 @@ impl TikTokLiveWebsocketClient {
.header("Referer", "https://www.tiktok.com/")
.header("Origin", "https://www.tiktok.com")
.header("Accept-Language", "en-US,en;q=0.9")
.header("Accept-Encoding", "gzip, deflater")
.header("Accept-Encoding", "gzip, deflate")
.header("Cookie", response.web_socket_cookies)
.header("Sec-Websocket-Version", "13")
.body(())
.map_err(|_| LibError::ParamsError)?;

let (mut socket, _) = connect(request).map_err(|_| LibError::WebSocketConnectFailed)?;
let (ws_stream, _) = connect_async(request)
.await
.map_err(|_| LibError::WebSocketConnectFailed)?;
let (write, mut read) = ws_stream.split();
let write = Arc::new(Mutex::new(write));

client.set_connection_state(CONNECTED);
client.publish_event(TikTokLiveEvent::OnConnected(TikTokConnectedEvent {}));
Expand All @@ -64,49 +71,82 @@ impl TikTokLiveWebsocketClient {
running.store(true, Ordering::SeqCst);

let message_mapper = self.message_mapper.clone();
let client_clone = client.clone();
let write_clone = write.clone();
let running_clone = running.clone();

tokio::spawn(async move {
info!("Websocket connected");
while running_clone.load(Ordering::SeqCst) {
if let Some(Ok(message)) = read.next().await {
if let WsMessage::Binary(buffer) = message {
let mut push_frame = match WebcastPushFrame::parse_from_bytes(&buffer) {
Ok(frame) => frame,
Err(_) => continue,
};

thread::spawn(move || {
while running.load(Ordering::SeqCst) {
let optional_message = socket.read_message();

if let Ok(message) = optional_message {
let buffer = message.into_data();

let mut push_frame = match WebcastPushFrame::parse_from_bytes(buffer.as_slice())
{
Ok(frame) => frame,
Err(_) => continue,
};

let webcast_response = match WebcastResponse::parse_from_bytes(
push_frame.Payload.as_mut_slice(),
) {
Ok(response) => response,
Err(_) => continue,
};

if webcast_response.needsAck {
let mut push_frame_ack = WebcastPushFrame::new();
push_frame_ack.PayloadType = "ack".to_string();
push_frame_ack.LogId = push_frame.LogId;
push_frame_ack.Payload = webcast_response.internalExt.clone().into_bytes();

let binary = match push_frame_ack.write_to_bytes() {
Ok(bytes) => bytes,
let webcast_response = match WebcastResponse::parse_from_bytes(
push_frame.Payload.as_mut_slice(),
) {
Ok(response) => response,
Err(_) => continue,
};

let message = tungstenite::protocol::Message::binary(binary);
if socket.write_message(message).is_err() {
continue;
if webcast_response.needsAck {
let mut push_frame_ack = WebcastPushFrame::new();
push_frame_ack.PayloadType = "ack".to_string();
push_frame_ack.LogId = push_frame.LogId;
push_frame_ack.Payload =
webcast_response.internalExt.clone().into_bytes();

let binary = match push_frame_ack.write_to_bytes() {
Ok(bytes) => bytes,
Err(_) => continue,
};

let message = WsMessage::Binary(binary);
if write_clone.lock().await.send(message).await.is_err() {
continue;
}
}
}

message_mapper.handle_webcast_response(webcast_response, client.as_ref());
message_mapper
.handle_webcast_response(webcast_response, client_clone.as_ref());
}
}
}
});

let write_clone = write.clone();
let running_clone = running.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(9));
while running_clone.load(Ordering::SeqCst) {
interval.tick().await;

let heartbeat_message = WsMessage::Binary(vec![0x3a, 0x02, 0x68, 0x62]);

match timeout(
Duration::from_secs(5),
write_clone.lock().await.send(heartbeat_message),
)
.await
{
Ok(Ok(_)) => {
log::info!("Heartbeat sent");
}
Ok(Err(e)) => {
log::error!("Failed to send heartbeat: {:?}", e);
break;
}
Err(e) => {
log::error!("Heartbeat send timed out: {:?}", e);
break;
}
}
}
log::info!("Heartbeat task stopped");
});
Ok(())
}

Expand Down

0 comments on commit 4d3c2da

Please sign in to comment.