Skip to content

Commit

Permalink
feat(processing_engine): error handling for triggers. (#26086)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse authored Mar 4, 2025
1 parent 357c05f commit c930d9e
Show file tree
Hide file tree
Showing 12 changed files with 370 additions and 154 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions influxdb3/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use base64::Engine as _;
use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64;
use hashbrown::HashMap;
use influxdb3_client::Client;
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_wal::{ErrorBehavior, TriggerSettings, TriggerSpecificationDefinition};
use rand::RngCore;
use rand::rngs::OsRng;
use secrecy::ExposeSecret;
Expand Down Expand Up @@ -228,8 +228,12 @@ pub struct TriggerConfig {
/// Create trigger in disabled state
#[clap(long)]
disabled: bool,
/// Run each instance of the trigger asynchronously, allowing multiple triggers to run simultaneously.
#[clap(long)]
run_asynchronous: bool,
/// How you wish the system to respond in the event of an error from the plugin
#[clap(long, value_enum, default_value_t = ErrorBehavior::Log)]
error_behavior: ErrorBehavior,
/// Name for the new trigger
trigger_name: String,
}
Expand Down Expand Up @@ -353,13 +357,19 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_arguments,
disabled,
run_asynchronous,
error_behavior,
}) => {
let trigger_arguments: Option<HashMap<String, String>> = trigger_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});

let trigger_settings = TriggerSettings {
run_async: run_asynchronous,
error_behavior,
};

match client
.api_v3_configure_processing_engine_trigger_create(
database_name,
Expand All @@ -368,7 +378,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_specification.string_rep(),
trigger_arguments,
disabled,
run_asynchronous,
trigger_settings,
)
.await
{
Expand Down
8 changes: 4 additions & 4 deletions influxdb3_catalog/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, LastCacheValueColumnsDef, PluginType,
TriggerDefinition, TriggerFlag,
TriggerDefinition, TriggerSettings,
};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
Expand Down Expand Up @@ -155,7 +155,7 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
trigger_name: trigger.trigger_name,
plugin_filename: trigger.plugin_filename,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
flags: trigger.flags,
trigger_settings: trigger.trigger_settings,
trigger_arguments: trigger.trigger_arguments,
disabled: trigger.disabled,
database_name: trigger.database_name,
Expand Down Expand Up @@ -226,7 +226,7 @@ struct ProcessingEngineTriggerSnapshot {
pub plugin_filename: String,
pub database_name: String,
pub trigger_specification: String,
pub flags: Vec<TriggerFlag>,
pub trigger_settings: TriggerSettings,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,
}
Expand Down Expand Up @@ -475,7 +475,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(),
plugin_filename: trigger.plugin_filename.to_string(),
database_name: trigger.database_name.to_string(),
flags: trigger.flags.clone(),
trigger_settings: trigger.trigger_settings,
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
trigger_arguments: trigger.trigger_arguments.clone(),
Expand Down
11 changes: 3 additions & 8 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use url::Url;

use influxdb3_types::http::*;
pub use influxdb3_types::write::Precision;
use influxdb3_wal::TriggerFlag;
use influxdb3_wal::TriggerSettings;

/// Primary error type for the [`Client`]
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -466,13 +466,8 @@ impl Client {
trigger_spec: impl Into<String> + Send,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
execute_async: bool,
trigger_settings: TriggerSettings,
) -> Result<()> {
let flags = if execute_async {
vec![TriggerFlag::ExecuteAsynchronously]
} else {
vec![]
};
let _bytes = self
.send_json_get_bytes(
Method::POST,
Expand All @@ -482,7 +477,7 @@ impl Client {
trigger_name: trigger_name.into(),
plugin_filename: plugin_filename.into(),
trigger_specification: trigger_spec.into(),
flags,
trigger_settings,
trigger_arguments,
disabled,
}),
Expand Down
36 changes: 25 additions & 11 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use influxdb3_types::http::{
use influxdb3_wal::PluginType;
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteTriggerDefinition, SnapshotDetails, TriggerDefinition,
TriggerFlag, TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents,
TriggerIdentifier, TriggerSettings, TriggerSpecificationDefinition, Wal, WalContents,
WalFileNotifier, WalOp,
};
use influxdb3_write::WriteBuffer;
Expand Down Expand Up @@ -346,7 +346,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_settings: TriggerSettings,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
Expand All @@ -362,7 +362,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
trigger_name,
plugin_filename,
trigger: trigger_specification,
flags,
trigger_settings,
trigger_arguments,
disabled,
database_name: db_name.to_string(),
Expand Down Expand Up @@ -428,6 +428,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
&self,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
processing_engine_manager: Arc<dyn ProcessingEngineManager>,
db_name: &str,
trigger_name: &str,
) -> Result<(), ProcessingEngineError> {
Expand All @@ -451,6 +452,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
write_buffer,
query_executor,
sys_event_store: Arc::clone(&self.sys_event_store),
manager: processing_engine_manager,
};
let plugin_code = Arc::new(self.read_plugin_code(&trigger.plugin_filename).await?);
match trigger.trigger.plugin_type() {
Expand Down Expand Up @@ -578,6 +580,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
&self,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
manager: Arc<dyn ProcessingEngineManager>,
db_name: &str,
trigger_name: &str,
) -> Result<(), ProcessingEngineError> {
Expand Down Expand Up @@ -611,17 +614,21 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
self.wal.write_ops(vec![wal_op]).await?;
}

self.run_trigger(write_buffer, query_executor, db_name, trigger_name)
self.run_trigger(write_buffer, query_executor, manager, db_name, trigger_name)
.await?;
Ok(())
}

async fn start_triggers(&self) -> Result<(), ProcessingEngineError> {
async fn start_triggers(
&self,
manager: Arc<dyn ProcessingEngineManager>,
) -> Result<(), ProcessingEngineError> {
let triggers = self.catalog.active_triggers();
for (db_name, trigger_name) in triggers {
self.run_trigger(
Arc::clone(&self.write_buffer),
Arc::clone(&self.query_executor),
Arc::clone(&manager),
&db_name,
&trigger_name,
)
Expand Down Expand Up @@ -803,7 +810,7 @@ mod tests {
use influxdb3_catalog::catalog;
use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor;
use influxdb3_sys_events::SysEventStore;
use influxdb3_wal::{Gen1Duration, TriggerSpecificationDefinition, WalConfig};
use influxdb3_wal::{Gen1Duration, TriggerSettings, TriggerSpecificationDefinition, WalConfig};
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs};
use influxdb3_write::{Precision, WriteBuffer};
Expand Down Expand Up @@ -840,6 +847,8 @@ mod tests {

// convert to Arc<WriteBuffer>
let write_buffer: Arc<dyn WriteBuffer> = Arc::clone(&pem.write_buffer);
let query_executor = Arc::clone(&pem.query_executor);
let pem: Arc<dyn ProcessingEngineManager> = Arc::new(pem);

// Create the DB by inserting a line.
write_buffer
Expand All @@ -858,7 +867,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSettings::default(),
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
false,
Expand All @@ -868,7 +877,8 @@ mod tests {
// Run the trigger
pem.run_trigger(
Arc::clone(&write_buffer),
Arc::clone(&pem.query_executor),
Arc::clone(&query_executor),
Arc::clone(&pem),
"foo",
"test_trigger",
)
Expand All @@ -891,7 +901,8 @@ mod tests {
let result = pem
.enable_trigger(
Arc::clone(&write_buffer),
Arc::clone(&pem.query_executor),
Arc::clone(&query_executor),
Arc::clone(&pem),
"foo",
"test_trigger",
)
Expand Down Expand Up @@ -944,7 +955,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSettings::default(),
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
true,
Expand Down Expand Up @@ -978,6 +989,8 @@ mod tests {
let (pem, _file_name) = setup(start_time, test_store, wal_config).await;

let write_buffer: Arc<dyn WriteBuffer> = Arc::clone(&pem.write_buffer);
let query_executor = Arc::clone(&pem.query_executor);
let pem: Arc<dyn ProcessingEngineManager> = Arc::new(pem);

// Create the DB by inserting a line.
write_buffer
Expand All @@ -994,7 +1007,8 @@ mod tests {
let result = pem
.enable_trigger(
Arc::clone(&write_buffer),
Arc::clone(&pem.query_executor),
Arc::clone(&query_executor),
Arc::clone(&pem),
"foo",
"nonexistent_trigger",
)
Expand Down
11 changes: 8 additions & 3 deletions influxdb3_processing_engine/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use influxdb3_types::http::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use influxdb3_wal::{TriggerFlag, TriggerSpecificationDefinition};
use influxdb3_wal::{TriggerSettings, TriggerSpecificationDefinition};
use influxdb3_write::WriteBuffer;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -60,7 +60,7 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_settings: TriggerSettings,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
Expand All @@ -78,6 +78,7 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
&self,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
processing_engine_manager: Arc<dyn ProcessingEngineManager>,
db_name: &str,
trigger_name: &str,
) -> Result<(), ProcessingEngineError>;
Expand All @@ -92,11 +93,15 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
&self,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
manager: Arc<dyn ProcessingEngineManager>,
db_name: &str,
trigger_name: &str,
) -> Result<(), ProcessingEngineError>;

async fn start_triggers(&self) -> Result<(), ProcessingEngineError>;
async fn start_triggers(
&self,
manager: Arc<dyn ProcessingEngineManager>,
) -> Result<(), ProcessingEngineError>;

async fn test_wal_plugin(
&self,
Expand Down
Loading

0 comments on commit c930d9e

Please sign in to comment.