Skip to content

Commit

Permalink
Updated for new version of SDK.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenstott committed Oct 11, 2024
1 parent f7e02ce commit e5b47f2
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 131 deletions.
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ wildcard_imports = { level = "allow" }
[workspace.dependencies]
jni = { version = "0.21.1", features = ["invocation"] }
axum-extra = "0.9.3"
http = "1.1.0"
http = "0.2"
mime = "0.3.17"
opentelemetry = "0.24.0"
opentelemetry-http = "0.13.0"
Expand All @@ -37,9 +37,9 @@ tracing-opentelemetry = "0.25.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["ansi", "env-filter", "fmt", "json"] }
dotenv = "0.15.0"
once_cell = "1.19.0"
ndc-models = { git = "https://github.com/hasura/ndc-spec.git", tag = "v0.1.5" }
ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs.git", tag = "v0.2.2" }
ndc-test = { git = "https://github.com/hasura/ndc-spec.git", tag = "v0.1.5" }
ndc-models = { git = "https://github.com/hasura/ndc-spec.git", tag = "v0.1.6" }
ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs.git", tag = "v0.4.0" }
ndc-test = { git = "https://github.com/hasura/ndc-spec.git", tag = "v0.1.6" }

anyhow = "1"
async-trait = "0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.Matcher;


/**
* The StatementPreparer class is responsible for preparing a SQL statement by replacing
Expand All @@ -31,10 +33,21 @@ public static PreparedStatement prepare(String input, Connection connection) thr
ArrayList<Object> extractedStringParams = new ArrayList<>();
modifiedInput = findParams(modifiedInput, extractedStrings, extractedStringParams);
PreparedStatement preparedStatement = connection.prepareStatement(modifiedInput);

// Regex pattern for RFC3339
String datePattern = "^[1-9]\\d{3}-(?:(?:0[1-9]|1[0-3])-(?:0[1-9]|1[0-9]|2[0-2])|(?:0[13-9]|1[0-2])-30|(?:0[13578]|1[02])-31)T(?:[01]\\d|2[0-3]):[0-5]\\d:[0-5]\\d(?:\\.\\d+)?(?:Z|[+-][01]\\d:[0-5]\\d)$";
Pattern pattern = Pattern.compile(datePattern);

for (int i = 0; i < extractedStringParams.size(); i++) {
Object item = extractedStringParams.get(i);
if (item instanceof String) {
preparedStatement.setString(i + 1, (String) item);
String strItem = (String) item;
if (pattern.matcher(strItem).matches()) {
// this string can be parsed as a date
preparedStatement.setDate(i + 1, java.sql.Date.valueOf(LocalDate.parse(strItem, DateTimeFormatter.ISO_OFFSET_DATE_TIME)));
} else {
preparedStatement.setString(i + 1, strItem);
}
}
}
return preparedStatement;
Expand Down
1 change: 1 addition & 0 deletions crates/calcite-schema/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
once_cell = { workspace = true}
http = { workspace = true }

[dev-dependencies]
jsonschema = { workspace = true }
15 changes: 8 additions & 7 deletions crates/calcite-schema/src/collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
//! Introspect Calcite metadata and then reinterprets it into Calcite metadata.
//!
use std::collections::{BTreeMap, HashMap};
use std::error::Error;

use ndc_models::{CollectionInfo, CollectionName, FieldName, ForeignKeyConstraint, ObjectField, ObjectType, ObjectTypeName, ScalarType, ScalarTypeName, SchemaResponse, Type, TypeName, UniquenessConstraint};
use http::StatusCode;
use ndc_models::Type::{Named, Nullable};
use ndc_models::{CollectionInfo, CollectionName, FieldName, ForeignKeyConstraint, ObjectField, ObjectType, ObjectTypeName, ScalarType, ScalarTypeName, Type, TypeName, UniquenessConstraint};
use ndc_sdk::connector::{ErrorResponse, Result};
use serde_json::Value;
use tracing::Level;

use crate::calcite::{ColumnMetadata, TableMetadata};
Expand All @@ -32,7 +34,7 @@ use crate::calcite::{ColumnMetadata, TableMetadata};
pub fn collections(
data_models: &HashMap<CollectionName, TableMetadata>,
scalar_types: &BTreeMap<ScalarTypeName, ScalarType>,
) -> Result<(BTreeMap<ObjectTypeName, ObjectType>, Vec<CollectionInfo>), Result<SchemaResponse, Box<dyn Error>>, > {
) -> Result<(BTreeMap<ObjectTypeName, ObjectType>, Vec<CollectionInfo>)> {
let mut object_types: BTreeMap<ObjectTypeName, ObjectType> = BTreeMap::new();
let mut collection_infos: Vec<CollectionInfo> = Vec::new();

Expand All @@ -56,13 +58,12 @@ pub fn collections(
uniqueness_constraints,
})
} else {
return Err(Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
return Err(ErrorResponse::new(StatusCode::from_u16(500).unwrap(),
format!(
"Table names cannot be same as a scalar type name: {}",
table_metadata.name
),
))));
), Value::Null
));
}
}
Ok((object_types, collection_infos))
Expand Down
10 changes: 5 additions & 5 deletions crates/calcite-schema/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
//! the config file with the new schema.
//!

use std::error::Error;
use std::fs::File;
use std::io::Write;
use std::path::Path;
use jni::objects::GlobalRef;
use ndc_models as models;
use ndc_models::SchemaResponse;
use ndc_sdk::connector::{ErrorResponse, Result};
use tracing::{debug, event, Level};
use ndc_calcite_values::is_running_in_container::is_running_in_container;
use ndc_calcite_values::values::{CONFIGURATION_FILENAME, DEV_CONFIG_FILE_NAME, DOCKER_CONNECTOR_RW};
Expand Down Expand Up @@ -61,12 +61,12 @@ use crate::version5::ParsedConfiguration;
/// ```
// ANCHOR: get_schema
#[tracing::instrument(skip(configuration, calcite_ref), level=Level::INFO)]
pub fn get_schema(configuration: &ParsedConfiguration, calcite_ref: GlobalRef) -> Result<SchemaResponse, Box<dyn Error>> {
pub fn get_schema(configuration: &ParsedConfiguration, calcite_ref: GlobalRef) -> Result<SchemaResponse> {
let data_models = get_models(&calcite_ref);
let scalar_types = scalars::scalars();
let (object_types, collections) = match collections::collections(&data_models, &scalar_types) {
Ok(value) => value,
Err(value) => return value,
Err(value) => return Err(value),
};
let procedures = vec![];
let functions: Vec<models::FunctionInfo> = vec![];
Expand All @@ -89,8 +89,8 @@ pub fn get_schema(configuration: &ParsedConfiguration, calcite_ref: GlobalRef) -
let file = File::create(file_path);
match file {
Ok(mut file) => {
let serialized_json = serde_json::to_string_pretty(&new_configuration)?;
file.write_all(serialized_json.as_bytes())?;
let serialized_json = serde_json::to_string_pretty(&new_configuration).map_err(ErrorResponse::from_error)?;
file.write_all(serialized_json.as_bytes()).map_err(ErrorResponse::from_error)?;
event!(Level::INFO, "Wrote metadata to config: {}", serde_json::to_string(&schema).unwrap());
}
Err(_) => {
Expand Down
22 changes: 21 additions & 1 deletion crates/calcite-schema/src/version5.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Internal Configuration and state for our connector.

use std::collections::{HashMap};
use std::{error, fmt};
use std::path::Path;
use jni::errors::{Error, JniError};
use jni::JNIEnv;
use jni::objects::{GlobalRef, JObject, JValueGen, JValueOwned};
use jni::objects::JValueGen::Object;
use ndc_models::CollectionName;
use ndc_sdk::connector::InitializationError;
use once_cell::sync::OnceCell;

use schemars::JsonSchema;
Expand All @@ -24,6 +24,26 @@ use crate::error::{ParseConfigurationError, WriteParsedConfigurationError};
use crate::jvm::{get_jvm};
use crate::models::get_models;

#[derive(Debug)]
pub enum InitializationError {
Other(Box<dyn error::Error>),
}

impl fmt::Display for InitializationError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
InitializationError::Other(ref e) => write!(f, "Other error: {}", e),
}
}
}

impl error::Error for InitializationError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
InitializationError::Other(ref e) => Some(&**e),
}
}
}
pub struct CalciteRefSingleton {
calcite_ref: OnceCell<GlobalRef>,
}
Expand Down
49 changes: 24 additions & 25 deletions crates/connectors/ndc-calcite/src/calcite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,19 @@ use jni::objects::{GlobalRef, JObject, JString, JValueGen};
use jni::objects::JValueGen::Object;
use ndc_models as models;
use ndc_models::{FieldName, RowFieldValue};
use ndc_sdk::connector::QueryError;
use ndc_sdk::connector::{ErrorResponse};
use opentelemetry::trace::{TraceContextExt};
use serde::de::DeserializeOwned;
use serde_json::Value;
use tracing::{event, Level};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use ndc_sdk::connector::error::Result;

use ndc_calcite_schema::jvm::get_jvm;
use ndc_calcite_schema::version5::create_jvm_connection;
use ndc_calcite_schema::version5::ParsedConfiguration;

pub type Row = IndexMap<FieldName, RowFieldValue>;

fn is_json_string<T: DeserializeOwned>(s: &str) -> bool {
serde_json::from_str::<T>(s).is_ok()
}

/// Creates a Calcite query engine.
///
/// This function creates an instance of the `CalciteQuery` class and initializes
Expand Down Expand Up @@ -61,21 +57,21 @@ fn is_json_string<T: DeserializeOwned>(s: &str) -> bool {
/// }
/// ```
#[tracing::instrument(skip(configuration, env), level = Level::INFO)]
pub fn create_query_engine<'a>(configuration: &'a ParsedConfiguration, env: &'a mut JNIEnv<'a>) -> JObject<'a> {
let class = env.find_class("org/kenstott/CalciteQuery").unwrap();
let instance = env.new_object(class, "()V", &[]).unwrap();
pub fn create_query_engine<'a>(configuration: &'a ParsedConfiguration, env: &'a mut JNIEnv<'a>) -> Result<JObject<'a>> {
let class = env.find_class("org/kenstott/CalciteQuery").map_err(ErrorResponse::from_error)?;
let instance = env.new_object(class, "()V", &[]).map_err(ErrorResponse::from_error)?;
let _ = create_jvm_connection(configuration, &instance, env);
event!(Level::INFO, "Instantiated Calcite Query Engine");
return instance;
Ok(instance)
}

fn parse_to_row(data: Vec<String>) -> Vec<Row> {
fn parse_to_row(data: Vec<String>) -> Result<Vec<Row>> {
let mut rows: Vec<Row> = Vec::new();
for item in data {
let row: Row = serde_json::from_str(&item).unwrap();
let row: Row = serde_json::from_str(&item).map_err(ErrorResponse::from_error)?;
rows.push(row);
}
rows
Ok(rows)
}

/// Executes a query using the Calcite Java library.
Expand Down Expand Up @@ -132,7 +128,7 @@ pub fn connector_query(
sql_query: &str,
query_metadata: &models::Query,
explain: &bool,
) -> Result<Vec<Row>, QueryError> {
) -> Result<Vec<Row>> {

// This method of retrieving current span context is not working!!!
let span = tracing::Span::current();
Expand All @@ -141,12 +137,12 @@ pub fn connector_query(
let trace_id = otel_context.span().span_context().trace_id();

let jvm = get_jvm().lock().unwrap();
let mut java_env = jvm.attach_current_thread().unwrap();
let calcite_query = java_env.new_local_ref(calcite_reference).unwrap();
let mut java_env = jvm.attach_current_thread().map_err(ErrorResponse::from_error)?;
let calcite_query = java_env.new_local_ref(calcite_reference).map_err(ErrorResponse::from_error)?;

let temp_string = java_env.new_string(sql_query).unwrap();
let trace_id_jstring = java_env.new_string(trace_id.to_string()).unwrap();
let span_id_jstring = java_env.new_string(span_id.to_string()).unwrap();
let temp_string = java_env.new_string(sql_query).or(Err(ErrorResponse::from_error(CalciteError { message: String::from("Failed to get sql query string") })))?;
let trace_id_jstring = java_env.new_string(trace_id.to_string()).or(Err(ErrorResponse::from_error(CalciteError { message: String::from("Failed to get trace id string") })))?;
let span_id_jstring = java_env.new_string(span_id.to_string()).or(Err(ErrorResponse::from_error(CalciteError { message: String::from("Failed to get span id string") })))?;
let temp_obj = JObject::from(temp_string);
let trace_id_obj = JObject::from(trace_id_jstring);
let span_id_obj = JObject::from(span_id_jstring);
Expand All @@ -161,22 +157,25 @@ pub fn connector_query(
match result.unwrap() {
Object(obj) => {
let json_string: String = java_env.get_string(&JString::from(obj)).unwrap().into();
let rows: Vec<Row> = match serde_json::from_str::<Vec<String>>(&json_string) {
let mut rows: Vec<Row> = match serde_json::from_str::<Vec<String>>(&json_string) {
Ok(json_rows) => {
parse_to_row(json_rows)
parse_to_row(json_rows)?
},
Err(_) => match serde_json::from_str::<Vec<Row>>(&json_string) {
Ok(vec) => vec,
Err(error) => {
let err = CalciteError { message: format!("Failed to deserialize JSON: {}", error) };
return Err(QueryError::Other(Box::new(err), Value::Null));
return Err(ErrorResponse::from_error(err));
}
}
};
if config.fixes.unwrap_or_default() {
rows = fix_rows(rows, query_metadata);
}
event!(Level::DEBUG, result = format!("Completed Query. Retrieved {} rows. Result: {:?}", rows.len().to_string(), serde_json::to_string_pretty(&rows)));
Ok(rows)
}
_ => Err(QueryError::Other(Box::new(CalciteError { message: String::from("Invalid response from Calcite.") }), Value::Null))
_ => Err(ErrorResponse::from_error(CalciteError { message: String::from("Invalid response from Calcite. Expected object.") }))
}
}

Expand Down Expand Up @@ -216,8 +215,8 @@ fn fix_rows(rows: Vec<Row>, query_metadata: &models::Query) -> Vec<Row> {
// ANCHOR_END: calcite_query

#[derive(Debug)]
struct CalciteError {
message: String,
pub(crate) struct CalciteError {
pub(crate) message: String,
}

impl fmt::Display for CalciteError {
Expand Down
8 changes: 4 additions & 4 deletions crates/connectors/ndc-calcite/src/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
//! - Nested fields
//! - Explain

use ndc_models::{
Capabilities, CapabilitiesResponse, LeafCapability, MutationCapabilities,
NestedFieldCapabilities, QueryCapabilities, RelationshipCapabilities,
};
use ndc_models::{Capabilities, CapabilitiesResponse, ExistsCapabilities, LeafCapability, MutationCapabilities, NestedFieldCapabilities, QueryCapabilities, RelationshipCapabilities};
use tracing::{Level};

/// Calculates the capabilities of the Calcite system.
Expand Down Expand Up @@ -41,6 +38,9 @@ pub fn calcite_capabilities() -> CapabilitiesResponse {
aggregates: Some(LeafCapability {}),
variables: Some(LeafCapability {}),
explain: Some(LeafCapability {}),
exists: ExistsCapabilities {
nested_collections: None,
},
nested_fields: NestedFieldCapabilities {
filter_by: None,
order_by: None,
Expand Down
Loading

0 comments on commit e5b47f2

Please sign in to comment.