diff --git a/Cargo.lock b/Cargo.lock
index f5f4735..ac79704 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1599,9 +1599,12 @@ dependencies = [
 
 [[package]]
 name = "parquet2json"
-version = "3.0.0"
+version = "3.1.0"
 dependencies = [
+ "arrow-array",
+ "arrow-cast",
  "arrow-json",
+ "arrow-schema",
  "aws-config",
  "aws-types",
  "clap",
diff --git a/Cargo.toml b/Cargo.toml
index 7c32e90..aedfaa6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,14 +1,17 @@
 [package]
 name = "parquet2json"
 description = "A command-line tool for streaming Parquet as line-delimited JSON"
-version = "3.0.0"
+version = "3.1.0"
 edition = "2021"
 license = "MIT"
 authors = ["Pieter Raubenheimer "]
 repository = "https://github.com/jupiter/parquet2json"
 
 [dependencies]
+arrow-array = { version = "51.0.0" }
+arrow-cast = { version = "51.0.0" }
 arrow-json = { version = "51.0.0" }
+arrow-schema = { version = "51.0.0" }
 aws-config = { version = "1.1.8" }
 aws-types = { version = "1.1.8" }
 clap = { version = "4.5.4", features = ["derive"] }
diff --git a/README.md b/README.md
index 1f2d302..e0127eb 100644
--- a/README.md
+++ b/README.md
@@ -37,6 +37,7 @@ Options:
   -o, --offset <OFFSET>    Starts outputting from this row (first row: 0, last row: -1) [default: 0]
   -l, --limit <LIMIT>      Maximum number of rows to output
   -c, --columns <COLUMNS>  Select columns by name (comma,separated,?prefixed_optional)
+  -n, --nulls              Outputs null values
   -h, --help               Print help
 ```
 
@@ -59,11 +60,11 @@ $ parquet2json ./myfile.parquet cat > output.jsonl
 #### From S3 or HTTP (S3)
 
 ```shell
-$ parquet2json s3://noaa-ghcn-pds/parquet/by_year/YEAR=2024/ELEMENT=AWDR/83d5f1ef1edf4e3c838b15f56f07dc02_0.snappy.parquet cat
+$ parquet2json s3://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=base/type=land/part-00001-10ae8a61-702e-480f-9024-6dee4abd93df-c000.zstd.parquet cat
 ```
 
 ```shell
-$ parquet2json https://noaa-ghcn-pds.s3.amazonaws.com/parquet/by_year/YEAR%3D2024/ELEMENT%3DAWDR/83d5f1ef1edf4e3c838b15f56f07dc02_0.snappy.parquet cat
+$ parquet2json https://overturemaps-us-west-2.s3.us-west-2.amazonaws.com/release/2024-03-12-alpha.0/theme%3Dbase/type%3Dland/part-00001-10ae8a61-702e-480f-9024-6dee4abd93df-c000.zstd.parquet cat
 ```
 
 #### Filter selected columns with jq
diff --git a/src/cast.rs b/src/cast.rs
new file mode 100644
index 0000000..56faa68
--- /dev/null
+++ b/src/cast.rs
@@ -0,0 +1,16 @@
+use std::sync::Arc;
+
+use arrow_array::{types::GenericBinaryType, Array, ArrayRef, GenericByteArray, OffsetSizeTrait};
+use arrow_cast::base64::{b64_encode, BASE64_STANDARD};
+use arrow_schema::ArrowError;
+
+pub fn cast_binary_to_string<O: OffsetSizeTrait>(
+    array: &dyn Array,
+) -> Result<ArrayRef, ArrowError> {
+    let array = array
+        .as_any()
+        .downcast_ref::<GenericByteArray<GenericBinaryType<O>>>()
+        .unwrap();
+
+    Ok(Arc::new(b64_encode(&BASE64_STANDARD, array)))
+}
diff --git a/src/main.rs b/src/main.rs
index b0d79fa..eae4fd6 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,16 @@
 use std::ops::Add;
 use std::sync::Arc;
 
-use arrow_json::writer::LineDelimitedWriter;
+use arrow_array::{Array, RecordBatch};
+use arrow_cast::display::FormatOptions;
+use arrow_cast::{cast_with_options, CastOptions};
+use arrow_json::writer::LineDelimited;
+use arrow_json::WriterBuilder;
+use arrow_schema::{DataType, Field, SchemaBuilder};
 use aws_config::profile::load;
 use aws_config::profile::profile_file::ProfileFiles;
 use aws_types::os_shim_internal::{Env, Fs};
+use cast::cast_binary_to_string;
 use clap::{Parser, Subcommand};
 use object_store::aws::AmazonS3Builder;
 use object_store::http::HttpBuilder;
@@ -19,6 +25,8 @@ use tokio_stream::StreamExt;
 use url::Url;
 use urlencoding::decode;
 
+mod cast;
+
 #[derive(Parser, Clone)]
 #[clap(version, about, long_about = None)]
 struct Cli {
@@ -44,6 +52,10 @@ enum Commands {
         /// Select columns by name (comma,separated,?prefixed_optional)
         #[clap(short, long)]
         columns: Option<String>,
+
+        /// Outputs null values
+        #[clap(short, long)]
+        nulls: bool,
     },
 
     /// Outputs the Thrift schema
@@ -67,6 +79,7 @@ async fn output_for_command(mut reader: ParquetObjectReader, command: &Commands)
             offset,
             limit,
             columns,
+            nulls,
         } => {
             let absolute_offset: usize = if offset.is_negative() {
                 parquet_metadata
@@ -115,10 +128,73 @@ async fn output_for_command(mut reader: ParquetObjectReader, command: &Commands)
             }
 
             let mut iter = async_reader_builder.build().unwrap();
-            let mut json_writer = LineDelimitedWriter::new(std::io::stdout());
 
-            while let Some(Ok(batch)) = iter.next().await {
-                let _ = json_writer.write(&batch);
+            let builder = WriterBuilder::new().with_explicit_nulls(*nulls);
+            let mut json_writer = builder.build::<_, LineDelimited>(std::io::stdout());
+
+            while let Some(rbt) = iter.next().await {
+                match rbt {
+                    Ok(batch) => {
+                        let schema = batch.schema();
+                        let json_batch = if schema.fields.iter().any(|field| {
+                            matches!(
+                                field.data_type(),
+                                DataType::Binary
+                                    | DataType::Decimal128(_, _)
+                                    | DataType::Decimal256(_, _)
+                            )
+                        }) {
+                            let mut columns: Vec<Arc<dyn Array>> = vec![];
+                            let mut builder = SchemaBuilder::new();
+                            schema
+                                .fields
+                                .iter()
+                                .for_each(|field| match field.data_type() {
+                                    DataType::Binary => {
+                                        builder.push(Field::new(
+                                            field.name(),
+                                            DataType::Utf8,
+                                            field.is_nullable(),
+                                        ));
+                                        let column = batch.column_by_name(field.name()).unwrap();
+                                        let new_column =
+                                            cast_binary_to_string::<i32>(column).unwrap();
+                                        columns.push(new_column);
+                                    }
+                                    DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
+                                        builder.push(Field::new(
+                                            field.name(),
+                                            DataType::Utf8,
+                                            field.is_nullable(),
+                                        ));
+                                        let column = batch.column_by_name(field.name()).unwrap();
+                                        let new_column = cast_with_options(
+                                            column,
+                                            &DataType::Utf8,
+                                            &CastOptions {
+                                                safe: false,
+                                                format_options: FormatOptions::default(),
+                                            },
+                                        )
+                                        .unwrap();
+                                        columns.push(new_column);
+                                    }
+                                    _ => {
+                                        builder.push(field.clone());
+                                        columns.push(
+                                            batch.column_by_name(field.name()).unwrap().clone(),
+                                        );
+                                    }
+                                });
+                            let schema = builder.finish();
+                            RecordBatch::try_new(schema.into(), columns).unwrap()
+                        } else {
+                            batch
+                        };
+                        json_writer.write(&json_batch).unwrap();
+                    }
+                    Err(e) => println!("{}", e),
+                };
             }
             json_writer.finish().unwrap();
         }