From 6de30f8d2e3b669acc844ace806eb1c2eaf319b2 Mon Sep 17 00:00:00 2001 From: Alexander Egorov Date: Tue, 12 Jul 2022 12:56:13 -0500 Subject: [PATCH 1/2] 1. Create KinesisStateUtil class with static method `createShardsStateSerializer` that explicitly assigns `KryoSerializer` to `SequenceNumber` class and use this method to initialize state 2. Create unit test that checks for compatibility of previous `TypeInformation` based serializer and explicitly created `KryoSerializer` 3. Replace `Mockito` calls with utility `MockStreamingRuntimeContext` to improve testability and unify initialization method of Streaming Runtime Context --- .../kinesis/FlinkKinesisConsumer.java | 23 ++++---- .../kinesis/util/KinesisStateUtil.java | 56 ++++++++++++++++++ .../kinesis/FlinkKinesisConsumerTest.java | 59 +++++++++++++++++-- .../TestableFlinkKinesisConsumer.java | 29 ++------- 4 files changed, 126 insertions(+), 41 deletions(-) create mode 100644 flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index b0a729fb11eb5..f1c6536234b18 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -28,7 +28,6 @@ import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.runtime.state.FunctionInitializationContext; import 
org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -46,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil; import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil; import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.util.InstantiationUtil; @@ -228,7 +228,8 @@ public FlinkKinesisConsumer( "The provided deserialization schema is not serializable: " + deserializer.getClass().getName() + ". " - + "Please check that it does not contain references to non-serializable instances."); + + "Please check that it does not contain references " + + "to non-serializable instances."); this.deserializer = deserializer; StreamConsumerRegistrarUtil.eagerlyRegisterStreamConsumers(configProps, streams); @@ -370,7 +371,8 @@ public void run(SourceContext sourceContext) throws Exception { if (LOG.isInfoEnabled()) { LOG.info( - "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}", + "Subtask {} will be seeded with initial shard {}, " + + "starting state set as sequence number {}", getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), startingSeqNum.get()); @@ -434,16 +436,13 @@ public TypeInformation getProducedType() { @Override public void initializeState(FunctionInitializationContext context) throws Exception { - TypeInformation> shardsStateTypeInfo = - new TupleTypeInfo<>( - TypeInformation.of(StreamShardMetadata.class), - TypeInformation.of(SequenceNumber.class)); - sequenceNumsStateForCheckpoint = context.getOperatorStateStore() .getUnionListState( new ListStateDescriptor<>( 
- sequenceNumsStateStoreName, shardsStateTypeInfo)); + sequenceNumsStateStoreName, + KinesisStateUtil.createShardsStateSerializer( + getRuntimeContext().getExecutionConfig()))); if (context.isRestored()) { if (sequenceNumsToRestore == null) { @@ -463,7 +462,8 @@ public void initializeState(FunctionInitializationContext context) throws Except } LOG.info( - "Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {}", + "Setting restore state in the FlinkKinesisConsumer. " + + "Using the following offsets: {}", sequenceNumsToRestore); } } else { @@ -509,7 +509,8 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug( - "Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}", + "Snapshotted state, last processed sequence numbers: {}, " + + "checkpoint id: {}, timestamp: {}", lastStateSnapshot, context.getCheckpointId(), context.getCheckpointTimestamp()); diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java new file mode 100644 index 0000000000000..eba444056f336 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; + +/** Utilities for Flink Kinesis connector state management. */ +public class KinesisStateUtil { + + /** To prevent instantiation of class. */ + private KinesisStateUtil() {} + + /** + * Creates state serializer for kinesis shard sequence number. 
Using of the explicit state + * serializer with KryoSerializer is needed because otherwise users cannot use + * 'disableGenericTypes' properties with KinesisConsumer, see FLINK-24943 for details + * + * @return state serializer + */ + public static TupleSerializer> + createShardsStateSerializer(ExecutionConfig executionConfig) { + // explicit serializer will keep the compatibility with GenericTypeInformation + // and allow to disableGenericTypes for users + TypeSerializer[] fieldSerializers = + new TypeSerializer[] { + TypeInformation.of(StreamShardMetadata.class).createSerializer(executionConfig), + new KryoSerializer<>(SequenceNumber.class, executionConfig) + }; + @SuppressWarnings("unchecked") + Class> tupleClass = + (Class>) (Class) Tuple2.class; + return new TupleSerializer<>(tupleClass, fieldSerializers); + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 364a412c5d25c..ebbd827bac3e9 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -26,8 +26,11 @@ import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.mock.Whitebox; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -54,10 +57,13 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil; import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter; import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.CollectingSourceContext; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; import com.amazonaws.services.kinesis.model.HashKeyRange; @@ -160,9 +166,7 @@ public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exce FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); - RuntimeContext context = mock(RuntimeContext.class); - when(context.getIndexOfThisSubtask()).thenReturn(0); - when(context.getNumberOfParallelSubtasks()).thenReturn(2); + RuntimeContext context = new MockStreamingRuntimeContext(true, 2, 0); consumer.setRuntimeContext(context); OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); @@ -280,8 +284,7 @@ public void testListStateChangedAfterSnapshotState() throws Exception { new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); FlinkKinesisConsumer mockedConsumer = spy(consumer); - RuntimeContext context = mock(RuntimeContext.class); - when(context.getIndexOfThisSubtask()).thenReturn(1); + RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 1); mockedConsumer.setRuntimeContext(context); 
mockedConsumer.initializeState(initializationContext); @@ -309,6 +312,52 @@ public void testListStateChangedAfterSnapshotState() throws Exception { } } + /** + * Before using an explicit TypeSerializer for the state the {@link FlinkKinesisConsumer} was + * creating a serializer implicitly using a {@link TypeInformation}. After fixing issue + * FLINK-24943, * serializer is created explicitly. Here, we verify that previous approach is + * compatible with the new one. + */ + @Test + public void testExplicitStateSerializerCompatibility() throws Exception { + ExecutionConfig executionConfig = new ExecutionConfig(); + + Tuple2 tuple = + new Tuple2<>( + KinesisDataFetcher.convertToStreamShardMetadata( + new StreamShardHandle( + "fakeStream", + new Shard() + .withShardId( + KinesisShardIdGenerator + .generateFromShardOrder(0)))), + new SequenceNumber("1")); + + // This is how serializer was created implicitly using a TypeInformation + // and since SequenceNumber is GenericType, Flink falls back to Kryo + TypeInformation> originalShardsStateTypeInfo = + new TupleTypeInfo<>( + TypeInformation.of(StreamShardMetadata.class), + TypeInformation.of(SequenceNumber.class)); + TypeSerializer> serializerFromTypeInfo = + originalShardsStateTypeInfo.createSerializer(executionConfig); + byte[] bytes = InstantiationUtil.serializeToByteArray(serializerFromTypeInfo, tuple); + + // This is how we create serializer explicitly with Kryo + TupleSerializer> serializerFromKryo = + KinesisStateUtil.createShardsStateSerializer(executionConfig); + + Tuple2 actualTuple = + InstantiationUtil.deserializeFromByteArray(serializerFromKryo, bytes); + + // Both ways should be the same + assertThat(tuple) + .overridingErrorMessage( + "Explicit serializer is not compatible with " + + "implicit method of creating serializer using TypeInformation.") + .isEqualTo(actualTuple); + } + // ---------------------------------------------------------------------- // Tests related to fetcher initialization // 
---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java index 5be5f320ae7b4..fc7aca5e27125 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java @@ -20,10 +20,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; - -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; import java.util.Properties; @@ -39,27 +36,9 @@ public TestableFlinkKinesisConsumer( final int indexOfThisConsumerSubtask) { super(fakeStream, new SimpleStringSchema(), fakeConfiguration); - this.mockedRuntimeCtx = Mockito.mock(RuntimeContext.class); - - Mockito.when(mockedRuntimeCtx.getNumberOfParallelSubtasks()) - .thenAnswer( - new Answer() { - @Override - public Integer answer(InvocationOnMock invocationOnMock) - throws Throwable { - return totalNumOfConsumerSubtasks; - } - }); - - Mockito.when(mockedRuntimeCtx.getIndexOfThisSubtask()) - .thenAnswer( - new Answer() { - @Override - public Integer answer(InvocationOnMock invocationOnMock) - throws Throwable { - return indexOfThisConsumerSubtask; - } - }); + this.mockedRuntimeCtx = + new MockStreamingRuntimeContext( + true, totalNumOfConsumerSubtasks, indexOfThisConsumerSubtask); } @Override From 
610c2420b09f7fd05039ccdb9d8a810bb5d7daf7 Mon Sep 17 00:00:00 2001 From: Alexander Egorov Date: Tue, 12 Jul 2022 12:56:13 -0500 Subject: [PATCH 2/2] 1. Create KinesisStateUtil class with static method `createShardsStateSerializer` that explicitly assigns `KryoSerializer` to `SequenceNumber` class and use this method to initialize state 2. Create unit test that checks for compatibility of previous `TypeInformation` based serializer and explicitly created `KryoSerializer` 3. Replace `Mockito` calls with utility `MockStreamingRuntimeContext` to improve testability and unify initialization method of Streaming Runtime Context --- .../kinesis/FlinkKinesisConsumer.java | 23 ++++---- .../kinesis/util/KinesisStateUtil.java | 56 ++++++++++++++++++ .../kinesis/FlinkKinesisConsumerTest.java | 59 +++++++++++++++++-- .../TestableFlinkKinesisConsumer.java | 29 ++------- 4 files changed, 126 insertions(+), 41 deletions(-) create mode 100644 flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index b0a729fb11eb5..f1c6536234b18 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -28,7 +28,6 @@ import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.runtime.state.FunctionInitializationContext; import 
org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -46,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil; import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil; import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.util.InstantiationUtil; @@ -228,7 +228,8 @@ public FlinkKinesisConsumer( "The provided deserialization schema is not serializable: " + deserializer.getClass().getName() + ". " - + "Please check that it does not contain references to non-serializable instances."); + + "Please check that it does not contain references " + + "to non-serializable instances."); this.deserializer = deserializer; StreamConsumerRegistrarUtil.eagerlyRegisterStreamConsumers(configProps, streams); @@ -370,7 +371,8 @@ public void run(SourceContext sourceContext) throws Exception { if (LOG.isInfoEnabled()) { LOG.info( - "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}", + "Subtask {} will be seeded with initial shard {}, " + + "starting state set as sequence number {}", getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), startingSeqNum.get()); @@ -434,16 +436,13 @@ public TypeInformation getProducedType() { @Override public void initializeState(FunctionInitializationContext context) throws Exception { - TypeInformation> shardsStateTypeInfo = - new TupleTypeInfo<>( - TypeInformation.of(StreamShardMetadata.class), - TypeInformation.of(SequenceNumber.class)); - sequenceNumsStateForCheckpoint = context.getOperatorStateStore() .getUnionListState( new ListStateDescriptor<>( 
- sequenceNumsStateStoreName, shardsStateTypeInfo)); + sequenceNumsStateStoreName, + KinesisStateUtil.createShardsStateSerializer( + getRuntimeContext().getExecutionConfig()))); if (context.isRestored()) { if (sequenceNumsToRestore == null) { @@ -463,7 +462,8 @@ public void initializeState(FunctionInitializationContext context) throws Except } LOG.info( - "Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {}", + "Setting restore state in the FlinkKinesisConsumer. " + + "Using the following offsets: {}", sequenceNumsToRestore); } } else { @@ -509,7 +509,8 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug( - "Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}", + "Snapshotted state, last processed sequence numbers: {}, " + + "checkpoint id: {}, timestamp: {}", lastStateSnapshot, context.getCheckpointId(), context.getCheckpointTimestamp()); diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java new file mode 100644 index 0000000000000..eba444056f336 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisStateUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; + +/** Utilities for Flink Kinesis connector state management. */ +public class KinesisStateUtil { + + /** To prevent instantiation of class. */ + private KinesisStateUtil() {} + + /** + * Creates state serializer for kinesis shard sequence number. 
Using of the explicit state + * serializer with KryoSerializer is needed because otherwise users cannot use + * 'disableGenericTypes' properties with KinesisConsumer, see FLINK-24943 for details + * + * @return state serializer + */ + public static TupleSerializer> + createShardsStateSerializer(ExecutionConfig executionConfig) { + // explicit serializer will keep the compatibility with GenericTypeInformation + // and allow to disableGenericTypes for users + TypeSerializer[] fieldSerializers = + new TypeSerializer[] { + TypeInformation.of(StreamShardMetadata.class).createSerializer(executionConfig), + new KryoSerializer<>(SequenceNumber.class, executionConfig) + }; + @SuppressWarnings("unchecked") + Class> tupleClass = + (Class>) (Class) Tuple2.class; + return new TupleSerializer<>(tupleClass, fieldSerializers); + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 364a412c5d25c..ebbd827bac3e9 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -26,8 +26,11 @@ import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.mock.Whitebox; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -54,10 +57,13 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil; import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter; import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.CollectingSourceContext; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; import com.amazonaws.services.kinesis.model.HashKeyRange; @@ -160,9 +166,7 @@ public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exce FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); - RuntimeContext context = mock(RuntimeContext.class); - when(context.getIndexOfThisSubtask()).thenReturn(0); - when(context.getNumberOfParallelSubtasks()).thenReturn(2); + RuntimeContext context = new MockStreamingRuntimeContext(true, 2, 0); consumer.setRuntimeContext(context); OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); @@ -280,8 +284,7 @@ public void testListStateChangedAfterSnapshotState() throws Exception { new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); FlinkKinesisConsumer mockedConsumer = spy(consumer); - RuntimeContext context = mock(RuntimeContext.class); - when(context.getIndexOfThisSubtask()).thenReturn(1); + RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 1); mockedConsumer.setRuntimeContext(context); 
mockedConsumer.initializeState(initializationContext); @@ -309,6 +312,52 @@ public void testListStateChangedAfterSnapshotState() throws Exception { } } + /** + * Before using an explicit TypeSerializer for the state the {@link FlinkKinesisConsumer} was + * creating a serializer implicitly using a {@link TypeInformation}. After fixing issue + * FLINK-24943, * serializer is created explicitly. Here, we verify that previous approach is + * compatible with the new one. + */ + @Test + public void testExplicitStateSerializerCompatibility() throws Exception { + ExecutionConfig executionConfig = new ExecutionConfig(); + + Tuple2 tuple = + new Tuple2<>( + KinesisDataFetcher.convertToStreamShardMetadata( + new StreamShardHandle( + "fakeStream", + new Shard() + .withShardId( + KinesisShardIdGenerator + .generateFromShardOrder(0)))), + new SequenceNumber("1")); + + // This is how serializer was created implicitly using a TypeInformation + // and since SequenceNumber is GenericType, Flink falls back to Kryo + TypeInformation> originalShardsStateTypeInfo = + new TupleTypeInfo<>( + TypeInformation.of(StreamShardMetadata.class), + TypeInformation.of(SequenceNumber.class)); + TypeSerializer> serializerFromTypeInfo = + originalShardsStateTypeInfo.createSerializer(executionConfig); + byte[] bytes = InstantiationUtil.serializeToByteArray(serializerFromTypeInfo, tuple); + + // This is how we create serializer explicitly with Kryo + TupleSerializer> serializerFromKryo = + KinesisStateUtil.createShardsStateSerializer(executionConfig); + + Tuple2 actualTuple = + InstantiationUtil.deserializeFromByteArray(serializerFromKryo, bytes); + + // Both ways should be the same + assertThat(tuple) + .overridingErrorMessage( + "Explicit serializer is not compatible with " + + "implicit method of creating serializer using TypeInformation.") + .isEqualTo(actualTuple); + } + // ---------------------------------------------------------------------- // Tests related to fetcher initialization // 
---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java index 5be5f320ae7b4..fc7aca5e27125 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java @@ -20,10 +20,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; - -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; import java.util.Properties; @@ -39,27 +36,9 @@ public TestableFlinkKinesisConsumer( final int indexOfThisConsumerSubtask) { super(fakeStream, new SimpleStringSchema(), fakeConfiguration); - this.mockedRuntimeCtx = Mockito.mock(RuntimeContext.class); - - Mockito.when(mockedRuntimeCtx.getNumberOfParallelSubtasks()) - .thenAnswer( - new Answer() { - @Override - public Integer answer(InvocationOnMock invocationOnMock) - throws Throwable { - return totalNumOfConsumerSubtasks; - } - }); - - Mockito.when(mockedRuntimeCtx.getIndexOfThisSubtask()) - .thenAnswer( - new Answer() { - @Override - public Integer answer(InvocationOnMock invocationOnMock) - throws Throwable { - return indexOfThisConsumerSubtask; - } - }); + this.mockedRuntimeCtx = + new MockStreamingRuntimeContext( + true, totalNumOfConsumerSubtasks, indexOfThisConsumerSubtask); } @Override