Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 165 additions & 5 deletions parquet-variant-compute/benches/variant_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@

use arrow::array::{Array, ArrayRef, StringArray};
use arrow::util::test_util::seedable_rng;
use arrow_schema::{DataType, Field, Fields};
use chrono::{DateTime, Utc};
use criterion::{Criterion, criterion_group, criterion_main};
use parquet_variant::{Variant, VariantBuilder};
use parquet_variant::{Uuid, Variant, VariantBuilder, VariantBuilderExt};
use parquet_variant_compute::{
GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, variant_get,
GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, shred_variant, variant_get,
};
use rand::Rng;
use rand::SeedableRng;
use rand::distr::Alphanumeric;
use rand::rngs::StdRng;
use std::fmt::Write;
use std::sync::Arc;
fn benchmark_batch_json_string_to_variant(c: &mut Criterion) {

fn json_to_variant_bench(c: &mut Criterion) {
let input_array = StringArray::from_iter_values(json_repeated_struct(8000));
let array_ref: ArrayRef = Arc::new(input_array);
c.bench_function(
Expand Down Expand Up @@ -94,14 +97,58 @@ pub fn variant_get_bench(c: &mut Criterion) {
};

c.bench_function("variant_get_primitive", |b| {
b.iter(|| variant_get(&input.clone(), options.clone()))
b.iter(|| variant_get(&input, options.clone()))
});
}

pub fn shred_variant_bench(c: &mut Criterion) {
// This benchmark models shredding semi-structured log entries
// where each entry has a common set of fields, some optional fields,
// and some random extra fields.
//
let mut generator = VariantLogGenerator {
rows_per_batch: 8192,
optional_field_prob: 0.0,
extra_field_prob: 0.0,
rng: StdRng::seed_from_u64(42),
};

// shred out the common and optional fields
// leaving the rest in the value field
let shredding_schema = DataType::Struct(Fields::from(vec![
Field::new("timestamp", DataType::Utf8, true),
Field::new("level", DataType::Utf8, true),
Field::new("message", DataType::Utf8, true),
Field::new("user_id", DataType::Int64, true),
Field::new("session_id", DataType::Utf8, true),
]));

// Variants have only required fields
let variant_array = generator.next().unwrap();
c.bench_function("shred_variant common fields", |b| {
b.iter(|| shred_variant(&variant_array, &shredding_schema))
});

// Variants with some optional fields
generator.optional_field_prob = 0.5;
let variant_array = generator.next().unwrap();
c.bench_function("shred_variant optional fields", |b| {
b.iter(|| shred_variant(&variant_array, &shredding_schema))
});

// Variants with optional fields and random extra fields
generator.extra_field_prob = 0.7;
let variant_array = generator.next().unwrap();
c.bench_function("shred_variant extra fields", |b| {
b.iter(|| shred_variant(&variant_array, &shredding_schema))
});
}

criterion_group!(
benches,
variant_get_bench,
benchmark_batch_json_string_to_variant
shred_variant_bench,
json_to_variant_bench
);
criterion_main!(benches);

Expand Down Expand Up @@ -362,3 +409,116 @@ impl RandomJsonGenerator {
panic!("Random value did not match any type");
}
}

/// Data generator for VariantArrays that simulate structured log entries.
///
/// Each entry is an object with
/// 1. fields like "timestamp", "level", "message", that always appear
/// 2. Fields like "user_id", "session_id", that appear in some entries
/// 3. Arbitrary extra fields that should be preserved in the value field
///
///
/// Example entries:
/// ```json
/// {
/// "timestamp": "2024-10-01T12:00:00", -- always present
/// "level": "INFO", -- always present
/// "message": "User logged in", -- always present
/// "user_id": 12345, -- optional
/// "session_id": "abcde", -- optional
/// "extra_field_123": "extra_value_456" -- arbitrary extra field
/// }
/// ```
struct VariantLogGenerator {
/// How many rows per batch
rows_per_batch: usize,
/// Probability of including optional fields (0 to 1)
optional_field_prob: f64,
/// Probability of including extra arbitrary fields (0 to 1)
extra_field_prob: f64,
/// Random number generator
rng: StdRng,
}

impl Iterator for VariantLogGenerator {
type Item = VariantArray;

fn next(&mut self) -> Option<Self::Item> {
Some(self.next_array())
}
}

impl VariantLogGenerator {
fn next_array(&mut self) -> VariantArray {
let mut builder = VariantArrayBuilder::new(1000);
for _ in 0..self.rows_per_batch {
let mut obj_builder = builder.new_object();

obj_builder = obj_builder
.with_field("timestamp", self.random_timestamp())
.with_field("level", self.random_level())
.with_field("message", self.random_message());

// Optional fields
if self.rng.random::<f64>() < self.optional_field_prob {
obj_builder = obj_builder.with_field("user_id", self.rng.random_range(1000..9999));
}
if self.rng.random::<f64>() < self.optional_field_prob {
obj_builder = obj_builder.with_field("session_id", Uuid::new_v4());
}

// Random extra fields
if self.rng.random::<f64>() < self.extra_field_prob {
let num_extra_fields = self.rng.random_range(1..4);
for _ in 0..num_extra_fields {
// totally random field name and value (modeling random logging payloads)
let field_name = format!("extra_field_{}", self.rng.random_range(1..100000000));
let field_value =
format!("extra_value_{}", self.rng.random_range(1..100000000));
obj_builder = obj_builder.with_field(field_name.as_str(), field_value.as_str());
}
}

obj_builder.finish();
}
builder.build()
}

fn random_timestamp(&mut self) -> DateTime<Utc> {
// random timestamp
let hour = self.rng.random_range(0..24);
let minute = self.rng.random_range(0..60);
let second = self.rng.random_range(0..60);
let day = self.rng.random_range(1..28);
let month = self.rng.random_range(1..=12);
let year = self.rng.random_range(2020..=2024);
let naive = chrono::NaiveDate::from_ymd_opt(year, month, day)
.unwrap()
.and_hms_opt(hour, minute, second)
.unwrap();
DateTime::from_naive_utc_and_offset(naive, Utc)
}

/// Random level from ["DEBUG", "INFO", "WARN", "ERROR"]
fn random_level(&mut self) -> &'static str {
let levels = ["DEBUG", "INFO", "WARN", "ERROR"];
levels[self.rng.random_range(0..levels.len())]
}

/// Generate a random log message
fn random_message(&mut self) -> &str {
let messages = [
"User logged in",
"User logged out",
"File not found",
"Connection established",
"Error processing request",
// a few longer messages
"Database connection timed out after multiple attempts",
"User attempted to access restricted resource without proper authorization",
"Scheduled maintenance will occur at midnight UTC",
];

messages[self.rng.random_range(0..messages.len())]
}
}
Loading