From 677504d6f67415b5aee1b247fe3b1c3a845f6998 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Wed, 17 May 2023 10:28:18 +0100 Subject: [PATCH 1/8] Implement executeStream method This returns a new RecordBatchStream class that acts similarly to ArrowReader but record batches can be retrieved asynchronously. --- .../apache/arrow/datafusion/DataFrame.java | 8 ++ .../apache/arrow/datafusion/DataFrames.java | 2 + .../arrow/datafusion/DefaultDataFrame.java | 19 +++ .../datafusion/DefaultRecordBatchStream.java | 121 ++++++++++++++++++ .../arrow/datafusion/RecordBatchStream.java | 24 ++++ .../arrow/datafusion/TestExecuteStream.java | 48 +++++++ datafusion-jni/Cargo.toml | 1 + datafusion-jni/src/dataframe.rs | 39 ++++++ datafusion-jni/src/lib.rs | 1 + datafusion-jni/src/stream.rs | 119 +++++++++++++++++ 10 files changed, 382 insertions(+) create mode 100644 datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java create mode 100644 datafusion-java/src/main/java/org/apache/arrow/datafusion/RecordBatchStream.java create mode 100644 datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java create mode 100644 datafusion-jni/src/stream.rs diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java index 28c3b4d..6e00417 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java @@ -21,6 +21,14 @@ public interface DataFrame extends NativeProxy { */ CompletableFuture collect(BufferAllocator allocator); + /** + * Execute this DataFrame and return a stream of the result data + * + * @param allocator {@link BufferAllocator buffer allocator} to allocate vectors for the stream + * @return Stream of results + */ + CompletableFuture executeStream(BufferAllocator allocator); + /** * Print results. * diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java index 611f382..93e8aca 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java @@ -15,6 +15,8 @@ private DataFrames() {} static native void collectDataframe( long runtime, long dataframe, BiConsumer callback); + static native void executeStream(long runtime, long dataframe, ObjectResultCallback callback); + static native void writeParquet( long runtime, long dataframe, String path, Consumer callback); diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java index 394668f..d5e4699 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java @@ -41,6 +41,25 @@ public CompletableFuture collect(BufferAllocator allocator) { return result; } + @Override + public CompletableFuture executeStream(BufferAllocator allocator) { + CompletableFuture result = new CompletableFuture<>(); + Runtime runtime = context.getRuntime(); + long runtimePointer = runtime.getPointer(); + long dataframe = getPointer(); + DataFrames.executeStream( + runtimePointer, + dataframe, + (errString, streamId) -> { + if (containsError(errString)) { + result.completeExceptionally(new RuntimeException(errString)); + } else { + result.complete(new DefaultRecordBatchStream(context, streamId, allocator)); + } + }); + return result; + } + private boolean containsError(String errString) { return errString != null && !"".equals(errString); } diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java new file mode 100644 index 0000000..18c3198 --- /dev/null +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java @@ -0,0 +1,121 @@ +package org.apache.arrow.datafusion; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; +import org.apache.arrow.vector.util.TransferPair; + +class DefaultRecordBatchStream extends AbstractProxy implements RecordBatchStream { + private final SessionContext context; + private final BufferAllocator allocator; + private VectorSchemaRoot vectorSchemaRoot = null; + private boolean initialized = false; + + DefaultRecordBatchStream(SessionContext context, long pointer, BufferAllocator allocator) { + super(pointer); + this.context = context; + this.allocator = allocator; + } + + @Override + void doClose(long pointer) { + destroy(pointer); + if (initialized) { + vectorSchemaRoot.close(); + } + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() { + ensureInitialized(); + return vectorSchemaRoot; + } + + @Override + public CompletableFuture loadNextBatch() { + ensureInitialized(); + Runtime runtime = context.getRuntime(); + long runtimePointer = runtime.getPointer(); + long recordBatchStream = getPointer(); + CompletableFuture result = new CompletableFuture<>(); + next( + runtimePointer, + recordBatchStream, + (String errString, byte[] arr) -> { + if (containsError(errString)) { + result.completeExceptionally(new RuntimeException(errString)); + } else if (arr.length == 0) { + // Reached end of stream + result.complete(false); + } else { + ByteArrayReadableSeekableByteChannel byteChannel = + new ByteArrayReadableSeekableByteChannel(arr); + try (ArrowFileReader reader = new ArrowFileReader(byteChannel, allocator)) { + VectorSchemaRoot batchRoot = reader.getVectorSchemaRoot(); + if (!reader.loadNextBatch()) { + result.completeExceptionally(new RuntimeException("No record batch from reader")); + } else { + // Transfer data into our VectorSchemaRoot + List vectors = batchRoot.getFieldVectors(); + for (int i = 0; i < vectors.size(); ++i) { + TransferPair pair = + vectors.get(i).makeTransferPair(vectorSchemaRoot.getVector(i)); + pair.transfer(); + } + vectorSchemaRoot.setRowCount(batchRoot.getRowCount()); + result.complete(true); + } + } catch (Exception e) { + result.completeExceptionally(e); + } + } + }); + return result; + } + + private void ensureInitialized() { + if (!initialized) { + Schema schema = getSchema(); + this.vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator); + } + initialized = true; + } + + private Schema getSchema() { + long recordBatchStream = getPointer(); + // Native method is not async, but use a future to store the result for convenience + CompletableFuture result = new CompletableFuture<>(); + getSchema( + recordBatchStream, + (String errString, byte[] arr) -> { + if (containsError(errString)) { + result.completeExceptionally(new RuntimeException(errString)); + } else { + ByteArrayReadableSeekableByteChannel byteChannel = + new ByteArrayReadableSeekableByteChannel(arr); + try (ArrowFileReader reader = new ArrowFileReader(byteChannel, allocator)) { + result.complete(reader.getVectorSchemaRoot().getSchema()); + } catch (Exception e) { + result.completeExceptionally(e); + } + } + }); + return result.join(); + } + + private static boolean containsError(String errString) { + return errString != null && !"".equals(errString); + } + + private static native void getSchema(long pointer, BiConsumer callback); + + private static native void next(long runtime, long pointer, BiConsumer callback); + + private static native void destroy(long pointer); +} diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/RecordBatchStream.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/RecordBatchStream.java new file mode 100644 index 0000000..d54ea9a --- /dev/null +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/RecordBatchStream.java @@ -0,0 +1,24 @@ +package org.apache.arrow.datafusion; + +import java.util.concurrent.CompletableFuture; +import org.apache.arrow.vector.VectorSchemaRoot; + +/** + * A record batch stream is a stream of tabular Arrow data that can be iterated over asynchronously + */ +public interface RecordBatchStream extends AutoCloseable, NativeProxy { + /** + * Get the VectorSchemaRoot that will be populated with data as the stream is iterated over + * + * @return the stream's VectorSchemaRoot + */ + VectorSchemaRoot getVectorSchemaRoot(); + + /** + * Load the next record batch in the stream into the VectorSchemaRoot + * + * @return Future that will complete with true if a batch was loaded or false if the end of the + * stream has been reached + */ + CompletableFuture loadNextBatch(); +} diff --git a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java new file mode 100644 index 0000000..42c94b4 --- /dev/null +++ b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java @@ -0,0 +1,48 @@ +package org.apache.arrow.datafusion; + +import static org.junit.jupiter.api.Assertions.*; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestExecuteStream { + @Test + public void executeStream(@TempDir Path tempDir) throws Exception { + try (SessionContext context = SessionContexts.create(); + BufferAllocator allocator = new RootAllocator()) { + Path csvFilePath = tempDir.resolve("data.csv"); + + List lines = Arrays.asList("x,y", "1,2", "3,4"); + Files.write(csvFilePath, lines); + + context.registerCsv("test", csvFilePath).join(); + + try (RecordBatchStream stream = + context + .sql("SELECT y FROM test WHERE x = 3") + .thenComposeAsync(df -> df.executeStream(allocator)) + .join()) { + VectorSchemaRoot root = stream.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + assertEquals(1, schema.getFields().size()); + assertEquals("y", schema.getFields().get(0).getName()); + + assertTrue(stream.loadNextBatch().join()); + assertEquals(1, root.getRowCount()); + BigIntVector yValues = (BigIntVector) root.getVector(0); + assertEquals(4, yValues.get(0)); + + assertFalse(stream.loadNextBatch().join()); + } + } + } +} diff --git a/datafusion-jni/Cargo.toml b/datafusion-jni/Cargo.toml index b7f9be4..b9e2059 100644 --- a/datafusion-jni/Cargo.toml +++ b/datafusion-jni/Cargo.toml @@ -14,6 +14,7 @@ jni = "^0.19.0" tokio = "^1.18.0" arrow = "^22.0" datafusion = "^12.0" +futures = "0.3.28" [lib] crate_type = ["cdylib"] diff --git a/datafusion-jni/src/dataframe.rs b/datafusion-jni/src/dataframe.rs index 3a3d8fd..d34af65 100644 --- a/datafusion-jni/src/dataframe.rs +++ b/datafusion-jni/src/dataframe.rs @@ -51,6 +51,45 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDatafr }); } +#[no_mangle] +pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_executeStream( + env: JNIEnv, + _class: JClass, + runtime: jlong, + dataframe: jlong, + callback: JObject, +) { + let runtime = unsafe { &mut *(runtime as *mut Runtime) }; + let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; + runtime.block_on(async { + let stream_result = dataframe.execute_stream().await; + match stream_result { + Ok(stream) => { + let stream = Box::into_raw(Box::new(stream)) as jlong; + env.call_method( + callback, + "callback", + "(Ljava/lang/String;J)V", + &[JValue::Void, stream.into()], + ) + } + Err(err) => { + let stream = -1 as jlong; + let err_message = env + .new_string(err.to_string()) + .expect("Couldn't create java string!"); + env.call_method( + callback, + "callback", + "(Ljava/lang/String;J)V", + &[err_message.into(), stream.into()], + ) + } + } + .expect("failed to call method"); + }); +} + #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_showDataframe( env: JNIEnv, diff --git a/datafusion-jni/src/lib.rs b/datafusion-jni/src/lib.rs index ac30a3f..6806097 100644 --- a/datafusion-jni/src/lib.rs +++ b/datafusion-jni/src/lib.rs @@ -1,3 +1,4 @@ mod context; mod dataframe; mod runtime; +mod stream; diff --git a/datafusion-jni/src/stream.rs b/datafusion-jni/src/stream.rs new file mode 100644 index 0000000..10d9e95 --- /dev/null +++ b/datafusion-jni/src/stream.rs @@ -0,0 +1,119 @@ +use futures::stream::TryStreamExt; +use arrow::ipc::writer::FileWriter; +use jni::objects::{JClass, JObject}; +use jni::sys::jlong; +use jni::JNIEnv; +use std::convert::Into; +use std::io::BufWriter; +use std::io::Cursor; +use datafusion::physical_plan::SendableRecordBatchStream; +use tokio::runtime::Runtime; + +#[no_mangle] +pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream_next( + env: JNIEnv, + _class: JClass, + runtime: jlong, + stream: jlong, + callback: JObject, +) { + let runtime = unsafe { &mut *(runtime as *mut Runtime) }; + let stream = unsafe { &mut *(stream as *mut SendableRecordBatchStream) }; + runtime.block_on(async { + let next = stream.try_next().await; + match next { + Ok(Some(batch)) => { + let schema = batch.schema(); + let mut buff = Cursor::new(vec![0; 0]); + { + let mut writer = FileWriter::try_new(BufWriter::new(&mut buff), &schema) + .expect("failed to create writer"); + writer.write(&batch).expect("failed to write batch"); + writer.finish().expect("failed to finish writer"); + } + let err_message = env + .new_string("") + .expect("Couldn't create java string!"); + let ba = env + .byte_array_from_slice(&buff.get_ref()) + .expect("cannot create byte array"); + env.call_method( + callback, + "accept", + "(Ljava/lang/Object;Ljava/lang/Object;)V", + &[err_message.into(), ba.into()], + ) + } + Ok(None) => { + let err_message = env + .new_string("") + .expect("Couldn't create java string!"); + let buff = Cursor::new(vec![0; 0]); + let ba = env + .byte_array_from_slice(&buff.get_ref()) + .expect("cannot create empty byte array"); + env.call_method( + callback, + "accept", + "(Ljava/lang/Object;Ljava/lang/Object;)V", + &[err_message.into(), ba.into()], + ) + } + Err(err) => { + let err_message = env + .new_string(err.to_string()) + .expect("Couldn't create java string!"); + let buff = Cursor::new(vec![0; 0]); + let ba = env + .byte_array_from_slice(&buff.get_ref()) + .expect("cannot create empty byte array"); + env.call_method( + callback, + "accept", + "(Ljava/lang/Object;Ljava/lang/Object;)V", + &[err_message.into(), ba.into()], + ) + } + } + .expect("failed to call method"); + }); +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream_getSchema( + env: JNIEnv, + _class: JClass, + stream: jlong, + callback: JObject, +) { + let stream = unsafe { &mut *(stream as *mut SendableRecordBatchStream) }; + let schema = stream.schema(); + let mut buff = Cursor::new(vec![0; 0]); + { + let mut writer = FileWriter::try_new(BufWriter::new(&mut buff), &schema) + .expect("failed to create writer"); + writer.finish().expect("failed to finish writer"); + } + let err_message = env + .new_string("") + .expect("Couldn't create java string!"); + let ba = env + .byte_array_from_slice(&buff.get_ref()) + .expect("cannot create byte array"); + env.call_method( + callback, + "accept", + "(Ljava/lang/Object;Ljava/lang/Object;)V", + &[err_message.into(), ba.into()], + ) + .expect("failed to call method"); +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream_destroy( + _env: JNIEnv, + _class: JClass, + pointer: jlong, +) { + let _ = unsafe { Box::from_raw(pointer as *mut SendableRecordBatchStream) }; +} From 0d8e9bfcd121cf1cd66c0371a1ab52e7430c7101 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Wed, 17 May 2023 10:28:21 +0100 Subject: [PATCH 2/8] Use C data interface for reading stream data --- datafusion-java/build.gradle | 1 + .../datafusion/DefaultRecordBatchStream.java | 49 +++------ .../arrow/datafusion/TestExecuteStream.java | 16 ++- datafusion-jni/Cargo.toml | 2 +- datafusion-jni/src/stream.rs | 103 ++++++++---------- 5 files changed, 77 insertions(+), 94 deletions(-) diff --git a/datafusion-java/build.gradle b/datafusion-java/build.gradle index 892daaf..90c97fb 100644 --- a/datafusion-java/build.gradle +++ b/datafusion-java/build.gradle @@ -11,6 +11,7 @@ dependencies { implementation 'org.slf4j:slf4j-api:1.7.36' implementation 'org.apache.arrow:arrow-format:13.0.0' implementation 'org.apache.arrow:arrow-vector:13.0.0' + implementation 'org.apache.arrow:arrow-c-data:13.0.0' runtimeOnly 'org.apache.arrow:arrow-memory-unsafe:13.0.0' testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' testImplementation 'org.apache.hadoop:hadoop-client:3.3.5' diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java index 18c3198..6a95984 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java @@ -1,15 +1,12 @@ package org.apache.arrow.datafusion; -import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.ipc.ArrowFileReader; import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; -import org.apache.arrow.vector.util.TransferPair; class DefaultRecordBatchStream extends AbstractProxy implements RecordBatchStream { private final SessionContext context; @@ -47,30 +44,17 @@ public CompletableFuture loadNextBatch() { next( runtimePointer, recordBatchStream, - (String errString, byte[] arr) -> { + (errString, arrowArrayAddress) -> { if (containsError(errString)) { result.completeExceptionally(new RuntimeException(errString)); - } else if (arr.length == 0) { + } else if (arrowArrayAddress == 0) { // Reached end of stream result.complete(false); } else { - ByteArrayReadableSeekableByteChannel byteChannel = - new ByteArrayReadableSeekableByteChannel(arr); - try (ArrowFileReader reader = new ArrowFileReader(byteChannel, allocator)) { - VectorSchemaRoot batchRoot = reader.getVectorSchemaRoot(); - if (!reader.loadNextBatch()) { - result.completeExceptionally(new RuntimeException("No record batch from reader")); - } else { - // Transfer data into our VectorSchemaRoot - List vectors = batchRoot.getFieldVectors(); - for (int i = 0; i < vectors.size(); ++i) { - TransferPair pair = - vectors.get(i).makeTransferPair(vectorSchemaRoot.getVector(i)); - pair.transfer(); - } - vectorSchemaRoot.setRowCount(batchRoot.getRowCount()); - result.complete(true); - } + try { + ArrowArray arrowArray = ArrowArray.wrap(arrowArrayAddress); + Data.importIntoVectorSchemaRoot(allocator, arrowArray, vectorSchemaRoot, null); + result.complete(true); } catch (Exception e) { result.completeExceptionally(e); } @@ -93,14 +77,15 @@ private Schema getSchema() { CompletableFuture result = new CompletableFuture<>(); getSchema( recordBatchStream, - (String errString, byte[] arr) -> { + (errString, arrowSchemaAddress) -> { if (containsError(errString)) { result.completeExceptionally(new RuntimeException(errString)); } else { - ByteArrayReadableSeekableByteChannel byteChannel = - new ByteArrayReadableSeekableByteChannel(arr); - try (ArrowFileReader reader = new ArrowFileReader(byteChannel, allocator)) { - result.complete(reader.getVectorSchemaRoot().getSchema()); + try { + ArrowSchema arrowSchema = ArrowSchema.wrap(arrowSchemaAddress); + Schema schema = Data.importSchema(allocator, arrowSchema, null); + result.complete(schema); + // The FFI schema will be released from rust when it is dropped } catch (Exception e) { result.completeExceptionally(e); } @@ -113,9 +98,9 @@ private static boolean containsError(String errString) { return errString != null && !"".equals(errString); } - private static native void getSchema(long pointer, BiConsumer callback); + private static native void getSchema(long pointer, ObjectResultCallback callback); - private static native void next(long runtime, long pointer, BiConsumer callback); + private static native void next(long runtime, long pointer, ObjectResultCallback callback); private static native void destroy(long pointer); } diff --git a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java index 42c94b4..511cc04 100644 --- a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java +++ b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java @@ -9,6 +9,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.jupiter.api.Test; @@ -21,25 +22,30 @@ public void executeStream(@TempDir Path tempDir) throws Exception { BufferAllocator allocator = new RootAllocator()) { Path csvFilePath = tempDir.resolve("data.csv"); - List lines = Arrays.asList("x,y", "1,2", "3,4"); + List lines = Arrays.asList("x,y,z", "1,2,3.5", "4,5,6.5", "7,8,9.5"); Files.write(csvFilePath, lines); context.registerCsv("test", csvFilePath).join(); try (RecordBatchStream stream = context - .sql("SELECT y FROM test WHERE x = 3") + .sql("SELECT y,z FROM test WHERE x > 3") .thenComposeAsync(df -> df.executeStream(allocator)) .join()) { VectorSchemaRoot root = stream.getVectorSchemaRoot(); Schema schema = root.getSchema(); - assertEquals(1, schema.getFields().size()); + assertEquals(2, schema.getFields().size()); assertEquals("y", schema.getFields().get(0).getName()); + assertEquals("z", schema.getFields().get(1).getName()); assertTrue(stream.loadNextBatch().join()); - assertEquals(1, root.getRowCount()); + assertEquals(2, root.getRowCount()); BigIntVector yValues = (BigIntVector) root.getVector(0); - assertEquals(4, yValues.get(0)); + assertEquals(5, yValues.get(0)); + assertEquals(8, yValues.get(1)); + Float8Vector zValues = (Float8Vector) root.getVector(1); + assertEquals(6.5, zValues.get(0)); + assertEquals(9.5, zValues.get(1)); assertFalse(stream.loadNextBatch().join()); } diff --git a/datafusion-jni/Cargo.toml b/datafusion-jni/Cargo.toml index b9e2059..48cb593 100644 --- a/datafusion-jni/Cargo.toml +++ b/datafusion-jni/Cargo.toml @@ -12,7 +12,7 @@ edition = "2021" [dependencies] jni = "^0.19.0" tokio = "^1.18.0" -arrow = "^22.0" +arrow = { version = "^22.0", features = ["ffi"] } datafusion = "^12.0" futures = "0.3.28" diff --git a/datafusion-jni/src/stream.rs b/datafusion-jni/src/stream.rs index 10d9e95..1cf0e06 100644 --- a/datafusion-jni/src/stream.rs +++ b/datafusion-jni/src/stream.rs @@ -1,12 +1,13 @@ +use arrow::array::Array; +use arrow::array::StructArray; +use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; +use datafusion::physical_plan::SendableRecordBatchStream; use futures::stream::TryStreamExt; -use arrow::ipc::writer::FileWriter; -use jni::objects::{JClass, JObject}; +use jni::objects::{JClass, JObject, JValue}; use jni::sys::jlong; use jni::JNIEnv; use std::convert::Into; -use std::io::BufWriter; -use std::io::Cursor; -use datafusion::physical_plan::SendableRecordBatchStream; +use std::ptr::addr_of_mut; use tokio::runtime::Runtime; #[no_mangle] @@ -23,55 +24,39 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream let next = stream.try_next().await; match next { Ok(Some(batch)) => { - let schema = batch.schema(); - let mut buff = Cursor::new(vec![0; 0]); - { - let mut writer = FileWriter::try_new(BufWriter::new(&mut buff), &schema) - .expect("failed to create writer"); - writer.write(&batch).expect("failed to write batch"); - writer.finish().expect("failed to finish writer"); - } - let err_message = env - .new_string("") - .expect("Couldn't create java string!"); - let ba = env - .byte_array_from_slice(&buff.get_ref()) - .expect("cannot create byte array"); + // Convert to struct array for compatibility with FFI + let struct_array: StructArray = batch.into(); + let array_data = struct_array.into_data(); + let mut ffi_array = FFI_ArrowArray::new(&array_data); + let err_message = env.new_string("").expect("Couldn't create java string!"); + let array_address = addr_of_mut!(ffi_array) as jlong; env.call_method( callback, - "accept", - "(Ljava/lang/Object;Ljava/lang/Object;)V", - &[err_message.into(), ba.into()], + "callback", + "(Ljava/lang/String;J)V", + &[err_message.into(), array_address.into()], ) } Ok(None) => { - let err_message = env - .new_string("") - .expect("Couldn't create java string!"); - let buff = Cursor::new(vec![0; 0]); - let ba = env - .byte_array_from_slice(&buff.get_ref()) - .expect("cannot create empty byte array"); + let err_message = env.new_string("").expect("Couldn't create java string!"); + let array_address = 0 as jlong; env.call_method( callback, - "accept", - "(Ljava/lang/Object;Ljava/lang/Object;)V", - &[err_message.into(), ba.into()], + "callback", + "(Ljava/lang/String;J)V", + &[err_message.into(), array_address.into()], ) } Err(err) => { let err_message = env .new_string(err.to_string()) .expect("Couldn't create java string!"); - let buff = Cursor::new(vec![0; 0]); - let ba = env - .byte_array_from_slice(&buff.get_ref()) - .expect("cannot create empty byte array"); + let array_address = -1 as jlong; env.call_method( callback, - "accept", - "(Ljava/lang/Object;Ljava/lang/Object;)V", - &[err_message.into(), ba.into()], + "callback", + "(Ljava/lang/String;J)V", + &[err_message.into(), array_address.into()], ) } } @@ -88,24 +73,30 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream ) { let stream = unsafe { &mut *(stream as *mut SendableRecordBatchStream) }; let schema = stream.schema(); - let mut buff = Cursor::new(vec![0; 0]); - { - let mut writer = FileWriter::try_new(BufWriter::new(&mut buff), &schema) - .expect("failed to create writer"); - writer.finish().expect("failed to finish writer"); + let ffi_schema = FFI_ArrowSchema::try_from(&*schema); + match ffi_schema { + Ok(mut ffi_schema) => { + let schema_address = addr_of_mut!(ffi_schema) as jlong; + env.call_method( + callback, + "callback", + "(Ljava/lang/String;J)V", + &[JValue::Void, schema_address.into()], + ) + } + Err(err) => { + let err_message = env + .new_string(err.to_string()) + .expect("Couldn't create java string!"); + let schema_address = -1 as jlong; + env.call_method( + callback, + "callback", + "(Ljava/lang/String;J)V", + &[err_message.into(), schema_address.into()], + ) + } } - let err_message = env - .new_string("") - .expect("Couldn't create java string!"); - let ba = env - .byte_array_from_slice(&buff.get_ref()) - .expect("cannot create byte array"); - env.call_method( - callback, - "accept", - "(Ljava/lang/Object;Ljava/lang/Object;)V", - &[err_message.into(), ba.into()], - ) .expect("failed to call method"); } From 0f04ba83ab3c2567c4a3f600ca5f017680d90070 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Mon, 22 May 2023 15:44:02 +0100 Subject: [PATCH 3/8] Support reading Arrow dictionary data with executeStream --- .../datafusion/DefaultRecordBatchStream.java | 21 +++++- .../arrow/datafusion/RecordBatchStream.java | 3 +- .../arrow/datafusion/TestExecuteStream.java | 60 ++++++++++++++++++ .../test/resources/dictionary_data.parquet | Bin 0 -> 887 bytes datafusion-java/write_test_files.py | 18 ++++++ 5 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 datafusion-java/src/test/resources/dictionary_data.parquet create mode 100644 datafusion-java/write_test_files.py diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java index 6a95984..ef0ff9f 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultRecordBatchStream.java @@ -1,16 +1,20 @@ package org.apache.arrow.datafusion; +import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.CDataDictionaryProvider; import org.apache.arrow.c.Data; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.types.pojo.Schema; class DefaultRecordBatchStream extends AbstractProxy implements RecordBatchStream { private final SessionContext context; private final BufferAllocator allocator; + private final CDataDictionaryProvider dictionaryProvider; private VectorSchemaRoot vectorSchemaRoot = null; private boolean initialized = false; @@ -18,11 +22,13 @@ class DefaultRecordBatchStream extends AbstractProxy implements RecordBatchStrea super(pointer); this.context = context; this.allocator = allocator; + this.dictionaryProvider = new CDataDictionaryProvider(); } @Override void doClose(long pointer) { destroy(pointer); + dictionaryProvider.close(); if (initialized) { vectorSchemaRoot.close(); } @@ -53,7 +59,8 @@ public CompletableFuture loadNextBatch() { } else { try { ArrowArray arrowArray = ArrowArray.wrap(arrowArrayAddress); - Data.importIntoVectorSchemaRoot(allocator, arrowArray, vectorSchemaRoot, null); + Data.importIntoVectorSchemaRoot( + allocator, arrowArray, vectorSchemaRoot, dictionaryProvider); result.complete(true); } catch (Exception e) { result.completeExceptionally(e); @@ -63,6 +70,16 @@ public CompletableFuture loadNextBatch() { return result; } + @Override + public Dictionary lookup(long id) { + return dictionaryProvider.lookup(id); + } + + @Override + public Set getDictionaryIds() { + return dictionaryProvider.getDictionaryIds(); + } + private void ensureInitialized() { if (!initialized) { Schema schema = getSchema(); @@ -83,7 +100,7 @@ private Schema getSchema() { } else { try { ArrowSchema arrowSchema = ArrowSchema.wrap(arrowSchemaAddress); - Schema schema = Data.importSchema(allocator, arrowSchema, null); + Schema schema = Data.importSchema(allocator, arrowSchema, dictionaryProvider); result.complete(schema); // The FFI schema will be released from rust when it is dropped } catch (Exception e) { diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/RecordBatchStream.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/RecordBatchStream.java index d54ea9a..d234394 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/RecordBatchStream.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/RecordBatchStream.java @@ -2,11 +2,12 @@ import java.util.concurrent.CompletableFuture; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; /** * A record batch stream is a stream of tabular Arrow data that can be iterated over asynchronously */ -public interface RecordBatchStream extends AutoCloseable, NativeProxy { +public interface RecordBatchStream extends AutoCloseable, NativeProxy, DictionaryProvider { /** * Get the VectorSchemaRoot that will be populated with data as the stream is iterated over * diff --git a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java index 511cc04..2813cc9 100644 --- a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java +++ b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestExecuteStream.java @@ -2,15 +2,20 @@ import static org.junit.jupiter.api.Assertions.*; +import java.net.URL; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.List; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryEncoder; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -51,4 +56,59 @@ public void executeStream(@TempDir Path tempDir) throws Exception { } } } + + @Test + public void readDictionaryData() throws Exception { + try (SessionContext context = SessionContexts.create(); + BufferAllocator allocator = new RootAllocator()) { + + URL fileUrl = this.getClass().getResource("/dictionary_data.parquet"); + Path parquetFilePath = Paths.get(fileUrl.getPath()); + + context.registerParquet("test", parquetFilePath).join(); + + try (RecordBatchStream stream = + context + .sql("SELECT x,y FROM test") + .thenComposeAsync(df -> df.executeStream(allocator)) + .join()) { + VectorSchemaRoot root = stream.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + assertEquals(2, schema.getFields().size()); + assertEquals("x", schema.getFields().get(0).getName()); + assertEquals("y", schema.getFields().get(1).getName()); + + int rowsRead = 0; + while (stream.loadNextBatch().join()) { + int batchNumRows = root.getRowCount(); + BigIntVector xValuesEncoded = (BigIntVector) root.getVector(0); + long xDictionaryId = xValuesEncoded.getField().getDictionary().getId(); + try (VarCharVector xValues = + (VarCharVector) + DictionaryEncoder.decode(xValuesEncoded, stream.lookup(xDictionaryId))) { + String[] expected = {"one", "two", "three"}; + for (int i = 0; i < batchNumRows; ++i) { + assertEquals( + new String(xValues.get(i), StandardCharsets.UTF_8), expected[(rowsRead + i) % 3]); + } + } + + BigIntVector yValuesEncoded = (BigIntVector) root.getVector(1); + long yDictionaryId = yValuesEncoded.getField().getDictionary().getId(); + try (VarCharVector yValues = + (VarCharVector) + DictionaryEncoder.decode(yValuesEncoded, stream.lookup(yDictionaryId))) { + String[] expected = {"four", "five", "six"}; + for (int i = 0; i < batchNumRows; ++i) { + assertEquals( + new String(yValues.get(i), StandardCharsets.UTF_8), expected[(rowsRead + i) % 3]); + } + } + rowsRead += batchNumRows; + } + + assertEquals(100, rowsRead); + } + } + } } diff --git a/datafusion-java/src/test/resources/dictionary_data.parquet b/datafusion-java/src/test/resources/dictionary_data.parquet new file mode 100644 index 0000000000000000000000000000000000000000..cfecba3056e60914b1ea98ff33ae1572e422d5ea GIT binary patch literal 887 zcmbu8%}(1u5XWcjH06*(RCSFle27G`D%GR`DF`7rj42c(s>V_2$1y@Cd0nssd=N{!Vnp(<-b^8OUNCjr^GjxLd|ELa$Ef%besH_`bd z!qlGcg-`n$!Bdf)A2vXO8Rw7nlF0Mj3b)%IzpND_xHr~YS#Ma@zS$?+W|u!Of8!|T zUFNjN#xY0Yy}!Vr)h(0kN*s%8;$aJ=98tV)`n9AINusEWQ@Bx6yiF*XyV4i;Fu#-S zku-bg$J?gTmw1FLjUIeCIB^(N0|8MKwZ%20ykky1ln{u!BhYf6vEViq^BNvl@Z<%L z!u|aOF-^or+li}h)zrWbYvNW%qCE1{t9@mzZ>y3KD3-K!CVXWd|xA4o| E0|aijCjbBd literal 0 HcmV?d00001 diff --git a/datafusion-java/write_test_files.py b/datafusion-java/write_test_files.py new file mode 100644 index 0000000..b42ecc3 --- /dev/null +++ b/datafusion-java/write_test_files.py @@ -0,0 +1,18 @@ +import pyarrow as pa +import pyarrow.parquet as pq + + +num_rows = 100 + +dict_array_x = pa.DictionaryArray.from_arrays( + pa.array([i % 3 for i in range(num_rows)]), + pa.array(["one", "two", "three"]) +) + +dict_array_y = pa.DictionaryArray.from_arrays( + pa.array([i % 3 for i in range(num_rows)]), + pa.array(["four", "five", "six"]) +) + +table = pa.Table.from_arrays([dict_array_x, dict_array_y], ["x", "y"]) +pq.write_table(table, "src/test/resources/dictionary_data.parquet") From 9f2ba2d020e5f114e9e7e163ef26f017c6076942 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 2 Sep 2023 22:28:02 +0800 Subject: [PATCH 4/8] Update datafusion-jni/src/dataframe.rs --- datafusion-jni/src/dataframe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-jni/src/dataframe.rs b/datafusion-jni/src/dataframe.rs index 628ee54..aaf9c70 100644 --- a/datafusion-jni/src/dataframe.rs +++ b/datafusion-jni/src/dataframe.rs @@ -53,7 +53,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDatafr #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_executeStream( - env: JNIEnv, + env: mut JNIEnv, _class: JClass, runtime: jlong, dataframe: jlong, From 4b61a36e2970eb20c65a09cf3779aa718a1e6a5e Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 2 Sep 2023 22:28:07 +0800 Subject: [PATCH 5/8] Update datafusion-jni/src/stream.rs --- datafusion-jni/src/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-jni/src/stream.rs b/datafusion-jni/src/stream.rs index 1cf0e06..7988225 100644 --- a/datafusion-jni/src/stream.rs +++ b/datafusion-jni/src/stream.rs @@ -12,7 +12,7 @@ use tokio::runtime::Runtime; #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream_next( - env: JNIEnv, + env: mut JNIEnv, _class: JClass, runtime: jlong, stream: jlong, From 4ed42c7d0740c001b33e3c7c2404db6513cca2f7 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 2 Sep 2023 22:28:12 +0800 Subject: [PATCH 6/8] Update datafusion-jni/src/stream.rs --- datafusion-jni/src/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-jni/src/stream.rs b/datafusion-jni/src/stream.rs index 7988225..cafc410 100644 --- a/datafusion-jni/src/stream.rs +++ b/datafusion-jni/src/stream.rs @@ -66,7 +66,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream_getSchema( - env: JNIEnv, + env: mut JNIEnv, _class: JClass, stream: jlong, callback: JObject, From d1212c6febfdff90388c778b540902ca32ae35d1 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 2 Sep 2023 22:39:55 +0800 Subject: [PATCH 7/8] Apply suggestions from code review --- datafusion-jni/src/dataframe.rs | 2 +- datafusion-jni/src/stream.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion-jni/src/dataframe.rs b/datafusion-jni/src/dataframe.rs index aaf9c70..3bac036 100644 --- a/datafusion-jni/src/dataframe.rs +++ b/datafusion-jni/src/dataframe.rs @@ -53,7 +53,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDatafr #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_executeStream( - env: mut JNIEnv, + mut env: JNIEnv, _class: JClass, runtime: jlong, dataframe: jlong, diff --git a/datafusion-jni/src/stream.rs b/datafusion-jni/src/stream.rs index cafc410..7f7e84c 100644 --- a/datafusion-jni/src/stream.rs +++ b/datafusion-jni/src/stream.rs @@ -12,7 +12,7 @@ use tokio::runtime::Runtime; #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream_next( - env: mut JNIEnv, + mut env: JNIEnv, _class: JClass, runtime: jlong, stream: jlong, @@ -66,7 +66,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream_getSchema( - env: mut JNIEnv, + mut env: JNIEnv, _class: JClass, stream: jlong, callback: JObject, From 3663dd624ac92c853e6944da7b9ad18632a05f66 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Mon, 4 Sep 2023 11:26:14 +1200 Subject: [PATCH 8/8] Fixes for upgraded JNI and datafusion --- datafusion-jni/src/dataframe.rs | 11 +++++++---- datafusion-jni/src/stream.rs | 13 +++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/datafusion-jni/src/dataframe.rs b/datafusion-jni/src/dataframe.rs index bdbe9c6..23fc9d9 100644 --- a/datafusion-jni/src/dataframe.rs +++ b/datafusion-jni/src/dataframe.rs @@ -59,17 +59,20 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_executeStream callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; + let dataframe = unsafe { &mut *(dataframe as *mut DataFrame) }; runtime.block_on(async { - let stream_result = dataframe.execute_stream().await; + let stream_result = dataframe.clone().execute_stream().await; match stream_result { Ok(stream) => { let stream = Box::into_raw(Box::new(stream)) as jlong; + let err_message = env + .new_string("".to_string()) + .expect("Couldn't create java string!"); env.call_method( callback, "callback", "(Ljava/lang/String;J)V", - &[JValue::Void, stream.into()], + &[(&err_message).into(), stream.into()], ) } Err(err) => { @@ -81,7 +84,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_executeStream callback, "callback", "(Ljava/lang/String;J)V", - &[err_message.into(), stream.into()], + &[(&err_message).into(), stream.into()], ) } } diff --git a/datafusion-jni/src/stream.rs b/datafusion-jni/src/stream.rs index 7f7e84c..455aeb8 100644 --- a/datafusion-jni/src/stream.rs +++ b/datafusion-jni/src/stream.rs @@ -3,7 +3,7 @@ use arrow::array::StructArray; use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; use datafusion::physical_plan::SendableRecordBatchStream; use futures::stream::TryStreamExt; -use jni::objects::{JClass, JObject, JValue}; +use jni::objects::{JClass, JObject}; use jni::sys::jlong; use jni::JNIEnv; use std::convert::Into; @@ -34,7 +34,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream callback, "callback", "(Ljava/lang/String;J)V", - &[err_message.into(), array_address.into()], + &[(&err_message).into(), array_address.into()], ) } Ok(None) => { @@ -44,7 +44,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream callback, "callback", "(Ljava/lang/String;J)V", - &[err_message.into(), array_address.into()], + &[(&err_message).into(), array_address.into()], ) } Err(err) => { @@ -56,7 +56,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream callback, "callback", "(Ljava/lang/String;J)V", - &[err_message.into(), array_address.into()], + &[(&err_message).into(), array_address.into()], ) } } @@ -77,11 +77,12 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream match ffi_schema { Ok(mut ffi_schema) => { let schema_address = addr_of_mut!(ffi_schema) as jlong; + let err_message = env.new_string("").expect("Couldn't create java string!"); env.call_method( callback, "callback", "(Ljava/lang/String;J)V", - &[JValue::Void, schema_address.into()], + &[(&err_message).into(), schema_address.into()], ) } Err(err) => { @@ -93,7 +94,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultRecordBatchStream callback, "callback", "(Ljava/lang/String;J)V", - &[err_message.into(), schema_address.into()], + &[(&err_message).into(), schema_address.into()], ) } }