Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dec 20 tele fix #49

Merged
merged 14 commits into from
Dec 29, 2023
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io::Write;

use tracing::{error, info};
use tracing::{error, info, Level};
use tracing_appender;

use crate::background_tasks::start_background_tasks;
Expand Down Expand Up @@ -41,6 +41,7 @@ async fn main() {
))
};
let _tracing = tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.with_writer(logs_writer)
.with_target(true)
.with_line_number(true)
Expand Down Expand Up @@ -74,6 +75,6 @@ async fn main() {

background_tasks.abort().await;
info!("saving telemetry without sending, so should be quick");
basic_transmit::telemetry_full_cycle(gcx.clone(), true).await;
basic_transmit::basic_telemetry_compress(gcx.clone()).await;
info!("bb\n");
}
118 changes: 69 additions & 49 deletions src/telemetry/basic_robot_human.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use tracing::{error, info};
use std::sync::Arc;
use tracing::{debug, error, info};
use std::sync::{Arc, RwLockWriteGuard};
use std::sync::RwLock as StdRwLock;
use std::path::PathBuf;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::json;
use regex::Regex;

use tokio::sync::RwLock as ARwLock;

Expand All @@ -15,7 +15,51 @@ use crate::telemetry::telemetry_structs;
use crate::telemetry::telemetry_structs::{SnippetTracker, TeleRobotHumanAccum};


const ROBOT_HUMAN_FILE_STATS_UPDATE_EVERY: i64 = 15;

/// Ensures `tele_robot_human` contains an accumulator record for `uri`.
///
/// If a record whose `uri` field equals `uri` is already present, the vector
/// is left untouched. Otherwise a new record is built with
/// `TeleRobotHumanAccum::new` from clones of `uri` and `text`, and appended.
///
/// * `tele_robot_human` - per-file accumulator records to search and extend.
/// * `uri` - identifier of the file the record belongs to.
/// * `text` - text handed to the new record's constructor as its second argument.
pub fn create_robot_human_record_if_not_exists(
    tele_robot_human: &mut Vec<TeleRobotHumanAccum>,
    uri: &String,
    text: &String
) {
    // Only existence matters here, so a shared-borrow `any` is sufficient;
    // no mutable iterator or intermediate `Option` is needed.
    if tele_robot_human.iter().any(|stat| stat.uri == *uri) {
        return;
    }
    info!("create_robot_human_rec_if_not_exists: new uri {}", uri);
    tele_robot_human.push(TeleRobotHumanAccum::new(uri.clone(), text.clone()));
}


fn update_robot_characters_baseline(
rec: &mut TeleRobotHumanAccum,
snip: &SnippetTracker
) {
let re = Regex::new(r"\s+").unwrap();
let robot_characters = re.replace_all(&snip.grey_text, "").len() as i64;
rec.robot_characters_acc_baseline += robot_characters;
}

/// Folds the change from `baseline_text` to `text` into the record's
/// human/robot character counters.
///
/// Steps, as performed below:
/// 1. `utils::get_add_del_from_texts` yields the added and removed text
///    between `baseline_text` and `text`.
/// 2. `utils::get_add_del_chars_from_texts` is called with the removed text
///    first and the added text second; only the first element of its result
///    is kept as the final "added" text.
///    NOTE(review): presumably this discards text that was merely moved or
///    re-typed — confirm against `utils`.
/// 3. Human characters = byte length of that added text with whitespace
///    removed, minus `rec.robot_characters_acc_baseline`. No clamping is
///    applied, so the difference may be negative.
/// 4. The pending robot baseline is moved into `rec.robot_characters` and
///    reset to zero.
///
/// * `rec` - accumulator whose counters are updated in place.
/// * `baseline_text` - the earlier version of the text.
/// * `text` - the current version of the text.
fn basetext_to_text_leap_calculations(
rec: &mut TeleRobotHumanAccum,
baseline_text: String,
text: &String,
) {
// Matches runs of whitespace; used to strip whitespace before measuring length.
let re = Regex::new(r"\s+").unwrap();
let (added_characters, removed_characters) = utils::get_add_del_from_texts(&baseline_text, text);

// Second pass over the (removed, added) pair; shadows `added_characters` with the first element of the result.
let (added_characters, _) = utils::get_add_del_chars_from_texts(&removed_characters, &added_characters);

// let real_characters_added = re.replace_all(&added_characters, "").len() as i64 - re.replace_all(&removed_characters, "").len() as i64;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extra comments

// Whatever part of the added text is not covered by the pending robot baseline is counted as human.
let human_characters = re.replace_all(&added_characters, "").len() as i64 - rec.robot_characters_acc_baseline;
debug!("human_characters: +{}; robot_characters: +{}", human_characters, rec.robot_characters_acc_baseline);
rec.human_characters += human_characters;
// Move the pending robot baseline into the running total, then clear it.
rec.robot_characters += rec.robot_characters_acc_baseline;
rec.robot_characters_acc_baseline = 0;
}
valaises marked this conversation as resolved.
Show resolved Hide resolved


pub fn increase_counters_from_finished_snippet(
Expand All @@ -24,67 +68,40 @@ pub fn increase_counters_from_finished_snippet(
text: &String,
snip: &SnippetTracker,
) {
// Snippet is finished when it stops being valid for correction (user has changed code in a different place) or it timeouts
fn robot_characters(snip: &SnippetTracker) -> i64 {
let re = Regex::new(r"\s+").unwrap();
let robot_characters = re.replace_all(&snip.grey_text, "").len() as i64;
info!("increase_counters_from_finished_snippet: ID: {}; robot_characters: {}", snip.snippet_telemetry_id, robot_characters);
robot_characters
}
fn human_characters(rec: &TeleRobotHumanAccum, text: &String) -> i64 {
let re = Regex::new(r"\s+").unwrap();
let (added_characters, _) = utils::get_add_del_from_texts(&rec.baseline_text, text);
let human_characters = re.replace_all(&added_characters, "").len() as i64 - rec.robot_characters_acc_baseline;
human_characters
}

info!("snip grey_text: {}", snip.grey_text);
let now = chrono::Local::now().timestamp();

if let Some(rec) = tele_robot_human.iter_mut().find(|stat| stat.uri.eq(uri)) {
if rec.used_snip_ids.contains(&snip.snippet_telemetry_id) {
return;
}
let robot_characters = robot_characters(snip);
rec.robot_characters_acc_baseline += robot_characters;
rec.used_snip_ids.push(snip.snippet_telemetry_id);
if rec.baseline_updated_ts + ROBOT_HUMAN_FILE_STATS_UPDATE_EVERY < now {
// New baseline, increase counters
rec.baseline_updated_ts = now;
rec.human_characters += human_characters(rec, text);
rec.robot_characters += rec.robot_characters_acc_baseline;
rec.robot_characters_acc_baseline = 0;
rec.baseline_text = text.clone();
}
info!("increasing for {}, human+{}, robot+{}", snip.snippet_telemetry_id, human_characters(rec, text), robot_characters);
} else {
info!("increase_counters_from_finished_snippet: new uri {}", uri);
let init_file_text_mb = snip.inputs.sources.get(&snip.inputs.cursor.file);
if init_file_text_mb.is_none() {
return;

if rec.used_snip_ids.is_empty() {
rec.model = snip.model.clone();
}
let init_file_text = init_file_text_mb.unwrap();
tele_robot_human.push(TeleRobotHumanAccum::new(
uri.clone(),
snip.model.clone(),
init_file_text.clone(),
robot_characters(snip),
vec![snip.snippet_telemetry_id],
));

update_robot_characters_baseline(rec, snip);
basetext_to_text_leap_calculations(rec, rec.baseline_text.clone(), text);

rec.used_snip_ids.push(snip.snippet_telemetry_id);
rec.baseline_updated_ts = now;
rec.baseline_text = text.clone();
}
}

fn compress_robot_human(
data: &mut Vec<TeleRobotHumanAccum>
storage_locked: &mut RwLockWriteGuard<telemetry_structs::Storage>
) -> Vec<TeleRobotHuman> {
let mut unique_combinations: HashMap<(String, String), Vec<&TeleRobotHumanAccum>> = HashMap::new();
let mut unique_combinations: HashMap<(String, String), Vec<TeleRobotHumanAccum>> = HashMap::new();

let tele_robot_human = storage_locked.tele_robot_human.clone();

for accum in data {
for accum in tele_robot_human {
let key = (accum.file_extension.clone(), accum.model.clone());
unique_combinations.entry(key).or_default().push(accum);
}
let mut compressed_vec= vec![];
for (key, entries) in unique_combinations {
info!("compress_robot_human: compressing {} entries for key {:?}", entries.len(), key);
// info!("compress_robot_human: compressing {} entries for key {:?}", entries.len(), key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same with debug!

let mut record = TeleRobotHuman::new(
key.0.clone(),
key.1.clone()
Expand Down Expand Up @@ -114,7 +131,10 @@ pub async fn tele_robot_human_compress_to_file(
enduser_client_version = cx_locked.cmdline.enduser_client_version.clone();

let mut storage_locked = storage.write().unwrap();
for rec in compress_robot_human(&mut storage_locked.tele_robot_human) {
for rec in compress_robot_human(&mut storage_locked) {
if rec.model.is_empty() && rec.robot_characters == 0 && rec.human_characters == 0 {
continue;
}
let json_dict = serde_json::to_value(rec).unwrap();
records.as_array_mut().unwrap().push(json_dict);
}
Expand Down
35 changes: 19 additions & 16 deletions src/telemetry/basic_transmit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use crate::telemetry::utils::{sorted_json_files, read_file, cleanup_old_files, t


const TELEMETRY_TRANSMIT_AFTER_START_SECONDS: u64 = 60;
const TELEMETRY_TRANSMIT_EACH_N_SECONDS: u64 = 3600*3;
const TELEMETRY_FILES_KEEP: i32 = 30;
const TELEMETRY_TRANSMIT_EACH_N_SECONDS: u64 = 3600;
const TELEMETRY_FILES_KEEP: i32 = 128;

valaises marked this conversation as resolved.
Show resolved Hide resolved

pub async fn send_telemetry_data(
Expand Down Expand Up @@ -88,11 +88,19 @@ pub async fn send_telemetry_files_to_mothership(
}
}

pub async fn telemetry_full_cycle(
pub async fn basic_telemetry_compress(
global_context: Arc<ARwLock<global_context::GlobalContext>>,
skip_sending_part: bool,
) -> () {
) {
info!("basic telemetry compression starts");
basic_network::compress_basic_telemetry_to_file(global_context.clone()).await;
basic_robot_human::tele_robot_human_compress_to_file(global_context.clone()).await;
basic_comp_counters::compress_tele_completion_to_file(global_context.clone()).await;
}

pub async fn basic_telemetry_send(
global_context: Arc<ARwLock<global_context::GlobalContext>>,
) -> () {
info!("basic telemetry sending starts");
let caps: Option<Arc<StdRwLock<CodeAssistantCaps>>>;
let api_key: String;
let enable_basic_telemetry: bool; // from command line, will not send anything if false
Expand All @@ -111,11 +119,7 @@ pub async fn telemetry_full_cycle(
telemetry_basic_dest = caps.clone().unwrap().read().unwrap().telemetry_basic_dest.clone();
}

basic_network::compress_basic_telemetry_to_file(global_context.clone()).await;
basic_robot_human::tele_robot_human_compress_to_file(global_context.clone()).await;
basic_comp_counters::compress_tele_completion_to_file(global_context.clone()).await;

if enable_basic_telemetry && !telemetry_basic_dest.is_empty() && !skip_sending_part {
if enable_basic_telemetry && !telemetry_basic_dest.is_empty() {
send_telemetry_files_to_mothership(
dir_compressed.clone(),
dir_sent.clone(),
Expand All @@ -130,9 +134,6 @@ pub async fn telemetry_full_cycle(
if telemetry_basic_dest.is_empty() {
info!("basic telemetry dest is empty, skip");
}
if skip_sending_part {
info!("skip_sending_part is true, skip");
}
}
cleanup_old_files(dir_compressed, TELEMETRY_FILES_KEEP).await;
cleanup_old_files(dir_sent, TELEMETRY_FILES_KEEP).await;
Expand All @@ -141,9 +142,11 @@ pub async fn telemetry_full_cycle(
pub async fn telemetry_background_task(
global_context: Arc<ARwLock<global_context::GlobalContext>>,
) -> () {
tokio::time::sleep(tokio::time::Duration::from_secs(TELEMETRY_TRANSMIT_AFTER_START_SECONDS)).await;
basic_telemetry_send(global_context.clone()).await;
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(TELEMETRY_TRANSMIT_AFTER_START_SECONDS)).await;
telemetry_full_cycle(global_context.clone(), false).await;
tokio::time::sleep(tokio::time::Duration::from_secs(TELEMETRY_TRANSMIT_EACH_N_SECONDS - TELEMETRY_TRANSMIT_AFTER_START_SECONDS)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(TELEMETRY_TRANSMIT_EACH_N_SECONDS)).await;
basic_telemetry_compress(global_context.clone()).await;
valaises marked this conversation as resolved.
Show resolved Hide resolved
basic_telemetry_send(global_context.clone()).await;
}
}
12 changes: 8 additions & 4 deletions src/telemetry/snippets_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::RwLock as StdRwLock;
use serde::{Serialize, Deserialize};

use tokio::sync::RwLock as ARwLock;
use tracing::info;
use tracing::{debug, info};

use crate::global_context;
use crate::completion_cache;
Expand Down Expand Up @@ -89,7 +89,7 @@ pub async fn snippet_accepted(
let snip = storage_locked.tele_snippets.iter_mut().find(|s| s.snippet_telemetry_id == snippet_telemetry_id);
if let Some(snip) = snip {
snip.accepted_ts = chrono::Local::now().timestamp();
info!("snippet_accepted: ID{}: snippet is accepted", snippet_telemetry_id);
debug!("snippet_accepted: ID{}: snippet is accepted", snippet_telemetry_id);
return true;
}
return false;
Expand All @@ -103,6 +103,9 @@ pub async fn sources_changed(
) {
let tele_storage = gcx.read().await.telemetry.clone();
let mut storage_locked = tele_storage.write().unwrap();

basic_robot_human::create_robot_human_record_if_not_exists(&mut storage_locked.tele_robot_human, uri, text);

let mut accepted_snippets = vec![];
for snip in storage_locked.tele_snippets.iter_mut() {
if snip.accepted_ts == 0 || !uri.ends_with(&snip.inputs.cursor.file) {
Expand All @@ -119,7 +122,7 @@ pub async fn sources_changed(
// if snip.id is not in the list of finished snippets, add it
if !accepted_snippets.iter().any(|s: &SnippetTracker| s.snippet_telemetry_id == snip.snippet_telemetry_id) {
accepted_snippets.push(snip.clone());
info!("sources_changed: ID{}: snippet is added to accepted", snip.snippet_telemetry_id);
debug!("sources_changed: ID{}: snippet is added to accepted", snip.snippet_telemetry_id);
}

let (grey_valid, mut grey_corrected) = utils::if_head_tail_equal_return_added_text(
Expand All @@ -135,13 +138,14 @@ pub async fn sources_changed(
} else {
if snip.remaining_percentage >= 0. {
snip.finished_ts = chrono::Local::now().timestamp();
// info!("ID{}: snippet is finished, remaining_percentage={}", snip.snippet_telemetry_id, snip.remaining_percentage);
debug!("ID{}: snippet is finished, remaining_percentage={}", snip.snippet_telemetry_id, snip.remaining_percentage);
} else {
snip.accepted_ts = 0; // that will cleanup and not send
}
}
}


for snip in accepted_snippets {
basic_robot_human::increase_counters_from_finished_snippet(&mut storage_locked.tele_robot_human, uri, text, &snip);
basic_comp_counters::create_data_accumulator_for_finished_snippet(&mut storage_locked.snippet_data_accumulators, uri, &snip);
Expand Down
10 changes: 5 additions & 5 deletions src/telemetry/telemetry_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct SnippetTracker {
pub finished_ts: i64,
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct TeleRobotHumanAccum {
// Internal struct, not sent anywhere
pub uri: String,
Expand All @@ -78,18 +78,18 @@ pub struct TeleRobotHumanAccum {

impl TeleRobotHumanAccum {
pub fn new(
uri: String, model: String, baseline_text: String, robot_characters_acc_baseline: i64, used_snip_ids: Vec<u64>
uri: String, baseline_text: String
) -> Self {
Self {
uri: uri.clone(),
file_extension: utils::extract_extension_or_filename(&uri),
model,
model: "".to_string(),
baseline_text,
baseline_updated_ts: 0,
robot_characters_acc_baseline,
robot_characters_acc_baseline: 0,
robot_characters: 0,
human_characters: 0,
used_snip_ids,
used_snip_ids: vec![],
}
}
}
Expand Down
Loading