Skip to content

Commit

Permalink
v3.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jupiter committed Apr 5, 2024
1 parent 67e5cd4 commit 76e0fff
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 8 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
[package]
name = "parquet2json"
description = "A command-line tool for streaming Parquet as line-delimited JSON"
version = "3.0.0"
version = "3.1.0"
edition = "2021"
license = "MIT"
authors = ["Pieter Raubenheimer <[email protected]>"]
repository = "https://github.com/jupiter/parquet2json"

[dependencies]
arrow-array = { version = "51.0.0" }
arrow-cast = { version = "51.0.0" }
arrow-json = { version = "51.0.0" }
arrow-schema = { version = "51.0.0" }
aws-config = { version = "1.1.8" }
aws-types = { version = "1.1.8" }
clap = { version = "4.5.4", features = ["derive"] }
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Options:
-o, --offset <OFFSET> Starts outputting from this row (first row: 0, last row: -1) [default: 0]
-l, --limit <LIMIT> Maximum number of rows to output
-c, --columns <COLUMNS> Select columns by name (comma,separated,?prefixed_optional)
-n, --nulls Outputs null values
-h, --help Print help
```

Expand All @@ -59,11 +60,11 @@ $ parquet2json ./myfile.parquet cat > output.jsonl
#### From S3 or HTTP (S3)

```shell
$ parquet2json s3://noaa-ghcn-pds/parquet/by_year/YEAR=2024/ELEMENT=AWDR/83d5f1ef1edf4e3c838b15f56f07dc02_0.snappy.parquet cat
$ parquet2json s3://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=base/type=land/part-00001-10ae8a61-702e-480f-9024-6dee4abd93df-c000.zstd.parquet cat
```

```shell
$ parquet2json https://noaa-ghcn-pds.s3.amazonaws.com/parquet/by_year/YEAR%3D2024/ELEMENT%3DAWDR/83d5f1ef1edf4e3c838b15f56f07dc02_0.snappy.parquet cat
$ parquet2json https://overturemaps-us-west-2.s3.us-west-2.amazonaws.com/release/2024-03-12-alpha.0/theme%3Dbase/type%3Dland/part-00001-10ae8a61-702e-480f-9024-6dee4abd93df-c000.zstd.parquet cat
```

#### Filter selected columns with jq
Expand Down
16 changes: 16 additions & 0 deletions src/cast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use std::sync::Arc;

use arrow_array::{types::GenericBinaryType, Array, ArrayRef, GenericByteArray, OffsetSizeTrait};
use arrow_cast::base64::{b64_encode, BASE64_STANDARD};
use arrow_schema::ArrowError;

pub fn cast_binary_to_string<O: OffsetSizeTrait>(
array: &dyn Array,
) -> Result<ArrayRef, ArrowError> {
let array = array
.as_any()
.downcast_ref::<GenericByteArray<GenericBinaryType<O>>>()
.unwrap();

Ok(Arc::new(b64_encode(&BASE64_STANDARD, array)))
}
86 changes: 82 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use std::ops::Add;
use std::sync::Arc;

use arrow_json::writer::LineDelimitedWriter;
use arrow_array::{Array, RecordBatch};
use arrow_cast::display::FormatOptions;
use arrow_cast::{cast_with_options, CastOptions};
use arrow_json::writer::LineDelimited;
use arrow_json::WriterBuilder;
use arrow_schema::{DataType, Field, SchemaBuilder};
use aws_config::profile::load;
use aws_config::profile::profile_file::ProfileFiles;
use aws_types::os_shim_internal::{Env, Fs};
use cast::cast_binary_to_string;
use clap::{Parser, Subcommand};
use object_store::aws::AmazonS3Builder;
use object_store::http::HttpBuilder;
Expand All @@ -19,6 +25,8 @@ use tokio_stream::StreamExt;
use url::Url;
use urlencoding::decode;

mod cast;

#[derive(Parser, Clone)]
#[clap(version, about, long_about = None)]
struct Cli {
Expand All @@ -44,6 +52,10 @@ enum Commands {
/// Select columns by name (comma,separated,?prefixed_optional)
#[clap(short, long)]
columns: Option<String>,

/// Outputs null values
#[clap(short, long)]
nulls: bool,
},

/// Outputs the Thrift schema
Expand All @@ -67,6 +79,7 @@ async fn output_for_command(mut reader: ParquetObjectReader, command: &Commands)
offset,
limit,
columns,
nulls,
} => {
let absolute_offset: usize = if offset.is_negative() {
parquet_metadata
Expand Down Expand Up @@ -115,10 +128,75 @@ async fn output_for_command(mut reader: ParquetObjectReader, command: &Commands)
}

let mut iter = async_reader_builder.build().unwrap();
let mut json_writer = LineDelimitedWriter::new(std::io::stdout());

while let Some(Ok(batch)) = iter.next().await {
let _ = json_writer.write(&batch);
let builder = WriterBuilder::new().with_explicit_nulls(nulls.clone());
let mut json_writer = builder.build::<_, LineDelimited>(std::io::stdout());

while let Some(rbt) = iter.next().await {
match rbt {
Ok(batch) => {
let schema = batch.schema();
let json_batch = if schema.fields.iter().any(|field| {
if let DataType::Binary
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _) = field.data_type()
{
true
} else {
false
}
}) {
let mut columns: Vec<Arc<dyn Array>> = vec![];
let mut builder = SchemaBuilder::new();
schema
.fields
.iter()
.for_each(|field| match field.data_type() {
DataType::Binary => {
builder.push(Field::new(
field.name(),
DataType::Utf8,
field.is_nullable(),
));
let column = batch.column_by_name(field.name()).unwrap();
let new_column =
cast_binary_to_string::<i32>(column).unwrap();
columns.push(new_column);
}
DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
builder.push(Field::new(
field.name(),
DataType::Utf8,
field.is_nullable(),
));
let column = batch.column_by_name(field.name()).unwrap();
let new_column = cast_with_options(
column,
&DataType::Utf8,
&CastOptions {
safe: false,
format_options: FormatOptions::default(),
},
)
.unwrap();
columns.push(new_column);
}
_ => {
builder.push(field.clone());
columns.push(
batch.column_by_name(field.name()).unwrap().clone(),
);
}
});
let schema = builder.finish();
RecordBatch::try_new(schema.into(), columns).unwrap()
} else {
batch
};
let _ = json_writer.write(&json_batch).unwrap();
}
Err(e) => println!("{}", e),
};
}
json_writer.finish().unwrap();
}
Expand Down

0 comments on commit 76e0fff

Please sign in to comment.