PipeConsensus: complete consensus procedure and pipe components with new thrift service #12355

Merged
merged 121 commits into from
May 31, 2024
Changes from 9 commits
d7906a7
docs: add some comment
Pengzna Apr 11, 2024
2aa6a3d
Merge branch 'master' into pipe-consensus
Pengzna Apr 11, 2024
6877a92
feat: initialize PipeConsensus thrift service
Pengzna Apr 16, 2024
39ab10a
feat: add PipeConsensus config
Pengzna Apr 16, 2024
0171bb4
feat: add PipeConsensus config
Pengzna Apr 16, 2024
ccd134c
revert comment
Pengzna Apr 16, 2024
c7fc9ab
feat: add receiver logic
Pengzna Apr 16, 2024
cc70e6b
feat: add connector logic
Pengzna Apr 16, 2024
2f91090
feat: add receiver max waiting time
Pengzna Apr 16, 2024
6541ac7
fix review
Pengzna Apr 17, 2024
b901cc7
fix review
Pengzna Apr 17, 2024
e8d183c
add comment and temporarily complete receiver
Pengzna Apr 18, 2024
bae4893
feat: add clientManager and handshake logic
Pengzna Apr 20, 2024
a6493b7
feat: construct pipeConsensus payload
Pengzna Apr 20, 2024
463085b
fix: client manager code style
Pengzna Apr 20, 2024
87561fa
fix: manage event.referenceCount as review and add retry logic
Pengzna Apr 20, 2024
c09f027
fix: transfer request
Pengzna Apr 21, 2024
13ce8e6
feat: sync connector
Pengzna Apr 21, 2024
dbf9c61
feat: add new RPC service and client manager
Pengzna Apr 22, 2024
73dc7c9
feat: initialize handler
Pengzna Apr 22, 2024
d26acfb
refactor: adjust new RPC service
Pengzna Apr 22, 2024
0461eac
Merge remote-tracking branch 'base/master' into pipe-consensus
Pengzna Apr 22, 2024
2fcc786
refactor: adopt tsFile dependency and remove RPC to new consensus int…
Pengzna Apr 22, 2024
8953c3c
fix: redesign batch interface
Pengzna Apr 23, 2024
c0bcf50
wip: build bug-free
Pengzna Apr 23, 2024
b3f8842
feat: support tablet batch
Pengzna Apr 24, 2024
b020a46
feat: async connector handler
Pengzna Apr 24, 2024
93e3877
feat: complete async connector
Pengzna Apr 24, 2024
378c2b2
fix: tsFile pieces transfer
Pengzna Apr 25, 2024
90ffd12
fix: remove threadlocal in receiver
Pengzna Apr 26, 2024
32f95b5
fix: remove unnecessary handshake between RPC client and server
Pengzna Apr 26, 2024
95ba9ee
feat: integration mock connector with pipe
Pengzna May 7, 2024
fadaec4
refactor: add config dir
Pengzna May 7, 2024
35e32e7
fix: make RPC interface in iotdb-consensus and impl it in iotdb-core
Pengzna May 7, 2024
0ed9cba
test: add ut and fix some bugs
Pengzna May 7, 2024
c9d7d6f
test: fix ut fail
Pengzna May 7, 2024
5bc3a6a
finish pipe consensus
yschengzi May 11, 2024
2274c18
finish progress index
yschengzi May 14, 2024
c8519bb
finish procedure
yschengzi May 14, 2024
1623ab4
Merge branch 'pr/12355' into IOTDB-6321
yschengzi May 14, 2024
ac33113
working on merge
yschengzi May 15, 2024
a85ba1d
Merge pull request #1 from yschengzi/IOTDB-6321
Pengzna May 15, 2024
da0a0f1
internal thrift finish
yschengzi May 20, 2024
cc0594f
dependency revert and init for RPC
Pengzna May 20, 2024
9bedc4b
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 20, 2024
d22251a
add ConsensusGroupId to pipeConsensus components
Pengzna May 20, 2024
4fa8dcb
add ConsensusGroupId to pipeConsensus connector
Pengzna May 20, 2024
1201491
finish pipe consensus thrift interface
yschengzi May 20, 2024
ec624ec
complete receive
Pengzna May 20, 2024
7a26e95
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 20, 2024
a103e29
complete tsfile load framework
Pengzna May 20, 2024
3201dd1
Merge remote-tracking branch 'apache/master' into IOTDB-6321
yschengzi May 20, 2024
b058dc0
add exit logic for receiver
Pengzna May 20, 2024
3a7da12
fix
Pengzna May 20, 2024
b36a70e
finish merge master
yschengzi May 20, 2024
4acd059
fix
Pengzna May 20, 2024
d65abe7
Merge branch 'pipe-consensus' of https://github.com/Pengzna/iotdb int…
yschengzi May 20, 2024
f31aaa1
fix receiver start error
Pengzna May 20, 2024
f105e3f
optimize: asynchronizedly load tsFilePiece
Pengzna May 21, 2024
8f0da5a
precheck for read-only; null impl; inactive impl, etc.
Pengzna May 21, 2024
0adf91a
finish progress index
yschengzi May 21, 2024
f4d5982
Merge branch 'pipe-consensus' of https://github.com/Pengzna/iotdb int…
yschengzi May 21, 2024
2854d36
fix pipe_receiver_file_dirs path error in .bat and add consensus file…
Pengzna May 21, 2024
3c3252d
add consensus receiver file dirs config
Pengzna May 21, 2024
64ae178
remove useless todo
Pengzna May 21, 2024
5414ac9
fix review
Pengzna May 21, 2024
c400327
finish progress index for loading
yschengzi May 21, 2024
5364278
finish pipe consensus processor
yschengzi May 21, 2024
3b1285a
license
yschengzi May 21, 2024
0b2fb35
Merge branch 'pipe-consensus' of https://github.com/Pengzna/iotdb int…
yschengzi May 21, 2024
5764edb
Merge remote-tracking branch 'apache/master' into IOTDB-6321
yschengzi May 21, 2024
4683940
fix tsfile resouroce
yschengzi May 22, 2024
c90c54e
add dataNode route for receiver
Pengzna May 23, 2024
4c9a2cb
add replicate test
Pengzna May 23, 2024
e9e4037
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 23, 2024
82d71c3
sync connector with mainstream pipe
Pengzna May 23, 2024
642318b
fix review
Pengzna May 23, 2024
61c2e6c
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 23, 2024
5c9a3d6
fix review and give each receiver separate base file dirs
Pengzna May 23, 2024
d4b63ee
expose streamConsensus and batchConsensus to user
Pengzna May 24, 2024
d80650e
fix starting bug found in test
Pengzna May 24, 2024
0ce025b
remove fsync when transfer TsFile
Pengzna May 24, 2024
c919efc
add test
Pengzna May 24, 2024
b6a0dd0
add timeout
Pengzna May 24, 2024
c240abe
fix executor
Pengzna May 26, 2024
f92ceab
fix rebootTime
Pengzna May 26, 2024
c1d5c9d
fix validation
Pengzna May 26, 2024
927d6f1
fix create pipe
Pengzna May 26, 2024
4b18454
license
Pengzna May 26, 2024
55bfc27
connector parallel task set to 1 and add logs
Pengzna May 26, 2024
7b5f3d3
use pipe Consensus Processor
Pengzna May 26, 2024
4e5db8d
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 26, 2024
c54896c
remove useless and fix pom
Pengzna May 26, 2024
3d050c0
delete ut temporarily
Pengzna May 26, 2024
e0e3c2b
fix review
Pengzna May 26, 2024
18aaf8a
remove unnecessary clean path
Pengzna May 26, 2024
8448fa1
First fix
Caideyipi May 28, 2024
d4d7348
Restore shells
Caideyipi May 28, 2024
f168839
delete raw
Caideyipi May 28, 2024
1ef6c77
Added IT
Caideyipi May 28, 2024
8cbf3f5
Change IT
Caideyipi May 28, 2024
85b9e00
fix
Pengzna May 29, 2024
15f29fe
fix review
Pengzna May 29, 2024
54d3d8a
Merge pull request #2 from Caideyipi/consensus-apply-comment
Pengzna May 29, 2024
80bc979
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 29, 2024
821395d
fix all review
Pengzna May 29, 2024
c130490
Merge branch 'master' of https://github.com/apache/iotdb into pr/12355
SteveYurongSu May 29, 2024
7ab7798
revert IT and fix review
Pengzna May 29, 2024
41c7794
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 29, 2024
a7f0b22
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 29, 2024
3295635
fix: build success
Pengzna May 29, 2024
47bb893
fix review of ysc's part
Pengzna May 30, 2024
514d124
fix assignProgressIndexForTsFileRecovery
Pengzna May 30, 2024
7341cbc
fix dependency
Pengzna May 30, 2024
3592e77
fix dependency
Pengzna May 30, 2024
2982404
fix dependency
Pengzna May 30, 2024
465da4d
fix review
Pengzna May 30, 2024
5ce827b
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 30, 2024
2c42b21
fix merge conflict
Pengzna May 31, 2024
d7362e3
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 31, 2024
c22ea49
fix review and delete useless exit func
Pengzna May 31, 2024
@@ -252,6 +252,10 @@ public enum TSStatusCode {
DROP_CONSUMER_ERROR(2101),
ALTER_CONSUMER_ERROR(2102),
CONSUMER_PUSH_META_ERROR(2103),

// Pipe Consensus
PIPE_CONSENSUS_CONNECTOR_RESTART_ERROR(2200),
PIPE_CONSENSUS_VERSION_ERROR(2201),
;

private final int statusCode;
@@ -23,6 +23,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.receiver.protocol.airgap.IoTDBAirGapReceiverAgent;
import org.apache.iotdb.db.pipe.receiver.protocol.legacy.IoTDBLegacyPipeReceiverAgent;
import org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus.PipeConsensusReceiverAgent;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;

import org.slf4j.Logger;
@@ -39,11 +40,13 @@ public class PipeDataNodeReceiverAgent {
private final IoTDBDataNodeReceiverAgent thriftAgent;
private final IoTDBAirGapReceiverAgent airGapAgent;
private final IoTDBLegacyPipeReceiverAgent legacyAgent;
private final PipeConsensusReceiverAgent pipeConsensusAgent;

public PipeDataNodeReceiverAgent() {
thriftAgent = new IoTDBDataNodeReceiverAgent();
airGapAgent = new IoTDBAirGapReceiverAgent();
legacyAgent = new IoTDBLegacyPipeReceiverAgent();
pipeConsensusAgent = new PipeConsensusReceiverAgent();
}

public IoTDBDataNodeReceiverAgent thrift() {
@@ -58,6 +61,10 @@ public IoTDBLegacyPipeReceiverAgent legacy() {
return legacyAgent;
}

public PipeConsensusReceiverAgent pipeConsensus() {
return pipeConsensusAgent;
}

public void cleanPipeReceiverDirs() {
String[] pipeReceiverFileDirs =
IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDirs();
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus;

import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class PipeConsensusAsyncConnector extends IoTDBDataRegionAsyncConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusAsyncConnector.class);

private static final String ENQUEUE_EXCEPTION_MSG =
"Timeout: PipeConsensusConnector offers an event into transferBuffer failed, because transferBuffer is full";

private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();

private final BlockingQueue<Event> transferBuffer =
new LinkedBlockingDeque<>(COMMON_CONFIG.getPipeConsensusEventBufferSize());

/** Add an event to transferBuffer, whose events will be asynchronously transferred to the receiver. */
private boolean addEvent2Buffer(Event event) {
try {
LOGGER.debug(
"PipeConsensus connector: one event enqueue, queue size = {}, limit size = {}",
transferBuffer.size(),
COMMON_CONFIG.getPipeConsensusEventBufferSize());
return transferBuffer.offer(
event, COMMON_CONFIG.getPipeConsensusEventEnqueueTimeoutInMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.info("PipeConsensusConnector transferBuffer queue offer is interrupted.", e);
Thread.currentThread().interrupt();
return false;
}
}

/**
* Once the receiver in PipeConsensus has successfully processed an event, remove that event from
* transferBuffer to make room for other events.
*/
public void removeEventFromBuffer(Event event) {
synchronized (this) {
LOGGER.debug(
"PipeConsensus connector: one event removed from queue, queue size = {}, limit size = {}",
transferBuffer.size(),
COMMON_CONFIG.getPipeConsensusEventBufferSize());
// Remove only the matching event; do nothing if the buffer is empty or the event is absent.
Iterator<Event> iterator = transferBuffer.iterator();
while (iterator.hasNext()) {
if (iterator.next().equals(event)) {
iterator.remove();
break;
}
}
}
}

@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
boolean enqueueResult = addEvent2Buffer(tabletInsertionEvent);
if (!enqueueResult) {
throw new PipeException(ENQUEUE_EXCEPTION_MSG);
}
// TODO: rework the request to carry commitId and rebootTimes
// TODO: rework the handler: optimize onComplete and add dequeue logic to onComplete
super.transfer(tabletInsertionEvent);
}

@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
boolean enqueueResult = addEvent2Buffer(tsFileInsertionEvent);
if (!enqueueResult) {
throw new PipeException(ENQUEUE_EXCEPTION_MSG);
}
super.transfer(tsFileInsertionEvent);
}

@Override
public void transfer(Event event) throws Exception {
boolean enqueueResult = addEvent2Buffer(event);
if (!enqueueResult) {
throw new PipeException(ENQUEUE_EXCEPTION_MSG);
}
super.transfer(event);
}
}
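The bounded-buffer behaviour of the connector above can be sketched in isolation. The following is a minimal sketch, not the PR's implementation: the class `BoundedEventBufferSketch` and its methods are hypothetical names, and it assumes the enqueue timeout is meant to be in milliseconds, as the config getter name suggests. It shows a timed `offer` that reports failure when the buffer stays full, and a removal that leaves the buffer untouched when the event is absent.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the connector's transferBuffer; not part of IoTDB. */
final class BoundedEventBufferSketch<E> {
  private final BlockingQueue<E> buffer;
  private final long enqueueTimeoutMs;

  BoundedEventBufferSketch(int capacity, long enqueueTimeoutMs) {
    this.buffer = new LinkedBlockingDeque<>(capacity);
    this.enqueueTimeoutMs = enqueueTimeoutMs;
  }

  /** Returns false if the buffer stays full for the whole timeout or the thread is interrupted. */
  boolean add(E event) {
    try {
      return buffer.offer(event, enqueueTimeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Removes the first occurrence of the event; returns false if it is not buffered. */
  boolean remove(E event) {
    return buffer.remove(event);
  }

  int size() {
    return buffer.size();
  }
}
```

With a capacity of 2, a third `add` returns false after the timeout until one of the buffered events is removed, which is the back-pressure the connector relies on before throwing its `PipeException`.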
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.mpp.rpc.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeConsensusTransferResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PipeConsensusReceiver extends IoTDBDataNodeReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusReceiver.class);
private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
private final RequestExecutor requestExecutor = new RequestExecutor();

/**
* This method must not be synchronized. Requests may be received concurrently because they are
* buffered in reqBuffer, but loading events must be serialized.
*/
public TPipeConsensusTransferResp receive(final TPipeConsensusTransferReq req) {
return requestExecutor.onRequest(req);
}

private TPipeConsensusTransferResp loadEvent(final TPipeConsensusTransferReq req) {
// synchronized load event
// TODO: use DataRegionStateMachine to impl it.
return null;
}

/**
* An executor component that ensures all events sent from the connector are loaded in sequence,
* even though they may arrive at the receiver in arbitrary order.
*/
private class RequestExecutor {
// A min heap that buffers transfer request, whose length is not larger than
// PIPE_CONSENSUS_EVENT_BUFFER_SIZE
private final PriorityQueue<WrappedRequest> reqBuffer;
private final Lock lock;
private final Condition condition;
private int onSyncedCommitIndex = -1;
private int connectorRebootTimes = 0;

public RequestExecutor() {
reqBuffer =
new PriorityQueue<>(
COMMON_CONFIG.getPipeConsensusEventBufferSize(),
Comparator.comparingInt(WrappedRequest::getRebootTime)
.thenComparingInt(WrappedRequest::getCommitIndex));
lock = new ReentrantLock();
condition = lock.newCondition();
}

private TPipeConsensusTransferResp onRequest(final TPipeConsensusTransferReq req) {
lock.lock();
WrappedRequest wrappedReq = new WrappedRequest(req);
try {
reqBuffer.offer(wrappedReq);
// Check whether the connector has rebooted. If the request's rebootTimes is greater than
// connectorRebootTimes, the connector has restarted and the receiver must reset itself.
if (wrappedReq.getRebootTime() > connectorRebootTimes) {
reset(wrappedReq.getRebootTime());
final TSStatus status =
new TSStatus(
RpcUtils.getStatus(
TSStatusCode.PIPE_CONSENSUS_CONNECTOR_RESTART_ERROR,
"PipeConsensus receiver identified the restart of connector, thus reset itself and reject event load temporarily"));
return new TPipeConsensusTransferResp(status);
}

// Polling to process
while (true) {
if (wrappedReq.equals(reqBuffer.peek())
&& wrappedReq.getCommitIndex() == onSyncedCommitIndex + 1) {
// If the current req is the next one to be processed, load this event through
// DataRegionStateMachine.
TPipeConsensusTransferResp resp = loadEvent(req);
reqBuffer.remove();
onSyncedCommitIndex++;
return resp;
}

if (reqBuffer.size() >= COMMON_CONFIG.getPipeConsensusEventBufferSize()) {
// If the reqBuffer is full and its head is the current thread's request, load this event.
if (wrappedReq.equals(reqBuffer.peek())) {
TPipeConsensusTransferResp resp = loadEvent(req);
reqBuffer.remove();
onSyncedCommitIndex = wrappedReq.getCommitIndex();
return resp;
} else {
// If reqBuffer is full and the current thread's request is not at its head, this req
// is not supposed to be processed yet, so the current thread should notify the waiting
// threads to process the head.
condition.signalAll();
}
} else {
// If the req is not supposed to be processed yet and reqBuffer is not full, the current
// thread should wait until reqBuffer is full, which indicates that the receiver has
// received all the requests from the connector without duplication or loss.
try {
condition.await(
COMMON_CONFIG.getPipeConsensusEventEnqueueTimeoutInMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn(
"Current waiting is interrupted. commitIndex: {}. Exception: ",
wrappedReq.getCommitIndex(),
e);
Thread.currentThread().interrupt();
}
}
}
} finally {
lock.unlock();
}
}

/**
* Reset all data to initial status and set connectorRebootTimes properly. This method is called
* when receiver identifies connector has rebooted.
*/
private void reset(int connectorRebootTimes) {
this.reqBuffer.clear();
this.onSyncedCommitIndex = -1;
this.connectorRebootTimes = connectorRebootTimes;
}
}

/**
* Wraps TPipeConsensusTransferReq for RequestExecutor.reqBuffer to save memory. We do not need
* to hold a reference to the TPipeConsensusTransferReq here, because RequestExecutor.reqBuffer
* only needs its rebootTimes and commitIndex.
*/
private static class WrappedRequest {
final int rebootTime;
final int commitIndex;

public WrappedRequest(TPipeConsensusTransferReq req) {
this.rebootTime = req.rebootTimes;
this.commitIndex = req.commitIndex;
}

public int getRebootTime() {
return rebootTime;
}

public int getCommitIndex() {
return commitIndex;
}
}
}
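The ordering idea behind RequestExecutor (a min-heap of pending requests guarded by a lock and a condition) can be illustrated with a much smaller sketch. This is a simplified illustration under stated assumptions, not the receiver's actual logic: `InOrderApplierSketch` and its methods are hypothetical names, commit indexes are assumed to be unique and gap-free starting at 0, and the buffer-full fallback and connector-reboot detection from the diff above are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: applies requests in commit-index order regardless of arrival order. */
final class InOrderApplierSketch {
  // Min-heap of commit indexes that have arrived but have not been applied yet.
  private final PriorityQueue<Integer> pending = new PriorityQueue<>();
  private final Lock lock = new ReentrantLock();
  private final Condition turnChanged = lock.newCondition();
  private final List<Integer> applied = new ArrayList<>();
  private int lastApplied = -1;

  /** Blocks until every smaller commit index has been applied, then applies this one. */
  void onRequest(int commitIndex) throws InterruptedException {
    lock.lock();
    try {
      pending.offer(commitIndex);
      try {
        // Wait until this request is the heap head and directly follows the last applied index.
        while (pending.peek() != commitIndex || commitIndex != lastApplied + 1) {
          turnChanged.await();
        }
      } catch (InterruptedException e) {
        // Withdraw the request so an interrupted waiter does not block the others forever.
        pending.remove(Integer.valueOf(commitIndex));
        turnChanged.signalAll();
        throw e;
      }
      pending.poll();
      applied.add(commitIndex); // stand-in for loading the event
      lastApplied = commitIndex;
      turnChanged.signalAll(); // wake waiters so the next index can proceed
    } finally {
      lock.unlock();
    }
  }

  /** Returns a snapshot of the order in which requests were applied. */
  List<Integer> appliedOrder() {
    lock.lock();
    try {
      return new ArrayList<>(applied);
    } finally {
      lock.unlock();
    }
  }
}
```

If three threads submit indexes 2, 0 and 1 in that arrival order, `appliedOrder()` ends up as 0, 1, 2, because each caller waits on the condition until its own index is next. Unlike the diff above, this sketch waits indefinitely for a missing index, so a production version would need a timeout or the buffer-full rule the PR uses.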