Skip to content

Commit

Permalink
Merge pull request IBM#1096 from RussellLuo/add-support-for-delete-gr…
Browse files Browse the repository at this point in the history
…oups

Add support for DeleteGroups
  • Loading branch information
eapache authored May 7, 2018
2 parents 1c45a00 + 61cb20c commit 3c763ff
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 16 deletions.
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,17 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon

return response, nil
}

func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
response := new(DeleteGroupsResponse)

if err := b.sendAndReceive(request, response); err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
62 changes: 46 additions & 16 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
// Set the broker id in order to validate local broker metrics
broker.id = 0
conf := NewConfig()
conf.Version = V0_10_0_0
conf.Version = tt.version
err := broker.Open(conf)
if err != nil {
t.Fatal(err)
Expand All @@ -97,11 +97,13 @@ func TestSimpleBrokerCommunication(t *testing.T) {

// We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
var brokerTestTable = []struct {
version KafkaVersion
name string
response []byte
runner func(*testing.T, *Broker)
}{
{"MetadataRequest",
{V0_10_0_0,
"MetadataRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := MetadataRequest{}
Expand All @@ -114,7 +116,8 @@ var brokerTestTable = []struct {
}
}},

{"ConsumerMetadataRequest",
{V0_10_0_0,
"ConsumerMetadataRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ConsumerMetadataRequest{}
Expand All @@ -127,7 +130,8 @@ var brokerTestTable = []struct {
}
}},

{"ProduceRequest (NoResponse)",
{V0_10_0_0,
"ProduceRequest (NoResponse)",
[]byte{},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
Expand All @@ -141,7 +145,8 @@ var brokerTestTable = []struct {
}
}},

{"ProduceRequest (WaitForLocal)",
{V0_10_0_0,
"ProduceRequest (WaitForLocal)",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
Expand All @@ -155,7 +160,8 @@ var brokerTestTable = []struct {
}
}},

{"FetchRequest",
{V0_10_0_0,
"FetchRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := FetchRequest{}
Expand All @@ -168,7 +174,8 @@ var brokerTestTable = []struct {
}
}},

{"OffsetFetchRequest",
{V0_10_0_0,
"OffsetFetchRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetFetchRequest{}
Expand All @@ -181,7 +188,8 @@ var brokerTestTable = []struct {
}
}},

{"OffsetCommitRequest",
{V0_10_0_0,
"OffsetCommitRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetCommitRequest{}
Expand All @@ -194,7 +202,8 @@ var brokerTestTable = []struct {
}
}},

{"OffsetRequest",
{V0_10_0_0,
"OffsetRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetRequest{}
Expand All @@ -207,7 +216,8 @@ var brokerTestTable = []struct {
}
}},

{"JoinGroupRequest",
{V0_10_0_0,
"JoinGroupRequest",
[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := JoinGroupRequest{}
Expand All @@ -220,7 +230,8 @@ var brokerTestTable = []struct {
}
}},

{"SyncGroupRequest",
{V0_10_0_0,
"SyncGroupRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := SyncGroupRequest{}
Expand All @@ -233,7 +244,8 @@ var brokerTestTable = []struct {
}
}},

{"LeaveGroupRequest",
{V0_10_0_0,
"LeaveGroupRequest",
[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := LeaveGroupRequest{}
Expand All @@ -246,7 +258,8 @@ var brokerTestTable = []struct {
}
}},

{"HeartbeatRequest",
{V0_10_0_0,
"HeartbeatRequest",
[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := HeartbeatRequest{}
Expand All @@ -259,7 +272,8 @@ var brokerTestTable = []struct {
}
}},

{"ListGroupsRequest",
{V0_10_0_0,
"ListGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ListGroupsRequest{}
Expand All @@ -272,7 +286,8 @@ var brokerTestTable = []struct {
}
}},

{"DescribeGroupsRequest",
{V0_10_0_0,
"DescribeGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := DescribeGroupsRequest{}
Expand All @@ -285,7 +300,8 @@ var brokerTestTable = []struct {
}
}},

{"ApiVersionsRequest",
{V0_10_0_0,
"ApiVersionsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ApiVersionsRequest{}
Expand All @@ -297,6 +313,20 @@ var brokerTestTable = []struct {
t.Error("ApiVersions request got no response!")
}
}},

{V1_1_0_0,
"DeleteGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := DeleteGroupsRequest{}
response, err := broker.DeleteGroups(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("DeleteGroups request got no response!")
}
}},
}

func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
Expand Down
30 changes: 30 additions & 0 deletions delete_groups_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package sarama

type DeleteGroupsRequest struct {
Groups []string
}

func (r *DeleteGroupsRequest) encode(pe packetEncoder) error {
return pe.putStringArray(r.Groups)
}

func (r *DeleteGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Groups, err = pd.getStringArray()
return
}

func (r *DeleteGroupsRequest) key() int16 {
return 42
}

func (r *DeleteGroupsRequest) version() int16 {
return 0
}

func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion {
return V1_1_0_0
}

func (r *DeleteGroupsRequest) AddGroup(group string) {
r.Groups = append(r.Groups, group)
}
34 changes: 34 additions & 0 deletions delete_groups_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sarama

import "testing"

var (
emptyDeleteGroupsRequest = []byte{0, 0, 0, 0}

singleDeleteGroupsRequest = []byte{
0, 0, 0, 1, // 1 group
0, 3, 'f', 'o', 'o', // group name: foo
}

doubleDeleteGroupsRequest = []byte{
0, 0, 0, 2, // 2 groups
0, 3, 'f', 'o', 'o', // group name: foo
0, 3, 'b', 'a', 'r', // group name: foo
}
)

func TestDeleteGroupsRequest(t *testing.T) {
var request *DeleteGroupsRequest

request = new(DeleteGroupsRequest)
testRequest(t, "no groups", request, emptyDeleteGroupsRequest)

request = new(DeleteGroupsRequest)
request.AddGroup("foo")
testRequest(t, "one group", request, singleDeleteGroupsRequest)

request = new(DeleteGroupsRequest)
request.AddGroup("foo")
request.AddGroup("bar")
testRequest(t, "two groups", request, doubleDeleteGroupsRequest)
}
70 changes: 70 additions & 0 deletions delete_groups_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package sarama

import (
"time"
)

type DeleteGroupsResponse struct {
ThrottleTime time.Duration
GroupErrorCodes map[string]KError
}

func (r *DeleteGroupsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))

if err := pe.putArrayLength(len(r.GroupErrorCodes)); err != nil {
return err
}
for groupID, errorCode := range r.GroupErrorCodes {
if err := pe.putString(groupID); err != nil {
return err
}
pe.putInt16(int16(errorCode))
}

return nil
}

func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

n, err := pd.getArrayLength()
if err != nil {
return err
}
if n == 0 {
return nil
}

r.GroupErrorCodes = make(map[string]KError, n)
for i := 0; i < n; i++ {
groupID, err := pd.getString()
if err != nil {
return err
}
errorCode, err := pd.getInt16()
if err != nil {
return err
}

r.GroupErrorCodes[groupID] = KError(errorCode)
}

return nil
}

func (r *DeleteGroupsResponse) key() int16 {
return 42
}

func (r *DeleteGroupsResponse) version() int16 {
return 0
}

func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion {
return V1_1_0_0
}
57 changes: 57 additions & 0 deletions delete_groups_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package sarama

import (
"testing"
)

var (
emptyDeleteGroupsResponse = []byte{
0, 0, 0, 0, // does not violate any quota
0, 0, 0, 0, // no groups
}

errorDeleteGroupsResponse = []byte{
0, 0, 0, 0, // does not violate any quota
0, 0, 0, 1, // 1 group
0, 3, 'f', 'o', 'o', // group name
0, 31, // error ErrClusterAuthorizationFailed
}

noErrorDeleteGroupsResponse = []byte{
0, 0, 0, 0, // does not violate any quota
0, 0, 0, 1, // 1 group
0, 3, 'f', 'o', 'o', // group name
0, 0, // no error
}
)

func TestDeleteGroupsResponse(t *testing.T) {
var response *DeleteGroupsResponse

response = new(DeleteGroupsResponse)
testVersionDecodable(t, "empty", response, emptyDeleteGroupsResponse, 0)
if response.ThrottleTime != 0 {
t.Error("Expected no violation")
}
if len(response.GroupErrorCodes) != 0 {
t.Error("Expected no groups")
}

response = new(DeleteGroupsResponse)
testVersionDecodable(t, "error", response, errorDeleteGroupsResponse, 0)
if response.ThrottleTime != 0 {
t.Error("Expected no violation")
}
if response.GroupErrorCodes["foo"] != ErrClusterAuthorizationFailed {
t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
}

response = new(DeleteGroupsResponse)
testVersionDecodable(t, "no error", response, noErrorDeleteGroupsResponse, 0)
if response.ThrottleTime != 0 {
t.Error("Expected no violation")
}
if response.GroupErrorCodes["foo"] != ErrNoError {
t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
}
}
Loading

0 comments on commit 3c763ff

Please sign in to comment.