From 6e761b033a957a13e39c291b25c1054c4cc40b33 Mon Sep 17 00:00:00 2001 From: "Xu Han@AutoMQ" Date: Thu, 26 Sep 2024 10:37:10 +0800 Subject: [PATCH] feat(zonerouter): add client type (#2037) Signed-off-by: Robin Han --- .../kafka/automq/zonerouter/ClientIdKey.java | 1 + .../automq/zonerouter/ClientIdMetadata.java | 8 ++++++ .../kafka/automq/zonerouter/ClientType.java | 28 +++++++++++++++++++ 3 files changed, 37 insertions(+) create mode 100644 core/src/main/java/kafka/automq/zonerouter/ClientType.java diff --git a/core/src/main/java/kafka/automq/zonerouter/ClientIdKey.java b/core/src/main/java/kafka/automq/zonerouter/ClientIdKey.java index bc31bd7372..39170e18f2 100644 --- a/core/src/main/java/kafka/automq/zonerouter/ClientIdKey.java +++ b/core/src/main/java/kafka/automq/zonerouter/ClientIdKey.java @@ -13,4 +13,5 @@ public class ClientIdKey { public static final String AVAILABILITY_ZONE = "automq_az"; + public static final String CLIENT_TYPE = "automq_type"; } diff --git a/core/src/main/java/kafka/automq/zonerouter/ClientIdMetadata.java b/core/src/main/java/kafka/automq/zonerouter/ClientIdMetadata.java index 9ea7c6d3cb..3b002896cd 100644 --- a/core/src/main/java/kafka/automq/zonerouter/ClientIdMetadata.java +++ b/core/src/main/java/kafka/automq/zonerouter/ClientIdMetadata.java @@ -43,6 +43,14 @@ public String rack() { return list.get(0); } + public ClientType clientType() { + List list = metadata.get(ClientIdKey.CLIENT_TYPE); + if (list == null || list.isEmpty()) { + return null; + } + return ClientType.parse(list.get(0)); + } + public String clientId() { return clientId; } diff --git a/core/src/main/java/kafka/automq/zonerouter/ClientType.java b/core/src/main/java/kafka/automq/zonerouter/ClientType.java new file mode 100644 index 0000000000..60ae2397ce --- /dev/null +++ b/core/src/main/java/kafka/automq/zonerouter/ClientType.java @@ -0,0 +1,28 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.automq.zonerouter; + +public enum ClientType { + PRODUCER, CONSUMER; + + public static ClientType parse(String str) { + switch (str) { + case "producer": + return PRODUCER; + case "consumer": + return CONSUMER; + default: + return null; + } + } + +}