Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]optimize : support multi-version(sdk-version) codec & fix not returning client registration failure msg #7000

Open
wants to merge 22 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,19 @@ public interface ProtocolConstants {
byte VERSION_0 = 0;

/**
* Protocol version
* Protocol version 1
*/
byte VERSION_1 = 1;

/**
* Protocol version 2
*/
byte VERSION_2 = 2;

/**
* Protocol version
*/
byte VERSION = VERSION_1;
byte VERSION = VERSION_2;

/**
* Max frame length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public RegisterRMResponse() {
public RegisterRMResponse(boolean result) {
super();
setIdentified(result);
setResultCode(result ? ResultCode.Success : ResultCode.Failed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public RegisterTMResponse() {
public RegisterTMResponse(boolean result) {
super();
setIdentified(result);
setResultCode(result ? ResultCode.Success : ResultCode.Failed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public class Version {
private static final String VERSION_0_7_1 = "0.7.1";
private static final String VERSION_1_5_0 = "1.5.0";
private static final String VERSION_2_3_0 = "2.3.0";

public static final String VERSION_0_7_0 = "0.7.0";

private static final int MAX_VERSION_DOT = 3;

/**
Expand Down Expand Up @@ -94,6 +97,10 @@ public static boolean isAboveOrEqualVersion230(String version) {
return isAboveOrEqualVersion(version, VERSION_2_3_0);
}

public static boolean isAboveOrEqualVersion071(String version) {
return isAboveOrEqualVersion(version, VERSION_0_7_1);
}

public static boolean isAboveOrEqualVersion(String clientVersion, String divideVersion) {
boolean isAboveOrEqualVersion = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0;
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
import org.apache.seata.core.rpc.netty.v2.ProtocolDecoderV2;
import org.apache.seata.core.rpc.netty.v2.ProtocolEncoderV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -83,12 +85,16 @@ int lengthFieldLength, FullLength is int(4B). so values is 4
int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0
*/
super(maxFrameLength, 3, 4, -7, 0);
this.protocolDecoderMap =
ImmutableMap.<Byte, ProtocolDecoder>builder().put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
.put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()).build();
this.protocolEncoderMap =
ImmutableMap.<Byte, ProtocolEncoder>builder().put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0())
.put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()).build();
this.protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder()
.put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
.put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1())
.put(ProtocolConstants.VERSION_2, new ProtocolDecoderV2())
.build();
this.protocolEncoderMap =ImmutableMap.<Byte, ProtocolEncoder>builder()
.put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0())
.put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1())
.put(ProtocolConstants.VERSION_2, new ProtocolEncoderV2())
.build();
this.channelHandlers = channelHandlers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public interface ProtocolDecoder {

RpcMessage decodeFrame(ByteBuf in);

byte protocolVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@
**/
public interface ProtocolEncoder {
void encode(RpcMessage rpcMessage, ByteBuf out);

byte protocolVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public RpcMessage decodeFrame(ByteBuf in) {
bs2[1] = (byte) (0x00FF & typeCode);
System.arraycopy(bs, 0, bs2, 2, length);
byte codecType = isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode();
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codecType), ProtocolConstants.VERSION_0);
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codecType), protocolVersion());
rpcMessage.setBody(serializer.deserialize(bs2));
} catch (Exception e) {
LOGGER.error("decode error", e);
Expand All @@ -151,4 +151,9 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
throw new DecodeException(exx);
}
}

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void encode(RpcMessage message, ByteBuf out) {
}

byte[] bodyBytes = null;
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codec), ProtocolConstants.VERSION_0);
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codec), protocolVersion());
bodyBytes = serializer.serialize(msg.getBody());

if (msg.isSeataCodec()) {
Expand Down Expand Up @@ -118,4 +118,9 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws
LOGGER.error("Encode request error!", e);
}
}

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,8 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
return decoded;
}

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void encode(RpcMessage message, ByteBuf out) {

byte messageType = rpcMessage.getMessageType();
out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
out.writeByte(ProtocolConstants.VERSION_1);
out.writeByte(protocolVersion());
// full Length(4B) and head length(2B) will fix in the end.
out.writerIndex(out.writerIndex() + 6);
out.writeByte(messageType);
Expand All @@ -94,7 +94,7 @@ public void encode(RpcMessage message, ByteBuf out) {
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
// heartbeat has no body
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), protocolVersion());
bodyBytes = serializer.serialize(rpcMessage.getBody());
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
bodyBytes = compressor.compress(bodyBytes);
Expand Down Expand Up @@ -134,4 +134,8 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws
}
}

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty.v2;

import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;

/**
* Decoder of protocol-v2
**/
public class ProtocolDecoderV2 extends ProtocolDecoderV1 {

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty.v2;

import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;

/**
* Encoder of protocol-v2
**/
public class ProtocolEncoderV2 extends ProtocolEncoderV1 {
@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("RM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
}
errorInfo = "RM checkAuth fail";
}
} catch (Exception exx) {
isSuccess = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
LOGGER.warn("TM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL",
ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
}
errorInfo = "TM checkAuth fail";
}
} catch (Exception exx) {
isSuccess = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.seata.mockserver.processor;

import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang.StringUtils;
import org.apache.seata.core.protocol.AbstractResultMessage;
import org.apache.seata.core.protocol.RegisterRMRequest;
import org.apache.seata.core.protocol.RegisterRMResponse;
import org.apache.seata.core.protocol.RegisterTMRequest;
Expand Down Expand Up @@ -49,27 +51,31 @@ public MockRegisterProcessor(RemotingServer remotingServer, Role role) {

@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (role == Role.TM) {
RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();
LOGGER.info("message = " + message);

ChannelManager.registerTMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());

RegisterTMResponse resp = new RegisterTMResponse();
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp);
LOGGER.info("sendAsyncResponse: {}", resp);
} else if (role == Role.RM) {
RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
LOGGER.info("message = " + message);

ChannelManager.registerRMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());

RegisterRMResponse resp = new RegisterRMResponse();
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resp);
LOGGER.info("sendAsyncResponse: {}", resp);
String errorInfo = StringUtils.EMPTY;
AbstractResultMessage response = null;
try{
if (role == Role.TM) {
RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();
LOGGER.info("reg message = " + message);
ChannelManager.registerTMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());
response = new RegisterTMResponse();
} else if (role == Role.RM) {
RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
LOGGER.info("reg message = " + message);
ChannelManager.registerRMChannel(message, ctx.channel());
Version.putChannelVersion(ctx.channel(), message.getVersion());
response = new RegisterRMResponse();
}
}catch (Exception e){
errorInfo = e.getMessage();
LOGGER.error(role +" register fail, error message:{}", errorInfo);
}
if (StringUtils.isNotEmpty(errorInfo)) {
response.setMsg(errorInfo);
}
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
LOGGER.info("sendAsyncResponse: {}", response);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.serializer.seata.protocol.BatchResultMessageCodec;
import org.apache.seata.serializer.seata.protocol.MergeResultMessageCodec;
import org.apache.seata.serializer.seata.protocol.MergedWarpMessageCodec;
Expand Down Expand Up @@ -77,6 +78,8 @@
import org.apache.seata.core.protocol.transaction.GlobalStatusRequest;
import org.apache.seata.core.protocol.transaction.GlobalStatusResponse;
import org.apache.seata.core.protocol.transaction.UndoLogDeleteRequest;
import org.apache.seata.serializer.seata.protocol.v2.RegisterRMResponseCodecV2;
import org.apache.seata.serializer.seata.protocol.v2.RegisterTMResponseCodecV2;

/**
* The type Message codec factory.
Expand Down Expand Up @@ -117,13 +120,21 @@ public static MessageSeataCodec getMessageCodec(short typeCode, byte version) {
msgCodec = new RegisterTMRequestCodec();
break;
case MessageType.TYPE_REG_CLT_RESULT:
msgCodec = new RegisterTMResponseCodec();
if (version == ProtocolConstants.VERSION_2) {
msgCodec = new RegisterTMResponseCodecV2();
} else {
msgCodec = new RegisterTMResponseCodec();
}
break;
case MessageType.TYPE_REG_RM:
msgCodec = new RegisterRMRequestCodec();
break;
case MessageType.TYPE_REG_RM_RESULT:
msgCodec = new RegisterRMResponseCodec();
if (version == ProtocolConstants.VERSION_2) {
msgCodec = new RegisterRMResponseCodecV2();
} else {
msgCodec = new RegisterRMResponseCodec();
}
break;
case MessageType.TYPE_BRANCH_COMMIT:
msgCodec = new BranchCommitRequestCodec();
Expand Down
Loading
Loading