From edbdb1deb0fb98066bdc398d6f443b77bd8ed2df Mon Sep 17 00:00:00 2001 From: Zhenqiu Huang Date: Fri, 29 Nov 2024 22:23:33 -0800 Subject: [PATCH] [FLINK-36625] add lineage helper class for connector integration --- .../api/lineage/DefaultLineageVertex.java | 4 ++ .../lineage/DefaultSourceLineageVertex.java | 7 +- .../api/lineage/DefaultTypeDatasetFacet.java | 62 ++++++++++++++++++ .../streaming/api/lineage/LineageUtils.java | 53 +++++++++++++++ .../api/lineage/TypeDatasetFacet.java | 29 +++++++++ .../api/lineage/TypeDatasetFacetProvider.java | 34 ++++++++++ .../api/lineage/LineageGraphUtilsTest.java | 9 +-- .../api/lineage/LineageUtilsTest.java | 65 +++++++++++++++++++ .../JobStatusChangedListenerITCase.java | 6 +- 9 files changed, 257 insertions(+), 12 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java index 289021e3d2dad5..2396599681e493 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java @@ -32,6 +32,10 @@ public DefaultLineageVertex() { this.lineageDatasets = new ArrayList<>(); } + public DefaultLineageVertex(List lineageDatasets) { + this.lineageDatasets = lineageDatasets; + } + public void addLineageDataset(LineageDataset lineageDataset) { this.lineageDatasets.add(lineageDataset); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java index fbc4ac4b2d4c68..58d2006cf7efeb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java @@ -32,8 +32,13 @@ public class DefaultSourceLineageVertex implements SourceLineageVertex { private List lineageDatasets; public DefaultSourceLineageVertex(Boundedness boundedness) { - this.lineageDatasets = new ArrayList<>(); + this(boundedness, new ArrayList<>()); + } + + public DefaultSourceLineageVertex( + Boundedness boundedness, List lineageDatasets) { this.boundedness = boundedness; + this.lineageDatasets = lineageDatasets; } public void addDataset(LineageDataset lineageDataset) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java new file mode 100644 index 00000000000000..b3675f2ae923b6 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.util.Objects; + +/** Default implementation of {@link TypeDatasetFacet}. */ +@PublicEvolving +public class DefaultTypeDatasetFacet implements TypeDatasetFacet { + + public static final String TYPE_FACET_NAME = "type"; + + private final TypeInformation typeInformation; + + public DefaultTypeDatasetFacet(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } + + public TypeInformation getTypeInformation() { + return typeInformation; + } + + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o; + return Objects.equals(typeInformation, that.typeInformation); + } + + @Override + public int hashCode() { + return Objects.hash(typeInformation); + } + + @Override + public String name() { + return TYPE_FACET_NAME; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java new file mode 100644 index 00000000000000..363def24a13ac9 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.Boundedness; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** Util class for creating default LineageDataset and LineageVertex. */ +@Internal +public class LineageUtils { + + public static LineageDataset datasetOf( + String name, String namespace, TypeDatasetFacet typeDatasetFacet) { + return datasetOf(name, namespace, Collections.singletonList(typeDatasetFacet)); + } + + public static LineageDataset datasetOf( + String name, String namespace, List facets) { + return new DefaultLineageDataset( + name, + namespace, + facets.stream().collect(Collectors.toMap(LineageDatasetFacet::name, item -> item))); + } + + public static SourceLineageVertex sourceLineageVertexOf( + Boundedness boundedness, LineageDataset dataset) { + return new DefaultSourceLineageVertex(boundedness, Collections.singletonList(dataset)); + } + + public static LineageVertex lineageVertexOf(LineageDataset dataset) { + return new DefaultLineageVertex(Collections.singletonList(dataset)); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java new file mode 100644 index 00000000000000..9408264a66dc6e --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** Facet definition to contain type information of source and sink. */ +@PublicEvolving +public interface TypeDatasetFacet extends LineageDatasetFacet { + + TypeInformation getTypeInformation(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java new file mode 100644 index 00000000000000..2ff65599ea3ed2 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Optional; + +/** Contains method to extract {@link TypeDatasetFacet}. */ +@PublicEvolving +public interface TypeDatasetFacetProvider { + + /** + * Returns a type dataset facet or `Optional.empty` in case an implementing class is not able to + * resolve type. + */ + Optional getTypeDatasetFacet(); +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java index f9fe10f9ae418c..1226c8a0665b97 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java @@ -194,9 +194,7 @@ public LineageVertex getLineageVertex() { LineageDataset lineageDataset = new DefaultLineageDataset( SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>()); - DefaultLineageVertex lineageVertex = new DefaultLineageVertex(); - lineageVertex.addLineageDataset(lineageDataset); - return lineageVertex; + return LineageUtils.lineageVertexOf(lineageDataset); } } @@ -212,10 +210,7 @@ public LineageVertex getLineageVertex() { LineageDataset lineageDataset = new DefaultLineageDataset( SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>()); - DefaultSourceLineageVertex lineageVertex = - new DefaultSourceLineageVertex(Boundedness.BOUNDED); - lineageVertex.addDataset(lineageDataset); - return lineageVertex; + return LineageUtils.sourceLineageVertexOf(Boundedness.BOUNDED, lineageDataset); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java new file mode 100644 index 00000000000000..0a2e9a2afb66ca --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.lineage; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.Boundedness; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Testing for lineage util. */ +public class LineageUtilsTest { + private static final String TEST_NAME = "testName"; + private static final String TEST_NAMESPACE = "testNameSpace"; + + @Test + public void testDataSetOf() { + DefaultTypeDatasetFacet typeDatasetFacet = new DefaultTypeDatasetFacet(Types.BIG_INT); + LineageDataset dataset = + LineageUtils.datasetOf(TEST_NAME, TEST_NAMESPACE, typeDatasetFacet); + + assertThat(dataset.name()).isEqualTo(TEST_NAME); + assertThat(dataset.namespace()).isEqualTo(TEST_NAMESPACE); + assertThat(dataset.facets()).size().isEqualTo(1); + assertThat(dataset.facets().get(typeDatasetFacet.name())).isEqualTo(typeDatasetFacet); + } + + @Test + public void testSourceLineageVertexOf() { + LineageDataset dataset = + LineageUtils.datasetOf( + TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT)); + SourceLineageVertex sourceLineageVertex = + LineageUtils.sourceLineageVertexOf(Boundedness.CONTINUOUS_UNBOUNDED, dataset); + + assertThat(sourceLineageVertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); + assertThat(sourceLineageVertex.datasets()).containsExactly(dataset); + } + + @Test + public void testLineageVertexOf() { + LineageDataset dataset = + LineageUtils.datasetOf( + TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT)); + LineageVertex lineageVertex = LineageUtils.lineageVertexOf(dataset); + assertThat(lineageVertex.datasets()).containsExactly(dataset); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java index 0478b6d409f060..b9cc3c1313a900 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java @@ -37,10 +37,10 @@ import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.lineage.DefaultLineageDataset; -import org.apache.flink.streaming.api.lineage.DefaultLineageVertex; import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex; import org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.lineage.LineageGraph; +import org.apache.flink.streaming.api.lineage.LineageUtils; import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent; @@ -322,9 +322,7 @@ public LineageVertex getLineageVertex() { LineageDataset lineageDataset = new DefaultLineageDataset( SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>()); - DefaultLineageVertex lineageVertex = new DefaultLineageVertex(); - lineageVertex.addLineageDataset(lineageDataset); - return lineageVertex; + return LineageUtils.lineageVertexOf(lineageDataset); } } }