Skip to content

Commit

Permalink
[FLINK-36625] add lineage helper class for connector integration
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangZhenQiu committed Nov 30, 2024
1 parent 02510f1 commit 8171f17
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public DefaultLineageVertex() {
this.lineageDatasets = new ArrayList<>();
}

/**
 * Creates a vertex pre-populated with the given datasets.
 *
 * <p>The list is copied into a fresh {@link ArrayList}. This keeps {@link #addLineageDataset}
 * usable when the caller hands in an unmodifiable list (for example {@code
 * Collections.singletonList(...)}) and prevents later changes to the caller's list from
 * leaking into this vertex.
 *
 * @param lineageDatasets initial datasets of this vertex; must not be {@code null}
 * @throws NullPointerException if {@code lineageDatasets} is {@code null}
 */
public DefaultLineageVertex(List<LineageDataset> lineageDatasets) {
    this.lineageDatasets = new ArrayList<>(lineageDatasets);
}

/** Appends the given dataset to the datasets held by this vertex. */
public void addLineageDataset(LineageDataset lineageDataset) {
    lineageDatasets.add(lineageDataset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ public class DefaultSourceLineageVertex implements SourceLineageVertex {
private List<LineageDataset> lineageDatasets;

/**
 * Creates a source vertex with the given boundedness and an initially empty, mutable dataset
 * list.
 *
 * @param boundedness boundedness reported by this source vertex
 */
public DefaultSourceLineageVertex(Boundedness boundedness) {
    // An explicit this(...) call must be the first statement of a constructor, so the dataset
    // list is handed to the two-argument constructor instead of being assigned here first.
    this(boundedness, new ArrayList<>());
}

/**
 * Creates a source vertex with the given boundedness and initial datasets.
 *
 * <p>The list is copied into a fresh {@link ArrayList} so that the vertex always owns a
 * mutable list of its own: callers may pass unmodifiable lists (for example {@code
 * Collections.singletonList(...)}), and later changes to the caller's list do not leak into
 * this vertex.
 *
 * @param boundedness boundedness reported by this source vertex
 * @param lineageDatasets initial datasets of this vertex; must not be {@code null}
 * @throws NullPointerException if {@code lineageDatasets} is {@code null}
 */
public DefaultSourceLineageVertex(
        Boundedness boundedness, List<LineageDataset> lineageDatasets) {
    this.boundedness = boundedness;
    this.lineageDatasets = new ArrayList<>(lineageDatasets);
}

public void addDataset(LineageDataset lineageDataset) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Objects;

/**
 * Default implementation of {@link TypeDatasetFacet}.
 *
 * <p>Instances are immutable wrappers around a {@link TypeInformation}. Two instances are equal
 * when the wrapped type information is equal; the facet name is always {@link
 * #TYPE_FACET_NAME}.
 */
@PublicEvolving
public class DefaultTypeDatasetFacet implements TypeDatasetFacet {

    /** Name returned by {@link #name()} for every instance of this facet. */
    public static final String TYPE_FACET_NAME = "type";

    private final TypeInformation typeInformation;

    /**
     * Creates a facet wrapping the given type information.
     *
     * @param typeInformation type information exposed through {@link #getTypeInformation()}
     */
    public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
        this.typeInformation = typeInformation;
    }

    /** Returns the type information passed to the constructor. */
    @Override
    public TypeInformation getTypeInformation() {
        return typeInformation;
    }

    /**
     * Compares by wrapped type information; only instances of exactly this class can be equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
        return Objects.equals(typeInformation, that.typeInformation);
    }

    @Override
    public int hashCode() {
        return Objects.hash(typeInformation);
    }

    /** Returns {@link #TYPE_FACET_NAME}. */
    @Override
    public String name() {
        return TYPE_FACET_NAME;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.api.connector.source.Boundedness;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Static factory helpers that let connectors build {@link LineageDataset}, {@link
 * SourceLineageVertex} and {@link LineageVertex} instances without wiring the default
 * implementations by hand.
 */
public final class LineageUtil {

    /** Static helpers only; not meant to be instantiated. */
    private LineageUtil() {}

    /**
     * Creates a dataset that carries a single type facet.
     *
     * @param name dataset name
     * @param namespace dataset namespace
     * @param typeDatasetFacet the only facet of the dataset, stored under its {@code name()}
     * @return a dataset holding exactly the given facet
     */
    public static LineageDataset datasetOf(
            String name, String namespace, TypeDatasetFacet typeDatasetFacet) {
        return datasetOf(name, namespace, Collections.singletonList(typeDatasetFacet));
    }

    /**
     * Creates a dataset whose facets are keyed by their {@code name()}.
     *
     * @param name dataset name
     * @param namespace dataset namespace
     * @param facets facets of the dataset
     * @return a dataset holding the given facets
     * @throws IllegalStateException if two facets report the same name
     */
    public static LineageDataset datasetOf(
            String name, String namespace, List<LineageDatasetFacet> facets) {
        return new DefaultLineageDataset(
                name,
                namespace,
                facets.stream().collect(Collectors.toMap(LineageDatasetFacet::name, item -> item)));
    }

    /**
     * Creates a source lineage vertex that contains exactly one dataset.
     *
     * @param boundedness boundedness of the vertex
     * @param dataset the single dataset of the vertex
     * @return a source vertex wrapping {@code dataset}
     */
    public static SourceLineageVertex sourceLineageVertexOf(
            Boundedness boundedness, LineageDataset dataset) {
        return new DefaultSourceLineageVertex(boundedness, Collections.singletonList(dataset));
    }

    /**
     * Creates a lineage vertex that contains exactly one dataset.
     *
     * @param dataset the single dataset of the vertex
     * @return a vertex wrapping {@code dataset}
     */
    public static LineageVertex lineageVertexOf(LineageDataset dataset) {
        return new DefaultLineageVertex(Collections.singletonList(dataset));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/** Facet definition to contain type information of source and sink. */
@PublicEvolving
public interface TypeDatasetFacet extends LineageDatasetFacet {

/** Returns the type information carried by this facet. */
TypeInformation getTypeInformation();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Optional;

/** Provides a method to obtain a {@link TypeDatasetFacet}. */
@PublicEvolving
public interface TypeDatasetFacetProvider {

/**
 * Returns a type dataset facet, or {@link Optional#empty()} if the implementing class is not
 * able to resolve the type.
 */
Optional<TypeDatasetFacet> getTypeDatasetFacet();
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ public LineageVertex getLineageVertex() {
LineageDataset lineageDataset =
new DefaultLineageDataset(
SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
lineageVertex.addLineageDataset(lineageDataset);
return lineageVertex;
return LineageUtil.lineageVertexOf(lineageDataset);
}
}

Expand All @@ -212,10 +210,7 @@ public LineageVertex getLineageVertex() {
LineageDataset lineageDataset =
new DefaultLineageDataset(
SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>());
DefaultSourceLineageVertex lineageVertex =
new DefaultSourceLineageVertex(Boundedness.BOUNDED);
lineageVertex.addDataset(lineageDataset);
return lineageVertex;
return LineageUtil.sourceLineageVertexOf(Boundedness.BOUNDED, lineageDataset);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for the factory helpers in {@link LineageUtil}. */
public class LineageUtilTest {
    private static final String TEST_NAME = "testName";
    private static final String TEST_NAMESPACE = "testNameSpace";

    /** A dataset built from a single type facet exposes name, namespace and that facet. */
    @Test
    public void testDataSetOf() {
        DefaultTypeDatasetFacet facet = new DefaultTypeDatasetFacet(Types.BIG_INT);

        LineageDataset created = LineageUtil.datasetOf(TEST_NAME, TEST_NAMESPACE, facet);

        assertThat(created.name()).isEqualTo(TEST_NAME);
        assertThat(created.namespace()).isEqualTo(TEST_NAMESPACE);
        assertThat(created.facets()).size().isEqualTo(1);
        assertThat(created.facets().get(facet.name())).isEqualTo(facet);
    }

    /** A source vertex keeps the requested boundedness and wraps exactly the given dataset. */
    @Test
    public void testSourceLineageVertexOf() {
        LineageDataset expected = bigIntDataset();

        SourceLineageVertex vertex =
                LineageUtil.sourceLineageVertexOf(Boundedness.CONTINUOUS_UNBOUNDED, expected);

        assertThat(vertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
        assertThat(vertex.datasets()).containsExactly(expected);
    }

    /** A plain vertex wraps exactly the given dataset. */
    @Test
    public void testLineageVertexOf() {
        LineageDataset expected = bigIntDataset();

        LineageVertex vertex = LineageUtil.lineageVertexOf(expected);

        assertThat(vertex.datasets()).containsExactly(expected);
    }

    /** Builds the dataset shared by the vertex tests: test name/namespace plus a BIG_INT facet. */
    private static LineageDataset bigIntDataset() {
        return LineageUtil.datasetOf(
                TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
import org.apache.flink.streaming.api.lineage.DefaultLineageVertex;
import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageGraph;
import org.apache.flink.streaming.api.lineage.LineageUtil;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
Expand Down Expand Up @@ -322,9 +322,7 @@ public LineageVertex getLineageVertex() {
LineageDataset lineageDataset =
new DefaultLineageDataset(
SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
lineageVertex.addLineageDataset(lineageDataset);
return lineageVertex;
return LineageUtil.lineageVertexOf(lineageDataset);
}
}
}

0 comments on commit 8171f17

Please sign in to comment.