Commit

Temp commit for OpLogFilter development direction
test commit
henry0715-dev committed Sep 11, 2024
1 parent 4a7e4ab commit 880ab3d
Showing 6 changed files with 402 additions and 37 deletions.
236 changes: 233 additions & 3 deletions src/graphql.rs
@@ -360,6 +360,161 @@ where
Ok((records, has_previous, has_next))
}

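The helper below is bounded by `KeyExtractorByEndKey`, a trait whose definition is not part of this diff (it is imported from `src/storage.rs` near the bottom of the file). Inferred from the call sites that follow, its shape is presumably close to this sketch; the real definition may differ:

```rust
use chrono::{DateTime, Utc};

// Inferred shape only: the counterpart of `KeyExtractor`, except that the
// filter value anchors the *end* of the storage key instead of the start.
pub trait KeyExtractorByEndKey {
    fn get_end_key(&self) -> &str;
    fn get_mid_key(&self) -> Option<Vec<u8>>;
    fn get_range_end_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>);
}
```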
#[allow(clippy::too_many_lines)]
fn get_connection_by_prefix_timestamp_key<T>(
store: &RawEventStore<'_, T>,
filter: &(impl RawEventFilter + KeyExtractorByEndKey),
after: Option<String>,
before: Option<String>,
first: Option<usize>,
last: Option<usize>,
) -> Result<ConnArgs<T>>
where
T: DeserializeOwned + EventFilter,
{
let (records, has_previous, has_next) = if let Some(before) = before {
if after.is_some() {
return Err("cannot use both `after` and `before`".into());
}
if first.is_some() {
return Err("'before' and 'first' cannot be specified simultaneously".into());
}

let last = last.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
let cursor = base64_engine.decode(before)?;

// generate storage search key
let key_builder = StorageKey::builder2();
let from_key = key_builder
.clone()
.upper_open_bound_end_key(filter.get_range_end_key().1)
.mid_key(filter.get_mid_key())
.end_key(filter.get_end_key())
.build();
let to_key = key_builder
.lower_closed_bound_end_key(filter.get_range_end_key().0)
.mid_key(filter.get_mid_key())
.end_key(filter.get_end_key())
.build();

if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Greater {
return Err("invalid cursor".into());
}
let mut iter = store
.boundary_iter(&cursor, &to_key.key(), Direction::Reverse)
.peekable();
if let Some(Ok((key, _))) = iter.peek() {
if key.as_ref() == cursor {
iter.next();
}
}
let (mut records, has_previous) = collect_records_with_timestamp_fn(
iter,
last,
filter,
Some(&get_timestamp_from_key_prefix),
);
records.reverse();
(records, has_previous, false)
} else if let Some(after) = after {
if before.is_some() {
return Err("cannot use both `after` and `before`".into());
}
if last.is_some() {
return Err("'after' and 'last' cannot be specified simultaneously".into());
}
let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
let cursor = base64_engine.decode(after)?;

// generate storage search key
let key_builder = StorageKey::builder2();
let from_key = key_builder
.clone()
.lower_closed_bound_end_key(filter.get_range_end_key().0)
.mid_key(filter.get_mid_key())
.end_key(filter.get_end_key())
.build();
let to_key = key_builder
.upper_open_bound_end_key(filter.get_range_end_key().1)
.mid_key(filter.get_mid_key())
.end_key(filter.get_end_key())
.build();

if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Less {
return Err("invalid cursor".into());
}
let mut iter = store
.boundary_iter(&cursor, &to_key.key(), Direction::Forward)
.peekable();
if let Some(Ok((key, _))) = iter.peek() {
if key.as_ref() == cursor {
iter.next();
}
}
let (records, has_next) = collect_records_with_timestamp_fn(
iter,
first,
filter,
Some(&get_timestamp_from_key_prefix),
);
(records, false, has_next)
} else if let Some(last) = last {
if first.is_some() {
return Err("first and last cannot be used together".into());
}
let last = last.min(MAXIMUM_PAGE_SIZE);

// generate storage search key
let key_builder = StorageKey::builder2();
let from_key = key_builder
.clone()
.upper_closed_bound_end_key(filter.get_range_end_key().1)
.mid_key(filter.get_mid_key())
.end_key(filter.get_end_key())
.build();
let to_key = key_builder
.lower_closed_bound_end_key(filter.get_range_end_key().0)
.mid_key(filter.get_mid_key())
.end_key(filter.get_end_key())
.build();

let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Reverse);
let (mut records, has_previous) = collect_records_with_timestamp_fn(
iter,
last,
filter,
Some(&get_timestamp_from_key_prefix),
);
records.reverse();
(records, has_previous, false)
} else {
let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
// generate storage search key
let key_builder = StorageKey::builder2();
let from_key = key_builder
.clone()
.lower_closed_bound_end_key(filter.get_range_end_key().0)
.mid_key(filter.get_mid_key())
.end_key(filter.get_end_key())
.build();
let to_key = key_builder
.upper_open_bound_end_key(filter.get_range_end_key().1)
.mid_key(filter.get_mid_key())
.end_key(filter.get_end_key())
.build();

let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Forward);
let (records, has_next) = collect_records_with_timestamp_fn(
iter,
first,
filter,
Some(&get_timestamp_from_key_prefix),
);
(records, false, has_next)
};
Ok((records, has_previous, has_next))
}
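For orientation, here is a minimal sketch of the key layout this pagination assumes (illustrative names, not the crate's actual builder API): each op_log key begins with an 8-byte big-endian nanosecond timestamp, the agent id sits at the end of the key, and a cursor is simply the base64-encoded raw key.

```rust
use base64::{engine::general_purpose::STANDARD, Engine as _};

// Illustrative only: build a timestamp-prefixed key by hand.
fn example_prefix_timestamp_key(timestamp_nanos: i64, agent_id: &str) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + agent_id.len());
    key.extend_from_slice(&timestamp_nanos.to_be_bytes());
    key.extend_from_slice(agent_id.as_bytes());
    key
}

fn main() {
    let key = example_prefix_timestamp_key(1_726_012_800_000_000_000, "agent-01");
    // Cursors are base64-encoded raw keys; the exact engine the crate
    // configures as `base64_engine` may differ from STANDARD.
    let cursor = STANDARD.encode(&key);
    assert_eq!(STANDARD.decode(cursor).unwrap(), key);
}
```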

fn load_connection<N, T>(
store: &RawEventStore<'_, T>,
filter: &(impl RawEventFilter + KeyExtractor),
@@ -375,6 +530,37 @@ where
let (records, has_previous, has_next) =
get_connection(store, filter, after, before, first, last)?;

create_connection(records, has_previous, has_next)
}

fn load_connection_by_prefix_timestamp_key<N, T>(
store: &RawEventStore<'_, T>,
filter: &(impl RawEventFilter + KeyExtractorByEndKey),
after: Option<String>,
before: Option<String>,
first: Option<usize>,
last: Option<usize>,
) -> Result<Connection<String, N>>
where
N: FromKeyValue<T> + OutputType,
T: DeserializeOwned + EventFilter,
{
let (records, has_previous, has_next) =
get_connection_by_prefix_timestamp_key(store, filter, after, before, first, last)?;

create_connection(records, has_previous, has_next)
}

#[allow(clippy::unnecessary_wraps)]
fn create_connection<N, T>(
records: Vec<(Box<[u8]>, T)>,
has_previous: bool,
has_next: bool,
) -> Result<Connection<String, N>>
where
N: FromKeyValue<T> + OutputType,
T: DeserializeOwned,
{
let mut connection: Connection<String, N> = Connection::new(has_previous, has_next);
connection.edges = records
.into_iter()
@@ -388,10 +574,12 @@ where
Ok(connection)
}

-fn collect_records<I, T>(
+#[allow(clippy::type_complexity)]
+fn collect_records_internal<I, T>(
mut iter: I,
size: usize,
filter: &impl RawEventFilter,
get_timestamp: Option<&dyn Fn(&[u8]) -> Result<DateTime<Utc>, anyhow::Error>>,
) -> (Vec<KeyValue<T>>, bool)
where
I: Iterator<Item = anyhow::Result<(Box<[u8]>, T)>>,
Expand All @@ -408,6 +596,12 @@ where
let item = item.expect("not error value");
let data_type = item.1.data_type();

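// temporary debug output while developing OpLogFilter; not intended to ship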
if let Some(get_timestamp_fn) = get_timestamp {
if let Ok(datetime) = get_timestamp_fn(&item.0) {
println!("datetime : {}, item : {:?}", datetime, item.1.agent_id());
}
}

match filter.check(
item.1.orig_addr(),
item.1.resp_addr(),
@@ -435,10 +629,44 @@ where
(records, has_more)
}

fn collect_records<I, T>(
iter: I,
size: usize,
filter: &impl RawEventFilter,
) -> (Vec<KeyValue<T>>, bool)
where
I: Iterator<Item = anyhow::Result<(Box<[u8]>, T)>>,
T: EventFilter,
{
collect_records_internal(iter, size, filter, None)
}

#[allow(clippy::type_complexity)]
fn collect_records_with_timestamp_fn<I, T>(
iter: I,
size: usize,
filter: &impl RawEventFilter,
timestamp_fn: Option<&dyn Fn(&[u8]) -> Result<DateTime<Utc>, anyhow::Error>>,
) -> (Vec<KeyValue<T>>, bool)
where
I: Iterator<Item = anyhow::Result<(Box<[u8]>, T)>>,
T: EventFilter,
{
collect_records_internal(iter, size, filter, timestamp_fn)
}

pub fn get_timestamp_from_key_prefix(key: &[u8]) -> Result<DateTime<Utc>, anyhow::Error> {
if key.len() > TIMESTAMP_SIZE {
let timestamp = i64::from_be_bytes(key[0..TIMESTAMP_SIZE].try_into()?);
return Ok(Utc.timestamp_nanos(timestamp));
}
Err(anyhow!("invalid database key length"))
}

pub fn get_timestamp_from_key(key: &[u8]) -> Result<DateTime<Utc>, anyhow::Error> {
if key.len() > TIMESTAMP_SIZE {
-let nanos = i64::from_be_bytes(key[(key.len() - TIMESTAMP_SIZE)..].try_into()?);
-return Ok(Utc.timestamp_nanos(nanos));
+let timestamp = i64::from_be_bytes(key[(key.len() - TIMESTAMP_SIZE)..].try_into()?);
+return Ok(Utc.timestamp_nanos(timestamp));
}
Err(anyhow!("invalid database key length"))
}
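A quick round-trip sketch of the two layouts the parsers above expect (this test is not part of the commit; it assumes `TIMESTAMP_SIZE == 8`):

```rust
#[cfg(test)]
mod timestamp_key_layout_tests {
    use super::*;

    #[test]
    fn prefix_and_suffix_round_trip() {
        let nanos: i64 = 1_726_012_800_000_000_000;
        let ts = Utc.timestamp_nanos(nanos);

        // Timestamp-prefixed key: [timestamp | rest of key].
        let mut prefix_key = nanos.to_be_bytes().to_vec();
        prefix_key.extend_from_slice(b"agent-01");
        assert_eq!(get_timestamp_from_key_prefix(&prefix_key).unwrap(), ts);

        // Timestamp-suffixed key: [rest of key | timestamp].
        let mut suffix_key = b"agent-01".to_vec();
        suffix_key.extend_from_slice(&nanos.to_be_bytes());
        assert_eq!(get_timestamp_from_key(&suffix_key).unwrap(), ts);
    }
}
```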
@@ -1513,6 +1741,8 @@ macro_rules! impl_from_giganto_search_filter_for_graphql_client {
}
pub(crate) use impl_from_giganto_search_filter_for_graphql_client;

use crate::storage::KeyExtractorByEndKey;

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
2 changes: 1 addition & 1 deletion src/graphql/client/schema/schema.graphql
@@ -991,7 +991,7 @@ type NtlmRawEventEdge {

input OpLogFilter {
time: TimeRange
-agentId: String!
+agentId: String
logLevel: String
contents: String
}
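With `agentId` now optional, clients can page op_log events across all agents by omitting it. A hypothetical query document (the `opLogRawEvents` field name and connection shape are assumed from the resolver in `src/graphql/log.rs`, not confirmed by this diff):

```rust
// Hypothetical client-side query string; adjust field names to the real schema.
const OP_LOG_QUERY: &str = r#"
{
  opLogRawEvents(filter: { logLevel: "Info" }, first: 10) {
    edges { node { timestamp level contents } }
    pageInfo { hasNextPage endCursor }
  }
}
"#;
```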
33 changes: 21 additions & 12 deletions src/graphql/log.rs
@@ -16,13 +16,13 @@ use graphql_client::GraphQLQuery;
use super::{
base64_engine,
client::derives::{log_raw_events, LogRawEvents},
-get_timestamp_from_key, handle_paged_events,
-impl_from_giganto_time_range_struct_for_graphql_client, load_connection,
-paged_events_in_cluster, Engine, FromKeyValue,
+get_timestamp_from_key, get_timestamp_from_key_prefix, handle_paged_events,
+impl_from_giganto_time_range_struct_for_graphql_client,
+load_connection_by_prefix_timestamp_key, paged_events_in_cluster, Engine, FromKeyValue,
};
use crate::{
graphql::{RawEventFilter, TimeRange},
-storage::{Database, KeyExtractor},
+storage::{Database, KeyExtractor, KeyExtractorByEndKey},
};

#[derive(Default)]
@@ -74,14 +74,14 @@ impl RawEventFilter for LogFilter {
#[derive(InputObject)]
pub struct OpLogFilter {
time: Option<TimeRange>,
-agent_id: String,
+agent_id: Option<String>,
log_level: Option<String>,
contents: Option<String>,
}

-impl KeyExtractor for OpLogFilter {
-fn get_start_key(&self) -> &str {
-&self.agent_id
+impl KeyExtractorByEndKey for OpLogFilter {
+fn get_end_key(&self) -> &str {
+self.agent_id.as_deref().unwrap_or("")
}

// oplog events don't use a mid key
@@ -109,7 +109,7 @@ impl RawEventFilter for OpLogFilter {
log_contents: Option<String>,
_text: Option<String>,
_source: Option<String>,
-_agent_id: Option<String>,
+agent_id: Option<String>,
) -> Result<bool> {
if let Some(filter_level) = &self.log_level {
let log_level = if let Some(log_level) = log_level {
Expand All @@ -131,6 +131,16 @@ impl RawEventFilter for OpLogFilter {
return Ok(false);
}
}
if let Some(filter_agent_id) = &self.agent_id {
let is_agent_id_mismatch = if let Some(agent_id) = agent_id {
filter_agent_id != &agent_id
} else {
false
};
if is_agent_id_mismatch {
return Ok(false);

Check warning on line 141 in src/graphql/log.rs

View check run for this annotation

Codecov / codecov/patch

src/graphql/log.rs#L141

Added line #L141 was not covered by tests
}
}
Ok(true)
}
}
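Distilled from the `check` body above: when the filter names an agent id, only a differing event agent id is rejected; an event that carries no agent id still passes. A standalone sketch of that predicate (not code from this commit):

```rust
// Mirrors the agent-id branch of `OpLogFilter::check` above.
fn agent_id_passes(filter_agent_id: Option<&str>, event_agent_id: Option<&str>) -> bool {
    match (filter_agent_id, event_agent_id) {
        (Some(wanted), Some(actual)) => wanted == actual,
        // No filter set, or the event has no agent id: keep the record.
        _ => true,
    }
}

fn main() {
    assert!(agent_id_passes(None, Some("agent-01")));
    assert!(agent_id_passes(Some("agent-01"), None));
    assert!(!agent_id_passes(Some("agent-01"), Some("agent-02")));
}
```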
@@ -161,7 +171,7 @@ struct OpLogRawEvent {
impl FromKeyValue<OpLog> for OpLogRawEvent {
fn from_key_value(key: &[u8], l: OpLog) -> Result<Self> {
Ok(OpLogRawEvent {
-timestamp: get_timestamp_from_key(key)?,
+timestamp: get_timestamp_from_key_prefix(key)?,
level: format!("{:?}", l.log_level),
contents: l.contents,
})
@@ -226,14 +236,13 @@ impl LogQuery {
) -> Result<Connection<String, OpLogRawEvent>> {
let db = ctx.data::<Database>()?;
let store = db.op_log_store()?;

query(
after,
before,
first,
last,
|after, before, first, last| async move {
-load_connection(&store, &filter, after, before, first, last)
+load_connection_by_prefix_timestamp_key(&store, &filter, after, before, first, last)
},
)
.await