Skip to content

Commit

Permalink
[dbs-leipzig#1559] Changed return type to time intervals instead of t…
Browse files Browse the repository at this point in the history
…imestamps, put duplicated code in base class, added default constructors with default time dimension and degree type
  • Loading branch information
ChrizZz110 committed Aug 22, 2022
1 parent 8be5c3e commit c6ae6fe
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,41 @@
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAverageDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;

import java.util.Objects;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;

/**
* Operator that calculates the average degree evolution of all vertices of a temporal graph for the
* whole lifetime of the graph. The average value is rounded up to the next integer.
* Operator that calculates the evolution of the graph's average degree for the whole lifetime of the graph.
* The result is a triple dataset ({@link DataSet} of {@code Tuple3}) in form {@code <Long, Long, Float>}. It
* represents a time interval (first and second element) and the aggregated degree value for this interval
* (3rd element).
*/
public class AvgDegreeEvolution
implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Double>>> {
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;

public class AvgDegreeEvolution extends BaseAggregateDegreeEvolution {
/**
* The degree type (IN, OUT, BOTH);
* Creates an instance of this average degree evolution operator using {@link TimeDimension#VALID_TIME}
* as default time dimension and {@link VertexDegree#BOTH} as default degree type.
*/
private final VertexDegree degreeType;
public AvgDegreeEvolution() {
super();
}

/**
* Creates an instance of this average degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
* Creates an instance of this average degree evolution operator using the given time dimension and
* degree type.
*
* @throws RuntimeException in case of an error.
* @param degreeType the degree type (IN, OUT or BOTH)
* @param dimension the time dimension to consider (VALID_TIME or TRANSACTION_TIME)
*/
public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) throws RuntimeException {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
super(degreeType, dimension);
}

@Override
public DataSet<Tuple2<Long, Double>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
// 5) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToAverageDegrees());

public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return preProcess(graph).reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.AVG));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;

import java.util.Objects;
import java.util.TreeMap;

/**
 * Abstract parent class for the aggregated degree evolution operators (min, max and avg aggregation).
 * It holds the configuration shared by them (time dimension and degree type) and offers the common
 * pre-processing step {@link #preProcess(TemporalGraph)}, so that a concrete operator only has to add its
 * own aggregation on top of the pre-processed dataset.
 */
abstract class BaseAggregateDegreeEvolution
  implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple3<Long, Long, Float>>> {

  /**
   * The time dimension that will be considered.
   */
  private final TimeDimension dimension;

  /**
   * The degree type (IN, OUT or BOTH).
   */
  private final VertexDegree degreeType;

  /**
   * Default constructor using {@link TimeDimension#VALID_TIME} as default time dimension and
   * {@link VertexDegree#BOTH} as default degree type.
   */
  protected BaseAggregateDegreeEvolution() {
    // Delegate to the full constructor so that both fields can be final and the defaults live in one place.
    this(VertexDegree.BOTH, TimeDimension.VALID_TIME);
  }

  /**
   * Abstract constructor for the aggregated degree evolution of a graph.
   *
   * @param degreeType the degree type (IN, OUT or BOTH)
   * @param dimension the time dimension to consider (VALID_TIME or TRANSACTION_TIME)
   * @throws NullPointerException if {@code degreeType} or {@code dimension} is {@code null}
   */
  protected BaseAggregateDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
    this.degreeType = Objects.requireNonNull(degreeType, "degreeType must not be null");
    this.dimension = Objects.requireNonNull(dimension, "dimension must not be null");
  }

  /**
   * A pre-process function to prevent duplicate code for min, max and avg aggregation. The result is an
   * absolute degree tree for each vertex (id).
   *
   * @param graph the temporal graph as input
   * @return a dataset containing an absolute degree tree for each vertex identifier
   */
  public DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> preProcess(TemporalGraph graph) {
    return graph.getEdges()
      // 1) Extract vertex id(s) and corresponding time intervals
      .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
      // 2) Group them by the vertex id
      .groupBy(0)
      // 3) For each vertex id, build a degree tree data structure
      .reduceGroup(new BuildTemporalDegreeTree())
      // 4) Transform each delta degree tree into an absolute degree tree
      .map(new TransformDeltaToAbsoluteDegreeTree());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,41 @@
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType;

import java.util.Objects;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;

/**
* Operator that calculates the maximum degree evolution of all vertices of a temporal graph for the
* whole lifetime of the graph.
* Operator that calculates the evolution of the graph's maximum degree for the whole lifetime of the graph.
* The result is a triple dataset ({@link DataSet} of {@code Tuple3}) in form {@code <Long, Long, Float>}. It
* represents a time interval (first and second element) and the aggregated degree value for this interval
* (3rd element).
*/
public class MaxDegreeEvolution
implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Integer>>> {
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;

public class MaxDegreeEvolution extends BaseAggregateDegreeEvolution {
/**
* The degree type (IN, OUT, BOTH);
* Creates an instance of this maximum degree evolution operator using {@link TimeDimension#VALID_TIME}
* as default time dimension and {@link VertexDegree#BOTH} as default degree type.
*/
private final VertexDegree degreeType;
public MaxDegreeEvolution() {
super();
}

/**
* Creates an instance of this maximum degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
* Creates an instance of this maximum degree evolution operator using the given time dimension and
* degree type.
*
* @throws RuntimeException in case of an error.
* @param degreeType the degree type (IN, OUT or BOTH)
* @param dimension the time dimension to consider (VALID_TIME or TRANSACTION_TIME)
*/
public MaxDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) throws RuntimeException {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
public MaxDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
super(degreeType, dimension);
}

@Override
public DataSet<Tuple2<Long, Integer>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
// 5) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.MAX));
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return preProcess(graph).reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.MAX));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,42 @@
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType;

import java.util.Objects;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;

/**
* Operator that calculates the minimum degree evolution of all vertices of a temporal graph for the
* whole lifetime of the graph.
* Operator that calculates the evolution of the graph's minimum degree for the whole lifetime of the graph.
* The result is a triple dataset ({@link DataSet} of {@code Tuple3}) in form {@code <Long, Long, Float>}. It
* represents a time interval (first and second element) and the aggregated degree value for this interval
* (3rd element).
*/
public class MinDegreeEvolution
implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Integer>>> {
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;
public class MinDegreeEvolution extends BaseAggregateDegreeEvolution {

/**
* The degree type (IN, OUT, BOTH);
* Creates an instance of this minimum degree evolution operator using {@link TimeDimension#VALID_TIME}
* as default time dimension and {@link VertexDegree#BOTH} as default degree type.
*/
private final VertexDegree degreeType;
public MinDegreeEvolution() {
super();
}

/**
* Creates an instance of this minimum degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
* Creates an instance of this minimum degree evolution operator using the given time dimension and
* degree type.
*
* @throws RuntimeException in case of an error.
* @param degreeType the degree type (IN, OUT or BOTH)
* @param dimension the time dimension to consider (VALID_TIME or TRANSACTION_TIME)
*/
public MinDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) throws RuntimeException {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
public MinDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
super(degreeType, dimension);
}

@Override
public DataSet<Tuple2<Long, Integer>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
// 5) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.MIN));
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return preProcess(graph).reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.MIN));
}
}
Loading

0 comments on commit c6ae6fe

Please sign in to comment.