diff --git a/examples/AdminClient/Program.cs b/examples/AdminClient/Program.cs index df71cf77e..b19a9dc56 100644 --- a/examples/AdminClient/Program.cs +++ b/examples/AdminClient/Program.cs @@ -790,6 +790,7 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}"); Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}"); Console.WriteLine($" State: {group.State}"); + Console.WriteLine($" Type: {group.Type}"); Console.WriteLine($" Members:"); foreach (var m in group.Members) { @@ -801,6 +802,13 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] topicPartitions = String.Join(", ", m.Assignment.TopicPartitions.Select(tp => tp.ToString())); } Console.WriteLine($" TopicPartitions: [{topicPartitions}]"); + Console.WriteLine($" TargetAssignment:"); + var targetTopicPartitions = ""; + if (m.TargetAssignment.TopicPartitions != null) + { + targetTopicPartitions = String.Join(", ", m.TargetAssignment.TopicPartitions.Select(tp => tp.ToString())); + } + Console.WriteLine($" TopicPartitions: [{targetTopicPartitions}]"); } if (includeAuthorizedOperations) { diff --git a/src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs b/src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs index cce551d30..0f9fa22bd 100644 --- a/src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs +++ b/src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs @@ -51,6 +51,11 @@ public class ConsumerGroupDescription /// public ConsumerGroupState State { get; set; } + /// + /// Consumer group type. + /// + public ConsumerGroupType Type { get; set;} + /// /// Broker that acts as consumer group coordinator (null if not known). /// @@ -92,6 +97,7 @@ public override string ToString() result.Append($"{{\"GroupId\": {GroupId.Quote()}"); result.Append($", \"Error\": \"{Error.Code}\", \"IsSimpleConsumerGroup\": {IsSimpleConsumerGroup.Quote()}"); result.Append($", \"PartitionAssignor\": {PartitionAssignor.Quote()}, \"State\": {State.ToString().Quote()}"); + result.Append($", \"Type\": {Type.ToString().Quote()}"); result.Append($", \"Coordinator\": {Coordinator?.ToString() ?? "null"}, \"Members\": [{members}]"); result.Append($", \"AuthorizedOperations\": {authorizedOperations}}}"); diff --git a/src/Confluent.Kafka/Admin/MemberDescription.cs b/src/Confluent.Kafka/Admin/MemberDescription.cs index 7b3fdb372..b0d30d684 100644 --- a/src/Confluent.Kafka/Admin/MemberDescription.cs +++ b/src/Confluent.Kafka/Admin/MemberDescription.cs @@ -49,6 +49,11 @@ public class MemberDescription /// Member assignment. /// public MemberAssignment Assignment { get; set; } + + /// + /// Target assignment. + /// + public MemberAssignment TargetAssignment { get; set; } /// /// Returns a JSON representation of this object. @@ -63,10 +68,14 @@ public override string ToString() Assignment.TopicPartitions.Select(topicPartition => $"{{\"Topic\": {topicPartition.Topic.Quote()}, \"Partition\": {topicPartition.Partition.Value}}}" ).ToList()); + var targetAssignment = string.Join(",", + TargetAssignment.TopicPartitions.Select(topicPartition => + $"{{\"Topic\": {topicPartition.Topic.Quote()}, \"Partition\": {topicPartition.Partition.Value}}}" + ).ToList()); result.Append($"{{\"ClientId\": {ClientId.Quote()}"); result.Append($", \"GroupInstanceId\": {GroupInstanceId.Quote()}, \"ConsumerId\": {ConsumerId.Quote()}"); - result.Append($", \"Host\": {Host.Quote()}, \"Assignment\": [{assignment}]}}"); + result.Append($", \"Host\": {Host.Quote()}, \"Assignment\": [{assignment}], \"TargetAssignment\": [{targetAssignment}]}}"); return result.ToString(); } diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index 082222233..9766d2f1d 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -342,6 +342,13 @@ private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr { member.Assignment.TopicPartitions = SafeKafkaHandle.GetTopicPartitionList(topicPartitionPtr); } + var targetAssignmentPtr = Librdkafka.MemberDescription_target_assignment(memberPtr); + var targetTopicPartitionPtr = Librdkafka.MemberAssignment_target_partitions(targetAssignmentPtr); + member.TargetAssignment = new MemberAssignment(); + if (targetTopicPartitionPtr != IntPtr.Zero) + { + member.TargetAssignment.TopicPartitions = SafeKafkaHandle.GetTopicPartitionList(targetTopicPartitionPtr); + } members.Add(member); } @@ -362,6 +369,8 @@ private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr PtrToStringUTF8(Librdkafka.ConsumerGroupDescription_partition_assignor(groupPtr)), State = Librdkafka.ConsumerGroupDescription_state(groupPtr), + Type = + Librdkafka.ConsumerGroupDescription_type(groupPtr), Coordinator = coordinator, Members = members, AuthorizedOperations = authorizedOperations, diff --git a/src/Confluent.Kafka/Impl/LibRdKafka.cs b/src/Confluent.Kafka/Impl/LibRdKafka.cs index 9f2fc4970..8d276ad78 100644 --- a/src/Confluent.Kafka/Impl/LibRdKafka.cs +++ b/src/Confluent.Kafka/Impl/LibRdKafka.cs @@ -424,6 +424,7 @@ static bool SetDelegates(Type nativeMethodsClass) _ConsumerGroupDescription_is_simple_consumer_group = (_ConsumerGroupDescription_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_is_simple_consumer_group").CreateDelegate(typeof (_ConsumerGroupDescription_is_simple_consumer_group_delegate)); _ConsumerGroupDescription_partition_assignor = (_ConsumerGroupDescription_partition_assignor_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_partition_assignor").CreateDelegate(typeof (_ConsumerGroupDescription_partition_assignor_delegate)); _ConsumerGroupDescription_state = (_ConsumerGroupDescription_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_state").CreateDelegate(typeof (_ConsumerGroupDescription_state_delegate)); + _ConsumerGroupDescription_type = (_ConsumerGroupDescription_type_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_type").CreateDelegate(typeof (_ConsumerGroupDescription_type_delegate)); _ConsumerGroupDescription_coordinator = (_ConsumerGroupDescription_coordinator_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_coordinator").CreateDelegate(typeof (_ConsumerGroupDescription_coordinator_delegate)); _ConsumerGroupDescription_member_count = (_ConsumerGroupDescription_member_count_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_member_count").CreateDelegate(typeof (_ConsumerGroupDescription_member_count_delegate)); _ConsumerGroupDescription_authorized_operations = (_ConsumerGroupDescription_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_authorized_operations").CreateDelegate(typeof (_ConsumerGroupDescription_authorized_operations_delegate)); @@ -434,6 +435,8 @@ static bool SetDelegates(Type nativeMethodsClass) _MemberDescription_host = (_MemberDescription_host_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_host").CreateDelegate(typeof (_MemberDescription_host_delegate)); _MemberDescription_assignment = (_MemberDescription_assignment_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_assignment").CreateDelegate(typeof (_MemberDescription_assignment_delegate)); _MemberAssignment_partitions = (_MemberAssignment_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_MemberAssignment_partitions").CreateDelegate(typeof (_MemberAssignment_partitions_delegate)); + _MemberDescription_target_assignment = (_MemberDescription_target_assignment_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_target_assignment").CreateDelegate(typeof (_MemberDescription_target_assignment_delegate)); + _MemberAssignment_target_partitions = (_MemberAssignment_target_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_MemberAssignment_target_partitions").CreateDelegate(typeof (_MemberAssignment_target_partitions_delegate)); _Node_id = (_Node_id_delegate)methods.Single(m => m.Name == "rd_kafka_Node_id").CreateDelegate(typeof (_Node_id_delegate)); _Node_host = (_Node_host_delegate)methods.Single(m => m.Name == "rd_kafka_Node_host").CreateDelegate(typeof (_Node_host_delegate)); _Node_port = (_Node_port_delegate)methods.Single(m => m.Name == "rd_kafka_Node_port").CreateDelegate(typeof (_Node_port_delegate)); @@ -1953,6 +1956,12 @@ internal static ConsumerGroupState ConsumerGroupDescription_state(IntPtr grpdes return _ConsumerGroupDescription_state(grpdesc); } + private delegate ConsumerGroupType _ConsumerGroupDescription_type_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_type_delegate _ConsumerGroupDescription_type; + + internal static ConsumerGroupType ConsumerGroupDescription_type(IntPtr grpdesc) + => _ConsumerGroupDescription_type(grpdesc); + private delegate IntPtr _ConsumerGroupDescription_coordinator_delegate(IntPtr grpdesc); private static _ConsumerGroupDescription_coordinator_delegate _ConsumerGroupDescription_coordinator; internal static IntPtr ConsumerGroupDescription_coordinator(IntPtr grpdesc) @@ -2003,6 +2012,16 @@ internal static IntPtr MemberDescription_assignment(IntPtr member) internal static IntPtr MemberAssignment_topic_partitions(IntPtr assignment) => _MemberAssignment_partitions(assignment); + private delegate IntPtr _MemberDescription_target_assignment_delegate(IntPtr member); + private static _MemberDescription_target_assignment_delegate _MemberDescription_target_assignment; + internal static IntPtr MemberDescription_target_assignment(IntPtr member) + => _MemberDescription_target_assignment(member); + + private delegate IntPtr _MemberAssignment_target_partitions_delegate(IntPtr assignment); + private static _MemberAssignment_target_partitions_delegate _MemberAssignment_target_partitions; + internal static IntPtr MemberAssignment_target_partitions(IntPtr assignment) + => _MemberAssignment_target_partitions(assignment); + private delegate IntPtr _Node_id_delegate(IntPtr node); private static _Node_id_delegate _Node_id; internal static IntPtr Node_id(IntPtr node) => _Node_id(node); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs index 5b08d48cb..6f12509d8 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs @@ -1073,6 +1073,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupType rd_kafka_ConsumerGroupDescription_type(IntPtr grpdesc); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc); @@ -1103,6 +1106,12 @@ internal static extern void rd_kafka_DescribeConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_target_assignment(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberAssignment_target_partitions(IntPtr assignment); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_Node_id(IntPtr node); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs index e269144bf..b1a93ca49 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs @@ -1077,6 +1077,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupType rd_kafka_ConsumerGroupDescription_type(IntPtr grpdesc); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc); @@ -1107,6 +1110,12 @@ internal static extern void rd_kafka_DescribeConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_target_assignment(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberAssignment_target_partitions(IntPtr assignment); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_Node_id(IntPtr node); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs index 91f536b19..d3a22e2ba 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs @@ -1077,6 +1077,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupType rd_kafka_ConsumerGroupDescription_type(IntPtr grpdesc); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc); @@ -1107,6 +1110,12 @@ internal static extern void rd_kafka_DescribeConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_target_assignment(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberAssignment_target_partitions(IntPtr assignment); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_Node_id(IntPtr node); diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_DescribeConsumerGroupsCompatability.cs b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_DescribeConsumerGroupsCompatability.cs new file mode 100644 index 000000000..0830899a8 --- /dev/null +++ b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_DescribeConsumerGroupsCompatability.cs @@ -0,0 +1,214 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +#pragma warning disable xUnit1026 + +using System; +using System.Linq; +using System.Collections.Generic; +using Xunit; +using Confluent.Kafka.Admin; +using Confluent.Kafka.TestsCommon; + +namespace Confluent.Kafka.IntegrationTests +{ + public partial class Tests + { + // A convenience method to check the resultant ConsumerGroupDescription obtained on describing a group. + private void checkConsumerGroupDescription( + ConsumerGroupDescription desc, ConsumerGroupState state, ConsumerGroupType type, + string protocol, string groupID, + Dictionary> clientIdToToppars) + { + Assert.Equal(groupID, desc.GroupId); + Assert.Equal(ErrorCode.NoError, desc.Error.Code); + Assert.Equal(state, desc.State); + Assert.Equal(protocol, desc.PartitionAssignor); + // We can't check exactly the Broker information, but we add a + // check for the zero-value of the Host. + Assert.NotEqual("", desc.Coordinator.Host); + Assert.Equal(clientIdToToppars.Count(), desc.Members.Count()); + // We will run all our tests on non-simple consumer groups only. + Assert.False(desc.IsSimpleConsumerGroup); + Assert.NotEmpty(desc.AuthorizedOperations); + + foreach (var member in desc.Members) + { + Assert.True(clientIdToToppars.ContainsKey(member.ClientId)); + Assert.True(clientIdToToppars[member.ClientId].SequenceEqual(member.Assignment.TopicPartitions)); + } + } + + /// + /// Test functionality of AdminClient.DescribeConsumerGroups. + /// We test three cases: + /// 1. A list with two consumer groups created with new protocol. + /// 2. A list with two consumer groups created with old protocol. + /// 3. A list with four consumer groups, two created with new protocol and two with old protocol. + /// + [Theory, MemberData(nameof(KafkaParameters))] + public void AdminClient_DescribeConsumerGroupsCompatability(string bootstrapServers) + { + if (TestConsumerGroupProtocol.IsClassic()) + { + LogToFile("Creating new protocol Consumer Groups require" + + "Consumer Protocol"); + return; + } + + LogToFile("start AdminClient_DescribeConsumerGroupsCompatability"); + var groupID_new1 = Guid.NewGuid().ToString(); + var groupID_new2 = Guid.NewGuid().ToString(); + var groupID_old1 = Guid.NewGuid().ToString(); + var groupID_old2 = Guid.NewGuid().ToString(); + const string clientID1 = "test.client.1"; + const string clientID2 = "test.client.2"; + const string clientID3 = "test.client.3"; + const string clientID4 = "test.client.4"; + + // Create an AdminClient here - we need it throughout the test. + using (var adminClient = new AdminClientBuilder(new AdminClientConfig + { + BootstrapServers = bootstrapServers + }).Build()) + { + var describeOptionsWithTimeout = new Admin.DescribeConsumerGroupsOptions() + { + RequestTimeout = TimeSpan.FromSeconds(30), + IncludeAuthorizedOperations = true, + }; + + var consumerConfigs = new List<(string GroupId, string ClientId, string ProtocolType)> + { + (groupID_new1, clientID1, "consumer"), + (groupID_new2, clientID2, "consumer"), + (groupID_old1, clientID3, "classic"), + (groupID_old2, clientID4, "classic") + }; + + var consumers = new List>(); + + foreach (var config in consumerConfigs) + { + var consumerConfig = new ConsumerConfig + { + GroupId = config.GroupId, + BootstrapServers = bootstrapServers, + SessionTimeoutMs = 6000, + PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range, + ClientId = config.ClientId, + GroupProtocolType = config.ProtocolType + }; + + var consumer = new TestConsumerBuilder(consumerConfig).Build(); + consumer.Subscribe(new[] { partitionedTopic }); + + // Wait for rebalance + consumer.Consume(TimeSpan.FromSeconds(10)); + + consumers.Add(consumer); + }; + + var groupIdToClientIdToToppars = new Dictionary>>(); + + // Group IDs and Client IDs + var groupClientMapping = new Dictionary + { + { groupID_new1, clientID1 }, + { groupID_new2, clientID2 }, + { groupID_old1, clientID3 }, + { groupID_old2, clientID4 } + }; + + // Loop through each group ID and client ID + foreach (var entry in groupClientMapping) + { + var groupID = entry.Key; + var clientID = entry.Value; + + // Create the topic partition list for this client ID + var topicPartitions = new List + { + new TopicPartition(partitionedTopic, 0), + new TopicPartition(partitionedTopic, 1), + }; + + // Add the mapping to the nested dictionary + if (!groupIdToClientIdToToppars.ContainsKey(groupID)) + { + groupIdToClientIdToToppars[groupID] = new Dictionary>(); + } + + groupIdToClientIdToToppars[groupID][clientID] = topicPartitions; + } + + // We test for 3 scenarios, passing the two groups with new protocl in first, + // and the two groups with old protocol in the second and then a mix of both. + // List with two consumer groups created with new protocol. + var descResult = adminClient.DescribeConsumerGroupsAsync( + new List() { groupID_new1, groupID_new2 }, + describeOptionsWithTimeout).Result; + var groupDesc1 = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID_new1); + var groupDesc2 = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID_new2); + + checkConsumerGroupDescription( + groupDesc1, ConsumerGroupState.Stable, ConsumerGroupType.Consumer, "range", groupID_new1, groupIdToClientIdToToppars[groupID_new1]); + checkConsumerGroupDescription( + groupDesc2, ConsumerGroupState.Stable, ConsumerGroupType.Consumer, "range", groupID_new2, groupIdToClientIdToToppars[groupID_new2]); + + // List with two consumer groups created with old protocol. + descResult = adminClient.DescribeConsumerGroupsAsync( + new List() { groupID_old1, groupID_old2 }, + describeOptionsWithTimeout).Result; + groupDesc1 = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID_old1); + groupDesc2 = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID_old2); + + checkConsumerGroupDescription( + groupDesc1, ConsumerGroupState.Stable, ConsumerGroupType.Classic, "range", groupID_old1, groupIdToClientIdToToppars[groupID_old1]); + checkConsumerGroupDescription( + groupDesc2, ConsumerGroupState.Stable, ConsumerGroupType.Classic, "range", groupID_old2, groupIdToClientIdToToppars[groupID_old2]); + + // List with four consumer groups, two created with new protocol and two with old protocol. + descResult = adminClient.DescribeConsumerGroupsAsync( + new List() { groupID_new1, groupID_old1, groupID_new2, groupID_old2 }, + describeOptionsWithTimeout).Result; + groupDesc1 = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID_new1); + groupDesc2 = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID_old1); + var groupDesc3 = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID_new2); + var groupDesc4 = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID_old2); + + checkConsumerGroupDescription( + groupDesc1, ConsumerGroupState.Stable, ConsumerGroupType.Consumer, "range", groupID_new1, groupIdToClientIdToToppars[groupID_new1]); + checkConsumerGroupDescription( + groupDesc2, ConsumerGroupState.Stable, ConsumerGroupType.Classic, "range", groupID_old1, groupIdToClientIdToToppars[groupID_old1]); + checkConsumerGroupDescription( + groupDesc3, ConsumerGroupState.Stable, ConsumerGroupType.Consumer, "range", groupID_new2, groupIdToClientIdToToppars[groupID_new2]); + checkConsumerGroupDescription( + groupDesc4, ConsumerGroupState.Stable, ConsumerGroupType.Classic, "range", groupID_old2, groupIdToClientIdToToppars[groupID_old2]); + + + foreach (var consumer in consumers) + { + consumer.Close(); + consumer.Dispose(); + } + + } + Assert.Equal(0, Library.HandleCount); + LogToFile("end AdminClient_DescribeConsumerGroupsCompatability"); + } + } +}