Skip to content

Commit

Permalink
Merge pull request #99 from louis030195/friend-wearable-integration
Browse files Browse the repository at this point in the history
Friend wearable integration
  • Loading branch information
m13v authored Aug 3, 2024
2 parents 2183965 + 5220107 commit 1631114
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = [
"screenpipe-core",
"screenpipe-vision",
"screenpipe-audio",
"screenpipe-server",
"screenpipe-server", "external-cloud-integrations",

]
exclude = [
Expand Down
15 changes: 15 additions & 0 deletions external-cloud-integrations/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "external-cloud-integrations"
version.workspace = true
authors.workspace = true
description.workspace = true
repository.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
curl = "0.4.46"
serde_json = "1.0"
uuid = { version = "1.3", features = ["v4"] }
chrono = "0.4"
log = "0.4"
93 changes: 93 additions & 0 deletions external-cloud-integrations/src/friend_wearable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use curl::easy::{Easy, List};
use serde_json::{json, Value};
use std::io::Read;
use uuid::Uuid;
use chrono::Utc;
use log::debug;

fn encode_to_uuid(memory_source: &str, memory_id: &str) -> Uuid {
let prefix = match memory_source {
"audio" => "a",
"screen" => "s",
_ => "u",
};
let combined = format!("{}-{}", prefix, memory_id);
let mut bytes = [0u8; 16];
combined.bytes().enumerate().for_each(|(i, b)| {
if i < 16 { bytes[i] = b }
});
Uuid::from_bytes(bytes)
}

#[allow(dead_code)]
fn decode_from_uuid(uuid: Uuid) -> (String, String) {
let combined = String::from_utf8_lossy(uuid.as_bytes()).to_string();
let parts: Vec<&str> = combined.splitn(2, '-').collect();
let source = match parts[0] {
"a" => "audio",
"s" => "screen",
_ => "unknown",
};
(source.to_string(), parts.get(1).unwrap_or(&"").to_string())
}

pub fn send_data_to_friend_wearable(
memory_source: String,
memory_id: String,
memory_text: String,
uid: &str,
) -> Result<Value, Box<dyn std::error::Error>> {
let request_id = encode_to_uuid(&memory_source, &memory_id);

// Use the provided UID instead of a hardcoded value
let friend_user_id = uid.to_string();
let endpoint = "https://webhook-test.com/c46d38536e2851a100e3c230386ae238";

// Generate timestamp
let memory_timestamp = Utc::now().to_rfc3339();

let payload = json!({
"request_id": request_id.to_string(),
"memory_source": memory_source,
"memory_id": memory_id,
"memory_timestamp": memory_timestamp,
"memory_text": memory_text,
"friend_user_id": friend_user_id
});

debug!("Sending request to friend endpoint: {}", payload);
let data = payload.to_string().into_bytes();
let mut handle = Easy::new();
let mut response = Vec::new();

let mut headers = List::new();
headers.append("Content-Type: application/json")?;
handle.http_headers(headers)?;

handle.url(endpoint)?;
handle.post(true)?;
handle.post_field_size(data.len() as u64)?;

{
let mut transfer = handle.transfer();
transfer.read_function(|buf| {
Ok(data.as_slice().read(buf).unwrap_or(0))
})?;
transfer.write_function(|new_data| {
response.extend_from_slice(new_data);
Ok(new_data.len())
})?;
transfer.perform()?;
}

let response_body = String::from_utf8(response)?;
let response_json: Value = serde_json::from_str(&response_body)?;

match handle.response_code()? {
200 => Ok(response_json),
400 => Err("Bad Request: Invalid data sent".into()),
401 => Err("Unauthorized: Authentication failed".into()),
500 => Err("Server Error: Please try again later".into()),
_ => Err(format!("Unexpected response: {}", response_body).into()),
}
}
1 change: 1 addition & 0 deletions external-cloud-integrations/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod friend_wearable;
1 change: 1 addition & 0 deletions external-cloud-integrations/src/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod friend_wearable;
3 changes: 3 additions & 0 deletions screenpipe-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ reqwest = { version = "0.12.5", features = ["json"] }
# Concurrency
crossbeam = { workspace = true }

# Friend integration
external-cloud-integrations = { path = "../external-cloud-integrations" }

[dev-dependencies]
tempfile = "3.3.0"

Expand Down
19 changes: 15 additions & 4 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ struct Cli {
/// Enable cloud OCR processing
#[arg(long, default_value_t = false)]
cloud_ocr_on: bool,

/// UID key for sending data to friend wearable (if not provided, data won't be sent)
#[arg(long)]
friend_wearable_uid: Option<String>,
}

fn get_base_dir(custom_path: Option<String>) -> anyhow::Result<PathBuf> {
Expand Down Expand Up @@ -120,7 +124,8 @@ async fn main() -> anyhow::Result<()> {
.filter(None, LevelFilter::Info)
.filter_module("tokenizers", LevelFilter::Error)
.filter_module("rusty_tesseract", LevelFilter::Error)
.filter_module("symphonia", LevelFilter::Error);
.filter_module("symphonia", LevelFilter::Error)
.filter_module("external_cloud_integrations", LevelFilter::Debug); // Add this line

if cli.debug {
builder.filter_module("screenpipe", LevelFilter::Debug);
Expand Down Expand Up @@ -270,6 +275,9 @@ async fn main() -> anyhow::Result<()> {

let vision_control_server_clone = vision_control.clone();

// Before the loop starts, clone friend_wearable_uid
let friend_wearable_uid = cli.friend_wearable_uid.clone();

// Function to start or restart the recording task
let _start_recording = tokio::spawn(async move {
// hack
Expand All @@ -280,6 +288,8 @@ async fn main() -> anyhow::Result<()> {
let local_data_dir = local_data_dir.clone();
let vision_control = vision_control.clone();
let audio_devices_control = audio_devices_control.clone();
let friend_wearable_uid_clone = friend_wearable_uid.clone(); // Clone for each iteration

tokio::select! {
_ = &mut recording_task => {
// Recording task completed or errored, restart it
Expand All @@ -300,8 +310,9 @@ async fn main() -> anyhow::Result<()> {
vision_control,
audio_devices_control,
cli.save_text_files,
cli.cloud_audio_on, // Pass the cloud_audio flag
cli.cloud_ocr_on, // Pass the cloud_ocr flag
cli.cloud_audio_on,
cli.cloud_ocr_on,
friend_wearable_uid_clone, // Use the cloned version
)
.await;

Expand Down Expand Up @@ -369,4 +380,4 @@ async fn main() -> anyhow::Result<()> {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
57 changes: 49 additions & 8 deletions screenpipe-server/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use external_cloud_integrations::friend_wearable::send_data_to_friend_wearable;

pub enum RecorderControl {
Pause,
Resume,
Expand Down Expand Up @@ -49,12 +51,13 @@ pub async fn start_continuous_recording(
vision_control: Arc<AtomicBool>,
audio_devices_control: Arc<SegQueue<(AudioDevice, DeviceControl)>>,
save_text_files: bool,
cloud_audio: bool, // Added cloud_audio parameter
cloud_ocr: bool, // Added cloud_ocr parameter
cloud_audio: bool,
cloud_ocr: bool,
friend_wearable_uid: Option<String>, // Updated parameter
) -> Result<()> {
info!("Recording now");

let (whisper_sender, whisper_receiver) = create_whisper_channel(cloud_audio).await?; // Pass cloud_audio
let (whisper_sender, whisper_receiver) = create_whisper_channel(cloud_audio).await?;

let db_manager_video = Arc::clone(&db);
let db_manager_audio = Arc::clone(&db);
Expand All @@ -64,14 +67,17 @@ pub async fn start_continuous_recording(
let output_path_video = Arc::clone(&output_path);
let output_path_audio = Arc::clone(&output_path);

let friend_wearable_uid_video = friend_wearable_uid.clone(); // Clone for video handle

let video_handle = tokio::spawn(async move {
record_video(
db_manager_video,
output_path_video,
fps,
is_running_video,
save_text_files,
cloud_ocr, // Pass the cloud_ocr flag
cloud_ocr,
friend_wearable_uid_video, // Use the cloned version
)
.await
});
Expand All @@ -84,6 +90,7 @@ pub async fn start_continuous_recording(
whisper_sender,
whisper_receiver,
audio_devices_control,
friend_wearable_uid, // Use the original
)
.await
});
Expand All @@ -101,7 +108,8 @@ async fn record_video(
fps: f64,
is_running: Arc<AtomicBool>,
save_text_files: bool,
cloud_ocr: bool, // Added cloud_ocr parameter
cloud_ocr: bool,
friend_wearable_uid: Option<String>, // Updated parameter
) -> Result<()> {
debug!("record_video: Starting");
let db_chunk_callback = Arc::clone(&db);
Expand All @@ -122,7 +130,7 @@ async fn record_video(
fps,
new_chunk_callback,
save_text_files,
cloud_ocr, // Pass the cloud_ocr flag
cloud_ocr,
);

while is_running.load(Ordering::SeqCst) {
Expand Down Expand Up @@ -154,6 +162,20 @@ async fn record_video(
);
continue; // Skip to the next iteration
}

// Send data to friend wearable
if let Some(uid) = &friend_wearable_uid {
if let Err(e) = send_data_to_friend_wearable(
"screen".to_string(),
frame_id.to_string(),
frame.text.clone(),
uid, // Pass the UID to the function
) {
error!("Failed to send screen data to friend wearable: {}", e);
} else {
debug!("Sent screen data to friend wearable for frame {}", frame_id);
}
}
}
Err(e) => {
warn!("Failed to insert frame: {}", e);
Expand All @@ -176,6 +198,7 @@ async fn record_audio(
whisper_sender: UnboundedSender<AudioInput>,
mut whisper_receiver: UnboundedReceiver<TranscriptionResult>,
audio_devices_control: Arc<SegQueue<(AudioDevice, DeviceControl)>>,
friend_wearable_uid: Option<String>, // Updated parameter
) -> Result<()> {
let mut handles: HashMap<String, JoinHandle<()>> = HashMap::new();

Expand Down Expand Up @@ -287,15 +310,19 @@ async fn record_audio(
// Process whisper results
while let Ok(transcription) = whisper_receiver.try_recv() {
info!("Received transcription");
process_audio_result(&db, transcription).await;
process_audio_result(&db, transcription, friend_wearable_uid.as_deref()).await;
}

// Small delay to prevent busy-waiting
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

async fn process_audio_result(db: &DatabaseManager, result: TranscriptionResult) {
async fn process_audio_result(
db: &DatabaseManager,
result: TranscriptionResult,
friend_wearable_uid: Option<&str>, // Updated parameter
) {
if result.error.is_some() || result.transcription.is_none() {
error!(
"Error in audio recording: {}. Not inserting audio result",
Expand Down Expand Up @@ -326,6 +353,20 @@ async fn process_audio_result(db: &DatabaseManager, result: TranscriptionResult)
"Inserted audio transcription for chunk {} from device {}",
audio_chunk_id, result.input.device
);

// Send data to friend wearable
if let Some(uid) = friend_wearable_uid {
if let Err(e) = send_data_to_friend_wearable(
"audio".to_string(),
audio_chunk_id.to_string(),
transcription.clone(),
uid, // Pass the UID to the function
) {
error!("Failed to send data to friend wearable: {}", e);
} else {
debug!("Sent audio data to friend wearable for chunk {}", audio_chunk_id);
}
}
}
}
Err(e) => error!(
Expand Down
2 changes: 1 addition & 1 deletion screenpipe-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ pub use server::health_check;
pub use server::AppState;
pub use server::HealthCheckResponse;
pub use server::Server;
pub use video::VideoCapture;
pub use video::VideoCapture;

0 comments on commit 1631114

Please sign in to comment.