Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dec 20 tele fix #49

Merged
merged 14 commits into from
Dec 29, 2023
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ async fn main() {

background_tasks.abort().await;
info!("saving telemetry without sending, so should be quick");
basic_transmit::telemetry_full_cycle(gcx.clone(), true).await;
basic_transmit::basic_telemetry_compress(gcx.clone()).await;
info!("bb\n");
}
113 changes: 65 additions & 48 deletions src/telemetry/basic_robot_human.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use tracing::{error, info};
use std::sync::Arc;
use std::sync::{Arc, RwLockWriteGuard};
use std::sync::RwLock as StdRwLock;
use std::path::PathBuf;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::json;
use regex::Regex;

use tokio::sync::RwLock as ARwLock;

Expand All @@ -15,7 +15,51 @@ use crate::telemetry::telemetry_structs;
use crate::telemetry::telemetry_structs::{SnippetTracker, TeleRobotHumanAccum};


const ROBOT_HUMAN_FILE_STATS_UPDATE_EVERY: i64 = 15;

/// Makes sure `tele_robot_human` contains an accumulator record for `uri`.
///
/// When a record with the same `uri` already exists nothing happens. Otherwise
/// the new uri is logged and a fresh record is appended, built by
/// `TeleRobotHumanAccum::new` from copies of `uri` and `text`.
pub fn create_robot_human_record_if_not_exists(
    tele_robot_human: &mut Vec<TeleRobotHumanAccum>,
    uri: &String,
    text: &String
) {
    let already_tracked = tele_robot_human.iter().any(|stat| stat.uri.eq(uri));
    if !already_tracked {
        info!("create_robot_human_rec_if_not_exists: new uri {}", uri);
        tele_robot_human.push(TeleRobotHumanAccum::new(uri.clone(), text.clone()));
    }
}


fn update_robot_characters_baseline(
rec: &mut TeleRobotHumanAccum,
snip: &SnippetTracker
) {
let re = Regex::new(r"\s+").unwrap();
let robot_characters = re.replace_all(&snip.grey_text, "").len() as i64;
rec.robot_characters_acc_baseline += robot_characters;
}

/// Splits the change from `baseline_text` to `text` into human and robot
/// character counts and folds them into `rec`.
///
/// The inserted text is measured with whitespace removed. Whatever exceeds
/// `rec.robot_characters_acc_baseline` is added to `rec.human_characters`, the
/// baseline itself is added to `rec.robot_characters`, and the baseline is
/// then reset to zero.
fn basetext_to_text_leap_calculations(
rec: &mut TeleRobotHumanAccum,
baseline_text: String,
text: &String,
) {
// Used below to drop all whitespace before measuring length.
let re = Regex::new(r"\s+").unwrap();
// Line-level diff; `false` is passed for `only_one_deletion_allowed`.
let (added_characters, removed_characters) = utils::get_add_del_from_texts(&baseline_text, text, false);

// Character-level diff from the removed lines to the added lines; only the
// inserted part is kept, shadowing the line-level `added_characters`.
let (added_characters, _) = utils::get_add_del_chars_from_texts(&removed_characters, &added_characters);

// let real_characters_added = re.replace_all(&added_characters, "").len() as i64 - re.replace_all(&removed_characters, "").len() as i64;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extra comments

// `len()` is a UTF-8 byte count. The result is negative whenever the pending
// robot baseline is larger than the measured insertion.
let human_characters = re.replace_all(&added_characters, "").len() as i64 - rec.robot_characters_acc_baseline;
info!("human_characters: +{}; robot_characters: +{}", human_characters, rec.robot_characters_acc_baseline);
rec.human_characters += human_characters;
rec.robot_characters += rec.robot_characters_acc_baseline;
// The pending baseline has been folded into the totals, so start over.
rec.robot_characters_acc_baseline = 0;
}
valaises marked this conversation as resolved.
Show resolved Hide resolved


pub fn increase_counters_from_finished_snippet(
Expand All @@ -24,67 +68,40 @@ pub fn increase_counters_from_finished_snippet(
text: &String,
snip: &SnippetTracker,
) {
// Snippet is finished when it stops being valid for correction (user has changed code in a different place) or it timeouts
fn robot_characters(snip: &SnippetTracker) -> i64 {
let re = Regex::new(r"\s+").unwrap();
let robot_characters = re.replace_all(&snip.grey_text, "").len() as i64;
info!("increase_counters_from_finished_snippet: ID: {}; robot_characters: {}", snip.snippet_telemetry_id, robot_characters);
robot_characters
}
fn human_characters(rec: &TeleRobotHumanAccum, text: &String) -> i64 {
let re = Regex::new(r"\s+").unwrap();
let (added_characters, _) = utils::get_add_del_from_texts(&rec.baseline_text, text);
let human_characters = re.replace_all(&added_characters, "").len() as i64 - rec.robot_characters_acc_baseline;
human_characters
}

info!("snip grey_text: {}", snip.grey_text);
let now = chrono::Local::now().timestamp();

if let Some(rec) = tele_robot_human.iter_mut().find(|stat| stat.uri.eq(uri)) {
if rec.used_snip_ids.contains(&snip.snippet_telemetry_id) {
return;
}
let robot_characters = robot_characters(snip);
rec.robot_characters_acc_baseline += robot_characters;
rec.used_snip_ids.push(snip.snippet_telemetry_id);
if rec.baseline_updated_ts + ROBOT_HUMAN_FILE_STATS_UPDATE_EVERY < now {
// New baseline, increase counters
rec.baseline_updated_ts = now;
rec.human_characters += human_characters(rec, text);
rec.robot_characters += rec.robot_characters_acc_baseline;
rec.robot_characters_acc_baseline = 0;
rec.baseline_text = text.clone();
}
info!("increasing for {}, human+{}, robot+{}", snip.snippet_telemetry_id, human_characters(rec, text), robot_characters);
} else {
info!("increase_counters_from_finished_snippet: new uri {}", uri);
let init_file_text_mb = snip.inputs.sources.get(&snip.inputs.cursor.file);
if init_file_text_mb.is_none() {
return;

if rec.used_snip_ids.is_empty() {
rec.model = snip.model.clone();
}
let init_file_text = init_file_text_mb.unwrap();
tele_robot_human.push(TeleRobotHumanAccum::new(
uri.clone(),
snip.model.clone(),
init_file_text.clone(),
robot_characters(snip),
vec![snip.snippet_telemetry_id],
));

update_robot_characters_baseline(rec, snip);
basetext_to_text_leap_calculations(rec, rec.baseline_text.clone(), text);

rec.used_snip_ids.push(snip.snippet_telemetry_id);
rec.baseline_updated_ts = now;
rec.baseline_text = text.clone();
}
}

fn compress_robot_human(
data: &mut Vec<TeleRobotHumanAccum>
storage_locked: &mut RwLockWriteGuard<telemetry_structs::Storage>
) -> Vec<TeleRobotHuman> {
let mut unique_combinations: HashMap<(String, String), Vec<&TeleRobotHumanAccum>> = HashMap::new();
let mut unique_combinations: HashMap<(String, String), Vec<TeleRobotHumanAccum>> = HashMap::new();

let tele_robot_human = storage_locked.tele_robot_human.clone();

for accum in data {
for accum in tele_robot_human {
let key = (accum.file_extension.clone(), accum.model.clone());
unique_combinations.entry(key).or_default().push(accum);
}
let mut compressed_vec= vec![];
for (key, entries) in unique_combinations {
info!("compress_robot_human: compressing {} entries for key {:?}", entries.len(), key);
// info!("compress_robot_human: compressing {} entries for key {:?}", entries.len(), key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same with debug!

let mut record = TeleRobotHuman::new(
key.0.clone(),
key.1.clone()
Expand Down Expand Up @@ -114,7 +131,7 @@ pub async fn tele_robot_human_compress_to_file(
enduser_client_version = cx_locked.cmdline.enduser_client_version.clone();

let mut storage_locked = storage.write().unwrap();
for rec in compress_robot_human(&mut storage_locked.tele_robot_human) {
for rec in compress_robot_human(&mut storage_locked) {
let json_dict = serde_json::to_value(rec).unwrap();
records.as_array_mut().unwrap().push(json_dict);
}
Expand Down
35 changes: 19 additions & 16 deletions src/telemetry/basic_transmit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use crate::telemetry::utils::{sorted_json_files, read_file, cleanup_old_files, t


const TELEMETRY_TRANSMIT_AFTER_START_SECONDS: u64 = 60;
const TELEMETRY_TRANSMIT_EACH_N_SECONDS: u64 = 3600*3;
const TELEMETRY_FILES_KEEP: i32 = 30;
const TELEMETRY_TRANSMIT_EACH_N_SECONDS: u64 = 3600;
const TELEMETRY_FILES_KEEP: i32 = 128;

valaises marked this conversation as resolved.
Show resolved Hide resolved

pub async fn send_telemetry_data(
Expand Down Expand Up @@ -88,11 +88,19 @@ pub async fn send_telemetry_files_to_mothership(
}
}

pub async fn telemetry_full_cycle(
pub async fn basic_telemetry_compress(
global_context: Arc<ARwLock<global_context::GlobalContext>>,
skip_sending_part: bool,
) -> () {
) {
info!("basic telemetry compression starts");
basic_network::compress_basic_telemetry_to_file(global_context.clone()).await;
basic_robot_human::tele_robot_human_compress_to_file(global_context.clone()).await;
basic_comp_counters::compress_tele_completion_to_file(global_context.clone()).await;
}

pub async fn basic_telemetry_send(
global_context: Arc<ARwLock<global_context::GlobalContext>>,
) -> () {
info!("basic telemetry sending starts");
let caps: Option<Arc<StdRwLock<CodeAssistantCaps>>>;
let api_key: String;
let enable_basic_telemetry: bool; // from command line, will not send anything if false
Expand All @@ -111,11 +119,7 @@ pub async fn telemetry_full_cycle(
telemetry_basic_dest = caps.clone().unwrap().read().unwrap().telemetry_basic_dest.clone();
}

basic_network::compress_basic_telemetry_to_file(global_context.clone()).await;
basic_robot_human::tele_robot_human_compress_to_file(global_context.clone()).await;
basic_comp_counters::compress_tele_completion_to_file(global_context.clone()).await;

if enable_basic_telemetry && !telemetry_basic_dest.is_empty() && !skip_sending_part {
if enable_basic_telemetry && !telemetry_basic_dest.is_empty() {
send_telemetry_files_to_mothership(
dir_compressed.clone(),
dir_sent.clone(),
Expand All @@ -130,9 +134,6 @@ pub async fn telemetry_full_cycle(
if telemetry_basic_dest.is_empty() {
info!("basic telemetry dest is empty, skip");
}
if skip_sending_part {
info!("skip_sending_part is true, skip");
}
}
cleanup_old_files(dir_compressed, TELEMETRY_FILES_KEEP).await;
cleanup_old_files(dir_sent, TELEMETRY_FILES_KEEP).await;
Expand All @@ -141,9 +142,11 @@ pub async fn telemetry_full_cycle(
pub async fn telemetry_background_task(
global_context: Arc<ARwLock<global_context::GlobalContext>>,
) -> () {
tokio::time::sleep(tokio::time::Duration::from_secs(TELEMETRY_TRANSMIT_AFTER_START_SECONDS)).await;
basic_telemetry_send(global_context.clone()).await;
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(TELEMETRY_TRANSMIT_AFTER_START_SECONDS)).await;
telemetry_full_cycle(global_context.clone(), false).await;
tokio::time::sleep(tokio::time::Duration::from_secs(TELEMETRY_TRANSMIT_EACH_N_SECONDS - TELEMETRY_TRANSMIT_AFTER_START_SECONDS)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(TELEMETRY_TRANSMIT_EACH_N_SECONDS)).await;
basic_telemetry_send(global_context.clone()).await;
basic_telemetry_compress(global_context.clone()).await;
valaises marked this conversation as resolved.
Show resolved Hide resolved
}
}
4 changes: 4 additions & 0 deletions src/telemetry/snippets_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ pub async fn sources_changed(
) {
let tele_storage = gcx.read().await.telemetry.clone();
let mut storage_locked = tele_storage.write().unwrap();

basic_robot_human::create_robot_human_record_if_not_exists(&mut storage_locked.tele_robot_human, uri, text);

let mut accepted_snippets = vec![];
for snip in storage_locked.tele_snippets.iter_mut() {
if snip.accepted_ts == 0 || !uri.ends_with(&snip.inputs.cursor.file) {
Expand Down Expand Up @@ -142,6 +145,7 @@ pub async fn sources_changed(
}
}


for snip in accepted_snippets {
basic_robot_human::increase_counters_from_finished_snippet(&mut storage_locked.tele_robot_human, uri, text, &snip);
basic_comp_counters::create_data_accumulator_for_finished_snippet(&mut storage_locked.snippet_data_accumulators, uri, &snip);
Expand Down
10 changes: 5 additions & 5 deletions src/telemetry/telemetry_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct SnippetTracker {
pub finished_ts: i64,
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct TeleRobotHumanAccum {
// Internal struct, not sent anywhere
pub uri: String,
Expand All @@ -78,18 +78,18 @@ pub struct TeleRobotHumanAccum {

impl TeleRobotHumanAccum {
pub fn new(
uri: String, model: String, baseline_text: String, robot_characters_acc_baseline: i64, used_snip_ids: Vec<u64>
uri: String, baseline_text: String
) -> Self {
Self {
uri: uri.clone(),
file_extension: utils::extract_extension_or_filename(&uri),
model,
model: "".to_string(),
baseline_text,
baseline_updated_ts: 0,
robot_characters_acc_baseline,
robot_characters_acc_baseline: 0,
robot_characters: 0,
human_characters: 0,
used_snip_ids,
used_snip_ids: vec![],
}
}
}
Expand Down
60 changes: 55 additions & 5 deletions src/telemetry/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,78 @@ pub async fn telemetry_storage_dirs(cache_dir: &PathBuf) -> (PathBuf, PathBuf) {
pub fn get_add_del_from_texts(
text_a: &String,
text_b: &String,
only_one_deletion_allowed: bool,
) -> (String, String) {
let diff = TextDiff::from_lines(text_a, text_b);
let mut text_a_lines = text_a.lines().collect::<Vec<&str>>();
let mut text_b_lines = text_b.lines().collect::<Vec<&str>>();

for s in &mut text_a_lines {
*s = s.trim_end().trim_start();
// info!("text_a: {}; len: {}", s, s.len());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're not sure, use the `debug!()` macro. I will add an extra parameter for the debug level: https://docs.rs/tracing/latest/tracing/macro.debug.html

}

for s in &mut text_b_lines {
*s = s.trim_end().trim_start();
// info!("text_b: {}; len: {}", s, s.len());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

}

let mut text_a_new = text_a_lines.join("\n");
let text_b_new = text_b_lines.join("\n");

if !text_a_new.ends_with("\n") {
text_a_new += "\n";
}

let diff = TextDiff::from_lines(&text_a_new, &text_b_new);

let mut added = "".to_string();
let mut removed = "".to_string();
for change in diff.iter_all_changes() {
match change.tag() {
ChangeTag::Delete => {
removed += change.value();
// info!("rem: {}; len: {}", change.value(), change.value().len());
if only_one_deletion_allowed {
removed = change.value().to_string();
} else {
removed += change.value();
}
valaises marked this conversation as resolved.
Show resolved Hide resolved
}
ChangeTag::Insert => {
added += change.value();
// info!("add: {}; len: {}", change.value(), change.value().len());
}
ChangeTag::Equal => {
}
}
}
// Strip carriage returns from each side using that side's own contents, so the
// deletions collected above are what gets returned as the second element.
added = added.replace("\r", "");
removed = removed.replace("\r", "");

(added, removed)
}


/// Character-level diff between `text_a` and `text_b`.
///
/// Returns `(added, removed)`: the concatenation of every inserted piece and
/// the concatenation of every deleted piece, each in diff order. Pieces tagged
/// as equal are ignored.
pub fn get_add_del_chars_from_texts(
    text_a: &String,
    text_b: &String,
) -> (String, String) {
    let mut inserted = String::new();
    let mut deleted = String::new();

    let char_diff = TextDiff::from_chars(text_a, text_b);
    for piece in char_diff.iter_all_changes() {
        // Pick the buffer this piece belongs to; unchanged pieces are skipped.
        let target = match piece.tag() {
            ChangeTag::Insert => &mut inserted,
            ChangeTag::Delete => &mut deleted,
            ChangeTag::Equal => continue,
        };
        target.push_str(piece.value());
    }

    (inserted, deleted)
}

pub async fn file_save(path: PathBuf, json: serde_json::Value) -> Result<(), String> {
let mut f = tokio::fs::File::create(path).await.map_err(|e| format!("{:?}", e))?;
f.write_all(serde_json::to_string_pretty(&json).unwrap().as_bytes()).await.map_err(|e| format!("{}", e))?;
Expand Down Expand Up @@ -282,7 +332,7 @@ pub fn unchanged_percentage_approx(
}
}

let (texts_ab_added, _) = get_add_del_from_texts(text_a, text_b);
let (texts_ab_added, _) = get_add_del_from_texts(text_a, text_b, false);

// info!("unchanged_percentage_approx for snip:\n{grey_text_a}");
if texts_ab_added.is_empty() {
Expand Down