diff --git a/go.mod b/go.mod index 70cacc5d9..1af95673d 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.17.0 github.com/prometheus/common v0.44.0 - github.com/segmentio/kafka-go v0.4.44 + github.com/segmentio/kafka-go v0.4.45 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index b96e4ca36..34629c8d9 100644 --- a/go.sum +++ b/go.sum @@ -774,8 +774,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= -github.com/segmentio/kafka-go v0.4.44 h1:Vjjksniy0WSTZ7CuVJrz1k04UoZeTc77UV6Yyk6tLY4= -github.com/segmentio/kafka-go v0.4.44/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/segmentio/kafka-go v0.4.45 h1:prqrZp1mMId4kI6pyPolkLsH6sWOUmDxmmucbL4WS6E= +github.com/segmentio/kafka-go v0.4.45/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= diff --git a/vendor/github.com/segmentio/kafka-go/alterpartitionreassignments.go b/vendor/github.com/segmentio/kafka-go/alterpartitionreassignments.go index ec76dbd8b..dd67d003b 100644 --- a/vendor/github.com/segmentio/kafka-go/alterpartitionreassignments.go +++ b/vendor/github.com/segmentio/kafka-go/alterpartitionreassignments.go @@ -13,7 +13,8 @@ type AlterPartitionReassignmentsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr - // Topic is the name of the topic to alter partitions in. + // Topic is the name of the topic to alter partitions in. Keep this field empty and use Topic in AlterPartitionReassignmentsRequestAssignment to + // reassign to multiple topics. Topic string // Assignments is the list of partition reassignments to submit to the API. @@ -26,10 +27,13 @@ type AlterPartitionReassignmentsRequest struct { // AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single // partition. type AlterPartitionReassignmentsRequestAssignment struct { + // Topic is the name of the topic to alter partitions in. If empty, the value of Topic in AlterPartitionReassignmentsRequest is used. + Topic string + // PartitionID is the ID of the partition to make the reassignments in. PartitionID int - // BrokerIDs is a slice of brokers to set the partition replicas to. + // BrokerIDs is a slice of brokers to set the partition replicas to, or null to cancel a pending reassignment for this partition. BrokerIDs []int } @@ -46,6 +50,9 @@ type AlterPartitionReassignmentsResponse struct { // AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of // doing reassignments for a single partition. type AlterPartitionReassignmentsResponsePartitionResult struct { + // Topic is the topic name. + Topic string + // PartitionID is the ID of the partition that was altered. PartitionID int @@ -58,16 +65,29 @@ func (c *Client) AlterPartitionReassignments( ctx context.Context, req *AlterPartitionReassignmentsRequest, ) (*AlterPartitionReassignmentsResponse, error) { - apiPartitions := []alterpartitionreassignments.RequestPartition{} + apiTopicMap := make(map[string]*alterpartitionreassignments.RequestTopic) for _, assignment := range req.Assignments { + topic := assignment.Topic + if topic == "" { + topic = req.Topic + } + + apiTopic := apiTopicMap[topic] + if apiTopic == nil { + apiTopic = &alterpartitionreassignments.RequestTopic{ + Name: topic, + } + apiTopicMap[topic] = apiTopic + } + replicas := []int32{} for _, brokerID := range assignment.BrokerIDs { replicas = append(replicas, int32(brokerID)) } - apiPartitions = append( - apiPartitions, + apiTopic.Partitions = append( + apiTopic.Partitions, alterpartitionreassignments.RequestPartition{ PartitionIndex: int32(assignment.PartitionID), Replicas: replicas, @@ -77,12 +97,10 @@ func (c *Client) AlterPartitionReassignments( apiReq := &alterpartitionreassignments.Request{ TimeoutMs: int32(req.Timeout.Milliseconds()), - Topics: []alterpartitionreassignments.RequestTopic{ - { - Name: req.Topic, - Partitions: apiPartitions, - }, - }, + } + + for _, apiTopic := range apiTopicMap { + apiReq.Topics = append(apiReq.Topics, *apiTopic) } protoResp, err := c.roundTrip( @@ -104,6 +122,7 @@ func (c *Client) AlterPartitionReassignments( resp.PartitionResults = append( resp.PartitionResults, AlterPartitionReassignmentsResponsePartitionResult{ + Topic: topicResult.Name, PartitionID: int(partitionResult.PartitionIndex), Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage), }, diff --git a/vendor/github.com/segmentio/kafka-go/createacls.go b/vendor/github.com/segmentio/kafka-go/createacls.go index 672f6fdce..601974171 100644 --- a/vendor/github.com/segmentio/kafka-go/createacls.go +++ b/vendor/github.com/segmentio/kafka-go/createacls.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "strings" "time" "github.com/segmentio/kafka-go/protocol/createacls" @@ -42,6 +43,43 @@ const ( ACLPermissionTypeAllow ACLPermissionType = 3 ) +func (apt ACLPermissionType) String() string { + mapping := map[ACLPermissionType]string{ + ACLPermissionTypeUnknown: "Unknown", + ACLPermissionTypeAny: "Any", + ACLPermissionTypeDeny: "Deny", + ACLPermissionTypeAllow: "Allow", + } + s, ok := mapping[apt] + if !ok { + s = mapping[ACLPermissionTypeUnknown] + } + return s +} + +// MarshalText transforms an ACLPermissionType into its string representation. +func (apt ACLPermissionType) MarshalText() ([]byte, error) { + return []byte(apt.String()), nil +} + +// UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType. +func (apt *ACLPermissionType) UnmarshalText(text []byte) error { + normalized := strings.ToLower(string(text)) + mapping := map[string]ACLPermissionType{ + "unknown": ACLPermissionTypeUnknown, + "any": ACLPermissionTypeAny, + "deny": ACLPermissionTypeDeny, + "allow": ACLPermissionTypeAllow, + } + parsed, ok := mapping[normalized] + if !ok { + *apt = ACLPermissionTypeUnknown + return fmt.Errorf("cannot parse %s as an ACLPermissionType", normalized) + } + *apt = parsed + return nil +} + type ACLOperationType int8 const ( @@ -60,6 +98,62 @@ const ( ACLOperationTypeIdempotentWrite ACLOperationType = 12 ) +func (aot ACLOperationType) String() string { + mapping := map[ACLOperationType]string{ + ACLOperationTypeUnknown: "Unknown", + ACLOperationTypeAny: "Any", + ACLOperationTypeAll: "All", + ACLOperationTypeRead: "Read", + ACLOperationTypeWrite: "Write", + ACLOperationTypeCreate: "Create", + ACLOperationTypeDelete: "Delete", + ACLOperationTypeAlter: "Alter", + ACLOperationTypeDescribe: "Describe", + ACLOperationTypeClusterAction: "ClusterAction", + ACLOperationTypeDescribeConfigs: "DescribeConfigs", + ACLOperationTypeAlterConfigs: "AlterConfigs", + ACLOperationTypeIdempotentWrite: "IdempotentWrite", + } + s, ok := mapping[aot] + if !ok { + s = mapping[ACLOperationTypeUnknown] + } + return s +} + +// MarshalText transforms an ACLOperationType into its string representation. +func (aot ACLOperationType) MarshalText() ([]byte, error) { + return []byte(aot.String()), nil +} + +// UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType. +func (aot *ACLOperationType) UnmarshalText(text []byte) error { + normalized := strings.ToLower(string(text)) + mapping := map[string]ACLOperationType{ + "unknown": ACLOperationTypeUnknown, + "any": ACLOperationTypeAny, + "all": ACLOperationTypeAll, + "read": ACLOperationTypeRead, + "write": ACLOperationTypeWrite, + "create": ACLOperationTypeCreate, + "delete": ACLOperationTypeDelete, + "alter": ACLOperationTypeAlter, + "describe": ACLOperationTypeDescribe, + "clusteraction": ACLOperationTypeClusterAction, + "describeconfigs": ACLOperationTypeDescribeConfigs, + "alterconfigs": ACLOperationTypeAlterConfigs, + "idempotentwrite": ACLOperationTypeIdempotentWrite, + } + parsed, ok := mapping[normalized] + if !ok { + *aot = ACLOperationTypeUnknown + return fmt.Errorf("cannot parse %s as an ACLOperationType", normalized) + } + *aot = parsed + return nil + +} + type ACLEntry struct { ResourceType ResourceType ResourceName string diff --git a/vendor/github.com/segmentio/kafka-go/listpartitionreassignments.go b/vendor/github.com/segmentio/kafka-go/listpartitionreassignments.go new file mode 100644 index 000000000..aa01fff3f --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/listpartitionreassignments.go @@ -0,0 +1,135 @@ +package kafka + +import ( + "context" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/listpartitionreassignments" +) + +// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API. +type ListPartitionReassignmentsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // Topics we want reassignments for, mapped by their name, or nil to list everything. + Topics map[string]ListPartitionReassignmentsRequestTopic + + // Timeout is the amount of time to wait for the request to complete. + Timeout time.Duration +} + +// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single +// topic. +type ListPartitionReassignmentsRequestTopic struct { + // The partitions to list partition reassignments for. + PartitionIndexes []int +} + +// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API. +type ListPartitionReassignmentsResponse struct { + // Error is set to a non-nil value including the code and message if a top-level + // error was encountered. + Error error + + // Topics contains results for each topic, mapped by their name. + Topics map[string]ListPartitionReassignmentsResponseTopic +} + +// ListPartitionReassignmentsResponseTopic contains the detailed result of +// ongoing reassignments for a topic. +type ListPartitionReassignmentsResponseTopic struct { + // Partitions contains result for topic partitions. + Partitions []ListPartitionReassignmentsResponsePartition +} + +// ListPartitionReassignmentsResponsePartition contains the detailed result of +// ongoing reassignments for a single partition. +type ListPartitionReassignmentsResponsePartition struct { + // PartitionIndex contains index of the partition. + PartitionIndex int + + // Replicas contains the current replica set. + Replicas []int + + // AddingReplicas contains the set of replicas we are currently adding. + AddingReplicas []int + + // RemovingReplicas contains the set of replicas we are currently removing. + RemovingReplicas []int +} + +func (c *Client) ListPartitionReassignments( + ctx context.Context, + req *ListPartitionReassignmentsRequest, +) (*ListPartitionReassignmentsResponse, error) { + apiReq := &listpartitionreassignments.Request{ + TimeoutMs: int32(req.Timeout.Milliseconds()), + } + + for topicName, topicReq := range req.Topics { + apiReq.Topics = append( + apiReq.Topics, + listpartitionreassignments.RequestTopic{ + Name: topicName, + PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes), + }, + ) + } + + protoResp, err := c.roundTrip( + ctx, + req.Addr, + apiReq, + ) + if err != nil { + return nil, err + } + apiResp := protoResp.(*listpartitionreassignments.Response) + + resp := &ListPartitionReassignmentsResponse{ + Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage), + Topics: make(map[string]ListPartitionReassignmentsResponseTopic), + } + + for _, topicResult := range apiResp.Topics { + respTopic := ListPartitionReassignmentsResponseTopic{} + for _, partitionResult := range topicResult.Partitions { + respTopic.Partitions = append( + respTopic.Partitions, + ListPartitionReassignmentsResponsePartition{ + PartitionIndex: int(partitionResult.PartitionIndex), + Replicas: int32ToIntArray(partitionResult.Replicas), + AddingReplicas: int32ToIntArray(partitionResult.AddingReplicas), + RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas), + }, + ) + } + resp.Topics[topicResult.Name] = respTopic + } + + return resp, nil +} + +func intToInt32Array(arr []int) []int32 { + if arr == nil { + return nil + } + res := make([]int32, len(arr)) + for i := range arr { + res[i] = int32(arr[i]) + } + return res +} + +func int32ToIntArray(arr []int32) []int { + if arr == nil { + return nil + } + res := make([]int, len(arr)) + for i := range arr { + res[i] = int(arr[i]) + } + return res +} diff --git a/vendor/github.com/segmentio/kafka-go/protocol/alterpartitionreassignments/alterpartitionreassignments.go b/vendor/github.com/segmentio/kafka-go/protocol/alterpartitionreassignments/alterpartitionreassignments.go index 4894a2e6a..7f8d2ed2f 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/alterpartitionreassignments/alterpartitionreassignments.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/alterpartitionreassignments/alterpartitionreassignments.go @@ -23,7 +23,7 @@ type RequestTopic struct { type RequestPartition struct { PartitionIndex int32 `kafka:"min=v0,max=v0"` - Replicas []int32 `kafka:"min=v0,max=v0"` + Replicas []int32 `kafka:"min=v0,max=v0,nullable"` } func (r *Request) ApiKey() protocol.ApiKey { diff --git a/vendor/github.com/segmentio/kafka-go/protocol/listpartitionreassignments/listpartitionreassignments.go b/vendor/github.com/segmentio/kafka-go/protocol/listpartitionreassignments/listpartitionreassignments.go new file mode 100644 index 000000000..d26a64101 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/protocol/listpartitionreassignments/listpartitionreassignments.go @@ -0,0 +1,70 @@ +package listpartitionreassignments + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListPartitionReassignments. + +type Request struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + TimeoutMs int32 `kafka:"min=v0,max=v0"` + Topics []RequestTopic `kafka:"min=v0,max=v0,nullable"` +} + +type RequestTopic struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Name string `kafka:"min=v0,max=v0"` + PartitionIndexes []int32 `kafka:"min=v0,max=v0"` +} + +func (r *Request) ApiKey() protocol.ApiKey { + return protocol.ListPartitionReassignments +} + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type Response struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v0"` + ErrorCode int16 `kafka:"min=v0,max=v0"` + ErrorMessage string `kafka:"min=v0,max=v0,nullable"` + Topics []ResponseTopic `kafka:"min=v0,max=v0"` +} + +type ResponseTopic struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Name string `kafka:"min=v0,max=v0"` + Partitions []ResponsePartition `kafka:"min=v0,max=v0"` +} + +type ResponsePartition struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + PartitionIndex int32 `kafka:"min=v0,max=v0"` + Replicas []int32 `kafka:"min=v0,max=v0"` + AddingReplicas []int32 `kafka:"min=v0,max=v0"` + RemovingReplicas []int32 `kafka:"min=v0,max=v0"` +} + +func (r *Response) ApiKey() protocol.ApiKey { + return protocol.ListPartitionReassignments +} diff --git a/vendor/github.com/segmentio/kafka-go/resource.go b/vendor/github.com/segmentio/kafka-go/resource.go index f5c2e73a5..b9be107c2 100644 --- a/vendor/github.com/segmentio/kafka-go/resource.go +++ b/vendor/github.com/segmentio/kafka-go/resource.go @@ -1,5 +1,10 @@ package kafka +import ( + "fmt" + "strings" +) + // https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java type ResourceType int8 @@ -15,6 +20,50 @@ const ( ResourceTypeDelegationToken ResourceType = 6 ) +func (rt ResourceType) String() string { + mapping := map[ResourceType]string{ + ResourceTypeUnknown: "Unknown", + ResourceTypeAny: "Any", + ResourceTypeTopic: "Topic", + ResourceTypeGroup: "Group", + // Note that ResourceTypeBroker and ResourceTypeCluster have the same value. + // A map cannot have duplicate values so we just use the same value for both. + ResourceTypeCluster: "Cluster", + ResourceTypeTransactionalID: "Transactionalid", + ResourceTypeDelegationToken: "Delegationtoken", + } + s, ok := mapping[rt] + if !ok { + s = mapping[ResourceTypeUnknown] + } + return s +} + +func (rt ResourceType) MarshalText() ([]byte, error) { + return []byte(rt.String()), nil +} + +func (rt *ResourceType) UnmarshalText(text []byte) error { + normalized := strings.ToLower(string(text)) + mapping := map[string]ResourceType{ + "unknown": ResourceTypeUnknown, + "any": ResourceTypeAny, + "topic": ResourceTypeTopic, + "group": ResourceTypeGroup, + "broker": ResourceTypeBroker, + "cluster": ResourceTypeCluster, + "transactionalid": ResourceTypeTransactionalID, + "delegationtoken": ResourceTypeDelegationToken, + } + parsed, ok := mapping[normalized] + if !ok { + *rt = ResourceTypeUnknown + return fmt.Errorf("cannot parse %s as a ResourceType", normalized) + } + *rt = parsed + return nil +} + // https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java type PatternType int8 @@ -35,3 +84,40 @@ const ( // that start with 'foo'. PatternTypePrefixed PatternType = 4 ) + +func (pt PatternType) String() string { + mapping := map[PatternType]string{ + PatternTypeUnknown: "Unknown", + PatternTypeAny: "Any", + PatternTypeMatch: "Match", + PatternTypeLiteral: "Literal", + PatternTypePrefixed: "Prefixed", + } + s, ok := mapping[pt] + if !ok { + s = mapping[PatternTypeUnknown] + } + return s +} + +func (pt PatternType) MarshalText() ([]byte, error) { + return []byte(pt.String()), nil +} + +func (pt *PatternType) UnmarshalText(text []byte) error { + normalized := strings.ToLower(string(text)) + mapping := map[string]PatternType{ + "unknown": PatternTypeUnknown, + "any": PatternTypeAny, + "match": PatternTypeMatch, + "literal": PatternTypeLiteral, + "prefixed": PatternTypePrefixed, + } + parsed, ok := mapping[normalized] + if !ok { + *pt = PatternTypeUnknown + return fmt.Errorf("cannot parse %s as a PatternType", normalized) + } + *pt = parsed + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 22b2f899c..e8185ccb1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -334,7 +334,7 @@ github.com/sagikazarmark/locafero # github.com/sagikazarmark/slog-shim v0.1.0 ## explicit; go 1.20 github.com/sagikazarmark/slog-shim -# github.com/segmentio/kafka-go v0.4.44 +# github.com/segmentio/kafka-go v0.4.45 ## explicit; go 1.15 github.com/segmentio/kafka-go github.com/segmentio/kafka-go/compress @@ -373,6 +373,7 @@ github.com/segmentio/kafka-go/protocol/joingroup github.com/segmentio/kafka-go/protocol/leavegroup github.com/segmentio/kafka-go/protocol/listgroups github.com/segmentio/kafka-go/protocol/listoffsets +github.com/segmentio/kafka-go/protocol/listpartitionreassignments github.com/segmentio/kafka-go/protocol/metadata github.com/segmentio/kafka-go/protocol/offsetcommit github.com/segmentio/kafka-go/protocol/offsetdelete