A Java binding to Apache DataFusion
This project is still a work in progress, and it currently works with Arrow 14.0 and DataFusion version 25.0.
It is built and verified in CI against Java 11 and 21. You may check out the docker run instructions
where Java 21 jshell
is used to run interactively.
The artifacts are published to maven central, so you can use datafusion-java like any normal Java library:
dependencies {
implementation(
group = "io.github.datafusion-contrib",
name = "datafusion-java",
version = "0.16.0" // or latest version, checkout https://github.com/datafusion-contrib/datafusion-java/releases
)
}
To test it out, you can use this piece of demo code:
DataFusionDemo.java
package com.me;
import org.apache.arrow.datafusion.DataFrame;
import org.apache.arrow.datafusion.SessionContext;
import org.apache.arrow.datafusion.SessionContexts;
public class DataFusionDemo {
public static void main(String[] args) throws Exception {
try (SessionContext sessionContext = SessionContexts.create()) {
sessionContext.sql("select sqrt(65536)").thenCompose(DataFrame::show).join();
}
}
}
build.gradle.kts
plugins {
java
application
}
repositories {
mavenCentral()
google()
}
tasks {
application {
mainClass.set("com.me.DataFusionDemo")
}
}
dependencies {
implementation(
group = "io.github.datafusion-contrib",
name = "datafusion-java",
version = "0.16.0"
)
}
Run result
$ ./gradlew run
...
> Task :compileKotlin UP-TO-DATE
> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE
> Task :run
successfully created tokio runtime
+--------------------+
| sqrt(Int64(65536)) |
+--------------------+
| 256 |
+--------------------+
successfully shutdown tokio runtime
BUILD SUCCESSFUL in 2s
3 actionable tasks: 1 executed, 2 up-to-date
16:43:34: Execution finished 'run'.
First build the docker image:
docker build -t datafusion-example .
Then you can run the example program using Docker:
docker run --rm -it datafusion-example
Or start an interactive jshell session:
docker run --rm -it datafusion-example jshell
Example jshell session
Jan 11, 2024 1:49:28 AM java.util.prefs.FileSystemPreferences$1 run
INFO: Created user preferences directory.
| Welcome to JShell -- Version 21
| For an introduction type: /help intro
jshell> import org.apache.arrow.datafusion.*
jshell> var context = SessionContexts.create()
01:41:05.586 [main] DEBUG org.apache.arrow.datafusion.JNILoader -- successfully loaded datafusion_jni from library path
01:41:05.589 [main] DEBUG org.apache.arrow.datafusion.JNILoader -- datafusion_jni already loaded, returning
01:41:05.590 [main] DEBUG org.apache.arrow.datafusion.AbstractProxy -- Obtaining DefaultSessionContext@7f58383b8db0
01:41:05.591 [main] DEBUG org.apache.arrow.datafusion.AbstractProxy -- Obtaining TokioRuntime@7f58383ce110
context ==> org.apache.arrow.datafusion.DefaultSessionContext@2d209079
jshell> var df = context.sql("select 1.1 + cos(2.0)").join()
01:41:10.961 [main] DEBUG org.apache.arrow.datafusion.AbstractProxy -- Obtaining DefaultDataFrame@7f5838209100
df ==> org.apache.arrow.datafusion.DefaultDataFrame@34ce8af7
jshell> import org.apache.arrow.memory.*
jshell> var allocator = new RootAllocator()
01:41:22.521 [main] INFO org.apache.arrow.memory.BaseAllocator -- Debug mode disabled. Enable with the VM option -Darrow.memory.debug.allocator=true.
01:41:22.525 [main] INFO org.apache.arrow.memory.DefaultAllocationManagerOption -- allocation manager type not specified, using netty as the default type
01:41:22.525 [main] INFO org.apache.arrow.memory.CheckAllocator -- Using DefaultAllocationManager at memory-unsafe-14.0.2.jar!/org/apache/arrow/memory/DefaultAllocationManagerFactory.class
01:41:22.531 [main] DEBUG org.apache.arrow.memory.util.MemoryUtil -- Constructor for direct buffer found and made accessible
01:41:22.536 [main] DEBUG org.apache.arrow.memory.util.MemoryUtil -- direct buffer constructor: available
01:41:22.537 [main] DEBUG org.apache.arrow.memory.rounding.DefaultRoundingPolicy -- -Dorg.apache.memory.allocator.pageSize: 8192
01:41:22.537 [main] DEBUG org.apache.arrow.memory.rounding.DefaultRoundingPolicy -- -Dorg.apache.memory.allocator.maxOrder: 11
allocator ==> Allocator(ROOT) 0/0/0/9223372036854775807 (res/actual/peak/limit)
jshell> var r = df.collect(allocator).join()
01:41:29.635 [main] INFO org.apache.arrow.datafusion.DefaultDataFrame -- successfully completed with arr length=610
r ==> org.apache.arrow.vector.ipc.ArrowFileReader@7ac7a4e4
jshell> var root = r.getVectorSchemaRoot()
01:41:34.658 [main] DEBUG org.apache.arrow.vector.ipc.ReadChannel -- Reading buffer with size: 10
01:41:34.661 [main] DEBUG org.apache.arrow.vector.ipc.ArrowFileReader -- Footer starts at 416, length: 184
01:41:34.661 [main] DEBUG org.apache.arrow.vector.ipc.ReadChannel -- Reading buffer with size: 184
root ==> org.apache.arrow.vector.VectorSchemaRoot@6cd28fa7
jshell> r.loadNextBatch()
01:41:39.421 [main] DEBUG org.apache.arrow.vector.ipc.ArrowFileReader -- RecordBatch at 200, metadata: 192, body: 16
01:41:39.423 [main] DEBUG org.apache.arrow.vector.ipc.ReadChannel -- Reading buffer with size: 208
01:41:39.424 [main] DEBUG org.apache.arrow.vector.ipc.message.ArrowRecordBatch -- Buffer in RecordBatch at 0, length: 1
01:41:39.425 [main] DEBUG org.apache.arrow.vector.ipc.message.ArrowRecordBatch -- Buffer in RecordBatch at 8, length: 8
$8 ==> true
jshell> var v = root.getVector(0)
v ==> [0.6838531634528577]
Note you must have a local Rust and Java environment setup.
Run the example in one line:
./gradlew run
Or roll your own test example:
import org.apache.arrow.datafusion.DataFrame;
import org.apache.arrow.datafusion.SessionContext;
import org.apache.arrow.datafusion.SessionContexts;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class ExampleMain {
private static final Logger logger = LoggerFactory.getLogger(ExampleMain.class);
public static void main(String[] args) throws Exception {
try (SessionContext sessionContext = SessionContexts.create(); BufferAllocator allocator = new RootAllocator()) {
DataFrame dataFrame = sessionContext.sql("select 1.5 + sqrt(2.0)").get();
dataFrame.collect(allocator).thenAccept(ExampleMain::onReaderResult).get();
}
}
private static void onReaderResult(ArrowReader reader) {
try {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
while (reader.loadNextBatch()) {
Float8Vector vector = (Float8Vector) root.getVector(0);
for (int i = 0; i < root.getRowCount(); i += 1) {
logger.info("value {}={}", i, vector.getValueAsDouble(i));
}
}
// close to release resource
reader.close();
} catch (IOException e) {
logger.warn("got IO Exception", e);
}
}
}
To build the library:
./gradlew build