
Commit

scaffolding for geoparquet
jjcfrancisco committed Aug 9, 2024
1 parent 415a9dd commit b7dbd6f
Showing 5 changed files with 83 additions and 12 deletions.
8 changes: 8 additions & 0 deletions justfile
@@ -14,4 +14,12 @@
--table spain \
--mode=overwrite

@try-geoparquet:
cargo build --release
cd ./target/release/ && ./popgis --input ../../examples/geoparquet/example.parquet \
--uri postgresql://pio:password@localhost:25432/popgis \
--schema geoparquet \
--table example \
--mode=overwrite

@set-tags:
3 changes: 1 addition & 2 deletions src/file_types/geojson.rs
Expand Up @@ -10,8 +10,7 @@ use wkb::geom_to_wkb;
use crate::pg::binary_copy::{infer_geometry_type, insert_row};
use crate::pg::ops::prepare_postgis;
use crate::utils::cli::Cli;

use super::common::{AcceptedTypes, NameAndType};
use crate::file_types::common::{AcceptedTypes, NameAndType};

pub fn insert_data(args: Cli) -> Result<()> {
// Determine data types of the input file
77 changes: 71 additions & 6 deletions src/file_types/geoparquet.rs
@@ -1,10 +1,75 @@
use std::any::Any;

use crate::{Error, Result};

use geoarrow::io::parquet::read_geoparquet_async;
use geoarrow::io::parquet::GeoParquetReaderOptions;
use geoarrow::table::GeoTable;
use tokio::fs::File;
use postgres::types::Type;
use std::collections::HashMap;
use std::any::TypeId;

use geoarrow::io::parquet::read_geoparquet_async;
use geoarrow::io::parquet::GeoParquetReaderOptions;
use crate::utils::cli::Cli;
use crate::file_types::common::NameAndType;
use crate::pg::ops::prepare_postgis;
use crate::pg::binary_copy::infer_geometry_type;

pub fn insert_data(args: Cli) -> Result<()> {
// Currently static batch size. In time, this should be dynamic
let batch_size = 1000;
// Read geoparquet file using tokio runtime
let runtime = tokio::runtime::Runtime::new()?;
let geotable = runtime.block_on(read_geoparquet(&args.input, batch_size))?;
let file_data_types = determine_file_data_types(&geotable)?;

// Determine data types of the input file
// Prepare database
prepare_postgis(&args, &file_data_types)?;

// Get data types
let mut types: Vec<Type> = Vec::new();
for column in file_data_types.iter() {
types.push(column.data_type.clone());
}
// Get geometry type
let geom_type = infer_geometry_type(&args.table, &args.schema, &args.uri)?;
// Add geometry type to types
types.push(geom_type);

// Process geotable
process_geotable(geotable)?;
Ok(())
}
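
A note on the runtime handling in `insert_data` above: popgis is a synchronous CLI, so the async GeoParquet reader is bridged with a one-shot Tokio runtime and `block_on`. Below is a minimal sketch of that bridge, assuming the `read_geoparquet` helper defined later in this file and that the crate's `Error` converts from `std::io::Error` (which the existing `Runtime::new()?` call implies); a current-thread runtime is enough for a single blocking call, so the default multi-threaded runtime is optional here. The sketch relies on the imports already at the top of the module.

```rust
// Hypothetical standalone helper showing the sync-to-async bridge:
// build a small current-thread Tokio runtime and block on the async
// reader defined below in this module.
fn load_geotable(path: &str, batch_size: usize) -> Result<GeoTable> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all() // enable the runtime's I/O and time drivers
        .build()?;
    runtime.block_on(read_geoparquet(path, batch_size))
}
```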

fn determine_file_data_types(geotable: &GeoTable) -> Result<Vec<NameAndType>> {
let schema = geotable.schema();
let mut table_config: HashMap<String, Type> = HashMap::new();
for field in schema.fields() {
let name = field.name();
let data_type = field.data_type();
if data_type.type_id() == TypeId::of::<f64>() {
table_config.insert(name.to_string(), Type::FLOAT8);
} else if data_type.type_id() == TypeId::of::<i64>() {
table_config.insert(name.to_string(), Type::INT8);
} else if data_type.type_id() == TypeId::of::<String>() {
table_config.insert(name.to_string(), Type::VARCHAR);
} else if data_type.type_id() == TypeId::of::<bool>() {
table_config.insert(name.to_string(), Type::BOOL);
}
}

let mut names_and_types: Vec<NameAndType> = Vec::new();
for (name, data_type) in table_config.iter() {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: data_type.clone(),
});
}

Ok(names_and_types)

}
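
One caveat on the mapping in `determine_file_data_types`: `field.data_type()` yields Arrow's `DataType` enum, so comparing its `TypeId` with `TypeId::of::<f64>()` compares the enum type itself against `f64` and never matches, leaving the map empty. Here is a sketch of the same mapping done by matching on the enum variants instead (assuming the `DataType` from the arrow-schema crate that geoarrow's schema exposes; the variant subset simply mirrors the Postgres types used above):

```rust
use arrow_schema::DataType;
use postgres::types::Type;

// Map a subset of Arrow column types onto Postgres types by matching
// the DataType variants directly rather than comparing TypeIds.
fn map_arrow_type(data_type: &DataType) -> Option<Type> {
    match data_type {
        DataType::Float64 => Some(Type::FLOAT8),
        DataType::Int64 => Some(Type::INT8),
        DataType::Utf8 | DataType::LargeUtf8 => Some(Type::VARCHAR),
        DataType::Boolean => Some(Type::BOOL),
        _ => None, // unsupported types are skipped for now
    }
}
```

Pushing the results straight into `Vec<NameAndType>` would also avoid the intermediate `HashMap`, which discards the original column order.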

async fn read_geoparquet(file: &str, batch_size: usize) -> Result<GeoTable> {
let file = File::open(file).await.unwrap();
@@ -14,9 +79,7 @@ async fn read_geoparquet(file: &str, batch_size: usize) -> Result<GeoTable> {
Ok(geotable)
}
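
Minor point on `read_geoparquet`: `File::open(file).await.unwrap()` panics on a missing path even though the function already returns the crate's `Result`. Assuming `Error: From<std::io::Error>` (again implied by the `Runtime::new()?` call above), the error can be propagated instead:

```rust
// Propagate the I/O error rather than panicking on a bad path.
let file = File::open(file).await?;
```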

pub fn process_geotable() -> Result<()> {
let runtime = tokio::runtime::Runtime::new()?;
let geotable = runtime.block_on(read_geoparquet("../../data/saporo.parquet", 1000))?;
pub fn process_geotable(geotable: GeoTable) -> Result<()> {
let geom_column = geotable.geometry()?;
let geom_type = geom_column.data_type();
println!("{:?}", geom_type);
@@ -47,7 +110,9 @@ mod tests {
let runtime = tokio::runtime::Runtime::new().unwrap();
let file_path = "examples/geoparquet/example.parquet";
let batch_size = 1000;
let result = runtime.block_on(read_geoparquet(file_path, batch_size)).unwrap();
let result = runtime
.block_on(read_geoparquet(file_path, batch_size))
.unwrap();
assert_eq!(result.len(), 5);
}
}
3 changes: 1 addition & 2 deletions src/file_types/shapefile.rs
Expand Up @@ -3,15 +3,14 @@ use crate::{Error, Result};
use postgres::types::Type;
use shapefile::dbase::FieldValue;
use std::collections::HashMap;
use wkb::geom_to_wkb;

use crate::file_types::common::{AcceptedTypes, NameAndType};
use crate::file_types::geo::to_geo;
use crate::pg::binary_copy::{infer_geometry_type, insert_row, Wkb};
use crate::pg::ops::prepare_postgis;
use crate::utils::cli::Cli;

use wkb::geom_to_wkb;

pub fn insert_data(args: Cli) -> Result<()> {
// Determine data types of the input file
let file_data_types = determine_file_data_types(&args.input)?;
4 changes: 2 additions & 2 deletions src/utils/cli.rs
@@ -1,7 +1,7 @@
use crate::{Result, Error};
use crate::file_types::common::{FileType, determine_file_type};
use crate::utils::validate::validate_args;
use crate::file_types::{geojson, shapefile};
use crate::file_types::{geojson, shapefile, geoparquet};
use crate::pg::ops::{check_table_exists, drop_table};

use clap::Parser;
@@ -73,7 +73,7 @@ pub fn run() -> Result<()> {
shapefile::insert_data(args)?;
}
FileType::GeoParquet => {
// create var must be passed
geoparquet::insert_data(args)?;
}
};
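
For context on the `args` being handed to `geoparquet::insert_data` here: the flags used in the justfile (`--input`, `--uri`, `--schema`, `--table`, `--mode`) and the `args.input` / `args.uri` / `args.schema` / `args.table` accesses in the diff suggest a clap-derived struct along these lines. This is a hypothetical reconstruction; the actual field types, optionality, and help text in `src/utils/cli.rs` may differ:

```rust
use clap::Parser;

// Hypothetical sketch of the Cli struct consumed by the insert_data
// functions; field names follow the CLI flags, everything else is assumed.
#[derive(Parser, Debug)]
pub struct Cli {
    /// Path to the input file (shapefile, GeoJSON or GeoParquet)
    #[arg(long)]
    pub input: String,
    /// PostgreSQL connection URI
    #[arg(long)]
    pub uri: String,
    /// Target schema
    #[arg(long)]
    pub schema: String,
    /// Target table
    #[arg(long)]
    pub table: String,
    /// Write mode, e.g. "overwrite"
    #[arg(long)]
    pub mode: Option<String>,
}
```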

