[FEAT] connect: createDataFrame (#3363)
andrewgazelka authored Dec 5, 2024
1 parent d1d0fab commit 86523a0
Showing 22 changed files with 679 additions and 51 deletions.
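This commit wires up createDataFrame for Daft's Spark Connect server. As a hedged illustration (not part of the commit), the client-side flow it targets looks roughly like the PySpark sketch below; the connection URL and sample data are placeholders, and the Daft Connect server must already be running.

```python
# Illustrative only: a PySpark client creating a DataFrame over Spark Connect,
# which is the entry point this commit implements on the Daft side.
# "sc://localhost:50051" is a placeholder endpoint.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:50051").getOrCreate()

df = spark.createDataFrame(
    [(1, "a"), (2, "b"), (3, "c")],
    ["id", "letter"],
)
df.show()
```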
64 changes: 56 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -200,6 +200,7 @@ daft-dsl = {path = "src/daft-dsl"}
daft-hash = {path = "src/daft-hash"}
daft-local-execution = {path = "src/daft-local-execution"}
daft-logical-plan = {path = "src/daft-logical-plan"}
daft-micropartition = {path = "src/daft-micropartition"}
daft-scan = {path = "src/daft-scan"}
daft-schema = {path = "src/daft-schema"}
daft-table = {path = "src/daft-table"}
28 changes: 28 additions & 0 deletions daft/dataframe/dataframe.py
@@ -63,6 +63,34 @@
ManyColumnsInputType = Union[ColumnInputType, Iterable[ColumnInputType]]


def to_logical_plan_builder(*parts: MicroPartition) -> LogicalPlanBuilder:
    """Creates a LogicalPlanBuilder from one or more MicroPartitions.

    Args:
        parts: The MicroPartitions that we wish to convert into a logical plan.

    Returns:
        LogicalPlanBuilder: Logical plan builder created from the provided MicroPartitions.
    """
    if not parts:
        raise ValueError("Can't create a DataFrame from an empty list of tables.")

    result_pset = LocalPartitionSet()

    for i, part in enumerate(parts):
        result_pset.set_partition_from_table(i, part)

    context = get_context()
    cache_entry = context.get_or_create_runner().put_partition_set_into_cache(result_pset)
    size_bytes = result_pset.size_bytes()
    num_rows = len(result_pset)

    assert size_bytes is not None, "In-memory data should always have non-None size in bytes"
    return LogicalPlanBuilder.from_in_memory_scan(
        cache_entry, parts[0].schema(), result_pset.num_partitions(), size_bytes, num_rows=num_rows
    )


class DataFrame:
    """A Daft DataFrame is a table of data. It has columns, where each column has a type and the same
    number of items (rows) as all other columns.
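For context, here is a minimal sketch of how the new module-level helper above might be exercised; it is not part of this diff, and it assumes MicroPartition.from_pydict and a DataFrame constructor that accepts a LogicalPlanBuilder, both as used elsewhere in the codebase.

```python
# Hedged sketch: build a LogicalPlanBuilder from in-memory MicroPartitions
# and wrap it in a DataFrame. Assumes MicroPartition.from_pydict and
# DataFrame(builder) behave as they do elsewhere in Daft.
from daft.table import MicroPartition

part = MicroPartition.from_pydict({"id": [1, 2, 3], "letter": ["a", "b", "c"]})
builder = to_logical_plan_builder(part)
df = DataFrame(builder)
df.show()
```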
2 changes: 1 addition & 1 deletion src/arrow2/src/io/ipc/read/common.rs
@@ -87,7 +87,7 @@ pub fn read_record_batch<R: Read + Seek>(
file_size: u64,
scratch: &mut Vec<u8>,
) -> Result<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
assert_eq!(fields.len(), ipc_schema.fields.len(), "IPC schema fields and Arrow schema fields must be the same length");
let buffers = batch
.buffers()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
2 changes: 1 addition & 1 deletion src/arrow2/src/io/ipc/read/mod.rs
@@ -42,4 +42,4 @@ pub type Dictionaries = AHashMap<i64, Box<dyn Array>>;
pub(crate) type Node<'a> = arrow_format::ipc::FieldNodeRef<'a>;
pub(crate) type IpcBuffer<'a> = arrow_format::ipc::BufferRef<'a>;
pub(crate) type Compression<'a> = arrow_format::ipc::BodyCompressionRef<'a>;
pub(crate) type Version = arrow_format::ipc::MetadataVersion;
pub type Version = arrow_format::ipc::MetadataVersion;
9 changes: 7 additions & 2 deletions src/daft-connect/Cargo.toml
@@ -1,26 +1,31 @@
[dependencies]
arrow2 = {workspace = true}
arrow2 = {workspace = true, features = ["io_json_integration"]}
async-stream = "0.3.6"
color-eyre = "0.6.3"
common-daft-config = {workspace = true}
daft-core = {workspace = true}
daft-dsl = {workspace = true}
daft-local-execution = {workspace = true}
daft-logical-plan = {workspace = true}
daft-micropartition = {workspace = true}
daft-scan = {workspace = true}
daft-schema = {workspace = true}
daft-table = {workspace = true}
dashmap = "6.1.0"
derive_more = {workspace = true}
eyre = "0.6.12"
futures = "0.3.31"
itertools = {workspace = true}
pyo3 = {workspace = true, optional = true}
serde_json = {workspace = true}
spark-connect = {workspace = true}
tokio = {version = "1.40.0", features = ["full"]}
tonic = "0.12.3"
tracing = {workspace = true}
uuid = {version = "1.10.0", features = ["v4"]}

[features]
python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python", "daft-dsl/python", "daft-schema/python", "daft-core/python"]
python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python", "daft-dsl/python", "daft-schema/python", "daft-core/python", "daft-micropartition/python"]

[lints]
workspace = true
22 changes: 13 additions & 9 deletions src/daft-connect/src/lib.rs
@@ -5,7 +5,6 @@
#![feature(iter_from_coroutine)]
#![feature(stmt_expr_attributes)]
#![feature(try_trait_v2_residual)]
#![deny(clippy::print_stdout)]

use dashmap::DashMap;
use eyre::Context;
@@ -23,7 +22,7 @@ use spark_connect::{
ReleaseExecuteResponse, ReleaseSessionRequest, ReleaseSessionResponse,
};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{debug, info};
use tracing::info;
use uuid::Uuid;

use crate::session::Session;
@@ -325,8 +324,6 @@ impl SparkConnectService for DaftSparkConnectService {
result: Some(analyze_plan_response::Result::Schema(schema)),
};

debug!("response: {response:#?}");

Ok(Response::new(response))
}
_ => unimplemented_err!("Analyze plan operation is not yet implemented"),
@@ -346,7 +343,6 @@ impl SparkConnectService for DaftSparkConnectService {
&self,
_request: Request<InterruptRequest>,
) -> Result<Response<InterruptResponse>, Status> {
println!("got interrupt");
unimplemented_err!("interrupt operation is not yet implemented")
}

@@ -361,17 +357,26 @@
#[tracing::instrument(skip_all)]
async fn release_execute(
&self,
_request: Request<ReleaseExecuteRequest>,
request: Request<ReleaseExecuteRequest>,
) -> Result<Response<ReleaseExecuteResponse>, Status> {
unimplemented_err!("release_execute operation is not yet implemented")
let request = request.into_inner();

let session = self.get_session(&request.session_id)?;

let response = ReleaseExecuteResponse {
session_id: session.client_side_session_id().to_string(),
server_side_session_id: session.server_side_session_id().to_string(),
operation_id: None, // todo: set but not strictly required
};

Ok(Response::new(response))
}

#[tracing::instrument(skip_all)]
async fn release_session(
&self,
_request: Request<ReleaseSessionRequest>,
) -> Result<Response<ReleaseSessionResponse>, Status> {
println!("got release session");
unimplemented_err!("release_session operation is not yet implemented")
}

Expand All @@ -380,7 +385,6 @@ impl SparkConnectService for DaftSparkConnectService {
&self,
_request: Request<FetchErrorDetailsRequest>,
) -> Result<Response<FetchErrorDetailsResponse>, Status> {
println!("got fetch error details");
unimplemented_err!("fetch_error_details operation is not yet implemented")
}
}
11 changes: 5 additions & 6 deletions src/daft-connect/src/op/execute/root.rs
@@ -1,4 +1,4 @@
use std::{collections::HashMap, future::ready, sync::Arc};
use std::{future::ready, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use daft_local_execution::NativeExecutor;
@@ -10,6 +10,7 @@ use crate::{
op::execute::{ExecuteStream, PlanIds},
session::Session,
translation,
translation::Plan,
};

impl Session {
@@ -31,13 +32,11 @@ impl Session {
let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
tokio::spawn(async move {
let execution_fut = async {
let plan = translation::to_logical_plan(command)?;
let optimized_plan = plan.optimize()?;
let Plan { builder, psets } = translation::to_logical_plan(command)?;
let optimized_plan = builder.optimize()?;
let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor
.run(HashMap::new(), cfg, None)?
.into_stream();
let mut result_stream = native_executor.run(psets, cfg, None)?.into_stream();

while let Some(result) = result_stream.next().await {
let result = result?;
4 changes: 2 additions & 2 deletions src/daft-connect/src/translation.rs
@@ -6,8 +6,8 @@ mod literal;
mod logical_plan;
mod schema;

pub use datatype::{to_daft_datatype, to_spark_datatype};
pub use datatype::{deser_spark_datatype, to_daft_datatype, to_spark_datatype};
pub use expr::to_daft_expr;
pub use literal::to_daft_literal;
pub use logical_plan::to_logical_plan;
pub use logical_plan::{to_logical_plan, Plan};
pub use schema::relation_to_schema;
3 changes: 3 additions & 0 deletions src/daft-connect/src/translation/datatype.rs
@@ -3,6 +3,9 @@ use eyre::{bail, ensure, WrapErr};
use spark_connect::data_type::Kind;
use tracing::warn;

mod codec;
pub use codec::deser as deser_spark_datatype;

pub fn to_spark_datatype(datatype: &DataType) -> spark_connect::DataType {
match datatype {
DataType::Null => spark_connect::DataType {