Skip to content

Commit

Permalink
initial commit streaming aggs with DF
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Jan 27, 2025
1 parent ee3a53f commit 07b238d
Show file tree
Hide file tree
Showing 28 changed files with 6,828 additions and 164 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow.spi;

import java.util.Set;

/**
 * A {@link StreamProducer} that additionally exposes a set of partition tickets
 * and can be handed a root ticket.
 */
public interface PartitionedStreamProducer extends StreamProducer {
/**
 * Returns the tickets associated with this producer's partitions.
 * NOTE(review): presumably one ticket per partition stream — confirm with implementers.
 *
 * @return the set of partition tickets
 */
Set<StreamTicket> partitions();
/**
 * Supplies the root ticket to this producer.
 * NOTE(review): presumably the ticket of the parent stream that the partitions belong to — confirm.
 *
 * @param ticket the root ticket
 */
void setRootTicket(StreamTicket ticket);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.tasks.TaskId;

import java.util.Set;
import java.util.function.Function;

/**
* Interface for managing Arrow data streams between producers and consumers.
* StreamManager handles the registration of producers, stream access control via tickets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.opensearch.core.tasks.TaskId;

import java.io.Closeable;
import java.util.Collections;
import java.util.Set;

/**
* Represents a producer of Arrow streams. The producer first needs to define the job by implementing this interface and
Expand Down
1 change: 1 addition & 0 deletions libs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ subprojects {
Project depProject = project.project(dep.path)
if (depProject != null
&& (false == depProject.path.equals(':libs:opensearch-core') && false == depProject.path.equals(':libs:opensearch-arrow-memory-shaded') &&
false == depProject.path.equals(':libs:opensearch-arrow-spi') &&
false == depProject.path.equals(':libs:opensearch-common'))
&& depProject.path.startsWith(':libs')) {
throw new InvalidUserDataException("projects in :libs "
Expand Down
163 changes: 163 additions & 0 deletions libs/datafusion/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
import org.opensearch.gradle.testclusters.OpenSearchCluster

// Name of the archive produced by this project. Configured at the top level:
// `base` is a project extension, not part of the dependency DSL, so nesting it
// inside `dependencies {}` only worked through Groovy closure-owner fallthrough.
base {
    archivesName = 'opensearch-datafusion'
}

dependencies {
    api project(':libs:opensearch-common')
    implementation project(':libs:opensearch-arrow-spi')

    api 'org.apache.arrow:arrow-c-data:17.0.0'

    // logging
    implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}"
    implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
    implementation "org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}"

    // testing
    testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
    testImplementation "junit:junit:${versions.junit}"
    testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
    // Exclude this project's own artifact from the test framework's transitive dependencies.
    testImplementation(project(":test:framework")) {
        exclude group: 'org.opensearch', module: 'opensearch-datafusion'
    }
}

//compileJava {
// dependsOn "cargoBuild"
//}

// Make `assemble` depend on `cargoBuild` so the cargo build runs whenever
// this project is assembled.
tasks.named("assemble") {
    dependsOn("cargoBuild")
}

// Replace the forbidden-APIs signature files used for the main source set
// with the 'jdk-signatures' set only.
tasks.named('forbiddenApisMain') {
    replaceSignatureFiles('jdk-signatures')
}

// Runs `cargo build --release` in the `jni` directory of this project.
task cargoBuild(type: Exec) {
    workingDir 'jni'
    commandLine 'cargo', 'build', '--release'
    // Log at execution time only. A bare println in this closure runs during the
    // configuration phase of every Gradle invocation, even when the task is not executed.
    doFirst {
        println("Running Cargo Build")
    }
}

// Copies files matching the listed patterns from the cargo release output
// directory into `native-lib` under the root project directory.
task copyNativeLib(type: Copy) {
    into("$rootDir/native-lib")
    from('jni/target/release')
    include('*.so', '*.dll', '*.dylib', '*.d')
}

// On clean, remove the cargo build output and the copied native libraries.
// (A second clean.doFirst hook that repeated `delete "jni/target"` was merged into this one.)
clean.doFirst {
    delete "jni/target"
    delete "$rootDir/native-lib"
}

// Run copyNativeLib after every cargoBuild.
cargoBuild.finalizedBy copyNativeLib

//build {
// dependsOn cargoBuild
//}

// Explicitly enable the assemble task for this project.
// NOTE(review): presumably a shared build convention disables it by default — confirm.
assemble.enabled = true

//test {
// dependsOn cargoBuild
// systemProperty "java.library.path", "$rootDir/jni/target/release"
//}
//
//testClusters.runTask {
// dependsOn cargoBuild
// println("ROOT DIR");
// print("$rootDir")
// systemProperty("java.library.path", "$rootDir/libs/datafusion/jni/target/release")
//}


// Third-party audit exclusions.
// ignoreMissingClasses: classes the audit should not report as missing.
// NOTE(review): these look like optional integrations referenced by log4j-core
// (disruptor, JMS, mail, Kafka, OSGi, ZeroMQ, ...) — confirm.
// ignoreViolations: classes whose audit violations are tolerated.
tasks.named('thirdPartyAudit').configure {
ignoreMissingClasses(
"com.conversantmedia.util.concurrent.SpinPolicy",
"com.fasterxml.jackson.dataformat.xml.JacksonXmlModule",
"com.fasterxml.jackson.dataformat.xml.XmlMapper",
"com.fasterxml.jackson.dataformat.xml.util.DefaultXmlPrettyPrinter",
"com.lmax.disruptor.EventFactory",
"com.lmax.disruptor.EventTranslator",
"com.lmax.disruptor.EventTranslatorTwoArg",
"com.lmax.disruptor.EventTranslatorVararg",
"com.lmax.disruptor.ExceptionHandler",
"com.lmax.disruptor.LifecycleAware",
"com.lmax.disruptor.RingBuffer",
"com.lmax.disruptor.Sequence",
"com.lmax.disruptor.SequenceBarrier",
"com.lmax.disruptor.SequenceReportingEventHandler",
"com.lmax.disruptor.TimeoutException",
"com.lmax.disruptor.WaitStrategy",
"com.lmax.disruptor.dsl.Disruptor",
"com.lmax.disruptor.dsl.ProducerType",
"javax.jms.Connection",
"javax.jms.ConnectionFactory",
"javax.jms.Destination",
"javax.jms.JMSException",
"javax.jms.MapMessage",
"javax.jms.Message",
"javax.jms.MessageConsumer",
"javax.jms.MessageProducer",
"javax.jms.Session",
"javax.mail.Authenticator",
"javax.mail.Message\$RecipientType",
"javax.mail.PasswordAuthentication",
"javax.mail.Session",
"javax.mail.Transport",
"javax.mail.internet.InternetAddress",
"javax.mail.internet.InternetHeaders",
"javax.mail.internet.MimeMessage",
"javax.mail.internet.MimeMultipart",
"javax.mail.internet.MimeUtility",
"org.apache.commons.compress.compressors.CompressorStreamFactory",
"org.apache.commons.compress.utils.IOUtils",
"org.apache.commons.csv.CSVFormat",
"org.apache.commons.csv.QuoteMode",
"org.apache.kafka.clients.producer.Producer",
"org.apache.kafka.clients.producer.RecordMetadata",
"org.apache.kafka.common.serialization.ByteArraySerializer",
"org.codehaus.stax2.XMLStreamWriter2",
"org.fusesource.jansi.Ansi",
"org.fusesource.jansi.AnsiRenderer\$Code",
"org.jctools.queues.MpscArrayQueue",
"org.osgi.framework.Bundle",
"org.osgi.framework.BundleActivator",
"org.osgi.framework.BundleContext",
"org.osgi.framework.BundleEvent",
"org.osgi.framework.BundleReference",
"org.osgi.framework.FrameworkUtil",
"org.osgi.framework.ServiceReference",
"org.osgi.framework.ServiceRegistration",
"org.osgi.framework.SynchronousBundleListener",
"org.osgi.framework.wiring.BundleWire",
"org.osgi.framework.wiring.BundleWiring",
"org.slf4j.ext.EventData",
"org.zeromq.SocketType",
"org.zeromq.ZContext",
"org.zeromq.ZMQ",
"org.zeromq.ZMQ\$Context",
"org.zeromq.ZMQ\$Socket",
"org.zeromq.ZMonitor",
"org.zeromq.ZMonitor\$Event",
"org.zeromq.ZMonitor\$ZEvent"
)

ignoreViolations(
"org.apache.logging.log4j.core.util.internal.UnsafeUtil",
"org.apache.logging.log4j.core.util.internal.UnsafeUtil\$1"
)
}
Loading

0 comments on commit 07b238d

Please sign in to comment.