From 07c45eae25a7b95f0884e01d90c5bc660af833ec Mon Sep 17 00:00:00 2001
From: discord9
Date: Fri, 10 May 2024 15:41:11 +0800
Subject: [PATCH] feat(flow): flow node manager

feat(flow): render src/sink
feat(flow): flow node manager in standalone
fix?: higher run freq
chore: remove redundant error enum variant
fix: run with higher freq if insert more
chore: fix after rebase
chore: typos
---
 Cargo.lock                              |   2 +
 src/cmd/Cargo.toml                      |   1 +
 src/cmd/src/standalone.rs               |  26 +-
 src/flow/src/adapter.rs                 | 671 +++++++++++++++++++++++-
 src/flow/src/adapter/flownode_impl.rs   | 117 +++++
 src/flow/src/adapter/parse_expr.rs      | 245 +++++++++
 src/flow/src/adapter/server.rs          | 147 ++++++
 src/flow/src/adapter/tests.rs           |  64 +++
 src/flow/src/compute/render/src_sink.rs |   4 +-
 src/flow/src/expr/scalar.rs             |   1 +
 src/flow/src/lib.rs                     |   5 +
 src/frontend/src/instance/standalone.rs |   9 +-
 tests-integration/Cargo.toml            |   1 +
 tests-integration/src/standalone.rs     |  21 +-
 14 files changed, 1299 insertions(+), 15 deletions(-)
 create mode 100644 src/flow/src/adapter/flownode_impl.rs
 create mode 100644 src/flow/src/adapter/parse_expr.rs
 create mode 100644 src/flow/src/adapter/server.rs
 create mode 100644 src/flow/src/adapter/tests.rs

diff --git a/Cargo.lock b/Cargo.lock
index 37fd88cb4613..3d572aa3e832 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1629,6 +1629,7 @@ dependencies = [
  "either",
  "etcd-client",
  "file-engine",
+ "flow",
  "frontend",
  "futures",
  "human-panic",
@@ -10683,6 +10684,7 @@ dependencies = [
  "datanode",
  "datatypes",
  "dotenv",
+ "flow",
  "frontend",
  "futures",
  "futures-util",
diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml
index 7052df92244e..a11a9a01974a 100644
--- a/src/cmd/Cargo.toml
+++ b/src/cmd/Cargo.toml
@@ -45,6 +45,7 @@ datatypes.workspace = true
 either = "1.8"
 etcd-client.workspace = true
 file-engine.workspace = true
+flow.workspace = true
 frontend.workspace = true
 futures.workspace = true
 human-panic = "1.2.2"
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index ff810ead5d41..4a932abc76a0 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -45,6 +45,7 @@ use common_wal::config::StandaloneWalConfig;
 use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
 use datanode::datanode::{Datanode, DatanodeBuilder};
 use file_engine::config::EngineConfig as FileEngineConfig;
+use flow::FlownodeBuilder;
 use frontend::frontend::FrontendOptions;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -426,11 +427,26 @@ impl StartCommand {
             )
             .await;
 
+        let table_metadata_manager =
+            Self::create_table_metadata_manager(kv_backend.clone()).await?;
+
+        let flow_builder = FlownodeBuilder::new(
+            Default::default(),
+            fe_plugins.clone(),
+            table_metadata_manager.clone(),
+            catalog_manager.clone(),
+        )
+        .with_kv_backend(kv_backend.clone());
+        let flownode = Arc::new(flow_builder.build().await);
+
         let builder =
             DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
         let datanode = builder.build().await.context(StartDatanodeSnafu)?;
 
-        let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
+        let node_manager = Arc::new(StandaloneDatanodeManager {
+            region_server: datanode.region_server(),
+            flow_server: flownode.clone(),
+        });
 
         let table_id_sequence = Arc::new(
             SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -448,8 +464,6 @@ impl StartCommand {
             opts.wal.into(),
             kv_backend.clone(),
         ));
-        let 
table_metadata_manager = - Self::create_table_metadata_manager(kv_backend.clone()).await?; let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, @@ -482,6 +496,12 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; + // flow server need to be able to use frontend to write insert requests back + flownode + .set_frontend_invoker(Box::new(frontend.clone())) + .await; + let _handle = flownode.clone().run_background(); + let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins) .build() .await diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 33b05ddec12b..61d4b92350c0 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -14,19 +14,678 @@ //! for getting data from source and sending results to sink //! and communicating with other parts of the database +#![warn(unused_imports)] + +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; + +use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; +use catalog::kvbackend::KvBackendCatalogManager; +use common_base::Plugins; +use common_error::ext::BoxedError; +use common_frontend::handler::FrontendInvoker; +use common_meta::key::TableMetadataManagerRef; +use common_meta::kv_backend::KvBackendRef; +use common_runtime::JoinHandle; +use common_telemetry::{debug, info}; +use datatypes::schema::ColumnSchema; +use datatypes::value::Value; +use greptime_proto::v1; +use itertools::Itertools; +use minstant::Anchor; +use query::{QueryEngine, QueryEngineFactory}; +use serde::{Deserialize, Serialize}; +use session::context::QueryContext; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{ConcreteDataType, RegionId}; +use table::metadata::TableId; +use tokio::sync::{oneshot, watch, Mutex, RwLock}; + +use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; +pub(crate) use crate::adapter::node_context::FlownodeContext; +use crate::adapter::parse_expr::parse_fixed; +use crate::adapter::table_source::TableSource; +use crate::adapter::util::column_schemas_to_proto; +use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; +use crate::compute::ErrCollector; +use crate::expr::GlobalId; +use crate::repr::{self, DiffRow, Row}; +use crate::transform::sql_to_flow_plan; pub(crate) mod error; -pub(crate) mod node_context; -mod table_source; +mod flownode_impl; +mod parse_expr; +mod server; +#[cfg(test)] +mod tests; mod util; +mod worker; -pub(crate) use node_context::FlownodeContext; -pub(crate) use table_source::TableSource; +pub(crate) mod node_context; +mod table_source; -mod worker; +use error::Error; pub const PER_REQ_MAX_ROW_CNT: usize = 8192; + // TODO: refactor common types for flow to a separate module /// FlowId is a unique identifier for a flow task -pub type FlowId = u32; +pub type FlowId = u64; pub type TableName = [String; 3]; + +/// Options for flow node +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct FlownodeOptions { + /// rpc address + pub rpc_addr: String, +} + +/// Flownode Builder +pub struct FlownodeBuilder { + opts: FlownodeOptions, + plugins: Plugins, + kv_backend: Option, + table_meta: TableMetadataManagerRef, + catalog_manager: Arc, +} + +impl FlownodeBuilder { + /// init flownode builder + pub fn new( + opts: FlownodeOptions, + plugins: Plugins, + table_meta: TableMetadataManagerRef, + catalog_manager: Arc, + ) -> Self { + Self { + opts, + 
plugins, + kv_backend: None, + table_meta, + catalog_manager, + } + } + + /// set kv backend + pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self { + Self { + kv_backend: Some(kv_backend), + ..self + } + } + + /// TODO(discord9): error handling + pub async fn build(self) -> FlownodeManager { + let query_engine_factory = QueryEngineFactory::new_with_plugins( + // query engine in flownode only translate plan with resolved table source. + self.catalog_manager.clone(), + None, + None, + None, + false, + self.plugins.clone(), + ); + let query_engine = query_engine_factory.query_engine(); + + let (tx, rx) = oneshot::channel(); + + let _handle = std::thread::spawn(move || { + let node_id = Some(1); + let (flow_node_manager, mut worker) = + FlownodeManager::new_with_worker(node_id, query_engine, self.table_meta.clone()); + let _ = tx.send(flow_node_manager); + info!("Flow Worker started in new thread"); + worker.run(); + }); + let man = rx.await.unwrap(); + info!("Flow Node Manager started"); + man + } +} + +/// This function will create a new thread for flow worker and return a handle to the flow node manager +pub fn start_flow_node_with_one_worker( + query_engine: Arc, + table_meta: TableMetadataManagerRef, +) -> FlownodeManager { + let (tx, rx) = oneshot::channel(); + + let _handle = std::thread::spawn(move || { + let node_id = Some(1); + let (flow_node_manager, mut worker) = + FlownodeManager::new_with_worker(node_id, query_engine, table_meta); + let _ = tx.send(flow_node_manager); + worker.run(); + }); + + rx.blocking_recv().unwrap() +} + +/// Arc-ed FlowNodeManager, cheaper to clone +pub type FlownodeManagerRef = Arc; + +/// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread +/// +/// The choice of timestamp is just using current system timestamp for now +pub struct FlownodeManager { + /// The handler to the worker that will run the dataflow + /// which is `!Send` so a handle is used + pub worker_handles: Vec>, + /// The query engine that will be used to parse the query and convert it to a dataflow plan + query_engine: Arc, + /// Getting table name and table schema from table info manager + table_info_source: TableSource, + frontend_invoker: RwLock>>, + /// contains mapping from table name to global id, and table schema + node_context: Mutex, + flow_err_collectors: RwLock>, + src_send_buf_lens: RwLock>>, + tick_manager: FlowTickManager, + node_id: Option, +} + +/// Building FlownodeManager +impl FlownodeManager { + /// set frontend invoker + pub async fn set_frontend_invoker( + self: &Arc, + frontend: Box, + ) { + *self.frontend_invoker.write().await = Some(frontend); + } + + /// Create **without** setting `frontend_invoker` + pub fn new( + node_id: Option, + query_engine: Arc, + table_meta: TableMetadataManagerRef, + ) -> Self { + let srv_map = TableSource::new( + table_meta.table_info_manager().clone(), + table_meta.table_name_manager().clone(), + ); + let node_context = FlownodeContext::default(); + let tick_manager = FlowTickManager::new(); + let worker_handles = Vec::new(); + FlownodeManager { + worker_handles, + query_engine, + table_info_source: srv_map, + frontend_invoker: RwLock::new(None), + node_context: Mutex::new(node_context), + flow_err_collectors: Default::default(), + src_send_buf_lens: Default::default(), + tick_manager, + node_id, + } + } + + /// Create a flownode manager with one worker + pub fn new_with_worker<'s>( + node_id: Option, + query_engine: Arc, + table_meta: TableMetadataManagerRef, + ) -> (Self, 
Worker<'s>) { + let mut zelf = Self::new(node_id, query_engine, table_meta); + let (handle, worker) = create_worker(); + zelf.add_worker_handle(handle); + (zelf, worker) + } + + /// add a worker handler to manager, meaning this corresponding worker is under it's manage + pub fn add_worker_handle(&mut self, handle: WorkerHandle) { + self.worker_handles.push(Mutex::new(handle)); + } +} + +#[derive(Debug)] +pub enum DiffRequest { + Insert(Vec<(Row, repr::Timestamp)>), + Delete(Vec<(Row, repr::Timestamp)>), +} + +/// iterate through the diff row and form continuous diff row with same diff type +pub fn diff_row_to_request(rows: Vec) -> Vec { + let mut reqs = Vec::new(); + for (row, ts, diff) in rows { + let last = reqs.last_mut(); + match (last, diff) { + (Some(DiffRequest::Insert(rows)), 1) => { + rows.push((row, ts)); + } + (Some(DiffRequest::Insert(_)), -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])), + (Some(DiffRequest::Delete(rows)), -1) => { + rows.push((row, ts)); + } + (Some(DiffRequest::Delete(_)), 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])), + (None, 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])), + (None, -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])), + _ => {} + } + } + reqs +} + +/// This impl block contains methods to send writeback requests to frontend +impl FlownodeManager { + /// TODO(discord9): merge all same type of diff row into one requests + /// + /// Return the number of requests it made + pub async fn send_writeback_requests(&self) -> Result { + let all_reqs = self.generate_writeback_request().await; + if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) { + return Ok(0); + } + let mut req_cnt = 0; + for (table_name, reqs) in all_reqs { + if reqs.is_empty() { + continue; + } + let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); + let ctx = Arc::new(QueryContext::with(&catalog, &schema)); + // TODO(discord9): instead of auto build table from request schema, actually build table + // before `create flow` to be able to assign pk and ts etc. + let (primary_keys, schema) = if let Some(table_id) = self + .table_info_source + .get_table_id_from_name(&table_name) + .await? + { + let table_info = self + .table_info_source + .get_table_info_value(&table_id) + .await? + .unwrap(); + let meta = table_info.table_info.meta; + let primary_keys = meta + .primary_key_indices + .into_iter() + .map(|i| meta.schema.column_schemas[i].name.clone()) + .collect_vec(); + let schema = meta.schema.column_schemas; + (primary_keys, schema) + } else { + // TODO(discord9): get ts column from `RelationType` once we are done rewriting flow plan to attach ts + let (primary_keys, schema) = { + let node_ctx = self.node_context.lock().await; + let gid: GlobalId = node_ctx + .table_repr + .get_by_name(&table_name) + .map(|x| x.1) + .unwrap(); + let schema = node_ctx + .schema + .get(&gid) + .with_context(|| TableNotFoundSnafu { + name: format!("Table name = {:?}", table_name), + })? 
+ .clone(); + // TODO(discord9): use default key from schema + let primary_keys = schema + .keys + .first() + .map(|v| { + v.column_indices + .iter() + .map(|i| format!("Col_{i}")) + .collect_vec() + }) + .unwrap_or_default(); + let ts_col = ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true); + + let wout_ts = schema + .column_types + .into_iter() + .enumerate() + .map(|(idx, typ)| { + ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable) + }) + .collect_vec(); + let mut with_ts = wout_ts.clone(); + with_ts.push(ts_col); + (primary_keys, with_ts) + }; + (primary_keys, schema) + }; + + let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; + + debug!( + "Sending {} writeback requests to table {}, reqs={:?}", + reqs.len(), + table_name.join("."), + reqs + ); + + for req in reqs { + match req { + DiffRequest::Insert(insert) => { + let rows_proto: Vec = insert + .into_iter() + .map(|(mut row, _ts)| { + row.extend(Some(Value::from( + common_time::Timestamp::new_millisecond(0), + ))); + row.into() + }) + .collect::>(); + let table_name = table_name.last().unwrap().clone(); + let req = RowInsertRequest { + table_name, + rows: Some(v1::Rows { + schema: proto_schema.clone(), + rows: rows_proto, + }), + }; + req_cnt += 1; + self.frontend_invoker + .read() + .await + .as_ref() + .with_context(|| UnexpectedSnafu { + reason: "Expect a frontend invoker for flownode to write back", + })? + .row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone()) + .await + .map_err(BoxedError::new) + .with_context(|_| ExternalSnafu {})?; + } + DiffRequest::Delete(remove) => { + info!("original remove rows={:?}", remove); + let rows_proto: Vec = remove + .into_iter() + .map(|(mut row, _ts)| { + row.extend(Some(Value::from( + common_time::Timestamp::new_millisecond(0), + ))); + row.into() + }) + .collect::>(); + let table_name = table_name.last().unwrap().clone(); + let req = RowDeleteRequest { + table_name, + rows: Some(v1::Rows { + schema: proto_schema.clone(), + rows: rows_proto, + }), + }; + + req_cnt += 1; + self.frontend_invoker + .read() + .await + .as_ref() + .with_context(|| UnexpectedSnafu { + reason: "Expect a frontend invoker for flownode to write back", + })? 
+ .row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone()) + .await + .map_err(BoxedError::new) + .with_context(|_| ExternalSnafu {})?; + } + } + } + } + Ok(req_cnt) + } + + /// Generate writeback request for all sink table + pub async fn generate_writeback_request(&self) -> BTreeMap> { + let mut output = BTreeMap::new(); + for (name, sink_recv) in self + .node_context + .lock() + .await + .sink_receiver + .iter_mut() + .map(|(n, (_s, r))| (n, r)) + { + let mut rows = Vec::new(); + while let Ok(row) = sink_recv.try_recv() { + rows.push(row); + } + let reqs = diff_row_to_request(rows); + output.insert(name.clone(), reqs); + } + output + } +} + +/// Flow Runtime related methods +impl FlownodeManager { + /// run in common_runtime background runtime + pub fn run_background(self: Arc) -> JoinHandle<()> { + info!("Starting flownode manager's background task"); + common_runtime::spawn_bg(async move { + self.run().await; + }) + } + + /// log all flow errors + pub async fn log_all_errors(&self) { + for (f_id, f_err) in self.flow_err_collectors.read().await.iter() { + let all_errors = f_err.get_all().await; + if !all_errors.is_empty() { + let all_errors = all_errors + .into_iter() + .map(|i| format!("{:?}", i)) + .join("\n"); + common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors); + } + } + } + + /// Trigger dataflow running, and then send writeback request to the source sender + /// + /// note that this method didn't handle input mirror request, as this should be handled by grpc server + pub async fn run(&self) { + debug!("Starting to run"); + loop { + // TODO(discord9): only run when new inputs arrive or scheduled to + self.run_available().await.unwrap(); + // TODO(discord9): error handling + self.send_writeback_requests().await.unwrap(); + self.log_all_errors().await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + + /// Run all available subgraph in the flow node + /// This will try to run all dataflow in this node + /// + /// However this is not blocking and can sometimes return while actual computation is still running in worker thread + /// TODO(discord9): add flag for subgraph that have input since last run + pub async fn run_available(&self) -> Result<(), Error> { + let now = self.tick_manager.tick(); + + loop { + for worker in self.worker_handles.iter() { + // TODO(discord9): consider how to handle error in individual worker + worker.lock().await.run_available(now).await.unwrap(); + } + // first check how many inputs were sent + let send_cnt = match self.node_context.lock().await.flush_all_sender() { + Ok(cnt) => cnt, + Err(err) => { + common_telemetry::error!("Flush send buf errors: {:?}", err); + break; + } + }; + // if no inputs + if send_cnt == 0 { + break; + } else { + debug!("FlownodeManager::run_available: send_cnt={}", send_cnt); + } + } + + Ok(()) + } + + /// send write request to related source sender + pub async fn handle_write_request( + &self, + region_id: RegionId, + rows: Vec, + ) -> Result<(), Error> { + debug!( + "Handling write request for region_id={:?} with {} rows", + region_id, + rows.len() + ); + let table_id = region_id.table_id(); + self.node_context.lock().await.send(table_id, rows)?; + Ok(()) + } +} + +/// Create&Remove flow +impl FlownodeManager { + /// remove a flow by it's id + pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { + for handle in self.worker_handles.iter() { + let handle = handle.lock().await; + if handle.contains_flow(flow_id).await? 
{ + handle.remove_flow(flow_id).await?; + break; + } + } + Ok(()) + } + + /// Return task id if a new task is created, otherwise return None + /// + /// steps to create task: + /// 1. parse query into typed plan(and optional parse expire_when expr) + /// 2. render source/sink with output table id and used input table id + #[allow(clippy::too_many_arguments)] + pub async fn create_flow( + &self, + flow_id: FlowId, + sink_table_name: TableName, + source_table_ids: &[TableId], + create_if_not_exist: bool, + expire_when: Option, + comment: Option, + sql: String, + flow_options: HashMap, + query_ctx: Option, + ) -> Result, Error> { + if create_if_not_exist { + // check if the task already exists + for handle in self.worker_handles.iter() { + if handle.lock().await.contains_flow(flow_id).await? { + return Ok(None); + } + } + } + + let mut node_ctx = self.node_context.lock().await; + // assign global id to source and sink table + for source in source_table_ids { + node_ctx + .assign_global_id_to_table(&self.table_info_source, None, Some(*source)) + .await?; + } + node_ctx + .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None) + .await?; + + node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone()); + + node_ctx.query_context = query_ctx.map(Arc::new); + // construct a active dataflow state with it + let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?; + debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); + node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?; + + let expire_when = expire_when + .and_then(|s| { + if s.is_empty() || s.split_whitespace().join("").is_empty() { + None + } else { + Some(s) + } + }) + .map(|d| { + let d = d.as_ref(); + parse_fixed(d) + .map(|(_, n)| n) + .map_err(|err| err.to_string()) + }) + .transpose() + .map_err(|err| UnexpectedSnafu { reason: err }.build())?; + let _ = comment; + let _ = flow_options; + + // TODO(discord9): add more than one handles + let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1; + let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?; + + let source_ids = source_table_ids + .iter() + .map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1) + .collect_vec(); + let source_receivers = source_ids + .iter() + .map(|id| { + node_ctx + .get_source_by_global_id(id) + .map(|s| s.get_receiver()) + }) + .collect::, _>>()?; + let err_collector = ErrCollector::default(); + self.flow_err_collectors + .write() + .await + .insert(flow_id, err_collector.clone()); + let handle = &self.worker_handles[0].lock().await; + let create_request = worker::Request::Create { + flow_id, + plan: flow_plan, + sink_id, + sink_sender, + source_ids, + src_recvs: source_receivers, + expire_when, + create_if_not_exist, + err_collector, + }; + handle.create_flow(create_request).await?; + info!("Successfully create flow with id={}", flow_id); + Ok(Some(flow_id)) + } +} + +/// FlowTickManager is a manager for flow tick, which trakc flow execution progress +/// +/// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid +/// TSO coord mess +#[derive(Clone)] +pub struct FlowTickManager { + anchor: Anchor, +} + +impl std::fmt::Debug for FlowTickManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FlowTickManager").finish() + } +} + +impl FlowTickManager { + pub fn new() -> Self { + FlowTickManager { + anchor: Anchor::new(), + } + } + + /// Return the current 
timestamp in milliseconds + pub fn tick(&self) -> repr::Timestamp { + (minstant::Instant::now().as_unix_nanos(&self.anchor) / 1_000_000) as repr::Timestamp + } +} diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs new file mode 100644 index 000000000000..057d8f932ed3 --- /dev/null +++ b/src/flow/src/adapter/flownode_impl.rs @@ -0,0 +1,117 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! impl `FlowNode` trait for FlowNodeManager so standalone can call them + +use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse}; +use api::v1::region::InsertRequests; +use common_error::ext::BoxedError; +use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; +use common_meta::node_manager::Flownode; +use itertools::Itertools; +use snafu::ResultExt; + +use crate::adapter::FlownodeManager; +use crate::repr::{self, DiffRow}; + +fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error { + // TODO(discord9): refactor this + Err::<(), _>(BoxedError::new(err)) + .with_context(|_| ExternalSnafu) + .unwrap_err() +} + +#[async_trait::async_trait] +impl Flownode for FlownodeManager { + async fn handle(&self, request: FlowRequest) -> Result { + let query_ctx = request + .header + .and_then(|h| h.query_context) + .map(|ctx| ctx.into()); + match request.body { + Some(flow_request::Body::Create(CreateRequest { + flow_id: Some(task_id), + source_table_ids, + sink_table_name: Some(sink_table_name), + create_if_not_exists, + expire_when, + comment, + sql, + flow_options, + })) => { + let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec(); + let sink_table_name = [ + sink_table_name.catalog_name, + sink_table_name.schema_name, + sink_table_name.table_name, + ]; + let ret = self + .create_flow( + task_id.id as u64, + sink_table_name, + &source_table_ids, + create_if_not_exists, + Some(expire_when), + Some(comment), + sql, + flow_options, + query_ctx, + ) + .await + .map_err(to_meta_err)?; + Ok(FlowResponse { + affected_flows: ret + .map(|id| greptime_proto::v1::FlowId { id: id as u32 }) + .into_iter() + .collect_vec(), + ..Default::default() + }) + } + Some(flow_request::Body::Drop(DropRequest { + flow_id: Some(flow_id), + })) => { + self.remove_flow(flow_id.id as u64) + .await + .map_err(to_meta_err)?; + Ok(Default::default()) + } + None => UnexpectedSnafu { + err_msg: "Missing request body", + } + .fail(), + _ => UnexpectedSnafu { + err_msg: "Invalid request body.", + } + .fail(), + } + } + + async fn handle_inserts(&self, request: InsertRequests) -> Result { + for write_request in request.requests { + let region_id = write_request.region_id; + let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]); + // TODO(discord9): reconsider time assignment mechanism + let now = self.tick_manager.tick(); + let rows: Vec = rows_proto + .into_iter() + .map(repr::Row::from) + .map(|r| (r, now, 1)) + .collect_vec(); + 
self.handle_write_request(region_id.into(), rows) + .await + .map_err(to_meta_err)?; + } + Ok(Default::default()) + } +} diff --git a/src/flow/src/adapter/parse_expr.rs b/src/flow/src/adapter/parse_expr.rs new file mode 100644 index 000000000000..3a28e813d597 --- /dev/null +++ b/src/flow/src/adapter/parse_expr.rs @@ -0,0 +1,245 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! parse expr like "ts <= now() - interval '5 m'" + +use nom::branch::alt; +use nom::bytes::complete::{tag, tag_no_case}; +use nom::character::complete::{alphanumeric1, digit0, multispace0}; +use nom::combinator::peek; +use nom::sequence::tuple; +use nom::IResult; + +use crate::repr; + +#[test] +fn test_parse_duration() { + let input = "1 h 5 m 42 second"; + let (remain, ttl) = parse_duration(input).unwrap(); + assert_eq!(remain, ""); + assert_eq!(ttl, (3600 + 5 * 60 + 42) * 1000); +} + +#[test] +fn test_parse_fixed() { + let input = "timestamp < now() - INTERVAL '5m 42s'"; + let (remain, ttl) = parse_fixed(input).unwrap(); + assert_eq!(remain, ""); + assert_eq!(ttl, (5 * 60 + 42) * 1000); +} + +pub fn parse_fixed(input: &str) -> IResult<&str, i64> { + let (r, _) = tuple(( + multispace0, + tag_no_case("timestamp"), + multispace0, + tag("<"), + multispace0, + tag_no_case("now()"), + multispace0, + tag("-"), + multispace0, + tag_no_case("interval"), + multispace0, + ))(input)?; + tuple((tag("'"), parse_duration, tag("'")))(r).map(|(r, (_, ttl, _))| (r, ttl)) +} + +/// parse duration and return ttl, currently only support time part of psql interval type +pub fn parse_duration(input: &str) -> IResult<&str, i64> { + let mut intervals = vec![]; + let mut remain = input; + while peek(parse_quality)(remain).is_ok() { + let (r, number) = parse_quality(remain)?; + let (r, unit) = parse_time_unit(r)?; + intervals.push((number, unit)); + remain = r; + } + let mut total = 0; + for (number, unit) in intervals { + let number = match unit { + TimeUnit::Second => number, + TimeUnit::Minute => number * 60, + TimeUnit::Hour => number * 60 * 60, + }; + total += number; + } + total *= 1000; + Ok((remain, total)) +} + +enum Expr { + Col(String), + Now, + Duration(repr::Duration), + Binary { + left: Box, + op: String, + right: Box, + }, +} + +fn parse_expr(input: &str) -> IResult<&str, Expr> { + parse_expr_bp(input, 0) +} + +/// a simple pratt parser +fn parse_expr_bp(input: &str, min_bp: u8) -> IResult<&str, Expr> { + let (mut input, mut lhs): (&str, Expr) = parse_item(input)?; + loop { + let (r, op) = parse_op(input)?; + let (_, (l_bp, r_bp)) = infix_binding_power(op)?; + if l_bp < min_bp { + return Ok((input, lhs)); + } + let (r, rhs) = parse_expr_bp(r, r_bp)?; + input = r; + lhs = Expr::Binary { + left: Box::new(lhs), + op: op.to_string(), + right: Box::new(rhs), + }; + } +} + +fn parse_op(input: &str) -> IResult<&str, &str> { + alt((parse_add_sub, parse_cmp))(input) +} + +fn parse_item(input: &str) -> IResult<&str, Expr> { + if let Ok((r, name)) = parse_col_name(input) { + Ok((r, 
Expr::Col(name.to_string()))) + } else if let Ok((r, _now)) = parse_now(input) { + Ok((r, Expr::Now)) + } else if let Ok((_r, _num)) = parse_quality(input) { + todo!() + } else { + todo!() + } +} + +fn infix_binding_power(op: &str) -> IResult<&str, (u8, u8)> { + let ret = match op { + "<" | ">" | "<=" | ">=" => (1, 2), + "+" | "-" => (3, 4), + _ => { + return Err(nom::Err::Error(nom::error::Error::new( + op, + nom::error::ErrorKind::Fail, + ))) + } + }; + Ok((op, ret)) +} + +fn parse_col_name(input: &str) -> IResult<&str, &str> { + tuple((multispace0, alphanumeric1, multispace0))(input).map(|(r, (_, name, _))| (r, name)) +} + +fn parse_now(input: &str) -> IResult<&str, &str> { + tag_no_case("now()")(input) +} + +fn parse_add_sub(input: &str) -> IResult<&str, &str> { + tuple((multispace0, alt((tag("+"), tag("-"))), multispace0))(input) + .map(|(r, (_, op, _))| (r, op)) +} + +fn parse_cmp(input: &str) -> IResult<&str, &str> { + tuple(( + multispace0, + alt((tag("<="), tag(">="), tag("<"), tag(">"))), + multispace0, + ))(input) + .map(|(r, (_, op, _))| (r, op)) +} + +/// parse a number with optional sign +fn parse_quality(input: &str) -> IResult<&str, repr::Duration> { + tuple(( + multispace0, + alt((tag("+"), tag("-"), tag(""))), + digit0, + multispace0, + ))(input) + .map(|(r, (_, sign, name, _))| (r, sign, name)) + .and_then(|(r, sign, name)| { + let num = name.parse::().map_err(|_| { + nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit)) + })?; + let num = match sign { + "+" => num, + "-" => -num, + _ => num, + }; + Ok((r, num)) + }) +} + +#[derive(Debug, Clone)] +enum TimeUnit { + Second, + Minute, + Hour, +} + +#[derive(Debug, Clone)] +enum DateUnit { + Day, + Month, + Year, +} + +fn parse_time_unit(input: &str) -> IResult<&str, TimeUnit> { + fn to_second(input: &str) -> IResult<&str, TimeUnit> { + alt(( + tag_no_case("second"), + tag_no_case("seconds"), + tag_no_case("S"), + ))(input) + .map(move |(r, _)| (r, TimeUnit::Second)) + } + fn to_minute(input: &str) -> IResult<&str, TimeUnit> { + alt(( + tag_no_case("minute"), + tag_no_case("minutes"), + tag_no_case("m"), + ))(input) + .map(move |(r, _)| (r, TimeUnit::Minute)) + } + fn to_hour(input: &str) -> IResult<&str, TimeUnit> { + alt((tag_no_case("hour"), tag_no_case("hours"), tag_no_case("h")))(input) + .map(move |(r, _)| (r, TimeUnit::Hour)) + } + + tuple(( + multispace0, + alt(( + to_second, to_minute, + to_hour, /* + tag_no_case("day"), + tag_no_case("days"), + tag_no_case("d"), + tag_no_case("month"), + tag_no_case("months"), + tag_no_case("m"), + tag_no_case("year"), + tag_no_case("years"), + tag_no_case("y"), + */ + )), + multispace0, + ))(input) + .map(|(r, (_, unit, _))| (r, unit)) +} diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/adapter/server.rs new file mode 100644 index 000000000000..c0d0854572c7 --- /dev/null +++ b/src/flow/src/adapter/server.rs @@ -0,0 +1,147 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Implementation of grpc service for flow node + +use std::net::SocketAddr; + +use common_meta::node_manager::Flownode; +use common_telemetry::tracing::info; +use futures::FutureExt; +use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; +use itertools::Itertools; +use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; +use snafu::{ensure, ResultExt}; +use tokio::net::TcpListener; +use tokio::sync::{oneshot, Mutex}; +use tonic::transport::server::TcpIncoming; +use tonic::{Request, Response, Status}; + +use crate::adapter::FlownodeManagerRef; +pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; + +/// wrapping flow node manager to avoid orphan rule with Arc<...> +#[derive(Clone)] +pub struct FlowService { + pub manager: FlownodeManagerRef, +} + +#[async_trait::async_trait] +impl flow_server::Flow for FlowService { + async fn handle_create_remove( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + self.manager + .handle(request) + .await + .map(Response::new) + .map_err(|e| { + let msg = format!("failed to handle request: {:?}", e); + Status::internal(msg) + }) + } + + async fn handle_mirror_request( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define + let request = api::v1::region::InsertRequests { + requests: request + .requests + .into_iter() + .map(|insert| api::v1::region::InsertRequest { + region_id: insert.region_id, + rows: insert.rows, + }) + .collect_vec(), + }; + self.manager + .handle_inserts(request) + .await + .map(Response::new) + .map_err(|e| { + let msg = format!("failed to handle request: {:?}", e); + Status::internal(msg) + }) + } +} + +pub struct FlownodeServer { + pub shutdown_tx: Mutex>>, + pub flow_service: FlowService, +} + +impl FlownodeServer { + pub fn create_flow_service(&self) -> flow_server::FlowServer { + flow_server::FlowServer::new(self.flow_service.clone()) + } +} + +#[async_trait::async_trait] +impl servers::server::Server for FlownodeServer { + async fn shutdown(&self) -> Result<(), servers::error::Error> { + let mut shutdown_tx = self.shutdown_tx.lock().await; + if let Some(tx) = shutdown_tx.take() { + if tx.send(()).is_err() { + info!("Receiver dropped, the flow node server has already shutdown"); + } + } + info!("Shutdown flow node server"); + + Ok(()) + } + async fn start(&self, addr: SocketAddr) -> Result { + let (tx, rx) = oneshot::channel::<()>(); + let (incoming, addr) = { + let mut shutdown_tx = self.shutdown_tx.lock().await; + ensure!( + shutdown_tx.is_none(), + AlreadyStartedSnafu { server: "flow" } + ); + let listener = TcpListener::bind(addr) + .await + .context(TcpBindSnafu { addr })?; + let addr = listener.local_addr().context(TcpBindSnafu { addr })?; + let incoming = + TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?; + info!("flow server is bound to {}", addr); + + *shutdown_tx = Some(tx); + + (incoming, addr) + }; + + let builder = tonic::transport::Server::builder().add_service(self.create_flow_service()); + let _handle = common_runtime::spawn_bg(async move { + let _result = builder + .serve_with_incoming_shutdown(incoming, rx.map(drop)) + .await + .context(StartGrpcSnafu); + }); + + // TODO(discord9): better place for dataflow to run per second + let manager_ref = self.flow_service.manager.clone(); + let _handle = manager_ref.clone().run_background(); + + 
Ok(addr) + } + + fn name(&self) -> &str { + FLOW_NODE_SERVER_NAME + } +} diff --git a/src/flow/src/adapter/tests.rs b/src/flow/src/adapter/tests.rs new file mode 100644 index 000000000000..4690ff54f021 --- /dev/null +++ b/src/flow/src/adapter/tests.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Mock test for adapter module +//! TODO(discord9): write mock test + +use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use store_api::storage::ConcreteDataType; +use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; + +use super::*; + +pub fn new_test_table_info_with_name>( + table_id: TableId, + table_name: &str, + region_numbers: I, +) -> TableInfo { + let column_schemas = vec![ + ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ]; + let schema = SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(123) + .build() + .unwrap(); + + let meta = TableMetaBuilder::default() + .schema(Arc::new(schema)) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .region_numbers(region_numbers.into_iter().collect::>()) + .build() + .unwrap(); + TableInfoBuilder::default() + .table_id(table_id) + .table_version(5) + .name(table_name) + .meta(meta) + .build() + .unwrap() +} + +/// Create a mock harness for flow node manager +/// +/// containing several default table info and schema +fn mock_harness_flow_node_manager() {} diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index 77f3e4105382..33ecb9670caa 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -36,6 +36,7 @@ impl<'referred, 'df> Context<'referred, 'df> { &mut self, mut src_recv: broadcast::Receiver, ) -> Result { + debug!("Rendering Source"); let (send_port, recv_port) = self.df.make_edge::<_, Toff>("source"); let arrange_handler = self.compute_state.new_arrange(None); let arrange_handler_inner = @@ -60,7 +61,6 @@ impl<'referred, 'df> Context<'referred, 'df> { let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d)); let mut to_send = Vec::new(); let mut to_arrange = Vec::new(); - // TODO(discord9): handling tokio broadcast error while let Ok((r, t, d)) = src_recv.try_recv() { if t <= now { @@ -72,7 +72,7 @@ impl<'referred, 'df> Context<'referred, 'df> { let all = prev_avail.chain(to_send).collect_vec(); if !all.is_empty() || !to_arrange.is_empty() { debug!( - "All send: {} rows, not yet send: {} rows", + "Rendered Source All send: {} rows, not yet send: {} rows", all.len(), to_arrange.len() ); diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index af51d0a53a2c..098de9c102e1 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -43,6 +43,7 @@ impl TypedExpr { } } +/// TODO(discord9): add tumble function here /// A scalar expression, which can be 
evaluated to a value. #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum ScalarExpr { diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index dac53002cbc1..950920bf9f7f 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -30,3 +30,8 @@ mod plan; mod repr; mod transform; mod utils; + +pub use adapter::{ + start_flow_node_with_one_worker, FlownodeBuilder, FlownodeManager, FlownodeManagerRef, + FlownodeOptions, +}; diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 911c7fd30b11..5c9e7a46f65a 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -31,16 +31,19 @@ use snafu::{OptionExt, ResultExt}; use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result}; -pub struct StandaloneDatanodeManager(pub RegionServer); +pub struct StandaloneDatanodeManager { + pub region_server: RegionServer, + pub flow_server: FlownodeRef, +} #[async_trait] impl NodeManager for StandaloneDatanodeManager { async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { - RegionInvoker::arc(self.0.clone()) + RegionInvoker::arc(self.region_server.clone()) } async fn flownode(&self, _node: &Peer) -> FlownodeRef { - unimplemented!() + self.flow_server.clone() } } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 01681c37b0c1..fe6e3e3b0600 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -39,6 +39,7 @@ common-wal.workspace = true datanode = { workspace = true } datatypes.workspace = true dotenv.workspace = true +flow.workspace = true frontend = { workspace = true, features = ["testing"] } futures.workspace = true futures-util.workspace = true diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 4d99c9744fab..85d7cbf946e1 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -35,6 +35,7 @@ use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::datanode::DatanodeBuilder; +use flow::FlownodeBuilder; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; @@ -128,6 +129,7 @@ impl GreptimeDbStandaloneBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let layered_cache_builder = LayeredCacheRegistryBuilder::default(); @@ -149,7 +151,19 @@ impl GreptimeDbStandaloneBuilder { ) .await; - let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); + let flow_builder = FlownodeBuilder::new( + Default::default(), + plugins.clone(), + table_metadata_manager.clone(), + catalog_manager.clone(), + ) + .with_kv_backend(kv_backend.clone()); + let flownode = Arc::new(flow_builder.build().await); + + let node_manager = Arc::new(StandaloneDatanodeManager { + region_server: datanode.region_server(), + flow_server: flownode.clone(), + }); let table_id_sequence = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) @@ -204,6 +218,11 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); + flownode + .set_frontend_invoker(Box::new(instance.clone())) + .await; + let _node_handle = 
flownode.run_background(); + procedure_manager.start().await.unwrap(); wal_options_allocator.start().await.unwrap();
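
Note: taken together, the standalone changes in this patch wire the flow node in a fixed order — the flow node is built before the datanode so it can be handed to the node manager, and the frontend invoker is hooked in only after the frontend exists, after which the per-second background loop starts. A condensed sketch of that order follows (identifiers as in the hunks above; error handling and unrelated setup elided, illustrative only, not an additional hunk):

    // Build the flow node first so the node manager can route flow requests to it.
    let flow_builder = FlownodeBuilder::new(
        Default::default(), // FlownodeOptions
        plugins.clone(),
        table_metadata_manager.clone(),
        catalog_manager.clone(),
    )
    .with_kv_backend(kv_backend.clone());
    let flownode = Arc::new(flow_builder.build().await);

    // The standalone node manager now carries both the region server and the flow server.
    let node_manager = Arc::new(StandaloneDatanodeManager {
        region_server: datanode.region_server(),
        flow_server: flownode.clone(),
    });

    // The flownode writes results back through the frontend, so the invoker is set
    // only after the frontend is built, and the background flow loop starts last.
    flownode
        .set_frontend_invoker(Box::new(frontend.clone()))
        .await;
    let _handle = flownode.clone().run_background();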