diff --git a/docs/operator.rst b/docs/operator.rst index 49a26673bd..00bd9ebc9e 100644 --- a/docs/operator.rst +++ b/docs/operator.rst @@ -794,13 +794,44 @@ according to these rules: * For any other value, the value itself is used as a key. +.. _operator-groupmap: + +groupMap +---------- + +The ``groupMap`` operator collects maps of values emitted by the source channel, grouping together the +elements that share the same key, and emitting a new map object for each distinct key collected. It is +nearly identical to the ``groupTuple`` operator, except that it operates on maps instead of tuples. As such, +the ``by`` parameter should specify string keys instead of integer indices. + +For example:: + + Channel.of( + [group: 1, name: 'A'], + [group: 1, name: 'B'], + [group: 2, name: 'C'], + [group: 3, name: 'B'], + [group: 1, name: 'C'], + [group: 2, name: 'A'], + [group: 3, name: 'D'] + ) + .groupMap(by: 'group') + .view() + +It prints:: + + [group: 1, name: [A, B, C]] + [group: 2, name: [C, A]] + [group: 3, name: [B, D]] + + .. _operator-grouptuple: groupTuple ---------- -The ``groupTuple`` operator collects tuples (or lists) of values emitted by the source channel grouping together the -elements that share the same key. Finally it emits a new tuple object for each distinct key collected. +The ``groupTuple`` operator collects tuples (or lists) of values emitted by the source channel, grouping together the +elements that share the same key, and emitting a new tuple object for each distinct key collected. In other words, the operator transforms a sequence of tuple like *(K, V, W, ..)* into a new channel emitting a sequence of *(K, list(V), list(W), ..)* diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/AbstractGroupOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/AbstractGroupOp.groovy new file mode 100644 index 0000000000..e8e028621f --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/extension/AbstractGroupOp.groovy @@ -0,0 +1,148 @@ +/* + * Copyright 2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.extension + +import groovy.util.logging.Slf4j +import groovyx.gpars.dataflow.DataflowReadChannel +import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.Channel +import nextflow.util.ArrayBag +import nextflow.util.CacheHelper +import nextflow.util.CheckHelper +/** + * Abstract class for grouping operators. + * + * @author Ben Sherman + */ +@Slf4j +abstract class AbstractGroupOp { + + static protected Map PARAM_TYPES = [ + sort: [Boolean, 'true', 'natural', 'deep', 'hash', Closure, Comparator], + size: Integer, + remainder: Boolean + ] + + protected DataflowReadChannel source + + protected int size + + protected boolean remainder + + protected sort + + protected Comparator comparator + + protected DataflowWriteChannel target + + AbstractGroupOp(Map params, DataflowReadChannel source) { + this.source = source + size = params?.size ?: 0 + remainder = params?.remainder ?: false + sort = params?.sort + comparator = createComparator() + } + + /** + * Get the expected size of a key that contains a GroupKey. + * + * @param key + */ + static protected int sizeBy(List key) { + if( key.size()==1 && key[0] instanceof GroupKey ) { + final groupKey = (GroupKey)key[0] + final size = groupKey.getGroupSize() + log.debug "groupMap dynamic size: key=${groupKey} size=${size}" + return size + } + else + return 0 + } + + /** + * Create the comparator to be used depending the #sort property + */ + private Comparator createComparator() { + + /* + * comparator logic used to sort tuple elements + */ + switch(sort) { + case null: + return null + + case true: + case 'true': + case 'natural': + return { o1,o2 -> o1<=>o2 } as Comparator + + case 'hash': + return { o1, o2 -> + def h1 = CacheHelper.hasher(o1).hash() + def h2 = CacheHelper.hasher(o2).hash() + return h1.asLong() <=> h2.asLong() + } as Comparator + + case 'deep': + return { o1, o2 -> + def h1 = CacheHelper.hasher(o1, CacheHelper.HashMode.DEEP).hash() + def h2 = CacheHelper.hasher(o2, CacheHelper.HashMode.DEEP).hash() + return h1.asLong() <=> h2.asLong() + } as Comparator + + case Comparator: + return sort as Comparator + + case Closure: + final closure = (Closure)sort + if( closure.getMaximumNumberOfParameters()==2 ) + return sort as Comparator + + else if( closure.getMaximumNumberOfParameters()==1 ) + return { o1, o2 -> + def v1 = closure.call(o1) + def v2 = closure.call(o2) + return v1 <=> v2 + } as Comparator + + else + throw new IllegalArgumentException("Invalid groupMap option - The sort closure should have 1 or 2 arguments") + + default: + throw new IllegalArgumentException("Not a valid sort argument: ${sort}") + } + + } + + /** + * Invoke the operator + */ + DataflowWriteChannel apply() { + + if( target == null ) + target = CH.create() + + // apply the operator to the source channel + DataflowHelper.subscribeImpl(source, getHandlers()) + + // return the target channel + return target + } + + abstract protected Map getHandlers() + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/GroupMapOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/GroupMapOp.groovy new file mode 100644 index 0000000000..49d905c3bb --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/extension/GroupMapOp.groovy @@ -0,0 +1,137 @@ +/* + * Copyright 2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.extension + +import groovy.util.logging.Slf4j +import groovyx.gpars.dataflow.DataflowReadChannel +import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.Channel +import nextflow.util.ArrayBag +import nextflow.util.CacheHelper +import nextflow.util.CheckHelper +/** + * Implements {@link OperatorImpl#groupMap} operator logic + * + * @author Ben Sherman + */ +@Slf4j +class GroupMapOp extends AbstractGroupOp { + + private List indices + + private Map groups = [:] + + GroupMapOp(Map params, DataflowReadChannel source) { + + super(params, source) + + CheckHelper.checkParams('groupMap', params, PARAM_TYPES + [ by: [String, List] ]) + + indices = getIndices(params?.by) + } + + static private List getIndices( by ) { + + if( by == null ) + throw new IllegalArgumentException("The `by` option is required for `groupMap` operator: '${by}'") + + if( by instanceof List ) + return by + + if( by instanceof String ) + return [by] + + throw new IllegalArgumentException("Not a valid `by` index for `groupMap` operator: '${by}' -- It must be a string or a list of strings") + } + + @Override + protected Map getHandlers() { + [onNext: this.&onNext, onComplete: this.&onComplete] + } + + /** + * Collect a received item into its group. + * + * @param item + */ + private void onNext(Map item) { + + // get the grouping key + final key = indices.collect { k -> item[k] } + + // get the group for the specified key + // or create it if it does not exist + final Map group = groups.getOrCreate(key) { + def result = new HashMap(item.size()) + for( String k : item.keySet() ) + result[k] = (k in indices ? item[k] : new ArrayBag()) + return result + } + + // append the values in the item + int count = -1 + for( String k : item.keySet() ) { + if( k !in indices ) { + def list = (List)group[k] + list.add( item[k] ) + count = list.size() + } + } + + // emit group if it is complete + final size = this.size ?: sizeBy(key) + if( size > 0 && size == count ) { + bindGroup(group, size) + groups.remove(key) + } + } + + /** + * Emit the remaining groups when all values have been received. + */ + private void onComplete(nop) { + groups.each { key, group -> bindGroup(group, size ?: sizeBy(key)) } + target.bind(Channel.STOP) + } + + /** + * Emit a group. + * + * @param group + * @param size + */ + private void bindGroup( Map group, int size ) { + + def item = new HashMap(group) + + if( !remainder && size > 0 ) { + // make sure the group contains 'size' elements + List list = group.values().find { it instanceof List } + if( list.size() != size ) + return + } + + // sort the grouped entries + if( comparator ) + for( def entry : item.values() ) + if( entry instanceof List ) + Collections.sort((List)entry, comparator) + + target.bind( item ) + } + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/GroupTupleOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/GroupTupleOp.groovy index c494917bbd..0c5d2c5b1a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/GroupTupleOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/GroupTupleOp.groovy @@ -30,226 +30,116 @@ import nextflow.util.CheckHelper * @author Paolo Di Tommaso */ @Slf4j -class GroupTupleOp { - - static private Map GROUP_TUPLE_PARAMS = [ by: [Integer, List], sort: [Boolean, 'true','natural','deep','hash',Closure,Comparator], size: Integer, remainder: Boolean ] - - static private List GROUP_DEFAULT_INDEX = [0] - - - /** - * Comparator used to sort tuple entries (when required) - */ - private Comparator comparator - - private int size +class GroupTupleOp extends AbstractGroupOp { private List indices - private DataflowWriteChannel target - - private DataflowReadChannel channel - - private boolean remainder - private Map groups = [:] - private sort - GroupTupleOp(Map params, DataflowReadChannel source) { - CheckHelper.checkParams('groupTuple', params, GROUP_TUPLE_PARAMS) + super(params, source) - channel = source - indices = getGroupTupleIndices(params) - size = params?.size ?: 0 - remainder = params?.remainder ?: false - sort = params?.sort + CheckHelper.checkParams('groupTuple', params, PARAM_TYPES + [ by: [Integer, List] ]) - defineComparator() + indices = getIndices(params?.by) } GroupTupleOp setTarget(DataflowWriteChannel target) { - this.target = target + this.@target = target return this } - static private List getGroupTupleIndices( Map params ) { + static private List getIndices( by ) { - if( params?.by == null ) - return GROUP_DEFAULT_INDEX + if( by == null ) + return [0] - if( params.by instanceof List ) - return params.by as List + if( by instanceof List ) + return by as List - if( params.by instanceof Integer || params.by.toString().isInteger() ) - return [params.by as Integer] + if( by instanceof Integer || by.toString().isInteger() ) + return [by as Integer] - throw new IllegalArgumentException("Not a valid `by` index for `groupTuple` operator: '${params.by}' -- It must be an integer value or a list of integers") + throw new IllegalArgumentException("Not a valid `by` index for `groupTuple` operator: '${by}' -- It must be an integer value or a list of integers") } - /* - * Collects received values grouping by key + @Override + protected Map getHandlers() { + [onNext: this.&onNext, onComplete: this.&onComplete] + } + + /** + * Collect a received item into its group. + * + * @param item */ - private void collect(List tuple) { + private void onNext(List item) { - final key = tuple[indices] // the actual grouping key - final len = tuple.size() + // get the grouping key + final key = item[indices] + final len = item.size() - final List items = groups.getOrCreate(key) { // get the group for the specified key - def result = new ArrayList(len) // create if does not exists + // get the group for the specified key + // or create it if it does not exist + final List group = groups.getOrCreate(key) { + def result = new ArrayList(len) for( int i=0; i0 && sz==count ) { - bindTuple(items, sz) + // emit group if it is complete + final size = this.size ?: sizeBy(key) + if( size > 0 && size == count ) { + bindGroup(group, size) groups.remove(key) } } - - /* - * finalize the grouping binding the remaining values - */ - private void finalise(nop) { - groups.each { keys, items -> bindTuple(items, size ?: sizeBy(keys)) } - target.bind(Channel.STOP) - } - - /* - * bind collected items to the target channel - */ - private void bindTuple( List items, int sz ) { - - def tuple = new ArrayList(items) - - if( !remainder && sz>0 ) { - // verify exist it contains 'size' elements - List list = items.find { it instanceof List } - if( list.size() != sz ) { - return - } - } - - if( comparator ) { - sortInnerLists(tuple, comparator) - } - - target.bind( tuple ) - } - /** - * Define the comparator to be used depending the #sort property + * Emit the remaining groups when all values have been received. */ - private void defineComparator( ) { - - /* - * comparator logic used to sort tuple elements - */ - switch(sort) { - case null: - break - - case true: - case 'true': - case 'natural': - comparator = { o1,o2 -> o1<=>o2 } as Comparator - break; - - case 'hash': - comparator = { o1, o2 -> - def h1 = CacheHelper.hasher(o1).hash() - def h2 = CacheHelper.hasher(o2).hash() - return h1.asLong() <=> h2.asLong() - } as Comparator - break - - case 'deep': - comparator = { o1, o2 -> - def h1 = CacheHelper.hasher(o1, CacheHelper.HashMode.DEEP).hash() - def h2 = CacheHelper.hasher(o2, CacheHelper.HashMode.DEEP).hash() - return h1.asLong() <=> h2.asLong() - } as Comparator - break - - case Comparator: - comparator = sort as Comparator - break - - case Closure: - final closure = (Closure)sort - if( closure.getMaximumNumberOfParameters()==2 ) { - comparator = sort as Comparator - } - else if( closure.getMaximumNumberOfParameters()==1 ) { - comparator = { o1, o2 -> - def v1 = closure.call(o1) - def v2 = closure.call(o2) - return v1 <=> v2 - } as Comparator - } - else - throw new IllegalArgumentException("Invalid groupTuple option - The closure should have 1 or 2 arguments") - break - - default: - throw new IllegalArgumentException("Not a valid sort argument: ${sort}") - } - + private void onComplete(nop) { + groups.each { key, group -> bindGroup(group, size ?: sizeBy(key)) } + target.bind(Channel.STOP) } /** - * Main method to invoke the operator + * Emit a group. * - * @return The resulting channel + * @param group + * @param size */ - DataflowWriteChannel apply() { - - if( target == null ) - target = CH.create() - - /* - * apply the logic the the source channel - */ - DataflowHelper.subscribeImpl(channel, [onNext: this.&collect, onComplete: this.&finalise]) - - /* - * return the target channel - */ - return target - } + private void bindGroup( List group, int size ) { - private static sortInnerLists( List tuple, Comparator c ) { + def item = new ArrayList(group) - for( int i=0; i 0 ) { + // make sure the group contains 'size' elements + List list = group.find { it instanceof List } + if( list.size() != size ) { + return + } } - } + // sort the grouped entries + if( comparator ) + for( def entry : item ) + if( entry instanceof List ) + Collections.sort((List)entry, comparator) - static protected int sizeBy(List target) { - if( target.size()==1 && target[0] instanceof GroupKey ) { - final group = (GroupKey)target[0] - final size = group.getGroupSize() - log.debug "GroupTuple dynamic size: key=${group} size=$size" - return size - } - else - return 0 + target.bind( item ) } } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index 89656c4d35..a8564e25c3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -226,9 +226,12 @@ class OperatorImpl { return result } - DataflowWriteChannel groupTuple( final DataflowReadChannel source, final Map params=null ) { - def result = new GroupTupleOp(params, source).apply() - return result + DataflowWriteChannel groupMap( DataflowReadChannel source, Map params=null ) { + new GroupMapOp(params, source).apply() + } + + DataflowWriteChannel groupTuple( DataflowReadChannel source, Map params=null ) { + new GroupTupleOp(params, source).apply() } /** diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/GroupMapOpTest.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/GroupMapOpTest.groovy new file mode 100644 index 0000000000..ca9d1253a5 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/extension/GroupMapOpTest.groovy @@ -0,0 +1,79 @@ +/* + * Copyright 2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.extension + +import nextflow.Channel +import nextflow.Session +import spock.lang.Specification + +/** + * + * @author Ben Sherman + */ +class GroupMapOpTest extends Specification { + + def setup() { + new Session() + } + + def 'should group items using dynamic group size' () { + given: + def k1 = new GroupKey('k1', 2) + def k2 = new GroupKey('k2', 3) + def k3 = new GroupKey('k3', 4) + + def items = [ + [group: k1, name: 'a'], + [group: k1, name: 'b'], + [group: k2, name: 'x'], + [group: k3, name: 'q'], + [group: k1, name: 'd'], + [group: k1, name: 'c'], + [group: k2, name: 'y'], + [group: k1, name: 'f'], + [group: k2, name: 'z'] + ] + + when: + // here the size is defined as operator argument + def result = items.channel().groupMap(by: 'group', size: 2) + then: + result.val == [group: k1, name: ['a', 'b']] + result.val == [group: k1, name: ['d', 'c']] + result.val == [group: k2, name: ['x', 'y']] + result.val == Channel.STOP + + when: + // here the size is inferred by the key itself + result = items.channel().groupMap(by: 'group') + then: + result.val == [group: k1, name: ['a', 'b']] + result.val == [group: k1, name: ['d', 'c']] + result.val == [group: k2, name: ['x', 'y', 'z']] + result.val == Channel.STOP + + when: + result = items.channel().groupMap(by: 'group', remainder: true) + then: + result.val == [group: k1, name: ['a', 'b']] + result.val == [group: k1, name: ['d', 'c']] + result.val == [group: k2, name: ['x', 'y', 'z']] + result.val == [group: k3, name: ['q']] + result.val == [group: k1, name: ['f']] + result.val == Channel.STOP + } +} diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/GroupTupleOpTest.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/GroupTupleOpTest.groovy index ba48e88330..13c61f200b 100644 --- a/modules/nextflow/src/test/groovy/nextflow/extension/GroupTupleOpTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/extension/GroupTupleOpTest.groovy @@ -18,10 +18,8 @@ package nextflow.extension import nextflow.Channel - -import spock.lang.Specification - import nextflow.Session +import spock.lang.Specification /** * @@ -101,8 +99,7 @@ class GroupTupleOpTest extends Specification { 0 | [] } - - def 'should group items using dyn group size' () { + def 'should group items using dynamic group size' () { given: def k1 = new GroupKey("k1", 2) def k2 = new GroupKey('k2', 3) @@ -130,7 +127,6 @@ class GroupTupleOpTest extends Specification { result.val == [k2, ['x', 'y', 'z'] ] result.val == Channel.STOP - when: result = tuples.channel().groupTuple(remainder: true) then: