diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java index 289021e3d2dad5..2396599681e493 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java @@ -32,6 +32,10 @@ public DefaultLineageVertex() { this.lineageDatasets = new ArrayList<>(); } + public DefaultLineageVertex(List lineageDatasets) { + this.lineageDatasets = lineageDatasets; + } + public void addLineageDataset(LineageDataset lineageDataset) { this.lineageDatasets.add(lineageDataset); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java index fbc4ac4b2d4c68..58d2006cf7efeb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java @@ -32,8 +32,13 @@ public class DefaultSourceLineageVertex implements SourceLineageVertex { private List lineageDatasets; public DefaultSourceLineageVertex(Boundedness boundedness) { - this.lineageDatasets = new ArrayList<>(); + this(boundedness, new ArrayList<>()); + } + + public DefaultSourceLineageVertex( + Boundedness boundedness, List lineageDatasets) { this.boundedness = boundedness; + this.lineageDatasets = lineageDatasets; } public void addDataset(LineageDataset lineageDataset) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java new file mode 100644 index 00000000000000..b3675f2ae923b6 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.util.Objects; + +/** Default implementation of {@link TypeDatasetFacet}. */ +@PublicEvolving +public class DefaultTypeDatasetFacet implements TypeDatasetFacet { + + public static final String TYPE_FACET_NAME = "type"; + + private final TypeInformation typeInformation; + + public DefaultTypeDatasetFacet(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } + + public TypeInformation getTypeInformation() { + return typeInformation; + } + + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o; + return Objects.equals(typeInformation, that.typeInformation); + } + + @Override + public int hashCode() { + return Objects.hash(typeInformation); + } + + @Override + public String name() { + return TYPE_FACET_NAME; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtil.java new file mode 100644 index 00000000000000..c34a9a2240ab49 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtil.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.api.connector.source.Boundedness; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class LineageUtil { + + public static LineageDataset datasetOf( + String name, String namespace, TypeDatasetFacet typeDatasetFacet) { + return datasetOf(name, namespace, Collections.singletonList(typeDatasetFacet)); + } + + public static LineageDataset datasetOf( + String name, String namespace, List facets) { + return new DefaultLineageDataset( + name, + namespace, + facets.stream().collect(Collectors.toMap(LineageDatasetFacet::name, item -> item))); + } + + public static SourceLineageVertex sourceLineageVertexOf( + Boundedness boundedness, LineageDataset dataset) { + return new DefaultSourceLineageVertex(boundedness, Collections.singletonList(dataset)); + } + + public static LineageVertex lineageVertexOf(LineageDataset dataset) { + return new DefaultLineageVertex(Collections.singletonList(dataset)); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java new file mode 100644 index 00000000000000..9408264a66dc6e --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** Facet definition to contain type information of source and sink. */ +@PublicEvolving +public interface TypeDatasetFacet extends LineageDatasetFacet { + + TypeInformation getTypeInformation(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java new file mode 100644 index 00000000000000..2ff65599ea3ed2 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Optional; + +/** Contains method to extract {@link TypeDatasetFacet}. */ +@PublicEvolving +public interface TypeDatasetFacetProvider { + + /** + * Returns a type dataset facet or `Optional.empty` in case an implementing class is not able to + * resolve type. + */ + Optional getTypeDatasetFacet(); +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java index f9fe10f9ae418c..fcb75c65b703f9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java @@ -194,9 +194,7 @@ public LineageVertex getLineageVertex() { LineageDataset lineageDataset = new DefaultLineageDataset( SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>()); - DefaultLineageVertex lineageVertex = new DefaultLineageVertex(); - lineageVertex.addLineageDataset(lineageDataset); - return lineageVertex; + return LineageUtil.lineageVertexOf(lineageDataset); } } @@ -212,10 +210,7 @@ public LineageVertex getLineageVertex() { LineageDataset lineageDataset = new DefaultLineageDataset( SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>()); - DefaultSourceLineageVertex lineageVertex = - new DefaultSourceLineageVertex(Boundedness.BOUNDED); - lineageVertex.addDataset(lineageDataset); - return lineageVertex; + return LineageUtil.sourceLineageVertexOf(Boundedness.BOUNDED, lineageDataset); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilTest.java new file mode 100644 index 00000000000000..3793a2bbd3d206 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.Boundedness; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Testing for lineage util. */ +public class LineageUtilTest { + private static final String TEST_NAME = "testName"; + private static final String TEST_NAMESPACE = "testNameSpace"; + + @Test + public void testDataSetOf() { + DefaultTypeDatasetFacet typeDatasetFacet = new DefaultTypeDatasetFacet(Types.BIG_INT); + LineageDataset dataset = LineageUtil.datasetOf(TEST_NAME, TEST_NAMESPACE, typeDatasetFacet); + + assertThat(dataset.name()).isEqualTo(TEST_NAME); + assertThat(dataset.namespace()).isEqualTo(TEST_NAMESPACE); + assertThat(dataset.facets()).size().isEqualTo(1); + assertThat(dataset.facets().get(typeDatasetFacet.name())).isEqualTo(typeDatasetFacet); + } + + @Test + public void testSourceLineageVertexOf() { + LineageDataset dataset = + LineageUtil.datasetOf( + TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT)); + SourceLineageVertex sourceLineageVertex = + LineageUtil.sourceLineageVertexOf(Boundedness.CONTINUOUS_UNBOUNDED, dataset); + + assertThat(sourceLineageVertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); + assertThat(sourceLineageVertex.datasets()).containsExactly(dataset); + } + + @Test + public void testLineageVertexOf() { + LineageDataset dataset = + LineageUtil.datasetOf( + TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT)); + LineageVertex lineageVertex = LineageUtil.lineageVertexOf(dataset); + assertThat(lineageVertex.datasets()).containsExactly(dataset); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java index 0478b6d409f060..30dd9480d54030 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java @@ -37,10 +37,10 @@ import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.lineage.DefaultLineageDataset; -import org.apache.flink.streaming.api.lineage.DefaultLineageVertex; import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex; import org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.lineage.LineageGraph; +import org.apache.flink.streaming.api.lineage.LineageUtil; import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent; @@ -322,9 +322,7 @@ public LineageVertex getLineageVertex() { LineageDataset lineageDataset = new DefaultLineageDataset( SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>()); - DefaultLineageVertex lineageVertex = new DefaultLineageVertex(); - lineageVertex.addLineageDataset(lineageDataset); - return lineageVertex; + return LineageUtil.lineageVertexOf(lineageDataset); } } }