Skip to content

Commit

Permalink
use rayon for multithreading (#9)
Browse files Browse the repository at this point in the history
* use rayon to parallelise

* use jemallocator to benefit from parallelisation on Linux

* remove unused import

* parallelise production
  • Loading branch information
hgrsd authored Apr 4, 2024
1 parent e5d21fc commit 68a1e1c
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 15 deletions.
75 changes: 74 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "drivel"
description = "Infer a schema from JSON input, and generate synthetic data based on the inferred schema."
license = "MIT"
authors = ["Daniël Hogers <[email protected]>"]
version = "0.1.13"
version = "0.1.14"
edition = "2021"
repository = "https://github.com/hgrsd/drivel"

Expand All @@ -14,8 +14,10 @@ chrono = "0.4.37"
clap = { version = "4.5.4", features = ["derive"] }
email_address = "0.2.4"
fake = { version = "2.9.2", features = ["chrono"] }
jemallocator = "0.5.4"
lazy_static = "1.4.0"
rand = "0.8.1"
rayon = "1.10.0"
regex = "1.8.1"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.115"
Expand Down
18 changes: 11 additions & 7 deletions src/infer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{infer_string::infer_string_type, NumberType, SchemaState, StringType};
use rayon::prelude::*;

fn min<T: PartialOrd>(left: T, right: T) -> T {
if left < right {
Expand All @@ -18,8 +19,10 @@ fn max<T: PartialOrd>(left: T, right: T) -> T {

fn merge(initial: SchemaState, new: SchemaState) -> SchemaState {
match (initial, new) {
(SchemaState::Initial, new) => new,
(SchemaState::Indefinite, s) | (s, SchemaState::Indefinite) => s,
(SchemaState::Initial, s)
| (s, SchemaState::Initial)
| (SchemaState::Indefinite, s)
| (s, SchemaState::Indefinite) => s,

// --- String merging ---
(
Expand Down Expand Up @@ -322,7 +325,7 @@ pub fn infer_schema(json: serde_json::Value) -> SchemaState {
serde_json::Value::Array(array) => SchemaState::Array {
min_length: array.len(),
max_length: array.len(),
schema: Box::new(infer_schema_from_iter(array.into_iter())),
schema: Box::new(infer_schema_from_iter(array)),
},
serde_json::Value::Object(object) => SchemaState::Object {
required: object
Expand Down Expand Up @@ -362,7 +365,7 @@ pub fn infer_schema(json: serde_json::Value) -> SchemaState {
/// ];
///
/// // Infer the schema from the vector of JSON values
/// let schema = infer_schema_from_iter(values.into_iter());
/// let schema = infer_schema_from_iter(values);
///
/// assert_eq!(
/// schema,
Expand All @@ -380,10 +383,11 @@ pub fn infer_schema(json: serde_json::Value) -> SchemaState {
/// }
/// );
/// ```
pub fn infer_schema_from_iter<'a>(values: impl Iterator<Item = serde_json::Value>) -> SchemaState {
pub fn infer_schema_from_iter(values: Vec<serde_json::Value>) -> SchemaState {
values
.into_par_iter()
.map(|value| infer_schema(value))
.fold(SchemaState::Initial, merge)
.reduce(|| SchemaState::Initial, |left, right| merge(left, right))
}

#[cfg(test)]
Expand Down Expand Up @@ -807,7 +811,7 @@ mod tests {
"qux": true
}),
];
let schema = infer_schema_from_iter(input.into_iter());
let schema = infer_schema_from_iter(input);
assert_eq!(
schema,
SchemaState::Object {
Expand Down
17 changes: 11 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use clap::{Parser, Subcommand};
use drivel::SchemaState;
use jemallocator::Jemalloc;

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

#[derive(Subcommand, Debug)]
enum Mode {
Expand All @@ -22,17 +26,18 @@ struct Args {

fn main() {
let args = Args::parse();

let input = std::io::read_to_string(std::io::stdin()).expect("Unable to read from stdin");

let schema = if let Ok(json) = serde_json::from_str(&input) {
drivel::infer_schema(json)
} else {
// unable to parse input as JSON; try JSON lines format as fallback
let values = input.lines().map(|line| {
serde_json::from_str(line)
.expect("Unable to parse input; format is neither JSON nor JSON lines")
});
let values = input
.lines()
.map(|line| {
serde_json::from_str(line)
.expect("Unable to parse input; format is neither JSON nor JSON lines")
})
.collect();
drivel::infer_schema_from_iter(values)
};

Expand Down
2 changes: 2 additions & 0 deletions src/produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use fake::{
Fake, Faker,
};
use rand::{random, thread_rng, Rng};
use rayon::prelude::*;
use serde_json::Number;

use crate::{NumberType, SchemaState, StringType};
Expand Down Expand Up @@ -133,6 +134,7 @@ fn produce_inner(schema: &SchemaState, repeat_n: usize, current_depth: usize) ->
};

let data: Vec<_> = (0..n_elements)
.into_par_iter()
.map(|_| produce_inner(schema, repeat_n, current_depth + 1))
.collect();
serde_json::Value::Array(data)
Expand Down

0 comments on commit 68a1e1c

Please sign in to comment.