Skip to content

Commit

Permalink
Support typed SQL opts in with clause (#825)
Browse files: browse the repository at this point in the history
  • Loading branch information
mwylde authored Jan 24, 2025
1 parent 96a09de commit c4014a4
Show file tree
Hide file tree
Showing 35 changed files with 682 additions and 547 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions crates/arroyo-connectors/src/blackhole/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use arroyo_operator::operator::ConstructedOperator;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use arroyo_rpc::OperatorConfig;
use std::collections::HashMap;
use arroyo_rpc::{ConnectorOptions, OperatorConfig};

use crate::EmptyConfig;

Expand Down Expand Up @@ -75,7 +74,7 @@ impl Connector for BlackholeConnector {
fn from_options(
&self,
name: &str,
_options: &mut HashMap<String, String>,
_options: &mut ConnectorOptions,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection> {
Expand Down
23 changes: 12 additions & 11 deletions crates/arroyo-connectors/src/confluent/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::kafka;
use crate::kafka::{
KafkaConfig, KafkaConfigAuthentication, KafkaConnector, KafkaTable, KafkaTester, TableType,
};
use crate::{kafka, pull_opt};
use anyhow::anyhow;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::ConstructedOperator;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use arroyo_rpc::var_str::VarStr;
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::{ConnectorOptions, OperatorConfig};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::mpsc::Sender;
Expand All @@ -33,12 +33,13 @@ pub struct ConfluentConnector {}

impl ConfluentConnector {
pub fn connection_from_options(
opts: &mut HashMap<String, String>,
opts: &mut ConnectorOptions,
) -> anyhow::Result<ConfluentProfile> {
let schema_registry: Option<anyhow::Result<_>> =
opts.remove("schema_registry.endpoint").map(|endpoint| {
let api_key = VarStr::new(pull_opt("schema_registry.api_key", opts)?);
let api_secret = VarStr::new(pull_opt("schema_registry.api_secret", opts)?);
let schema_registry: Option<anyhow::Result<_>> = opts
.pull_opt_str("schema_registry.endpoint")?
.map(|endpoint| {
let api_key = VarStr::new(opts.pull_str("schema_registry.api_key")?);
let api_secret = VarStr::new(opts.pull_str("schema_registry.api_secret")?);
Ok(ConfluentSchemaRegistry {
endpoint: Some(endpoint),
api_key: Some(api_key),
Expand All @@ -47,9 +48,9 @@ impl ConfluentConnector {
});

Ok(ConfluentProfile {
bootstrap_servers: BootstrapServers(pull_opt("bootstrap_servers", opts)?),
key: VarStr::new(pull_opt("key", opts)?),
secret: VarStr::new(pull_opt("secret", opts)?),
bootstrap_servers: BootstrapServers(opts.pull_str("bootstrap_servers")?),
key: VarStr::new(opts.pull_str("key")?),
secret: VarStr::new(opts.pull_str("secret")?),
schema_registry: schema_registry.transpose()?,
})
}
Expand Down Expand Up @@ -159,7 +160,7 @@ impl Connector for ConfluentConnector {
fn from_options(
&self,
name: &str,
options: &mut HashMap<String, String>,
options: &mut ConnectorOptions,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection> {
Expand Down
7 changes: 3 additions & 4 deletions crates/arroyo-connectors/src/filesystem/delta.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use anyhow::{anyhow, bail};
use arroyo_operator::connector::Connection;
use arroyo_storage::BackendConfig;
use std::collections::HashMap;

use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::{ConnectorOptions, OperatorConfig};

use crate::filesystem::{
file_system_sink_from_options, CommitStyle, FileSystemTable, FormatSettings, TableType,
Expand Down Expand Up @@ -140,9 +139,9 @@ impl Connector for DeltaLakeConnector {
fn from_options(
&self,
name: &str,
options: &mut HashMap<String, String>,
options: &mut ConnectorOptions,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection> {
let table = file_system_sink_from_options(options, schema, CommitStyle::DeltaLake)?;

Expand Down
62 changes: 34 additions & 28 deletions crates/arroyo-connectors/src/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use arroyo_rpc::formats::Format;
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::{ConnectorOptions, OperatorConfig};
use serde::{Deserialize, Serialize};

use crate::{pull_opt, pull_option_to_i64, EmptyConfig};
use crate::EmptyConfig;

use crate::filesystem::source::FileSystemSourceFunc;
use arroyo_operator::connector::Connector;
Expand Down Expand Up @@ -185,19 +185,19 @@ impl Connector for FileSystemConnector {
fn from_options(
&self,
name: &str,
options: &mut HashMap<String, String>,
options: &mut ConnectorOptions,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection> {
match options.remove("type") {
match options.pull_opt_str("type")? {
Some(t) if t == "source" => {
let (storage_url, storage_options) = get_storage_url_and_options(options)?;
let compression_format = options
.remove("compression_format")
.pull_opt_str("compression_format")?
.map(|format| format.as_str().try_into().map_err(|err: &str| anyhow!(err)))
.transpose()?
.unwrap_or(CompressionFormat::None);
let matching_pattern = options.remove("source.regex-pattern");
let matching_pattern = options.pull_opt_str("source.regex-pattern")?;
self.from_config(
None,
name,
Expand Down Expand Up @@ -277,47 +277,53 @@ impl Connector for FileSystemConnector {
}

/// Extracts the storage URL and the `storage.`-prefixed backend options from `opts`.
///
/// NOTE(review): this span is a rendered diff hunk, so the removed (`HashMap`-based) and
/// the added (`ConnectorOptions`-based) lines of the function appear interleaved below;
/// it is not compilable as shown.
///
/// Both versions read the `path` option as the storage URL (propagating any lookup error
/// with `?`), gather every option whose key starts with `storage.` into a
/// `HashMap<String, String>` keyed by the key with that prefix trimmed, and run the URL
/// through `BackendConfig::parse_url` before returning `(storage_url, storage_options)`.
/// The removed version drops the prefixed entries from `opts` via `retain`; the added
/// version takes each value with `opts.pull_str(k)` (presumably removing it from `opts`
/// as well — confirm against `ConnectorOptions`).
fn get_storage_url_and_options(
opts: &mut HashMap<String, String>,
opts: &mut ConnectorOptions,
) -> Result<(String, HashMap<String, String>)> {
let storage_url = pull_opt("path", opts)?;
let storage_options: HashMap<String, String> = opts
let storage_url = opts.pull_str("path")?;
// Keys are cloned into an owned Vec first, so `opts` can be used again inside the
// closure below.
let storage_keys: Vec<_> = opts.keys_with_prefix("storage.").cloned().collect();

let storage_options = storage_keys
.iter()
.filter(|(k, _)| k.starts_with("storage."))
.map(|(k, v)| (k.trim_start_matches("storage.").to_string(), v.to_string()))
.collect();
opts.retain(|k, _| !k.starts_with("storage."));
.map(|k| {
Ok((
// NOTE(review): `trim_start_matches` strips every repeated leading "storage."
// (e.g. "storage.storage.x" becomes "x"); `strip_prefix` would remove exactly
// one occurrence — confirm which is intended.
k.trim_start_matches("storage.").to_string(),
opts.pull_str(k)?,
))
})
.collect::<Result<HashMap<String, String>>>()?;

// The parsed value is discarded; the call only serves to reject a URL that fails to parse.
BackendConfig::parse_url(&storage_url, true)?;
Ok((storage_url, storage_options))
}

pub fn file_system_sink_from_options(
opts: &mut std::collections::HashMap<String, String>,
opts: &mut ConnectorOptions,
schema: Option<&ConnectionSchema>,
commit_style: CommitStyle,
) -> Result<FileSystemTable> {
let (storage_url, storage_options) = get_storage_url_and_options(opts)?;

let inactivity_rollover_seconds = pull_option_to_i64("inactivity_rollover_seconds", opts)?;
let max_parts = pull_option_to_i64("max_parts", opts)?;
let rollover_seconds = pull_option_to_i64("rollover_seconds", opts)?;
let target_file_size = pull_option_to_i64("target_file_size", opts)?;
let target_part_size = pull_option_to_i64("target_part_size", opts)?;
let prefix = opts.remove("filename.prefix");
let suffix = opts.remove("filename.suffix");
let inactivity_rollover_seconds = opts.pull_opt_i64("inactivity_rollover_seconds")?;
let max_parts = opts.pull_opt_i64("max_parts")?;
let rollover_seconds = opts.pull_opt_i64("rollover_seconds")?;
let target_file_size = opts.pull_opt_i64("target_file_size")?;
let target_part_size = opts.pull_opt_i64("target_part_size")?;
let prefix = opts.pull_opt_str("filename.prefix")?;
let suffix = opts.pull_opt_str("filename.suffix")?;
let strategy = opts
.remove("filename.strategy")
.pull_opt_str("filename.strategy")?
.map(|value| {
FilenameStrategy::try_from(&value)
.map_err(|_err| anyhow!("{} is not a valid Filenaming Strategy", value))
})
.transpose()?;

let partition_fields: Vec<_> = opts
.remove("partition_fields")
.pull_opt_str("partition_fields")?
.map(|fields| fields.split(',').map(|f| f.to_string()).collect())
.unwrap_or_default();

let time_partition_pattern = opts.remove("time_partition_pattern");
let time_partition_pattern = opts.pull_opt_str("time_partition_pattern")?;

let partitioning = if time_partition_pattern.is_some() || !partition_fields.is_empty() {
Some(Partitioning {
Expand Down Expand Up @@ -354,15 +360,15 @@ pub fn file_system_sink_from_options(
))? {
Format::Parquet(..) => {
let compression = opts
.remove("parquet_compression")
.pull_opt_str("parquet_compression")?
.map(|value| {
Compression::try_from(&value).map_err(|_err| {
anyhow!("{} is not a valid parquet_compression argument", value)
})
})
.transpose()?;
let row_batch_size = pull_option_to_i64("parquet_row_batch_size", opts)?;
let row_group_size = pull_option_to_i64("parquet_row_group_size", opts)?;
let row_batch_size = opts.pull_opt_i64("parquet_row_batch_size")?;
let row_group_size = opts.pull_opt_i64("parquet_row_group_size")?;
Some(FormatSettings::Parquet {
compression,
row_batch_size,
Expand Down
17 changes: 8 additions & 9 deletions crates/arroyo-connectors/src/fluvio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::ConstructedOperator;
use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::{ConnectorOptions, OperatorConfig};
use fluvio::Offset;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use typify::import_types;

use crate::fluvio::sink::FluvioSinkFunc;
use crate::fluvio::source::FluvioSourceFunc;
use crate::{pull_opt, ConnectionType, EmptyConfig};
use crate::{ConnectionType, EmptyConfig};

mod sink;
mod source;
Expand Down Expand Up @@ -85,17 +84,17 @@ impl Connector for FluvioConnector {
fn from_options(
&self,
name: &str,
options: &mut HashMap<String, String>,
options: &mut ConnectorOptions,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection> {
let endpoint = options.remove("endpoint");
let topic = pull_opt("topic", options)?;
let table_type = pull_opt("type", options)?;
let endpoint = options.pull_opt_str("endpoint")?;
let topic = options.pull_str("topic")?;
let table_type = options.pull_str("type")?;

let table_type = match table_type.as_str() {
"source" => {
let offset = options.remove("source.offset");
let offset = options.pull_opt_str("source.offset")?;
TableType::Source {
offset: match offset.as_deref() {
Some("earliest") => SourceOffset::Earliest,
Expand Down
28 changes: 8 additions & 20 deletions crates/arroyo-connectors/src/impulse/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
mod operator;

use anyhow::{anyhow, bail};
use anyhow::bail;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::ConstructedOperator;
use arroyo_rpc::api_types::connections::FieldType::Primitive;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, PrimitiveType, TestSourceMessage,
};
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::{ConnectorOptions, OperatorConfig};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use typify::import_types;

use crate::impulse::operator::{ImpulseSourceFunc, ImpulseSourceState, ImpulseSpec};
use crate::{pull_opt, source_field, ConnectionType, EmptyConfig};
use crate::{source_field, ConnectionType, EmptyConfig};

const TABLE_SCHEMA: &str = include_str!("./table.json");

Expand Down Expand Up @@ -99,24 +97,14 @@ impl Connector for ImpulseConnector {
fn from_options(
&self,
name: &str,
options: &mut HashMap<String, String>,
options: &mut ConnectorOptions,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection> {
let event_rate = f64::from_str(&pull_opt("event_rate", options)?)
.map_err(|_| anyhow!("invalid value for event_rate; expected float"))?;

let event_time_interval: Option<i64> = options
.remove("event_time_interval")
.map(|t| i64::from_str(&t))
.transpose()
.map_err(|_| anyhow!("invalid value for event_time_interval; expected integer"))?;

let message_count: Option<i64> = options
.remove("message_count")
.map(|t| i64::from_str(&t))
.transpose()
.map_err(|_| anyhow!("invalid value for message count; expected integer"))?;
let event_rate = options.pull_f64("event_rate")?;

let event_time_interval = options.pull_opt_i64("event_time_interval")?;
let message_count = options.pull_opt_i64("message_count")?;

// validate the schema
if let Some(s) = schema {
Expand Down
Loading

0 comments on commit c4014a4

Please sign in to comment.