Skip to content

Commit

Permalink
[dbs-leipzig#1559] Added static min/max degree operator and min/max/a…
Browse files Browse the repository at this point in the history
…vg degree evolution operator
  • Loading branch information
ChrizZz110 committed Apr 5, 2022
1 parent 1a9e7fc commit 943d90b
Show file tree
Hide file tree
Showing 21 changed files with 1,046 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.statistics;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.functions.AddSumDegreesToGraphHeadCrossFunction;

/**
* Max degree operator calculates the maximum degree of all vertices of a graph and writes it to the graph
* head as a new property named {@link MaxDegree#PROPERTY_MAX_DEGREE}.
*/
public class MaxDegree implements UnaryBaseGraphToBaseGraphOperator<LogicalGraph> {

/**
* The name of the property that holds the max degree after the calculation.
*/
public static final String PROPERTY_MAX_DEGREE = "maxDegree";

@Override
public LogicalGraph execute(LogicalGraph graph) {

DataSet<EPGMGraphHead> newGraphHead = new VertexDegrees().execute(graph)
.max(1)
.crossWithTiny(graph.getGraphHead().first(1))
.with(new AddSumDegreesToGraphHeadCrossFunction(PROPERTY_MAX_DEGREE));

return graph.getFactory().fromDataSets(newGraphHead, graph.getVertices(), graph.getEdges());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.statistics;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.functions.AddSumDegreesToGraphHeadCrossFunction;

/**
* Min degree operator calculates the minimum degree of all vertices of a graph and writes it to the graph
* head as a new property named {@link MinDegree#PROPERTY_MIN_DEGREE}.
*/
public class MinDegree implements UnaryBaseGraphToBaseGraphOperator<LogicalGraph> {

/**
* The name of the property that holds the min degree after the calculation.
*/
public static final String PROPERTY_MIN_DEGREE = "minDegree";

@Override
public LogicalGraph execute(LogicalGraph graph) {
DataSet<EPGMGraphHead> newGraphHead = new VertexDegrees().execute(graph)
.min(1)
.crossWithTiny(graph.getGraphHead().first(1))
.with(new AddSumDegreesToGraphHeadCrossFunction(PROPERTY_MIN_DEGREE));

return graph.getFactory().fromDataSets(newGraphHead, graph.getVertices(), graph.getEdges());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.AverageClusteringCoefficient;
import org.gradoop.flink.util.FlinkAsciiGraphLoader;
import org.junit.Before;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.AverageDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.AverageIncomingDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.AverageOutgoingDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import com.google.common.collect.Lists;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.ConnectedComponentsDistribution;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.apache.flink.api.java.DataSet;
import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.DegreeCentrality;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.GraphDensity;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
Expand All @@ -46,6 +46,6 @@ public void testGraphDensity() throws Exception {
// density should not be 0
assertTrue("Graph density is 0", density > 0.);
// density for social network graph should be (24 / 11 * 10) = 0.21818...
assertTrue("Computed graph density is incorrect", density == (24d / 110d));
assertEquals("Computed graph density is incorrect", (24d / 110d), density, 0.0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Integration test class of {@link MaxDegree}.
*/
public class MaxDegreeTest extends GradoopFlinkTestBase {

/**
* Tests the computation of the maximum degree of a logical graph.
*
* @throws Exception If loading of the example-graph fails
*/
@Test
public void testMaxDegree() throws Exception {
LogicalGraph graph = getSocialNetworkLoader().getLogicalGraph();

long maxDegree = graph.callForGraph(new MaxDegree())
.getGraphHead()
.collect()
.get(0)
.getPropertyValue(MaxDegree.PROPERTY_MAX_DEGREE).getLong();

assertEquals(6L, maxDegree);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Integration test class of {@link MinDegree}.
*/
public class MinDegreeTest extends GradoopFlinkTestBase {

/**
* Tests the computation of the minimum degree of a logical graph.
*
* @throws Exception If loading of the example-graph fails
*/
@Test
public void testMinDegree() throws Exception {
LogicalGraph graph = getSocialNetworkLoader().getLogicalGraph();

long minDegree = graph.callForGraph(new MinDegree())
.getGraphHead()
.collect()
.get(0)
.getPropertyValue(MinDegree.PROPERTY_MIN_DEGREE).getLong();

assertEquals(2L, minDegree);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.ConnectedComponentsDistributionAsValues;
import org.gradoop.flink.util.FlinkAsciiGraphLoader;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;

import java.util.Objects;

/**
* Operator that calculates the average degree evolution of all vertices of a temporal graph for the
* whole lifetime of the graph. The average value is rounded up to the next integer.
*/
public class AvgDegreeEvolution
implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Integer>>> {
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;

/**
* The degree type (IN, OUT, BOTH);
*/
private final VertexDegree degreeType;

/**
* Creates an instance of this average degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
*/
public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
}

@Override
public DataSet<Tuple2<Long, Integer>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
// 5) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.AVG));
}
}
Loading

0 comments on commit 943d90b

Please sign in to comment.