From 80dfc515a1ec26c3a1d46aa6cbc7264d0ff3a8c9 Mon Sep 17 00:00:00 2001
From: Kunni <decq12ybhl@gmail.com>
Date: Thu, 9 Nov 2023 16:34:17 +0800
Subject: [PATCH] [3.0][cdc-runtime] add DataSinkWriterOperator to process
 Event (#2649)

---
 .../operators/schema/event/FlushEvent.java    |  42 +++++++
 .../schema/event/FlushSuccessEvent.java       |  18 +++
 .../schema/event/SinkWriterRegisterEvent.java |  11 ++
 .../operators/sink/SchemaEvolutionClient.java |  60 +++++++++
 .../sink/DataSinkWriterOperator.java          | 116 ++++++++++++++++++
 5 files changed, 247 insertions(+)
 create mode 100644 flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushEvent.java
 create mode 100644 flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/SchemaEvolutionClient.java
 create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/DataSinkWriterOperator.java

diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushEvent.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushEvent.java
new file mode 100644
index 00000000000..fdbd579de6f
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.runtime.operators.schema.event;
+
+import org.apache.flink.streaming.runtime.operators.sink.DataSinkWriterOperator;
+
+import com.ververica.cdc.common.event.Event;
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
+
+/**
+ * An {@link Event} from {@link SchemaOperator} to notify {@link DataSinkWriterOperator} that it
+ * should start flushing.
+ */
+public class FlushEvent implements Event {
+
+    private static final long serialVersionUID = 1L;
+
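+    /** The table whose buffered data should be flushed by the sink writer. */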
+    private final TableId tableId;
+
+    public FlushEvent(TableId tableId) {
+        this.tableId = tableId;
+    }
+
+    public TableId getTableId() {
+        return tableId;
+    }
+}
diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushSuccessEvent.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushSuccessEvent.java
index 87c7bfa2574..e5d09fc0938 100644
--- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushSuccessEvent.java
+++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushSuccessEvent.java
@@ -18,6 +18,7 @@
 
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
+import com.ververica.cdc.common.event.TableId;
 import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
 
 /**
@@ -26,4 +27,21 @@
  */
 public class FlushSuccessEvent implements OperatorEvent {
     private static final long serialVersionUID = 1L;
+
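+    /** The subtask index of the sink writer that finished flushing. */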
+    private final int subtask;
+
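+    /** The table whose pending data has been flushed. */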
+    private final TableId tableId;
+
+    public FlushSuccessEvent(int subtask, TableId tableId) {
+        this.subtask = subtask;
+        this.tableId = tableId;
+    }
+
+    public int getSubtask() {
+        return subtask;
+    }
+
+    public TableId getTableId() {
+        return tableId;
+    }
 }
diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SinkWriterRegisterEvent.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SinkWriterRegisterEvent.java
index 596531b5c68..7e60081bf40 100644
--- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SinkWriterRegisterEvent.java
+++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SinkWriterRegisterEvent.java
@@ -22,5 +22,16 @@
 
 /** An {@link OperatorEvent} that registers a sink writer to {@link SchemaOperatorCoordinator}. */
 public class SinkWriterRegisterEvent implements OperatorEvent {
+
     private static final long serialVersionUID = 1L;
+
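+    /** The subtask index of the registering sink writer. */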
+    private final int subtask;
+
+    public SinkWriterRegisterEvent(int subtask) {
+        this.subtask = subtask;
+    }
+
+    public int getSubtask() {
+        return subtask;
+    }
 }
diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/SchemaEvolutionClient.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/SchemaEvolutionClient.java
new file mode 100644
index 00000000000..b4d13f6691b
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/SchemaEvolutionClient.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.runtime.operators.sink;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.streaming.runtime.operators.sink.DataSinkWriterOperator;
+import org.apache.flink.util.SerializedValue;
+
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
+import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
+import com.ververica.cdc.runtime.operators.schema.event.FlushSuccessEvent;
+import com.ververica.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
+
+import java.io.IOException;
+
+/**
+ * Client for {@link DataSinkWriterOperator} to interact with {@link SchemaOperatorCoordinator}
+ * when table schema evolution happens.
+ */
+public class SchemaEvolutionClient {
+
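+    /** Gateway for sending operator events to the {@link SchemaOperatorCoordinator}. */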
+    private final TaskOperatorEventGateway toCoordinator;
+
+    /**
+     * The deterministic {@link OperatorID} of the {@link SchemaOperator}, used to address its
+     * coordinator.
+     */
+    private final OperatorID schemaOperatorID;
+
+    public SchemaEvolutionClient(
+            TaskOperatorEventGateway toCoordinator, OperatorID schemaOperatorID) {
+        this.toCoordinator = toCoordinator;
+        this.schemaOperatorID = schemaOperatorID;
+    }
+
+    /** Sends a {@link SinkWriterRegisterEvent} to the {@link SchemaOperatorCoordinator}. */
+    public void registerSubtask(int subtask) throws IOException {
+        toCoordinator.sendOperatorEventToCoordinator(
+                schemaOperatorID, new SerializedValue<>(new SinkWriterRegisterEvent(subtask)));
+    }
+
+    /** Sends a {@link FlushSuccessEvent} to the {@link SchemaOperatorCoordinator}. */
+    public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException {
+        toCoordinator.sendOperatorEventToCoordinator(
+                schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));
+    }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/DataSinkWriterOperator.java
new file mode 100644
index 00000000000..157a5ac32fe
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/DataSinkWriterOperator.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import com.ververica.cdc.common.event.Event;
+import com.ververica.cdc.runtime.operators.schema.event.FlushEvent;
+import com.ververica.cdc.runtime.operators.sink.SchemaEvolutionClient;
+
+import java.lang.reflect.Field;
+
+/**
+ * An operator that processes records to be written into a {@link
+ * org.apache.flink.api.connector.sink2.Sink}. It also has a way to process committables with the
+ * same parallelism or send them downstream to a {@link CommitterOperator} with a different
+ * parallelism.
+ *
+ * <p>The operator is always the first operator of a sink pipeline. In addition to the behavior
+ * inherited from {@link SinkWriterOperator}, it handles {@link FlushEvent}s by flushing the
+ * wrapped {@link SinkWriter} and reporting the successful flush back to the schema operator's
+ * coordinator.
+ *
+ * @param <CommT> the type of the committable (to send to downstream operators)
+ */
+public class DataSinkWriterOperator<CommT> extends SinkWriterOperator<Event, CommT> {
+
+    private SchemaEvolutionClient schemaEvolutionClient;
+
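+    /**
+     * The {@link SinkWriter} created by the superclass. The superclass keeps it in a private
+     * field, so a reference is obtained via reflection in {@link #open()}.
+     */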
+    private SinkWriter<Event> copySinkWriter;
+
+    private final OperatorID schemaOperatorID;
+
+    public DataSinkWriterOperator(
+            Sink<Event> sink,
+            ProcessingTimeService processingTimeService,
+            MailboxExecutor mailboxExecutor,
+            OperatorID schemaOperatorID) {
+        super(sink, processingTimeService, mailboxExecutor);
+        this.schemaOperatorID = schemaOperatorID;
+    }
+
+    @Override
+    public void setup(StreamTask containingTask, StreamConfig config, Output output) {
+        super.setup(containingTask, config, output);
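+        // Build the client that talks to the SchemaOperator's coordinator through the task's
+        // operator event gateway.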
+        schemaEvolutionClient =
+                new SchemaEvolutionClient(
+                        containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
+                        schemaOperatorID);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
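+        // The SinkWriter is kept in a private field of the superclass; grab a reference via
+        // reflection so it can be flushed when a FlushEvent arrives.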
+        copySinkWriter = (SinkWriter) getFieldValue("sinkWriter");
+    }
+
+    /**
+     * Finds a field by name on this object and returns its current value, searching superclasses
+     * if the field is not declared directly on this class.
+     *
+     * @param fieldName the name of the field to find.
+     * @return the value of the field, or {@code null} if no such field exists.
+     */
+    private Object getFieldValue(String fieldName) throws IllegalAccessException {
+        Class<?> clazz = this.getClass();
+        while (clazz != null) {
+            try {
+                Field field = clazz.getDeclaredField(fieldName);
+                field.setAccessible(true);
+                return field.get(this);
+            } catch (NoSuchFieldException e) {
+                clazz = clazz.getSuperclass();
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
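+        // Register this sink writer subtask with the SchemaOperator's coordinator.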
+        schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
+    }
+
+    @Override
+    public void processElement(StreamRecord<Event> element) throws Exception {
+        Event event = element.getValue();
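+        // A FlushEvent asks this writer to flush pending data and report success back to the
+        // coordinator; all other events go through the regular sink writer path.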
+        if (event instanceof FlushEvent) {
+            copySinkWriter.flush(false);
+            schemaEvolutionClient.notifyFlushSuccess(
+                    getRuntimeContext().getIndexOfThisSubtask(), ((FlushEvent) event).getTableId());
+        } else {
+            super.processElement(element);
+        }
+    }
+}