From 41ab6d3cb73ad3ec9827ef8f917c0d48e86c9f75 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Tue, 26 Nov 2024 16:22:53 +0000 Subject: [PATCH] feat: framework for sys events closes: https://github.com/influxdata/influxdb/issues/25581 --- Cargo.lock | 154 +++++++++ Cargo.toml | 2 + influxdb3/Cargo.toml | 2 + influxdb3_sys_events/Cargo.toml | 28 ++ influxdb3_sys_events/src/lib.rs | 583 ++++++++++++++++++++++++++++++++ 5 files changed, 769 insertions(+) create mode 100644 influxdb3_sys_events/Cargo.toml create mode 100644 influxdb3_sys_events/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index ef9ca8696c0..6a37cd0c72f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -605,6 +605,79 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bevy_macro_utils" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfc65e570012e64a21f3546df68591aaede8349e6174fb500071677f54f06630" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", + "toml_edit", +] + +[[package]] +name = "bevy_ptr" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61baa1bdc1f4a7ac2c18217570a7cc04e1cd54d38456e91782f0371c79afe0a8" + +[[package]] +name = "bevy_reflect" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2508785a4a5809f25a237eec4fee2c91a4dbcf81324b2bbc2d6c52629e603781" +dependencies = [ + "bevy_ptr", + "bevy_reflect_derive", + "bevy_utils", + "downcast-rs", + "erased-serde", + "serde", + "smallvec", + "thiserror 1.0.69", +] + +[[package]] +name = "bevy_reflect_derive" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "967d5da1882ec3bb3675353915d3da909cafac033cbf31e58727824a1ad2a288" +dependencies = [ + "bevy_macro_utils", + "proc-macro2", + "quote", + "syn 2.0.87", + "uuid", +] + +[[package]] +name = "bevy_utils" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb0ec333b5965771153bd746f92ffd8aeeb9d008a8620ffd9ed474859381a5e" +dependencies = [ + "ahash", + "bevy_utils_proc_macros", + "getrandom", + "hashbrown 0.14.5", + "thread_local", + "tracing", + "web-time", +] + +[[package]] +name = "bevy_utils_proc_macros" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f1ab8f2f6f58439d260081d89a42b02690e5fdd64f814edc9417d33fcf2857" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "bimap" version = "0.6.3" @@ -1779,6 +1852,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "downcast-rs" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" + [[package]] name = "ed25519" version = "2.2.3" @@ -1854,6 +1933,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "erased-serde" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24e2389d65ab4fab27dc2a5de7b191e1f6617d1f1c8855c0dc569c94a4cbb18d" +dependencies = [ + "serde", + "typeid", +] + [[package]] name = "errno" version = "0.3.9" @@ -2136,8 +2225,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -2215,6 +2306,7 @@ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash", "allocator-api2", + "serde", ] [[package]] @@ -2699,6 +2791,7 @@ dependencies = [ "authz", "backtrace", "base64 0.22.1", + "bevy_reflect", "clap", "clap_blocks", "console-subscriber", @@ -2714,6 +2807,7 @@ dependencies = [ "influxdb3_client", "influxdb3_process", "influxdb3_server", + "influxdb3_sys_events", "influxdb3_telemetry", "influxdb3_wal", "influxdb3_write", @@ -2935,6 +3029,24 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "influxdb3_sys_events" +version = "0.1.0" +dependencies = [ + "arrow", + "arrow-array", + "async-trait", + "bevy_reflect", + "chrono", + "dashmap", + "datafusion", + "influxdb3_server", + "iox_system_tables", + "iox_time", + "proptest", + "test-log", +] + [[package]] name = "influxdb3_telemetry" version = "0.1.0" @@ -6173,6 +6285,23 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" + +[[package]] +name = "toml_edit" +version = "0.22.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" +dependencies = [ + "indexmap 2.6.0", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.9.2" @@ -6480,6 +6609,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "typeid" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e13db2e0ccd5e14a544e8a246ba2312cd25223f616442d7f2cb0e3db614236e" + [[package]] name = "typenum" version = "1.17.0" @@ -6748,6 +6883,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.4" @@ -7054,6 +7199,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.50.0" diff --git a/Cargo.toml b/Cargo.toml index 38ea96b3c07..35a9d1a8378 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "influxdb3_load_generator", "influxdb3_process", "influxdb3_server", + "influxdb3_sys_events", "influxdb3_telemetry", "influxdb3_test_helpers", "influxdb3_wal", @@ -49,6 +50,7 @@ assert_cmd = "2.0.14" async-trait = "0.1" backtrace = "0.3" base64 = "0.22.0" +bevy_reflect = "0.14.2" bimap = "0.6.3" byteorder = "1.3.4" bytes = "1.8" diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index 3b4d0687f77..e17bcef64ca 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -32,6 +32,7 @@ influxdb3_server = { path = "../influxdb3_server" } influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_write = { path = "../influxdb3_write" } influxdb3_telemetry = { path = "../influxdb3_telemetry" } +influxdb3_sys_events = { path = "../influxdb3_sys_events" } # Crates.io dependencies anyhow.workspace = true @@ -57,6 +58,7 @@ uuid.workspace = true # Optional Dependencies console-subscriber = { version = "0.1.10", optional = true, features = ["parking_lot"] } +bevy_reflect = "0.14.2" [features] default = ["jemalloc_replacing_malloc", "azure", "gcp", "aws"] diff --git a/influxdb3_sys_events/Cargo.toml b/influxdb3_sys_events/Cargo.toml new file mode 100644 index 00000000000..602c57d05f9 --- /dev/null +++ b/influxdb3_sys_events/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "influxdb3_sys_events" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + + +[dependencies] +# core crates +iox_time.workspace = true +iox_system_tables.workspace = true + +# local deps +influxdb3_server = { path = "../influxdb3_server" } + +# crates.io deps +arrow.workspace = true +arrow-array.workspace = true +async-trait.workspace = true +bevy_reflect.workspace = true +chrono.workspace = true +dashmap.workspace = true +datafusion.workspace = true + +[dev-dependencies] +test-log.workspace = true +proptest.workspace = true diff --git a/influxdb3_sys_events/src/lib.rs b/influxdb3_sys_events/src/lib.rs new file mode 100644 index 00000000000..7909eb44504 --- /dev/null +++ b/influxdb3_sys_events/src/lib.rs @@ -0,0 +1,583 @@ +use std::{ + any::{Any, TypeId}, + mem::replace, + sync::Arc, +}; +use std::{fmt::Debug, ops::Deref}; + +use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; +use arrow_array::{RecordBatch, StructArray}; +use async_trait::async_trait; +use bevy_reflect::Reflect; +use dashmap::DashMap; +use datafusion::{ + error::DataFusionError, + logical_expr::{col, BinaryExpr, Expr, Operator}, + scalar::ScalarValue, +}; +use iox_system_tables::IoxSystemTable; +use iox_time::TimeProvider; + +const MAX_CAPACITY: usize = 1000; +const EVENT_TYPE_PREDICATE: &str = "event_type"; + +// `select * from system.events_descr` +// foo | Capturing... +// +// `system.events` -> where event_type = 'Foo' +// Foo is event that has foo `system.events.foo` table to query +// Foo can have various props (should it be map array? / struct array?) +// +// event_time | event_data +// xx | {time: "2024-12-11T23:59:59.000", generation_id: 0} +// xx | [{time: "2024-12-11T23:59:59.000"}, {generation_id: 0}] +// xx | [{time: "2024-12-11T23:59:59.000"}, {generation_id: 0}] +// xx | [{time: "2024-12-11T23:59:59.000"}, {generation_id: 0}] +// xx | [{time: "2024-12-11T23:59:59.000"}, {generation_id: 0}] +// xx | [{time: "2024-12-11T23:59:59.000"}, {generation_id: 0}] +// system.events.foo => foo returns event_time and data (map array?) +// +pub trait SysEvent { + fn to_struct_array(&self) -> StructArray; +} + +// SystemSchemaProvider -> tables.insert each time new event comes in +// +// Event -> impl IoxSystemTable + +struct RingBuffer { + buf: Vec, + max: usize, + write_index: usize, +} + +impl RingBuffer { + pub fn new(capacity: usize) -> Self { + Self { + buf: Vec::with_capacity(capacity), + max: capacity, + write_index: 0, + } + } + + pub fn push(&mut self, val: T) { + if !self.reached_max() { + self.buf.push(val); + } else { + let _ = replace(&mut self.buf[self.write_index], val); + } + self.write_index = (self.write_index + 1) % self.max; + } + + pub fn in_order(&self) -> impl Iterator { + let (head, tail) = self.buf.split_at(self.write_index); + tail.iter().chain(head.iter()) + } + + fn reached_max(&mut self) -> bool { + self.buf.len() >= self.max + } +} + +#[allow(dead_code)] +pub struct EventSystemTable { + sys_event_store: SysEventStore, +} + +pub fn error() -> DataFusionError { + DataFusionError::Plan(format!( + "must provide a {EVENT_TYPE_PREDICATE} = '' predicate in queries to \ + system.events" + )) +} + +#[async_trait] +impl IoxSystemTable for EventSystemTable { + fn schema(&self) -> SchemaRef { + let columns = vec![ + Field::new("event_time", DataType::Utf8, false), + Field::new("event_data", DataType::Struct(Fields::empty()), false), + ]; + Arc::new(Schema::new(columns)) + } + + async fn scan( + &self, + filters: Option>, + _limit: Option, + ) -> Result { + let event_type = filters + .ok_or_else(error)? + .iter() + .find_map(|f| match f { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { + if left.deref() == &col(EVENT_TYPE_PREDICATE) && op == &Operator::Eq { + match right.deref() { + Expr::Literal( + ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)), + ) => Some(s.to_owned()), + _ => None, + } + } else { + None + } + } + _ => None, + }) + .ok_or_else(error)?; + + let results = self.sys_event_store.query_by_event_type(event_type); + unimplemented!() + } +} + +#[derive(Default, Clone, Debug, Reflect)] +pub struct Event { + time: i64, + data: D, +} + +pub struct SysEventStore { + events: dashmap::DashMap>, + time_provider: Arc, +} + +impl SysEventStore { + pub fn new(time_provider: Arc) -> Self { + Self { + events: DashMap::new(), + time_provider, + } + } + + pub fn add(&self, val: E) + where + E: 'static + Debug + Sync + Send, + { + let wrapped = Event { + time: self.time_provider.now().timestamp_nanos(), + data: val, + }; + let mut buf = self + .events + .entry(TypeId::of::>>()) + .or_insert_with(|| Box::new(RingBuffer::>::new(MAX_CAPACITY))); + buf.downcast_mut::>>() + .unwrap() + .push(wrapped); + } + + pub fn query_by_event_type(&self, _event_type: E) -> Option>> + where + E: 'static + Clone + Debug + Sync + Send, + { + if let Some(buf) = self.events.get(&TypeId::of::>>()) { + let iter = buf + .downcast_ref::>>() + .unwrap() + .in_order(); + let mut vec = vec![]; + for i in iter { + vec.push(i.clone()); + } + return Some(vec); + }; + None + } + + pub fn query(&self) -> Option>> + where + E: 'static + Clone + Debug + Sync + Send, + { + if let Some(buf) = self.events.get(&TypeId::of::>>()) { + let iter = buf + .downcast_ref::>>() + .unwrap() + .in_order(); + let mut vec = vec![]; + for i in iter { + vec.push(i.clone()); + } + return Some(vec); + }; + None + } + + pub fn walk(&self) { + for key_val in self.events.iter() { + println!("val: {:?}", key_val.pair()); + } + } +} + +#[cfg(test)] +mod tests { + use core::panic; + use std::sync::Arc; + + use arrow::{ + array::{ + BooleanBuilder, Int16Builder, Int64Builder, Int8Builder, StringBuilder, StructBuilder, + UInt64Builder, + }, + datatypes::{DataType, Field}, + }; + use arrow_array::StructArray; + use bevy_reflect::{GetField, Reflect}; + use iox_time::{MockProvider, Time}; + + use crate::{Event, RingBuffer, SysEvent, SysEventStore}; + + #[derive(Default, Clone, Debug, Reflect)] + struct SampleEvent1 { + start_time: i64, + time_taken: u64, + total_fetched: u64, + random_name: String, + } + + impl SysEvent for SampleEvent1 { + fn to_struct_array(&self) -> StructArray { + let mut struct_builder = StructBuilder::from_fields( + vec![ + Field::new("start_time", DataType::Utf8, true), + Field::new("time_taken", DataType::Int64, true), + Field::new("total_fetched", DataType::UInt64, true), + Field::new("random_name", DataType::Utf8, true), + ], + 1, + ); + + let start_time_builder = struct_builder.field_builder::(0).unwrap(); + start_time_builder.append_value("2024-12-11T02:00:03.000"); + + let time_taken_builder = struct_builder.field_builder::(1).unwrap(); + time_taken_builder.append_value(0); + + let total_fetched_builder = struct_builder.field_builder::(2).unwrap(); + total_fetched_builder.append_value(500); + + let random_name_builder = struct_builder.field_builder::(3).unwrap(); + random_name_builder.append_value("random name"); + + struct_builder.append(true); + struct_builder.finish() + } + } + + #[derive(Default, Clone, Debug, Reflect)] + struct SampleEvent2 { + start_time: i64, + time_taken: u64, + generation_id: u64, + } + + impl SysEvent for SampleEvent2 { + fn to_struct_array(&self) -> StructArray { + let mut struct_builder = StructBuilder::from_fields( + vec![ + Field::new("start_time", DataType::Utf8, true), + Field::new("time_taken", DataType::UInt64, true), + Field::new("generation_id", DataType::UInt64, true), + ], + 1, + ); + + let start_time_builder = struct_builder.field_builder::(0).unwrap(); + start_time_builder.append_value("2024-12-11T02:00:03.000"); + + let time_taken_builder = struct_builder.field_builder::(1).unwrap(); + time_taken_builder.append_value(0); + + let generation_id_builder = struct_builder.field_builder::(2).unwrap(); + generation_id_builder.append_value(500); + + struct_builder.append(true); + struct_builder.finish() + } + } + + #[test] + fn test_ring_buffer_not_full_at_less_than_max() { + let mut buf = RingBuffer::new(2); + buf.push(1); + + let all_results: Vec<&u64> = buf.in_order().collect(); + let first = *all_results.first().unwrap(); + + assert_eq!(1, all_results.len()); + assert_eq!(&1, first); + } + + #[test] + fn test_ring_buffer_not_full_at_max() { + let mut buf = RingBuffer::new(2); + buf.push(1); + buf.push(2); + + let all_results: Vec<&u64> = buf.in_order().collect(); + let first = *all_results.first().unwrap(); + let second = *all_results.get(1).unwrap(); + + assert_eq!(2, all_results.len()); + assert_eq!(&1, first); + assert_eq!(&2, second); + } + + #[test] + fn test_ring_buffer() { + let mut buf = RingBuffer::new(2); + buf.push(1); + buf.push(2); + buf.push(3); + + let all_results: Vec<&u64> = buf.in_order().collect(); + let first = *all_results.first().unwrap(); + let second = *all_results.get(1).unwrap(); + + assert_eq!(2, all_results.len()); + assert_eq!(&2, first); + assert_eq!(&3, second); + } + + #[test] + fn test_event_store() { + let event_data = SampleEvent1 { + start_time: 0, + time_taken: 10, + total_fetched: 10, + random_name: "foo".to_owned(), + }; + + let event_data2 = SampleEvent2 { + start_time: 0, + time_taken: 10, + generation_id: 100, + }; + + let event_data3 = SampleEvent1 { + start_time: 0, + time_taken: 10, + total_fetched: 10, + random_name: "boo".to_owned(), + }; + + let time_provider = MockProvider::new(Time::from_timestamp_nanos(100)); + + let event_store = SysEventStore::new(Arc::new(time_provider)); + event_store.add(event_data); + event_store.add(event_data2); + event_store.add(event_data3); + assert_eq!(2, event_store.events.len()); + + event_store.walk(); + + let all_events = event_store.query::().unwrap(); + assert_eq!(2, all_events.len()); + println!("{:?}", all_events); + + let all_events = event_store.query::().unwrap(); + assert_eq!(1, all_events.len()); + println!("{:?}", all_events); + } + + #[test] + fn test_struct_builder() { + let event_data = SampleEvent1 { + start_time: 0, + time_taken: 10, + total_fetched: 10, + random_name: "foo".to_owned(), + }; + let event_data2 = SampleEvent1 { + start_time: 0, + time_taken: 10, + total_fetched: 10, + random_name: "boo".to_owned(), + }; + + let event = Event { + time: 0, + data: event_data.clone(), + }; + + let event2 = Event { + time: 0, + data: event_data2, + }; + + let mut time_arr = Int64Builder::with_capacity(2); + // let mut list_arr = GenericListBuilder::new(); + // let struct_arrays: Vec = [event, event2].iter().map(|e| e.data.to_struct_array()).collect(); + // concat(&struct_arrays); + let field_infos = derive_schema(&event_data); + let len = field_infos.len(); + let schema: Vec = field_infos.iter().map(|f| f.field.clone()).collect(); + let mut struct_builder = StructBuilder::from_fields(schema, len); + + for i in &[event, event2] { + time_arr.append_value(i.time); + let data = &i.data; + // let arr = i.data.to_struct_array(); + // println!(">>>> full struct array {:?}", arr); + + for (idx, schema) in field_infos.iter().enumerate() { + match schema.field.data_type() { + DataType::Null => { + // let get_field = data.get_field::(schema.field.name()); + // let value = get_field.unwrap(); + // struct_builder.field_builder::(idx).unwrap().append_value(value.to_owned()); + } + DataType::Boolean => { + let value = data.get_field::(schema.field.name()).unwrap(); + struct_builder + .field_builder::(idx) + .unwrap() + .append_value(value.to_owned()); + } + DataType::Int8 => { + let value = data.get_field::(schema.field.name()).unwrap(); + struct_builder + .field_builder::(idx) + .unwrap() + .append_value(value.to_owned()); + } + DataType::Int16 => { + let value = data.get_field::(schema.field.name()).unwrap(); + struct_builder + .field_builder::(idx) + .unwrap() + .append_value(value.to_owned()); + } + DataType::Int32 => { + let value = data.get_field::(schema.field.name()).unwrap(); + struct_builder + .field_builder::(idx) + .unwrap() + .append_value(value.to_owned()); + } + DataType::Int64 => { + let value = data.get_field::(schema.field.name()).unwrap(); + struct_builder + .field_builder::(idx) + .unwrap() + .append_value(value.to_owned()); + } + DataType::UInt8 => todo!(), + DataType::UInt16 => todo!(), + DataType::UInt32 => todo!(), + DataType::UInt64 => { + let value = data.get_field::(schema.field.name()).unwrap(); + struct_builder + .field_builder::(idx) + .unwrap() + .append_value(value.to_owned()); + } + DataType::Float16 => todo!(), + DataType::Float32 => todo!(), + DataType::Float64 => todo!(), + DataType::Timestamp(_, _) => todo!(), + DataType::Date32 => todo!(), + DataType::Date64 => todo!(), + DataType::Time32(_) => todo!(), + DataType::Time64(_) => todo!(), + DataType::Duration(_) => todo!(), + DataType::Interval(_) => todo!(), + DataType::Binary => todo!(), + DataType::FixedSizeBinary(_) => todo!(), + DataType::LargeBinary => todo!(), + DataType::BinaryView => todo!(), + DataType::Utf8 => { + let value = data.get_field::(schema.field.name()).unwrap(); + struct_builder + .field_builder::(idx) + .unwrap() + .append_value(value); + } + DataType::LargeUtf8 => todo!(), + DataType::Utf8View => todo!(), + DataType::List(_) => todo!(), + DataType::ListView(_) => todo!(), + DataType::FixedSizeList(_, _) => todo!(), + DataType::LargeList(_) => todo!(), + DataType::LargeListView(_) => todo!(), + DataType::Struct(_) => todo!(), + DataType::Union(_, _) => todo!(), + DataType::Dictionary(_, _) => todo!(), + DataType::Decimal128(_, _) => todo!(), + DataType::Decimal256(_, _) => todo!(), + DataType::Map(_, _) => todo!(), + DataType::RunEndEncoded(_, _) => todo!(), + }; + } + + let type_info = i.get_represented_type_info().unwrap(); + println!(" Got the type info {:?}", type_info); + match type_info { + bevy_reflect::TypeInfo::Struct(struct_info) => { + for i in struct_info.iter() { + println!(" Field name {:?} and type {:?}", i.name(), i.type_path()); + } + } + _ => panic!("Not allowed"), + }; + } + } + + #[test] + fn test_derive_schema() { + let foo = "foo"; + let s = Sample { + x: foo.to_string(), + y: Some(foo.to_string()), + z: foo, + }; + derive_schema(&s); + } + + #[derive(Reflect)] + struct Sample<'a> { + x: String, + y: Option, + z: &'a str, + } + + fn derive_schema(event_data: &T) -> Vec { + let mut fields = vec![]; + let type_info = event_data.get_represented_type_info().unwrap(); + println!(" Got the type info {:?}", type_info); + match type_info { + bevy_reflect::TypeInfo::Struct(struct_info) => { + for i in struct_info.iter() { + println!(" Field name {:?} and type {:?}", i.name(), i.type_path()); + if i.type_path() == "i64" { + fields.push(FieldInfo { + name: i.name(), + field: Field::new(i.name(), DataType::Int64, true), + }); + } else if i.type_path() == "u64" { + fields.push(FieldInfo { + name: i.name(), + field: Field::new(i.name(), DataType::UInt64, true), + }); + } else if i.type_path() == "alloc::string::String" { + fields.push(FieldInfo { + name: i.name(), + field: Field::new(i.name(), DataType::Utf8, true), + }); + } else { + println!("Unsupported field"); + } + } + } + _ => panic!("Not allowed"), + }; + fields + } + + #[allow(dead_code)] + struct FieldInfo<'a> { + name: &'a str, + field: Field, + } +}