From b62d9df612cd50998d4239d58c5b588286b14417 Mon Sep 17 00:00:00 2001
From: matthew-heartful
Date: Wed, 17 Jul 2024 22:31:17 -0700
Subject: [PATCH] new logic to process only the frame with the highest
 difference versus the previous frame; processing starts as soon as the
 previous OCR task is complete

---
 screenpipe-vision/src/core.rs | 335 +++++++++++++++++++++-------------
 1 file changed, 207 insertions(+), 128 deletions(-)

diff --git a/screenpipe-vision/src/core.rs b/screenpipe-vision/src/core.rs
index 9c5691bb4..894af9d56 100644
--- a/screenpipe-vision/src/core.rs
+++ b/screenpipe-vision/src/core.rs
@@ -1,12 +1,11 @@
 use image::DynamicImage;
 use log::{debug, error, info};
-use rusty_tesseract::{Args, Image};
+use rusty_tesseract::{Args, Image, DataOutput};
 use tokio::sync::broadcast;
 use tokio::sync::broadcast::error::RecvError;
 use xcap::Monitor;
 
 use std::collections::HashMap;
-use std::convert::TryInto;
 use std::hash::{DefaultHasher, Hash, Hasher};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -14,6 +13,7 @@ use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::sync::Mutex;
 use tokio::task;
 use serde_json;
+use image_compare::{Algorithm, Metric, Similarity}; // Added import for Similarity
 
 pub enum ControlMessage {
     Pause,
     Stop,
     Continue,
 }
 
 pub struct CaptureResult {
     pub text_json: Vec<String>,
     pub frame_number: u64,
     pub timestamp: Instant,
-    pub tsv_output: String,
+    pub data_output: DataOutput, // Corrected this line
 }
 
-const MAX_THREADS: usize = 4; // Adjust based on your needs
-const MAX_QUEUE_SIZE: usize = 6; // Maximum number of frames to keep in the queue. 64/8 o
+const MAX_THREADS: usize = 1; // Adjust based on your needs
+// const MAX_QUEUE_SIZE: usize = 1; // Maximum number of frames to keep in the queue. 64/8 o
 // seems kinda counter intuitive but less threads for OCR = more CPU usage = less frame dropping
 
 pub async fn continuous_capture(
@@ -51,51 +51,76 @@ pub async fn continuous_capture(
     let cache = Arc::new(Mutex::new(HashMap::<u64, String>::new()));
 
     let (ocr_tx, _) =
-        broadcast::channel::<(Arc<DynamicImage>, u64, u64, Instant, Sender<CaptureResult>)>(64);
+        broadcast::channel::<(Arc<DynamicImage>, u64, u64, Instant, Sender<CaptureResult>)>(2);
     let ocr_tx = Arc::new(ocr_tx);
 
+    let previous_text_json = Arc::new(Mutex::new(None));
+
+    let is_active = Arc::new(Mutex::new(false));
+
     // Spawn OCR tasks
     let ocr_handles: Vec<_> = (0..pool_size)
         .map(|id| {
             let mut ocr_rx = ocr_tx.subscribe();
             let cache = Arc::clone(&cache);
             let should_stop = Arc::clone(&should_stop);
+            let previous_text_json = Arc::clone(&previous_text_json);
+            let is_active = Arc::clone(&is_active);
             task::spawn(async move {
-                // info!("OCR task {} started", id);
+                debug!("OCR task {} started", id);
                 while !*should_stop.lock().await {
                     match ocr_rx.recv().await {
                         Ok((image_arc, image_hash, frame_number, timestamp, result_tx)) => {
+                            debug!("OCR task {} received frame {}", id, frame_number);
                             // Only process if the frame number modulo pool_size equals this task's id
-                            if frame_number % pool_size as u64 == id as u64 {
-                                let start_time = Instant::now();
-                                let mut cache = cache.lock().await;
-                                let (text, tsv_output, json_output) = if let Some(cached_text) = cache.get(&image_hash) {
-                                    (cached_text.clone(), String::new(), String::new())
-                                } else {
-                                    let (new_text, new_tsv_output, new_json_output) = perform_ocr(&image_arc);
-                                    cache.insert(image_hash, new_text.clone());
-                                    (new_text, new_tsv_output, new_json_output)
-                                };
-
-                                if let Err(e) = result_tx
-                                    .send(CaptureResult {
-                                        image: image_arc.into(),
-                                        text: text.clone(),
-                                        text_json: serde_json::from_str(&json_output).unwrap(),
-                                        frame_number,
-                                        timestamp,
-                                        tsv_output,
-                                    })
-                                    .await
-                                {
-                                    error!("Failed to send OCR result: {}", e);
-                                }
-                                let _duration = start_time.elapsed();
-                                // debug!(
-                                //     "OCR task {} processed frame {} in {:?}",
-                                //     id, frame_number, duration
-                                // );
+                            // if frame_number % pool_size as u64 == id as u64 {
+                            let start_time = Instant::now();
+                            let mut cache = cache.lock().await;
+                            let (text, data_output, json_output) = if let Some(cached_text) = cache.get(&image_hash) {
+                                (cached_text.clone(), DataOutput { output: String::new(), data: vec![] }, String::new())
+                            } else {
+                                // Set is_active to true before performing OCR
+                                *is_active.lock().await = true;
+
+                                let (new_text, data_output, new_json_output) = perform_ocr(&image_arc);
+                                cache.insert(image_hash, new_text.clone());
+
+                                // Set is_active to false after performing OCR
+                                *is_active.lock().await = false;
+
+                                (new_text, data_output, new_json_output)
+                            };
+
+                            let current_text_json: Vec<String> = serde_json::from_str(&json_output).unwrap();
+                            let mut previous_text_json = previous_text_json.lock().await;
+
+                            // Debug logging for current and previous text_json
+                            let current_text_json_len: usize = current_text_json.iter().map(|s| s.len()).sum();
+                            let previous_text_json_len: usize = previous_text_json.as_ref().map_or(0, |v: &Vec<String>| v.iter().map(|s| s.len()).sum());
+
+                            debug!("JSON length Current: {} Previous: {}", current_text_json_len, previous_text_json_len);
+
+                            *previous_text_json = Some(current_text_json.clone());
+
+                            if let Err(e) = result_tx
+                                .send(CaptureResult {
+                                    image: image_arc.into(),
+                                    text: text.clone(),
+                                    text_json: current_text_json,
+                                    frame_number,
+                                    timestamp,
+                                    data_output,
+                                })
+                                .await
+                            {
+                                error!("Failed to send OCR result: {}", e);
                             }
+                            let _duration = start_time.elapsed();
+                            debug!(
+                                "OCR task {} processed frame {} in {:?}",
+                                id, frame_number, _duration
+                            );
+                            // }
                         }
                         Err(e) => match e {
                             RecvError::Lagged(_) => {
@@ -108,7 +133,7 @@ pub async fn continuous_capture(
                         },
                     }
                 }
-                // info!("OCR task {} stopped", id);
+                debug!("OCR task {} stopped", id);
             })
         })
         .collect();
@@ -117,6 +142,25 @@
     let start_time = Instant::now();
     let mut last_processed_frame = 0;
 
+    let mut previous_image: Option<Arc<DynamicImage>> = None;
+
+    // Function to calculate the 50:50 weighted average of Histogram and SSIM
+    fn calculate_weighted_average(histogram: f64, ssim: f64) -> f64 {
+        (histogram + ssim) / 2.0
+    }
+
+    // Struct to hold the max average frame data
+    struct MaxAverageFrame {
+        image: Arc<DynamicImage>,
+        image_hash: u64,
+        frame_number: u64,
+        timestamp: Instant,
+        result_tx: Sender<CaptureResult>,
+        average: f64,
+    }
+
+    let mut max_average: Option<MaxAverageFrame> = None;
+
     while !*should_stop.lock().await {
         // Check for control messages
         if let Ok(message) = control_rx.try_recv() {
@@ -139,51 +183,102 @@
         let capture_start = Instant::now();
         let buffer = monitor.capture_image().unwrap();
         let image = DynamicImage::ImageRgba8(buffer);
+        let image_hash = calculate_hash(&image); // Generate hash for the image
         let capture_duration = capture_start.elapsed();
 
-        // Generate hash for the image
-        let image_hash = calculate_hash(&image);
+        // Initialize current_average
+        let mut current_average = 0.0;
+
+        // Compare with previous image and print the differences
+        if let Some(prev_image) = &previous_image {
+            let histogram_diff = compare_images_histogram(prev_image, &image);
+            let ssim_diff = 1.0 - compare_images_ssim(prev_image, &image);
+            current_average = (histogram_diff + ssim_diff) / 2.0;
+            debug!("Frame {}: Histogram diff: {:.3}, SSIM diff: {:.3}, Average: {:.3}", frame_counter,
+                histogram_diff, ssim_diff, current_average);
+
+            if let Some(max_avg_frame) = &max_average {
+                if current_average < max_avg_frame.average {
+                    debug!("Dropping frame {} due to lower average", frame_counter);
+                    frame_counter += 1;
+                    tokio::time::sleep(interval).await;
+                    continue;
+                } else {
+                    debug!("Storing frame {} as max_average {}", frame_counter, current_average);
+                    max_average = Some(MaxAverageFrame {
+                        image: Arc::new(image.clone()),
+                        image_hash,
+                        frame_number: frame_counter,
+                        timestamp: capture_start,
+                        result_tx: result_tx.clone(),
+                        average: current_average,
+                    });
+                }
+            }
+        } else {
+            debug!("No previous image to compare for frame {}", frame_counter);
+        }
+
+        // Set max_average if it is None
+        if max_average.is_none() {
+            debug!("Setting frame {} as initial max_average {}", frame_counter, current_average);
+            max_average = Some(MaxAverageFrame {
+                image: Arc::new(image.clone()),
+                image_hash,
+                frame_number: frame_counter,
+                timestamp: capture_start,
+                result_tx: result_tx.clone(),
+                average: current_average,
+            });
+        }
+
+        previous_image = Some(Arc::new(image.clone()));
 
-        // Clone necessary values for the OCR task
-        let result_tx_clone = result_tx.clone();
-        let image_arc = Arc::new(image);
 
-        // Check if we need to drop this frame
-        let queue_size = ocr_tx.receiver_count() as u64;
+        // Check if we need to store this frame as max_average based on queue size
+        // let queue_size = ocr_tx.receiver_count() as u64;
         // debug!("OCR queue size: {}", queue_size);
-        if queue_size >= MAX_QUEUE_SIZE as u64 {
-            let frames_to_skip = queue_size - MAX_QUEUE_SIZE as u64 + 1;
-            if frame_counter - last_processed_frame <= frames_to_skip {
-                debug!("Dropping frame {} due to OCR backlog", frame_counter);
+        // if queue_size >= MAX_QUEUE_SIZE as u64 {
+        //     let is_active = *is_active.lock().await;
+        //     debug!("Dropping frame {} due to OCR backlog", frame_counter);
+        //     frame_counter += 1;
+        //     tokio::time::sleep(interval).await;
+        //     continue;
+        // }
+
+        // Clone necessary values for the OCR task
+        let result_tx_clone = result_tx.clone();
+        // Send max_average frame for OCR processing
+        if let Some(max_avg_frame) = &max_average {
+            let is_active = *is_active.lock().await;
+            if !is_active {
+                let send_start = Instant::now();
+                // queue_size.fetch_add(1, Ordering::SeqCst); // Increment the counter
+                if let Err(e) = ocr_tx.send((
+                    max_avg_frame.image.clone(),
+                    max_avg_frame.image_hash,
+                    max_avg_frame.frame_number,
+                    max_avg_frame.timestamp,
+                    result_tx_clone,
+                )) {
+                    error!("Failed to send image for OCR processing: {}", e);
+                    // queue_size.fetch_sub(1, Ordering::SeqCst); // Decrement the counter on error
+                } else {
+                    last_processed_frame = frame_counter;
+                    max_average = None; // Reset max_average after sending
+                }
+                let send_duration = send_start.elapsed();
 
                 frame_counter += 1;
-                continue;
+                debug!(
+                    "Frame {}: Capture time: {:?}, Send time: {:?}, Receiver count: {}",
+                    frame_counter,
+                    capture_duration,
+                    send_duration,
+                    ocr_tx.receiver_count()
+                );
             }
         }
-
-        // Send image for OCR processing
-        let send_start = Instant::now();
-        if let Err(e) = ocr_tx.send((
-            image_arc,
-            image_hash,
-            frame_counter,
-            capture_start,
-            result_tx_clone,
-        )) {
-            error!("Failed to send image for OCR processing: {}", e);
-        } else {
-            last_processed_frame = frame_counter;
-        }
-        let send_duration = send_start.elapsed();
-
-        frame_counter += 1;
-        debug!(
-            "Frame {}: Capture time: {:?}, Send time: {:?}, Receiver count: {}",
-            frame_counter,
-            capture_duration,
-            send_duration,
-            ocr_tx.receiver_count()
-        );
-
         tokio::time::sleep(interval).await;
     }
 
@@ -197,7 +292,7 @@ pub async fn continuous_capture(
     let total_duration = start_time.elapsed();
     info!(
-        "Capture completed. Total frames: {}, Total time: {:?}, Avg FPS: {:.2}",
+        "Capture completed. Total frames: {}, Total time: {:.1?}, Avg FPS: {:.2}",
         frame_counter,
         total_duration,
         frame_counter as f64 / total_duration.as_secs_f64()
@@ -209,43 +304,27 @@ fn calculate_hash(image: &DynamicImage) -> u64 {
     hasher.finish()
 }
 
-pub fn perform_ocr(image: &DynamicImage) -> (String, String, String) {
+pub fn perform_ocr(image: &DynamicImage) -> (String, DataOutput, String) {
     let args = Args {
         lang: "eng".to_string(),
         config_variables: HashMap::from([
            ("tessedit_create_tsv".into(), "1".into()),
        ]),
-        dpi: Some(150),
-        psm: Some(3),
-        oem: Some(3),
+        dpi: Some(600), // 150 is a balanced option; 600 surprisingly seems faster, and a higher DPI gives more granular output
+        psm: Some(1),   // PSM 1: Automatic page segmentation with OSD. PSM 3: Fully automatic page segmentation, but no OSD (default)
+        oem: Some(1),   // OEM 1: Neural nets LSTM engine only. OEM 3: Default, based on what is available
     };
+
     let ocr_image = Image::from_dynamic_image(image).unwrap();
-
-    let tsv_output = match rusty_tesseract::image_to_string(&ocr_image, &args) {
-        Ok(result) => {result},
-        Err(e) => {
-            eprintln!("Error during image_to_string: {}", e);
-            String::from("OCR_TSV failed")
-        }
-    };
 
-    // Extract text from TSV output
-    let text = tsv_output.lines()
-        .skip(1) // Skip the header line
-        .filter_map(|line| line.rsplit_once('\t').map(|(_, text)| text)) // Get the text after the last tab
-        .collect::<Vec<&str>>()
-        .join(" "); // Join all text parts with a space
+    // Extract data output
+    let data_output = rusty_tesseract::image_to_data(&ocr_image, &args).unwrap();
+    // let tsv_output = data_output_to_tsv(&data_output);
 
-    // Define my_args for image_to_boxes
-    let my_args = Args {
-        lang: "eng".to_string(),
-        config_variables: HashMap::new(),
-        dpi: Some(150),
-        psm: Some(3),
-        oem: Some(3),
-    };
+    // Extract text from data output
+    let text = data_output_to_text(&data_output);
 
-    let data_output = rusty_tesseract::image_to_data(&ocr_image, &my_args).unwrap();
+    // Extract JSON output
     let mut lines: Vec<String> = Vec::new();
     let mut current_line = String::new();
     let mut last_word_num = 0;
@@ -265,40 +344,40 @@ pub fn perform_ocr(image: &DynamicImage) -> (String, String, String) {
         }
         last_word_num = record.word_num;
     }
-
     if !current_line.is_empty() {
         lines.push(current_line);
     }
 
     let json_output = serde_json::to_string_pretty(&lines).unwrap();
 
-    (text, tsv_output, json_output)
+    (text, data_output, json_output)
 }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use image::DynamicImage;
-    use std::path::PathBuf;
-
-    #[test]
-    fn test_perform_ocr() {
-        // Use the correct path to the testing_OCR.png file
-        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
-        path.push("tests/testing_OCR.png");
-        let image = image::open(path).expect("Failed to open image");
-        let (text, tsv_output, json_output) = perform_ocr(&image);
-
-        // Generate text_json
-        // let text_json: Vec<String> = text.lines().map(String::from).collect();
-
-        // Print the results
-        // println!("OCR Text: {}", text);
-        // println!("TSV Output: {}", tsv_output);
-        // println!("Text JSON: {:?}", json_output);
-
-        assert!(!text.is_empty(), "OCR text should not be empty");
-        assert!(!tsv_output.is_empty(), "TSV output should not be empty");
-        assert!(!text_json.is_empty(), "Text JSON should not be empty");
+fn data_output_to_text(data_output: &DataOutput) -> String {
+    let mut text =
+        String::new();
+    for record in &data_output.data {
+        if !record.text.is_empty() {
+            if !text.is_empty() {
+                text.push(' ');
+            }
+            text.push_str(&record.text);
+        }
     }
+    text
+}
+
+fn compare_images_histogram(image1: &DynamicImage, image2: &DynamicImage) -> f64 {
+    let image_one = image1.to_luma8();
+    let image_two = image2.to_luma8();
+    let result = image_compare::gray_similarity_histogram(Metric::Hellinger, &image_one, &image_two)
+        .expect("Images had different dimensions");
+    result
+}
+
+fn compare_images_ssim(image1: &DynamicImage, image2: &DynamicImage) -> f64 {
+    let image_one = image1.to_luma8();
+    let image_two = image2.to_luma8();
+    let result: Similarity = image_compare::gray_similarity_structure(&Algorithm::MSSIMSimple, &image_one, &image_two)
+        .expect("Images had different dimensions");
+    result.score
 }
\ No newline at end of file
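
Reviewer note: the heart of this change is the per-frame difference score used to decide which pending frame gets OCR'd next. Below is a minimal standalone sketch of that scoring rule, using the same image-compare calls as compare_images_histogram / compare_images_ssim in the patch; the function name frame_difference_score is illustrative only and not part of the diff.

use image::DynamicImage;
use image_compare::{Algorithm, Metric};

// Illustrative sketch: 50:50 blend of Hellinger histogram distance and
// structural dissimilarity (1.0 - MSSIM), mirroring the capture loop above.
// The loop keeps the pending frame with the highest score and hands it to
// the OCR task as soon as `is_active` reports the previous OCR run finished.
fn frame_difference_score(prev: &DynamicImage, curr: &DynamicImage) -> f64 {
    let prev_gray = prev.to_luma8();
    let curr_gray = curr.to_luma8();

    // Hellinger distance is 0.0 for identical histograms; larger means more change.
    let histogram_diff =
        image_compare::gray_similarity_histogram(Metric::Hellinger, &prev_gray, &curr_gray)
            .expect("images must have identical dimensions");

    // MSSIM is 1.0 for identical images, so invert it into a difference.
    let ssim_diff = 1.0
        - image_compare::gray_similarity_structure(&Algorithm::MSSIMSimple, &prev_gray, &curr_gray)
            .expect("images must have identical dimensions")
            .score;

    (histogram_diff + ssim_diff) / 2.0
}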