diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java new file mode 100644 index 00000000000..5c50de4426e --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.common.utils; + +import io.grpc.Attributes; +import io.grpc.Metadata; +import io.grpc.ServerCall; + +public class GrpcUtils { + + private GrpcUtils() { + } + + public static void putHeaderIfNotExist(Metadata headers, Metadata.Key key, T value) { + if (headers == null) { + return; + } + if (!headers.containsKey(key) && value != null) { + headers.put(key, value); + } + } + + public static T getAttribute(ServerCall call, Attributes.Key key) { + Attributes attributes = call.getAttributes(); + if (attributes == null) { + return null; + } + return attributes.get(key); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java index 28ee019fae7..e082ba6e28c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.acl.common.AuthenticationHeader; import org.apache.rocketmq.acl.plain.PlainAccessResource; import org.apache.rocketmq.common.constant.GrpcConstants; +import org.apache.rocketmq.proxy.common.utils.GrpcUtils; import org.apache.rocketmq.proxy.config.ConfigurationManager; public class AuthenticationInterceptor implements ServerInterceptor { @@ -49,8 +50,8 @@ public ServerCall.Listener interceptCall(ServerCall call, Metada @Override public void onMessage(R message) { GeneratedMessageV3 messageV3 = (GeneratedMessageV3) message; - headers.put(GrpcConstants.RPC_NAME, messageV3.getDescriptorForType().getFullName()); - headers.put(GrpcConstants.SIMPLE_RPC_NAME, messageV3.getDescriptorForType().getName()); + GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.RPC_NAME, messageV3.getDescriptorForType().getFullName()); + GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.SIMPLE_RPC_NAME, messageV3.getDescriptorForType().getName()); if (ConfigurationManager.getProxyConfig().isEnableACL()) { try { AuthenticationHeader authenticationHeader = AuthenticationHeader.builder() @@ -85,7 +86,7 @@ protected void validate(AuthenticationHeader authenticationHeader, Metadata head if (accessResource instanceof PlainAccessResource) { PlainAccessResource plainAccessResource = (PlainAccessResource) accessResource; - headers.put(GrpcConstants.AUTHORIZATION_AK, plainAccessResource.getAccessKey()); + GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.AUTHORIZATION_AK, plainAccessResource.getAccessKey()); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java index 1de2ce4f986..e3e78841559 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java @@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.HAProxyConstants; import org.apache.rocketmq.common.constant.GrpcConstants; +import org.apache.rocketmq.proxy.common.utils.GrpcUtils; import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys; import java.net.InetSocketAddress; @@ -44,11 +45,11 @@ public ServerCall.Listener interceptCall( SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); remoteAddress = parseSocketAddress(remoteSocketAddress); } - headers.put(GrpcConstants.REMOTE_ADDRESS, remoteAddress); + GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.REMOTE_ADDRESS, remoteAddress); SocketAddress localSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR); String localAddress = parseSocketAddress(localSocketAddress); - headers.put(GrpcConstants.LOCAL_ADDRESS, localAddress); + GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.LOCAL_ADDRESS, localAddress); for (Attributes.Key key : call.getAttributes().keys()) { if (!StringUtils.startsWith(key.toString(), HAProxyConstants.PROXY_PROTOCOL_PREFIX)) { @@ -57,12 +58,12 @@ public ServerCall.Listener interceptCall( Metadata.Key headerKey = Metadata.Key.of(key.toString(), Metadata.ASCII_STRING_MARSHALLER); String headerValue = String.valueOf(call.getAttributes().get(key)); - headers.put(headerKey, headerValue); + GrpcUtils.putHeaderIfNotExist(headers, headerKey, headerValue); } String channelId = call.getAttributes().get(AttributeKeys.CHANNEL_ID); if (StringUtils.isNotBlank(channelId)) { - headers.put(GrpcConstants.CHANNEL_ID, channelId); + GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.CHANNEL_ID, channelId); } return next.startCall(call, headers); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java index 58eed91c9fa..e317b48f1ed 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.utils.GrpcUtils; import org.apache.rocketmq.proxy.processor.MessagingProcessor; public class AuthenticationPipeline implements RequestPipeline { @@ -73,7 +74,7 @@ protected AuthenticationContext newContext(ProxyContext context, Metadata header if (result instanceof DefaultAuthenticationContext) { DefaultAuthenticationContext defaultAuthenticationContext = (DefaultAuthenticationContext) result; if (StringUtils.isNotBlank(defaultAuthenticationContext.getUsername())) { - headers.put(GrpcConstants.AUTHORIZATION_AK, defaultAuthenticationContext.getUsername()); + GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.AUTHORIZATION_AK, defaultAuthenticationContext.getUsername()); } } return result;