Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] connect: createDataFrame #3363

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 56 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ daft-dsl = {path = "src/daft-dsl"}
daft-hash = {path = "src/daft-hash"}
daft-local-execution = {path = "src/daft-local-execution"}
daft-logical-plan = {path = "src/daft-logical-plan"}
daft-micropartition = {path = "src/daft-micropartition"}
daft-scan = {path = "src/daft-scan"}
daft-schema = {path = "src/daft-schema"}
daft-table = {path = "src/daft-table"}
Expand Down
28 changes: 28 additions & 0 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,34 @@
ManyColumnsInputType = Union[ColumnInputType, Iterable[ColumnInputType]]


def to_logical_plan_builder(*parts: MicroPartition) -> LogicalPlanBuilder:
"""Creates a Daft DataFrame from a single Table.
Args:
parts: The Tables that we wish to convert into a Daft DataFrame.
Returns:
DataFrame: Daft DataFrame created from the provided Table.
"""
if not parts:
raise ValueError("Can't create a DataFrame from an empty list of tables.")

result_pset = LocalPartitionSet()

for i, part in enumerate(parts):
result_pset.set_partition_from_table(i, part)

context = get_context()
cache_entry = context.get_or_create_runner().put_partition_set_into_cache(result_pset)
size_bytes = result_pset.size_bytes()
num_rows = len(result_pset)

assert size_bytes is not None, "In-memory data should always have non-None size in bytes"
return LogicalPlanBuilder.from_in_memory_scan(
cache_entry, parts[0].schema(), result_pset.num_partitions(), size_bytes, num_rows=num_rows
)


class DataFrame:
"""A Daft DataFrame is a table of data. It has columns, where each column has a type and the same
number of items (rows) as all other columns.
Expand Down
2 changes: 1 addition & 1 deletion src/arrow2/src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub fn read_record_batch<R: Read + Seek>(
file_size: u64,
scratch: &mut Vec<u8>,
) -> Result<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
assert_eq!(fields.len(), ipc_schema.fields.len(), "IPC schema fields and Arrow schema fields must be the same length");
let buffers = batch
.buffers()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
Expand Down
2 changes: 1 addition & 1 deletion src/arrow2/src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ pub type Dictionaries = AHashMap<i64, Box<dyn Array>>;
pub(crate) type Node<'a> = arrow_format::ipc::FieldNodeRef<'a>;
pub(crate) type IpcBuffer<'a> = arrow_format::ipc::BufferRef<'a>;
pub(crate) type Compression<'a> = arrow_format::ipc::BodyCompressionRef<'a>;
pub(crate) type Version = arrow_format::ipc::MetadataVersion;
pub type Version = arrow_format::ipc::MetadataVersion;
9 changes: 7 additions & 2 deletions src/daft-connect/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
[dependencies]
arrow2 = {workspace = true}
arrow2 = {workspace = true, features = ["io_json_integration"]}
async-stream = "0.3.6"
color-eyre = "0.6.3"
common-daft-config = {workspace = true}
daft-core = {workspace = true}
daft-dsl = {workspace = true}
daft-local-execution = {workspace = true}
daft-logical-plan = {workspace = true}
daft-micropartition = {workspace = true}
daft-scan = {workspace = true}
daft-schema = {workspace = true}
daft-table = {workspace = true}
dashmap = "6.1.0"
derive_more = {workspace = true}
eyre = "0.6.12"
futures = "0.3.31"
itertools = {workspace = true}
pyo3 = {workspace = true, optional = true}
serde_json = {workspace = true}
spark-connect = {workspace = true}
tokio = {version = "1.40.0", features = ["full"]}
tonic = "0.12.3"
tracing = {workspace = true}
uuid = {version = "1.10.0", features = ["v4"]}

[features]
python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python", "daft-dsl/python", "daft-schema/python", "daft-core/python"]
python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python", "daft-dsl/python", "daft-schema/python", "daft-core/python", "daft-micropartition/python"]

[lints]
workspace = true
Expand Down
22 changes: 13 additions & 9 deletions src/daft-connect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#![feature(iter_from_coroutine)]
#![feature(stmt_expr_attributes)]
#![feature(try_trait_v2_residual)]
#![deny(clippy::print_stdout)]

use dashmap::DashMap;
use eyre::Context;
Expand All @@ -23,7 +22,7 @@ use spark_connect::{
ReleaseExecuteResponse, ReleaseSessionRequest, ReleaseSessionResponse,
};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{debug, info};
use tracing::info;
use uuid::Uuid;

use crate::session::Session;
Expand Down Expand Up @@ -325,8 +324,6 @@ impl SparkConnectService for DaftSparkConnectService {
result: Some(analyze_plan_response::Result::Schema(schema)),
};

debug!("response: {response:#?}");

Ok(Response::new(response))
}
_ => unimplemented_err!("Analyze plan operation is not yet implemented"),
Expand All @@ -346,7 +343,6 @@ impl SparkConnectService for DaftSparkConnectService {
&self,
_request: Request<InterruptRequest>,
) -> Result<Response<InterruptResponse>, Status> {
println!("got interrupt");
unimplemented_err!("interrupt operation is not yet implemented")
}

Expand All @@ -361,17 +357,26 @@ impl SparkConnectService for DaftSparkConnectService {
#[tracing::instrument(skip_all)]
async fn release_execute(
&self,
_request: Request<ReleaseExecuteRequest>,
request: Request<ReleaseExecuteRequest>,
) -> Result<Response<ReleaseExecuteResponse>, Status> {
unimplemented_err!("release_execute operation is not yet implemented")
let request = request.into_inner();

let session = self.get_session(&request.session_id)?;

let response = ReleaseExecuteResponse {
session_id: session.client_side_session_id().to_string(),
server_side_session_id: session.server_side_session_id().to_string(),
operation_id: None, // todo: set but not strictly required
};

Ok(Response::new(response))
}

#[tracing::instrument(skip_all)]
async fn release_session(
&self,
_request: Request<ReleaseSessionRequest>,
) -> Result<Response<ReleaseSessionResponse>, Status> {
println!("got release session");
unimplemented_err!("release_session operation is not yet implemented")
}

Expand All @@ -380,7 +385,6 @@ impl SparkConnectService for DaftSparkConnectService {
&self,
_request: Request<FetchErrorDetailsRequest>,
) -> Result<Response<FetchErrorDetailsResponse>, Status> {
println!("got fetch error details");
unimplemented_err!("fetch_error_details operation is not yet implemented")
}
}
Expand Down
11 changes: 5 additions & 6 deletions src/daft-connect/src/op/execute/root.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, future::ready, sync::Arc};
use std::{future::ready, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use daft_local_execution::NativeExecutor;
Expand All @@ -10,6 +10,7 @@ use crate::{
op::execute::{ExecuteStream, PlanIds},
session::Session,
translation,
translation::Plan,
};

impl Session {
Expand All @@ -31,13 +32,11 @@ impl Session {
let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
tokio::spawn(async move {
let execution_fut = async {
let plan = translation::to_logical_plan(command)?;
let optimized_plan = plan.optimize()?;
let Plan { builder, psets } = translation::to_logical_plan(command)?;
let optimized_plan = builder.optimize()?;
andrewgazelka marked this conversation as resolved.
Show resolved Hide resolved
let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor
.run(HashMap::new(), cfg, None)?
.into_stream();
let mut result_stream = native_executor.run(psets, cfg, None)?.into_stream();

while let Some(result) = result_stream.next().await {
let result = result?;
Expand Down
4 changes: 2 additions & 2 deletions src/daft-connect/src/translation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ mod literal;
mod logical_plan;
mod schema;

pub use datatype::{to_daft_datatype, to_spark_datatype};
pub use datatype::{deser_spark_datatype, to_daft_datatype, to_spark_datatype};
pub use expr::to_daft_expr;
pub use literal::to_daft_literal;
pub use logical_plan::to_logical_plan;
pub use logical_plan::{to_logical_plan, Plan};
pub use schema::relation_to_schema;
3 changes: 3 additions & 0 deletions src/daft-connect/src/translation/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use eyre::{bail, ensure, WrapErr};
use spark_connect::data_type::Kind;
use tracing::warn;

mod codec;
pub use codec::deser as deser_spark_datatype;

pub fn to_spark_datatype(datatype: &DataType) -> spark_connect::DataType {
match datatype {
DataType::Null => spark_connect::DataType {
Expand Down
Loading
Loading