Skip to content

Commit

Permalink
Resolve RUSTSEC-2023-0086
Browse files Browse the repository at this point in the history
upgrade API breaking arrow dependencies

fix SessionStateBuilder deprecation warnings

replace closure with function

remove unused tonic-0-11

cargo update

upgrade arrow

upgrade arrow convert

This fixes #2004.
  • Loading branch information
aradwann authored and tillrohrmann committed Oct 7, 2024
1 parent 86e6966 commit b0f7090
Show file tree
Hide file tree
Showing 12 changed files with 496 additions and 760 deletions.
1,125 changes: 467 additions & 658 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ restate-worker = { path = "crates/worker" }
ahash = "0.8.5"
anyhow = "1.0.68"
arc-swap = "1.6"
arrow = { version = "52.0.0", default-features = false }
arrow-flight = { version = "52.0.0" }
arrow = { version = "53.1.0", default-features = false }
arrow-flight = { version = "53.1.0" }
assert2 = "0.3.11"
async-channel = "2.1.1"
async-trait = "0.1.73"
Expand All @@ -83,7 +83,7 @@ bitflags = { version = "2.6.0" }
bytes = { version = "1.7", features = ["serde"] }
bytes-utils = "0.1.3"
bytestring = { version = "1.2", features = ["serde"] }
chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
chrono = { version = "0.4.38", default-features = false, features = ["clock"] }
comfy-table = { version = "7.1" }
chrono-humanize = { version = "0.2.3" }
clap = { version = "4", default-features = false }
Expand All @@ -92,13 +92,13 @@ cling = { version = "0.1", default-features = false, features = ["derive"] }
criterion = "0.5"
crossterm = { version = "0.27.0" }
dashmap = { version = "6" }
datafusion = { version = "40.0.0", default-features = false, features = [
datafusion = { version = "42.0.0", default-features = false, features = [
"crypto_expressions",
"encoding_expressions",
"regex_expressions",
"unicode_expressions",
] }
datafusion-expr = { version = "40.0.0" }
datafusion-expr = { version = "42.0.0" }
derive_builder = "0.20.0"
derive_more = { version = "1", features = ["full"] }
dialoguer = { version = "0.11.0" }
Expand Down Expand Up @@ -186,7 +186,6 @@ tonic = { version = "0.12.3", default-features = false }
tonic-reflection = { version = "0.12.3" }
tonic-health = { version = "0.12.3" }
tonic-build = { version = "0.12.3" }
tonic-0-11 = { package = "tonic", version = "0.11.0", default-features = false }
tower = "0.4"
tower-http = { version = "0.5.2", default-features = false }
tracing = "0.1"
Expand Down
4 changes: 2 additions & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ restate-types = { workspace = true }

anyhow = { workspace = true }
arc-swap = { workspace = true }
arrow = { version = "51.0.0", features = ["ipc", "prettyprint", "json"] }
arrow_convert = { version = "0.6.6" }
arrow = { version = "53.1.0", features = ["ipc", "prettyprint", "json"] }
arrow_convert = { version = "0.7.2" }
axum = { workspace = true, default-features = false, features = ["http1", "http2", "query", "tokio"] }
bytes = { workspace = true }
base62 = { version = "2.0.2" }
Expand Down
1 change: 0 additions & 1 deletion crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ serde_with = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] }
tonic-0-11 = { workspace = true }
tower = { workspace = true, features = ["load-shed", "limit"] }
tracing = { workspace = true }

Expand Down
42 changes: 1 addition & 41 deletions crates/admin/src/storage_query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
// by the Apache License, Version 2.0.

use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};

Expand All @@ -36,8 +35,6 @@ use restate_core::network::protobuf::node_svc::StorageQueryRequest;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_with::serde_as;
use tonic::metadata::{KeyAndValueRef, MetadataMap};
use tonic::Status;

use super::error::StorageQueryError;
use crate::state::QueryServiceState;
Expand Down Expand Up @@ -81,7 +78,7 @@ pub async fn query(
data_body: response.data,
..FlightData::default()
})
.map_err(|status| FlightError::from(tonic_status_012_to_011(status))),
.map_err(FlightError::from),
);

// create a stream without LargeUtf8 or LargeBinary columns as JS doesn't support these yet
Expand Down Expand Up @@ -270,40 +267,3 @@ impl Stream for ConvertRecordBatchStream {
}
}
}

// todo: Remove once arrow-flight works with tonic 0.12
/// Rebuilds the given `Status` as a `tonic_0_11::Status`.
///
/// The code, message, details and metadata of `status` are each carried over
/// into a freshly constructed `tonic_0_11::Status`.
fn tonic_status_012_to_011(status: Status) -> tonic_0_11::Status {
// the status code crosses the version boundary through its `i32` representation
let code = tonic_0_11::Code::from(status.code() as i32);
let message = status.message().to_owned();
// details are copied into a new `Bytes` buffer
let details = Bytes::copy_from_slice(status.details());
// metadata entries are converted one by one; see `tonic_metadata_map_012_to_011`
let metadata = tonic_metadata_map_012_to_011(status.metadata());
tonic_0_11::Status::with_details_and_metadata(code, message, details, metadata)
}

// todo: Remove once arrow-flight works with tonic 0.12
/// Builds a `tonic_0_11::metadata::MetadataMap` from the entries of `metadata_map`.
///
/// The result is created with capacity `metadata_map.len()`. Each entry is
/// converted individually; an entry whose key (or, for ASCII entries, value)
/// cannot be converted is left out of the result.
fn tonic_metadata_map_012_to_011(metadata_map: &MetadataMap) -> tonic_0_11::metadata::MetadataMap {
let mut resulting_metadata_map =
tonic_0_11::metadata::MetadataMap::with_capacity(metadata_map.len());
for key_value in metadata_map.iter() {
match key_value {
KeyAndValueRef::Ascii(key, value) => {
// ignore metadata map entries if conversion fails
// NOTE(review): when `value.to_str()` fails, "" is parsed instead; if the empty
// string parses successfully the entry is kept with an empty value rather than
// being ignored — confirm this is intended.
if let Ok(value) =
tonic_0_11::metadata::MetadataValue::from_str(value.to_str().unwrap_or(""))
{
if let Ok(key) = tonic_0_11::metadata::MetadataKey::from_str(key.as_str()) {
// NOTE(review): presumably `insert` replaces an earlier value stored under the
// same key — verify whether repeated keys need to be preserved.
resulting_metadata_map.insert(key, value);
}
}
}
KeyAndValueRef::Binary(key, value) => {
// binary entries are dropped only when the key cannot be converted
if let Ok(key) = tonic_0_11::metadata::MetadataKey::from_bytes(key.as_ref()) {
// NOTE(review): confirm that `value.as_ref()` yields the bytes `from_bytes` expects
// (decoded vs. encoded representation) so the value round-trips unchanged.
let value = tonic_0_11::metadata::MetadataValue::from_bytes(value.as_ref());
resulting_metadata_map.insert_bin(key, value);
}
}
}
}

resulting_metadata_map
}
2 changes: 1 addition & 1 deletion crates/errors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ include_doc = ["termimad"]
[dependencies]
codederror = { workspace = true }
paste = { workspace = true }
termimad = { version = "0.23", optional = true }
termimad = { version = "0.30.0", optional = true }
tracing = { workspace = true }

[dev-dependencies]
Expand Down
1 change: 0 additions & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
tonic-reflection = { workspace = true }
tonic-0-11 = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true, features = ["trace"] }
tracing = { workspace = true }
Expand Down
41 changes: 2 additions & 39 deletions crates/node/src/network_server/handler/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use restate_core::network::protobuf::node_svc::node_svc_server::NodeSvc;
Expand All @@ -21,10 +20,8 @@ use restate_core::network::{ConnectionManager, GrpcConnector};
use restate_core::{metadata, TaskCenter};
use restate_types::protobuf::common::NodeStatus;
use restate_types::protobuf::node::Message;
use std::str::FromStr;
use tokio_stream::StreamExt;
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
use tonic::{Code, Request, Response, Status, Streaming};
use tonic::{Request, Response, Status, Streaming};

use crate::network_server::WorkerDependencies;

Expand Down Expand Up @@ -133,43 +130,9 @@ fn flight_error_to_tonic_status(err: FlightError) -> Status {
match err {
FlightError::Arrow(e) => Status::internal(e.to_string()),
FlightError::NotYetImplemented(e) => Status::internal(e),
FlightError::Tonic(status) => tonic_status_010_to_012(status),
FlightError::Tonic(status) => status,
FlightError::ProtocolError(e) => Status::internal(e),
FlightError::DecodeError(e) => Status::internal(e),
FlightError::ExternalError(e) => Status::internal(e.to_string()),
}
}

// todo: Remove once arrow-flight works with tonic 0.12
/// Rebuilds a `tonic_0_11::Status` as the `Status` type used by this module.
///
/// Note that, despite the `010` in the function name, the parameter type is
/// `tonic_0_11::Status`. The code, message, details and metadata of `status`
/// are each carried over into a freshly constructed `Status`.
fn tonic_status_010_to_012(status: tonic_0_11::Status) -> Status {
// the status code crosses the version boundary through its `i32` representation
let code = Code::from(status.code() as i32);
let message = status.message().to_owned();
// details are copied into a new `Bytes` buffer
let details = Bytes::copy_from_slice(status.details());
// metadata entries are converted one by one; see `tonic_metadata_map_010_to_012`
let metadata = tonic_metadata_map_010_to_012(status.metadata());
Status::with_details_and_metadata(code, message, details, metadata)
}

// todo: Remove once arrow-flight works with tonic 0.12
/// Builds a `MetadataMap` from the entries of a `tonic_0_11::metadata::MetadataMap`.
///
/// Note that, despite the `010` in the function name, the parameter type comes
/// from `tonic_0_11`. The result is created with capacity `metadata_map.len()`.
/// Each entry is converted individually; an entry whose key (or, for ASCII
/// entries, value) cannot be converted is left out of the result.
fn tonic_metadata_map_010_to_012(metadata_map: &tonic_0_11::metadata::MetadataMap) -> MetadataMap {
let mut resulting_metadata_map = MetadataMap::with_capacity(metadata_map.len());
for key_value in metadata_map.iter() {
match key_value {
tonic_0_11::metadata::KeyAndValueRef::Ascii(key, value) => {
// ignore metadata map entries if conversion fails
// NOTE(review): when `value.to_str()` fails, "" is parsed instead; if the empty
// string parses successfully the entry is kept with an empty value rather than
// being ignored — confirm this is intended.
if let Ok(value) = MetadataValue::from_str(value.to_str().unwrap_or("")) {
if let Ok(key) = MetadataKey::from_str(key.as_str()) {
// NOTE(review): presumably `insert` replaces an earlier value stored under the
// same key — verify whether repeated keys need to be preserved.
resulting_metadata_map.insert(key, value);
}
}
}
tonic_0_11::metadata::KeyAndValueRef::Binary(key, value) => {
// binary entries are dropped only when the key cannot be converted
if let Ok(key) = MetadataKey::from_bytes(key.as_ref()) {
// NOTE(review): confirm that `value.as_ref()` yields the bytes `from_bytes` expects
// (decoded vs. encoded representation) so the value round-trips unchanged.
let value = MetadataValue::from_bytes(value.as_ref());
resulting_metadata_map.insert_bin(key, value);
}
}
}
}

resulting_metadata_map
}
2 changes: 1 addition & 1 deletion crates/storage-query-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ahash = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
chrono = { version = "0.4.26", default-features = false, features = ["clock"] }
chrono = { workspace = true }
codederror = { workspace = true }
datafusion = { workspace = true }
derive_more = { workspace = true }
Expand Down
20 changes: 14 additions & 6 deletions crates/storage-query-datafusion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use codederror::CodedError;
use datafusion::error::DataFusionError;
use datafusion::execution::context::{SQLOptions, SessionState};
use datafusion::execution::context::SQLOptions;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{SessionConfig, SessionContext};

Expand Down Expand Up @@ -188,7 +190,10 @@ impl QueryContext {
//
// build the state
//
let mut state = SessionState::new_with_config_rt(session_config, runtime);
let mut state_builder = SessionStateBuilder::new()
.with_config(session_config)
.with_runtime_env(runtime)
.with_default_features();

// Rewrite the logical plan to transparently add a 'partition_key' column to joins
// for tables that have a partition key in their schema.
Expand All @@ -200,7 +205,7 @@ impl QueryContext {
// 'SELECT b.service_key FROM sys_invocation_status a JOIN state b on a.target_service_key = b.service_key AND a.partition_key = b.partition_key'
//
// This would be used by the SymmetricHashJoin as a watermark.
state.add_analyzer_rule(Arc::new(
state_builder = state_builder.with_analyzer_rule(Arc::new(
analyzer::UseSymmetricHashJoinWhenPartitionKeyIsPresent::new(),
));

Expand All @@ -219,10 +224,13 @@ impl QueryContext {
// A far more involved but potentially more robust solution would be to wrap the SymmetricHashJoin in a ProjectionExec
// If this would become an issue for any reason, then we can explore that alternative.
//
let mut physical_optimizers = state.physical_optimizers().to_vec();
physical_optimizers.insert(0, Arc::new(physical_optimizer::JoinRewrite::new()));
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> =
vec![Arc::new(physical_optimizer::JoinRewrite::new())];

state_builder = state_builder.with_physical_optimizer_rules(physical_optimizers);

let state = state_builder.build();

state = state.with_physical_optimizer_rules(physical_optimizers);
let ctx = SessionContext::new_with_state(state);

let sql_options = SQLOptions::new()
Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/table_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::{
Expand Down Expand Up @@ -75,7 +75,7 @@ where

async fn scan(
&self,
_state: &SessionState,
_state: &(dyn datafusion::catalog::Session),
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
Expand Down Expand Up @@ -239,7 +239,7 @@ impl TableProvider for GenericTableProvider {

async fn scan(
&self,
_state: &SessionState,
_state: &(dyn datafusion::catalog::Session),
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
Expand Down
1 change: 0 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ version = 2
yanked = "deny"
ignore = [
{ id = "RUSTSEC-2024-0370", reason = "crate is unmaintained. This needs `arrow_convert` to use an alternative to `err-derive`" },
{ id = "RUSTSEC-2023-0086", reason = "lexical-core pending Arrow update https://github.com/restatedev/restate/issues/1966" }
]


Expand Down

0 comments on commit b0f7090

Please sign in to comment.