Skip to content

Commit

Permalink
Add Utils for put header to Metadata to avoid duplicate data. (#8792)
Browse files Browse the repository at this point in the history
  • Loading branch information
dingshuangxi888 authored Oct 9, 2024
1 parent a948f67 commit 95b1c51
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.common.utils;

import io.grpc.Attributes;
import io.grpc.Metadata;
import io.grpc.ServerCall;

public class GrpcUtils {

private GrpcUtils() {
}

public static <T> void putHeaderIfNotExist(Metadata headers, Metadata.Key<T> key, T value) {
if (headers == null) {
return;
}
if (!headers.containsKey(key) && value != null) {
headers.put(key, value);
}
}

public static <R, W, T> T getAttribute(ServerCall<R, W> call, Attributes.Key<T> key) {
Attributes attributes = call.getAttributes();
if (attributes == null) {
return null;
}
return attributes.get(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.rocketmq.acl.common.AuthenticationHeader;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.apache.rocketmq.common.constant.GrpcConstants;
import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;

public class AuthenticationInterceptor implements ServerInterceptor {
Expand All @@ -49,8 +50,8 @@ public <R, W> ServerCall.Listener<R> interceptCall(ServerCall<R, W> call, Metada
@Override
public void onMessage(R message) {
GeneratedMessageV3 messageV3 = (GeneratedMessageV3) message;
headers.put(GrpcConstants.RPC_NAME, messageV3.getDescriptorForType().getFullName());
headers.put(GrpcConstants.SIMPLE_RPC_NAME, messageV3.getDescriptorForType().getName());
GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.RPC_NAME, messageV3.getDescriptorForType().getFullName());
GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.SIMPLE_RPC_NAME, messageV3.getDescriptorForType().getName());
if (ConfigurationManager.getProxyConfig().isEnableACL()) {
try {
AuthenticationHeader authenticationHeader = AuthenticationHeader.builder()
Expand Down Expand Up @@ -85,7 +86,7 @@ protected void validate(AuthenticationHeader authenticationHeader, Metadata head

if (accessResource instanceof PlainAccessResource) {
PlainAccessResource plainAccessResource = (PlainAccessResource) accessResource;
headers.put(GrpcConstants.AUTHORIZATION_AK, plainAccessResource.getAccessKey());
GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.AUTHORIZATION_AK, plainAccessResource.getAccessKey());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.GrpcConstants;
import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;

import java.net.InetSocketAddress;
Expand All @@ -44,11 +45,11 @@ public <R, W> ServerCall.Listener<R> interceptCall(
SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
remoteAddress = parseSocketAddress(remoteSocketAddress);
}
headers.put(GrpcConstants.REMOTE_ADDRESS, remoteAddress);
GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.REMOTE_ADDRESS, remoteAddress);

SocketAddress localSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
String localAddress = parseSocketAddress(localSocketAddress);
headers.put(GrpcConstants.LOCAL_ADDRESS, localAddress);
GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.LOCAL_ADDRESS, localAddress);

for (Attributes.Key<?> key : call.getAttributes().keys()) {
if (!StringUtils.startsWith(key.toString(), HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
Expand All @@ -57,12 +58,12 @@ public <R, W> ServerCall.Listener<R> interceptCall(
Metadata.Key<String> headerKey
= Metadata.Key.of(key.toString(), Metadata.ASCII_STRING_MARSHALLER);
String headerValue = String.valueOf(call.getAttributes().get(key));
headers.put(headerKey, headerValue);
GrpcUtils.putHeaderIfNotExist(headers, headerKey, headerValue);
}

String channelId = call.getAttributes().get(AttributeKeys.CHANNEL_ID);
if (StringUtils.isNotBlank(channelId)) {
headers.put(GrpcConstants.CHANNEL_ID, channelId);
GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.CHANNEL_ID, channelId);
}

return next.startCall(call, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;

public class AuthenticationPipeline implements RequestPipeline {
Expand Down Expand Up @@ -73,7 +74,7 @@ protected AuthenticationContext newContext(ProxyContext context, Metadata header
if (result instanceof DefaultAuthenticationContext) {
DefaultAuthenticationContext defaultAuthenticationContext = (DefaultAuthenticationContext) result;
if (StringUtils.isNotBlank(defaultAuthenticationContext.getUsername())) {
headers.put(GrpcConstants.AUTHORIZATION_AK, defaultAuthenticationContext.getUsername());
GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.AUTHORIZATION_AK, defaultAuthenticationContext.getUsername());
}
}
return result;
Expand Down

0 comments on commit 95b1c51

Please sign in to comment.