From 7a5906e10589c4d358dd7351ce525066a994a34b Mon Sep 17 00:00:00 2001 From: Dat Date: Tue, 19 Dec 2023 13:30:10 +0700 Subject: [PATCH 01/17] Update Justfile and feeder.rs --- Justfile | 2 +- examples/all.rs | 1 - sonar-project.properties | 2 -- src/feeder.rs | 1 + 4 files changed, 2 insertions(+), 4 deletions(-) delete mode 100644 sonar-project.properties diff --git a/Justfile b/Justfile index ad49f23..a8254c5 100644 --- a/Justfile +++ b/Justfile @@ -28,7 +28,7 @@ example bin: cargo run --package tradingview-rs --example {{bin}} lines-of-code: - @git ls-files | grep '\.rs' | xargs wc -l + @cloc $(git ls-files) creds-scan: @ggshield secret scan repo ./ diff --git a/examples/all.rs b/examples/all.rs index fb2cd66..808637a 100644 --- a/examples/all.rs +++ b/examples/all.rs @@ -1,7 +1,6 @@ use dotenv::dotenv; use std::env; -use tracing::info; use tradingview::{ chart, chart::ChartOptions, diff --git a/sonar-project.properties b/sonar-project.properties deleted file mode 100644 index 89e70c4..0000000 --- a/sonar-project.properties +++ /dev/null @@ -1,2 +0,0 @@ -sonar.projectKey=tradingview-rs -sonar.exclusions=tests/**,examples/** diff --git a/src/feeder.rs b/src/feeder.rs index 12afaac..ef603b0 100644 --- a/src/feeder.rs +++ b/src/feeder.rs @@ -54,6 +54,7 @@ impl Feeder { // } pub fn unsubscribe(&mut self, event_type: TradingViewDataEvent, listener: T) { + todo!() } pub fn notify(&self, event_type: TradingViewDataEvent) {} From 47b6733dd8319b584d78e4b32b99c455608aa5b1 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Thu, 4 Jan 2024 12:19:43 +0700 Subject: [PATCH 02/17] Refactor WebSocket structs to be generic --- examples/all.rs | 2 +- src/chart/session.rs | 10 ++++----- src/error.rs | 51 +++++++++++++++++++++++++++----------------- src/feeder.rs | 51 ++++++++++++++++++++++++++++++++------------ src/quote/session.rs | 10 ++++----- src/socket.rs | 1 + 6 files changed, 80 insertions(+), 45 deletions(-) diff --git a/examples/all.rs b/examples/all.rs index 808637a..ac5406b 100644 --- a/examples/all.rs +++ b/examples/all.rs @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> { let socket = SocketSession::new(DataServer::ProData, auth_token).await?; - let publisher = Feeder::new(); + let publisher: Feeder = Feeder::new(); let mut chart = chart::session::WebSocket::new(publisher.clone(), socket.clone()); let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); diff --git a/src/chart/session.rs b/src/chart/session.rs index 00fa261..8e1f3a8 100644 --- a/src/chart/session.rs +++ b/src/chart/session.rs @@ -11,8 +11,8 @@ use async_trait::async_trait; use serde_json::Value; #[derive(Clone)] -pub struct WebSocket { - feeder: Feeder, +pub struct WebSocket { + feeder: Feeder, socket: SocketSession, } @@ -22,8 +22,8 @@ pub struct SeriesInfo { pub options: ChartOptions, } -impl WebSocket { - pub fn new(feeder: Feeder, socket: SocketSession) -> Self { +impl WebSocket { + pub fn new(feeder: Feeder, socket: SocketSession) -> Self { Self { feeder, socket } } @@ -441,7 +441,7 @@ impl WebSocket { } #[async_trait] -impl Socket for WebSocket { +impl Socket for WebSocket { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); self.feeder.handle_events(event, &message.p).await; diff --git a/src/error.rs b/src/error.rs index 7ca5dd3..051338d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,18 +2,22 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum Error { - #[error("Generic {0}")] Generic(String), - #[error("failed to send the api request")] RequestError(#[from] reqwest::Error), - #[error("failed to parse the api response")] JsonParseError(#[from] serde_json::Error), - #[error("failed to convert into int from {}", .0)] TypeConversionError( - #[from] std::num::ParseIntError, - ), - #[error("invalid header value")] HeaderValueError(#[from] reqwest::header::InvalidHeaderValue), - #[error("failed to login")] LoginError(#[from] LoginError), - #[error("failed to capture regex data")] RegexError(#[from] regex::Error), - #[error("can not establish websocket connection")] WebSocketError( - #[from] tokio_tungstenite::tungstenite::Error, - ), + #[error("Generic {0}")] + Generic(String), + #[error("failed to send the api request")] + RequestError(#[from] reqwest::Error), + #[error("failed to parse the api response")] + JsonParseError(#[from] serde_json::Error), + #[error("failed to convert into int from {}", .0)] + TypeConversionError(#[from] std::num::ParseIntError), + #[error("invalid header value")] + HeaderValueError(#[from] reqwest::header::InvalidHeaderValue), + #[error("failed to login")] + LoginError(#[from] LoginError), + #[error("failed to capture regex data")] + RegexError(#[from] regex::Error), + #[error("can not establish websocket connection")] + WebSocketError(#[from] tokio_tungstenite::tungstenite::Error), #[error("no chart token found")] NoChartTokenFound, #[error("No scan data found")] @@ -28,16 +32,23 @@ pub enum Error { SymbolsNotSpecified, #[error("no search data found")] NoSearchDataFound, - #[error("inexistent or unsupported indicator {}", .0)] IndicatorDataNotFound(String), - #[error("tokio task join error")] TokioJoinError(#[from] tokio::task::JoinError), - #[error("url parse error")] UrlParseError(#[from] url::ParseError), - #[error("base64 decode error")] Base64DecodeError(#[from] base64::DecodeError), - #[error("zip error")] ZipError(#[from] zip::result::ZipError), - #[error("io error")] IOError(#[from] std::io::Error), - #[error("TradingView error")] TradingViewError(#[from] TradingViewError), + #[error("inexistent or unsupported indicator {}", .0)] + IndicatorDataNotFound(String), + #[error("tokio task join error")] + TokioJoinError(#[from] tokio::task::JoinError), + #[error("url parse error")] + UrlParseError(#[from] url::ParseError), + #[error("base64 decode error")] + Base64DecodeError(#[from] base64::DecodeError), + #[error("zip error")] + ZipError(#[from] zip::result::ZipError), + #[error("io error")] + IOError(#[from] std::io::Error), + #[error("TradingView error")] + TradingViewError(#[from] TradingViewError), } -#[derive(Debug, Clone, Error)] +#[derive(Debug, Clone, Error, PartialEq)] pub enum TradingViewError { #[error("series_error")] SeriesError, diff --git a/src/feeder.rs b/src/feeder.rs index ef603b0..2a905a6 100644 --- a/src/feeder.rs +++ b/src/feeder.rs @@ -1,8 +1,3 @@ -use serde::Deserialize; -use serde_json::Value; -use std::{collections::HashMap, sync::Arc}; -use tracing::{debug, error, info, trace, warn}; - use crate::{ chart::{ models::{ChartResponseData, SeriesCompletedMessage, StudyResponseData, SymbolInfo}, @@ -15,11 +10,28 @@ use crate::{ socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent}, Error, Result, }; +use serde::Deserialize; +use serde_json::Value; +use std::future::Future; +use std::pin::Pin; +use std::{collections::HashMap, sync::Arc}; +use tracing::{debug, error, info, trace, warn}; -#[derive(Clone)] -pub struct Feeder { +#[derive(Clone, Default)] +pub struct Feeder { pub(crate) metadata: Metadata, - // publisher: Vec>, + callbacks: HashMap< + TradingViewDataEvent, + Arc Pin + Send + Sync>> + Send + Sync>, + >, +} + +pub struct FeederBuilder {} + +impl FeederBuilder { + pub fn new() -> Self { + Self {} + } } #[derive(Default, Clone)] @@ -32,7 +44,7 @@ pub struct Metadata { pub quote_session: String, } -impl Feeder { +impl Feeder { // pub async fn subscribe( // &mut self, // listener: &mut T, @@ -44,20 +56,31 @@ impl Feeder { pub fn new() -> Self { Self { metadata: Metadata::default(), + callbacks: HashMap::new(), // publisher: Vec::new(), } } + // pub fn process_with_callback(data: T, callback: F) + // where + // T: serde::Serialize, + // F: Fn(T), + // { + // callback(data); + // } + // pub fn add(mut self, publisher: Box) -> Self { // self.publisher.push(publisher); // self // } - pub fn unsubscribe(&mut self, event_type: TradingViewDataEvent, listener: T) { - todo!() - } + // pub fn unsubscribe(&mut self, event_type: TradingViewDataEvent, listener: T) { + // todo!() + // } - pub fn notify(&self, event_type: TradingViewDataEvent) {} + // pub fn notify(&self, event_type: TradingViewDataEvent, data: T) { + // todo!() + // } pub async fn handle_events(&mut self, event: TradingViewDataEvent, message: &Vec) { match event { @@ -132,7 +155,7 @@ impl Feeder { .collect(); // timestamp, open, high, low, close, volume debug!("series data extracted: {:?}", data); - + // self.notify(TradingViewDataEvent::OnChartData, data) // TODO: Notify function } None => { diff --git a/src/quote/session.rs b/src/quote/session.rs index 28887df..9ad8711 100644 --- a/src/quote/session.rs +++ b/src/quote/session.rs @@ -10,13 +10,13 @@ use async_trait::async_trait; use serde_json::Value; #[derive(Clone)] -pub struct WebSocket { - feeder: Feeder, +pub struct WebSocket { + feeder: Feeder, socket: SocketSession, } -impl WebSocket { - pub fn new(feeder: Feeder, socket: SocketSession) -> Self { +impl WebSocket { + pub fn new(feeder: Feeder, socket: SocketSession) -> Self { Self { feeder, socket } } @@ -78,7 +78,7 @@ impl WebSocket { } #[async_trait] -impl Socket for WebSocket { +impl Socket for WebSocket { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); self.feeder.handle_events(event, &message.p).await; diff --git a/src/socket.rs b/src/socket.rs index 9237ac4..ef4eec2 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -35,6 +35,7 @@ lazy_static::lazy_static! { }; } +#[derive(Debug, Clone, PartialEq)] pub enum TradingViewDataEvent { OnChartData, OnChartDataUpdate, From fd634d01fb806e8f1436c76cee35eea053623752 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Thu, 4 Jan 2024 13:39:13 +0700 Subject: [PATCH 03/17] Add todo!() to handle_study_data() and add notify functions for quote data --- src/feeder.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/feeder.rs b/src/feeder.rs index 2a905a6..6d5fa3e 100644 --- a/src/feeder.rs +++ b/src/feeder.rs @@ -180,7 +180,9 @@ impl Feeder { Ok(()) } - async fn handle_study_data(&self) {} + async fn handle_study_data(&self) { + todo!() + } async fn handle_quote_data(&mut self, message: &Vec) { let qsd = QuoteData::deserialize(&message[1]).unwrap(); @@ -193,9 +195,11 @@ impl Feeder { for (_, q) in &self.metadata.quotes { debug!("quote data: {:?}", q); + // TODO: Notify function for quote data } } else { error!("quote data status error: {:?}", qsd); + // TODO: Notify function for quote data error } } } From 6323b6fa585c1b41008b044b948f22b5de2679b6 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Thu, 4 Jan 2024 15:11:34 +0700 Subject: [PATCH 04/17] Refactor WebSocket struct and Feeder struct Add Debug trait to WebSocket and Feeder structs --- src/chart/session.rs | 7 ++++--- src/feeder.rs | 20 ++++++++++---------- src/quote/session.rs | 7 ++++--- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/chart/session.rs b/src/chart/session.rs index 8e1f3a8..ef74ed0 100644 --- a/src/chart/session.rs +++ b/src/chart/session.rs @@ -9,9 +9,10 @@ use crate::{ }; use async_trait::async_trait; use serde_json::Value; +use std::fmt::Debug; #[derive(Clone)] -pub struct WebSocket { +pub struct WebSocket { feeder: Feeder, socket: SocketSession, } @@ -22,7 +23,7 @@ pub struct SeriesInfo { pub options: ChartOptions, } -impl WebSocket { +impl WebSocket { pub fn new(feeder: Feeder, socket: SocketSession) -> Self { Self { feeder, socket } } @@ -441,7 +442,7 @@ impl WebSocket { } #[async_trait] -impl Socket for WebSocket { +impl Socket for WebSocket { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); self.feeder.handle_events(event, &message.p).await; diff --git a/src/feeder.rs b/src/feeder.rs index 6d5fa3e..89c5e47 100644 --- a/src/feeder.rs +++ b/src/feeder.rs @@ -12,13 +12,13 @@ use crate::{ }; use serde::Deserialize; use serde_json::Value; -use std::future::Future; use std::pin::Pin; use std::{collections::HashMap, sync::Arc}; +use std::{fmt::Debug, future::Future}; use tracing::{debug, error, info, trace, warn}; #[derive(Clone, Default)] -pub struct Feeder { +pub struct Feeder { pub(crate) metadata: Metadata, callbacks: HashMap< TradingViewDataEvent, @@ -44,7 +44,7 @@ pub struct Metadata { pub quote_session: String, } -impl Feeder { +impl Feeder { // pub async fn subscribe( // &mut self, // listener: &mut T, @@ -61,13 +61,13 @@ impl Feeder { } } - // pub fn process_with_callback(data: T, callback: F) - // where - // T: serde::Serialize, - // F: Fn(T), - // { - // callback(data); - // } + pub fn process_with_callback(data: T, callback: F) + where + T: serde::Serialize, + F: Fn(T), + { + callback(data); + } // pub fn add(mut self, publisher: Box) -> Self { // self.publisher.push(publisher); diff --git a/src/quote/session.rs b/src/quote/session.rs index 9ad8711..073d739 100644 --- a/src/quote/session.rs +++ b/src/quote/session.rs @@ -8,14 +8,15 @@ use crate::{ }; use async_trait::async_trait; use serde_json::Value; +use std::fmt::Debug; #[derive(Clone)] -pub struct WebSocket { +pub struct WebSocket { feeder: Feeder, socket: SocketSession, } -impl WebSocket { +impl WebSocket { pub fn new(feeder: Feeder, socket: SocketSession) -> Self { Self { feeder, socket } } @@ -78,7 +79,7 @@ impl WebSocket { } #[async_trait] -impl Socket for WebSocket { +impl Socket for WebSocket { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); self.feeder.handle_events(event, &message.p).await; From a3ee1e6f62e82c9542f19a85653ffd7b13c271eb Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Fri, 5 Jan 2024 16:26:41 +0700 Subject: [PATCH 05/17] Add WebSocket connection establishment to SocketSession --- src/socket.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/socket.rs b/src/socket.rs index ef4eec2..4c07d08 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -174,6 +174,29 @@ pub struct SocketSession { } impl SocketSession { + /// Establishes a WebSocket connection to a TradingView data server. + /// + /// # Arguments + /// + /// * `server` - A reference to the `DataServer` enum which represents the server to connect to. + /// * `auth_token` - A string slice that holds the authentication token. + /// + /// # Returns + /// + /// * A `Result` which is: + /// * `Ok` - A tuple containing the split sink and stream of the WebSocket connection. + /// * `Err` - An error that occurred while trying to establish the connection or send the authentication message. + /// + /// # Asynchronous + /// + /// This function is asynchronous, it returns a Future that should be awaited. + /// + /// This function first constructs the URL for the WebSocket connection based on the provided server. + /// It then creates a client request from the URL and adds necessary headers. + /// The `connect_async` function is used to establish the WebSocket connection. + /// The connection is then split into a write and read part. + /// An authentication message is sent using the write part of the connection. + /// Finally, it returns the write and read parts of the connection. async fn connect( server: &DataServer, auth_token: &str, From 7b8048600551a372209d38d9e657ffa1f9e82a18 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Sat, 6 Jan 2024 17:57:51 +0700 Subject: [PATCH 06/17] Add protobuf dependency and handle parsed messages in socket.rs --- Cargo.toml | 3 +- src/socket.rs | 12 ++------ src/utils.rs | 80 ++++++++++++++++++++++++++++----------------------- 3 files changed, 48 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bf5fc90..3cb6349 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ path = "src/lib.rs" [features] default = ["user"] user = ["dep:google-authenticator"] +protobuf = ["dep:prost"] [dependencies] tokio = { version = "1", default-features = false } @@ -41,12 +42,12 @@ iso_currency = { version = "0.4", features = ["with-serde"] } zip = "^0.6" base64 = "^0.21" google-authenticator = { version = "^0.4", optional = true } +prost = { version = "0.12", optional = true } [dev-dependencies] tokio = { version = "1", features = ["full"] } criterion = { version = "0.5", features = ["html_reports"] } anyhow = "1" -protobuf = "3" dotenv = "0.15.0" tracing-subscriber = "0.3" chrono = { version = "0.4.31", features = ["serde"] } diff --git a/src/socket.rs b/src/socket.rs index 4c07d08..4cfadb3 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -313,16 +313,8 @@ pub trait Socket { match &raw { Message::Text(text) => { trace!("parsing message: {:?}", text); - match parse_packet(text) { - Ok(parsed_messages) => { - self.handle_parsed_messages(session, parsed_messages, &raw) - .await; - } - Err(e) => { - error!("error parsing message: {:?}", e); - self.handle_error(e).await; - } - } + self.handle_parsed_messages(session, parse_packet(text), &raw) + .await; } Message::Close(msg) => { warn!("connection closed with code: {:?}", msg); diff --git a/src/utils.rs b/src/utils.rs index 6d76d84..a546525 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,23 +1,26 @@ use crate::{ - models::{ MarketAdjustment, SessionType }, - socket::{ SocketMessage, SocketMessageDe }, + models::{MarketAdjustment, SessionType}, + socket::{SocketMessage, SocketMessageDe}, Result, }; -use base64::engine::{ general_purpose::STANDARD as BASE64, Engine as _ }; +use base64::engine::{general_purpose::STANDARD as BASE64, Engine as _}; use iso_currency::Currency; use rand::Rng; use regex::Regex; -use reqwest::header::{ HeaderMap, HeaderValue, ACCEPT, COOKIE, ORIGIN, REFERER }; +use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, COOKIE, ORIGIN, REFERER}; use serde::Serialize; use serde_json::Value; -use std::{ collections::HashMap, io::{ prelude::*, Cursor } }; +use std::{ + collections::HashMap, + io::{prelude::*, Cursor}, +}; use tokio_tungstenite::tungstenite::protocol::Message; -use tracing::{ debug, error }; +use tracing::{debug, error}; use zip::ZipArchive; lazy_static::lazy_static! { - static ref CLEANER_REGEX: Regex = Regex::new(r"~h~").unwrap(); - static ref SPLITTER_REGEX: Regex = Regex::new(r"~m~\d+~m~").unwrap(); + static ref CLEANER_REGEX: Regex = Regex::new(r"~h~").expect("Failed to compile regex"); + static ref SPLITTER_REGEX: Regex = Regex::new(r"~m~\d+~m~").expect("Failed to compile regex"); } #[macro_export] @@ -33,14 +36,19 @@ macro_rules! payload { pub fn build_request(cookie: Option<&str>) -> Result { let mut headers = HeaderMap::new(); headers.insert(ACCEPT, HeaderValue::from_static("application/json")); - headers.insert(ORIGIN, HeaderValue::from_static("https://www.tradingview.com")); - headers.insert(REFERER, HeaderValue::from_static("https://www.tradingview.com/")); + headers.insert( + ORIGIN, + HeaderValue::from_static("https://www.tradingview.com"), + ); + headers.insert( + REFERER, + HeaderValue::from_static("https://www.tradingview.com/"), + ); if let Some(cookie) = cookie { headers.insert(COOKIE, HeaderValue::from_str(cookie)?); } - let client = reqwest::Client - ::builder() + let client = reqwest::Client::builder() .use_rustls_tls() .default_headers(headers) .https_only(true) @@ -63,30 +71,29 @@ pub fn gen_id() -> String { result } -pub fn parse_packet(message: &str) -> Result>> { +pub fn parse_packet(message: &str) -> Vec> { if message.is_empty() { - return Ok(vec![]); + return vec![]; } let cleaned_message = CLEANER_REGEX.replace_all(message, ""); - let packets: Vec> = SPLITTER_REGEX.split(&cleaned_message) + let packets: Vec> = SPLITTER_REGEX + .split(&cleaned_message) .filter(|packet| !packet.is_empty()) - .map(|packet| { - match serde_json::from_str(packet) { - Ok(value) => value, - Err(error) => { - if error.is_syntax() { - error!("error parsing packet, invalid JSON: {}", error); - } else { - error!("error parsing packet: {}", error); - } - SocketMessage::Unknown(packet.to_string()) + .map(|packet| match serde_json::from_str(packet) { + Ok(value) => value, + Err(error) => { + if error.is_syntax() { + error!("error parsing packet, invalid JSON: {}", error); + } else { + error!("error parsing packet: {}", error); } + SocketMessage::Unknown(packet.to_string()) } }) .collect(); - Ok(packets) + packets } pub fn format_packet(packet: T) -> Result { @@ -101,7 +108,7 @@ pub fn symbol_init( adjustment: Option, currency: Option, session_type: Option, - replay: Option + replay: Option, ) -> Result { let mut symbol_init: HashMap = HashMap::new(); if let Some(s) = replay { @@ -135,18 +142,20 @@ pub fn _parse_compressed(data: &str) -> Result { mod tests { use serde_json::json; - use crate::{ models::{ MarketAdjustment, SessionType }, utils::* }; + use crate::{ + models::{MarketAdjustment, SessionType}, + utils::*, + }; #[test] fn test_parse_packet() { let current_dir = std::env::current_dir().unwrap().display().to_string(); println!("Current dir: {}", current_dir); - let messages = std::fs - ::read_to_string(format!("{}/tests/data/socket_messages.txt", current_dir)) - .unwrap(); + let messages = + std::fs::read_to_string(format!("{}/tests/data/socket_messages.txt", current_dir)) + .unwrap(); let result = parse_packet(messages.as_str()); - assert!(result.is_ok()); - let data = result.unwrap(); + let data = result; assert_eq!(data.len(), 42); } @@ -169,12 +178,11 @@ mod tests { Some(MarketAdjustment::Dividends), Some(iso_currency::Currency::USD), Some(SessionType::Extended), - Some("aaaaaaaaaaaa".to_string()) + Some("aaaaaaaaaaaa".to_string()), ); assert!(test2.is_ok()); let test2_json: Value = serde_json::from_str(&test2.unwrap().replace('=', "")).unwrap(); - let expected2_json = - json!({ + let expected2_json = json!({ "adjustment": "dividends", "currency-id": "USD", "replay": "aaaaaaaaaaaa", From 6141c55d5471375656dfb0d3b73ff0246eeac590 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Sat, 6 Jan 2024 22:26:53 +0700 Subject: [PATCH 07/17] Update async callback --- examples/all.rs | 2 +- src/chart/session.rs | 10 +++---- src/error.rs | 2 +- src/feeder.rs | 68 ++++++++++++++++++++++++++++++++++---------- src/quote/session.rs | 10 +++---- src/socket.rs | 2 +- 6 files changed, 66 insertions(+), 28 deletions(-) diff --git a/examples/all.rs b/examples/all.rs index ac5406b..89819be 100644 --- a/examples/all.rs +++ b/examples/all.rs @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> { let socket = SocketSession::new(DataServer::ProData, auth_token).await?; - let publisher: Feeder = Feeder::new(); + let publisher: Feeder = Feeder::new(); let mut chart = chart::session::WebSocket::new(publisher.clone(), socket.clone()); let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); diff --git a/src/chart/session.rs b/src/chart/session.rs index ef74ed0..879c2e1 100644 --- a/src/chart/session.rs +++ b/src/chart/session.rs @@ -12,8 +12,8 @@ use serde_json::Value; use std::fmt::Debug; #[derive(Clone)] -pub struct WebSocket { - feeder: Feeder, +pub struct WebSocket<'a> { + feeder: Feeder<'a>, socket: SocketSession, } @@ -23,8 +23,8 @@ pub struct SeriesInfo { pub options: ChartOptions, } -impl WebSocket { - pub fn new(feeder: Feeder, socket: SocketSession) -> Self { +impl<'a> WebSocket<'a> { + pub fn new(feeder: Feeder<'a>, socket: SocketSession) -> Self { Self { feeder, socket } } @@ -442,7 +442,7 @@ impl WebSocket { } #[async_trait] -impl Socket for WebSocket { +impl<'a> Socket for WebSocket<'a> { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); self.feeder.handle_events(event, &message.p).await; diff --git a/src/error.rs b/src/error.rs index 051338d..e8934c4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -48,7 +48,7 @@ pub enum Error { TradingViewError(#[from] TradingViewError), } -#[derive(Debug, Clone, Error, PartialEq)] +#[derive(Debug, Clone, Error, PartialEq, Eq, Hash)] pub enum TradingViewError { #[error("series_error")] SeriesError, diff --git a/src/feeder.rs b/src/feeder.rs index 89c5e47..2064667 100644 --- a/src/feeder.rs +++ b/src/feeder.rs @@ -10,7 +10,8 @@ use crate::{ socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent}, Error, Result, }; -use serde::Deserialize; +use futures_util::future::BoxFuture; +use serde::{Deserialize, Serialize}; use serde_json::Value; use std::pin::Pin; use std::{collections::HashMap, sync::Arc}; @@ -18,12 +19,43 @@ use std::{fmt::Debug, future::Future}; use tracing::{debug, error, info, trace, warn}; #[derive(Clone, Default)] -pub struct Feeder { +pub struct Feeder<'a> { pub(crate) metadata: Metadata, - callbacks: HashMap< - TradingViewDataEvent, - Arc Pin + Send + Sync>> + Send + Sync>, - >, + callbacks: Callbacks<'a>, +} + +type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sync + 'a>; + +#[derive(Clone)] +pub struct Callbacks<'a> { + on_chart_data: Arc>, + on_quote_data: Arc>, + on_series_completed: Arc>, + on_study_completed: Arc>, + on_unknown_event: Arc>, +} + +impl Default for Callbacks<'_> { + fn default() -> Self { + Self { + on_chart_data: Arc::new(Box::new(|_| Box::pin(async {}))), + on_quote_data: Arc::new(Box::new(|_| Box::pin(async {}))), + on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))), + on_study_completed: Arc::new(Box::new(|_| Box::pin(async {}))), + on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))), + } + } +} + +impl<'a> Callbacks<'a> { + pub fn on_chart_data( + &mut self, + f: impl FnOnce(ChartResponseData) -> Fut + Send + Sync + 'a, + ) where + Fut: Future + Send + 'a, + { + self.on_chart_data = Arc::new(Box::new(|data| Box::pin(f(data)))); + } } pub struct FeederBuilder {} @@ -44,7 +76,7 @@ pub struct Metadata { pub quote_session: String, } -impl Feeder { +impl<'a> Feeder<'a> { // pub async fn subscribe( // &mut self, // listener: &mut T, @@ -56,17 +88,19 @@ impl Feeder { pub fn new() -> Self { Self { metadata: Metadata::default(), - callbacks: HashMap::new(), + callbacks: Callbacks::default(), // publisher: Vec::new(), } } - pub fn process_with_callback(data: T, callback: F) - where - T: serde::Serialize, - F: Fn(T), - { - callback(data); + pub fn process_with_callback( + &mut self, + event: TradingViewDataEvent, + callback: Box>>, + ) -> &mut Self { + // let cb = Arc::new(callback); + // self.callbacks.insert(event, cb); + self } // pub fn add(mut self, publisher: Box) -> Self { @@ -82,7 +116,11 @@ impl Feeder { // todo!() // } - pub async fn handle_events(&mut self, event: TradingViewDataEvent, message: &Vec) { + pub(crate) async fn handle_events( + &mut self, + event: TradingViewDataEvent, + message: &Vec, + ) { match event { TradingViewDataEvent::OnChartData | TradingViewDataEvent::OnChartDataUpdate => { trace!("received chart data: {:?}", message); diff --git a/src/quote/session.rs b/src/quote/session.rs index 073d739..110674f 100644 --- a/src/quote/session.rs +++ b/src/quote/session.rs @@ -11,13 +11,13 @@ use serde_json::Value; use std::fmt::Debug; #[derive(Clone)] -pub struct WebSocket { - feeder: Feeder, +pub struct WebSocket<'a> { + feeder: Feeder<'a>, socket: SocketSession, } -impl WebSocket { - pub fn new(feeder: Feeder, socket: SocketSession) -> Self { +impl<'a> WebSocket<'a> { + pub fn new(feeder: Feeder<'a>, socket: SocketSession) -> Self { Self { feeder, socket } } @@ -79,7 +79,7 @@ impl WebSocket { } #[async_trait] -impl Socket for WebSocket { +impl<'a> Socket for WebSocket<'a> { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); self.feeder.handle_events(event, &message.p).await; diff --git a/src/socket.rs b/src/socket.rs index 4cfadb3..544f747 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -35,7 +35,7 @@ lazy_static::lazy_static! { }; } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum TradingViewDataEvent { OnChartData, OnChartDataUpdate, From 00f3186bda33470ebe7586e0961152b619e1be27 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Sun, 7 Jan 2024 00:06:03 +0700 Subject: [PATCH 08/17] Update module names and types --- examples/all.rs | 4 +- src/chart/session.rs | 30 +++++++++------ src/{feeder.rs => data_loader.rs} | 64 +++++++++++++++++++++++++++---- src/lib.rs | 2 +- src/quote/models.rs | 24 ++++++++++-- src/quote/session.rs | 27 +++++++------ 6 files changed, 113 insertions(+), 38 deletions(-) rename src/{feeder.rs => data_loader.rs} (82%) diff --git a/examples/all.rs b/examples/all.rs index 89819be..76e61bd 100644 --- a/examples/all.rs +++ b/examples/all.rs @@ -4,7 +4,7 @@ use std::env; use tradingview::{ chart, chart::ChartOptions, - feeder::Feeder, + data_loader::DataLoader, models::{pine_indicator::ScriptType, Interval}, quote, socket::{DataServer, SocketSession}, @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> { let socket = SocketSession::new(DataServer::ProData, auth_token).await?; - let publisher: Feeder = Feeder::new(); + let publisher: DataLoader = DataLoader::new(); let mut chart = chart::session::WebSocket::new(publisher.clone(), socket.clone()); let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); diff --git a/src/chart/session.rs b/src/chart/session.rs index 879c2e1..b20e642 100644 --- a/src/chart/session.rs +++ b/src/chart/session.rs @@ -1,6 +1,6 @@ use crate::{ chart::{ChartOptions, StudyOptions}, - feeder::Feeder, + data_loader::DataLoader, models::{pine_indicator::PineIndicator, Interval, Timezone}, payload, socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent}, @@ -13,7 +13,7 @@ use std::fmt::Debug; #[derive(Clone)] pub struct WebSocket<'a> { - feeder: Feeder<'a>, + data_loader: DataLoader<'a>, socket: SocketSession, } @@ -24,8 +24,11 @@ pub struct SeriesInfo { } impl<'a> WebSocket<'a> { - pub fn new(feeder: Feeder<'a>, socket: SocketSession) -> Self { - Self { feeder, socket } + pub fn new(data_loader: DataLoader<'a>, socket: SocketSession) -> Self { + Self { + data_loader, + socket, + } } // Begin TradingView WebSocket methods @@ -325,7 +328,7 @@ impl<'a> WebSocket<'a> { } pub async fn delete(&mut self) -> Result<&mut Self> { - for (_, s) in self.feeder.metadata.series.clone() { + for (_, s) in self.data_loader.metadata.series.clone() { self.delete_chart_session_id(&s.chart_session).await?; } self.socket.close().await?; @@ -367,8 +370,8 @@ impl<'a> WebSocket<'a> { chart_session: &str, series_id: &str, ) -> Result<&mut Self> { - self.feeder.metadata.studies_count += 1; - let study_count = self.feeder.metadata.studies_count; + self.data_loader.metadata.studies_count += 1; + let study_count = self.data_loader.metadata.studies_count; let study_id = format!("st{}", study_count); let indicator = PineIndicator::build() @@ -379,7 +382,7 @@ impl<'a> WebSocket<'a> { ) .await?; - self.feeder + self.data_loader .metadata .studies .insert(indicator.metadata.data.id.clone(), study_id.clone()); @@ -390,8 +393,8 @@ impl<'a> WebSocket<'a> { } pub async fn set_market(&mut self, options: ChartOptions) -> Result<&mut Self> { - self.feeder.metadata.series_count += 1; - let series_count = self.feeder.metadata.series_count; + self.data_loader.metadata.series_count += 1; + let series_count = self.data_loader.metadata.series_count; let symbol_series_id = format!("sds_sym_{}", series_count); let series_id = format!("sds_{}", series_count); let series_version = format!("s{}", series_count); @@ -431,7 +434,10 @@ impl<'a> WebSocket<'a> { options, }; - self.feeder.metadata.series.insert(series_id, series_info); + self.data_loader + .metadata + .series + .insert(series_id, series_info); Ok(self) } @@ -445,7 +451,7 @@ impl<'a> WebSocket<'a> { impl<'a> Socket for WebSocket<'a> { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); - self.feeder.handle_events(event, &message.p).await; + self.data_loader.handle_events(event, &message.p).await; Ok(()) } } diff --git a/src/feeder.rs b/src/data_loader.rs similarity index 82% rename from src/feeder.rs rename to src/data_loader.rs index 2064667..9f22b8e 100644 --- a/src/feeder.rs +++ b/src/data_loader.rs @@ -19,16 +19,16 @@ use std::{fmt::Debug, future::Future}; use tracing::{debug, error, info, trace, warn}; #[derive(Clone, Default)] -pub struct Feeder<'a> { +pub struct DataLoader<'a> { pub(crate) metadata: Metadata, callbacks: Callbacks<'a>, } -type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sync + 'a>; +type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sync + 'a>; #[derive(Clone)] pub struct Callbacks<'a> { - on_chart_data: Arc>, + on_chart_data: Arc>>>, on_quote_data: Arc>, on_series_completed: Arc>, on_study_completed: Arc>, @@ -50,11 +50,57 @@ impl Default for Callbacks<'_> { impl<'a> Callbacks<'a> { pub fn on_chart_data( &mut self, - f: impl FnOnce(ChartResponseData) -> Fut + Send + Sync + 'a, - ) where + f: impl Fn(Vec>) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where Fut: Future + Send + 'a, { - self.on_chart_data = Arc::new(Box::new(|data| Box::pin(f(data)))); + self.on_chart_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } + + pub fn on_quote_data( + &mut self, + f: impl Fn(QuoteData) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_quote_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } + + pub fn on_series_completed( + &mut self, + f: impl Fn(SeriesCompletedMessage) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_series_completed = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } + + pub fn on_study_completed( + &mut self, + f: impl Fn(StudyResponseData) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_study_completed = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } + + pub fn on_unknown_event( + &mut self, + f: impl Fn(Value) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_unknown_event = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self } } @@ -76,7 +122,7 @@ pub struct Metadata { pub quote_session: String, } -impl<'a> Feeder<'a> { +impl<'a> DataLoader<'a> { // pub async fn subscribe( // &mut self, // listener: &mut T, @@ -194,7 +240,9 @@ impl<'a> Feeder<'a> { // timestamp, open, high, low, close, volume debug!("series data extracted: {:?}", data); // self.notify(TradingViewDataEvent::OnChartData, data) + // TODO: Notify function + (self.callbacks.on_chart_data)(data).await; } None => { debug!("receive empty data on series: {:?}", s); @@ -231,7 +279,7 @@ impl<'a> Feeder<'a> { self.metadata.quotes.insert(qsd.name, qsd.value); } - for (_, q) in &self.metadata.quotes { + for q in self.metadata.quotes.values() { debug!("quote data: {:?}", q); // TODO: Notify function for quote data } diff --git a/src/lib.rs b/src/lib.rs index af277ef..b4560c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ pub mod chart; pub mod client; pub mod error; -pub mod feeder; +pub mod data_loader; pub mod models; pub mod quote; pub mod socket; diff --git a/src/quote/models.rs b/src/quote/models.rs index e9543b4..db85fd8 100644 --- a/src/quote/models.rs +++ b/src/quote/models.rs @@ -1,4 +1,4 @@ -use serde::{ Deserialize, Serialize }; +use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] pub struct QuoteData { @@ -10,41 +10,59 @@ pub struct QuoteData { pub value: QuoteValue, } -#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] +#[derive(Clone, PartialEq, Deserialize, Serialize)] +#[cfg_attr(feature = "protobuf", derive(prost::Message))] +#[cfg_attr(not(feature = "protobuf"), derive(Debug))] pub struct QuoteValue { + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "1"))] #[serde(default)] pub ask: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "2"))] #[serde(default)] pub ask_size: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "3"))] #[serde(default)] pub bid: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "4"))] #[serde(default)] pub bid_size: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "5"))] #[serde(default, rename(deserialize = "ch"))] pub change: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "6"))] #[serde(default, rename(deserialize = "chp"))] pub change_percent: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "7"))] #[serde(default, rename(deserialize = "open_price"))] pub open: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "8"))] #[serde(default, rename(deserialize = "high_price"))] pub high: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "9"))] #[serde(default, rename(deserialize = "low_price"))] pub low: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "10"))] #[serde(default, rename(deserialize = "prev_close_price"))] pub prev_close: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "11"))] #[serde(default, rename(deserialize = "lp"))] pub price: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "12"))] #[serde(default, rename(deserialize = "lp_time"))] pub timestamp: Option, + #[cfg_attr(feature = "protobuf", prost(double, optional, tag = "13"))] #[serde(default)] pub volume: Option, - + #[cfg_attr(feature = "protobuf", prost(string, optional, tag = "14"))] #[serde(default, rename(deserialize = "currency_id"))] pub currency: Option, + #[cfg_attr(feature = "protobuf", prost(string, optional, tag = "15"))] #[serde(default, rename(deserialize = "short_name"))] pub symbol: Option, + #[cfg_attr(feature = "protobuf", prost(string, optional, tag = "16"))] #[serde(default, rename(deserialize = "exchange"))] pub exchange: Option, + #[cfg_attr(feature = "protobuf", prost(string, optional, tag = "17"))] #[serde(default, rename(deserialize = "type"))] pub market_type: Option, } diff --git a/src/quote/session.rs b/src/quote/session.rs index 110674f..c88ea5f 100644 --- a/src/quote/session.rs +++ b/src/quote/session.rs @@ -1,5 +1,5 @@ use crate::{ - feeder::Feeder, + data_loader::DataLoader, payload, quote::ALL_QUOTE_FIELDS, socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent}, @@ -8,22 +8,24 @@ use crate::{ }; use async_trait::async_trait; use serde_json::Value; -use std::fmt::Debug; #[derive(Clone)] pub struct WebSocket<'a> { - feeder: Feeder<'a>, + data_loader: DataLoader<'a>, socket: SocketSession, } impl<'a> WebSocket<'a> { - pub fn new(feeder: Feeder<'a>, socket: SocketSession) -> Self { - Self { feeder, socket } + pub fn new(data_loader: DataLoader<'a>, socket: SocketSession) -> Self { + Self { + data_loader, + socket, + } } pub async fn create_session(&mut self) -> Result<&mut Self> { let quote_session = gen_session_id("qs"); - self.feeder.metadata.quote_session = quote_session.clone(); + self.data_loader.metadata.quote_session = quote_session.clone(); self.socket .send("quote_create_session", &payload!(quote_session)) .await?; @@ -34,21 +36,22 @@ impl<'a> WebSocket<'a> { self.socket .send( "quote_delete_session", - &payload!(self.feeder.metadata.quote_session.clone()), + &payload!(self.data_loader.metadata.quote_session.clone()), ) .await?; Ok(self) } pub async fn set_fields(&mut self) -> Result<&mut Self> { - let mut quote_fields = payload![self.feeder.metadata.quote_session.clone().to_string()]; + let mut quote_fields = + payload![self.data_loader.metadata.quote_session.clone().to_string()]; quote_fields.extend(ALL_QUOTE_FIELDS.clone().into_iter().map(Value::from)); self.socket.send("quote_set_fields", "e_fields).await?; Ok(self) } pub async fn add_symbols(&mut self, symbols: Vec<&str>) -> Result<&mut Self> { - let mut payloads = payload![self.feeder.metadata.quote_session.clone()]; + let mut payloads = payload![self.data_loader.metadata.quote_session.clone()]; payloads.extend(symbols.into_iter().map(Value::from)); self.socket.send("quote_add_symbols", &payloads).await?; Ok(self) @@ -60,14 +63,14 @@ impl<'a> WebSocket<'a> { } pub async fn fast_symbols(&mut self, symbols: Vec<&str>) -> Result<&mut Self> { - let mut payloads = payload![self.feeder.metadata.quote_session.clone()]; + let mut payloads = payload![self.data_loader.metadata.quote_session.clone()]; payloads.extend(symbols.into_iter().map(Value::from)); self.socket.send("quote_fast_symbols", &payloads).await?; Ok(self) } pub async fn remove_symbols(&mut self, symbols: Vec<&str>) -> Result<&mut Self> { - let mut payloads = payload![self.feeder.metadata.quote_session.clone()]; + let mut payloads = payload![self.data_loader.metadata.quote_session.clone()]; payloads.extend(symbols.into_iter().map(Value::from)); self.socket.send("quote_remove_symbols", &payloads).await?; Ok(self) @@ -82,7 +85,7 @@ impl<'a> WebSocket<'a> { impl<'a> Socket for WebSocket<'a> { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); - self.feeder.handle_events(event, &message.p).await; + self.data_loader.handle_events(event, &message.p).await; Ok(()) } } From 425089c1f9047c7e6476a97552fec6d3e5c2f44b Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Sun, 7 Jan 2024 00:17:28 +0700 Subject: [PATCH 09/17] added prost derive --- src/models/mod.rs | 34 +++++++++++++++++++++++++++------- src/models/pine_indicator.rs | 28 ++++++++++++++++++++-------- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/src/models/mod.rs b/src/models/mod.rs index 0a03d35..3a4bebe 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use serde::{ Deserialize, Deserializer, Serialize }; +use serde::{Deserialize, Deserializer, Serialize}; pub mod pine_indicator; #[derive(Debug, Clone, Default, Serialize, Deserialize)] @@ -37,17 +37,27 @@ pub struct ChartDrawingSourceStatePoint { price: f64, } -#[derive(Default, Debug, Clone, Serialize, Deserialize)] +#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] +#[cfg_attr(feature = "protobuf", derive(prost::Message))] +#[derive(Clone, Serialize, Deserialize)] pub struct UserCookies { + #[cfg_attr(feature = "protobuf", prost(uint32, tag = "1"))] pub id: u32, + #[cfg_attr(feature = "protobuf", prost(string, tag = "2"))] pub username: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "3"))] pub private_channel: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "4"))] pub auth_token: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "5"))] #[serde(default)] pub session: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "6"))] #[serde(default)] pub session_signature: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "7"))] pub session_hash: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "8"))] #[serde(default)] pub device_token: String, } @@ -59,19 +69,28 @@ pub struct SymbolSearchResponse { pub symbols: Vec, } -#[derive(Debug, Clone, Default, Deserialize)] +#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] +#[cfg_attr(feature = "protobuf", derive(prost::Message))] +#[derive(Clone, Deserialize)] pub struct Symbol { + #[cfg_attr(feature = "protobuf", prost(string, tag = "1"))] pub symbol: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "2"))] #[serde(default)] pub description: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "3"))] #[serde(default, rename(deserialize = "type"))] pub market_type: String, #[serde(default)] + #[cfg_attr(feature = "protobuf", prost(string, tag = "4"))] pub exchange: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "5"))] #[serde(default)] pub currency_code: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "6"))] #[serde(default, rename(deserialize = "provider_id"))] pub data_provider: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "7"))] #[serde(default, rename(deserialize = "country"))] pub country_code: String, } @@ -451,16 +470,17 @@ impl std::fmt::Display for LanguageCode { #[derive(Debug, Clone, PartialEq, Serialize)] #[serde(untagged)] pub enum FinancialPeriod { - FiscalYear, // FY - FiscalQuarter, // FQ - FiscalHalfYear, // FH + FiscalYear, // FY + FiscalQuarter, // FQ + FiscalHalfYear, // FH TrailingTwelveMonths, // TTM UnknownPeriod(String), } impl<'de> Deserialize<'de> for FinancialPeriod { fn deserialize(deserializer: D) -> Result - where D: Deserializer<'de> + where + D: Deserializer<'de>, { let s: String = Deserialize::deserialize(deserializer)?; match s.as_str() { diff --git a/src/models/pine_indicator.rs b/src/models/pine_indicator.rs index 54fa2c3..0b44e45 100644 --- a/src/models/pine_indicator.rs +++ b/src/models/pine_indicator.rs @@ -4,9 +4,9 @@ use serde::Deserialize; use serde_json::Value; use crate::{ - chart::study::{ IndicatorInput, InputValue }, + chart::study::{IndicatorInput, InputValue}, client::misc::get_indicator_metadata, - models::{ FinancialPeriod, UserCookies }, + models::{FinancialPeriod, UserCookies}, Result, }; @@ -29,17 +29,26 @@ impl std::fmt::Display for BuiltinIndicators { } } -#[derive(Debug, Clone, PartialEq, Deserialize)] +#[derive(Clone, PartialEq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(not(feature = "protobuf"), derive(Debug))] +// #[cfg_attr(feature = "protobuf", derive(prost::Message))] pub struct PineInfo { + // #[cfg_attr(feature = "protobuf", prost(int64, optional, tag = "1"))] pub user_id: i64, + // #[cfg_attr(feature = "protobuf", prost(string, optional, tag = "2"))] pub script_name: String, + // #[cfg_attr(feature = "protobuf", prost(string, optional, tag = "3"))] pub script_source: String, + // #[cfg_attr(feature = "protobuf", prost(string, optional, tag = "4"))] #[serde(rename(deserialize = "scriptIdPart"))] pub script_id: String, + // #[cfg_attr(feature = "protobuf", prost(string, optional, tag = "5"))] pub script_access: String, + // #[cfg_attr(feature = "protobuf", prost(string, optional, tag = "6"))] #[serde(rename(deserialize = "version"))] pub script_version: String, + // #[cfg_attr(feature = "protobuf", prost(message, optional, tag = "7"))] pub extra: PineInfoExtra, } @@ -226,7 +235,7 @@ impl PineIndicatorBuilder { &mut self, script_id: &str, script_version: &str, - script_type: ScriptType + script_type: ScriptType, ) -> Result { let metadata = match &self.user { Some(user) => get_indicator_metadata(Some(user), script_id, script_version).await?, @@ -250,12 +259,15 @@ impl PineIndicator { let mut inputs: HashMap = HashMap::new(); inputs.insert( "text".to_string(), - IndicatorInput::String(self.metadata.il_template.clone()) + IndicatorInput::String(self.metadata.il_template.clone()), + ); + inputs.insert( + "pineId".to_string(), + IndicatorInput::String(self.script_id.clone()), ); - inputs.insert("pineId".to_string(), IndicatorInput::String(self.script_id.clone())); inputs.insert( "pineVersion".to_string(), - IndicatorInput::String(self.script_version.clone()) + IndicatorInput::String(self.script_version.clone()), ); self.metadata.data.inputs.iter().for_each(|input| { if input.id == "text" || input.id == "pineId" || input.id == "pineVersion" { @@ -267,7 +279,7 @@ impl PineIndicator { v: input.defval.clone(), f: Value::from(input.is_fake), t: Value::from(input.input_type.clone()), - }) + }), ); }); From eb65b9f6588b9ad35e6e1bd0e0e3b1d5b9b02e9f Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Sun, 7 Jan 2024 10:28:00 +0700 Subject: [PATCH 10/17] Refactor DataLoader and add DataLoaderBuilder --- examples/all.rs | 2 +- src/callback.rs | 87 ++++++++++++++++++++++++ src/data_loader.rs | 161 ++++++--------------------------------------- src/lib.rs | 3 +- src/socket.rs | 24 ------- 5 files changed, 110 insertions(+), 167 deletions(-) create mode 100644 src/callback.rs diff --git a/examples/all.rs b/examples/all.rs index 76e61bd..08547fc 100644 --- a/examples/all.rs +++ b/examples/all.rs @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> { let socket = SocketSession::new(DataServer::ProData, auth_token).await?; - let publisher: DataLoader = DataLoader::new(); + let publisher: DataLoader = DataLoader::default(); let mut chart = chart::session::WebSocket::new(publisher.clone(), socket.clone()); let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); diff --git a/src/callback.rs b/src/callback.rs new file mode 100644 index 0000000..d285d83 --- /dev/null +++ b/src/callback.rs @@ -0,0 +1,87 @@ +use crate::{ + chart::models::{SeriesCompletedMessage, StudyResponseData}, + quote::models::QuoteData, +}; +use futures_util::{future::BoxFuture, Future}; +use serde_json::Value; +use std::sync::Arc; + +pub type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sync + 'a>; + +#[derive(Clone)] +pub struct Callbacks<'a> { + pub(crate) on_chart_data: Arc>>>, + pub(crate) on_quote_data: Arc>, + pub(crate) on_series_completed: Arc>, + pub(crate) on_study_completed: Arc>, + pub(crate) on_unknown_event: Arc>, +} + +impl Default for Callbacks<'_> { + fn default() -> Self { + Self { + on_chart_data: Arc::new(Box::new(|_| Box::pin(async {}))), + on_quote_data: Arc::new(Box::new(|_| Box::pin(async {}))), + on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))), + on_study_completed: Arc::new(Box::new(|_| Box::pin(async {}))), + on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))), + } + } +} + +impl<'a> Callbacks<'a> { + pub fn on_chart_data( + &mut self, + f: impl Fn(Vec>) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_chart_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } + + pub fn on_quote_data( + &mut self, + f: impl Fn(QuoteData) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_quote_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } + + pub fn on_series_completed( + &mut self, + f: impl Fn(SeriesCompletedMessage) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_series_completed = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } + + pub fn on_study_completed( + &mut self, + f: impl Fn(StudyResponseData) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_study_completed = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } + + pub fn on_unknown_event( + &mut self, + f: impl Fn(Value) -> Fut + Send + Sync + 'a, + ) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_unknown_event = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } +} diff --git a/src/data_loader.rs b/src/data_loader.rs index 9f22b8e..b2c9223 100644 --- a/src/data_loader.rs +++ b/src/data_loader.rs @@ -1,21 +1,19 @@ use crate::{ + callback::Callbacks, chart::{ - models::{ChartResponseData, SeriesCompletedMessage, StudyResponseData, SymbolInfo}, + models::{ChartResponseData, StudyResponseData, SymbolInfo}, session::SeriesInfo, }, quote::{ models::{QuoteData, QuoteValue}, utils::merge_quotes, }, - socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent}, - Error, Result, + socket::TradingViewDataEvent, + Result, }; -use futures_util::future::BoxFuture; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use serde_json::Value; -use std::pin::Pin; -use std::{collections::HashMap, sync::Arc}; -use std::{fmt::Debug, future::Future}; +use std::collections::HashMap; use tracing::{debug, error, info, trace, warn}; #[derive(Clone, Default)] @@ -24,89 +22,9 @@ pub struct DataLoader<'a> { callbacks: Callbacks<'a>, } -type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sync + 'a>; +pub struct DataLoaderBuilder {} -#[derive(Clone)] -pub struct Callbacks<'a> { - on_chart_data: Arc>>>, - on_quote_data: Arc>, - on_series_completed: Arc>, - on_study_completed: Arc>, - on_unknown_event: Arc>, -} - -impl Default for Callbacks<'_> { - fn default() -> Self { - Self { - on_chart_data: Arc::new(Box::new(|_| Box::pin(async {}))), - on_quote_data: Arc::new(Box::new(|_| Box::pin(async {}))), - on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))), - on_study_completed: Arc::new(Box::new(|_| Box::pin(async {}))), - on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))), - } - } -} - -impl<'a> Callbacks<'a> { - pub fn on_chart_data( - &mut self, - f: impl Fn(Vec>) -> Fut + Send + Sync + 'a, - ) -> &mut Self - where - Fut: Future + Send + 'a, - { - self.on_chart_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); - self - } - - pub fn on_quote_data( - &mut self, - f: impl Fn(QuoteData) -> Fut + Send + Sync + 'a, - ) -> &mut Self - where - Fut: Future + Send + 'a, - { - self.on_quote_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); - self - } - - pub fn on_series_completed( - &mut self, - f: impl Fn(SeriesCompletedMessage) -> Fut + Send + Sync + 'a, - ) -> &mut Self - where - Fut: Future + Send + 'a, - { - self.on_series_completed = Arc::new(Box::new(move |data| Box::pin(f(data)))); - self - } - - pub fn on_study_completed( - &mut self, - f: impl Fn(StudyResponseData) -> Fut + Send + Sync + 'a, - ) -> &mut Self - where - Fut: Future + Send + 'a, - { - self.on_study_completed = Arc::new(Box::new(move |data| Box::pin(f(data)))); - self - } - - pub fn on_unknown_event( - &mut self, - f: impl Fn(Value) -> Fut + Send + Sync + 'a, - ) -> &mut Self - where - Fut: Future + Send + 'a, - { - self.on_unknown_event = Arc::new(Box::new(move |data| Box::pin(f(data)))); - self - } -} - -pub struct FeederBuilder {} - -impl FeederBuilder { +impl DataLoaderBuilder { pub fn new() -> Self { Self {} } @@ -123,45 +41,6 @@ pub struct Metadata { } impl<'a> DataLoader<'a> { - // pub async fn subscribe( - // &mut self, - // listener: &mut T, - // socket: &mut SocketSession - // ) { - // listener.event_loop(&mut socket.clone()).await; - // } - - pub fn new() -> Self { - Self { - metadata: Metadata::default(), - callbacks: Callbacks::default(), - // publisher: Vec::new(), - } - } - - pub fn process_with_callback( - &mut self, - event: TradingViewDataEvent, - callback: Box>>, - ) -> &mut Self { - // let cb = Arc::new(callback); - // self.callbacks.insert(event, cb); - self - } - - // pub fn add(mut self, publisher: Box) -> Self { - // self.publisher.push(publisher); - // self - // } - - // pub fn unsubscribe(&mut self, event_type: TradingViewDataEvent, listener: T) { - // todo!() - // } - - // pub fn notify(&self, event_type: TradingViewDataEvent, data: T) { - // todo!() - // } - pub(crate) async fn handle_events( &mut self, event: TradingViewDataEvent, @@ -239,8 +118,6 @@ impl<'a> DataLoader<'a> { .collect(); // timestamp, open, high, low, close, volume debug!("series data extracted: {:?}", data); - // self.notify(TradingViewDataEvent::OnChartData, data) - // TODO: Notify function (self.callbacks.on_chart_data)(data).await; } @@ -249,27 +126,29 @@ impl<'a> DataLoader<'a> { } } } + + self.handle_study_data(studies, message).await?; + + Ok(()) + } + + async fn handle_study_data( + &self, + studies: &HashMap, + message: &Vec, + ) -> Result<()> { for (k, v) in studies.into_iter() { if let Some(resp_data) = message[1].get(v.as_str()) { debug!("study data received: {} - {:?}", k, resp_data); - let data: Vec> = StudyResponseData::deserialize(resp_data)? - .studies - .into_iter() - .map(|point| point.value) - .collect(); + let data = StudyResponseData::deserialize(resp_data)?; warn!("study data extracted: {} - {:?}", k, data); // TODO: Notify function } } - Ok(()) } - async fn handle_study_data(&self) { - todo!() - } - async fn handle_quote_data(&mut self, message: &Vec) { let qsd = QuoteData::deserialize(&message[1]).unwrap(); if qsd.status == "ok" { diff --git a/src/lib.rs b/src/lib.rs index b4560c9..c78c18f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,8 @@ +pub mod callback; pub mod chart; pub mod client; -pub mod error; pub mod data_loader; +pub mod error; pub mod models; pub mod quote; pub mod socket; diff --git a/src/socket.rs b/src/socket.rs index 544f747..28a3c1b 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -372,29 +372,5 @@ pub trait Socket { async fn handle_error(&mut self, error: Error) { error!("{}", error); - match error { - Error::Generic(_) => todo!(), - Error::RequestError(_) => todo!(), - Error::JsonParseError(_) => todo!(), - Error::TypeConversionError(_) => todo!(), - Error::HeaderValueError(_) => todo!(), - Error::LoginError(_) => todo!(), - Error::RegexError(_) => todo!(), - Error::WebSocketError(_) => todo!(), - Error::NoChartTokenFound => todo!(), - Error::NoScanDataFound => todo!(), - Error::SymbolsNotInSameExchange => todo!(), - Error::ExchangeNotSpecified => todo!(), - Error::InvalidExchange => todo!(), - Error::SymbolsNotSpecified => todo!(), - Error::NoSearchDataFound => todo!(), - Error::IndicatorDataNotFound(_) => todo!(), - Error::TokioJoinError(_) => todo!(), - Error::UrlParseError(_) => todo!(), - Error::Base64DecodeError(_) => todo!(), - Error::ZipError(_) => todo!(), - Error::IOError(_) => todo!(), - Error::TradingViewError(_) => todo!(), - } } } From 3e381151f14389ffe4cdf9c76d9dc26917fb7dee Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Sun, 7 Jan 2024 10:30:24 +0700 Subject: [PATCH 11/17] Fix clippy --- src/data_loader.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/data_loader.rs b/src/data_loader.rs index b2c9223..f9277e4 100644 --- a/src/data_loader.rs +++ b/src/data_loader.rs @@ -22,14 +22,9 @@ pub struct DataLoader<'a> { callbacks: Callbacks<'a>, } +#[derive(Default, Debug, Clone)] pub struct DataLoaderBuilder {} -impl DataLoaderBuilder { - pub fn new() -> Self { - Self {} - } -} - #[derive(Default, Clone)] pub struct Metadata { pub series_count: u16, @@ -107,7 +102,7 @@ impl<'a> DataLoader<'a> { studies: &HashMap, message: &Vec, ) -> Result<()> { - for (id, s) in series.into_iter() { + for (id, s) in series.iter() { trace!("received v: {:?}, m: {:?}", s, message); match message[1].get(id.as_str()) { Some(resp_data) => { @@ -135,9 +130,9 @@ impl<'a> DataLoader<'a> { async fn handle_study_data( &self, studies: &HashMap, - message: &Vec, + message: &[Value], ) -> Result<()> { - for (k, v) in studies.into_iter() { + for (k, v) in studies.iter() { if let Some(resp_data) = message[1].get(v.as_str()) { debug!("study data received: {} - {:?}", k, resp_data); let data = StudyResponseData::deserialize(resp_data)?; @@ -149,7 +144,7 @@ impl<'a> DataLoader<'a> { Ok(()) } - async fn handle_quote_data(&mut self, message: &Vec) { + async fn handle_quote_data(&mut self, message: &[Value]) { let qsd = QuoteData::deserialize(&message[1]).unwrap(); if qsd.status == "ok" { if let Some(prev_quote) = self.metadata.quotes.get_mut(&qsd.name) { From 9599364b4c0d3890bd379be6a4ef7b9fa6b4a8f7 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Sun, 7 Jan 2024 10:43:00 +0700 Subject: [PATCH 12/17] Update dependencies and remove async_trait from trait implementations --- Cargo.toml | 22 ++++++++++------------ src/chart/session.rs | 3 +-- src/quote/session.rs | 3 +-- src/socket.rs | 3 +-- 4 files changed, 13 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3cb6349..101ecfc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,29 +19,27 @@ user = ["dep:google-authenticator"] protobuf = ["dep:prost"] [dependencies] +async-trait = "0.1" # Remove this when Rust 1.75 async trait is stable tokio = { version = "1", default-features = false } serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } reqwest = { version = "0.11", features = ["rustls-tls", "json", "cookies"] } -lazy_static = "^1.4" -url = "^2" -urlencoding = "^2" +lazy_static = "1.4" +url = "2" +urlencoding = "2" rand = "^0.8" -regex = "^1" -tokio-tungstenite = { version = "^0.21", features = [ - "rustls-tls-webpki-roots", -] } +regex = "1" +tokio-tungstenite = { version = "0.21", features = ["rustls-tls-webpki-roots"] } tracing = "0.1" -thiserror = "^1.0" +thiserror = "1" futures-util = { version = "0.3", default-features = false, features = [ "sink", "std", ] } -async-trait = "^0.1" iso_currency = { version = "0.4", features = ["with-serde"] } -zip = "^0.6" -base64 = "^0.21" -google-authenticator = { version = "^0.4", optional = true } +zip = "0.6" +base64 = "0.21" +google-authenticator = { version = "0.4", optional = true } prost = { version = "0.12", optional = true } [dev-dependencies] diff --git a/src/chart/session.rs b/src/chart/session.rs index b20e642..ef85cca 100644 --- a/src/chart/session.rs +++ b/src/chart/session.rs @@ -7,7 +7,6 @@ use crate::{ utils::{gen_id, gen_session_id, symbol_init}, Result, }; -use async_trait::async_trait; use serde_json::Value; use std::fmt::Debug; @@ -447,7 +446,7 @@ impl<'a> WebSocket<'a> { } } -#[async_trait] +#[async_trait::async_trait] impl<'a> Socket for WebSocket<'a> { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); diff --git a/src/quote/session.rs b/src/quote/session.rs index c88ea5f..46258c8 100644 --- a/src/quote/session.rs +++ b/src/quote/session.rs @@ -6,7 +6,6 @@ use crate::{ utils::gen_session_id, Result, }; -use async_trait::async_trait; use serde_json::Value; #[derive(Clone)] @@ -81,7 +80,7 @@ impl<'a> WebSocket<'a> { } } -#[async_trait] +#[async_trait::async_trait] impl<'a> Socket for WebSocket<'a> { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()> { let event = TradingViewDataEvent::from(message.m.clone()); diff --git a/src/socket.rs b/src/socket.rs index 28a3c1b..1918348 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -5,7 +5,6 @@ use crate::{ utils::{format_packet, parse_packet}, Result, UA, }; -use async_trait::async_trait; use futures_util::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, @@ -288,7 +287,7 @@ impl SocketSession { } } -#[async_trait] +#[async_trait::async_trait] pub trait Socket { async fn event_loop(&mut self, session: &mut SocketSession) { let read = session.read.clone(); From eb4acffc1a13bee4cdec731aadc204e55b1308db Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Thu, 11 Jan 2024 11:17:42 +0700 Subject: [PATCH 13/17] Update user.rs, utils.rs, mod.rs, all.rs, and data_loader.rs --- examples/all.rs | 56 ++++++++++++++++----------------- examples/user.rs | 3 +- src/chart/models.rs | 77 ++++++++++++++++++++++++++++++++++----------- src/chart/utils.rs | 27 ++++++++++++++++ src/data_loader.rs | 25 +++++++++------ src/models/mod.rs | 42 ++++++++++++------------- tests/user_test.rs | 42 +++++++++++++++---------- 7 files changed, 178 insertions(+), 94 deletions(-) diff --git a/examples/all.rs b/examples/all.rs index 08547fc..1c377af 100644 --- a/examples/all.rs +++ b/examples/all.rs @@ -21,44 +21,44 @@ async fn main() -> anyhow::Result<()> { let publisher: DataLoader = DataLoader::default(); let mut chart = chart::session::WebSocket::new(publisher.clone(), socket.clone()); - let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); + // let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); // subscriber.subscribe(&mut chart, &mut socket); - let opts = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneMinute).bar_count(100); - let opts2 = ChartOptions::new("BINANCE:BTCUSDT", Interval::Daily).study_config( - "STD;Candlestick%1Pattern%1Bearish%1Abandoned%1Baby", - "33.0", - ScriptType::IntervalScript, - ); - let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour) - .replay_mode(true) - .replay_from(1698624060); + // let opts = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneMinute).bar_count(100); + // let opts2 = ChartOptions::new("BINANCE:BTCUSDT", Interval::Daily).study_config( + // "STD;Candlestick%1Pattern%1Bearish%1Abandoned%1Baby", + // "33.0", + // ScriptType::IntervalScript, + // ); + let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour).bar_count(1); + // .replay_mode(true) + // .replay_from(1698624060); chart - .set_market(opts) - .await? - .set_market(opts2) - .await? + // .set_market(opts) + // .await? + // .set_market(opts2) + // .await? .set_market(opts3) .await?; - quote - .create_session() - .await? - .set_fields() - .await? - .add_symbols(vec![ - "SP:SPX", - "BINANCE:BTCUSDT", - "BINANCE:ETHUSDT", - "BITSTAMP:ETHUSD", - "NASDAQ:TSLA", // "BINANCE:B", - ]) - .await?; + // quote + // .create_session() + // .await? + // .set_fields() + // .await? + // .add_symbols(vec![ + // "SP:SPX", + // "BINANCE:BTCUSDT", + // "BINANCE:ETHUSDT", + // "BITSTAMP:ETHUSD", + // "NASDAQ:TSLA", // "BINANCE:B", + // ]) + // .await?; tokio::spawn(async move { chart.clone().subscribe().await }); - tokio::spawn(async move { quote.clone().subscribe().await }); + // tokio::spawn(async move { quote.clone().subscribe().await }); loop {} } diff --git a/examples/user.rs b/examples/user.rs index d8391e2..bebe905 100644 --- a/examples/user.rs +++ b/examples/user.rs @@ -1,9 +1,10 @@ +#![cfg(feature = "user")] use tradingview::user::UserCookies; #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); - let user = UserCookies::new() + let user = UserCookies::default() .login("testuser", "testpassword", None) .await?; diff --git a/src/chart/models.rs b/src/chart/models.rs index 6f74b91..f28fcbc 100644 --- a/src/chart/models.rs +++ b/src/chart/models.rs @@ -1,7 +1,6 @@ -use serde::{ Serialize, Deserialize }; -use serde_json::Value; - use crate::models::Interval; +use serde::{Deserialize, Serialize}; +use serde_json::Value; pub enum ChartType { HeikinAshi, @@ -75,50 +74,92 @@ pub struct ChartSeriesData { pub data: Vec>, } -#[derive(Debug, Default, Clone, Serialize)] +#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] +#[cfg_attr(feature = "protobuf", derive(prost::Message))] +#[derive(Clone, Serialize)] pub struct SeriesCompletedMessage { - pub session: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "1"))] + #[serde(default)] pub id: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "2"))] + #[serde(default)] + pub session: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "3"))] + #[serde(default)] pub update_mode: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "4"))] + #[serde(default)] pub version: String, } -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(default, rename_all = "camelCase")] +#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] +#[cfg_attr(feature = "protobuf", derive(prost::Message))] +#[derive(Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct SymbolInfo { - #[serde(rename(deserialize = "pro_name"))] + #[cfg_attr(feature = "protobuf", prost(string, tag = "1"))] + #[serde(rename(deserialize = "pro_name"), default)] pub id: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "2"))] + #[serde(default)] pub name: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "3"))] + #[serde(default)] pub description: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "4"))] pub exchange: String, - #[serde(rename = "listed_exchange")] + #[cfg_attr(feature = "protobuf", prost(string, tag = "5"))] + #[serde(rename = "listed_exchange", default)] pub listed_exchange: String, - #[serde(rename = "provider_id")] + #[cfg_attr(feature = "protobuf", prost(string, tag = "6"))] + #[serde(rename = "provider_id", default)] pub provider_id: String, - #[serde(rename = "base_currency")] + #[cfg_attr(feature = "protobuf", prost(string, tag = "7"))] + #[serde(rename = "base_currency", default)] pub base_currency: String, - #[serde(rename = "base_currency_id")] + #[cfg_attr(feature = "protobuf", prost(string, tag = "8"))] + #[serde(rename = "base_currency_id", default)] pub base_currency_id: String, - #[serde(rename = "currency_id")] + #[cfg_attr(feature = "protobuf", prost(string, tag = "9"))] + #[serde(rename = "currency_id", default)] pub currency_id: String, - #[serde(rename = "currency_code")] + #[cfg_attr(feature = "protobuf", prost(string, tag = "10"))] + #[serde(rename = "currency_code", default)] pub currency_code: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "11"))] + #[serde(default)] pub session_holidays: String, + #[cfg_attr(feature = "protobuf", prost(message, repeated, tag = "12"))] + #[serde(default)] pub subsessions: Vec, + #[cfg_attr(feature = "protobuf", prost(string, tag = "13"))] + #[serde(default)] pub timezone: String, - #[serde(rename(deserialize = "type"))] + #[cfg_attr(feature = "protobuf", prost(string, tag = "14"))] + #[serde(rename(deserialize = "type"), default)] pub market_type: String, + #[cfg_attr(feature = "protobuf", prost(string, repeated, tag = "15"))] + #[serde(default)] pub typespecs: Vec, + #[cfg_attr(feature = "protobuf", prost(string, repeated, tag = "16"))] + #[serde(default)] pub aliases: Vec, } -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(default, rename_all = "camelCase")] +#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] +#[cfg_attr(feature = "protobuf", derive(prost::Message))] +#[derive(Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct Subsession { - pub description: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "1"))] pub id: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "2"))] + pub description: String, + #[cfg_attr(feature = "protobuf", prost(bool, tag = "3"))] pub private: bool, + #[cfg_attr(feature = "protobuf", prost(string, tag = "4"))] pub session: String, + #[cfg_attr(feature = "protobuf", prost(string, tag = "5"))] #[serde(rename(deserialize = "session-display"))] pub session_display: String, } diff --git a/src/chart/utils.rs b/src/chart/utils.rs index e69de29..c149f3c 100644 --- a/src/chart/utils.rs +++ b/src/chart/utils.rs @@ -0,0 +1,27 @@ +use serde::{Deserialize, Deserializer}; + +pub fn deserialize_string_default<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let s: String = Deserialize::deserialize(deserializer).unwrap_or_default(); + Ok(s) +} + +pub fn deserialize_vec_default<'de, D, T>(deserializer: D) -> Result, D::Error> +where + T: Deserialize<'de>, + D: Deserializer<'de>, +{ + let s: Vec = Deserialize::deserialize(deserializer).unwrap_or_default(); + Ok(s) +} + +pub fn deserialize_option_default<'de, D, T>(deserializer: D) -> Result, D::Error> +where + T: Deserialize<'de>, + D: Deserializer<'de>, +{ + let s: Option = Deserialize::deserialize(deserializer).unwrap_or_default(); + Ok(s) +} diff --git a/src/data_loader.rs b/src/data_loader.rs index f9277e4..b24f8bd 100644 --- a/src/data_loader.rs +++ b/src/data_loader.rs @@ -12,8 +12,8 @@ use crate::{ Result, }; use serde::Deserialize; -use serde_json::Value; -use std::collections::HashMap; +use serde_json::{error, Value}; +use std::{collections::HashMap, f32::consts::E}; use tracing::{debug, error, info, trace, warn}; #[derive(Clone, Default)] @@ -44,7 +44,6 @@ impl<'a> DataLoader<'a> { match event { TradingViewDataEvent::OnChartData | TradingViewDataEvent::OnChartDataUpdate => { trace!("received chart data: {:?}", message); - match self .handle_chart_data(&self.metadata.series, &self.metadata.studies, message) .await @@ -71,11 +70,15 @@ impl<'a> DataLoader<'a> { info!("series completed: {:#?}", message); } TradingViewDataEvent::OnSymbolResolved => { - let symbol_info = match SymbolInfo::deserialize(&message[2]) { - Ok(s) => s, - Err(_) => todo!(), + match SymbolInfo::deserialize(&message[2]) { + Ok(s) => debug!("{:?}", s), + Err(e) => { + error!("{:?}", e); + // return SymbolInfo::default(); + } }; - info!("{:?}", symbol_info) + // info!("symbol resolved: {:?}", &message[2]); + // debug!("{:?}", symbol_info) // let symbol_info = serde_json::from_value::(message[2].clone())?; } TradingViewDataEvent::OnReplayOk => { @@ -84,14 +87,16 @@ impl<'a> DataLoader<'a> { TradingViewDataEvent::OnReplayPoint => { info!("replay point: {:?}", message); } - TradingViewDataEvent::OnReplayInstanceId => todo!("7"), + TradingViewDataEvent::OnReplayInstanceId => { + info!("replay instance id: {:?}", message); + } TradingViewDataEvent::OnReplayResolutions => todo!("8"), TradingViewDataEvent::OnReplayDataEnd => todo!("9"), TradingViewDataEvent::OnStudyLoading => todo!("10"), TradingViewDataEvent::OnStudyCompleted => { info!("study completed: {:?}", message); } - TradingViewDataEvent::OnError(_) => todo!("12"), + TradingViewDataEvent::OnError(e) => error!("error: {:?}", e), TradingViewDataEvent::UnknownEvent(_) => todo!("13"), } } @@ -100,7 +105,7 @@ impl<'a> DataLoader<'a> { &self, series: &HashMap, studies: &HashMap, - message: &Vec, + message: &[Value], ) -> Result<()> { for (id, s) in series.iter() { trace!("received v: {:?}, m: {:?}", s, message); diff --git a/src/models/mod.rs b/src/models/mod.rs index 3a4bebe..8ba4068 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -342,29 +342,29 @@ impl std::fmt::Display for Timezone { } } -#[derive(Default, Clone, Copy, Debug, PartialEq, Serialize, Hash)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Serialize, Hash)] pub enum Interval { - OneSecond, - FiveSeconds, - TenSeconds, - FifteenSeconds, - ThirtySeconds, - OneMinute, - ThreeMinutes, - FiveMinutes, - FifteenMinutes, - ThirtyMinutes, - FortyFiveMinutes, - OneHour, - TwoHours, - FourHours, + OneSecond = 0, + FiveSeconds = 1, + TenSeconds = 2, + FifteenSeconds = 3, + ThirtySeconds = 4, + OneMinute = 5, + ThreeMinutes = 6, + FiveMinutes = 7, + FifteenMinutes = 8, + ThirtyMinutes = 9, + FortyFiveMinutes = 10, + OneHour = 11, + TwoHours = 12, + FourHours = 13, #[default] - Daily, - Weekly, - Monthly, - Quarterly, - SixMonths, - Yearly, + Daily = 14, + Weekly = 15, + Monthly = 16, + Quarterly = 17, + SixMonths = 18, + Yearly = 19, } impl std::fmt::Display for Interval { diff --git a/tests/user_test.rs b/tests/user_test.rs index a2798a8..99280bf 100644 --- a/tests/user_test.rs +++ b/tests/user_test.rs @@ -3,17 +3,16 @@ // use dotenv::dotenv; // use std::env; // use tradingview::error::*; -// use tradingview::user::*; // #[tokio::test] // #[ignore] // async fn test_login_empty_credentials() { -// let mut empty_user = User::default(); -// empty_user.auth_token = "unauthorized_user_token".to_string(); +// let mut empty_user = tradingview::models::UserCookies::default(); +// // empty_user.auth_token = "unauthorized_user_token".to_string(); -// let result = User::build().credentials("", "").get().await; -// assert!(result.is_ok()); -// assert_eq!(result.unwrap(), empty_user); +// // let result = User::build().credentials("", "").get().await; +// // assert!(result.is_ok()); +// // assert_eq!(result.unwrap(), empty_user); // } // #[tokio::test] @@ -49,9 +48,10 @@ // assert!(result.is_err()); // let error = result.unwrap_err(); -// assert!( -// matches!(error, tradingview::error::Error::LoginError(LoginError::InvalidCredentials)) -// ); +// assert!(matches!( +// error, +// tradingview::error::Error::LoginError(LoginError::InvalidCredentials) +// )); // } // #[tokio::test] @@ -65,7 +65,11 @@ // let password = env::var("TV_TOTP_PASSWORD").unwrap(); // let totp = env::var("TV_TOTP_SECRET").unwrap(); -// let result = User::build().credentials(&username, &password).totp_secret(&totp).get().await; +// let result = User::build() +// .credentials(&username, &password) +// .totp_secret(&totp) +// .get() +// .await; // assert!(result.is_ok()); // let user = result.unwrap(); @@ -109,12 +113,16 @@ // let result = User::build() // .session("invalid_session", "invalid_session_signature") -// .get().await; +// .get() +// .await; // assert!(result.is_err()); // let error = result.unwrap_err(); -// assert!(matches!(error, tradingview::error::Error::LoginError(LoginError::InvalidSession))); +// assert!(matches!( +// error, +// tradingview::error::Error::LoginError(LoginError::InvalidSession) +// )); // } // #[tokio::test] @@ -130,12 +138,14 @@ // let result = User::build() // .credentials(&username, &password) // .totp_secret("ZTIXV4KTRISK4KK7") -// .get().await; +// .get() +// .await; // assert!(result.is_err()); // let error = result.unwrap_err(); -// assert!( -// matches!(error, tradingview::error::Error::LoginError(LoginError::InvalidOTPSecret)) -// ); +// assert!(matches!( +// error, +// tradingview::error::Error::LoginError(LoginError::InvalidOTPSecret) +// )); // } // } From b05885ab63e30ba237a1e4de78a240aa92bc8f9f Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Thu, 11 Jan 2024 12:14:42 +0700 Subject: [PATCH 14/17] Improve data loader functionality --- src/callback.rs | 35 +++++++++++++++++++++++++++-------- src/chart/mod.rs | 6 +++--- src/chart/models.rs | 14 +++++--------- src/chart/session.rs | 6 +++++- src/data_loader.rs | 32 +++++++++++++++----------------- src/quote/session.rs | 8 ++++++-- src/socket.rs | 4 +--- 7 files changed, 62 insertions(+), 43 deletions(-) diff --git a/src/callback.rs b/src/callback.rs index d285d83..9d304e6 100644 --- a/src/callback.rs +++ b/src/callback.rs @@ -1,20 +1,26 @@ use crate::{ - chart::models::{SeriesCompletedMessage, StudyResponseData}, - quote::models::QuoteData, + chart::{ + models::{DataPoint, SeriesCompletedMessage, StudyResponseData}, + ChartOptions, StudyOptions, + }, + quote::models::QuoteValue, + Error, }; use futures_util::{future::BoxFuture, Future}; use serde_json::Value; use std::sync::Arc; +use tracing::error; pub type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sync + 'a>; #[derive(Clone)] pub struct Callbacks<'a> { - pub(crate) on_chart_data: Arc>>>, - pub(crate) on_quote_data: Arc>, + pub(crate) on_chart_data: Arc)>>, + pub(crate) on_quote_data: Arc>, pub(crate) on_series_completed: Arc>, - pub(crate) on_study_completed: Arc>, + pub(crate) on_study_completed: Arc>, pub(crate) on_unknown_event: Arc>, + pub(crate) on_error: Arc>, } impl Default for Callbacks<'_> { @@ -25,6 +31,11 @@ impl Default for Callbacks<'_> { on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))), on_study_completed: Arc::new(Box::new(|_| Box::pin(async {}))), on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))), + on_error: Arc::new(Box::new(|e| { + Box::pin( + async move { error!("default error callback logging && handling, error: {e}") }, + ) + })), } } } @@ -32,7 +43,7 @@ impl Default for Callbacks<'_> { impl<'a> Callbacks<'a> { pub fn on_chart_data( &mut self, - f: impl Fn(Vec>) -> Fut + Send + Sync + 'a, + f: impl Fn((ChartOptions, Vec)) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, @@ -43,7 +54,7 @@ impl<'a> Callbacks<'a> { pub fn on_quote_data( &mut self, - f: impl Fn(QuoteData) -> Fut + Send + Sync + 'a, + f: impl Fn(QuoteValue) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, @@ -65,7 +76,7 @@ impl<'a> Callbacks<'a> { pub fn on_study_completed( &mut self, - f: impl Fn(StudyResponseData) -> Fut + Send + Sync + 'a, + f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, @@ -84,4 +95,12 @@ impl<'a> Callbacks<'a> { self.on_unknown_event = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } + + pub fn on_error(&mut self, f: impl Fn(Error) -> Fut + Send + Sync + 'a) -> &mut Self + where + Fut: Future + Send + 'a, + { + self.on_error = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self + } } diff --git a/src/chart/mod.rs b/src/chart/mod.rs index c4ab39b..f9fff7c 100644 --- a/src/chart/mod.rs +++ b/src/chart/mod.rs @@ -1,11 +1,11 @@ use iso_currency::Currency; -use crate::models::{ MarketAdjustment, SessionType, pine_indicator::ScriptType, Interval }; +use crate::models::{pine_indicator::ScriptType, Interval, MarketAdjustment, SessionType}; -pub mod session; -pub mod study; pub mod models; pub(crate) mod options; +pub mod session; +pub mod study; pub(crate) mod utils; #[derive(Default, Debug, Clone)] diff --git a/src/chart/models.rs b/src/chart/models.rs index f28fcbc..61f8b00 100644 --- a/src/chart/models.rs +++ b/src/chart/models.rs @@ -1,4 +1,3 @@ -use crate::models::Interval; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -50,10 +49,14 @@ pub struct GraphicDataResponse { pub indexes: Value, } -#[derive(Debug, Clone, Deserialize)] +#[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] +#[cfg_attr(feature = "protobuf", derive(prost::Message))] +#[derive(Clone, Deserialize)] pub struct DataPoint { + #[cfg_attr(feature = "protobuf", prost(int64, tag = "1"))] #[serde(rename(deserialize = "i"))] pub index: i64, + #[cfg_attr(feature = "protobuf", prost(double, repeated, tag = "2"))] #[serde(rename(deserialize = "v"))] pub value: Vec, } @@ -67,13 +70,6 @@ pub struct ChartDataChanges { pub zoffset: i64, } -#[derive(Debug, Default, Clone, Serialize)] -pub struct ChartSeriesData { - pub symbol: String, - pub interval: Interval, - pub data: Vec>, -} - #[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] #[cfg_attr(feature = "protobuf", derive(prost::Message))] #[derive(Clone, Serialize)] diff --git a/src/chart/session.rs b/src/chart/session.rs index ef85cca..1ed776c 100644 --- a/src/chart/session.rs +++ b/src/chart/session.rs @@ -5,7 +5,7 @@ use crate::{ payload, socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent}, utils::{gen_id, gen_session_id, symbol_init}, - Result, + Error, Result, }; use serde_json::Value; use std::fmt::Debug; @@ -453,4 +453,8 @@ impl<'a> Socket for WebSocket<'a> { self.data_loader.handle_events(event, &message.p).await; Ok(()) } + + async fn handle_error(&mut self, error: Error) { + (self.data_loader.callbacks.on_error)(error).await; + } } diff --git a/src/data_loader.rs b/src/data_loader.rs index b24f8bd..4925c63 100644 --- a/src/data_loader.rs +++ b/src/data_loader.rs @@ -4,22 +4,23 @@ use crate::{ models::{ChartResponseData, StudyResponseData, SymbolInfo}, session::SeriesInfo, }, + error::TradingViewError, quote::{ models::{QuoteData, QuoteValue}, utils::merge_quotes, }, socket::TradingViewDataEvent, - Result, + Error, Result, }; use serde::Deserialize; -use serde_json::{error, Value}; -use std::{collections::HashMap, f32::consts::E}; +use serde_json::Value; +use std::collections::HashMap; use tracing::{debug, error, info, trace, warn}; #[derive(Clone, Default)] pub struct DataLoader<'a> { pub(crate) metadata: Metadata, - callbacks: Callbacks<'a>, + pub(crate) callbacks: Callbacks<'a>, } #[derive(Default, Debug, Clone)] @@ -49,7 +50,9 @@ impl<'a> DataLoader<'a> { .await { Ok(_) => (), - Err(e) => error!("{}", e), + Err(e) => { + (self.callbacks.on_error)(e).await; + } }; } TradingViewDataEvent::OnQuoteData => self.handle_quote_data(message).await, @@ -111,24 +114,16 @@ impl<'a> DataLoader<'a> { trace!("received v: {:?}, m: {:?}", s, message); match message[1].get(id.as_str()) { Some(resp_data) => { - let data: Vec> = ChartResponseData::deserialize(resp_data)? - .series - .into_iter() - .map(|point| point.value) - .collect(); - // timestamp, open, high, low, close, volume + let data = ChartResponseData::deserialize(resp_data)?.series; debug!("series data extracted: {:?}", data); - // TODO: Notify function - (self.callbacks.on_chart_data)(data).await; + (self.callbacks.on_chart_data)((s.options.clone(), data)).await; } None => { debug!("receive empty data on series: {:?}", s); } } } - self.handle_study_data(studies, message).await?; - Ok(()) } @@ -160,11 +155,14 @@ impl<'a> DataLoader<'a> { for q in self.metadata.quotes.values() { debug!("quote data: {:?}", q); - // TODO: Notify function for quote data + (self.callbacks.on_quote_data)(q.to_owned()).await; } } else { error!("quote data status error: {:?}", qsd); - // TODO: Notify function for quote data error + (self.callbacks.on_error)(Error::TradingViewError( + TradingViewError::QuoteDataStatusError, + )) + .await; } } } diff --git a/src/quote/session.rs b/src/quote/session.rs index 46258c8..df6c4da 100644 --- a/src/quote/session.rs +++ b/src/quote/session.rs @@ -4,13 +4,13 @@ use crate::{ quote::ALL_QUOTE_FIELDS, socket::{Socket, SocketMessageDe, SocketSession, TradingViewDataEvent}, utils::gen_session_id, - Result, + Error, Result, }; use serde_json::Value; #[derive(Clone)] pub struct WebSocket<'a> { - data_loader: DataLoader<'a>, + pub(crate) data_loader: DataLoader<'a>, socket: SocketSession, } @@ -87,4 +87,8 @@ impl<'a> Socket for WebSocket<'a> { self.data_loader.handle_events(event, &message.p).await; Ok(()) } + + async fn handle_error(&mut self, error: Error) { + (self.data_loader.callbacks.on_error)(error).await; + } } diff --git a/src/socket.rs b/src/socket.rs index 1918348..3567594 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -369,7 +369,5 @@ pub trait Socket { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()>; - async fn handle_error(&mut self, error: Error) { - error!("{}", error); - } + async fn handle_error(&mut self, error: Error); } From 17ea3d6fdf5857b5440be65452d10311e8388893 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Thu, 11 Jan 2024 12:34:33 +0700 Subject: [PATCH 15/17] Update chart study callbacks --- examples/all.rs | 20 +++++++++++--------- src/callback.rs | 8 ++++---- src/chart/mod.rs | 2 +- src/chart/models.rs | 6 +++--- src/chart/session.rs | 2 +- src/data_loader.rs | 31 +++++++++++++++++-------------- src/quote/session.rs | 2 +- src/socket.rs | 2 +- 8 files changed, 39 insertions(+), 34 deletions(-) diff --git a/examples/all.rs b/examples/all.rs index 1c377af..dd7bd2b 100644 --- a/examples/all.rs +++ b/examples/all.rs @@ -26,22 +26,24 @@ async fn main() -> anyhow::Result<()> { // subscriber.subscribe(&mut chart, &mut socket); // let opts = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneMinute).bar_count(100); - // let opts2 = ChartOptions::new("BINANCE:BTCUSDT", Interval::Daily).study_config( - // "STD;Candlestick%1Pattern%1Bearish%1Abandoned%1Baby", - // "33.0", - // ScriptType::IntervalScript, - // ); - let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour).bar_count(1); + let opts2 = ChartOptions::new("BINANCE:BTCUSDT", Interval::Daily) + .bar_count(1) + .study_config( + "STD;Candlestick%1Pattern%1Bearish%1Abandoned%1Baby", + "33.0", + ScriptType::IntervalScript, + ); + // let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour); // .replay_mode(true) // .replay_from(1698624060); chart // .set_market(opts) // .await? - // .set_market(opts2) - // .await? - .set_market(opts3) + .set_market(opts2) .await?; + // .set_market(opts3) + // .await?; // quote // .create_session() diff --git a/src/callback.rs b/src/callback.rs index 9d304e6..cc69d80 100644 --- a/src/callback.rs +++ b/src/callback.rs @@ -18,7 +18,7 @@ pub struct Callbacks<'a> { pub(crate) on_chart_data: Arc)>>, pub(crate) on_quote_data: Arc>, pub(crate) on_series_completed: Arc>, - pub(crate) on_study_completed: Arc>, + pub(crate) on_study_data: Arc>, pub(crate) on_unknown_event: Arc>, pub(crate) on_error: Arc>, } @@ -29,7 +29,7 @@ impl Default for Callbacks<'_> { on_chart_data: Arc::new(Box::new(|_| Box::pin(async {}))), on_quote_data: Arc::new(Box::new(|_| Box::pin(async {}))), on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))), - on_study_completed: Arc::new(Box::new(|_| Box::pin(async {}))), + on_study_data: Arc::new(Box::new(|_| Box::pin(async {}))), on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))), on_error: Arc::new(Box::new(|e| { Box::pin( @@ -74,14 +74,14 @@ impl<'a> Callbacks<'a> { self } - pub fn on_study_completed( + pub fn on_study_data( &mut self, f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, { - self.on_study_completed = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self.on_study_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } diff --git a/src/chart/mod.rs b/src/chart/mod.rs index f9fff7c..9597a32 100644 --- a/src/chart/mod.rs +++ b/src/chart/mod.rs @@ -24,7 +24,7 @@ pub struct ChartOptions { adjustment: Option, currency: Option, session_type: Option, - study_config: Option, + pub study_config: Option, } #[derive(Default, Debug, Clone)] diff --git a/src/chart/models.rs b/src/chart/models.rs index 61f8b00..9c8ab3e 100644 --- a/src/chart/models.rs +++ b/src/chart/models.rs @@ -72,7 +72,7 @@ pub struct ChartDataChanges { #[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] #[cfg_attr(feature = "protobuf", derive(prost::Message))] -#[derive(Clone, Serialize)] +#[derive(Clone, PartialEq, Serialize, Deserialize, Hash)] pub struct SeriesCompletedMessage { #[cfg_attr(feature = "protobuf", prost(string, tag = "1"))] #[serde(default)] @@ -90,7 +90,7 @@ pub struct SeriesCompletedMessage { #[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] #[cfg_attr(feature = "protobuf", derive(prost::Message))] -#[derive(Clone, PartialEq, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Serialize, Deserialize, Hash)] #[serde(rename_all = "camelCase")] pub struct SymbolInfo { #[cfg_attr(feature = "protobuf", prost(string, tag = "1"))] @@ -144,7 +144,7 @@ pub struct SymbolInfo { #[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] #[cfg_attr(feature = "protobuf", derive(prost::Message))] -#[derive(Clone, PartialEq, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Serialize, Deserialize, Hash)] #[serde(rename_all = "camelCase")] pub struct Subsession { #[cfg_attr(feature = "protobuf", prost(string, tag = "1"))] diff --git a/src/chart/session.rs b/src/chart/session.rs index 1ed776c..d8b46fc 100644 --- a/src/chart/session.rs +++ b/src/chart/session.rs @@ -454,7 +454,7 @@ impl<'a> Socket for WebSocket<'a> { Ok(()) } - async fn handle_error(&mut self, error: Error) { + async fn handle_error(&self, error: Error) { (self.data_loader.callbacks.on_error)(error).await; } } diff --git a/src/data_loader.rs b/src/data_loader.rs index 4925c63..5a047f8 100644 --- a/src/data_loader.rs +++ b/src/data_loader.rs @@ -1,8 +1,9 @@ use crate::{ callback::Callbacks, chart::{ - models::{ChartResponseData, StudyResponseData, SymbolInfo}, + models::{ChartResponseData, SeriesCompletedMessage, StudyResponseData, SymbolInfo}, session::SeriesInfo, + StudyOptions, }, error::TradingViewError, quote::{ @@ -15,7 +16,7 @@ use crate::{ use serde::Deserialize; use serde_json::Value; use std::collections::HashMap; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, trace}; #[derive(Clone, Default)] pub struct DataLoader<'a> { @@ -63,14 +64,13 @@ impl<'a> DataLoader<'a> { trace!("series is loading: {:#?}", message); } TradingViewDataEvent::OnSeriesCompleted => { - // let message = SeriesCompletedMessage { - // session: get_string_value(&message, 0), - // id: get_string_value(&message, 1), - // update_mode: get_string_value(&message, 2), - // version: get_string_value(&message, 3), - // }; - - info!("series completed: {:#?}", message); + match SeriesCompletedMessage::deserialize(&message[1]) { + Ok(s) => info!("series completed: {:#?}", s), + Err(e) => { + error!("{:?}", e); + // return SymbolInfo::default(); + } + }; } TradingViewDataEvent::OnSymbolResolved => { match SymbolInfo::deserialize(&message[2]) { @@ -122,13 +122,18 @@ impl<'a> DataLoader<'a> { debug!("receive empty data on series: {:?}", s); } } + + if let Some(study_options) = &s.options.study_config { + self.handle_study_data(study_options, studies, message) + .await?; + } } - self.handle_study_data(studies, message).await?; Ok(()) } async fn handle_study_data( &self, + options: &StudyOptions, studies: &HashMap, message: &[Value], ) -> Result<()> { @@ -136,9 +141,7 @@ impl<'a> DataLoader<'a> { if let Some(resp_data) = message[1].get(v.as_str()) { debug!("study data received: {} - {:?}", k, resp_data); let data = StudyResponseData::deserialize(resp_data)?; - warn!("study data extracted: {} - {:?}", k, data); - - // TODO: Notify function + (self.callbacks.on_study_data)((options.clone(), data)).await; } } Ok(()) diff --git a/src/quote/session.rs b/src/quote/session.rs index df6c4da..6685d03 100644 --- a/src/quote/session.rs +++ b/src/quote/session.rs @@ -88,7 +88,7 @@ impl<'a> Socket for WebSocket<'a> { Ok(()) } - async fn handle_error(&mut self, error: Error) { + async fn handle_error(&self, error: Error) { (self.data_loader.callbacks.on_error)(error).await; } } diff --git a/src/socket.rs b/src/socket.rs index 3567594..85c842f 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -369,5 +369,5 @@ pub trait Socket { async fn handle_message_data(&mut self, message: SocketMessageDe) -> Result<()>; - async fn handle_error(&mut self, error: Error); + async fn handle_error(&self, error: Error); } From 36f28f4e90f87e4c00b21696d5c1e87cf2c585b4 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Thu, 11 Jan 2024 13:21:30 +0700 Subject: [PATCH 16/17] update dataloader chart and quote --- examples/all.rs | 48 +++++++++++++++++------------------- src/callback.rs | 60 ++++++++++++++++++++++++++------------------- src/chart/models.rs | 8 +----- src/chart/utils.rs | 26 -------------------- src/data_loader.rs | 51 +++++++++----------------------------- 5 files changed, 71 insertions(+), 122 deletions(-) diff --git a/examples/all.rs b/examples/all.rs index dd7bd2b..0040580 100644 --- a/examples/all.rs +++ b/examples/all.rs @@ -21,11 +21,9 @@ async fn main() -> anyhow::Result<()> { let publisher: DataLoader = DataLoader::default(); let mut chart = chart::session::WebSocket::new(publisher.clone(), socket.clone()); - // let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); + let mut quote = quote::session::WebSocket::new(publisher.clone(), socket.clone()); - // subscriber.subscribe(&mut chart, &mut socket); - - // let opts = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneMinute).bar_count(100); + let opts = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneMinute).bar_count(100); let opts2 = ChartOptions::new("BINANCE:BTCUSDT", Interval::Daily) .bar_count(1) .study_config( @@ -33,34 +31,34 @@ async fn main() -> anyhow::Result<()> { "33.0", ScriptType::IntervalScript, ); - // let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour); - // .replay_mode(true) - // .replay_from(1698624060); + let opts3 = ChartOptions::new("BINANCE:BTCUSDT", Interval::OneHour) + .replay_mode(true) + .replay_from(1698624060); chart - // .set_market(opts) - // .await? + .set_market(opts) + .await? .set_market(opts2) + .await? + .set_market(opts3) .await?; - // .set_market(opts3) - // .await?; - // quote - // .create_session() - // .await? - // .set_fields() - // .await? - // .add_symbols(vec![ - // "SP:SPX", - // "BINANCE:BTCUSDT", - // "BINANCE:ETHUSDT", - // "BITSTAMP:ETHUSD", - // "NASDAQ:TSLA", // "BINANCE:B", - // ]) - // .await?; + quote + .create_session() + .await? + .set_fields() + .await? + .add_symbols(vec![ + "SP:SPX", + "BINANCE:BTCUSDT", + "BINANCE:ETHUSDT", + "BITSTAMP:ETHUSD", + "NASDAQ:TSLA", // "BINANCE:B", + ]) + .await?; tokio::spawn(async move { chart.clone().subscribe().await }); - // tokio::spawn(async move { quote.clone().subscribe().await }); + tokio::spawn(async move { quote.clone().subscribe().await }); loop {} } diff --git a/src/callback.rs b/src/callback.rs index cc69d80..0b37523 100644 --- a/src/callback.rs +++ b/src/callback.rs @@ -1,15 +1,16 @@ use crate::{ chart::{ - models::{DataPoint, SeriesCompletedMessage, StudyResponseData}, + models::{DataPoint, StudyResponseData, SymbolInfo}, ChartOptions, StudyOptions, }, quote::models::QuoteValue, + socket::TradingViewDataEvent, Error, }; use futures_util::{future::BoxFuture, Future}; use serde_json::Value; use std::sync::Arc; -use tracing::error; +use tracing::{error, info}; pub type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sync + 'a>; @@ -17,24 +18,33 @@ pub type AsyncCallback<'a, T> = Box BoxFuture<'a, ()>) + Send + Sy pub struct Callbacks<'a> { pub(crate) on_chart_data: Arc)>>, pub(crate) on_quote_data: Arc>, - pub(crate) on_series_completed: Arc>, pub(crate) on_study_data: Arc>, - pub(crate) on_unknown_event: Arc>, pub(crate) on_error: Arc>, + pub(crate) on_symbol_info: Arc>, + pub(crate) on_other_event: Arc)>>, } impl Default for Callbacks<'_> { fn default() -> Self { Self { - on_chart_data: Arc::new(Box::new(|_| Box::pin(async {}))), - on_quote_data: Arc::new(Box::new(|_| Box::pin(async {}))), - on_series_completed: Arc::new(Box::new(|_| Box::pin(async {}))), - on_study_data: Arc::new(Box::new(|_| Box::pin(async {}))), - on_unknown_event: Arc::new(Box::new(|_| Box::pin(async {}))), + on_chart_data: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) + })), + on_quote_data: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) + })), + on_study_data: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) + })), + on_error: Arc::new(Box::new(|e| { - Box::pin( - async move { error!("default error callback logging && handling, error: {e}") }, - ) + Box::pin(async move { error!("default callback logging && handling: {e}") }) + })), + on_symbol_info: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) + })), + on_other_event: Arc::new(Box::new(|data| { + Box::pin(async move { info!("default callback logging && handling: {:?}", data) }) })), } } @@ -63,44 +73,44 @@ impl<'a> Callbacks<'a> { self } - pub fn on_series_completed( + pub fn on_study_data( &mut self, - f: impl Fn(SeriesCompletedMessage) -> Fut + Send + Sync + 'a, + f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, { - self.on_series_completed = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self.on_study_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } - pub fn on_study_data( - &mut self, - f: impl Fn((StudyOptions, StudyResponseData)) -> Fut + Send + Sync + 'a, - ) -> &mut Self + pub fn on_error(&mut self, f: impl Fn(Error) -> Fut + Send + Sync + 'a) -> &mut Self where Fut: Future + Send + 'a, { - self.on_study_data = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self.on_error = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } - pub fn on_unknown_event( + pub fn on_symbol_info( &mut self, - f: impl Fn(Value) -> Fut + Send + Sync + 'a, + f: impl Fn(SymbolInfo) -> Fut + Send + Sync + 'a, ) -> &mut Self where Fut: Future + Send + 'a, { - self.on_unknown_event = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self.on_symbol_info = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } - pub fn on_error(&mut self, f: impl Fn(Error) -> Fut + Send + Sync + 'a) -> &mut Self + pub fn on_other_event( + &mut self, + f: impl Fn((TradingViewDataEvent, Vec)) -> Fut + Send + Sync + 'a, + ) -> &mut Self where Fut: Future + Send + 'a, { - self.on_error = Arc::new(Box::new(move |data| Box::pin(f(data)))); + self.on_other_event = Arc::new(Box::new(move |data| Box::pin(f(data)))); self } } diff --git a/src/chart/models.rs b/src/chart/models.rs index 9c8ab3e..299660e 100644 --- a/src/chart/models.rs +++ b/src/chart/models.rs @@ -72,20 +72,14 @@ pub struct ChartDataChanges { #[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] #[cfg_attr(feature = "protobuf", derive(prost::Message))] -#[derive(Clone, PartialEq, Serialize, Deserialize, Hash)] +#[derive(Clone, PartialEq, Serialize, Hash)] pub struct SeriesCompletedMessage { #[cfg_attr(feature = "protobuf", prost(string, tag = "1"))] #[serde(default)] pub id: String, #[cfg_attr(feature = "protobuf", prost(string, tag = "2"))] #[serde(default)] - pub session: String, - #[cfg_attr(feature = "protobuf", prost(string, tag = "3"))] - #[serde(default)] pub update_mode: String, - #[cfg_attr(feature = "protobuf", prost(string, tag = "4"))] - #[serde(default)] - pub version: String, } #[cfg_attr(not(feature = "protobuf"), derive(Debug, Default))] diff --git a/src/chart/utils.rs b/src/chart/utils.rs index c149f3c..8b13789 100644 --- a/src/chart/utils.rs +++ b/src/chart/utils.rs @@ -1,27 +1 @@ -use serde::{Deserialize, Deserializer}; -pub fn deserialize_string_default<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let s: String = Deserialize::deserialize(deserializer).unwrap_or_default(); - Ok(s) -} - -pub fn deserialize_vec_default<'de, D, T>(deserializer: D) -> Result, D::Error> -where - T: Deserialize<'de>, - D: Deserializer<'de>, -{ - let s: Vec = Deserialize::deserialize(deserializer).unwrap_or_default(); - Ok(s) -} - -pub fn deserialize_option_default<'de, D, T>(deserializer: D) -> Result, D::Error> -where - T: Deserialize<'de>, - D: Deserializer<'de>, -{ - let s: Option = Deserialize::deserialize(deserializer).unwrap_or_default(); - Ok(s) -} diff --git a/src/data_loader.rs b/src/data_loader.rs index 5a047f8..95bafd7 100644 --- a/src/data_loader.rs +++ b/src/data_loader.rs @@ -1,7 +1,7 @@ use crate::{ callback::Callbacks, chart::{ - models::{ChartResponseData, SeriesCompletedMessage, StudyResponseData, SymbolInfo}, + models::{ChartResponseData, StudyResponseData, SymbolInfo}, session::SeriesInfo, StudyOptions, }, @@ -16,7 +16,7 @@ use crate::{ use serde::Deserialize; use serde_json::Value; use std::collections::HashMap; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, trace}; #[derive(Clone, Default)] pub struct DataLoader<'a> { @@ -52,55 +52,28 @@ impl<'a> DataLoader<'a> { { Ok(_) => (), Err(e) => { + error!("chart data parsing error: {:?}", e); (self.callbacks.on_error)(e).await; } }; } TradingViewDataEvent::OnQuoteData => self.handle_quote_data(message).await, - TradingViewDataEvent::OnQuoteCompleted => { - info!("quote completed: {:?}", message) - } - TradingViewDataEvent::OnSeriesLoading => { - trace!("series is loading: {:#?}", message); - } - TradingViewDataEvent::OnSeriesCompleted => { - match SeriesCompletedMessage::deserialize(&message[1]) { - Ok(s) => info!("series completed: {:#?}", s), - Err(e) => { - error!("{:?}", e); - // return SymbolInfo::default(); - } - }; - } TradingViewDataEvent::OnSymbolResolved => { match SymbolInfo::deserialize(&message[2]) { - Ok(s) => debug!("{:?}", s), + Ok(s) => { + debug!("receive symbol info: {:?}", s); + (self.callbacks.on_symbol_info)(s).await; + } Err(e) => { - error!("{:?}", e); - // return SymbolInfo::default(); + error!("symbol resolved parsing error: {:?}", e); + (self.callbacks.on_error)(Error::JsonParseError(e)).await; } }; - // info!("symbol resolved: {:?}", &message[2]); - // debug!("{:?}", symbol_info) - // let symbol_info = serde_json::from_value::(message[2].clone())?; - } - TradingViewDataEvent::OnReplayOk => { - info!("replay ok: {:?}", message); - } - TradingViewDataEvent::OnReplayPoint => { - info!("replay point: {:?}", message); - } - TradingViewDataEvent::OnReplayInstanceId => { - info!("replay instance id: {:?}", message); } - TradingViewDataEvent::OnReplayResolutions => todo!("8"), - TradingViewDataEvent::OnReplayDataEnd => todo!("9"), - TradingViewDataEvent::OnStudyLoading => todo!("10"), - TradingViewDataEvent::OnStudyCompleted => { - info!("study completed: {:?}", message); + _ => { + debug!("event: {:?}, message: {:?}", event, message); + (self.callbacks.on_other_event)((event, message.to_owned())).await; } - TradingViewDataEvent::OnError(e) => error!("error: {:?}", e), - TradingViewDataEvent::UnknownEvent(_) => todo!("13"), } } From e520c3a5d553dbba2ff2b3a1698eddba913e47a0 Mon Sep 17 00:00:00 2001 From: Dat Nguyen Date: Thu, 11 Jan 2024 13:28:05 +0700 Subject: [PATCH 17/17] Refactor UserCookies::new() method to initialize all fields --- .github/workflows/publish.yml | 24 ++++++++++++++++++++++++ src/user.rs | 11 ++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/publish.yml diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..bcbb65c --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,24 @@ +name: Publish to crates.io + +on: + push: + tags: + - "*" + +jobs: + publish: + name: Publish to crates.io + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + - uses: actions-rs/cargo@v1 + with: + command: login + args: ${{ secrets.CRATES_IO_TOKEN }} + - uses: actions-rs/cargo@v1 + with: + command: publish \ No newline at end of file diff --git a/src/user.rs b/src/user.rs index 5deb70f..0f37e6b 100644 --- a/src/user.rs +++ b/src/user.rs @@ -11,7 +11,16 @@ use tracing::{debug, error, info, warn}; impl UserCookies { pub fn new() -> Self { - UserCookies::default() + UserCookies { + session: String::new(), + session_signature: String::new(), + device_token: String::new(), + session_hash: String::new(), + private_channel: String::new(), + auth_token: String::new(), + id: 0, + username: String::new(), + } } pub async fn login(