Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support maps for combine, join, and groupTuple operators #3739

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions docs/operator.rst
Original file line number Diff line number Diff line change
@@ -794,13 +794,44 @@ according to these rules:
* For any other value, the value itself is used as a key.


.. _operator-groupmap:

groupMap
----------

The ``groupMap`` operator collects maps of values emitted by the source channel, grouping together the
elements that share the same key, and emitting a new map object for each distinct key collected. It is
nearly identical to the ``groupTuple`` operator, except that it operates on maps instead of tuples. As such,
the ``by`` parameter must specify a string key (or a list of string keys) instead of integer indices and,
unlike ``groupTuple``, it is required.

For example::

Channel.of(
[group: 1, name: 'A'],
[group: 1, name: 'B'],
[group: 2, name: 'C'],
[group: 3, name: 'B'],
[group: 1, name: 'C'],
[group: 2, name: 'A'],
[group: 3, name: 'D']
)
.groupMap(by: 'group')
.view()

It prints::

[group: 1, name: [A, B, C]]
[group: 2, name: [C, A]]
[group: 3, name: [B, D]]


.. _operator-grouptuple:

groupTuple
----------

The ``groupTuple`` operator collects tuples (or lists) of values emitted by the source channel grouping together the
elements that share the same key. Finally it emits a new tuple object for each distinct key collected.
The ``groupTuple`` operator collects tuples (or lists) of values emitted by the source channel, grouping together the
elements that share the same key, and emitting a new tuple object for each distinct key collected.

In other words, the operator transforms a sequence of tuples like *(K, V, W, ..)* into a new channel emitting a sequence of
*(K, list(V), list(W), ..)*
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright 2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.extension

import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.util.ArrayBag
import nextflow.util.CacheHelper
import nextflow.util.CheckHelper
/**
 * Abstract base class for grouping operators.
 *
 * Holds the options shared by the grouping operators (`size`, `remainder`, `sort`),
 * builds the comparator used to sort grouped values, and subscribes the handlers
 * supplied by the concrete operator to the source channel.
 *
 * @author Ben Sherman <[email protected]>
 */
@Slf4j
abstract class AbstractGroupOp {

    /**
     * Accepted types (or literal values) for the options shared by the grouping
     * operators. Concrete operators add their own `by` entry before validating.
     */
    static protected Map PARAM_TYPES = [
        sort: [Boolean, 'true', 'natural', 'deep', 'hash', Closure, Comparator],
        size: Integer,
        remainder: Boolean
    ]

    /** Channel the items to be grouped are read from */
    protected DataflowReadChannel source

    /** Value of the `size` option, or 0 when it was not provided */
    protected int size

    /** Value of the `remainder` option, `false` when it was not provided */
    protected boolean remainder

    /** Raw value of the `sort` option as given by the user */
    protected sort

    /** Comparator derived from {@link #sort}, or {@code null} when no sorting is requested */
    protected Comparator comparator

    /** Channel the groups are emitted to; created by {@link #apply} when not already set */
    protected DataflowWriteChannel target

    /**
     * @param params Operator options; may be {@code null}
     * @param source Channel emitting the items to be grouped
     * @throws IllegalArgumentException when the `sort` option is not supported
     */
    AbstractGroupOp(Map params, DataflowReadChannel source) {
        this.source = source
        size = params?.size ?: 0
        remainder = params?.remainder ?: false
        sort = params?.sort
        comparator = createComparator()
    }

    /**
     * Get the expected size of a key that contains a GroupKey.
     *
     * @param key The grouping key
     * @return The group size reported by the {@link GroupKey} when the key is made of
     *      exactly one {@link GroupKey} element, {@code 0} otherwise
     */
    static protected int sizeBy(List key) {
        if( key.size()==1 && key[0] instanceof GroupKey ) {
            final groupKey = (GroupKey)key[0]
            final size = groupKey.getGroupSize()
            // this helper is shared by every grouping operator, so the message
            // must not name a specific one
            log.debug "Group dynamic size: key=${groupKey} size=${size}"
            return size
        }
        else
            return 0
    }

    /**
     * Create the comparator to be used depending on the #sort property
     *
     * @return The comparator, or {@code null} when `sort` is {@code null} or {@code false}
     * @throws IllegalArgumentException when `sort` is not a supported value, or is a
     *      closure that does not take 1 or 2 parameters
     */
    private Comparator createComparator() {

        /*
         * comparator logic used to sort the grouped elements
         */
        switch(sort) {
            case null:
            case false:
                // `sort: false` is accepted by PARAM_TYPES (Boolean) and simply
                // means "do not sort"; it must not reach the `default` branch
                return null

            case true:
            case 'true':
            case 'natural':
                return { o1,o2 -> o1<=>o2 } as Comparator

            case 'hash':
                return { o1, o2 ->
                    def h1 = CacheHelper.hasher(o1).hash()
                    def h2 = CacheHelper.hasher(o2).hash()
                    return h1.asLong() <=> h2.asLong()
                } as Comparator

            case 'deep':
                return { o1, o2 ->
                    def h1 = CacheHelper.hasher(o1, CacheHelper.HashMode.DEEP).hash()
                    def h2 = CacheHelper.hasher(o2, CacheHelper.HashMode.DEEP).hash()
                    return h1.asLong() <=> h2.asLong()
                } as Comparator

            case Comparator:
                return sort as Comparator

            case Closure:
                final closure = (Closure)sort
                // a two-parameter closure is used directly as the comparison function
                if( closure.getMaximumNumberOfParameters()==2 )
                    return sort as Comparator

                // a one-parameter closure maps each element to the value to compare
                else if( closure.getMaximumNumberOfParameters()==1 )
                    return { o1, o2 ->
                        def v1 = closure.call(o1)
                        def v2 = closure.call(o2)
                        return v1 <=> v2
                    } as Comparator

                else
                    throw new IllegalArgumentException("Invalid sort option - The sort closure should have 1 or 2 arguments")

            default:
                throw new IllegalArgumentException("Not a valid sort argument: ${sort}")
        }

    }

    /**
     * Invoke the operator
     *
     * @return The channel the groups are emitted to
     */
    DataflowWriteChannel apply() {

        if( target == null )
            target = CH.create()

        // apply the operator to the source channel
        DataflowHelper.subscribeImpl(source, getHandlers())

        // return the target channel
        return target
    }

    /**
     * @return The event handlers passed to {@code DataflowHelper.subscribeImpl}
     *      for the source channel
     */
    abstract protected Map getHandlers()

}
137 changes: 137 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/extension/GroupMapOp.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright 2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.extension

import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.util.ArrayBag
import nextflow.util.CacheHelper
import nextflow.util.CheckHelper
/**
 * Implements {@link OperatorImpl#groupMap} operator logic
 *
 * Items are maps. The entries named by the `by` option form the grouping key;
 * the value of every other entry is collected into a list, one list per entry.
 *
 * @author Ben Sherman <[email protected]>
 */
@Slf4j
class GroupMapOp extends AbstractGroupOp {

    /** Names of the map entries that form the grouping key */
    private List indices

    /** Groups collected so far and not yet emitted, indexed by grouping key */
    private Map<List,Map> groups = [:]

    /**
     * @param params Operator options; `by` is required
     * @param source Channel emitting the maps to be grouped
     */
    GroupMapOp(Map params, DataflowReadChannel source) {

        super(params, source)

        CheckHelper.checkParams('groupMap', params, PARAM_TYPES + [ by: [String, List] ])

        indices = getIndices(params?.by)
    }

    /**
     * Normalise the `by` option to a list of entry names.
     *
     * @param by A string or a list
     * @return The list itself, or a single-element list wrapping the string
     * @throws IllegalArgumentException when `by` is null or of any other type
     */
    static private List<String> getIndices( by ) {

        if( by == null )
            throw new IllegalArgumentException("The `by` option is required for `groupMap` operator: '${by}'")

        if( by instanceof List )
            return by

        if( by instanceof String )
            return [by]

        throw new IllegalArgumentException("Not a valid `by` index for `groupMap` operator: '${by}' -- It must be a string or a list of strings")
    }

    @Override
    protected Map getHandlers() {
        [onNext: this.&onNext, onComplete: this.&onComplete]
    }

    /**
     * Collect a received item into its group.
     *
     * @param item
     * @throws IllegalArgumentException when the item has a non-key entry that the
     *      first item of its group did not have
     */
    private void onNext(Map item) {

        // get the grouping key
        final key = indices.collect { k -> item[k] }

        // get the group for the specified key
        // or create it if it does not exist
        final Map group = groups.getOrCreate(key) {
            // insertion-ordered, so the emitted map keeps the entry order of the first item
            def result = new LinkedHashMap(item.size())
            for( String k : item.keySet() )
                result[k] = (k in indices ? item[k] : new ArrayBag())
            return result
        }

        // append the values in the item
        int count = -1
        for( String k : item.keySet() ) {
            if( k !in indices ) {
                def list = (List)group[k]
                // the group layout is defined by its first item: fail with a clear
                // message instead of a bare NullPointerException
                if( list == null )
                    throw new IllegalArgumentException("Invalid item for `groupMap` operator: key '${k}' is not present in the first item of its group -- All items in a group must have the same keys")
                list.add( item[k] )
                count = list.size()
            }
        }

        // emit group if it is complete
        final size = this.size ?: sizeBy(key)
        if( size > 0 && size == count ) {
            bindGroup(group, size)
            groups.remove(key)
        }
    }

    /**
     * Emit the remaining groups when all values have been received.
     */
    private void onComplete(nop) {
        groups.each { key, group -> bindGroup(group, size ?: sizeBy(key)) }
        target.bind(Channel.STOP)
    }

    /**
     * Emit a group.
     *
     * When `remainder` is false and an expected size is known, a group whose
     * collected lists do not hold exactly that many elements is dropped.
     *
     * @param group
     * @param size Expected group size, or 0 when unknown
     */
    private void bindGroup( Map group, int size ) {

        def item = new LinkedHashMap(group)

        if( !remainder && size > 0 ) {
            // make sure the group contains 'size' elements; only non-key entries
            // are inspected, since a grouping key may itself be a list
            final list = (List) group.find { k, v -> k !in indices }?.value
            // no collected list at all means every entry is a grouping key:
            // the group cannot be considered complete
            if( list == null || list.size() != size )
                return
        }

        // sort the grouped entries, leaving the grouping keys untouched
        if( comparator )
            for( def entry : item.entrySet() )
                if( (entry.key !in indices) && entry.value instanceof List )
                    Collections.sort((List)entry.value, comparator)

        target.bind( item )
    }

}
236 changes: 63 additions & 173 deletions modules/nextflow/src/main/groovy/nextflow/extension/GroupTupleOp.groovy
Original file line number Diff line number Diff line change
@@ -30,226 +30,116 @@ import nextflow.util.CheckHelper
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
class GroupTupleOp {

static private Map GROUP_TUPLE_PARAMS = [ by: [Integer, List], sort: [Boolean, 'true','natural','deep','hash',Closure,Comparator], size: Integer, remainder: Boolean ]

static private List<Integer> GROUP_DEFAULT_INDEX = [0]


/**
* Comparator used to sort tuple entries (when required)
*/
private Comparator comparator

private int size
class GroupTupleOp extends AbstractGroupOp {

private List indices

private DataflowWriteChannel target

private DataflowReadChannel channel

private boolean remainder

private Map<List,List> groups = [:]

private sort

GroupTupleOp(Map params, DataflowReadChannel source) {

CheckHelper.checkParams('groupTuple', params, GROUP_TUPLE_PARAMS)
super(params, source)

channel = source
indices = getGroupTupleIndices(params)
size = params?.size ?: 0
remainder = params?.remainder ?: false
sort = params?.sort
CheckHelper.checkParams('groupTuple', params, PARAM_TYPES + [ by: [Integer, List] ])

defineComparator()
indices = getIndices(params?.by)
}

GroupTupleOp setTarget(DataflowWriteChannel target) {
this.target = target
this.@target = target
return this
}

static private List<Integer> getGroupTupleIndices( Map params ) {
static private List<Integer> getIndices( by ) {

if( params?.by == null )
return GROUP_DEFAULT_INDEX
if( by == null )
return [0]

if( params.by instanceof List )
return params.by as List<Integer>
if( by instanceof List )
return by as List<Integer>

if( params.by instanceof Integer || params.by.toString().isInteger() )
return [params.by as Integer]
if( by instanceof Integer || by.toString().isInteger() )
return [by as Integer]

throw new IllegalArgumentException("Not a valid `by` index for `groupTuple` operator: '${params.by}' -- It must be an integer value or a list of integers")
throw new IllegalArgumentException("Not a valid `by` index for `groupTuple` operator: '${by}' -- It must be an integer value or a list of integers")
}

/*
* Collects received values grouping by key
@Override
protected Map getHandlers() {
[onNext: this.&onNext, onComplete: this.&onComplete]
}

/**
* Collect a received item into its group.
*
* @param item
*/
private void collect(List tuple) {
private void onNext(List item) {

final key = tuple[indices] // the actual grouping key
final len = tuple.size()
// get the grouping key
final key = item[indices]
final len = item.size()

final List items = groups.getOrCreate(key) { // get the group for the specified key
def result = new ArrayList(len) // create if does not exists
// get the group for the specified key
// or create it if it does not exist
final List group = groups.getOrCreate(key) {
def result = new ArrayList(len)
for( int i=0; i<len; i++ )
result[i] = (i in indices ? tuple[i] : new ArrayBag())
result[i] = (i in indices ? item[i] : new ArrayBag())
return result
}

int count=-1
for( int i=0; i<len; i++ ) { // append the values in the tuple
if( ! (i in indices) ) {
def list = (items[i] as List)
list.add( tuple[i] )
count=list.size()
// append the values in the item
int count = -1
for( int i=0; i<len; i++ ) {
if( i !in indices ) {
def list = (List)group[i]
list.add( item[i] )
count = list.size()
}
}

final sz = size ?: sizeBy(key)
if( sz>0 && sz==count ) {
bindTuple(items, sz)
// emit group if it is complete
final size = this.size ?: sizeBy(key)
if( size > 0 && size == count ) {
bindGroup(group, size)
groups.remove(key)
}
}


/*
* finalize the grouping binding the remaining values
*/
private void finalise(nop) {
groups.each { keys, items -> bindTuple(items, size ?: sizeBy(keys)) }
target.bind(Channel.STOP)
}

/*
* bind collected items to the target channel
*/
private void bindTuple( List items, int sz ) {

def tuple = new ArrayList(items)

if( !remainder && sz>0 ) {
// verify exist it contains 'size' elements
List list = items.find { it instanceof List }
if( list.size() != sz ) {
return
}
}

if( comparator ) {
sortInnerLists(tuple, comparator)
}

target.bind( tuple )
}

/**
* Define the comparator to be used depending the #sort property
* Emit the remaining groups when all values have been received.
*/
private void defineComparator( ) {

/*
* comparator logic used to sort tuple elements
*/
switch(sort) {
case null:
break

case true:
case 'true':
case 'natural':
comparator = { o1,o2 -> o1<=>o2 } as Comparator
break;

case 'hash':
comparator = { o1, o2 ->
def h1 = CacheHelper.hasher(o1).hash()
def h2 = CacheHelper.hasher(o2).hash()
return h1.asLong() <=> h2.asLong()
} as Comparator
break

case 'deep':
comparator = { o1, o2 ->
def h1 = CacheHelper.hasher(o1, CacheHelper.HashMode.DEEP).hash()
def h2 = CacheHelper.hasher(o2, CacheHelper.HashMode.DEEP).hash()
return h1.asLong() <=> h2.asLong()
} as Comparator
break

case Comparator:
comparator = sort as Comparator
break

case Closure:
final closure = (Closure)sort
if( closure.getMaximumNumberOfParameters()==2 ) {
comparator = sort as Comparator
}
else if( closure.getMaximumNumberOfParameters()==1 ) {
comparator = { o1, o2 ->
def v1 = closure.call(o1)
def v2 = closure.call(o2)
return v1 <=> v2
} as Comparator
}
else
throw new IllegalArgumentException("Invalid groupTuple option - The closure should have 1 or 2 arguments")
break

default:
throw new IllegalArgumentException("Not a valid sort argument: ${sort}")
}

private void onComplete(nop) {
groups.each { key, group -> bindGroup(group, size ?: sizeBy(key)) }
target.bind(Channel.STOP)
}

/**
* Main method to invoke the operator
* Emit a group.
*
* @return The resulting channel
* @param group
* @param size
*/
DataflowWriteChannel apply() {

if( target == null )
target = CH.create()

/*
* apply the logic the the source channel
*/
DataflowHelper.subscribeImpl(channel, [onNext: this.&collect, onComplete: this.&finalise])

/*
* return the target channel
*/
return target
}
private void bindGroup( List group, int size ) {

private static sortInnerLists( List tuple, Comparator c ) {
def item = new ArrayList(group)

for( int i=0; i<tuple.size(); i++ ) {
def entry = tuple[i]
if( !(entry instanceof List) ) continue
Collections.sort(entry as List, c)
if( !remainder && size > 0 ) {
// make sure the group contains 'size' elements
List list = group.find { it instanceof List }
if( list.size() != size ) {
return
}
}

}
// sort the grouped entries
if( comparator )
for( def entry : item )
if( entry instanceof List )
Collections.sort((List)entry, comparator)

static protected int sizeBy(List target) {
if( target.size()==1 && target[0] instanceof GroupKey ) {
final group = (GroupKey)target[0]
final size = group.getGroupSize()
log.debug "GroupTuple dynamic size: key=${group} size=$size"
return size
}
else
return 0
target.bind( item )
}

}
Original file line number Diff line number Diff line change
@@ -226,9 +226,12 @@ class OperatorImpl {
return result
}

DataflowWriteChannel groupTuple( final DataflowReadChannel source, final Map params=null ) {
def result = new GroupTupleOp(params, source).apply()
return result
DataflowWriteChannel groupMap( DataflowReadChannel source, Map params=null ) {
new GroupMapOp(params, source).apply()
}

DataflowWriteChannel groupTuple( DataflowReadChannel source, Map params=null ) {
new GroupTupleOp(params, source).apply()
}

/**
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.extension

import nextflow.Channel
import nextflow.Session
import spock.lang.Specification

/**
 * Tests for the `groupMap` operator.
 *
 * @author Ben Sherman <bentshermann@gmail.com>
 */
class GroupMapOpTest extends Specification {

    def setup() {
        new Session()
    }

    def 'should group items using dynamic group size' () {
        given: 'three group keys, each declaring its own expected group size'
        def k1 = new GroupKey('k1', 2)
        def k2 = new GroupKey('k2', 3)
        def k3 = new GroupKey('k3', 4)

        and: 'a stream of maps built from (group, name) pairs'
        def records = [
            [k1, 'a'],
            [k1, 'b'],
            [k2, 'x'],
            [k3, 'q'],
            [k1, 'd'],
            [k1, 'c'],
            [k2, 'y'],
            [k1, 'f'],
            [k2, 'z']
        ].collect { pair -> [group: pair[0], name: pair[1]] }

        when: 'the size is given explicitly as an operator option'
        def ch = records.channel().groupMap(by: 'group', size: 2)
        then:
        ch.val == [group: k1, name: ['a', 'b']]
        ch.val == [group: k1, name: ['d', 'c']]
        ch.val == [group: k2, name: ['x', 'y']]
        ch.val == Channel.STOP

        when: 'the size is taken from each group key'
        ch = records.channel().groupMap(by: 'group')
        then:
        ch.val == [group: k1, name: ['a', 'b']]
        ch.val == [group: k1, name: ['d', 'c']]
        ch.val == [group: k2, name: ['x', 'y', 'z']]
        ch.val == Channel.STOP

        when: 'incomplete groups are requested as well'
        ch = records.channel().groupMap(by: 'group', remainder: true)
        then:
        ch.val == [group: k1, name: ['a', 'b']]
        ch.val == [group: k1, name: ['d', 'c']]
        ch.val == [group: k2, name: ['x', 'y', 'z']]
        ch.val == [group: k3, name: ['q']]
        ch.val == [group: k1, name: ['f']]
        ch.val == Channel.STOP
    }
}
Original file line number Diff line number Diff line change
@@ -18,10 +18,8 @@
package nextflow.extension

import nextflow.Channel

import spock.lang.Specification

import nextflow.Session
import spock.lang.Specification

/**
*
@@ -101,8 +99,7 @@ class GroupTupleOpTest extends Specification {
0 | []
}


def 'should group items using dyn group size' () {
def 'should group items using dynamic group size' () {
given:
def k1 = new GroupKey("k1", 2)
def k2 = new GroupKey('k2', 3)
@@ -130,7 +127,6 @@ class GroupTupleOpTest extends Specification {
result.val == [k2, ['x', 'y', 'z'] ]
result.val == Channel.STOP


when:
result = tuples.channel().groupTuple(remainder: true)
then: