refactor of initial plugin #1

Merged (4 commits, Sep 2, 2024)
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -17,7 +17,7 @@ jobs:
strategy:
fail-fast: false
matrix:
-        java_version: [8, 11]
+        java_version: [17]

steps:
- name: Environment
10 changes: 10 additions & 0 deletions README.md
@@ -24,6 +24,16 @@ This plugin provides two new operators:

: Load each Parquet file in a source channel, emitting each row as a separate item.

By default, every row in the Parquet file is emitted as a Map. To read only a subset
of the fields (a projection), provide a Java record class declaring the fields you want to read:

```groovy
record MyRecord( int id, String name){}

splitParquet( [record: MyRecord] )
```
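
A raw read, for comparison: with no `record:` option every row arrives as a Map. A minimal sketch, assuming a local `data.parquet`:

```groovy
channel.fromPath('data.parquet')
    .splitParquet()     // emits each row as a Map, e.g. [id:1, name:'test2', ...]
    .view()
```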

`toParquet( path )`

: Write each item in a source channel to a Parquet file.
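
A minimal usage sketch (illustrative only; `out.parquet` is a hypothetical output path, and the items are Java records so the writer can derive a schema):

```groovy
record MyRecord( int id, String name ){}

channel.of( new MyRecord(1, 'foo'), new MyRecord(2, 'bar') )
    .toParquet( 'out.parquet' )
```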
build.gradle
@@ -19,12 +19,12 @@ java {
}

compileJava {
-    options.release.set(11)
+    options.release.set(17)
}

tasks.withType(GroovyCompile) {
-    sourceCompatibility = '11'
-    targetCompatibility = '11'
+    sourceCompatibility = '17'
+    targetCompatibility = '17'
}

tasks.withType(Test) {
19 changes: 6 additions & 13 deletions plugins/nf-parquet/build.gradle
@@ -50,34 +50,27 @@ sourceSets {
}

ext {
-    nextflowVersion = '23.04.0'
+    nextflowVersion = '24.04.2'
}

dependencies {
// This dependency is exported to consumers, that is to say found on their compile classpath.
compileOnly "io.nextflow:nextflow:$nextflowVersion"
compileOnly 'org.slf4j:slf4j-api:1.7.10'
compileOnly 'org.pf4j:pf4j:3.4.1'

    // add the plugin dependencies here
-    api('org.apache.parquet:parquet-avro:1.10.0')
-    api('org.apache.hadoop:hadoop-common:3.1.0') {
-        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
-    }
+    implementation 'com.jerolba:carpet-record:0.1.0'

// test configuration
testImplementation "org.codehaus.groovy:groovy:3.0.17"
testImplementation "org.codehaus.groovy:groovy-nio:3.0.17"
testImplementation "io.nextflow:nextflow:$nextflowVersion"
testImplementation ("org.codehaus.groovy:groovy-test:3.0.17") { exclude group: 'org.codehaus.groovy' }
testImplementation ("cglib:cglib-nodep:3.3.0")
testImplementation ("org.objenesis:objenesis:3.1")
testImplementation ("org.spockframework:spock-core:2.2-groovy-3.0") { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' }
testImplementation ('org.spockframework:spock-junit4:2.2-groovy-3.0') { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' }
testImplementation ('com.google.jimfs:jimfs:1.1')
testImplementation ("org.spockframework:spock-core:2.3-groovy-4.0") { exclude group: 'org.apache.groovy'; exclude group: 'net.bytebuddy' }
testImplementation ('org.spockframework:spock-junit4:2.3-groovy-4.0') { exclude group: 'org.apache.groovy'; exclude group: 'net.bytebuddy' }

testImplementation(testFixtures("io.nextflow:nextflow:$nextflowVersion"))
testImplementation(testFixtures("io.nextflow:nf-commons:$nextflowVersion"))


// see https://docs.gradle.org/4.1/userguide/dependency_management.html#sec:module_replacement
modules {
module("commons-logging:commons-logging") { replacedBy("org.slf4j:jcl-over-slf4j") }
plugins/nf-parquet/src/main/nextflow/parquet/ParquetExtension.groovy
@@ -1,5 +1,8 @@
package nextflow.parquet

import nextflow.plugin.extension.Factory
import nextflow.plugin.extension.Function

import java.nio.file.Path

import groovy.transform.CompileStatic
@@ -12,18 +15,16 @@ import nextflow.extension.CH
import nextflow.extension.DataflowHelper
import nextflow.plugin.extension.Operator
import nextflow.plugin.extension.PluginExtensionPoint
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path as HadoopPath
-import org.apache.parquet.example.data.Group
-import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
-import org.apache.parquet.hadoop.ParquetFileReader
-import org.apache.parquet.hadoop.util.HadoopInputFile
-import org.apache.parquet.io.ColumnIOFactory

+import com.jerolba.carpet.CarpetReader
+import com.jerolba.carpet.CarpetWriter
+import org.apache.parquet.hadoop.ParquetFileWriter

/**
* Implements extensions for reading and writing Parquet files.
*
* @author Ben Sherman <[email protected]>
* @author Jorge Aguilera <[email protected]>
*/
@Slf4j
@CompileStatic
@@ -37,14 +38,14 @@ class ParquetExtension extends PluginExtensionPoint {
}

/**
-     * Load each Parquet file in a source channel, emitting each row as a separate item.
+     * Load a Parquet file, emitting each row as a separate item.
*
* @param path
*/
@Operator
-    DataflowWriteChannel splitParquet(DataflowReadChannel source) {
+    DataflowWriteChannel splitParquet(DataflowReadChannel source, Map params=[:]) {
         final target = CH.create()
-        final splitter = new ParquetSplitter(target)
+        final splitter = new ParquetSplitter(target, params)

final onNext = { it -> splitter.apply(it) }
final onComplete = { target << Channel.STOP }
@@ -55,66 +56,40 @@

 class ParquetSplitter {
     private DataflowWriteChannel target
+    private Class<Record> clazz

-    ParquetSplitter(DataflowWriteChannel target) {
+    ParquetSplitter(DataflowWriteChannel target, Map params) {
         this.target = target
+        if( params.record ) {
+            if (!(params.record instanceof Class<Record>)) {
+                throw new IllegalArgumentException("A Record.class is required. Class provided $params.record")
+            }
+            this.clazz = params.record as Class<Record>
+        }
     }

     void apply(Object source) {
         try {
+            log.debug "Start reading $source, with projection ${clazz ?: 'raw'}"
-            // create parquet reader
-            final reader = ParquetFileReader.open(HadoopInputFile.fromPath(toHadoopPath(source), new Configuration()))
-            final schema = reader.getFooter().getFileMetaData().getSchema()
-            final fields = schema.getFields()
-
-            // read each row from parquet file
-            def pages = null
-            try {
-                while( (pages=reader.readNextRowGroup()) != null ) {
-                    final rows = pages.getRowCount()
-                    final columnIO = new ColumnIOFactory().getColumnIO(schema)
-                    final recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
-
-                    for( int i = 0; i < rows; i++ )
-                        target << fetchRow(recordReader.read())
-                }
-            }
-            finally {
-                reader.close()
-            }
+            final reader = new CarpetReader(toFile(source), clazz ?: Map)
+            for (def record : reader) {
+                target << record
+            }
         }
         catch( IOException e ) {
             throw new IllegalStateException("Error while reading Parquet file - cause: ${e.message ?: e}", e)
         }
     }

-    private HadoopPath toHadoopPath(Object source) {
-        if( source instanceof String )
-            new HadoopPath((String)source)
-        else if( source instanceof Path )
-            new HadoopPath(((Path)source).toUriString())
-        else
-            throw new IllegalArgumentException("Invalid input for splitParquet operator: ${source}")
-    }
-
-    private Map fetchRow(Group group) {
-        def result = [:]
-
-        final fieldCount = group.getType().getFieldCount()
-        for( int field = 0; field < fieldCount; field++ ) {
-            final valueCount = group.getFieldRepetitionCount(field)
-            final fieldType = group.getType().getType(field)
-            final fieldName = fieldType.getName()
-
-            for( int index = 0; index < valueCount; index++ )
-                if( fieldType.isPrimitive() ) {
-                    println "${fieldName} ${group.getValueToString(field, index)}"
-                    result[fieldName] = group.getValueToString(field, index)
-                }
-        }
-
-        return result
-    }
+    private File toFile(Object source){
+        return switch( source ){
+            case {it instanceof String} -> Path.of(source as String).toFile()
+            case {it instanceof Path} -> (source as Path).toFile()
+            default -> throw new IllegalArgumentException("Invalid input for splitParquet operator: ${source}")
+        }
+    }
 }
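
The refactor swaps the hand-rolled `ParquetFileReader`/`Group` traversal for carpet-record: `CarpetReader` binds each row to a `Map` (raw mode) or to a Java record (projection), and the splitter simply forwards each bound row to the target channel. A standalone sketch of the same pattern, assuming a local `data.parquet` whose columns include `id` and `name`:

```groovy
import com.jerolba.carpet.CarpetReader

record Person(long id, String name) {}

// raw mode: every column of every row, bound to a Map
new CarpetReader(new File('data.parquet'), Map).each { row ->
    println row
}

// projection: only the columns declared by the record class
new CarpetReader(new File('data.parquet'), Person).each { person ->
    println person.name()
}
```

Validating `params.record` up front in the constructor means a wrong argument fails fast, before any file is opened.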

plugins/nf-parquet/src/main/nextflow/parquet/ParquetPlugin.groovy
@@ -17,18 +17,26 @@
package nextflow.parquet

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.plugin.BasePlugin
import org.pf4j.PluginWrapper

/**
* Implements the nf-parquet plugin entry point
*
* @author Ben Sherman <[email protected]>
* @author Jorge Aguilera <[email protected]>
*/
@CompileStatic
@Slf4j
class ParquetPlugin extends BasePlugin {

ParquetPlugin(PluginWrapper wrapper) {
super(wrapper)
initPlugin()
}

private void initPlugin(){
log.debug "${this.class.name} plugin initialized"
}
}
2 changes: 1 addition & 1 deletion plugins/nf-parquet/src/resources/META-INF/MANIFEST.MF
@@ -3,4 +3,4 @@ Plugin-Id: nf-parquet
Plugin-Version: 0.1.0
Plugin-Class: nextflow.parquet.ParquetPlugin
Plugin-Provider: nextflow
-Plugin-Requires: >=22.10.0
+Plugin-Requires: >=24.04.2
98 changes: 98 additions & 0 deletions plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy
@@ -0,0 +1,98 @@
package nextflow.parquet

import nextflow.Channel
import nextflow.plugin.Plugins
import nextflow.plugin.TestPluginDescriptorFinder
import nextflow.plugin.TestPluginManager
import nextflow.plugin.extension.PluginExtensionProvider
import org.pf4j.PluginDescriptorFinder
import spock.lang.Shared
import test.Dsl2Spec
import test.MockScriptRunner

import java.nio.file.Files
import java.nio.file.Path
import java.util.jar.Manifest

class PluginTest extends Dsl2Spec{

@Shared String pluginsMode

def setup() {
// reset previous instances
PluginExtensionProvider.reset()
// this needs to be set *before* the plugin manager class is created
pluginsMode = System.getProperty('pf4j.mode')
System.setProperty('pf4j.mode', 'dev')
// the plugin root should be the project root directory
def root = Path.of('.').toAbsolutePath().normalize()
def manager = new TestPluginManager(root){
@Override
protected PluginDescriptorFinder createPluginDescriptorFinder() {
return new TestPluginDescriptorFinder(){
@Override
protected Manifest readManifestFromDirectory(Path pluginPath) {
def manifestPath= getManifestPath(pluginPath)
final input = Files.newInputStream(manifestPath)
return new Manifest(input)
}
protected Path getManifestPath(Path pluginPath) {
return pluginPath.resolve('build/resources/main/META-INF/MANIFEST.MF')
}
}
}
}
Plugins.init(root, 'dev', manager)
}

def cleanup() {
Plugins.stop()
PluginExtensionProvider.reset()
pluginsMode ? System.setProperty('pf4j.mode',pluginsMode) : System.clearProperty('pf4j.mode')
}

def 'should start' () {
when:
def SCRIPT = '''
channel.of('hi!')
'''
and:
def result = new MockScriptRunner([:]).setScript(SCRIPT).execute()
then:
result.val == 'hi!'
result.val == Channel.STOP
}

def 'should parse a parquet file in raw mode'(){
when:
def path = getClass().getResource('/test.parquet').toURI().path
def SCRIPT = """
include {splitParquet} from 'plugin/nf-parquet'
channel.fromPath("$path").splitParquet()
""".toString()
and:
def result = new MockScriptRunner([:]).setScript(SCRIPT).execute()
then:
result.val == [id:1, name:"test2", sizell:10, value:0.010838246310055144, percentile:0.28001529169191186]
result.val == Channel.STOP
}

def 'should parse a projection'(){
when:
def path = getClass().getResource('/test.parquet').toURI().path
def SCRIPT = """
include {splitParquet} from 'plugin/nf-parquet'

record SingleRecord(long id, String name) {
}

channel.fromPath("$path").splitParquet( [record:SingleRecord] )
""".toString()
and:
def result = new MockScriptRunner([:]).setScript(SCRIPT).execute()
then:
result.val.id == 1
result.val == Channel.STOP
}

}
Binary file added plugins/nf-parquet/src/testResources/test.parquet
Binary file not shown.
Binary file added validation/lib/module-info.class
Binary file not shown.
Binary file added validation/lib/myrecords/Address.class
Binary file not shown.
Binary file added validation/lib/myrecords/CustomRecord.class
Binary file not shown.
Binary file added validation/lib/myrecords/Job.class
Binary file not shown.
Binary file added validation/lib/myrecords/Person.class
Binary file not shown.
Binary file added validation/lib/myrecords/SingleAddress.class
Binary file not shown.
Binary file added validation/lib/myrecords/SinglePerson.class
Binary file not shown.
Binary file added validation/lib/myrecords/SingleRecord.class
Binary file not shown.
4 changes: 4 additions & 0 deletions validation/nextflow.config
@@ -0,0 +1,4 @@
plugins {
id "nf-parquet@${System.getenv("PARQUET_PLUGIN_VERSION") ?: "latest"}"
}

Binary file added validation/presidents.parquet
Binary file not shown.
7 changes: 7 additions & 0 deletions validation/read.nf
@@ -0,0 +1,7 @@
include { splitParquet } from 'plugin/nf-parquet'

record SingleRecord(long id, String name) {
}

channel.fromPath("test*.parquet").splitParquet( record: SingleRecord)
| view
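// each row is bound to SingleRecord; view prints e.g. SingleRecord[id=1, name=test2] (illustrative output)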
6 changes: 6 additions & 0 deletions validation/read_complex.nf
@@ -0,0 +1,6 @@
include { splitParquet } from 'plugin/nf-parquet'

import myrecords.*

channel.fromPath("presidents.parquet").splitParquet()
| view
6 changes: 6 additions & 0 deletions validation/read_raw.nf
@@ -0,0 +1,6 @@
include { splitParquet } from 'plugin/nf-parquet'

import myrecords.*

channel.fromPath("*.parquet").splitParquet()
| view
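// raw mode: each row arrives as a Map, e.g. [id:1, name:test2, sizell:10, ...] (illustrative output)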
3 changes: 3 additions & 0 deletions validation/schemas/myrecords/Address.java
@@ -0,0 +1,3 @@
package myrecords;

record Address(String street, String zip, String city) { }
4 changes: 4 additions & 0 deletions validation/schemas/myrecords/CustomRecord.java
@@ -0,0 +1,4 @@
package myrecords;
record CustomRecord(long id, String name, int sizell, double value, double percentile) {

}
3 changes: 3 additions & 0 deletions validation/schemas/myrecords/Job.java
@@ -0,0 +1,3 @@
package myrecords;

record Job(String company, String position, int years){ }