Commit

chore: update deps and improve readability of Cargo.toml (parseablehq#1075)

* chore: update datafusion to v44.0.0

* chore: update `object_store` to v0.11.2

The fixes included are listed in the `object_store` changelog: https://github.com/alamb/arrow-rs/blob/de87e2e7c0c9f4e3f8fe120c803ee3c0cb38f1d4/object_store/CHANGELOG.md

* style: organize deps better for maintainability

* style: fmt dep

* ci: use compatible version of rust

* chore: up MSRV to 1.83.0

---------

Signed-off-by: Nitish Tiwari <[email protected]>
Co-authored-by: Nitish Tiwari <[email protected]>
2 people authored and parmesant committed Jan 13, 2025
1 parent a7289df commit c3d8861
Showing 10 changed files with 369 additions and 230 deletions.
408 changes: 259 additions & 149 deletions Cargo.lock

Large diffs are not rendered by default.

142 changes: 79 additions & 63 deletions Cargo.toml
@@ -3,46 +3,85 @@ name = "parseable"
version = "1.7.0"
authors = ["Parseable Team <[email protected]>"]
edition = "2021"
rust-version = "1.77.1"
rust-version = "1.83.0"
categories = ["logging", "observability", "log analytics"]
build = "build.rs"

[dependencies]
### apache arrow/datafusion dependencies
arrow = "53.0.0"
arrow-schema = { version = "53.0.0", features = ["serde"] }
# Arrow and DataFusion ecosystem
arrow-array = { version = "53.0.0" }
arrow-json = "53.0.0"
arrow-flight = { version = "53.0.0", features = ["tls"] }
arrow-ipc = { version = "53.0.0", features = ["zstd"] }
arrow-json = "53.0.0"
arrow-schema = { version = "53.0.0", features = ["serde"] }
arrow-select = "53.0.0"
datafusion = "42.0.0"
object_store = { version = "0.11.1", features = ["cloud", "aws", "azure"] }
datafusion = "44.0.0"
object_store = { version = "0.11.2", features = ["cloud", "aws", "azure"] }
parquet = "53.0.0"
arrow-flight = { version = "53.0.0", features = ["tls"] }
tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.12.3"
tower-http = { version = "0.6.1", features = ["cors"] }

### actix dependencies
actix-web-httpauth = "0.8"
actix-web = { version = "4.9.0", features = ["rustls-0_22"] }
# Web server and HTTP-related
actix-cors = "0.7.0"
actix-web = { version = "4.9.0", features = ["rustls-0_22"] }
actix-web-httpauth = "0.8"
actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
http = "0.2.7"
http-auth-basic = "0.3.3"
mime = "0.3.17"
tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.12.3"
tower-http = { version = "0.6.1", features = ["cors"] }
url = "2.4.0"

### other dependencies
anyhow = { version = "1.0", features = ["backtrace"] }
# Authentication and Security
argon2 = "0.5.0"
async-trait = "0.1.82"
base64 = "0.22.0"
lazy_static = "1.4"
bytes = "1.4"
byteorder = "1.4.3"
bzip2 = { version = "*", features = ["static"] }
cookie = "0.18.1"
hex = "0.4"
openid = { version = "0.15.0", default-features = false, features = ["rustls"] }
rustls = "0.22.4"
rustls-pemfile = "2.1.2"
sha2 = "0.10.8"

# Serialization and Data Formats
byteorder = "1.4.3"
prost = "0.13.3"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
serde_repr = "0.1.17"

# Async and Runtime
async-trait = "0.1.82"
futures = "0.3"
futures-util = "0.3.28"
tokio = { version = "1.28", default-features = false, features = [
"sync",
"macros",
"fs",
] }
tokio-stream = { version = "0.1", features = ["fs"] }

# Logging and Metrics
opentelemetry-proto = { git = "https://github.com/parseablehq/opentelemetry-rust", branch = "fix-metrics-u64-serialization" }
prometheus = { version = "0.13", features = ["process"] }
prometheus-parse = "0.2.5"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

# Time and Date
chrono = "0.4"
chrono-humanize = "0.2"
humantime = "2.1.0"
humantime-serde = "1.1"

# File System and I/O
bzip2 = { version = "*", features = ["static"] }
fs_extra = "1.3"
path-clean = "1.0.1"
relative-path = { version = "1.7", features = ["serde"] }
xz2 = { version = "*", features = ["static"] }

# CLI and System
clap = { version = "4.1", default-features = false, features = [
"std",
"color",
@@ -52,73 +91,50 @@ clap = { version = "4.1", default-features = false, features = [
"cargo",
"error-context",
] }
clokwerk = "0.4"
crossterm = "0.28.1"
derive_more = "0.99.18"
fs_extra = "1.3"
futures = "0.3"
futures-util = "0.3.28"
hex = "0.4"
hostname = "0.4.0"
http = "0.2.7"
humantime-serde = "1.1"
itertools = "0.13.0"
human-size = "0.4"
num_cpus = "1.15"
sysinfo = "0.31.4"
thread-priority = "1.0.0"
uptime_lib = "0.3.0"

# Kafka
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }

# Utility Libraries
anyhow = { version = "1.0", features = ["backtrace"] }
bytes = "1.4"
clokwerk = "0.4"
derive_more = "0.99.18"
hashlru = { version = "0.11.0", features = ["serde"] }
itertools = "0.13.0"
lazy_static = "1.4"
nom = "7.1.3"
once_cell = "1.17.1"
opentelemetry-proto = {git = "https://github.com/parseablehq/opentelemetry-rust", branch="fix-metrics-u64-serialization"}
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
"rustls-tls",
"json",
"gzip",
"brotli",
] } # cannot update cause rustls is not latest `see rustls`
rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet
rustls-pemfile = "2.1.2"
semver = "1.0"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
static-files = "0.2"
sysinfo = "0.31.4"
thiserror = "2.0.0"
thread-priority = "1.0.0"
tokio = { version = "1.28", default-features = false, features = [
"sync",
"macros",
"fs",
] }
tokio-stream = { version = "0.1", features = ["fs"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
ulid = { version = "1.0", features = ["serde"] }
uptime_lib = "0.3.0"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
xz2 = { version = "*", features = ["static"] }
nom = "7.1.3"
humantime = "2.1.0"
human-size = "0.4"
openid = { version = "0.15.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"
prost = "0.13.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"
tracing = "0.1.41"

[build-dependencies]
cargo_toml = "0.20.1"
prost-build = "0.13.3"
sha1_smol = { version = "1.0", features = ["std"] }
static-files = "0.2"
ureq = "2.6"
url = "2.4.0"
vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] }
zip = { version = "2.2.0", default-features = false, features = ["deflate"] }
url = "2.4.0"
prost-build = "0.13.3"

[dev-dependencies]
maplit = "1.0"
2 changes: 1 addition & 1 deletion Dockerfile
@@ -14,7 +14,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# build stage
FROM rust:1.77.1-bookworm as builder
FROM rust:1.83.0-bookworm as builder

LABEL org.opencontainers.image.title="Parseable"
LABEL maintainer="Parseable Team <[email protected]>"
3 changes: 1 addition & 2 deletions src/query/mod.rs
@@ -27,7 +27,6 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::error::DataFusionError;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
use datafusion::prelude::*;
@@ -77,7 +76,7 @@ impl Query {
};

let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
let runtime = Arc::new(runtime_config.build().unwrap());

let mut config = SessionConfig::default()
.with_parquet_pruning(true)
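
Context for the change above: DataFusion 44 drops the `RuntimeEnv::new(RuntimeConfig)` constructor in favor of a builder, so the `RuntimeEnv` import goes away and the builder returned by `get_datafusion_runtime()` is now finished with `.build()`. A minimal sketch of the new pattern — the pool size and memory fraction are illustrative placeholders, not Parseable's actual values:

use std::sync::Arc;

use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};

fn build_runtime() -> Arc<RuntimeEnv> {
    let builder = RuntimeEnvBuilder::new()
        // Spill to OS-provided temp dirs when a query exceeds the pool.
        .with_disk_manager(DiskManagerConfig::NewOs)
        // Cap the pool at 8 GiB and let a query claim up to 80% of it.
        .with_memory_limit(8 * 1024 * 1024 * 1024, 0.8);
    Arc::new(builder.build().expect("valid runtime configuration"))
}
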
4 changes: 3 additions & 1 deletion src/query/stream_schema_provider.rs
@@ -30,6 +30,7 @@ use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
use datafusion::catalog::Session;
use datafusion::common::stats::Precision;
use datafusion::logical_expr::utils::conjunction;
use datafusion::physical_expr::LexOrdering;
use datafusion::{
catalog::SchemaProvider,
common::{
@@ -73,6 +74,7 @@ use super::listing_table_builder::ListingTableBuilder;
use crate::catalog::Snapshot as CatalogSnapshot;

// schema provider for stream based on global data
#[derive(Debug)]
pub struct GlobalSchemaProvider {
pub storage: Arc<dyn ObjectStorage>,
}
@@ -159,7 +161,7 @@ impl StandardTableProvider {
statistics,
projection: projection.cloned(),
limit,
output_ordering: vec![vec![sort_expr]],
output_ordering: vec![LexOrdering::from_iter([sort_expr])],
table_partition_cols: Vec::new(),
},
filters.as_ref(),
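
Two DataFusion 44 changes surface in this file: schema providers must now implement `Debug` (hence the new derive on `GlobalSchemaProvider`), and `FileScanConfig::output_ordering` takes a `Vec<LexOrdering>` rather than the old nested `Vec<Vec<PhysicalSortExpr>>`. A sketch of constructing one ordering under the new API — the column name, index, and sort options here are illustrative:

use std::sync::Arc;

use datafusion::arrow::compute::SortOptions;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};

fn newest_first_ordering() -> LexOrdering {
    let sort_expr = PhysicalSortExpr {
        // Hypothetical event-time column; index 0 is a placeholder.
        expr: Arc::new(Column::new("p_timestamp", 0)),
        options: SortOptions { descending: true, nulls_first: true },
    };
    // DataFusion 44 wraps the sort keys in a LexOrdering newtype
    // instead of a bare Vec of sort expressions.
    LexOrdering::from_iter([sort_expr])
}
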
7 changes: 4 additions & 3 deletions src/storage/azure_blob.rs
@@ -29,7 +29,7 @@ use async_trait::async_trait;
use datafusion::datasource::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
};
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
use relative_path::{RelativePath, RelativePathBuf};
@@ -150,7 +150,7 @@ impl AzureBlobConfig {
}

impl ObjectStorageProvider for AzureBlobConfig {
fn get_datafusion_runtime(&self) -> RuntimeConfig {
fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder {
let azure = self.get_default_builder().build().unwrap();
// limit objectstore to a concurrent request limit
let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
@@ -161,7 +161,7 @@ impl ObjectStorageProvider for AzureBlobConfig {
.unwrap();
object_store_registry.register_store(url.as_ref(), Arc::new(azure));

RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
RuntimeEnvBuilder::new().with_object_store_registry(Arc::new(object_store_registry))
}

fn construct_client(&self) -> Arc<dyn super::ObjectStorage> {
@@ -191,6 +191,7 @@ pub fn to_object_store_path(path: &RelativePath) -> StorePath {

// ObjStoreClient is generic client to enable interactions with different cloudprovider's
// object store such as S3 and Azure Blob
#[derive(Debug)]
pub struct BlobStore {
client: LimitStore<MicrosoftAzure>,
account: String,
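
As in the S3 and local-filesystem providers below, the provider now hands DataFusion a `RuntimeEnvBuilder` instead of the removed `RuntimeConfig`; the object-store registry wiring itself is unchanged. A self-contained sketch of the same pattern, substituting an in-memory store for the `LimitStore`-wrapped Azure client (the URL and store are stand-ins):

use std::sync::Arc;

use datafusion::datasource::object_store::{
    DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use object_store::memory::InMemory;

fn runtime_with_registry() -> RuntimeEnvBuilder {
    // Stand-in for the concurrency-limited cloud client built above.
    let store = Arc::new(InMemory::new());
    let registry = DefaultObjectStoreRegistry::new();
    let url = ObjectStoreUrl::parse("mem://demo").unwrap();
    registry.register_store(url.as_ref(), store);
    // The builder is handed back to the query layer, which finishes it
    // with .build() as shown in src/query/mod.rs above.
    RuntimeEnvBuilder::new().with_object_store_registry(Arc::new(registry))
}
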
7 changes: 4 additions & 3 deletions src/storage/localfs.rs
@@ -25,7 +25,7 @@ use std::{

use async_trait::async_trait;
use bytes::Bytes;
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use fs_extra::file::CopyOptions;
use futures::{stream::FuturesUnordered, TryStreamExt};
use relative_path::{RelativePath, RelativePathBuf};
@@ -64,8 +64,8 @@ pub struct FSConfig {
}

impl ObjectStorageProvider for FSConfig {
fn get_datafusion_runtime(&self) -> RuntimeConfig {
RuntimeConfig::new()
fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder {
RuntimeEnvBuilder::new()
}

fn construct_client(&self) -> Arc<dyn ObjectStorage> {
@@ -81,6 +81,7 @@ impl ObjectStorageProvider for FSConfig {
}
}

#[derive(Debug)]
pub struct LocalFS {
// absolute path of the data directory
root: PathBuf,
7 changes: 4 additions & 3 deletions src/storage/object_storage.rs
@@ -47,13 +47,14 @@ use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Local;
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use once_cell::sync::OnceCell;
use relative_path::RelativePath;
use relative_path::RelativePathBuf;
use tracing::error;

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::num::NonZeroU32;
use std::{
collections::HashMap,
@@ -64,7 +65,7 @@ use std::{
};

pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
fn get_datafusion_runtime(&self) -> RuntimeConfig;
fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder;
fn construct_client(&self) -> Arc<dyn ObjectStorage>;
fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
static STORE: OnceCell<Arc<dyn ObjectStorage>> = OnceCell::new();
@@ -76,7 +77,7 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync
}

#[async_trait]
pub trait ObjectStorage: Send + Sync + 'static {
pub trait ObjectStorage: Debug + Send + Sync + 'static {
async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>;
// TODO: make the filter function optional as we may want to get all objects
async fn get_objects(
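
Making `Debug` a supertrait of `ObjectStorage` is what forces the `#[derive(Debug)]` additions on `BlobStore`, `LocalFS`, and `S3`: every implementor must now be debug-printable, so `dyn ObjectStorage` trait objects can be formatted with `{:?}` (and satisfy DataFusion 44's new `Debug` bounds). A minimal sketch with a hypothetical `Store` trait standing in for `ObjectStorage`:

use std::fmt::Debug;
use std::sync::Arc;

// Hypothetical stand-in for the ObjectStorage trait above.
trait Store: Debug + Send + Sync + 'static {
    fn name(&self) -> &'static str;
}

#[derive(Debug)] // required once Debug is a supertrait
struct LocalStore {
    root: String,
}

impl Store for LocalStore {
    fn name(&self) -> &'static str {
        "local"
    }
}

fn main() {
    let store: Arc<dyn Store> = Arc::new(LocalStore { root: "/data".into() });
    // The Debug supertrait is what makes `{:?}` work through the trait object.
    println!("{store:?} backs {}", store.name());
}
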
7 changes: 4 additions & 3 deletions src/storage/s3.rs
@@ -22,7 +22,7 @@ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
};
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
@@ -285,7 +285,7 @@ impl S3Config {
}

impl ObjectStorageProvider for S3Config {
fn get_datafusion_runtime(&self) -> RuntimeConfig {
fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder {
let s3 = self.get_default_builder().build().unwrap();

// limit objectstore to a concurrent request limit
@@ -296,7 +296,7 @@ impl ObjectStorageProvider for S3Config {
let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();
object_store_registry.register_store(url.as_ref(), Arc::new(s3));

RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
RuntimeEnvBuilder::new().with_object_store_registry(Arc::new(object_store_registry))
}

fn construct_client(&self) -> Arc<dyn ObjectStorage> {
@@ -325,6 +325,7 @@ fn to_object_store_path(path: &RelativePath) -> StorePath {
StorePath::from(path.as_str())
}

#[derive(Debug)]
pub struct S3 {
client: LimitStore<AmazonS3>,
bucket: String,
