Skip to content

Commit

Permalink
[3.0][cdc-runtime] add DataSinkWriterOperator to process Event (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
lvyanquan authored Nov 9, 2023
1 parent c52ccfa commit 80dfc51
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.runtime.operators.schema.event;

import org.apache.flink.streaming.runtime.operators.sink.DataSinkWriterOperator;

import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;

/**
 * An {@link Event} sent from {@link SchemaOperator} to notify {@link DataSinkWriterOperator} that
 * it should start flushing.
 *
 * <p>Two flush events are considered equal when they carry equal {@link TableId}s.
 */
public class FlushEvent implements Event {

    private static final long serialVersionUID = 1L;

    /** The table this flush request refers to, exactly as passed to the constructor. */
    private final TableId tableId;

    /**
     * Creates a flush request for the given table.
     *
     * @param tableId identifier of the table the flush request refers to
     */
    public FlushEvent(TableId tableId) {
        this.tableId = tableId;
    }

    /** @return the table identifier this event was created with */
    public TableId getTableId() {
        return tableId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FlushEvent that = (FlushEvent) o;
        // Null-safe comparison: the constructor does not reject a null tableId.
        return java.util.Objects.equals(tableId, that.tableId);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hashCode(tableId);
    }

    @Override
    public String toString() {
        return "FlushEvent{tableId=" + tableId + '}';
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.flink.runtime.operators.coordination.OperatorEvent;

import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;

/**
Expand All @@ -26,4 +27,21 @@
*/
public class FlushSuccessEvent implements OperatorEvent {
    private static final long serialVersionUID = 1L;

    /** Subtask index supplied by the sender of this event. */
    private final int subtask;

    /** The table the reported flush refers to, exactly as passed to the constructor. */
    private final TableId tableId;

    /**
     * Creates a flush-success notification.
     *
     * @param subtask index of the subtask reporting the flush
     * @param tableId identifier of the table the flush refers to
     */
    public FlushSuccessEvent(int subtask, TableId tableId) {
        this.subtask = subtask;
        this.tableId = tableId;
    }

    /** @return the subtask index this event was created with */
    public int getSubtask() {
        return subtask;
    }

    /** @return the table identifier this event was created with */
    public TableId getTableId() {
        return tableId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FlushSuccessEvent that = (FlushSuccessEvent) o;
        // Null-safe comparison: the constructor does not reject a null tableId.
        return subtask == that.subtask && java.util.Objects.equals(tableId, that.tableId);
    }

    @Override
    public int hashCode() {
        return 31 * Integer.hashCode(subtask) + java.util.Objects.hashCode(tableId);
    }

    @Override
    public String toString() {
        return "FlushSuccessEvent{subtask=" + subtask + ", tableId=" + tableId + '}';
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,16 @@

/**
 * An {@link OperatorEvent} that registers a sink writer subtask with the {@link
 * SchemaOperatorCoordinator}.
 *
 * <p>Two register events are considered equal when they carry the same subtask index.
 */
public class SinkWriterRegisterEvent implements OperatorEvent {

    private static final long serialVersionUID = 1L;

    /** Subtask index supplied by the sender of this event. */
    private final int subtask;

    /**
     * Creates a registration event.
     *
     * @param subtask index of the sink writer subtask being registered
     */
    public SinkWriterRegisterEvent(int subtask) {
        this.subtask = subtask;
    }

    /** @return the subtask index this event was created with */
    public int getSubtask() {
        return subtask;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        return subtask == ((SinkWriterRegisterEvent) o).subtask;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(subtask);
    }

    @Override
    public String toString() {
        return "SinkWriterRegisterEvent{subtask=" + subtask + '}';
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.runtime.operators.sink;

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.streaming.runtime.operators.sink.DataSinkWriterOperator;
import org.apache.flink.util.SerializedValue;

import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
import com.ververica.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import com.ververica.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;

import java.io.IOException;

/**
 * Client through which a {@link DataSinkWriterOperator} interacts with the {@link
 * SchemaOperatorCoordinator} when a table schema evolution happens.
 */
public class SchemaEvolutionClient {

    /** Gateway used to deliver operator events to the coordinator. */
    private final TaskOperatorEventGateway toCoordinator;

    /** The determinant {@link OperatorID} of the {@link SchemaOperator} being addressed. */
    private final OperatorID schemaOperatorID;

    /**
     * Creates a client bound to one coordinator.
     *
     * @param toCoordinator gateway used to send events to the coordinator
     * @param schemaOperatorID operator ID under which the events are sent
     */
    public SchemaEvolutionClient(
            TaskOperatorEventGateway toCoordinator, OperatorID schemaOperatorID) {
        this.toCoordinator = toCoordinator;
        this.schemaOperatorID = schemaOperatorID;
    }

    /**
     * Sends a {@link SinkWriterRegisterEvent} for the given subtask to the {@link
     * SchemaOperatorCoordinator}.
     *
     * @param subtask index of the subtask to register
     * @throws IOException if the event cannot be wrapped into a {@link SerializedValue}
     */
    public void registerSubtask(int subtask) throws IOException {
        SinkWriterRegisterEvent registration = new SinkWriterRegisterEvent(subtask);
        toCoordinator.sendOperatorEventToCoordinator(
                schemaOperatorID, new SerializedValue<>(registration));
    }

    /**
     * Sends a {@link FlushSuccessEvent} for the given subtask and table to the {@link
     * SchemaOperatorCoordinator}.
     *
     * @param subtask index of the subtask that finished flushing
     * @param tableId identifier of the table the flush refers to
     * @throws IOException if the event cannot be wrapped into a {@link SerializedValue}
     */
    public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException {
        FlushSuccessEvent flushed = new FlushSuccessEvent(subtask, tableId);
        toCoordinator.sendOperatorEventToCoordinator(
                schemaOperatorID, new SerializedValue<>(flushed));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.runtime.operators.schema.event.FlushEvent;
import com.ververica.cdc.runtime.operators.sink.SchemaEvolutionClient;

import java.lang.reflect.Field;

/**
 * A {@link SinkWriterOperator} for {@link Event} records that additionally reacts to {@link
 * FlushEvent}s.
 *
 * <p>When a {@link FlushEvent} arrives, the operator flushes the sink writer of the parent
 * operator and then reports the flush through a {@link SchemaEvolutionClient}. Every other event is
 * handed to the parent operator unchanged.
 *
 * <p>The operator is always part of a sink pipeline and is the first operator.
 *
 * @param <CommT> the type of the committable (to send to downstream operators)
 */
public class DataSinkWriterOperator<CommT> extends SinkWriterOperator<Event, CommT> {

    /** Name of the parent operator's field that holds the sink writer; read via reflection. */
    private static final String SINK_WRITER_FIELD = "sinkWriter";

    /** Client used to talk to the schema operator's coordinator; created in {@link #setup}. */
    private SchemaEvolutionClient schemaEvolutionClient;

    /** Reference to the parent operator's sink writer; obtained reflectively in {@link #open}. */
    private SinkWriter<Event> copySinkWriter;

    /** Operator ID handed to the {@link SchemaEvolutionClient}. */
    private final OperatorID schemaOperatorID;

    /**
     * Creates the operator.
     *
     * @param sink sink passed through to the parent operator
     * @param processingTimeService processing time service passed through to the parent operator
     * @param mailboxExecutor mailbox executor passed through to the parent operator
     * @param schemaOperatorID operator ID under which coordinator events are sent
     */
    public DataSinkWriterOperator(
            Sink<Event> sink,
            ProcessingTimeService processingTimeService,
            MailboxExecutor mailboxExecutor,
            OperatorID schemaOperatorID) {
        super(sink, processingTimeService, mailboxExecutor);
        this.schemaOperatorID = schemaOperatorID;
    }

    // Raw parameter types are kept as in the original override; only the warnings are silenced.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void setup(StreamTask containingTask, StreamConfig config, Output output) {
        super.setup(containingTask, config, output);
        schemaEvolutionClient =
                new SchemaEvolutionClient(
                        containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
                        schemaOperatorID);
    }

    /**
     * Opens the parent operator and captures its sink writer so that it can be flushed on demand.
     *
     * @throws NoSuchFieldException if no {@code sinkWriter} field exists in the class hierarchy
     * @throws IllegalStateException if the {@code sinkWriter} field is still {@code null}
     */
    @Override
    public void open() throws Exception {
        super.open();
        @SuppressWarnings("unchecked")
        SinkWriter<Event> writer = (SinkWriter<Event>) getFieldValue(SINK_WRITER_FIELD);
        if (writer == null) {
            // Fail here with a clear message instead of with an NPE on the first FlushEvent.
            throw new IllegalStateException(
                    "Field '"
                            + SINK_WRITER_FIELD
                            + "' of "
                            + getClass().getName()
                            + " is null after open(); cannot handle flush events.");
        }
        copySinkWriter = writer;
    }

    /**
     * Reads the value of the named field on this instance, searching this class first and then
     * each superclass in turn.
     *
     * @param fieldName the name of the field to find.
     * @return the value currently stored in that field (may be {@code null}).
     * @throws NoSuchFieldException if neither this class nor any superclass declares the field.
     * @throws IllegalAccessException if the field cannot be read.
     */
    private Object getFieldValue(String fieldName)
            throws IllegalAccessException, NoSuchFieldException {
        Class<?> clazz = this.getClass();
        while (clazz != null) {
            try {
                Field field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(this);
            } catch (NoSuchFieldException e) {
                // Not declared at this level; continue with the superclass.
                clazz = clazz.getSuperclass();
            }
        }
        throw new NoSuchFieldException(
                "Field '"
                        + fieldName
                        + "' not found in "
                        + getClass().getName()
                        + " or any of its superclasses.");
    }

    /** Initializes the parent's state and then registers this subtask via the client. */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
    }

    /**
     * Handles a {@link FlushEvent} by flushing the sink writer and reporting success; delegates
     * every other event to the parent operator.
     */
    @Override
    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = element.getValue();
        if (event instanceof FlushEvent) {
            // Flush first, so that success is only reported once flush() has returned normally.
            copySinkWriter.flush(false);
            schemaEvolutionClient.notifyFlushSuccess(
                    getRuntimeContext().getIndexOfThisSubtask(), ((FlushEvent) event).getTableId());
        } else {
            super.processElement(element);
        }
    }
}

0 comments on commit 80dfc51

Please sign in to comment.