From 0f1c3e9b8762a5c92d28a4a16560b63fb4e20406 Mon Sep 17 00:00:00 2001 From: RussellLuo Date: Wed, 2 May 2018 20:08:11 +0800 Subject: [PATCH 1/3] Add support for DeleteGroups Fixed #1095. --- broker.go | 11 ++++++ broker_test.go | 13 +++++++ delete_groups_request.go | 30 +++++++++++++++ delete_groups_request_test.go | 34 +++++++++++++++++ delete_groups_response.go | 67 ++++++++++++++++++++++++++++++++++ delete_groups_response_test.go | 57 +++++++++++++++++++++++++++++ request.go | 2 + 7 files changed, 214 insertions(+) create mode 100644 delete_groups_request.go create mode 100644 delete_groups_request_test.go create mode 100644 delete_groups_response.go create mode 100644 delete_groups_response_test.go diff --git a/broker.go b/broker.go index 9755a7d7c..d836bee6d 100644 --- a/broker.go +++ b/broker.go @@ -539,6 +539,17 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon return response, nil } + +func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) { + response := new(DeleteGroupsResponse) + + if err := b.sendAndReceive(request, response); err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/broker_test.go b/broker_test.go index cc73b4440..1525bcac6 100644 --- a/broker_test.go +++ b/broker_test.go @@ -297,6 +297,19 @@ var brokerTestTable = []struct { t.Error("ApiVersions request got no response!") } }}, + + {"DeleteGroupsRequest", + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := DeleteGroupsRequest{} + response, err := broker.DeleteGroups(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("DeleteGroups request got no response!") + } + }}, } func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) { diff --git a/delete_groups_request.go b/delete_groups_request.go new file mode 100644 index 000000000..305a324ac --- /dev/null +++ b/delete_groups_request.go @@ -0,0 +1,30 @@ +package sarama + +type DeleteGroupsRequest struct { + Groups []string +} + +func (r *DeleteGroupsRequest) encode(pe packetEncoder) error { + return pe.putStringArray(r.Groups) +} + +func (r *DeleteGroupsRequest) decode(pd packetDecoder, version int16) (err error) { + r.Groups, err = pd.getStringArray() + return +} + +func (r *DeleteGroupsRequest) key() int16 { + return 42 +} + +func (r *DeleteGroupsRequest) version() int16 { + return 0 +} + +func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion { + return V1_1_0_0 +} + +func (r *DeleteGroupsRequest) AddGroup(group string) { + r.Groups = append(r.Groups, group) +} diff --git a/delete_groups_request_test.go b/delete_groups_request_test.go new file mode 100644 index 000000000..908172498 --- /dev/null +++ b/delete_groups_request_test.go @@ -0,0 +1,34 @@ +package sarama + +import "testing" + +var ( + emptyDeleteGroupsRequest = []byte{0, 0, 0, 0} + + singleDeleteGroupsRequest = []byte{ + 0, 0, 0, 1, // 1 group + 0, 3, 'f', 'o', 'o', // group name: foo + } + + doubleDeleteGroupsRequest = []byte{ + 0, 0, 0, 2, // 2 groups + 0, 3, 'f', 'o', 'o', // group name: foo + 0, 3, 'b', 'a', 'r', // group name: foo + } +) + +func TestDeleteGroupsRequest(t *testing.T) { + var request *DeleteGroupsRequest + + request = new(DeleteGroupsRequest) + testRequest(t, "no groups", request, emptyDeleteGroupsRequest) + + request = new(DeleteGroupsRequest) + request.AddGroup("foo") + testRequest(t, "one group", request, singleDeleteGroupsRequest) + + request = new(DeleteGroupsRequest) + request.AddGroup("foo") + request.AddGroup("bar") + testRequest(t, "two groups", request, doubleDeleteGroupsRequest) +} diff --git a/delete_groups_response.go b/delete_groups_response.go new file mode 100644 index 000000000..81c372e36 --- /dev/null +++ b/delete_groups_response.go @@ -0,0 +1,67 @@ +package sarama + +type DeleteGroupsResponse struct { + ThrottleTimeMs int32 + GroupErrorCodes map[string]KError +} + +func (r *DeleteGroupsResponse) encode(pe packetEncoder) error { + pe.putInt32(r.ThrottleTimeMs) + + if err := pe.putArrayLength(len(r.GroupErrorCodes)); err != nil { + return err + } + for groupID, errorCode := range r.GroupErrorCodes { + if err := pe.putString(groupID); err != nil { + return err + } + pe.putInt16(int16(errorCode)) + } + + return nil +} + +func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error { + throttleTimeMs, err := pd.getInt32() + if err != nil { + return err + } + + r.ThrottleTimeMs = throttleTimeMs + + n, err := pd.getArrayLength() + if err != nil { + return err + } + if n == 0 { + return nil + } + + r.GroupErrorCodes = make(map[string]KError, n) + for i := 0; i < n; i++ { + groupID, err := pd.getString() + if err != nil { + return err + } + errorCode, err := pd.getInt16() + if err != nil { + return err + } + + r.GroupErrorCodes[groupID] = KError(errorCode) + } + + return nil +} + +func (r *DeleteGroupsResponse) key() int16 { + return 42 +} + +func (r *DeleteGroupsResponse) version() int16 { + return 0 +} + +func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion { + return V1_1_0_0 +} diff --git a/delete_groups_response_test.go b/delete_groups_response_test.go new file mode 100644 index 000000000..855e305c8 --- /dev/null +++ b/delete_groups_response_test.go @@ -0,0 +1,57 @@ +package sarama + +import ( + "testing" +) + +var ( + emptyDeleteGroupsResponse = []byte{ + 0, 0, 0, 0, // does not violate any quota + 0, 0, 0, 0, // no groups + } + + errorDeleteGroupsResponse = []byte{ + 0, 0, 0, 0, // does not violate any quota + 0, 0, 0, 1, // 1 group + 0, 3, 'f', 'o', 'o', // group name + 0, 31, // error ErrClusterAuthorizationFailed + } + + noErrorDeleteGroupsResponse = []byte{ + 0, 0, 0, 0, // does not violate any quota + 0, 0, 0, 1, // 1 group + 0, 3, 'f', 'o', 'o', // group name + 0, 0, // no error + } +) + +func TestDeleteGroupsResponse(t *testing.T) { + var response *DeleteGroupsResponse + + response = new(DeleteGroupsResponse) + testVersionDecodable(t, "empty", response, emptyDeleteGroupsResponse, 0) + if response.ThrottleTimeMs != 0 { + t.Error("Expected no violation") + } + if len(response.GroupErrorCodes) != 0 { + t.Error("Expected no groups") + } + + response = new(DeleteGroupsResponse) + testVersionDecodable(t, "error", response, errorDeleteGroupsResponse, 0) + if response.ThrottleTimeMs != 0 { + t.Error("Expected no violation") + } + if response.GroupErrorCodes["foo"] != ErrClusterAuthorizationFailed { + t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"]) + } + + response = new(DeleteGroupsResponse) + testVersionDecodable(t, "no error", response, noErrorDeleteGroupsResponse, 0) + if response.ThrottleTimeMs != 0 { + t.Error("Expected no violation") + } + if response.GroupErrorCodes["foo"] != ErrNoError { + t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"]) + } +} diff --git a/request.go b/request.go index 80333a1f2..4d211a14f 100644 --- a/request.go +++ b/request.go @@ -142,6 +142,8 @@ func allocateBody(key, version int16) protocolBody { return &AlterConfigsRequest{} case 37: return &CreatePartitionsRequest{} + case 42: + return &DeleteGroupsRequest{} } return nil } From 7d531e25579af3979c5ef174d47b823916d26466 Mon Sep 17 00:00:00 2001 From: RussellLuo Date: Fri, 4 May 2018 13:45:35 +0800 Subject: [PATCH 2/3] Use native time type for ThrottleTime --- delete_groups_response.go | 13 ++++++++----- delete_groups_response_test.go | 6 +++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/delete_groups_response.go b/delete_groups_response.go index 81c372e36..c067ebb42 100644 --- a/delete_groups_response.go +++ b/delete_groups_response.go @@ -1,12 +1,16 @@ package sarama +import ( + "time" +) + type DeleteGroupsResponse struct { - ThrottleTimeMs int32 + ThrottleTime time.Duration GroupErrorCodes map[string]KError } func (r *DeleteGroupsResponse) encode(pe packetEncoder) error { - pe.putInt32(r.ThrottleTimeMs) + pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) if err := pe.putArrayLength(len(r.GroupErrorCodes)); err != nil { return err @@ -22,12 +26,11 @@ func (r *DeleteGroupsResponse) encode(pe packetEncoder) error { } func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error { - throttleTimeMs, err := pd.getInt32() + throttleTime, err := pd.getInt32() if err != nil { return err } - - r.ThrottleTimeMs = throttleTimeMs + r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond n, err := pd.getArrayLength() if err != nil { diff --git a/delete_groups_response_test.go b/delete_groups_response_test.go index 855e305c8..6f622b5f0 100644 --- a/delete_groups_response_test.go +++ b/delete_groups_response_test.go @@ -30,7 +30,7 @@ func TestDeleteGroupsResponse(t *testing.T) { response = new(DeleteGroupsResponse) testVersionDecodable(t, "empty", response, emptyDeleteGroupsResponse, 0) - if response.ThrottleTimeMs != 0 { + if response.ThrottleTime != 0 { t.Error("Expected no violation") } if len(response.GroupErrorCodes) != 0 { @@ -39,7 +39,7 @@ func TestDeleteGroupsResponse(t *testing.T) { response = new(DeleteGroupsResponse) testVersionDecodable(t, "error", response, errorDeleteGroupsResponse, 0) - if response.ThrottleTimeMs != 0 { + if response.ThrottleTime != 0 { t.Error("Expected no violation") } if response.GroupErrorCodes["foo"] != ErrClusterAuthorizationFailed { @@ -48,7 +48,7 @@ func TestDeleteGroupsResponse(t *testing.T) { response = new(DeleteGroupsResponse) testVersionDecodable(t, "no error", response, noErrorDeleteGroupsResponse, 0) - if response.ThrottleTimeMs != 0 { + if response.ThrottleTime != 0 { t.Error("Expected no violation") } if response.GroupErrorCodes["foo"] != ErrNoError { From 61cb20c055033e80b4ec423a2c0cb2cae56e964f Mon Sep 17 00:00:00 2001 From: RussellLuo Date: Fri, 4 May 2018 16:42:17 +0800 Subject: [PATCH 3/3] Make kafka-version configurable --- broker_test.go | 51 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/broker_test.go b/broker_test.go index 1525bcac6..9263cef8b 100644 --- a/broker_test.go +++ b/broker_test.go @@ -71,7 +71,7 @@ func TestSimpleBrokerCommunication(t *testing.T) { // Set the broker id in order to validate local broker metrics broker.id = 0 conf := NewConfig() - conf.Version = V0_10_0_0 + conf.Version = tt.version err := broker.Open(conf) if err != nil { t.Fatal(err) @@ -97,11 +97,13 @@ func TestSimpleBrokerCommunication(t *testing.T) { // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake var brokerTestTable = []struct { + version KafkaVersion name string response []byte runner func(*testing.T, *Broker) }{ - {"MetadataRequest", + {V0_10_0_0, + "MetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := MetadataRequest{} @@ -114,7 +116,8 @@ var brokerTestTable = []struct { } }}, - {"ConsumerMetadataRequest", + {V0_10_0_0, + "ConsumerMetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ConsumerMetadataRequest{} @@ -127,7 +130,8 @@ var brokerTestTable = []struct { } }}, - {"ProduceRequest (NoResponse)", + {V0_10_0_0, + "ProduceRequest (NoResponse)", []byte{}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} @@ -141,7 +145,8 @@ var brokerTestTable = []struct { } }}, - {"ProduceRequest (WaitForLocal)", + {V0_10_0_0, + "ProduceRequest (WaitForLocal)", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} @@ -155,7 +160,8 @@ var brokerTestTable = []struct { } }}, - {"FetchRequest", + {V0_10_0_0, + "FetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := FetchRequest{} @@ -168,7 +174,8 @@ var brokerTestTable = []struct { } }}, - {"OffsetFetchRequest", + {V0_10_0_0, + "OffsetFetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetFetchRequest{} @@ -181,7 +188,8 @@ var brokerTestTable = []struct { } }}, - {"OffsetCommitRequest", + {V0_10_0_0, + "OffsetCommitRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetCommitRequest{} @@ -194,7 +202,8 @@ var brokerTestTable = []struct { } }}, - {"OffsetRequest", + {V0_10_0_0, + "OffsetRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetRequest{} @@ -207,7 +216,8 @@ var brokerTestTable = []struct { } }}, - {"JoinGroupRequest", + {V0_10_0_0, + "JoinGroupRequest", []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := JoinGroupRequest{} @@ -220,7 +230,8 @@ var brokerTestTable = []struct { } }}, - {"SyncGroupRequest", + {V0_10_0_0, + "SyncGroupRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := SyncGroupRequest{} @@ -233,7 +244,8 @@ var brokerTestTable = []struct { } }}, - {"LeaveGroupRequest", + {V0_10_0_0, + "LeaveGroupRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := LeaveGroupRequest{} @@ -246,7 +258,8 @@ var brokerTestTable = []struct { } }}, - {"HeartbeatRequest", + {V0_10_0_0, + "HeartbeatRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := HeartbeatRequest{} @@ -259,7 +272,8 @@ var brokerTestTable = []struct { } }}, - {"ListGroupsRequest", + {V0_10_0_0, + "ListGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ListGroupsRequest{} @@ -272,7 +286,8 @@ var brokerTestTable = []struct { } }}, - {"DescribeGroupsRequest", + {V0_10_0_0, + "DescribeGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := DescribeGroupsRequest{} @@ -285,7 +300,8 @@ var brokerTestTable = []struct { } }}, - {"ApiVersionsRequest", + {V0_10_0_0, + "ApiVersionsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ApiVersionsRequest{} @@ -298,7 +314,8 @@ var brokerTestTable = []struct { } }}, - {"DeleteGroupsRequest", + {V1_1_0_0, + "DeleteGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := DeleteGroupsRequest{}