Skip to content

Commit

Permalink
Add RecordBatch::schema_ref (#5474)
Browse files Browse the repository at this point in the history
* Add RecordBatch::schema_ref

* Fix Clippy errors

---------

Co-authored-by: Clide Stefani <[email protected]>
  • Loading branch information
monkwire and monkwire authored Mar 6, 2024
1 parent ace6d90 commit 1553267
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 24 deletions.
5 changes: 5 additions & 0 deletions arrow-array/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ impl RecordBatch {
self.schema.clone()
}

/// Returns a reference to the [`Schema`] of the record batch.
pub fn schema_ref(&self) -> &SchemaRef {
&self.schema
}

/// Projects the schema onto the specified columns
pub fn project(&self, indices: &[usize]) -> Result<RecordBatch, ArrowError> {
let projected_schema = self.schema.project(indices)?;
Expand Down
14 changes: 7 additions & 7 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
self.check_token(&request)?;
let batch = Self::fake_result().map_err(|e| status!("Could not fake a result", e))?;
let schema = batch.schema();
let batches = vec![batch];
let flight_data = batches_to_flight_data(schema.as_ref(), batches)
let schema = batch.schema_ref();
let batches = vec![batch.clone()];
let flight_data = batches_to_flight_data(schema, batches)
.map_err(|e| status!("Could not convert batches", e))?
.into_iter()
.map(Ok);
Expand Down Expand Up @@ -641,10 +641,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
request: Request<Action>,
) -> Result<ActionCreatePreparedStatementResult, Status> {
self.check_token(&request)?;
let schema = Self::fake_result()
.map_err(|e| status!("Error getting result schema", e))?
.schema();
let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default())
let record_batch =
Self::fake_result().map_err(|e| status!("Error getting result schema", e))?;
let schema = record_batch.schema_ref();
let message = SchemaAsIpc::new(schema, &IpcWriteOptions::default())
.try_into()
.map_err(|e| status!("Unable to serialize schema", e))?;
let IpcMessage(schema_bytes) = message;
Expand Down
8 changes: 4 additions & 4 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl FlightDataEncoder {
let schema = match &self.schema {
Some(schema) => schema.clone(),
// encode the schema if this is the first time we have seen it
None => self.encode_schema(&batch.schema()),
None => self.encode_schema(batch.schema_ref()),
};

// encode the batch
Expand Down Expand Up @@ -565,12 +565,12 @@ mod tests {

let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)])
.expect("cannot create record batch");
let schema = batch.schema();
let schema = batch.schema_ref();

let (_, baseline_flight_batch) = make_flight_data(&batch, &options);

let big_batch = batch.slice(0, batch.num_rows() - 1);
let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(&schema), false)
let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(schema), false)
.expect("failed to optimize");
let (_, optimized_big_flight_batch) = make_flight_data(&optimized_big_batch, &options);

Expand All @@ -581,7 +581,7 @@ mod tests {

let small_batch = batch.slice(0, 1);
let optimized_small_batch =
prepare_batch_for_flight(&small_batch, Arc::clone(&schema), false)
prepare_batch_for_flight(&small_batch, Arc::clone(schema), false)
.expect("failed to optimize");
let (_, optimized_small_flight_batch) = make_flight_data(&optimized_small_batch, &options);

Expand Down
2 changes: 1 addition & 1 deletion arrow-flight/tests/encode_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ async fn roundtrip(input: Vec<RecordBatch>) {
/// When <https://github.com/apache/arrow-rs/issues/3389> is resolved,
/// it should be possible to use `roundtrip`
async fn roundtrip_dictionary(input: Vec<RecordBatch>) {
let schema = Arc::new(prepare_schema_for_flight(&input[0].schema()));
let schema = Arc::new(prepare_schema_for_flight(input[0].schema_ref()));
let expected_output: Vec<_> = input
.iter()
.map(|batch| prepare_batch_for_flight(batch, schema.clone()).unwrap())
Expand Down
8 changes: 4 additions & 4 deletions arrow-flight/tests/flight_sql_client_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl FlightSqlServiceImpl {
let batch = Self::fake_result()?;

Ok(FlightInfo::new()
.try_with_schema(&batch.schema())
.try_with_schema(batch.schema_ref())
.expect("encoding schema")
.with_endpoint(
FlightEndpoint::new().with_ticket(Ticket::new(
Expand Down Expand Up @@ -245,9 +245,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
"part_2" => batch.slice(2, 1),
ticket => panic!("Invalid ticket: {ticket:?}"),
};
let schema = batch.schema();
let batches = vec![batch];
let flight_data = batches_to_flight_data(schema.as_ref(), batches)
let schema = batch.schema_ref();
let batches = vec![batch.clone()];
let flight_data = batches_to_flight_data(schema, batches)
.unwrap()
.into_iter()
.map(Ok);
Expand Down
8 changes: 4 additions & 4 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,7 @@ mod tests {

fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
let mut buf = Vec::new();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &rb.schema()).unwrap();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
writer.write(rb).unwrap();
writer.finish().unwrap();
drop(writer);
Expand All @@ -1440,7 +1440,7 @@ mod tests {

fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
let mut buf = Vec::new();
let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap();
let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
writer.write(rb).unwrap();
writer.finish().unwrap();
drop(writer);
Expand Down Expand Up @@ -1815,7 +1815,7 @@ mod tests {
let batch = RecordBatch::new_empty(schema);

let mut buf = Vec::new();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
drop(writer);
Expand All @@ -1842,7 +1842,7 @@ mod tests {
let batch = RecordBatch::new_empty(schema);

let mut buf = Vec::new();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
drop(writer);
Expand Down
6 changes: 3 additions & 3 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ mod tests {
use super::*;

fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
let mut writer = FileWriter::try_new(vec![], &rb.schema()).unwrap();
let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
writer.write(rb).unwrap();
writer.finish().unwrap();
writer.into_inner().unwrap()
Expand All @@ -1448,7 +1448,7 @@ mod tests {
}

fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
let mut stream_writer = StreamWriter::try_new(vec![], &record.schema()).unwrap();
let mut stream_writer = StreamWriter::try_new(vec![], record.schema_ref()).unwrap();
stream_writer.write(record).unwrap();
stream_writer.finish().unwrap();
stream_writer.into_inner().unwrap()
Expand Down Expand Up @@ -1982,7 +1982,7 @@ mod tests {
)
.expect("new batch");

let mut writer = StreamWriter::try_new(vec![], &batch.schema()).expect("new writer");
let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
writer.write(&batch).expect("write");
let outbuf = writer.into_inner().expect("inner");

Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3082,7 +3082,7 @@ mod tests {
.unwrap();

let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
let actual = concat_batches(&batch.schema(), &batches).unwrap();
let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
assert_eq!(actual.num_rows(), selection.row_count());

let mut batch_offset = 0;
Expand Down

0 comments on commit 1553267

Please sign in to comment.