Skip to content

Commit

Permalink
feat: support reading tables via Unity Catalog provided credentials
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Carman <[email protected]>
  • Loading branch information
hntd187 committed Dec 23, 2024
1 parent 1c3fe85 commit 9a6e48e
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 95 deletions.
9 changes: 6 additions & 3 deletions crates/catalog-unity/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,11 @@ mod tests {

let get_table_response = client
.get_table("catalog_name", "schema_name", "table_name")
.await
.unwrap();
assert!(matches!(get_table_response, GetTableResponse::Success(_)));
.await;
dbg!(&get_table_response);
assert!(matches!(
get_table_response.unwrap(),
GetTableResponse::Success(_)
));
}
}
273 changes: 181 additions & 92 deletions crates/catalog-unity/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! Api models for databricks unity catalog APIs
use chrono::serde::*;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
Expand Down Expand Up @@ -159,6 +161,7 @@ pub struct Catalog {
pub delta_sharing_valid_through_timestamp: u64,
}

#[allow(unused)]
#[derive(Deserialize, Default, Debug)]
pub struct ProvisioningInfo {
state: ProvisioningState,
Expand Down Expand Up @@ -240,6 +243,7 @@ pub struct Schema {
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
#[allow(missing_docs)]
/// Possible data source formats for unity tables
#[derive(Clone, PartialEq)]
pub enum DataSourceFormat {
#[default]
Undefined,
Expand All @@ -252,12 +256,27 @@ pub enum DataSourceFormat {
Text,
UnityCatalog,
Deltasharing,
DatabricksFormat,
MySQLFormat,
PostgreSQLFormat,
RedshiftFormat,
SnowflakeFormat,
SQLDWFormat,
SQLServerFormat,
SalesForceFormat,
BigQueryFormat,
NetSuiteFormat,
WorkdayRAASFormat,
HiveSerde,
HiveCustom,
VectorIndexFormat,
}

#[derive(Deserialize, Default, Debug)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
#[allow(missing_docs)]
/// Possible table types for unity tables
#[derive(PartialEq, Clone)]
pub enum TableType {
#[default]
Undefined,
Expand All @@ -278,35 +297,92 @@ pub struct TableSummary {
}

/// A table within a schema
#[derive(Deserialize, Default, Debug)]
#[serde(default)]
#[derive(Clone, Debug, PartialEq, Default, Deserialize)]
pub struct Table {
pub name: String,
/// Name of parent catalog.
pub catalog_name: String,
/// Name of parent schema relative to its parent catalog.
pub schema_name: String,
pub table_type: TableType,
pub data_source_format: DataSourceFormat,
/// The array of __ColumnInfo__ definitions of the table's columns.
pub columns: Vec<ColumnInfo>,
/// Storage root URL for table (for **MANAGED**, **EXTERNAL** tables)
pub storage_location: String,
pub view_definition: String,
pub sql_path: String,
pub owner: String,
pub comment: String,
/// User-provided free-form text description.
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
/// A map of key-value properties attached to the securable.
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<String, String>,
pub storage_credential_name: String,
pub enable_predictive_optimization: String,
pub metastore_id: String,
pub full_name: String,
pub created_at: u64,
pub created_by: String,
pub updated_at: u64,
pub updated_by: String,
pub deleted_at: u64,
/// Time at which this table was created, in epoch milliseconds.
#[serde(with = "ts_milliseconds")]
pub created_at: DateTime<Utc>,
/// Time at which this table was last modified, in epoch milliseconds.
#[serde(with = "ts_milliseconds")]
pub updated_at: DateTime<Utc>,
/// Unique identifier for the table.
pub table_id: String,
pub delta_runtime_properties_kvpairs: DeltaRuntimeProperties,
pub effective_predictive_optimization_flag: EffectivePredictiveOptimizationFlag,
pub access_point: String,
pub pipeline_id: String,
pub browse_only: bool,
}

/// Description of a single table column, used as the element type of a
/// table's `columns` list in the unity catalog API models.
///
/// Every `Option` field carries `skip_serializing_if = "Option::is_none"`,
/// so a field that is `None` is left out of the serialized output entirely
/// rather than being written as `null`.
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
pub struct ColumnInfo {
/// Name of Column.
pub name: String,
/// Full data type specification as SQL/catalogString text.
#[serde(skip_serializing_if = "Option::is_none")]
pub type_text: Option<String>,
/// Full data type specification, JSON-serialized.
#[serde(skip_serializing_if = "Option::is_none")]
pub type_json: Option<String>,
/// Name of the column's data type, as one of the [`ColumnTypeName`] variants.
#[serde(skip_serializing_if = "Option::is_none")]
pub type_name: Option<ColumnTypeName>,
/// Digits of precision; required for DecimalTypes.
#[serde(skip_serializing_if = "Option::is_none")]
pub type_precision: Option<i32>,
/// Digits to right of decimal; Required for DecimalTypes.
#[serde(skip_serializing_if = "Option::is_none")]
pub type_scale: Option<i32>,
/// Format of IntervalType.
#[serde(skip_serializing_if = "Option::is_none")]
pub type_interval_type: Option<String>,
/// Ordinal position of column (starting at position 0).
pub position: u32,
/// User-provided free-form text description.
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
/// Whether field may be Null.
pub nullable: bool,
/// Partition index for column.
#[serde(skip_serializing_if = "Option::is_none")]
pub partition_index: Option<i32>,
}

/// Name of a column's data type, as carried in [`ColumnInfo::type_name`].
///
/// Variants are serialized and deserialized in `SCREAMING_SNAKE_CASE`
/// (for example `TimestampNtz` <-> `"TIMESTAMP_NTZ"` and
/// `UserDefinedType` <-> `"USER_DEFINED_TYPE"`).
// NOTE(review): there is no catch-all variant, so a type name that is not
// listed here will presumably fail deserialization — confirm this is intended.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ColumnTypeName {
Boolean,
Byte,
Short,
Int,
Long,
Float,
Double,
Date,
Timestamp,
TimestampNtz,
String,
Binary,
Decimal,
Interval,
Array,
Struct,
Map,
Char,
Null,
UserDefinedType,
TableType,
}

#[derive(Deserialize, Default, Debug)]
Expand All @@ -322,7 +398,7 @@ pub struct TemporaryTableCredentials {
pub gcp_oauth_token: Option<GcpOauthToken>,
pub r2_temp_credentials: Option<R2TempCredentials>,
#[serde(with = "chrono::serde::ts_milliseconds")]
pub expiration_time: chrono::DateTime<chrono::Utc>,
pub expiration_time: DateTime<Utc>,
pub url: String,
}

Expand Down Expand Up @@ -403,7 +479,8 @@ pub(crate) mod tests {
pub(crate) const ERROR_RESPONSE: &str = r#"
{
"error_code": "404",
"message": "error message"
"message": "error message",
"details": []
}
"#;

Expand All @@ -417,6 +494,7 @@ pub(crate) mod tests {
"full_name": "string",
"catalog_type": "string",
"catalog_name": "string",
"schema_name": "string",
"storage_root": "string",
"storage_location": "string",
"properties": {
Expand All @@ -427,7 +505,8 @@ pub(crate) mod tests {
"created_at": 0,
"owner": "string",
"updated_at": 0,
"metastore_id": "string"
"metastore_id": "string",
"table_id": "string"
}
]
}"#;
Expand All @@ -454,21 +533,55 @@ pub(crate) mod tests {
}"#;

pub(crate) const GET_TABLE_RESPONSE: &str = r#"
{
"created_by": "string",
"name": "table_name",
"updated_by": "string",
"sql_path": "string",
"data_source_format": "DELTA",
"full_name": "string",
"delta_runtime_properties_kvpairs": {
"delta_runtime_properties": {
{
"name": "string",
"catalog_name": "string",
"schema_name": "string",
"table_type": "MANAGED",
"data_source_format": "DELTA",
"columns": [
{
"name": "string",
"type_text": "string",
"type_name": "BOOLEAN",
"position": 0,
"type_precision": 0,
"type_scale": 0,
"type_interval_type": "string",
"type_json": "string",
"comment": "string",
"nullable": true,
"partition_index": 0,
"mask": {
"function_name": "string",
"using_column_names": [
"string"
]
}
}
],
"storage_location": "string",
"view_definition": "string",
"view_dependencies": {
"dependencies": [
{
"table": {
"table_full_name": "string"
},
"function": {
"function_full_name": "string"
}
}
]
},
"sql_path": "string",
"owner": "string",
"comment": "string",
"properties": {
"property1": "string",
"property2": "string"
}
},
"catalog_name": "string",
"table_constraints": {
},
"storage_credential_name": "string",
"table_constraints": [
{
"primary_key_constraint": {
Expand All @@ -491,63 +604,38 @@ pub(crate) mod tests {
"name": "string"
}
}
]
},
"schema_name": "string",
"storage_location": "string",
"properties": {
"property1": "string",
"property2": "string"
},
"columns": [
{
"nullable": "true",
"name": "string",
"type_interval_type": "string",
"mask": {
"function_name": "string",
"using_column_names": [
"string"
]
},
"type_scale": 0,
"type_text": "string",
"comment": "string",
"partition_index": 0,
"type_json": "string",
"position": 0,
"type_name": "BOOLEAN",
"type_precision": 0
}
],
"comment": "string",
"table_id": "string",
"table_type": "MANAGED",
"created_at": 0,
"row_filter": {
"name": "string",
"input_column_names": [
"string"
]
},
"owner": "string",
"storage_credential_name": "string",
"updated_at": 0,
"view_definition": "string",
"view_dependencies": [
{
"table": {
"table_full_name": "string"
},
"function": {
"function_full_name": "string"
],
"row_filter": {
"function_name": "string",
"input_column_names": [
"string"
]
},
"enable_predictive_optimization": "DISABLE",
"metastore_id": "string",
"full_name": "string",
"data_access_configuration_id": "string",
"created_at": 0,
"created_by": "string",
"updated_at": 0,
"updated_by": "string",
"deleted_at": 0,
"table_id": "string",
"delta_runtime_properties_kvpairs": {
"delta_runtime_properties": {
"property1": "string",
"property2": "string"
}
}
],
"data_access_configuration_id": "string",
"deleted_at": 0,
"metastore_id": "string"
}
},
"effective_predictive_optimization_flag": {
"value": "DISABLE",
"inherited_from_type": "CATALOG",
"inherited_from_name": "string"
},
"access_point": "string",
"pipeline_id": "string",
"browse_only": true
}
"#;

pub(crate) const LIST_TABLES: &str = r#"
Expand Down Expand Up @@ -608,6 +696,7 @@ pub(crate) mod tests {

let get_table: Result<GetTableResponse, _> = serde_json::from_str(ERROR_RESPONSE);
assert!(get_table.is_ok());
dbg!(&get_table);
assert!(matches!(get_table.unwrap(), GetTableResponse::Error(_)))
}
}

0 comments on commit 9a6e48e

Please sign in to comment.